> 技术文档 > 【Python】WebSockets 库_python websocket

【Python】WebSockets 库_python websocket


第1章:网络通信基石与WebSocket协议的起源

1.1 OSI与TCP/IP模型中的Web通信定位

在深入理解WebSocket协议之前,我们必须首先回顾现代网络通信的基石——OSI(开放系统互联)七层模型和更实际的TCP/IP四层(或五层)模型。这些模型为我们理解数据如何在网络中传输、各种协议如何协同工作提供了统一的框架。

1.1.1 OSI七层模型概述

OSI模型是一个理论框架,将网络通信功能划分为七个抽象层。每一层都为上层提供服务,并利用下层提供的服务。理解其分层有助于我们概念化不同网络协议的职责。

  • 物理层 (Physical Layer):负责传输比特流,定义了电压、线缆、网卡接口等物理特性。例如以太网电缆、Wi-Fi信号。
  • 数据链路层 (Data Link Layer):在物理层之上提供可靠的数据传输。它处理帧的封装、差错控制、流量控制和物理地址(MAC地址)。例如以太网协议、PPP协议。
  • 网络层 (Network Layer):负责数据包在不同网络间的路由,提供逻辑地址(IP地址)。例如IP协议、ICMP协议。
  • 传输层 (Transport Layer):提供端到端的数据传输服务,包括连接的建立、维护与终止,以及数据的分段、重组、差错校验和流量控制。例如TCP、UDP协议。
  • 会话层 (Session Layer):管理应用程序之间的会话,包括会话的建立、管理和终止。例如NetBIOS、RPC。
  • 表示层 (Presentation Layer):处理数据格式的转换、加密解密、数据压缩与解压缩,确保应用程序层能够理解数据。例如JPEG、ASCII、SSL/TLS加密。
  • 应用层 (Application Layer):为最终用户提供网络服务。它直接与应用程序交互,定义了应用程序之间通信的协议。例如HTTP、FTP、SMTP、DNS。

WebSocket,作为一种应用层协议,其实现和功能都建立在这些下层协议之上。它利用了TCP的可靠传输能力,并在HTTP的基础上进行了协议升级。

1.1.2 TCP/IP模型与Web通信

与OSI模型相比,TCP/IP模型是一个更实用的模型,它将OSI的七层简化为四层(有时也说是五层,将网络接口层细分为物理层和数据链路层)。

  • 应用层 (Application Layer):对应OSI的应用层、表示层和会话层。包含了所有处理特定应用程序细节的协议,如HTTP、FTP、DNS、SMTP、以及我们即将深入探讨的WebSocket。
  • 传输层 (Transport Layer):对应OSI的传输层。负责端到端的数据传输。最常用的协议是TCP(传输控制协议)和UDP(用户数据报协议)。WebSocket协议严格依赖于TCP提供的可靠、有序、有连接的字节流传输。
  • 网络层 (Internet Layer):对应OSI的网络层。负责数据包的寻址和路由,使数据包能够跨越多个网络进行传输。核心协议是IP(网际协议)。
  • 网络接口层 (Network Access Layer):对应OSI的数据链路层和物理层。负责在物理介质上发送和接收数据帧,处理物理硬件细节。

在Web通信中,最常见的模式是客户端(通常是浏览器)通过HTTP协议与服务器进行通信。HTTP协议运行在TCP之上。当浏览器请求一个网页时,它首先通过DNS解析服务器域名得到IP地址,然后建立一个TCP连接,接着发送HTTP请求,服务器处理请求后发送HTTP响应,最后TCP连接可以被关闭或复用。

1.2 HTTP协议的固有局限性

HTTP(超文本传输协议)是Web的基石,它简单、无状态、可伸缩,非常适合请求-响应式的文档传输。然而,随着Web技术的发展,对实时交互的需求日益增长,HTTP的某些固有特性开始显现出局限性。

1.2.1 请求-响应模型与无状态性

HTTP是基于请求-响应(Request-Response)模式的。客户端发送一个请求,服务器返回一个响应。每个请求都是独立的,服务器不会在请求之间保留客户端的状态(无状态)。虽然可以通过Cookie、Session等机制来维护状态,但这并不改变请求-响应的基本模型。

这种模型在需要实时或半实时更新的场景中效率低下。例如:

  • 聊天应用:如果使用HTTP,客户端需要不断地向服务器发送请求(轮询,Polling)来检查是否有新消息,或者服务器在收到消息后长时间保持连接不打开(长轮询,Long Polling)。这两种方式都有明显的缺点。
  • 股票行情:实时股价波动需要立即推送到客户端。
  • 在线游戏:玩家操作和游戏状态的同步需要低延迟。
  • 协作文档编辑:多个用户同时编辑文档,需要实时同步修改。

在这些场景下,传统的HTTP模型会带来以下问题:

  • 高延迟:轮询机制意味着客户端必须等待固定间隔才能获取最新数据,导致更新不及时。长轮询虽然有所改善,但依然有延迟,且服务器维护大量“悬挂”连接的开销很大。
  • 资源消耗:频繁的HTTP请求和响应会产生大量的HTTP头部信息,增加了网络带宽消耗。服务器也需要处理大量的短连接或挂起连接,消耗CPU和内存资源。
  • 半双工通信:HTTP是半双工的,即在任何给定时间,数据只能在一个方向上传输。客户端发送请求后,必须等待服务器响应才能发送下一个请求。服务器无法主动向客户端推送数据。
1.2.2 轮询 (Polling) 和长轮询 (Long Polling) 的不足

为了在HTTP模型下模拟实时通信,开发人员采用了几种技术:

  • 短轮询 (Short Polling):客户端每隔N秒向服务器发送一次HTTP请求,询问是否有新数据。
    • 优点:实现简单。
    • 缺点
      • 效率低下:在大多数情况下,服务器可能没有新数据,导致大量空请求,浪费带宽和服务器资源。
      • 高延迟:新数据产生后,客户端必须等到下一个轮询周期才能获取。
  • 长轮询 (Long Polling):客户端发送一个HTTP请求,服务器在没有新数据时会保持连接打开,直到有新数据或达到超时时间。一旦有数据,服务器立即发送响应并关闭连接。客户端收到响应后,立即发送新的长轮询请求。
    • 优点:相对短轮询降低了延迟,减少了空请求。
    • 缺点
      • 复杂性:服务器端需要维护大量处于挂起状态的连接,管理这些连接的生命周期、超时等变得复杂。
      • 连接开销:每个新的数据推送都需要重新建立HTTP请求和响应头,依然存在一定的开销。
      • 防火墙和代理问题:长时间的挂起连接可能被中间网络设备(如防火墙、代理服务器)认为是死连接而关闭。

尽管这些技术可以在一定程度上缓解HTTP的局限性,但它们都不是真正的双向实时通信方案,并且在资源消耗、延迟和实现复杂性方面都有明显的不足。这些不足正是WebSocket协议诞生的根本原因。

1.3 WebSocket的诞生背景与核心优势

为了解决HTTP在实时通信方面的不足,WebSocket协议应运而生。它旨在提供一个在单个TCP连接上进行全双工通信的机制,从而实现客户端和服务器之间的实时、低延迟、高效的数据交换。

1.3.1 WebSocket的诞生背景

WebSocket协议最早在2008年由HTML5草案提出,并于2011年由IETF标准化为RFC 6455。它的出现是为了满足Web应用程序日益增长的实时交互需求,例如:

  • 实时数据更新:如体育比赛得分、股票行情、天气预报等。
  • 即时通讯:聊天应用、客服系统。
  • 在线协作:多人同时编辑文档、共享白板。
  • 游戏:多人在线游戏的状态同步和指令传输。
  • IoT设备监控与控制:实时传输传感器数据,远程控制设备。
1.3.2 WebSocket的核心优势

WebSocket协议通过以下几个关键特性,完美解决了传统HTTP在实时通信中的痛点:

  1. 全双工通信 (Full-Duplex Communication)

    • 这是WebSocket最核心的优势。一旦WebSocket连接建立,客户端和服务器可以同时独立地发送和接收数据,无需等待对方的响应。这就像一条双向高速公路,数据可以在两个方向上同时流动,极大地提高了通信效率和实时性。
    • 相比之下,HTTP是半双工的,同一时间只能在一个方向上传输数据(请求或响应)。
  2. 单个TCP连接 (Single TCP Connection)

    • WebSocket连接一旦建立,就一直保持开放状态,除非显式关闭或遇到网络故障。
    • 这意味着客户端和服务器之间只需要进行一次TCP握手(三次握手)和HTTP握手(一次协议升级),后续的数据传输无需重复建立连接。
    • 这显著减少了网络延迟和服务器资源开销,避免了HTTP轮询和长轮询中频繁的连接建立和关闭带来的额外负担。
  3. 协议头部开销小 (Minimal Overhead)

    • 在WebSocket连接建立后,后续的数据帧传输仅包含极小的帧头(通常只有几字节),而不是像HTTP请求和响应那样包含大量冗余的HTTP头部信息。
    • 例如,一个简单的WebSocket文本消息可能只需要2-14字节的帧头,而一个HTTP请求可能需要数百字节的头部。这极大地节省了带宽,尤其是在传输大量小消息时,效果更为显著。
  4. 服务器主动推送 (Server Push)

    • WebSocket允许服务器在有新数据时,无需客户端请求,即可主动将数据推送到客户端。这是实现真正实时交互的关键。
    • 这与HTTP的长轮询机制有本质区别:长轮询是客户端发起请求并保持连接,服务器才能响应;而WebSocket是连接建立后,双方都可以随时主动发送数据。
  5. 跨域通信 (Cross-Origin Communication)

    • WebSocket协议天然支持跨域通信,其同源策略与HTTP有所不同。在WebSocket握手阶段,服务器会检查Origin头部,并可以决定是否允许该源的连接。一旦连接建立,后续的数据传输不受同源策略限制,这简化了Web应用开发中的跨域问题。
  6. 更好的网络兼容性 (Better Network Compatibility)

    • WebSocket协议设计时考虑了HTTP兼容性,通过标准的HTTP/1.1升级机制来发起连接。这意味着它可以通过现有的HTTP端口(80/443)进行通信,更容易穿透防火墙和代理服务器。

综合来看,WebSocket协议为Web应用程序带来了真正的实时双向通信能力,显著提高了通信效率,降低了资源消耗和延迟,是现代实时Web应用不可或缺的核心技术。

1.4 WebSocket协议初探:握手机制的深度解析

WebSocket协议的建立过程是一个关键且精妙的部分,它利用了HTTP/1.1的“协议升级”(Upgrade)机制。这个过程被称为WebSocket握手(Handshake)。

1.4.1 协议升级(Upgrade)机制

HTTP/1.1引入了一个名为Upgrade的通用头部字段,允许客户端请求服务器将当前连接从一个协议切换到另一个协议。例如,客户端可以请求将一个普通HTTP连接升级到TLS/SSL(HTTPS),或者在WebSocket的场景中,升级到WebSocket协议。

当客户端发送一个带有UpgradeConnection: Upgrade头部的HTTP请求时,它实际上是在告诉服务器:“我想将这个TCP连接从HTTP协议切换到WebSocket协议。”如果服务器同意,它会返回一个特殊的HTTP响应,表明协议升级成功。此后,这个TCP连接上就不再传输HTTP消息,而是传输WebSocket帧。

1.4.2 WebSocket握手流程详解

WebSocket握手发生在TCP连接建立之后,HTTP协议头交换之前。整个流程可以概括为以下几个步骤:

  1. 客户端发起HTTP请求:客户端(通常是浏览器或一个WebSocket客户端库)向服务器发送一个特殊的HTTP GET请求。这个请求看起来像一个普通的HTTP请求,但包含一些特定的头部字段,表明其意图是发起WebSocket连接。

    GET /chat HTTP/1.1 # 请求行,路径通常用于区分不同的WebSocket服务Host: server.example.com # 目标主机,与HTTP请求一致Upgrade: websocket # 关键头部,表示客户端希望升级到WebSocket协议Connection: Upgrade # 关键头部,与Upgrade配合,表示进行连接升级Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== # 安全密钥,客户端随机生成,用于服务器生成响应密钥,防止代理缓存攻击Sec-WebSocket-Version: 13 # WebSocket协议版本,当前稳定版本是13Origin: http://example.com # 请求源,用于服务器进行同源策略验证,防止CSRF攻击User-Agent: Mozilla/5.0 (... )  # 用户代理,提供客户端信息Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits # 可选,请求扩展,如消息压缩Sec-WebSocket-Protocol: chat, superchat  # 可选,请求子协议,用于区分应用层协议
    • GET /chat HTTP/1.1: 这是一个标准的HTTP GET请求,/chat 是请求的URI路径,用于指示服务器上哪个WebSocket服务应该处理此连接。
    • Host: server.example.com: 指定了请求的目标主机。
    • Upgrade: websocket: 这是一个关键的HTTP头部,明确告诉服务器客户端希望将当前HTTP连接“升级”为WebSocket协议。
    • Connection: Upgrade: 这是另一个关键的HTTP头部,与Upgrade头部配合使用,指示HTTP/1.1的通用升级机制正在被使用。
    • Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==: 这是一个由客户端随机生成的Base64编码的16字节(128位)随机值。它的主要作用是增加握手的安全性,防止恶意缓存代理的攻击。服务器必须使用这个值来计算一个响应值,以证明其理解并支持WebSocket协议。
    • Sec-WebSocket-Version: 13: 指定了客户端支持的WebSocket协议版本。目前最新的稳定版本是RFC 6455定义的版本13。
    • Origin: http://example.com: 类似于HTTP请求中的Referer头部,指示了发起WebSocket连接的Web页面的源。服务器可以根据这个头部进行安全检查,例如阻止来自不允许的域名的连接。
    • User-Agent: Mozilla/5.0 (... ): 客户端的User-Agent字符串,提供关于客户端软件的信息。
    • Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits: 客户端请求的WebSocket扩展列表。permessage-deflate 是最常用的扩展,用于启用消息级别的压缩,提高数据传输效率。
    • Sec-WebSocket-Protocol: chat, superchat: 客户端请求的子协议列表。WebSocket本身不定义应用层消息的格式或含义,子协议允许在WebSocket连接之上运行多个更高级的应用层协议。例如,一个聊天应用可能使用一个“chat”子协议。服务器会从列表中选择一个它支持的子协议作为响应。
  2. 服务器响应HTTP 101状态码:如果服务器同意升级到WebSocket协议,它会发送一个特殊的HTTP响应,状态码为101 Switching Protocols。这个状态码明确表示服务器正在执行协议切换。

    HTTP/1.1 101 Switching Protocols  # 状态码101表示协议切换成功Upgrade: websocket # 确认升级到WebSocket协议Connection: Upgrade # 确认连接升级Sec-WebSocket-Accept: s3pPLMBiTzSykCbRzihAENQnQJk= # 服务器根据Sec-WebSocket-Key计算出的响应密钥Sec-WebSocket-Protocol: chat# 如果客户端请求了子协议,服务器会选择一个并在此返回
    • HTTP/1.1 101 Switching Protocols: 这是服务器同意协议升级的关键响应。
    • Upgrade: websocket: 再次确认升级到WebSocket协议。
    • Connection: Upgrade: 再次确认连接升级。
    • Sec-WebSocket-Accept: s3pPLMBiTzSykCbRzihAENQnQJk=: 这是服务器对Sec-WebSocket-Key的响应。服务器的计算方式是:将客户端发送的Sec-WebSocket-Key与一个固定的GUID(258EAFA5-E914-47DA-95CA-C5AB0DC85B11)拼接起来,然后对这个字符串进行SHA-1哈希计算,最后将哈希结果进行Base64编码。
      • 计算过程:
        1. 将客户端的Sec-WebSocket-Key (dGhlIHNhbXBsZSBub25jZQ==) 与 258EAFA5-E914-47DA-95CA-C5AB0DC85B11 拼接。
        2. 对拼接后的字符串进行SHA-1散列。
        3. 将SHA-1散列结果进行Base64编码。
      • 客户端收到此响应后,会执行相同的计算,并比对服务器返回的Sec-WebSocket-Accept值是否一致。如果一致,则确认服务器理解WebSocket协议,且并非恶意代理。这是防止缓存中毒和跨协议攻击的关键机制。
    • Sec-WebSocket-Protocol: chat: 如果客户端在请求中指定了多个子协议,服务器会在此处返回它选择并接受的那个子协议。如果服务器不支持任何客户端请求的子协议,它将不包含此头部。
  3. 握手完成,开始WebSocket数据帧传输

    • 一旦客户端收到并验证了101 Switching Protocols响应,握手过程就宣告完成。
    • 从这一刻起,这个TCP连接上不再传输HTTP消息,而是转变为传输WebSocket数据帧。客户端和服务器可以开始通过这个全双工连接发送和接收WebSocket数据帧了。
1.4.3 握手阶段的安全性考量

WebSocket握手阶段的设计考虑了多方面的安全性:

  • Sec-WebSocket-KeySec-WebSocket-Accept:这两个头部字段是防止缓存代理中毒的关键。恶意代理可能尝试缓存WebSocket握手响应,或者将HTTP响应伪装成WebSocket握手响应。通过这个挑战-响应机制,客户端可以验证服务器确实理解WebSocket协议,而不是一个返回了错误HTTP响应的普通HTTP服务器或代理。
  • Origin 头部Origin 头部提供了发起连接的客户端脚本的源(协议、域名、端口)。服务器可以检查这个头部,根据自己的安全策略(例如,只允许来自特定域名的连接)来决定是否接受WebSocket连接。这有助于防止跨站点请求伪造(CSRF)攻击,尽管对于WebSocket而言,其与HTTP的同源策略有所不同,但Origin 检查仍是第一道防线。
  • WSS (WebSocket Secure):类似HTTP的HTTPS,WebSocket也支持基于TLS/SSL的加密连接,即WSS协议(wss://)。当使用wss://前缀时,WebSocket连接在TCP层之上会首先进行TLS握手,建立加密隧道,然后再进行WebSocket协议握手。这确保了数据在传输过程中的机密性、完整性和认证性,是生产环境中强烈推荐的部署方式。
1.4.4 握手失败场景

如果WebSocket握手失败,例如:

  • 服务器不支持WebSocket协议。
  • 服务器拒绝了客户端的Origin
  • 服务器无法计算出正确的Sec-WebSocket-Accept值。
  • 服务器由于其他安全策略或内部错误拒绝连接。

在这种情况下,服务器会返回一个标准的HTTP错误状态码(例如,400 Bad Request403 Forbidden404 Not Found500 Internal Server Error等),而不是101 Switching Protocols。客户端会识别这是一个HTTP错误,而不是成功的WebSocket升级,从而终止连接或进行相应的错误处理。

深入理解WebSocket的握手机制对于构建稳健的WebSocket应用程序至关重要。它不仅是连接建立的必要步骤,更是协议安全性和兼容性的体现。从握手成功的那一刻起,我们便进入了WebSocket帧的世界,开始真正的全双工实时通信。

1.5 WebSocket数据帧结构与通信原理

一旦WebSocket握手成功,客户端和服务器之间就不再传输HTTP消息,而是通过一系列精心设计的数据帧(Frame)进行通信。理解这些数据帧的结构和编解码原理,是掌握WebSocket底层通信机制的关键。

1.5.1 WebSocket帧的基本结构

WebSocket协议定义了数据帧的基本格式,它允许在单个TCP连接上同时传输多种类型的数据(文本、二进制)以及控制消息。每个WebSocket帧都由一个固定的头部和可选的扩展数据、应用数据组成。

以下是WebSocket数据帧的通用结构概览:

0  1  2  30 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1+-+-+-+-+-------+-+-------------+-------------------------------+|F I N|R S V 1|R S V 2|R S V 3|o p c o d e|M A S K| Payload len |+-+-+-+-+-------+-+-------------+-------------------------------+| Extended payload length (if payload len is 126 or 127) || ... (2 or 8 bytes)  |+-+-+-+-+-------------------------------------------------------+| Masking-key (4 bytes) (if M A S K is set)  |+-+-+-+-+-------------------------------------------------------+| Payload Data (length of Payload len) || ...  |+-+-+-+-+-------------------------------------------------------+

我们将逐一解析这些字段:

  1. FIN (Final Fragment):1位。

    • FIN 位表示当前帧是否是消息的最后一个片段。
    • 如果 FIN1,表示这是消息的最后一个片段,或整个消息只有一个帧。
    • 如果 FIN0,表示后续还有更多片段,当前帧是一个分片消息的中间片段。WebSocket允许将一条大消息分割成多个小帧进行传输,这在网络条件不稳定或需要处理非常大的消息时很有用。
  2. RSV1, RSV2, RSV3 (Reserved Bits):各1位。

    • 这三个位是保留位,通常必须为 0
    • 它们用于定义协议的未来扩展。如果使用了扩展(例如 permessage-deflate 消息压缩扩展),这些位可能会被设置为非零值,用于指示扩展的特定行为。如果一个端点收到了一个设置为非零的保留位,而它不支持相应的扩展,那么它必须关闭连接。
  3. Opcode (Operation Code):4位。

    • Opcode 定义了帧的类型,指示负载数据(Payload Data)的解释方式。
    • 常见的 Opcode 值:
      • %x0 (0):连续帧 (Continuation Frame)。当一条消息被分片发送时,除了第一个和最后一个片段,中间的片段都使用这个 Opcode。
      • %x1 (1):文本帧 (Text Frame)。负载数据是UTF-8编码的文本。
      • %x2 (2):二进制帧 (Binary Frame)。负载数据是任意二进制数据。
      • %x8 (8):连接关闭帧 (Connection Close Frame)。用于发起或响应关闭WebSocket连接的请求。
      • %x9 (9):Ping帧 (Ping Frame)。一种控制帧,用于检测连接的活跃性(心跳)。
      • %xA (10):Pong帧 (Pong Frame)。对Ping帧的响应,表明连接仍然活跃。
    • %x3 - %x7%xB - %xF 是保留 Opcode,用于未来的协议扩展。
  4. MASK (Masking Bit):1位。

    • MASK 位表示负载数据是否被掩码处理。
    • 如果 MASK1,表示负载数据已被掩码处理,并且存在一个4字节的 Masking-key客户端发送到服务器的所有帧都必须被掩码处理 (MASK1),否则服务器会关闭连接。
    • 如果 MASK0,表示负载数据未被掩码处理,且没有 Masking-key服务器发送到客户端的所有帧都必须不被掩码处理 (MASK0)。
    • 这个机制是为了防止代理缓存投毒攻击,确保客户端发起连接请求的安全性。
  5. Payload len (Payload Length):7位、7+16位 或 7+64位。

    • Payload len 表示负载数据的长度(以字节为单位)。
    • 如果 Payload len 字段的值是 0-125,那么它直接表示负载数据的长度。
    • 如果 Payload len 字段的值是 126,那么接下来的2个字节(16位无符号整数)表示实际的负载数据长度。这允许负载长度最大达到 65535 字节。
    • 如果 Payload len 字段的值是 127,那么接下来的8个字节(64位无符号整数)表示实际的负载数据长度。这允许负载长度达到一个非常大的值(2^63 - 1 字节)。
    • 这种可变长度的编码方式允许WebSocket高效地传输不同大小的消息,从小消息到超大消息。
  6. Masking-key (掩码键):4字节 (32位)。

    • 只有当 MASK 位为 1 时才存在。这是一个由客户端随机生成的32位值。
    • 负载数据在发送前会与这个Masking-key进行异或运算,从而实现掩码处理。服务器收到掩码处理后的负载数据和Masking-key后,会使用相同的Masking-key再次进行异或运算,从而还原出原始负载数据。
    • 掩码运算规则
      transformed-octet-i = original-octet-i XOR masking-key-octet-(i % 4)
      其中 i 是负载数据中的字节索引。
  7. Payload Data (负载数据):长度由 Payload len 字段决定。

    • 这部分包含实际的应用数据,例如文本消息的UTF-8编码字节,或二进制文件的原始字节。
    • 如果启用了扩展(如压缩),Payload Data可能是压缩后的数据。
1.5.2 帧类型与分片机制

WebSocket帧可以分为两类:数据帧和控制帧。

  • 数据帧 (Data Frames)

    • Opcode = %x1 (文本帧)
    • Opcode = %x2 (二进制帧)
    • Opcode = %x0 (连续帧)
    • 数据帧用于传输实际的应用数据。它们可以被分片传输(FIN=0),允许发送非常大的消息而无需一次性加载到内存,或者在网络拥塞时分批发送。
    • 当消息被分片时,第一个片段是一个带有实际 Opcode(文本或二进制)的帧,后续所有中间片段和最后一个片段都使用 Opcode = %x0(连续帧)。最后一个片段的 FIN 位必须设置为 1
  • 控制帧 (Control Frames)

    • Opcode = %x8 (连接关闭帧)
    • Opcode = %x9 (Ping帧)
    • Opcode = %xA (Pong帧)
    • 控制帧用于管理WebSocket连接。它们不能被分片 (FIN 位必须为 1),且负载数据的长度不能超过125字节。
    • Ping/Pong机制
      • Ping帧 (Opcode = %x9) 可以由客户端或服务器发送,用于检测连接是否仍然活跃,或作为心跳机制的一部分。负载数据可以是任意的,通常用于携带连接标识或时间戳。
      • 接收到Ping帧的一方必须在合理的时间内回复一个 Pong帧 (Opcode = %xA)。Pong帧的负载数据必须与收到的Ping帧的负载数据完全相同。Ping/Pong机制是WebSocket实现可靠性和保持连接活跃性的重要手段。
    • 关闭帧 (Close Frame)
      • 当任一方希望关闭WebSocket连接时,会发送一个关闭帧 (Opcode = %x8)。关闭帧的负载数据通常包含一个两字节的状态码(例如 1000 表示正常关闭)和一个可选的UTF-8编码的关闭原因。
      • 接收到关闭帧的一方会回复一个关闭帧,然后双方关闭底层的TCP连接。这是一个优雅关闭连接的协商过程。
1.5.3 数据帧的传输与解析

当一个端点要发送一条消息时,它会将消息数据封装在一个或多个WebSocket帧中。
如果消息是文本,它会被编码为UTF-8字节序列。
如果消息是二进制,它就是原始字节序列。

发送方(例如客户端)的流程:

  1. 准备要发送的数据(文本或二进制)。
  2. 将数据封装到WebSocket帧中,设置正确的 Opcode(文本帧或二进制帧)。
  3. 如果数据很长,可以选择分片发送。
  4. 为帧设置 FIN 位(1 表示最后一个片段或非分片消息,0 表示中间片段)。
  5. 如果发送方是客户端,必须生成一个4字节的随机 Masking-key,将 MASK 位设置为 1,然后使用该 Masking-keyPayload Data 进行异或运算。
  6. 将完整的帧(包括头部、扩展长度、掩码键、掩码处理后的负载数据)发送到TCP连接。

接收方(例如服务器)的流程:

  1. 从TCP连接中读取字节流。
  2. 解析帧头部,提取 FINRSVOpcodeMASKPayload len
  3. 根据 Payload len 确定负载数据的实际长度和后续字节的读取方式(2字节或8字节的扩展长度)。
  4. 如果 MASK 位为 1,读取4字节的 Masking-key,然后使用该 Masking-key 对接收到的 Payload Data 进行异或运算,还原出原始负载数据。(注意:服务器收到客户端帧时 MASK 总是为 1
  5. 根据 Opcode 解释负载数据:
    • 如果是文本帧,将负载数据解码为UTF-8字符串。
    • 如果是二进制帧,直接处理二进制数据。
    • 如果是控制帧(Ping/Pong/Close),执行相应的控制逻辑。
  6. 如果 FIN 位为 0,则表示这是一个分片消息的中间片段,接收方应继续接收后续片段,直到收到 FIN=1 的片段,然后将所有片段的负载数据拼接起来形成完整的消息。

这个帧结构和通信原理是WebSocket协议高效和灵活的关键所在。它允许在同一个连接上可靠地传输各种类型的数据和控制信息,并提供了心跳和优雅关闭的机制,使得WebSocket成为构建实时应用的首选方案。

1.6 Python实现WebSocket通信的库生态概览

Python生态系统中存在多个库来支持WebSocket通信,从底层的网络编程到高级的异步框架集成,各有侧重。选择合适的库取决于项目的具体需求、对异步编程的熟悉程度以及性能要求。

1.6.1 核心异步编程模型:asyncio

在Python中,高效地处理大量并发的I/O操作(如网络通信)通常依赖于异步编程。Python 3.4 引入的 asyncio 库是Python标准库中用于编写并发代码的核心框架,它使用协程(coroutine)和事件循环(event loop)来实现非阻塞I/O。

WebSocket通信的特性(长连接、全双工、高并发)使其与异步编程模型天然契合。几乎所有主流的Python WebSocket库都基于 asyncio 或其他异步框架(如Twisted,但 asyncio 是主流)。理解 asyncio 的基本概念,如 async defawaitevent loopTasksFutures,对于使用这些库至关重要。

1.6.2 主流WebSocket库介绍
  1. websockets (推荐,基于asyncio)

    • 特点:这是目前Python社区中最受欢迎、最成熟且功能最丰富的WebSocket库之一。它完全基于 asyncio 构建,提供了客户端和服务器端的完整实现。
    • 优势
      • 完全异步:完美集成 asyncio,能够高效处理大量并发连接。
      • 协议符合性:严格遵循RFC 6455 WebSocket协议标准,提供了健壮的实现。
      • 易用性:提供了简洁的API用于创建WebSocket服务器和客户端。
      • 功能丰富:支持多种特性,如SSL/TLS加密 (WSS)、ping/pong心跳、消息分片、各种控制帧、HTTP代理、扩展 (如permessage-deflate压缩)、子协议协商。
      • 活跃开发:社区活跃,维护良好。
    • 适用场景:任何需要高性能、高并发、协议完整性的WebSocket应用,无论是独立的WebSocket服务器还是作为现有异步Web框架的补充。
    • 代码示例(概念)
      import asyncio # 导入asyncio库,用于异步编程import websockets # 导入websockets库,用于WebSocket通信# 定义一个异步函数,用于处理WebSocket连接async def echo_server(websocket, path): # websocket对象代表客户端连接,path是请求路径 async for message in websocket: # 异步迭代接收来自客户端的消息 print(f\"Received message: {  message}\") # 打印收到的消息 await websocket.send(f\"Echo: {  message}\") # 异步发送回声消息给客户端# 启动WebSocket服务器async def main(): # 定义主异步函数 # 启动一个WebSocket服务器,监听所有IP地址的8765端口,并使用echo_server处理连接 server = await websockets.serve(echo_server, \"0.0.0.0\", 8765) await server.wait_closed() # 等待服务器关闭if __name__ == \"__main__\": # 程序的入口点 asyncio.run(main()) # 运行主异步函数

      上述代码片段展示了使用 websockets 库创建一个简单的WebSocket回声服务器的基本结构。在后续章节中,我们将对 websockets 库进行极其详尽的剖析和实战。

  2. fastapi / starlette (Web框架集成)

    • 特点FastAPI 是一个现代、快速(高性能)的Web框架,用于构建API。它底层基于 Starlette,而 Starlette 提供了对ASGI(Asynchronous Server Gateway Interface)标准的支持,包括WebSocket。
    • 优势
      • 集成Web API:如果你已经在使用FastAPI构建RESTful API,那么在同一个框架内添加WebSocket功能非常自然。
      • 高性能:得益于Starlette和Uvicorn(ASGI服务器)。
      • 类型提示:FastAPI利用Python的类型提示进行数据验证和序列化,并自动生成OpenAPI文档。
      • 简单易用:WebSocket路由和处理函数的定义非常直观。
    • 适用场景:需要同时提供RESTful API和WebSocket实时通信的Web应用。
    • 代码示例(概念)
      from fastapi import FastAPI, WebSocket # 从FastAPI导入应用类和WebSocket类import uvicorn # 导入uvicorn,用于运行ASGI应用app = FastAPI() # 创建FastAPI应用实例@app.websocket(\"/ws\") # 定义一个WebSocket路由,当客户端连接到/ws路径时,会调用下面的函数async def websocket_endpoint(websocket: WebSocket): # websocket参数代表客户端连接 await websocket.accept() # 接受WebSocket连接,完成握手 try: while True: # 循环接收和发送消息 data = await websocket.receive_text() # 异步接收文本消息 await websocket.send_text(f\"Message text was: {  data}\") # 异步发送文本消息 except Exception as e: # 捕获异常,例如客户端断开连接 print(f\"WebSocket Error: {  e}\") # 打印错误信息 finally: await websocket.close() # 确保WebSocket连接被关闭if __name__ == \"__main__\": # 程序的入口点 uvicorn.run(app, host=\"0.0.0.0\", port=8000) # 使用uvicorn运行FastAPI应用

      这个示例展示了FastAPI如何简洁地集成WebSocket功能。虽然它提供了高层抽象,但底层依然依赖ASGI服务器(如Uvicorn)来处理WebSocket协议细节。

  3. Sanic (异步Web框架集成)

    • 特点Sanic 是一个异步Web框架,设计之初就考虑了快速性。它也支持ASGI规范,因此能够很好地集成WebSocket。
    • 优势
      • 异步特性:天生异步,性能出色。
      • 易于使用:API设计简洁。
    • 适用场景:与FastAPI类似,适用于需要Web框架和WebSocket集成的情况。
  4. Django Channels (Django框架集成)

    • 特点Django Channels 是Django框架的一个官方扩展,它将Django的同步请求-响应模型扩展为异步模型,支持WebSocket、HTTP/2等协议。
    • 优势
      • Django生态集成:如果你正在使用Django开发Web应用,Channels是添加实时功能的最佳选择,可以直接利用Django ORM、认证系统等。
      • Channel Layer:引入了“Channel Layer”的概念,允许不同进程或服务器实例之间通过消息队列(如Redis)进行通信,实现多服务器部署下的WebSocket消息广播。
    • 适用场景:基于Django的大型Web应用,需要实时功能且要求高可伸缩性。
  5. 低层Socket编程 (不推荐直接用于生产)

    • Python的 socket 模块提供了对TCP/IP协议的底层访问。理论上,你可以完全从零开始使用 socket 模块来实现WebSocket协议的握手、帧解析和构建。
    • 优点:对协议细节有极致的控制,能深入理解底层机制。
    • 缺点
      • 复杂性高:需要手动处理所有WebSocket协议细节(握手、帧编码/解码、掩码、心跳、错误处理、分片等),开发工作量巨大,且极易出错。
      • 不推荐:除非是学习目的或有极其特殊的需求,否则在生产环境中不建议直接使用 socket 模块实现WebSocket,而是使用成熟的库。

第2章:websockets 库核心实践:从基础构建到高级通信

本章将聚焦于Python中最强大且广泛使用的WebSocket库——websockets。我们将从其安装和环境配置开始,逐步深入到构建服务器和客户端的每一个细节,解析其底层机制,并探讨如何处理各种消息类型、管理连接状态以及实现高效的实时通信。

2.1 websockets 库安装与环境准备

在开始编写WebSocket应用之前,我们首先需要确保Python环境配置正确,并安装 websockets 库。

2.1.1 Python版本要求

websockets 库高度依赖Python的异步I/O(asyncio)特性。因此,它要求Python版本为 3.6 或更高。推荐使用最新稳定的Python 3.x 版本,例如Python 3.83.93.10 及以上,这些版本对 asyncio 进行了持续的性能优化和功能增强。

如果你使用的Python版本低于 3.6,则无法直接安装或运行 websockets 库,你需要升级你的Python解释器。可以通过以下命令检查当前Python版本:

python --version # 查看当前系统默认的Python版本

或者

python3 --version # 在某些系统上,python3 指向的是 Python 3.x 版本

如果你的系统中安装了多个Python版本,确保你激活的是符合要求的版本。

2.1.2 虚拟环境最佳实践

在Python开发中,强烈推荐为每个项目创建独立的虚拟环境(Virtual Environment)。虚拟环境可以隔离项目的依赖包,避免不同项目之间库版本冲突的问题,保持项目环境的清洁和可移植性。

创建和激活虚拟环境的常用步骤如下:

  1. 创建虚拟环境
    在项目根目录下执行以下命令,创建一个名为 venv 的虚拟环境(名称可自定义)。

    python -m venv venv # 使用venv模块创建一个名为venv的虚拟环境

    这条命令会创建一个新的目录 venv,其中包含了Python解释器的副本以及 pip 工具。

  2. 激活虚拟环境
    激活虚拟环境后,所有通过 pip 安装的包都将安装到这个独立的虚拟环境中,而不会影响系统全局的Python环境。

    • 在Windows系统上
      .\\venv\\Scripts\\activate # 在PowerShell或CMD中激活虚拟环境
    • 在macOS/Linux系统上
      source venv/bin/activate # 在Bash或Zsh中激活虚拟环境

    激活成功后,你的命令行提示符通常会显示虚拟环境的名称(例如 (venv)),表示你当前的操作都在虚拟环境内。

  3. 退出虚拟环境
    当你完成项目开发或需要切换到其他项目时,可以使用 deactivate 命令退出当前虚拟环境。

    deactivate # 退出当前激活的虚拟环境

始终在激活的虚拟环境中安装和运行 websockets 及其相关依赖。

2.1.3 pip 安装命令及其选项

激活虚拟环境后,使用Python的包管理器 pip 来安装 websockets 库。

  1. 基本安装

    pip install websockets # 使用pip安装websockets库

    这条命令会下载并安装最新稳定版本的 websockets 库及其所有必需的依赖项。

  2. 安装特定版本
    有时,你可能需要安装 websockets 的特定版本,以确保与你的项目或现有代码的兼容性。

    pip install websockets==10.4 # 安装websockets的10.4版本

    请根据实际需求替换 10.4 为你所需的版本号。

  3. 升级 websockets
    如果你已经安装了 websockets,并希望升级到最新版本。

    pip install --upgrade websockets # 升级websockets库到最新版本
  4. 安装带有额外依赖的 websockets
    websockets 库本身是轻量级的,但为了支持一些高级功能,例如通过HTTP代理连接,你可能需要安装额外的依赖。

    pip install websockets[http] # 安装websockets及其HTTP代理支持所需的额外依赖

    目前,websockets 库的额外依赖项主要是 http,用于支持通过HTTP代理服务器建立WebSocket连接。在内部,它会安装 httpx 库。

安装完成后,你可以通过尝试导入 websockets 模块来验证安装是否成功:

import websockets # 尝试导入websockets模块print(\"websockets installed successfully!\") # 如果导入成功,则打印此消息

如果在执行此Python脚本时没有出现 ModuleNotFoundError 错误,则表示 websockets 已正确安装。

2.1.4 依赖项解析 (asyncio, etc.)

websockets 库的核心依赖是Python标准库中的 asyncioasyncio 提供了一个框架,用于使用协程(coroutines)编写并发代码,并通过事件循环(event loop)来管理非阻塞I/O操作。

websockets 库正是建立在 asyncio 之上的。这意味着:

  • 协程的使用:你在 websockets 应用中编写的所有处理函数都必须是异步函数(使用 async def 定义),并且在调用I/O密集型操作(如发送、接收消息)时必须使用 await 关键字。
  • 事件循环websockets 服务器的运行和客户端的连接都需要在一个 asyncio 事件循环中进行调度。当你使用 asyncio.run() 启动一个异步入口点时,它会自动创建一个事件循环并运行你的主协程。
  • 非阻塞I/Owebsockets 库内部实现了底层的TCP Socket操作,但这些操作都是非阻塞的。当一个WebSocket连接等待数据到达或数据被发送出去时,事件循环可以切换到处理其他连接或任务,从而实现高并发。

除了 asynciowebsockets 库可能还会间接依赖一些其他标准库模块,如 socketssl(用于WSS安全连接)、struct(用于处理帧的二进制编码)等,但这些都是Python解释器自带的,无需额外安装。

理解 asyncio 的基础是高效使用 websockets 的前提。如果你对 asyncio 不熟悉,建议先阅读其官方文档或相关教程,掌握 async defawaitTaskFuture、事件循环等核心概念。

2.2 构建第一个WebSocket服务器

我们将从构建一个最简单的WebSocket服务器开始,它将实现一个“回声”(Echo)功能:客户端发送什么消息,服务器就原封不动地发回什么消息。这个例子将帮助你理解 websockets 服务器的核心工作原理。

2.2.1 基础服务器结构:websockets.serve

websockets 库提供了 websockets.serve() 函数来创建和启动一个WebSocket服务器。这个函数是一个协程,它需要一个异步函数作为其连接处理程序,并指定监听的IP地址和端口。

import asyncio # 导入asyncio库,用于支持异步编程和事件循环管理import websockets # 导入websockets库,用于构建WebSocket服务器和客户端# 1. 定义一个异步函数,作为WebSocket连接的处理程序。# 这个函数会在每个新的WebSocket连接建立时被调用一次。# websocket: 这是websockets库提供的WebSocketProtocol对象,代表了与当前客户端建立的WebSocket连接。# 通过这个对象,我们可以发送和接收数据。# path: 这是客户端连接时请求的URI路径(例如,如果客户端连接的是 ws://localhost:8765/echo,那么path就是 \"/echo\")。# 在简单的回声服务器中,我们可能不会使用它,但在更复杂的应用中,可以根据path来路由不同的服务。async def echo_server(websocket, path): # 定义一个异步函数来处理每个新的WebSocket连接 print(f\"新连接来自: { websocket.remote_address}, 路径: { path}\") # 打印新连接的远程地址和请求路径 try: # 使用try-except-finally块来优雅地处理连接的生命周期和可能出现的异常 # 2. 异步迭代接收来自客户端的消息。 # \'async for message in websocket\' 是一个非常方便的语法糖。 # 它会持续地从WebSocket连接中接收消息,直到连接关闭或发生错误。 # 接收到的消息类型(文本或二进制)会自动被处理为Python的str或bytes类型。 async for message in websocket: # 异步循环,持续接收客户端发送的消息 print(f\"收到来自 { websocket.remote_address} 的消息: { message}\") # 打印收到的消息及其来源 # 3. 将收到的消息原样发送回客户端。 # \'await websocket.send(message)\' 是一个异步操作,用于通过WebSocket连接发送数据。 # 这里直接发送了收到的消息,实现了“回声”功能。 await websocket.send(f\"服务器回应: { message}\") # 将收到的消息添加前缀后发回客户端 except websockets.exceptions.ConnectionClosedOK: # 捕获客户端正常关闭连接的异常 print(f\"连接 { websocket.remote_address} 正常关闭.\") # 打印连接正常关闭的信息 except websockets.exceptions.ConnectionClosedError as e: # 捕获客户端异常关闭连接的异常 print(f\"连接 { websocket.remote_address} 异常关闭: { e}\") # 打印连接异常关闭的信息和错误详情 except Exception as e: # 捕获所有其他未知异常 print(f\"连接 { websocket.remote_address} 发生未预期错误: { e}\") # 打印未预期错误信息 finally: # 无论连接如何关闭,都会执行finally块中的代码 print(f\"连接 { websocket.remote_address} 处理结束.\") # 打印连接处理结束的信息# 4. 定义主异步函数,用于启动WebSocket服务器。async def main_server(): # 定义一个主异步函数来启动服务器 # websockets.serve() 是一个异步函数,它创建一个WebSocket服务器实例。 # echo_server: 指定处理每个新连接的协程函数。 # \"0.0.0.0\": 监听所有可用的网络接口(IPv4),这意味着服务器可以在本机以及其他设备通过网络访问。 # 如果只希望在本机访问,可以使用 \"127.0.0.1\" (localhost)。 # 8765: 服务器监听的端口号。这是一个非特权端口,通常不会与常见服务冲突。 print(\"WebSocket服务器正在启动...\") # 打印服务器启动信息 server = await websockets.serve(echo_server, \"0.0.0.0\", 8765) # 启动WebSocket服务器 print(\"WebSocket服务器已在 ws://0.0.0.0:8765 监听...\") # 打印服务器监听地址和端口 # server.wait_closed() 是一个协程,它会一直运行,直到服务器被显式关闭。 # 在生产环境中,这通常意味着服务器会持续运行,直到进程被终止。 await server.wait_closed() # 等待服务器关闭,保持服务器运行# 5. 程序的入口点。# asyncio.run(main_server()) 运行main_server协程。# 它会自动创建一个新的asyncio事件循环,并在该循环中运行main_server协程,直到协程完成。# 对于顶层协程,这是启动asyncio应用的推荐方式。if __name__ == \"__main__\": # 检查是否作为主程序运行 asyncio.run(main_server()) # 运行主服务器启动函数

运行上述服务器代码的步骤:

  1. 将代码保存为 server.py
  2. 打开终端或命令行。
  3. 激活你的Python虚拟环境(如果已创建)。
  4. 运行 python server.py 命令。
    服务器将启动并输出 WebSocket服务器已在 ws://0.0.0.0:8765 监听...
2.2.2 详解服务器端消息处理:文本与二进制

echo_server 函数中,我们使用了 async for message in websocket: 来接收消息。websockets 库的这种抽象极大地简化了消息处理。它会自动处理WebSocket帧的解析、掩码去除以及消息类型(文本或二进制)的判断。

  • 接收文本消息
    当客户端发送一个文本帧(Opcode = %x1)时,async for 循环中的 message 变量将是一个Python str 类型。websockets 库会自动将UTF-8编码的负载数据解码为Python字符串。

    # ... 在 echo_server 函数内部 ...async for message in websocket: # 异步循环接收消息 if isinstance(message, str): # 检查接收到的消息是否为字符串类型(文本消息) print(f\"收到文本消息: {  message}\") # 打印文本消息内容 await websocket.send(f\"服务器已收到文本: {  message}\") # 发送文本响应 else: # 如果不是字符串类型,那就是二进制消息 print(f\"收到未知消息类型(应为二进制): {  type(message)}\") # 打印未知消息类型

    这段代码虽然没有直接在 async for 中区分,但 websocket.recv() 方法(async for 的底层逻辑)会根据WebSocket帧的Opcode自动返回 strbytes

  • 接收二进制消息
    当客户端发送一个二进制帧(Opcode = %x2)时,async for 循环中的 message 变量将是一个Python bytes 类型。websockets 库会直接提供原始的二进制负载数据。

    # ... 在 echo_server 函数内部 ...async for message in websocket: # 异步循环接收消息 if isinstance(message, bytes): # 检查接收到的消息是否为字节串类型(二进制消息) print(f\"收到二进制消息,长度: {  len(message)} 字节\") # 打印二进制消息的长度 # 可以对二进制数据进行处理,例如保存到文件,或者进行图片处理 # 这里简单地将其发送回去,或者加上前缀再发送 await websocket.send(b\"服务器已收到二进制数据: \" + message) # 发送二进制响应 elif isinstance(message, str): # 检查接收到的消息是否为字符串类型(文本消息) print(f\"收到文本消息: {  message}\") # 打印文本消息内容 await websocket.send(f\"服务器已收到文本: {  message}\") # 发送文本响应 else: # 处理其他可能的类型,虽然在websockets库中基本只有str和bytes print(f\"收到非文本/二进制消息: {  type(message)}\") # 打印非预期的消息类型

    注意websockets 库在 async for message in websocket 循环中,会自动处理消息的分片(fragmentation)。如果一个大消息被分成了多个帧发送,websockets 会在后台将这些片段重新组装成一个完整的 strbytes 对象,然后才将其传递给 message 变量。这极大地简化了开发人员的工作,无需手动管理分片和重组。

2.2.3 服务器端并发处理与事件循环

websockets 库是构建在 asyncio 之上的,这意味着它天生支持高并发。当我们说“并发”,通常指的是在同一时间管理多个独立的任务,而不是并行(同时在多个CPU核心上执行)。对于I/O密集型任务(如网络通信),并发通过事件循环实现,可以显著提高程序的吞吐量。

  1. asyncio 事件循环如何管理多个并发WebSocket连接
    asyncio 事件循环是 asyncio 程序的“大脑”。它负责:

    • 注册任务:当一个协程被 asyncio.create_task() 提交给事件循环时,它就成为一个“任务”。
    • 监视I/O事件:事件循环持续地监视套接字(socket)上的I/O事件,例如是否有新的数据可读,或者是否可以写入数据。
    • 调度协程:当一个协程遇到 await 关键字(例如 await websocket.recv()await websocket.send())时,它会暂停执行,并将控制权交还给事件循环。事件循环此时可以去执行其他已准备好的任务。当I/O操作完成时,事件循环会唤醒之前暂停的协程,让它从暂停的地方继续执行。
    • 单线程高效:整个事件循环运行在一个单独的线程中。通过这种协作式多任务(cooperative multitasking)的方式,一个Python进程可以高效地处理成千上万个并发连接,而无需为每个连接创建一个新的线程或进程,从而大大节省了系统资源。

    在我们的 echo_server 示例中:

    • websockets.serve() 启动后,它会接受传入的TCP连接。
    • 每当一个新的TCP连接升级为WebSocket连接时,websockets 库会为 echo_server(websocket, path) 这个协程创建一个新的 asyncio.Task,并将其提交到事件循环中。
    • 因此,即使有100个客户端同时连接到服务器,就会有100个 echo_server 协程任务在事件循环中并发运行。它们之间通过 await 语句协作,不会相互阻塞。
  2. asyncio.create_task 在并发连接中的应用
    尽管 websockets.serve() 内部已经为每个连接处理函数创建了任务,但在更复杂的场景中,你可能需要在单个WebSocket连接的处理函数内部,也启动多个并发子任务。例如,一个客户端连接可能同时需要:

    • 持续接收来自客户端的消息。
    • 定期向客户端发送心跳消息。
    • 监听来自后端系统(如消息队列)的广播消息,并转发给客户端。

    在这种情况下,你可以使用 asyncio.create_task() 来创建并运行这些独立的并发子任务。

    import asyncioimport websocketsimport datetime # 导入datetime模块用于时间操作import json # 导入json模块用于序列化数据connected_clients = set() # 定义一个全局集合,用于存储所有活动的WebSocket连接对象async def send_time_periodically(websocket): # 定义一个异步函数,用于周期性发送时间给客户端 try: # 使用try-except块处理任务中的异常 while True: # 无限循环,持续发送 now = datetime.datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\") # 获取当前时间并格式化 message = json.dumps({ \"type\": \"time_update\", \"time\": now}) # 将时间封装为JSON格式的字符串 await websocket.send(message) # 发送时间消息给客户端 await asyncio.sleep(5) # 等待5秒,然后再次发送 except websockets.exceptions.ConnectionClosed: # 捕获连接关闭异常 print(f\"定时发送任务: 连接 {  websocket.remote_address} 已关闭.\") # 打印任务因连接关闭而结束的信息 except Exception as e: # 捕获其他未知异常 print(f\"定时发送任务异常: {  e}\") # 打印任务异常信息async def chat_handler(websocket, path): # 定义聊天处理函数 connected_clients.add(websocket) # 将新的WebSocket连接添加到全局连接集合中 print(f\"新连接来自: {  websocket.remote_address}, 当前活跃连接数: {  len(connected_clients)}\") # 打印连接信息和当前连接数 # 为这个连接创建一个独立的任务来周期性发送时间,使其与消息接收任务并发运行 # asyncio.create_task(send_time_periodcally(websocket)) 会立即返回一个Task对象, # 并将send_time_periodcally协程调度到事件循环中运行,不会阻塞当前chat_handler的执行。 send_time_task = asyncio.create_task(send_time_periodically(websocket)) # 创建一个任务,用于周期性发送时间 try: # 使用try-except-finally块处理连接的生命周期 async for message in websocket: # 异步循环接收来自客户端的消息 print(f\"收到来自 {  websocket.remote_address} 的消息: {  message}\") # 打印收到的消息 # 收到消息后,向所有连接的客户端广播这条消息 # 注意:这里遍历的是一个集合,如果集合在迭代过程中被修改(例如有连接断开), # 可能会导致RuntimeError。更安全的做法是创建一个副本或使用队列。 # 但对于简单示例,这样足够。 for client in connected_clients: # 遍历所有已连接的客户端 if client != websocket: # 不向发送消息的客户端自己发送  try: # 尝试向客户端发送消息 await client.send(f\"[{  websocket.remote_address}] 说: {  message}\") # 向其他客户端发送广播消息  except websockets.exceptions.ConnectionClosed: # 捕获发送时发现连接已关闭的异常 # 这个连接可能刚刚断开,我们在这里暂时不处理移除,留到finally中统一处理 pass # 忽略已关闭连接的发送失败 except websockets.exceptions.ConnectionClosed: # 捕获连接正常关闭或异常关闭 print(f\"连接 {  websocket.remote_address} 已关闭.\") # 打印连接关闭信息 finally: # 无论连接如何关闭,都会执行finally块 connected_clients.remove(websocket) # 从连接集合中移除当前已关闭的连接 send_time_task.cancel() # 取消周期性发送时间的任务,释放资源 await send_time_task # 等待任务完成取消操作,防止挂起 print(f\"连接 {  websocket.remote_address} 处理结束. 剩余活跃连接数: {  len(connected_clients)}\") # 打印连接处理结束信息async def main_chat_server(): # 主服务器函数 print(\"聊天服务器正在启动...\") # 打印服务器启动信息 # 启动WebSocket服务器,监听所有IP的8765端口,并使用chat_handler处理连接 server = await websockets.serve(chat_handler, \"0.0.0.0\", 8765) # 启动WebSocket服务器 print(\"聊天服务器已在 ws://0.0.0.0:8765 监听...\") # 打印服务器监听地址 await server.wait_closed() # 等待服务器关闭if __name__ == \"__main__\": # 主程序入口 asyncio.run(main_chat_server()) # 运行主服务器函数

    在这个聊天服务器示例中,我们为每个新连接做了两件事:

    • 通过 async for message in websocket: 循环处理来自该客户端的消息。
    • 通过 asyncio.create_task(send_time_periodically(websocket)) 创建了一个独立的协程任务,用于每隔5秒向该客户端发送当前时间。
      这两个任务在同一个事件循环中并发运行,互不阻塞,提高了单个连接的功能性。当连接关闭时,我们也会取消相应的子任务,确保资源被正确释放。
  3. 服务器端连接管理:集合 (set) 来跟踪活动连接
    在WebSocket应用中,服务器经常需要知道所有当前连接的客户端,以便进行消息广播(如聊天室)或定向发送。一种常见的模式是使用一个Python set 集合来存储所有活跃的 websocket 对象。

    • 添加连接:当一个新连接建立时(chat_handler 被调用),将其 websocket 对象添加到 connected_clients 集合中。
      connected_clients.add(websocket) # 将新的websocket连接对象添加到集合中
    • 移除连接:当一个连接关闭(无论是客户端主动关闭、服务器主动关闭,还是因为网络错误),async for 循环会终止,finally 块会被执行。此时,我们从 connected_clients 集合中移除对应的 websocket 对象。
      connected_clients.remove(websocket) # 从集合中移除已关闭的websocket连接对象
    • 广播消息:要向所有连接的客户端发送消息,只需遍历 connected_clients 集合,并对每个 websocket 对象调用 await client.send(message)。在遍历时,最好加入 try-except websockets.exceptions.ConnectionClosed 块,以防在遍历过程中某个连接恰好断开。

    这种集合管理方式简单有效,适用于大多数并发量中等偏上的场景。对于极高并发量或需要跨多个服务器实例进行消息广播的场景,你可能需要引入消息队列(如Redis Pub/Sub、Kafka等)来作为“Channel Layer”,这将在后续章节中深入探讨。

2.3 构建第一个WebSocket客户端

有了服务器,我们也需要一个客户端来与之通信。websockets 库同样提供了简洁的API来创建WebSocket客户端。

2.3.1 基础客户端结构:websockets.connect

websockets.connect() 函数用于建立一个WebSocket连接。它是一个异步上下文管理器,这意味着你可以使用 async with 语句来管理连接的生命周期,确保连接在代码块结束时被正确关闭。

import asyncio # 导入asyncio库,用于异步编程import websockets # 导入websockets库,用于WebSocket客户端# 定义一个异步函数作为客户端的主逻辑async def simple_client(): # 定义一个异步函数,作为WebSocket客户端的主入口 uri = \"ws://localhost:8765\" # 定义WebSocket服务器的URI地址 print(f\"尝试连接到服务器: { uri}\") # 打印连接尝试信息 try: # 使用try-except块处理连接过程中的异常 # websockets.connect() 是一个异步上下文管理器。 # 当进入\'async with\'块时,它会尝试建立WebSocket连接并完成握手。 # 如果成功,websocket对象将可用。当退出\'async with\'块时(无论正常退出还是异常退出), # 连接都会被自动关闭。 async with websockets.connect(uri) as websocket: # 异步上下文管理器,建立WebSocket连接 print(f\"成功连接到服务器: { uri}\") # 打印连接成功信息 # 1. 客户端发送一条消息给服务器 client_message = \"你好,服务器!\" # 定义要发送的文本消息 await websocket.send(client_message) # 异步发送消息 print(f\"客户端发送: { client_message}\") # 打印发送的消息 # 2. 客户端等待并接收服务器的回应 server_response = await websocket.recv() # 异步接收服务器发回的消息 print(f\"客户端接收: { server_response}\") # 打印接收到的回应 # 客户端可以继续发送和接收多条消息 another_message = \"这是我的第二条消息。\" # 定义第二条要发送的消息 await websocket.send(another_message) # 异步发送第二条消息 print(f\"客户端发送: { another_message}\") # 打印第二条发送的消息 second_response = await websocket.recv() # 异步接收第二条回应 print(f\"客户端接收: { second_response}\") # 打印接收到的第二条回应 # 3. 连接在 \'async with\' 块结束时自动关闭 print(\"连接已关闭 (由async with自动处理).\") # 打印连接关闭信息 except ConnectionRefusedError: # 捕获连接被拒绝的错误(例如服务器未运行) print(f\"错误: 连接被拒绝。请确保服务器在 { uri} 上运行。\") # 打印连接拒绝错误 except Exception as e: # 捕获其他未知异常 print(f\"客户端发生未预期错误: { e}\") # 打印其他错误信息# 程序的入口点,运行客户端主逻辑if __name__ == \"__main__\": # 检查是否作为主程序运行 asyncio.run(simple_client()) # 运行客户端主函数

运行上述客户端代码的步骤:

  1. 确保之前启动的 server.py 正在运行。
  2. 将上述客户端代码保存为 client.py
  3. 打开新的终端或命令行窗口。
  4. 激活你的Python虚拟环境。
  5. 运行 python client.py 命令。
    你将看到客户端连接到服务器,发送消息,并接收服务器的回应。在服务器端的终端,你也会看到相应的日志输出。
2.3.2 客户端异步收发:独立任务与并发

在实际应用中,客户端通常需要同时进行发送和接收操作。例如,在一个聊天应用中,客户端既要不断监听服务器发来的新消息,又要能随时发送用户输入的消息。这同样可以通过 asyncio.create_task()asyncio.gather() 实现。

import asyncio # 导入asyncio库import websockets # 导入websockets库async def receive_messages(websocket): # 定义一个异步函数,专门用于接收消息 try: # 使用try-except块处理接收过程中的异常 async for message in websocket: # 异步循环,持续从WebSocket连接接收消息 print(f\"收到服务器消息: { message}\") # 打印接收到的消息 except websockets.exceptions.ConnectionClosedOK: # 捕获连接正常关闭的异常 print(\"服务器连接已正常关闭.\") # 打印连接关闭信息 except websockets.exceptions.ConnectionClosedError as e: # 捕获连接异常关闭的异常 print(f\"服务器连接异常关闭: { e}\") # 打印连接异常关闭信息 except Exception as e: # 捕获其他未知异常 print(f\"接收消息时发生错误: { e}\") # 打印接收消息时的错误async def send_messages(websocket): # 定义一个异步函数,专门用于发送消息(例如模拟用户输入) try: # 使用try-except块处理发送过程中的异常 messages_to_send = [ # 定义一个消息列表 \"你好,服务器!\", \"很高兴和你通信。\", \"请问现在几点了?\", \"再见!\" ] for msg in messages_to_send: # 遍历消息列表 await websocket.send(msg) # 异步发送每条消息 print(f\"已发送: { msg}\") # 打印已发送的消息 await asyncio.sleep(1) # 每发送一条消息后等待1秒 except websockets.exceptions.ConnectionClosed: # 捕获连接关闭异常 print(\"发送消息任务: 服务器连接已关闭.\") # 打印发送任务因连接关闭而结束的信息 except Exception as e: # 捕获其他未知异常 print(f\"发送消息时发生错误: { e}\") # 打印发送消息时的错误async def client_with_concurrent_io(): # 定义主客户端逻辑,实现并发收发 uri = \"ws://localhost:8765\" # 服务器URI print(f\"尝试连接到服务器: { uri}\") # 打印连接尝试信息 try: # 使用try-except块处理连接异常 async with websockets.connect(uri) as websocket: # 建立WebSocket连接 print(f\"成功连接到服务器: { uri}\") # 打印连接成功信息 # 创建两个独立的任务:一个用于接收消息,一个用于发送消息。 # asyncio.create_task() 会将协程包装成一个Task对象并调度到事件循环中运行。 # 它们会并发执行,不会相互阻塞。 receive_task = asyncio.create_task(receive_messages(websocket)) # 创建接收消息的任务 send_task = asyncio.create_task(send_messages(websocket)) # 创建发送消息的任务 # 使用 asyncio.gather() 等待这两个任务完成。 # 这意味着主协程会在这里暂停,直到receive_task和send_task都完成或发生异常。 # 如果其中一个任务因连接关闭而结束,另一个任务可能也会随之结束。 await asyncio.gather(receive_task, send_task) # 并发运行并等待接收和发送任务完成 print(\"所有客户端任务已完成或连接已关闭。\") # 打印所有任务完成信息 except ConnectionRefusedError: # 捕获连接拒绝错误 print(f\"错误: 连接被拒绝。请确保服务器在 { uri} 上运行。\") # 打印错误信息 except Exception as e: # 捕获其他未知错误 print(f\"客户端主函数发生未预期错误: { e}\") # 打印主函数错误信息if __name__ == \"__main__\": # 主程序入口 asyncio.run(client_with_concurrent_io()) # 运行客户端并发I/O函数

client_with_concurrent_io 函数中:

  • receive_messages(websocket) 协程在一个无限循环中持续调用 async for message in websocket 来接收消息。
  • send_messages(websocket) 协程按照预定的顺序发送消息。
  • asyncio.create_task() 将这两个协程转换为独立的任务,并将它们调度到事件循环中。
  • await asyncio.gather(receive_task, send_task) 会等待这两个任务都完成。这意味着只要其中一个任务因为连接断开(例如,服务器关闭或网络问题)而终止,gather 就会完成,从而退出 async with 块,关闭WebSocket连接。

这种模式在需要客户端同时监听和发送数据的场景中非常常见,如聊天应用、游戏客户端等。

  • 处理接收到的消息类型:文本与二进制
    与服务器端类似,客户端接收到的消息也会根据原始WebSocket帧的Opcode自动转换为Python strbytes 类型。

    # ... 在 receive_messages 函数内部 ...async for message in websocket: # 异步循环接收消息 if isinstance(message, str): # 如果消息是字符串类型 print(f\"收到服务器文本消息: {  message}\") # 打印文本消息 # 在这里处理文本消息的逻辑,例如显示在UI上 elif isinstance(message, bytes): # 如果消息是字节串类型 print(f\"收到服务器二进制消息,长度: {  len(message)} 字节\") # 打印二进制消息长度 # 在这里处理二进制消息的逻辑,例如保存文件或解析图像 else: # 处理不预期的消息类型 print(f\"收到未知类型的消息: {  type(message)}\") # 打印未知类型消息

    发送消息时,websockets 库也会根据你传入的Python类型自动选择正确的WebSocket帧Opcode:

    • 发送 str 类型的数据:await websocket.send(\"Hello\") 将发送一个文本帧。
    • 发送 bytes 类型的数据:await websocket.send(b\"\\x01\\x02\\x03\") 将发送一个二进制帧。
2.3.3 客户端断开连接与重连策略

在实际网络环境中,WebSocket连接可能会由于多种原因断开:服务器重启、网络故障、客户端断网、连接超时等。健壮的客户端应用通常需要实现断线重连机制,以提高用户体验和应用的稳定性。

  1. 捕获连接关闭异常
    websockets 库会在连接关闭时抛出特定的异常,这使得我们能够精确地处理不同类型的连接关闭事件。

    • websockets.exceptions.ConnectionClosedOK: 表示连接被正常、优雅地关闭。例如,服务器或客户端主动发送了关闭帧。
    • websockets.exceptions.ConnectionClosedError: 表示连接因异常情况(如网络错误、协议违规、TLS错误)而关闭。这个异常对象通常会包含一个 code 属性(WebSocket关闭状态码)和 reason 属性(关闭原因字符串)。

    async for message in websocket:await websocket.recv() / await websocket.send() 内部,这些异常会被抛出,你需要用 try...except 块来捕获它们。

    import asyncioimport websocketsasync def client_with_reconnect_attempt(): # 定义一个具有重连机制的客户端函数 uri = \"ws://localhost:8765\" # 服务器URI reconnect_delay = 1 # 初始重连延迟秒数 max_reconnect_delay = 32 # 最大重连延迟秒数 reconnect_attempts = 0 # 重连尝试次数 while True: # 无限循环,尝试持续连接和重连 print(f\"尝试连接到 {  uri} (尝试 {  reconnect_attempts + 1})...\") # 打印当前重连尝试信息 try: # 尝试建立和维持WebSocket连接 async with websockets.connect(uri) as websocket: # 建立WebSocket连接 print(f\"成功连接到服务器: {  uri}\") # 打印连接成功信息 reconnect_attempts = 0 # 连接成功,重置重连尝试次数和延迟 reconnect_delay = 1 # 重置重连延迟 while True: # 连接成功后,持续发送和接收消息  try: # 在连接内部,再次使用try-except处理发送和接收过程中的可能错误 message_to_send = f\"客户端消息 {  asyncio.get_event_loop().time()}\" # 构造发送消息 await websocket.send(message_to_send) # 异步发送消息 print(f\"已发送: {  message_to_send}\") # 打印发送消息 response = await websocket.recv() # 异步接收消息 print(f\"收到服务器回应: {  response}\") # 打印收到回应 await asyncio.sleep(2) # 等待2秒再发送下一条消息  except websockets.exceptions.ConnectionClosedOK: # 捕获连接正常关闭 print(\"连接正常关闭,准备退出当前连接循环。\") # 打印正常关闭信息 break # 退出内部while True循环,进入外层循环进行重连(如果需要)  except websockets.exceptions.ConnectionClosedError as e: # 捕获连接异常关闭 print(f\"连接异常关闭: Code={  e.code}, Reason={  e.reason}\") # 打印异常关闭信息 break # 退出内部while True循环,进入外层循环进行重连  except Exception as e: # 捕获其他未知错误 print(f\"连接内部发生错误: {  e}\") # 打印内部错误 break # 退出内部while True循环,进入外层循环进行重连 except ConnectionRefusedError: # 捕获连接被拒绝的错误(服务器可能未运行或防火墙问题) print(f\"连接到 {  uri} 被拒绝。\") # 打印连接拒绝信息 except Exception as e: # 捕获建立连接时的其他错误(如网络不可达) print(f\"连接到 {  uri} 失败: {  e}\") # 打印连接失败信息 reconnect_attempts += 1 # 增加重连尝试次数 # 指数退避策略:每次重连失败,等待时间翻倍,但有最大值限制 current_delay = min(reconnect_delay, max_reconnect_delay) # 计算当前等待延迟 print(f\"将在 {  current_delay:.2f} 秒后尝试重新连接...\") # 打印重连等待时间 await asyncio.sleep(current_delay) # 异步等待指定时间 reconnect_delay *= 2 # 下次重连延迟翻倍# 注意:这个无限循环的客户端会持续尝试重连,除非程序被外部终止。# 在实际应用中,通常会添加一个最大重连次数限制,或者一个外部停止信号。
if __name__ == \"__main__\": # 主程序入口 asyncio.run(client_with_reconnect_attempt()) # 运行带重连机制的客户端函数 print(\"客户端已停止运行。\") # 打印客户端停止信息
  1. 实现指数退避重连机制
    在上述示例中,我们实现了一个基本的指数退避(Exponential Backoff)重连策略。这是网络编程中常用的方法,旨在避免在网络不稳定时对服务器造成过大的负载。

    • 初始延迟:设置一个较小的初始重连延迟(例如1秒)。
    • 每次失败翻倍:每次连接尝试失败后,将下一次的重连延迟时间翻倍。
    • 最大延迟:设置一个最大重连延迟,避免等待时间过长。
    • 抖动(Jitter):在更复杂的实现中,你可能会在每次计算出的延迟时间上添加一个随机的小范围抖动(例如 current_delay * random.uniform(0.8, 1.2)),以防止多个客户端在同一时间点尝试重连,从而避免“雷暴”效应。这个例子没有包含抖动,但实际应用中可以考虑。
    • 重置延迟:一旦成功连接,重置重连尝试次数和延迟时间,以便下次断开时从头开始退避。
  2. 断线重连的实际考量

    • 身份认证:在重连后,客户端通常需要重新进行身份认证。如果服务器有会话管理,可能需要恢复会话。
    • 数据同步:如果连接断开期间有数据丢失,或者业务逻辑要求数据连续性,客户端可能需要在重连后请求服务器同步缺失的数据。
    • 用户体验:在尝试重连时,向用户提供视觉反馈(例如“正在尝试重新连接…”),并在重连失败多次后,告知用户并提供手动重试选项。
    • 资源清理:确保在连接断开时,所有与该连接相关的资源(如创建的 asyncio.Task、文件句柄等)都被正确清理,防止资源泄漏。async with 语句和 try...finally 块在这方面提供了很好的帮助。
    • 错误码和原因websockets.exceptions.ConnectionClosedError 提供了 codereason。你可以根据这些标准WebSocket关闭码(如1000表示正常关闭,1006表示异常关闭无状态码)来采取不同的重连策略或日志记录。

2.4 深入 WebSocket 消息类型与编码/解码

WebSocket协议支持两种基本的数据消息类型:文本(UTF-8编码的字符串)和二进制(任意字节序列)。websockets 库在Python层面上自动处理了这些类型的转换,使得开发人员能够直接使用Python的 strbytes 类型。

2.4.1 文本消息 (Text Frames):UTF-8编码

WebSocket协议规定文本帧的负载数据必须是UTF-8编码的文本。这是为了确保不同平台和语言之间文本数据的一致性传输。

  • 发送Python字符串
    当你在 websockets 中调用 await websocket.send(your_string) 时,如果 your_string 是一个Python str 对象,websockets 库会自动将其编码为UTF-8字节序列,并将其封装在一个 Opcode = %x1(文本帧)的WebSocket帧中发送。

    import asyncioimport websocketsasync def send_text_client(): # 定义一个发送文本消息的客户端函数 uri = \"ws://localhost:8765\" # 服务器URI async with websockets.connect(uri) as websocket: # 建立WebSocket连接 message = \"你好,世界!这是一条中文文本消息。\" # 定义包含中文的文本消息 await websocket.send(message) # 异步发送此文本消息 print(f\"客户端发送文本: \'{  message}\'\") # 打印发送的文本消息 response = await websocket.recv() # 异步接收服务器回应 print(f\"客户端收到回应: \'{  response}\'\") # 打印收到的回应if __name__ == \"__main__\": # 主程序入口 asyncio.run(send_text_client()) # 运行客户端函数

    服务器端(使用之前定义的 echo_serverchat_handler)会接收到这个UTF-8编码的帧,并自动解码为Python str 类型。

  • 接收并解码为Python字符串
    当服务器或客户端通过 await websocket.recv()async for message in websocket 接收到一个 Opcode = %x1 的帧时,websockets 库会负责解码其负载数据。如果解码成功,message 变量将是Python的 str 类型。

    # 假设在服务器端的处理函数中async def process_text_and_binary(websocket, path): # 定义一个处理文本和二进制消息的函数 async for message in websocket: # 异步循环接收消息 if isinstance(message, str): # 如果接收到的消息是字符串类型 print(f\"服务器收到文本: {  message}\") # 打印收到的文本消息 # 可以在这里对文本进行业务逻辑处理,例如解析JSON、执行命令等 response_text = f\"服务器已处理文本: \'{  message}\',长度: {  len(message)}字符\" # 构造文本回应 await websocket.send(response_text) # 发送文本回应 # ... 处理二进制消息的部分 ...
  • 非UTF-8数据的处理与错误
    如果一个客户端尝试发送一个被标记为文本帧但其负载数据实际上不是有效的UTF-8编码的字节序列,或者服务器收到一个非UTF-8的文本帧,websockets 库通常会检测到这种情况并抛出解码错误(例如 UnicodeDecodeError)。
    根据WebSocket协议(RFC 6455 Section 5.6),如果文本消息的帧负载数据不是有效的UTF-8编码,接收端应该关闭连接,并发送一个状态码为 1007(表示“消息负载数据不是有效的UTF-8数据”)的关闭帧。websockets 库通常会遵循这一规范。

    因此,在发送文本消息时,始终确保你的数据是可编码为UTF-8的合法字符串。

2.4.2 二进制消息 (Binary Frames):bytes 类型

二进制帧用于传输任意的原始字节数据,例如图片、音频、文件、序列化后的对象(如ProtoBuf、MessagePack)等。

  • 发送Python bytes 对象
    当你在 websockets 中调用 await websocket.send(your_bytes) 时,如果 your_bytes 是一个Python bytes 对象,websockets 库会将其作为原始字节数据封装在一个 Opcode = %x2(二进制帧)的WebSocket帧中发送。不会进行任何编码或解码操作。

    import asyncioimport websocketsimport os # 导入os模块用于文件操作async def send_binary_client(): # 定义一个发送二进制消息的客户端函数 uri = \"ws://localhost:8765\" # 服务器URI async with websockets.connect(uri) as websocket: # 建立WebSocket连接 # 假设我们有一个小图片文件 \'test_image.png\' # 为了演示,这里创建一个简单的二进制数据 binary_data = os.urandom(128) # 生成128字节的随机二进制数据作为演示 print(f\"客户端发送二进制数据,长度: {  len(binary_data)} 字节\") # 打印发送的二进制数据长度 await websocket.send(binary_data) # 异步发送二进制数据 # 接收服务器回应,服务器可能会发送二进制或文本回应 response = await websocket.recv() # 异步接收服务器回应 if isinstance(response, bytes): # 如果收到的是二进制回应 print(f\"客户端收到二进制回应,长度: {  len(response)} 字节\") # 打印二进制回应长度 elif isinstance(response, str): # 如果收到的是文本回应 print(f\"客户端收到文本回应: \'{  response}\'\") # 打印文本回应if __name__ == \"__main__\": # 主程序入口 asyncio.run(send_binary_client()) # 运行客户端函数
  • 接收并处理原始二进制数据
    当服务器或客户端通过 await websocket.recv()async for message in websocket 接收到一个 Opcode = %x2 的帧时,message 变量将是Python的 bytes 类型,包含了原始的二进制负载数据。

    # 假设在服务器端的处理函数中async def process_text_and_binary(websocket, path): # 定义一个处理文本和二进制消息的函数 async for message in websocket: # 异步循环接收消息 # ... 处理文本消息的部分 ... if isinstance(message, bytes): # 如果接收到的消息是字节串类型 print(f\"服务器收到二进制数据,长度: {  len(message)} 字节\") # 打印收到的二进制数据长度 # 在这里对二进制数据进行业务逻辑处理,例如: # 1. 保存到文件: with open(\"received_file.bin\", \"wb\") as f: f.write(message) # 2. 图片处理: image = Image.open(io.BytesIO(message)) # 3. 反序列化: deserialized_obj = my_serializer.loads(message) response_binary = b\"服务器已处理二进制数据,长度: \" + str(len(message)).encode(\'utf-8\') + b\" 字节\" # 构造二进制回应 await websocket.send(response_binary) # 发送二进制回应
  • 实际应用场景:图片、音频、序列化数据

    • 图片/视频流:实时传输摄像头图像、视频帧。
    • 音频流:语音聊天、实时语音识别。
    • 文件传输:小文件上传下载。
    • 序列化数据:当传输的数据结构复杂且需要高性能时,可以使用jsonpickle(Python专用)、MessagePackProtocol Buffers等库将Python对象序列化为二进制数据进行传输,接收方再进行反序列化。二进制传输比JSON文本传输通常更高效(更小的负载体积,更快的解析速度),尤其是在数据量大或结构复杂时。
2.4.3 混合消息类型处理的策略

在许多WebSocket应用中,客户端和服务器可能需要交换多种类型的数据,例如文本消息用于命令和元数据,二进制消息用于实际数据负载。正确地处理混合消息类型是构建灵活应用的关键。

最佳实践是始终检查 message 变量的类型 (isinstance(message, str)isinstance(message, bytes)),然后根据类型执行相应的业务逻辑。

import asyncioimport websocketsimport json # 导入json模块用于处理JSON数据import io # 导入io模块用于处理二进制流from PIL import Image # 导入Pillow库的Image模块用于图像处理# 假设服务器端处理函数async def mixed_message_handler(websocket, path): # 定义一个混合消息处理函数 print(f\"新连接来自: { websocket.remote_address}, 路径: { path}\") # 打印连接信息 try: # 使用try-except处理连接生命周期 async for message in websocket: # 异步循环接收消息 if isinstance(message, str): # 如果是文本消息 print(f\"收到文本消息: { message}\") # 打印文本消息 try: # 尝试将文本消息解析为JSON  data = json.loads(message) # 解析JSON字符串  if data.get(\"type\") == \"chat\": # 如果是聊天消息 chat_msg = data.get(\"content\") # 获取聊天内容 print(f\"聊天消息: { chat_msg}\") # 打印聊天消息 await websocket.send(json.dumps({ \"status\": \"ok\", \"response\": f\"已收到聊天: { chat_msg}\"})) # 发送JSON回应  elif data.get(\"type\") == \"command\": # 如果是命令消息 command = data.get(\"name\") # 获取命令名称 args = data.get(\"args\") # 获取命令参数 print(f\"执行命令: { command} with { args}\") # 打印命令信息 await websocket.send(json.dumps({ \"status\": \"ok\", \"response\":