Skip to content

Commit

Permalink
refs #12: Add a new PPC-based LB and eliminate randomness in LBs.
Browse files Browse the repository at this point in the history
 * IMPORTANT CHANGE: We no longer use random variables in our load
   balancers!  Instead, load balancers calculate how many batches should
   be sent to the CPU while the GPU always takes the coproc-ppdepth
   number of batches.  This greatly stabilizes the PPC of GPUs.

 * Now load balancers track the CPU ratio per NUMA node.
  • Loading branch information
achimnol committed Jul 7, 2015
1 parent 2678e7a commit 5747b12
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 40 deletions.
95 changes: 61 additions & 34 deletions elements/loadbalancers/LoadBalanceAdaptiveMeasure.hh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#define LB_MEASURE_CPU_RATIO_MULTIPLIER (1000)
#define LB_MEASURE_CPU_RATIO_DELTA (50)
#define LB_MEASURE_REPTITON_PER_RATIO (8)
#define LB_MEASURE_REPTITON_PER_RATIO (16)

namespace nba {

Expand All @@ -33,62 +33,91 @@ public:
const char *port_count() const { return "1/1"; }
int get_type() const { return SchedulableElement::get_type() | PerBatchElement::get_type(); }

int initialize() {
uniform_dist = std::uniform_int_distribution<int64_t>(0, LB_MEASURE_CPU_RATIO_MULTIPLIER);
random_generator = std::default_random_engine();

int initialize()
{
/* We have only two ranges for CPU and GPU. */
local_cpu_ratio = 0;
print_count = 1;
rep = 0;
rep_limit = ctx->num_coproc_ppdepth;
offload = false;
cpu_ratio = (rte_atomic64_t *) ctx->node_local_storage->get_alloc("LBMeasure.cpu_weight");
return 0;
}

int initialize_global() { return 0; }

/**
 * Per-NUMA-node initialization: allocates the node-local CPU ratio counter
 * and resets it to zero.
 *
 * The storage key must be the one that initialize() passes to get_alloc()
 * ("LBMeasure.cpu_weight").  The previous key, "LBPPC.cpu_weight", did not
 * match that lookup and also aliased the counter used by LoadBalancePPC.
 *
 * @return 0 always.
 */
int initialize_per_node()
{
    ctx->node_local_storage->alloc("LBMeasure.cpu_weight", sizeof(rte_atomic64_t));
    rte_atomic64_t *node_cpu_ratio = (rte_atomic64_t *)
            ctx->node_local_storage->get_alloc("LBMeasure.cpu_weight");
    assert(node_cpu_ratio != nullptr);
    rte_atomic64_set(node_cpu_ratio, 0);
    return 0;
}
int initialize_global() { rte_atomic64_set(&(cpu_ratio), 0); return 0; }
int initialize_per_node() { return 0; }

int configure(comp_thread_context *ctx, std::vector<std::string> &args)
{
Element::configure(ctx, args);
RTE_LOG(INFO, LB, "load balancer mode: Adaptive\n");
RTE_LOG(INFO, LB, "load balancer mode: Measure\n");
return 0;
}

int process_batch(int input_port, PacketBatch *batch)
{
/* Generate a random number and find the interval where it belongs to. */
int64_t x = uniform_dist(random_generator);
int _temp = (x > local_cpu_ratio);
anno_set(&batch->banno, NBA_BANNO_LB_DECISION, _temp);
int decision = 0;
const float c = (float) local_cpu_ratio / LB_MEASURE_CPU_RATIO_MULTIPLIER;
rep ++;
if (offload) {
decision = 1;
if (rep == rep_limit) { // Change to CPU-mode
if (local_cpu_ratio == 0)
rep_limit = 0; // only once for sampling!
else
rep_limit = (unsigned) (c * ctx->num_coproc_ppdepth / (1.0f - c));
rep = 0;
offload = false;
}
} else {
decision = 0;
if (rep == rep_limit) { // Change to GPU-mode
rep_limit = ctx->num_coproc_ppdepth;
rep = 0;
offload = true;
}
}
anno_set(&batch->banno, NBA_BANNO_LB_DECISION, decision);
return 0;
}

int dispatch(uint64_t loop_count, PacketBatch*& out_batch, uint64_t &next_delay)
{
next_delay = 200000;
int64_t temp_cpu_ratio = rte_atomic64_read(&cpu_ratio);
next_delay = 200000; // 0.2sec
int64_t temp_cpu_ratio = rte_atomic64_read(cpu_ratio);
local_cpu_ratio = temp_cpu_ratio;

//if (ctx->io_ctx->loc.local_thread_idx == 0) {
if (ctx->io_ctx->loc.core_id == 0) {
double cpu_ppc = ctx->inspector->pkt_proc_cycles[0];
double gpu_ppc = ctx->inspector->pkt_proc_cycles[1];
double estimated_ppc = (temp_cpu_ratio * cpu_ppc
+ (LB_MEASURE_CPU_RATIO_MULTIPLIER - temp_cpu_ratio) * gpu_ppc)
/ LB_MEASURE_CPU_RATIO_MULTIPLIER;

printf("[MEASURE:%d] CPU %12f GPU %12f PPC %12f Ratio %.3f\n", ctx->loc.node_id,
cpu_ppc, gpu_ppc, estimated_ppc, ((double)temp_cpu_ratio) / LB_MEASURE_CPU_RATIO_MULTIPLIER);

if ((print_count++) % LB_MEASURE_REPTITON_PER_RATIO == 0)
{
const float ppc_cpu = ctx->inspector->pkt_proc_cycles[0];
const float ppc_gpu = ctx->inspector->pkt_proc_cycles[1];
const float estimated_ppc = (temp_cpu_ratio * ppc_cpu
+ (LB_MEASURE_CPU_RATIO_MULTIPLIER - temp_cpu_ratio) * ppc_gpu)
/ LB_MEASURE_CPU_RATIO_MULTIPLIER;
const float c = (float) temp_cpu_ratio / LB_MEASURE_CPU_RATIO_MULTIPLIER;
printf("[MEASURE:%d] CPU %'8.0f GPU %'8.0f PPC %'8.0f CPU-Ratio %.3f (cpu_rep_limit %u)\n",
ctx->loc.node_id,
ppc_cpu, ppc_gpu, estimated_ppc, c,
(unsigned) (c * ctx->num_coproc_ppdepth / (1.0f - c)));

if ((print_count++) % LB_MEASURE_REPTITON_PER_RATIO == 0) {
temp_cpu_ratio += LB_MEASURE_CPU_RATIO_DELTA;
if (temp_cpu_ratio > LB_MEASURE_CPU_RATIO_MULTIPLIER - LB_MEASURE_CPU_RATIO_DELTA)
{
if (temp_cpu_ratio > LB_MEASURE_CPU_RATIO_MULTIPLIER - LB_MEASURE_CPU_RATIO_DELTA) {
temp_cpu_ratio = LB_MEASURE_CPU_RATIO_MULTIPLIER - LB_MEASURE_CPU_RATIO_DELTA;
printf("END_OF_TEST\n");
raise(SIGINT);
}
rte_atomic64_set(&cpu_ratio, temp_cpu_ratio);
rte_atomic64_set(cpu_ratio, temp_cpu_ratio);
}
}

Expand All @@ -97,15 +126,13 @@ public:
}

private:
std::uniform_int_distribution<int64_t> uniform_dist;
std::default_random_engine random_generator;

static rte_atomic64_t cpu_ratio;
rte_atomic64_t *cpu_ratio;
int64_t local_cpu_ratio;
int print_count;
};

rte_atomic64_t LoadBalanceAdaptiveMeasure::cpu_ratio;
unsigned rep, rep_limit;
bool offload;
};

EXPORT_ELEMENT(LoadBalanceAdaptiveMeasure);

Expand Down
146 changes: 146 additions & 0 deletions elements/loadbalancers/LoadBalancePPC.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#ifndef __NBA_ELEMENT_LOADBALANCEPPC_HH__
#define __NBA_ELEMENT_LOADBALANCEPPC_HH__

#include "../../lib/element.hh"
#include "../../lib/annotation.hh"
#include "../../lib/loadbalancer.hh"
#include "../../lib/queue.hh"

#include <rte_errno.h>
#include <rte_log.h>
#include <rte_atomic.h>

#include <vector>
#include <string>
#include <random>
#include <cmath>

#define LB_PPC_CPU_RATIO_MULTIPLIER (1000)
#define LB_PPC_CPU_RATIO_DELTA (50)

namespace nba {

/**
 * Load balancer that alternates between a CPU phase and a GPU (offload)
 * phase without using random numbers.
 *
 * process_batch() tags every batch with a decision (0 while in the CPU
 * phase, 1 while in the offload phase).  The offload phase always lasts
 * ctx->num_coproc_ppdepth batches; the length of the CPU phase is derived
 * from the current CPU ratio.  dispatch() periodically re-reads the
 * node-local ratio and, on the thread whose local_thread_idx is 0, moves the
 * ratio by +/- delta depending on whether the estimated PPC went down or up
 * since the previous adjustment.
 *
 * The ratio is stored as an integer in [0, LB_PPC_CPU_RATIO_MULTIPLIER].
 */
class LoadBalancePPC : public SchedulableElement, PerBatchElement {
public:
    LoadBalancePPC() : SchedulableElement(), PerBatchElement()
    { }

    virtual ~LoadBalancePPC()
    { }

    const char *class_name() const { return "LoadBalancePPC"; }
    const char *port_count() const { return "1/1"; }
    int get_type() const { return SchedulableElement::get_type() | PerBatchElement::get_type(); }

    /**
     * Resets the per-instance state and looks up the node-local ratio
     * counter allocated by initialize_per_node().
     *
     * @return 0 always.
     */
    int initialize()
    {
        local_cpu_ratio = LB_PPC_CPU_RATIO_MULTIPLIER;
        delta = LB_PPC_CPU_RATIO_DELTA;
        last_estimated_ppc = 0;
        rep = 0;
        rep_limit = ctx->num_coproc_ppdepth;
        offload = false;
        cpu_ratio = (rte_atomic64_t *) ctx->node_local_storage->get_alloc("LBPPC.cpu_weight");
        /* dispatch() dereferences this pointer unconditionally. */
        assert(cpu_ratio != nullptr);
        return 0;
    }

    int initialize_global() { return 0; }

    /**
     * Allocates the node-local CPU ratio counter and resets it to zero.
     *
     * @return 0 always.
     */
    int initialize_per_node()
    {
        ctx->node_local_storage->alloc("LBPPC.cpu_weight", sizeof(rte_atomic64_t));
        rte_atomic64_t *node_cpu_ratio = (rte_atomic64_t *)
                ctx->node_local_storage->get_alloc("LBPPC.cpu_weight");
        assert(node_cpu_ratio != nullptr);
        rte_atomic64_set(node_cpu_ratio, 0);
        return 0;
    }

    int configure(comp_thread_context *ctx, std::vector<std::string> &args)
    {
        Element::configure(ctx, args);
        RTE_LOG(INFO, LB, "load balancer mode: Adaptive PPC\n");
        return 0;
    }

    /**
     * Annotates the batch with the current load-balancing decision and
     * advances the CPU/GPU phase counter.
     *
     * @param input_port  unused.
     * @param batch       batch whose NBA_BANNO_LB_DECISION annotation is set
     *                    (0 in the CPU phase, 1 in the offload phase).
     * @return 0 always.
     */
    int process_batch(int input_port, PacketBatch *batch)
    {
        int decision = 0;
        rep ++;
        /* ">=" instead of "==": rep is already 1 when it is first compared,
         * so a limit of 0 could never be matched with "==" and the phase
         * would only end after rep wrapped around. */
        if (offload) {
            decision = 1;
            if (rep >= rep_limit) { // Change to CPU-mode
                rep_limit = cpu_rep_limit();
                rep = 0;
                offload = false;
            }
        } else {
            decision = 0;
            if (rep >= rep_limit) { // Change to GPU-mode
                rep_limit = ctx->num_coproc_ppdepth;
                rep = 0;
                offload = true;
            }
        }
        anno_set(&batch->banno, NBA_BANNO_LB_DECISION, decision);
        return 0;
    }

    /**
     * Periodic hook: refreshes the thread-local copy of the CPU ratio and,
     * on the thread with local_thread_idx == 0, adjusts the shared ratio.
     *
     * @param loop_count  unused.
     * @param out_batch   always set to nullptr.
     * @param next_delay  set to 200000 (0.2 sec).
     * @return 0 always.
     */
    int dispatch(uint64_t loop_count, PacketBatch*& out_batch, uint64_t &next_delay)
    {
        next_delay = 200000; // 0.2sec
        int64_t temp_cpu_ratio = rte_atomic64_read(cpu_ratio);
        local_cpu_ratio = temp_cpu_ratio;

        if (ctx->loc.local_thread_idx == 0) {
            const float ppc_cpu = ctx->inspector->pkt_proc_cycles[0];
            const float ppc_gpu = ctx->inspector->pkt_proc_cycles[1];
            const float estimated_ppc = (temp_cpu_ratio * ppc_cpu
                                         + (LB_PPC_CPU_RATIO_MULTIPLIER - temp_cpu_ratio) * ppc_gpu)
                                        / LB_PPC_CPU_RATIO_MULTIPLIER;
            /* Ratio that was in effect for this measurement (before the
             * adjustment below); only used for the log line. */
            const float c = (float) temp_cpu_ratio / LB_PPC_CPU_RATIO_MULTIPLIER;

            /* The first round only records a baseline. */
            if (last_estimated_ppc != 0) {
                if (last_estimated_ppc > estimated_ppc) {
                    // keep direction
                } else {
                    // reverse direction
                    delta = -delta;
                }
                temp_cpu_ratio += delta;
            }
            if (temp_cpu_ratio < 0) temp_cpu_ratio = 0;
            if (temp_cpu_ratio > LB_PPC_CPU_RATIO_MULTIPLIER) temp_cpu_ratio = LB_PPC_CPU_RATIO_MULTIPLIER;
            last_estimated_ppc = estimated_ppc;

            printf("[MEASURE:%d] CPU %'8.0f GPU %'8.0f PPC %'8.0f CPU-Ratio %.3f\n",
                   ctx->loc.node_id,
                   ppc_cpu, ppc_gpu, estimated_ppc, c);
            rte_atomic64_set(cpu_ratio, temp_cpu_ratio);
        }

        out_batch = nullptr;
        return 0;
    }

private:
    /**
     * Number of consecutive batches to keep on the CPU so that, together
     * with num_coproc_ppdepth offloaded batches, the CPU share is about
     * local_cpu_ratio / LB_PPC_CPU_RATIO_MULTIPLIER.
     *
     * A ratio of 0 (or less) yields 0, which process_batch() treats as a
     * single CPU batch per cycle ("only once for sampling").  A ratio equal
     * to the multiplier would divide by zero, and converting the resulting
     * infinity to unsigned is undefined; the ratio is therefore capped one
     * unit below the multiplier, which keeps the limit finite.
     */
    unsigned cpu_rep_limit() const
    {
        if (local_cpu_ratio <= 0)
            return 0; // only once for sampling!
        int64_t ratio = local_cpu_ratio;
        if (ratio > LB_PPC_CPU_RATIO_MULTIPLIER - 1)
            ratio = LB_PPC_CPU_RATIO_MULTIPLIER - 1;
        const float c = (float) ratio / LB_PPC_CPU_RATIO_MULTIPLIER;
        return (unsigned) (c * ctx->num_coproc_ppdepth / (1.0f - c));
    }

    rte_atomic64_t *cpu_ratio = nullptr;  /* node-local shared ratio counter */
    double last_estimated_ppc = 0;        /* estimate from the previous adjustment round */
    int64_t local_cpu_ratio = 0;          /* snapshot of *cpu_ratio taken in dispatch() */
    int64_t delta = 0;                    /* signed step applied to the ratio */
    unsigned rep = 0, rep_limit = 0;      /* batches seen in / length of the current phase */
    bool offload = false;                 /* true while in the GPU (offload) phase */
};

EXPORT_ELEMENT(LoadBalancePPC);

}

#endif

// vim: ts=8 sts=4 sw=4 et
7 changes: 2 additions & 5 deletions lib/elementgraph.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ void ElementGraph::run(PacketBatch *batch, Element *start_elem, int input_port)
task->completion_queue = ctx->task_completion_queue;
task->completion_watcher = ctx->task_completion_watcher;
task->elemgraph = this;
task->offload_start = rdtscp();
task->local_dev_idx = dev_idx;
//task->device = ctx->offload_devices->at(dev_idx);
//assert(task->device != nullptr);
Expand Down Expand Up @@ -244,9 +245,7 @@ void ElementGraph::run(PacketBatch *batch, Element *start_elem, int input_port)
)//|| (ctx->io_ctx->mode == IO_EMUL && !ctx->stop_task_batching))
{
//printf("avg task completion time: %.6f sec\n", ctx->inspector->avg_task_completion_sec[dev_idx]);

offloadable->tasks[dev_idx] = nullptr; // Let the element be able to take next pkts/batches.
task->offload_start = rdtscp();
ready_tasks[dev_idx].push_back(task);
#ifdef USE_NVPROF
nvtxRangePop();
Expand All @@ -256,14 +255,12 @@ void ElementGraph::run(PacketBatch *batch, Element *start_elem, int input_port)

/* At this point, the batch is already consumed to the task
* or delayed. */

//if (ctx->load_balancer) ctx->load_balancer->is_changed_to_cpu = false;
continue;

} else {
/* If not offloaded, run the element's CPU-version handler. */
batch_disposition = current_elem->_process_batch(input_port, batch);
batch->compute_time += (rdtscp() - now);
batch->compute_time += (rdtscp() - now) / batch->count;
}
} else {
/* If not offloadable, run the element's CPU-version handler. */
Expand Down
3 changes: 2 additions & 1 deletion lib/loadbalancer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public:
uint64_t batch_proc_time;
double pkt_proc_cycles[NBA_MAX_PROCESSOR_TYPES];

const unsigned PPC_HISTORY_SIZES[2] = {128, 2048};
//const unsigned PPC_HISTORY_SIZES[2] = {128, 2048};
const unsigned PPC_HISTORY_SIZES[2] = {128, 512};
};

}
Expand Down

0 comments on commit 5747b12

Please sign in to comment.