diff --git a/scraper/main.py b/scraper/main.py index 811cdd8..ac8ee66 100644 --- a/scraper/main.py +++ b/scraper/main.py @@ -3,18 +3,18 @@ import os import websockets import json from datetime import datetime +from uuid import uuid4 -async def relay_websockets(input_websocket, output_websocket, kinds): +async def relay_websockets(input_websocket, output_websocket, kinds, sub_id): while True: try: # Wait for an event on input websocket event = json.loads(await input_websocket.recv()) try: if(event[0] == "EVENT"): - # TODO: Broadcast to output websocket print("Got event: ", event) - # Output websocket broadcast will be implemented here - + # Forward the event to output websocket + await output_websocket.send(json.dumps(["EVENT", sub_id, event[2]])) elif(event[0] == "EOSE"): print("End of stream") @@ -31,11 +31,12 @@ async def relay_websockets(input_websocket, output_websocket, kinds): print("Connection closed, attempting to reconnect...") await asyncio.sleep(1) try: + new_sub_id = str(uuid4()) async with websockets.connect(os.environ.get("INPUT_RELAY")) as new_input_websocket, \ websockets.connect(os.environ.get("OUTPUT_RELAY")) as new_output_websocket: - message = '["REQ", "1337", {"kinds": '+kinds+', "limit": 10}]' + message = f'["REQ", "{new_sub_id}", {{"kinds": {kinds}, "limit": 10}}]' await new_input_websocket.send(message) - await relay_websockets(new_input_websocket, new_output_websocket, kinds) + await relay_websockets(new_input_websocket, new_output_websocket, kinds, new_sub_id) except Exception as error: # If the reconnection attempt fails, repeat the loop and try again @@ -56,11 +57,12 @@ async def main(): raise ValueError("Please set the OUTPUT_RELAY environment variable") try: + sub_id = str(uuid4()) async with websockets.connect(input_url) as input_websocket, \ websockets.connect(output_url) as output_websocket: - message = '["REQ", "1337", {"kinds": '+kinds+'}]' + message = f'["REQ", "{sub_id}", {{"kinds": {kinds}}}]' await input_websocket.send(message) - await relay_websockets(input_websocket, output_websocket, kinds) + await relay_websockets(input_websocket, output_websocket, kinds, sub_id) except Exception as error: # If the initial connection attempt fails, attempt to reconnect immediately