一.epoll反应堆

1.设计思想

Epoll反应堆是一种在Linux系统上实现高性能I/O多路复用的机制,通常用于网络编程中。它的设计思想主要包括以下几个方面:

  1. 事件驱动: Epoll反应堆采用事件驱动的模型,即当I/O事件发生时,系统通过回调函数或其他方式通知应用程序。这种设计避免了传统的轮询方式,提高了效率。
  2. 非阻塞I/O: Epoll反应堆利用非阻塞I/O的特性,当某个I/O操作无法立即完成时,不会阻塞整个线程或进程,而是通过回调函数告知应用程序,从而提高了并发性能。
  3. 可扩展性: Epoll反应堆的设计考虑了系统的可扩展性,它能够处理大量的并发连接,而且随着连接数量的增加,性能不会线性下降。
  4. 高效的事件管理: Epoll反应堆通过内核态与用户态之间的事件传递,避免了频繁的上下文切换,减少了系统开销。
  5. 精细的事件选择: Epoll反应堆能够监控多个文件描述符上的事件,并且在事件发生时通知应用程序,这种精细的事件选择机制能够减少不必要的系统资源消耗。
  6. 支持边缘触发和水平触发: Epoll反应堆支持两种工作模式,边缘触发和水平触发。边缘触发模式只在状态变化时通知应用程序,而水平触发模式则会持续通知直到事件处理完毕。

2.边缘触发

**目的:**通过事件类型的快速转换,使程序能够在事件发生时及时做出响应,并尽可能地减少不必要的事件触发,以提高性能和效率。

**体现:**本代码中,在readDatasendData中反映了边缘触发的模型,实现了事件的快速响应和高效处理

  • 当写事件被触发时,会先读取客户端数据,再立即将事件设置为写事件,待缓冲区有足够空间写入数据时,才会触发写事件写出读取到的数据

    这样避免了频繁触发写事件,从而提高效率

  • 当写事件被触发时,会先写出从客户端读取的数据,而后立即将事件类型设置回读事件

    这样做却道了客户端发送更多数据之前,及时准备好接收

graph TD; subgraph 监听cfd A[新事件到达] --> B[根据事件类型执行相关任务] B --> |读事件| C[读取数据] --> D{读到数据?} D -- 是 --> E[切换为写事件] D -- 否 --> F[关闭连接、下树] B --> |写事件| G[写出数据] --> H[切换为读事件] end

3.示例

(1)流程图

graph TD; subgraph 初始化 A["包含必要的头文件"] --> B["定义常量"] B --> C["声明全局变量"] C --> D["定义事件结构体"] end subgraph 事件管理 D --> E["添加事件: eventadd()"] D --> F["修改事件: eventset()"] D --> G["删除事件: eventdel()"] end subgraph 事件回调 H["发送数据: senddata()"] --> J{"事件触发"} I["读取数据: readData()"] --> J J --> K["初始连接接受: initAccept()"] end subgraph 主函数 L["创建套接字,绑定,监听"] M["创建epoll实例"] N["将初始事件添加到epoll"] L --> M --> N --> O["无限循环"] O --> P["epoll_wait()"] P --> Q["处理事件"] Q --> R["检索相应的xevent"] R --> S["调用关联的回调函数"] S --> T["处理新事件"] end subgraph 清理 O --> U["终止条件"] U --> V["关闭监听套接字"] V --> W["退出程序"] end subgraph 处理新事件 T["检查事件类型"] T -->|新连接事件| U["执行initAccept()"] T -->|已存在客户端事件| V["根据事件类型执行相关任务"] end

(2)代码

① 执行过程
  1. 初始化

    • 包含了必要的头文件。
    • 定义了缓冲区长度和事件大小等常量。
    • 声明了全局变量,包括epoll文件描述符gepfd和名为myeventsxevent结构数组。
  2. 事件结构体

    • xevent结构体定义了一个事件,包括文件描述符、事件、回调函数、缓冲区和其他相关数据。
  3. 事件管理函数

    • eventadd:向epoll实例添加事件。
    • eventset:修改现有事件。
    • eventdel:从epoll实例中删除事件。
  4. 事件回调函数

    • senddata:处理发送数据的回调函数。
    • readData:处理读取数据的回调函数。
    • initAccept:处理初始连接接受的回调函数。
  5. 主函数

    • 创建套接字、绑定和监听。
    • 创建epoll实例(gepfd)。
    • 添加初始事件(initAccept)到epoll实例。
    • 在一个无限循环中:
      • epoll_wait等待epoll实例上的事件发生。
      • 当有事件发生时:
        • 遍历事件。
        • 获取相应的xevent结构体。
        • 调用与事件关联的回调函数。
  6. 清理

    • 主循环在满足终止条件时退出。
    • 关闭监听套接字。
    • 程序退出。
② code
//反应堆简单版
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include "wrap.h"

#define _BUF_LEN_  1024
#define _EVENT_SIZE_ 1024

//全局epoll树的根
int gepfd = 0;

//事件驱动结构体
typedef struct xx_event{
    int fd;
    int events;
    void (*call_back)(int fd,int events,void *arg);
    void *arg;
    char buf[1024];
    int buflen;
    int epfd;
}xevent;

xevent myevents[_EVENT_SIZE_+1];

void readData(int fd,int events,void *arg);

//添加事件
//eventadd(lfd,EPOLLIN,initAccept,&myevents[_EVENT_SIZE_-1],&myevents[_EVENT_SIZE_-1]);
void eventadd(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)
{
    ev->fd = fd;
    ev->events = events;
    //ev->arg = arg;//代表结构体自己,可以通过arg得到结构体的所有信息
    ev->call_back = call_back;

    struct epoll_event epv;
    epv.events = events;
    epv.data.ptr = ev;//核心思想
    epoll_ctl(gepfd,EPOLL_CTL_ADD,fd,&epv);//上树
}

//修改事件
//eventset(fd,EPOLLOUT,senddata,arg,ev);
void eventset(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)
{
    ev->fd = fd;
    ev->events = events;
    //ev->arg = arg;
    ev->call_back = call_back;

    struct epoll_event epv;
    epv.events = events;
    epv.data.ptr = ev;
    epoll_ctl(gepfd,EPOLL_CTL_MOD,fd,&epv);//修改
}

//删除事件
void eventdel(xevent *ev,int fd,int events)
{
	printf("begin call %s\n",__FUNCTION__);

    ev->fd = 0;
    ev->events = 0;
    ev->call_back = NULL;
    memset(ev->buf,0x00,sizeof(ev->buf));
    ev->buflen = 0;

    struct epoll_event epv;
    epv.data.ptr = NULL;
    epv.events = events;
    epoll_ctl(gepfd,EPOLL_CTL_DEL,fd,&epv);//下树
}

//发送数据
void senddata(int fd,int events,void *arg)
{
    printf("begin call %s\n",__FUNCTION__);

    xevent *ev = arg;
    Write(fd,ev->buf,ev->buflen);
    eventset(fd,EPOLLIN,readData,arg,ev);
}

//读数据
void readData(int fd,int events,void *arg)
{
    printf("begin call %s\n",__FUNCTION__);
    xevent *ev = arg;

    ev->buflen = Read(fd,ev->buf,sizeof(ev->buf));
    if(ev->buflen>0) //读到数据
	{	
		//void eventset(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)
        eventset(fd,EPOLLOUT,senddata,arg,ev);

    }
	else if(ev->buflen==0) //对方关闭连接
	{
        Close(fd);
        eventdel(ev,fd,EPOLLIN);
    }

}
//新连接处理
void initAccept(int fd,int events,void *arg)
{
    printf("begin call %s,gepfd =%d\n",__FUNCTION__,gepfd);//__FUNCTION__ 函数名

    int i;
    struct sockaddr_in addr;
    socklen_t len = sizeof(addr);
    int cfd = Accept(fd,(struct sockaddr*)&addr,&len);//是否会阻塞?
	
	//查找myevents数组中可用的位置
    for(i = 0 ; i < _EVENT_SIZE_; i ++)
	{
        if(myevents[i].fd==0)
		{
            break;
        }
    }

    //设置读事件
    eventadd(cfd,EPOLLIN,readData,&myevents[i],&myevents[i]);
}

int main(int argc,char *argv[])
{
	//创建socket
    int lfd = Socket(AF_INET,SOCK_STREAM,0);

    //端口复用
    int opt = 1;
    setsockopt(lfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));

	//绑定
    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(8888);
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    Bind(lfd,(struct sockaddr*)&servaddr,sizeof(servaddr));
    
	//监听
    Listen(lfd,128);

	//创建epoll树根节点
    gepfd = epoll_create(1024);
    printf("gepfd === %d\n",gepfd);

    struct epoll_event events[1024];

    //添加最初始事件,将侦听的描述符上树
    eventadd(lfd,EPOLLIN,initAccept,&myevents[_EVENT_SIZE_],&myevents[_EVENT_SIZE_]);
    //void eventadd(int fd,int events,void (*call_back)(int ,int ,void *),void *arg,xevent *ev)

    while(1)
	{
        int nready = epoll_wait(gepfd,events,1024,-1);
		if(nready<0) //调用epoll_wait失败
		{
			perr_exit("epoll_wait error");
			
		}
        else if(nready>0) //调用epoll_wait成功,返回有事件发生的文件描述符的个数
		{
            int i = 0;
            for(i=0;ifd);

                if(xe->events & events[i].events)
				{
                    xe->call_back(xe->fd,xe->events,xe);//调用事件对应的回调
                }
            }
        }
    }

	//关闭监听文件描述符
	Close(lfd);

    return 0;
}

二.线程池

1.概述

(1)介绍

**线程池:**是一种多线程编程模式,用于管理和复用线程以处理大量任务。其主要目的是避免频繁创建和销毁线程,提高系统的性能和效率

(2)实现

  1. 数据结构定义
    • 线程池结构体(ThreadPool):用于维护线程池的状态信息,如线程数量、任务队列、互斥锁、条件变量等。
    • 任务结构体(Task):表示要执行的任务,包含一个函数指针和一个参数,用于执行任务的回调函数和传递参数。
  2. 初始化线程池
    • 创建线程池结构体,并初始化其成员变量,如线程数量、任务队列大小等。
    • 初始化互斥锁和条件变量,用于线程间的同步和互斥访问。
    • 创建指定数量的线程,并将它们设置为等待状态。
  3. 任务提交
    • 将任务添加到线程池的任务队列中。
    • 如果任务队列已满,阻塞等待直到有空闲位置。
  4. 线程工作函数
    • 线程从任务队列中取出任务,并执行任务的回调函数。
    • 执行完任务后,继续从任务队列中获取下一个任务,直到线程池被销毁或收到关闭信号。
  5. 线程池销毁
    • 设置线程池的关闭标志,并通知所有线程退出。
    • 等待所有线程退出,并释放线程池的资源。
  6. 线程同步和互斥
    • 使用互斥锁保护共享资源,如任务队列。
    • 使用条件变量实现线程的等待和唤醒机制,当任务队列为空或满时,线程进行阻塞等待或唤醒。
  7. 任务执行
    • 任务被执行时,线程调用任务的回调函数,并传递参数。
    • 任务执行完毕后,线程将任务标记为已完成,并释放资源。

2.示例

(1)流程

下面是本程序的大致流程,做辅助理解吧

graph TD; A["主函数main()"] --> B["创建线程池create_threadpool()"] B --循环创建指定数目线程--> C(("特定数目的线程")) B --> ADD(("循环提交任务")) ADD --任务队列满了--> 阻塞等待 ADD --任务队列没满--> 进行任务添加 --"发送信号(有任务了)"-->Z --> G C --> D(("线程工作thrRun()")) D --> E["从队列中获取任务"] E --> F{"有任务可执行或要销毁线程池吗?"} F -- 有任务需要处理 --> G["执行任务taskRun()"] F -- 要销毁线程池 --> SH["销毁线程池"] F -- 无 --> Z["阻塞等待"] G --> H{"任务执行完毕了吗?"} H -- 没有 --> I["休眠1秒"] I --> H H -- 是 --> J["任务执行完成"] J --"通知任务队列任务--"--> E B --> K{"需要关闭线程池吗?"} K -- 需要 --> M["销毁线程池destroy_threadpool()"] --发送信号通知线程池关闭--> F M --> N["等待线程退出Wait for threads to exit"] N --> O["退出Exit"]

(2)代码

头文件:

#ifndef _THREADPOOL_H
#define _THREADPOOL_H

#include 
#include 
#include 
#include 
#include 
#include 

//任务结构体
typedef struct _PoolTask
{
    int tasknum;//模拟任务编号
    void *arg;//回调函数参数
    void (*task_func)(void *arg);//任务的回调函数
}PoolTask ;

//线程池结构体
typedef struct _ThreadPool
{
    int max_job_num;//最大任务个数,任务队列大小
    int job_num;//实际任务个数,运行时任务队列上的任务个数
    PoolTask *tasks;//任务队列数组
    int job_push;//入队位置
    int job_pop;// 出队位置

    int thr_num;//线程池内线程个数
    pthread_t *threads;//线程池内线程数组
    int shutdown;//是否关闭线程池
    pthread_mutex_t pool_lock;//线程池的锁
    pthread_cond_t empty_task;//任务队列为空的条件
    pthread_cond_t not_empty_task;//任务队列不为空的条件

}ThreadPool;

void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum  代表线程个数,maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
void addtask(ThreadPool *pool);//添加任务到线程池
void taskRun(void *arg);//任务回调函数

#endif

线程池Demo源文件:

//简易版线程池
#include "threadpoolsimple.h"

ThreadPool *thrPool = NULL;//线程池

int beginnum = 1000;

//线程工作函数,arg是线程池
void *thrRun(void *arg)
{
    //printf("begin call %s-----\n",__FUNCTION__);
    ThreadPool *pool = (ThreadPool*)arg;//这个线程池是主线程传递过来的线程池,但下面的thrPool是全局的线程池,这两个是一样的,不使用pool也可以
    int taskpos = 0;//任务位置
    PoolTask *task = (PoolTask *)malloc(sizeof(PoolTask));//申请一个任务空间,用以将从任务队列中取出的任务拷贝到这个空间,避免因为任务队列中新增任务导致任务被修改

    while(1)
	{
        //获取任务,先要尝试加锁
        pthread_mutex_lock(&thrPool->pool_lock);

		//无任务并且线程池不是要摧毁,则阻塞等待
        while(thrPool->job_num <= 0 && !thrPool->shutdown )
		{
			//如果没有任务,线程会阻塞,等待生产者添加任务时发送信号
            pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);
        }
        
        if(thrPool->job_num)
		{
            //有任务需要处理
            taskpos = (thrPool->job_pop++)%thrPool->max_job_num;//这是通过取模的方式来实现循环队列
            //printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());

            memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));//拷贝任务,避免因为生产者添加任务导致任务被修改
            task->arg = task;//这里的arg是任务的地址,这样在任务回调函数中就可以通过arg来获取任务的信息
            thrPool->job_num--;//通知任务队列减少一个任务
            pthread_cond_signal(&thrPool->empty_task);//通知生产者
        }

        if(thrPool->shutdown)//shutdown为0或1,正好作为C语言中的true和false
		{
            //代表要摧毁线程池,此时线程退出即可
            //pthread_detach(pthread_self());//临死前分家
            pthread_mutex_unlock(&thrPool->pool_lock);
            free(task);
			pthread_exit(NULL);
        }

        //释放锁
        pthread_mutex_unlock(&thrPool->pool_lock);
        task->task_func(task->arg);//执行回调函数
    }
    
    //printf("end call %s-----\n",__FUNCTION__);
}

//创建线程池
void create_threadpool(int thrnum,int maxtasknum)
{
    printf("begin call %s-----\n",__FUNCTION__);
    thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));

    thrPool->thr_num = thrnum;
    thrPool->max_job_num = maxtasknum;
    thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁
    thrPool->job_push = 0;//任务队列添加的位置
    thrPool->job_pop = 0;//任务队列出队的位置
    thrPool->job_num = 0;//初始化的任务个数为0

    thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask)*maxtasknum));//申请最大的任务队列,申请一片连续的内存空间存储任务

    //初始化锁和条件变量
    pthread_mutex_init(&thrPool->pool_lock,NULL);
    pthread_cond_init(&thrPool->empty_task,NULL);
    pthread_cond_init(&thrPool->not_empty_task,NULL);

    int i = 0;
    thrPool->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间,用于存储线程id
	
    //创建线程属性,用于下面创建线程设置线程分离属性
	pthread_attr_t attr;
	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    //循环创建指定个数的线程,组成线程池
    for(i = 0;i < thrnum;i++)
	{
        //线程的回调函数是thrRun,参数是thrPool,即线程池,thrRun就是线程的工作函数
        pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//创建多个线程
    }

    //printf("end call %s-----\n",__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool *pool)
{
    pool->shutdown = 1;//开始自爆
    pthread_cond_broadcast(&pool->not_empty_task);//诱杀,给所有进程发送信号通知其有任务,让其停止阻塞向下执行,但其不满足job_num,进入下面的 if(thrPool->shutdown),因为其是1,所以会自杀

    int i = 0;
    //释放线程资源
    for(i = 0; i < pool->thr_num ; i++)
	{
        pthread_join(pool->threads[i],NULL);
    }
    //释放线程池资源
    pthread_cond_destroy(&pool->not_empty_task);
    pthread_cond_destroy(&pool->empty_task);
    pthread_mutex_destroy(&pool->pool_lock);
    //释放申请的内存
    free(pool->tasks);
    free(pool->threads);
    free(pool);
}

//添加任务到线程池
void addtask(ThreadPool *pool)
{
    //printf("begin call %s-----\n",__FUNCTION__);
    pthread_mutex_lock(&pool->pool_lock);

	//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)
    while(pool->max_job_num <= pool->job_num)
	{
        pthread_cond_wait(&pool->empty_task,&pool->pool_lock);
    }

    int taskpos = (pool->job_push++)%pool->max_job_num;
    //printf("add task %d  tasknum===%d\n",taskpos,beginnum);
    pool->tasks[taskpos].tasknum = beginnum++;
    pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];
    pool->tasks[taskpos].task_func = taskRun;
    pool->job_num++;

    pthread_mutex_unlock(&pool->pool_lock);

    pthread_cond_signal(&pool->not_empty_task);//通知消费者,有任务了
    //printf("end call %s-----\n",__FUNCTION__);
}

//任务回调函数
void taskRun(void *arg)
{
    PoolTask *task = (PoolTask*)arg;
    int num = task->tasknum;
    printf("task %d is runing %lu\n",num,pthread_self());

    sleep(1);
    printf("task %d is done %lu\n",num,pthread_self());
}


int main()
{
    create_threadpool(3,20);
    int i = 0;
    for(i = 0;i < 50 ; i++)
	{
        addtask(thrPool);//模拟添加任务
    }

    sleep(20);
    destroy_threadpool(thrPool);

    return 0;
}
最后修改:2024 年 05 月 03 日
如果觉得我的文章对你有用,请随意赞赏