Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion client/parallel_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
if err != nil {
return errors.Wrap(err, "parallel get: full read")
}
return errors.Wrap(writeChunkAt(dst, 0, total, full), "parallel get")
// The full read is a fresh request whose body may be a different
// revision than discovery, so the discovery `total` cannot validate its
// length; -1 skips the check and relies on transport-level EOF detection.
Comment thread
alecthomas marked this conversation as resolved.
return errors.Wrap(writeChunkAt(dst, 0, -1, full), "parallel get")
}

// Multiple chunks: copy the already-open first chunk concurrently with the
Expand Down
40 changes: 40 additions & 0 deletions client/parallel_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,43 @@ func TestParallelGetNoETagSingleChunk(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, data, dst.buf)
}

// changingSizeReader serves a multi-chunk body with no ETag on the ranged
// discovery request, then a differently sized body on the subsequent full
// (non-range) read, modelling an object rewritten between the two requests.
type changingSizeReader struct {
discovery []byte
rewritten []byte
}

func (c *changingSizeReader) Open(_ context.Context, _ client.Key, opts ...client.RequestOption) (io.ReadCloser, http.Header, error) {
o := client.NewRequestOptions(opts...)
headers := http.Header{}
if o.Range == "" {
headers.Set("Content-Length", strconv.FormatInt(int64(len(c.rewritten)), 10))
return io.NopCloser(bytes.NewReader(c.rewritten)), headers, nil
}
size := int64(len(c.discovery))
start, length, outcome := o.ResolveRange(size, "")
if outcome == client.RangeNotSatisfiable {
headers.Set("Content-Range", fmt.Sprintf("bytes */%d", size))
return nil, headers, client.ErrRangeNotSatisfiable
}
headers.Set("Content-Length", strconv.FormatInt(length, 10))
if outcome == client.RangePartial {
headers.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, start+length-1, size))
}
return io.NopCloser(bytes.NewReader(c.discovery[start : start+length])), headers, nil
}

func TestParallelGetNoETagSizeChangedBetweenRequests(t *testing.T) {
// A no-ETag multi-chunk object falls back to a single full read. If it is
// rewritten to a different size between discovery and that read, the
// discovery total must not be used to validate the full body: the full read
// is itself a consistent revision and should be accepted in its entirety.
c := &changingSizeReader{discovery: make([]byte, 1000), rewritten: []byte("changed")}
var dst bufferAt
err := client.ParallelGet(context.Background(), c, client.NewKey("k"), &dst, 100, 4)
assert.NoError(t, err)
assert.Equal(t, c.rewritten, dst.buf)
}