一.epoll反应堆
1.设计思想
Epoll反应堆是一种在Linux系统上实现高性能I/O多路复用的机制,通常用于网络编程中。它的设计思想主要包括以下几个方面:
- 事件驱动: Epoll反应堆采用事件驱动的模型,即当I/O事件发生时,系统通过回调函数或其他方式通知应用程序。这种设计避免了传统的轮询方式,提高了效率。
- 非阻塞I/O: Epoll反应堆利用非阻塞I/O的特性,当某个I/O操作无法立即完成时,不会阻塞整个线程或进程,而是通过回调函数告知应用程序,从而提高了并发性能。
- 可扩展性: Epoll反应堆的设计考虑了系统的可扩展性,它能够处理大量的并发连接,而且随着连接数量的增加,性能不会线性下降。
- 高效的事件管理: Epoll反应堆通过内核态与用户态之间的事件传递,避免了频繁的上下文切换,减少了系统开销。
- 精细的事件选择: Epoll反应堆能够监控多个文件描述符上的事件,并且在事件发生时通知应用程序,这种精细的事件选择机制能够减少不必要的系统资源消耗。
- 支持边缘触发和水平触发: Epoll反应堆支持两种工作模式,边缘触发和水平触发。边缘触发模式只在状态变化时通知应用程序,而水平触发模式则会持续通知直到事件处理完毕。
2.边缘触发
**目的:**通过事件类型的快速转换,使程序能够在事件发生时及时做出响应,并尽可能地减少不必要的事件触发,以提高性能和效率。
**体现:**本代码中,在readData
和sendData
中反映了边缘触发的模型,实现了事件的快速响应和高效处理
-
当写事件被触发时,会先读取客户端数据,再立即将事件设置为写事件,待缓冲区有足够空间写入数据时,才会触发写事件写出读取到的数据
这样避免了频繁触发写事件,从而提高效率
-
当写事件被触发时,会先写出从客户端读取的数据,而后立即将事件类型设置回读事件
这样做却道了客户端发送更多数据之前,及时准备好接收
graph TD;
subgraph 监听cfd
A[新事件到达] --> B[根据事件类型执行相关任务]
B --> |读事件| C[读取数据] --> D{读到数据?}
D -- 是 --> E[切换为写事件]
D -- 否 --> F[关闭连接、下树]
B --> |写事件| G[写出数据] --> H[切换为读事件]
end
3.示例
(1)流程图
graph TD;
subgraph 初始化
A["包含必要的头文件"] --> B["定义常量"]
B --> C["声明全局变量"]
C --> D["定义事件结构体"]
end
subgraph 事件管理
D --> E["添加事件: eventadd()"]
D --> F["修改事件: eventset()"]
D --> G["删除事件: eventdel()"]
end
subgraph 事件回调
H["发送数据: senddata()"] --> J{"事件触发"}
I["读取数据: readData()"] --> J
J --> K["初始连接接受: initAccept()"]
end
subgraph 主函数
L["创建套接字,绑定,监听"]
M["创建epoll实例"]
N["将初始事件添加到epoll"]
L --> M --> N --> O["无限循环"]
O --> P["epoll_wait()"]
P --> Q["处理事件"]
Q --> R["检索相应的xevent"]
R --> S["调用关联的回调函数"]
S --> T["处理新事件"]
end
subgraph 清理
O --> U["终止条件"]
U --> V["关闭监听套接字"]
V --> W["退出程序"]
end
subgraph 处理新事件
T["检查事件类型"]
T -->|新连接事件| U["执行initAccept()"]
T -->|已存在客户端事件| V["根据事件类型执行相关任务"]
end
(2)代码
① 执行过程
-
初始化:
- 包含了必要的头文件。
- 定义了缓冲区长度和事件大小等常量。
- 声明了全局变量,包括epoll文件描述符
gepfd
和名为myevents
的xevent
结构数组。
-
事件结构体:
-
xevent
结构体定义了一个事件,包括文件描述符、事件、回调函数、缓冲区和其他相关数据。
-
-
事件管理函数:
-
eventadd
:向epoll实例添加事件。 -
eventset
:修改现有事件。 -
eventdel
:从epoll实例中删除事件。
-
-
事件回调函数:
-
senddata
:处理发送数据的回调函数。 -
readData
:处理读取数据的回调函数。 -
initAccept
:处理初始连接接受的回调函数。
-
-
主函数:
- 创建套接字、绑定和监听。
- 创建epoll实例(
gepfd
)。 - 添加初始事件(
initAccept
)到epoll实例。 - 在一个无限循环中:
-
epoll_wait
等待epoll实例上的事件发生。 - 当有事件发生时:
- 遍历事件。
- 获取相应的
xevent
结构体。 - 调用与事件关联的回调函数。
-
-
清理:
- 主循环在满足终止条件时退出。
- 关闭监听套接字。
- 程序退出。
② code
//反应堆简单版
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "wrap.h"
#define _BUF_LEN_ 1024
#define _EVENT_SIZE_ 1024
//全局epoll树的根
int gepfd = 0;
//事件驱动结构体
typedef struct xx_event{
int fd;
int events;
void (*call_back)(int fd,int events,void *arg);
void *arg;
char buf[1024];
int buflen;
int epfd;
}xevent;
xevent myevents[_EVENT_SIZE_+1];
void readData(int fd,int events,void *arg);
//添加事件
//eventadd(lfd,EPOLLIN,initAccept,&myevents[_EVENT_SIZE_-1],&myevents[_EVENT_SIZE_-1]);
void eventadd(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)
{
ev->fd = fd;
ev->events = events;
//ev->arg = arg;//代表结构体自己,可以通过arg得到结构体的所有信息
ev->call_back = call_back;
struct epoll_event epv;
epv.events = events;
epv.data.ptr = ev;//核心思想
epoll_ctl(gepfd,EPOLL_CTL_ADD,fd,&epv);//上树
}
//修改事件
//eventset(fd,EPOLLOUT,senddata,arg,ev);
void eventset(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)
{
ev->fd = fd;
ev->events = events;
//ev->arg = arg;
ev->call_back = call_back;
struct epoll_event epv;
epv.events = events;
epv.data.ptr = ev;
epoll_ctl(gepfd,EPOLL_CTL_MOD,fd,&epv);//修改
}
//删除事件
void eventdel(xevent *ev,int fd,int events)
{
printf("begin call %s\n",__FUNCTION__);
ev->fd = 0;
ev->events = 0;
ev->call_back = NULL;
memset(ev->buf,0x00,sizeof(ev->buf));
ev->buflen = 0;
struct epoll_event epv;
epv.data.ptr = NULL;
epv.events = events;
epoll_ctl(gepfd,EPOLL_CTL_DEL,fd,&epv);//下树
}
//发送数据
void senddata(int fd,int events,void *arg)
{
printf("begin call %s\n",__FUNCTION__);
xevent *ev = arg;
Write(fd,ev->buf,ev->buflen);
eventset(fd,EPOLLIN,readData,arg,ev);
}
//读数据
void readData(int fd,int events,void *arg)
{
printf("begin call %s\n",__FUNCTION__);
xevent *ev = arg;
ev->buflen = Read(fd,ev->buf,sizeof(ev->buf));
if(ev->buflen>0) //读到数据
{
//void eventset(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)
eventset(fd,EPOLLOUT,senddata,arg,ev);
}
else if(ev->buflen==0) //对方关闭连接
{
Close(fd);
eventdel(ev,fd,EPOLLIN);
}
}
//新连接处理
void initAccept(int fd,int events,void *arg)
{
printf("begin call %s,gepfd =%d\n",__FUNCTION__,gepfd);//__FUNCTION__ 函数名
int i;
struct sockaddr_in addr;
socklen_t len = sizeof(addr);
int cfd = Accept(fd,(struct sockaddr*)&addr,&len);//是否会阻塞?
//查找myevents数组中可用的位置
for(i = 0 ; i < _EVENT_SIZE_; i ++)
{
if(myevents[i].fd==0)
{
break;
}
}
//设置读事件
eventadd(cfd,EPOLLIN,readData,&myevents[i],&myevents[i]);
}
int main(int argc,char *argv[])
{
//创建socket
int lfd = Socket(AF_INET,SOCK_STREAM,0);
//端口复用
int opt = 1;
setsockopt(lfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
//绑定
struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(8888);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
Bind(lfd,(struct sockaddr*)&servaddr,sizeof(servaddr));
//监听
Listen(lfd,128);
//创建epoll树根节点
gepfd = epoll_create(1024);
printf("gepfd === %d\n",gepfd);
struct epoll_event events[1024];
//添加最初始事件,将侦听的描述符上树
eventadd(lfd,EPOLLIN,initAccept,&myevents[_EVENT_SIZE_],&myevents[_EVENT_SIZE_]);
//void eventadd(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)
while(1)
{
int nready = epoll_wait(gepfd,events,1024,-1);
if(nready<0) //调用epoll_wait失败
{
perr_exit("epoll_wait error");
}
else if(nready>0) //调用epoll_wait成功,返回有事件发生的文件描述符的个数
{
int i = 0;
for(i=0;ifd);
if(xe->events & events[i].events)
{
xe->call_back(xe->fd,xe->events,xe);//调用事件对应的回调
}
}
}
}
//关闭监听文件描述符
Close(lfd);
return 0;
}
二.线程池
1.概述
(1)介绍
**线程池:**是一种多线程编程模式,用于管理和复用线程以处理大量任务。其主要目的是避免频繁创建和销毁线程,提高系统的性能和效率
(2)实现
-
数据结构定义:
- 线程池结构体(ThreadPool):用于维护线程池的状态信息,如线程数量、任务队列、互斥锁、条件变量等。
- 任务结构体(Task):表示要执行的任务,包含一个函数指针和一个参数,用于执行任务的回调函数和传递参数。
-
初始化线程池:
- 创建线程池结构体,并初始化其成员变量,如线程数量、任务队列大小等。
- 初始化互斥锁和条件变量,用于线程间的同步和互斥访问。
- 创建指定数量的线程,并将它们设置为等待状态。
-
任务提交:
- 将任务添加到线程池的任务队列中。
- 如果任务队列已满,阻塞等待直到有空闲位置。
-
线程工作函数:
- 线程从任务队列中取出任务,并执行任务的回调函数。
- 执行完任务后,继续从任务队列中获取下一个任务,直到线程池被销毁或收到关闭信号。
-
线程池销毁:
- 设置线程池的关闭标志,并通知所有线程退出。
- 等待所有线程退出,并释放线程池的资源。
-
线程同步和互斥:
- 使用互斥锁保护共享资源,如任务队列。
- 使用条件变量实现线程的等待和唤醒机制,当任务队列为空或满时,线程进行阻塞等待或唤醒。
-
任务执行:
- 任务被执行时,线程调用任务的回调函数,并传递参数。
- 任务执行完毕后,线程将任务标记为已完成,并释放资源。
2.示例
(1)流程
下面是本程序的大致流程,做辅助理解吧
graph TD;
A["主函数main()"] --> B["创建线程池create_threadpool()"]
B --循环创建指定数目线程--> C(("特定数目的线程"))
B --> ADD(("循环提交任务"))
ADD --任务队列满了--> 阻塞等待
ADD --任务队列没满--> 进行任务添加 --"发送信号(有任务了)"-->Z --> G
C --> D(("线程工作thrRun()"))
D --> E["从队列中获取任务"]
E --> F{"有任务可执行或要销毁线程池吗?"}
F -- 有任务需要处理 --> G["执行任务taskRun()"]
F -- 要销毁线程池 --> SH["销毁线程池"]
F -- 无 --> Z["阻塞等待"]
G --> H{"任务执行完毕了吗?"}
H -- 没有 --> I["休眠1秒"]
I --> H
H -- 是 --> J["任务执行完成"]
J --"通知任务队列任务--"--> E
B --> K{"需要关闭线程池吗?"}
K -- 需要 --> M["销毁线程池destroy_threadpool()"] --发送信号通知线程池关闭--> F
M --> N["等待线程退出Wait for threads to exit"]
N --> O["退出Exit"]
(2)代码
头文件:
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
#include
#include
#include
#include
#include
#include
//任务结构体
typedef struct _PoolTask
{
int tasknum;//模拟任务编号
void *arg;//回调函数参数
void (*task_func)(void *arg);//任务的回调函数
}PoolTask ;
//线程池结构体
typedef struct _ThreadPool
{
int max_job_num;//最大任务个数,任务队列大小
int job_num;//实际任务个数,运行时任务队列上的任务个数
PoolTask *tasks;//任务队列数组
int job_push;//入队位置
int job_pop;// 出队位置
int thr_num;//线程池内线程个数
pthread_t *threads;//线程池内线程数组
int shutdown;//是否关闭线程池
pthread_mutex_t pool_lock;//线程池的锁
pthread_cond_t empty_task;//任务队列为空的条件
pthread_cond_t not_empty_task;//任务队列不为空的条件
}ThreadPool;
void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum 代表线程个数,maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
void addtask(ThreadPool *pool);//添加任务到线程池
void taskRun(void *arg);//任务回调函数
#endif
线程池Demo源文件:
//简易版线程池
#include "threadpoolsimple.h"
ThreadPool *thrPool = NULL;//线程池
int beginnum = 1000;
//线程工作函数,arg是线程池
void *thrRun(void *arg)
{
//printf("begin call %s-----\n",__FUNCTION__);
ThreadPool *pool = (ThreadPool*)arg;//这个线程池是主线程传递过来的线程池,但下面的thrPool是全局的线程池,这两个是一样的,不使用pool也可以
int taskpos = 0;//任务位置
PoolTask *task = (PoolTask *)malloc(sizeof(PoolTask));//申请一个任务空间,用以将从任务队列中取出的任务拷贝到这个空间,避免因为任务队列中新增任务导致任务被修改
while(1)
{
//获取任务,先要尝试加锁
pthread_mutex_lock(&thrPool->pool_lock);
//无任务并且线程池不是要摧毁,则阻塞等待
while(thrPool->job_num <= 0 && !thrPool->shutdown )
{
//如果没有任务,线程会阻塞,等待生产者添加任务时发送信号
pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);
}
if(thrPool->job_num)
{
//有任务需要处理
taskpos = (thrPool->job_pop++)%thrPool->max_job_num;//这是通过取模的方式来实现循环队列
//printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));//拷贝任务,避免因为生产者添加任务导致任务被修改
task->arg = task;//这里的arg是任务的地址,这样在任务回调函数中就可以通过arg来获取任务的信息
thrPool->job_num--;//通知任务队列减少一个任务
pthread_cond_signal(&thrPool->empty_task);//通知生产者
}
if(thrPool->shutdown)//shutdown为0或1,正好作为C语言中的true和false
{
//代表要摧毁线程池,此时线程退出即可
//pthread_detach(pthread_self());//临死前分家
pthread_mutex_unlock(&thrPool->pool_lock);
free(task);
pthread_exit(NULL);
}
//释放锁
pthread_mutex_unlock(&thrPool->pool_lock);
task->task_func(task->arg);//执行回调函数
}
//printf("end call %s-----\n",__FUNCTION__);
}
//创建线程池
void create_threadpool(int thrnum,int maxtasknum)
{
printf("begin call %s-----\n",__FUNCTION__);
thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));
thrPool->thr_num = thrnum;
thrPool->max_job_num = maxtasknum;
thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁
thrPool->job_push = 0;//任务队列添加的位置
thrPool->job_pop = 0;//任务队列出队的位置
thrPool->job_num = 0;//初始化的任务个数为0
thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask)*maxtasknum));//申请最大的任务队列,申请一片连续的内存空间存储任务
//初始化锁和条件变量
pthread_mutex_init(&thrPool->pool_lock,NULL);
pthread_cond_init(&thrPool->empty_task,NULL);
pthread_cond_init(&thrPool->not_empty_task,NULL);
int i = 0;
thrPool->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间,用于存储线程id
//创建线程属性,用于下面创建线程设置线程分离属性
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
//循环创建指定个数的线程,组成线程池
for(i = 0;i < thrnum;i++)
{
//线程的回调函数是thrRun,参数是thrPool,即线程池,thrRun就是线程的工作函数
pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//创建多个线程
}
//printf("end call %s-----\n",__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool *pool)
{
pool->shutdown = 1;//开始自爆
pthread_cond_broadcast(&pool->not_empty_task);//诱杀,给所有进程发送信号通知其有任务,让其停止阻塞向下执行,但其不满足job_num,进入下面的 if(thrPool->shutdown),因为其是1,所以会自杀
int i = 0;
//释放线程资源
for(i = 0; i < pool->thr_num ; i++)
{
pthread_join(pool->threads[i],NULL);
}
//释放线程池资源
pthread_cond_destroy(&pool->not_empty_task);
pthread_cond_destroy(&pool->empty_task);
pthread_mutex_destroy(&pool->pool_lock);
//释放申请的内存
free(pool->tasks);
free(pool->threads);
free(pool);
}
//添加任务到线程池
void addtask(ThreadPool *pool)
{
//printf("begin call %s-----\n",__FUNCTION__);
pthread_mutex_lock(&pool->pool_lock);
//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)
while(pool->max_job_num <= pool->job_num)
{
pthread_cond_wait(&pool->empty_task,&pool->pool_lock);
}
int taskpos = (pool->job_push++)%pool->max_job_num;
//printf("add task %d tasknum===%d\n",taskpos,beginnum);
pool->tasks[taskpos].tasknum = beginnum++;
pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];
pool->tasks[taskpos].task_func = taskRun;
pool->job_num++;
pthread_mutex_unlock(&pool->pool_lock);
pthread_cond_signal(&pool->not_empty_task);//通知消费者,有任务了
//printf("end call %s-----\n",__FUNCTION__);
}
//任务回调函数
void taskRun(void *arg)
{
PoolTask *task = (PoolTask*)arg;
int num = task->tasknum;
printf("task %d is runing %lu\n",num,pthread_self());
sleep(1);
printf("task %d is done %lu\n",num,pthread_self());
}
int main()
{
create_threadpool(3,20);
int i = 0;
for(i = 0;i < 50 ; i++)
{
addtask(thrPool);//模拟添加任务
}
sleep(20);
destroy_threadpool(thrPool);
return 0;
}
1 条评论
怎么收藏这篇文章?