nostrdvm/tests/test_events.py
2025-02-09 21:05:47 +01:00

154 lines
6.6 KiB
Python

import asyncio
from pathlib import Path
import dotenv
from nostr_sdk import Keys, Client, Tag, Filter, Timestamp, \
EventId, PublicKey, NostrSigner, Nip19Event, Kind
from nostr_dvm.utils import definitions, dvmconfig
from nostr_dvm.utils.definitions import relay_timeout
from nostr_dvm.utils.dvmconfig import DVMConfig
from nostr_dvm.utils.nip89_utils import NIP89Config
from nostr_dvm.utils.nostr_utils import check_and_set_private_key
async def test():
relay_list = dvmconfig.DVMConfig.RELAY_LIST
keys = Keys.parse(check_and_set_private_key("test_client"))
client = Client(NostrSigner.keys(keys))
for relay in relay_list:
await client.add_relay(relay)
await client.connect()
await test_referred_events(client, "c70fbd4dbaad22c427d4359981d3bdddd3971ed1a38227ca2f8e5e760f58103c",
definitions.EventDefinitions.ANY_RESULT)
# shows kind 7000 reaction but not kind 6300 result (d05e7ae9271fe2d8968cccb67c01e3458dbafa4a415e306d49b22729b088c8a1)
await test_referred_events(client, "5635e5dd930b3c831f6ab1e348bb488f3c9aca2f13190e93ab5e5e1e1ba1835e",
definitions.EventDefinitions.ANY_RESULT)
bech32evnt = EventId.parse("5635e5dd930b3c831f6ab1e348bb488f3c9aca2f13190e93ab5e5e1e1ba1835e").to_bech32()
print(bech32evnt)
test = Nip19Event.from_bech32(
"nevent1qqsrjcpejsrlt3u7dy42y6rc97svrq9ver08xy4jr2ll55ynq3sxafcppamhxue69uhkummnw3ezumt0d5pzpmnqx2pla0zvxxcfjqeeysy29ll3mtmf4s3yff0y45r7egau080vqvzqqqqqqyu4q839")
print(test.event_id().to_hex())
nostruri = EventId.parse("5635e5dd930b3c831f6ab1e348bb488f3c9aca2f13190e93ab5e5e1e1ba1835e").to_nostr_uri()
print(nostruri)
await test_search_by_user_since_days(client,
PublicKey.parse(
"npub1nxa4tywfz9nqp7z9zp7nr7d4nchhclsf58lcqt5y782rmf2hefjquaa6q8"), 60,
"Bitcoin")
async def test_referred_events(client, event_id, kinds=None):
if kinds is None:
kinds = []
if len(kinds) > 0:
job_id_filter = Filter().kinds(kinds).event(EventId.parse(event_id))
else:
job_id_filter = Filter().event(EventId.parse(event_id))
event_struct = await client.fetch_events(job_id_filter, relay_timeout)
events = event_struct.to_vec()
if len(events) > 0:
for event in events:
print(event.as_json())
return events[0]
else:
print("None")
return None
async def test_gallery():
relay_list = dvmconfig.DVMConfig.RELAY_LIST
keys = Keys.parse(check_and_set_private_key("test_client"))
client = Client(NostrSigner.keys(keys))
for relay in relay_list:
await client.add_relay(relay)
await client.connect()
dvm_config = DVMConfig()
dvm_config.NIP89 = NIP89Config()
keys = Keys.parse(check_and_set_private_key("RTEST_ACCOUNT_PK"))
print(keys.public_key().to_hex())
dvm_config.NIP89.PK = keys.secret_key().to_hex()
tagname = "url"
tags = [
Tag.parse([tagname, "https://i.nostr.build/xEZqV.jpg",
"3b0ec270394dc496f9f9c7db5c68a5b7f7311ff9080a51f1e8cb5f5cffc2c0b2", "wss://nostr.mom"]),
# Tag.parse([tagname, "https://i.nostr.build/2RnXd.jpg", "dd6e5c2891fbe9f53bcaa351b48faeeedccd16e9541b508adcb2c16d11bceaaf", "wss://nostr.mom"]),
# Tag.parse([tagname, "https://i.nostr.build/WG2Ra.jpg", "b2868e1ef93523ecf15b26e1cfdb6f252fe5074867d9c042fd6fcfbf07959193", "wss://nostr.mom"]),
# Tag.parse([tagname, "https://i.nostr.build/M5keE.jpg", "489402bf3ec070e7ebf2ba459508d2e1a408c0adad02954470602f232026a37d", "wss://nostr.mom"]),
Tag.parse([tagname, "https://v.nostr.build/M5kZ5.mp4",
"0e37cb0373189e01be3c744c0434e0c8559953910e44b05ed270313c47abe142", "wss://nostr.mom"]),
Tag.parse([tagname, "https://i.nostr.build/vGLg7.jpg",
"102d1f411a9a2b4de37ef62cdd4943673b4941080a51a8fa8829cd9f1de46d13", "wss://nostr.mom"]),
Tag.parse([tagname, "https://i.nostr.build/O4WxA.jpg",
"4022d4e893c224186bbef4414340e35cbf251c681bc84ab05446fec1d2ec67df", "wss://nostr.mom"]),
Tag.parse([tagname, "https://i.nostr.build/M5a96.jpg",
"6f04dc6a2a05f710b9c6c6d09a02c5fe0174da9c95399d3d01963a784d195803", "wss://nostr.mom"]),
Tag.parse([tagname, "https://i.nostr.build/Dj2Q4.jpg",
"737a169c245ce7957a8b6acf190c57d70256cc52630862f5ba0fd7315ef83425", "wss://nostr.mom"]),
Tag.parse([tagname, "https://i.nostr.build/Dj2Q4.jpg",
"c3a3a8759502cb3c06d592e5715cad0826982a2ff60a0ae525e3f253ab9e462a", "wss://nostr.mom"]),
# Tag.parse([tagname, "https://i.nostr.build/7G2G2.jpg", "015e71ded102e96d2b30f63dec0c04546d52a51f709709391af68d73f7502feb", "wss://nostr.mom"]),
# Tag.parse([tagname, "https://i.nostr.build/XVLkd.jpg", "43da37c84113d4c0bdc60ae1c82cef9761ff7a2a1ef29b1cc26abfd4932786c5", "wss://nostr.mom"]),
Tag.parse(["alt", "Profile Gallery List"])
]
# await gallery_announce_list(tags, dvm_config, client)
#evt = EventBuilder.delete([EventId.parse("721ac7c7d9309b6d3e6728a7274f5a1f10096b4ab17068233bcfa05cb233e84a")],
# reason="deleted").sign_with_keys(keys)
#await client.send_event(evt)
#key1 = Keys.parse("3e99f38a8e8c59ff3683cdc0942e26471c1aae9b225eb34dd410cb9d6dde93a6")
#key2 = Keys.parse("c35006d2c1e5618bfc107030047b85da4f11638b2901b24193fa45160890d851")
#encrypted_text = nip04_encrypt(key1.secret_key(), key2.public_key(), "Hello")
#print(encrypted_text)
#encrypted_text = "T+f58RV7ENaSLWZPgUEiWA==?iv=Ize4cInMI3xuWnL5I0nYFw=="
#rsult = nip04_decrypt(key2.secret_key(), key1.public_key(), encrypted_text)
#print(rsult)
async def test_search_by_user_since_days(client, pubkey, days, prompt):
since_seconds = int(days) * 24 * 60 * 60
dif = Timestamp.now().as_secs() - since_seconds
since = Timestamp.from_secs(dif)
filterts = Filter().search(prompt).author(pubkey).kinds([Kind(1)]).since(since)
event_struct = await client.fetch_events(filterts, relay_timeout)
events = event_struct.to_vec()
if len(events) > 0:
for event in events:
print(event.as_json())
return events[0]
else:
print("None")
return None
if __name__ == '__main__':
env_path = Path('.env')
if env_path.is_file():
print(f'loading environment from {env_path.resolve()}')
dotenv.load_dotenv(env_path, verbose=True, override=True)
else:
raise FileNotFoundError(f'.env file not found at {env_path} ')
asyncio.run(test_gallery())