import json
import logging
import os
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

import gevent
from locust import User, between, events, task

try:
    import websocket  # websocket-client
except Exception as exc:  # pragma: no cover
    websocket = None
    _websocket_import_error = exc
else:
    _websocket_import_error = None

logger = logging.getLogger(__name__)


def _default_filter() -> Dict[str, Any]:
    """Return a fresh copy of the default subscription filter."""
    # Default: keep it lightweight and likely to succeed on most relays
    return {"kinds": [1], "limit": 1}


@dataclass(frozen=True)
class NostrConfig:
    """Immutable settings for one simulated relay user.

    ``filter_obj`` is sent as the filter element of every REQ message. It
    defaults to a fresh copy of the lightweight default filter, so a config
    built without an explicit filter still serializes to a JSON object.
    """

    relay_url: str
    connect_timeout_s: float = 10.0
    recv_timeout_s: float = 10.0
    req_timeout_s: float = 10.0
    filter_obj: Dict[str, Any] = field(default_factory=_default_filter)


def _now_ms() -> float:
    """Return a ``time.perf_counter`` reading converted to milliseconds."""
    return time.perf_counter() * 1000.0


def _fire_ws(
    name: str,
    start_ms: float,
    response_length: int = 0,
    exc: Optional[BaseException] = None,
) -> None:
    """Fire a Locust request event of type "WS".

    The response time is the time elapsed between ``start_ms`` (a value
    obtained from ``_now_ms``) and the moment this function is called.
    """
    events.request.fire(
        request_type="WS",
        name=name,
        response_time=_now_ms() - start_ms,
        response_length=response_length,
        exception=exc,
    )


def _load_filter_from_env() -> Dict[str, Any]:
    """Return the filter from ``NOSTR_FILTER_JSON``, or the default filter.

    The default is used when the variable is unset or empty, is not valid
    JSON, or does not decode to a JSON object. The last two cases are logged
    so that a typo in the variable does not silently change what is tested.
    """
    raw = os.getenv("NOSTR_FILTER_JSON")
    if raw:
        try:
            parsed = json.loads(raw)
        except ValueError:  # includes json.JSONDecodeError
            logger.warning(
                "NOSTR_FILTER_JSON is not valid JSON; using the default filter"
            )
        else:
            if isinstance(parsed, dict):
                return parsed
            logger.warning(
                "NOSTR_FILTER_JSON is not a JSON object; using the default filter"
            )
    return _default_filter()


def _load_config() -> NostrConfig:
    """Build a ``NostrConfig`` from environment variables.

    The relay URL is taken from ``NOSTR_RELAY_URL``, then ``RELAY_URL``, then
    a built-in default. Each timeout variable defaults to "10".

    Raises:
        ValueError: if a timeout variable cannot be converted to ``float``.
    """
    relay_url = (
        os.getenv("NOSTR_RELAY_URL")
        or os.getenv("RELAY_URL")
        or "wss://relay.damus.io"
    )
    return NostrConfig(
        relay_url=relay_url,
        connect_timeout_s=float(os.getenv("NOSTR_CONNECT_TIMEOUT", "10")),
        recv_timeout_s=float(os.getenv("NOSTR_RECV_TIMEOUT", "10")),
        req_timeout_s=float(os.getenv("NOSTR_REQ_TIMEOUT", "10")),
        filter_obj=_load_filter_from_env(),
    )


class NostrRelayUser(User):
    """Locust user that load tests a Nostr relay via raw WebSockets.

    Each task:
    - opens/ensures a WS connection
    - sends a Nostr REQ with a unique subscription id
    - reads until EOSE (or timeout)
    - sends CLOSE for the subscription
    """

    wait_time = between(0.5, 2.0)

    def __init__(self, environment):
        super().__init__(environment)
        self._cfg = _load_config()
        self._ws = None

    def on_start(self) -> None:
        """Verify websocket-client was imported, then open the connection."""
        if _websocket_import_error is not None:
            raise RuntimeError(
                "websocket-client is not available. Install dependencies via requirements.txt"
            ) from _websocket_import_error
        self._connect()

    def on_stop(self) -> None:
        """Close the connection, if one is open."""
        self._close_ws()

    def _connect(self) -> None:
        """Open the WebSocket if it is not already open.

        Fires a "connect" event for every attempt. On failure the exception
        is re-raised after being reported.
        """
        if self._ws is not None:
            return
        start = _now_ms()
        try:
            # websocket-client returns a WebSocket object
            ws = websocket.create_connection(
                self._cfg.relay_url,
                timeout=self._cfg.connect_timeout_s,
                enable_multithread=True,
            )
            ws.settimeout(self._cfg.recv_timeout_s)
            self._ws = ws
            _fire_ws("connect", start)
        except Exception as exc:
            _fire_ws("connect", start, exc=exc)
            self._ws = None
            raise

    def _close_ws(self) -> None:
        """Close the WebSocket and forget it; never raises ``Exception``.

        Fires a "close" event, marked as failed if closing raised.
        """
        if self._ws is None:
            return
        start = _now_ms()
        try:
            self._ws.close()
            _fire_ws("close", start)
        except Exception as exc:
            _fire_ws("close", start, exc=exc)
        finally:
            self._ws = None

    def _send_json(self, name: str, msg: Any) -> None:
        """Serialize ``msg`` as compact JSON and send it.

        Fires an event called ``name`` whose length is the payload length.
        On any failure the event is reported as failed, the socket is
        closed, and the exception is re-raised.
        """
        payload = json.dumps(msg, separators=(",", ":"))
        start = _now_ms()
        try:
            # Explicit check rather than ``assert``, which is stripped by -O.
            if self._ws is None:
                raise ConnectionError("WebSocket is not connected")
            self._ws.send(payload)
            _fire_ws(name, start, response_length=len(payload))
        except Exception as exc:
            _fire_ws(name, start, exc=exc)
            self._close_ws()
            raise

    def _recv_text(self) -> str:
        """Receive one message and return it as text.

        Bytes are decoded as UTF-8 with replacement of invalid sequences.
        Fires a "recv" event per message. On any receive failure the event
        is reported as failed, the socket is closed, and the exception is
        re-raised.

        Raises:
            ConnectionError: if there is no open socket (nothing is reported
                in that case because no receive was attempted).
        """
        # Explicit check rather than ``assert``, which is stripped by -O.
        if self._ws is None:
            raise ConnectionError("WebSocket is not connected")
        start = _now_ms()
        try:
            data = self._ws.recv()
            if data is None:
                raise TimeoutError("recv returned None")
            if isinstance(data, bytes):
                text = data.decode("utf-8", errors="replace")
            else:
                text = str(data)
            _fire_ws("recv", start, response_length=len(text))
            return text
        except Exception as exc:
            _fire_ws("recv", start, exc=exc)
            self._close_ws()
            raise

    def _req_until_eose(self, sub_id: str) -> int:
        """Return number of EVENT messages seen for this sub_id.

        Reads messages until EOSE arrives for ``sub_id``. Frames that are not
        JSON, are not a list of at least two elements, or whose second
        element is not ``sub_id`` are skipped.

        The deadline is only checked between messages; a single receive can
        still block for up to the socket's receive timeout.

        Raises:
            RuntimeError: if the relay sends CLOSED for this subscription.
            TimeoutError: if ``req_timeout_s`` elapses before EOSE.
        """
        deadline = time.monotonic() + self._cfg.req_timeout_s
        event_count = 0
        while time.monotonic() < deadline:
            raw = self._recv_text()
            try:
                msg = json.loads(raw)
            except ValueError:
                continue
            if not isinstance(msg, list) or len(msg) < 2:
                continue
            msg_type, msg_sub = msg[0], msg[1]
            # NOTICE frames carry a human-readable message in this slot, not
            # a subscription id, so they are skipped here along with traffic
            # for other subscriptions.
            if msg_sub != sub_id:
                continue
            if msg_type == "EVENT":
                event_count += 1
            elif msg_type == "EOSE":
                return event_count
            elif msg_type == "CLOSED":
                # The relay ended the subscription; EOSE will never arrive,
                # so fail now instead of waiting for the timeout.
                reason = msg[2] if len(msg) > 2 else ""
                raise RuntimeError(f"Relay closed subscription: {reason}")
        raise TimeoutError("Timed out waiting for EOSE")

    def _close_subscription(self, sub_id: str) -> None:
        """Best-effort CLOSE for ``sub_id``; skipped if the socket is gone."""
        if self._ws is None:
            return
        try:
            self._send_json("CLOSE", ["CLOSE", sub_id])
        except Exception:
            # _send_json has already reported the failure and dropped the
            # socket; a failed CLOSE must not mask the task's own outcome.
            pass

    @task
    def subscribe_once(self) -> None:
        """Run one REQ -> EOSE round trip and report it as "REQ->EOSE".

        The reported length is the number of EVENT messages seen. The sample
        is fired before CLOSE is sent so that its duration covers only the
        span from REQ to EOSE (or to the failure). Failures are reported and
        then re-raised.
        """
        self._connect()
        sub_id = uuid.uuid4().hex
        start = _now_ms()
        events_seen = 0
        try:
            self._send_json("REQ", ["REQ", sub_id, self._cfg.filter_obj])
            events_seen = self._req_until_eose(sub_id)
        except Exception as exc:
            _fire_ws("REQ->EOSE", start, response_length=events_seen, exc=exc)
            # Always try to CLOSE; ignore failures
            self._close_subscription(sub_id)
            raise
        # Reached only on normal completion, so an interrupted task (one
        # ended by a non-Exception BaseException) is not counted as a success.
        _fire_ws("REQ->EOSE", start, response_length=events_seen)
        self._close_subscription(sub_id)


# Usage:
#   locust -f loadtest.py
# Optional env:
#   NOSTR_RELAY_URL=wss://relay.example.com
#   NOSTR_FILTER_JSON='{"kinds":[1],"limit":5}'
#   NOSTR_REQ_TIMEOUT=10