From e62545bb2fa8cc0bf70885fa8326c190210d2e10 Mon Sep 17 00:00:00 2001 From: Alain Miniussi Date: Fri, 24 Aug 2018 16:29:41 +0200 Subject: [PATCH] checkpointing personnal stuf --- include/boost/mpi/communicator.hpp | 172 ++----------------- include/boost/mpi/nonblocking.hpp | 39 ++++- include/boost/mpi/request.hpp | 2 +- src/point_to_point.cpp | 10 +- src/request.cpp | 53 ++++-- test/nonblocking_test.cpp | 264 ++++------------------------- 6 files changed, 121 insertions(+), 419 deletions(-) diff --git a/include/boost/mpi/communicator.hpp b/include/boost/mpi/communicator.hpp index 96d67af..038b9ec 100644 --- a/include/boost/mpi/communicator.hpp +++ b/include/boost/mpi/communicator.hpp @@ -291,9 +291,6 @@ class BOOST_MPI_DECL communicator template void send(int dest, int tag, const T& value) const; - template - void send(int dest, int tag, const std::vector& value) const; - /** * @brief Send the skeleton of an object. * @@ -406,9 +403,6 @@ class BOOST_MPI_DECL communicator template status recv(int source, int tag, T& value) const; - template - status recv(int source, int tag, std::vector& value) const; - /** * @brief Receive a skeleton from a remote process. * @@ -603,9 +597,6 @@ class BOOST_MPI_DECL communicator template request isend(int dest, int tag, const T* values, int n) const; - template - request isend(int dest, int tag, const std::vector& values) const; - /** * @brief Send a message to another process without any data * without blocking. @@ -690,9 +681,6 @@ class BOOST_MPI_DECL communicator template request irecv(int source, int tag, T* values, int n) const; - template - request irecv(int source, int tag, std::vector& values) const; - /** * @brief Initiate receipt of a message from a remote process that * carries no data. @@ -1100,38 +1088,6 @@ class BOOST_MPI_DECL communicator request array_irecv_impl(int source, int tag, T* values, int n, mpl::false_) const; - // We're sending/receivig a vector with associated MPI datatype. - // We need to send/recv the size and then the data and make sure - // blocking and non blocking method agrees on the format. - template - request irecv_vector(int source, int tag, std::vector& values, - mpl::true_) const; - template - request isend_vector(int dest, int tag, const std::vector& values, - mpl::true_) const; - template - void send_vector(int dest, int tag, const std::vector& value, - mpl::true_) const; - template - status recv_vector(int source, int tag, std::vector& value, - mpl::true_) const; - - // We're sending/receivig a vector with no associated MPI datatype. - // We need to send/recv it as an archive and make sure - // blocking and non blocking method agrees on the format. - template - request irecv_vector(int source, int tag, std::vector& values, - mpl::false_) const; - template - request isend_vector(int dest, int tag, const std::vector& values, - mpl::false_) const; - template - void send_vector(int dest, int tag, const std::vector& value, - mpl::false_) const; - template - status recv_vector(int source, int tag, std::vector& value, - mpl::false_) const; - protected: shared_ptr comm_ptr; }; @@ -1355,34 +1311,12 @@ communicator::array_send_impl(int dest, int tag, const T* values, int n, mpl::false_) const { packed_oarchive oa(*this); - oa << n << boost::serialization::make_array(values, n); + for(int i = 0; i < n; ++i) { + oa << values[i]; + } send(dest, tag, oa); } -template -void communicator::send_vector(int dest, int tag, - const std::vector& value, mpl::true_ true_type) const -{ - // send the vector size - typename std::vector::size_type size = value.size(); - send(dest, tag, size); - // send the data - this->array_send_impl(dest, tag, value.data(), size, true_type); -} - -template -void communicator::send_vector(int dest, int tag, - const std::vector& value, mpl::false_ false_type) const -{ - this->send_impl(dest, tag, value, false_type); -} - -template -void communicator::send(int dest, int tag, const std::vector& value) const -{ - send_vector(dest, tag, value, is_mpi_datatype()); -} - // Array send must send the elements directly template void communicator::send(int dest, int tag, const T* values, int n) const @@ -1447,50 +1381,14 @@ communicator::array_recv_impl(int source, int tag, T* values, int n, // Receive the message packed_iarchive ia(*this); status stat = recv(source, tag, ia); - - // Determine how much data we are going to receive - int count; - ia >> count; - - // Deserialize the data in the message - boost::serialization::array_wrapper arr(values, count > n? n : count); - ia >> arr; - - if (count > n) { - boost::throw_exception( - std::range_error("communicator::recv: message receive overflow")); + + for(int i = 0; i < n; ++i) { + ia >> values[i]; } - - stat.m_count = count; + stat.m_count = n; return stat; } -template -status communicator::recv_vector(int source, int tag, - std::vector& value, mpl::true_ true_type) const -{ - // receive the vector size - typename std::vector::size_type size = 0; - recv(source, tag, size); - // size the vector - value.resize(size); - // receive the data - return this->array_recv_impl(source, tag, value.data(), size, true_type); -} - -template -status communicator::recv_vector(int source, int tag, - std::vector& value, mpl::false_ false_type) const -{ - return this->recv_impl(source, tag, value, false_type); -} - -template -status communicator::recv(int source, int tag, std::vector& value) const -{ - return recv_vector(source, tag, value, is_mpi_datatype()); -} - // Array receive must receive the elements directly into a buffer. template status communicator::recv(int source, int tag, T* values, int n) const @@ -1572,35 +1470,6 @@ request communicator::isend(int dest, int tag, const T& value) const return this->isend_impl(dest, tag, value, is_mpi_datatype()); } -template -request communicator::isend(int dest, int tag, const std::vector& values) const -{ - return this->isend_vector(dest, tag, values, is_mpi_datatype()); -} - -template -request -communicator::isend_vector(int dest, int tag, const std::vector& values, - mpl::true_) const -{ - std::size_t size = values.size(); - request req = this->isend_impl(dest, tag, size, mpl::true_()); - BOOST_MPI_CHECK_RESULT(MPI_Isend, - (const_cast(values.data()), size, - get_mpi_datatype(), - dest, tag, MPI_Comm(*this), req.m_request.get())); - return req; - -} - -template -request -communicator::isend_vector(int dest, int tag, const std::vector& values, - mpl::false_ no) const -{ - return this->isend_impl(dest, tag, values, no); -} - template request communicator::array_isend_impl(int dest, int tag, const T* values, int n, @@ -1620,7 +1489,9 @@ communicator::array_isend_impl(int dest, int tag, const T* values, int n, mpl::false_) const { shared_ptr archive(new packed_oarchive(*this)); - *archive << n << boost::serialization::make_array(values, n); + for(int i = 0; i < n; ++i) { + *archive << values[i]; + } request result = isend(dest, tag, *archive); result.m_data = archive; return result; @@ -1796,29 +1667,6 @@ communicator::array_irecv_impl(int source, int tag, T* values, int n, return request(source, tag, *this, values, n); } -template -request -communicator::irecv_vector(int source, int tag, std::vector& values, - mpl::true_) const -{ - return request(source, tag, *this, values); -} - -template -request -communicator::irecv_vector(int source, int tag, std::vector& values, - mpl::false_ no) const -{ - return irecv_impl(source, tag, values, no); -} - -template -request -communicator::irecv(int source, int tag, std::vector& values) const -{ - return irecv_vector(source, tag, values, is_mpi_datatype()); -} - // Array receive must receive the elements directly into a buffer. template request communicator::irecv(int source, int tag, T* values, int n) const diff --git a/include/boost/mpi/nonblocking.hpp b/include/boost/mpi/nonblocking.hpp index a50c845..7a54c32 100644 --- a/include/boost/mpi/nonblocking.hpp +++ b/include/boost/mpi/nonblocking.hpp @@ -14,6 +14,8 @@ #include #include +#include +#include #include // for std::iterator_traits #include #include // for std::pair @@ -88,7 +90,8 @@ optional > test_any(ForwardIterator first, ForwardIterator last) { while (first != last) { - if (optional result = first->test()) { + optional result = first->test(); + if (result) { return std::make_pair(*result, first); } ++first; @@ -124,9 +127,34 @@ template OutputIterator wait_all(ForwardIterator first, ForwardIterator last, OutputIterator out) { - for(ForwardIterator it = first; it != last; ++it) { - *out++ = it->wait(); + std::vector requests(std::distance(first, last)); + for(int i = 0; i < requests.size(); ++i) { + requests[i] = &(*first++); } + std::vector statuses(requests.size()); + typedef std::vector::iterator vriter; + int pending; + do { + pending = 0; + for(int i = 0; i < requests.size(); ++i) { + if (requests[i]) { + request& req = *requests[i]; + if (!req.active()) { + statuses[i] = status::empty_status(); + } else { + optional stat = req.test(); + if (stat) { + printf("Proc %i got msg %d\n", communicator().rank(), stat->tag()); + statuses[i] = *stat; + requests[i] = 0; + } else { + ++pending; + } + } + } + } + } while(pending>0); + std::copy(statuses.begin(), statuses.end(), out); return out; } @@ -137,9 +165,8 @@ template void wait_all(ForwardIterator first, ForwardIterator last) { - for(ForwardIterator it = first; it != last; ++it) { - it->wait(); - } + std::vector statuses(std::distance(first, last)); + wait_all(first, last, statuses.begin()); } /** diff --git a/include/boost/mpi/request.hpp b/include/boost/mpi/request.hpp index 12e2ea8..5ca5607 100644 --- a/include/boost/mpi/request.hpp +++ b/include/boost/mpi/request.hpp @@ -139,7 +139,7 @@ class BOOST_MPI_DECL request template struct probe_info_const_skeleton_proxy; template struct probe_info_skeleton_proxy; - + public: // while debuging shared_ptr m_request; shared_ptr m_probe_info; shared_ptr m_data; diff --git a/src/point_to_point.cpp b/src/point_to_point.cpp index b72ddfd..5b11bc8 100644 --- a/src/point_to_point.cpp +++ b/src/point_to_point.cpp @@ -21,7 +21,9 @@ #include #include #include +#include #include +#include namespace boost { namespace mpi { namespace detail { @@ -38,12 +40,16 @@ packed_archive_send(MPI_Comm comm, int dest, int tag, int packed_archive_isend(MPI_Comm comm, int dest, int tag, const packed_oarchive& ar, - MPI_Request& out_requests) + MPI_Request& out_request) { BOOST_MPI_CHECK_RESULT(MPI_Isend, (detail::unconst(ar.address()), ar.size(), MPI_PACKED, - dest, tag, comm, &out_requests)); + dest, tag, comm, &out_request)); + int flag; + MPI_Status stat; + MPI_Request_get_status(out_request, &flag, &stat); + printf("Proc %i ISending message %i to %i\n", communicator().rank(), tag, dest); return 1; } diff --git a/src/request.cpp b/src/request.cpp index 2d382f4..72d4583 100644 --- a/src/request.cpp +++ b/src/request.cpp @@ -29,18 +29,19 @@ status request::wait() m_request.reset(); m_data.reset(); } else if (bool(m_probe_info)) { - BOOST_MPI_CHECK_RESULT(MPI_Mprobe, (m_probe_info->m_source, m_probe_info->m_tag, - m_probe_info->m_comm, - &m_probe_info->m_message, + probe_info_base& info = *m_probe_info; + BOOST_MPI_CHECK_RESULT(MPI_Mprobe, (info.m_source, info.m_tag, + info.m_comm, + &info.m_message, &stat.m_status)); int count; BOOST_MPI_CHECK_RESULT(MPI_Get_count, (&stat.m_status, MPI_PACKED, &count)); - packed_iarchive& ar = m_probe_info->archive(); + packed_iarchive& ar = info.archive(); ar.resize(count); BOOST_MPI_CHECK_RESULT(MPI_Mrecv, (ar.address(), ar.size(), MPI_PACKED, - &m_probe_info->m_message, &stat.m_status)); - m_probe_info->deserialize(stat); + &info.m_message, &stat.m_status)); + info.deserialize(stat); m_probe_info.reset(); m_data.reset(); } else { @@ -54,33 +55,53 @@ optional request::test() status stat; int flag = 0; if (bool(m_request)) { - BOOST_MPI_CHECK_RESULT(MPI_Test, (m_request.get(), &flag, &stat.m_status)); - if (flag) { + if (*m_request == MPI_REQUEST_NULL) { m_request.reset(); m_data.reset(); return optional(stat); } else { - return optional(); + MPI_Request_get_status(*m_request.get(), &flag, &stat.m_status); + printf("Proc %i: looking into msg %i\n", communicator().rank(), stat.m_status.MPI_TAG); + BOOST_MPI_CHECK_RESULT(MPI_Test, (m_request.get(), &flag, &stat.m_status)); + if (flag) { + printf("Proc %i: concluded msg %i\n", communicator().rank(), stat.m_status.MPI_TAG); + m_request.reset(); + m_data.reset(); + return optional(stat); + } else { + printf("Prod %i: pending msg %i\n", communicator().rank(), stat.m_status.MPI_TAG); + return optional(); + } } } else if (bool(m_probe_info)) { - BOOST_MPI_CHECK_RESULT(MPI_Improbe, (m_probe_info->m_source, m_probe_info->m_tag, - m_probe_info->m_comm, &flag, - &m_probe_info->m_message, - &stat.m_status)); + probe_info_base& info = *m_probe_info; + /*BOOST_MPI_CHECK_RESULT(MPI_Improbe, (info.m_source, info.m_tag, + info.m_comm, &flag, + &info.m_message, + &stat.m_status));*/ + int source = info.m_source; + int err = MPI_Improbe(source, info.m_tag, + info.m_comm, &flag, + &info.m_message, + &stat.m_status); + if (err != MPI_SUCCESS) { std::abort(); } if (flag) { + printf("Proc %i: probed msg %i from %i completed\n", communicator().rank(), info.m_tag, info.m_source); + // message is arrived int count; BOOST_MPI_CHECK_RESULT(MPI_Get_count, (&stat.m_status, MPI_PACKED, &count)); - packed_iarchive& ar = m_probe_info->archive(); + packed_iarchive& ar = info.archive(); ar.resize(count); BOOST_MPI_CHECK_RESULT(MPI_Mrecv, (ar.address(), ar.size(), MPI_PACKED, - &m_probe_info->m_message, &stat.m_status)); - m_probe_info->deserialize(stat); + &info.m_message, &stat.m_status)); + info.deserialize(stat); m_probe_info.reset(); m_data.reset(); return optional(stat); } else { + printf("Proc %i: probed msg %i from %i pending\n", communicator().rank(), info.m_tag, info.m_source); return optional(); } } else { diff --git a/test/nonblocking_test.cpp b/test/nonblocking_test.cpp index 7ff44b4..880b0b2 100644 --- a/test/nonblocking_test.cpp +++ b/test/nonblocking_test.cpp @@ -1,18 +1,8 @@ -// Copyright (C) 2006 Douglas Gregor. - -// Use, modification and distribution is subject to the Boost Software -// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at -// http://www.boost.org/LICENSE_1_0.txt) - -// A test of the nonblocking point-to-point operations. #include #include #include -#include -#include "gps_position.hpp" #include #include -#include #include #include @@ -20,229 +10,39 @@ using boost::mpi::communicator; using boost::mpi::request; using boost::mpi::status; -enum method_kind { - mk_wait_any, mk_test_any, mk_wait_all, mk_wait_all_keep, - mk_test_all, mk_test_all_keep, mk_wait_some, mk_wait_some_keep, - mk_test_some, mk_test_some_keep, - mk_test_size -}; - -static const char* method_kind_names[mk_test_size] = { - "wait_any", - "test_any", - "wait_all", - "wait_all (keep results)", - "test_all", - "test_all (keep results)", - "wait_some", - "wait_some (keep results)", - "test_some", - "test_some (keep results)" -}; - - -template -void -nonblocking_tests( const communicator& comm, const T* values, int num_values, - const char* kind, bool composite) -{ - nonblocking_test(comm, values, num_values, kind, mk_wait_any); - nonblocking_test(comm, values, num_values, kind, mk_test_any); - nonblocking_test(comm, values, num_values, kind, mk_wait_all); - nonblocking_test(comm, values, num_values, kind, mk_wait_all_keep); - if (!composite) { - nonblocking_test(comm, values, num_values, kind, mk_test_all); - nonblocking_test(comm, values, num_values, kind, mk_test_all_keep); - } - nonblocking_test(comm, values, num_values, kind, mk_wait_some); - nonblocking_test(comm, values, num_values, kind, mk_wait_some_keep); - nonblocking_test(comm, values, num_values, kind, mk_test_some); - nonblocking_test(comm, values, num_values, kind, mk_test_some_keep); -} - -template -void -nonblocking_test(const communicator& comm, const T* values, int num_values, - const char* kind, method_kind method) -{ - using boost::mpi::wait_any; - using boost::mpi::test_any; - using boost::mpi::wait_all; - using boost::mpi::test_all; - using boost::mpi::wait_some; - using boost::mpi::test_some; - - if (comm.rank() == 0) { - std::cout << "Testing " << method_kind_names[method] - << " with " << kind << "..."; - std::cout.flush(); - } - - typedef std::pair::iterator> - status_iterator_pair; - - T incoming_value; - std::vector incoming_values(num_values); - - std::vector reqs; - // Send/receive the first value - reqs.push_back(comm.isend((comm.rank() + 1) % comm.size(), 0, values[0])); - reqs.push_back(comm.irecv((comm.rank() + comm.size() - 1) % comm.size(), - 0, incoming_value)); - - if (method != mk_wait_any && method != mk_test_any) { -#ifndef LAM_MPI - // We've run into problems here (with 0-length messages) with - // LAM/MPI on Mac OS X and x86-86 Linux. Will investigate - // further at a later time, but the problem only seems to occur - // when using shared memory, not TCP. - - // Send/receive an empty message - reqs.push_back(comm.isend((comm.rank() + 1) % comm.size(), 1)); - reqs.push_back(comm.irecv((comm.rank() + comm.size() - 1) % comm.size(), - 1)); -#endif - - // Send/receive an array - reqs.push_back(comm.isend((comm.rank() + 1) % comm.size(), 2, values, - num_values)); - reqs.push_back(comm.irecv((comm.rank() + comm.size() - 1) % comm.size(), - 2, &incoming_values.front(), num_values)); - } - - switch (method) { - case mk_wait_any: - if (wait_any(reqs.begin(), reqs.end()).second == reqs.begin()) - reqs[1].wait(); - else - reqs[0].wait(); - break; - - case mk_test_any: - { - boost::optional result; - do { - result = test_any(reqs.begin(), reqs.end()); - } while (!result); - if (result->second == reqs.begin()) - reqs[1].wait(); - else - reqs[0].wait(); - break; - } - - case mk_wait_all: - wait_all(reqs.begin(), reqs.end()); - break; - - case mk_wait_all_keep: - { - std::vector stats; - wait_all(reqs.begin(), reqs.end(), std::back_inserter(stats)); - } - break; - - case mk_test_all: - while (!test_all(reqs.begin(), reqs.end())) { /* Busy wait */ } - break; - - case mk_test_all_keep: - { - std::vector stats; - while (!test_all(reqs.begin(), reqs.end(), std::back_inserter(stats))) - /* Busy wait */; - } - break; - - case mk_wait_some: - { - std::vector::iterator pos = reqs.end(); - do { - pos = wait_some(reqs.begin(), pos); - } while (pos != reqs.begin()); - } - break; - - case mk_wait_some_keep: - { - std::vector stats; - std::vector::iterator pos = reqs.end(); - do { - pos = wait_some(reqs.begin(), pos, std::back_inserter(stats)).second; - } while (pos != reqs.begin()); - } - break; - - case mk_test_some: - { - std::vector::iterator pos = reqs.end(); - do { - pos = test_some(reqs.begin(), pos); - } while (pos != reqs.begin()); - } - break; - - case mk_test_some_keep: - { - std::vector stats; - std::vector::iterator pos = reqs.end(); - do { - pos = test_some(reqs.begin(), pos, std::back_inserter(stats)).second; - } while (pos != reqs.begin()); - } - break; - - default: - BOOST_CHECK(false); - } - - if (comm.rank() == 0) { - bool okay = true; - - if (!((incoming_value == values[0]))) - okay = false; - - if (method != mk_wait_any && method != mk_test_any - && !std::equal(incoming_values.begin(), incoming_values.end(), - values)) - okay = false; - - if (okay) - std::cout << "OK." << std::endl; - else - std::cerr << "ERROR!" << std::endl; - } - - BOOST_CHECK(incoming_value == values[0]); - - if (method != mk_wait_any && method != mk_test_any) - BOOST_CHECK(std::equal(incoming_values.begin(), incoming_values.end(), - values)); -} - -int test_main(int argc, char* argv[]) +int main(int argc, char* argv[]) { boost::mpi::environment env(argc, argv); - - communicator comm; - - int int_array[3] = {17, 42, 256}; - nonblocking_tests(comm, int_array, 3, "integers", false); - - gps_position gps_array[2] = { - gps_position(17, 42, .06), - gps_position(42, 17, .06) - }; - nonblocking_tests(comm, gps_array, 2, "gps positions", false); - - std::string string_array[2] = { "Hello", "World" }; - nonblocking_tests(comm, string_array, 2, "strings", true); - - std::list lst_of_strings; - for (int i = 0; i < comm.size(); ++i) - lst_of_strings.push_back(boost::lexical_cast(i)); - - nonblocking_tests(comm, &lst_of_strings, 1, "list of strings", true); - + boost::mpi::communicator comm; + std::string value = "Hello"; + std::string incoming; + int next = (comm.rank() + 1) % comm.size(); + int prev = (comm.rank() + comm.size() - 1) % comm.size(); + int tag = 2; + request sreq = comm.isend(next, tag, value); + request rreq = comm.irecv(prev, tag, incoming); + int probe = 0; + int test = 0; + MPI_Message msg; + do { + if (!test) { + MPI_Test(sreq.m_request.get(), &test, MPI_STATUS_IGNORE); + if (test) { + printf("Proc %i sent msg %i to Proc %i\n", comm.rank(), tag, next); + } else { + printf("Proc %i have not sent msg %i to Proc %i yet\n", comm.rank(), tag, next); + } + } + if (!probe) { + int err = MPI_Improbe(prev, tag, + comm, &probe, + &msg, + MPI_STATUS_IGNORE); + if (probe) + printf("Proc %i got msg %i from proc %i\n", comm.rank(), tag, prev); + else + printf("Proc %i haven't got msg %i from proc %i yet\n", comm.rank(), tag, prev); + } + } while(probe == 0 || test == 0); return 0; }