【基于主从Reactor模型的C++高并发服务器组件】
文章目录
- 一、项目介绍
-
- 1-1、项目总体简介
- 1-2、项目开发环境
- 1-3、项目核心技术
- 1-4、项目开发流程
- 1-5、项目如何使用
- 二、框架设计
-
- 2-1、功能模块划分
-
- 2-1-1、SERVER模块
- 2-1-2、协议模块
- 2-2、项目蓝图
-
- 2-2-1、整体图
- 2-2-2、模块关系图
-
- 2-2-2-1、Connection 模块关系图
- 2-2-2-2、Acceptor 模块关系图
- 2-2-2-3、EventLoop 模块关系图
- 三、前置知识
-
- 3-1、HTTP服务器
- 3-2、Reactor模型
-
- 3-2-1、单Reactor单线程
- 3-2-2、单Reactor多线程
- 3-2-3、多Reactor多线程
- 3-2-4、One Thread One Loop主从Reactor模型⾼并发服务器
- 3-3、bind
- 3-4、定时器
-
- 3-4-1、定时器详解
- 3-4-2、时间轮思想
- 3-4-3、时间轮实现
- 3-5、正则库regex
-
- 3-5-1、regex简介
- 3-5-2、正则表达式规则
- 3-5-3、正则库对于HTTP请求行的解析
- 3-6、通⽤类型any类型
-
- 3-6-1、设计思想
- 3-6-2、Any类的实现
- 3-6-3、C++库中的Any
- 3-7、eventfd事件通知机制
- 四、SERVER服务器模块实现
-
- 4-1、Buffer模块【缓冲区】
- 4-2、日志模块【打印日志】
- 4-3、Socket模块【套接字封装】
- 4-4、Channel模块【描述符管理】
- 4-5、Poller模块【描述符IO事件监控】
- 4-6、EventLoop模块【事件监控+事件处理】
- 4-7、TimerQueue模块【定时任务】
- 4-8、Connection模块【通信连接管理】
- 4-9、Acceptor模块【监听套接字管理】
- 4-10、LoopThread模块【EventLoop模块与线程整合】
- 4-11、LoopThreadPool模块【针对LoopThread设计线程池】
- 4-12、TcpServer模块【对上述所有模块整合】
- 4-13、EchoServer模块【回显服务】
- 五、HTTP协议模块
-
- 5-1、Util模块【零碎工具接口】
- 5-2、HttpRequest模块【存储HTTP请求信息】
- 5-3、HttpResponse模块【存储HTTP响应信息】
- 5-4、HttpContext模块【请求接收上下文】
- 5-5、HttpServer模块【HTTP协议支持所有模块的整合】
- 5-6、HttpServer模块简单测试【Postman】
- 六、功能测试
-
- 6-1、服务器长连接测试
- 6-2、服务器超时连接测试
- 6-3、服务器请求数据错误格式测试
- 6-4、服务器业务处理超时测试
- 6-5、服务器一次进行多条请求测试
- 6-6、服务器大文件传输测试
- 6-7、服务器性能压力测试
- 7、项目源码
一、项目介绍
1-1、项目总体简介
本项目主要是通过模仿muduo库,来实现一个以主从Reactor为模型,以 OneThreadOneEventLoop 为事件驱动的高并发服务器组件。通过这个服务器组件,我们可以简洁快速的搭建出一个高性能的 TCP 服务器。并且组件内部会提供不同的应用层协议支持,组件使用者可以通过这些协议快速的完成一个应用服务器的搭建。本项目中使用HTTP协议,当然准确来说,因为我们要实现的服务器本⾝并不存在业务,所以我们要实现的应该算是⼀个⾼性能服务器基础库,是⼀个基础组件。后续有什么需求直接搭建在我们该服务器基础之上就行了
下面是muduo库的详细介绍,以及muduo对应的源码,可供参考学习
muduo库介绍
muduo库源码
1-2、项目开发环境
本项目的开发环境如下:
VSCode:通过 VSCode 远程连接服务器(Linux)或直接使用 Linux中的Vim/Vi 进行代码编写与功能测试Linux:在 Centos7.6 环境下进行开发环境搭建与项目部署g++/gdb:通过 g++/gdb 进行代码编译与调试。注意:gcc版本要在7.0以上,不然不支持一些接口函数Makefile:通过 Makefile 进行项目构建
这里因为没有在VSCode中安装插件,所以不能直接在VSCode中进行编写C++代码
本项目直接采用VSCode连接远程服务器技术来进行开发
VS编译器也可以进行开发,但是比较麻烦
1-3、项目核心技术
1、C++11/C++17:使用 C++11/C++17中的某些新特性完成代码的编写例如function/bind/shared_ptr/mutex/regex/any。
2、Linux系统编程:使用 Linux 相关系统调用完成代码编写,例如 read/write/timerfd/eventfd/epoll。
3、Linux网络编程:使用 Socket 相关接口实现网络通信,例如socket/bind/listen/accept/recv/send/setsockopt。
4、HTML:编写简单 HTML页面进行功能测试。
1-4、项目开发流程
本项目一共分为四个开发流程:
框架设计:进行项目模块划分,确定每一个模块需要实现的功能前置知识:对项目中需要用到的一些知识进行了解,学会它们的基本使用; 比如Reactor 模式,Linux 中的定时器与事件通知,C++中的正则表达式与通用容器 Any 等模块开发:对项目的各个模块进行开发与基本功能测试功能测试:对最终搭建出来的应用服务器进行各种功能测试,包括边缘功能测试以及服务器压力测试
1-5、项目如何使用
我们将各个模块的代码都实现在头文件里面,实现完成以后,直接将项目给用户,然后用户直接将头文件包含就可以使用了【单头文件的使用方式】
二、框架设计
2-1、功能模块划分
基于以上的理解,我们要实现的是⼀个带有协议⽀持的Reactor模型⾼性能服务器,因此将整个项⽬的实现划分为两个⼤的模块:
- SERVER模块:实现Reactor模型的TCP服务器——高性能服务器的模块
- 协议模块:对当前的Reactor模型服务器提供应⽤层协议⽀持——对我们实现的高性能服务器,提供应用层的各种协议支持
在server模块之上,可以实现应用层各种协议模块来实现不同的功能
以下是各个模块的概念介绍:
2-1-1、SERVER模块
SERVER模块就是对所有的连接以及线程进⾏管理,让它们各司其职,在合适的时候做合适的事,最终完成⾼性能服务器组件的实现
⽽具体的管理也分为三个⽅⾯:
监听连接管理:对监听连接进⾏管理通信连接管理:对通信连接进⾏管理超时连接管理:对超时连接进⾏管理
基于以上的管理思想,可以将 SERVER 模块划分为以下的一些子模块:
- Buffer 模块:实现通信套接字的用户态缓冲区,防止接收到的数据不是一条完整的数据,同时确保客户端响应的数据在套接字可写的情况下进行发送。
- Socket 模块:对 socket 套接字的操作进行封装,使得程序中对于套接字的各项操作更加简便。
- Channel 模块:对于一个描述符进行监控事件管理,便于在用户态对描述符的监控事件进行维护,以及Poller模块对描述符进⾏IO事件监控就绪后,根据不同的事件,回调不同的处理函数功能。
- Connection 模块:对通信连接进行整体管理,一个连接的所有操作都通过此模块来完成,增加连接操作的灵活以及便捷性。
Connection模块是对Buffer模块,Socket模块,Channel模块的⼀个整体封装,实现了对⼀个通信套接字的整体的管理,每⼀个进⾏数据通信的套接字(也就是accept获取到的新连接)都会使⽤Connection进⾏管理。
• Connection模块内部包含有三个由组件使⽤者传⼊的回调函数:连接建⽴完成回调,事件回调,新数据回调,关闭回调。
• Connection模块内部包含有两个组件使⽤者提供的接⼝:数据发送接⼝,连接关闭接⼝
• Connection模块内部包含有两个⽤⼾态缓冲区:⽤⼾态接收缓冲区,⽤⼾态发送缓冲区
• Connection模块内部包含有⼀个Socket对象:完成描述符⾯向系统的IO操作
• Connection模块内部包含有⼀个Channel对象:完成描述符IO事件就绪的处理
具体处理流程如下:
- 实现向Channel提供可读,可写,错误等不同事件的IO事件回调函数,然后将Channel和对应的描
述符添加到Poller事件监控中。- 当描述符在Poller模块中就绪了IO可读事件,则调⽤描述符对应Channel中保存的读事件处理函
数,进⾏数据读取,将socket接收缓冲区全部读取到Connection管理的⽤⼾态接收缓冲区中。然
后调⽤由组件使⽤者传⼊的新数据到来回调函数进⾏处理。- 组件使⽤者进⾏数据的业务处理完毕后,通过Connection向使⽤者提供的数据发送接⼝,将数据
写⼊Connection的发送缓冲区中。- 启动描述符在Poll模块中的IO写事件监控,就绪后,调⽤Channel中保存的写事件处理函数,将发
送缓冲区中的数据通过Socket进⾏⾯向系统的实际数据发送。
- Acceptor 模块:对监听套接字进行管理,为客户端的新建连接创建 Connection 对象,并设置各种回调。
Acceptor模块是对Socket模块,Channel模块的⼀个整体封装,实现了对⼀个监听套接字的整体的管理。
• Acceptor模块内部包含有⼀个Socket对象:实现监听套接字的操作
• Acceptor模块内部包含有⼀个Channel对象:实现监听套接字IO事件就绪的处理
具体处理流程如下:
- 实现向Channel提供可读事件的IO事件处理回调函数,函数的功能其实也就是获取新连接
- 为新连接构建⼀个Connection对象出来。
- TimerQueue 模块:定时任务模块,让一个任务可以在指定的时间之后被执行。
TimerQueue模块是实现固定时间定时任务的模块,可以理解就是一个定时任务管理器,向定时任务管理器中添加⼀个任务,任务将在固定时间后被执⾏,同时也可以通过刷新定时任务来延迟任务的执⾏。
这个模块主要是对Connection对象的⽣命周期管理,对⾮活跃连接进⾏超时后的释放功能。
- TimerQueue模块内部包含有⼀个timerfd:linux系统提供的定时器。
- TimerQueue模块内部包含有⼀个Channel对象:实现对timerfd的IO时间就绪回调处理
- Poller模块:对任意的描述符进行IO事件监控,本质上就是对 epoll 的各种操作进行封装,从而让对描述符进行事件监控的操作更加简单,此模块是 Channel 模块的一个子模块。
- EventLoop 模块:对事件监控进行管理,为了确保线程安全,此模块一个模块对应一个线程,服务器中的所有的事件都是由此模块来完成。
1、EventLoop模块可以理解就是我们上边所说的Reactor模块,它是对Poller模块,TimerQueue模块,Socket模块的⼀个整体封装,进⾏所有描述符的事件监控。EventLoop模块必然是⼀个对象对应⼀个线程的模块,线程内部的⽬的就是运⾏EventLoop的启动函数。
2、EventLoop模块为了保证整个服务器的线程安全问题,因此要求使⽤者对于Connection的所有操作⼀定要在其对应的EventLoop线程内完成,不能在其他线程中进⾏(⽐如组件使⽤者使⽤Connection发送数据,以及关闭连接这种操作)。
3、EventLoop模块保证⾃⼰内部所监控的所有描述符,都要是活跃连接,⾮活跃连接就要及时释放避免资源浪费。
4、EventLoop模块内部包含有⼀个eventfd:eventfd其实就是linux内核提供的⼀个事件fd,专⻔⽤于事件通知。
• EventLoop模块内部包含有⼀个Poller对象:⽤于进⾏描述符的IO事件监控。
• EventLoop模块内部包含有⼀个TimerQueue对象:⽤于进⾏定时任务的管理。
• EventLoop模块内部包含有⼀个PendingTask队列:组件使⽤者将对Connection进⾏的所有操作,都加⼊到任务队列中,由EventLoop模块进⾏管理,并在EventLoop对应的线程中进⾏执⾏。
• 每⼀个Connection对象都会绑定到⼀个EventLoop上,这样能保证对这个连接的所有操作都是在⼀个线程中完成的。
具体操作流程:
- 通过Poller模块对当前模块管理内的所有描述符进⾏IO事件监控,有描述符事件就绪后,通过描述符对应的Channel进⾏事件处理。
- 所有就绪的描述符IO事件处理完毕后,对任务队列中的所有操作顺序进⾏执⾏。
- 由于epoll的事件监控,有可能会因为没有事件到来⽽持续阻塞,导致任务队列中的任务不能及时得
到执⾏,因此创建了eventfd,添加到Poller的事件监控中,⽤于实现每次向任务队列添加任务的时
候,通过向eventfd写⼊数据来唤醒epoll的阻塞。
- LoopThread 模块:将 EventLoop 与 thread 整合到一起,向外部返回所实例化的 EventLoop 对象,即将 EventLoop 对象与线程一一绑定。
- LoopThreadPool 模块:LoopThread 线程池,用于对所有的 LoopThread 进行管理及分配。
- TcpServer 模块:对前边所有子模块进行整合,从而提供给组件使用者的可以便捷的完成一个高性能服务器搭建的模块。
TcpServer 模块这个模块是⼀个整体Tcp服务器模块的封装,内部封装了Acceptor模块,EventLoopThreadPool模块。
• TcpServer中包含有⼀个EventLoop对象:以备在超轻量使⽤场景中不需要EventLoop线程池,只需要在主线程中完成所有操作的情况。
• TcpServer模块内部包含有⼀个EventLoopThreadPool对象:其实就是EventLoop线程池,也就是⼦Reactor线程池
• TcpServer模块内部包含有⼀个Acceptor对象:⼀个TcpServer服务器,必然对应有⼀个监听套接字,能够完成获取客⼾端新连接,并处理的任务。
• TcpServer模块内部包含有⼀个std::shared_ptr的hash表:保存了所有的新建连接对应的Connection,注意,所有的Connection使⽤shared_ptr进⾏管理,这样能够保证在hash表中删除了Connection信息后,在shared_ptr计数器为0的情况下完成对Connection资源的释放操作。
具体操作流程如下:
- 在实例化TcpServer对象过程中,完成BaseLoop的设置,Acceptor对象的实例化,以及EventLoop
线程池的实例化,以及std::shared_ptr的hash表的实例化。- 为Acceptor对象设置回调函数:获取到新连接后,为新连接构建Connection对象,设置Connection的各项回调,并使⽤shared_ptr进⾏管理,并添加到hash表中进⾏管理,并为Connection选择⼀个EventLoop线程,为Connection添加⼀个定时销毁任务,为Connection添加
事件监控,- 启动BaseLoop。
2-1-2、协议模块
协议模块用于对 SERVER 模块提供应用层协议支持,基于提供的协议支持能够更方便的完成指定协议服务器的搭建,同时还能够根据不同的应用场景切换不同的应用层协议。项目目前只提供了 HTTP 协议支持。
- HTTP协议模块:HTTP协议模块⽤于对⾼并发服务器模块进⾏协议⽀持,基于提供的协议⽀持能够更⽅便的完成指定协议服务器的搭建。
HTTP 协议支持模块可以划分为以下几个子模块:
- Util 模块:工具模块,提供 HTTP 协议模块所用到的一些工具函数,比如 URL 编码与解码、文件数据读取与写入等。
- HttpRequest 模块:HTTP 请求数据模块,用于保存 HTTP 请求数据被解析后的各项请求元素信息。
- HttpResponse 模块:HTTP 响应数据模块,用于业务处理后设置并保存 HTTP 响应数据的的各项元素信息,最终会被按照HTTP协议响应格式组织成为响应信息发送给客⼾端。
- HttpContext 模块:HTTP请求接收的上下⽂模块,主要是为了防⽌在⼀次接收的数据中,不是⼀个完整的HTTP请求,则解析过程并未完成,⽆法进⾏完整的请求处理,需要在下次接收到新数据后继续根据上下⽂进⾏解析,最终得到⼀个HttpRequest请求信息对象,因此在请求数据的接收以及解析部分需要⼀个上下⽂来进⾏控制接收和处理节奏。
- HttpServer 模块:对 HTTP 协议支持的所有模块的整合,让HTTP服务器的搭建变得更加简便。
HttpServer模块:
这个模块是最终给组件使⽤者提供的HTTP服务器模块了,⽤于以简单的接⼝实现HTTP服务器的搭建。
HttpServer模块内部包含有⼀个TcpServer对象:TcpServer对象实现服务器的搭建
HttpServer模块内部包含有两个提供给TcpServer对象的接⼝:连接建⽴成功设置上下⽂接⼝,数据处理接⼝。
HttpServer模块内部包含有⼀个hash-map表存储请求与处理函数的映射表:组件使⽤者向
HttpServer设置哪些请求应该使⽤哪些函数进⾏处理,等TcpServer收到对应的请求就会使⽤对应的函数进⾏处理。
2-2、项目蓝图
2-2-1、整体图
所以,我们项目中的协议切换,就是在Connection模块中进行调用不同的回调函数,达到由http协议切换为其他协议
2-2-2、模块关系图
2-2-2-1、Connection 模块关系图
2-2-2-2、Acceptor 模块关系图
2-2-2-3、EventLoop 模块关系图
三、前置知识
3-1、HTTP服务器
概念:
HTTP(Hyper Text Transfer Protocol),超⽂本传输协议是应⽤层协议,是⼀种简单的请求-响应协议(客⼾端根据⾃⼰的需要向服务器发送请求,服务器针对请求提供服务,完毕后通信结束)
需要注意的是HTTP协议是⼀个运⾏在TCP协议之上的应⽤层协议,这⼀点本质上是告诉我们,HTTP服务器其实就是个TCP服务器,只不过在应⽤层基于HTTP协议格式进⾏数据的组织和解析来明确客⼾端的请求并完成业务处理
因此实现HTTP服务器简单理解,只需要以下⼏步即可
1. 搭建⼀个TCP服务器,接收客⼾端请求。2. 以HTTP协议格式进⾏解析请求数据,明确客⼾端⽬的。3. 明确客⼾端请求⽬的后提供对应服务。4. 将服务结果⼀HTTP协议格式进⾏组织,发送给客⼾端
3-2、Reactor模型
Reactor 模式是指一个或多个客户端同时向服务器发送请求,进行业务处理的事件驱动处理模式。即服务器与客户端建立连接之后,哪个客户端给服务器发送了数据、触发了事件,服务器就对哪个客户端进行处理 (接收数据 + 处理数据 + 发送响应)。Reactor 模式的实现依赖于I/O多路复用技术(也称为I/O多路转接技术)。
Reactor模式分为单Reactor单线程、单Reactor多线程以及多Reactor多线程等不同的类别
3-2-1、单Reactor单线程
单I/O多路复⽤+业务处理
概念:在单个线程中进行事件监控与处理;即通过IO多路复用模型进行客户端请求监控,触发事件后进行事件处理
1. 通过IO多路复⽤模型进⾏客⼾端请求监控2. 触发事件后,进⾏事件处理a. 如果是新建连接请求,则获取新建连接,并添加⾄多路复⽤模型进⾏事件监控。b. 如果是数据通信请求,则进⾏对应数据处理(接收数据,处理数据,发送响应)。
优点:所有操作均在同⼀线程中完成,所有操作都是串行化的,思想流程较为简单,不涉及进程/线程间通信及资源争抢问题。
缺点:⽆法有效利⽤CPU多核资源,很容易达到性能瓶颈,导致客户端请求超时。
适⽤场景:适⽤于客⼾端数量较少,且处理速度较为快速的场景。(处理较慢或活跃连接较多,会导致串⾏处理的情况下,后处理的连接⻓时间⽆法得到响应)。
3-2-2、单Reactor多线程
单I/O多路复⽤+线程池(业务处理)
概念:一个Reactor线程 + 一个业务线程池;即Reactor线程通过IO多路复用模型进行客户端请求监控,触发事件后进行事件处理
1. Reactor线程通过I/O多路复⽤模型进⾏客⼾端请求监控2. 触发事件后,进⾏事件处理a. 如果是新建连接请求,则获取新建连接,并添加⾄多路复⽤模型进⾏事件监控。b. 如果是数据通信请求,则接收数据后分发给Worker线程池进⾏业务处理。c. ⼯作线程处理完毕后,将响应交给Reactor线程进⾏数据响应
优点:充分利用了CPU多核资源,提高了处理效率,同时降低了代码耦合度
缺点:多线程间的数据共享访问控制较为复杂,单个Reactor 承担所有事件的监听和响应,在单线程中运⾏,⾼并发场景下容易成为性能瓶颈(当短时间内有大量客户端连接时,服务器来不及进行新的客户端连接处理)
3-2-3、多Reactor多线程
多I/O多路复⽤+线程池(业务处理)
概念:主Reactor线程(获取新连接)+ 从属Reactor线程池(事件监控与IO处理)+ 业务线程池 (业务处理)
1. 主Reactor线程专门处理新连接请求事件,有新连接到来则将其分发到从属Reactor线程池中进行事件监控与IO处理2. 从属Reactor线程用于客户端的通信事件监控,当客户端通信事件触发时,接收客户端数据并分发给业务线程池进行业务处理3. 业务线程池分配独⽴的线程进⾏具体的业务处理a. ⼯作线程处理完毕后,将响应交给⼦Reactor线程进⾏数据响应
优点:充分利⽤CPU多核资源,主从Reactor各司其职。可以进行合理分配,适用于高并发场景
注意:执行流并不是越多越好,因为执行流越多,CPU切换调度的成本越高,所以在某些主从Reactor模型中,并没有单独的业务线程池,而是将业务处理直接放到从属Reactor线程池中完成的,一切取决于业务场景
3-2-4、One Thread One Loop主从Reactor模型⾼并发服务器
上面的三种Reactor模式中,各有各的应用场景,按需索取
本项目使用基于多Reactor多线程的变形模式来完成:
我们可以将多Reactor多线程模式进行进一步的简化——将业务线程直接与从Reactor线程进行合并
- One Thread One Loop主从Reactor模型就是本项目的高并发的并发模型!
优点:
1、减少了cpu切换调度的成本(直接在从属Reactor线程里面完成IO处理和业务处理)
2、在业务线程池里面进行业务处理时,在多对多的情况下,我们线程池内的各个线程的内容存在相互影响,这个时候需要锁来保证线程之间数据的安全性。如果在从属Reactor线程里面完成IO处理和业务处理,各个线程的内容相互影响比较小,就减少了锁的使用,提高了效率。在不加锁的情况下,也保证了安全性
⽬标定位:One Thread One Loop主从Reactor模型⾼并发服务器
我们要实现的是主从Reactor模型服务器,也就是主Reactor线程仅仅监控监听描述符,获取新建连接,保证获取新连接的⾼效性,提⾼服务器的并发性能。主Reactor获取到新连接后分发给⼦Reactor进⾏通信事件监控。⽽⼦Reactor线程监控各⾃的描述符的读写事件进⾏数据读写以及业务处理。
One Thread One Loop的思想就是把所有的操作都放到⼀个线程中进⾏,⼀个线程对应⼀个事件处理的循环。
当前实现中,因为并不确定组件使⽤者的使⽤意向,因此并不提供业务层⼯作线程池的实现,只实现主从Reactor,⽽业务⼯作线程池,可由组件库的使⽤者的需要⾃⾏决定是否使⽤和实现
3-3、bind
bind文档
C++11中的bind:
bind (Fn&& fn, Args&&... args);官⽅⽂档对于bind接⼝的概述解释:Bind function arguments——函数参数绑定
我们可以将bind接⼝看作是⼀个通⽤的函数适配器,它接受⼀个函数对象,以及函数的各项参数,然后返回⼀个新的函数对象,但是这个函数对象的参数已经被绑定为设置的参数。运⾏的时候相当于总是调⽤传⼊固定参数的原函数。但是如果进⾏绑定的时候,给与的参数为 std::placeholders::_1, _2...
则相当于为新适配⽣成的函数对象的调⽤预留⼀个参数进⾏传递。
样例1:
- 原来调用函数
#include #include #include void Print(const std::string &str)//下面实参字符串具有常属性,要加上const{ std::cout << str << std::endl;}int main(){ Print(\"hello world!\"); return 0;}
- bind函数绑定之后调用函数
#include #include #include void Print(const std::string &str)//下面实参字符串具有常属性,要加上const{ std::cout << str << std::endl;}int main(){ //Print(\"hello world!\"); auto func = std::bind(Print,\"hello world!\"); func();//此时使用bind绑定之后,func() == Print(\"hello world!\"); return 0;}
g++ bind.cpp -std=c++11 -o bind 进行编译之后
./bind 两次输出结果是一模一样的
样例2:
- bind函数绑定之后调用函数
#include #include #include void Print(const std::string &str,const int& num) // 这里增加了一个参数num,现在有两个参数了{ std::cout << str << \" \" << num << std::endl;}int main(){ //如果func调用地方还要传入参数 //bind预留了一个placeholders::_1,来方便我们下面调用func()时再传入参数 auto func = std::bind(Print, \"hello world!\",std::placeholders::_1); //将placeholders::_1这个参数bind绑定到Print第二个参数上面 func(10);//func(10) == Print(\"hello world!\",10) return 0;}
同理可得:
#include #include #include void Print(const std::string &str, const int &num1, const int &num2, const int &num3) // 这里增加了一个参数num,现在有两个参数了{ std::cout << str << \" \" << num1 << \" \" << num2 << \" \" << num3 << std::endl;}int main(){ auto func = std::bind(Print, \"hello world!\", std::placeholders::_1,std::placeholders::_2,std::placeholders::_3); func(10, 20, 30);//如果Print函数有多个参数,在func调用处传入第二个到最后一个参数 //在bind处使用placeholders::_1,placeholders::_2,placeholders::_3...来依次绑定Print第二到最后一个参数 return 0;}
1、bind绑定函数之后std::placeholders::_1表示函数未被传参的第一个参数值
2、如果bind绑定之后给了函数对应的参数值,那么func调用中再次给参数值将不起作用
基于bind的作⽤,当我们在设计⼀些线程池,或者任务池的时候,就可以将将任务池中的任务设置为函数类型,函数的参数由添加任务者直接使⽤bind进⾏适配绑定设置,⽽任务池中的任务被处理,只需要取出⼀个个的函数进⾏执⾏即可。这样做有个好处就是,这种任务池在设计的时候,不⽤考虑都有哪些任务处理⽅式了,处理函数该如何设计,有多少个什么样的参数,这些都不⽤考虑了,降低了代码之间的耦合度。
样例:
#include #include #include #include void Print(const std::string &str,const int& num){ std::cout << str << \" \" << num << std::endl;}int main(){ using task = std::function<void()>; std::vector<task> array;//这里的数组当做线程池/任务池 array.push_back(bind(Print,\"hello\",10)); array.push_back(bind(Print,\"hello\",20)); array.push_back(bind(Print,\"hello\",30)); array.push_back(bind(Print,\"hello\",40)); array.push_back(bind(Print,\"hello\",50)); for(auto& e: array) { e();//这里就相当于把任务一个个取出来处理,并且不需要传递参数,因为上面都bind绑定好了 } return 0;}
3-4、定时器
3-4-1、定时器详解
由于服务器的资源是有限的,为了避免某些客户端连接上来之后一直不通信而平白浪费服务器资源的情况,我们需要对非活跃连接设置定时销毁,而实现这个功能的前提是得有一个定时器。
timerfd 是 Linux 给我们提供的定时器,它主要包括 timerfd_create (创建定时器) 和 timerfd_settime (启动定时器) 两个函数:
头文件
#include
timerfd_create (创建定时器)
int timerfd_create(int clockid, int flags);clockid: CLOCK_REALTIME——系统实时时间,如果修改了系统时间就会出问题;CLOCK_MONOTONIC——从开机到现在的时间是⼀种相对时间;flags: 0-默认阻塞属性返回值:timerfd的操作和普通文件操作是一样的,因此timerfd_create返回的时一个文件描述符
linux下一切皆文件,定时器的操作也是跟文件操作并没有什么区别,而定时器定时的原理就是:
每隔一段时间(定时器的超时时间),系统就会给这个描述符对应的定时器写入一个8字节数据创建了一个定时器,定时器定立的超时时间是3s,也就是说每3s计算一次超时
从启动开始,每隔3s中,系统都会给描述如写入一个1,表示从上一次读取数据到现在超时了1次
假设30s之后才读取数据,则这时候就会读取到一个10,表示上一次读取数据到限制超时了10次
timerfd_settime (启动定时器)
int timerfd_settime(int fd, int flags, struct itimerspec *new, structitimerspec *old);fd: timerfd_create返回的⽂件描述符flags: 0-相对时间, 1-绝对时间;默认设置为0即可.new: ⽤于设置定时器的新超时时间old: ⽤于接收原来的超时时间(保存原来设置的超时时间,以便于还原,不需要old设置为空就行)返回值:启动成功返回0,失败返回-1struct timespec{time_t tv_sec;/* 秒*/long tv_nsec;/* 纳秒*/};struct itimerspec {struct timespec it_interval; /* 第⼀次之后的超时间隔时间 */ struct timespec it_value;/* 第⼀次超时时间 */};
timerfd 会在每次超时时,自动给 fd 中写入8字节的数据,表示从上⼀次读取数据到当前读取数据期间超时了多少次。所以我们在读取 fd 文件内容时需要一次读取8字节,表⽰在上⼀次读取数据到当前读取数据期间超时了多少次。
样例:
#include #include #include #include //定时器使用方法:两个接口,一个结构//int timerfd_create(int clockid, int flags);创建定时器//int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old);启动定时器//struct itimerspec itime;该结构里面还有一个结构体struct timespecint main(){ // int timerfd_create(int clockid, int flags);创建定时器 int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); // CLOCK_MONOTONIC表示开机到现在的时间——相对时间。0表示默认阻塞操作 if (timerfd < 0) { perror(\"timerfd create error!\\n\"); return -1; } // int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old);启动定时器 // 启动定时器之前要先有时间 struct itimerspec itime; itime.it_value.tv_sec = 1; // itime中的it_value是第一次超时时间,而it_value里面有tv_sec秒和tv_nsec纳秒两种设置,这里设置1s itime.it_value.tv_nsec = 0; // 为了防止随机数,纳秒设置为0 itime.it_interval.tv_sec = 1; // 第一次超时后,每次超时的时间间隔 itime.it_interval.tv_nsec = 0; // 启动定时器 timerfd_settime(timerfd, 0, &itime, NULL); // flag设为0表示相对时间,old不要设为NULL while (1) { uint64_t times;// 我们每次超时,自动给timerfd中写入8字节的数据,所以这里需要uint64_t的8字节 int ret = read(timerfd, ×, 8); // timerfd是阻塞的,没有数据就一直卡在read这里 if (ret < 0) { perror(\"read error!\\n\"); return -1; } printf(\"超时了,距离上一次超时了%ld次\\n\", times); // fflush(stdout); } close(timerfd); return 0;}
上边例⼦,是⼀个定时器的使⽤⽰例,是每隔1s钟触发⼀次定时器超时,否则就会阻塞在read读取数据这⾥。
基于这个例⼦,则我们可以实现每隔1s,检测⼀下哪些连接超时了,然后将超时的连接释放掉。
用定时器设置超时时间,启动定时器,然后每隔一次超时时间把所有的连接来检测一遍,看看有哪些连接非活跃超时了,将这些连接释放掉
3-4-2、时间轮思想
上述的例⼦,存在⼀个很⼤的问题,每次超时都要将所有的连接遍历⼀遍,如果有上万个连接,效率⽆疑是较为低下的。
这时候⼤家就会想到,我们可以针对所有的连接,根据每个连接最近⼀次通信的系统时间建⽴⼀个⼩根堆,这样只需要每次针对堆顶部分的连接逐个释放,直到没有超时的连接为⽌,这样也可以⼤⼤提⾼处理的效率。
上述⽅法可以实现定时任务,但是这⾥给⼤家介绍另⼀种⽅案:时间轮
。时间轮的思想来源于钟表,如果我们定了⼀个3点钟的闹铃,则当时针⾛到3的时候,就代表时间到了。
同样的道理,如果我们定义了⼀个数组,并且有⼀个指针,指向数组起始位置,这个指针每秒钟向后⾛动⼀步,⾛到哪⾥,则代表哪⾥的任务该被执⾏了,那么如果我们想要定⼀个3s后的任务,则只需要将任务添加到tick+3位置,则每秒中⾛⼀步,三秒钟后tick⾛到对应位置,这时候执⾏对应位置的任务即可。
问题1:超时时间很长怎么办?
假如我们的超时时间为一天,我们是不是要定义一个 24 * 60 * 60s 的数组?
解决方案:
我们可以将时间轮分级,即分别定义秒级时间轮、分级时间轮以及时级时间轮
多级时间轮设计思想:
后续还可以加上月,年等等级别时间轮
问题2:同一时刻需要添加多个定时任务怎么办?
解决方案:
将时间轮的一维数组设计成为二维数组【每一级的时间轮中,每一个节点也是一个时间轮】
需要添加定时任务时直接 timerwheel[i].push_back(task()) 就行了
问题3:如何延迟定时任务?
假设当一个连接 30s 内没有IO事件发生,我们就认为它是非活跃连接,需要执行其定时任务进行销毁,那么如果该连接在定时任务添加后的29.99秒时进行了 IO,此时我们应该将它的定时时间重新设置为 30s,那么如何做到呢?
解决方案:
类的析构 + shared_ptr智能指针1.使用一个类,对定时任务进行封装,类实例化的每一个对象,就是一个定时任务对象, 当对象被销毁的时候,再去执行定时任务(将定时任务的执行,放到析构函数中)2.shared_ptr用于对new的对象进行空间管理,当shared_ptr对一个对象进行管理的时候, 内部有一个计数器,计数器为0的时候,则释放所管理的对象int *a = new int;std:shared_ptrpi(a);--a对象只有在pi计数为O的时候,才会被释放std:shared_ptrpi1(pi)--当针对pi又构建了一个shared_ptr对象,则pi和pi1计数器为2当pi和pi1中任意一个被释放的时候,只是计数器-1,因此他们管理的a对象并没有被释放,只有当pi和pi1都被释放了,计数器为0了,这时候才会释放管理的a对象
所以我们可以每次连接有 IO 事件发生时,就创建一个该连接对应的管理定时任务对象的智能指针,并将其添加到定时任务中,此时即使该连接最开始添加的定时任务时间到了也只是shared_ptr的计数减一,其对应的定时任务对象并不会被释放,那么对象的析构函数也不会被执行,真正的定时任务也不会被执行了,只有shared_ptr的计数为0时,才真正执行析构,然后执行定时任务
注意:对于一个定时任务对象,我们直接使用它来构造智能指针,则管理该对象的其他智能指针的引用计数并不会增加,只有当使用该对象的智能指针来构造新的智能指针时,则智能指针相互的引用计数才会增加
为了解决这个问题,我们需要使用 weak_ptr来管理原始的定时任务对象资源,然后再使用 weak_ptr 来构造 shared_ptr (weak_ptr.lock()) 用于加入时间轮中,保证 shared_ptr 的引用计数增加,最后再释放 weak_ptr 即可。(weak_ptr 本身不会导致 shared_ptr 的引用计数增加)
3-4-3、时间轮实现
#include #include #include #include #include #include using TaskFunc = std::function<void()>;using ReleaseFunc = std::function<void()>;// 定时任务//class TimerTask{public: TimerTask(uint64_t id, uint32_t timeout, const TaskFunc &cb) : _id(id), _timeout(timeout), _task_cb(cb), _canceled(false) { } ~TimerTask() { if (_canceled == false) { _task_cb(); // 执行我们的定时任务 } _release(); } void Cancel() { _canceled = true; } /*设置release_cb回调函数*/ void SetRelease(const ReleaseFunc &release_cb) { _release = release_cb; } uint32_t Timeout() { return _timeout; } // 给类外知道定时任务的超时时间private: uint64_t _id; // 定时器任务对象id,保证唯一性,我们可能在多线程中使用定时器,所以要统一分配 uint32_t _timeout; // 定时任务超时时间 bool _canceled; // false表示不取消任务,true表示取消任务【取消任务——任务直接不执行了】 TaskFunc _task_cb; // 定时器对象要执行的定时任务 ReleaseFunc _release; // 用于删除TimerWheel对象中保存的定时器信息};// 定时任务//// 时间轮//class TimerWheel{public: TimerWheel() : _capacity(60), _tick(0), _wheel(_capacity) { } ~TimerWheel() {} // 添加定时器任务 void TimerAdd(uint64_t id, uint32_t timeout, const TaskFunc &cb) { PtrTask pt(new TimerTask(id, timeout, cb)); // new一个定时对象出来 // 这里不能直接pt->SetRelease(RemoveTimer),因为类内成员函数默认第一个参数是this指针 // RemoveTimer是TimerWheel类的非静态成员函数,必须通过类名作用域(TimerWheel::)访问。直接写RemoveTimer会被编译器误认为是全局函数或当前作用域的自由函数,导致“未声明的标识符”错误。 // 在 C++ 中,成员函数的地址需要通过 & 来获取。这是因为成员函数与普通函数不同,它需要绑定到一个具体的对象实例上才能被调用。& 用于获取成员函数的地址,以便将其作为参数传递给 std::bind // 如果不加 &,编译器会将 TimerWheel::RemoveTimer 视为一个函数调用,而不是函数地址,从而导致语法错误 pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id)); int pos = (_tick + timeout) % _capacity; // 如果_tick + timeout越界了,那么就要像环形一样,从头再开始 _wheel[pos].push_back(pt); // 在时间轮_wheel中找到pos位置,然后计数加一(也就是将获取到的shared_ptr插入进_wheel) _timers[id] = WeakTask(pt); // 不能使用shared_ptr,否则永远有一个智能指针指向对象,计数永远不为0 } // 刷新/延迟定时器任务 void TimerRefresh(uint64_t id) { // 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来,添加到时间轮中 auto it = _timers.find(id); // 迭代器遍历查找 if (it == _timers.end()) { perror(\"定时任务不存在!\"); // 该定时任务没有 return; } // it是一个哈希表(map),第二个成员就是weak_ptr // 而weak_ptr中的lock()函数就是用来获取weak_ptr中管理的对象对应的shared_ptr PtrTask pt = it->second.lock(); // 这里的pt就是我们找到的定时任务 int timeout = pt->Timeout(); // 获取初始延时时间 int pos = (_tick + timeout) % _capacity; // 如果_tick + timeout越界了,那么就要像环形一样,从头再开始 _wheel[pos].push_back(pt); // 在时间轮_wheel中找到pos位置,然后计数加一(也就是将获取到的shared_ptr插入进_wheel) } // 取消定时任务 void TimerCancel(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { perror(\"定时任务不存在!\"); return; } PtrTask pt = it->second.lock(); // 这里的pt就是我们找到的定时任务 if (pt) { pt->Cancel();//取消任务 } } // 这个函数应该每秒钟被执行一次,相当于秒针_tick向后走了一步 void RunTimerTask() { _tick = (_tick + 1) % _capacity; _wheel[_tick].clear(); // 清空数组的指定位置,就会将该位置保存的所有管理的定时器shared_ptr释放掉 }private: // 这里的RemoveTimer是一个回调函数,内部有this指针 // SetRelease回调函数,从unordered_map中将定时任务信息移除 void RemoveTimer(uint64_t id) // 每个定时任务结束后,删除对应的定时器任务对象id { auto it = _timers.find(id); // 迭代器遍历查找 if (it != _timers.end()) { _timers.erase(id); } }private: using WeakTask = std::weak_ptr<TimerTask>; using PtrTask = std::shared_ptr<TimerTask>; //_tick和_capacity要放到_wheel上面,因为初始化列表不决定初始化顺序,由声明的地方决定的 int _tick; // 当前的秒针,走到哪里就是释放哪里,释放哪里就是,执行哪里的定时任务 int _capacity; // 表盘最大数量——最大的延迟时间 std::unordered_map<uint64_t, WeakTask> _timers; // 定时器任务id与管理定时任务对象的weak_ptr之间的关联关系,简单来说(uint64_t是id,WeakTask是计数器) std::vector<std::vector<PtrTask>> _wheel; // 时间轮};// 时间轮//// 测试代码//class Test{public: Test() { std::cout << \"构造\" << std::endl; }; ~Test() { std::cout << \"析构\" << std::endl; };};void DelTest(Test *t){ delete t;}int main(){ TimerWheel tw; Test *t = new Test(); tw.TimerAdd(888, 5, std::bind(DelTest, t)); for (int i = 0; i < 5; ++i) { sleep(1); tw.TimerRefresh(888); // 刷新定时任务 tw.RunTimerTask(); // 向后移动秒针 std::cout << \"刷新了一下定时任务,重新需要5s之后才会销毁\" << std::endl; } tw.TimerCancel(888);//取消定时任务 while (1) // 秒针一直在向后走,但是上面不在刷新了,就执行定时任务 { sleep(1); std::cout << \"--------------------------------------\" << std::endl; tw.RunTimerTask(); // 向后移动秒针 } return 0;}// 测试代码//
3-5、正则库regex
了解正则库之前先要了解正则表达式
正则表达式(regular expression)描述了⼀种字符串匹配的模式(pattern),可以⽤来检查⼀个串是否含有某种⼦串、将匹配的⼦串替换或者从某个串中取出符合某个条件的⼦串等。
正则表达式就是字符串的匹配规则而正则库就是给我们提供一些接口,来让我们自己实现我们所需要的正则表达式
由于我们要实现的是一个带有应用层协议 (HTTP) 支持的服务器组件,因此必然会涉及到对 HTTP 请求的解析,比如我们接收到了这样的一个 HTTP 请求:
GET /login?user=zhang&pass=123 HTTP/1.1\\r\\n
那么我们需要从 HTTP 请求中提取出以下的信息:
GET —— 请求方法,我们需要针对不同的请求方法执行不同的操作/login —— 请求URL,我们需要知道客户端想要访问哪里的数据user=zhang&pass=123 —— 请求数据HTTP/1.1 —— 协议版本
如果要我们自己遍历请求字符串提取出上述的信息无疑是非常麻烦的,为了简化操作,我们可以使用正则表达式来提取字符串中的特定数据。也就是说,正则表达式就是基于某种字符串匹配规则来提取字符串中的特定数据。
正则表达式的使⽤,可以使得HTTP请求的解析更加简单(这⾥指的时程序员的⼯作变得的简单,这并不代表处理效率会变⾼,实际上效率上是低于直接的字符串处理的),使我们实现的HTTP组件库使⽤起来更加灵活
而C++给我们准备了正则库,也就是regex
3-5-1、regex简介
regex文档
regex有许多接口函数,我们只需要会使用其中的一些函数就行。比如regex_match
//regex_match 函数介绍bool regex_match(const std::string& src, std::smatch& matches, std::regex& e);//regex_match函数有3个参数src: 用于匹配的原始字符串;matches: 正则表达式可以从原始字符串中匹配并提取符合某种规则的数据,提取的数据就放在matches中是一个类似于数组的容器e: 正则表达式的匹配规则;返回值:匹配成功返回true,匹配失败返回false
#include #include #include int main(){ std::string str = \"/numbers/1234\"; // 正则表达式规则 // 要匹配数字,就要用\\d,字符串要使用\\就要用\\\\ // +表示前面这个字符出现一次或者多次。比如【没有+,可能1234匹配一个1就直接结束了,而有了+可以匹配1234....】 // 要提取出数据,就要用() // 匹配以/numbers/为起始,后面跟了一个或者多个数字字符的字符串,并且在匹配过程中提取这个匹配的数字字符串 std::regex e(\"/numbers/(\\\\d+)\"); std::smatch matches; // 类似于容器 bool ret = std::regex_match(str, matches, e); if (ret == false) { return -1; } for (auto &e : matches)//注意,matches会先将原始字符串str给保存一遍,然后是正则表达式匹配的字符串 { std::cout << e << std::endl; } return 0;}
从输出结果我们可以发现,smatch 会先将原字符串进行保存,然后才会保存正则匹配的结果。
3-5-2、正则表达式规则
正则表达式速查表
常见正则表达式规则包括以下几种基本规则和特殊字符:
字符匹配规则:
. :匹配除换行符以外的任意单个字符。\\d:匹配一个数字字符。等价于 [0-9]。\\D:匹配一个非数字字符。等价于 [^0-9]。\\w:匹配字母或数字字符(等价于 [a-zA-Z0-9_])。\\W:匹配非字母数字字符(等价于 [^a-zA-Z0-9_])。\\s:匹配任何空白字符,包括空格、制表符、换页符等等。\\S:匹配任何非空白字符。
边界匹配规则:
^:匹配输入字符串的开始位置。$:匹配输入字符串的结束位置。\\b:匹配一个单词边界,即单词和空格间的位置。\\B:匹配非单词边界。
量词:
*:匹配前面的子表达式零次或多次。+:匹配前面的子表达式一次或多次。?:匹配前面的子表达式零次或一次。{n}:n是一个非负整数。匹配确定的n次。{n,}:n是一个非负整数。至少匹配n次。{n,m}:n和m均为非负整数,其中n<=m。最少匹配n次且最多匹配m次。
选择、分组和前瞻:
|:A|B,匹配A或B。():标记一个子表达式的开始和结束位置。用于捕获分组或设定优先级。(?:expression):非捕获分组,不捕获匹配的文本,也不作为一组返回。(?=expression):正向零宽断言,断言此位置后面能匹配expression,但不消耗任何字符。(?!expression):负向零宽断言,断言此位置后面不能匹配expression。
特殊字符:
[]:字符集合。匹配方括号内的任何一个字符。[^]:否定字符集合。匹配不在方括号内的任何一个字符。{}:量词括号。用于确定前面的子表达式的重复次数。
3-5-3、正则库对于HTTP请求行的解析
#include #include #include int main(){ // HTTP请求行格式 : GET /study/login?user=zhangsan&pass=123123 HTTP/1.1\\r\\n //---注意:下面注释版本的代码不支持解析str末尾有\\r\\n,如果要使用1~4版本代码,str的末尾\\r\\n要手动去除 std::string str = \"GET /study/login?user=zhangsan&pass=123123 HTTP/1.1\\r\\n\"; std::smatch matches; // 1、请求方法提取 // 请求方法的匹配 : GET,POST,HEAD,PUT,DELETE... // () 提取出匹配的内容 // | 表示或者,可以用于多种匹配 // . 匹配除换行符以外的任意单个字符 // * 匹配前面的子表达式零次或多次,所以末尾.*表示提取内容后面的内容也就是GET之后不处理(不提取打印出来) // 所以我们的str字符串结尾没有\\r\\n,因为.*最后不会匹配\\r\\n,我们后面再处理 // std::regex e(\"(GET|POST|HEAD|PUT|DELETE).*\"); // 2、资源路径提取 // [^?] 匹配非?字符,*0次或多次 // 所以末尾.*表示提取内容后面的内容也就是?user之后不处理(不提取打印出来) // ------注意 :GET /study/这之间有空格,所以下面的DELETE) ([^?]之间也要有空格,不然打印的时候资源路径前面也有空格------- // std::regex e(\"(GET|POST|HEAD|PUT|DELETE) ([^?]*).*\"); // 3、查询字符串提取 // ?要单独处理,\\\\?的第一个\\取消第二个\\的转义字符作用,表示匹配?字符 // \\\\?(.*) .* 提取?以及后面的内容,直到遇到空格截止 // std::regex e(\"(GET|POST|HEAD|PUT|DELETE) ([^?]*)\\\\?(.*) .*\"); // 4、提取协议版本 // \\\\. 使用原始的. // [012] 匹配中括号里面的任意【一个】字符。(HTTP/1\\\\.[012]) 表示HTTP/1.0或者1或者2 // std::regex e(\"(GET|POST|HEAD|PUT|DELETE) ([^?]*)\\\\?(.*) (HTTP/1\\\\.[012])\"); // 5、末尾内容的处理【\\r\\n或者\\n或者什么都没有】 // 第一[]只能获取一个字符,不满足所有情况 // 第二(\\r\\n|\\n)可以解析,但是两种打印结果都是最后有两行空行 // 我们想要的是,末尾\\r\\n或者\\n等内容能够被解析出来,但是不起任何效果!相当于把末尾给无视了! // (?:.......) 表示匹配某个格式字符串,但是不提取!本来()表示提取,但是我们不想提取,所以说就有了(?:) // (?:\\r\\n|\\n) 如果末尾是\\r\\n或者\\n,那么不仅能够正确解析,而且提取出来不具有任何效果。但是如果末尾没有内容,那么解析将会失败 // 末尾的?表示前面的表达式匹配【0次】或者【1次】。这就解决了我们str末尾没有内容,但是解析还是正确 //std::regex e(\"(GET|POST|HEAD|PUT|DELETE) ([^?]*)\\\\?(.*) (HTTP/1\\\\.[012])(?:\\r\\n|\\n)?\"); // 6、HTTP行请求中间没有【查询字符串】GET /study/login HTTP/1.1\\r\\n // 所以\\\\?(.*)这个字符串可以有,也可以没有! // (?:\\\\?(.*))? 表示(\\\\?(.*))的内容可以被解析,但是不提取,而且最后的?表示前面的\\\\?(.*)匹配了0次或者1次,没有也能解析成功! std::regex e(\"(GET|POST|HEAD|PUT|DELETE) ([^?]*)(?:\\\\?(.*))? (HTTP/1\\\\.[012])(?:\\r\\n|\\n)?\"); bool ret = std::regex_match(str, matches, e); if (ret == false) { return -1; } for (auto &s : matches) { std::cout << s << std::endl; } return 0;}
3-6、通⽤类型any类型
我们的项目是要实现一个高并发的服务器组件,能够接收并且处理客户端发送过来的请求,就肯定会涉及到与客户端的通信,而通信就肯定会涉及到套接字的操作。同时,由于 TCP 是面向字节流的,因此服务器在接收客户端数据的时候就可能会出现socket中的数据不是一条完整请求,此时,我们想要请求处理到一半时就需要停下来等待 socket 中下一次的数据到来。
因此我们需要为客户端连接设置一个请求处理的上下文,用来保存请求接收、解析以及处理的状态,它决定着对于下一次从缓冲区中取出的数据从哪里开始,如何进行处理等操作
同时,对于一条完整的请求,我们还需要对其进行解析,得到各种关键的要素,比如 HTTP 请求中的请求方法、请求URL、HTTP版本等,这些信息都会被保存在请求处理上下文中。
那么我们应该如何保存请求接收、解析以及处理的各种状态信息呢?定义一个HTTP请求信息的结构用于填充吗?
如果我们的服务器组件只支持HTTP协议这样做是可以的。但我们设计的服务器的目标是要能够支持各种不同的应用层协议,以便于我们组件的使用者能够根据自己不同的业务场景定制对应的应用层协议进行使用
因此,我们的项目要能够支持不同的应用层协议,那么我们的上下文就不是一种类型数据的上下文,有不同的类型,我们就需要让这个结构能够保存不同类型的数据,此时就需要any出场了。
通用容器 any 是一种能够存储任何类型数据的容器
在C语言中,通用类型可以使用 void* 来管理。在C++中,C++17STL库中提供了可直接使用的 any 类,但由于 any类的实现并不复杂,同时考虑到代码的移植性,尽量减少第三方库的依赖,所以这里我们选择自己手动实现一个 any 类。
3-6-1、设计思想
1、Any类设置为一个模板类
这是不行的!在编译的时候Any a或者Any b等等需要传递类型作为模板参数,也就是说在使用的时候就要确定其类型,这是行不通的,因为保存在Content中的协议上下⽂,我们在定义any对象的时候是不知道他们的协议类型的,因此⽆法传递类型作为模板参数
template<class T>class Any{private:T _content;};//使用时要明确类型,我的要求是能存储不同类型数据,你都指明了,那岂不是多此一举,什么用都没有Any<int> a1;Any<double> a2;
2、嵌套一下,再设计一个类,专门用于保存其他类型的数据,Any类保存该固定类的对象
这样也不行,因为在Any类中⽆法定义这个holder对象或指针,因为any也不知道这个类要保存什么类型的数据,因此⽆法传递类型参数
template<class T>class Any{ private: template<class T> class hodler { T _val; }; hodler<T> _content;//还是要指明T是什么类型};
3、多态
所以,定义⼀个父类placehoder,让子类holder继承于placeholde,⽽Any类保存⽗类指针即可,当需要保存数据时,则new⼀个带有模板参数的⼦类holder对象出来保存数据,然后让Any类中的⽗类指针指向这个⼦类对象就搞定了,让父类指针访问子类对象的各个接口和成员。运行时多态,父类指针指向子类对象,调用的是子类的虚函数
class Any{private: class hodler { // ...... }; template<class T> class placehodler : public hodler { T_ val; }; hodler *_content;};Any any;// 定义any时不需要指定any的类型// 需要存储特定数据类型时,直接new该类型对应的placehodler对象,然后将其交由any中的父类指针_content管理placehodler<int> *h1 = new placehodler;any._content = h1;placehodler<double> *h2 = new placehodler;any._content = h2;
3-6-2、Any类的实现
#include #include #include #include class Any // Any类不能直接保存HTTP等协议的内容,要能保存多种协议上下文{private: class holder { public: virtual ~holder() {} virtual const std::type_info &type() = 0; // 获取子类对象中数据的类型,纯虚函数 virtual holder *clone() = 0; // 克隆函数,通过当前对象自身,来克隆出一个新的子类对象 }; template <class T> class placeholder : public holder { public: placeholder(const T &val) // 构造函数,可能传入任意类型数据 : _val(val) { } // placeholder的析构没有要析构的内容,可以不用实现 // 获取子类对象中数据的类型 // std::type_info 是一个类,用于表示类型的信息 virtual const std::type_info &type() { return typeid(T); } // 克隆函数,通过当前对象自身,来克隆出一个新的子类对象 virtual holder *clone() { // 虽然 placeholder 是一个模板类,但 clone() 是一个虚函数,它的实现是在类定义时确定的。当编译器实例化 placeholder 时,clone() 的具体实现会被确定为: // return new placeholder(_val); // 即使代码中没有显式写出 ,编译器也能根据上下文推断出 placeholder 的类型参数。这是因为 clone() 是在模板类 placeholder 的上下文中定义的,编译器知道 T 是什么。 // 虚函数(如 clone())的实现是在类定义时确定的,编译器可以根据上下文推断出模板参数,因此不需要显式指定 // 但是不能再clone函数里面加上,会破坏模板类的实例化机制。因为 clone() 是虚函数,它的实现必须与基类的声明一致,而基类并不知道模板参数 T。因此,显式指定 会导致编译器无法正确解析 // holder只有一个_content父类指针!而T _val是子类的! return new placeholder(_val); } public: T _val; // 保存任意类型数据的变量 }; holder *_content; // 父类指针public: // 构造函数 Any() : _content(nullptr) { } template <class T> Any(const T &val) // 传入一个任意类型数据进行构造容器 : _content(new placeholder<T>(val)) { } Any(const Any &other) // 通过其他容器,构造一个新的容器出来 // 不能直接拷贝赋值指针,不然释放时就野指针了 : _content(other._content ? other._content->clone() : nullptr) { } // 析构函数 ~Any() { delete _content; // new出来的内容释放掉 } // 获取在子类对象中所保存的数据 —— 是任意一个类型 template <class T> T *get() // 返回子类对象保存数据的指针 { // 获取的数据类型和保存的数据类型不一样! // if (_content->type() != typeid(T)) // { // return nullptr; // } assert(_content->type() == typeid(T)); // 直接断言判断 // 想要获取的数据类型必须和保存的数据类型一致 return &((placeholder<T> *)_content)->_val; // 父类指针_content中没有_val,所以说要强转为子类指针,最后取出_val的地址 } Any &swap(Any &other) // 交换两个Any对象中的指针 { std::swap(_content, other._content); return *this; // 返回Any &是为了能连续赋值 } // 赋值运算符重载= —— 完成各种不同类型数据给通用容器赋值 template <class T> Any &operator=(const T &val) { // 通过val构造一个临时的Any同用容器,与当前通用容器进行指针交换,临时对象被释放,原先保存的数据也就被释放了 // 因为这是临时构建的Any对象,所以外部对象没有一点影响 Any(val).swap(*this); return *this; // 返回Any &是为了能连续赋值 } Any &operator=(const Any &other) // 通过其他类型通用容器进行赋值 { Any(other).swap(*this); return *this; }};// 测试代码class Test // 测试有没有内存泄漏{public: Test() { std::cout << \"构造\" << std::endl; } Test(const Test &t) { std::cout << \"拷贝构造\" << std::endl; } ~Test() { std::cout << \"析构\" << std::endl; }};int main(){ Any a; a = 10; int *pa = a.get<int>(); std::cout << *pa << std::endl; a = std::string(\"hello\"); std::string *ps = a.get<std::string>(); std::cout << *ps << std::endl; ////所以通用容器Any里面就可以保存各种不同类型的数据了 Any b; // 这里如果b在作用域外面就只会析构一次,因为b还没有结束生命周期呢 { // 作用域,t在作用域内声明的 // Any b; Test t; b = t; } while (1) sleep(1); return 0;}
3-6-3、C++库中的Any
C++官方Any文档
#include #include int main(){ // 官方Any的使用代码 std::any a; a = 10; int *pa = std::any_cast<int>(&a);//使用any_cast,然后取出any对象地址 std::cout << *pa << std::endl; a = std::string(\"hello\"); std::string * ps = std::any_cast<std::string>(&a); std::cout << *ps << std::endl;}
3-7、eventfd事件通知机制
eventfd是Linux中的一种事件通知机制,创建一个描述符用于实现事件通知。
eventfd本质就是在内核中管理一个计数器 (创建eventfd 就会在内核中创建一个计数器结构
)。
- 信号是进程间事件通知的一种方式,被通知的事件由进程中的哪一个线程来处理是不确定的;而eventfd可以用于线程间的事件通知
- 信号量是对事件进行通知【计数器+1】,然后wait被唤醒【计数器-1】;而eventfd是每次写入一个数字,写n次【计数器+n】,然后读取清零
- eventfd 是一种事件通知机制,而信号量是一种进程间同步机制
每当向evenfd中写入一个数值-用于表示事件通知次数可以使用read进行数据的读取,读取到的数据就是通知的次数,读取之后计数清零假设每次给eventfd中写入一个1,就表示通知了一次,连续写了三次之后,再去read读取出来的数字就是3
eventfd用处:在EventLoop模块中实现线程间的事件通知功能
eventfd 通常与 epoll 搭配使用,通过epoll进行描述符事件监控,当描述符有事件就绪时就读取 eventfd 中的值,然后进行事件处理。在本项目中,eventfd 的作用是在 EventLoop 模块中实现线程间的事件通知功能。
eventfd系统调用简介:#include //头文件int eventfd(unsigned int initval, int flags);功能 : 创建一个eventfd对象,实现事件通知initval: eventfd中计数器的初始值,通常设置为0;flags:设置描述符属性,通常设置EFD_CLOEXEC和EFD_NONBLOCK EFD_CLOEXEC -- 禁止进程复制 EFD_NONBLOCK -- 启动非阻塞属性 返回值:返回一个文件描述符用于操作 eventfd也是通过read/write/close进行操作的注意:read/write进行IO的时候数据只能是一个8字节数据【我们向eventfd中写入事件通知的次数时需要写入一个8字节的数字,我们从 eventfd 中读取数据时也需要用一个8字节的变量来保存】
#include #include #include #include #include int main(){ // EFD_CLOEXEC -- 禁止进程复制 // EFD_NONBLOCK -- 设置描述符非阻塞属性 int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (fd < 0) { perror(\"eventfd error\"); return 1; } // 每次向eventfd中写入1,表示一次事件通知 // 要为8字节数据,所以是uint64_t uint64_t val = 1; write(fd, &val, sizeof(val)); write(fd, &val, sizeof(val)); write(fd, &val, sizeof(val)); // 读取结果表示事件通知的次数,读取完毕后计数器清0 uint64_t res; read(fd, &res, sizeof(res)); printf(\"%ld\\n\", res); close(fd); return 0;}
四、SERVER服务器模块实现
- 注意:我们项目中因为许多模块是相互联系的,所以单个模块的代码实现可能不完整(需要后续模块提供的接口,内容等等)。会在后续项目实现中不断完善。并且会采用模块组合的方式来测试代码的各个功能(测试可能需要添加一些代码,但是无关紧要)。项目最后会给出完整代码。
4-1、Buffer模块【缓冲区】
设计思想:
最主要功能:存储数据,取出数据——所以服务端,客户端各有一个接收缓冲区和发送缓冲区
服务端/客户端要发送数据,先把数据放到该发送缓冲区中,然后一切就绪后通过socket套接字进行发送
接收数据也是同理,将对方socket套接字发送的数据先存到接收缓冲区中,然后一切就绪后从接收缓冲区进行数据提取
如果socket通信时,接收端socket已经满了,发送端不能继续写了,等到接收端将当前socket中的内容拿出来存放到接收端的接收缓冲区之后,发送端才能继续通过socket向接收端写入数据
实现细节:
- 缓冲区要有一块内存空间,这块内存空间要使用vector来保存数据,而不是string,因为string遇到’\\0’就结束了,但是网络数据传输中可能会存在’\\0’这样的数据
- 要确定好默认空间的大小,然后分别记录写入数据位置和读取数据位置(类似于双指针,通过数组起始地址+读写偏移量得到),读到哪,写到哪,对应的指针就指向那个地方,然后向后移动
写入数据操作:
如果写入数据位置到数组末尾的空间不够下次写入数据:
1、空间足够:如果写入数据位置到数组末尾的空间 + 数组起始到读取数据位置的空间大于等于本次写入数据,那么将目前数组的内容全部移动到数组起始位置,这样写入数据位置到数组末尾的空间就足够了
2、空间不够:如果写入数据位置之后 + 读取数据位置之前的空间不够本次写入数据了,就从当前写入数据位置对vector进行扩容
3、数据成功写完之后,写入数据指针向后移动到对应的位置
读取数据操作:
1、有数据读取,那么读取数据指针指向哪里,就从哪里开始读
2、可读数据大小 = 当前写入数据位置 - 当前读取数据位置
3、数据成功读完之后,读取数据指针向后移动到对应的位置
注意:
1、这里的写入数据位置和读取数据位置是针对数组起始位置而言的,是一个相对偏移量,数组起始地址加上读或者写偏移量才是真正的读取数据和写入数据的地址
2、我们写入数据操作,不管空间足够(把缓冲区剩余数据移动到数组开头),还是不足够(缓冲区数组扩容),都是要保证写数据位置到数组末尾的空间足够大!能够写完该次的数据
代码实现:
注意点 : string st;st.c_str() -> 这样子拿到的是带有const类型的c类型地址【const char*】所以需要&st[0] -> 拿到字符串起始地址【char*】要么(char*)st.c_str()强转一下,要么使用的地方使用const char*
server.hpp
#include #include #include #include #include #define BUFFER_DEFAULT_SIZE 1024 // 缓冲区默认空间大小class Buffer{public: Buffer() : _reader_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) { } // 获取空间起始地址 char *Begin() { &*_buffer.begin(); //_buffer.begin()是起始数据,*之后拿到起始数据,然后&得到首元素地址也就是数组地址 } // 获取当前写入起始地址 char *WritePosition() //_buffer空间的起始地址 + 写偏移量 { return Begin() + _writer_idx; } // 获取当前读取起始地址 char *ReadPosition() //_buffer空间的起始地址 + 读偏移量 { return Begin() + _reader_idx; } // 获取写指针位置到数组末尾空闲空间大小--写偏移之后的空闲空间 uint64_t TailIdleSize() // 总体空间大小 - 写偏移量 { return _buffer.size() - _writer_idx; } // 获取读指针位置到数组起始空闲空间大小--读偏移之前的空闲空间 uint64_t HeadIdleSize() // 就是读偏移量的值,因为读偏移量就表示前面数据已经被读取走了,空间空出来了 { return _reader_idx; } // 获取可读数据大小 uint64_t ReadAbleSize() // 写偏移量 - 读偏移量 { return _writer_idx - _reader_idx; } // 将读偏移向后移动 void MoveReadOffset(uint64_t len) { assert(len <= ReadAbleSize()); // 读取数据内容的空间len不能超过可读数据大小,不然就是越界 _reader_idx += len; } // 将写偏移向后移动 void MoveWriteOffset(uint64_t len) { // 写数据内容的空间大小必须小于写数据位置到数组末尾空间的大小 assert(len <= TailIdleSize()); _writer_idx += len; } // 确保可写空间足够(整体空闲空间够了就移动数据,否则就扩容) void EnsureWriteSpace(uint64_t len) { // 如果末尾空间足够,直接返回 if (len <= TailIdleSize()) { return; } // 写位置到末尾空间 + 读位置到起始空间 是否足够 if (len <= TailIdleSize() + HeadIdleSize()) { // 空间足够,就将数据移动到数组起始位置 uint64_t rsz = ReadAbleSize(); // 把可读数据大小保存起来 // copy函数 :将第一个参数到第二个参数的数据,拷贝到第三个参数的位置 // 可读数据 = 读位置~读位置+可读数据大小,移动到起始位置 std::copy(ReadPosition(), ReadPosition() + rsz, Begin()); _reader_idx = 0; // 读偏移量置0 _writer_idx = rsz; // 写偏移量就是可读数据大小,继续接着写嘛 } else // 总体空间不够,不移动数据,在写偏移之后扩容足够空间即可 { _buffer.resize(_writer_idx + len); // resize改变空间大小 } } // 向缓冲区写入数据 void Write(const void *data, uint64_t len) { // 1、确保缓冲区有足够的空间 EnsureWriteSpace(len); const char *d = (const char *)data; // data是void*,而WritePosition()获取的是char*地址,copy的函数模版不匹配,要转换 // 2、拷贝数据到缓冲区内部 std::copy(d, d + len, WritePosition()); } // 向缓冲区写入char数据,然后写偏移量偏移 void WriteAndPush(const void *data, uint64_t len) { Write(data, len); MoveWriteOffset(len); } // 将string数据写入缓冲区 void WriteString(const std::string &data) { // 注意点 : // string st; // st.c_str() -> 这样子拿到的是带有const类型的c类型地址【const char*】 // 所以需要&st[0] -> 拿到字符串起始地址【char*】 // Write(data.c_str(), data.size()); // 这里.c_str()就不行了 Write(&data[0], data.size()); // 调用Write函数 } // 向缓冲区写入string数据,然后写偏移量偏移 void WriteStringAndPush(const std::string &data) { WriteString(data); //// std::cout << WritePosition() << std::endl;//打印写偏移量偏移之前位置的数据【有就表示我们写入成功了】 MoveWriteOffset(data.size()); //// std::cout << ReadAbleSize() << std::endl;//写入数据成功后,查看可读数据大小,判断写偏移量是否偏移成功 } // 将buffer数据写入缓冲区 void WriteBuffer(Buffer &data) { // Buffer &data前面不能加const,因为data.ReadPosition()返回一个char*类型,这就是const转非const Write(data.ReadPosition(), data.ReadAbleSize()); } // 向缓冲区写入buffer数据,然后写偏移量偏移【这里可以使用构造函数实现】 void WriteBufferAndPush(Buffer &data) // 这里也不能加上const,还是因为加上了就是const转非const { WriteBuffer(data); MoveWriteOffset(data.ReadAbleSize()); // 偏移可读数据大小 } // 向缓冲区读取数据 void Read(void *buf, uint64_t len) { // 要求读取的数据大小len必须小于可读数据大小 assert(len <= ReadAbleSize()); ////std::cout << \"---\" << ReadPosition() << std::endl; // 查看是否有数据 std::copy(ReadPosition(), ReadPosition() + len, (char *)buf); // 转成char*,不然类型不匹配函数模版识别不了 } // 读取char类型数据,然后读偏移量要进行偏移 void ReadAndPop(void *buf, uint64_t len) { Read(buf, len); MoveReadOffset(len); } // 向缓冲区读取string数据 std::string ReadAsString(uint64_t len) { assert(len <= ReadAbleSize()); std::string str; str.resize(len); Read(&str[0], len); // 调用Read函数就行 ////std::cout << \"***\" << str << std::endl;//查看str里面是否有数据 return str; } // 读取string类型数据,然后读偏移量要进行偏移 std::string ReadAsStringAndPop(uint64_t len) { assert(len <= ReadAbleSize()); std::string str = ReadAsString(len); ////std::cout << \"+++\" << str << std::endl;//查看str里面是否有数据 MoveReadOffset(len); return str; } /////对HTTP请求行的一些操作///// // 查找请求行结束符\\r\\n char *FindCRLF() // 不能用string找,因为请求行内容可能有\'\\0\' { // memchr在一块空间查找【某一个字节!】数据 // 参数1 : 空间起始地址 // 参数2 : 查找的字节 // 参数3 : 查找的范围 void *res = memchr(ReadPosition(), \'\\n\', ReadAbleSize()); return (char *)res; } // 从缓冲区中取出一行HTTP请求的数据 std::string GetLine() // 通常情况下获取一行数据,一般是ascll值,不太可能有\'\\0\',所以可以用string,但是上面我们直接去查找是会遇到的 { char *pos = FindCRLF(); if (pos == nullptr) // 没有找到\'\\n\' { return \"\"; } // 这里+1表示我们把pos位置的内容,也就是\'\\n\'也提取出来了 // 如果这次读取数据遇到pos位置的换行\'\\n\'了,不取出来下一次读取开始遇到的就是\'\\n\' return ReadAsString(pos - ReadPosition() + 1); } // 从缓冲区中取出一行HTTP请求的数据,读偏移量偏移 //注意,我们这里是遇到\'\\n\'表示有一行完整的HTTP请求数据,然后提取 //如果我们缓冲区满了,但是没有\'\\n\',表示这不是一条完整的数据,我们可以读取出来,也可以不读取【我们这里是直接不读取】 std::string GetLineAndPop() { std::string str = GetLine(); MoveReadOffset(str.size()); return str; } // 清空缓冲区 void Clear() { // 只需要将两个偏移量归0即可 // 注意这里只是偏移量全部回到数组起始位置,数组里面还是有之前数据的! _writer_idx = _reader_idx = 0; }private: std::vector<char> _buffer; // 内存空间,使用vector进行内存空间管理 uint64_t _reader_idx; // 读偏移量 uint64_t _writer_idx; // 写偏移量};
Makefile文件:
main:main.cc server.hppg++ -std=c++17 $^ -o $@
测试代码:
#include \"server.hpp\"int main(){ Buffer buf; size_t count = 0; for (size_t i = 0; i < 300; ++i) { std::string str = \"hello!!\" + std::to_string(i) + \'\\n\'; buf.WriteStringAndPush(str); // 将str一个个的写入到缓冲区里面 ++count; } while (buf.ReadAbleSize() > 0) { std::string line = buf.GetLineAndPop(); std::cout << line << std::endl; } // std::string tmp; // tmp = buf.ReadAsStringAndPop(buf.ReadAbleSize()); // std::cout << tmp << \"count : \" << count << std::endl; // // buf.WriteStringAndPush(str); // // Buffer buf1; // // buf1.WriteBufferAndPush(buf);//buf数据给buf1 // // std::string tmp; // // tmp = buf.ReadAsStringAndPop(buf.ReadAbleSize()); // // std::string tmp1; // // tmp1 = buf1.ReadAsStringAndPop(buf1.ReadAbleSize()); // // std::cout<<tmp<<std::endl; // // std::cout<<buf.ReadAbleSize()<<std::endl; // // std::cout<<tmp1<<std::endl; // // std::cout<<buf1.ReadAbleSize()<<std::endl; // // buf.Clear(); // // buf1.Clear(); // // // std::cout<<buf.ReadPosition()<<std::endl; // // // std::cout<<buf1.ReadPosition()<<std::endl; return 0;}
然后make命令进行编译,测试接口:
下面注释的测试代码也是无误的,打印结果都符合预期
4-2、日志模块【打印日志】
这里没有用太复杂的方法,直接使用宏来完成(日志宏信息),方便我们后续调试程序,查看代码哪里有问题
方法比较粗糙,但是我们项目足够使用了
注意:
1、我们日志打印模块需要借助前面Buffer缓冲区模块代码来进行测试
2、后序我们会把日志宏放到server.hpp文件里面,方便后续使用,这里只是为了测试日志宏效果怎么样就放在这里了
#include \"server.hpp\"//// 日志打印宏【宏必须要在同一行,用\\来进行续写】//// 1、日志等级#define INF 0 // 普通#define DBG 1 // bug#define ERR 2 // 错误//什么时候应该打印哪些日志,由LOG_LEVEL这个宏的值来决定#define LOG_LEVEL DBG // 默认为DBG//// 2、日志打印//__FILE__ : 文件名 __LINE__ : 行号 __VA_ARGS__ : 允许宏接受任意数量的参数并在宏展开时将这些参数传递给宏的实现部分// #define LOG(msg) fprintf(stdout, \"[%s:%d] %s\", __FILE__,__LINE__,msg)// format : 格式// ... : 表示不定参数【参数个数不定】// #define LOG(format, ...) fprintf(stdout, \"[%s:%d]\" format \"\\n\", __FILE__, __LINE__, __VA_ARGS__)// 调用方法 : LOG(\"%s\",line.c_str());// 如果是LOG(\"hello\");这种只有一个参数调用方法,那么要使用##__VA_ARGS__// 注意 : ##__VA_ARGS__对于LOG(\"%s\", line.c_str());和LOG(\"hello!\");两种调用方法都支持// #define LOG(format,...) fprintf(stdout, \"[%s:%d]\" format\"\\n\", __FILE__, __LINE__, ##__VA_ARGS__)// 我们要想加上打印时间就又需要优化【最后项目进行超时判断也需要打印时间】// localtime函数 : 【得到一个时间结构体指针struct tm*,里面有年月日时分秒等时间数据】// 拿到年月日时分秒等信息之后,我们要把它组织成为特定的格式(如字符串等等),方便我们打印观察// strftime函数 : 【将时间信息按照指定格式,组织成为字符串,放到第一个参数中】。使用时记得预留一个字节空间存放\'\\0\'// %Y:%m:%d--H:%M:%A : 年月日--时分秒 。 fprintf中的tmp就是系统时间// 增加一个level等级#define LOG(level, format, ...) \\ do \\ { \\ if (level < LOG_LEVEL) \\ break; \\ time_t t = time(nullptr); \\ struct tm *ltm = localtime(&t); \\ char tmp[32] = {0}; \\ strftime(tmp, 31, \"%Y:%m:%d--H:%M:%A\", ltm); \\ fprintf(stdout, \"[%s %s:%d]\" format \"\\n\", tmp, __FILE__, __LINE__, ##__VA_ARGS__); \\ } while (0)//// 信息打印//使用不定参数不能用...#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)//// 到这里,我们代码随便添加调试代码,只需要更改LOG_LEVEL这个默认值,就可以决定我们的调试代码打印不打印int main(){ Buffer buf; size_t count = 0; for (size_t i = 0; i < 300; ++i) { std::string str = \"hello!!\" + std::to_string(i) + \'\\n\'; buf.WriteStringAndPush(str); // 将str一个个的写入到缓冲区里面 ++count; } while (buf.ReadAbleSize() > 0) { std::string line = buf.GetLineAndPop(); INF_LOG(\"%s\", line.c_str()); DBG_LOG(\"hello!\"); } return 0;}
4-3、Socket模块【套接字封装】
这里我们实现一个Socket类。本来原来的socket套接字有各种接口,是足够让我们使用的,但是太麻烦了,我们封装一个类出来,对套接字接口进行一下简单的封装,方便我们使用
设计思想:
除了上面图中的功能设计,还应该有一些其他的功能:
1、套接字选项 —— 开启地址端口重用
在 TCP协议中,一个连接bind一个地址与端口之后,一旦连接断开则会进入time_wait状态,此时连接不会立即释放,会继续占用地址和端口。本来这种方法是用来保护客户端的,但会造成我们服务器崩溃后不能立即重新启动,因此我们需要对服务端连接设置套接字选项,开启地址与端口复用
2、设置套接字阻塞属性 —— 设置为非阻塞
通过 recv/send 系统调用来读取与发送socket中的数据时,一般会直接将 socket 缓冲区读空或者写满,而由于套接字默认是阻塞的,因此如果缓冲区数据没有了【读不到】,或者缓冲区数据占满了【写不进】,这会导致我们的程序阻塞在 recv/send 函数这里,因此我们还需要为套接字设置非阻塞属性
代码实现:
套接字等级:grep -R \'SO_REUSEADDR\' /usr/include通过该命令找到套接字等级存放的文件位置,然后vim/vi打开文件,就能看到有什么套接字等级了
server.hpp :
//////------ Socket套接字封装模块 ------//////#define MAX_LISTEN 1024 // 同一时间最大并发连接数默认值class Socket{public: Socket() : _sockfd(-1) { } Socket(int fd) // 通过套接字描述符创建一个套接字【构建一个Scoket对象】 : _sockfd(fd) { } ~Socket() { Close(); } int Fd() { return _sockfd; } // 创建套接字 bool Create() { // int socket(int domain,int type,int protocol); // AF_INET : ipv4协议 SOCK_STREAM : 流式套接字 IPPROTO_TCP : 具体的协议编号 _sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);//前面两个参数确定了,第三参数可以为0 if (_sockfd < 0) { ERR_LOG(\"create socket failed!\"); return false; } return true; } // 绑定地址信息 bool Bind(const std::string &ip, uint16_t port) { struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); socklen_t len = sizeof(struct sockaddr_in); // int bind(int sockfd,struct sockaddr* addr,socklen_t len); int ret = bind(_sockfd, (struct sockaddr *)&addr, len); if (ret < 0) { ERR_LOG(\"bind address failed!\"); return false; } return true; } // 开始监听 // backlog : 同一时间最大并发连接数 // 注意 : backlog是限制同一时间的最大并发连接数,不是限制连接总数!!! bool Listen(int backlog = MAX_LISTEN) { // int listen(int backlog) int ret = listen(_sockfd, backlog); if (ret < 0) { ERR_LOG(\"socket listen failed\"); return false; } return true; } // 向服务器发起连接 bool Connect(const std::string &ip, uint16_t port) { //// 该函数代码与上面Bind一模一样,只需要把函数内的bind换成connect // int connect(int sockfd,struct sockaddr* addr,socklen_t len); struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); socklen_t len = sizeof(struct sockaddr_in); // int connect(int sockfd,struct sockaddr* addr,socklen_t len); int ret = connect(_sockfd, (struct sockaddr *)&addr, len); if (ret < 0) { ERR_LOG(\"connect server failed!\"); return false; } return true; } // 获取新连接 // 客户端连接到服务端之后,我们项目不关心是哪个客户端连接上来了,所以说客户端ip + port我们的Accept都不获取了 int Accept() // 这里不返回Soclet套接字,而是返回【套接字文件描述符】,是为了方便使用!因为我们获取了新连接套接字之后还是要提取出套接字文件描述符 { // int accept(int sockfd,struct sockaddr* addr,socklen_t* len); int newfd = accept(_sockfd, NULL, NULL); // 我们不关心客户端的ip+port等信息,直接置空 if (newfd < 0) { ERR_LOG(\"socket accept failed\"); return -1; } return newfd; } // 接收数据 // size_t 无符号整形,ssize_t有符号整形 // 设置flag=0是因为Recv默认是阻塞操作,我们可以将Recv设置为非阻塞操作,或者设置一些其他选项【默认为0就是阻塞操作】 // 设置ssize_t是为了更好地表示操作的结果,尤其是处理错误和特殊情况,比如返回负数值 ssize_t Recv(void *buf, size_t len, int flag = 0) { // ssize_t recv(int sockfd,void* buf,size_t len,int flag); int ret = recv(_sockfd, buf, len, flag); // recv返回值三种结果:1、小于0表示【出错】 等于0表示【连接断开了】 大于0表示【接收到数据了】 if (ret <= 0) { // EAGAIN : 当前socket的接收缓冲区没有数据了!在非阻塞情况下才有这个错误 // EINTR : 当前socket的阻塞等待,被信号打断了 if (errno == EAGAIN || errno == EINTR) { return 0; // 这次接收没有接收到数据,但是连接没有问题! } ERR_LOG(\"socket recv failed\"); return -1; } return ret; } // 非阻塞接收数据 ssize_t NonBlockRecv(void *buf, size_t len, int flag = 0) { return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT就是一个非阻塞标记,表示当前接收为非阻塞 } // 发送数据 ssize_t Send(const void *buf, size_t len, int flag = 0) { // ssize_t send(int sockfd,void* buf,size_t len,int flag); int ret = send(_sockfd, buf, len, flag); if (ret < 0) { ERR_LOG(\"socket send failed\"); return -1; } return ret; // ret就是实际发送的数据长度 } // 非阻塞发送数据 ssize_t NonBlockSend(void *buf, size_t len, int flag = 0) { return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT就是一个非阻塞标记,表示当前发送为非阻塞 } // 关闭套接字 void Close() { if (_sockfd != -1) { close(_sockfd); _sockfd = -1; } } ////下面创建客户端和服务端连接的两个函数是为了方便我们调用上面的函数接口 // 创建一个服务端连接 // 服务器就不需要绑定ip地址了,因为大多数服务器都是【绑定当前主机所有网卡对应的端口】! // block_flag表示是否启动非阻塞,false表示阻塞 bool CreateServer(uint16_t port, const std::string &ip = \"0.0.0.0\", bool block_flag = false) // 这里我们调一下参数顺序,给一个缺省值ip就行 { // 1、创建套接字 if (Create() == false) { return false; } // 2、设置非阻塞 if (block_flag == true) { NonBlock(); } // 3、绑定地址 if (Bind(ip, port) == false) { return false; } // 4、开始监听 if (Listen() == false) { return false; } // 5、启动地址重用 ReuseAddress(); return true; } // 创建一个客户端连接 bool CreateClient(const std::string &ip, uint16_t port) { // 服务端不用手动绑定ip【函数调用自动帮我们绑定了】,也不需要手动绑定port【不然会端口冲突,函数调用自动绑定了,只要端口唯一即可】 // 1、创建套接字 if (Create() == false) { return false; } // 2、直接连接服务器 if (Connect(ip, port) == false) { return false; } return true; } // 设置套接字选项 —— 开启地址端口重用 // setsockopt函数设置套接字选项 void ReuseAddress() { // int setsockopt(int fd,int level,int optname,void* val,int vallen) int val = 1; // SO_REUSEADDR : 地址重用 setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int)); val = 1; // SO_REUSEPORT :端口重用 setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int)); } // 设置套接字阻塞属性 —— 将套接字设置为非阻塞状态 void NonBlock() { // int fcntl(int fd, int cmd, ... /* arg */ ); //F_GETFL:获取文件描述符的当前标志(即文件状态标志)。//F_SETFL:设置文件描述符的标志。 int flag = fcntl(_sockfd, F_GETFL, 0); // 获取属性 fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);//O_NONBLOCK是一个标志,表示将文件描述符设置为非阻塞模式 }private: int _sockfd; // 只需要一个套接字描述符};
tcp_src.cc :
#include \"../source/server.hpp\"int main(){ Socket lst_sock; lst_sock.CreateServer(8080); while (1) { int newfd = lst_sock.Accept(); if (newfd < 0) { continue; } Socket cli_sock(newfd); char buf[1024] = {0}; int ret = cli_sock.Recv(buf, 1023); if(ret <0) { cli_sock.Close(); continue; } cli_sock.Send(buf,ret); cli_sock.Close(); } lst_sock.Close(); return 0;}
tcp_cli.cc :
#include \"../source/server.hpp\"int main(){ Socket cli_sock; cli_sock.CreateClient(\"127.0.0.1\",8080); std::string str = \"hello!\"; cli_sock.Send(str.c_str(),str.size()); char buf[1024]={0}; cli_sock.Recv(buf,1023); DBG_LOG(\"%s\",buf); return 0;}
makefile :
all:client serverclient:tcp_cli.ccg++ -std=c++17 $^ -o $@server:tcp_src.ccg++ -std=c++17 $^ -o $@.PHONY:cleanclean:rm -rf client server
测试代码就设置了一次连接,然后客户端直接退出了
4-4、Channel模块【描述符管理】
这个模块要与其他模块关联起来【上文2-2-2的模块关系图】,比如和eventloop,Poller等模块进行关联
设计思想:
-
需要处理的事件:可读,可写,挂断,错误,任意事件等,所以需要有对应事件个数的回调函数
-
事件处理的回调函数,一旦某个事件触发了,就调用对应的回调函数
因为Connection是对通信连接进行整体管理的一个模块,Channel模块是Connection模块的一个子模块。所以回调函数是由Connection模块设置给Channel模块的 -
为了保证线程安全,添加/修改/移除事件监控的操作需要放到Connection对象关联的EventLoop对应的线程中去执行
因为后边使用epoll进行事件监控,所以对描述监控事件的修改最后也必须通过Poller模块中的epoll相关函数来完成Poller 模块也是EventLoop的一个子模块EPOLLIN可读EPOLLOUT可写 EPOLLRDHUP连接断开EPOLLPRI优先数据EPOLLERR出错了EPOLLHUP挂断而以上的事件都是一个数值uint32_t进行保存要进行事件管理,就需要有一个uint32_t类型的成员保存当前需要监控的事件
-
虽然我们关闭了描述符,对某个事件不进行监控了,但是我们的描述符还是挂在epoll的红黑树上面,随时还可以启动描述符,所以需要Remove这样的接口将我们的描述符从epoll红黑树中移除掉
代码实现:
//////------ 描述符管理Channel模块 ------//////class Channel{ // 定义一个类型别名EventCallback,在下面用来创建变量,用于触发不同的回调函数 using EventCallback = std::function<void()>;public: Channel(int fd) : _fd(fd), _events(0), _revents(0) { } ~Channel() { } int Fd() { return _fd; } // 获取想要监控的事件 uint32_t Events() { return _events; } // 设置触发事件 void SetRevents(uint32_t events) // 设置实际的就绪事件 { _revents = events; } // 设置可读事件回调 void SetReadCallback(const EventCallback &cb) { _read_callback = cb; } // 设置可写事件回调 void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; } // 设置错误事件回调 void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; } // 设置连接断开事件回调 void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; } // 设置任意事件回调 void SetEventCallback(const EventCallback &cb) { _event_callback = cb; } // 是否监控可读 bool ReadAble() { return (_events & EPOLLIN); } // 是否监控可写 bool WriteAble() { return (_events & EPOLLOUT); } // 启动读事件监控 void EnableRead() { _events |= EPOLLIN; //......后面会添加到EventLoop的事件监控中 } // 启动写事件监控 void EnableWrite() { _events |= EPOLLOUT; //......后面会添加到EventLoop的事件监控中 } // 关闭读事件监控 void DisableRead() // 0010进行取反1101,然后进行&=(_events默认初始化为0)刚刚好把1位置置0,其他位置不变 { _events &= ~EPOLLIN; //......后面会添加到EventLoop的事件监控中 } // 关闭写事件监控 void DisableWrite() { _events &= ~EPOLLOUT; //......后面会添加到EventLoop的事件监控中 } // 关闭所有事件监控 void DisableAll() { _events = 0; } // 我们上面的接口只是对于【某个事件不监控了,但是我们的描述符还是挂在epoll的红黑树上面,随时还可以启动描述符】 // Remove接口就是将描述符从epoll红黑树上面移除掉 void Remove() // 移除监控 { // 后面调用EventLoop接口来移除监控 } // 事件处理,一旦连接触发了事件,就调用这个函数 // 根据_revents来确定触发了什么事件,根据触发事件来调用上面对应事件的回调函数,自己触发了什么事件如何处理自己决定 void HandleEvent() { // 调用可读事件回调函数 // EPOLLRDHUP对方关闭了连接,也触发可读事件,因为我们将剩下缓冲区的数据进行处理了 // EPOLLPRI优先数据 if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) { if (_read_callback) { _read_callback(); } // 调用任意事件回调函数 if (_event_callback) // 在事件处理完之后调用,刷新活跃度,防止连接超时,释放连接了 { _event_callback(); } } // 调用可写事件回调函数 // 有可能会释放连接的操作,一次只处理一个 if (_revents & EPOLLOUT) { if (_write_callback) { _write_callback(); } // 调用任意事件回调函数 if (_event_callback) // 在事件处理完之后调用,刷新活跃度,防止连接超时,释放连接了 { _event_callback(); } } // 一旦出错,直接释放连接了,所以要在前面调用任意事件回调函数 // 调用错误事件回调函数 else if (_revents & EPOLLERR) { // 调用任意事件回调函数 if (_event_callback) // 在前面调用,后面调用因为连接释放,没有意义了 { _event_callback(); } if (_error_callback) { _error_callback(); } } // 调用连接断开事件回调函数 else if (_revents & EPOLLRDHUP) { // 调用任意事件回调函数 if (_event_callback) // 在前面调用,后面调用因为连接释放,没有意义了 { _event_callback(); } if (_close_callback) { _close_callback(); } } }private: int _fd; // 描述符 uint32_t _events; // 当前需要监控事件 uint32_t _revents; // 当前触发连接事件 EventCallback _read_callback; // 可读事件触发的回调函数 EventCallback _write_callback; // 可写事件触发的回调函数 EventCallback _error_callback; // 错误事件触发的回调函数 EventCallback _close_callback; // 连接断开事件触发的回调函数 EventCallback _event_callback; // 任意事件触发的回调函数};
4-5、Poller模块【描述符IO事件监控】
Poller模块是EventLoop模块的一个子模块
设计思想:
通过epoll实现对描述符的IO事件监控,也就是对epoll功能的封装
- 添加或者修改描述符的事件监控【组合 : 存在就修改,不存在就添加】
- 移除描述符的事件监控
- 开始监控,获取就绪的Channel
封装思想:
1.必须拥有一个epoll的操作句柄(epoll的描述符)2.拥有一个struct epoll_event结构数组,监控时保存所有的活跃事件3.使用hash表管理描述符与描述符对应的事件管理Channel对象
逻辑流程:
1.对描述符进行监控,通过Channel才能知道描述符需要监控什么事件2.当描述符就绪了,通过描述符在hash表中找到对应的Channel(得到了Channel才能知道什么事件如何处理) 当描述符就绪了,返回就绪描述符对应的Channel
代码实现:
//////------ 描述符IO事件监控Poller模块 ------//////#define MAX_EPOLLEVENTS 1024class Poller{public: Poller() { // epoll_create函数就一个参数,这个参数随便给,因为不会使用但是必须【大于0】 _epfd = epoll_create(MAX_EPOLLEVENTS); if (_epfd < 0) { ERR_LOG(\"epoll create failed\"); abort(); // 退出程序 } } ~Poller() { } // 添加/修改描述符的事件监控 void UpdateEvent(Channel *channel) // Channel里面既又监控的描述符,又有要监控的事件等一系列内容 { bool ret = HasChannel(channel); if (ret == false) { // 不存在描述符监控事件——添加 return Update(channel, EPOLL_CTL_ADD); // EPOLL_CTL_ADD添加 } // 存在描述符监控事件——更新 return Update(channel, EPOLL_CTL_MOD); // EPOLL_CTL_MOD修改 } // 移除描述符的事件监控 void RemoveEvent(Channel *channel) { auto it = _channels.find(channel->Fd()); if (it != _channels.end()) { _channel.erase(it); } Update(channel, EPOLL_CTL_DEL); // EPOLL_CTL_DEL移除 } // 开始监控,返回活跃连接 void Poll(std::vector<Channel *> *active) { // int epoll_wait(int fd,struct epoll_event* ev,int maxevents,int timeout) // timeout : 为-1表示阻塞操作。如果是非-1表示实际时间 int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); // nfds是就绪事件个数 if (nfds < 0) { if (errno == EINTR) // EINTR阻塞被信号打断 { return; } ERR_LOG(\"epoll wait failed : %s\", strerror(errno)); abort(); } // nfds是就绪事件个数 for (size_t i = 0; i < nfds; ++i) { auto it = _channels.find(_evs[i].data.fd); assert(it != _channels.end()); it->second->SetRevents(_evs[i].events); // 设置实际就绪事件 active->push_back(it->second); } }private: // 实际对epoll的直接操作 void Update(Channel *channel, int op) { // int epoll_ctl(int epdf,int op,int fd,epoll_event* ev) int fd = channel->Fd(); struct epoll_event ev; ev.data.fd = fd; ev.events = channel->Events(); int ret = epoll_ctl(_epfd, op, fd, &ev); if (ret < 0) { ERR_LOG(\"epoll_ctl failed\"); // abort(); // 退出程序函数 } return; } // 判断一个Channel是否已经添加了监控 bool HasChannel(Channel *channel) { auto it = _channels.find(channel->Fd()); if (it == _channels.end()) { return false; } return true; }private: int _epfd; // epoll描述符 struct epoll_event _evs[MAX_EPOLLEVENTS]; // 就绪数组 -- 保存所有的活跃连接 std::unordered_map<int, Channel *> _channels; // 建立连接与其监控事件管理Channel句柄的关联关系};// 移除监控,上面Channel内声明了这两个函数void Channel::Remove() // 函数的定义{ return _poller->RemoveEvent(this);}void Channel::Update() // 函数的定义{ return _poller->UpdateEvent(this);}
Channel模块和Poller模块组合测试:
测试代码对上面代码做了一些修改,但是不是最终项目代码,所以这里就不贴出来了
测试结果是符合预期的,Channel和Poller模块功能没有什么问题
4-6、EventLoop模块【事件监控+事件处理】
设计思想:
- 有多少个线程,就有多少个EventLoop模块【与线程是一一对应的】
- 当我们监控了一个客户端连接后,一旦这个连接触发了事件,就需要进行事件处理
而这个描述符,在处理事件的过程中,在多线程中又触发了新的事件,那就有可能被分配到其他线程中去执行,这样就有可能会导致线 程安全问题。那么我们需要为每一个连接的操作都加一把锁来保证线程安全吗?这样做可以是可以,但是没必要,因为我们需要创建很多的锁,会造成不必要的资源开销,浪费掉了
因此我们需要将一个连接的事件监控,以及连接事件处理,以及其他操作都放在同一个线程中进行 - 虽然连接无法与线程直接一一对应,但是我们的EventLoop模块是与线程是一一对应的。因此我们只需将一个连接与一个EventLoop模块进行绑定,就间接完成连接与线程的一一绑定
但是仍然存在问题,如果外部有了任务线程池,再一次对任务进行了分摊,在这种情况下我们并不能保证连接的所有操作都在同一个线程中完成。那么如何保证一个连接的所有操作都必定在 EventLoop 对应的线程中呢?
解决方案:
给EventLoop模块中添加一个任务队列,对连接的所有操作都进行一次封装,将对连接的操作并不直接执行,而是当作任务添加到任务队列中
EventLoop处理流程:
1.在线程中对描述符进行事件监控2.有描述符就绪则对描述符进行事件处理(如何保证处理回调函数中的操作都在线程中?【给EventLoop模块中添加一个任务队列】)3.连接所有的就绪事件处理完了 (都添加都任务队列中了),这时候再去将任务队列中的所有任务一一执行
这样能够保证对于连接的所有操作,都是在一个线程中进行的,不涉及线程安全问题
但是对于任务队列的操作有线程安全的问题,只需要给task的操作加一把锁即可
- 当事件就绪,需要处理的时候,处理过程中,如果对连接要进行某些操作:
这些操作必须在EventLoop对应的线程中执行,保证对连接的各项操作都是线程安全的。
1.如果执行的操作本就在线程中,不需要将操作压入队列了,可以直接执行
2.如果执行的操作不再线程中,才需要加入任务池,等到事件处理完了然后执行任务
最后,因为有可能因为等待描述符IO事件就绪,导致执行流流程阻塞,这时候任务队列中的任务将得不到执行,因此需要使用eventfd来进行事件通知,唤醒事件监控的阻塞
代码实现:
//////------ EventLoop模块【事件监控+事件处理】 ------//////using Functor = std::function<void()>;// std::this_thread::get_id() // 获取当前线程的IDclass EventLoop{public: EventLoop() : _thread_id(std::this_thread::get_id()), // 获取当前线程的id _event_fd(CreateEventFd()), // 调用函数来创建事件通知描述符 _event_channel(new Channel(_event_fd, this)) { // 给eventfd添加可读事件回调函数,读取eventfd事件通知次数 _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd, this)); // 启动eventfd事件的读事件监控 _event_channel->EnableRead(); } ~EventLoop() { } // 三个流程 : 事件监控->就绪事件处理->执行任务 void Start() { // 1、事件监控 std::vector<Channel *> actives; // 用来接收活跃连接 _poller.Poll(&actives); // 这里可能因为等待描述符IO事件就绪,导致执行流流程阻塞。下面QueueInLoop调用WeakUpEventFd来唤醒 // 2、就绪事件处理 for (auto &channel : actives) { channel->HandleEvent(); // 进行事件处理 } // 3、执行任务 RunAllTask(); } // 用于判断当前线程是否是EventLoop对应的线程【是否和_thread_id一样】 bool IsInLoop() { return (_thread_id == std::this_thread::get_id()); } // 判断将要执行的任务是否处于线程当中,如果是则执行,不是则压入队列 void RunInLoop(const Functor &cb) { if (IsInLoop()) { return cb(); } return QueueInLoop(cb); // 把cb压入到任务池中 } // 将任务压入任务池 void QueueInLoop(const Functor &cb) { { // 对_tasks操作要加锁 std::unique_lock<std::mutex> _lock(_mutex); _tasks.push_back(cb); } // 唤醒有可能因为没有事件就绪,而导致的epoll阻塞 // 其实就是给eventfd写入一个数据,然后eventfd就会触发可读事件,有事件就绪就不会阻塞了 WeakUpEventFd(); } // 修改/添加描述符的事件监控 void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); } // 移除描述符的监控 void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); }private: // 执行所有任务池中的任务 void RunAllTask() { std::vector<Functor> functor; // 我们可能在任意一个地方来执行任务,所以说需要加锁保护 { // 作用域,离开作用域,锁自动释放,所以说加一个{}花括号 std::unique_lock<std::mutex> _lock(_mutex); // 加锁期间把任务全部取出来 _tasks.swap(functor); // functor保存我们所有的任务;_tasks里面就是空的了 } for (auto &f : functor) { f(); } // 所有的任务就执行完了 return; } // 创建一个_event_fd事件通知描述符 static int CreateEventFd() { int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (efd < 0) { ERR_LOG(\"create eventfd error\"); abort(); // 让程序异常退出 } return efd; } // 读取eventfd当中的数据 void ReadEventFd() { uint64_t res; int ret = read(_event_fd, &res, sizeof(res)); if (ret < 0) { // EINTR -- 被信号打断,问题不大 // EAGAIN -- 无数据可读,问题不大 if (errno == EINTR || errno == EAGAIN) { return; } ERR_LOG(\"read eventfd failed\"); abort(); } return; } // 唤醒eventfd void WeakUpEventFd() { uint64_t val = 1; int ret = write(_event_fd, &val, sizeof(val)); if (ret < 0) { // EINTR -- 被信号打断,问题不大 // EAGAIN -- 无数据可读,问题不大 if (errno == EINTR || errno == EAGAIN) { return; } ERR_LOG(\"write eventfd failed\"); abort(); } return; }private: // 线程id,判断某个操作的id和EventLoop保存的id是不是一样的 // 如果是,那么执行的操作本【就在线程中】,不需要将操作压入队列了,可以直接执行【操作在一个线程中,没有线程安全问题】 // 如果id不一样不是,那么才需要加入任务池_tasks【不压入任务池,存在线程安全问题】 std::thread::id _thread_id; int _event_fd; // 事件通知描述符,来唤醒IO事件监控可能导致的阻塞 // 通过智能指针来管理Channel指针变量_event_channel // 当EventLoop释放的时候,_event_channel也会被自动释放。或者在析构函数对Channel*指针进行释放 std::unique_ptr<Channel> _event_channel; // 通过_event_channel来管理_event_fd的事件 Poller _poller; // 进行所有描述符事件监控模块 std::vector<Functor> _tasks; // 任务池 std::mutex _mutex; // 实现任务池的线程安全 TimerWheel _timer_wheel; // 定时器模块};
与Chaneel模块,Poller模块代码测试没有问题
注意:这里EventLoop模块不是最终版,因为EventLoop还有一个子模块TimerQueue没有添加进来,后面再一起整合
4-7、TimerQueue模块【定时任务】
设计思想:
- 我们必须把上面前置知识的timerfd和timerwheel整合在一起,才能实现一个完整的秒级定时器
- timerfd:实现内核每隔一段时间,给进程一次超时事件(timerfd可读)
timerwheel:实现每次执行Runtimetask,都可以执行一波到期的定时任务 - timerfd设置为每秒钟触发一次定时事件,当事件被触发,则运行一次timerwheel的runtimertask,执行一 下所有的过期定时任务
- 而timerfd的事件监控与触发,可以融合EventLoop来实现
代码实现:
//////------ TimerQueue模块【定时任务】 ------//////using TaskFunc = std::function<void()>;using ReleaseFunc = std::function<void()>;// 定时任务//class TimerTask{public: TimerTask(uint64_t id, uint32_t timeout, const TaskFunc &cb) : _id(id), _timeout(timeout), _task_cb(cb), _canceled(false) { } ~TimerTask() { if (_canceled == false) { _task_cb(); // 执行我们的定时任务 } _release(); } void Cancel() { _canceled = true; } /*设置release_cb回调函数*/ void SetRelease(const ReleaseFunc &release_cb) { _release = release_cb; } uint32_t Timeout() { return _timeout; } // 给类外知道定时任务的超时时间private: uint64_t _id; // 定时器任务对象id,保证唯一性,我们可能在多线程中使用定时器,所以要统一分配 uint32_t _timeout; // 定时任务超时时间 bool _canceled; // false表示不取消任务,true表示取消任务【取消任务——任务直接不执行了】 TaskFunc _task_cb; // 定时器对象要执行的定时任务 ReleaseFunc _release; // 用于删除TimerWheel对象中保存的定时器信息};// 定时任务//// 时间轮//class TimerWheel{public: // std::unique_ptr _timer_channel; // _timer_channel是由智能指针管理的Channel对象,不能直接传(_timerfd,_loop),要new一个对象 // 构造要求:必须通过原始指针初始化unique_ptr,且该指针必须指向动态分配的内存(通过 new 创建) TimerWheel(EventLoop *loop) : _capacity(60), _tick(0), _wheel(_capacity), _loop(loop), _timerfd(CreateTimerfd()), _timer_channel(new Channel(_timerfd, _loop)) { _timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this)); // 读取timerfd的内容 _timer_channel->EnableRead(); // 启动可读事件监控 } ~TimerWheel() {} //////------保证定时器的操作都是线程安全的 // 【定时器(时间轮)里面有一个_timers成员,定时器信息的操作可能在多线程中进行,所以要考虑线程安全问题】 // 【【如果不想加锁,那就把对定时器所有的操作,放到一个线程中进行】】 // RunInLoop将TimerAddInLoop压入到TimerWheel所依赖的EventLoop线程中的任务池中(线程安全)------ 一个线程对应一个EventLoop // 这样在一个EventLoop线程中,【任务是一个一个执行的】 ------ 在EventLoop线程里面串行化过程 // 处理完事件之后再执行添加定时器任务一定是对应线程内进行的,因为一个EventLoop对应一个线程! void TimerWheel::TimerAdd(uint64_t id, uint32_t timeout, const TaskFunc &cb){ _loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, timeout, cb));} // 刷新/延迟定时器任务void TimerWheel::TimerRefresh(uint64_t id){ _loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));} // 取消定时任务void TimerWheel::TimerCancel(uint64_t id){ _loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));} ///---上面3个任务因为需要调用EventLoop类里面的成员方法,我们虽然在上面声明了EventLoop类,但是不知道EventLoop里面有什么成员函数,只能放到下面调用 // 是否存在某个定时任务 // 这个接口存在线程安全问题!!!【所以该接口不能被外界使用者来调用------只能在模块内,在对应的EventLoop线程中调用】 // HasTimer直接访问_timers,但未限制调用线程。若其他线程调用HasTimer,则会导致多线程同时读写_timers,引发数据竞争 bool HasTimer(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return false; } return true; }private: // 这里的RemoveTimer是一个回调函数,内部有this指针 // SetRelease回调函数,从unordered_map中将定时任务信息移除 void RemoveTimer(uint64_t id) // 每个定时任务结束后,删除对应的定时器任务对象id { auto it = _timers.find(id); // 迭代器遍历查找 if (it != _timers.end()) { _timers.erase(id); } } // 创建timerfd static int CreateTimerfd() { int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); // CLOCK_MONOTONIC表示开机到现在的时间——相对时间。0表示默认阻塞操作 if (timerfd < 0) { ERR_LOG(\"timerfd create failed!\\n\"); abort(); } // int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old);启动定时器 // 启动定时器之前要先有时间 struct itimerspec itime; itime.it_value.tv_sec = 1; // itime中的it_value是第一次超时时间,而it_value里面有tv_sec秒和tv_nsec纳秒两种设置,这里设置1s itime.it_value.tv_nsec = 0; // 为了防止随机数,纳秒设置为0 itime.it_interval.tv_sec = 1; // 第一次超时后,每次超时的时间间隔 itime.it_interval.tv_nsec = 0; // 启动定时器 timerfd_settime(timerfd, 0, &itime, NULL); // flag设为0表示相对时间,old不要设为NULL return timerfd; } // 读取timerfd的内容 void ReadTimerfd() { uint64_t times;// 我们每次超时,自动给timerfd中写入8字节的数据,所以这里需要uint64_t的8字节 int ret = read(_timerfd, ×, 8); // timerfd是阻塞的,没有数据就一直卡在read这里 if (ret < 0) { ERR_LOG(\"read timerfd error!\\n\"); abort(); } } // 这个函数应该每秒钟被执行一次,相当于秒针_tick向后走了一步 void RunTimerTask() { _tick = (_tick + 1) % _capacity; _wheel[_tick].clear(); // 清空数组的指定位置,就会将该位置保存的所有管理的定时器shared_ptr释放掉 } void OnTime() { ReadTimerfd(); // 读取timer内容 RunTimerTask(); // 执行定时任务 } // 真正进行添加定时器任务 void TimerAddInLoop(uint64_t id, uint32_t timeout, const TaskFunc &cb) { PtrTask pt(new TimerTask(id, timeout, cb)); // new一个定时对象出来 // 这里不能直接pt->SetRelease(RemoveTimer),因为类内成员函数默认第一个参数是this指针 // RemoveTimer是TimerWheel类的非静态成员函数,必须通过类名作用域(TimerWheel::)访问。直接写RemoveTimer会被编译器误认为是全局函数或当前作用域的自由函数,导致“未声明的标识符”错误。 // 在 C++ 中,成员函数的地址需要通过 & 来获取。这是因为成员函数与普通函数不同,它需要绑定到一个具体的对象实例上才能被调用。& 用于获取成员函数的地址,以便将其作为参数传递给 std::bind // 如果不加 &,编译器会将 TimerWheel::RemoveTimer 视为一个函数调用,而不是函数地址,从而导致语法错误 pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id)); int pos = (_tick + timeout) % _capacity; // 如果_tick + timeout越界了,那么就要像环形一样,从头再开始 _wheel[pos].push_back(pt); // 在时间轮_wheel中找到pos位置,然后计数加一(也就是将获取到的shared_ptr插入进_wheel) _timers[id] = WeakTask(pt); // 不能使用shared_ptr,否则永远有一个智能指针指向对象,计数永远不为0 } // 真正进行刷新/延迟定时器任务 void TimerRefreshInLoop(uint64_t id) { // 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来,添加到时间轮中 auto it = _timers.find(id); // 迭代器遍历查找 if (it == _timers.end()) { return; } // it是一个哈希表(map),第二个成员就是weak_ptr // 而weak_ptr中的lock()函数就是用来获取weak_ptr中管理的对象对应的shared_ptr PtrTask pt = it->second.lock(); // 这里的pt就是我们找到的定时任务 int timeout = pt->Timeout(); // 获取初始延时时间 int pos = (_tick + timeout) % _capacity; // 如果_tick + timeout越界了,那么就要像环形一样,从头再开始 _wheel[pos].push_back(pt); // 在时间轮_wheel中找到pos位置,然后计数加一(也就是将获取到的shared_ptr插入进_wheel) } // 真正进行取消定时任务 void TimerCancelInLoop(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return; } PtrTask pt = it->second.lock(); // 这里的pt就是我们找到的定时任务 if (pt) { pt->Cancel(); // 取消任务 } }private: using WeakTask = std::weak_ptr<TimerTask>; using PtrTask = std::shared_ptr<TimerTask>; //_tick和_capacity要放到_wheel上面,因为初始化列表不决定初始化顺序,由声明的地方决定的 int _tick; // 当前的秒针,走到哪里就是释放哪里,释放哪里就是,执行哪里的定时任务 int _capacity; // 表盘最大数量——最大的延迟时间 std::unordered_map<uint64_t, WeakTask> _timers; // 定时器任务id与管理定时任务对象的weak_ptr之间的关联关系,简单来说(uint64_t是id,WeakTask是计数器) std::vector<std::vector<PtrTask>> _wheel; // 时间轮 EventLoop *_loop; int _timerfd; // 定时器描述符--可读事件回调就是读取计数器,执行定时任务 std::unique_ptr<Channel> _timer_channel; // 定时器的Chaneel,用于设置定时器描述符事件回调};
TimerQueue模块没有什么问题
集合测试:Channel,Poller,EventLoop,TimerQueue
服务端:#include \"../source/server.hpp\"void HandleClose(Channel *channel){ //std::cout << \"close : \" <Fd() << std::endl; DBG_LOG(\"close fd : %d\",channel->Fd()); channel->Remove(); // 移除监控 delete channel;}void HandleRead(Channel *channel){ int fd = channel->Fd(); char buf[1024] = {0}; int ret = recv(fd, buf, 1023, 0); if (ret <= 0) { return HandleClose(channel); // 关闭连接 } DBG_LOG(\"%s\",buf); // std::cout << buf << std::endl; channel->EnableWrite(); // 启动可写事件}void HandleWrite(Channel *channel){ int fd = channel->Fd(); const char *date = \"hello everyone hello everyone\"; int ret = send(fd, date, strlen(date), 0); // 0阻塞发送 if (ret <= 0) { return HandleClose(channel); // 关闭连接 } channel->DisableWrite(); // 关闭写监控}void HandleError(Channel *channel){ return HandleClose(channel); // 关闭释放}void HandleEvent(EventLoop *loop, Channel *channel, uint64_t timerid) // 要刷新活跃度,我们希望连接是在我们设置的秒钟时间内没有事件,才释放连接{ loop->TimerRefresh(timerid);//每次有事件了,刷新活跃度,否则一旦到了我们设置的时间,就关闭连接了}void Acceptor(EventLoop *loop, Channel *lst_channel){ int fd = lst_channel->Fd(); int newfd = accept(fd, nullptr, nullptr); if (newfd < 0) { return; } uint64_t timerid = rand() % 10000;//随机数生成id Channel *channel = new Channel(newfd, loop); channel->SetReadCallback(std::bind(HandleRead, channel)); // 为通信套接字设置可读事件回调函数 channel->SetWriteCallback(std::bind(HandleWrite, channel)); // 为通信套接字设置可写事件回调函数 channel->SetCloseCallback(std::bind(HandleClose, channel)); // 为通信套接字设置关闭事件回调函数 channel->SetErrorCallback(std::bind(HandleError, channel)); // 为通信套接字设置错误事件回调函数 channel->SetEventCallback(std::bind(HandleEvent, loop, channel, timerid)); // 为通信套接字设置任意事件回调函数 //// 非活跃连接的超时释放操作 //// 定时销毁任务,必须在启动读事件之前,因为有可能启动事件监控之后,立即就有了事件,但是这个时候还没有任务 loop->TimerAdd(timerid, 10, std::bind(HandleClose, channel)); // 延迟10s channel->EnableRead();}int main(){ srand(time(nullptr)); EventLoop loop; Socket lst_sock; lst_sock.CreateServer(8080); // 为监听套接字,创建一个Channel进行事件管理,以及事件的处理 Channel channel(lst_sock.Fd(), &loop); channel.SetReadCallback(std::bind(Acceptor, &loop, &channel)); // 回调中获取新连接,为新连接创建Channel并添加监控 channel.EnableRead(); // 启动可读事件监控 while (1) { loop.Start(); } lst_sock.Close(); return 0;}客户端:#include \"../source/server.hpp\"int main(){ Socket cli_sock; cli_sock.CreateClient(\"127.0.0.1\", 8080); // while (1) for (size_t i = 0; i < 5; ++i)//5s之后不在通信 { std::string str = \"hello!\"; cli_sock.Send(str.c_str(), str.size()); char buf[1024] = {0}; cli_sock.Recv(buf, 1023); DBG_LOG(\"%s\", buf); sleep(1); } while(1)//不通信之后,过了我们设置的10s就会关闭连接 { sleep(1); } return 0;}
makefile:
.PHONY:client serverall:client server#带上-g表示可以进行调试client:tcp_cli.ccg++ -g -std=c++17 $^ -o $@server:tcp_src.ccg++ -g -std=c++17 $^ -o $@.PHONY:cleanclean:rm -rf client server
可以看到,有事件到来刷新了活跃度,10
EventLoop简单服务器的模块关系图
4-8、Connection模块【通信连接管理】
设计思想:
Connection模块简介:
目的:Connection模块是对连接的管理模块,对连接进行全方位的管理,对通信连接的所有操作都是通过这个模块提供的功能完成
管理:1.套接字的管理,能够进行套接字的操作2.连接事件的管理,可读,可写,错误,挂断,任意3.缓冲区的管理,便于socket数据的接收和发送4.协议上下文的管理,记录请求数据的处理过程5. 回调函数的管理因为连接接收到数据之后该如何处理,需要由用户决定,因此必须有业务处理回调函数一个连接建立成功后,该如何处理,由用户决定,因此必须有连接建立成功的回调函数一个连接关闭前,该如何处理,由用户决定,因此必须由关闭连接回调函数。任意事件的产生,有没有某些处理,由用户决定,因此必须有任意事件的回调函数
功能:
1.发送数据—给用户提供的发送数据接口,并不是真正的发送接口,而只是把数据放到发送缓冲区,然后启动写事件监控
2.关闭连接—给用户提供的关闭连接接口,应该在实际释放连接之前,看看输入输出缓冲区是否有数据待处理
3.启动非活跃连接的超时销毁功能
4.取消非活跃连接的超时销毁功能
5.协议切换-—一个连接接收数据后如何进行业务处理,取决于上下文,以及数据的业务处理回调函数
- 注意场景:对连接进行操作的时候,但是连接已经被释放,导致内存访问错误,最终程序崩溃
- 解决方案:使用智能指针shared_ptr对Connection对象进行管理,这样就能保证任意一个地方对Connection对象进行操作的时候,保存了一份shared_ptr,因此就算其他地方进行释放操作,也只是对shared_ptr的计数器-1,而不会导致Connection的实际释放
- 我们上面对socket套接字进行了封装,在双方通信的时候,如果服务端socket套接字内的接收缓冲区满了,这个时候客户端还在发送数据,那么服务端接收不了数据就卡住了,所以我们需要将服务端socket套接字内的接收缓冲区数据拿到我们自己上面定义的缓冲区当中
代码实现:
//////------ Connection模块【通信连接管理】 ------//////class Connection; // 声明,不然using PtrConnection = std::shared_ptr;不认识Connectiontypedef enum{ DISCONNECTED, // 连接关闭状态,处于这个状态就表示连接要释放了【enum默认从0开始】 CONNECTING, // 连接刚刚建立了,其他内容待处理的状态(比如回调函数等内容没有设置好)【半连接状态】 CONNECTED, // 连接建立已完成状态,各种设置已经完成,可以进行各种数据通信了 DISCONNECTING // 待关闭状态【半关闭状态】} ConnStatu;// 通信连接的智能指针 -- 最开始的智能指针(引用计数为1)由服务器模块创建using PtrConnection = std::shared_ptr<Connection>;// 组件使用者设置给服务器模块,服务器模块获取新的通信连接后再设置给通信连接模块的回调函数类型using ConnectedCallback = std::function<void(const PtrConnection &)>;using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection &)>;using AnyEventCallback = std::function<void(const PtrConnection &)>;// 这里继承这个类是因为要使用【shared_from_this()从当前对象自身获取自身的shared_ptr管理对象】// public enable_shared_from_this这是一个模块类,会为当前对象生成一个weak_ptr,通过weak_ptr能获取对象的shard_ptr【shared_from_this()】class Connection : public std::enable_shared_from_this<Connection>{public: // _statu初始化默认为半完成连接状态 Connection(EventLoop *loop, uint64_t conn_id, int sockfd) : _conn_id(conn_id), _sockfd(sockfd), _enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd), _channel(_sockfd, loop) { _channel.SetReadCallback(std::bind(&Connection::HandleRead, this)); _channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this)); _channel.SetCloseCallback(std::bind(&Connection::HandleClose, this)); _channel.SetErrorCallback(std::bind(&Connection::HandleError, this)); _channel.SetEventCallback(std::bind(&Connection::HandleEvent, this)); // _channel启动读事件监控一定一定一定不能再构造函数里面完成,否则定时任务等会出现问题 } ~Connection() { DBG_LOG(\"release connection : %p\", this); // 看看连接有没有被正常释放 } // 获取管理的文件描述符 int Fd() { return _sockfd; } // 获取连接id int Id() { return _conn_id; } ConnStatu Statu() // 返回状态 { return _statu; } bool Connected() // 是否处于CONNECTED状态 { return (_statu == CONNECTED); } // 设置上下文 -- 连接建立完成时进行调用 void SetContext(const Any &context) { _context = context; } // 获取上下文 Any *GetContext() // 返回的是指针 { return &_context; } void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; } void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; } void SetServerClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; } // 连接建立后,进行channel回调设置,启动读监控,调用_connected_callback void Established() { _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this)); } // 发送数据,将数据发送到缓冲区,启动写事件监控,触发可写事件之后,在进行数据的发送 void Send(const char *data, size_t len) { // 外界传入的data,可能是个临时的空间,我们现在只是把发送操作压入了任务池,有可能并没有被立即执行 // 因此有可能执行的时候,data指向的空间有可能已经被释放了。 Buffer buf;//这里有buf,下面SendInLoop构造Buffer临时对象,这样就不担心data被释放的可能了 buf.WriteAndPush(data, len); _loop->RunInLoop(std::bind(&Connection::SendInLoop, this, buf)); } // 提供给组件使用者的关闭接口,不实际关闭,需要判断有没有数据待处理 void Shoutdown() { _loop->RunInLoop(std::bind(&Connection::ShoutdownInLoop, this)); } // void Release() { _loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this)); } // 启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务 void EnableInactiveRelease(int sec) { _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec)); } // 取消非活跃销毁 void CancelInactiveRelease() { _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this)); } // 协议切换/协议升级 // 切换协议就是切换了数据的处理方式 -- 重置上下文,以及阶段性回调处理喊函数(_connected_callback等) // 【Upgrade接口必须在EVentLoop线程内立即执行】,不能压入到队列里面去,必须立刻执行 // 【【防备新的事件触发后,处理的时候,切换任务还没有被执行 ------ 会导致数据使用原协议处理了】】 void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &close, const AnyEventCallback &event) { _loop->AssertInLoop(); _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, close, event)); }private: // 对于连接的操作放到对应的EventLoop线程里面去执行,保证线程安全 // 真正对数据进行处理的函数 void ReleaseInLoop() // 实际释放接口 { // 1、修改连接状态,将其置为DISCONNECTED _statu = DISCONNECTED; // 2、移除连接的事件监控 _channel.Remove(); // 3、关闭描述符 _socket.Close(); // 4、如果当前定时器任务队列中还有定时销毁任务,则取消任务 if (_loop->HasTimer(_conn_id)) { CancelInactiveReleaseInLoop(); } ///---为了避免先移除服务器管理的连接信息导致Connection被释放,再去处理会出错,因此先调用用户【组件使用者】的关闭回调函数 // 5、调用关闭回调函数 if (_closed_callback) { _closed_callback(shared_from_this()); } // 移除服务器内部管理的连接信息 if (_server_closed_callback) { _server_closed_callback(shared_from_this()); } } // 连接获取之后,所处的状态下要进行各种设置(给channel设置事件回调,启动读监控,调用回调函数) void EstablishedInLoop() { // 1、修改连接状态 assert(_statu == CONNECTING); // 当前状态一定必须是上层的半连接状态 _statu = CONNECTED; // 当前函数执行完毕,连接进入已完成连接状态 // 2、启动读事件监控 // 一旦启动读事件监控,有可能立即触发读事件,如果这个时候启动了非活跃连接销毁 _channel.EnableRead(); // 3、调用回调函数 if (_connected_callback) { _connected_callback(shared_from_this()); } } // 这个接口并不是实际的发送接口,而只是把数据放到了发送缓冲区,启动了可写事件监控 // 真正的发送接口是HandleWrite void SendInLoop(Buffer buf) // 不使用&,就是重新构造一个新的临时对象 { if (_statu == DISCONNECTED) { return; // 连接是关闭状态什么都不做了 } _out_buffer.WriteBufferAndPush(buf); // 数据放到了发送缓冲区 // 这里_channel在Connection类中没有使用智能指针管理,直接是一个对象,所以不能用-> if (_channel.WriteAble() == false) // 启动了可写事件监控 { _channel.EnableWrite(); } } // 这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送 void ShoutdownInLoop() { _statu = DISCONNECTING; // 设置为半关闭状态 if (_in_buffer.ReadAbleSize() > 0) { if (_message_callback) { _message_callback(shared_from_this(), &_in_buffer); } } // 写事件一直在监控,写入数据出错关闭,HandleWrite里面调用了ReleaseInLoop if (_out_buffer.ReadAbleSize() > 0) { if (_channel.WriteAble() == false) // 启动了可写事件监控 { _channel.EnableWrite(); // 是对象,不是指针,用.不用-> } } // 没有数据发送直接关闭 if (_out_buffer.ReadAbleSize() == 0) { Release(); } } // 启动非活跃连接超时释放规则 void EnableInactiveReleaseInLoop(int sec) { // 1、将判断标志_enable_inactive_release置为true _enable_inactive_release = true; // 2、如果当前定时销毁任务已经存在,那就刷新延迟一下即可 if (_loop->HasTimer(_conn_id)) { return _loop->TimerRefresh(_conn_id); } // 3、如果当前定时销毁任务不存在,那么添加定时销毁任务 _loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this)); } // 取消 void CancelInactiveReleaseInLoop() { _enable_inactive_release = false; if (_loop->HasTimer(_conn_id)) { _loop->TimerCancel(_conn_id); // 定时销毁任务存在才取消 } } // 切换/升级协议 void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &close, const AnyEventCallback &event) { // SetContext(context); _context = context; // 两种设置上下文都可以 _connected_callback = conn; _message_callback = msg; _closed_callback = close; _event_callback = event; } //////------五个channel的事件回调函数 // 描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用_message_callback进行业务处理 void HandleRead() { // 1、接收socket的数据,放到接收缓冲区 char buf[65536]; ssize_t ret = _socket.NonBlockRecv(buf, 65535); // 非阻塞接收数据,可能没有数据导致阻塞 if (ret < 0) { // 出错了,不能直接关闭连接返回,看看接收缓冲区有没有数据待处理,发送缓冲区有没有数据待发送 return ShoutdownInLoop(); } else if (ret == 0) // 这里的==0表示没有接收到数据,并不是连接断开了,连接断开了返回-1 { return; } // 走到这里ret>0将数据放到接收缓冲区 _in_buffer.WriteAndPush(buf, ret); // 写入之后,函数最后会自动进行写偏移向后移动 // 2、调用_message_callback进行业务处理 if (_in_buffer.ReadAbleSize() > 0) { // shared_from_this()从当前对象自身获取自身的shared_ptr管理对象 // 使用shared_from_this(),那么类就要继承【class Connection : public enable_shared_from_this】 return _message_callback(shared_from_this(), &_in_buffer); // 事件处理完毕之后_message_callback内部在刷新活跃度 // 如果在上面就调用_message_callback,事件一旦处理过长,就会被认为连接超时了! // 所以HandleRead中不直接对连接进行关闭 } } // 描述符可读事件触发后调用的函数,将发送缓冲区中的数据进行发送 void HandleWrite() { // _out_buffer中的可读位置数据ReadPosition()就是我们要准备发送的数据 ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize()); if (ret < 0) { // 发送错误,看看接收缓冲区是不是还有发送数据,有就处理了,然后直接关闭,不用再等待 if (_in_buffer.ReadAbleSize() > 0) { _message_callback(shared_from_this(), &_in_buffer); } return Release(); // 这里就是实际关闭释放操作了 } _out_buffer.MoveReadOffset(ret); // 读偏移向后移动!!!不然数据永远发送不完! // 如果当前是连接待关闭状态,有数据就发送完数据释放连接,没有数据直接释放 if (_out_buffer.ReadAbleSize() == 0) { _channel.DisableWrite(); // 没有数据待发送了,关闭写事件监控 if (_statu == DISCONNECTING) { return Release(); } } // 走到这里,如果_out_buffer还有数据,就不关闭可写事件监控,下一次可写了再来写入 return; } // 描述符触发挂断事件 void HandleClose() { // 一旦连接挂断,套接字就什么都干不了了,因此有数据待处理就处理一下,完毕关闭连接 if (_in_buffer.ReadAbleSize() > 0) { _message_callback(shared_from_this(), &_in_buffer); } return Release(); // 这里就是实际关闭释放操作了 } // 描述符触发出错事件 void HandleError() { // 和挂断是同理的,套接字什么都做不了,直接调用挂断函数 return HandleClose(); } // 描述符触发任意事件 void HandleEvent() { // 1、刷新连接的活跃度 ------ 延迟定时销毁任务 if (_enable_inactive_release == true) // 启动非活跃连接销毁 { _loop->TimerRefresh(_conn_id); // 刷新活跃度 } // 2、调用组件使用者的任意事件回调 if (_event_callback) { _event_callback(shared_from_this()); } }private: // uint64_t _timer_id;//定时器id,必须唯一,为了简化操作,使用_conn_id作为定时器id uint64_t _conn_id; // 连接唯一id,便于连接的管理和查找 int _sockfd; // 连接关联的文件描述符 bool _enable_inactive_release; // 是否启动非活跃连接销毁的判断标志,默认为false表示关闭 EventLoop *_loop; // 连接所关联的EventLoop ConnStatu _statu; // 连接的状态 Socket _socket; // 通过Socket对象_socket来进行 套接字操作管理 Channel _channel; // 连接事件管理 Buffer _in_buffer; // 输入缓冲区 -- 存放从socket中读取到的数据 Buffer _out_buffer; // 输出缓冲区 -- 存放要发送给对端的数据 Any _context; // 请求的接收处理上下文 // 下面的四个回调函数是由组件使用者设置给服务器模块,(服务器模块的处理回调也是组件使用者设置的)服务器模块获取新的通信连接后再设置给通信连接模块的 // 换句话说,这几个回调都是组件使用者使用的 ConnectedCallback _connected_callback; // 通信连接建立成功的回调 MessageCallback _message_callback; // 接收到消息的回调 ClosedCallback _closed_callback; // 通信连接关闭的回调 AnyEventCallback _event_callback; // 任意事件的回调 // 组件内的连接关闭回调 -- 组件内设置的,因为服务器组件内会把所有的连接管理起来 // 一旦连接要关闭,就应该从管理的地方移除掉自己的信息 ClosedCallback _server_closed_callback;};
测试各个模块功能没有问题
4-9、Acceptor模块【监听套接字管理】
设计思想:
-
创建一个监听套接字
-
启动读事件监控
-
事件触发后,获取新连接
-
调用新连接获取成功后的回调函数
为新连接创建Connection进行管理(这一步由服务器模块操作,不是Acceptor模块)
因为Acceptor模块只进行监听连接的管理,因此获取到新连接的描述符之后,对于新连接描述符如何处理其实并不关心
对于新连接如何处理,应该是服务器模块来管理的
服务器模块,实现了一个对于新连接描述符处理的函数,将这个函数设置给Acceptor模块中的回调函数
代码实现:
//////------ Acceptor模块【对监听套接字进行管理】 ------//////using AcceptorCallback = std::function<void(int)>; // 参数就是描述符class Acceptor{public: Acceptor(EventLoop *loop, int port) // 服务器监听端口需要传入 : _loop(loop), _socket(CreateServer(port)), _channel(_socket.Fd(), loop) { // 不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动 // 否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没设置:新连接得不到处理,且资源泄漏 // 所以需要后面单独调用Listen()函数启动读事件 _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this)); // 设置读事件回调函数 } ~Acceptor() { } void SetAcceptCallback(const AcceptorCallback &cb) { _accept_callback = cb; } void Listen() { _channel.EnableRead(); // 启动读事件监控 }private: // 监听套接字读事件回调函数处理 —————— 获取新连接,调用_accept_callback函数,进行新连接处理 void HandleRead() { int newfd = _socket.Accept(); // 获取新连接 if (newfd < 0) { return; } if (_accept_callback) { _accept_callback(newfd); } } int CreateServer(int port) { bool ret = _socket.CreateServer(port); assert(ret == true); return _socket.Fd(); // 获取出文件描述符 }private: Socket _socket; // 用于创建监听套接字 EventLoop *_loop; // 用于对监听套接字进行事件监控 Channel _channel; // 用于对监听套接字进行事件管理 AcceptorCallback _accept_callback; //};
4-10、LoopThread模块【EventLoop模块与线程整合】
设计思想:
目标:将EventLoop模块与线程整合起来
EventLoop模块与线程是一—一对应的。EventLoop模块实例化的对象,在构造的时候就会初始化_thread_id,而后边当运行一个操作的时候判断当前是否运行在eventLoop模块对应的线程中,就是将线程ID与EventLoop模块中的_thread_id进行一个比较,相同就表示在同一个线程,不同就表示当前运行线程并不是EventLoop线程
含义:EventLoop模块在实例化对象的时候,必须在线程内部,EventLoop实例化对象时会设置自己的thread_id
如果我们先创建了多个EventLoop对象,然后创建了多个线程,将各个线程的id,重新给EventLoop进行设置
存在问题:在构造EventLoop对象,到设置新的thread_id期间将是不可控的
因此我们必须先创建线程,然后在线程的入口函数中,去实例化EventLoop对象。(先有线程,然后再实例化EventLoop对象)
先为 EventLoop 对象创建一个线程,然后在该线程的入口函数中去实例化 EventLoop 对象,这样该线程就会与 EventLoop 对象相关联 (实例化时该线程 id 被用于初始化 EventLoop 对象的 _thread_id)
所以我们需要构造一个新的模块:LoopThread模块(循环线程模块)
这个模块的功能:将EventLoop与thread整合到一起
思想:1. 创建线程2. 在线程中实例化EventLoop对象3. 可以向外部返回所实例化的EventLoop
有了LoopThread模块就能保证EventLoop对象在出现的第一课就和线程绑定到一起了
代码实现:
//////------ LoopThread模块【将EVentLoop与线程整合】 ------//////class LoopThread{public: // 创建线程,设置入口函数 LoopThread() : _loop(nullptr), _thread(std::thread(std::bind(&LoopThread::ThreadEntry, this))) { } ~LoopThread() { } // 返回当前线程关联的Eventloop对象指针 EventLoop *GetLoop() // 如果创建线程之后,还没有来得及实例化出Eventloop的对象,那么外界调用GetLoop获取的是一个空 { EventLoop *loop = nullptr; { std::unique_lock<std::mutex> lock(_mutex); // 对_loop的所有操作需要加锁 // 满足条件就不用等待 _cond.wait(lock, [&]() { return _loop != nullptr; }); // 走到这里满足条件了 loop = _loop; } // 返回局部指针loop就行,因为loop指向_loop,而_loop指向对象EventLoop是由类来管理的,生命周期不会因为GetLoop()函数的返回而结束 return loop; }private: // 实例化EventLoop对象,唤醒_cond上有可能阻塞的线程,并开始运行EventLoop模块的功能 void ThreadEntry() { // loop的生命周期由LoopThread来管理,LoopThread销毁这里的loop也就销毁了 EventLoop loop; // loop的线程id就是我们当前线程的id { std::unique_lock<std::mutex> lock(_mutex); // 对_loop的所有操作需要加锁 _loop = &loop; _cond.notify_all(); // 唤醒可能正在阻塞的GetLoop操作 } loop.Start(); // 里面循环运行 }private: // 下面两个成员变量用于实现_loop获取的同步关系 // 避免线程创建了,但是_loop还没有实例化之前去获取_loop,这就获取到空了 // 所以同步就是【外界调用GetLoop获取_loop的时候,_loop是已经实例化完了,如果_loop没有实例化,就不能获取】 std::mutex _mutex; // 互斥锁 std::condition_variable _cond; // 条件变量 EventLoop *_loop; // EventLoop指针变量,这个对象需要在线程内实例化 std::thread _thread; // EventLoop对应的线程,要先有线程,在线程中实例化出EVentLoop对象};
4-11、LoopThreadPool模块【针对LoopThread设计线程池】
设计思想:
针对LoopThread设计一个线程池:LoopThreadPool模块:对所有的LoopThread进行管理及分配
功能:
1.线程数量可配置(0个或多个)
注意事项:在服务器中,主从Reactor模型是主线程只负责新连接获取,从属线程负责新连接的事件监控及处理 因此当前的线程池,有可能从属线程会数量为0,也就是实现单Reactor服务器,一个线程及负责获取连接,也负责连接的处理(当我们服务器资源不够的时候,我们可以将主从Reactor模式转化成为单Reactor模式,来更大效率的利用资源,这就是线程数量可以配置)
2.对所有的线程进行管理,其实就是管理0个或多个LoopThread对象
3.提供线程分配的功能
当主线程获取了一个新连接,需要将新连接挂到从属线程上进行事件监控及处理
假设有0个从属线程,则直接分配给主线程的EventLoop,进行处理
假设有多个从属线程,则采用RR轮转思想,进行线程的分配(将对应线程的EventLoop获取到,设置给对应的Connection)
代码实现:
//////------ LoopThreadPool模块【针对LoopThread设计线程池】 ------//////class LoopThreadPool{public: LoopThreadPool(EventLoop *baseloop) : _thread_count(0), _next_idx(0), _baseloop(baseloop) { } ~LoopThreadPool() { } // 设置线程数量 void SetThreadCount(int count) { _thread_count = count; } // 创建所有的从属线程 void Create() { if (_thread_count > 0) { _threads.resize(_thread_count); _loops.resize(_thread_count); for (size_t i = 0; i < _thread_count; ++i) { ////LoopThread中构造函数不用传参数 _threads[i] = new LoopThread(); _loops[i] = _threads[i]->GetLoop(); // 实际上从属线程已经开始运行起来了,LoopThread构造调用了ThreadEntry,里面唤醒了阻塞线程并且start } } return; } // 分配下一个从属线程 EventLoop *NextLoop() { if (_thread_count == 0) { return _baseloop; // 如果从属线程数量为0,那么主从线程合二为一,所有线程操作在一个线程执行 } _next_idx = (_next_idx + 1) % _thread_count; return _loops[_next_idx]; }private: int _thread_count; // 从属线程的数量 int _next_idx;// 进行RR轮转 EventLoop *_baseloop; // 主EventLoop,运行在主线程,从属线程数量为0,则所有操作都在baseloop中进行 std::vector<LoopThread *> _threads; // 保存所有的LoopThread对象 std::vector<EventLoop *> _loops; // 从属线程数量大于0则从_loops中进行线程EventLoop分配};
4-12、TcpServer模块【对上述所有模块整合】
设计思想:
TcpServer模块:对所有模块的整合,通过TcpServer模块实例化的对象,可以非常简单的完成一个服务器的搭建
管理:
1. Acceptor对象,创建一个监听套接字2.EventLoop对象,baseloop对象,实现对监听套接字的事件监控3. std:unordered_map_conns,实现对所有新建连接的管理4.LoopThreadPool对象,创建loop线程池,对新建连接进行事件监控及处理
功能:
1.设置从属线程池数量2.启动服务器3.设置各种回调函数(连接建立完成,消息,关闭,任意),用户设置给TcpServer,TcpServer设置给获取的新连接4.是否启动非活跃连接超时销毁功能5.添加定时任务功能
流程:
1.在TcpServer中实例化一个Acceptor对象,以及一个EventLoop对象(baseloop)2.将Acceptor挂到baseloop上进行事件监控3.一旦Acceptor对象就绪了可读事件,则执行读事件回调函数获取新建连接4.对新连接,创建一个Connection进行管理5.对连接对应的Connection设置功能回调(连接完成回调,消息回调,关闭回调,任意事件回调)6.启动Connection的非活跃连接的超时销毁规则7.将新连接对应的Connection挂到LoopThreadPool中的从属线程对应的Eventloop中进行事件监控8.一旦Connection对应的连接就绪了可读事件,则这时候执行读事件回调函数,读取数据,读取完毕后调用TcpServer设置对应的回调函数
代码实现:
//////------ TcpServer模块【对上述所有模块整合】 ------//////// TcpServer模块设置回调函数,就是Connection模块里面有什么回调函数,就设置什么回调函数,因为Connection里面的回调就是这里设置的// 通信连接的智能指针 -- 最开始的智能指针(引用计数为1)由服务器模块创建using PtrConnection = std::shared_ptr<Connection>;// 组件使用者设置给服务器模块,服务器模块获取新的通信连接后再设置给通信连接模块的回调函数类型using ConnectedCallback = std::function<void(const PtrConnection &)>;using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection &)>;using AnyEventCallback = std::function<void(const PtrConnection &)>;using Functor = std::function<void()>;class TcpServer{public: TcpServer(int port) : _port(port), _next_id(0), _enable_inactive_release(false), _acceptor(&_baseloop, port), _loop(&_baseloop) { //////不能在这里创建,因为我们还没有SetThreadCount设置线程池数量,直接创建肯定报错_loop.Create(); // 创建线程池中的从属线程 // 要设置,不能忘记 _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1)); _acceptor.Listen(); // 将监听套接字挂到baseloop上 } ~TcpServer() { } // 设置线程池数量 void SetThreadCount(int count) { return _loop.SetThreadCount(count); } // 各种回调函数设置 void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; } void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; } // 是否启动非活跃连接销毁 void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; } // 用于添加定时任务,多少秒之后执行一个任务 void RunAfter(const Functor &task, int timeout) // 定时任务,延迟时间 { _baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, timeout)); } // 启动服务器 void Start() { // 这里创建线程池中的从属线程才是正确的 _loop.Create(); // 创建线程池中的从属线程 _baseloop.Start(); // 启动服务器 }private: void NewConnection(int fd) // 为新连接构造一个Connection进行管理 { _next_id++; PtrConnection conn(new Connection(_loop.NextLoop(), _next_id, fd)); // 智能指针管理 conn->SetMessageCallback(_message_callback); conn->SetClosedCallback(_closed_callback); conn->SetConnectedCallback(_connected_callback); conn->SetAnyEventCallback(_event_callback); conn->SetServerClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1)); if (_enable_inactive_release) { conn->EnableInactiveRelease(_timeout); // 启动非活跃超时销毁 } conn->Established(); // 就绪初始化,添加读监控,调用OnConnected _conns.insert(std::make_pair(_next_id, conn)); } void RemoveConnectionInLoop(const PtrConnection &conn) { int id = conn->Id(); auto it = _conns.find(id); if (it != _conns.end()) { _conns.erase(it); } } void RemoveConnection(const PtrConnection &conn) // 从管理Connection的_conns中移除连接信息,这样连接才能真正被释放掉 { _baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn)); } void RunAfterInLoop(const Functor &task, int timeout) { _next_id++; _baseloop.TimerAdd(_next_id, timeout, task); }private: uint64_t _next_id; // 这是一个连接自动增长的id int _port; int _timeout; // 这是非活跃连接超时时间 ------ 多长时间无通信就是非活跃连接时间 bool _enable_inactive_release; // 这是是否启动非活跃连接超时销毁的判断标志 EventLoop _baseloop; // 这是主线程的Eventloop对象,负责监听事件的处理 Acceptor _acceptor; // 这是监听套接字管理的对象 LoopThreadPool _loop; // 这是从属Eventloop线程池,监听事件获取到新连接,新连接就交给从属线程池里面的线程来处理 // 保存管理所有连接对应的shard_ptr对象 // PtrConnection里面的信息一旦被删除。意味着要释放对应的连接了 std::unordered_map<uint64_t, PtrConnection> _conns; // 组件使用者设置给服务器模块的各种事件回调 ConnectedCallback _connected_callback; // 通信连接建立成功的回调 MessageCallback _message_callback; // 接收到消息的回调 ClosedCallback _closed_callback; // 通信连接关闭的回调 AnyEventCallback _event_callback; // 任意事件的回调};
4-13、EchoServer模块【回显服务】
#include \"../server.hpp\"class EchoServer{public: EchoServer(int port) : _server(port) { _server.SetThreadCount(2); _server.EnableInactiveRelease(10); _server.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1)); _server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2)); _server.SetConnectedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1)); _server.Start(); } ~EchoServer() { } void Start() { _server.Start(); }private: //////类的成员函数,有this指针 // TcpServer模块的回调函数,在触发时会在内部传递参数PtrConnection void OnConnected(const PtrConnection &conn) { // get获取指针地址 DBG_LOG(\"new connection : %p\", conn.get()); } void OnClosed(const PtrConnection &conn) { // get获取指针地址 DBG_LOG(\"close connection : %p\", conn.get()); } // 这里触发是会在内部多传递一个Buffer参数 void OnMessage(const PtrConnection &conn, Buffer *buf) { conn->Send(buf->ReadPosition(), buf->ReadAbleSize()); buf->MoveReadOffset(buf->ReadAbleSize()); conn->Shutdown(); // 关闭连接 }private: TcpServer _server;};
使用wennench
测试
webbench 测试指令 : -c设置进程数量,-t设置时间./webbench -c 500 -t 60 http://127.0.0.1:8080/hello注意:因为是在一台机器上进行服务和测试,所以说抢占资源比较厉害,这种方法不合理,所以说如果进程数量太多,就会导致服务器断开
EchoServer流程图:
五、HTTP协议模块
5-1、Util模块【零碎工具接口】
设计思想:
命令:md5sum
作用:用来计算文件的 MD5 校验和
可以判断两个文件是否内容一致
在RFC3986文档中规定的URL绝对不编码字符:【. - _ ~ 字母 数字】
还有一个就是在不同的一些标准中的特殊处理:
W3C标准规定中规定查询字符串中的空格必须被编码为+,解码+转空格。
RFC2396中规定URI中的保留字符需要转换为%HH格式。
代码实现:
#pragma once#include \"../server.hpp\"// HTTP响应状态码对应的描述信息std::unordered_map<int, std::string> statu_msg = { {100, \"Continue\"}, {101, \"Switching Protocol\"}, {102, \"Processing\"}, {103, \"Early Hints\"}, {200, \"OK\"}, {201, \"Created\"}, {202, \"Accepted\"}, {203, \"Non-Authoritative Information\"}, {204, \"No Content\"}, {205, \"Reset Content\"}, {206, \"Partial Content\"}, {207, \"Multi-Status\"}, {208, \"Already Reported\"}, {226, \"IM Used\"}, {300, \"Multiple Choice\"}, {301, \"Moved Permanently\"}, {302, \"Found\"}, {303, \"See Other\"}, {304, \"Not Modified\"}, {305, \"Use Proxy\"}, {306, \"unused\"}, {307, \"Temporary Redirect\"}, {308, \"Permanent Redirect\"}, {400, \"Bad Request\"}, {401, \"Unauthorized\"}, {402, \"Payment Required\"}, {403, \"Forbidden\"}, {404, \"Not Found\"}, {405, \"Method Not Allowed\"}, {406, \"Not Acceptable\"}, {407, \"Proxy Authentication Required\"}, {408, \"Request Timeout\"}, {409, \"Conflict\"}, {410, \"Gone\"}, {411, \"Length Required\"}, {412, \"Precondition Failed\"}, {413, \"Payload Too Large\"}, {414, \"URI Too Long\"}, {415, \"Unsupported Media Type\"}, {416, \"Range Not Satisfiable\"}, {417, \"Expectation Failed\"}, {418, \"I\'m a teapot\"}, {421, \"Misdirected Request\"}, {422, \"Unprocessable Entity\"}, {423, \"Locked\"}, {424, \"Failed Dependency\"}, {425, \"Too Early\"}, {426, \"Upgrade Required\"}, {428, \"Precondition Required\"}, {429, \"Too Many Requests\"}, {431, \"Request Header Fields Too Large\"}, {451, \"Unavailable For Legal Reasons\"}, {501, \"Not Implemented\"}, {502, \"Bad Gateway\"}, {503, \"Service Unavailable\"}, {504, \"Gateway Timeout\"}, {505, \"HTTP Version Not Supported\"}, {506, \"Variant Also Negotiates\"}, {507, \"Insufficient Storage\"}, {508, \"Loop Detected\"}, {510, \"Not Extended\"}, {511, \"Network Authentication Required\"} // 最后没有逗号};std::unordered_map<std::string, std::string> mime_msg = { /*文件拓展名对应的MIME名称*/ {\".aac\", \"audio/aac\"}, {\".abw\", \"application/x-abiword\"}, {\".arc\", \"application/x-freearc\"}, {\".avi\", \"video/x-msvideo\"}, {\".azw\", \"application/vnd.amazon.ebook\"}, {\".bin\", \"application/octet-stream\"}, {\".bmp\", \"image/bmp\"}, {\".bz\", \"application/x-bzip\"}, {\".bz2\", \"application/x-bzip2\"}, {\".csh\", \"application/x-csh\"}, {\".css\", \"text/css\"}, {\".csv\", \"text/csv\"}, {\".doc\", \"application/msword\"}, {\".docx\", \"application/vnd.openxmlformats-officedocument.wordprocessingml.document\"}, {\".eot\", \"application/vnd.ms-fontobject\"}, {\".epub\", \"application/epub+zip\"}, {\".gif\", \"image/gif\"}, {\".htm\", \"text/html\"}, {\".html\", \"text/html\"}, {\".ico\", \"image/vnd.microsoft.icon\"}, {\".ics\", \"text/calendar\"}, {\".jar\", \"application/java-archive\"}, {\".jpeg\", \"image/jpeg\"}, {\".jpg\", \"image/jpeg\"}, {\".js\", \"text/javascript\"}, {\".json\", \"application/json\"}, {\".jsonld\", \"application/ld+json\"}, {\".mid\", \"audio/midi\"}, {\".midi\", \"audio/x-midi\"}, {\".mjs\", \"text/javascript\"}, {\".mp3\", \"audio/mpeg\"}, {\".mpeg\", \"video/mpeg\"}, {\".mpkg\", \"application/vnd.apple.installer+xml\"}, {\".odp\", \"application/vnd.oasis.opendocument.presentation\"}, {\".ods\", \"application/vnd.oasis.opendocument.spreadsheet\"}, {\".odt\", \"application/vnd.oasis.opendocument.text\"}, {\".oga\", \"audio/ogg\"}, {\".ogv\", \"video/ogg\"}, {\".ogx\", \"application/ogg\"}, {\".otf\", \"font/otf\"}, {\".png\", \"image/png\"}, {\".pdf\", \"application/pdf\"}, {\".ppt\", \"application/vnd.ms-powerpoint\"}, {\".pptx\", \"application/vnd.openxmlformats-officedocument.presentationml.presentation\"}, {\".rar\", \"application/x-rar-compressed\"}, {\".rtf\", \"application/rtf\"}, {\".sh\", \"application/x-sh\"}, {\".svg\", \"image/svg+xml\"}, {\".swf\", \"application/x-shockwave-flash\"}, {\".tar\", \"application/x-tar\"}, {\".tif\", \"image/tiff\"}, {\".tiff\", \"image/tiff\"}, {\".ttf\", \"font/ttf\"}, {\".txt\", \"text/plain\"}, {\".vsd\", \"application/vnd.visio\"}, {\".wav\", \"audio/wav\"}, {\".weba\", \"audio/webm\"}, {\".webm\", \"video/webm\"}, {\".webp\", \"image/webp\"}, {\".woff\", \"font/woff\"}, {\".woff2\", \"font/woff2\"}, {\".xhtml\", \"application/xhtml+xml\"}, {\".xls\", \"application/vnd.ms-excel\"}, {\".xlsx\", \"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet\"}, {\".xml\", \"application/xml\"}, {\".xul\", \"application/vnd.mozilla.xul+xml\"}, {\".zip\", \"application/zip\"}, {\".3gp\", \"video/3gpp\"}, {\".3g2\", \"video/3gpp2\"}, {\".7z\", \"application/x-7z-compressed\"} // 最后没有逗号};//////------ Util模块【零碎工具接口】 ------//////class Util{public: // 原字符串,分割符,分割之后得到的各个子串 // 字符串分割,将src字符串按照sep字符/字符串进行分割,得到的各个子串放到array中,最终返回子串的数量 // substr参数1:从什么位置开始截取,参数2:截取多长数据 static size_t Split(const std::string &src, const std::string &sep, std::vector<std::string> *array) { size_t offset = 0; // offset是下标,不能等于src.size(),否则越界 while (offset < src.size()) { // 在src字符串的偏移量offset处,开始向后查找sep字符/字符串,返回查找到的位置 size_t pos = src.find(sep, offset); if (pos == std::string::npos) { // 没有找到意味着 —————— offset位置到末尾的内容就是一个子串了,将剩余部分当做子串 array->push_back(src.substr(offset)); // 截取offset位置到末尾插入 return array->size(); } // 注意问题:src : \"abc,,,bcd,,,csd,,,\" sep:\',\' // 如果我当前所在的位置就是我要的字符,那么截取子串的长度就为0,就是空串了 if (pos == offset) { offset = pos + sep.size(); continue; // 当前子串是一个空串,没有内容 } array->push_back(src.substr(offset, pos - offset)); // 这里要加上分割内容的长度,不能+1,因为有可能sep是一个字符串,而不是字符 offset = pos + sep.size(); } return array->size(); } // 读取出文件所有内容,将读取的内容放到Buffer中 static bool ReadFile(const std::string &filename, std::string *buf) { std::ifstream ifs(filename, std::ios::binary); if (ifs.is_open() == false) // is_open判断一个文件是否被打开。open打开一个文件 { ERR_LOG(\"open %s file failed\", filename); return false; } size_t fsize = 0; // seekg : 将文件读写跳转到指定位置 ifs.seekg(0, ifs.end); // 文件读写跳转到末尾 fsize = ifs.tellg(); // 获取文件当前读写位置到起始位置的偏移量,上面跳到末尾,刚好获取到文件大小 ifs.seekg(0, ifs.beg); // 跳转到起始位置 buf->resize(fsize); // 设置有效数据空间,可以触发扩容,开辟空间 // ifs.read将文件数据读取到一个可写缓冲区中,buf ifs.read(&(*buf)[0], fsize); // 返回char*,不能用c_str(),有const属性。 if (ifs.good() == false) { ERR_LOG(\"read %s file failed\", filename); ifs.close(); return false; } ifs.close(); return true; } // 向文件写入数据 static bool WriteFile(const std::string &filename, const std::string &buf) { std::ofstream ofs(filename, std::ios::binary | std::ios::trunc); // trunc表示不要原有内容,直接写入新内容 if (ofs.is_open() == false) { ERR_LOG(\"open %s file failed\", filename); return false; } ofs.write(buf.c_str(), buf.size()); if (ofs.good() == false) { ERR_LOG(\"write %s file failed\", filename); ofs.close(); return false; } ofs.close(); return true; } // URL编码,避免URL中资源路径与查询字符串中的特殊字符与HTTP请求中特殊字符产生歧义 // 编码格式 : 将特殊字符的ascii码值,转换成为两个16进制字符,前缀%【C++ ---> C%2B%2B ---> +的ascii值是43,转16进制】 // 不编码的特殊字符 : RFC3986文档规定【. - _ ~ 字母 数字】属于绝对不编码字符 // RFC3986文档规定,编码格式%HH // W3C标准中规定,查询字符串中的空格,需要被编码为+,解码+转空格 static std::string UrlEncode(const std::string url, bool convert_space_to_plus) { std::string res; for (auto &ch : url) { // isalnum里面包含了isalpha(c) || isdigit(c) // isalpha判断字符是不是英文字符(大小写都包括了),isdigit判断字符是不是数字(0~9) if (ch == \'.\' || ch == \'-\' || ch == \'~\' || ch == \'_\' || isalnum(ch)) { res += ch; continue; } if (ch == \' \' && convert_space_to_plus == true) // 如果字符为空格,并且convert_space_to_plus为真 { res += \'+\'; continue; } // 其他的字符以及convert_space_to_plus为假的空格,都是需要被编码为 %HH 格式的 char tmp[4] = {0}; // 将数据(ch)按照指定格式(\"%%%02X\")格式化为一个指定字符串放到对应位置(tmp),且格式化内容不能超过范围(4) // %% 打印 % %02X 打印 %X(十六进制大写用X,小写用x) 0 : 填充0 2 : 占位两个字符 最终结果 :%HH snprintf(tmp, 4, \"%%%02X\", ch); // snprintf与printf比较类似,都是格式化字符串,只不过一个是打印,一个是放到一块空间中 res += tmp; } return res; } // URL解码 // url[i+1] << 4 + url[i+2]这样是不行的,因为url里面是字符,要把字符转10进制数字再进行计算 // 将一个十六进制字符转换为一个十进制字符 static int HexToDesc(char ch) { if (ch >= \'0\' && ch <= \'9\') { return ch - \'0\'; } else if (ch >= \'A\' && ch <= \'Z\') { return ch - \'A\' + 10; // 字符从10开始 } else if (ch >= \'a\' && ch <= \'z\') { return ch - \'a\' + 10; // 字符从10开始 } return -1; } static std::string UrlDecode(const std::string url, bool convert_plus_to_space) { std::string res; // 遇到了%,将后面两个字符转换为数字,第一个数字左移4位【数字 * 16】,然后加上第二个数字 // + 【ascii为43】 ---> 2b 那么 %2b ---> 2< 2*16+11 for (size_t i = 0; i < url.size(); ++i) { if (url[i] == \'%\' && (i + 2) < url.size()) // (i+2)<url.size()不管我解析对不对,不能让我越界访问程序崩溃了 { int ch1 = HexToDesc(url[i + 1]); int ch2 = HexToDesc(url[i + 2]); int ch = (ch1 << 4) + ch2; res += ch; i += 2; continue; } if (url[i] == \'+\' && convert_plus_to_space == true) // 如果设置了空格字符的特殊编码,则将 + 替换为空格 { res += \' \'; continue; } res += url[i]; } return res; } // 响应状态码的描述信息获取,状态码传入,返回字符串描述符信息 static std::string StatuDesc(int statu) { auto it = statu_msg.find(statu); if (it != statu_msg.end()) { return it->second; } return \"unknow statu\"; } // 根据文件后缀名获取文件mime,文件名传入,返回文件名扩展名对应的mime static std::string ExtMime(const std::string &filename) { // 先获取文件扩展名 a.b.c.d.txt ------ 获取到.txt size_t pos = filename.find_last_of(\'.\'); // find_last_of从后往前找 if (pos == std::string::npos) { return \"application/octet-stream\"; // 表示是一个二进制流,文件是二进制文件 } std::string ext = filename.substr(pos); // 根据文件扩展名获取mime auto it = mime_msg.find(ext); if (it == mime_msg.end()) { return \"application/octet-stream\"; // 表示是一个二进制流,文件是二进制文件 } return it->second; } // 判断一个文件是否是一个目录 static bool IsDirectory(const std::string &filename) { struct stat st; int ret = stat(filename.c_str(), &st); if (ret < 0) { return false; } // S_ISDIR宏函数:S_ISDIR(m)(((m) & S_IFMT) == S_IFDIR) // 里面自动进行了处理,可以直接来判断文件是不是目录 return S_ISDIR(st.st_mode); } // 判断一个文件是否是一个普通文件 static bool IsRegular(const std::string &filename) { struct stat st; int ret = stat(filename.c_str(), &st); if (ret < 0) { return false; } // S_ISREG宏函数:S_ISREG(m)(((m) & S_IFMT) == S_IFREG) // 里面自动进行了处理,可以直接来判断文件是不是普通文件 return S_ISREG(st.st_mode); } // 判断HTTP请求资源的路径是否是有效性 // /index.html --- 前边的/叫做相对根目录映射的是某个服务器上的子目录 // 客户端只能请求相对根目录中的资源,其他地方的资源都不予理会 // 资源路径的有效性是指客户端只能访问web根目录下的文件,不能通过/../login这种进入相对根目录之外,不安全 // 以/为分隔符将路径分隔开来 static bool ValidPath(const std::string &path) { // 思想:按照/进行路径分割,根据有多少子目录,计算目录深度,有多少层,深度不能小于0 std::vector<std::string> v; Split(path, \"/\", &v); int level = 0; for (auto &m : v) { if (m == \"..\") { --level; // 在任意位置如果目录深度小于0,说明访问到了web根目录的上级目录,有问题 if (level < 0) { return false; } continue; } ++level; } return true; }};
5-2、HttpRequest模块【存储HTTP请求信息】
设计思想:
HttpRequest模块 —— http请求信息模块:存储HTTP请求信息要素,提供简单的功能性接口
要素:请求方法,资源路径,查询字符串,头部字段,正文,协议版本
请求信息要素:请求行:请求方法,URL,协议版本URL:资源路径,查询字符串GET /search/1234?word=C++&en=utf8 HTTP/1.1请求头部:key: valrいnkey: valrIn..….Content-Length: OrIn正文std::smatch保存首行使用regex正则进行解析后,所提取的数据,比如提取资源路径中的数字...
功能性接口:
1.将成员变量设置为公有成员,便于直接访问
2.提供查询字符串,以及头部字段的单个查询和获取,插入功能
3.获取正文长度
4.判断长连接&短链接Connection:close/keep-alive
代码实现:
//////------ HttpRequest模块【存储HTTP请求信息】 ------//////class HttpRequest{public: // 重置 ------ 每一次处理完请求就要把请求重置一下,防止当前请求的信息对下一次请求信息造成影响 void ReSet() { _method.clear(); _path.clear(); _version.clear(); _body.clear(); std::smatch match; _matches.swap(match); // 交换进行清空 _headers.clear(); _params.clear(); } // 插入头部字段 void SetHeader(const std::string &key, const std::string &val) { _headers.insert(std::make_pair(key, val)); } // 判断是否存在指定的头部字段 bool HasHeader(const std::string &key) { auto it = _headers.find(key); if (it == _headers.end()) { return false; } return true; } // 获取指定头部字段的值 std::string GetHeader(const std::string& key) { auto it = _headers.find(key); if (it == _headers.end()) { return \"\"; } return it->second; } // 插入查询字符串 void SetParam(const std::string &key, const std::string &val) { _params.insert(std::make_pair(key, val)); } // 判断是否存在指定的查询字符串 bool HasParam(const std::string &key) { auto it = _params.find(key); if (it == _params.end()) { return false; } return true; } // 获取指定的字符串 std::string GetParam(const std::string &key) { auto it = _params.find(key); if (it == _params.end()) { return \"\"; } return it->second; } // 获取正文长度 size_t ContentLength() { // Content-Length : 1234\\r\\n bool ret = HasHeader(\"Content-Length\"); // 这个Content-Length后面表示正文 if (ret == false) { return 0; } std::string clen = GetHeader(\"Content-Length\"); // 获取到的正文是字符串,键值对(string,string) return std::stol(clen); // clen字符串转长整型(默认10进制) } // 判断是长连接还是短连接,true是短连接,false是长连接 bool Close() { // 没有Connection,或者有Connection但是值为close都是短连接,否则为长连接 if (HasHeader(\"Connection\") == true && GetHeader(\"Connection\") == \"keep-alive\") { return false; } return true; }public: std::string _method; // 请求方法 std::string _path; // 资源路径 std::string _version; // 协议版本 std::string _body; // 请求正文 std::smatch _matches; // 资源路径中正则提取到的数据 std::unordered_map<std::string, std::string> _headers; // 头部字段 std::unordered_map<std::string, std::string> _params; // 查询字符串};
5-3、HttpResponse模块【存储HTTP响应信息】
设计思想:
HttpResponse模块 : 存储HTTP响应信息要素,提供简单的功能性接口
响应信息要素:1. 响应状态码2. 头部字段3. 响应正文4.重定向信息(是否进行了重定向的标志,重定向的路径)
功能性接口:
1.为了便于成员的访问,因此将成员设置为公有成员
2.头部字段的新增,查询,获取
3.正文的设置
4.重定向的设置
5.长短连接的判断
代码实现:
//////------ HttpResponse模块【存储HTTP响应信息】 ------//////class HttpResponse{public: HttpResponse() : _redirect_flag(false), _statu(200) { } HttpResponse(int statu) : _redirect_flag(false), _statu(statu) { } ~HttpResponse() { } // 重置 ------ 每一次处理完请求就要把请求重置一下,防止当前请求的信息对下一次请求信息造成影响 void ReSet() { _statu = 200; _redirect_flag = false; _redirect_url.clear(); _body.clear(); _headers.clear(); } // 插入头部字段 void SetHeader(const std::string &key, const std::string &val) { _headers.insert(std::make_pair(key, val)); } // 判断是否存在指定的头部字段 bool HasHeader(const std::string &key) { auto it = _headers.find(key); if (it == _headers.end()) { return false; } return true; } // 获取指定头部字段的值 std::string GetHeader(const std::string key) { auto it = _headers.find(key); if (it == _headers.end()) { return \"\"; } return it->second; } // 设置正文 void SetContent(const std::string &body, const std::string &type = \"text/html\") { _body = body; SetHeader(\"Content-Type\", type); // 设置一下头部字段 } // 重定向设置 void SetRedirect(const std::string &url, int statu = 302) // 302临时重定向 { _redirect_flag = true; _statu = statu; _redirect_url = url; } // 判断是长连接还是短连接,true是短连接,false是长连接 bool Close() { // 没有Connection,或者有Connection但是值为close都是短连接,否则为长连接 if (HasHeader(\"Connection\") == true && GetHeader(\"Connection\") == \"keep-alive\") { return false; } return true; }private: int _statu;// 响应状态码 bool _redirect_flag; // 重定向 std::string _redirect_url; // 重定向地址 std::string _body; // 响应正文 std::unordered_map<std::string, std::string> _headers; // 头部字段};
5-4、HttpContext模块【请求接收上下文】
设计思想:
代码实现:
//////------ HttpContext模块【请求接收上下文】 ------//////#define MAX_LINE 8192 //单行数据的最大长度为 8192 字节(即 8KB)typedef enum{ RECV_HTTP_ERROR, // 出错阶段 RECV_HTTP_LINE, // 请求行阶段 RECV_HTTP_HEAD, // 头部接收阶段 RECV_HTTP_BODY, // 正文接收阶段 RECV_HTTP_OVER // 结束阶段} HttpRecvStatu;class HttpContext{public: HttpContext() : _resp_statu(200), _recv_statu(RECV_HTTP_LINE) { } ~HttpContext() { } // 获取响应状态码 int RespStatu() { return _resp_statu; } // 获取接收状态码 HttpRecvStatu RecvStatu() { return _recv_statu; } // 获取请求信息 HttpRequest &Request() { return _request; } // 接收并解析HTTP请求 void RecvHttpRequest(Buffer *buf) { switch (_recv_statu) // 不需要break,处理流程是需要一步一步向下走 { case RECV_HTTP_LINE: RecvHttpLine(buf); case RECV_HTTP_HEAD: RecvHttpHead(buf); case RECV_HTTP_BODY: RecvHttpBody(buf); } }private: // 获取请求行 bool RecvHttpLine(Buffer *buf) { if (_recv_statu != RECV_HTTP_LINE) { return false; } // 1、获取一行数据,Buffer模块中实现了 std::string line = buf->GetLineAndPop(); // 提取了一行数据,读偏移向后移动 // 2、考虑的地方 : 缓冲区数据不足一行,获取的一行数据超大 if (line.size() == 0) { // 缓冲区数据不足一行,判断缓冲区的可读数据长度,如果很长的数据没有读取到\\r\\n,数据有问题 if (buf->ReadAbleSize() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } // 缓冲区数据不足一行,但是也不多,就等等数据的到来 return true; } if (line.size() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } bool ret = ParseHttpLine(line); if (ret == false) { return false; } // 首行处理完毕,进入头部获取阶段 _recv_statu = RECV_HTTP_HEAD; return true; } // 解析请求行 bool ParseHttpLine(const std::string &line) { std::smatch matches; // 我们请求方法可能有大写,小写以及大小混合,这个时候需要匹配规则需要进行处理这种情况 // std::regex::icase : 表示正则表达式匹配规则忽略大小写!那么我们这里就仍然可以正确提取出相应的字符串 // 提取出字符串之后,后面再使用transform函数把请求方法转换成为大写! std::regex e(\"(GET|POST|HEAD|PUT|DELETE) ([^?]*)(?:\\\\?(.*))? (HTTP/1\\\\.[012])(?:\\r\\n|\\n)?\", std::regex::icase); bool ret = std::regex_match(line, matches, e); // 0 : GET /study/login?user=zhangsan&pass=123123 HTTP/1.1 // 1 : GET // 2 : /study/login // 3 : user=zhangsan&pass=123123 // 4 : HTTP/1.1 if (ret == false) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400; // BAD REQUEST return false; } _request._method = matches[1]; // 请求方法 // transform : 对容器中的元素进行批量转换或处理 // 参数1 : 字符串起始位置 // 参数2 : 字符串结束位置 // 参数3 : 转换后的字符串存放在哪里【直接存放到原始字符串起始位置,类似于赋值】 // 参数4 : 使用什么规则/方法转换字符串【toupper是C语音公共字符串,所以说需要再全局域里面查找,::】 std::transform(_request._method.begin(), _request._method.end(), _request._method(), ::toupper); _request._path = Util::UrlDecode(matches[2], false); // 资源路径,需要进行url解码,不需要+转空格 _request._version = matches[4];// 协议版本 // 查询字符串获取,需要解析 std::string query_string = matches[3]; std::vector<std::string> query_string_arry; // 查询字符串格式 :key=val&key=val... 所以先以&符号进行分割,得到各个子串 Util::Split(query_string, \"&\", &query_string_arry); // 对各个子串通过 = 进行分割,得到key和val,得到之后也需要进行url解码 for (auto &str : query_string_arry) { size_t pos = str.find(\"=\"); if (pos == std::string::npos) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400; // BAD REQUEST return false; } // 查询字符串空格要转+ std::string key = Util::UrlDecode(str.substr(0, pos), true); std::string val = Util::UrlDecode(str.substr(pos + 1), true); _request.SetParam(key, val); } return true; } // 接收头部信息 bool RecvHttpHead(Buffer *buf) { if (_recv_statu != RECV_HTTP_HEAD) { return false; } // 一行一行取出数据,直到遇到空行为止 头部格式 : key: val\\r\\nkey: val\\r\\nkey: val\\r\\n...... while (1) // 头部不只有一行数据 { // 1、获取一行数据,Buffer模块中实现了 std::string line = buf->GetLineAndPop(); // 提取了一行数据,读偏移向后移动 // 2、考虑的地方 : 缓冲区数据不足一行,获取的一行数据超大 if (line.size() == 0) { // 缓冲区数据不足一行,判断缓冲区的可读数据长度,如果很长的数据没有读取到\\r\\n,数据有问题 if (buf->ReadAbleSize() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } // 缓冲区数据不足一行,但是也不多,就等等数据的到来 return true; } if (line.size() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } if (line == \"\\n\" || line == \"\\r\\n\") // 如果取出的一行数据是空行,那么就退出,表示提取结束 { break; } bool ret = ParseHttpHead(line); if (ret == false) { return false; } } // 头部处理完毕,进入正文获取阶段 _recv_statu = RECV_HTTP_BODY; return true; } // 解析头部信息 bool ParseHttpHead(const std::string &line) { // 头部格式 : key: val\\r\\nkey: val\\r\\nkey: val\\r\\n...... // 去掉末尾的回车换行,因为不去掉的话,Close里面进行判断长短连接的时候,keep-alive后面带着\\r\\n了,长连接匹配失败 if (line.back() == \'\\n\') { line.pop_back(); // 去掉换行 } if (line.back() == \'\\r\') { line.pop_back(); // 去掉回车 } size_t pos = line.find(\": \"); if (pos == std::string::npos) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400; // URI TOO LONG return false; } // 这里和解码没有关系了 std::string key = line.substr(0, pos); std::string val = line.substr(pos + 2); // 【: 】是两个字符,+2 _request.SetHeader(key, val); } // 接收正文信息 bool RecvHttpBody(Buffer *buf) { if (_recv_statu != RECV_HTTP_BODY) { return false; } // 1、获取正文长度 size_t content_length = _request.ContentLength(); if (content_length == 0) { // 没有正文,请求接收完毕 _recv_statu = RECV_HTTP_OVER; return true; } // 2、当前接收了多少正文,就是往_request._body中放了多少数据了 size_t real_len = content_length - _request._body.size(); // 实际还需要的正文长度 // 3、接收正文放到body中,判断缓冲区中的数据是不是全部的正文(看看是多余一条正文,还是不足一条正文) // 4、缓冲区中的数据,包含了当前请求的所有正文,则取出所有数据 if (buf->ReadAbleSize() >= real_len) { _request._body.append(buf->ReadPosition(), real_len); buf->MoveReadOffset(real_len); _recv_statu = RECV_HTTP_OVER; return true; } // 5、缓冲区中的数据,不足一条正文的数据,则取出所有数据放到body中,等待新数据到来 _request._body.append(buf->ReadPosition(), buf->ReadAbleSize()); buf->MoveReadOffset(buf->ReadAbleSize()); return true; }private: int _resp_statu; // 响应状态码 HttpRecvStatu _recv_statu; // 当前接收以及解析状态 HttpRequest _request; // 已经解析得到的请求信息};
5-5、HttpServer模块【HTTP协议支持所有模块的整合】
设计思想:
HttpServer模块:对于HTTP协议支持所有模块的整合,让HTTP服务器搭建更加简单
设计一张请求路由表:
表中记录了针对哪个请求,应该使用哪个函数来进行业务处理的映射关系【表中记录了HTTP请求与对应的业务处理函数的映射关系】
当服务器收到了一个请求,就在请求路由表中,查找有没有对应请求的处理函数,如果有,则执行对应的处理函数即可,没有就返回405界面(请求方法不支持)
说白了,什么请求,怎么处理,由用户来设定,服务器收到了请求只需要执行函数即可
这样做的好处:用户只需要实现业务处理函数,然后将请求与处理函数的映射关系,添加到服务器中
而服务器只需要接收数据,解析数据,查找路由表映射关系,执行业务处理函数。
要实现简便的搭建HTTP服务器,所需要的要素和提供的功能
要素:
路由映射表记录对应请求方法的请求的处理函数映射关系---更多是功能性请求的处理1.GET请求的路由映射表2.POST请求的路由映射表3. PUT请求的路由映射表4.DELETE请求的路由映射表5.静态资源相对根目录---实现静态资源请求的处理6.高性能TCP服务器---进行连接的IO操作
接口:
服务器处理流程:1.从socket接收数据,放到接收缓冲区2.调用OnMessage回调函数进行业务处理3.对请求进行解析,得到了一个HttpRequest结构,包含了所有的请求要素4.进行请求的路由查找--找到对应请求的处理方法4-1.静态资源请求一些实体文件资源的请求,html,image.…… 将静态资源文件的数据读取出来,填充到HttpResponse结构中4-2.功能性请求---在请求路由映射表中查找处理函数,找到了则执行函数 具体的业务处理,并进行HttpResponse结构的数据填充5.对静态资源请求/功能性请求进行处理完毕后,得到了一个填充了响应信息的HttpResponse对象,组织http格式响应,进行发送
接口使用:
添加请求-处理函数映射信息(GET/POST/PUT/DELETE)设置静态资源根目录设置是否启动超时连接关闭设置线程池中线程数量启动服务器OnConnected---用于给TcpServer设置协议上下文OnMessage----用于进行缓冲区数据解析处理获取上下文,进行缓冲区数据解析请求的路由查找静态资源请求查找和处理 功能性请求的查找和处理 组织响应进行回复
最终提供给用户一套组件,让用户完成服务器的搭建
代码实现:
//////------ HttpServer模块【HTTP协议支持所有模块的整合】 ------//////using Handler = std::function<void(const HttpRequest &, HttpResponse *)>;// using Handlers = std::unordered_map;// 如果我们map里面存string,那么到时候还是要进行编译,编译成为regex,效率比较慢,直接存储regex提高效率// using Handlers = std::unordered_map;// 在插入unordered_map的4个路由表的时候,因为我们请求路径(请求正则表达式)用的是string,而string转regex不支持// 【unordered_map进行insert时==双等号重载有问题(string与regex)】// 所以采用vector里面存放哈希表using Handlers = std::vector<std::pair<std::regex, Handler>>;class HttpServer{public: HttpServer(int port, int timeout = DEFAULT_TIMEOUT) : _server(port) { _server.EnableInactiveRelease(timeout); // 超时时间,直接构造函数设置 _server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1)); _server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2)); } ~HttpServer() { } // 设置静态资源根目录 --- 相对根目录 void SetBasedir(const std::string &path) { assert(Util::IsDirectory(path) == true); _basedir = path; } // 设置/添加,请求(请求正则表达式)与处理函数的映射关系 --- 添加到get路由表 void Get(const std::string &pattern, Handler &handlers) { _get_route.push_back(std::make_pair(std::regex(pattern), handlers)); } // 设置/添加,请求(请求正则表达式)与处理函数的映射关系 --- 添加到post路由表 void Post(const std::string &pattern, Handler &handlers) { _post_route.push_back(std::make_pair(std::regex(pattern), handlers)); } // 设置/添加,请求(请求正则表达式)与处理函数的映射关系 --- 添加到put路由表 void Put(const std::string &pattern, Handler &handlers) { _put_route.push_back(std::make_pair(std::regex(pattern), handlers)); } // 设置/添加,请求(请求正则表达式)与处理函数的映射关系 --- 添加到delete路由表 void Delete(const std::string &pattern, Handler &handlers) { _delete_route.push_back(std::make_pair(std::regex(pattern), handlers)); } // 设置从属线程数量 void SetThreadCount(int count) { _server.SetThreadCount(count); } // 监听 void Listen() { _server.Start(); }private: // 错误响应 void ErrorHandler(const HttpRequest &req, HttpResponse *rsp) { // 1、组织一个错误展示页面 std::string body; body += \"\"; body += \"\"; body += \"\"; body += \"\"; body += \"\"; body += \"\"
; // h1是标题标签里面内容会被加大加粗 body += std::to_string(rsp->_statu); body += \" \"; body += Util::StatuDesc(rsp->_statu); body += \"\"; body += \"\"; body += \"\"; // 2、将页面数据,当作响应正文,放入rsp中 rsp->SetContent(body, \"text/html\"); } // 将HttpResponse中的要素安装HTTP协议格式进行组织,然后发送 void WriteResponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp) { // 1、完善头部字段 if (req.Close() == true) // 长短连接填充 { rsp.SetHeader(\"Connection\", \"close\"); } else { rsp.SetHeader(\"Connection\", \"keep-alive\"); } if (rsp._body.empty() == false && rsp.HasHeader(\"Content-Length\") == false) // 正文长度填充 { rsp.SetHeader(\"Content-Length\", std::to_string(rsp._body.size())); } if (rsp._body.empty() == false && rsp.HasHeader(\"Content-Type\") == false) // 正文类型填充 { rsp.SetHeader(\"Content-Type\", \"application/octet-stream\"); // 二进制流 } if (rsp._redirect_flag == true) // 重定向填充 { rsp.SetHeader(\"Location\", rsp._redirect_url); } // 2、将rsp中的要素安装HTTP协议格式进行组织 std::stringstream rsp_str; // 首行内容 : 协议版本 响应状态码 状态码描述 \\r\\n rsp_str << req._version << \" \" << std::to_string(rsp._statu) << \" \" << Util::StatuDesc(rsp._statu) << \"\\r\\n\"; // 头部内容 for (auto &head : rsp._headers) { rsp_str << head.first << \": \" << head.second << \"\\r\\n\"; } rsp_str << \"\\r\\n\"; // 空行 rsp_str << rsp._body; // 正文 // 3、发送数据 // stringstream类型,要先获取到对象,再转C字符串 conn->Send(rsp_str.str().c_str(), rsp_str.str().size()); } // 判断一个请求是不是静态资源请求 bool IsFileHandler(const HttpRequest &req) { // 1、必须设置了静态资源根目录 if (_basedir.empty()) { return false; } // 2、请求方法必须是GET或者HEAD【GET或者HEAD不一定是静态资源请求,也可能是功能性请求,但是静态资源请求一定是GET或者HEAD】 if (req._method != \"GET\" && req._method != \"HEAD\") { return false; } // 3、请求资源的路径必须是一个合法的路径 if (Util::ValidPath(req._path) == false) { return false; } // 4、请求的资源必须存在,且是一个普通文件 ////// 特殊的请求资源 : 【相对根目录(web根目录)/ ,或者是目录 /image/ 】这种情况后面默认追加 index.html(首页) // index.html /image/a.png // 不要忘了前缀的相对根目录,也就是将请求路径转换为【实际存在的路径】 /image/a.png -> ./wwwroot/image/a.png std::string req_path = _basedir + req._path; if (req._path.back() == \'/\') { req_path += \"index.html\"; // 避免直接修改请求资源路径,用一个临时变量来进行修改 } if (Util::IsRegular(req_path) == false) { return false; } return true; } // 静态资源请求处理 --- 将静态资源文件的数据读取出来,放到rsp的正文_body中,并设置mime void FileHandler(const HttpRequest &req, HttpResponse *rsp) { // 将请求路径转换为【实际存在的路径】/image/a.png -> ./wwwroot/image/a.png std::string req_path = _basedir + req._path; if (req._path.back() == \'/\') { req_path += \"index.html\"; // 避免直接修改请求资源路径,用一个临时变量来进行修改 } bool ret = Util::ReadFile(req_path, &rsp->_body); if (ret == false) { return; } std::string mime = Util::ExtMime(req_path); // 通过文件扩展名来获取对应的mime rsp->SetHeader(\"Content-Type\", mime); return; } // 功能性请求的分类处理 void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers) { // 在对应请求方法的路由表中,查找对应资源的处理函数,有就调用,没有就返回404 // 路由表存储的键值对 --- 正则表达式 : 处理函数 // 使用正则表达式对请求资源的路径进行正则匹配,匹配成功就使用对应函数进行处理 // number/d+ /number/1234... 也可以(d+)提取出数字出来 for (auto &handler : handlers) { // std::regex e(handler->first); const std::regex &e = handler.first; // 节约string编译成为regex的过程,提高效率 const Handler &functor = handler.second; // handlers是数组,每一个数组元素是哈希表,哈希表第二个元素是Handler bool ret = std::regex_match(req._path, req._matches, e); if (ret == false) { continue; } functor(req, rsp); // 传入请求信息和空的rsp执行处理函数 return; } rsp->_statu = 404; // NOT FOUNT【支持请求方法,但是没有对应的执行函数】 } // 路由查找 void Route(HttpRequest &req, HttpResponse *rsp) { // 1、对请求进行分辨,是一个静态资源请求,还是一个功能性请求 // 静态资源请求,进行静态资源处理 // 功能性资源请求,需要通过几个请求路由表来确定是否有处理函数 // 如果既不是静态资源请求,也没有功能性请求处理函数,那就返回405【请求方法不支持】 // GET、HEAD这里默认是静态资源请求【HEAD不要请求正文,只要请求头部】 if (IsFileHandler(req) == true) { // 是一个静态资源请求,进行静态资源请求的处理 return FileHandler(req, rsp); } if (req._method == \"GET\" || req._method == \"HEAD\") { return Dispatcher(req, rsp, _get_route); } else if (req._method == \"POST\") { return Dispatcher(req, rsp, _post_route); } else if (req._method == \"PUT\") { return Dispatcher(req, rsp, _put_route); } else if (req._method == \"DELETE\") { return Dispatcher(req, rsp, _delete_route); } else { rsp->_statu = 405; // Method Not Allowed【请求方法不支持】 return; } } // 设置上下文 void OnConnected(const PtrConnection &conn) { conn->SetContext(HttpContext()); // conn->SetContext调用的是智能指针所管理的对象的成员函数 SetContext DBG_LOG(\"new connectioin %p\", conn.get()); // conn.get()是获取智能指针所管理的原始指针 } // 缓冲区数据解析 + 处理 void OnMessage(const PtrConnection &conn, Buffer *buffer) { while (buffer->ReadAbleSize() > 0) { // 1、获取上下文 HttpContext *context = conn->GetContext()->get<HttpContext>(); // 2、通过上下文对缓冲区数据进行解析,得到HttpRequest对象 context->RecvHttpRequest(buffer); // 接收并解析HTTP请求 HttpRequest &req = context->Request(); // 获取出请求数据,得到HttpRequest对象 HttpResponse rsp(context->RespStatu()); // 响应结构,拿HttpRequest结构的状态码来初始化HttpResponse结构 // 2-1、如果缓冲区的数据解析出错,直接回复出错响应 if (context->RespStatu() >= 400) { // 进行错误响应,关闭连接 // 填充一个错误显示页面到rsp中,上面不给context->RespStatu()初始化rsp这里会出问题 ErrorHandler(req, &rsp); WriteResponse(conn, req, rsp); // 组织响应发送给客户端 conn->Shutdown(); // 短连接关闭 return; } if (context->RecvStatu() != RECV_HTTP_OVER) { // 当前请求还没有接收完整,则退出,等到有新数据到来再重新继续处理 return; } // 2-2、如果解析正确,且请求已经获取完毕,才继续进行处理 // 3、请求路由 + 业务处理 Route(req, &rsp); // 4、对HttResponse进行组织发送 WriteResponse(conn, req, rsp); // 5、重置上下文 context->ReSet(); // 6、根据长短连接判断是否关闭连接或者继续处理 if (rsp.Close() == true) { conn->Shutdown(); // 短连接关闭 } } return; }private: // string是正则表达式,因为请求资源路径如果有数字,那情况太多了 Handlers _get_route; // 请求路由表 Handlers _post_route; Handlers _put_route; Handlers _delete_route; std::string _basedir; // 静态资源根目录P TcpServer _server;};
5-6、HttpServer模块简单测试【Postman】
这里要使用到Postman
工具
目录:
测试代码:
#include \"http.hpp\"#define WWWROOT \"./wwwroot/\"////// 业务处理std::string RequestStr(const HttpRequest &req){ std::stringstream ss; ss << req._method << \" \" << req._path << \" \" << req._version << \"\\r\\n\"; // 请求行 for (auto &it : req._params) { ss << it.first << \": \" << it.second << \"\\r\\n\"; // 查询字符串 } for (auto &it : req._headers) { ss << it.first << \": \" << it.second << \"\\r\\n\"; } ss << \"\\r\\n\"; // 空行 ss << req._body; // 正文 return ss.str();}void Hello(const HttpRequest &req, HttpResponse *rsp){ rsp->SetContent(RequestStr(req), \"text/plain\");}void Login(const HttpRequest &req, HttpResponse *rsp){ rsp->SetContent(RequestStr(req), \"text/plain\");}void PutFile(const HttpRequest &req, HttpResponse *rsp){ rsp->SetContent(RequestStr(req), \"text/plain\");}void DelFile(const HttpRequest &req, HttpResponse *rsp){ rsp->SetContent(RequestStr(req), \"text/plain\");}int main(){ ////// 服务器搭建 HttpServer server(8080); server.SetThreadCount(3); server.SetBasedir(WWWROOT); // 设置静态资源根目录,告诉服务器有静态资源请求到来,需要到哪里去找资源文件 server.Get(\"/hello\", Hello); server.Post(\"/login\", Login); server.Put(\"/1234.txt\", PutFile); server.Delete(\"/1234.txt\", DelFile); server.Listen(); return 0;}
<meta charset=\"utf8\"> <form action=\"/login\" method=\"post\"> <input type=\"text\" name=\"username\">
<input type=\"password\" name=\"password\">
<input type=\"submit\" value=\"提交\" name=\"submit\">
服务器回显测试结果
登录请求测试结果
PUT 请求测试结果
DELETE请求测试结果
六、功能测试
接下来来进行一些边界性的功能测试,观察服务器在边界情况下能够正常运行
6-1、服务器长连接测试
创建一个客户端,设置Connection头部字段【连接字段】为keep-alive【长连接】,观察客户端与服务器的连接超时之后,客户端是否还能够持续与服务器进行通信 ------ 一次连接,持续通信
// 长连接测试1 : 创建一个客户端持续给服务器发送数据,直到超过超时时间看看连接是否正常#include \"../source/server.hpp\"int main(){ Socket cli_sock; cli_sock.CreateClient(\"127.0.0.1\", 8080); std::string req = \"GET /Hello HTTP/1.1\\r\\nConnection: keep-alive\\r\\nContent-Length: 0\\r\\n\\r\\n\"; while (1) { assert(cli_sock.Send(req.c_str(),req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf,1023)); DBG_LOG(\"[%s]\",buf); sleep(3); } cli_sock.Close(); return 0;}
从测试结果可以看到,客户端能够持续与服务器进行通信,并且服务器也不会将客户端连接释放,而是等待客户端主动退出后才会释放
6-2、服务器超时连接测试
客户端连接上服务器后,长时间不给服务器发送数据,观察超时时间 (10s) 后服务器是否会将客户端连接进行释放
测试结果可以看到,服务器超时断开连接功能是正常的
6-3、服务器请求数据错误格式测试
给服务器发送一个数据,告诉服务器要发送1024字节的数据,但是实际发送的数据不足1024,查看服务器处理结果
1、如果数据只发送一次,服务器将得不到完整请求,就不会进行业务处理,客户端也就得不到响应,最终超时关闭连接
2、连着给服务器发送了多次小的请求,服务器会将后边的请求当作前边请求的正文进行处理,而后面处理的时候有可能就会因为处理错误而关闭连接(导致后面的请求解析错误)
预期结果1:
#include \"../source/server.hpp\"int main(){ Socket cli_sock; cli_sock.CreateClient(\"127.0.0.1\", 8080); //发送100字节数据,实际正文只有hello world std::string req = \"GET /Hello HTTP/1.1\\r\\nConnection: keep-alive\\r\\nContent-Length: 100\\r\\n\\r\\nhello world\"; while (1) { assert(cli_sock.Send(req.c_str(),req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf,1023)); DBG_LOG(\"[%s]\",buf); sleep(3); } cli_sock.Close(); return 0;}
可以看到,我们只发送了一次数据,然后不发送数据后,服务器在超时时间内没有拿到完整数据,关闭连接了,结果是正常的
预期结果2:
// 给服务器发送一个数据,告诉服务器要发送1024字节的数据,但是实际发送的数据不足1024,查看服务器处理结果// 1,如果数据只发送一次,服务器将得不到完整请求,就不会进行业务处理,客户端也就得不到响应,最终超时关闭连接// 2,连着给服务器发送了多次小的请求,服务器会将后边的请求当作前边请求的正文进行处理,而后面处理的时候有可能就会因为处理错误而关闭连接(导致后面的请求解析错误)#include \"../source/server.hpp\"int main(){ Socket cli_sock; cli_sock.CreateClient(\"127.0.0.1\", 8080); // 发送100字节数据,实际正文只有hello world std::string req = \"GET /Hello HTTP/1.1\\r\\nConnection: keep-alive\\r\\nContent-Length: 100\\r\\n\\r\\nhello world I am OAA\"; while (1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); assert(cli_sock.Send(req.c_str(), req.size()) != -1); assert(cli_sock.Send(req.c_str(), req.size()) != -1); assert(cli_sock.Send(req.c_str(), req.size()) != -1); assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DBG_LOG(\"[%s]\", buf); sleep(3); } cli_sock.Close(); return 0;}
6-4、服务器业务处理超时测试
业务处理超时,查看服务器的处理情况 : 当服务器达到了一个性能瓶颈,在一次业务处理中花费了太长的时间(超过了服务器设置的非活跃超时时间)
在一次业务处理中耗费太长时间,导致其他的连接也被连累超时,其他的连接有可能会被拖累超时释放
假设现在12345描述符就绪了,在处理1的时候花费了30s处理完,超时了,导致2345描述符因为长时间没有刷新活跃度
1.如果接下来的2345描述符都是通信连接描述符,如果都就绪了,则并不影响,因为接下来就会进行处理并刷新活跃度2.如果接下来的2号描述符是定时器事件描述符,定时器触发超时,执行定时任务,就会将345描述符给释放掉这时候一旦345描述符对应的连接被释放,接下来在处理345事件的时候就会导致程序崩溃(内存访问错误)
因此这时候,在本次事件处理中,并不能直接对连接进行释放,而应该将释放操作压入到任务池中,
等到事件处理完了执行任务池中的任务的时候,再去释放
多个客户端来了,前面的客户端业务处理完之后再进行刷新活跃度,这样前面的客户端不会影响到后面的客户端连接了
避免前面的连接因为业务处理耗时过长,而导致后面的连接无缘无故超时被释放
这里不应该用一个客户端来进行测试,而是要多个客户端进行测试
#include \"../source/server.hpp\"/*业务处理超时,查看服务器的处理情况 当服务器达到了一个性能瓶颈,在一次业务处理中花费了太长的时间(超过了服务器设置的非活跃超时时间) 1.在一次业务处理中耗费太长时间,导致其他的连接也被连累超时,其他的连接有可能会被拖累超时释放 假设现在12345描述符就绪了,在处理1的时候花费了30s处理完,超时了,导致2345描述符因为长时间没有刷新活跃度 1.如果接下来的2345描述符都是通信连接描述符,如果都就绪了,则并不影响,因为接下来就会进行处理并刷新活跃度 2.如果接下来的2号描述符是定时器事件描述符,定时器触发超时,执行定时任务,就会将345描述符给释放掉 这时候一旦345描述符对应的连接被释放,接下来在处理345事件的时候就会导致程序崩溃(内存访问错误) 因此这时候,在本次事件处理中,并不能直接对连接进行释放,而应该将释放操作压入到任务池中, 等到事件处理完了执行任务池中的任务的时候,再去释放*/int main(){ signal(SIGCHLD,SIG_IGN);//忽略信号,不关注僵尸进程 for (int i = 0; i < 10; ++i) // 相当于10个连接去访问服务器,这个时候第一个连接就会拖累后面的9个连接 { pid_t pid = fork(); if (pid < 0) { DBG_LOG(\"fork error\"); return -1; } else if (pid == 0) { Socket cli_sock; cli_sock.CreateClient(\"127.0.0.1\", 8080); // 发送100字节数据,实际正文只有hello world std::string req = \"GET /Hello HTTP/1.1\\r\\nConnection: keep-alive\\r\\nContent-Length: 0\\r\\n\\r\\n\"; while (1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DBG_LOG(\"[%s]\", buf); } cli_sock.Close(); exit(0); } } while(1) { sleep(1); } return 0;}
【打印结果都是超时释放,没有说前面的连接因为业务处理,而导致后面的连接无缘无故超时被释放】
6-5、服务器一次进行多条请求测试
一次给服务器发送多条数据,然后查看服务器的处理结果
每一条请求都应该得到正常处理
// 一次给服务器发送多条数据,然后查看服务器的处理结果// 每一条请求都应该得到正常处理#include \"../source/server.hpp\"int main(){ Socket cli_sock; cli_sock.CreateClient(\"127.0.0.1\", 8080); std::string req = \"GET /Hello HTTP/1.1\\r\\nConnection: keep-alive\\r\\nContent-Length: 0\\r\\n\\r\\n\"; req += \"GET /Hello HTTP/1.1\\r\\nConnection: keep-alive\\r\\nContent-Length: 0\\r\\n\\r\\n\"; req += \"GET /Hello HTTP/1.1\\r\\nConnection: keep-alive\\r\\nContent-Length: 0\\r\\n\\r\\n\"; while (1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1);// 发送一次数据,但是一次数据发送多条数据! char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DBG_LOG(\"[%s]\", buf); sleep(3); } cli_sock.Close(); return 0;}
测试结果可以看到服务器处理多条数据没有问题
6-6、服务器大文件传输测试
给服务器上传一个大文件,服务器将文件保存下来,观察处理结果
上传的文件应该和服务器保存的文件一致
linux中创建一个1G大小的文件命令:
dd if=/dev/zero of=./hello.txt bs=100M count=3从0号数据文件zero里面取出数据放到hello.txt文件里面而0号数据文件zero里面的数据都是【空】bs文件大小count文件个数
注意:我们如果bs=1G,那么服务端和客户端进行处理的时候因为双方都需要内存,所以说要2G内存,我们要看看程序环境的内存够不够
free指令 : 查看系统剩余内存
-s : 显示文件和目录的大小
-h : 以人类可读的格式显示文件大小
// 给服务器上传一个大文件,服务器将文件保存下来,观察处理结果// 上传的文件应该和服务器保存的文件一致#include \"../source/http/http.hpp\" // 要使用http.http里面的Utilint main(){ Socket cli_sock; cli_sock.CreateClient(\"127.0.0.1\", 8080); // PUT方法,对1234.txt文件进行操作 std::string req = \"PUT /1234.txt HTTP/1.1\\r\\nConnection: keep-alive\\r\\n\"; std::string body; Util::ReadFile(\"./hello.txt\", &body); req += \"Content-Length: \" + std::to_string(body.size()) + \"\\r\\n\\r\\n\"; // while (1) // { assert(cli_sock.Send(req.c_str(), req.size()) != -1);//发送一次就行了 assert(cli_sock.Send(body.c_str(), body.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DBG_LOG(\"[%s]\", buf); sleep(3); // } cli_sock.Close(); return 0;}
接下来又需要用到我们md5sum
这个命令来看看两个文件是不是一样的数据了
可以看到结果一样!
因为是云服务的原因,所以说内存资源比较少,能发送处理的数据资源有限
6-7、服务器性能压力测试
性能压力测试:
并发量:可以同时处理多少客户端的请求而不会出现连接失败
QPS:每秒钟处理的包的数量
工具:webbench工具
原理:创建大量的进程,在进程中,创建客户端连接服务器,发送请求,收到响应后关闭连接,开始下一个连接的建立
测试环境:
服务器为2核2G带宽4M的云服务器
在服务器上运行WebBench程序
使用 WebBench 程序以1000的并发量,进行1h的测试。
测试时,把日志等级调整为ERR,这样就不会打印出很多信息了
测试结果:
结果是1400QPS
7、项目源码
本项目源码