From 5f4886f0ba2f55046954a619e95ddb7aa0cba31b Mon Sep 17 00:00:00 2001 From: Believethehype Date: Sun, 26 Nov 2023 10:31:38 +0100 Subject: [PATCH] added private encrypted NIP90 jobs, use balance if available and ptag given --- .env_example | 1 + .idea/dataSources.xml | 12 ++ bot.py | 205 +++++++++++++----- dvm.py | 289 +++++++++++++------------- main.py | 1 + tasks/imagegeneration_openai_dalle.py | 2 +- tasks/imagegeneration_sdxl.py | 2 +- tasks/textextractionpdf.py | 2 +- tasks/translation.py | 8 +- test_dvm_client.py | 4 +- utils/backend_utils.py | 2 +- utils/definitions.py | 4 +- utils/dvmconfig.py | 1 + utils/output_utils.py | 2 +- utils/zap_utils.py | 33 ++- 15 files changed, 356 insertions(+), 212 deletions(-) diff --git a/.env_example b/.env_example index 2779cf8..ee31095 100644 --- a/.env_example +++ b/.env_example @@ -12,6 +12,7 @@ NOSTR_TEST_CLIENT_PRIVATE_KEY = "a secret hex key for the test dvm client" # Optional LNBITS options to create invoices (if empty, it will use the lud16 from profile, make sure to set one) LNBITS_INVOICE_KEY = "" +LNBITS_ADMIN_KEY = "" # In order to pay invoices, e.g. from the bot to DVMs, or reimburse users. Keep this secret and use responsibly. LNBITS_HOST = "https://lnbits.com" diff --git a/.idea/dataSources.xml b/.idea/dataSources.xml index a9eee01..e4a1d00 100644 --- a/.idea/dataSources.xml +++ b/.idea/dataSources.xml @@ -44,5 +44,17 @@ + + sqlite.xerial + true + org.sqlite.JDBC + jdbc:sqlite:$PROJECT_DIR$/db/Unstable Diffusion.db + $ProjectFileDir$ + + + file://$APPLICATION_CONFIG_DIR$/jdbc-drivers/Xerial SQLiteJDBC/3.43.0/org/xerial/sqlite-jdbc/3.43.0.0/sqlite-jdbc-3.43.0.0.jar + + + \ No newline at end of file diff --git a/bot.py b/bot.py index 8ab0323..2fb3781 100644 --- a/bot.py +++ b/bot.py @@ -4,17 +4,20 @@ from datetime import timedelta from threading import Thread from nostr_sdk import (Keys, Client, Timestamp, Filter, nip04_decrypt, HandleNotification, EventBuilder, PublicKey, - Options) + Options, Tag, Event, nip04_encrypt) from utils.admin_utils import admin_make_database_updates from utils.backend_utils import get_amount_per_task from utils.database_utils import get_or_add_user, update_user_balance, create_sql_table, update_sql_table, User from utils.definitions import EventDefinitions from utils.nostr_utils import send_event -from utils.zap_utils import parse_zap_event_tags +from utils.zap_utils import parse_zap_event_tags, pay_bolt11_ln_bits class Bot: + job_list: list + + # This is a simple list just to keep track which events we created and manage, so we don't pay for other requests def __init__(self, dvm_config, admin_config=None): self.NAME = "Bot" dvm_config.DB = "db/" + self.NAME + ".db" @@ -29,6 +32,8 @@ class Bot: pk = self.keys.public_key() + self.job_list = [] + print("Nostr BOT public key: " + str(pk.to_bech32()) + " Hex: " + str(pk.to_hex()) + " Name: " + self.NAME + " Supported DVM tasks: " + ', '.join(p.NAME + ":" + p.TASK for p in self.dvm_config.SUPPORTED_DVMS) + "\n") @@ -39,8 +44,13 @@ class Bot: zap_filter = Filter().pubkey(pk).kinds([EventDefinitions.KIND_ZAP]).since(Timestamp.now()) dm_filter = Filter().pubkey(pk).kinds([EventDefinitions.KIND_DM]).since(Timestamp.now()) + kinds = [EventDefinitions.KIND_NIP90_GENERIC, EventDefinitions.KIND_FEEDBACK] + for dvm in self.dvm_config.SUPPORTED_DVMS: + if dvm.KIND not in kinds: + kinds.append(dvm.KIND + 1000) + dvm_filter = (Filter().kinds(kinds).since(Timestamp.now())) - self.client.subscribe([zap_filter, dm_filter]) + self.client.subscribe([zap_filter, dm_filter, dvm_filter]) create_sql_table(self.dvm_config.DB) admin_make_database_updates(adminconfig=self.admin_config, dvmconfig=self.dvm_config, client=self.client) @@ -51,7 +61,11 @@ class Bot: keys = self.keys def handle(self, relay_url, nostr_event): - if nostr_event.kind() == EventDefinitions.KIND_DM: + if EventDefinitions.KIND_NIP90_EXTRACT_TEXT + 1000 <= nostr_event.kind() <= EventDefinitions.KIND_NIP90_GENERIC + 1000: + handle_nip90_response_event(nostr_event) + elif nostr_event.kind() == EventDefinitions.KIND_FEEDBACK: + handle_nip90_feedback(nostr_event) + elif nostr_event.kind() == EventDefinitions.KIND_DM: handle_dm(nostr_event) elif nostr_event.kind() == EventDefinitions.KIND_ZAP: handle_zap(nostr_event) @@ -64,6 +78,7 @@ class Bot: try: decrypted_text = nip04_decrypt(self.keys.secret_key(), nostr_event.pubkey(), nostr_event.content()) + print(decrypted_text) user = get_or_add_user(db=self.dvm_config.DB, npub=sender, client=self.client, config=self.dvm_config) # We do a selection of tasks now, maybe change this later, Idk. @@ -81,54 +96,69 @@ class Bot: # For some reason an admin might blacklist npubs, e.g. for abusing the service evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(), "Your are currently blocked from all " - "services.",None).to_event(self.keys) + "services.", None).to_event(self.keys) send_event(evt, client=self.client, dvm_config=dvm_config) elif user.iswhitelisted or user.balance >= required_amount or required_amount == 0: + time.sleep(2.0) + if user.iswhitelisted: + evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(), + "As you are " + "whitelisted, your balance remains at" + + str(user.balance) + " Sats.\n", + nostr_event.id()).to_event(self.keys) - if not user.iswhitelisted: - + else: balance = max(user.balance - required_amount, 0) update_sql_table(db=self.dvm_config.DB, npub=user.npub, balance=balance, iswhitelisted=user.iswhitelisted, isblacklisted=user.isblacklisted, nip05=user.nip05, lud16=user.lud16, name=user.name, lastactive=Timestamp.now().as_secs()) - time.sleep(2.0) evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(), - "Your Job is now scheduled. New balance is " + + "New balance is " + str(balance) - + " Sats.\nI will DM you once I'm done " - "processing.", - nostr_event.id()).to_event(self.keys) - else: - time.sleep(2.0) - evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(), - "Your Job is now scheduled. As you are " - "whitelisted, your balance remains at" - + str(user.balance) + " Sats.\n" - "I will DM you once I'm " - "done processing.", + + " Sats.\n", nostr_event.id()).to_event(self.keys) + input = decrypted_text.replace(decrypted_text.split(' ')[0] + " ", "") + + dvm_keys = Keys.from_sk_str(self.dvm_config.SUPPORTED_DVMS[index].PK) + i_tag = Tag.parse(["i", input, "text"]) + + # we use the y tag to keep information about the original sender, in order to forward the + # results later + + # TODO more advanced logic, more parsing, params etc, just very basic test functions for now + # outTag = Tag.parse(["output", "image/png;format=url"]) + # paramTag1 = Tag.parse(["param", "size", "1024x1024"]) + + bid = str(self.dvm_config.SUPPORTED_DVMS[index].COST * 1000) + bid_tag = Tag.parse(['bid', bid, bid]) + relays_tag = Tag.parse(["relays", json.dumps(self.dvm_config.RELAY_LIST)]) + alt_tag = Tag.parse(["alt", self.dvm_config.SUPPORTED_DVMS[index].TASK]) + + encrypted_params_string = json.dumps([i_tag.as_vec(), bid_tag.as_vec(), + relays_tag.as_vec(), alt_tag.as_vec()]) + + print(encrypted_params_string) + + encrypted_params = nip04_encrypt(self.keys.secret_key(), dvm_keys.public_key(), + encrypted_params_string) + p_tag = Tag.parse(['p', dvm_keys.public_key().to_hex()]) + encrypted_tag = Tag.parse(['encrypted']) + nip90request = EventBuilder(self.dvm_config.SUPPORTED_DVMS[index].KIND, encrypted_params, + [p_tag, encrypted_tag]).to_event(self.keys) + + entry = {"npub": user.npub, "event_id": nip90request.id().to_hex(), + "dvm_key": dvm_keys.public_key().to_hex(), "is_paid": False} + self.job_list.append(entry) + + send_event(nip90request, client=self.client, dvm_config=dvm_config) + print("[" + self.NAME + "] Replying " + user.name + " with \"scheduled\" confirmation") send_event(evt, client=self.client, dvm_config=dvm_config) - - i_tag = decrypted_text.replace(decrypted_text.split(' ')[0] + " ", "") - # TODO more advanced logic, more parsing, params etc, just very basic test functions for now - dvm_keys = Keys.from_sk_str(self.dvm_config.SUPPORTED_DVMS[index].PK) - params = { - "sender": nostr_event.pubkey().to_hex(), - "input": i_tag, - "task": self.dvm_config.SUPPORTED_DVMS[index].TASK - } - message = json.dumps(params) - evt = EventBuilder.new_encrypted_direct_msg(self.keys, dvm_keys.public_key(), - message, None).to_event(self.keys) - print("[" + self.NAME + "] Forwarding task " + self.dvm_config.SUPPORTED_DVMS[index].TASK + - " for user " + user.name + " to " + self.dvm_config.SUPPORTED_DVMS[index].NAME) - send_event(evt, client=self.client, dvm_config=dvm_config) else: - print("payment-required") + print("Bot payment-required") time.sleep(2.0) evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(), "Balance required, please zap me with at least " + @@ -138,21 +168,6 @@ class Bot: send_event(evt, client=self.client, dvm_config=dvm_config) - # TODO if we receive the result from one of the dvms, need some better management, maybe check for keys - elif decrypted_text.startswith('{"result":'): - - dvm_result = json.loads(decrypted_text) - user_npub_hex = dvm_result["sender"] - user = get_or_add_user(db=self.dvm_config.DB, npub=user_npub_hex, - client=self.client, config=self.dvm_config) - print("[" + self.NAME + "] Received results, message to orignal sender " + user.name) - reply_event = EventBuilder.new_encrypted_direct_msg(self.keys, - PublicKey.from_hex(user.npub), - dvm_result["result"], - None).to_event(self.keys) - - send_event(reply_event, client=self.client, dvm_config=dvm_config) - else: print("[" + self.NAME + "] Message from " + user.name + ": " + decrypted_text) message = "DVMs that I support:\n\n" @@ -173,6 +188,94 @@ class Bot: print("Error in bot " + str(e)) + def handle_nip90_feedback(nostr_event): + try: + status = "" + etag = "" + ptag = "" + + for tag in nostr_event.tags(): + if tag.as_vec()[0] == "status": + status = tag.as_vec()[1] + elif tag.as_vec()[0] == "e": + etag = tag.as_vec()[1] + elif tag.as_vec()[0] == "p": + ptag = tag.as_vec()[1] + + if status == "success" or status == "error" or status == "processing" or status == "partial": + entry = next((x for x in self.job_list if x['event_id'] == etag), None) + if entry is not None: + user = get_or_add_user(db=self.dvm_config.DB, npub=entry['npub'], + client=self.client, config=self.dvm_config) + + reply_event = EventBuilder.new_encrypted_direct_msg(self.keys, + PublicKey.from_hex(user.npub), + nostr_event.content(), + None).to_event(self.keys) + print(status + ": " + nostr_event.content()) + print( + "[" + self.NAME + "] Received reaction from " + nostr_event.pubkey().to_hex() + " message to orignal sender " + user.name) + send_event(reply_event, client=self.client, dvm_config=dvm_config) + + + elif status == "payment-required" or status == "partial": + for tag in nostr_event.tags(): + if tag.as_vec()[0] == "amount": + amount_msats = int(tag.as_vec()[1]) + amount = str(amount_msats / 1000) + + if len(tag.as_vec()) > 2: + bolt11 = tag.as_vec()[2] + entry = next((x for x in self.job_list if x['event_id'] == etag), None) + if entry is not None and entry['is_paid'] is False and entry['dvm_key'] == ptag: + + try: + payment_hash = pay_bolt11_ln_bits(bolt11, self.dvm_config) + self.job_list[self.job_list.index(entry)]['is_paid'] = True + print("[" + self.NAME + "] payment_hash: " + payment_hash + + " Forwarding payment of " + amount + " Sats to DVM") + except Exception as e: + print(e) + else: + print("not implemented: request bolt11 invoice") + # TODO request zap invoice + + except Exception as e: + print(e) + + def handle_nip90_response_event(nostr_event: Event): + try: + is_encrypted = False + for tag in nostr_event.tags(): + if tag.as_vec()[0] == "e": + etag = tag.as_vec()[1] + elif tag.as_vec()[0] == "p": + ptag = tag.as_vec()[1] + elif tag.as_vec()[0] == "encrypted": + is_encrypted = True + + entry = next((x for x in self.job_list if x['event_id'] == etag), None) + if entry is not None: + print(entry) + user = get_or_add_user(db=self.dvm_config.DB, npub=entry['npub'], + client=self.client, config=self.dvm_config) + + self.job_list.remove(entry) + content = nostr_event.content() + if is_encrypted: + content = nip04_decrypt(self.keys.secret_key(), nostr_event.pubkey(), content) + + print("[" + self.NAME + "] Received results, message to orignal sender " + user.name) + time.sleep(1.0) + reply_event = EventBuilder.new_encrypted_direct_msg(self.keys, + PublicKey.from_hex(user.npub), + content, + None).to_event(self.keys) + send_event(reply_event, client=self.client, dvm_config=dvm_config) + + except Exception as e: + print(e) + def handle_zap(zap_event): print("[" + self.NAME + "] Zap received") try: diff --git a/dvm.py b/dvm.py index 65c5604..eae9ab2 100644 --- a/dvm.py +++ b/dvm.py @@ -1,8 +1,9 @@ import json +import typing from datetime import timedelta from nostr_sdk import PublicKey, Keys, Client, Tag, Event, EventBuilder, Filter, HandleNotification, Timestamp, \ - init_logger, LogLevel, nip04_decrypt, Options + init_logger, LogLevel, nip04_decrypt, Options, nip04_encrypt import time @@ -10,8 +11,7 @@ from utils.definitions import EventDefinitions, RequiredJobToWatch, JobToWatch from utils.dvmconfig import DVMConfig from utils.admin_utils import admin_make_database_updates, AdminConfig from utils.backend_utils import get_amount_per_task, check_task_is_supported, get_task -from utils.database_utils import update_sql_table, \ - create_sql_table, get_or_add_user, update_user_balance +from utils.database_utils import create_sql_table, get_or_add_user, update_user_balance, update_sql_table from utils.nostr_utils import get_event_by_id, get_referenced_event_by_id, send_event from utils.output_utils import post_process_result, build_status_reaction from utils.zap_utils import check_bolt11_ln_bits_is_paid, create_bolt11_ln_bits, parse_zap_event_tags @@ -79,15 +79,53 @@ class DVM: handle_nip90_job_event(nostr_event) elif nostr_event.kind() == EventDefinitions.KIND_ZAP: handle_zap(nostr_event) - elif nostr_event.kind() == EventDefinitions.KIND_DM: - handle_dm(nostr_event) + # elif nostr_event.kind() == EventDefinitions.KIND_DM: + # handle_dm(nostr_event) def handle_msg(self, relay_url, msg): return def handle_nip90_job_event(nip90_event): + """ + :type nip90_event: Event + """ + p = "" + is_encrypted = False + tags: typing.List[Tag] + tags = nip90_event.tags() + + for tag in tags: + if tag.as_vec()[0] == 'encrypted': + is_encrypted = True + elif tag.as_vec()[0] == 'p': + p = tag.as_vec()[1] + + if is_encrypted: + if p != Keys.from_sk_str(self.dvm_config.PRIVATE_KEY).public_key().to_hex(): + print("[" + self.dvm_config.NIP89.name + "] Task encrypted and not addressed to this DVM, " + "skipping..") + return + + elif p == Keys.from_sk_str(self.dvm_config.PRIVATE_KEY).public_key().to_hex(): + print("encrypted") + encrypted_tag = Tag.parse(["encrypted"]) + p_tag = Tag.parse(["p", p]) + + tags_str = nip04_decrypt(Keys.from_sk_str(self.dvm_config.PRIVATE_KEY).secret_key(), + nip90_event.pubkey(), nip90_event.content()) + params = json.loads(tags_str) + + for element in params: + tags.append(Tag.parse(element)) + + # Keep the encrypted tag + tags.append(p_tag) + tags.append(encrypted_tag) + + nip90_event.tags = tags user = get_or_add_user(self.dvm_config.DB, nip90_event.pubkey().to_hex(), client=self.client, config=self.dvm_config) + task_supported, task, duration = check_task_is_supported(nip90_event, client=self.client, get_duration=(not user.iswhitelisted), config=self.dvm_config) @@ -107,17 +145,40 @@ class DVM: if dvm.TASK == task and dvm.COST == 0: task_is_free = True + + #if user is whitelisted or task is free, just do the job if user.iswhitelisted or task_is_free: print( "[" + self.dvm_config.NIP89.name + "] Free task or Whitelisted for task " + task + ". Starting processing..") + send_job_status_reaction(nip90_event, "processing", True, 0, client=self.client, dvm_config=self.dvm_config) - do_work(nip90_event, is_from_bot=False) + do_work(nip90_event) + # if task is directed to us via p tag and user has balance, do the job and update balance + elif p == Keys.from_sk_str(self.dvm_config.PRIVATE_KEY).public_key().to_hex(): + if user.balance > amount: + balance = max(user.balance - amount, 0) + update_sql_table(db=self.dvm_config.DB, npub=user.npub, balance=balance, + iswhitelisted=user.iswhitelisted, isblacklisted=user.isblacklisted, + nip05=user.nip05, lud16=user.lud16, name=user.name, + lastactive=Timestamp.now().as_secs()) + + print( + "[" + self.dvm_config.NIP89.name + "] Using user's balance for task: " + task + + + ". Starting processing.. New balance is: " + str(balance)) + + send_job_status_reaction(nip90_event, "processing", True, 0, + client=self.client, dvm_config=self.dvm_config) + + do_work(nip90_event) + + #else send a payment required event to user else: bid = 0 - for tag in nip90_event.tags(): + for tag in nip90_event.tags: if tag.as_vec()[0] == 'bid': bid = int(tag.as_vec()[1]) @@ -137,6 +198,7 @@ class DVM: nip90_event.id().to_hex()) send_job_status_reaction(nip90_event, "payment-required", False, amount, client=self.client, dvm_config=self.dvm_config) + else: print("Task not supported on this DVM, skipping..") @@ -161,7 +223,8 @@ class DVM: elif tag.as_vec()[0] == 'e': job_event = get_event_by_id(tag.as_vec()[1], client=self.client, config=self.dvm_config) - task_supported, task, duration = check_task_is_supported(job_event, client=self.client, + task_supported, task, duration = check_task_is_supported(job_event, + client=self.client, get_duration=False, config=self.dvm_config) if job_event is not None and task_supported: @@ -170,7 +233,7 @@ class DVM: send_job_status_reaction(job_event, "processing", client=self.client, dvm_config=self.dvm_config) indices = [i for i, x in enumerate(self.job_list) if - x.event_id == job_event.id().to_hex()] + x.event == job_event] index = -1 if len(indices) > 0: index = indices[0] @@ -183,10 +246,10 @@ class DVM: # If payment-required appears before processing self.job_list.pop(index) print("Starting work...") - do_work(job_event, is_from_bot=False) + do_work(job_event) else: print("Job not in List, but starting work...") - do_work(job_event, is_from_bot=False) + do_work(job_event) else: send_job_status_reaction(job_event, "payment-rejected", @@ -213,31 +276,9 @@ class DVM: except Exception as e: print(f"Error during content decryption: {e}") - def handle_dm(dm_event): - # Note that DVMs only listen and answer to Bots, if at all. - decrypted_text = nip04_decrypt(self.keys.secret_key(), dm_event.pubkey(), dm_event.content()) - ob = json.loads(decrypted_text) - - # One key might host multiple DVMs, so we check current task - if ob['task'] == self.dvm_config.SUPPORTED_DVMS[0].TASK: - input_type = "text" - print(decrypted_text) - if str(ob['input']).startswith("http"): - input_type = "url" - - # TODO Handle additional inputs/params - j_tag = Tag.parse(["j", self.dvm_config.SUPPORTED_DVMS[0].TASK]) - i_tag = Tag.parse(["i", ob['input'], input_type]) - - y_tag = Tag.parse(["y", dm_event.pubkey().to_hex()]) - z_tag = Tag.parse(["z", ob['sender']]) - tags = [j_tag, i_tag, y_tag, z_tag] - job_event = EventBuilder(EventDefinitions.KIND_DM, "", tags).to_event(self.keys) - - do_work(job_event, is_from_bot=True) - - def check_event_has_not_unfinished_job_input(nevent, append, client, dvm_config): - task_supported, task, duration = check_task_is_supported(nevent, client, False, config=dvm_config) + def check_event_has_not_unfinished_job_input(nevent, append, client, dvmconfig): + task_supported, task, duration = check_task_is_supported(nevent, client, False, + config=dvmconfig) if not task_supported: return False @@ -252,34 +293,36 @@ class DVM: if input_type == "job": evt = get_referenced_event_by_id(event_id=input, client=client, kinds=EventDefinitions.ANY_RESULT, - dvm_config=dvm_config) + dvm_config=dvmconfig) if evt is None: if append: - job = RequiredJobToWatch(event=nevent, timestamp=Timestamp.now().as_secs()) - self.jobs_on_hold_list.append(job) + job_ = RequiredJobToWatch(event=nevent, timestamp=Timestamp.now().as_secs()) + self.jobs_on_hold_list.append(job_) send_job_status_reaction(nevent, "chain-scheduled", True, 0, - client=client, dvm_config=dvm_config) + client=client, dvm_config=dvmconfig) return False else: return True - def check_and_return_event(data, original_event_str: str, is_from_bot: bool): + def check_and_return_event(data, original_event_str: str): original_event = Event.from_json(original_event_str) for x in self.job_list: - if x.event_id == original_event.id().to_hex(): + if x.event == original_event: is_paid = x.is_paid amount = x.amount x.result = data x.is_processed = True if self.dvm_config.SHOW_RESULT_BEFORE_PAYMENT and not is_paid: - send_nostr_reply_event(data, original_event_str, ) + send_nostr_reply_event(data, original_event_str) send_job_status_reaction(original_event, "success", amount, - dvm_config=self.dvm_config) # or payment-required, or both? + dvm_config=self.dvm_config, + ) # or payment-required, or both? elif not self.dvm_config.SHOW_RESULT_BEFORE_PAYMENT and not is_paid: send_job_status_reaction(original_event, "success", amount, - dvm_config=self.dvm_config) # or payment-required, or both? + dvm_config=self.dvm_config, + ) # or payment-required, or both? if self.dvm_config.SHOW_RESULT_BEFORE_PAYMENT and is_paid: self.job_list.remove(x) @@ -290,103 +333,68 @@ class DVM: try: post_processed_content = post_process_result(data, original_event) - - if is_from_bot: - # Reply to Bot - original_sender = "" - receiver_key = PublicKey - for tag in original_event.tags(): - if tag.as_vec()[0] == "y": # TODO we temporally use internal tags in Kind4 to move information - receiver_key = PublicKey.from_hex(tag.as_vec()[1]) - elif tag.as_vec()[0] == "z": - original_sender = tag.as_vec()[1] - - params = { - "result": post_processed_content, - "sender": original_sender - } - message = json.dumps(params) - print("[" + self.dvm_config.NIP89.name + "] " + message) - response_event = EventBuilder.new_encrypted_direct_msg(self.keys, receiver_key, message, - None).to_event(self.keys) - send_event(response_event, client=self.client, dvm_config=self.dvm_config) - else: - # Regular DVM reply - send_nostr_reply_event(post_processed_content, original_event_str) + send_nostr_reply_event(post_processed_content, original_event_str) except Exception as e: - respond_to_error(str(e), original_event_str, False) + send_job_status_reaction(original_event, "error", content=str(e), dvm_config=self.dvm_config, + ) def send_nostr_reply_event(content, original_event_as_str): original_event = Event.from_json(original_event_as_str) request_tag = Tag.parse(["request", original_event_as_str.replace("\\", "")]) e_tag = Tag.parse(["e", original_event.id().to_hex()]) - p_tag = Tag.parse(["p", original_event.pubkey().to_hex()]) + # p_tag = Tag.parse(["p", original_event.pubkey().to_hex()]) alt_tag = Tag.parse(["alt", "This is the result of a NIP90 DVM AI task with kind " + str( original_event.kind()) + ". The task was: " + original_event.content()]) status_tag = Tag.parse(["status", "success"]) - reply_tags = [request_tag, e_tag, p_tag, alt_tag, status_tag] + reply_tags = [request_tag, e_tag, alt_tag, status_tag] + encrypted = False + for tag in original_event.tags(): + if tag.as_vec()[0] == "encrypted": + encrypted = True + encrypted_tag = Tag.parse(["encrypted"]) + reply_tags.append(encrypted_tag) + for tag in original_event.tags(): if tag.as_vec()[0] == "i": - i_content = tag.as_vec()[1] - i_kind = tag.as_vec()[2] - i_tag = Tag.parse(["i", i_content, i_kind]) - reply_tags.append(i_tag) + i_tag = tag + if not encrypted: + reply_tags.append(i_tag) + elif tag.as_vec()[0] == "p": + p_tag = tag + reply_tags.append(p_tag) - key = Keys.from_sk_str(self.dvm_config.PRIVATE_KEY) + if encrypted: + content = nip04_encrypt(self.keys.secret_key(), PublicKey.from_hex(original_event.pubkey().to_hex()), + content) + + reply_event = EventBuilder(original_event.kind() + 1000, str(content), reply_tags).to_event(self.keys) - response_kind = original_event.kind() + 1000 - reply_event = EventBuilder(response_kind, str(content), reply_tags).to_event(key) send_event(reply_event, client=self.client, dvm_config=self.dvm_config) print("[" + self.dvm_config.NIP89.name + "] " + str( - response_kind) + " Job Response event sent: " + reply_event.as_json()) - return reply_event.as_json() - - def respond_to_error(content: str, original_event_as_str: str, is_from_bot=False): - print("ERROR: " + str(content)) - keys = Keys.from_sk_str(self.dvm_config.PRIVATE_KEY) - original_event = Event.from_json(original_event_as_str) - sender = "" - task = "" - if not is_from_bot: - send_job_status_reaction(original_event, "error", content=content, dvm_config=self.dvm_config) - # TODO Send Zap back - else: - for tag in original_event.tags(): - if tag.as_vec()[0] == "p": - sender = tag.as_vec()[1] - elif tag.as_vec()[0] == "i": - task = tag.as_vec()[1] - - user = get_or_add_user(db=self.dvm_config.DB, npub=sender, client=self.client, config=self.dvm_config) - if not user.iswhitelisted: - amount = int(user.balance) + get_amount_per_task(task, self.dvm_config) - update_sql_table(self.dvm_config.DB, sender, amount, user.iswhitelisted, user.isblacklisted, - user.nip05, user.lud16, user.name, Timestamp.now().as_secs()) - message = "There was the following error : " + content + ". Credits have been reimbursed" - else: - # User didn't pay, so no reimbursement - message = "There was the following error : " + content - - evt = EventBuilder.new_encrypted_direct_msg(keys, PublicKey.from_hex(sender), message, - None).to_event(keys) - send_event(evt, client=self.client, dvm_config=self.dvm_config) + original_event.kind() + 1000) + " Job Response event sent: " + reply_event.as_json()) def send_job_status_reaction(original_event, status, is_paid=True, amount=0, client=None, content=None, dvm_config=None): - dvm_config = dvm_config + task = get_task(original_event, client=client, dvmconfig=dvm_config) alt_description, reaction = build_status_reaction(status, task, amount, content) e_tag = Tag.parse(["e", original_event.id().to_hex()]) - p_tag = Tag.parse(["p", original_event.pubkey().to_hex()]) + # p_tag = Tag.parse(["p", original_event.pubkey().to_hex()]) alt_tag = Tag.parse(["alt", alt_description]) status_tag = Tag.parse(["status", status]) - tags = [e_tag, p_tag, alt_tag, status_tag] + + tags = [e_tag, alt_tag, status_tag] + for tag in original_event.tags: + + if tag.as_vec()[0] == "p": + p_tag = tag + tags.append(p_tag) if status == "success" or status == "error": # for x in self.job_list: - if x.event_id == original_event.id(): + if x.event == original_event: is_paid = x.is_paid amount = x.amount break @@ -401,15 +409,15 @@ class DVM: except Exception as e: print(e) - if not any(x.event_id == original_event.id().to_hex() for x in self.job_list): + if not any(x.event == original_event for x in self.job_list): self.job_list.append( - JobToWatch(event_id=original_event.id().to_hex(), + JobToWatch(event=original_event, timestamp=original_event.created_at().as_secs(), amount=amount, is_paid=is_paid, status=status, result="", is_processed=False, bolt11=bolt11, payment_hash=payment_hash, - expires=expires, from_bot=False)) + expires=expires)) # print(str(self.job_list)) if (status == "payment-required" or status == "payment-rejected" or ( status == "processing" and not is_paid) @@ -422,14 +430,13 @@ class DVM: tags.append(amount_tag) keys = Keys.from_sk_str(dvm_config.PRIVATE_KEY) - event = EventBuilder(EventDefinitions.KIND_FEEDBACK, reaction, tags).to_event(keys) - - send_event(event, client=self.client, dvm_config=self.dvm_config) + reaction_event = EventBuilder(EventDefinitions.KIND_FEEDBACK, reaction, tags).to_event(keys) + send_event(reaction_event, client=self.client, dvm_config=self.dvm_config) print("[" + self.dvm_config.NIP89.name + "]" + ": Sent Kind " + str( - EventDefinitions.KIND_FEEDBACK) + " Reaction: " + status + " " + event.as_json()) - return event.as_json() + EventDefinitions.KIND_FEEDBACK) + " Reaction: " + status + " " + reaction_event.as_json()) + return reaction_event.as_json() - def do_work(job_event, is_from_bot=False): + def do_work(job_event): if ((EventDefinitions.KIND_NIP90_EXTRACT_TEXT <= job_event.kind() <= EventDefinitions.KIND_NIP90_GENERIC) or job_event.kind() == EventDefinitions.KIND_DM): @@ -440,42 +447,36 @@ class DVM: request_form = dvm.create_request_form_from_nostr_event(job_event, self.client, self.dvm_config) result = dvm.process(request_form) - check_and_return_event(result, str(job_event.as_json()), is_from_bot=is_from_bot) + check_and_return_event(result, str(job_event.as_json())) except Exception as e: print(e) - respond_to_error(str(e), job_event.as_json(), is_from_bot=is_from_bot) + send_job_status_reaction(job_event, "error", content=str(e), dvm_config=self.dvm_config) return self.client.handle_notifications(NotificationHandler()) while True: for job in self.job_list: if job.bolt11 != "" and job.payment_hash != "" and not job.is_paid: - if str(check_bolt11_ln_bits_is_paid(job.payment_hash, self.dvm_config)) == "True": - job.is_paid = True - event = get_event_by_id(job.event_id, client=self.client, config=self.dvm_config) - if event is not None: - send_job_status_reaction(event, "processing", True, 0, - client=self.client, - dvm_config=self.dvm_config) - print("[" + self.dvm_config.NIP89.name + "] doing work from joblist") + ispaid = check_bolt11_ln_bits_is_paid(job.payment_hash, self.dvm_config) + if ispaid and job.is_paid is False: + print("is paid") - do_work(event, is_from_bot=False) - elif check_bolt11_ln_bits_is_paid(job.payment_hash, self.dvm_config) is None: # invoice expired - try: - self.job_list.remove(job) - except: - print("[" + self.dvm_config.NIP89.name + "] Error removing Job from List after payment") + job.is_paid = True + send_job_status_reaction(job.event, "processing", True, 0, + client=self.client, + dvm_config=self.dvm_config) + print("[" + self.dvm_config.NIP89.name + "] doing work from joblist") + do_work(job.event) + elif ispaid is None: # invoice expired + self.job_list.remove(job) if Timestamp.now().as_secs() > job.expires: - try: - self.job_list.remove(job) - except: - print("[" + self.dvm_config.NIP89.name + "] Error removing Job from List after expiry") + self.job_list.remove(job) for job in self.jobs_on_hold_list: if check_event_has_not_unfinished_job_input(job.event, False, client=self.client, - dvm_config=self.dvm_config): + dvmconfig=self.dvm_config): handle_nip90_job_event(nip90_event=job.event) try: self.jobs_on_hold_list.remove(job) diff --git a/main.py b/main.py index dfc08ec..c523c3c 100644 --- a/main.py +++ b/main.py @@ -22,6 +22,7 @@ def run_nostr_dvm_with_local_config(): bot_config = DVMConfig() bot_config.PRIVATE_KEY = os.getenv("BOT_PRIVATE_KEY") bot_config.LNBITS_INVOICE_KEY = os.getenv("LNBITS_INVOICE_KEY") + bot_config.LNBITS_ADMIN_KEY = os.getenv("LNBITS_ADMIN_KEY") # The bot will forward zaps for us, use responsibly bot_config.LNBITS_URL = os.getenv("LNBITS_HOST") # Spawn some DVMs in the playground and run them diff --git a/tasks/imagegeneration_openai_dalle.py b/tasks/imagegeneration_openai_dalle.py index 1b1784a..f2801a5 100644 --- a/tasks/imagegeneration_openai_dalle.py +++ b/tasks/imagegeneration_openai_dalle.py @@ -46,7 +46,7 @@ class ImageGenerationDALLE(DVMTaskInterface): model = "dall-e-3" quality = "standard" - for tag in event.tags(): + for tag in event.tags: if tag.as_vec()[0] == 'i': input_type = tag.as_vec()[2] if input_type == "text": diff --git a/tasks/imagegeneration_sdxl.py b/tasks/imagegeneration_sdxl.py index df07576..b9e5de2 100644 --- a/tasks/imagegeneration_sdxl.py +++ b/tasks/imagegeneration_sdxl.py @@ -55,7 +55,7 @@ class ImageGenerationSDXL(DVMTaskInterface): lora_weight = "" strength = "" guidance_scale = "" - for tag in event.tags(): + for tag in event.tags: if tag.as_vec()[0] == 'i': input_type = tag.as_vec()[2] if input_type == "text": diff --git a/tasks/textextractionpdf.py b/tasks/textextractionpdf.py index ba85899..1bf9639 100644 --- a/tasks/textextractionpdf.py +++ b/tasks/textextractionpdf.py @@ -43,7 +43,7 @@ class TextExtractionPDF(DVMTaskInterface): input_content = "" url = "" - for tag in event.tags(): + for tag in event.tags: if tag.as_vec()[0] == 'i': input_type = tag.as_vec()[2] input_content = tag.as_vec()[1] diff --git a/tasks/translation.py b/tasks/translation.py index d852265..8ed23d0 100644 --- a/tasks/translation.py +++ b/tasks/translation.py @@ -42,7 +42,7 @@ class Translation(DVMTaskInterface): text = "" translation_lang = "en" - for tag in event.tags(): + for tag in event.tags: if tag.as_vec()[0] == 'i': input_type = tag.as_vec()[2] @@ -52,20 +52,20 @@ class Translation(DVMTaskInterface): translation_lang = str(tag.as_vec()[2]).split('-')[0] if input_type == "event": - for tag in event.tags(): + for tag in event.tags: if tag.as_vec()[0] == 'i': evt = get_event_by_id(tag.as_vec()[1], client=client, config=dvm_config) text = evt.content() break elif input_type == "text": - for tag in event.tags(): + for tag in event.tags: if tag.as_vec()[0] == 'i': text = tag.as_vec()[1] break elif input_type == "job": - for tag in event.tags(): + for tag in event.tags: if tag.as_vec()[0] == 'i': evt = get_referenced_event_by_id(event_id=tag.as_vec()[1], client=client, kinds=[EventDefinitions.KIND_NIP90_RESULT_EXTRACT_TEXT, diff --git a/test_dvm_client.py b/test_dvm_client.py index 55d15a7..8455a28 100644 --- a/test_dvm_client.py +++ b/test_dvm_client.py @@ -84,11 +84,11 @@ def nostr_client(): EventDefinitions.KIND_FEEDBACK]).since(Timestamp.now())) # public events client.subscribe([dm_zap_filter, dvm_filter]) - nostr_client_test_translation("This is the result of the DVM in spanish", "text", "es", 20, 20) + #nostr_client_test_translation("This is the result of the DVM in spanish", "text", "es", 20, 20) #nostr_client_test_translation("note1p8cx2dz5ss5gnk7c59zjydcncx6a754c0hsyakjvnw8xwlm5hymsnc23rs", "event", "es", 20,20) #nostr_client_test_translation("44a0a8b395ade39d46b9d20038b3f0c8a11168e67c442e3ece95e4a1703e2beb", "event", "zh", 20, 20) - #nostr_client_test_image("a beautiful purple ostrich watching the sunset") + nostr_client_test_image("a beautiful purple ostrich watching the sunset") class NotificationHandler(HandleNotification): def handle(self, relay_url, event): print(f"Received new event from {relay_url}: {event.as_json()}") diff --git a/utils/backend_utils.py b/utils/backend_utils.py index b9b1281..afa2019 100644 --- a/utils/backend_utils.py +++ b/utils/backend_utils.py @@ -56,7 +56,7 @@ def check_task_is_supported(event, client, get_duration=False, config=None): input_type = "" duration = 1 task = get_task(event, client=client, dvmconfig=dvm_config) - for tag in event.tags(): + for tag in event.tags: if tag.as_vec()[0] == 'i': if len(tag.as_vec()) < 3: print("Job Event missing/malformed i tag, skipping..") diff --git a/utils/definitions.py b/utils/definitions.py index 28a173e..cc7140f 100644 --- a/utils/definitions.py +++ b/utils/definitions.py @@ -35,7 +35,7 @@ class EventDefinitions: @dataclass class JobToWatch: - event_id: str + event: str timestamp: int is_paid: bool amount: int @@ -45,8 +45,6 @@ class JobToWatch: bolt11: str payment_hash: str expires: int - from_bot: bool - @dataclass class RequiredJobToWatch: diff --git a/utils/dvmconfig.py b/utils/dvmconfig.py index a42d479..e6776ba 100644 --- a/utils/dvmconfig.py +++ b/utils/dvmconfig.py @@ -14,6 +14,7 @@ class DVMConfig: RELAY_TIMEOUT = 3 LNBITS_INVOICE_KEY = '' + LNBITS_ADMIN_KEY = '' # In order to pay invoices, e.g. from the bot to DVMs, or reimburse users. LNBITS_URL = 'https://lnbits.com' DB: str NEW_USER_BALANCE: int = 250 # Free credits for new users diff --git a/utils/output_utils.py b/utils/output_utils.py index 90aaa0d..5ca215c 100644 --- a/utils/output_utils.py +++ b/utils/output_utils.py @@ -17,7 +17,7 @@ Post process results to either given output format or a Nostr readable plain tex def post_process_result(anno, original_event): print("Post-processing...") if isinstance(anno, pandas.DataFrame): # if input is an anno we parse it to required output format - for tag in original_event.tags(): + for tag in original_event.tags: print(tag.as_vec()[0]) if tag.as_vec()[0] == "output": output_format = tag.as_vec()[1] diff --git a/utils/zap_utils.py b/utils/zap_utils.py index 728328c..880ad89 100644 --- a/utils/zap_utils.py +++ b/utils/zap_utils.py @@ -3,8 +3,8 @@ import json import requests from Crypto.Cipher import AES -from bech32 import bech32_decode, convertbits -from nostr_sdk import nostr_sdk, PublicKey, SecretKey, Event +from bech32 import bech32_decode, convertbits, bech32_encode +from nostr_sdk import nostr_sdk, PublicKey, SecretKey, Event, EventBuilder, Tag from utils.dvmconfig import DVMConfig from utils.nostr_utils import get_event_by_id @@ -91,10 +91,22 @@ def check_bolt11_ln_bits_is_paid(payment_hash: str, config: DVMConfig): try: res = requests.get(url, headers=headers) obj = json.loads(res.text) - return obj["paid"] #TODO cast + return obj["paid"] except Exception as e: return None +def pay_bolt11_ln_bits(bolt11: str, config: DVMConfig): + url = config.LNBITS_URL + "/api/v1/payments" + data = {'out': True, 'bolt11': bolt11} + headers = {'X-API-Key': config.LNBITS_ADMIN_KEY, 'Content-Type': 'application/json', 'charset': 'UTF-8'} + try: + res = requests.post(url, json=data, headers=headers) + obj = json.loads(res.text) + return obj["payment_hash"] + except Exception as e: + print("LNBITS: " + str(e)) + return None, None + # DECRYPT ZAPS def check_for_zapplepay(pubkey_hex: str, content: str): @@ -133,3 +145,18 @@ def decrypt_private_zap_message(msg: str, privkey: SecretKey, pubkey: PublicKey) return decoded except Exception as ex: return str(ex) + + +def zap_request(lud16, recipientPubkey, amount_in_sats, keys, dvm_config): + amount_tag = Tag.parse(['amount', str(amount_in_sats*1000)]) + relays_tag = Tag.parse(['relays', dvm_config.RELAY_LIST]) + p_tag = Tag.parse(['p', recipientPubkey]) + + + #_, encrypted_msg = bech32_encode(parts[0]) + + lnurl_tag = Tag.parse(['lnurl', recipientPubkey]) + + + zaprequest = EventBuilder(9734, "", + [amount_tag, relays_tag, p_tag,lnurl_tag ]).to_event(keys)