dev2main (#8)

* add docker compose

* application refactor

* add checking latest posted message before posting again the same block height

---------

Co-authored-by: mroxso <24775431+mroxso@users.noreply.github.com>
This commit is contained in:
mroxso 2025-01-09 11:37:24 +01:00 committed by GitHub
parent 8ed939a880
commit 3ad2d6629f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 83 additions and 54 deletions

View File

@ -1,8 +1,7 @@
version: '3'
services:
timechain:
# image: ghcr.io/mroxso/timechain-nostr-bot:latest
timechain-nostr-bot:
image: ghcr.io/mroxso/timechain-nostr-bot:latest
build: .
environment:
- "RELAYS=wss://relay.example.com"
- "PRIVATE_KEY=YOUR_HEX_PRIVATE_KEY"
- PRIVATE_KEY=YOUR_PRIVATE_KEY
restart: always

128
main.py
View File

@ -2,71 +2,101 @@ import requests
import time
import ssl
import os
from nostr.event import Event
import uuid
import json
from nostr.event import Event, EventKind
from nostr.relay_manager import RelayManager
from nostr.key import PrivateKey
from nostr.filter import Filter, Filters
def connect(relays):
def get_relays():
env_relays = os.getenv('RELAYS')
if env_relays is None:
env_relays = "wss://relay.nostr.band"
return env_relays.split(",")
def setup_relay_manager(relays):
relay_manager = RelayManager()
for relay in relays:
print("Adding relay: " + relay)
relay_manager.add_relay(relay)
relay_manager.open_connections({"cert_reqs": ssl.CERT_NONE}) # NOTE: This disables ssl certificate verification
time.sleep(1.25) # allow the connections to open
print("Connected")
return relay_manager
def disconnect(relay_manager):
print("Disconnecting...")
relay_manager.close_connections()
print("Disconnected")
try:
# Read env variable and add relays
env_relays = os.getenv('RELAYS') # None
if env_relays is None:
env_relays = "wss://relay.nostr.band"
relay_manager = connect(env_relays.split(","))
def get_private_key():
env_private_key = os.environ.get("PRIVATE_KEY")
if not env_private_key:
print('The environment variable "PRIVATE_KEY" is not set.')
exit(1)
return PrivateKey(bytes.fromhex(env_private_key))
private_key = PrivateKey(bytes.fromhex(env_private_key))
public_key = private_key.public_key.hex()
def get_public_key():
private_key = get_private_key()
return private_key.public_key.hex()
old_block_height = 0
while True:
failCount = 0
try:
url = "https://blockstream.info/api/blocks/tip/height"
block_height = requests.get(url).json()
if(block_height > old_block_height):
message = "⚡️ " + str(block_height) + " ⚡️"
event = Event(
content=str(message),
public_key=private_key.public_key.hex()
)
private_key.sign_event(event)
relay_manager.publish_event(event)
# relay_manager.close_connections() # NEEDED?!
print(message)
old_block_height = block_height
time.sleep(5)
except Exception as e:
print(e)
failCount += 1
disconnect(relay_manager)
relay_manager = connect(env_relays.split(","))
if failCount > 5:
print("Too many fails, exiting...")
exit(1)
def fetch_latest_block_height():
url = "https://blockchain.info/latestblock"
response = requests.get(url)
data = response.json()
return data["height"]
def fetch_latest_bot_message(relay_manager):
public_key = get_public_key()
# print("Public Key: " + public_key)
filters = Filters([Filter(authors=[public_key], kinds=[EventKind.TEXT_NOTE], limit=1)])
# generate a random subscription id
subscription_id = str(uuid.uuid4())
request = [ClientMessageType.REQUEST, subscription_id]
request.extend(filters.to_json_array())
relay_manager.add_subscription(subscription_id, filters)
message = json.dumps(request)
relay_manager.publish_message(message)
while(not relay_manager.message_pool.has_events()):
print("Waiting for receiving latest bot message...", end="")
time.sleep(1)
print("done.")
message = relay_manager.message_pool.get_event().event.content
# print("Received latest bot message")
return message
def fetch_latest_bot_message_height(relay_manager):
message = fetch_latest_bot_message(relay_manager)
return int(message.split(" ")[1])
def main():
try:
relays = get_relays()
relay_manager = setup_relay_manager(relays)
private_key = get_private_key()
old_block_height = fetch_latest_bot_message_height(relay_manager=relay_manager)
print("Old Block Height: " + str(old_block_height))
while True:
try:
block_height = fetch_latest_block_height()
if block_height > old_block_height:
message = "⚡️ " + str(block_height) + " ⚡️"
print(message)
event = Event(
content=str(message),
public_key=private_key.public_key.hex()
)
private_key.sign_event(event)
relay_manager.publish_event(event)
old_block_height = block_height
except Exception as e:
print(e)
time.sleep(10)
time.sleep(5)
except Exception as e:
print(e)
relay_manager.close_connections()
exit(1)
except Exception as e:
print(e)
disconnect(relay_manager)
exit(1)
if __name__ == "__main__":
main()