2
0
mirror of https://github.com/boostorg/build.git synced 2026-01-19 04:02:14 +00:00

Tweak tasks to avoid busy wait for group and to obey -j limit.

This commit is contained in:
Rene Rivera
2023-01-16 23:22:50 -06:00
parent 80ba6ff692
commit 416d556b41
3 changed files with 117 additions and 41 deletions

View File

@@ -12,8 +12,11 @@ if --grep in [ modules.peek : ARGV ]
import path ;
import regex ;
local mmm = [ regex.grep
# /home/grafik/Sync/DevRoots/B2/bfg-mainline/src : bi*.h
/home/grafik/Sync/DevRoots/Boost/develop/boost : *.*pp
/home/grafik/Sync/DevRoots
# /home/grafik/Sync/DevRoots/B2/bfg-mainline/src
# /home/grafik/Sync/DevRoots/Boost/develop/boost
# /home/grafik/Sync/DevRoots/grafikrobot/llvm-project
: *.*pp *.h *.cpp
: "#(include) \"([^\"]+)\"" "#(include) <([^>]+)>"
: 1 2
: recursive ] ;

View File

@@ -7,10 +7,14 @@ Distributed under the Boost Software License, Version 1.0.
#include "tasks.h"
#include "config.h"
#include "jam.h"
#include "mod_sysinfo.h"
#include "output.h"
#include <queue>
#include <vector>
#if B2_USE_STD_THREADS
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
@@ -18,6 +22,48 @@ Distributed under the Boost Software License, Version 1.0.
namespace b2 { namespace task {
/*
One-shot rendezvous between a single waiter and a single signaler. The waiter
calls wait() to block until the signaler calls signal(). signal() is a no-op
(returning false) until the waiter has announced itself, so callers are
expected to retry signal() until it returns true.
*/
struct sync
{
	inline sync();
	inline void wait();
	inline bool signal();
#if B2_USE_STD_THREADS
	std::mutex arrived_mx;
	std::condition_variable arrived_cv;
	std::atomic_bool wait_arrived;
	std::atomic_bool signal_arrived;
#endif
};
#if B2_USE_STD_THREADS
inline sync::sync()
	: wait_arrived(false)
	, signal_arrived(false)
{}
inline void sync::wait()
{
	std::unique_lock<std::mutex> lock(arrived_mx);
	// Indicate that we are waiting. Done under the lock so the flag and the
	// subsequent block in the condition wait are ordered with signal().
	wait_arrived = true;
	// Wait for the signal that we can proceed.
	arrived_cv.wait(lock, [this]() { return signal_arrived.load(); });
}
inline bool sync::signal()
{
	// Wait for wait() to get called.
	if (!wait_arrived.load()) return false;
	{
		// Set the release flag while holding the mutex. Setting it unlocked
		// allows a lost wakeup: the notify can land between the waiter's
		// predicate check and its block inside cv.wait, deadlocking wait().
		std::lock_guard<std::mutex> lock(arrived_mx);
		signal_arrived = true;
	}
	// Tell the waiter that we arrived.
	arrived_cv.notify_one();
	return true;
}
#endif
/*
A group of tasks that run in parallel within a limit of parallelism. The
parallelism limit is enforced by only dequeuing calls when possible.
@@ -27,10 +73,13 @@ struct group::implementation
executor & exec;
unsigned parallelism = 0;
unsigned running = 0;
std::vector<std::function<void()>> pending;
std::queue<std::function<void()>> pending;
unsigned stat_max_running = 0;
unsigned stat_total = 0;
#if B2_USE_STD_THREADS
std::mutex mx;
sync finished;
#endif // B2_USE_STD_THREADS
inline implementation(executor & e, unsigned p)
@@ -69,7 +118,7 @@ struct executor::implementation
// Construction starts the task calling threads within the parallelism
// limit.
inline implementation(unsigned parallelism = 0);
inline implementation(unsigned parallelism);
// Add a group of tasks to run.
inline void push_group(std::shared_ptr<group> g);
@@ -98,56 +147,75 @@ struct executor::implementation
inline void group::implementation::call_queue(std::function<void()> f)
{
// If we don't have a parallelism allotment, we opt to execute the call inline.
if (parallelism == 0)
{
f();
return;
}
// We wrap the bare task function to track how many are running at any time.
// The running count is used in the wait and to enforce the group
// parallelism limit.
auto the_call = [this, f]() {
std::unique_lock<std::mutex> lock(mx);
running += 1;
lock.unlock();
{
std::unique_lock<std::mutex> lock(mx);
running += 1;
stat_max_running = std::max(running, stat_max_running);
stat_total += 1;
}
f();
lock.lock();
running -= 1;
bool signal_finished = false;
{
std::unique_lock<std::mutex> lock(mx);
running -= 1;
signal_finished = pending.empty() && running == 0;
}
if (signal_finished) finished.signal();
};
std::unique_lock<std::mutex> lock(mx);
pending.push_back(the_call);
lock.unlock();
{
std::unique_lock<std::mutex> lock(mx);
pending.push(std::move(the_call));
}
// Signal the executor that there are tasks to run.
exec.i->call_signal();
}
inline std::function<void()> group::implementation::call_dequeue()
{
std::unique_lock<std::mutex> lock(mx);
bool signal_finished = false;
std::function<void()> result;
// We only return tasks when we have them, and when we have enough
// parallelism.
if (!pending.empty() && running < parallelism)
{
result = pending.back();
pending.pop_back();
std::unique_lock<std::mutex> lock(mx);
signal_finished = pending.empty() && running == 0;
// We only return tasks when we have them, and when we have enough
// parallelism.
if (!pending.empty() && running < parallelism)
{
result = std::move(pending.front());
pending.pop();
}
}
if (signal_finished) finished.signal();
return result;
}
inline void group::implementation::wait()
{
while (true)
{
{
std::unique_lock<std::mutex> lock(mx);
// We need to wait until we have nothing to run and are not running
// anything.
if (pending.empty() && running == 0) return;
}
std::this_thread::yield();
}
// Without parallelism there is nothing to wait for, as everything was
// already executed inline.
if (parallelism == 0) return;
// Signal the tasks that we are waiting for completion.
exec.i->call_signal();
// Wait for completion of the group tasks.
finished.wait();
}
inline executor::implementation::implementation(unsigned parallelism)
{
std::unique_lock<std::mutex> lock(mx);
// No need to launch anything if we aren't parallel.
if (parallelism == 0) return;
// Launch the threads to cover the expected parallelism.
std::unique_lock<std::mutex> lock(mx);
runners.reserve(parallelism);
for (; parallelism > 0; --parallelism)
{
@@ -200,6 +268,7 @@ inline bool executor::implementation::is_running()
inline void executor::implementation::stop()
{
// Stop all the runner threads (i.e. signal and wait).
std::vector<std::thread> to_join;
{
std::unique_lock<std::mutex> lock(mx);
@@ -234,31 +303,35 @@ void executor::implementation::runner()
}
namespace {
unsigned get_parallelism(unsigned parallelism)
unsigned get_parallelism(int parallelism)
{
return parallelism == 0 ? system_info().cpu_thread_count() : parallelism;
return parallelism >= 0
? parallelism
: std::min(unsigned(globs.jobs), system_info().cpu_thread_count()) - 1;
}
} // namespace
group::group(executor & exec, unsigned parallelism /* = 0 */)
group::group(executor & exec, int parallelism)
: i(std::make_shared<implementation>(exec, parallelism))
{
i->parallelism = parallelism;
}
{}
group::~group() {}
group::~group()
{
out_printf("b2::task::group.. MAX = %u, TOTAL = %u\n", i->stat_max_running,
i->stat_total);
}
void group::queue(std::function<void()> && f) { i->call_queue(f); }
void group::wait() { i->wait(); }
executor::executor(unsigned parallelism /* = 0 */)
executor::executor(int parallelism)
: i(std::make_shared<implementation>(get_parallelism(parallelism)))
{}
executor::~executor() { i->stop(); }
std::shared_ptr<group> executor::make(unsigned parallelism /* = 0 */)
std::shared_ptr<group> executor::make(int parallelism)
{
auto result = std::make_shared<group>(*this, get_parallelism(parallelism));
i->push_group(result);

View File

@@ -33,7 +33,7 @@ limit of their own and can be waited on collectively to complete.
class group
{
public:
group(executor & exec, unsigned parallelism = 0);
group(executor & exec, int parallelism = -1);
~group();
// Add a task function to execute async. The functions are executed in no
@@ -59,14 +59,14 @@ parallelism limit matches the `b2::system_info::cpu_thread_count` value.
class executor
{
public:
executor(unsigned parallelism = 0);
executor(int parallelism = -1);
~executor();
// The global executor instance.
static executor & get();
// Create a task group.
std::shared_ptr<group> make(unsigned parallelism = 0);
std::shared_ptr<group> make(int parallelism = -1);
private:
struct implementation;