2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-02-01 08:32:08 +00:00

Merge branch 'develop'

This commit is contained in:
Oliver Kowalke
2017-06-10 17:55:32 +02:00
32 changed files with 250 additions and 146 deletions

View File

@@ -49,7 +49,7 @@ struct yield_completion {
if ( ! completed_) {
// suspend(unique_lock<spinlock>) unlocks the lock in the act of
// resuming another fiber
fibers::context::active()->suspend( & lk);
fibers::context::active()->suspend( lk);
}
}

View File

@@ -153,7 +153,7 @@ public:
} else if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
// suspend this producer
active_ctx->suspend( & lk);
active_ctx->suspend( lk);
} else {
slots_[pidx_] = value;
pidx_ = (pidx_ + 1) % capacity_;
@@ -178,7 +178,7 @@ public:
} else if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
// suspend this producer
active_ctx->suspend( & lk);
active_ctx->suspend( lk);
} else {
slots_[pidx_] = std::move( value);
pidx_ = (pidx_ + 1) % capacity_;
@@ -220,7 +220,7 @@ public:
} else if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, & lk) ) {
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
lk.lock();
// remove from waiting-queue
@@ -254,7 +254,7 @@ public:
} else if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, & lk) ) {
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
lk.lock();
// remove from waiting-queue
@@ -307,7 +307,7 @@ public:
} else {
active_ctx->wait_link( waiting_consumers_);
// suspend this consumer
active_ctx->suspend( & lk);
active_ctx->suspend( lk);
}
} else {
value = std::move( slots_[cidx_]);
@@ -336,7 +336,7 @@ public:
} else {
active_ctx->wait_link( waiting_consumers_);
// suspend this consumer
active_ctx->suspend( & lk);
active_ctx->suspend( lk);
}
} else {
value_type value = std::move( slots_[cidx_]);
@@ -373,7 +373,7 @@ public:
} else {
active_ctx->wait_link( waiting_consumers_);
// suspend this consumer
if ( ! active_ctx->wait_until( timeout_time, & lk) ) {
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
lk.lock();
// remove from waiting-queue

View File

@@ -73,7 +73,7 @@ public:
// unlock external lt
lt.unlock();
// suspend this fiber
active_ctx->suspend( & lk);
active_ctx->suspend( lk);
// relock external again before returning
try {
lt.lock();
@@ -104,7 +104,7 @@ public:
// unlock external lt
lt.unlock();
// suspend this fiber
if ( ! active_ctx->wait_until( timeout_time, & lk) ) {
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
status = cv_status::timeout;
// relock local lk
lk.lock();

View File

@@ -32,6 +32,7 @@
#include <boost/intrusive/slist.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/data.hpp>
#include <boost/fiber/detail/decay_copy.hpp>
#include <boost/fiber/detail/fss.hpp>
#include <boost/fiber/detail/spinlock.hpp>
@@ -185,9 +186,6 @@ private:
fiber_properties * properties_{ nullptr };
std::chrono::steady_clock::time_point tp_{ (std::chrono::steady_clock::time_point::max)() };
boost::context::continuation c_{};
context * from_ctx_{ nullptr };
context * ready_ctx_{ nullptr };
detail::spinlock_lock * lk_{ nullptr };
type type_;
launch policy_;
@@ -197,7 +195,7 @@ private:
policy_{ policy } {
}
void resume_() noexcept;
void resume_( detail::data_t &) noexcept;
public:
class id {
@@ -280,11 +278,11 @@ public:
}
void resume() noexcept;
void resume( detail::spinlock_lock *) noexcept;
void resume( detail::spinlock_lock &) noexcept;
void resume( context *) noexcept;
void suspend() noexcept;
void suspend( detail::spinlock_lock *) noexcept;
void suspend( detail::spinlock_lock &) noexcept;
boost::context::continuation suspend_with_cc() noexcept;
boost::context::continuation terminate() noexcept;
@@ -295,7 +293,7 @@ public:
bool wait_until( std::chrono::steady_clock::time_point const&) noexcept;
bool wait_until( std::chrono::steady_clock::time_point const&,
detail::spinlock_lock *) noexcept;
detail::spinlock_lock &) noexcept;
void schedule( context *) noexcept;
@@ -422,18 +420,13 @@ private:
// if an exception escapes `fn`
{
c = c.resume();
context * active_ctx = active();
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( nullptr != active_ctx->from_ctx_);
active_ctx->from_ctx_->c_ = std::move( c);
active_ctx->from_ctx_ = nullptr;
if ( nullptr != active_ctx->lk_) {
active_ctx->lk_->unlock();
active_ctx->lk_ = nullptr;
}
if ( nullptr != active_ctx->ready_ctx_) {
active_ctx->schedule( active_ctx->ready_ctx_);
active_ctx->ready_ctx_ = nullptr;
detail::data_t * dp = c.get_data< detail::data_t * >();
// update contiunation of calling fiber
dp->from->c_ = std::move( c);
if ( nullptr != dp->lk) {
dp->lk->unlock();
} else if ( nullptr != dp->ctx) {
active()->schedule( dp->ctx);
}
#if defined(BOOST_NO_CXX17_STD_APPLY)
boost::context::detail::apply( std::move( fn_), std::move( arg_) );
@@ -467,11 +460,9 @@ static intrusive_ptr< context > make_worker_context( launch policy,
typedef worker_context< Fn, Arg ... > context_t;
auto sctx = salloc.allocate();
BOOST_ASSERT( ( sizeof( context_t) + 2048) < sctx.size); // stack at least of 2kB
const std::size_t offset = sizeof( context_t) + 63;
// reserve space for control structure
void * storage = reinterpret_cast< void * >(
( reinterpret_cast< uintptr_t >( sctx.sp) - static_cast< uintptr_t >( offset) )
( reinterpret_cast< uintptr_t >( sctx.sp) - static_cast< uintptr_t >( sizeof( context_t) ) )
& ~ static_cast< uintptr_t >( 0xff) );
void * stack_bottom = reinterpret_cast< void * >(
reinterpret_cast< uintptr_t >( sctx.sp) - static_cast< uintptr_t >( sctx.size) );

View File

@@ -0,0 +1,54 @@
// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#ifndef BOOST_FIBERS_DETAIL_DATA_H
#define BOOST_FIBERS_DETAIL_DATA_H
#include <boost/config.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {
class context;
namespace detail {
struct data_t {
spinlock_lock * lk{ nullptr };
context * ctx{ nullptr };
context * from;
explicit data_t( context * from_) noexcept :
from{ from_ } {
}
explicit data_t( spinlock_lock * lk_,
context * from_) noexcept :
lk{ lk_ },
from{ from_ } {
}
explicit data_t( context * ctx_,
context * from_) noexcept :
ctx{ ctx_ },
from{ from_ } {
}
};
}}}
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#endif // BOOST_FIBERS_DETAIL_DATA_H

View File

@@ -26,28 +26,28 @@ namespace fibers {
namespace detail {
#if BOOST_OS_LINUX
inline
BOOST_FORCEINLINE
int sys_futex( void * addr, std::int32_t op, std::int32_t x) {
return ::syscall( SYS_futex, addr, op, x, nullptr, nullptr, 0);
}
inline
BOOST_FORCEINLINE
int futex_wake( std::atomic< std::int32_t > * addr) {
return 0 <= sys_futex( static_cast< void * >( addr), FUTEX_WAKE_PRIVATE, 1) ? 0 : -1;
}
inline
BOOST_FORCEINLINE
int futex_wait( std::atomic< std::int32_t > * addr, std::int32_t x) {
return 0 <= sys_futex( static_cast< void * >( addr), FUTEX_WAIT_PRIVATE, x) ? 0 : -1;
}
#elif BOOST_OS_WINDOWS
inline
BOOST_FORCEINLINE
int futex_wake( std::atomic< std::int32_t > * addr) {
::WakeByAddressSingle( static_cast< void * >( addr) );
return 0;
}
inline
BOOST_FORCEINLINE
int futex_wait( std::atomic< std::int32_t > * addr, std::int32_t x) {
::WaitOnAddress( static_cast< volatile void * >( addr), & x, sizeof( x), INFINITE);
return 0;

View File

@@ -25,17 +25,17 @@ namespace detail {
template< typename FBSplk >
class spinlock_rtm {
private:
FBSplk splk_;
FBSplk splk_{};
std::minstd_rand generator_{};
public:
spinlock_rtm() noexcept = default;
spinlock_rtm() = default;
spinlock_rtm( spinlock_rtm const&) = delete;
spinlock_rtm & operator=( spinlock_rtm const&) = delete;
void lock() noexcept {
std::size_t collisions = 0 ;
std::minstd_rand generator;
for ( std::size_t retries = 0; retries < BOOST_FIBERS_RETRY_THRESHOLD; ++retries) {
std::uint32_t status;
if ( rtm_status::success == ( status = rtm_begin() ) ) {
@@ -56,7 +56,7 @@ public:
if ( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD > collisions) {
std::uniform_int_distribution< std::size_t > distribution{
0, static_cast< std::size_t >( 1) << (std::min)(collisions, static_cast< std::size_t >( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)) };
const std::size_t z = distribution( generator);
const std::size_t z = distribution( generator_);
++collisions;
for ( std::size_t i = 0; i < z; ++i) {
cpu_relax();

View File

@@ -31,16 +31,16 @@ private:
friend class spinlock_rtm;
std::atomic< spinlock_status > state_{ spinlock_status::unlocked };
std::minstd_rand generator_{};
public:
spinlock_ttas() noexcept = default;
spinlock_ttas() = default;
spinlock_ttas( spinlock_ttas const&) = delete;
spinlock_ttas & operator=( spinlock_ttas const&) = delete;
void lock() noexcept {
std::size_t collisions = 0 ;
std::minstd_rand generator;
for (;;) {
// avoid using multiple pause instructions for a delay of a specific cycle count
// the delay of cpu_relax() (pause on Intel) depends on the processor family
@@ -88,7 +88,7 @@ public:
// linear_congruential_engine is a random number engine based on Linear congruential generator (LCG)
std::uniform_int_distribution< std::size_t > distribution{
0, static_cast< std::size_t >( 1) << (std::min)(collisions, static_cast< std::size_t >( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)) };
const std::size_t z = distribution( generator);
const std::size_t z = distribution( generator_);
++collisions;
for ( std::size_t i = 0; i < z; ++i) {
// -> reduces the power consumed by the CPU

View File

@@ -32,16 +32,16 @@ private:
std::atomic< spinlock_status > state_{ spinlock_status::unlocked };
std::atomic< std::size_t > retries_{ 0 };
std::minstd_rand generator_{};
public:
spinlock_ttas_adaptive() noexcept = default;
spinlock_ttas_adaptive() = default;
spinlock_ttas_adaptive( spinlock_ttas_adaptive const&) = delete;
spinlock_ttas_adaptive & operator=( spinlock_ttas_adaptive const&) = delete;
void lock() noexcept {
std::size_t collisions = 0 ;
std::minstd_rand generator;
for (;;) {
std::size_t retries = 0;
const std::size_t prev_retries = retries_.load( std::memory_order_relaxed);
@@ -95,7 +95,7 @@ public:
// linear_congruential_engine is a random number engine based on Linear congruential generator (LCG)
std::uniform_int_distribution< std::size_t > distribution{
0, static_cast< std::size_t >( 1) << (std::min)(collisions, static_cast< std::size_t >( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)) };
const std::size_t z = distribution( generator);
const std::size_t z = distribution( generator_);
++collisions;
for ( std::size_t i = 0; i < z; ++i) {
// -> reduces the power consumed by the CPU

View File

@@ -31,9 +31,10 @@ private:
std::atomic< std::int32_t > value_{ 0 };
std::atomic< std::int32_t > retries_{ 0 };
std::minstd_rand generator_{};
public:
spinlock_ttas_adaptive_futex() noexcept = default;
spinlock_ttas_adaptive_futex() = default;
spinlock_ttas_adaptive_futex( spinlock_ttas_adaptive_futex const&) = delete;
spinlock_ttas_adaptive_futex & operator=( spinlock_ttas_adaptive_futex const&) = delete;
@@ -45,7 +46,6 @@ public:
static_cast< std::int32_t >( BOOST_FIBERS_SPIN_BEFORE_SLEEP0), 2 * prev_retries + 10);
const std::int32_t max_sleep_retries = (std::min)(
static_cast< std::int32_t >( BOOST_FIBERS_SPIN_BEFORE_YIELD), 2 * prev_retries + 10);
std::minstd_rand generator;
// after max. spins or collisions suspend via futex
while ( retries++ < BOOST_FIBERS_RETRY_THRESHOLD) {
// avoid using multiple pause instructions for a delay of a specific cycle count
@@ -92,7 +92,7 @@ public:
// linear_congruential_engine is a random number engine based on Linear congruential generator (LCG)
std::uniform_int_distribution< std::int32_t > distribution{
0, static_cast< std::int32_t >( 1) << (std::min)(collisions, static_cast< std::int32_t >( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)) };
const std::int32_t z = distribution( generator);
const std::int32_t z = distribution( generator_);
++collisions;
for ( std::int32_t i = 0; i < z; ++i) {
// -> reduces the power consumed by the CPU

View File

@@ -30,16 +30,16 @@ private:
friend class spinlock_rtm;
std::atomic< std::int32_t > value_{ 0 };
std::minstd_rand generator_{};
public:
spinlock_ttas_futex() noexcept = default;
spinlock_ttas_futex() = default;
spinlock_ttas_futex( spinlock_ttas_futex const&) = delete;
spinlock_ttas_futex & operator=( spinlock_ttas_futex const&) = delete;
void lock() noexcept {
std::int32_t collisions = 0, retries = 0, expected = 0;
std::minstd_rand generator;
// after max. spins or collisions suspend via futex
while ( retries++ < BOOST_FIBERS_RETRY_THRESHOLD) {
// avoid using multiple pause instructions for a delay of a specific cycle count
@@ -86,7 +86,7 @@ public:
// linear_congruential_engine is a random number engine based on Linear congruential generator (LCG)
std::uniform_int_distribution< std::int32_t > distribution{
0, static_cast< std::int32_t >( 1) << (std::min)(collisions, static_cast< std::int32_t >( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)) };
const std::int32_t z = distribution( generator);
const std::int32_t z = distribution( generator_);
++collisions;
for ( std::int32_t i = 0; i < z; ++i) {
// -> reduces the power consumed by the CPU

View File

@@ -22,6 +22,7 @@
#include <boost/fiber/algo/algorithm.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/data.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
@@ -123,7 +124,7 @@ public:
boost::context::continuation dispatch() noexcept;
boost::context::continuation terminate( detail::spinlock_lock *, context *) noexcept;
boost::context::continuation terminate( detail::spinlock_lock &, context *) noexcept;
void yield( context *) noexcept;
@@ -131,10 +132,10 @@ public:
std::chrono::steady_clock::time_point const&) noexcept;
bool wait_until( context *,
std::chrono::steady_clock::time_point const&,
detail::spinlock_lock *) noexcept;
detail::spinlock_lock &) noexcept;
void suspend() noexcept;
void suspend( detail::spinlock_lock *) noexcept;
void suspend( detail::spinlock_lock &) noexcept;
bool has_ready_fibers() const noexcept;

View File

@@ -25,6 +25,7 @@
#include <boost/intrusive/set.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/data.hpp>
#include <boost/fiber/detail/decay_copy.hpp>
#include <boost/fiber/detail/fss.hpp>
#include <boost/fiber/detail/spinlock.hpp>

View File

@@ -149,7 +149,7 @@ public:
active_ctx->schedule( consumer_ctx);
}
// suspend till value has been consumed
active_ctx->suspend( & lk);
active_ctx->suspend( lk);
// resumed, value has been consumed
return channel_op_status::success;
} else {
@@ -162,7 +162,7 @@ public:
}
active_ctx->wait_link( waiting_producers_);
// suspend this producer
active_ctx->suspend( & lk);
active_ctx->suspend( lk);
// resumed, slot mabye free
}
}
@@ -184,7 +184,7 @@ public:
active_ctx->schedule( consumer_ctx);
}
// suspend till value has been consumed
active_ctx->suspend( & lk);
active_ctx->suspend( lk);
// resumed, value has been consumed
return channel_op_status::success;
} else {
@@ -197,7 +197,7 @@ public:
}
active_ctx->wait_link( waiting_producers_);
// suspend this producer
active_ctx->suspend( & lk);
active_ctx->suspend( lk);
// resumed, slot mabye free
}
}
@@ -236,7 +236,7 @@ public:
active_ctx->schedule( consumer_ctx);
}
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, & lk) ) {
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// clear slot
slot * nil_slot = nullptr, * own_slot = & s;
slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
@@ -255,7 +255,7 @@ public:
}
active_ctx->wait_link( waiting_producers_);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, & lk) ) {
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
lk.lock();
// remove from waiting-queue
@@ -286,7 +286,7 @@ public:
active_ctx->schedule( consumer_ctx);
}
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, & lk) ) {
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// clear slot
slot * nil_slot = nullptr, * own_slot = & s;
slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
@@ -305,7 +305,7 @@ public:
}
active_ctx->wait_link( waiting_producers_);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, & lk) ) {
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
lk.lock();
// remove from waiting-queue
@@ -347,7 +347,7 @@ public:
}
active_ctx->wait_link( waiting_consumers_);
// suspend this consumer
active_ctx->suspend( & lk);
active_ctx->suspend( lk);
// resumed, slot mabye set
}
}
@@ -385,7 +385,7 @@ public:
}
active_ctx->wait_link( waiting_consumers_);
// suspend this consumer
active_ctx->suspend( & lk);
active_ctx->suspend( lk);
// resumed, slot mabye set
}
}
@@ -431,7 +431,7 @@ public:
}
active_ctx->wait_link( waiting_consumers_);
// suspend this consumer
if ( ! active_ctx->wait_until( timeout_time, & lk) ) {
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
lk.lock();
// remove from waiting-queue

View File

@@ -25,6 +25,7 @@
#include <boost/fiber/all.hpp>
#include <boost/fiber/numa/topology.hpp>
#include <boost/predef.h>
#include "../barrier.hpp"
@@ -85,7 +86,14 @@ int main() {
barrier b{ hardware_concurrency( topo) };
std::size_t size{ 1000000 };
std::size_t div{ 10 };
// Windows 10 and FreeBSD require a fiber stack of 8kb
// otherwise the stack gets exhausted
// stack requirements must be checked for other OS too
#if BOOST_OS_WINDOWS || BOOST_OS_BSD
allocator_type salloc{ 2*allocator_type::traits_type::page_size() };
#else
allocator_type salloc{ allocator_type::traits_type::page_size() };
#endif
std::uint64_t result{ 0 };
channel_type rc{ 2 };
std::vector< std::thread > threads;

View File

@@ -19,6 +19,7 @@
#include <vector>
#include <boost/fiber/all.hpp>
#include <boost/predef.h>
using allocator_type = boost::fibers::fixedsize_stack;
using channel_type = boost::fibers::buffered_channel< std::uint64_t >;
@@ -51,7 +52,14 @@ int main() {
try {
std::size_t size{ 1000000 };
std::size_t div{ 10 };
// Windows 10 and FreeBSD require a fiber stack of 8kb
// otherwise the stack gets exhausted
// stack requirements must be checked for other OS too
#if BOOST_OS_WINDOWS || BOOST_OS_BSD
allocator_type salloc{ 2*allocator_type::traits_type::page_size() };
#else
allocator_type salloc{ allocator_type::traits_type::page_size() };
#endif
std::uint64_t result{ 0 };
channel_type rc{ 2 };
time_point_type start{ clock_type::now() };

View File

@@ -19,6 +19,7 @@
#include <vector>
#include <boost/fiber/all.hpp>
#include <boost/predef.h>
using allocator_type = boost::fibers::fixedsize_stack;
using channel_type = boost::fibers::buffered_channel< std::uint64_t >;
@@ -55,7 +56,14 @@ int main() {
try {
std::size_t size{ 1000000 };
std::size_t div{ 10 };
// Windows 10 and FreeBSD require a fiber stack of 8kb
// otherwise the stack gets exhausted
// stack requirements must be checked for other OS too
#if BOOST_OS_WINDOWS || BOOST_OS_BSD
allocator_type salloc{ 2*allocator_type::traits_type::page_size() };
#else
allocator_type salloc{ allocator_type::traits_type::page_size() };
#endif
std::uint64_t result{ 0 };
channel_type rc{ 2 };
time_point_type start{ clock_type::now() };

View File

@@ -22,6 +22,7 @@
#include <vector>
#include <boost/fiber/all.hpp>
#include <boost/predef.h>
#include "barrier.hpp"
@@ -78,7 +79,14 @@ int main() {
for ( unsigned int i = 1; i < n; ++i) {
threads.emplace_back( thread, i - 1, & b);
};
// Windows 10 and FreeBSD require a fiber stack of 8kb
// otherwise the stack gets exhausted
// stack requirements must be checked for other OS too
#if BOOST_OS_WINDOWS || BOOST_OS_BSD
allocator_type salloc{ 2*allocator_type::traits_type::page_size() };
#else
allocator_type salloc{ allocator_type::traits_type::page_size() };
#endif
std::uint64_t result{ 0 };
channel_type rc{ 2 };
b.wait();

View File

@@ -22,6 +22,7 @@
#include <vector>
#include <boost/fiber/all.hpp>
#include <boost/predef.h>
#include "barrier.hpp"
@@ -82,7 +83,14 @@ int main() {
for ( unsigned int i = 1; i < n; ++i) {
threads.emplace_back( thread, i - 1, & b);
};
// Windows 10 and FreeBSD require a fiber stack of 8kb
// otherwise the stack gets exhausted
// stack requirements must be checked for other OS too
#if BOOST_OS_WINDOWS || BOOST_OS_BSD
allocator_type salloc{ 2*allocator_type::traits_type::page_size() };
#else
allocator_type salloc{ allocator_type::traits_type::page_size() };
#endif
std::uint64_t result{ 0 };
channel_type rc{ 2 };
b.wait();

View File

@@ -24,6 +24,7 @@
#include <vector>
#include <boost/fiber/all.hpp>
#include <boost/predef.h>
#include "barrier.hpp"
@@ -83,7 +84,14 @@ int main() {
barrier b{ thread_count };
std::size_t size{ 1000000 };
std::size_t div{ 10 };
// Windows 10 and FreeBSD require a fiber stack of 8kb
// otherwise the stack gets exhausted
// stack requirements must be checked for other OS too
#if BOOST_OS_WINDOWS || BOOST_OS_BSD
allocator_type salloc{ 2*allocator_type::traits_type::page_size() };
#else
allocator_type salloc{ allocator_type::traits_type::page_size() };
#endif
std::uint64_t result{ 0 };
channel_type rc{ 2 };
std::vector< std::thread > threads;

View File

@@ -24,6 +24,7 @@
#include <vector>
#include <boost/fiber/all.hpp>
#include <boost/predef.h>
#include "barrier.hpp"
@@ -77,7 +78,14 @@ int main() {
barrier b{ thread_count };
std::size_t size{ 1000000 };
std::size_t div{ 10 };
// Windows 10 and FreeBSD require a fiber stack of 8kb
// otherwise the stack gets exhausted
// stack requirements must be checked for other OS too
#if BOOST_OS_WINDOWS || BOOST_OS_BSD
allocator_type salloc{ 2*allocator_type::traits_type::page_size() };
#else
allocator_type salloc{ allocator_type::traits_type::page_size() };
#endif
std::uint64_t result{ 0 };
channel_type rc{ 2 };
std::vector< std::thread > threads;

View File

@@ -24,6 +24,7 @@
#include <vector>
#include <boost/fiber/all.hpp>
#include <boost/predef.h>
#include "barrier.hpp"
@@ -81,7 +82,14 @@ int main() {
barrier b{ thread_count };
std::size_t size{ 1000000 };
std::size_t div{ 10 };
// Windows 10 and FreeBSD require a fiber stack of 8kb
// otherwise the stack gets exhausted
// stack requirements must be checked for other OS too
#if BOOST_OS_WINDOWS || BOOST_OS_BSD
allocator_type salloc{ 2*allocator_type::traits_type::page_size() };
#else
allocator_type salloc{ allocator_type::traits_type::page_size() };
#endif
std::uint64_t result{ 0 };
channel_type rc{ 2 };
std::vector< std::thread > threads;

View File

@@ -32,19 +32,14 @@ private:
boost::context::continuation
run_( boost::context::continuation && c) noexcept {
c = c.resume();
context * active_ctx = active();
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( nullptr != active_ctx->from_ctx_);
active_ctx->from_ctx_->c_ = std::move( c);
active_ctx->from_ctx_ = nullptr;
if ( nullptr != active_ctx->lk_) {
active_ctx->lk_->unlock();
active_ctx->lk_ = nullptr;
}
if ( nullptr != active_ctx->ready_ctx_) {
active_ctx->schedule( active_ctx->ready_ctx_);
active_ctx->ready_ctx_ = nullptr;
}
detail::data_t * dp = c.get_data< detail::data_t * >();
// update contiunation of calling fiber
dp->from->c_ = std::move( c);
if ( nullptr != dp->lk) {
dp->lk->unlock();
} else if ( nullptr != dp->ctx) {
active()->schedule( dp->ctx);
}
// execute scheduler::dispatch()
return get_scheduler()->dispatch();
}
@@ -61,11 +56,9 @@ public:
static intrusive_ptr< context > make_dispatcher_context() {
default_stack salloc; // use default satck-size
auto sctx = salloc.allocate();
BOOST_ASSERT( ( sizeof( dispatcher_context) + 2048) < sctx.size); // stack at least of 2kB
const std::size_t offset = sizeof( dispatcher_context) + 63;
// reserve space for control structure
void * storage = reinterpret_cast< void * >(
( reinterpret_cast< uintptr_t >( sctx.sp) - static_cast< uintptr_t >( offset) )
( reinterpret_cast< uintptr_t >( sctx.sp) - static_cast< uintptr_t >( sizeof( dispatcher_context) ) )
& ~ static_cast< uintptr_t >( 0xff) );
void * stack_bottom = reinterpret_cast< void * >(
reinterpret_cast< uintptr_t >( sctx.sp) - static_cast< uintptr_t >( sctx.size) );
@@ -124,24 +117,16 @@ context::reset_active() noexcept {
}
void
context::resume_() noexcept {
context * prev = this;
// context_initializer::active_ will point to `this`
// prev will point to previous active context
std::swap( context_initializer::active_, prev);
boost::context::continuation c = c_.resume();
context * active_ctx = active();
BOOST_ASSERT( nullptr != active_ctx);
BOOST_ASSERT( nullptr != active_ctx->from_ctx_);
active_ctx->from_ctx_->c_ = std::move( c);
active_ctx->from_ctx_ = nullptr;
if ( nullptr != active_ctx->lk_) {
active_ctx->lk_->unlock();
active_ctx->lk_ = nullptr;
}
if ( nullptr != active_ctx->ready_ctx_) {
active_ctx->schedule( active_ctx->ready_ctx_);
active_ctx->ready_ctx_ = nullptr;
context::resume_( detail::data_t & d) noexcept {
boost::context::continuation c = c_.resume( & d);
detail::data_t * dp = c.get_data< detail::data_t * >();
if ( nullptr != dp) {
dp->from->c_ = std::move( c);
if ( nullptr != dp->lk) {
dp->lk->unlock();
} else if ( nullptr != dp->ctx) {
active()->schedule( dp->ctx);
}
}
}
@@ -175,22 +160,32 @@ context::get_id() const noexcept {
void
context::resume() noexcept {
from_ctx_ = active();
resume_();
context * prev = this;
// context_initializer::active_ will point to `this`
// prev will point to previous active context
std::swap( context_initializer::active_, prev);
detail::data_t d{ prev };
resume_( d);
}
void
context::resume( detail::spinlock_lock * lk) noexcept {
from_ctx_ = active();
lk_ = lk;
resume_();
context::resume( detail::spinlock_lock & lk) noexcept {
context * prev = this;
// context_initializer::active_ will point to `this`
// prev will point to previous active context
std::swap( context_initializer::active_, prev);
detail::data_t d{ & lk, prev };
resume_( d);
}
void
context::resume( context * ready_ctx) noexcept {
from_ctx_ = active();
ready_ctx_ = ready_ctx;
resume_();
context * prev = this;
// context_initializer::active_ will point to `this`
// prev will point to previous active context
std::swap( context_initializer::active_, prev);
detail::data_t d{ ready_ctx, prev };
resume_( d);
}
void
@@ -199,7 +194,7 @@ context::suspend() noexcept {
}
void
context::suspend( detail::spinlock_lock * lk) noexcept {
context::suspend( detail::spinlock_lock & lk) noexcept {
get_scheduler()->suspend( lk);
}
@@ -216,7 +211,7 @@ context::join() {
// the active context
active_ctx->wait_link( wait_queue_);
// suspend active context
active_ctx->get_scheduler()->suspend( & lk);
active_ctx->get_scheduler()->suspend( lk);
// active context resumed
BOOST_ASSERT( context::active() == active_ctx);
}
@@ -230,13 +225,13 @@ context::yield() noexcept {
boost::context::continuation
context::suspend_with_cc() noexcept {
from_ctx_ = active();
context * prev = this;
// context_initializer::active_ will point to `this`
// prev will point to previous active context
std::swap( context_initializer::active_, prev);
detail::data_t d{ prev };
// context switch
return c_.resume();
return c_.resume( & d);
}
boost::context::continuation
@@ -260,7 +255,7 @@ context::terminate() noexcept {
}
fss_data_.clear();
// switch to another context
return get_scheduler()->terminate( & lk, this);
return get_scheduler()->terminate( lk, this);
}
bool
@@ -272,7 +267,7 @@ context::wait_until( std::chrono::steady_clock::time_point const& tp) noexcept {
bool
context::wait_until( std::chrono::steady_clock::time_point const& tp,
detail::spinlock_lock * lk) noexcept {
detail::spinlock_lock & lk) noexcept {
BOOST_ASSERT( nullptr != get_scheduler() );
BOOST_ASSERT( this == active() );
return get_scheduler()->wait_until( this, tp, lk);

View File

@@ -37,7 +37,7 @@ mutex::lock() {
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
active_ctx->wait_link( wait_queue_);
// suspend this fiber
active_ctx->suspend( & lk);
active_ctx->suspend( lk);
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
}
}

View File

@@ -36,7 +36,7 @@ recursive_mutex::lock() {
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
active_ctx->wait_link( wait_queue_);
// suspend this fiber
active_ctx->suspend( & lk);
active_ctx->suspend( lk);
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
}
}

View File

@@ -39,7 +39,7 @@ recursive_timed_mutex::try_lock_until_( std::chrono::steady_clock::time_point co
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
active_ctx->wait_link( wait_queue_);
// suspend this fiber until notified or timed-out
if ( ! active_ctx->wait_until( timeout_time, & lk) ) {
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// remove fiber from wait-queue
lk.lock();
wait_queue_.remove( * active_ctx);
@@ -66,7 +66,7 @@ recursive_timed_mutex::lock() {
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
active_ctx->wait_link( wait_queue_);
// suspend this fiber
active_ctx->suspend( & lk);
active_ctx->suspend( lk);
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
}
}

View File

@@ -239,7 +239,7 @@ scheduler::schedule_from_remote( context * ctx) noexcept {
#endif
boost::context::continuation
scheduler::terminate( detail::spinlock_lock * lk, context * ctx) noexcept {
scheduler::terminate( detail::spinlock_lock & lk, context * ctx) noexcept {
BOOST_ASSERT( nullptr != ctx);
BOOST_ASSERT( context::active() == ctx);
BOOST_ASSERT( this == ctx->get_scheduler() );
@@ -260,7 +260,7 @@ scheduler::terminate( detail::spinlock_lock * lk, context * ctx) noexcept {
// remove from the worker-queue
ctx->worker_unlink();
// release lock
lk->unlock();
lk.unlock();
// resume another fiber
return algo_->pick_next()->suspend_with_cc();
}
@@ -306,7 +306,7 @@ scheduler::wait_until( context * ctx,
bool
scheduler::wait_until( context * ctx,
std::chrono::steady_clock::time_point const& sleep_tp,
detail::spinlock_lock * lk) noexcept {
detail::spinlock_lock & lk) noexcept {
BOOST_ASSERT( nullptr != ctx);
BOOST_ASSERT( context::active() == ctx);
BOOST_ASSERT( ctx->is_context( type::worker_context) || ctx->is_context( type::main_context) );
@@ -335,7 +335,7 @@ scheduler::suspend() noexcept {
}
void
scheduler::suspend( detail::spinlock_lock * lk) noexcept {
scheduler::suspend( detail::spinlock_lock & lk) noexcept {
// resume another context
algo_->pick_next()->resume( lk);
}

View File

@@ -35,7 +35,7 @@ timed_mutex::try_lock_until_( std::chrono::steady_clock::time_point const& timeo
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
active_ctx->wait_link( wait_queue_);
// suspend this fiber until notified or timed-out
if ( ! active_ctx->wait_until( timeout_time, & lk) ) {
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// remove fiber from wait-queue
lk.lock();
wait_queue_.remove( * active_ctx);
@@ -62,7 +62,7 @@ timed_mutex::lock() {
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
active_ctx->wait_link( wait_queue_);
// suspend this fiber
active_ctx->suspend( & lk);
active_ctx->suspend( lk);
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
}
}

View File

@@ -28,7 +28,7 @@
typedef boost::chrono::milliseconds ms;
boost::atomic< int > value;
boost::atomic< int > value1;
void wait_fn( boost::barrier & b,
boost::fibers::mutex & mtx,
@@ -37,7 +37,7 @@ void wait_fn( boost::barrier & b,
b.wait();
std::unique_lock< boost::fibers::mutex > lk( mtx);
cond.wait( lk, [&flag](){ return flag; });
++value;
++value1;
}
void notify_one_fn( boost::barrier & b,
@@ -106,11 +106,11 @@ void test_one_waiter_notify_one() {
boost::barrier b( 2);
bool flag = false;
value = 0;
value1 = 0;
boost::fibers::mutex mtx;
boost::fibers::condition_variable cond;
BOOST_CHECK( 0 == value);
BOOST_CHECK( 0 == value1);
boost::thread t1(std::bind( fn1, std::ref( b), std::ref( mtx), std::ref( cond), std::ref( flag) ) );
boost::thread t2(std::bind( fn2, std::ref( b), std::ref( mtx), std::ref( cond), std::ref( flag) ) );
@@ -118,7 +118,7 @@ void test_one_waiter_notify_one() {
t1.join();
t2.join();
BOOST_CHECK( 1 == value);
BOOST_CHECK( 1 == value1);
}
}
@@ -127,11 +127,11 @@ void test_two_waiter_notify_all() {
boost::barrier b( 3);
bool flag = false;
value = 0;
value1 = 0;
boost::fibers::mutex mtx;
boost::fibers::condition_variable cond;
BOOST_CHECK( 0 == value);
BOOST_CHECK( 0 == value1);
boost::thread t1(std::bind( fn1, std::ref( b), std::ref( mtx), std::ref( cond), std::ref( flag) ) );
boost::thread t2(std::bind( fn1, std::ref( b), std::ref( mtx), std::ref( cond), std::ref( flag) ) );
@@ -141,7 +141,7 @@ void test_two_waiter_notify_all() {
t2.join();
t3.join();
BOOST_CHECK( 2 == value);
BOOST_CHECK( 2 == value1);
}
}

View File

@@ -28,7 +28,7 @@
typedef boost::chrono::milliseconds ms;
boost::atomic< int > value;
boost::atomic< int > value1;
void wait_fn( boost::barrier & b,
boost::fibers::mutex & mtx,
@@ -37,7 +37,7 @@ void wait_fn( boost::barrier & b,
b.wait();
std::unique_lock< boost::fibers::mutex > lk( mtx);
cond.wait( lk, [&flag](){ return flag; });
++value;
++value1;
}
void notify_one_fn( boost::barrier & b,
@@ -106,11 +106,11 @@ void test_one_waiter_notify_one() {
boost::barrier b( 2);
bool flag = false;
value = 0;
value1 = 0;
boost::fibers::mutex mtx;
boost::fibers::condition_variable cond;
BOOST_CHECK( 0 == value);
BOOST_CHECK( 0 == value1);
boost::thread t1(std::bind( fn1, std::ref( b), std::ref( mtx), std::ref( cond), std::ref( flag) ) );
boost::thread t2(std::bind( fn2, std::ref( b), std::ref( mtx), std::ref( cond), std::ref( flag) ) );
@@ -118,7 +118,7 @@ void test_one_waiter_notify_one() {
t1.join();
t2.join();
BOOST_CHECK( 1 == value);
BOOST_CHECK( 1 == value1);
}
}
@@ -127,11 +127,11 @@ void test_two_waiter_notify_all() {
boost::barrier b( 3);
bool flag = false;
value = 0;
value1 = 0;
boost::fibers::mutex mtx;
boost::fibers::condition_variable cond;
BOOST_CHECK( 0 == value);
BOOST_CHECK( 0 == value1);
boost::thread t1(std::bind( fn1, std::ref( b), std::ref( mtx), std::ref( cond), std::ref( flag) ) );
boost::thread t2(std::bind( fn1, std::ref( b), std::ref( mtx), std::ref( cond), std::ref( flag) ) );
@@ -141,7 +141,7 @@ void test_two_waiter_notify_all() {
t2.join();
t3.join();
BOOST_CHECK( 2 == value);
BOOST_CHECK( 2 == value1);
}
}

View File

@@ -279,8 +279,7 @@ void test_join_bind() {
value1 = 3;
value2 = str;
},
std::placeholders::_1,
std::placeholders::_2
std::placeholders::_1
),
std::ref( abc) );
f.join();

View File

@@ -279,8 +279,7 @@ void test_join_bind() {
value1 = 3;
value2 = str;
},
std::placeholders::_1,
std::placeholders::_2
std::placeholders::_1
),
std::ref( abc) );
f.join();