NostrDVM: Adding Scheduler to interface

allows scheduling a task in the background (e.g. updating database, check for a time-based thing
This commit is contained in:
Believethehype 2024-03-04 15:42:37 +01:00
parent 1fd708e956
commit b28773291c
6 changed files with 72 additions and 54 deletions

38
main.py
View File

@ -8,7 +8,7 @@ from nostr_dvm.tasks import videogeneration_replicate_svd, imagegeneration_repli
trending_notes_nostrband, discovery_inactive_follows, translation_google, textextraction_pdf, \
translation_libretranslate, textextraction_google, convert_media, imagegeneration_openai_dalle, texttospeech, \
imagegeneration_sd21_mlx, advanced_search, textgeneration_huggingchat, summarization_huggingchat, \
discovery_nonfollowers
discovery_nonfollowers, search_users
from nostr_dvm.utils.admin_utils import AdminConfig
from nostr_dvm.utils.backend_utils import keep_alive
from nostr_dvm.utils.definitions import EventDefinitions
@ -31,7 +31,6 @@ def playground():
bot_config.LNBITS_ADMIN_KEY = admin_key # The dvm might pay failed jobs back
bot_config.LNBITS_URL = os.getenv("LNBITS_HOST")
# Generate an optional Admin Config, in this case, whenever we give our DVMs this config, they will (re)broadcast
# their NIP89 announcement
# You can create individual admins configs and hand them over when initializing the dvm,
@ -46,7 +45,6 @@ def playground():
# Update the DVMs (not the bot) profile. For example after you updated the NIP89 or the lnaddress, you can automatically update profiles here.
admin_config.UPDATE_PROFILE = False
# Spawn some DVMs in the playground and run them
# You can add arbitrary DVMs there and instantiate them here
@ -63,11 +61,11 @@ def playground():
# Spawn DVM3 Kind 5002 Local Text TranslationLibre, calling the free LibreTranslateApi, as an alternative.
# This will only run and appear on the bot if an endpoint is set in the .env
if os.getenv("LIBRE_TRANSLATE_ENDPOINT") is not None and os.getenv("LIBRE_TRANSLATE_ENDPOINT") != "":
libre_translator = translation_libretranslate.build_example("Libre Translator", "libre_translator", admin_config)
libre_translator = translation_libretranslate.build_example("Libre Translator", "libre_translator",
admin_config)
bot_config.SUPPORTED_DVMS.append(libre_translator) # We add translator to the bot
libre_translator.run()
# Spawn DVM4, this one requires an OPENAI API Key and balance with OpenAI, you will move the task to them and pay
# per call. Make sure you have enough balance and the DVM's cost is set higher than what you pay yourself, except, you know,
# you're being generous.
@ -77,17 +75,18 @@ def playground():
dalle.run()
if os.getenv("REPLICATE_API_TOKEN") is not None and os.getenv("REPLICATE_API_TOKEN") != "":
sdxlreplicate = imagegeneration_replicate_sdxl.build_example("Stable Diffusion XL", "replicate_sdxl", admin_config)
sdxlreplicate = imagegeneration_replicate_sdxl.build_example("Stable Diffusion XL", "replicate_sdxl",
admin_config)
bot_config.SUPPORTED_DVMS.append(sdxlreplicate)
sdxlreplicate.run()
if os.getenv("REPLICATE_API_TOKEN") is not None and os.getenv("REPLICATE_API_TOKEN") != "":
svdreplicate = videogeneration_replicate_svd.build_example("Stable Video Diffusion", "replicate_svd", admin_config)
svdreplicate = videogeneration_replicate_svd.build_example("Stable Video Diffusion", "replicate_svd",
admin_config)
bot_config.SUPPORTED_DVMS.append(svdreplicate)
svdreplicate.run()
#Let's define a function so we can add external DVMs to our bot, we will instanciate it afterwards
# Let's define a function so we can add external DVMs to our bot, we will instanciate it afterwards
# Spawn DVM5.. oh wait, actually we don't spawn a new DVM, we use the dvmtaskinterface to define an external dvm by providing some info about it, such as
# their pubkey, a name, task, kind etc. (unencrypted)
@ -98,7 +97,6 @@ def playground():
bot_config.SUPPORTED_DVMS.append(tasktiger_external)
# Don't run it, it's on someone else's machine, and we simply make the bot aware of it.
# DVM: 6 Another external dvm for recommendations:
ymhm_external = build_external_dvm(pubkey="6b37d5dc88c1cbd32d75b713f6d4c2f7766276f51c9337af9d32c8d715cc1b93",
task="content-discovery",
@ -108,21 +106,17 @@ def playground():
# If we get back a list of people or events, we can post-process it to make it readable in social clients
bot_config.SUPPORTED_DVMS.append(ymhm_external)
# Spawn DVM 7 Find inactive followers
googleextractor = textextraction_google.build_example("Extractor", "speech_recognition",
admin_config)
bot_config.SUPPORTED_DVMS.append(googleextractor)
googleextractor.run()
# Spawn DVM 8 A Media Grabber/Converter
media_bringer = convert_media.build_example("Media Bringer", "media_converter", admin_config)
bot_config.SUPPORTED_DVMS.append(media_bringer)
media_bringer.run()
# Spawn DVM9 Find inactive followers
discover_inactive = discovery_inactive_follows.build_example("Bygones", "discovery_inactive_follows",
admin_config)
@ -134,7 +128,8 @@ def playground():
bot_config.SUPPORTED_DVMS.append(discover_nonfollowers)
discover_nonfollowers.run()
trending = trending_notes_nostrband.build_example("Trending Notes on nostr.band", "trending_notes_nostrband", admin_config)
trending = trending_notes_nostrband.build_example("Trending Notes on nostr.band", "trending_notes_nostrband",
admin_config)
bot_config.SUPPORTED_DVMS.append(trending)
trending.run()
@ -150,8 +145,12 @@ def playground():
bot_config.SUPPORTED_DVMS.append(search)
search.run()
profile_search = search_users.build_example("Profile Searcher", "profile_search", admin_config)
bot_config.SUPPORTED_DVMS.append(profile_search)
profile_search.run()
inactive = discovery_inactive_follows.build_example("Inactive People you follow", "discovery_inactive_follows", admin_config)
inactive = discovery_inactive_follows.build_example("Inactive People you follow", "discovery_inactive_follows",
admin_config)
bot_config.SUPPORTED_DVMS.append(inactive)
inactive.run()
@ -162,16 +161,15 @@ def playground():
mlx.run()
if os.getenv("HUGGINGFACE_EMAIL") is not None and os.getenv("HUGGINGFACE_EMAIL") != "":
hugginchat = textgeneration_huggingchat.build_example("Huggingchat", "huggingchat",admin_config)
hugginchat = textgeneration_huggingchat.build_example("Huggingchat", "huggingchat", admin_config)
bot_config.SUPPORTED_DVMS.append(hugginchat)
hugginchat.run()
hugginchatsum = summarization_huggingchat.build_example("Huggingchat Summarizer", "huggingchatsum", admin_config)
hugginchatsum = summarization_huggingchat.build_example("Huggingchat Summarizer", "huggingchatsum",
admin_config)
bot_config.SUPPORTED_DVMS.append(hugginchatsum)
hugginchatsum.run()
# Run the bot
Bot(bot_config)
# Keep the main function alive for libraries that require it, like openai

View File

@ -22,8 +22,6 @@ from nostr_dvm.utils.zap_utils import check_bolt11_ln_bits_is_paid, create_bolt1
from nostr_dvm.utils.cashu_utils import redeem_cashu
class DVM:
dvm_config: DVMConfig
admin_config: AdminConfig
@ -42,8 +40,7 @@ class DVM:
.skip_disconnected_relays(skip_disconnected_relays))
signer = NostrSigner.keys(self.keys)
self.client = Client.with_opts(signer,opts)
self.client = Client.with_opts(signer, opts)
self.job_list = []
self.jobs_on_hold_list = []
@ -419,15 +416,15 @@ class DVM:
if status == "payment-required" or (status == "processing" and not is_paid):
if dvm_config.LNBITS_INVOICE_KEY != "":
try:
bolt11, payment_hash = create_bolt11_ln_bits(amount,dvm_config)
bolt11, payment_hash = create_bolt11_ln_bits(amount, dvm_config)
except Exception as e:
print(e)
try:
bolt11, payment_hash = create_bolt11_lud16(dvm_config.LN_ADDRESS,
amount)
amount)
except Exception as e:
print(e)
bolt11 = None
print(e)
bolt11 = None
elif dvm_config.LN_ADDRESS != "":
try:
bolt11, payment_hash = create_bolt11_lud16(dvm_config.LN_ADDRESS, amount)
@ -529,7 +526,8 @@ class DVM:
user = get_or_add_user(self.dvm_config.DB, job_event.author().to_hex(),
client=self.client, config=self.dvm_config)
print(user.lud16 + " " + str(amount))
bolt11 = zaprequest(user.lud16, amount, "Couldn't finish job, returning sats", job_event, user.npub,
bolt11 = zaprequest(user.lud16, amount, "Couldn't finish job, returning sats", job_event,
user.npub,
self.keys, self.dvm_config.RELAY_LIST, zaptype="private")
if bolt11 is None:
print("Receiver has no Lightning address, can't zap back.")
@ -543,6 +541,11 @@ class DVM:
self.client.handle_notifications(NotificationHandler())
while True:
for dvm in self.dvm_config.SUPPORTED_DVMS:
scheduled_result = dvm.schedule(self.dvm_config)
for job in self.job_list:
if job.bolt11 != "" and job.payment_hash != "" and not job.payment_hash is None and not job.is_paid:
ispaid = check_bolt11_ln_bits_is_paid(job.payment_hash, self.dvm_config)

View File

@ -86,6 +86,10 @@ class DVMTaskInterface:
nostr_dvm_thread = Thread(target=self.DVM, args=[self.dvm_config, self.admin_config])
nostr_dvm_thread.start()
def schedule(self, dvm_config):
"""schedule something, e.g. define some time to update or to post, does nothing by default"""
pass
def NIP89_announcement(self, nip89config: NIP89Config):
nip89 = NIP89Config()
nip89.NAME = self.NAME

View File

@ -24,10 +24,12 @@ class SearchUser(DVMTaskInterface):
TASK: str = "search-user"
FIX_COST: float = 0
dvm_config: DVMConfig
last_schedule: int
def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
admin_config: AdminConfig = None, options=None):
dvm_config.SCRIPT = os.path.abspath(__file__)
self.last_schedule = Timestamp.now().as_secs()
use_logger = False
if use_logger:
@ -35,24 +37,7 @@ class SearchUser(DVMTaskInterface):
super().__init__(name, dvm_config, nip89config, admin_config, options)
opts = (Options().wait_for_send(False).send_timeout(timedelta(seconds=self.dvm_config.RELAY_TIMEOUT)))
sk = SecretKey.from_hex(self.dvm_config.PRIVATE_KEY)
keys = Keys.parse(sk.to_hex())
signer = NostrSigner.keys(keys)
database = NostrDatabase.sqlite("db/nostr_profiles.db")
cli = ClientBuilder().signer(signer).database(database).opts(opts).build()
cli.add_relay("wss://relay.damus.io")
#cli.add_relay("wss://atl.purplerelay.com")
cli.connect()
filter1 = Filter().kind(0)
# filter = Filter().author(keys.public_key())
print("Syncing Profile Database.. this might take a while..")
dbopts = NegentropyOptions().direction(NegentropyDirection.DOWN)
cli.reconcile(filter1, dbopts)
print("Done Syncing Profile Database.")
self.sync_db()
def is_input_supported(self, tags, client=None, dvm_config=None):
for tag in tags:
@ -103,21 +88,19 @@ class SearchUser(DVMTaskInterface):
cli = ClientBuilder().database(database).signer(signer).opts(opts).build()
cli.add_relay("wss://relay.damus.io")
#cli.add_relay("wss://atl.purplerelay.com")
# cli.add_relay("wss://atl.purplerelay.com")
cli.connect()
# Negentropy reconciliation
# Query events from database
filter1 = Filter().kind(0)
events = cli.database().query([filter1])
#for event in events:
# for event in events:
# print(event.as_json())
#events = cli.get_events_of([notes_filter], timedelta(seconds=5))
# events = cli.get_events_of([notes_filter], timedelta(seconds=5))
result_list = []
print("Events: " + str(len(events)))
@ -133,7 +116,7 @@ class SearchUser(DVMTaskInterface):
result_list.append(p_tag.as_vec())
index += 1
except Exception as exp:
print(str(exp) + " " + event.author().to_hex())
print(str(exp) + " " + event.author().to_hex())
else:
break
@ -150,6 +133,34 @@ class SearchUser(DVMTaskInterface):
# if not text/plain, don't post-process
return result
def schedule(self, dvm_config):
if dvm_config.SCHEDULE_UPDATES_SECONDS == 0:
return 0
else:
if Timestamp.now().as_secs() >= self.last_schedule + dvm_config.SCHEDULE_UPDATES_SECONDS:
self.sync_db()
self.last_schedule = Timestamp.now().as_secs()
return 1
def sync_db(self):
opts = (Options().wait_for_send(False).send_timeout(timedelta(seconds=self.dvm_config.RELAY_TIMEOUT)))
sk = SecretKey.from_hex(self.dvm_config.PRIVATE_KEY)
keys = Keys.parse(sk.to_hex())
signer = NostrSigner.keys(keys)
database = NostrDatabase.sqlite("db/nostr_profiles.db")
cli = ClientBuilder().signer(signer).database(database).opts(opts).build()
cli.add_relay("wss://relay.damus.io")
cli.connect()
filter1 = Filter().kind(0)
# filter = Filter().author(keys.public_key())
print("Syncing Profile Database.. this might take a while..")
dbopts = NegentropyOptions().direction(NegentropyDirection.DOWN)
cli.reconcile(filter1, dbopts)
print("Done Syncing Profile Database.")
# We build an example here that we can call by either calling this file directly from the main directory,
# or by adding it to our playground. You can call the example and adjust it to your needs or redefine it in the
@ -158,6 +169,7 @@ def build_example(name, identifier, admin_config):
dvm_config = build_default_config(identifier)
dvm_config.USE_OWN_VENV = False
dvm_config.SHOWLOG = True
dvm_config.SCHEDULE_UPDATES_SECONDS = 600 # Every 10 seconds
# Add NIP89
nip89info = {
"name": name,

View File

@ -35,6 +35,7 @@ class DVMConfig:
NIP89: NIP89Config
SEND_FEEDBACK_EVENTS = True
SHOW_RESULT_BEFORE_PAYMENT: bool = False # if this is true show results even when not paid right after autoprocess
SCHEDULE_UPDATES_SECONDS = 0
def build_default_config(identifier):

View File

@ -1,6 +1,6 @@
from setuptools import setup, find_packages
VERSION = '0.2.5'
VERSION = '0.2.6'
DESCRIPTION = 'A framework to build and run Nostr NIP90 Data Vending Machines'
LONG_DESCRIPTION = ('A framework to build and run Nostr NIP90 Data Vending Machines. '
'This is an early stage release. Interfaces might change/brick')