diff --git a/.gitignore b/.gitignore
index 06a75af..86523a7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1 @@
-postgres/
\ No newline at end of file
+postgres**/
\ No newline at end of file
diff --git a/relay/main.go b/relay/main.go
index be77293..3547137 100644
--- a/relay/main.go
+++ b/relay/main.go
@@ -5,14 +5,22 @@ import (
 	"fmt"
 	"net/http"
 	"os"
+	"strconv"
 	"time"
 
+	"git.highperfocused.tech/highperfocused/lumina-relay/relay/cache"
 	"git.highperfocused.tech/highperfocused/lumina-relay/relay/trending"
 	"github.com/fiatjaf/eventstore/postgresql"
 	"github.com/fiatjaf/khatru"
 	"github.com/fiatjaf/khatru/policies"
 )
 
+// Cache for storing generic data like event counts
+var dataCache = cache.New()
+
+const eventCountCacheKey = "total_event_count"
+const eventCountCacheDuration = 1 * time.Minute
+
 func getEnv(key, fallback string) string {
 	if value, ok := os.LookupEnv(key); ok {
 		return value
@@ -20,6 +28,43 @@ func getEnv(key, fallback string) string {
 	return fallback
 }
 
+// Gets total event count, using cache if available
+func getTotalEventCount(db *postgresql.PostgresBackend) (int, error) {
+	// Try getting from cache first
+	if cachedCount, ok := dataCache.Get(eventCountCacheKey); ok {
+		return cachedCount.(int), nil
+	}
+
+	// If not in cache, query the database
+	count := 0
+	row := db.DB.QueryRow("SELECT COUNT(*) FROM event")
+	if err := row.Scan(&count); err != nil {
+		return 0, err
+	}
+
+	// Update the cache
+	dataCache.Set(eventCountCacheKey, count, eventCountCacheDuration)
+	return count, nil
+}
+
+// Updates event count in the background periodically
+func startEventCountUpdater(db *postgresql.PostgresBackend) {
+	go func() {
+		ticker := time.NewTicker(eventCountCacheDuration)
+		defer ticker.Stop()
+		for range ticker.C {
+			count := 0
+			row := db.DB.QueryRow("SELECT COUNT(*) FROM event")
+			if err := row.Scan(&count); err != nil {
+				fmt.Printf("Error updating event count: %v\n", err)
+				continue
+			}
+			dataCache.Set(eventCountCacheKey, count, eventCountCacheDuration)
+			fmt.Printf("Updated event count cache: %d events\n", count)
+		}
+	}()
+}
+
 func main() {
 	fmt.Print(`
     LUMINA RELAY
@@ -48,6 +93,20 @@ func main() {
 		panic(err)
 	}
 
+	// Initialize trending system to start background calculations
+	fmt.Println("Initializing trending system...")
+	if err := trending.Initialize(db.DB.DB); err != nil {
+		fmt.Printf("Warning: Error initializing trending system: %v\n", err)
+	}
+
+	// Initialize event count cache and start periodic updates
+	fmt.Println("Initializing event count cache...")
+	_, err := getTotalEventCount(&db)
+	if err != nil {
+		fmt.Printf("Warning: Error initializing event count cache: %v\n", err)
+	}
+	startEventCountUpdater(&db)
+
 	relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
 	relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
 	relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
@@ -66,11 +125,11 @@ func main() {
 	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
 		w.Header().Set("content-type", "text/html")
 
-		// Query the total number of events
-		count := 0
-		row := db.DB.QueryRow("SELECT COUNT(*) FROM event")
-		if err := row.Scan(&count); err != nil {
-			fmt.Printf("Error counting events: %v\n", err)
+		// Get event count from cache
+		count, err := getTotalEventCount(&db)
+		if err != nil {
+			fmt.Printf("Error getting event count: %v\n", err)
+			count = 0 // Fall back to zero if there's an error
 		}
 
 		// Improved HTML content with link to stats page
@@ -120,6 +179,7 @@ func main() {
 [landing-page HTML context: "Welcome to LUMINA Relay!", "Number of events stored: %d", and a "View Event Stats" link; the markup itself is not preserved here]
+[added HTML line: a "View Trending History" link to the new /trending/history page]
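The relay/cache package pulled in above is not part of this diff, so its exact API is not visible here. A minimal sketch, assuming a plain TTL cache, of the interface that cache.New(), Get, and Set(key, value, duration) imply at these call sites; the type names and internals below are assumptions, not the real implementation:

package cache

import (
	"sync"
	"time"
)

// entry pairs a cached value with its expiry time.
type entry struct {
	value     interface{}
	expiresAt time.Time
}

// Cache is a small TTL cache that must be safe for concurrent use,
// since the relay reads it from request handlers while background
// goroutines write to it.
type Cache struct {
	mu      sync.RWMutex
	entries map[string]entry
}

// New returns an empty cache.
func New() *Cache {
	return &Cache{entries: make(map[string]entry)}
}

// Get returns the value for key if it exists and has not expired.
func (c *Cache) Get(key string) (interface{}, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	e, ok := c.entries[key]
	if !ok || time.Now().After(e.expiresAt) {
		return nil, false
	}
	return e.value, true
}

// Set stores value under key for the given duration.
func (c *Cache) Set(key string, value interface{}, d time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[key] = entry{value: value, expiresAt: time.Now().Add(d)}
}

Whatever the real implementation looks like, it has to tolerate this concurrent access pattern: startEventCountUpdater writes from its own goroutine while every request handler reads.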

@@ -218,6 +278,13 @@ func main() {
 	mux.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
 		w.Header().Set("Content-Type", "application/json")
 
+		// Get total count from cache
+		totalCount, err := getTotalEventCount(&db)
+		if err != nil {
+			http.Error(w, fmt.Sprintf("Error getting event count: %v", err), http.StatusInternalServerError)
+			return
+		}
+
 		// Query the number of events for each kind, sorted by kind
 		rows, err := db.DB.Query("SELECT kind, COUNT(*) FROM event GROUP BY kind ORDER BY kind")
 		if err != nil {
@@ -227,7 +294,6 @@
 		defer rows.Close()
 
 		stats := make(map[string]int)
-		totalCount := 0
 		for rows.Next() {
 			var kind string
 			var count int
@@ -236,7 +302,6 @@
 				return
 			}
 			stats[kind] = count
-			totalCount += count
 		}
 
 		// Add total count to the stats
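For reference, a hedged sketch of a client reading /api/stats after this change. The endpoint path and the :3334 port come from this file, but the JSON key that carries the total is added by unchanged code outside the hunk, so "total" below is an assumption:

// Illustrative client only; assumes every value in the response is an
// integer count and that the total is exposed under an assumed "total" key.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	resp, err := http.Get("http://localhost:3334/api/stats")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Per-kind counts keyed by the kind number as a string.
	stats := make(map[string]int)
	if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
		panic(err)
	}
	fmt.Println("kind 20 events:", stats["20"])
	fmt.Println("total events (assumed key):", stats["total"])
}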
@@ -269,6 +334,276 @@ func main() {
 		}
 	})
 
+	// Add endpoint for trending history
+	mux.HandleFunc("/api/trending/history/kind20", func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+
+		// Parse query parameters for pagination
+		limitStr := r.URL.Query().Get("limit")
+		offsetStr := r.URL.Query().Get("offset")
+
+		limit := 10 // Default limit
+		offset := 0 // Default offset
+
+		if limitStr != "" {
+			if val, err := strconv.Atoi(limitStr); err == nil && val > 0 {
+				limit = val
+			}
+		}
+
+		if offsetStr != "" {
+			if val, err := strconv.Atoi(offsetStr); err == nil && val >= 0 {
+				offset = val
+			}
+		}
+
+		// Get trending history for kind 20
+		history, err := trending.GetTrendingHistoryForKind(db.DB.DB, 20, limit, offset)
+		if err != nil {
+			http.Error(w, fmt.Sprintf("Error getting trending history: %v", err), http.StatusInternalServerError)
+			return
+		}
+
+		// Get total count for pagination info
+		totalCount, err := trending.GetTrendingHistoryCount(db.DB.DB, 20)
+		if err != nil {
+			http.Error(w, fmt.Sprintf("Error getting trending history count: %v", err), http.StatusInternalServerError)
+			return
+		}
+
+		response := map[string]interface{}{
+			"history": history,
+			"pagination": map[string]interface{}{
+				"total":  totalCount,
+				"limit":  limit,
+				"offset": offset,
+			},
+		}
+
+		if err := json.NewEncoder(w).Encode(response); err != nil {
+			http.Error(w, fmt.Sprintf("Error encoding JSON: %v", err), http.StatusInternalServerError)
+		}
+	})
+
+	// Add UI for trending history
+	mux.HandleFunc("/trending/history", func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("content-type", "text/html")
+
+		fmt.Fprintf(w, `
+[trending-history page HTML: the markup, styles, and script are not preserved here; recoverable text includes the title "LUMINA Relay - Trending History", the heading "Trending History", the subtitle "Archive of trending posts calculations", and a "Loading trending history data..." placeholder shown while the page loads its data]
+`)
+	})
+
 	fmt.Println("running on :3334")
 	http.ListenAndServe(":3334", relay)
 }
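A hedged client-side sketch of paging through the new history endpoint. The path, the limit/offset parameters, and the history/pagination layout come from the handler above, and the entry fields mirror the TrendingHistoryEntry JSON tags added later in this diff; the fields of Post are not visible here, so the posts payload is kept as raw JSON, and the host/port simply mirror the relay's :3334 listener:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// historyEntry mirrors trending.TrendingHistoryEntry's JSON tags; the Post
// fields are not shown in this diff, so posts stay as raw JSON.
type historyEntry struct {
	ID              int             `json:"id"`
	CalculationTime time.Time       `json:"calculation_time"`
	Posts           json.RawMessage `json:"posts"`
}

type historyResponse struct {
	History    []historyEntry `json:"history"`
	Pagination struct {
		Total  int `json:"total"`
		Limit  int `json:"limit"`
		Offset int `json:"offset"`
	} `json:"pagination"`
}

func main() {
	// Fetch the second page of five archived calculations.
	url := "http://localhost:3334/api/trending/history/kind20?limit=5&offset=5"
	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var page historyResponse
	if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
		panic(err)
	}
	for _, e := range page.History {
		fmt.Printf("calculation %d at %s\n", e.ID, e.CalculationTime.Format(time.RFC3339))
	}
	fmt.Printf("showing %d of %d archived calculations\n", len(page.History), page.Pagination.Total)
}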
diff --git a/relay/trending/kinds.go b/relay/trending/kinds.go
index ca394f9..e3a27d8 100644
--- a/relay/trending/kinds.go
+++ b/relay/trending/kinds.go
@@ -3,6 +3,7 @@ package trending
 import (
 	"database/sql"
 	"encoding/json"
+	"fmt"
 	"time"
 	"git.highperfocused.tech/highperfocused/lumina-relay/relay/cache"
 )
@@ -19,16 +20,71 @@ type Post struct {
 }
 
 var (
-	trendingCache = cache.New()
-	cacheDuration = 5 * time.Minute
+	trendingCache  = cache.New()
+	cacheDuration  = 35 * time.Minute // Slightly longer than update interval
+	updateInterval = 30 * time.Minute
 )
 
+// Initialize sets up the trending system
+// - Creates database tables
+// - Performs initial calculation
+// - Sets up periodic updates
+func Initialize(db *sql.DB) error {
+	// Create necessary database tables
+	if err := CreateTablesIfNotExist(db); err != nil {
+		return fmt.Errorf("failed to create trending database tables: %w", err)
+	}
+
+	// Perform initial calculation
+	if _, err := calculateTrendingKind20(db); err != nil {
+		fmt.Printf("Error in initial trending calculation: %v\n", err)
+	}
+
+	// Set up periodic updates
+	go func() {
+		ticker := time.NewTicker(updateInterval)
+		defer ticker.Stop()
+
+		for range ticker.C {
+			if _, err := calculateTrendingKind20(db); err != nil {
+				fmt.Printf("Error updating trending data: %v\n", err)
+			} else {
+				fmt.Printf("Successfully updated trending data at %s\n", time.Now().Format(time.RFC3339))
+			}
+		}
+	}()
+
+	return nil
+}
+
 // GetTrendingKind20 returns the top 20 trending posts of kind 20 from the last 24 hours
+// It first tries the in-memory cache, then the database, and calculates on-demand if needed
 func GetTrendingKind20(db *sql.DB) ([]Post, error) {
+	// Try in-memory cache first
 	if cached, ok := trendingCache.Get("trending_kind_20"); ok {
 		return cached.([]Post), nil
 	}
 
+	// If not in memory, try getting from database
+	posts, calculationTime, err := GetLatestTrendingFromHistory(db, 20)
+	if err != nil {
+		return nil, err
+	}
+
+	// If we got data from the database and it's not too old, use it
+	if len(posts) > 0 && time.Since(calculationTime) < cacheDuration {
+		// Update in-memory cache
+		trendingCache.Set("trending_kind_20", posts, cacheDuration)
+		return posts, nil
+	}
+
+	// Calculate on-demand as a last resort
+	return calculateTrendingKind20(db)
+}
+
+// calculateTrendingKind20 performs the actual calculation of trending posts,
+// updates the cache, and saves to database
+func calculateTrendingKind20(db *sql.DB) ([]Post, error) {
 	query := `
 		WITH reactions AS (
 			SELECT
@@ -76,6 +132,49 @@ func GetTrendingKind20(db *sql.DB) ([]Post, error) {
 		trendingPosts = append(trendingPosts, post)
 	}
 
+	// Update in-memory cache
 	trendingCache.Set("trending_kind_20", trendingPosts, cacheDuration)
+
+	// Save to database for historical records
+	postsJSON, err := json.Marshal(trendingPosts)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal trending posts to JSON: %w", err)
+	}
+
+	_, err = db.Exec(`
+		INSERT INTO trending_history (calculation_time, kind, trending_data)
+		VALUES (NOW(), 20, $1)
+	`, postsJSON)
+
+	if err != nil {
+		fmt.Printf("Warning: failed to store trending data in database: %v\n", err)
+		// Don't return error here as we want to still return the trending posts even if saving fails
+	}
+
 	return trendingPosts, nil
 }
+
+// UnmarshalPosts unmarshals JSON data into a slice of Post objects
+func UnmarshalPosts(data []byte) ([]Post, error) {
+	var posts []Post
+	if err := json.Unmarshal(data, &posts); err != nil {
+		return nil, err
+	}
+	return posts, nil
+}
+
+// GetTrendingHistoryCount returns the count of trending history entries for the given kind
+func GetTrendingHistoryCount(db *sql.DB, kind int) (int, error) {
+	var count int
+	err := db.QueryRow(`
+		SELECT COUNT(*)
+		FROM trending_history
+		WHERE kind = $1
+	`, kind).Scan(&count)
+
+	if err != nil {
+		return 0, fmt.Errorf("failed to count trending history: %w", err)
+	}
+
+	return count, nil
+}
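The route that serves the current (non-archived) trending list is not shown in this diff; purely as a sketch, a handler like the one below could sit in front of GetTrendingKind20 and inherit the memory, then trending_history, then on-demand fallback added above. The /api/trending/kind20 path and the wiring are hypothetical:

package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"net/http"

	"git.highperfocused.tech/highperfocused/lumina-relay/relay/trending"
)

// registerCurrentTrending is illustrative only; the route name is an
// assumption, and only trending.GetTrendingKind20 comes from this diff.
func registerCurrentTrending(mux *http.ServeMux, db *sql.DB) {
	mux.HandleFunc("/api/trending/kind20", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")

		// Served from the in-memory cache when warm, from trending_history
		// right after a restart, and calculated on demand only if both are
		// missing or stale.
		posts, err := trending.GetTrendingKind20(db)
		if err != nil {
			http.Error(w, fmt.Sprintf("Error getting trending posts: %v", err), http.StatusInternalServerError)
			return
		}
		if err := json.NewEncoder(w).Encode(posts); err != nil {
			http.Error(w, fmt.Sprintf("Error encoding JSON: %v", err), http.StatusInternalServerError)
		}
	})
}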
diff --git a/relay/trending/schema.go b/relay/trending/schema.go
new file mode 100644
index 0000000..4970701
--- /dev/null
+++ b/relay/trending/schema.go
@@ -0,0 +1,140 @@
+package trending
+
+import (
+	"database/sql"
+	"fmt"
+	"time"
+)
+
+// Schema version to track database migrations
+const SchemaVersion = 1
+
+// CreateTablesIfNotExist ensures that all necessary database tables for the trending system exist
+func CreateTablesIfNotExist(db *sql.DB) error {
+	// Create the trending_history table if it doesn't exist
+	_, err := db.Exec(`
+		CREATE TABLE IF NOT EXISTS trending_history (
+			id SERIAL PRIMARY KEY,
+			calculation_time TIMESTAMPTZ NOT NULL,
+			kind INTEGER NOT NULL,
+			trending_data JSONB NOT NULL
+		)
+	`)
+	if err != nil {
+		return fmt.Errorf("failed to create trending_history table: %w", err)
+	}
+
+	// Create an index on calculation_time and kind for faster queries
+	_, err = db.Exec(`
+		CREATE INDEX IF NOT EXISTS idx_trending_history_time_kind
+		ON trending_history (calculation_time DESC, kind)
+	`)
+	if err != nil {
+		return fmt.Errorf("failed to create index on trending_history: %w", err)
+	}
+
+	// Create schema version table if it doesn't exist
+	_, err = db.Exec(`
+		CREATE TABLE IF NOT EXISTS trending_schema_version (
+			version INTEGER PRIMARY KEY,
+			updated_at TIMESTAMPTZ NOT NULL
+		)
+	`)
+	if err != nil {
+		return fmt.Errorf("failed to create trending_schema_version table: %w", err)
+	}
+
+	// Check if we need to initialize the schema version
+	var count int
+	err = db.QueryRow(`SELECT COUNT(*) FROM trending_schema_version`).Scan(&count)
+	if err != nil {
+		return fmt.Errorf("failed to check trending_schema_version: %w", err)
+	}
+
+	if count == 0 {
+		_, err = db.Exec(`
+			INSERT INTO trending_schema_version (version, updated_at)
+			VALUES ($1, $2)
+		`, SchemaVersion, time.Now())
+		if err != nil {
+			return fmt.Errorf("failed to initialize trending_schema_version: %w", err)
+		}
+	}
+
+	return nil
+}
+
+// GetLatestTrendingFromHistory retrieves the most recent trending data for the specified kind
+func GetLatestTrendingFromHistory(db *sql.DB, kind int) ([]Post, time.Time, error) {
+	var (
+		trendingData    []byte
+		calculationTime time.Time
+	)
+
+	err := db.QueryRow(`
+		SELECT trending_data, calculation_time
+		FROM trending_history
+		WHERE kind = $1
+		ORDER BY calculation_time DESC
+		LIMIT 1
+	`, kind).Scan(&trendingData, &calculationTime)
+
+	if err != nil {
+		if err == sql.ErrNoRows {
+			return []Post{}, time.Time{}, nil
+		}
+		return nil, time.Time{}, fmt.Errorf("failed to get latest trending data: %w", err)
+	}
+
+	posts, err := UnmarshalPosts(trendingData)
+	if err != nil {
+		return nil, time.Time{}, fmt.Errorf("failed to unmarshal trending posts: %w", err)
+	}
+
+	return posts, calculationTime, nil
+}
+
+// GetTrendingHistoryForKind retrieves trending history for the specified kind
+// limit defines how many records to return, offset is for pagination
+func GetTrendingHistoryForKind(db *sql.DB, kind int, limit, offset int) ([]TrendingHistoryEntry, error) {
+	rows, err := db.Query(`
+		SELECT id, calculation_time, trending_data
+		FROM trending_history
+		WHERE kind = $1
+		ORDER BY calculation_time DESC
+		LIMIT $2 OFFSET $3
+	`, kind, limit, offset)
+	if err != nil {
+		return nil, fmt.Errorf("failed to query trending history: %w", err)
+	}
+	defer rows.Close()
+
+	var entries []TrendingHistoryEntry
+	for rows.Next() {
+		var (
+			entry        TrendingHistoryEntry
+			trendingData []byte
+		)
+
+		err := rows.Scan(&entry.ID, &entry.CalculationTime, &trendingData)
+		if err != nil {
+			return nil, fmt.Errorf("failed to scan trending history entry: %w", err)
+		}
+
+		entry.Posts, err = UnmarshalPosts(trendingData)
+		if err != nil {
+			return nil, fmt.Errorf("failed to unmarshal trending posts: %w", err)
+		}
+
+		entries = append(entries, entry)
+	}
+
+	return entries, nil
+}
+
+// TrendingHistoryEntry represents a historical record of trending data
+type TrendingHistoryEntry struct {
+	ID              int       `json:"id"`
+	CalculationTime time.Time `json:"calculation_time"`
+	Posts           []Post    `json:"posts"`
+}
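Finally, a hedged sketch of how the new schema helpers compose outside the relay process, for example in a small inspection tool. The DSN and the lib/pq driver import are placeholders; only CreateTablesIfNotExist, GetLatestTrendingFromHistory, and GetTrendingHistoryForKind come from this diff:

package main

import (
	"database/sql"
	"fmt"
	"log"

	"git.highperfocused.tech/highperfocused/lumina-relay/relay/trending"
	_ "github.com/lib/pq" // placeholder driver; use whatever the relay's database uses
)

func main() {
	// Placeholder DSN; point this at the same database the relay writes to.
	db, err := sql.Open("postgres", "postgres://user:pass@localhost/lumina?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Safe to call repeatedly: all statements are IF NOT EXISTS.
	if err := trending.CreateTablesIfNotExist(db); err != nil {
		log.Fatal(err)
	}

	// Latest snapshot for kind 20.
	posts, calculatedAt, err := trending.GetLatestTrendingFromHistory(db, 20)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("latest calculation (%s) has %d posts\n", calculatedAt, len(posts))

	// First page of the archive, newest first.
	entries, err := trending.GetTrendingHistoryForKind(db, 20, 10, 0)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("showing %d archived calculations\n", len(entries))
}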