import torch
import torch.nn.functional
import typing as _typing
from torch_geometric.nn.conv import GCNConv
import autogl.data
from . import register_model
from .base import BaseAutoModel, activate_func, ClassificationSupportedSequentialModel
from ....utils import get_logger
LOGGER = get_logger("GCNModel")
class GCN(ClassificationSupportedSequentialModel):
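    """
    Sequential multi-layer GCN supporting node classification via
    ``cls_encode``/``cls_decode`` and link prediction via
    ``lp_encode``/``lp_decode``.
    """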
class _GCNLayer(torch.nn.Module):
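        """One GCN convolution with optional activation and dropout."""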
def __init__(
self,
input_channels: int,
output_channels: int,
add_self_loops: bool = True,
normalize: bool = True,
activation_name: _typing.Optional[str] = ...,
dropout_probability: _typing.Optional[float] = ...,
):
super().__init__()
self._convolution: GCNConv = GCNConv(
input_channels,
output_channels,
add_self_loops=bool(add_self_loops),
normalize=bool(normalize),
)
            if isinstance(activation_name, str):
                self._activation_name: _typing.Optional[str] = activation_name
            else:
                self._activation_name: _typing.Optional[str] = None
            if isinstance(dropout_probability, float):
                # Clamp the probability into the valid range [0, 1].
                dropout_probability = min(max(dropout_probability, 0.0), 1.0)
                self._dropout: _typing.Optional[torch.nn.Dropout] = torch.nn.Dropout(
                    dropout_probability
                )
            else:
                self._dropout: _typing.Optional[torch.nn.Dropout] = None
def forward(self, data, enable_activation: bool = True) -> torch.Tensor:
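            """
            Run the convolution on ``data.x`` and ``data.edge_index`` (using
            ``data.edge_weight`` when its shape matches), then apply the
            optional activation and dropout.
            """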
x: torch.Tensor = getattr(data, "x")
edge_index: torch.LongTensor = getattr(data, "edge_index")
edge_weight: _typing.Optional[torch.Tensor] = getattr(
data, "edge_weight", None
)
""" Validate the arguments """
if not type(x) == type(edge_index) == torch.Tensor:
raise TypeError
if edge_weight is not None and (
type(edge_weight) != torch.Tensor
or edge_index.size() != (2, edge_weight.size(0))
):
edge_weight: _typing.Optional[torch.Tensor] = None
            x = self._convolution(x, edge_index, edge_weight)
            if self._activation_name is not None and enable_activation:
                x = activate_func(x, self._activation_name)
            if self._dropout is not None:
                x = self._dropout(x)
return x
def __init__(
self,
num_features: int,
num_classes: int,
hidden_features: _typing.Sequence[int],
activation_name: str,
dropout: _typing.Union[
_typing.Optional[float], _typing.Sequence[_typing.Optional[float]]
] = None,
add_self_loops: bool = True,
normalize: bool = True,
):
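        """
        Construct ``len(hidden_features) + 1`` GCN layers mapping
        ``num_features`` to ``num_classes``; ``dropout`` may be a single
        probability shared by the hidden layers, a per-layer sequence of
        length ``len(hidden_features) + 1``, or None.
        """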
        if isinstance(dropout, _typing.Sequence):
            if len(dropout) != len(hidden_features) + 1:
                raise TypeError(
                    "When the dropout argument is a sequence, "
                    "its length must equal the number of layers to construct."
                )
            for _dropout in dropout:
                if _dropout is not None and not isinstance(_dropout, float):
                    raise TypeError(
                        "When the dropout argument is a sequence, "
                        "every item in the sequence must be a float or None."
                    )
            dropout_list: _typing.Sequence[_typing.Optional[float]] = dropout
        elif isinstance(dropout, float):
            # Clamp the probability into [0, 1]; by convention the final
            # (output) layer receives no dropout.
            dropout = min(max(dropout, 0.0), 1.0)
            dropout_list: _typing.Sequence[_typing.Optional[float]] = [
                dropout for _ in range(len(hidden_features))
            ] + [None]
        elif dropout is None or dropout is Ellipsis:
            dropout_list: _typing.Sequence[_typing.Optional[float]] = [
                None for _ in range(len(hidden_features) + 1)
            ]
        else:
            raise TypeError(
                "The provided dropout argument must be a float, None, or "
                "a sequence in which each item is either a float or None."
            )
super().__init__()
if len(hidden_features) == 0:
self.__sequential_encoding_layers: torch.nn.ModuleList = (
torch.nn.ModuleList(
(
self._GCNLayer(
num_features,
num_classes,
add_self_loops,
normalize,
dropout_probability=dropout_list[0],
),
)
)
)
else:
self.__sequential_encoding_layers: torch.nn.ModuleList = (
torch.nn.ModuleList()
)
self.__sequential_encoding_layers.append(
self._GCNLayer(
num_features,
hidden_features[0],
add_self_loops,
normalize,
activation_name,
dropout_list[0],
)
)
for hidden_feature_index in range(len(hidden_features)):
if hidden_feature_index + 1 < len(hidden_features):
self.__sequential_encoding_layers.append(
self._GCNLayer(
hidden_features[hidden_feature_index],
hidden_features[hidden_feature_index + 1],
add_self_loops,
normalize,
activation_name,
dropout_list[hidden_feature_index + 1],
)
)
else:
self.__sequential_encoding_layers.append(
self._GCNLayer(
hidden_features[hidden_feature_index],
num_classes,
add_self_loops,
normalize,
                            dropout_probability=dropout_list[-1],
)
)
@property
def sequential_encoding_layers(self) -> torch.nn.ModuleList:
return self.__sequential_encoding_layers
def __extract_edge_indexes_and_weights(
self, data
) -> _typing.Union[
_typing.Sequence[
_typing.Tuple[torch.LongTensor, _typing.Optional[torch.Tensor]]
],
_typing.Tuple[torch.LongTensor, _typing.Optional[torch.Tensor]],
]:
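        """
        Collect per-layer connectivity: when ``data`` provides a sequence
        ``edge_indexes`` (and optionally ``edge_weights``) whose length equals
        the number of layers, return one (edge_index, edge_weight) pair per
        layer; otherwise fall back to the shared ``edge_index``.
        """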
def __compose_edge_index_and_weight(
_edge_index: torch.LongTensor,
_edge_weight: _typing.Optional[torch.Tensor] = None,
) -> _typing.Tuple[torch.LongTensor, _typing.Optional[torch.Tensor]]:
            if not isinstance(_edge_index, torch.Tensor) or _edge_index.dtype != torch.int64:
                raise TypeError(
                    "edge_index must be a torch.Tensor with dtype torch.int64, "
                    "got %s" % type(_edge_index)
                )
            if _edge_weight is not None and (
                not isinstance(_edge_weight, torch.Tensor)
                or _edge_index.size() != (2, _edge_weight.size(0))
            ):
                # Discard an edge_weight whose shape does not match edge_index.
                _edge_weight = None
return _edge_index, _edge_weight
if not (
hasattr(data, "edge_indexes")
and isinstance(getattr(data, "edge_indexes"), _typing.Sequence)
and len(getattr(data, "edge_indexes"))
== len(self.__sequential_encoding_layers)
):
return __compose_edge_index_and_weight(
getattr(data, "edge_index"), getattr(data, "edge_weight", None)
)
for __edge_index in getattr(data, "edge_indexes"):
            if not isinstance(__edge_index, torch.Tensor) or __edge_index.dtype != torch.int64:
return __compose_edge_index_and_weight(
getattr(data, "edge_index"), getattr(data, "edge_weight", None)
)
if (
hasattr(data, "edge_weights")
and isinstance(getattr(data, "edge_weights"), _typing.Sequence)
and len(getattr(data, "edge_weights"))
== len(self.__sequential_encoding_layers)
):
return [
__compose_edge_index_and_weight(_edge_index, _edge_weight)
for _edge_index, _edge_weight in zip(
getattr(data, "edge_indexes"), getattr(data, "edge_weights")
)
]
else:
return [
__compose_edge_index_and_weight(__edge_index)
for __edge_index in getattr(data, "edge_indexes")
]
def forward(self, data):
return self.cls_decode(self.cls_encode(data))
def cls_encode(self, data) -> torch.Tensor:
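        """Encode node features through every GCN layer for classification."""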
edge_indexes_and_weights: _typing.Union[
_typing.Sequence[
_typing.Tuple[torch.LongTensor, _typing.Optional[torch.Tensor]]
],
_typing.Tuple[torch.LongTensor, _typing.Optional[torch.Tensor]],
] = self.__extract_edge_indexes_and_weights(data)
if (not isinstance(edge_indexes_and_weights, tuple)) and isinstance(
edge_indexes_and_weights[0], tuple
):
""" edge_indexes_and_weights is sequence of (edge_index, edge_weight) """
assert len(edge_indexes_and_weights) == len(
self.__sequential_encoding_layers
)
x: torch.Tensor = getattr(data, "x")
for _edge_index_and_weight, gcn in zip(
edge_indexes_and_weights, self.__sequential_encoding_layers
):
_temp_data = autogl.data.Data(x=x, edge_index=_edge_index_and_weight[0])
_temp_data.edge_weight = _edge_index_and_weight[1]
x = gcn(_temp_data)
return x
else:
""" edge_indexes_and_weights is (edge_index, edge_weight) """
x = getattr(data, "x")
for gcn in self.__sequential_encoding_layers:
_temp_data = autogl.data.Data(
x=x, edge_index=edge_indexes_and_weights[0]
)
_temp_data.edge_weight = edge_indexes_and_weights[1]
x = gcn(_temp_data)
return x
def cls_decode(self, x: torch.Tensor) -> torch.Tensor:
return torch.nn.functional.log_softmax(x, dim=1)
def lp_encode(self, data):
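        """
        Encode node embeddings for link prediction: apply every layer except
        the final output layer, disabling the activation of the last layer
        applied.
        """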
x: torch.Tensor = getattr(data, "x")
for i in range(len(self.__sequential_encoding_layers) - 2):
x = self.__sequential_encoding_layers[i](
autogl.data.Data(x, getattr(data, "edge_index"))
)
x = self.__sequential_encoding_layers[-2](
autogl.data.Data(x, getattr(data, "edge_index")), enable_activation=False
)
return x
def lp_decode(self, z, pos_edge_index, neg_edge_index):
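        """
        Score the concatenated positive and negative candidate edges by the
        dot product of their endpoint embeddings.
        """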
edge_index = torch.cat([pos_edge_index, neg_edge_index], dim=-1)
logits = (z[edge_index[0]] * z[edge_index[1]]).sum(dim=-1)
return logits
def lp_decode_all(self, z):
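        """Predict the full edge set: all node pairs with a positive dot-product score."""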
prob_adj = z @ z.t()
return (prob_adj > 0).nonzero(as_tuple=False).t()
@register_model("gcn-model")
class AutoGCN(BaseAutoModel):
r"""
AutoGCN.
The model used in this automodel is GCN, i.e., the graph convolutional network from the
`"Semi-supervised Classification with Graph Convolutional
Networks" <https://arxiv.org/abs/1609.02907>`_ paper. The layer is
.. math::
\mathbf{X}^{\prime} = \mathbf{\hat{D}}^{-1/2} \mathbf{\hat{A}}
\mathbf{\hat{D}}^{-1/2} \mathbf{X} \mathbf{\Theta},
where :math:`\mathbf{\hat{A}} = \mathbf{A} + \mathbf{I}` denotes the
adjacency matrix with inserted self-loops and
:math:`\hat{D}_{ii} = \sum_{j=0} \hat{A}_{ij}` its diagonal degree matrix.
Parameters
----------
num_features: ``int``
The dimension of features.
num_classes: ``int``
The number of classes.
    device: ``torch.device`` or ``str``
        The device on which the model will run.
    init: ``bool``
        Whether the model should be initialized upon construction.
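
    Examples
    --------
    A minimal usage sketch; the feature and class counts below are
    illustrative (Cora-like) rather than prescribed by this module, and
    ``initialize`` refers to the initialization helper inherited from
    ``BaseAutoModel``:

    >>> auto_gcn = AutoGCN(num_features=1433, num_classes=7, device="cpu")
    >>> auto_gcn.initialize()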
"""
def __init__(
self,
num_features: int = ...,
num_classes: int = ...,
device: _typing.Union[str, torch.device] = ...,
**kwargs
) -> None:
super().__init__(num_features, num_classes, device, **kwargs)
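        # Hyperparameter search space consumed by the HPO module: each entry
        # names one tunable parameter with its type and feasible range; the
        # "hidden" list is cut to num_layers - 1 entries via cutPara/cutFunc.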
self.hyper_parameter_space = [
{
"parameterName": "add_self_loops",
"type": "CATEGORICAL",
"feasiblePoints": [1],
},
{
"parameterName": "normalize",
"type": "CATEGORICAL",
"feasiblePoints": [1],
},
{
"parameterName": "num_layers",
"type": "DISCRETE",
"feasiblePoints": "2,3,4",
},
{
"parameterName": "hidden",
"type": "NUMERICAL_LIST",
"numericalType": "INTEGER",
"length": 3,
"minValue": [8, 8, 8],
"maxValue": [128, 128, 128],
"scalingType": "LOG",
"cutPara": ("num_layers",),
"cutFunc": lambda x: x[0] - 1,
},
{
"parameterName": "dropout",
"type": "DOUBLE",
"maxValue": 0.8,
"minValue": 0.2,
"scalingType": "LINEAR",
},
{
"parameterName": "act",
"type": "CATEGORICAL",
"feasiblePoints": ["leaky_relu", "relu", "elu", "tanh"],
},
]
self.hyper_parameters = {
"num_layers": 3,
"hidden": [128, 64],
"dropout": 0,
"act": "relu",
}
def _initialize(self):
self._model = GCN(
self.input_dimension,
self.output_dimension,
self.hyper_parameters.get("hidden"),
self.hyper_parameters.get("act"),
self.hyper_parameters.get("dropout", None),
bool(self.hyper_parameters.get("add_self_loops", True)),
bool(self.hyper_parameters.get("normalize", True)),
).to(self.device)