215 lines
5.1 KiB
Python
215 lines
5.1 KiB
Python
|
|
import json
|
|
import os
|
|
import time
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, Optional
|
|
|
|
import gevent
|
|
from locust import User, between, events, task
|
|
|
|
# websocket-client is imported defensively: a failed import is remembered
# here instead of aborting module import, so the error can be surfaced later
# (NostrRelayUser.on_start re-raises it as a RuntimeError).
_websocket_import_error = None
try:
    import websocket  # websocket-client
except Exception as exc:  # pragma: no cover
    websocket = None
    _websocket_import_error = exc
|
|
|
|
|
|
@dataclass(frozen=True)
class NostrConfig:
    """Immutable settings for one Nostr relay load-test user.

    All ``*_timeout_s`` values are in seconds.
    """

    # WebSocket URL of the relay under test.
    relay_url: str
    # Timeout passed to ``websocket.create_connection``.
    connect_timeout_s: float = 10.0
    # Socket timeout applied to the connection once it is established.
    recv_timeout_s: float = 10.0
    # Overall budget for waiting on EOSE after a REQ has been sent.
    req_timeout_s: float = 10.0
    # Filter object sent as the third element of the REQ message. The default
    # stays ``None`` for backward compatibility; ``_load_config`` always
    # supplies a real dict. The annotation is Optional so type checkers see
    # the truth instead of a suppressed ``Dict`` / ``None`` mismatch.
    filter_obj: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
def _now_ms() -> float:
    """Return the current ``time.perf_counter`` reading scaled to milliseconds.

    The value is only meaningful as a difference between two calls.
    """
    seconds = time.perf_counter()
    return seconds * 1000.0
|
|
|
|
|
|
def _fire_ws(name: str, start_ms: float, response_length: int = 0, exc: Optional[BaseException] = None) -> None:
    """Report one WebSocket operation to Locust's ``request`` event.

    Args:
        name: Label for the operation (e.g. "connect", "recv").
        start_ms: ``_now_ms()`` reading taken when the operation began; the
            reported response time is the elapsed time since then.
        response_length: Size figure to record for the operation.
        exc: Exception raised by the operation, or ``None`` when it succeeded.
    """
    elapsed_ms = _now_ms() - start_ms
    report = {
        "request_type": "WS",
        "name": name,
        "response_time": elapsed_ms,
        "response_length": response_length,
        "exception": exc,
    }
    events.request.fire(**report)
|
|
|
|
|
|
def _load_filter_from_env() -> Dict[str, Any]:
    """Return the REQ filter configured through ``NOSTR_FILTER_JSON``.

    The variable must contain a JSON object. When it is unset or empty the
    default filter is returned. When it is set but unusable (invalid JSON, or
    JSON that is not an object) a warning is logged and the default filter is
    returned, so a typo in the variable no longer goes unnoticed.

    Returns:
        The parsed filter dict, or ``{"kinds": [1], "limit": 1}``.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    # Default: keep it lightweight and likely to succeed on most relays
    default_filter: Dict[str, Any] = {"kinds": [1], "limit": 1}

    raw = os.getenv("NOSTR_FILTER_JSON")
    if not raw:
        return default_filter

    log = logging.getLogger(__name__)
    try:
        parsed = json.loads(raw)
    except (ValueError, RecursionError) as err:
        # json.JSONDecodeError is a ValueError subclass.
        log.warning(
            "Ignoring NOSTR_FILTER_JSON: not valid JSON (%s); using default filter %s",
            err,
            default_filter,
        )
        return default_filter

    if not isinstance(parsed, dict):
        log.warning(
            "Ignoring NOSTR_FILTER_JSON: expected a JSON object, got %s; using default filter %s",
            type(parsed).__name__,
            default_filter,
        )
        return default_filter
    return parsed
|
|
|
|
|
|
def _env_float(name: str, default: float) -> float:
    """Read environment variable ``name`` as a float; unset or blank yields ``default``."""
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return float(raw)
    except ValueError as err:
        # Same exception type as a bare float() call, but naming the variable.
        raise ValueError(f"{name} must be a number of seconds, got {raw!r}") from err


def _load_config() -> NostrConfig:
    """Build a ``NostrConfig`` from environment variables.

    Variables read:
        NOSTR_RELAY_URL / RELAY_URL: relay URL, first non-empty one wins;
            falls back to ``wss://relay.damus.io``.
        NOSTR_CONNECT_TIMEOUT, NOSTR_RECV_TIMEOUT, NOSTR_REQ_TIMEOUT:
            timeouts in seconds, each defaulting to 10. A blank value is
            treated as unset, matching how an empty relay URL falls back.
        NOSTR_FILTER_JSON: handled by ``_load_filter_from_env``.

    Raises:
        ValueError: if a timeout variable is set to a non-numeric value.
    """
    relay_url = os.getenv("NOSTR_RELAY_URL") or os.getenv("RELAY_URL") or "wss://relay.damus.io"
    return NostrConfig(
        relay_url=relay_url,
        connect_timeout_s=_env_float("NOSTR_CONNECT_TIMEOUT", 10.0),
        recv_timeout_s=_env_float("NOSTR_RECV_TIMEOUT", 10.0),
        req_timeout_s=_env_float("NOSTR_REQ_TIMEOUT", 10.0),
        filter_obj=_load_filter_from_env(),
    )
|
|
|
|
|
|
class NostrRelayUser(User):
    """Locust user that load tests a Nostr relay via raw WebSockets.

    Each task:
    - opens/ensures a WS connection
    - sends a Nostr REQ with a unique subscription id
    - reads until EOSE (or timeout, or the relay closing the subscription)
    - sends CLOSE for the subscription

    Every connect/send/recv/close is reported individually through
    ``_fire_ws``; the whole REQ-to-EOSE round trip is additionally reported
    under the name "REQ->EOSE".
    """

    wait_time = between(0.5, 2.0)

    def __init__(self, environment):
        super().__init__(environment)
        self._cfg = _load_config()
        # Current websocket-client connection, or None when disconnected.
        self._ws = None

    def on_start(self) -> None:
        """Fail early if websocket-client could not be imported, then connect."""
        if _websocket_import_error is not None:
            raise RuntimeError(
                "websocket-client is not available. Install dependencies via requirements.txt"
            ) from _websocket_import_error
        self._connect()

    def on_stop(self) -> None:
        """Close the connection when the user stops."""
        self._close_ws()

    def _connect(self) -> None:
        """Open the WebSocket if none is open; report the attempt as "connect".

        Raises:
            Exception: whatever the connection attempt raised, after it has
                been reported as a failed "connect".
        """
        if self._ws is not None:
            return

        start = _now_ms()
        try:
            # websocket-client returns a WebSocket object
            ws = websocket.create_connection(
                self._cfg.relay_url,
                timeout=self._cfg.connect_timeout_s,
                enable_multithread=True,
            )
            # Switch from the connect timeout to the per-recv timeout.
            ws.settimeout(self._cfg.recv_timeout_s)
            self._ws = ws
            _fire_ws("connect", start)
        except Exception as exc:
            _fire_ws("connect", start, exc=exc)
            self._ws = None
            raise

    def _close_ws(self) -> None:
        """Close the WebSocket if open and report it as "close"; never raises
        for a failing close, and always forgets the connection."""
        if self._ws is None:
            return
        start = _now_ms()
        try:
            self._ws.close()
            _fire_ws("close", start)
        except Exception as exc:
            _fire_ws("close", start, exc=exc)
        finally:
            self._ws = None

    def _send_json(self, name: str, msg: Any) -> None:
        """Serialize ``msg`` as compact JSON, send it, and report it as ``name``.

        On any failure the failure is reported, the connection is dropped so
        the next task reconnects, and the exception is re-raised.

        Raises:
            ConnectionError: if there is no open connection.
            Exception: whatever the send raised.
        """
        payload = json.dumps(msg, separators=(",", ":"))
        start = _now_ms()
        try:
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            if self._ws is None:
                raise ConnectionError("WebSocket is not connected")
            self._ws.send(payload)
            _fire_ws(name, start, response_length=len(payload))
        except Exception as exc:
            _fire_ws(name, start, exc=exc)
            self._close_ws()
            raise

    def _recv_text(self) -> str:
        """Receive one message as text and report it as "recv".

        Bytes payloads are decoded as UTF-8 with replacement characters. On
        any receive failure the failure is reported, the connection is
        dropped, and the exception is re-raised.

        Raises:
            ConnectionError: if there is no open connection (not reported as
                a "recv", since nothing was attempted).
            TimeoutError: if the underlying ``recv`` returned ``None``.
            Exception: whatever the receive raised.
        """
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if self._ws is None:
            raise ConnectionError("WebSocket is not connected")
        start = _now_ms()
        try:
            data = self._ws.recv()
            if data is None:
                raise TimeoutError("recv returned None")
            if isinstance(data, bytes):
                text = data.decode("utf-8", errors="replace")
            else:
                text = str(data)
            _fire_ws("recv", start, response_length=len(text))
            return text
        except Exception as exc:
            _fire_ws("recv", start, exc=exc)
            self._close_ws()
            raise

    def _req_until_eose(self, sub_id: str) -> int:
        """Return number of EVENT messages seen for this sub_id.

        Reads messages until the relay sends EOSE for ``sub_id``. Messages
        that are not JSON arrays of at least two elements, or whose second
        element is not ``sub_id``, are skipped.

        The deadline is only checked between receives, so a single blocking
        receive can overrun ``req_timeout_s`` by up to the socket timeout.

        Raises:
            RuntimeError: if the relay sends CLOSED for ``sub_id``.
            TimeoutError: if ``req_timeout_s`` elapses without EOSE.
            Exception: whatever ``_recv_text`` raised.
        """
        deadline = time.monotonic() + self._cfg.req_timeout_s
        event_count = 0
        while time.monotonic() < deadline:
            raw = self._recv_text()
            try:
                msg = json.loads(raw)
            except (ValueError, RecursionError):
                # Not JSON (json.JSONDecodeError is a ValueError); skip it.
                continue

            if not isinstance(msg, list) or len(msg) < 2:
                continue

            msg_type, msg_sub = msg[0], msg[1]
            if msg_sub != sub_id:
                # Also skips NOTICE: ["NOTICE", <text>] carries a human-readable
                # message in this position, not a subscription id, so it can
                # never be attributed to this request.
                continue

            if msg_type == "EVENT":
                event_count += 1
            elif msg_type == "EOSE":
                return event_count
            elif msg_type == "CLOSED":
                # The relay ended/rejected the subscription: no EOSE will
                # follow, so fail now instead of waiting out the deadline.
                reason = msg[2] if len(msg) > 2 else ""
                raise RuntimeError(f"Relay closed subscription {sub_id}: {reason}")
        raise TimeoutError("Timed out waiting for EOSE")

    @task
    def subscribe_once(self) -> None:
        """Run one REQ -> EOSE -> CLOSE cycle and report it as "REQ->EOSE".

        The reported ``response_length`` is the number of EVENT messages
        received. Any failure is reported and then re-raised.
        """
        self._connect()
        sub_id = uuid.uuid4().hex

        start = _now_ms()
        exc: Optional[BaseException] = None
        events_seen = 0
        try:
            self._send_json("REQ", ["REQ", sub_id, self._cfg.filter_obj])
            events_seen = self._req_until_eose(sub_id)
        except Exception as e:
            exc = e
        finally:
            try:
                # Always try to CLOSE; ignore failures. They are not silent:
                # _send_json has already reported a failed "CLOSE" and dropped
                # the connection by the time the exception reaches here.
                if self._ws is not None:
                    self._send_json("CLOSE", ["CLOSE", sub_id])
            except Exception:
                pass
            _fire_ws("REQ->EOSE", start, response_length=events_seen, exc=exc)
        if exc is not None:
            raise exc
|
|
|
|
|
|
# Usage:
#   locust -f loadtest.py
# Optional env:
#   NOSTR_RELAY_URL=wss://relay.example.com   (RELAY_URL is used as a fallback; default wss://relay.damus.io)
#   NOSTR_FILTER_JSON='{"kinds":[1],"limit":5}'
#   NOSTR_CONNECT_TIMEOUT=10
#   NOSTR_RECV_TIMEOUT=10
#   NOSTR_REQ_TIMEOUT=10
|