Compare commits
6 Commits
main
...
background
Author | SHA1 | Date | |
---|---|---|---|
|
bab736aa07 | ||
|
a5d6cc7e2e | ||
|
94439ee30b | ||
|
f98b118168 | ||
|
4aac6b6afa | ||
|
cc103043b4 |
2
.gitignore
vendored
2
.gitignore
vendored
@ -1 +1 @@
|
|||||||
postgres/
|
postgres**/
|
349
relay/main.go
349
relay/main.go
@ -5,14 +5,22 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.highperfocused.tech/highperfocused/lumina-relay/relay/cache"
|
||||||
"git.highperfocused.tech/highperfocused/lumina-relay/relay/trending"
|
"git.highperfocused.tech/highperfocused/lumina-relay/relay/trending"
|
||||||
"github.com/fiatjaf/eventstore/postgresql"
|
"github.com/fiatjaf/eventstore/postgresql"
|
||||||
"github.com/fiatjaf/khatru"
|
"github.com/fiatjaf/khatru"
|
||||||
"github.com/fiatjaf/khatru/policies"
|
"github.com/fiatjaf/khatru/policies"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Cache for storing generic data like event counts
|
||||||
|
var dataCache = cache.New()
|
||||||
|
|
||||||
|
const eventCountCacheKey = "total_event_count"
|
||||||
|
const eventCountCacheDuration = 1 * time.Minute
|
||||||
|
|
||||||
func getEnv(key, fallback string) string {
|
func getEnv(key, fallback string) string {
|
||||||
if value, ok := os.LookupEnv(key); ok {
|
if value, ok := os.LookupEnv(key); ok {
|
||||||
return value
|
return value
|
||||||
@ -20,6 +28,43 @@ func getEnv(key, fallback string) string {
|
|||||||
return fallback
|
return fallback
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Gets total event count, using cache if available
|
||||||
|
func getTotalEventCount(db *postgresql.PostgresBackend) (int, error) {
|
||||||
|
// Try getting from cache first
|
||||||
|
if cachedCount, ok := dataCache.Get(eventCountCacheKey); ok {
|
||||||
|
return cachedCount.(int), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// If not in cache, query the database
|
||||||
|
count := 0
|
||||||
|
row := db.DB.QueryRow("SELECT COUNT(*) FROM event")
|
||||||
|
if err := row.Scan(&count); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the cache
|
||||||
|
dataCache.Set(eventCountCacheKey, count, eventCountCacheDuration)
|
||||||
|
return count, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Updates event count in the background periodically
|
||||||
|
func startEventCountUpdater(db *postgresql.PostgresBackend) {
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(eventCountCacheDuration)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for range ticker.C {
|
||||||
|
count := 0
|
||||||
|
row := db.DB.QueryRow("SELECT COUNT(*) FROM event")
|
||||||
|
if err := row.Scan(&count); err != nil {
|
||||||
|
fmt.Printf("Error updating event count: %v\n", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
dataCache.Set(eventCountCacheKey, count, eventCountCacheDuration)
|
||||||
|
fmt.Printf("Updated event count cache: %d events\n", count)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
fmt.Print(`
|
fmt.Print(`
|
||||||
LUMINA RELAY
|
LUMINA RELAY
|
||||||
@ -48,6 +93,20 @@ func main() {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Initialize trending system to start background calculations
|
||||||
|
fmt.Println("Initializing trending system...")
|
||||||
|
if err := trending.Initialize(db.DB.DB); err != nil {
|
||||||
|
fmt.Printf("Warning: Error initializing trending system: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize event count cache and start periodic updates
|
||||||
|
fmt.Println("Initializing event count cache...")
|
||||||
|
_, err := getTotalEventCount(&db)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Warning: Error initializing event count cache: %v\n", err)
|
||||||
|
}
|
||||||
|
startEventCountUpdater(&db)
|
||||||
|
|
||||||
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
|
relay.StoreEvent = append(relay.StoreEvent, db.SaveEvent)
|
||||||
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
|
relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents)
|
||||||
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
|
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
|
||||||
@ -66,11 +125,11 @@ func main() {
|
|||||||
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("content-type", "text/html")
|
w.Header().Set("content-type", "text/html")
|
||||||
|
|
||||||
// Query the total number of events
|
// Get event count from cache
|
||||||
count := 0
|
count, err := getTotalEventCount(&db)
|
||||||
row := db.DB.QueryRow("SELECT COUNT(*) FROM event")
|
if err != nil {
|
||||||
if err := row.Scan(&count); err != nil {
|
fmt.Printf("Error getting event count: %v\n", err)
|
||||||
fmt.Printf("Error counting events: %v\n", err)
|
count = 0 // Fall back to zero if there's an error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Improved HTML content with link to stats page
|
// Improved HTML content with link to stats page
|
||||||
@ -120,6 +179,7 @@ func main() {
|
|||||||
<h1>Welcome to LUMINA Relay!</h1>
|
<h1>Welcome to LUMINA Relay!</h1>
|
||||||
<p>Number of events stored: %d</p>
|
<p>Number of events stored: %d</p>
|
||||||
<p><a href="/stats">View Event Stats</a></p>
|
<p><a href="/stats">View Event Stats</a></p>
|
||||||
|
<p><a href="/trending/history">View Trending History</a></p>
|
||||||
</div>
|
</div>
|
||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
||||||
@ -218,6 +278,13 @@ func main() {
|
|||||||
mux.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
|
mux.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
// Get total count from cache
|
||||||
|
totalCount, err := getTotalEventCount(&db)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, fmt.Sprintf("Error getting event count: %v", err), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Query the number of events for each kind, sorted by kind
|
// Query the number of events for each kind, sorted by kind
|
||||||
rows, err := db.DB.Query("SELECT kind, COUNT(*) FROM event GROUP BY kind ORDER BY kind")
|
rows, err := db.DB.Query("SELECT kind, COUNT(*) FROM event GROUP BY kind ORDER BY kind")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -227,7 +294,6 @@ func main() {
|
|||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
stats := make(map[string]int)
|
stats := make(map[string]int)
|
||||||
totalCount := 0
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var kind string
|
var kind string
|
||||||
var count int
|
var count int
|
||||||
@ -236,7 +302,6 @@ func main() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
stats[kind] = count
|
stats[kind] = count
|
||||||
totalCount += count
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add total count to the stats
|
// Add total count to the stats
|
||||||
@ -269,6 +334,276 @@ func main() {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Add endpoint for trending history
|
||||||
|
mux.HandleFunc("/api/trending/history/kind20", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
// Parse query parameters for pagination
|
||||||
|
limitStr := r.URL.Query().Get("limit")
|
||||||
|
offsetStr := r.URL.Query().Get("offset")
|
||||||
|
|
||||||
|
limit := 10 // Default limit
|
||||||
|
offset := 0 // Default offset
|
||||||
|
|
||||||
|
if limitStr != "" {
|
||||||
|
if val, err := strconv.Atoi(limitStr); err == nil && val > 0 {
|
||||||
|
limit = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if offsetStr != "" {
|
||||||
|
if val, err := strconv.Atoi(offsetStr); err == nil && val >= 0 {
|
||||||
|
offset = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get trending history for kind 20
|
||||||
|
history, err := trending.GetTrendingHistoryForKind(db.DB.DB, 20, limit, offset)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, fmt.Sprintf("Error getting trending history: %v", err), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get total count for pagination info
|
||||||
|
totalCount, err := trending.GetTrendingHistoryCount(db.DB.DB, 20)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, fmt.Sprintf("Error getting trending history count: %v", err), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
response := map[string]interface{}{
|
||||||
|
"history": history,
|
||||||
|
"pagination": map[string]interface{}{
|
||||||
|
"total": totalCount,
|
||||||
|
"limit": limit,
|
||||||
|
"offset": offset,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.NewEncoder(w).Encode(response); err != nil {
|
||||||
|
http.Error(w, fmt.Sprintf("Error encoding JSON: %v", err), http.StatusInternalServerError)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Add UI for trending history
|
||||||
|
mux.HandleFunc("/trending/history", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("content-type", "text/html")
|
||||||
|
|
||||||
|
fmt.Fprintf(w, `
|
||||||
|
<!DOCTYPE html>
|
||||||
|
<html lang="en">
|
||||||
|
<head>
|
||||||
|
<meta charset="UTF-8">
|
||||||
|
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||||
|
<title>LUMINA Relay - Trending History</title>
|
||||||
|
<style>
|
||||||
|
body {
|
||||||
|
font-family: Arial, sans-serif;
|
||||||
|
background-color: #f4f4f4;
|
||||||
|
margin: 0;
|
||||||
|
padding: 20px;
|
||||||
|
}
|
||||||
|
.container {
|
||||||
|
max-width: 1200px;
|
||||||
|
margin: 0 auto;
|
||||||
|
background-color: #fff;
|
||||||
|
padding: 20px;
|
||||||
|
border-radius: 8px;
|
||||||
|
box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
|
||||||
|
}
|
||||||
|
h1, h2 {
|
||||||
|
color: #333;
|
||||||
|
}
|
||||||
|
.history-item {
|
||||||
|
margin-bottom: 30px;
|
||||||
|
padding: 15px;
|
||||||
|
border: 1px solid #ddd;
|
||||||
|
border-radius: 5px;
|
||||||
|
}
|
||||||
|
.history-date {
|
||||||
|
font-weight: bold;
|
||||||
|
margin-bottom: 10px;
|
||||||
|
color: #555;
|
||||||
|
}
|
||||||
|
.posts-container {
|
||||||
|
display: grid;
|
||||||
|
grid-template-columns: repeat(auto-fill, minmax(300px, 1fr));
|
||||||
|
gap: 15px;
|
||||||
|
}
|
||||||
|
.post {
|
||||||
|
border: 1px solid #eee;
|
||||||
|
padding: 10px;
|
||||||
|
border-radius: 5px;
|
||||||
|
}
|
||||||
|
.post-content {
|
||||||
|
max-height: 100px;
|
||||||
|
overflow: hidden;
|
||||||
|
margin-bottom: 10px;
|
||||||
|
}
|
||||||
|
.post-id {
|
||||||
|
max-height: 100px;
|
||||||
|
overflow: hidden;
|
||||||
|
margin-bottom: 10px;
|
||||||
|
}
|
||||||
|
.post-reactions {
|
||||||
|
font-weight: bold;
|
||||||
|
color: #007bff;
|
||||||
|
}
|
||||||
|
.pagination {
|
||||||
|
margin-top: 20px;
|
||||||
|
display: flex;
|
||||||
|
justify-content: center;
|
||||||
|
}
|
||||||
|
.pagination a {
|
||||||
|
margin: 0 5px;
|
||||||
|
padding: 8px 12px;
|
||||||
|
border: 1px solid #ddd;
|
||||||
|
color: #007bff;
|
||||||
|
text-decoration: none;
|
||||||
|
border-radius: 3px;
|
||||||
|
}
|
||||||
|
.pagination a:hover {
|
||||||
|
background-color: #f8f8f8;
|
||||||
|
}
|
||||||
|
.loading {
|
||||||
|
text-align: center;
|
||||||
|
padding: 20px;
|
||||||
|
font-style: italic;
|
||||||
|
color: #777;
|
||||||
|
}
|
||||||
|
</style>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<div class="container">
|
||||||
|
<h1>Trending History</h1>
|
||||||
|
<p>Archive of trending posts calculations</p>
|
||||||
|
|
||||||
|
<div id="history-container">
|
||||||
|
<div class="loading">Loading trending history data...</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<div id="pagination" class="pagination"></div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<script>
|
||||||
|
let currentOffset = 0;
|
||||||
|
const limit = 5;
|
||||||
|
let totalItems = 0;
|
||||||
|
|
||||||
|
// Format date for display
|
||||||
|
function formatDate(dateStr) {
|
||||||
|
const date = new Date(dateStr);
|
||||||
|
return date.toLocaleString();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Truncate content for preview
|
||||||
|
function truncateContent(content, maxLength = 100) {
|
||||||
|
if (content.length <= maxLength) return content;
|
||||||
|
return content.substr(0, maxLength) + '...';
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load trending history data
|
||||||
|
async function loadTrendingHistory(offset = 0) {
|
||||||
|
try {
|
||||||
|
const response = await fetch('/api/trending/history/kind20?limit=' + limit + '&offset=' + offset);
|
||||||
|
const data = await response.json();
|
||||||
|
|
||||||
|
if (!response.ok) throw new Error(data.message || 'Error loading trending history');
|
||||||
|
|
||||||
|
totalItems = data.pagination.total;
|
||||||
|
currentOffset = offset;
|
||||||
|
|
||||||
|
renderTrendingHistory(data.history);
|
||||||
|
renderPagination();
|
||||||
|
} catch (error) {
|
||||||
|
document.getElementById('history-container').innerHTML =
|
||||||
|
'<div class="error">Error loading trending history: ' + error.message + '</div>';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Render trending history
|
||||||
|
function renderTrendingHistory(historyItems) {
|
||||||
|
const container = document.getElementById('history-container');
|
||||||
|
|
||||||
|
if (historyItems.length === 0) {
|
||||||
|
container.innerHTML = '<p>No trending history data available yet.</p>';
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let html = '';
|
||||||
|
|
||||||
|
historyItems.forEach(item => {
|
||||||
|
html += '<div class="history-item">' +
|
||||||
|
'<div class="history-date">' +
|
||||||
|
'Calculated on: ' + formatDate(item.calculation_time) +
|
||||||
|
'</div>' +
|
||||||
|
'<div class="posts-container">';
|
||||||
|
|
||||||
|
item.posts.forEach(post => {
|
||||||
|
html += '<div class="post">' +
|
||||||
|
'<div class="post-id">' + truncateContent(post.id) + '</div><hr />' +
|
||||||
|
'<div class="post-content">' + truncateContent(post.content) + '</div>' +
|
||||||
|
'<div class="post-reactions">Reactions: ' + post.reaction_count + '</div>' +
|
||||||
|
'</div>';
|
||||||
|
});
|
||||||
|
|
||||||
|
html += '</div></div>';
|
||||||
|
});
|
||||||
|
|
||||||
|
container.innerHTML = html;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Render pagination controls
|
||||||
|
function renderPagination() {
|
||||||
|
const container = document.getElementById('pagination');
|
||||||
|
const totalPages = Math.ceil(totalItems / limit);
|
||||||
|
const currentPage = Math.floor(currentOffset / limit) + 1;
|
||||||
|
|
||||||
|
let html = '';
|
||||||
|
|
||||||
|
if (totalPages <= 1) {
|
||||||
|
container.innerHTML = '';
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Previous button
|
||||||
|
if (currentPage > 1) {
|
||||||
|
html += '<a href="#" onclick="loadTrendingHistory(' + ((currentPage - 2) * limit) + '); return false;">Previous</a>';
|
||||||
|
}
|
||||||
|
|
||||||
|
// Page numbers
|
||||||
|
const maxPagesToShow = 5;
|
||||||
|
let startPage = Math.max(1, currentPage - Math.floor(maxPagesToShow / 2));
|
||||||
|
let endPage = Math.min(totalPages, startPage + maxPagesToShow - 1);
|
||||||
|
|
||||||
|
if (endPage - startPage + 1 < maxPagesToShow) {
|
||||||
|
startPage = Math.max(1, endPage - maxPagesToShow + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (let i = startPage; i <= endPage; i++) {
|
||||||
|
const offset = (i - 1) * limit;
|
||||||
|
const active = i === currentPage ? 'active' : '';
|
||||||
|
html += '<a href="#" class="' + active + '" onclick="loadTrendingHistory(' + offset + '); return false;">' + i + '</a>';
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next button
|
||||||
|
if (currentPage < totalPages) {
|
||||||
|
html += '<a href="#" onclick="loadTrendingHistory(' + (currentPage * limit) + '); return false;">Next</a>';
|
||||||
|
}
|
||||||
|
|
||||||
|
container.innerHTML = html;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initial load
|
||||||
|
document.addEventListener('DOMContentLoaded', () => {
|
||||||
|
loadTrendingHistory();
|
||||||
|
});
|
||||||
|
</script>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
`)
|
||||||
|
})
|
||||||
|
|
||||||
fmt.Println("running on :3334")
|
fmt.Println("running on :3334")
|
||||||
http.ListenAndServe(":3334", relay)
|
http.ListenAndServe(":3334", relay)
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package trending
|
|||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.highperfocused.tech/highperfocused/lumina-relay/relay/cache"
|
"git.highperfocused.tech/highperfocused/lumina-relay/relay/cache"
|
||||||
@ -19,16 +20,71 @@ type Post struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
trendingCache = cache.New()
|
trendingCache = cache.New()
|
||||||
cacheDuration = 5 * time.Minute
|
cacheDuration = 35 * time.Minute // Slightly longer than update interval
|
||||||
|
updateInterval = 30 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Initialize sets up the trending system
|
||||||
|
// - Creates database tables
|
||||||
|
// - Performs initial calculation
|
||||||
|
// - Sets up periodic updates
|
||||||
|
func Initialize(db *sql.DB) error {
|
||||||
|
// Create necessary database tables
|
||||||
|
if err := CreateTablesIfNotExist(db); err != nil {
|
||||||
|
return fmt.Errorf("failed to create trending database tables: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform initial calculation
|
||||||
|
if _, err := calculateTrendingKind20(db); err != nil {
|
||||||
|
fmt.Printf("Error in initial trending calculation: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set up periodic updates
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(updateInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for range ticker.C {
|
||||||
|
if _, err := calculateTrendingKind20(db); err != nil {
|
||||||
|
fmt.Printf("Error updating trending data: %v\n", err)
|
||||||
|
} else {
|
||||||
|
fmt.Printf("Successfully updated trending data at %s\n", time.Now().Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetTrendingKind20 returns the top 20 trending posts of kind 20 from the last 24 hours
|
// GetTrendingKind20 returns the top 20 trending posts of kind 20 from the last 24 hours
|
||||||
|
// It first tries the in-memory cache, then the database, and calculates on-demand if needed
|
||||||
func GetTrendingKind20(db *sql.DB) ([]Post, error) {
|
func GetTrendingKind20(db *sql.DB) ([]Post, error) {
|
||||||
|
// Try in-memory cache first
|
||||||
if cached, ok := trendingCache.Get("trending_kind_20"); ok {
|
if cached, ok := trendingCache.Get("trending_kind_20"); ok {
|
||||||
return cached.([]Post), nil
|
return cached.([]Post), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If not in memory, try getting from database
|
||||||
|
posts, calculationTime, err := GetLatestTrendingFromHistory(db, 20)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we got data from the database and it's not too old, use it
|
||||||
|
if len(posts) > 0 && time.Since(calculationTime) < cacheDuration {
|
||||||
|
// Update in-memory cache
|
||||||
|
trendingCache.Set("trending_kind_20", posts, cacheDuration)
|
||||||
|
return posts, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate on-demand as a last resort
|
||||||
|
return calculateTrendingKind20(db)
|
||||||
|
}
|
||||||
|
|
||||||
|
// calculateTrendingKind20 performs the actual calculation of trending posts,
|
||||||
|
// updates the cache, and saves to database
|
||||||
|
func calculateTrendingKind20(db *sql.DB) ([]Post, error) {
|
||||||
query := `
|
query := `
|
||||||
WITH reactions AS (
|
WITH reactions AS (
|
||||||
SELECT
|
SELECT
|
||||||
@ -76,6 +132,49 @@ func GetTrendingKind20(db *sql.DB) ([]Post, error) {
|
|||||||
trendingPosts = append(trendingPosts, post)
|
trendingPosts = append(trendingPosts, post)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update in-memory cache
|
||||||
trendingCache.Set("trending_kind_20", trendingPosts, cacheDuration)
|
trendingCache.Set("trending_kind_20", trendingPosts, cacheDuration)
|
||||||
|
|
||||||
|
// Save to database for historical records
|
||||||
|
postsJSON, err := json.Marshal(trendingPosts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to marshal trending posts to JSON: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = db.Exec(`
|
||||||
|
INSERT INTO trending_history (calculation_time, kind, trending_data)
|
||||||
|
VALUES (NOW(), 20, $1)
|
||||||
|
`, postsJSON)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Warning: failed to store trending data in database: %v\n", err)
|
||||||
|
// Don't return error here as we want to still return the trending posts even if saving fails
|
||||||
|
}
|
||||||
|
|
||||||
return trendingPosts, nil
|
return trendingPosts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UnmarshalPosts unmarshals JSON data into a slice of Post objects
|
||||||
|
func UnmarshalPosts(data []byte) ([]Post, error) {
|
||||||
|
var posts []Post
|
||||||
|
if err := json.Unmarshal(data, &posts); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return posts, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetTrendingHistoryCount returns the count of trending history entries for the given kind
|
||||||
|
func GetTrendingHistoryCount(db *sql.DB, kind int) (int, error) {
|
||||||
|
var count int
|
||||||
|
err := db.QueryRow(`
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM trending_history
|
||||||
|
WHERE kind = $1
|
||||||
|
`, kind).Scan(&count)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to count trending history: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return count, nil
|
||||||
|
}
|
||||||
|
140
relay/trending/schema.go
Normal file
140
relay/trending/schema.go
Normal file
@ -0,0 +1,140 @@
|
|||||||
|
package trending
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Schema version to track database migrations
|
||||||
|
const SchemaVersion = 1
|
||||||
|
|
||||||
|
// CreateTablesIfNotExist ensures that all necessary database tables for the trending system exist
|
||||||
|
func CreateTablesIfNotExist(db *sql.DB) error {
|
||||||
|
// Create the trending_history table if it doesn't exist
|
||||||
|
_, err := db.Exec(`
|
||||||
|
CREATE TABLE IF NOT EXISTS trending_history (
|
||||||
|
id SERIAL PRIMARY KEY,
|
||||||
|
calculation_time TIMESTAMPTZ NOT NULL,
|
||||||
|
kind INTEGER NOT NULL,
|
||||||
|
trending_data JSONB NOT NULL
|
||||||
|
)
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create trending_history table: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create an index on calculation_time and kind for faster queries
|
||||||
|
_, err = db.Exec(`
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_trending_history_time_kind
|
||||||
|
ON trending_history (calculation_time DESC, kind)
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create index on trending_history: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create schema version table if it doesn't exist
|
||||||
|
_, err = db.Exec(`
|
||||||
|
CREATE TABLE IF NOT EXISTS trending_schema_version (
|
||||||
|
version INTEGER PRIMARY KEY,
|
||||||
|
updated_at TIMESTAMPTZ NOT NULL
|
||||||
|
)
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create trending_schema_version table: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we need to initialize the schema version
|
||||||
|
var count int
|
||||||
|
err = db.QueryRow(`SELECT COUNT(*) FROM trending_schema_version`).Scan(&count)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to check trending_schema_version: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if count == 0 {
|
||||||
|
_, err = db.Exec(`
|
||||||
|
INSERT INTO trending_schema_version (version, updated_at)
|
||||||
|
VALUES ($1, $2)
|
||||||
|
`, SchemaVersion, time.Now())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to initialize trending_schema_version: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLatestTrendingFromHistory retrieves the most recent trending data for the specified kind
|
||||||
|
func GetLatestTrendingFromHistory(db *sql.DB, kind int) ([]Post, time.Time, error) {
|
||||||
|
var (
|
||||||
|
trendingData []byte
|
||||||
|
calculationTime time.Time
|
||||||
|
)
|
||||||
|
|
||||||
|
err := db.QueryRow(`
|
||||||
|
SELECT trending_data, calculation_time
|
||||||
|
FROM trending_history
|
||||||
|
WHERE kind = $1
|
||||||
|
ORDER BY calculation_time DESC
|
||||||
|
LIMIT 1
|
||||||
|
`, kind).Scan(&trendingData, &calculationTime)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return []Post{}, time.Time{}, nil
|
||||||
|
}
|
||||||
|
return nil, time.Time{}, fmt.Errorf("failed to get latest trending data: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
posts, err := UnmarshalPosts(trendingData)
|
||||||
|
if err != nil {
|
||||||
|
return nil, time.Time{}, fmt.Errorf("failed to unmarshal trending posts: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return posts, calculationTime, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetTrendingHistoryForKind retrieves trending history for the specified kind
|
||||||
|
// limit defines how many records to return, offset is for pagination
|
||||||
|
func GetTrendingHistoryForKind(db *sql.DB, kind int, limit, offset int) ([]TrendingHistoryEntry, error) {
|
||||||
|
rows, err := db.Query(`
|
||||||
|
SELECT id, calculation_time, trending_data
|
||||||
|
FROM trending_history
|
||||||
|
WHERE kind = $1
|
||||||
|
ORDER BY calculation_time DESC
|
||||||
|
LIMIT $2 OFFSET $3
|
||||||
|
`, kind, limit, offset)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to query trending history: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var entries []TrendingHistoryEntry
|
||||||
|
for rows.Next() {
|
||||||
|
var (
|
||||||
|
entry TrendingHistoryEntry
|
||||||
|
trendingData []byte
|
||||||
|
)
|
||||||
|
|
||||||
|
err := rows.Scan(&entry.ID, &entry.CalculationTime, &trendingData)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to scan trending history entry: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
entry.Posts, err = UnmarshalPosts(trendingData)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to unmarshal trending posts: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
entries = append(entries, entry)
|
||||||
|
}
|
||||||
|
|
||||||
|
return entries, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TrendingHistoryEntry represents a historical record of trending data
|
||||||
|
type TrendingHistoryEntry struct {
|
||||||
|
ID int `json:"id"`
|
||||||
|
CalculationTime time.Time `json:"calculation_time"`
|
||||||
|
Posts []Post `json:"posts"`
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user