Füge .gitignore, loadtest.py und requirements.txt hinzu

This commit is contained in:
2026-01-28 21:00:55 +01:00
commit f4e9dc5b79
3 changed files with 220 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
.venv
example.py

214
loadtest.py Normal file
View File

@@ -0,0 +1,214 @@
import json
import os
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Optional
import gevent
from locust import User, between, events, task
try:
import websocket # websocket-client
except Exception as exc: # pragma: no cover
websocket = None
_websocket_import_error = exc
else:
_websocket_import_error = None
@dataclass(frozen=True)
class NostrConfig:
relay_url: str
connect_timeout_s: float = 10.0
recv_timeout_s: float = 10.0
req_timeout_s: float = 10.0
filter_obj: Dict[str, Any] = None # type: ignore[assignment]
def _now_ms() -> float:
return time.perf_counter() * 1000.0
def _fire_ws(name: str, start_ms: float, response_length: int = 0, exc: Optional[BaseException] = None) -> None:
events.request.fire(
request_type="WS",
name=name,
response_time=_now_ms() - start_ms,
response_length=response_length,
exception=exc,
)
def _load_filter_from_env() -> Dict[str, Any]:
raw = os.getenv("NOSTR_FILTER_JSON")
if raw:
try:
parsed = json.loads(raw)
if isinstance(parsed, dict):
return parsed
except Exception:
pass
# Default: keep it lightweight and likely to succeed on most relays
return {"kinds": [1], "limit": 1}
def _load_config() -> NostrConfig:
relay_url = os.getenv("NOSTR_RELAY_URL") or os.getenv("RELAY_URL") or "wss://relay.damus.io"
return NostrConfig(
relay_url=relay_url,
connect_timeout_s=float(os.getenv("NOSTR_CONNECT_TIMEOUT", "10")),
recv_timeout_s=float(os.getenv("NOSTR_RECV_TIMEOUT", "10")),
req_timeout_s=float(os.getenv("NOSTR_REQ_TIMEOUT", "10")),
filter_obj=_load_filter_from_env(),
)
class NostrRelayUser(User):
"""Locust user that load tests a Nostr relay via raw WebSockets.
Each task:
- opens/ensures a WS connection
- sends a Nostr REQ with a unique subscription id
- reads until EOSE (or timeout)
- sends CLOSE for the subscription
"""
wait_time = between(0.5, 2.0)
def __init__(self, environment):
super().__init__(environment)
self._cfg = _load_config()
self._ws = None
def on_start(self) -> None:
if _websocket_import_error is not None:
raise RuntimeError(
"websocket-client is not available. Install dependencies via requirements.txt"
) from _websocket_import_error
self._connect()
def on_stop(self) -> None:
self._close_ws()
def _connect(self) -> None:
if self._ws is not None:
return
start = _now_ms()
try:
# websocket-client returns a WebSocket object
ws = websocket.create_connection(
self._cfg.relay_url,
timeout=self._cfg.connect_timeout_s,
enable_multithread=True,
)
ws.settimeout(self._cfg.recv_timeout_s)
self._ws = ws
_fire_ws("connect", start)
except Exception as exc:
_fire_ws("connect", start, exc=exc)
self._ws = None
raise
def _close_ws(self) -> None:
if self._ws is None:
return
start = _now_ms()
try:
self._ws.close()
_fire_ws("close", start)
except Exception as exc:
_fire_ws("close", start, exc=exc)
finally:
self._ws = None
def _send_json(self, name: str, msg: Any) -> None:
payload = json.dumps(msg, separators=(",", ":"))
start = _now_ms()
try:
assert self._ws is not None
self._ws.send(payload)
_fire_ws(name, start, response_length=len(payload))
except Exception as exc:
_fire_ws(name, start, exc=exc)
self._close_ws()
raise
def _recv_text(self) -> str:
assert self._ws is not None
start = _now_ms()
try:
data = self._ws.recv()
if data is None:
raise TimeoutError("recv returned None")
if isinstance(data, bytes):
text = data.decode("utf-8", errors="replace")
else:
text = str(data)
_fire_ws("recv", start, response_length=len(text))
return text
except Exception as exc:
_fire_ws("recv", start, exc=exc)
self._close_ws()
raise
def _req_until_eose(self, sub_id: str) -> int:
"""Return number of EVENT messages seen for this sub_id."""
deadline = time.monotonic() + self._cfg.req_timeout_s
event_count = 0
while time.monotonic() < deadline:
raw = self._recv_text()
try:
msg = json.loads(raw)
except Exception:
continue
if not isinstance(msg, list) or len(msg) < 2:
continue
msg_type = msg[0]
msg_sub = msg[1]
if msg_sub != sub_id:
continue
if msg_type == "EVENT":
event_count += 1
elif msg_type == "EOSE":
return event_count
elif msg_type == "NOTICE":
return event_count
raise TimeoutError("Timed out waiting for EOSE")
@task
def subscribe_once(self) -> None:
self._connect()
sub_id = uuid.uuid4().hex
start = _now_ms()
exc: Optional[BaseException] = None
events_seen = 0
try:
self._send_json("REQ", ["REQ", sub_id, self._cfg.filter_obj])
events_seen = self._req_until_eose(sub_id)
except Exception as e:
exc = e
finally:
try:
# Always try to CLOSE; ignore failures
if self._ws is not None:
self._send_json("CLOSE", ["CLOSE", sub_id])
except Exception:
pass
_fire_ws("REQ->EOSE", start, response_length=events_seen, exc=exc)
if exc is not None:
raise exc
# Usage:
# locust -f loadtest.py
# Optional env:
# NOSTR_RELAY_URL=wss://relay.example.com
# NOSTR_FILTER_JSON='{"kinds":[1],"limit":5}'
# NOSTR_REQ_TIMEOUT=10

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
locust
websockets
urllib3<2
websocket-client