WebRTC:成功实现公网双向音视频传输(Python、js)_python webrtc
1. 前言
工作期间纯记录,部分内容选摘自DeepSeek、ChatGPT。本文基于aiortc库实现跨局域网音视频传输WebRTC(Web Real-Time Communication),相关的网络协议包括SDP、ICE、STUN、TURN等等,整个通信建立流程:信令阶段、建立连接、最后握手媒体传输;
STUN的作用:用于NAT穿透,返回client经过NAT映射后的公网IP和PORT;
TURN的作用:实现中继;提供一些中继IP的备选(ICE候选)
ICE:实现P2P连接,逐一测试ICE候选(本地地址、STUN得到的公网地址、TURN中继地址),然后选出一对可用的候选建立连接
SDP:会话描述协议,比如说传输的是audio还是video,用的什么编解码方式之类的
信令服务(signaling server):交换SDP、ICE候选等,可以通过WebSocket、HTTP实现
2. 测试环境与搭建流程:
公网服务器一台
windows平台+mic+speaker
linux平台+camera+mic+speaker
从示例的图片中,可以明确的看见在哪些设备上做什么工作,具体工作如下:
在公网服务器上搭建信令服务、stun/turn服务
在两个客户端分别部署客户端,用ws连接信令服务,再调aiortc的库来实现
测试过程中,在公网服务器上记得开放3478等其他需要用到的UDP端口;
3. 搭建细节步骤:
3.1 在公网服务器搭建内容
1)部署turn服务,apt安装coturn
通过sudo vi /etc/turnserver.conf 配置turn服务:
# TURN 基本配置
listening-port=3478
tls-listening-port=5349
fingerprint
listening-ip=公网服务器的局域网ip,ifconfig可见(192.168.xx.xx)
realm=公网ip
relay-ip=192.168.xx.xx
lt-cred-mech
external-ip=公网ip/192.168.xx.xx
user=user:GNU
min-port=49152
max-port=65535
log-file=/var/tmp/turn.log
no-multicast-peers
no-tls
no-dtls
verbose
启动turn服务
systemctl status coturn   # 查看状态
systemctl enable coturn
systemctl restart coturn
2) stun服务:
stun.l.google.com:19302 支持UDP
3)部署信令服务 ,vi signal_server.js ,node signal_server.js启动
// Minimal WebRTC signaling relay: every JSON message received from one
// WebSocket client is forwarded to all other connected clients, tagged with
// the sender's id. The server does not interpret message types or rooms.
const http = require('http');
const WebSocket = require('ws');

const PORT = 3000;

// 1. HTTP server (only used to upgrade connections to WebSocket).
const server = http.createServer((req, res) => {
  res.writeHead(200, { 'Content-Type': 'text/plain' });
  res.end('WebRTC Signaling Server (WebSocket only)');
});

// 2. WebSocket server sharing the HTTP server's port.
const wss = new WebSocket.Server({ server });

// 3. Client registry: id -> socket.
const clients = new Map();

// Monotonic counter appended to the timestamp so that two clients connecting
// within the same millisecond cannot end up with the same id (a bare
// Date.now() id would make the second client overwrite the first in the Map).
let nextClientSeq = 0;

wss.on('connection', (ws, req) => {
  const id = `${Date.now().toString(36)}-${(nextClientSeq++).toString(36)}`;
  const ip = req.socket.remoteAddress;

  clients.set(id, ws);
  console.log(`[+] Client connected: ${id} from ${ip}`);
  console.log(`[~] Total clients: ${clients.size}`);

  // On message: parse as JSON and relay to every other open client.
  ws.on('message', (data) => {
    try {
      const msg = JSON.parse(data);
      console.log(`[>] Message from ${id}:`, msg);

      let forwardCount = 0;
      clients.forEach((client, clientId) => {
        if (clientId !== id && client.readyState === WebSocket.OPEN) {
          client.send(JSON.stringify({ ...msg, sender: id }));
          forwardCount++;
        }
      });
      console.log(`[~] Forwarded message from ${id} to ${forwardCount} client(s)`);
    } catch (err) {
      // Malformed JSON or a failed send must not take the server down.
      console.error(`[x] Error handling message from ${id}:`, err.message);
    }
  });

  // On disconnect: drop the client, but only if the registry entry is still
  // this very socket.
  ws.on('close', () => {
    if (clients.get(id) === ws) {
      clients.delete(id);
    }
    console.log(`[-] Client disconnected: ${id}`);
    console.log(`[~] Remaining clients: ${clients.size}`);
  });

  // On socket error: log only; 'close' handles cleanup.
  ws.on('error', (err) => {
    console.error(`[!] WebSocket error from ${id}:`, err.message);
  });
});

// 4. Start listening on all interfaces.
server.listen(PORT, '0.0.0.0', () => {
  console.log(`[✔] Signaling server running on ws://0.0.0.0:${PORT}`);
});
3.2 windows客户端接收视频流与音频流(html版本,chrome浏览器)
注意修改你的公网IP,用于turn和signaling服务的连接,首先实现了单向音视频的接收,因为是HTTP而不是HTTPS(暂时没有实现),所以不支持获取windows端的麦克风,windows获取mic数据发送的代码我另外用了python实现;
1)单向音视频的接收
<!DOCTYPE html>
<!--
  Receive-only WebRTC viewer. Connects to the signaling server over
  WebSocket, answers the first offer it receives and shows the remote video.
  NOTE(review): the original markup was lost in extraction; the tags below are
  reconstructed from the surviving text and the element ids used by the script.
-->
<html>
<head>
  <meta charset="utf-8">
  <title>WebRTC Realsense Viewer</title>
  <style>
    body { font-family: Arial, sans-serif; margin: 20px; }
    video { background: black; width: 640px; height: 480px; }
    #status { margin: 10px 0; padding: 10px; background: #f0f0f0; }
  </style>
</head>
<body>
  <h1>Realsense WebRTC Viewer</h1>
  <video id="video" autoplay playsinline controls></video>
  <div id="status">Connecting to signaling server...</div>

  <script>
    const statusDiv = document.getElementById('status');
    const video = document.getElementById('video');
    const signalingServer = 'ws://你的公网ip:3000';
    const room = 'realsense';

    // Upper bound on how long the answer is held back while ICE gathers.
    const ICE_GATHERING_TIMEOUT_MS = 5000;

    let pc = null;
    let ws = null;
    // Candidates that arrive before the remote description is set.
    let pendingCandidates = [];

    function updateStatus(message) {
      statusDiv.textContent = message;
      console.log(message);
    }

    // Resolve once ICE gathering is complete, or after timeoutMs at the latest.
    function waitForIceGatheringComplete(peer, timeoutMs) {
      return new Promise(resolve => {
        if (peer.iceGatheringState === 'complete') {
          resolve();
          return;
        }
        const finish = () => {
          peer.removeEventListener('icegatheringstatechange', onChange);
          clearTimeout(timer);
          resolve();
        };
        const onChange = () => {
          if (peer.iceGatheringState === 'complete') {
            finish();
          }
        };
        const timer = setTimeout(finish, timeoutMs);
        peer.addEventListener('icegatheringstatechange', onChange);
      });
    }

    async function start() {
      updateStatus("Starting WebRTC connection...");

      pc = new RTCPeerConnection({
        iceServers: [
          { urls: 'stun:stun.l.google.com:19302' },
          // Credentials must match a user configured on the TURN server.
          { urls: 'turn:你的公网IP:3478', username: 'username', credential: 'password' }
        ]
      });

      pc.onicecandidate = event => {
        if (event.candidate) {
          sendMessage({ type: 'candidate', candidate: event.candidate });
        }
      };

      pc.ontrack = event => {
        if (event.track.kind === 'video') {
          updateStatus("Received remote track");
          video.srcObject = event.streams[0];
        }
      };

      pc.oniceconnectionstatechange = () => {
        updateStatus(`ICE connection state: ${pc.iceConnectionState}`);
      };

      ws = new WebSocket(signalingServer);

      ws.onopen = () => {
        updateStatus("Connected to signaling server");
        // Announce ourselves; the signaling server relays this to the peers.
        sendMessage({ type: 'join', room: room });
      };

      ws.onmessage = async (message) => {
        const data = JSON.parse(message.data);

        if (data.type === 'offer') {
          updateStatus("Received offer, creating answer...");
          await pc.setRemoteDescription(new RTCSessionDescription(data));

          // Flush candidates that were received ahead of the offer.
          for (const candidate of pendingCandidates) {
            try {
              await pc.addIceCandidate(new RTCIceCandidate(candidate));
            } catch (e) {
              console.error('Error adding ICE candidate:', e);
            }
          }
          pendingCandidates = [];

          const answer = await pc.createAnswer();
          await pc.setLocalDescription(answer);

          // The Python peers in this setup do not process trickled
          // 'candidate' messages, so the answer itself has to carry the
          // candidates: wait for gathering, then send localDescription.
          await waitForIceGatheringComplete(pc, ICE_GATHERING_TIMEOUT_MS);
          sendMessage(pc.localDescription);
        } else if (data.type === 'candidate') {
          if (!pc.remoteDescription) {
            // addIceCandidate rejects without a remote description.
            pendingCandidates.push(data.candidate);
            return;
          }
          try {
            await pc.addIceCandidate(new RTCIceCandidate(data.candidate));
          } catch (e) {
            console.error('Error adding ICE candidate:', e);
          }
        }
      };

      ws.onerror = (error) => {
        // WebSocket error events carry no message; avoid printing "undefined".
        updateStatus(`WebSocket error: ${error.message || 'connection failed'}`);
      };

      ws.onclose = () => {
        updateStatus("WebSocket connection closed");
      };
    }

    function sendMessage(message) {
      if (ws && ws.readyState === WebSocket.OPEN) {
        ws.send(JSON.stringify(message));
      }
    }

    start();
  </script>
</body>
</html>
2) windows获取本地mic数据传输
"""Capture the local microphone with sounddevice and send it over WebRTC.

The script joins the signaling server, publishes one audio track, sends an
SDP offer and applies the answer it receives.
"""
import asyncio
import json
import logging
import platform
from fractions import Fraction

import numpy as np
import sounddevice as sd
import websockets
from aiortc import (
    AudioStreamTrack,
    RTCConfiguration,
    RTCIceServer,
    RTCPeerConnection,
    RTCSessionDescription,
)

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('WebRTC-Sender')

# TURN relay URL. Replace the placeholder with your own public server address
# (the same host that runs the signaling server in this setup).
TURN_URL = "turn:公网IP地址:3478"


class SoundDeviceAudioTrack(AudioStreamTrack):
    """Audio track fed by a sounddevice ``InputStream``.

    Captured blocks are queued by the audio callback and turned into
    ``av.AudioFrame`` objects in ``recv``. Frames are always built with a
    mono layout, so only ``channels=1`` is supported.
    """

    kind = "audio"

    def __init__(self, device=None, samplerate=44100, channels=1):
        """Open and start the input stream.

        Args:
            device: sounddevice input device index/name, or None for default.
            samplerate: capture sample rate in Hz.
            channels: number of capture channels (frames are built as mono).
        """
        super().__init__()
        self.sample_rate = samplerate
        self.channels = channels
        self.blocksize = 120
        self.buffer = asyncio.Queue()
        self.frames_sent = 0

        # Remember the loop that owns the queue: the audio callback runs on a
        # different thread and must hand data over through this loop.
        try:
            self._loop = asyncio.get_running_loop()
        except RuntimeError:
            self._loop = asyncio.get_event_loop()

        def callback(indata, frames, time_info, status):
            if status:
                logger.warning(f"音频状态警告: {status}")
            # asyncio.Queue is not thread-safe, so schedule the put on the
            # event loop instead of calling put_nowait from this thread.
            try:
                self._loop.call_soon_threadsafe(
                    self.buffer.put_nowait, indata.copy()
                )
            except RuntimeError:
                # The loop is already closed (shutdown in progress); drop it.
                pass

        self.stream = sd.InputStream(
            device=device,
            channels=self.channels,
            samplerate=self.sample_rate,
            dtype='int16',
            blocksize=self.blocksize,
            callback=callback,
        )
        self.stream.start()

    async def recv(self):
        """Return the next captured block as an ``av.AudioFrame``."""
        data = await self.buffer.get()
        frame = np.squeeze(data)
        return self._create_av_frame(frame)

    def _create_av_frame(self, data):
        """Wrap a 1-D int16 sample array in a mono s16 ``av.AudioFrame``."""
        import av

        frame = av.AudioFrame(format='s16', layout='mono', samples=len(data))
        frame.planes[0].update(data.tobytes())
        frame.sample_rate = self.sample_rate
        frame.time_base = Fraction(1, self.sample_rate)
        # pts counts samples, matching the 1/sample_rate time base.
        frame.pts = self.frames_sent
        self.frames_sent += len(data)
        return frame

    def stop(self):
        """Stop the track and release the capture device."""
        super().stop()
        stream = getattr(self, "stream", None)
        if stream is not None:
            self.stream = None
            stream.stop()
            stream.close()


def list_input_devices():
    """Print every sounddevice device that has at least one input channel."""
    print("可用输入设备:")
    for i, device in enumerate(sd.query_devices()):
        if device['max_input_channels'] > 0:
            print(f"[{i}] {device['name']}")


class WebRTCSender:
    """Offerer side: publishes the microphone track and handles the answer."""

    def __init__(self, signaling_server, room, input_device=None):
        """Create the peer connection.

        Args:
            signaling_server: WebSocket URL of the signaling server.
            room: room name sent in the ``join`` message.
            input_device: sounddevice input device index, or None for default.
        """
        self.signaling_server = signaling_server
        self.room = room
        self.input_device = input_device
        self.audio_track = None

        ice_servers = [
            RTCIceServer(urls=["stun:stun.l.google.com:19302"]),
            RTCIceServer(
                urls=[TURN_URL], username="username", credential="password"
            ),
        ]
        self.pc = RTCPeerConnection(RTCConfiguration(ice_servers))
        self.ws = None

    async def connect_signaling_server(self):
        """Join the room, send the offer, then process messages until close."""
        try:
            self.ws = await websockets.connect(self.signaling_server)
            await self.ws.send(json.dumps({"type": "join", "room": self.room}))
            logger.info("已连接信令服务器")

            await self.start_streaming()
            await self.handle_messages()
        except Exception as e:
            # Top-level boundary: report and fall through to cleanup.
            logger.error(f"信令错误: {e}")
        finally:
            if self.ws:
                await self.ws.close()

    async def start_streaming(self):
        """Add the microphone track and send the SDP offer (Windows only)."""
        if platform.system() != "Windows":
            logger.error("该功能仅支持 Windows 系统。")
            return

        try:
            self.audio_track = SoundDeviceAudioTrack(device=self.input_device)
            self.pc.addTrack(self.audio_track)
            logger.info("已添加 sounddevice 音频轨道")

            offer = await self.pc.createOffer()
            await self.pc.setLocalDescription(offer)
            logger.info("本地 SDP:\n" + self.pc.localDescription.sdp)

            # The offer is sent without trickle ICE, so wait until gathering
            # has finished and localDescription contains every candidate.
            while self.pc.iceGatheringState != "complete":
                await asyncio.sleep(0.1)

            await self.ws.send(json.dumps({
                "type": "offer",
                "sdp": self.pc.localDescription.sdp,
            }))
            logger.info("已发送 Offer")
        except Exception as e:
            logger.error(f"音频采集或发送出错: {e}")

    async def handle_messages(self):
        """Apply incoming ``answer`` messages; ignore everything else."""
        async for message in self.ws:
            try:
                msg = json.loads(message)
                if msg["type"] == "answer":
                    await self.pc.setRemoteDescription(RTCSessionDescription(
                        sdp=msg["sdp"],
                        type=msg["type"],
                    ))
                    logger.info("已接收 Answer")
            except Exception as e:
                logger.error(f"消息处理错误: {e}")

    async def close(self):
        """Release the microphone and close the peer connection."""
        if self.audio_track is not None:
            self.audio_track.stop()
            self.audio_track = None
        await self.pc.close()


if __name__ == "__main__":
    signaling_server = "ws://公网IP地址:3000"
    room = "audio"

    list_input_devices()
    index = int(input("请输入要使用的输入设备索引号: "))

    # Create and install the loop explicitly before building any aiortc
    # objects; asyncio.get_event_loop() without a running loop is deprecated.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    sender = WebRTCSender(signaling_server, room, input_device=index)
    try:
        loop.run_until_complete(sender.connect_signaling_server())
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(sender.close())
        loop.close()
3.3 linux客户端发送音频流与视频流
1) 发送视频,音频数据
"""Send the RealSense color stream to a remote peer over WebRTC.

The script joins the signaling server, publishes one video track, sends an
SDP offer and applies the first answer it receives.
"""
import argparse
import asyncio
import json
import logging
import time
import uuid
from fractions import Fraction

import numpy as np
import pyrealsense2 as rs
import websockets
from aiortc import (
    RTCConfiguration,
    RTCIceServer,
    RTCPeerConnection,
    RTCSessionDescription,
    VideoStreamTrack,
)
from av import VideoFrame


# Custom WebSocket signaling
class WebSocketSignaling:
    """Thin JSON-over-WebSocket signaling client."""

    def __init__(self, server_address, room):
        self.server_address = server_address
        self.room = room
        self.websocket = None

    async def connect(self):
        """Open the WebSocket and announce the room with a ``join`` message."""
        logging.info(f"[Signaling] Connecting to {self.server_address}")
        self.websocket = await websockets.connect(self.server_address)
        await self.websocket.send(json.dumps({
            "type": "join",
            "room": self.room,
        }))
        logging.info(f"[Signaling] Joined room: {self.room}")

    async def send(self, message):
        """Send a session description or a plain dict as JSON."""
        if isinstance(message, RTCSessionDescription):
            msg = {
                "type": message.type,
                "sdp": message.sdp,
            }
        else:
            msg = message
        await self.websocket.send(json.dumps(msg))
        logging.info(f"[Signaling] Sent message: {msg['type']}")

    async def receive(self):
        """Return the next message.

        Offers and answers are returned as ``RTCSessionDescription``; any
        other message is returned as the decoded JSON object.
        """
        data = await self.websocket.recv()
        message = json.loads(data)
        logging.info(f"[Signaling] Received message: {message.get('type')}")

        # .get(): a message without a "type" key is passed through instead of
        # raising KeyError.
        if message.get("type") in ["offer", "answer"]:
            return RTCSessionDescription(sdp=message["sdp"], type=message["type"])
        return message

    async def close(self):
        """Close the WebSocket if it was ever opened."""
        logging.info("[Signaling] Closing signaling connection")
        if self.websocket is not None:
            await self.websocket.close()


# RealSense video track
class RealsenseTrack(VideoStreamTrack):
    """Video track that serves BGR color frames from a RealSense pipeline."""

    def __init__(self):
        super().__init__()
        logging.info("[Camera] Initializing RealSense pipeline")
        self._pipeline_running = False
        self.pipeline = rs.pipeline()
        config = rs.config()
        config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
        self.pipeline.start(config)
        self._pipeline_running = True
        self.frame_count = 0
        self.start_time = time.time()
        logging.info("[Camera] RealSense started with 640x480@30fps")

    async def recv(self):
        """Return the next color frame as an ``av.VideoFrame``."""
        # wait_for_frames() blocks, so run it in the default executor to keep
        # the event loop free while waiting for the camera.
        loop = asyncio.get_running_loop()
        frames = await loop.run_in_executor(None, self.pipeline.wait_for_frames)
        color_frame = frames.get_color_frame()
        if not color_frame:
            logging.warning("[Camera] No color frame available")
            return await super().recv()

        # Take pts/time_base from the base class so that real frames and the
        # fallback frames above share one clock; next_timestamp() also paces
        # the track, which replaces the explicit sleep.
        pts, time_base = await self.next_timestamp()

        img = np.asanyarray(color_frame.get_data())
        frame = VideoFrame.from_ndarray(img, format="bgr24")
        frame.pts = pts
        frame.time_base = time_base

        self.frame_count += 1
        if self.frame_count % 30 == 0:
            logging.info(f"[Camera] Sent {self.frame_count} frames")

        return frame

    def _release_pipeline(self):
        """Stop the RealSense pipeline at most once."""
        if getattr(self, "_pipeline_running", False):
            self._pipeline_running = False
            logging.info("[Camera] Releasing RealSense pipeline")
            self.pipeline.stop()

    def stop(self):
        """Stop the track and release the camera."""
        super().stop()
        self._release_pipeline()

    def __del__(self):
        # Safety net only; the guard covers a failed __init__ and a pipeline
        # that stop() already released.
        self._release_pipeline()


# Main coroutine
async def run(pc, signaling):
    """Publish the camera track, send the offer and process signaling.

    Args:
        pc: the ``RTCPeerConnection`` to use.
        signaling: a connected-on-demand ``WebSocketSignaling`` instance.
    """
    await signaling.connect()

    logging.info("[RTC] Adding Realsense video track to peer connection")
    track = RealsenseTrack()
    pc.addTrack(track)

    @pc.on("iceconnectionstatechange")
    async def on_iceconnectionstatechange():
        logging.info(f"[RTC] ICE connection state changed: {pc.iceConnectionState}")

    try:
        logging.info("[RTC] Creating and sending offer")
        offer = await pc.createOffer()
        await pc.setLocalDescription(offer)
        await signaling.send(pc.localDescription)

        while True:
            try:
                obj = await signaling.receive()
            except websockets.exceptions.ConnectionClosed:
                logging.info("[RTC] Signaling connection closed, exiting loop")
                break

            if isinstance(obj, RTCSessionDescription):
                # The server relays every message to every client, so offers
                # from other senders and extra answers also show up here.
                # Applying them in the wrong signaling state would raise.
                if obj.type == "answer" and pc.signalingState == "have-local-offer":
                    logging.info(f"[RTC] Setting remote description: {obj.type}")
                    await pc.setRemoteDescription(obj)
                else:
                    logging.info(f"[RTC] Ignoring unexpected description: {obj.type}")
            elif obj is None:
                logging.info("[RTC] Received None, exiting loop")
                break
            elif (
                isinstance(obj, dict)
                and obj.get("type") == "join"
                and obj.get("room") == signaling.room
                and pc.signalingState == "have-local-offer"
            ):
                # A viewer joined after the offer went out and the offer is
                # still unanswered: send it again so start order is free.
                logging.info("[RTC] Peer joined, re-sending offer")
                await signaling.send(pc.localDescription)
    finally:
        track.stop()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--signaling-host", default="公网IP")
    parser.add_argument("--signaling-port", default=3000)
    parser.add_argument("--signaling-room", default="realsense")
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO)

    signaling = WebSocketSignaling(
        f"ws://{args.signaling_host}:{args.signaling_port}",
        args.signaling_room,
    )

    # Create and install the loop explicitly before building any aiortc
    # objects; asyncio.get_event_loop() without a running loop is deprecated.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Peer connection with explicit ICE configuration
    ice_servers = [
        RTCIceServer(urls=["stun:stun.l.google.com:19302"]),
    ]
    configuration = RTCConfiguration(ice_servers)
    pc = RTCPeerConnection(configuration=configuration)

    try:
        loop.run_until_complete(run(pc, signaling))
    except KeyboardInterrupt:
        logging.info("[Main] Interrupted by user")
    finally:
        logging.info("[Main] Closing connections")
        loop.run_until_complete(pc.close())
        loop.run_until_complete(signaling.close())
        loop.close()
2) 接收音频数据并且播放
"""Receive a remote WebRTC audio track and play it through sounddevice.

The script joins the signaling server, answers audio offers and writes the
decoded frames to an output stream.
"""
import asyncio
import json
import logging
import platform

import numpy as np
import sounddevice as sd
import websockets
from aiortc import (
    RTCConfiguration,
    RTCIceServer,
    RTCPeerConnection,
    RTCSessionDescription,
)
from aiortc.mediastreams import MediaStreamError

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('WebRTC-Receiver')


class WebRTCReceiver:
    """Answerer side: accepts an audio offer and plays the received track."""

    def __init__(self, signaling_server, room, output_device=None):
        """Create the peer connection.

        Args:
            signaling_server: WebSocket URL of the signaling server.
            room: room name sent in the ``join`` message.
            output_device: sounddevice output device index/name; None selects
                the system default output device.
        """
        self.signaling_server = signaling_server
        self.room = room
        self.output_device = output_device

        ice_servers = [
            RTCIceServer(urls=["stun:stun.l.google.com:19302"]),
            RTCIceServer(
                urls=["turn:公网ip:3478"],
                username="username",
                credential="password",
            ),
        ]
        self.pc = RTCPeerConnection(RTCConfiguration(ice_servers))
        # Register once here; registering inside the offer handler would add
        # a duplicate handler for every offer received.
        self.pc.on("track", self.on_track)
        self.ws = None
        # Strong references to playback tasks so they are not collected
        # while still running.
        self._play_tasks = set()

    async def connect_signaling_server(self):
        """Join the room and process signaling messages until close."""
        try:
            self.ws = await websockets.connect(self.signaling_server)
            await self.ws.send(json.dumps({"type": "join", "room": self.room}))
            logger.info("已连接信令服务器")
            await self.handle_messages()
        except Exception as e:
            # Top-level boundary: report and fall through to cleanup.
            logger.error(f"信令错误: {e}")
        finally:
            if self.ws:
                await self.ws.close()

    async def handle_messages(self):
        """Answer incoming audio offers; ignore every other message."""
        async for message in self.ws:
            try:
                msg = json.loads(message)
                if msg["type"] != "offer":
                    continue

                # The signaling server relays every offer to every client.
                # Only answer offers that actually contain an audio section,
                # otherwise this receiver would answer a video-only offer
                # meant for another peer.
                if "m=audio" not in msg["sdp"]:
                    logger.info("忽略不含音频的 Offer")
                    continue

                await self.pc.setRemoteDescription(RTCSessionDescription(
                    sdp=msg["sdp"],
                    type=msg["type"],
                ))
                logger.info("已接收 Offer")

                answer = await self.pc.createAnswer()
                await self.pc.setLocalDescription(answer)

                # Send localDescription rather than the createAnswer() result,
                # matching the aiortc example servers.
                # NOTE(review): presumably only localDescription carries the
                # ICE candidates gathered by setLocalDescription - confirm.
                await self.ws.send(json.dumps({
                    "type": "answer",
                    "sdp": self.pc.localDescription.sdp,
                }))
                logger.info("已发送 Answer")
            except Exception as e:
                logger.error(f"消息处理错误: {e}")

    def on_track(self, track):
        """Start a playback task for every incoming audio track."""
        logger.info(f"触发 on_track,kind={track.kind}")
        if track.kind == "audio":
            logger.info("收到音频轨道,开始播放")
            task = asyncio.create_task(self.play_audio(track))
            self._play_tasks.add(task)
            task.add_done_callback(self._play_tasks.discard)

    async def play_audio(self, track):
        """Read frames from ``track`` and write them to the output device."""
        loop = asyncio.get_running_loop()
        stream = None
        try:
            while True:
                frame = await track.recv()
                # Per-frame logging at INFO would flood the log, so use DEBUG.
                logger.debug("已接收音频帧")

                channels = len(frame.layout.channels)
                if stream is None:
                    # Open the device lazily with the rate and channel count
                    # of the first frame instead of assuming 44100 Hz mono;
                    # a mismatch plays back at the wrong speed and pitch.
                    logger.info("正在打开音频播放设备 ...")
                    stream = sd.OutputStream(
                        samplerate=frame.sample_rate,
                        channels=channels,
                        dtype='int16',
                        blocksize=120,
                        device=self.output_device,
                    )
                    stream.start()
                    logger.info("音频播放已启动")

                pcm = frame.to_ndarray()
                if frame.format.is_planar:
                    # Planar: (channels, samples) -> (samples, channels).
                    audio = pcm.T
                else:
                    # Packed: (1, samples * channels), interleaved.
                    audio = pcm.reshape(-1, channels)
                audio = np.ascontiguousarray(audio, dtype=np.int16)

                # stream.write() blocks until the data is queued; run it in
                # the default executor so the event loop stays responsive.
                await loop.run_in_executor(None, stream.write, audio)
        except MediaStreamError:
            # Normal end of the remote track.
            logger.info("音频轨道已结束")
        except Exception as e:
            logger.error(f"音频播放错误: {e}")
        finally:
            if stream is not None:
                stream.stop()
                stream.close()

    async def close(self):
        """Close the peer connection and let playback tasks wind down."""
        if self.pc:
            await self.pc.close()
        if self._play_tasks:
            # Closing the connection ends the remote tracks; give the
            # playback tasks a moment to release the output device.
            await asyncio.wait(set(self._play_tasks), timeout=1)


if __name__ == "__main__":
    signaling_server = "ws://公网ip:3000"
    room = "realsense"

    # Create and install the loop explicitly before building any aiortc
    # objects; asyncio.get_event_loop() without a running loop is deprecated.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    receiver = WebRTCReceiver(signaling_server, room)
    try:
        loop.run_until_complete(receiver.connect_signaling_server())
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(receiver.close())
        loop.close()
4. 调试结果
1) windows端的html(按F12可以看见html的debug)
2)发送端log,可以看见建立通信的流程
3)signaling 服务的log,可以看见sdp协议
4)coturn的 status
5. 调试工具与建议:
1)如果音频传输不对,考虑音频格式问题,也可以看SDP支持什么解码
2)关于webrtc详细日志
3)在http下浏览器无法访问mic(getUserMedia只在HTTPS或localhost等安全上下文中可用),解决的办法是在localhost下搭建你的html,就能获得麦克风权限
4)c++的实现方法使用的是libdatachannel库
持续修改,有问题请指出