Source code for autogl.module.model.pyg.gat

import torch
import torch.nn.functional as F
from torch_geometric.nn import GATConv
from . import register_model
from .base import BaseAutoModel, activate_func
from ....utils import get_logger

LOGGER = get_logger("GATModel")


def set_default(args, d):
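    """Fill missing keys of ``args`` in place with the defaults from ``d``."""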
    for k, v in d.items():
        if k not in args:
            args[k] = v
    return args


class GAT(torch.nn.Module):
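    """Multi-layer graph attention network built from PyG ``GATConv`` layers."""
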
    def __init__(self, args):
        super(GAT, self).__init__()
        self.args = args
        self.num_layer = int(self.args["num_layers"])

        missing_keys = list(
            set(
                [
                    "features_num",
                    "num_class",
                    "num_layers",
                    "hidden",
                    "heads",
                    "dropout",
                    "act",
                ]
            )
            - set(self.args.keys())
        )
        if len(missing_keys) > 0:
            raise Exception("Missing keys: %s." % ",".join(missing_keys))

        if self.num_layer != len(self.args["hidden"]) + 1:
            LOGGER.warning("num_layers does not match len(hidden) + 1")
        self.convs = torch.nn.ModuleList()
        self.convs.append(
            GATConv(
                self.args["features_num"],
                self.args["hidden"][0],
                heads=self.args["heads"],
                dropout=self.args["dropout"],
            )
        )
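        # With concat=True (the GATConv default), the layer output dimension
        # is the hidden size times the number of heads.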
        last_dim = self.args["hidden"][0] * self.args["heads"]
        for i in range(self.num_layer - 2):
            self.convs.append(
                GATConv(
                    last_dim,
                    self.args["hidden"][i + 1],
                    heads=self.args["heads"],
                    dropout=self.args["dropout"],
                )
            )
            last_dim = self.args["hidden"][i + 1] * self.args["heads"]
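        # Output layer: a single non-concatenating head mapping to class logits.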
        self.convs.append(
            GATConv(
                last_dim,
                self.args["num_class"],
                heads=1,
                concat=False,
                dropout=self.args["dropout"],
            )
        )

    def forward(self, data):
        # Fetch inputs defensively; optional attributes fall back to None.
        x = getattr(data, "x", None)
        if x is None:
            LOGGER.warning("input data has no node feature matrix `x`")
        edge_index = getattr(data, "edge_index", None)
        if edge_index is None:
            LOGGER.warning("input data has no `edge_index`")
        edge_weight = getattr(data, "edge_weight", None)

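        # Per layer: dropout -> GATConv -> activation (skipped on the last layer).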
        for i in range(self.num_layer):
            x = F.dropout(x, p=self.args["dropout"], training=self.training)
            x = self.convs[i](x, edge_index, edge_weight)
            if i != self.num_layer - 1:
                x = activate_func(x, self.args["act"])

        return F.log_softmax(x, dim=1)

    def lp_encode(self, data):
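        # Encode nodes for link prediction: run every conv layer except the
        # final classification layer and return the raw node embeddings.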
        x = data.x
        for i in range(self.num_layer - 1):
            x = self.convs[i](x, data.edge_index)
            if i != self.num_layer - 2:
                x = activate_func(x, self.args["act"])
                # x = F.dropout(x, p=self.args["dropout"], training=self.training)
        return x

    def lp_decode(self, z, pos_edge_index, neg_edge_index):
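        # Inner-product decoder: concatenate positive and negative candidate
        # edges and score each pair (u, v) by the dot product z_u . z_v.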
        edge_index = torch.cat([pos_edge_index, neg_edge_index], dim=-1)
        logits = (z[edge_index[0]] * z[edge_index[1]]).sum(dim=-1)
        return logits

    def lp_decode_all(self, z):
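        # Dense decoding: compute all pairwise scores and keep the edges whose
        # logit is positive (i.e., sigmoid probability > 0.5).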
        prob_adj = z @ z.t()
        return (prob_adj > 0).nonzero(as_tuple=False).t()
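
# A minimal usage sketch of the plain ``GAT`` module (an illustrative addition,
# not part of the original AutoGL source). The toy sizes and the "relu"
# activation are arbitrary assumptions.
def _example_gat_forward():
    from torch_geometric.data import Data

    args = {
        "features_num": 16,
        "num_class": 3,
        "num_layers": 2,
        "hidden": [8],
        "heads": 4,
        "dropout": 0.2,
        "act": "relu",
    }
    model = GAT(args)
    data = Data(
        x=torch.randn(10, 16),
        edge_index=torch.randint(0, 10, (2, 40)),
    )
    return model(data)  # log-probabilities of shape [num_nodes, num_class]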


@register_model("gat-model")
class AutoGAT(BaseAutoModel):
    r"""
    AutoGAT. The model used in this automodel is GAT, i.e., the graph attentional
    network from the `"Graph Attention Networks"
    <https://arxiv.org/abs/1710.10903>`_ paper. The layer is

    .. math::

        \mathbf{x}^{\prime}_i = \alpha_{i,i}\mathbf{\Theta}\mathbf{x}_{i} +
        \sum_{j \in \mathcal{N}(i)} \alpha_{i,j}\mathbf{\Theta}\mathbf{x}_{j},

    where the attention coefficients :math:`\alpha_{i,j}` are computed as

    .. math::

        \alpha_{i,j} =
        \frac{
        \exp\left(\mathrm{LeakyReLU}\left(\mathbf{a}^{\top}
        [\mathbf{\Theta}\mathbf{x}_i \, \Vert \, \mathbf{\Theta}\mathbf{x}_j]
        \right)\right)}
        {\sum_{k \in \mathcal{N}(i) \cup \{ i \}}
        \exp\left(\mathrm{LeakyReLU}\left(\mathbf{a}^{\top}
        [\mathbf{\Theta}\mathbf{x}_i \, \Vert \, \mathbf{\Theta}\mathbf{x}_k]
        \right)\right)}.

    Parameters
    ----------
    num_features: `int`
        The dimension of features.

    num_classes: `int`
        The number of classes.

    device: `torch.device` or `str`
        The device the model will run on.

    init: `bool`
        If True, the model is initialized during construction; otherwise it is not.

    args:
        Other parameters.
    """

    def __init__(self, num_features=None, num_classes=None, device=None, **args):
        super().__init__(num_features, num_classes, device, **args)
        self.hyper_parameter_space = [
            {
                "parameterName": "num_layers",
                "type": "DISCRETE",
                "feasiblePoints": "2,3,4",
            },
            {
                "parameterName": "hidden",
                "type": "NUMERICAL_LIST",
                "numericalType": "INTEGER",
                "length": 3,
                "minValue": [8, 8, 8],
                "maxValue": [64, 64, 64],
                "scalingType": "LOG",
                "cutPara": ("num_layers",),
                "cutFunc": lambda x: x[0] - 1,
            },
            {
                "parameterName": "dropout",
                "type": "DOUBLE",
                "maxValue": 0.8,
                "minValue": 0.2,
                "scalingType": "LINEAR",
            },
            {
                "parameterName": "heads",
                "type": "DISCRETE",
                "feasiblePoints": "2,4,8,16",
            },
            {
                "parameterName": "act",
                "type": "CATEGORICAL",
                "feasiblePoints": ["leaky_relu", "relu", "elu", "tanh"],
            },
        ]

        self.hyper_parameters = {
            "num_layers": 2,
            "hidden": [32],
            "heads": 4,
            "dropout": 0.2,
            "act": "leaky_relu",
        }

    def _initialize(self):
        """Initialize the underlying GAT model from the current hyperparameters."""
        self._model = GAT(
            {
                "features_num": self.input_dimension,
                "num_class": self.output_dimension,
                **self.hyper_parameters,
            }
        ).to(self.device)
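
# A minimal sketch of driving ``AutoGAT`` by hand (an illustrative addition, not
# part of the original AutoGL source). It assumes ``BaseAutoModel`` maps
# ``num_features``/``num_classes`` to ``input_dimension``/``output_dimension``,
# and it touches private members that AutoGL's solvers normally manage.
def _example_auto_gat():
    auto_model = AutoGAT(num_features=16, num_classes=3, device="cpu")
    auto_model._initialize()  # builds ``self._model`` from hyper_parameters
    return auto_model._model  # the underlying ``GAT`` torch.nn.Module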