TCP模型,mqtt协议01 day41
十一:TCP模型
一:select
/epoll
的双工通信
// server#include #include #include #include #include #include #include #include #include #include #include /* See NOTES */#include #include typedef struct sockaddr *(SA);int main(int argc, char **argv) { // udp 用户数据报 int udpfd = socket(AF_INET, SOCK_DGRAM, 0); if (-1 == udpfd) { perror(\"socket\"); return 1; } //给套接字 设定IP +port // man 7 ip // ser 是本服务器的地址+port ,cli 准备存储客户端 的地址+port struct sockaddr_in ser, cli; bzero(&ser, sizeof(ser)); bzero(&cli, sizeof(cli)); ser.sin_family = AF_INET; // ipv4 // host to net short //小端转大端 ser.sin_port = htons(50000); //点分十进制转 大端 ser.sin_addr.s_addr = inet_addr(\"192.168.31.33\"); int ret = bind(udpfd, (SA)&ser, sizeof(ser)); if (-1 == ret) { perror(\"bind\"); return 1; } // 1 创建集合 fd_set rd_set, tmp_set; // 2 add fd FD_ZERO(&rd_set); FD_ZERO(&tmp_set); FD_SET(udpfd, &tmp_set); FD_SET(0, &tmp_set); socklen_t len = sizeof(cli); char buf[512] = {0}; recvfrom(udpfd, buf, sizeof(buf), 0, (SA)&cli, &len); bzero(buf, sizeof(buf)); while (1) { rd_set = tmp_set; select(udpfd + 1, &rd_set, NULL, NULL, NULL); if (FD_ISSET(0, &rd_set)) { printf(\"自己:\"); fgets(buf, sizeof(buf), stdin); buf[strlen(buf) - 1] = \'\\0\'; sendto(udpfd, buf, strlen(buf), 0, (SA)&cli, len); if (0 == strcmp(buf, \"#quit\\n\")) { exit(0); } } if (FD_ISSET(udpfd, &rd_set)) { recvfrom(udpfd, buf, sizeof(buf), 0, NULL, NULL); if (0 == strcmp(buf, \"#quit\\n\")) { exit(0); } printf(\"对面:%s\\n\", buf); } } close(udpfd); // system(\"pause\"); return 0;}-----------------------------------------------------------// client 客户端#include #include #include #include #include #include #include #include #include #include /* See NOTES */#include #include typedef struct sockaddr *(SA);int main(int argc, char **argv) { // internet ip v4 int udpfd = socket(AF_INET, SOCK_DGRAM, 0); if (-1 == udpfd) { perror(\"socket\"); return 1; } struct sockaddr_in ser; bzero(&ser, sizeof(ser)); ser.sin_family = AF_INET; // ipv4 // host to net short //小端转大端 ser.sin_port = htons(50000); //点分十进制转 大端 ser.sin_addr.s_addr = inet_addr(\"192.168.31.33\"); // 1 创建集合 fd_set rd_set, tmp_set; // 2 add fd FD_ZERO(&rd_set); FD_ZERO(&tmp_set); FD_SET(udpfd, &tmp_set); FD_SET(0, &tmp_set); socklen_t len = sizeof(ser); char buf[512] = \"start\"; sendto(udpfd, buf, strlen(buf), 0, (SA)&ser, sizeof(ser)); while (1) { rd_set = tmp_set; select(udpfd + 1, &rd_set, NULL, NULL, NULL); char buf[512] = {0}; if (FD_ISSET(0, &rd_set)) { printf(\"自己:\"); fgets(buf, sizeof(buf), stdin); buf[strlen(buf) - 1] = \'\\0\'; sendto(udpfd, buf, strlen(buf), 0, (SA)&ser, len); if (0 == strcmp(buf, \"#quit\\n\")) { exit(0); } } if (FD_ISSET(udpfd, &rd_set)) { recvfrom(udpfd, buf, sizeof(buf), 0, NULL, NULL); if (0 == strcmp(buf, \"#quit\\n\")) { exit(0); } printf(\"对面:%s\\n\", buf); } } close(udpfd); // system(\"pause\"); return 0;}
// server#include #include #include #include #include #include #include #include #include #include #include #include #include #include /* See NOTES */#include #include #include int add_fd(int epfd, int fd) { struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = fd; int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); if (-1 == ret) { perror(\"add_fd\"); return 1; } return 0;}int del_fd(int epfd, int fd) { struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = fd; int ret = epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev); if (-1 == ret) { perror(\"add_fd\"); return 1; } return 0;}typedef struct sockaddr *(SA);int main(int argc, char **argv) { // udp 用户数据报 int udpfd = socket(AF_INET, SOCK_DGRAM, 0); if (-1 == udpfd) { perror(\"socket\"); return 1; } //给套接字 设定IP +port // man 7 ip // ser 是本服务器的地址+port ,cli 准备存储客户端 的地址+port struct sockaddr_in ser, cli; bzero(&ser, sizeof(ser)); bzero(&cli, sizeof(cli)); ser.sin_family = AF_INET; // ipv4 // host to net short //小端转大端 ser.sin_port = htons(50000); //点分十进制转 大端 ser.sin_addr.s_addr = inet_addr(\"192.168.31.33\"); int ret = bind(udpfd, (SA)&ser, sizeof(ser)); if (-1 == ret) { perror(\"bind\"); return 1; } // ------------------------------------------------------------- int epfd = epoll_create(2); if (-1 == epfd) { perror(\"epoll_create\"); exit(1); } struct epoll_event rev[2]; // 2 add fd add_fd(epfd, 0); add_fd(epfd, udpfd); socklen_t len = sizeof(cli); char buf_t[512] = {0}; recvfrom(udpfd, buf_t, sizeof(buf_t), 0, (SA)&cli, &len); while (1) { char buf[1024] = {0}; int ep_ret = epoll_wait(epfd, rev, 2, -1); // int timeout:指定等待事件的最大时间, //如果设置为 -1,则表示无限等待,0 表示非阻塞模式。 for (int i = 0; i < ep_ret; ++i) { if (rev[i].data.fd == udpfd) { int rd_ret = recvfrom(udpfd, buf, sizeof(buf), 0, NULL, NULL); // 管道的写端关闭 if (rd_ret <= 0) { del_fd(epfd, udpfd); close(udpfd); continue; } // recvfrom(udpfd, buf, sizeof(buf), 0, NULL, NULL); printf(\"对面:%s\\n\", buf); } if (0 == rev[i].data.fd) { bzero(buf, sizeof(buf)); printf(\"自己:\"); fgets(buf, sizeof(buf), stdin); buf[strlen(buf) - 1] = \'\\0\'; sendto(udpfd, buf, strlen(buf), 0, (SA)&cli, len); } } } close(udpfd); // system(\"pause\"); return 0;}-----------------------------------------------------------------// client 客户端#include #include #include #include #include #include #include #include #include #include #include #include #include /* See NOTES */#include #include #include int add_fd(int epfd, int fd) { struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = fd; int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); if (-1 == ret) { perror(\"add_fd\"); return 1; } return 0;}int del_fd(int epfd, int fd) { struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = fd; int ret = epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev); if (-1 == ret) { perror(\"add_fd\"); return 1; } return 0;}typedef struct sockaddr *(SA);int main(int argc, char **argv) { // internet ip v4 int udpfd = socket(AF_INET, SOCK_DGRAM, 0); if (-1 == udpfd) { perror(\"socket\"); return 1; } struct sockaddr_in ser; bzero(&ser, sizeof(ser)); ser.sin_family = AF_INET; // ipv4 // host to net short //小端转大端 ser.sin_port = htons(50000); //点分十进制转 大端 ser.sin_addr.s_addr = inet_addr(\"192.168.31.33\"); // ------------------------------------------------------------- int epfd = epoll_create(2); if (-1 == epfd) { perror(\"epoll_create\"); exit(1); } struct epoll_event rev[2]; // 2 add fd add_fd(epfd, 0); add_fd(epfd, udpfd); socklen_t len = sizeof(ser); char buf[512] = \"start\"; sendto(udpfd, buf, strlen(buf), 0, (SA)&ser, sizeof(ser)); while (1) { char buf[1024] = {0}; int ep_ret = epoll_wait(epfd, rev, 2, -1); // int timeout:指定等待事件的最大时间, //如果设置为 -1,则表示无限等待,0 表示非阻塞模式。 for (int i = 0; i < ep_ret; ++i) { if (rev[i].data.fd == udpfd) { int rd_ret = recvfrom(udpfd, buf, sizeof(buf), 0, NULL, NULL); // 管道的写端关闭 if (rd_ret <= 0) { del_fd(epfd, udpfd); close(udpfd); continue; } // recvfrom(udpfd, buf, sizeof(buf), 0, NULL, NULL); if (0 == strcmp(buf, \"#quit\\n\")) { exit(0); } printf(\"对面:%s\\n\", buf); } if (0 == rev[i].data.fd) { bzero(buf, sizeof(buf)); printf(\"自己:\"); fgets(buf, sizeof(buf), stdin); buf[strlen(buf) - 1] = \'\\0\'; sendto(udpfd, buf, strlen(buf), 0, (SA)&ser, len); if (0 == strcmp(buf, \"#quit\\n\")) { exit(0); } } } } close(udpfd); // system(\"pause\"); return 0;}
二:TCP:select
多个客户端连接服务器
listfd
的误区:listfd 3
不代表只能绑三个客户端,他中间有时间间断要去三次握手,第四个只是要等一会才能接受
listen(sockfd, 3);表示:内核最多允许有 3 个客户端连接处于“等待被你服务器 accept() 处理”的状态。超过 3 个之后:1.要么新来的客户端连接会被拒绝(返回 ECONNREFUSED 错误),2.要么客户端阻塞等待(取决于系统和协议栈设置)。
`listen(fd, 3)` 并不是客户端连接总数限制,而是等待 `accept` 的连接数限制。你完全可以接入成百上千个客户端,只要你能及时 `accept()` 并处理。
//ser#include #include #include #include #include #include #include /* See NOTES */#include #include #include /* According to earlier standards */#include #include #include typedef struct sockaddr*(SA);int main(int argc, char** argv){ //监听套接字 功能检测是否有客户端 连连接服务器 int listfd = socket(AF_INET, SOCK_STREAM, 0); if (-1 == listfd) { perror(\"socket\"); return 1; } // man 7 ip struct sockaddr_in ser, cli; bzero(&ser, sizeof(ser)); bzero(&cli, sizeof(cli)); ser.sin_family = AF_INET; ser.sin_port = htons(50000); // 代表本机地址 外部客户端可以连接到服务器 ser.sin_addr.s_addr = INADDR_ANY; int ret = bind(listfd, (SA)&ser, sizeof(ser)); if (-1 == ret) { perror(\"bind\"); return 1; } // 同一时刻可以服务器建立连接的排队数 listen(listfd, 3); socklen_t len = sizeof(cli); //1 create set fd_set rd_set,tmp_set; //2 add fd FD_ZERO(&rd_set); FD_ZERO(&tmp_set); FD_SET(listfd,&tmp_set); int maxfd = listfd; while (1) { rd_set = tmp_set; select(maxfd+1,&rd_set,NULL,NULL,NULL); for(int i = listfd ;i<maxfd+1;++i) { if(FD_ISSET(i,&rd_set) && i ==listfd) // i代表文件描述符,i就绪(读),且 i是监听套接字 { //和客户端建立连接,并获得通信套接字,这个套接字就代表客户端 int conn = accept(listfd, (SA)&cli, &len); if (-1 == conn) { perror(\"accept\"); continue; } FD_SET(conn,&tmp_set); if(conn>maxfd) { maxfd =conn; } } if(FD_ISSET(i,&rd_set) && i!=listfd) // 是通信套接字的情况 { int conn = i; char buf[512] = {0}; int rec_ret= recv(conn, buf, sizeof(buf), 0); if(rec_ret<=0) { printf(\"client offline...\\n\"); FD_CLR(conn,&tmp_set); close(conn); continue; } printf(\"from cli:%s\\n\",buf); time_t tm; time(&tm); sprintf(buf, \"%s %s\", buf, ctime(&tm)); int sd_ret = send(conn, buf, strlen(buf), 0); if(sd_ret<=0) { perror(\"send\"); FD_CLR(conn,&tmp_set); close(conn); continue; } } } } close(listfd); return 0;}---------------------------------------------------------------//cli#include #include #include #include #include #include #include #include /* See NOTES */#include #include typedef struct sockaddr *(SA);int main(int argc, char **argv){ int conn = socket(AF_INET, SOCK_STREAM, 0); if (-1 == conn) { perror(\"socket\"); return 1; } struct sockaddr_in ser; bzero(&ser, sizeof(ser)); ser.sin_family = AF_INET; ser.sin_port = htons(50000); // 代表本机地址 外部客户端可以连接到服务器 ser.sin_addr.s_addr = inet_addr(\"192.168.31.33\"); int ret = connect(conn, (SA)&ser, sizeof(ser)); if (-1 == ret) { perror(\"connect\"); return 1; } while (1) { char buf[512] = \"hello,this tcp test\"; int sd_ret = send(conn, buf, strlen(buf), 0); if(sd_ret<=0) { break; } bzero(buf, sizeof(buf)); int ret_rec = recv(conn, buf, sizeof(buf), 0); if(ret_rec<=0) { break; } printf(\"from ser:%s\\n\",buf); sleep(1); } close(conn); // system(\"pause\"); return 0;}
三:TCP:epoll
//ser#include #include #include #include #include #include #include #include #include /* See NOTES */#include #include typedef struct sockaddr *(SA);int add_fd(int epfd, int fd) { struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = fd; int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); if (-1 == ret) { perror(\"add_fd\"); return 1; } return 0;}int del_fd(int epfd, int fd) { struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = fd; int ret = epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev); if (-1 == ret) { perror(\"add_fd\"); return 1; } return 0;}int main(int argc, char **argv) { //监听套接字 功能检测是否有客户端 连连接服务器 int listfd = socket(AF_INET, SOCK_STREAM, 0); if (-1 == listfd) { perror(\"socket\"); return 1; } // man 7 ip struct sockaddr_in ser, cli; bzero(&ser, sizeof(ser)); bzero(&cli, sizeof(cli)); ser.sin_family = AF_INET; ser.sin_port = htons(50000); // 代表本机地址 外部客户端可以连接到服务器 ser.sin_addr.s_addr = INADDR_ANY; int ret = bind(listfd, (SA)&ser, sizeof(ser)); if (-1 == ret) { perror(\"bind\"); return 1; } // 同一时刻可以服务器建立连接的排队数 listen(listfd, 3); socklen_t len = sizeof(cli); struct epoll_event rev[10] = {0}; int epfd = epoll_create(10); if (-1 == epfd) { perror(\"epoll create\"); exit(1); } add_fd(epfd, listfd); while (1) { int ep_ret = epoll_wait(epfd, rev, 10, 5000); if (ep_ret > 0) { for (int i = 0; i < ep_ret; i++) { if (rev[i].data.fd == listfd) { int conn = accept(listfd, (SA)&cli, &len); if (-1 == conn) { perror(\"accept\"); continue; } add_fd(epfd, conn); } else { int conn = rev[i].data.fd; char buf[512] = {0}; int rec_ret = recv(conn, buf, sizeof(buf), 0); if (rec_ret <= 0) { printf(\"client offline..\\n\"); del_fd(epfd, conn); close(conn); continue; } getpeername(conn, (SA)&cli, &len); // htons ntohs printf(\"from cli ip:%s port :%d :%s\\n\", inet_ntoa(cli.sin_addr), ntohs(cli.sin_port), buf); time_t tm; time(&tm); sprintf(buf, \"%s %s\", buf, ctime(&tm)); int sd_ret = send(conn, buf, strlen(buf), 0); if (sd_ret <= 0) { // break; printf(\"client offline...\\n\"); del_fd(epfd, conn); close(conn); continue; } } } } else if (0 == ep_ret) { printf(\"time out...\\n\"); continue; } else { perror(\"epoll_wait\"); return 1; } } close(listfd); // system(\"pause\"); return 0;}------------------------------------------------------//cli#include #include #include #include #include #include #include #include /* See NOTES */#include #include typedef struct sockaddr *(SA);int main(int argc, char **argv){ int conn = socket(AF_INET, SOCK_STREAM, 0); if (-1 == conn) { perror(\"socket\"); return 1; } struct sockaddr_in ser; bzero(&ser, sizeof(ser)); ser.sin_family = AF_INET; ser.sin_port = htons(50000); // 代表本机地址 外部客户端可以连接到服务器 ser.sin_addr.s_addr = inet_addr(\"192.168.31.33\"); int ret = connect(conn, (SA)&ser, sizeof(ser)); if (-1 == ret) { perror(\"connect\"); return 1; } while (1) { char buf[512] = \"hello,this tcp test\"; int sd_ret = send(conn, buf, strlen(buf), 0); if(sd_ret<=0) { break; } bzero(buf, sizeof(buf)); int ret_rec = recv(conn, buf, sizeof(buf), 0); if(ret_rec<=0) { break; } printf(\"from ser:%s\\n\",buf); sleep(1); } close(conn); // system(\"pause\"); return 0;}
四:fork
只负责自己的部分,就和fifo
双工通信一样。关闭与自己功能不相关的部分
netsate -anp|less
、
// ser#include #include #include #include #include #include #include #include /* See NOTES */#include #include #include typedef struct sockaddr *(SA);void myhandle(int num) { wait(NULL); }int main(int argc, char **argv) { signal(SIGCHLD, myhandle); //监听套接字 功能检测是否有客户端 连连接服务器 int listfd = socket(AF_INET, SOCK_STREAM, 0); if (-1 == listfd) { perror(\"socket\"); return 1; } // man 7 ip struct sockaddr_in ser, cli; bzero(&ser, sizeof(ser)); bzero(&cli, sizeof(cli)); ser.sin_family = AF_INET; ser.sin_port = htons(50000); // 代表本机地址 外部客户端可以连接到服务器 ser.sin_addr.s_addr = INADDR_ANY; int ret = bind(listfd, (SA)&ser, sizeof(ser)); if (-1 == ret) { perror(\"bind\"); return 1; } // 同一时刻可以服务器建立连接的排队数 listen(listfd, 3); socklen_t len = sizeof(cli); while (1) { int conn = accept(listfd, (SA)&cli, &len); if (-1 == conn) { perror(\"accept\"); close(conn); continue; } pid_t pid = fork(); if (pid > 0) { close(conn); } else if (0 == pid) { close(listfd); while (1) //子线程运行不止一次,他要反复的收发 { char buf[512] = {0}; int rec_ret = recv(conn, buf, sizeof(buf), 0); if (rec_ret <= 0) { printf(\"cli offline..\\n\"); exit(0); } printf(\"from cli:%s\\n\", buf); time_t tm; time(&tm); sprintf(buf, \"%s %s\", buf, ctime(&tm)); int sd_ret = send(conn, buf, strlen(buf), 0); if (sd_ret <= 0) { printf(\"cli offline...\\n\"); exit(0); } } } else { perror(\"fork\"); exit(1); } } close(listfd); // system(\"pause\"); return 0;}
五:th
线程
涉及th
调度问题,因为accept
和pthread_create
如果没有设置同步,在一些情况(比如调度频繁),会导致accept
比pthread_create
还快,就会导致int conn
的值不能正确的传递,发生错误,所以用sem_t
保护conn
值
是一个 POSIX 标准定义的头文件,用于提供信号量(
semaphore
)的接口 sem_t
// ser#include #include #include #include #include #include #include #include #include /* See NOTES */#include #include typedef struct sockaddr *(SA);sem_t sem_arg;void *th(void *arg) { int conn = *(int *)arg; sem_post(&sem_arg); pthread_detach(pthread_self()); //用分离还回收线程,不用join,因为线程不是走一次就直接回收,他也要反复收发 while (1) { char buf[512] = {0}; int rec_ret = recv(conn, buf, sizeof(buf), 0); if (rec_ret <= 0) { printf(\"cli offline \\n\"); break; } printf(\"from cli:%s\\n\", buf); time_t tm; time(&tm); sprintf(buf, \"%s %s\", buf, ctime(&tm)); int sd_ret = send(conn, buf, strlen(buf), 0); if (sd_ret <= 0) { printf(\"cli offline \\n\"); break; } } return NULL;}int main(int argc, char **argv) { //监听套接字 功能检测是否有客户端 连连接服务器 int listfd = socket(AF_INET, SOCK_STREAM, 0); if (-1 == listfd) { perror(\"socket\"); return 1; } // man 7 ip struct sockaddr_in ser, cli; bzero(&ser, sizeof(ser)); bzero(&cli, sizeof(cli)); ser.sin_family = AF_INET; ser.sin_port = htons(50000); // 代表本机地址 外部客户端可以连接到服务器 ser.sin_addr.s_addr = INADDR_ANY; int ret = bind(listfd, (SA)&ser, sizeof(ser)); if (-1 == ret) { perror(\"bind\"); return 1; } // 同一时刻可以服务器建立连接的排队数 listen(listfd, 3); socklen_t len = sizeof(cli); sem_init(&sem_arg, 0, 0); //设置同步,保护arg while (1) { int conn = accept(listfd, (SA)&cli, &len); if (-1 == conn) { perror(\"accept\"); close(conn); continue; } pthread_t tid; pthread_create(&tid, NULL, th, &conn); sem_wait(&sem_arg); } sem_destroy(&sem_arg); close(listfd); // system(\"pause\"); return 0;}
十二:MQTT协议
一:基础概念
mqtt
协议 : 消息队列遥测传输协议
适合嵌入式设备,移动设备,资源受限。带宽低,低功耗。希望传输可靠等场景。
工作过程。 3个端 :发布者,订阅者,broker(代理服务器)。
发布者发送数据(必须携带主题名)到服务器,服务器会查看自己订阅链表, 如果有订阅该主题的订阅者,就会转发数据。
订阅者需要向服务器注册自己关心的主题名,服务器记录该主题被订阅,已经是那个客户端订阅的。
订阅者也可以发布数据,发布者也可以订阅主题。