Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ build:rocm --copt="-D_GLIBCXX_USE_CXX11_ABI=1"
#build:rocm --action_env HIPBLASLT_LOG_MASK="32" # to log blasLt
#build:rocm --action_env ROCBLAS_LAYER="4"
#build:rocm --copt="-DENABLE_PROF=1"
build:rocm --action_env LD_LIBRARY_PATH="/opt/rh/gcc-toolset-12/root/usr/lib64:/lib64:/opt/conda310/lib/:/opt/rocm/lib/:/opt/taobao/java/jre/lib/amd64/server/:/opt/amdgpu/lib64/"
build:rocm --action_env LD_LIBRARY_PATH="/opt/rh/gcc-toolset-12/root/usr/lib64:/lib64:/opt/conda310/lib/:/opt/rocm/lib/:/opt/taobao/java/jre/lib/amd64/server/:/opt/amdgpu/lib64/:/lib64"
build:rocm --host_action_env LD_LIBRARY_PATH="/opt/rocm/lib/:/opt/taobao/java/jre/lib/amd64/server/:/opt/amdgpu/lib64/"
build:rocm --action_env PATH="/opt/rh/gcc-toolset-12/root/usr/bin:/opt/conda310/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/tmp/cache_bust_20260321"
build:rocm --host_action_env PATH="/opt/rh/gcc-toolset-12/root/usr/bin:/opt/conda310/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/tmp/cache_bust_20260321"
Expand Down
44 changes: 25 additions & 19 deletions rtp_llm/cpp/cache/HybridTypeKVCacheAllocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,17 @@ int HybridTypeKVCacheAllocator::reuseCache(const CacheKeysType& cache_keys, Batc
MallocResult HybridTypeKVCacheAllocator::incrMalloc(const MallocInfo& malloc_info) {
auto& kv_resource = malloc_info.batch_kv_cache_resource;
const int batch_size = kv_resource->batchSize();
const int seq_len = malloc_info.complete_token_ids->seqLength();
const int seq_len = malloc_info.incrSeqLen();
const int reserve_step = malloc_info.complete_token_ids->getReserveStep();

// Record original sizes for rollback in case any subsequent allocation fails
std::vector<std::vector<size_t>> original_sizes(batch_size);
// Record original block vectors for rollback in case any subsequent
// allocation fails. Linear groups can materialize existing NULL slots, so
// size-only rollback is not enough.
std::vector<std::vector<BlockIndicesType>> original_blocks(batch_size);
for (int b = 0; b < batch_size; ++b) {
original_sizes[b].resize(static_cast<size_t>(kv_resource->groupNums()));
original_blocks[b].resize(static_cast<size_t>(kv_resource->groupNums()));
for (int gid = 0; gid < kv_resource->groupNums(); ++gid) {
original_sizes[b][static_cast<size_t>(gid)] = kv_resource->blocksNum(b, gid);
original_blocks[b][static_cast<size_t>(gid)] = kv_resource->blocks(b, gid);
}
}

Expand Down Expand Up @@ -189,25 +191,29 @@ MallocResult HybridTypeKVCacheAllocator::incrMalloc(const MallocInfo& malloc_inf
return {true, 0};
}

// rollback kvcache blocks
// Roll back kvcache blocks. Free any valid block that was not present in
// the original vector, including backfilled existing NULL slots.
BlockIndicesType blocks_to_free;

for (int b = 0; b < batch_size; ++b) {
for (int b = 0; b <= failed_batch && b < batch_size; ++b) {
for (int gid = 0; gid < kv_resource->groupNums(); ++gid) {
auto& block_ids = kv_resource->mutableBlockIds(b, gid);
size_t original_num = original_sizes[b][static_cast<size_t>(gid)];
if (block_ids.blocksNum() > original_num) {
const auto& blk = block_ids.blocks();
for (size_t i = original_num; i < blk.size(); ++i) {
if (!isNullBlockIdx(blk[i])) {
blocks_to_free.push_back(blk[i]);
}
auto& block_ids = kv_resource->mutableBlockIds(b, gid);
const auto& original = original_blocks[b][static_cast<size_t>(gid)];

std::unordered_set<BlockIdxType> original_valid_blocks;
original_valid_blocks.reserve(original.size());
for (auto block : original) {
if (!isNullBlockIdx(block)) {
original_valid_blocks.insert(block);
}
block_ids.resize(original_num);
}
}
if (b > failed_batch) {
break;

for (auto block : block_ids.blocks()) {
if (!isNullBlockIdx(block) && original_valid_blocks.find(block) == original_valid_blocks.end()) {
blocks_to_free.push_back(block);
}
}
block_ids.assign(original);
}
}
if (!blocks_to_free.empty()) {
Expand Down
14 changes: 12 additions & 2 deletions rtp_llm/cpp/cache/KVCacheResource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,18 @@ void BlockIds::remove(const std::vector<size_t>& indices) {
}

void BlockIds::swap(size_t pos_a, size_t pos_b) {
RTP_LLM_CHECK(pos_a < block_indices.size());
RTP_LLM_CHECK(pos_b < block_indices.size());
if (pos_a >= block_indices.size() || pos_b >= block_indices.size()) {
RTP_LLM_LOG_ERROR("BlockIds::swap: pos_a=%d or pos_b=%d is out of range, block_indices.size()=%d",
pos_a,
pos_b,
block_indices.size());
RTP_LLM_CHECK_WITH_INFO(false,
"BlockIds::swap: pos_a=%d or pos_b=%d is out of range, block_indices.size()=%d",
pos_a,
pos_b,
block_indices.size());
}

if (pos_a == pos_b) {
return;
}
Expand Down
135 changes: 81 additions & 54 deletions rtp_llm/cpp/cache/LinearKVCacheGroup.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,48 @@ int LinearKVCacheGroup::needBlocksNum(int seq_len, int current_blocks, int reser

NeedBlocksInfo LinearKVCacheGroup::getNeedBlocks(
int common_seq_len, int seq_len, int reserve_step, int reuse_blocks_len, bool reuse_enabled) const {
const int reuse_begin = reuse_blocks_len;
const int step = std::max(1, linear_step_);

// calculate the number of blocks in the range (begin, end]
auto count_linear_sparse_range = [&](int begin, int end) -> int {
if (end <= begin) {
return 0;
}
if (!reuse_enabled) {
// keeps only the tail block
return 1;
}
const int eligible = (end + 1) / step - (begin + 1) / step;
const int tail = ((end + 1) % step == 0) ? 0 : 1;
return eligible + tail;
};

NeedBlocksInfo info;
const int step = std::max(1, linear_step_);

// common_slots: blocks for common_seq_len (no reserve)
const int common_slots = needBlocksNum(common_seq_len, 0);
// seq_slots: blocks for seq_len (no reserve)
const int seq_slots = needBlocksNum(seq_len, 0);
// total_slots = seq_slots + reserve_step
// total_slots includes reserve_step - 1 extra linear slots when reserve_step is non-zero.
const int total_slots = needBlocksNum(seq_len, 0, reserve_step);

info.common_blocks = count_linear_sparse_range(reuse_begin, common_slots);
info.extra_blocks = count_linear_sparse_range(common_slots, seq_slots);
info.extra_blocks += std::max(total_slots - seq_slots, 0); // for reserve_step
auto should_materialize = [&](int pos, int cur_seq_slots, int cur_total_slots) {
const bool is_seq_tail =
(cur_seq_slots > 0) && (pos >= std::max(0, cur_seq_slots - 2)) && (pos < cur_seq_slots);
const bool is_reserve = (reserve_step > 0) && (pos >= cur_seq_slots) && (pos < cur_total_slots);
const bool step_hit = (((pos + 1) % step) == 0);
return is_reserve || (reuse_enabled ? (step_hit || is_seq_tail) : is_seq_tail);
};

auto common_required = [&](int pos) { return should_materialize(pos, common_slots, common_slots); };
auto final_required = [&](int pos) { return should_materialize(pos, seq_slots, total_slots); };

NeedBlocksInfo info;
for (int pos = 0; pos < common_slots; ++pos) {
if (common_required(pos)) {
info.common_blocks++;
}
}
for (int pos = 0; pos < total_slots; ++pos) {
if (final_required(pos) && !(pos < common_slots && common_required(pos))) {
info.extra_blocks++;
}
}

// Linear reuse materializes only one prefix block: the matched tail at
// reuse_blocks_len - 1. Do not count that block as newly allocated.
const int reused_tail_pos = (reuse_enabled && reuse_blocks_len > 0) ? reuse_blocks_len - 1 : -1;
if (reused_tail_pos >= 0) {
if (reused_tail_pos < common_slots && common_required(reused_tail_pos)) {
info.common_blocks--;
} else if (reused_tail_pos < total_slots && final_required(reused_tail_pos)) {
info.extra_blocks--;
}
}

info.common_blocks = std::max(info.common_blocks, 0);
info.extra_blocks = std::max(info.extra_blocks, 0);
Expand All @@ -76,29 +89,32 @@ bool LinearKVCacheGroup::malloc(BlockIds& block_ids, int seq_len, bool enable_re
const int step = std::max(1, linear_step_);
const int current_blocks_len = static_cast<int>(block_ids.blocksNum());
const int seq_slots = needBlocksNum(seq_len, 0, 0);
const int new_blocks_len = needBlocksNum(seq_len, current_blocks_len, reserve_step);
const int total_slots = needBlocksNum(seq_len, 0, reserve_step);
const int new_blocks_len = std::max(total_slots - current_blocks_len, 0);

auto should_materialize = [&](int pos) {
// Materialize tail and tail-1: causal_conv1d_update may read
// (seq_len - 2) / SBP when seq_len crosses a block boundary.
// Leaving tail-1 NULL can hit IMA on long prompts.
const bool is_seq_tail = (seq_slots > 0) && (pos >= std::max(0, seq_slots - 2)) && (pos < seq_slots);
const bool is_reserve = (reserve_step > 0) && (pos >= seq_slots) && (pos < total_slots);
const bool step_hit = (((pos + 1) % step) == 0);
return is_reserve || (enable_reuse_cache ? (step_hit || is_seq_tail) : is_seq_tail);
};

if (new_blocks_len == 0) {
return true;
std::vector<size_t> positions_to_backfill;
const auto& existing_blocks = block_ids.blocks();
const int existing_scan = std::min(current_blocks_len, total_slots);
for (int i = 0; i < existing_scan; ++i) {
if (should_materialize(i) && isNullBlockIdx(existing_blocks[static_cast<size_t>(i)])) {
positions_to_backfill.push_back(static_cast<size_t>(i));
}
}

// LinearKVCacheGroup::malloc is responsible for:
// 1. allocating blocks for the current sequence length;
// 2. freeing unused blocks to reduce kvcache block usage;

// Two policies to follow:
// 1. Linear Steps: keep N * linear_step blocks if cache reuse is enabled;
// 2. Allocate Tail Blocks: allocate the last partial block at initialization and keep the last 2 blocks during
// decoding;

int need_alloc_blocks = 0;

for (int i = current_blocks_len; i < current_blocks_len + new_blocks_len; i++) {
const bool is_seq_tail = (seq_slots > 0) && (i == seq_slots - 1);
const bool is_reserve = (reserve_step > 0) && (i >= seq_slots);
const bool step_hit = (((i + 1) % step) == 0);
const bool should_alloc = is_reserve || (enable_reuse_cache ? (step_hit || is_seq_tail) : is_seq_tail);
if (should_alloc) {
need_alloc_blocks += static_cast<int>(positions_to_backfill.size());
for (int i = current_blocks_len; i < total_slots; i++) {
if (should_materialize(i)) {
need_alloc_blocks++;
}
}
Expand All @@ -115,24 +131,35 @@ bool LinearKVCacheGroup::malloc(BlockIds& block_ids, int seq_len, bool enable_re
}
}

BlockIndicesType allocated_blocks;
if (need_alloc_blocks > 0) {
allocated_blocks = block_pool_->malloc(need_alloc_blocks);
if (allocated_blocks.size() != static_cast<size_t>(need_alloc_blocks)) {
return false;
}
}

size_t allocated_idx = 0;
for (size_t pos : positions_to_backfill) {
block_ids.setAt(pos, allocated_blocks[allocated_idx++]);
}

BlockIndicesType new_ids;
new_ids.reserve(static_cast<size_t>(new_blocks_len));
for (int i = current_blocks_len; i < current_blocks_len + new_blocks_len; i++) {
const bool is_seq_tail = (seq_slots > 0) && (i == seq_slots - 1);
const bool is_reserve = (reserve_step > 0) && (i >= seq_slots);
const bool step_hit = (((i + 1) % step) == 0);
const bool should_alloc = is_reserve || (enable_reuse_cache ? (step_hit || is_seq_tail) : is_seq_tail);
if (should_alloc) {
auto result = block_pool_->malloc(1);
if (result.empty()) {
return false;
}
new_ids.push_back(result[0]);
for (int i = current_blocks_len; i < total_slots; i++) {
if (should_materialize(i)) {
new_ids.push_back(allocated_blocks[allocated_idx++]);
} else {
new_ids.push_back(NULL_BLOCK_IDX);
}
}
block_ids.add(new_ids);
if (!new_ids.empty()) {
block_ids.add(new_ids);
}
RTP_LLM_CHECK_WITH_INFO(allocated_idx == allocated_blocks.size(),
"linear kv allocation accounting mismatch, used=%zu allocated=%zu",
allocated_idx,
allocated_blocks.size());
return true;
}

Expand Down Expand Up @@ -172,7 +199,7 @@ void LinearKVCacheGroup::removeSkippedBlocks(BlockIds& block_ids, bool enable_re
// keep last 2 and every reserve_step
for (int i = block_size - 3 - reserve_step; i >= 0; i--) {
if (isNullBlockIdx(block_indices[i])) {
break;
continue;
}
if (enable_reuse_cache && ((i + 1) % step) == 0) {
continue;
Expand Down
1 change: 1 addition & 0 deletions rtp_llm/cpp/cache/LinearKVCacheGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class LinearKVCacheGroup: public KVCacheGroup {
int reserve_step,
int reuse_blocks_len,
bool reuse_enabled = false) const override;
bool shouldMaterializeBlock(int pos, int seq_len, int reserve_step, bool enable_reuse_cache) const;

private:
void filterValidBlocks(const BlockIndicesType& in, BlockIndicesType& out) const;
Expand Down
2 changes: 1 addition & 1 deletion rtp_llm/cpp/cache/SingleTypeKVCacheAllocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ MallocResult SingleTypeKVCacheAllocator::incrMalloc(const MallocInfo& malloc_inf
auto& kv_resource = malloc_info.batch_kv_cache_resource;
int batch_size = kv_resource->batchSize();
int current_blocks = kv_resource->curBlocksNum();
int seq_len = malloc_info.complete_token_ids->seqLength();
int seq_len = malloc_info.incrSeqLen();
int reserve_step = malloc_info.complete_token_ids->getReserveStep();

auto need_blocks = full_kv_cache_group_->needBlocksNum(seq_len, current_blocks, reserve_step);
Expand Down
7 changes: 7 additions & 0 deletions rtp_llm/cpp/cache/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ struct MallocInfo {
bool enable_device_cache = true;
// Sparse linear-block cleanup is only valid for incremental allocation.
bool enable_remove_skipped_blocks = true;
// Override for incrMalloc's seqLength read; -1 = fall back to complete_token_ids->seqLength().
// Lets the state machine feed the publish-time value instead of racing with the async worker.
int incr_seq_len_override = -1;

// Sequence length for incrMalloc to use: the override when it is
// non-negative, otherwise the current complete_token_ids->seqLength().
int incrSeqLen() const {
    if (incr_seq_len_override < 0) {
        return complete_token_ids->seqLength();
    }
    return incr_seq_len_override;
}
};

struct MallocResult {
Expand Down
10 changes: 6 additions & 4 deletions rtp_llm/cpp/cache/connector/memory/KVCacheMemoryConnector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,9 @@ std::shared_ptr<AsyncContext> KVCacheMemoryConnector::asyncWrite(const std::shar
item.is_complete = copy_info.is_complete;
putToCache(item);
}
// reset resource to decrease block ref count in destructor
resource_copy.reset();
}
// reset resource to decrease block ref count in destructor
resource_copy.reset();
const int64_t write_block_num = success ? static_cast<int64_t>(copy_plan->copy_infos.size()) : 0;
// reset copy plan to release memory block refs
copy_plan.reset();
Expand Down Expand Up @@ -466,9 +466,11 @@ bool KVCacheMemoryConnector::startCopyAsync(const std::shared_ptr<MemoryAsyncCon
if (stop_.load()) {
return false;
}
auto code = wait_done_thread_pool_->pushTask([this, context, copy_plan]() mutable {
auto send_result = sendCopyPlan(copy_plan);
auto task_copy_plan = copy_plan;
auto code = wait_done_thread_pool_->pushTask([this, context, task_copy_plan]() mutable {
auto send_result = sendCopyPlan(task_copy_plan);
context->setBroadcastResult(send_result);
task_copy_plan.reset();
context->waitDone();
});
if (code != autil::ThreadPoolBase::ERROR_NONE) {
Expand Down
Loading
Loading