"""
HPO Module for tuning hyper parameters
"""
import random
import time
import numpy as np
from tqdm import trange
# from ..feature import NetLSD as SgNetLSD
from . import register_hpo
from .autone_file import utils
from .base import BaseHPOptimizer, TimeTooLimitedError
# _isdgl = DependentBackend.is_dgl()
# Backend switch: DGL support is currently disabled, so the PyTorch
# Geometric path below is always taken.
_isdgl = False
if _isdgl:
    import dgl
else:
    from torch_geometric.data import GraphSAINTRandomWalkSampler, InMemoryDataset

    class _MyDataset(InMemoryDataset):
        """Wrap a list of PyG graphs as a standalone in-memory dataset."""

        def __init__(self, datalist) -> None:
            super().__init__()
            # collate() packs the graph list into one big `data` object
            # plus per-graph `slices` for indexed access.
            self.data, self.slices = self.collate(datalist)
@register_hpo("autone")
class AutoNE(BaseHPOptimizer):
    """
    AutoNE hyper-parameter optimizer.

    Implementation of "AutoNE: Hyperparameter Optimization for Massive
    Network Embedding" (KDD 2019).
    See https://github.com/tadpole/AutoNE for more information.

    Attributes
    ----------
    max_evals : int
        The max rounds of evaluating HPs.
    subgraphs : int
        The number of subgraphs to sample.
    sub_evals : int
        The number of evaluation times on each subgraph.
    sample_batch_size, sample_walk_length : int
        Used for sampling subgraphs; see
        torch_geometric.data.GraphSAINTRandomWalkSampler.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # All knobs are overridable through keyword arguments; the
        # defaults below match the original implementation.
        self.max_evals = kwargs.get("max_evals", 100)
        self.subgraphs = kwargs.get("subgraphs", 2)
        self.sub_evals = kwargs.get("sub_evals", 2)
        self.sample_batch_size = kwargs.get("sample_batch_size", 150)
        self.sample_walk_length = kwargs.get("sample_walk_length", 100)
        self.dataset = kwargs.get("dataset", None)
[docs] def optimize(self):
"""
Optimize the HP by the method within give model and HP space
See .base.BaseHPOptimizer.optimize
"""
if self.dataset is None:
raise ValueError("dataset is None")
dataset = self.dataset
space = self.hp_space
current_space = self._encode_para(space)
def sample_subgraph(whole_data):
data = whole_data.data
loader = GraphSAINTRandomWalkSampler(
data,
batch_size=self.sample_batch_size,
walk_length=self.sample_walk_length,
num_steps=self.subgraphs,
save_dir=whole_data.processed_dir,
)
results = []
for data in loader:
in_dataset = _MyDataset([data])
results.append(in_dataset)
return results
def sample_subgraph_dgl(whole_data):
data = whole_data[0] # dgl data
# find data with different labels
# random walk
start = [
random.randint(0, data.num_nodes - 1) for i in range(self.subgraphs)
]
traces, _ = dgl.sampling.random_walk_with_restart(
data,
start,
length=self.sample_batch_size,
restart_prob=1 / self.sample_walk_length,
)
subgraphs = dgl.node_subgraph(data, [traces[i, :] for i in traces.size(0)])
return subgraphs
func = SgNetLSD()
def get_wne(graph):
graph = func.fit_transform(graph)
# transform = nx.NxGraph.compose(map(lambda x: x(), nx.NX_EXTRACTORS))
# print(type(graph))
# gf = transform.fit_transform(graph).data.gf
gf = graph.data.gf
fin = list(gf[0]) + list(map(lambda x: float(x), gf[1:]))
return fin
start_time = time.time()
# code in AutoNE
sampled_number = self.subgraphs
k = self.sub_evals
s = self.max_evals
X = []
y = []
params = utils.Params()
params.set_space(current_space)
total_t = 0.0
info = []
K = utils.K(len(params.type_))
gp = utils.GaussianProcessRegressor(K)
if _isdgl:
sample_graphs = sample_subgraph_dgl(dataset)
else:
sample_graphs = sample_subgraph(dataset)
print("Sample Phase:\n")
for t in trange(sampled_number):
b_t = time.time()
i = t
subgraph = sample_graphs[t]
wne = get_wne(subgraph)
for v in range(k):
kargs = params.random_x()
para = params.x2dict(kargs)
externel_para, trial_para = self._decode_para(para)
perf = self.f(externel_para, subgraph)
X_reg = params.dict2x(trial_para)
X.append(np.hstack((X_reg, wne)))
y.append(perf)
best_perf = None
best_hp = None
wne = get_wne(dataset)
print("HPO Search Phase:\n")
for t in trange(s):
if time.time() - start_time > self.time_limit:
self.logger.info("Time out of limit, Epoch: {}".format(str(i)))
break
b_t = time.time()
gp.fit(np.vstack(X), y)
X_temp, _ = gp.predict(params.get_bound(), params.get_type(), wne)
X_temp = X_temp[: len(params.type_)]
para = params.x2dict(X_temp)
externel_para, trial_para = self._decode_para(para)
perf_temp = self.f(externel_para, dataset)
self.trials.append(
self._creat_a_trail(
"HPO", "Autone", "Completed", externel_para, perf_temp
)
)
# self._print_info(externel_para, perf_temp)
X_reg = params.dict2x(trial_para)
X.append(np.hstack((X_reg, wne)))
y.append(perf_temp)
if not best_perf or perf_temp < best_perf:
best_perf = perf_temp
best_hp = externel_para
e_t = time.time()
total_t += e_t - b_t
if not best_perf:
raise TimeTooLimitedError(
"Given time is too limited to finish one round in HPO."
)
# self.logger.info("Best Parameter:")
# self._print_info(best_hp, best_perf)
return best_hp, best_perf