# nostrdvm/nostr_dvm/utils/wot_utils.py
# 2025-02-01 19:35:39 +01:00
#
# 640 lines
# 20 KiB
# Python

import asyncio
import datetime
# General
import json
import random
import time
from itertools import islice
import networkx as nx
import nostr_sdk
import numpy as np
from nostr_sdk import Options, Keys, NostrSigner, ClientBuilder, Kind, PublicKey, Filter
from scipy.sparse import lil_matrix, isspmatrix_csr
from nostr_dvm.utils.definitions import relay_timeout
from nostr_dvm.utils.dvmconfig import DVMConfig
from nostr_dvm.utils.nostr_utils import check_and_set_private_key
async def get_following(pks, max_time_request=10, newer_than_time=None, dvm_config=DVMConfig()):
    '''
    Fetch the kind 3 events authored by pks and turn them into a follow graph.

    INPUTS
    ------
    pks: str or iterable of str
        one pubkey or several pubkeys; event authors are matched against
        these strings in hex form
    max_time_request: int, optional
        not used by this function; the fetch uses the module-level relay_timeout
    newer_than_time: number, optional
        unix time in seconds; when given, only events since that time are requested
    dvm_config: DVMConfig, optional
        its SYNC_DB_RELAY_LIST gives the relays that are queried

    OUTPUT
    ------
    following: a networkx directed graph
        > every requested pk is a node; an edge (author, pk) is added for each
          pubkey tagged in the author's kind 3 event (self-follows are dropped)
        > an author whose event was used carries the attribute 'timestamp'
          (created_at of that event, in seconds)

    NOTE: do not get_following of more than 1000 pks;
    instead, divide the request into batches of smaller size (500 recommended)
    '''
    # handling the case of a single key passed
    if isinstance(pks, str):
        pks = [pks]
    else:
        # pks is walked twice below (filter + graph nodes), so materialize it once
        pks = list(pks)

    # transforming into PubKey object type
    list_pk = [nostr_sdk.PublicKey.parse(pk) for pk in pks]

    # newer_than_time provided? If so, it only fetch events that are newer
    kind3_filter = Filter().authors(list_pk).kind(Kind(3))
    if newer_than_time is not None:
        ts = nostr_sdk.Timestamp.from_secs(round(newer_than_time))
        kind3_filter = kind3_filter.since(ts)

    # fetching events
    keys = Keys.parse(check_and_set_private_key("test_client"))
    cli = ClientBuilder().signer(NostrSigner.keys(keys)).build()
    try:
        for relay in dvm_config.SYNC_DB_RELAY_LIST:
            await cli.add_relay(relay)

        await cli.connect()

        events = (await cli.fetch_events(kind3_filter, relay_timeout)).to_vec()

        for relay in dvm_config.SYNC_DB_RELAY_LIST:
            await cli.force_remove_relay(relay)
    finally:
        # release the client even when one of the relay calls above raised
        await cli.shutdown()

    # initializing the graph structure
    following = nx.DiGraph()
    following.add_nodes_from(pks)

    for event in events:
        author = event.author().to_hex()

        # events are returned based on the timestamp,
        # the first event for each pubkey is the most recent = the one we should use
        if event.verify() and author in following.nodes() and 'timestamp' not in following.nodes[author]:
            # converting to hex and removing self-following
            followed = [pk.to_hex() for pk in event.tags().public_keys()]
            followed = [pk for pk in followed if pk != author]

            # updating the nodes and edges
            following.update(edges=[(author, pk) for pk in followed], nodes=followed)

            # updating the timestamp
            following.nodes[author]['timestamp'] = event.created_at().as_secs()

    return following
async def build_wot_network(seed_pks, depth=2, max_batch=500, max_time_request=10, dvm_config=DVMConfig()):
    '''
    Build the follow network reachable from seed_pks.

    INPUTS
    ------
    seed_pks: str or list of str
        the pubkey(s) the crawl starts from
    depth: int, optional
        how many rounds of kind 3 fetching are performed; must be >= 1
    max_batch: int, optional
        maximum number of pubkeys requested at once
    max_time_request: int, optional
        forwarded unchanged to _build_network_from
    dvm_config: DVMConfig, optional
        forwarded unchanged to _build_network_from

    OUTPUTS
    -------
    (index_map, network_graph) as produced by _build_network_from,
    or None (after printing an error) when seed_pks is empty or depth < 1.
    '''
    if not seed_pks:
        print('Error: seed_pks cannot be empty')
        return

    if depth < 1:
        print('Error: depth cannot be lower than 1')
        return

    # handling the case of a single key being passed
    if isinstance(seed_pks, str):
        seed_pks = [seed_pks]

    print('WOT: fetching kind 3 events from relays & pre-processing to build WOT')

    tic = time.time()

    # initialize the index_map; repeated seeds are dropped (first occurrence wins)
    # so that the indices are exactly 0..len-1. New indices are later handed out
    # starting at len(index_map), which would collide with a seed index otherwise.
    index_map = {pk: i for i, pk in enumerate(dict.fromkeys(seed_pks))}

    # initialize the graph
    seed_graph = nx.DiGraph()
    seed_graph.add_nodes_from(index_map.values())

    # build the network internal function
    index_map, network_graph = await _build_network_from(index_map, seed_graph, set(), depth, max_batch,
                                                         max_time_request, dvm_config=dvm_config)

    toc = time.time()

    print('\nFinished in ' + str(toc - tic))

    return index_map, network_graph
async def _build_network_from(index_map, network_graph, visited_pk=None, depth=2, max_batch=500, max_time_request=10, dvm_config=DVMConfig()):
    '''
    Extend network_graph with the followings of every pubkey in index_map
    that has not been visited yet, then recurse until depth reaches 1.

    INPUTS
    ------
    index_map: dict {pk: index}
    network_graph: networkx directed graph whose nodes are indices
    visited_pk: set, optional
        pubkeys already requested; it is updated in place
    depth: int, optional
        remaining number of rounds
    max_batch: int, optional
        maximum number of pubkeys per get_following call
    max_time_request, dvm_config:
        forwarded to get_following

    OUTPUTS:
    1. index_map = {pk0 : 0, pk1 : 1, ... }
        > an index that translates pub keys in hex format into nodes in the graph
    2. network_graph; a networkx directed graph about who follows who
        > each node has associated the timestamp of the latest retrivable kind3 event
        > if the node is a leaf or doesn't follow anyone, it has no attribute 'timestamp'
    '''
    # pks to be visited next, splitted in batches
    if visited_pk is None:
        visited_pk = set()
    to_visit_pk = split_set(index_map.keys() - visited_pk, max_batch)

    for pks in to_visit_pk:
        # getting the followings as a graph
        following = await get_following(pks, max_time_request, None, dvm_config)

        # update the visited_pk
        visited_pk.update(pks)

        # add the new pub_keys to the index_map
        index_map = _extend_index_map(index_map, following)

        # re-lable nodes using the index_map to use storage more efficiently
        nx.relabel_nodes(following, index_map, copy=False)

        # extend the network graph
        network_graph.update(following)

    if depth == 1:
        return index_map, network_graph

    # recursive call; a failing deeper level is reported and the network built
    # so far is returned. Only Exception is caught so that task cancellation
    # and KeyboardInterrupt still propagate to the caller.
    try:
        index_map, network_graph = await _build_network_from(index_map, network_graph, visited_pk, depth - 1,
                                                             max_batch, max_time_request, dvm_config=dvm_config)
    except Exception as e:
        print(e)

    print('current network: ' + str(len(network_graph.nodes())) + ' npubs', end='\r')

    return index_map, network_graph
def _extend_index_map(index_map, following):
    '''
    Give an index to every node of following that index_map does not know yet.

    index_map = { pk0: 0, pk1:1, ...}

    The dictionary is modified in place and also returned. New indices are
    handed out consecutively, starting from len(index_map).
    '''
    # pubkeys that appear in the graph but have no index so far
    unseen = following.nodes() - index_map.keys()

    # first free index
    next_index = len(index_map)

    for pk in unseen:
        index_map[pk] = next_index
        next_index += 1

    return index_map
def split_set(my_set, max_batch):
    '''
    Split my_set into a list of sets holding at most max_batch elements each.

    An empty input gives an empty list.
    '''
    members = list(my_set)
    batches = []
    for start in range(0, len(my_set), max_batch):
        stop = start + max_batch
        batches.append(set(members[start:stop]))
    return batches
def save_network(index_map, network_graph, name=None):
    '''
    Save index_map and network_graph as two JSON files.

    The files are 'index_map_<name>.json' and 'network_graph_<name>.json',
    opened with relative paths. The graph is stored in networkx node-link format.

    name: optional
        suffix of the file names; any value is converted with str(), as
        load_network does. When omitted, the current unix time is used
        to avoid replacing an existing file.
    '''
    if name is None:
        # adding unix time to file name to avoid replacing an existing file
        name = round(time.time())
    name = str(name)

    index_filename = 'index_map_' + name + '.json'
    graph_filename = 'network_graph_' + name + '.json'

    # saving the index_map as a json file
    with open(index_filename, 'w') as f:
        json.dump(index_map, f, indent=4)

    # Convert to node-link format suitable for JSON
    data = nx.node_link_data(network_graph)

    # saving the network_graph as a json file
    with open(graph_filename, 'w') as f:
        json.dump(data, f)

    print(' > ' + index_filename)
    print(' > ' + graph_filename)
    return
def load_network(name):
    '''
    Load the index_map and the network_graph from the two JSON files
    'index_map_<name>.json' and 'network_graph_<name>.json'.

    name is converted with str() before being put into the file names.
    The graph file is expected to be in networkx node-link format.

    Returns (index_map, network_graph).
    '''
    suffix = str(name) + '.json'

    # the index_map is plain JSON
    with open('index_map_' + suffix, 'r') as index_file:
        index_map = json.load(index_file)

    # the graph is rebuilt from its node-link JSON
    with open('network_graph_' + suffix, 'r') as graph_file:
        node_link = json.load(graph_file)
    network_graph = nx.node_link_graph(node_link)

    return index_map, network_graph
def get_mc_pagerank(G, R, nodelist=None, alpha=0.85):
    '''
    Monte-Carlo complete path stopping at dangling nodes

    INPUTS
    ------
    G: graph
        A directed Networkx graph (the walks follow G.successors).
    R: int
        The number of random walks to be performed per node
    nodelist: list, optional
        the list of nodes in G networkx graph.
        It is used to order the nodes in a specified way
    alpha: float, optional
        It is the dampening factor of Pagerank. default value is 0.85

    OUTPUTS
    -------
    walk_visited_count: CSR matrix
        a Compressed Sparse Row (CSR) matrix; element (i,j) is equal to
        the number of times v_j has been visited by a random walk started from v_i
    mc_pagerank: dict
        The dictionary {node: pg} of the pagerank value for each node in G,
        i.e. the node's share of all the visits recorded

    Raises ValueError when the inputs are rejected by _validate_inputs_and_init_mc.

    References
    ----------
    [1] K.Avrachenkov, N. Litvak, D. Nemirovsky, N. Osipova
    "Monte Carlo methods in PageRank computation: When one iteration is sufficient"
    https://www-sop.inria.fr/members/Konstantin.Avratchenkov/pubs/mc.pdf
    '''
    # validate all the inputs and initialize variables
    N, nodelist, inverse_nodelist = _validate_inputs_and_init_mc(G, R, nodelist, alpha)

    # initialize walk_visited_count as a sparse LIL matrix
    walk_visited_count = lil_matrix((N, N), dtype='int')

    # perform R random walks for each node
    for progress_count, node in enumerate(nodelist, start=1):
        # print progress every 200 nodes
        if progress_count % 200 == 0:
            print('progress = {:.2f}%'.format(100 * progress_count / N), end='\r')

        # the row of the starting node is the same for all of its R walks
        node_pos = inverse_nodelist[node]

        for _ in range(R):
            # the starting node counts as visited
            walk_visited_count[node_pos, node_pos] += 1
            current_node = node

            # each further step is taken with probability alpha
            while random.uniform(0, 1) < alpha:
                successors = list(G.successors(current_node))
                if not successors:
                    # dangling node: the walk stops here
                    break

                current_node = random.choice(successors)

                # add current node to the walk_visited_count
                walk_visited_count[node_pos, inverse_nodelist[current_node]] += 1

    # convert lil_matrix to csr_matrix for efficient storage and access
    walk_visited_count = walk_visited_count.tocsr()

    # sum all visits for each node into a numpy array
    total_visited_count = np.array(walk_visited_count.sum(axis=0)).flatten()

    # reciprocal of the number of total visits
    one_over_s = 1 / total_visited_count.sum()

    mc_pagerank = {nodelist[j]: total_visited_count[j] * one_over_s for j in range(N)}

    print('progress = 100% ', end='\r')
    print('\nTotal walks performed: ', N * R)

    return walk_visited_count, mc_pagerank
def _validate_inputs_and_init_mc(G, R, nodelist, alpha):
    '''
    This function validate the inputs and initialize the following variables:

    N: int
        the number of nodes in G Networkx graph
    nodelist : list
        the list of nodes in G Networkx graph (G's own order when None was passed)
    inverse_nodelist : dict
        a dictionary that maps each node in G to its position in nodelist

    Raises ValueError when G is empty, R is not a positive int, alpha is not
    a float strictly between 0 and 1, or nodelist is not an ordering of
    exactly the nodes of G (each node once).
    '''
    N = len(G)
    if N == 0:
        raise ValueError("Graph G is empty")

    if not isinstance(R, int) or R <= 0:
        raise ValueError("R must be a positive integer")

    if not isinstance(alpha, float) or not (0 < alpha < 1):
        raise ValueError("alpha must be a float between 0 and 1")

    if nodelist is None:
        nodelist = list(G.nodes())
    elif len(nodelist) != N or set(nodelist) != set(G.nodes()):
        # the length check rejects repeated entries, which the set comparison
        # alone lets through and which would give a wrong inverse map
        raise ValueError("nodelist does not match the nodes in G")

    # compute the inverse map of nodelist
    inverse_nodelist = {node: position for position, node in enumerate(nodelist)}

    return N, nodelist, inverse_nodelist
def get_subrank(S, G, walk_visited_count, nodelist, alpha=0.85):
    '''
    Subrank algorithm (stopping at dangling nodes);
    it aims to approximate the Pagerank over S subgraph of G

    INPUTS
    ------
    S: graph
        A directed Networkx graph, induced subgraph of G
    G: graph
        A directed Networkx graph
    walk_visited_count: CSR matrix
        a Compressed Sparse Row (CSR) matrix; element (i,j) is equal to
        the number of times v_j has been visited by a random walk started from v_i
    nodelist: list
        the list of nodes in G Networkx graph. It is used to decode walk_visited_count
    alpha: float, optional
        It is the dampening factor of Pagerank. default value is 0.85

    OUTPUTS
    -------
    subrank: dict
        The dictionary {node: pg} of the pagerank value for each node in S

    References
    ----------
    [1] Pippellia,
    "Pagerank on subgraphs—efficient Monte-Carlo estimation"
    https://pippellia.com/pippellia/Social+Graph/Pagerank+on+subgraphs%E2%80%94efficient+Monte-Carlo+estimation
    '''
    # input validation + node sets and inverse index
    N, S_nodes, G_nodes, inverse_nodelist = _validate_inputs_and_init(S, G, walk_visited_count, nodelist, alpha)

    # visits already recorded by the walks that started inside S
    base_count = _get_visited_count_from_S(N, S_nodes, walk_visited_count, nodelist, inverse_nodelist)

    # corrective walks: the ones to add and the ones to take away
    walks_to_add, walks_to_remove = _get_walks_to_do(S_nodes, G_nodes, S, G, base_count, alpha)
    planned_walks = sum(walks_to_add.values()) + sum(walks_to_remove.values())
    print(f'walks performed = {planned_walks}')

    # run both groups of walks on S
    added = _perform_walks(S_nodes, S, walks_to_add, alpha)
    removed = _perform_walks(S_nodes, S, walks_to_remove, alpha)

    # apply the corrections to the counts coming from G
    corrected = {}
    for node in S_nodes:
        corrected[node] = base_count[node] + added[node] - removed[node]

    # normalize by the total number of visits
    total_visits = sum(corrected.values())
    return {node: visits / total_visits for node, visits in corrected.items()}
def _validate_inputs_and_init(S, G, walk_visited_count, nodelist, alpha):
    '''
    This function validate the inputs and initialize the following variables:

    N: int
        the number of nodes in G Networkx graph
    S_nodes: set
        the set of nodes that belongs to S
    G_nodes: set
        the set of nodes that belongs to G
    inverse_nodelist : dict
        a dictionary that maps each node in G to its position in nodelist

    Raises ValueError when S or G is empty, alpha is not a float strictly
    between 0 and 1, walk_visited_count is not an (N, N) CSR matrix, or
    nodelist is not an ordering of exactly the nodes of G (each node once).

    Note: S being a subgraph of G is NOT checked because it's computationally expensive.
    '''
    if len(S) == 0:
        raise ValueError("graph S is empty")

    N = len(G)
    if N == 0:
        raise ValueError("graph G is empty")

    if not isinstance(alpha, float) or not (0 < alpha < 1):
        raise ValueError("alpha must be a float between 0 and 1")

    if not isspmatrix_csr(walk_visited_count) or walk_visited_count.shape != (N, N):
        raise ValueError(f"walk_visited_count must be a {(N, N)} CSR matrix")

    S_nodes = set(S.nodes())
    G_nodes = set(G.nodes())

    # the length check rejects repeated entries, which the set comparison
    # alone lets through and which would give a wrong inverse map
    if not nodelist or len(nodelist) != N or set(nodelist) != G_nodes:
        raise ValueError("nodelist does not match the nodes in G")

    # compute the inverse map of nodelist
    inverse_nodelist = {node: position for position, node in enumerate(nodelist)}

    return N, S_nodes, G_nodes, inverse_nodelist
def _get_visited_count_from_S(N, S_nodes, walk_visited_count, nodelist, inverse_nodelist):
    '''
    This function extracts the number of visits that come from walks that started from S.

    It returns a dictionary {node: visits} with one entry for each of the
    first N nodes of nodelist.
    '''
    # rows of the matrix that belong to the nodes of S
    row_positions = [inverse_nodelist[node] for node in S_nodes]
    rows_from_S = walk_visited_count[row_positions, :]

    # column-wise total over those rows, as a flat numpy array
    column_totals = np.array(rows_from_S.sum(axis=0)).flatten()

    # decode the columns back into nodes
    counts = {}
    for position in range(N):
        counts[nodelist[position]] = column_totals[position]
    return counts
def _get_walks_to_do(S_nodes, G_nodes, S, G, visited_count_from_S, alpha):
    '''
    This function calculates the positive and negative walks to be done for each node.
    It is a necessary step to take into account the different structure of S
    with respect to that of G.

    It returns two dictionaries {node: number of walks}: the first holds the
    nodes of S whose balance is positive, the second the nodes whose balance
    is negative (stored as a positive number). Both are rounded with round().
    '''
    # nodes of G that are not in S
    outside_nodes = G_nodes - S_nodes

    # nodes of S having at least one edge that leaves S
    leaking_nodes = {source for source, _target in nx.edge_boundary(G, S_nodes, outside_nodes)}

    # running balance of walks for every node of S
    balance = dict.fromkeys(S_nodes, 0)

    # walks to add: in S a leaking node spreads its visits over fewer successors
    for node in leaking_nodes:
        inner_successors = set(G.successors(node)) & S_nodes
        if not inner_successors:
            continue

        visited_count = visited_count_from_S[node]
        degree_S = S.out_degree(node)
        degree_G = G.out_degree(node)
        estimate_visits = alpha * visited_count * (1 / degree_S - 1 / degree_G)

        for succ in inner_successors:
            balance[succ] += estimate_visits

    # walks to remove: visits that entered S through a node outside of it
    for node in outside_nodes:
        inner_successors = set(G.successors(node)) & S_nodes
        if not inner_successors:
            continue

        visited_count = visited_count_from_S[node]
        degree = G.out_degree(node)
        estimate_visits = alpha * visited_count / degree

        for succ in inner_successors:
            balance[succ] -= estimate_visits

    # split the balance by sign
    positive_walks_to_do = {}
    negative_walks_to_do = {}
    for node, value in balance.items():
        if value > 0:
            positive_walks_to_do[node] = round(value)
        elif value < 0:
            negative_walks_to_do[node] = round(-value)

    return positive_walks_to_do, negative_walks_to_do
def _perform_walks(S_nodes, S, walks_to_do, alpha):
    '''
    This function performs a certain number of random walks on S for each node;
    It then returns the visited count for each node in S.

    walks_to_do maps a starting node to the number of walks to start there.
    Every walk counts its starting node, then keeps moving to a random
    successor with probability alpha until it reaches a node without successors.
    '''
    # every node of S starts from zero visits
    tally = dict.fromkeys(S_nodes, 0)

    for start, how_many in walks_to_do.items():
        for _ in range(how_many):
            # a walk always visits the node it starts from
            position = start
            tally[position] += 1

            while random.uniform(0, 1) < alpha:
                options = list(S.successors(position))
                if options:
                    position = random.choice(options)
                    tally[position] += 1
                else:
                    # nowhere to go: the walk ends here
                    break

    return tally
async def get_metadata(npub):
    '''
    Fetch the kind 0 (profile) event of npub and read three fields from it.

    npub is parsed with PublicKey.parse; when parsing fails the function
    returns ("", "", "") without contacting any relay. The relays queried are
    wss://relay.damus.io and wss://purplepag.es.

    Returns (name, nip05, lud16); a field that is missing or empty in the
    profile, or a profile content that cannot be read, gives "".
    '''
    try:
        pk = PublicKey.parse(npub)
    except Exception:
        return "", "", ""

    name = ""
    nip05 = ""
    lud16 = ""

    keys = Keys.parse(check_and_set_private_key("test_client"))
    client = ClientBuilder().signer(NostrSigner.keys(keys)).build()
    try:
        await client.add_relay("wss://relay.damus.io")
        await client.add_relay("wss://purplepag.es")
        await client.connect()

        profile_filter = Filter().kind(Kind(0)).author(pk).limit(1)
        events_struct = await client.fetch_events(profile_filter, relay_timeout)
        events = events_struct.to_vec()
    finally:
        # release the client even when connecting or fetching raised
        await client.shutdown()

    if events:
        try:
            profile = json.loads(events[0].content())
            if profile.get("name"):
                name = profile['name']
            if profile.get("nip05"):
                nip05 = profile['nip05']
            if profile.get("lud16"):
                lud16 = profile['lud16']
        except Exception as e:
            # malformed profile content: report it and keep what was read so far
            print(e)

    return name, nip05, lud16
async def print_results(graph, index_map, show_results_num, getmetadata=True):
    '''
    Print the first show_results_num entries of graph, one per line, as
    '<name>(<bech32 pubkey>) <value>'.

    graph: mapping {index: value}; it is read in its own iteration order
    index_map: dict {hex pubkey: index}
    getmetadata: when True the name is looked up with get_metadata
        (one relay round-trip per entry), otherwise the name is left empty

    An index without a pubkey in index_map is printed as '(unknown) <value>'.
    '''
    # reverse lookup built once; setdefault keeps the first pubkey of an index
    pubkey_by_index = {}
    for pubkey, index in index_map.items():
        pubkey_by_index.setdefault(index, pubkey)

    for item in islice(graph, show_results_num):
        hex_pubkey = pubkey_by_index.get(item)
        if hex_pubkey is None:
            print(f"(unknown) {graph[item]}")
            continue

        key = PublicKey.parse(hex_pubkey).to_bech32()
        name = ""
        if getmetadata:
            name, _nip05, _lud16 = await get_metadata(key)
        print(f"{name}({key}) {graph[item]}")
async def convert_index_to_hex(graph, index_map, show_results_num):
    '''
    Translate the first show_results_num entries of graph from indices to pubkeys.

    graph: mapping {index: value}; it is read in its own iteration order
    index_map: dict {pubkey: index}

    Returns a dict {pubkey: value}. An index without a pubkey in index_map
    ends up under the key None; when several pubkeys share an index the
    first one in index_map is used.
    '''
    # reverse lookup built once; setdefault keeps the first pubkey of an index
    pubkey_by_index = {}
    for pubkey, index in index_map.items():
        pubkey_by_index.setdefault(index, pubkey)

    return {pubkey_by_index.get(item): graph[item] for item in islice(graph, show_results_num)}
def test():
    '''
    Manual check: build the network of one hard-coded pubkey with depth 2
    and save it, using the pubkey as file name suffix.
    '''
    # WARNING, DEPENDING ON DEPTH THIS TAKES LONG
    seed = '3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d'
    config = DVMConfig()

    crawl = build_wot_network(seed, depth=2, max_batch=500, max_time_request=10, dvm_config=config)
    index_map, network_graph = asyncio.run(crawl)

    save_network(index_map, network_graph, seed)