diff --git a/component/block_cache/block_cache.go b/component/block_cache/block_cache.go
index c938a2e4f..daf0f1104 100644
--- a/component/block_cache/block_cache.go
+++ b/component/block_cache/block_cache.go
@@ -1570,7 +1570,7 @@ func (bc *BlockCache) commitBlocks(handle *handlemap.Handle) error {
 		}
 	}
 
-	blockIDList, err := bc.getBlockIDList(handle)
+	blockIDList, restageId, err := bc.getBlockIDList(handle)
 	if err != nil {
 		log.Err("BlockCache::commitBlocks : Failed to get block id list for %v [%v]", handle.Path, err.Error())
 		return err
@@ -1598,11 +1598,31 @@ func (bc *BlockCache) commitBlocks(handle *handlemap.Handle) error {
 		listMap[k].committed = true
 	}
 
+	if restageId != "" {
+		// We need to restage this block by merging it with the next block
+		for i := range blockIDList {
+			if blockIDList[i] == restageId {
+				// Read one block from the offset of this block, which effectively reads this block and the next one.
+				// Then stage this block again with the correct length.
+				// Remove the next block from blockIDList.
+				// Commit the block list again.
+				block, err := bc.getOrCreateBlock(handle, uint64(i)*bc.blockSize)
+				if err != nil {
+					log.Err("BlockCache::commitBlocks : Failed to get block for %v [%v]", handle.Path, err.Error())
+					return err
+				}
+
+				block.Dirty()
+				return bc.commitBlocks(handle)
+			}
+		}
+	}
+
 	handle.Flags.Clear(handlemap.HandleFlagDirty)
 	return nil
 }
 
-func (bc *BlockCache) getBlockIDList(handle *handlemap.Handle) ([]string, error) {
+func (bc *BlockCache) getBlockIDList(handle *handlemap.Handle) ([]string, string, error) {
 	// generate the block id list order
 	list, _ := handle.GetValue("blockList")
 	listMap := list.(map[int64]*blockInfo)
@@ -1617,27 +1637,59 @@ func (bc *BlockCache) getBlockIDList(handle *handlemap.Handle) ([]string, error)
 	zeroBlockStaged := false
 	zeroBlockID := ""
+	restageId := ""
 	index := int64(0)
 	i := 0
 
 	for i < len(offsets) {
 		if index == offsets[i] {
-			// TODO: when a staged block (not last block) has data less than block size
 			if i != len(offsets)-1 && listMap[offsets[i]].size != bc.blockSize {
-				log.Err("BlockCache::getBlockIDList : Staged block %v has less data %v for %v=>%s\n%v", offsets[i], listMap[offsets[i]].size, handle.ID, handle.Path, common.BlockCacheRWErrMsg)
-				return nil, fmt.Errorf("staged block %v has less data %v for %v=>%s\n%v", offsets[i], listMap[offsets[i]].size, handle.ID, handle.Path, common.BlockCacheRWErrMsg)
-			}
+				// A non-last block was staged earlier and it is smaller than the block size.
+				// This happens when a partially filled block is staged while it is still the last block,
+				// and data is then written beyond that point, so it is no longer the last block.
+				// In such a case we need to fill the gap with zeros.
+				// For simplicity we fill the gap with a new block and later merge both blocks into one.
+				id := base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(16))
+				fillerSize := (bc.blockSize - listMap[offsets[i]].size)
+				fillerOffset := uint64(offsets[i]*int64(bc.blockSize)) + listMap[offsets[i]].size
+
+				log.Debug("BlockCache::getBlockIDList : Staging semi zero block for %v=>%v offset %v, size %v", handle.ID, handle.Path, fillerOffset, fillerSize)
+				err := bc.NextComponent().StageData(internal.StageDataOptions{
+					Name: handle.Path,
+					Data: bc.blockPool.zeroBlock.data[:fillerSize],
+					Id:   id,
+				})
 
-			blockIDList = append(blockIDList, listMap[offsets[i]].id)
-			log.Debug("BlockCache::getBlockIDList : Preparing blocklist for %v=>%s (%v : %v, size %v)", handle.ID, handle.Path, offsets[i], listMap[offsets[i]].id, listMap[offsets[i]].size)
-			index++
-			i++
+				if err != nil {
+					log.Err("BlockCache::getBlockIDList : Failed to write semi zero block for %v=>%v [%s]", handle.ID, handle.Path, err.Error())
+					return nil, "", err
+				}
+
+				blockIDList = append(blockIDList, listMap[offsets[i]].id)
+				log.Debug("BlockCache::getBlockIDList : Preparing blocklist for %v=>%s (%v : %v, size %v)", handle.ID, handle.Path, offsets[i], listMap[offsets[i]].id, listMap[offsets[i]].size)
+
+				// After the flush call we need to merge this particular block with the next block (the semi zero block)
+				restageId = listMap[offsets[i]].id
+
+				// Add the semi zero block to the list
+				blockIDList = append(blockIDList, id)
+				log.Debug("BlockCache::getBlockIDList : Preparing blocklist for %v=>%s (%v : %v, size %v)", handle.ID, handle.Path, fillerOffset, id, fillerSize)
+
+				index++
+				i++
+
+			} else {
+				blockIDList = append(blockIDList, listMap[offsets[i]].id)
+				log.Debug("BlockCache::getBlockIDList : Preparing blocklist for %v=>%s (%v : %v, size %v)", handle.ID, handle.Path, offsets[i], listMap[offsets[i]].id, listMap[offsets[i]].size)
+				index++
+				i++
+			}
 		} else {
 			for index < offsets[i] {
 				if !zeroBlockStaged {
 					id, err := bc.stageZeroBlock(handle, 1)
 					if err != nil {
-						return nil, err
+						return nil, "", err
 					}
 
 					zeroBlockStaged = true
 					zeroBlockID = id
@@ -1657,7 +1709,7 @@ func (bc *BlockCache) getBlockIDList(handle *handlemap.Handle) ([]string, error)
 		}
 	}
 
-	return blockIDList, nil
+	return blockIDList, restageId, nil
 }
 
 func (bc *BlockCache) stageZeroBlock(handle *handlemap.Handle, tryCnt int) (string, error) {
diff --git a/component/block_cache/block_cache_test.go b/component/block_cache/block_cache_test.go
index 14a41e875..5a6083cd3 100644
--- a/component/block_cache/block_cache_test.go
+++ b/component/block_cache/block_cache_test.go
@@ -2830,6 +2830,57 @@ func (suite *blockCacheTestSuite) TestStrongConsistency() {
 	suite.assert.NotEqualValues(xattrMd5sum1, xattrMd5sum2)
 }
 
+func (suite *blockCacheTestSuite) TestReadCommittedLastBlockAfterAppends() {
+	prefetch := 12
+	cfg := fmt.Sprintf("block_cache:\n block-size-mb: 1\n mem-size-mb: 12\n prefetch: %v\n parallelism: 10", prefetch)
+	tobj, err := setupPipeline(cfg)
+	defer tobj.cleanupPipeline()
+
+	suite.assert.Nil(err)
+	suite.assert.NotNil(tobj.blockCache)
+
+	path := getTestFileName(suite.T().Name())
+	storagePath := filepath.Join(tobj.fake_storage_path, path)
+
+	// write using block cache
+	options := internal.CreateFileOptions{Name: path, Mode: 0777}
+	h, err := tobj.blockCache.CreateFile(options)
+	suite.assert.Nil(err)
+	suite.assert.NotNil(h)
+	suite.assert.Equal(h.Size, int64(0))
+	suite.assert.False(h.Dirty())
+
+	// Jump to the 13th MB offset and write 500KB of data
+	n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(13 * _1MB), Data: dataBuff[:(_1MB / 2)]})
+	suite.assert.Nil(err)
+	suite.assert.Equal(n, int(_1MB/2))
+	suite.assert.True(h.Dirty())
+
+	// Write the remaining data backwards so that the last block is staged first
+	for i := 0; i < 12; i++ {
+		n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(uint64(12-i) * _1MB), Data: dataBuff[:_1MB]})
+		suite.assert.Nil(err)
+		suite.assert.Equal(n, int(_1MB))
+		suite.assert.True(h.Dirty())
+	}
+
+	// Now jump to the 20th MB offset and write 500KB of data
+	n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(20 * _1MB), Data: dataBuff[:(_1MB / 2)]})
+	suite.assert.Nil(err)
+	suite.assert.Equal(n, int(_1MB/2))
+	suite.assert.True(h.Dirty())
+
+	err = tobj.blockCache.FlushFile(internal.FlushFileOptions{Handle: h})
+	suite.assert.Nil(err)
+
+	err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
+	suite.assert.Nil(err)
+
+	_, err = os.Stat(storagePath)
+	suite.assert.Nil(err)
+	suite.assert.Equal(h.Size, int64((20*_1MB)+(_1MB/2)))
+}
+
 // In order for 'go test' to run this suite, we need to create
 // a normal test function and pass our suite to suite.Run
 func TestBlockCacheTestSuite(t *testing.T) {
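For reviewers, here is a minimal standalone sketch of the decision the new getBlockIDList makes. `blockMeta`, `buildIDList`, and the toy ids/sizes are hypothetical stand-ins for the component's `blockInfo` map; only the fill-then-restage control flow mirrors the diff (gap filling via stageZeroBlock and the actual StageData/commit calls are omitted).

```go
package main

import "fmt"

const blockSize = 4 // toy block size; the component derives this from block-size-mb

type blockMeta struct {
	id   string
	size int
}

// buildIDList walks staged blocks in offset order. A non-last block that is
// shorter than blockSize gets a zero "filler" block appended after it, and its
// id is reported back as the block to restage (merge with its filler) after
// the first commit, mirroring the two-pass commit introduced in the diff.
func buildIDList(blocks []blockMeta) (ids []string, restageID string) {
	for i, b := range blocks {
		ids = append(ids, b.id)
		if i != len(blocks)-1 && b.size != blockSize {
			filler := fmt.Sprintf("filler-%d", i) // stands in for the random block id
			ids = append(ids, filler)             // semi zero block covers the gap
			restageID = b.id                      // merge b with its filler on the second pass
		}
	}
	return ids, restageID
}

func main() {
	// b1 was staged while it was still the last block, so it is short.
	blocks := []blockMeta{
		{id: "b0", size: 4},
		{id: "b1", size: 2}, // short, and no longer the last block
		{id: "b2", size: 4},
	}
	ids, restageID := buildIDList(blocks)
	fmt.Println(ids)       // [b0 b1 filler-1 b2]
	fmt.Println(restageID) // b1
}
```

Per the comments in the diff, the second commitBlocks pass then reads one full block at b1's offset (which pulls in both b1 and its filler), restages it with the correct length, drops the filler from the block list, and commits again, so the committed file no longer contains a short non-last block.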