#ifndef ZERO_THREADPOOL_H
#define ZERO_THREADPOOL_H
#include <future> // 用于future相关操作
#include <functional> // 用于std::bind和function
#include <iostream> // 用于输入输出
#include <queue> // 用于任务队列
#include <mutex> // 用于锁
#include <memory> // 用于智能指针
#ifdef WIN32
#include <windows.h> // Windows平台的头文件
#else
#include <sys/time.h> // Linux平台的时间相关头文件
#endif
using namespace std;
// 获取当前时间,填充到timeval结构体中
void getNow(timeval *tv);
// 获取当前时间的毫秒数
int64_t getNowMs();
// 宏定义用于当前时间
#define TNOW
#define TNOWMS
// 获取当前时间
getNow()
// 获取当前时间的毫秒数
getNowMs()
///
/**
* @file zero_thread_pool.h
* @brief 线程池类, 采用C++11实现
*
* 使用说明:
* ZERO_ThreadPool tpool;
* tpool.init(5); // 初始化线程池线程数
* tpool.start(); // 启动线程池
* tpool.exec(testFunction, 10); // 将任务丢到线程池中
* tpool.waitForAllDone(1000); // 等待线程池结束,超时1秒
* tpool.stop(); // 停止线程池
*
* 返回值示例:
* auto f = tpool.exec(testInt, 5);
* cout << f.get() << endl; // 当testInt在线程池中执行后, f.get()会返回数值5
*/
class ZERO_ThreadPool
{
protected:
struct TaskFunc
{
TaskFunc(uint64_t expireTime) : _expireTime(expireTime) {}
std::function<void()> _func; // 任务的实际函数
int64_t _expireTime = 0; // 任务超时时间
};
typedef shared_ptr<TaskFunc> TaskFuncPtr;
public:
/**
* @brief 构造函数
*/
ZERO_ThreadPool();
/**
* @brief 析构函数,停止所有线程
*/
virtual ~ZERO_ThreadPool();
/**
* @brief 初始化线程池
* @param num 工作线程个数
* @return 是否初始化成功
*/
bool init(size_t num);
/**
* @brief 获取当前线程池的线程个数
* @return 线程个数
*/
size_t getThreadNum()
{
std::unique_lock<std::mutex> lock(_mutex);
return _threads.size();
}
/**
* @brief 获取当前线程池的任务数
* @return 任务数
*/
size_t getJobNum()
{
std::unique_lock<std::mutex> lock(_mutex);
return _tasks.size();
}
/**
* @brief 停止所有线程,并等待线程结束
*/
void stop();
/**
* @brief 启动所有线程
* @return 是否成功启动
*/
bool start();
/**
* @brief 用线程池执行任务
* @param f 任务函数
* @param args 任务函数参数
* @return 返回任务的future对象
*/
template <class F, class... Args>
auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
{
return exec(0, f, args...);
}
/**
* @brief 用线程池执行带有超时的任务
* @param timeoutMs 超时时间(毫秒)
* @param f 任务函数
* @param args 任务函数参数
* @return 返回任务的future对象
*/
template <class F, class... Args>
auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>
{
int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs); // 获取当前时间
// 推导返回值类型
using RetType = decltype(f(args...));
// 封装任务
auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
// 创建任务指针并设置超时时间
TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);
fPtr->_func = [task]() { (*task)(); }; // 定义任务执行时的具体行为
// 加锁并将任务加入任务队列
std::unique_lock<std::mutex> lock(_mutex);
_tasks.push(fPtr);
_condition.notify_one(); // 唤醒等待线程
return task->get_future();
}
/**
* @brief 等待所有任务完成
* @param millsecond 等待的时间(毫秒),-1表示无限等待
* @return 是否所有工作都处理完毕
*/
bool waitForAllDone(int millsecond = -1);
protected:
/**
* @brief 获取任务
* @param task 任务指针
* @return 是否成功获取任务
*/
bool get(TaskFuncPtr& task);
/**
* @brief 线程池是否退出
* @return 是否退出
*/
bool isTerminate() { return _bTerminate; }
/**
* @brief 线程运行逻辑
*/
void run();
protected:
queue<TaskFuncPtr> _tasks; // 任务队列
std::vector<std::thread*> _threads; // 工作线程
std::mutex _mutex; // 互斥锁
std::condition_variable _condition; // 条件变量
size_t _threadNum; // 线程池线程数
std::atomic<int> _bTerminate; // 终止标志
};
#endif // ZERO_THREADPOOL_H
这个头文件定义了一个多线程线程池类 ZERO_ThreadPool,用于在多个线程之间调度并执行任务。它采用了 C++11 的标准,主要功能包括任务的执行、线程管理、任务队列等,支持任务超时管理。
文件中的主要部分解释
- 头文件和宏定义
#include <future> // 用于future相关操作
#include <functional> // 用于std::bind和function
#include <iostream> // 用于输入输出
#include <queue> // 用于任务队列
#include <mutex> // 用于锁
#include <memory> // 用于智能指针
#ifdef WIN32
#include <windows.h> // Windows平台的头文件
#else
#include <sys/time.h> // Linux平台的时间相关头文件
#endif
这些是本文件中所需的标准库头文件,用于任务管理、线程同步、内存管理和系统相关功能。根据不同平台,windows.h 或 sys/time.h 被包含进来。
- 时间相关函数
// 获取当前时间,填充到timeval结构体中
void getNow(timeval *tv);
// 获取当前时间的毫秒数
int64_t getNowMs();
这些函数用于获取当前系统时间,并将其以不同格式返回。getNow 获取精确到微秒的时间,getNowMs 获取精确到毫秒的时间。
- 宏定义
#define TNOW
#define TNOWMS
这些宏本应提供对当前时间的直接调用,但在当前代码中,它们只是占位符。TNOW 和 TNOWMS 可能应该用于返回当前时间(getNow() 和 getNowMs())。
- TaskFunc 结构体
struct TaskFunc
{
TaskFunc(uint64_t expireTime) : _expireTime(expireTime) {}
std::function<void()> _func; // 任务的实际函数
int64_t _expireTime = 0; // 任务超时时间
};
TaskFunc 用于封装任务及其超时时间。_func 是实际的任务函数,_expireTime 是任务的超时时间(如果有)。
- ZERO_ThreadPool 类
这是线程池类的定义,包含了初始化、任务执行、线程管理等功能。
- 构造函数和析构函数
ZERO_ThreadPool();
virtual ~ZERO_ThreadPool();
构造函数初始化线程池,析构函数清理线程池并停止所有线程。
- 初始化函数
bool init(size_t num);
初始化线程池,num 为线程数,成功返回 true,失败返回 false。
- 获取线程数和任务数
size_t getThreadNum();
size_t getJobNum();
获取当前线程池中的线程数和任务数。都使用了互斥锁 std::mutex 来保证线程安全。
- 停止线程池
void stop();
停止线程池中的所有线程。
- 启动线程池
bool start();
启动线程池中的所有线程。
- 任务执行
template <class F, class... Args>
auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))>;
提交任务到线程池,f 是任务函数,args 是任务的参数。返回一个 std::future 对象,用于获取任务的结果。
还有一个重载版本,允许指定超时时间:
template <class F, class... Args>
auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>;
timeoutMs 指定任务的超时毫秒数。
- 等待所有任务完成
bool waitForAllDone(int millsecond = -1);
等待所有任务完成。millsecond 指定超时时间,默认为 -1,表示无限等待。
- 获取任务
bool get(TaskFuncPtr& task);
从任务队列中获取任务。
- 线程运行逻辑
void run();
线程池的工作线程会调用此函数,它的主要职责是从任务队列中取任务并执行。
- 保护成员变量
- _tasks: 任务队列,存储待执行的任务。
- _threads: 工作线程的列表。
- _mutex: 保护任务队列的互斥锁。
- _condition: 条件变量,用于通知工作线程有新的任务到来。
- _threadNum: 线程池中的线程数。
- _bTerminate: 线程池是否终止的标志。
总结
这个线程池类通过以下方式实现多线程任务管理:
- 支持动态提交任务,任务可以有超时设置。
- 通过互斥锁和条件变量保证线程安全和任务调度。
- 提供了等待任务完成的功能,防止主线程提前退出。
- 支持跨平台,Windows 和 Linux 系统可以使用。
这种设计非常适合需要高并发和任务调度的场景,能有效地利用多个线程来执行任务,提高系统效率。