/* * Copyright (C) Nemirtingas * This file is part of System. * * System is free software; you can redistribute it * and/or modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 3 of the License, or (at your option) any later version. * * System is distributed in the hope that it will be * useful, but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with the System; if not, see * . */ #pragma once #include #include #include #include #include #include #include #include #include #include namespace System { class ThreadPool { using task_t = std::function; std::atomic _StopWorkers; std::atomic _ActiveCount; std::condition_variable _WorkerNotifier; std::mutex _Mutex; std::vector _Workers; std::queue _Tasks; public: explicit ThreadPool(): _ActiveCount(0) { } ~ThreadPool() { Join(); } ThreadPool(ThreadPool const &) = delete; ThreadPool(ThreadPool&&) = default; ThreadPool&operator=(ThreadPool const &) = delete; ThreadPool&operator=(ThreadPool&&) = default; template auto Push(Func &&fn, Args &&...args) { using return_type = typename std::result_of::type; auto task{ std::make_shared>( std::bind(std::forward(fn), std::forward(args)...) ) }; auto future{ task->get_future() }; { std::lock_guard lock(_Mutex); _Tasks.emplace([task]() { (*task)(); }); } _WorkerNotifier.notify_one(); return future; } // Remove all pending tasks from the queue void Clear() { std::lock_guard lock(_Mutex); _Tasks = {}; } // Stops all previous and creates new worker threads. void Start(std::size_t worker_count = std::thread::hardware_concurrency()) { Join(); _StopWorkers = false; for (std::size_t i = 0; i < worker_count; ++i) _Workers.emplace_back(std::bind(&ThreadPool::_WorkerLoop, this)); } // Wait all workers to finish void Join() { _StopWorkers = true; _WorkerNotifier.notify_all(); for (auto &thread : _Workers) { if (thread.joinable()) thread.join(); } _Workers.clear(); } std::size_t WorkerCount() const { return _Workers.size(); } // Get the number of active workers std::size_t ActiveCount() const { return _ActiveCount; } private: void _WorkerLoop() { while (true) { auto task{ _NextTask() }; if (task) { ++_ActiveCount; task(); --_ActiveCount; } else if (_StopWorkers) { break; } } } task_t _NextTask() { std::unique_lock lock{ _Mutex }; _WorkerNotifier.wait(lock, [this]() { return !_Tasks.empty() || _StopWorkers; }); if (_Tasks.empty()) return {}; auto task{ _Tasks.front() }; _Tasks.pop(); return task; } }; }