diff --git a/examples/simple.cpp b/examples/simple.cpp index 6c097709..bff3a496 100644 --- a/examples/simple.cpp +++ b/examples/simple.cpp @@ -1,55 +1,130 @@ -#include -#include -#include -#include -#include - -#include -#include +// Copyright Nat Goodspeed 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) #include +#include +#include +#include +#include +#include +#include +#include +#include -inline -void fn( std::string const& str, int n) -{ - for ( int i = 0; i < n; ++i) - { - std::cout << i << ": " << str << std::endl; - boost::this_fiber::yield(); - } -} +/***************************************************************************** +* shared_ready_queue scheduler +*****************************************************************************/ +// This simple scheduler is like round_robin, except that it shares a common +// ready queue among all participating threads. A thread participates in this +// pool by executing use_scheduling_algorithm() before any +// other Boost.Fiber operation. +class shared_ready_queue : public boost::fibers::sched_algorithm { +private: + typedef std::queue rqueue_t; + // The important point about this ready queue is that it's a class static, + // common to all instances of shared_ready_queue. + static rqueue_t rqueue_; -void foo() { - try - { - boost::fibers::fiber f1( fn, "abc", 5); - std::cerr << "f1 : " << f1.get_id() << std::endl; - boost::fibers::fiber f2( std::allocator_arg, boost::fibers::fixedsize_stack(), - fn, std::string("xyz"), 8); - std::cerr << "f2 : " << f2.get_id() << std::endl; + // so is this mutex + static std::mutex mutex_; + typedef std::unique_lock lock_t; - f1.join(); - f2.join(); +public: + virtual void awakened( boost::fibers::fiber_context * f) { + BOOST_ASSERT( nullptr != f); + + lock_t lock(mutex_); + rqueue_.push( f); } - catch ( std::exception const& e) - { std::cerr << "exception: " << e.what() << std::endl; } - catch (...) - { std::cerr << "unhandled exception" << std::endl; } -} -int main() -{ - try - { - std::thread( foo).join(); - - std::cout << "done." << std::endl; - - return EXIT_SUCCESS; + virtual boost::fibers::fiber_context * pick_next() { + lock_t lock(mutex_); + boost::fibers::fiber_context * victim( nullptr); + if ( ! rqueue_.empty() ) { + victim = rqueue_.front(); + rqueue_.pop(); + BOOST_ASSERT( nullptr != victim); + } + return victim; + } + + virtual std::size_t ready_fibers() const noexcept { + lock_t lock(mutex_); + return rqueue_.size(); + } +}; + +shared_ready_queue::rqueue_t shared_ready_queue::rqueue_; +std::mutex shared_ready_queue::mutex_; + +/***************************************************************************** +* example thread function +*****************************************************************************/ +// Wait until all running fibers have completed. This works because we happen +// to know that all example fibers use yield(), which leaves them in ready +// state. A fiber blocked on a synchronization object is invisible to +// ready_fibers(). +void drain() { + // THIS fiber is running, so won't be counted among "ready" fibers + while (boost::fibers::ready_fibers()) { + boost::this_fiber::yield(); } - catch ( std::exception const& e) - { std::cerr << "exception: " << e.what() << std::endl; } - catch (...) - { std::cerr << "unhandled exception" << std::endl; } - return EXIT_FAILURE; } + +void thread() { + boost::fibers::use_scheduling_algorithm(); + drain(); +} + +/***************************************************************************** +* example fiber function +*****************************************************************************/ +void whatevah(char me) { + std::thread::id my_thread = std::this_thread::get_id(); + { + std::ostringstream buffer; + buffer << "fiber " << me << " started on thread " << my_thread << '\n'; + std::cout << buffer.str() << std::flush; + } + for (unsigned i = 0; i < 5; ++i) { + boost::this_fiber::yield(); + std::thread::id new_thread = std::this_thread::get_id(); + if (new_thread != my_thread) { + my_thread = new_thread; + std::ostringstream buffer; + buffer << "fiber " << me << " switched to thread " << my_thread << '\n'; + std::cout << buffer.str() << std::flush; + } + } +} + +int main( int argc, char *argv[]) { + // use shared_ready_queue for main thread too, so we launch new fibers + // into shared pool + boost::fibers::use_scheduling_algorithm(); + + // launch a number of fibers + for (char c : "abcdefghijklmno") { + boost::fibers::fiber([c](){ whatevah(c); }).detach(); + } + + // launch a couple threads to help process them + std::thread threads[] = { + std::thread(thread), + std::thread(thread), + std::thread(thread) + }; + + // drain running fibers + drain(); + + // wait for threads to terminate + for (std::thread& t : threads) { + t.join(); + } + + return EXIT_SUCCESS; +} + diff --git a/include/boost/fiber/fiber_context.hpp b/include/boost/fiber/fiber_context.hpp index 26ef1e0d..aeb61bcc 100644 --- a/include/boost/fiber/fiber_context.hpp +++ b/include/boost/fiber/fiber_context.hpp @@ -80,21 +80,21 @@ private: typedef std::map< uintptr_t, fss_data > fss_data_t; #if ! defined(BOOST_FIBERS_NO_ATOMICS) - std::atomic< std::size_t > use_count_; - std::atomic< fiber_status > state_; - std::atomic< int > flags_; + std::atomic< std::size_t > use_count_; + std::atomic< fiber_status > state_; + std::atomic< int > flags_; #else - std::size_t use_count_; - fiber_status state_; - int flags_; + std::size_t use_count_; + fiber_status state_; + int flags_; #endif - detail::spinlock splk_; - context::execution_context ctx_; - fss_data_t fss_data_; - std::vector< fiber_context * > waiting_; - std::exception_ptr except_; - std::chrono::steady_clock::time_point tp_; - fiber_properties * properties_; + detail::spinlock splk_; + context::execution_context ctx_; + fss_data_t fss_data_; + std::vector< fiber_context * > waiting_; + std::exception_ptr except_; + std::chrono::steady_clock::time_point tp_; + fiber_properties * properties_; // main fiber fiber_context() : @@ -240,6 +240,10 @@ public: void request_interruption( bool req) noexcept; + bool is_main_fiber() const noexcept { + return 0 != ( flags_ & flag_main_fiber); + } + bool is_terminated() const noexcept { return fiber_status::terminated == state_; } diff --git a/src/detail/waiting_queue.cpp b/src/detail/waiting_queue.cpp index 1ba291c8..2303c0b1 100644 --- a/src/detail/waiting_queue.cpp +++ b/src/detail/waiting_queue.cpp @@ -91,7 +91,7 @@ waiting_queue::move_to( std::unique_ptr< sched_algorithm > & sched_algo) { void waiting_queue::interrupt_all() noexcept { - fiber_context * mf( fiber_context::main_fiber() ); + fiber_context * mf( fiber_context::main_fiber() ); // ? for ( fiber_context * f( head_); nullptr != f; f = f->nxt) { if ( f != mf) { f->request_interruption( true);