import time
import numpy as np
import torch
import typing as _typing
from sklearn import preprocessing
from sklearn.metrics.pairwise import cosine_similarity
import tqdm
import tabulate
import autogl.data.graph
from ._base_feature_engineer import BaseFeatureEngineer
from ._feature_engineer_registry import FeatureEngineerUniversalRegistry
from ._selectors import GBDTFeatureSelector
from ...utils import get_logger
# Module-level logger obtained from the project's ``get_logger`` helper under the
# name "Feature"; it is used in this file for the per-epoch summary table.
LOGGER = get_logger("Feature")
@FeatureEngineerUniversalRegistry.register_feature_engineer("identity")
class IdentityFeature(BaseFeatureEngineer):
    """Feature engineer registered under the name ``"identity"``.

    The class body is a bare ``...``, so nothing is overridden here and all
    behaviour is inherited from ``BaseFeatureEngineer``.
    """
    # NOTE(review): presumably this leaves the data untouched (hence "identity");
    # that depends on the base-class defaults -- confirm against BaseFeatureEngineer.
    ...
@FeatureEngineerUniversalRegistry.register_feature_engineer("OnlyConst".lower())
class OnlyConstFeature(BaseFeatureEngineer):
    """Replace node features by a single constant column of ones.

    Registered under the name ``"onlyconst"``.
    """

    @staticmethod
    def _ones_column(num_rows: int, reference: torch.Tensor) -> torch.Tensor:
        """Return a ``(num_rows, 1)`` tensor of ones placed like ``reference``.

        A floating-point ``reference`` donates both dtype and device.  Any other
        reference (integer indices, labels, ...) only donates its device, so the
        result keeps the default float dtype instead of being cast to integers.
        """
        if reference.is_floating_point():
            return torch.ones((num_rows, 1)).to(reference)
        return torch.ones((num_rows, 1), device=reference.device)

    def _transform(
            self, data: _typing.Union[autogl.data.graph.GeneralStaticGraph, _typing.Any]
    ) -> _typing.Union[autogl.data.graph.GeneralStaticGraph, _typing.Any]:
        """Overwrite the node features of ``data`` with a column of ones, in place.

        Parameters
        ----------
        data
            Either a ``GeneralStaticGraph`` or an object exposing a tensor
            attribute ``x`` or ``edge_index``.

        Returns
        -------
        The same ``data`` object, modified in place.

        Raises
        ------
        ValueError
            If ``data`` matches none of the supported layouts.
        """
        if isinstance(data, autogl.data.graph.GeneralStaticGraph):
            for node_t in data.nodes:
                # Both keys are processed for every node type, so after this
                # loop each node type carries 'feat' and 'x'.
                for candidate_feature_key in ('feat', 'x'):
                    node_data = data.nodes[node_t].data
                    if candidate_feature_key in node_data:
                        # Existing features: keep their row count, dtype and device.
                        existing = node_data[candidate_feature_key]
                        node_data[candidate_feature_key] = torch.ones(
                            (existing.size(0), 1)
                        ).to(existing)
                    elif len(node_data) > 0:
                        # No such key yet: take the row count from the first
                        # stored tensor.  That tensor may be integer typed, so
                        # its dtype is copied only when it is floating point.
                        _ref = node_data[list(node_data)[0]]
                        node_data[candidate_feature_key] = self._ones_column(
                            _ref.size(0), _ref
                        )
                    else:
                        # No node data at all: one row per distinct index that
                        # appears in the edge connections.
                        node_data[candidate_feature_key] = torch.ones(
                            (torch.unique(data.edges.connections).size(0), 1)
                        )
        elif hasattr(data, 'x') and isinstance(data.x, torch.Tensor):
            data.x = torch.ones((data.x.shape[0], 1)).to(data.x)
        elif hasattr(data, 'edge_index') and isinstance(data.edge_index, torch.Tensor):
            # ``.to(edge_index)`` would cast the features to the integer dtype of
            # the index tensor; only the device is taken from it.
            # NOTE(review): nodes that appear in no edge are not counted here.
            data.x = self._ones_column(
                torch.unique(data.edge_index).size(0), data.edge_index
            )
        else:
            raise ValueError("Unsupported provided data")
        return data
def op_sum(x, nbs):
    """Sum the feature rows of every node's neighbours.

    Parameters
    ----------
    x : np.ndarray
        Feature array indexed by node along its first axis.
    nbs : sequence
        ``nbs[u]`` holds the integer row indices (list or array) of the
        neighbours of node ``u``.

    Returns
    -------
    np.ndarray
        Array with the shape and dtype of ``x``; row ``u`` is the sum of the
        neighbour rows, and rows without neighbours stay zero.
    """
    res = np.zeros_like(x)
    for u, nb in enumerate(nbs):
        # The length must be taken before comparing: ``len(nb != 0)`` only
        # worked by accident for arrays and failed for plain lists.
        if len(nb) > 0:
            res[u] = np.sum(x[nb], axis=0)
    return res
def op_mean(x, nbs):
    """Average the feature rows of every node's neighbours.

    Parameters
    ----------
    x : np.ndarray
        Feature array indexed by node along its first axis.
    nbs : sequence
        ``nbs[u]`` holds the integer row indices (list or array) of the
        neighbours of node ``u``.

    Returns
    -------
    np.ndarray
        Array with the shape and dtype of ``x``; row ``u`` is the mean of the
        neighbour rows, and rows without neighbours stay zero.  Because the
        result is stored in ``x``'s dtype, means of integer input are cast
        back to that integer dtype.
    """
    res = np.zeros_like(x)
    for u, nb in enumerate(nbs):
        # The length must be taken before comparing: ``len(nb != 0)`` only
        # worked by accident for arrays and failed for plain lists.
        if len(nb) > 0:
            res[u] = np.mean(x[nb], axis=0)
    return res
def op_max(x, nbs):
    """Take the element-wise maximum over every node's neighbour rows.

    Parameters
    ----------
    x : np.ndarray
        Feature array indexed by node along its first axis.
    nbs : sequence
        ``nbs[u]`` holds the integer row indices (list or array) of the
        neighbours of node ``u``.

    Returns
    -------
    np.ndarray
        Array with the shape and dtype of ``x``; row ``u`` is the per-column
        maximum of the neighbour rows, and rows without neighbours stay zero.
    """
    res = np.zeros_like(x)
    for u, nb in enumerate(nbs):
        # The length must be taken before comparing: ``len(nb != 0)`` only
        # worked by accident for arrays and failed for plain lists.
        if len(nb) > 0:
            res[u] = np.max(x[nb], axis=0)
    return res
def op_min(x, nbs):
    """Take the element-wise minimum over every node's neighbour rows.

    Parameters
    ----------
    x : np.ndarray
        Feature array indexed by node along its first axis.
    nbs : sequence
        ``nbs[u]`` holds the integer row indices (list or array) of the
        neighbours of node ``u``.

    Returns
    -------
    np.ndarray
        Array with the shape and dtype of ``x``; row ``u`` is the per-column
        minimum of the neighbour rows, and rows without neighbours stay zero.
    """
    res = np.zeros_like(x)
    for u, nb in enumerate(nbs):
        # The length must be taken before comparing: ``len(nb != 0)`` only
        # worked by accident for arrays and failed for plain lists.
        if len(nb) > 0:
            res[u] = np.min(x[nb], axis=0)
    return res
def op_prod(x, nbs):
    """Multiply the feature rows of every node's neighbours element-wise.

    Parameters
    ----------
    x : np.ndarray
        Feature array indexed by node along its first axis.
    nbs : sequence
        ``nbs[u]`` holds the integer row indices (list or array) of the
        neighbours of node ``u``.

    Returns
    -------
    np.ndarray
        Array with the shape and dtype of ``x``; row ``u`` is the per-column
        product of the neighbour rows, and rows without neighbours stay zero.
    """
    res = np.zeros_like(x)
    for u, nb in enumerate(nbs):
        # The length must be taken before comparing: ``len(nb != 0)`` only
        # worked by accident for arrays and failed for plain lists.
        if len(nb) > 0:
            res[u] = np.prod(x[nb], axis=0)
    return res
# Shared, module-level scikit-learn scaler instances.
# ``ss`` is the one used by ``scale`` below; ``mms`` is not referenced anywhere
# in the visible part of this file.
mms = preprocessing.MinMaxScaler()
ss = preprocessing.StandardScaler()
def scale(x):
    """Fit the module-level scaler ``ss`` on ``x`` and return ``x`` transformed by it.

    The scaler is re-fitted on every call, so the result only depends on the
    ``x`` passed in, not on earlier calls.
    """
    standardized = ss.fit_transform(x)
    return standardized
class Timer:
    """Soft time-budget tracker for an iterative process.

    Each iteration is bracketed by :meth:`start` and :meth:`end`; the measured
    durations feed a running estimate of the cost of one iteration, and
    :meth:`is_timeout` reports whether the remaining budget is too small for
    another iteration of that estimated cost.

    Parameters
    ----------
    timebudget : float or None
        Total budget in seconds, counted from construction.  ``None`` means
        "no budget": :meth:`is_timeout` then always returns ``False``.
    """

    def __init__(self, timebudget=None):
        self._timebudget = timebudget
        self._esti_time = 0
        # Set by start(); None until the first iteration has been started.
        self._start = None
        # A monotonic clock keeps elapsed-time arithmetic immune to wall-clock
        # adjustments.
        self._g_start = time.monotonic()

    def start(self):
        """Mark the beginning of an iteration."""
        self._start = time.monotonic()

    def end(self):
        """Mark the end of an iteration and update the per-iteration estimate.

        Raises
        ------
        RuntimeError
            If called before :meth:`start`.
        """
        if self._start is None:
            raise RuntimeError("Timer.end() called before Timer.start()")
        time_use = time.monotonic() - self._start
        # Running average of the previous estimate and the latest measurement.
        self._esti_time = (self._esti_time + time_use) / 2

    def is_timeout(self):
        """Return ``True`` when the remaining budget cannot fit another iteration."""
        # Compare against None explicitly: a budget of 0 is a real (already
        # exhausted) budget, not "no budget".
        if self._timebudget is None:
            return False
        remaining = self._timebudget - (time.monotonic() - self._g_start)
        return remaining <= self._esti_time
@FeatureEngineerUniversalRegistry.register_feature_engineer('DeepGL'.lower())
class AutoFeatureEngineer(BaseFeatureEngineer):
    r"""
    Notes
    -----
    An implementation of the automatic feature engineering method DeepGL [#]_, which
    iteratively generates features by aggregating neighbour features and selects a
    fixed number of them, so that important graph-aware features are added automatically.

    References
    ----------
    .. [#] Rossi, R. A., Zhou, R., & Ahmed, N. K. (2020).
        Deep Inductive Graph Representation Learning.
        IEEE Transactions on Knowledge and Data Engineering, 32(3), 438–452.
        https://doi.org/10.1109/TKDE.2018.2878247

    Parameters
    ----------
    fix_length : int
        Fixed number of features kept in every epoch. The final number of features added
        will be ``fix_length`` \times ``max_epoch``, 200 \times 5 by default.
    max_epoch : int
        Number of epochs of the whole process.
    time_budget : int
        Time budget (in seconds) for the feature engineering process, ``None`` for no
        time budget. Note that this is a soft budget, obtained by a rough time estimation
        from previous iterations, so the process may finally exceed the actual budget.
    feature_selector : Callable
        Factory of the feature selector used at each iteration, ``GBDTFeatureSelector``
        by default. It is called as
        ``feature_selector(fix_length, verbose_eval=..., *args, **kwargs)``.
        Note that the original paper uses connected components of the feature graph;
        you may implement that yourself if you want.
    verbosity : int
        Hide any information except error and fatal if ``verbosity`` < 1.
    """

    def __init__(
            self,
            fix_length: int = 200,
            max_epoch: int = 5,
            time_budget: _typing.Optional[int] = None,
            feature_selector=GBDTFeatureSelector,
            verbosity: int = 0,
            *args, **kwargs
    ):
        super(AutoFeatureEngineer, self).__init__()
        # Neighbour aggregation operators applied in every generation step.
        self._ops = [op_sum, op_mean, op_max, op_min]
        # Kept for compatibility; not used anywhere in this class.
        self._sim = cosine_similarity
        self._fixlen = fix_length
        self._max_epoch = max_epoch
        self._timebudget = time_budget
        self._feature_selector = feature_selector(
            fix_length, verbose_eval=verbosity >= 1, *args, **kwargs
        )
        self._verbosity = verbosity

    @staticmethod
    def _locate_features(
            graph: autogl.data.graph.GeneralStaticGraph
    ) -> _typing.Tuple[str, torch.Tensor]:
        """Validate ``graph`` and return ``(feature_key, feature_tensor)``.

        The key ``'x'`` takes precedence over ``'feat'``.

        Raises
        ------
        ValueError
            If the graph is not homogeneous or carries neither feature key.
        """
        if not (graph.nodes.is_homogeneous and graph.edges.is_homogeneous):
            raise ValueError(
                "AutoFeatureEngineer only supports homogeneous static graphs"
            )
        for feature_key in ('x', 'feat'):
            if feature_key in graph.nodes.data:
                return feature_key, graph.nodes.data[feature_key]
        raise ValueError("No node features found under the key 'x' or 'feat'")

    def _collect_neighbours(
            self, graph: autogl.data.graph.GeneralStaticGraph, num_nodes: int
    ) -> None:
        """Cache, for every node ``u``, the targets ``v`` of its edges ``(u, v)``."""
        neighbours = [[] for _ in range(num_nodes)]
        connections = graph.edges.connections.t().detach().cpu().numpy()
        for u, v in connections:
            neighbours[u].append(v)
        # An explicit integer dtype keeps the empty arrays of isolated nodes
        # usable as index arrays.
        self.__neighbours: _typing.Sequence[np.ndarray] = tuple(
            np.asarray(v, dtype=np.int64) for v in neighbours
        )

    def _gen(self, x) -> np.ndarray:
        """Apply every aggregation operator to ``x`` and stack the results column-wise."""
        return np.concatenate(
            [op(x, self.__neighbours) for op in self._ops], axis=1
        )

    def _fit(self, homogeneous_static_graph: autogl.data.graph.GeneralStaticGraph):
        """Generate and select neighbour-aggregated features, epoch by epoch.

        The column selection of every epoch is recorded in ``self._selection``
        for later use by ``_transform``.  The node features of the given graph
        are overwritten with the original features followed by all selected
        ones, and the graph is returned.

        Raises
        ------
        ValueError
            If the graph is not homogeneous or has no ``'x'``/``'feat'`` features.
        """
        _feature_key, _original_features = self._locate_features(
            homogeneous_static_graph
        )
        self._collect_neighbours(
            homogeneous_static_graph, _original_features.size(0)
        )
        # detach()/cpu() make the conversion safe for GPU or autograd tensors.
        x: np.ndarray = _original_features.detach().cpu().numpy()
        gx: np.ndarray = x.copy()
        verbs = []
        soft_timer = Timer(self._timebudget)
        self._selection = []
        for epoch in tqdm.tqdm(range(self._max_epoch), disable=self._verbosity <= 0):
            soft_timer.start()
            verb = [epoch, gx.shape[1]]
            gx = self._gen(gx)
            gx = scale(gx)
            verb.append(gx.shape[1])
            # The selector works on the graph, so the candidate features are
            # written into it before fitting.
            homogeneous_static_graph.nodes.data[_feature_key] = torch.from_numpy(gx)
            self._feature_selector._fit(homogeneous_static_graph)
            self._selection.append(self._feature_selector._selection)
            homogeneous_static_graph = self._feature_selector._transform(
                homogeneous_static_graph
            )
            gx = (
                homogeneous_static_graph.nodes.data[_feature_key]
                .detach().cpu().numpy()
            )
            verb.append(gx.shape[1])
            x = np.concatenate([x, gx], axis=1)
            verbs.append(verb)
            soft_timer.end()
            # Soft budget: stop once another epoch is estimated not to fit.
            if soft_timer.is_timeout():
                break
        if self._verbosity >= 1:
            LOGGER.info(
                tabulate.tabulate(verbs, headers="epoch origin after-gen after-sel".split())
            )
        # NOTE(review): the graph leaves this method with the augmented features;
        # whether ``_transform`` is afterwards applied to this same object
        # depends on BaseFeatureEngineer -- confirm there.
        homogeneous_static_graph.nodes.data[_feature_key] = torch.from_numpy(x)
        return homogeneous_static_graph

    def _transform(self, homogeneous_static_graph: autogl.data.graph.GeneralStaticGraph):
        """Replay the column selections recorded by ``_fit`` on a graph.

        The node features of the given graph are overwritten with the original
        features followed by the selected generated ones, and the graph is returned.

        Raises
        ------
        ValueError
            If the graph is not homogeneous or has no ``'x'``/``'feat'`` features.
        """
        _feature_key, _original_features = self._locate_features(
            homogeneous_static_graph
        )
        # Aggregate over the edges of the graph being transformed rather than
        # over the adjacency cached by the last ``_fit`` call.
        self._collect_neighbours(
            homogeneous_static_graph, _original_features.size(0)
        )
        x: np.ndarray = _original_features.detach().cpu().numpy()
        gx: np.ndarray = x.copy()
        for selection in self._selection:
            # ``scale`` re-fits the scaler on the features generated here.
            gx = scale(self._gen(gx))[:, selection]
            x = np.concatenate([x, gx], axis=1)
        homogeneous_static_graph.nodes.data[_feature_key] = torch.from_numpy(x)
        return homogeneous_static_graph