"""
HPO Module for tuning hyper parameters
"""
import time
import random

import numpy as np
from tqdm import trange

from . import register_hpo
from .base import BaseHPOptimizer, TimeTooLimitedError
from .autone_file import utils
from ..feature import NetLSD as SgNetLSD
from autogl.backend import DependentBackend

_isdgl = DependentBackend.is_dgl()

if _isdgl:
    import dgl
else:
    from torch_geometric.data import InMemoryDataset
    from torch_geometric.data import GraphSAINTRandomWalkSampler

    class _MyDataset(InMemoryDataset):
        """An in-memory dataset wrapping a list of sampled subgraphs."""

        def __init__(self, datalist) -> None:
            super().__init__()
            self.data, self.slices = self.collate(datalist)
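    # Sketch of intended use (``sampled_data`` is a hypothetical
    # torch_geometric.data.Data object produced by a sampler, as below):
    #     sub_dataset = _MyDataset([sampled_data])
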
[docs]@register_hpo("autone")
class AutoNE(BaseHPOptimizer):
"""
AutoNE HPOptimizer
The Implementation of "AutoNE: Hyperparameter Optimization for Massive Network Embedding"(KDD 2019).
See https://github.com/tadpole/AutoNE for more information
Attributes
----------
max_evals : int
The max rounds of evaluating HPs
subgraphs : int
The number of subgraphs
sub_evals : int
The number of evaluation times on each subgraph
sample_batch_size, sample_walk_length : int
Using for sampling subgraph, see torch_geometric.data.GraphSAINRandomWalkSampler
"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.max_evals = kwargs.get("max_evals", 100)
        self.subgraphs = kwargs.get("subgraphs", 2)
        self.sub_evals = kwargs.get("sub_evals", 2)
        self.sample_batch_size = kwargs.get("sample_batch_size", 150)
        self.sample_walk_length = kwargs.get("sample_walk_length", 100)
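
    # Minimal usage sketch (``trainer`` and ``dataset`` are hypothetical objects
    # built by the surrounding AutoGL solver, not constructed here):
    #     hpo = AutoNE(max_evals=20, subgraphs=4, sub_evals=2)
    #     best_trainer, best_hp = hpo.optimize(trainer, dataset, time_limit=3600)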

    def optimize(self, trainer, dataset, time_limit=None, memory_limit=None):
        """
        Optimize the hyperparameters of the given trainer within its combined
        hyperparameter space. See .base.BaseHPOptimizer.optimize.
        """
        self.feval_name = trainer.get_feval(return_major=True).get_eval_name()
        self.is_higher_better = trainer.get_feval(return_major=True).is_higher_better()
        space = trainer.combined_hyper_parameter_space()
        # space = (
        #     trainer.hyper_parameter_space + trainer.get_model().hyper_parameter_space
        # )
        current_space = self._encode_para(space)
        def sample_subgraph(whole_data):
            # Sample ``self.subgraphs`` subgraphs with GraphSAINT random walks
            # and wrap each one back into a dataset the trainer can consume.
            data = whole_data.data
            loader = GraphSAINTRandomWalkSampler(
                data,
                batch_size=self.sample_batch_size,
                walk_length=self.sample_walk_length,
                num_steps=self.subgraphs,
                save_dir=whole_data.processed_dir,
            )
            results = []
            for sub_data in loader:
                results.append(_MyDataset([sub_data]))
            return results
        def sample_subgraph_dgl(whole_data):
            data = whole_data[0]  # the underlying dgl graph
            # One random walk with restart per requested subgraph.
            start = [random.randint(0, data.num_nodes() - 1) for _ in range(self.subgraphs)]
            traces, _ = dgl.sampling.random_walk(
                data, start,
                length=self.sample_batch_size,
                restart_prob=1 / self.sample_walk_length,
            )
            # Each trace induces one node subgraph; drop the -1 entries that pad
            # walks terminating early.
            return [
                dgl.node_subgraph(data, traces[i][traces[i] >= 0].unique())
                for i in range(traces.size(0))
            ]
        func = SgNetLSD()

        def get_wne(graph):
            # Extract a whole-graph feature vector via NetLSD.
            graph = func.fit_transform(graph)
            gf = graph.data.gf
            fin = list(gf[0]) + [float(x) for x in gf[1:]]
            return fin
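
        # Illustration: get_wne(g) yields a flat list of floats (the NetLSD
        # signature plus related whole-graph statistics; the exact layout depends
        # on the feature extractor). It serves as the graph's meta-feature and is
        # hstack-ed with each hyperparameter vector below.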
        start_time = time.time()

        def fn(dset, para):
            # Rebuild the trainer under the given hyperparameters, train it, and
            # return it together with a minimized validation score.
            para["dropout"] = float(para["dropout"])
            current_trainer = trainer.duplicate_from_hyper_parameter(para)
            current_trainer.train(dset)
            loss, self.is_higher_better = current_trainer.get_valid_score(dset)
            if self.is_higher_better:
                loss = -loss
            return current_trainer, loss
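
        # Sign convention example: for a higher-is-better metric such as
        # accuracy, fn turns e.g. 0.81 into -0.81, so every later comparison and
        # the Gaussian process can uniformly minimize.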
        # Notation follows the AutoNE reference code: sample ``sampled_number``
        # subgraphs, run ``k`` evaluations on each, then search for ``s`` rounds.
        sampled_number = self.subgraphs
        k = self.sub_evals
        s = self.max_evals
        X = []
        y = []
        params = utils.Params()
        params.set_space(current_space)
        total_t = 0.0
        K = utils.K(len(params.type_))
        gp = utils.GaussianProcessRegressor(K)
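
        # Meta-learning sketch (the AutoNE idea): the GP is fit on rows
        # x = np.hstack((hp_vector, wne)) with targets y = loss, so performance
        # observed on small sampled subgraphs transfers to the full graph, whose
        # own signature is used at query time.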
        if _isdgl:
            sample_graphs = sample_subgraph_dgl(dataset)
        else:
            sample_graphs = sample_subgraph(dataset)

        print("Sample Phase:\n")
        for t in trange(sampled_number):
            subgraph = sample_graphs[t]
            wne = get_wne(subgraph)
            # Evaluate k random hyperparameter settings on each subgraph to
            # collect (HP vector + graph signature, performance) training pairs.
            for v in range(k):
                kargs = params.random_x()
                para = params.x2dict(kargs)
                externel_para, trial_para = self._decode_para(para)
                _, res = fn(subgraph, externel_para)
                X_reg = params.dict2x(trial_para)
                X.append(np.hstack((X_reg, wne)))
                y.append(res)
        best_res = None
        best_trainer = None
        best_para = None
        wne = get_wne(dataset)

        print("HPO Search Phase:\n")
        for t in trange(s):
            if time_limit is not None and time.time() - start_time > time_limit:
                self.logger.info("Time out of limit, Epoch: {}".format(str(t)))
                break
            b_t = time.time()
            # Fit the GP on all trials so far, then query it for the most
            # promising hyperparameter vector given the full graph's signature.
            gp.fit(np.vstack(X), y)
            X_temp, _ = gp.predict(params.get_bound(), params.get_type(), wne)
            X_temp = X_temp[: len(params.type_)]
            para = params.x2dict(X_temp)
            externel_para, trial_para = self._decode_para(para)
            current_trainer, res_temp = fn(dataset, externel_para)
            self._print_info(externel_para, res_temp)
            X_reg = params.dict2x(trial_para)
            X.append(np.hstack((X_reg, wne)))
            y.append(res_temp)
            if best_res is None or res_temp < best_res:
                best_res = res_temp
                best_trainer = current_trainer
                best_para = para
            else:
                del current_trainer
            e_t = time.time()
            total_t += e_t - b_t
        if best_res is None:
            raise TimeTooLimitedError(
                "Given time is too limited to finish one round in HPO."
            )

        decoded_json, _ = self._decode_para(best_para)
        self.logger.info("Best Parameter:")
        self._print_info(decoded_json, best_res)
        return best_trainer, decoded_json

    @classmethod
    def build_hpo_from_args(cls, args):
        """Build a new HPO instance from parsed command-line arguments."""
        raise NotImplementedError(
            "HP Optimizer must implement the build_hpo_from_args method"
        )
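
# A concrete override might look like this sketch (the ``args`` attribute names
# are assumptions, not part of the base API):
#
#     @classmethod
#     def build_hpo_from_args(cls, args):
#         return cls(
#             max_evals=getattr(args, "max_evals", 100),
#             subgraphs=getattr(args, "subgraphs", 2),
#             sub_evals=getattr(args, "sub_evals", 2),
#         )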