diff --git a/relay/main.go b/relay/main.go index c3c1fcf..3fc8a6f 100644 --- a/relay/main.go +++ b/relay/main.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "os" + "strconv" "time" "git.highperfocused.tech/highperfocused/lumina-relay/relay/trending" @@ -50,7 +51,9 @@ func main() { // Initialize trending system to start background calculations fmt.Println("Initializing trending system...") - trending.Initialize(db.DB.DB) + if err := trending.Initialize(db.DB.DB); err != nil { + fmt.Printf("Warning: Error initializing trending system: %v\n", err) + } relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent) relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents) @@ -273,6 +276,270 @@ func main() { } }) + // Add endpoint for trending history + mux.HandleFunc("/api/trending/history/kind20", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + // Parse query parameters for pagination + limitStr := r.URL.Query().Get("limit") + offsetStr := r.URL.Query().Get("offset") + + limit := 10 // Default limit + offset := 0 // Default offset + + if limitStr != "" { + if val, err := strconv.Atoi(limitStr); err == nil && val > 0 { + limit = val + } + } + + if offsetStr != "" { + if val, err := strconv.Atoi(offsetStr); err == nil && val >= 0 { + offset = val + } + } + + // Get trending history for kind 20 + history, err := trending.GetTrendingHistoryForKind(db.DB.DB, 20, limit, offset) + if err != nil { + http.Error(w, fmt.Sprintf("Error getting trending history: %v", err), http.StatusInternalServerError) + return + } + + // Get total count for pagination info + totalCount, err := trending.GetTrendingHistoryCount(db.DB.DB, 20) + if err != nil { + http.Error(w, fmt.Sprintf("Error getting trending history count: %v", err), http.StatusInternalServerError) + return + } + + response := map[string]interface{}{ + "history": history, + "pagination": map[string]interface{}{ + "total": totalCount, + "limit": limit, + "offset": offset, + }, + } + + if err := json.NewEncoder(w).Encode(response); err != nil { + http.Error(w, fmt.Sprintf("Error encoding JSON: %v", err), http.StatusInternalServerError) + } + }) + + // Add UI for trending history + mux.HandleFunc("/trending/history", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "text/html") + + fmt.Fprintf(w, ` + + + + + + LUMINA Relay - Trending History + + + +
+

Trending History

+

Archive of trending posts calculations

+ +
+
Loading trending history data...
+
+ + +
+ + + + + `) + }) + fmt.Println("running on :3334") http.ListenAndServe(":3334", relay) } diff --git a/relay/trending/kinds.go b/relay/trending/kinds.go index 96be3f9..00ccd68 100644 --- a/relay/trending/kinds.go +++ b/relay/trending/kinds.go @@ -25,9 +25,16 @@ var ( updateInterval = 30 * time.Minute ) -// Initialize sets up the background updating of trending calculations -// This should be called once at application startup -func Initialize(db *sql.DB) { +// Initialize sets up the trending system +// - Creates database tables +// - Performs initial calculation +// - Sets up periodic updates +func Initialize(db *sql.DB) error { + // Create necessary database tables + if err := CreateTablesIfNotExist(db); err != nil { + return fmt.Errorf("failed to create trending database tables: %w", err) + } + // Perform initial calculation if _, err := calculateTrendingKind20(db); err != nil { fmt.Printf("Error in initial trending calculation: %v\n", err) @@ -46,21 +53,37 @@ func Initialize(db *sql.DB) { } } }() + + return nil } // GetTrendingKind20 returns the top 20 trending posts of kind 20 from the last 24 hours -// It returns cached results that are updated periodically in the background +// It first tries the in-memory cache, then the database, and calculates on-demand if needed func GetTrendingKind20(db *sql.DB) ([]Post, error) { + // Try in-memory cache first if cached, ok := trendingCache.Get("trending_kind_20"); ok { return cached.([]Post), nil } - // If cache is empty (which shouldn't happen with background updates), - // calculate on demand + // If not in memory, try getting from database + posts, calculationTime, err := GetLatestTrendingFromHistory(db, 20) + if err != nil { + return nil, err + } + + // If we got data from the database and it's not too old, use it + if len(posts) > 0 && time.Since(calculationTime) < cacheDuration { + // Update in-memory cache + trendingCache.Set("trending_kind_20", posts, cacheDuration) + return posts, nil + } + + // Calculate on-demand as a last resort return calculateTrendingKind20(db) } -// calculateTrendingKind20 performs the actual calculation and updates the cache +// calculateTrendingKind20 performs the actual calculation of trending posts, +// updates the cache, and saves to database func calculateTrendingKind20(db *sql.DB) ([]Post, error) { query := ` WITH reactions AS ( @@ -109,6 +132,49 @@ func calculateTrendingKind20(db *sql.DB) ([]Post, error) { trendingPosts = append(trendingPosts, post) } + // Update in-memory cache trendingCache.Set("trending_kind_20", trendingPosts, cacheDuration) + + // Save to database for historical records + postsJSON, err := json.Marshal(trendingPosts) + if err != nil { + return nil, fmt.Errorf("failed to marshal trending posts to JSON: %w", err) + } + + _, err = db.Exec(` + INSERT INTO trending_history (calculation_time, kind, trending_data) + VALUES (NOW(), 20, $1) + `, postsJSON) + + if err != nil { + fmt.Printf("Warning: failed to store trending data in database: %v\n", err) + // Don't return error here as we want to still return the trending posts even if saving fails + } + return trendingPosts, nil } + +// UnmarshalPosts unmarshals JSON data into a slice of Post objects +func UnmarshalPosts(data []byte) ([]Post, error) { + var posts []Post + if err := json.Unmarshal(data, &posts); err != nil { + return nil, err + } + return posts, nil +} + +// GetTrendingHistoryCount returns the count of trending history entries for the given kind +func GetTrendingHistoryCount(db *sql.DB, kind int) (int, error) { + var count int + err := db.QueryRow(` + SELECT COUNT(*) + FROM trending_history + WHERE kind = $1 + `, kind).Scan(&count) + + if err != nil { + return 0, fmt.Errorf("failed to count trending history: %w", err) + } + + return count, nil +} diff --git a/relay/trending/schema.go b/relay/trending/schema.go new file mode 100644 index 0000000..40d1724 --- /dev/null +++ b/relay/trending/schema.go @@ -0,0 +1,140 @@ +package trending + +import ( + "database/sql" + "fmt" + "time" +) + +// Schema version to track database migrations +const SchemaVersion = 1 + +// CreateTablesIfNotExist ensures that all necessary database tables for the trending system exist +func CreateTablesIfNotExist(db *sql.DB) error { + // Create the trending_history table if it doesn't exist + _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS trending_history ( + id SERIAL PRIMARY KEY, + calculation_time TIMESTAMPTZ NOT NULL, + kind INTEGER NOT NULL, + trending_data JSONB NOT NULL + ) + `) + if err != nil { + return fmt.Errorf("failed to create trending_history table: %w", err) + } + + // Create an index on calculation_time and kind for faster queries + _, err = db.Exec(` + CREATE INDEX IF NOT EXISTS idx_trending_history_time_kind + ON trending_history (calculation_time DESC, kind) + `) + if err != nil { + return fmt.Errorf("failed to create index on trending_history: %w", err) + } + + // Create schema version table if it doesn't exist + _, err = db.Exec(` + CREATE TABLE IF NOT EXISTS trending_schema_version ( + version INTEGER PRIMARY KEY, + updated_at TIMESTAMPTZ NOT NULL + ) + `) + if err != nil { + return fmt.Errorf("failed to create trending_schema_version table: %w", err) + } + + // Check if we need to initialize the schema version + var count int + err = db.QueryRow(`SELECT COUNT(*) FROM trending_schema_version`).Scan(&count) + if err != nil { + return fmt.Errorf("failed to check trending_schema_version: %w", err) + } + + if count == 0 { + _, err = db.Exec(` + INSERT INTO trending_schema_version (version, updated_at) + VALUES ($1, $2) + `, SchemaVersion, time.Now()) + if err != nil { + return fmt.Errorf("failed to initialize trending_schema_version: %w", err) + } + } + + return nil +} + +// GetLatestTrendingFromHistory retrieves the most recent trending data for the specified kind +func GetLatestTrendingFromHistory(db *sql.DB, kind int) ([]Post, time.Time, error) { + var ( + trendingData []byte + calculationTime time.Time + ) + + err := db.QueryRow(` + SELECT trending_data, calculation_time + FROM trending_history + WHERE kind = $1 + ORDER BY calculation_time DESC + LIMIT 1 + `, kind).Scan(&trendingData, &calculationTime) + + if err != nil { + if err == sql.ErrNoRows { + return []Post{}, time.Time{}, nil + } + return nil, time.Time{}, fmt.Errorf("failed to get latest trending data: %w", err) + } + + posts, err := UnmarshalPosts(trendingData) + if err != nil { + return nil, time.Time{}, fmt.Errorf("failed to unmarshal trending posts: %w", err) + } + + return posts, calculationTime, nil +} + +// GetTrendingHistoryForKind retrieves trending history for the specified kind +// limit defines how many records to return, offset is for pagination +func GetTrendingHistoryForKind(db *sql.DB, kind int, limit, offset int) ([]TrendingHistoryEntry, error) { + rows, err := db.Query(` + SELECT id, calculation_time, trending_data + FROM trending_history + WHERE kind = $1 + ORDER BY calculation_time DESC + LIMIT $2 OFFSET $3 + `, kind, limit, offset) + if err != nil { + return nil, fmt.Errorf("failed to query trending history: %w", err) + } + defer rows.Close() + + var entries []TrendingHistoryEntry + for rows.Next() { + var ( + entry TrendingHistoryEntry + trendingData []byte + ) + + err := rows.Scan(&entry.ID, &entry.CalculationTime, &trendingData) + if err != nil { + return nil, fmt.Errorf("failed to scan trending history entry: %w", err) + } + + entry.Posts, err = UnmarshalPosts(trendingData) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal trending posts: %w", err) + } + + entries = append(entries, entry) + } + + return entries, nil +} + +// TrendingHistoryEntry represents a historical record of trending data +type TrendingHistoryEntry struct { + ID int `json:"id"` + CalculationTime time.Time `json:"calculation_time"` + Posts []Post `json:"posts"` +} \ No newline at end of file