Compare commits

...

5 Commits

Author SHA1 Message Date
Dongzy
2e5b380d22 Merge branch 'master' into parallel 2018-11-30 22:37:19 +08:00
Dongzy
b48f2e38b7 Merge branch 'master' into parallel 2018-11-22 18:59:53 +08:00
Dongzy
3dc6936a84 Merge branch 'master' into parallel 2018-11-19 08:22:48 +08:00
Dongzy
4c788680ad Merge branch 'master' into parallel 2018-11-18 00:20:22 +08:00
Dongzy
cbf674e3cd parallel walker 2018-11-18 00:19:32 +08:00

View File

@ -6,6 +6,8 @@ alias sampling; walks by multiprocessors; etc.
by Chengbin Hou & Zeyu Dong 2018
"""
import functools
import multiprocessing
import random
import time
@ -26,48 +28,70 @@ class WeightedWalker:
# alias sampling for ABRW-------------------------
def simulate_walks(self, num_walks, walk_length):
global P_G
P_G = self.rec_G
t1 = time.time()
self.preprocess_transition_probs(weighted_G=self.rec_G) # construct alias table; adapted from node2vec
t2 = time.time()
print(f'Time for construct alias table: {(t2-t1):.2f}')
global alias_nodes
alias_nodes = self.alias_nodes
print(f'Time for construct alias table: {(t2-t1):.2f}')
walks = []
nodes = list(self.rec_G.nodes())
pool = multiprocessing.Pool(self.workers)
for walk_iter in range(num_walks):
t1 = time.time()
random.shuffle(nodes)
for node in nodes:
walks.append(self.weighted_walk(weighted_G=self.rec_G, walk_length=walk_length, start_node=node))
walks += pool.map(functools.partial(node2vec_walk, walk_length=walk_length), nodes)
t2 = time.time()
print(f'Walk iteration: {walk_iter+1}/{num_walks}; time cost: {(t2-t1):.2f}')
pool.close()
pool.join()
del alias_nodes, P_G
for i in range(len(walks)): # use ind to retrive original node ID
for j in range(len(walks[0])):
walks[i][j] = self.look_back_list[int(walks[i][j])]
return walks
def weighted_walk(self, weighted_G, walk_length, start_node): # more efficient way instead of copy from node2vec
G = weighted_G
walk = [start_node]
while len(walk) < walk_length:
cur = walk[-1]
cur_nbrs = list(G.neighbors(cur))
if len(cur_nbrs) > 0: # if non-isolated node
walk.append(cur_nbrs[alias_draw(self.alias_nodes[cur][0], self.alias_nodes[cur][1])]) # alias sampling in O(1) time to get the index of
else: # if isolated node # 1) randomly choose a nbr; 2) judge if use nbr or its alias
break
return walk
def preprocess_transition_probs(self, weighted_G):
''' reconstructed G mush be weighted; \n
return a dict of alias table for each node
'''
G = weighted_G
alias_nodes = {} # unlike node2vec, the reconstructed graph is based on transtion matrix
for node in G.nodes(): # no need to normalize again
probs = [G[node][nbr]['weight'] for nbr in G.neighbors(node)] # pick prob of neighbors with non-zero weight --> sum up to 1.0
alias_nodes[node] = alias_setup(probs) # alias table format {node_id: (array1, array2)}
self.alias_nodes = alias_nodes # where array1 gives alias node indexes; array2 gives its prob
alias_nodes = {}
nodes = G.nodes()
pool = multiprocessing.Pool(self.workers)
alias_nodes = dict(zip(nodes, pool.map(get_alias_node, nodes)))
pool.close()
pool.join()
self.alias_nodes = alias_nodes
def node2vec_walk(start_node, walk_length):
    """Worker function: one alias-sampled walk over the module-level graph.

    Reads the globals ``P_G`` (weighted graph) and ``alias_nodes`` (alias
    tables), which the parent process sets before creating the pool, so the
    large objects are not pickled per task. Stops early at a node with no
    neighbors; returns the walk as a list of node indices.
    """
    walk = [start_node]
    for _ in range(walk_length - 1):  # walk grows by one node per step
        current = walk[-1]
        neighbors = list(P_G.neighbors(current))
        if not neighbors:
            break
        alias_idx, probs = alias_nodes[current]
        walk.append(neighbors[alias_draw(alias_idx, probs)])
    return walk
def get_alias_node(node):
    """Worker function: build the alias table for one node.

    Collects the weights of ``node``'s outgoing edges from the module-level
    graph ``P_G`` (set by the parent process) and feeds them to
    ``alias_setup``. Returns the ``(alias_indices, probabilities)`` pair.
    """
    weights = []
    for neighbor in P_G.neighbors(node):
        weights.append(P_G[node][neighbor]['weight'])
    return alias_setup(weights)
def deepwalk_walk_wrapper(class_instance, walk_length, start_node):