From e2c5fcc7d39727341b2f4b95d6edb7baae285a72 Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Thu, 25 Jun 2026 09:56:14 +1000 Subject: [PATCH] fix(client): read whole object when ParallelGet concurrency is 1 A single worker gains nothing from chunking, so skip the ranged discovery request entirely and read the object in one revision-consistent request rather than serialising ranged GETs. --- client/parallel_get.go | 33 +++++++++++++------ client/parallel_get_test.go | 49 +++++++++++++++++++++++++++++ internal/cache/parallel_get_test.go | 12 ++++--- 3 files changed, 81 insertions(+), 13 deletions(-) diff --git a/client/parallel_get.go b/client/parallel_get.go index 531e2e51..be7c9aa5 100644 --- a/client/parallel_get.go +++ b/client/parallel_get.go @@ -32,7 +32,9 @@ type RangeReader interface { // is likewise reported as an error, so a partially written dst must be discarded // by the caller on failure. An object with no ETag to pin to (e.g. one stored // before ETags were recorded) cannot be kept revision-safe across chunks, so it -// falls back to a single full read instead of parallelising. +// falls back to a single full read instead of parallelising. A concurrency of +// 1 likewise reads the whole object in one request, since chunking a single +// worker would only serialise ranged GETs for no benefit. // // dst is written via concurrent WriteAt calls at non-overlapping offsets; the // caller owns dst's lifecycle (open, close, cleanup) and need not pre-size it, @@ -43,6 +45,13 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c } concurrency = max(concurrency, 1) + // A single worker gains nothing from chunking — it would only serialise + // ranged GETs — so skip discovery entirely and read the object in one + // revision-consistent request. + if concurrency == 1 { + return fullRead(ctx, c, key, dst) + } + // Discovery: the first ranged Open delivers chunk zero and reveals the total // size and ETag used to pin the rest. rc, headers, err := c.Open(ctx, key, Range(0, chunkSize)) @@ -77,14 +86,7 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c if err := rc.Close(); err != nil { return errors.Wrap(err, "parallel get: close discovery reader") } - full, _, err := c.Open(ctx, key) - if err != nil { - return errors.Wrap(err, "parallel get: full read") - } - // The full read is a fresh request whose body may be a different - // revision than discovery, so the discovery `total` cannot validate its - // length; -1 skips the check and relies on transport-level EOF detection. - return errors.Wrap(writeChunkAt(dst, 0, -1, full), "parallel get") + return fullRead(ctx, c, key, dst) } // Multiple chunks: copy the already-open first chunk concurrently with the @@ -106,6 +108,19 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c return errors.Wrap(eg.Wait(), "parallel get") } +// fullRead downloads the entire object in a single request and writes it at +// offset zero. It is used when chunking would add no value (a single worker) or +// cannot be made revision-safe (no ETag to pin). The body is a single +// consistent revision, but its length is unknown up front, so writeChunkAt's +// length check is skipped (-1). +func fullRead(ctx context.Context, c RangeReader, key Key, dst io.WriterAt) error { + rc, _, err := c.Open(ctx, key) + if err != nil { + return errors.Wrap(err, "parallel get: full read") + } + return errors.Wrap(writeChunkAt(dst, 0, -1, rc), "parallel get") +} + // fetchChunk opens the [start, end) range pinned to etag and writes it at start. // An ETag change (the object was rewritten mid-download) or a short read is // reported as an error. diff --git a/client/parallel_get_test.go b/client/parallel_get_test.go index 791e1125..b9d920f5 100644 --- a/client/parallel_get_test.go +++ b/client/parallel_get_test.go @@ -147,6 +147,55 @@ func (c *changingSizeReader) Open(_ context.Context, _ client.Key, opts ...clien return io.NopCloser(bytes.NewReader(c.discovery[start : start+length])), headers, nil } +// recordingReader serves byte ranges and records the Range option of every +// Open call ("" for a full, non-ranged read), so tests can assert how the +// object was fetched. +type recordingReader struct { + data []byte + etag string + + mu sync.Mutex + opens []string +} + +func (r *recordingReader) Open(_ context.Context, _ client.Key, opts ...client.RequestOption) (io.ReadCloser, http.Header, error) { + o := client.NewRequestOptions(opts...) + r.mu.Lock() + r.opens = append(r.opens, o.Range) + r.mu.Unlock() + + size := int64(len(r.data)) + start, length, outcome := o.ResolveRange(size, r.etag) + headers := http.Header{} + if outcome == client.RangeNotSatisfiable { + headers.Set("Content-Range", fmt.Sprintf("bytes */%d", size)) + return nil, headers, client.ErrRangeNotSatisfiable + } + if r.etag != "" { + headers.Set(client.ETagKey, r.etag) + } + headers.Set("Content-Length", strconv.FormatInt(length, 10)) + if outcome == client.RangePartial { + headers.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, start+length-1, size)) + } + return io.NopCloser(bytes.NewReader(r.data[start : start+length])), headers, nil +} + +func TestParallelGetSingleWorkerFullRead(t *testing.T) { + // A concurrency of 1 gains nothing from chunking, so it must issue a single + // non-ranged read rather than discovering and serialising ranged GETs. + data := make([]byte, 1000) + for i := range data { + data[i] = byte(i % 251) + } + c := &recordingReader{data: data, etag: `"v1"`} + var dst bufferAt + err := client.ParallelGet(context.Background(), c, client.NewKey("k"), &dst, 100, 1) + assert.NoError(t, err) + assert.Equal(t, data, dst.buf) + assert.Equal(t, []string{""}, c.opens) +} + func TestParallelGetNoETagSizeChangedBetweenRequests(t *testing.T) { // A no-ETag multi-chunk object falls back to a single full read. If it is // rewritten to a different size between discovery and that read, the diff --git a/internal/cache/parallel_get_test.go b/internal/cache/parallel_get_test.go index 9f0b5a4f..1baa3bb8 100644 --- a/internal/cache/parallel_get_test.go +++ b/internal/cache/parallel_get_test.go @@ -57,7 +57,7 @@ func TestParallelGet(t *testing.T) { {name: "UnevenChunks", chunkSize: 300, concurrency: 3}, {name: "SingleByteChunks", chunkSize: 1, concurrency: 8}, {name: "ChunkLargerThanObject", chunkSize: 5000, concurrency: 4}, - {name: "SerialFastPath", chunkSize: 100, concurrency: 1}, + {name: "SingleWorkerFullRead", chunkSize: 100, concurrency: 1}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -80,9 +80,13 @@ func TestParallelGetEmptyObject(t *testing.T) { assert.NoError(t, err) assert.NoError(t, w.Close()) - var dst bufferAt - assert.NoError(t, cache.ParallelGet(ctx, c, key, &dst, 100, 4)) - assert.Equal(t, 0, len(dst.buf)) + // concurrency 4 takes the ranged discovery path (ErrRangeNotSatisfiable), + // concurrency 1 takes the up-front full-read path; both must yield nothing. + for _, concurrency := range []int{4, 1} { + var dst bufferAt + assert.NoError(t, cache.ParallelGet(ctx, c, key, &dst, 100, concurrency)) + assert.Equal(t, 0, len(dst.buf)) + } } func TestParallelGetNotFound(t *testing.T) {