Sunday, December 30, 2018

[System Design]Thread Pool

A thread pool is a collection of working thread which efficiently execute different tasks asynchronously. To implement it, we want it behaves as follow:

  • The thread pool has a fixed size buffer to store all the the tasks to be executed, if the buffer is full, the thread will wait until there is available space for the new task
  • The worker threads will actively remove the task from the queue and execute them, if there is no available task, it will wait until it is notified there is new task added
The general logic is pretty similar to producer consumer queue problem we discussed about earlier. If you have problem to understand the code below, please check the article above:

class Task
{
public:
Task(int i)
{
id = i;
}
void virtual run()
{
int r = rand() % 3001;
this_thread::sleep_for(chrono::milliseconds(r));
}
int getId()
{
return id;
}
private:
int id;
};
class ThreadPool
{
public:
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool(const ThreadPool&) = delete;
ThreadPool()
{
for (int i = 0; i < NUM_OF_THREADS; ++i)
{
thread th(bind(&ThreadPool::start, this));
pool.push_back(move(th));
}
}
void execute(Task& task)
{
//crucial part
unique_lock<mutex> lock(m);
cv.wait(lock, [&] {return q.size() < MAX_CAPACITY; });
cout << "Task " << to_string(task.getId()) << " is added to the blocking queue..." << endl;
q.push(move(task));
//notify there is available task
cv.notify_one();
}
private:
queue<Task> q;
mutex m;
condition_variable cv;
const int MAX_CAPACITY = 5;
const int NUM_OF_THREADS = 6;
vector<thread> pool;
void start()
{
while (true)
{
//crucial part
unique_lock<mutex> lock(m);
cv.wait(lock, [&] {return q.size(); });
Task&& task = move(q.front());
q.pop();
//notify there is space to add more tasks
cout << "Task " << to_string(task.getId()) << " is running..." << endl;
cv.notify_one();
lock.unlock();
//perform task
task.run();
}
}
};
void testThreadPool()
{
const int NUM_OF_TASKS = 15;
ThreadPool pool;
for (int i = 0; i < NUM_OF_TASKS; ++i)
{
Task task(i);
pool.execute(task);
}
getchar();
}
view raw ThreadPool.cpp hosted with ❤ by GitHub

Some explanations:
  • The task will be a base class and if you want to do something different in task, you can inherit it and reimplement the run function
  • C++ thread will start after it is constructed, so don't be confused if you don't see something like thread.start()
  • There will be multiple thread to add tasks to thread pool, all threads in pool will keep executing tasks. All this process is coordinated by lock and condition variables to make sure it behaves as expected, for more details, please check the article above
Example output:


No comments:

Post a Comment