Merge pull request #9 from believethehype/media

Media
This commit is contained in:
believethehype 2023-11-28 17:14:11 +01:00 committed by GitHub
commit ab1524b478
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1502 additions and 252 deletions

BIN
.\outputs\file.mp4 Normal file

Binary file not shown.

5
.gitignore vendored
View File

@ -160,3 +160,8 @@ cython_debug/
#.idea/
.DS_Store
*.db
outputs/yt.mp3
.\outputs\audio.mp3
.\outputs\/yt.mp3
.\outputs\audio.mp3
.\outputs\/yt.mp3

View File

@ -3,6 +3,8 @@ import json
import os
import time
import zipfile
from pathlib import Path
import pandas as pd
import requests
import PIL.Image as Image
@ -36,6 +38,18 @@ def send_request_to_nova_server(request_form, address):
return response.text
def send_file_to_nova_server(filepath, address):
print("Sending file to NOVA-Server")
url = ('http://' + address + '/upload')
fp = open(filepath, 'rb')
response = requests.post(url, files={'file': fp})
result = response.content.decode('utf-8')
print(result)
return result
# headers = {'Content-type': 'application/x-www-form-urlencoded'}
"""
check_nova_server_status(request_form, address)
Function that requests the status of the current process with the jobID (we use the Nostr event as jobID).
@ -44,7 +58,7 @@ We throw an exception on error
"""
def check_nova_server_status(jobID, address):
def check_nova_server_status(jobID, address) -> str | pd.DataFrame:
headers = {'Content-type': 'application/x-www-form-urlencoded'}
url_status = 'http://' + address + '/job_status'
url_log = 'http://' + address + '/log'
@ -68,7 +82,6 @@ def check_nova_server_status(jobID, address):
if status == 2:
try:
result = ""
url_fetch = 'http://' + address + '/fetch_result'
print("Fetching Results from NOVA-Server...")
data = {"jobID": jobID, "delete_after_download": True}
@ -79,10 +92,12 @@ def check_nova_server_status(jobID, address):
image = Image.open(io.BytesIO(response.content))
image.save("./outputs/image.jpg")
result = upload_media_to_hoster("./outputs/image.jpg")
return result
os.remove("./outputs/image.jpg")
elif content_type == 'text/plain; charset=utf-8':
result = response.content.decode('utf-8')
elif content_type == "zip":
return result
elif content_type == "application/x-zip-compressed":
zf = zipfile.ZipFile(io.BytesIO(response.content), "r")
for fileinfo in zf.infolist():
@ -92,14 +107,15 @@ def check_nova_server_status(jobID, address):
columns = ['from', 'to', 'name', 'conf']
result = pd.DataFrame([row.split(';') for row in anno_string.split('\n')],
columns=columns)
print(result)
with open("response.zip", "wb") as f:
f.write(response.content)
#print(str(result))
return result
#with open("response.zip", "wb") as f:
# f.write(response.content)
except Exception as e:
#zf.extractall()
print(e)
return result
except Exception as e:
print("Couldn't fetch result: " + str(e))

18
bot.py
View File

@ -96,15 +96,18 @@ class Bot:
else:
command = decrypted_text.replace(decrypted_text.split(' ')[0] + " ", "")
input = command.split("-")[0].rstrip()
input = command.split(" -")[0].rstrip()
input_type = "text"
if input.startswith("http"):
input_type = "url"
i_tag = Tag.parse(["i", input, "text"])
bid = str(self.dvm_config.SUPPORTED_DVMS[index].COST * 1000)
bid_tag = Tag.parse(['bid', bid, bid])
i_tag = Tag.parse(["i", input, input_type])
#bid = str(self.dvm_config.SUPPORTED_DVMS[index].COST * 1000)
#bid_tag = Tag.parse(['bid', bid, bid])
relays_tag = Tag.parse(["relays", json.dumps(self.dvm_config.RELAY_LIST)])
alt_tag = Tag.parse(["alt", self.dvm_config.SUPPORTED_DVMS[index].TASK])
tags = [i_tag.as_vec(), bid_tag.as_vec(), relays_tag.as_vec(), alt_tag.as_vec()]
tags = [i_tag.as_vec(), relays_tag.as_vec(), alt_tag.as_vec()]
remaining_text = command.replace(input, "")
print(remaining_text)
@ -189,7 +192,7 @@ class Bot:
content = nostr_event.content()
if is_encrypted:
if ptag == self.dvm_config.PUBLIC_KEY:
if ptag == self.keys.public_key().to_hex():
tags_str = nip04_decrypt(Keys.from_sk_str(dvm_config.PRIVATE_KEY).secret_key(),
nostr_event.pubkey(), nostr_event.content())
params = json.loads(tags_str)
@ -309,7 +312,7 @@ class Bot:
self.job_list.remove(entry)
content = nostr_event.content()
if is_encrypted:
if ptag == self.dvm_config.PUBLIC_KEY:
if ptag == self.keys.public_key().to_hex():
content = nip04_decrypt(self.keys.secret_key(), nostr_event.pubkey(), content)
else:
return
@ -333,7 +336,6 @@ class Bot:
self.client, self.dvm_config)
user = get_or_add_user(self.dvm_config.DB, sender, client=self.client, config=self.dvm_config)
print("ZAPED EVENT: " + zapped_event.as_json())
if zapped_event is not None:
if not anon:
print("[" + self.NAME + "] Note Zap received for Bot balance: " + str(

55
dvm.py
View File

@ -1,9 +1,9 @@
import json
import typing
from datetime import timedelta
import pandas as pd
from nostr_sdk import PublicKey, Keys, Client, Tag, Event, EventBuilder, Filter, HandleNotification, Timestamp, \
init_logger, LogLevel, nip04_decrypt, Options, nip04_encrypt
init_logger, LogLevel, Options, nip04_encrypt
import time
@ -12,6 +12,7 @@ from utils.dvmconfig import DVMConfig
from utils.admin_utils import admin_make_database_updates, AdminConfig
from utils.backend_utils import get_amount_per_task, check_task_is_supported, get_task
from utils.database_utils import create_sql_table, get_or_add_user, update_user_balance, update_sql_table
from utils.mediasource_utils import input_data_file_duration
from utils.nostr_utils import get_event_by_id, get_referenced_event_by_id, send_event, check_and_decrypt_tags
from utils.output_utils import post_process_result, build_status_reaction
from utils.zap_utils import check_bolt11_ln_bits_is_paid, create_bolt11_ln_bits, parse_zap_event_tags, redeem_cashu
@ -39,10 +40,8 @@ class DVM:
.skip_disconnected_relays(skip_disconnected_relays))
self.client = Client.with_opts(self.keys, opts)
self.job_list = []
self.jobs_on_hold_list = []
pk = self.keys.public_key()
print("Nostr DVM public key: " + str(pk.to_bech32()) + " Hex: " + str(pk.to_hex()) + " Supported DVM tasks: " +
@ -53,9 +52,6 @@ class DVM:
self.client.connect()
zap_filter = Filter().pubkey(pk).kinds([EventDefinitions.KIND_ZAP]).since(Timestamp.now())
# bot_dm_filter = Filter().pubkey(pk).kinds([EventDefinitions.KIND_DM]).authors(self.dvm_config.DM_ALLOWED).since(
# Timestamp.now())
kinds = [EventDefinitions.KIND_NIP90_GENERIC]
for dvm in self.dvm_config.SUPPORTED_DVMS:
if dvm.KIND not in kinds:
@ -76,8 +72,6 @@ class DVM:
handle_nip90_job_event(nostr_event)
elif nostr_event.kind() == EventDefinitions.KIND_ZAP:
handle_zap(nostr_event)
# elif nostr_event.kind() == EventDefinitions.KIND_DM:
# handle_dm(nostr_event)
def handle_msg(self, relay_url, msg):
return
@ -98,9 +92,9 @@ class DVM:
elif tag.as_vec()[0] == "p":
p_tag_str = tag.as_vec()[1]
task_supported, task, duration = check_task_is_supported(nip90_event, client=self.client,
get_duration=(not user.iswhitelisted),
config=self.dvm_config)
task_supported, task = check_task_is_supported(nip90_event, client=self.client,
config=self.dvm_config)
if user.isblacklisted:
send_job_status_reaction(nip90_event, "error", client=self.client, dvm_config=self.dvm_config)
@ -108,6 +102,8 @@ class DVM:
elif task_supported:
print("[" + self.dvm_config.NIP89.name + "] Received new Request: " + task + " from " + user.name)
duration = input_data_file_duration(nip90_event, dvm_config=self.dvm_config, client=self.client)
print("File Duration: " + str(duration))
amount = get_amount_per_task(task, self.dvm_config, duration)
if amount is None:
return
@ -118,9 +114,8 @@ class DVM:
task_is_free = True
cashu_redeemed = False
cashu_message = ""
if cashu != "":
cashu_redeemed, cashu_message = redeem_cashu(cashu, amount, self.dvm_config, self.client)
cashu_redeemed, cashu_message = redeem_cashu(cashu, int(amount), self.dvm_config, self.client)
if cashu_message != "success":
send_job_status_reaction(nip90_event, "error", False, amount, self.client, cashu_message,
self.dvm_config)
@ -136,10 +131,9 @@ class DVM:
do_work(nip90_event)
# if task is directed to us via p tag and user has balance, do the job and update balance
elif p_tag_str == Keys.from_sk_str(
self.dvm_config.PUBLIC_KEY) and user.balance >= amount:
balance = max(user.balance - amount, 0)
self.dvm_config.PUBLIC_KEY) and user.balance >= int(amount):
balance = max(user.balance - int(amount), 0)
update_sql_table(db=self.dvm_config.DB, npub=user.npub, balance=balance,
iswhitelisted=user.iswhitelisted, isblacklisted=user.isblacklisted,
nip05=user.nip05, lud16=user.lud16, name=user.name,
@ -147,7 +141,6 @@ class DVM:
print(
"[" + self.dvm_config.NIP89.name + "] Using user's balance for task: " + task +
". Starting processing.. New balance is: " + str(balance))
send_job_status_reaction(nip90_event, "processing", True, 0,
@ -167,9 +160,9 @@ class DVM:
+ nip90_event.as_json())
if bid > 0:
bid_offer = int(bid / 1000)
if bid_offer >= amount:
if bid_offer >= int(amount):
send_job_status_reaction(nip90_event, "payment-required", False,
amount, # bid_offer
int(amount), # bid_offer
client=self.client, dvm_config=self.dvm_config)
else: # If there is no bid, just request server rate from user
@ -177,10 +170,10 @@ class DVM:
"[" + self.dvm_config.NIP89.name + "] Requesting payment for Event: " +
nip90_event.id().to_hex())
send_job_status_reaction(nip90_event, "payment-required",
False, amount, client=self.client, dvm_config=self.dvm_config)
False, int(amount), client=self.client, dvm_config=self.dvm_config)
#else:
#print("[" + self.dvm_config.NIP89.name + "] Task " + task + " not supported on this DVM, skipping..")
# else:
# print("[" + self.dvm_config.NIP89.name + "] Task " + task + " not supported on this DVM, skipping..")
def handle_zap(zap_event):
try:
@ -190,7 +183,6 @@ class DVM:
self.client, self.dvm_config)
user = get_or_add_user(db=self.dvm_config.DB, npub=sender, client=self.client, config=self.dvm_config)
if zapped_event is not None:
if zapped_event.kind() == EventDefinitions.KIND_FEEDBACK:
@ -211,10 +203,8 @@ class DVM:
# if a reaction by us got zapped
task_supported, task, duration = check_task_is_supported(job_event,
client=self.client,
get_duration=False,
config=self.dvm_config)
task_supported, task = check_task_is_supported(job_event, client=self.client,
config=self.dvm_config)
if job_event is not None and task_supported:
print("Zap received for NIP90 task: " + str(invoice_amount) + " Sats from " + str(
user.name))
@ -267,8 +257,7 @@ class DVM:
print("[" + self.dvm_config.NIP89.name + "] Error during content decryption: " + str(e))
def check_event_has_not_unfinished_job_input(nevent, append, client, dvmconfig):
task_supported, task, duration = check_task_is_supported(nevent, client, False,
config=dvmconfig)
task_supported, task = check_task_is_supported(nevent, client, config=dvmconfig)
if not task_supported:
return False
@ -322,6 +311,7 @@ class DVM:
break
try:
post_processed_content = post_process_result(data, original_event)
send_nostr_reply_event(post_processed_content, original_event_str)
except Exception as e:
@ -364,7 +354,7 @@ class DVM:
content=None,
dvm_config=None):
task = get_task(original_event, client=client, dvmconfig=dvm_config)
task = get_task(original_event, client=client, dvm_config=dvm_config)
alt_description, reaction = build_status_reaction(status, task, amount, content)
e_tag = Tag.parse(["e", original_event.id().to_hex()])
@ -449,7 +439,7 @@ class DVM:
if ((EventDefinitions.KIND_NIP90_EXTRACT_TEXT <= job_event.kind() <= EventDefinitions.KIND_NIP90_GENERIC)
or job_event.kind() == EventDefinitions.KIND_DM):
task = get_task(job_event, client=self.client, dvmconfig=self.dvm_config)
task = get_task(job_event, client=self.client, dvm_config=self.dvm_config)
for dvm in self.dvm_config.SUPPORTED_DVMS:
try:
@ -459,7 +449,6 @@ class DVM:
result = dvm.process(request_form)
check_and_return_event(result, str(job_event.as_json()))
except Exception as e:
print(e)
send_job_status_reaction(job_event, "error", content=str(e), dvm_config=self.dvm_config)

View File

@ -1,6 +1,8 @@
import json
from threading import Thread
from nostr_sdk import Keys
from utils.admin_utils import AdminConfig
from utils.dvmconfig import DVMConfig
from utils.nip89_utils import NIP89Announcement, NIP89Config
@ -11,7 +13,7 @@ class DVMTaskInterface:
NAME: str
KIND: int
TASK: str
COST: int
COST: float
PRIVATE_KEY: str
PUBLIC_KEY: str
DVM = DVM
@ -26,6 +28,8 @@ class DVMTaskInterface:
def init(self, name, dvm_config, admin_config=None, nip89config=None):
self.NAME = name
self.PRIVATE_KEY = dvm_config.PRIVATE_KEY
if dvm_config.PUBLIC_KEY == "" or dvm_config.PUBLIC_KEY is None:
dvm_config.PUBLIC_KEY = Keys.from_sk_str(dvm_config.PRIVATE_KEY).public_key().to_hex()
self.PUBLIC_KEY = dvm_config.PUBLIC_KEY
if dvm_config.COST is not None:
self.COST = dvm_config.COST
@ -50,10 +54,12 @@ class DVMTaskInterface:
nip89.content = nip89config.CONTENT
return nip89
def is_input_supported(self, input_type, input_content) -> bool:
def is_input_supported(self, tags) -> bool:
"""Check if input is supported for current Task."""
pass
def create_request_form_from_nostr_event(self, event, client=None, dvm_config=None) -> dict:
"""Parse input into a request form that will be given to the process method"""
pass
@ -69,5 +75,4 @@ class DVMTaskInterface:
if request_form.get("options"):
opts = json.loads(request_form["options"])
print(opts)
return dict(opts)

13
main.py
View File

@ -9,7 +9,8 @@ import dotenv
from nostr_sdk import Keys
from bot import Bot
from playground import build_pdf_extractor, build_translator, build_unstable_diffusion, build_sketcher, build_dalle
from playground import build_pdf_extractor, build_translator, build_unstable_diffusion, build_sketcher, build_dalle, \
build_whisperx
from utils.dvmconfig import DVMConfig
@ -18,7 +19,6 @@ def run_nostr_dvm_with_local_config():
# Note this is very basic for now and still under development
bot_config = DVMConfig()
bot_config.PRIVATE_KEY = os.getenv("BOT_PRIVATE_KEY")
bot_config.PUBLIC_KEY = Keys.from_sk_str(bot_config.PRIVATE_KEY).public_key().to_hex()
bot_config.LNBITS_INVOICE_KEY = os.getenv("LNBITS_INVOICE_KEY")
bot_config.LNBITS_ADMIN_KEY = os.getenv("LNBITS_ADMIN_KEY") # The bot will forward zaps for us, use responsibly
bot_config.LNBITS_URL = os.getenv("LNBITS_HOST")
@ -49,6 +49,13 @@ def run_nostr_dvm_with_local_config():
bot_config.SUPPORTED_DVMS.append(sketcher) # We also add Sketcher to the bot
sketcher.run()
if os.getenv("NOVA_SERVER") is not None and os.getenv("NOVA_SERVER") != "":
whisperer = build_whisperx("Whisperer")
bot_config.SUPPORTED_DVMS.append(whisperer) # We also add Sketcher to the bot
whisperer.run()
# Spawn DVM5, this one requires an OPENAI API Key and balance with OpenAI, you will move the task to them and pay
# per call. Make sure you have enough balance and the DVM's cost is set higher than what you pay yourself, except, you know,
# you're being generous.
@ -60,7 +67,7 @@ def run_nostr_dvm_with_local_config():
bot = Bot(bot_config)
bot.run()
# Keep the main function alive for libraries like openai
# Keep the main function alive for libraries that require it, like openai
try:
while True:
time.sleep(10)

BIN
outputs/image.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 62 KiB

BIN
outputs/test.mp4 Normal file

Binary file not shown.

View File

@ -6,6 +6,7 @@ from nostr_sdk import PublicKey, Keys
from interfaces.dvmtaskinterface import DVMTaskInterface
from tasks.imagegeneration_openai_dalle import ImageGenerationDALLE
from tasks.imagegeneration_sdxl import ImageGenerationSDXL
from tasks.textextraction_whisperx import SpeechToTextWhisperX
from tasks.textextractionpdf import TextExtractionPDF
from tasks.translation import Translation
from utils.admin_utils import AdminConfig
@ -43,7 +44,6 @@ admin_config.REBROADCAST_NIP89 = False
def build_pdf_extractor(name):
dvm_config = DVMConfig()
dvm_config.PRIVATE_KEY = os.getenv("NOSTR_PRIVATE_KEY")
dvm_config.PUBLIC_KEY = Keys.from_sk_str(dvm_config.PRIVATE_KEY).public_key().to_hex()
dvm_config.LNBITS_INVOICE_KEY = os.getenv("LNBITS_INVOICE_KEY")
dvm_config.LNBITS_URL = os.getenv("LNBITS_HOST")
# Add NIP89
@ -65,7 +65,6 @@ def build_pdf_extractor(name):
def build_translator(name):
dvm_config = DVMConfig()
dvm_config.PRIVATE_KEY = os.getenv("NOSTR_PRIVATE_KEY")
dvm_config.PUBLIC_KEY = Keys.from_sk_str(dvm_config.PRIVATE_KEY).public_key().to_hex()
dvm_config.LNBITS_INVOICE_KEY = os.getenv("LNBITS_INVOICE_KEY")
dvm_config.LNBITS_URL = os.getenv("LNBITS_HOST")
@ -98,7 +97,6 @@ def build_translator(name):
def build_unstable_diffusion(name):
dvm_config = DVMConfig()
dvm_config.PRIVATE_KEY = os.getenv("NOSTR_PRIVATE_KEY")
dvm_config.PUBLIC_KEY = Keys.from_sk_str(dvm_config.PRIVATE_KEY).public_key().to_hex()
dvm_config.LNBITS_INVOICE_KEY = "" #This one will not use Lnbits to create invoices, but rely on zaps
dvm_config.LNBITS_URL = ""
@ -128,11 +126,42 @@ def build_unstable_diffusion(name):
return ImageGenerationSDXL(name=name, dvm_config=dvm_config, nip89config=nip89config,
admin_config=admin_config, options=options)
def build_whisperx(name):
dvm_config = DVMConfig()
dvm_config.PRIVATE_KEY = os.getenv("NOSTR_PRIVATE_KEY4")
dvm_config.LNBITS_INVOICE_KEY = os.getenv("LNBITS_INVOICE_KEY")
dvm_config.LNBITS_URL = os.getenv("LNBITS_HOST")
# A module might have options it can be initialized with, here we set a default model, and the nova-server
# address it should use. These parameters can be freely defined in the task component
options = {'default_model': "base", 'nova_server': os.getenv("NOVA_SERVER")}
nip90params = {
"model": {
"required": False,
"values": ["base","tiny","small","medium","large-v1","large-v2","tiny.en","base.en","small.en","medium.en"]
},
"alignment": {
"required": False,
"values": ["raw", "segment","word"]
}
}
nip89info = {
"name": name,
"image": "https://image.nostr.build/c33ca6fc4cc038ca4adb46fdfdfda34951656f87ee364ef59095bae1495ce669.jpg",
"about": "I am a test dvm to extract text from media files (very beta)",
"nip90Params": nip90params
}
nip89config = NIP89Config()
nip89config.DTAG = os.getenv("TASK_SPEECH_TO_TEXT_NIP89")
nip89config.CONTENT = json.dumps(nip89info)
return SpeechToTextWhisperX(name=name, dvm_config=dvm_config, nip89config=nip89config,
admin_config=admin_config, options=options)
def build_sketcher(name):
dvm_config = DVMConfig()
dvm_config.PRIVATE_KEY = os.getenv("NOSTR_PRIVATE_KEY2")
dvm_config.PUBLIC_KEY = Keys.from_sk_str(dvm_config.PRIVATE_KEY).public_key().to_hex()
dvm_config.LNBITS_INVOICE_KEY = os.getenv("LNBITS_INVOICE_KEY")
dvm_config.LNBITS_URL = os.getenv("LNBITS_HOST")
@ -168,7 +197,6 @@ def build_sketcher(name):
def build_dalle(name):
dvm_config = DVMConfig()
dvm_config.PRIVATE_KEY = os.getenv("NOSTR_PRIVATE_KEY3")
dvm_config.PUBLIC_KEY = Keys.from_sk_str(dvm_config.PRIVATE_KEY).public_key().to_hex()
dvm_config.LNBITS_INVOICE_KEY = os.getenv("LNBITS_INVOICE_KEY")
dvm_config.LNBITS_URL = os.getenv("LNBITS_HOST")
profit_in_sats = 10

View File

@ -1,35 +1,58 @@
anyio==3.7.1
beautifulsoup4==4.12.2
bech32==1.2.0
bitarray==2.8.3
bitstring==4.1.3
blessed==1.20.0
cassidy==0.1.4
certifi==2023.7.22
charset-normalizer==3.3.2
click==8.1.7
distro==1.8.0
emoji==2.8.0
enumb==0.1.5
eva-decord==0.6.1
exceptiongroup==1.2.0
expo==0.1.2
ffmpegio==0.8.5
ffmpegio-core==0.8.5
h11==0.14.0
httpcore==1.0.2
httpx==0.25.1
idna==3.4
inquirer==3.1.3
install==1.3.5
instaloader==4.10.1
lnurl==0.4.1
mediatype==0.1.6
nostr-sdk==0.0.5
numpy==1.26.2
openai==1.3.5
packaging==23.2
pandas==2.1.3
Pillow==10.1.0
pluggy==1.3.0
pycryptodome==3.19.0
pydantic==1.10.13
pydantic_core==2.14.5
pypdf==3.17.1
python-dateutil==2.8.2
python-dotenv==1.0.0
python-editor==1.0.4
pytube==15.0.0
pytz==2023.3.post1
PyUpload~=0.1.4
PyUpload==0.1.4
pyuseragents==1.0.5
readchar==4.0.5
requests==2.31.0
requests-toolbelt==1.0.0
safeIO==1.2
six==1.16.0
sniffio==1.3.0
soupsieve==2.5
tqdm==4.66.1
translatepy==2.3
typing_extensions==4.8.0
tzdata==2023.3
urllib3==2.1.0
wcwidth==0.2.10

View File

@ -25,15 +25,28 @@ Params: -model # models: juggernaut, dynavision, colossusProject, newrea
class ImageGenerationDALLE(DVMTaskInterface):
KIND: int = EventDefinitions.KIND_NIP90_GENERATE_IMAGE
TASK: str = "text-to-image"
COST: int = 120
COST: float = 120
def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
admin_config: AdminConfig = None, options=None):
super().__init__(name, dvm_config, nip89config, admin_config, options)
def is_input_supported(self, input_type, input_content):
if input_type != "text":
return False
def is_input_supported(self, tags):
for tag in tags:
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
input_type = tag.as_vec()[2]
if input_type != "text":
return False
elif tag.as_vec()[0] == 'output':
output = tag.as_vec()[1]
if (output == "" or
not (output == "image/png" or "image/jpg"
or output == "image/png;format=url" or output == "image/jpg;format=url")):
print("Output format not supported, skipping..")
return False
return True
def create_request_form_from_nostr_event(self, event, client=None, dvm_config=None):

View File

@ -2,12 +2,11 @@ import json
from multiprocessing.pool import ThreadPool
from backends.nova_server import check_nova_server_status, send_request_to_nova_server
from dvm import DVM
from interfaces.dvmtaskinterface import DVMTaskInterface
from utils.admin_utils import AdminConfig
from utils.definitions import EventDefinitions
from utils.dvmconfig import DVMConfig
from utils.nip89_utils import NIP89Config
from utils.definitions import EventDefinitions
"""
This File contains a Module to transform Text input on NOVA-Server and receive results back.
@ -22,15 +21,28 @@ Params: -model # models: juggernaut, dynavision, colossusProject, newrea
class ImageGenerationSDXL(DVMTaskInterface):
KIND: int = EventDefinitions.KIND_NIP90_GENERATE_IMAGE
TASK: str = "text-to-image"
COST: int = 50
COST: float = 50
def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
admin_config: AdminConfig = None, options=None):
super().__init__(name, dvm_config, nip89config, admin_config, options)
def is_input_supported(self, input_type, input_content):
if input_type != "text":
return False
def is_input_supported(self, tags):
for tag in tags:
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
input_type = tag.as_vec()[2]
if input_type != "text":
return False
elif tag.as_vec()[0] == 'output':
output = tag.as_vec()[1]
if (output == "" or
not (output == "image/png" or "image/jpg"
or output == "image/png;format=url" or output == "image/jpg;format=url")):
print("Output format not supported, skipping..")
return False
return True
def create_request_form_from_nostr_event(self, event, client=None, dvm_config=None):
@ -126,11 +138,6 @@ class ImageGenerationSDXL(DVMTaskInterface):
}
request_form['options'] = json.dumps(options)
# old format, deprecated, will remove
request_form["optStr"] = ('model=' + model + ';ratio=' + str(ratio_width) + '-' + str(ratio_height) + ';size=' +
str(width) + '-' + str(height) + ';strength=' + str(strength) + ';guidance_scale=' +
str(guidance_scale) + ';lora=' + lora + ';lora_weight=' + lora_weight)
return request_form
def process(self, request_form):
@ -144,7 +151,7 @@ class ImageGenerationSDXL(DVMTaskInterface):
thread = pool.apply_async(check_nova_server_status, (request_form['jobID'], self.options['nova_server']))
print("Wait for results of NOVA-Server...")
result = thread.get()
return str(result)
return result
except Exception as e:
raise Exception(e)

View File

@ -0,0 +1,143 @@
import json
import os
import time
from multiprocessing.pool import ThreadPool
from pathlib import Path
from backends.nova_server import check_nova_server_status, send_request_to_nova_server, send_file_to_nova_server
from interfaces.dvmtaskinterface import DVMTaskInterface
from utils.admin_utils import AdminConfig
from utils.dvmconfig import DVMConfig
from utils.mediasource_utils import organize_input_data
from utils.nip89_utils import NIP89Config
from utils.definitions import EventDefinitions
"""
This File contains a Module to transform Text input on NOVA-Server and receive results back.
Accepted Inputs: Prompt (text)
Outputs: An url to an Image
Params: -model # models: juggernaut, dynavision, colossusProject, newreality, unstable
-lora # loras (weights on top of models) voxel,
"""
class SpeechToTextWhisperX(DVMTaskInterface):
KIND: int = EventDefinitions.KIND_NIP90_EXTRACT_TEXT
TASK: str = "speech-to-text"
COST: float = 0.1
def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
admin_config: AdminConfig = None, options=None):
super().__init__(name, dvm_config, nip89config, admin_config, options)
def is_input_supported(self, tags):
for tag in tags:
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
input_type = tag.as_vec()[2]
if input_type != "url":
return False
elif tag.as_vec()[0] == 'output':
output = tag.as_vec()[1]
if (output == "" or not (output == "text/plain")):
print("Output format not supported, skipping..")
return False
return True
def create_request_form_from_nostr_event(self, event, client=None, dvm_config=None):
request_form = {"jobID": event.id().to_hex() + "_" + self.NAME.replace(" ", ""),
"trainerFilePath": 'modules\\whisperx\\whisperx_transcript.trainer'}
if self.options.get("default_model"):
model = self.options['default_model']
else:
model = "base"
if self.options.get("alignment"):
alignment = self.options['alignment']
else:
alignment = "raw"
url = ""
input_type = "url"
start_time = 0
end_time = 0
for tag in event.tags():
if tag.as_vec()[0] == 'i':
input_type = tag.as_vec()[2]
if input_type == "url":
url = tag.as_vec()[1]
elif tag.as_vec()[0] == 'param':
print("Param: " + tag.as_vec()[1] + ": " + tag.as_vec()[2])
if tag.as_vec()[1] == "alignment":
alignment = tag.as_vec()[2]
elif tag.as_vec()[1] == "model":
model = tag.as_vec()[2]
elif tag.as_vec()[1] == "range": #hui
try:
t = time.strptime(tag.as_vec()[2], "%H:%M:%S")
seconds = t.tm_hour * 60 * 60 + t.tm_min * 60 + t.tm_sec
start_time = float(seconds)
except:
try:
t = time.strptime(tag.as_vec()[2], "%M:%S")
seconds = t.tm_min * 60 + t.tm_sec
start_time = float(seconds)
except:
start_time = tag.as_vec()[2]
try:
t = time.strptime(tag.as_vec()[3], "%H:%M:%S")
seconds = t.tm_hour * 60 * 60 + t.tm_min * 60 + t.tm_sec
end_time = float(seconds)
except:
try:
t = time.strptime(tag.as_vec()[3], "%M:%S")
seconds = t.tm_min * 60 + t.tm_sec
end_time = float(seconds)
except:
end_time = float(tag.as_vec()[3])
filepath = organize_input_data(url, input_type, start_time, end_time, dvm_config, client)
pathonserver = send_file_to_nova_server(filepath, self.options['nova_server'])
io_input = {
"id": "audio",
"type": "input",
"src": "file:stream",
"uri": pathonserver
}
io_output = {
"id": "transcript",
"type": "output",
"src": "request:annotation:free"
}
request_form['data'] = json.dumps([io_input, io_output])
options = {
"model": model,
"alignment_mode": alignment,
}
request_form['options'] = json.dumps(options)
return request_form
def process(self, request_form):
try:
# Call the process route of NOVA-Server with our request form.
response = send_request_to_nova_server(request_form, self.options['nova_server'])
if bool(json.loads(response)['success']):
print("Job " + request_form['jobID'] + " sent to NOVA-server")
pool = ThreadPool(processes=1)
thread = pool.apply_async(check_nova_server_status, (request_form['jobID'], self.options['nova_server']))
print("Wait for results of NOVA-Server...")
result = thread.get()
return result
except Exception as e:
raise Exception(e)

View File

@ -23,16 +23,19 @@ Params: None
class TextExtractionPDF(DVMTaskInterface):
KIND: int = EventDefinitions.KIND_NIP90_EXTRACT_TEXT
TASK: str = "pdf-to-text"
COST: int = 0
COST: float = 0
def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
admin_config: AdminConfig = None, options=None):
super().__init__(name, dvm_config, nip89config, admin_config, options)
def is_input_supported(self, input_type, input_content):
if input_type != "url" and input_type != "event":
return False
def is_input_supported(self, tags):
for tag in tags:
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
input_type = tag.as_vec()[2]
if input_type != "url" and input_type != "event":
return False
return True
def create_request_form_from_nostr_event(self, event, client=None, dvm_config=None):

View File

@ -21,17 +21,21 @@ Params: -language The target language
class Translation(DVMTaskInterface):
KIND: int = EventDefinitions.KIND_NIP90_TRANSLATE_TEXT
TASK: str = "translation"
COST: int = 0
COST: float = 0
def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
admin_config: AdminConfig = None, options=None):
super().__init__(name, dvm_config, nip89config, admin_config, options)
def is_input_supported(self, input_type, input_content):
if input_type != "event" and input_type != "job" and input_type != "text":
return False
if input_type != "text" and len(input_content) > 4999:
return False
def is_input_supported(self, tags):
for tag in tags:
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
input_type = tag.as_vec()[2]
if input_type != "event" and input_type != "job" and input_type != "text":
return False
if input_type != "text" and len(input_value) > 4999:
return False
return True
def create_request_form_from_nostr_event(self, event, client=None, dvm_config=None):

View File

@ -4,69 +4,76 @@ import requests
from nostr_sdk import Event, Tag
from utils.definitions import EventDefinitions
from utils.mediasource_utils import check_source_type, media_source
from utils.nostr_utils import get_event_by_id
def get_task(event, client, dvmconfig):
if event.kind() == EventDefinitions.KIND_NIP90_GENERIC: # use this for events that have no id yet
for tag in event.tags():
if tag.as_vec()[0] == 'j':
return tag.as_vec()[1]
else:
return "unknown job: " + event.as_json()
elif event.kind() == EventDefinitions.KIND_DM: # dm
for tag in event.tags():
if tag.as_vec()[0] == 'j':
return tag.as_vec()[1]
else:
return "unknown job: " + event.as_json()
def get_task(event, client, dvm_config):
try:
if event.kind() == EventDefinitions.KIND_NIP90_GENERIC: # use this for events that have no id yet, inclufr j tag
for tag in event.tags():
if tag.as_vec()[0] == 'j':
return tag.as_vec()[1]
else:
return "unknown job: " + event.as_json()
elif event.kind() == EventDefinitions.KIND_DM: # dm
for tag in event.tags():
if tag.as_vec()[0] == 'j':
return tag.as_vec()[1]
else:
return "unknown job: " + event.as_json()
# This looks a bit more complicated, but we do several tasks for text-extraction in the future
elif event.kind() == EventDefinitions.KIND_NIP90_EXTRACT_TEXT:
for tag in event.tags:
if tag.as_vec()[0] == "i":
if tag.as_vec()[2] == "url":
file_type = check_url_is_readable(tag.as_vec()[1])
if file_type == "pdf":
return "pdf-to-text"
# This looks a bit more complicated, but we do several tasks for text-extraction in the future
elif event.kind() == EventDefinitions.KIND_NIP90_EXTRACT_TEXT:
for tag in event.tags():
if tag.as_vec()[0] == "i":
if tag.as_vec()[2] == "url":
file_type = check_url_is_readable(tag.as_vec()[1])
print(file_type)
if file_type == "pdf":
return "pdf-to-text"
elif file_type == "audio" or file_type == "video":
return "speech-to-text"
else:
return "unknown job"
elif tag.as_vec()[2] == "event":
evt = get_event_by_id(tag.as_vec()[1], client=client, config=dvm_config)
if evt is not None:
if evt.kind() == 1063:
for tg in evt.tags():
if tg.as_vec()[0] == 'url':
file_type = check_url_is_readable(tg.as_vec()[1])
if file_type == "pdf":
return "pdf-to-text"
elif file_type == "audio" or file_type == "video":
return "speech-to-text"
else:
return "unknown job"
else:
return "unknown type"
else:
return "unknown job"
elif tag.as_vec()[2] == "event":
evt = get_event_by_id(tag.as_vec()[1], client=client, config=dvmconfig)
if evt is not None:
if evt.kind() == 1063:
for tg in evt.tags():
if tg.as_vec()[0] == 'url':
file_type = check_url_is_readable(tg.as_vec()[1])
if file_type == "pdf":
return "pdf-to-text"
else:
return "unknown job"
else:
return "unknown type"
elif event.kind() == EventDefinitions.KIND_NIP90_TRANSLATE_TEXT:
return "translation"
elif event.kind() == EventDefinitions.KIND_NIP90_GENERATE_IMAGE:
return "text-to-image"
# TODO if a task can consist of multiple inputs add them here
# else if kind is supported, simply return task
else:
else:
return "unknown type"
for dvm in dvm_config.SUPPORTED_DVMS:
if dvm.KIND == event.kind():
return dvm.TASK
except Exception as e:
print("Get task: " + str(e))
return "unknown type"
def check_task_is_supported(event: Event, client, get_duration=False, config=None):
def is_input_supported_generic(tags, client, dvm_config) -> bool:
try:
dvm_config = config
input_value = ""
input_type = ""
duration = 1
task = get_task(event, client=client, dvmconfig=dvm_config)
for tag in event.tags():
for tag in tags:
if tag.as_vec()[0] == 'i':
if len(tag.as_vec()) < 3:
print("Job Event missing/malformed i tag, skipping..")
return False, "", 0
return False
else:
input_value = tag.as_vec()[1]
input_type = tag.as_vec()[2]
@ -74,32 +81,32 @@ def check_task_is_supported(event: Event, client, get_duration=False, config=Non
evt = get_event_by_id(input_value, client=client, config=dvm_config)
if evt is None:
print("Event not found")
return False, "", 0
elif input_type == 'url' and check_url_is_readable(input_value) is None:
print("Url not readable / supported")
return False, task, duration #
return False
# TODO check_url_is_readable might be more relevant per task in the future
# if input_type == 'url' and check_url_is_readable(input_value) is None:
# print("Url not readable / supported")
# return False
elif tag.as_vec()[0] == 'output':
# TODO move this to individual modules
output = tag.as_vec()[1]
if not (output == "text/plain"
or output == "text/json" or output == "json"
or output == "image/png" or "image/jpg"
or output == "image/png;format=url" or output == "image/jpg;format=url"
or output == ""):
print("Output format not supported, skipping..")
return False, "", 0
return True
except Exception as e:
print("Generic input check: " + str(e))
def check_task_is_supported(event: Event, client, config=None):
try:
dvm_config = config
task = get_task(event, client=client, dvm_config=dvm_config)
if task not in (x.TASK for x in dvm_config.SUPPORTED_DVMS):
return False, task
if not is_input_supported_generic(event.tags(), client, dvm_config):
return False, ""
for dvm in dvm_config.SUPPORTED_DVMS:
if dvm.TASK == task:
if not dvm.is_input_supported(input_type, event.content()):
return False, task, duration
if not dvm.is_input_supported(event.tags()):
return False, task
if task not in (x.TASK for x in dvm_config.SUPPORTED_DVMS):
return False, task, duration
return True, task, duration
return True, task
except Exception as e:
@ -109,27 +116,37 @@ def check_task_is_supported(event: Event, client, get_duration=False, config=Non
def check_url_is_readable(url):
if not str(url).startswith("http"):
return None
# If link is comaptible with one of these file formats, move on.
req = requests.get(url)
content_type = req.headers['content-type']
if content_type == 'audio/x-wav' or str(url).endswith(".wav") or content_type == 'audio/mpeg' or str(url).endswith(
".mp3") or content_type == 'audio/ogg' or str(url).endswith(".ogg"):
return "audio"
elif (content_type == 'image/png' or str(url).endswith(".png") or content_type == 'image/jpg' or str(url).endswith(
".jpg") or content_type == 'image/jpeg' or str(url).endswith(".jpeg") or content_type == 'image/png' or
str(url).endswith(".png")):
return "image"
elif content_type == 'video/mp4' or str(url).endswith(".mp4") or content_type == 'video/avi' or str(url).endswith(
".avi") or content_type == 'video/mov' or str(url).endswith(".mov"):
return "video"
elif (str(url)).endswith(".pdf"):
return "pdf"
source = check_source_type(url)
type = media_source(source)
if type == "url":
# If link is comaptible with one of these file formats, move on.
req = requests.get(url)
content_type = req.headers['content-type']
if content_type == 'audio/x-wav' or str(url).endswith(".wav") or content_type == 'audio/mpeg' or str(url).endswith(
".mp3") or content_type == 'audio/ogg' or str(url).endswith(".ogg"):
return "audio"
elif (content_type == 'image/png' or str(url).endswith(".png") or content_type == 'image/jpg' or str(url).endswith(
".jpg") or content_type == 'image/jpeg' or str(url).endswith(".jpeg") or content_type == 'image/png' or
str(url).endswith(".png")):
return "image"
elif content_type == 'video/mp4' or str(url).endswith(".mp4") or content_type == 'video/avi' or str(url).endswith(
".avi") or content_type == 'video/mov' or str(url).endswith(".mov"):
return "video"
elif (str(url)).endswith(".pdf"):
return "pdf"
else:
return type
# Otherwise we will not offer to do the job.
return None
def get_amount_per_task(task, dvm_config, duration=1):
# duration is either static 1 (for images etc) or in seconds
if duration == 0:
duration = 1
for dvm in dvm_config.SUPPORTED_DVMS: # this is currently just one
if dvm.TASK == task:
amount = dvm.COST * duration

331
utils/mediasource_utils.py Normal file
View File

@ -0,0 +1,331 @@
import os
import urllib
from datetime import time
from urllib.parse import urlparse
import ffmpegio
from decord import AudioReader, cpu
import requests
from utils.nostr_utils import get_event_by_id
def input_data_file_duration(event, dvm_config, client, start=0, end=0):
print("[" + dvm_config.NIP89.name + "] Getting Duration of the Media file..")
input_value = ""
input_type = "url"
for tag in event.tags():
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
input_type = tag.as_vec()[2]
if input_type == "event": # NIP94 event
evt = get_event_by_id(input_value, client=client, config=dvm_config)
if evt is not None:
input_value, input_type = check_nip94_event_for_media(evt, input_value, input_type)
if input_type == "url":
source_type = check_source_type(input_value)
filename, start, end, type = get_file_start_end_type(input_value, source_type, start, end)
if type != "audio" and type != "video":
return 1
if filename == "" or filename is None:
return 0
try:
file_reader = AudioReader(filename, ctx=cpu(0), mono=False)
duration = float(file_reader.duration())
except Exception as e:
print(e)
return 0
print("Original Duration of the Media file: " + str(duration))
start_time, end_time, new_duration = (
convert_media_length(start, end, duration))
print("New Duration of the Media file: " + str(new_duration))
return new_duration
return 1
def organize_input_data(input_value, input_type, start, end, dvm_config, client, process=True) -> str:
if input_type == "event": # NIP94 event
evt = get_event_by_id(input_value, client=client, config=dvm_config)
if evt is not None:
input_value, input_type = check_nip94_event_for_media(evt, input_value, input_type)
if input_type == "url":
source_type = check_source_type(input_value)
filename, start, end, type = get_file_start_end_type(input_value, source_type, start, end)
if filename == "" or filename is None:
return ""
try:
file_reader = AudioReader(filename, ctx=cpu(0), mono=False)
duration = float(file_reader.duration())
except Exception as e:
print(e)
return ""
print("Original Duration of the Media file: " + str(duration))
start_time, end_time, new_duration = (
convert_media_length(start, end, duration))
print("New Duration of the Media file: " + str(new_duration))
# TODO if already in a working format and time is 0 0, dont convert
print("Converting from " + str(start_time) + " until " + str(end_time))
# for now, we cut and convert all files to mp3
final_filename = '.\\outputs\\audio.mp3'
print(final_filename)
fs, x = ffmpegio.audio.read(filename, ss=start_time, to=end_time, sample_fmt='dbl', ac=1)
ffmpegio.audio.write(final_filename, fs, x, overwrite=True)
return final_filename
def check_nip94_event_for_media(evt, input_value, input_type):
# Parse NIP94 event for url, if found, use it.
if evt.kind() == 1063:
for tag in evt.tags():
if tag.as_vec()[0] == 'url':
input_type = "url"
input_value = tag.as_vec()[1]
return input_value, input_type
return input_value, input_type
def convert_media_length(start: float, end: float, duration: float):
if end == 0.0:
end_time = duration
elif end > duration:
end_time = duration
else:
end_time = end
if start <= 0.0 or start > end_time:
start_time = 0.0
else:
start_time = start
dur = end_time - start_time
return start_time, end_time, dur
def get_file_start_end_type(url, source_type, start, end) -> (str, str):
# Overcast
if source_type == "overcast":
name, start, end = get_overcast(url, start, end)
return name, start, end, "audio"
# Youtube
elif source_type == "youtube":
audio_only = True
name, start, end = get_youtube(url, start, end, audio_only)
return name, start, end, "audio"
# Xitter
elif source_type == "xitter":
name, start, end = get_Twitter(url, start, end)
return name, start, end, "video"
# Tiktok
elif source_type == "tiktok":
name, start, end = get_TikTok(url, start, end)
return name, start, end, "video"
# Instagram
elif source_type == "instagram":
name, start, end = get_Instagram(url, start, end)
if name.endswith("jpg"):
type = "image"
else:
type = "video"
return name, start, end, type
# A file link
else:
filename, filetype = get_media_link(url)
return filename, start, end, filetype
def media_source(source_type):
if source_type == "overcast":
return "audio"
elif source_type == "youtube":
return "audio"
elif source_type == "xitter":
return "video"
elif source_type == "tiktok":
return "video"
elif source_type == "instagram":
return "video"
else:
return "url"
def check_source_type(url):
if str(url).startswith("https://overcast.fm/"):
return "overcast"
elif str(url).replace("http://", "").replace("https://", "").replace(
"www.", "").replace("youtu.be/", "youtube.com?v=")[0:11] == "youtube.com":
return "youtube"
elif str(url).startswith("https://x.com") or str(url).startswith("https://twitter.com"):
return "xitter"
elif str(url).startswith("https://vm.tiktok.com") or str(url).startswith(
"https://www.tiktok.com") or str(url).startswith("https://m.tiktok.com"):
return "tiktok"
elif str(url).startswith("https://www.instagram.com") or str(url).startswith(
"https://instagram.com"):
return "instagram"
else:
return "url"
def get_overcast(input_value, start, end):
filename = '.\\outputs\\' + ".originalaudio.mp3"
print("Found overcast.fm Link.. downloading")
start_time = start
end_time = end
downloadOvercast(input_value, filename)
finaltag = str(input_value).replace("https://overcast.fm/", "").split('/')
if start == 0.0:
if len(finaltag) > 1:
t = time.strptime(finaltag[1], "%H:%M:%S")
seconds = t.tm_hour * 60 * 60 + t.tm_min * 60 + t.tm_sec
start_time = float(seconds)
print("Setting start time automatically to " + str(start_time))
if end > 0.0:
end_time = float(seconds + end)
print("Moving end time automatically to " + str(end_time))
return filename, start_time, end_time
def get_TikTok(input_value, start, end):
filepath = '.\\outputs\\'
try:
filename = downloadTikTok(input_value, filepath)
print(filename)
except Exception as e:
print(e)
return "", start, end
return filename, start, end
def get_Instagram(input_value, start, end):
filepath = '.\\outputs\\'
try:
filename = downloadInstagram(input_value, filepath)
print(filename)
except Exception as e:
print(e)
return "", start, end
return filename, start, end
def get_Twitter(input_value, start, end):
filepath = '.\\outputs\\'
cleanlink = str(input_value).replace("twitter.com", "x.com")
try:
filename = downloadTwitter(cleanlink, filepath)
print(filename)
except Exception as e:
print(e)
return "", start, end
return filename, start, end
def get_youtube(input_value, start, end, audioonly=True):
filepath = os.path.abspath(os.curdir) + r'/outputs/'
filename = ""
try:
filename = downloadYouTube(input_value, filepath, audioonly)
except Exception as e:
print("Youtube" + str(e))
return filename, start, end
try:
o = urlparse(input_value)
q = urllib.parse.parse_qs(o.query)
if start == 0.0:
if o.query.find('?t=') != -1:
start = q['t'][0] # overwrite from link.. why not..
print("Setting start time automatically to " + start)
if end > 0.0:
end = float(q['t'][0]) + end
print("Moving end time automatically to " + str(end))
except Exception as e:
print(e)
return filename, start, end
return filename, start, end
def get_media_link(url) -> (str, str):
req = requests.get(url)
content_type = req.headers['content-type']
print(content_type)
if content_type == 'audio/x-wav' or str(url).lower().endswith(".wav"):
ext = "wav"
file_type = "audio"
with open('.\\outputs\\file.' + ext, 'wb') as fd:
fd.write(req.content)
return '.\\outputs\\file.' + ext, file_type
elif content_type == 'audio/mpeg' or str(url).lower().endswith(".mp3"):
ext = "mp3"
file_type = "audio"
with open('.\\outputs\\file.' + '\\file.' + ext, 'wb') as fd:
fd.write(req.content)
return '.\\outputs\\file.' + ext, file_type
elif content_type == 'audio/ogg' or str(url).lower().endswith(".ogg"):
ext = "ogg"
file_type = "audio"
with open('.\\outputs\\file.' + ext, 'wb') as fd:
fd.write(req.content)
return '.\\outputs\\file.' + ext, file_type
elif content_type == 'video/mp4' or str(url).lower().endswith(".mp4"):
ext = "mp4"
file_type = "video"
with open('.\\outputs\\file.' + ext, 'wb') as fd:
fd.write(req.content)
return '.\\outputs\\file.' + ext, file_type
elif content_type == 'video/avi' or str(url).lower().endswith(".avi"):
ext = "avi"
file_type = "video"
with open('.\\outputs\\file.' + ext, 'wb') as fd:
fd.write(req.content)
return '.\\outputs\\file.' + ext, file_type
elif content_type == 'video/quicktime' or str(url).lower().endswith(".mov"):
ext = "mov"
file_type = "video"
with open('.\\outputs\\file.' + ext, 'wb') as fd:
fd.write(req.content)
return '.\\outputs\\file.' + ext, file_type
else:
print(str(url).lower())
return None, None
def downloadOvercast(source_url, target_location):
from utils.scrapper.media_scrapper import OvercastDownload
result = OvercastDownload(source_url, target_location)
return result
def downloadTwitter(videourl, path):
from utils.scrapper.media_scrapper import XitterDownload
result = XitterDownload(videourl, path + "x.mp4")
return result
def downloadTikTok(videourl, path):
from utils.scrapper.media_scrapper import TiktokDownloadAll
result = TiktokDownloadAll([videourl], path)
return result
def downloadInstagram(videourl, path):
from utils.scrapper.media_scrapper import InstagramDownload
result = InstagramDownload(videourl, "insta", path)
return result
def downloadYouTube(link, path, audioonly=True):
from utils.scrapper.media_scrapper import YouTubeDownload
result = YouTubeDownload(link, path, audio_only=True)
return result

View File

@ -1,7 +1,6 @@
import json
import typing
from datetime import timedelta
from nostr_sdk import Filter, Client, Alphabet, EventId, Event, PublicKey, Tag, Keys, nip04_decrypt, EventBuilder
from nostr_sdk import Filter, Client, Alphabet, EventId, Event, PublicKey, Tag, Keys, nip04_decrypt
def get_event_by_id(event_id: str, client: Client, config=None) -> Event | None:
@ -19,6 +18,7 @@ def get_event_by_id(event_id: str, client: Client, config=None) -> Event | None:
id_filter = Filter().id(event_id).limit(1)
events = client.get_events_of([id_filter], timedelta(seconds=config.RELAY_TIMEOUT))
if len(events) > 0:
return events[0]
else:
return None
@ -42,23 +42,26 @@ def get_referenced_event_by_id(event_id, client, dvm_config, kinds) -> Event | N
def send_event(event: Event, client: Client, dvm_config) -> EventId:
relays = []
try:
relays = []
for tag in event.tags():
if tag.as_vec()[0] == 'relays':
relays = tag.as_vec()[1].split(',')
for tag in event.tags():
if tag.as_vec()[0] == 'relays':
relays = tag.as_vec()[1].split(',')
for relay in relays:
if relay not in dvm_config.RELAY_LIST:
client.add_relay(relay)
for relay in relays:
if relay not in dvm_config.RELAY_LIST:
client.add_relay(relay)
event_id = client.send_event(event)
event_id = client.send_event(event)
for relay in relays:
if relay not in dvm_config.RELAY_LIST:
client.remove_relay(relay)
for relay in relays:
if relay not in dvm_config.RELAY_LIST:
client.remove_relay(relay)
return event_id
return event_id
except Exception as e:
print(e)
def check_and_decrypt_tags(event, dvm_config):

View File

@ -17,82 +17,87 @@ Post process results to either given output format or a Nostr readable plain tex
def post_process_result(anno, original_event):
print("Post-processing...")
if isinstance(anno, pandas.DataFrame): # if input is an anno we parse it to required output format
for tag in original_event.tags:
print("Pandas Dataframe...")
has_output_tag = False
output_format = "text/plain"
for tag in original_event.tags():
if tag.as_vec()[0] == "output":
output_format = tag.as_vec()[1]
print("requested output is " + str(tag.as_vec()[1]) + "...")
try:
if output_format == "text/plain":
result = ""
for each_row in anno['name']:
if each_row is not None:
for i in str(each_row).split('\n'):
result = result + i + "\n"
result = replace_broken_words(
str(result).replace("\"", "").replace('[', "").replace(']',
"").lstrip(None))
return result
has_output_tag = True
print("requested output is " + str(output_format) + "...")
elif output_format == "text/vtt":
print(str(anno))
result = "WEBVTT\n\n"
for element in anno:
name = element["name"] # name
start = float(element["from"])
convertstart = str(datetime.timedelta(seconds=start))
end = float(element["to"])
convertend = str(datetime.timedelta(seconds=end))
print(str(convertstart) + " --> " + str(convertend))
cleared_name = str(name).lstrip("\'").rstrip("\'")
result = result + str(convertstart) + " --> " + str(
convertend) + "\n" + cleared_name + "\n\n"
result = replace_broken_words(
str(result).replace("\"", "").replace('[', "").replace(']',
"").lstrip(None))
return result
elif output_format == "text/json" or output_format == "json":
# result = json.dumps(json.loads(anno.data.to_json(orient="records")))
result = replace_broken_words(json.dumps(anno.data.tolist()))
return result
# TODO add more
else:
result = ""
for element in anno.data:
element["name"] = str(element["name"]).lstrip()
element["from"] = (format(float(element["from"]), '.2f')).lstrip() # name
element["to"] = (format(float(element["to"]), '.2f')).lstrip() # name
result = result + "(" + str(element["from"]) + "," + str(element["to"]) + ")" + " " + str(
element["name"]) + "\n"
print(result)
result = replace_broken_words(result)
return result
except Exception as e:
print(e)
result = replace_broken_words(str(anno.data))
if has_output_tag:
print("Output Tag found: " + output_format)
try:
if output_format == "text/plain":
result = pandas_to_plaintext(anno)
result = replace_broken_words(
str(result).replace("\"", "").replace('[', "").replace(']',
"").lstrip(None))
return result
else:
result = ""
for element in anno.data:
element["name"] = str(element["name"]).lstrip()
element["from"] = (format(float(element["from"]), '.2f')).lstrip() # name
element["to"] = (format(float(element["to"]), '.2f')).lstrip() # name
result = result + "(" + str(element["from"]) + "," + str(element["to"]) + ")" + " " + str(
element["name"]) + "\n"
elif output_format == "text/vtt":
print(str(anno))
result = "WEBVTT\n\n"
for element in anno:
name = element["name"] # name
start = float(element["from"])
convertstart = str(datetime.timedelta(seconds=start))
end = float(element["to"])
convertend = str(datetime.timedelta(seconds=end))
print(str(convertstart) + " --> " + str(convertend))
cleared_name = str(name).lstrip("\'").rstrip("\'")
result = result + str(convertstart) + " --> " + str(
convertend) + "\n" + cleared_name + "\n\n"
result = replace_broken_words(
str(result).replace("\"", "").replace('[', "").replace(']',
"").lstrip(None))
return result
elif output_format == "text/json" or output_format == "json":
# result = json.dumps(json.loads(anno.data.to_json(orient="records")))
result = replace_broken_words(json.dumps(anno.data.tolist()))
return result
# TODO add more
else:
print("Pandas Dataframe but output tag not supported.. falling back to default..")
result = pandas_to_plaintext(anno)
print(result)
result = str(result).replace("\"", "").replace('[', "").replace(']',
"").lstrip(None)
return result
except Exception as e:
print(e)
result = replace_broken_words(str(anno.data))
return result
else:
print("Pandas Dataframe but no output tag set.. falling back to default..")
result = pandas_to_plaintext(anno)
print(result)
result = replace_broken_words(result)
result = str(result).replace("\"", "").replace('[', "").replace(']',
"").lstrip(None)
return result
elif isinstance(anno, NoneType):
return "An error occurred"
else:
print("Nonetype")
result = replace_broken_words(anno) # TODO
return result
def pandas_to_plaintext(anno):
result = ""
for each_row in anno['name']:
if each_row is not None:
for i in str(each_row).split('\n'):
result = result + i + "\n"
return result
'''
Convenience function to replace words like Noster with Nostr
'''

View File

@ -0,0 +1,599 @@
import json
import os
import re
import sys
import urllib.parse
from typing import Any
from urllib.request import urlopen, Request
import requests
import instaloader
from pytube import YouTube
def XitterDownload(source_url, target_location):
script_dir = os.path.dirname(os.path.realpath(__file__))
request_details_file = f"{script_dir}{os.sep}request_details.json"
request_details = json.load(open(request_details_file, "r")) # test
features, variables = request_details["features"], request_details["variables"]
def get_tokens(tweet_url):
html = requests.get(tweet_url)
assert (
html.status_code == 200
), f"Failed to get tweet page. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {html.status_code}. Tweet url: {tweet_url}"
mainjs_url = re.findall(
r"https://abs.twimg.com/responsive-web/client-web-legacy/main.[^\.]+.js",
html.text,
)
assert (
mainjs_url is not None and len(mainjs_url) > 0
), f"Failed to find main.js file. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Tweet url: {tweet_url}"
mainjs_url = mainjs_url[0]
mainjs = requests.get(mainjs_url)
assert (
mainjs.status_code == 200
), f"Failed to get main.js file. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {mainjs.status_code}. Tweet url: {tweet_url}"
bearer_token = re.findall(r'AAAAAAAAA[^"]+', mainjs.text)
assert (
bearer_token is not None and len(bearer_token) > 0
), f"Failed to find bearer token. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Tweet url: {tweet_url}, main.js url: {mainjs_url}"
bearer_token = bearer_token[0]
# get the guest token
with requests.Session() as s:
s.headers.update(
{
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0",
"accept": "*/*",
"accept-language": "de,en-US;q=0.7,en;q=0.3",
"accept-encoding": "gzip, deflate, br",
"te": "trailers",
}
)
s.headers.update({"authorization": f"Bearer {bearer_token}"})
# activate bearer token and get guest token
guest_token = s.post("https://api.twitter.com/1.1/guest/activate.json").json()[
"guest_token"
]
assert (
guest_token is not None
), f"Failed to find guest token. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Tweet url: {tweet_url}, main.js url: {mainjs_url}"
return bearer_token, guest_token
def get_details_url(tweet_id, features, variables):
# create a copy of variables - we don't want to modify the original
variables = {**variables}
variables["tweetId"] = tweet_id
return f"https://twitter.com/i/api/graphql/0hWvDhmW8YQ-S_ib3azIrw/TweetResultByRestId?variables={urllib.parse.quote(json.dumps(variables))}&features={urllib.parse.quote(json.dumps(features))}"
def get_tweet_details(tweet_url, guest_token, bearer_token):
tweet_id = re.findall(r"(?<=status/)\d+", tweet_url)
assert (
tweet_id is not None and len(tweet_id) == 1
), f"Could not parse tweet id from your url. Make sure you are using the correct url. If you are, then file a GitHub issue and copy and paste this message. Tweet url: {tweet_url}"
tweet_id = tweet_id[0]
# the url needs a url encoded version of variables and features as a query string
url = get_details_url(tweet_id, features, variables)
details = requests.get(
url,
headers={
"authorization": f"Bearer {bearer_token}",
"x-guest-token": guest_token,
},
)
max_retries = 10
cur_retry = 0
while details.status_code == 400 and cur_retry < max_retries:
try:
error_json = json.loads(details.text)
except json.JSONDecodeError:
assert (
False
), f"Failed to parse json from details error. details text: {details.text} If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. Tweet url: {tweet_url}"
assert (
"errors" in error_json
), f"Failed to find errors in details error json. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. Tweet url: {tweet_url}"
needed_variable_pattern = re.compile(r"Variable '([^']+)'")
needed_features_pattern = re.compile(
r'The following features cannot be null: ([^"]+)'
)
for error in error_json["errors"]:
needed_vars = needed_variable_pattern.findall(error["message"])
for needed_var in needed_vars:
variables[needed_var] = True
needed_features = needed_features_pattern.findall(error["message"])
for nf in needed_features:
for feature in nf.split(","):
features[feature.strip()] = True
url = get_details_url(tweet_id, features, variables)
details = requests.get(
url,
headers={
"authorization": f"Bearer {bearer_token}",
"x-guest-token": guest_token,
},
)
cur_retry += 1
if details.status_code == 200:
# save new variables
request_details["variables"] = variables
request_details["features"] = features
with open(request_details_file, "w") as f:
json.dump(request_details, f, indent=4)
assert (
details.status_code == 200
), f"Failed to get tweet details. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. Tweet url: {tweet_url}"
return details
def get_tweet_status_id(tweet_url):
sid_patern = r"https://x\.com/[^/]+/status/(\d+)"
if tweet_url[len(tweet_url) - 1] != "/":
tweet_url = tweet_url + "/"
match = re.findall(sid_patern, tweet_url)
if len(match) == 0:
print("error, could not get status id from this tweet url :", tweet_url)
exit()
status_id = match[0]
return status_id
def get_associated_media_id(j, tweet_url):
sid = get_tweet_status_id(tweet_url)
pattern = (
r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/'
+ sid
+ '/[^"]+",\s*"id_str"\s*:\s*"\d+",'
)
matches = re.findall(pattern, j)
if len(matches) > 0:
target = matches[0]
target = target[0: len(target) - 1] # remove the coma at the end
return json.loads("{" + target + "}")["id_str"]
return None
def extract_mp4s(j, tweet_url, target_all_mp4s=False):
# pattern looks like https://video.twimg.com/amplify_video/1638969830442237953/vid/1080x1920/lXSFa54mAVp7KHim.mp4?tag=16 or https://video.twimg.com/ext_tw_video/1451958820348080133/pu/vid/720x1280/GddnMJ7KszCQQFvA.mp4?tag=12
amplitude_pattern = re.compile(
r"(https://video.twimg.com/amplify_video/(\d+)/vid/(\d+x\d+)/[^.]+.mp4\?tag=\d+)"
)
ext_tw_pattern = re.compile(
r"(https://video.twimg.com/ext_tw_video/(\d+)/pu/vid/(avc1/)?(\d+x\d+)/[^.]+.mp4\?tag=\d+)"
)
# format - https://video.twimg.com/tweet_video/Fvh6brqWAAQhU9p.mp4
tweet_video_pattern = re.compile(r'https://video.twimg.com/tweet_video/[^"]+')
# https://video.twimg.com/ext_tw_video/1451958820348080133/pu/pl/b-CiC-gZClIwXgDz.m3u8?tag=12&container=fmp4
container_pattern = re.compile(r'https://video.twimg.com/[^"]*container=fmp4')
media_id = get_associated_media_id(j, tweet_url)
# find all the matches
matches = amplitude_pattern.findall(j)
matches += ext_tw_pattern.findall(j)
container_matches = container_pattern.findall(j)
tweet_video_matches = tweet_video_pattern.findall(j)
if len(matches) == 0 and len(tweet_video_matches) > 0:
return tweet_video_matches
results = {}
for match in matches:
url, tweet_id, _, resolution = match
if tweet_id not in results:
results[tweet_id] = {"resolution": resolution, "url": url}
else:
# if we already have a higher resolution video, then don't overwrite it
my_dims = [int(x) for x in resolution.split("x")]
their_dims = [int(x) for x in results[tweet_id]["resolution"].split("x")]
if my_dims[0] * my_dims[1] > their_dims[0] * their_dims[1]:
results[tweet_id] = {"resolution": resolution, "url": url}
if media_id:
all_urls = []
for twid in results:
all_urls.append(results[twid]["url"])
all_urls += container_matches
url_with_media_id = []
for url in all_urls:
if url.__contains__(media_id):
url_with_media_id.append(url)
if len(url_with_media_id) > 0:
return url_with_media_id
if len(container_matches) > 0 and not target_all_mp4s:
return container_matches
if target_all_mp4s:
urls = [x["url"] for x in results.values()]
urls += container_matches
return urls
return [x["url"] for x in results.values()]
def download_parts(url, output_filename):
resp = requests.get(url, stream=True)
# container begins with / ends with fmp4 and has a resolution in it we want to capture
pattern = re.compile(r"(/[^\n]*/(\d+x\d+)/[^\n]*container=fmp4)")
matches = pattern.findall(resp.text)
max_res = 0
max_res_url = None
for match in matches:
url, resolution = match
width, height = resolution.split("x")
res = int(width) * int(height)
if res > max_res:
max_res = res
max_res_url = url
assert (
max_res_url is not None
), f"Could not find a url to download from. Make sure you are using the correct url. If you are, then file a GitHub issue and copy and paste this message. Tweet url: {url}"
video_part_prefix = "https://video.twimg.com"
resp = requests.get(video_part_prefix + max_res_url, stream=True)
mp4_pattern = re.compile(r"(/[^\n]*\.mp4)")
mp4_parts = mp4_pattern.findall(resp.text)
assert (
len(mp4_parts) == 1
), f"There should be exactly 1 mp4 container at this point. Instead, found {len(mp4_parts)}. Please open a GitHub issue and copy and paste this message into it. Tweet url: {url}"
mp4_url = video_part_prefix + mp4_parts[0]
m4s_part_pattern = re.compile(r"(/[^\n]*\.m4s)")
m4s_parts = m4s_part_pattern.findall(resp.text)
with open(output_filename, "wb") as f:
r = requests.get(mp4_url, stream=True)
for chunk in r.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
f.flush()
for part in m4s_parts:
part_url = video_part_prefix + part
r = requests.get(part_url, stream=True)
for chunk in r.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
f.flush()
return True
def repost_check(j, exclude_replies=True):
try:
# This line extract the index of the first reply
reply_index = j.index('"conversationthread-')
except ValueError:
# If there are no replies we use the enrire response data length
reply_index = len(j)
# We truncate the response data to exclude replies
if exclude_replies:
j = j[0:reply_index]
# We use this regular expression to extract the source status
source_status_pattern = r'"source_status_id_str"\s*:\s*"\d+"'
matches = re.findall(source_status_pattern, j)
if len(matches) > 0 and exclude_replies:
# We extract the source status id (ssid)
ssid = json.loads("{" + matches[0] + "}")["source_status_id_str"]
# We plug it in this regular expression to find expanded_url (the original tweet url)
expanded_url_pattern = (
r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/' + ssid + '[^"]+"'
)
matches2 = re.findall(expanded_url_pattern, j)
if len(matches2) > 0:
# We extract the url and return it
status_url = json.loads("{" + matches2[0] + "}")["expanded_url"]
return status_url
if not exclude_replies:
# If we include replies we'll have to get all ssids and remove duplicates
ssids = []
for match in matches:
ssids.append(json.loads("{" + match + "}")["source_status_id_str"])
# we remove duplicates (this line is messy but it's the easiest way to do it)
ssids = list(set(ssids))
if len(ssids) > 0:
for ssid in ssids:
expanded_url_pattern = (
r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/'
+ ssid
+ '[^"]+"'
)
matches2 = re.findall(expanded_url_pattern, j)
if len(matches2) > 0:
status_urls = []
for match in matches2:
status_urls.append(
json.loads("{" + match + "}")["expanded_url"]
)
# We remove duplicates another time
status_urls = list(set(status_urls))
return status_urls
# If we don't find source_status_id_str, the tweet doesn't feature a reposted video
return None
def download_video_from_x(tweet_url, output_file, target_all_videos=False):
bearer_token, guest_token = get_tokens(tweet_url)
resp = get_tweet_details(tweet_url, guest_token, bearer_token)
mp4s = extract_mp4s(resp.text, tweet_url, target_all_videos)
if target_all_videos:
video_counter = 1
original_urls = repost_check(resp.text, exclude_replies=False)
if len(original_urls) > 0:
for url in original_urls:
download_video_from_x(
url, output_file.replace(".mp4", f"_{video_counter}.mp4")
)
video_counter += 1
if len(mp4s) > 0:
for mp4 in mp4s:
output_file = output_file.replace(".mp4", f"_{video_counter}.mp4")
if "container" in mp4:
download_parts(mp4, output_file)
else:
r = requests.get(mp4, stream=True)
with open(output_file, "wb") as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
f.flush()
video_counter += 1
else:
original_url = repost_check(resp.text)
if original_url:
download_video_from_x(original_url, output_file)
else:
assert (
len(mp4s) > 0
), f"Could not find any mp4s to download. Make sure you are using the correct url. If you are, then file a GitHub issue and copy and paste this message. Tweet url: {tweet_url}"
mp4 = mp4s[0]
if "container" in mp4:
download_parts(mp4, output_file)
else:
# use a stream to download the file
r = requests.get(mp4, stream=True)
with open(output_file, "wb") as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
f.flush()
return target_location
return download_video_from_x(source_url, target_location)
# TIKTOK/INSTA
def getDict() -> dict:
response = requests.get('https://ttdownloader.com/')
point = response.text.find('<input type="hidden" id="token" name="token" value="') + \
len('<input type="hidden" id="token" name="token" value="')
token = response.text[point:point + 64]
TTDict = {
'token': token,
}
for i in response.cookies:
TTDict[str(i).split()[1].split('=')[0].strip()] = str(
i).split()[1].split('=')[1].strip()
return TTDict
def createHeader(parseDict) -> tuple[dict[str, Any], dict[str | Any, str | Any], dict[str, str | Any]]:
cookies = {
'PHPSESSID': parseDict['PHPSESSID'],
# 'popCookie': parseDict['popCookie'],
}
headers = {
'authority': 'ttdownloader.com',
'accept': '*/*',
'accept-language': 'en-US,en;q=0.9',
'content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
'origin': 'https://ttdownloader.com',
'referer': 'https://ttdownloader.com/',
'sec-ch-ua': '"Not?A_Brand";v="8", "Chromium";v="108", "Google Chrome";v="108"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/108.0.0.0 Safari/537.36',
'x-requested-with': 'XMLHttpRequest',
}
data = {
'url': '',
'format': '',
'token': parseDict['token'],
}
return cookies, headers, data
def TikTokDownload(cookies, headers, data, name, path) -> str:
response = requests.post('https://ttdownloader.com/search/',
cookies=cookies, headers=headers, data=data)
parsed_link = [i for i in str(response.text).split()
if i.startswith("href=")][0]
response = requests.get(parsed_link[6:-10])
with open(path + "tiktok" + name + ".mp4", "wb") as f:
f.write(response.content)
return path + "tiktok" + name + ".mp4"
def TiktokDownloadAll(linkList, path) -> str:
parseDict = getDict()
cookies, headers, data = createHeader(parseDict)
# linkList = getLinkDict()['tiktok']
for i in linkList:
try:
data['url'] = i
result = TikTokDownload(cookies, headers, data, str(linkList.index(i)), path)
return result
except IndexError:
parseDict = getDict()
cookies, headers, data = createHeader(parseDict)
except Exception as err:
print(err)
exit(1)
def InstagramDownload(url, name, path) -> str:
obj = instaloader.Instaloader()
post = instaloader.Post.from_shortcode(obj.context, url.split("/")[-2])
photo_url = post.url
video_url = post.video_url
print(video_url)
if video_url:
response = requests.get(video_url)
with open(path + "insta" + name + ".mp4", "wb") as f:
f.write(response.content)
return path + "insta" + name + ".mp4"
elif photo_url:
response = requests.get(photo_url)
with open(path + "insta" + name + ".jpg", "wb") as f:
f.write(response.content)
return path + "insta" + name + ".jpg"
def InstagramDownloadAll(linklist, path) -> str:
for i in linklist:
try:
print(str(linklist.index(i)))
print(str(linklist[i]))
result = InstagramDownload(i, str(linklist.index(i)), path)
return result
except Exception as err:
print(err)
exit(1)
# YOUTUBE
def YouTubeDownload(link, path, audio_only=True):
youtubeObject = YouTube(link)
if audio_only:
youtubeObject = youtubeObject.streams.get_audio_only()
youtubeObject.download(path, "yt.mp3")
print("Download is completed successfully")
return path + "yt.mp3"
else:
youtubeObject = youtubeObject.streams.get_highest_resolution()
youtubeObject.download(path, "yt.mp4")
print("Download is completed successfully")
return path + "yt.mp4"
def checkYoutubeLinkValid(link):
try:
# TODO find a way to test without fully downloading the file
youtubeObject = YouTube(link)
youtubeObject = youtubeObject.streams.get_audio_only()
youtubeObject.download(".", "yt.mp3")
os.remove("yt.mp3")
return True
except Exception as e:
print(str(e))
return False
# OVERCAST
def OvercastDownload(source_url, target_location):
def get_title(html_str):
"""Get the title from the meta tags"""
title = re.findall(r"<meta name=\"og:title\" content=\"(.+)\"", html_str)
if len(title) == 1:
return title[0].replace("&mdash;", "-")
return None
def get_description(html_str):
"""Get the description from the Meta tag"""
desc_re = r"<meta name=\"og:description\" content=\"(.+)\""
description = re.findall(desc_re, html_str)
if len(description) == 1:
return description[0]
return None
def get_url(html_string):
"""Find the URL from the <audio><source>.... tag"""
url = re.findall(r"<source src=\"(.+?)\"", html_string)
if len(url) == 1:
# strip off the last 4 characters to cater for the #t=0 in the URL
# which urlretrieve flags as invalid
return url[0][:-4]
return None
"""Given a Overcast source URL fetch the file it points to"""
headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) "
"AppleWebKit/537.11 (KHTML, like Gecko) "
"Chrome/23.0.1271.64 Safari/537.11",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Charset": "ISO-8859-1,utf-8;q=0.7,*;q=0.3",
"Accept-Encoding": "none",
"Accept-Language": "en-US,en;q=0.8",
"Connection": "keep-alive",
}
req = Request(source_url, None, headers)
source_data = urlopen(req).read().decode('utf-8')
title = get_title(source_data)
url = get_url(source_data)
if url is None or title is None:
sys.exit("Could not find parse URL")
if not os.path.exists(target_location):
req = requests.get(url)
file = open(target_location, 'wb')
for chunk in req.iter_content(100000):
file.write(chunk)
file.close()

View File

@ -0,0 +1,40 @@
{
"features": {
"responsive_web_graphql_exclude_directive_enabled": true,
"verified_phone_label_enabled": false,
"responsive_web_graphql_timeline_navigation_enabled": true,
"responsive_web_graphql_skip_user_profile_image_extensions_enabled": false,
"tweetypie_unmention_optimization_enabled": true,
"vibe_api_enabled": false,
"responsive_web_edit_tweet_api_enabled": false,
"graphql_is_translatable_rweb_tweet_is_translatable_enabled": false,
"view_counts_everywhere_api_enabled": true,
"longform_notetweets_consumption_enabled": true,
"tweet_awards_web_tipping_enabled": false,
"freedom_of_speech_not_reach_fetch_enabled": false,
"standardized_nudges_misinfo": false,
"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": false,
"interactive_text_enabled": false,
"responsive_web_twitter_blue_verified_badge_is_enabled": true,
"responsive_web_text_conversations_enabled": false,
"longform_notetweets_richtext_consumption_enabled": false,
"responsive_web_enhance_cards_enabled": false,
"longform_notetweets_inline_media_enabled": true,
"longform_notetweets_rich_text_read_enabled": true,
"responsive_web_media_download_video_enabled": true,
"responsive_web_twitter_article_tweet_consumption_enabled": true,
"creator_subscriptions_tweet_preview_api_enabled": true
},
"variables": {
"with_rux_injections": false,
"includePromotedContent": true,
"withCommunity": true,
"withQuickPromoteEligibilityTweetFields": true,
"withBirdwatchNotes": true,
"withDownvotePerspective": false,
"withReactionsMetadata": false,
"withReactionsPerspective": false,
"withVoice": true,
"withV2Timeline": true
}
}

View File

@ -1,4 +1,4 @@
# LIGHTNING FUNCTIONS
# LIGHTNING/CASHU/ZAP FUNCTIONS
import base64
import json
import os
@ -11,7 +11,7 @@ from nostr_sdk import nostr_sdk, PublicKey, SecretKey, Event, EventBuilder, Tag,
from utils.database_utils import get_or_add_user
from utils.dvmconfig import DVMConfig
from utils.nostr_utils import get_event_by_id, check_and_decrypt_tags, check_and_decrypt_own_tags
from utils.nostr_utils import get_event_by_id, check_and_decrypt_own_tags
import lnurl
from hashlib import sha256
@ -91,7 +91,11 @@ def create_bolt11_ln_bits(sats: int, config: DVMConfig) -> (str, str):
try:
res = requests.post(url, json=data, headers=headers)
obj = json.loads(res.text)
return obj["payment_request"], obj["payment_hash"]
if obj.get("payment_request") and obj.get("payment_hash"):
return obj["payment_request"], obj["payment_hash"]#
else:
print(res.text)
return None, None
except Exception as e:
print("LNBITS: " + str(e))
return None, None
@ -121,7 +125,10 @@ def check_bolt11_ln_bits_is_paid(payment_hash: str, config: DVMConfig):
try:
res = requests.get(url, headers=headers)
obj = json.loads(res.text)
return obj["paid"]
if obj.get("paid"):
return obj["paid"]
else:
return False
except Exception as e:
return None
@ -133,7 +140,10 @@ def pay_bolt11_ln_bits(bolt11: str, config: DVMConfig):
try:
res = requests.post(url, json=data, headers=headers)
obj = json.loads(res.text)
return obj["payment_hash"]
if obj.get("payment_hash"):
return obj["payment_hash"]
else:
return "Error"
except Exception as e:
print("LNBITS: " + str(e))
return None, None