skip doing the job if addressed to someone else, add media converter

This commit is contained in:
Believethehype 2023-11-30 08:07:30 +01:00
parent 42b3d763f1
commit dd1b69bc0b
19 changed files with 445 additions and 201 deletions

View File

@ -26,6 +26,8 @@ TASK_TRANSLATION_NIP89_DTAG = "abcded"
TASK_IMAGE_GENERATION_NIP89_DTAG = "fgdfgdf"
TASK_IMAGE_GENERATION_NIP89_DTAG2 = "fdgdfg"
TASK_IMAGE_GENERATION_NIP89_DTAG3 = "asdasd"
TASK_SPEECH_TO_TEXT_NIP89 = "asdasdas"
TASK_MEDIA_CONVERTER_NIP89_DTAG = "asdasdasd"
#Backend Specific Options for tasks that require inputs, such as Endpoints or API Keys

2
.gitignore vendored
View File

@ -161,4 +161,4 @@ cython_debug/
.DS_Store
*.db
outputs/
outputs

1
.idea/dvm.iml generated
View File

@ -2,6 +2,7 @@
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.idea/dataSources" />
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="inheritedJdk" />

View File

@ -41,10 +41,14 @@ def send_request_to_nova_server(request_form, address):
def send_file_to_nova_server(filepath, address):
print("Sending file to NOVA-Server")
url = ('http://' + address + '/upload')
fp = open(filepath, 'rb')
response = requests.post(url, files={'file': fp})
result = response.content.decode('utf-8')
print(result)
try:
fp = open(filepath, 'rb')
response = requests.post(url, files={'file': fp})
result = response.content.decode('utf-8')
except Exception as e:
print(e)
print(response.content.decode('utf-8'))
return result
# headers = {'Content-type': 'application/x-www-form-urlencoded'}

View File

@ -81,132 +81,100 @@ class Bot:
try:
decrypted_text = nip04_decrypt(self.keys.secret_key(), nostr_event.pubkey(), nostr_event.content())
print(decrypted_text)
user = get_or_add_user(db=self.dvm_config.DB, npub=sender, client=self.client, config=self.dvm_config)
print("[" + self.NAME + "] Message from " + user.name + ": " + decrypted_text)
# if user selects an index from the overview list...
if decrypted_text[0].isdigit():
index = int(decrypted_text.split(' ')[0]) - 1
if decrypted_text.split(" ")[1].lower() == "info":
info = print_dvm_info(self.client, index)
time.sleep(2.0)
if info is not None:
evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(),
info, None).to_event(self.keys)
else:
evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(),
"No NIP89 Info found for " +
self.dvm_config.SUPPORTED_DVMS[index].NAME,
None).to_event(self.keys)
send_event(evt, client=self.client, dvm_config=dvm_config)
split = decrypted_text.split(' ')
index = int(split[0]) - 1
# if user sends index info, e.g. 1 info, we fetch the nip89 information and reply with it.
if len(split) > 1 and split[1].lower() == "info":
answer_nip89(nostr_event, index)
# otherwise we probably have to do some work, so build an event from input and send it to the DVM
else:
task = self.dvm_config.SUPPORTED_DVMS[index].TASK
print("[" + self.NAME + "] Request from " + str(user.name) + " (" + str(
user.nip05) + ", Balance: "
+ str(user.balance) + " Sats) Task: " + str(task))
print("[" + self.NAME + "] Request from " + str(user.name) + " (" + str(user.nip05) +
", Balance: " + str(user.balance) + " Sats) Task: " + str(task))
if user.isblacklisted:
# For some reason an admin might blacklist npubs, e.g. for abusing the service
evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(),
"Your are currently blocked from all "
"services.", None).to_event(self.keys)
send_event(evt, client=self.client, dvm_config=dvm_config)
# If users are blacklisted for some reason, tell them.
answer_blacklisted(nostr_event)
else:
command = decrypted_text.replace(decrypted_text.split(' ')[0] + " ", "")
input = command.split(" -")[0].rstrip()
input_type = "text"
if input.startswith("http"):
input_type = "url"
i_tag = Tag.parse(["i", input, input_type])
relays_tag = Tag.parse(["relays", json.dumps(self.dvm_config.RELAY_LIST)])
alt_tag = Tag.parse(["alt", self.dvm_config.SUPPORTED_DVMS[index].TASK])
tags = [i_tag.as_vec(), relays_tag.as_vec(), alt_tag.as_vec()]
remaining_text = command.replace(input, "")
print(remaining_text)
params = remaining_text.rstrip().split(" -")
for i in params:
print(i)
if i != " ":
try:
split = i.split(" ")
param = str(split[0])
print(str(param))
value = str(split[1])
print(str(value))
if param == "cashu":
tag = Tag.parse([param, value])
else:
tag = Tag.parse(["param", param, value])
tags.append(tag.as_vec())
print("Added params: " + str(tag.as_vec()))
except Exception as e:
print(e)
print("Couldn't add " + str(i))
encrypted_params_string = json.dumps(tags)
print(encrypted_params_string)
encrypted_params = nip04_encrypt(self.keys.secret_key(),
PublicKey.from_hex(
self.dvm_config.SUPPORTED_DVMS[index].PUBLIC_KEY),
encrypted_params_string)
encrypted_tag = Tag.parse(['encrypted'])
# Parse inputs to params
tags = build_params(decrypted_text, nostr_event, index)
p_tag = Tag.parse(['p', self.dvm_config.SUPPORTED_DVMS[index].PUBLIC_KEY])
encrypted_nip90request = (EventBuilder(self.dvm_config.SUPPORTED_DVMS[index].KIND,
encrypted_params, [p_tag, encrypted_tag]).
to_event(self.keys))
entry = {"npub": user.npub, "event_id": encrypted_nip90request.id().to_hex(),
if self.dvm_config.SUPPORTED_DVMS[index].SUPPORTS_ENCRYPTION:
tags_str = []
for tag in tags:
tags_str.append(tag.as_vec())
params_as_str = json.dumps(tags_str)
print(params_as_str)
# and encrypt them
encrypted_params = nip04_encrypt(self.keys.secret_key(),
PublicKey.from_hex(
self.dvm_config.SUPPORTED_DVMS[index].PUBLIC_KEY),
params_as_str)
# add encrypted and p tag on the outside
encrypted_tag = Tag.parse(['encrypted'])
# add the encrypted params to the content
nip90request = (EventBuilder(self.dvm_config.SUPPORTED_DVMS[index].KIND,
encrypted_params, [p_tag, encrypted_tag]).
to_event(self.keys))
else:
tags.append(p_tag)
nip90request = (EventBuilder(self.dvm_config.SUPPORTED_DVMS[index].KIND,
"", tags).
to_event(self.keys))
# remember in the job_list that we have made an event, if anybody asks for payment,
# we know we actually sent the request
entry = {"npub": user.npub, "event_id": nip90request.id().to_hex(),
"dvm_key": self.dvm_config.SUPPORTED_DVMS[index].PUBLIC_KEY, "is_paid": False}
self.job_list.append(entry)
send_event(encrypted_nip90request, client=self.client, dvm_config=dvm_config)
# send the event to the DVM
send_event(nip90request, client=self.client, dvm_config=self.dvm_config)
print(nip90request.as_json())
else:
print("[" + self.NAME + "] Message from " + user.name + ": " + decrypted_text)
message = "DVMs that I support:\n\n"
index = 1
for p in self.dvm_config.SUPPORTED_DVMS:
if p.PER_UNIT_COST != 0 and p.PER_UNIT_COST is not None:
message += (str(index) + " " + p.NAME + " " + p.TASK + " " + str(p.FIX_COST) +
" Sats + " + str(p.PER_UNIT_COST) + " Sats per Second\n")
else:
message += (str(index) + " " + p.NAME + " " + p.TASK + " " + str(p.FIX_COST) +
" Sats\n")
index += 1
elif decrypted_text.lower().startswith("balance"):
time.sleep(3.0)
evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(),
message + "\nSelect an Index and provide an input ("
"e.g. 2 A purple ostrich)\nType index info to learn more about each DVM. (e.g. 2 info)\n\n"
""
"Add -cashu cashutoken with the amount + mint fees (at least 3 sat) to pay via #cashu",
nostr_event.id()).to_event(self.keys)
"Your current balance is " + str(
user.balance) + " Sats. Zap me to add to your balance. I will use your balance interact with the DVMs for you.\n"
"I support both public and private Zaps, as well as Zapplepay.\n"
"Alternativly you can add a #cashu token with \"-cashu cashuASomeToken\" to your command.\n Make sure the token is worth the requested amount + "
"mint fees (at least 3 sat).\n Not all DVMs might accept Cashu tokens."
, None).to_event(self.keys)
send_event(evt, client=self.client, dvm_config=dvm_config)
except Exception as e:
else:
# Build an overview of known DVMs and send it to the user
answer_overview(nostr_event)
except Exception as e:
print("Error in bot " + str(e))
def handle_nip90_feedback(nostr_event):
print(nostr_event.as_json())
try:
is_encrypted = False
status = ""
etag = ""
ptag = ""
content = nostr_event.content()
for tag in nostr_event.tags():
if tag.as_vec()[0] == "status":
status = tag.as_vec()[1]
if len(tag.as_vec()) > 2:
content = tag.as_vec()[2]
elif tag.as_vec()[0] == "e":
etag = tag.as_vec()[1]
elif tag.as_vec()[0] == "p":
@ -214,7 +182,6 @@ class Bot:
elif tag.as_vec()[0] == "encrypted":
is_encrypted = True
content = nostr_event.content()
if is_encrypted:
if ptag == self.keys.public_key().to_hex():
tags_str = nip04_decrypt(Keys.from_sk_str(dvm_config.PRIVATE_KEY).secret_key(),
@ -230,6 +197,8 @@ class Bot:
for tag in nostr_event.tags():
if tag.as_vec()[0] == "status":
status = tag.as_vec()[1]
if len(tag.as_vec()) > 2:
content = tag.as_vec()[2]
elif tag.as_vec()[0] == "e":
etag = tag.as_vec()[1]
elif tag.as_vec()[0] == "content":
@ -238,7 +207,7 @@ class Bot:
else:
return
if status == "success" or status == "error" or status == "processing" or status == "partial":
if status == "success" or status == "error" or status == "processing" or status == "partial" and content != "":
entry = next((x for x in self.job_list if x['event_id'] == etag), None)
if entry is not None:
user = get_or_add_user(db=self.dvm_config.DB, npub=entry['npub'],
@ -321,6 +290,7 @@ class Bot:
def handle_nip90_response_event(nostr_event: Event):
try:
ptag = ""
etag = ""
is_encrypted = False
for tag in nostr_event.tags():
if tag.as_vec()[0] == "e":
@ -382,7 +352,109 @@ class Bot:
except Exception as e:
print("[" + self.NAME + "] Error during content decryption:" + str(e))
self.client.handle_notifications(NotificationHandler())
def answer_overview(nostr_event):
message = "DVMs that I support:\n\n"
index = 1
for p in self.dvm_config.SUPPORTED_DVMS:
if p.PER_UNIT_COST != 0 and p.PER_UNIT_COST is not None:
message += (str(index) + " " + p.NAME + " " + p.TASK + " " + str(p.FIX_COST) +
" Sats + " + str(p.PER_UNIT_COST) + " Sats per Second\n")
else:
message += (str(index) + " " + p.NAME + " " + p.TASK + " " + str(p.FIX_COST) +
" Sats\n")
index += 1
time.sleep(3.0)
evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(),
message + "\nSelect an Index and provide an input ("
"e.g. \"2 A purple ostrich\")\nType \"index info\" to learn "
"more about each DVM. (e.g. \"2 info\")\n\n"
"Type \"balance\" to see your current balance",
nostr_event.id()).to_event(self.keys)
send_event(evt, client=self.client, dvm_config=dvm_config)
def answer_blacklisted(nostr_event):
# For some reason an admin might blacklist npubs, e.g. for abusing the service
evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(),
"Your are currently blocked from all "
"services.", None).to_event(self.keys)
send_event(evt, client=self.client, dvm_config=dvm_config)
def answer_nip89(nostr_event, index):
info = print_dvm_info(self.client, index)
time.sleep(2.0)
if info is not None:
evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(),
info, None).to_event(self.keys)
else:
evt = EventBuilder.new_encrypted_direct_msg(self.keys, nostr_event.pubkey(),
"No NIP89 Info found for " +
self.dvm_config.SUPPORTED_DVMS[index].NAME,
None).to_event(self.keys)
send_event(evt, client=self.client, dvm_config=dvm_config)
def build_params(decrypted_text, nostr_event, index):
tags = []
split = decrypted_text.split(' ')
# If only a command without parameters is sent, we assume no input is required, and that means the dvm might take in the user as input (e.g. for content discovery)
if len(split) == 1:
tag = Tag.parse(["param", "user", nostr_event.pubkey().to_hex()])
tags.append(tag)
output = Tag.parse(["output", "text/plain"])
tags.append(output)
relaylist = ["relays"]
for relay in self.dvm_config.RELAY_LIST:
relaylist.append(relay)
relays = Tag.parse(relaylist)
tags.append(relays)
return tags
command = decrypted_text.replace(split[0] + " ", "")
split = command.split(" -")
input = split[0].rstrip()
print(input)
input_type = "text"
if input.startswith("http"):
input_type = "url"
i_tag = Tag.parse(["i", input, input_type])
alt_tag = Tag.parse(["alt", self.dvm_config.SUPPORTED_DVMS[index].TASK])
tags.append(alt_tag)
relaylist = ["relays"]
for relay in self.dvm_config.RELAY_LIST:
relaylist.append(relay)
relays_tag = Tag.parse(relaylist)
# TODO readd relays tag, but need to find a way to parse it for both str and tag
tags = [i_tag, relays_tag, alt_tag]
remaining_text = command.replace(input, "")
print(remaining_text)
params = remaining_text.rstrip().split(" -")
for i in params:
print(i)
if i != " ":
try:
split = i.split(" ")
if len(split) > 1:
param = str(split[0])
print(str(param))
value = str(split[1])
print(str(value))
if param == "cashu":
tag = Tag.parse([param, value])
else:
tag = Tag.parse(["param", param, value])
tags.append(tag)
print("Added params: " + str(tag.as_vec()))
except Exception as e:
print(e)
print("Couldn't add " + str(i))
return tags
def print_dvm_info(client, index):
pubkey = self.dvm_config.SUPPORTED_DVMS[index].dvm_config.PUBLIC_KEY
@ -406,11 +478,11 @@ class Bot:
return None
self.client.handle_notifications(NotificationHandler())
try:
while True:
time.sleep(1.0)
except KeyboardInterrupt:
print('Stay weird!')
os.kill(os.getpid(), signal.SIGTERM)

View File

@ -95,7 +95,6 @@ class DVM:
task_supported, task = check_task_is_supported(nip90_event, client=self.client,
config=self.dvm_config)
if user.isblacklisted:
send_job_status_reaction(nip90_event, "error", client=self.client, dvm_config=self.dvm_config)
print("[" + self.dvm_config.NIP89.NAME + "] Request by blacklisted user, skipped")
@ -122,7 +121,8 @@ class DVM:
self.dvm_config)
return
# if user is whitelisted or task is free, just do the job
if user.iswhitelisted or task_is_free or cashu_redeemed:
if (user.iswhitelisted or task_is_free or cashu_redeemed) and (p_tag_str == "" or p_tag_str ==
self.dvm_config.PUBLIC_KEY):
print(
"[" + self.dvm_config.NIP89.NAME + "] Free task or Whitelisted for task " + task +
". Starting processing..")
@ -132,8 +132,7 @@ class DVM:
do_work(nip90_event)
# if task is directed to us via p tag and user has balance, do the job and update balance
elif p_tag_str == Keys.from_sk_str(
self.dvm_config.PUBLIC_KEY) and user.balance >= int(amount):
elif p_tag_str == self.dvm_config.PUBLIC_KEY and user.balance >= int(amount):
balance = max(user.balance - int(amount), 0)
update_sql_table(db=self.dvm_config.DB, npub=user.npub, balance=balance,
iswhitelisted=user.iswhitelisted, isblacklisted=user.isblacklisted,
@ -150,7 +149,7 @@ class DVM:
do_work(nip90_event)
# else send a payment required event to user
else:
elif p_tag_str == "" or p_tag_str == self.dvm_config.PUBLIC_KEY:
bid = 0
for tag in nip90_event.tags():
if tag.as_vec()[0] == 'bid':
@ -172,7 +171,8 @@ class DVM:
nip90_event.id().to_hex())
send_job_status_reaction(nip90_event, "payment-required",
False, int(amount), client=self.client, dvm_config=self.dvm_config)
else:
print("[" + self.dvm_config.NIP89.NAME + "] Job addressed to someone else, skipping..")
# else:
# print("[" + self.dvm_config.NIP89.NAME + "] Task " + task + " not supported on this DVM, skipping..")
@ -311,13 +311,19 @@ class DVM:
send_nostr_reply_event(data, original_event_str)
break
try:
task = get_task(original_event, self.client, self.dvm_config)
for dvm in self.dvm_config.SUPPORTED_DVMS:
if task == dvm.TASK:
try:
post_processed = dvm.post_process(data, original_event)
send_nostr_reply_event(post_processed, original_event.as_json())
except Exception as e:
send_job_status_reaction(original_event, "error", content=str(e),
dvm_config=self.dvm_config,
)
post_processed_content = post_process_result(data, original_event)
send_nostr_reply_event(post_processed_content, original_event_str)
except Exception as e:
send_job_status_reaction(original_event, "error", content=str(e), dvm_config=self.dvm_config,
)
def send_nostr_reply_event(content, original_event_as_str):
original_event = Event.from_json(original_event_as_str)
@ -448,11 +454,20 @@ class DVM:
request_form = dvm.create_request_form_from_nostr_event(job_event, self.client,
self.dvm_config)
result = dvm.process(request_form)
check_and_return_event(result, str(job_event.as_json()))
try:
post_processed = dvm.post_process(result, job_event)
send_nostr_reply_event(post_processed, job_event.as_json())
except Exception as e:
send_job_status_reaction(job_event, "error", content=str(e),
dvm_config=self.dvm_config,
)
except Exception as e:
print(e)
send_job_status_reaction(job_event, "error", content=str(e), dvm_config=self.dvm_config)
# we could send the exception here to the user, but maybe that's not a good idea after all.
send_job_status_reaction(job_event, "error", content="An error occurred",
dvm_config=self.dvm_config)
# TODO send sats back on error
return
self.client.handle_notifications(NotificationHandler())

View File

@ -7,6 +7,7 @@ from utils.admin_utils import AdminConfig
from utils.dvmconfig import DVMConfig
from utils.nip89_utils import NIP89Config
from core.dvm import DVM
from utils.output_utils import post_process_result
class DVMTaskInterface:
@ -18,6 +19,7 @@ class DVMTaskInterface:
PRIVATE_KEY: str
PUBLIC_KEY: str
DVM = DVM
SUPPORTS_ENCRYPTION = True # DVMs build with this framework support encryption, but others might not.
dvm_config: DVMConfig
admin_config: AdminConfig
@ -39,7 +41,6 @@ class DVMTaskInterface:
if task is not None:
self.TASK = task
dvm_config.SUPPORTED_DVMS = [self]
dvm_config.DB = "db/" + self.NAME + ".db"
if nip89config.KIND is not None:
@ -49,7 +50,6 @@ class DVMTaskInterface:
self.dvm_config = dvm_config
self.admin_config = admin_config
def run(self):
nostr_dvm_thread = Thread(target=self.DVM, args=[self.dvm_config, self.admin_config])
nostr_dvm_thread.start()
@ -67,8 +67,6 @@ class DVMTaskInterface:
"""Check if input is supported for current Task."""
pass
def create_request_form_from_nostr_event(self, event, client=None, dvm_config=None) -> dict:
"""Parse input into a request form that will be given to the process method"""
pass
@ -77,6 +75,11 @@ class DVMTaskInterface:
"Process the data and return the result"
pass
def post_process(self, result, event):
"""Post-process the data and return the result Use default function, if not overwritten"""
return post_process_result(result, event)
@staticmethod
def set_options(request_form):
print("Setting options...")

40
main.py
View File

@ -11,7 +11,7 @@ from nostr_sdk import Keys
from bot.bot import Bot
from playground import build_pdf_extractor, build_googletranslator, build_unstable_diffusion, build_sketcher, \
build_dalle, \
build_whisperx, build_libretranslator, build_external_dvm
build_whisperx, build_libretranslator, build_external_dvm, build_media_converter
from utils.definitions import EventDefinitions
from utils.dvmconfig import DVMConfig
@ -38,8 +38,8 @@ def run_nostr_dvm_with_local_config():
bot_config.SUPPORTED_DVMS.append(translator) # We add translator to the bot
translator.run()
# Spawn DVM3 Kind 5002 Local Text TranslationLibre, calling the free LibreTranslateApi, as an alternative.
# This will only run and appear on the bot if an endpoint is set in the .env
if os.getenv("LIBRE_TRANSLATE_ENDPOINT") is not None and os.getenv("LIBRE_TRANSLATE_ENDPOINT") != "":
libre_translator = build_libretranslator("Libre Translator")
bot_config.SUPPORTED_DVMS.append(libre_translator) # We add translator to the bot
@ -64,8 +64,6 @@ def run_nostr_dvm_with_local_config():
bot_config.SUPPORTED_DVMS.append(whisperer) # We also add Sketcher to the bot
whisperer.run()
# Spawn DVM6, this one requires an OPENAI API Key and balance with OpenAI, you will move the task to them and pay
# per call. Make sure you have enough balance and the DVM's cost is set higher than what you pay yourself, except, you know,
# you're being generous.
@ -75,19 +73,35 @@ def run_nostr_dvm_with_local_config():
dalle.run()
# Spawn DVM7.. oh wait, actually we don't spawn a new DVM, we use the dvmtaskinterface to define an external dvm by providing some info about it, such as
# their pubkey, a name, task, kind etc.
# their pubkey, a name, task, kind etc. (unencrypted)
tasktiger_external = build_external_dvm(name="External DVM: TaskTiger",
pubkey="d483935d6bfcef3645195c04c97bbb70aedb6e65665c5ea83e562ca3c7acb978",
task="text-to-image",
kind=EventDefinitions.KIND_NIP90_GENERATE_IMAGE,
fix_cost=100, per_unit_cost=0)
tasktiger_external.SUPPORTS_ENCRYPTION = False # if the dvm does not support encrypted events, just send a regular event and mark it with p tag. Other dvms might initial answer
bot_config.SUPPORTED_DVMS.append(tasktiger_external)
# Don't run it, it's on someone else's machine and we simply make the bot aware of it.
# DVM: 8 Another external dvm for recommendations:
ymhm_external = build_external_dvm(name="External DVM: You might have missed",
pubkey="6b37d5dc88c1cbd32d75b713f6d4c2f7766276f51c9337af9d32c8d715cc1b93",
task="content-discovery",
kind=EventDefinitions.KIND_NIP90_CONTENT_DISCOVERY,
fix_cost=0, per_unit_cost=0)
ymhm_external.SUPPORTS_ENCRYPTION = False # if the dvm does not support encrypted events, just send a regular event and mark it with p tag. Other dvms might initial answer
bot_config.SUPPORTED_DVMS.append(ymhm_external)
# Spawn DVM9.. A Media Grabber/Converter
media_bringer = build_media_converter("Media Bringer")
bot_config.SUPPORTED_DVMS.append(media_bringer) # We also add Sketcher to the bot
media_bringer.run()
libretranslate_external = build_external_dvm(name="External DVM test",
pubkey="08fd6bdb17cb2c8a87f8d50653238cb46e26cd44948c474f51dae5f138609da6",
task="translation",
kind=EventDefinitions.KIND_NIP90_TRANSLATE_TEXT,
fix_cost=0, per_unit_cost=0)
bot_config.SUPPORTED_DVMS.append(libretranslate_external)
#Don't run it, it's on someone else's machine and we simply make the bot aware of it.
Bot(bot_config)
# Keep the main function alive for libraries that require it, like openai
try:
while True:

View File

@ -4,6 +4,7 @@ import os
from nostr_sdk import PublicKey, Keys
from interfaces.dvmtaskinterface import DVMTaskInterface
from tasks.convert_media import MediaConverter
from tasks.imagegeneration_openai_dalle import ImageGenerationDALLE
from tasks.imagegeneration_sdxl import ImageGenerationSDXL
from tasks.textextraction_whisperx import SpeechToTextWhisperX
@ -13,7 +14,7 @@ from tasks.translation_libretranslate import TranslationLibre
from utils.admin_utils import AdminConfig
from utils.definitions import EventDefinitions
from utils.dvmconfig import DVMConfig
from utils.nip89_utils import NIP89Config
from utils.nip89_utils import NIP89Config, nip89_create_d_tag
"""
This File is a playground to create DVMs. It shows some examples of DVMs that make use of the modules in the tasks folder
@ -263,6 +264,33 @@ def build_dalle(name):
# We add an optional AdminConfig for this one, and tell the dvm to rebroadcast its NIP89
return ImageGenerationDALLE(name=name, dvm_config=dvm_config, nip89config=nip89config, admin_config=admin_config)
def build_media_converter(name):
dvm_config = DVMConfig()
dvm_config.PRIVATE_KEY = os.getenv("NOSTR_PRIVATE_KEY6")
dvm_config.LNBITS_INVOICE_KEY = os.getenv("LNBITS_INVOICE_KEY")
dvm_config.LNBITS_URL = os.getenv("LNBITS_HOST")
# Add NIP89
nip90params = {
"media_format": {
"required": False,
"values": ["video/mp4", "audio/mp3"]
}
}
nip89info = {
"name": name,
"image": "https://image.nostr.build/c33ca6fc4cc038ca4adb46fdfdfda34951656f87ee364ef59095bae1495ce669.jpg",
"about": "I convert videos from urls to given output format.",
"nip90Params": nip90params
}
nip89config = NIP89Config()
new_dtag = nip89_create_d_tag(name, Keys.from_sk_str(dvm_config.PRIVATE_KEY).public_key().to_hex(), nip89info["image"])
print("Some new dtag:" + new_dtag)
nip89config.DTAG = os.getenv("TASK_MEDIA_CONVERTER_NIP89_DTAG")
nip89config.CONTENT = json.dumps(nip89info)
return MediaConverter(name=name, dvm_config=dvm_config, nip89config=nip89config,
admin_config=admin_config)
def build_external_dvm(name, pubkey, task, kind, fix_cost, per_unit_cost):
dvm_config = DVMConfig()

View File

@ -6,9 +6,12 @@ Reusable backend functions can be defined in backends (e.g. API calls)
Current List of Tasks:
| Module | Kind | Description | Backend |
|----------------------|------|-------------------------------------------|---------------------------|
| Translation | 5002 | Translates Inputs to another language | Local, calling Google API |
| TextExtractionPDF | 5001 | Extracts Text from a PDF file | Local |
| ImageGenerationSDXL | 5100 | Generates an Image with StableDiffusionXL | nova-server |
| ImageGenerationDALLE | 5100 | Generates an Image with Dall-E | OpenAI |
| Module | Kind | Description | Backend |
|----------------------|------|------------------------------------------------|-------------|
| TextExtractionPDF | 5000 | Extracts Text from a PDF file | local |
| SpeechToTextWhisperX | 5000 | Extracts Speech from Media files | nova-server |
| TranslationGoogle | 5002 | Translates Inputs to another language | google API |
| TranslationLibre | 5002 | Translates Inputs to another language | libre API |
| ImageGenerationSDXL | 5100 | Generates an Image with StableDiffusionXL | nova-server |
| ImageGenerationDALLE | 5100 | Generates an Image with Dall-E | openAI |
| MediaConverter | 5300 | Converts a link of a media file and uploads it | openAI |

72
tasks/convert_media.py Normal file
View File

@ -0,0 +1,72 @@
import json
from interfaces.dvmtaskinterface import DVMTaskInterface
from utils.admin_utils import AdminConfig
from utils.definitions import EventDefinitions
from utils.dvmconfig import DVMConfig
from utils.nip89_utils import NIP89Config
from utils.mediasource_utils import organize_input_media_data
from utils.output_utils import upload_media_to_hoster
"""
This File contains a Module to call Google Translate Services locally on the DVM Machine
Accepted Inputs: Text, Events, Jobs (Text Extraction, Summary, Translation)
Outputs: Text containing the TranslationGoogle in the desired language.
Params: -language The target language
"""
class MediaConverter(DVMTaskInterface):
KIND = EventDefinitions.KIND_NIP90_CONVERT_VIDEO
TASK = "convert"
FIX_COST = 20
PER_UNIT_COST = 0.1
def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
admin_config: AdminConfig = None, options=None):
super().__init__(name, dvm_config, nip89config, admin_config, options)
def is_input_supported(self, tags):
for tag in tags:
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
input_type = tag.as_vec()[2]
if input_type != "url":
return False
return True
def create_request_form_from_nostr_event(self, event, client=None, dvm_config=None):
request_form = {"jobID": event.id().to_hex()}
url = ""
media_format = "video/mp4"
input_type = "text"
start_time = 0
end_time = 0
# TODO parse start/end parameters
for tag in event.tags():
if tag.as_vec()[0] == 'i':
input_type = tag.as_vec()[2]
if input_type == "url":
url = tag.as_vec()[1]
elif tag.as_vec()[0] == 'param':
param = tag.as_vec()[1]
if param == "format": # check for param type
media_format = tag.as_vec()[2]
filepath = organize_input_media_data(url, input_type, start_time, end_time, dvm_config, client, True, media_format)
options = {
"filepath": filepath
}
request_form['options'] = json.dumps(options)
return request_form
def process(self, request_form):
options = DVMTaskInterface.set_options(request_form)
url = upload_media_to_hoster(options["filepath"])
return url

View File

@ -8,17 +8,16 @@ from backends.nova_server import check_nova_server_status, send_request_to_nova_
from interfaces.dvmtaskinterface import DVMTaskInterface
from utils.admin_utils import AdminConfig
from utils.dvmconfig import DVMConfig
from utils.mediasource_utils import organize_input_data_to_audio
from utils.mediasource_utils import organize_input_media_data
from utils.nip89_utils import NIP89Config
from utils.definitions import EventDefinitions
"""
This File contains a Module to transform Text input on NOVA-Server and receive results back.
This File contains a Module to transform A media file input on NOVA-Server and receive results back.
Accepted Inputs: Url to media file (url)
Outputs: Transcribed text
Accepted Inputs: Prompt (text)
Outputs: An url to an Image
Params: -model # models: juggernaut, dynavision, colossusProject, newreality, unstable
-lora # loras (weights on top of models) voxel,
"""
@ -26,7 +25,7 @@ class SpeechToTextWhisperX(DVMTaskInterface):
KIND: int = EventDefinitions.KIND_NIP90_EXTRACT_TEXT
TASK: str = "speech-to-text"
FIX_COST: float = 10
PER_UNIT_COST: float = 0.1
PER_UNIT_COST: float = 0.1
def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
admin_config: AdminConfig = None, options=None):
@ -42,7 +41,7 @@ class SpeechToTextWhisperX(DVMTaskInterface):
elif tag.as_vec()[0] == 'output':
output = tag.as_vec()[1]
if (output == "" or not (output == "text/plain")):
if output == "" or not (output == "text/plain"):
print("Output format not supported, skipping..")
return False
@ -65,6 +64,7 @@ class SpeechToTextWhisperX(DVMTaskInterface):
input_type = "url"
start_time = 0
end_time = 0
media_format = "audio/mp3"
for tag in event.tags():
if tag.as_vec()[0] == 'i':
@ -78,7 +78,7 @@ class SpeechToTextWhisperX(DVMTaskInterface):
alignment = tag.as_vec()[2]
elif tag.as_vec()[1] == "model":
model = tag.as_vec()[2]
elif tag.as_vec()[1] == "range": #hui
elif tag.as_vec()[1] == "range":
try:
t = time.strptime(tag.as_vec()[2], "%H:%M:%S")
seconds = t.tm_hour * 60 * 60 + t.tm_min * 60 + t.tm_sec
@ -102,14 +102,14 @@ class SpeechToTextWhisperX(DVMTaskInterface):
except:
end_time = float(tag.as_vec()[3])
filepath = organize_input_data_to_audio(url, input_type, start_time, end_time, dvm_config, client)
pathonserver = send_file_to_nova_server(filepath, self.options['nova_server'])
filepath = organize_input_media_data(url, input_type, start_time, end_time, dvm_config, client, True, media_format)
path_on_server = send_file_to_nova_server(os.path.realpath(filepath), self.options['nova_server'])
io_input = {
"id": "audio",
"type": "input",
"src": "file:stream",
"uri": pathonserver
"uri": path_on_server
}
io_output = {

View File

@ -10,7 +10,7 @@ from utils.nostr_utils import get_referenced_event_by_id, get_event_by_id
"""
This File contains a Module to call Google Translate Services locally on the DVM Machine
Accepted Inputs: Text, Events, Jobs (Text Extraction, Summary, TranslationGoogle)
Accepted Inputs: Text, Events, Jobs (Text Extraction, Summary, Translation)
Outputs: Text containing the TranslationGoogle in the desired language.
Params: -language The target language
"""

View File

@ -12,9 +12,11 @@ from utils.nostr_utils import get_referenced_event_by_id, get_event_by_id
"""
This File contains a Module to call Google Translate Services locally on the DVM Machine
Accepted Inputs: Text, Events, Jobs (Text Extraction, Summary, TranslationGoogle)
Outputs: Text containing the TranslationGoogle in the desired language.
Accepted Inputs: Text, Events, Jobs (Text Extraction, Summary, Translation)
Outputs: Text containing the Translation with LibreTranslation in the desired language.
Params: -language The target language
Requires API key or self-hosted instance
"""

View File

@ -148,6 +148,7 @@ def get_amount_per_task(task, dvm_config, duration=1):
for dvm in dvm_config.SUPPORTED_DVMS: # this is currently just one
if dvm.TASK == task:
amount = dvm.FIX_COST + (dvm.PER_UNIT_COST * duration)
print("Cost: " + str(amount))
return amount
else:
print("[" + dvm_config.SUPPORTED_DVMS[

View File

@ -3,34 +3,37 @@ from dataclasses import dataclass
from nostr_sdk import Event
class EventDefinitions:
KIND_DM: int = 4
KIND_ZAP: int = 9735
KIND_ANNOUNCEMENT: int = 31990
KIND_NIP94_METADATA: int = 1063
KIND_FEEDBACK: int = 7000
KIND_DM = 4
KIND_ZAP = 9735
KIND_ANNOUNCEMENT = 31990
KIND_NIP94_METADATA = 1063
KIND_FEEDBACK = 7000
KIND_NIP90_EXTRACT_TEXT = 5000
KIND_NIP90_RESULT_EXTRACT_TEXT = 6000
KIND_NIP90_SUMMARIZE_TEXT = 5001
KIND_NIP90_RESULT_SUMMARIZE_TEXT = 6001
KIND_NIP90_TRANSLATE_TEXT = 5002
KIND_NIP90_RESULT_TRANSLATE_TEXT = 6002
KIND_NIP90_GENERATE_TEXT = 5050
KIND_NIP90_RESULT_GENERATE_TEXT = 6050
KIND_NIP90_GENERATE_IMAGE = 5100
KIND_NIP90_RESULT_GENERATE_IMAGE = 6100
KIND_NIP90_RECOMMEND_NOTES = 65006
KIND_NIP90_RESULT_RECOMMEND_NOTES = 65001
KIND_NIP90_RECOMMEND_USERS = 65007
KIND_NIP90_RESULT_RECOMMEND_USERS = 65001
KIND_NIP90_CONVERT_VIDEO = 5200
KIND_NIP90_RESULT_CONVERT_VIDEO = 6200
KIND_NIP90_CONTENT_DISCOVERY = 5300
KIND_NIP90_RESULT_CONTENT_DISCOVERY = 6300
KIND_NIP90_PEOPLE_DISCOVERY = 5301
KIND_NIP90_GENERIC = 5999
KIND_NIP90_RESULT_GENERIC = 6999
ANY_RESULT = [KIND_NIP90_RESULT_EXTRACT_TEXT,
KIND_NIP90_RESULT_SUMMARIZE_TEXT,
KIND_NIP90_RESULT_TRANSLATE_TEXT,
KIND_NIP90_RESULT_GENERATE_TEXT,
KIND_NIP90_RESULT_GENERATE_IMAGE,
KIND_NIP90_RESULT_RECOMMEND_NOTES,
KIND_NIP90_RESULT_RECOMMEND_USERS,
KIND_NIP90_CONTENT_DISCOVERY,
KIND_NIP90_PEOPLE_DISCOVERY,
KIND_NIP90_RESULT_CONVERT_VIDEO,
KIND_NIP90_RESULT_CONTENT_DISCOVERY,
KIND_NIP90_RESULT_GENERIC]

View File

@ -9,7 +9,7 @@ from utils.nostr_utils import get_event_by_id
def input_data_file_duration(event, dvm_config, client, start=0, end=0):
print("[" + dvm_config.NIP89.NAME + "] Getting Duration of the Media file..")
#print("[" + dvm_config.NIP89.NAME + "] Getting Duration of the Media file..")
input_value = ""
input_type = "url"
for tag in event.tags():
@ -32,7 +32,7 @@ def input_data_file_duration(event, dvm_config, client, start=0, end=0):
if input_type == "url":
source_type = check_source_type(input_value)
filename, start, end, type = get_file_start_end_type(input_value, source_type, start, end)
filename, start, end, type = get_file_start_end_type(input_value, source_type, start, end, True)
if type != "audio" and type != "video":
return 1
if filename == "" or filename is None:
@ -52,7 +52,7 @@ def input_data_file_duration(event, dvm_config, client, start=0, end=0):
return 1
def organize_input_data_to_audio(input_value, input_type, start, end, dvm_config, client) -> str:
def organize_input_media_data(input_value, input_type, start, end, dvm_config, client, process=True, media_format="audio/mp3") -> str:
if input_type == "event": # NIP94 event
evt = get_event_by_id(input_value, client=client, config=dvm_config)
if evt is not None:
@ -60,9 +60,15 @@ def organize_input_data_to_audio(input_value, input_type, start, end, dvm_config
if input_type == "url":
source_type = check_source_type(input_value)
filename, start, end, type = get_file_start_end_type(input_value, source_type, start, end)
audio_only = True
if media_format.split('/')[0] == "video":
audio_only = False
filename, start, end, type = get_file_start_end_type(input_value, source_type, start, end, audio_only)
if filename == "" or filename is None:
return ""
if type != "audio" and type != "video":
return filename
try:
file_reader = AudioReader(filename, ctx=cpu(0), mono=False)
duration = float(file_reader.duration())
@ -76,13 +82,23 @@ def organize_input_data_to_audio(input_value, input_type, start, end, dvm_config
print("New Duration of the Media file: " + str(new_duration))
# TODO if already in a working format and time is 0 0, dont convert
print("Converting from " + str(start_time) + " until " + str(end_time))
# for now, we cut and convert all files to mp3
final_filename = '.\\outputs\\audio.mp3'
print(final_filename)
fs, x = ffmpegio.audio.read(filename, ss=start_time, to=end_time, sample_fmt='dbl', ac=1)
ffmpegio.audio.write(final_filename, fs, x, overwrite=True)
return final_filename
if process:
# for now we cut and convert all files to mp3
file = r'processed.' + str(media_format.split('/')[1])
final_filename = os.path.abspath(os.curdir + r'/outputs/' + file)
if media_format.split('/')[0] == "audio":
print("Converting Audio from " + str(start_time) + " until " + str(end_time))
fs, x = ffmpegio.audio.read(filename, ss=start_time, to=end_time, sample_fmt='dbl', ac=1)
ffmpegio.audio.write(final_filename, fs, x, overwrite=True)
elif media_format.split('/')[0] == "video":
print("Converting Video from " + str(start_time) + " until " + str(end_time))
ffmpegio.transcode(filename, final_filename, overwrite=True, show_log=True)
print(final_filename)
return final_filename
else:
return filename
def check_nip94_event_for_media(evt, input_value, input_type):
@ -112,15 +128,13 @@ def convert_media_length(start: float, end: float, duration: float):
return start_time, end_time, dur
def get_file_start_end_type(url, source_type, start, end) -> (str, str):
def get_file_start_end_type(url, source_type, start, end, audio_only=True) -> (str, str):
# Overcast
if source_type == "overcast":
name, start, end = get_overcast(url, start, end)
return name, start, end, "audio"
# Youtube
elif source_type == "youtube":
audio_only = True
name, start, end = get_youtube(url, start, end, audio_only)
return name, start, end, "audio"
@ -180,7 +194,7 @@ def check_source_type(url):
def get_overcast(input_value, start, end):
filename = '.\\outputs\\' + ".originalaudio.mp3"
filename = os.path.abspath(os.curdir + r'/outputs/originalaudio.mp3')
print("Found overcast.fm Link.. downloading")
start_time = start
end_time = end
@ -200,7 +214,7 @@ def get_overcast(input_value, start, end):
def get_TikTok(input_value, start, end):
filepath = '.\\outputs\\'
filepath = os.path.abspath(os.curdir + r'/outputs/')
try:
filename = downloadTikTok(input_value, filepath)
print(filename)
@ -211,7 +225,7 @@ def get_TikTok(input_value, start, end):
def get_Instagram(input_value, start, end):
filepath = '.\\outputs\\'
filepath = os.path.abspath(os.curdir + r'/outputs/')
try:
filename = downloadInstagram(input_value, filepath)
print(filename)
@ -222,11 +236,10 @@ def get_Instagram(input_value, start, end):
def get_Twitter(input_value, start, end):
filepath = '.\\outputs\\'
filepath = os.path.abspath(os.curdir) + r'/outputs/'
cleanlink = str(input_value).replace("twitter.com", "x.com")
try:
filename = downloadTwitter(cleanlink, filepath)
print(filename)
except Exception as e:
print(e)
return "", start, end
@ -267,39 +280,40 @@ def get_media_link(url) -> (str, str):
if content_type == 'audio/x-wav' or str(url).lower().endswith(".wav"):
ext = "wav"
file_type = "audio"
with open('.\\outputs\\file.' + ext, 'wb') as fd:
with open(os.path.abspath(os.curdir + r'/outputs/' + 'file.' + ext), 'wb') as fd:
fd.write(req.content)
return '.\\outputs\\file.' + ext, file_type
return os.path.abspath(os.curdir + r'/outputs/' + 'file.' + ext), file_type
elif content_type == 'audio/mpeg' or str(url).lower().endswith(".mp3"):
ext = "mp3"
file_type = "audio"
with open('.\\outputs\\file.' + '\\file.' + ext, 'wb') as fd:
with open(os.path.abspath(os.curdir + r'/outputs/' + 'file.' + ext), 'wb') as fd:
fd.write(req.content)
return '.\\outputs\\file.' + ext, file_type
return os.path.abspath(os.curdir + r'/outputs/' + 'file.' + ext), file_type
elif content_type == 'audio/ogg' or str(url).lower().endswith(".ogg"):
ext = "ogg"
file_type = "audio"
with open('.\\outputs\\file.' + ext, 'wb') as fd:
with open(os.path.abspath(os.curdir + r'/outputs/' + 'file.' + ext), 'wb') as fd:
fd.write(req.content)
return '.\\outputs\\file.' + ext, file_type
return os.path.abspath(os.curdir + r'/outputs/' + 'file.' + ext), file_type
elif content_type == 'video/mp4' or str(url).lower().endswith(".mp4"):
ext = "mp4"
file_type = "video"
with open('.\\outputs\\file.' + ext, 'wb') as fd:
with open(os.path.abspath(os.curdir + r'/outputs/' + 'file.' + ext), 'wb') as fd:
fd.write(req.content)
return '.\\outputs\\file.' + ext, file_type
return os.path.abspath(os.curdir + r'/outputs/' + 'file.' + ext), file_type
elif content_type == 'video/avi' or str(url).lower().endswith(".avi"):
ext = "avi"
file_type = "video"
with open('.\\outputs\\file.' + ext, 'wb') as fd:
with open(os.path.abspath(os.curdir + r'/outputs/' + 'file.' + ext), 'wb') as fd:
fd.write(req.content)
return '.\\outputs\\file.' + ext, file_type
return os.path.abspath(os.curdir + r'/outputs/' + 'file.' + ext), file_type
elif content_type == 'video/quicktime' or str(url).lower().endswith(".mov"):
ext = "mov"
file_type = "video"
with open('.\\outputs\\file.' + ext, 'wb') as fd:
with open(os.path.abspath(os.curdir + r'/outputs/' + 'file.' + ext), 'wb') as fd:
fd.write(req.content)
return '.\\outputs\\file.' + ext, file_type
return os.path.abspath(os.curdir + r'/outputs/' + 'file.' + ext), file_type
else:
print(str(url).lower())
@ -332,5 +346,5 @@ def downloadInstagram(videourl, path):
def downloadYouTube(link, path, audioonly=True):
from utils.scrapper.media_scrapper import YouTubeDownload
result = YouTubeDownload(link, path, audio_only=True)
result = YouTubeDownload(link, path, audio_only=audioonly)
return result

View File

@ -14,6 +14,14 @@ class NIP89Config:
CONTENT: str = ""
def nip89_create_d_tag(name, pubkey, image):
import hashlib
m = hashlib.md5()
m.update(str(name + image + pubkey).encode("utf-8"))
d_tag = m.hexdigest()[0:16]
return d_tag
def nip89_announce_tasks(dvm_config, client):
k_tag = Tag.parse(["k", str(dvm_config.NIP89.kind)])
d_tag = Tag.parse(["d", dvm_config.NIP89.dtag])

View File

@ -47,7 +47,9 @@ def send_event(event: Event, client: Client, dvm_config) -> EventId:
for tag in event.tags():
if tag.as_vec()[0] == 'relays':
relays = tag.as_vec()[1].split(',')
for index, param in enumerate(tag.as_vec()):
if index != 0:
relays.append(tag.as_vec()[index])
for relay in relays:
if relay not in dvm_config.RELAY_LIST: