#pragma once

/*
A simple C++11 Thread Pool implementation (https://github.com/progschj/ThreadPool)
modified by bab2min to have additional parameter threadId
*/

#include <algorithm>
#include <condition_variable>
#include <cstddef>
#include <exception>
#include <functional>
#include <future>
#include <iterator>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

namespace kiwi
{
	namespace utils
	{
		/// Fixed-size pool of worker threads that execute queued tasks.
		///
		/// Every task receives the index of the worker running it (0 .. size()-1)
		/// as its first argument, which lets callers keep per-thread scratch state.
		class ThreadPool
		{
		public:
			/// @param threads   Number of worker threads to start. With 0 no task is ever executed.
			/// @param maxQueued Upper bound on pending tasks; 0 means unbounded. When the bound is
			///                  reached, enqueue() blocks until a worker takes a task.
			ThreadPool(size_t threads = 0, size_t maxQueued = 0);

			/// Stops the pool (see joinAll()).
			~ThreadPool();

			ThreadPool(const ThreadPool&) = delete;
			ThreadPool& operator=(const ThreadPool&) = delete;

			/// Queues `f(threadId, args...)` for execution on a worker.
			///
			/// @return A future holding the result (or the exception) of the call.
			/// @throws std::runtime_error if the pool has been stopped.
			template<class F, class... Args>
			auto enqueue(F&& f, Args&&... args)
				-> std::future<typename std::invoke_result<F, size_t, Args...>::type>;

			/// Number of worker threads the pool was created with.
			size_t size() const { return workers.size(); }

			/// Number of tasks waiting in the queue (tasks currently running are not counted).
			size_t numEnqueued() const
			{
				// The queue is mutated by workers and producers, so read it under the lock.
				std::lock_guard<std::mutex> lock(queue_mutex);
				return tasks.size();
			}

			/// Marks the pool as stopped, lets the workers drain the remaining tasks and joins them.
			/// Calling it again is a no-op.
			void joinAll();

		private:
			std::vector<std::thread> workers;
			std::queue<std::function<void(size_t)>> tasks;

			mutable std::mutex queue_mutex;           // guards tasks and stop
			std::condition_variable condition;        // signalled when a task arrives or the pool stops
			std::condition_variable inputCnd;         // signalled when room appears in a bounded queue
			bool stop = false;
			size_t maxQueued;
		};

		inline ThreadPool::ThreadPool(size_t threads, size_t _maxQueued)
			: maxQueued(_maxQueued)
		{
			workers.reserve(threads);
			try
			{
				for (size_t i = 0; i < threads; ++i)
				{
					workers.emplace_back([this, i]
					{
						for (;;)
						{
							std::function<void(size_t)> task;
							{
								std::unique_lock<std::mutex> lock(this->queue_mutex);
								this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
								// Only exit once the queue is drained, so tasks queued before the stop still run.
								if (this->stop && this->tasks.empty()) return;
								task = std::move(this->tasks.front());
								this->tasks.pop();
								if (this->maxQueued) this->inputCnd.notify_all();
							}
							task(i);
						}
					});
				}
			}
			catch (...)
			{
				// Starting a thread failed: shut down the ones already running, otherwise
				// destroying the joinable std::thread objects would call std::terminate.
				joinAll();
				throw;
			}
		}

		template<class F, class... Args>
		auto ThreadPool::enqueue(F&& f, Args&&... args)
			-> std::future<typename std::invoke_result<F, size_t, Args...>::type>
		{
			using return_type = typename std::invoke_result<F, size_t, Args...>::type;

			// packaged_task is move-only while std::function needs a copyable callable,
			// hence the shared_ptr indirection.
			auto task = std::make_shared<std::packaged_task<return_type(size_t)>>(
				std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Args>(args)...));

			std::future<return_type> res = task->get_future();
			{
				std::unique_lock<std::mutex> lock(queue_mutex);
				if (maxQueued)
				{
					// Also wake up on stop: the workers may already be gone and
					// a task queued now would never run.
					inputCnd.wait(lock, [this] { return stop || tasks.size() < maxQueued; });
				}

				// don't allow enqueueing after stopping the pool
				if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");

				tasks.emplace([task](size_t id) { (*task)(id); });
			}
			condition.notify_one();
			return res;
		}

		inline void ThreadPool::joinAll()
		{
			{
				std::lock_guard<std::mutex> lock(queue_mutex);
				if (stop) return;
				stop = true;
			}
			condition.notify_all();
			inputCnd.notify_all(); // release producers blocked on a full queue
			for (std::thread& worker : workers)
			{
				if (worker.joinable()) worker.join();
			}
		}

		inline ThreadPool::~ThreadPool()
		{
			joinAll();
		}

		/// Calls `fn(threadId, item)` for every item in [first, last).
		///
		/// Without a pool (null pointer, or a pool with no workers) the items are processed
		/// sequentially on the calling thread with threadId 0. Otherwise the range is split into
		/// at most pool->size() contiguous chunks which run concurrently; the call returns once
		/// all chunks have finished. If any invocation throws, the first exception (in chunk
		/// order) is rethrown after every chunk has completed.
		///
		/// InputIt must allow multiple passes over the range (it is measured first, then traversed).
		template<class InputIt, class Fn>
		void forEach(ThreadPool* pool, InputIt first, InputIt last, Fn fn)
		{
			if (!pool || pool->size() == 0)
			{
				for (; first != last; ++first)
				{
					fn(0, *first);
				}
				return;
			}

			const size_t numItems = static_cast<size_t>(std::distance(first, last));
			const size_t numWorkers = std::min(pool->size(), numItems);
			std::vector<std::future<void>> futures;
			futures.reserve(numWorkers);

			try
			{
				for (size_t i = 0; i < numWorkers; ++i)
				{
					// Chunk sizes differ by at most one item and add up to numItems.
					InputIt mid = first;
					std::advance(mid, (numItems * (i + 1) / numWorkers) - (numItems * i / numWorkers));
					futures.emplace_back(pool->enqueue([&fn](size_t tid, InputIt tFirst, InputIt tLast)
					{
						for (; tFirst != tLast; ++tFirst)
						{
							fn(tid, *tFirst);
						}
					}, first, mid));
					first = mid;
				}
			}
			catch (...)
			{
				// Tasks already queued reference `fn`; they must finish before it goes out of scope.
				for (auto& f : futures) f.wait();
				throw;
			}

			// Collect every result before propagating an error, for the same lifetime reason.
			std::exception_ptr firstError;
			for (auto& f : futures)
			{
				try
				{
					f.get();
				}
				catch (...)
				{
					if (!firstError) firstError = std::current_exception();
				}
			}
			if (firstError) std::rethrow_exception(firstError);
		}

		/// Convenience overload: applies forEach to the whole container.
		template<class Container, class Fn>
		void forEach(ThreadPool* pool, Container& cont, Fn fn)
		{
			forEach(pool, std::begin(cont), std::end(cont), std::move(fn));
		}
	}
}