Merge pull request #11883 from JimMoen/feat-gw/jt808

Feat gw/jt808
This commit is contained in:
JimMoen 2023-11-13 09:54:43 +08:00 committed by GitHub
commit 8e409fa898
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
54 changed files with 7179 additions and 54 deletions

View File

@ -511,13 +511,7 @@ peercert() ->
conn_mod() ->
oneof([
emqx_connection,
emqx_ws_connection,
emqx_coap_mqtt_adapter,
emqx_sn_gateway,
emqx_lwm2m_protocol,
emqx_gbt32960_conn,
emqx_jt808_connection,
emqx_tcp_connection
emqx_ws_connection
]).
proto_name() ->

View File

@ -209,9 +209,6 @@ t_match_fast_forward(Config) ->
M:insert(<<"a/b/1/2/3/4/5/6/7/8/9/#">>, id1, <<>>, Tab),
M:insert(<<"z/y/x/+/+">>, id2, <<>>, Tab),
M:insert(<<"a/b/c/+">>, id3, <<>>, Tab),
% dbg:tracer(),
% dbg:p(all, c),
% dbg:tpl({ets, next, '_'}, x),
?assertEqual(id1, id(match(M, <<"a/b/1/2/3/4/5/6/7/8/9/0">>, Tab))),
?assertEqual([id1], [id(X) || X <- matches(M, <<"a/b/1/2/3/4/5/6/7/8/9/0">>, Tab)]).

View File

@ -381,7 +381,8 @@ fields(Gw) when
Gw == lwm2m;
Gw == exproto;
Gw == gbt32960;
Gw == ocpp
Gw == ocpp;
Gw == jt808
->
[{name, mk(Gw, #{desc => ?DESC(gateway_name)})}] ++
convert_listener_struct(emqx_gateway_schema:gateway_schema(Gw));
@ -392,7 +393,8 @@ fields(Gw) when
Gw == update_lwm2m;
Gw == update_exproto;
Gw == update_gbt32960;
Gw == update_ocpp
Gw == update_ocpp;
Gw == update_jt808
->
"update_" ++ GwStr = atom_to_list(Gw),
Gw1 = list_to_existing_atom(GwStr),

View File

@ -23,7 +23,7 @@
-behaviour(gen_server).
-include("include/emqx_gateway.hrl").
-include("emqx_gateway.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

View File

@ -17,7 +17,7 @@
%% @doc The gateway connection registry
-module(emqx_gateway_cm_registry).
-include("include/emqx_gateway.hrl").
-include("emqx_gateway.hrl").
-behaviour(gen_server).

View File

@ -17,7 +17,7 @@
%% @doc The gateway instance context
-module(emqx_gateway_ctx).
-include("include/emqx_gateway.hrl").
-include("emqx_gateway.hrl").
%% @doc The running context for a Connection/Channel process.
%%

View File

@ -23,7 +23,7 @@
-behaviour(supervisor).
-include("include/emqx_gateway.hrl").
-include("emqx_gateway.hrl").
-export([start_link/1]).

View File

@ -17,7 +17,7 @@
%% @doc Gateway Interface Module for HTTP-APIs
-module(emqx_gateway_http).
-include("include/emqx_gateway.hrl").
-include("emqx_gateway.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_auth/include/emqx_authn_chains.hrl").

View File

@ -19,7 +19,7 @@
-behaviour(gen_server).
-include("include/emqx_gateway.hrl").
-include("emqx_gateway.hrl").
-include_lib("emqx/include/logger.hrl").
%% APIs

View File

@ -17,7 +17,7 @@
%% @doc The Registry Centre of Gateway
-module(emqx_gateway_registry).
-include("include/emqx_gateway.hrl").
-include("emqx_gateway.hrl").
-behaviour(gen_server).

View File

@ -18,7 +18,7 @@
-behaviour(supervisor).
-include("include/emqx_gateway.hrl").
-include("emqx_gateway.hrl").
-export([start_link/0]).

View File

@ -5,7 +5,7 @@ with [Publish-Subscribe Broker for the CoAP](https://datatracker.ietf.org/doc/ht
## Quick Start
In EMQX 5.0, CoAP gateways can be configured and enabled through the Dashboard.
In EMQX 5.0, CoAP gateway can be configured and enabled through the Dashboard.
It can also be enabled via the HTTP API or emqx.conf, e.g. In emqx.conf:

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [
{emqx, {path, "../emqx"}},

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_coap, [
{description, "CoAP Gateway"},
{vsn, "0.1.4"},

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [
{emqx, {path, "../emqx"}},

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_exproto, [
{description, "ExProto Gateway"},
{vsn, "0.1.4"},

View File

@ -4,6 +4,8 @@
-record(frame, {cmd, ack, vin, encrypt, length, data, check, rawdata}).
-type frame() :: #frame{}.
-define(CMD(CmdType), #frame{
cmd = CmdType,
ack = ?ACK_IS_CMD
@ -73,3 +75,6 @@
% 0x0A~0x2F: Customized data for Platform Exchange Protocol
% 0x30~0x7F: Reserved
% 0x80~0xFE: Customized by user
-define(DEFAULT_MOUNTPOINT, <<"gbt32960/${clientid}/">>).
-define(DEFAULT_DOWNLINK_TOPIC, <<"dnstream">>).

View File

@ -1,6 +1,7 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [
{emqx, {path, "../../apps/emqx"}},
{emqx_utils, {path, "../emqx_utils"}},
{emqx_gateway, {path, "../../apps/emqx_gateway"}}
{emqx, {path, "../../apps/emqx"}},
{emqx_utils, {path, "../emqx_utils"}},
{emqx_gateway, {path, "../../apps/emqx_gateway"}}
]}.

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_gbt32960, [
{description, "GBT32960 Gateway"},
{vsn, "0.1.0"},

View File

@ -67,7 +67,6 @@
| {close, Reason :: atom()}.
-type replies() :: reply() | [reply()].
-type frame() :: emqx_gbt32960_frame:frame().
-define(TIMER_TABLE, #{
alive_timer => keepalive,
@ -75,8 +74,6 @@
}).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
-define(DEFAULT_MOUNTPOINT, <<"gbt32960/${clientid}">>).
-define(DEFAULT_DOWNLINK_TOPIC, <<"/dnstream">>).
-dialyzer({nowarn_function, init/2}).
@ -203,7 +200,7 @@ setting_peercert_infos(Peercert, ClientInfo) ->
%%--------------------------------------------------------------------
%% Handle incoming packet
%%--------------------------------------------------------------------
-spec handle_in(emqx_gbt32960_frame:frame() | {frame_error, any()}, channel()) ->
-spec handle_in(frame() | {frame_error, any()}, channel()) ->
{ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}
@ -703,14 +700,14 @@ upstreaming(
transform(Frame = ?CMD(Cmd), Mountpoint) ->
Suffix =
case Cmd of
?CMD_VIHECLE_LOGIN -> <<"/upstream/vlogin">>;
?CMD_INFO_REPORT -> <<"/upstream/info">>;
?CMD_INFO_RE_REPORT -> <<"/upstream/reinfo">>;
?CMD_VIHECLE_LOGOUT -> <<"/upstream/vlogout">>;
?CMD_PLATFORM_LOGIN -> <<"/upstream/plogin">>;
?CMD_PLATFORM_LOGOUT -> <<"/upstream/plogout">>;
?CMD_VIHECLE_LOGIN -> <<"upstream/vlogin">>;
?CMD_INFO_REPORT -> <<"upstream/info">>;
?CMD_INFO_RE_REPORT -> <<"upstream/reinfo">>;
?CMD_VIHECLE_LOGOUT -> <<"upstream/vlogout">>;
?CMD_PLATFORM_LOGIN -> <<"upstream/plogin">>;
?CMD_PLATFORM_LOGOUT -> <<"upstream/plogout">>;
%CMD_HEARTBEAT, CMD_SCHOOL_TIME ...
_ -> <<"/upstream/transparent">>
_ -> <<"upstream/transparent">>
end,
Topic = emqx_mountpoint:mount(Mountpoint, Suffix),
Payload = to_json(Frame),
@ -718,7 +715,7 @@ transform(Frame = ?CMD(Cmd), Mountpoint) ->
transform(Frame = #frame{ack = Ack}, Mountpoint) when
?IS_ACK_CODE(Ack)
->
Topic = emqx_mountpoint:mount(Mountpoint, <<"/upstream/response">>),
Topic = emqx_mountpoint:mount(Mountpoint, <<"upstream/response">>),
Payload = to_json(Frame),
{Topic, Payload}.

View File

@ -62,7 +62,7 @@ serialize_opts() ->
parse(Bin, State) ->
case enter_parse(Bin, State) of
{ok, Message, Rest} ->
{ok, Message, Rest, State#{parse => search_heading}};
{ok, Message, Rest, State#{data => <<>>, phase => search_heading}};
{error, Error} ->
{error, Error};
{more_data_follow, Partial} ->

View File

@ -4,11 +4,10 @@
-module(emqx_gbt32960_schema).
-include("emqx_gbt32960.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-define(DEFAULT_MOUNTPOINT, <<"gbt32960/${clientid}">>).
%% config schema provides
-export([fields/1, desc/1]).

View File

@ -29,7 +29,8 @@
"}\n"
>>).
all() -> emqx_common_test_helpers:all(?MODULE).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
application:load(emqx_gateway_gbt32960),

27
apps/emqx_gateway_jt808/.gitignore vendored Normal file
View File

@ -0,0 +1,27 @@
.eunit
deps
*.o
*.beam
*.plt
erl_crash.dump
ebin/*
rel/example_project
.concrete/DEV_MODE
.rebar
.erlang.mk/
emqx_connect_jt808.d
data/
!data/app.config
.idea/
*.iml
emqx_gateway_jt808.d
logs/
cover/
ct.coverdata
eunit.coverdata
test/ct.cover.spec
_build/
etc/emqx_gateway_jt808.conf.rendered
*.swp
rebar.lock
.rebar3/

View File

@ -0,0 +1,94 @@
Business Source License 1.1
Licensor: Hangzhou EMQ Technologies Co., Ltd.
Licensed Work: EMQX Enterprise Edition
The Licensed Work is (c) 2023
Hangzhou EMQ Technologies Co., Ltd.
Additional Use Grant: Students and educators are granted right to copy,
modify, and create derivative work for research
or education.
Change Date: 2027-02-01
Change License: Apache License, Version 2.0
For information about alternative licensing arrangements for the Software,
please contact Licensor: https://www.emqx.com/en/contact
Notice
The Business Source License (this document, or the “License”) is not an Open
Source license. However, the Licensed Work will eventually be made available
under an Open Source License, as stated in this License.
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
“Business Source License” is a trademark of MariaDB Corporation Ab.
-----------------------------------------------------------------------------
Business Source License 1.1
Terms
The Licensor hereby grants you the right to copy, modify, create derivative
works, redistribute, and make non-production use of the Licensed Work. The
Licensor may make an Additional Use Grant, above, permitting limited
production use.
Effective on the Change Date, or the fourth anniversary of the first publicly
available distribution of a specific version of the Licensed Work under this
License, whichever comes first, the Licensor hereby grants you rights under
the terms of the Change License, and the rights granted in the paragraph
above terminate.
If your use of the Licensed Work does not comply with the requirements
currently in effect as described in this License, you must purchase a
commercial license from the Licensor, its affiliated entities, or authorized
resellers, or you must refrain from using the Licensed Work.
All copies of the original and modified Licensed Work, and derivative works
of the Licensed Work, are subject to this License. This License applies
separately for each version of the Licensed Work and the Change Date may vary
for each version of the Licensed Work released by Licensor.
You must conspicuously display this License on each original or modified copy
of the Licensed Work. If you receive the Licensed Work in original or
modified form from a third party, the terms and conditions set forth in this
License apply to your use of that work.
Any use of the Licensed Work in violation of this License will automatically
terminate your rights under this License for the current and all other
versions of the Licensed Work.
This License does not grant you any right in any trademark or logo of
Licensor or its affiliates (provided that you may use a trademark or logo of
Licensor as expressly required by this License).
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
TITLE.
MariaDB hereby grants you permission to use this Licenses text to license
your works, and to refer to it using the trademark “Business Source License”,
as long as you comply with the Covenants of Licensor below.
Covenants of Licensor
In consideration of the right to use this Licenses text and the “Business
Source License” name and trademark, Licensor covenants to MariaDB, and to all
other recipients of the licensed work to be provided by Licensor:
1. To specify as the Change License the GPL Version 2.0 or any later version,
or a license that is compatible with GPL Version 2.0 or a later version,
where “compatible” means that software provided under the Change License can
be included in a program with software provided under GPL Version 2.0 or a
later version. Licensor may specify additional Change Licenses without
limitation.
2. To either: (a) specify an additional grant of rights to use that does not
impose any additional restriction on the right granted in this License, as
the Additional Use Grant; or (b) insert the text “None”.
3. To specify a Change Date.
4. Not to modify this License in any other way.

View File

@ -0,0 +1,26 @@
# emqx_jt808
The JT/T 808 protocol is designed for data interaction between vehicle-mounted satellite terminals and IoT platforms.
The **JT/T 808 Gateway** in EMQX can accept JT/T 808 clients and translate their events
and messages into MQTT Publish messages.
In the current implementation, it has the following limitations:
- Only supports JT/T 808-2013 protocol, JT/T 808-2019 is not supported yet.
- Based TCP/TLS transport.
- Third-party authentication/registration http service required.
## Quick Start
In EMQX 5.0, JT/T 808 gateway can be configured and enabled through the Dashboard.
It can also be enabled via the HTTP API, and emqx.conf e.g, In emqx.conf:
```
```
> Note:
> Configuring the gateway via emqx.conf requires changes on a per-node basis,
> but configuring it via Dashboard or the HTTP API will take effect across the cluster.
More documentations: [JT/T 808 Gateway](https://www.emqx.io/docs/en/v5.0/gateway/jt808.html)

View File

@ -0,0 +1,840 @@
# emqx-jt808
JT/T 808 2013 协议接入网关
该文档定义了 Plugins **emqx_jt808****EMQX** 之间数据交换的格式
约定:
- Payload 采用 Json 格式进行组装
- Json Key 采用全小写格式命名
Json 结构示例
## 终端到服务器
```json
{
"header" : {
"msg_id" : 1,
"encrypt": 0,
"len": VAL,
"phone": 13900000000,
"msg_sn": 0
},
"body": {
"seq": 1,
"id": 1,
"result": 0
}
}
```
## 服务器到终端
```json
{
"header": {
"msg_id": 32769,
"encrypt": 0,
"phone": 13900000000,
"msg_sn": 0
},
"body": {
"seq": 1,
"id": 1,
"result": 0
}
}
```
## 数据类型对照表
| JT808 Defined Type | In Json Type | Comment |
|:------------------:|:------------:|:----------:|
| BYTE | integer | in decimal |
| WORD | integer | in decimal |
| DWORD | integer | in decimal |
| BYTE(n) | string | |
| BCD(n) | string | |
| STRING | string | |
## 字段对照表
### 消息头字段对照表
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------:|
| 消息 ID | msg_id | word | integer |
| 数据加密方式 | encrypt | word | integer |
| 终端手机号 | phone | bcd(6) | string |
| 消息流水号 | msg_sn | word | integer |
| Optional Field | Json Key name | Value Type | Value Type in JSON |
|:--------------:|:-------------:|:----------:|:------------------:|
| 消息总包数 | frag_total | word | integer |
| 消息包序号 | frag_sn | word | integer |
- 存在 `frag_total``frag_sn` 时表示消息体为长消息,进行分包处理
### 消息体字段对照表
- 终端通用应答 `"msg_id": 1` 0x0001
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 应答流水号 | seq | word | integer |
| 应答 ID | id | word | integer |
| 结果 | result | byte | integer |
- 平台通用应答 `"msg_id": 32769` 0x8001
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 应答流水号 | seq | word | integer |
| 应答 ID | id | word | integer |
| 结果 | result | byte | integer |
- 终端心跳 `"msg_id": 2` 0x0002
空 Json
- 补传分包请求 `"msg_id": 32771` 0x8003
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:--------------:|:------------------:|
| 原始消息流水号 | seq | word | integer |
| 重传包总数 | length | byte | integer |
| 重传包 ID 列表 | ids | byte(2*length) | list of integer |
- 终端注册 `"msg_id": 256` 0x0100
| Field | Json Key name | Value Type | Value Type in Json |
|:---------:|:--------------:|:----------:|:------------------:|
| 省域 ID | province | word | integer |
| 市县域 ID | city | word | integer |
| 制造商 ID | manufacture | byte(5) | string |
| 终端型号 | model | byte(20) | string |
| 终端 ID | dev_id | byte(7) | string |
| 车牌颜色 | color | byte | integer |
| 车辆标识 | license_number | string | string |
- 终端注册应答 `"msg_id": 33024` 0x8100
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 应答流水号 | seq | word | integer |
| 结果 | result | byte | integer |
只有成功后才有此字段
| Optional Field | Json Key name | Value Type | Value Type in JSON |
| 鉴权码 | auth_code | string | string |
- 终端注销 `"msg_id": 3` 0x0003
空 Json
- 终端鉴权 `"msg_id": 258` 0x0102
| Field | Json Key name | Value Type | Value Type in Json |
|:------:|:-------------:|:----------:|:------------------:|
| 鉴权码 | code | string | string |
- 设置终端参数 `"msg_id": 33027` 0x8103
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------------------------------------------:|
| 参数总数 | length | byte | integer |
| 参数项列表 | params | list | list of id and value. `[{"id":ID, "value": VAL}, ...]` |
| 参数项 | id | dword | integer |
| 参数值 | value | byte | integer |
参数 ID 说明见协议规定.
- 查询终端参数 `"msg_id": 33028` 0x8104
空 Json
- 查询指定终端参数 `"msg_id": 33030` 0x8106
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:--------------:|:--------------------------------:|
| 参数总数 | length | byte | integer |
| 参数 ID 列表 | ids | byte(2*length) | list of id. `[1, 2, 3, 4, ...]` |
参数 ID 列表中元素为 integer
- 查询终端应答参数 `"msg_id": 260` 0x0104
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------------------------------------------:|
| 应答流水号 | seq | word | integer |
| 应答参数个数 | length | byte | integer |
| 参数项列表 | params | list | list of id and value. `[{"id":ID, "value": VAL}, ...]` |
| 参数项 | id | dword | integer |
| 参数值 | value | byte | integer |
参数 ID 说明见协议规定.
- 终端控制 `"msg_id": 33029 ` 0x8105
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 命令字 | command | byte | integer |
| 命令参数 | param | string | string |
- 查询终端属性 `"msg_id": 33031` 0x8107
空 Json
- 查询终端属性应答 `"msg_id": 263` 0x0107
| Field | Json Key name | Value Type | Value Type in Json |
|:-----------------:|:----------------:|:----------:|:------------------:|
| 终端类型 | type | word | integer |
| 制造商 ID | manufacture | byte(5) | string |
| 终端型号 | model | byte(20) | string |
| 终端 ID | id | byte(7) | string |
| 终端 SIM 卡 ICCID | iccid | byte(10) | string |
| 终端硬件版本号 | hardware_version | string | string |
| 终端硬件固件号 | firmware_version | string | string |
| GNSS 模块属性 | gnss_prop | byte | integer |
| 通信模块属性 | comm_prop | byte | integer |
-- 终端硬件版本号长度、终端固件版本号长度,将被用于二进制报文解析,不向上暴露
- 下发终端升级包 `"msg_id": 33032` 0x8108
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:----------:|:----------------------:|
| 升级类型 | type | byte | integer |
| 制造商 ID | manufacturer | byte(5) | string |
| 版本号长度 | ver_len | byte | integer |
| 版本号 | version | string | string |
| 升级数据包长度 | fw_len | dword | integer |
| 升级数据包 | firmware | binary | string(base64 encoded) |
- 终端升级结果通知 `"msg_id": 264` 0x0108
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 升级类型 | type | byte | integer |
| 升级结果 | result | byte | integer |
- 位置信息汇报 `"msg_id": 512` 0x0200
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------------:|:-------------:|:----------:|:------------------:|
| 报警标志 | alarm | dword | integer |
| 状态 | status | dword | integer |
| 纬度 | latitude | dword | integer |
| 经度 | longitude | dword | integer |
| 高程 | altitude | word | integer |
| 速度 | speed | word | integer |
| 方向 | direction | word | integer |
| 时间 | time | bcd(6) | string |
| Optional Field | Json Key name | Value Type | Value Type in JSON |
| 位置附加信息项列表 | extra | - | map |
%% TODO: refine alarm mroe details
位置附加信息项列表, 在 `extra`
| Field (附加信息描述) | Json Key name | Value Type | Value Type in Json |
|:---------------------------------:|:---------------:|:----------:|:----------------------:|
| 里程 | mileage | dword | integer |
| 油量 | fuel_meter | word | integer |
| 行驶记录功能获取的速度 | speed | word | integer |
| 需要人工确认报警事件的 ID | alarm_id | word | integer |
| 超速报警附加信息(长度1或5) | overspeed_alarm | - | map |
| 进出区域/路线报警附加信息 | in_out_alarm | - | map |
| 路段行驶时间不足/过长报警附加信息 | path_time_alarm | - | map |
| 扩展车辆信号状态位 | 见状态位附表 | - | - |
| IO 状态位 | io_status | - | map |
| 模拟量 | analog | - | map |
| 无线通信网络信号强度 | rssi | byte | integer |
| GNSS 定位卫星数 | gnss_sat_num | byte | integer |
| 后续自定义信息长度 | custome | - | string(base64 encoded) |
| %% TODO 自定义区域 | | | |
超速报警附加信息(长度1或5), 置于 map `overspeed_alarm`
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 位置类型 | type | byte | integer |
| Optional Field | Json Key name | Value Type | Value Type in JSON |
| 区域或路段 ID | id | dword | integer |
进出区域/路线报警附加信息, 置于 map `in_out_alarm`
| Field | Json Key name | Value Type | Value Type in Json |
|:-------------:|:-------------:|:----------:|:------------------:|
| 位置类型 | type | byte | integer |
| 区域或路段 ID | id | dword | integer |
| 方向 | direction | byte | integer |
路段行驶时间不足/过长报警附加信息, 置于 map `path_time_alarm`
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------:|
| 路段 ID | id | dword | integer |
| 路段行驶时间 | time | word | integer |
| 结果 | result | byte | integer |
IO 状态位, 置于 map `io_status`
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------:|
| 深度休眠状态 | deep_sleep | 1 bit | integer |
| 休眠状态 | sleep | 1 bit | integer |
模拟量, 置于 map `analog`
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 模拟量 0 | ad0 | 16 bits | integer |
| 模拟量 1 | ad1 | 16 bits | integer |
扩展车辆信号状态位, 置于 map `extra`
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:---------------:|:----------:|:------------------------------------------:|
| 信号 | signal | - 2 bits | map, `{"low_beam": VAL, "high_beam": VAL}` |
| 右转向灯信号 | right_turn | 1 bit | integer |
| 左转向灯信号 | left_turn | 1 bit | integer |
| 制动信号 | brake | 1 bit | integer |
| 倒档信号 | reverse | 1 bit | integer |
| 雾灯信号 | fog | 1 bit | integer |
| 示廓灯 | side_marker | 1 bit | integer |
| 喇叭状态 | horn | 1 bit | integer |
| 空调状态 | air_conditioner | 1 bit | integer |
| 空档信号 | neutral | 1 bit | integer |
| 缓速器工作 | retarder | 1 bit | integer |
| ABS 工作 | abs | 1 bit | integer |
| 加热器工作 | heater | 1 bit | integer |
| 离合器状态 | cluth | 1 bit | integer |
信号状态, 置于 map `signal`
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 近光灯信号 | low_beam | 1 bit | integer |
| 远光灯信号 | high_beam | 1 bit | integer |
example:
```
{
"header" : {
"msg_id" : 1,
"encrypt": 0,
"len": VAL,
"phone": 13900000000,
"msg_sn": 0
},
"body": {
"alarm": VAL,
"status": VAL,
"latitude": VAL,
"longitude": VAL,
"altitude": VAL,
"speed": VAL,
"direction": VAL,
"time": VAL,
"extra": {
"mileage": VAL,
"fuel_unit": VAL,
"speed": VAL,
"alarm_id": VAL,
"overspeed_alarm": {
"type": VAL,
"id": VAL
},
"in_out_alarm": {
"type": VAL,
"id": VAL,
"direction": VAL
},
"path_time_alarm": {
"id": VAL,
"time": VAL,
"result": VAL
},
"signal": {
"low_beam": VAL,
"high_beam": VAL
},
"right_turn": VAL,
"left_turn": VAL,
"break": VAL,
"reverse": VAL,
"fog": VAL,
"side_marker": VAL,
"horn": VAL,
"air_conditioner": VAL,
"neutral": VAL,
"retarder": VAL,
"abs": VAL,
"heater": VAL,
"cluth": VAL,
"io_status": {
"deep_sleep": VAL,
"sleep": VAL
},
"analog": {
"ad0": VAL,
"ad1": VAL
}
}
}
}
```
- 位置信息查询 `"msg_id": 33281` 0x8201
空 Json
- 位置信息查询应答 `"msg_id": 513` 0x0201
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------:|
| 应答流水号 | seq | word | integer |
| 位置信息汇报 | params | - | map |
- 临时位置跟踪控制 `"msg_id": 33282` 0x8202
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:----------:|:------------------:|
| 时间间隔 | period | word | integer |
| 跟踪位置有效期 | expiry | dword | integer |
- 人工确认报警消息 `"msg_id": 33283` 0x8203
| Field | Json Key name | Value Type | Value Type in Json |
|:----------------:|:-------------:|:----------:|:------------------:|
| 报警消息流水号 | seq | word | integer |
| 人工确认报警类型 | type | dword | integer |
- 文本信息下发 `"msg_id": 33536` 0x8300
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 标志 | flag | byte | integer |
| 文本信息 | text | string | string |
- 事件设置 `"msg_id": 33537` 0x8301
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:-----------------------------------------------------------------:|
| 设置类型 | type | byte | integer |
| 设置总数 | length | byte | integer |
| 事件项列表 | events | list | list of event. `[{"id": ID, "length": LEN, "content": CON}, ...]` |
| 事件 ID | id | byte | integer |
| 事件内容长度 | length | byte | integer |
| 事件内容 | content | string | string |
- 事件报告 `"msg_id": 769` 0x0301
| Field | Json Key name | Value Type | Value Type in Json |
|:-------:|:-------------:|------------|:------------------:|
| 事件 ID | id | byte | integer |
- 提问下发 `"msg_id": 33538` 0x8302
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:---------------------------------------------------------------:|
| 标志 | flag | byte | integer |
| 问题内容长度 | length | byte | integer |
| 问题 | question | string | string |
| 候选答案列表 | answers | list | list of answer. `[{"id": ID, "len": LEN, "answer": ANS}, ...]` |
| 答案 ID | id | byte | integer |
| 答案内容长度 | len | byte | integer |
| 答案内容 | answer | string | string |
%% TODO: len -> length or other length -> len
- 提问应答 `"msg_id": 770` 0x0302
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 应答流水号 | seq | word | integer |
| 答案 ID | id | byte | integer |
- 信息点播菜单设置 `"msg_id": 33539` 0x8303
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------:|
| 设置类型 | type | byte | integer |
| 信息项总数 | length | byte | integer |
| 信息项列表 | menus | list | list of menu |
| 信息类型 | type | byte | integer |
| 信息名称长度 | length | word | integer |
| 信息名称 | info | string | string |
- 信息点播/取消 `"msg_id": 771` 0x0303
| Field | Json Key name | Value Type | Value Type in Json |
|:-------------:|:-------------:|:----------:|:------------------:|
| 信息类型 | id | byte | integer |
| 点拨/取消标志 | flag | byte | integer |
- 信息服务 `"msg_id": 33540` 0x8304
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 信息类型 | type | byte | integer |
| 信息长度 | length | word | integer |
| 信息内容 | info | string | string |
- 电话回拨 `"msg_id": 33792` 0x8400
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 标志 | type | byte | integer |
| 电话号码 | phone | string | string |
- `"msg_id": 33793` 0x8401
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 设置类型 | type | byte | integer |
| 联系人总数 | length | byte | integer |
| 联系人项 | contacts | list | list of contact. |
| 标志 | type | byte | integer |
| 号码长度 | phone_len | byte | integer |
| 电话号码 | phone | string | string |
| 联系人长度 | name_len | byte | integer |
| 联系人 | name | string | string |
联系人项示例
`[{"type": TYPE, "phone_len", PH_LEN, "phone": PHONE, "name_len": NAME_LEN, "name": NAME}, ...]`
- `"msg_id": 34048` 0x8500
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 标志控制 | flag | byte | integer |
- `"msg_id": 1280` 0x0500
| Field | Json Key name | Value Type | Value Type in Json |
|:------------------:|:-------------:|:----------:|:------------------:|
| 应答流水号 | seq | word | integer |
| 位置信息汇报消息体 | location | map | map of location |
- `"msg_id": 34304` 0x8600
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:------------------:|:----------:|:------------------:|
| 设置属性 | type | byte | integer |
| 区域总数 | length | byte | integer |
| 区域项 | areas | list | list of area. |
| 区域 ID | id | dword | integer |
| 区域属性 | flag | dword | integer |
| 中心点纬度 | center_latitude | dword | integer |
| 中心点经度 | center_longitude | dword | integer |
| 半径 | radius | dword | integer |
| 起始时间 | start_time | string | string |
| 结束时间 | end_time | string | string |
| 最高速度 | max_speed | word | integer |
| 超速持续时间 | overspeed_duration | byte | integer |
区域列表示例
`[{"id": ID,
"flag": FLAG,
"center_latitude": CEN_LAT,
"center_longitude": CEN_LON,
"radius": RADIUS,
"start_time": START_TIME,
"end_time": END_TIME,
"max_speed", MAX_SPEED,
"overspeed_duration", OVERSPEED_DURATION
},
...
]`
- 删除圆形区域 `"msg_id": 34305` 0x8601
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------:|
| 区域数 | length | byte | integer |
| 区域 ID 列表 | ids | list | list of id. |
| 区域 ID 1~n | - | dword | integer |
`[ID1, ID2, ...]`
- 设置矩形区域 `"msg_id": 34306` 0x8602
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:------------------:|:----------:|:------------------------:|
| 设置属性 | type | byte | integer |
| 区域总数 | length | byte | integer |
| 区域项 | areas | list | list of rectangle area. |
| 区域 ID | id | dword | integer |
| 区域属性 | flag | dword | integer |
| 左上点纬度 | lt_lat | dword | integer |
| 左上点经度 | lt_lng | dword | integer |
| 右下点纬度 | rb_lat | dword | integer |
| 右下点经度 | rb_lng | dword | integer |
| 起始时间 | start_time | string | string |
| 结束时间 | end_time | string | string |
| 最高速度 | max_speed | word | integer |
| 超速持续时间 | overspeed_duration | byte | integer |
- 删除矩形区域 `"msg_id": 34307` 0x8603
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------:|
| 区域数 | length | byte | integer |
| 区域 ID 列表 | ids | list | list of id. |
| 区域 ID 1~n | - | dword | integer |
- 设置多边形区域 `"msg_id": 34308` 0x8604
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:------------------:|:----------:|:------------------:|
| 区域 ID | id | dword | integer |
| 区域属性 | flag | dword | integer |
| 起始时间 | start_time | string | string |
| 结束时间 | end_time | string | string |
| 最高速度 | max_speed | word | integer |
| 超速持续时间 | overspeed_duration | byte | integer |
| 区域总顶点数 | length | word | integer |
| 顶点项列表 | points | list | list of point. |
| 顶点纬度 | lat | dword | integer |
| 顶点经度 | lng | dword | integer |
- 删除多边形区域 `"msg_id": 34309` 0x8605
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:------------------:|
| 区域数 | length | byte | integer |
| 区域 ID 列表 | ids | list | list of id. |
| 区域 ID 1~n | - | dword | integer |
- 设置路线 `"msg_id": 34310` 0x8606
| Field | Json Key name | Value Type | Value Type in Json |
|:----------------:|:------------------:|:----------:|:------------------:|
| 路线 ID | id | dword | integer |
| 路线属性 | flag | word | integer |
| 起始时间 | start_time | string | string |
| 结束时间 | end_time | string | string |
| 路线总拐点数 | length | word | integer |
| 拐点项 | points | list | list of point. |
| 拐点 ID | point_id | dword | integer |
| 路段 ID | path_id | dword | integer |
| 拐点纬度 | point_lat | dword | integer |
| 拐点经度 | point_lng | dword | integer |
| 路段宽度 | width | byte | integer |
| 路段属性 | attrib | byte | integer |
| 路段行驶过长阈值 | passed | word | integer |
| 路段行驶不足阈值 | uncovered | word | integer |
| 路段最高速度 | max_speed | word | integer |
| 路段超速持续时间 | overspeed_duration | byte | integer |
- `"msg_id": 34311` 0x8607
| Field | Json Key name | Value Type | Value Type in Json |
|:--------:|:-------------:|:----------:|:------------------:|
| 路线数 | length | byte | integer |
| 路线列表 | ids | list | list of id |
| 路线 ID | - | dword | integer |
- 行驶记录数据采集命令 `"msg_id": 34560` 0x8700
| Field | Json Key name | Value Type | Value Type in Json |
|:------:|:-------------:|:----------------------:|:------------------:|
| 命令字 | command | byte | integer |
| 数据块 | param | string(base64 encoded) | string |
- 行驶记录数据上传 `"msg_id": 1792` 0x0700
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------------------:|:------------------:|
| 应答流水号 | seq | word | integer |
| 命令字 | command | byte | integer |
| 数据块 | data | string(base64 encoded) | string |
- 行驶记录参数下传命令 `"msg_id": 34561` 0x8701
| Field | Json Key name | Value Type | Value Type in Json |
|:------:|:-------------:|:----------------------:|:------------------:|
| 命令字 | command | byte | integer |
| 数据块 | param | string(base64 encoded) | string |
- 电子运单上报 `"msg_id": 1793` 0x0701
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------------------:|:------------------:|
| 电子运单长度 | length | dword | integer |
| 电子运单内容 | data | string(base64 encoded) | string |
- 上报驾驶员身份信息请求 `"msg_id": 34562` 0x8702
空 Json
- 驾驶员身份信息采集上报 `"msg_id": 1794` 0x0702
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:----------:|:------------------:|
| 状态 | status | byte | integer |
| 时间 | time | string | string |
| IC 卡读取结果 | ic_result | byte | integer |
| 驾驶员姓名 | driver_name | string | string |
| 从业资格证编码 | certificate | string | string |
| 发证机构名称 | organization | string | string |
| 证件有效期 | cert_expiry | string | string |
- 定位数据批量上传 `"msg_id": 1796` 0x0704
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:----------:|:------------------:|
| 位置数据类型 | type | byte | integer |
| 数据项个数 | length | word | integer |
| 位置汇报数据项 | location | list | list of location |
- `"msg_id": 1797` 0x0705
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------------:|:-------------:|:----------:|:----------------------:|
| 数据项个数 | length | word | integer |
| CAN 总线数据接收时间 | time | bcd(5) | integer |
| CAN 总线数据项 | can_data | list | list of can data. |
| CAN 总线通道号 | channel | 1 bit | integer |
| 帧类型 | frame_type | 1 bit | integer |
| 数据采集方式 | data_method | 1 bit | integer |
| CAN 总线 ID | id | 29 bits | integer |
| CAN 数据 | data | binary | string(base64 encoded) |
- 多媒体时间信息上传 `"msg_id": 2048` 0x0800
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:----------:|:------------------:|
| 多媒体数据 ID | id | dword | integer |
| 多媒体类型 | type | byte | integer |
| 多媒体编码格式 | format | byte | integer |
| 事件项编码 | event | byte | integer |
| 通道 ID | channel | byte | integer |
- 多媒体数据上传 `"msg_id": 2049` 0x0801
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:----------:|:----------------------:|
| 多媒体 ID | id | dword | integer |
| 多媒体类型 | type | byte | integer |
| 多媒体编码格式 | format | byte | integer |
| 事件项编码 | event | byte | integer |
| 通道 ID | channel | byte | integer |
| 位置信息汇报 | location | byte(28) | map |
| 多媒体数据包 | multimedia | binary | string(base64 encoded) |
- 多媒体数据上传应答 `"msg_id": 34816` 0x8800
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:----------:|:------------------:|
| 多媒体 ID | mm_id | dword | integer |
| 重传包总数 | length | byte | integer |
| 重传包 ID 列表 | retx_ids | list | list of retry IDs |
- 摄像头立即拍摄命令 `"msg_id": 34817` 0x8801
| Field | Json Key name | Value Type | Value Type in Json |
|:-----------------:|:-------------:|:----------:|:------------------:|
| 通道 ID | channel_id | byte | integer |
| 拍摄命令 | command | word | integer |
| 拍照间隔/录像时间 | period | word | integer |
| 保存标志 | save | byte | integer |
| 分辨率 | resolution | byte | integer |
| 图像/视频质量 | quality | byte | integer |
| 亮度 | bright | byte | integer |
| 对比度 | contrast | byte | integer |
| 饱和度 | saturate | byte | integer |
| 色度 | chromaticity | byte | integer |
- 摄像头立即拍摄应答 `"msg_id": 2053` 0x0805
| Field | Json Key name | Value Type | Value Type in Json |
|:--------------:|:-------------:|:--------------:|:------------------:|
| 应答流水号 | seq | word | integer |
| 结果 | result | byte | integer |
| 多媒体 ID 个数 | length | word | integer |
| 多媒体 ID 列表 | ids | byte(4*length) | integer |
- 存储多媒体数据检索 `"msg_id": 34818` 0x8802
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 多媒体类型 | | byte | |
| 通道 ID | | byte | |
| 事件项编码 | | byte | |
| 起始时间 | | string | |
| 结束时间 | | string | |
- 存储多媒体数据检索应答 `"msg_id": 2050` 0x0802
| Field | Json Key name | Value Type | Value Type in Json |
|:----------------:|:-------------:|:----------:|:---------------------:|
| 应答流水号 | seq | word | integer |
| 多媒体数据项总数 | length | word | integer |
| 检索项 | result | list | list of search result |
| 多媒体 ID | id | dword | integer |
| 多媒体类型 | type | byte | integer |
| 通道 ID | channel | byte | integer |
| 事件项编码 | event | byte | integer |
| 位置信息汇报 | location | byte(28) | map |
- 存储多媒体数据上传命令 `"msg_id": 34819` 0x8803
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 多媒体类型 | type | byte | integer |
| 通道 ID | channel | byte | integer |
| 事件项编码 | event | byte | integer |
| 起始时间 | start_time | string | string |
| 结束时间 | end_time | string | string |
| 删除标志 | delete | byte | integer |
- 录音开始命令 `"msg_id": 34820` 0x8804
| Field | Json Key name | Value Type | Value Type in Json |
|:----------:|:-------------:|:----------:|:------------------:|
| 录音命令 | command | byte | integer |
| 录音时间 | time | word | integer |
| 保存标志 | save | byte | integer |
| 音频采样率 | rate | byte | integer |
- 单条存储多媒体j叔叔检索上传命令 `"msg_id": 34821` 0x8805
| Field | Json Key name | Value Type | Value Type in Json |
|:---------:|:-------------:|:----------:|:------------------:|
| 多媒体 ID | id | dword | integer |
| 删除标志 | flag | byte | integer |
- 数据下行透传 `"msg_id": 35072` 0x8900
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:----------------------:|
| 透传消息类型 | type | byte | integer |
| 透传消息内容 | data | binary | string(base64 encoded) |
- 数据上行透传 `"msg_id": 2304` 0x0900
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:----------------------:|
| 透传消息类型 | type | byte | integer |
| 透传消息内容 | data | binary | string(base64 encoded) |
- 数据压缩上报 `"msg_id": 2305` 0x0901
| Field | Json Key name | Value Type | Value Type in Json |
|:------------:|:-------------:|:----------:|:----------------------:|
| 压缩消息长度 | length | dword | integer |
| 压缩消息体 | data | binary | string(base64 encoded) |
- 平台 RSA 公钥 `"msg_id": 35328` 0x8A00
| Field | Json Key name | Value Type | Value Type in Json |
|:-----:|:-------------:|:----------:|:----------------------:|
| e | e | dword | integer |
| n | n | byte(128) | string(base64 encoded) |
- 终端 RSA 公钥 `"msg_id": 2560` 0x0A00
| Field | Json Key name | Value Type | Value Type in Json |
|:-----:|:-------------:|:----------:|:----------------------:|
| e | e | dword | integer |
| n | n | byte(128) | string(base64 encoded) |
- 0x8F00 ~ 0x8FFF
- 0x0F00 ~ 0x0FFF

View File

@ -0,0 +1,195 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-ifndef(EMQX_JT808_HRL).
-define(EMQX_JT808_HRL, true).
%%--------------------------------------------------------------------
%% Message Ids
%%--------------------------------------------------------------------
%% Message Ids of client to server
-define(MC_GENERAL_RESPONSE, 16#0001).
-define(MC_HEARTBEAT, 16#0002).
-define(MC_REGISTER, 16#0100).
-define(MC_DEREGISTER, 16#0003).
-define(MC_AUTH, 16#0102).
-define(MC_QUERY_PARAM_ACK, 16#0104).
-define(MC_QUERY_ATTRIB_ACK, 16#0107).
-define(MC_OTA_ACK, 16#0108).
-define(MC_LOCATION_REPORT, 16#0200).
-define(MC_QUERY_LOCATION_ACK, 16#0201).
-define(MC_EVENT_REPORT, 16#0301).
-define(MC_QUESTION_ACK, 16#0302).
-define(MC_INFO_REQ_CANCEL, 16#0303).
-define(MC_VEHICLE_CTRL_ACK, 16#0500).
-define(MC_DRIVE_RECORD_REPORT, 16#0700).
-define(MC_WAYBILL_REPORT, 16#0701).
-define(MC_DRIVER_ID_REPORT, 16#0702).
-define(MC_BULK_LOCATION_REPORT, 16#0704).
-define(MC_CAN_BUS_REPORT, 16#0705).
-define(MC_MULTIMEDIA_EVENT_REPORT, 16#0800).
-define(MC_MULTIMEDIA_DATA_REPORT, 16#0801).
-define(MC_CAMERA_SHOT_ACK, 16#0805).
-define(MC_MM_DATA_SEARCH_ACK, 16#0802).
-define(MC_SEND_TRANSPARENT_DATA, 16#0900).
-define(MC_SEND_ZIP_DATA, 16#0901).
-define(MC_RSA_KEY, 16#0A00).
%% Message Ids of server to client
-define(MS_GENERAL_RESPONSE, 16#8001).
-define(MS_REQUEST_FRAGMENT, 16#8003).
-define(MS_REGISTER_ACK, 16#8100).
-define(MS_SET_CLIENT_PARAM, 16#8103).
-define(MS_QUERY_CLIENT_ALL_PARAM, 16#8104).
-define(MS_CLIENT_CONTROL, 16#8105).
-define(MS_QUERY_CLIENT_PARAM, 16#8106).
-define(MS_QUERY_CLIENT_ATTRIB, 16#8107).
-define(MS_OTA, 16#8108).
-define(MS_QUERY_LOCATION, 16#8201).
-define(MS_TRACE_LOCATION, 16#8202).
-define(MS_CONFIRM_ALARM, 16#8203).
-define(MS_SEND_TEXT, 16#8300).
-define(MS_SET_EVENT, 16#8301).
-define(MS_SEND_QUESTION, 16#8302).
-define(MS_SET_MENU, 16#8303).
-define(MS_INFO_CONTENT, 16#8304).
-define(MS_PHONE_CALLBACK, 16#8400).
-define(MS_SET_PHONE_NUMBER, 16#8401).
-define(MS_VEHICLE_CONTROL, 16#8500).
-define(MS_SET_CIRCLE_AREA, 16#8600).
-define(MS_DEL_CIRCLE_AREA, 16#8601).
-define(MS_SET_RECT_AREA, 16#8602).
-define(MS_DEL_RECT_AREA, 16#8603).
-define(MS_SET_POLY_AREA, 16#8604).
-define(MS_DEL_POLY_AREA, 16#8605).
-define(MS_SET_PATH, 16#8606).
-define(MS_DEL_PATH, 16#8607).
-define(MS_DRIVE_RECORD_CAPTURE, 16#8700).
-define(MS_DRIVE_REC_PARAM_SEND, 16#8701).
-define(MS_REQ_DRIVER_ID, 16#8702).
-define(MS_MULTIMEDIA_DATA_ACK, 16#8800).
-define(MS_CAMERA_SHOT, 16#8801).
-define(MS_MM_DATA_SEARCH, 16#8802).
-define(MS_MM_DATA_UPLOAD, 16#8803).
-define(MS_VOICE_RECORD, 16#8804).
-define(MS_SINGLE_MM_DATA_CTRL, 16#8805).
-define(MS_SEND_TRANSPARENT_DATA, 16#8900).
-define(MS_RSA_KEY, 16#8A00).
%% Client Params
-define(CP_HEARTBEAT_DURATION, 16#0001).
-define(CP_TCP_TIMEOUT, 16#0002).
-define(CP_TCP_RETX, 16#0003).
-define(CP_UDP_TIMEOUT, 16#0004).
-define(CP_UDP_RETX, 16#0005).
-define(CP_SMS_TIMEOUT, 16#0006).
-define(CP_SMS_RETX, 16#0007).
-define(CP_SERVER_APN, 16#0010).
-define(CP_DIAL_USERNAME, 16#0011).
-define(CP_DIAL_PASSWORD, 16#0012).
-define(CP_SERVER_ADDRESS, 16#0013).
-define(CP_BACKUP_SERVER_APN, 16#0014).
-define(CP_BACKUP_DIAL_USERNAME, 16#0015).
-define(CP_BACKUP_DIAL_PASSWORD, 16#0016).
-define(CP_BACKUP_SERVER_ADDRESS, 16#0017).
-define(CP_SERVER_TCP_PORT, 16#0018).
-define(CP_SERVER_UDP_PORT, 16#0019).
-define(CP_IC_CARD_SERVER_ADDRESS, 16#001A).
-define(CP_IC_CARD_SERVER_TCP_PORT, 16#001B).
-define(CP_IC_CARD_SERVER_UDP_PORT, 16#001C).
-define(CP_IC_CARD_BACKUP_SERVER_ADDRESS, 16#001D).
-define(CP_POS_REPORT_POLICY, 16#0020).
-define(CP_POS_REPORT_CONTROL, 16#0021).
-define(CP_DRIVER_NLOGIN_REPORT_INTERVAL, 16#0022).
-define(CP_REPORT_INTERVAL_DURING_SLEEP, 16#0027).
-define(CP_EMERGENCY_ALARM_REPORT_INTERVAL, 16#0028).
-define(CP_DEFAULT_REPORT_INTERVAL, 16#0029).
-define(CP_DEFAULT_DISTANCE_INTERVAL, 16#002C).
-define(CP_DRIVER_NLOGIN_DISTANCE_INTERVAL, 16#002D).
-define(CP_DISTANCE_INTERVAL_DURING_SLEEP, 16#002E).
-define(CP_EMERGENCY_ALARM_DISTANCE_INTERVAL, 16#002F).
-define(CP_SET_TURN_ANGLE, 16#0030).
-define(CP_EFENCE_RADIUS, 16#0031).
-define(CP_MONITOR_PHONE, 16#0040).
-define(CP_RESETING_PHONE, 16#0041).
-define(CP_RECOVERY_PHONE, 16#0042).
-define(CP_SMS_MONITOR_PHONE, 16#0043).
-define(CP_EMERGENCY_SMS_PHONE, 16#0044).
-define(CP_ACCEPT_CALL_POLICY, 16#0045).
-define(CP_MAX_CALL_DURATION, 16#0046).
-define(CP_MAX_CALL_DURATION_OF_MONTH, 16#0047).
-define(CP_SPY_PHONE, 16#0048).
-define(CP_PRIVILEGE_SMS_PHONE, 16#0049).
-define(CP_ALARM_MASK, 16#0050).
-define(CP_ALARM_SEND_SMS_MASK, 16#0051).
-define(CP_ALARM_CAMERA_SHOT_MASK, 16#0052).
-define(CP_ALARM_PICTURE_SAVE_MASK, 16#0053).
-define(CP_ALARM_KEY_MASK, 16#0054).
-define(CP_MAX_SPEED, 16#0055).
-define(CP_OVERSPEED_ELAPSED, 16#0056).
-define(CP_CONT_DRIVE_THRESHOLD, 16#0057).
-define(CP_ACC_DRIVE_TIME_ONE_DAY_THRESHOLD, 16#0058).
-define(CP_MIN_BREAK_TIME, 16#0059).
-define(CP_MAX_PARK_TIME, 16#005A).
-define(CP_OVERSPEED_ALARM_DELTA, 16#005B).
-define(CP_DRIVER_FATIGUE_ALARM_DELTA, 16#005C).
-define(CP_SET_CRASH_ALARM_PARAM, 16#005D).
-define(CP_SET_ROLLOVER_PARAM, 16#005E).
-define(CP_TIME_CONTROLED_CAMERA, 16#0064).
-define(CP_DISTANCE_CONTROLED_CAMERA, 16#0065).
-define(CP_PICTURE_QUALITY, 16#0070).
-define(CP_PICTURE_BRIGHTNESS, 16#0071).
-define(CP_PICTURE_CONTRAST, 16#0072).
-define(CP_PICTURE_SATURATE, 16#0073).
-define(CP_PICTURE_CHROMATICITY, 16#0074).
-define(CP_ODOMETER, 16#0080).
-define(CP_REGISTERED_PROVINCE, 16#0081).
-define(CP_REGISTERED_CITY, 16#0082).
-define(CP_VEHICLE_LICENSE_NUMBER, 16#0083).
-define(CP_VEHICLE_LICENSE_PLATE_COLOR, 16#0084).
-define(CP_GNSS_MODE, 16#0090).
-define(CP_GNSS_BAUDRATE, 16#0091).
-define(CP_GNSS_OUTPUT_RATE, 16#0092).
-define(CP_GNSS_SAMPLING_RATE, 16#0093).
-define(CP_GNSS_UPLOAD_MODE, 16#0094).
-define(CP_GNSS_UPLOAD_UNIT, 16#0095).
-define(CP_CAN_BUS_CH1_SAMPLING, 16#0100).
-define(CP_CAN_BUS_CH1_UPLOAD, 16#0101).
-define(CP_CAN_BUS_CH2_SAMPLING, 16#0102).
-define(CP_CAN_BUS_CH2_UPLOAD, 16#0103).
-define(CP_SET_CAN_BUS_ID_PARAM, 16#0110).
%% Extra info types in Position Report
-define(CP_POS_EXTRA_MILEAGE, 16#01).
-define(CP_POS_EXTRA_FUEL_METER, 16#02).
-define(CP_POS_EXTRA_SPEED, 16#03).
-define(CP_POS_EXTRA_ALARM_ID, 16#04).
-define(CP_POS_EXTRA_OVERSPEED_ALARM, 16#11).
-define(CP_POS_EXTRA_IN_OUT_ALARM, 16#12).
-define(CP_POS_EXTRA_PATH_TIME_ALARM, 16#13).
-define(CP_POS_EXTRA_EXPANDED_SIGNAL, 16#25).
-define(CP_POS_EXTRA_IO_STATUS, 16#2A).
-define(CP_POS_EXTRA_ANALOG, 16#2B).
-define(CP_POS_EXTRA_RSSI, 16#30).
-define(CP_POS_EXTRA_GNSS_SAT_NUM, 16#31).
-define(CP_POS_EXTRA_CUSTOME, 16#E0).
%% Default Configs
-define(DEFAULT_MOUNTPOINT, <<"jt808/${clientid}/">>).
-define(DEFAULT_UP_TOPIC, <<?DEFAULT_MOUNTPOINT/binary, "${phone}/up">>).
-define(DEFAULT_DN_TOPIC, <<?DEFAULT_MOUNTPOINT/binary, "${phone}/dn">>).
%% Supported placeholders
-define(PH_CLIENTID, <<"${clientid}">>).
-define(PH_PHONE, <<"${phone}">>).
-record(auth, {
allow_anonymous :: boolean(),
registry :: emqx_schema:url() | undefined,
authentication :: emqx_schema:url() | undefined
}).
-type auth() :: #auth{}.
-endif.

View File

@ -0,0 +1,7 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [
{emqx, {path, "../../apps/emqx"}},
{emqx_utils, {path, "../emqx_utils"}},
{emqx_gateway, {path, "../../apps/emqx_gateway"}}
]}.

View File

@ -0,0 +1,11 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_jt808, [
{description, "JT/T 808 Gateway"},
{vsn, "0.0.1"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []},
{modules, []},
{licenses, ["BSL"]},
{links, []}
]}.

View File

@ -0,0 +1,99 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% @doc The JT/T 808 Gateway implement
-module(emqx_gateway_jt808).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/include/emqx_gateway.hrl").
%% define a gateway named jt808
-gateway(#{
name => jt808,
callback_module => ?MODULE,
config_schema_module => emqx_jt808_schema,
edition => ee
}).
%% callback_module must implement the emqx_gateway_impl behaviour
-behaviour(emqx_gateway_impl).
%% callback for emqx_gateway_impl
-export([
on_gateway_load/2,
on_gateway_update/3,
on_gateway_unload/2
]).
-import(
emqx_gateway_utils,
[
normalize_config/1,
start_listeners/4,
stop_listeners/2
]
).
%%--------------------------------------------------------------------
%% emqx_gateway_impl callbacks
%%--------------------------------------------------------------------
on_gateway_load(
_Gateway = #{
name := GwName,
config := Config
},
Ctx
) ->
Listeners = normalize_config(Config),
ModCfg = #{
frame_mod => emqx_jt808_frame,
chann_mod => emqx_jt808_channel
},
case
start_listeners(
Listeners, GwName, Ctx, ModCfg
)
of
{ok, ListenerPids} ->
%% FIXME: How to throw an exception to interrupt the restart logic ?
%% FIXME: Assign ctx to GwState
{ok, ListenerPids, _GwState = #{ctx => Ctx}};
{error, {Reason, Listener}} ->
throw(
{badconf, #{
key => listeners,
value => Listener,
reason => Reason
}}
)
end.
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
GwName = maps:get(name, Gateway),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old state???
on_gateway_unload(Gateway, GwState),
on_gateway_load(Gateway#{config => Config}, Ctx)
catch
Class:Reason:Stk ->
logger:error(
"Failed to update ~ts; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[GwName, Class, Reason, Stk]
),
{error, Reason}
end.
on_gateway_unload(
_Gateway = #{
name := GwName,
config := Config
},
_GwState
) ->
Listeners = normalize_config(Config),
stop_listeners(GwName, Listeners).

View File

@ -0,0 +1,101 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_jt808_auth).
-include("emqx_jt808.hrl").
-export([
init/1,
register/2,
authenticate/2
]).
-export_type([auth/0]).
init(#{allow_anonymous := true}) ->
#auth{registry = undefined, authentication = undefined, allow_anonymous = true};
init(#{registry := Reg, authentication := Auth, allow_anonymous := Anonymous}) ->
#auth{registry = Reg, authentication = Auth, allow_anonymous = Anonymous}.
register(_RegFrame, #auth{registry = undefined, allow_anonymous = true}) ->
{ok, anonymous};
register(_RegFrame, #auth{registry = undefined, allow_anonymous = false}) ->
{error, registry_server_not_existed};
register(RegFrame, #auth{registry = RegUrl}) ->
#{
<<"header">> := #{<<"phone">> := Phone},
<<"body">> := FBody
} = RegFrame,
Params = maps:merge(FBody, #{<<"phone">> => Phone}),
case request(RegUrl, Params) of
{ok, 200, Body} ->
case emqx_utils_json:safe_decode(Body, [return_maps]) of
{ok, #{<<"code">> := 0, <<"authcode">> := Authcode}} ->
{ok, Authcode};
{ok, #{<<"code">> := Code}} ->
{error, Code};
_ ->
{error, {invailed_resp, Body}}
end;
{ok, Code, Body} ->
{error, {unknown_resp, Code, Body}};
{error, Reason} ->
{error, Reason}
end.
authenticate(_AuthFrame, #auth{authentication = undefined, allow_anonymous = true}) ->
{ok, #{auth_result => true, anonymous => true}};
authenticate(_AuthFrame, #auth{authentication = undefined, allow_anonymous = false}) ->
{ok, #{auth_result => false, anonymous => false}};
authenticate(AuthFrame, #auth{authentication = AuthUrl}) ->
#{
<<"header">> := #{<<"phone">> := Phone},
<<"body">> := #{<<"code">> := AuthCode}
} = AuthFrame,
case request(AuthUrl, #{<<"code">> => AuthCode, <<"phone">> => Phone}) of
{ok, 200, _} ->
{ok, #{auth_result => true, anonymous => false}};
{ok, _, _} ->
{ok, #{auth_result => false, anonymous => false}};
{error, Reason} ->
{error, Reason}
end.
%%--------------------------------------------------------------------
%% Inernal functions
%%--------------------------------------------------------------------
request(Url, Params) ->
RetryOpts = #{times => 3, interval => 1000, backoff => 2.0},
Req = {Url, [], "application/json", emqx_utils_json:encode(Params)},
reply(request_(post, Req, [{autoredirect, true}], [{body_format, binary}], RetryOpts)).
request_(
Method,
Req,
HTTPOpts,
Opts,
RetryOpts = #{
times := Times,
interval := Interval,
backoff := BackOff
}
) ->
case httpc:request(Method, Req, HTTPOpts, Opts) of
{error, _Reason} when Times > 0 ->
timer:sleep(trunc(Interval)),
RetryOpts1 = RetryOpts#{
times := Times - 1,
interval := Interval * BackOff
},
request_(Method, Req, HTTPOpts, Opts, RetryOpts1);
Other ->
Other
end.
reply({ok, {{_, Code, _}, _Headers, Body}}) ->
{ok, Code, Body};
reply({error, Error}) ->
{error, Error}.

View File

@ -0,0 +1,951 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_jt808_channel).
-behaviour(emqx_gateway_channel).
-include("emqx_jt808.hrl").
-include_lib("emqx/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
%% behaviour callbacks
-export([
info/1,
info/2,
stats/1
]).
-export([
init/2,
handle_in/2,
handle_deliver/2,
handle_timeout/3,
handle_call/3,
handle_cast/2,
handle_info/2
]).
-export([
terminate/2
]).
-record(channel, {
%% Context
ctx :: emqx_gateway_ctx:context(),
%% ConnInfo
conninfo :: emqx_types:conninfo(),
%% ClientInfo
clientinfo :: emqx_types:clientinfo(),
%% Session
session :: undefined | map(),
%% Conn State
conn_state :: conn_state(),
%% Timers
timers :: #{atom() => undefined | disabled | reference()},
%% AuthCode
authcode :: undefined | anonymous | binary(),
%% Keepalive
keepalive,
%% Msg SN
msg_sn,
%% Down Topic
dn_topic,
%% Up Topic
up_topic,
%% Auth
auth :: emqx_jt808_auth:auth(),
%% Inflight
inflight :: emqx_inflight:inflight(),
mqueue :: queue:queue(),
max_mqueue_len,
rsa_key,
retx_interval,
retx_max_times
}).
-type conn_state() :: idle | connecting | connected | disconnected.
-type channel() :: #channel{}.
-type reply() ::
{outgoing, emqx_types:packet()}
| {outgoing, [emqx_types:packet()]}
| {event, conn_state() | updated}
| {close, Reason :: atom()}.
-type replies() :: reply() | [reply()].
-define(TIMER_TABLE, #{
alive_timer => keepalive,
retry_timer => retry_delivery
}).
-define(INFO_KEYS, [ctx, conninfo, zone, clientid, clientinfo, session, conn_state, authcode]).
-define(RETX_INTERVAL, 8000).
-define(RETX_MAX_TIME, 5).
-define(DEFAULT_KEEPALIVE, 300).
-define(MSG(MsgId), #{<<"header">> := #{<<"msg_id">> := MsgId}}).
-dialyzer({nowarn_function, init/2}).
%%--------------------------------------------------------------------
%% Info, Attrs and Caps
%%--------------------------------------------------------------------
%% @doc Get infos of the channel.
-spec info(channel()) -> emqx_types:infos().
info(Channel) ->
maps:from_list(info(?INFO_KEYS, Channel)).
-spec info(list(atom()) | atom(), channel()) -> term().
info(Keys, Channel) when is_list(Keys) ->
[{Key, info(Key, Channel)} || Key <- Keys];
info(ctx, #channel{ctx = Ctx}) ->
Ctx;
info(conninfo, #channel{conninfo = ConnInfo}) ->
ConnInfo;
info(zone, #channel{clientinfo = #{zone := Zone}}) ->
Zone;
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
ClientId;
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo;
info(session, _) ->
#{};
info(conn_state, #channel{conn_state = ConnState}) ->
ConnState;
info(authcode, #channel{authcode = AuthCode}) ->
AuthCode.
stats(_Channel) ->
[].
%%--------------------------------------------------------------------
%% Init the Channel
%%--------------------------------------------------------------------
-spec init(emqx_types:conninfo(), map()) -> channel().
init(
ConnInfo = #{
peername := {PeerHost, _Port},
sockname := {_Host, SockPort}
},
Options = #{
ctx := Ctx,
message_queue_len := MessageQueueLen,
proto := ProtoConf
}
) ->
% TODO: init rsa_key from user input
Peercert = maps:get(peercert, ConnInfo, undefined),
Mountpoint = maps:get(mountpoint, Options, ?DEFAULT_MOUNTPOINT),
ListenerId =
case maps:get(listener, Options, undefined) of
undefined -> undefined;
{GwName, Type, LisName} -> emqx_gateway_utils:listener_id(GwName, Type, LisName)
end,
ClientInfo = setting_peercert_infos(
Peercert,
#{
zone => default,
listener => ListenerId,
protocol => jt808,
peerhost => PeerHost,
sockport => SockPort,
clientid => undefined,
username => undefined,
is_bridge => false,
is_superuser => false,
mountpoint => Mountpoint
}
),
#channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo,
session = undefined,
conn_state = idle,
timers = #{},
authcode = undefined,
keepalive = maps:get(keepalive, Options, ?DEFAULT_KEEPALIVE),
msg_sn = 0,
% TODO: init rsa_key from user input
dn_topic = maps:get(dn_topic, ProtoConf, ?DEFAULT_DN_TOPIC),
up_topic = maps:get(up_topic, ProtoConf, ?DEFAULT_UP_TOPIC),
auth = emqx_jt808_auth:init(ProtoConf),
inflight = emqx_inflight:new(128),
mqueue = queue:new(),
max_mqueue_len = MessageQueueLen,
rsa_key = [0, <<0:1024>>],
retx_interval = maps:get(retry_interval, Options, ?RETX_INTERVAL),
retx_max_times = maps:get(max_retry_times, Options, ?RETX_MAX_TIME)
}.
setting_peercert_infos(NoSSL, ClientInfo) when
NoSSL =:= nossl;
NoSSL =:= undefined
->
ClientInfo;
setting_peercert_infos(Peercert, ClientInfo) ->
DN = esockd_peercert:subject(Peercert),
CN = esockd_peercert:common_name(Peercert),
ClientInfo#{dn => DN, cn => CN}.
%%--------------------------------------------------------------------
%% Handle incoming packet
%%--------------------------------------------------------------------
-spec handle_in(emqx_jt808_frame:frame() | {frame_error, any()}, channel()) ->
{ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}
| {shutdown, Reason :: term(), replies(), channel()}.
handle_in(Frame = ?MSG(MType), Channel = #channel{conn_state = ConnState}) when
ConnState /= connected, MType =:= ?MC_REGISTER;
ConnState /= connected, MType =:= ?MC_AUTH
->
?SLOG(debug, #{msg => "recv_frame", frame => Frame}),
do_handle_in(Frame, Channel#channel{conn_state = connecting});
handle_in(Frame, Channel = #channel{conn_state = connected}) ->
?SLOG(debug, #{msg => "recv_frame", frame => Frame}),
do_handle_in(Frame, Channel);
handle_in(Frame, Channel) ->
?SLOG(error, #{msg => "unexpected_frame", frame => Frame}),
{stop, unexpected_frame, Channel}.
%% @private
do_handle_in(Frame = ?MSG(?MC_GENERAL_RESPONSE), Channel = #channel{inflight = Inflight}) ->
#{<<"body">> := #{<<"seq">> := Seq, <<"id">> := Id}} = Frame,
NewInflight = ack_msg(?MC_GENERAL_RESPONSE, {Id, Seq}, Inflight),
{ok, Channel#channel{inflight = NewInflight}};
do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) ->
#{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame,
case emqx_jt808_auth:register(Frame, Channel0#channel.auth) of
{ok, Authcode} ->
Channel = enrich_clientinfo(
Frame, enrich_conninfo(Frame, Channel0#channel{authcode = Authcode})
),
handle_out({?MS_REGISTER_ACK, 0}, MsgSn, Channel);
{error, Reason} ->
?SLOG(error, #{msg => "register_failed", reason => Reason}),
ResCode =
case is_integer(Reason) of
true -> Reason;
false -> 1
end,
handle_out({?MS_REGISTER_ACK, ResCode}, MsgSn, Channel0)
end;
do_handle_in(Frame = ?MSG(?MC_AUTH), Channel0) ->
#{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame,
Channel =
#channel{clientinfo = #{clientid := ClientId}} =
enrich_clientinfo(Frame, enrich_conninfo(Frame, Channel0)),
authack(
case authenticate(Frame, Channel0) of
true ->
NChannel = prepare_adapter_topic(ensure_connected(Channel)),
emqx_logger:set_metadata_clientid(ClientId),
%% Auto subscribe downlink topics
autosubcribe(NChannel),
_ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel),
%% 0: Successful
{0, MsgSn, NChannel};
false ->
?SLOG(error, #{msg => "authenticated_failed"}),
%% 1: Failure
{1, MsgSn, Channel}
end
);
do_handle_in(Frame = ?MSG(?MC_HEARTBEAT), Channel) ->
handle_out({?MS_GENERAL_RESPONSE, 0, ?MC_HEARTBEAT}, msgsn(Frame), Channel);
do_handle_in(?MSG(?MC_RSA_KEY), Channel = #channel{rsa_key = [E, N]}) ->
Response = #{
<<"header">> => build_frame_header(?MS_RSA_KEY, Channel),
<<"body">> => #{<<"e">> => E, <<"n">> => N}
},
% TODO: how to use client's RSA key?
{ok, [{outgoing, Response}], state_inc_sn(Channel)};
do_handle_in(?MSG(?MC_MULTIMEDIA_DATA_REPORT), Channel = #channel{rsa_key = [_E, _N]}) ->
Response = #{
<<"header">> => build_frame_header(?MS_MULTIMEDIA_DATA_ACK, Channel),
<<"body">> => #{}
},
% TODO: how to fill ?
{ok, [{outgoing, Response}], state_inc_sn(Channel)};
do_handle_in(
Frame = ?MSG(?MC_DRIVER_ID_REPORT),
Channel = #channel{
up_topic = Topic,
inflight = Inflight
}
) ->
{MsgId, MsgSn} = msgidsn(Frame),
_ = do_publish(Topic, Frame),
case is_driver_id_req_exist(Channel) of
% this is an device passive command
false ->
handle_out({?MS_GENERAL_RESPONSE, 0, MsgId}, MsgSn, Channel);
% this is a response to MS_REQ_DRIVER_ID(0x8702)
true ->
{ok, Channel#channel{inflight = ack_msg(?MC_DRIVER_ID_REPORT, none, Inflight)}}
end;
do_handle_in(?MSG(?MC_DEREGISTER), Channel) ->
{stop, normal, Channel};
do_handle_in(Frame = #{}, Channel = #channel{up_topic = Topic, inflight = Inflight}) ->
{MsgId, MsgSn} = msgidsn(Frame),
_ = do_publish(Topic, Frame),
case is_general_response_needed(MsgId) of
% these frames device passive request
true ->
handle_out({?MS_GENERAL_RESPONSE, 0, MsgId}, MsgSn, Channel);
% these frames are response to server's request
false ->
{ok, Channel#channel{inflight = ack_msg(MsgId, seq(Frame), Inflight)}}
end;
do_handle_in(Frame, Channel) ->
?SLOG(error, #{msg => "ignore_unknown_frame", frame => Frame}),
{ok, Channel}.
do_publish(Topic, Frame) ->
?SLOG(debug, #{msg => "publish_msg", to_topic => Topic, farme => Frame}),
emqx:publish(emqx_message:make(jt808, ?QOS_1, Topic, emqx_utils_json:encode(Frame))).
%%--------------------------------------------------------------------
%% Handle Delivers from broker to client
%%--------------------------------------------------------------------
-spec handle_deliver(list(emqx_types:deliver()), channel()) ->
{ok, channel()}
| {ok, replies(), channel()}.
handle_deliver(
Messages0,
Channel = #channel{
clientinfo = #{mountpoint := Mountpoint},
mqueue = Queue,
max_mqueue_len = MaxQueueLen
}
) ->
Messages = lists:map(
fun({deliver, _, M}) ->
emqx_mountpoint:unmount(Mountpoint, M)
end,
Messages0
),
case MaxQueueLen - queue:len(Queue) of
N when N =< 0 ->
discard_downlink_messages(Messages, Channel),
{ok, Channel};
N ->
{NMessages, Dropped} = split_by_pos(Messages, N),
log(debug, #{msg => "enqueue_messages", messages => NMessages}, Channel),
metrics_inc('messages.delivered', Channel, erlang:length(NMessages)),
discard_downlink_messages(Dropped, Channel),
Frames = msgs2frame(NMessages, Channel),
NQueue = lists:foldl(fun(F, Q) -> queue:in(F, Q) end, Queue, Frames),
{Outgoings, NChannel} = dispatch_frame(Channel#channel{mqueue = NQueue}),
{ok, [{outgoing, Outgoings}], NChannel}
end.
split_by_pos(L, Pos) ->
split_by_pos(L, Pos, []).
split_by_pos([], _, A1) ->
{lists:reverse(A1), []};
split_by_pos(L, 0, A1) ->
{lists:reverse(A1), L};
split_by_pos([E | L], N, A1) ->
split_by_pos(L, N - 1, [E | A1]).
msgs2frame(Messages, Channel) ->
lists:filtermap(
fun(#message{payload = Payload}) ->
case emqx_utils_json:safe_decode(Payload, [return_maps]) of
{ok, Map} ->
MsgId = msgid(Map),
NewHeader = build_frame_header(MsgId, Channel),
Frame = maps:put(<<"header">>, NewHeader, Map),
{true, Frame};
{error, Reason} ->
log(error, #{msg => "json_decode_error", reason => Reason}, Channel),
false
end
end,
Messages
).
authack(
{Code, MsgSn,
Channel = #channel{
conninfo = ConnInfo,
clientinfo = ClientInfo
}}
) ->
Code == 0 andalso emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]),
handle_out({?MS_GENERAL_RESPONSE, Code, ?MC_AUTH}, MsgSn, Channel).
handle_out({?MS_GENERAL_RESPONSE, Result, InMsgId}, MsgSn, Channel) ->
Frame = #{
<<"header">> => build_frame_header(?MS_GENERAL_RESPONSE, Channel),
<<"body">> => #{<<"seq">> => MsgSn, <<"result">> => Result, <<"id">> => InMsgId}
},
{ok, [{outgoing, Frame}], state_inc_sn(Channel)};
handle_out({?MS_REGISTER_ACK, 0}, MsgSn, Channel = #channel{authcode = Authcode0}) ->
Authcode =
case Authcode0 == anonymous of
true -> <<>>;
false -> Authcode0
end,
Frame = #{
<<"header">> => build_frame_header(?MS_REGISTER_ACK, Channel),
<<"body">> => #{<<"seq">> => MsgSn, <<"result">> => 0, <<"auth_code">> => Authcode}
},
{ok, [{outgoing, Frame}], state_inc_sn(Channel)};
handle_out({?MS_REGISTER_ACK, ResCode}, MsgSn, Channel) ->
Frame = #{
<<"header">> => build_frame_header(?MS_REGISTER_ACK, Channel),
<<"body">> => #{<<"seq">> => MsgSn, <<"result">> => ResCode}
},
{ok, [{outgoing, Frame}], state_inc_sn(Channel)}.
%%--------------------------------------------------------------------
%% Handle call
%%--------------------------------------------------------------------
-spec handle_call(Req :: term(), From :: term(), channel()) ->
{reply, Reply :: term(), channel()}
| {reply, Reply :: term(), replies(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), emqx_jt808_frame:frame(), channel()}.
handle_call(kick, _From, Channel) ->
Channel1 = ensure_disconnected(kicked, Channel),
disconnect_and_shutdown(kicked, ok, Channel1);
handle_call(discard, _From, Channel) ->
disconnect_and_shutdown(discarded, ok, Channel);
handle_call(Req, _From, Channel) ->
log(error, #{msg => "unexpected_call", call => Req}, Channel),
reply(ignored, Channel).
%%--------------------------------------------------------------------
%% Handle cast
%%--------------------------------------------------------------------
-spec handle_cast(Req :: term(), channel()) ->
ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
handle_cast(_Req, Channel) ->
{ok, Channel}.
%%--------------------------------------------------------------------
%% Handle info
%%--------------------------------------------------------------------
-spec handle_info(Info :: term(), channel()) ->
ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
shutdown(Reason, Channel);
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
shutdown(Reason, Channel);
handle_info(
{sock_closed, Reason},
Channel =
#channel{
conn_state = connected
}
) ->
NChannel = ensure_disconnected(Reason, Channel),
shutdown(Reason, NChannel);
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
log(error, #{msg => "unexpected_sock_closed", reason => Reason}, Channel),
{ok, Channel};
handle_info(Info, Channel) ->
log(error, #{msg => "unexpected_info", info => Info}, Channel),
{ok, Channel}.
%%--------------------------------------------------------------------
%% Handle timeout
%%--------------------------------------------------------------------
-spec handle_timeout(reference(), Msg :: term(), channel()) ->
{ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}.
handle_timeout(
_TRef,
{keepalive, _StatVal},
Channel = #channel{keepalive = undefined}
) ->
{ok, Channel};
handle_timeout(
_TRef,
{keepalive, _StatVal},
Channel = #channel{conn_state = disconnected}
) ->
{ok, Channel};
handle_timeout(
_TRef,
{keepalive, StatVal},
Channel = #channel{keepalive = Keepalive}
) ->
case emqx_keepalive:check(StatVal, Keepalive) of
{ok, NKeepalive} ->
NChannel = Channel#channel{keepalive = NKeepalive},
{ok, reset_timer(alive_timer, NChannel)};
{error, timeout} ->
shutdown(keepalive_timeout, Channel)
end;
handle_timeout(
_TRef, retry_delivery, Channel = #channel{inflight = Inflight, retx_interval = RetxInterval}
) ->
case emqx_inflight:is_empty(Inflight) of
true ->
{ok, clean_timer(retry_timer, Channel)};
false ->
Frames = lists:sort(sortfun(), emqx_inflight:to_list(Inflight)),
{Outgoings, NInflight} = retry_delivery(
Frames, erlang:system_time(millisecond), RetxInterval, Inflight, []
),
{Outgoings2, NChannel} = dispatch_frame(Channel#channel{inflight = NInflight}),
{ok, [{outgoing, Outgoings ++ Outgoings2}], reset_timer(retry_timer, NChannel)}
end.
sortfun() ->
fun({_, {_, _, Ts1}}, {_, {_, _, Ts2}}) -> Ts1 < Ts2 end.
retry_delivery([], _Now, _Interval, Inflight, Acc) ->
{lists:reverse(Acc), Inflight};
retry_delivery([{Key, {_Frame, 0, _}} | Frames], Now, Interval, Inflight, Acc) ->
%% todo log(error, "has arrived max re-send times, drop ~p", [Frame]),
NInflight = emqx_inflight:delete(Key, Inflight),
retry_delivery(Frames, Now, Interval, NInflight, Acc);
retry_delivery([{Key, {Frame, RetxCount, Ts}} | Frames], Now, Interval, Inflight, Acc) ->
Diff = Now - Ts,
case Diff >= Interval of
true ->
NInflight = emqx_inflight:update(Key, {Frame, RetxCount - 1, Now}, Inflight),
retry_delivery(Frames, Now, Interval, NInflight, [Frame | Acc]);
_ ->
retry_delivery(Frames, Now, Interval, Inflight, Acc)
end.
dispatch_frame(
Channel = #channel{
msg_sn = TxMsgSn,
mqueue = Queue,
inflight = Inflight,
retx_max_times = RetxMax
}
) ->
case emqx_inflight:is_full(Inflight) orelse queue:is_empty(Queue) of
true ->
{[], Channel};
false ->
{{value, Frame}, NewQueue} = queue:out(Queue),
log(debug, #{msg => "delivery", frame => Frame}, Channel),
NewInflight = emqx_inflight:insert(
set_msg_ack(msgid(Frame), TxMsgSn),
{Frame, RetxMax, erlang:system_time(millisecond)},
Inflight
),
NChannel = Channel#channel{mqueue = NewQueue, inflight = NewInflight},
{[Frame], ensure_timer(retry_timer, NChannel)}
end.
%%--------------------------------------------------------------------
%% Ensure timers
%%--------------------------------------------------------------------
ensure_timer(Name, Channel = #channel{timers = Timers}) ->
TRef = maps:get(Name, Timers, undefined),
Time = interval(Name, Channel),
case TRef == undefined andalso Time > 0 of
true -> ensure_timer(Name, Time, Channel);
%% Timer disabled or exists
false -> Channel
end.
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
log(debug, #{msg => "start_timer", name => Name, time => Time}, Channel),
Msg = maps:get(Name, ?TIMER_TABLE),
TRef = emqx_utils:start_timer(Time, Msg),
Channel#channel{timers = Timers#{Name => TRef}}.
reset_timer(Name, Channel) ->
ensure_timer(Name, clean_timer(Name, Channel)).
clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive);
interval(retry_timer, #channel{retx_interval = RetxIntv}) ->
RetxIntv.
%%--------------------------------------------------------------------
%% Terminate
%%--------------------------------------------------------------------
terminate(_Reason, #channel{clientinfo = #{clientid := undefined}}) ->
ok;
terminate(_Reason, #channel{conn_state = disconnected}) ->
ok;
terminate(Reason, #channel{clientinfo = ClientInfo, conninfo = ConnInfo}) ->
?SLOG(info, #{msg => "connection_shutdown", reason => Reason}),
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, NConnInfo]).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
maybe_fix_mountpoint(ClientInfo = #{mountpoint := undefined}) ->
ClientInfo;
maybe_fix_mountpoint(ClientInfo = #{mountpoint := Mountpoint}) ->
%% TODO: Enrich the variable replacement????
%% i.e: ${ClientInfo.auth_result.productKey}
Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
ClientInfo#{mountpoint := Mountpoint1}.
ensure_connected(
Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo
}
) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{
conninfo = NConnInfo,
conn_state = connected
}.
%% Ensure disconnected
ensure_disconnected(
Reason,
Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo
}
) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
ok = run_hooks(
Ctx,
'client.disconnected',
[ClientInfo, Reason, NConnInfo]
),
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
ack_msg(MsgId, KeyParam, Inflight) ->
Key = get_msg_ack(MsgId, KeyParam),
case emqx_inflight:contain(Key, Inflight) of
true -> emqx_inflight:delete(Key, Inflight);
false -> Inflight
end.
set_msg_ack(?MS_SET_CLIENT_PARAM, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_CLIENT_PARAM, MsgSn}};
set_msg_ack(?MS_QUERY_CLIENT_ALL_PARAM, MsgSn) ->
{?MC_QUERY_PARAM_ACK, MsgSn};
set_msg_ack(?MS_QUERY_CLIENT_PARAM, MsgSn) ->
{?MC_QUERY_PARAM_ACK, MsgSn};
set_msg_ack(?MS_CLIENT_CONTROL, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_CLIENT_CONTROL, MsgSn}};
set_msg_ack(?MS_QUERY_CLIENT_ATTRIB, _MsgSn) ->
{?MC_QUERY_ATTRIB_ACK, none};
set_msg_ack(?MS_OTA, _MsgSn) ->
{?MC_OTA_ACK, none};
set_msg_ack(?MS_QUERY_LOCATION, MsgSn) ->
{?MC_QUERY_LOCATION_ACK, MsgSn};
set_msg_ack(?MS_TRACE_LOCATION, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_TRACE_LOCATION, MsgSn}};
set_msg_ack(?MS_CONFIRM_ALARM, _MsgSn) ->
% TODO: how to ack this message?
{};
set_msg_ack(?MS_SEND_TEXT, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SEND_TEXT, MsgSn}};
set_msg_ack(?MS_SET_EVENT, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_EVENT, MsgSn}};
set_msg_ack(?MS_SEND_QUESTION, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SEND_QUESTION, MsgSn}};
set_msg_ack(?MS_SET_MENU, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_MENU, MsgSn}};
set_msg_ack(?MS_INFO_CONTENT, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_INFO_CONTENT, MsgSn}};
set_msg_ack(?MS_PHONE_CALLBACK, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_PHONE_CALLBACK, MsgSn}};
set_msg_ack(?MS_SET_PHONE_NUMBER, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_PHONE_NUMBER, MsgSn}};
set_msg_ack(?MS_VEHICLE_CONTROL, MsgSn) ->
{?MC_VEHICLE_CTRL_ACK, MsgSn};
set_msg_ack(?MS_SET_CIRCLE_AREA, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_CIRCLE_AREA, MsgSn}};
set_msg_ack(?MS_DEL_CIRCLE_AREA, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_DEL_CIRCLE_AREA, MsgSn}};
set_msg_ack(?MS_SET_RECT_AREA, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_RECT_AREA, MsgSn}};
set_msg_ack(?MS_DEL_RECT_AREA, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_DEL_RECT_AREA, MsgSn}};
set_msg_ack(?MS_SET_POLY_AREA, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_POLY_AREA, MsgSn}};
set_msg_ack(?MS_DEL_POLY_AREA, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_DEL_POLY_AREA, MsgSn}};
set_msg_ack(?MS_SET_PATH, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_PATH, MsgSn}};
set_msg_ack(?MS_DEL_PATH, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_DEL_PATH, MsgSn}};
set_msg_ack(?MS_DRIVE_RECORD_CAPTURE, MsgSn) ->
{?MC_DRIVE_RECORD_REPORT, MsgSn};
set_msg_ack(?MS_DRIVE_REC_PARAM_SEND, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_DRIVE_REC_PARAM_SEND, MsgSn}};
set_msg_ack(?MS_REQ_DRIVER_ID, _MsgSn) ->
{?MC_DRIVER_ID_REPORT, none};
set_msg_ack(?MS_CAMERA_SHOT, MsgSn) ->
% TODO: spec has two conflicted statement about this ack
% section 7.9.3 requires general ack
% section 8.55 requires 0x0805
{?MC_CAMERA_SHOT_ACK, MsgSn};
set_msg_ack(?MS_MM_DATA_SEARCH, MsgSn) ->
{?MC_MM_DATA_SEARCH_ACK, MsgSn};
set_msg_ack(?MS_MM_DATA_UPLOAD, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_MM_DATA_UPLOAD, MsgSn}};
set_msg_ack(?MS_VOICE_RECORD, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_VOICE_RECORD, MsgSn}};
set_msg_ack(?MS_SINGLE_MM_DATA_CTRL, MsgSn) ->
% TODO: right?
{?MC_MM_DATA_SEARCH_ACK, MsgSn};
set_msg_ack(?MS_SEND_TRANSPARENT_DATA, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SEND_TRANSPARENT_DATA, MsgSn}};
set_msg_ack(MsgId, Param) ->
error({invalid_message_type, MsgId, Param}).
get_msg_ack(?MC_GENERAL_RESPONSE, {MsgId, MsgSn}) ->
{?MC_GENERAL_RESPONSE, {MsgId, MsgSn}};
get_msg_ack(?MC_QUERY_PARAM_ACK, MsgSn) ->
{?MC_QUERY_PARAM_ACK, MsgSn};
get_msg_ack(?MC_QUERY_ATTRIB_ACK, _MsgSn) ->
{?MC_QUERY_ATTRIB_ACK, none};
get_msg_ack(?MC_OTA_ACK, _MsgSn) ->
{?MC_OTA_ACK, none};
get_msg_ack(?MC_QUERY_LOCATION_ACK, MsgSn) ->
{?MC_QUERY_LOCATION_ACK, MsgSn};
get_msg_ack(?MC_QUESTION_ACK, MsgSn) ->
{?MC_QUESTION_ACK, MsgSn};
get_msg_ack(?MC_VEHICLE_CTRL_ACK, MsgSn) ->
{?MC_VEHICLE_CTRL_ACK, MsgSn};
get_msg_ack(?MC_DRIVE_RECORD_REPORT, MsgSn) ->
{?MC_DRIVE_RECORD_REPORT, MsgSn};
get_msg_ack(?MC_CAMERA_SHOT_ACK, MsgSn) ->
{?MC_CAMERA_SHOT_ACK, MsgSn};
get_msg_ack(?MC_MM_DATA_SEARCH_ACK, MsgSn) ->
{?MC_MM_DATA_SEARCH_ACK, MsgSn};
get_msg_ack(?MC_DRIVER_ID_REPORT, _MsgSn) ->
{?MC_DRIVER_ID_REPORT, none};
get_msg_ack(MsgId, MsgSn) ->
error({invalid_message_type, MsgId, MsgSn}).
build_frame_header(MsgId, #channel{clientinfo = #{phone := Phone}, msg_sn = TxMsgSn}) ->
build_frame_header(MsgId, 0, Phone, TxMsgSn).
build_frame_header(MsgId, Encrypt, Phone, TxMsgSn) ->
#{
<<"msg_id">> => MsgId,
<<"encrypt">> => Encrypt,
<<"phone">> => Phone,
<<"msg_sn">> => TxMsgSn
}.
seq(#{<<"body">> := #{<<"seq">> := MsgSn}}) -> MsgSn;
seq(#{}) -> 0.
msgsn(#{<<"header">> := #{<<"msg_sn">> := MsgSn}}) -> MsgSn.
msgid(#{<<"header">> := #{<<"msg_id">> := MsgId}}) -> MsgId.
msgidsn(#{
<<"header">> := #{
<<"msg_id">> := MsgId,
<<"msg_sn">> := MsgSn
}
}) ->
{MsgId, MsgSn}.
state_inc_sn(Channel = #channel{msg_sn = Sn}) ->
Channel#channel{msg_sn = next_msg_sn(Sn)}.
next_msg_sn(16#FFFF) -> 0;
next_msg_sn(Sn) -> Sn + 1.
is_general_response_needed(?MC_EVENT_REPORT) -> true;
is_general_response_needed(?MC_LOCATION_REPORT) -> true;
is_general_response_needed(?MC_INFO_REQ_CANCEL) -> true;
is_general_response_needed(?MC_WAYBILL_REPORT) -> true;
is_general_response_needed(?MC_BULK_LOCATION_REPORT) -> true;
is_general_response_needed(?MC_CAN_BUS_REPORT) -> true;
is_general_response_needed(?MC_MULTIMEDIA_EVENT_REPORT) -> true;
is_general_response_needed(?MC_SEND_TRANSPARENT_DATA) -> true;
is_general_response_needed(?MC_SEND_ZIP_DATA) -> true;
is_general_response_needed(_) -> false.
is_driver_id_req_exist(#channel{inflight = Inflight}) ->
% if there is a MS_REQ_DRIVER_ID (0x8702) command in re-tx queue
Key = get_msg_ack(?MC_DRIVER_ID_REPORT, none),
emqx_inflight:contain(Key, Inflight).
authenticate(_AuthFrame, #channel{authcode = anonymous}) ->
true;
authenticate(AuthFrame, #channel{authcode = undefined, auth = Auth}) ->
%% Try request authentication server
case emqx_jt808_auth:authenticate(AuthFrame, Auth) of
{ok, #{auth_result := IsAuth}} ->
IsAuth;
{error, Reason} ->
?SLOG(error, #{msg => "request_auth_server_failed", reason => Reason}),
false
end;
authenticate(
#{<<"body">> := #{<<"code">> := InCode}},
#channel{authcode = Authcode}
) ->
InCode == Authcode.
enrich_conninfo(
#{<<"header">> := #{<<"phone">> := Phone}},
Channel = #channel{conninfo = ConnInfo}
) ->
NConnInfo = ConnInfo#{
proto_name => <<"jt808">>,
proto_ver => <<"2013">>,
clean_start => true,
clientid => Phone,
username => undefined,
conn_props => #{},
connected => true,
connected_at => erlang:system_time(millisecond),
keepalive => ?DEFAULT_KEEPALIVE,
receive_maximum => 0,
expiry_interval => 0
},
Channel#channel{conninfo = NConnInfo}.
%% Register
enrich_clientinfo(
#{
<<"header">> := #{<<"phone">> := Phone},
<<"body">> := #{
<<"manufacturer">> := Manu,
<<"dev_id">> := DevId
}
},
Channel = #channel{clientinfo = ClientInfo}
) ->
NClientInfo = maybe_fix_mountpoint(ClientInfo#{
phone => Phone,
clientid => Phone,
manufacturer => Manu,
terminal_id => DevId
}),
Channel#channel{clientinfo = NClientInfo};
%% Auth
enrich_clientinfo(
#{<<"header">> := #{<<"phone">> := Phone}},
Channel = #channel{clientinfo = ClientInfo}
) ->
NClientInfo = ClientInfo#{
phone => Phone,
clientid => Phone
},
Channel#channel{clientinfo = NClientInfo}.
prepare_adapter_topic(Channel = #channel{up_topic = UpTopic, dn_topic = DnTopic}) ->
Channel#channel{
up_topic = replvar(UpTopic, Channel),
dn_topic = replvar(DnTopic, Channel)
}.
replvar(undefined, _Channel) ->
undefined;
replvar(Topic, #channel{clientinfo = #{clientid := ClientId, phone := Phone}}) when
is_binary(Topic)
->
do_replvar(Topic, #{clientid => ClientId, phone => Phone}).
do_replvar(Topic, Vars) ->
ClientID = maps:get(clientid, Vars, undefined),
Phone = maps:get(phone, Vars, undefined),
List = [
{?PH_CLIENTID, ClientID},
{?PH_PHONE, Phone}
],
lists:foldl(fun feed_var/2, Topic, List).
feed_var({_PH, undefined}, Topic) ->
Topic;
feed_var({PH, Value}, Topic) ->
emqx_topic:feed_var(PH, Value, Topic).
autosubcribe(#channel{dn_topic = Topic}) when
Topic == undefined;
Topic == ""
->
ok;
autosubcribe(#channel{
clientinfo =
ClientInfo =
#{clientid := ClientId},
dn_topic = Topic
}) ->
SubOpts = #{rap => 0, nl => 0, qos => 0, rh => 0},
emqx:subscribe(Topic, ClientId, SubOpts),
ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts#{is_new => true}]).
start_keepalive(Secs, _Channel) when Secs > 0 ->
self() ! {keepalive, start, round(Secs) * 1000}.
run_hooks(Ctx, Name, Args) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name),
emqx_hooks:run(Name, Args).
discard_downlink_messages([], _Channel) ->
ok;
discard_downlink_messages(Messages, Channel) ->
log(
error,
#{
msg => "discard_new_downlink_messages",
reason =>
"Discard new downlink messages due to that too"
" many messages are waiting their ACKs.",
messages => Messages
},
Channel
),
metrics_inc('delivery.dropped', Channel, erlang:length(Messages)).
metrics_inc(Name, #channel{ctx = Ctx}, Oct) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name, Oct).
log(Level, Meta, #channel{clientinfo = #{clientid := ClientId, username := Username}} = _Channel) ->
?SLOG(Level, Meta#{clientid => ClientId, username => Username}).
reply(Reply, Channel) ->
{reply, Reply, Channel}.
shutdown(Reason, Channel) ->
{shutdown, Reason, Channel}.
shutdown(Reason, Reply, Channel) ->
{shutdown, Reason, Reply, Channel}.
disconnect_and_shutdown(Reason, Reply, Channel) ->
shutdown(Reason, Reply, Channel).

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,119 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_jt808_schema).
-include("emqx_jt808.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-export([fields/1, desc/1]).
-define(NOT_EMPTY(MSG), emqx_resource_validator:not_empty(MSG)).
fields(jt808) ->
[
{frame, sc(ref(jt808_frame))},
{proto, sc(ref(jt808_proto))},
{mountpoint, emqx_gateway_schema:mountpoint(?DEFAULT_MOUNTPOINT)},
{retry_interval,
sc(
emqx_schema:duration_ms(),
#{
default => <<"8s">>,
desc => ?DESC(retry_interval)
}
)},
{max_retry_times,
sc(
non_neg_integer(),
#{
default => 3,
desc => ?DESC(max_retry_times)
}
)},
{message_queue_len,
sc(
non_neg_integer(),
#{
default => 10,
desc => ?DESC(message_queue_len)
}
)},
{listeners, sc(ref(emqx_gateway_schema, tcp_listeners), #{desc => ?DESC(tcp_listeners)})}
] ++ emqx_gateway_schema:gateway_common_options();
fields(jt808_frame) ->
[
{max_length, fun jt808_frame_max_length/1}
];
fields(jt808_proto) ->
[
{allow_anonymous, fun allow_anonymous/1},
{registry, fun registry_url/1},
{authentication, fun authentication_url/1},
{up_topic, fun up_topic/1},
{dn_topic, fun dn_topic/1}
].
jt808_frame_max_length(type) -> non_neg_integer();
jt808_frame_max_length(desc) -> ?DESC(?FUNCTION_NAME);
jt808_frame_max_length(default) -> 8192;
jt808_frame_max_length(required) -> false;
jt808_frame_max_length(_) -> undefined.
allow_anonymous(type) -> boolean();
allow_anonymous(desc) -> ?DESC(?FUNCTION_NAME);
allow_anonymous(default) -> true;
allow_anonymous(required) -> false;
allow_anonymous(_) -> undefined.
registry_url(type) -> binary();
registry_url(desc) -> ?DESC(?FUNCTION_NAME);
registry_url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")];
registry_url(required) -> false;
registry_url(_) -> undefined.
authentication_url(type) -> binary();
authentication_url(desc) -> ?DESC(?FUNCTION_NAME);
authentication_url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")];
authentication_url(required) -> false;
authentication_url(_) -> undefined.
up_topic(type) -> binary();
up_topic(desc) -> ?DESC(?FUNCTION_NAME);
up_topic(default) -> ?DEFAULT_UP_TOPIC;
up_topic(validator) -> [?NOT_EMPTY("the value of the field 'up_topic' cannot be empty")];
up_topic(required) -> true;
up_topic(_) -> undefined.
dn_topic(type) -> binary();
dn_topic(desc) -> ?DESC(?FUNCTION_NAME);
dn_topic(default) -> ?DEFAULT_DN_TOPIC;
dn_topic(validator) -> [?NOT_EMPTY("the value of the field 'dn_topic' cannot be empty")];
dn_topic(required) -> true;
dn_topic(_) -> undefined.
desc(jt808) ->
"The JT/T 808 protocol gateway provides EMQX with the ability to access JT/T 808 protocol devices.";
desc(jt808_frame) ->
"Limits for the JT/T 808 frames.";
desc(jt808_proto) ->
"The JT/T 808 protocol options.";
desc(_) ->
undefined.
%%--------------------------------------------------------------------
%% internal functions
sc(Type) ->
sc(Type, #{}).
sc(Type, Meta) ->
hoconsc:mk(Type, Meta).
ref(StructName) ->
ref(?MODULE, StructName).
ref(Mod, Field) ->
hoconsc:ref(Mod, Field).

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,100 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_jt808_auth_http_test_server).
-behaviour(supervisor).
-behaviour(cowboy_handler).
% cowboy_server callbacks
-export([init/2]).
% supervisor callbacks
-export([init/1]).
% API
-export([
start_link/0, start_link/1, start_link/2,
stop/0,
set_handler/1
]).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
start_link() ->
start_link(8991).
start_link(Port) ->
start_link(Port, "/[...]").
start_link(Port, Path) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [Port, Path]).
stop() ->
gen_server:stop(?MODULE).
set_handler(F) when is_function(F, 2) ->
true = ets:insert(?MODULE, {handler, F}),
ok.
%%------------------------------------------------------------------------------
%% supervisor API
%%------------------------------------------------------------------------------
init([Port, Path]) ->
Dispatch = cowboy_router:compile(
[
{'_', [{Path, ?MODULE, []}]}
]
),
TransOpts = #{
socket_opts => [{port, Port}],
connection_type => supervisor
},
ProtoOpts = #{env => #{dispatch => Dispatch}},
Tab = ets:new(?MODULE, [set, named_table, public]),
ets:insert(Tab, {handler, fun default_handler/2}),
ChildSpec = ranch:child_spec(?MODULE, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts),
{ok, {{one_for_one, 10, 10}, [ChildSpec]}}.
%%------------------------------------------------------------------------------
%% cowboy_server API
%%------------------------------------------------------------------------------
init(Req, State) ->
[{handler, Handler}] = ets:lookup(?MODULE, handler),
Handler(Req, State).
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
default_handler(Req0 = #{method := <<"POST">>, path := <<"/jt808/registry">>}, State) ->
Req = cowboy_req:reply(
200,
#{<<"content-type">> => <<"application/json">>},
emqx_utils_json:encode(#{code => 0, authcode => <<"123456">>}),
Req0
),
{ok, Req, State};
default_handler(Req0 = #{method := <<"POST">>, path := <<"/jt808/auth">>}, State) ->
Req = cowboy_req:reply(
200,
#{<<"content-type">> => <<"application/json">>},
emqx_utils_json:encode(#{client_id => <<"abcdef">>}),
Req0
),
{ok, Req, State};
default_handler(Req0, State) ->
Req = cowboy_req:reply(
400,
#{<<"content-type">> => <<"text/plain">>},
<<"">>,
Req0
),
{ok, Req, State}.

View File

@ -0,0 +1,721 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_jt808_parser_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_jt808.hrl").
-define(LOGT(Format, Args), ct:print("TEST_SUITE: " ++ Format, Args)).
-define(FRM_FLAG, 16#7e:8).
-define(RESERVE, 0).
-define(NO_FRAGMENT, 0).
-define(WITH_FRAGMENT, 1).
-define(NO_ENCRYPT, 0).
-define(MSG_SIZE(X), X:10 / big - integer).
-define(word, 16 / big - integer).
-define(dword, 32 / big - integer).
-include_lib("eunit/include/eunit.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Config.
end_per_suite(Config) ->
Config.
t_case01_register(_Config) ->
Parser = emqx_jt808_frame:initial_parse_state(#{}),
Manuf = <<"examp">>,
Model = <<"33333333333333333333">>,
DevId = <<"1234567">>,
Color = 3,
Plate = <<"ujvl239">>,
RegisterPacket =
<<58:?word, 59:?word, Manuf/binary, Model/binary, DevId/binary, Color, Plate/binary>>,
MsgId = 16#0100,
PhoneBCD = <<16#00, 16#01, 16#23, 16#45, 16#67, 16#89>>,
MsgSn = 78,
Size = size(RegisterPacket),
Header =
<<MsgId:?word, ?RESERVE:2, ?NO_FRAGMENT:1, ?NO_ENCRYPT:3, ?MSG_SIZE(Size), PhoneBCD/binary,
MsgSn:?word>>,
Stream = encode(Header, RegisterPacket),
{ok, Map, Rest, State} = emqx_jt808_frame:parse(Stream, Parser),
?assertEqual(
#{
<<"header">> => #{
<<"msg_id">> => MsgId,
<<"encrypt">> => ?NO_ENCRYPT,
<<"phone">> => <<"000123456789">>,
<<"len">> => Size,
<<"msg_sn">> => MsgSn
},
<<"body">> => #{
<<"province">> => 58,
<<"city">> => 59,
<<"manufacturer">> => Manuf,
<<"model">> => Model,
<<"dev_id">> => DevId,
<<"color">> => Color,
<<"license_number">> => Plate
}
},
Map
),
?assertEqual(<<>>, Rest),
?assertEqual(#{data => <<>>, phase => searching_head_hex7e}, State),
ok.
t_case02_register_ack(_Config) ->
% register ack
MsgId = 16#8100,
MsgSn = 35,
Seq = 22,
Result = 1,
Code = <<"abcdef">>,
DownlinkJson = #{
<<"header">> => #{
<<"msg_id">> => MsgId,
<<"encrypt">> => ?NO_ENCRYPT,
<<"phone">> => <<"000123456789">>,
<<"msg_sn">> => MsgSn
},
<<"body">> => #{<<"seq">> => Seq, <<"result">> => Result, <<"auth_code">> => Code}
},
Stream = emqx_jt808_frame:serialize_pkt(DownlinkJson, #{}),
RegisterAckPacket = <<Seq:?word, Result:8, Code/binary>>,
PhoneBCD = <<16#00, 16#01, 16#23, 16#45, 16#67, 16#89>>,
Size = size(RegisterAckPacket),
Header =
<<MsgId:?word, ?RESERVE:2, ?NO_FRAGMENT:1, ?NO_ENCRYPT:3, ?MSG_SIZE(Size), PhoneBCD/binary,
MsgSn:?word>>,
StreamByHand = encode(Header, RegisterAckPacket),
?assertEqual(StreamByHand, Stream),
ok.
t_case04_MC_LOCATION_REPORT(_Config) ->
Parser = emqx_jt808_frame:initial_parse_state(#{}),
Data =
<<126, 2, 0, 0, 60, 1, 136, 118, 99, 137, 114, 0, 229, 0, 0, 0, 0, 0, 4, 0, 0, 1, 49, 122,
103, 6, 147, 104, 81, 0, 14, 0, 0, 0, 39, 23, 16, 25, 25, 53, 56, 1, 4, 0, 0, 63, 178,
3, 2, 0, 0, 37, 4, 0, 0, 0, 0, 42, 2, 0, 0, 43, 4, 0, 0, 0, 0, 48, 1, 31, 49, 1, 0, 171,
126>>,
{ok, Map, Rest, State} = emqx_jt808_frame:parse(Data, Parser),
?assertEqual(
#{
<<"header">> =>
#{
<<"encrypt">> => 0,
<<"len">> => 60,
<<"msg_id">> => 512,
<<"msg_sn">> => 229,
<<"phone">> => <<"018876638972">>
},
<<"body">> =>
#{
<<"alarm">> => 0,
<<"altitude">> => 14,
<<"direction">> => 39,
<<"extra">> =>
#{
<<"analog">> => #{<<"ad0">> => 0, <<"ad1">> => 0},
<<"gnss_sat_num">> => 0,
<<"io_status">> => #{<<"deep_sleep">> => 0, <<"sleep">> => 0},
<<"mileage">> => 16306,
<<"rssi">> => 31,
<<"signal">> =>
#{
<<"abs">> => 0,
<<"air_conditioner">> => 0,
<<"brake">> => 0,
<<"cluth">> => 0,
<<"fog">> => 0,
<<"heater">> => 0,
<<"high_beam">> => 0,
<<"horn">> => 0,
<<"left_turn">> => 0,
<<"low_beam">> => 0,
<<"neutral">> => 0,
<<"retarder">> => 0,
<<"reverse">> => 0,
<<"right_turn">> => 0,
<<"side_marker">> => 0
},
<<"speed">> => 0
},
<<"latitude">> => 20019815,
<<"longitude">> => 110323793,
<<"speed">> => 0,
<<"status">> => 262144,
<<"time">> => <<"171019193538">>
}
},
Map
),
?assertEqual(<<>>, Rest),
?assertEqual(#{data => <<>>, phase => searching_head_hex7e}, State),
ok.
t_case05_MC_BULK_LOCATION_REPORT(_Config) ->
Parser = emqx_jt808_frame:initial_parse_state(#{}),
Data =
<<126, 7, 4, 1, 57, 1, 136, 118, 99, 137, 114, 0, 231, 0, 5, 1, 0, 60, 0, 0, 0, 0, 0, 4, 0,
3, 1, 49, 115, 43, 6, 145, 211, 81, 0, 31, 0, 166, 0, 171, 23, 8, 49, 16, 73, 51, 1, 4,
0, 0, 60, 85, 3, 2, 0, 0, 37, 4, 0, 0, 0, 0, 42, 2, 0, 0, 43, 4, 0, 0, 0, 0, 48, 1, 0,
49, 1, 12, 0, 60, 0, 0, 0, 0, 0, 4, 0, 3, 1, 49, 114, 74, 6, 145, 212, 2, 0, 29, 0, 222,
0, 125, 2, 23, 8, 49, 16, 73, 56, 1, 4, 0, 0, 60, 85, 3, 2, 0, 0, 37, 4, 0, 0, 0, 0, 42,
2, 0, 0, 43, 4, 0, 0, 0, 0, 48, 1, 0, 49, 1, 12, 0, 60, 0, 0, 0, 0, 0, 4, 0, 3, 1, 49,
109, 222, 6, 145, 225, 80, 0, 37, 1, 169, 0, 109, 23, 8, 49, 16, 80, 17, 1, 4, 0, 0, 60,
89, 3, 2, 0, 0, 37, 4, 0, 0, 0, 0, 42, 2, 0, 0, 43, 4, 0, 0, 0, 0, 48, 1, 0, 49, 1, 12,
0, 60, 0, 0, 0, 0, 0, 4, 0, 3, 1, 49, 90, 235, 6, 146, 0, 24, 0, 43, 2, 136, 0, 114, 23,
8, 49, 16, 81, 17, 1, 4, 0, 0, 60, 99, 3, 2, 0, 0, 37, 4, 0, 0, 0, 0, 42, 2, 0, 0, 43,
4, 0, 0, 0, 0, 48, 1, 0, 49, 1, 12, 0, 60, 0, 0, 0, 0, 0, 4, 0, 3, 1, 49, 91, 120, 6,
146, 43, 21, 0, 43, 2, 247, 0, 83, 23, 8, 49, 16, 82, 20, 1, 4, 0, 0, 60, 111, 3, 2, 0,
0, 37, 4, 0, 0, 0, 0, 42, 2, 0, 0, 43, 4, 0, 0, 0, 0, 48, 1, 0, 49, 1, 12, 132, 126>>,
{ok, Map, Rest, State} = emqx_jt808_frame:parse(Data, Parser),
?assertEqual(
#{
<<"body">> =>
#{
<<"length">> => 5,
<<"location">> =>
[
#{
<<"alarm">> => 0,
<<"altitude">> => 31,
<<"direction">> => 171,
<<"extra">> =>
#{
<<"analog">> =>
#{
<<"ad0">> => 0,
<<"ad1">> => 0
},
<<"gnss_sat_num">> => 12,
<<"io_status">> =>
#{
<<"deep_sleep">> => 0,
<<"sleep">> => 0
},
<<"mileage">> => 15445,
<<"rssi">> => 0,
<<"signal">> =>
#{
<<"abs">> => 0,
<<"air_conditioner">> =>
0,
<<"brake">> => 0,
<<"cluth">> => 0,
<<"fog">> => 0,
<<"heater">> => 0,
<<"high_beam">> => 0,
<<"horn">> => 0,
<<"left_turn">> => 0,
<<"low_beam">> => 0,
<<"neutral">> => 0,
<<"retarder">> => 0,
<<"reverse">> => 0,
<<"right_turn">> => 0,
<<"side_marker">> => 0
},
<<"speed">> => 0
},
<<"latitude">> => 20017963,
<<"longitude">> => 110220113,
<<"speed">> => 166,
<<"status">> => 262147,
<<"time">> => <<"170831104933">>
},
#{
<<"alarm">> => 0,
<<"altitude">> => 29,
<<"direction">> => 126,
<<"extra">> =>
#{
<<"analog">> =>
#{
<<"ad0">> => 0,
<<"ad1">> => 0
},
<<"gnss_sat_num">> => 12,
<<"io_status">> =>
#{
<<"deep_sleep">> => 0,
<<"sleep">> => 0
},
<<"mileage">> => 15445,
<<"rssi">> => 0,
<<"signal">> =>
#{
<<"abs">> => 0,
<<"air_conditioner">> =>
0,
<<"brake">> => 0,
<<"cluth">> => 0,
<<"fog">> => 0,
<<"heater">> => 0,
<<"high_beam">> => 0,
<<"horn">> => 0,
<<"left_turn">> => 0,
<<"low_beam">> => 0,
<<"neutral">> => 0,
<<"retarder">> => 0,
<<"reverse">> => 0,
<<"right_turn">> => 0,
<<"side_marker">> => 0
},
<<"speed">> => 0
},
<<"latitude">> => 20017738,
<<"longitude">> => 110220290,
<<"speed">> => 222,
<<"status">> => 262147,
<<"time">> => <<"170831104938">>
},
#{
<<"alarm">> => 0,
<<"altitude">> => 37,
<<"direction">> => 109,
<<"extra">> =>
#{
<<"analog">> =>
#{
<<"ad0">> => 0,
<<"ad1">> => 0
},
<<"gnss_sat_num">> => 12,
<<"io_status">> =>
#{
<<"deep_sleep">> => 0,
<<"sleep">> => 0
},
<<"mileage">> => 15449,
<<"rssi">> => 0,
<<"signal">> =>
#{
<<"abs">> => 0,
<<"air_conditioner">> =>
0,
<<"brake">> => 0,
<<"cluth">> => 0,
<<"fog">> => 0,
<<"heater">> => 0,
<<"high_beam">> => 0,
<<"horn">> => 0,
<<"left_turn">> => 0,
<<"low_beam">> => 0,
<<"neutral">> => 0,
<<"retarder">> => 0,
<<"reverse">> => 0,
<<"right_turn">> => 0,
<<"side_marker">> => 0
},
<<"speed">> => 0
},
<<"latitude">> => 20016606,
<<"longitude">> => 110223696,
<<"speed">> => 425,
<<"status">> => 262147,
<<"time">> => <<"170831105011">>
},
#{
<<"alarm">> => 0,
<<"altitude">> => 43,
<<"direction">> => 114,
<<"extra">> =>
#{
<<"analog">> =>
#{
<<"ad0">> => 0,
<<"ad1">> => 0
},
<<"gnss_sat_num">> => 12,
<<"io_status">> =>
#{
<<"deep_sleep">> => 0,
<<"sleep">> => 0
},
<<"mileage">> => 15459,
<<"rssi">> => 0,
<<"signal">> =>
#{
<<"abs">> => 0,
<<"air_conditioner">> =>
0,
<<"brake">> => 0,
<<"cluth">> => 0,
<<"fog">> => 0,
<<"heater">> => 0,
<<"high_beam">> => 0,
<<"horn">> => 0,
<<"left_turn">> => 0,
<<"low_beam">> => 0,
<<"neutral">> => 0,
<<"retarder">> => 0,
<<"reverse">> => 0,
<<"right_turn">> => 0,
<<"side_marker">> => 0
},
<<"speed">> => 0
},
<<"latitude">> => 20011755,
<<"longitude">> => 110231576,
<<"speed">> => 648,
<<"status">> => 262147,
<<"time">> => <<"170831105111">>
},
#{
<<"alarm">> => 0,
<<"altitude">> => 43,
<<"direction">> => 83,
<<"extra">> =>
#{
<<"analog">> =>
#{
<<"ad0">> => 0,
<<"ad1">> => 0
},
<<"gnss_sat_num">> => 12,
<<"io_status">> =>
#{
<<"deep_sleep">> => 0,
<<"sleep">> => 0
},
<<"mileage">> => 15471,
<<"rssi">> => 0,
<<"signal">> =>
#{
<<"abs">> => 0,
<<"air_conditioner">> =>
0,
<<"brake">> => 0,
<<"cluth">> => 0,
<<"fog">> => 0,
<<"heater">> => 0,
<<"high_beam">> => 0,
<<"horn">> => 0,
<<"left_turn">> => 0,
<<"low_beam">> => 0,
<<"neutral">> => 0,
<<"retarder">> => 0,
<<"reverse">> => 0,
<<"right_turn">> => 0,
<<"side_marker">> => 0
},
<<"speed">> => 0
},
<<"latitude">> => 20011896,
<<"longitude">> => 110242581,
<<"speed">> => 759,
<<"status">> => 262147,
<<"time">> => <<"170831105214">>
}
],
<<"type">> => 1
},
<<"header">> =>
#{
<<"encrypt">> => 0,
<<"len">> => 313,
<<"msg_id">> => 1796,
<<"msg_sn">> => 231,
<<"phone">> => <<"018876638972">>
}
},
Map
),
?assertEqual(<<>>, Rest),
?assertEqual(#{data => <<>>, phase => searching_head_hex7e}, State),
ok.
t_case10_segmented_packet(_Config) ->
Parser = emqx_jt808_frame:initial_parse_state(#{}),
Manuf = <<"examp">>,
Model = <<"33333333333333333333">>,
DevId = <<"1234567">>,
Color = 3,
Plate = <<"ujvl239">>,
RegisterPacket =
<<58:?word, 59:?word, Manuf/binary, Model/binary, DevId/binary, Color, Plate/binary>>,
MsgId = 16#0100,
PhoneBCD = <<16#00, 16#01, 16#23, 16#45, 16#67, 16#89>>,
MsgSn = 78,
Size = size(RegisterPacket),
Header =
<<MsgId:?word, ?RESERVE:2, ?NO_FRAGMENT:1, ?NO_ENCRYPT:3, ?MSG_SIZE(Size), PhoneBCD/binary,
MsgSn:?word>>,
Stream = encode(Header, RegisterPacket),
<<Part1:7/binary, Part2/binary>> = Stream,
{more, Parser2} = emqx_jt808_frame:parse(Part1, Parser),
?assertMatch(#{phase := escaping_hex7d}, Parser2),
{ok, Map, Rest, State} = emqx_jt808_frame:parse(Part2, Parser2),
?assertEqual(
#{
<<"header">> => #{
<<"msg_id">> => MsgId,
<<"encrypt">> => ?NO_ENCRYPT,
<<"phone">> => <<"000123456789">>,
<<"len">> => Size,
<<"msg_sn">> => MsgSn
},
<<"body">> => #{
<<"province">> => 58,
<<"city">> => 59,
<<"manufacturer">> => Manuf,
<<"model">> => Model,
<<"dev_id">> => DevId,
<<"color">> => Color,
<<"license_number">> => Plate
}
},
Map
),
?assertEqual(<<>>, Rest),
?assertEqual(#{data => <<>>, phase => searching_head_hex7e}, State),
ok.
t_case11_prefix_register(_Config) ->
Parser = emqx_jt808_frame:initial_parse_state(#{}),
Manuf = <<"examp">>,
Model = <<"33333333333333333333">>,
DevId = <<"1234567">>,
Color = 3,
Plate = <<"ujvl239">>,
RegisterPacket =
<<58:?word, 59:?word, Manuf/binary, Model/binary, DevId/binary, Color, Plate/binary>>,
MsgId = 16#0100,
PhoneBCD = <<16#00, 16#01, 16#23, 16#45, 16#67, 16#89>>,
MsgSn = 78,
Size = size(RegisterPacket),
Header =
<<MsgId:?word, ?RESERVE:2, ?NO_FRAGMENT:1, ?NO_ENCRYPT:3, ?MSG_SIZE(Size), PhoneBCD/binary,
MsgSn:?word>>,
Stream = encode(Header, RegisterPacket),
MessBinary = <<0, 1, 2, 3, Stream/binary>>,
?LOGT("MessBinary=~p", [binary_to_hex_string(MessBinary)]),
{ok, Map, Rest, State} = emqx_jt808_frame:parse(MessBinary, Parser),
?assertEqual(
#{
<<"header">> => #{
<<"msg_id">> => MsgId,
<<"encrypt">> => ?NO_ENCRYPT,
<<"phone">> => <<"000123456789">>,
<<"len">> => Size,
<<"msg_sn">> => MsgSn
},
<<"body">> => #{
<<"province">> => 58,
<<"city">> => 59,
<<"manufacturer">> => Manuf,
<<"model">> => Model,
<<"dev_id">> => DevId,
<<"color">> => Color,
<<"license_number">> => Plate
}
},
Map
),
?assertEqual(<<>>, Rest),
?assertEqual(#{data => <<>>, phase => searching_head_hex7e}, State),
ok.
t_case12_0x7e_in_message(_Config) ->
Parser = emqx_jt808_frame:initial_parse_state(#{}),
Manuf = <<"examp">>,
Model = <<"33333333333333333333">>,
DevId = <<"1234567">>,
Color = 3,
Plate = <<"ujvl239">>,
% pay attention to this
AlarmInt = 16#7e,
AlarmDigit = 16#7d,
RegisterPacket =
<<AlarmInt:?word, AlarmDigit:?word, Manuf/binary, Model/binary, DevId/binary, Color,
Plate/binary>>,
MsgId = 16#0100,
PhoneBCD = <<16#00, 16#01, 16#23, 16#45, 16#67, 16#89>>,
MsgSn = 78,
Size = size(RegisterPacket),
Header =
<<MsgId:?word, ?RESERVE:2, ?NO_FRAGMENT:1, ?NO_ENCRYPT:3, ?MSG_SIZE(Size), PhoneBCD/binary,
MsgSn:?word>>,
Stream = encode(Header, RegisterPacket),
{ok, Map, Rest, State} = emqx_jt808_frame:parse(Stream, Parser),
?assertEqual(
#{
<<"header">> => #{
<<"msg_id">> => MsgId,
<<"encrypt">> => ?NO_ENCRYPT,
<<"phone">> => <<"000123456789">>,
<<"len">> => Size,
<<"msg_sn">> => MsgSn
},
<<"body">> => #{
<<"province">> => AlarmInt,
<<"city">> => AlarmDigit,
<<"manufacturer">> => Manuf,
<<"model">> => Model,
<<"dev_id">> => DevId,
<<"color">> => Color,
<<"license_number">> => Plate
}
},
Map
),
?assertEqual(<<>>, Rest),
?assertEqual(#{data => <<>>, phase => searching_head_hex7e}, State),
ok.
t_case13_partial_0x7d_in_message(_Config) ->
Parser = emqx_jt808_frame:initial_parse_state(#{}),
Manuf = <<"examp">>,
Model = <<"33333333333333333333">>,
DevId = <<"1234567">>,
Color = 3,
Plate = <<"ujvl239">>,
% pay attention to this
AlarmInt = 16#7e,
AlarmDigit = 16#7d,
RegisterPacket =
<<AlarmInt:?word, AlarmDigit:?word, Manuf/binary, Model/binary, DevId/binary, Color,
Plate/binary>>,
MsgId = 16#0100,
PhoneBCD = <<16#00, 16#01, 16#23, 16#45, 16#67, 16#89>>,
MsgSn = 78,
Size = size(RegisterPacket),
Header =
<<MsgId:?word, ?RESERVE:2, ?NO_FRAGMENT:1, ?NO_ENCRYPT:3, ?MSG_SIZE(Size), PhoneBCD/binary,
MsgSn:?word>>,
Stream = encode(Header, RegisterPacket),
<<Part1:15/binary, Part2/binary>> = Stream,
<<_:14/binary, 16#7d:8>> = Part1,
{more, Parser2} = emqx_jt808_frame:parse(Part1, Parser),
?assertMatch(#{phase := escaping_hex7d}, Parser2),
{ok, Map, Rest, State} = emqx_jt808_frame:parse(Part2, Parser2),
?assertEqual(
#{
<<"header">> => #{
<<"msg_id">> => MsgId,
<<"encrypt">> => ?NO_ENCRYPT,
<<"phone">> => <<"000123456789">>,
<<"len">> => Size,
<<"msg_sn">> => MsgSn
},
<<"body">> => #{
<<"province">> => AlarmInt,
<<"city">> => AlarmDigit,
<<"manufacturer">> => Manuf,
<<"model">> => Model,
<<"dev_id">> => DevId,
<<"color">> => Color,
<<"license_number">> => Plate
}
},
Map
),
?assertEqual(<<>>, Rest),
?assertEqual(#{data => <<>>, phase => searching_head_hex7e}, State),
ok.
t_case14_custome_location_data(_) ->
Bin =
<<126, 2, 0, 0, 64, 1, 65, 72, 7, 53, 80, 3, 106, 0, 0, 0, 0, 0, 0, 0, 1, 1, 195, 232, 22,
6, 89, 10, 16, 0, 0, 0, 0, 0, 0, 33, 6, 34, 9, 21, 69, 1, 4, 0, 0, 0, 0, 48, 1, 23, 49,
1, 10, 235, 22, 0, 12, 0, 178, 137, 134, 4, 66, 25, 25, 144, 147, 71, 153, 0, 6, 0, 137,
255, 255, 255, 255, 36, 126>>,
Parser = emqx_jt808_frame:initial_parse_state(#{}),
{ok, Packet, Rest, State} = emqx_jt808_frame:parse(Bin, Parser),
?assertEqual(<<>>, Rest),
?assertEqual(#{data => <<>>, phase => searching_head_hex7e}, State),
_ = emqx_utils_json:encode(Packet).
t_case14_reserved_location_data(_) ->
Bin =
<<126, 2, 0, 0, 49, 1, 145, 114, 3, 130, 104, 2, 0, 0, 0, 0, 0, 0, 0, 0, 3, 1, 211, 181,
215, 6, 51, 228, 71, 0, 4, 0, 0, 0, 138, 34, 4, 25, 22, 5, 70, 1, 4, 0, 0, 0, 0, 5, 3,
0, 0, 0, 48, 1, 31, 49, 1, 15, 130, 2, 0, 125, 2, 23, 126>>,
Parser = emqx_jt808_frame:initial_parse_state(#{}),
{ok, Packet, Rest, State} = emqx_jt808_frame:parse(Bin, Parser),
?assertEqual(<<>>, Rest),
?assertEqual(#{data => <<>>, phase => searching_head_hex7e}, State),
_ = emqx_utils_json:encode(Packet).
t_case15_custome_client_query_ack(_) ->
Bin =
<<126, 1, 4, 2, 17, 78, 244, 128, 18, 137, 25, 0, 43, 0, 42, 52, 0, 0, 0, 1, 4, 0, 0, 0,
180, 0, 0, 0, 8, 4, 0, 0, 1, 44, 0, 0, 0, 16, 5, 99, 109, 110, 101, 116, 0, 0, 0, 17, 0,
0, 0, 0, 18, 0, 0, 0, 0, 19, 12, 52, 55, 46, 57, 57, 46, 57, 56, 46, 50, 53, 52, 0, 0,
0, 23, 7, 48, 46, 48, 46, 48, 46, 48, 0, 0, 0, 24, 4, 0, 0, 31, 249, 0, 0, 0, 32, 4, 0,
0, 0, 0, 0, 0, 0, 39, 4, 0, 0, 0, 30, 0, 0, 0, 41, 4, 0, 0, 0, 30, 0, 0, 0, 48, 4, 0, 0,
0, 20, 0, 0, 0, 64, 1, 0, 0, 0, 0, 67, 1, 0, 0, 0, 0, 68, 1, 0, 0, 0, 0, 69, 4, 0, 0, 0,
0, 0, 0, 0, 72, 1, 0, 0, 0, 0, 73, 1, 0, 0, 0, 0, 85, 4, 0, 0, 0, 120, 0, 0, 0, 86, 4,
0, 0, 0, 20, 0, 0, 0, 93, 2, 0, 30, 0, 0, 0, 128, 4, 0, 0, 136, 203, 0, 0, 0, 129, 2, 0,
0, 0, 0, 0, 130, 2, 0, 0, 0, 0, 0, 131, 0, 0, 0, 0, 132, 1, 0, 0, 0, 240, 1, 2, 16, 0,
0, 0, 240, 2, 2, 3, 57, 0, 0, 240, 3, 1, 41, 0, 0, 240, 4, 2, 0, 116, 0, 0, 240, 5, 4,
0, 53, 111, 127, 0, 0, 240, 6, 4, 0, 3, 51, 18, 0, 0, 240, 7, 1, 1, 0, 0, 240, 8, 2, 0,
30, 0, 0, 240, 10, 2, 0, 30, 0, 0, 240, 11, 6, 0, 0, 0, 0, 0, 0, 0, 0, 240, 12, 1, 1, 0,
0, 240, 13, 18, 117, 112, 103, 114, 97, 100, 101, 46, 99, 112, 115, 100, 110, 97, 46,
99, 111, 109, 0, 0, 240, 14, 4, 0, 0, 123, 40, 0, 0, 240, 15, 1, 1, 0, 0, 240, 16, 3, 0,
16, 3, 0, 0, 240, 17, 2, 0, 32, 0, 0, 240, 18, 3, 0, 22, 40, 0, 0, 240, 19, 4, 0, 17,
25, 32, 0, 0, 240, 20, 3, 55, 0, 10, 0, 0, 240, 21, 1, 50, 0, 0, 240, 22, 2, 3, 25, 0,
0, 240, 23, 2, 30, 50, 0, 0, 240, 24, 21, 48, 44, 48, 44, 53, 56, 46, 50, 49, 53, 46,
53, 48, 46, 53, 48, 44, 54, 48, 48, 56, 0, 0, 241, 1, 1, 160, 0, 0, 241, 2, 1, 130, 0,
0, 255, 148, 93, 14, 22, 1, 56, 57, 56, 54, 48, 52, 57, 53, 49, 48, 50, 49, 55, 48, 51,
48, 52, 53, 53, 56, 17, 2, 52, 54, 48, 48, 56, 49, 53, 56, 51, 52, 48, 52, 53, 53, 56,
4, 3, 0, 1, 3, 5, 0, 3, 6, 0, 3, 7, 0, 5, 8, 3, 0, 1, 5, 8, 14, 0, 0, 5, 8, 15, 0, 0, 5,
8, 41, 0, 0, 5, 8, 48, 0, 0, 5, 8, 42, 0, 0, 5, 8, 43, 0, 0, 5, 8, 4, 0, 0, 138, 126>>,
Parser = emqx_jt808_frame:initial_parse_state(#{}),
{ok, Packet, Rest, State} = emqx_jt808_frame:parse(Bin, Parser),
?assertEqual(<<>>, Rest),
?assertEqual(#{data => <<>>, phase => searching_head_hex7e}, State),
_ = emqx_utils_json:encode(Packet).
encode(Header, Body) ->
S1 = <<Header/binary, Body/binary>>,
Crc = make_crc(S1, undefined),
S2 = do_escape(<<S1/binary, Crc:8>>),
Stream = <<16#7e:8, S2/binary, 16#7e:8>>,
%?LOGT("encode a packet=~p", [binary_to_hex_string(Stream)]),
Stream.
make_crc(<<>>, Xor) ->
?LOGT("crc is ~p", [Xor]),
Xor;
make_crc(<<C:8, Rest/binary>>, undefined) ->
make_crc(Rest, C);
make_crc(<<C:8, Rest/binary>>, Xor) ->
make_crc(Rest, C bxor Xor).
do_escape(Binary) ->
do_escape(Binary, <<>>).
do_escape(<<>>, Acc) ->
Acc;
do_escape(<<16#7e, Rest/binary>>, Acc) ->
do_escape(Rest, <<Acc/binary, 16#7d, 16#02>>);
do_escape(<<16#7d, Rest/binary>>, Acc) ->
do_escape(Rest, <<Acc/binary, 16#7d, 16#01>>);
do_escape(<<C, Rest/binary>>, Acc) ->
do_escape(Rest, <<Acc/binary, C:8>>).
binary_to_hex_string(Data) ->
lists:flatten([io_lib:format("~2.16.0B ", [X]) || <<X:8>> <= Data]).

View File

@ -5,7 +5,7 @@ is a protocol designed for IoT devices and machine-to-machine communication.
It is a lightweight protocol that supports devices with limited processing power and memory.
The **LwM2M Gateway** in EMQX can accept LwM2M clients and translate theirevents
The **LwM2M Gateway** in EMQX can accept LwM2M clients and translate their events
and messages into MQTT Publish messages.
In the current implementation, it has the following limitations:
@ -15,7 +15,7 @@ In the current implementation, it has the following limitations:
## Quick Start
In EMQX 5.0, LwM2M gateways can be configured and enabled through the Dashboard.
In EMQX 5.0, LwM2M gateway can be configured and enabled through the Dashboard.
It can also be enabled via the HTTP API, and emqx.conf e.g, In emqx.conf:
@ -52,7 +52,6 @@ gateway.lwm2m {
> Note:
> Configuring the gateway via emqx.conf requires changes on a per-node basis,
> but configuring it via Dashboard or the HTTP API will take effect across the cluster.
:::
## Object definations

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [ {emqx, {path, "../../apps/emqx"}},
{emqx_gateway, {path, "../../apps/emqx_gateway"}}

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_lwm2m, [
{description, "LwM2M Gateway"},
{vsn, "0.1.3"},

View File

@ -15,6 +15,7 @@
%%--------------------------------------------------------------------
-module(emqx_lwm2m_channel).
-behaviour(emqx_gateway_channel).
-include("emqx_lwm2m.hrl").
-include_lib("emqx/include/emqx.hrl").

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_mqttsn, [
{description, "MQTT-SN Gateway"},
{vsn, "0.1.5"},

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [
{emqx, {path, "../../apps/emqx"}},

View File

@ -1,3 +1,4 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_stomp, [
{description, "Stomp Gateway"},
{vsn, "0.1.3"},

View File

@ -35,7 +35,7 @@ fields(stomp_frame) ->
non_neg_integer(),
#{
default => 10,
desc => ?DESC(stom_frame_max_headers)
desc => ?DESC(stomp_frame_max_headers)
}
)},
{max_headers_length,
@ -51,7 +51,7 @@ fields(stomp_frame) ->
integer(),
#{
default => 65536,
desc => ?DESC(stom_frame_max_body_length)
desc => ?DESC(stomp_frame_max_body_length)
}
)}
].

View File

@ -128,6 +128,7 @@
emqx_audit,
emqx_gateway_gbt32960,
emqx_gateway_ocpp,
emqx_gateway_jt808,
emqx_bridge_syskeeper
],
%% must always be of type `load'

View File

@ -0,0 +1 @@
Introduced a new gateway for vehicles to access EMQX through the JT/T 808 protocol.

View File

@ -0,0 +1 @@
Introduced a new gateway for Electric vehicle (EV) charging stations to access EMQX through the OCPP (Open Charge Point Protocol).

View File

@ -218,6 +218,7 @@ defmodule EMQXUmbrella.MixProject do
:emqx_audit,
:emqx_gateway_gbt32960,
:emqx_gateway_ocpp,
:emqx_gateway_jt808,
:emqx_bridge_syskeeper
])
end

View File

@ -113,6 +113,7 @@ is_community_umbrella_app("apps/emqx_dashboard_sso") -> false;
is_community_umbrella_app("apps/emqx_audit") -> false;
is_community_umbrella_app("apps/emqx_gateway_gbt32960") -> false;
is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false;
is_community_umbrella_app("apps/emqx_gateway_jt808") -> false;
is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false;
is_community_umbrella_app(_) -> true.

View File

@ -44,7 +44,7 @@ gateway_common_listener_enable.desc:
"""Enable the listener."""
gateway_common_listener_enable_authn.desc:
"""Set <code>true</code> (default) to enable client authentication on this listener.
"""Set <code>true</code> (default) to enable client authentication on this listener.
When set to <code>false</code> clients will be allowed to connect without authentication."""
gateway_common_listener_max_conn_rate.desc:
@ -62,9 +62,10 @@ then the client actually subscribes to the topic `some_tenant/t`.
Similarly, if another client B (connected to the same listener as the client A) sends a message to topic `t`,
the message is routed to all the clients subscribed `some_tenant/t`,
so client A will receive the message, with topic name `t`. Set to `""` to disable the feature.
Variables in mountpoint string:<br/>
Supported placeholders in mountpoint string:<br/>
- <code>${clientid}</code>: clientid<br/>
- <code>${username}</code>: username"""
- <code>${username}</code>: username<br/>
- <code>${endpoint_name}</code>: endpoint name"""
listener_name_to_settings_map.desc:
"""A map from listener names to listener settings."""

View File

@ -0,0 +1,30 @@
emqx_jt808_schema {
jt808_frame_max_length.desc:
"""The maximum length of the JT/T 808 frame."""
jt808_allow_anonymous.desc:
"""Allow anonymous access to the JT/T 808 Gateway."""
registry_url.desc
"""The JT/T 808 device registry central URL."""
authentication_url.desc
"""The JT/T 808 device authentication central URL."""
jt808_up_topic.desc
"""The topic of the JT/T 808 protocol upstream message."""
jt808_dn_topic.desc
"""The topic of the JT/T 808 protocol downstream message."""
retry_interval.desc:
"""Re-send time interval"""
max_retry_times.desc:
"""Re-send max times"""
message_queue_len.desc:
"""Max message queue length"""
}

View File

@ -1,16 +1,16 @@
emqx_stomp_schema {
stom_frame_max_body_length.desc:
"""Maximum number of bytes of Body allowed per Stomp packet"""
stom_frame_max_headers.desc:
"""The maximum number of Header"""
stomp.desc:
"""The Stomp Gateway configuration.
This gateway supports v1.2/1.1/1.0"""
stomp_frame_max_headers.desc:
"""The maximum number of Header"""
stomp_frame_max_headers_length.desc:
"""The maximum string length of the Header Value"""
stomp_frame_max_body_length.desc:
"""Maximum number of bytes of Body allowed per Stomp packet"""
}