mirror of
https://github.com/purrgrammer/grimoire.git
synced 2026-04-10 07:27:23 +02:00
feat: improve event-log reliability, add ERROR type and per-relay timing
Service improvements: - Fix notice$ duplicate logging with per-relay dedup tracking - Remove dead Array.isArray code path (notice$ emits strings) - Increase relay poll interval from 1s to 5s - Clean publishIdToEntryId map on terminal state, not just overflow - Immutable entry updates (spread instead of in-place mutation) - Extract NewEntry<T>/AddEntryInput helper types for clean addEntry signature - Clear lastNoticePerRelay on log clear New capabilities: - ERROR log type: subscribes to relay.error$ for connection failure reasons - RelayStatusEntry with updatedAt timestamp for per-relay response timing Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,8 +2,9 @@
|
||||
* Event Log Service
|
||||
*
|
||||
* Provides an ephemeral log of relay operations for introspection:
|
||||
* - PUBLISH events with per-relay status
|
||||
* - PUBLISH events with per-relay status and timing
|
||||
* - CONNECT/DISCONNECT events
|
||||
* - ERROR events for connection failures
|
||||
* - AUTH events
|
||||
* - NOTICE events
|
||||
*
|
||||
@@ -30,9 +31,18 @@ export type EventLogType =
|
||||
| "PUBLISH"
|
||||
| "CONNECT"
|
||||
| "DISCONNECT"
|
||||
| "ERROR"
|
||||
| "AUTH"
|
||||
| "NOTICE";
|
||||
|
||||
/** Per-relay status with timing */
|
||||
export interface RelayStatusEntry {
|
||||
status: string;
|
||||
error?: string;
|
||||
/** Timestamp of the last status transition */
|
||||
updatedAt: number;
|
||||
}
|
||||
|
||||
/** Base interface for all log entries */
|
||||
interface BaseLogEntry {
|
||||
/** Unique ID for this log entry */
|
||||
@@ -52,8 +62,8 @@ export interface PublishLogEntry extends BaseLogEntry {
|
||||
event: NostrEvent;
|
||||
/** Target relays */
|
||||
relays: string[];
|
||||
/** Per-relay status */
|
||||
relayStatus: Map<string, { status: string; error?: string }>;
|
||||
/** Per-relay status with timing */
|
||||
relayStatus: Map<string, RelayStatusEntry>;
|
||||
/** Overall status: pending, partial, success, failed */
|
||||
status: "pending" | "partial" | "success" | "failed";
|
||||
/** Publish ID from PublishService */
|
||||
@@ -66,6 +76,14 @@ export interface ConnectLogEntry extends BaseLogEntry {
|
||||
relay: string;
|
||||
}
|
||||
|
||||
/** Connection error log entry */
|
||||
export interface ErrorLogEntry extends BaseLogEntry {
|
||||
type: "ERROR";
|
||||
relay: string;
|
||||
/** Error message */
|
||||
message: string;
|
||||
}
|
||||
|
||||
/** Auth event log entry */
|
||||
export interface AuthLogEntry extends BaseLogEntry {
|
||||
type: "AUTH";
|
||||
@@ -88,13 +106,30 @@ export interface NoticeLogEntry extends BaseLogEntry {
|
||||
export type LogEntry =
|
||||
| PublishLogEntry
|
||||
| ConnectLogEntry
|
||||
| ErrorLogEntry
|
||||
| AuthLogEntry
|
||||
| NoticeLogEntry;
|
||||
|
||||
/** Helper type for creating new entries (id/timestamp auto-generated) */
|
||||
type NewEntry<T extends LogEntry> = Omit<T, "id" | "timestamp"> & {
|
||||
id?: string;
|
||||
timestamp?: number;
|
||||
};
|
||||
|
||||
type AddEntryInput =
|
||||
| NewEntry<PublishLogEntry>
|
||||
| NewEntry<ConnectLogEntry>
|
||||
| NewEntry<ErrorLogEntry>
|
||||
| NewEntry<AuthLogEntry>
|
||||
| NewEntry<NoticeLogEntry>;
|
||||
|
||||
// ============================================================================
|
||||
// EventLogService Class
|
||||
// ============================================================================
|
||||
|
||||
/** Interval for polling new relays (ms) */
|
||||
const RELAY_POLL_INTERVAL = 5000;
|
||||
|
||||
class EventLogService {
|
||||
/** Maximum number of entries to keep in the log */
|
||||
private maxEntries: number;
|
||||
@@ -120,6 +155,9 @@ class EventLogService {
|
||||
/** Map of publish IDs to log entry IDs */
|
||||
private publishIdToEntryId = new Map<string, string>();
|
||||
|
||||
/** Track last seen notice per relay to prevent duplicates */
|
||||
private lastNoticePerRelay = new Map<string, string>();
|
||||
|
||||
/** Polling interval for new relays */
|
||||
private pollingIntervalId?: NodeJS.Timeout;
|
||||
|
||||
@@ -163,14 +201,14 @@ class EventLogService {
|
||||
// Monitor existing relays
|
||||
pool.relays.forEach((relay) => this.monitorRelay(relay));
|
||||
|
||||
// Poll for new relays
|
||||
// Poll for new relays (infrequent — new relays don't appear often)
|
||||
this.pollingIntervalId = setInterval(() => {
|
||||
pool.relays.forEach((relay) => {
|
||||
if (!this.relaySubscriptions.has(relay.url)) {
|
||||
this.monitorRelay(relay);
|
||||
}
|
||||
});
|
||||
}, 1000);
|
||||
}, RELAY_POLL_INTERVAL);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -194,7 +232,7 @@ class EventLogService {
|
||||
// --------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Monitor a relay for connection, auth, and notice events
|
||||
* Monitor a relay for connection, error, auth, and notice events
|
||||
*/
|
||||
private monitorRelay(relay: IRelay): void {
|
||||
const url = relay.url;
|
||||
@@ -219,6 +257,19 @@ class EventLogService {
|
||||
}),
|
||||
);
|
||||
|
||||
// Track connection errors
|
||||
subscription.add(
|
||||
relay.error$
|
||||
.pipe(filter((error): error is Error => error !== null))
|
||||
.subscribe((error) => {
|
||||
this.addEntry({
|
||||
type: "ERROR",
|
||||
relay: url,
|
||||
message: error.message || "Unknown connection error",
|
||||
});
|
||||
}),
|
||||
);
|
||||
|
||||
// Track authentication events
|
||||
subscription.add(
|
||||
relay.authenticated$
|
||||
@@ -250,22 +301,19 @@ class EventLogService {
|
||||
}),
|
||||
);
|
||||
|
||||
// Track notices
|
||||
// Track notices — deduplicate per relay
|
||||
subscription.add(
|
||||
relay.notice$.subscribe((notices) => {
|
||||
// notices can be a single string or array
|
||||
const noticeArray = Array.isArray(notices)
|
||||
? notices
|
||||
: notices
|
||||
? [notices]
|
||||
: [];
|
||||
// Only log new notices (last one)
|
||||
if (noticeArray.length > 0) {
|
||||
const latestNotice = noticeArray[noticeArray.length - 1];
|
||||
relay.notice$.subscribe((notice) => {
|
||||
if (
|
||||
typeof notice === "string" &&
|
||||
notice &&
|
||||
notice !== this.lastNoticePerRelay.get(url)
|
||||
) {
|
||||
this.lastNoticePerRelay.set(url, notice);
|
||||
this.addEntry({
|
||||
type: "NOTICE",
|
||||
relay: url,
|
||||
message: latestNotice,
|
||||
message: notice,
|
||||
});
|
||||
}
|
||||
}),
|
||||
@@ -285,30 +333,48 @@ class EventLogService {
|
||||
// Check if we already have an entry for this publish (avoid duplicates)
|
||||
const existingEntryId = this.publishIdToEntryId.get(event.id);
|
||||
if (existingEntryId) {
|
||||
// Update existing entry instead of creating a new one
|
||||
// Update existing entry immutably
|
||||
const entryIndex = this.entries.findIndex(
|
||||
(e) => e.id === existingEntryId && e.type === "PUBLISH",
|
||||
);
|
||||
if (entryIndex !== -1) {
|
||||
const entry = this.entries[entryIndex] as PublishLogEntry;
|
||||
entry.relayStatus = new Map(event.results);
|
||||
entry.status = this.calculatePublishStatus(event.results);
|
||||
const newRelayStatus = new Map<string, RelayStatusEntry>();
|
||||
// Preserve timing from existing entries, add timing for new ones
|
||||
for (const [relay, status] of event.results) {
|
||||
const existing = entry.relayStatus.get(relay);
|
||||
newRelayStatus.set(relay, {
|
||||
...status,
|
||||
updatedAt: existing?.updatedAt ?? Date.now(),
|
||||
});
|
||||
}
|
||||
this.entries[entryIndex] = {
|
||||
...entry,
|
||||
relayStatus: newRelayStatus,
|
||||
status: this.calculatePublishStatus(newRelayStatus),
|
||||
};
|
||||
this.entriesSubject.next([...this.entries]);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
const entryId = this.generateId();
|
||||
const now = Date.now();
|
||||
|
||||
// Create initial publish entry with timing
|
||||
const relayStatus = new Map<string, RelayStatusEntry>();
|
||||
for (const [relay, status] of event.results) {
|
||||
relayStatus.set(relay, { ...status, updatedAt: now });
|
||||
}
|
||||
|
||||
// Create initial publish entry
|
||||
const entry: PublishLogEntry = {
|
||||
id: entryId,
|
||||
type: "PUBLISH",
|
||||
timestamp: event.startedAt,
|
||||
event: event.event,
|
||||
relays: event.relays,
|
||||
relayStatus: new Map(event.results),
|
||||
status: this.calculatePublishStatus(event.results),
|
||||
relayStatus,
|
||||
status: this.calculatePublishStatus(relayStatus),
|
||||
publishId: event.id,
|
||||
};
|
||||
|
||||
@@ -325,7 +391,7 @@ class EventLogService {
|
||||
const entryId = this.publishIdToEntryId.get(update.publishId);
|
||||
if (!entryId) return;
|
||||
|
||||
// Find and update the publish entry
|
||||
// Find the publish entry
|
||||
const entryIndex = this.entries.findIndex(
|
||||
(e) => e.id === entryId && e.type === "PUBLISH",
|
||||
);
|
||||
@@ -333,14 +399,31 @@ class EventLogService {
|
||||
|
||||
const entry = this.entries[entryIndex] as PublishLogEntry;
|
||||
|
||||
// Update relay status
|
||||
entry.relayStatus.set(update.relay, {
|
||||
// Update immutably with timing
|
||||
const newRelayStatus = new Map(entry.relayStatus);
|
||||
newRelayStatus.set(update.relay, {
|
||||
status: update.status,
|
||||
error: update.error,
|
||||
updatedAt: update.timestamp,
|
||||
});
|
||||
|
||||
// Recalculate overall status
|
||||
entry.status = this.calculatePublishStatus(entry.relayStatus);
|
||||
const newStatus = this.calculatePublishStatus(newRelayStatus);
|
||||
|
||||
this.entries[entryIndex] = {
|
||||
...entry,
|
||||
relayStatus: newRelayStatus,
|
||||
status: newStatus,
|
||||
};
|
||||
|
||||
// Clean up publish ID mapping when publish reaches terminal state
|
||||
if (newStatus !== "pending") {
|
||||
const allTerminal = Array.from(newRelayStatus.values()).every(
|
||||
(r) => r.status === "success" || r.status === "error",
|
||||
);
|
||||
if (allTerminal) {
|
||||
this.publishIdToEntryId.delete(update.publishId);
|
||||
}
|
||||
}
|
||||
|
||||
// Notify subscribers
|
||||
this.entriesSubject.next([...this.entries]);
|
||||
@@ -350,7 +433,7 @@ class EventLogService {
|
||||
* Calculate overall publish status from relay results
|
||||
*/
|
||||
private calculatePublishStatus(
|
||||
results: Map<string, { status: string; error?: string }>,
|
||||
results: Map<string, RelayStatusEntry>,
|
||||
): "pending" | "partial" | "success" | "failed" {
|
||||
const statuses = Array.from(results.values()).map((r) => r.status);
|
||||
|
||||
@@ -387,25 +470,7 @@ class EventLogService {
|
||||
* Add an entry to the log
|
||||
* Accepts partial entries without id/timestamp (they will be generated)
|
||||
*/
|
||||
private addEntry(
|
||||
entry:
|
||||
| (Omit<PublishLogEntry, "id" | "timestamp"> & {
|
||||
id?: string;
|
||||
timestamp?: number;
|
||||
})
|
||||
| (Omit<ConnectLogEntry, "id" | "timestamp"> & {
|
||||
id?: string;
|
||||
timestamp?: number;
|
||||
})
|
||||
| (Omit<AuthLogEntry, "id" | "timestamp"> & {
|
||||
id?: string;
|
||||
timestamp?: number;
|
||||
})
|
||||
| (Omit<NoticeLogEntry, "id" | "timestamp"> & {
|
||||
id?: string;
|
||||
timestamp?: number;
|
||||
}),
|
||||
): void {
|
||||
private addEntry(entry: AddEntryInput): void {
|
||||
const fullEntry = {
|
||||
id: entry.id || this.generateId(),
|
||||
timestamp: entry.timestamp || Date.now(),
|
||||
@@ -448,6 +513,7 @@ class EventLogService {
|
||||
clear(): void {
|
||||
this.entries = [];
|
||||
this.publishIdToEntryId.clear();
|
||||
this.lastNoticePerRelay.clear();
|
||||
this.entriesSubject.next([]);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user