diff --git a/asyncbuffer/buffer.go b/asyncbuffer/buffer.go index c9cc3aff..69976c73 100644 --- a/asyncbuffer/buffer.go +++ b/asyncbuffer/buffer.go @@ -89,6 +89,17 @@ func New(r io.ReadCloser, finishFn ...context.CancelFunc) *AsyncBuffer { return ab } +// callFinishFn calls the finish functions registered with the AsyncBuffer. +func (ab *AsyncBuffer) callFinishFn() { + ab.finishOnce.Do(func() { + for _, fn := range ab.finishFn { + if fn != nil { + fn() + } + } + }) +} + // addChunk adds a new chunk to the AsyncBuffer, increments len and signals that a chunk is ready func (ab *AsyncBuffer) addChunk(chunk *byteChunk) { ab.mu.Lock() @@ -119,11 +130,7 @@ func (ab *AsyncBuffer) readChunks() { logrus.WithField("source", "asyncbuffer.AsyncBuffer.readChunks").Warningf("error closing upstream reader: %s", err) } - ab.finishOnce.Do(func() { - for _, fn := range ab.finishFn { - fn() - } - }) + ab.callFinishFn() }() // Stop reading if the reader is closed @@ -403,11 +410,7 @@ func (ab *AsyncBuffer) Close() error { ab.paused.Release() // Finish downloading - ab.finishOnce.Do(func() { - for _, fn := range ab.finishFn { - fn() - } - }) + ab.callFinishFn() return nil }