Calling Local Supabase Edge Functions from an EMQX Webhook


1. Environment setup

1.1 Check service status

    # Check the status of the Supabase services
    cd /home/xx/dockerData/supabase/supabase-project
    docker-compose ps

    # Confirm that the Edge Functions service is running normally
    docker-compose logs supabase-edge-functions

1.2 Service addresses

  • Supabase Dashboard: http://172.16.9.14:18000

  • Edge Functions endpoint: http://172.16.9.14:18000/functions/v1/
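
A function is invoked at the Edge Functions endpoint followed by the function name. The sketch below shows that composition; `functionUrl` and `FUNCTIONS_BASE` are hypothetical names used only for illustration, not part of Supabase.

```typescript
// Hypothetical helper: joins the Edge Functions endpoint and a function name.
const FUNCTIONS_BASE = "http://172.16.9.14:18000/functions/v1/";

function functionUrl(name: string, base: string = FUNCTIONS_BASE): string {
  // The base ends in "/", so the name resolves as a child path segment.
  return new URL(encodeURIComponent(name), base).toString();
}

console.log(functionUrl("mqtt-webhook"));
```

The trailing slash on the base matters: without it, `new URL` would replace the last path segment (`v1`) instead of appending to it.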

2. Database schema

2.1 Create the MQTT message table

Run the following in the SQL Editor of the Supabase Dashboard:

    -- Create the MQTT message table
    CREATE TABLE mqtt_messages (
      id BIGSERIAL PRIMARY KEY,
      message_id TEXT UNIQUE NOT NULL,
      client_id TEXT NOT NULL,
      username TEXT,
      topic TEXT,
      payload JSONB,
      qos INTEGER,
      retain BOOLEAN,
      timestamp BIGINT,
      publish_received_at BIGINT,
      node TEXT,
      peerhost TEXT,
      created_at TIMESTAMPTZ DEFAULT NOW()
    );

    -- Create indexes to speed up queries
    CREATE INDEX idx_mqtt_client_id ON mqtt_messages(client_id);
    CREATE INDEX idx_mqtt_topic ON mqtt_messages(topic);
    CREATE INDEX idx_mqtt_timestamp ON mqtt_messages(timestamp);
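
For code that writes to this table, it can help to mirror the columns as a type. The following is a sketch only: the names `MqttMessageInsert`, `REQUIRED_COLUMNS` and `missingRequired` are hypothetical, and the nullability simply follows the `NOT NULL` constraints in the table definition above.

```typescript
// Column types for one row of mqtt_messages, as sent in an insert.
// `id` and `created_at` are omitted because the database fills them in.
interface MqttMessageInsert {
  message_id: string;                  // TEXT UNIQUE NOT NULL
  client_id: string;                   // TEXT NOT NULL
  username: string | null;
  topic: string | null;
  payload: unknown;                    // JSONB
  qos: number | null;
  retain: boolean | null;
  timestamp: number | null;            // BIGINT
  publish_received_at: number | null;  // BIGINT
  node: string | null;
  peerhost: string | null;
}

// The two columns declared NOT NULL without a default.
const REQUIRED_COLUMNS = ["message_id", "client_id"] as const;

// Returns the required columns that are absent or null in a candidate row.
function missingRequired(row: Partial<MqttMessageInsert>): string[] {
  return REQUIRED_COLUMNS.filter(
    (col) => row[col] === undefined || row[col] === null,
  );
}

console.log(missingRequired({ message_id: "test-edge-003" }));
```

Checking a row before the insert gives a clearer error than the database constraint violation would, though the constraint remains the source of truth.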

3. Edge Function configuration

3.1 Function file location

/home/xx/dockerData/supabase/supabase-project/volumes/functions/mqtt-webhook/index.ts

3.2 Complete function code

    // functions/mqtt-webhook/index.ts
    import { serve } from 'https://cdn.jsdelivr.net/gh/denoland/deno_std@0.131.0/http/server.ts'
    import { createClient } from 'https://esm.sh/@supabase/supabase-js@2'

    // CORS headers attached to every response
    const corsHeaders = {
      'Access-Control-Allow-Origin': '*',
      'Access-Control-Allow-Headers': 'authorization, x-client-info, x-api-key, content-type',
    }
    const jsonHeaders = { ...corsHeaders, 'Content-Type': 'application/json' }

    serve(async (req) => {
      // Handle the CORS preflight request
      if (req.method === 'OPTIONS') {
        return new Response('ok', { headers: corsHeaders })
      }

      // Verify the fixed API key. The fallback value is used only when
      // EMQX_WEBHOOK_API_KEY is not set in the environment.
      const apiKey = req.headers.get('x-api-key')
      const expectedApiKey = Deno.env.get('EMQX_WEBHOOK_API_KEY') || 'AAGGHfBBGBWOOBOJOPJHPJWPJPHOIVOJPJP'
      if (!apiKey || apiKey !== expectedApiKey) {
        return new Response(JSON.stringify({ error: 'Unauthorized' }), { status: 401, headers: jsonHeaders })
      }

      try {
        // Accept POST requests only
        if (req.method !== 'POST') {
          return new Response(JSON.stringify({ error: 'Method not allowed' }), { status: 405, headers: jsonHeaders })
        }

        // Parse the request body
        const webhookData = await req.json()

        // Ignore everything except message.publish events
        if (webhookData?.event !== 'message.publish') {
          return new Response(
            JSON.stringify({ status: 'ignored', message: 'Not a message.publish event' }),
            { status: 200, headers: jsonHeaders }
          )
        }

        // Create the Supabase client
        const supabaseUrl = Deno.env.get('SUPABASE_URL')!
        const supabaseKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
        const supabase = createClient(supabaseUrl, supabaseKey)

        // Parse the payload: JSON strings are decoded, other values pass through
        let parsedPayload
        try {
          if (typeof webhookData.payload === 'string') {
            parsedPayload = JSON.parse(webhookData.payload)
          } else {
            parsedPayload = webhookData.payload
          }
        } catch {
          // If parsing fails, store the original text
          parsedPayload = { raw_text: webhookData.payload }
        }

        // Insert the message into the database
        const { error } = await supabase
          .from('mqtt_messages')
          .insert({
            message_id: webhookData.id,
            client_id: webhookData.clientid,
            username: webhookData.username === 'undefined' ? null : webhookData.username,
            topic: webhookData.topic,
            payload: parsedPayload,
            qos: webhookData.qos || 0,
            retain: webhookData.flags?.retain || false,
            timestamp: webhookData.timestamp,
            publish_received_at: webhookData.publish_received_at,
            node: webhookData.node,
            peerhost: webhookData.peerhost
          })
          .select()

        if (error) {
          console.error('Database error:', error)
          return new Response(
            JSON.stringify({ status: 'error', message: error.message }),
            { status: 500, headers: jsonHeaders }
          )
        }

        // Return the success response
        return new Response(
          JSON.stringify({
            status: 'success',
            message: 'Message stored successfully',
            message_id: webhookData.id
          }),
          { status: 200, headers: jsonHeaders }
        )
      } catch (error) {
        console.error('Function error:', error)
        // A caught value is not guaranteed to be an Error instance
        const message = error instanceof Error ? error.message : String(error)
        return new Response(
          JSON.stringify({ status: 'error', message }),
          { status: 500, headers: jsonHeaders }
        )
      }
    })
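
The payload handling in the function can be pulled out into a pure helper, which makes the fallback behaviour easy to check without a database. This is a sketch; `parsePayload` is a hypothetical name and the logic simply mirrors the try/catch shown in the function code.

```typescript
// Decodes an MQTT payload the same way the Edge Function does:
// - a string holding valid JSON is parsed,
// - any other string is wrapped as { raw_text: ... },
// - non-string values are passed through unchanged.
function parsePayload(payload: unknown): unknown {
  if (typeof payload !== "string") {
    return payload;
  }
  try {
    return JSON.parse(payload);
  } catch {
    return { raw_text: payload };
  }
}

console.log(parsePayload('{"temperature": 25.5}'));
console.log(parsePayload("not json"));
```

Note that a string such as `"25"` is valid JSON and is stored as the number 25, not as text; whether that is desirable depends on what the devices publish.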

5. Service restart and deployment

5.1 Restart services

    cd /home/xx/dockerData/supabase/supabase-project

    # Restart the Edge Functions service
    docker-compose restart supabase-edge-functions

    # Or restart all services
    docker-compose down
    docker-compose up -d

5.2 View logs

    # Follow the Edge Functions logs in real time
    docker-compose logs -f supabase-edge-functions

    # Follow the logs of all services
    docker-compose logs -f

6. Testing and verification

6.1 Manual test

    curl -X POST "http://172.16.9.14:18000/functions/v1/mqtt-webhook" \
      -H "Content-Type: application/json" \
      -H "x-api-key: AAGGHfBBGBWOOBOJOPJHPJWPJPHOIVOJPJP" \
      -d '{
        "id": "test-edge-003",
        "clientid": "test_client",
        "username": "test_user",
        "topic": "test/edge",
        "payload": "{\"temperature\": 25.5}",
        "qos": 1,
        "timestamp": 1640995200000,
        "publish_received_at": 1640995200100,
        "node": "emqx@127.0.0.1",
        "peerhost": "192.168.1.100",
        "event": "message.publish",
        "flags": {"retain": false, "dup": false}
      }'
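
The same test request can be built in TypeScript, which avoids the shell quoting around the nested JSON payload. This is a sketch under assumptions: `buildTestRequest` and `WebhookTestRequest` are hypothetical names, and the field values are copied from the curl command. Sending the request is left as a commented usage line because it needs network access to the Supabase host.

```typescript
// Plain description of the HTTP request, usable with fetch(url, init).
interface WebhookTestRequest {
  url: string;
  method: "POST";
  headers: Record<string, string>;
  body: string;
}

function buildTestRequest(apiKey: string): WebhookTestRequest {
  const event = {
    id: "test-edge-003",
    clientid: "test_client",
    username: "test_user",
    topic: "test/edge",
    // The payload is itself a JSON string, as in the curl example.
    payload: JSON.stringify({ temperature: 25.5 }),
    qos: 1,
    timestamp: 1640995200000,
    publish_received_at: 1640995200100,
    node: "emqx@127.0.0.1",
    peerhost: "192.168.1.100",
    event: "message.publish",
    flags: { retain: false, dup: false },
  };
  return {
    url: "http://172.16.9.14:18000/functions/v1/mqtt-webhook",
    method: "POST",
    headers: { "Content-Type": "application/json", "x-api-key": apiKey },
    body: JSON.stringify(event),
  };
}

// Usage (requires a runtime with fetch and access to the host):
//   const req = buildTestRequest("<API key>");
//   const res = await fetch(req.url, req);
//   console.log(res.status, await res.json());
```

Because `message_id` is declared `UNIQUE`, sending the same `id` twice should fail on the second insert; change `id` between runs when repeating the test.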

6.2 Expected response

    {
      "status": "success",
      "message": "Message stored successfully",
      "message_id": "test-edge-003"
    }
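
A test script can check the response body against this shape instead of comparing strings. The type guard below is a sketch; `StoredResponse` and `isStoredResponse` are hypothetical names, and the fields are taken from the expected response above.

```typescript
// Shape of the body returned when the message was stored.
interface StoredResponse {
  status: "success";
  message: string;
  message_id: string;
}

// Narrows an arbitrary parsed JSON value to StoredResponse.
function isStoredResponse(body: unknown): body is StoredResponse {
  if (typeof body !== "object" || body === null) {
    return false;
  }
  const b = body as Record<string, unknown>;
  return (
    b.status === "success" &&
    typeof b.message === "string" &&
    typeof b.message_id === "string"
  );
}

const sample: unknown = JSON.parse(
  '{"status":"success","message":"Message stored successfully","message_id":"test-edge-003"}',
);
console.log(isStoredResponse(sample));
```

The other bodies the function can return, such as `{ "status": "ignored", ... }` or `{ "error": "Unauthorized" }`, do not pass this guard, so a failed or skipped insert is not mistaken for success.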
