目录 1.实现目标 2.HTTP服务器 3.Reactor模型 3.1分类 4.功能模块划分: 4.1SERVER模块: 4.2HTTP协议模块: 5.简单的秒级定时任务实现 5.1Linux提供给我们的定时器 5.2时间轮思想: 6.正
目录
代码仓库:https://gitee.com/lc-yulin/server
仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器:
通过实现的高并发服务器组件,可以简洁快速的完成一个高性能的服务器搭建。并且,通过组件内提供的不同应用层协议支持,也可以快速完成一个高性能应用服务器的搭建(当前为了便于项目的演示,项目中提供Http协议组件的支持)
在这里,要明确的是要实现的是一一个高并发服务器组件,因此当前的项目中并不包含实际的业务内容。
概念:
HTTP (Hyper Text Transfer Protocol),超文本传输协议是应用层协议,是一种简单的请求响应协
议(客户端根据自己的需要向服务器发送请求,服务器针对请求提供服务,完毕后通信结束)。
HTTP协议是一个运行在tcp协议之上的应用层协议,这一点本质上是告诉我们,HTTP服务器其实就是个TCP服务器,只不过在应用层基于HTTP协议格式进行数据的组织和解析来明确客户端的请求并完成业务处理。因此实现HTTP服务器简单理解,只需要以下几步即可
1.搭建一个TCP服务器,接收客户端请求。
2.以HTTP协议格式进行解析请求数据,明确客户端目的。
3.明确客户端请求目的后提供对应服务。
4.将服务结果- -HTTP协议格式进行组织,发送给客户端
实现一个HTTP服务器很简单,但是实现一个高性能的服务器并不简单,这个单元中将讲解基于
Reactor模式的高性能服务器实现。当然准确来说,因为要实现的服务器本身并不存在业务,咱们要实现的应该算是一个高性能服务器基础库,是一个基础组件。
概念
Reactor模式,是指通过一个或多个输入同时传递给服务器进行请求处理时的事件驱动处理模式。
服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫
Dispatcher模式。
简单理解就是使用I/0多路复用统一监听事件,收到事件后分发给处理进程或线程,是编写高性能
网络服务器的必备技术之一。
单Reactor单线程:单I/O多路复用+业务处理
1.通过I0多路复用模型进行客户端请求监控
2.触发事件后,进行事件处理
a.如果是新建连接请求,则获取新建连接,并添加至多路复用模型进行事件监控。
b.如果是数据通信请求,则进行对应数据处理(接收数据,处理数据,发送响应)
优点:所有操作在同-个线程中完成,思想编码较为简单,不涉及资源问题抢夺问题
缺点:无法有效利用CPU多核资源,容易达到性能瓶颈
使用场景:翻于客户端数量较少的场景
单Reactor多线程:
1. Reactor线程通过I/0多路复用模型进行客户端请求监控
2.触发事件后,进行事件处理
a.如果是新建连接请求,则获取新建连接,并添加至多路复用模型进行事件监控。
b.如果是数据通信请求,则接收数据后分发给Worker线程池进行业务处理。
c.工作线程处理完毕后,将响应交给Reactor线程进行数据响应
优点:充分利用CPU多核资源
缺点:多线程间的数据共享访问控制较为复杂,单个Reactor承担所有事件的监听和响应,在单线程中运行,高并发场景下容易成为性能瓶颈。
多Reactor多线程:多|/O多路复用+线程池(业务处理)
1.在主Reactor中处理新连接请求事件,有新连接到来则分发到子Reactor中监控
2.在子Reactor中进行客户端通信监控,有事件触发,则接收数据分发给Worker线程池
3. Worker线程 池分配独立的线程进行具体的业务处理
a.工作线程处理完毕后,将响应交给子Reactor线程进行数据响应
优点:充分利用CPU多核资源,主从Reactor各司其职
目标定位: One Thread One Loop主从Reactor模型高并发服务器
要实现的是主从Reactor模型服务器,也就是主Reactor线程仅仅监控监听描述符,获取新建连接,保证获取新连接的高效性,提高服务器的并发性能。
主Reactor获取到新连接后分发给子Reactor进行通信事件监控。而子Reactor线程 监控各自的描述符的读写事件进行数据读写以及业务处理。
One Thread One Loop的思想就是把所有的操作都放到一个线程 中进行,一个线程对应一个事件处理的循环。
当前实现中,因为并不确定组件使用者的使用意向,因此并不提供业务层工作线程池的实现,只实现主从Reactor,而Worker工作线程池,可由组件库的使用者的需要自行决定是否使用和实现。
基于以上的理解,我们要实现的是一一个带有协议支持的Reactor模型高性能服务器,因此将整个项目的实现划分为两个大的模块:
●SERVER模块: 实现Reactor模型的TCP服务器;
●协议模块:对当前的Reactor模型服务器提供应用层协议支持。
SERVER模块就是对所有的连接以及线程进行管理,让它们各司其职,在合适的时候做合适的事,最终完成高性能服务器组件的实现。而具体的管理也分为三个方面:
●监听连接管理: 对监听连接进行管理。
●通信连接管理:对通信连接进行管理。
●超时连接 管理:对超时连接进行管理
基于以上的管理思想,将这个模块进行细致的划分又可以划分为以下多个子模块:
Buffer模块:
功能:用于实现套接字的用户缓冲区
意义:
a.防止接收到的数据不是完整的数据,因此对接受的数据进行缓存
b.对于客户端响应的数据,应该是套接字可写的情况下进行发送
功能设计:
a.向缓冲区添加数据
b.从缓冲区中取出数据
Socket模块:
功能:对Socket套接字的操作进行封装
意义:程序中对于套接字的各项操作更加便捷
功能设计:
a.创建套接字
b.绑定地址信息
c.开始监听
d.向服务器发起连接
e.获取新连接
f.接受数据
g.发送数据
h.关闭套接字
i.创建一个监听连接
j.创建一个客户端连接
Channe|模块:
功能:对于一个描述符进行监控事件管理
意义:对于描述符的监控事件在用户态更容易维护,以及触发事件后的操作流程更加清晰
功能设计:
a.对监控事件的管理:
描述符是否可读描述符是否可写对描述符监控可读对描述符监控可写
解除可读事件监控解除可写事件监控解除所有事件监控
b.对监控事件触发后的处理
设置不同的回调函数->明确出发了某个事件之后应该如何处理
Connection模块
Connection模块是对Buffer模块,Socket模块, Channel模块的一个整体封装,实现了对一个通信套接字的整体的管理,每一个进行数据通信的套接字(也就是accept获取到的新连接)都会使用Connection进行管理。
●Connection模块内部包含有三个由组件使用者传入的回调函数:连接建立完成回调,事件回调,新数据回调,关闭回调。
●Connection模块内部包含有两个组件使用者提供的接口:数据发送接口,连接关闭接口,●Connection模块内部包含有两个用户态缓冲区:用户态接收缓冲区,用户态发送缓冲区
●Connection模块内部包含有一个Socket对象:完成描述符面向系统的I0操作
●Connection模块内部包含有-个Channel对象:完成描述符I0事件就绪的处理
具体处理流程如下:
1.实现向Channel提供可读,可写,错误等不同事件的I0事件回调函数,然后将Channel和对应的描述符添加到Poller事件监控中。
2.当描述符在Poller模块中就绪了I0可读事件,则调用描述符对应Channel中保存的读事件处理函数,进行数据读取,将socket接收缓冲区全部读取到Connection管理的用户态接收缓冲区中。然后调用由组件使用者传入的新数据到来回调函数进行处理。
3.组件使用者进行数据的业务处理完毕后,通过Connection向使用者提供的数据发送接口,将数据.写入Connection的发送缓冲区中。
4.启动描述符在Pol模块中的I0写事件监控,就绪后,调用Channel中保存的写事件处理函数,将发送缓冲区中的数据通过Socket进行面向系统的实际数据发送。
Acceptor模块:
Acceptor模块是对Socket模块,Channel模块的一个整体封装,实现了对一个监听套接字的整体的管理。
●Acceptor模块内部包含有-个Socket对象:实现监听套接字的操作
●Acceptor模块内部包含有一个Channel对象:实现监听套接字I0事件就绪的处理
具体处理流程如下:
1.实现向Channel提供可读事件的10事件处理回调函数,函数的功能其实也就是获取新连接
2.为新连接构建一个Connection对象出来。
TimerQueue模块:
功能:定时任务模块,让一个任务可以在指定的时间之后执行
意义:组件内部,对于非活跃连接希望在N秒时候被释放
功能设计:
a.添加定时任务
b.刷新定时任务->希望一个定时任务重新开始计时
c.取消定时任务
Poller模块:
功能:对任意的描述符进行I0事件监控
意义:对epolI进行封装,让对描述符进行事件监控的操作更加简单
功能接口:
a.添加事件监控->Channel模块
b.修改事件监控
c.移除事件监控
EventLoop模块:
EventLoop模块可以理解就是我们上边所说的Reactor模块,它是对Poller模块 ,TimerQueue模块,Socket模块的一个整体封装,进行所有描述符的事件监控。
EventLoop模块必然是一个对象对应一个线程的模块,线程内部的目的就是运行EventLoop的启动函数。
EventLoop模块为了保证整个服务器的线程安全问题,因此要求使用者对于Connection的所有操作一定要在其对应的EventLoop线程内完成,不能在其他线程中进行(比如组件使用者使Connection发送数据,以及关闭连接这种操作)。
EventLoop模块保证自己内部所监控的所有描述符,都要是活跃连接,非活跃连接就要及时释放避免资源浪费。
EventLoop模块内部包含有一个eventfd: eventfd其 实就是linux内核提供的一个事件fd,专门用于事件通知。
●EventLoop模块内部包含有一个Poller对象:用于进行描述符的I0事件监控。
●EventL oop模块内部包含有一个TimerQueue对象:用于进行定时任务的管理。
●EventL oop模块内部包含有一个PendingTask队列:组件使用者将对Connection进行的所有操作,都加入到任务队列中,由EventLoop模块进行管理,并在EventLoop对应的线程 中进行执行。●每一个Connection对象都会绑定到一个EventLoop.上,这样能保证对这个连接的所有操作都是在一个线程中完成的。
具体操作流程:
1.通过Poller模块对当前模块管理内的所有描述符进行I0事件监控,有描述符事件就绪后,通过描述符对应的Channel进行事件处理。
2.所有就绪的描述符IO事件处理完毕后,对任务队列中的所有操作顺序进行执行。
3.由于epoll的事件监控,有可能会因为没有事件到来而持续阻塞,导致任务队列中的任务不能及时得到执行,因此创建了eventfd,添加到Poller的事件监控中,用于实现每次向任务队列添加任务的时候,通过向eventfd写入数据来唤醒epoll的阻塞。
TcpServer模块:
这个模块是一个整体Tcp服务器模块的封装,内部封装了Acceptor模块,EventLoopThreadPool模块。
●TcpServer中包含有一个EventLoop对象:以备在超轻量使用场景中不需要EventLoop线程池, 只需要在主线程中完成所有操作的情况。
●TcpServer模块内部包含有-个EventL oopThreadPool对象: 其实就是EventLoop线程池,也就是子Reactor线程池
●TcpServer模块内部包含有-个Acceptor对象: 一个TcpServer服务器, 必然对应有- -个监听套接字,能够完成获取客户端新连接,并处理的任务。
TcpServer模块内部包含有一个std::shared_ ptr的hash表: 保存了所有的新建连接对应的Connection,注意,所有的Connection使用shared_ ptr进行管理,这样能够保证在hash表中删除了Connection信息后,在shared_ptr计数器 为0的情况下完成对Connection资源的释放操作。
具体操作流程如下:
在实例化TcpServer对象过程中,完成BaseLoop的设置, Acceptor对 象的实例化,以及EventLoop线程池的实例化,以及std::shared_ ptr
的hash表的实例化。
2.为Acceptor对象设置回调函数:获取到新连接后,为新连接构建Connection对象,设置
Connection的各项回调,并使用shared_ptr进行管理,并添加到hash表中进行管理,并为
Connection选择一个EventLoop线程 ,为Connection添加一个定时销毁任务,为Connection添加事件监控
3.启动BaseLoop。
模块关系图:
HTTP协议模块用于对高并发服务器模块进行协议支持,基于提供的协议支持能够更方便的完成指定协议服务器的搭建。
而HTTP协议支持模块的实现,可以细分为以下几个模块。
Util模块:
这个模块是一个工具模块,主要提供HTTP协议模块所用到的一些工具函数,比如url编解码,文件读,写...等。
HttpRequest模块:
这个模块是HTTP请求数据模块,用于保存HTTP请求数据被解析后的各项请求元素信息。
HttpResponse模块:
这个模块是HTTP响应数据模块,用于业务处理后设置并保存HTTP响应数据的的各项元素信息,最终会被按照HTTP协议响应格式组织成为响应信息发送给客户端。
HttpContext模块:
这个模块是一个HTTP请求接收的上下文模块,主要是为了防止在一次接收的数据中,不是一个完整的HTTP请求,则解析过程并未完成,无法进行完整的请求处理,需要在下次接收到新数据后继续根据上下文进行解析,最终得到一个HttpRequest请求信息对象,因此在请求数据的接收以及解析部分需要一个上下文来进行控制接收和处理节奏。
HttpServer模块:
这个模块是最终给组件使用者提供的HTTP服务器模块了,用于以简单的接口实现HTTP服务器的搭建。
httpserver模块内部包含有一个TcpServer对象: TcpServer对象实现服务器的搭建
HttpServer模块内部包含有两个提供给TcpServer对象的接口:连接建立成功设置上下文接口,数据处理接口。
HttpServer模块内部包含有一个hash-map表存储请求与处理函数的映射表:组件使用者向
HttpServer设置哪些请求应该使用哪些函数进行处理,等TcpServer收到对应的请求就会使用对应的函数进行处理。
在当前的⾼并发服务器中,我们不得不考虑⼀个问题,那就是连接的超时关闭问题。我们需要避免⼀个连接⻓时间不通信,但是也不关闭,空耗资源的情况。
这时候我们就需要⼀个定时任务,定时的将超时过期的连接进⾏释放。
#include int timerfd_create(int clockid, int flags);clockid: CLOCK_REALTIME-系统实时时间,如果修改了系统时间就会出问题;CLOCK_MONOTONIC-从开机到现在的时间是⼀种相对时间;flags: 0-默认阻塞属性int timerfd_settime(int fd, int flags, struct itimerspec *new, structitimerspec *old);fd: timerfd_create返回的⽂件描述符flags: 0-相对时间, 1-绝对时间;默认设置为0即可.new: ⽤于设置定时器的新超时时间old: ⽤于接收原来的超时时间struct timespec {time_t tv_sec; long tv_nsec; };struct itimerspec {struct timespec it_interval; struct timespec it_value; };
定时器会在每次超时时,自动给fd中写⼊8字节的数据,表⽰在上⼀次读取数据到当前读取数据期间超时了多少次。
实例:
#include#include#include#include#includeint main(){ //1.创建一个定时器: int timerfd = timerfd_create(CLOCK_MONOTONIC,0); if(timerfd < 0) { perror("create fail!"); return -1; } //2.启动定时器: struct itimerspec itime; itime.it_value.tv_sec = 1; itime.it_value.tv_nsec = 0;//第一次超时时间为1s后 itime.it_interval.tv_sec = 1; itime.it_interval.tv_nsec = 0; //第一次超时后,每次超时的间隔时 // 这个定时器描述符将每隔1秒都会触发⼀次可读事件 timerfd_settime(timerfd,0,&itime,NULL); while(1) { uint64_t times; int ret = read(timerfd,×,sizeof(times)); if(ret < 0) { perror("read error!"); return -1; } printf("超时了,距离上一次超时了%ld次\n", times); } close(timerfd); return 0;}
[myl@VM-8-12-Centos example]$ ./timerfd超时了,距离上一次超时了1次超时了,距离上一次超时了1次超时了,距离上一次超时了1次
上边例子,是⼀个定时器的使用实例,是每隔1s钟触发⼀次定时器超时,否则就会阻塞在read读取数据这里。基于这个例⼦,则我们可以实现每隔1s,检测⼀下哪些连接超时了,然后将超时的连接释放掉。
上述的例子,存在⼀个很⼤的问题,每次超时都要将所有的连接遍历⼀遍,如果有上万个连接,效率⽆疑是较为低下的。这时候⼤家就会想到,我们可以针对所有的连接,根据每个连接最近⼀次通信的系统时间建立⼀个小根堆,这样只需要每次针对堆顶部分的连接逐个释放,直到没有超时的连接为止,这样也可以大大提高处理的效率。
上述方法可以实现定时任务,但是这⾥给⼤家介绍另⼀种方案:时间轮
时间轮的思想来源于钟表,如果我们定了⼀个3点钟的闹铃,则当时针走到3的时候,就代表时间到
了。
同样的道理,如果我们定义了⼀个数组,并且有⼀个指针,指向数组起始位置,这个指针每秒钟向后走动⼀步,⾛到哪里,则代表哪里的任务该被执⾏了,那么如果我们想要定⼀个3s后的任务,则只需要将任务添加到tick+3位置,则每秒中走一步,三秒钟后tick⾛到对应位置,这时候执行对应位置的任务即可。
但是,同⼀时间可能会有⼤批量的定时任务,因此我们可以给数组对应位置下拉⼀个数组,这样就可以在同⼀个时刻上添加多个定时任务了。
存在的问题:
当某个任务在超时时间内收到了一次数据请求,此时就需要延长定期时间
解决方式:使用智能指针 + 析构函数
将定时任务封装成一个类,类实例化的每一个对象,就是一个定时任务对象,当对象销毁的时候,再去执行定时任务(将定时任务的执行放到析构函数中)
shared_ptr用于对new的对象进行空间管理,当shared_ptr对对象进行管理的时候,内部有一个计数器,当计数器为0的时候,则释放所管理的对象
收到新的数据请求之后又构建一个shared_ptr对象加入到数组中,此时计数器变为2,进而达到了延长定时任务的效果。
实例:
#include#include#include#include#include#includeusing TaskFunc = std::function;using ReleaseFunc = std::function;class TimerTask{private: uint64_t _id; //定时器任务对象ID uint32_t _timeout; //定时任务的超时时间 bool _canceled; //false表示没有取消 true表示被取消 TaskFunc _task_cb; //定时器对象要执行的定时任务 ReleaseFunc _release; //用于删除TimerWheel中保存的定时器对象信息public: TimerTask(uint64_t id,uint32_t delay,const TaskFunc& cb) :_id(id),_timeout(delay),_task_cb(cb),_canceled(false) {} void SetRelease(const ReleaseFunc& cb) {_release = cb;} uint32_t DelayTime() {return _timeout;} void Cancel() {_canceled = true;} ~TimerTask() { if(_canceled == false) _task_cb(); _release(); }};class TimerWheel{private: //解决出现不同的智能指针管理同一个对象,当一个智能指针释放该对象之后,另一个智能指针管理一个空的对象 //所以使用weak_ptr和id关联起来,每次使用智能指针管理对象的时候都能找到该对象被哪个智能指针对象管理 //只需要引用计数增加,而不是再使用一个智能指针对象进行管理,weak_ptr的特点是管理对象资源,但是不增加 //引用计数 using WeakTask = std::weak_ptr; using PtrTask = std::shared_ptr; int _tick; //当前的秒针,走到哪里释放哪里,释放哪里就相当于执行哪里的任务 int _capacity; //表盘的最大数量,也就是最大延迟时间 std::vector> _wheel; std::unordered_map _timers;private: void RemoveTimer(uint64_t id) { auto it = _timers.find(id); if(it != _timers.end()) _timers.erase(it); }public: TimerWheel():_capacity(60),_tick(0),_wheel(_capacity){} //1.添加定时任务: void TimerAdd(uint64_t id,uint32_t delay,const TaskFunc& cb) { PtrTask pt(new TimerTask(id,delay,cb)); pt->SetRelease(std::bind(&TimerWheel::RemoveTimer,this,id)); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); _timers[id] = WeakTask(pt); } //2.刷新和延迟定时任务: void TimerRefresh(uint64_t id) { //通过保存的定时器对象的weak_ptr构造一个shared_ptr出来,添加到轮子中 auto it = _timers.find(id); if(it == _timers.end()) return; PtrTask pt = it->second.lock(); //lock()获取weak_ptr中管理的对象对应的shared_ptr; int delay = pt->DelayTime(); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); } //3.这个函数应该每秒钟被执行一次,相当于秒钟向后走了一步 void RunTimerTask() { _tick = (_tick + 1) % _capacity; //清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉 _wheel[_tick].clear(); } //4.取消定时任务: void TimerCancel(uint64_t id) { auto it = _timers.find(id); if(it == _timers.end()) return; PtrTask pt = it->second.lock(); if(pt) pt->Cancel(); }};//测试:class Test{public: Test() {std::cout << "构造" << std::endl;} ~Test() {std::cout << "析构" << std::endl;}};void DelTest(Test* t) { delete t;}int main(){ TimerWheel tw; Test* t = new Test(); tw.TimerAdd(111,5,std::bind(DelTest,t)); for(int i = 0; i < 5; ++i) { sleep(1); //刷新定时任务 tw.TimerRefresh(111); //向后移动秒针: tw.RunTimerTask(); printf("刷新了一下定时任务,重新需要%d钟后才会销毁\n",5-i); } // tw.TimerCancel(111); while(1) { std::cout << "---------" << std::endl; sleep(1); tw.RunTimerTask(); } return 0;}
[myl@VM-8-12-centos example]$ ./timewheel 构造刷新了一下定时任务,重新需要5钟后才会销毁刷新了一下定时任务,重新需要4钟后才会销毁刷新了一下定时任务,重新需要3钟后才会销毁刷新了一下定时任务,重新需要2钟后才会销毁刷新了一下定时任务,重新需要1钟后才会销毁------------------------------------析构
正则表达式(regular expression)描述了⼀种字符串匹配的模式(pattern),可以用来检查⼀个串是否含有某种⼦串、将匹配的子串替换或者从某个串中取出符合某个条件的子串等。
正则表达式的使用,可以使得HTTP请求的解析更加简单(这⾥指的时程序员的⼯作变得的简单,这并不代表处理效率会变高,实际上效率上是低于直接的字符串处理的),使我们实现的HTTP组件库使用起来更加灵活。
实例:
#include#include#includeint main(){ std::string str = "/numbers/1234"; //匹配以"/numbers/"为起始,后边跟一个或者多个数字字符的字符串,并且在匹配的过程中 //提取匹配到的这个数字字符串 std::regex e("/numbers/(\\d+)"); std::smatch matches; bool ret = std::regex_match(str,matches,e); if(ret == false) return -1; for(auto& e: matches) std::cout << e << std::endl; return 0;}
[myl@VM-8-12-centos example]$ ./regex /numbers/12341234
#include#include#includeint main(){ //HTTP请求行格式: GET /baidu/login?user=xiaoming&pass=123123 HTTP/1.1\r\n std::string str = "GET /baidu/login?user=xiaoming&pass=123123 HTTP/1.1\r\n"; std::smatch matches; //请求方法的匹配 GET HEAD POST PUT DELETE .... std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?" ,std::regex::icase); //(GET|HEAD|POST|PUT|DELETE):表示匹配任意一个字符串 //([^?]*):[^?]匹配非问号字符,后边的*表示0次或者是多次 //\\?(.*) \\? 表示原始的?字符 (.*)表示提取?之后的任意字符0次或多次,知道遇到空格 //HTTP/1\\.[01] 表示匹配以HTTP/1.开始,后边有个0或1的字符串 //(?:\n|\r\n)? (?: ...) 表示匹配某个格式字符串,但是不提取, 最后的?表示的是匹配前边的表达式0次或1次 bool ret = std::regex_match(str,matches,e); if(ret == false) return -1; for(auto& e: matches) std::cout << e << std::endl; return 0;}
[myl@VM-8-12-centos example]$ ./request GET /baidu/login?user=xiaoming&pass=123123 HTTP/1.1GET/baidu/loginuser=xiaoming&pass=123123HTTP/1.1
每⼀个Connection对连接进⾏管理,最终都不可避免需要涉及到应⽤层协议的处理,因此在
Connection中需要设置协议处理的上下⽂来控制处理节奏。但是应⽤层协议千千万,为了降低耦合
度,这个协议接收解析上下⽂就不能有明显的协议倾向,它可以是任意协议的上下⽂信息,因此就需要⼀个通⽤的类型来保存各种不同的数据结构。
在C语言中,通⽤类型可以使⽤void*来管理,但是在c++中,boost库和C++17给我们提供了⼀个通⽤类型any来灵活使⽤,如果考虑增加代码的移植性,尽量减少第三⽅库的依赖,则可以使⽤C++17特性中的any,或者自己来实现。而这个any通⽤类型类的实现其实并不复杂,以下是简单的部分实现。
1.是一个容器,容器中可以保存各种不同类型的数据
解决方式:
a.模板:
templateclass Any{private: T _content;};
存在的问题:
实例化对象的时候,必须指定容器保存的数据类型:Any
而我们需要的是: Any a; a = 10
b.嵌套一下,设计一个类,专门用于保存其它类型的数据,而Any类保存的是固定类的对象
class Any{private: class hlder {……}; template class placeholder : holder { T _val; }; holder* _content;}
Any类中,保存的是holder类的指针,当Any容器需要保存一个数据的时候,只需要通过palceholder子类实例化一个特定类型的子类对象出来,让子类对象保存数据,采用多态的思想,父类指针指向子类的时候调用子类的方法
Any类模拟实现:
#include #include #include #include #include class Any{private: class holder { public: virtual ~holder() {} virtual const std::type_info& type() = 0; virtual holder* clone() = 0; }; template class placeholder : public holder { public: placeholder(const T& val) : _val(val) {} //获取子类对象保存的数据类型: virtual const std::type_info& type() { return typeid(T); } //针对当前的对象自身,克隆出一个新的子类对象来 virtual holder* clone() { return new placeholder(_val); } public: T _val; }; holder* _content;public: Any(): _content(nullptr) {} template Any(const T& val) : _content(new placeholder(val)) {} Any(const Any& other) : _content(other._content ? other._content->clone() : NULL) {} ~Any() {delete _content;} Any& swap(Any& other) { std::swap(_content,other._content); return *this; } //返回子类对象保存的数据指针 template T* get() { //获取想要的数据类型,必须和保存的数据类型一致 assert(typeid(T) == _content->type()); return &((placeholder*)_content)->_val; } //赋值运算符的重载函数: template Any& operator=(const T& val) { //为val构造一个临时的通用容器,然后与当前容器自身进行指针交换,临时对象释放的时候, //原先保存的数据也就被释放 Any(val).swap(*this); return *this; } Any& operator=(const Any& other) { Any(other).swap(*this); return *this; }};//测试:int main(){ Any a; a = 10; int* pa = a.get(); std::cout << *pa << std::endl; return 0;}
[myl@VM-8-12-centos example]$ ./any 10
#define INF 0#define DBG 1#define ERR 2#define LOG_LEVEL INF#define LOG(level, fORMat, ...) do{\ if (level < LOG_LEVEL) break;\ time_t t = time(NULL);\ struct tm *ltm = localtime(&t);\ char tmp[32] = {0};\ strftime(tmp, 31, "%H:%M:%S", ltm);\ fprintf(stdout, "[%p %s %s:%d] " format "\n",(void*)pthread_self(),tmp, __FILE__, __LINE__, ##__VA_ARGS__);\ }while(0)#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)
提供的功能:存储数据,取出数据
实现思想:
1.实现缓冲区得要有-块空间,采用vector < char> ,vector底层采用的是一块线性的内存空间
2.要素:
a.默认的空间大小
b.当前读取数据位置
c.当前的写入数据位置
3.操作:
a.写入数据:
当前写入位置指向哪里,就从哪里开始写入,如果后续空闲空间不够了,考虑整齐缓冲区空闲空间是否足够
足够:将数据移动到起始位置即呵
不够:扩容,从当前写位开始扩容足够大小.
数据一旦写入成功,当前写位置就要向后偏移
b.读取数据:
当前的读取位置指向哪里,就从哪里开始读取,前提是有数据可读
可读数据大小:当前写入位置减去当前读取位置
实现缓冲区类该如何设计:
class Buffer{private:std:vector_ buffer;//位置,是一个相对偏移量,而不是绝对地址uint64_ t_ _read_ idx; //读位置uint64_ t_ write_ idx; /写位置public:1.获取当前写位置的地址2.确保可写空间足够(移动+扩容)3.获取前沿空闲空间大小4.获取后沿空间空间大小5.将读写位置向后移动指定长度6.获取当前读位置地址7.获取可读数据大小8.将读位置向后移动指定长度9.清理功能};
实例代码:
class Buffer{private: std::vector _buffer; //使用vector进行内存空间管理 uint64_t _read_idx; //读偏移 uint64_t _write_idx; //写偏移public: Buffer() : _read_idx(0),_write_idx(0),_buffer(BUFFER_DEFAULT_SIZE) {} char* Begin() {return &*_buffer.begin();} //获取当前写入起始地址, _buffer的空间起始地址,加上写偏移量 char *WritePosition() { return Begin() + _write_idx; } //获取当前读取起始地址 char *ReadPosition() { return Begin() + _read_idx; } //获取缓冲区末尾空闲空间大小--写偏移之后的空闲空间, 总体空间大小减去写偏移 uint64_t TailIdleSize() { return _buffer.size() - _write_idx; } //获取缓冲区起始空闲空间大小--读偏移之前的空闲空间 uint64_t HeadIdleSize() { return _read_idx; } //获取可读数据大小 = 写偏移 - 读偏移 uint64_t ReadAbleSize() { return _write_idx - _read_idx; } //将读偏移向后移动 void MoveReadOffset(uint64_t len) { if (len == 0) return; //向后移动的大小,必须小于可读数据大小 assert(len <= ReadAbleSize()); _read_idx += len; } //将写偏移向后移动 void MoveWriteOffset(uint64_t len) { //向后移动的大小,必须小于当前后边的空闲空间大小 assert(len <= TailIdleSize()); _write_idx += len; } //确保可写空间足够(整体空闲空间够了就移动数据,否则就扩容) void EnsureWriteSpace(uint64_t len) { //如果末尾空闲空间大小足够,直接返回 if (TailIdleSize() >= len) { return; } //末尾空闲空间不够,则判断加上起始位置的空闲空间大小是否足够, 够了就将数据移动到起始位置 if (len <= TailIdleSize() + HeadIdleSize()) { //将数据移动到起始位置 uint64_t rsz = ReadAbleSize();//把当前数据大小先保存起来 std::copy(ReadPosition(), ReadPosition() + rsz, Begin());//把可读数据拷贝到起始位置 _read_idx = 0; //将读偏移归0 _write_idx = rsz; //将写位置置为可读数据大小, 因为当前的可读数据大小就是写偏移量 }else { //总体空间不够,则需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可 _buffer.resize(_write_idx + len); } } //写入数据 void Write(const void *data, uint64_t len) { //1. 保证有足够空间,2. 拷贝数据进去 if (len == 0) return; EnsureWriteSpace(len); const char *d = (const char *)data; std::copy(d, d + len, WritePosition()); } void WriteAndPush(const void *data, uint64_t len) { Write(data, len); MoveWriteOffset(len); } void WriteString(const std::string &data) { return Write(data.c_str(), data.size()); } void WriteStringAndPush(const std::string &data) { WriteString(data); MoveWriteOffset(data.size()); } void WriteBuffer(Buffer &data) { return Write(data.ReadPosition(), data.ReadAbleSize()); } void WriteBufferAndPush(Buffer &data) { WriteBuffer(data); MoveWriteOffset(data.ReadAbleSize()); } //读取数据 void Read(void *buf, uint64_t len) { //要求要获取的数据大小必须小于可读数据大小 assert(len <= ReadAbleSize()); std::copy(ReadPosition(), ReadPosition() + len, (char*)buf); } void ReadAndPop(void *buf, uint64_t len) { Read(buf, len); MoveReadOffset(len); } std::string ReadAsString(uint64_t len) { //要求要获取的数据大小必须小于可读数据大小 assert(len <= ReadAbleSize()); std::string str; str.resize(len); Read(&str[0], len); return str; } std::string ReadAsStringAndPop(uint64_t len) { assert(len <= ReadAbleSize()); std::string str = ReadAsString(len); MoveReadOffset(len); return str; } char *FindCRLF() { //找到'\n'的位置 char *res = (char*)memchr(ReadPosition(), '\n', ReadAbleSize()); return res; } std::string GetLine() { char *pos = FindCRLF(); if (pos == NULL) { return ""; } // +1是为了把换行字符也取出来。 return ReadAsString(pos - ReadPosition() + 1); } std::string GetLineAndPop() { std::string str = GetLine(); MoveReadOffset(str.size()); return str; } //清空缓冲区 void Clear() { //只需要将偏移量归0即可 _read_idx = 0; _write_idx = 0; }};
创建套接字
绑定地址信息
开始监听
向服务器发起连接
获取新连接
接受数据
发送数据
关闭套接字
创建一个服务 端连接
创建一个客户端连接
设置套接字选项—开启地址端口用 -> 主动断开连接的一方会进入time wait状态,此时就会出现绑定端口号失败的情况,所以开启地址端口重用,重新绑定端口号
设置套接字阻塞属性--设置为非阻塞
class NetWork {public: NetWork() { DBG_LOG("SIGPIPE INIT"); signal(SIGPIPE, SIG_IGN); }};static NetWork nw;#define MAX_LISTEN 1024class Socket{private: int _sockfd;public: Socket() :_sockfd(-1) {} Socket(int fd) : _sockfd(fd) {} ~Socket() {Close();} //创建套接字 -> int socket(int domain, int type, int protocol) int Fd() {return _sockfd;} bool Create() { _sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (_sockfd < 0) { ERR_LOG("CREATE SOCKET FAILED!!"); return false; } return true; } //绑定地址信息 bool Bind(const std::string &ip, uint16_t port) { struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); socklen_t len = sizeof(struct sockaddr_in); // int bind(int sockfd, struct sockaddr*addr, socklen_t len); int ret = bind(_sockfd, (struct sockaddr*)&addr, len); if (ret < 0) { ERR_LOG("BIND ADDRESS FAILED!"); return false; } return true; } //开始监听 backlog:全连接队列的大小 bool Listen(int backlog = MAX_LISTEN) { // int listen(int backlog) int ret = listen(_sockfd, backlog); if (ret < 0) { ERR_LOG("SOCKET LISTEN FAILED!"); return false; } return true; } //向服务器发起连接 bool Connect(const std::string &ip, uint16_t port) { struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); socklen_t len = sizeof(struct sockaddr_in); // int connect(int sockfd, struct sockaddr*addr, socklen_t len); int ret = connect(_sockfd, (struct sockaddr*)&addr, len); if (ret < 0) { ERR_LOG("CONNECT SERVER FAILED!"); return false; } return true; } //获取新连接 int Accept() { // int accept(int sockfd, struct sockaddr *addr, socklen_t *len); int newfd = accept(_sockfd, NULL, NULL); if (newfd < 0) { ERR_LOG("SOCKET ACCEPT FAILED!"); return -1; } return newfd; } //接收数据 ssize_t Recv(void *buf, size_t len, int flag = 0) { // ssize_t recv(int sockfd, void *buf, size_t len, int flag); ssize_t ret = recv(_sockfd, buf, len, flag); if (ret <= 0) { //EAGAIN 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误 //EINTR 表示当前socket的阻塞等待,被信号打断了, if (errno == EAGAIN || errno == EINTR) { return 0;//表示这次接收没有接收到数据 } ERR_LOG("SOCKET RECV FAILED!!"); return -1; } return ret; //实际接收的数据长度 } ssize_t NonBlockRecv(void *buf, size_t len) { return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞。 } //发送数据 ssize_t Send(const void *buf, size_t len, int flag = 0) { // ssize_t send(int sockfd, void *data, size_t len, int flag); ssize_t ret = send(_sockfd, buf, len, flag); if (ret < 0) { if (errno == EAGAIN || errno == EINTR) { return 0; } ERR_LOG("SOCKET SEND FAILED!!"); return -1; } return ret;//实际发送的数据长度 } ssize_t NonBlockSend(void *buf, size_t len) { if (len == 0) return 0; return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前发送为非阻塞。 } //关闭套接字 void Close() { if (_sockfd != -1) { close(_sockfd); _sockfd = -1; } } //创建一个服务端连接 bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) { //1. 创建套接字,2. 绑定地址,3. 开始监听,4. 设置默认是阻塞, 5. 启动地址重用 if (Create() == false) return false; if (block_flag) NonBlock(); if (Bind(ip, port) == false) return false; if (Listen() == false) return false; ReuseAddress(); return true; } //创建一个客户端连接 bool CreateClient(uint16_t port, const std::string &ip) { //1. 创建套接字,2.指向连接服务器 if (Create() == false) return false; if (Connect(ip, port) == false) return false; return true; } //设置套接字选项---开启地址端口重用 void ReuseAddress() { // int setsockopt(int fd, int leve, int optname, void *val, int vallen) int val = 1; //SO_REUSEADDR:设置地址重用 setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&val, sizeof(int)); val = 1; //SO_REUSEPORT :设置端口号重用 setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&val, sizeof(int)); } //设置套接字阻塞属性-- 设置为非阻塞 void NonBlock() { //int fcntl(int fd, int cmd, ... ); int flag = fcntl(_sockfd, F_GETFL, 0); fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK); }};
目的:对描述符的监控时间管理
功能:
1 .事件管理:
描述符是否可读
描述符是否可写
对描述符监控可读
对描述符监控可写
解除可读事件监控
解除可写事件监控
解除所有事件监控
2.事件出发后的处理的管理
要处理的事件:可读,可写,挂断,错误,任意
事件处理的回调函数成员:
EPOLLIN:可读.
EPOLLOUT:可写
EPOLLRDHUP:连接断开
EPOLLPRI:优先数据
EPOLLERR:出错了
EPOLLHUP:挂断
事件处理,因为有五种事件要处理,就需要五个回调函数
class Poller;class EventLoop;class Channel {private: int _fd; EventLoop *_loop; uint32_t _events; // 当前需要监控的事件 uint32_t _revents; // 当前连接触发的事件 using EventCallback = std::function; EventCallback _read_callback; //可读事件被触发的回调函数 EventCallback _write_callback; //可写事件被触发的回调函数 EventCallback _error_callback; //错误事件被触发的回调函数 EventCallback _close_callback; //连接断开事件被触发的回调函数 EventCallback _event_callback; //任意事件被触发的回调函数public: Channel(EventLoop *loop,int fd):_fd(fd), _events(0), _revents(0),_loop(loop) {} int Fd() { return _fd; } uint32_t Events() { return _events; }//获取想要监控的事件 void SetREvents(uint32_t events) { _revents = events; }//设置实际就绪的事件 void SetReadCallback(const EventCallback &cb) { _read_callback = cb; } void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; } void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; } void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; } void SetEventCallback(const EventCallback &cb) { _event_callback = cb; } //当前是否监控了可读 bool ReadAble() { return (_events & EPOLLIN); } //当前是否监控了可写 bool WriteAble() { return (_events & EPOLLOUT); } //启动读事件监控 void EnableRead() { _events |= EPOLLIN; Update(); } //启动写事件监控 void EnableWrite() { _events |= EPOLLOUT; Update(); } //关闭读事件监控 void DisableRead() { _events &= ~EPOLLIN; Update(); } //关闭写事件监控 void DisableWrite() { _events &= ~EPOLLOUT; Update(); } //关闭所有事件监控 void DisableAll() { _events = 0; Update(); } //移除监控 void Remove(); void Update(); //事件处理,一旦连接触发了事件,就调用这个函数,自己触发了什么事件如何处理自己决定 void HandleEvent() { //EPOLLRDHUP:半连接 EPOLLPRI:带外数据 if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) { if (_read_callback) _read_callback(); } if (_revents & EPOLLOUT) { if (_write_callback) _write_callback(); }else if (_revents & EPOLLERR) { if (_error_callback) _error_callback();//一旦出错,就会释放连接,因此要放到前边调用任意回调 }else if (_revents & EPOLLHUP) { if (_close_callback) _close_callback(); } if (_event_callback) _event_callback(); }};
描述符IO事件监控模块
意义:通过epoll实现对描述符的I0事件监控
功能:
1.添加/修改描述符的事件监控(不存在则添加,存在则修改)
2.移除描述符的事件监控
封装思想:
1.必须拥有一个epoll的操作句柄
2.拥有一个struct epoll_ event结构体数组,监控是保存所有的活跃事件
3.使用hash表管理描述符于描述符对应的事件管理Channel对象
逻辑流程:
1.对描述符进行监控,通过Channel才 能知道描述符需要监控什么事件
2.当描述符就绪了,通过描述符在hash表中找到对应的Channel(得到了Channel才能知道什么事件该如何处理)当描述符就绪了,返回就绪描述符对应的Channel
类的设计:
class Poller{private: int_ epfd; struct epoll_ event_ evs[X] std::unordered_ map< int,Channel*>private: 1判断要更新事件的描述符是否存在 2.针对epoll直接操作(添加,修改,删除)public: 1.添加或更行描述符所监控的事件 2.移除描述符的监控 3.开始监控,获取就绪的Channel};
代码实现:
#define MAX_EPOLLEVENTS 1024class Poller {private: int _epfd; struct epoll_event _evs[MAX_EPOLLEVENTS]; std::unordered_map _channels;private: //对epoll的直接操作 void Update(Channel *channel, int op) { // int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev); int fd = channel->Fd(); struct epoll_event ev; ev.data.fd = fd; ev.events = channel->Events(); int ret = epoll_ctl(_epfd, op, fd, &ev); if (ret < 0) { ERR_LOG("EPOLLCTL FAILED!"); } return; } //判断一个Channel是否已经添加了事件监控 bool HasChannel(Channel *channel) { auto it = _channels.find(channel->Fd()); if (it == _channels.end()) { return false; } return true; }public: Poller() { _epfd = epoll_create(MAX_EPOLLEVENTS); if (_epfd < 0) { ERR_LOG("EPOLL CREATE FAILED!!"); abort();//退出程序 } } //添加或修改监控事件 void UpdateEvent(Channel *channel) { bool ret = HasChannel(channel); if (ret == false) { //不存在则添加 _channels.insert(std::make_pair(channel->Fd(), channel)); return Update(channel, EPOLL_CTL_ADD); } return Update(channel, EPOLL_CTL_MOD); } //移除监控 void RemoveEvent(Channel *channel) { auto it = _channels.find(channel->Fd()); if (it != _channels.end()) { _channels.erase(it); } Update(channel, EPOLL_CTL_DEL); } //开始监控,返回活跃连接 void Poll(std::vector *active) { // int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout) int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); if (nfds < 0) { if (errno == EINTR) { return ; } ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno)); abort();//退出程序 } for (int i = 0; i < nfds; i++) { auto it = _channels.find(_evs[i].data.fd); assert(it != _channels.end()); it->second->SetREvents(_evs[i].events);//设置实际就绪的事件 active->push_back(it->second); } return; }};
定时器模块的整合:
timefd:实现内核每隔一段事件,给进程一次超时时间
timewheel:实现每次执行Runtimetask,都可以执行一波到期的定时任务
要实现-个完整的秒级定时器,就需要将这两个功能整合到一起,timefd设置为海秒钟触发一次定时事件, 事件被触发,则运行一次timewheel的runtimetask执行一下所有过期定时任务
代码实现 :
using TaskFunc = std::function;using ReleaseFunc = std::function;class TimerTask{ private: uint64_t _id; // 定时器任务对象ID uint32_t _timeout; //定时任务的超时时间 bool _canceled; // false-表示没有被取消, true-表示被取消 TaskFunc _task_cb; //定时器对象要执行的定时任务 ReleaseFunc _release; //用于删除TimerWheel中保存的定时器对象信息 public: TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb): _id(id), _timeout(delay), _task_cb(cb), _canceled(false) {} ~TimerTask() { if (_canceled == false) _task_cb(); _release(); } void Cancel() { _canceled = true; } void SetRelease(const ReleaseFunc &cb) { _release = cb; } uint32_t DelayTime() { return _timeout; }};class TimerWheel {private: using WeakTask = std::weak_ptr; using PtrTask = std::shared_ptr; int _tick; //当前的秒针,走到哪里释放哪里,释放哪里,就相当于执行哪里的任务 int _capacity; //表盘最大数量---其实就是最大延迟时间 std::vector> _wheel; std::unordered_map _timers; EventLoop *_loop; int _timerfd;//定时器描述符--可读事件回调就是读取计数器,执行定时任务 std::unique_ptr _timer_channel;private: void RemoveTimer(uint64_t id) { auto it = _timers.find(id); if (it != _timers.end()) { _timers.erase(it); } } static int CreateTimerfd() { int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); if (timerfd < 0) { ERR_LOG("TIMERFD CREATE FAILED!"); abort(); } //int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old); struct itimerspec itime; itime.it_value.tv_sec = 1; itime.it_value.tv_nsec = 0;//第一次超时时间为1s后 itime.it_interval.tv_sec = 1; itime.it_interval.tv_nsec = 0; //第一次超时后,每次超时的间隔时 timerfd_settime(timerfd, 0, &itime, NULL); return timerfd; } int ReadTimefd() { uint64_t times; //有可能因为其他描述符的事件处理花费事件比较长,然后在处理定时器描述符事件的时候,有可能就已经超时了很多次 //read读取到的数据times就是从上一次read之后超时的次数 int ret = read(_timerfd, ×, 8); if (ret < 0) { ERR_LOG("READ TIMEFD FAILED!"); abort(); } return times; } //这个函数应该每秒钟被执行一次,相当于秒针向后走了一步 void RunTimerTask() { _tick = (_tick + 1) % _capacity; _wheel[_tick].clear();//清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉 } void OnTime() { //根据实际超时的次数,执行对应的超时任务 int times = ReadTimefd(); for (int i = 0; i < times; i++) { RunTimerTask(); } } void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) { PtrTask pt(new TimerTask(id, delay, cb)); pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id)); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); _timers[id] = WeakTask(pt); } void TimerRefreshInLoop(uint64_t id) { //通过保存的定时器对象的weak_ptr构造一个shared_ptr出来,添加到轮子中 auto it = _timers.find(id); if (it == _timers.end()) { return;//没找着定时任务,没法刷新,没法延迟 } PtrTask pt = it->second.lock();//lock获取weak_ptr管理的对象对应的shared_ptr int delay = pt->DelayTime(); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); } void TimerCancelInLoop(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return;//没找着定时任务,没法刷新,没法延迟 } PtrTask pt = it->second.lock(); if (pt) pt->Cancel(); }public: TimerWheel(EventLoop *loop):_capacity(60), _tick(0), _wheel(_capacity), _loop(loop), _timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd)) { _timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this)); _timer_channel->EnableRead();//启动读事件监控 } void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb); //刷新/延迟定时任务 void TimerRefresh(uint64_t id); void TimerCancel(uint64_t id); bool HasTimer(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return false; } return true; }};
eventfd:一种事件通知机制
创建一个描述符用于实现事件通知
eventfd本质在内核离边管理的就是一个计数器
创建eventfd就会在内核中创建一个计数器结构
每当向eventfd中写入一个数值—用于表示事件通知次数
可以使用read进行数据的读取,读取到的数据就是通知的次数
假设每次给eventfd中写入一个1,就表示通知了一次,连续写了三次之后,再去read读取出来的数字就是3,读取之后计数清0
用处:在EventLoop模块中实现线程间通知机制
#include < sys/eventfd.h>
int eventfd(unsigned int initval,int flags);
功能:创建一个eventfd对象,实现事件通知
参数:
initval :计数初值
flags:
EFD_CLOEXEC—禁止进程复制
EFD_NOBLOACK—启动非阻塞属性
返回值:返回一个文件描述符用于操作
eventfd也是通过read/write/close进行操作的
注意点: read&write进行IO的时候数据只能是一个8字节的数据
代码演示:
#include #include #include #include #include int main(){ int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (efd < 0) { perror("eventfd failed!!"); return -1; } uint64_t val = 1; write(efd, &val, sizeof(val)); write(efd, &val, sizeof(val)); write(efd, &val, sizeof(val)); uint64_t res = 0; read(efd, &res, sizeof(res)); printf("%ld\n", res); close(efd); return 0;}
运行截图:
进行事件监控,以及事件处理的模块
关键点:这个模块与线程是一一对应关联的
监控了一个连接,而这个连接-旦就绪,就要进行事件处理。
但是如果这个描述符,在多个线程中都触发了事件,进行处理,就会存在线程安全的问题
因此我们需要将-个连接的事件监控,以及连接事件处理,以及其它操作都放在同一个线程中进行
如何保证一个连接的所有操作都在EventLopp对应的线程中:
解决方案:给EventLoop模块中,添加一个任务队列,对连接的所有操作,都进行-次封装,将对连接
的操作并不直接执行,而是当作任务添加到任务队列中
EventLoop处理流程:
1.在线程中对描述符进行事件监控
2.有描述符就绪则对描述符进行事件处理(如何玩保证处理回调函数中的操作都在线程中)
3.所有的就绪事件处理完了,这时候再去将任务队列中的任务执行
1.事件监控
使用Poller模块
有事件就绪则进行事件处理
2.执行任务队列中的任务
一个线程安全的任务队列
注意点:
因为有可能因为等待描述符I0事件就绪,导致执行流流程阻塞,这时候任务队列中的任务将得不到执行因此得有一个事件通知的东西,能够唤醒事件监控的阻塞
class EventLoop {private: using Functor = std::function; std::thread::id _thread_id;//线程ID int _event_fd;//eventfd唤醒IO事件监控有可能导致的阻塞 std::unique_ptr _event_channel; Poller _poller;//进行所有描述符的事件监控 std::vector _tasks;//任务池 std::mutex _mutex;//实现任务池操作的线程安全 TimerWheel _timer_wheel;//定时器模块public: //执行任务池中的所有任务 void RunAllTask() { std::vector functor; { std::unique_lock _lock(_mutex); _tasks.swap(functor); } for (auto &f : functor) { f(); } return ; } static int CreateEventFd() { int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (efd < 0) { ERR_LOG("CREATE EVENTFD FAILED!!"); abort();//让程序异常退出 } return efd; } void ReadEventfd() { uint64_t res = 0; int ret = read(_event_fd, &res, sizeof(res)); if (ret < 0) { //EINTR -- 被信号打断; EAGAIN -- 表示无数据可读 if (errno == EINTR || errno == EAGAIN) { return; } ERR_LOG("READ EVENTFD FAILED!"); abort(); } return ; } void WeakUpEventFd() { uint64_t val = 1; int ret = write(_event_fd, &val, sizeof(val)); if (ret < 0) { if (errno == EINTR) { return; } ERR_LOG("READ EVENTFD FAILED!"); abort(); } return ; }public: EventLoop():_thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(new Channel(this, _event_fd)), _timer_wheel(this) { //给eventfd添加可读事件回调函数,读取eventfd事件通知次数 _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this)); //启动eventfd的读事件监控 _event_channel->EnableRead(); } //三步走--事件监控-》就绪事件处理-》执行任务 void Start() { while(1) { //1. 事件监控, std::vector actives; _poller.Poll(&actives); //2. 事件处理。 for (auto &channel : actives) { channel->HandleEvent(); } //3. 执行任务 RunAllTask(); } } //用于判断当前线程是否是EventLoop对应的线程; bool IsInLoop() { return (_thread_id == std::this_thread::get_id()); } void AssertInLoop() { assert(_thread_id == std::this_thread::get_id()); } //判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。 void RunInLoop(const Functor &cb) { if (IsInLoop()) { return cb(); } return QueueInLoop(cb); } //将操作压入任务池 void QueueInLoop(const Functor &cb) { { std::unique_lock _lock(_mutex); _tasks.push_back(cb); } //唤醒有可能因为没有事件就绪,而导致的epoll阻塞; //其实就是给eventfd写入一个数据,eventfd就会触发可读事件 WeakUpEventFd(); } //添加/修改描述符的事件监控 void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); } //移除描述符的监控 void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); } void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); } void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); } void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); } bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }};void Channel::Remove() { return _loop->RemoveEvent(this); }void Channel::Update() { return _loop->UpdateEvent(this); }void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { _loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));}//刷新/延迟定时任务void TimerWheel::TimerRefresh(uint64_t id) { _loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));}void TimerWheel::TimerCancel(uint64_t id) { _loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));}
EventLoop模块与线程是对应的
EventLoop模块实例化的对象,在构造的时候就会初始化_ _thread_ id
后边当运行一个操作的时候判断当前是否运行在EventLoop模块对应的线程中,就是将线程ID与EventLoop模块中的thread_ id进行比较,相同就表示在同一个线程,不同就表示当前运行线程并不是EventLoop线程
含义: EventLoop模块在实例化对象的时候,必须在线程内部
EventLoop实例化对象时会设置自己的thread_ id,如果我们先创建了多个EventLoop对象,然后创建了多个线程,将各个线程的id,重新给EventLoop进行设置
存在的问题:在构造EventLoop对象, 到设置新的thread id期间将是不可控的
因此我们必先创建线程,人后在线程的入口函数中,实例化EventLoop对象
构造一个新的模块: LoopThread
这个模块的功能:将EventLoop 与thread整合到一起
思想:
1.创建线程
2.在线程中实例化EventLoop对象
功能:可以向外部返回所实例化的EventLoop
代码编写
class LoopThread {private: std::mutex _mutex; // 互斥锁 std::condition_variable _cond; // 条件变量 EventLoop *_loop; // EventLoop指针变量,这个对象需要在线程内实例化 std::thread _thread; // EventLoop对应的线程private: void ThreadEntry() { EventLoop loop; { std::unique_lock lock(_mutex);//加锁 _loop = &loop; _cond.notify_all(); } loop.Start(); }public: LoopThread():_loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {} EventLoop *GetLoop() { EventLoop *loop = NULL; { std::unique_lock lock(_mutex);//加锁 _cond.wait(lock, [&](){ return _loop != NULL; });//loop为NULL就一直阻塞 loop = _loop; } return loop; }};
针对LoopThread设计一个线程池:
LoopThreadPoll模块:对所有的LoopThread进行管理及分配
功能:
1.线程数量可配置(0个或者多个)
注意事项:在服务器中,主从Reactor模型是主线程只负责新连接获取,从属线程负责新连接的事件监控及处理因此当前的线程池,有可能从属线程会数量为0,也就是实现单Reactor服务器, 一个线程既负责获取新连接负责连接的处理
2.对所有的线程进行管理,其实就是管理0个或多个LoopThread对象
3.提供线程分配的功能
当主线程获取了一个新连接,需要将新连接挂到从属线程上进行事件监控及处理
假设有0个从属线程,则直接分配给主线程的EventLoop, 进行处理
假设有多个从属线程,则采用RR轮转思想,进行线程的分配(将对应线程的EventLoop获取到,设置给对应的Connection)
class LoopThreadPool {private: int _thread_count; //从属线程的数量 int _next_idx; EventLoop *_baseloop; std::vector _threads; std::vector _loops;public: LoopThreadPool(EventLoop *baseloop):_thread_count(0), _next_idx(0), _baseloop(baseloop) {} void SetThreadCount(int count) { _thread_count = count; } void Create() { if (_thread_count > 0) { _threads.resize(_thread_count); _loops.resize(_thread_count); for (int i = 0; i < _thread_count; i++) { _threads[i] = new LoopThread(); _loops[i] = _threads[i]->GetLoop(); } } return ; } EventLoop *NextLoop() { if (_thread_count == 0) { return _baseloop; } _next_idx = (_next_idx + 1) % _thread_count; return _loops[_next_idx]; }};
目的:对连接进行全方位的管理,对通信连接的所有操作都是通过这个模块提供的功能完成的
管理:
1.套接字的管理,能够进行套接字的操作
2.连接事件的管理,可读,可写,错误,挂断,任意
3.缓冲区的管理,便于socket数据的接受和发送
4.协议吓文的管理,记录请求数据的处理过程
5.回调函数的管理
因为连接接受到数据之后该如何处理,要由用户决定,因此必须有业务处理回调函数
一个连接建立成功后,该如何处理,用户决定,因此必须有关闭连接回调函数
任意事件的产生,有没有某些处理,由户决定,因此必须有任意事件的回调函数
功能:
1.发送数据--给用户提供的发送数据接口,钚是真正的发送接口,而只是把数据放到发送缓冲区,然后启动写事件监控
2.关闭连接--给用户提供关闭连接接口,应该在实际释放连接之前,看看输入输出缓冲区否有数据处理
3.启动非活跃连接的超时销毁功能
4.取消非活跃连接的超时功能
5.协议切换--一个连接接受数据后如何进行业务处理,取决于上下文,以及数据的业务处理回调函数Connection模块是对连接的管理模块,对于连接的所有操作都是通过这个模块完成的
场景:对连接进行操作的时候,但是连接已经释放,导致内存访问错误,最终程序崩溃
解决方案:使用智能指针shared_ ptr对Connection对象进行管理,这样就能保证任意地方对
Connection对象进行操作的时候,保存了一份shared_ ptr, 因此就算其它地方进行释放操作
也只是对shared_ ptr的计数器-1,而不会导致Connection的实际释放
class Any{private: class holder { public: virtual ~holder() {} virtual const std::type_info& type() = 0; virtual holder *clone() = 0; }; template class placeholder: public holder { public: placeholder(const T &val): _val(val) {} // 获取子类对象保存的数据类型 virtual const std::type_info& type() { return typeid(T); } // 针对当前的对象自身,克隆出一个新的子类对象 virtual holder *clone() { return new placeholder(_val); } public: T _val; }; holder *_content;public: Any():_content(NULL) {} template Any(const T &val):_content(new placeholder(val)) {} Any(const Any &other):_content(other._content ? other._content->clone() : NULL) {} ~Any() { delete _content; } Any &swap(Any &other) { std::swap(_content, other._content); return *this; } // 返回子类对象保存的数据的指针 template T *get() { //想要获取的数据类型,必须和保存的数据类型一致 assert(typeid(T) == _content->type()); return &((placeholder*)_content)->_val; } //赋值运算符的重载函数 template Any& operator=(const T &val) { //为val构造一个临时的通用容器,然后与当前容器自身进行指针交换,临时对象释放的时候,原先保存的数据也就被释放 Any(val).swap(*this); return *this; } Any& operator=(const Any &other) { Any(other).swap(*this); return *this; }};class Connection;//DISCONECTED -- 连接关闭状态; CONNECTING -- 连接建立成功-待处理状态//CONNECTED -- 连接建立完成,各种设置已完成,可以通信的状态; DISCONNECTING -- 待关闭状态typedef enum { DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING}ConnStatu;using PtrConnection = std::shared_ptr;class Connection : public std::enable_shared_from_this {private: uint64_t _conn_id; // 连接的唯一ID,便于连接的管理和查找 //uint64_t _timer_id; //定时器ID,必须是唯一的,这块为了简化操作使用conn_id作为定时器ID int _sockfd; // 连接关联的文件描述符 bool _enable_inactive_release; // 连接是否启动非活跃销毁的判断标志,默认为false EventLoop *_loop; // 连接所关联的一个EventLoop ConnStatu _statu; // 连接状态 Socket _socket; // 套接字操作管理 Channel _channel; // 连接的事件管理 Buffer _in_buffer; // 输入缓冲区---存放从socket中读取到的数据 Buffer _out_buffer; // 输出缓冲区---存放要发送给对端的数据 Any _context; // 请求的接收处理上下文 using ConnectedCallback = std::function; using MessageCallback = std::function; using ClosedCallback = std::function; using AnyEventCallback = std::function; ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closed_callback; AnyEventCallback _event_callback; ClosedCallback _server_closed_callback;private: //描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用_message_callback void HandleRead() { //1. 接收socket的数据,放到缓冲区 char buf[65536]; ssize_t ret = _socket.NonBlockRecv(buf, 65535); if (ret < 0) { //出错了,不能直接关闭连接 return ShutdownInLoop(); } //这里的等于0表示的是没有读取到数据,而并不是连接断开了,连接断开返回的是-1 //将数据放入输入缓冲区,写入之后顺便将写偏移向后移动 _in_buffer.WriteAndPush(buf, ret); //2. 调用message_callback进行业务处理 if (_in_buffer.ReadAbleSize() > 0) { //shared_from_this--从当前对象自身获取自身的shared_ptr管理对象 return _message_callback(shared_from_this(), &_in_buffer); } } //描述符可写事件触发后调用的函数,将发送缓冲区中的数据进行发送 void HandleWrite() { //_out_buffer中保存的数据就是要发送的数据 ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize()); if (ret < 0) { //发送错误就该关闭连接了, if (_in_buffer.ReadAbleSize() > 0) { _message_callback(shared_from_this(), &_in_buffer); } return Release();//这时候就是实际的关闭释放操作了。 } _out_buffer.MoveReadOffset(ret);//千万不要忘了,将读偏移向后移动 if (_out_buffer.ReadAbleSize() == 0) { _channel.DisableWrite();// 没有数据待发送了,关闭写事件监控 //如果当前是连接待关闭状态,则有数据,发送完数据释放连接,没有数据则直接释放 if (_statu == DISCONNECTING) { return Release(); } } return; } //描述符触发挂断事件 void HandleClose() { if (_in_buffer.ReadAbleSize() > 0) { _message_callback(shared_from_this(), &_in_buffer); } return Release(); } //描述符触发出错事件 void HandleError() { return HandleClose(); } //描述符触发任意事件: 1. 刷新连接的活跃度--延迟定时销毁任务; 2. 调用组件使用者的任意事件回调 void HandleEvent() { if (_enable_inactive_release == true) { _loop->TimerRefresh(_conn_id); } if (_event_callback) { _event_callback(shared_from_this()); } } //连接获取之后,所处的状态下要进行各种设置(启动读监控,调用回调函数) void EstablishedInLoop() { // 1. 修改连接状态; 2. 启动读事件监控; 3. 调用回调函数 assert(_statu == CONNECTING);//当前的状态必须一定是上层的半连接状态 _statu = CONNECTED;//当前函数执行完毕,则连接进入已完成连接状态 // 一旦启动读事件监控就有可能会立即触发读事件,如果这时候启动了非活跃连接销毁 _channel.EnableRead(); if (_connected_callback) _connected_callback(shared_from_this()); } //这个接口才是实际的释放接口 void ReleaseInLoop() { //1. 修改连接状态,将其置为DISCONNECTED _statu = DISCONNECTED; //2. 移除连接的事件监控 _channel.Remove(); //3. 关闭描述符 _socket.Close(); //4. 如果当前定时器队列中还有定时销毁任务,则取消任务 if (_loop->HasTimer(_conn_id)) CancelInactiveReleaseInLoop(); //5. 调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,再去处理会出错,因此先调用用户的回调函数 if (_closed_callback) _closed_callback(shared_from_this()); //移除服务器内部管理的连接信息 if (_server_closed_callback) _server_closed_callback(shared_from_this()); } //这个接口并不是实际的发送接口,而只是把数据放到了发送缓冲区,启动了可写事件监控 void SendInLoop(Buffer &buf) { if (_statu == DISCONNECTED) return ; _out_buffer.WriteBufferAndPush(buf); if (_channel.WriteAble() == false) { _channel.EnableWrite(); } } //这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送 void ShutdownInLoop() { _statu = DISCONNECTING;// 设置连接为半关闭状态 if (_in_buffer.ReadAbleSize() > 0) { if (_message_callback) _message_callback(shared_from_this(), &_in_buffer); } //要么就是写入数据的时候出错关闭,要么就是没有待发送数据,直接关闭 if (_out_buffer.ReadAbleSize() > 0) { if (_channel.WriteAble() == false) { _channel.EnableWrite(); } } if (_out_buffer.ReadAbleSize() == 0) { Release(); } } //启动非活跃连接超时释放规则 void EnableInactiveReleaseInLoop(int sec) { //1. 将判断标志 _enable_inactive_release 置为true _enable_inactive_release = true; //2. 如果当前定时销毁任务已经存在,那就刷新延迟一下即可 if (_loop->HasTimer(_conn_id)) { return _loop->TimerRefresh(_conn_id); } //3. 如果不存在定时销毁任务,则新增 _loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this)); } void CancelInactiveReleaseInLoop() { _enable_inactive_release = false; if (_loop->HasTimer(_conn_id)) { _loop->TimerCancel(_conn_id); } } void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) { _context = context; _connected_callback = conn; _message_callback = msg; _closed_callback = closed; _event_callback = event; }public: Connection(EventLoop *loop, uint64_t conn_id, int sockfd):_conn_id(conn_id), _sockfd(sockfd), _enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd), _channel(loop, _sockfd) { _channel.SetCloseCallback(std::bind(&Connection::HandleClose, this)); _channel.SetEventCallback(std::bind(&Connection::HandleEvent, this)); _channel.SetReadCallback(std::bind(&Connection::HandleRead, this)); _channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this)); _channel.SetErrorCallback(std::bind(&Connection::HandleError, this)); } ~Connection() { DBG_LOG("RELEASE CONNECTION:%p", this); } //获取管理的文件描述符 int Fd() { return _sockfd; } //获取连接ID int Id() { return _conn_id; } //是否处于CONNECTED状态 bool Connected() { return (_statu == CONNECTED); } //设置上下文--连接建立完成时进行调用 void SetContext(const Any &context) { _context = context; } //获取上下文,返回的是指针 Any *GetContext() { return &_context; } void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; } void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; } void SetSrvClosedCallback(const ClosedCallback&cb) { _server_closed_callback = cb; } //连接建立就绪后,进行channel回调设置,启动读监控,调用_connected_callback void Established() { _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this)); } //发送数据,将数据放到发送缓冲区,启动写事件监控 void Send(const char *data, size_t len) { //外界传入的data,可能是个临时的空间,我们现在只是把发送操作压入了任务池,有可能并没有被立即执行 //因此有可能执行的时候,data指向的空间有可能已经被释放了。 Buffer buf; buf.WriteAndPush(data, len); _loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf))); } //提供给组件使用者的关闭接口--并不实际关闭,需要判断有没有数据待处理 void Shutdown() { _loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this)); } void Release() { _loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this)); } //启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务 void EnableInactiveRelease(int sec) { _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec)); } //取消非活跃销毁 void CancelInactiveRelease() { _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this)); } //切换协议---重置上下文以及阶段性回调处理函数 -- 而是这个接口必须在EventLoop线程中立即执行 //防备新的事件触发后,处理的时候,切换任务还没有被执行--会导致数据使用原协议处理了。 void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) { _loop->AssertInLoop(); _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event)); }};
Acceptor模块:对监听套接字进行管理
1.创建一个监听套接字
2.启动读事件监控
3.事件触发后,获取新连接
4.调用新连接获取成功后的回调函数
为新连接创建Connection进行管理(这一步怀是 Acceptor模块操作,应该是服务器模块)
因为Acceptor模块只进行监听连接的管理,因此获取到新连接的描述符之后,对于新连接描述符如何处理其实并不关心,对于新连接如何处理,应该是服务器模块来管理的
服务器模块,实现了一个对于新连接描述符处理的函数,将这个函数设置给Acceptor模块中的回调函数
class Acceptor {private: Socket _socket;//用于创建监听套接字 EventLoop *_loop; //用于对监听套接字进行事件监控 Channel _channel; //用于对监听套接字进行事件管理 using AcceptCallback = std::function; AcceptCallback _accept_callback;private: void HandleRead() { int newfd = _socket.Accept(); if (newfd < 0) { return ; } if (_accept_callback) _accept_callback(newfd); } int CreateServer(int port) { bool ret = _socket.CreateServer(port); assert(ret == true); return _socket.Fd(); }public: Acceptor(EventLoop *loop, int port): _socket(CreateServer(port)), _loop(loop), _channel(loop, _socket.Fd()) { _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this)); } void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; } void Listen() { _channel.EnableRead(); }};
对所有模块的整合,通过TcpServer模块实例化对象,可以非常简单的完成一个服务器的搭建
管理:
1.Acceptor对象,创建一个监听套接字
2.EventLoop对象,baseloop对象,实现对监听套接字的事件监控
3.std::unordered_ map_ conns,实现对所有新建连接的管理
4.LoopThreadPool对象,创建loop线程池, 对新建连接进行事件监控及处理
功能:
1.设置从属线程池数量
2.启动服务器
3.设置各种回调函数(连接建立完成,消息,关闭,任意),用户设置给TcpServer,TcpServer设 置给获取的新连接
4.是否启动非活跃连接超时销毁功能
5.添加定时任务功能
流程:
1.在TcpServer中实例化一个Acceptor对象, 以及-个EventLoop对象(baseloop)
2.将Acceptor挂到baseloop上进行事件监控
3.一旦Acceptor对象就绪了可读事件,则执行读事件回到函数获取新建连接
4.对新连接,创建一个Connection进行管理
5.对连接对应的Connecton设置功能回调(连接完成回调,消息回调,关闭回调,任意事件回调)6.启动Connection的非活跃连接的超时销毁规则
7.将新连接对应的Connection挂到L oopThreadPool中的从属线程对应的EventLoop中进行事件监控
8.一旦Connection对应的连接就绪了可读事件,则这个时候执行事件回调函数,读取数据,读取完毕
后调用TcpServer设置的消息回调
class TcpServer {private: uint64_t _next_id; //这是一个自动增长的连接ID, int _port; int _timeout; //这是非活跃连接的统计时间---多长时间无通信就是非活跃连接 bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志 EventLoop _baseloop; //这是主线程的EventLoop对象,负责监听事件的处理 Acceptor _acceptor; //这是监听套接字的管理对象 LoopThreadPool _pool; //这是从属EventLoop线程池 std::unordered_map _conns;//保存管理所有连接对应的shared_ptr对象 using ConnectedCallback = std::function; using MessageCallback = std::function; using ClosedCallback = std::function; using AnyEventCallback = std::function; using Functor = std::function; ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closed_callback; AnyEventCallback _event_callback;private: void RunAfterInLoop(const Functor &task, int delay) { _next_id++; _baseloop.TimerAdd(_next_id, delay, task); } //为新连接构造一个Connection进行管理 void NewConnection(int fd) { _next_id++; PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd)); conn->SetMessageCallback(_message_callback); conn->SetClosedCallback(_closed_callback); conn->SetConnectedCallback(_connected_callback); conn->SetAnyEventCallback(_event_callback); conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1)); if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);//启动非活跃超时销毁 conn->Established();//就绪初始化 _conns.insert(std::make_pair(_next_id, conn)); } void RemoveConnectionInLoop(const PtrConnection &conn) { int id = conn->Id(); auto it = _conns.find(id); if (it != _conns.end()) { _conns.erase(it); } } //从管理Connection的_conns中移除连接信息 void RemoveConnection(const PtrConnection &conn) { _baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn)); }public: TcpServer(int port): _port(port), _next_id(0), _enable_inactive_release(false), _acceptor(&_baseloop, port), _pool(&_baseloop) { _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1)); _acceptor.Listen();//将监听套接字挂到baseloop上 } void SetThreadCount(int count) { return _pool.SetThreadCount(count); } void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; } void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; } void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; } //用于添加一个定时任务 void RunAfter(const Functor &task, int delay) { _baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay)); } void Start() { _pool.Create(); _baseloop.Start(); }};
#include "../source/server.hpp"class EchoServer{private: TcpServer _server;private: void OnConnected(const PtrConnection &conn) { DBG_LOG("NEW CONNECTION:%p", conn.get()); } void OnClosed(const PtrConnection &conn) { DBG_LOG("CLOSE CONNECTION:%p", conn.get()); } void OnMessage(const PtrConnection &conn, Buffer *buf) { conn->Send(buf->ReadPosition(), buf->ReadAbleSize()); buf->MoveReadOffset(buf->ReadAbleSize()); conn->Shutdown(); }public: EchoServer(int port) : _server(port) { _server.SetThreadCount(2); _server.EnableInactiveRelease(10); _server.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1)); _server.SetConnectedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1)); _server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2)); } void Start() { _server.Start(); }};
#include "echo.hpp"int main(){ EchoServer server(8081); server.Start(); return 0;}
std::unordered_map _statu_msg = { {100, "Continue"}, {101, "Switching Protocol"}, {102, "Processing"}, {103, "Early Hints"}, {200, "OK"}, {201, "Created"}, {202, "Accepted"}, {203, "Non-Authoritative Information"}, {204, "No Content"}, {205, "Reset Content"}, {206, "Partial Content"}, {207, "Multi-Status"}, {208, "Already Reported"}, {226, "IM Used"}, {300, "Multiple Choice"}, {301, "Moved Permanently"}, {302, "Found"}, {303, "See Other"}, {304, "Not Modified"}, {305, "Use Proxy"}, {306, "unused"}, {307, "Temporary Redirect"}, {308, "Permanent Redirect"}, {400, "Bad Request"}, {401, "Unauthorized"}, {402, "Payment Required"}, {403, "Forbidden"}, {404, "Not Found"}, {405, "Method Not Allowed"}, {406, "Not Acceptable"}, {407, "Proxy Authentication Required"}, {408, "Request Timeout"}, {409, "Conflict"}, {410, "Gone"}, {411, "Length Required"}, {412, "Precondition Failed"}, {413, "Payload Too Large"}, {414, "URI Too Long"}, {415, "Unsupported Media Type"}, {416, "Range Not Satisfiable"}, {417, "Expectation Failed"}, {418, "I'm a teapot"}, {421, "Misdirected Request"}, {422, "Unprocessable Entity"}, {423, "Locked"}, {424, "Failed Dependency"}, {425, "Too Early"}, {426, "Upgrade Required"}, {428, "Precondition Required"}, {429, "Too Many Requests"}, {431, "Request Header Fields Too Large"}, {451, "Unavailable For Legal Reasons"}, {501, "Not Implemented"}, {502, "Bad Gateway"}, {503, "Service Unavailable"}, {504, "Gateway Timeout"}, {505, "HTTP Version Not Supported"}, {506, "Variant Also Negotiates"}, {507, "Insufficient Storage"}, {508, "Loop Detected"}, {510, "Not Extended"}, {511, "Network Authentication Required"}};std::unordered_map _mime_msg = { {".aac", "audio/aac"}, {".abw", "application/x-abiWord"}, {".arc", "application/x-freearc"}, {".avi", "video/x-msvideo"}, {".azw", "application/vnd.amazon.ebook"}, {".bin", "application/octet-stream"}, {".bmp", "image/bmp"}, {".bz", "application/x-bzip"}, {".bz2", "application/x-bzip2"}, {".csh", "application/x-csh"}, {".CSS", "text/css"}, {".csv", "text/csv"}, {".doc", "application/msword"}, {".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"}, {".eot", "application/vnd.ms-fontobject"}, {".epub", "application/epub+zip"}, {".gif", "image/gif"}, {".htm", "text/html"}, {".html", "text/html"}, {".ico", "image/vnd.microsoft.icon"}, {".ics", "text/calendar"}, {".jar", "application/java-arcHive"}, {".jpeg", "image/jpeg"}, {".jpg", "image/jpeg"}, {".js", "text/javascript"}, {".JSON", "application/json"}, {".jsonld", "application/ld+json"}, {".mid", "audio/midi"}, {".midi", "audio/x-midi"}, {".mjs", "text/javascript"}, {".mp3", "audio/mpeg"}, {".mpeg", "video/mpeg"}, {".mpkg", "application/vnd.apple.installer+xml"}, {".odp", "application/vnd.oasis.opendocument.presentation"}, {".ods", "application/vnd.oasis.opendocument.spreadsheet"}, {".odt", "application/vnd.oasis.opendocument.text"}, {".oga", "audio/ogg"}, {".ogv", "video/ogg"}, {".ogx", "application/ogg"}, {".otf", "font/otf"}, {".png", "image/png"}, {".pdf", "application/pdf"}, {".ppt", "application/vnd.ms-powerpoint"}, {".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"}, {".rar", "application/x-rar-compressed"}, {".rtf", "application/rtf"}, {".sh", "application/x-sh"}, {".svg", "image/svg+xml"}, {".swf", "application/x-shockwave-flash"}, {".tar", "application/x-tar"}, {".tif", "image/tiff"}, {".tiff", "image/tiff"}, {".ttf", "font/ttf"}, {".txt", "text/plain"}, {".vsd", "application/vnd.visio"}, {".wav", "audio/wav"}, {".WEBa", "audio/webm"}, {".webm", "video/webm"}, {".webp", "image/webp"}, {".woff", "font/woff"}, {".woff2", "font/woff2"}, {".xhtml", "application/xhtml+xml"}, {".xls", "application/vnd.ms-excel"}, {".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"}, {".xml", "application/xml"}, {".xul", "application/vnd.mozilla.xul+xml"}, {".zip", "application/zip"}, {".3gp", "video/3gpp"}, {".3g2", "video/3gpp2"}, {".7z", "application/x-7z-compressed"}};class Util{public: // 字符串分割函数,将src字符串按照sep字符进行分割,得到的各个字串放到arry中,最终返回字串的数量 static size_t Split(const std::string &src, const std::string &sep, std::vector *arry) { size_t offset = 0; // 有10个字符,offset是查找的起始位置,范围应该是0~9,offset==10就代表已经越界了 while (offset < src.size()) { size_t pos = src.find(sep, offset); // 在src字符串偏移量offset处,开始向后查找sep字符/字串,返回查找到的位置 if (pos == std::string::npos) { // 没有找到特定的字符 // 将剩余的部分当作一个字串,放入arry中 if (pos == src.size()) break; arry->push_back(src.substr(offset)); return arry->size(); } if (pos == offset) { offset = pos + sep.size(); continue; // 当前字串是一个空的,没有内容 } arry->push_back(src.substr(offset, pos - offset)); offset = pos + sep.size(); } return arry->size(); } // 读取文件的所有内容,将读取的内容放到一个Buffer中 static bool ReadFile(const std::string &filename, std::string *buf) { std::ifstream ifs(filename, std::ios::binary); if (ifs.is_open() == false) { printf("OPEN %s FILE FAILED!!", filename.c_str()); return false; } size_t fsize = 0; ifs.seekg(0, ifs.end); // 跳转读写位置到末尾 fsize = ifs.tellg(); // 获取当前读写位置相对于起始位置的偏移量,从末尾偏移刚好就是文件大小 ifs.seekg(0, ifs.beg); // 跳转到起始位置 buf->resize(fsize); // 开辟文件大小的空间 ifs.read(&(*buf)[0], fsize); if (ifs.good() == false) { printf("READ %s FILE FAILED!!", filename.c_str()); ifs.close(); return false; } ifs.close(); return true; } // 向文件写入数据 static bool WriteFile(const std::string &filename, const std::string &buf) { std::ofstream ofs(filename, std::ios::binary | std::ios::trunc); if (ofs.is_open() == false) { printf("OPEN %s FILE FAILED!!", filename.c_str()); return false; } ofs.write(buf.c_str(), buf.size()); if (ofs.good() == false) { ERR_LOG("WRITE %s FILE FAILED!", filename.c_str()); ofs.close(); return false; } ofs.close(); return true; } // URL编码,避免URL中资源路径与查询字符串中的特殊字符与HTTP请求中特殊字符产生歧义 // 编码格式:将特殊字符的ascii值,转换为两个16进制字符,前缀% C++ -> C%2B%2B // 不编码的特殊字符: RFC3986文档规定 . - _ ~ 字母,数字属于绝对不编码字符 // RFC3986文档规定,编码格式 %HH // W3C标准中规定,查询字符串中的空格,需要编码为+, 解码则是+转空格 static std::string UrlEncode(const std::string url, bool convert_space_to_plus) { std::string res; for (auto &c : url) { if (c == '.' || c == '-' || c == '_' || c == '~' || isalnum(c)) { res += c; continue; } if (c == ' ' && convert_space_to_plus == true) { res += '+'; continue; } // 剩下的字符都是需要编码成为 %HH 格式 char tmp[4] = {0}; // snprintf 与 printf比较类似,都是格式化字符串,只不过一个是打印,一个是放到一块空间中 snprintf(tmp, 4, "%%%02X", c); res += tmp; } return res; } static char HEXTOI(char c) { if (c >= '0' && c <= '9') { return c - '0'; } else if (c >= 'a' && c <= 'z') { return c - 'a' + 10; } else if (c >= 'A' && c <= 'Z') { return c - 'A' + 10; } return -1; } static std::string UrlDecode(const std::string url, bool convert_plus_to_space) { // 遇到了%,则将紧随其后的2个字符,转换为数字,第一个数字左移4位,然后加上第二个数字 + -> 2b %2b->2 << 4 + 11 std::string res; for (int i = 0; i < url.size(); i++) { if (url[i] == '+' && convert_plus_to_space == true) { res += ' '; continue; } if (url[i] == '%' && (i + 2) < url.size()) { char v1 = HEXTOI(url[i + 1]); char v2 = HEXTOI(url[i + 2]); char v = v1 * 16 + v2; res += v; i += 2; continue; } res += url[i]; } return res; } // 响应状态码的描述信息获取 static std::string StatuDesc(int statu) { auto it = _statu_msg.find(statu); if (it != _statu_msg.end()) { return it->second; } return "Unknow"; } // 根据文件后缀名获取文件mime static std::string ExtMime(const std::string &filename) { // a.b.txt 先获取文件扩展名 size_t pos = filename.find_last_of('.'); if (pos == std::string::npos) { return "application/octet-stream"; } // 根据扩展名,获取mime std::string ext = filename.substr(pos); auto it = _mime_msg.find(ext); if (it == _mime_msg.end()) { return "application/octet-stream"; } return it->second; } // 判断一个文件是否是一个目录 static bool IsDirectory(const std::string &filename) { struct stat st; int ret = stat(filename.c_str(), &st); if (ret < 0) { return false; } return S_ISDIR(st.st_mode); } // 判断一个文件是否是一个普通文件 static bool IsRegular(const std::string &filename) { struct stat st; int ret = stat(filename.c_str(), &st); if (ret < 0) { return false; } return S_ISREG(st.st_mode); } // http请求的资源路径有效性判断 // /index.html --- 前边的/叫做相对根目录 映射的是某个服务器上的子目录 // 想表达的意思就是,客户端只能请求相对根目录中的资源,其他地方的资源都不予理会 // /../login, 这个路径中的..会让路径的查找跑到相对根目录之外,这是不合理的,不安全的 static bool ValidPath(const std::string &path) { // 思想:按照/进行路径分割,根据有多少子目录,计算目录深度,有多少层,深度不能小于0 std::vector subdir; Split(path, "/", &subdir); int level = 0; for (auto &dir : subdir) { if (dir == "..") { level--; // 任意一层走出相对根目录,就认为有问题 if (level < 0) return false; continue; } level++; } return true; }};
class HttpRequest{public: std::string _method; // 请求方法 std::string _path; // 资源路径 std::string _version; // 协议版本 std::string _body; // 请求正文 std::smatch _matches; // 资源路径的正则提取数据 std::unordered_map _headers; // 头部字段 std::unordered_map _params; // 查询字符串public: HttpRequest() : _version("HTTP/1.1") {} void ReSet() { _method.clear(); _path.clear(); _version = "HTTP/1.1"; _body.clear(); std::smatch match; _matches.swap(match); _headers.clear(); _params.clear(); } // 插入头部字段 void SetHeader(const std::string &key, const std::string &val) { _headers.insert(std::make_pair(key, val)); } // 判断是否存在指定头部字段 bool HasHeader(const std::string &key) const { auto it = _headers.find(key); if (it == _headers.end()) { return false; } return true; } // 获取指定头部字段的值 std::string GetHeader(const std::string &key) const { auto it = _headers.find(key); if (it == _headers.end()) { return ""; } return it->second; } // 插入查询字符串 void SetParam(const std::string &key, const std::string &val) { _params.insert(std::make_pair(key, val)); } // 判断是否有某个指定的查询字符串 bool HasParam(const std::string &key) const { auto it = _params.find(key); if (it == _params.end()) { return false; } return true; } // 获取指定的查询字符串 std::string GetParam(const std::string &key) const { auto it = _params.find(key); if (it == _params.end()) { return ""; } return it->second; } // 获取正文长度 size_t ContentLength() const { // Content-Length: 1234\r\n bool ret = HasHeader("Content-Length"); if (ret == false) { return 0; } std::string clen = GetHeader("Content-Length"); return std::stol(clen); } // 判断是否是短链接 bool Close() const { // 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接 if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") { return false; } return true; }};
class HttpResponse{public: int _statu; bool _redirect_flag; std::string _body; std::string _redirect_url; std::unordered_map _headers;public: HttpResponse() : _redirect_flag(false), _statu(200) {} HttpResponse(int statu) : _redirect_flag(false), _statu(statu) {} void ReSet() { _statu = 200; _redirect_flag = false; _body.clear(); _redirect_url.clear(); _headers.clear(); } // 插入头部字段 void SetHeader(const std::string &key, const std::string &val) { _headers.insert(std::make_pair(key, val)); } // 判断是否存在指定头部字段 bool HasHeader(const std::string &key) { auto it = _headers.find(key); if (it == _headers.end()) { return false; } return true; } // 获取指定头部字段的值 std::string GetHeader(const std::string &key) { auto it = _headers.find(key); if (it == _headers.end()) { return ""; } return it->second; } void SetContent(const std::string &body, const std::string &type = "text/html") { _body = body; SetHeader("Content-Type", type); } void SetRedirect(const std::string &url, int statu = 302) { _statu = statu; _redirect_flag = true; _redirect_url = url; } // 判断是否是短链接 bool Close() { // 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接 if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") { return false; } return true; }};
typedef enum{ RECV_HTTP_ERROR, RECV_HTTP_LINE, RECV_HTTP_HEAD, RECV_HTTP_BODY, RECV_HTTP_OVER} HttpRecvStatu;#define MAX_LINE 8192class HttpContext{private: int _resp_statu; // 响应状态码 HttpRecvStatu _recv_statu; // 当前接收及解析的阶段状态 HttpRequest _request; // 已经解析得到的请求信息private: bool ParseHttpLine(const std::string &line) { std::smatch matches; std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?", std::regex::icase); bool ret = std::regex_match(line, matches, e); if (ret == false) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400; // BAD REQUEST return false; } // 0 : GET /bitejiuyeke/login?user=xiaoming&pass=123123 HTTP/1.1 // 1 : GET // 2 : /bitejiuyeke/login // 3 : user=xiaoming&pass=123123 // 4 : HTTP/1.1 // 请求方法的获取 _request._method = matches[1]; std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper); // 资源路径的获取,需要进行URL解码操作,但是不需要+转空格 _request._path = Util::UrlDecode(matches[2], false); // 协议版本的获取 _request._version = matches[4]; // 查询字符串的获取与处理 std::vector query_string_arry; std::string query_string = matches[3]; // 查询字符串的格式 key=val&key=val....., 先以 & 符号进行分割,得到各个字串 Util::Split(query_string, "&", &query_string_arry); // 针对各个字串,以 = 符号进行分割,得到key 和val, 得到之后也需要进行URL解码 for (auto &str : query_string_arry) { size_t pos = str.find("="); if (pos == std::string::npos) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400; // BAD REQUEST return false; } std::string key = Util::UrlDecode(str.substr(0, pos), true); std::string val = Util::UrlDecode(str.substr(pos + 1), true); _request.SetParam(key, val); } return true; } bool RecvHttpLine(Buffer *buf) { if (_recv_statu != RECV_HTTP_LINE) return false; // 1. 获取一行数据,带有末尾的换行 std::string line = buf->GetLineAndPop(); // 2. 需要考虑的一些要素:缓冲区中的数据不足一行, 获取的一行数据超大 if (line.size() == 0) { // 缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长了都不足一行,这是有问题的 if (buf->ReadAbleSize() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } // 缓冲区中数据不足一行,但是也不多,就等等新数据的到来 return true; } if (line.size() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } bool ret = ParseHttpLine(line); if (ret == false) { return false; } // 首行处理完毕,进入头部获取阶段 _recv_statu = RECV_HTTP_HEAD; return true; } bool RecvHttpHead(Buffer *buf) { if (_recv_statu != RECV_HTTP_HEAD) return false; // 一行一行取出数据,直到遇到空行为止, 头部的格式 key: val\r\nkey: val\r\n.... while (1) { std::string line = buf->GetLineAndPop(); // 2. 需要考虑的一些要素:缓冲区中的数据不足一行, 获取的一行数据超大 if (line.size() == 0) { // 缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长了都不足一行,这是有问题的 if (buf->ReadAbleSize() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } // 缓冲区中数据不足一行,但是也不多,就等等新数据的到来 return true; } if (line.size() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } if (line == "\n" || line == "\r\n") { break; } bool ret = ParseHttpHead(line); if (ret == false) { return false; } } // 头部处理完毕,进入正文获取阶段 _recv_statu = RECV_HTTP_BODY; return true; } bool ParseHttpHead(std::string &line) { // key: val\r\nkey: val\r\n.... if (line.back() == '\n') line.pop_back(); // 末尾是换行则去掉换行字符 if (line.back() == '\r') line.pop_back(); // 末尾是回车则去掉回车字符 size_t pos = line.find(": "); if (pos == std::string::npos) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400; // return false; } std::string key = line.substr(0, pos); std::string val = line.substr(pos + 2); _request.SetHeader(key, val); return true; } bool RecvHttpBody(Buffer *buf) { if (_recv_statu != RECV_HTTP_BODY) return false; // 1. 获取正文长度 size_t content_length = _request.ContentLength(); if (content_length == 0) { // 没有正文,则请求接收解析完毕 _recv_statu = RECV_HTTP_OVER; return true; } // 2. 当前已经接收了多少正文,其实就是往 _request._body 中放了多少数据了 size_t real_len = content_length - _request._body.size(); // 实际还需要接收的正文长度 // 3. 接收正文放到body中,但是也要考虑当前缓冲区中的数据,是否是全部的正文 // 3.1 缓冲区中数据,包含了当前请求的所有正文,则取出所需的数据 if (buf->ReadAbleSize() >= real_len) { _request._body.append(buf->ReadPosition(), real_len); buf->MoveReadOffset(real_len); _recv_statu = RECV_HTTP_OVER; return true; } // 3.2 缓冲区中数据,无法满足当前正文的需要,数据不足,取出数据,然后等待新数据到来 _request._body.append(buf->ReadPosition(), buf->ReadAbleSize()); buf->MoveReadOffset(buf->ReadAbleSize()); return true; }public: HttpContext() : _resp_statu(200), _recv_statu(RECV_HTTP_LINE) {} void ReSet() { _resp_statu = 200; _recv_statu = RECV_HTTP_LINE; _request.ReSet(); } int RespStatu() { return _resp_statu; } HttpRecvStatu RecvStatu() { return _recv_statu; } HttpRequest &Request() { return _request; } // 接收并解析HTTP请求 void RecvHttpRequest(Buffer *buf) { // 不同的状态,做不同的事情,但是这里不要break, 因为处理完请求行后,应该立即处理头部,而不是退出等新数据 switch (_recv_statu) { case RECV_HTTP_LINE: RecvHttpLine(buf); case RECV_HTTP_HEAD: RecvHttpHead(buf); case RECV_HTTP_BODY: RecvHttpBody(buf); } return; }};
class HttpServer{private: using Handler = std::function; using Handlers = std::vector>; Handlers _get_route; Handlers _post_route; Handlers _put_route; Handlers _delete_route; std::string _basedir; // 静态资源根目录 TcpServer _server;private: void ErrorHandler(const HttpRequest &req, HttpResponse *rsp) { // 1. 组织一个错误展示页面 std::string body; body += ""; body += ""; body += ""; body += ""; body += ""; body += ""; body += std::to_string(rsp->_statu); body += " "; body += Util::StatuDesc(rsp->_statu); body += "
"; body += ""; body += ""; // 2. 将页面数据,当作响应正文,放入rsp中 rsp->SetContent(body, "text/html"); } // 将HttpResponse中的要素按照http协议格式进行组织,发送 void WriteReponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp) { // 1. 先完善头部字段 if (req.Close() == true) { rsp.SetHeader("Connection", "close"); } else { rsp.SetHeader("Connection", "keep-alive"); } if (rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false) { rsp.SetHeader("Content-Length", std::to_string(rsp._body.size())); } if (rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false) { rsp.SetHeader("Content-Type", "application/octet-stream"); } if (rsp._redirect_flag == true) { rsp.SetHeader("Location", rsp._redirect_url); } // 2. 将rsp中的要素,按照http协议格式进行组织 std::stringstream rsp_str; rsp_str << req._version << " " << std::to_string(rsp._statu) << " " << Util::StatuDesc(rsp._statu) << "\r\n"; for (auto &head : rsp._headers) { rsp_str << head.first << ": " << head.second << "\r\n"; } rsp_str << "\r\n"; rsp_str << rsp._body; // 3. 发送数据 conn->Send(rsp_str.str().c_str(), rsp_str.str().size()); } bool IsFileHandler(const HttpRequest &req) { // 1. 必须设置了静态资源根目录 if (_basedir.empty()) { return false; } // 2. 请求方法,必须是GET / HEAD请求方法 if (req._method != "GET" && req._method != "HEAD") { return false; } // 3. 请求的资源路径必须是一个合法路径 if (Util::ValidPath(req._path) == false) { return false; } // 4. 请求的资源必须存在,且是一个普通文件 // 有一种请求比较特殊 -- 目录:/, /image/, 这种情况给后边默认追加一个 index.html // index.html /image/a.png // 不要忘了前缀的相对根目录,也就是将请求路径转换为实际存在的路径 /image/a.png -> ./wwwroot/image/a.png std::string req_path = _basedir + req._path; // 为了避免直接修改请求的资源路径,因此定义一个临时对象 if (req._path.back() == '/') { req_path += "index.html"; } if (Util::IsRegular(req_path) == false) { return false; } return true; } // 静态资源的请求处理 --- 将静态资源文件的数据读取出来,放到rsp的_body中, 并设置mime void FileHandler(const HttpRequest &req, HttpResponse *rsp) { std::string req_path = _basedir + req._path; if (req._path.back() == '/') { req_path += "index.html"; } bool ret = Util::ReadFile(req_path, &rsp->_body); if (ret == false) { return; } std::string mime = Util::ExtMime(req_path); rsp->SetHeader("Content-Type", mime); return; } // 功能性请求的分类处理 void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers) { // 在对应请求方法的路由表中,查找是否含有对应资源请求的处理函数,有则调用,没有则发挥404 // 思想:路由表存储的时键值对 -- 正则表达式 & 处理函数 // 使用正则表达式,对请求的资源路径进行正则匹配,匹配成功就使用对应函数进行处理 // /numbers/(\d+) /numbers/12345 for (auto &handler : handlers) { const std::regex &re = handler.first; const Handler &functor = handler.second; bool ret = std::regex_match(req._path, req._matches, re); if (ret == false) { continue; } return functor(req, rsp); // 传入请求信息,和空的rsp,执行处理函数 } rsp->_statu = 404; } void Route(HttpRequest &req, HttpResponse *rsp) { // 1. 对请求进行分辨,是一个静态资源请求,还是一个功能性请求 // 静态资源请求,则进行静态资源的处理 // 功能性请求,则需要通过几个请求路由表来确定是否有处理函数 // 既不是静态资源请求,也没有设置对应的功能性请求处理函数,就返回405 if (IsFileHandler(req) == true) { // 是一个静态资源请求, 则进行静态资源请求的处理 return FileHandler(req, rsp); } if (req._method == "GET" || req._method == "HEAD") { return Dispatcher(req, rsp, _get_route); } else if (req._method == "POST") { return Dispatcher(req, rsp, _post_route); } else if (req._method == "PUT") { return Dispatcher(req, rsp, _put_route); } else if (req._method == "DELETE") { return Dispatcher(req, rsp, _delete_route); } rsp->_statu = 405; // Method Not Allowed return; } // 设置上下文 void OnConnected(const PtrConnection &conn) { conn->SetContext(HttpContext()); DBG_LOG("NEW CONNECTION %p", conn.get()); } // 缓冲区数据解析+处理 void OnMessage(const PtrConnection &conn, Buffer *buffer) { while (buffer->ReadAbleSize() > 0) { // 1. 获取上下文 HttpContext *context = conn->GetContext()->get(); // 2. 通过上下文对缓冲区数据进行解析,得到HttpRequest对象 // 1. 如果缓冲区的数据解析出错,就直接回复出错响应 // 2. 如果解析正常,且请求已经获取完毕,才开始去进行处理 context->RecvHttpRequest(buffer); HttpRequest &req = context->Request(); HttpResponse rsp(context->RespStatu()); if (context->RespStatu() >= 400) { // 进行错误响应,关闭连接 ErrorHandler(req, &rsp); // 填充一个错误显示页面数据到rsp中 WriteReponse(conn, req, rsp); // 组织响应发送给客户端 context->ReSet(); buffer->MoveReadOffset(buffer->ReadAbleSize()); // 出错了就把缓冲区数据清空 conn->Shutdown(); // 关闭连接 return; } if (context->RecvStatu() != RECV_HTTP_OVER) { // 当前请求还没有接收完整,则退出,等新数据到来再重新继续处理 return; } // 3. 请求路由 + 业务处理 Route(req, &rsp); // 4. 对HttpResponse进行组织发送 WriteReponse(conn, req, rsp); // 5. 重置上下文 context->ReSet(); // 6. 根据长短连接判断是否关闭连接或者继续处理 if (rsp.Close() == true) conn->Shutdown(); // 短链接则直接关闭 } return; }public: HttpServer(int port, int timeout = DEFALT_TIMEOUT) : _server(port) { _server.EnableInactiveRelease(timeout); _server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1)); _server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2)); } void SetBaseDir(const std::string &path) { assert(Util::IsDirectory(path) == true); _basedir = path; } void Get(const std::string &pattern, const Handler &handler) { _get_route.push_back(std::make_pair(std::regex(pattern), handler)); } void Post(const std::string &pattern, const Handler &handler) { _post_route.push_back(std::make_pair(std::regex(pattern), handler)); } void Put(const std::string &pattern, const Handler &handler) { _put_route.push_back(std::make_pair(std::regex(pattern), handler)); } void Delete(const std::string &pattern, const Handler &handler) { _delete_route.push_back(std::make_pair(std::regex(pattern), handler)); } void SetThreadCount(int count) { _server.SetThreadCount(count); } void Listen() { _server.Start(); }};
#include "http.hpp"#define WWWROOT "./wwwroot/"std::string RequestStr(const HttpRequest &req) { std::stringstream ss; ss << req._method << " " << req._path << " " << req._version << "\r\n"; for (auto &it : req._params) { ss << it.first << ": " << it.second << "\r\n"; } for (auto &it : req._headers) { ss << it.first << ": " << it.second << "\r\n"; } ss << "\r\n"; ss << req._body; return ss.str();}void Hello(const HttpRequest &req, HttpResponse *rsp) { rsp->SetContent(RequestStr(req), "text/plain");}void Login(const HttpRequest &req, HttpResponse *rsp) { rsp->SetContent(RequestStr(req), "text/plain");}void PutFile(const HttpRequest &req, HttpResponse *rsp) { std::string pathname = WWWROOT + req._path; Util::WriteFile(pathname, req._body);}void DelFile(const HttpRequest &req, HttpResponse *rsp) { rsp->SetContent(RequestStr(req), "text/plain");}int main(){ HttpServer server(8081); server.SetThreadCount(3); server.SetBaseDir(WWWROOT);//设置静态资源根目录,告诉服务器有静态资源请求到来,需要到哪里去找资源文件 server.Get("/hello", Hello); server.Post("/login", Login); server.Put("/1234.txt", PutFile); server.Delete("/1234.txt", DelFile); server.Listen(); return 0;}
⼀个连接中每隔3s向服务器发送⼀个请求,查看是否会收到响应。
预期结果:可以正常进行长连接的通信。
int main(){ Socket cli_sock; cli_sock.CreateClient(8085, "10.0.24.11"); std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DEBUG_LOG("[%s]", buf); sleep(3); } cli_sock.Close(); return 0;}
客户端每三秒发送一次数据,刷新活跃度。长连接测试正常。
创建一个客户端,给服务器发送一次数据后 不动了,查看服务器是否会正常的超时关闭连接。
int main(){ Socket cli_sock; cli_sock.CreateClient(8085, "10.0.24.11"); std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DEBUG_LOG("[%s]", buf); sleep(15); } cli_sock.Close(); return 0;}
客户端发送一次数据后,超时时间内再无动作。非活跃连接正常超时关闭,测试正常。
给服务器发送一个数据,告诉服务器要发送1024字节的数据,但是实际发送的数据不足1024,查看服务器处理结果。
//不完整请求测试int main(){ Socket cli_sock; cli_sock.CreateClient(8085, "10.0.24.11"); std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 100\r\n\r\nbitejiuyeke"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); //assert(cli_sock.Send(req.c_str(), req.size()) != -1); //assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf,1023)); DEBUG_LOG("[%s]", buf); sleep(3); } cli_sock.Close(); return 0;}
1. 如果数据只发送一次,服务器将得不到完整请求,就不会进行业务处理,客户端也就得不到响应,最终超时关闭连接。
2. 连着给服务器发送了多次 小的请求, 服务器会将后边的请求当作前边请求的正文进行处理,而后边处理的时候有可能就会因为处理错误而关闭连接。
业务处理超时,查看服务器的处理情况
当服务器达到了一个性能瓶颈,在一次业务处理中花费了太长的时间(超过了服务器设置的非活跃超时时间)
1. 在一次业务处理中耗费太长时间,导致其他的连接也被连累超时,其他的连接有可能会被拖累超时释放。假设现在 12345描述符就绪了, 在处理1的时候花费了30s处理完,超时了,导致2345描述符因为长时间没有刷新活跃度
1. 如果接下来的2345描述符都是通信连接描述符,如果都就绪了,则并不影响,因为接下来就会进行处理并刷新活跃度
2. 如果接下来的2号描述符是定时器事件描述符 定时器触发超时,执行定时任务,就会将345描述符给释放掉
2.1 这时一旦345描述符对应的连接被释放,接下来在处理345事件的时候就会导致程序崩溃(内存访问错误)
2.2 因此这时候,在本次事件处理中,并不能直接对连接进行释放,而应该将释放操作压入到任务池中,等到事件处理完了执行任务池中的任务的时候,再去释放。
int main(){ signal(SIGCHLD, SIG_IGN); for (int i = 0; i < 10; i++) { pid_t pid = fork(); if (pid < 0) { DEBUG_LOG("FORK ERROR"); return -1; }else if (pid == 0) { Socket cli_sock; cli_sock.CreateClient(8085, "10.0.24.11"); std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DEBUG_LOG("[%s]", buf); } cli_sock.Close(); exit(0); } } while(1) sleep(1); return 0;}
一次性给服务器发送多条数据,然后查看服务器的处理结果。
int main(){ Socket cli_sock; cli_sock.CreateClient(8085, "10.0.24.11"); std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DEBUG_LOG("[%s]", buf); sleep(3); } cli_sock.Close(); return 0;}
每一条请求都应该得到正常处理。
大文件传输测试,给服务器上传一个大文件,服务器将文件保存下来,观察处理结果。上传的文件,和服务器保存的文件一致。
准备好一个测试文件,资源有限,创建一个100MB大小的log.txt。
int main(){ Socket cli_sock; cli_sock.CreateClient(8085, "10.0.24.11"); std::string req = "PUT /1234.txt HTTP/1.1\r\nConnection: keep-alive\r\n"; std::string body; Util::ReadFile("./log.txt", body); req += "Content-Length: " + std::to_string(body.size()) + "\r\n\r\n"; assert(cli_sock.Send(req.c_str(), req.size()) != -1); assert(cli_sock.Send(body.c_str(), body.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DEBUG_LOG("[%s]", buf); sleep(3); cli_sock.Close(); return 0;}
文件上传成功:
收到的文件:
对比两个文件内容是否相同:
根据测试,文件传输也没有问题。
通过测试工具模拟大量客户端向服务器发送连接请求。
模拟20000个客户端同时向服务器发送请求,没有出现连接失败。
测试结论(参考)
性能测试环境:
服务端:2核2G带宽为1M的云服务器。
客户端:4核8G的虚拟机通过webbench工具模拟客户端,创建大量线程连接服务器,发送请求,在收到响应后关闭连接,开始下一个连接的建立。
测试结论:
服务器并发量:可以同时处理20000-30000个客户端的请求而不会出现连接失败。
QPS:(Query Per Second)每秒查询率107左右。
来源地址:https://blog.csdn.net/qq_65307907/article/details/132391236
--结束END--
本文标题: C++项目:仿mudou库one thread one loop式并发服务器实现
本文链接: https://lsjlt.com/news/423985.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-03-01
2024-03-01
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0