diff --git a/Jamroot.jam b/Jamroot.jam index 3588703d9..9a4444a5f 100644 --- a/Jamroot.jam +++ b/Jamroot.jam @@ -12,15 +12,11 @@ if --grep in [ modules.peek : ARGV ] import path ; import regex ; local mmm = [ regex.grep - /home/grafik/Sync/DevRoots/B2/bfg-mainline/src/engine : *.h - : "#(include) \"([^\"]+)\"" : 1 2 ] ; - ECHO "====" ; - while $(mmm[1]) - { - ECHO $(mmm[1-3]) ; - mmm = $(mmm[4-]) ; - } - ECHO "====" ; + # /home/grafik/Sync/DevRoots/B2/bfg-mainline/src : bi*.h + /home/grafik/Sync/DevRoots/Boost/develop/boost : *.*pp + : "#(include) \"([^\"]+)\"" "#(include) <([^>]+)>" + : 1 2 + : recursive ] ; EXIT "!" ; } diff --git a/src/engine/config.h b/src/engine/config.h index c4f216b12..d51671285 100644 --- a/src/engine/config.h +++ b/src/engine/config.h @@ -67,4 +67,10 @@ typedef long int32_t; #define B2_NOEXCEPT noexcept #endif +// Indicate if we can use std::thread and friends. +#ifndef B2_USE_STD_THREADS +#define B2_USE_STD_THREADS 1 +#endif + + #endif diff --git a/src/engine/mod_regex.cpp b/src/engine/mod_regex.cpp index d7454d50d..f644973e8 100644 --- a/src/engine/mod_regex.cpp +++ b/src/engine/mod_regex.cpp @@ -1,5 +1,5 @@ /* -Copyright 2019-2022 René Ferdinand Rivera Morell +Copyright 2019-2023 René Ferdinand Rivera Morell Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE.txt or https://www.bfgroup.xyz/b2/LICENSE.txt) */ @@ -206,12 +206,16 @@ struct regex_grep_task { list_cref file_glob_patterns; std::vector text_grep_prog; - std::unique_ptr grep_tasks; + std::shared_ptr grep_tasks; flags text_grep_result_expressions; std::vector>> intermediate; + mutex_t mx; + bool recursive_glob = false; - regex_grep_task( - list_cref files, list_cref patterns, flags expressions) + regex_grep_task(list_cref files, + list_cref patterns, + flags expressions, + list_cref options) : file_glob_patterns(files) , text_grep_result_expressions(expressions) { @@ -220,7 +224,12 @@ struct regex_grep_task { text_grep_prog.emplace_back(p->str()); } - grep_tasks = task_executor::get().make(); + for (auto option : options) + { + std::string opt = option->str(); + if (opt == "recursive") recursive_glob = true; + } + grep_tasks = b2::task::executor::get().make(); } static void dirscan_callback(regex_grep_task * self, @@ -240,6 +249,18 @@ struct regex_grep_task std::string filename(filepathparts.base() + filepathparts.suffix()); // Ignore meta-dir paths. if (filename == "." || filename == "..") return; + // If indicated, recurse scan subdirectories. + if (recursive_glob) + { + if (file_is_file(file) == 0) + { + file_dirscan(file, + reinterpret_cast( + ®ex_grep_task::dirscan_callback), + this); + return; + } + } // Match the full `path` to the set of glob patterns we are looking for. for (auto glob_pattern : file_glob_patterns) { @@ -257,9 +278,6 @@ struct regex_grep_task // WARNING: We need to avoid Jam operations in this. As we are getting // called from different threads. And the Jam memory is not thread-safe. - // out_printf(">> b2::regex_grep_task::file_grep(%s)\n", - // filepath.c_str()); - // The match results are tuples of filepath+expressions. Collect all // those tuples for this file here. std::unique_ptr> result( @@ -275,8 +293,8 @@ struct regex_grep_task auto grep_i = prog.search(filedata.begin(), filedata.end()); for (; grep_i; ++grep_i) { - // We need to add the file to the result (which is followed - // by the match expressions). + // We need to add the file to the result (which is + // followed by the match expressions). result->push_back(filepath); for (int i = 0; i < text_grep_result_expressions.size; ++i) { @@ -284,7 +302,6 @@ struct regex_grep_task { std::string m(grep_i[i].str, grep_i[i].size); result->push_back(m); - // out_printf(" : %s\n", m.c_str()); } } } @@ -292,14 +309,13 @@ struct regex_grep_task } // Append this file's results to the global results. - // TODO: Thread locking on the append as this will be a multi-thread op. - intermediate.push_back(std::move(result)); + scope_lock_t lock(mx); + intermediate.emplace_back(result.release()); } void wait() { grep_tasks->wait(); - out_printf(">> b2::regex_grep_task::wait()\n"); } }; @@ -308,7 +324,8 @@ struct regex_grep_task list_ref b2::regex_grep(list_cref directories, list_cref files, list_cref patterns, - list_cref result_expressions) + list_cref result_expressions, + list_cref options) { // For the glob we always do a case insensitive compare. So we need // the globs as lower-case. @@ -336,7 +353,7 @@ list_ref b2::regex_grep(list_cref directories, } } - regex_grep_task task(list_cref(*globs), patterns, sub_expr); + regex_grep_task task(list_cref(*globs), patterns, sub_expr, options); for (auto dir : directories) { file_dirscan(dir, diff --git a/src/engine/mod_regex.h b/src/engine/mod_regex.h index fc8c42876..ad3bf4124 100644 --- a/src/engine/mod_regex.h +++ b/src/engine/mod_regex.h @@ -1,5 +1,5 @@ /* -Copyright 2019-2022 René Ferdinand Rivera Morell +Copyright 2019-2023 René Ferdinand Rivera Morell Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE.txt or https://www.bfgroup.xyz/b2/LICENSE.txt) */ @@ -191,8 +191,10 @@ list_ref regex_replace_each( ==== [horizontal] Jam:: `rule grep ( directories + : files + : patterns + : result_expressions * -)` {CPP}:: `b2::list_ref regex_grep(b2::list_cref directories, b2::list_cref -files, b2::list_cref patterns, list_cref result_expressions);` +: options * )` +{CPP}:: `b2::list_ref regex_grep(b2::list_cref directories, b2::list_cref +files, b2::list_cref patterns, list_cref result_expressions, +list_cref options);` ==== Match any of the `patterns` against the globbed `files` in `directories`, and @@ -204,7 +206,8 @@ end::reference[] */ list_ref regex_grep(list_cref directories, list_cref files, list_cref patterns, - list_cref result_expressions); + list_cref result_expressions, + list_cref options); struct regex_module : b2::bind::module_ { @@ -229,7 +232,7 @@ struct regex_module : b2::bind::module_ "list" * _n | "match" * _1 | "replacement" * _1) .def(®ex_grep, "grep", "directories" * _1n | "files" * _1n | "patterns" * _1n - | "result_expressions" * _n); + | "result_expressions" * _n | "options" * _n); binder.eval(init_code); binder.loaded(); } diff --git a/src/engine/tasks.cpp b/src/engine/tasks.cpp index 69dc38d81..4ef93b9e4 100644 --- a/src/engine/tasks.cpp +++ b/src/engine/tasks.cpp @@ -1,29 +1,274 @@ /* -Copyright 2022 René Ferdinand Rivera Morell +Copyright 2022-2023 René Ferdinand Rivera Morell Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE.txt or https://www.bfgroup.xyz/b2/LICENSE.txt) */ #include "tasks.h" +#include "config.h" -#include +#include "mod_sysinfo.h" -b2::task_group::task_group() {} +#include +#if B2_USE_STD_THREADS +#include +#include +#include +#endif -void b2::task_group::queue(const std::function & f) { f(); } +namespace b2 { namespace task { -void b2::task_group::wait() {} - -b2::task_executor::task_executor(unsigned parallelism /* = 0 */) {} - -std::unique_ptr b2::task_executor::make( - unsigned parallelism /* = 0 */) +/* +A group of tasks that run in parallel within a limit of parallelism. The +parallelism limit is enforced by only dequeuing calls when possible. +*/ +struct group::implementation { - return std::unique_ptr(new b2::task_group); + executor & exec; + unsigned parallelism = 0; + unsigned running = 0; + std::vector> pending; + +#if B2_USE_STD_THREADS + std::mutex mx; +#endif // B2_USE_STD_THREADS + + inline implementation(executor & e, unsigned p) + : exec(e) + , parallelism(p) + {} + + // Add a task call to run. + inline void call_queue(std::function f); + + // Remove and return a call ro run. If no call is available or possible + // and empty function is returned. + inline std::function call_dequeue(); + + // Wait for all queued task calls to complete. + inline void wait(); +}; + +/* +A global executor of tasks from a collection of task groups. There is an overall +parallelism limit that follow the general Jam (-j) limit by default. Tasks from +groups are run within this limit regardless of what the group limits are. That +is, it will run the minimum of the execution limit and the group limits. +*/ +struct executor::implementation +{ + std::vector> groups; + unsigned call_count = 0; + unsigned running_count = 0; + +#if B2_USE_STD_THREADS + std::mutex mx; + std::vector runners; + std::condition_variable call_cv; +#endif // B2_USE_STD_THREADS + + // Construction starts the task calling threads within the parallelism + // limit. + inline implementation(unsigned parallelism = 0); + + // Add a group of tasks to run. + inline void push_group(std::shared_ptr g); + + // Signal that new tasks are available, and hence that the task threads + // should run them. + inline void call_signal(); + + // Get a task call to execute from the collective set of group tasks. + inline std::function call_get(); + + // Indicate that an task call completed. + inline void call_done(); + + // Should we keep running (true), or complete ASAP (false). + inline bool is_running(); + + // Stop the threads in the execution pool regardless of pending tasks. + inline void stop(); + + // Waits for task calls to execute. This is teh body of each thread. + void runner(); +}; + +#if B2_USE_STD_THREADS + +inline void group::implementation::call_queue(std::function f) +{ + // We wrap the bare task function to track how many are running at any time. + // The running count is used in the wait and to enforce the group + // parallelism limit. + auto the_call = [this, f]() { + std::unique_lock lock(mx); + running += 1; + lock.unlock(); + f(); + lock.lock(); + running -= 1; + }; + std::unique_lock lock(mx); + pending.push_back(the_call); + lock.unlock(); + // Signal the executor that there are tasks to run. + exec.i->call_signal(); } -b2::task_executor & b2::task_executor::get() +inline std::function group::implementation::call_dequeue() { - static task_executor e; + std::unique_lock lock(mx); + std::function result; + // We only return tasks when we have them, and when we have enough + // parallelism. + if (!pending.empty() && running < parallelism) + { + result = pending.back(); + pending.pop_back(); + } + return result; +} + +inline void group::implementation::wait() +{ + while (true) + { + { + std::unique_lock lock(mx); + // We need to wait until we have nothing to run and are not running + // anything. + if (pending.empty() && running == 0) return; + } + std::this_thread::yield(); + } +} + +inline executor::implementation::implementation(unsigned parallelism) +{ + std::unique_lock lock(mx); + // Launch the threads to cover the expected parallelism. + runners.reserve(parallelism); + for (; parallelism > 0; --parallelism) + { + running_count += 1; + runners.emplace_back([this]() { runner(); }); + } +} + +inline void executor::implementation::push_group(std::shared_ptr g) +{ + std::unique_lock lock(mx); + groups.push_back(g); +} + +inline void executor::implementation::call_signal() { call_cv.notify_one(); } + +inline std::function executor::implementation::call_get() +{ + std::function result; + std::unique_lock lock(mx); + // We only dequeue task calls when we have a thread to run them. + if (call_count < runners.size()) + { + for (auto & group : groups) + { + result = group->i->call_dequeue(); + if (result) + { + call_count += 1; + return result; + } + } + } + // We don't have tasks to run, wait for some to become available. + call_cv.wait(lock); + return result; +} + +inline void executor::implementation::call_done() +{ + std::unique_lock lock(mx); + call_count -= 1; +} + +inline bool executor::implementation::is_running() +{ + std::unique_lock lock(mx); + return running_count > 0; +} + +inline void executor::implementation::stop() +{ + std::vector to_join; + { + std::unique_lock lock(mx); + running_count = 0; + to_join.swap(runners); + } + call_cv.notify_all(); + for (auto & t : to_join) + { + t.join(); + } +} + +#endif // B2_USE_STD_THREADS + +void executor::implementation::runner() +{ + while (is_running()) + { + auto f = call_get(); + if (f) + { + try + { + f(); + } + catch (const std::exception &) + {} + call_done(); + } + } +} + +namespace { +unsigned get_parallelism(unsigned parallelism) +{ + return parallelism == 0 ? system_info().cpu_thread_count() : parallelism; +} +} // namespace + +group::group(executor & exec, unsigned parallelism /* = 0 */) + : i(std::make_shared(exec, parallelism)) +{ + i->parallelism = parallelism; +} + +group::~group() {} + +void group::queue(std::function && f) { i->call_queue(f); } + +void group::wait() { i->wait(); } + +executor::executor(unsigned parallelism /* = 0 */) + : i(std::make_shared(get_parallelism(parallelism))) +{} + +executor::~executor() { i->stop(); } + +std::shared_ptr executor::make(unsigned parallelism /* = 0 */) +{ + auto result = std::make_shared(*this, get_parallelism(parallelism)); + i->push_group(result); + return result; +} + +executor & executor::get() +{ + static executor e; return e; } + +}} // namespace b2::task diff --git a/src/engine/tasks.h b/src/engine/tasks.h index 36cb5739a..d0974b01e 100644 --- a/src/engine/tasks.h +++ b/src/engine/tasks.h @@ -1,5 +1,5 @@ /* -Copyright 2022 René Ferdinand Rivera Morell +Copyright 2022-2023 René Ferdinand Rivera Morell Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE.txt or https://www.bfgroup.xyz/b2/LICENSE.txt) */ @@ -14,24 +14,67 @@ Distributed under the Boost Software License, Version 1.0. #include #include -namespace b2 { +/* += Tasks -class task_group +Utility classes for parallel invocation of "tasks" (i.e. functions). +*/ + +namespace b2 { namespace task { + +class executor; + +/* +== `b2::task::group` + +A task group is a collection of tasks that will execute within a parallelism +limit of their own and can be waited on collectively to complete. +*/ +class group { public: - task_group(); - void queue(const std::function & f); + group(executor & exec, unsigned parallelism = 0); + ~group(); + + // Add a task function to execute async. The functions are executed in no + // particular order. + void queue(std::function && f); + + // Wait for all the task functions in the group to complete. void wait(); + + private: + struct implementation; + std::shared_ptr i; + + friend class executor; }; -class task_executor +/* +== `b2::task::executor` + +Global task execution queue that has a global parallelism limit. By default the +parallelism limit matches the `b2::system_info::cpu_thread_count` value. +*/ +class executor { public: - task_executor(unsigned parallelism = 0); - std::unique_ptr make(unsigned parallelism = 0); - static task_executor & get(); + executor(unsigned parallelism = 0); + ~executor(); + + // The global executor instance. + static executor & get(); + + // Create a task group. + std::shared_ptr make(unsigned parallelism = 0); + + private: + struct implementation; + std::shared_ptr i; + + friend class group; }; -} // namespace b2 +}} // namespace b2::task #endif diff --git a/src/engine/types.h b/src/engine/types.h index ac32a186f..c4f395b8b 100644 --- a/src/engine/types.h +++ b/src/engine/types.h @@ -6,13 +6,25 @@ #ifndef B2_TYPES_H #define B2_TYPES_H +#include "config.h" + #include +#if B2_USE_STD_THREADS +#include +#endif + namespace b2 { using string_t = std::string; using int_t = int; using uint_t = unsigned int; + +#if B2_USE_STD_THREADS +using mutex_t = std::mutex; +using scope_lock_t = std::unique_lock; +#endif + } // namespace b2 #endif