From 3ec811e828c1ce8c5ad2b814c544d952b2da401b Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 30 Oct 2023 21:45:09 +0800 Subject: [PATCH] feat(gbt32960): Port the GBT32960 gateway from v4 --- apps/emqx_gateway/src/emqx_gateway_api.erl | 52 +- apps/emqx_gateway/src/emqx_gateway_schema.erl | 3 +- apps/emqx_gateway_gbt32960/BSL.txt | 94 ++ apps/emqx_gateway_gbt32960/README.md | 24 + .../doc/Data_Exchange_Guide_CN.md | 741 +++++++++++++++ .../include/emqx_gbt32960.hrl | 75 ++ apps/emqx_gateway_gbt32960/rebar.config | 6 + .../src/emqx_gateway_gbt32960.app.src | 10 + .../src/emqx_gateway_gbt32960.erl | 97 ++ .../src/emqx_gbt32960_channel.erl | 862 ++++++++++++++++++ .../src/emqx_gbt32960_frame.erl | 802 ++++++++++++++++ .../src/emqx_gbt32960_schema.erl | 55 ++ apps/emqx_machine/priv/reboot_lists.eterm | 1 + mix.exs | 1 + rel/i18n/emqx_gateway_api.hocon | 2 +- rel/i18n/emqx_gbt32960_schema.hocon | 12 + 16 files changed, 2830 insertions(+), 7 deletions(-) create mode 100644 apps/emqx_gateway_gbt32960/BSL.txt create mode 100644 apps/emqx_gateway_gbt32960/README.md create mode 100644 apps/emqx_gateway_gbt32960/doc/Data_Exchange_Guide_CN.md create mode 100644 apps/emqx_gateway_gbt32960/include/emqx_gbt32960.hrl create mode 100644 apps/emqx_gateway_gbt32960/rebar.config create mode 100644 apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src create mode 100644 apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.erl create mode 100644 apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl create mode 100644 apps/emqx_gateway_gbt32960/src/emqx_gbt32960_frame.erl create mode 100644 apps/emqx_gateway_gbt32960/src/emqx_gbt32960_schema.erl create mode 100644 rel/i18n/emqx_gbt32960_schema.hocon diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index 61f29059f..b628b47e3 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -161,12 +161,13 @@ gateway_enable(put, #{bindings := #{name := Name, enable := Enable}}) -> return_http_error(404, <<"NOT FOUND">>) end. --spec gw_name(binary()) -> stomp | coap | lwm2m | mqttsn | exproto | no_return(). +-spec gw_name(binary()) -> stomp | coap | lwm2m | mqttsn | exproto | gbt32960 | no_return(). gw_name(<<"stomp">>) -> stomp; gw_name(<<"coap">>) -> coap; gw_name(<<"lwm2m">>) -> lwm2m; gw_name(<<"mqttsn">>) -> mqttsn; gw_name(<<"exproto">>) -> exproto; +gw_name(<<"gbt32960">>) -> gbt32960; gw_name(_Else) -> throw(not_found). %%-------------------------------------------------------------------- @@ -390,7 +391,8 @@ fields(Gw) when Gw == mqttsn; Gw == coap; Gw == lwm2m; - Gw == exproto + Gw == exproto; + Gw == gbt32960 -> [{name, mk(Gw, #{desc => ?DESC(gateway_name)})}] ++ convert_listener_struct(emqx_gateway_schema:gateway_schema(Gw)); @@ -399,7 +401,8 @@ fields(Gw) when Gw == update_mqttsn; Gw == update_coap; Gw == update_lwm2m; - Gw == update_exproto + Gw == update_exproto; + Gw == update_gbt32960 -> "update_" ++ GwStr = atom_to_list(Gw), Gw1 = list_to_existing_atom(GwStr), @@ -458,7 +461,8 @@ schema_load_or_update_gateways_conf() -> ref(?MODULE, update_mqttsn), ref(?MODULE, update_coap), ref(?MODULE, update_lwm2m), - ref(?MODULE, update_exproto) + ref(?MODULE, update_exproto), + ref(?MODULE, update_gbt32960) ]), examples_update_gateway_confs() ). @@ -470,7 +474,8 @@ schema_gateways_conf() -> ref(?MODULE, mqttsn), ref(?MODULE, coap), ref(?MODULE, lwm2m), - ref(?MODULE, exproto) + ref(?MODULE, exproto), + ref(?MODULE, gbt32960) ]), examples_gateway_confs() ). @@ -756,6 +761,30 @@ examples_gateway_confs() -> } ] } + }, + gbt32960_gateway => + #{ + summary => <<"A simple GBT32960 gateway config">>, + value => + #{ + enable => true, + name => <<"gbt32960">>, + enable_stats => true, + mountpoint => <<"gbt32960/${clientid}">>, + retry_interval => <<"8s">>, + max_retry_times => 3, + message_queue_len => 10, + listeners => + [ + #{ + type => <<"tcp">>, + name => <<"default">>, + bind => <<"7325">>, + max_connections => 1024000, + max_conn_rate => 1000 + } + ] + } } }. @@ -854,5 +883,18 @@ examples_update_gateway_confs() -> handler => #{address => <<"http://127.0.0.1:9001">>} } + }, + gbt32960_gateway => + #{ + summary => <<"A simple GBT32960 gateway config">>, + value => + #{ + enable => true, + enable_stats => true, + mountpoint => <<"gbt32960/${clientid}">>, + retry_interval => <<"8s">>, + max_retry_times => 3, + message_queue_len => 10 + } } }. diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index e58e552e2..2107d3133 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -341,7 +341,8 @@ gateway_schema(stomp) -> emqx_stomp_schema:fields(stomp); gateway_schema(mqttsn) -> emqx_mqttsn_schema:fields(mqttsn); gateway_schema(coap) -> emqx_coap_schema:fields(coap); gateway_schema(lwm2m) -> emqx_lwm2m_schema:fields(lwm2m); -gateway_schema(exproto) -> emqx_exproto_schema:fields(exproto). +gateway_schema(exproto) -> emqx_exproto_schema:fields(exproto); +gateway_schema(gbt32960) -> emqx_gbt32960_schema:fields(gbt32960). %%-------------------------------------------------------------------- %% helpers diff --git a/apps/emqx_gateway_gbt32960/BSL.txt b/apps/emqx_gateway_gbt32960/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_gateway_gbt32960/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_gateway_gbt32960/README.md b/apps/emqx_gateway_gbt32960/README.md new file mode 100644 index 000000000..779e7004c --- /dev/null +++ b/apps/emqx_gateway_gbt32960/README.md @@ -0,0 +1,24 @@ +# emqx_gbt32960 + +The GBT32960 Gateway is based on the GBT32960 specification. + +## Quick Start + +In EMQX 5.0, GBT32960 gateway can be configured and enabled through the Dashboard. + +It can also be enabled via the HTTP API or emqx.conf, e.g. In emqx.conf: + +```properties +gateway.gbt32960 { + + mountpoint = "gbt32960/${clientid}" + + listeners.tcp.default { + bind = 7325 + } +} +``` + +> Note: +> Configuring the gateway via emqx.conf requires changes on a per-node basis, +> but configuring it via Dashboard or the HTTP API will take effect across the cluster. diff --git a/apps/emqx_gateway_gbt32960/doc/Data_Exchange_Guide_CN.md b/apps/emqx_gateway_gbt32960/doc/Data_Exchange_Guide_CN.md new file mode 100644 index 000000000..e528f982a --- /dev/null +++ b/apps/emqx_gateway_gbt32960/doc/Data_Exchange_Guide_CN.md @@ -0,0 +1,741 @@ +# emqx-gbt32960 + +该文档定义了 Plugins **emqx_gbt32960** 和 **EMQX** 之间数据交换的格式 + +约定: +- Payload 采用 Json 格式进行组装 +- Json Key 采用大驼峰格式命名 + +# Upstream +数据流向: Terminal -> emqx_gbt32960 -> EMQX + +## 车辆登入 +Topic: gbt32960/${vin}/upstream/vlogin + +```json +{ + "Cmd": 1, + "Encrypt": 1, + "Vin": "1G1BL52P7TR115520", + "Data": { + "ICCID": "12345678901234567890", + "Id": "C", + "Length": 1, + "Num": 1, + "Seq": 1, + "Time": { + "Day": 29, + "Hour": 12, + "Minute": 19, + "Month": 12, + "Second": 20, + "Year": 12 + } + } +} +``` + +其中 + +| 字段 | 类型 | 描述 | +| --------- | ------- | ------------------------------------------------------------ | +| `Cmd` | Integer | 命令单元; `1` 表示车辆登入 | +| `Encrypt` | Integer | 数据单元加密方式,`1` 表示不加密,`2` 数据经过 RSA 加密,`3` 数据经过 ASE128 算法加密;`254` 表示异常;`255` 表示无效;其他预留 | +| `Vin` | String | 唯一识别码,即车辆 VIN 码 | +| `Data` | Object | 数据单元, JSON 对象格式。 | + +车辆登入的数据单元格式为 + +| 字段 | 类型 | 描述 | +| -------- | ------- | ------------------------------------------------------------ | +| `Time` | Object | 数据采集时间,按年,月,日,时,分,秒,格式见示例。 | +| `Seq` | Integer | 登入流水号 | +| `ICCID` | String | 长度为20的字符串,SIM 卡的 ICCID 号 | +| `Num` | Integer | 可充电储能子系统数,有效值 0 ~ 250 | +| `Length` | Integer | 可充电储能系统编码长度,有效值 0 ~ 50 | +| `Id` | String | 可充电储能系统编码,长度为 "子系统数" 与 "编码长度" 值的乘积 | + +## 车辆登出 + +Topic: gbt32960/${vin}/upstream/vlogout + +车辆登出的 `Cmd` 值为 4,其余字段含义与登入相同: + +```json +{ + "Cmd": 4, + "Encrypt": 1, + "Vin": "1G1BL52P7TR115520", + "Data": { + "Seq": 1, + "Time": { + "Day": 1, + "Hour": 2, + "Minute": 59, + "Month": 1, + "Second": 0, + "Year": 16 + } + } +} +``` + +## 实时信息上报 + +Topic: gbt32960/${vin}/upstream/info + +> 不同信息类型上报,格式上只有 Infos 里面的对象属性不同,通过 `Type` 进行区分 +> Infos 为数组,代表车载终端每次报文可以上报多个信息 + +### 整车数据 + +```json +{ + "Cmd": 2, + "Encrypt": 1, + "Vin": "1G1BL52P7TR115520", + "Data": { + "Infos": [ + { + "AcceleratorPedal": 90, + "BrakePedal": 0, + "Charging": 1, + "Current": 15000, + "DC": 1, + "Gear": 5, + "Mileage": 999999, + "Mode": 1, + "Resistance": 6000, + "SOC": 50, + "Speed": 2000, + "Status": 1, + "Type": "Vehicle", + "Voltage": 5000 + } + ], + "Time": { + "Day": 1, + "Hour": 2, + "Minute": 59, + "Month": 1, + "Second": 0, + "Year": 16 + } + } +} +``` + + + +其中,整车信息字段含义如下: + +| 字段 | 类型 | 描述 | +| ------------ | ------- | ------------------------------------------------------------ | +| `Type` | String | 数据类型,`Vehicle` 表示该结构为整车信息 | +| `Status` | Integer | 车辆状态,`1` 表示启动状态;`2` 表示熄火;`3` 表示其状态;`254` 表示异常;`255` 表示无效 | +| `Charging` | Integer | 充电状态,`1` 表示停车充电;`2` 行驶充电;`3` 未充电状态;`4` 充电完成;`254` 表示异常;`255` 表示无效 | +| `Mode` | Integer | 运行模式,`1` 表示纯电;`2` 混动;`3` 燃油;`254` 表示异常;`255` 表示无效 | +| `Speed` | Integer | 车速,有效值 ( 0~ 2200,表示 0 km/h ~ 220 km/h),单位 0.1 km/h | +| `Mileage` | Integer | 累计里程,有效值 0 ~9,999,999(表示 0 km ~ 999,999.9 km),单位 0.1 km | +| `Voltage` | Integer | 总电压,有效值范围 0 ~10000(表示 0 V ~ 1000 V)单位 0.1 V | +| `Current` | Integer | 总电流,有效值 0 ~ 20000 (偏移量 1000,表示 -1000 A ~ +1000 A,单位 0.1 A | +| `SOC` | Integer | SOC,有效值 0 ~ 100(表示 0% ~ 100%) | +| `DC` | Integer | DC,`1` 工作;`2` 断开;`254` 表示异常;`255` 表示无效 | +| `Gear` | Integer | 档位,参考原协议的 表 A.1,此值为其转换为整数的值 | +| `Resistance` | Integer | 绝缘电阻,有效范围 0 ~ 60000(表示 0 k欧姆 ~ 60000 k欧姆) | + +### 驱动电机数据 + +```json +{ + "Cmd": 2, + "Encrypt": 1, + "Vin": "1G1BL52P7TR115520", + "Data": { + "Infos": [ + { + "Motors": [ + { + "CtrlTemp": 125, + "DCBusCurrent": 31203, + "InputVoltage": 30012, + "MotorTemp": 125, + "No": 1, + "Rotating": 30000, + "Status": 1, + "Torque": 25000 + }, + { + "CtrlTemp": 125, + "DCBusCurrent": 30200, + "InputVoltage": 32000, + "MotorTemp": 145, + "No": 2, + "Rotating": 30200, + "Status": 1, + "Torque": 25300 + } + ], + "Number": 2, + "Type": "DriveMotor" + } + ], + "Time": { + "Day": 1, + "Hour": 2, + "Minute": 59, + "Month": 1, + "Second": 0, + "Year": 16 + } + } +} +``` + +其中,驱动电机数据各个字段的含义是 + +| 字段 | 类型 | 描述 | +| -------- | ------- | ------------------------------ | +| `Type` | String | 数据类型,此处为 `DriveMotor` | +| `Number` | Integer | 驱动电机个数,有效值 1~253 | +| `Motors` | Array | 驱动电机数据列表 | + +驱动电机数据字段为: + +| 字段 | 类型 | 描述 | +| -------------- | -------- | ------------------------------------------------------------ | +| `No` | Integer | 驱动电机序号,有效值 1~253 | +| `Status` | Integer | 驱动电机状态,`1` 表示耗电;`2`发电;`3` 关闭状态;`4` 准备状态;`254` 表示异常;`255` 表示无效 | +| `CtrlTemp` | Integer | 驱动电机控制器温度,有效值 0~250(数值偏移 40°C,表示 -40°C ~ +210°C)单位 °C | +| `Rotating` | Interger | 驱动电机转速,有效值 0~65531(数值偏移 20000表示 -20000 r/min ~ 45531 r/min)单位 1 r/min | +| `Torque` | Integer | 驱动电机转矩,有效值 0~65531(数据偏移量 20000,表示 - 2000 N·m ~ 4553.1 N·m)单位 0.1 N·m | +| `MotorTemp` | Integer | 驱动电机温度,有效值 0~250(数据偏移量 40 °C,表示 -40°C ~ +210°C)单位 1°C | +| `InputVoltage` | Integer | 电机控制器输入电压,有效值 0~60000(表示 0V ~ 6000V)单位 0.1 V | +| `DCBusCurrent` | Interger | 电机控制器直流母线电流,有效值 0~20000(数值偏移 1000A,表示 -1000A ~ +1000 A)单位 0.1 A | + +### 燃料电池数据 + +```json +{ + "Cmd": 2, + "Encrypt": 1, + "Vin": "1G1BL52P7TR115520", + "Data": { + "Infos": [ + { + "CellCurrent": 12000, + "CellVoltage": 10000, + "DCStatus": 1, + "FuelConsumption": 45000, + "H_ConcSensorCode": 11, + "H_MaxConc": 35000, + "H_MaxPress": 500, + "H_MaxTemp": 12500, + "H_PressSensorCode": 12, + "H_TempProbeCode": 10, + "ProbeNum": 2, + "ProbeTemps": [120, 121], + "Type": "FuelCell" + } + ], + "Time": { + "Day": 1, + "Hour": 2, + "Minute": 59, + "Month": 1, + "Second": 0, + "Year": 16 + } + } +} +``` + +其中,燃料电池数据各个字段的含义是 + +| 字段 | 类型 | 描述 | +| ------------------- | ------- | ------------------------------------------------------------ | +| `Type` | String | 数据类型,此处为 `FuleCell` | +| `CellVoltage` | Integer | 燃料电池电压,有效值范围 0~20000(表示 0V ~ 2000V)单位 0.1 V | +| `CellCurrent` | Integer | 燃料电池电流,有效值范围 0~20000(表示 0A~ +2000A)单位 0.1 A | +| `FuelConsumption` | Integer | 燃料消耗率,有效值范围 0~60000(表示 0kg/100km ~ 600 kg/100km) 单位 0.01 kg/100km | +| `ProbeNum` | Integer | 燃料电池探针总数,有效值范围 0~65531 | +| `ProbeTemps` | Array | 燃料电池每探针温度值 | +| `H_MaxTemp` | Integer | 氢系统最高温度,有效值 0~2400(偏移量40°C,表示 -40°C~200°C)单位 0.1 °C | +| `H_TempProbeCode` | Integer | 氢系统最高温度探针代号,有效值 1~252 | +| `H_MaxConc` | Integer | 氢气最高浓度,有效值 0~60000(表示 0mg/kg ~ 50000 mg/kg)单位 1mg/kg | +| `H_ConcSensorCode` | Integer | 氢气最高浓度传感器代号,有效值 1~252 | +| `H_MaxPress` | Integer | 氢气最高压力,有效值 0~1000(表示 0 MPa ~ 100 MPa)最小单位 0.1 MPa | +| `H_PressSensorCode` | Integer | 氢气最高压力传感器代号,有效值 1~252 | +| `DCStatus` | Integer | 高压 DC/DC状态,`1` 表示工作;`2`断开 | + +### 发动机数据 + +```json +{ + "Cmd": 2, + "Encrypt": 1, + "Vin": "1G1BL52P7TR115520", + "Data": { + "Infos": [ + { + "CrankshaftSpeed": 2000, + "FuelConsumption": 200, + "Status": 1, + "Type": "Engine" + } + ], + "Time": { + "Day": 1, + "Hour": 22, + "Minute": 59, + "Month": 10, + "Second": 0, + "Year": 16 + } + } +} +``` + +其中,发动机数据各个字段的含义是 + +| 字段 | 类型 | 描述 | +| ----------------- | ------- | ------------------------------------------------------------ | +| `Type` | String | 数据类型,此处为 `Engine` | +| `Status` | Integer | 发动机状态,`1` 表示启动;`2` 关闭 | +| `CrankshaftSpeed` | Integer | 曲轴转速,有效值 0~60000(表示 0r/min~60000r/min)单位 1r/min | +| `FuelConsumption` | Integer | 燃料消耗率,有效范围 0~60000(表示 0L/100km~600L/100km)单位 0.01 L/100km | + + + +### 车辆位置数据 + +```json +{ + "Cmd": 2, + "Encrypt": 1, + "Vin": "1G1BL52P7TR115520", + "Data": { + "Infos": [ + { + "Latitude": 100, + "Longitude": 10, + "Status": 0, + "Type": "Location" + } + ], + "Time": { + "Day": 1, + "Hour": 22, + "Minute": 59, + "Month": 10, + "Second": 0, + "Year": 16 + } + } +} +``` + +其中,车辆位置数据各个字段的含义是 + +| 字段 | 类型 | 描述 | +| ----------- | ------- | ----------------------------------------------------- | +| `Type` | String | 数据类型,此处为 `Location` | +| `Status` | Integer | 定位状态,见原协议表15,此处为所有比特位的整型值 | +| `Longitude` | Integer | 经度,以度为单位的纬度值乘以 10^6,精确到百万分之一度 | +| `Latitude` | Integer | 纬度,以度为单位的纬度值乘以 10^6,精确到百万分之一度 | + + + +### 极值数据 + +```json +{ + "Cmd": 2, + "Encrypt": 1, + "Vin": "1G1BL52P7TR115520", + "Data": { + "Infos": [ + { + "MaxBatteryVoltage": 7500, + "MaxTemp": 120, + "MaxTempProbeNo": 12, + "MaxTempSubsysNo": 14, + "MaxVoltageBatteryCode": 10, + "MaxVoltageBatterySubsysNo": 12, + "MinBatteryVoltage": 2000, + "MinTemp": 40, + "MinTempProbeNo": 13, + "MinTempSubsysNo": 15, + "MinVoltageBatteryCode": 11, + "MinVoltageBatterySubsysNo": 13, + "Type": "Extreme" + } + ], + "Time": { + "Day": 30, + "Hour": 12, + "Minute": 22, + "Month": 5, + "Second": 59, + "Year": 17 + } + } +} +``` + +其中,极值数据各个字段的含义是 + +| 字段 | 类型 | 描述 | +| --------------------------- | ------- | ------------------------------------------------------------ | +| `Type` | String | 数据类型,此处为 `Extreme` | +| `MaxVoltageBatterySubsysNo` | Integer | 最高电压电池子系统号,有效值 1~250 | +| `MaxVoltageBatteryCode` | Integer | 最高电压电池单体代号,有效值 1~250 | +| `MaxBatteryVoltage` | Integer | 电池单体电压最高值,有效值 0~15000(表示 0V~15V)单位 0.001V | +| `MinVoltageBatterySubsysNo` | Integer | 最低电压电池子系统号,有效值 1~250 | +| `MinVoltageBatteryCode` | Integer | 最低电压电池单体代号,有效值 1~250 | +| `MinBatteryVoltage` | Integer | 电池单体电压最低值,有效值 0~15000(表示 0V~15V)单位 0.001V | +| `MaxTempSubsysNo` | Integer | 最高温度子系统号,有效值 1~250 | +| `MaxTempProbeNo` | Integer | 最高温度探针序号,有效值 1~250 | +| `MaxTemp` | Integer | 最高温度值,有效值范围 0~250(偏移量40,表示 -40°C~+210°C) | +| `MinTempSubsysNo` | Integer | 最低温度子系统号,有效值 1~250 | +| `MinTempProbeNo` | Integer | 最低温度探针序号,有效值 1~250 | +| `MinTemp` | Integer | 最低温度值,有效值范围 0~250(偏移量40,表示 -40°C~+210°C) | + + + +### 报警数据 + +```json +{ + "Cmd": 2, + "Encrypt": 1, + "Vin": "1G1BL52P7TR115520", + "Data": { + "Infos": [ + { + "FaultChargeableDeviceNum": 1, + "FaultChargeableDeviceList": ["00C8"], + "FaultDriveMotorNum": 0, + "FaultDriveMotorList": [], + "FaultEngineNum": 1, + "FaultEngineList": ["006F"], + "FaultOthersNum": 0, + "FaultOthersList": [], + "GeneralAlarmFlag": 3, + "MaxAlarmLevel": 1, + "Type": "Alarm" + } + ], + "Time": { + "Day": 20, + "Hour": 22, + "Minute": 23, + "Month": 12, + "Second": 59, + "Year": 17 + } + } +} +``` + +其中,报警数据各个字段的含义是 + +| 字段 | 类型 | 描述 | +| --------------------------- | ------- | ------------------------------------------------------------ | +| `Type` | String | 数据类型,此处为 `Alarm` | +| `MaxAlarmLevel` | Integer | 最高报警等级,有效值范围 0~3,`0` 表示无故障,`1` 表示 `1` 级故障 | +| `GeneralAlarmFlag` | Integer | 通用报警标志位,见原协议表 18 | +| `FaultChargeableDeviceNum` | Integer | 可充电储能装置故障总数,有效值 0~252 | +| `FaultChargeableDeviceList` | Array | 可充电储能装置故障代码列表 | +| `FaultDriveMotorNum` | Integer | 驱动电机故障总数,有效置范围 0 ~252 | +| `FaultDriveMotorList` | Array | 驱动电机故障代码列表 | +| `FaultEngineNum` | Integer | 发动机故障总数,有效值范围 0~252 | +| `FaultEngineList` | Array | 发动机故障代码列表 | +| `FaultOthersNum` | Integer | 其他故障总数 | +| `FaultOthersList` | Array | 其他故障代码列表 | + + + +### 可充电储能装置电压数据 + +```json +{ + "Cmd": 2, + "Encrypt": 1, + "Vin": "1G1BL52P7TR115520", + "Data": { + "Infos": [ + { + "Number": 2, + "SubSystems": [ + { + "CellsTotal": 2, + "CellsVoltage": [5000], + "ChargeableCurrent": 10000, + "ChargeableSubsysNo": 1, + "ChargeableVoltage": 5000, + "FrameCellsCount": 1, + "FrameCellsIndex": 0 + }, + { + "CellsTotal": 2, + "CellsVoltage": [5001], + "ChargeableCurrent": 10001, + "ChargeableSubsysNo": 2, + "ChargeableVoltage": 5001, + "FrameCellsCount": 1, + "FrameCellsIndex": 1 + } + ], + "Type": "ChargeableVoltage" + } + ], + "Time": { + "Day": 1, + "Hour": 22, + "Minute": 59, + "Month": 10, + "Second": 0, + "Year": 16 + } + } +} +``` + + + +其中,字段定义如下 + +| 字段 | 类型 | 描述 | +| ----------- | ------- | ------------------------------------ | +| `Type` | String | 数据类型,此处位 `ChargeableVoltage` | +| `Number` | Integer | 可充电储能子系统个数,有效范围 1~250 | +| `SubSystem` | Object | 可充电储能子系统电压信息列表 | + +可充电储能子系统电压信息数据格式: + +| 字段 | 类型 | 描述 | +| -------------------- | ------- | ------------------------------------------------------------ | +| `ChargeableSubsysNo` | Integer | 可充电储能子系统号,有效值范围,1~250 | +| `ChargeableVoltage` | Integer | 可充电储能装置电压,有效值范围,0~10000(表示 0V~1000V)单位 0.1 V | +| `ChargeableCurrent` | Integer | 可充电储能装置电流,有效值范围,0~20000(数值偏移量 1000A,表示 -1000A~+1000A)单位 0.1 A | +| `CellsTotal` | Integer | 单体电池总数,有效值范围 1~65531 | +| `FrameCellsIndex` | Integer | 本帧起始电池序号,当本帧单体个数超过 200 时,应该拆分多个帧进行传输,有效值范围 1~65531 | +| `FrameCellsCount` | Integer | 本帧单体电池总数,有效值范围 1~200 | +| `CellsVoltage` | Array | 单体电池电压,有效值范围 0~60000(表示 0V~60.000V)单位 0.001V | + + + +### 可充电储能装置温度数据 + +```json +{ + "Cmd": 2, + "Encrypt": 1, + "Vin": "1G1BL52P7TR115520", + "Data": { + "Infos": [ + { + "Number": 2, + "SubSystems": [ + { + "ChargeableSubsysNo": 1, + "ProbeNum": 10, + "ProbesTemp": [0, 0, 0, 0, 0, 0, 0, 0, 19, 136] + }, + { + "ChargeableSubsysNo": 2, + "ProbeNum": 1, + "ProbesTemp": [100] + } + ], + "Type": "ChargeableTemp" + } + ], + "Time": { + "Day": 1, + "Hour": 22, + "Minute": 59, + "Month": 10, + "Second": 0, + "Year": 16 + } + } +} +``` +其中,数据格式为: + +| 字段 | 类型 | 描述 | +| ------------ | ------- | --------------------------------- | +| `Type` | String | 数据类型,此处为 `ChargeableTemp` | +| `Number` | Integer | 可充电储能子系统温度信息列表长度 | +| `SubSystems` | Object | 可充电储能子系统温度信息列表 | + +可充电储能子系统温度信息格式为 + +| 字段 | 类型 | 描述 | +| -------------------- | -------- | ------------------------------------ | +| `ChargeableSubsysNo` | Ineteger | 可充电储能子系统号,有效值 1~250 | +| `ProbeNum` | Integer | 可充电储能温度探针个数 | +| `ProbesTemp` | Array | 可充电储能子系统各温度探针温度值列表 | + + + +## 数据补发 + +Topic: gbt32960/${vin}/upstream/reinfo + +**数据格式: 略** (与实时数据上报相同) + +# Downstream + +> 请求数据流向: EMQX -> emqx_gbt32960 -> Terminal + +> 应答数据流向: Terminal -> emqx_gbt32960 -> EMQX + +下行主题: gbt32960/${vin}/dnstream +上行应答主题: gbt32960/${vin}/upstream/response + +## 参数查询 + + + +**Req:** + +```json +{ + "Action": "Query", + "Total": 2, + "Ids": ["0x01", "0x02"] +} +``` + +| 字段 | 类型 | 描述 | +| -------- | ------- | -------------------------------------------------- | +| `Action` | String | 下发命令类型,此处为 `Query` | +| `Total` | Integer | 查询参数总数 | +| `Ids` | Array | 需查询参数的 ID 列表,具体 ID 含义见原协议 表 B.10 | + +**Response:** + +```json +{ + "Cmd": 128, + "Encrypt": 1, + "Vin": "1G1BL52P7TR115520", + "Data": { + "Total": 2, + "Params": [ + {"0x01": 6000}, + {"0x02": 10} + ], + "Time": { + "Day": 2, + "Hour": 11, + "Minute": 12, + "Month": 2, + "Second": 12, + "Year": 17 + } + } +} +``` + + + +## 参数设置 + +**Req:** +```json +{ + "Action": "Setting", + "Total": 2, + "Params": [{"0x01": 5000}, + {"0x02": 200}] +} +``` + +| 字段 | 类型 | 描述 | +| -------- | ------- | ------------------------------ | +| `Action` | String | 下发命令类型,此处为 `Setting` | +| `Total` | Integer | 设置参数总数 | +| `Params` | Array | 需设置参数的 ID 和 值 | + +**Response:** + +```json +// fixme? 终端是按照这种方式返回? +{ + "Cmd": 129, + "Encrypt": 1, + "Vin": "1G1BL52P7TR115520", + "Data": { + "Total": 2, + "Params": [ + {"0x01": 5000}, + {"0x02": 200} + ], + "Time": { + "Day": 2, + "Hour": 11, + "Minute": 12, + "Month": 2, + "Second": 12, + "Year": 17 + } + } +} +``` + +## 终端控制 +**命令的不同, 参数不同; 无参数时为空** + +远程升级: +**Req:** + +```json +{ + "Action": "Control", + "Command": "0x01", + "Param": { + "DialingName": "hz203", + "Username": "user001", + "Password": "password01", + "Ip": "192.168.199.1", + "Port": 8080, + "ManufacturerId": "BMWA", + "HardwareVer": "1.0.0", + "SoftwareVer": "1.0.0", + "UpgradeUrl": "ftp://emqtt.io/ftp/server", + "Timeout": 10 + } +} +``` + +| 字段 | 类型 | 描述 | +| --------- | ------- | ------------------------------ | +| `Action` | String | 下发命令类型,此处为 `Control` | +| `Command` | Integer | 下发指令 ID,见原协议表 B.15 | +| `Param` | Object | 命令参数 | + +列表 + +车载终端关机: + +```json +{ + "Action": "Control", + "Command": "0x02" +} +``` + +... + +车载终端报警: +```json +{ + "Action": "Control", + "Command": "0x06", + "Param": {"Level": 0, "Message": "alarm message"} +} +``` diff --git a/apps/emqx_gateway_gbt32960/include/emqx_gbt32960.hrl b/apps/emqx_gateway_gbt32960/include/emqx_gbt32960.hrl new file mode 100644 index 000000000..ce1a3f135 --- /dev/null +++ b/apps/emqx_gateway_gbt32960/include/emqx_gbt32960.hrl @@ -0,0 +1,75 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-record(frame, {cmd, ack, vin, encrypt, length, data, check, rawdata}). + +-define(CMD(CmdType), #frame{ + cmd = CmdType, + ack = ?ACK_IS_CMD +}). + +-define(CMD(CmdType, Data), #frame{ + cmd = CmdType, + data = Data, + ack = ?ACK_IS_CMD +}). + +-define(IS_ACK_CODE(C), + (C == ?ACK_SUCCESS orelse + C == ?ACK_ERROR orelse + C == ?ACK_VIN_REPEAT) +). + +%%-------------------------------------------------------------------- +%% CMD Feilds +%%-------------------------------------------------------------------- +-define(CMD_VIHECLE_LOGIN, 16#01). +-define(CMD_INFO_REPORT, 16#02). +-define(CMD_INFO_RE_REPORT, 16#03). +-define(CMD_VIHECLE_LOGOUT, 16#04). +-define(CMD_PLATFORM_LOGIN, 16#05). +-define(CMD_PLATFORM_LOGOUT, 16#06). +-define(CMD_HEARTBEAT, 16#07). +-define(CMD_SCHOOL_TIME, 16#08). +% 0x09~0x7F: Reserved by upstream system +% 0x80~0x82: Reserved by terminal data +-define(CMD_PARAM_QUERY, 16#80). +-define(CMD_PARAM_SETTING, 16#81). +-define(CMD_TERMINAL_CTRL, 16#82). + +% 0x83~0xBF: Reserved by downstream system +% 0xC0~0xFE: Customized data for Platform Exchange Protocol + +%%-------------------------------------------------------------------- +%% ACK Feilds +%%-------------------------------------------------------------------- +-define(ACK_SUCCESS, 16#01). +-define(ACK_ERROR, 16#02). +-define(ACK_VIN_REPEAT, 16#03). +-define(ACK_IS_CMD, 16#FE). + +%%-------------------------------------------------------------------- +%% Encrypt Feilds +%%-------------------------------------------------------------------- +-define(ENCRYPT_NONE, 16#01). +-define(ENCRYPT_RSA, 16#02). +-define(ENCRYPT_AES128, 16#03). +-define(ENCRYPT_ABNORMAL, 16#FE). +-define(ENCRYPT_INVAILD, 16#FF). + +%%-------------------------------------------------------------------- +%% Info Type Flags +%%-------------------------------------------------------------------- +-define(INFO_TYPE_VEHICLE, 16#01). +-define(INFO_TYPE_DRIVE_MOTOR, 16#02). +-define(INFO_TYPE_FUEL_CELL, 16#03). +-define(INFO_TYPE_ENGINE, 16#04). +-define(INFO_TYPE_LOCATION, 16#05). +-define(INFO_TYPE_EXTREME, 16#06). +-define(INFO_TYPE_ALARM, 16#07). +-define(INFO_TYPE_CHARGEABLE_VOLTAGE, 16#08). +-define(INFO_TYPE_CHARGEABLE_TEMP, 16#09). +% 0x0A~0x2F: Customized data for Platform Exchange Protocol +% 0x30~0x7F: Reserved +% 0x80~0xFE: Customized by user diff --git a/apps/emqx_gateway_gbt32960/rebar.config b/apps/emqx_gateway_gbt32960/rebar.config new file mode 100644 index 000000000..cfeb0a195 --- /dev/null +++ b/apps/emqx_gateway_gbt32960/rebar.config @@ -0,0 +1,6 @@ +{erl_opts, [debug_info]}. +{deps, [ + {emqx, {path, "../../apps/emqx"}}, + {emqx_utils, {path, "../emqx_utils"}}, + {emqx_gateway, {path, "../../apps/emqx_gateway"}} +]}. diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src new file mode 100644 index 000000000..ee6cf30d8 --- /dev/null +++ b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src @@ -0,0 +1,10 @@ +{application, emqx_gateway_gbt32960, [ + {description, "GBT32960 Gateway"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [kernel, stdlib, emqx, emqx_gateway]}, + {env, []}, + {modules, []}, + {licenses, ["BSL"]}, + {links, []} +]}. diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.erl b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.erl new file mode 100644 index 000000000..dde54026d --- /dev/null +++ b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.erl @@ -0,0 +1,97 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +%% @doc The GBT32960 Gateway implement +-module(emqx_gateway_gbt32960). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_gateway/include/emqx_gateway.hrl"). + +%% define a gateway named gbt32960 +-gateway(#{ + name => gbt32960, + callback_module => ?MODULE, + config_schema_module => emqx_gbt32960_schema +}). + +%% callback_module must implement the emqx_gateway_impl behaviour +-behaviour(emqx_gateway_impl). + +%% callback for emqx_gateway_impl +-export([ + on_gateway_load/2, + on_gateway_update/3, + on_gateway_unload/2 +]). + +-import( + emqx_gateway_utils, + [ + normalize_config/1, + start_listeners/4, + stop_listeners/2 + ] +). + +%%-------------------------------------------------------------------- +%% emqx_gateway_impl callbacks +%%-------------------------------------------------------------------- + +on_gateway_load( + _Gateway = #{ + name := GwName, + config := Config + }, + Ctx +) -> + Listeners = normalize_config(Config), + ModCfg = #{ + frame_mod => emqx_gbt32960_frame, + chann_mod => emqx_gbt32960_channel + }, + case + start_listeners( + Listeners, GwName, Ctx, ModCfg + ) + of + {ok, ListenerPids} -> + %% FIXME: How to throw an exception to interrupt the restart logic ? + %% FIXME: Assign ctx to GwState + {ok, ListenerPids, _GwState = #{ctx => Ctx}}; + {error, {Reason, Listener}} -> + throw( + {badconf, #{ + key => listeners, + value => Listener, + reason => Reason + }} + ) + end. + +on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> + GwName = maps:get(name, Gateway), + try + %% XXX: 1. How hot-upgrade the changes ??? + %% XXX: 2. Check the New confs first before destroy old state??? + on_gateway_unload(Gateway, GwState), + on_gateway_load(Gateway#{config => Config}, Ctx) + catch + Class:Reason:Stk -> + logger:error( + "Failed to update ~ts; " + "reason: {~0p, ~0p} stacktrace: ~0p", + [GwName, Class, Reason, Stk] + ), + {error, Reason} + end. + +on_gateway_unload( + _Gateway = #{ + name := GwName, + config := Config + }, + _GwState +) -> + Listeners = normalize_config(Config), + stop_listeners(GwName, Listeners). diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl new file mode 100644 index 000000000..6b491a807 --- /dev/null +++ b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl @@ -0,0 +1,862 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_gbt32960_channel). +-behaviour(emqx_gateway_channel). + +-include("emqx_gbt32960.hrl"). +-include_lib("emqx/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). + +-export([ + info/1, + info/2, + stats/1 +]). + +-export([ + init/2, + handle_in/2, + handle_deliver/2, + handle_timeout/3, + terminate/2, + set_conn_state/2 +]). + +-export([ + handle_call/3, + handle_cast/2, + handle_info/2 +]). + +-record(channel, { + %% Context + ctx :: emqx_gateway_ctx:context(), + %% ConnInfo + conninfo :: emqx_types:conninfo(), + %% ClientInfo + clientinfo :: emqx_types:clientinfo(), + %% Session + session :: undefined | map(), + %% Keepalive + keepalive :: maybe(emqx_keepalive:keepalive()), + %% Conn State + conn_state :: conn_state(), + %% Timers + timers :: #{atom() => undefined | disabled | reference()}, + %% Inflight + inflight :: emqx_inflight:inflight(), + %% Message Queue + mqueue :: queue:queue(), + retx_interval, + retx_max_times, + max_mqueue_len +}). + +-type conn_state() :: idle | connecting | connected | disconnected. + +-type channel() :: #channel{}. + +-type reply() :: + {outgoing, emqx_types:packet()} + | {outgoing, [emqx_types:packet()]} + | {event, conn_state() | updated} + | {close, Reason :: atom()}. + +-type replies() :: reply() | [reply()]. +-type frame() :: emqx_gbt32960_frame:frame(). + +-define(TIMER_TABLE, #{ + alive_timer => keepalive, + retry_timer => retry_delivery +}). + +-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). +-define(DEFAULT_MOUNTPOINT, <<"gbt32960/${clientid}">>). +-define(DEFAULT_DOWNLINK_TOPIC, <<"/dnstream">>). + +-dialyzer({nowarn_function, init/2}). + +%%-------------------------------------------------------------------- +%% Info, Attrs and Caps +%%-------------------------------------------------------------------- + +%% @doc Get infos of the channel. +-spec info(channel()) -> emqx_types:infos(). +info(Channel) -> + maps:from_list(info(?INFO_KEYS, Channel)). + +-spec info(list(atom()) | atom(), channel()) -> term(). +info(Keys, Channel) when is_list(Keys) -> + [{Key, info(Key, Channel)} || Key <- Keys]; +info(conninfo, #channel{conninfo = ConnInfo}) -> + ConnInfo; +info(zone, #channel{clientinfo = #{zone := Zone}}) -> + Zone; +info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> + ClientId; +info(clientinfo, #channel{clientinfo = ClientInfo}) -> + ClientInfo; +info(session, _) -> + #{}; +info(conn_state, #channel{conn_state = ConnState}) -> + ConnState; +info(keepalive, #channel{keepalive = undefined}) -> + undefined; +info(keepalive, #channel{keepalive = Keepalive}) -> + emqx_keepalive:info(Keepalive); +info(will_msg, _) -> + undefined. + +-spec stats(channel()) -> emqx_types:stats(). +stats(#channel{inflight = Inflight, mqueue = Queue}) -> + %% XXX: A fake stats for managed by emqx_management + [ + {subscriptions_cnt, 1}, + {subscriptions_max, 0}, + {inflight_cnt, emqx_inflight:size(Inflight)}, + {inflight_max, emqx_inflight:max_size(Inflight)}, + {mqueue_len, queue:len(Queue)}, + {mqueue_max, 0}, + {mqueue_dropped, 0}, + {next_pkt_id, 0}, + {awaiting_rel_cnt, 0}, + {awaiting_rel_max, 0} + ]. + +set_conn_state(ConnState, Channel) -> + Channel#channel{conn_state = ConnState}. + +%%-------------------------------------------------------------------- +%% Init the Channel +%%-------------------------------------------------------------------- + +init( + ConnInfo = #{ + peername := {PeerHost, _Port}, + sockname := {_Host, SockPort} + }, + Options +) -> + % TODO: init rsa_key from user input + Peercert = maps:get(peercert, ConnInfo, undefined), + Mountpoint = maps:get(mountpoint, Options, ?DEFAULT_MOUNTPOINT), + ListenerId = + case maps:get(listener, Options, undefined) of + undefined -> undefined; + {GwName, Type, LisName} -> emqx_gateway_utils:listener_id(GwName, Type, LisName) + end, + EnableAuthn = maps:get(enable_authn, Options, true), + + ClientInfo = setting_peercert_infos( + Peercert, + #{ + zone => default, + listener => ListenerId, + protocol => gbt32960, + peerhost => PeerHost, + sockport => SockPort, + clientid => undefined, + username => undefined, + is_bridge => false, + is_superuser => false, + enable_authn => EnableAuthn, + mountpoint => Mountpoint + } + ), + + Ctx = maps:get(ctx, Options), + + #{ + retry_interval := RetxInterv, + max_retry_times := RetxMaxTime, + message_queue_len := MessageQueueLen + } = Options, + + #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo, + inflight = emqx_inflight:new(1), + mqueue = queue:new(), + timers = #{}, + conn_state = idle, + retx_interval = RetxInterv, + retx_max_times = RetxMaxTime, + max_mqueue_len = MessageQueueLen + }. + +setting_peercert_infos(NoSSL, ClientInfo) when + NoSSL =:= nossl; + NoSSL =:= undefined +-> + ClientInfo; +setting_peercert_infos(Peercert, ClientInfo) -> + {DN, CN} = {esockd_peercert:subject(Peercert), esockd_peercert:common_name(Peercert)}, + ClientInfo#{dn => DN, cn => CN}. + +%%-------------------------------------------------------------------- +%% Handle incoming packet +%%-------------------------------------------------------------------- +-spec handle_in(emqx_gbt32960_frame:frame() | {frame_error, any()}, channel()) -> + {ok, channel()} + | {ok, replies(), channel()} + | {shutdown, Reason :: term(), channel()} + | {shutdown, Reason :: term(), replies(), channel()}. + +handle_in( + Frame = ?CMD(?CMD_VIHECLE_LOGIN), + Channel +) -> + case + emqx_utils:pipeline( + [ + fun enrich_clientinfo/2, + fun enrich_conninfo/2, + fun set_log_meta/2, + %% TODO: How to implement the banned in the gateway instance? + %, fun check_banned/2 + fun auth_connect/2 + ], + Frame, + Channel#channel{conn_state = connecting} + ) + of + {ok, _NPacket, NChannel} -> + process_connect(Frame, ensure_connected(NChannel)); + {error, ReasonCode, NChannel} -> + log(warning, #{msg => "login_failed", reason => ReasonCode}, NChannel), + shutdown(ReasonCode, NChannel) + end; +handle_in(_Frame, Channel = #channel{conn_state = ConnState}) when + ConnState =/= connected +-> + shutdown(protocol_error, Channel); +handle_in(Frame = ?CMD(?CMD_INFO_REPORT), Channel) -> + _ = upstreaming(Frame, Channel), + {ok, Channel}; +handle_in(Frame = ?CMD(?CMD_INFO_RE_REPORT), Channel) -> + _ = upstreaming(Frame, Channel), + {ok, Channel}; +handle_in(Frame = ?CMD(?CMD_VIHECLE_LOGOUT), Channel) -> + %% XXX: unsubscribe gbt32960/dnstream/${vin}? + _ = upstreaming(Frame, Channel), + {ok, Channel}; +handle_in(Frame = ?CMD(?CMD_PLATFORM_LOGIN), Channel) -> + #{ + <<"Username">> := _Username, + <<"Password">> := _Password + } = Frame#frame.data, + %% TODO: + _ = upstreaming(Frame, Channel), + {ok, Channel}; +handle_in(Frame = ?CMD(?CMD_PLATFORM_LOGOUT), Channel) -> + %% TODO: + _ = upstreaming(Frame, Channel), + {ok, Channel}; +handle_in(Frame = ?CMD(?CMD_HEARTBEAT), Channel) -> + handle_out({?ACK_SUCCESS, Frame}, Channel); +handle_in(Frame = ?CMD(?CMD_SCHOOL_TIME), Channel) -> + %% TODO: How verify this request + handle_out({?ACK_SUCCESS, Frame}, Channel); +handle_in(Frame = #frame{cmd = Cmd}, Channel = #channel{inflight = Inflight}) -> + {Outgoings, NChannel} = dispatch_frame(Channel#channel{inflight = ack_frame(Cmd, Inflight)}), + _ = upstreaming(Frame, NChannel), + {ok, [{outgoing, Outgoings}], NChannel}; +handle_in(Frame, Channel) -> + log(warning, #{msg => "unexcepted_frame", frame => Frame}, Channel), + {ok, Channel}. + +%%-------------------------------------------------------------------- +%% Handle out +%%-------------------------------------------------------------------- + +handle_out({AckCode, Frame}, Channel) when + ?IS_ACK_CODE(AckCode) +-> + {ok, [{outgoing, ack(AckCode, Frame)}], Channel}. + +%%-------------------------------------------------------------------- +%% Handle Delivers from broker to client +%%-------------------------------------------------------------------- +-spec handle_deliver(list(emqx_types:deliver()), channel()) -> + {ok, channel()} + | {ok, replies(), channel()}. + +handle_deliver( + Messages0, + Channel = #channel{ + clientinfo = #{clientid := ClientId, mountpoint := Mountpoint}, + mqueue = Queue, + max_mqueue_len = MaxQueueLen + } +) -> + Messages = lists:map( + fun({deliver, _, M}) -> + emqx_mountpoint:unmount(Mountpoint, M) + end, + Messages0 + ), + case MaxQueueLen - queue:len(Queue) of + N when N =< 0 -> + discard_downlink_messages(Messages, Channel), + {ok, Channel}; + N -> + {NMessages, Dropped} = split_by_pos(Messages, N), + log(debug, #{msg => "enqueue_messages", messages => NMessages}, Channel), + metrics_inc('messages.delivered', Channel, erlang:length(NMessages)), + discard_downlink_messages(Dropped, Channel), + Frames = msgs2frame(NMessages, ClientId, Channel), + NQueue = lists:foldl(fun(F, Q) -> queue:in(F, Q) end, Queue, Frames), + {Outgoings, NChannel} = dispatch_frame(Channel#channel{mqueue = NQueue}), + {ok, [{outgoing, Outgoings}], NChannel} + end. + +split_by_pos(L, Pos) -> + split_by_pos(L, Pos, []). + +split_by_pos([], _, A1) -> + {lists:reverse(A1), []}; +split_by_pos(L, 0, A1) -> + {lists:reverse(A1), L}; +split_by_pos([E | L], N, A1) -> + split_by_pos(L, N - 1, [E | A1]). + +msgs2frame(Messages, Vin, Channel) -> + lists:filtermap( + fun(#message{payload = Payload}) -> + case emqx_utils_json:safe_decode(Payload, [return_maps]) of + {ok, Maps} -> + case msg2frame(Maps, Vin) of + {error, Reason} -> + log( + debug, + #{ + msg => "convert_message_to_frame_error", + reason => Reason, + data => Maps + }, + Channel + ), + false; + Frame -> + {true, Frame} + end; + {error, Reason} -> + log(error, #{msg => "json_decode_error", reason => Reason}, Channel), + false + end + end, + Messages + ). + +%%-------------------------------------------------------------------- +%% Handle call +%%-------------------------------------------------------------------- + +-spec handle_call(Req :: term(), From :: term(), channel()) -> + {reply, Reply :: term(), channel()} + | {reply, Reply :: term(), replies(), channel()} + | {shutdown, Reason :: term(), Reply :: term(), channel()} + | {shutdown, Reason :: term(), Reply :: term(), frame(), channel()}. + +handle_call(kick, _From, Channel) -> + Channel1 = ensure_disconnected(kicked, Channel), + disconnect_and_shutdown(kicked, ok, Channel1); +handle_call(discard, _From, Channel) -> + disconnect_and_shutdown(discarded, ok, Channel); +handle_call(Req, _From, Channel) -> + log(error, #{msg => "unexpected_call", call => Req}, Channel), + reply(ignored, Channel). + +%%-------------------------------------------------------------------- +%% Handle cast +%%-------------------------------------------------------------------- + +-spec handle_cast(Req :: term(), channel()) -> + ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}. +handle_cast(_Req, Channel) -> + {ok, Channel}. + +%%-------------------------------------------------------------------- +%% Handle info +%%-------------------------------------------------------------------- + +-spec handle_info(Info :: term(), channel()) -> + ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}. + +handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) -> + shutdown(Reason, Channel); +handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) -> + shutdown(Reason, Channel); +handle_info( + {sock_closed, Reason}, + Channel = + #channel{ + conn_state = connected + } +) -> + NChannel = ensure_disconnected(Reason, Channel), + shutdown(Reason, NChannel); +handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> + log(error, #{msg => "unexpected_sock_closed", reason => Reason}, Channel), + {ok, Channel}; +handle_info(Info, Channel) -> + log(error, #{msg => "unexpected_info}", info => Info}, Channel), + {ok, Channel}. + +%%-------------------------------------------------------------------- +%% Handle timeout +%%-------------------------------------------------------------------- + +-spec handle_timeout(reference(), Msg :: term(), channel()) -> + {ok, channel()} + | {ok, replies(), channel()} + | {shutdown, Reason :: term(), channel()}. + +handle_timeout( + _TRef, + {keepalive, _StatVal}, + Channel = #channel{keepalive = undefined} +) -> + {ok, Channel}; +handle_timeout( + _TRef, + {keepalive, _StatVal}, + Channel = #channel{conn_state = disconnected} +) -> + {ok, Channel}; +handle_timeout( + _TRef, + {keepalive, StatVal}, + Channel = #channel{keepalive = Keepalive} +) -> + case emqx_keepalive:check(StatVal, Keepalive) of + {ok, NKeepalive} -> + NChannel = Channel#channel{keepalive = NKeepalive}, + {ok, reset_timer(alive_timer, NChannel)}; + {error, timeout} -> + shutdown(keepalive_timeout, Channel) + end; +handle_timeout( + _TRef, + retry_delivery, + Channel = #channel{inflight = Inflight, retx_interval = RetxInterv} +) -> + case emqx_inflight:is_empty(Inflight) of + true -> + {ok, clean_timer(retry_timer, Channel)}; + false -> + Frames = emqx_inflight:to_list(Inflight), + {Outgoings, NInflight} = retry_delivery( + Frames, erlang:system_time(millisecond), RetxInterv, Inflight, [] + ), + {Outgoings2, NChannel} = dispatch_frame(Channel#channel{inflight = NInflight}), + {ok, [{outgoing, Outgoings ++ Outgoings2}], reset_timer(retry_timer, NChannel)} + end; +handle_timeout(_TRef, Msg, Channel) -> + log(error, #{msg => "unexpected_timeout", content => Msg}, Channel), + {ok, Channel}. + +%%-------------------------------------------------------------------- +%% Ensure timers +%%-------------------------------------------------------------------- + +ensure_timer(Name, Channel = #channel{timers = Timers}) -> + TRef = maps:get(Name, Timers, undefined), + Time = interval(Name, Channel), + case TRef == undefined andalso Time > 0 of + true -> ensure_timer(Name, Time, Channel); + %% Timer disabled or exists + false -> Channel + end. + +ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> + log(debug, #{msg => "start_timer", name => Name, time => Time}, Channel), + Msg = maps:get(Name, ?TIMER_TABLE), + TRef = emqx_utils:start_timer(Time, Msg), + Channel#channel{timers = Timers#{Name => TRef}}. + +reset_timer(Name, Channel) -> + ensure_timer(Name, clean_timer(Name, Channel)). + +clean_timer(Name, Channel = #channel{timers = Timers}) -> + Channel#channel{timers = maps:remove(Name, Timers)}. + +interval(alive_timer, #channel{keepalive = KeepAlive}) -> + emqx_keepalive:info(interval, KeepAlive); +interval(retry_timer, #channel{retx_interval = RetxIntv}) -> + RetxIntv. + +%%-------------------------------------------------------------------- +%% Terminate +%%-------------------------------------------------------------------- + +terminate(Reason, #channel{ + ctx = Ctx, + session = Session, + clientinfo = ClientInfo +}) -> + run_hooks(Ctx, 'session.terminated', [ClientInfo, Reason, Session]). + +%%-------------------------------------------------------------------- +%% Ensure connected + +enrich_clientinfo( + Packet, + Channel = #channel{ + clientinfo = ClientInfo + } +) -> + {ok, NPacket, NClientInfo} = emqx_utils:pipeline( + [ + fun maybe_assign_clientid/2, + %% FIXME: CALL After authentication successfully + fun fix_mountpoint/2 + ], + Packet, + ClientInfo + ), + {ok, NPacket, Channel#channel{clientinfo = NClientInfo}}. + +enrich_conninfo( + _Packet, + Channel = #channel{ + conninfo = ConnInfo, + clientinfo = ClientInfo + } +) -> + #{clientid := ClientId, username := Username} = ClientInfo, + NConnInfo = ConnInfo#{ + proto_name => <<"GBT32960">>, + proto_ver => <<"">>, + clean_start => true, + keepalive => 0, + expiry_interval => 0, + conn_props => #{}, + receive_maximum => 0, + clientid => ClientId, + username => Username + }, + {ok, Channel#channel{conninfo = NConnInfo}}. + +set_log_meta(_Packet, #channel{clientinfo = #{clientid := ClientId}}) -> + emqx_logger:set_metadata_clientid(ClientId), + ok. + +auth_connect( + _Packet, + Channel = #channel{ + ctx = Ctx, + clientinfo = ClientInfo + } +) -> + #{ + clientid := ClientId, + username := Username + } = ClientInfo, + case emqx_gateway_ctx:authenticate(Ctx, ClientInfo) of + {ok, NClientInfo} -> + {ok, Channel#channel{clientinfo = NClientInfo}}; + {error, Reason} -> + ?SLOG(warning, #{ + msg => "client_login_failed", + clientid => ClientId, + username => Username, + reason => Reason + }), + {error, Reason} + end. + +ensure_connected( + Channel = #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo + } +) -> + NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, + ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), + Channel#channel{ + conninfo = NConnInfo, + conn_state = connected + }. + +process_connect( + Frame, + Channel = #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo + } +) -> + SessFun = fun(_, _) -> #{} end, + case + emqx_gateway_ctx:open_session( + Ctx, + true, + ClientInfo, + ConnInfo, + SessFun + ) + of + {ok, #{session := Session}} -> + NChannel = Channel#channel{session = Session}, + subscribe_downlink(?DEFAULT_DOWNLINK_TOPIC, Channel), + _ = upstreaming(Frame, NChannel), + %% XXX: connection_accepted is not defined by stomp protocol + _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, #{}]), + {ok, NChannel}; + {error, Reason} -> + log( + error, + #{ + msg => "failed_to_open_session", + reason => Reason + }, + Channel + ), + shutdown(Reason, Channel) + end. + +maybe_assign_clientid(#frame{vin = Vin}, ClientInfo) -> + {ok, ClientInfo#{clientid => Vin, username => Vin}}. + +fix_mountpoint(_Packet, #{mountpoint := undefined}) -> + ok; +fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) -> + %% TODO: Enrich the variable replacement???? + %% i.e: ${ClientInfo.auth_result.productKey} + Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo), + {ok, ClientInfo#{mountpoint := Mountpoint1}}. + +%%-------------------------------------------------------------------- +%% Ensure disconnected + +ensure_disconnected( + Reason, + Channel = #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo + } +) -> + NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, + ok = run_hooks( + Ctx, + 'client.disconnected', + [ClientInfo, Reason, NConnInfo] + ), + Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. + +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- + +run_hooks(Ctx, Name, Args) -> + emqx_gateway_ctx:metrics_inc(Ctx, Name), + emqx_hooks:run(Name, Args). + +reply(Reply, Channel) -> + {reply, Reply, Channel}. + +shutdown(Reason, Channel) -> + {shutdown, Reason, Channel}. + +shutdown(Reason, Reply, Channel) -> + {shutdown, Reason, Reply, Channel}. + +disconnect_and_shutdown(Reason, Reply, Channel) -> + shutdown(Reason, Reply, Channel). + +retry_delivery([], _Now, _Interval, Inflight, Acc) -> + {lists:reverse(Acc), Inflight}; +retry_delivery([{Key, {_Frame, 0, _}} | Frames], Now, Interval, Inflight, Acc) -> + %% todo log(error, "has arrived max re-send times, drop ~p", [Frame]), + NInflight = emqx_inflight:delete(Key, Inflight), + retry_delivery(Frames, Now, Interval, NInflight, Acc); +retry_delivery([{Key, {Frame, RetxCount, Ts}} | Frames], Now, Interval, Inflight, Acc) -> + Diff = Now - Ts, + case Diff >= Interval of + true -> + NInflight = emqx_inflight:update(Key, {Frame, RetxCount - 1, Now}, Inflight), + retry_delivery(Frames, Now, Interval, NInflight, [Frame | Acc]); + _ -> + retry_delivery(Frames, Now, Interval, Inflight, Acc) + end. + +upstreaming( + Frame, Channel = #channel{clientinfo = #{mountpoint := Mountpoint, clientid := ClientId}} +) -> + {Topic, Payload} = transform(Frame, Mountpoint), + log(debug, #{msg => "upstreaming_to_topic", topic => Topic, payload => Payload}, Channel), + emqx:publish(emqx_message:make(ClientId, ?QOS_1, Topic, Payload)). + +transform(Frame = ?CMD(Cmd), Mountpoint) -> + Suffix = + case Cmd of + ?CMD_VIHECLE_LOGIN -> <<"/upstream/vlogin">>; + ?CMD_INFO_REPORT -> <<"/upstream/info">>; + ?CMD_INFO_RE_REPORT -> <<"/upstream/reinfo">>; + ?CMD_VIHECLE_LOGOUT -> <<"/upstream/vlogout">>; + ?CMD_PLATFORM_LOGIN -> <<"/upstream/plogin">>; + ?CMD_PLATFORM_LOGOUT -> <<"/upstream/plogout">>; + %CMD_HEARTBEAT, CMD_SCHOOL_TIME ... + _ -> <<"/upstream/transparent">> + end, + Topic = emqx_mountpoint:mount(Mountpoint, Suffix), + Payload = to_json(Frame), + {Topic, Payload}; +transform(Frame = #frame{ack = Ack}, Mountpoint) when + ?IS_ACK_CODE(Ack) +-> + Topic = emqx_mountpoint:mount(Mountpoint, <<"/upstream/response">>), + Payload = to_json(Frame), + {Topic, Payload}. + +to_json(#frame{cmd = Cmd, vin = Vin, encrypt = Encrypt, data = Data}) -> + emqx_utils_json:encode(#{'Cmd' => Cmd, 'Vin' => Vin, 'Encrypt' => Encrypt, 'Data' => Data}). + +ack(Code, Frame = #frame{data = Data, ack = ?ACK_IS_CMD}) -> + % PROTO: Update time & ack feilds only + Frame#frame{ack = Code, data = Data#{<<"Time">> => gentime()}}. + +ack_frame(Key, Inflight) -> + case emqx_inflight:contain(Key, Inflight) of + true -> emqx_inflight:delete(Key, Inflight); + false -> Inflight + end. + +dispatch_frame( + Channel = #channel{ + mqueue = Queue, + inflight = Inflight, + retx_max_times = RetxMax + } +) -> + case emqx_inflight:is_full(Inflight) orelse queue:is_empty(Queue) of + true -> + {[], Channel}; + false -> + {{value, Frame}, NewQueue} = queue:out(Queue), + + log(debug, #{msg => "delivery", frame => Frame}, Channel), + + NewInflight = emqx_inflight:insert( + Frame#frame.cmd, {Frame, RetxMax, erlang:system_time(millisecond)}, Inflight + ), + NChannel = Channel#channel{mqueue = NewQueue, inflight = NewInflight}, + {[Frame], ensure_timer(retry_timer, NChannel)} + end. + +gentime() -> + {Year, Mon, Day} = date(), + {Hour, Min, Sec} = time(), + Year1 = list_to_integer(string:substr(integer_to_list(Year), 3, 2)), + #{ + <<"Year">> => Year1, + <<"Month">> => Mon, + <<"Day">> => Day, + <<"Hour">> => Hour, + <<"Minute">> => Min, + <<"Second">> => Sec + }. + +%%-------------------------------------------------------------------- +%% Message to frame +%%-------------------------------------------------------------------- + +msg2frame(#{<<"Action">> := <<"Query">>, <<"Total">> := Total, <<"Ids">> := Ids}, Vin) -> + % Ids = [<<"0x01">>, <<"0x02">>] --> [1, 2] + Data = #{ + <<"Time">> => gentime(), + <<"Total">> => Total, + <<"Ids">> => lists:map(fun hexstring_to_byte/1, Ids) + }, + #frame{ + cmd = ?CMD_PARAM_QUERY, ack = ?ACK_IS_CMD, vin = Vin, encrypt = ?ENCRYPT_NONE, data = Data + }; +msg2frame(#{<<"Action">> := <<"Setting">>, <<"Total">> := Total, <<"Params">> := Params}, Vin) -> + % Params = [#{<<"0x01">> := 5000}, #{<<"0x02">> := 400}] + % Params1 = [#{1 := 5000}, #{2 := 400}] + Params1 = lists:foldr( + fun(M, Acc) -> + [{K, V}] = maps:to_list(M), + [#{hexstring_to_byte(K) => V} | Acc] + end, + [], + Params + ), + Data = #{<<"Time">> => gentime(), <<"Total">> => Total, <<"Params">> => Params1}, + #frame{ + cmd = ?CMD_PARAM_SETTING, ack = ?ACK_IS_CMD, vin = Vin, encrypt = ?ENCRYPT_NONE, data = Data + }; +msg2frame(Data = #{<<"Action">> := <<"Control">>, <<"Command">> := Command}, Vin) -> + Param = maps:get(<<"Param">>, Data, <<>>), + Data1 = #{ + <<"Time">> => gentime(), + <<"Command">> => hexstring_to_byte(Command), + <<"Param">> => Param + }, + #frame{ + cmd = ?CMD_TERMINAL_CTRL, + ack = ?ACK_IS_CMD, + vin = Vin, + encrypt = ?ENCRYPT_NONE, + data = Data1 + }; +msg2frame(_Data, _Vin) -> + {error, unsupproted}. + +hexstring_to_byte(S) when is_binary(S) -> + hexstring_to_byte(binary_to_list(S)); +hexstring_to_byte("0x" ++ S) -> + tune_byte(list_to_integer(S, 16)); +hexstring_to_byte(S) -> + tune_byte(list_to_integer(S)). + +tune_byte(I) when I =< 16#FF -> I; +tune_byte(_) -> exit(invalid_byte). + +discard_downlink_messages([], _Channel) -> + ok; +discard_downlink_messages(Messages, Channel) -> + log( + error, + #{ + msg => "discard_new_downlink_messages", + reason => + "Discard new downlink messages due to that too" + " many messages are waiting their ACKs.", + messages => Messages + }, + Channel + ), + metrics_inc('delivery.dropped', Channel, erlang:length(Messages)). + +log(Level, Meta, #channel{clientinfo = #{clientid := ClientId, username := Username}} = _Channel) -> + ?SLOG(Level, Meta#{clientid => ClientId, username => Username}). + +metrics_inc(Name, #channel{ctx = Ctx}, Oct) -> + emqx_gateway_ctx:metrics_inc(Ctx, Name, Oct). + +subscribe_downlink( + Topic, + #channel{ + ctx = Ctx, + clientinfo = + ClientInfo = + #{ + clientid := ClientId, + mountpoint := Mountpoint + } + } +) -> + {ParsedTopic, SubOpts0} = emqx_topic:parse(Topic), + SubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts0), + MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic), + _ = emqx_broker:subscribe(MountedTopic, ClientId, SubOpts), + run_hooks(Ctx, 'session.subscribed', [ClientInfo, MountedTopic, SubOpts]). diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_frame.erl b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_frame.erl new file mode 100644 index 000000000..a5ec3fa77 --- /dev/null +++ b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_frame.erl @@ -0,0 +1,802 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_gbt32960_frame). + +-behaviour(emqx_gateway_frame). + +-include("emqx_gbt32960.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% emqx_gateway_frame callbacks +-export([ + initial_parse_state/1, + serialize_opts/0, + serialize_pkt/2, + parse/2, + format/1, + type/1, + is_message/1 +]). + +-define(FLAG, 1 / binary). +-define(BYTE, 8 / big - integer). +-define(WORD, 16 / big - integer). +-define(DWORD, 32 / big - integer). +%% CMD: 1, ACK: 1, VIN: 17, Enc: 1, Len: 2 +-define(HEADER_SIZE, 22). + +-define(IS_RESPONSE(Ack), + Ack == ?ACK_SUCCESS orelse + Ack == ?ACK_ERROR orelse + Ack == ?ACK_VIN_REPEAT +). + +-type phase() :: search_heading_0x23 | parse. + +-type parser_state() :: #{ + data := binary(), + phase := phase() +}. + +%%-------------------------------------------------------------------- +%% Init a Parser +%%-------------------------------------------------------------------- + +-spec initial_parse_state(map()) -> parser_state(). +initial_parse_state(_) -> + #{data => <<>>, phase => search_heading_0x23}. + +-spec serialize_opts() -> emqx_gateway_frame:serialize_options(). +serialize_opts() -> + #{}. + +%%-------------------------------------------------------------------- +%% Parse Message +%%-------------------------------------------------------------------- +parse(Bin, State) -> + case enter_parse(Bin, State) of + {ok, Message, Rest} -> + {ok, Message, Rest, State#{parse => search_heading_0x23}}; + {error, Error} -> + {error, Error}; + {more_data_follow, Partial} -> + {more, State#{data => Partial, phase => parse}} + end. + +enter_parse(Bin, #{phase := search_heading_0x23}) -> + case search_heading_0x23(Bin) of + {ok, Rest} -> + parse_msg(Rest); + Error -> + Error + end; +enter_parse(Bin, #{data := Data}) -> + parse_msg(<>). + +search_heading_0x23(<<16#23, 16#23, Rest/binary>>) -> + {ok, Rest}; +search_heading_0x23(<<_, Rest/binary>>) -> + search_heading_0x23(Rest); +search_heading_0x23(<<>>) -> + {error, invalid_frame}. + +parse_msg(Binary) -> + case byte_size(Binary) >= ?HEADER_SIZE of + true -> + {Frame, Rest2} = parse_header(Binary), + case byte_size(Rest2) >= Frame#frame.length + 1 of + true -> parse_body(Rest2, Frame); + false -> {more_data_follow, Binary} + end; + false -> + {more_data_follow, Binary} + end. + +parse_header(<> = Binary) -> + Check = cal_check(Binary, ?HEADER_SIZE, undefined), + { + #frame{cmd = Cmd, ack = Ack, vin = VIN, encrypt = Encrypt, length = Length, check = Check}, + Rest2 + }. + +parse_body(Binary, Frame = #frame{length = Length, check = OldCheck, encrypt = Encrypt}) -> + <> = Binary, + Check = cal_check(Binary, Length, OldCheck), + case CheckByte == Check of + true -> + RawData = decipher(Data, Encrypt), + {ok, Frame#frame{data = parse_data(Frame, RawData), rawdata = RawData}, Rest}; + false -> + {error, frame_check_error} + end. + +% Algo: ?ENCRYPT_NONE, ENCRYPT_RSA, ENCRYPT_AES128 +decipher(Data, _Algo) -> + % TODO: decypher data + Data. + +% Algo: ?ENCRYPT_NONE, ENCRYPT_RSA, ENCRYPT_AES128 +encipher(Data, _Algo) -> + % TODO: encipher data + Data. + +parse_data( + #frame{cmd = ?CMD_VIHECLE_LOGIN}, + <> +) -> + #{ + <<"Time">> => #{ + <<"Year">> => Year, + <<"Month">> => Month, + <<"Day">> => Day, + <<"Hour">> => Hour, + <<"Minute">> => Minute, + <<"Second">> => Second + }, + <<"Seq">> => Seq, + <<"ICCID">> => ICCID, + <<"Num">> => Num, + <<"Length">> => Length, + <<"Id">> => Id + }; +parse_data( + #frame{cmd = ?CMD_INFO_REPORT}, + <> +) -> + #{ + <<"Time">> => #{ + <<"Year">> => Year, + <<"Month">> => Month, + <<"Day">> => Day, + <<"Hour">> => Hour, + <<"Minute">> => Minute, + <<"Second">> => Second + }, + <<"Infos">> => parse_info(Infos, []) + }; +parse_data( + #frame{cmd = ?CMD_INFO_RE_REPORT}, + <> +) -> + #{ + <<"Time">> => #{ + <<"Year">> => Year, + <<"Month">> => Month, + <<"Day">> => Day, + <<"Hour">> => Hour, + <<"Minute">> => Minute, + <<"Second">> => Second + }, + <<"Infos">> => parse_info(Infos, []) + }; +parse_data( + #frame{cmd = ?CMD_VIHECLE_LOGOUT}, + <> +) -> + #{ + <<"Time">> => #{ + <<"Year">> => Year, + <<"Month">> => Month, + <<"Day">> => Day, + <<"Hour">> => Hour, + <<"Minute">> => Minute, + <<"Second">> => Second + }, + <<"Seq">> => Seq + }; +parse_data( + #frame{cmd = ?CMD_PLATFORM_LOGIN}, + <> +) -> + #{ + <<"Time">> => #{ + <<"Year">> => Year, + <<"Month">> => Month, + <<"Day">> => Day, + <<"Hour">> => Hour, + <<"Minute">> => Minute, + <<"Second">> => Second + }, + <<"Seq">> => Seq, + <<"Username">> => Username, + <<"Password">> => Password, + <<"Encrypt">> => Encrypt + }; +parse_data( + #frame{cmd = ?CMD_PLATFORM_LOGOUT}, + <> +) -> + #{ + <<"Time">> => #{ + <<"Year">> => Year, + <<"Month">> => Month, + <<"Day">> => Day, + <<"Hour">> => Hour, + <<"Minute">> => Minute, + <<"Second">> => Second + }, + <<"Seq">> => Seq + }; +parse_data(#frame{cmd = ?CMD_HEARTBEAT}, <<>>) -> + #{}; +parse_data(#frame{cmd = ?CMD_SCHOOL_TIME}, <<>>) -> + #{}; +parse_data( + #frame{cmd = ?CMD_PARAM_QUERY}, + <> +) -> + %% XXX: need check ACK filed? + #{ + <<"Time">> => #{ + <<"Year">> => Year, + <<"Month">> => Month, + <<"Day">> => Day, + <<"Hour">> => Hour, + <<"Minute">> => Minute, + <<"Second">> => Second + }, + <<"Total">> => Total, + <<"Params">> => parse_params(Rest) + }; +parse_data( + #frame{cmd = ?CMD_PARAM_SETTING}, + <> +) -> + ?SLOG(debug, #{msg => "rest", data => Rest}), + #{ + <<"Time">> => #{ + <<"Year">> => Year, + <<"Month">> => Month, + <<"Day">> => Day, + <<"Hour">> => Hour, + <<"Minute">> => Minute, + <<"Second">> => Second + }, + <<"Total">> => Total, + <<"Params">> => parse_params(Rest) + }; +parse_data( + #frame{cmd = ?CMD_TERMINAL_CTRL}, + <> +) -> + #{ + <<"Time">> => #{ + <<"Year">> => Year, + <<"Month">> => Month, + <<"Day">> => Day, + <<"Hour">> => Hour, + <<"Minute">> => Minute, + <<"Second">> => Second + }, + <<"Command">> => Command, + <<"Param">> => parse_ctrl_param(Command, Rest) + }; +parse_data(Frame, Data) -> + ?SLOG(error, #{msg => "invalid_frame", frame => Frame, data => Data}), + error(invalid_frame). + +%%-------------------------------------------------------------------- +%% Parse Report Data Info +%%-------------------------------------------------------------------- + +parse_info(<<>>, Acc) -> + lists:reverse(Acc); +parse_info(<>, Acc) -> + <> = Body, + parse_info(Rest, [ + #{ + <<"Type">> => <<"Vehicle">>, + <<"Status">> => Status, + <<"Charging">> => Charging, + <<"Mode">> => Mode, + <<"Speed">> => Speed, + <<"Mileage">> => Mileage, + <<"Voltage">> => Voltage, + <<"Current">> => Current, + <<"SOC">> => SOC, + <<"DC">> => DC, + <<"Gear">> => Gear, + <<"Resistance">> => Resistance, + <<"AcceleratorPedal">> => AcceleratorPedal, + <<"BrakePedal">> => BrakePedal + } + | Acc + ]); +parse_info(<>, Acc) -> + % 12 is packet len of per drive motor + Len = Number * 12, + <> = Rest, + parse_info(Rest1, [ + #{ + <<"Type">> => <<"DriveMotor">>, + <<"Number">> => Number, + <<"Motors">> => parse_drive_motor(Bodys, []) + } + | Acc + ]); +parse_info(<>, Acc) -> + <> = + Rest, + + <> = Rest1, + + <> = Rest2, + parse_info(Rest3, [ + #{ + <<"Type">> => <<"FuelCell">>, + <<"CellVoltage">> => CellVoltage, + <<"CellCurrent">> => CellCurrent, + <<"FuelConsumption">> => FuelConsumption, + <<"ProbeNum">> => ProbeNum, + <<"ProbeTemps">> => binary_to_list(ProbeTemps), + <<"H_MaxTemp">> => H_MaxTemp, + <<"H_TempProbeCode">> => H_TempProbeCode, + <<"H_MaxConc">> => H_MaxConc, + <<"H_ConcSensorCode">> => H_ConcSensorCode, + <<"H_MaxPress">> => H_MaxPress, + <<"H_PressSensorCode">> => H_PressSensorCode, + <<"DCStatus">> => DCStatus + } + | Acc + ]); +parse_info( + <>, + Acc +) -> + parse_info(Rest, [ + #{ + <<"Type">> => <<"Engine">>, + <<"Status">> => Status, + <<"CrankshaftSpeed">> => CrankshaftSpeed, + <<"FuelConsumption">> => FuelConsumption + } + | Acc + ]); +parse_info( + <>, Acc +) -> + parse_info(Rest, [ + #{ + <<"Type">> => <<"Location">>, + <<"Status">> => Status, + <<"Longitude">> => Longitude, + <<"Latitude">> => Latitude + } + | Acc + ]); +parse_info(<>, Acc) -> + <> = Body, + + parse_info(Rest, [ + #{ + <<"Type">> => <<"Extreme">>, + <<"MaxVoltageBatterySubsysNo">> => MaxVoltageBatterySubsysNo, + <<"MaxVoltageBatteryCode">> => MaxVoltageBatteryCode, + <<"MaxBatteryVoltage">> => MaxBatteryVoltage, + <<"MinVoltageBatterySubsysNo">> => MinVoltageBatterySubsysNo, + <<"MinVoltageBatteryCode">> => MinVoltageBatteryCode, + <<"MinBatteryVoltage">> => MinBatteryVoltage, + <<"MaxTempSubsysNo">> => MaxTempSubsysNo, + <<"MaxTempProbeNo">> => MaxTempProbeNo, + <<"MaxTemp">> => MaxTemp, + <<"MinTempSubsysNo">> => MinTempSubsysNo, + <<"MinTempProbeNo">> => MinTempProbeNo, + <<"MinTemp">> => MinTemp + } + | Acc + ]); +parse_info(<>, Acc) -> + <> = + Rest, + N1 = FaultChargeableDeviceNum * 4, + <> = Rest1, + N2 = FaultDriveMotorNum * 4, + <> = Rest2, + N3 = FaultEngineNum * 4, + <> = Rest3, + N4 = FaultOthersNum * 4, + <> = Rest4, + parse_info(Rest5, [ + #{ + <<"Type">> => <<"Alarm">>, + <<"MaxAlarmLevel">> => MaxAlarmLevel, + <<"GeneralAlarmFlag">> => GeneralAlarmFlag, + <<"FaultChargeableDeviceNum">> => FaultChargeableDeviceNum, + <<"FaultChargeableDeviceList">> => tune_fault_codelist(FaultChargeableDeviceList), + <<"FaultDriveMotorNum">> => FaultDriveMotorNum, + <<"FaultDriveMotorList">> => tune_fault_codelist(FaultDriveMotorList), + <<"FaultEngineNum">> => FaultEngineNum, + <<"FaultEngineList">> => tune_fault_codelist(FaultEngineList), + <<"FaultOthersNum">> => FaultOthersNum, + <<"FaultOthersList">> => tune_fault_codelist(FaultOthersList) + } + | Acc + ]); +parse_info(<>, Acc) -> + {Rest1, SubSystems} = parse_chargeable_voltage(Rest, Number, []), + parse_info(Rest1, [ + #{ + <<"Type">> => <<"ChargeableVoltage">>, + <<"Number">> => Number, + <<"SubSystems">> => SubSystems + } + | Acc + ]); +parse_info(<>, Acc) -> + {Rest1, SubSystems} = parse_chargeable_temp(Rest, Number, []), + parse_info(Rest1, [ + #{ + <<"Type">> => <<"ChargeableTemp">>, + <<"Number">> => Number, + <<"SubSystems">> => SubSystems + } + | Acc + ]); +parse_info(Rest, Acc) -> + ?SLOG(error, #{msg => "invalid_info_feild", rest => Rest, acc => Acc}), + error(invalid_info_feild). + +parse_drive_motor(<<>>, Acc) -> + lists:reverse(Acc); +parse_drive_motor( + <>, + Acc +) -> + parse_drive_motor(Rest, [ + #{ + <<"No">> => No, + <<"Status">> => Status, + <<"CtrlTemp">> => CtrlTemp, + <<"Rotating">> => Rotating, + <<"Torque">> => Torque, + <<"MotorTemp">> => MotorTemp, + <<"InputVoltage">> => InputVoltage, + <<"DCBusCurrent">> => DCBusCurrent + } + | Acc + ]). + +parse_chargeable_voltage(Rest, 0, Acc) -> + {Rest, lists:reverse(Acc)}; +parse_chargeable_voltage( + <>, + Num, + Acc +) -> + Len = FrameCellsCount * 2, + <> = Rest, + parse_chargeable_voltage(Rest1, Num - 1, [ + #{ + <<"ChargeableSubsysNo">> => ChargeableSubsysNo, + <<"ChargeableVoltage">> => ChargeableVoltage, + <<"ChargeableCurrent">> => ChargeableCurrent, + <<"CellsTotal">> => CellsTotal, + <<"FrameCellsIndex">> => FrameCellsIndex, + <<"FrameCellsCount">> => FrameCellsCount, + <<"CellsVoltage">> => tune_voltage(CellsVoltage) + } + | Acc + ]). + +parse_chargeable_temp(Rest, 0, Acc) -> + {Rest, lists:reverse(Acc)}; +parse_chargeable_temp(<>, Num, Acc) -> + <> = Rest, + parse_chargeable_temp(Rest1, Num - 1, [ + #{ + <<"ChargeableSubsysNo">> => ChargeableSubsysNo, + <<"ProbeNum">> => ProbeNum, + <<"ProbesTemp">> => binary_to_list(ProbesTemp) + } + | Acc + ]). +tune_fault_codelist(<<>>) -> + []; +tune_fault_codelist(Data) -> + lists:flatten([list_to_binary(io_lib:format("~4.16.0B", [X])) || <> <= Data]). + +tune_voltage(Bin) -> tune_voltage_(Bin, []). +tune_voltage_(<<>>, Acc) -> lists:reverse(Acc); +tune_voltage_(<>, Acc) -> tune_voltage_(Rest, [V | Acc]). + +parse_params(Bin) -> parse_params_(Bin, []). +parse_params_(<<>>, Acc) -> + lists:reverse(Acc); +parse_params_(<<16#01, Val:?WORD, Rest/binary>>, Acc) -> + parse_params_(Rest, [#{<<"0x01">> => Val} | Acc]); +parse_params_(<<16#02, Val:?WORD, Rest/binary>>, Acc) -> + parse_params_(Rest, [#{<<"0x02">> => Val} | Acc]); +parse_params_(<<16#03, Val:?WORD, Rest/binary>>, Acc) -> + parse_params_(Rest, [#{<<"0x03">> => Val} | Acc]); +parse_params_(<<16#04, Val:?BYTE, Rest/binary>>, Acc) -> + parse_params_(Rest, [#{<<"0x04">> => Val} | Acc]); +parse_params_(<<16#05, Rest/binary>>, Acc) -> + case [V || #{<<"0x04">> := V} <- Acc] of + [Len] -> + <> = Rest, + parse_params_(Rest1, [#{<<"0x05">> => Val} | Acc]); + _ -> + ?SLOG(error, #{ + msg => "invalid_data", reason => "cmd_0x04 must appear ahead of cmd_0x05" + }), + lists:reverse(Acc) + end; +parse_params_(<<16#06, Val:?WORD, Rest/binary>>, Acc) -> + parse_params_(Rest, [#{<<"0x06">> => Val} | Acc]); +parse_params_(<<16#07, Val:5/binary, Rest/binary>>, Acc) -> + parse_params_(Rest, [#{<<"0x07">> => Val} | Acc]); +parse_params_(<<16#08, Val:5/binary, Rest/binary>>, Acc) -> + parse_params_(Rest, [#{<<"0x08">> => Val} | Acc]); +parse_params_(<<16#09, Val:?BYTE, Rest/binary>>, Acc) -> + parse_params_(Rest, [#{<<"0x09">> => Val} | Acc]); +parse_params_(<<16#0A, Val:?WORD, Rest/binary>>, Acc) -> + parse_params_(Rest, [#{<<"0x0A">> => Val} | Acc]); +parse_params_(<<16#0B, Val:?WORD, Rest/binary>>, Acc) -> + parse_params_(Rest, [#{<<"0x0B">> => Val} | Acc]); +parse_params_(<<16#0C, Val:?BYTE, Rest/binary>>, Acc) -> + parse_params_(Rest, [#{<<"0x0C">> => Val} | Acc]); +parse_params_(<<16#0D, Val:?BYTE, Rest/binary>>, Acc) -> + parse_params_(Rest, [#{<<"0x0D">> => Val} | Acc]); +parse_params_(<<16#0E, Rest/binary>>, Acc) -> + case [V || #{<<"0x0D">> := V} <- Acc] of + [Len] -> + <> = Rest, + parse_params_(Rest1, [#{<<"0x0E">> => Val} | Acc]); + _ -> + ?SLOG(error, #{ + msg => "invalid_data", reason => "cmd_0x0D must appear ahead of cmd_0x0E" + }), + lists:reverse(Acc) + end; +parse_params_(<<16#0F, Val:?WORD, Rest/binary>>, Acc) -> + parse_params_(Rest, [#{<<"0x0F">> => Val} | Acc]); +parse_params_(<<16#10, Val:?BYTE, Rest/binary>>, Acc) -> + parse_params_(Rest, [#{<<"0x10">> => Val} | Acc]); +parse_params_(Cmd, Acc) -> + ?SLOG(error, #{msg => "unexcepted_param_identifier", cmd => Cmd}), + lists:reverse(Acc). + +parse_ctrl_param(16#01, Param) -> + parse_upgrade_feild(Param); +parse_ctrl_param(16#02, _) -> + <<>>; +parse_ctrl_param(16#03, _) -> + <<>>; +parse_ctrl_param(16#04, _) -> + <<>>; +parse_ctrl_param(16#05, _) -> + <<>>; +parse_ctrl_param(16#06, <>) -> + #{<<"Level">> => Level, <<"Message">> => Msg}; +parse_ctrl_param(16#07, _) -> + <<>>; +parse_ctrl_param(Cmd, Param) -> + ?SLOG(error, #{msg => "unexcepted_param", param => Param, cmd => Cmd}), + <<>>. + +parse_upgrade_feild(Param) -> + [ + DialingName, + Username, + Password, + <<0, 0, I1, I2, I3, I4>>, + <>, + ManufacturerId, + HardwareVer, + SoftwareVer, + UpgradeUrl, + <> + ] = re:split(Param, ";", [{return, binary}]), + + #{ + <<"DialingName">> => DialingName, + <<"Username">> => Username, + <<"Password">> => Password, + <<"Ip">> => list_to_binary(inet:ntoa({I1, I2, I3, I4})), + <<"Port">> => Port, + <<"ManufacturerId">> => ManufacturerId, + <<"HardwareVer">> => HardwareVer, + <<"SoftwareVer">> => SoftwareVer, + <<"UpgradeUrl">> => UpgradeUrl, + <<"Timeout">> => Timeout + }. + +%%-------------------------------------------------------------------- +%% serialize_pkt +%%-------------------------------------------------------------------- +serialize_pkt(Frame, _Opts) -> + serialize(Frame). + +serialize(#frame{cmd = Cmd, ack = Ack, vin = Vin, encrypt = Encrypt, data = Data, rawdata = RawData}) -> + Encrypted = encipher(serialize_data(Cmd, Ack, RawData, Data), Encrypt), + Len = byte_size(Encrypted), + Stream = <>, + Crc = cal_check(Stream, byte_size(Stream), undefined), + <<"##", Stream/binary, Crc:?BYTE>>. + +serialize_data(?CMD_PARAM_QUERY, ?ACK_IS_CMD, _, #{ + <<"Time">> := Time, + <<"Total">> := Total, + <<"Ids">> := Ids +}) when length(Ids) == Total -> + T = tune_time(Time), + Ids1 = tune_ids(Ids), + <>; +serialize_data(?CMD_PARAM_SETTING, ?ACK_IS_CMD, _, #{ + <<"Time">> := Time, + <<"Total">> := Total, + <<"Params">> := Params +}) when length(Params) == Total -> + T = tune_time(Time), + Params1 = tune_params(Params), + <>; +serialize_data(?CMD_TERMINAL_CTRL, ?ACK_IS_CMD, _, #{ + <<"Time">> := Time, + <<"Command">> := Cmd, + <<"Param">> := Param +}) -> + T = tune_time(Time), + Param1 = tune_ctrl_param(Cmd, Param), + <>; +serialize_data(_Cmd, Ack, RawData, #{<<"Time">> := Time}) when ?IS_RESPONSE(Ack) -> + Rest = + case byte_size(RawData) > 6 of + false -> <<>>; + true -> binary:part(RawData, 6, byte_size(RawData) - 6) + end, + T = tune_time(Time), + <>. + +tune_time(#{ + <<"Year">> := Year, + <<"Month">> := Month, + <<"Day">> := Day, + <<"Hour">> := Hour, + <<"Minute">> := Min, + <<"Second">> := Sec +}) -> + <>. + +tune_ids(Ids) -> + lists:foldr( + fun + (Id, Acc) when is_integer(Id) -> + <>; + (Id, Acc) when is_binary(Id) -> + <> + end, + <<>>, + Ids + ). + +tune_params(Params) -> + tune_params_(lists:reverse(Params), <<>>). + +tune_params_([], Bin) -> + Bin; +tune_params_([#{16#01 := Val} | Rest], Bin) -> + tune_params_(Rest, <<16#01:?BYTE, Val:?WORD, Bin/binary>>); +tune_params_([#{16#02 := Val} | Rest], Bin) -> + tune_params_(Rest, <<16#02:?BYTE, Val:?WORD, Bin/binary>>); +tune_params_([#{16#03 := Val} | Rest], Bin) -> + tune_params_(Rest, <<16#03:?BYTE, Val:?WORD, Bin/binary>>); +tune_params_([#{16#04 := Val} | Rest], Bin) -> + {Val_05, Rest1} = take_param(16#05, Rest), + tune_params_(Rest1, <<16#04:?BYTE, Val:?BYTE, 16#05, Val_05:Val/binary, Bin/binary>>); +tune_params_([#{16#05 := Val} | Rest], Bin) -> + tune_params_(Rest ++ [#{16#05 => Val}], Bin); +tune_params_([#{16#06 := Val} | Rest], Bin) -> + tune_params_(Rest, <<16#06:?BYTE, Val:?WORD, Bin/binary>>); +tune_params_([#{16#07 := Val} | Rest], Bin) when byte_size(Val) == 5 -> + tune_params_(Rest, <<16#07:?BYTE, Val/binary, Bin/binary>>); +tune_params_([#{16#08 := Val} | Rest], Bin) when byte_size(Val) == 5 -> + tune_params_(Rest, <<16#08:?BYTE, Val/binary, Bin/binary>>); +tune_params_([#{16#09 := Val} | Rest], Bin) -> + tune_params_(Rest, <<16#09:?BYTE, Val:?BYTE, Bin/binary>>); +tune_params_([#{16#0A := Val} | Rest], Bin) -> + tune_params_(Rest, <<16#0A:?BYTE, Val:?WORD, Bin/binary>>); +tune_params_([#{16#0B := Val} | Rest], Bin) -> + tune_params_(Rest, <<16#0B:?BYTE, Val:?WORD, Bin/binary>>); +tune_params_([#{16#0C := Val} | Rest], Bin) -> + tune_params_(Rest, <<16#0C:?BYTE, Val:?BYTE, Bin/binary>>); +tune_params_([#{16#0D := Val} | Rest], Bin) -> + {Val_0E, Rest1} = take_param(16#0E, Rest), + tune_params_(Rest1, <<16#0D:?BYTE, Val:?BYTE, 16#0E, Val_0E:Val/binary, Bin/binary>>); +tune_params_([#{16#0E := Val} | Rest], Bin) -> + tune_params_(Rest ++ [#{16#0E => Val}], Bin); +tune_params_([#{16#0F := Val} | Rest], Bin) -> + tune_params_(Rest, <<16#0F:?BYTE, Val:?WORD, Bin/binary>>); +tune_params_([#{16#10 := Val} | Rest], Bin) -> + tune_params_(Rest, <<16#10:?BYTE, Val:?BYTE, Bin/binary>>). + +tune_ctrl_param(16#00, _) -> + <<>>; +tune_ctrl_param(16#01, Param) -> + tune_upgrade_feild(Param); +tune_ctrl_param(16#02, _) -> + <<>>; +tune_ctrl_param(16#03, _) -> + <<>>; +tune_ctrl_param(16#04, _) -> + <<>>; +tune_ctrl_param(16#05, _) -> + <<>>; +tune_ctrl_param(16#06, #{<<"Level">> := Level, <<"Message">> := Msg}) -> + <>; +tune_ctrl_param(16#07, _) -> + <<>>; +tune_ctrl_param(Cmd, Param) -> + ?SLOG(error, #{msg => "unexcepted_cmd", cmd => Cmd, param => Param}), + <<>>. + +tune_upgrade_feild(Param) -> + TuneBin = fun + (Bin, Len) when is_binary(Bin), byte_size(Bin) =:= Len -> Bin; + (undefined, _) -> undefined; + (Bin, _) -> error({invalid_param_length, Bin}) + end, + TuneWrd = fun + (Val) when is_integer(Val), Val < 65535 -> <>; + (undefined) -> undefined; + (_) -> error(invalid_param_word_value) + end, + TuneAdr = fun + (Ip) when is_binary(Ip) -> + {ok, {I1, I2, I3, I4}} = inet:parse_address(binary_to_list(Ip)), + <<0, 0, I1, I2, I3, I4>>; + (undefined) -> + undefined; + (_) -> + error(invalid_ip_address) + end, + L = [ + maps:get(<<"DialingName">>, Param, undefined), + maps:get(<<"Username">>, Param, undefined), + maps:get(<<"Password">>, Param, undefined), + TuneAdr(maps:get(<<"Ip">>, Param, undefined)), + TuneWrd(maps:get(<<"Port">>, Param, undefined)), + TuneBin(maps:get(<<"ManufacturerId">>, Param, undefined), 4), + TuneBin(maps:get(<<"HardwareVer">>, Param, undefined), 5), + TuneBin(maps:get(<<"SoftwareVer">>, Param, undefined), 5), + maps:get(<<"UpgradeUrl">>, Param, undefined), + TuneWrd(maps:get(<<"Timeout">>, Param, undefined)) + ], + list_to_binary([I || I <- lists:join(";", L), I /= undefined]). + +take_param(K, Params) -> + V = search_param(K, Params), + {V, Params -- [#{K => V}]}. + +search_param(16#05, [#{16#05 := V} | _]) -> V; +search_param(16#0E, [#{16#0E := V} | _]) -> V; +search_param(K, [_ | Rest]) -> search_param(K, Rest). + +cal_check(_, 0, Check) -> Check; +cal_check(<>, Size, undefined) -> cal_check(Rest, Size - 1, C); +cal_check(<>, Size, Check) -> cal_check(Rest, Size - 1, Check bxor C). + +format(Msg) -> + io_lib:format("~p", [Msg]). + +type(_) -> + gbt32960. + +is_message(#frame{}) -> + true; +is_message(_) -> + false. diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_schema.erl b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_schema.erl new file mode 100644 index 000000000..bc86b9686 --- /dev/null +++ b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_schema.erl @@ -0,0 +1,55 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_gbt32960_schema). + +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("typerefl/include/types.hrl"). + +%% config schema provides +-export([fields/1, desc/1]). + +fields(gbt32960) -> + [ + {mountpoint, emqx_gateway_schema:mountpoint()}, + {retry_interval, + sc( + emqx_schema:duration_ms(), + #{ + default => <<"8s">>, + desc => ?DESC(retry_interval) + } + )}, + {max_retry_times, + sc( + non_neg_integer(), + #{ + default => 3, + desc => ?DESC(max_retry_times) + } + )}, + {message_queue_len, + sc( + non_neg_integer(), + #{ + default => 10, + desc => ?DESC(message_queue_len) + } + )}, + {listeners, sc(ref(emqx_gateway_schema, tcp_listeners), #{desc => ?DESC(tcp_listeners)})} + ] ++ emqx_gateway_schema:gateway_common_options(). + +desc(gbt32960) -> + "The GBT-32960 gateway"; +desc(_) -> + undefined. + +%%-------------------------------------------------------------------- +%% internal functions + +sc(Type, Meta) -> + hoconsc:mk(Type, Meta). + +ref(Mod, Field) -> + hoconsc:ref(Mod, Field). diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index 5ea6eee70..4292ce50d 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -65,6 +65,7 @@ emqx_gateway_coap, emqx_gateway_lwm2m, emqx_gateway_exproto, + emqx_gateway_gbt32960, emqx_exhook, emqx_bridge, emqx_bridge_mqtt, diff --git a/mix.exs b/mix.exs index ed869c414..11e0099bd 100644 --- a/mix.exs +++ b/mix.exs @@ -329,6 +329,7 @@ defmodule EMQXUmbrella.MixProject do :emqx_gateway_coap, :emqx_gateway_lwm2m, :emqx_gateway_exproto, + :emqx_gateway_gbt32960, :emqx_dashboard, :emqx_dashboard_sso, :emqx_audit, diff --git a/rel/i18n/emqx_gateway_api.hocon b/rel/i18n/emqx_gateway_api.hocon index 1e0e22456..d712a054d 100644 --- a/rel/i18n/emqx_gateway_api.hocon +++ b/rel/i18n/emqx_gateway_api.hocon @@ -38,7 +38,7 @@ gateway_name.desc: gateway_name_in_qs.desc: """Gateway Name.
-It's enum with `stomp`, `mqttsn`, `coap`, `lwm2m`, `exproto`""" +It's enum with `stomp`, `mqttsn`, `coap`, `lwm2m`, `exproto`, `gbt32960`""" gateway_node_status.desc: """The status of the gateway on each node in the cluster""" diff --git a/rel/i18n/emqx_gbt32960_schema.hocon b/rel/i18n/emqx_gbt32960_schema.hocon new file mode 100644 index 000000000..0827a05c4 --- /dev/null +++ b/rel/i18n/emqx_gbt32960_schema.hocon @@ -0,0 +1,12 @@ +emqx_gbt32960_schema { + +retry_interval.desc: +"""Re-send time interval""" + +max_retry_times.desc: +"""Re-send max times""" + +message_queue_len.desc: +"""Max message queue length""" + +}