Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 47 additions & 46 deletions src/realm/barrier_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ namespace Realm {
size_t num_participants, ReductionOpID redop_id,
const void *initial_value, size_t initial_value_size)
{
assert(0);
abort();
// TODO(apryakhin@): Implement me
return Barrier();
}

Barrier Barrier::set_arrival_pattern(const Barrier::ParticipantInfo *expected_arrivals,
size_t num_participants)
{
assert(0);
abort();
// TODO(apryakhin@): Implement me
BarrierImpl *impl = get_barrier_impl(*this);
return impl->current_barrier();
Expand All @@ -88,7 +88,7 @@ namespace Realm {
ID nextid(id);
EventImpl::gen_t gen = ID(id).barrier_generation() + 1;
#ifdef DEBUG_REALM
assert(MAX_PHASES <= nextid.barrier_generation().MAXVAL);
REALM_ASSERT(MAX_PHASES <= nextid.barrier_generation().MAXVAL);
#endif
// return NO_BARRIER if the count overflows
if(gen > MAX_PHASES) {
Expand Down Expand Up @@ -155,14 +155,14 @@ namespace Realm {
size_t initial_value_size /*= 0*/)
{
BarrierImpl *impl = get_runtime()->local_barrier_free_list->alloc_entry();
assert(impl);
assert(ID(impl->me).is_barrier());
REALM_ASSERT(impl);
REALM_ASSERT(ID(impl->me).is_barrier());

// set the arrival count
impl->base_arrival_count = expected_arrivals;

if(redopid == 0) {
assert(initial_value_size == 0);
REALM_ASSERT(initial_value_size == 0);
impl->redop_id = 0;
impl->redop = 0;
impl->initial_value = 0;
Expand All @@ -176,8 +176,8 @@ namespace Realm {
abort();
}

assert(initial_value != 0);
assert(initial_value_size == impl->redop->sizeof_lhs);
REALM_ASSERT(initial_value != 0);
REALM_ASSERT(initial_value_size == impl->redop->sizeof_lhs);

impl->initial_value = std::make_unique<char[]>(initial_value_size);
memcpy(impl->initial_value.get(), initial_value, initial_value_size);
Expand Down Expand Up @@ -240,7 +240,7 @@ namespace Realm {
Serialization::FixedBufferDeserializer fbd(data, datalen);
BarrierTriggerMessageArgs trigger_args;
bool ok = fbd & trigger_args;
assert(ok);
REALM_ASSERT(ok);

data = (char *)data + (datalen - fbd.bytes_left());
datalen = fbd.bytes_left();
Expand Down Expand Up @@ -433,7 +433,7 @@ namespace Realm {

// is there any data we need to store?
if(datalen) {
assert(args.redop_id != 0);
REALM_ASSERT(args.redop_id != 0);

// TODO: deal with invalidation of previous instance of a barrier
impl->redop_id = args.redop_id;
Expand All @@ -445,19 +445,19 @@ namespace Realm {
impl->first_generation = args.first_generation;

int rel_gen = trigger_gen - impl->first_generation;
assert(rel_gen > 0);
REALM_ASSERT(rel_gen > 0);
if(impl->value_capacity < (size_t)rel_gen) {
size_t new_capacity = rel_gen;
impl->final_values.resize(new_capacity * impl->redop->sizeof_lhs);
// no need to initialize new entries - we'll overwrite them now or when data
// does show up
impl->value_capacity = new_capacity;
}
assert(args.trigger_gen <= trigger_gen);
REALM_ASSERT(args.trigger_gen <= trigger_gen);
// trigger_gen might have changed so make sure you use args.trigger_gen here
assert(datalen ==
REALM_ASSERT(datalen ==
(impl->redop->sizeof_lhs * (args.trigger_gen - args.previous_gen)));
assert(args.previous_gen >= impl->first_generation);
REALM_ASSERT(args.previous_gen >= impl->first_generation);
memcpy(impl->final_values.data() + ((args.previous_gen - impl->first_generation) *
impl->redop->sizeof_lhs),
data, datalen);
Expand Down Expand Up @@ -527,8 +527,9 @@ namespace Realm {
{
remote_subscribe_gens.clear();
remote_trigger_gens.clear();
assert(get_runtime()->get_module_config("core")->get_property(
"barrier_broadcast_radix", broadcast_radix) == REALM_SUCCESS);
RealmStatus status = get_runtime()->get_module_config("core")->get_property(
"barrier_broadcast_radix", broadcast_radix);
REALM_ASSERT(status == REALM_SUCCESS);
}

BarrierImpl::BarrierImpl(BarrierCommunicator *_barrier_comm, int _broadcast_radix)
Expand Down Expand Up @@ -567,7 +568,7 @@ namespace Realm {
return 0;
}
void *dst = malloc(datalen);
assert(dst != 0);
REALM_ASSERT(dst != 0);
memcpy(dst, data, datalen);
return dst;
}
Expand All @@ -594,7 +595,7 @@ namespace Realm {
virtual void event_triggered(bool poisoned, TimeLimit work_until)
{
// TODO: handle poison
assert(poisoned == POISON_FIXME);
REALM_ASSERT(poisoned == POISON_FIXME);
log_barrier.info() << "deferred barrier arrival: " << barrier << " ("
<< barrier.timestamp << "), delta=" << delta;
BarrierImpl *impl = get_runtime()->get_barrier_impl(barrier);
Expand Down Expand Up @@ -677,7 +678,7 @@ namespace Realm {
static inline void get_broadcast_targets(NodeID local, int num_peers, int radix,
std::vector<NodeID> &broadcast_targets)
{
assert(broadcast_targets.empty());
REALM_ASSERT(broadcast_targets.empty());
for(int i = 1; i <= radix; i++) {
NodeID target = local * radix + i;
if(num_peers <= (target - 1))
Expand Down Expand Up @@ -729,9 +730,9 @@ namespace Realm {

Serialization::DynamicBufferSerializer dbs_first(remaining_data_size + header_size);
bool ok = dbs_first & trigger_args;
assert(ok);
REALM_ASSERT(ok);
ok = dbs_first & payload;
assert(ok);
REALM_ASSERT(ok);

size_t max_payload_size = barrier_comm->recommend_max_payload(
ordered_notifications[target].node, sizeof(BarrierTriggerMessage));
Expand Down Expand Up @@ -838,11 +839,11 @@ namespace Realm {

// sanity checks - is this a valid barrier?
// assert(generation < free_generation);
assert(base_arrival_count > 0);
REALM_ASSERT(base_arrival_count > 0);

// update whatever generation we're told to
{
assert(barrier_gen > generation.load());
REALM_ASSERT(barrier_gen > generation.load());
Generation *g;
std::map<gen_t, Generation *>::iterator it = generations.find(barrier_gen);
if(it != generations.end()) {
Expand Down Expand Up @@ -946,12 +947,12 @@ namespace Realm {
// adjustment is
// being held - no need to have lots of reduce values lying around
if(reduce_value_size > 0) {
assert(redop != 0);
assert(redop->sizeof_rhs == reduce_value_size);
REALM_ASSERT(redop != 0);
REALM_ASSERT(redop->sizeof_rhs == reduce_value_size);

// do we have space for this reduction result yet?
int rel_gen = barrier_gen - first_generation;
assert(rel_gen > 0);
REALM_ASSERT(rel_gen > 0);

if(value_capacity < static_cast<size_t>(rel_gen)) {
size_t new_capacity = rel_gen;
Expand All @@ -976,7 +977,7 @@ namespace Realm {
// we have something stable after we let go of the lock
if(trigger_gen && redop) {
int rel_gen = oldest_previous + 1 - first_generation;
assert(rel_gen > 0);
REALM_ASSERT(rel_gen > 0);
int count = trigger_gen - oldest_previous;
final_values_copy =
bytedup(final_values.data() + ((rel_gen - 1) * redop->sizeof_lhs),
Expand Down Expand Up @@ -1271,9 +1272,9 @@ namespace Realm {

// find the right generation - this should not fail
std::map<gen_t, Generation *>::iterator it = generations.find(needed_gen);
assert(it != generations.end());
REALM_ASSERT(it != generations.end());
bool ok = it->second->local_waiters.erase(waiter);
assert(ok);
REALM_ASSERT(ok);
return true;
}

Expand Down Expand Up @@ -1313,7 +1314,7 @@ namespace Realm {
}

// make sure the subscription is for this "lifetime" of the barrier
assert(subscribe_gen > first_generation);
REALM_ASSERT(subscribe_gen > first_generation);

bool already_subscribed = false;
{
Expand All @@ -1322,7 +1323,7 @@ namespace Realm {
if(it != remote_subscribe_gens.end()) {
// a valid subscription should always be for a generation that hasn't
// triggered yet
assert(it->second > active_generation);
REALM_ASSERT(it->second > active_generation);
if(it->second >= subscribe_gen) {
already_subscribed = true;
} else {
Expand Down Expand Up @@ -1354,7 +1355,7 @@ namespace Realm {

if(redop) {
int rel_gen = previous_gen + 1 - first_generation;
assert(rel_gen > 0);
REALM_ASSERT(rel_gen > 0);
final_values_size = (trigger_gen - previous_gen) * redop->sizeof_lhs;
final_values_copy =
bytedup(final_values.data() + ((rel_gen - 1) * redop->sizeof_lhs),
Expand Down Expand Up @@ -1397,14 +1398,14 @@ namespace Realm {
sizeof(BarrierTriggerMessageArgsInternal) + sizeof(size_t);
Serialization::DynamicBufferSerializer dbs(header_size);
bool ok = dbs & trigger_args;
assert(ok);
REALM_ASSERT(ok);
if(final_values_size > 0) {
BarrierTriggerPayload payload;
payload.reduction.insert(
payload.reduction.end(), static_cast<const char *>(final_values_copy),
static_cast<const char *>(final_values_copy) + final_values_size);
ok = dbs & payload;
assert(ok);
REALM_ASSERT(ok);
}

size_t max_payload_size = barrier_comm->recommend_max_payload(
Expand Down Expand Up @@ -1432,22 +1433,22 @@ namespace Realm {
BarrierTriggerPayload payload;

if(datalen > 0) {
assert(data);
REALM_ASSERT(data);
Serialization::FixedBufferDeserializer payload_fbd(data, datalen);
bool ok = payload_fbd & payload;
assert(ok);
REALM_ASSERT(ok);
}

if(!payload.reduction.empty()) {
assert(redop_id != 0);
REALM_ASSERT(redop_id != 0);
if(redop == 0) {
log_barrier.fatal() << "no reduction op registered for ID " << redop_id;
abort();
}
first_generation = first_gen;

int rel_gen = trigger_gen - first_generation;
assert(rel_gen > 0);
REALM_ASSERT(rel_gen > 0);
if(value_capacity < static_cast<size_t>(rel_gen)) {
size_t new_capacity = rel_gen;
final_values.resize(new_capacity * redop->sizeof_lhs);
Expand All @@ -1456,11 +1457,11 @@ namespace Realm {
value_capacity = new_capacity;
}

assert(original_gen <= trigger_gen);
REALM_ASSERT(original_gen <= trigger_gen);
// trigger_gen might have changed so make sure you use original_gen here
assert(payload.reduction.size() ==
REALM_ASSERT(payload.reduction.size() ==
(redop->sizeof_lhs * (original_gen - previous_gen)));
assert(previous_gen >= first_gen);
REALM_ASSERT(previous_gen >= first_gen);
memcpy(final_values.data() +
((previous_gen - first_generation) * redop->sizeof_lhs),
payload.reduction.data(), payload.reduction.size());
Expand Down Expand Up @@ -1570,12 +1571,12 @@ namespace Realm {

// if it has triggered, we should have the data
int rel_gen = result_gen - first_generation;
assert(rel_gen > 0);
assert((size_t)rel_gen <= value_capacity);
REALM_ASSERT(rel_gen > 0);
REALM_ASSERT((size_t)rel_gen <= value_capacity);

assert(redop != 0);
assert(value_size == redop->sizeof_lhs);
assert(value != 0);
REALM_ASSERT(redop != 0);
REALM_ASSERT(value_size == redop->sizeof_lhs);
REALM_ASSERT(value != 0);

std::memcpy(value, &final_values[(rel_gen - 1) * redop->sizeof_lhs],
redop->sizeof_lhs);
Expand Down
Loading
Loading