diff --git a/build/Jamfile.v2 b/build/Jamfile.v2 index d753ea5..5e05319 100644 --- a/build/Jamfile.v2 +++ b/build/Jamfile.v2 @@ -82,6 +82,7 @@ libraries += boost_mpi ; python/datatypes.cpp python/documentation.cpp python/py_environment.cpp + python/py_nonblocking.cpp python/py_exception.cpp python/module.cpp python/py_request.cpp diff --git a/build/__init__.py b/build/__init__.py index 9032fdf..ffd3862 100644 --- a/build/__init__.py +++ b/build/__init__.py @@ -1,10 +1,10 @@ import sys if sys.platform == 'linux2': - import dl + import DLFCN as dl flags = sys.getdlopenflags() sys.setdlopenflags(dl.RTLD_NOW|dl.RTLD_GLOBAL) import mpi sys.setdlopenflags(flags) else: -import mpi + import mpi diff --git a/src/python/documentation.cpp b/src/python/documentation.cpp index c1fd89a..ef28fb3 100644 --- a/src/python/documentation.cpp +++ b/src/python/documentation.cpp @@ -132,10 +132,10 @@ const char* module_docstring = "\n" " Once you have registered your C++ data structures, you can extract\n" " the skeleton for an instance of that data structure with skeleton().\n" - " The resulting skeleton_proxy can be transmitted via the normal send\n" + " The resulting SkeletonProxy can be transmitted via the normal send\n" " routine, e.g.,\n\n" " mpi.world.send(1, 0, skeleton(my_data_structure))\n\n" - " skeleton_proxy objects can be received on the other end via recv(),\n" + " SkeletonProxy objects can be received on the other end via recv(),\n" " which stores a newly-created instance of your data structure with the\n" " same `shape' as the sender in its `object' attribute:\n\n" " shape = mpi.world.recv(0, 0)\n" @@ -211,6 +211,86 @@ const char* environment_initialized_docstring = const char* environment_finalized_docstring = "Determine if the MPI environment has already been finalized.\n"; +/*********************************************************** + * nonblocking documentation * + ***********************************************************/ +const char* request_list_init_docstring= + "Without arguments, constructs an empty RequestList.\n" + "With one argument `iterable', copies request objects from this\n" + "iterable to the new RequestList.\n"; + +const char* nonblocking_wait_any_docstring = + "Waits until any of the given requests has been completed. It provides\n" + "functionality equivalent to MPI_Waitany.\n" + "\n" + "requests must be a RequestList instance.\n" + "\n" + "Returns a triple (value, status, index) consisting of received value\n" + "(or None), the Status object for the completed request, and its index\n" + "in the RequestList.\n"; + +const char* nonblocking_test_any_docstring = + "Tests if any of the given requests have been completed, but does not wait\n" + "for completion. It provides functionality equivalent to MPI_Testany.\n" + "\n" + "requests must be a RequestList instance.\n" + "\n" + "Returns a triple (value, status, index) like wait_any or None if no request\n" + "is complete.\n"; + +const char* nonblocking_wait_all_docstring = + "Waits until all of the given requests have been completed. It provides\n" + "functionality equivalent to MPI_Waitall.\n" + "\n" + "requests must be a RequestList instance.\n" + "\n" + "If the second parameter `callable' is provided, it is called with each\n" + "completed request's received value (or None) and it s Status object as\n" + "its arguments. The calls occur in the order given by the `requests' list.\n"; + +const char* nonblocking_test_all_docstring = + "Tests if all of the given requests have been completed. It provides\n" + "functionality equivalent to MPI_Testall.\n" + "\n" + "Returns True if all requests have been completed.\n" + "\n" + "requests must be a RequestList instance.\n" + "\n" + "If the second parameter `callable' is provided, it is called with each\n" + "completed request's received value (or None) and it s Status object as\n" + "its arguments. The calls occur in the order given by the `requests' list.\n"; + +const char* nonblocking_wait_some_docstring = + "Waits until at least one of the given requests has completed. It\n" + "then completes all of the requests it can, partitioning the input\n" + "sequence into pending requests followed by completed requests.\n" + "\n" + "This routine provides functionality equivalent to MPI_Waitsome.\n" + "\n" + "Returns the index of the first completed request." + "\n" + "requests must be a RequestList instance.\n" + "\n" + "If the second parameter `callable' is provided, it is called with each\n" + "completed request's received value (or None) and it s Status object as\n" + "its arguments. The calls occur in the order given by the `requests' list.\n"; + +const char* nonblocking_test_some_docstring = + "Tests to see if any of the given requests has completed. It completes\n" + "all of the requests it can, partitioning the input sequence into pending\n" + "requests followed by completed requests. This routine is similar to\n" + "wait_some, but does not wait until any requests have completed.\n" + "\n" + "This routine provides functionality equivalent to MPI_Testsome.\n" + "\n" + "Returns the index of the first completed request." + "\n" + "requests must be a RequestList instance.\n" + "\n" + "If the second parameter `callable' is provided, it is called with each\n" + "completed request's received value (or None) and it s Status object as\n" + "its arguments. The calls occur in the order given by the `requests' list.\n"; + /*********************************************************** * exception documentation * ***********************************************************/ @@ -310,14 +390,14 @@ const char* scatter_docstring = * communicator documentation * ***********************************************************/ const char* communicator_docstring = - "The communicator class abstracts a set of communicating\n" + "The Communicator class abstracts a set of communicating\n" "processes in MPI. All of the processes that belong to a certain\n" "communicator can determine the size of the communicator, their rank\n" "within the communicator, and communicate with any other processes\n" "in the communicator.\n"; const char* communicator_default_constructor_docstring = - "Build a new Boost.MPI communicator for MPI_COMM_WORLD.\n"; + "Build a new Boost.MPI Communicator instance for MPI_COMM_WORLD.\n"; const char* communicator_rank_docstring = "Returns the rank of the process in the communicator, which will be a\n" @@ -335,10 +415,10 @@ const char* communicator_send_docstring = " - For C++ objects registered via register_serialized(), the value\n" " will be serialized and transmitted.\n" "\n" - " - For skeleton_proxy objects, the skeleton of the object will be\n" + " - For SkeletonProxy objects, the skeleton of the object will be\n" " serialized and transmitted.\n" "\n" - " - For content objects, the content will be transmitted directly.\n" + " - For Content objects, the content will be transmitted directly.\n" " This content can be received by a matching recv/irecv call that\n" " provides a suitable `buffer' argument.\n" "\n" @@ -351,12 +431,12 @@ const char* communicator_recv_docstring = "the message can be received from any process. Likewise, if the tag\n" "parameter is not specified, a message with any tag can be received.\n" "If return_status is True, returns a tuple containing the received\n" - "object followed by a status object describing the communication.\n" + "object followed by a Status object describing the communication.\n" "Otherwise, recv() returns just the received object.\n" "\n" "When receiving the content of a data type that has been sent separately\n" "from its skeleton, user code must provide a value for the `buffer'\n" - "argument. This value should be the content object returned from\n" + "argument. This value should be the Content object returned from\n" "get_content().\n"; const char* communicator_isend_docstring = @@ -364,7 +444,7 @@ const char* communicator_isend_docstring = "tag to the process with rank dest. It can be received by the\n" "destination process with a matching recv call. The value will be\n" "transmitted in the same way as with send().\n" - "This routine returns a request object, which can be used to query\n" + "This routine returns a Request object, which can be used to query\n" "when the transmission has completed, wait for its completion, or\n" "cancel the transmission.\n"; @@ -373,15 +453,15 @@ const char* communicator_irecv_docstring = "source with the given tag. If the source parameter is not specified,\n" "the message can be received from any process. Likewise, if the tag\n" "parameter is not specified, a message with any tag can be received.\n" - "This routine returns a request object, which can be used to query\n" + "This routine returns a Request object, which can be used to query\n" "when the transmission has completed, wait for its completion, or\n" "cancel the transmission. The received value be accessible\n" - "through the `value' attribute of the request object once transmission\n" + "through the `value' attribute of the Request object once transmission\n" "has completed.\n" "\n" "As with the recv() routine, when receiving the content of a data type\n" "that has been sent separately from its skeleton, user code must provide\n" - "a value for the `buffer' argument. This value should be the content\n" + "a value for the `buffer' argument. This value should be the Content\n" "object returned from get_content().\n"; const char* communicator_probe_docstring = @@ -389,7 +469,7 @@ const char* communicator_irecv_docstring = "is available to be received. It then returns information about\n" "that message. If source is omitted, a message from any process\n" "will match. If tag is omitted, a message with any tag will match.\n" - "The actual source and tag can be retrieved from the returned status\n" + "The actual source and tag can be retrieved from the returned Status\n" "object. To check if a message is available without blocking, use\n" "iprobe.\n"; @@ -399,7 +479,7 @@ const char* communicator_iprobe_docstring = "message; otherwise, it returns None. If source is omitted, a message\n" "from any process will match. If tag is omitted, a message with any\n" "tag will match. The actual source and tag can be retrieved from the\n" - "returned status object. To wait for a message to become available, use\n" + "returned Status object. To wait for a message to become available, use\n" "probe.\n"; const char* communicator_barrier_docstring = @@ -418,8 +498,8 @@ const char* communicator_split_docstring = "the ordering of processes with the same color in the resulting\n" "communicator. If omitted, the key will default to the rank of\n" "the process in the current communicator.\n\n" - "Returns a new communicator containing all of the processes in\n" - "this communicator that have the same color.\n"; + "Returns a new Communicator instance containing all of the \n" + "processes in this communicator that have the same color.\n"; const char* communicator_abort_docstring = "Makes a \"best attempt\" to abort all of the tasks in the group of\n" @@ -435,36 +515,46 @@ const char* communicator_abort_docstring = * request documentation * ***********************************************************/ const char* request_docstring = - "The request class contains information about a non-blocking send\n" + "The Request class contains information about a non-blocking send\n" "or receive and will be returned from isend or irecv, respectively.\n" - "When a request object represents a completed irecv, the `value' \n" + "When a Request object represents a completed irecv, the `value' \n" "attribute will contain the received value.\n"; +const char* request_with_value_docstring = + "This class is an implementation detail. Any call that accepts a\n" + "Request also accepts a RequestWithValue, and vice versa.\n"; + const char* request_wait_docstring = "Wait until the communication associated with this request has\n" "completed. For a request that is associated with an isend(), returns\n" - "a status object describing the communication. For an irecv()\n" + "a Status object describing the communication. For an irecv()\n" "operation, returns the received value by default. However, when\n" - "return_status=True, a (value, status) pair is returned by a.\n" + "return_status=True, a (value, status) pair is returned by a\n" "completed irecv request.\n"; const char* request_test_docstring = "Determine whether the communication associated with this request\n" - "has completed successfully. If so, returns the status object\n" + "has completed successfully. If so, returns the Status object\n" "describing the communication (for an isend request) or a tuple\n" - "containing the received value and a status object (for an irecv\n" - "request). Note that once test() returns a status object, the\n" + "containing the received value and a Status object (for an irecv\n" + "request). Note that once test() returns a Status object, the\n" "request has completed and wait() should not be called.\n"; const char* request_cancel_docstring = "Cancel a pending communication, assuming it has not already been\n" "completed.\n"; +const char* request_value_docstring = + "If this request originated in an irecv(), this property makes the" + "sent value accessible once the request completes.\n" + "\n" + "If no value is available, ValueError is raised.\n"; + /*********************************************************** * skeleton/content documentation * ***********************************************************/ const char* object_without_skeleton_docstring = - "The object_without_skeleton class is an exception class used only\n" + "The ObjectWithoutSkeleton class is an exception class used only\n" "when the skeleton() or get_content() function is called with an\n" "object that is not supported by the skeleton/content mechanism.\n" "All C++ types for which skeletons and content can be transmitted\n" @@ -475,13 +565,13 @@ const char* object_without_skeleton_object_docstring = "The object on which skeleton() or get_content() was invoked.\n"; const char* skeleton_proxy_docstring = - "The skeleton_proxy class is used to represent the skeleton of an\n" - "object. The skeleton_proxy can be used as the value parameter of\n" + "The SkeletonProxy class is used to represent the skeleton of an\n" + "object. The SkeletonProxy can be used as the value parameter of\n" "send() or isend() operations, but instead of transmitting the\n" "entire object, only its skeleton (\"shape\") will be sent, without\n" "the actual data. Its content can then be transmitted, separately.\n" "\n" - "User code cannot generate skeleton_proxy instances directly. To\n" + "User code cannot generate SkeletonProxy instances directly. To\n" "refer to the skeleton of an object, use skeleton(object). Skeletons\n" "can also be received with the recv() and irecv() methods.\n" "\n" @@ -503,7 +593,7 @@ const char* content_docstring = "skeleton/content mechanism.\n"; const char* skeleton_docstring = - "The skeleton function retrieves the skeleton_proxy for its object\n" + "The skeleton function retrieves the SkeletonProxy for its object\n" "parameter, allowing the transmission of the skeleton (or \"shape\")\n" "of the object separately from its data. The skeleton/content mechanism\n" "is useful when a large data structure remains structurally the same\n" @@ -534,7 +624,7 @@ const char* get_content_docstring = * status documentation * ***********************************************************/ const char* status_docstring = - "The status class stores information about a given message, including\n" + "The Status class stores information about a given message, including\n" "its source, tag, and whether the message transmission was cancelled\n" "or resulted in an error.\n"; @@ -554,7 +644,7 @@ const char* status_cancelled_docstring = * timer documentation * ***********************************************************/ const char* timer_docstring = - "The timer class is a simple wrapper around the MPI timing facilities.\n"; + "The Timer class is a simple wrapper around the MPI timing facilities.\n"; const char* timer_default_constructor_docstring = "Initializes the timer. After this call, elapsed == 0.\n"; diff --git a/src/python/module.cpp b/src/python/module.cpp index f365b7e..ec36968 100644 --- a/src/python/module.cpp +++ b/src/python/module.cpp @@ -27,6 +27,7 @@ extern void export_datatypes(); extern void export_request(); extern void export_status(); extern void export_timer(); +extern void export_nonblocking(); extern const char* module_docstring; @@ -35,7 +36,7 @@ BOOST_PYTHON_MODULE(mpi) // Setup module documentation scope().attr("__doc__") = module_docstring; scope().attr("__author__") = "Douglas Gregor "; - scope().attr("__date__") = "$LastChangedDate: 2006-07-16 15:25:47 -0400 (Sun, 16 Jul 2006) $"; + scope().attr("__date__") = "$LastChangedDate$"; scope().attr("__version__") = "$Revision$"; scope().attr("__copyright__") = "Copyright (C) 2006 Douglas Gregor"; scope().attr("__license__") = "http://www.boost.org/LICENSE_1_0.txt"; @@ -48,6 +49,7 @@ BOOST_PYTHON_MODULE(mpi) export_request(); export_status(); export_timer(); + export_nonblocking(); } } } } // end namespace boost::mpi::python diff --git a/src/python/py_communicator.cpp b/src/python/py_communicator.cpp index 4b7ce50..6e53f56 100644 --- a/src/python/py_communicator.cpp +++ b/src/python/py_communicator.cpp @@ -14,6 +14,7 @@ #include #include #include +#include "request_with_value.hpp" using namespace boost::python; using namespace boost::mpi; @@ -49,14 +50,12 @@ communicator_recv(const communicator& comm, int source, int tag, return result; } -object +request_with_value communicator_irecv(const communicator& comm, int source, int tag) { - using boost::python::make_tuple; - - object result; - object req(comm.irecv(source, tag, result)); - req.attr("value") = result; + boost::shared_ptr result(new object()); + request_with_value req(comm.irecv(source, tag, *result)); + req.m_internal_value = result; return req; } @@ -76,7 +75,7 @@ void export_communicator() using boost::python::arg; using boost::python::object; - class_ comm("communicator", communicator_docstring); + class_ comm("Communicator", communicator_docstring); comm .def(init<>()) .add_property("rank", &communicator::rank, communicator_rank_docstring) diff --git a/src/python/py_exception.cpp b/src/python/py_exception.cpp index e19c0eb..fee48c4 100644 --- a/src/python/py_exception.cpp +++ b/src/python/py_exception.cpp @@ -30,9 +30,8 @@ extern const char* exception_result_code_docstring; str exception_str(const exception& e) { - return str("MPI routine `" + std::string(e.routine()) + - "' returned error code " + - lexical_cast(e.result_code())); + return str(std::string(e.what()) + + " (code " + lexical_cast(e.result_code())+")"); } void export_exception() @@ -42,10 +41,10 @@ void export_exception() object type = class_ - ("exception", exception_docstring, no_init) + ("Exception", exception_docstring, no_init) .add_property("what", &exception::what, exception_what_docstring) .add_property("routine", &exception::what, exception_routine_docstring) - .add_property("result_code", &exception::what, + .add_property("result_code", &exception::result_code, exception_result_code_docstring) .def("__str__", &exception_str) ; diff --git a/src/python/py_nonblocking.cpp b/src/python/py_nonblocking.cpp new file mode 100644 index 0000000..22f289f --- /dev/null +++ b/src/python/py_nonblocking.cpp @@ -0,0 +1,255 @@ +// (C) Copyright 2007 +// Douglas Gregor +// Andreas Kloeckner + +// Use, modification and distribution is subject to the Boost Software +// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// Authors: Douglas Gregor, Andreas Kloeckner + +/** @file py_nonblocking.cpp + * + * This file reflects the Boost.MPI nonblocking operations into Python + * functions. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "request_with_value.hpp" + +using namespace std; +using namespace boost::python; +using namespace boost::mpi; + + + + +namespace +{ + template + class py_call_output_iterator : + public boost::output_iterator_helper< + py_call_output_iterator > + { + private: + object m_callable; + RequestIterator m_request_iterator; + + public: + explicit py_call_output_iterator(object callable, + const RequestIterator &req_it) + : m_callable(callable), m_request_iterator(req_it) + { } + + py_call_output_iterator &operator=(ValueType const &v) + { + m_callable((m_request_iterator++)->get_value_or_none(), v); + return *this; + } + }; + + + + + typedef std::vector request_list; + typedef py_call_output_iterator + status_value_iterator; + + + + + std::auto_ptr make_request_list_from_py_list(object iterable) + { + std::auto_ptr result(new request_list); + std::copy( + stl_input_iterator(iterable), + stl_input_iterator(), + back_inserter(*result)); + return result; + } + + + + + class request_list_indexing_suite : + public vector_indexing_suite + { + public: + // FIXME: requests are not comparable, thus __contains__ makes no sense. + // Unfortunately, indexing_suites insist on having __contains__ available. + // Just make it error out for now. + + static bool + contains(request_list& container, request const& key) + { + PyErr_SetString(PyExc_NotImplementedError, "mpi requests are not comparable"); + throw error_already_set(); + } + }; + + + + + void check_request_list_not_empty(const request_list &requests) + { + if (requests.size() == 0) + { + PyErr_SetString(PyExc_ValueError, "cannot wait on an empty request vector"); + throw error_already_set(); + } + + } + + + + + + object wrap_wait_any(request_list &requests) + { + check_request_list_not_empty(requests); + + pair result = + wait_any(requests.begin(), requests.end()); + + return make_tuple( + result.second->get_value_or_none(), + result.first, + distance(requests.begin(), result.second)); + } + + + + + object wrap_test_any(request_list &requests) + { + check_request_list_not_empty(requests); + ::boost::optional > result = + test_any(requests.begin(), requests.end()); + + if (result) + return make_tuple( + result->second->get_value_or_none(), + result->first, + distance(requests.begin(), result->second)); + else + return object(); + } + + + + + + void wrap_wait_all(request_list &requests, object py_callable) + { + check_request_list_not_empty(requests); + if (py_callable != object()) + wait_all(requests.begin(), requests.end(), + status_value_iterator(py_callable, requests.begin())); + else + wait_all(requests.begin(), requests.end()); + } + + + + + bool wrap_test_all(request_list &requests, object py_callable) + { + check_request_list_not_empty(requests); + if (py_callable != object()) + return test_all(requests.begin(), requests.end(), + status_value_iterator(py_callable, requests.begin())); + else + return test_all(requests.begin(), requests.end()); + } + + + + + int wrap_wait_some(request_list &requests, object py_callable) + { + check_request_list_not_empty(requests); + request_list::iterator first_completed; + if (py_callable != object()) + first_completed = wait_some(requests.begin(), requests.end(), + status_value_iterator(py_callable, requests.begin())).second; + else + first_completed = wait_some(requests.begin(), requests.end()); + + return distance(requests.begin(), first_completed); + } + + + + + int wrap_test_some(request_list &requests, object py_callable) + { + check_request_list_not_empty(requests); + request_list::iterator first_completed; + if (py_callable != object()) + first_completed = test_some(requests.begin(), requests.end(), + status_value_iterator(py_callable, requests.begin())).second; + else + first_completed = test_some(requests.begin(), requests.end()); + + return distance(requests.begin(), first_completed); + } +} + + + + +namespace boost { namespace mpi { namespace python { + +extern const char* request_list_init_docstring; +extern const char* request_list_append_docstring; + +extern const char* nonblocking_wait_any_docstring; +extern const char* nonblocking_test_any_docstring; +extern const char* nonblocking_wait_all_docstring; +extern const char* nonblocking_test_all_docstring; +extern const char* nonblocking_wait_some_docstring; +extern const char* nonblocking_test_some_docstring; + +void export_nonblocking() +{ + using boost::python::arg; + + { + typedef request_list cl; + class_("RequestList", "A list of Request objects.") + .def("__init__", make_constructor(make_request_list_from_py_list), + /*arg("iterable"),*/ request_list_init_docstring) + .def(request_list_indexing_suite()) + ; + } + + def("wait_any", wrap_wait_any, + (arg("requests")), + nonblocking_wait_any_docstring); + def("test_any", wrap_test_any, + (arg("requests")), + nonblocking_test_any_docstring); + + def("wait_all", wrap_wait_all, + (arg("requests"), arg("callable") = object()), + nonblocking_wait_all_docstring); + def("test_all", wrap_test_all, + (arg("requests"), arg("callable") = object()), + nonblocking_test_all_docstring); + + def("wait_some", wrap_wait_some, + (arg("requests"), arg("callable") = object()), + nonblocking_wait_some_docstring); + def("test_some", wrap_test_some, + (arg("requests"), arg("callable") = object()), + nonblocking_test_some_docstring); +} + +} } } diff --git a/src/python/py_request.cpp b/src/python/py_request.cpp index 2746265..53aa4de 100644 --- a/src/python/py_request.cpp +++ b/src/python/py_request.cpp @@ -13,52 +13,90 @@ */ #include #include +#include "request_with_value.hpp" using namespace boost::python; using namespace boost::mpi; -namespace boost { namespace mpi { namespace python { - -extern const char* request_docstring; -extern const char* request_wait_docstring; -extern const char* request_test_docstring; -extern const char* request_cancel_docstring; - -object request_wait(object req_obj) +const object python::request_with_value::get_value() const { - request& req = extract(req_obj)(); - status stat = req.wait(); - if (PyObject_HasAttrString(req_obj.ptr(), "value")) - return boost::python::make_tuple(stat, req_obj.attr("value")); + if (m_internal_value.get()) + return *m_internal_value; + else if (m_external_value) + return *m_external_value; else - return object(stat); + { + PyErr_SetString(PyExc_ValueError, "request value not available"); + throw boost::python::error_already_set(); + } } -object request_test(object req_obj) +const object python::request_with_value::get_value_or_none() const { - request& req = extract(req_obj)(); - - if (optional stat = req.test()) - { - if (PyObject_HasAttrString(req_obj.ptr(), "value")) - return boost::python::make_tuple(stat, req_obj.attr("value")); - else - return object(stat); - } + if (m_internal_value.get()) + return *m_internal_value; + else if (m_external_value) + return *m_external_value; else return object(); } +const object python::request_with_value::wrap_wait() +{ + status stat = request::wait(); + if (m_internal_value.get() || m_external_value) + return boost::python::make_tuple(get_value(), stat); + else + return object(stat); +} + +const object python::request_with_value::wrap_test() +{ + ::boost::optional stat = request::test(); + if (stat) + { + if (m_internal_value.get() || m_external_value) + return boost::python::make_tuple(get_value(), *stat); + else + return object(*stat); + } + else + return object(); +} + + +namespace boost { namespace mpi { namespace python { + +extern const char* request_docstring; +extern const char* request_with_value_docstring; +extern const char* request_wait_docstring; +extern const char* request_test_docstring; +extern const char* request_cancel_docstring; +extern const char* request_value_docstring; + void export_request() { using boost::python::arg; using boost::python::object; - class_("request", request_docstring, no_init) - .def("wait", &request_wait, request_wait_docstring) - .def("test", &request_test, request_test_docstring) - .def("cancel", &request::cancel, request_cancel_docstring) - ; + { + typedef request cl; + class_("Request", request_docstring, no_init) + .def("wait", &cl::wait, request_wait_docstring) + .def("test", &cl::test, request_test_docstring) + .def("cancel", &cl::cancel, request_cancel_docstring) + ; + } + { + typedef request_with_value cl; + class_ >( + "RequestWithValue", request_with_value_docstring, no_init) + .def("wait", &cl::wrap_wait, request_wait_docstring) + .def("test", &cl::wrap_test, request_test_docstring) + ; + } + + implicitly_convertible(); } } } } // end namespace boost::mpi::python diff --git a/src/python/py_timer.cpp b/src/python/py_timer.cpp index d33f694..88b1b40 100644 --- a/src/python/py_timer.cpp +++ b/src/python/py_timer.cpp @@ -32,7 +32,7 @@ void export_timer() using boost::python::arg; using boost::python::object; - class_("timer", timer_docstring) + class_("Timer", timer_docstring) .def(init<>()) .def("restart", &timer::restart, timer_restart_docstring) .add_property("elapsed", &timer::elapsed, timer_elapsed_docstring) diff --git a/src/python/request_with_value.hpp b/src/python/request_with_value.hpp new file mode 100644 index 0000000..04d0c53 --- /dev/null +++ b/src/python/request_with_value.hpp @@ -0,0 +1,71 @@ +// (C) Copyright 2006 +// Douglas Gregor +// Andreas Kloeckner + +// Use, modification and distribution is subject to the Boost Software +// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// Authors: Douglas Gregor, Andreas Kloeckner + +#ifndef BOOST_MPI_PYTHON_REQUEST_WITH_VALUE_HPP +#define BOOST_MPI_PYTHON_REQUEST_WITH_VALUE_HPP + +#include +#include + +namespace boost { namespace mpi { namespace python { + + /** This wrapper adds a @c boost::python::object value to the @c + * boost::mpi::request structure, for the benefit of @c irecv() requests. + * + * In order to be able to return the value of his requests to the user, we + * need a handle that we can update to contain the transmitted value once the + * request completes. Since we're passing the address on to irecv to fill at + * any time in the future, this address may not change over time. + * + * There are two possible cases: + * - plain irecv() + * - skeleton-content irecv() + * + * In the first case, we need to own the storage from this object, the + * m_internal_value is used for this. In the second case, the updated + * python::object is part of a boost::mpi::python::content object: the + * m_external_value field handles this case. Furthermore, in the latter case, + * we now have a lifetime dependency on that content object; this can be + * handled with the BPL's with_custodian_and_ward facility. + * + * Since requests and request_with_value are supposed to be copyconstructible, + * we can't put the handle immediately inside this instance. Moreover, since + * we need to be able to put request_with_value inside request_vectors, any + * values we own must be held in a shared_ptr instance. + */ + + class request_with_value : public request + { + private: + boost::shared_ptr m_internal_value; + boost::python::object *m_external_value; + + public: + request_with_value() + : m_external_value(0) + { } + request_with_value(const request &req) + : request(req), m_external_value(0) + { } + + const boost::python::object get_value() const; + const boost::python::object get_value_or_none() const; + + const boost::python::object wrap_wait(); + const boost::python::object wrap_test(); + + friend request_with_value communicator_irecv(const communicator &, int, int); + friend request_with_value communicator_irecv_content( + const communicator&, int, int, content&); + }; + +} } } + +#endif diff --git a/src/python/skeleton_and_content.cpp b/src/python/skeleton_and_content.cpp index 45a39d1..d5376c1 100644 --- a/src/python/skeleton_and_content.cpp +++ b/src/python/skeleton_and_content.cpp @@ -16,6 +16,7 @@ #include #include #include "utility.hpp" +#include "request_with_value.hpp" using namespace boost::python; using namespace boost::mpi; @@ -114,14 +115,12 @@ communicator_recv_content(const communicator& comm, int source, int tag, /// Receive the content of a Python object. The request object's value /// attribute will reference the object whose content is being /// received, not the content wrapper. -object +request_with_value communicator_irecv_content(const communicator& comm, int source, int tag, - const content& c) + content& c) { - using boost::python::make_tuple; - - object req(comm.irecv(source, tag, c.base())); - req.attr("value") = c.object; + request_with_value req(comm.irecv(source, tag, c.base())); + req.m_external_value = &c.object; return req; } @@ -140,7 +139,7 @@ void export_skeleton_and_content(class_& comm) // Expose the object_without_skeleton exception object type = class_ - ("object_without_skeleton", object_without_skeleton_docstring, no_init) + ("ObjectWithoutSkeleton", object_without_skeleton_docstring, no_init) .def_readonly("object", &object_without_skeleton::value, object_without_skeleton_object_docstring) .def("__str__", &object_without_skeleton_str) @@ -150,11 +149,11 @@ void export_skeleton_and_content(class_& comm) // Expose the Python variants of "skeleton_proxy" and "content", and // their generator functions. detail::skeleton_proxy_base_type = - class_("skeleton_proxy", skeleton_proxy_docstring, + class_("SkeletonProxy", skeleton_proxy_docstring, no_init) .def_readonly("object", &skeleton_proxy_base::object, skeleton_proxy_object_docstring); - class_("content", content_docstring, no_init); + class_("Content", content_docstring, no_init); def("skeleton", &skeleton, arg("object"), skeleton_docstring); def("get_content", &get_content, arg("object"), get_content_docstring); @@ -166,7 +165,9 @@ void export_skeleton_and_content(class_& comm) (arg("source") = any_source, arg("tag") = any_tag, arg("buffer"), arg("return_status") = false)) .def("irecv", communicator_irecv_content, - (arg("source") = any_source, arg("tag") = any_tag, arg("buffer"))); + (arg("source") = any_source, arg("tag") = any_tag, arg("buffer")), + with_custodian_and_ward_postcall<0, 4>() + ); } } } } // end namespace boost::mpi::python diff --git a/src/python/status.cpp b/src/python/status.cpp index 51e1d27..a74221a 100644 --- a/src/python/status.cpp +++ b/src/python/status.cpp @@ -30,7 +30,7 @@ void export_status() using boost::python::arg; using boost::python::object; - class_("status", status_docstring, no_init) + class_("Status", status_docstring, no_init) .add_property("source", &status::source, status_source_docstring) .add_property("tag", &status::tag, status_tag_docstring) .add_property("error", &status::error, status_error_docstring) diff --git a/test/python/nonblocking_test.py b/test/python/nonblocking_test.py new file mode 100644 index 0000000..73b451c --- /dev/null +++ b/test/python/nonblocking_test.py @@ -0,0 +1,131 @@ +# (C) Copyright 2007 +# Andreas Kloeckner +# +# Use, modification and distribution is subject to the Boost Software +# License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at +# http://www.boost.org/LICENSE_1_0.txt) +# +# Authors: Andreas Kloeckner + + + + +import boost.mpi as mpi +import random +import sys + +MAX_GENERATIONS = 20 +TAG_DEBUG = 0 +TAG_DATA = 1 +TAG_TERMINATE = 2 +TAG_PROGRESS_REPORT = 3 + + + + +class TagGroupListener: + """Class to help listen for only a given set of tags. + + This is contrived: Typicallly you could just listen for + mpi.any_tag and filter.""" + def __init__(self, comm, tags): + self.tags = tags + self.comm = comm + self.active_requests = {} + + def wait(self): + for tag in self.tags: + if tag not in self.active_requests: + self.active_requests[tag] = self.comm.irecv(tag=tag) + requests = mpi.RequestList(self.active_requests.values()) + data, status, index = mpi.wait_any(requests) + del self.active_requests[status.tag] + return status, data + + def cancel(self): + for r in self.active_requests.itervalues(): + r.cancel() + #r.wait() + self.active_requests = {} + + + +def rank0(): + sent_histories = (mpi.size-1)*15 + print "sending %d packets on their way" % sent_histories + send_reqs = mpi.RequestList() + for i in range(sent_histories): + dest = random.randrange(1, mpi.size) + send_reqs.append(mpi.world.isend(dest, TAG_DATA, [])) + + mpi.wait_all(send_reqs) + + completed_histories = [] + progress_reports = {} + dead_kids = [] + + tgl = TagGroupListener(mpi.world, + [TAG_DATA, TAG_DEBUG, TAG_PROGRESS_REPORT, TAG_TERMINATE]) + + def is_complete(): + for i in progress_reports.values(): + if i != sent_histories: + return False + return len(dead_kids) == mpi.size-1 + + while True: + status, data = tgl.wait() + + if status.tag == TAG_DATA: + #print "received completed history %s from %d" % (data, status.source) + completed_histories.append(data) + if len(completed_histories) == sent_histories: + print "all histories received, exiting" + for rank in range(1, mpi.size): + mpi.world.send(rank, TAG_TERMINATE, None) + elif status.tag == TAG_PROGRESS_REPORT: + progress_reports[len(data)] = progress_reports.get(len(data), 0) + 1 + elif status.tag == TAG_DEBUG: + print "[DBG %d] %s" % (status.source, data) + elif status.tag == TAG_TERMINATE: + dead_kids.append(status.source) + else: + print "unexpected tag %d from %d" % (status.tag, status.source) + + if is_complete(): + break + + print "OK" + +def comm_rank(): + while True: + data, status = mpi.world.recv(return_status=True) + if status.tag == TAG_DATA: + mpi.world.send(0, TAG_PROGRESS_REPORT, data) + data.append(mpi.rank) + if len(data) >= MAX_GENERATIONS: + dest = 0 + else: + dest = random.randrange(1, mpi.size) + mpi.world.send(dest, TAG_DATA, data) + elif status.tag == TAG_TERMINATE: + from time import sleep + mpi.world.send(0, TAG_TERMINATE, 0) + break + else: + print "[DIRECTDBG %d] unexpected tag %d from %d" % (mpi.rank, status.tag, status.source) + + +def main(): + # this program sends around messages consisting of lists of visited nodes + # randomly. After MAX_GENERATIONS, they are returned to rank 0. + + if mpi.rank == 0: + rank0() + else: + comm_rank() + + + +if __name__ == "__main__": + main()