局域网TCP通过组播放地址rtp推流和拉流实现实时喊话_rtp:239.93.22.242:9924
应用场景,安卓端局域网不用ip通过组播放地址实现实时对讲功能
发送端: ffmpeg -f alsa -i hw:1 -acodec aac -ab 64k -ac 2 -ar 16000 -frtp -sdp file stream.sdp rtp://224.0.0.1:14556
接收端: ffmpeg -protocol whitelist file,udp,rtp -i stream.sdp -acodec pcm s16le -ar 16000 -ac 2 -f alsa default
在windows上测试通过后然后在安卓中实现
# 查询本地可用麦克风设备
ffmpeg -list_devices true -f dshow -i dummy
麦克风 (Realtek(R) Audio)这是我电脑的
# windows 执行RTM推音频流
ffmpeg -f dshow -i audio=\"麦克风 (Realtek(R) Audio)\" -acodec aac -ab 64k -ac 2 -ar 16000 -f rtp -sdp_file stream.sdp rtp://239.0.0.1:15556
上面windows上调通后接下来在安卓上实现
implementation(\"com.arthenica:mobile-ffmpeg-full:4.4.LTS\")主要用到这个库
package com.xhx.megaphone.tcpimport android.media.AudioFormatimport android.media.AudioRecordimport android.media.MediaRecorderimport com.arthenica.mobileffmpeg.Configimport com.arthenica.mobileffmpeg.FFmpegimport com.blankj.utilcode.util.LogUtilsimport com.xhx.megaphone.Appimport kotlinx.coroutines.Dispatchersimport kotlinx.coroutines.GlobalScopeimport kotlinx.coroutines.launchimport java.io.Fileimport java.io.FileOutputStreamimport java.io.IOExceptionimport java.util.concurrent.atomic.AtomicBoolean/** * 实时推流助手 - 最优化版本 * * 功能: * 1. 录音buffer直接实时写入临时文件 * 2. FFmpeg同时读取文件进行推流 * 3. 最小化延迟的实时推流 */object LiveStreamingHelper { private const val TAG = \"LiveStreamingHelper\" // 组播配置 private const val MULTICAST_ADDRESS = \"239.0.0.1\" private const val MULTICAST_PORT = 15556 // 音频参数 private const val SAMPLE_RATE = 16000 private const val CHANNELS = 1 private const val BIT_RATE = 64000 private const val AUDIO_FORMAT = AudioFormat.ENCODING_PCM_16BIT // 缓冲区大小 - 使用较小的缓冲区减少延迟 private val BUFFER_SIZE = AudioRecord.getMinBufferSize( SAMPLE_RATE, AudioFormat.CHANNEL_IN_MONO, AUDIO_FORMAT ).let { minSize -> // 使用最小缓冲区的2倍,减少延迟 minSize * 2 } // 推流状态 private val isStreaming = AtomicBoolean(false) private var audioRecord: AudioRecord? = null private var recordingThread: Thread? = null private var ffmpegExecutionId: Long = 0 // 文件路径 private val cacheDir = File(App.ctx.cacheDir, \"live_streaming\") private val sdpFile = File(cacheDir, \"stream.sdp\") private val liveAudioFile = File(cacheDir, \"live_audio.pcm\") /** * 开始实时录音推流 */ fun startStreaming(): Boolean { if (isStreaming.get()) { LogUtils.w(TAG, \"推流已在进行中\") return false } return try { initializeFiles() createSdpFile() startAudioRecording() startLiveStreaming() isStreaming.set(true) LogUtils.i(TAG, \"✅ 实时推流启动成功\") true } catch (e: Exception) { LogUtils.e(TAG, \"❌ 实时推流启动失败\", e) stopStreaming() false } } /** * 停止推流 */ fun stopStreaming() { if (!isStreaming.get()) { return } isStreaming.set(false) // 停止录音 audioRecord?.stop() audioRecord?.release() audioRecord = null // 停止录音线程 recordingThread?.interrupt() recordingThread = null // 停止FFmpeg if (ffmpegExecutionId != 0L) { FFmpeg.cancel(ffmpegExecutionId) ffmpegExecutionId = 0 } LogUtils.i(TAG, \"🛑 实时推流已停止\") } /** * 获取推流状态 */ fun isStreaming(): Boolean = isStreaming.get() /** * 获取SDP文件路径 */ fun getSdpFilePath(): String = sdpFile.absolutePath /** * 获取组播地址信息 */ fun getMulticastInfo(): String { val fileSize = if (liveAudioFile.exists()) { \"${liveAudioFile.length() / 1024}KB\" } else { \"0KB\" } return \"组播地址: $MULTICAST_ADDRESS:$MULTICAST_PORT\\n\" + \"SDP文件: ${sdpFile.absolutePath}\\n\" + \"传输方式: 实时文件流\\n\" + \"缓冲区大小: ${BUFFER_SIZE}字节\\n\" + \"当前文件大小: $fileSize\\n\" + \"推流状态: ${if (isStreaming.get()) \"进行中\" else \"已停止\"}\" } /** * 初始化文件和目录 */ private fun initializeFiles() { if (!cacheDir.exists()) { cacheDir.mkdirs() } // 清理旧文件 if (liveAudioFile.exists()) { liveAudioFile.delete() } // 创建新的音频文件 liveAudioFile.createNewFile() } /** * 创建SDP文件 */ private fun createSdpFile() { val sdpContent = \"\"\" v=0 o=- 0 0 IN IP4 127.0.0.1 s=No Name c=IN IP4 $MULTICAST_ADDRESS t=0 0 a=tool:libavformat 58.45.100 m=audio $MULTICAST_PORT RTP/AVP 97 b=AS:64 a=rtpmap:97 MPEG4-GENERIC/$SAMPLE_RATE/$CHANNELS a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=141056E500 \"\"\".trimIndent() sdpFile.writeText(sdpContent) LogUtils.i(TAG, \"SDP文件创建成功: ${sdpFile.absolutePath}\") } /** * 开始音频录音 - 直接实时写入文件 */ private fun startAudioRecording() { audioRecord = AudioRecord( MediaRecorder.AudioSource.MIC, SAMPLE_RATE, AudioFormat.CHANNEL_IN_MONO, AUDIO_FORMAT, BUFFER_SIZE ) if (audioRecord?.state != AudioRecord.STATE_INITIALIZED) { throw IOException(\"AudioRecord初始化失败\") } audioRecord?.startRecording() // 启动录音线程,实时写入文件 recordingThread = Thread { val buffer = ByteArray(BUFFER_SIZE) var fileOutputStream: FileOutputStream? = null var totalBytes = 0 var lastLogTime = System.currentTimeMillis() try { fileOutputStream = FileOutputStream(liveAudioFile, false) // 不追加,覆盖写入 LogUtils.i(TAG, \"录音线程启动,实时写入: ${liveAudioFile.absolutePath}\") LogUtils.i(TAG, \"缓冲区大小: $BUFFER_SIZE 字节\") while (isStreaming.get() && !Thread.currentThread().isInterrupted) { val bytesRead = audioRecord?.read(buffer, 0, buffer.size) ?: 0 if (bytesRead > 0) { // 立即写入文件并刷新 fileOutputStream.write(buffer, 0, bytesRead) fileOutputStream.flush() totalBytes += bytesRead // 每3秒打印一次状态(更频繁的状态更新) val currentTime = System.currentTimeMillis() if (currentTime - lastLogTime > 3000) { LogUtils.d(TAG, \"🎙️ 实时录音中: ${totalBytes / 1024}KB, 速率: ${(totalBytes / ((currentTime - (lastLogTime - 3000)) / 1000.0) / 1024).toInt()}KB/s\") lastLogTime = currentTime } } else if (bytesRead == AudioRecord.ERROR_INVALID_OPERATION) { LogUtils.e(TAG, \"AudioRecord读取错误: ERROR_INVALID_OPERATION\") break } else if (bytesRead LogUtils.i(TAG, \"FFmpeg推流结束: executionId=$executionId, returnCode=$returnCode\") when (returnCode) { Config.RETURN_CODE_SUCCESS -> { LogUtils.i(TAG, \"✅ 推流正常结束\") } Config.RETURN_CODE_CANCEL -> { LogUtils.i(TAG, \"🛑 推流被用户取消\") } else -> { LogUtils.w(TAG, \"⚠️ 推流异常结束,返回码: $returnCode\") } } } LogUtils.i(TAG, \"FFmpeg执行ID: $ffmpegExecutionId\") } }}
拉流
package com.xhx.megaphone.tcpimport android.media.AudioFormatimport android.media.AudioManagerimport android.media.AudioTrackimport com.arthenica.mobileffmpeg.Configimport com.arthenica.mobileffmpeg.FFmpegimport com.blankj.utilcode.util.LogUtilsimport com.xhx.megaphone.Appimport kotlinx.coroutines.Dispatchersimport kotlinx.coroutines.GlobalScopeimport kotlinx.coroutines.launchimport java.io.Fileimport java.io.RandomAccessFileimport java.util.concurrent.atomic.AtomicBooleanimport java.util.concurrent.atomic.AtomicLong/** * 低延迟拉流播放助手 * * 优化策略: * 1. 最小化FFmpeg缓冲 * 2. 减少AudioTrack缓冲区 * 3. 更频繁的数据读取 * 4. 优化文件IO */object LowLatencyPullHelper { private const val TAG = \"LowLatencyPullHelper\" // 音频参数 private const val SAMPLE_RATE = 16000 private const val CHANNELS = 1 private const val AUDIO_FORMAT = AudioFormat.ENCODING_PCM_16BIT // 低延迟参数 private const val SMALL_BUFFER_SIZE = 1024 // 使用更小的缓冲区 private const val READ_INTERVAL_MS = 20 // 更频繁的读取间隔 // 拉流状态 private val isPulling = AtomicBoolean(false) private var ffmpegExecutionId: Long = 0 private var audioTrack: AudioTrack? = null private var playbackThread: Thread? = null // 文件读取位置 private val fileReadPosition = AtomicLong(0) // 文件路径 private val cacheDir = File(App.ctx.cacheDir, \"low_latency_pull\") private val outputPcmFile = File(cacheDir, \"realtime_audio.pcm\") /** * 开始低延迟拉流播放 */ fun startPulling(sdpFilePath: String): Boolean { if (isPulling.get()) { LogUtils.w(TAG, \"拉流已在进行中\") return false } val sdpFile = File(sdpFilePath) if (!sdpFile.exists()) { LogUtils.e(TAG, \"SDP文件不存在: $sdpFilePath\") return false } return try { initializeFiles() startLowLatencyDecoding(sdpFilePath) startLowLatencyPlayback() isPulling.set(true) fileReadPosition.set(0) LogUtils.i(TAG, \"✅ 低延迟拉流播放启动成功\") true } catch (e: Exception) { LogUtils.e(TAG, \"❌ 低延迟拉流播放启动失败\", e) stopPulling() false } } /** * 停止拉流 */ fun stopPulling() { if (!isPulling.get()) { return } isPulling.set(false) // 停止FFmpeg if (ffmpegExecutionId != 0L) { FFmpeg.cancel(ffmpegExecutionId) ffmpegExecutionId = 0 } // 停止音频播放 audioTrack?.stop() audioTrack?.release() audioTrack = null // 停止播放线程 playbackThread?.interrupt() playbackThread = null LogUtils.i(TAG, \"🛑 低延迟拉流已停止\") } /** * 获取拉流状态 */ fun isPulling(): Boolean = isPulling.get() /** * 获取拉流信息 */ fun getPullInfo(): String { val fileSize = if (outputPcmFile.exists()) { \"${outputPcmFile.length() / 1024}KB\" } else { \"0KB\" } return \"拉流状态: ${if (isPulling.get()) \"进行中\" else \"已停止\"}\\n\" + \"解码文件: ${outputPcmFile.absolutePath}\\n\" + \"文件大小: $fileSize\\n\" + \"读取位置: ${fileReadPosition.get() / 1024}KB\\n\" + \"优化模式: 低延迟\" } /** * 初始化文件和目录 */ private fun initializeFiles() { if (!cacheDir.exists()) { cacheDir.mkdirs() } // 清理旧文件 if (outputPcmFile.exists()) { outputPcmFile.delete() } } /** * 启动低延迟FFmpeg解码 */ private fun startLowLatencyDecoding(sdpFilePath: String) { GlobalScope.launch(Dispatchers.IO) { // 减少等待时间 Thread.sleep(500) // 构建超低延迟FFmpeg解码命令 val command = \"-protocol_whitelist file,udp,rtp \" + \"-fflags +nobuffer+flush_packets \" + // 禁用缓冲并立即刷新 \"-flags low_delay \" + // 低延迟模式 \"-probesize 32 \" + // 最小探测大小 \"-analyzeduration 0 \" + // 不分析流 \"-max_delay 0 \" + // 最大延迟为0 \"-reorder_queue_size 0 \" + // 禁用重排序队列 \"-rw_timeout 3000000 \" + // 3秒超时 \"-i $sdpFilePath \" + \"-acodec pcm_s16le \" + \"-ar $SAMPLE_RATE \" + \"-ac $CHANNELS \" + \"-f s16le \" + \"-flush_packets 1 \" + // 立即刷新数据包 \"${outputPcmFile.absolutePath}\" LogUtils.i(TAG, \"低延迟FFmpeg解码命令: $command\") ffmpegExecutionId = FFmpeg.executeAsync(command) { executionId, returnCode -> LogUtils.i(TAG, \"FFmpeg解码结束: executionId=$executionId, returnCode=$returnCode\") when (returnCode) { Config.RETURN_CODE_SUCCESS -> { LogUtils.i(TAG, \"✅ 解码正常结束\") } Config.RETURN_CODE_CANCEL -> { LogUtils.i(TAG, \"🛑 解码被用户取消\") } else -> { LogUtils.w(TAG, \"⚠️ 解码异常结束,返回码: $returnCode\") } } } } } /** * 启动低延迟音频播放 */ private fun startLowLatencyPlayback() { // 使用最小缓冲区 val minBufferSize = AudioTrack.getMinBufferSize( SAMPLE_RATE, AudioFormat.CHANNEL_OUT_MONO, AUDIO_FORMAT ) // 使用稍大于最小缓冲区的大小,但不要太大 val bufferSize = minBufferSize * 2 audioTrack = AudioTrack( AudioManager.STREAM_MUSIC, SAMPLE_RATE, AudioFormat.CHANNEL_OUT_MONO, AUDIO_FORMAT, bufferSize, AudioTrack.MODE_STREAM ) // 设置低延迟模式(API 26+) try { if (android.os.Build.VERSION.SDK_INT >= android.os.Build.VERSION_CODES.O) { val audioAttributes = android.media.AudioAttributes.Builder() .setUsage(android.media.AudioAttributes.USAGE_MEDIA) .setContentType(android.media.AudioAttributes.CONTENT_TYPE_MUSIC) .setFlags(android.media.AudioAttributes.FLAG_LOW_LATENCY) .build() audioTrack = AudioTrack.Builder() .setAudioAttributes(audioAttributes) .setAudioFormat( AudioFormat.Builder() .setEncoding(AUDIO_FORMAT) .setSampleRate(SAMPLE_RATE) .setChannelMask(AudioFormat.CHANNEL_OUT_MONO) .build() ) .setBufferSizeInBytes(bufferSize) .setTransferMode(AudioTrack.MODE_STREAM) .build() } } catch (e: Exception) { LogUtils.w(TAG, \"无法设置低延迟AudioTrack,使用默认配置\", e) } audioTrack?.play() LogUtils.i(TAG, \"AudioTrack初始化完成,缓冲区大小: $bufferSize\") // 启动高频率播放线程 playbackThread = Thread { val buffer = ByteArray(SMALL_BUFFER_SIZE) var totalPlayed = 0 var lastLogTime = System.currentTimeMillis() LogUtils.i(TAG, \"低延迟音频播放线程启动\") while (isPulling.get() && !Thread.currentThread().isInterrupted) { try { if (outputPcmFile.exists()) { val currentFileSize = outputPcmFile.length() val currentReadPos = fileReadPosition.get() // 如果有新数据可读 if (currentFileSize > currentReadPos) { RandomAccessFile(outputPcmFile, \"r\").use { randomAccessFile -> randomAccessFile.seek(currentReadPos) val bytesRead = randomAccessFile.read(buffer) if (bytesRead > 0) { audioTrack?.write(buffer, 0, bytesRead) totalPlayed += bytesRead fileReadPosition.addAndGet(bytesRead.toLong()) // 每2秒打印一次状态 val currentTime = System.currentTimeMillis() if (currentTime - lastLogTime > 2000) { LogUtils.d(TAG, \"🔊 低延迟播放: ${totalPlayed / 1024}KB, 延迟: ${(currentFileSize - currentReadPos) / 32}ms\") lastLogTime = currentTime } } } } } // 高频率检查,减少延迟 Thread.sleep(READ_INTERVAL_MS.toLong()) } catch (e: Exception) { LogUtils.e(TAG, \"低延迟播放异常\", e) Thread.sleep(100) } } LogUtils.i(TAG, \"低延迟播放线程结束,总计播放: ${totalPlayed / 1024}KB\") } playbackThread?.start() }}