181 lines
5.0 KiB
Go
181 lines
5.0 KiB
Go
package trending
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"git.highperfocused.tech/highperfocused/lumina-relay/relay/cache"
|
|
)
|
|
|
|
type Post struct {
|
|
ID string `json:"id"`
|
|
PubKey string `json:"pubkey"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
Kind string `json:"kind"`
|
|
Content string `json:"content"`
|
|
Tags [][]string `json:"tags"`
|
|
ReactionCount int `json:"reaction_count"`
|
|
}
|
|
|
|
var (
|
|
trendingCache = cache.New()
|
|
cacheDuration = 35 * time.Minute // Slightly longer than update interval
|
|
updateInterval = 30 * time.Minute
|
|
)
|
|
|
|
// Initialize sets up the trending system
|
|
// - Creates database tables
|
|
// - Performs initial calculation
|
|
// - Sets up periodic updates
|
|
func Initialize(db *sql.DB) error {
|
|
// Create necessary database tables
|
|
if err := CreateTablesIfNotExist(db); err != nil {
|
|
return fmt.Errorf("failed to create trending database tables: %w", err)
|
|
}
|
|
|
|
// Perform initial calculation
|
|
if _, err := calculateTrendingKind20(db); err != nil {
|
|
fmt.Printf("Error in initial trending calculation: %v\n", err)
|
|
}
|
|
|
|
// Set up periodic updates
|
|
go func() {
|
|
ticker := time.NewTicker(updateInterval)
|
|
defer ticker.Stop()
|
|
|
|
for range ticker.C {
|
|
if _, err := calculateTrendingKind20(db); err != nil {
|
|
fmt.Printf("Error updating trending data: %v\n", err)
|
|
} else {
|
|
fmt.Printf("Successfully updated trending data at %s\n", time.Now().Format(time.RFC3339))
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetTrendingKind20 returns the top 20 trending posts of kind 20 from the last 24 hours
|
|
// It first tries the in-memory cache, then the database, and calculates on-demand if needed
|
|
func GetTrendingKind20(db *sql.DB) ([]Post, error) {
|
|
// Try in-memory cache first
|
|
if cached, ok := trendingCache.Get("trending_kind_20"); ok {
|
|
return cached.([]Post), nil
|
|
}
|
|
|
|
// If not in memory, try getting from database
|
|
posts, calculationTime, err := GetLatestTrendingFromHistory(db, 20)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// If we got data from the database and it's not too old, use it
|
|
if len(posts) > 0 && time.Since(calculationTime) < cacheDuration {
|
|
// Update in-memory cache
|
|
trendingCache.Set("trending_kind_20", posts, cacheDuration)
|
|
return posts, nil
|
|
}
|
|
|
|
// Calculate on-demand as a last resort
|
|
return calculateTrendingKind20(db)
|
|
}
|
|
|
|
// calculateTrendingKind20 performs the actual calculation of trending posts,
|
|
// updates the cache, and saves to database
|
|
func calculateTrendingKind20(db *sql.DB) ([]Post, error) {
|
|
query := `
|
|
WITH reactions AS (
|
|
SELECT
|
|
tags_expanded.value->1 #>> '{}' AS original_event_id,
|
|
COUNT(*) as reaction_count
|
|
FROM event e
|
|
CROSS JOIN LATERAL jsonb_array_elements(tags) as tags_expanded(value)
|
|
WHERE e.kind::text = '7'
|
|
AND e.created_at >= extract(epoch from now() - interval '24 hours')::bigint
|
|
AND tags_expanded.value->0 #>> '{}' = 'e'
|
|
GROUP BY tags_expanded.value->1 #>> '{}'
|
|
)
|
|
SELECT
|
|
e.id,
|
|
e.pubkey,
|
|
to_timestamp(e.created_at) as created_at,
|
|
e.kind,
|
|
e.content,
|
|
e.tags,
|
|
COALESCE(r.reaction_count, 0) as reaction_count
|
|
FROM event e
|
|
LEFT JOIN reactions r ON e.id = r.original_event_id
|
|
WHERE e.kind::text = '20'
|
|
AND e.created_at >= extract(epoch from now() - interval '24 hours')::bigint
|
|
ORDER BY reaction_count DESC, e.created_at DESC
|
|
LIMIT 20
|
|
`
|
|
|
|
rows, err := db.Query(query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var trendingPosts []Post
|
|
for rows.Next() {
|
|
var post Post
|
|
var tagsJSON []byte
|
|
if err := rows.Scan(&post.ID, &post.PubKey, &post.CreatedAt, &post.Kind, &post.Content, &tagsJSON, &post.ReactionCount); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := json.Unmarshal(tagsJSON, &post.Tags); err != nil {
|
|
return nil, err
|
|
}
|
|
trendingPosts = append(trendingPosts, post)
|
|
}
|
|
|
|
// Update in-memory cache
|
|
trendingCache.Set("trending_kind_20", trendingPosts, cacheDuration)
|
|
|
|
// Save to database for historical records
|
|
postsJSON, err := json.Marshal(trendingPosts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal trending posts to JSON: %w", err)
|
|
}
|
|
|
|
_, err = db.Exec(`
|
|
INSERT INTO trending_history (calculation_time, kind, trending_data)
|
|
VALUES (NOW(), 20, $1)
|
|
`, postsJSON)
|
|
|
|
if err != nil {
|
|
fmt.Printf("Warning: failed to store trending data in database: %v\n", err)
|
|
// Don't return error here as we want to still return the trending posts even if saving fails
|
|
}
|
|
|
|
return trendingPosts, nil
|
|
}
|
|
|
|
// UnmarshalPosts unmarshals JSON data into a slice of Post objects
|
|
func UnmarshalPosts(data []byte) ([]Post, error) {
|
|
var posts []Post
|
|
if err := json.Unmarshal(data, &posts); err != nil {
|
|
return nil, err
|
|
}
|
|
return posts, nil
|
|
}
|
|
|
|
// GetTrendingHistoryCount returns the count of trending history entries for the given kind
|
|
func GetTrendingHistoryCount(db *sql.DB, kind int) (int, error) {
|
|
var count int
|
|
err := db.QueryRow(`
|
|
SELECT COUNT(*)
|
|
FROM trending_history
|
|
WHERE kind = $1
|
|
`, kind).Scan(&count)
|
|
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to count trending history: %w", err)
|
|
}
|
|
|
|
return count, nil
|
|
}
|