2
0
mirror of https://github.com/boostorg/build.git synced 2026-01-19 04:02:14 +00:00

WIP: Working parallel grep using std::thread.

This commit is contained in:
Rene Rivera
2023-01-10 08:56:41 -06:00
parent 5625a195af
commit 39609a0aba
7 changed files with 375 additions and 53 deletions

View File

@@ -12,15 +12,11 @@ if --grep in [ modules.peek : ARGV ]
import path ;
import regex ;
local mmm = [ regex.grep
/home/grafik/Sync/DevRoots/B2/bfg-mainline/src/engine : *.h
: "#(include) \"([^\"]+)\"" : 1 2 ] ;
ECHO "====" ;
while $(mmm[1])
{
ECHO $(mmm[1-3]) ;
mmm = $(mmm[4-]) ;
}
ECHO "====" ;
# /home/grafik/Sync/DevRoots/B2/bfg-mainline/src : bi*.h
/home/grafik/Sync/DevRoots/Boost/develop/boost : *.*pp
: "#(include) \"([^\"]+)\"" "#(include) <([^>]+)>"
: 1 2
: recursive ] ;
EXIT "!" ;
}

View File

@@ -67,4 +67,10 @@ typedef long int32_t;
#define B2_NOEXCEPT noexcept
#endif
// Indicate if we can use std::thread and friends.
#ifndef B2_USE_STD_THREADS
#define B2_USE_STD_THREADS 1
#endif
#endif

View File

@@ -1,5 +1,5 @@
/*
Copyright 2019-2022 René Ferdinand Rivera Morell
Copyright 2019-2023 René Ferdinand Rivera Morell
Distributed under the Boost Software License, Version 1.0.
(See accompanying file LICENSE.txt or https://www.bfgroup.xyz/b2/LICENSE.txt)
*/
@@ -206,12 +206,16 @@ struct regex_grep_task
{
list_cref file_glob_patterns;
std::vector<b2::regex::program> text_grep_prog;
std::unique_ptr<task_group> grep_tasks;
std::shared_ptr<b2::task::group> grep_tasks;
flags<std::uint16_t> text_grep_result_expressions;
std::vector<std::unique_ptr<std::vector<std::string>>> intermediate;
mutex_t mx;
bool recursive_glob = false;
regex_grep_task(
list_cref files, list_cref patterns, flags<std::uint16_t> expressions)
regex_grep_task(list_cref files,
list_cref patterns,
flags<std::uint16_t> expressions,
list_cref options)
: file_glob_patterns(files)
, text_grep_result_expressions(expressions)
{
@@ -220,7 +224,12 @@ struct regex_grep_task
{
text_grep_prog.emplace_back(p->str());
}
grep_tasks = task_executor::get().make();
for (auto option : options)
{
std::string opt = option->str();
if (opt == "recursive") recursive_glob = true;
}
grep_tasks = b2::task::executor::get().make();
}
static void dirscan_callback(regex_grep_task * self,
@@ -240,6 +249,18 @@ struct regex_grep_task
std::string filename(filepathparts.base() + filepathparts.suffix());
// Ignore meta-dir paths.
if (filename == "." || filename == "..") return;
// If indicated, recurse scan subdirectories.
if (recursive_glob)
{
if (file_is_file(file) == 0)
{
file_dirscan(file,
reinterpret_cast<scanback>(
&regex_grep_task::dirscan_callback),
this);
return;
}
}
// Match the full `path` to the set of glob patterns we are looking for.
for (auto glob_pattern : file_glob_patterns)
{
@@ -257,9 +278,6 @@ struct regex_grep_task
// WARNING: We need to avoid Jam operations in this. As we are getting
// called from different threads. And the Jam memory is not thread-safe.
// out_printf(">> b2::regex_grep_task::file_grep(%s)\n",
// filepath.c_str());
// The match results are tuples of filepath+expressions. Collect all
// those tuples for this file here.
std::unique_ptr<std::vector<std::string>> result(
@@ -275,8 +293,8 @@ struct regex_grep_task
auto grep_i = prog.search(filedata.begin(), filedata.end());
for (; grep_i; ++grep_i)
{
// We need to add the file to the result (which is followed
// by the match expressions).
// We need to add the file to the result (which is
// followed by the match expressions).
result->push_back(filepath);
for (int i = 0; i < text_grep_result_expressions.size; ++i)
{
@@ -284,7 +302,6 @@ struct regex_grep_task
{
std::string m(grep_i[i].str, grep_i[i].size);
result->push_back(m);
// out_printf(" : %s\n", m.c_str());
}
}
}
@@ -292,14 +309,13 @@ struct regex_grep_task
}
// Append this file's results to the global results.
// TODO: Thread locking on the append as this will be a multi-thread op.
intermediate.push_back(std::move(result));
scope_lock_t lock(mx);
intermediate.emplace_back(result.release());
}
void wait()
{
grep_tasks->wait();
out_printf(">> b2::regex_grep_task::wait()\n");
}
};
@@ -308,7 +324,8 @@ struct regex_grep_task
list_ref b2::regex_grep(list_cref directories,
list_cref files,
list_cref patterns,
list_cref result_expressions)
list_cref result_expressions,
list_cref options)
{
// For the glob we always do a case insensitive compare. So we need
// the globs as lower-case.
@@ -336,7 +353,7 @@ list_ref b2::regex_grep(list_cref directories,
}
}
regex_grep_task task(list_cref(*globs), patterns, sub_expr);
regex_grep_task task(list_cref(*globs), patterns, sub_expr, options);
for (auto dir : directories)
{
file_dirscan(dir,

View File

@@ -1,5 +1,5 @@
/*
Copyright 2019-2022 René Ferdinand Rivera Morell
Copyright 2019-2023 René Ferdinand Rivera Morell
Distributed under the Boost Software License, Version 1.0.
(See accompanying file LICENSE.txt or https://www.bfgroup.xyz/b2/LICENSE.txt)
*/
@@ -191,8 +191,10 @@ list_ref regex_replace_each(
====
[horizontal]
Jam:: `rule grep ( directories + : files + : patterns + : result_expressions *
)` {CPP}:: `b2::list_ref regex_grep(b2::list_cref directories, b2::list_cref
files, b2::list_cref patterns, list_cref result_expressions);`
: options * )`
{CPP}:: `b2::list_ref regex_grep(b2::list_cref directories, b2::list_cref
files, b2::list_cref patterns, list_cref result_expressions,
list_cref options);`
====
Match any of the `patterns` against the globbed `files` in `directories`, and
@@ -204,7 +206,8 @@ end::reference[] */
list_ref regex_grep(list_cref directories,
list_cref files,
list_cref patterns,
list_cref result_expressions);
list_cref result_expressions,
list_cref options);
struct regex_module : b2::bind::module_<regex_module>
{
@@ -229,7 +232,7 @@ struct regex_module : b2::bind::module_<regex_module>
"list" * _n | "match" * _1 | "replacement" * _1)
.def(&regex_grep, "grep",
"directories" * _1n | "files" * _1n | "patterns" * _1n
| "result_expressions" * _n);
| "result_expressions" * _n | "options" * _n);
binder.eval(init_code);
binder.loaded();
}

View File

@@ -1,29 +1,274 @@
/*
Copyright 2022 René Ferdinand Rivera Morell
Copyright 2022-2023 René Ferdinand Rivera Morell
Distributed under the Boost Software License, Version 1.0.
(See accompanying file LICENSE.txt or https://www.bfgroup.xyz/b2/LICENSE.txt)
*/
#include "tasks.h"
#include "config.h"
#include <memory>
#include "mod_sysinfo.h"
b2::task_group::task_group() {}
#include <vector>
#if B2_USE_STD_THREADS
#include <condition_variable>
#include <mutex>
#include <thread>
#endif
void b2::task_group::queue(const std::function<void()> & f) { f(); }
namespace b2 { namespace task {
void b2::task_group::wait() {}
b2::task_executor::task_executor(unsigned parallelism /* = 0 */) {}
std::unique_ptr<b2::task_group> b2::task_executor::make(
unsigned parallelism /* = 0 */)
/*
A group of tasks that run in parallel within a limit of parallelism. The
parallelism limit is enforced by only dequeuing calls when possible.
*/
struct group::implementation
{
return std::unique_ptr<b2::task_group>(new b2::task_group);
executor & exec;
unsigned parallelism = 0;
unsigned running = 0;
std::vector<std::function<void()>> pending;
#if B2_USE_STD_THREADS
std::mutex mx;
#endif // B2_USE_STD_THREADS
inline implementation(executor & e, unsigned p)
: exec(e)
, parallelism(p)
{}
// Add a task call to run.
inline void call_queue(std::function<void()> f);
// Remove and return a call to run. If no call is available or possible,
// an empty function is returned.
inline std::function<void()> call_dequeue();
// Wait for all queued task calls to complete.
inline void wait();
};
/*
A global executor of tasks from a collection of task groups. There is an overall
parallelism limit that follows the general Jam (-j) limit by default. Tasks from
groups are run within this limit regardless of what the group limits are. That
is, it will run the minimum of the execution limit and the group limits.
*/
struct executor::implementation
{
// Task groups registered through push_group(). call_get() polls them in
// order when looking for a call to run.
std::vector<std::shared_ptr<group>> groups;
// Number of task calls handed out by call_get() that have not yet been
// reported as finished through call_done().
unsigned call_count = 0;
// Incremented once per runner thread launched by the constructor. stop()
// resets it to zero, which makes is_running() return false.
unsigned running_count = 0;
#if B2_USE_STD_THREADS
// Locked by the member functions while they read or modify the fields of
// this struct.
std::mutex mx;
// The pool of threads, each executing runner().
std::vector<std::thread> runners;
// Runner threads block on this in call_get() when there is nothing to run.
std::condition_variable call_cv;
#endif // B2_USE_STD_THREADS
// Construction starts the task calling threads within the parallelism
// limit.
inline implementation(unsigned parallelism = 0);
// Add a group of tasks to run.
inline void push_group(std::shared_ptr<group> g);
// Signal that new tasks are available, and hence that the task threads
// should run them.
inline void call_signal();
// Get a task call to execute from the collective set of group tasks.
// Returns an empty function when no call could be obtained.
inline std::function<void()> call_get();
// Indicate that a task call completed.
inline void call_done();
// Should we keep running (true), or complete ASAP (false).
inline bool is_running();
// Stop the threads in the execution pool regardless of pending tasks.
inline void stop();
// Waits for task calls to execute. This is the body of each thread.
void runner();
};
#if B2_USE_STD_THREADS
inline void group::implementation::call_queue(std::function<void()> f)
{
// We wrap the bare task function to track how many are running at any time.
// The running count is used in the wait and to enforce the group
// parallelism limit.
auto the_call = [this, f]() {
std::unique_lock<std::mutex> lock(mx);
running += 1;
lock.unlock();
f();
lock.lock();
running -= 1;
};
std::unique_lock<std::mutex> lock(mx);
pending.push_back(the_call);
lock.unlock();
// Signal the executor that there are tasks to run.
exec.i->call_signal();
}
b2::task_executor & b2::task_executor::get()
inline std::function<void()> group::implementation::call_dequeue()
{
static task_executor e;
std::unique_lock<std::mutex> lock(mx);
std::function<void()> result;
// We only return tasks when we have them, and when we have enough
// parallelism.
if (!pending.empty() && running < parallelism)
{
result = pending.back();
pending.pop_back();
}
return result;
}
// Block the caller until the group has neither pending nor running calls.
// This polls, yielding the thread between checks.
inline void group::implementation::wait()
{
    for (;;)
    {
        bool idle = false;
        {
            // Only hold the mutex for the duration of the check.
            std::unique_lock<std::mutex> lock(mx);
            // Done when nothing is queued and nothing is executing.
            idle = pending.empty() && running == 0;
        }
        if (idle) return;
        std::this_thread::yield();
    }
}
// Start `parallelism` runner threads. The executor mutex is held for the whole
// launch loop.
inline executor::implementation::implementation(unsigned parallelism)
{
    std::unique_lock<std::mutex> lock(mx);
    // One allocation for the whole pool.
    runners.reserve(parallelism);
    for (unsigned launched = 0; launched < parallelism; ++launched)
    {
        // Count the thread before it exists so that is_running() holds for it.
        ++running_count;
        runners.emplace_back([this]() { runner(); });
    }
}
// Register a task group so that call_get() considers its pending calls.
inline void executor::implementation::push_group(std::shared_ptr<group> g)
{
    const std::lock_guard<std::mutex> guard(mx);
    groups.emplace_back(g);
}
inline void executor::implementation::call_signal() { call_cv.notify_one(); }
inline std::function<void()> executor::implementation::call_get()
{
std::function<void()> result;
std::unique_lock<std::mutex> lock(mx);
// We only dequeue task calls when we have a thread to run them.
if (call_count < runners.size())
{
for (auto & group : groups)
{
result = group->i->call_dequeue();
if (result)
{
call_count += 1;
return result;
}
}
}
// We don't have tasks to run, wait for some to become available.
call_cv.wait(lock);
return result;
}
// Record that a call previously handed out by call_get() has finished.
inline void executor::implementation::call_done()
{
    const std::lock_guard<std::mutex> guard(mx);
    --call_count;
}
// True while the runner threads should keep looping, false once stop() has
// reset the running count.
inline bool executor::implementation::is_running()
{
    const std::lock_guard<std::mutex> guard(mx);
    const bool keep_going = running_count != 0;
    return keep_going;
}
// Ask every runner thread to finish and join them all. Calls still pending in
// the groups are not executed by this function.
inline void executor::implementation::stop()
{
// The threads are moved out of `runners` so they can be joined without
// holding the mutex, which the runner threads also lock.
std::vector<std::thread> to_join;
{
std::unique_lock<std::mutex> lock(mx);
// Makes is_running() return false from now on.
running_count = 0;
to_join.swap(runners);
}
// Wake the threads blocked in call_get() so they can observe the stop.
call_cv.notify_all();
for (auto & t : to_join)
{
t.join();
}
}
#endif // B2_USE_STD_THREADS
// Body of each pool thread: keep fetching and executing task calls until the
// executor is told to stop.
void executor::implementation::runner()
{
    while (is_running())
    {
        auto f = call_get();
        // An empty function means there was nothing to run. Go around and
        // re-check whether we should still be running.
        if (f)
        {
            // A failing task must not take the thread (or the process) down.
            // Exceptions are swallowed on a best-effort basis. Types not
            // derived from std::exception are included, because an exception
            // escaping a thread entry point ends in std::terminate.
            try
            {
                f();
            }
            catch (const std::exception &)
            {}
            catch (...)
            {}
            // Always balance the count taken in call_get().
            call_done();
        }
    }
}
namespace {
// Resolve a requested parallelism: a non-zero request is used as given, and
// zero means "use the CPU thread count reported by system_info".
unsigned get_parallelism(unsigned parallelism)
{
    if (parallelism != 0) return parallelism;
    return system_info().cpu_thread_count();
}
} // namespace
// Create a group bound to `exec` with the given parallelism limit. The
// implementation constructor stores both values.
group::group(executor & exec, unsigned parallelism /* = 0 */)
    : i(std::make_shared<implementation>(exec, parallelism))
{}
group::~group() {}
void group::queue(std::function<void()> && f) { i->call_queue(f); }
void group::wait() { i->wait(); }
executor::executor(unsigned parallelism /* = 0 */)
: i(std::make_shared<implementation>(get_parallelism(parallelism)))
{}
executor::~executor() { i->stop(); }
// Create a task group attached to this executor and register it so that its
// calls get picked up by the runner threads. A `parallelism` of zero is
// resolved through get_parallelism(). The executor keeps its own shared
// reference to the group in addition to the one returned.
// NOTE(review): nothing in the visible code removes a group from the executor
// again, so groups appear to live as long as the executor -- confirm.
std::shared_ptr<group> executor::make(unsigned parallelism /* = 0 */)
{
auto result = std::make_shared<group>(*this, get_parallelism(parallelism));
i->push_group(result);
return result;
}
executor & executor::get()
{
static executor e;
return e;
}
}} // namespace b2::task

View File

@@ -1,5 +1,5 @@
/*
Copyright 2022 René Ferdinand Rivera Morell
Copyright 2022-2023 René Ferdinand Rivera Morell
Distributed under the Boost Software License, Version 1.0.
(See accompanying file LICENSE.txt or https://www.bfgroup.xyz/b2/LICENSE.txt)
*/
@@ -14,24 +14,67 @@ Distributed under the Boost Software License, Version 1.0.
#include <functional>
#include <memory>
namespace b2 {
/*
= Tasks
class task_group
Utility classes for parallel invocation of "tasks" (i.e. functions).
*/
namespace b2 { namespace task {
class executor;
/*
== `b2::task::group`
A task group is a collection of tasks that will execute within a parallelism
limit of their own and can be waited on collectively to complete.
*/
class group
{
public:
task_group();
void queue(const std::function<void()> & f);
group(executor & exec, unsigned parallelism = 0);
~group();
// Add a task function to execute async. The functions are executed in no
// particular order.
void queue(std::function<void()> && f);
// Wait for all the task functions in the group to complete.
void wait();
private:
struct implementation;
std::shared_ptr<implementation> i;
friend class executor;
};
class task_executor
/*
== `b2::task::executor`
Global task execution queue that has a global parallelism limit. By default the
parallelism limit matches the `b2::system_info::cpu_thread_count` value.
*/
class executor
{
public:
task_executor(unsigned parallelism = 0);
std::unique_ptr<task_group> make(unsigned parallelism = 0);
static task_executor & get();
executor(unsigned parallelism = 0);
~executor();
// The global executor instance.
static executor & get();
// Create a task group.
std::shared_ptr<group> make(unsigned parallelism = 0);
private:
struct implementation;
std::shared_ptr<implementation> i;
friend class group;
};
} // namespace b2
}} // namespace b2::task
#endif

View File

@@ -6,13 +6,25 @@
#ifndef B2_TYPES_H
#define B2_TYPES_H
#include "config.h"
#include <string>
#if B2_USE_STD_THREADS
#include <mutex>
#endif
namespace b2
{
// Basic vocabulary types used across the engine.
using string_t = std::string;
using int_t = int;
using uint_t = unsigned int;
#if B2_USE_STD_THREADS
// Real locking primitives when std::thread and friends are available.
using mutex_t = std::mutex;
using scope_lock_t = std::unique_lock<std::mutex>;
#else
// Without thread support nothing runs concurrently, so locking is a no-op.
// These stand-ins keep code that declares a `mutex_t` or takes a
// `scope_lock_t` compiling in that configuration.
struct mutex_t
{
    void lock() {}
    void unlock() {}
    bool try_lock() { return true; }
};
struct scope_lock_t
{
    explicit scope_lock_t(mutex_t &) {}
    void lock() {}
    void unlock() {}
};
#endif
} // namespace b2
#endif