more logutil.Trace (#12177)

This commit is contained in:
Michael Yang
2025-09-03 17:24:39 -07:00
committed by GitHub
parent fb92b61754
commit b3e6120736

View File

@@ -429,12 +429,12 @@ func (s *Server) forwardBatch(pendingBatch batchState) (nextBatch batchState, er
// before setting up the next batch so the seqs inputs are ready to receive their // before setting up the next batch so the seqs inputs are ready to receive their
// token values and we get the correct input pointers for the batchInputs // token values and we get the correct input pointers for the batchInputs
if pendingBatch.ctx != nil { if pendingBatch.ctx != nil {
slog.Log(context.TODO(), logutil.LevelTrace, "forwardBatch waiting for compute to start", "pendingBatch.id", pendingBatch.id) logutil.Trace("forwardBatch waiting for compute to start", "pendingBatch.id", pendingBatch.id)
<-pendingBatch.computeStartedCh <-pendingBatch.computeStartedCh
slog.Log(context.TODO(), logutil.LevelTrace, "forwardBatch compute started, setting up next batch", "pendingBatch.id", pendingBatch.id, "id", s.batchID) logutil.Trace("forwardBatch compute started, setting up next batch", "pendingBatch.id", pendingBatch.id, "id", s.batchID)
nextBatch.inputsReadyCh = pendingBatch.outputsReadyCh // Chain the outputs from the pending batch to the next inputs batch nextBatch.inputsReadyCh = pendingBatch.outputsReadyCh // Chain the outputs from the pending batch to the next inputs batch
} else { } else {
slog.Log(context.TODO(), logutil.LevelTrace, "forwardBatch no pending batch detected", "batchID", s.batchID) logutil.Trace("forwardBatch no pending batch detected", "batchID", s.batchID)
// No pendingBatch, so the inputs will be ready in the seqs immediately // No pendingBatch, so the inputs will be ready in the seqs immediately
nextBatch.inputsReadyCh = make(chan struct{}, 1) nextBatch.inputsReadyCh = make(chan struct{}, 1)
nextBatch.inputsReadyCh <- struct{}{} nextBatch.inputsReadyCh <- struct{}{}
@@ -546,7 +546,7 @@ func (s *Server) forwardBatch(pendingBatch batchState) (nextBatch batchState, er
if i+1 == len(seq.inputs) { if i+1 == len(seq.inputs) {
batch.Outputs = append(batch.Outputs, int32(len(batchInputs)-1)) batch.Outputs = append(batch.Outputs, int32(len(batchInputs)-1))
} }
slog.Log(context.TODO(), logutil.LevelTrace, "forwardBatch iBatch", "batchID", s.batchID, "seqIdx", seqIdx, "seq.iBatch", seq.iBatch, "i+1", i+1, "len(seq.inputs)", len(seq.inputs)) logutil.Trace("forwardBatch iBatch", "batchID", s.batchID, "seqIdx", seqIdx, "seq.iBatch", seq.iBatch, "i+1", i+1, "len(seq.inputs)", len(seq.inputs))
seq.pendingInputs = append(seq.pendingInputs, inp) seq.pendingInputs = append(seq.pendingInputs, inp)
} }
@@ -560,7 +560,7 @@ func (s *Server) forwardBatch(pendingBatch batchState) (nextBatch batchState, er
} }
if len(batchInputs) == 0 { if len(batchInputs) == 0 {
slog.Log(context.TODO(), logutil.LevelTrace, "forwardBatch no batchInputs, going idle", "batchID", s.batchID) logutil.Trace("forwardBatch no batchInputs, going idle", "batchID", s.batchID)
nextBatch.ctx.Close() nextBatch.ctx.Close()
nextBatch.ctx = nil nextBatch.ctx = nil
return return
@@ -589,14 +589,14 @@ func (s *Server) computeBatch(activeBatch batchState) {
defer activeBatch.ctx.Close() defer activeBatch.ctx.Close()
// Wait until inputs are ready // Wait until inputs are ready
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: waiting for inputs to be ready", "batchID", activeBatch.id) logutil.Trace("computeBatch: waiting for inputs to be ready", "batchID", activeBatch.id)
<-activeBatch.inputsReadyCh <-activeBatch.inputsReadyCh
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: inputs are ready", "batchID", activeBatch.id) logutil.Trace("computeBatch: inputs are ready", "batchID", activeBatch.id)
// Once we complete, signal the next batch of inputs are ready // Once we complete, signal the next batch of inputs are ready
// This will unblock the next computeBatch, or forwardBatch if new seqs come in // This will unblock the next computeBatch, or forwardBatch if new seqs come in
defer func() { defer func() {
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: outputs are ready", "batchID", activeBatch.id) logutil.Trace("computeBatch: outputs are ready", "batchID", activeBatch.id)
activeBatch.outputsReadyCh <- struct{}{} activeBatch.outputsReadyCh <- struct{}{}
}() }()
@@ -626,7 +626,7 @@ func (s *Server) computeBatch(activeBatch batchState) {
// Detect if the sequence we're processing has already been completed and replaced // Detect if the sequence we're processing has already been completed and replaced
// with a new sequence // with a new sequence
if seq != activeBatch.seqs[i] { if seq != activeBatch.seqs[i] {
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: sequence replaced, discarding its results", "batchID", activeBatch.id, "seqIdx", i) logutil.Trace("computeBatch: sequence replaced, discarding its results", "batchID", activeBatch.id, "seqIdx", i)
continue continue
} }
@@ -666,18 +666,18 @@ func (s *Server) computeBatch(activeBatch batchState) {
activeBatch.batch.Inputs.SetValueFromIntSlice(batchInputs) activeBatch.batch.Inputs.SetValueFromIntSlice(batchInputs)
activeBatch.ctx.ComputeWithNotify( activeBatch.ctx.ComputeWithNotify(
func() { func() {
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: signaling computeStartedCh", "batchID", activeBatch.id) logutil.Trace("computeBatch: signaling computeStartedCh", "batchID", activeBatch.id)
activeBatch.computeStartedCh <- struct{}{} activeBatch.computeStartedCh <- struct{}{}
}, },
activeBatch.modelOutput) activeBatch.modelOutput)
logits := activeBatch.modelOutput.Floats() logits := activeBatch.modelOutput.Floats()
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: logits ready", "batchID", activeBatch.id) logutil.Trace("computeBatch: logits ready", "batchID", activeBatch.id)
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: decoding", "batchID", activeBatch.id) logutil.Trace("computeBatch: decoding", "batchID", activeBatch.id)
for i, seq := range s.seqs { for i, seq := range s.seqs {
if seq == nil || nextBatchTokens[i] == nil { if seq == nil || nextBatchTokens[i] == nil {
continue continue
@@ -697,7 +697,7 @@ func (s *Server) computeBatch(activeBatch batchState) {
// sample a token // sample a token
vocabSize := len(logits) / len(activeBatch.batch.Outputs) vocabSize := len(logits) / len(activeBatch.batch.Outputs)
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: vocab details", "batchID", activeBatch.id, "seqIdx", i, "len(logits)", len(logits), "len(activeBatch.batch.Outputs)", len(activeBatch.batch.Outputs), "vocabSize", vocabSize, "iBatches", iBatches) logutil.Trace("computeBatch: vocab details", "batchID", activeBatch.id, "seqIdx", i, "len(logits)", len(logits), "len(activeBatch.batch.Outputs)", len(activeBatch.batch.Outputs), "vocabSize", vocabSize, "iBatches", iBatches)
token, err := seq.sampler.Sample(logits[iBatches[i]*vocabSize : (iBatches[i]+1)*vocabSize]) token, err := seq.sampler.Sample(logits[iBatches[i]*vocabSize : (iBatches[i]+1)*vocabSize])
if err != nil { if err != nil {
s.hardErrCh <- fmt.Errorf("failed to sample token: %w", err) s.hardErrCh <- fmt.Errorf("failed to sample token: %w", err)
@@ -711,7 +711,7 @@ func (s *Server) computeBatch(activeBatch batchState) {
// TODO (jmorganca): we should send this back // TODO (jmorganca): we should send this back
// as it's important for the /api/generate context // as it's important for the /api/generate context
// seq.responses <- piece // seq.responses <- piece
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: EOS", "batchID", activeBatch.id, "seqIdx", i) logutil.Trace("computeBatch: EOS", "batchID", activeBatch.id, "seqIdx", i)
s.removeSequence(i, llm.DoneReasonStop) s.removeSequence(i, llm.DoneReasonStop)
continue continue
} }