diff --git a/include/nba/core/mempool.hh b/include/nba/core/mempool.hh index ff61145..196f97d 100644 --- a/include/nba/core/mempool.hh +++ b/include/nba/core/mempool.hh @@ -16,7 +16,7 @@ namespace nba class MemoryPool { public: - MemoryPool() : max_size_(0), curpos_(0) + MemoryPool() : max_size(0), cur_pos(0) {} virtual ~MemoryPool() {} @@ -25,13 +25,13 @@ public: int _alloc(size_t size, size_t *start_offset) { - if (curpos_ + size > max_size_) + if (cur_pos + size > max_size) return -ENOMEM; /* IMPORTANT: We need to return the position before adding the new size. */ if (start_offset != nullptr) - *start_offset = curpos_; - curpos_ += size; - curpos_ = ALIGN_CEIL(curpos_, CACHE_LINE_SIZE); + *start_offset = cur_pos; + cur_pos += size; + cur_pos = ALIGN_CEIL(cur_pos, CACHE_LINE_SIZE); return 0; } @@ -39,19 +39,19 @@ public: void reset() { - curpos_ = 0; + cur_pos = 0; } - size_t get_alloc_size() + size_t get_alloc_size() const { - return curpos_; + return cur_pos; } - virtual void *get_base_ptr() = 0; + virtual void *get_base_ptr() const = 0; protected: - size_t max_size_; - size_t curpos_; + size_t max_size; + size_t cur_pos; }; } diff --git a/include/nba/element/element.hh b/include/nba/element/element.hh index 49ee468..bdd9644 100644 --- a/include/nba/element/element.hh +++ b/include/nba/element/element.hh @@ -224,16 +224,18 @@ public: }; offload_compute_handlers.insert({{"dummy", ch},}); } - for (int i = 0; i < NBA_MAX_COPROCESSOR_TYPES; i++) - tasks[i] = nullptr; finished_batches.init(MAX_FINBATCH_QLEN, -1, finished_batches_arrbuf); } virtual ~OffloadableElement() {} int get_type() const { return ELEMTYPE_OFFLOADABLE | ELEMTYPE_SCHEDULABLE; } - /** Begins offloading of the given batch. */ + /** Enqueues the given batch for offloading and begins offloading when + * it has sufficient amount of work. */ int offload(ElementGraph *mother, PacketBatch *in_batch, int input_port); + /** Immediately begins offloading of the given (reused) offload-task. */ + int offload(ElementGraph *mother, OffloadTask *reused_offl_task, int input_port); + /** Stores the batches that are returned from offloading. 
*/ int enqueue_batch(PacketBatch *batch); diff --git a/include/nba/element/packetbatch.hh b/include/nba/element/packetbatch.hh index 2a8cd7d..580776f 100644 --- a/include/nba/element/packetbatch.hh +++ b/include/nba/element/packetbatch.hh @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -414,7 +415,7 @@ public: first_idx(-1), last_idx(-1), slot_count(0), #endif datablock_states(nullptr), recv_timestamp(0), - generation(0), batch_id(0), element(nullptr), input_port(0), has_results(false), + generation(0), batch_id(0), #if NBA_BATCHING_SCHEME == NBA_BATCHING_CONTINUOUS has_dropped(false), #endif @@ -474,9 +475,7 @@ public: uint64_t recv_timestamp; uint64_t generation; uint64_t batch_id; - Element* element; - int input_port; - bool has_results; + struct task_tracker tracker; #if NBA_BATCHING_SCHEME == NBA_BATCHING_CONTINUOUS bool has_dropped; #endif diff --git a/include/nba/engines/cuda/compat.hh b/include/nba/engines/cuda/compat.hh index 86b4f86..560cb71 100644 --- a/include/nba/engines/cuda/compat.hh +++ b/include/nba/engines/cuda/compat.hh @@ -16,10 +16,14 @@ struct datablock_kernel_arg { void *buffer_bases_out[NBA_MAX_COPROC_PPDEPTH]; uint32_t item_count_in[NBA_MAX_COPROC_PPDEPTH]; uint32_t item_count_out[NBA_MAX_COPROC_PPDEPTH]; - uint16_t item_size_in; - uint16_t *item_sizes_in[NBA_MAX_COPROC_PPDEPTH]; - uint16_t item_size_out; - uint16_t *item_sizes_out[NBA_MAX_COPROC_PPDEPTH]; + union { + uint16_t item_size_in; + uint16_t *item_sizes_in[NBA_MAX_COPROC_PPDEPTH]; + }; + union { + uint16_t item_size_out; + uint16_t *item_sizes_out[NBA_MAX_COPROC_PPDEPTH]; + }; uint16_t *item_offsets_in[NBA_MAX_COPROC_PPDEPTH]; uint16_t *item_offsets_out[NBA_MAX_COPROC_PPDEPTH]; }; diff --git a/include/nba/engines/cuda/computecontext.hh b/include/nba/engines/cuda/computecontext.hh index 021cc55..1206c42 100644 --- a/include/nba/engines/cuda/computecontext.hh +++ b/include/nba/engines/cuda/computecontext.hh @@ -3,9 +3,11 @@ #include +#include +#include +#include #include #include -#include #include #include #include @@ -25,12 +27,14 @@ private: public: virtual ~CUDAComputeContext(); - int alloc_input_buffer(size_t size, void **host_ptr, memory_t *dev_mem); - int alloc_output_buffer(size_t size, void **host_ptr, memory_t *dev_mem); - void clear_io_buffers(); - void *get_host_input_buffer_base(); - memory_t get_device_input_buffer_base(); - size_t get_total_input_buffer_size(); + io_base_t alloc_io_base(); + int alloc_input_buffer(io_base_t io_base, size_t size, void **host_ptr, memory_t *dev_mem); + int alloc_output_buffer(io_base_t io_base, size_t size, void **host_ptr, memory_t *dev_mem); + void get_input_current_pos(io_base_t io_base, void **host_ptr, memory_t *dev_mem) const; + void get_output_current_pos(io_base_t io_base, void **host_ptr, memory_t *dev_mem) const; + size_t get_input_size(io_base_t io_base) const; + size_t get_output_size(io_base_t io_base) const; + void clear_io_buffers(io_base_t io_base); void clear_kernel_args(); void push_kernel_arg(struct kernel_arg &arg); @@ -83,13 +87,16 @@ private: uint8_t *checkbits_d; uint8_t *checkbits_h; cudaStream_t _stream; - CUDAMemoryPool _cuda_mempool_in; - CUDAMemoryPool _cuda_mempool_out; - CPUMemoryPool _cpu_mempool_in; - CPUMemoryPool _cpu_mempool_out; + CUDAMemoryPool _cuda_mempool_in[NBA_MAX_IO_BASES]; + CUDAMemoryPool _cuda_mempool_out[NBA_MAX_IO_BASES]; + CPUMemoryPool _cpu_mempool_in[NBA_MAX_IO_BASES]; + CPUMemoryPool _cpu_mempool_out[NBA_MAX_IO_BASES]; size_t num_kernel_args; struct kernel_arg 
kernel_args[CUDA_MAX_KERNEL_ARGS]; + + FixedRing io_base_ring; + unsigned io_base_ring_buf[NBA_MAX_IO_BASES]; }; } diff --git a/include/nba/engines/cuda/mempool.hh b/include/nba/engines/cuda/mempool.hh index 6f2b1eb..b5e0073 100644 --- a/include/nba/engines/cuda/mempool.hh +++ b/include/nba/engines/cuda/mempool.hh @@ -12,7 +12,7 @@ namespace nba { class CUDAMemoryPool : public MemoryPool { public: - CUDAMemoryPool() : MemoryPool(), base_(NULL) + CUDAMemoryPool() : MemoryPool(), base(NULL) { } @@ -23,8 +23,8 @@ public: virtual bool init(size_t max_size) { - max_size_ = max_size; - cutilSafeCall(cudaMalloc((void **) &base_, max_size)); + this->max_size = max_size; + cutilSafeCall(cudaMalloc((void **) &base, max_size)); return true; } @@ -33,29 +33,29 @@ public: size_t offset; int ret = _alloc(size, &offset); if (ret == 0) - return (void *) ((uint8_t *) base_ + (uintptr_t) offset); + return (void *) ((uint8_t *) base + (uintptr_t) offset); return NULL; } void destroy() { - if (base_ != NULL) - cudaFree(base_); + if (base != NULL) + cudaFree(base); } - void *get_base_ptr() + void *get_base_ptr() const { - return base_; + return base; } private: - void *base_; + void *base; }; class CPUMemoryPool : public MemoryPool { public: - CPUMemoryPool(int cuda_flags) : MemoryPool(), base_(NULL), flags_(cuda_flags) + CPUMemoryPool(int cuda_flags = 0) : MemoryPool(), base(NULL), flags(cuda_flags) { } @@ -66,9 +66,17 @@ public: virtual bool init(unsigned long size) { - max_size_ = size; - cutilSafeCall(cudaHostAlloc((void **) &base_, size, - flags_)); + max_size = size; + cutilSafeCall(cudaHostAlloc((void **) &base, size, + this->flags)); + return true; + } + + bool init_with_flags(unsigned long size, int flags) + { + max_size = size; + cutilSafeCall(cudaHostAlloc((void **) &base, size, + flags)); return true; } @@ -77,24 +85,24 @@ public: size_t offset; int ret = _alloc(size, &offset); if (ret == 0) - return (void *) ((uint8_t *) base_ + (uintptr_t) offset); + return (void *) ((uint8_t *) base + (uintptr_t) offset); return NULL; } void destroy() { - if (base_ != NULL) - cudaFreeHost(base_); + if (base != NULL) + cudaFreeHost(base); } - void *get_base_ptr() + void *get_base_ptr() const { - return base_; + return base; } protected: - void *base_; - int flags_; + void *base; + int flags; }; } diff --git a/include/nba/engines/dummy/computecontext.hh b/include/nba/engines/dummy/computecontext.hh index 0ec77af..07d9fc0 100644 --- a/include/nba/engines/dummy/computecontext.hh +++ b/include/nba/engines/dummy/computecontext.hh @@ -3,9 +3,11 @@ #include +#include +#include +#include #include #include -#include #include namespace nba @@ -21,12 +23,14 @@ private: public: virtual ~DummyComputeContext(); - int alloc_input_buffer(size_t size, void **host_ptr, memory_t *dev_mem); - int alloc_output_buffer(size_t size, void **host_ptr, memory_t *dev_mem); - void clear_io_buffers(); - void *get_host_input_buffer_base(); - memory_t get_device_input_buffer_base(); - size_t get_total_input_buffer_size(); + io_base_t alloc_io_base(); + int alloc_input_buffer(io_base_t io_base, size_t size, void **host_ptr, memory_t *dev_mem); + int alloc_output_buffer(io_base_t io_base, size_t size, void **host_ptr, memory_t *dev_mem); + void get_input_current_pos(io_base_t io_base, void **host_ptr, memory_t *dev_mem) const; + void get_output_current_pos(io_base_t io_base, void **host_ptr, memory_t *dev_mem) const; + size_t get_input_size(io_base_t io_base) const; + size_t get_output_size(io_base_t io_base) const; + void clear_io_buffers(io_base_t 
io_base); void clear_kernel_args() { } void push_kernel_arg(struct kernel_arg &arg) { } @@ -62,10 +66,13 @@ public: } private: - DummyCPUMemoryPool _dev_mempool_in; - DummyCPUMemoryPool _dev_mempool_out; - DummyCPUMemoryPool _cpu_mempool_in; - DummyCPUMemoryPool _cpu_mempool_out; + DummyCPUMemoryPool _dev_mempool_in[NBA_MAX_IO_BASES]; + DummyCPUMemoryPool _dev_mempool_out[NBA_MAX_IO_BASES]; + DummyCPUMemoryPool _cpu_mempool_in[NBA_MAX_IO_BASES]; + DummyCPUMemoryPool _cpu_mempool_out[NBA_MAX_IO_BASES]; + + FixedRing io_base_ring; + unsigned io_base_ring_buf[NBA_MAX_IO_BASES]; }; } diff --git a/include/nba/engines/dummy/mempool.hh b/include/nba/engines/dummy/mempool.hh index cd8ba35..52db1d9 100644 --- a/include/nba/engines/dummy/mempool.hh +++ b/include/nba/engines/dummy/mempool.hh @@ -10,7 +10,7 @@ namespace nba { class DummyCPUMemoryPool : public MemoryPool { public: - DummyCPUMemoryPool() : MemoryPool(), base_(NULL) + DummyCPUMemoryPool() : MemoryPool(), base(NULL) { } @@ -21,8 +21,8 @@ public: virtual bool init(unsigned long size) { - max_size_ = size; - base_ = malloc(size); + max_size = size; + base = malloc(size); return true; } @@ -31,25 +31,25 @@ public: size_t offset; int ret = _alloc(size, &offset); if (ret == 0) - return (void *) ((uint8_t *) base_ + offset); + return (void *) ((uint8_t *) base + offset); return NULL; } void destroy() { - if (base_ != NULL) { - free(base_); - base_ = NULL; + if (base != NULL) { + free(base); + base = NULL; } } - void *get_base_ptr() + void *get_base_ptr() const { - return base_; + return base; } protected: - void *base_; + void *base; }; } diff --git a/include/nba/engines/phi/computecontext.hh b/include/nba/engines/phi/computecontext.hh index 3058940..2edbe9a 100644 --- a/include/nba/engines/phi/computecontext.hh +++ b/include/nba/engines/phi/computecontext.hh @@ -24,12 +24,14 @@ private: public: virtual ~PhiComputeContext(); - int alloc_input_buffer(size_t size, void **host_ptr, memory_t *dev_mem); - int alloc_output_buffer(size_t size, void **host_ptr, memory_t *dev_mem); - void clear_io_buffers(); - void *get_host_input_buffer_base(); - memory_t get_device_input_buffer_base(); - size_t get_total_input_buffer_size(); + io_base_t alloc_io_base(); + int alloc_input_buffer(io_base_t io_base, size_t size, void **host_ptr, memory_t *dev_mem); + int alloc_output_buffer(io_base_t io_base, size_t size, void **host_ptr, memory_t *dev_mem); + void get_input_current_pos(io_base_t io_base, void **host_ptr, memory_t *dev_mem) const; + void get_output_current_pos(io_base_t io_base, void **host_ptr, memory_t *dev_mem) const; + size_t get_input_size(io_base_t io_base) const; + size_t get_output_size(io_base_t io_base) const; + void clear_io_buffers(io_base_t io_base); void clear_kernel_args() { } void push_kernel_arg(struct kernel_arg &arg) { } diff --git a/include/nba/engines/phi/mempool.hh b/include/nba/engines/phi/mempool.hh index c018715..18a846e 100644 --- a/include/nba/engines/phi/mempool.hh +++ b/include/nba/engines/phi/mempool.hh @@ -25,7 +25,7 @@ public: virtual bool init(unsigned long max_size) { - max_size_ = max_size; + this->max_size = max_size; cl_int err_ret; clbuf = clCreateBuffer(clctx, CL_MEM_HOST_NO_ACCESS | (direction_hint == HOST_TO_DEVICE ? 
CL_MEM_READ_ONLY : CL_MEM_WRITE_ONLY), @@ -60,7 +60,7 @@ public: clReleaseMemObject(clbuf); } - void *get_base_ptr() + void *get_base_ptr() const { // TODO: convert clbuf to void* assert(false, "not implemented yet"); @@ -85,8 +85,8 @@ public: virtual bool init(size_t max_size) { void *ret = NULL; - max_size_ = max_size; - base_ = (uint8_t*) malloc(max_size); + this->max_size = max_size; + base = (uint8_t*) malloc(max_size); return ret; } @@ -95,23 +95,23 @@ public: size_t offset; int ret = _alloc(size, &offset); if (ret == 0) - return (void *) ((uint8_t *) base_ + offset); + return (void *) ((uint8_t *) base + offset); return NULL; } virtual void destroy() { - if (base_) - free(base_); + if (base) + free(base); } - void *get_base_ptr() + void *get_base_ptr() const { - return base_; + return base; } private: - void *base_; + void *base; }; } diff --git a/include/nba/framework/computecontext.hh b/include/nba/framework/computecontext.hh index a2ea870..b08b59f 100644 --- a/include/nba/framework/computecontext.hh +++ b/include/nba/framework/computecontext.hh @@ -9,6 +9,9 @@ namespace nba { class OffloadTask; +typedef unsigned io_base_t; +const io_base_t INVALID_IO_BASE = 0xffffffffu; + class ComputeContext { /** @@ -31,12 +34,18 @@ public: } virtual ~ComputeContext() {} - virtual int alloc_input_buffer(size_t size, void **host_ptr, memory_t *dev_mem) = 0; - virtual int alloc_output_buffer(size_t size, void **host_ptr, memory_t *dev_mem) = 0; - virtual void clear_io_buffers() = 0; - virtual void *get_host_input_buffer_base() = 0; - virtual memory_t get_device_input_buffer_base() = 0; - virtual size_t get_total_input_buffer_size() = 0; + virtual io_base_t alloc_io_base() = 0; + virtual int alloc_input_buffer(io_base_t io_base, size_t size, + void **host_ptr, memory_t *dev_mem) = 0; + virtual int alloc_output_buffer(io_base_t io_base, size_t size, + void **host_ptr, memory_t *dev_mem) = 0; + virtual void get_input_current_pos(io_base_t io_base, + void **host_ptr, memory_t *dev_mem) const = 0; + virtual void get_output_current_pos(io_base_t io_base, + void **host_ptr, memory_t *dev_mem) const = 0; + virtual size_t get_input_size(io_base_t io_base) const = 0; + virtual size_t get_output_size(io_base_t io_base) const = 0; + virtual void clear_io_buffers(io_base_t io_base) = 0; virtual void clear_kernel_args() = 0; virtual void push_kernel_arg(struct kernel_arg &arg) = 0; diff --git a/include/nba/framework/config.hh b/include/nba/framework/config.hh index 2d6c4c8..0173dc7 100644 --- a/include/nba/framework/config.hh +++ b/include/nba/framework/config.hh @@ -16,7 +16,7 @@ #define NBA_BATCHING_CONTINUOUS (1) #define NBA_BATCHING_BITVECTOR (2) #define NBA_BATCHING_LINKEDLIST (3) -#define NBA_BATCHING_SCHEME NBA_BATCHING_BITVECTOR +#define NBA_BATCHING_SCHEME NBA_BATCHING_TRADITIONAL #define NBA_MAX_PACKET_SIZE (2048) #ifdef NBA_NO_HUGE @@ -47,6 +47,7 @@ #define NBA_MAX_TASKPOOL_SIZE (2048u) #define NBA_MAX_BATCHPOOL_SIZE (2048u) +#define NBA_MAX_IO_BASES (17) #define NBA_MAX_ANNOTATION_SET_SIZE (7) #define NBA_MAX_NODELOCALSTORAGE_ENTRIES (16) diff --git a/include/nba/framework/datablock.hh b/include/nba/framework/datablock.hh index 5c88494..13fbded 100644 --- a/include/nba/framework/datablock.hh +++ b/include/nba/framework/datablock.hh @@ -105,9 +105,9 @@ struct datablock_tracker { size_t in_count; size_t out_size; size_t out_count; - //struct item_size_info exact_item_sizes; - struct item_size_info aligned_item_sizes; + struct item_size_info *aligned_item_sizes_h; + memory_t aligned_item_sizes_d; }; /* 
NOTE: The alignment of this struct should match with CUDA. */ @@ -118,10 +118,14 @@ struct datablock_kernel_arg { void *buffer_bases_out[NBA_MAX_COPROC_PPDEPTH]; uint32_t item_count_in[NBA_MAX_COPROC_PPDEPTH]; uint32_t item_count_out[NBA_MAX_COPROC_PPDEPTH]; - uint16_t item_size_in; - uint16_t *item_sizes_in[NBA_MAX_COPROC_PPDEPTH]; - uint16_t item_size_out; - uint16_t *item_sizes_out[NBA_MAX_COPROC_PPDEPTH]; + union { + uint16_t item_size_in; + uint16_t *item_sizes_in[NBA_MAX_COPROC_PPDEPTH]; + }; + union { + uint16_t item_size_out; + uint16_t *item_sizes_out[NBA_MAX_COPROC_PPDEPTH]; + }; uint16_t *item_offsets_in[NBA_MAX_COPROC_PPDEPTH]; uint16_t *item_offsets_out[NBA_MAX_COPROC_PPDEPTH]; }; // __attribute__((aligned(8))); @@ -171,6 +175,9 @@ public: private: int my_id; + + size_t read_buffer_size; + size_t write_buffer_size; }; class MergedDataBlock : DataBlock diff --git a/include/nba/framework/elementgraph.hh b/include/nba/framework/elementgraph.hh index 710c588..3bdae05 100644 --- a/include/nba/framework/elementgraph.hh +++ b/include/nba/framework/elementgraph.hh @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -34,20 +35,18 @@ public: return elements.size(); } - /* Executes the element graph for the given batch and free it after - * processing. Internally it manages a queue to handle diverged paths - * with multiple batches to multipe outputs. - * When it needs to stop processing and wait for asynchronous events - * (e.g., completion of offloading or release of resources), it moves - * the batch to the delayed_batches queue. */ - void run(PacketBatch *batch, Element *start_elem, int input_port = 0); + /* Inserts the given batch/offloadtask to the internal task queue. + * This does not execute the pipeline; call flush_tasks() for that. */ + void enqueue_batch(PacketBatch *batch, Element *start_elem, int input_port = 0); + void enqueue_offload_task(OffloadTask *otask, Element *start_elem, int input_port = 0); + // TODO: merge with flush_tasks() /* Tries to execute all pending offloaded tasks. * This method does not allocate/free any batches. */ void flush_offloaded_tasks(); - /* Tries to run all delayed batches. */ - void flush_delayed_batches(); + /* Tries to run all delayed tasks. */ + void flush_tasks(); /* Scan and execute schedulable elements. */ void scan_offloadable_elements(); @@ -103,25 +102,35 @@ protected: /** * Used to book-keep element objects. */ - FixedRing elements; - FixedRing sched_elements; + FixedRing elements; + FixedRing sched_elements; /** * Used to pass context objects when calling element handlers. */ comp_thread_context *ctx; - FixedRing queue; + FixedRing queue; FixedRing ready_tasks[NBA_MAX_COPROCESSOR_TYPES]; - FixedRing delayed_batches; + //FixedRing delayed_batches; private: + /* Executes the element graph for the given batch and free it after + * processing. Internally it manages a queue to handle diverged paths + * with multiple batches to multipe outputs. + * When it needs to stop processing and wait for asynchronous events + * (e.g., completion of offloading or release of resources), it moves + * the batch to the delayed_batches queue. 
*/ + void process_batch(PacketBatch *batch); + void process_offload_task(OffloadTask *otask); + std::map, int> offl_actions; std::set offl_fin; SchedulableElement *input_elem; friend int io_loop(void *arg); + friend int OffloadableElement::offload(ElementGraph *mother, OffloadTask *otask, int input_port); friend int OffloadableElement::offload(ElementGraph *mother, PacketBatch *in_batch, int input_port); friend void comp_thread_context::build_element_graph(const char *config); diff --git a/include/nba/framework/offloadtask.hh b/include/nba/framework/offloadtask.hh index 8b4da63..b9b0e9c 100644 --- a/include/nba/framework/offloadtask.hh +++ b/include/nba/framework/offloadtask.hh @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -66,11 +67,13 @@ public: enum TaskStates state; /* Initialized by element graph. */ + struct task_tracker tracker; int local_dev_idx; struct ev_loop *src_loop; comp_thread_context *comp_ctx; coproc_thread_context *coproc_ctx; ComputeContext *cctx; + io_base_t io_base; ElementGraph *elemgraph; FixedArray batches; FixedArray input_ports; @@ -86,10 +89,18 @@ public: private: uint64_t task_id; // for deubgging friend class OffloadableElement; + + void *host_write_begin; + void *host_read_begin; + memory_t dev_write_begin; + memory_t dev_read_begin; + size_t input_alloc_size_begin; + size_t output_alloc_size_begin; + }; } -#endif +#endif /* __NBA_OFFLOADTASK_HH__ */ // vim: ts=8 sts=4 sw=4 et foldmethod=marker diff --git a/include/nba/framework/task.hh b/include/nba/framework/task.hh new file mode 100644 index 0000000..1220a7c --- /dev/null +++ b/include/nba/framework/task.hh @@ -0,0 +1,69 @@ +#ifndef __NBA_TASK_HH__ +#define __NBA_TASK_HH__ + +#include + +namespace nba { + +enum TaskTypes : unsigned { + TASK_SINGLE_BATCH = 0, + TASK_OFFLOAD = 1 +}; + +class Element; +class PacketBatch; +class OffloadTask; + +struct task_tracker { + Element* element; + int input_port; + bool has_results; +}; + +namespace Task { + /* We use a 64-bit integer to represent a task item. + * The first 16 bits indicates the task type and + * the last 48 bits indicates the pointer address + * to raw data types. 
*/ + static const uintptr_t TYPE_MASK = 0xffff000000000000u; + static const uintptr_t PTR_MASK = 0x0000ffffffffffffu; + static const uintptr_t PTR_SIZE = 48u; + + static inline void *to_task(PacketBatch *batch) + { + uintptr_t p = ((uintptr_t) TASK_SINGLE_BATCH << PTR_SIZE) + | ((uintptr_t) batch & PTR_MASK); + return (void *) p; + } + + static inline void *to_task(OffloadTask *otask) + { + uintptr_t p = ((uintptr_t) TASK_OFFLOAD << PTR_SIZE) + | ((uintptr_t) otask & PTR_MASK); + return (void *) p; + } + + static inline unsigned get_task_type(void *qitem) + { + return (unsigned) ((((uintptr_t) qitem) & TYPE_MASK) >> PTR_SIZE); + } + + static inline PacketBatch *to_packet_batch(void *qitem) + { + uintptr_t p = ((uintptr_t) qitem) & PTR_MASK; + return (PacketBatch *) p; + } + + static inline OffloadTask *to_offload_task(void *qitem) + { + uintptr_t p = ((uintptr_t) qitem) & PTR_MASK; + return (OffloadTask *) p; + } + +} /* endns(nba::Task) */ + +} /* endns(nba) */ + +#endif /* __NBA_TASK_HH__ */ + +// vim: ts=8 sts=4 sw=4 et foldmethod=marker diff --git a/src/engines/cuda/computecontext.cc b/src/engines/cuda/computecontext.cc index a4b5d94..233cac2 100644 --- a/src/engines/cuda/computecontext.cc +++ b/src/engines/cuda/computecontext.cc @@ -12,18 +12,20 @@ struct cuda_event_context { CUDAComputeContext::CUDAComputeContext(unsigned ctx_id, ComputeDevice *mother_device) : ComputeContext(ctx_id, mother_device), checkbits_d(NULL), checkbits_h(NULL), - _cuda_mempool_in(), _cuda_mempool_out(), - _cpu_mempool_in(cudaHostAllocPortable), _cpu_mempool_out(cudaHostAllocPortable), num_kernel_args(0) /* NOTE: Write-combined memory degrades performance to half... */ { type_name = "cuda"; - size_t mem_size = 32 * 1024 * 1024; // TODO: read from config + size_t io_base_size = 5 * 1024 * 1024; // TODO: read from config cutilSafeCall(cudaStreamCreateWithFlags(&_stream, cudaStreamNonBlocking)); - _cuda_mempool_in.init(mem_size); - _cuda_mempool_out.init(mem_size); - _cpu_mempool_in.init(mem_size); - _cpu_mempool_out.init(mem_size); + io_base_ring.init(NBA_MAX_IO_BASES, node_id, io_base_ring_buf); + for (unsigned i = 0; i < NBA_MAX_IO_BASES; i++) { + io_base_ring.push_back(i); + _cuda_mempool_in[i].init(io_base_size); + _cuda_mempool_out[i].init(io_base_size); + _cpu_mempool_in[i].init_with_flags(io_base_size, cudaHostAllocPortable); + _cpu_mempool_out[i].init_with_flags(io_base_size, cudaHostAllocPortable); + } cutilSafeCall(cudaHostAlloc((void **) &checkbits_h, MAX_BLOCKS, cudaHostAllocMapped)); cutilSafeCall(cudaHostGetDevicePointer((void **) &checkbits_d, checkbits_h, 0)); assert(checkbits_h != NULL); @@ -34,55 +36,77 @@ CUDAComputeContext::CUDAComputeContext(unsigned ctx_id, ComputeDevice *mother_de CUDAComputeContext::~CUDAComputeContext() { cutilSafeCall(cudaStreamDestroy(_stream)); - _cuda_mempool_in.destroy(); - _cuda_mempool_out.destroy(); - _cpu_mempool_in.destroy(); - _cpu_mempool_out.destroy(); + for (unsigned i = 0; i < NBA_MAX_IO_BASES; i++) { + _cuda_mempool_in[i].destroy(); + _cuda_mempool_out[i].destroy(); + _cpu_mempool_in[i].destroy(); + _cpu_mempool_out[i].destroy(); + } cutilSafeCall(cudaFreeHost(checkbits_h)); } -int CUDAComputeContext::alloc_input_buffer(size_t size, void **host_ptr, memory_t *dev_mem) +io_base_t CUDAComputeContext::alloc_io_base() { - *host_ptr = _cpu_mempool_in.alloc(size); - assert(*host_ptr != nullptr); - dev_mem->ptr = _cuda_mempool_in.alloc(size); - assert(dev_mem->ptr != nullptr); - return 0; + if (io_base_ring.empty()) return INVALID_IO_BASE; + unsigned i = 
io_base_ring.front(); + io_base_ring.pop_front(); + return (io_base_t) i; } -int CUDAComputeContext::alloc_output_buffer(size_t size, void **host_ptr, memory_t *dev_mem) +void CUDAComputeContext::get_input_current_pos(io_base_t io_base, void **host_ptr, memory_t *dev_mem) const { - *host_ptr = _cpu_mempool_out.alloc(size); - assert(*host_ptr != nullptr); - dev_mem->ptr = _cuda_mempool_out.alloc(size); - assert(dev_mem->ptr != nullptr); - return 0; + unsigned i = io_base; + *host_ptr = (char*)_cpu_mempool_in[i].get_base_ptr() + (uintptr_t)_cpu_mempool_in[i].get_alloc_size(); + dev_mem->ptr = (char*)_cuda_mempool_in[i].get_base_ptr() + (uintptr_t)_cuda_mempool_in[i].get_alloc_size(); +} + +void CUDAComputeContext::get_output_current_pos(io_base_t io_base, void **host_ptr, memory_t *dev_mem) const +{ + unsigned i = io_base; + *host_ptr = (char*)_cpu_mempool_out[i].get_base_ptr() + (uintptr_t)_cpu_mempool_out[i].get_alloc_size(); + dev_mem->ptr = (char*)_cuda_mempool_out[i].get_base_ptr() + (uintptr_t)_cuda_mempool_out[i].get_alloc_size(); } -void CUDAComputeContext::clear_io_buffers() +size_t CUDAComputeContext::get_input_size(io_base_t io_base) const { - _cpu_mempool_in.reset(); - _cpu_mempool_out.reset(); - _cuda_mempool_in.reset(); - _cuda_mempool_out.reset(); + unsigned i = io_base; + return _cpu_mempool_in[i].get_alloc_size(); } -void *CUDAComputeContext::get_host_input_buffer_base() +size_t CUDAComputeContext::get_output_size(io_base_t io_base) const { - return _cpu_mempool_in.get_base_ptr(); + unsigned i = io_base; + return _cpu_mempool_out[i].get_alloc_size(); +} + +int CUDAComputeContext::alloc_input_buffer(io_base_t io_base, size_t size, void **host_ptr, memory_t *dev_mem) +{ + unsigned i = io_base; + *host_ptr = _cpu_mempool_in[i].alloc(size); + assert(*host_ptr != nullptr); + dev_mem->ptr = _cuda_mempool_in[i].alloc(size); + assert(dev_mem->ptr != nullptr); + return 0; } -memory_t CUDAComputeContext::get_device_input_buffer_base() +int CUDAComputeContext::alloc_output_buffer(io_base_t io_base, size_t size, void **host_ptr, memory_t *dev_mem) { - memory_t ret; - ret.ptr = _cuda_mempool_in.get_base_ptr(); - return ret; + unsigned i = io_base; + *host_ptr = _cpu_mempool_out[i].alloc(size); + assert(*host_ptr != nullptr); + dev_mem->ptr = _cuda_mempool_out[i].alloc(size); + assert(dev_mem->ptr != nullptr); + return 0; } -size_t CUDAComputeContext::get_total_input_buffer_size() +void CUDAComputeContext::clear_io_buffers(io_base_t io_base) { - assert(_cpu_mempool_in.get_alloc_size() == _cuda_mempool_in.get_alloc_size()); - return _cpu_mempool_in.get_alloc_size(); + unsigned i = io_base; + _cpu_mempool_in[i].reset(); + _cpu_mempool_out[i].reset(); + _cuda_mempool_in[i].reset(); + _cuda_mempool_out[i].reset(); + io_base_ring.push_back(i); } int CUDAComputeContext::enqueue_memwrite_op(void *host_buf, memory_t dev_buf, size_t offset, size_t size) diff --git a/src/engines/dummy/computecontext.cc b/src/engines/dummy/computecontext.cc index 38a8fad..05fd6a2 100644 --- a/src/engines/dummy/computecontext.cc +++ b/src/engines/dummy/computecontext.cc @@ -8,58 +8,89 @@ DummyComputeContext::DummyComputeContext(unsigned ctx_id, ComputeDevice *mother_ : ComputeContext(ctx_id, mother_device) { type_name = "dummy"; - size_t mem_size = 8 * 1024 * 1024; // TODO: read from config - _dev_mempool_in.init(mem_size); - _dev_mempool_out.init(mem_size); - _cpu_mempool_in.init(mem_size); - _cpu_mempool_out.init(mem_size); + size_t io_base_size = 5 * 1024 * 1024; // TODO: read from config + 
io_base_ring.init(NBA_MAX_IO_BASES, node_id, io_base_ring_buf); + for (unsigned i = 0; i < NBA_MAX_IO_BASES; i++) { + _dev_mempool_in[i].init(io_base_size); + _dev_mempool_out[i].init(io_base_size); + _cpu_mempool_in[i].init(io_base_size); + _cpu_mempool_out[i].init(io_base_size); + } } DummyComputeContext::~DummyComputeContext() { - _dev_mempool_in.destroy(); - _dev_mempool_in.destroy(); - _cpu_mempool_out.destroy(); - _cpu_mempool_out.destroy(); + for (unsigned i = 0; i < NBA_MAX_IO_BASES; i++) { + _dev_mempool_in[i].destroy(); + _dev_mempool_out[i].destroy(); + _cpu_mempool_in[i].destroy(); + _cpu_mempool_out[i].destroy(); + } } -int DummyComputeContext::alloc_input_buffer(size_t size, void **host_ptr, memory_t *dev_mem) +io_base_t DummyComputeContext::alloc_io_base() { - *host_ptr = _cpu_mempool_in.alloc(size); - dev_mem->ptr = _dev_mempool_in.alloc(size); - return 0; + if (io_base_ring.empty()) return INVALID_IO_BASE; + unsigned i = io_base_ring.front(); + io_base_ring.pop_front(); + return (io_base_t) i; } -int DummyComputeContext::alloc_output_buffer(size_t size, void **host_ptr, memory_t *dev_mem) + +void DummyComputeContext::get_input_current_pos(io_base_t io_base, void **host_ptr, memory_t *dev_mem) const { - *host_ptr = _cpu_mempool_out.alloc(size); - dev_mem->ptr = _dev_mempool_out.alloc(size); - return 0; + unsigned i = io_base; + *host_ptr = (char*)_cpu_mempool_in[i].get_base_ptr() + (uintptr_t)_cpu_mempool_in[i].get_alloc_size(); + dev_mem->ptr = (char*)_dev_mempool_in[i].get_base_ptr() + (uintptr_t)_dev_mempool_in[i].get_alloc_size(); +} + +void DummyComputeContext::get_output_current_pos(io_base_t io_base, void **host_ptr, memory_t *dev_mem) const +{ + unsigned i = io_base; + *host_ptr = (char*)_cpu_mempool_out[i].get_base_ptr() + (uintptr_t)_cpu_mempool_out[i].get_alloc_size(); + dev_mem->ptr = (char*)_dev_mempool_out[i].get_base_ptr() + (uintptr_t)_dev_mempool_out[i].get_alloc_size(); } -void DummyComputeContext::clear_io_buffers() +size_t DummyComputeContext::get_input_size(io_base_t io_base) const { - _cpu_mempool_in.reset(); - _cpu_mempool_out.reset(); - _dev_mempool_in.reset(); - _dev_mempool_out.reset(); + unsigned i = io_base; + return _cpu_mempool_in[i].get_alloc_size(); } -void *DummyComputeContext::get_host_input_buffer_base() +size_t DummyComputeContext::get_output_size(io_base_t io_base) const { - return _cpu_mempool_in.get_base_ptr(); + unsigned i = io_base; + return _cpu_mempool_out[i].get_alloc_size(); } -memory_t DummyComputeContext::get_device_input_buffer_base() +int DummyComputeContext::alloc_input_buffer(io_base_t io_base, size_t size, void **host_ptr, memory_t *dev_mem) { - memory_t ret; - ret.ptr = _dev_mempool_in.get_base_ptr(); - return ret; + unsigned i = io_base; + *host_ptr = _cpu_mempool_in[i].alloc(size); + assert(*host_ptr != nullptr); + dev_mem->ptr = _dev_mempool_in[i].alloc(size); + assert(dev_mem->ptr != nullptr); + return 0; +} + +int DummyComputeContext::alloc_output_buffer(io_base_t io_base, size_t size, void **host_ptr, memory_t *dev_mem) +{ + unsigned i = io_base; + *host_ptr = _cpu_mempool_out[i].alloc(size); + assert(*host_ptr != nullptr); + dev_mem->ptr = _dev_mempool_out[i].alloc(size); + assert(dev_mem->ptr != nullptr); + return 0; } -size_t DummyComputeContext::get_total_input_buffer_size() +void DummyComputeContext::clear_io_buffers(io_base_t io_base) { - return _cpu_mempool_in.get_alloc_size(); + unsigned i = io_base; + _cpu_mempool_in[i].reset(); + _cpu_mempool_out[i].reset(); + _dev_mempool_in[i].reset(); + 
_dev_mempool_out[i].reset(); + io_base_ring.push_back(i); } int DummyComputeContext::enqueue_memwrite_op(void *host_buf, memory_t dev_buf, size_t offset, size_t size) diff --git a/src/lib/computation.cc b/src/lib/computation.cc index c8853d3..aaf2953 100644 --- a/src/lib/computation.cc +++ b/src/lib/computation.cc @@ -95,7 +95,7 @@ void comp_thread_context::stop_rx() void comp_thread_context::resume_rx() { /* Flush the processing pipeline. */ - elem_graph->flush_delayed_batches(); + elem_graph->flush_tasks(); /* Reactivate the packet RX event. */ ev_async_start(loop, rx_watcher); diff --git a/src/lib/datablock.cc b/src/lib/datablock.cc index 89b7048..515c29b 100644 --- a/src/lib/datablock.cc +++ b/src/lib/datablock.cc @@ -43,7 +43,8 @@ void declare_datablock_impl(const char *name, datablock_constructor ctor, int &y tuple DataBlock::calc_read_buffer_size(PacketBatch *batch) { - size_t read_buffer_size = 0, num_read_items = 0; + read_buffer_size = 0; + size_t num_read_items = 0; size_t accum_idx = 0; struct read_roi_info read_roi; this->get_read_roi(&read_roi); @@ -62,8 +63,8 @@ tuple DataBlock::calc_read_buffer_size(PacketBatch *batch) num_read_items = batch->count; size_t align = (read_roi.align == 0) ? 2 : read_roi.align; unsigned aligned_len = RTE_ALIGN_CEIL(read_roi.length, align); - t->aligned_item_sizes.size = aligned_len; - read_buffer_size = aligned_len * num_read_items; + t->aligned_item_sizes_h->size = aligned_len; + read_buffer_size = aligned_len * num_read_items; break; } case READ_WHOLE_PACKET: { @@ -78,17 +79,15 @@ tuple DataBlock::calc_read_buffer_size(PacketBatch *batch) #if (NBA_BATCHING_SCHEME == NBA_BATCHING_TRADITIONAL) \ || (NBA_BATCHING_SCHEME == NBA_BATCHING_BITVECTOR) if (IS_PACKET_INVALID(batch, pkt_idx)) { - t->aligned_item_sizes.offsets[pkt_idx] = 0; - //t->exact_item_sizes.sizes[pkt_idx] = 0; - t->aligned_item_sizes.sizes[pkt_idx] = 0; + t->aligned_item_sizes_h->offsets[pkt_idx] = 0; + t->aligned_item_sizes_h->sizes[pkt_idx] = 0; } else { #endif unsigned exact_len = rte_pktmbuf_data_len(batch->packets[pkt_idx]) - read_roi.offset + read_roi.length + read_roi.size_delta; unsigned aligned_len = RTE_ALIGN_CEIL(exact_len, align); - t->aligned_item_sizes.offsets[pkt_idx] = read_buffer_size; - //t->exact_item_sizes.sizes[pkt_idx] = exact_len; - t->aligned_item_sizes.sizes[pkt_idx] = aligned_len; + t->aligned_item_sizes_h->offsets[pkt_idx] = read_buffer_size; + t->aligned_item_sizes_h->sizes[pkt_idx] = aligned_len; read_buffer_size += aligned_len; #if (NBA_BATCHING_SCHEME == NBA_BATCHING_TRADITIONAL) \ || (NBA_BATCHING_SCHEME == NBA_BATCHING_BITVECTOR) @@ -121,7 +120,8 @@ tuple DataBlock::calc_read_buffer_size(PacketBatch *batch) tuple DataBlock::calc_write_buffer_size(PacketBatch *batch) { - size_t write_buffer_size = 0, num_write_items = 0; + write_buffer_size = 0; + size_t num_write_items = 0; struct read_roi_info read_roi; struct write_roi_info write_roi; this->get_read_roi(&read_roi); @@ -132,26 +132,20 @@ tuple DataBlock::calc_write_buffer_size(PacketBatch *batch) struct datablock_tracker *t = &batch->datablock_states[this->get_id()]; switch (write_roi.type) { - case WRITE_PARTIAL_PACKET: + case WRITE_PARTIAL_PACKET: { + + size_t align = (write_roi.align == 0) ? 
2 : write_roi.align; + unsigned aligned_len = RTE_ALIGN_CEIL(write_roi.length, align); + num_write_items = batch->count; + write_buffer_size = aligned_len * num_write_items; + + break; } case WRITE_WHOLE_PACKET: { - num_write_items = batch->count; // FIXME: We currently assume same-as-input when read_roi.type is same. - assert((read_roi.type == READ_PARTIAL_PACKET - && write_roi.type == WRITE_PARTIAL_PACKET) || - (read_roi.type == READ_WHOLE_PACKET - && write_roi.type == WRITE_WHOLE_PACKET)); - //if (read_roi.type == READ_PARTIAL_PACKET - // && write_roi.type == WRITE_PARTIAL_PACKET) { - // //write_item_sizes.size = aligned_item_sizes.size; - //} else if (read_roi.type == READ_WHOLE_PACKET - // && write_roi.type == WRITE_WHOLE_PACKET) { - // //rte_memcpy(&write_item_sizes, &aligned_item_sizes, - // // sizeof(struct item_size_info)); - //} else { - // assert(0); // Not implemented yet! - //} - write_buffer_size = write_roi.length * num_write_items; + assert(read_roi.type == READ_WHOLE_PACKET && write_roi.type == WRITE_WHOLE_PACKET); + num_write_items = batch->count; + write_buffer_size = read_buffer_size; break; } case WRITE_FIXED_SEGMENTS: { @@ -187,8 +181,8 @@ void DataBlock::preprocess(PacketBatch *batch, void *host_in_buffer) { case READ_PARTIAL_PACKET: { void *invalid_value = this->get_invalid_value(); FOR_EACH_PACKET_ALL_PREFETCH(batch, 4u) { - size_t aligned_elemsz = t->aligned_item_sizes.size; - size_t offset = t->aligned_item_sizes.size * pkt_idx; + size_t aligned_elemsz = t->aligned_item_sizes_h->size; + size_t offset = t->aligned_item_sizes_h->size * pkt_idx; if (IS_PACKET_INVALID(batch, pkt_idx)) { if (invalid_value != nullptr) { rte_memcpy((char *) host_in_buffer + offset, invalid_value, aligned_elemsz); @@ -207,8 +201,8 @@ void DataBlock::preprocess(PacketBatch *batch, void *host_in_buffer) { FOR_EACH_PACKET_ALL_PREFETCH(batch, 4u) { if (IS_PACKET_INVALID(batch, pkt_idx)) continue; - size_t aligned_elemsz = t->aligned_item_sizes.sizes[pkt_idx]; - size_t offset = t->aligned_item_sizes.offsets[pkt_idx]; + size_t aligned_elemsz = t->aligned_item_sizes_h->sizes[pkt_idx]; + size_t offset = t->aligned_item_sizes_h->offsets[pkt_idx]; rte_memcpy((char*) host_in_buffer + offset, rte_pktmbuf_mtod(batch->packets[pkt_idx], char*) + read_roi.offset, aligned_elemsz); @@ -252,11 +246,11 @@ void DataBlock::postprocess(OffloadableElement *elem, int input_port, PacketBatc #endif FOR_EACH_PACKET(batch) { size_t elemsz = bitselect(write_roi.type == WRITE_PARTIAL_PACKET, - t->aligned_item_sizes.size, - t->aligned_item_sizes.sizes[pkt_idx]); + t->aligned_item_sizes_h->size, + t->aligned_item_sizes_h->sizes[pkt_idx]); size_t offset = bitselect(write_roi.type == WRITE_PARTIAL_PACKET, - t->aligned_item_sizes.size * pkt_idx, - t->aligned_item_sizes.offsets[pkt_idx]); + t->aligned_item_sizes_h->size * pkt_idx, + t->aligned_item_sizes_h->offsets[pkt_idx]); rte_memcpy(rte_pktmbuf_mtod(batch->packets[pkt_idx], char*) + write_roi.offset, (char*) host_out_ptr + offset, elemsz); @@ -268,7 +262,7 @@ void DataBlock::postprocess(OffloadableElement *elem, int input_port, PacketBatc if (batch->has_dropped) batch->collect_excluded_packets(); #endif - batch->has_results = true; + batch->tracker.has_results = true; break; } case WRITE_USER_POSTPROC: @@ -282,7 +276,7 @@ void DataBlock::postprocess(OffloadableElement *elem, int input_port, PacketBatc batch->has_dropped = false; #endif FOR_EACH_PACKET(batch) { - uintptr_t elemsz = t->aligned_item_sizes.size; + uintptr_t elemsz = write_roi.length; uintptr_t offset = 
elemsz * pkt_idx; Packet *pkt = Packet::from_base(batch->packets[pkt_idx]); pkt->bidx = pkt_idx; @@ -292,7 +286,7 @@ void DataBlock::postprocess(OffloadableElement *elem, int input_port, PacketBatc if (batch->has_dropped) batch->collect_excluded_packets(); #endif - batch->has_results = true; + batch->tracker.has_results = true; break; } case WRITE_NONE: { diff --git a/src/lib/element.cc b/src/lib/element.cc index 850cc5f..37b5c64 100644 --- a/src/lib/element.cc +++ b/src/lib/element.cc @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -22,7 +23,7 @@ using namespace std; using namespace nba; - + static uint64_t task_id = 0; Element::Element() : next_elems(), next_connected_inputs() @@ -54,7 +55,7 @@ int Element::_process_batch(int input_port, PacketBatch *batch) if (batch->has_dropped) batch->collect_excluded_packets(); #endif - batch->has_results = true; + batch->tracker.has_results = true; return 0; // this value will be ignored. } @@ -104,14 +105,14 @@ int VectorElement::_process_batch(int input_port, PacketBatch *batch) if (batch->has_dropped) batch->collect_excluded_packets(); #endif - batch->has_results = true; + batch->tracker.has_results = true; return 0; } int PerBatchElement::_process_batch(int input_port, PacketBatch *batch) { int ret = this->process_batch(input_port, batch); - batch->has_results = true; + batch->tracker.has_results = true; return ret; } @@ -175,50 +176,66 @@ int Element::configure(comp_thread_context *ctx, vector &args) { return 0; } -int OffloadableElement::offload(ElementGraph *mother, PacketBatch *in_batch, int input_port) +int OffloadableElement::offload(ElementGraph *mother, OffloadTask *otask, int input_port) +{ + int dev_idx = 0; + uint64_t now = rte_rdtsc(); + otask->state = TASK_INITIALIZING; + otask->task_id = task_id ++; + otask->offload_start = now; + otask->state = TASK_INITIALIZED; + mother->ready_tasks[dev_idx].push_back(otask); + /* This should always succeed. */ + return 0; +} + +int OffloadableElement::offload(ElementGraph *mother, PacketBatch *batch, int input_port) { int dev_idx = 0; - OffloadTask *task = nullptr; + OffloadTask *otask = nullptr; + /* Create a new OffloadTask or accumulate to pending OffloadTask. */ if (tasks[dev_idx] == nullptr) { #ifdef USE_NVPROF nvtxRangePush("accum_batch"); #endif /* We assume: task pool size >= task input queue length */ - int ret = rte_mempool_get(ctx->task_pool, (void **) &task); + int ret = rte_mempool_get(ctx->task_pool, (void **) &otask); if (ret == -ENOENT) { //if (!ctx->io_ctx->loop_broken) // ev_run(ctx->io_ctx->loop, EVRUN_NOWAIT); /* Keep the current batch for later processing. 
*/ return -1; } - new (task) OffloadTask(); - task->state = TASK_INITIALIZING; - task->task_id = task_id ++; - task->src_loop = ctx->loop; - task->comp_ctx = ctx; - task->completion_queue = ctx->task_completion_queue; - task->completion_watcher = ctx->task_completion_watcher; - task->elemgraph = mother; - task->local_dev_idx = dev_idx; - //task->device = ctx->offload_devices->at(dev_idx); - //assert(task->device != nullptr); - task->elem = this; - - tasks[dev_idx] = task; + new (otask) OffloadTask(); + otask->tracker.element = this; + otask->tracker.input_port = input_port; + otask->tracker.has_results = false; + otask->state = TASK_INITIALIZING; + otask->task_id = task_id ++; + otask->src_loop = ctx->loop; + otask->comp_ctx = ctx; + otask->completion_queue = ctx->task_completion_queue; + otask->completion_watcher = ctx->task_completion_watcher; + otask->elemgraph = mother; + otask->local_dev_idx = dev_idx; + //otask->device = ctx->offload_devices->at(dev_idx); + //assert(otask->device != nullptr); + otask->elem = this; + tasks[dev_idx] = otask; } else - task = tasks[dev_idx]; - assert(task != nullptr); + otask = tasks[dev_idx]; /* Add the current batch if the task batch is not full. */ - if (task->batches.size() < ctx->num_coproc_ppdepth) { - task->batches.push_back(in_batch); - task->input_ports.push_back(input_port); + if (otask->batches.size() < ctx->num_coproc_ppdepth) { + otask->batches.push_back(batch); + otask->input_ports.push_back(input_port); #ifdef USE_NVPROF nvtxMarkA("add_batch"); #endif } else { return -1; } + assert(otask != nullptr); /* Start offloading when one or more following conditions are met: * 1. The task batch size has reached the limit (num_coproc_ppdepth). @@ -228,21 +245,21 @@ int OffloadableElement::offload(ElementGraph *mother, PacketBatch *in_batch, int * 3. The time elapsed since the first packet is greater than * 10 x avg.task completion time. */ - assert(task->batches.size() > 0); + assert(otask->batches.size() > 0); uint64_t now = rte_rdtsc(); - if (task->batches.size() == ctx->num_coproc_ppdepth - // || (task->num_bytes >= 64 * ctx->num_coproc_ppdepth * ctx->io_ctx->num_iobatch_size) - // || (task->batches.size() > 1 && (rdtsc() - task->offload_start) / (double) rte_get_tsc_hz() > 0.0005) + if (otask->batches.size() == ctx->num_coproc_ppdepth + // || (otask->num_bytes >= 64 * ctx->num_coproc_ppdepth * ctx->io_ctx->num_iobatch_size) + // || (otask->batches.size() > 1 && (rdtsc() - otask->offload_start) / (double) rte_get_tsc_hz() > 0.0005) // || (ctx->io_ctx->mode == IO_EMUL && !ctx->stop_task_batching) ) { - //printf("avg task completion time: %.6f sec\n", ctx->inspector->avg_task_completion_sec[dev_idx]); + //printf("avg otask completion time: %.6f sec\n", ctx->inspector->avg_task_completion_sec[dev_idx]); tasks[dev_idx] = nullptr; // Let the element be able to take next pkts/batches. 
- task->offload_start = now; + otask->offload_start = now; - task->state = TASK_INITIALIZED; - mother->ready_tasks[dev_idx].push_back(task); + otask->state = TASK_INITIALIZED; + mother->ready_tasks[dev_idx].push_back(otask); #ifdef USE_NVPROF nvtxRangePop(); #endif diff --git a/src/lib/elementgraph.cc b/src/lib/elementgraph.cc index a5a15a7..e973063 100644 --- a/src/lib/elementgraph.cc +++ b/src/lib/elementgraph.cc @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -26,7 +27,7 @@ using namespace nba; ElementGraph::ElementGraph(comp_thread_context *ctx) : elements(128, ctx->loc.node_id), sched_elements(16, ctx->loc.node_id), - queue(2048, ctx->loc.node_id), delayed_batches(2048, ctx->loc.node_id) + queue(2048, ctx->loc.node_id) { this->ctx = ctx; input_elem = nullptr; @@ -57,23 +58,22 @@ void ElementGraph::flush_offloaded_tasks() // TODO: create multiple cctx_list and access them via dev_idx for hetero-device systems. ComputeContext *cctx = ctx->cctx_list.front(); - if (cctx->state == ComputeContext::READY) { + /* Grab a compute context. */ + assert(cctx != nullptr); + #ifdef USE_NVPROF + nvtxRangePush("offl_prepare"); + #endif - /* Grab a compute context. */ - assert(cctx != nullptr); - assert(cctx->state == ComputeContext::READY); - #ifdef USE_NVPROF - nvtxRangePush("offl_prepare"); - #endif - - /* Prepare to offload. */ - cctx->state = ComputeContext::PREPARING; - cctx->currently_running_task = task; - task->cctx = cctx; - - if (task->state < TASK_PREPARED) { - task->cctx = cctx; + /* Prepare to offload. */ + task->cctx = cctx; + if (task->state < TASK_PREPARED) { + bool has_io_base = false; + if (task->io_base == INVALID_IO_BASE) { + task->io_base = cctx->alloc_io_base(); + has_io_base = (task->io_base != INVALID_IO_BASE); + } + if (has_io_base) { /* In the GPU side, datablocks argument has only used * datablocks in the beginning of the array (not sparsely). */ int datablock_ids[NBA_MAX_DATABLOCKS]; @@ -103,8 +103,10 @@ void ElementGraph::flush_offloaded_tasks() task->prepare_read_buffer(); task->prepare_write_buffer(); task->state = TASK_PREPARED; - } + } /* endif(has_io_base) */ + } /* endif(!task.prepared) */ + if (task->state == TASK_PREPARED) { /* Enqueue the offload task. */ int ret = rte_ring_enqueue(ctx->offload_input_queues[dev_idx], (void*) task); if (ret == ENOENT) { @@ -117,37 +119,12 @@ void ElementGraph::flush_offloaded_tasks() #ifdef USE_NVPROF nvtxRangePop(); #endif - } else { - /* Delay the current offloading task and break. */ ready_tasks[dev_idx].push_front(task); break; - - } /* endif(task.prepared) */ - } /* endif(compctx.ready) */ - } -} - -void ElementGraph::flush_delayed_batches() -{ - uint64_t prev_gen = 0; - uint64_t len_delayed_batches = delayed_batches.size(); - print_ratelimit("# delayed batches", len_delayed_batches, 10000); - - while (!delayed_batches.empty() && !ctx->io_ctx->loop_broken) { - PacketBatch *batch = delayed_batches.front(); - delayed_batches.pop_front(); - if (batch->delay_start > 0) { - batch->delay_time += (rdtscp() - batch->delay_start); - batch->delay_start = 0; - } - - /* It must have the associated element where this batch is delayed. */ - assert(batch->element != nullptr); - - /* Re-run the element graph from that element. 
*/ - run(batch, batch->element, batch->input_port); + } + } /* endwhile(ready_tasks) */ } } @@ -176,390 +153,418 @@ void ElementGraph::scan_offloadable_elements() next_batch = nullptr; selem->dispatch(0, next_batch, selem->_last_delay); if (next_batch != nullptr) { - next_batch->has_results = true; - run(next_batch, selem, 0); + next_batch->tracker.has_results = true; + enqueue_batch(next_batch, selem, 0); } } while (next_batch != nullptr); } /* endif(ELEMTYPE_OFFLOADABLE) */ } /* endfor(selems) */ } -void ElementGraph::run(PacketBatch *batch, Element *start_elem, int input_port) +void ElementGraph::enqueue_batch(PacketBatch *batch, Element *start_elem, int input_port) { - Element *el = start_elem; - assert(el != nullptr); - assert(ctx->io_ctx != nullptr); - - batch->element = el; - batch->input_port = input_port; - batch->generation ++; - queue.push_back(batch); - - /* - * We have two cases of batch handling: - * - single output: enqueue the given batch as it is without - * copying or memory pool allocation. - * - multiple outputs: make copy of batches (but not packets) and - * enqueue them as later job. - */ + assert(start_elem != nullptr); + batch->tracker.element = start_elem; + batch->tracker.input_port = input_port; + queue.push_back(Task::to_task(batch)); +} - /* When the queue becomes empty, the processing path started from - * the start_elem is finished. The unit of a job is an element. */ - while (!queue.empty() && !ctx->io_ctx->loop_broken) { - PacketBatch *batch = queue.front(); - queue.pop_front(); +void ElementGraph::enqueue_offload_task(OffloadTask *otask, Element *start_elem, int input_port) +{ + assert(start_elem != nullptr); + otask->tracker.element = start_elem; + otask->tracker.input_port = input_port; + queue.push_back(Task::to_task(otask)); +} - Element *current_elem = batch->element; - int input_port = batch->input_port; - int batch_disposition = CONTINUE_TO_PROCESS; - int64_t lb_decision = anno_get(&batch->banno, NBA_BANNO_LB_DECISION); - uint64_t now = rdtscp(); // The starting timestamp of the current element. - - /* Check if we can and should offload. */ - if (!batch->has_results) { - if (current_elem->get_type() & ELEMTYPE_OFFLOADABLE) { - OffloadableElement *offloadable = dynamic_cast(current_elem); - assert(offloadable != nullptr); - if (lb_decision != -1) { - /* Get or initialize the task object. - * This step is always executed for every input batch - * passing every offloadable element. */ - if (offloadable->offload(this, batch, input_port) != 0) { - /* We have no room for batch in the preparing task. - * Keep the current batch for later processing. */ - assert(batch->delay_start == 0); - batch->delay_start = rte_rdtsc(); - delayed_batches.push_back(batch); - } - /* At this point, the batch is already consumed to the task - * or delayed. */ - continue; - } else { - /* If not offloaded, run the element's CPU-version handler. */ - batch_disposition = current_elem->_process_batch(input_port, batch); - batch->compute_time += (rdtscp() - now) / batch->count; +void ElementGraph::process_batch(PacketBatch *batch) +{ + Element *current_elem = batch->tracker.element; + int input_port = batch->tracker.input_port; + int batch_disposition = CONTINUE_TO_PROCESS; + int64_t lb_decision = anno_get(&batch->banno, NBA_BANNO_LB_DECISION); + uint64_t now = rdtscp(); // The starting timestamp of the current element. + + /* Check if we can and should offload. 
*/ + if (!batch->tracker.has_results) { + /* Since dynamic_cast has runtime overheads, we first use a bitmask + * to check class types. */ + if (current_elem->get_type() & ELEMTYPE_OFFLOADABLE) { + OffloadableElement *offloadable = dynamic_cast(current_elem); + assert(offloadable != nullptr); + if (lb_decision != -1) { + /* Get or initialize the task object. + * This step is always executed for every input batch + * passing every offloadable element. */ + if (offloadable->offload(this, batch, input_port) != 0) { + /* We have no room for batch in the preparing task. + * Keep the current batch for later processing. */ + batch->delay_start = rte_rdtsc(); + queue.push_back(Task::to_task(batch)); } + /* At this point, the batch is already consumed to the task + * or delayed. */ + return; } else { - /* If not offloadable, run the element's CPU-version handler. */ + /* If not offloaded, run the element's CPU-version handler. */ batch_disposition = current_elem->_process_batch(input_port, batch); + batch->compute_time += (rdtscp() - now) / batch->count; } + } else { + /* If not offloadable, run the element's CPU-version handler. */ + batch_disposition = current_elem->_process_batch(input_port, batch); } + } - /* If the element was per-batch and it said it will keep the batch, - * we do not have to perform batch-split operations below. */ - if (batch_disposition == KEPT_BY_ELEMENT) - continue; + /* If the element was per-batch and it said it will keep the batch, + * we do not have to perform batch-split operations below. */ + if (batch_disposition == KEPT_BY_ELEMENT) + return; - /* When offloading is complete, processing of the resultant batches begins here. - * (ref: enqueue_postproc_batch) */ + /* When offloading is complete, processing of the resultant batches begins here. + * (ref: enqueue_postproc_batch) */ - /* Here, we should have the results no matter what happened before. - * If not, drop all packets in the batch. */ - if (!batch->has_results) { - RTE_LOG(DEBUG, ELEM, "elemgraph: dropping a batch with no results\n"); - if (ctx->inspector) ctx->inspector->drop_pkt_count += batch->count; - free_batch(batch); - continue; - } + /* Here, we should have the results no matter what happened before. + * If not, drop all packets in the batch. */ + if (!batch->tracker.has_results) { + RTE_LOG(DEBUG, ELEM, "elemgraph: dropping a batch with no results\n"); + if (ctx->inspector) ctx->inspector->drop_pkt_count += batch->count; + free_batch(batch); + return; + } - //assert(current_elem->num_max_outputs <= num_max_outputs || current_elem->num_max_outputs == -1); - size_t num_outputs = current_elem->next_elems.size(); + //assert(current_elem->num_max_outputs <= num_max_outputs || current_elem->num_max_outputs == -1); + size_t num_outputs = current_elem->next_elems.size(); - if (num_outputs == 0) { + if (num_outputs == 0) { - /* If no outputs are connected, drop all packets. */ - if (ctx->inspector) ctx->inspector->drop_pkt_count += batch->count; - free_batch(batch); - continue; + /* If no outputs are connected, drop all packets. */ + if (ctx->inspector) ctx->inspector->drop_pkt_count += batch->count; + free_batch(batch); - } else if (num_outputs == 1) { + } else if (num_outputs == 1) { - /* With the single output, we don't need to allocate new - * batches. Just reuse the given one. 
*/ - if (0 == (current_elem->get_type() & ELEMTYPE_PER_BATCH)) { - const int *const results = batch->results; - #if NBA_BATCHING_SCHEME == NBA_BATCHING_CONTINUOUS - batch->has_dropped = false; - #endif - FOR_EACH_PACKET(batch) { - int o = results[pkt_idx]; - switch (o) { - case 0: - // pass - break; - #if NBA_BATCHING_SCHEME != NBA_BATCHING_CONTINUOUS - case DROP: - rte_ring_enqueue(ctx->io_ctx->drop_queue, batch->packets[pkt_idx]); - EXCLUDE_PACKET(batch, pkt_idx); - break; - #endif - case PENDING: - // remove from PacketBatch, but don't free.. - // They are stored in io_thread_ctx::pended_pkt_queue. - EXCLUDE_PACKET(batch, pkt_idx); - break; - case SLOWPATH: - rte_panic("SLOWPATH is not supported yet. (element: %s)\n", current_elem->class_name()); - break; - } - } END_FOR; - #if NBA_BATCHING_SCHEME == NBA_BATCHING_CONTINUOUS - if (batch->has_dropped) - batch->collect_excluded_packets(); - if (ctx->inspector) ctx->inspector->drop_pkt_count += batch->drop_count; - batch->clean_drops(ctx->io_ctx->drop_queue); + /* With the single output, we don't need to allocate new + * batches. Just reuse the given one. */ + if (0 == (current_elem->get_type() & ELEMTYPE_PER_BATCH)) { + const int *const results = batch->results; + #if NBA_BATCHING_SCHEME == NBA_BATCHING_CONTINUOUS + batch->has_dropped = false; + #endif + FOR_EACH_PACKET(batch) { + int o = results[pkt_idx]; + switch (o) { + case 0: + // pass + break; + #if NBA_BATCHING_SCHEME != NBA_BATCHING_CONTINUOUS + case DROP: + rte_ring_enqueue(ctx->io_ctx->drop_queue, batch->packets[pkt_idx]); + EXCLUDE_PACKET(batch, pkt_idx); + break; #endif - } - if (current_elem->next_elems[0]->get_type() & ELEMTYPE_OUTPUT) { - /* We are at the end leaf of the pipeline. - * Inidicate free of the original batch. */ - if (ctx->inspector) { - ctx->inspector->tx_batch_count ++;; - ctx->inspector->tx_pkt_count += batch->count; + case PENDING: + // remove from PacketBatch, but don't free.. + // They are stored in io_thread_ctx::pended_pkt_queue. + EXCLUDE_PACKET(batch, pkt_idx); + break; + case SLOWPATH: + rte_panic("SLOWPATH is not supported yet. (element: %s)\n", current_elem->class_name()); + break; } - io_tx_batch(ctx->io_ctx, batch); - free_batch(batch, false); - continue; - } else { - /* Recurse into the next element, reusing the batch. */ - Element *next_el = current_elem->next_elems[0]; - int next_input_port = current_elem->next_connected_inputs[0]; - - batch->element = next_el; - batch->input_port = next_input_port; - batch->has_results = false; - queue.push_back(batch); - continue; + } END_FOR; + #if NBA_BATCHING_SCHEME == NBA_BATCHING_CONTINUOUS + if (batch->has_dropped) + batch->collect_excluded_packets(); + if (ctx->inspector) ctx->inspector->drop_pkt_count += batch->drop_count; + batch->clean_drops(ctx->io_ctx->drop_queue); + #endif + } + if (current_elem->next_elems[0]->get_type() & ELEMTYPE_OUTPUT) { + /* We are at the end leaf of the pipeline. + * Inidicate free of the original batch. */ + if (ctx->inspector) { + ctx->inspector->tx_batch_count ++;; + ctx->inspector->tx_pkt_count += batch->count; } + io_tx_batch(ctx->io_ctx, batch); + free_batch(batch, false); + } else { + /* Recurse into the next element, reusing the batch. 
*/ + Element *next_el = current_elem->next_elems[0]; + int next_input_port = current_elem->next_connected_inputs[0]; + + batch->tracker.element = next_el; + batch->tracker.input_port = next_input_port; + batch->tracker.has_results = false; + queue.push_back(Task::to_task(batch)); + } - } else { /* num_outputs > 1 */ + } else { /* num_outputs > 1 */ - // TODO: Work in progress! - //size_t num_outputs = elem->next_elems.size(); - //for (unsigned o = 1; o < num_outputs; o++) { - //} - // TODO: zero out the used portions of elem->output_cloned_packets[] + // TODO: Work in progress! + //size_t num_outputs = elem->next_elems.size(); + //for (unsigned o = 1; o < num_outputs; o++) { + //} + // TODO: zero out the used portions of elem->output_cloned_packets[] - const int *const results = batch->results; - PacketBatch *out_batches[num_max_outputs]; - // TODO: implement per-batch handling for branches + const int *const results = batch->results; + PacketBatch *out_batches[num_max_outputs]; + // TODO: implement per-batch handling for branches #ifndef NBA_DISABLE_BRANCH_PREDICTION #ifndef NBA_BRANCH_PREDICTION_ALWAYS - /* use branch prediction when miss < total / 4. */ - if ((current_elem->branch_total >> 2) > (current_elem->branch_miss)) + /* use branch prediction when miss < total / 4. */ + if ((current_elem->branch_total >> 2) > (current_elem->branch_miss)) #endif - { - /* With multiple outputs, make copy of batches. - * This does not copy the content of packets but the - * pointers to packets. */ - int predicted_output = 0; //TODO set to right prediction - uint64_t current_max = 0; - for (unsigned k = 0; k < current_elem->next_elems.size(); k++) { - if (current_max < current_elem->branch_count[k]) { - current_max = current_elem->branch_count[k]; - predicted_output = k; - } + { + /* With multiple outputs, make copy of batches. + * This does not copy the content of packets but the + * pointers to packets. */ + int predicted_output = 0; //TODO set to right prediction + uint64_t current_max = 0; + for (unsigned k = 0; k < current_elem->next_elems.size(); k++) { + if (current_max < current_elem->branch_count[k]) { + current_max = current_elem->branch_count[k]; + predicted_output = k; } + } - memset(out_batches, 0, num_max_outputs*sizeof(PacketBatch *)); - out_batches[predicted_output] = batch; - - /* Classify packets into copy-batches. */ - #if NBA_BATCHING_SCHEME == NBA_BATCHING_CONTINUOUS - batch->has_dropped = false; - #endif - FOR_EACH_PACKET(batch) { - int o = results[pkt_idx]; - assert(o < (signed) num_outputs); - - /* Prediction mismatch! */ - if (unlikely(o != predicted_output)) { - switch(o) { - HANDLE_ALL_PORTS: { - if (!out_batches[o]) { - /* out_batch is not allocated yet... */ - while (rte_mempool_get(ctx->batch_pool, (void**)(out_batches + o)) == -ENOENT - && !ctx->io_ctx->loop_broken) { - ev_run(ctx->io_ctx->loop, EVRUN_NOWAIT); - } - new (out_batches[o]) PacketBatch(); - anno_set(&out_batches[o]->banno, NBA_BANNO_LB_DECISION, lb_decision); - out_batches[o]->recv_timestamp = batch->recv_timestamp; + memset(out_batches, 0, num_max_outputs*sizeof(PacketBatch *)); + out_batches[predicted_output] = batch; + + /* Classify packets into copy-batches. */ + #if NBA_BATCHING_SCHEME == NBA_BATCHING_CONTINUOUS + batch->has_dropped = false; + #endif + FOR_EACH_PACKET(batch) { + int o = results[pkt_idx]; + assert(o < (signed) num_outputs); + + /* Prediction mismatch! 
*/ + if (unlikely(o != predicted_output)) { + switch(o) { + HANDLE_ALL_PORTS: { + if (!out_batches[o]) { + /* out_batch is not allocated yet... */ + while (rte_mempool_get(ctx->batch_pool, (void**)(out_batches + o)) == -ENOENT + && !ctx->io_ctx->loop_broken) { + ev_run(ctx->io_ctx->loop, EVRUN_NOWAIT); } - /* Append the packet to the output batch. */ - ADD_PACKET(out_batches[o], batch->packets[pkt_idx]); - /* Exclude it from the batch. */ - EXCLUDE_PACKET(batch, pkt_idx); - break; } - #if NBA_BATCHING_SCHEME != NBA_BATCHING_CONTINUOUS - case DROP: - rte_ring_enqueue(ctx->io_ctx->drop_queue, batch->packets[pkt_idx]); - EXCLUDE_PACKET(batch, pkt_idx); - #endif - case PENDING: { - /* The packet is stored in io_thread_ctx::pended_pkt_queue. */ - /* Exclude it from the batch. */ - EXCLUDE_PACKET(batch, pkt_idx); - break; } - case SLOWPATH: - assert(0); // Not implemented yet. - break; + new (out_batches[o]) PacketBatch(); + anno_set(&out_batches[o]->banno, NBA_BANNO_LB_DECISION, lb_decision); + out_batches[o]->recv_timestamp = batch->recv_timestamp; } - current_elem->branch_miss++; + /* Append the packet to the output batch. */ + ADD_PACKET(out_batches[o], batch->packets[pkt_idx]); + /* Exclude it from the batch. */ + EXCLUDE_PACKET(batch, pkt_idx); + break; } + #if NBA_BATCHING_SCHEME != NBA_BATCHING_CONTINUOUS + case DROP: + rte_ring_enqueue(ctx->io_ctx->drop_queue, batch->packets[pkt_idx]); + EXCLUDE_PACKET(batch, pkt_idx); + #endif + case PENDING: { + /* The packet is stored in io_thread_ctx::pended_pkt_queue. */ + /* Exclude it from the batch. */ + EXCLUDE_PACKET(batch, pkt_idx); + break; } + case SLOWPATH: + assert(0); // Not implemented yet. + break; } - current_elem->branch_total++; - current_elem->branch_count[o]++; - } END_FOR; - - // NOTE: out_batches[predicted_output] == batch - #if NBA_BATCHING_SCHEME == NBA_BATCHING_CONTINUOUS - if (batch->has_dropped) - batch->collect_excluded_packets(); - if (ctx->inspector) ctx->inspector->drop_pkt_count += batch->drop_count; - batch->clean_drops(ctx->io_ctx->drop_queue); - #endif - - if (current_elem->branch_total & BRANCH_TRUNC_LIMIT) { - //double percentage = ((double)(current_elem->branch_total-current_elem->branch_miss) / (double)current_elem->branch_total); - //printf("%s: prediction: %f\n", current_elem->class_name(), percentage); - current_elem->branch_miss = current_elem->branch_total = 0; - for (unsigned k = 0; k < current_elem->next_elems.size(); k++) - current_elem->branch_count[k] = current_elem->branch_count[k] >> 1; + current_elem->branch_miss++; } + current_elem->branch_total++; + current_elem->branch_count[o]++; + } END_FOR; + + // NOTE: out_batches[predicted_output] == batch + #if NBA_BATCHING_SCHEME == NBA_BATCHING_CONTINUOUS + if (batch->has_dropped) + batch->collect_excluded_packets(); + if (ctx->inspector) ctx->inspector->drop_pkt_count += batch->drop_count; + batch->clean_drops(ctx->io_ctx->drop_queue); + #endif + + if (current_elem->branch_total & BRANCH_TRUNC_LIMIT) { + //double percentage = ((double)(current_elem->branch_total-current_elem->branch_miss) / (double)current_elem->branch_total); + //printf("%s: prediction: %f\n", current_elem->class_name(), percentage); + current_elem->branch_miss = current_elem->branch_total = 0; + for (unsigned k = 0; k < current_elem->next_elems.size(); k++) + current_elem->branch_count[k] = current_elem->branch_count[k] >> 1; + } - /* Recurse into the element subgraph starting from each - * output port using copy-batches. 
*/ - for (unsigned o = 0; o < num_outputs; o++) { - if (out_batches[o] && out_batches[o]->count > 0) { - assert(current_elem->next_elems[o] != NULL); - if (current_elem->next_elems[o]->get_type() & ELEMTYPE_OUTPUT) { + /* Recurse into the element subgraph starting from each + * output port using copy-batches. */ + for (unsigned o = 0; o < num_outputs; o++) { + if (out_batches[o] && out_batches[o]->count > 0) { + assert(current_elem->next_elems[o] != NULL); + if (current_elem->next_elems[o]->get_type() & ELEMTYPE_OUTPUT) { - if (ctx->inspector) { - ctx->inspector->tx_batch_count ++; - ctx->inspector->tx_pkt_count += out_batches[o]->count; - } + if (ctx->inspector) { + ctx->inspector->tx_batch_count ++; + ctx->inspector->tx_pkt_count += out_batches[o]->count; + } - /* We are at the end leaf of the pipeline. */ - io_tx_batch(ctx->io_ctx, out_batches[o]); - free_batch(out_batches[o], false); + /* We are at the end leaf of the pipeline. */ + io_tx_batch(ctx->io_ctx, out_batches[o]); + free_batch(out_batches[o], false); - } else { + } else { - Element *next_el = current_elem->next_elems[o]; - int next_input_port = current_elem->next_connected_inputs[o]; + Element *next_el = current_elem->next_elems[o]; + int next_input_port = current_elem->next_connected_inputs[o]; - out_batches[o]->element = next_el; - out_batches[o]->input_port = next_input_port; - out_batches[o]->has_results = false; + out_batches[o]->tracker.element = next_el; + out_batches[o]->tracker.input_port = next_input_port; + out_batches[o]->tracker.has_results = false; - /* Push at the beginning of the job queue (DFS). - * If we insert at the end, it becomes BFS. */ - queue.push_back(out_batches[o]); - } - } else { - /* This batch is unused! */ - if (out_batches[o]) - free_batch(out_batches[o]); + /* Push at the beginning of the job queue (DFS). + * If we insert at the end, it becomes BFS. */ + queue.push_back(Task::to_task(out_batches[o])); } + } else { + /* This batch is unused! */ + if (out_batches[o]) + free_batch(out_batches[o]); } - continue; } + } #ifndef NBA_BRANCH_PREDICTION_ALWAYS - else + else #endif #endif #ifndef NBA_BRANCH_PREDICTION_ALWAYS - { - while (rte_mempool_get_bulk(ctx->batch_pool, (void **) out_batches, num_outputs) == -ENOENT - && !ctx->io_ctx->loop_broken) { - ev_run(ctx->io_ctx->loop, EVRUN_NOWAIT); - } + { + while (rte_mempool_get_bulk(ctx->batch_pool, (void **) out_batches, num_outputs) == -ENOENT + && !ctx->io_ctx->loop_broken) { + ev_run(ctx->io_ctx->loop, EVRUN_NOWAIT); + } - // TODO: optimize by choosing/determining the "major" path and reuse the - // batch for that path. + // TODO: optimize by choosing/determining the "major" path and reuse the + // batch for that path. - /* Initialize copy-batches. */ - for (unsigned o = 0; o < num_outputs; o++) { - new (out_batches[o]) PacketBatch(); - anno_set(&out_batches[o]->banno, NBA_BANNO_LB_DECISION, lb_decision); - out_batches[o]->recv_timestamp = batch->recv_timestamp; - } + /* Initialize copy-batches. */ + for (unsigned o = 0; o < num_outputs; o++) { + new (out_batches[o]) PacketBatch(); + anno_set(&out_batches[o]->banno, NBA_BANNO_LB_DECISION, lb_decision); + out_batches[o]->recv_timestamp = batch->recv_timestamp; + } - /* Classify packets into copy-batches. 
*/ - FOR_EACH_PACKET(batch) { - int o = results[pkt_idx]; - #if NBA_BATCHING_SCHEME == NBA_BATCHING_CONTINUOUS - if (o >= (signed) num_outputs || o < 0) - printf("o=%d, num_outputs=%lu, %u/%u/%u\n", o, num_outputs, pkt_idx, batch->count, batch->drop_count); - #else - if (o >= (signed) num_outputs || o < 0) - printf("o=%d, num_outputs=%lu, %u/%u\n", o, num_outputs, pkt_idx, batch->count); - #endif - assert(o < (signed) num_outputs && o >= 0); + /* Classify packets into copy-batches. */ + FOR_EACH_PACKET(batch) { + int o = results[pkt_idx]; + #if NBA_BATCHING_SCHEME == NBA_BATCHING_CONTINUOUS + if (o >= (signed) num_outputs || o < 0) + printf("o=%d, num_outputs=%lu, %u/%u/%u\n", o, num_outputs, pkt_idx, batch->count, batch->drop_count); + #else + if (o >= (signed) num_outputs || o < 0) + printf("o=%d, num_outputs=%lu, %u/%u\n", o, num_outputs, pkt_idx, batch->count); + #endif + assert(o < (signed) num_outputs && o >= 0); - if (o >= 0) { - ADD_PACKET(out_batches[o], batch->packets[pkt_idx]); - #if NBA_BATCHING_SCHEME != NBA_BATCHING_CONTINUOUS - } else if (o == DROP) { - rte_ring_enqueue(ctx->io_ctx->drop_queue, batch->packets[pkt_idx]); - #endif - } else if (o == PENDING) { - // remove from PacketBatch, but don't free.. - // They are stored in io_thread_ctx::pended_pkt_queue. - } else if (o == SLOWPATH) { - assert(0); - } else { - rte_panic("Invalid packet disposition value. (element: %s, value: %d)\n", current_elem->class_name(), o); - } - /* Packets are excluded from original batch in ALL cases. */ - /* Therefore, we do not have to collect_excluded_packets() - * nor clean_drops()! */ - EXCLUDE_PACKET_MARK_ONLY(batch, pkt_idx); - } END_FOR; - - /* Recurse into the element subgraph starting from each - * output port using copy-batches. */ - for (unsigned o = 0; o < num_outputs; o++) { - if (out_batches[o]->count > 0) { - assert(current_elem->next_elems[o] != NULL); - if (current_elem->next_elems[o]->get_type() & ELEMTYPE_OUTPUT) { - - if (ctx->inspector) { - ctx->inspector->tx_batch_count ++; - ctx->inspector->tx_pkt_count += batch->count; - } + if (o >= 0) { + ADD_PACKET(out_batches[o], batch->packets[pkt_idx]); + #if NBA_BATCHING_SCHEME != NBA_BATCHING_CONTINUOUS + } else if (o == DROP) { + rte_ring_enqueue(ctx->io_ctx->drop_queue, batch->packets[pkt_idx]); + #endif + } else if (o == PENDING) { + // remove from PacketBatch, but don't free.. + // They are stored in io_thread_ctx::pended_pkt_queue. + } else if (o == SLOWPATH) { + assert(0); + } else { + rte_panic("Invalid packet disposition value. (element: %s, value: %d)\n", current_elem->class_name(), o); + } + /* Packets are excluded from original batch in ALL cases. */ + /* Therefore, we do not have to collect_excluded_packets() + * nor clean_drops()! */ + EXCLUDE_PACKET_MARK_ONLY(batch, pkt_idx); + } END_FOR; + + /* Recurse into the element subgraph starting from each + * output port using copy-batches. */ + for (unsigned o = 0; o < num_outputs; o++) { + if (out_batches[o]->count > 0) { + assert(current_elem->next_elems[o] != NULL); + if (current_elem->next_elems[o]->get_type() & ELEMTYPE_OUTPUT) { + + if (ctx->inspector) { + ctx->inspector->tx_batch_count ++; + ctx->inspector->tx_pkt_count += batch->count; + } - /* We are at the end leaf of the pipeline. */ - io_tx_batch(ctx->io_ctx, out_batches[o]); - free_batch(out_batches[o], false); + /* We are at the end leaf of the pipeline. 
*/
+                    io_tx_batch(ctx->io_ctx, out_batches[o]);
+                    free_batch(out_batches[o], false);

-                } else {
+                } else {

-                    Element *next_el = current_elem->next_elems[o];
-                    int next_input_port = current_elem->next_connected_inputs[o];
+                    Element *next_el = current_elem->next_elems[o];
+                    int next_input_port = current_elem->next_connected_inputs[o];

-                    out_batches[o]->element = next_el;
-                    out_batches[o]->input_port = next_input_port;
-                    out_batches[o]->has_results = false;
+                    out_batches[o]->tracker.element = next_el;
+                    out_batches[o]->tracker.input_port = next_input_port;
+                    out_batches[o]->tracker.has_results = false;

-                    /* Push at the beginning of the job queue (DFS).
-                     * If we insert at the end, it becomes BFS. */
-                    queue.push_back(out_batches[o]);
-                }
-            } else {
-                /* This batch is unused! */
-                free_batch(out_batches[o]);
+                    /* Push at the beginning of the job queue (DFS).
+                     * If we insert at the end, it becomes BFS. */
+                    queue.push_back(Task::to_task(out_batches[o]));
                 }
+            } else {
+                /* This batch is unused! */
+                free_batch(out_batches[o]);
             }
-
-            /* With multiple outputs (branches happened), we have made
-             * copy-batches and the parent should free its batch. */
-            free_batch(batch);
-            continue;
         }
+
+        /* With multiple outputs (branches happened), we have made
+         * copy-batches and the parent should free its batch. */
+        free_batch(batch);
+    }
 #endif
+    } /* endif(numoutputs) */
+}
+
+void ElementGraph::process_offload_task(OffloadTask *otask)
+{
+    Element *current_elem = otask->tracker.element;
+    OffloadableElement *offloadable = dynamic_cast<OffloadableElement *>(current_elem);
+    assert(offloadable != nullptr);
+    assert(offloadable->offload(this, otask, otask->tracker.input_port) == 0);
+}
+
+void ElementGraph::flush_tasks()
+{
+    /*
+     * We have two cases of batch handling:
+     *   - single output: enqueue the given batch as it is without
+     *     copying or memory pool allocation.
+     *   - multiple outputs: make copies of batches (but not packets) and
+     *     enqueue them as later jobs.
+     */
+
+    /* When the queue becomes empty, the processing path started from
+     * the start_elem is finished. The unit of a job is an element.
*/ + while (!queue.empty() && !ctx->io_ctx->loop_broken) { + void *raw_task = queue.front(); + queue.pop_front(); + switch (Task::get_task_type(raw_task)) { + case TASK_SINGLE_BATCH: + { + PacketBatch *batch = Task::to_packet_batch(raw_task); + process_batch(batch); + break; + } + case TASK_OFFLOAD: + { + OffloadTask *otask = Task::to_offload_task(raw_task); + process_offload_task(otask); + break; + } } - } + } /* endwhile(queue) */ return; } diff --git a/src/lib/io.cc b/src/lib/io.cc index 075704d..bf50650 100644 --- a/src/lib/io.cc +++ b/src/lib/io.cc @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -79,7 +80,7 @@ typedef function packet_builder_func_t; /* ===== COMP ===== */ static void comp_packetbatch_init(struct rte_mempool *mp, void *arg, void *obj, unsigned idx) { - PacketBatch *b = (PacketBatch*) obj; + PacketBatch *b = (PacketBatch *) obj; new (b) PacketBatch(); } @@ -97,7 +98,7 @@ static void comp_check_cb(struct ev_loop *loop, struct ev_check *watcher, int re { io_thread_context *io_ctx = (io_thread_context *) ev_userdata(loop); comp_thread_context *ctx = io_ctx->comp_ctx; - ctx->elem_graph->flush_delayed_batches(); + ctx->elem_graph->flush_tasks(); ctx->elem_graph->flush_offloaded_tasks(); ctx->elem_graph->scan_offloadable_elements(); } @@ -143,6 +144,7 @@ static void comp_offload_task_completion_cb(struct ev_loop *loop, struct ev_asyn total_batch_size += task->batches[b]->count; for (size_t b = 0, b_max = task->batches.size(); b < b_max; b ++) { task->batches[b]->compute_time += (uint64_t) ((float) task_cycles / total_batch_size - ((float) task->batches[b]->delay_time / task->batches[b]->count)); + // TODO: if next elem is offloadable, then use enqueue_offloadtask task->elem->enqueue_batch(task->batches[b]); } @@ -239,8 +241,8 @@ static size_t comp_process_batch(io_thread_context *ctx, void *pkts, size_t coun ctx->comp_ctx->elem_graph->free_batch(batch); } else { assert(next_batch == batch); - next_batch->has_results = true; // skip processing - ctx->comp_ctx->elem_graph->run(next_batch, input_elem, 0); + next_batch->tracker.has_results = true; // skip processing + ctx->comp_ctx->elem_graph->enqueue_batch(next_batch, input_elem, 0); } return count; @@ -972,8 +974,8 @@ int io_loop(void *arg) } /* Try to "drain" internally stored batches. 
*/ while (next_batch != nullptr) { - next_batch->has_results = true; // skip processing - ctx->comp_ctx->elem_graph->run(next_batch, selem, 0); + next_batch->tracker.has_results = true; // skip processing + ctx->comp_ctx->elem_graph->enqueue_batch(next_batch, selem, 0); ret = selem->dispatch(loop_count, next_batch, selem->_last_delay); }; } /* endif(!ELEMTYPE_INPUT) */ diff --git a/src/lib/offloadtask.cc b/src/lib/offloadtask.cc index a2b9e0d..e5393a4 100644 --- a/src/lib/offloadtask.cc +++ b/src/lib/offloadtask.cc @@ -19,7 +19,7 @@ using namespace std; using namespace nba; -#define COALESC_COPY +#define COALESCED_COPY #undef DEBUG_HOSTSIDE static thread_local char dummy_buffer[NBA_MAX_PACKET_SIZE] = {0,}; @@ -36,6 +36,7 @@ OffloadTask::OffloadTask() completion_watcher = nullptr; completion_queue = nullptr; cctx = nullptr; + io_base = INVALID_IO_BASE; offload_start = 0; num_pkts = 0; num_bytes = 0; @@ -47,84 +48,130 @@ OffloadTask::~OffloadTask() void OffloadTask::prepare_read_buffer() { + assert(io_base != INVALID_IO_BASE); + #ifdef COALESCED_COPY + cctx->get_input_current_pos(io_base, &host_write_begin, &dev_write_begin); + cctx->get_output_current_pos(io_base, &host_read_begin, &dev_read_begin); + input_alloc_size_begin = cctx->get_input_size(io_base); + output_alloc_size_begin = cctx->get_output_size(io_base); + #endif + for (int dbid : datablocks) { - DataBlock *db = comp_ctx->datablock_registry[dbid]; - struct read_roi_info rri; - db->get_read_roi(&rri); - if (rri.type == READ_NONE) { - for (PacketBatch *batch : batches) { - struct datablock_tracker *t = &batch->datablock_states[dbid]; - t->in_size = 0; - t->in_count = 0; - t->host_in_ptr = nullptr; - t->dev_in_ptr.ptr = nullptr; - } - } else { - for (PacketBatch *batch : batches) { - struct datablock_tracker *t = &batch->datablock_states[dbid]; - tie(t->in_size, t->in_count) = db->calc_read_buffer_size(batch); - t->host_in_ptr = nullptr; - t->dev_in_ptr.ptr = nullptr; - if (t->in_size > 0 && t->in_count > 0) { - cctx->alloc_input_buffer(t->in_size, (void **) &t->host_in_ptr, &t->dev_in_ptr); - assert(t->host_in_ptr != nullptr); - db->preprocess(batch, t->host_in_ptr); - } else - printf("EMPTY BATCH @ prepare_read_buffer()\n"); + if (elemgraph->check_preproc(elem, dbid)) { + DataBlock *db = comp_ctx->datablock_registry[dbid]; + struct read_roi_info rri; + db->get_read_roi(&rri); + if (rri.type == READ_WHOLE_PACKET) { + for (PacketBatch *batch : batches) { + struct datablock_tracker *t = &batch->datablock_states[dbid]; + cctx->alloc_input_buffer(io_base, sizeof(struct item_size_info), + (void **) &t->aligned_item_sizes_h, + &t->aligned_item_sizes_d); + } + } else if (rri.type == READ_PARTIAL_PACKET) { + for (PacketBatch *batch : batches) { + struct datablock_tracker *t = &batch->datablock_states[dbid]; + cctx->alloc_input_buffer(io_base, sizeof(uint64_t), + (void **) &t->aligned_item_sizes_h, + &t->aligned_item_sizes_d); + } + } else { + for (PacketBatch *batch : batches) { + struct datablock_tracker *t = &batch->datablock_states[dbid]; + t->aligned_item_sizes_h = nullptr; + } } - } - } + } /* endif(check_preproc) */ + } /* endfor(dbid) */ + for (int dbid : datablocks) { + if (elemgraph->check_preproc(elem, dbid)) { + DataBlock *db = comp_ctx->datablock_registry[dbid]; + struct read_roi_info rri; + db->get_read_roi(&rri); + if (rri.type == READ_NONE) { + for (PacketBatch *batch : batches) { + struct datablock_tracker *t = &batch->datablock_states[dbid]; + t->in_size = 0; + t->in_count = 0; + t->host_in_ptr = nullptr; + 
t->dev_in_ptr.ptr = nullptr; + } + } else { + for (PacketBatch *batch : batches) { + struct datablock_tracker *t = &batch->datablock_states[dbid]; + tie(t->in_size, t->in_count) = db->calc_read_buffer_size(batch); + t->host_in_ptr = nullptr; + t->dev_in_ptr.ptr = nullptr; + if (t->in_size > 0 && t->in_count > 0) { + cctx->alloc_input_buffer(io_base, t->in_size, + (void **) &t->host_in_ptr, &t->dev_in_ptr); + assert(t->host_in_ptr != nullptr); + db->preprocess(batch, t->host_in_ptr); + } + } + } /* endcase(rri.type) */ + } /* endif(check_preproc) */ + } /* endfor(dbid) */ } void OffloadTask::prepare_write_buffer() { for (int dbid : datablocks) { - DataBlock *db = comp_ctx->datablock_registry[dbid]; - struct write_roi_info wri; - struct read_roi_info rri; - db->get_write_roi(&wri); - db->get_read_roi(&rri); - if (wri.type == WRITE_NONE) { - for (PacketBatch *batch : batches) { - struct datablock_tracker *t = &batch->datablock_states[dbid]; - t->out_size = 0; - t->out_count = 0; - t->host_out_ptr = nullptr; - t->dev_out_ptr.ptr = nullptr; - } - } else { - for (PacketBatch *batch : batches) { - struct datablock_tracker *t = &batch->datablock_states[dbid]; - t->host_out_ptr = nullptr; - t->dev_out_ptr.ptr = nullptr; - if (rri.type == READ_WHOLE_PACKET && wri.type == WRITE_WHOLE_PACKET) { - /* Reuse read_roi currently. Do NOT update size & count here! */ - t->out_size = t->in_size; - t->out_count = t->in_count; - t->host_out_ptr = t->host_in_ptr; - t->dev_out_ptr = t->dev_in_ptr; - } else { - tie(t->out_size, t->out_count) = db->calc_write_buffer_size(batch); - if (t->out_size > 0 && t->out_count > 0) { - cctx->alloc_output_buffer(t->out_size, (void **) &t->host_out_ptr, &t->dev_out_ptr); - assert(t->host_out_ptr != nullptr); - } else - printf("EMPTY BATCH @ prepare_write_buffer()\n"); + if (elemgraph->check_preproc(elem, dbid)) { + DataBlock *db = comp_ctx->datablock_registry[dbid]; + struct write_roi_info wri; + struct read_roi_info rri; + db->get_write_roi(&wri); + db->get_read_roi(&rri); + if (wri.type == WRITE_NONE) { + for (PacketBatch *batch : batches) { + struct datablock_tracker *t = &batch->datablock_states[dbid]; + t->out_size = 0; + t->out_count = 0; + t->host_out_ptr = nullptr; + t->dev_out_ptr.ptr = nullptr; + } + } else { + for (PacketBatch *batch : batches) { + struct datablock_tracker *t = &batch->datablock_states[dbid]; + t->host_out_ptr = nullptr; + t->dev_out_ptr.ptr = nullptr; + if (rri.type == READ_WHOLE_PACKET && wri.type == WRITE_WHOLE_PACKET) { + /* Reuse read_roi currently. Do NOT update size & count here! */ + t->out_size = t->in_size; + t->out_count = t->in_count; + t->host_out_ptr = t->host_in_ptr; + t->dev_out_ptr = t->dev_in_ptr; + } else { + tie(t->out_size, t->out_count) = db->calc_write_buffer_size(batch); + if (t->out_size > 0 && t->out_count > 0) { + cctx->alloc_output_buffer(io_base, t->out_size, + (void **) &t->host_out_ptr, + &t->dev_out_ptr); + assert(t->host_out_ptr != nullptr); + } + } } } - } - } + } /* endif(check_preproc) */ + } /* endfor(dbid) */ } bool OffloadTask::copy_h2d() { bool has_h2d_copies = false; + state = TASK_H2D_COPYING; /* Copy the datablock information for the first kernel argument. 
*/ size_t dbarray_size = ALIGN_CEIL(sizeof(struct datablock_kernel_arg) * datablocks.size(), CACHE_LINE_SIZE); - cctx->alloc_input_buffer(dbarray_size, (void **) &dbarray_h, &dbarray_d); + cctx->alloc_input_buffer(io_base, dbarray_size, (void **) &dbarray_h, &dbarray_d); assert(dbarray_h != nullptr); - size_t itemszarray_size = 0; + + #ifndef COALESCED_COPY + void *item_info_h = nullptr; + memory_t item_info_d; + size_t item_info_size = 0; + #endif for (int dbid : datablocks) { int dbid_d = dbid_h2d[dbid]; @@ -142,37 +189,25 @@ bool OffloadTask::copy_h2d() for (PacketBatch *batch : batches) { struct datablock_tracker *t = &batch->datablock_states[dbid]; - if (rri.type == READ_WHOLE_PACKET) { + if (rri.type == READ_WHOLE_PACKET && t->in_count > 0) { /* We need to copy the size array because each item may * have different lengths. */ - uint16_t *item_sizes_h; - memory_t item_sizes_d; - cctx->alloc_input_buffer(t->in_count * sizeof(uint16_t), (void **) &item_sizes_h, &item_sizes_d); - assert(item_sizes_h != nullptr); - itemszarray_size += ALIGN_CEIL(t->in_count * sizeof(uint16_t), CACHE_LINE_SIZE); - rte_memcpy(item_sizes_h, &t->aligned_item_sizes.sizes[0], sizeof(uint16_t) * t->in_count); - #ifdef DEBUG_HOSTSIDE - dbarray_h[dbid_d].item_sizes_in[b] = (uint16_t *) item_sizes_h; - dbarray_h[dbid_d].item_sizes_out[b] = (uint16_t *) item_sizes_h; - #else - dbarray_h[dbid_d].item_sizes_in[b] = (uint16_t *) item_sizes_d.ptr; - dbarray_h[dbid_d].item_sizes_out[b] = (uint16_t *) item_sizes_d.ptr; - #endif - - uint16_t *item_offsets_h; - memory_t item_offsets_d; - cctx->alloc_input_buffer(t->in_count * sizeof(uint16_t), (void **) &item_offsets_h, &item_offsets_d); - assert(item_sizes_h != nullptr); - itemszarray_size += ALIGN_CEIL(t->in_count * sizeof(uint16_t), CACHE_LINE_SIZE); - rte_memcpy(item_offsets_h, &t->aligned_item_sizes.offsets[0], sizeof(uint16_t) * t->in_count); - #ifdef DEBUG_HOSTSIDE - dbarray_h[dbid_d].item_offsets_in[b] = (uint16_t *) item_offsets_h; - dbarray_h[dbid_d].item_offsets_out[b] = (uint16_t *) item_offsets_h; - #else - dbarray_h[dbid_d].item_offsets_in[b] = (uint16_t *) item_offsets_d.ptr; - dbarray_h[dbid_d].item_offsets_out[b] = (uint16_t *) item_offsets_d.ptr; + assert(t->aligned_item_sizes_h != nullptr); + #ifndef COALESCED_COPY + if (item_info_h == nullptr) { + item_info_h = t->aligned_item_sizes_h; + item_info_d = t->aligned_item_sizes_d; + } + item_info_size += ALIGN(sizeof(struct item_size_info), CACHE_LINE_SIZE); #endif - + dbarray_h[dbid_d].item_sizes_in[b] = (uint16_t *) ((char *) t->aligned_item_sizes_d.ptr + + (uintptr_t) offsetof(struct item_size_info, sizes)); + dbarray_h[dbid_d].item_sizes_out[b] = (uint16_t *) ((char *) t->aligned_item_sizes_d.ptr + + (uintptr_t) offsetof(struct item_size_info, sizes)); + dbarray_h[dbid_d].item_offsets_in[b] = (uint16_t *) ((char *) t->aligned_item_sizes_d.ptr + + (uintptr_t) offsetof(struct item_size_info, offsets)); + dbarray_h[dbid_d].item_offsets_out[b] = (uint16_t *) ((char *) t->aligned_item_sizes_d.ptr + + (uintptr_t) offsetof(struct item_size_info, offsets)); } else { /* Same for all batches. * We assume the module developer knows the fixed length @@ -182,63 +217,69 @@ bool OffloadTask::copy_h2d() dbarray_h[dbid_d].item_offsets_in[b] = nullptr; dbarray_h[dbid_d].item_offsets_out[b] = nullptr; } - #ifdef DEBUG_HOSTSIDE - dbarray_h[dbid_d].buffer_bases_in[b] = t->host_in_ptr; - #else dbarray_h[dbid_d].buffer_bases_in[b] = t->dev_in_ptr.ptr; // FIXME: generalize to CL? 
-            #endif
             dbarray_h[dbid_d].item_count_in[b] = t->in_count;
             dbarray_h[dbid_d].total_item_count_in += t->in_count;
-            #ifdef DEBUG_HOSTSIDE
-            dbarray_h[dbid_d].buffer_bases_out[b] = t->host_out_ptr;
-            #else
             dbarray_h[dbid_d].buffer_bases_out[b] = t->dev_out_ptr.ptr; // FIXME: generalize to CL?
-            #endif
             dbarray_h[dbid_d].item_count_out[b] = t->out_count;
             dbarray_h[dbid_d].total_item_count_out += t->out_count;
             b++;
-        }
-    }
+        } /* endfor(batches) */
+    } /* endfor(dbid) */
+
+    #ifndef COALESCED_COPY
+    cctx->enqueue_memwrite_op(item_info_h, item_info_d, 0, item_info_size);
     // FIXME: hacking by knowing internal behaviour of cuda_mempool...
-    cctx->enqueue_memwrite_op(dbarray_h, dbarray_d, 0, dbarray_size + itemszarray_size);
+    cctx->enqueue_memwrite_op(dbarray_h, dbarray_d, 0, dbarray_size);
+    #endif
+    has_h2d_copies = true;

-    /* Coalesced H2D data copy. */
+    /* Coalesced H2D data copy.
+     * We need to check and copy not-yet-transferred-to-GPU buffers one by
+     * one, which causes high device API call overheads.
+     * We aggregate contiguous copies to reduce the number of API calls.
+     * If there are no reused datablocks, all copies are merged into a
+     * single API call. */
+    #ifndef COALESCED_COPY
     void *first_host_in_ptr = nullptr;
-    int copies = 0;
     memory_t first_dev_in_ptr;
     size_t total_size = 0;
     for (int dbid : datablocks) {
-        int b = 0;
-        for (PacketBatch *batch : batches) {
-            struct datablock_tracker *t = &batch->datablock_states[dbid];
-            if (t == nullptr || t->host_in_ptr == nullptr || t->in_count == 0 || t->in_size == 0) {
-                #ifdef COALESC_COPY
-                if (first_host_in_ptr != nullptr) {
-                    /* Discontinued copy. */
-                    cctx->enqueue_memwrite_op(first_host_in_ptr, first_dev_in_ptr, 0, total_size);
-                    /* Reset. */
-                    first_host_in_ptr = nullptr;
-                    total_size = 0;
+        if (elemgraph->check_preproc(elem, dbid)) {
+            for (PacketBatch *batch : batches) {
+                struct datablock_tracker *t = &batch->datablock_states[dbid];
+                if (t == nullptr || t->host_in_ptr == nullptr) {
+                    if (first_host_in_ptr != nullptr) {
+                        /* Discontinued copy. */
+                        cctx->enqueue_memwrite_op(first_host_in_ptr, first_dev_in_ptr,
+                                                  0, total_size);
+                        /* Reset. */
+                        first_host_in_ptr = nullptr;
+                        total_size = 0;
+                    }
+                    continue;
+                }
+                if (t->in_count == 0) assert(t->in_size == 0);
+                if (t->in_count > 0) assert(t->in_size > 0);
+                /* IMPORTANT: IO buffer allocations are aligned by cache line size!! */
+                if (first_host_in_ptr == nullptr) {
+                    first_host_in_ptr = t->host_in_ptr;
+                    first_dev_in_ptr = t->dev_in_ptr;
+                }
+                if ((char*) first_host_in_ptr + (uintptr_t) total_size
+                    != (char*) t->host_in_ptr)
+                {
+                    cctx->enqueue_memwrite_op(first_host_in_ptr, first_dev_in_ptr,
+                                              0, total_size);
+                    first_host_in_ptr = t->host_in_ptr;
+                    total_size = ALIGN(t->in_size, CACHE_LINE_SIZE);
+                } else {
+                    total_size += ALIGN(t->in_size, CACHE_LINE_SIZE);
                 }
             }
-                #endif
-                continue;
-            }
-            //if (t->in_count == 0) assert(t->in_size == 0);
-            //if (t->in_count > 0) assert(t->in_size > 0);
-            if (first_host_in_ptr == nullptr) {
-                first_host_in_ptr = t->host_in_ptr;
-                first_dev_in_ptr = t->dev_in_ptr;
-            }
-            total_size += ALIGN_CEIL(t->in_size, CACHE_LINE_SIZE);
-            #ifndef COALESC_COPY
-            cctx->enqueue_memwrite_op(t->host_in_ptr, t->dev_in_ptr, 0, t->in_size);
-            #endif
-            has_h2d_copies = true;
-            b++;
-        }
-    }
-    #ifdef COALESC_COPY
+        } /* endif(check_preproc) */
+    } /* endfor(dbid) */
     if (first_host_in_ptr != nullptr) {
         /* Finished copy.
*/ cctx->enqueue_memwrite_op(first_host_in_ptr, first_dev_in_ptr, 0, total_size); @@ -276,13 +317,16 @@ void OffloadTask::execute() if (all_item_count > 0) { - cctx->alloc_input_buffer(sizeof(uint16_t) * all_item_count, (void **) &batch_ids_h, &batch_ids_d); + cctx->alloc_input_buffer(io_base, sizeof(uint16_t) * all_item_count, + (void **) &batch_ids_h, &batch_ids_d); assert(batch_ids_h != nullptr); - cctx->alloc_input_buffer(sizeof(uint16_t) * all_item_count, (void **) &item_ids_h, &item_ids_d); + cctx->alloc_input_buffer(io_base, sizeof(uint16_t) * all_item_count, + (void **) &item_ids_h, &item_ids_d); assert(item_ids_h != nullptr); res.num_workitems = all_item_count; res.num_threads_per_workgroup = 256; - res.num_workgroups = (all_item_count + res.num_threads_per_workgroup - 1) / res.num_threads_per_workgroup; + res.num_workgroups = (all_item_count + res.num_threads_per_workgroup - 1) + / res.num_threads_per_workgroup; uint16_t batch_id = 0; unsigned global_idx = 0; for (PacketBatch *batch : batches) { @@ -294,11 +338,18 @@ void OffloadTask::execute() } batch_id ++; } - cctx->enqueue_memwrite_op(batch_ids_h, batch_ids_d, 0, ALIGN_CEIL(sizeof(uint16_t) * all_item_count, CACHE_LINE_SIZE) * 2); + #ifdef COALESCED_COPY + size_t last_alloc_size = cctx->get_input_size(io_base); + cctx->enqueue_memwrite_op(host_write_begin, dev_write_begin, 0, last_alloc_size - input_alloc_size_begin); + #else + cctx->enqueue_memwrite_op(batch_ids_h, batch_ids_d, 0, ALIGN(sizeof(uint16_t) * all_item_count, CACHE_LINE_SIZE) * 2); + #endif cctx->clear_checkbits(); cctx->clear_kernel_args(); + state = TASK_EXECUTING; + /* Framework-provided kernel arguments: * (1) array of datablock_kernel_arg[] indexed by datablock ID * (2) the number of batches @@ -331,48 +382,58 @@ void OffloadTask::execute() bool OffloadTask::copy_d2h() { + state = TASK_D2H_COPYING; + /* Coalesced D2H data copy. */ - bool has_d2h_copies = false; + #ifdef COALESCED_COPY + size_t last_alloc_size = cctx->get_output_size(io_base); + cctx->enqueue_memread_op(host_read_begin, dev_read_begin, + 0, last_alloc_size - output_alloc_size_begin); + #else void *first_host_out_ptr = nullptr; - int copies = 0; memory_t first_dev_out_ptr; size_t total_size = 0; for (int dbid : datablocks) { - DataBlock *db = comp_ctx->datablock_registry[dbid]; - for (PacketBatch *batch : batches) { - struct datablock_tracker *t = &batch->datablock_states[dbid]; - if (t == nullptr || t->host_out_ptr == nullptr || t->out_count == 0 || t->out_size == 0) { - #ifdef COALESC_COPY - if (first_host_out_ptr != nullptr) { - /* Discontinued copy. */ + if (elemgraph->check_postproc(elem, dbid)) { + DataBlock *db = comp_ctx->datablock_registry[dbid]; + for (PacketBatch *batch : batches) { + struct datablock_tracker *t = &batch->datablock_states[dbid]; + if (t == nullptr || t->host_out_ptr == nullptr + || t->out_count == 0 || t->out_size == 0) + { + if (first_host_out_ptr != nullptr) { + /* Discontinued copy. */ + cctx->enqueue_memread_op(first_host_out_ptr, first_dev_out_ptr, 0, total_size); + /* Reset. */ + first_host_out_ptr = nullptr; + total_size = 0; + } + continue; + } + //if (t->out_count == 0) assert(t->out_size == 0); + //if (t->out_count > 0) assert(t->out_size > 0); + if (first_host_out_ptr == nullptr) { + first_host_out_ptr = t->host_out_ptr; + first_dev_out_ptr = t->dev_out_ptr; + } + if ((char*) first_host_out_ptr + (uintptr_t) total_size + != (char*) t->host_out_ptr) + { cctx->enqueue_memread_op(first_host_out_ptr, first_dev_out_ptr, 0, total_size); - /* Reset. 
*/ - first_host_out_ptr = nullptr; - total_size = 0; + first_host_out_ptr = t->host_out_ptr; + total_size = ALIGN(t->out_size, CACHE_LINE_SIZE); + } else { + total_size += ALIGN(t->out_size, CACHE_LINE_SIZE); } - #endif - continue; } - //if (t->out_count == 0) assert(t->out_size == 0); - //if (t->out_count > 0) assert(t->out_size > 0); - if (first_host_out_ptr == nullptr) { - first_host_out_ptr = t->host_out_ptr; - first_dev_out_ptr = t->dev_out_ptr; - } - total_size += ALIGN_CEIL(t->out_size, CACHE_LINE_SIZE); - #ifndef COALESC_COPY - cctx->enqueue_memread_op(t->host_out_ptr, t->dev_out_ptr, 0, t->out_size); - #endif - has_d2h_copies = true; - } - } - #ifdef COALESC_COPY + } /* endif(check_postproc) */ + } /* endfor(dbid) */ if (first_host_out_ptr != nullptr) { /* Finished copy. */ cctx->enqueue_memread_op(first_host_out_ptr, first_dev_out_ptr, 0, total_size); } #endif - return has_d2h_copies; + return true; } bool OffloadTask::poll_kernel_finished() @@ -398,6 +459,7 @@ bool OffloadTask::poll_d2h_copy_finished() void OffloadTask::notify_completion() { /* Notify the computation thread. */ + state = TASK_FINISHED; assert(0 == rte_ring_sp_enqueue(completion_queue, (void *) this)); ev_async_send(src_loop, completion_watcher); } @@ -405,42 +467,38 @@ void OffloadTask::notify_completion() void OffloadTask::postprocess() { for (int dbid : datablocks) { - DataBlock *db = comp_ctx->datablock_registry[dbid]; - struct write_roi_info wri; - db->get_write_roi(&wri); - if (wri.type == WRITE_NONE) - continue; - int b = 0; - bool done = false; - for (PacketBatch *batch : batches) { - // TODO: query the elemgraph analysis result - // to check if elem is the postproc point - // of the current datablock. - struct datablock_tracker *t = &batch->datablock_states[dbid]; - if (t->host_out_ptr != nullptr) { - // FIXME: let the element to choose the datablock used for postprocessing, - // or pass multiple datablocks that have outputs. - db->postprocess(elem, input_ports[b], batch, t->host_out_ptr); - done = true; + if (elemgraph->check_postproc(elem, dbid)) { + DataBlock *db = comp_ctx->datablock_registry[dbid]; + struct write_roi_info wri; + db->get_write_roi(&wri); + int b = 0; + for (PacketBatch *batch : batches) { + struct datablock_tracker *t = &batch->datablock_states[dbid]; + if (t->host_out_ptr != nullptr) { + // FIXME: let the element to choose the datablock used for postprocessing, + // or pass multiple datablocks that have outputs. + db->postprocess(elem, input_ports[b], batch, t->host_out_ptr); + } + b++; } - b++; - } - } + } /* endif(check_postproc) */ + } /* endfor(dbid) */ - // TODO: query the elemgraph analysis result - // to check if no more offloading appears afterwards. - /* Reset all datablock trackers. */ - for (PacketBatch *batch : batches) { - if (batch->datablock_states != nullptr) { - struct datablock_tracker *t = batch->datablock_states; - t->host_in_ptr = nullptr; - t->host_out_ptr = nullptr; - rte_mempool_put(comp_ctx->dbstate_pool, (void *) t); - batch->datablock_states = nullptr; + if (elemgraph->check_postproc_all(elem)) { + /* Reset all datablock trackers. */ + for (PacketBatch *batch : batches) { + if (batch->datablock_states != nullptr) { + struct datablock_tracker *t = batch->datablock_states; + t->host_in_ptr = nullptr; + t->host_out_ptr = nullptr; + rte_mempool_put(comp_ctx->dbstate_pool, (void *) t); + batch->datablock_states = nullptr; + } } + /* Release per-task io_base. 
*/ + cctx->clear_io_buffers(io_base); + //printf("%s task finished\n", elem->class_name()); } - /* Reset the IO buffer completely. */ - cctx->clear_io_buffers(); } // vim: ts=8 sts=4 sw=4 et
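
/*
 * For reference: the hunks above call Task::to_task(), Task::get_task_type(),
 * Task::to_packet_batch() and Task::to_offload_task(), but their definitions
 * are not part of this patch.  The sketch below is only an illustration of
 * one plausible implementation, assuming a low-bit pointer-tagging scheme
 * (valid when PacketBatch and OffloadTask objects are at least 2-byte
 * aligned).  The tag names TASK_SINGLE_BATCH/TASK_OFFLOAD mirror the switch
 * labels in ElementGraph::flush_tasks(); their numeric values and the exact
 * API shape here are assumptions, not the in-tree code.
 */
#include <cstdint>
#include <cassert>

namespace nba {

class PacketBatch;   /* see include/nba/element/packetbatch.hh */
class OffloadTask;   /* assumed to live under include/nba/ as in the hunks above */

enum : uintptr_t {
    TASK_SINGLE_BATCH = 0,   /* tag bit clear: the raw pointer is a PacketBatch* */
    TASK_OFFLOAD      = 1,   /* tag bit set:   the raw pointer is an OffloadTask* */
};

namespace Task {

    static inline void *to_task(PacketBatch *batch) {
        /* Batches keep a zero tag bit, so no masking is needed to restore them. */
        return static_cast<void *>(batch);
    }

    static inline void *to_task(OffloadTask *otask) {
        return reinterpret_cast<void *>(reinterpret_cast<uintptr_t>(otask) | TASK_OFFLOAD);
    }

    static inline uintptr_t get_task_type(void *raw_task) {
        return reinterpret_cast<uintptr_t>(raw_task) & 0x1u;
    }

    static inline PacketBatch *to_packet_batch(void *raw_task) {
        assert(get_task_type(raw_task) == TASK_SINGLE_BATCH);
        return static_cast<PacketBatch *>(raw_task);
    }

    static inline OffloadTask *to_offload_task(void *raw_task) {
        assert(get_task_type(raw_task) == TASK_OFFLOAD);
        return reinterpret_cast<OffloadTask *>(
                reinterpret_cast<uintptr_t>(raw_task) & ~static_cast<uintptr_t>(0x1));
    }

} /* namespace Task */
} /* namespace nba */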
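
/*
 * Also for reference: a condensed, standalone model of the "aggregate
 * contiguous copies" loops in OffloadTask::copy_h2d()/copy_d2h() above.
 * Region, issue_copy and CACHE_LINE_SZ are invented for this sketch; the
 * real code walks datablock_tracker entries and issues the copies through
 * cctx->enqueue_memwrite_op()/enqueue_memread_op().  Because the per-task
 * IO-base memory pools hand out cache-line-aligned buffers back to back,
 * each maximal run of adjacent buffers can be flushed with one API call.
 */
#include <cstddef>
#include <functional>
#include <vector>

namespace {

constexpr size_t CACHE_LINE_SZ = 64;

inline size_t align_ceil(size_t v, size_t a) { return (v + a - 1) / a * a; }

struct Region {
    char *host_ptr;   /* nullptr means "nothing to copy for this entry" */
    size_t size;      /* logical size; the pool rounds it up to a cache line */
};

/* Walks the regions in allocation order and emits one copy per maximal run of
 * cache-line-contiguous regions.  Returns the number of copies issued. */
size_t coalesce_copies(const std::vector<Region> &regions,
                       const std::function<void(char *, size_t)> &issue_copy)
{
    char *run_base = nullptr;
    size_t run_len = 0, num_calls = 0;
    auto flush = [&]() {
        if (run_base != nullptr && run_len > 0) {
            issue_copy(run_base, run_len);
            num_calls ++;
        }
        run_base = nullptr;
        run_len = 0;
    };
    for (const Region &r : regions) {
        if (r.host_ptr == nullptr || r.size == 0) {
            flush();                                       /* hole: close the current run */
        } else if (run_base == nullptr) {
            run_base = r.host_ptr;                         /* start a new run */
            run_len = align_ceil(r.size, CACHE_LINE_SZ);
        } else if (run_base + run_len == r.host_ptr) {
            run_len += align_ceil(r.size, CACHE_LINE_SZ);  /* extend the run */
        } else {
            flush();                                       /* discontinuity: emit and restart */
            run_base = r.host_ptr;
            run_len = align_ceil(r.size, CACHE_LINE_SZ);
        }
    }
    flush();                                               /* trailing run, if any */
    return num_calls;
}

} /* anonymous namespace */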