feat(gw_jt808): port jt808 gateway from 4.4

This commit is contained in:
JimMoen 2023-11-12 21:44:18 +08:00
parent 526faa0362
commit 4d493292fb
No known key found for this signature in database
GPG Key ID: 87A520B4F76BA86D
17 changed files with 2776 additions and 5 deletions

View File

@ -381,7 +381,8 @@ fields(Gw) when
Gw == lwm2m;
Gw == exproto;
Gw == gbt32960;
Gw == ocpp
Gw == ocpp;
Gw == jt808
->
[{name, mk(Gw, #{desc => ?DESC(gateway_name)})}] ++
convert_listener_struct(emqx_gateway_schema:gateway_schema(Gw));
@ -392,7 +393,8 @@ fields(Gw) when
Gw == update_lwm2m;
Gw == update_exproto;
Gw == update_gbt32960;
Gw == update_ocpp
Gw == update_ocpp;
Gw == update_jt808
->
"update_" ++ GwStr = atom_to_list(Gw),
Gw1 = list_to_existing_atom(GwStr),

27
apps/emqx_gateway_jt808/.gitignore vendored Normal file
View File

@ -0,0 +1,27 @@
.eunit
deps
*.o
*.beam
*.plt
erl_crash.dump
ebin/*
rel/example_project
.concrete/DEV_MODE
.rebar
.erlang.mk/
emqx_connect_jt808.d
data/
!data/app.config
.idea/
*.iml
emqx_gateway_jt808.d
logs/
cover/
ct.coverdata
eunit.coverdata
test/ct.cover.spec
_build/
etc/emqx_gateway_jt808.conf.rendered
*.swp
rebar.lock
.rebar3/

View File

@ -0,0 +1,94 @@
Business Source License 1.1
Licensor: Hangzhou EMQ Technologies Co., Ltd.
Licensed Work: EMQX Enterprise Edition
The Licensed Work is (c) 2023
Hangzhou EMQ Technologies Co., Ltd.
Additional Use Grant: Students and educators are granted right to copy,
modify, and create derivative work for research
or education.
Change Date: 2027-02-01
Change License: Apache License, Version 2.0
For information about alternative licensing arrangements for the Software,
please contact Licensor: https://www.emqx.com/en/contact
Notice
The Business Source License (this document, or the “License”) is not an Open
Source license. However, the Licensed Work will eventually be made available
under an Open Source License, as stated in this License.
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
“Business Source License” is a trademark of MariaDB Corporation Ab.
-----------------------------------------------------------------------------
Business Source License 1.1
Terms
The Licensor hereby grants you the right to copy, modify, create derivative
works, redistribute, and make non-production use of the Licensed Work. The
Licensor may make an Additional Use Grant, above, permitting limited
production use.
Effective on the Change Date, or the fourth anniversary of the first publicly
available distribution of a specific version of the Licensed Work under this
License, whichever comes first, the Licensor hereby grants you rights under
the terms of the Change License, and the rights granted in the paragraph
above terminate.
If your use of the Licensed Work does not comply with the requirements
currently in effect as described in this License, you must purchase a
commercial license from the Licensor, its affiliated entities, or authorized
resellers, or you must refrain from using the Licensed Work.
All copies of the original and modified Licensed Work, and derivative works
of the Licensed Work, are subject to this License. This License applies
separately for each version of the Licensed Work and the Change Date may vary
for each version of the Licensed Work released by Licensor.
You must conspicuously display this License on each original or modified copy
of the Licensed Work. If you receive the Licensed Work in original or
modified form from a third party, the terms and conditions set forth in this
License apply to your use of that work.
Any use of the Licensed Work in violation of this License will automatically
terminate your rights under this License for the current and all other
versions of the Licensed Work.
This License does not grant you any right in any trademark or logo of
Licensor or its affiliates (provided that you may use a trademark or logo of
Licensor as expressly required by this License).
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
TITLE.
MariaDB hereby grants you permission to use this Licenses text to license
your works, and to refer to it using the trademark “Business Source License”,
as long as you comply with the Covenants of Licensor below.
Covenants of Licensor
In consideration of the right to use this Licenses text and the “Business
Source License” name and trademark, Licensor covenants to MariaDB, and to all
other recipients of the licensed work to be provided by Licensor:
1. To specify as the Change License the GPL Version 2.0 or any later version,
or a license that is compatible with GPL Version 2.0 or a later version,
where “compatible” means that software provided under the Change License can
be included in a program with software provided under GPL Version 2.0 or a
later version. Licensor may specify additional Change Licenses without
limitation.
2. To either: (a) specify an additional grant of rights to use that does not
impose any additional restriction on the right granted in this License, as
the Additional Use Grant; or (b) insert the text “None”.
3. To specify a Change Date.
4. Not to modify this License in any other way.

View File

@ -0,0 +1,26 @@
# emqx_jt808
The JT/T 808 protocol is designed for data interaction between vehicle-mounted satellite terminals and IoT platforms.
The **JT/T 808 Gateway** in EMQX can accept JT/T 808 clients and translate their events
and messages into MQTT Publish messages.
In the current implementation, it has the following limitations:
- Only supports JT/T 808-2013 protocol, JT/T 808-2019 is not supported yet.
- Based TCP/TLS transport.
- Third-party authentication/registration http service required.
## Quick Start
In EMQX 5.0, JT/T 808 gateway can be configured and enabled through the Dashboard.
It can also be enabled via the HTTP API, and emqx.conf e.g, In emqx.conf:
```
```
> Note:
> Configuring the gateway via emqx.conf requires changes on a per-node basis,
> but configuring it via Dashboard or the HTTP API will take effect across the cluster.
More documentations: [JT/T 808 Gateway](https://www.emqx.io/docs/en/v5.0/gateway/jt808.html)

View File

@ -0,0 +1,195 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-ifndef(EMQX_JT808_HRL).
-define(EMQX_JT808_HRL, true).
%%--------------------------------------------------------------------
%% Message Ids
%%--------------------------------------------------------------------
%% Message Ids of client to server
-define(MC_GENERAL_RESPONSE, 16#0001).
-define(MC_HEARTBEAT, 16#0002).
-define(MC_REGISTER, 16#0100).
-define(MC_DEREGISTER, 16#0003).
-define(MC_AUTH, 16#0102).
-define(MC_QUERY_PARAM_ACK, 16#0104).
-define(MC_QUERY_ATTRIB_ACK, 16#0107).
-define(MC_OTA_ACK, 16#0108).
-define(MC_LOCATION_REPORT, 16#0200).
-define(MC_QUERY_LOCATION_ACK, 16#0201).
-define(MC_EVENT_REPORT, 16#0301).
-define(MC_QUESTION_ACK, 16#0302).
-define(MC_INFO_REQ_CANCEL, 16#0303).
-define(MC_VEHICLE_CTRL_ACK, 16#0500).
-define(MC_DRIVE_RECORD_REPORT, 16#0700).
-define(MC_WAYBILL_REPORT, 16#0701).
-define(MC_DRIVER_ID_REPORT, 16#0702).
-define(MC_BULK_LOCATION_REPORT, 16#0704).
-define(MC_CAN_BUS_REPORT, 16#0705).
-define(MC_MULTIMEDIA_EVENT_REPORT, 16#0800).
-define(MC_MULTIMEDIA_DATA_REPORT, 16#0801).
-define(MC_CAMERA_SHOT_ACK, 16#0805).
-define(MC_MM_DATA_SEARCH_ACK, 16#0802).
-define(MC_SEND_TRANSPARENT_DATA, 16#0900).
-define(MC_SEND_ZIP_DATA, 16#0901).
-define(MC_RSA_KEY, 16#0A00).
%% Message Ids of server to client
-define(MS_GENERAL_RESPONSE, 16#8001).
-define(MS_REQUEST_FRAGMENT, 16#8003).
-define(MS_REGISTER_ACK, 16#8100).
-define(MS_SET_CLIENT_PARAM, 16#8103).
-define(MS_QUERY_CLIENT_ALL_PARAM, 16#8104).
-define(MS_CLIENT_CONTROL, 16#8105).
-define(MS_QUERY_CLIENT_PARAM, 16#8106).
-define(MS_QUERY_CLIENT_ATTRIB, 16#8107).
-define(MS_OTA, 16#8108).
-define(MS_QUERY_LOCATION, 16#8201).
-define(MS_TRACE_LOCATION, 16#8202).
-define(MS_CONFIRM_ALARM, 16#8203).
-define(MS_SEND_TEXT, 16#8300).
-define(MS_SET_EVENT, 16#8301).
-define(MS_SEND_QUESTION, 16#8302).
-define(MS_SET_MENU, 16#8303).
-define(MS_INFO_CONTENT, 16#8304).
-define(MS_PHONE_CALLBACK, 16#8400).
-define(MS_SET_PHONE_NUMBER, 16#8401).
-define(MS_VEHICLE_CONTROL, 16#8500).
-define(MS_SET_CIRCLE_AREA, 16#8600).
-define(MS_DEL_CIRCLE_AREA, 16#8601).
-define(MS_SET_RECT_AREA, 16#8602).
-define(MS_DEL_RECT_AREA, 16#8603).
-define(MS_SET_POLY_AREA, 16#8604).
-define(MS_DEL_POLY_AREA, 16#8605).
-define(MS_SET_PATH, 16#8606).
-define(MS_DEL_PATH, 16#8607).
-define(MS_DRIVE_RECORD_CAPTURE, 16#8700).
-define(MS_DRIVE_REC_PARAM_SEND, 16#8701).
-define(MS_REQ_DRIVER_ID, 16#8702).
-define(MS_MULTIMEDIA_DATA_ACK, 16#8800).
-define(MS_CAMERA_SHOT, 16#8801).
-define(MS_MM_DATA_SEARCH, 16#8802).
-define(MS_MM_DATA_UPLOAD, 16#8803).
-define(MS_VOICE_RECORD, 16#8804).
-define(MS_SINGLE_MM_DATA_CTRL, 16#8805).
-define(MS_SEND_TRANSPARENT_DATA, 16#8900).
-define(MS_RSA_KEY, 16#8A00).
%% Client Params
-define(CP_HEARTBEAT_DURATION, 16#0001).
-define(CP_TCP_TIMEOUT, 16#0002).
-define(CP_TCP_RETX, 16#0003).
-define(CP_UDP_TIMEOUT, 16#0004).
-define(CP_UDP_RETX, 16#0005).
-define(CP_SMS_TIMEOUT, 16#0006).
-define(CP_SMS_RETX, 16#0007).
-define(CP_SERVER_APN, 16#0010).
-define(CP_DIAL_USERNAME, 16#0011).
-define(CP_DIAL_PASSWORD, 16#0012).
-define(CP_SERVER_ADDRESS, 16#0013).
-define(CP_BACKUP_SERVER_APN, 16#0014).
-define(CP_BACKUP_DIAL_USERNAME, 16#0015).
-define(CP_BACKUP_DIAL_PASSWORD, 16#0016).
-define(CP_BACKUP_SERVER_ADDRESS, 16#0017).
-define(CP_SERVER_TCP_PORT, 16#0018).
-define(CP_SERVER_UDP_PORT, 16#0019).
-define(CP_IC_CARD_SERVER_ADDRESS, 16#001A).
-define(CP_IC_CARD_SERVER_TCP_PORT, 16#001B).
-define(CP_IC_CARD_SERVER_UDP_PORT, 16#001C).
-define(CP_IC_CARD_BACKUP_SERVER_ADDRESS, 16#001D).
-define(CP_POS_REPORT_POLICY, 16#0020).
-define(CP_POS_REPORT_CONTROL, 16#0021).
-define(CP_DRIVER_NLOGIN_REPORT_INTERVAL, 16#0022).
-define(CP_REPORT_INTERVAL_DURING_SLEEP, 16#0027).
-define(CP_EMERGENCY_ALARM_REPORT_INTERVAL, 16#0028).
-define(CP_DEFAULT_REPORT_INTERVAL, 16#0029).
-define(CP_DEFAULT_DISTANCE_INTERVAL, 16#002C).
-define(CP_DRIVER_NLOGIN_DISTANCE_INTERVAL, 16#002D).
-define(CP_DISTANCE_INTERVAL_DURING_SLEEP, 16#002E).
-define(CP_EMERGENCY_ALARM_DISTANCE_INTERVAL, 16#002F).
-define(CP_SET_TURN_ANGLE, 16#0030).
-define(CP_EFENCE_RADIUS, 16#0031).
-define(CP_MONITOR_PHONE, 16#0040).
-define(CP_RESETING_PHONE, 16#0041).
-define(CP_RECOVERY_PHONE, 16#0042).
-define(CP_SMS_MONITOR_PHONE, 16#0043).
-define(CP_EMERGENCY_SMS_PHONE, 16#0044).
-define(CP_ACCEPT_CALL_POLICY, 16#0045).
-define(CP_MAX_CALL_DURATION, 16#0046).
-define(CP_MAX_CALL_DURATION_OF_MONTH, 16#0047).
-define(CP_SPY_PHONE, 16#0048).
-define(CP_PRIVILEGE_SMS_PHONE, 16#0049).
-define(CP_ALARM_MASK, 16#0050).
-define(CP_ALARM_SEND_SMS_MASK, 16#0051).
-define(CP_ALARM_CAMERA_SHOT_MASK, 16#0052).
-define(CP_ALARM_PICTURE_SAVE_MASK, 16#0053).
-define(CP_ALARM_KEY_MASK, 16#0054).
-define(CP_MAX_SPEED, 16#0055).
-define(CP_OVERSPEED_ELAPSED, 16#0056).
-define(CP_CONT_DRIVE_THRESHOLD, 16#0057).
-define(CP_ACC_DRIVE_TIME_ONE_DAY_THRESHOLD, 16#0058).
-define(CP_MIN_BREAK_TIME, 16#0059).
-define(CP_MAX_PARK_TIME, 16#005A).
-define(CP_OVERSPEED_ALARM_DELTA, 16#005B).
-define(CP_DRIVER_FATIGUE_ALARM_DELTA, 16#005C).
-define(CP_SET_CRASH_ALARM_PARAM, 16#005D).
-define(CP_SET_ROLLOVER_PARAM, 16#005E).
-define(CP_TIME_CONTROLED_CAMERA, 16#0064).
-define(CP_DISTANCE_CONTROLED_CAMERA, 16#0065).
-define(CP_PICTURE_QUALITY, 16#0070).
-define(CP_PICTURE_BRIGHTNESS, 16#0071).
-define(CP_PICTURE_CONTRAST, 16#0072).
-define(CP_PICTURE_SATURATE, 16#0073).
-define(CP_PICTURE_CHROMATICITY, 16#0074).
-define(CP_ODOMETER, 16#0080).
-define(CP_REGISTERED_PROVINCE, 16#0081).
-define(CP_REGISTERED_CITY, 16#0082).
-define(CP_VEHICLE_LICENSE_NUMBER, 16#0083).
-define(CP_VEHICLE_LICENSE_PLATE_COLOR, 16#0084).
-define(CP_GNSS_MODE, 16#0090).
-define(CP_GNSS_BAUDRATE, 16#0091).
-define(CP_GNSS_OUTPUT_RATE, 16#0092).
-define(CP_GNSS_SAMPLING_RATE, 16#0093).
-define(CP_GNSS_UPLOAD_MODE, 16#0094).
-define(CP_GNSS_UPLOAD_UNIT, 16#0095).
-define(CP_CAN_BUS_CH1_SAMPLING, 16#0100).
-define(CP_CAN_BUS_CH1_UPLOAD, 16#0101).
-define(CP_CAN_BUS_CH2_SAMPLING, 16#0102).
-define(CP_CAN_BUS_CH2_UPLOAD, 16#0103).
-define(CP_SET_CAN_BUS_ID_PARAM, 16#0110).
%% Extra info types in Position Report
-define(CP_POS_EXTRA_MILEAGE, 16#01).
-define(CP_POS_EXTRA_FUEL_METER, 16#02).
-define(CP_POS_EXTRA_SPEED, 16#03).
-define(CP_POS_EXTRA_ALARM_ID, 16#04).
-define(CP_POS_EXTRA_OVERSPEED_ALARM, 16#11).
-define(CP_POS_EXTRA_IN_OUT_ALARM, 16#12).
-define(CP_POS_EXTRA_PATH_TIME_ALARM, 16#13).
-define(CP_POS_EXTRA_EXPANDED_SIGNAL, 16#25).
-define(CP_POS_EXTRA_IO_STATUS, 16#2A).
-define(CP_POS_EXTRA_ANALOG, 16#2B).
-define(CP_POS_EXTRA_RSSI, 16#30).
-define(CP_POS_EXTRA_GNSS_SAT_NUM, 16#31).
-define(CP_POS_EXTRA_CUSTOME, 16#E0).
%% Default Configs
-define(DEFAULT_MOUNTPOINT, <<"jt808/${clientid}/">>).
-define(DEFAULT_UP_TOPIC, <<?DEFAULT_MOUNTPOINT/binary, "${phone}/up">>).
-define(DEFAULT_DN_TOPIC, <<?DEFAULT_MOUNTPOINT/binary, "${phone}/dn">>).
%% Supported placeholders
-define(PH_CLIENTID, <<"${clientid}">>).
-define(PH_PHONE, <<"${phone}">>).
-record(auth, {
allow_anonymous :: boolean(),
registry :: emqx_schema:url(),
authentication :: emqx_schema:url()
}).
-type auth() :: #auth{}.
-endif.

View File

@ -0,0 +1,7 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [
{emqx, {path, "../../apps/emqx"}},
{emqx_utils, {path, "../emqx_utils"}},
{emqx_gateway, {path, "../../apps/emqx_gateway"}}
]}.

View File

@ -0,0 +1,11 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_jt808, [
{description, "JT/T 808 Gateway"},
{vsn, "0.0.1"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []},
{modules, []},
{licenses, ["BSL"]},
{links, []}
]}.

View File

@ -0,0 +1,99 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% @doc The JT/T 808 Gateway implement
-module(emqx_gateway_jt808).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/include/emqx_gateway.hrl").
%% define a gateway named jt808
-gateway(#{
name => jt808,
callback_module => ?MODULE,
config_schema_module => emqx_jt808_schema,
edition => ee
}).
%% callback_module must implement the emqx_gateway_impl behaviour
-behaviour(emqx_gateway_impl).
%% callback for emqx_gateway_impl
-export([
on_gateway_load/2,
on_gateway_update/3,
on_gateway_unload/2
]).
-import(
emqx_gateway_utils,
[
normalize_config/1,
start_listeners/4,
stop_listeners/2
]
).
%%--------------------------------------------------------------------
%% emqx_gateway_impl callbacks
%%--------------------------------------------------------------------
on_gateway_load(
_Gateway = #{
name := GwName,
config := Config
},
Ctx
) ->
Listeners = normalize_config(Config),
ModCfg = #{
frame_mod => emqx_jt808_frame,
chann_mod => emqx_jt808_channel
},
case
start_listeners(
Listeners, GwName, Ctx, ModCfg
)
of
{ok, ListenerPids} ->
%% FIXME: How to throw an exception to interrupt the restart logic ?
%% FIXME: Assign ctx to GwState
{ok, ListenerPids, _GwState = #{ctx => Ctx}};
{error, {Reason, Listener}} ->
throw(
{badconf, #{
key => listeners,
value => Listener,
reason => Reason
}}
)
end.
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
GwName = maps:get(name, Gateway),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old state???
on_gateway_unload(Gateway, GwState),
on_gateway_load(Gateway#{config => Config}, Ctx)
catch
Class:Reason:Stk ->
logger:error(
"Failed to update ~ts; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[GwName, Class, Reason, Stk]
),
{error, Reason}
end.
on_gateway_unload(
_Gateway = #{
name := GwName,
config := Config
},
_GwState
) ->
Listeners = normalize_config(Config),
stop_listeners(GwName, Listeners).

View File

@ -0,0 +1,101 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_jt808_auth).
-include("emqx_jt808.hrl").
-export([
init/1,
register/2,
authenticate/2
]).
-export_type([auth/0]).
init(#{allow_anonymous := true}) ->
#auth{registry = undefined, authentication = undefined, allow_anonymous = true};
init(#{registry := Reg, authentication := Auth, allow_anonymous := Anonymous}) ->
#auth{registry = Reg, authentication = Auth, allow_anonymous = Anonymous}.
register(_RegFrame, #auth{registry = undefined, allow_anonymous = true}) ->
{ok, anonymous};
register(_RegFrame, #auth{registry = undefined, allow_anonymous = false}) ->
{error, registry_server_not_existed};
register(RegFrame, #auth{registry = RegUrl}) ->
#{
<<"header">> := #{<<"phone">> := Phone},
<<"body">> := FBody
} = RegFrame,
Params = maps:merge(FBody, #{<<"phone">> => Phone}),
case request(RegUrl, Params) of
{ok, 200, Body} ->
case emqx_utils_json:safe_decode(Body, [return_maps]) of
{ok, #{<<"code">> := 0, <<"authcode">> := Authcode}} ->
{ok, Authcode};
{ok, #{<<"code">> := Code}} ->
{error, Code};
_ ->
{error, {invailed_resp, Body}}
end;
{ok, Code, Body} ->
{error, {unknown_resp, Code, Body}};
{error, Reason} ->
{error, Reason}
end.
authenticate(_AuthFrame, #auth{authentication = undefined, allow_anonymous = true}) ->
{ok, #{auth_result => true, anonymous => true}};
authenticate(_AuthFrame, #auth{authentication = undefined, allow_anonymous = false}) ->
{ok, #{auth_result => false, anonymous => false}};
authenticate(AuthFrame, #auth{authentication = AuthUrl}) ->
#{
<<"header">> := #{<<"phone">> := Phone},
<<"body">> := #{<<"code">> := AuthCode}
} = AuthFrame,
case request(AuthUrl, #{<<"code">> => AuthCode, <<"phone">> => Phone}) of
{ok, 200, _} ->
{ok, #{auth_result => true, anonymous => false}};
{ok, _, _} ->
{ok, #{auth_result => false, anonymous => false}};
{error, Reason} ->
{error, Reason}
end.
%%--------------------------------------------------------------------
%% Inernal functions
%%--------------------------------------------------------------------
request(Url, Params) ->
RetryOpts = #{times => 3, interval => 1000, backoff => 2.0},
Req = {Url, [], "application/json", emqx_utils_json:encode(Params)},
reply(request_(post, Req, [{autoredirect, true}], [{body_format, binary}], RetryOpts)).
request_(
Method,
Req,
HTTPOpts,
Opts,
RetryOpts = #{
times := Times,
interval := Interval,
backoff := BackOff
}
) ->
case httpc:request(Method, Req, HTTPOpts, Opts) of
{error, _Reason} when Times > 0 ->
timer:sleep(trunc(Interval)),
RetryOpts1 = RetryOpts#{
times := Times - 1,
interval := Interval * BackOff
},
request_(Method, Req, HTTPOpts, Opts, RetryOpts1);
Other ->
Other
end.
reply({ok, {{_, Code, _}, _Headers, Body}}) ->
{ok, Code, Body};
reply({error, Error}) ->
{error, Error}.

View File

@ -0,0 +1,951 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_jt808_channel).
-behaviour(emqx_gateway_channel).
-include("emqx_jt808.hrl").
-include_lib("emqx/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
%% behaviour callbacks
-export([
info/1,
info/2,
stats/1
]).
-export([
init/2,
handle_in/2,
handle_deliver/2,
handle_timeout/3,
handle_call/3,
handle_cast/2,
handle_info/2
]).
-export([
terminate/2
]).
-record(channel, {
%% Context
ctx :: emqx_gateway_ctx:context(),
%% ConnInfo
conninfo :: emqx_types:conninfo(),
%% ClientInfo
clientinfo :: emqx_types:clientinfo(),
%% Session
session :: undefined | map(),
%% Conn State
conn_state :: conn_state(),
%% Timers
timers :: #{atom() => undefined | disabled | reference()},
%% AuthCode
authcode :: undefined | anonymous | binary(),
%% Keepalive
keepalive,
%% Msg SN
msg_sn,
%% Down Topic
dn_topic,
%% Up Topic
up_topic,
%% Auth
auth :: emqx_jt808_auth:auth(),
%% Inflight
inflight :: emqx_inflight:inflight(),
mqueue :: queue:queue(),
max_mqueue_len,
rsa_key,
retx_interval,
retx_max_times
}).
-type conn_state() :: idle | connecting | connected | disconnected.
-type channel() :: #channel{}.
-type reply() ::
{outgoing, emqx_types:packet()}
| {outgoing, [emqx_types:packet()]}
| {event, conn_state() | updated}
| {close, Reason :: atom()}.
-type replies() :: reply() | [reply()].
-define(TIMER_TABLE, #{
alive_timer => keepalive,
retry_timer => retry_delivery
}).
-define(INFO_KEYS, [ctx, conninfo, zone, clientid, clientinfo, session, conn_state, authcode]).
-define(RETX_INTERVAL, 8000).
-define(RETX_MAX_TIME, 5).
-define(DEFAULT_KEEPALIVE, 300).
-define(MSG(MsgId), #{<<"header">> := #{<<"msg_id">> := MsgId}}).
-dialyzer({nowarn_function, init/2}).
%%--------------------------------------------------------------------
%% Info, Attrs and Caps
%%--------------------------------------------------------------------
%% @doc Get infos of the channel.
-spec info(channel()) -> emqx_types:infos().
info(Channel) ->
maps:from_list(info(?INFO_KEYS, Channel)).
-spec info(list(atom()) | atom(), channel()) -> term().
info(Keys, Channel) when is_list(Keys) ->
[{Key, info(Key, Channel)} || Key <- Keys];
info(ctx, #channel{ctx = Ctx}) ->
Ctx;
info(conninfo, #channel{conninfo = ConnInfo}) ->
ConnInfo;
info(zone, #channel{clientinfo = #{zone := Zone}}) ->
Zone;
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
ClientId;
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo;
info(session, _) ->
#{};
info(conn_state, #channel{conn_state = ConnState}) ->
ConnState;
info(authcode, #channel{authcode = AuthCode}) ->
AuthCode.
stats(_Channel) ->
[].
%%--------------------------------------------------------------------
%% Init the Channel
%%--------------------------------------------------------------------
-spec init(emqx_types:conninfo(), map()) -> channel().
init(
ConnInfo = #{
peername := {PeerHost, _Port},
sockname := {_Host, SockPort}
},
Options = #{
ctx := Ctx,
message_queue_len := MessageQueueLen,
proto := ProtoConf
}
) ->
% TODO: init rsa_key from user input
Peercert = maps:get(peercert, ConnInfo, undefined),
Mountpoint = maps:get(mountpoint, Options, ?DEFAULT_MOUNTPOINT),
ListenerId =
case maps:get(listener, Options, undefined) of
undefined -> undefined;
{GwName, Type, LisName} -> emqx_gateway_utils:listener_id(GwName, Type, LisName)
end,
ClientInfo = setting_peercert_infos(
Peercert,
#{
zone => default,
listener => ListenerId,
protocol => jt808,
peerhost => PeerHost,
sockport => SockPort,
clientid => undefined,
username => undefined,
is_bridge => false,
is_superuser => false,
mountpoint => Mountpoint
}
),
#channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo,
session = undefined,
conn_state = idle,
timers = #{},
authcode = undefined,
keepalive = maps:get(keepalive, Options, ?DEFAULT_KEEPALIVE),
msg_sn = 0,
% TODO: init rsa_key from user input
dn_topic = maps:get(dn_topic, ProtoConf, ?DEFAULT_DN_TOPIC),
up_topic = maps:get(up_topic, ProtoConf, ?DEFAULT_UP_TOPIC),
auth = emqx_jt808_auth:init(ProtoConf),
inflight = emqx_inflight:new(128),
mqueue = queue:new(),
max_mqueue_len = MessageQueueLen,
rsa_key = [0, <<0:1024>>],
retx_interval = maps:get(retry_interval, Options, ?RETX_INTERVAL),
retx_max_times = maps:get(max_retry_times, Options, ?RETX_MAX_TIME)
}.
setting_peercert_infos(NoSSL, ClientInfo) when
NoSSL =:= nossl;
NoSSL =:= undefined
->
ClientInfo;
setting_peercert_infos(Peercert, ClientInfo) ->
DN = esockd_peercert:subject(Peercert),
CN = esockd_peercert:common_name(Peercert),
ClientInfo#{dn => DN, cn => CN}.
%%--------------------------------------------------------------------
%% Handle incoming packet
%%--------------------------------------------------------------------
-spec handle_in(emqx_jt808_frame:frame() | {frame_error, any()}, channel()) ->
{ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}
| {shutdown, Reason :: term(), replies(), channel()}.
handle_in(Frame = ?MSG(MType), Channel = #channel{conn_state = ConnState}) when
ConnState /= connected, MType =:= ?MC_REGISTER;
ConnState /= connected, MType =:= ?MC_AUTH
->
?SLOG(debug, #{msg => "recv_frame", frame => Frame}),
do_handle_in(Frame, Channel#channel{conn_state = connecting});
handle_in(Frame, Channel = #channel{conn_state = connected}) ->
?SLOG(debug, #{msg => "recv_frame", frame => Frame}),
do_handle_in(Frame, Channel);
handle_in(Frame, Channel) ->
?SLOG(error, #{msg => "unexpected_frame", frame => Frame}),
{stop, unexpected_frame, Channel}.
%% @private
do_handle_in(Frame = ?MSG(?MC_GENERAL_RESPONSE), Channel = #channel{inflight = Inflight}) ->
#{<<"body">> := #{<<"seq">> := Seq, <<"id">> := Id}} = Frame,
NewInflight = ack_msg(?MC_GENERAL_RESPONSE, {Id, Seq}, Inflight),
{ok, Channel#channel{inflight = NewInflight}};
do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) ->
#{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame,
case emqx_jt808_auth:register(Frame, Channel0#channel.auth) of
{ok, Authcode} ->
Channel = enrich_clientinfo(
Frame, enrich_conninfo(Frame, Channel0#channel{authcode = Authcode})
),
handle_out({?MS_REGISTER_ACK, 0}, MsgSn, Channel);
{error, Reason} ->
?SLOG(error, #{msg => "register_failed", reason => Reason}),
ResCode =
case is_integer(Reason) of
true -> Reason;
false -> 1
end,
handle_out({?MS_REGISTER_ACK, ResCode}, MsgSn, Channel0)
end;
do_handle_in(Frame = ?MSG(?MC_AUTH), Channel0) ->
#{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame,
Channel =
#channel{clientinfo = #{clientid := ClientId}} =
enrich_clientinfo(Frame, enrich_conninfo(Frame, Channel0)),
authack(
case authenticate(Frame, Channel0) of
true ->
NChannel = prepare_adapter_topic(ensure_connected(Channel)),
emqx_logger:set_metadata_clientid(ClientId),
%% Auto subscribe downlink topics
autosubcribe(NChannel),
_ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel),
%% 0: Successful
{0, MsgSn, NChannel};
false ->
?SLOG(error, #{msg => "authenticated_failed"}),
%% 1: Failure
{1, MsgSn, Channel}
end
);
do_handle_in(Frame = ?MSG(?MC_HEARTBEAT), Channel) ->
handle_out({?MS_GENERAL_RESPONSE, 0, ?MC_HEARTBEAT}, msgsn(Frame), Channel);
do_handle_in(?MSG(?MC_RSA_KEY), Channel = #channel{rsa_key = [E, N]}) ->
Response = #{
<<"header">> => build_frame_header(?MS_RSA_KEY, Channel),
<<"body">> => #{<<"e">> => E, <<"n">> => N}
},
% TODO: how to use client's RSA key?
{ok, [{outgoing, Response}], state_inc_sn(Channel)};
do_handle_in(?MSG(?MC_MULTIMEDIA_DATA_REPORT), Channel = #channel{rsa_key = [_E, _N]}) ->
Response = #{
<<"header">> => build_frame_header(?MS_MULTIMEDIA_DATA_ACK, Channel),
<<"body">> => #{}
},
% TODO: how to fill ?
{ok, [{outgoing, Response}], state_inc_sn(Channel)};
do_handle_in(
Frame = ?MSG(?MC_DRIVER_ID_REPORT),
Channel = #channel{
up_topic = Topic,
inflight = Inflight
}
) ->
{MsgId, MsgSn} = msgidsn(Frame),
_ = do_publish(Topic, Frame),
case is_driver_id_req_exist(Channel) of
% this is an device passive command
false ->
handle_out({?MS_GENERAL_RESPONSE, 0, MsgId}, MsgSn, Channel);
% this is a response to MS_REQ_DRIVER_ID(0x8702)
true ->
{ok, Channel#channel{inflight = ack_msg(?MC_DRIVER_ID_REPORT, none, Inflight)}}
end;
do_handle_in(?MSG(?MC_DEREGISTER), Channel) ->
{stop, normal, Channel};
do_handle_in(Frame = #{}, Channel = #channel{up_topic = Topic, inflight = Inflight}) ->
{MsgId, MsgSn} = msgidsn(Frame),
_ = do_publish(Topic, Frame),
case is_general_response_needed(MsgId) of
% these frames device passive request
true ->
handle_out({?MS_GENERAL_RESPONSE, 0, MsgId}, MsgSn, Channel);
% these frames are response to server's request
false ->
{ok, Channel#channel{inflight = ack_msg(MsgId, seq(Frame), Inflight)}}
end;
do_handle_in(Frame, Channel) ->
?SLOG(error, #{msg => "ignore_unknown_frame", frame => Frame}),
{ok, Channel}.
do_publish(Topic, Frame) ->
?SLOG(debug, #{msg => "publish_msg", to_topic => Topic, farme => Frame}),
emqx:publish(emqx_message:make(jt808, ?QOS_1, Topic, emqx_utils_json:encode(Frame))).
%%--------------------------------------------------------------------
%% Handle Delivers from broker to client
%%--------------------------------------------------------------------
-spec handle_deliver(list(emqx_types:deliver()), channel()) ->
{ok, channel()}
| {ok, replies(), channel()}.
handle_deliver(
Messages0,
Channel = #channel{
clientinfo = #{mountpoint := Mountpoint},
mqueue = Queue,
max_mqueue_len = MaxQueueLen
}
) ->
Messages = lists:map(
fun({deliver, _, M}) ->
emqx_mountpoint:unmount(Mountpoint, M)
end,
Messages0
),
case MaxQueueLen - queue:len(Queue) of
N when N =< 0 ->
discard_downlink_messages(Messages, Channel),
{ok, Channel};
N ->
{NMessages, Dropped} = split_by_pos(Messages, N),
log(debug, #{msg => "enqueue_messages", messages => NMessages}, Channel),
metrics_inc('messages.delivered', Channel, erlang:length(NMessages)),
discard_downlink_messages(Dropped, Channel),
Frames = msgs2frame(NMessages, Channel),
NQueue = lists:foldl(fun(F, Q) -> queue:in(F, Q) end, Queue, Frames),
{Outgoings, NChannel} = dispatch_frame(Channel#channel{mqueue = NQueue}),
{ok, [{outgoing, Outgoings}], NChannel}
end.
split_by_pos(L, Pos) ->
split_by_pos(L, Pos, []).
split_by_pos([], _, A1) ->
{lists:reverse(A1), []};
split_by_pos(L, 0, A1) ->
{lists:reverse(A1), L};
split_by_pos([E | L], N, A1) ->
split_by_pos(L, N - 1, [E | A1]).
msgs2frame(Messages, Channel) ->
lists:filtermap(
fun(#message{payload = Payload}) ->
case emqx_utils_json:safe_decode(Payload, [return_maps]) of
{ok, Map} ->
MsgId = msgid(Map),
NewHeader = build_frame_header(MsgId, Channel),
Frame = maps:put(<<"header">>, NewHeader, Map),
{true, Frame};
{error, Reason} ->
log(error, #{msg => "json_decode_error", reason => Reason}, Channel),
false
end
end,
Messages
).
authack(
{Code, MsgSn,
Channel = #channel{
conninfo = ConnInfo,
clientinfo = ClientInfo
}}
) ->
Code == 0 andalso emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]),
handle_out({?MS_GENERAL_RESPONSE, Code, ?MC_AUTH}, MsgSn, Channel).
handle_out({?MS_GENERAL_RESPONSE, Result, InMsgId}, MsgSn, Channel) ->
Frame = #{
<<"header">> => build_frame_header(?MS_GENERAL_RESPONSE, Channel),
<<"body">> => #{<<"seq">> => MsgSn, <<"result">> => Result, <<"id">> => InMsgId}
},
{ok, [{outgoing, Frame}], state_inc_sn(Channel)};
handle_out({?MS_REGISTER_ACK, 0}, MsgSn, Channel = #channel{authcode = Authcode0}) ->
Authcode =
case Authcode0 == anonymous of
true -> <<>>;
false -> Authcode0
end,
Frame = #{
<<"header">> => build_frame_header(?MS_REGISTER_ACK, Channel),
<<"body">> => #{<<"seq">> => MsgSn, <<"result">> => 0, <<"auth_code">> => Authcode}
},
{ok, [{outgoing, Frame}], state_inc_sn(Channel)};
handle_out({?MS_REGISTER_ACK, ResCode}, MsgSn, Channel) ->
Frame = #{
<<"header">> => build_frame_header(?MS_REGISTER_ACK, Channel),
<<"body">> => #{<<"seq">> => MsgSn, <<"result">> => ResCode}
},
{ok, [{outgoing, Frame}], state_inc_sn(Channel)}.
%%--------------------------------------------------------------------
%% Handle call
%%--------------------------------------------------------------------
-spec handle_call(Req :: term(), From :: term(), channel()) ->
{reply, Reply :: term(), channel()}
| {reply, Reply :: term(), replies(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), emqx_jt808_frame:frame(), channel()}.
handle_call(kick, _From, Channel) ->
Channel1 = ensure_disconnected(kicked, Channel),
disconnect_and_shutdown(kicked, ok, Channel1);
handle_call(discard, _From, Channel) ->
disconnect_and_shutdown(discarded, ok, Channel);
handle_call(Req, _From, Channel) ->
log(error, #{msg => "unexpected_call", call => Req}, Channel),
reply(ignored, Channel).
%%--------------------------------------------------------------------
%% Handle cast
%%--------------------------------------------------------------------
-spec handle_cast(Req :: term(), channel()) ->
ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
handle_cast(_Req, Channel) ->
{ok, Channel}.
%%--------------------------------------------------------------------
%% Handle info
%%--------------------------------------------------------------------
-spec handle_info(Info :: term(), channel()) ->
ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
shutdown(Reason, Channel);
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
shutdown(Reason, Channel);
handle_info(
{sock_closed, Reason},
Channel =
#channel{
conn_state = connected
}
) ->
NChannel = ensure_disconnected(Reason, Channel),
shutdown(Reason, NChannel);
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
log(error, #{msg => "unexpected_sock_closed", reason => Reason}, Channel),
{ok, Channel};
handle_info(Info, Channel) ->
log(error, #{msg => "unexpected_info", info => Info}, Channel),
{ok, Channel}.
%%--------------------------------------------------------------------
%% Handle timeout
%%--------------------------------------------------------------------
-spec handle_timeout(reference(), Msg :: term(), channel()) ->
{ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}.
handle_timeout(
_TRef,
{keepalive, _StatVal},
Channel = #channel{keepalive = undefined}
) ->
{ok, Channel};
handle_timeout(
_TRef,
{keepalive, _StatVal},
Channel = #channel{conn_state = disconnected}
) ->
{ok, Channel};
handle_timeout(
_TRef,
{keepalive, StatVal},
Channel = #channel{keepalive = Keepalive}
) ->
case emqx_keepalive:check(StatVal, Keepalive) of
{ok, NKeepalive} ->
NChannel = Channel#channel{keepalive = NKeepalive},
{ok, reset_timer(alive_timer, NChannel)};
{error, timeout} ->
shutdown(keepalive_timeout, Channel)
end;
handle_timeout(
_TRef, retry_delivery, Channel = #channel{inflight = Inflight, retx_interval = RetxInterval}
) ->
case emqx_inflight:is_empty(Inflight) of
true ->
{ok, clean_timer(retry_timer, Channel)};
false ->
Frames = lists:sort(sortfun(), emqx_inflight:to_list(Inflight)),
{Outgoings, NInflight} = retry_delivery(
Frames, erlang:system_time(millisecond), RetxInterval, Inflight, []
),
{Outgoings2, NChannel} = dispatch_frame(Channel#channel{inflight = NInflight}),
{ok, [{outgoing, Outgoings ++ Outgoings2}], reset_timer(retry_timer, NChannel)}
end.
sortfun() ->
fun({_, {_, _, Ts1}}, {_, {_, _, Ts2}}) -> Ts1 < Ts2 end.
retry_delivery([], _Now, _Interval, Inflight, Acc) ->
{lists:reverse(Acc), Inflight};
retry_delivery([{Key, {_Frame, 0, _}} | Frames], Now, Interval, Inflight, Acc) ->
%% todo log(error, "has arrived max re-send times, drop ~p", [Frame]),
NInflight = emqx_inflight:delete(Key, Inflight),
retry_delivery(Frames, Now, Interval, NInflight, Acc);
retry_delivery([{Key, {Frame, RetxCount, Ts}} | Frames], Now, Interval, Inflight, Acc) ->
Diff = Now - Ts,
case Diff >= Interval of
true ->
NInflight = emqx_inflight:update(Key, {Frame, RetxCount - 1, Now}, Inflight),
retry_delivery(Frames, Now, Interval, NInflight, [Frame | Acc]);
_ ->
retry_delivery(Frames, Now, Interval, Inflight, Acc)
end.
dispatch_frame(
Channel = #channel{
msg_sn = TxMsgSn,
mqueue = Queue,
inflight = Inflight,
retx_max_times = RetxMax
}
) ->
case emqx_inflight:is_full(Inflight) orelse queue:is_empty(Queue) of
true ->
{[], Channel};
false ->
{{value, Frame}, NewQueue} = queue:out(Queue),
log(debug, #{msg => "delivery", frame => Frame}, Channel),
NewInflight = emqx_inflight:insert(
set_msg_ack(msgid(Frame), TxMsgSn),
{Frame, RetxMax, erlang:system_time(millisecond)},
Inflight
),
NChannel = Channel#channel{mqueue = NewQueue, inflight = NewInflight},
{[Frame], ensure_timer(retry_timer, NChannel)}
end.
%%--------------------------------------------------------------------
%% Ensure timers
%%--------------------------------------------------------------------
ensure_timer(Name, Channel = #channel{timers = Timers}) ->
TRef = maps:get(Name, Timers, undefined),
Time = interval(Name, Channel),
case TRef == undefined andalso Time > 0 of
true -> ensure_timer(Name, Time, Channel);
%% Timer disabled or exists
false -> Channel
end.
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
log(debug, #{msg => "start_timer", name => Name, time => Time}, Channel),
Msg = maps:get(Name, ?TIMER_TABLE),
TRef = emqx_utils:start_timer(Time, Msg),
Channel#channel{timers = Timers#{Name => TRef}}.
reset_timer(Name, Channel) ->
ensure_timer(Name, clean_timer(Name, Channel)).
clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive);
interval(retry_timer, #channel{retx_interval = RetxIntv}) ->
RetxIntv.
%%--------------------------------------------------------------------
%% Terminate
%%--------------------------------------------------------------------
terminate(_Reason, #channel{clientinfo = #{clientid := undefined}}) ->
ok;
terminate(_Reason, #channel{conn_state = disconnected}) ->
ok;
terminate(Reason, #channel{clientinfo = ClientInfo, conninfo = ConnInfo}) ->
?SLOG(info, #{msg => "connection_shutdown", reason => Reason}),
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, NConnInfo]).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
maybe_fix_mountpoint(ClientInfo = #{mountpoint := undefined}) ->
ClientInfo;
maybe_fix_mountpoint(ClientInfo = #{mountpoint := Mountpoint}) ->
%% TODO: Enrich the variable replacement????
%% i.e: ${ClientInfo.auth_result.productKey}
Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
ClientInfo#{mountpoint := Mountpoint1}.
ensure_connected(
Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo
}
) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{
conninfo = NConnInfo,
conn_state = connected
}.
%% Ensure disconnected
ensure_disconnected(
Reason,
Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo
}
) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
ok = run_hooks(
Ctx,
'client.disconnected',
[ClientInfo, Reason, NConnInfo]
),
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
ack_msg(MsgId, KeyParam, Inflight) ->
Key = get_msg_ack(MsgId, KeyParam),
case emqx_inflight:contain(Key, Inflight) of
true -> emqx_inflight:delete(Key, Inflight);
false -> Inflight
end.
set_msg_ack(?MS_SET_CLIENT_PARAM, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_CLIENT_PARAM, MsgSn}};
set_msg_ack(?MS_QUERY_CLIENT_ALL_PARAM, MsgSn) ->
{?MC_QUERY_PARAM_ACK, MsgSn};
set_msg_ack(?MS_QUERY_CLIENT_PARAM, MsgSn) ->
{?MC_QUERY_PARAM_ACK, MsgSn};
set_msg_ack(?MS_CLIENT_CONTROL, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_CLIENT_CONTROL, MsgSn}};
set_msg_ack(?MS_QUERY_CLIENT_ATTRIB, _MsgSn) ->
{?MC_QUERY_ATTRIB_ACK, none};
set_msg_ack(?MS_OTA, _MsgSn) ->
{?MC_OTA_ACK, none};
set_msg_ack(?MS_QUERY_LOCATION, MsgSn) ->
{?MC_QUERY_LOCATION_ACK, MsgSn};
set_msg_ack(?MS_TRACE_LOCATION, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_TRACE_LOCATION, MsgSn}};
set_msg_ack(?MS_CONFIRM_ALARM, _MsgSn) ->
% TODO: how to ack this message?
{};
set_msg_ack(?MS_SEND_TEXT, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SEND_TEXT, MsgSn}};
set_msg_ack(?MS_SET_EVENT, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_EVENT, MsgSn}};
set_msg_ack(?MS_SEND_QUESTION, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SEND_QUESTION, MsgSn}};
set_msg_ack(?MS_SET_MENU, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_MENU, MsgSn}};
set_msg_ack(?MS_INFO_CONTENT, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_INFO_CONTENT, MsgSn}};
set_msg_ack(?MS_PHONE_CALLBACK, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_PHONE_CALLBACK, MsgSn}};
set_msg_ack(?MS_SET_PHONE_NUMBER, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_PHONE_NUMBER, MsgSn}};
set_msg_ack(?MS_VEHICLE_CONTROL, MsgSn) ->
{?MC_VEHICLE_CTRL_ACK, MsgSn};
set_msg_ack(?MS_SET_CIRCLE_AREA, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_CIRCLE_AREA, MsgSn}};
set_msg_ack(?MS_DEL_CIRCLE_AREA, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_DEL_CIRCLE_AREA, MsgSn}};
set_msg_ack(?MS_SET_RECT_AREA, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_RECT_AREA, MsgSn}};
set_msg_ack(?MS_DEL_RECT_AREA, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_DEL_RECT_AREA, MsgSn}};
set_msg_ack(?MS_SET_POLY_AREA, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_POLY_AREA, MsgSn}};
set_msg_ack(?MS_DEL_POLY_AREA, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_DEL_POLY_AREA, MsgSn}};
set_msg_ack(?MS_SET_PATH, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SET_PATH, MsgSn}};
set_msg_ack(?MS_DEL_PATH, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_DEL_PATH, MsgSn}};
set_msg_ack(?MS_DRIVE_RECORD_CAPTURE, MsgSn) ->
{?MC_DRIVE_RECORD_REPORT, MsgSn};
set_msg_ack(?MS_DRIVE_REC_PARAM_SEND, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_DRIVE_REC_PARAM_SEND, MsgSn}};
set_msg_ack(?MS_REQ_DRIVER_ID, _MsgSn) ->
{?MC_DRIVER_ID_REPORT, none};
set_msg_ack(?MS_CAMERA_SHOT, MsgSn) ->
% TODO: spec has two conflicted statement about this ack
% section 7.9.3 requires general ack
% section 8.55 requires 0x0805
{?MC_CAMERA_SHOT_ACK, MsgSn};
set_msg_ack(?MS_MM_DATA_SEARCH, MsgSn) ->
{?MC_MM_DATA_SEARCH_ACK, MsgSn};
set_msg_ack(?MS_MM_DATA_UPLOAD, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_MM_DATA_UPLOAD, MsgSn}};
set_msg_ack(?MS_VOICE_RECORD, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_VOICE_RECORD, MsgSn}};
set_msg_ack(?MS_SINGLE_MM_DATA_CTRL, MsgSn) ->
% TODO: right?
{?MC_MM_DATA_SEARCH_ACK, MsgSn};
set_msg_ack(?MS_SEND_TRANSPARENT_DATA, MsgSn) ->
{?MC_GENERAL_RESPONSE, {?MS_SEND_TRANSPARENT_DATA, MsgSn}};
set_msg_ack(MsgId, Param) ->
error({invalid_message_type, MsgId, Param}).
get_msg_ack(?MC_GENERAL_RESPONSE, {MsgId, MsgSn}) ->
{?MC_GENERAL_RESPONSE, {MsgId, MsgSn}};
get_msg_ack(?MC_QUERY_PARAM_ACK, MsgSn) ->
{?MC_QUERY_PARAM_ACK, MsgSn};
get_msg_ack(?MC_QUERY_ATTRIB_ACK, _MsgSn) ->
{?MC_QUERY_ATTRIB_ACK, none};
get_msg_ack(?MC_OTA_ACK, _MsgSn) ->
{?MC_OTA_ACK, none};
get_msg_ack(?MC_QUERY_LOCATION_ACK, MsgSn) ->
{?MC_QUERY_LOCATION_ACK, MsgSn};
get_msg_ack(?MC_QUESTION_ACK, MsgSn) ->
{?MC_QUESTION_ACK, MsgSn};
get_msg_ack(?MC_VEHICLE_CTRL_ACK, MsgSn) ->
{?MC_VEHICLE_CTRL_ACK, MsgSn};
get_msg_ack(?MC_DRIVE_RECORD_REPORT, MsgSn) ->
{?MC_DRIVE_RECORD_REPORT, MsgSn};
get_msg_ack(?MC_CAMERA_SHOT_ACK, MsgSn) ->
{?MC_CAMERA_SHOT_ACK, MsgSn};
get_msg_ack(?MC_MM_DATA_SEARCH_ACK, MsgSn) ->
{?MC_MM_DATA_SEARCH_ACK, MsgSn};
get_msg_ack(?MC_DRIVER_ID_REPORT, _MsgSn) ->
{?MC_DRIVER_ID_REPORT, none};
get_msg_ack(MsgId, MsgSn) ->
error({invalid_message_type, MsgId, MsgSn}).
build_frame_header(MsgId, #channel{clientinfo = #{phone := Phone}, msg_sn = TxMsgSn}) ->
build_frame_header(MsgId, 0, Phone, TxMsgSn).
build_frame_header(MsgId, Encrypt, Phone, TxMsgSn) ->
#{
<<"msg_id">> => MsgId,
<<"encrypt">> => Encrypt,
<<"phone">> => Phone,
<<"msg_sn">> => TxMsgSn
}.
seq(#{<<"body">> := #{<<"seq">> := MsgSn}}) -> MsgSn;
seq(#{}) -> 0.
msgsn(#{<<"header">> := #{<<"msg_sn">> := MsgSn}}) -> MsgSn.
msgid(#{<<"header">> := #{<<"msg_id">> := MsgId}}) -> MsgId.
msgidsn(#{
<<"header">> := #{
<<"msg_id">> := MsgId,
<<"msg_sn">> := MsgSn
}
}) ->
{MsgId, MsgSn}.
state_inc_sn(Channel = #channel{msg_sn = Sn}) ->
Channel#channel{msg_sn = next_msg_sn(Sn)}.
next_msg_sn(16#FFFF) -> 0;
next_msg_sn(Sn) -> Sn + 1.
is_general_response_needed(?MC_EVENT_REPORT) -> true;
is_general_response_needed(?MC_LOCATION_REPORT) -> true;
is_general_response_needed(?MC_INFO_REQ_CANCEL) -> true;
is_general_response_needed(?MC_WAYBILL_REPORT) -> true;
is_general_response_needed(?MC_BULK_LOCATION_REPORT) -> true;
is_general_response_needed(?MC_CAN_BUS_REPORT) -> true;
is_general_response_needed(?MC_MULTIMEDIA_EVENT_REPORT) -> true;
is_general_response_needed(?MC_SEND_TRANSPARENT_DATA) -> true;
is_general_response_needed(?MC_SEND_ZIP_DATA) -> true;
is_general_response_needed(_) -> false.
is_driver_id_req_exist(#channel{inflight = Inflight}) ->
% if there is a MS_REQ_DRIVER_ID (0x8702) command in re-tx queue
Key = get_msg_ack(?MC_DRIVER_ID_REPORT, none),
emqx_inflight:contain(Key, Inflight).
authenticate(_AuthFrame, #channel{authcode = anonymous}) ->
true;
authenticate(AuthFrame, #channel{authcode = undefined, auth = Auth}) ->
%% Try request authentication server
case emqx_jt808_auth:authenticate(AuthFrame, Auth) of
{ok, #{auth_result := IsAuth}} ->
IsAuth;
{error, Reason} ->
?SLOG(error, #{msg => "request_auth_server_failed", reason => Reason}),
false
end;
authenticate(
#{<<"body">> := #{<<"code">> := InCode}},
#channel{authcode = Authcode}
) ->
InCode == Authcode.
enrich_conninfo(
#{<<"header">> := #{<<"phone">> := Phone}},
Channel = #channel{conninfo = ConnInfo}
) ->
NConnInfo = ConnInfo#{
proto_name => <<"jt808">>,
proto_ver => <<"2013">>,
clean_start => true,
clientid => Phone,
username => undefined,
conn_props => #{},
connected => true,
connected_at => erlang:system_time(millisecond),
keepalive => ?DEFAULT_KEEPALIVE,
receive_maximum => 0,
expiry_interval => 0
},
Channel#channel{conninfo = NConnInfo}.
%% Register
enrich_clientinfo(
#{
<<"header">> := #{<<"phone">> := Phone},
<<"body">> := #{
<<"manufacturer">> := Manu,
<<"dev_id">> := DevId
}
},
Channel = #channel{clientinfo = ClientInfo}
) ->
NClientInfo = maybe_fix_mountpoint(ClientInfo#{
phone => Phone,
clientid => Phone,
manufacturer => Manu,
terminal_id => DevId
}),
Channel#channel{clientinfo = NClientInfo};
%% Auth
enrich_clientinfo(
#{<<"header">> := #{<<"phone">> := Phone}},
Channel = #channel{clientinfo = ClientInfo}
) ->
NClientInfo = ClientInfo#{
phone => Phone,
clientid => Phone
},
Channel#channel{clientinfo = NClientInfo}.
prepare_adapter_topic(Channel = #channel{up_topic = UpTopic, dn_topic = DnTopic}) ->
Channel#channel{
up_topic = replvar(UpTopic, Channel),
dn_topic = replvar(DnTopic, Channel)
}.
replvar(undefined, _Channel) ->
undefined;
replvar(Topic, #channel{clientinfo = #{clientid := ClientId, phone := Phone}}) when
is_binary(Topic)
->
do_replvar(Topic, #{clientid => ClientId, phone => Phone}).
do_replvar(Topic, Vars) ->
ClientID = maps:get(clientid, Vars, undefined),
Phone = maps:get(phone, Vars, undefined),
List = [
{?PH_CLIENTID, ClientID},
{?PH_PHONE, Phone}
],
lists:foldl(fun feed_var/2, Topic, List).
feed_var({_PH, undefined}, Topic) ->
Topic;
feed_var({PH, Value}, Topic) ->
emqx_topic:feed_var(PH, Value, Topic).
autosubcribe(#channel{dn_topic = Topic}) when
Topic == undefined;
Topic == ""
->
ok;
autosubcribe(#channel{
clientinfo =
ClientInfo =
#{clientid := ClientId},
dn_topic = Topic
}) ->
SubOpts = #{rap => 0, nl => 0, qos => 0, rh => 0},
emqx:subscribe(Topic, ClientId, SubOpts),
ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts#{is_new => true}]).
start_keepalive(Secs, _Channel) when Secs > 0 ->
self() ! {keepalive, start, round(Secs) * 1000}.
run_hooks(Ctx, Name, Args) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name),
emqx_hooks:run(Name, Args).
discard_downlink_messages([], _Channel) ->
ok;
discard_downlink_messages(Messages, Channel) ->
log(
error,
#{
msg => "discard_new_downlink_messages",
reason =>
"Discard new downlink messages due to that too"
" many messages are waiting their ACKs.",
messages => Messages
},
Channel
),
metrics_inc('delivery.dropped', Channel, erlang:length(Messages)).
metrics_inc(Name, #channel{ctx = Ctx}, Oct) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name, Oct).
log(Level, Meta, #channel{clientinfo = #{clientid := ClientId, username := Username}} = _Channel) ->
?SLOG(Level, Meta#{clientid => ClientId, username => Username}).
reply(Reply, Channel) ->
{reply, Reply, Channel}.
shutdown(Reason, Channel) ->
{shutdown, Reason, Channel}.
shutdown(Reason, Reply, Channel) ->
{shutdown, Reason, Reply, Channel}.
disconnect_and_shutdown(Reason, Reply, Channel) ->
shutdown(Reason, Reply, Channel).

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,119 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_jt808_schema).
-include("emqx_jt808.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-export([fields/1, desc/1]).
-define(NOT_EMPTY(MSG), emqx_resource_validator:not_empty(MSG)).
fields(jt808) ->
[
{frame, sc(ref(jt808_frame))},
{proto, sc(ref(jt808_proto))},
{mountpoint, emqx_gateway_schema:mountpoint(?DEFAULT_MOUNTPOINT)},
{retry_interval,
sc(
emqx_schema:duration_ms(),
#{
default => <<"8s">>,
desc => ?DESC(retry_interval)
}
)},
{max_retry_times,
sc(
non_neg_integer(),
#{
default => 3,
desc => ?DESC(max_retry_times)
}
)},
{message_queue_len,
sc(
non_neg_integer(),
#{
default => 10,
desc => ?DESC(message_queue_len)
}
)},
{listeners, sc(ref(emqx_gateway_schema, tcp_listeners), #{desc => ?DESC(tcp_listeners)})}
] ++ emqx_gateway_schema:gateway_common_options();
fields(jt808_frame) ->
[
{max_length, fun jt808_frame_max_length/1}
];
fields(jt808_proto) ->
[
{allow_anonymous, fun allow_anonymous/1},
{registry, fun registry_url/1},
{authentication, fun authentication_url/1},
{up_topic, fun up_topic/1},
{dn_topic, fun dn_topic/1}
].
jt808_frame_max_length(type) -> non_neg_integer();
jt808_frame_max_length(desc) -> ?DESC(?FUNCTION_NAME);
jt808_frame_max_length(default) -> 8192;
jt808_frame_max_length(required) -> false;
jt808_frame_max_length(_) -> undefined.
allow_anonymous(type) -> boolean();
allow_anonymous(desc) -> ?DESC(?FUNCTION_NAME);
allow_anonymous(default) -> true;
allow_anonymous(required) -> false;
allow_anonymous(_) -> undefined.
registry_url(type) -> binary();
registry_url(desc) -> ?DESC(?FUNCTION_NAME);
registry_url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")];
registry_url(required) -> false;
registry_url(_) -> undefined.
authentication_url(type) -> binary();
authentication_url(desc) -> ?DESC(?FUNCTION_NAME);
authentication_url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")];
authentication_url(required) -> false;
authentication_url(_) -> undefined.
up_topic(type) -> binary();
up_topic(desc) -> ?DESC(?FUNCTION_NAME);
up_topic(default) -> ?DEFAULT_UP_TOPIC;
up_topic(validator) -> [?NOT_EMPTY("the value of the field 'up_topic' cannot be empty")];
up_topic(required) -> true;
up_topic(_) -> undefined.
dn_topic(type) -> binary();
dn_topic(desc) -> ?DESC(?FUNCTION_NAME);
dn_topic(default) -> ?DEFAULT_DN_TOPIC;
dn_topic(validator) -> [?NOT_EMPTY("the value of the field 'dn_topic' cannot be empty")];
dn_topic(required) -> true;
dn_topic(_) -> undefined.
desc(jt808) ->
"The JT/T 808 protocol gateway provides EMQX with the ability to access JT/T 808 protocol devices.";
desc(jt808_frame) ->
"Limits for the JT/T 808 frames.";
desc(jt808_proto) ->
"The JT/T 808 protocol options.";
desc(_) ->
undefined.
%%--------------------------------------------------------------------
%% internal functions
sc(Type) ->
sc(Type, #{}).
sc(Type, Meta) ->
hoconsc:mk(Type, Meta).
ref(StructName) ->
ref(?MODULE, StructName).
ref(Mod, Field) ->
hoconsc:ref(Mod, Field).

View File

@ -128,6 +128,7 @@
emqx_audit,
emqx_gateway_gbt32960,
emqx_gateway_ocpp,
emqx_gateway_jt808,
emqx_bridge_syskeeper
],
%% must always be of type `load'

View File

@ -218,6 +218,7 @@ defmodule EMQXUmbrella.MixProject do
:emqx_audit,
:emqx_gateway_gbt32960,
:emqx_gateway_ocpp,
:emqx_gateway_jt808,
:emqx_bridge_syskeeper
])
end

View File

@ -113,6 +113,7 @@ is_community_umbrella_app("apps/emqx_dashboard_sso") -> false;
is_community_umbrella_app("apps/emqx_audit") -> false;
is_community_umbrella_app("apps/emqx_gateway_gbt32960") -> false;
is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false;
is_community_umbrella_app("apps/emqx_gateway_jt808") -> false;
is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false;
is_community_umbrella_app(_) -> true.

View File

@ -62,9 +62,10 @@ then the client actually subscribes to the topic `some_tenant/t`.
Similarly, if another client B (connected to the same listener as the client A) sends a message to topic `t`,
the message is routed to all the clients subscribed `some_tenant/t`,
so client A will receive the message, with topic name `t`. Set to `""` to disable the feature.
Variables in mountpoint string:<br/>
Supported placeholders in mountpoint string:<br/>
- <code>${clientid}</code>: clientid<br/>
- <code>${username}</code>: username"""
- <code>${username}</code>: username<br/>
- <code>${endpoint_name}</code>: endpoint name"""
listener_name_to_settings_map.desc:
"""A map from listener names to listener settings."""

View File

@ -0,0 +1,30 @@
emqx_jt808_schema {
jt808_frame_max_length.desc:
"""The maximum length of the JT/T 808 frame."""
jt808_allow_anonymous.desc:
"""Allow anonymous access to the JT/T 808 Gateway."""
registry_url.desc
"""The JT/T 808 device registry central URL."""
authentication_url.desc
"""The JT/T 808 device authentication central URL."""
jt808_up_topic.desc
"""The topic of the JT/T 808 protocol upstream message."""
jt808_dn_topic.desc
"""The topic of the JT/T 808 protocol downstream message."""
retry_interval.desc:
"""Re-send time interval"""
max_retry_times.desc:
"""Re-send max times"""
message_queue_len.desc:
"""Max message queue length"""
}