2
0
mirror of https://github.com/boostorg/fiber.git synced 2026-02-02 08:52:07 +00:00

use fifo as waiting-queue

This commit is contained in:
Oliver Kowalke
2014-03-16 13:33:07 +01:00
parent 12505646ae
commit 6edddcbabc
6 changed files with 12049 additions and 25 deletions

View File

@@ -0,0 +1,176 @@
// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#ifndef BOOST_FIBERS_DETAIL_FIFO_H
#define BOOST_FIBERS_DETAIL_FIFO_H
#include <cstddef>
#include <boost/assert.hpp>
#include <boost/config.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/utility.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/worker_fiber.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {
namespace detail {
class fifo : private noncopyable
{
public:
typedef worker_fiber::ptr_t ptr_t;
fifo() BOOST_NOEXCEPT :
head_(),
tail_()
{}
bool empty() const BOOST_NOEXCEPT
{ return 0 == head_.get(); }
std::size_t size() const BOOST_NOEXCEPT
{
std::size_t counter = 0;
for ( ptr_t x = head_; x; x = x->next() )
++counter;
return counter;
}
void push( ptr_t const& item) BOOST_NOEXCEPT
{
BOOST_ASSERT( item);
if ( empty() )
{
head_ = tail_ = item;
return;
}
tail_->next( item);
tail_ = tail_->next();
}
ptr_t head() const BOOST_NOEXCEPT
{ return head_; }
void top( ptr_t const& item) BOOST_NOEXCEPT
{ head_ = item; }
ptr_t tail() const BOOST_NOEXCEPT
{ return tail_; }
void tail( ptr_t const& item) BOOST_NOEXCEPT
{ tail_ = item; }
ptr_t pop() BOOST_NOEXCEPT
{
BOOST_ASSERT( ! empty() );
ptr_t item = head_;
head_ = head_->next();
if ( ! head_)
tail_ = head_;
else
item->next_reset();
return item;
}
ptr_t find( ptr_t const& item) BOOST_NOEXCEPT
{
BOOST_ASSERT( item);
for ( ptr_t x = head_; x; x = x->next() )
if ( item == x) return x;
return ptr_t();
}
void erase( ptr_t const& item) BOOST_NOEXCEPT
{
BOOST_ASSERT( item);
BOOST_ASSERT( ! empty() );
if ( item == head_)
{
pop();
return;
}
for ( ptr_t x = head_; x; x = x->next() )
{
ptr_t nxt = x->next();
if ( ! nxt) return;
if ( item == nxt)
{
if ( tail_ == nxt) tail_ = x;
x->next( nxt->next() );
nxt->next_reset();
return;
}
}
}
template< typename Queue, typename Fn >
void move_to( Queue & queue, Fn fn)
{
for ( ptr_t f = head_, prev = head_; f; )
{
ptr_t nxt = f->next();
if ( fn( f) )
{
if ( f == head_)
{
head_ = nxt;
if ( ! head_)
tail_ = head_;
else
{
head_ = nxt;
prev = nxt;
}
}
else
{
if ( ! nxt)
tail_ = prev;
prev->next( nxt);
}
f->next_reset();
queue.push_back( f);
}
else
prev = f;
f = nxt;
}
}
void swap( fifo & other)
{
head_.swap( other.head_);
tail_.swap( other.tail_);
}
private:
ptr_t head_;
ptr_t tail_;
};
}}}
# if defined(BOOST_MSVC)
# pragma warning(pop)
# endif
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#endif // BOOST_FIBERS_DETAIL_FIFO_H

View File

@@ -83,6 +83,8 @@ private:
> coro_t;
fss_data_t fss_data_;
ptr_t nxt_;
clock_type::time_point tp_;
void trampoline_( coro_t::yield_type &);
@@ -210,6 +212,24 @@ public:
BOOST_ASSERT( is_running() ); // set by the scheduler-algorithm
}
ptr_t const& next() const
{ return nxt_; }
void next( ptr_t const& nxt)
{ nxt_ = nxt; }
void next_reset()
{ nxt_.reset(); }
clock_type::time_point const& time_point() const
{ return tp_; }
void time_point( clock_type::time_point const& tp)
{ tp_ = tp; }
void time_point_reset()
{ tp_ = (clock_type::time_point::max)(); }
};
}}}

View File

@@ -21,10 +21,11 @@
#include <boost/fiber/algorithm.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/worker_fiber.hpp>
#include <boost/fiber/detail/main_fiber.hpp>
#include <boost/fiber/detail/fiber_base.hpp>
#include <boost/fiber/detail/fifo.hpp>
#include <boost/fiber/detail/main_fiber.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/detail/worker_fiber.hpp>
#include <boost/fiber/fiber.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
@@ -42,6 +43,7 @@ namespace fibers {
class BOOST_FIBERS_DECL round_robin : public algorithm
{
private:
#if 0
struct schedulable
{
detail::worker_fiber::ptr_t f;
@@ -68,8 +70,9 @@ private:
iterator begin() { return c.begin(); }
iterator end() { return c.end(); }
};
#endif
//typedef std::deque< schedulable > wqueue_t;
typedef detail::fifo wqueue_t;
typedef std::deque< detail::worker_fiber::ptr_t > rqueue_t;
detail::worker_fiber::ptr_t active_fiber_;

File diff suppressed because it is too large Load Diff

View File

@@ -60,6 +60,8 @@ worker_fiber::trampoline_( coro_t::yield_type & yield)
worker_fiber::worker_fiber( attributes const& attrs) :
fiber_base(),
fss_data_(),
nxt_(),
tp_( (clock_type::time_point::max)() ),
callee_( 0),
caller_(
boost::bind( & worker_fiber::trampoline_, this, _1),

View File

@@ -28,6 +28,18 @@
namespace boost {
namespace fibers {
bool fetch_ready( detail::worker_fiber::ptr_t & f)
{
BOOST_ASSERT( ! f->is_running() );
BOOST_ASSERT( ! f->is_terminated() );
// set fiber to state_ready if dead-line was reached
// set fiber to state_ready if interruption was requested
if ( f->time_point() <= clock_type::now() || f->interruption_requested() )
f->set_ready();
return f->is_ready();
}
detail::worker_fiber::ptr_t
round_robin::pick_next_()
{
@@ -99,25 +111,7 @@ round_robin::run()
{
// move all fibers witch are ready (state_ready)
// from waiting-queue to the runnable-queue
wqueue_t wqueue;
for ( wqueue_t::iterator it = wqueue_.begin(), end = wqueue_.end(); it != end; ++it )
{
detail::worker_fiber::ptr_t f( it->f);
BOOST_ASSERT( ! f->is_running() );
BOOST_ASSERT( ! f->is_terminated() );
// set fiber to state_ready if dead-line was reached
// set fiber to state_ready if interruption was requested
if ( it->tp <= clock_type::now() || f->interruption_requested() )
f->set_ready();
if ( f->is_ready() )
rqueue_.push_back( f);
else wqueue.push( * it);
}
// exchange local with global waiting queue
std::swap( wqueue_, wqueue);
wqueue_.move_to( rqueue_, fetch_ready);
// pop new fiber from ready-queue which is not complete
// (example: fiber in ready-queue could be canceled by active-fiber)
@@ -155,7 +149,8 @@ round_robin::wait_until( clock_type::time_point const& timeout_time,
// release lock
lk.unlock();
// push active fiber to wqueue_
wqueue_.push( schedulable( active_fiber_, timeout_time) );
active_fiber_->time_point( timeout_time);
wqueue_.push( active_fiber_);
// run next fiber
run();
@@ -171,7 +166,7 @@ round_robin::yield()
// set active fiber to state_waiting
active_fiber_->set_ready();
// push active fiber to wqueue_
wqueue_.push( schedulable( active_fiber_) );
wqueue_.push( active_fiber_);
// run next fiber
run();
}
@@ -187,7 +182,7 @@ round_robin::join( detail::worker_fiber::ptr_t const& f)
// set active fiber to state_waiting
active_fiber_->set_waiting();
// push active fiber to wqueue_
wqueue_.push( schedulable( active_fiber_) );
wqueue_.push( active_fiber_);
// add active fiber to joinig-list of f
if ( ! f->join( active_fiber_) )
// f must be already terminated therefore we set