diff --git a/.env_example b/.env_example index ea271f8..2779cf8 100644 --- a/.env_example +++ b/.env_example @@ -1,9 +1,28 @@ -NOSTR_PRIVATE_KEY = nostrSecretkeyinhex -NOSTR_TEST_CLIENT_PRIVATE_KEY = nostrSecretkeyinhex_forthetestclient -USER_DB_PATH = nostrzaps.db +#This is needed for the test_client +NOSTR_TEST_CLIENT_PRIVATE_KEY = "a secret hex key for the test dvm client" +#This is needed for the (optional) bot +BOT_PRIVATE_KEY = "The private key for a test bot that communicates with dvms" -LNBITS_INVOICE_KEY = lnbitswalletinvoicekey -LNBITS_HOST = https://lnbits.com +#These are all for the playground and can be replaced and adjusted however needed +NOSTR_PRIVATE_KEY = "a secret hexkey for some demo dvms" +NOSTR_PRIVATE_KEY2 = "another secret hexkey for demo dvm with another key" +BOT_PRIVATE_KEY = "The private key for a test bot that communicates with dvms" +NOSTR_TEST_CLIENT_PRIVATE_KEY = "a secret hex key for the test dvm client" -TASK_TEXTEXTRACTION_NIP89_DTAG = "asdd" -TASK_TRANSLATION_NIP89_DTAG = abcded + +# Optional LNBITS options to create invoices (if empty, it will use the lud16 from profile, make sure to set one) +LNBITS_INVOICE_KEY = "" +LNBITS_HOST = "https://lnbits.com" + + +# Some d tags we use in the testfile to announce or dvms. Create one at vendata.io) +TASK_TEXT_EXTRACTION_NIP89_DTAG = "asdd" +TASK_TRANSLATION_NIP89_DTAG = "abcded" +TASK_IMAGE_GENERATION_NIP89_DTAG = "fgdfgdf" +TASK_IMAGE_GENERATION_NIP89_DTAG2 = "fdgdfg" +TASK_IMAGE_GENERATION_NIP89_DTAG3 = "asdasd" + + +#Backend Specific Options for tasks that require them +OPENAI_API_KEY = "" # Enter your OpenAI API Key to use DVMs with OpenAI services +NOVA_SERVER = "" # Enter the address of a nova-server instance, locally or on a machine in your network host:port \ No newline at end of file diff --git a/.gitignore b/.gitignore index 8bbe727..bd811a6 100644 --- a/.gitignore +++ b/.gitignore @@ -158,5 +158,5 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ -nostrzaps.db .DS_Store +*.db diff --git a/.idea/dataSources.xml b/.idea/dataSources.xml index 211f211..a9eee01 100644 --- a/.idea/dataSources.xml +++ b/.idea/dataSources.xml @@ -8,5 +8,41 @@ jdbc:sqlite:$PROJECT_DIR$/nostrzaps.db $ProjectFileDir$ + + sqlite.xerial + true + org.sqlite.JDBC + jdbc:sqlite:$PROJECT_DIR$/db/bot.db + $ProjectFileDir$ + + + file://$APPLICATION_CONFIG_DIR$/jdbc-drivers/Xerial SQLiteJDBC/3.43.0/org/xerial/sqlite-jdbc/3.43.0.0/sqlite-jdbc-3.43.0.0.jar + + + + + sqlite.xerial + true + org.sqlite.JDBC + jdbc:sqlite:$PROJECT_DIR$/db/Dall-E 3.db + $ProjectFileDir$ + + + file://$APPLICATION_CONFIG_DIR$/jdbc-drivers/Xerial SQLiteJDBC/3.43.0/org/xerial/sqlite-jdbc/3.43.0.0/sqlite-jdbc-3.43.0.0.jar + + + + + sqlite.xerial + true + org.sqlite.JDBC + jdbc:sqlite:$PROJECT_DIR$/db/Translator.db + $ProjectFileDir$ + + + file://$APPLICATION_CONFIG_DIR$/jdbc-drivers/Xerial SQLiteJDBC/3.43.0/org/xerial/sqlite-jdbc/3.43.0.0/sqlite-jdbc-3.43.0.0.jar + + + \ No newline at end of file diff --git a/README.md b/README.md index 637e042..3910177 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,28 @@ -# Nostr AI Data Vending Machine +# NostrAI: Nostr NIP90 Data Vending Machine Framework -This example DVM implementation in Python currently supports simple translations using Google translate, as well as extraction of text from links with pdf files. +This framework provides a way to easily build and/or run `Nostr NIP90 DVMs in Python`. -At a later stage, additional example tasks will be added, as well as the integration into a larger Machine Learning backend +This project is currently under development and additional tasks and features are added along the way. +This means the project is in alpha status, interfaces might still change/break. -Place .env file (based on .env_example) in main folder, install requirements.txt (python 3.10) run main.py. Optionally supports LNbits to create invoices instead of lnaddresses. +## To get started: +(Tested on Python 3.10) -Use vendata.io to create a nip89 announcement of your dvm and save the dtag in your .env config. +Create a new venv by running `"python -m venv venv"` + - Place .env file (based on .env_example) in main folder. + - Set your own private hex keys, create NIP89 dtags on vendata.io, + - Install requirements.txt + - Run python main.py. -A tutorial on how to add additional tasks, as well as the larger server backend will be added soon. +In `playground.py` some DVMs are already prepared. Feel free to play along with the existing ones. +You can also add new tasks by using the interface, just like the existing tasks in the `tasks` folder. + +A `bot` is running by default that lists and communicates with the `DVMs` added to it, +so your DVMs can be controled via any regular client as well. + +The Framework optionally supports `LNbits` to create invoices instead of using a `lightning address`. If LNBits is not used, +make sure your nostr accounts have a valid lightning address. + + +A tutorial on how to add additional tasks, as well as the larger server backend will be added at a later stage. diff --git a/backends/README.md b/backends/README.md new file mode 100644 index 0000000..c7a2bac --- /dev/null +++ b/backends/README.md @@ -0,0 +1,10 @@ +# NostrAI Data Vending Machine Backends + +Each DVM task might either run locally or use a specific backend. +Especially for GPU tasks it might make sense to outsource some tasks on other machines. +Backends can also be API calls to (paid) services. This directory contains basic calling functions to such backends. +Modules in the folder "tasks" might use these functions to call a specific backend. + +Using backends might require some extra work like running/hosting a server or acquiring an API key. + + diff --git a/backends/nova_server.py b/backends/nova_server.py new file mode 100644 index 0000000..a961082 --- /dev/null +++ b/backends/nova_server.py @@ -0,0 +1,107 @@ +import io +import json +import os +import time +import zipfile +import pandas as pd +import requests +import PIL.Image as Image + +from utils.output_utils import upload_media_to_hoster + +""" +This file contains basic calling functions for ML tasks that are outsourced to nova-server +(https://github.com/hcmlab/nova-server). nova-server is an Open-Source backend that enables running models locally + based on preefined modules (nova-server-modules), by accepting a request form. + Modules are deployed in in separate virtual environments so dependencies won't conflict. + +Setup nova-server: +https://hcmlab.github.io/nova-server/docbuild/html/tutorials/introduction.html + +""" + +""" +send_request_to_nova_server(request_form, address) +Function to send a request_form to the server, containing all the information we parsed from the Nostr event and added +in the module that is calling the server + +""" + + +def send_request_to_nova_server(request_form, address): + print("Sending job to NOVA-Server") + url = ('http://' + address + '/process') + headers = {'Content-type': 'application/x-www-form-urlencoded'} + response = requests.post(url, headers=headers, data=request_form) + return response.text + + +""" +check_nova_server_status(request_form, address) +Function that requests the status of the current process with the jobID (we use the Nostr event as jobID). +When the Job is successfully finished we grab the result and depending on the type return the output +We throw an exception on error +""" + + +def check_nova_server_status(jobID, address): + headers = {'Content-type': 'application/x-www-form-urlencoded'} + url_status = 'http://' + address + '/job_status' + url_log = 'http://' + address + '/log' + + print("Sending Status Request to NOVA-Server") + data = {"jobID": jobID} + + status = 0 + length = 0 + while status != 2 and status != 3: + response_status = requests.post(url_status, headers=headers, data=data) + response_log = requests.post(url_log, headers=headers, data=data) + status = int(json.loads(response_status.text)['status']) + log_content = str(json.loads(response_log.text)['message']).replace("ERROR", "").replace("INFO", "") + log = log_content[length:] + length = len(log_content) + if log != "": + print(log) + # WAITING = 0, RUNNING = 1, FINISHED = 2, ERROR = 3 + time.sleep(1.0) + + if status == 2: + try: + result = "" + url_fetch = 'http://' + address + '/fetch_result' + print("Fetching Results from NOVA-Server...") + data = {"jobID": jobID, "delete_after_download": True} + response = requests.post(url_fetch, headers=headers, data=data) + content_type = response.headers['content-type'] + print("Content-type: " + str(content_type)) + if content_type == "image/jpeg": + image = Image.open(io.BytesIO(response.content)) + image.save("./outputs/image.jpg") + result = upload_media_to_hoster("./outputs/image.jpg") + os.remove("./outputs/image.jpg") + elif content_type == 'text/plain; charset=utf-8': + result = response.content.decode('utf-8') + elif content_type == "zip": + zf = zipfile.ZipFile(io.BytesIO(response.content), "r") + + for fileinfo in zf.infolist(): + if fileinfo.filename.endswith(".annotation~"): + try: + anno_string = zf.read(fileinfo).decode('utf-8', errors='replace') + columns = ['from', 'to', 'name', 'conf'] + result = pd.DataFrame([row.split(';') for row in anno_string.split('\n')], + columns=columns) + print(result) + with open("response.zip", "wb") as f: + f.write(response.content) + except Exception as e: + #zf.extractall() + print(e) + + return result + except Exception as e: + print("Couldn't fetch result: " + str(e)) + + elif status == 3: + return "error" diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..55c5b7e --- /dev/null +++ b/bot.py @@ -0,0 +1,216 @@ +import json +import time +from datetime import timedelta +from threading import Thread + +from nostr_sdk import Keys, Client, Timestamp, Filter, nip04_decrypt, HandleNotification, EventBuilder, PublicKey, \ + Event, Options + +from utils.admin_utils import admin_make_database_updates +from utils.backend_utils import get_amount_per_task +from utils.database_utils import get_or_add_user, update_user_balance, create_sql_table, update_sql_table, User +from utils.definitions import EventDefinitions +from utils.nostr_utils import send_event, get_event_by_id +from utils.zap_utils import parse_amount_from_bolt11_invoice, check_for_zapplepay, decrypt_private_zap_message, \ + parse_zap_event_tags + + +class Bot: + def __init__(self, dvm_config, admin_config=None): + self.NAME = "Bot" + dvm_config.DB = "db/" + self.NAME + ".db" + self.dvm_config = dvm_config + self.admin_config = admin_config + self.keys = Keys.from_sk_str(dvm_config.PRIVATE_KEY) + wait_for_send = True + skip_disconnected_relays = True + opts = (Options().wait_for_send(wait_for_send).send_timeout(timedelta(seconds=self.dvm_config.RELAY_TIMEOUT)) + .skip_disconnected_relays(skip_disconnected_relays)) + self.client = Client.with_opts(self.keys, opts) + + pk = self.keys.public_key() + + print("Nostr BOT public key: " + str(pk.to_bech32()) + " Hex: " + str(pk.to_hex()) + " Name: " + self.NAME + + " Supported DVM tasks: " + + ', '.join(p.NAME + ":" + p.TASK for p in self.dvm_config.SUPPORTED_DVMS) + "\n") + + for relay in self.dvm_config.RELAY_LIST: + self.client.add_relay(relay) + self.client.connect() + + zap_filter = Filter().pubkey(pk).kinds([EventDefinitions.KIND_ZAP]).since(Timestamp.now()) + dm_filter = Filter().pubkey(pk).kinds([EventDefinitions.KIND_DM]).since(Timestamp.now()) + + self.client.subscribe([zap_filter, dm_filter]) + + create_sql_table(self.dvm_config.DB) + admin_make_database_updates(adminconfig=self.admin_config, dvmconfig=self.dvm_config, client=self.client) + + class NotificationHandler(HandleNotification): + client = self.client + dvm_config = self.dvm_config + keys = self.keys + + def handle(self, relay_url, nostr_event): + if nostr_event.kind() == EventDefinitions.KIND_DM: + handle_dm(nostr_event) + elif nostr_event.kind() == EventDefinitions.KIND_ZAP: + handle_zap(nostr_event) + + def handle_msg(self, relay_url, msg): + return + + def handle_dm(nostr_event): + sender = nostr_event.pubkey().to_hex() + + try: + decrypted_text = nip04_decrypt(self.keys.secret_key(), nostr_event.pubkey(), nostr_event.content()) + user = get_or_add_user(db=self.dvm_config.DB, npub=sender, client=self.client, config=self.dvm_config) + + + # We do a selection of tasks now, maybe change this later, Idk. + if decrypted_text[0].isdigit(): + index = int(decrypted_text.split(' ')[0]) - 1 + task = self.dvm_config.SUPPORTED_DVMS[index].TASK + print("[" + self.NAME + "] Request from " + str(user.name) + " (" + str(user.nip05) + ", Balance: " + + str(user.balance)+ " Sats) Task: " + str(task)) + + duration = 1 + required_amount = get_amount_per_task(self.dvm_config.SUPPORTED_DVMS[index].TASK, + self.dvm_config, duration) + + + if user.isblacklisted: + # For some reason an admin might blacklist npubs, e.g. for abusing the service + evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(), + "Your are currently blocked from all services.", + None).to_event(self.keys) + send_event(evt, client=self.client, dvm_config=dvm_config) + + elif user.iswhitelisted or user.balance >= required_amount or required_amount == 0: + if not user.iswhitelisted: + + balance = max(user.balance - required_amount, 0) + update_sql_table(db=self.dvm_config.DB, npub=user.npub, balance=balance, + iswhitelisted=user.iswhitelisted, isblacklisted=user.isblacklisted, + nip05=user.nip05, lud16=user.lud16, name=user.name, + lastactive=Timestamp.now().as_secs()) + + evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(), + "Your Job is now scheduled. New balance is " + + str(balance) + + " Sats.\nI will DM you once I'm done " + "processing.", + nostr_event.id()).to_event(self.keys) + else: + evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(), + "Your Job is now scheduled. As you are " + "whitelisted, your balance remains at" + + str(user.balance) + " Sats.\n" + "I will DM you once I'm " + "done processing.", + nostr_event.id()).to_event(self.keys) + + print("[" + self.NAME + "] Replying " + user.name + " with \"scheduled\" confirmation") + time.sleep(2.0) + send_event(evt, client=self.client, dvm_config=dvm_config) + + i_tag = decrypted_text.replace(decrypted_text.split(' ')[0] + " ", "") + # TODO more advanced logic, more parsing, params etc, just very basic test functions for now + dvm_keys = Keys.from_sk_str(self.dvm_config.SUPPORTED_DVMS[index].PK) + params = { + "sender": nostr_event.pubkey().to_hex(), + "input": i_tag, + "task": self.dvm_config.SUPPORTED_DVMS[index].TASK + } + message = json.dumps(params) + evt = EventBuilder.new_encrypted_direct_msg(self.keys, dvm_keys.public_key(), + message, None).to_event(self.keys) + print("[" + self.NAME + "] Forwarding task " + self.dvm_config.SUPPORTED_DVMS[index].TASK + + " for user " + user.name + " to " + self.dvm_config.SUPPORTED_DVMS[index].NAME) + send_event(evt, client=self.client, dvm_config=dvm_config) + else: + print("payment-required") + time.sleep(2.0) + evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(), + "Balance required, please zap me with at least " + + str(int(required_amount - user.balance)) + + " Sats, then try again.", + nostr_event.id()).to_event(self.keys) + time.sleep(2.0) + send_event(evt, client=self.client, dvm_config=dvm_config) + + + # TODO if we receive the result from one of the dvms, need some better management, maybe check for keys + elif decrypted_text.startswith('{"result":'): + + + dvm_result = json.loads(decrypted_text) + user_npub_hex = dvm_result["sender"] + user = get_or_add_user(db=self.dvm_config.DB, npub=user_npub_hex, + client=self.client, config=self.dvm_config) + print("[" + self.NAME + "] Received results, message to orignal sender " + user.name) + reply_event = EventBuilder.new_encrypted_direct_msg(self.keys, + PublicKey.from_hex(user.npub), + dvm_result["result"], + None).to_event(self.keys) + time.sleep(2.0) + send_event(reply_event, client=self.client, dvm_config=dvm_config) + + else: + print("[" + self.NAME + "] Message from " + user.name + ": " + decrypted_text) + message = "DVMs that I support:\n\n" + index = 1 + for p in self.dvm_config.SUPPORTED_DVMS: + message += str(index) + " " + p.NAME + " " + p.TASK + " " + str(p.COST) + " Sats" + "\n" + index += 1 + + evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(), + message + "\nSelect an Index and provide an input (" + "e.g. 1 A purple ostrich)", + None).to_event(self.keys) + #nostr_event.id()).to_event(self.keys) + time.sleep(3) + send_event(evt, client=self.client, dvm_config=dvm_config) + + except Exception as e: + pass + # TODO we still receive (broken content) events after fetching the metadata, but we don't listen to them. + # probably in client.get_events_of in fetch_user_metadata + print("Error in bot " + str(e)) + def handle_zap(zap_event): + print("[" + self.NAME + "] Zap received") + + try: + invoice_amount, zapped_event, sender, anon = parse_zap_event_tags(zap_event, + self.keys, self.NAME, + self.client, self.dvm_config) + + user = get_or_add_user(self.dvm_config.DB, sender, client=self.client, config=self.dvm_config) + + if zapped_event is not None: + if not anon: + print("[" + self.NAME + "] Note Zap received for Bot balance: " + str(invoice_amount) + " Sats from " + str( + user.name)) + update_user_balance(self.dvm_config.DB, sender, invoice_amount, client=self.client, + config=self.dvm_config) + + # a regular note + elif not anon: + print("[" + self.NAME + "] Profile Zap received for Bot balance: " + str(invoice_amount) + " Sats from " + str( + user.name)) + update_user_balance(self.dvm_config.DB, sender, invoice_amount, client=self.client, + config=self.dvm_config) + + except Exception as e: + print("[" + self.NAME + "] Error during content decryption:" + str(e)) + + self.client.handle_notifications(NotificationHandler()) + while True: + time.sleep(1.0) + + def run(self): + bot = Bot + nostr_dvm_thread = Thread(target=bot, args=[self.dvm_config]) + nostr_dvm_thread.start() + diff --git a/dvm.py b/dvm.py index c712ff1..d352346 100644 --- a/dvm.py +++ b/dvm.py @@ -1,150 +1,153 @@ -from nostr_sdk import PublicKey, Keys, Client, Tag, Event, EventBuilder, Filter, HandleNotification, Timestamp, \ - init_logger, LogLevel -import time -import emoji +import json +from datetime import timedelta -from utils.definitions import EventDefinitions, DVMConfig, RequiredJobToWatch, JobToWatch -from utils.admin_utils import admin_make_database_updates +from nostr_sdk import PublicKey, Keys, Client, Tag, Event, EventBuilder, Filter, HandleNotification, Timestamp, \ + init_logger, LogLevel, nip04_decrypt, EventId, Options + +import time + +from utils.definitions import EventDefinitions, RequiredJobToWatch, JobToWatch +from utils.dvmconfig import DVMConfig +from utils.admin_utils import admin_make_database_updates, AdminConfig from utils.backend_utils import get_amount_per_task, check_task_is_supported, get_task from utils.database_utils import update_sql_table, get_from_sql_table, \ create_sql_table, get_or_add_user, update_user_balance from utils.nostr_utils import get_event_by_id, get_referenced_event_by_id, send_event -from utils.output_utils import post_process_result -from utils.zap_utils import check_bolt11_ln_bits_is_paid, parse_bolt11_invoice, \ - check_for_zapplepay, decrypt_private_zap_message, create_bolt11_ln_bits +from utils.output_utils import post_process_result, build_status_reaction +from utils.zap_utils import check_bolt11_ln_bits_is_paid, parse_amount_from_bolt11_invoice, \ + check_for_zapplepay, decrypt_private_zap_message, create_bolt11_ln_bits, parse_zap_event_tags use_logger = False if use_logger: init_logger(LogLevel.DEBUG) -job_list = [] -jobs_on_hold_list = [] -dvm_config = DVMConfig() +class DVM: + dvm_config: DVMConfig + admin_config: AdminConfig + keys: Keys + client: Client + job_list: list + jobs_on_hold_list: list -def dvm(config): - dvm_config = config - keys = Keys.from_sk_str(dvm_config.PRIVATE_KEY) - pk = keys.public_key() + def __init__(self, dvmconfig, adminconfig=None): + self.dvm_config = dvmconfig + self.admin_config = adminconfig + self.keys = Keys.from_sk_str(dvmconfig.PRIVATE_KEY) + wait_for_send = True + skip_disconnected_relays = True + opts = (Options().wait_for_send(wait_for_send).send_timeout(timedelta(seconds=self.dvm_config.RELAY_TIMEOUT)) + .skip_disconnected_relays(skip_disconnected_relays)) - print(f"Nostr DVM public key: {pk.to_bech32()}, Hex: {pk.to_hex()} ") - print('Supported DVM tasks: ' + ', '.join(p.TASK for p in dvm_config.SUPPORTED_TASKS)) + self.client = Client.with_opts(self.keys, opts) - client = Client(keys) - for relay in dvm_config.RELAY_LIST: - client.add_relay(relay) - client.connect() + self.job_list = [] + self.jobs_on_hold_list = [] - dm_zap_filter = Filter().pubkey(pk).kinds([EventDefinitions.KIND_ZAP]).since(Timestamp.now()) + pk = self.keys.public_key() - kinds = [EventDefinitions.KIND_NIP90_GENERIC] - for dvm in dvm_config.SUPPORTED_TASKS: - if dvm.KIND not in kinds: - kinds.append(dvm.KIND) - dvm_filter = (Filter().kinds(kinds).since(Timestamp.now())) - client.subscribe([dm_zap_filter, dvm_filter]) + print("Nostr DVM public key: " + str(pk.to_bech32()) + " Hex: " + str(pk.to_hex()) + " Supported DVM tasks: " + + ', '.join(p.NAME + ":" + p.TASK for p in self.dvm_config.SUPPORTED_DVMS) + "\n") - create_sql_table() - admin_make_database_updates(config=dvm_config, client=client) + for relay in self.dvm_config.RELAY_LIST: + self.client.add_relay(relay) + self.client.connect() - class NotificationHandler(HandleNotification): - def handle(self, relay_url, nostr_event): - if EventDefinitions.KIND_NIP90_EXTRACT_TEXT <= nostr_event.kind() <= EventDefinitions.KIND_NIP90_GENERIC: - print(f"[Nostr] Received new NIP90 Job Request from {relay_url}: {nostr_event.as_json()}") - handle_nip90_job_event(nostr_event, dvm_config) - elif nostr_event.kind() == EventDefinitions.KIND_ZAP: - handle_zap(nostr_event, dvm_config) + zap_filter = Filter().pubkey(pk).kinds([EventDefinitions.KIND_ZAP]).since(Timestamp.now()) + bot_dm_filter = Filter().pubkey(pk).kinds([EventDefinitions.KIND_DM]).authors(self.dvm_config.DM_ALLOWED).since( + Timestamp.now()) - def handle_msg(self, relay_url, msg): - return + kinds = [EventDefinitions.KIND_NIP90_GENERIC] + for dvm in self.dvm_config.SUPPORTED_DVMS: + if dvm.KIND not in kinds: + kinds.append(dvm.KIND) + dvm_filter = (Filter().kinds(kinds).since(Timestamp.now())) + self.client.subscribe([dvm_filter, zap_filter, bot_dm_filter]) - def handle_nip90_job_event(event, dvm_config): - user = get_or_add_user(event.pubkey().to_hex()) - task_supported, task, duration = check_task_is_supported(event, client=client, - get_duration=(not user.iswhitelisted), - config=dvm_config) - print(task) + create_sql_table(self.dvm_config.DB) + admin_make_database_updates(adminconfig=self.admin_config, dvmconfig=self.dvm_config, client=self.client) - if user.isblacklisted: - send_job_status_reaction(event, "error", client=client, config=dvm_config) - print("[Nostr] Request by blacklisted user, skipped") + class NotificationHandler(HandleNotification): + client = self.client + dvm_config = self.dvm_config + keys = self.keys - elif task_supported: - print("Received new Task: " + task) - print(duration) - amount = get_amount_per_task(task, dvm_config, duration) - if amount is None: + def handle(self, relay_url, nostr_event): + if EventDefinitions.KIND_NIP90_EXTRACT_TEXT <= nostr_event.kind() <= EventDefinitions.KIND_NIP90_GENERIC: + print( + "[" + self.dvm_config.NIP89.name + "] " + f"Received new NIP90 Job Request from {relay_url}: {nostr_event.as_json()}") + handle_nip90_job_event(nostr_event) + elif nostr_event.kind() == EventDefinitions.KIND_ZAP: + handle_zap(nostr_event) + elif nostr_event.kind() == EventDefinitions.KIND_DM: + handle_dm(nostr_event) + + def handle_msg(self, relay_url, msg): return - task_is_free = False - for dvm in dvm_config.SUPPORTED_TASKS: - if dvm.TASK == task and dvm.COST == 0: - task_is_free = True + def handle_nip90_job_event(nip90_event): + user = get_or_add_user(self.dvm_config.DB, nip90_event.pubkey().to_hex(), client=self.client, + config=self.dvm_config) + task_supported, task, duration = check_task_is_supported(nip90_event, client=self.client, + get_duration=(not user.iswhitelisted), + config=self.dvm_config) - if user.iswhitelisted or task_is_free: - print("[Nostr] Free or Whitelisted for task " + task + ". Starting processing..") - send_job_status_reaction(event, "processing", True, 0, client=client, config=dvm_config) - do_work(event, is_from_bot=False) - # otherwise send payment request + if user.isblacklisted: + send_job_status_reaction(nip90_event, "error", client=self.client, dvm_config=self.dvm_config) + print("[" + self.dvm_config.NIP89.name + "] Request by blacklisted user, skipped") + + elif task_supported: + print("[" + self.dvm_config.NIP89.name + "] Received new Request: " + task + " from " + user.name) + amount = get_amount_per_task(task, self.dvm_config, duration) + if amount is None: + return + + task_is_free = False + for dvm in self.dvm_config.SUPPORTED_DVMS: + if dvm.TASK == task and dvm.COST == 0: + task_is_free = True + + if user.iswhitelisted or task_is_free: + print( + "[" + self.dvm_config.NIP89.name + "] Free task or Whitelisted for task " + task + ". Starting processing..") + send_job_status_reaction(nip90_event, "processing", True, 0, client=self.client, + dvm_config=self.dvm_config) + do_work(nip90_event, is_from_bot=False) + + else: + bid = 0 + for tag in nip90_event.tags(): + if tag.as_vec()[0] == 'bid': + bid = int(tag.as_vec()[1]) + + print( + "[" + self.dvm_config.NIP89.name + "] Payment required: New Nostr " + task + " Job event: " + nip90_event.as_json()) + if bid > 0: + bid_offer = int(bid / 1000) + if bid_offer >= amount: + send_job_status_reaction(nip90_event, "payment-required", False, + amount, # bid_offer + client=self.client, dvm_config=self.dvm_config) + + else: # If there is no bid, just request server rate from user + print( + "[" + self.dvm_config.NIP89.name + "] Requesting payment for Event: " + nip90_event.id().to_hex()) + send_job_status_reaction(nip90_event, "payment-required", + False, amount, client=self.client, dvm_config=self.dvm_config) else: - bid = 0 - for tag in event.tags(): - if tag.as_vec()[0] == 'bid': - bid = int(tag.as_vec()[1]) + print("Task not supported on this DVM, skipping..") - print("[Nostr][Payment required] New Nostr " + task + " Job event: " + event.as_json()) - if bid > 0: - bid_offer = int(bid / 1000) - if bid_offer >= amount: - send_job_status_reaction(event, "payment-required", False, - amount, # bid_offer - client=client, config=dvm_config) + def handle_zap(zap_event): + print("Zap received") - else: # If there is no bid, just request server rate from user - print("[Nostr] Requesting payment for Event: " + event.id().to_hex()) - send_job_status_reaction(event, "payment-required", - False, amount, client=client, config=dvm_config) - else: - print("Task not supported on this DVM, skipping..") + try: + invoice_amount, zapped_event, sender, anon = parse_zap_event_tags(zap_event, + self.keys, self.dvm_config.NIP89.name, + self.client, self.dvm_config) + user = get_or_add_user(db=self.dvm_config.DB, npub=sender, client=self.client, config=self.dvm_config) - def handle_zap(event, dvm_config): - zapped_event = None - invoice_amount = 0 - anon = False - sender = event.pubkey() - - try: - for tag in event.tags(): - if tag.as_vec()[0] == 'bolt11': - invoice_amount = parse_bolt11_invoice(tag.as_vec()[1]) - elif tag.as_vec()[0] == 'e': - zapped_event = get_event_by_id(tag.as_vec()[1], config=dvm_config) - elif tag.as_vec()[0] == 'description': - zap_request_event = Event.from_json(tag.as_vec()[1]) - sender = check_for_zapplepay(zap_request_event.pubkey().to_hex(), - zap_request_event.content()) - for ztag in zap_request_event.tags(): - if ztag.as_vec()[0] == 'anon': - if len(ztag.as_vec()) > 1: - print("Private Zap received.") - decrypted_content = decrypt_private_zap_message(ztag.as_vec()[1], - keys.secret_key(), - zap_request_event.pubkey()) - decrypted_private_event = Event.from_json(decrypted_content) - if decrypted_private_event.kind() == 9733: - sender = decrypted_private_event.pubkey().to_hex() - message = decrypted_private_event.content() - if message != "": - print("Zap Message: " + message) - else: - anon = True - print("Anonymous Zap received. Unlucky, I don't know from whom, and never will") - user = get_or_add_user(sender) - print(str(user)) - - if zapped_event is not None: - if zapped_event.kind() == EventDefinitions.KIND_FEEDBACK: # if a reaction by us got zapped - if not dvm_config.IS_BOT: + if zapped_event is not None: + if zapped_event.kind() == EventDefinitions.KIND_FEEDBACK: # if a reaction by us got zapped print("Zap received for NIP90 task: " + str(invoice_amount) + " Sats from " + str( user.name)) amount = 0 @@ -153,28 +156,29 @@ def dvm(config): if tag.as_vec()[0] == 'amount': amount = int(float(tag.as_vec()[1]) / 1000) elif tag.as_vec()[0] == 'e': - job_event = get_event_by_id(tag.as_vec()[1], config=dvm_config) + job_event = get_event_by_id(tag.as_vec()[1], client=self.client, config=self.dvm_config) - task_supported, task, duration = check_task_is_supported(job_event, client=client, - get_duration=False, config=dvm_config) + task_supported, task, duration = check_task_is_supported(job_event, client=self.client, + get_duration=False, + config=self.dvm_config) if job_event is not None and task_supported: if amount <= invoice_amount: - print("[Nostr] Payment-request fulfilled...") - send_job_status_reaction(job_event, "processing", client=client, - config=dvm_config) - indices = [i for i, x in enumerate(job_list) if + print("[" + self.dvm_config.NIP89.name + "] Payment-request fulfilled...") + send_job_status_reaction(job_event, "processing", client=self.client, + dvm_config=self.dvm_config) + indices = [i for i, x in enumerate(self.job_list) if x.event_id == job_event.id().to_hex()] index = -1 if len(indices) > 0: index = indices[0] if index > -1: - if job_list[index].is_processed: # If payment-required appears a processing - job_list[index].is_paid = True - check_and_return_event(job_list[index].result, str(job_event.as_json()), - dvm_key=dvm_config.PRIVATE_KEY) - elif not (job_list[index]).is_processed: + if self.job_list[index].is_processed: # If payment-required appears a processing + self.job_list[index].is_paid = True + check_and_return_event(self.job_list[index].result, + str(job_event.as_json())) + elif not (self.job_list[index]).is_processed: # If payment-required appears before processing - job_list.pop(index) + self.job_list.pop(index) print("Starting work...") do_work(job_event, is_from_bot=False) else: @@ -183,263 +187,293 @@ def dvm(config): else: send_job_status_reaction(job_event, "payment-rejected", - False, invoice_amount, client=client, config=dvm_config) - print("[Nostr] Invoice was not paid sufficiently") + False, invoice_amount, client=self.client, + dvm_config=self.dvm_config) + print("[" + self.dvm_config.NIP89.name + "] Invoice was not paid sufficiently") - elif zapped_event.kind() in EventDefinitions.ANY_RESULT: - print("Someone zapped the result of an exisiting Task. Nice") - elif not anon and not dvm_config.PASSIVE_MODE: - print("Note Zap received for Bot balance: " + str(invoice_amount) + " Sats from " + str( + elif zapped_event.kind() in EventDefinitions.ANY_RESULT: + print("Someone zapped the result of an exisiting Task. Nice") + elif not anon: + print("Note Zap received for Bot balance: " + str(invoice_amount) + " Sats from " + str( + user.name)) + update_user_balance(self.dvm_config.DB, sender, invoice_amount, client=self.client, + config=self.dvm_config) + + # a regular note + elif not anon: + print("Profile Zap received for Bot balance: " + str(invoice_amount) + " Sats from " + str( user.name)) - update_user_balance(sender, invoice_amount, config=dvm_config) + update_user_balance(self.dvm_config.DB, sender, invoice_amount, client=self.client, + config=self.dvm_config) - # a regular note - elif not anon and not dvm_config.PASSIVE_MODE: - print("Profile Zap received for Bot balance: " + str(invoice_amount) + " Sats from " + str( - user.name)) - update_user_balance(sender, invoice_amount, config=dvm_config) + except Exception as e: + print(f"Error during content decryption: {e}") - except Exception as e: - print(f"Error during content decryption: {e}") + def handle_dm(dm_event): + decrypted_text = nip04_decrypt(self.keys.secret_key(), dm_event.pubkey(), dm_event.content()) + ob = json.loads(decrypted_text) - def do_work(job_event, is_from_bot=False): - if ((EventDefinitions.KIND_NIP90_EXTRACT_TEXT <= job_event.kind() <= EventDefinitions.KIND_NIP90_GENERIC) - or job_event.kind() == EventDefinitions.KIND_DM): + # One key might host multiple DVMs, so we check current task + if ob['task'] == self.dvm_config.SUPPORTED_DVMS[0].TASK: + input_type = "text" + print(decrypted_text) + if str(ob['input']).startswith("http"): + input_type = "url" - task = get_task(job_event, client=client, dvmconfig=dvm_config) - for dvm in dvm_config.SUPPORTED_TASKS: - try: - if task == dvm.TASK: - request_form = dvm.create_request_form_from_nostr_event(job_event, client, dvm_config) - result = dvm.process(request_form) - check_and_return_event(result, str(job_event.as_json()), dvm_key=dvm_config.PRIVATE_KEY) + j_tag = Tag.parse(["j", self.dvm_config.SUPPORTED_DVMS[0].TASK]) + i_tag = Tag.parse(["i", ob['input'], input_type]) + tags = [j_tag, i_tag] + tags.append(Tag.parse(["y", dm_event.pubkey().to_hex()])) + tags.append(Tag.parse(["z", ob['sender']])) + job_event = EventBuilder(EventDefinitions.KIND_DM, "", tags).to_event(self.keys) - except Exception as e: - respond_to_error(e, job_event.as_json(), is_from_bot, dvm_config.PRIVATE_KEY) + do_work(job_event, is_from_bot=True) - def check_event_has_not_unfinished_job_input(nevent, append, client, dvmconfig): - task_supported, task, duration = check_task_is_supported(nevent, client, False, config=dvmconfig) - if not task_supported: - return False + def check_event_has_not_unfinished_job_input(nevent, append, client, dvmconfig): + task_supported, task, duration = check_task_is_supported(nevent, client, False, config=dvmconfig) + if not task_supported: + return False - for tag in nevent.tags(): - if tag.as_vec()[0] == 'i': - if len(tag.as_vec()) < 3: - print("Job Event missing/malformed i tag, skipping..") - return False - else: - input = tag.as_vec()[1] - input_type = tag.as_vec()[2] - if input_type == "job": - evt = get_referenced_event_by_id(input, EventDefinitions.ANY_RESULT, client, config=dvmconfig) - if evt is None: - if append: - job = RequiredJobToWatch(event=nevent, timestamp=Timestamp.now().as_secs()) - jobs_on_hold_list.append(job) - send_job_status_reaction(nevent, "chain-scheduled", True, 0, client=client, - config=dvmconfig) + for tag in nevent.tags(): + if tag.as_vec()[0] == 'i': + if len(tag.as_vec()) < 3: + print("Job Event missing/malformed i tag, skipping..") + return False + else: + input = tag.as_vec()[1] + input_type = tag.as_vec()[2] + if input_type == "job": + evt = get_referenced_event_by_id(event_id=input, client=client, + kinds=EventDefinitions.ANY_RESULT, + dvm_config=dvmconfig) + if evt is None: + if append: + job = RequiredJobToWatch(event=nevent, timestamp=Timestamp.now().as_secs()) + self.jobs_on_hold_list.append(job) + send_job_status_reaction(nevent, "chain-scheduled", True, 0, client=client, + dvm_config=dvmconfig) - return False - else: - return True - - def send_job_status_reaction(original_event, status, is_paid=True, amount=0, client=None, content=None, config=None, - key=None): - dvmconfig = config - altdesc = "This is a reaction to a NIP90 DVM AI task. " - task = get_task(original_event, client=client, dvmconfig=dvmconfig) - if status == "processing": - altdesc = "NIP90 DVM AI task " + task + " started processing. " - reaction = altdesc + emoji.emojize(":thumbs_up:") - elif status == "success": - altdesc = "NIP90 DVM AI task " + task + " finished successfully. " - reaction = altdesc + emoji.emojize(":call_me_hand:") - elif status == "chain-scheduled": - altdesc = "NIP90 DVM AI task " + task + " Chain Task scheduled" - reaction = altdesc + emoji.emojize(":thumbs_up:") - elif status == "error": - altdesc = "NIP90 DVM AI task " + task + " had an error. " - if content is None: - reaction = altdesc + emoji.emojize(":thumbs_down:") + return False else: - reaction = altdesc + emoji.emojize(":thumbs_down:") + content + return True - elif status == "payment-required": + def check_and_return_event(data, original_event_str: str, is_from_bot: bool): + original_event = Event.from_json(original_event_str) - altdesc = "NIP90 DVM AI task " + task + " requires payment of min " + str(amount) + " Sats. " - reaction = altdesc + emoji.emojize(":orange_heart:") - - elif status == "payment-rejected": - altdesc = "NIP90 DVM AI task " + task + " payment is below required amount of " + str(amount) + " Sats. " - reaction = altdesc + emoji.emojize(":thumbs_down:") - elif status == "user-blocked-from-service": - - altdesc = "NIP90 DVM AI task " + task + " can't be performed. User has been blocked from Service. " - reaction = altdesc + emoji.emojize(":thumbs_down:") - else: - reaction = emoji.emojize(":thumbs_down:") - - etag = Tag.parse(["e", original_event.id().to_hex()]) - ptag = Tag.parse(["p", original_event.pubkey().to_hex()]) - alttag = Tag.parse(["alt", altdesc]) - statustag = Tag.parse(["status", status]) - tags = [etag, ptag, alttag, statustag] - - if status == "success" or status == "error": # - for x in job_list: - if x.event_id == original_event.id(): + for x in self.job_list: + if x.event_id == original_event.id().to_hex(): is_paid = x.is_paid amount = x.amount + x.result = data + x.is_processed = True + if self.dvm_config.SHOW_RESULT_BEFORE_PAYMENT and not is_paid: + send_nostr_reply_event(data, original_event_str, ) + send_job_status_reaction(original_event, "success", amount, + dvm_config=self.dvm_config) # or payment-required, or both? + elif not self.dvm_config.SHOW_RESULT_BEFORE_PAYMENT and not is_paid: + send_job_status_reaction(original_event, "success", amount, + dvm_config=self.dvm_config) # or payment-required, or both? + + if self.dvm_config.SHOW_RESULT_BEFORE_PAYMENT and is_paid: + self.job_list.remove(x) + elif not self.dvm_config.SHOW_RESULT_BEFORE_PAYMENT and is_paid: + self.job_list.remove(x) + send_nostr_reply_event(data, original_event_str) break - bolt11 = "" - payment_hash = "" - expires = original_event.created_at().as_secs() + (60 * 60 * 24) - if status == "payment-required" or (status == "processing" and not is_paid): - if dvmconfig.LNBITS_INVOICE_KEY != "": - try: - bolt11, payment_hash = create_bolt11_ln_bits(amount, dvmconfig) - except Exception as e: - print(e) + try: + post_processed_content = post_process_result(data, original_event) - if not any(x.event_id == original_event.id().to_hex() for x in job_list): - job_list.append( - JobToWatch(event_id=original_event.id().to_hex(), timestamp=original_event.created_at().as_secs(), - amount=amount, - is_paid=is_paid, - status=status, result="", is_processed=False, bolt11=bolt11, payment_hash=payment_hash, - expires=expires, from_bot=False)) - print(str(job_list)) - if status == "payment-required" or status == "payment-rejected" or (status == "processing" and not is_paid) or ( - status == "success" and not is_paid): + if is_from_bot: + # Reply to Bot + for tag in original_event.tags(): + if tag.as_vec()[0] == "y": # TODO we temporally use internal tags to move information + receiver_key = PublicKey.from_hex(tag.as_vec()[1]) + elif tag.as_vec()[0] == "z": + original_sender = tag.as_vec()[1] - if dvmconfig.LNBITS_INVOICE_KEY != "": - amount_tag = Tag.parse(["amount", str(amount * 1000), bolt11]) - else: - amount_tag = Tag.parse(["amount", str(amount * 1000)]) # to millisats - tags.append(amount_tag) - if key is not None: - keys = Keys.from_sk_str(key) - else: - keys = Keys.from_sk_str(dvmconfig.PRIVATE_KEY) - event = EventBuilder(EventDefinitions.KIND_FEEDBACK, reaction, tags).to_event(keys) + params = { + "result": post_processed_content, + "sender": original_sender + } + message = json.dumps(params) + print(message) + response_event = EventBuilder.new_encrypted_direct_msg(self.keys, receiver_key, message, + None).to_event(self.keys) + send_event(response_event, client=self.client, dvm_config=self.dvm_config) + else: + # Regular DVM reply + send_nostr_reply_event(post_processed_content, original_event_str) + except Exception as e: + respond_to_error(str(e), original_event_str, False) - send_event(event, key=keys) - print( - "[Nostr] Sent Kind " + str(EventDefinitions.KIND_FEEDBACK) + " Reaction: " + status + " " + event.as_json()) - return event.as_json() - - def check_and_return_event(data, original_event_str: str, dvm_key=""): - original_event = Event.from_json(original_event_str) - keys = Keys.from_sk_str(dvm_key) - - for x in job_list: - if x.event_id == original_event.id().to_hex(): - is_paid = x.is_paid - amount = x.amount - x.result = data - x.is_processed = True - if dvm_config.SHOWRESULTBEFOREPAYMENT and not is_paid: - send_nostr_reply_event(data, original_event_str, key=keys) - send_job_status_reaction(original_event, "success", amount, - config=dvm_config) # or payment-required, or both? - elif not dvm_config.SHOWRESULTBEFOREPAYMENT and not is_paid: - send_job_status_reaction(original_event, "success", amount, - config=dvm_config) # or payment-required, or both? - - if dvm_config.SHOWRESULTBEFOREPAYMENT and is_paid: - job_list.remove(x) - elif not dvm_config.SHOWRESULTBEFOREPAYMENT and is_paid: - job_list.remove(x) - send_nostr_reply_event(data, original_event_str, key=keys) - break - - post_processed_content = post_process_result(data, original_event) - send_nostr_reply_event(post_processed_content, original_event_str, key=keys) - - def send_nostr_reply_event(content, original_event_as_str, key=None): - originalevent = Event.from_json(original_event_as_str) - requesttag = Tag.parse(["request", original_event_as_str.replace("\\", "")]) - etag = Tag.parse(["e", originalevent.id().to_hex()]) - ptag = Tag.parse(["p", originalevent.pubkey().to_hex()]) - alttag = Tag.parse(["alt", "This is the result of a NIP90 DVM AI task with kind " + str( - originalevent.kind()) + ". The task was: " + originalevent.content()]) - statustag = Tag.parse(["status", "success"]) - replytags = [requesttag, etag, ptag, alttag, statustag] - for tag in originalevent.tags(): - if tag.as_vec()[0] == "i": - icontent = tag.as_vec()[1] - ikind = tag.as_vec()[2] - itag = Tag.parse(["i", icontent, ikind]) - replytags.append(itag) - - if key is None: - key = Keys.from_sk_str(dvm_config.PRIVATE_KEY) - - response_kind = originalevent.kind() + 1000 - event = EventBuilder(response_kind, str(content), replytags).to_event(key) - send_event(event, key=key) - print("[Nostr] " + str(response_kind) + " Job Response event sent: " + event.as_json()) - return event.as_json() - - client.handle_notifications(NotificationHandler()) - - def respond_to_error(content, originaleventstr, is_from_bot=False, dvm_key=None): - print("ERROR") - if dvm_key is None: - keys = Keys.from_sk_str(dvm_config.PRIVATE_KEY) - else: - keys = Keys.from_sk_str(dvm_key) - - original_event = Event.from_json(originaleventstr) - sender = "" - task = "" - if not is_from_bot: - send_job_status_reaction(original_event, "error", content=content, key=dvm_key) - # TODO Send Zap back - else: + def send_nostr_reply_event(content, original_event_as_str): + original_event = Event.from_json(original_event_as_str) + request_tag = Tag.parse(["request", original_event_as_str.replace("\\", "")]) + e_tag = Tag.parse(["e", original_event.id().to_hex()]) + p_tag = Tag.parse(["p", original_event.pubkey().to_hex()]) + alt_tag = Tag.parse(["alt", "This is the result of a NIP90 DVM AI task with kind " + str( + original_event.kind()) + ". The task was: " + original_event.content()]) + status_tag = Tag.parse(["status", "success"]) + reply_tags = [request_tag, e_tag, p_tag, alt_tag, status_tag] for tag in original_event.tags(): - if tag.as_vec()[0] == "p": - sender = tag.as_vec()[1] - elif tag.as_vec()[0] == "i": - task = tag.as_vec()[1] + if tag.as_vec()[0] == "i": + i_content = tag.as_vec()[1] + i_kind = tag.as_vec()[2] + i_tag = Tag.parse(["i", i_content, i_kind]) + reply_tags.append(i_tag) - user = get_from_sql_table(sender) - if not user.iswhitelisted: - amount = int(user.balance) + get_amount_per_task(task, dvm_config) - update_sql_table(sender, amount, user.iswhitelisted, user.isblacklisted, user.nip05, user.lud16, user.name, - Timestamp.now().as_secs()) - message = "There was the following error : " + content + ". Credits have been reimbursed" + key = Keys.from_sk_str(self.dvm_config.PRIVATE_KEY) + + response_kind = original_event.kind() + 1000 + reply_event = EventBuilder(response_kind, str(content), reply_tags).to_event(key) + send_event(reply_event, client=self.client, dvm_config=self.dvm_config) + print("[" + self.dvm_config.NIP89.name + "] " + str( + response_kind) + " Job Response event sent: " + reply_event.as_json()) + return reply_event.as_json() + + def respond_to_error(content: str, original_event_as_str: str, is_from_bot=False): + print("ERROR: " + str(content)) + keys = Keys.from_sk_str(self.dvm_config.PRIVATE_KEY) + original_event = Event.from_json(original_event_as_str) + sender = "" + task = "" + if not is_from_bot: + send_job_status_reaction(original_event, "error", content=content, dvm_config=self.dvm_config) + # TODO Send Zap back else: - # User didn't pay, so no reimbursement - message = "There was the following error : " + content + for tag in original_event.tags(): + if tag.as_vec()[0] == "p": + sender = tag.as_vec()[1] + elif tag.as_vec()[0] == "i": + task = tag.as_vec()[1] - evt = EventBuilder.new_encrypted_direct_msg(keys, PublicKey.from_hex(sender), message, None).to_event(keys) - send_event(evt, key=keys) + user = get_or_add_user(db=self.dvm_config.DB, npub=sender, client=self.client, config=self.dvm_config) + if not user.iswhitelisted: + amount = int(user.balance) + get_amount_per_task(task, self.dvm_config) + update_sql_table(self.dvm_config.DB, sender, amount, user.iswhitelisted, user.isblacklisted, + user.nip05, user.lud16, user.name, Timestamp.now().as_secs()) + message = "There was the following error : " + content + ". Credits have been reimbursed" + else: + # User didn't pay, so no reimbursement + message = "There was the following error : " + content - while True: - for job in job_list: - if job.bolt11 != "" and job.payment_hash != "" and not job.is_paid: - if str(check_bolt11_ln_bits_is_paid(job.payment_hash, dvm_config)) == "True": - job.is_paid = True - event = get_event_by_id(job.event_id, config=dvm_config) - if event != None: - send_job_status_reaction(event, "processing", True, 0, client=client, config=dvm_config) - print("do work from joblist") + evt = EventBuilder.new_encrypted_direct_msg(keys, PublicKey.from_hex(sender), message, + None).to_event(keys) + send_event(evt, client=self.client, dvm_config=self.dvm_config) - do_work(event, is_from_bot=False) - elif check_bolt11_ln_bits_is_paid(job.payment_hash, dvm_config) is None: # invoice expired - job_list.remove(job) + def send_job_status_reaction(original_event, status, is_paid=True, amount=0, client=None, + content=None, + dvm_config=None): + dvm_config = dvm_config + task = get_task(original_event, client=client, dvmconfig=dvm_config) + alt_description, reaction = build_status_reaction(status, task, amount, content) - if Timestamp.now().as_secs() > job.expires: - job_list.remove(job) + e_tag = Tag.parse(["e", original_event.id().to_hex()]) + p_tag = Tag.parse(["p", original_event.pubkey().to_hex()]) + alt_tag = Tag.parse(["alt", alt_description]) + status_tag = Tag.parse(["status", status]) + tags = [e_tag, p_tag, alt_tag, status_tag] - for job in jobs_on_hold_list: - if check_event_has_not_unfinished_job_input(job.event, False, client=client, dvmconfig=dvm_config): - handle_nip90_job_event(job.event) - jobs_on_hold_list.remove(job) + if status == "success" or status == "error": # + for x in self.job_list: + if x.event_id == original_event.id(): + is_paid = x.is_paid + amount = x.amount + break - if Timestamp.now().as_secs() > job.timestamp + 60 * 20: # remove jobs to look for after 20 minutes.. - jobs_on_hold_list.remove(job) + bolt11 = "" + payment_hash = "" + expires = original_event.created_at().as_secs() + (60 * 60 * 24) + if status == "payment-required" or (status == "processing" and not is_paid): + if dvm_config.LNBITS_INVOICE_KEY != "": + try: + bolt11, payment_hash = create_bolt11_ln_bits(amount, dvm_config) + except Exception as e: + print(e) - time.sleep(1.0) + if not any(x.event_id == original_event.id().to_hex() for x in self.job_list): + self.job_list.append( + JobToWatch(event_id=original_event.id().to_hex(), + timestamp=original_event.created_at().as_secs(), + amount=amount, + is_paid=is_paid, + status=status, result="", is_processed=False, bolt11=bolt11, + payment_hash=payment_hash, + expires=expires, from_bot=False)) + # print(str(self.job_list)) + if (status == "payment-required" or status == "payment-rejected" or ( + status == "processing" and not is_paid) + or (status == "success" and not is_paid)): + + if dvm_config.LNBITS_INVOICE_KEY != "": + amount_tag = Tag.parse(["amount", str(amount * 1000), bolt11]) + else: + amount_tag = Tag.parse(["amount", str(amount * 1000)]) # to millisats + tags.append(amount_tag) + + keys = Keys.from_sk_str(dvm_config.PRIVATE_KEY) + event = EventBuilder(EventDefinitions.KIND_FEEDBACK, reaction, tags).to_event(keys) + + send_event(event, client=self.client, dvm_config=self.dvm_config) + print("[" + self.dvm_config.NIP89.name + "]" + ": Sent Kind " + str( + EventDefinitions.KIND_FEEDBACK) + " Reaction: " + status + " " + event.as_json()) + return event.as_json() + + def do_work(job_event, is_from_bot=False): + if ((EventDefinitions.KIND_NIP90_EXTRACT_TEXT <= job_event.kind() <= EventDefinitions.KIND_NIP90_GENERIC) + or job_event.kind() == EventDefinitions.KIND_DM): + + task = get_task(job_event, client=self.client, dvmconfig=self.dvm_config) + for dvm in self.dvm_config.SUPPORTED_DVMS: + try: + if task == dvm.TASK: + request_form = dvm.create_request_form_from_nostr_event(job_event, self.client, + self.dvm_config) + result = dvm.process(request_form) + check_and_return_event(result, str(job_event.as_json()), is_from_bot=is_from_bot) + + except Exception as e: + print(e) + respond_to_error(str(e), job_event.as_json(), is_from_bot=is_from_bot) + return + + self.client.handle_notifications(NotificationHandler()) + while True: + for job in self.job_list: + if job.bolt11 != "" and job.payment_hash != "" and not job.is_paid: + if str(check_bolt11_ln_bits_is_paid(job.payment_hash, self.dvm_config)) == "True": + job.is_paid = True + event = get_event_by_id(job.event_id, client=self.client, config=self.dvm_config) + if event is not None: + send_job_status_reaction(event, "processing", True, 0, + client=self.client, + dvm_config=self.dvm_config) + print("do work from joblist") + + do_work(event, is_from_bot=False) + elif check_bolt11_ln_bits_is_paid(job.payment_hash, self.dvm_config) is None: # invoice expired + try: + self.job_list.remove(job) + except: + print("Error removing Job from List after payment") + + if Timestamp.now().as_secs() > job.expires: + try: + self.job_list.remove(job) + except: + print("Error removing Job from List after expiry") + + for job in self.jobs_on_hold_list: + if check_event_has_not_unfinished_job_input(job.event, False, client=self.client, + dvmconfig=self.dvm_config): + handle_nip90_job_event(nip90_event=job.event) + try: + self.jobs_on_hold_list.remove(job) + except: + print("Error removing Job on Hold from List after expiry") + + if Timestamp.now().as_secs() > job.timestamp + 60 * 20: # remove jobs to look for after 20 minutes.. + self.jobs_on_hold_list.remove(job) + + time.sleep(1.0) diff --git a/interfaces/dvmtaskinterface.py b/interfaces/dvmtaskinterface.py index 25316c0..10fa5a4 100644 --- a/interfaces/dvmtaskinterface.py +++ b/interfaces/dvmtaskinterface.py @@ -1,11 +1,48 @@ +import json +from threading import Thread + +from utils.admin_utils import AdminConfig +from utils.dvmconfig import DVMConfig +from utils.nip89_utils import NIP89Announcement, NIP89Config +from dvm import DVM + + class DVMTaskInterface: + NAME: str KIND: int TASK: str COST: int + PK: str + DVM = DVM + dvm_config: DVMConfig + admin_config: AdminConfig + + def NIP89_announcement(self, nip89config: NIP89Config): + nip89 = NIP89Announcement() + nip89.name = self.NAME + nip89.kind = self.KIND + nip89.pk = self.PK + nip89.dtag = nip89config.DTAG + nip89.content = nip89config.CONTENT + return nip89 + + def init(self, name, dvm_config, admin_config, nip89config): + self.NAME = name + self.PK = dvm_config.PRIVATE_KEY + if dvm_config.COST is not None: + self.COST = dvm_config.COST + + dvm_config.SUPPORTED_DVMS = [self] + dvm_config.DB = "db/" + self.NAME + ".db" + dvm_config.NIP89 = self.NIP89_announcement(nip89config) + self.dvm_config = dvm_config + self.admin_config = admin_config + + + def run(self): + nostr_dvm_thread = Thread(target=self.DVM, args=[self.dvm_config, self.admin_config]) + nostr_dvm_thread.start() - def NIP89_announcement(self): - """Define the NIP89 Announcement""" - pass def is_input_supported(self, input_type, input_content) -> bool: """Check if input is supported for current Task.""" @@ -20,13 +57,11 @@ class DVMTaskInterface: pass @staticmethod - def setOptions(request_form): + def set_options(request_form): print("Setting options...") opts = [] - if request_form.get("optStr"): - for k, v in [option.split("=") for option in request_form["optStr"].split(";")]: - t = (k, v) - opts.append(t) - print(k + "=" + v) - print("...done.") + if request_form.get("options"): + opts = json.loads(request_form["options"]) + print(opts) + return dict(opts) diff --git a/main.py b/main.py index b8a12f1..dfc08ec 100644 --- a/main.py +++ b/main.py @@ -1,49 +1,80 @@ import os +import signal +import sys +import time from pathlib import Path from threading import Thread import dotenv -import utils.env as env -from tasks.textextractionPDF import TextExtractionPDF -from tasks.translation import Translation -from utils.definitions import EventDefinitions +from nostr_sdk import Keys + +from bot import Bot +from playground import build_pdf_extractor, build_translator, build_unstable_diffusion, build_sketcher, build_dalle +from utils.dvmconfig import DVMConfig def run_nostr_dvm_with_local_config(): - from dvm import dvm, DVMConfig + # We extract the Publickey from our bot, so the DVMs know who they should listen and react to. + bot_publickey = Keys.from_sk_str(os.getenv("BOT_PRIVATE_KEY")).public_key() - dvmconfig = DVMConfig() - dvmconfig.PRIVATE_KEY = os.getenv(env.NOSTR_PRIVATE_KEY) + # We will run an optional bot that can communicate with the DVMs + # Note this is very basic for now and still under development + bot_config = DVMConfig() + bot_config.PRIVATE_KEY = os.getenv("BOT_PRIVATE_KEY") + bot_config.LNBITS_INVOICE_KEY = os.getenv("LNBITS_INVOICE_KEY") + bot_config.LNBITS_URL = os.getenv("LNBITS_HOST") - #Spawn two DVMs - PDFextactor = TextExtractionPDF("PDF Extractor", env.NOSTR_PRIVATE_KEY) - Translator = Translation("Translator", env.NOSTR_PRIVATE_KEY) + # Spawn some DVMs in the playground and run them + # You can add arbitrary DVMs there and instantiate them here - #Add the 2 DVMS to the config - dvmconfig.SUPPORTED_TASKS = [PDFextactor, Translator] + # Spawn DVM1 Kind 5000: A local Text Extractor from PDFs + pdfextractor = build_pdf_extractor("PDF Extractor", [bot_publickey]) + # If we don't add it to the bot, the bot will not provide access to the DVM + pdfextractor.run() - # Add NIP89 events for both DVMs (set rebroad_cast = True in admin_utils) - # Add the dtag in your .env file so you can update your dvm later and change the content in the module file as needed. - # Get a dtag at vendata.io - dvmconfig.NIP89s.append(PDFextactor.NIP89_announcement()) - dvmconfig.NIP89s.append(Translator.NIP89_announcement()) + # Spawn DVM2 Kind 5002 Local Text Translation, calling the free Google API. + translator = build_translator("Translator", [bot_publickey]) + bot_config.SUPPORTED_DVMS.append(translator) # We add translator to the bot + translator.run() - #SET Lnbits Invoice Key and Server if DVM should provide invoices directly, else make sure you have a lnaddress on the profile - dvmconfig.LNBITS_INVOICE_KEY = os.getenv(env.LNBITS_INVOICE_KEY) - dvmconfig.LNBITS_URL = os.getenv(env.LNBITS_HOST) + # Spawn DVM3 Kind 5100 Image Generation This one uses a specific backend called nova-server. + # If you want to use it, see the instructions in backends/nova_server + if os.getenv("NOVA_SERVER") is not None and os.getenv("NOVA_SERVER") != "": + unstable_artist = build_unstable_diffusion("Unstable Diffusion", [bot_publickey]) + bot_config.SUPPORTED_DVMS.append(unstable_artist) # We add unstable Diffusion to the bot + unstable_artist.run() - #Start the DVM - nostr_dvm_thread = Thread(target=dvm, args=[dvmconfig]) - nostr_dvm_thread.start() + # Spawn DVM4, another Instance of text-to-image, as before but use a different privatekey, model and lora this time. + if os.getenv("NOVA_SERVER") is not None and os.getenv("NOVA_SERVER") != "": + sketcher = build_sketcher("Sketcher", [bot_publickey]) + bot_config.SUPPORTED_DVMS.append(sketcher) # We also add Sketcher to the bot + sketcher.run() + + # Spawn DVM5, this one requires an OPENAI API Key and balance with OpenAI, you will move the task to them and pay + # per call. Make sure you have enough balance and the DVM's cost is set higher than what you pay yourself, except, you know, + # you're being generous. + if os.getenv("OPENAI_API_KEY") is not None and os.getenv("OPENAI_API_KEY") != "": + dalle = build_dalle("Dall-E 3", [bot_publickey]) + bot_config.SUPPORTED_DVMS.append(dalle) + dalle.run() + + bot = Bot(bot_config) + bot.run() + + # Keep the main function alive for libraries like openai + try: + while True: + time.sleep(10) + except KeyboardInterrupt: + print('Stay weird!') + os.kill(os.getpid(), signal.SIGKILL) if __name__ == '__main__': - env_path = Path('.env') if env_path.is_file(): print(f'loading environment from {env_path.resolve()}') dotenv.load_dotenv(env_path, verbose=True, override=True) else: raise FileNotFoundError(f'.env file not found at {env_path} ') - run_nostr_dvm_with_local_config() diff --git a/playground.py b/playground.py new file mode 100644 index 0000000..9666a9f --- /dev/null +++ b/playground.py @@ -0,0 +1,215 @@ +import json +import os + +from tasks.imagegeneration_openai_dalle import ImageGenerationDALLE +from tasks.imagegeneration_sdxl import ImageGenerationSDXL +from tasks.textextractionpdf import TextExtractionPDF +from tasks.translation import Translation +from utils.admin_utils import AdminConfig +from utils.dvmconfig import DVMConfig +from utils.nip89_utils import NIP89Config + +""" +This File is a playground to create DVMs. It shows some examples of DVMs that make use of the modules in the tasks folder +These DVMs should be considered examples and will be extended in the future. env variables are used to not commit keys, +but if used privatley, these can also be directly filled in this file. The main.py function calls some of the functions +defined here and starts the DVMs. + +Note that the admin_config is optional, and if given commands as defined in admin_utils will be called at start of the +DVM. For example the NIP89 event can be rebroadcasted (store the d_tag somewhere). + +DM_ALLOWED is used to tell the DVM to which npubs it should listen to. We use this here to listen to our bot, +as defined in main.py to perform jobs on it's behalf and reply. + +if LNBITS_INVOICE_KEY is not set (=""), the DVM is still zappable but a lud16 address in required in the profile. + +additional options can be set, for example to preinitalize vaiables or give parameters that are required to perform a +task, for example an address or an API key. + + +""" + +# Generate an optional Admin Config, in this case, whenever we give our DVMs this config, they will (re)broadcast +# their NIP89 announcement +admin_config = AdminConfig() +admin_config.REBROADCAST_NIP89 = False +# Set rebroadcast to true once you have set your NIP89 descriptions and d tags. You only need to rebroadcast once you +# want to update your NIP89 descriptions + + +def build_pdf_extractor(name, dm_allowed_keys): + dvm_config = DVMConfig() + dvm_config.PRIVATE_KEY = os.getenv("NOSTR_PRIVATE_KEY") + dvm_config.LNBITS_INVOICE_KEY = os.getenv("LNBITS_INVOICE_KEY") + dvm_config.LNBITS_URL = os.getenv("LNBITS_HOST") + dvm_config.DM_ALLOWED = dm_allowed_keys + # Add NIP89 + nip90params = {} + nip89info = { + "name": name, + "image": "https://image.nostr.build/c33ca6fc4cc038ca4adb46fdfdfda34951656f87ee364ef59095bae1495ce669.jpg", + "about": "I extract text from pdf documents", + "nip90Params": nip90params + } + + nip89config = NIP89Config() + nip89config.DTAG = os.getenv("TASK_TEXT_EXTRACTION_NIP89_DTAG") + nip89config.CONTENT = json.dumps(nip89info) + return TextExtractionPDF(name=name, dvm_config=dvm_config, nip89config=nip89config, + admin_config=admin_config) + + +def build_translator(name, dm_allowed_keys): + dvm_config = DVMConfig() + dvm_config.PRIVATE_KEY = os.getenv("NOSTR_PRIVATE_KEY") + dvm_config.LNBITS_INVOICE_KEY = os.getenv("LNBITS_INVOICE_KEY") + dvm_config.LNBITS_URL = os.getenv("LNBITS_HOST") + dvm_config.DM_ALLOWED = dm_allowed_keys + + nip90params = { + "language": { + "required": False, + "values": ["en", "az", "be", "bg", "bn", "bs", "ca", "ceb", "co", "cs", "cy", "da", "de", "el", "eo", "es", + "et", "eu", "fa", "fi", "fr", "fy", "ga", "gd", "gl", "gu", "ha", "haw", "hi", "hmn", "hr", "ht", + "hu", "hy", "id", "ig", "is", "it", "he", "ja", "jv", "ka", "kk", "km", "kn", "ko", "ku", "ky", + "la", "lb", "lo", "lt", "lv", "mg", "mi", "mk", "ml", "mn", "mr", "ms", "mt", "my", "ne", "nl", + "no", "ny", "or", "pa", "pl", "ps", "pt", "ro", "ru", "sd", "si", "sk", "sl", "sm", "sn", "so", + "sq", "sr", "st", "su", "sv", "sw", "ta", "te", "tg", "th", "tl", "tr", "ug", "uk", "ur", "uz", + "vi", "xh", "yi", "yo", "zh", "zu"] + } + } + nip89info = { + "name": name, + "image": "https://image.nostr.build/c33ca6fc4cc038ca4adb46fdfdfda34951656f87ee364ef59095bae1495ce669.jpg", + "about": "I translate text from given text/event/job. Currently using Google Translation Services to translate " + "input into the language defined in params.", + "nip90Params": nip90params + } + nip89config = NIP89Config() + nip89config.DTAG = os.getenv("TASK_TRANSLATION_NIP89_DTAG") + nip89config.CONTENT = json.dumps(nip89info) + return Translation(name=name, dvm_config=dvm_config, nip89config=nip89config, + admin_config=admin_config) + + +def build_unstable_diffusion(name, dm_allowed_keys): + dvm_config = DVMConfig() + dvm_config.PRIVATE_KEY = os.getenv("NOSTR_PRIVATE_KEY") + dvm_config.LNBITS_INVOICE_KEY = os.getenv("LNBITS_INVOICE_KEY") + dvm_config.LNBITS_URL = os.getenv("LNBITS_HOST") + dvm_config.DM_ALLOWED = dm_allowed_keys + + # A module might have options it can be initialized with, here we set a default model, and the nova-server + # address it should use. These parameters can be freely defined in the task component + options = {'default_model': "unstable", 'nova_server': os.getenv("NOVA_SERVER")} + + nip90params = { + "negative_prompt": { + "required": False, + "values": [] + }, + "ratio": { + "required": False, + "values": ["1:1", "4:3", "16:9", "3:4", "9:16", "10:16"] + } + } + nip89info = { + "name": name, + "image": "https://image.nostr.build/c33ca6fc4cc038ca4adb46fdfdfda34951656f87ee364ef59095bae1495ce669.jpg", + "about": "I draw images based on a prompt with a Model called unstable diffusion", + "nip90Params": nip90params + } + nip89config = NIP89Config() + nip89config.DTAG = os.getenv("TASK_IMAGE_GENERATION_NIP89_DTAG") + nip89config.CONTENT = json.dumps(nip89info) + return ImageGenerationSDXL(name=name, dvm_config=dvm_config, nip89config=nip89config, + admin_config=admin_config, options=options) + + +def build_sketcher(name, dm_allowed_keys): + dvm_config = DVMConfig() + dvm_config.PRIVATE_KEY = os.getenv("NOSTR_PRIVATE_KEY2") + dvm_config.LNBITS_INVOICE_KEY = os.getenv("LNBITS_INVOICE_KEY") + dvm_config.LNBITS_URL = os.getenv("LNBITS_HOST") + dvm_config.DM_ALLOWED = dm_allowed_keys + + nip90params = { + "negative_prompt": { + "required": False, + "values": [] + }, + "ratio": { + "required": False, + "values": ["1:1", "4:3", "16:9", "3:4", "9:16", "10:16"] + } + } + nip89info = { + "name": name, + "image": "https://image.nostr.build/229c14e440895da30de77b3ca145d66d4b04efb4027ba3c44ca147eecde891f1.jpg", + "about": "I draw images based on a prompt in the style of paper sketches", + "nip90Params": nip90params + } + + # A module might have options it can be initialized with, here we set a default model, lora and the nova-server + # address it should use. These parameters can be freely defined in the task component + options = {'default_model': "mohawk", 'default_lora': "timburton", 'nova_server': os.getenv("NOVA_SERVER")} + + nip89config = NIP89Config() + nip89config.DTAG = os.getenv("TASK_IMAGE_GENERATION_NIP89_DTAG2") + nip89config.CONTENT = json.dumps(nip89info) + # We add an optional AdminConfig for this one, and tell the dvm to rebroadcast its NIP89 + return ImageGenerationSDXL(name=name, dvm_config=dvm_config, nip89config=nip89config, + admin_config=admin_config, options=options) + + +def build_dalle(name, dm_allowed_keys): + dvm_config = DVMConfig() + dvm_config.PRIVATE_KEY = os.getenv("NOSTR_PRIVATE_KEY3") + dvm_config.LNBITS_INVOICE_KEY = os.getenv("LNBITS_INVOICE_KEY") + dvm_config.LNBITS_URL = os.getenv("LNBITS_HOST") + dvm_config.DM_ALLOWED = dm_allowed_keys + profit_in_sats = 10 + dvm_config.COST = int(((4.0 / (get_price_per_sat("USD") * 100)) + profit_in_sats)) + + nip90params = { + "size": { + "required": False, + "values": ["1024:1024", "1024x1792", "1792x1024"] + } + } + nip89info = { + "name": name, + "image": "https://image.nostr.build/c33ca6fc4cc038ca4adb46fdfdfda34951656f87ee364ef59095bae1495ce669.jpg", + "about": "I use OpenAI's DALL·E 3", + "nip90Params": nip90params + } + + # A module might have options it can be initialized with, here we set a default model, lora and the nova-server + # address it should use. These parameters can be freely defined in the task component + + nip89config = NIP89Config() + nip89config.DTAG = os.getenv("TASK_IMAGE_GENERATION_NIP89_DTAG3") + nip89config.CONTENT = json.dumps(nip89info) + # We add an optional AdminConfig for this one, and tell the dvm to rebroadcast its NIP89 + return ImageGenerationDALLE(name=name, dvm_config=dvm_config, nip89config=nip89config, + admin_config=admin_config) + + +# Little Gimmick: +# For Dalle where we have to pay 4cent per image, we fetch current sat price in fiat +# and update cost at each start +def get_price_per_sat(currency): + import requests + + url = "https://api.coinstats.app/public/v1/coins" + params = {"skip": 0, "limit": 1, "currency": currency} + try: + response = requests.get(url, params=params) + response_json = response.json() + + bitcoin_price = response_json["coins"][0]["price"] + price_currency_per_sat = bitcoin_price / 100000000.0 + except: + price_currency_per_sat = 0.0004 + + return price_currency_per_sat diff --git a/requirements.txt b/requirements.txt index 16bad4f..82a7a88 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,7 @@ ffmpegio-core==0.8.5 idna==3.4 inquirer==3.1.3 install==1.3.5 -nostr-sdk==0.0.4 +nostr-sdk==0.0.5 numpy==1.26.2 packaging==23.2 pandas==2.1.3 @@ -21,6 +21,7 @@ python-dateutil==2.8.2 python-dotenv==1.0.0 python-editor==1.0.4 pytz==2023.3.post1 +PyUpload~=0.1.4 pyuseragents==1.0.5 readchar==4.0.5 requests==2.31.0 @@ -31,3 +32,4 @@ translatepy==2.3 tzdata==2023.3 urllib3==2.1.0 wcwidth==0.2.10 + diff --git a/tasks/README.md b/tasks/README.md new file mode 100644 index 0000000..7dcf575 --- /dev/null +++ b/tasks/README.md @@ -0,0 +1,13 @@ +# NostrAI Data Vending Machine Tasks + +Here Tasks can be defined. Tasks need to follow the DVMTaskInterface as defined in interfaces. +Tasks can either happen locally (especially if they are fast) or they can call an alternative backend. +Reusable backend functions can be defined in backends (e.g. API calls) + +Current List of Tasks: + +| Module | Kind | Description | Backend | +|---------------------|------|-------------------------------------------|---------------------------| +| Translation | 5002 | Translates Inputs to another language | Local, calling Google API | +| TextExtractionPDF | 5001 | Extracts Text from a PDF file | Local | +| ImageGenerationSDXL | 5100 | Generates an Image with StableDiffusionXL | nova-server | \ No newline at end of file diff --git a/tasks/imagegeneration_openai_dalle.py b/tasks/imagegeneration_openai_dalle.py new file mode 100644 index 0000000..1b1784a --- /dev/null +++ b/tasks/imagegeneration_openai_dalle.py @@ -0,0 +1,103 @@ +import json +import os +import time +from multiprocessing.pool import ThreadPool +from threading import Thread + +from backends.nova_server import check_nova_server_status, send_request_to_nova_server +from dvm import DVM +from interfaces.dvmtaskinterface import DVMTaskInterface +from utils.admin_utils import AdminConfig +from utils.definitions import EventDefinitions +from utils.dvmconfig import DVMConfig +from utils.nip89_utils import NIP89Config + +""" +This File contains a Module to transform Text input on NOVA-Server and receive results back. + +Accepted Inputs: Prompt (text) +Outputs: An url to an Image +Params: -model # models: juggernaut, dynavision, colossusProject, newreality, unstable + -lora # loras (weights on top of models) voxel, +""" + + +class ImageGenerationDALLE(DVMTaskInterface): + KIND: int = EventDefinitions.KIND_NIP90_GENERATE_IMAGE + TASK: str = "text-to-image" + COST: int = 120 + + def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config, admin_config: AdminConfig = None, + options=None): + + self.init(name, dvm_config, admin_config, nip89config) + self.options = options + + def is_input_supported(self, input_type, input_content): + if input_type != "text": + return False + return True + + def create_request_form_from_nostr_event(self, event, client=None, dvm_config=None): + request_form = {"jobID": event.id().to_hex() + "_" + self.NAME.replace(" ", "")} + prompt = "" + width = "1024" + height = "1024" + model = "dall-e-3" + quality = "standard" + + for tag in event.tags(): + if tag.as_vec()[0] == 'i': + input_type = tag.as_vec()[2] + if input_type == "text": + prompt = tag.as_vec()[1] + + elif tag.as_vec()[0] == 'param': + print("Param: " + tag.as_vec()[1] + ": " + tag.as_vec()[2]) + if tag.as_vec()[1] == "size": + if len(tag.as_vec()) > 3: + width = (tag.as_vec()[2]) + height = (tag.as_vec()[3]) + elif len(tag.as_vec()) == 3: + split = tag.as_vec()[2].split("x") + if len(split) > 1: + width = split[0] + height = split[1] + elif tag.as_vec()[1] == "model": + model = tag.as_vec()[2] + elif tag.as_vec()[1] == "quality": + quality = tag.as_vec()[2] + + options = { + "prompt": prompt, + "size": width + "x" + height, + "model": model, + "quality": quality, + "number": 1 + } + request_form['options'] = json.dumps(options) + + return request_form + + def process(self, request_form): + try: + options = DVMTaskInterface.set_options(request_form) + + from openai import OpenAI + client = OpenAI() + print("Job " + request_form['jobID'] + " sent to OpenAI API..") + + response = client.images.generate( + model=options['model'], + prompt=options['prompt'], + size=options['size'], + quality=options['quality'], + n=int(options['number']), + ) + + image_url = response.data[0].url + return image_url + + except Exception as e: + print("Error in Module") + raise Exception(e) diff --git a/tasks/imagegeneration_sdxl.py b/tasks/imagegeneration_sdxl.py new file mode 100644 index 0000000..df07576 --- /dev/null +++ b/tasks/imagegeneration_sdxl.py @@ -0,0 +1,150 @@ +import json +from multiprocessing.pool import ThreadPool + +from backends.nova_server import check_nova_server_status, send_request_to_nova_server +from dvm import DVM +from interfaces.dvmtaskinterface import DVMTaskInterface +from utils.admin_utils import AdminConfig +from utils.definitions import EventDefinitions +from utils.dvmconfig import DVMConfig +from utils.nip89_utils import NIP89Config + +""" +This File contains a Module to transform Text input on NOVA-Server and receive results back. + +Accepted Inputs: Prompt (text) +Outputs: An url to an Image +Params: -model # models: juggernaut, dynavision, colossusProject, newreality, unstable + -lora # loras (weights on top of models) voxel, +""" + + +class ImageGenerationSDXL(DVMTaskInterface): + KIND: int = EventDefinitions.KIND_NIP90_GENERATE_IMAGE + TASK: str = "text-to-image" + COST: int = 50 + + def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config, admin_config: AdminConfig = None, options=None): + self.init(name, dvm_config, admin_config, nip89config) + self.options = options + + def is_input_supported(self, input_type, input_content): + if input_type != "text": + return False + return True + + def create_request_form_from_nostr_event(self, event, client=None, dvm_config=None): + request_form = {"jobID": event.id().to_hex() + "_" + self.NAME.replace(" ", "")} + request_form["trainerFilePath"] = 'modules\\stablediffusionxl\\stablediffusionxl.trainer' + + prompt = "" + negative_prompt = "" + if self.options.get("default_model"): + model = self.options['default_model'] + else: + model = "stabilityai/stable-diffusion-xl-base-1.0" + + ratio_width = "1" + ratio_height = "1" + width = "" + height = "" + if self.options.get("default_lora"): + lora = self.options['default_lora'] + else: + lora = "" + lora_weight = "" + strength = "" + guidance_scale = "" + for tag in event.tags(): + if tag.as_vec()[0] == 'i': + input_type = tag.as_vec()[2] + if input_type == "text": + prompt = tag.as_vec()[1] + + elif tag.as_vec()[0] == 'param': + print("Param: " + tag.as_vec()[1] + ": " + tag.as_vec()[2]) + if tag.as_vec()[1] == "negative_prompt": + negative_prompt = tag.as_vec()[2] + elif tag.as_vec()[1] == "lora": + lora = tag.as_vec()[2] + elif tag.as_vec()[1] == "lora_weight": + lora_weight = tag.as_vec()[2] + elif tag.as_vec()[1] == "strength": + strength = tag.as_vec()[2] + elif tag.as_vec()[1] == "guidance_scale": + guidance_scale = tag.as_vec()[2] + elif tag.as_vec()[1] == "ratio": + if len(tag.as_vec()) > 3: + ratio_width = (tag.as_vec()[2]) + ratio_height = (tag.as_vec()[3]) + elif len(tag.as_vec()) == 3: + split = tag.as_vec()[2].split(":") + ratio_width = split[0] + ratio_height = split[1] + # if size is set it will overwrite ratio. + elif tag.as_vec()[1] == "size": + if len(tag.as_vec()) > 3: + width = (tag.as_vec()[2]) + height = (tag.as_vec()[3]) + elif len(tag.as_vec()) == 3: + split = tag.as_vec()[2].split("x") + if len(split) > 1: + width = split[0] + height = split[1] + elif tag.as_vec()[1] == "model": + model = tag.as_vec()[2] + + io_input = { + "id": "input_prompt", + "type": "input", + "src": "request:text", + "data": prompt + } + io_negative = { + "id": "negative_prompt", + "type": "input", + "src": "request:text", + "data": negative_prompt + } + io_output = { + "id": "output_image", + "type": "output", + "src": "request:image" + } + + request_form['data'] = json.dumps([io_input, io_negative, io_output]) + + options = { + "model": model, + "ratio": ratio_width + '-' + ratio_height, + "width": width, + "height": height, + "strength": strength, + "guidance_scale": guidance_scale, + "lora": lora, + "lora_weight": lora_weight + } + request_form['options'] = json.dumps(options) + + # old format, deprecated, will remove + request_form["optStr"] = ('model=' + model + ';ratio=' + str(ratio_width) + '-' + str(ratio_height) + ';size=' + + str(width) + '-' + str(height) + ';strength=' + str(strength) + ';guidance_scale=' + + str(guidance_scale) + ';lora=' + lora + ';lora_weight=' + lora_weight) + + return request_form + + def process(self, request_form): + try: + # Call the process route of NOVA-Server with our request form. + response = send_request_to_nova_server(request_form, self.options['nova_server']) + if bool(json.loads(response)['success']): + print("Job " + request_form['jobID'] + " sent to NOVA-server") + + pool = ThreadPool(processes=1) + thread = pool.apply_async(check_nova_server_status, (request_form['jobID'], self.options['nova_server'])) + print("Wait for results of NOVA-Server...") + result = thread.get() + return str(result) + + except Exception as e: + raise Exception(e) diff --git a/tasks/textextractionPDF.py b/tasks/textextractionpdf.py similarity index 56% rename from tasks/textextractionPDF.py rename to tasks/textextractionpdf.py index 05ac8ce..ba85899 100644 --- a/tasks/textextractionPDF.py +++ b/tasks/textextractionpdf.py @@ -1,32 +1,37 @@ +import json import os import re +from threading import Thread +from dvm import DVM from interfaces.dvmtaskinterface import DVMTaskInterface -from utils import env +from utils.admin_utils import AdminConfig from utils.definitions import EventDefinitions -from utils.nip89_utils import NIP89Announcement -from utils.nostr_utils import get_event_by_id, get_referenced_event_by_id +from utils.dvmconfig import DVMConfig +from utils.nip89_utils import NIP89Config +from utils.nostr_utils import get_event_by_id + +""" +This File contains a Module to extract Text from a PDF file locally on the DVM Machine + +Accepted Inputs: Url to pdf file, Event containing an URL to a PDF file +Outputs: Text containing the extracted contents of the PDF file +Params: None +""" class TextExtractionPDF(DVMTaskInterface): KIND: int = EventDefinitions.KIND_NIP90_EXTRACT_TEXT TASK: str = "pdf-to-text" - COST: int = 20 + COST: int = 0 - def __init__(self, name, pk): - self.NAME = name - self.PK = pk + def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config, admin_config: AdminConfig = None, options=None): + self.init(name, dvm_config, admin_config, nip89config) + self.options = options - def NIP89_announcement(self): - nip89 = NIP89Announcement() - nip89.kind = self.KIND - nip89.pk = self.PK - nip89.dtag = os.getenv(env.TASK_TEXTEXTRACTION_NIP89_DTAG) - nip89.content = "{\"name\":\"" + self.NAME + "\",\"image\":\"https://image.nostr.build/c33ca6fc4cc038ca4adb46fdfdfda34951656f87ee364ef59095bae1495ce669.jpg\",\"about\":\"I extract Text from pdf documents\",\"nip90Params\":{}}" - return nip89 def is_input_supported(self, input_type, input_content): - if input_type != "url": + if input_type != "url" and input_type != "event": return False return True @@ -45,23 +50,24 @@ class TextExtractionPDF(DVMTaskInterface): if input_type == "url": url = input_content + # if event contains url to pdf, we checked for a pdf link before elif input_type == "event": - evt = get_event_by_id(input_content, config=dvm_config) - url = re.search("(?Phttps?://[^\s]+)", evt.content()).group("url") - elif input_type == "job": - evt = get_referenced_event_by_id(input_content, [EventDefinitions.KIND_NIP90_RESULT_GENERATE_IMAGE], - client, config=dvm_config) - + evt = get_event_by_id(input_content, client=client, config=dvm_config) url = re.search("(?Phttps?://[^\s]+)", evt.content()).group("url") - request_form["optStr"] = 'url=' + url + options = { + "url": url, + } + request_form['options'] = json.dumps(options) return request_form def process(self, request_form): - options = DVMTaskInterface.setOptions(request_form) from pypdf import PdfReader from pathlib import Path import requests + + options = DVMTaskInterface.set_options(request_form) + try: file_path = Path('temp.pdf') response = requests.get(options["url"]) @@ -76,4 +82,4 @@ class TextExtractionPDF(DVMTaskInterface): os.remove('temp.pdf') return text except Exception as e: - raise Exception(e) \ No newline at end of file + raise Exception(e) diff --git a/tasks/translation.py b/tasks/translation.py index 01e99ff..d852265 100644 --- a/tasks/translation.py +++ b/tasks/translation.py @@ -1,28 +1,32 @@ -import os +import json +from threading import Thread +from dvm import DVM from interfaces.dvmtaskinterface import DVMTaskInterface -from utils import env +from utils.admin_utils import AdminConfig from utils.definitions import EventDefinitions -from utils.nip89_utils import NIP89Announcement +from utils.dvmconfig import DVMConfig +from utils.nip89_utils import NIP89Config from utils.nostr_utils import get_referenced_event_by_id, get_event_by_id +""" +This File contains a Module to call Google Translate Services locally on the DVM Machine + +Accepted Inputs: Text, Events, Jobs (Text Extraction, Summary, Translation) +Outputs: Text containing the Translation in the desired language. +Params: -language The target language +""" + class Translation(DVMTaskInterface): KIND: int = EventDefinitions.KIND_NIP90_TRANSLATE_TEXT TASK: str = "translation" COST: int = 0 - def __init__(self, name, pk): - self.NAME = name - self.PK = pk - - def NIP89_announcement(self): - nip89 = NIP89Announcement() - nip89.kind = self.KIND - nip89.pk = self.PK - nip89.dtag = os.getenv(env.TASK_TRANSLATION_NIP89_DTAG) - nip89.content = "{\"name\":\"" + self.NAME + "\",\"image\":\"https://image.nostr.build/c33ca6fc4cc038ca4adb46fdfdfda34951656f87ee364ef59095bae1495ce669.jpg\",\"about\":\"I translate text from given text/event/job, currently using Google Translation Services into language defined in param. \",\"nip90Params\":{\"language\":{\"required\":true,\"values\":[\"af\",\"am\",\"ar\",\"az\",\"be\",\"bg\",\"bn\",\"bs\",\"ca\",\"ceb\",\"co\",\"cs\",\"cy\",\"da\",\"de\",\"el\",\"eo\",\"es\",\"et\",\"eu\",\"fa\",\"fi\",\"fr\",\"fy\",\"ga\",\"gd\",\"gl\",\"gu\",\"ha\",\"haw\",\"hi\",\"hmn\",\"hr\",\"ht\",\"hu\",\"hy\",\"id\",\"ig\",\"is\",\"it\",\"he\",\"ja\",\"jv\",\"ka\",\"kk\",\"km\",\"kn\",\"ko\",\"ku\",\"ky\",\"la\",\"lb\",\"lo\",\"lt\",\"lv\",\"mg\",\"mi\",\"mk\",\"ml\",\"mn\",\"mr\",\"ms\",\"mt\",\"my\",\"ne\",\"nl\",\"no\",\"ny\",\"or\",\"pa\",\"pl\",\"ps\",\"pt\",\"ro\",\"ru\",\"sd\",\"si\",\"sk\",\"sl\",\"sm\",\"sn\",\"so\",\"sq\",\"sr\",\"st\",\"su\",\"sv\",\"sw\",\"ta\",\"te\",\"tg\",\"th\",\"tl\",\"tr\",\"ug\",\"uk\",\"ur\",\"uz\",\"vi\",\"xh\",\"yi\",\"yo\",\"zh\",\"zu\"]}}}" - return nip89 + def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config, admin_config: AdminConfig = None, + options=None): + self.init(name, dvm_config, admin_config, nip89config) + self.options = options def is_input_supported(self, input_type, input_content): if input_type != "event" and input_type != "job" and input_type != "text": @@ -50,7 +54,7 @@ class Translation(DVMTaskInterface): if input_type == "event": for tag in event.tags(): if tag.as_vec()[0] == 'i': - evt = get_event_by_id(tag.as_vec()[1], config=dvm_config) + evt = get_event_by_id(tag.as_vec()[1], client=client, config=dvm_config) text = evt.content() break @@ -63,22 +67,26 @@ class Translation(DVMTaskInterface): elif input_type == "job": for tag in event.tags(): if tag.as_vec()[0] == 'i': - evt = get_referenced_event_by_id(tag.as_vec()[1], - [EventDefinitions.KIND_NIP90_RESULT_EXTRACT_TEXT, - EventDefinitions.KIND_NIP90_RESULT_SUMMARIZE_TEXT], - client, - config=dvm_config) + evt = get_referenced_event_by_id(event_id=tag.as_vec()[1], client=client, + kinds=[EventDefinitions.KIND_NIP90_RESULT_EXTRACT_TEXT, + EventDefinitions.KIND_NIP90_RESULT_SUMMARIZE_TEXT, + EventDefinitions.KIND_NIP90_RESULT_TRANSLATE_TEXT], + dvm_config=dvm_config) text = evt.content() break - request_form["optStr"] = ('translation_lang=' + translation_lang + ';text=' + - text.replace('\U0001f919', "").replace("=", "equals"). - replace(";", ",")) + options = { + "text": text, + "language": translation_lang + } + request_form['options'] = json.dumps(options) + return request_form def process(self, request_form): - options = DVMTaskInterface.setOptions(request_form) from translatepy.translators.google import GoogleTranslate + + options = DVMTaskInterface.set_options(request_form) gtranslate = GoogleTranslate() length = len(options["text"]) @@ -89,7 +97,7 @@ class Translation(DVMTaskInterface): text_part = options["text"][step:step + 5000] step = step + 5000 try: - translated_text_part = str(gtranslate.translate(text_part, options["translation_lang"])) + translated_text_part = str(gtranslate.translate(text_part, options["language"])) print("Translated Text part:\n\n " + translated_text_part) except Exception as e: raise Exception(e) @@ -99,7 +107,7 @@ class Translation(DVMTaskInterface): if step < length: text_part = options["text"][step:length] try: - translated_text_part = str(gtranslate.translate(text_part, options["translation_lang"])) + translated_text_part = str(gtranslate.translate(text_part, options["language"])) print("Translated Text part:\n " + translated_text_part) except Exception as e: raise Exception(e) diff --git a/test_client.py b/test_dvm_client.py similarity index 52% rename from test_client.py rename to test_dvm_client.py index 4e7a4bf..55d15a7 100644 --- a/test_client.py +++ b/test_dvm_client.py @@ -1,4 +1,3 @@ - import os import time import datetime as datetime @@ -8,13 +7,14 @@ from threading import Thread import dotenv from nostr_sdk import Keys, Client, Tag, EventBuilder, Filter, HandleNotification, Timestamp, nip04_decrypt +from utils.dvmconfig import DVMConfig from utils.nostr_utils import send_event -from utils.definitions import EventDefinitions, RELAY_LIST +from utils.definitions import EventDefinitions -import utils.env as env -#TODO HINT: Only use this path with a preiously whitelisted privkey, as zapping events is not implemented in the lib/code + +# TODO HINT: Best use this path with a previously whitelisted privkey, as zapping events is not implemented in the lib/code def nostr_client_test_translation(input, kind, lang, sats, satsmax): - keys = Keys.from_sk_str(os.getenv(env.NOSTR_TEST_CLIENT_PRIVATE_KEY)) + keys = Keys.from_sk_str(os.getenv("NOSTR_TEST_CLIENT_PRIVATE_KEY")) if kind == "text": iTag = Tag.parse(["i", input, "text"]) elif kind == "event": @@ -22,43 +22,73 @@ def nostr_client_test_translation(input, kind, lang, sats, satsmax): paramTag1 = Tag.parse(["param", "language", lang]) bidTag = Tag.parse(['bid', str(sats * 1000), str(satsmax * 1000)]) - relaysTag = Tag.parse(['relays', "wss://relay.damus.io", "wss://blastr.f7z.xyz", "wss://relayable.org", "wss://nostr-pub.wellorder.net"]) + relaysTag = Tag.parse(['relays', "wss://relay.damus.io", "wss://blastr.f7z.xyz", "wss://relayable.org", + "wss://nostr-pub.wellorder.net"]) alttag = Tag.parse(["alt", "This is a NIP90 DVM AI task to translate a given Input"]) - event = EventBuilder(EventDefinitions.KIND_NIP90_TRANSLATE_TEXT, str("Translate the given input."), [iTag, paramTag1, bidTag, relaysTag, alttag]).to_event(keys) + event = EventBuilder(EventDefinitions.KIND_NIP90_TRANSLATE_TEXT, str("Translate the given input."), + [iTag, paramTag1, bidTag, relaysTag, alttag]).to_event(keys) relay_list = ["wss://relay.damus.io", "wss://blastr.f7z.xyz", "wss://relayable.org", "wss://nostr-pub.wellorder.net"] - client = Client(keys) for relay in relay_list: client.add_relay(relay) client.connect() - send_event(event, client, keys) + config = DVMConfig + send_event(event, client=client, dvm_config=config) + return event.as_json() + + +def nostr_client_test_image(prompt): + keys = Keys.from_sk_str(os.getenv("NOSTR_TEST_CLIENT_PRIVATE_KEY")) + + iTag = Tag.parse(["i", prompt, "text"]) + outTag = Tag.parse(["output", "image/png;format=url"]) + paramTag1 = Tag.parse(["param", "size", "1024x1024"]) + tTag = Tag.parse(["t", "bitcoin"]) + + bidTag = Tag.parse(['bid', str(1000 * 1000), str(1000 * 1000)]) + relaysTag = Tag.parse(['relays', "wss://relay.damus.io", "wss://blastr.f7z.xyz", "wss://relayable.org", + "wss://nostr-pub.wellorder.net"]) + alttag = Tag.parse(["alt", "This is a NIP90 DVM AI task to translate a given Input"]) + event = EventBuilder(EventDefinitions.KIND_NIP90_GENERATE_IMAGE, str("Generate an Image."), + [iTag, outTag, tTag, paramTag1, bidTag, relaysTag, alttag]).to_event(keys) + + relay_list = ["wss://relay.damus.io", "wss://blastr.f7z.xyz", "wss://relayable.org", + "wss://nostr-pub.wellorder.net"] + + client = Client(keys) + for relay in relay_list: + client.add_relay(relay) + client.connect() + config = DVMConfig + send_event(event, client=client, dvm_config=config) return event.as_json() def nostr_client(): - keys = Keys.from_sk_str(os.getenv(env.NOSTR_TEST_CLIENT_PRIVATE_KEY)) + keys = Keys.from_sk_str(os.getenv("NOSTR_TEST_CLIENT_PRIVATE_KEY")) sk = keys.secret_key() pk = keys.public_key() print(f"Nostr Client public key: {pk.to_bech32()}, Hex: {pk.to_hex()} ") client = Client(keys) - for relay in RELAY_LIST: + dvmconfig = DVMConfig() + for relay in dvmconfig.RELAY_LIST: client.add_relay(relay) client.connect() dm_zap_filter = Filter().pubkey(pk).kinds([EventDefinitions.KIND_DM, - EventDefinitions.KIND_ZAP]).since(Timestamp.now()) # events to us specific + EventDefinitions.KIND_ZAP]).since( + Timestamp.now()) # events to us specific dvm_filter = (Filter().kinds([EventDefinitions.KIND_NIP90_RESULT_TRANSLATE_TEXT, - EventDefinitions.KIND_FEEDBACK]).since(Timestamp.now())) # public events + EventDefinitions.KIND_FEEDBACK]).since(Timestamp.now())) # public events client.subscribe([dm_zap_filter, dvm_filter]) + nostr_client_test_translation("This is the result of the DVM in spanish", "text", "es", 20, 20) + #nostr_client_test_translation("note1p8cx2dz5ss5gnk7c59zjydcncx6a754c0hsyakjvnw8xwlm5hymsnc23rs", "event", "es", 20,20) + #nostr_client_test_translation("44a0a8b395ade39d46b9d20038b3f0c8a11168e67c442e3ece95e4a1703e2beb", "event", "zh", 20, 20) - #nostr_client_test_translation("This is the result of the DVM in spanish", "text", "es", 20, 20) - nostr_client_test_translation("44a0a8b395ade39d46b9d20038b3f0c8a11168e67c442e3ece95e4a1703e2beb", "event", "fr", 20, 20) - - - #nostr_client_test_image(sats=50, satsmax=10) + #nostr_client_test_image("a beautiful purple ostrich watching the sunset") class NotificationHandler(HandleNotification): def handle(self, relay_url, event): print(f"Received new event from {relay_url}: {event.as_json()}") @@ -76,16 +106,14 @@ def nostr_client(): print("[Nostr Client]: " + f"Received new zap:") print(event.as_json()) - def handle_msg(self, relay_url, msg): - None + return client.handle_notifications(NotificationHandler()) while True: time.sleep(5.0) - if __name__ == '__main__': env_path = Path('.env') @@ -95,6 +123,5 @@ if __name__ == '__main__': else: raise FileNotFoundError(f'.env file not found at {env_path} ') - nostr_dvm_thread = Thread(target=nostr_client()) nostr_dvm_thread.start() diff --git a/utils/admin_utils.py b/utils/admin_utils.py index 743b4bc..96a1f54 100644 --- a/utils/admin_utils.py +++ b/utils/admin_utils.py @@ -5,62 +5,72 @@ from nostr_sdk import Keys, EventBuilder, PublicKey from utils.database_utils import get_from_sql_table, list_db, delete_from_sql_table, update_sql_table, \ get_or_add_user, clean_db +from utils.dvmconfig import DVMConfig from utils.nip89_utils import nip89_announce_tasks from utils.nostr_utils import send_event +class AdminConfig: + REBROADCAST_NIP89: bool = False + WHITELISTUSER: bool = False + UNWHITELISTUSER: bool = False + BLACKLISTUSER: bool = False + DELETEUSER: bool = False + LISTDATABASE: bool = False + ClEANDB: bool = False + USERNPUB: str = "" -def admin_make_database_updates(config=None, client=None): +def admin_make_database_updates(adminconfig: AdminConfig = None, dvmconfig: DVMConfig = None, client=None): # This is called on start of Server, Admin function to manually whitelist/blacklist/add balance/delete users - dvmconfig = config + if adminconfig is None or dvmconfig is None: + return - rebroadcast_nip89 = False - cleandb = False - listdatabase = False - deleteuser = False - whitelistuser = False - unwhitelistuser = False - blacklistuser = False - addbalance = False - additional_balance = 50 + if not isinstance(adminconfig, AdminConfig): + return + + if ((adminconfig.WHITELISTUSER is True or adminconfig.UNWHITELISTUSER is True or adminconfig.BLACKLISTUSER is True or adminconfig.DELETEUSER is True) + and adminconfig.USERNPUB == ""): + return + + + db = dvmconfig.DB + + rebroadcast_nip89 = adminconfig.REBROADCAST_NIP89 + cleandb = adminconfig.ClEANDB + listdatabase = adminconfig.LISTDATABASE + deleteuser = adminconfig.DELETEUSER + whitelistuser = adminconfig.WHITELISTUSER + unwhitelistuser = adminconfig.UNWHITELISTUSER + blacklistuser = adminconfig.BLACKLISTUSER + + if adminconfig.USERNPUB != "": + if str(adminconfig.USERNPUB).startswith("npub"): + publickey = PublicKey.from_bech32(adminconfig.USERNPUB).to_hex() + else: + publickey = adminconfig.USERNPUB - # publickey = PublicKey.from_bech32("npub1...").to_hex() - # use this if you have the npub - publickey = "asd123" - #use this if you have hex if whitelistuser: - user = get_or_add_user(publickey) - update_sql_table(user.npub, user.balance, True, False, user.nip05, user.lud16, user.name, user.lastactive) - user = get_from_sql_table(publickey) + user = get_or_add_user(db, publickey, client=client, config=dvmconfig) + update_sql_table(db, user.npub, user.balance, True, False, user.nip05, user.lud16, user.name, user.lastactive) + user = get_from_sql_table(db, publickey) print(str(user.name) + " is whitelisted: " + str(user.iswhitelisted)) if unwhitelistuser: - user = get_from_sql_table(publickey) - update_sql_table(user.npub, user.balance, False, False, user.nip05, user.lud16, user.name, user.lastactive) + user = get_from_sql_table(db, publickey) + update_sql_table(db, user.npub, user.balance, False, False, user.nip05, user.lud16, user.name, user.lastactive) if blacklistuser: - user = get_from_sql_table(publickey) - update_sql_table(user.npub, user.balance, False, True, user.nip05, user.lud16, user.name, user.lastactive) - - if addbalance: - user = get_from_sql_table(publickey) - update_sql_table(user[0], (int(user.balance) + additional_balance), user.iswhitelisted, user.isblacklisted, user.nip05, user.lud16, user.name, user.lastactive) - time.sleep(1.0) - message = str(additional_balance) + " Sats have been added to your balance. Your new balance is " + str( - (int(user.balance) + additional_balance)) + " Sats." - keys = Keys.from_sk_str(config.PRIVATE_KEY) - evt = EventBuilder.new_encrypted_direct_msg(keys, PublicKey.from_hex(publickey), message, - None).to_event(keys) - send_event(evt, key=keys) + user = get_from_sql_table(db, publickey) + update_sql_table(db, user.npub, user.balance, False, True, user.nip05, user.lud16, user.name, user.lastactive) if deleteuser: - delete_from_sql_table(publickey) + delete_from_sql_table(db, publickey) if cleandb: - clean_db() + clean_db(db) if listdatabase: - list_db() + list_db(db) if rebroadcast_nip89: - nip89_announce_tasks(dvmconfig) + nip89_announce_tasks(dvmconfig, client=client) diff --git a/utils/backend_utils.py b/utils/backend_utils.py index e0a0132..b9b1281 100644 --- a/utils/backend_utils.py +++ b/utils/backend_utils.py @@ -1,11 +1,7 @@ import requests - -from tasks.textextractionPDF import TextExtractionPDF from utils.definitions import EventDefinitions from utils.nostr_utils import get_event_by_id -from tasks.translation import Translation - def get_task(event, client, dvmconfig): if event.kind() == EventDefinitions.KIND_NIP90_GENERIC: # use this for events that have no id yet @@ -28,16 +24,16 @@ def get_task(event, client, dvmconfig): if tag.as_vec()[2] == "url": file_type = check_url_is_readable(tag.as_vec()[1]) if file_type == "pdf": - return TextExtractionPDF.TASK + return "pdf-to-text" else: return "unknown job" elif tag.as_vec()[2] == "event": - evt = get_event_by_id(tag.as_vec()[1], config=dvmconfig) + evt = get_event_by_id(tag.as_vec()[1], client=client, config=dvmconfig) if evt is not None: if evt.kind() == 1063: - for tag in evt.tags(): - if tag.as_vec()[0] == 'url': - file_type = check_url_is_readable(tag.as_vec()[1]) + for tg in evt.tags(): + if tg.as_vec()[0] == 'url': + file_type = check_url_is_readable(tg.as_vec()[1]) if file_type == "pdf": return "pdf-to-text" else: @@ -45,9 +41,10 @@ def get_task(event, client, dvmconfig): else: return "unknown type" - elif event.kind() == EventDefinitions.KIND_NIP90_TRANSLATE_TEXT: - return Translation.TASK + return "translation" + elif event.kind() == EventDefinitions.KIND_NIP90_GENERATE_IMAGE: + return "text-to-image" else: return "unknown type" @@ -58,7 +55,7 @@ def check_task_is_supported(event, client, get_duration=False, config=None): input_value = "" input_type = "" duration = 1 - + task = get_task(event, client=client, dvmconfig=dvm_config) for tag in event.tags(): if tag.as_vec()[0] == 'i': if len(tag.as_vec()) < 3: @@ -68,32 +65,32 @@ def check_task_is_supported(event, client, get_duration=False, config=None): input_value = tag.as_vec()[1] input_type = tag.as_vec()[2] if input_type == "event": - evt = get_event_by_id(input_value, config=dvm_config) + evt = get_event_by_id(input_value, client=client, config=dvm_config) if evt is None: print("Event not found") return False, "", 0 + elif input_type == 'url' and check_url_is_readable(input_value) is None: + print("Url not readable / supported") + return False, task, duration elif tag.as_vec()[0] == 'output': output = tag.as_vec()[1] - if not ( - output == "text/plain" or output == "text/json" or output == "json" or output == "image/png" or "image/jpg" or output == ""): + if not (output == "text/plain" + or output == "text/json" or output == "json" + or output == "image/png" or "image/jpg" + or output == "image/png;format=url" or output == "image/jpg;format=url" + or output == ""): print("Output format not supported, skipping..") return False, "", 0 - task = get_task(event, client=client, dvmconfig=dvm_config) - - if input_type == 'url' and check_url_is_readable(input_value) is None: - print("url not readable") - return False, task, duration - - if task not in (x.TASK for x in dvm_config.SUPPORTED_TASKS): - return False, task, duration - - for dvm in dvm_config.SUPPORTED_TASKS: + for dvm in dvm_config.SUPPORTED_DVMS: if dvm.TASK == task: if not dvm.is_input_supported(input_type, event.content()): return False, task, duration + if task not in (x.TASK for x in dvm_config.SUPPORTED_DVMS): + return False, task, duration + return True, task, duration @@ -106,9 +103,9 @@ def check_url_is_readable(url): if content_type == 'audio/x-wav' or str(url).endswith(".wav") or content_type == 'audio/mpeg' or str(url).endswith( ".mp3") or content_type == 'audio/ogg' or str(url).endswith(".ogg"): return "audio" - elif content_type == 'image/png' or str(url).endswith(".png") or content_type == 'image/jpg' or str(url).endswith( - ".jpg") or content_type == 'image/jpeg' or str(url).endswith(".jpeg") or content_type == 'image/png' or str( - url).endswith(".png"): + elif (content_type == 'image/png' or str(url).endswith(".png") or content_type == 'image/jpg' or str(url).endswith( + ".jpg") or content_type == 'image/jpeg' or str(url).endswith(".jpeg") or content_type == 'image/png' or + str(url).endswith(".png")): return "image" elif content_type == 'video/mp4' or str(url).endswith(".mp4") or content_type == 'video/avi' or str(url).endswith( ".avi") or content_type == 'video/mov' or str(url).endswith(".mov"): @@ -121,11 +118,10 @@ def check_url_is_readable(url): def get_amount_per_task(task, dvm_config, duration=1): - print(dvm_config.SUPPORTED_TASKS) - for dvm in dvm_config.SUPPORTED_TASKS: + for dvm in dvm_config.SUPPORTED_DVMS: #this is currently just one if dvm.TASK == task: amount = dvm.COST * duration return amount else: - print("[Nostr] Task " + task + " is currently not supported by this instance, skipping") + print("["+dvm_config.SUPPORTED_DVMS[0].NAME +"] Task " + task + " is currently not supported by this instance, skipping") return None diff --git a/utils/database_utils.py b/utils/database_utils.py index 856dcec..a022645 100644 --- a/utils/database_utils.py +++ b/utils/database_utils.py @@ -1,5 +1,5 @@ # DATABASE LOGIC -import os +import json import sqlite3 import time @@ -8,12 +8,10 @@ from dataclasses import dataclass from datetime import timedelta from logging import Filter -from nostr_sdk import Timestamp, Keys, PublicKey, EventBuilder, Metadata, Filter - -from utils import env -from utils.definitions import NEW_USER_BALANCE +from nostr_sdk import Timestamp, Keys, PublicKey, EventBuilder, Filter, Client, Options from utils.nostr_utils import send_event + @dataclass class User: npub: str @@ -26,10 +24,9 @@ class User: lastactive: int - -def create_sql_table(): +def create_sql_table(db): try: - con = sqlite3.connect(os.getenv(env.USER_DB_PATH)) + con = sqlite3.connect(db) cur = con.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS users ( npub text PRIMARY KEY, @@ -48,9 +45,9 @@ def create_sql_table(): print(e) -def add_sql_table_column(): +def add_sql_table_column(db): try: - con = sqlite3.connect(os.getenv(env.USER_DB_PATH)) + con = sqlite3.connect(db) cur = con.cursor() cur.execute(""" ALTER TABLE users ADD COLUMN lastactive 'integer' """) con.close() @@ -58,23 +55,23 @@ def add_sql_table_column(): print(e) -def add_to_sql_table(npub, sats, iswhitelisted, isblacklisted, nip05, lud16, name, lastactive): +def add_to_sql_table(db, npub, sats, iswhitelisted, isblacklisted, nip05, lud16, name, lastactive): try: - con = sqlite3.connect(os.getenv(env.USER_DB_PATH)) + con = sqlite3.connect(db) cur = con.cursor() data = (npub, sats, iswhitelisted, isblacklisted, nip05, lud16, name, lastactive) cur.execute("INSERT or IGNORE INTO users VALUES(?, ?, ?, ?, ?, ?, ?, ?)", data) con.commit() con.close() except Error as e: - print(e) + print("Error when Adding to DB: " + str(e)) -def update_sql_table(npub, sats, iswhitelisted, isblacklisted, nip05, lud16, name, lastactive): +def update_sql_table(db, npub, balance, iswhitelisted, isblacklisted, nip05, lud16, name, lastactive): try: - con = sqlite3.connect(os.getenv(env.USER_DB_PATH)) + con = sqlite3.connect(db) cur = con.cursor() - data = (sats, iswhitelisted, isblacklisted, nip05, lud16, name, lastactive, npub) + data = (balance, iswhitelisted, isblacklisted, nip05, lud16, name, lastactive, npub) cur.execute(""" UPDATE users SET sats = ? , @@ -88,35 +85,38 @@ def update_sql_table(npub, sats, iswhitelisted, isblacklisted, nip05, lud16, nam con.commit() con.close() except Error as e: - print(e) + print("Error Updating DB: " + str(e)) -def get_from_sql_table(npub): +def get_from_sql_table(db, npub): try: - con = sqlite3.connect(os.getenv(env.USER_DB_PATH)) + con = sqlite3.connect(db) cur = con.cursor() cur.execute("SELECT * FROM users WHERE npub=?", (npub,)) row = cur.fetchone() con.close() - user = User - user.npub = row[0] - user.balance = row[1] - user.iswhitelisted = row[2] - user.isblacklisted = row[3] - user.nip05 = row[4] - user.lud16 = row[5] - user.name = row[6] - user.lastactive = row[7] + if row is None: + return None + else: + user = User + user.npub = row[0] + user.balance = row[1] + user.iswhitelisted = row[2] + user.isblacklisted = row[3] + user.nip05 = row[4] + user.lud16 = row[5] + user.name = row[6] + user.lastactive = row[7] - return user + return user except Error as e: - print(e) + print("Error Getting from DB: " + str(e)) -def delete_from_sql_table(npub): +def delete_from_sql_table(db, npub): try: - con = sqlite3.connect(os.getenv(env.USER_DB_PATH)) + con = sqlite3.connect(db) cur = con.cursor() cur.execute("DELETE FROM users WHERE npub=?", (npub,)) con.commit() @@ -125,24 +125,24 @@ def delete_from_sql_table(npub): print(e) -def clean_db(): +def clean_db(db): try: - con = sqlite3.connect(os.getenv(env.USER_DB_PATH)) + con = sqlite3.connect(db) cur = con.cursor() cur.execute("SELECT * FROM users WHERE npub IS NULL OR npub = '' ") rows = cur.fetchall() for row in rows: print(row) - delete_from_sql_table(row[0]) + delete_from_sql_table(db, row[0]) con.close() return rows except Error as e: print(e) -def list_db(): +def list_db(db): try: - con = sqlite3.connect(os.getenv(env.USER_DB_PATH)) + con = sqlite3.connect(db) cur = con.cursor() cur.execute("SELECT * FROM users ORDER BY sats DESC") rows = cur.fetchall() @@ -153,70 +153,75 @@ def list_db(): print(e) -def update_user_balance(sender, sats, config=None): - user = get_from_sql_table(sender) +def update_user_balance(db, npub, additional_sats, client, config): + user = get_from_sql_table(db, npub) if user is None: - add_to_sql_table(sender, (int(sats) + NEW_USER_BALANCE), False, False, - "", "", "", Timestamp.now().as_secs()) - print("NEW USER: " + sender + " Zap amount: " + str(sats) + " Sats.") + name, nip05, lud16 = fetch_user_metadata(npub, client) + add_to_sql_table(db, npub, (int(additional_sats) + config.NEW_USER_BALANCE), False, False, + nip05, lud16, name, Timestamp.now().as_secs()) + print("Adding User: " + npub + " (" + npub + ")") else: - user = get_from_sql_table(sender) - print(str(sats)) - - - if user.nip05 is None: - user.nip05 = "" - if user.lud16 is None: - user.lud16 = "" - if user.name is None: - user.name = "" - - new_balance = int(user.balance) + int(sats) - update_sql_table(sender, new_balance, user.iswhitelisted, user.isblacklisted, user.nip05, user.lud16, user.name, + user = get_from_sql_table(db, npub) + new_balance = int(user.balance) + int(additional_sats) + update_sql_table(db, npub, new_balance, user.iswhitelisted, user.isblacklisted, user.nip05, user.lud16, + user.name, Timestamp.now().as_secs()) - print("UPDATE USER BALANCE: " + str(user.name) + " Zap amount: " + str(sats) + " Sats.") - + print("Updated user balance for: " + str(user.name) + + " Zap amount: " + str(additional_sats) + " Sats. New balance: " + str(new_balance) +" Sats") if config is not None: keys = Keys.from_sk_str(config.PRIVATE_KEY) time.sleep(1.0) - message = ("Added "+ str(sats) + " Sats to balance. New balance is " + str(new_balance) + " Sats. " ) + message = ("Added " + str(additional_sats) + " Sats to balance. New balance is " + str(new_balance) + " Sats.") - - evt = EventBuilder.new_encrypted_direct_msg(keys, PublicKey.from_hex(sender), message, + evt = EventBuilder.new_encrypted_direct_msg(keys, PublicKey.from_hex(npub), message, None).to_event(keys) - send_event(evt, key=keys) + send_event(evt, client=client, dvm_config=config) -def get_or_add_user(sender): - user = get_from_sql_table(sender) - +def get_or_add_user(db, npub, client, config): + user = get_from_sql_table(db, npub) if user is None: - add_to_sql_table(sender, NEW_USER_BALANCE, False, False, None, - None, None, Timestamp.now().as_secs()) - user = get_from_sql_table(sender) - print(user) + try: + name, nip05, lud16 = fetch_user_metadata(npub, client) + print("Adding User: " + npub + " (" + npub + ")") + add_to_sql_table(db, npub, config.NEW_USER_BALANCE, False, False, nip05, + lud16, name, Timestamp.now().as_secs()) + user = get_from_sql_table(db, npub) + return user + except Exception as e: + print("Error Adding User to DB: " + str(e)) return user -def update_user_metadata(sender, client): - user = get_from_sql_table(sender) - try: - profile_filter = Filter().kind(0).author(sender).limit(1) - events = client.get_events_of([profile_filter], timedelta(seconds=3)) - if len(events) > 0: - ev = events[0] - metadata = Metadata.from_json(ev.content()) - name = metadata.get_display_name() - if str(name) == "" or name is None: - user.name = metadata.get_name() - user.nip05 = metadata.get_nip05() - user.lud16 = metadata.get_lud16() - except: - print("Couldn't get meta information") - update_sql_table(user.npub, user.balance, user.iswhitelisted, user.isblacklisted, user.nip05, user.lud16, - user.name, Timestamp.now().as_secs()) - user = get_from_sql_table(user.npub) - return user +class DvmConfig: + pass + + +def fetch_user_metadata(npub, client): + name = "" + nip05 = "" + lud16 = "" + pk = PublicKey.from_hex(npub) + print(f"\nGetting profile metadata for {pk.to_bech32()}...") + profile_filter = Filter().kind(0).author(pk).limit(1) + events = client.get_events_of([profile_filter], timedelta(seconds=5)) + #TODO, it seems our client is still subscribed after that + + if len(events) > 0: + latest_entry = events[0] + latest_time = 0 + for entry in events: + if entry.created_at().as_secs() > latest_time: + latest_time = entry.created_at().as_secs() + latest_entry = entry + profile = json.loads(latest_entry.content()) + if profile.get("name"): + name = profile['name'] + if profile.get("nip05"): + nip05 = profile['nip05'] + if profile.get("lud16"): + lud16 = profile['lud16'] + return name, nip05, lud16 diff --git a/utils/definitions.py b/utils/definitions.py index 04f5c8b..28a173e 100644 --- a/utils/definitions.py +++ b/utils/definitions.py @@ -1,14 +1,7 @@ +import os from dataclasses import dataclass from nostr_sdk import Event - -NEW_USER_BALANCE: int = 250 # Free credits for new users - -RELAY_LIST = ["wss://relay.damus.io", "wss://nostr-pub.wellorder.net", "wss://nos.lol", "wss://nostr.wine", - "wss://relay.nostfiles.dev", "wss://nostr.mom", "wss://nostr.oxtr.dev", "wss://relay.nostr.bg", - "wss://relay.f7z.io"] - - class EventDefinitions: KIND_DM: int = 4 KIND_ZAP: int = 9735 @@ -40,24 +33,6 @@ class EventDefinitions: KIND_NIP90_RESULT_GENERIC] -class DVMConfig: - SUPPORTED_TASKS = [] - PRIVATE_KEY: str - - RELAY_LIST = ["wss://relay.damus.io", "wss://nostr-pub.wellorder.net", "wss://nos.lol", "wss://nostr.wine", - "wss://relay.nostfiles.dev", "wss://nostr.mom", "wss://nostr.oxtr.dev", "wss://relay.nostr.bg", - "wss://relay.f7z.io"] - RELAY_TIMEOUT = 5 - LNBITS_INVOICE_KEY = '' - LNBITS_URL = 'https://lnbits.com' - REQUIRES_NIP05: bool = False - - SHOWRESULTBEFOREPAYMENT: bool = True # if this is true show results even when not paid right after autoprocess - - - NIP89s: list = [] - - @dataclass class JobToWatch: event_id: str diff --git a/utils/dvmconfig.py b/utils/dvmconfig.py new file mode 100644 index 0000000..a42d479 --- /dev/null +++ b/utils/dvmconfig.py @@ -0,0 +1,26 @@ +import os + +from utils.nip89_utils import NIP89Announcement + + +class DVMConfig: + SUPPORTED_DVMS= [] + PRIVATE_KEY: str = os.getenv("NOSTR_PRIVATE_KEY") + COST: int = None + + RELAY_LIST = ["wss://relay.damus.io", "wss://nostr-pub.wellorder.net", "wss://nos.lol", "wss://nostr.wine", + "wss://relay.nostfiles.dev", "wss://nostr.mom", "wss://nostr.oxtr.dev", "wss://relay.nostr.bg", + "wss://relay.f7z.io"] + + RELAY_TIMEOUT = 3 + LNBITS_INVOICE_KEY = '' + LNBITS_URL = 'https://lnbits.com' + DB: str + NEW_USER_BALANCE: int = 250 # Free credits for new users + NIP89: NIP89Announcement + DM_ALLOWED = [] + + SHOW_RESULT_BEFORE_PAYMENT: bool = False # if this is true show results even when not paid right after autoprocess + + + diff --git a/utils/env.py b/utils/env.py deleted file mode 100644 index 1fef1d1..0000000 --- a/utils/env.py +++ /dev/null @@ -1,12 +0,0 @@ -NOSTR_PRIVATE_KEY = "NOSTR_PRIVATE_KEY" -NOSTR_TEST_CLIENT_PRIVATE_KEY = "NOSTR_TEST_CLIENT_PRIVATE_KEY" - -USER_DB_PATH = "USER_DB_PATH" - -LNBITS_INVOICE_KEY = "LNBITS_INVOICE_KEY" -LNBITS_HOST = "LNBITS_HOST" - -TASK_TRANSLATION_NIP89_DTAG = "TASK_TRANSLATION_NIP89_DTAG" -TASK_TEXTEXTRACTION_NIP89_DTAG = "TASK_TEXTEXTRACTION_NIP89_DTAG" - - diff --git a/utils/nip89_utils.py b/utils/nip89_utils.py index 220e53c..be918dd 100644 --- a/utils/nip89_utils.py +++ b/utils/nip89_utils.py @@ -1,19 +1,26 @@ from nostr_sdk import Tag, Keys, EventBuilder + from utils.nostr_utils import send_event + class NIP89Announcement: + name: str kind: int dtag: str pk: str content: str -def nip89_announce_tasks(dvmconfig): - for nip89 in dvmconfig.NIP89s: - k_tag = Tag.parse(["k", str(nip89.kind)]) - d_tag = Tag.parse(["d", nip89.dtag]) - keys = Keys.from_sk_str(nip89.pk) - content = nip89.content - event = EventBuilder(31990, content, [k_tag, d_tag]).to_event(keys) - send_event(event, key=keys) - print("Announced NIP 89") \ No newline at end of file +class NIP89Config: + DTAG: str + CONTENT: str + + +def nip89_announce_tasks(dvm_config, client): + k_tag = Tag.parse(["k", str(dvm_config.NIP89.kind)]) + d_tag = Tag.parse(["d", dvm_config.NIP89.dtag]) + keys = Keys.from_sk_str(dvm_config.NIP89.pk) + content = dvm_config.NIP89.content + event = EventBuilder(31990, content, [k_tag, d_tag]).to_event(keys) + send_event(event, client=client, dvm_config=dvm_config) + print("Announced NIP 89 for " + dvm_config.NIP89.name) diff --git a/utils/nostr_utils.py b/utils/nostr_utils.py index 86e6a2a..b30ee6e 100644 --- a/utils/nostr_utils.py +++ b/utils/nostr_utils.py @@ -1,89 +1,59 @@ from datetime import timedelta -from nostr_sdk import Keys, Filter, Client, Alphabet, EventId, Options - -from utils.definitions import RELAY_LIST +from nostr_sdk import Keys, Filter, Client, Alphabet, EventId, Options, Event, PublicKey -def get_event_by_id(event_id, client=None, config=None): - is_new_client = False - if client is None: - keys = Keys.from_sk_str(config.PRIVATE_KEY) - client = Client(keys) - for relay in config.RELAY_LIST: - client.add_relay(relay) - client.connect() - is_new_client = True - +def get_event_by_id(event_id: str, client: Client, config=None) -> Event | None: split = event_id.split(":") if len(split) == 3: - id_filter = Filter().author(split[1]).custom_tag(Alphabet.D, [split[2]]) + pk = PublicKey.from_hex(split[1]) + id_filter = Filter().author(pk).custom_tag(Alphabet.D, [split[2]]) events = client.get_events_of([id_filter], timedelta(seconds=config.RELAY_TIMEOUT)) else: + if str(event_id).startswith('note'): + event_id = EventId.from_bech32(event_id) + else: + event_id = EventId.from_hex(event_id) + id_filter = Filter().id(event_id).limit(1) events = client.get_events_of([id_filter], timedelta(seconds=config.RELAY_TIMEOUT)) - if is_new_client: - client.disconnect() if len(events) > 0: return events[0] else: return None -def get_referenced_event_by_id(event_id, kinds=None, client=None, config=None): - if kinds is None: - kinds = [] - is_new_client = False - if client is None: - keys = Keys.from_sk_str(config.PRIVATE_KEY) - client = Client(keys) - for relay in config.RELAY_LIST: - client.add_relay(relay) - client.connect() - is_new_client = True +def get_referenced_event_by_id(event_id, client, dvm_config, kinds) -> Event | None: if kinds is None: kinds = [] + if len(kinds) > 0: job_id_filter = Filter().kinds(kinds).event(EventId.from_hex(event_id)).limit(1) else: job_id_filter = Filter().event(EventId.from_hex(event_id)).limit(1) - events = client.get_events_of([job_id_filter], timedelta(seconds=config.RELAY_TIMEOUT)) + events = client.get_events_of([job_id_filter], timedelta(seconds=dvm_config.RELAY_TIMEOUT)) - if is_new_client: - client.disconnect() if len(events) > 0: return events[0] else: return None -def send_event(event, client=None, key=None): +def send_event(event: Event, client: Client, dvm_config) -> EventId: relays = [] - is_new_client = False for tag in event.tags(): if tag.as_vec()[0] == 'relays': relays = tag.as_vec()[1].split(',') - if client is None: - opts = Options().wait_for_send(False).send_timeout(timedelta(seconds=5)).skip_disconnected_relays(True) - client = Client.with_opts(key, opts) - for relay in RELAY_LIST: - client.add_relay(relay) - client.connect() - is_new_client = True - for relay in relays: - if relay not in RELAY_LIST: + if relay not in dvm_config.RELAY_LIST: client.add_relay(relay) event_id = client.send_event(event) for relay in relays: - if relay not in RELAY_LIST: + if relay not in dvm_config.RELAY_LIST: client.remove_relay(relay) - if is_new_client: - client.disconnect() - return event_id diff --git a/utils/output_utils.py b/utils/output_utils.py index 06536f6..6d1b0cd 100644 --- a/utils/output_utils.py +++ b/utils/output_utils.py @@ -1,17 +1,25 @@ import json import datetime as datetime +import os from types import NoneType +import emoji +import requests +from pyupload.uploader import CatboxUploader + import pandas +''' +Post process results to either given output format or a Nostr readable plain text. +''' + def post_process_result(anno, original_event): - print("post-processing...") + print("Post-processing...") if isinstance(anno, pandas.DataFrame): # if input is an anno we parse it to required output format for tag in original_event.tags(): print(tag.as_vec()[0]) if tag.as_vec()[0] == "output": - print("HAS OUTPUT TAG") output_format = tag.as_vec()[1] print("requested output is " + str(tag.as_vec()[1]) + "...") try: @@ -80,11 +88,97 @@ def post_process_result(anno, original_event): elif isinstance(anno, NoneType): return "An error occurred" else: - result = replace_broken_words(anno) #TODO + result = replace_broken_words(anno) # TODO return result +''' +Convenience function to replace words like Noster with Nostr +''' + + def replace_broken_words(text): result = (text.replace("Noster", "Nostr").replace("Nostra", "Nostr").replace("no stir", "Nostr"). replace("Nostro", "Nostr").replace("Impub", "npub").replace("sets", "Sats")) return result + + +''' +Function to upload to Nostr.build and if it fails to Nostrfiles.dev +Larger files than these hosters allow and fallback is catbox currently. +Will probably need to switch to another system in the future. +''' + + +def upload_media_to_hoster(filepath: str): + print("Uploading image: " + filepath) + try: + files = {'file': open(filepath, 'rb')} + file_stats = os.stat(filepath) + sizeinmb = file_stats.st_size / (1024 * 1024) + print("Filesize of Uploaded media: " + str(sizeinmb) + " Mb.") + if sizeinmb > 25: + uploader = CatboxUploader(filepath) + result = uploader.execute() + return result + else: + url = 'https://nostr.build/api/v2/upload/files' + response = requests.post(url, files=files) + json_object = json.loads(response.text) + result = json_object["data"][0]["url"] + return result + except: + try: + file = {'file': open(filepath, 'rb')} + url = 'https://nostrfiles.dev/upload_image' + response = requests.post(url, files=file) + json_object = json.loads(response.text) + print(json_object["url"]) + return json_object["url"] + # fallback filehoster + except: + + try: + uploader = CatboxUploader(filepath) + result = uploader.execute() + print(result) + return result + except: + return "Upload not possible, all hosters didn't work" + + +def build_status_reaction(status, task, amount, content): + alt_description = "This is a reaction to a NIP90 DVM AI task. " + + if status == "processing": + alt_description = "NIP90 DVM AI task " + task + " started processing. " + reaction = alt_description + emoji.emojize(":thumbs_up:") + elif status == "success": + alt_description = "NIP90 DVM AI task " + task + " finished successfully. " + reaction = alt_description + emoji.emojize(":call_me_hand:") + elif status == "chain-scheduled": + alt_description = "NIP90 DVM AI task " + task + " Chain Task scheduled" + reaction = alt_description + emoji.emojize(":thumbs_up:") + elif status == "error": + alt_description = "NIP90 DVM AI task " + task + " had an error. " + if content is None: + reaction = alt_description + emoji.emojize(":thumbs_down:") + else: + reaction = alt_description + emoji.emojize(":thumbs_down:") + content + + elif status == "payment-required": + alt_description = "NIP90 DVM AI task " + task + " requires payment of min " + str( + amount) + " Sats. " + reaction = alt_description + emoji.emojize(":orange_heart:") + + elif status == "payment-rejected": + alt_description = "NIP90 DVM AI task " + task + " payment is below required amount of " + str( + amount) + " Sats. " + reaction = alt_description + emoji.emojize(":thumbs_down:") + elif status == "user-blocked-from-service": + alt_description = "NIP90 DVM AI task " + task + " can't be performed. User has been blocked from Service. " + reaction = alt_description + emoji.emojize(":thumbs_down:") + else: + reaction = emoji.emojize(":thumbs_down:") + + return alt_description, reaction diff --git a/utils/zap_utils.py b/utils/zap_utils.py index b81a627..2e6c372 100644 --- a/utils/zap_utils.py +++ b/utils/zap_utils.py @@ -4,10 +4,12 @@ import json import requests from Crypto.Cipher import AES from bech32 import bech32_decode, convertbits -from nostr_sdk import PublicKey, nostr_sdk +from nostr_sdk import nostr_sdk, PublicKey, SecretKey, Event +from utils.dvmconfig import DVMConfig +from utils.nostr_utils import get_event_by_id -def parse_bolt11_invoice(invoice): +def parse_amount_from_bolt11_invoice(bolt11_invoice: str) -> int: def get_index_of_first_letter(ip): index = 0 for c in ip: @@ -17,7 +19,7 @@ def parse_bolt11_invoice(invoice): index = index + 1 return len(ip) - remaining_invoice = invoice[4:] + remaining_invoice = bolt11_invoice[4:] index = get_index_of_first_letter(remaining_invoice) identifier = remaining_invoice[index] number_string = remaining_invoice[:index] @@ -33,44 +35,82 @@ def parse_bolt11_invoice(invoice): return int(number) -def create_bolt11_ln_bits(sats, config): + +def parse_zap_event_tags(zap_event, keys, name, client, config): + zapped_event = None + invoice_amount = 0 + anon = False + sender = zap_event.pubkey() + + for tag in zap_event.tags(): + if tag.as_vec()[0] == 'bolt11': + invoice_amount = parse_amount_from_bolt11_invoice(tag.as_vec()[1]) + elif tag.as_vec()[0] == 'e': + zapped_event = get_event_by_id(tag.as_vec()[1], client=client, config=config) + elif tag.as_vec()[0] == 'description': + zap_request_event = Event.from_json(tag.as_vec()[1]) + sender = check_for_zapplepay(zap_request_event.pubkey().to_hex(), + zap_request_event.content()) + for z_tag in zap_request_event.tags(): + if z_tag.as_vec()[0] == 'anon': + if len(z_tag.as_vec()) > 1: + print("[" + name + "] Private Zap received.") + decrypted_content = decrypt_private_zap_message(z_tag.as_vec()[1], + keys.secret_key(), + zap_request_event.pubkey()) + decrypted_private_event = Event.from_json(decrypted_content) + if decrypted_private_event.kind() == 9733: + sender = decrypted_private_event.pubkey().to_hex() + message = decrypted_private_event.content() + if message != "": + print("Zap Message: " + message) + else: + anon = True + print( + "[" + name + "] Anonymous Zap received. Unlucky, I don't know from whom, and never will") + + return invoice_amount, zapped_event, sender, anon + + +def create_bolt11_ln_bits(sats: int, config: DVMConfig) -> (str, str): url = config.LNBITS_URL + "/api/v1/payments" - data = {'out': False, 'amount': sats, 'memo': "Nostr-DVM"} + data = {'out': False, 'amount': sats, 'memo': "Nostr-DVM " + config.NIP89.name} headers = {'X-API-Key': config.LNBITS_INVOICE_KEY, 'Content-Type': 'application/json', 'charset': 'UTF-8'} try: res = requests.post(url, json=data, headers=headers) obj = json.loads(res.text) return obj["payment_request"], obj["payment_hash"] except Exception as e: - print(e) - return None + print("LNBITS: " + str(e)) + return None, None -def check_bolt11_ln_bits_is_paid(payment_hash, config): + +def check_bolt11_ln_bits_is_paid(payment_hash: str, config: DVMConfig): url = config.LNBITS_URL + "/api/v1/payments/" + payment_hash headers = {'X-API-Key': config.LNBITS_INVOICE_KEY, 'Content-Type': 'application/json', 'charset': 'UTF-8'} try: res = requests.get(url, headers=headers) obj = json.loads(res.text) - return obj["paid"] + return obj["paid"] #TODO cast except Exception as e: return None # DECRYPT ZAPS -def check_for_zapplepay(sender, content): +def check_for_zapplepay(pubkey_hex: str, content: str): try: # Special case Zapplepay - if sender == PublicKey.from_bech32("npub1wxl6njlcgygduct7jkgzrvyvd9fylj4pqvll6p32h59wyetm5fxqjchcan").to_hex(): + if pubkey_hex == PublicKey.from_bech32("npub1wxl6njlcgygduct7jkgzrvyvd9fylj4pqvll6p32h59wyetm5fxqjchcan").to_hex(): real_sender_bech32 = content.replace("From: nostr:", "") - sender = PublicKey.from_bech32(real_sender_bech32).to_hex() - return sender + pubkey_hex = PublicKey.from_bech32(real_sender_bech32).to_hex() + return pubkey_hex except Exception as e: print(e) - return sender + return pubkey_hex -def decrypt_private_zap_message(msg, privkey, pubkey): +def decrypt_private_zap_message(msg: str, privkey: SecretKey, pubkey: PublicKey): shared_secret = nostr_sdk.generate_shared_key(privkey, pubkey) if len(shared_secret) != 16 and len(shared_secret) != 32: return "invalid shared secret size" @@ -92,4 +132,3 @@ def decrypt_private_zap_message(msg, privkey, pubkey): return decoded except Exception as ex: return str(ex) -