Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes to enable threadsafe operation #1064

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions dynet/aligned-mem-pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,25 @@

using namespace dynet;


// Zeroes the block starting at `p`. The requested size `n` is first rounded
// up with the allocator's round_up_align(), mirroring the rounding applied in
// allocate(), and that rounded size is what gets passed to the allocator.
void DynamicCPUMemoryPool::zero(void* p, size_t n) {
  a->zero(p, a->round_up_align(n));
}

// Requests one block from the allocator, sized as `n` rounded up by
// round_up_align(). A successful block and its rounded size are recorded in
// `ptrs` / `sizes` so free() and zero_allocated_memory() can revisit them.
// Returns the block, or the allocator's null result unchanged on failure (in
// which case nothing is recorded).
void* DynamicCPUMemoryPool::allocate(size_t n) {
  const auto aligned_size = a->round_up_align(n);
  void* const block = a->malloc(aligned_size);
  if (!block) {
    return block;
  }
  ptrs.push_back(block);
  sizes.push_back(aligned_size);
  return block;
}

// Intentionally does nothing: this pool obtains memory one block at a time in
// allocate(), so there is no up-front reservation of `cap` to perform.
void DynamicCPUMemoryPool::sys_alloc(size_t cap) {
  (void)cap;  // unused
}


void* InternalMemoryPool::allocate(size_t n) {
auto rounded_n = a->round_up_align(n);
if (rounded_n + used > capacity) {
Expand All @@ -23,9 +42,13 @@ void InternalMemoryPool::sys_alloc(size_t cap) {
used = 0;
}

AlignedMemoryPool::AlignedMemoryPool(const std::string &name, size_t initial_cap, MemAllocator *a, size_t expanding_unit) : name(name), cap(initial_cap), current(0), a(a), expanding_unit(expanding_unit) {
AlignedMemoryPool::AlignedMemoryPool(const std::string &name, size_t initial_cap, MemAllocator *a, size_t expanding_unit, bool dynamic) : name(name), cap(initial_cap), current(0), a(a), expanding_unit(expanding_unit), dynamic(dynamic) {
DYNET_ARG_CHECK(cap > 0, "Attempt to allocate memory of size 0 in AlignedMemoryPool");
pools.push_back(new InternalMemoryPool(name, cap, a));
if (dynamic) {
pools.push_back(new DynamicCPUMemoryPool(name, cap));
} else {
pools.push_back(new InternalMemoryPool(name, cap, a));
}
}
AlignedMemoryPool::~AlignedMemoryPool() {
for ( auto p : pools) { delete p; }
Expand All @@ -36,7 +59,11 @@ void* AlignedMemoryPool::allocate(size_t n) {
if (res == 0) {
// round up to the nearest multiple of expanding_unit
size_t new_pool_size = (n + expanding_unit-1) / expanding_unit * expanding_unit;
pools.push_back(new InternalMemoryPool(name, new_pool_size, a));
if (dynamic) {
pools.push_back(new DynamicCPUMemoryPool(name, new_pool_size));
} else {
pools.push_back(new InternalMemoryPool(name, new_pool_size, a));
}
cap += new_pool_size;
current++;
res = pools[current]->allocate(n);
Expand All @@ -49,7 +76,12 @@ void AlignedMemoryPool::free() {
if (current > 0) {
for (auto p : pools) { delete p; }
pools.clear();
pools.push_back(new InternalMemoryPool(name, cap, a));
if (dynamic) {
pools.push_back(new DynamicCPUMemoryPool(name, cap));
} else {
pools.push_back(new InternalMemoryPool(name, cap, a));
}
cap = cap * (current + 1);
current = 0;
}
pools[0]->free();
Expand Down
74 changes: 66 additions & 8 deletions dynet/aligned-mem-pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,64 @@

namespace dynet {

class InternalMemoryPool {
class BaseMemoryPool {
public:
explicit InternalMemoryPool(const std::string & name, size_t cap, MemAllocator* a) : name(name), a(a) {
BaseMemoryPool(const std::string & name, MemAllocator* a) : a(a), name(name) {}
virtual ~BaseMemoryPool() {}
virtual void* allocate(size_t n) = 0;

virtual void free() = 0;
// zeros out the amount of allocations
virtual void zero_allocated_memory() = 0;

size_t used;

protected:
virtual void sys_alloc(size_t cap) {}
virtual void zero_all() {}

MemAllocator* a;
std::string name;
void* mem;
};

// Memory pool that hands every request to its own CPUAllocator instead of
// carving pieces out of one pre-reserved region. Each block returned by
// allocate() is remembered (pointer + rounded size) so that it can later be
// zeroed or released.
//
// Ownership: the pool owns the CPUAllocator it creates in its constructor and
// every block recorded in `ptrs`; both are released by the destructor. Because
// of that the type is non-copyable (an implicit copy would free the same
// blocks and allocator twice).
//
// NOTE(review): the inherited `used` and `mem` members are never assigned by
// this class — confirm that nothing reads them for a dynamic pool.
class DynamicCPUMemoryPool : public BaseMemoryPool {
 private:
  std::vector<void*> ptrs;    // blocks handed out by allocate(), in order
  std::vector<size_t> sizes;  // rounded size of the block at the same index

 public:
  // `cap` is accepted so the constructor can be called like the other pool
  // constructors, but it is not used: nothing is reserved up front.
  explicit DynamicCPUMemoryPool(const std::string & name, size_t cap)
      : BaseMemoryPool(name, new CPUAllocator()) {}

  // Releases all outstanding blocks, then the allocator created in the
  // constructor.
  ~DynamicCPUMemoryPool() {
    free();
    delete a;
  }

  // The pool owns raw resources released in the destructor; copying would
  // lead to a double free, so copies are disallowed.
  DynamicCPUMemoryPool(const DynamicCPUMemoryPool&) = delete;
  DynamicCPUMemoryPool& operator=(const DynamicCPUMemoryPool&) = delete;

  // Allocates one block of at least `n` (rounded up); see the .cc file.
  void* allocate(size_t n);
  // Zeroes the block at `p`, with `n` rounded up the same way as allocate().
  void zero(void* p, size_t n);

  // Returns every recorded block to the allocator and forgets them.
  void free() {
    for (void* block : ptrs)
      a->free(block);
    ptrs.clear();
    sizes.clear();
  }

  // Zeroes every block currently recorded by the pool.
  void zero_allocated_memory() {
    // size_t index: matches the type of ptrs.size() (no narrowing compare).
    for (size_t i = 0; i < ptrs.size(); ++i)
      zero(ptrs[i], sizes[i]);
  }

 private:
  // No-op for this pool; defined in the .cc file.
  void sys_alloc(size_t cap);
  // Nothing to zero at construction time: no memory has been obtained yet.
  void zero_all() {}
};

class InternalMemoryPool : public BaseMemoryPool {
public:
explicit InternalMemoryPool(const std::string & name, size_t cap, MemAllocator* a) : BaseMemoryPool(name, a) {
sys_alloc(cap);
zero_all();
}
Expand All @@ -33,20 +88,18 @@ class InternalMemoryPool {

size_t used;
private:
size_t capacity;

void sys_alloc(size_t cap);

void zero_all() {
a->zero(mem, capacity);
}
std::string name;
size_t capacity;
MemAllocator* a;
void* mem;
};

class AlignedMemoryPool {
public:
explicit AlignedMemoryPool(const std::string &name, size_t initial_cap, MemAllocator *a, size_t expanding_unit = 1<<24);
explicit AlignedMemoryPool(const std::string &name, size_t initial_cap, MemAllocator *a, size_t expanding_unit = 1<<24, bool dynamic = false);
~AlignedMemoryPool();

void* allocate(size_t n);
Expand All @@ -59,13 +112,18 @@ class AlignedMemoryPool {
void set_used(size_t s);
size_t get_cap();

size_t round_up_align(size_t n) const { return a->round_up_align(n); }

bool is_dynamic() { return dynamic; }

private:
std::string name;
std::vector<InternalMemoryPool *> pools;
std::vector<BaseMemoryPool *> pools;
size_t cap;
int current;
MemAllocator* a;
size_t expanding_unit;
bool dynamic;
};

} // namespace dynet
Expand Down
11 changes: 6 additions & 5 deletions dynet/devices.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ Device_GPU::Device_GPU(int my_id, const DeviceMempoolSizes & mbs, int device_id)
Device_GPU::~Device_GPU() {}
#endif

Device_CPU::Device_CPU(int my_id, const DeviceMempoolSizes & mbs, bool shared) :
Device_CPU::Device_CPU(int my_id, const DeviceMempoolSizes & mbs, bool shared, bool dynamic) :
Device(my_id, DeviceType::CPU, &cpu_mem), shmem(mem) {
if (shared) shmem = new SharedAllocator();
kSCALAR_MINUSONE = (float*) mem->malloc(sizeof(float));
Expand All @@ -136,10 +136,11 @@ Device_CPU::Device_CPU(int my_id, const DeviceMempoolSizes & mbs, bool shared) :
edevice = new Eigen::DefaultDevice;

// this is the big memory allocation.
pools[0] = new AlignedMemoryPool("CPU forward memory", (mbs.used[0] << 20), &cpu_mem);
pools[1] = new AlignedMemoryPool("CPU backward memory", (mbs.used[1] << 20), &cpu_mem);
pools[2] = new AlignedMemoryPool("CPU parameter memory", (mbs.used[2] << 20), shmem);
pools[3] = new AlignedMemoryPool("CPU scratch memory", (mbs.used[3] << 20), &cpu_mem);
const size_t initial = 1<<24;
pools[0] = new AlignedMemoryPool("CPU forward memory", (mbs.used[0] << 20), &cpu_mem, initial, dynamic);
pools[1] = new AlignedMemoryPool("CPU backward memory", (mbs.used[1] << 20), &cpu_mem, initial, dynamic);
pools[2] = new AlignedMemoryPool("CPU parameter memory", (mbs.used[2] << 20), shmem, initial, dynamic);
pools[3] = new AlignedMemoryPool("CPU scratch memory", (mbs.used[3] << 20), &cpu_mem, initial, dynamic);
}

Device_CPU::~Device_CPU() {}
Expand Down
2 changes: 1 addition & 1 deletion dynet/devices.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class Device_GPU : public Device {
class Device_CPU : public Device {
public:
typedef Eigen::DefaultDevice EigenDevice;
explicit Device_CPU(int my_id, const DeviceMempoolSizes & mb, bool shared);
explicit Device_CPU(int my_id, const DeviceMempoolSizes & mb, bool shared, bool dynamic);
~Device_CPU();
CPUAllocator cpu_mem;
Eigen::DefaultDevice* edevice;
Expand Down
4 changes: 2 additions & 2 deletions dynet/dynet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ ComputationGraph::ComputationGraph() {
} else {
ee.reset(new SimpleExecutionEngine(*this));
}
if (n_hgs > 0) {
if (!default_device->pools[0]->is_dynamic() && n_hgs > 0) {
cerr << "Memory allocator assumes only a single ComputationGraph at a time.\n";
throw std::runtime_error("Attempted to create >1 CG");
}
Expand All @@ -132,7 +132,7 @@ ComputationGraph::ComputationGraph(bool batched) {
} else {
ee.reset(new SimpleExecutionEngine(*this));
}
if (n_hgs > 0) {
if (!default_device->pools[0]->is_dynamic() && n_hgs > 0) {
cerr << "Memory allocator assumes only a single ComputationGraph at a time.\n";
throw std::runtime_error("Attempted to create >1 CG");
}
Expand Down
48 changes: 41 additions & 7 deletions dynet/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,26 @@ vector<const Tensor*> ExecutionEngine::forward(
return ret;
}

// Sets up the pools used for node values (pool_fxs) and gradients
// (pool_dEdfs). When pool 0 of the default device reports is_dynamic(), the
// engine creates its own CPUAllocator and two dynamic pools; otherwise it
// borrows the default device's FXS and DEDFS pools.
SimpleExecutionEngine::SimpleExecutionEngine(const ComputationGraph& cg)
    : ExecutionEngine(cg), num_nodes_evaluated(0) {
  const bool wants_private_pools = default_device->pools[0]->is_dynamic();
  if (!wants_private_pools) {
    // Shared case: point at the device-owned pools.
    pool_fxs = default_device->pools[static_cast<int>(DeviceMempool::FXS)];
    pool_dEdfs = default_device->pools[static_cast<int>(DeviceMempool::DEDFS)];
    return;
  }

  // Private case: same value is used for initial capacity and expanding unit.
  const int pool_bytes = 1 << 24;
  mem = new CPUAllocator();
  pool_fxs = new AlignedMemoryPool("CPU forward memory", pool_bytes, mem, pool_bytes, true);
  pool_dEdfs = new AlignedMemoryPool("CPU backward memory", pool_bytes, mem, pool_bytes, true);
}

// Releases the pools and allocator this engine created for itself, if any.
//
// Ownership is decided from `mem` rather than by asking the default device
// again: `mem` starts out null and is only set by the constructor when it
// creates private pools. Re-querying the global default device here would
// give the wrong answer (leak, or deletion of device-owned pools) if that
// device had been changed or torn down since construction.
SimpleExecutionEngine::~SimpleExecutionEngine() {
  if (mem == nullptr) {
    return;  // pools are borrowed from the device; nothing to release
  }
  // Pools first: they were constructed with `mem` as their allocator.
  delete pool_fxs;
  delete pool_dEdfs;
  delete mem;
}

void SimpleExecutionEngine::invalidate() {
num_nodes_evaluated = 0;
backward_computed = 0;
Expand Down Expand Up @@ -96,6 +116,15 @@ const Tensor& SimpleExecutionEngine::incremental_forward(VariableIndex i) {
string current_node_name; // Optionally used for debugging (reused).
vector<const Tensor*> xs(16); // Container for arguments to nodes (reused).

unsigned size = 0;
void* begin;
for (unsigned j = num_nodes_evaluated; j <= i; ++j) {
const Node* node = cg.nodes[j];
auto rounded_n = pool_fxs->round_up_align(node->dim.size() * sizeof(float));
size += rounded_n;
}
begin = pool_fxs->allocate(size);

for (; num_nodes_evaluated <= i; ++num_nodes_evaluated) {
const Node* node = cg.nodes[num_nodes_evaluated];
if (profiling_flag) {
Expand Down Expand Up @@ -144,7 +173,6 @@ const Tensor& SimpleExecutionEngine::incremental_forward(VariableIndex i) {
DYNET_RUNTIME_ERR("Ran out of auxiliary memory when executing node "
<< num_nodes_evaluated);
}
node->aux_mem = aux_mem;

// Compute f(xs) and store to node_fx.
node->forward(xs, node_fx);
Expand Down Expand Up @@ -173,12 +201,21 @@ void SimpleExecutionEngine::backward(VariableIndex from_where, bool full) {

const unsigned num_nodes = from_where + 1;
ndEdfs.resize(num_nodes);
const vector<Device*> &devices = device_manager->get_devices();
for(Device* device : devices)
device->pools[(int)DeviceMempool::DEDFS]->free();
pool_dEdfs->free();

// This loop allocates memory on the appropriate devices for the nodes whose
// derivatives will be computed.
// This assumes all of these use the same device!
unsigned size = 0;
void* begin;
for (unsigned i = 0; i < num_nodes; ++i) {
const Node* node = cg.nodes[i];
auto rounded_n = pool_dEdfs->round_up_align(node->dim.size() * sizeof(float));
size += rounded_n;
}
begin = pool_dEdfs->allocate(size);
pool_dEdfs->zero_allocated_memory();

for (unsigned i = 0; i < num_nodes; ++i) {
const auto dim = nfxs[i].d;
auto& node_dEdfx = ndEdfs[i];
Expand All @@ -203,9 +240,6 @@ void SimpleExecutionEngine::backward(VariableIndex from_where, bool full) {
}
}
}
// Zero all derivative memory (which is contiguous on each device)
for (Device* device : devices)
device->pools[(int)DeviceMempool::DEDFS]->zero_allocated_memory();

// initialize dE/dE = 1
ndEdfs.back().v = cg.nodes.back()->device->kSCALAR_ONE;
Expand Down
12 changes: 10 additions & 2 deletions dynet/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#define DYNET_EXEC_H

#include "dynet/dynet.h"
#include "dynet/aligned-mem-pool.h"
#include <memory>

namespace dynet {

Expand All @@ -24,6 +26,10 @@ class ExecutionEngine {
virtual const Tensor& get_gradient(VariableIndex i) = 0;
virtual void backward(bool full = false) = 0;
virtual void backward(VariableIndex i, bool full = false) = 0;
AlignedMemoryPool* pool_fxs;
AlignedMemoryPool* pool_dEdfs;
MemAllocator* mem = nullptr;

protected:
explicit ExecutionEngine(const ComputationGraph& cg);
DeviceManager* const device_manager;
Expand All @@ -33,8 +39,10 @@ class ExecutionEngine {

class SimpleExecutionEngine : public ExecutionEngine {
public:
explicit SimpleExecutionEngine(const ComputationGraph& cg) :
ExecutionEngine(cg), num_nodes_evaluated(0) {}
explicit SimpleExecutionEngine(const ComputationGraph& cg);

~SimpleExecutionEngine();

void invalidate() override;
void invalidate(unsigned i) override;
const Tensor& forward() override;
Expand Down
4 changes: 4 additions & 0 deletions dynet/expr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ std::string Expression::get_device_name() const {
return pg->nodes[i]->device->name;
}

// Reports whether pool 0 of the default device is a dynamic pool. The answer
// comes from the global default device, not from this expression's own node.
const bool Expression::is_dynamic() const {
  auto* first_pool = default_device->pools[0];
  return first_pool->is_dynamic();
}

Expression input(ComputationGraph& g, real s, Device *device) { return Expression(&g, g.add_input(s, device)); }
Expression input(ComputationGraph& g, const real *ps, Device *device) { return Expression(&g, g.add_input(ps, device)); }
Expression input(ComputationGraph& g, const Dim& d, const vector<float>& data, Device *device) { return Expression(&g, g.add_input(d, data, device)); }
Expand Down
Loading