"""
Auto Classfier for Node Classification
"""
import logging
import time
import json
from copy import deepcopy
from typing import Sequence
import torch
import numpy as np
import yaml
from .base import BaseClassifier
from ..base import _parse_hp_space, _initialize_single_model, _parse_model_hp
from ...module.feature import FEATURE_DICT
from ...module.train import TRAINER_DICT, BaseLinkPredictionTrainer
from ...module.train import get_feval
from ..utils import LeaderBoard, get_graph_node_features, convert_dataset, set_seed
from ...datasets import utils
from ..utils import get_logger
from ...backend import DependentBackend
LOGGER = get_logger("LinkPredictor")
BACKEND = DependentBackend.get_backend_name()
def _negative_sample_dgl(train_graph, pos_graph):
    """Sample a negative-edge graph for DGL link prediction.

    Builds the dense adjacency matrix over all known edges (training edges
    plus the positive evaluation edges), takes its complement without self
    loops as the pool of candidate negative pairs, and samples as many
    negatives as there are positive edges in ``pos_graph``.

    Parameters
    ----------
    train_graph: dgl.DGLGraph
        The (training) message-passing graph.
    pos_graph: dgl.DGLGraph
        The graph holding the positive edges to mirror with negatives.

    Returns
    -------
    dgl.DGLGraph
        A graph with the sampled negative edges and the same node count
        as ``train_graph``.
    """
    import scipy.sparse as sp
    import dgl

    num_nodes = train_graph.number_of_nodes()
    u, v = train_graph.edges()
    up, vp = pos_graph.edges()
    u_all = np.concatenate([u.numpy(), up.numpy()])
    v_all = np.concatenate([v.numpy(), vp.numpy()])
    # FIX: pin the matrix shape so the dense complement below always
    # aligns with np.eye(num_nodes), even when high-id nodes have no edges.
    adj = sp.coo_matrix(
        (np.ones(len(u_all)), (u_all, v_all)), shape=(num_nodes, num_nodes)
    )
    # complement of known edges (minus self loops) = candidate negatives
    adj_neg = 1 - adj.todense() - np.eye(num_nodes)
    neg_u, neg_v = np.where(adj_neg != 0)
    # sample as many negative edges as there are positive ones
    neg_eids = np.random.choice(len(neg_u), len(up))
    # BUG FIX: the sampled ids must be used as fancy *indices*; the old
    # code used them as a slice bound (``neg_u[:neg_eids]``), which raises
    # for an index array and would ignore the random sampling entirely.
    return dgl.DGLGraph(
        (neg_u[neg_eids], neg_v[neg_eids]),
        num_nodes=num_nodes,
    )
class AutoLinkPredictor(BaseClassifier):
    """
    Auto Link Predictor.
    Used to automatically solve the link prediction problems.
    Parameters
    ----------
    feature_module: autogl.module.feature.BaseFeatureEngineer or str or None
        The (name of) auto feature engineer used to process the given dataset.
        Default ``None``, which disables feature engineering.
    graph_models: list of autogl.module.model.BaseModel or list of str
        The (name of) models to be optimized as backbone. Default ``['gat', 'gcn']``.
    hpo_module: autogl.module.hpo.BaseHPOptimizer or str or None
        The (name of) hpo module used to search for best hyper parameters. Default ``anneal``.
        Disable hpo by setting it to ``None``.
    ensemble_module: autogl.module.ensemble.BaseEnsembler or str or None
        The (name of) ensemble module used to ensemble the multi-models found. Default ``voting``.
        Disable ensemble by setting it to ``None``.
    max_evals: int (Optional)
        If given, will set the number of eval times the hpo module will use.
        Only be effective when hpo_module is ``str``. Default ``50``.
    default_trainer: str (Optional)
        The (name of) the trainer used in this solver. Default ``LinkPredictionFull``.
    trainer_hp_space: list of dict (Optional)
        trainer hp space or list of trainer hp spaces configuration.
        If a single trainer hp is given, will specify the hp space of trainer for every model.
        If a list of trainer hp is given, will specify every model with corresponding
        trainer hp space.
        Default ``None``.
    model_hp_spaces: list of list of dict (Optional)
        model hp space configuration.
        If given, will specify every hp space of every passed model. Default ``None``.
        If the encoder(-decoder) is passed, the space should be a dict containing keys "encoder"
        and "decoder", specifying the detailed encoder decoder hp spaces.
    size: int (Optional)
        The max models ensemble module will use. Default ``4``.
    device: torch.device or str
        The device where model will be running on. If set to ``auto``, will use gpu when available.
        You can also specify the device by directly giving ``gpu`` or ``cuda:0``, etc.
        Default ``auto``.
    """
def __init__(
self,
feature_module=None,
graph_models=("gat", "gcn"),
hpo_module="anneal",
ensemble_module="voting",
max_evals=50,
default_trainer="LinkPredictionFull",
trainer_hp_space=None,
model_hp_spaces=None,
size=4,
device="auto",
):
super().__init__(
feature_module=feature_module,
graph_models=graph_models,
nas_algorithms=None,
nas_spaces=None,
nas_estimators=None,
hpo_module=hpo_module,
ensemble_module=ensemble_module,
max_evals=max_evals,
default_trainer=default_trainer,
trainer_hp_space=trainer_hp_space,
model_hp_spaces=model_hp_spaces,
size=size,
device=device,
)
# data to be kept when fit
self.dataset = None
    def _init_graph_module(
        self, graph_models, num_features, feval, device, loss
    ) -> "AutoLinkPredictor":
        """Wrap every backbone model in a trainer and configure it.

        For each entry of ``graph_models`` this builds (or reuses) a link
        prediction trainer, attaches the model as encoder (and decoder, if
        a pair is given), installs the configured hyper-parameter spaces,
        and moves the trainer to ``device``. The resulting trainers are
        collected in ``self.graph_model_list``.

        Parameters
        ----------
        graph_models:
            Models (or trainers, or (encoder, decoder) pairs) to wrap.
        num_features: int
            Input feature dimension forwarded to every trainer.
        feval:
            Evaluator list forwarded to every trainer.
        device: torch.device
            Device the trainers are moved to.
        loss: str
            Loss name forwarded to every trainer.

        Returns
        -------
        AutoLinkPredictor
            ``self``, for chaining.
        """
        self.graph_model_list = []
        for i, model in enumerate(graph_models):
            # init the trainer
            if not isinstance(model, BaseLinkPredictionTrainer):
                # pick the per-model default trainer when a list was given,
                # otherwise share the single default trainer spec
                trainer = (
                    self._default_trainer if not isinstance(self._default_trainer, (tuple, list))
                    else self._default_trainer[i]
                )
                if isinstance(trainer, str):
                    if trainer not in TRAINER_DICT:
                        raise KeyError(f"Does not support trainer {trainer}")
                    trainer = TRAINER_DICT[trainer]()
                if isinstance(model, (tuple, list)):
                    # an (encoder, decoder) pair was passed
                    trainer.encoder = model[0]
                    trainer.decoder = model[1]
                else:
                    trainer.encoder = model
            else:
                # a ready-made trainer was passed directly
                trainer = model
            # set model hp space
            if self._model_hp_spaces is not None:
                if self._model_hp_spaces[i] is not None:
                    if isinstance(self._model_hp_spaces[i], dict):
                        # dict form carries separate encoder/decoder spaces
                        encoder_hp_space = self._model_hp_spaces[i].get('encoder', None)
                        decoder_hp_space = self._model_hp_spaces[i].get('decoder', None)
                    else:
                        encoder_hp_space = self._model_hp_spaces[i]
                        decoder_hp_space = None
                    if encoder_hp_space is not None:
                        trainer.encoder.hyper_parameter_space = encoder_hp_space
                    if decoder_hp_space is not None:
                        trainer.decoder.hyper_parameter_space = decoder_hp_space
            # set trainer hp space
            if self._trainer_hp_space is not None:
                if isinstance(self._trainer_hp_space[0], list):
                    # a list of spaces: one per model
                    current_hp_for_trainer = self._trainer_hp_space[i]
                else:
                    # a single space shared by all models
                    current_hp_for_trainer = self._trainer_hp_space
                trainer.hyper_parameter_space = current_hp_for_trainer
            trainer.num_features = num_features
            trainer.loss = loss
            trainer.feval = feval
            trainer.to(device)
            self.graph_model_list.append(trainer)
        return self
def _to_prob(self, sig_prob: np.ndarray):
nelements = len(sig_prob)
prob = np.zeros([nelements, 2])
prob[:, 0] = 1 - sig_prob
prob[:, 1] = sig_prob
return prob
def _compose_dataset(self, dataset):
if isinstance(dataset[0], (list, tuple)):
new_dataset = []
for data in dataset:
new_dataset.append({
"train": data[0],
"train_pos": data[1],
"train_neg": data[2],
"val_pos": data[3],
"val_neg": data[4]
})
if len(data) == 7:
new_dataset[-1]["test_pos"] = data[5]
new_dataset[-1]["test_neg"] = data[6]
return new_dataset
else:
return convert_dataset(dataset)
    # pylint: disable=arguments-differ
    def fit(
        self,
        dataset,
        time_limit=-1,
        inplace=False,
        train_split=None,
        val_split=None,
        evaluation_method="infer",
        seed=None,
    ) -> "AutoLinkPredictor":
        """
        Fit current solver on given dataset.
        Parameters
        ----------
        dataset: torch_geometric.data.dataset.Dataset
            The dataset needed to fit on. This dataset must have only one graph.
        time_limit: int
            The time limit of the whole fit process (in seconds). If set below 0,
            will ignore time limit. Default ``-1``.
        inplace: bool
            Whether we process the given dataset in inplace manner. Default ``False``.
            Set it to True if you want to save memory by modifying the given dataset directly.
        train_split: float or int (Optional)
            The train ratio (in ``float``) or number (in ``int``) of dataset. If you want to
            use default train/val/test split in dataset, please set this to ``None``.
            Default ``None``.
        val_split: float or int (Optional)
            The validation ratio (in ``float``) or number (in ``int``) of dataset. If you want
            to use default train/val/test split in dataset, please set this to ``None``.
            Default ``None``.
        evaluation_method: (list of) str or autogl.module.train.evaluation
            A (list of) evaluation method for current solver. If ``infer``, will automatically
            determine. Default ``infer``.
        seed: int (Optional)
            The random seed. If set to ``None``, will run everything at random.
            Default ``None``.
        Returns
        -------
        self: autogl.solver.AutoLinkPredictor
            A reference of current solver.
        """
        set_seed(seed)
        # a negative limit means "no limit"; one day is used as a practical cap
        if time_limit < 0:
            time_limit = 3600 * 24
        time_begin = time.time()
        # initialize leaderboard
        if evaluation_method == "infer":
            if hasattr(dataset, "metric"):
                evaluation_method = [dataset.metric]
            else:
                # fall back on class count: binary -> auc, multi-class -> acc
                num_of_label = dataset.num_classes
                if num_of_label == 2:
                    evaluation_method = ["auc"]
                else:
                    evaluation_method = ["acc"]
        assert isinstance(evaluation_method, list)
        evaluator_list = get_feval(evaluation_method)
        self.leaderboard = LeaderBoard(
            [e.get_eval_name() for e in evaluator_list],
            {e.get_eval_name(): e.is_higher_better() for e in evaluator_list},
        )
        graph_data = dataset[0]  # get_graph_from_dataset(dataset)
        # set up the dataset
        if train_split is not None and val_split is not None:
            # build fresh edge splits from the requested ratios
            dataset = utils.split_edges(dataset, train_split, val_split)
            graph_data = dataset[0]
        else:
            # no ratios given: the dataset must already carry its own splits
            if BACKEND == 'pyg':
                assert all(
                    [
                        hasattr(graph_data, f"{name}")
                        for name in [
                            "train_pos_edge_index",
                            "train_neg_adj_mask",
                            "val_pos_edge_index",
                            "val_neg_edge_index",
                            "test_pos_edge_index",
                            "test_neg_edge_index",
                        ]
                    ]
                ), (
                    "The dataset has no default train/val split! Please manually pass "
                    "train and val ratio."
                )
            elif BACKEND == 'dgl':
                # dgl datasets are sequences of 5 (no test) or 7 graphs
                assert len(graph_data) in [5, 7], (
                    "The dataset has no default train/val split! Please manually pass "
                    "train and val ratio."
                )
            LOGGER.info("Use the default train/val/test ratio in given dataset")
        # feature engineering
        if self.feature_module is not None:
            if BACKEND == 'pyg':
                dataset = self.feature_module.fit_transform(dataset, inplace=inplace)
            else:
                # dgl: transform only the message-passing graph (element 0),
                # then re-attach the untouched split graphs
                _dataset = self.feature_module.fit_transform([g[0] for g in dataset], inplace=inplace)
                dataset = [[_d, *d[1:]] for _d, d in zip(_dataset, dataset)]
        self.dataset = dataset
        # check whether the dataset has features.
        # currently we only support graph classification with features.
        if BACKEND == 'dgl':
            feat = get_graph_node_features(graph_data[0])
        else:
            feat = get_graph_node_features(graph_data)
        assert feat is not None, (
            "Does not support fit on non node-feature dataset!"
            " Please add node features to dataset or specify feature engineers that generate"
            " node features."
        )
        # TODO: how can we get num_features?
        num_features = feat.size(-1)
        # initialize graph networks
        self._init_graph_module(
            self.gml,
            num_features=num_features,
            feval=evaluator_list,
            device=self.runtime_device,
            loss="binary_cross_entropy_with_logits"
            if not hasattr(dataset, "loss")
            else dataset.loss,
        )
        # train the models and tune hpo
        result_valid = []
        names = []
        for idx, model in enumerate(self.graph_model_list):
            # share the remaining time budget evenly among untrained models
            time_for_each_model = (time_limit - time.time() + time_begin) / (
                len(self.graph_model_list) - idx
            )
            if self.hpo_module is None:
                model.initialize()
                model.train(self._compose_dataset(self.dataset), True)
                optimized = model
            else:
                optimized, _ = self.hpo_module.optimize(
                    trainer=model, dataset=self._compose_dataset(self.dataset), time_limit=time_for_each_model
                )
            # to save memory, all the trainer derived will be mapped to cpu
            optimized.to(torch.device("cpu"))
            name = str(optimized) + "_idx%d" % (idx)
            names.append(name)
            performance_on_valid, _ = optimized.get_valid_score(return_major=False)
            # keep the validation probabilities for ensemble fitting below
            result_valid.append(
                self._to_prob(optimized.get_valid_predict_proba().cpu().numpy())
            )
            self.leaderboard.insert_model_performance(
                name,
                dict(
                    zip(
                        [e.get_eval_name() for e in evaluator_list],
                        performance_on_valid,
                    )
                ),
            )
            self.trained_models[name] = optimized
        # fit the ensemble model
        if self.ensemble_module is not None:
            if BACKEND == 'pyg':
                pos_edge_index, neg_edge_index = (
                    self.dataset[0].val_pos_edge_index,
                    self.dataset[0].val_neg_edge_index,
                )
            elif BACKEND == 'dgl':
                # dgl layout: index 3 = val positives, index 4 = val negatives
                pos_edge_index, neg_edge_index = torch.stack(self.dataset[0][3].edges()), torch.stack(self.dataset[0][4].edges())
            # labels: 1 for the positive edges (listed first), 0 for negatives
            E = pos_edge_index.size(1) + neg_edge_index.size(1)
            link_labels = torch.zeros(E, dtype=torch.float)
            link_labels[: pos_edge_index.size(1)] = 1.0
            performance = self.ensemble_module.fit(
                result_valid,
                link_labels.detach().cpu().numpy(),
                names,
                evaluator_list,
                n_classes=2
            )
            self.leaderboard.insert_model_performance(
                "ensemble",
                dict(zip([e.get_eval_name() for e in evaluator_list], performance)),
            )
        return self
[docs] def fit_predict(
self,
dataset,
time_limit=-1,
inplace=False,
train_split=None,
val_split=None,
evaluation_method="infer",
use_ensemble=True,
use_best=True,
name=None,
) -> np.ndarray:
"""
Fit current solver on given dataset and return the predicted value.
Parameters
----------
dataset: torch_geometric.data.dataset.Dataset
The dataset needed to fit on. This dataset must have only one graph.
time_limit: int
The time limit of the whole fit process (in seconds).
If set below 0, will ignore time limit. Default ``-1``.
inplace: bool
Whether we process the given dataset in inplace manner. Default ``False``.
Set it to True if you want to save memory by modifying the given dataset directly.
train_split: float or int (Optional)
The train ratio (in ``float``) or number (in ``int``) of dataset. If you want to
use default train/val/test split in dataset, please set this to ``None``.
Default ``None``.
val_split: float or int (Optional)
The validation ratio (in ``float``) or number (in ``int``) of dataset. If you want
to use default train/val/test split in dataset, please set this to ``None``.
Default ``None``.
balanced: bool
Wether to create the train/valid/test split in a balanced way.
If set to ``True``, the train/valid will have the same number of different classes.
Default ``False``.
evaluation_method: (list of) str or autogl.module.train.evaluation
A (list of) evaluation method for current solver. If ``infer``, will automatically
determine. Default ``infer``.
use_ensemble: bool
Whether to use ensemble to do the predict. Default ``True``.
use_best: bool
Whether to use the best single model to do the predict. Will only be effective when
``use_ensemble`` is ``False``.
Default ``True``.
name: str or None
The name of model used to predict. Will only be effective when ``use_ensemble`` and
``use_best`` both are ``False``.
Default ``None``.
Returns
-------
result: np.ndarray
An array of shape ``(N,)``, where ``N`` is the number of test nodes. The prediction
on given dataset.
"""
self.fit(
dataset=dataset,
time_limit=time_limit,
inplace=inplace,
train_split=train_split,
val_split=val_split,
evaluation_method=evaluation_method,
)
return self.predict(
dataset=dataset,
inplaced=inplace,
inplace=inplace,
use_ensemble=use_ensemble,
use_best=use_best,
name=name,
)
    def predict_proba(
        self,
        dataset=None,
        inplaced=False,
        inplace=False,
        use_ensemble=True,
        use_best=True,
        name=None,
        mask="test",
    ) -> np.ndarray:
        """
        Predict the probability that the edges in the given split exist.
        Parameters
        ----------
        dataset: torch_geometric.data.dataset.Dataset or None
            The dataset needed to predict. If ``None``, will use the processed dataset passed
            to ``fit()`` instead. Default ``None``.
        inplaced: bool
            Whether the given dataset is processed. Only be effective when ``dataset``
            is not ``None``. If you pass the dataset to ``fit()`` with ``inplace=True``, and
            you pass the dataset again to this method, you should set this argument to ``True``.
            Otherwise ``False``. Default ``False``.
        inplace: bool
            Whether we process the given dataset in inplace manner. Default ``False``. Set it to
            True if you want to save memory by modifying the given dataset directly.
        use_ensemble: bool
            Whether to use ensemble to do the predict. Default ``True``.
        use_best: bool
            Whether to use the best single model to do the predict. Will only be effective when
            ``use_ensemble`` is ``False``. Default ``True``.
        name: str or None
            The name of model used to predict. Will only be effective when ``use_ensemble`` and
            ``use_best`` both are ``False``. Default ``None``.
        mask: str
            The data split to give prediction on. Default ``test``.
        Returns
        -------
        result: np.ndarray
            An array of shape ``(N,)`` with the positive-edge probability for each
            candidate edge of the chosen split.
        """
        if dataset is None:
            # reuse the dataset remembered by fit()
            dataset = self.dataset
            assert dataset is not None, (
                "Please execute fit() first before" " predicting on remembered dataset"
            )
        elif not inplaced and self.feature_module is not None:
            # a raw dataset was given: run the feature pipeline first
            if BACKEND == 'pyg':
                dataset = self.feature_module.transform(dataset, inplace=inplace)
            elif BACKEND == 'dgl':
                import dgl
                transformed = self.feature_module.transform([d[0] for d in dataset], inplace=inplace)
                # rebuild the per-graph split layout; train/val slots are unused
                # at predict time, hence None placeholders.
                # NOTE(review): the ``len(d) == 3`` case assumes a
                # [graph, test_pos, test_neg] input layout — confirm against callers.
                dataset = [[tran, None, None, None, None, d[1], d[2] if len(d) == 3 else dgl.DGLGraph()] for tran, d in zip(transformed, dataset)]
        if use_ensemble:
            LOGGER.info("Ensemble argument on, will try using ensemble model.")
        if not use_ensemble and use_best:
            LOGGER.info(
                "Ensemble argument off and best argument on, will try using best model."
            )
        if (use_ensemble and self.ensemble_module is not None) or (
            not use_best and name == "ensemble"
        ):
            # we need to get all the prediction of every model trained
            predict_result = []
            names = []
            for model_name in self.trained_models:
                predict_result.append(
                    self._to_prob(
                        self._predict_proba_by_name(dataset, model_name, mask)
                    )
                )
                names.append(model_name)
            # column 1 of the ensembled matrix is the positive-class probability
            return self.ensemble_module.ensemble(predict_result, names)[:, 1]
        if use_ensemble and self.ensemble_module is None:
            LOGGER.warning(
                "Cannot use ensemble because no ensebmle module is given. "
                "Will use best model instead."
            )
        if use_best or (use_ensemble and self.ensemble_module is None):
            # just return the best model we have found
            name = self.leaderboard.get_best_model()
            return self._predict_proba_by_name(dataset, name, mask)
        if name is not None:
            # return model performance by name
            return self._predict_proba_by_name(dataset, name, mask)
        LOGGER.error(
            "No model name is given while ensemble and best arguments are off."
        )
        raise ValueError(
            "You need to specify a model name if you do not want use ensemble and best model."
        )
def _predict_proba_by_name(self, dataset, name, mask="test"):
self.trained_models[name].to(self.runtime_device)
predicted = (
self.trained_models[name].predict_proba(self._compose_dataset(dataset), mask=mask).cpu().numpy()
)
self.trained_models[name].to(torch.device("cpu"))
return predicted
[docs] def predict(
self,
dataset=None,
inplaced=False,
inplace=False,
use_ensemble=True,
use_best=True,
name=None,
mask="test",
threshold=0.5,
) -> np.ndarray:
"""
Predict the node class number.
Parameters
----------
dataset: torch_geometric.data.dataset.Dataset or None
The dataset needed to predict. If ``None``, will use the processed dataset passed
to ``fit()`` instead. Default ``None``.
inplaced: bool
Whether the given dataset is processed. Only be effective when ``dataset``
is not ``None``. If you pass the dataset to ``fit()`` with ``inplace=True``,
and you pass the dataset again to this method, you should set this argument
to ``True``. Otherwise ``False``. Default ``False``.
inplace: bool
Whether we process the given dataset in inplace manner. Default ``False``.
Set it to True if you want to save memory by modifying the given dataset directly.
use_ensemble: bool
Whether to use ensemble to do the predict. Default ``True``.
use_best: bool
Whether to use the best single model to do the predict. Will only be effective
when ``use_ensemble`` is ``False``. Default ``True``.
name: str or None
The name of model used to predict. Will only be effective when ``use_ensemble``
and ``use_best`` both are ``False``. Default ``None``.
mask: str
The data split to give prediction on. Default ``test``.
threshold: float
The threshold to judge whether the edges are positive or not.
Returns
-------
result: np.ndarray
An array of shape ``(N,)``, where ``N`` is the number of test nodes.
The prediction on given dataset.
"""
proba = self.predict_proba(
dataset, inplaced, inplace, use_ensemble, use_best, name, mask
)
return (proba > threshold).astype("int")
    def evaluate(self, dataset=None,
                 inplaced=False,
                 inplace=False,
                 use_ensemble=True,
                 use_best=True,
                 name=None,
                 mask="test",
                 label=None,
                 metric="auc"
                 ):
        """
        Evaluate the given dataset.
        Parameters
        ----------
        dataset: torch_geometric.data.dataset.Dataset or None
            The dataset needed to predict. If ``None``, will use the processed dataset passed
            to ``fit()`` instead. Default ``None``.
        inplaced: bool
            Whether the given dataset is processed. Only be effective when ``dataset``
            is not ``None``. If you pass the dataset to ``fit()`` with ``inplace=True``, and
            you pass the dataset again to this method, you should set this argument to ``True``.
            Otherwise ``False``. Default ``False``.
        inplace: bool
            Whether we process the given dataset in inplace manner. Default ``False``. Set it to
            True if you want to save memory by modifying the given dataset directly.
        use_ensemble: bool
            Whether to use ensemble to do the predict. Default ``True``.
        use_best: bool
            Whether to use the best single model to do the predict. Will only be effective when
            ``use_ensemble`` is ``False``. Default ``True``.
        name: str or None
            The name of model used to predict. Will only be effective when ``use_ensemble`` and
            ``use_best`` both are ``False``. Default ``None``.
        mask: str
            The data split to give prediction on. Default ``test``.
        label: torch.Tensor (Optional)
            The ground truth label of the given predicted dataset split. If not passed, will
            extract labels from the input dataset.
        metric: str
            The metric to be used for evaluating the model. Default ``auc``.
        Returns
        -------
        score(s): (list of) evaluation scores
            the evaluation results according to the evaluator passed.
        """
        if dataset is None:
            # reuse the dataset remembered by fit()
            dataset = self.dataset
            assert dataset is not None, (
                "Please execute fit() first before" " predicting on remembered dataset"
            )
        elif not inplaced and self.feature_module is not None:
            # a raw dataset was given: run the feature pipeline first
            if BACKEND == 'pyg':
                dataset = self.feature_module.transform(dataset, inplace=inplace)
            elif BACKEND == 'dgl':
                import dgl
                transformed = self.feature_module.transform([d[0] for d in dataset], inplace=inplace)
                # rebuild the per-graph split layout (same convention as predict_proba)
                dataset = [[tran, None, None, None, None, d[1], d[2] if len(d) == 3 else dgl.DGLGraph()] for tran, d in zip(transformed, dataset)]
        graph = dataset[0]
        # dgl split layout: positions of the positive/negative graphs per split
        mask2posid_dgl = {"train": 1, "val": 3, "test": 5}
        mask2negid_dgl = {"train": 2, "val": 4, "test": 6}
        if BACKEND == 'pyg' and not hasattr(graph, f"{mask}_neg_edge_index"):
            from torch_geometric.utils import negative_sampling
            # NOTE(review): logging.warn is a deprecated alias of logging.warning
            logging.warn(
                "No negative edges passed, will generate random negative edges instead."
                " However, results may be inconsistent across different run."
                " Fix negative edges before passing the dataset is recommended"
            )
            setattr(graph, f"{mask}_neg_edge_index", negative_sampling(
                getattr(graph, f"{mask}_pos_edge_index"), graph.num_nodes
            ))
        elif BACKEND == 'dgl':
            neg_graph = graph[{"train": 2, "val": 4, "test": 6}[mask]]
            if neg_graph is None or len(neg_graph.edges()[0]) == 0:
                logging.warn(
                    "No negative edges passed, will generate random negative edges instead."
                    " However, results may be inconsistent across different run."
                    " Fix negative edges before passing the dataset is recommended"
                )
                # sample negatives from the train graph against the split's positives
                neg_edges = _negative_sample_dgl(graph[0], graph[{"train": 1, "val": 3, "test": 5}[mask]])
                graph[{"train": 2, "val": 4, "test": 6}[mask]] = neg_edges
        predicted = self.predict_proba(dataset, True, True, use_ensemble, use_best, name, mask)
        if label is None:
            # derive labels from the split graphs: positives first, then negatives
            if BACKEND == 'pyg':
                pos_edge_index, neg_edge_index = (
                    getattr(dataset[0], f"{mask}_pos_edge_index"),
                    getattr(dataset[0], f"{mask}_neg_edge_index"),
                )
            elif BACKEND == 'dgl':
                pos_edge_index, neg_edge_index = (
                    torch.stack(self.dataset[0][mask2posid_dgl[mask]].edges()),
                    torch.stack(self.dataset[0][mask2negid_dgl[mask]].edges())
                )
            E = pos_edge_index.size(1) + neg_edge_index.size(1)
            label = torch.zeros(E, dtype=torch.float)
            label[: pos_edge_index.size(1)] = 1.0
            label = label.cpu().numpy()
        evaluator = get_feval(metric)
        # a list of metrics yields a list of scores, a single metric a scalar
        if isinstance(evaluator, Sequence):
            return [evals.evaluate(predicted, label) for evals in evaluator]
        return evaluator.evaluate(predicted, label)
[docs] @classmethod
def from_config(cls, path_or_dict, filetype="auto") -> "AutoLinkPredictor":
"""
Load solver from config file.
You can use this function to directly load a solver from predefined config dict
or config file path. Currently, only support file type of ``json`` or ``yaml``,
if you pass a path.
Parameters
----------
path_or_dict: str or dict
The path to the config file or the config dictionary object
filetype: str
The filetype the given file if the path is specified. Currently only support
``json`` or ``yaml``. You can set to ``auto`` to automatically detect the file
type (from file name). Default ``auto``.
Returns
-------
solver: autogl.solver.AutoGraphClassifier
The solver that is created from given file or dictionary.
"""
assert filetype in ["auto", "yaml", "json"], (
"currently only support yaml file or json file type, but get type "
+ filetype
)
if isinstance(path_or_dict, str):
if filetype == "auto":
if path_or_dict.endswith(".yaml") or path_or_dict.endswith(".yml"):
filetype = "yaml"
elif path_or_dict.endswith(".json"):
filetype = "json"
else:
LOGGER.error(
"cannot parse the type of the given file name, "
"please manually set the file type"
)
raise ValueError(
"cannot parse the type of the given file name, "
"please manually set the file type"
)
if filetype == "yaml":
path_or_dict = yaml.load(
open(path_or_dict, "r").read(), Loader=yaml.FullLoader
)
else:
path_or_dict = json.load(open(path_or_dict, "r"))
path_or_dict = deepcopy(path_or_dict)
solver = cls(None, [], None, None)
fe_list = path_or_dict.pop("feature", None)
if fe_list is not None:
fe_list_ele = []
for feature_engineer in fe_list:
name = feature_engineer.pop("name")
if name is not None:
fe_list_ele.append(FEATURE_DICT[name](**feature_engineer))
if fe_list_ele != []:
solver.set_feature_module(fe_list_ele)
models = path_or_dict.pop("models", [{"name": "gcn"}, {"name": "gat"}, {"name": "sage"}, {"name": "gin"}])
# models should be a list of model
# with each element in two cases
# * a dict describing a certain model
# * a dict containing {"encoder": encoder, "decoder": decoder}
model_hp_space = [
_parse_model_hp(model) for model in models
]
model_list = [
_initialize_single_model(model) for model in models
]
trainer = path_or_dict.pop("trainer", None)
default_trainer = "LinkPredictionFull"
trainer_space = None
if isinstance(trainer, dict):
# global default
default_trainer = trainer.pop("name", "LinkPredictionFull")
trainer_space = _parse_hp_space(trainer.pop("hp_space", None))
default_kwargs = {"num_features": None}
default_kwargs.update(trainer)
default_kwargs["init"] = False
for i in range(len(model_list)):
model = model_list[i]
trainer_wrap = TRAINER_DICT[default_trainer](
model=model, **default_kwargs
)
model_list[i] = trainer_wrap
elif isinstance(trainer, list):
# sequential trainer definition
assert len(trainer) == len(
model_list
), "The number of trainer and model does not match"
trainer_space = []
for i in range(len(model_list)):
train, model = trainer[i], model_list[i]
default_trainer = train.pop("name", "LinkPredictionFull")
trainer_space.append(_parse_hp_space(train.pop("hp_space", None)))
default_kwargs = {"num_features": None}
default_kwargs.update(train)
default_kwargs["init"] = False
trainer_wrap = TRAINER_DICT[default_trainer](
model=model, **default_kwargs
)
model_list[i] = trainer_wrap
solver.set_graph_models(
model_list, default_trainer, trainer_space, model_hp_space
)
hpo_dict = path_or_dict.pop("hpo", {"name": "anneal"})
if hpo_dict is not None:
name = hpo_dict.pop("name")
solver.set_hpo_module(name, **hpo_dict)
ensemble_dict = path_or_dict.pop("ensemble", {"name": "voting"})
if ensemble_dict is not None:
name = ensemble_dict.pop("name")
solver.set_ensemble_module(name, **ensemble_dict)
return solver