2
0
mirror of https://github.com/boostorg/mpi.git synced 2026-01-19 04:22:10 +00:00

Fixed #6436 #5596 and added threaded initialization

[SVN r84739]
This commit is contained in:
Matthias Troyer
2013-06-11 08:30:39 +00:00
parent f367a5aa17
commit 7cfa3db27a
15 changed files with 500 additions and 7 deletions

View File

@@ -350,8 +350,6 @@ constructor as shown in this example:
return 0;
}
[section:point_to_point Point-to-Point communication]
As a message passing library, MPI's primary purpose is to routine
@@ -1885,7 +1883,7 @@ routine, e.g.,
`skeleton_proxy` objects can be received on the other end via `recv()`,
which stores a newly-created instance of your data structure with the
same "shape" as the sender in its `"object` attribute:
same "shape" as the sender in its `"object"` attribute:
shape = mpi.world.recv(0, 0)
my_data_structure = shape.object
@@ -1906,7 +1904,6 @@ values in the data structure--but not its shape--changes.
The skeleton/content mechanism is a structured way to exploit the
interaction between custom-built MPI datatypes and `MPI_BOTTOM`, to
eliminate extra buffer copies.
[endsect]
[section:python_compatbility C++/Python MPI Compatibility]
Boost.MPI is a C++ library whose facilities have been exposed to Python
@@ -1989,6 +1986,42 @@ the amount of effort required to interface between Boost.MPI
and the C MPI library.
[endsect]
[section:threading Threads]
There are an increasing number of hybrid parrallel applications that mix
distributed and shared memory parallelism. To know how to support that model,
one need to know what level of threading support is guaranteed by the MPI
implementation. There are 4 ordered level of possible threading support described
by [classref boost::mpi::threading::level mpi::threading::level].
At the lowest level, you should not use threads at all, at the highest level, any
thread can perform MPI call.
If you want to use multi-threading in your MPI application, you should indicate
in the environment constructor your preffered threading support. Then probe the
one the librarie did provide, and decide what you can do with it (it could be
nothing, then aborting is a valid option):
#include <boost/mpi/environment.hpp>
#include <boost/mpi/communicator.hpp>
#include <iostream>
namespace mpi = boost::mpi;
namespace mt = mpi::threading;
int main()
{
mpi::environment env(mt::funneled);
if (env.thread_level() < mt::funneled) {
env.abort(-1);
}
mpi::communicator world;
std::cout << "I am process " << world.rank() << " of " << world.size()
<< "." << std::endl;
return 0;
}
[endsect]
[section:performance Performance Evaluation]
Message-passing performance is crucial in high-performance distributed

View File

@@ -0,0 +1,46 @@
// Copyright (C) 2013 Andreas Hehn <hehn@phys.ethz.ch>, ETH Zurich
// based on
// hellp-world_broadcast.cpp (C) 2006 Douglas Gregor <doug.gregor@gmail.com>
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// A simple Hello world! example
// using boost::mpi::group and boost::mpi::broadcast()
#include <stdexcept>
#include <boost/mpi/environment.hpp>
#include <boost/mpi/communicator.hpp>
#include <boost/mpi/group.hpp>
#include <boost/mpi/collectives.hpp>
#include <boost/serialization/string.hpp>
namespace mpi = boost::mpi;
int main(int argc, char* argv[])
{
mpi::environment env(argc, argv);
mpi::communicator world;
if(world.size() < 2)
throw std::runtime_error("Please run with at least 2 MPI processes!");
int group_a_ranks[2] = {0,1};
mpi::group world_group = world.group();
mpi::group group_a = world_group.include(group_a_ranks,group_a_ranks+2);
mpi::communicator comm_a(world,group_a);
std::string value("Hello world!");
if(comm_a)
{
if(comm_a.rank() == 0) {
value = "Hello group a!";
}
mpi::broadcast(comm_a, value, 0);
}
std::cout << "Process #" << world.rank() << " says " << value << std::endl;
return 0;
}

View File

@@ -330,6 +330,25 @@ reduce(const communicator& comm, const T* in_values, int n, Op op, int root)
is_mpi_op<Op, T>(), is_mpi_datatype<T>());
}
template<typename T, typename Op>
void
reduce(const communicator & comm, std::vector<T> const & in_values, Op op,
int root)
{
reduce(comm, &in_values.front(), in_values.size(), op, root);
}
template<typename T, typename Op>
void
reduce(const communicator & comm, std::vector<T> const & in_values,
std::vector<T> & out_values, Op op, int root)
{
out_values.resize(in_values.size());
reduce(comm, &in_values.front(), in_values.size(), &out_values.front(), op,
root);
}
template<typename T, typename Op>
void
reduce(const communicator& comm, const T& in_value, T& out_value, Op op,

View File

@@ -13,6 +13,7 @@
#ifndef BOOST_MPI_COMMUNICATOR_HPP
#define BOOST_MPI_COMMUNICATOR_HPP
#include <boost/assert.hpp>
#include <boost/mpi/config.hpp>
#include <boost/mpi/exception.hpp>
#include <boost/optional.hpp>
@@ -869,6 +870,8 @@ class BOOST_MPI_DECL communicator
{
void operator()(MPI_Comm* comm) const
{
BOOST_ASSERT( comm != 0 );
BOOST_ASSERT(*comm != MPI_COMM_NULL);
int finalized;
BOOST_MPI_CHECK_RESULT(MPI_Finalized, (&finalized));
if (!finalized)

View File

@@ -61,6 +61,7 @@ BOOST_ARCHIVE_FORWARD_IMPLEMENTATION(archive::version_type)
BOOST_ARCHIVE_FORWARD_IMPLEMENTATION(archive::class_id_type)
BOOST_ARCHIVE_FORWARD_IMPLEMENTATION(archive::class_id_reference_type)
BOOST_ARCHIVE_FORWARD_IMPLEMENTATION(archive::object_id_type)
BOOST_ARCHIVE_FORWARD_IMPLEMENTATION(archive::object_reference_type)
BOOST_ARCHIVE_FORWARD_IMPLEMENTATION(archive::tracking_type)
BOOST_ARCHIVE_FORWARD_IMPLEMENTATION(archive::class_name_type)
BOOST_ARCHIVE_FORWARD_IMPLEMENTATION(serialization::collection_size_type)

View File

@@ -17,9 +17,46 @@
#include <boost/noncopyable.hpp>
#include <boost/optional.hpp>
#include <string>
#include <iosfwd>
namespace boost { namespace mpi {
namespace threading {
/** @brief specify the supported threading level.
*
* Based on MPI 2 standard/8.7.3
*/
enum level {
/** Only one thread will execute.
*/
single = MPI_THREAD_SINGLE,
/** Only main thread will do MPI calls.
*
* The process may be multi-threaded, but only the main
* thread will make MPI calls (all MPI calls are ``funneled''
* to the main thread).
*/
funneled = MPI_THREAD_FUNNELED,
/** Only one thread at the time do MPI calls.
*
* The process may be multi-threaded, and multiple
* threads may make MPI calls, but only one at a time:
* MPI calls are not made concurrently from two distinct
* threads (all MPI calls are ``serialized'').
*/
serialized = MPI_THREAD_SERIALIZED,
/** Multiple thread may do MPI calls.
*
* Multiple threads may call MPI, with no restrictions.
*/
multiple = MPI_THREAD_MULTIPLE
};
/** Formated output for threading level. */
std::ostream& operator<<(std::ostream& out, level l);
/** Formated input for threading level. */
std::istream& operator>>(std::istream& in, level& l);
} // namespace threading
/** @brief Initialize, finalize, and query the MPI environment.
*
* The @c environment class is used to initialize, finalize, and
@@ -62,6 +99,22 @@ public:
* program if it is destructed due to an uncaught exception.
*/
explicit environment(bool abort_on_exception = true);
/** Initialize the MPI environment.
*
* If the MPI environment has not already been initialized,
* initializes MPI with a call to @c MPI_Init_thread. Since this
* constructor does not take command-line arguments (@c argc and @c
* argv), it is only available when the underlying MPI
* implementation supports calling @c MPI_Init with @c NULL
* arguments, indicated by the macro @c
* BOOST_MPI_HAS_NOARG_INITIALIZATION.
*
* @param mt_level the required level of threading support.
*
* @param abort_on_exception When true, this object will abort the
* program if it is destructed due to an uncaught exception.
*/
explicit environment(threading::level mt_level, bool abort_on_exception = true);
#endif
/** Initialize the MPI environment.
@@ -80,6 +133,25 @@ public:
*/
environment(int& argc, char** &argv, bool abort_on_exception = true);
/** Initialize the MPI environment.
*
* If the MPI environment has not already been initialized,
* initializes MPI with a call to @c MPI_Init_thread.
*
* @param argc The number of arguments provided in @p argv, as
* passed into the program's @c main function.
*
* @param argv The array of argument strings passed to the program
* via @c main.
*
* @param mt_level the required level of threading support
*
* @param abort_on_exception When true, this object will abort the
* program if it is destructed due to an uncaught exception.
*/
environment(int& argc, char** &argv, threading::level mt_level,
bool abort_on_exception = true);
/** Shuts down the MPI environment.
*
* If this @c environment object was used to initialize the MPI
@@ -185,13 +257,21 @@ public:
*/
static std::string processor_name();
/** Query the current level of thread support.
*/
static threading::level thread_level();
/** Are we in the main thread?
*/
static bool is_main_thread();
private:
/// Whether this environment object called MPI_Init
bool i_initialized;
/// Whether we should abort if the destructor is
bool abort_on_exception;
/// The number of reserved tags.
static const int num_reserved_tags = 1;
};

View File

@@ -22,6 +22,7 @@
#include <boost/archive/detail/auto_link_archive.hpp>
#include <boost/archive/detail/common_iarchive.hpp>
#include <boost/archive/shared_ptr_helper.hpp>
#include <boost/archive/basic_archive.hpp>
#include <boost/mpi/detail/packed_iprimitive.hpp>
#include <boost/mpi/detail/binary_buffer_iprimitive.hpp>
#include <boost/serialization/string.hpp>
@@ -120,6 +121,16 @@ public:
// input archives need to ignore the optional information
void load_override(archive::class_id_optional_type & /*t*/, int){}
void load_override(archive::class_id_type & t, int version){
int_least16_t x=0;
* this->This() >> x;
t = boost::archive::class_id_type(x);
}
void load_override(archive::class_id_reference_type & t, int version){
load_override(static_cast<archive::class_id_type &>(t), version);
}
void load_override(archive::class_name_type & t, int)
{
std::string cn;

View File

@@ -19,6 +19,7 @@
#define BOOST_MPI_PACKED_OARCHIVE_HPP
#include <boost/mpi/datatype.hpp>
#include <boost/archive/basic_archive.hpp>
#include <boost/archive/detail/auto_link_archive.hpp>
#include <boost/archive/detail/common_oarchive.hpp>
#include <boost/archive/shared_ptr_helper.hpp>
@@ -124,6 +125,11 @@ public:
* this->This() << s;
}
void save_override(archive::class_id_type & t, int version){
const boost::int_least16_t x = t;
* this->This() << x;
}
private:
/// An internal buffer to be used when the user does not supply his
/// own buffer.

View File

@@ -63,7 +63,8 @@ communicator::communicator(const communicator& comm,
MPI_Comm newcomm;
BOOST_MPI_CHECK_RESULT(MPI_Comm_create,
((MPI_Comm)comm, (MPI_Group)subgroup, &newcomm));
comm_ptr.reset(new MPI_Comm(newcomm), comm_free());
if(newcomm != MPI_COMM_NULL)
comm_ptr.reset(new MPI_Comm(newcomm), comm_free());
}
int communicator::size() const

View File

@@ -9,10 +9,57 @@
#include <boost/mpi/exception.hpp>
#include <boost/mpi/detail/mpi_datatype_cache.hpp>
#include <cassert>
#include <string>
#include <exception>
#include <stdexcept>
#include <ostream>
namespace boost { namespace mpi {
namespace threading {
std::istream& operator>>(std::istream& in, level& l)
{
std::string tk;
in >> tk;
if (!in.bad()) {
if (tk == "single") {
l = single;
} else if (tk == "funneled") {
l = funneled;
} else if (tk == "serialized") {
l = serialized;
} else if (tk == "multiple") {
l = multiple;
} else {
in.setstate(std::ios::badbit);
}
}
return in;
}
std::ostream& operator<<(std::ostream& out, level l)
{
switch(l) {
case single:
out << "single";
break;
case funneled:
out << "funneled";
break;
case serialized:
out << "serialized";
break;
case multiple:
out << "multiple";
break;
default:
out << "<level error>[" << int(l) << ']';
out.setstate(std::ios::badbit);
break;
}
return out;
}
} // namespace threading
#ifdef BOOST_MPI_HAS_NOARG_INITIALIZATION
environment::environment(bool abort_on_exception)
@@ -26,6 +73,21 @@ environment::environment(bool abort_on_exception)
MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
}
environment::environment(threading::level mt_level, bool abort_on_exception)
: i_initialized(false),
abort_on_exception(abort_on_exception)
{
// It is not clear that we can pass null in MPI_Init_thread.
int dummy_thread_level = 0;
if (!initialized()) {
BOOST_MPI_CHECK_RESULT(MPI_Init_thread,
(0, 0, int(mt_level), &dummy_thread_level ));
i_initialized = true;
}
MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
}
#endif
environment::environment(int& argc, char** &argv, bool abort_on_exception)
@@ -40,6 +102,22 @@ environment::environment(int& argc, char** &argv, bool abort_on_exception)
MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
}
environment::environment(int& argc, char** &argv, threading::level mt_level,
bool abort_on_exception)
: i_initialized(false),
abort_on_exception(abort_on_exception)
{
// It is not clear that we can pass null in MPI_Init_thread.
int dummy_thread_level = 0;
if (!initialized()) {
BOOST_MPI_CHECK_RESULT(MPI_Init_thread,
(&argc, &argv, int(mt_level), &dummy_thread_level));
i_initialized = true;
}
MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
}
environment::~environment()
{
if (i_initialized) {
@@ -122,4 +200,20 @@ std::string environment::processor_name()
return std::string(name, len);
}
threading::level environment::thread_level()
{
int level;
BOOST_MPI_CHECK_RESULT(MPI_Query_thread, (&level));
return static_cast<threading::level>(level);
}
bool environment::is_main_thread()
{
int isit;
BOOST_MPI_CHECK_RESULT(MPI_Is_thread_main, (&isit));
return static_cast<bool>(isit);
}
} } // end namespace boost::mpi

View File

@@ -23,6 +23,11 @@ test-suite mpi
[ mpi-test broadcast_test : : : 2 17 ]
[ mpi-test gather_test ]
[ mpi-test is_mpi_op_test : : : 1 ]
[ mpi-test mt_level_test : : : 1 ]
[ mpi-test mt_init_test-single : mt_init_test.cpp : <testing.arg>"single" : 1 4 ]
[ mpi-test mt_init_test-funneled : mt_init_test.cpp : <testing.arg>"funneled" : 1 4 ]
[ mpi-test mt_init_test-serialized : mt_init_test.cpp : <testing.arg>"serialized" : 1 4 ]
[ mpi-test mt_init_test-multiple : mt_init_test.cpp : <testing.arg>"multiple" : 1 4 ]
# Note: Microsoft MPI fails nonblocking_test on 1 processor
[ mpi-test nonblocking_test ]
[ mpi-test reduce_test ]
@@ -33,5 +38,6 @@ test-suite mpi
[ mpi-test skeleton_content_test : : : 2 3 4 7 8 13 17 ]
[ mpi-test graph_topology_test : : : 2 7 13 ]
[ mpi-test pointer_test : : : 2 ]
[ mpi-test groups_test ]
;
}

59
test/groups_test.cpp Normal file
View File

@@ -0,0 +1,59 @@
// Copyright (C) 2013 Andreas Hehn <hehn@phys.ethz.ch>, ETH Zurich
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// A test of communicators created from groups.
#include <boost/mpi/environment.hpp>
#include <boost/mpi/communicator.hpp>
#include <boost/mpi/group.hpp>
#include <boost/test/minimal.hpp>
#include <vector>
#include <algorithm>
namespace mpi = boost::mpi;
template <typename T>
struct iota
{
iota() : state(0){};
T operator()()
{
return state++;
}
T state;
};
void group_test(const mpi::communicator& comm)
{
std::vector<int> grp_a_ranks(comm.size() / 2);
std::generate(grp_a_ranks.begin(),grp_a_ranks.end(),iota<int>());
mpi::group grp_a = comm.group().include(grp_a_ranks.begin(),grp_a_ranks.end());
mpi::group grp_b = comm.group().exclude(grp_a_ranks.begin(),grp_a_ranks.end());
mpi::communicator part_a(comm,grp_a);
mpi::communicator part_b(comm,grp_b);
if(part_a)
{
std::cout << "comm rank: " << comm.rank() << " -> part_a rank:" << part_a.rank() << std::endl;
BOOST_CHECK(part_a.rank() == comm.rank());
}
if(part_b)
{
std::cout << "comm rank: " << comm.rank() << " -> part_b rank:" << part_b.rank() << std::endl;
BOOST_CHECK(part_b.rank() == comm.rank() - comm.size()/2);
}
}
int test_main(int argc, char* argv[])
{
mpi::environment env(argc,argv);
mpi::communicator comm;
group_test(comm);
return 0;
}

27
test/mt_init_test.cpp Normal file
View File

@@ -0,0 +1,27 @@
// Copyright (C) 2013 Alain Miniussi <alain.miniussi@oca.eu>
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// test threading::level operations
#include <boost/mpi/environment.hpp>
#include <boost/test/minimal.hpp>
#include <iostream>
#include <sstream>
namespace mpi = boost::mpi;
int
test_main(int argc, char* argv[]) {
mpi::threading::level required = mpi::threading::level(-1);
BOOST_CHECK(argc == 2);
std::istringstream cmdline(argv[1]);
cmdline >> required;
BOOST_CHECK(!cmdline.bad());
mpi::environment env(argc,argv,required);
BOOST_CHECK(env.thread_level() >= mpi::threading::single);
BOOST_CHECK(env.thread_level() <= mpi::threading::multiple);
return 0;
}

107
test/mt_level_test.cpp Normal file
View File

@@ -0,0 +1,107 @@
// Copyright (C) 2013 Alain Miniussi <alain.miniussi@oca.eu>
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// test threading::level operations
#include <boost/mpi/environment.hpp>
#include <boost/test/minimal.hpp>
#include <iostream>
#include <sstream>
namespace mpi = boost::mpi;
void
test_threading_level_io(mpi::threading::level orig) {
std::ostringstream out;
namespace mt = boost::mpi::threading;
mt::level printed = mt::level(-1);
out << orig;
BOOST_CHECK(out.good());
std::string orig_str(out.str());
std::cout << "orig string:" << orig_str << '\n';
std::istringstream in(orig_str);
in >> printed;
BOOST_CHECK(!in.bad());
std::cout << "orig: " << orig << ", printed: " << printed << std::endl;
BOOST_CHECK(orig == printed);
}
void
test_threading_levels_io() {
namespace mt = boost::mpi::threading;
test_threading_level_io(mt::single);
test_threading_level_io(mt::funneled);
test_threading_level_io(mt::serialized);
test_threading_level_io(mt::multiple);
}
void
test_threading_level_cmp() {
namespace mt = boost::mpi::threading;
BOOST_CHECK(mt::single == mt::single);
BOOST_CHECK(mt::funneled == mt::funneled);
BOOST_CHECK(mt::serialized == mt::serialized);
BOOST_CHECK(mt::multiple == mt::multiple);
BOOST_CHECK(mt::single != mt::funneled);
BOOST_CHECK(mt::single != mt::serialized);
BOOST_CHECK(mt::single != mt::multiple);
BOOST_CHECK(mt::funneled != mt::single);
BOOST_CHECK(mt::funneled != mt::serialized);
BOOST_CHECK(mt::funneled != mt::multiple);
BOOST_CHECK(mt::serialized != mt::single);
BOOST_CHECK(mt::serialized != mt::funneled);
BOOST_CHECK(mt::serialized != mt::multiple);
BOOST_CHECK(mt::multiple != mt::single);
BOOST_CHECK(mt::multiple != mt::funneled);
BOOST_CHECK(mt::multiple != mt::serialized);
BOOST_CHECK(mt::single < mt::funneled);
BOOST_CHECK(mt::funneled > mt::single);
BOOST_CHECK(mt::single < mt::serialized);
BOOST_CHECK(mt::serialized > mt::single);
BOOST_CHECK(mt::single < mt::multiple);
BOOST_CHECK(mt::multiple > mt::single);
BOOST_CHECK(mt::funneled < mt::serialized);
BOOST_CHECK(mt::serialized > mt::funneled);
BOOST_CHECK(mt::funneled < mt::multiple);
BOOST_CHECK(mt::multiple > mt::funneled);
BOOST_CHECK(mt::serialized < mt::multiple);
BOOST_CHECK(mt::multiple > mt::serialized);
BOOST_CHECK(mt::single <= mt::single);
BOOST_CHECK(mt::single <= mt::funneled);
BOOST_CHECK(mt::funneled >= mt::single);
BOOST_CHECK(mt::single <= mt::serialized);
BOOST_CHECK(mt::serialized >= mt::single);
BOOST_CHECK(mt::single <= mt::multiple);
BOOST_CHECK(mt::multiple >= mt::single);
BOOST_CHECK(mt::funneled <= mt::funneled);
BOOST_CHECK(mt::funneled <= mt::serialized);
BOOST_CHECK(mt::serialized >= mt::funneled);
BOOST_CHECK(mt::funneled <= mt::multiple);
BOOST_CHECK(mt::multiple >= mt::funneled);
BOOST_CHECK(mt::serialized <= mt::serialized);
BOOST_CHECK(mt::serialized <= mt::multiple);
BOOST_CHECK(mt::multiple >= mt::serialized);
BOOST_CHECK(mt::multiple <= mt::multiple);
}
int
test_main(int argc, char* argv[]) {
test_threading_levels_io();
test_threading_level_cmp();
return 0;
}

View File

@@ -28,7 +28,7 @@ enum method_kind {
mk_all_except_test_all // use for serialized types
};
static char* method_kind_names[mk_all] = {
static const char* method_kind_names[mk_all] = {
"wait_any",
"test_any",
"wait_all",