> 文档中心 > edgex进阶系列-5.与规则引擎eKuiper联动(图拓扑规则)

edgex进阶系列-5.与规则引擎eKuiper联动(图拓扑规则)


5 与规则引擎eKuiper联动(图拓扑规则)

LF Edge eKuiper 是 Golang 实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。eKuiper是一个规则引擎,即按一定的规则去处理数据。eKuiper灵活地解决了:从哪里取数,进行什么处理,处理完发到哪去 这些问题。

eKuiper虽然不是edgex的一部分,但与edgex做了比较深的集成。

关于eKuiper的基本使用,可以参考官方文档:https://ekuiper.org/docs/zh/latest/ 。

本文仅专注于如何在edgex中使用eKuiper的图拓扑规则来进行一些数据的处理。同时指出eKuiper哪些能做到,哪些做不到,有哪些坑。

注:基于eKuiper版本1.6.2。

场景

在物联网的场景下,一些设备的数据是需要进行加工或处理的,比如:

  1. 做一些简单计算
    典型的是温度采集上来都是int型的,需要除以10或100再-40得到真实值。这种线型标定,可以在采集时进行。比如edgex中的device-modbu采集服务就可以通过配置来实现。
    但如果采集阶段不由我们控制,我们不好干涉,需要上报后处理,那一种办法是在自定义的application service中写逻辑处理,也可以使用eKuiper进行处理。

  2. 进行采样或聚合
    想计算下每分钟的平均温度,然后再发送到云端或者发送到application service中去处理或者触发对设备的控制。这种场景非常适合使用eKuiper进行。

因此,希望通过edgex和eKuiper的图规则来实现以下场景:

  1. eKuiper从edgex接收设备数据
  2. 对数据进行计算
  3. 把计算结果作为一个新的属性 发回到edgex中
  4. 同时,把计算结果发到mqtt server上。

如图所示:

配置

准备工作

  1. 启动edgex的核心模块
  2. 启动eKuiper,最好启动eKuiper UI
  3. 启动一个mqtt server
  4. 定义设备profile和设备
  5. 通过device-service向消息总线发送数据

然后,开始配置图拓扑规则。图拓扑规则的API的body数据格式如下:

{  "id": "graph_rule_demo",   //规则id,唯一标识  "name": "Graph Rule - control",  //规则名称  "graph": {    "nodes": {     //节点列表,定义图上的节点      "local-edgex-source": { "type": "source", "nodeType": "edgex", "props": {   "confKey": "my_conf" }      },      ……    },    "topo": {     //关系列表,定义图中的连线      "sources": ["local-edgex-source"],      "edges": { "local-edgex-source": ["device-filter"], ……      }    }  }}

源配置

关于图拓扑配置的官方文档都很少。

拓扑中nodes的json结构如下:

  "your-source-name": {    "type": "source",    "nodeType": "edgex",    "props": {      ……    }  }

然后你看官方文档提到的配置:

看似你能在props下面去自己定义这些参数。但实际上,这些属性都不能生效。也就是说:

  1. 如果不指定confKey,那就会用default的配置,定义的这些参数也不能覆盖default中的定义
  2. 如果指定了confKey,那定义的这些参数也不能覆盖confKey中的定义
    因此,想在props中动态的指定topic,来实现过滤设备或属性的办法是不行的。

总结一下,目前从edgex的消息总线中接收数据,有2种方式:

  1. 用default配置,配置内容如下:
name: defaultmessageType: eventport: 6379protocol: redisserver: edgex-redistopic: rules-eventstype: redis

这种情况下,node配置中props缺省就行了。

  "your-source-name": {    "type": "source",    "nodeType": "edgex",    "props": {    }  }

但其实展示的是错的,实际订阅的topic是edgex/events/device/#,这在ekuiper控制台日志里可以看到。

  1. 如果redis地址不是这个,要先配一个自定义的配置,配置内容如下:
name: my_confmessageType: request    //如果从消息总线,这里必须是requestport: 6379protocol: redisserver: edgex-redis//实际redis的ip或域名topic: edgex/events/device/#     //这个topic可以按需进行,但要按照edgex的层级type: redis

注意:
如果从消息总线中接收消息,那messageType要使用request。 topic前缀为edgex/events/device,并遵循edgex的层级结构:edgex/events/device///

但其实dtos.Event 和 requests.AddEventRequest 的区别不大,从源码看,AddEventRequest在Event外面包了一层:

此时,props中的confKey配为已有配置即可。

"your-source-name": {    "type": "source",    "nodeType": "edgex",    "props": {      "confKey": "my_conf"    }},

规则配置

图拓扑中的规则配置,请参考官方文档:https://ekuiper.org/docs/zh/latest/rules/graph_rule.html

但文档中都是些简单的示例。所以我们要把之前的SQL转换成json的配置项,只能不断尝试。下面是一部分规则的注意点。

条件过滤

"device-filter": {    "type": "operator",    "nodeType": "filter",    "props": {      "expr": "meta(deviceName) = \"DemoModbusDevice\" AND Temperature < 30"    }  },

注意:

  • 从event中取元数据,使用meta函数
  • 逻辑拼接,使用AND / OR,而不是 && ||

计算

  "temp-cal": {    "type": "operator",    "nodeType": "function",    "props": { "expr": "power(Temperature, 2) AS TemperaturePredict"    }  }, 

注意:

  • 只能使用内置函数(文档:https://ekuiper.org/docs/zh/latest/sqls/built-in_functions.html ),所以到目前还不知道怎么计算 Temperature * 2 这种公式。
  • 每个node只能算1个参数,比如像SQL这种 power(Temperature, 2) AS TemperaturePredict, ln(Temperature, 2) AS TemperaturePredict2形式是不行的,只有第1个会被计算,后面的参数会被忽略。

字段过滤

在计算节点之后,会增加新属性,但原有属性也会保留。也就是说,如果此时再流回消息总线,那流回的这个消息也会重新进入到ekuiper源中,也就是成了循环,消息会指数级增长,很快把服务器资源用满。

为了避免成环,1种方法是在订阅的topic中加过滤,即在edgex源配置中,topic上配到详细的层级。这种的情况缺点是你只能处理某个设备或某类属性的数据。
还有1种方法是,我只向edgex总线发新生成的属性,这就用到了pick类型的node。

配置很简单:

"field-pick": {    "type": "operator",    "nodeType": "pick",    "props": { "fields": ["TemperaturePredict"]    }},

sink配置

edgex动作

把消息流入edgex总线,最基础的配置如下:

"local-edgex-sink": {    "type": "sink",    "nodeType": "edgex",    "props": { "host":"edgex-redis", "port": 6379, "protocol": "redis", "messageType": "request", "topicPrefix": "edgex/events/device", "contentType": "application/json" }    }},

此时会发现,由于没有带过来元数据信息,流入edgex消息总线的数据里,deviceName、profileName和sourceName都使用了如下所示的默认值:

{    "apiVersion":"v2",    "id":"37720fa9-ef58-427e-b636-0f2a4a6bf3f3",    "deviceName":"ekuiper",    "profileName":"ekuiperProfile",    "sourceName":"graph_rule_edgex_demo",    "origin":1666248283338039300,    "readings":[ {     "id":"2988a557-1384-4244-9570-f8fe409cbbc9",     "origin":1666248283338074000,     "deviceName":"ekuiper",     "resourceName":"TemperaturePredict",     "profileName":"ekuiperProfile",     "valueType":"Float64",     "value":"2.433600e+02" }    ]}

如果只处理一个设备的数据,那在props中直接配置topic就可以了,如topic: edgex/events/device/ProfileAAA/DeviceAAA/TemperaturePredict。或者配置topicPrefix,并配置deviceName、profileName和sourceName,如:

"topicPrefix": "edgex/events/device","deviceName":"DeviceAAA","resourceName":"TemperaturePredict","profileName":"ProfileAAA",

meta填充

如果我处理的是多个设备,那deviceName就不能配死了,这就需要配置动态元数据,来保留之前的元数据。

按照文档 动态元数据 中所示:

把这个转换到图拓扑的配置就是,先要加一个计算node,产生edgex_meta这个字段,然后在edgex sink中配置metadata,如下所示:

  "meta-fill": {    "type": "operator",    "nodeType": "function",    "props": {      "expr": "meta(*) AS edgex_meta"    }  },
  "local-edgex-sink": {    "type": "sink",    "nodeType": "edgex",    "props": { "host":"edgex-redis", "port": 6379, "protocol": "redis", "messageType": "request", "topicPrefix": "edgex/events/device", "contentType": "application/json" "metadata": "edgex_meta"    }  }

注意:

  • 填充meta的字段只能叫edgex_meta这个名字,叫别的名字会被当做属性处理。
  • edgex sink中一旦配置了metadata,deviceName、profileName和sourceName的配置都不生效。(从下面源码可以看出)

这就有问题了,我的场景下其实只是想保留deviceName、profileName,而sourceName指定为新生成的字段TemperaturePredict,也就是说仅仅deviceName、profileName是动态的,但这在目前没办法做到。

个人感觉应该是props中deviceName、profileName和sourceName的配置优先级更高,用户的主观配置去覆盖meta中的数据才合理。

总体配置

整个规则的配置如下,仅供参考:

{  "id": "graph_rule_edgex_demo",  "name": "Test Graph Rule",  "graph": {    "nodes": {      "local-edgex-source": { "type": "source", "nodeType": "edgex", "props": {   "confKey": "my_conf" }      },    "device-filter": { "type": "operator", "nodeType": "filter", "props": {   "expr": "meta(deviceName) = \"DemoModbusDevice\" AND Temperature < 30" }      },      "temp-cal": { "type": "operator", "nodeType": "function", "props": {   "expr": "power(Temperature, 2) AS TemperaturePredict" }      },     "field-pick": { "type": "operator", "nodeType": "pick", "props": { "fields": ["TemperaturePredict"] }      },     "meta-fill": { "type": "operator", "nodeType": "function", "props": {   "expr": "meta(*) AS edgex_meta" }      },      "local-mqtt-sink": { "type": "sink", "nodeType": "mqtt", "props": {   "server": "tcp://local-emqx:1883",   "topic": "demo/sink/greater" }      },      "local-edgex-sink": { "type": "sink", "nodeType": "edgex", "props": {     "host":"edgex-redis",     "port": 6379,     "protocol": "redis",     "messageType": "request",     "topicPrefix": "edgex/events/device",     "contentType": "application/json" }      }    },    "topo": {      "sources": ["local-edgex-source"],      "edges": { "local-edgex-source": ["device-filter"], "device-filter": ["temp-cal"], "temp-cal": ["field-pick"], "field-pick": ["meta-fill"], "meta-fill": ["local-mqtt-sink", "local-edgex-sink"]      }    }  }}

上面的配置并不能直接完成最初的场景设定,即:通过计算生成新的属性,并流回edgex的消息总线(虽然流回了消息总线,但里面的数据格式不是理想的)。
work around方法是,我把edgex_meta作为一个属性,放回消息总线,在application service中二次处理。

最终,流入消息总线的消息如下:

topic: edgex/events/device/ekuiperProfile/ekuiper/graph_rule_edgex_demo

{    "apiVersion":"v2",    "id":"7274c14e-ba66-4c55-9ef3-45856f9d75c0",    "deviceName":"ekuiper",    "profileName":"ekuiperProfile",    "sourceName":"graph_rule_edgex_demo",    "origin":1666250712010773200,    "readings":[ {     "id":"3138b5eb-8b5a-40d3-bca2-b3352ef6ca98",     "origin":1666250712010787000,     "deviceName":"ekuiper",     "resourceName":"edgex_meta",     "profileName":"ekuiperProfile",     "valueType":"Object",     "objectValue":{  "Temperature":{      "deviceName":"DemoModbusDevice",      "id":"b9652a7f-3ff2-4cb4-8789-2c8db1f263b8",      "origin":1666250712009348900,      "profileName":"DemoModbusProfile",      "valueType":"Float32"  },  "correlationid":"29e2f9f5-38d6-42a8-bfdf-56811619a8d6",  "deviceName":"DemoModbusDevice",  "id":"dacaeb8f-7568-44d2-af7f-e4ce6ecd375c",  "origin":1666250712009531400,  "profileName":"DemoModbusProfile",  "sourceName":"Temperature",  "tags":null     } }, {     "id":"98d45a05-c613-441b-ad20-957327e80bf4",     "origin":1666250712010809000,     "deviceName":"ekuiper",     "resourceName":"TemperaturePredict",     "profileName":"ekuiperProfile",     "valueType":"Float64",     "value":"2.433600e+02" }    ]}

发到mqtt的消息如下:

[    { "TemperaturePredict":243.36001190185561, "edgex_meta":{     "Temperature":{  "deviceName":"DemoModbusDevice",  "id":"ac8764e1-dcfd-4210-b410-8e56e00bc722",  "origin":1666252953917595800,  "profileName":"DemoModbusProfile",  "valueType":"Float32"     },     "correlationid":"af8b46ac-a9da-4475-8db5-b9644f09db15",     "deviceName":"DemoModbusDevice",     "id":"8baf53af-4633-4ffd-aa48-64b0ae55be0d",     "origin":1666252953917725800,     "profileName":"DemoModbusProfile",     "sourceName":"Temperature",     "tags":null }    }]

可以看到,edgex_meta被当做一个属性进行了包装,进入了edgex消息总线。