Added ThreadPool class

This commit is contained in:
Daniel Wolf 2016-06-26 14:02:17 +02:00
parent 0aeb35c42e
commit 84097756c8
3 changed files with 138 additions and 0 deletions

View File

@ -222,6 +222,7 @@ set(SOURCE_FILES
src/g2p.cpp src/g2p.h src/g2p.cpp src/g2p.h
src/languageModels.cpp src/languageModels.h src/languageModels.cpp src/languageModels.h
src/tupleHash.h src/tupleHash.h
src/ThreadPool.cpp src/ThreadPool.h
) )
add_executable(rhubarb ${SOURCE_FILES}) add_executable(rhubarb ${SOURCE_FILES})
target_link_libraries(rhubarb ${Boost_LIBRARIES} cppFormat sphinxbase pocketSphinx flite webRTC) target_link_libraries(rhubarb ${Boost_LIBRARIES} cppFormat sphinxbase pocketSphinx flite webRTC)

78
src/ThreadPool.cpp Normal file
View File

@ -0,0 +1,78 @@
#include "ThreadPool.h"
int ThreadPool::getRecommendedThreadCount() {
int coreCount = std::thread::hardware_concurrency();
// If the number of cores cannot be determined, use a reasonable default
return coreCount != 0 ? coreCount : 4;
}
ThreadPool::ThreadPool(int threadCount) :
threadCount(threadCount),
remainingJobCount(0),
bailout(false) {
for (int i = 0; i < threadCount; ++i) {
threads.push_back(std::thread([&] {
Task();
}));
}
}
ThreadPool::~ThreadPool() {
waitAll();
// Notify that we're done, and wake up any threads that are waiting for a new job
bailout = true;
jobAvailableCondition.notify_all();
for (auto& thread : threads) {
if (thread.joinable()) {
thread.join();
}
}
}
void ThreadPool::addJob(job_t job) {
std::lock_guard<std::mutex> guard(queueMutex);
jobQueue.emplace_back(job);
++remainingJobCount;
jobAvailableCondition.notify_one();
}
void ThreadPool::waitAll() {
if (remainingJobCount == 0) return;
std::unique_lock<std::mutex> lock(waitMutex);
waitCondition.wait(lock, [&] {
return remainingJobCount == 0;
});
lock.unlock();
}
void ThreadPool::Task() {
while (!bailout) {
getNextJob()();
--remainingJobCount;
waitCondition.notify_one();
}
}
ThreadPool::job_t ThreadPool::getNextJob() {
std::unique_lock<std::mutex> jobLock(queueMutex);
// Wait for a job if we don't have any
jobAvailableCondition.wait(jobLock, [&] {
return jobQueue.size() > 0 || bailout;
});
if (bailout) {
// Return a dummy job to keep remainingJobCount accurate
++remainingJobCount;
return [] {};
}
// Get job from the queue
auto result = jobQueue.front();
jobQueue.pop_front();
return result;
}

59
src/ThreadPool.h Normal file
View File

@ -0,0 +1,59 @@
#pragma once
#include <atomic>
#include <thread>
#include <mutex>
#include <list>
#include <functional>
#include <vector>
// Thread pool based on https://github.com/nbsdx/ThreadPool, which is in the public domain.
class ThreadPool {
public:
using job_t = std::function<void(void)>;
static int getRecommendedThreadCount();
ThreadPool(int threadCount = getRecommendedThreadCount());
~ThreadPool();
// Gets the number of threads in this pool
int getThreadCount() const {
return threadCount;
}
// Gets the number of jobs left in the queue
int getRemainingJobCount() const {
return remainingJobCount;
}
// Adds a new job to the pool.
// If there are no queued jobs, a thread is woken up to take the job.
// If all threads are busy, the job is added to the end of the queue.
void addJob(job_t job);
// Blocks until all jobs have finshed executing
void waitAll();
private:
const int threadCount;
std::vector<std::thread> threads;
std::list<job_t> jobQueue;
std::atomic_int remainingJobCount; // The number of queued or running jobs
std::atomic_bool bailout;
std::condition_variable jobAvailableCondition;
std::condition_variable waitCondition;
std::mutex waitMutex;
std::mutex queueMutex;
// Takes the next job in the queue and run it.
// Notify the main thread that a job has completed.
void Task();
// Gets the next job; pop the first item in the queue,
// otherwise wait for a signal from the main thread
job_t getNextJob();
};