runner: Release semaphore and improve error messages on failures

If we hit an error after creating a new sequence but before
finding a slot for it, we return without releasing the semaphore.
Each occurrence permanently reduces the number of sequences we can
run in parallel and, once every permit has leaked, leads to deadlock.

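In practice this should never happen because once we have acquired
the semaphore, we should always be able to find a slot. However, the
code is clearly not correct.

In addition, a failure to acquire the semaphore (other than the client
closing the connection) is now returned to the client as an HTTP 500
error instead of only being logged.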
This commit is contained in:
Jesse Gross
2025-03-14 17:24:46 -07:00
committed by Jesse Gross
parent 5d097277ef
commit b2a465296d
2 changed files with 9 additions and 3 deletions

View File

@ -599,7 +599,7 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) {
if errors.Is(err, context.Canceled) { if errors.Is(err, context.Canceled) {
slog.Info("aborting completion request due to client closing the connection") slog.Info("aborting completion request due to client closing the connection")
} else { } else {
slog.Error("Failed to acquire semaphore", "error", err) http.Error(w, fmt.Sprintf("Failed to acquire semaphore: %v", err), http.StatusInternalServerError)
} }
return return
} }
@ -611,6 +611,7 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) {
seq.cache, seq.inputs, err = s.cache.LoadCacheSlot(seq.inputs, true) seq.cache, seq.inputs, err = s.cache.LoadCacheSlot(seq.inputs, true)
if err != nil { if err != nil {
s.mu.Unlock() s.mu.Unlock()
s.seqsSem.Release(1)
http.Error(w, fmt.Sprintf("Failed to load cache: %v", err), http.StatusInternalServerError) http.Error(w, fmt.Sprintf("Failed to load cache: %v", err), http.StatusInternalServerError)
return return
} }
@ -626,6 +627,7 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) {
s.mu.Unlock() s.mu.Unlock()
if !found { if !found {
s.seqsSem.Release(1)
http.Error(w, "could not find an available sequence", http.StatusInternalServerError) http.Error(w, "could not find an available sequence", http.StatusInternalServerError)
return return
} }
@ -691,7 +693,7 @@ func (s *Server) embeddings(w http.ResponseWriter, r *http.Request) {
if errors.Is(err, context.Canceled) { if errors.Is(err, context.Canceled) {
slog.Info("aborting embeddings request due to client closing the connection") slog.Info("aborting embeddings request due to client closing the connection")
} else { } else {
slog.Error("Failed to acquire semaphore", "error", err) http.Error(w, fmt.Sprintf("Failed to acquire semaphore: %v", err), http.StatusInternalServerError)
} }
return return
} }
@ -703,6 +705,7 @@ func (s *Server) embeddings(w http.ResponseWriter, r *http.Request) {
seq.cache, seq.inputs, err = s.cache.LoadCacheSlot(seq.inputs, false) seq.cache, seq.inputs, err = s.cache.LoadCacheSlot(seq.inputs, false)
if err != nil { if err != nil {
s.mu.Unlock() s.mu.Unlock()
s.seqsSem.Release(1)
http.Error(w, fmt.Sprintf("Failed to load cache: %v", err), http.StatusInternalServerError) http.Error(w, fmt.Sprintf("Failed to load cache: %v", err), http.StatusInternalServerError)
return return
} }
@ -715,6 +718,7 @@ func (s *Server) embeddings(w http.ResponseWriter, r *http.Request) {
s.mu.Unlock() s.mu.Unlock()
if !found { if !found {
s.seqsSem.Release(1)
http.Error(w, "could not find an available sequence", http.StatusInternalServerError) http.Error(w, "could not find an available sequence", http.StatusInternalServerError)
return return
} }

View File

@ -609,7 +609,7 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) {
if errors.Is(err, context.Canceled) { if errors.Is(err, context.Canceled) {
slog.Info("aborting completion request due to client closing the connection") slog.Info("aborting completion request due to client closing the connection")
} else { } else {
slog.Error("Failed to acquire semaphore", "error", err) http.Error(w, fmt.Sprintf("Failed to acquire semaphore: %v", err), http.StatusInternalServerError)
} }
return return
} }
@ -621,6 +621,7 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) {
seq.cache, seq.inputs, err = s.cache.LoadCacheSlot(seq.inputs) seq.cache, seq.inputs, err = s.cache.LoadCacheSlot(seq.inputs)
if err != nil { if err != nil {
s.mu.Unlock() s.mu.Unlock()
s.seqsSem.Release(1)
http.Error(w, fmt.Sprintf("Failed to load cache: %v", err), http.StatusInternalServerError) http.Error(w, fmt.Sprintf("Failed to load cache: %v", err), http.StatusInternalServerError)
return return
} }
@ -634,6 +635,7 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) {
s.mu.Unlock() s.mu.Unlock()
if !found { if !found {
s.seqsSem.Release(1)
http.Error(w, "could not find an available sequence", http.StatusInternalServerError) http.Error(w, "could not find an available sequence", http.StatusInternalServerError)
return return
} }