IMG-13: pauseThreshold in AsyncBuffer + close behaviour (#1477)

* pauseThreshold in AsyncBuffer

* Additional close() condition

* io.ReadCloser in buffer

* ticker + fixes

* Minor fixes for asyncbuffer

* Renamed ticker to cond

* warn if close of upstream reader failed

* ticker -> chunkCond

* Fix io.EOF behaviour
Author: Victor Sokolov
Date: 2025-08-06 18:45:53 +02:00
Committed by: GitHub
Parent: f7a13c99de
Commit: 0015e88447

7 changed files with 662 additions and 191 deletions

asyncbuffer/asyncbuffer.go

@@ -18,10 +18,19 @@ import (
"io"
"sync"
"sync/atomic"
"github.com/sirupsen/logrus"
)
// ChunkSize is the size of each chunk in bytes
const ChunkSize = 4096
const (
// chunkSize is the size of each chunk in bytes
chunkSize = 4096
// pauseThreshold is the amount of data that is always read into memory eagerly. Data beyond
// the threshold is read only when accessed. If the threshold is not a multiple of chunkSize,
// the chunk it falls into is read in full.
pauseThreshold = 32768 // 32 KiB
)
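A quick sanity check on the constants above, using the values shown in this diff: the threshold is an exact multiple of the chunk size (32768 / 4096 = 8), so the background reader pauses after buffering exactly eight chunks. Had the threshold been, say, 30000 bytes, the eighth chunk (bytes 28672 through 32767) would still be read in full, which is what the note about non-multiples describes.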
// byteChunk is a struct that holds a buffer and the data read from the upstream reader
// data slice is required since the chunk read may be smaller than ChunkSize
@@ -34,7 +43,7 @@ type byteChunk struct {
// all readers
var chunkPool = sync.Pool{
New: func() any {
buf := make([]byte, ChunkSize)
buf := make([]byte, chunkSize)
return &byteChunk{
buf: buf,
@@ -46,31 +55,27 @@ var chunkPool = sync.Pool{
// AsyncBuffer is a wrapper around io.Reader that reads data in chunks
// in the background and allows reading from it synchronously.
type AsyncBuffer struct {
r io.Reader // Upstream reader
r io.ReadCloser // Upstream reader
chunks []*byteChunk // References to the chunks read from the upstream reader
mu sync.RWMutex // Mutex on chunks slice
err atomic.Value // Error that occurred during reading
finished atomic.Bool // Indicates that the reader has finished reading
len atomic.Int64 // Total length of the data read
closed atomic.Bool // Indicates that the reader was closed
err atomic.Value // Error that occurred during reading
len atomic.Int64 // Total length of the data read
mu sync.RWMutex // Mutex on chunks slice
newChunkSignal chan struct{} // Tick-tock channel that indicates that a new chunk is ready
}
finished atomic.Bool // Indicates that the buffer has finished reading
closed atomic.Bool // Indicates that the buffer was closed
// Reader is the underlying reader that provides an io.ReadSeeker interface for the actual data reading
type Reader struct {
ab *AsyncBuffer
pos int64
paused *Latch // Paused buffer does not read data beyond threshold
chunkCond *Cond // Ticker that signals when a new chunk is ready
}
// FromReader creates a new AsyncBuffer that reads from the given io.ReadCloser in the background
func FromReader(r io.Reader) *AsyncBuffer {
func FromReader(r io.ReadCloser) *AsyncBuffer {
ab := &AsyncBuffer{
r: r,
newChunkSignal: make(chan struct{}),
r: r,
paused: NewLatch(),
chunkCond: NewCond(),
}
go ab.readChunks()
@@ -78,48 +83,50 @@ func FromReader(r io.Reader) *AsyncBuffer {
return ab
}
// getNewChunkSignal returns the channel that signals when a new chunk is ready
// Lock is required to read the channel, so it is not closed while reading
func (ab *AsyncBuffer) getNewChunkSignal() chan struct{} {
ab.mu.RLock()
defer ab.mu.RUnlock()
return ab.newChunkSignal
}
// addChunk adds a new chunk to the AsyncBuffer, increments len and signals that a chunk is ready
func (ab *AsyncBuffer) addChunk(chunk *byteChunk) {
ab.mu.Lock()
defer ab.mu.Unlock()
if ab.closed.Load() {
// If the reader is closed, we return the chunk to the pool
chunkPool.Put(chunk)
return
}
// Store the chunk, increase chunk size, increase length of the data read
ab.chunks = append(ab.chunks, chunk)
ab.len.Add(int64(len(chunk.data)))
// Signal that a chunk is ready
currSignal := ab.newChunkSignal
ab.newChunkSignal = make(chan struct{})
close(currSignal)
}
// finish marks the reader as finished
func (ab *AsyncBuffer) finish() {
// Indicate that the reader has finished reading
ab.finished.Store(true)
// This indicates that Close() was called before all the chunks were read, we do not need to close the channel
// since it was closed already.
if !ab.closed.Load() {
close(ab.newChunkSignal)
}
ab.chunkCond.Tick()
}
// readChunks reads data from the upstream reader in background and stores them in the pool
func (ab *AsyncBuffer) readChunks() {
defer ab.finish()
defer func() {
// Indicate that the reader has finished reading
ab.finished.Store(true)
ab.chunkCond.Close()
// Close the upstream reader
if err := ab.r.Close(); err != nil {
logrus.WithField("source", "asyncbuffer.AsyncBuffer.readChunks").Warningf("error closing upstream reader: %s", err)
}
}()
// Stop reading if the reader is closed
for !ab.closed.Load() {
// In case we are trying to read data beyond threshold and we are paused,
// wait for pause to be released.
if ab.len.Load() >= pauseThreshold {
ab.paused.Wait()
// If the reader has been closed while waiting, we can stop reading
if ab.closed.Load() {
return // No more data to read
}
}
// Stop reading if the reader is finished
for !ab.finished.Load() {
// Get a chunk from the pool
// If the pool is empty, it will create a new byteChunk with ChunkSize
chunk, ok := chunkPool.Get().(*byteChunk)
@@ -129,11 +136,14 @@ func (ab *AsyncBuffer) readChunks() {
}
// Read data into the chunk's buffer
// There is no way to guarantee that ReadFull will abort on context cancellation,
// unfortunately, this is how Go works.
n, err := io.ReadFull(ab.r, chunk.buf)
// If it's not the EOF, we need to store the error
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
ab.err.Store(err)
chunkPool.Put(chunk)
return
}
@@ -161,24 +171,24 @@ func (ab *AsyncBuffer) readChunks() {
// If the reader had an error, it returns that error instead.
func (ab *AsyncBuffer) closedError() error {
// If the reader is closed, we return the error or nil
if ab.closed.Load() {
err := ab.Error()
if err == nil {
err = errors.New("asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")
}
return err
if !ab.closed.Load() {
return nil
}
return nil
err := ab.Error()
if err == nil {
err = errors.New("asyncbuffer.AsyncBuffer.ReadAt: attempt to read on closed reader")
}
return err
}
// offsetAvailable checks if the data at the given offset is available for reading.
// It may return io.EOF if the reader is finished reading and the offset is beyond the end of the stream.
func (ab *AsyncBuffer) offsetAvailable(off int64) (bool, error) {
// We cannot read data from a closed reader
if ab.closed.Load() {
return false, ab.closedError()
if err := ab.closedError(); err != nil {
return false, err
}
// In case the offset falls within the already read chunks, we can return immediately,
@@ -191,8 +201,7 @@ func (ab *AsyncBuffer) offsetAvailable(off int64) (bool, error) {
// data yet, return either error or EOF
if ab.finished.Load() {
// In case, error has occurred, we need to return it
err := ab.Error()
if err != nil {
if err := ab.Error(); err != nil {
return false, err
}
@@ -207,46 +216,41 @@ func (ab *AsyncBuffer) offsetAvailable(off int64) (bool, error) {
// WaitFor waits for the data to be ready at the given offset. nil means ok.
// It guarantees that the chunk at the given offset is ready to be read.
func (ab *AsyncBuffer) WaitFor(off int64) error {
// In case we are trying to read data which would potentially hit the pause threshold,
// we need to unpause the reader ASAP.
if off >= pauseThreshold {
ab.paused.Release()
}
for {
ok, err := ab.offsetAvailable(off)
if ok || err != nil {
return err
}
<-ab.getNewChunkSignal()
ab.chunkCond.Wait()
}
}
// Wait waits for the reader to finish reading all data and returns
// the total length of the data read.
func (ab *AsyncBuffer) Wait() (int64, error) {
// Wait reads to the end of the stream, so unpause the reader
ab.paused.Release()
for {
// We can not read data from the closed reader even if there were no errors
if ab.closed.Load() {
return 0, ab.closedError()
// We can not read data from the closed reader
if err := ab.closedError(); err != nil {
return 0, err
}
// In case the reader is finished reading, we can return immediately
if ab.finished.Load() {
size := ab.len.Load()
// If there was an error during reading, we need to return it no matter what position
// had the error happened
err := ab.err.Load()
if err != nil {
err, ok := err.(error)
if !ok {
return size, errors.New("asyncbuffer.AsyncBuffer.Wait: failed to get error")
}
return size, err
}
return size, nil
return ab.len.Load(), ab.Error()
}
// Lock until the next chunk is ready
<-ab.getNewChunkSignal()
ab.chunkCond.Wait()
}
}
@@ -275,10 +279,10 @@ func (ab *AsyncBuffer) readChunkAt(p []byte, off int64) int {
return 0
}
ind := off / ChunkSize // chunk index
ind := off / chunkSize // chunk index
chunk := ab.chunks[ind]
startOffset := off % ChunkSize // starting offset in the chunk
startOffset := off % chunkSize // starting offset in the chunk
// If the offset in current chunk is greater than the data
// it has, we return 0
@@ -293,15 +297,11 @@ func (ab *AsyncBuffer) readChunkAt(p []byte, off int64) int {
// readAt reads data from the AsyncBuffer at the given offset.
//
// If full is true:
// Please note that if the pause threshold is hit in the middle of reading,
// the data beyond the threshold may not be available.
//
// The behaviour is similar to io.ReaderAt.ReadAt. It blocks until the maximum amount of data possible
// is read from the buffer. It may return io.ErrUnexpectedEOF in case we tried to read more data than was
// available in the buffer.
//
// If full is false:
//
// It behaves like a regular non-blocking Read.
// If the reader is paused and we try to read data beyond the pause threshold,
// it will wait until something can be returned.
func (ab *AsyncBuffer) readAt(p []byte, off int64) (int, error) {
size := int64(len(p)) // total size of the data to read
@@ -309,6 +309,11 @@ func (ab *AsyncBuffer) readAt(p []byte, off int64) (int, error) {
return 0, errors.New("asyncbuffer.AsyncBuffer.readAt: negative offset")
}
// If we plan to hit the threshold while reading, release the paused reader
if int64(len(p))+off > pauseThreshold {
ab.paused.Release()
}
// Wait for the offset to be available.
// It may return io.EOF if the offset is beyond the end of the stream.
err := ab.WaitFor(off)
@@ -316,12 +321,13 @@ func (ab *AsyncBuffer) readAt(p []byte, off int64) (int, error) {
return 0, err
}
// We lock the mutex until current buffer is read
ab.mu.RLock()
defer ab.mu.RUnlock()
// If the reader is closed, we return an error
if ab.closed.Load() {
return 0, ab.closedError()
if err := ab.closedError(); err != nil {
return 0, err
}
// Read data from the first chunk
@@ -337,7 +343,11 @@ func (ab *AsyncBuffer) readAt(p []byte, off int64) (int, error) {
for size > 0 {
// If data is not available at the given offset, we can return data read so far.
ok, err := ab.offsetAvailable(off)
if !ok || err != nil {
if !ok {
if err == io.EOF {
return n, nil
}
return n, err
}
@@ -349,8 +359,8 @@ func (ab *AsyncBuffer) readAt(p []byte, off int64) (int, error) {
// If we read data shorter than ChunkSize or, in case that was the last chunk, less than
// the size of the tail, return kind of EOF
if int64(nX) < min(size, int64(ChunkSize)) {
return n, io.EOF
if int64(nX) < min(size, int64(chunkSize)) {
return n, nil
}
}
@@ -372,17 +382,14 @@ func (ab *AsyncBuffer) Close() error {
ab.closed.Store(true)
// If the reader is still running, we need to signal that it should stop and close the channel
if !ab.finished.Load() {
ab.finished.Store(true)
close(ab.newChunkSignal)
}
// Return all chunks to the pool
for _, chunk := range ab.chunks {
chunkPool.Put(chunk)
}
// Release the paused latch so that no goroutines are waiting for it
ab.paused.Release()
return nil
}
@@ -390,41 +397,3 @@ func (ab *AsyncBuffer) Close() error {
func (ab *AsyncBuffer) Reader() *Reader {
return &Reader{ab: ab, pos: 0}
}
// Read reads data from the AsyncBuffer.
func (r *Reader) Read(p []byte) (int, error) {
n, err := r.ab.readAt(p, r.pos)
if err == nil {
r.pos += int64(n)
}
return n, err
}
// Seek sets the position of the reader to the given offset and returns the new position
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
r.pos = offset
case io.SeekCurrent:
r.pos += offset
case io.SeekEnd:
size, err := r.ab.Wait()
if err != nil {
return 0, err
}
r.pos = size + offset
default:
return 0, errors.New("asyncbuffer.AsyncBuffer.ReadAt: invalid whence")
}
if r.pos < 0 {
return 0, errors.New("asyncbuffer.AsyncBuffer.ReadAt: negative position")
}
return r.pos, nil
}
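Taken together, the new API can be exercised as below: a minimal usage sketch, assuming imgproxy's module path for the import (not shown in this diff) and an illustrative file name. Note that FromReader now takes ownership of the io.ReadCloser: the background goroutine closes it when reading finishes.

package main

import (
	"fmt"
	"io"
	"os"

	"github.com/imgproxy/imgproxy/v3/asyncbuffer" // assumed import path
)

func main() {
	f, err := os.Open("large-image.jpg") // *os.File satisfies io.ReadCloser
	if err != nil {
		panic(err)
	}

	// The background goroutine closes f when it finishes reading,
	// so only the buffer itself needs an explicit Close.
	ab := asyncbuffer.FromReader(f)
	defer ab.Close()

	// Reads within the first pauseThreshold bytes are served from memory
	// as soon as the background reader has buffered them; the reader
	// stays paused past the threshold until someone asks for more.
	r := ab.Reader()
	head := make([]byte, 512)
	if _, err := io.ReadFull(r, head); err != nil {
		panic(err)
	}

	// Seeking relative to the end calls Wait internally, which releases
	// the pause latch and drains the stream to its end.
	if _, err := r.Seek(-16, io.SeekEnd); err != nil {
		panic(err)
	}
	tail, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}

	fmt.Printf("head: %d bytes, tail: %d bytes\n", len(head), len(tail))
}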

asyncbuffer/asyncbuffer_test.go

@@ -9,19 +9,30 @@ import (
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
halfChunkSize = ChunkSize / 2
quaterChunkSize = ChunkSize / 4
halfChunkSize = chunkSize / 2
quaterChunkSize = chunkSize / 4
)
// nopSeekCloser is a wrapper around io.ReadSeeker that implements no-op io.Closer
type nopSeekCloser struct {
io.ReadSeeker
}
// Close implements io.Closer interface, but does nothing
func (nopSeekCloser) Close() error {
return nil
}
// erraticReader is a test reader that simulates a slow read and can fail after reading a certain number of bytes
type erraticReader struct {
reader bytes.Reader
reader io.ReadSeekCloser
failAt int64 // if set, will return an error after reading this many bytes
}
@@ -34,15 +45,20 @@ func (r *erraticReader) Read(p []byte) (n int, err error) {
return r.reader.Read(p)
}
// Close forwards closing to the underlying reader
func (r *erraticReader) Close() error {
return r.reader.Close()
}
// blockingReader is a test reader which flushes data in chunks
type blockingReader struct {
reader bytes.Reader
reader io.ReadCloser
mu sync.Mutex // locked reader does not return anything
unlocking atomic.Bool // if true, will proceed without locking each chunk
}
// newBlockingReader creates a new blockingReader in a locked state
func newBlockingReader(reader bytes.Reader) *blockingReader {
func newBlockingReader(reader io.ReadCloser) *blockingReader {
r := &blockingReader{
reader: reader,
}
@@ -71,10 +87,14 @@ func (r *blockingReader) Read(p []byte) (n int, err error) {
return n, err
}
// Close forwards closing to the underlying reader
func (r *blockingReader) Close() error {
return r.reader.Close()
}
// generateSourceData generates a byte slice of the given size and a reader over it
func generateSourceData(t *testing.T, size int64) ([]byte, *bytes.Reader) {
func generateSourceData(t *testing.T, size int64) ([]byte, io.ReadSeekCloser) {
// We use small chunks for tests, let's check the ChunkSize just in case
assert.GreaterOrEqual(t, ChunkSize, 20, "ChunkSize required for tests must be greater than 10 bytes")
assert.GreaterOrEqual(t, chunkSize, 20, "chunkSize required for tests must be at least 20 bytes")
// Create a byte slice with 4 chunks of ChunkSize
source := make([]byte, size)
@@ -82,13 +102,13 @@ func generateSourceData(t *testing.T, size int64) ([]byte, *bytes.Reader) {
// Fill the source with random data
_, err := rand.Read(source)
require.NoError(t, err)
return source, bytes.NewReader(source)
return source, nopSeekCloser{bytes.NewReader(source)}
}
// TestAsyncBufferReadAt tests reading from AsyncBuffer using the readAt method, which is the basis for all other methods
func TestAsyncBufferReadAt(t *testing.T) {
// Let's use source buffer which is 4.5 chunks long
source, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)
source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
asyncBuffer := FromReader(bytesReader)
defer asyncBuffer.Close()
@@ -106,7 +126,7 @@ func TestAsyncBufferReadAt(t *testing.T) {
target = make([]byte, len(source)+1)
n, err = asyncBuffer.readAt(target, 0)
require.ErrorIs(t, err, io.EOF) // We read all the data, and reached end
require.NoError(t, err) // We read all the data, and reached end
assert.Equal(t, len(source), n)
assert.Equal(t, target[:n], source)
@@ -118,30 +138,29 @@ func TestAsyncBufferReadAt(t *testing.T) {
assert.Equal(t, target, source[quaterChunkSize:len(source)-quaterChunkSize])
// Let's read some data from the middle of the stream < chunk size
target = make([]byte, ChunkSize/4)
n, err = asyncBuffer.readAt(target, ChunkSize+ChunkSize/4)
target = make([]byte, chunkSize/4)
n, err = asyncBuffer.readAt(target, chunkSize+chunkSize/4)
require.NoError(t, err)
assert.Equal(t, quaterChunkSize, n)
assert.Equal(t, target, source[ChunkSize+quaterChunkSize:ChunkSize+quaterChunkSize*2])
assert.Equal(t, target, source[chunkSize+quaterChunkSize:chunkSize+quaterChunkSize*2])
// Let's read some data from the latest half chunk
target = make([]byte, quaterChunkSize)
n, err = asyncBuffer.readAt(target, ChunkSize*4+quaterChunkSize)
n, err = asyncBuffer.readAt(target, chunkSize*4+quaterChunkSize)
require.NoError(t, err)
assert.Equal(t, quaterChunkSize, n)
assert.Equal(t, target, source[ChunkSize*4+quaterChunkSize:ChunkSize*4+halfChunkSize])
assert.Equal(t, target, source[chunkSize*4+quaterChunkSize:chunkSize*4+halfChunkSize])
// Let's try to read more data than is available in the stream
target = make([]byte, ChunkSize*2)
n, err = asyncBuffer.readAt(target, ChunkSize*4)
require.Error(t, err)
assert.Equal(t, err, io.EOF)
assert.Equal(t, ChunkSize/2, n)
assert.Equal(t, target[:ChunkSize/2], source[ChunkSize*4:]) // We read only last half chunk
target = make([]byte, chunkSize*2)
n, err = asyncBuffer.readAt(target, chunkSize*4)
require.NoError(t, err)
assert.Equal(t, chunkSize/2, n)
assert.Equal(t, target[:chunkSize/2], source[chunkSize*4:]) // We read only last half chunk
// Let's try to read data beyond the end of the stream
target = make([]byte, ChunkSize*2)
n, err = asyncBuffer.readAt(target, ChunkSize*5)
target = make([]byte, chunkSize*2)
n, err = asyncBuffer.readAt(target, chunkSize*5)
require.Error(t, err)
assert.Equal(t, err, io.EOF)
assert.Equal(t, 0, n)
@@ -177,7 +196,7 @@ func TestAsyncBufferReadAtSmallBuffer(t *testing.T) {
}
func TestAsyncBufferReader(t *testing.T) {
source, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)
source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
// Create an AsyncBuffer with the byte slice
asyncBuffer := FromReader(bytesReader)
@@ -186,7 +205,7 @@ func TestAsyncBufferReader(t *testing.T) {
// Let's wait for all chunks to be read
size, err := asyncBuffer.Wait()
require.NoError(t, err, "AsyncBuffer failed to wait for all chunks")
assert.Equal(t, int64(ChunkSize*4+halfChunkSize), size)
assert.Equal(t, int64(chunkSize*4+halfChunkSize), size)
reader := asyncBuffer.Reader()
@@ -194,34 +213,34 @@ func TestAsyncBufferReader(t *testing.T) {
require.NoError(t, err)
// Read the first two chunks
twoChunks := make([]byte, ChunkSize*2)
twoChunks := make([]byte, chunkSize*2)
n, err := reader.Read(twoChunks)
require.NoError(t, err)
assert.Equal(t, ChunkSize*2, n)
assert.Equal(t, source[:ChunkSize*2], twoChunks)
assert.Equal(t, chunkSize*2, n)
assert.Equal(t, source[:chunkSize*2], twoChunks)
// Seek to the last chunk + 10 bytes
pos, err := reader.Seek(ChunkSize*3+5, io.SeekStart)
pos, err := reader.Seek(chunkSize*3+5, io.SeekStart)
require.NoError(t, err)
assert.Equal(t, int64(ChunkSize*3+5), pos)
assert.Equal(t, int64(chunkSize*3+5), pos)
// Read the next 10 bytes
smallSlice := make([]byte, 10)
n, err = reader.Read(smallSlice)
require.NoError(t, err)
assert.Equal(t, 10, n)
assert.Equal(t, source[ChunkSize*3+5:ChunkSize*3+5+10], smallSlice)
assert.Equal(t, source[chunkSize*3+5:chunkSize*3+5+10], smallSlice)
// Seek -10 bytes from the current position
pos, err = reader.Seek(-10, io.SeekCurrent)
require.NoError(t, err)
assert.Equal(t, int64(ChunkSize*3+5), pos)
assert.Equal(t, int64(chunkSize*3+5), pos)
// Read data again
n, err = reader.Read(smallSlice)
require.NoError(t, err)
assert.Equal(t, 10, n)
assert.Equal(t, source[ChunkSize*3+5:ChunkSize*3+5+10], smallSlice)
assert.Equal(t, source[chunkSize*3+5:chunkSize*3+5+10], smallSlice)
// Seek -10 bytes from end of the stream
pos, err = reader.Seek(-10, io.SeekEnd)
@@ -245,7 +264,7 @@ func TestAsyncBufferReader(t *testing.T) {
// TestAsyncBufferClose tests closing the AsyncBuffer
func TestAsyncBufferClose(t *testing.T) {
_, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)
_, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
// Create an AsyncBuffer with the byte slice
asyncBuffer := FromReader(bytesReader)
@@ -273,8 +292,8 @@ func TestAsyncBufferClose(t *testing.T) {
// which would fail somewhere
func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
// Let's use source buffer which is 4.5 chunks long
source, bytesReader := generateSourceData(t, int64(ChunkSize*4)+halfChunkSize)
slowReader := &erraticReader{reader: *bytesReader, failAt: ChunkSize*3 + 5} // fails at last chunk
source, bytesReader := generateSourceData(t, int64(chunkSize*4)+halfChunkSize)
slowReader := &erraticReader{reader: bytesReader, failAt: chunkSize*3 + 5} // fails at last chunk
asyncBuffer := FromReader(slowReader)
defer asyncBuffer.Close()
@@ -298,7 +317,7 @@ func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
// Let's read something at the point where the error occurs
target = make([]byte, halfChunkSize)
_, err = asyncBuffer.readAt(target, ChunkSize*3)
_, err = asyncBuffer.readAt(target, chunkSize*3)
require.Error(t, err, "simulated read failure")
}
@@ -306,8 +325,8 @@ func TestAsyncBufferReadAtErrAtSomePoint(t *testing.T) {
// with full = false
func TestAsyncBufferReadAsync(t *testing.T) {
// Let's use source buffer which is 4.5 chunks long
source, bytesReader := generateSourceData(t, int64(ChunkSize)*3)
blockingReader := newBlockingReader(*bytesReader)
source, bytesReader := generateSourceData(t, int64(chunkSize)*3)
blockingReader := newBlockingReader(bytesReader)
asyncBuffer := FromReader(blockingReader)
defer asyncBuffer.Close()
@@ -316,34 +335,34 @@ func TestAsyncBufferReadAsync(t *testing.T) {
// Let's try to read first two chunks, however,
// we know that only the first chunk is available
target := make([]byte, ChunkSize*2)
target := make([]byte, chunkSize*2)
n, err := asyncBuffer.readAt(target, 0)
require.NoError(t, err)
assert.Equal(t, ChunkSize, n)
assert.Equal(t, target[:ChunkSize], source[:ChunkSize])
assert.Equal(t, chunkSize, n)
assert.Equal(t, target[:chunkSize], source[:chunkSize])
blockingReader.flushNextChunk() // unlock reader to allow read second chunk
asyncBuffer.WaitFor(ChunkSize + 1) // wait for the second chunk to be available
asyncBuffer.WaitFor(chunkSize + 1) // wait for the second chunk to be available
target = make([]byte, ChunkSize*2)
target = make([]byte, chunkSize*2)
n, err = asyncBuffer.readAt(target, 0)
require.NoError(t, err)
assert.Equal(t, ChunkSize*2, n)
assert.Equal(t, target, source[:ChunkSize*2])
assert.Equal(t, chunkSize*2, n)
assert.Equal(t, target, source[:chunkSize*2])
blockingReader.flush() // Flush the rest of the data
asyncBuffer.Wait()
// Try to read near end of the stream, EOF
target = make([]byte, ChunkSize)
n, err = asyncBuffer.readAt(target, ChunkSize*3-1)
require.ErrorIs(t, err, io.EOF)
target = make([]byte, chunkSize)
n, err = asyncBuffer.readAt(target, chunkSize*3-1)
require.NoError(t, err)
assert.Equal(t, 1, n)
assert.Equal(t, target[0], source[ChunkSize*3-1])
assert.Equal(t, target[0], source[chunkSize*3-1])
// Try to read beyond the end of the stream == eof
target = make([]byte, ChunkSize)
n, err = asyncBuffer.readAt(target, ChunkSize*3)
target = make([]byte, chunkSize)
n, err = asyncBuffer.readAt(target, chunkSize*3)
require.ErrorIs(t, err, io.EOF)
assert.Equal(t, 0, n)
}
@@ -352,10 +371,72 @@ func TestAsyncBufferReadAsync(t *testing.T) {
func TestAsyncBufferReadAllCompability(t *testing.T) {
source, err := os.ReadFile("../testdata/test1.jpg")
require.NoError(t, err)
asyncBuffer := FromReader(bytes.NewReader(source))
asyncBuffer := FromReader(nopSeekCloser{bytes.NewReader(source)})
defer asyncBuffer.Close()
b, err := io.ReadAll(asyncBuffer.Reader())
require.NoError(t, err)
require.Len(t, b, len(source))
}
func TestAsyncBufferThreshold(t *testing.T) {
_, bytesReader := generateSourceData(t, int64(pauseThreshold)*2)
asyncBuffer := FromReader(bytesReader)
defer asyncBuffer.Close()
target := make([]byte, chunkSize)
n, err := asyncBuffer.readAt(target, 0)
require.NoError(t, err)
assert.Equal(t, chunkSize, n)
// Ensure that buffer hits the pause threshold
require.Eventually(t, func() bool {
return asyncBuffer.len.Load() >= pauseThreshold
}, 300*time.Millisecond, 10*time.Millisecond)
// Ensure that buffer never reaches the end of the stream
require.Never(t, func() bool {
return asyncBuffer.len.Load() >= pauseThreshold*2-1
}, 300*time.Millisecond, 10*time.Millisecond)
// Let's hit the pause threshold
target = make([]byte, pauseThreshold)
n, err = asyncBuffer.readAt(target, 0)
require.NoError(t, err)
require.Equal(t, pauseThreshold, n)
// Ensure that buffer never reaches the end of the stream
require.Never(t, func() bool {
return asyncBuffer.len.Load() >= pauseThreshold*2-1
}, 300*time.Millisecond, 10*time.Millisecond)
// Let's hit the pause threshold
target = make([]byte, pauseThreshold+1)
n, err = asyncBuffer.readAt(target, 0)
require.NoError(t, err)
// It usually returns only pauseThreshold bytes because this exact operation unpauses the reader,
// but since the initial offset is before the threshold, data beyond the threshold may not be
// available yet.
assert.GreaterOrEqual(t, pauseThreshold, n)
// Ensure that buffer hits the end of the stream
require.Eventually(t, func() bool {
return asyncBuffer.len.Load() >= pauseThreshold*2
}, 300*time.Millisecond, 10*time.Millisecond)
}
func TestAsyncBufferThresholdInstantBeyondAccess(t *testing.T) {
_, bytesReader := generateSourceData(t, int64(pauseThreshold)*2)
asyncBuffer := FromReader(bytesReader)
defer asyncBuffer.Close()
target := make([]byte, chunkSize)
n, err := asyncBuffer.readAt(target, pauseThreshold+1)
require.NoError(t, err)
assert.GreaterOrEqual(t, chunkSize, n)
// Ensure that buffer hits the end of the stream
require.Eventually(t, func() bool {
return asyncBuffer.len.Load() >= pauseThreshold*2
}, 300*time.Millisecond, 10*time.Millisecond)
}
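An aside on why the tests define nopSeekCloser instead of reaching for the stdlib: io.NopCloser returns a plain io.ReadCloser, so the Seek method of the wrapped reader is erased. A self-contained sketch of the mismatch, stdlib only:

package main

import (
	"bytes"
	"fmt"
	"io"
)

// nopSeekCloser mirrors the test helper above: embedding io.ReadSeeker
// keeps Seek visible, and Close is a no-op.
type nopSeekCloser struct{ io.ReadSeeker }

func (nopSeekCloser) Close() error { return nil }

func main() {
	br := bytes.NewReader([]byte("data"))

	// io.NopCloser returns a plain io.ReadCloser: the Seek method of the
	// underlying bytes.Reader is no longer reachable through it.
	var rc io.ReadCloser = io.NopCloser(br)
	_, seekable := rc.(io.Seeker)
	fmt.Println(seekable) // false

	// The embedding-based wrapper satisfies io.ReadSeekCloser.
	var _ io.ReadSeekCloser = nopSeekCloser{br}
	fmt.Println("nopSeekCloser keeps Seek")
}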

asyncbuffer/cond.go (new file, 58 lines)

@@ -0,0 +1,58 @@
package asyncbuffer
import (
"sync"
)
type condCh = chan struct{}
// Cond signals that an event has occurred to multiple waiters.
type Cond struct {
mu sync.RWMutex
ch condCh
closeOnce sync.Once
}
// NewCond creates a new Cond instance with an initialized channel.
func NewCond() *Cond {
return &Cond{
ch: make(condCh),
}
}
// Tick signals that an event has occurred by closing the channel.
func (t *Cond) Tick() {
t.mu.Lock()
defer t.mu.Unlock()
if t.ch != nil {
close(t.ch)
t.ch = make(condCh)
}
}
// Wait blocks until the channel is closed, indicating that an event has occurred.
func (t *Cond) Wait() {
t.mu.RLock()
ch := t.ch
t.mu.RUnlock()
if ch == nil {
return
}
<-ch
}
// Close closes the Cond's channel and prevents further ticks.
func (t *Cond) Close() {
t.closeOnce.Do(func() {
t.mu.Lock()
defer t.mu.Unlock()
if t.ch != nil {
close(t.ch)
t.ch = nil
}
})
}
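The intended way to drive this Cond is the check-then-wait loop seen in AsyncBuffer.WaitFor: re-checking the condition after every wakeup makes a missed Tick harmless, and Close acts as a final broadcast that unblocks current and future waiters. A sketch in the form of an example test, with an illustrative atomic counter standing in for real state (not part of this commit):

package asyncbuffer

import (
	"fmt"
	"sync/atomic"
	"time"
)

func ExampleCond() {
	cond := NewCond()
	var produced atomic.Int64

	// Producer: publish ten items, ticking after each, then Close so
	// that any late waiter is not stranded on a channel nobody closes.
	go func() {
		for range 10 {
			produced.Add(1)
			cond.Tick()
			time.Sleep(time.Millisecond)
		}
		cond.Close()
	}()

	// Consumer: check-then-wait, the same shape as AsyncBuffer.WaitFor.
	// Re-checking after every wakeup makes a missed tick harmless.
	for produced.Load() < 10 {
		cond.Wait()
	}
	fmt.Println("all items observed")
	// Output: all items observed
}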

asyncbuffer/cond_test.go (new file, 158 lines)

@@ -0,0 +1,158 @@
package asyncbuffer
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/suite"
)
type TestCondSuite struct {
suite.Suite
cond *Cond
}
func (s *TestCondSuite) SetupTest() {
s.cond = NewCond()
}
func (s *TestCondSuite) TearDownTest() {
if s.cond != nil {
s.cond.Close()
}
}
// TestBasicWaitAndTick tests the basic functionality of the Cond
func (s *TestCondSuite) TestBasicWaitAndTick() {
done := make(chan struct{})
ch := s.cond.ch
// Start a goroutine that will tick after a short delay
go func() {
time.Sleep(50 * time.Millisecond)
s.cond.Tick()
}()
// Start a goroutine that will wait for the tick
go func() {
s.cond.Wait()
close(done)
}()
s.Require().Eventually(func() bool {
select {
case <-done:
return true
default:
return false
}
}, 100*time.Millisecond, 10*time.Millisecond)
// Means that the old channel was closed and a new one has been created
s.Require().NotEqual(ch, s.cond.ch)
}
// TestWaitMultipleWaiters tests that multiple waiters can be unblocked by a single tick
func (s *TestCondSuite) TestWaitMultipleWaiters() {
const numWaiters = 10
var wg sync.WaitGroup
var startWg sync.WaitGroup
results := make([]bool, numWaiters)
// Start multiple waiters
for i := range numWaiters {
wg.Add(1)
startWg.Add(1)
go func(index int) {
defer wg.Done()
startWg.Done() // Signal that this goroutine is ready
s.cond.Wait()
results[index] = true
}(i)
}
// Wait for all goroutines to start waiting
startWg.Wait()
// Wait for all waiters to complete
done := make(chan struct{})
go func() {
s.cond.Tick() // Signal that execution can proceed
wg.Wait()
close(done)
}()
s.Require().Eventually(func() bool {
select {
case <-done:
return true
default:
return false
}
}, 100*time.Millisecond, 10*time.Millisecond)
// Check that all waiters were unblocked
for _, completed := range results {
s.Require().True(completed)
}
}
// TestClose tests the behavior of the Cond when closed
func (s *TestCondSuite) TestClose() {
s.cond.Close()
s.cond.Close() // Should not panic
s.cond.Wait() // Should return immediately once closed
s.cond.Tick() // Should not panic
s.Require().Nil(s.cond.ch)
}
func (s *TestCondSuite) TestRapidTicksAndWaits() {
const iterations = 1000
var wg sync.WaitGroup
// Start a goroutine that will rapidly tick
wg.Add(1)
go func() {
defer wg.Done()
for range iterations {
s.cond.Tick()
time.Sleep(time.Microsecond)
}
s.cond.Close() // Close after all ticks
}()
// Start multiple waiters
for range 10 {
wg.Add(1)
go func() {
defer wg.Done()
for range iterations / 10 {
s.cond.Wait()
}
}()
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
s.Require().Eventually(func() bool {
select {
case <-done:
return true
default:
return false
}
}, 100*time.Millisecond, 10*time.Millisecond)
}
func TestCond(t *testing.T) {
suite.Run(t, new(TestCondSuite))
}

asyncbuffer/latch.go (new file, 26 lines)

@@ -0,0 +1,26 @@
package asyncbuffer
import (
"sync"
)
// Latch is a once-releasing semaphore.
type Latch struct {
once sync.Once
done chan struct{}
}
// NewLatch creates a new Latch.
func NewLatch() *Latch {
return &Latch{done: make(chan struct{})}
}
// Release releases the latch, allowing all waiting goroutines to proceed.
func (g *Latch) Release() {
g.once.Do(func() { close(g.done) })
}
// Wait blocks until the latch is released.
func (g *Latch) Wait() {
<-g.done
}
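A sketch of the Latch's role in this commit, written as an example test with illustrative stand-ins: the background reader parks on Wait once pauseThreshold is reached, and any reader that needs data beyond the threshold calls Release, which sync.Once makes safe to call repeatedly from any goroutine.

package asyncbuffer

import "fmt"

func ExampleLatch() {
	paused := NewLatch()
	done := make(chan struct{})

	// Stand-in for readChunks: parks at the threshold until released.
	go func() {
		paused.Wait()
		fmt.Println("resumed reading past the threshold")
		close(done)
	}()

	// Stand-in for readAt/WaitFor/Wait: any of them may release the
	// latch; duplicate calls are no-ops thanks to sync.Once.
	paused.Release()
	paused.Release()

	<-done
	// Output: resumed reading past the threshold
}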

asyncbuffer/latch_test.go (new file, 128 lines)

@@ -0,0 +1,128 @@
package asyncbuffer
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewLatch(t *testing.T) {
latch := NewLatch()
require.NotNil(t, latch)
require.NotNil(t, latch.done)
// Channel should be open (not closed) initially
select {
case <-latch.done:
t.Fatal("Latch should not be released initially")
default:
// Expected - channel is not ready
}
}
func TestLatchRelease(t *testing.T) {
latch := NewLatch()
// Release the latch
latch.Release()
// Channel should now be closed/ready
select {
case <-latch.done:
// Expected - channel is ready after release
default:
t.Fatal("Latch should be released after Release() call")
}
}
func TestLatchWait(t *testing.T) {
latch := NewLatch()
// Start a goroutine that will wait
waitCompleted := make(chan bool, 1)
go func() {
latch.Wait()
waitCompleted <- true
}()
// Give the goroutine a moment to start waiting
time.Sleep(10 * time.Millisecond)
// Wait should not complete yet
select {
case <-waitCompleted:
t.Fatal("Wait should not complete before Release")
default:
// Expected
}
// Release the latch
latch.Release()
// Wait should complete now
select {
case <-waitCompleted:
// Expected
case <-time.After(100 * time.Millisecond):
t.Fatal("Wait should complete after Release")
}
}
func TestLatchMultipleWaiters(t *testing.T) {
latch := NewLatch()
const numWaiters = 10
var wg sync.WaitGroup
waitersCompleted := make(chan int, numWaiters)
// Start multiple waiters
for i := 0; i < numWaiters; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
latch.Wait()
waitersCompleted <- id
}(i)
}
// Give goroutines time to start waiting
time.Sleep(10 * time.Millisecond)
// No waiters should complete yet
assert.Empty(t, waitersCompleted)
// Release the latch
latch.Release()
// All waiters should complete
wg.Wait()
close(waitersCompleted)
// Verify all waiters completed
completed := make([]int, 0, numWaiters)
for id := range waitersCompleted {
completed = append(completed, id)
}
assert.Len(t, completed, numWaiters)
}
func TestLatchMultipleReleases(t *testing.T) {
latch := NewLatch()
// Release multiple times should be safe
latch.Release()
latch.Release()
latch.Release()
// Should still be able to wait
select {
case <-latch.done:
// Expected - channel should be ready
default:
t.Fatal("Latch should be released")
}
}

asyncbuffer/reader.go (new file, 51 lines)

@@ -0,0 +1,51 @@
package asyncbuffer
import (
"errors"
"io"
)
// Reader is the underlying reader that provides an io.ReadSeeker interface for the actual data reading
type Reader struct {
ab *AsyncBuffer
pos int64
}
// Read reads data from the AsyncBuffer.
func (r *Reader) Read(p []byte) (int, error) {
n, err := r.ab.readAt(p, r.pos)
if err == nil {
r.pos += int64(n)
}
return n, err
}
// Seek sets the position of the reader to the given offset and returns the new position
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
r.pos = offset
case io.SeekCurrent:
r.pos += offset
case io.SeekEnd:
size, err := r.ab.Wait()
if err != nil {
return 0, err
}
r.pos = size + offset
default:
return 0, errors.New("asyncbuffer.AsyncBuffer.ReadAt: invalid whence")
}
if r.pos < 0 {
return 0, errors.New("asyncbuffer.AsyncBuffer.ReadAt: negative position")
}
return r.pos, nil
}
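One property of this split worth spelling out: every call to AsyncBuffer.Reader() hands out an independent cursor over the same shared chunks, so multiple consumers can read and seek independently without coordinating. A sketch as an example test (the data and offsets are illustrative):

package asyncbuffer

import (
	"bytes"
	"fmt"
	"io"
)

func ExampleReader() {
	src := io.NopCloser(bytes.NewReader(bytes.Repeat([]byte("abcd"), 1024)))
	ab := FromReader(src)
	defer ab.Close()

	// Two independent cursors over the same buffered data.
	r1, r2 := ab.Reader(), ab.Reader()

	head := make([]byte, 4)
	if _, err := io.ReadFull(r1, head); err != nil {
		panic(err)
	}

	// r2's position is unaffected by r1's reads.
	if _, err := r2.Seek(8, io.SeekStart); err != nil {
		panic(err)
	}
	mid := make([]byte, 4)
	if _, err := io.ReadFull(r2, mid); err != nil {
		panic(err)
	}

	fmt.Printf("r1 read %q at 0, r2 read %q at 8\n", head, mid)
	// Output: r1 read "abcd" at 0, r2 read "abcd" at 8
}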