- The thread pool has a fixed-size buffer that stores the tasks waiting to be executed; if the buffer is full, the submitting thread waits until there is space available for the new task
- The worker threads actively remove tasks from the queue and execute them; if no task is available, a worker waits until it is notified that a new task has been added
The general logic is very similar to the producer-consumer queue problem we discussed earlier. If you have trouble understanding the code below, please check the article above:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Unit of work submitted to the thread pool.
 *
 * Each task carries an integer id. The default run() simulates work by
 * sleeping for a pseudo-random duration between 0 and 3000 milliseconds.
 * run() is virtual so that a derived class can supply its own work.
 */
class Task
{
public:
    /** Creates a task identified by @p i. */
    Task(int i) : id(i)
    {
    }

    // The class has a virtual function, so it needs a virtual destructor to
    // be safely destroyed through a base pointer or reference.
    virtual ~Task() = default;

    // Declaring the destructor suppresses the implicit move operations;
    // default all of them explicitly so the task stays copyable and movable.
    Task(const Task&) = default;
    Task& operator=(const Task&) = default;
    Task(Task&&) = default;
    Task& operator=(Task&&) = default;

    /** Performs the work; the base version only sleeps for 0-3000 ms. */
    virtual void run()
    {
        // NOTE(review): std::rand is not guaranteed to be thread-safe and
        // run() is called from several worker threads; consider <random>.
        const int r = std::rand() % 3001;
        std::this_thread::sleep_for(std::chrono::milliseconds(r));
    }

    /** Returns the id given at construction. */
    int getId() const
    {
        return id;
    }

private:
    int id;
};
class ThreadPool | |
{ | |
public: | |
ThreadPool& operator=(const ThreadPool&) = delete; | |
ThreadPool(const ThreadPool&) = delete; | |
ThreadPool() | |
{ | |
for (int i = 0; i < NUM_OF_THREADS; ++i) | |
{ | |
thread th(bind(&ThreadPool::start, this)); | |
pool.push_back(move(th)); | |
} | |
} | |
void execute(Task& task) | |
{ | |
//crucial part | |
unique_lock<mutex> lock(m); | |
cv.wait(lock, [&] {return q.size() < MAX_CAPACITY; }); | |
cout << "Task " << to_string(task.getId()) << " is added to the blocking queue..." << endl; | |
q.push(move(task)); | |
//notify there is available task | |
cv.notify_one(); | |
} | |
private: | |
queue<Task> q; | |
mutex m; | |
condition_variable cv; | |
const int MAX_CAPACITY = 5; | |
const int NUM_OF_THREADS = 6; | |
vector<thread> pool; | |
void start() | |
{ | |
while (true) | |
{ | |
//crucial part | |
unique_lock<mutex> lock(m); | |
cv.wait(lock, [&] {return q.size(); }); | |
Task&& task = move(q.front()); | |
q.pop(); | |
//notify there is space to add more tasks | |
cout << "Task " << to_string(task.getId()) << " is running..." << endl; | |
cv.notify_one(); | |
lock.unlock(); | |
//perform task | |
task.run(); | |
} | |
} | |
}; | |
void testThreadPool() | |
{ | |
const int NUM_OF_TASKS = 15; | |
ThreadPool pool; | |
for (int i = 0; i < NUM_OF_TASKS; ++i) | |
{ | |
Task task(i); | |
pool.execute(task); | |
} | |
getchar(); | |
} |
Some explanations:
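- Task is a base class: if you want a task to do something different, inherit from it and reimplement the run function (note that the pool shown here stores tasks by value, so it would need to hold pointers to tasks for an overridden run to actually be called)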
- C++ thread will start after it is constructed, so don't be confused if you don't see something like thread.start()
- There can be multiple threads adding tasks to the thread pool, and all threads in the pool keep executing tasks. The whole process is coordinated by a lock and condition variables to make sure it behaves as expected; for more details, please check the article above
Example output:
No comments:
Post a Comment