2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-02-19 14:22:23 +00:00
Files
fiber/examples/work_sharing.cpp
Nat Goodspeed 3b9e9a3a56 Snapshot of documentation update for 2016 review.
Note effect of BOOST_USE_SEGMENTED_STACKS if StackAllocator is not explicitly
passed.

Introduce function_heading_for QuickBook template to allow separate
descriptions of swap(fiber), swap(packaged_task) and swap(promise).

Document async() using C++14 std::result_of_t and std::decay_t, aligning with
std::async() documentation.

Rework when_any / when_all examples to use unbounded_channel throughout, since
we always close() the channel after the first value anyway. bounded_channel
doesn't really add much value here.

Make wait_first_outcome_impl() infer its channel pointer type. That way we can
reuse that function instead of coding a separate wait_all_until_error_impl(),
which differs only in using the nchannel facade instead of directly pushing to
unbounded_channel.

Explain use of std::bind() to bind a lambda.

Use a more nuanced discussion of promise lifetime in write_ec() example
function.

Use condition_variable::wait(lock, predicate) in a couple places in
work_sharing.cpp example.
2016-02-06 23:17:04 -05:00

229 lines
8.1 KiB
C++

// Copyright Nat Goodspeed + Oliver Kowalke 2015.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdlib>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
#include <boost/assert.hpp>
#include <boost/fiber/all.hpp>
#include "barrier.hpp"
// Number of worker fibers still running; incremented in main() as fibers are
// launched, decremented in whatevah() as they finish. Guarded by mtx_count.
static std::size_t fiber_count{ 0 };
static std::mutex mtx_count{};
// Signalled (by the last finishing fiber) once fiber_count reaches zero;
// condition_variable_any is required because waiters hold a std::unique_lock.
static boost::fibers::condition_variable_any cnd_count{};
// Lock type used with mtx_count throughout this example.
typedef std::unique_lock< std::mutex > lock_count;
/*****************************************************************************
* shared_ready_queue scheduler
*****************************************************************************/
// Work-sharing scheduling algorithm: all threads that install this scheduler
// push their ready worker fibers onto one shared queue, so any participating
// thread may pick up (and thereby migrate) any worker fiber. Each thread's
// main and dispatcher fibers are kept in a thread-private queue instead,
// because they must never run on a different thread.
class shared_ready_queue : public boost::fibers::sched_algorithm {
private:
typedef std::unique_lock< std::mutex > lock_t;
typedef std::queue< boost::fibers::context * > rqueue_t;
// ready queue shared by ALL threads running this scheduler; guarded by rqueue_mtx_
static rqueue_t rqueue_;
static std::mutex rqueue_mtx_;
// per-instance (hence per-thread) queue holding only this thread's
// main/dispatcher fibers; accessed by the owning thread only, so unlocked
rqueue_t local_queue_{};
// mtx_/cnd_/flag_ implement suspend_until()/notify() for this thread
std::mutex mtx_{};
std::condition_variable cnd_{};
bool flag_{ false };
public:
//[awakened_ws
// Called by the fiber manager whenever a fiber becomes ready to run.
virtual void awakened( boost::fibers::context * ctx) noexcept {
BOOST_ASSERT( nullptr != ctx);
if ( ctx->is_main_context() ) { /*<
recognize when we're passed this thread's main fiber
never put this thread's main fiber on the shared queue;
stash it in the thread-private queue instead
>*/
local_queue_.push( ctx);
} else if ( ctx->is_dispatcher_context() ) { /*<
recognize when we're passed this thread's dispatcher fiber
never put this thread's dispatcher fiber on the shared queue;
stash it in the thread-private queue instead
>*/
local_queue_.push( ctx);
} else {
lock_t lk(rqueue_mtx_); /*<
worker fiber, enqueue on shared queue
>*/
rqueue_.push( ctx);
}
}
//]
//[pick_next_ws
// Called by the fiber manager to select the next fiber to resume.
// Prefers a worker fiber from the shared queue; falls back to this
// thread's main/dispatcher fiber; returns nullptr if nothing is ready.
virtual boost::fibers::context * pick_next() noexcept {
boost::fibers::context * ctx( nullptr);
lock_t lk(rqueue_mtx_);
if ( ! rqueue_.empty() ) { /*<
pop an item from the ready queue
>*/
ctx = rqueue_.front();
rqueue_.pop();
// release the shared-queue lock before migrating the context
lk.unlock();
BOOST_ASSERT( nullptr != ctx);
boost::fibers::context::active()->migrate( ctx); /*<
attach context to current scheduler via the active fiber
of this thread
>*/
} else {
lk.unlock();
if ( ! local_queue_.empty() ) { /*<
nothing in the ready queue, return main or dispatcher fiber
>*/
ctx = local_queue_.front();
local_queue_.pop();
}
}
return ctx;
}
//]
// True if either the shared queue or this thread's private queue holds a
// ready fiber. Only rqueue_ needs the lock; local_queue_ is thread-private.
virtual bool has_ready_fibers() const noexcept {
lock_t lock(rqueue_mtx_);
return ! rqueue_.empty() || ! local_queue_.empty();
}
// Block this thread until notify() is called or time_point is reached.
// time_point::max() is treated as "no timeout": wait solely on the flag.
void suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept {
if ( (std::chrono::steady_clock::time_point::max)() == time_point) {
std::unique_lock< std::mutex > lk( mtx_);
cnd_.wait( lk, [this](){ return flag_; });
flag_ = false;
} else {
std::unique_lock< std::mutex > lk( mtx_);
cnd_.wait_until( lk, time_point, [this](){ return flag_; });
flag_ = false;
}
}
// Wake a thread blocked in suspend_until(). The flag guards against both
// lost and spurious wakeups; the lock is released before notifying.
void notify() noexcept {
std::unique_lock< std::mutex > lk( mtx_);
flag_ = true;
lk.unlock();
cnd_.notify_all();
}
};
// Out-of-class definitions of the scheduler's shared (static) state.
shared_ready_queue::rqueue_t shared_ready_queue::rqueue_{};
std::mutex shared_ready_queue::rqueue_mtx_{};
/*****************************************************************************
* example fiber function
*****************************************************************************/
//[fiber_fn_ws
// Worker fiber body: announce the thread it starts on, yield ten times,
// and report each time it finds itself migrated to a different thread.
// On completion the shared fiber counter is decremented; the last fiber
// to finish wakes every waiter on `cnd_count`.
void whatevah( char me) {
    try {
        std::thread::id last_tid = std::this_thread::get_id(); /*< remember the launching thread's ID >*/
        {
            std::ostringstream out;
            out << "fiber " << me << " started on thread " << last_tid << '\n';
            std::cout << out.str() << std::flush;
        }
        for ( unsigned round = 0; round < 10; ++round) { /*< ten yield/inspect rounds >*/
            boost::this_fiber::yield(); /*< give other fibers a chance to run >*/
            std::thread::id const tid = std::this_thread::get_id(); /*< ID of whatever thread resumed us >*/
            if ( tid != last_tid) { /*< a differing ID means the fiber was migrated >*/
                last_tid = tid;
                std::ostringstream out;
                out << "fiber " << me << " switched to thread " << last_tid << '\n';
                std::cout << out.str() << std::flush;
            }
        }
    } catch ( ... ) {
        // best-effort example fiber: ignore any failure, still decrement below
    }
    lock_count lk( mtx_count);
    if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
        lk.unlock(); /*< release before notifying so waiters can reacquire immediately >*/
        cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
    }
}
//]
/*****************************************************************************
* example thread function
*****************************************************************************/
//[thread_fn_ws
// Worker-thread entry point: install the work-sharing scheduler, rendezvous
// with the other threads, then park this thread's main fiber until every
// worker fiber has completed.
void thread( barrier * b) {
    {
        std::ostringstream buffer;
        buffer << "thread started " << std::this_thread::get_id() << std::endl;
        std::cout << buffer.str() << std::flush;
    }
    boost::fibers::use_scheduling_algorithm< shared_ready_queue >(); /*<
        Join the work-sharing pool by installing the scheduling algorithm
        `shared_ready_queue` on this thread.
    >*/
    b->wait(); /*< wait on other threads >*/
    lock_count lk( mtx_count);
    cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*<
        Park this thread's main fiber; worker fibers run in the meanwhile.
        The wait returns (predicate true) once all worker fibers are complete.
    >*/
    BOOST_ASSERT( 0 == fiber_count);
}
//]
/*****************************************************************************
* main()
*****************************************************************************/
int main( int argc, char *argv[]) {
std::cout << "main thread started " << std::this_thread::get_id() << std::endl;
//[main_ws
boost::fibers::use_scheduling_algorithm< shared_ready_queue >(); /*<
Install the scheduling algorithm `shared_ready_queue` in the main thread
too, so each new fiber gets launched into the shared pool.
>*/
for ( char c : std::string("abcdefghijklmnopqrstuvwxyz")) { /*<
Launch a number of worker fibers; each worker fiber picks-up a character
that is passed as parameter to fiber-function `whatevah`.
Each worker fiber gets detached, e.g. `shared_ready_queue` takes care
of fibers life-time.
>*/
boost::fibers::fiber([c](){ whatevah( c); }).detach();
++fiber_count; /*< Increment fiber counter for each new fiber. >*/
}
barrier b( 4);
std::thread threads[] = { /*<
Launch a couple of threads that join the work sharing.
>*/
std::thread( thread, & b),
std::thread( thread, & b),
std::thread( thread, & b)
};
b.wait(); /*< wait on other threads >*/
lock_count lk( mtx_count);
cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*<
Suspend main fiber and resume worker fibers in the meanwhile.
Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`)
if all worker fibers are complete.
>*/
}
lk.unlock(); /*<
Releasing lock of mtx_count is required before joining the threads, othwerwise
the other threads would be blocked inside condition_variable::wait() and
would never return (deadlock).
>*/
BOOST_ASSERT( 0 == fiber_count);
for ( std::thread & t : threads) { /*< wait for threads to terminate >*/
t.join();
}
//]
std::cout << "done." << std::endl;
return EXIT_SUCCESS;
}