Source code for autogl.module.nas.space.autoattend

# Code in this file is reproduced from AutoAttend with some changes.
from nni.nas.pytorch.mutables import Mutable
import typing as _typ
import torch

import torch.nn.functional as F
from nni.nas.pytorch import mutables

from . import register_nas_space
from .base import BaseSpace
from ..utils import count_parameters, measure_latency

from torch import nn
from .operation import act_map, gnn_map

from ..backend import *

from .autoattend_space.ops1 import OPS as OPS1
from .autoattend_space.ops2 import OPS as OPS2
from .autoattend_space.operations import agg_map
OPS = [OPS1, OPS2]
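# The two candidate operation sets; `ops_type` in the space below indexes into this list.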


@register_nas_space("autoattend")
class AutoAttendNodeClassificationSpace(BaseSpace):
    """
    AutoAttend search space; please refer to http://proceedings.mlr.press/v139/guan21a.html for details.
    The current implementation is nc (no context weight sharing); other types of partial
    weight sharing proposed in the paper will be added in the future.

    Parameters
    ----------
    ops_type : int
        0 or 1, selecting one of the two operation sets by index
    gnn_ops : list of str
        op names for searching, which describe the composition of the operation pool
    act_op : str
        the activation function to use
    agg_ops : list of str
        agg op names for searching. Only ['add', 'attn'] are options, as mentioned in the paper.
    """

    def __init__(
        self,
        hidden_dim: _typ.Optional[int] = 64,
        layer_number: _typ.Optional[int] = 2,
        dropout: _typ.Optional[float] = 0.9,
        input_dim: _typ.Optional[int] = None,
        output_dim: _typ.Optional[int] = None,
        ops_type=0,
        gnn_ops: _typ.Sequence[_typ.Union[str, _typ.Any]] = None,
        act_op="tanh",
        head=8,
        agg_ops=['add', 'attn'],
        # agg_ops=['attn'],
    ):
        super().__init__()
        from autogl.backend import DependentBackend
        assert not DependentBackend.is_dgl(), "AutoAttend currently only supports the pyg backend"
        self.layer_number = layer_number
        self.hidden_dim = hidden_dim
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.gnn_ops = gnn_ops
        self.dropout = dropout
        self.act_op = act_op
        self.ops_type = ops_type
        self.head = head
        self.agg_ops = agg_ops
    def instantiate(
        self,
        hidden_dim: _typ.Optional[int] = None,
        layer_number: _typ.Optional[int] = None,
        dropout: _typ.Optional[float] = None,
        input_dim: _typ.Optional[int] = None,
        output_dim: _typ.Optional[int] = None,
        ops_type=None,
        gnn_ops: _typ.Sequence[_typ.Union[str, _typ.Any]] = None,
        act_op=None,
        head=None,
        agg_ops=None,
        # con_ops: _typ.Sequence[_typ.Union[str, _typ.Any]] = None,
    ):
        super().instantiate()
        self.dropout = dropout or self.dropout
        self.hidden_dim = hidden_dim or self.hidden_dim
        self.layer_number = layer_number or self.layer_number
        self.input_dim = input_dim or self.input_dim
        self.output_dim = output_dim or self.output_dim
        self.gnn_ops = gnn_ops or self.gnn_ops
        self.act_op = act_op or self.act_op
        self.act = act_map(self.act_op)
        self.head = head or self.head
        self.ops_type = ops_type or self.ops_type
        self.agg_ops = agg_ops or self.agg_ops
        PRIMITIVES = list(OPS[self.ops_type].keys())
        self.gnn_map = lambda x, *args, **kwargs: OPS[self.ops_type][x](*args, **kwargs)
        self.gnn_ops = self.gnn_ops or PRIMITIVES
        self.agg_map = lambda x, *args, **kwargs: agg_map[x](*args, **kwargs)
        # self.preproc0 = nn.Linear(self.input_dim, self.input_dim)

        for layer in range(1, self.layer_number + 1):
            # stem path
            layer_stem = f"{layer-1}->{layer}"
            key = f"stem_{layer_stem}"
            self._set_layer_choice(layer_stem, key)
            # input
            key = f"in_{layer}"
            # self._set_input_choice(key, layer, choose_from=node_labels, n_chosen=1, return_mask=False)
            self._set_input_choice(
                key, layer, n_candidates=layer, n_chosen=1, return_mask=False)
            # agg
            key = f"agg_{layer}"
            self._set_agg_choice(layer, key=key)

        for l1 in range(1, self.layer_number + 1):
            for l2 in range(l1):
                # from l2 to l1; l2 means input layer; l1 means output layer
                layer = f"{l2}->{l1}"
                # side path
                key = f"side_{layer}"
                for i in range(2):
                    sub_key = f"{key}_{i}"
                    self._set_layer_choice(layer, sub_key)

        self._initialized = True
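        # For illustration: with the default layer_number=2, the loops above register
        # the following mutable choices:
        #   stem layer choices : stem_0->1, stem_1->2
        #   input choices      : in_1, in_2
        #   aggregation choices: agg_1, agg_2
        #   side layer choices : side_0->1_0, side_0->1_1, side_0->2_0, side_0->2_1,
        #                        side_1->2_0, side_1->2_1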
        # self.classifier2 = nn.Linear(self.hidden_dim, self.output_dim)

    def _set_agg_choice(self, layer, key):
        if layer == self.layer_number:
            dim = self.output_dim
        else:
            dim = self.hidden_dim
        agg_head = self.head if layer < self.layer_number else 1
        ops = [self.agg_map(op, dim, agg_head, self.dropout) for op in self.agg_ops]
        choice = self.setLayerChoice(
            layer,
            ops,
            key=key,
        )
        setattr(self, key, choice)
        return choice

    def _set_layer_choice(self, layer, key):
        l1, l2 = list(map(int, layer.split('->')))
        input_dim = output_dim = self.hidden_dim
        input_dim = self.input_dim if l1 == 0 else self.hidden_dim
        output_dim = self.output_dim if l2 == self.layer_number else self.hidden_dim
        opnames = self.gnn_ops.copy()
        if input_dim != output_dim:
            for invalid_op in "ZERO IDEN".split():
                opnames.remove(invalid_op)
        concat = (l2 != self.layer_number)
        if self.ops_type == 0:
            ops = [self.gnn_map(op, input_dim, output_dim, self.dropout, concat=concat) for op in opnames]
        elif self.ops_type == 1:
            ops = [self.gnn_map(op, input_dim, output_dim, self.head, self.dropout, concat=concat) for op in opnames]
        choice = self.setLayerChoice(
            l1,
            ops,
            key=key,
        )
        setattr(self, key, choice)
        return choice

    def _set_input_choice(self, key, layer, **kwargs):
        setattr(self, key, self.setInputChoice(
            layer,
            key=key,
            **kwargs
        ))
    def forward(self, data):
        x = bk_feat(data)

        def drop(x):
            x = F.dropout(x, p=self.dropout, training=self.training)
            return x

        # prev_ = self.preproc0(x)
        prev_ = x
        side_outs = {}  # {l: [side1, side2, ...]} collects the l-th layer's side inputs
        stem_outs = []
        input = prev_
        for layer in range(1, self.layer_number + 1):
            # do layer choice for stem
            layer_stem = f"{layer-1}->{layer}"
            op = getattr(self, f"stem_{layer_stem}")
            stem_out = bk_gconv(op, data, drop(input))
            stem_out = self.act(stem_out)
            stem_outs.append(stem_out)

            # do double layer choice for sides
            for l2 in range(layer, self.layer_number + 1):
                l1 = layer - 1
                layer_side = f"{l1}->{l2}"
                side_out_list = []
                for i in range(2):
                    op = getattr(self, f'side_{layer_side}_{i}')
                    side_out = bk_gconv(op, data, drop(input))
                    side_out = self.act(side_out)
                    # torch.Size([2, 2708, 64])
                    side_out_list.append(side_out)
                side_out = torch.stack(side_out_list, dim=0)
                if l2 not in side_outs:
                    side_outs[l2] = []
                side_outs[l2].append(side_out)

            # select input [x1, x2, x3] from side1, side2, stem
            current_side_outs = side_outs[layer]
            side_selected = getattr(self, f"in_{layer}")(current_side_outs)
            input = [stem_outs[-1], side_selected]
            # do agg in [add, attn]
            agg = getattr(self, f"agg_{layer}")
            input = bk_gconv(agg, data, input)

        # x = self.classifier2(input)
        x = input
        return F.log_softmax(x, dim=1)
    def parse_model(self, selection, device):
        return self.wrap().fix(selection)
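A minimal usage sketch of this space (only construction and instantiation are shown; the dataset dimensions are illustrative, and running an actual search additionally requires a NAS algorithm and estimator from the rest of the library):

from autogl.module.nas.space.autoattend import AutoAttendNodeClassificationSpace

# Build the supernet; all arguments shown are parameters of __init__ above.
space = AutoAttendNodeClassificationSpace(
    hidden_dim=64,
    layer_number=2,
    ops_type=0,              # select the OPS1 operation set
    agg_ops=['add', 'attn'],
)
# input_dim / output_dim normally come from the dataset at hand
# (e.g. 1433 features and 7 classes for Cora; the values here are illustrative).
space.instantiate(input_dim=1433, output_dim=7)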