Adds a byte count in the event per second logger

This commit is contained in:
Vitor Pamplona
2025-05-15 15:33:53 -04:00
parent ea9d25d13b
commit dc6f982ae9

View File

@@ -27,19 +27,30 @@ import com.vitorpamplona.ammolite.relays.Relay
import com.vitorpamplona.quartz.nip01Core.core.Event import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.utils.LargeCache import com.vitorpamplona.quartz.utils.LargeCache
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import kotlin.concurrent.atomics.ExperimentalAtomicApi
import kotlin.concurrent.atomics.plusAssign
import kotlin.concurrent.timer import kotlin.concurrent.timer
import kotlin.jvm.java import kotlin.jvm.java
@OptIn(ExperimentalAtomicApi::class)
class KindGroup( class KindGroup(
var count: AtomicInteger = AtomicInteger(0), var count: AtomicInteger = AtomicInteger(0),
var memory: AtomicLong = AtomicLong(0L),
val subs: LargeCache<String, AtomicInteger> = LargeCache<String, AtomicInteger>(), val subs: LargeCache<String, AtomicInteger> = LargeCache<String, AtomicInteger>(),
val relays: LargeCache<String, AtomicInteger> = LargeCache<String, AtomicInteger>(), val relays: LargeCache<String, AtomicInteger> = LargeCache<String, AtomicInteger>(),
) { ) {
companion object {
const val MB: Long = 1024
}
fun increment( fun increment(
mem: Long,
subId: String, subId: String,
relayUrl: String, relayUrl: String,
) { ) {
count.incrementAndGet() count.incrementAndGet()
memory.addAndGet(mem)
val subStats = subs.get(subId) val subStats = subs.get(subId)
if (subStats != null) { if (subStats != null) {
@@ -58,6 +69,7 @@ class KindGroup(
fun reset() { fun reset() {
count.set(0) count.set(0)
memory.set(0L)
subs.forEach { key, value -> value.set(0) } subs.forEach { key, value -> value.set(0) }
relays.forEach { key, value -> value.set(0) } relays.forEach { key, value -> value.set(0) }
} }
@@ -66,7 +78,7 @@ class KindGroup(
fun printRelays() = relays.joinToString(", ") { key, value -> if (value.get() > 0) "${key.removePrefix("wss://").removeSuffix("/")} ($value)" else "" } fun printRelays() = relays.joinToString(", ") { key, value -> if (value.get() > 0) "${key.removePrefix("wss://").removeSuffix("/")} ($value)" else "" }
override fun toString() = "(${count.get()}); ${printSubs()}; ${printRelays()}" override fun toString() = "(${count.get()} - ${memory.get().div(MB)}kb); ${printSubs()}; ${printRelays()}"
} }
class FrameStat { class FrameStat {
@@ -77,15 +89,16 @@ class FrameStat {
kind: Int, kind: Int,
subId: String, subId: String,
relayUrl: String, relayUrl: String,
memory: Long,
) { ) {
eventCount.incrementAndGet() eventCount.incrementAndGet()
val kindGroup = kinds.get(kind) val kindGroup = kinds.get(kind)
if (kindGroup != null) { if (kindGroup != null) {
kindGroup.increment(subId, relayUrl) kindGroup.increment(memory, subId, relayUrl)
} else { } else {
val group = KindGroup(AtomicInteger(0)) val group = KindGroup()
group.increment(subId, relayUrl) group.increment(memory, subId, relayUrl)
kinds.put(kind, group) kinds.put(kind, group)
} }
} }
@@ -141,7 +154,7 @@ class RelaySpeedLogger(
arrivalTime: Long, arrivalTime: Long,
afterEOSE: Boolean, afterEOSE: Boolean,
) { ) {
current.increment(event.kind, subscriptionId, relay.url) current.increment(event.kind, subscriptionId, relay.url, event.countMemory())
} }
} }