mirror of
https://github.com/believethehype/nostrdvm.git
synced 2025-03-26 17:41:43 +01:00
204 lines
7.0 KiB
Python
204 lines
7.0 KiB
Python
import json
|
|
import os
|
|
from datetime import timedelta
|
|
from pathlib import Path
|
|
from typing import List
|
|
|
|
import dotenv
|
|
from nostr_sdk import Filter, Client, Alphabet, EventId, Event, PublicKey, Tag, Keys, nip04_decrypt, Metadata, Options, \
|
|
Nip19Event
|
|
|
|
|
|
def get_event_by_id(event_id: str, client: Client, config=None) -> Event | None:
    """Fetch a single event from the relays the client is connected to.

    ``event_id`` may be given in one of these forms:

    * ``<kind>:<pubkey-hex>:<d-tag>`` - the lookup filters on the author and
      the ``d`` tag (the kind part is not used in the filter),
    * a bech32 ``note...`` or ``nevent...`` string,
    * a ``nostr:note...`` or ``nostr:nevent...`` URI,
    * anything else is parsed as a hex event id.

    Args:
        event_id: Identifier in one of the forms listed above.
        client: Client used to query the relays.
        config: Object exposing ``RELAY_TIMEOUT`` (seconds). When omitted, no
            explicit timeout is handed to the client.

    Returns:
        The first event returned by the relays, or ``None`` if nothing was found.
    """
    # ``config`` is optional in the signature, so it must not be dereferenced
    # blindly. NOTE(review): a ``None`` timeout relies on nostr_sdk accepting an
    # optional timeout in ``get_events_of`` - confirm for the pinned version.
    timeout = timedelta(seconds=config.RELAY_TIMEOUT) if config is not None else None

    # maxsplit=2 keeps a d-tag that itself contains ':' in one piece.
    parts = event_id.split(":", 2)
    if len(parts) == 3:
        author = PublicKey.from_hex(parts[1])
        id_filter = Filter().author(author).custom_tag(Alphabet.D, [parts[2]])
    else:
        if event_id.startswith('note'):
            parsed_id = EventId.from_bech32(event_id)
        elif event_id.startswith("nevent"):
            parsed_id = Nip19Event.from_bech32(event_id).event_id()
        elif event_id.startswith('nostr:note'):
            parsed_id = EventId.from_nostr_uri(event_id)
        elif event_id.startswith("nostr:nevent"):
            parsed_id = Nip19Event.from_nostr_uri(event_id).event_id()
        else:
            parsed_id = EventId.from_hex(event_id)
        id_filter = Filter().id(parsed_id).limit(1)

    events = client.get_events_of([id_filter], timeout)
    return events[0] if events else None
|
|
|
|
|
|
def get_events_by_id(event_ids: list, client: Client, config=None) -> list[Event] | None:
    """Fetch all events whose ids are listed in ``event_ids``.

    Args:
        event_ids: Event ids to look up; passed unchanged to ``Filter().ids``.
        client: Client used to query the relays.
        config: Object exposing ``RELAY_TIMEOUT`` (seconds). When omitted, no
            explicit timeout is handed to the client.

    Returns:
        The list of events found, or ``None`` when there are none (including
        when ``event_ids`` is empty).
    """
    # Nothing requested means nothing can be found; avoid sending a filter
    # that carries no id constraint at all.
    if not event_ids:
        return None

    # ``config`` is optional in the signature, so it must not be dereferenced
    # blindly. NOTE(review): a ``None`` timeout relies on nostr_sdk accepting an
    # optional timeout in ``get_events_of`` - confirm for the pinned version.
    timeout = timedelta(seconds=config.RELAY_TIMEOUT) if config is not None else None

    events = client.get_events_of([Filter().ids(event_ids)], timeout)
    return events if events else None
|
|
|
|
|
|
def get_referenced_event_by_id(event_id, client, dvm_config, kinds) -> Event | None:
    """Return one event matched by a ``Filter().event(event_id)`` query.

    ``event_id`` may be a bech32 ``note``/``nevent`` string, a ``nostr:note``
    or ``nostr:nevent`` URI, or (fallback) a hex event id.

    Args:
        event_id: Identifier of the event the filter is built around.
        client: Client used to query the relays.
        dvm_config: Object exposing ``RELAY_TIMEOUT`` (seconds).
        kinds: Kinds to restrict the query to; ``None`` or an empty list
            means no kind restriction.

    Returns:
        The first matching event, or ``None`` if the relays returned nothing.
    """
    if kinds is None:
        kinds = []

    # Ordered (prefix, parser) pairs; the first matching prefix wins and
    # anything unmatched is parsed as a hex id.
    parsers = (
        ('note', EventId.from_bech32),
        ("nevent", lambda value: Nip19Event.from_bech32(value).event_id()),
        ('nostr:note', EventId.from_nostr_uri),
        ("nostr:nevent", lambda value: Nip19Event.from_nostr_uri(value).event_id()),
    )
    text = str(event_id)
    parse = next((fn for prefix, fn in parsers if text.startswith(prefix)), EventId.from_hex)
    target = parse(event_id)

    job_id_filter = Filter()
    if len(kinds) > 0:
        job_id_filter = job_id_filter.kinds(kinds)
    job_id_filter = job_id_filter.event(target).limit(1)

    found = client.get_events_of([job_id_filter], timedelta(seconds=dvm_config.RELAY_TIMEOUT))
    if len(found) == 0:
        return None
    return found[0]
|
|
|
|
|
|
def send_event(event: Event, client: Client, dvm_config) -> EventId | None:
    """Publish ``event``, temporarily adding relays named in its ``relays`` tags.

    Every value of a ``relays`` tag that is not already part of
    ``dvm_config.RELAY_LIST`` is added to the client before sending and
    removed again afterwards, whether or not sending succeeded.

    Args:
        event: Event to publish.
        client: Client used to publish the event.
        dvm_config: Object exposing ``RELAY_LIST``, the permanently configured relays.

    Returns:
        The id returned by ``client.send_event``, or ``None`` if an error
        occurred (the error is printed, not raised).
    """
    added_relays = []
    try:
        requested = []
        for tag in event.tags():
            values = tag.as_vec()
            if values[0] == 'relays':
                # Everything after the tag name is a relay url.
                requested.extend(values[1:])

        for relay in requested:
            if relay not in dvm_config.RELAY_LIST and relay not in added_relays:
                client.add_relay(relay)
                added_relays.append(relay)

        return client.send_event(event)
    except Exception as e:
        # Best effort by design: report the problem and signal failure with None.
        print(e)
        return None
    finally:
        # Always undo the temporary relays, even when sending failed; a failing
        # removal must not discard an event id that was already obtained.
        for relay in added_relays:
            try:
                client.remove_relay(relay)
            except Exception as e:
                print(e)
|
|
|
|
|
|
def check_and_decrypt_tags(event, dvm_config):
    """Decrypt the tags of an encrypted event addressed to this DVM.

    An event counts as encrypted when it carries an ``encrypted`` tag. Its
    ``p`` tag is compared with ``dvm_config.PUBLIC_KEY``:

    * mismatch - a notice is printed and ``None`` is returned,
    * match - the content is decrypted with ``nip04_decrypt``, parsed as a
      JSON tag list, extended by the ``p`` and ``encrypted`` tags, and a new
      event is built with these tags and empty content.

    Events without an ``encrypted`` tag are returned unchanged. Any exception
    is printed and the event as it stood at that point is returned.

    Args:
        event: Incoming event to inspect.
        dvm_config: Object exposing ``PUBLIC_KEY``, ``PRIVATE_KEY`` and ``NIP89.NAME``.

    Returns:
        The (possibly rebuilt) event, or ``None`` if it is encrypted for someone else.
    """
    try:
        encrypted = False
        recipient = ""
        for tag in event.tags():
            fields = tag.as_vec()
            if fields[0] == 'encrypted':
                encrypted = True
            elif fields[0] == 'p':
                recipient = fields[1]

        if not encrypted:
            return event

        if recipient != dvm_config.PUBLIC_KEY:
            print("[" + dvm_config.NIP89.NAME + "] Task encrypted and not addressed to this DVM, "
                                                "skipping..")
            return None

        secret_key = Keys.from_sk_str(dvm_config.PRIVATE_KEY).secret_key()
        decrypted = nip04_decrypt(secret_key, event.pubkey(), event.content())
        tags = json.loads(decrypted)
        # Re-attach the markers that lived outside the encrypted payload.
        tags.append(Tag.parse(["p", recipient]).as_vec())
        tags.append(Tag.parse(["encrypted"]).as_vec())

        payload = json.loads(event.as_json())
        payload['tags'] = tags
        payload['content'] = ""
        event = Event.from_json(json.dumps(payload))
    except Exception as e:
        print(e)

    return event
|
|
|
|
|
|
def check_and_decrypt_own_tags(event, dvm_config):
    """Decrypt the tags of an encrypted event that this DVM authored itself.

    An event counts as encrypted when it carries an ``encrypted`` tag. The
    event's author key is compared with ``dvm_config.PUBLIC_KEY``:

    * mismatch - a notice is printed and ``None`` is returned,
    * match - the content is decrypted with ``nip04_decrypt`` using the key
      from the ``p`` tag as counterpart, parsed as a JSON tag list, extended
      by the ``p`` and ``encrypted`` tags, and a new event is built with
      these tags and empty content.

    Events without an ``encrypted`` tag are returned unchanged. Any exception
    is printed and the event as it stood at that point is returned.

    Args:
        event: Event to inspect.
        dvm_config: Object exposing ``PUBLIC_KEY``, ``PRIVATE_KEY`` and ``NIP89.NAME``.

    Returns:
        The (possibly rebuilt) event, or ``None`` if it was authored by another key.
    """
    try:
        encrypted = False
        counterpart = ""
        for tag in event.tags():
            fields = tag.as_vec()
            if fields[0] == 'encrypted':
                encrypted = True
            elif fields[0] == 'p':
                counterpart = fields[1]

        if not encrypted:
            return event

        if dvm_config.PUBLIC_KEY != event.pubkey().to_hex():
            print("[" + dvm_config.NIP89.NAME + "] Task encrypted and not addressed to this DVM, "
                                                "skipping..")
            return None

        secret_key = Keys.from_sk_str(dvm_config.PRIVATE_KEY).secret_key()
        decrypted = nip04_decrypt(secret_key, PublicKey.from_hex(counterpart), event.content())
        tags = json.loads(decrypted)
        # Re-attach the markers that lived outside the encrypted payload.
        tags.append(Tag.parse(["p", counterpart]).as_vec())
        tags.append(Tag.parse(["encrypted"]).as_vec())

        payload = json.loads(event.as_json())
        payload['tags'] = tags
        payload['content'] = ""
        event = Event.from_json(json.dumps(payload))
    except Exception as e:
        print(e)

    return event
|
|
|
|
|
|
def update_profile(dvm_config, client, lud16=""):
    """Publish profile metadata derived from the DVM's NIP89 content.

    ``dvm_config.NIP89.CONTENT`` is parsed as JSON; its ``name``, ``about``
    and ``image`` entries become the profile's name/display name, about text
    and picture. ``lud16`` is written to both the lud16 and the nip05 field.
    Nothing is published when the content has no (non-empty) ``name``.

    Args:
        dvm_config: Object exposing ``PRIVATE_KEY`` and ``NIP89.CONTENT``.
        client: Client used to publish the metadata.
        lud16: Value stored as lud16 and nip05 of the profile.
    """
    keys = Keys.from_sk_str(dvm_config.PRIVATE_KEY)
    profile = json.loads(dvm_config.NIP89.CONTENT)

    name = profile.get("name")
    if not name:
        return

    about = profile.get("about")
    image = profile.get("image")

    # Build the metadata record; no banner is set.
    metadata = (
        Metadata()
        .set_name(name)
        .set_display_name(name)
        .set_about(about)
        .set_picture(image)
        .set_lud16(lud16)
        .set_nip05(lud16)
    )

    print(f"Setting profile metadata for {keys.public_key().to_bech32()}...")
    print(metadata.as_json())
    client.set_metadata(metadata)
|
|
|
|
|
|
def check_and_set_private_key(identifier):
    """Return the private key configured for ``identifier``, creating one if needed.

    The key is looked up in the environment variable
    ``DVM_PRIVATE_KEY_<IDENTIFIER>`` (identifier upper-cased). If that variable
    is unset or empty, a new key is generated, handed to
    ``add_pk_to_env_file`` for persistence and returned as hex.

    Args:
        identifier: Name of the DVM the key belongs to.

    Returns:
        The private key as stored in the environment, or the hex encoding of
        the freshly generated key.
    """
    env_name = "DVM_PRIVATE_KEY_" + identifier.upper()

    existing = os.getenv(env_name)
    if existing:
        return existing

    pk = Keys.generate().secret_key().to_hex()
    add_pk_to_env_file(env_name, pk)
    # add_pk_to_env_file loads the .env file *before* writing the new entry, so
    # the running process would not see the key. Mirror it into the environment
    # so a repeated call returns this key instead of generating another one.
    os.environ[env_name] = pk
    return pk
|
|
|
|
|
|
def add_pk_to_env_file(dtag, oskey):
    """Store ``oskey`` under the name ``dtag`` in the ``.env`` file of the working directory.

    A missing ``.env`` file is created first so that the value is actually
    persisted. The file is then loaded into the process environment
    (overriding existing variables) and the entry is written with
    ``dotenv.set_key``.

    Args:
        dtag: Name of the entry to write (used as the variable name).
        oskey: Value to store under that name.
    """
    env_path = Path('.env')

    if not env_path.exists():
        # Without a file the value would be silently dropped. Creating it is
        # best effort: report a failure instead of raising.
        try:
            env_path.touch()
        except OSError as e:
            print(e)
            return

    if env_path.is_file():
        print(f'loading environment from {env_path.resolve()}')
        dotenv.load_dotenv(env_path, verbose=True, override=True)
        dotenv.set_key(env_path, dtag, oskey)
|