Files
locustr/loadtest.py

215 lines
5.1 KiB
Python

import json
import os
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Optional
import gevent
from locust import User, between, events, task
try:
import websocket # websocket-client
except Exception as exc: # pragma: no cover
websocket = None
_websocket_import_error = exc
else:
_websocket_import_error = None
@dataclass(frozen=True)
class NostrConfig:
relay_url: str
connect_timeout_s: float = 10.0
recv_timeout_s: float = 10.0
req_timeout_s: float = 10.0
filter_obj: Dict[str, Any] = None # type: ignore[assignment]
def _now_ms() -> float:
return time.perf_counter() * 1000.0
def _fire_ws(name: str, start_ms: float, response_length: int = 0, exc: Optional[BaseException] = None) -> None:
events.request.fire(
request_type="WS",
name=name,
response_time=_now_ms() - start_ms,
response_length=response_length,
exception=exc,
)
def _load_filter_from_env() -> Dict[str, Any]:
raw = os.getenv("NOSTR_FILTER_JSON")
if raw:
try:
parsed = json.loads(raw)
if isinstance(parsed, dict):
return parsed
except Exception:
pass
# Default: keep it lightweight and likely to succeed on most relays
return {"kinds": [1], "limit": 1}
def _load_config() -> NostrConfig:
relay_url = os.getenv("NOSTR_RELAY_URL") or os.getenv("RELAY_URL") or "wss://relay.damus.io"
return NostrConfig(
relay_url=relay_url,
connect_timeout_s=float(os.getenv("NOSTR_CONNECT_TIMEOUT", "10")),
recv_timeout_s=float(os.getenv("NOSTR_RECV_TIMEOUT", "10")),
req_timeout_s=float(os.getenv("NOSTR_REQ_TIMEOUT", "10")),
filter_obj=_load_filter_from_env(),
)
class NostrRelayUser(User):
"""Locust user that load tests a Nostr relay via raw WebSockets.
Each task:
- opens/ensures a WS connection
- sends a Nostr REQ with a unique subscription id
- reads until EOSE (or timeout)
- sends CLOSE for the subscription
"""
wait_time = between(0.5, 2.0)
def __init__(self, environment):
super().__init__(environment)
self._cfg = _load_config()
self._ws = None
def on_start(self) -> None:
if _websocket_import_error is not None:
raise RuntimeError(
"websocket-client is not available. Install dependencies via requirements.txt"
) from _websocket_import_error
self._connect()
def on_stop(self) -> None:
self._close_ws()
def _connect(self) -> None:
if self._ws is not None:
return
start = _now_ms()
try:
# websocket-client returns a WebSocket object
ws = websocket.create_connection(
self._cfg.relay_url,
timeout=self._cfg.connect_timeout_s,
enable_multithread=True,
)
ws.settimeout(self._cfg.recv_timeout_s)
self._ws = ws
_fire_ws("connect", start)
except Exception as exc:
_fire_ws("connect", start, exc=exc)
self._ws = None
raise
def _close_ws(self) -> None:
if self._ws is None:
return
start = _now_ms()
try:
self._ws.close()
_fire_ws("close", start)
except Exception as exc:
_fire_ws("close", start, exc=exc)
finally:
self._ws = None
def _send_json(self, name: str, msg: Any) -> None:
payload = json.dumps(msg, separators=(",", ":"))
start = _now_ms()
try:
assert self._ws is not None
self._ws.send(payload)
_fire_ws(name, start, response_length=len(payload))
except Exception as exc:
_fire_ws(name, start, exc=exc)
self._close_ws()
raise
def _recv_text(self) -> str:
assert self._ws is not None
start = _now_ms()
try:
data = self._ws.recv()
if data is None:
raise TimeoutError("recv returned None")
if isinstance(data, bytes):
text = data.decode("utf-8", errors="replace")
else:
text = str(data)
_fire_ws("recv", start, response_length=len(text))
return text
except Exception as exc:
_fire_ws("recv", start, exc=exc)
self._close_ws()
raise
def _req_until_eose(self, sub_id: str) -> int:
"""Return number of EVENT messages seen for this sub_id."""
deadline = time.monotonic() + self._cfg.req_timeout_s
event_count = 0
while time.monotonic() < deadline:
raw = self._recv_text()
try:
msg = json.loads(raw)
except Exception:
continue
if not isinstance(msg, list) or len(msg) < 2:
continue
msg_type = msg[0]
msg_sub = msg[1]
if msg_sub != sub_id:
continue
if msg_type == "EVENT":
event_count += 1
elif msg_type == "EOSE":
return event_count
elif msg_type == "NOTICE":
return event_count
raise TimeoutError("Timed out waiting for EOSE")
@task
def subscribe_once(self) -> None:
self._connect()
sub_id = uuid.uuid4().hex
start = _now_ms()
exc: Optional[BaseException] = None
events_seen = 0
try:
self._send_json("REQ", ["REQ", sub_id, self._cfg.filter_obj])
events_seen = self._req_until_eose(sub_id)
except Exception as e:
exc = e
finally:
try:
# Always try to CLOSE; ignore failures
if self._ws is not None:
self._send_json("CLOSE", ["CLOSE", sub_id])
except Exception:
pass
_fire_ws("REQ->EOSE", start, response_length=events_seen, exc=exc)
if exc is not None:
raise exc
# Usage:
# locust -f loadtest.py
# Optional env:
# NOSTR_RELAY_URL=wss://relay.example.com
# NOSTR_FILTER_JSON='{"kinds":[1],"limit":5}'
# NOSTR_REQ_TIMEOUT=10