【后端】构建简洁的音频转写系统:基于火山引擎ASR实现_火山引擎 api语音转文字
在当今数字化时代,语音识别技术已经成为许多应用不可或缺的一部分。无论是会议记录、语音助手还是内容字幕,将语音转化为文本的能力对提升用户体验和工作效率至关重要。本文将介绍如何构建一个简洁的音频转写系统,专注于文件上传、云存储以及ASR(自动语音识别)的集成,特别是基于火山引擎ASR服务的实现。
系统架构概览
一个简洁的音频转写系统需要包含以下几个关键组件:
- 前端界面:提供用户上传音频文件的入口
- API服务层:处理请求和业务逻辑
- 云存储服务:安全存储音频文件
- ASR服务:将音频转写为文本(本文使用火山引擎ASR服务)
系统流程如下:
用户 → 上传音频 → 存储到云服务 → 触发ASR转写 → 获取转写结果 → 返回给用户
技术选型
我们的最小实现基于以下技术栈:
- 后端框架:FastAPI(Python)
- 云存储:兼容S3协议的对象存储
- ASR服务:火山引擎ASR服务
- 异步处理:基于asyncio的异步请求处理
详细实现
1. 音频文件上传流程
实现音频上传有两种主要方式:
1.1 预签名URL上传
这种方式适合大文件上传,减轻服务器负担:
async def create_upload_url(file_name, file_size, mime_type): \"\"\"创建上传链接\"\"\" # 生成唯一文件名 timestamp = datetime.now().strftime(\"%Y%m%d%H%M%S\") random_suffix = os.urandom(4).hex() file_ext = os.path.splitext(file_name)[1] filename = f\"{timestamp}_{random_suffix}{file_ext}\" # 生成存储路径 storage_path = f\"audio/{filename}\" # 获取预签名URL upload_url = storage_client.generate_presigned_url( storage_path, expiry=300, #5分钟有效期 http_method=\"PUT\", content_length=file_size ) return { \"upload_url\": upload_url, \"storage_path\": storage_path }
前端调用示例:
// 1. 获取上传URLconst response = await fetch(\'/api/audio/upload-url\', { method: \'POST\', headers: { \'Content-Type\': \'application/json\' }, body: JSON.stringify({ file_name: file.name, file_size: file.size, mime_type: file.type })});const { upload_url, storage_path } = await response.json();// 2. 使用预签名URL上传文件await fetch(upload_url, { method: \'PUT\', body: file, headers: { \'Content-Type\': file.type }});// 3. 触发转写const transcriptResponse = await fetch(\'/api/audio/transcribe\', { method: \'POST\', headers: { \'Content-Type\': \'application/json\' }, body: JSON.stringify({ storage_path })});const transcriptResult = await transcriptResponse.json();
1.2 直接上传方式
适合较小文件,通过API直接上传:
async def upload_audio(file): \"\"\"直接上传音频文件\"\"\" # 验证文件类型 if file.content_type not in ALLOWED_AUDIO_TYPES: raise ValueError(\"不支持的文件类型\") # 读取文件内容 contents = await file.read() if len(contents) == 0: raise ValueError(\"文件内容为空\") # 生成唯一文件名 timestamp = datetime.now().strftime(\"%Y%m%d%H%M%S\") random_suffix = os.urandom(4).hex() file_ext = os.path.splitext(file.filename)[1] filename = f\"{timestamp}_{random_suffix}{file_ext}\" # 存储路径 storage_path = f\"audio/{filename}\" # 上传到云存储 storage_client.upload(storage_path, contents) # 生成访问URL access_url = storage_client.generate_presigned_url( storage_path, expiry=3600, # 1小时有效期 http_method=\"GET\" ) return { \"file_name\": file.filename, \"storage_path\": storage_path, \"file_size\": len(contents), \"mime_type\": file.content_type, \"access_url\": access_url, \"url_expires_at\": datetime.now() + timedelta(hours=1) }
2. ASR语音转写实现
可以通过两种方式调用ASR服务:基于存储路径或直接通过URL。
2.1 基于存储路径的转写
async def transcribe_audio_by_storage_path(storage_path): \"\"\"通过存储路径转写音频文件\"\"\" # 生成可访问的URL access_url = storage_client.generate_presigned_url( storage_path, expiry=3600, http_method=\"GET\" ) # 调用ASR服务 transcript_result = await _call_asr_service(access_url) return { \"storage_path\": storage_path, \"transcript\": transcript_result.get(\"text\", \"\"), \"segments\": transcript_result.get(\"segments\", []), \"duration\": transcript_result.get(\"duration\") }
2.2 基于URL的转写
async def transcribe_audio_by_url(audio_url): \"\"\"通过URL转写音频\"\"\" # 调用ASR服务 transcript_result = await _call_asr_service(audio_url) return { \"audio_url\": audio_url, \"transcript\": transcript_result.get(\"text\", \"\"), \"segments\": transcript_result.get(\"segments\", []), \"duration\": transcript_result.get(\"duration\") }
2.3 上传并立即转写
async def upload_and_transcribe(file): \"\"\"上传并立即转写音频文件\"\"\" # 上传文件 upload_result = await upload_audio(file) # 转写音频 transcript_result = await _call_asr_service(upload_result[\"access_url\"]) # 组合结果 return { \"file_name\": upload_result[\"file_name\"], \"storage_path\": upload_result[\"storage_path\"], \"file_size\": upload_result[\"file_size\"], \"mime_type\": upload_result[\"mime_type\"], \"access_url\": upload_result[\"access_url\"], \"transcript\": transcript_result.get(\"text\", \"\"), \"segments\": transcript_result.get(\"segments\", []), \"duration\": transcript_result.get(\"duration\") }
3. 火山引擎ASR服务调用实现
以下是基于火山引擎ASR服务的详细实现:
async def _call_asr_service(audio_url): \"\"\"调用火山引擎ASR服务进行转写\"\"\" # 生成唯一任务ID task_id = str(uuid.uuid4()) # 火山引擎ASR服务API端点 submit_url = \"https://openspeech.bytedance.com/api/v3/auc/bigmodel/submit\" query_url = \"https://openspeech.bytedance.com/api/v3/auc/bigmodel/query\" # 构建请求头 headers = { \"Content-Type\": \"application/json\", \"X-Api-App-Key\": APP_KEY, \"X-Api-Access-Key\": ACCESS_KEY, \"X-Api-Resource-Id\": \"volc.bigasr.auc\", \"X-Api-Request-Id\": task_id, \"X-Api-Sequence\": \"-1\" } # 请求体 payload = { \"audio\": { \"url\": audio_url } } # 提交转写任务 async with aiohttp.ClientSession() as session: async with session.post(submit_url, headers=headers, data=json.dumps(payload)) as response: if response.status != 200: error_detail = await response.text() raise ValueError(f\"提交ASR任务失败: {error_detail}\") response_headers = response.headers status_code = response_headers.get(\"X-Api-Status-Code\") log_id = response_headers.get(\"X-Tt-Logid\", \"\") if status_code not in [\"20000000\", \"20000001\", \"20000002\"]: raise ValueError(f\"ASR任务提交错误: {response_headers.get(\'X-Api-Message\', \'未知错误\')}\") # 轮询查询结果 max_retries = 10 for i in range(max_retries): # 等待一段时间再查询 await asyncio.sleep(0.5) # 查询转写结果 async with aiohttp.ClientSession() as session: query_headers = { \"Content-Type\": \"application/json\", \"X-Api-App-Key\": APP_KEY, \"X-Api-Access-Key\": ACCESS_KEY, \"X-Api-Resource-Id\": \"volc.bigasr.auc\", \"X-Api-Request-Id\": task_id, \"X-Tt-Logid\": log_id } async with session.post( query_url, headers=query_headers, data=json.dumps({}) ) as response: if response.status != 200: continue query_status_code = response.headers.get(\"X-Api-Status-Code\") # 如果完成,返回结果 if query_status_code == \"20000000\": try: response_data = await response.json() result = response_data.get(\"result\", {}) text = result.get(\"text\", \"\") utterances = result.get(\"utterances\", []) return {\"text\": text, \"utterances\": utterances} except Exception as e: raise ValueError(f\"解析ASR响应失败: {str(e)}\") # 如果仍在处理,继续等待 elif query_status_code in [\"20000001\", \"20000002\"]: await asyncio.sleep(0.5) continue else: error_message = response.headers.get(\"X-Api-Message\", \"未知错误\") raise ValueError(f\"ASR任务查询失败: {error_message}\") # 超过最大重试次数 raise ValueError(\"ASR转写超时,请稍后重试\")
4. API接口设计
完整的API接口设计,专注于最小功能实现:
# 1. 获取上传URL@router.post(\"/audio/upload-url\")async def create_upload_url(request: dict): return await audio_service.create_upload_url( request[\"file_name\"], request[\"file_size\"], request[\"mime_type\"] )# 2. 直接上传音频@router.post(\"/audio/upload\")async def upload_audio(file: UploadFile): return await audio_service.upload_audio(file)# 3. 转写音频 (通过存储路径)@router.post(\"/audio/transcribe\")async def transcribe_audio(request: dict): return await audio_service.transcribe_audio_by_storage_path(request[\"storage_path\"])# 4. 通过URL转写音频@router.post(\"/audio/transcribe-by-url\")async def transcribe_by_url(request: dict): return await audio_service.transcribe_audio_by_url(request[\"audio_url\"])# 5. 上传并转写音频@router.post(\"/audio/upload-and-transcribe\")async def upload_and_transcribe(file: UploadFile): return await audio_service.upload_and_transcribe(file)
性能与可靠性优化
在实际生产环境中,我们还应关注以下几点:
1. 大文件处理
对于大型音频文件,应当:
- 使用分块上传方式
- 实现断点续传
- 限制文件大小
- 采用预签名URL方式,避免通过API服务器中转
2. 错误处理和重试
增强系统稳定性:
- 实现指数退避重试策略
- 添加详细日志记录
- 设置超时处理
3. 安全性考虑
保护用户数据:
- 实现访问控制
- 对音频URL设置短期有效期
- 考虑临时文件清理机制
完整示例:构建最小可行实现
下面是一个使用FastAPI构建的基于火山引擎ASR的最小可行实现示例:
import osimport uuidimport jsonimport asyncioimport aiohttpfrom datetime import datetime, timedeltafrom fastapi import FastAPI, UploadFile, Filefrom typing import Dict, Any, Optionalapp = FastAPI()# 配置项ALLOWED_AUDIO_TYPES = [\"audio/mpeg\", \"audio/wav\", \"audio/mp4\", \"audio/x-m4a\"]APP_KEY = os.getenv(\"VOLCANO_ASR_APP_ID\")ACCESS_KEY = os.getenv(\"VOLCANO_ASR_ACCESS_TOKEN\")# 简单的存储客户端模拟class SimpleStorageClient: def upload(self, path, content): # 实际项目中应连接到S3、OSS等云存储 print(f\"Uploading {len(content)} bytes to {path}\") return True def generate_presigned_url(self, path, expiry=3600, http_method=\"GET\", **kwargs): # 简化示例,实际应返回带签名的URL return f\"https://storage-example.com/{path}?expires={expiry}&method={http_method}\"storage_client = SimpleStorageClient()# API端点@app.post(\"/audio/upload-url\")async def create_upload_url(file_name: str, file_size: int, mime_type: str): \"\"\"获取上传URL\"\"\" timestamp = datetime.now().strftime(\"%Y%m%d%H%M%S\") random_suffix = os.urandom(4).hex() file_ext = os.path.splitext(file_name)[1] filename = f\"{timestamp}_{random_suffix}{file_ext}\" storage_path = f\"audio/{filename}\" upload_url = storage_client.generate_presigned_url( storage_path, expiry=300, http_method=\"PUT\", content_length=file_size ) return { \"upload_url\": upload_url, \"storage_path\": storage_path }@app.post(\"/audio/upload\")async def upload_audio(file: UploadFile = File(...)): \"\"\"直接上传音频文件\"\"\" if file.content_type not in ALLOWED_AUDIO_TYPES: return {\"error\": \"不支持的文件类型\"} contents = await file.read() if len(contents) == 0: return {\"error\": \"文件内容为空\"} timestamp = datetime.now().strftime(\"%Y%m%d%H%M%S\") random_suffix = os.urandom(4).hex() file_ext = os.path.splitext(file.filename)[1] filename = f\"{timestamp}_{random_suffix}{file_ext}\" storage_path = f\"audio/{filename}\" storage_client.upload(storage_path, contents) access_url = storage_client.generate_presigned_url( storage_path, expiry=3600, http_method=\"GET\" ) return { \"file_name\": file.filename, \"storage_path\": storage_path, \"file_size\": len(contents), \"mime_type\": file.content_type, \"access_url\": access_url, \"url_expires_at\": (datetime.now() + timedelta(hours=1)).isoformat() }@app.post(\"/audio/transcribe\")async def transcribe_audio(storage_path: str): \"\"\"通过存储路径转写音频\"\"\" access_url = storage_client.generate_presigned_url( storage_path, expiry=3600, http_method=\"GET\" ) transcript_result = await _call_volcano_asr(access_url) return { \"storage_path\": storage_path, \"transcript\": transcript_result }@app.post(\"/audio/transcribe-by-url\")async def transcribe_by_url(audio_url: str): \"\"\"通过URL转写音频\"\"\" transcript_result = await _call_volcano_asr(audio_url) return { \"audio_url\": audio_url, \"transcript\": transcript_result }@app.post(\"/audio/upload-and-transcribe\")async def upload_and_transcribe(file: UploadFile = File(...)): \"\"\"上传并转写音频文件\"\"\" upload_result = await upload_audio(file) if \"error\" in upload_result: return upload_result transcript_result = await _call_volcano_asr(upload_result[\"access_url\"]) return { **upload_result, \"transcript\": transcript_result }async def _call_volcano_asr(audio_url): \"\"\"调用火山引擎ASR服务\"\"\" if not APP_KEY or not ACCESS_KEY: return {\"text\": \"火山引擎ASR配置缺失,请设置环境变量\"} # 生成任务ID task_id = str(uuid.uuid4()) # 火山引擎ASR服务API端点 submit_url = \"https://openspeech.bytedance.com/api/v3/auc/bigmodel/submit\" query_url = \"https://openspeech.bytedance.com/api/v3/auc/bigmodel/query\" # 提交请求头 headers = { \"Content-Type\": \"application/json\", \"X-Api-App-Key\": APP_KEY, \"X-Api-Access-Key\": ACCESS_KEY, \"X-Api-Resource-Id\": \"volc.bigasr.auc\", \"X-Api-Request-Id\": task_id, \"X-Api-Sequence\": \"-1\" } # 请求体 payload = { \"audio\": { \"url\": audio_url } } try: # 提交任务 async with aiohttp.ClientSession() as session: async with session.post(submit_url, headers=headers, data=json.dumps(payload)) as response: status_code = response.headers.get(\"X-Api-Status-Code\") log_id = response.headers.get(\"X-Tt-Logid\", \"\") if status_code not in [\"20000000\", \"20000001\", \"20000002\"]: return {\"error\": f\"提交转写任务失败: {response.headers.get(\'X-Api-Message\', \'未知错误\')}\"} # 查询结果 max_retries = 10 for i in range(max_retries): await asyncio.sleep(1) # 等待1秒 # 查询请求头 query_headers = { \"Content-Type\": \"application/json\", \"X-Api-App-Key\": APP_KEY, \"X-Api-Access-Key\": ACCESS_KEY, \"X-Api-Resource-Id\": \"volc.bigasr.auc\", \"X-Api-Request-Id\": task_id, \"X-Tt-Logid\": log_id } async with aiohttp.ClientSession() as session: async with session.post(query_url, headers=query_headers, data=\"{}\") as response: query_status = response.headers.get(\"X-Api-Status-Code\") if query_status == \"20000000\": # 转写完成 result = await response.json() text = result.get(\"result\", {}).get(\"text\", \"\") return {\"text\": text} elif query_status in [\"20000001\", \"20000002\"]: # 处理中 continue else: return {\"error\": f\"查询转写结果失败: {response.headers.get(\'X-Api-Message\', \'未知错误\')}\"} return {\"error\": \"转写超时,请稍后查询结果\"} except Exception as e: return {\"error\": f\"转写过程发生错误: {str(e)}\"}if __name__ == \"__main__\": import uvicorn uvicorn.run(app, host=\"0.0.0.0\", port=8000)
结论
构建一个简洁的音频转写系统可以不依赖数据库,只需要专注于文件上传、获取URL和ASR转写三个核心功能。通过集成火山引擎ASR服务,我们可以快速实现高质量的语音转文本功能,无需自行构建复杂的语音识别模型。
本文的最小可行实现充分利用了火山引擎ASR的API功能,提供了一个完整的工作流程,包括文件上传、URL生成和转写调用。这种方式不仅开发效率高,而且可以在不断迭代中逐步增强功能。
进一步的拓展方向
在有了最小可行实现后,可以考虑以下拓展:
- 添加数据库存储转写历史
- 实现用户认证和授权
- 支持实时语音转写
- 多语言转写支持
- 说话人分离功能
- 情感分析集成
- 关键词提取和主题识别