2 changes: 2 additions & 0 deletions fastdeploy/cache_manager/prefix_cache_manager.py
@@ -115,6 +115,7 @@ def __init__(

main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.available_gpu_block_num.set(self.num_gpu_blocks)
+main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.available_gpu_resource.set(1.0)

@property
@@ -274,6 +275,7 @@ def update_cache_config(self, cache_config):

main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.available_gpu_block_num.set(self.num_gpu_blocks)
+main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.available_gpu_resource.set(1.0)

def can_allocate_gpu_blocks(self, num_blocks: int):
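Both hunks initialize the new `free_gpu_block_num` gauge to the full pool size alongside the existing block gauges: at `__init__` and after `update_cache_config`, no block is in use yet. A minimal sketch of the pattern, assuming the `prometheus_client`-style `Gauge` that `MetricsManager` declares (the standalone registration and helper name here are illustrative, not the PR's actual wiring):

```python
from prometheus_client import Gauge

# Assumed registration; in FastDeploy the gauge is declared via MetricsManager.
free_gpu_block_num = Gauge(
    "fastdeploy:free_gpu_block_num",
    "Number of free gpu blocks (help text assumed for illustration)",
)

def reset_block_gauges(num_gpu_blocks: int) -> None:
    # At init/update_cache_config time every block is unused, so the
    # free-block gauge starts at the full pool size, as in the diff above.
    free_gpu_block_num.set(num_gpu_blocks)
```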
4 changes: 2 additions & 2 deletions fastdeploy/engine/resource_manager.py
@@ -311,8 +311,8 @@ def allocate_resources_for_new_tasks(self, tasks):
break

# record batch size here
-task_used_block_num = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
-main_process_metrics.available_gpu_block_num.set(self.total_block_number() - task_used_block_num)
+num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
+main_process_metrics.available_gpu_block_num.set(self.total_block_number() - num_blocks_used_by_tasks)
main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())

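The rename makes the intent of the sum explicit: the availability gauge is recomputed from the live task list on every update rather than adjusted incrementally. A hypothetical standalone helper (not in the PR) showing the same arithmetic:

```python
def available_gpu_blocks(tasks_list, total_block_number: int) -> int:
    # Blocks in use = sum of every occupied slot's block-table length;
    # empty slots (None) contribute zero.
    num_blocks_used_by_tasks = sum(len(task.block_tables) if task else 0 for task in tasks_list)
    return total_block_number - num_blocks_used_by_tasks
```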
20 changes: 10 additions & 10 deletions fastdeploy/engine/sched/resource_manager_v1.py
@@ -123,8 +123,6 @@ def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_reqs):
llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}")
preempted_reqs.append(preempted_req)
scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
-main_process_metrics.num_requests_waiting.inc(1)
-main_process_metrics.num_requests_running.dec(1)
if preempted_req == request:
# No more request to preempt.
can_schedule = False
@@ -381,8 +379,6 @@ def schedule(self):
request, self.config.cache_config.block_size, request.num_computed_tokens
)
request.status = RequestStatus.RUNNING
-main_process_metrics.num_requests_waiting.dec(1)
-main_process_metrics.num_requests_running.inc(1)
allocated_position = self.get_available_position()
request.idx = allocated_position
self.tasks_list[allocated_position] = request
@@ -426,20 +422,24 @@ def schedule(self):
request, self.config.cache_config.block_size, request.num_computed_tokens
)
request.status = RequestStatus.RUNNING
-main_process_metrics.num_requests_waiting.dec(1)
-main_process_metrics.num_requests_running.inc(1)
else:
if self.config.cache_config.enable_prefix_caching:
self._free_blocks(request)
break
else:
llm_logger.error("Unknown request status type")
if scheduled_reqs:
-task_used_block_num = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
-main_process_metrics.available_gpu_block_num.set(self.total_block_number() - task_used_block_num)
-main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
-main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
llm_logger.debug(f"schedued_reqs: {scheduled_reqs}")

+# Update metrics
+num_tasks = sum([1 if task else 0 for task in self.tasks_list])
+num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
+main_process_metrics.available_gpu_block_num.set(self.total_block_number() - num_blocks_used_by_tasks)
+main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
+main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
+main_process_metrics.num_requests_running.set(len(self.running))
+main_process_metrics.num_requests_waiting.set(num_tasks - len(self.running))

return scheduled_reqs

def get_available_position(self) -> int:
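The core change in this file: per-path `inc(1)`/`dec(1)` calls on `num_requests_waiting` and `num_requests_running` are removed, and both gauges are recomputed with `set()` once per `schedule()` call, unconditionally (previously the block-usage gauges were refreshed only when `scheduled_reqs` was non-empty). Recomputing from authoritative scheduler state avoids drift when a code path misses a matching increment or decrement. A sketch of the pattern under assumed names:

```python
# Sketch (names assumed) of the set-based update the PR adopts: derive the
# gauges from scheduler state instead of mutating them along each code path.
def update_request_gauges(metrics, tasks_list, running) -> None:
    # Occupied slots in the fixed-size task list; empty slots are None.
    num_tasks = sum(1 for task in tasks_list if task)
    metrics.num_requests_running.set(len(running))
    # Anything holding a slot but not running is counted as waiting.
    metrics.num_requests_waiting.set(num_tasks - len(running))
```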
2 changes: 1 addition & 1 deletion fastdeploy/metrics/metrics.py
@@ -311,7 +311,7 @@ class MetricsManager:
"available_gpu_block_num": {
"type": Gauge,
"name": "fastdeploy:available_gpu_block_num",
"description": "Number of available gpu blocks in cache, including prefix caching blocks that are not officially released",
"description": "Number of available gpu blocks in cache, including blocks in LRU list",
"kwargs": {},
},
"free_gpu_block_num": {
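For context, `MetricsManager` declares metrics as spec dicts like the entry above. A plausible (assumed) build loop, showing how the `type`, `name`, `description`, and `kwargs` fields would be consumed; only the spec entry itself comes from the diff:

```python
from prometheus_client import Gauge

# Spec-dict shape as it appears in metrics.py; the build loop is illustrative.
METRICS_SPEC = {
    "available_gpu_block_num": {
        "type": Gauge,
        "name": "fastdeploy:available_gpu_block_num",
        "description": "Number of available gpu blocks in cache, including blocks in LRU list",
        "kwargs": {},
    },
}

def build_metrics(spec: dict) -> dict:
    # Instantiate each metric class with its name, help text, and extra kwargs.
    return {key: cfg["type"](cfg["name"], cfg["description"], **cfg["kwargs"]) for key, cfg in spec.items()}
```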
7 changes: 5 additions & 2 deletions fastdeploy/output/token_processor.py
@@ -248,9 +248,12 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False):
self.resource_manager.tasks_list[index] = None
self.resource_manager._recycle_block_tables(task)

-task_used_block_num = sum([len(task.block_tables) if task else 0 for task in self.resource_manager.tasks_list])
+# Update block metrics
+num_blocks_used_by_tasks = sum(
+    [len(task.block_tables) if task else 0 for task in self.resource_manager.tasks_list]
+)
main_process_metrics.available_gpu_block_num.set(
-self.resource_manager.total_block_number() - task_used_block_num
+self.resource_manager.total_block_number() - num_blocks_used_by_tasks
)
main_process_metrics.batch_size.set(
self.resource_manager.max_num_seqs - self.resource_manager.available_batch()
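The recycle path gets the same treatment: after a finished task's slot is cleared and its block tables are recycled, the block and batch gauges are recomputed from `resource_manager` state. A hypothetical consolidation, reusing the `available_gpu_blocks()` helper sketched earlier rather than the PR's inline code:

```python
def refresh_block_gauges(resource_manager, metrics) -> None:
    # Recompute availability from live state; mirrors the inline update
    # performed in _recycle_resources above (wiring assumed, not the PR's).
    metrics.available_gpu_block_num.set(
        available_gpu_blocks(resource_manager.tasks_list, resource_manager.total_block_number())
    )
    metrics.batch_size.set(resource_manager.max_num_seqs - resource_manager.available_batch())
```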