2
0
mirror of https://github.com/boostorg/mpi.git synced 2026-01-19 04:22:10 +00:00

checkpointing personnal stuf

This commit is contained in:
Alain Miniussi
2018-08-24 16:29:41 +02:00
parent e3851261a2
commit e62545bb2f
6 changed files with 121 additions and 419 deletions

View File

@@ -291,9 +291,6 @@ class BOOST_MPI_DECL communicator
template<typename T>
void send(int dest, int tag, const T& value) const;
template<typename T, typename A>
void send(int dest, int tag, const std::vector<T,A>& value) const;
/**
* @brief Send the skeleton of an object.
*
@@ -406,9 +403,6 @@ class BOOST_MPI_DECL communicator
template<typename T>
status recv(int source, int tag, T& value) const;
template<typename T, typename A>
status recv(int source, int tag, std::vector<T,A>& value) const;
/**
* @brief Receive a skeleton from a remote process.
*
@@ -603,9 +597,6 @@ class BOOST_MPI_DECL communicator
template<typename T>
request isend(int dest, int tag, const T* values, int n) const;
template<typename T, class A>
request isend(int dest, int tag, const std::vector<T,A>& values) const;
/**
* @brief Send a message to another process without any data
* without blocking.
@@ -690,9 +681,6 @@ class BOOST_MPI_DECL communicator
template<typename T>
request irecv(int source, int tag, T* values, int n) const;
template<typename T, typename A>
request irecv(int source, int tag, std::vector<T,A>& values) const;
/**
* @brief Initiate receipt of a message from a remote process that
* carries no data.
@@ -1100,38 +1088,6 @@ class BOOST_MPI_DECL communicator
request
array_irecv_impl(int source, int tag, T* values, int n, mpl::false_) const;
// We're sending/receivig a vector with associated MPI datatype.
// We need to send/recv the size and then the data and make sure
// blocking and non blocking method agrees on the format.
template<typename T, typename A>
request irecv_vector(int source, int tag, std::vector<T,A>& values,
mpl::true_) const;
template<typename T, class A>
request isend_vector(int dest, int tag, const std::vector<T,A>& values,
mpl::true_) const;
template<typename T, typename A>
void send_vector(int dest, int tag, const std::vector<T,A>& value,
mpl::true_) const;
template<typename T, typename A>
status recv_vector(int source, int tag, std::vector<T,A>& value,
mpl::true_) const;
// We're sending/receivig a vector with no associated MPI datatype.
// We need to send/recv it as an archive and make sure
// blocking and non blocking method agrees on the format.
template<typename T, typename A>
request irecv_vector(int source, int tag, std::vector<T,A>& values,
mpl::false_) const;
template<typename T, class A>
request isend_vector(int dest, int tag, const std::vector<T,A>& values,
mpl::false_) const;
template<typename T, typename A>
void send_vector(int dest, int tag, const std::vector<T,A>& value,
mpl::false_) const;
template<typename T, typename A>
status recv_vector(int source, int tag, std::vector<T,A>& value,
mpl::false_) const;
protected:
shared_ptr<MPI_Comm> comm_ptr;
};
@@ -1355,34 +1311,12 @@ communicator::array_send_impl(int dest, int tag, const T* values, int n,
mpl::false_) const
{
packed_oarchive oa(*this);
oa << n << boost::serialization::make_array(values, n);
for(int i = 0; i < n; ++i) {
oa << values[i];
}
send(dest, tag, oa);
}
template<typename T, typename A>
void communicator::send_vector(int dest, int tag,
const std::vector<T,A>& value, mpl::true_ true_type) const
{
// send the vector size
typename std::vector<T,A>::size_type size = value.size();
send(dest, tag, size);
// send the data
this->array_send_impl(dest, tag, value.data(), size, true_type);
}
template<typename T, typename A>
void communicator::send_vector(int dest, int tag,
const std::vector<T,A>& value, mpl::false_ false_type) const
{
this->send_impl(dest, tag, value, false_type);
}
template<typename T, typename A>
void communicator::send(int dest, int tag, const std::vector<T,A>& value) const
{
send_vector(dest, tag, value, is_mpi_datatype<T>());
}
// Array send must send the elements directly
template<typename T>
void communicator::send(int dest, int tag, const T* values, int n) const
@@ -1447,50 +1381,14 @@ communicator::array_recv_impl(int source, int tag, T* values, int n,
// Receive the message
packed_iarchive ia(*this);
status stat = recv(source, tag, ia);
// Determine how much data we are going to receive
int count;
ia >> count;
// Deserialize the data in the message
boost::serialization::array_wrapper<T> arr(values, count > n? n : count);
ia >> arr;
if (count > n) {
boost::throw_exception(
std::range_error("communicator::recv: message receive overflow"));
for(int i = 0; i < n; ++i) {
ia >> values[i];
}
stat.m_count = count;
stat.m_count = n;
return stat;
}
template<typename T, typename A>
status communicator::recv_vector(int source, int tag,
std::vector<T,A>& value, mpl::true_ true_type) const
{
// receive the vector size
typename std::vector<T,A>::size_type size = 0;
recv(source, tag, size);
// size the vector
value.resize(size);
// receive the data
return this->array_recv_impl(source, tag, value.data(), size, true_type);
}
template<typename T, typename A>
status communicator::recv_vector(int source, int tag,
std::vector<T,A>& value, mpl::false_ false_type) const
{
return this->recv_impl(source, tag, value, false_type);
}
template<typename T, typename A>
status communicator::recv(int source, int tag, std::vector<T,A>& value) const
{
return recv_vector(source, tag, value, is_mpi_datatype<T>());
}
// Array receive must receive the elements directly into a buffer.
template<typename T>
status communicator::recv(int source, int tag, T* values, int n) const
@@ -1572,35 +1470,6 @@ request communicator::isend(int dest, int tag, const T& value) const
return this->isend_impl(dest, tag, value, is_mpi_datatype<T>());
}
template<typename T, class A>
request communicator::isend(int dest, int tag, const std::vector<T,A>& values) const
{
return this->isend_vector(dest, tag, values, is_mpi_datatype<T>());
}
template<typename T, class A>
request
communicator::isend_vector(int dest, int tag, const std::vector<T,A>& values,
mpl::true_) const
{
std::size_t size = values.size();
request req = this->isend_impl(dest, tag, size, mpl::true_());
BOOST_MPI_CHECK_RESULT(MPI_Isend,
(const_cast<T*>(values.data()), size,
get_mpi_datatype<T>(),
dest, tag, MPI_Comm(*this), req.m_request.get()));
return req;
}
template<typename T, class A>
request
communicator::isend_vector(int dest, int tag, const std::vector<T,A>& values,
mpl::false_ no) const
{
return this->isend_impl(dest, tag, values, no);
}
template<typename T>
request
communicator::array_isend_impl(int dest, int tag, const T* values, int n,
@@ -1620,7 +1489,9 @@ communicator::array_isend_impl(int dest, int tag, const T* values, int n,
mpl::false_) const
{
shared_ptr<packed_oarchive> archive(new packed_oarchive(*this));
*archive << n << boost::serialization::make_array(values, n);
for(int i = 0; i < n; ++i) {
*archive << values[i];
}
request result = isend(dest, tag, *archive);
result.m_data = archive;
return result;
@@ -1796,29 +1667,6 @@ communicator::array_irecv_impl(int source, int tag, T* values, int n,
return request(source, tag, *this, values, n);
}
template<typename T, class A>
request
communicator::irecv_vector(int source, int tag, std::vector<T,A>& values,
mpl::true_) const
{
return request(source, tag, *this, values);
}
template<typename T, class A>
request
communicator::irecv_vector(int source, int tag, std::vector<T,A>& values,
mpl::false_ no) const
{
return irecv_impl(source, tag, values, no);
}
template<typename T, typename A>
request
communicator::irecv(int source, int tag, std::vector<T,A>& values) const
{
return irecv_vector(source, tag, values, is_mpi_datatype<T>());
}
// Array receive must receive the elements directly into a buffer.
template<typename T>
request communicator::irecv(int source, int tag, T* values, int n) const

View File

@@ -14,6 +14,8 @@
#include <boost/mpi/config.hpp>
#include <vector>
#include <memory>
#include <cstdio>
#include <iterator> // for std::iterator_traits
#include <boost/optional.hpp>
#include <utility> // for std::pair
@@ -88,7 +90,8 @@ optional<std::pair<status, ForwardIterator> >
test_any(ForwardIterator first, ForwardIterator last)
{
while (first != last) {
if (optional<status> result = first->test()) {
optional<status> result = first->test();
if (result) {
return std::make_pair(*result, first);
}
++first;
@@ -124,9 +127,34 @@ template<typename ForwardIterator, typename OutputIterator>
OutputIterator
wait_all(ForwardIterator first, ForwardIterator last, OutputIterator out)
{
for(ForwardIterator it = first; it != last; ++it) {
*out++ = it->wait();
std::vector<request*> requests(std::distance(first, last));
for(int i = 0; i < requests.size(); ++i) {
requests[i] = &(*first++);
}
std::vector<status> statuses(requests.size());
typedef std::vector<request*>::iterator vriter;
int pending;
do {
pending = 0;
for(int i = 0; i < requests.size(); ++i) {
if (requests[i]) {
request& req = *requests[i];
if (!req.active()) {
statuses[i] = status::empty_status();
} else {
optional<status> stat = req.test();
if (stat) {
printf("Proc %i got msg %d\n", communicator().rank(), stat->tag());
statuses[i] = *stat;
requests[i] = 0;
} else {
++pending;
}
}
}
}
} while(pending>0);
std::copy(statuses.begin(), statuses.end(), out);
return out;
}
@@ -137,9 +165,8 @@ template<typename ForwardIterator>
void
wait_all(ForwardIterator first, ForwardIterator last)
{
for(ForwardIterator it = first; it != last; ++it) {
it->wait();
}
std::vector<status> statuses(std::distance(first, last));
wait_all(first, last, statuses.begin());
}
/**

View File

@@ -139,7 +139,7 @@ class BOOST_MPI_DECL request
template<class T> struct probe_info_const_skeleton_proxy;
template<class T> struct probe_info_skeleton_proxy;
public: // while debuging
shared_ptr<MPI_Request> m_request;
shared_ptr<probe_info_base> m_probe_info;
shared_ptr<void> m_data;

View File

@@ -21,7 +21,9 @@
#include <boost/mpi/datatype.hpp>
#include <boost/mpi/exception.hpp>
#include <boost/mpi/detail/antiques.hpp>
#include <boost/mpi/communicator.hpp>
#include <cassert>
#include <cstdio>
namespace boost { namespace mpi { namespace detail {
@@ -38,12 +40,16 @@ packed_archive_send(MPI_Comm comm, int dest, int tag,
int
packed_archive_isend(MPI_Comm comm, int dest, int tag,
const packed_oarchive& ar,
MPI_Request& out_requests)
MPI_Request& out_request)
{
BOOST_MPI_CHECK_RESULT(MPI_Isend,
(detail::unconst(ar.address()), ar.size(),
MPI_PACKED,
dest, tag, comm, &out_requests));
dest, tag, comm, &out_request));
int flag;
MPI_Status stat;
MPI_Request_get_status(out_request, &flag, &stat);
printf("Proc %i ISending message %i to %i\n", communicator().rank(), tag, dest);
return 1;
}

View File

@@ -29,18 +29,19 @@ status request::wait()
m_request.reset();
m_data.reset();
} else if (bool(m_probe_info)) {
BOOST_MPI_CHECK_RESULT(MPI_Mprobe, (m_probe_info->m_source, m_probe_info->m_tag,
m_probe_info->m_comm,
&m_probe_info->m_message,
probe_info_base& info = *m_probe_info;
BOOST_MPI_CHECK_RESULT(MPI_Mprobe, (info.m_source, info.m_tag,
info.m_comm,
&info.m_message,
&stat.m_status));
int count;
BOOST_MPI_CHECK_RESULT(MPI_Get_count, (&stat.m_status, MPI_PACKED, &count));
packed_iarchive& ar = m_probe_info->archive();
packed_iarchive& ar = info.archive();
ar.resize(count);
BOOST_MPI_CHECK_RESULT(MPI_Mrecv,
(ar.address(), ar.size(), MPI_PACKED,
&m_probe_info->m_message, &stat.m_status));
m_probe_info->deserialize(stat);
&info.m_message, &stat.m_status));
info.deserialize(stat);
m_probe_info.reset();
m_data.reset();
} else {
@@ -54,33 +55,53 @@ optional<status> request::test()
status stat;
int flag = 0;
if (bool(m_request)) {
BOOST_MPI_CHECK_RESULT(MPI_Test, (m_request.get(), &flag, &stat.m_status));
if (flag) {
if (*m_request == MPI_REQUEST_NULL) {
m_request.reset();
m_data.reset();
return optional<status>(stat);
} else {
return optional<status>();
MPI_Request_get_status(*m_request.get(), &flag, &stat.m_status);
printf("Proc %i: looking into msg %i\n", communicator().rank(), stat.m_status.MPI_TAG);
BOOST_MPI_CHECK_RESULT(MPI_Test, (m_request.get(), &flag, &stat.m_status));
if (flag) {
printf("Proc %i: concluded msg %i\n", communicator().rank(), stat.m_status.MPI_TAG);
m_request.reset();
m_data.reset();
return optional<status>(stat);
} else {
printf("Prod %i: pending msg %i\n", communicator().rank(), stat.m_status.MPI_TAG);
return optional<status>();
}
}
} else if (bool(m_probe_info)) {
BOOST_MPI_CHECK_RESULT(MPI_Improbe, (m_probe_info->m_source, m_probe_info->m_tag,
m_probe_info->m_comm, &flag,
&m_probe_info->m_message,
&stat.m_status));
probe_info_base& info = *m_probe_info;
/*BOOST_MPI_CHECK_RESULT(MPI_Improbe, (info.m_source, info.m_tag,
info.m_comm, &flag,
&info.m_message,
&stat.m_status));*/
int source = info.m_source;
int err = MPI_Improbe(source, info.m_tag,
info.m_comm, &flag,
&info.m_message,
&stat.m_status);
if (err != MPI_SUCCESS) { std::abort(); }
if (flag) {
printf("Proc %i: probed msg %i from %i completed\n", communicator().rank(), info.m_tag, info.m_source);
// message is arrived
int count;
BOOST_MPI_CHECK_RESULT(MPI_Get_count, (&stat.m_status, MPI_PACKED, &count));
packed_iarchive& ar = m_probe_info->archive();
packed_iarchive& ar = info.archive();
ar.resize(count);
BOOST_MPI_CHECK_RESULT(MPI_Mrecv,
(ar.address(), ar.size(), MPI_PACKED,
&m_probe_info->m_message, &stat.m_status));
m_probe_info->deserialize(stat);
&info.m_message, &stat.m_status));
info.deserialize(stat);
m_probe_info.reset();
m_data.reset();
return optional<status>(stat);
} else {
printf("Proc %i: probed msg %i from %i pending\n", communicator().rank(), info.m_tag, info.m_source);
return optional<status>();
}
} else {

View File

@@ -1,18 +1,8 @@
// Copyright (C) 2006 Douglas Gregor.
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// A test of the nonblocking point-to-point operations.
#include <boost/mpi/nonblocking.hpp>
#include <boost/mpi/communicator.hpp>
#include <boost/mpi/environment.hpp>
#include <boost/test/minimal.hpp>
#include "gps_position.hpp"
#include <boost/lexical_cast.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/list.hpp>
#include <iterator>
#include <algorithm>
@@ -20,229 +10,39 @@ using boost::mpi::communicator;
using boost::mpi::request;
using boost::mpi::status;
enum method_kind {
mk_wait_any, mk_test_any, mk_wait_all, mk_wait_all_keep,
mk_test_all, mk_test_all_keep, mk_wait_some, mk_wait_some_keep,
mk_test_some, mk_test_some_keep,
mk_test_size
};
static const char* method_kind_names[mk_test_size] = {
"wait_any",
"test_any",
"wait_all",
"wait_all (keep results)",
"test_all",
"test_all (keep results)",
"wait_some",
"wait_some (keep results)",
"test_some",
"test_some (keep results)"
};
template<typename T>
void
nonblocking_tests( const communicator& comm, const T* values, int num_values,
const char* kind, bool composite)
{
nonblocking_test(comm, values, num_values, kind, mk_wait_any);
nonblocking_test(comm, values, num_values, kind, mk_test_any);
nonblocking_test(comm, values, num_values, kind, mk_wait_all);
nonblocking_test(comm, values, num_values, kind, mk_wait_all_keep);
if (!composite) {
nonblocking_test(comm, values, num_values, kind, mk_test_all);
nonblocking_test(comm, values, num_values, kind, mk_test_all_keep);
}
nonblocking_test(comm, values, num_values, kind, mk_wait_some);
nonblocking_test(comm, values, num_values, kind, mk_wait_some_keep);
nonblocking_test(comm, values, num_values, kind, mk_test_some);
nonblocking_test(comm, values, num_values, kind, mk_test_some_keep);
}
template<typename T>
void
nonblocking_test(const communicator& comm, const T* values, int num_values,
const char* kind, method_kind method)
{
using boost::mpi::wait_any;
using boost::mpi::test_any;
using boost::mpi::wait_all;
using boost::mpi::test_all;
using boost::mpi::wait_some;
using boost::mpi::test_some;
if (comm.rank() == 0) {
std::cout << "Testing " << method_kind_names[method]
<< " with " << kind << "...";
std::cout.flush();
}
typedef std::pair<status, std::vector<request>::iterator>
status_iterator_pair;
T incoming_value;
std::vector<T> incoming_values(num_values);
std::vector<request> reqs;
// Send/receive the first value
reqs.push_back(comm.isend((comm.rank() + 1) % comm.size(), 0, values[0]));
reqs.push_back(comm.irecv((comm.rank() + comm.size() - 1) % comm.size(),
0, incoming_value));
if (method != mk_wait_any && method != mk_test_any) {
#ifndef LAM_MPI
// We've run into problems here (with 0-length messages) with
// LAM/MPI on Mac OS X and x86-86 Linux. Will investigate
// further at a later time, but the problem only seems to occur
// when using shared memory, not TCP.
// Send/receive an empty message
reqs.push_back(comm.isend((comm.rank() + 1) % comm.size(), 1));
reqs.push_back(comm.irecv((comm.rank() + comm.size() - 1) % comm.size(),
1));
#endif
// Send/receive an array
reqs.push_back(comm.isend((comm.rank() + 1) % comm.size(), 2, values,
num_values));
reqs.push_back(comm.irecv((comm.rank() + comm.size() - 1) % comm.size(),
2, &incoming_values.front(), num_values));
}
switch (method) {
case mk_wait_any:
if (wait_any(reqs.begin(), reqs.end()).second == reqs.begin())
reqs[1].wait();
else
reqs[0].wait();
break;
case mk_test_any:
{
boost::optional<status_iterator_pair> result;
do {
result = test_any(reqs.begin(), reqs.end());
} while (!result);
if (result->second == reqs.begin())
reqs[1].wait();
else
reqs[0].wait();
break;
}
case mk_wait_all:
wait_all(reqs.begin(), reqs.end());
break;
case mk_wait_all_keep:
{
std::vector<status> stats;
wait_all(reqs.begin(), reqs.end(), std::back_inserter(stats));
}
break;
case mk_test_all:
while (!test_all(reqs.begin(), reqs.end())) { /* Busy wait */ }
break;
case mk_test_all_keep:
{
std::vector<status> stats;
while (!test_all(reqs.begin(), reqs.end(), std::back_inserter(stats)))
/* Busy wait */;
}
break;
case mk_wait_some:
{
std::vector<request>::iterator pos = reqs.end();
do {
pos = wait_some(reqs.begin(), pos);
} while (pos != reqs.begin());
}
break;
case mk_wait_some_keep:
{
std::vector<status> stats;
std::vector<request>::iterator pos = reqs.end();
do {
pos = wait_some(reqs.begin(), pos, std::back_inserter(stats)).second;
} while (pos != reqs.begin());
}
break;
case mk_test_some:
{
std::vector<request>::iterator pos = reqs.end();
do {
pos = test_some(reqs.begin(), pos);
} while (pos != reqs.begin());
}
break;
case mk_test_some_keep:
{
std::vector<status> stats;
std::vector<request>::iterator pos = reqs.end();
do {
pos = test_some(reqs.begin(), pos, std::back_inserter(stats)).second;
} while (pos != reqs.begin());
}
break;
default:
BOOST_CHECK(false);
}
if (comm.rank() == 0) {
bool okay = true;
if (!((incoming_value == values[0])))
okay = false;
if (method != mk_wait_any && method != mk_test_any
&& !std::equal(incoming_values.begin(), incoming_values.end(),
values))
okay = false;
if (okay)
std::cout << "OK." << std::endl;
else
std::cerr << "ERROR!" << std::endl;
}
BOOST_CHECK(incoming_value == values[0]);
if (method != mk_wait_any && method != mk_test_any)
BOOST_CHECK(std::equal(incoming_values.begin(), incoming_values.end(),
values));
}
int test_main(int argc, char* argv[])
int main(int argc, char* argv[])
{
boost::mpi::environment env(argc, argv);
communicator comm;
int int_array[3] = {17, 42, 256};
nonblocking_tests(comm, int_array, 3, "integers", false);
gps_position gps_array[2] = {
gps_position(17, 42, .06),
gps_position(42, 17, .06)
};
nonblocking_tests(comm, gps_array, 2, "gps positions", false);
std::string string_array[2] = { "Hello", "World" };
nonblocking_tests(comm, string_array, 2, "strings", true);
std::list<std::string> lst_of_strings;
for (int i = 0; i < comm.size(); ++i)
lst_of_strings.push_back(boost::lexical_cast<std::string>(i));
nonblocking_tests(comm, &lst_of_strings, 1, "list of strings", true);
boost::mpi::communicator comm;
std::string value = "Hello";
std::string incoming;
int next = (comm.rank() + 1) % comm.size();
int prev = (comm.rank() + comm.size() - 1) % comm.size();
int tag = 2;
request sreq = comm.isend(next, tag, value);
request rreq = comm.irecv(prev, tag, incoming);
int probe = 0;
int test = 0;
MPI_Message msg;
do {
if (!test) {
MPI_Test(sreq.m_request.get(), &test, MPI_STATUS_IGNORE);
if (test) {
printf("Proc %i sent msg %i to Proc %i\n", comm.rank(), tag, next);
} else {
printf("Proc %i have not sent msg %i to Proc %i yet\n", comm.rank(), tag, next);
}
}
if (!probe) {
int err = MPI_Improbe(prev, tag,
comm, &probe,
&msg,
MPI_STATUS_IGNORE);
if (probe)
printf("Proc %i got msg %i from proc %i\n", comm.rank(), tag, prev);
else
printf("Proc %i haven't got msg %i from proc %i yet\n", comm.rank(), tag, prev);
}
} while(probe == 0 || test == 0);
return 0;
}