diff --git a/dvm.py b/dvm.py index eb65ecf..65a0742 100644 --- a/dvm.py +++ b/dvm.py @@ -2,15 +2,16 @@ from nostr_sdk import PublicKey, Keys, Client, Tag, Event, EventBuilder, Filter, init_logger, LogLevel import time import emoji -from utils.definitions import EventDefinitions, DVMConfig, RequiredJobToWatch, JobToWatch, LOCAL_TASKS + +from tasks.textextractionPDF import TextExtractionPDF +from tasks.translation import Translation +from utils.definitions import EventDefinitions, DVMConfig, RequiredJobToWatch, JobToWatch from utils.admin_utils import admin_make_database_updates -from utils.ai_utils import extract_text_from_pdf, google_translate from utils.backend_utils import get_amount_per_task, check_task_is_supported, get_task from utils.database_utils import update_sql_table, get_from_sql_table, \ create_sql_table, get_or_add_user, update_user_balance from utils.nostr_utils import get_event_by_id, get_referenced_event_by_id, send_event from utils.output_utils import post_process_result -from utils.requestform_utils import create_requestform_from_nostr_event from utils.zap_utils import check_bolt11_ln_bits_is_paid, parse_bolt11_invoice, \ check_for_zapplepay, decrypt_private_zap_message, create_bolt11_ln_bits @@ -50,14 +51,14 @@ def dvm(config): def handle(self, relay_url, nostr_event): if EventDefinitions.KIND_NIP90_EXTRACT_TEXT <= nostr_event.kind() <= EventDefinitions.KIND_NIP90_GENERIC: print(f"[Nostr] Received new NIP90 Job Request from {relay_url}: {nostr_event.as_json()}") - handle_nip90_job_event(nostr_event) + handle_nip90_job_event(nostr_event, dvm_config) elif nostr_event.kind() == EventDefinitions.KIND_ZAP: handle_zap(nostr_event) def handle_msg(self, relay_url, msg): return - def handle_nip90_job_event(event): + def handle_nip90_job_event(event, dvm_config): user = get_or_add_user(event.pubkey().to_hex()) is_whitelisted = user[2] is_blacklisted = user[3] @@ -153,8 +154,7 @@ def dvm(config): job_event = get_event_by_id(tag.as_vec()[1], config=dvm_config) task_supported, task, duration = check_task_is_supported(job_event, client=client, - get_duration=False, - config=dvm_config) + get_duration=False, config=dvm_config) if job_event is not None and task_supported: if amount <= invoice_amount: print("[Nostr] Payment-request fulfilled...") @@ -201,46 +201,241 @@ def dvm(config): print(f"Error during content decryption: {e}") def do_work(job_event, is_from_bot=False): - if (( - EventDefinitions.KIND_NIP90_EXTRACT_TEXT <= job_event.kind() <= EventDefinitions.KIND_NIP90_GENERIC) + if ((EventDefinitions.KIND_NIP90_EXTRACT_TEXT <= job_event.kind() <= EventDefinitions.KIND_NIP90_GENERIC) or job_event.kind() == EventDefinitions.KIND_DM): - #We're building a request form here because we want to send larger tasks to a processing server later - request_form = create_requestform_from_nostr_event(job_event, is_from_bot, client, dvm_config) task = get_task(job_event, client=client, dvmconfig=dvm_config) - # TASKS that take time or need GPU are moved to a backend server in the future - if task not in LOCAL_TASKS: - if task.startswith("unknown"): - print("Task not (yet) supported") - return + result = "" + try: + if task == Translation.TASK: + request_form = Translation.create_requestform_from_nostr_event(job_event,client,dvm_config) + options = setOptions(request_form) + result = Translation.process(options) + + elif task == TextExtractionPDF.TASK: + request_form = TextExtractionPDF.create_requestform_from_nostr_event(job_event, client, dvm_config) + options = setOptions(request_form) + result = TextExtractionPDF.process(options) + + #TODO Add more tasks here + + check_and_return_event(result, str(job_event.as_json()), dvm_key=dvm_config.PRIVATE_KEY) + + except Exception as e: + respond_to_error(e, job_event.as_json(), is_from_bot, dvm_config.PRIVATE_KEY) + + + def setOptions(request_form): + print("Setting options...") + opts = [] + if request_form.get("optStr"): + for k, v in [option.split("=") for option in request_form["optStr"].split(";")]: + t = (k, v) + opts.append(t) + print(k + "=" + v) + print("...done.") + return dict(opts) + + def check_event_has_not_unifinished_job_input(nevent, append, client, dvmconfig): + task_supported, task, duration = check_task_is_supported(nevent, client, False, config=dvmconfig) + if not task_supported: + return False + + for tag in nevent.tags(): + if tag.as_vec()[0] == 'i': + if len(tag.as_vec()) < 3: + print("Job Event missing/malformed i tag, skipping..") + return False else: - print("[Nostr] Scheduling " + task + " Job event: " + job_event.as_json()) - print("We will employ a backend server here in the future") + input = tag.as_vec()[1] + input_type = tag.as_vec()[2] + if input_type == "job": + evt = get_referenced_event_by_id(input, EventDefinitions.ANY_RESULT, client, config=dvmconfig) + if evt is None: + if append: + job = RequiredJobToWatch(event=nevent, timestamp=Timestamp.now().as_secs()) + jobs_on_hold_list.append(job) + send_job_status_reaction(nevent, "chain-scheduled", True, 0, client=client, + config=dvmconfig) + return False + else: + return True + + def send_job_status_reaction(original_event, status, is_paid=True, amount=0, client=None, content=None, config=None, + key=None): + dvmconfig = config + altdesc = "This is a reaction to a NIP90 DVM AI task. " + task = get_task(original_event, client=client, dvmconfig=dvmconfig) + if status == "processing": + altdesc = "NIP90 DVM AI task " + task + " started processing. " + reaction = altdesc + emoji.emojize(":thumbs_up:") + elif status == "success": + altdesc = "NIP90 DVM AI task " + task + " finished successfully. " + reaction = altdesc + emoji.emojize(":call_me_hand:") + elif status == "chain-scheduled": + altdesc = "NIP90 DVM AI task " + task + " Chain Task scheduled" + reaction = altdesc + emoji.emojize(":thumbs_up:") + elif status == "error": + altdesc = "NIP90 DVM AI task " + task + " had an error. " + if content is None: + reaction = altdesc + emoji.emojize(":thumbs_down:") else: - print("[Nostr] Scheduling local " + task + " Job event: " + job_event.as_json()) - result = "" - print("Setting options...") - opts = [] - if request_form.get("optStr"): - for k, v in [option.split("=") for option in request_form["optStr"].split(";")]: - t = (k, v) - opts.append(t) - print(k + "=" + v) - print("...done.") - options = dict(opts) + reaction = altdesc + emoji.emojize(":thumbs_down:") + content - if task == "translation": - result = google_translate(options["text"], options["translation_lang"]) - elif task == "pdf-to-text": - result = extract_text_from_pdf(options["url"]) - # TODO ADD FURTHER LOCAL TASKS HERE + elif status == "payment-required": - check_and_return_event(result, str(job_event.as_json()), - dvm_key=dvm_config.PRIVATE_KEY) + altdesc = "NIP90 DVM AI task " + task + " requires payment of min " + str(amount) + " Sats. " + reaction = altdesc + emoji.emojize(":orange_heart:") + elif status == "payment-rejected": + altdesc = "NIP90 DVM AI task " + task + " payment is below required amount of " + str(amount) + " Sats. " + reaction = altdesc + emoji.emojize(":thumbs_down:") + elif status == "user-blocked-from-service": + + altdesc = "NIP90 DVM AI task " + task + " can't be performed. User has been blocked from Service. " + reaction = altdesc + emoji.emojize(":thumbs_down:") + else: + reaction = emoji.emojize(":thumbs_down:") + + etag = Tag.parse(["e", original_event.id().to_hex()]) + ptag = Tag.parse(["p", original_event.pubkey().to_hex()]) + alttag = Tag.parse(["alt", altdesc]) + statustag = Tag.parse(["status", status]) + tags = [etag, ptag, alttag, statustag] + + if status == "success" or status == "error": # + for x in job_list: + if x.event_id == original_event.id(): + is_paid = x.is_paid + amount = x.amount + break + + bolt11 = "" + payment_hash = "" + expires = original_event.created_at().as_secs() + (60 * 60 * 24) + if status == "payment-required" or (status == "processing" and not is_paid): + if dvmconfig.LNBITS_INVOICE_KEY != "": + try: + bolt11, payment_hash = create_bolt11_ln_bits(amount, dvmconfig) + except Exception as e: + print(e) + + if not any(x.event_id == original_event.id().to_hex() for x in job_list): + job_list.append( + JobToWatch(event_id=original_event.id().to_hex(), timestamp=original_event.created_at().as_secs(), + amount=amount, + is_paid=is_paid, + status=status, result="", is_processed=False, bolt11=bolt11, payment_hash=payment_hash, + expires=expires, from_bot=False)) + print(str(job_list)) + if status == "payment-required" or status == "payment-rejected" or (status == "processing" and not is_paid) or ( + status == "success" and not is_paid): + + if dvmconfig.LNBITS_INVOICE_KEY != "": + amount_tag = Tag.parse(["amount", str(amount * 1000), bolt11]) + else: + amount_tag = Tag.parse(["amount", str(amount * 1000)]) # to millisats + tags.append(amount_tag) + if key is not None: + keys = Keys.from_sk_str(key) + else: + keys = Keys.from_sk_str(dvmconfig.PRIVATE_KEY) + event = EventBuilder(EventDefinitions.KIND_FEEDBACK, reaction, tags).to_event(keys) + + send_event(event, key=keys) + print( + "[Nostr] Sent Kind " + str(EventDefinitions.KIND_FEEDBACK) + " Reaction: " + status + " " + event.as_json()) + return event.as_json() + + def check_and_return_event(data, original_event_str: str, dvm_key=""): + original_event = Event.from_json(original_event_str) + keys = Keys.from_sk_str(dvm_key) + + for x in job_list: + if x.event_id == original_event.id().to_hex(): + is_paid = x.is_paid + amount = x.amount + x.result = data + x.is_processed = True + if dvm_config.SHOWRESULTBEFOREPAYMENT and not is_paid: + send_nostr_reply_event(data, original_event_str, key=keys) + send_job_status_reaction(original_event, "success", amount, + config=dvm_config) # or payment-required, or both? + elif not dvm_config.SHOWRESULTBEFOREPAYMENT and not is_paid: + send_job_status_reaction(original_event, "success", amount, + config=dvm_config) # or payment-required, or both? + + if dvm_config.SHOWRESULTBEFOREPAYMENT and is_paid: + job_list.remove(x) + elif not dvm_config.SHOWRESULTBEFOREPAYMENT and is_paid: + job_list.remove(x) + send_nostr_reply_event(data, original_event_str, key=keys) + break + + post_processed_content = post_process_result(data, original_event) + send_nostr_reply_event(post_processed_content, original_event_str, key=keys) + + def send_nostr_reply_event(content, original_event_as_str, key=None): + originalevent = Event.from_json(original_event_as_str) + requesttag = Tag.parse(["request", original_event_as_str.replace("\\", "")]) + etag = Tag.parse(["e", originalevent.id().to_hex()]) + ptag = Tag.parse(["p", originalevent.pubkey().to_hex()]) + alttag = Tag.parse(["alt", "This is the result of a NIP90 DVM AI task with kind " + str( + originalevent.kind()) + ". The task was: " + originalevent.content()]) + statustag = Tag.parse(["status", "success"]) + replytags = [requesttag, etag, ptag, alttag, statustag] + for tag in originalevent.tags(): + if tag.as_vec()[0] == "i": + icontent = tag.as_vec()[1] + ikind = tag.as_vec()[2] + itag = Tag.parse(["i", icontent, ikind]) + replytags.append(itag) + + if key is None: + key = Keys.from_sk_str(dvm_config.PRIVATE_KEY) + + response_kind = originalevent.kind() + 1000 + event = EventBuilder(response_kind, str(content), replytags).to_event(key) + send_event(event, key=key) + print("[Nostr] " + str(response_kind) + " Job Response event sent: " + event.as_json()) + return event.as_json() client.handle_notifications(NotificationHandler()) + def respond_to_error(content, originaleventstr, is_from_bot=False, dvm_key=None): + print("ERROR") + if dvm_key is None: + keys = Keys.from_sk_str(dvm_config.PRIVATE_KEY) + else: + keys = Keys.from_sk_str(dvm_key) + + original_event = Event.from_json(originaleventstr) + sender = "" + task = "" + if not is_from_bot: + send_job_status_reaction(original_event, "error", content=content, key=dvm_key) + # TODO Send Zap back + else: + for tag in original_event.tags(): + if tag.as_vec()[0] == "p": + sender = tag.as_vec()[1] + elif tag.as_vec()[0] == "i": + task = tag.as_vec()[1] + + user = get_from_sql_table(sender) + is_whitelisted = user[2] + if not is_whitelisted: + amount = int(user[1]) + get_amount_per_task(task) + update_sql_table(sender, amount, user[2], user[3], user[4], user[5], user[6], + Timestamp.now().as_secs()) + message = "There was the following error : " + content + ". Credits have been reimbursed" + else: + # User didn't pay, so no reimbursement + message = "There was the following error : " + content + + evt = EventBuilder.new_encrypted_direct_msg(keys, PublicKey.from_hex(sender), message, None).to_event(keys) + send_event(evt, key=keys) + while True: for job in job_list: if job.bolt11 != "" and job.payment_hash != "" and not job.is_paid: @@ -269,204 +464,16 @@ def dvm(config): time.sleep(1.0) -def check_event_has_not_unifinished_job_input(nevent, append, client, dvmconfig): - task_supported, task, duration = check_task_is_supported(nevent, client, False, config=dvmconfig) - if not task_supported: - return False - - for tag in nevent.tags(): - if tag.as_vec()[0] == 'i': - if len(tag.as_vec()) < 3: - print("Job Event missing/malformed i tag, skipping..") - return False - else: - input = tag.as_vec()[1] - input_type = tag.as_vec()[2] - if input_type == "job": - evt = get_referenced_event_by_id(input, EventDefinitions.ANY_RESULT, client, config=dvmconfig) - if evt is None: - if append: - job = RequiredJobToWatch(event=nevent, timestamp=Timestamp.now().as_secs()) - jobs_on_hold_list.append(job) - send_job_status_reaction(nevent, "chain-scheduled", True, 0, client=client, - config=dvmconfig) - - return False - else: - return True -def send_job_status_reaction(original_event, status, is_paid=True, amount=0, client=None, content=None, config=None, - key=None): - dvmconfig = config - altdesc = "This is a reaction to a NIP90 DVM AI task. " - task = get_task(original_event, client=client, dvmconfig=dvmconfig) - if status == "processing": - altdesc = "NIP90 DVM AI task " + task + " started processing. " - reaction = altdesc + emoji.emojize(":thumbs_up:") - elif status == "success": - altdesc = "NIP90 DVM AI task " + task + " finished successfully. " - reaction = altdesc + emoji.emojize(":call_me_hand:") - elif status == "chain-scheduled": - altdesc = "NIP90 DVM AI task " + task + " Chain Task scheduled" - reaction = altdesc + emoji.emojize(":thumbs_up:") - elif status == "error": - altdesc = "NIP90 DVM AI task " + task + " had an error. " - if content is None: - reaction = altdesc + emoji.emojize(":thumbs_down:") - else: - reaction = altdesc + emoji.emojize(":thumbs_down:") + content - - elif status == "payment-required": - - altdesc = "NIP90 DVM AI task " + task + " requires payment of min " + str(amount) + " Sats. " - reaction = altdesc + emoji.emojize(":orange_heart:") - - elif status == "payment-rejected": - altdesc = "NIP90 DVM AI task " + task + " payment is below required amount of " + str(amount) + " Sats. " - reaction = altdesc + emoji.emojize(":thumbs_down:") - elif status == "user-blocked-from-service": - - altdesc = "NIP90 DVM AI task " + task + " can't be performed. User has been blocked from Service. " - reaction = altdesc + emoji.emojize(":thumbs_down:") - else: - reaction = emoji.emojize(":thumbs_down:") - - etag = Tag.parse(["e", original_event.id().to_hex()]) - ptag = Tag.parse(["p", original_event.pubkey().to_hex()]) - alttag = Tag.parse(["alt", altdesc]) - statustag = Tag.parse(["status", status]) - tags = [etag, ptag, alttag, statustag] - - if status == "success" or status == "error": # - for x in job_list: - if x.event_id == original_event.id(): - is_paid = x.is_paid - amount = x.amount - break - - bolt11 = "" - payment_hash = "" - expires = original_event.created_at().as_secs() + (60 * 60 * 24) - if status == "payment-required" or (status == "processing" and not is_paid): - if dvmconfig.LNBITS_INVOICE_KEY != "": - try: - bolt11, payment_hash = create_bolt11_ln_bits(amount, dvmconfig) - except Exception as e: - print(e) - - if not any(x.event_id == original_event.id().to_hex() for x in job_list): - job_list.append( - JobToWatch(event_id=original_event.id().to_hex(), timestamp=original_event.created_at().as_secs(), - amount=amount, - is_paid=is_paid, - status=status, result="", is_processed=False, bolt11=bolt11, payment_hash=payment_hash, - expires=expires, from_bot=False)) - print(str(job_list)) - if status == "payment-required" or status == "payment-rejected" or (status == "processing" and not is_paid) or ( - status == "success" and not is_paid): - - if dvmconfig.LNBITS_INVOICE_KEY != "": - amount_tag = Tag.parse(["amount", str(amount * 1000), bolt11]) - else: - amount_tag = Tag.parse(["amount", str(amount * 1000)]) # to millisats - tags.append(amount_tag) - if key is not None: - keys = Keys.from_sk_str(key) - else: - keys = Keys.from_sk_str(dvmconfig.PRIVATE_KEY) - event = EventBuilder(EventDefinitions.KIND_FEEDBACK, reaction, tags).to_event(keys) - - send_event(event, key=keys) - print("[Nostr] Sent Kind " + str(EventDefinitions.KIND_FEEDBACK) + " Reaction: " + status + " " + event.as_json()) - return event.as_json() -def send_nostr_reply_event(content, original_event_as_str, key=None): - originalevent = Event.from_json(original_event_as_str) - requesttag = Tag.parse(["request", original_event_as_str.replace("\\", "")]) - etag = Tag.parse(["e", originalevent.id().to_hex()]) - ptag = Tag.parse(["p", originalevent.pubkey().to_hex()]) - alttag = Tag.parse(["alt", "This is the result of a NIP90 DVM AI task with kind " + str( - originalevent.kind()) + ". The task was: " + originalevent.content()]) - statustag = Tag.parse(["status", "success"]) - replytags = [requesttag, etag, ptag, alttag, statustag] - for tag in originalevent.tags(): - if tag.as_vec()[0] == "i": - icontent = tag.as_vec()[1] - ikind = tag.as_vec()[2] - itag = Tag.parse(["i", icontent, ikind]) - replytags.append(itag) - - if key is None: - key = Keys.from_sk_str(dvm_config.PRIVATE_KEY) - - response_kind = originalevent.kind() + 1000 - event = EventBuilder(response_kind, str(content), replytags).to_event(key) - send_event(event, key=key) - print("[Nostr] " + str(response_kind) + " Job Response event sent: " + event.as_json()) - return event.as_json() -def respond_to_error(content, originaleventstr, is_from_bot=False, dvm_key=None): - print("ERROR") - if dvm_key is None: - keys = Keys.from_sk_str(dvm_config.PRIVATE_KEY) - else: - keys = Keys.from_sk_str(dvm_key) - - original_event = Event.from_json(originaleventstr) - sender = "" - task = "" - if not is_from_bot: - send_job_status_reaction(original_event, "error", content=content, key=dvm_key) - # TODO Send Zap back - else: - for tag in original_event.tags(): - if tag.as_vec()[0] == "p": - sender = tag.as_vec()[1] - elif tag.as_vec()[0] == "i": - task = tag.as_vec()[1] - - user = get_from_sql_table(sender) - is_whitelisted = user[2] - if not is_whitelisted: - amount = int(user[1]) + get_amount_per_task(task) - update_sql_table(sender, amount, user[2], user[3], user[4], user[5], user[6], - Timestamp.now().as_secs()) - message = "There was the following error : " + content + ". Credits have been reimbursed" - else: - # User didn't pay, so no reimbursement - message = "There was the following error : " + content - - evt = EventBuilder.new_encrypted_direct_msg(keys, PublicKey.from_hex(sender), message, None).to_event(keys) - send_event(evt, key=keys) -def check_and_return_event(data, original_event_str: str, dvm_key=""): - original_event = Event.from_json(original_event_str) - keys = Keys.from_sk_str(dvm_key) - for x in job_list: - if x.event_id == original_event.id().to_hex(): - is_paid = x.is_paid - amount = x.amount - x.result = data - x.is_processed = True - if dvm_config.SHOWRESULTBEFOREPAYMENT and not is_paid: - send_nostr_reply_event(data, original_event_str, key=keys) - send_job_status_reaction(original_event, "success", amount, - config=dvm_config) # or payment-required, or both? - elif not dvm_config.SHOWRESULTBEFOREPAYMENT and not is_paid: - send_job_status_reaction(original_event, "success", amount, - config=dvm_config) # or payment-required, or both? - if dvm_config.SHOWRESULTBEFOREPAYMENT and is_paid: - job_list.remove(x) - elif not dvm_config.SHOWRESULTBEFOREPAYMENT and is_paid: - job_list.remove(x) - send_nostr_reply_event(data, original_event_str, key=keys) - break - post_processed_content = post_process_result(data, original_event) - send_nostr_reply_event(post_processed_content, original_event_str, key=keys) + + diff --git a/main.py b/main.py index 9954a05..00e7de4 100644 --- a/main.py +++ b/main.py @@ -4,35 +4,24 @@ from threading import Thread import dotenv import utils.env as env +from tasks.textextractionPDF import TextExtractionPDF +from tasks.translation import Translation from utils.definitions import EventDefinitions def run_nostr_dvm_with_local_config(): from dvm import dvm, DVMConfig - from utils.nip89_utils import NIP89Announcement dvmconfig = DVMConfig() dvmconfig.PRIVATE_KEY = os.getenv(env.NOSTR_PRIVATE_KEY) - dvmconfig.SUPPORTED_TASKS = ["translation", "pdf-to-text"] + dvmconfig.SUPPORTED_TASKS = [Translation.TASK, TextExtractionPDF.TASK] dvmconfig.LNBITS_INVOICE_KEY = os.getenv(env.LNBITS_INVOICE_KEY) dvmconfig.LNBITS_URL = os.getenv(env.LNBITS_HOST) # In admin_utils, set rebroadcast_nip89 to true to (re)broadcast your DVM. You can create a valid dtag and the content on vendata.io - # Add the dtag in your .env file so you can update your dvm later and change the content here as needed. - - nip89extraction = NIP89Announcement() - nip89extraction.kind = EventDefinitions.KIND_NIP90_EXTRACT_TEXT - nip89extraction.dtag = os.getenv(env.TASK_TRANSLATION_NIP89_DTAG) - nip89extraction.pk = os.getenv(env.NOSTR_PRIVATE_KEY) - nip89extraction.content = "{\"name\":\"Pdf Extractor\",\"image\":\"https://image.nostr.build/c33ca6fc4cc038ca4adb46fdfdfda34951656f87ee364ef59095bae1495ce669.jpg\",\"about\":\"I extract Text from pdf documents\",\"nip90Params\":{}}" - dvmconfig.NIP89s.append(nip89extraction) - - nip89translation = NIP89Announcement() - nip89translation.kind = EventDefinitions.KIND_NIP90_TRANSLATE_TEXT - nip89translation.dtag = os.getenv(env.TASK_TRANSLATION_NIP89_DTAG) - nip89translation.pk = os.getenv(env.NOSTR_PRIVATE_KEY) - nip89translation.content = "{\"name\":\"Translator\",\"image\":\"https://image.nostr.build/c33ca6fc4cc038ca4adb46fdfdfda34951656f87ee364ef59095bae1495ce669.jpg\",\"about\":\"I translate text from given text/event/job, currently using Google Translation Services into language defined in param. \",\"nip90Params\":{\"language\":{\"required\":true,\"values\":[\"af\",\"am\",\"ar\",\"az\",\"be\",\"bg\",\"bn\",\"bs\",\"ca\",\"ceb\",\"co\",\"cs\",\"cy\",\"da\",\"de\",\"el\",\"eo\",\"es\",\"et\",\"eu\",\"fa\",\"fi\",\"fr\",\"fy\",\"ga\",\"gd\",\"gl\",\"gu\",\"ha\",\"haw\",\"hi\",\"hmn\",\"hr\",\"ht\",\"hu\",\"hy\",\"id\",\"ig\",\"is\",\"it\",\"he\",\"ja\",\"jv\",\"ka\",\"kk\",\"km\",\"kn\",\"ko\",\"ku\",\"ky\",\"la\",\"lb\",\"lo\",\"lt\",\"lv\",\"mg\",\"mi\",\"mk\",\"ml\",\"mn\",\"mr\",\"ms\",\"mt\",\"my\",\"ne\",\"nl\",\"no\",\"ny\",\"or\",\"pa\",\"pl\",\"ps\",\"pt\",\"ro\",\"ru\",\"sd\",\"si\",\"sk\",\"sl\",\"sm\",\"sn\",\"so\",\"sq\",\"sr\",\"st\",\"su\",\"sv\",\"sw\",\"ta\",\"te\",\"tg\",\"th\",\"tl\",\"tr\",\"ug\",\"uk\",\"ur\",\"uz\",\"vi\",\"xh\",\"yi\",\"yo\",\"zh\",\"zu\"]}}}" - dvmconfig.NIP89s.append(nip89translation) + # Add the dtag in your .env file so you can update your dvm later and change the content in the module file as needed. + dvmconfig.NIP89s.append(TextExtractionPDF.NIP89_announcement()) + dvmconfig.NIP89s.append(Translation.NIP89_announcement()) nostr_dvm_thread = Thread(target=dvm, args=[dvmconfig]) nostr_dvm_thread.start() diff --git a/tasks/textextractionPDF.py b/tasks/textextractionPDF.py new file mode 100644 index 0000000..a309ce9 --- /dev/null +++ b/tasks/textextractionPDF.py @@ -0,0 +1,74 @@ +import os +from typing import re + +from utils import env +from utils.definitions import EventDefinitions +from utils.nip89_utils import NIP89Announcement +from utils.nostr_utils import get_event_by_id, get_referenced_event_by_id + + +class TextExtractionPDF: + TASK: str = "pdf-to-text" + COST: int = 20 + + @staticmethod + def NIP89_announcement(): + nip89 = NIP89Announcement() + nip89.kind = EventDefinitions.KIND_NIP90_TRANSLATE_TEXT + nip89.dtag = os.getenv(env.TASK_TRANSLATION_NIP89_DTAG) + nip89.pk = os.getenv(env.NOSTR_PRIVATE_KEY) + nip89.content = "{\"name\":\"Translator\",\"image\":\"https://image.nostr.build/c33ca6fc4cc038ca4adb46fdfdfda34951656f87ee364ef59095bae1495ce669.jpg\",\"about\":\"I translate text from given text/event/job, currently using Google Translation Services into language defined in param. \",\"nip90Params\":{\"language\":{\"required\":true,\"values\":[\"af\",\"am\",\"ar\",\"az\",\"be\",\"bg\",\"bn\",\"bs\",\"ca\",\"ceb\",\"co\",\"cs\",\"cy\",\"da\",\"de\",\"el\",\"eo\",\"es\",\"et\",\"eu\",\"fa\",\"fi\",\"fr\",\"fy\",\"ga\",\"gd\",\"gl\",\"gu\",\"ha\",\"haw\",\"hi\",\"hmn\",\"hr\",\"ht\",\"hu\",\"hy\",\"id\",\"ig\",\"is\",\"it\",\"he\",\"ja\",\"jv\",\"ka\",\"kk\",\"km\",\"kn\",\"ko\",\"ku\",\"ky\",\"la\",\"lb\",\"lo\",\"lt\",\"lv\",\"mg\",\"mi\",\"mk\",\"ml\",\"mn\",\"mr\",\"ms\",\"mt\",\"my\",\"ne\",\"nl\",\"no\",\"ny\",\"or\",\"pa\",\"pl\",\"ps\",\"pt\",\"ro\",\"ru\",\"sd\",\"si\",\"sk\",\"sl\",\"sm\",\"sn\",\"so\",\"sq\",\"sr\",\"st\",\"su\",\"sv\",\"sw\",\"ta\",\"te\",\"tg\",\"th\",\"tl\",\"tr\",\"ug\",\"uk\",\"ur\",\"uz\",\"vi\",\"xh\",\"yi\",\"yo\",\"zh\",\"zu\"]}}}" + return nip89 + + + @staticmethod + def is_input_supported(input_type, input_content): + if input_type != "url": + return False + return True + + @staticmethod + def create_requestform_from_nostr_event(event, client=None, dvmconfig=None): + request_form = {"jobID": event.id().to_hex()} + + # default values + input_type = "url" + input_content = "" + url = "" + + for tag in event.tags(): + if tag.as_vec()[0] == 'i': + input_type = tag.as_vec()[2] + input_content = tag.as_vec()[1] + + if input_type == "url": + url = input_content + elif input_type == "event": + evt = get_event_by_id(input_content, config=dvmconfig) + url = re.search("(?Phttps?://[^\s]+)", evt.content()).group("url") + elif input_type == "job": + evt = get_referenced_event_by_id(input_content, [EventDefinitions.KIND_NIP90_RESULT_GENERATE_IMAGE], + client, config=dvmconfig) + + url = re.search("(?Phttps?://[^\s]+)", evt.content()).group("url") + + request_form["optStr"] = 'url=' + url + return request_form + + @staticmethod + def process(options): + from pypdf import PdfReader + from pathlib import Path + import requests + file_path = Path('temp.pdf') + response = requests.get(options["url"]) + file_path.write_bytes(response.content) + reader = PdfReader(file_path) + number_of_pages = len(reader.pages) + text = "" + for page_num in range(number_of_pages): + page = reader.pages[page_num] + text = text + page.extract_text() + + os.remove('temp.pdf') + return text diff --git a/tasks/translation.py b/tasks/translation.py new file mode 100644 index 0000000..6813a80 --- /dev/null +++ b/tasks/translation.py @@ -0,0 +1,109 @@ +import os + +from utils import env +from utils.definitions import EventDefinitions +from utils.nip89_utils import NIP89Announcement +from utils.nostr_utils import get_referenced_event_by_id, get_event_by_id + + +class Translation: + TASK: str = "translation" + COST: int = 20 + + @staticmethod + def NIP89_announcement(): + nip89 = NIP89Announcement() + nip89.kind = EventDefinitions.KIND_NIP90_TRANSLATE_TEXT + nip89.dtag = os.getenv(env.TASK_TRANSLATION_NIP89_DTAG) + nip89.pk = os.getenv(env.NOSTR_PRIVATE_KEY) + nip89.content = "{\"name\":\"Translator\",\"image\":\"https://image.nostr.build/c33ca6fc4cc038ca4adb46fdfdfda34951656f87ee364ef59095bae1495ce669.jpg\",\"about\":\"I translate text from given text/event/job, currently using Google Translation Services into language defined in param. \",\"nip90Params\":{\"language\":{\"required\":true,\"values\":[\"af\",\"am\",\"ar\",\"az\",\"be\",\"bg\",\"bn\",\"bs\",\"ca\",\"ceb\",\"co\",\"cs\",\"cy\",\"da\",\"de\",\"el\",\"eo\",\"es\",\"et\",\"eu\",\"fa\",\"fi\",\"fr\",\"fy\",\"ga\",\"gd\",\"gl\",\"gu\",\"ha\",\"haw\",\"hi\",\"hmn\",\"hr\",\"ht\",\"hu\",\"hy\",\"id\",\"ig\",\"is\",\"it\",\"he\",\"ja\",\"jv\",\"ka\",\"kk\",\"km\",\"kn\",\"ko\",\"ku\",\"ky\",\"la\",\"lb\",\"lo\",\"lt\",\"lv\",\"mg\",\"mi\",\"mk\",\"ml\",\"mn\",\"mr\",\"ms\",\"mt\",\"my\",\"ne\",\"nl\",\"no\",\"ny\",\"or\",\"pa\",\"pl\",\"ps\",\"pt\",\"ro\",\"ru\",\"sd\",\"si\",\"sk\",\"sl\",\"sm\",\"sn\",\"so\",\"sq\",\"sr\",\"st\",\"su\",\"sv\",\"sw\",\"ta\",\"te\",\"tg\",\"th\",\"tl\",\"tr\",\"ug\",\"uk\",\"ur\",\"uz\",\"vi\",\"xh\",\"yi\",\"yo\",\"zh\",\"zu\"]}}}" + return nip89 + + + @staticmethod + def is_input_supported(input_type, input_content): + if input_type != "event" and input_type != "job" and input_type != "text": + return False + if input_type != "text" and len(input_content) > 4999: + return False + return True + + @staticmethod + def create_requestform_from_nostr_event(event, client=None, dvmconfig=None): + request_form = {"jobID": event.id().to_hex()} + + #default values + input_type = "event" + text = "" + translation_lang = "en" + + + for tag in event.tags(): + if tag.as_vec()[0] == 'i': + input_type = tag.as_vec()[2] + + elif tag.as_vec()[0] == 'param': + param = tag.as_vec()[1] + if param == "language": # check for paramtype + translation_lang = str(tag.as_vec()[2]).split('-')[0] + + if input_type == "event": + for tag in event.tags(): + if tag.as_vec()[0] == 'i': + evt = get_event_by_id(tag.as_vec()[1], config=dvmconfig) + text = evt.content() + break + + elif input_type == "text": + for tag in event.tags(): + if tag.as_vec()[0] == 'i': + text = tag.as_vec()[1] + break + + elif input_type == "job": + for tag in event.tags(): + if tag.as_vec()[0] == 'i': + evt = get_referenced_event_by_id(tag.as_vec()[1], + [EventDefinitions.KIND_NIP90_RESULT_EXTRACT_TEXT, + EventDefinitions.KIND_NIP90_RESULT_SUMMARIZE_TEXT], + client, + config=dvmconfig) + text = evt.content() + break + + request_form["optStr"] = ('translation_lang=' + translation_lang + ';text=' + + text.replace('\U0001f919', "").replace("=", "equals"). + replace(";", ",")) + return request_form + + @staticmethod + def process(options): + from translatepy.translators.google import GoogleTranslate + gtranslate = GoogleTranslate() + length = len(options["text"]) + + step = 0 + translated_text = "" + if length > 4999: + while step + 5000 < length: + textpart = options["text"][step:step + 5000] + step = step + 5000 + try: + translated_text_part = str(gtranslate.translate(textpart, options["translation_lang"])) + print("Translated Text part:\n\n " + translated_text_part) + except: + translated_text_part = "An error occured" + + translated_text = translated_text + translated_text_part + + if step < length: + textpart = options["text"][step:length] + try: + translated_text_part = str(gtranslate.translate(textpart, options["translation_lang"])) + print("Translated Text part:\n\n " + translated_text_part) + except: + translated_text_part = "An error occured" + + translated_text = translated_text + translated_text_part + + return translated_text diff --git a/utils/ai_utils.py b/utils/ai_utils.py deleted file mode 100644 index 9fef2ed..0000000 --- a/utils/ai_utils.py +++ /dev/null @@ -1,58 +0,0 @@ -import os - - -#We can add multiple Tasks here and call them in the do_work function. - -#Also make sure to define your task in get supported tasks, and in get amount per task and listen to -#the according event type in the beginning of dvm.py and - - -def google_translate(text, translation_lang): - from translatepy.translators.google import GoogleTranslate - gtranslate = GoogleTranslate() - length = len(text) - - step = 0 - translated_text = "" - if length > 4999: - while step+5000 < length: - textpart = text[step:step+5000] - step = step + 5000 - try: - translated_text_part = str(gtranslate.translate(textpart, translation_lang)) - print("Translated Text part:\n\n " + translated_text_part) - except: - translated_text_part = "An error occured" - - translated_text = translated_text + translated_text_part - - if step < length: - textpart = text[step:length] - try: - translated_text_part = str(gtranslate.translate(textpart, translation_lang)) - print("Translated Text part:\n\n " + translated_text_part) - except: - translated_text_part = "An error occured" - - translated_text = translated_text + translated_text_part - - - return translated_text - -def extract_text_from_pdf(url): - from pypdf import PdfReader - from pathlib import Path - import requests - file_path = Path('temp.pdf') - response = requests.get(url) - file_path.write_bytes(response.content) - reader = PdfReader(file_path) - number_of_pages = len(reader.pages) - text = "" - for page_num in range(number_of_pages): - page = reader.pages[page_num] - text = text + page.extract_text() - - os.remove('temp.pdf') - return text - diff --git a/utils/backend_utils.py b/utils/backend_utils.py index eb49df7..033b9fb 100644 --- a/utils/backend_utils.py +++ b/utils/backend_utils.py @@ -1,9 +1,11 @@ - import requests +from tasks.textextractionPDF import TextExtractionPDF from utils.definitions import EventDefinitions from utils.nostr_utils import get_event_by_id +from tasks.translation import Translation + def get_task(event, client, dvmconfig): if event.kind() == EventDefinitions.KIND_NIP90_GENERIC: # use this for events that have no id yet @@ -19,19 +21,20 @@ def get_task(event, client, dvmconfig): else: return "unknown job: " + event.as_json() + # This looks a bit more complicated, but we do several tasks for text-extraction in the future elif event.kind() == EventDefinitions.KIND_NIP90_EXTRACT_TEXT: for tag in event.tags(): if tag.as_vec()[0] == "i": if tag.as_vec()[2] == "url": file_type = check_url_is_readable(tag.as_vec()[1]) if file_type == "pdf": - return "pdf-to-text" + return TextExtractionPDF.TASK else: return "unknown job" elif tag.as_vec()[2] == "event": - evt = get_event_by_id(tag.as_vec()[1],config=dvmconfig) - if evt is not None: - if evt.kind() == 1063: + evt = get_event_by_id(tag.as_vec()[1], config=dvmconfig) + if evt is not None: + if evt.kind() == 1063: for tag in evt.tags(): if tag.as_vec()[0] == 'url': file_type = check_url_is_readable(tag.as_vec()[1]) @@ -39,23 +42,23 @@ def get_task(event, client, dvmconfig): return "pdf-to-text" else: return "unknown job" - else: - return "unknown type" + else: + return "unknown type" elif event.kind() == EventDefinitions.KIND_NIP90_TRANSLATE_TEXT: - return "translation" + return Translation.TASK else: return "unknown type" -def check_task_is_supported(event, client, get_duration = False, config=None): - dvmconfig = config + + +def check_task_is_supported(event, client, get_duration=False, config=None): + dvm_config = config input_value = "" input_type = "" duration = 1 - output_is_set = True - for tag in event.tags(): if tag.as_vec()[0] == 'i': if len(tag.as_vec()) < 3: @@ -65,49 +68,41 @@ def check_task_is_supported(event, client, get_duration = False, config=None): input_value = tag.as_vec()[1] input_type = tag.as_vec()[2] if input_type == "event": - evt = get_event_by_id(input_value, config=dvmconfig) - if evt == None: - print("Event not found") - return False, "", 0 + evt = get_event_by_id(input_value, config=dvm_config) + if evt is None: + print("Event not found") + return False, "", 0 elif tag.as_vec()[0] == 'output': - output = tag.as_vec()[1] - output_is_set = True - if not (output == "text/plain" or output == "text/json" or output == "json" or output == "image/png" or "image/jpg" or output == ""): - print("Output format not supported, skipping..") - return False, "", 0 + output = tag.as_vec()[1] + if not ( + output == "text/plain" or output == "text/json" or output == "json" or output == "image/png" or "image/jpg" or output == ""): + print("Output format not supported, skipping..") + return False, "", 0 - task = get_task(event, client=client, dvmconfig=dvmconfig) - if not output_is_set: - print("No output set") - if task not in dvmconfig.SUPPORTED_TASKS: # The Tasks this DVM supports (can be extended) + task = get_task(event, client=client, dvmconfig=dvm_config) + + if task not in dvm_config.SUPPORTED_TASKS: # The Tasks this DVM supports (can be extended) return False, task, duration - - elif task == "translation" and ( - input_type != "event" and input_type != "job" and input_type != "text"): # The input types per task - return False, task, duration - if task == "translation" and input_type != "text" and len(event.content()) > 4999: # Google Services have a limit of 5000 signs - return False, task, duration if input_type == 'url' and check_url_is_readable(input_value) is None: print("url not readable") return False, task, duration + if task == Translation.TASK: + return Translation.is_input_supported(input_type, event.content()), task, duration + + elif task == TextExtractionPDF.TASK: + return TextExtractionPDF.is_input_supported(input_type, event.content()), task, duration + return True, task, duration + def check_url_is_readable(url): if not str(url).startswith("http"): return None - # If it's a YouTube oder Overcast link, we suppose we support it - if (str(url).replace("http://", "").replace("https://", "").replace("www.", "").replace("youtu.be/", - "youtube.com?v=")[ - 0:11] == "youtube.com" and str(url).find("live") == -1) or str(url).startswith('https://x.com') or str(url).startswith('https://twitter.com') : - return "video" - elif str(url).startswith("https://overcast.fm/"): - return "audio" - - # If link is comaptible with one of these file formats, it's fine. + # If link is comaptible with one of these file formats, move on. req = requests.get(url) content_type = req.headers['content-type'] if content_type == 'audio/x-wav' or str(url).endswith(".wav") or content_type == 'audio/mpeg' or str(url).endswith( @@ -115,7 +110,7 @@ def check_url_is_readable(url): return "audio" elif content_type == 'image/png' or str(url).endswith(".png") or content_type == 'image/jpg' or str(url).endswith( ".jpg") or content_type == 'image/jpeg' or str(url).endswith(".jpeg") or content_type == 'image/png' or str( - url).endswith(".png"): + url).endswith(".png"): return "image" elif content_type == 'video/mp4' or str(url).endswith(".mp4") or content_type == 'video/avi' or str(url).endswith( ".avi") or content_type == 'video/mov' or str(url).endswith(".mov"): @@ -126,16 +121,14 @@ def check_url_is_readable(url): # Otherwise we will not offer to do the job. return None -def get_amount_per_task(task, duration = 0, config=None): - dvmconfig = config - if task == "translation": - amount = dvmconfig.COSTPERUNIT_TRANSLATION - elif task == "pdf-to-text": - amount = dvmconfig.COSTPERUNIT_TEXT_EXTRACTION + +def get_amount_per_task(task, duration=0, config=None): + if task == Translation.TASK: + amount = Translation.COST + elif task == TextExtractionPDF.TASK: + amount = TextExtractionPDF.COST else: print("[Nostr] Task " + task + " is currently not supported by this instance, skipping") return None return amount - - diff --git a/utils/definitions.py b/utils/definitions.py index 1dac005..6529de5 100644 --- a/utils/definitions.py +++ b/utils/definitions.py @@ -3,9 +3,6 @@ from dataclasses import dataclass from nostr_sdk import Event NEW_USER_BALANCE = 250 -LOCAL_TASKS = ["pdf-to-text", "translation"] -# Tasks performed by the DVM and not send to nova-server (can change later) - RELAY_LIST = ["wss://relay.damus.io", "wss://nostr-pub.wellorder.net", "wss://nos.lol", "wss://nostr.wine", "wss://relay.nostfiles.dev", "wss://nostr.mom", "wss://nostr.oxtr.dev", "wss://relay.nostr.bg", "wss://relay.f7z.io"] class EventDefinitions: @@ -54,8 +51,6 @@ class DVMConfig: SHOWRESULTBEFOREPAYMENT: bool = True # if this is true show results even when not paid right after autoprocess NEW_USER_BALANCE: int = 250 # Free credits for new users - COSTPERUNIT_TRANSLATION: int = 20 - COSTPERUNIT_TEXT_EXTRACTION: int = 20 NIP89s: list = [] diff --git a/utils/requestform_utils.py b/utils/requestform_utils.py deleted file mode 100644 index 02df531..0000000 --- a/utils/requestform_utils.py +++ /dev/null @@ -1,90 +0,0 @@ -import os -import re -from configparser import ConfigParser -import time - -from nostr_sdk import PublicKey - -from utils.definitions import EventDefinitions -from utils.backend_utils import get_task -from utils.nostr_utils import get_referenced_event_by_id, get_event_by_id -from utils.definitions import LOCAL_TASKS -import utils.env as env - - -def create_requestform_from_nostr_event(event, is_bot=False, client=None, dvmconfig=None): - task = get_task(event, client=client, dvmconfig=dvmconfig) - - request_form = {"jobID": event.id().to_hex(), "frameSize": 0, "stride": 0, - "leftContext": 0, "rightContext": 0, - "startTime": "0", "endTime": "0"} - - if task == "translation": - input_type = "event" - text = "" - translation_lang = "en" - for tag in event.tags(): - if tag.as_vec()[0] == 'i': - input_type = tag.as_vec()[2] - - elif tag.as_vec()[0] == 'param': - param = tag.as_vec()[1] - if param == "language": # check for paramtype - translation_lang = str(tag.as_vec()[2]).split('-')[0] - elif param == "lang": # check for paramtype - translation_lang = str(tag.as_vec()[2]).split('-')[0] - - if input_type == "event": - for tag in event.tags(): - if tag.as_vec()[0] == 'i': - evt = get_event_by_id(tag.as_vec()[1], config=dvmconfig) - text = evt.content() - break - - elif input_type == "text": - for tag in event.tags(): - if tag.as_vec()[0] == 'i': - text = tag.as_vec()[1] - break - - elif input_type == "job": - for tag in event.tags(): - if tag.as_vec()[0] == 'i': - evt = get_referenced_event_by_id(tag.as_vec()[1], - [EventDefinitions.KIND_NIP90_RESULT_EXTRACT_TEXT, - EventDefinitions.KIND_NIP90_RESULT_SUMMARIZE_TEXT], - client, - config=dvmconfig) - text = evt.content() - break - - request_form["optStr"] = ('translation_lang=' + translation_lang + ';text=' + - text.replace('\U0001f919', "").replace("=", "equals"). - replace(";", ",")) - - - - - elif task == "pdf-to-text": - input_type = "url" - input_content = "" - url = "" - for tag in event.tags(): - if tag.as_vec()[0] == 'i': - input_type = tag.as_vec()[2] - input_content = tag.as_vec()[1] - - if input_type == "url": - url = input_content - elif input_type == "event": - evt = get_event_by_id(input_content, config=dvmconfig) - url = re.search("(?Phttps?://[^\s]+)", evt.content()).group("url") - elif input_type == "job": - evt = get_referenced_event_by_id(input_content, [EventDefinitions.KIND_NIP90_RESULT_GENERATE_IMAGE], - client, config=dvmconfig) - - url = re.search("(?Phttps?://[^\s]+)", evt.content()).group("url") - - request_form["optStr"] = 'url=' + url - - return request_form