mirror of
https://github.com/boostorg/graph_parallel.git
synced 2026-02-09 23:22:11 +00:00
Merged r67041,67084,67110,67705-67706,67722,67724,67764 (bug fixes) from trunk, plus allowed Boost.Filesystem v2 to be used in boost/distributed/adjlist/serialization.hpp
[SVN r67967]
This commit is contained in:
@@ -9,6 +9,7 @@
|
||||
// Authors: Douglas Gregor
|
||||
// Andrew Lumsdaine
|
||||
// Matthias Troyer
|
||||
#include <boost/assert.hpp>
|
||||
#include <boost/graph/use_mpi.hpp>
|
||||
#include <boost/graph/distributed/mpi_process_group.hpp>
|
||||
#include <boost/mpi/environment.hpp>
|
||||
@@ -172,7 +173,7 @@ void handle_sync (mpi_process_group const& self, int source, int tag, int val,
|
||||
std::size_t stage = static_cast<std::size_t>(
|
||||
++self.impl_->synchronizing_stage[source]);
|
||||
|
||||
assert(source != process_id(self));
|
||||
BOOST_ASSERT(source != process_id(self));
|
||||
|
||||
#ifdef DEBUG
|
||||
std::ostringstream out;
|
||||
@@ -183,7 +184,7 @@ void handle_sync (mpi_process_group const& self, int source, int tag, int val,
|
||||
|
||||
// record how many still have messages to be sent
|
||||
if (self.impl_->synchronizing_unfinished.size()<=stage) {
|
||||
assert(self.impl_->synchronizing_unfinished.size() == stage);
|
||||
BOOST_ASSERT(self.impl_->synchronizing_unfinished.size() == stage);
|
||||
self.impl_->synchronizing_unfinished.push_back(val >= 0 ? 1 : 0);
|
||||
}
|
||||
else
|
||||
@@ -191,7 +192,7 @@ void handle_sync (mpi_process_group const& self, int source, int tag, int val,
|
||||
|
||||
// record how many are in that stage
|
||||
if (self.impl_->processors_synchronizing_stage.size()<=stage) {
|
||||
assert(self.impl_->processors_synchronizing_stage.size() == stage);
|
||||
BOOST_ASSERT(self.impl_->processors_synchronizing_stage.size() == stage);
|
||||
self.impl_->processors_synchronizing_stage.push_back(1);
|
||||
}
|
||||
else
|
||||
@@ -346,7 +347,7 @@ replace_on_synchronize_handler(const on_synchronize_event_type& handler)
|
||||
|
||||
int mpi_process_group::allocate_block(bool out_of_band_receive)
|
||||
{
|
||||
assert(!block_num);
|
||||
BOOST_ASSERT(!block_num);
|
||||
block_iterator i = impl_->blocks.begin();
|
||||
while (i != impl_->blocks.end() && *i) ++i;
|
||||
|
||||
@@ -372,7 +373,7 @@ bool mpi_process_group::maybe_emit_receive(int process, int encoded_tag) const
|
||||
{
|
||||
std::pair<int, int> decoded = decode_tag(encoded_tag);
|
||||
|
||||
assert (decoded.first < static_cast<int>(impl_->blocks.size()));
|
||||
BOOST_ASSERT (decoded.first < static_cast<int>(impl_->blocks.size()));
|
||||
|
||||
block_type* block = impl_->blocks[decoded.first];
|
||||
if (!block) {
|
||||
@@ -384,7 +385,7 @@ bool mpi_process_group::maybe_emit_receive(int process, int encoded_tag) const
|
||||
if (impl_->blocks[i])
|
||||
std::cerr << i << ' ';
|
||||
std::cerr << std::endl;
|
||||
assert(block);
|
||||
BOOST_ASSERT(block);
|
||||
}
|
||||
|
||||
if (decoded.second < static_cast<int>(block->triggers.size())
|
||||
@@ -414,7 +415,7 @@ bool mpi_process_group::emit_receive(int process, int encoded_tag) const
|
||||
|
||||
// Find the block that will receive this message
|
||||
block_type* block = impl_->blocks[decoded.first];
|
||||
assert(block);
|
||||
BOOST_ASSERT(block);
|
||||
if (decoded.second < static_cast<int>(block->triggers.size())
|
||||
&& block->triggers[decoded.second])
|
||||
// We have a trigger for this message; use it
|
||||
@@ -526,7 +527,7 @@ mpi_process_group::send_batch(process_id_type dest,
|
||||
// we increment the number of batches sent
|
||||
++impl_->number_sent_batches[dest];
|
||||
// and send the batch
|
||||
assert(outgoing.headers.size() <= impl_->batch_header_number);
|
||||
BOOST_ASSERT(outgoing.headers.size() <= impl_->batch_header_number);
|
||||
if (id != dest) {
|
||||
#ifdef NO_ISEND_BATCHES
|
||||
impl::batch_request req;
|
||||
@@ -559,7 +560,7 @@ mpi_process_group::send_batch(process_id_type dest,
|
||||
MPI_Isend(const_cast<void*>(oa.address()), oa.size(),
|
||||
MPI_PACKED, dest, tag, impl_->comm,
|
||||
&req.request);
|
||||
assert(result == MPI_SUCCESS);
|
||||
BOOST_ASSERT(result == MPI_SUCCESS);
|
||||
impl_->max_sent = (std::max)(impl_->max_sent,impl_->sent_batches.size());
|
||||
#ifdef NO_ISEND_BATCHES
|
||||
int done=0;
|
||||
@@ -794,10 +795,10 @@ void mpi_process_group::synchronize() const
|
||||
++impl_->synchronizing_stage[id];
|
||||
if (impl_->synchronizing_stage[id] != stage)
|
||||
std::cerr << "Expected stage " << stage << ", got " << impl_->synchronizing_stage[id] << std::endl;
|
||||
assert(impl_->synchronizing_stage[id]==stage);
|
||||
BOOST_ASSERT(impl_->synchronizing_stage[id]==stage);
|
||||
// record how many still have messages to be sent
|
||||
if (static_cast<int>(impl_->synchronizing_unfinished.size())<=stage) {
|
||||
assert(static_cast<int>(impl_->synchronizing_unfinished.size()) == stage);
|
||||
BOOST_ASSERT(static_cast<int>(impl_->synchronizing_unfinished.size()) == stage);
|
||||
impl_->synchronizing_unfinished.push_back(no_new_messages ? 0 : 1);
|
||||
}
|
||||
else
|
||||
@@ -805,7 +806,7 @@ void mpi_process_group::synchronize() const
|
||||
|
||||
// record how many are in that stage
|
||||
if (static_cast<int>(impl_->processors_synchronizing_stage.size())<=stage) {
|
||||
assert(static_cast<int>(impl_->processors_synchronizing_stage.size()) == stage);
|
||||
BOOST_ASSERT(static_cast<int>(impl_->processors_synchronizing_stage.size()) == stage);
|
||||
impl_->processors_synchronizing_stage.push_back(1);
|
||||
}
|
||||
else
|
||||
@@ -841,7 +842,7 @@ void mpi_process_group::synchronize() const
|
||||
|
||||
// check that everyone is at least here
|
||||
for (int source=0; source<p ; ++source)
|
||||
assert(impl_->synchronizing_stage[source] >= stage);
|
||||
BOOST_ASSERT(impl_->synchronizing_stage[source] >= stage);
|
||||
|
||||
// receive any batches sent in the meantime
|
||||
// all have to be available already
|
||||
@@ -857,7 +858,7 @@ void mpi_process_group::synchronize() const
|
||||
|
||||
#ifndef NO_IMMEDIATE_PROCESSING
|
||||
for (int source=0; source<p ; ++source)
|
||||
assert(impl_->number_received_batches[source] >= 0);
|
||||
BOOST_ASSERT(impl_->number_received_batches[source] >= 0);
|
||||
#endif
|
||||
|
||||
impl_->synchronizing = false;
|
||||
@@ -888,7 +889,7 @@ void mpi_process_group::synchronize() const
|
||||
<< source << ", got " << impl_->synchronizing_stage[source]
|
||||
<< std::endl;
|
||||
}
|
||||
assert(impl_->synchronizing_stage[source]==stage);
|
||||
BOOST_ASSERT(impl_->synchronizing_stage[source]==stage);
|
||||
}
|
||||
#endif
|
||||
std::fill(impl_->synchronizing_stage.begin(),
|
||||
@@ -900,10 +901,10 @@ void mpi_process_group::synchronize() const
|
||||
impl_->synchronizing_unfinished.clear();
|
||||
|
||||
for (process_id_type dest = 0; dest < p; ++dest)
|
||||
assert (impl_->outgoing[dest].headers.empty());
|
||||
BOOST_ASSERT (impl_->outgoing[dest].headers.empty());
|
||||
#ifndef NO_IMMEDIATE_PROCESSING
|
||||
for (int source=0; source<p ; ++source)
|
||||
assert (impl_->number_received_batches[source] == 0);
|
||||
BOOST_ASSERT (impl_->number_received_batches[source] == 0);
|
||||
#endif
|
||||
|
||||
impl_->free_sent_batches();
|
||||
@@ -952,7 +953,7 @@ void mpi_process_group::poll_requests(int block) const
|
||||
std::pair<int, int> decoded = decode_tag(statuses[i].MPI_TAG);
|
||||
block_type* block = impl_->blocks[decoded.first];
|
||||
|
||||
assert (decoded.second < static_cast<int>(block->triggers.size()) && block->triggers[decoded.second]);
|
||||
BOOST_ASSERT (decoded.second < static_cast<int>(block->triggers.size()) && block->triggers[decoded.second]);
|
||||
// We have a trigger for this message; use it
|
||||
trigger_receive_context old_context = impl_->trigger_context;
|
||||
impl_->trigger_context = trc_irecv_out_of_band;
|
||||
@@ -1033,7 +1034,7 @@ void mpi_process_group::impl::free_sent_batches()
|
||||
int result;
|
||||
while(it != sent_batches.end()) {
|
||||
result = MPI_Test(&it->request,&flag,MPI_STATUS_IGNORE);
|
||||
assert(result == MPI_SUCCESS);
|
||||
BOOST_ASSERT(result == MPI_SUCCESS);
|
||||
iterator next=it;
|
||||
++next;
|
||||
if (flag)
|
||||
@@ -1044,7 +1045,7 @@ void mpi_process_group::impl::free_sent_batches()
|
||||
for (std::size_t i=0; i< batch_pool.size();++i) {
|
||||
if(batch_pool[i].request != MPI_REQUEST_NULL) {
|
||||
result = MPI_Test(&batch_pool[i].request,&flag,MPI_STATUS_IGNORE);
|
||||
assert(result == MPI_SUCCESS);
|
||||
BOOST_ASSERT(result == MPI_SUCCESS);
|
||||
if (flag) {
|
||||
free_batches.push(i);
|
||||
batch_pool[i].request = MPI_REQUEST_NULL;
|
||||
@@ -1060,7 +1061,7 @@ mpi_process_group::install_trigger(int tag, int block,
|
||||
shared_ptr<trigger_base> const& launcher)
|
||||
{
|
||||
block_type* my_block = impl_->blocks[block];
|
||||
assert(my_block);
|
||||
BOOST_ASSERT(my_block);
|
||||
|
||||
// Make sure we have enough space in the structure for this trigger.
|
||||
if (launcher->tag() >= static_cast<int>(my_block->triggers.size()))
|
||||
@@ -1073,7 +1074,7 @@ mpi_process_group::install_trigger(int tag, int block,
|
||||
<< " already has a trigger for tag " << launcher->tag()
|
||||
<< std::endl;
|
||||
}
|
||||
assert(!my_block->triggers[launcher->tag()]);
|
||||
BOOST_ASSERT(!my_block->triggers[launcher->tag()]);
|
||||
|
||||
// Attach a new trigger launcher
|
||||
my_block->triggers[launcher->tag()] = launcher;
|
||||
@@ -1091,8 +1092,8 @@ void mpi_process_group::set_message_buffer_size(std::size_t s)
|
||||
void* ptr;
|
||||
if (!message_buffer.empty()) {
|
||||
MPI_Buffer_detach(&ptr,&sz);
|
||||
assert(ptr == &message_buffer.front());
|
||||
assert(static_cast<std::size_t>(sz) == message_buffer.size());
|
||||
BOOST_ASSERT(ptr == &message_buffer.front());
|
||||
BOOST_ASSERT(static_cast<std::size_t>(sz) == message_buffer.size());
|
||||
}
|
||||
else if (old_buffer != 0)
|
||||
MPI_Buffer_detach(&old_buffer,&old_buffer_size);
|
||||
|
||||
Reference in New Issue
Block a user