diff --git a/intdecoder.go b/intdecoder.go
index 690a8b3..5624527 100644
--- a/intdecoder.go
+++ b/intdecoder.go
@@ -87,6 +87,9 @@ func (d *chunkedIntDecoder) loadChunk(chunk int) error {
 	if err != nil {
 		return err
 	}
+	if len(curChunkBytesData) == 0 {
+		return nil
+	}
 	d.uncompressed, err = ZSTDDecompress(d.uncompressed[:cap(d.uncompressed)], curChunkBytesData)
 	if err != nil {
 		return err
diff --git a/load.go b/load.go
index b242efb..1ad2b41 100644
--- a/load.go
+++ b/load.go
@@ -60,6 +60,8 @@ func load(data *segment.Data) (*Segment, error) {
 		return nil, err
 	}
 
+	rv.initDecompressedStoredFieldChunks(len(rv.storedFieldChunkOffsets))
+
 	err = rv.loadDvReaders()
 	if err != nil {
 		return nil, err
diff --git a/new.go b/new.go
index a06a44b..aceee3f 100644
--- a/new.go
+++ b/new.go
@@ -99,6 +99,7 @@ func initSegmentBase(mem []byte, footer *footer,
 		fieldFSTs:               make(map[uint16]*vellum.FST),
 		storedFieldChunkOffsets: storedFieldChunkOffsets,
 	}
+	sb.initDecompressedStoredFieldChunks(len(storedFieldChunkOffsets))
 	sb.updateSize()
 
 	err := sb.loadDvReaders()
diff --git a/read.go b/read.go
index 809555a..1b14252 100644
--- a/read.go
+++ b/read.go
@@ -18,47 +18,68 @@ import (
 	"encoding/binary"
 )
 
-func (s *Segment) getDocStoredMetaAndUnCompressed(docNum uint64) (meta, data []byte, err error) {
-	_, storedOffset, n, metaLen, dataLen, err := s.getDocStoredOffsets(docNum)
-	if err != nil {
-		return nil, nil, err
+func (s *Segment) initDecompressedStoredFieldChunks(n int) {
+	s.m.Lock()
+	s.decompressedStoredFieldChunks = make(map[uint32]*segmentCacheData, n)
+	for i := uint32(0); i < uint32(n); i++ {
+		s.decompressedStoredFieldChunks[i] = &segmentCacheData{}
 	}
-
-	meta = s.storedFieldChunkUncompressed[int(storedOffset+n):int(storedOffset+n+metaLen)]
-	data = s.storedFieldChunkUncompressed[int(storedOffset+n+metaLen):int(storedOffset+n+metaLen+dataLen)]
-	return meta, data, nil
+	s.m.Unlock()
 }
 
-func (s *Segment) getDocStoredOffsets(docNum uint64) (indexOffset, storedOffset, n, metaLen, dataLen uint64, err error) {
-	indexOffset, storedOffset, err = s.getDocStoredOffsetsOnly(docNum)
+func (s *Segment) getDocStoredMetaAndUnCompressed(docNum uint64) (meta, data []byte, err error) {
+	_, storedOffset, err := s.getDocStoredOffsetsOnly(docNum)
 	if err != nil {
-		return 0, 0, 0, 0, 0, err
+		return nil, nil, err
 	}
 
 	// document chunk coder
-	chunkI := docNum / uint64(defaultDocumentChunkSize)
-	chunkOffsetStart := s.storedFieldChunkOffsets[int(chunkI)]
-	chunkOffsetEnd := s.storedFieldChunkOffsets[int(chunkI)+1]
-	compressed, err := s.data.Read(int(chunkOffsetStart), int(chunkOffsetEnd))
-	if err != nil {
-		return 0, 0, 0, 0, 0, err
+	var uncompressed []byte
+	chunkI := uint32(docNum) / defaultDocumentChunkSize
+	storedFieldDecompressed := s.decompressedStoredFieldChunks[chunkI]
+	storedFieldDecompressed.m.Lock()
+	if storedFieldDecompressed.data == nil {
+		// we haven't already loaded and decompressed this chunk
+		chunkOffsetStart := s.storedFieldChunkOffsets[int(chunkI)]
+		chunkOffsetEnd := s.storedFieldChunkOffsets[int(chunkI)+1]
+		compressed, err := s.data.Read(int(chunkOffsetStart), int(chunkOffsetEnd))
+		if err != nil {
+			storedFieldDecompressed.m.Unlock()
+			return nil, nil, err
+		}
+
+		// decompress it
+		storedFieldDecompressed.data, err = ZSTDDecompress(nil, compressed)
+		if err != nil {
+			storedFieldDecompressed.m.Unlock()
+			return nil, nil, err
+		}
 	}
-	s.storedFieldChunkUncompressed = s.storedFieldChunkUncompressed[:0]
-	s.storedFieldChunkUncompressed, err = ZSTDDecompress(s.storedFieldChunkUncompressed[:cap(s.storedFieldChunkUncompressed)], compressed)
-	if err != nil {
-		return 0, 0, 0, 0, 0, err
+	// once initialized, the data won't change, so we can unlock the mutex
+	uncompressed = storedFieldDecompressed.data
+	storedFieldDecompressed.m.Unlock()
+
+	metaDataLenEnd := storedOffset + binary.MaxVarintLen64
+	if metaDataLenEnd > uint64(len(uncompressed)) {
+		metaDataLenEnd = uint64(len(uncompressed))
 	}
+	metaLenData := uncompressed[storedOffset:metaDataLenEnd]
 
-	metaLenData := s.storedFieldChunkUncompressed[int(storedOffset):int(storedOffset+binary.MaxVarintLen64)]
-	var read int
-	metaLen, read = binary.Uvarint(metaLenData)
+	var n uint64
+	metaLen, read := binary.Uvarint(metaLenData)
 	n += uint64(read)
 
-	dataLenData := s.storedFieldChunkUncompressed[int(storedOffset+n):int(storedOffset+n+binary.MaxVarintLen64)]
-	dataLen, read = binary.Uvarint(dataLenData)
+	dataLenEnd := storedOffset + n + binary.MaxVarintLen64
+	if dataLenEnd > uint64(len(uncompressed)) {
+		dataLenEnd = uint64(len(uncompressed))
+	}
+	dataLenData := uncompressed[int(storedOffset+n):dataLenEnd]
+	dataLen, read := binary.Uvarint(dataLenData)
 	n += uint64(read)
 
-	return indexOffset, storedOffset, n, metaLen, dataLen, nil
+	meta = uncompressed[int(storedOffset+n):int(storedOffset+n+metaLen)]
+	data = uncompressed[int(storedOffset+n+metaLen):int(storedOffset+n+metaLen+dataLen)]
+	return meta, data, nil
 }
 
 func (s *Segment) getDocStoredOffsetsOnly(docNum uint64) (indexOffset, storedOffset uint64, err error) {
diff --git a/segment.go b/segment.go
index 19b768c..d49b800 100644
--- a/segment.go
+++ b/segment.go
@@ -40,8 +40,7 @@ type Segment struct {
 	fieldDocs  map[uint16]uint64 // fieldID -> # docs with value in field
 	fieldFreqs map[uint16]uint64 // fieldID -> # total tokens in field
 
-	storedFieldChunkOffsets      []uint64 // stored field chunk offset
-	storedFieldChunkUncompressed []byte   // for uncompress cache
+	storedFieldChunkOffsets []uint64 // stored field chunk offset
 
 	dictLocs       []uint64
 	fieldDvReaders map[uint16]*docValueReader // naive chunk cache per field
@@ -49,8 +48,14 @@ type Segment struct {
 	size uint64
 
 	// state loaded dynamically
-	m         sync.Mutex
-	fieldFSTs map[uint16]*vellum.FST
+	m                             sync.RWMutex
+	fieldFSTs                     map[uint16]*vellum.FST
+	decompressedStoredFieldChunks map[uint32]*segmentCacheData
+}
+
+type segmentCacheData struct {
+	data []byte
+	m    sync.RWMutex
 }
 
 func (s *Segment) WriteTo(w io.Writer, _ chan struct{}) (int64, error) {
diff --git a/segment_test.go b/segment_test.go
index 4f25f52..09d5918 100644
--- a/segment_test.go
+++ b/segment_test.go
@@ -17,6 +17,7 @@ package ice
 import (
 	"path/filepath"
 	"reflect"
+	"sync"
 	"testing"
 
 	segment "github.com/blugelabs/bluge_segment_api"
@@ -549,3 +550,37 @@ func checkExpectedFields(t *testing.T, fields []string) {
 		}
 	}
 }
+
+func TestSegmentConcurrency(t *testing.T) {
+	path, cleanup := setupTestDir(t)
+	defer cleanup()
+
+	segPath := filepath.Join(path, "segment.ice")
+	seg, closeF, err := createDiskSegment(buildTestSegmentMulti, segPath)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		cerr := closeF()
+		if cerr != nil {
+			t.Fatalf("error closing segment: %v", cerr)
+		}
+	}()
+
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			meta, data, err := seg.getDocStoredMetaAndUnCompressed(0)
+			if err != nil {
+				t.Errorf("getDocStoredMetaAndUnCompressed err: %v", err)
+			}
+			if meta == nil || data == nil {
+				t.Errorf("getDocStoredMetaAndUnCompressed meta or data should not be nil")
+			}
+		}()
+	}
+
+	wg.Wait()
+}
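The pattern this change introduces — one pre-created cache entry per stored-field chunk, each guarded by its own mutex and populated at most once — can be shown in isolation. Below is a minimal standalone sketch, not ice's actual API: the names chunkCache, entry, and load are hypothetical, with load standing in for s.data.Read followed by ZSTDDecompress.

package main

import (
	"fmt"
	"sync"
)

// entry caches one decompressed chunk; data stays nil until first use.
type entry struct {
	m    sync.Mutex
	data []byte
}

// chunkCache holds one pre-created entry per chunk, so the map itself is
// never mutated after construction and lookups need no lock of their own.
type chunkCache struct {
	entries map[uint32]*entry
}

func newChunkCache(n int) *chunkCache {
	c := &chunkCache{entries: make(map[uint32]*entry, n)}
	for i := uint32(0); i < uint32(n); i++ {
		c.entries[i] = &entry{}
	}
	return c
}

// get returns the bytes for chunk i, invoking load at most once per chunk.
func (c *chunkCache) get(i uint32, load func() ([]byte, error)) ([]byte, error) {
	e := c.entries[i]
	e.m.Lock()
	if e.data == nil {
		data, err := load()
		if err != nil {
			e.m.Unlock() // don't leave the entry locked on failure
			return nil, err
		}
		e.data = data
	}
	// once set, data never changes, so release the lock before reading it
	data := e.data
	e.m.Unlock()
	return data, nil
}

func main() {
	cache := newChunkCache(4)
	var wg sync.WaitGroup
	for g := 0; g < 8; g++ {
		wg.Add(1)
		go func(g int) {
			defer wg.Done()
			chunk := uint32(g % 4)
			data, err := cache.get(chunk, func() ([]byte, error) {
				// stand-in for reading and decompressing the chunk
				return []byte{byte(chunk)}, nil
			})
			if err != nil || len(data) != 1 {
				panic("unexpected cache result")
			}
		}(g)
	}
	wg.Wait()
	fmt.Println("all readers done")
}

Because every entry exists before any reader runs, readers of different chunks never contend with each other, and a chunk is decompressed exactly once no matter how many goroutines race on it; the tradeoff versus the old shared buffer is that decompressed chunks stay resident for the segment's lifetime.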