mirror of
https://github.com/purrgrammer/grimoire.git
synced 2026-04-09 23:16:50 +02:00
feat: centralize publish flow with RxJS-based PublishService
Create a unified PublishService that: - Provides consistent relay selection (outbox + state + hints + fallbacks) - Emits RxJS observables for per-relay status updates - Handles EventStore integration automatically - Supports both fire-and-forget and observable-based publishing Refactor all publish locations to use the centralized service: - hub.ts: Use PublishService for ActionRunner publish - delete-event.ts: Use PublishService (fixes missing eventStore.add) - publish-spell.ts: Use PublishService with relay hint support - PostViewer.tsx: Use publishWithUpdates() for per-relay UI tracking This lays the groundwork for the event log feature by providing observable hooks into all publish operations.
This commit is contained in:
@@ -1,11 +1,6 @@
|
||||
import accountManager from "@/services/accounts";
|
||||
import pool from "@/services/relay-pool";
|
||||
import publishService from "@/services/publish-service";
|
||||
import { EventFactory } from "applesauce-core/event-factory";
|
||||
import { relayListCache } from "@/services/relay-list-cache";
|
||||
import { AGGREGATOR_RELAYS } from "@/services/loaders";
|
||||
import { mergeRelaySets } from "applesauce-core/helpers";
|
||||
import { grimoireStateAtom } from "@/core/state";
|
||||
import { getDefaultStore } from "jotai";
|
||||
import { NostrEvent } from "@/types/nostr";
|
||||
import { settingsManager } from "@/services/settings";
|
||||
import { GRIMOIRE_CLIENT_TAG } from "@/constants/app";
|
||||
@@ -37,24 +32,15 @@ export class DeleteEventAction {
|
||||
|
||||
const event = await factory.sign(draft);
|
||||
|
||||
// Get write relays from cache and state
|
||||
const authorWriteRelays =
|
||||
(await relayListCache.getOutboxRelays(account.pubkey)) || [];
|
||||
// Publish via centralized PublishService
|
||||
// Relay selection is handled automatically (outbox + state + aggregators)
|
||||
const result = await publishService.publish(event);
|
||||
|
||||
const store = getDefaultStore();
|
||||
const state = store.get(grimoireStateAtom);
|
||||
const stateWriteRelays =
|
||||
state.activeAccount?.relays?.filter((r) => r.write).map((r) => r.url) ||
|
||||
[];
|
||||
|
||||
// Combine all relay sources
|
||||
const writeRelays = mergeRelaySets(
|
||||
authorWriteRelays,
|
||||
stateWriteRelays,
|
||||
AGGREGATOR_RELAYS,
|
||||
);
|
||||
|
||||
// Publish to all target relays
|
||||
await pool.publish(writeRelays, event);
|
||||
if (!result.ok) {
|
||||
const errors = result.failed
|
||||
.map((f) => `${f.relay}: ${f.error}`)
|
||||
.join(", ");
|
||||
throw new Error(`Failed to publish deletion event. Errors: ${errors}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,14 +1,10 @@
|
||||
import { LocalSpell } from "@/services/db";
|
||||
import accountManager from "@/services/accounts";
|
||||
import pool from "@/services/relay-pool";
|
||||
import publishService from "@/services/publish-service";
|
||||
import { encodeSpell } from "@/lib/spell-conversion";
|
||||
import { markSpellPublished } from "@/services/spell-storage";
|
||||
import { EventFactory } from "applesauce-core/event-factory";
|
||||
import { SpellEvent } from "@/types/spell";
|
||||
import { relayListCache } from "@/services/relay-list-cache";
|
||||
import { AGGREGATOR_RELAYS } from "@/services/loaders";
|
||||
import { mergeRelaySets } from "applesauce-core/helpers";
|
||||
import eventStore from "@/services/event-store";
|
||||
import { settingsManager } from "@/services/settings";
|
||||
import { GRIMOIRE_CLIENT_TAG } from "@/constants/app";
|
||||
|
||||
@@ -25,7 +21,6 @@ export class PublishSpellAction {
|
||||
|
||||
if (spell.isPublished && spell.event) {
|
||||
// Use existing signed event for rebroadcasting
|
||||
|
||||
event = spell.event;
|
||||
} else {
|
||||
const signer = account.signer;
|
||||
@@ -34,9 +29,7 @@ export class PublishSpellAction {
|
||||
|
||||
const encoded = encodeSpell({
|
||||
command: spell.command,
|
||||
|
||||
name: spell.name,
|
||||
|
||||
description: spell.description,
|
||||
});
|
||||
|
||||
@@ -50,38 +43,35 @@ export class PublishSpellAction {
|
||||
|
||||
const draft = await factory.build({
|
||||
kind: 777,
|
||||
|
||||
content: encoded.content,
|
||||
|
||||
tags,
|
||||
});
|
||||
|
||||
event = (await factory.sign(draft)) as SpellEvent;
|
||||
}
|
||||
|
||||
// Use provided relays or fallback to author's write relays + aggregators
|
||||
// Get relay hints from event tags
|
||||
const eventRelayHints =
|
||||
event.tags.find((t) => t[0] === "relays")?.slice(1) || [];
|
||||
|
||||
let relays = targetRelays;
|
||||
|
||||
if (!relays || relays.length === 0) {
|
||||
const authorWriteRelays =
|
||||
(await relayListCache.getOutboxRelays(account.pubkey)) || [];
|
||||
|
||||
relays = mergeRelaySets(
|
||||
event.tags.find((t) => t[0] === "relays")?.slice(1) || [],
|
||||
|
||||
authorWriteRelays,
|
||||
|
||||
AGGREGATOR_RELAYS,
|
||||
);
|
||||
// Publish via centralized PublishService
|
||||
let result;
|
||||
if (targetRelays && targetRelays.length > 0) {
|
||||
// Use explicit target relays
|
||||
result = await publishService.publishToRelays(event, targetRelays);
|
||||
} else {
|
||||
// Use automatic relay selection with event hints
|
||||
result = await publishService.publish(event, {
|
||||
relayHints: eventRelayHints,
|
||||
});
|
||||
}
|
||||
|
||||
// Publish to all target relays
|
||||
|
||||
await pool.publish(relays, event);
|
||||
|
||||
// Add to event store for immediate availability
|
||||
eventStore.add(event);
|
||||
if (!result.ok) {
|
||||
const errors = result.failed
|
||||
.map((f) => `${f.relay}: ${f.error}`)
|
||||
.join(", ");
|
||||
throw new Error(`Failed to publish spell. Errors: ${errors}`);
|
||||
}
|
||||
|
||||
await markSpellPublished(spell.id, event);
|
||||
}
|
||||
|
||||
@@ -37,7 +37,9 @@ import {
|
||||
import { RelayLink } from "./nostr/RelayLink";
|
||||
import { Kind1Renderer } from "./nostr/kinds";
|
||||
import pool from "@/services/relay-pool";
|
||||
import eventStore from "@/services/event-store";
|
||||
import publishService, {
|
||||
type RelayPublishStatus,
|
||||
} from "@/services/publish-service";
|
||||
import { EventFactory } from "applesauce-core/event-factory";
|
||||
import { NoteBlueprint } from "@/lib/blueprints";
|
||||
import { useGrimoire } from "@/core/state";
|
||||
@@ -47,12 +49,9 @@ import { use$ } from "applesauce-react/hooks";
|
||||
import { getAuthIcon } from "@/lib/relay-status-utils";
|
||||
import { GRIMOIRE_CLIENT_TAG } from "@/constants/app";
|
||||
|
||||
// Per-relay publish status
|
||||
type RelayStatus = "pending" | "publishing" | "success" | "error";
|
||||
|
||||
interface RelayPublishState {
|
||||
url: string;
|
||||
status: RelayStatus;
|
||||
status: RelayPublishStatus;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
@@ -100,7 +99,7 @@ export function PostViewer({ windowId }: PostViewerProps = {}) {
|
||||
setRelayStates(
|
||||
writeRelays.map((url) => ({
|
||||
url,
|
||||
status: "pending" as RelayStatus,
|
||||
status: "pending" as RelayPublishStatus,
|
||||
})),
|
||||
);
|
||||
setSelectedRelays(new Set(writeRelays));
|
||||
@@ -157,7 +156,7 @@ export function PostViewer({ windowId }: PostViewerProps = {}) {
|
||||
.filter((url: string) => !currentRelayUrls.has(url))
|
||||
.map((url: string) => ({
|
||||
url,
|
||||
status: "pending" as RelayStatus,
|
||||
status: "pending" as RelayPublishStatus,
|
||||
}));
|
||||
return newRelays.length > 0 ? [...prev, ...newRelays] : prev;
|
||||
});
|
||||
@@ -275,39 +274,42 @@ export function PostViewer({ windowId }: PostViewerProps = {}) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// Update status to publishing
|
||||
setRelayStates((prev) =>
|
||||
prev.map((r) =>
|
||||
r.url === relayUrl
|
||||
? { ...r, status: "publishing" as RelayStatus }
|
||||
: r,
|
||||
),
|
||||
);
|
||||
// Update status to publishing
|
||||
setRelayStates((prev) =>
|
||||
prev.map((r) =>
|
||||
r.url === relayUrl
|
||||
? { ...r, status: "publishing" as RelayPublishStatus }
|
||||
: r,
|
||||
),
|
||||
);
|
||||
|
||||
// Republish the same signed event
|
||||
await pool.publish([relayUrl], lastPublishedEvent);
|
||||
// Retry via PublishService (skipEventStore since it's already in store)
|
||||
const result = await publishService.retryRelays(lastPublishedEvent, [
|
||||
relayUrl,
|
||||
]);
|
||||
|
||||
// Update status to success
|
||||
setRelayStates((prev) =>
|
||||
prev.map((r) =>
|
||||
r.url === relayUrl
|
||||
? { ...r, status: "success" as RelayStatus, error: undefined }
|
||||
: r,
|
||||
),
|
||||
);
|
||||
|
||||
toast.success(`Published to ${relayUrl.replace(/^wss?:\/\//, "")}`);
|
||||
} catch (error) {
|
||||
console.error(`Failed to retry publish to ${relayUrl}:`, error);
|
||||
if (result.ok) {
|
||||
setRelayStates((prev) =>
|
||||
prev.map((r) =>
|
||||
r.url === relayUrl
|
||||
? {
|
||||
...r,
|
||||
status: "error" as RelayStatus,
|
||||
error:
|
||||
error instanceof Error ? error.message : "Unknown error",
|
||||
status: "success" as RelayPublishStatus,
|
||||
error: undefined,
|
||||
}
|
||||
: r,
|
||||
),
|
||||
);
|
||||
toast.success(`Published to ${relayUrl.replace(/^wss?:\/\//, "")}`);
|
||||
} else {
|
||||
const error = result.failed[0]?.error || "Unknown error";
|
||||
setRelayStates((prev) =>
|
||||
prev.map((r) =>
|
||||
r.url === relayUrl
|
||||
? {
|
||||
...r,
|
||||
status: "error" as RelayPublishStatus,
|
||||
error,
|
||||
}
|
||||
: r,
|
||||
),
|
||||
@@ -409,67 +411,39 @@ export function PostViewer({ windowId }: PostViewerProps = {}) {
|
||||
}
|
||||
|
||||
// Signing succeeded, now publish to relays
|
||||
try {
|
||||
// Store the signed event for potential retries
|
||||
setLastPublishedEvent(event);
|
||||
// Store the signed event for potential retries
|
||||
setLastPublishedEvent(event);
|
||||
|
||||
// Update relay states - set selected to publishing, keep others as pending
|
||||
// Use PublishService with status updates
|
||||
const { updates$, result } = publishService.publishWithUpdates(event, {
|
||||
relays: selected,
|
||||
});
|
||||
|
||||
// Subscribe to per-relay status updates for UI
|
||||
const subscription = updates$.subscribe((update) => {
|
||||
setRelayStates((prev) =>
|
||||
prev.map((r) =>
|
||||
selected.includes(r.url)
|
||||
? { ...r, status: "publishing" as RelayStatus }
|
||||
r.url === update.relay
|
||||
? {
|
||||
...r,
|
||||
status: update.status,
|
||||
error: update.error,
|
||||
}
|
||||
: r,
|
||||
),
|
||||
);
|
||||
});
|
||||
|
||||
// Publish to each relay individually to track status
|
||||
const publishPromises = selected.map(async (relayUrl) => {
|
||||
try {
|
||||
await pool.publish([relayUrl], event);
|
||||
try {
|
||||
// Wait for publish to complete
|
||||
const publishResult = await result;
|
||||
|
||||
// Update status to success
|
||||
setRelayStates((prev) =>
|
||||
prev.map((r) =>
|
||||
r.url === relayUrl
|
||||
? { ...r, status: "success" as RelayStatus }
|
||||
: r,
|
||||
),
|
||||
);
|
||||
return { success: true, relayUrl };
|
||||
} catch (error) {
|
||||
console.error(`Failed to publish to ${relayUrl}:`, error);
|
||||
// Unsubscribe from updates
|
||||
subscription.unsubscribe();
|
||||
|
||||
// Update status to error
|
||||
setRelayStates((prev) =>
|
||||
prev.map((r) =>
|
||||
r.url === relayUrl
|
||||
? {
|
||||
...r,
|
||||
status: "error" as RelayStatus,
|
||||
error:
|
||||
error instanceof Error
|
||||
? error.message
|
||||
: "Unknown error",
|
||||
}
|
||||
: r,
|
||||
),
|
||||
);
|
||||
return { success: false, relayUrl };
|
||||
}
|
||||
});
|
||||
|
||||
// Wait for all publishes to complete (settled = all finished, regardless of success/failure)
|
||||
const results = await Promise.allSettled(publishPromises);
|
||||
|
||||
// Check how many relays succeeded
|
||||
const successCount = results.filter(
|
||||
(r) => r.status === "fulfilled" && r.value.success,
|
||||
).length;
|
||||
|
||||
if (successCount > 0) {
|
||||
// At least one relay succeeded - add to event store
|
||||
eventStore.add(event);
|
||||
const successCount = publishResult.successful.length;
|
||||
|
||||
if (publishResult.ok) {
|
||||
// Clear draft from localStorage
|
||||
if (pubkey) {
|
||||
const draftKey = windowId
|
||||
@@ -501,16 +475,17 @@ export function PostViewer({ windowId }: PostViewerProps = {}) {
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
subscription.unsubscribe();
|
||||
console.error("Failed to publish:", error);
|
||||
toast.error(
|
||||
error instanceof Error ? error.message : "Failed to publish note",
|
||||
);
|
||||
|
||||
// Reset relay states to pending on publishing error
|
||||
// Reset relay states to error on publishing error
|
||||
setRelayStates((prev) =>
|
||||
prev.map((r) => ({
|
||||
...r,
|
||||
status: "error" as RelayStatus,
|
||||
status: "error" as RelayPublishStatus,
|
||||
error: error instanceof Error ? error.message : "Unknown error",
|
||||
})),
|
||||
);
|
||||
@@ -518,7 +493,7 @@ export function PostViewer({ windowId }: PostViewerProps = {}) {
|
||||
setIsPublishing(false);
|
||||
}
|
||||
},
|
||||
[canSign, signer, pubkey, selectedRelays, settings],
|
||||
[canSign, signer, pubkey, selectedRelays, settings, windowId],
|
||||
);
|
||||
|
||||
// Handle file paste
|
||||
@@ -585,7 +560,7 @@ export function PostViewer({ windowId }: PostViewerProps = {}) {
|
||||
// Add to relay states
|
||||
setRelayStates((prev) => [
|
||||
...prev,
|
||||
{ url: normalizedUrl, status: "pending" as RelayStatus },
|
||||
{ url: normalizedUrl, status: "pending" as RelayPublishStatus },
|
||||
]);
|
||||
|
||||
// Select the new relay
|
||||
|
||||
@@ -1,40 +1,30 @@
|
||||
import { ActionRunner } from "applesauce-actions";
|
||||
import eventStore from "./event-store";
|
||||
import { EventFactory } from "applesauce-core/event-factory";
|
||||
import pool from "./relay-pool";
|
||||
import { relayListCache } from "./relay-list-cache";
|
||||
import { getSeenRelays } from "applesauce-core/helpers/relays";
|
||||
import type { NostrEvent } from "nostr-tools/core";
|
||||
import accountManager from "./accounts";
|
||||
import publishService from "./publish-service";
|
||||
|
||||
/**
|
||||
* Publishes a Nostr event to relays using the author's outbox relays
|
||||
* Falls back to seen relays from the event if no relay list found
|
||||
* Publishes a Nostr event to relays using the centralized PublishService
|
||||
*
|
||||
* Relay selection strategy (in priority order):
|
||||
* 1. Author's outbox relays (kind 10002)
|
||||
* 2. User's configured write relays (from Grimoire state)
|
||||
* 3. Seen relays from the event
|
||||
* 4. Aggregator relays (fallback)
|
||||
*
|
||||
* @param event - The signed Nostr event to publish
|
||||
*/
|
||||
export async function publishEvent(event: NostrEvent): Promise<void> {
|
||||
// Try to get author's outbox relays from EventStore (kind 10002)
|
||||
let relays = await relayListCache.getOutboxRelays(event.pubkey);
|
||||
const result = await publishService.publish(event);
|
||||
|
||||
// Fallback to relays from the event itself (where it was seen)
|
||||
if (!relays || relays.length === 0) {
|
||||
const seenRelays = getSeenRelays(event);
|
||||
relays = seenRelays ? Array.from(seenRelays) : [];
|
||||
if (!result.ok) {
|
||||
const errors = result.failed
|
||||
.map((f) => `${f.relay}: ${f.error}`)
|
||||
.join(", ");
|
||||
throw new Error(`Failed to publish to any relay. Errors: ${errors}`);
|
||||
}
|
||||
|
||||
// If still no relays, throw error
|
||||
if (relays.length === 0) {
|
||||
throw new Error(
|
||||
"No relays found for publishing. Please configure relay list (kind 10002) or ensure event has relay hints.",
|
||||
);
|
||||
}
|
||||
|
||||
// Publish to relay pool
|
||||
await pool.publish(relays, event);
|
||||
|
||||
// Add to EventStore for immediate local availability
|
||||
eventStore.add(event);
|
||||
}
|
||||
|
||||
const factory = new EventFactory();
|
||||
@@ -46,7 +36,7 @@ const factory = new EventFactory();
|
||||
* Configured with:
|
||||
* - EventStore: Single source of truth for Nostr events
|
||||
* - EventFactory: Creates and signs events
|
||||
* - publishEvent: Publishes events to author's outbox relays (with fallback to seen relays)
|
||||
* - publishEvent: Publishes events via centralized PublishService
|
||||
*/
|
||||
export const hub = new ActionRunner(eventStore, factory, publishEvent);
|
||||
|
||||
@@ -56,20 +46,26 @@ accountManager.active$.subscribe((account) => {
|
||||
factory.setSigner(account?.signer || undefined);
|
||||
});
|
||||
|
||||
/**
|
||||
* Publishes a Nostr event to specific relays
|
||||
*
|
||||
* @param event - The signed Nostr event to publish
|
||||
* @param relays - Explicit list of relay URLs to publish to
|
||||
*/
|
||||
export async function publishEventToRelays(
|
||||
event: NostrEvent,
|
||||
relays: string[],
|
||||
): Promise<void> {
|
||||
// If no relays, throw error
|
||||
if (relays.length === 0) {
|
||||
throw new Error(
|
||||
"No relays found for publishing. Please configure relay list (kind 10002) or ensure event has relay hints.",
|
||||
);
|
||||
throw new Error("No relays provided for publishing.");
|
||||
}
|
||||
|
||||
// Publish to relay pool
|
||||
await pool.publish(relays, event);
|
||||
const result = await publishService.publishToRelays(event, relays);
|
||||
|
||||
// Add to EventStore for immediate local availability
|
||||
eventStore.add(event);
|
||||
if (!result.ok) {
|
||||
const errors = result.failed
|
||||
.map((f) => `${f.relay}: ${f.error}`)
|
||||
.join(", ");
|
||||
throw new Error(`Failed to publish to any relay. Errors: ${errors}`);
|
||||
}
|
||||
}
|
||||
|
||||
435
src/services/publish-service.ts
Normal file
435
src/services/publish-service.ts
Normal file
@@ -0,0 +1,435 @@
|
||||
/**
|
||||
* Centralized Publish Service
|
||||
*
|
||||
* Provides a unified API for publishing Nostr events with:
|
||||
* - Smart relay selection (outbox + state write relays + hints + fallbacks)
|
||||
* - Per-relay status tracking via RxJS observables
|
||||
* - EventStore integration
|
||||
* - Logging/observability hooks for EventLogService
|
||||
*
|
||||
* All publishing in Grimoire should go through this service.
|
||||
*/
|
||||
|
||||
import { Subject, Observable } from "rxjs";
|
||||
import { filter } from "rxjs/operators";
|
||||
import type { NostrEvent } from "nostr-tools";
|
||||
import { mergeRelaySets, getSeenRelays } from "applesauce-core/helpers";
|
||||
import pool from "./relay-pool";
|
||||
import eventStore from "./event-store";
|
||||
import { relayListCache } from "./relay-list-cache";
|
||||
import { AGGREGATOR_RELAYS } from "./loaders";
|
||||
import { grimoireStateAtom } from "@/core/state";
|
||||
import { getDefaultStore } from "jotai";
|
||||
|
||||
// ============================================================================
|
||||
// Types
|
||||
// ============================================================================
|
||||
|
||||
/** Status of a publish attempt to a single relay */
|
||||
export type RelayPublishStatus = "pending" | "publishing" | "success" | "error";
|
||||
|
||||
/** Per-relay status update */
|
||||
export interface RelayStatusUpdate {
|
||||
/** Unique ID for this publish operation */
|
||||
publishId: string;
|
||||
/** Relay URL */
|
||||
relay: string;
|
||||
/** Current status */
|
||||
status: RelayPublishStatus;
|
||||
/** Error message if status is 'error' */
|
||||
error?: string;
|
||||
/** Timestamp of this status update */
|
||||
timestamp: number;
|
||||
}
|
||||
|
||||
/** Overall publish operation event */
|
||||
export interface PublishEvent {
|
||||
/** Unique ID for this publish operation */
|
||||
id: string;
|
||||
/** The event being published */
|
||||
event: NostrEvent;
|
||||
/** Target relays */
|
||||
relays: string[];
|
||||
/** Timestamp when publish started */
|
||||
startedAt: number;
|
||||
/** Timestamp when publish completed (all relays resolved) */
|
||||
completedAt?: number;
|
||||
/** Per-relay results */
|
||||
results: Map<string, { status: RelayPublishStatus; error?: string }>;
|
||||
}
|
||||
|
||||
/** Result returned from publish operations */
|
||||
export interface PublishResult {
|
||||
/** Unique ID for this publish operation */
|
||||
publishId: string;
|
||||
/** The published event */
|
||||
event: NostrEvent;
|
||||
/** Relays that succeeded */
|
||||
successful: string[];
|
||||
/** Relays that failed with their errors */
|
||||
failed: Array<{ relay: string; error: string }>;
|
||||
/** Whether at least one relay succeeded */
|
||||
ok: boolean;
|
||||
}
|
||||
|
||||
/** Options for publish operations */
|
||||
export interface PublishOptions {
|
||||
/** Explicit relays to publish to (overrides automatic selection) */
|
||||
relays?: string[];
|
||||
/** Additional relay hints to include */
|
||||
relayHints?: string[];
|
||||
/** Skip adding to EventStore after publish */
|
||||
skipEventStore?: boolean;
|
||||
/** Custom publish ID (for retry operations) */
|
||||
publishId?: string;
|
||||
}
|
||||
|
||||
/** Options for relay selection */
|
||||
export interface RelaySelectionOptions {
|
||||
/** Author pubkey for outbox relay lookup */
|
||||
authorPubkey?: string;
|
||||
/** Additional relay hints */
|
||||
relayHints?: string[];
|
||||
/** Include aggregator relays as fallback */
|
||||
includeAggregators?: boolean;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// PublishService Class
|
||||
// ============================================================================
|
||||
|
||||
class PublishService {
|
||||
/** Subject for all publish events (start, complete) */
|
||||
private publishSubject = new Subject<PublishEvent>();
|
||||
|
||||
/** Subject for per-relay status updates */
|
||||
private statusSubject = new Subject<RelayStatusUpdate>();
|
||||
|
||||
/** Active publish operations */
|
||||
private activePublishes = new Map<string, PublishEvent>();
|
||||
|
||||
/** Counter for generating unique publish IDs */
|
||||
private publishCounter = 0;
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
// Public Observables
|
||||
// --------------------------------------------------------------------------
|
||||
|
||||
/** Observable of all publish events */
|
||||
readonly publish$ = this.publishSubject.asObservable();
|
||||
|
||||
/** Observable of all relay status updates */
|
||||
readonly status$ = this.statusSubject.asObservable();
|
||||
|
||||
/**
|
||||
* Get status updates for a specific publish operation
|
||||
*/
|
||||
getStatusUpdates(publishId: string): Observable<RelayStatusUpdate> {
|
||||
return this.status$.pipe(
|
||||
filter((update) => update.publishId === publishId),
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get status updates for a specific relay
|
||||
*/
|
||||
getRelayStatusUpdates(relay: string): Observable<RelayStatusUpdate> {
|
||||
return this.status$.pipe(filter((update) => update.relay === relay));
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
// Relay Selection
|
||||
// --------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Select relays for publishing an event
|
||||
*
|
||||
* Priority order:
|
||||
* 1. Author's outbox relays (kind 10002)
|
||||
* 2. User's configured write relays (from Grimoire state)
|
||||
* 3. Relay hints (seen relays, explicit hints)
|
||||
* 4. Aggregator relays (fallback)
|
||||
*/
|
||||
async selectRelays(options: RelaySelectionOptions = {}): Promise<string[]> {
|
||||
const {
|
||||
authorPubkey,
|
||||
relayHints = [],
|
||||
includeAggregators = true,
|
||||
} = options;
|
||||
|
||||
const relaySets: string[][] = [];
|
||||
|
||||
// 1. Author's outbox relays from kind 10002
|
||||
if (authorPubkey) {
|
||||
const outboxRelays = await relayListCache.getOutboxRelays(authorPubkey);
|
||||
if (outboxRelays && outboxRelays.length > 0) {
|
||||
relaySets.push(outboxRelays);
|
||||
}
|
||||
}
|
||||
|
||||
// 2. User's configured write relays from Grimoire state
|
||||
const store = getDefaultStore();
|
||||
const state = store.get(grimoireStateAtom);
|
||||
const stateWriteRelays =
|
||||
state.activeAccount?.relays?.filter((r) => r.write).map((r) => r.url) ||
|
||||
[];
|
||||
if (stateWriteRelays.length > 0) {
|
||||
relaySets.push(stateWriteRelays);
|
||||
}
|
||||
|
||||
// 3. Relay hints
|
||||
if (relayHints.length > 0) {
|
||||
relaySets.push(relayHints);
|
||||
}
|
||||
|
||||
// 4. Aggregator relays as fallback
|
||||
if (includeAggregators) {
|
||||
relaySets.push(AGGREGATOR_RELAYS);
|
||||
}
|
||||
|
||||
// Merge and deduplicate
|
||||
const merged = mergeRelaySets(...relaySets);
|
||||
|
||||
// If still empty, return aggregators as last resort
|
||||
if (merged.length === 0) {
|
||||
return AGGREGATOR_RELAYS;
|
||||
}
|
||||
|
||||
return merged;
|
||||
}
|
||||
|
||||
/**
|
||||
* Select relays for an event using its metadata
|
||||
*/
|
||||
async selectRelaysForEvent(
|
||||
event: NostrEvent,
|
||||
additionalHints: string[] = [],
|
||||
): Promise<string[]> {
|
||||
// Get seen relays from the event
|
||||
const seenRelays = getSeenRelays(event);
|
||||
const hints = [
|
||||
...additionalHints,
|
||||
...(seenRelays ? Array.from(seenRelays) : []),
|
||||
];
|
||||
|
||||
return this.selectRelays({
|
||||
authorPubkey: event.pubkey,
|
||||
relayHints: hints,
|
||||
includeAggregators: true,
|
||||
});
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
// Publish Methods
|
||||
// --------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Generate a unique publish ID
|
||||
*/
|
||||
private generatePublishId(): string {
|
||||
return `pub_${Date.now()}_${++this.publishCounter}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish an event and return a Promise with the result
|
||||
*
|
||||
* This is the main publish method - use this for simple fire-and-forget publishing.
|
||||
*/
|
||||
async publish(
|
||||
event: NostrEvent,
|
||||
options: PublishOptions = {},
|
||||
): Promise<PublishResult> {
|
||||
const publishId = options.publishId || this.generatePublishId();
|
||||
const startedAt = Date.now();
|
||||
|
||||
// Determine target relays
|
||||
let relays: string[];
|
||||
if (options.relays && options.relays.length > 0) {
|
||||
relays = options.relays;
|
||||
} else {
|
||||
relays = await this.selectRelaysForEvent(event, options.relayHints);
|
||||
}
|
||||
|
||||
if (relays.length === 0) {
|
||||
throw new Error(
|
||||
"No relays available for publishing. Please configure relay list or provide relay hints.",
|
||||
);
|
||||
}
|
||||
|
||||
// Initialize publish event
|
||||
const publishEvent: PublishEvent = {
|
||||
id: publishId,
|
||||
event,
|
||||
relays,
|
||||
startedAt,
|
||||
results: new Map(),
|
||||
};
|
||||
this.activePublishes.set(publishId, publishEvent);
|
||||
|
||||
// Emit initial publish event
|
||||
this.publishSubject.next(publishEvent);
|
||||
|
||||
// Emit initial pending status for all relays
|
||||
for (const relay of relays) {
|
||||
publishEvent.results.set(relay, { status: "pending" });
|
||||
this.emitStatus(publishId, relay, "pending");
|
||||
}
|
||||
|
||||
// Publish to each relay individually for status tracking
|
||||
const publishPromises = relays.map(async (relay) => {
|
||||
this.emitStatus(publishId, relay, "publishing");
|
||||
publishEvent.results.set(relay, { status: "publishing" });
|
||||
|
||||
try {
|
||||
await pool.publish([relay], event);
|
||||
publishEvent.results.set(relay, { status: "success" });
|
||||
this.emitStatus(publishId, relay, "success");
|
||||
return { relay, success: true as const };
|
||||
} catch (err) {
|
||||
const error = err instanceof Error ? err.message : "Unknown error";
|
||||
publishEvent.results.set(relay, { status: "error", error });
|
||||
this.emitStatus(publishId, relay, "error", error);
|
||||
return { relay, success: false as const, error };
|
||||
}
|
||||
});
|
||||
|
||||
// Wait for all to complete
|
||||
const results = await Promise.all(publishPromises);
|
||||
|
||||
// Update publish event
|
||||
publishEvent.completedAt = Date.now();
|
||||
this.publishSubject.next(publishEvent);
|
||||
|
||||
// Build result
|
||||
const successful = results.filter((r) => r.success).map((r) => r.relay);
|
||||
const failed = results
|
||||
.filter(
|
||||
(r): r is { relay: string; success: false; error: string } =>
|
||||
!r.success,
|
||||
)
|
||||
.map((r) => ({ relay: r.relay, error: r.error }));
|
||||
|
||||
const result: PublishResult = {
|
||||
publishId,
|
||||
event,
|
||||
successful,
|
||||
failed,
|
||||
ok: successful.length > 0,
|
||||
};
|
||||
|
||||
// Add to EventStore if at least one relay succeeded
|
||||
if (result.ok && !options.skipEventStore) {
|
||||
eventStore.add(event);
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
this.activePublishes.delete(publishId);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish to specific relays (explicit relay list)
|
||||
*
|
||||
* Use this when you know exactly which relays to publish to.
|
||||
*/
|
||||
async publishToRelays(
|
||||
event: NostrEvent,
|
||||
relays: string[],
|
||||
options: Omit<PublishOptions, "relays"> = {},
|
||||
): Promise<PublishResult> {
|
||||
return this.publish(event, { ...options, relays });
|
||||
}
|
||||
|
||||
/**
|
||||
* Retry publishing to specific relays
|
||||
*
|
||||
* Use this to retry failed relays from a previous publish.
|
||||
*/
|
||||
async retryRelays(
|
||||
event: NostrEvent,
|
||||
relays: string[],
|
||||
originalPublishId?: string,
|
||||
): Promise<PublishResult> {
|
||||
return this.publish(event, {
|
||||
relays,
|
||||
publishId: originalPublishId ? `${originalPublishId}_retry` : undefined,
|
||||
skipEventStore: true, // Event should already be in store from original publish
|
||||
});
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
// Observable-based Publishing (for UI with live updates)
|
||||
// --------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Start a publish operation and return an Observable of status updates
|
||||
*
|
||||
* Use this when you need to show per-relay status in the UI.
|
||||
* The Observable completes when all relays have resolved.
|
||||
*/
|
||||
publishWithUpdates(
|
||||
event: NostrEvent,
|
||||
options: PublishOptions = {},
|
||||
): {
|
||||
publishId: string;
|
||||
updates$: Observable<RelayStatusUpdate>;
|
||||
result: Promise<PublishResult>;
|
||||
} {
|
||||
const publishId = options.publishId || this.generatePublishId();
|
||||
|
||||
// Create filtered observable for this publish
|
||||
const updates$ = this.getStatusUpdates(publishId);
|
||||
|
||||
// Start the publish (returns promise)
|
||||
const result = this.publish(event, { ...options, publishId });
|
||||
|
||||
return { publishId, updates$, result };
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// --------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Emit a status update
|
||||
*/
|
||||
private emitStatus(
|
||||
publishId: string,
|
||||
relay: string,
|
||||
status: RelayPublishStatus,
|
||||
error?: string,
|
||||
): void {
|
||||
this.statusSubject.next({
|
||||
publishId,
|
||||
relay,
|
||||
status,
|
||||
error,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get active publish operations
|
||||
*/
|
||||
getActivePublishes(): PublishEvent[] {
|
||||
return Array.from(this.activePublishes.values());
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a publish operation is active
|
||||
*/
|
||||
isPublishing(publishId: string): boolean {
|
||||
return this.activePublishes.has(publishId);
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Singleton Export
|
||||
// ============================================================================
|
||||
|
||||
const publishService = new PublishService();
|
||||
export default publishService;
|
||||
|
||||
// Also export the class for testing
|
||||
export { PublishService };
|
||||
Reference in New Issue
Block a user