From 4d493292fb494bf0f34bdc3a4eef88ace95fb590 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Sun, 12 Nov 2023 21:44:18 +0800 Subject: [PATCH] feat(gw_jt808): port jt808 gateway from 4.4 --- apps/emqx_gateway/src/emqx_gateway_api.erl | 6 +- apps/emqx_gateway_jt808/.gitignore | 27 + apps/emqx_gateway_jt808/BSL.txt | 94 ++ apps/emqx_gateway_jt808/README.md | 26 + .../emqx_gateway_jt808/include/emqx_jt808.hrl | 195 +++ apps/emqx_gateway_jt808/rebar.config | 7 + .../src/emqx_gateway_jt808.app.src | 11 + .../src/emqx_gateway_jt808.erl | 99 ++ .../src/emqx_jt808_auth.erl | 101 ++ .../src/emqx_jt808_channel.erl | 951 ++++++++++++++ .../src/emqx_jt808_frame.erl | 1105 +++++++++++++++++ .../src/emqx_jt808_schema.erl | 119 ++ apps/emqx_machine/priv/reboot_lists.eterm | 1 + mix.exs | 1 + rebar.config.erl | 1 + rel/i18n/emqx_gateway_schema.hocon | 7 +- rel/i18n/emqx_jt808_schema.hocon | 30 + 17 files changed, 2776 insertions(+), 5 deletions(-) create mode 100644 apps/emqx_gateway_jt808/.gitignore create mode 100644 apps/emqx_gateway_jt808/BSL.txt create mode 100644 apps/emqx_gateway_jt808/README.md create mode 100644 apps/emqx_gateway_jt808/include/emqx_jt808.hrl create mode 100644 apps/emqx_gateway_jt808/rebar.config create mode 100644 apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src create mode 100644 apps/emqx_gateway_jt808/src/emqx_gateway_jt808.erl create mode 100644 apps/emqx_gateway_jt808/src/emqx_jt808_auth.erl create mode 100644 apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl create mode 100644 apps/emqx_gateway_jt808/src/emqx_jt808_frame.erl create mode 100644 apps/emqx_gateway_jt808/src/emqx_jt808_schema.erl create mode 100644 rel/i18n/emqx_jt808_schema.hocon diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index 7195943a3..ae2533f97 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -381,7 +381,8 @@ fields(Gw) when Gw == lwm2m; Gw == exproto; Gw == gbt32960; - Gw == ocpp + Gw == ocpp; + Gw == jt808 -> [{name, mk(Gw, #{desc => ?DESC(gateway_name)})}] ++ convert_listener_struct(emqx_gateway_schema:gateway_schema(Gw)); @@ -392,7 +393,8 @@ fields(Gw) when Gw == update_lwm2m; Gw == update_exproto; Gw == update_gbt32960; - Gw == update_ocpp + Gw == update_ocpp; + Gw == update_jt808 -> "update_" ++ GwStr = atom_to_list(Gw), Gw1 = list_to_existing_atom(GwStr), diff --git a/apps/emqx_gateway_jt808/.gitignore b/apps/emqx_gateway_jt808/.gitignore new file mode 100644 index 000000000..c09296035 --- /dev/null +++ b/apps/emqx_gateway_jt808/.gitignore @@ -0,0 +1,27 @@ +.eunit +deps +*.o +*.beam +*.plt +erl_crash.dump +ebin/* +rel/example_project +.concrete/DEV_MODE +.rebar +.erlang.mk/ +emqx_connect_jt808.d +data/ +!data/app.config +.idea/ +*.iml +emqx_gateway_jt808.d +logs/ +cover/ +ct.coverdata +eunit.coverdata +test/ct.cover.spec +_build/ +etc/emqx_gateway_jt808.conf.rendered +*.swp +rebar.lock +.rebar3/ diff --git a/apps/emqx_gateway_jt808/BSL.txt b/apps/emqx_gateway_jt808/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_gateway_jt808/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_gateway_jt808/README.md b/apps/emqx_gateway_jt808/README.md new file mode 100644 index 000000000..1eb296d34 --- /dev/null +++ b/apps/emqx_gateway_jt808/README.md @@ -0,0 +1,26 @@ +# emqx_jt808 + +The JT/T 808 protocol is designed for data interaction between vehicle-mounted satellite terminals and IoT platforms. + +The **JT/T 808 Gateway** in EMQX can accept JT/T 808 clients and translate their events +and messages into MQTT Publish messages. + +In the current implementation, it has the following limitations: +- Only supports JT/T 808-2013 protocol, JT/T 808-2019 is not supported yet. +- Based TCP/TLS transport. +- Third-party authentication/registration http service required. + +## Quick Start + +In EMQX 5.0, JT/T 808 gateway can be configured and enabled through the Dashboard. + +It can also be enabled via the HTTP API, and emqx.conf e.g, In emqx.conf: + +``` +``` + +> Note: +> Configuring the gateway via emqx.conf requires changes on a per-node basis, +> but configuring it via Dashboard or the HTTP API will take effect across the cluster. + +More documentations: [JT/T 808 Gateway](https://www.emqx.io/docs/en/v5.0/gateway/jt808.html) diff --git a/apps/emqx_gateway_jt808/include/emqx_jt808.hrl b/apps/emqx_gateway_jt808/include/emqx_jt808.hrl new file mode 100644 index 000000000..f5b41c2e9 --- /dev/null +++ b/apps/emqx_gateway_jt808/include/emqx_jt808.hrl @@ -0,0 +1,195 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_JT808_HRL). +-define(EMQX_JT808_HRL, true). + +%%-------------------------------------------------------------------- +%% Message Ids +%%-------------------------------------------------------------------- + +%% Message Ids of client to server +-define(MC_GENERAL_RESPONSE, 16#0001). +-define(MC_HEARTBEAT, 16#0002). +-define(MC_REGISTER, 16#0100). +-define(MC_DEREGISTER, 16#0003). +-define(MC_AUTH, 16#0102). +-define(MC_QUERY_PARAM_ACK, 16#0104). +-define(MC_QUERY_ATTRIB_ACK, 16#0107). +-define(MC_OTA_ACK, 16#0108). +-define(MC_LOCATION_REPORT, 16#0200). +-define(MC_QUERY_LOCATION_ACK, 16#0201). +-define(MC_EVENT_REPORT, 16#0301). +-define(MC_QUESTION_ACK, 16#0302). +-define(MC_INFO_REQ_CANCEL, 16#0303). +-define(MC_VEHICLE_CTRL_ACK, 16#0500). +-define(MC_DRIVE_RECORD_REPORT, 16#0700). +-define(MC_WAYBILL_REPORT, 16#0701). +-define(MC_DRIVER_ID_REPORT, 16#0702). +-define(MC_BULK_LOCATION_REPORT, 16#0704). +-define(MC_CAN_BUS_REPORT, 16#0705). +-define(MC_MULTIMEDIA_EVENT_REPORT, 16#0800). +-define(MC_MULTIMEDIA_DATA_REPORT, 16#0801). +-define(MC_CAMERA_SHOT_ACK, 16#0805). +-define(MC_MM_DATA_SEARCH_ACK, 16#0802). +-define(MC_SEND_TRANSPARENT_DATA, 16#0900). +-define(MC_SEND_ZIP_DATA, 16#0901). +-define(MC_RSA_KEY, 16#0A00). + +%% Message Ids of server to client +-define(MS_GENERAL_RESPONSE, 16#8001). +-define(MS_REQUEST_FRAGMENT, 16#8003). +-define(MS_REGISTER_ACK, 16#8100). +-define(MS_SET_CLIENT_PARAM, 16#8103). +-define(MS_QUERY_CLIENT_ALL_PARAM, 16#8104). +-define(MS_CLIENT_CONTROL, 16#8105). +-define(MS_QUERY_CLIENT_PARAM, 16#8106). +-define(MS_QUERY_CLIENT_ATTRIB, 16#8107). +-define(MS_OTA, 16#8108). +-define(MS_QUERY_LOCATION, 16#8201). +-define(MS_TRACE_LOCATION, 16#8202). +-define(MS_CONFIRM_ALARM, 16#8203). +-define(MS_SEND_TEXT, 16#8300). +-define(MS_SET_EVENT, 16#8301). +-define(MS_SEND_QUESTION, 16#8302). +-define(MS_SET_MENU, 16#8303). +-define(MS_INFO_CONTENT, 16#8304). +-define(MS_PHONE_CALLBACK, 16#8400). +-define(MS_SET_PHONE_NUMBER, 16#8401). +-define(MS_VEHICLE_CONTROL, 16#8500). +-define(MS_SET_CIRCLE_AREA, 16#8600). +-define(MS_DEL_CIRCLE_AREA, 16#8601). +-define(MS_SET_RECT_AREA, 16#8602). +-define(MS_DEL_RECT_AREA, 16#8603). +-define(MS_SET_POLY_AREA, 16#8604). +-define(MS_DEL_POLY_AREA, 16#8605). +-define(MS_SET_PATH, 16#8606). +-define(MS_DEL_PATH, 16#8607). +-define(MS_DRIVE_RECORD_CAPTURE, 16#8700). +-define(MS_DRIVE_REC_PARAM_SEND, 16#8701). +-define(MS_REQ_DRIVER_ID, 16#8702). +-define(MS_MULTIMEDIA_DATA_ACK, 16#8800). +-define(MS_CAMERA_SHOT, 16#8801). +-define(MS_MM_DATA_SEARCH, 16#8802). +-define(MS_MM_DATA_UPLOAD, 16#8803). +-define(MS_VOICE_RECORD, 16#8804). +-define(MS_SINGLE_MM_DATA_CTRL, 16#8805). +-define(MS_SEND_TRANSPARENT_DATA, 16#8900). +-define(MS_RSA_KEY, 16#8A00). + +%% Client Params +-define(CP_HEARTBEAT_DURATION, 16#0001). +-define(CP_TCP_TIMEOUT, 16#0002). +-define(CP_TCP_RETX, 16#0003). +-define(CP_UDP_TIMEOUT, 16#0004). +-define(CP_UDP_RETX, 16#0005). +-define(CP_SMS_TIMEOUT, 16#0006). +-define(CP_SMS_RETX, 16#0007). +-define(CP_SERVER_APN, 16#0010). +-define(CP_DIAL_USERNAME, 16#0011). +-define(CP_DIAL_PASSWORD, 16#0012). +-define(CP_SERVER_ADDRESS, 16#0013). +-define(CP_BACKUP_SERVER_APN, 16#0014). +-define(CP_BACKUP_DIAL_USERNAME, 16#0015). +-define(CP_BACKUP_DIAL_PASSWORD, 16#0016). +-define(CP_BACKUP_SERVER_ADDRESS, 16#0017). +-define(CP_SERVER_TCP_PORT, 16#0018). +-define(CP_SERVER_UDP_PORT, 16#0019). +-define(CP_IC_CARD_SERVER_ADDRESS, 16#001A). +-define(CP_IC_CARD_SERVER_TCP_PORT, 16#001B). +-define(CP_IC_CARD_SERVER_UDP_PORT, 16#001C). +-define(CP_IC_CARD_BACKUP_SERVER_ADDRESS, 16#001D). +-define(CP_POS_REPORT_POLICY, 16#0020). +-define(CP_POS_REPORT_CONTROL, 16#0021). +-define(CP_DRIVER_NLOGIN_REPORT_INTERVAL, 16#0022). +-define(CP_REPORT_INTERVAL_DURING_SLEEP, 16#0027). +-define(CP_EMERGENCY_ALARM_REPORT_INTERVAL, 16#0028). +-define(CP_DEFAULT_REPORT_INTERVAL, 16#0029). +-define(CP_DEFAULT_DISTANCE_INTERVAL, 16#002C). +-define(CP_DRIVER_NLOGIN_DISTANCE_INTERVAL, 16#002D). +-define(CP_DISTANCE_INTERVAL_DURING_SLEEP, 16#002E). +-define(CP_EMERGENCY_ALARM_DISTANCE_INTERVAL, 16#002F). +-define(CP_SET_TURN_ANGLE, 16#0030). +-define(CP_EFENCE_RADIUS, 16#0031). +-define(CP_MONITOR_PHONE, 16#0040). +-define(CP_RESETING_PHONE, 16#0041). +-define(CP_RECOVERY_PHONE, 16#0042). +-define(CP_SMS_MONITOR_PHONE, 16#0043). +-define(CP_EMERGENCY_SMS_PHONE, 16#0044). +-define(CP_ACCEPT_CALL_POLICY, 16#0045). +-define(CP_MAX_CALL_DURATION, 16#0046). +-define(CP_MAX_CALL_DURATION_OF_MONTH, 16#0047). +-define(CP_SPY_PHONE, 16#0048). +-define(CP_PRIVILEGE_SMS_PHONE, 16#0049). +-define(CP_ALARM_MASK, 16#0050). +-define(CP_ALARM_SEND_SMS_MASK, 16#0051). +-define(CP_ALARM_CAMERA_SHOT_MASK, 16#0052). +-define(CP_ALARM_PICTURE_SAVE_MASK, 16#0053). +-define(CP_ALARM_KEY_MASK, 16#0054). +-define(CP_MAX_SPEED, 16#0055). +-define(CP_OVERSPEED_ELAPSED, 16#0056). +-define(CP_CONT_DRIVE_THRESHOLD, 16#0057). +-define(CP_ACC_DRIVE_TIME_ONE_DAY_THRESHOLD, 16#0058). +-define(CP_MIN_BREAK_TIME, 16#0059). +-define(CP_MAX_PARK_TIME, 16#005A). +-define(CP_OVERSPEED_ALARM_DELTA, 16#005B). +-define(CP_DRIVER_FATIGUE_ALARM_DELTA, 16#005C). +-define(CP_SET_CRASH_ALARM_PARAM, 16#005D). +-define(CP_SET_ROLLOVER_PARAM, 16#005E). +-define(CP_TIME_CONTROLED_CAMERA, 16#0064). +-define(CP_DISTANCE_CONTROLED_CAMERA, 16#0065). +-define(CP_PICTURE_QUALITY, 16#0070). +-define(CP_PICTURE_BRIGHTNESS, 16#0071). +-define(CP_PICTURE_CONTRAST, 16#0072). +-define(CP_PICTURE_SATURATE, 16#0073). +-define(CP_PICTURE_CHROMATICITY, 16#0074). +-define(CP_ODOMETER, 16#0080). +-define(CP_REGISTERED_PROVINCE, 16#0081). +-define(CP_REGISTERED_CITY, 16#0082). +-define(CP_VEHICLE_LICENSE_NUMBER, 16#0083). +-define(CP_VEHICLE_LICENSE_PLATE_COLOR, 16#0084). +-define(CP_GNSS_MODE, 16#0090). +-define(CP_GNSS_BAUDRATE, 16#0091). +-define(CP_GNSS_OUTPUT_RATE, 16#0092). +-define(CP_GNSS_SAMPLING_RATE, 16#0093). +-define(CP_GNSS_UPLOAD_MODE, 16#0094). +-define(CP_GNSS_UPLOAD_UNIT, 16#0095). +-define(CP_CAN_BUS_CH1_SAMPLING, 16#0100). +-define(CP_CAN_BUS_CH1_UPLOAD, 16#0101). +-define(CP_CAN_BUS_CH2_SAMPLING, 16#0102). +-define(CP_CAN_BUS_CH2_UPLOAD, 16#0103). +-define(CP_SET_CAN_BUS_ID_PARAM, 16#0110). + +%% Extra info types in Position Report +-define(CP_POS_EXTRA_MILEAGE, 16#01). +-define(CP_POS_EXTRA_FUEL_METER, 16#02). +-define(CP_POS_EXTRA_SPEED, 16#03). +-define(CP_POS_EXTRA_ALARM_ID, 16#04). +-define(CP_POS_EXTRA_OVERSPEED_ALARM, 16#11). +-define(CP_POS_EXTRA_IN_OUT_ALARM, 16#12). +-define(CP_POS_EXTRA_PATH_TIME_ALARM, 16#13). +-define(CP_POS_EXTRA_EXPANDED_SIGNAL, 16#25). +-define(CP_POS_EXTRA_IO_STATUS, 16#2A). +-define(CP_POS_EXTRA_ANALOG, 16#2B). +-define(CP_POS_EXTRA_RSSI, 16#30). +-define(CP_POS_EXTRA_GNSS_SAT_NUM, 16#31). +-define(CP_POS_EXTRA_CUSTOME, 16#E0). + +%% Default Configs +-define(DEFAULT_MOUNTPOINT, <<"jt808/${clientid}/">>). +-define(DEFAULT_UP_TOPIC, <>). +-define(DEFAULT_DN_TOPIC, <>). + +%% Supported placeholders +-define(PH_CLIENTID, <<"${clientid}">>). +-define(PH_PHONE, <<"${phone}">>). + +-record(auth, { + allow_anonymous :: boolean(), + registry :: emqx_schema:url(), + authentication :: emqx_schema:url() +}). +-type auth() :: #auth{}. + +-endif. diff --git a/apps/emqx_gateway_jt808/rebar.config b/apps/emqx_gateway_jt808/rebar.config new file mode 100644 index 000000000..456746d25 --- /dev/null +++ b/apps/emqx_gateway_jt808/rebar.config @@ -0,0 +1,7 @@ +%% -*- mode: erlang -*- +{erl_opts, [debug_info]}. +{deps, [ + {emqx, {path, "../../apps/emqx"}}, + {emqx_utils, {path, "../emqx_utils"}}, + {emqx_gateway, {path, "../../apps/emqx_gateway"}} +]}. diff --git a/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src new file mode 100644 index 000000000..3d64366ff --- /dev/null +++ b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src @@ -0,0 +1,11 @@ +%% -*- mode: erlang -*- +{application, emqx_gateway_jt808, [ + {description, "JT/T 808 Gateway"}, + {vsn, "0.0.1"}, + {registered, []}, + {applications, [kernel, stdlib, emqx, emqx_gateway]}, + {env, []}, + {modules, []}, + {licenses, ["BSL"]}, + {links, []} +]}. diff --git a/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.erl b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.erl new file mode 100644 index 000000000..90c77dc36 --- /dev/null +++ b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.erl @@ -0,0 +1,99 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +%% @doc The JT/T 808 Gateway implement + +-module(emqx_gateway_jt808). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_gateway/include/emqx_gateway.hrl"). + +%% define a gateway named jt808 +-gateway(#{ + name => jt808, + callback_module => ?MODULE, + config_schema_module => emqx_jt808_schema, + edition => ee +}). + +%% callback_module must implement the emqx_gateway_impl behaviour +-behaviour(emqx_gateway_impl). + +%% callback for emqx_gateway_impl +-export([ + on_gateway_load/2, + on_gateway_update/3, + on_gateway_unload/2 +]). + +-import( + emqx_gateway_utils, + [ + normalize_config/1, + start_listeners/4, + stop_listeners/2 + ] +). + +%%-------------------------------------------------------------------- +%% emqx_gateway_impl callbacks +%%-------------------------------------------------------------------- + +on_gateway_load( + _Gateway = #{ + name := GwName, + config := Config + }, + Ctx +) -> + Listeners = normalize_config(Config), + ModCfg = #{ + frame_mod => emqx_jt808_frame, + chann_mod => emqx_jt808_channel + }, + case + start_listeners( + Listeners, GwName, Ctx, ModCfg + ) + of + {ok, ListenerPids} -> + %% FIXME: How to throw an exception to interrupt the restart logic ? + %% FIXME: Assign ctx to GwState + {ok, ListenerPids, _GwState = #{ctx => Ctx}}; + {error, {Reason, Listener}} -> + throw( + {badconf, #{ + key => listeners, + value => Listener, + reason => Reason + }} + ) + end. + +on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> + GwName = maps:get(name, Gateway), + try + %% XXX: 1. How hot-upgrade the changes ??? + %% XXX: 2. Check the New confs first before destroy old state??? + on_gateway_unload(Gateway, GwState), + on_gateway_load(Gateway#{config => Config}, Ctx) + catch + Class:Reason:Stk -> + logger:error( + "Failed to update ~ts; " + "reason: {~0p, ~0p} stacktrace: ~0p", + [GwName, Class, Reason, Stk] + ), + {error, Reason} + end. + +on_gateway_unload( + _Gateway = #{ + name := GwName, + config := Config + }, + _GwState +) -> + Listeners = normalize_config(Config), + stop_listeners(GwName, Listeners). diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_auth.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_auth.erl new file mode 100644 index 000000000..aeba537e8 --- /dev/null +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_auth.erl @@ -0,0 +1,101 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_jt808_auth). + +-include("emqx_jt808.hrl"). + +-export([ + init/1, + register/2, + authenticate/2 +]). + +-export_type([auth/0]). + +init(#{allow_anonymous := true}) -> + #auth{registry = undefined, authentication = undefined, allow_anonymous = true}; +init(#{registry := Reg, authentication := Auth, allow_anonymous := Anonymous}) -> + #auth{registry = Reg, authentication = Auth, allow_anonymous = Anonymous}. + +register(_RegFrame, #auth{registry = undefined, allow_anonymous = true}) -> + {ok, anonymous}; +register(_RegFrame, #auth{registry = undefined, allow_anonymous = false}) -> + {error, registry_server_not_existed}; +register(RegFrame, #auth{registry = RegUrl}) -> + #{ + <<"header">> := #{<<"phone">> := Phone}, + <<"body">> := FBody + } = RegFrame, + Params = maps:merge(FBody, #{<<"phone">> => Phone}), + case request(RegUrl, Params) of + {ok, 200, Body} -> + case emqx_utils_json:safe_decode(Body, [return_maps]) of + {ok, #{<<"code">> := 0, <<"authcode">> := Authcode}} -> + {ok, Authcode}; + {ok, #{<<"code">> := Code}} -> + {error, Code}; + _ -> + {error, {invailed_resp, Body}} + end; + {ok, Code, Body} -> + {error, {unknown_resp, Code, Body}}; + {error, Reason} -> + {error, Reason} + end. + +authenticate(_AuthFrame, #auth{authentication = undefined, allow_anonymous = true}) -> + {ok, #{auth_result => true, anonymous => true}}; +authenticate(_AuthFrame, #auth{authentication = undefined, allow_anonymous = false}) -> + {ok, #{auth_result => false, anonymous => false}}; +authenticate(AuthFrame, #auth{authentication = AuthUrl}) -> + #{ + <<"header">> := #{<<"phone">> := Phone}, + <<"body">> := #{<<"code">> := AuthCode} + } = AuthFrame, + case request(AuthUrl, #{<<"code">> => AuthCode, <<"phone">> => Phone}) of + {ok, 200, _} -> + {ok, #{auth_result => true, anonymous => false}}; + {ok, _, _} -> + {ok, #{auth_result => false, anonymous => false}}; + {error, Reason} -> + {error, Reason} + end. + +%%-------------------------------------------------------------------- +%% Inernal functions +%%-------------------------------------------------------------------- + +request(Url, Params) -> + RetryOpts = #{times => 3, interval => 1000, backoff => 2.0}, + Req = {Url, [], "application/json", emqx_utils_json:encode(Params)}, + reply(request_(post, Req, [{autoredirect, true}], [{body_format, binary}], RetryOpts)). + +request_( + Method, + Req, + HTTPOpts, + Opts, + RetryOpts = #{ + times := Times, + interval := Interval, + backoff := BackOff + } +) -> + case httpc:request(Method, Req, HTTPOpts, Opts) of + {error, _Reason} when Times > 0 -> + timer:sleep(trunc(Interval)), + RetryOpts1 = RetryOpts#{ + times := Times - 1, + interval := Interval * BackOff + }, + request_(Method, Req, HTTPOpts, Opts, RetryOpts1); + Other -> + Other + end. + +reply({ok, {{_, Code, _}, _Headers, Body}}) -> + {ok, Code, Body}; +reply({error, Error}) -> + {error, Error}. diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl new file mode 100644 index 000000000..5708efe55 --- /dev/null +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl @@ -0,0 +1,951 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_jt808_channel). +-behaviour(emqx_gateway_channel). + +-include("emqx_jt808.hrl"). +-include_lib("emqx/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). + +%% behaviour callbacks +-export([ + info/1, + info/2, + stats/1 +]). + +-export([ + init/2, + handle_in/2, + handle_deliver/2, + handle_timeout/3, + handle_call/3, + handle_cast/2, + handle_info/2 +]). + +-export([ + terminate/2 +]). + +-record(channel, { + %% Context + ctx :: emqx_gateway_ctx:context(), + %% ConnInfo + conninfo :: emqx_types:conninfo(), + %% ClientInfo + clientinfo :: emqx_types:clientinfo(), + %% Session + session :: undefined | map(), + %% Conn State + conn_state :: conn_state(), + %% Timers + timers :: #{atom() => undefined | disabled | reference()}, + %% AuthCode + authcode :: undefined | anonymous | binary(), + %% Keepalive + keepalive, + %% Msg SN + msg_sn, + %% Down Topic + dn_topic, + %% Up Topic + up_topic, + %% Auth + auth :: emqx_jt808_auth:auth(), + %% Inflight + inflight :: emqx_inflight:inflight(), + mqueue :: queue:queue(), + max_mqueue_len, + rsa_key, + retx_interval, + retx_max_times +}). + +-type conn_state() :: idle | connecting | connected | disconnected. + +-type channel() :: #channel{}. + +-type reply() :: + {outgoing, emqx_types:packet()} + | {outgoing, [emqx_types:packet()]} + | {event, conn_state() | updated} + | {close, Reason :: atom()}. + +-type replies() :: reply() | [reply()]. + +-define(TIMER_TABLE, #{ + alive_timer => keepalive, + retry_timer => retry_delivery +}). + +-define(INFO_KEYS, [ctx, conninfo, zone, clientid, clientinfo, session, conn_state, authcode]). + +-define(RETX_INTERVAL, 8000). +-define(RETX_MAX_TIME, 5). + +-define(DEFAULT_KEEPALIVE, 300). + +-define(MSG(MsgId), #{<<"header">> := #{<<"msg_id">> := MsgId}}). + +-dialyzer({nowarn_function, init/2}). + +%%-------------------------------------------------------------------- +%% Info, Attrs and Caps +%%-------------------------------------------------------------------- +%% @doc Get infos of the channel. +-spec info(channel()) -> emqx_types:infos(). +info(Channel) -> + maps:from_list(info(?INFO_KEYS, Channel)). + +-spec info(list(atom()) | atom(), channel()) -> term(). +info(Keys, Channel) when is_list(Keys) -> + [{Key, info(Key, Channel)} || Key <- Keys]; +info(ctx, #channel{ctx = Ctx}) -> + Ctx; +info(conninfo, #channel{conninfo = ConnInfo}) -> + ConnInfo; +info(zone, #channel{clientinfo = #{zone := Zone}}) -> + Zone; +info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> + ClientId; +info(clientinfo, #channel{clientinfo = ClientInfo}) -> + ClientInfo; +info(session, _) -> + #{}; +info(conn_state, #channel{conn_state = ConnState}) -> + ConnState; +info(authcode, #channel{authcode = AuthCode}) -> + AuthCode. + +stats(_Channel) -> + []. + +%%-------------------------------------------------------------------- +%% Init the Channel +%%-------------------------------------------------------------------- + +-spec init(emqx_types:conninfo(), map()) -> channel(). +init( + ConnInfo = #{ + peername := {PeerHost, _Port}, + sockname := {_Host, SockPort} + }, + Options = #{ + ctx := Ctx, + message_queue_len := MessageQueueLen, + proto := ProtoConf + } +) -> + % TODO: init rsa_key from user input + Peercert = maps:get(peercert, ConnInfo, undefined), + Mountpoint = maps:get(mountpoint, Options, ?DEFAULT_MOUNTPOINT), + ListenerId = + case maps:get(listener, Options, undefined) of + undefined -> undefined; + {GwName, Type, LisName} -> emqx_gateway_utils:listener_id(GwName, Type, LisName) + end, + ClientInfo = setting_peercert_infos( + Peercert, + #{ + zone => default, + listener => ListenerId, + protocol => jt808, + peerhost => PeerHost, + sockport => SockPort, + clientid => undefined, + username => undefined, + is_bridge => false, + is_superuser => false, + mountpoint => Mountpoint + } + ), + + #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo, + session = undefined, + conn_state = idle, + timers = #{}, + authcode = undefined, + keepalive = maps:get(keepalive, Options, ?DEFAULT_KEEPALIVE), + msg_sn = 0, + % TODO: init rsa_key from user input + dn_topic = maps:get(dn_topic, ProtoConf, ?DEFAULT_DN_TOPIC), + up_topic = maps:get(up_topic, ProtoConf, ?DEFAULT_UP_TOPIC), + auth = emqx_jt808_auth:init(ProtoConf), + inflight = emqx_inflight:new(128), + mqueue = queue:new(), + max_mqueue_len = MessageQueueLen, + rsa_key = [0, <<0:1024>>], + retx_interval = maps:get(retry_interval, Options, ?RETX_INTERVAL), + retx_max_times = maps:get(max_retry_times, Options, ?RETX_MAX_TIME) + }. + +setting_peercert_infos(NoSSL, ClientInfo) when + NoSSL =:= nossl; + NoSSL =:= undefined +-> + ClientInfo; +setting_peercert_infos(Peercert, ClientInfo) -> + DN = esockd_peercert:subject(Peercert), + CN = esockd_peercert:common_name(Peercert), + ClientInfo#{dn => DN, cn => CN}. + +%%-------------------------------------------------------------------- +%% Handle incoming packet +%%-------------------------------------------------------------------- + +-spec handle_in(emqx_jt808_frame:frame() | {frame_error, any()}, channel()) -> + {ok, channel()} + | {ok, replies(), channel()} + | {shutdown, Reason :: term(), channel()} + | {shutdown, Reason :: term(), replies(), channel()}. +handle_in(Frame = ?MSG(MType), Channel = #channel{conn_state = ConnState}) when + ConnState /= connected, MType =:= ?MC_REGISTER; + ConnState /= connected, MType =:= ?MC_AUTH +-> + ?SLOG(debug, #{msg => "recv_frame", frame => Frame}), + do_handle_in(Frame, Channel#channel{conn_state = connecting}); +handle_in(Frame, Channel = #channel{conn_state = connected}) -> + ?SLOG(debug, #{msg => "recv_frame", frame => Frame}), + do_handle_in(Frame, Channel); +handle_in(Frame, Channel) -> + ?SLOG(error, #{msg => "unexpected_frame", frame => Frame}), + {stop, unexpected_frame, Channel}. + +%% @private +do_handle_in(Frame = ?MSG(?MC_GENERAL_RESPONSE), Channel = #channel{inflight = Inflight}) -> + #{<<"body">> := #{<<"seq">> := Seq, <<"id">> := Id}} = Frame, + NewInflight = ack_msg(?MC_GENERAL_RESPONSE, {Id, Seq}, Inflight), + {ok, Channel#channel{inflight = NewInflight}}; +do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) -> + #{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame, + case emqx_jt808_auth:register(Frame, Channel0#channel.auth) of + {ok, Authcode} -> + Channel = enrich_clientinfo( + Frame, enrich_conninfo(Frame, Channel0#channel{authcode = Authcode}) + ), + handle_out({?MS_REGISTER_ACK, 0}, MsgSn, Channel); + {error, Reason} -> + ?SLOG(error, #{msg => "register_failed", reason => Reason}), + ResCode = + case is_integer(Reason) of + true -> Reason; + false -> 1 + end, + handle_out({?MS_REGISTER_ACK, ResCode}, MsgSn, Channel0) + end; +do_handle_in(Frame = ?MSG(?MC_AUTH), Channel0) -> + #{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame, + Channel = + #channel{clientinfo = #{clientid := ClientId}} = + enrich_clientinfo(Frame, enrich_conninfo(Frame, Channel0)), + authack( + case authenticate(Frame, Channel0) of + true -> + NChannel = prepare_adapter_topic(ensure_connected(Channel)), + emqx_logger:set_metadata_clientid(ClientId), + %% Auto subscribe downlink topics + autosubcribe(NChannel), + _ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel), + %% 0: Successful + {0, MsgSn, NChannel}; + false -> + ?SLOG(error, #{msg => "authenticated_failed"}), + %% 1: Failure + {1, MsgSn, Channel} + end + ); +do_handle_in(Frame = ?MSG(?MC_HEARTBEAT), Channel) -> + handle_out({?MS_GENERAL_RESPONSE, 0, ?MC_HEARTBEAT}, msgsn(Frame), Channel); +do_handle_in(?MSG(?MC_RSA_KEY), Channel = #channel{rsa_key = [E, N]}) -> + Response = #{ + <<"header">> => build_frame_header(?MS_RSA_KEY, Channel), + <<"body">> => #{<<"e">> => E, <<"n">> => N} + }, + % TODO: how to use client's RSA key? + {ok, [{outgoing, Response}], state_inc_sn(Channel)}; +do_handle_in(?MSG(?MC_MULTIMEDIA_DATA_REPORT), Channel = #channel{rsa_key = [_E, _N]}) -> + Response = #{ + <<"header">> => build_frame_header(?MS_MULTIMEDIA_DATA_ACK, Channel), + <<"body">> => #{} + }, + % TODO: how to fill ? + {ok, [{outgoing, Response}], state_inc_sn(Channel)}; +do_handle_in( + Frame = ?MSG(?MC_DRIVER_ID_REPORT), + Channel = #channel{ + up_topic = Topic, + inflight = Inflight + } +) -> + {MsgId, MsgSn} = msgidsn(Frame), + _ = do_publish(Topic, Frame), + case is_driver_id_req_exist(Channel) of + % this is an device passive command + false -> + handle_out({?MS_GENERAL_RESPONSE, 0, MsgId}, MsgSn, Channel); + % this is a response to MS_REQ_DRIVER_ID(0x8702) + true -> + {ok, Channel#channel{inflight = ack_msg(?MC_DRIVER_ID_REPORT, none, Inflight)}} + end; +do_handle_in(?MSG(?MC_DEREGISTER), Channel) -> + {stop, normal, Channel}; +do_handle_in(Frame = #{}, Channel = #channel{up_topic = Topic, inflight = Inflight}) -> + {MsgId, MsgSn} = msgidsn(Frame), + _ = do_publish(Topic, Frame), + case is_general_response_needed(MsgId) of + % these frames device passive request + true -> + handle_out({?MS_GENERAL_RESPONSE, 0, MsgId}, MsgSn, Channel); + % these frames are response to server's request + false -> + {ok, Channel#channel{inflight = ack_msg(MsgId, seq(Frame), Inflight)}} + end; +do_handle_in(Frame, Channel) -> + ?SLOG(error, #{msg => "ignore_unknown_frame", frame => Frame}), + {ok, Channel}. + +do_publish(Topic, Frame) -> + ?SLOG(debug, #{msg => "publish_msg", to_topic => Topic, farme => Frame}), + emqx:publish(emqx_message:make(jt808, ?QOS_1, Topic, emqx_utils_json:encode(Frame))). + +%%-------------------------------------------------------------------- +%% Handle Delivers from broker to client +%%-------------------------------------------------------------------- +-spec handle_deliver(list(emqx_types:deliver()), channel()) -> + {ok, channel()} + | {ok, replies(), channel()}. + +handle_deliver( + Messages0, + Channel = #channel{ + clientinfo = #{mountpoint := Mountpoint}, + mqueue = Queue, + max_mqueue_len = MaxQueueLen + } +) -> + Messages = lists:map( + fun({deliver, _, M}) -> + emqx_mountpoint:unmount(Mountpoint, M) + end, + Messages0 + ), + case MaxQueueLen - queue:len(Queue) of + N when N =< 0 -> + discard_downlink_messages(Messages, Channel), + {ok, Channel}; + N -> + {NMessages, Dropped} = split_by_pos(Messages, N), + log(debug, #{msg => "enqueue_messages", messages => NMessages}, Channel), + metrics_inc('messages.delivered', Channel, erlang:length(NMessages)), + discard_downlink_messages(Dropped, Channel), + Frames = msgs2frame(NMessages, Channel), + NQueue = lists:foldl(fun(F, Q) -> queue:in(F, Q) end, Queue, Frames), + {Outgoings, NChannel} = dispatch_frame(Channel#channel{mqueue = NQueue}), + {ok, [{outgoing, Outgoings}], NChannel} + end. + +split_by_pos(L, Pos) -> + split_by_pos(L, Pos, []). + +split_by_pos([], _, A1) -> + {lists:reverse(A1), []}; +split_by_pos(L, 0, A1) -> + {lists:reverse(A1), L}; +split_by_pos([E | L], N, A1) -> + split_by_pos(L, N - 1, [E | A1]). + +msgs2frame(Messages, Channel) -> + lists:filtermap( + fun(#message{payload = Payload}) -> + case emqx_utils_json:safe_decode(Payload, [return_maps]) of + {ok, Map} -> + MsgId = msgid(Map), + NewHeader = build_frame_header(MsgId, Channel), + Frame = maps:put(<<"header">>, NewHeader, Map), + {true, Frame}; + {error, Reason} -> + log(error, #{msg => "json_decode_error", reason => Reason}, Channel), + false + end + end, + Messages + ). + +authack( + {Code, MsgSn, + Channel = #channel{ + conninfo = ConnInfo, + clientinfo = ClientInfo + }} +) -> + Code == 0 andalso emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]), + handle_out({?MS_GENERAL_RESPONSE, Code, ?MC_AUTH}, MsgSn, Channel). + +handle_out({?MS_GENERAL_RESPONSE, Result, InMsgId}, MsgSn, Channel) -> + Frame = #{ + <<"header">> => build_frame_header(?MS_GENERAL_RESPONSE, Channel), + <<"body">> => #{<<"seq">> => MsgSn, <<"result">> => Result, <<"id">> => InMsgId} + }, + {ok, [{outgoing, Frame}], state_inc_sn(Channel)}; +handle_out({?MS_REGISTER_ACK, 0}, MsgSn, Channel = #channel{authcode = Authcode0}) -> + Authcode = + case Authcode0 == anonymous of + true -> <<>>; + false -> Authcode0 + end, + Frame = #{ + <<"header">> => build_frame_header(?MS_REGISTER_ACK, Channel), + <<"body">> => #{<<"seq">> => MsgSn, <<"result">> => 0, <<"auth_code">> => Authcode} + }, + {ok, [{outgoing, Frame}], state_inc_sn(Channel)}; +handle_out({?MS_REGISTER_ACK, ResCode}, MsgSn, Channel) -> + Frame = #{ + <<"header">> => build_frame_header(?MS_REGISTER_ACK, Channel), + <<"body">> => #{<<"seq">> => MsgSn, <<"result">> => ResCode} + }, + {ok, [{outgoing, Frame}], state_inc_sn(Channel)}. + +%%-------------------------------------------------------------------- +%% Handle call +%%-------------------------------------------------------------------- + +-spec handle_call(Req :: term(), From :: term(), channel()) -> + {reply, Reply :: term(), channel()} + | {reply, Reply :: term(), replies(), channel()} + | {shutdown, Reason :: term(), Reply :: term(), channel()} + | {shutdown, Reason :: term(), Reply :: term(), emqx_jt808_frame:frame(), channel()}. + +handle_call(kick, _From, Channel) -> + Channel1 = ensure_disconnected(kicked, Channel), + disconnect_and_shutdown(kicked, ok, Channel1); +handle_call(discard, _From, Channel) -> + disconnect_and_shutdown(discarded, ok, Channel); +handle_call(Req, _From, Channel) -> + log(error, #{msg => "unexpected_call", call => Req}, Channel), + reply(ignored, Channel). + +%%-------------------------------------------------------------------- +%% Handle cast +%%-------------------------------------------------------------------- + +-spec handle_cast(Req :: term(), channel()) -> + ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}. +handle_cast(_Req, Channel) -> + {ok, Channel}. + +%%-------------------------------------------------------------------- +%% Handle info +%%-------------------------------------------------------------------- + +-spec handle_info(Info :: term(), channel()) -> + ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}. + +handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) -> + shutdown(Reason, Channel); +handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) -> + shutdown(Reason, Channel); +handle_info( + {sock_closed, Reason}, + Channel = + #channel{ + conn_state = connected + } +) -> + NChannel = ensure_disconnected(Reason, Channel), + shutdown(Reason, NChannel); +handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> + log(error, #{msg => "unexpected_sock_closed", reason => Reason}, Channel), + {ok, Channel}; +handle_info(Info, Channel) -> + log(error, #{msg => "unexpected_info", info => Info}, Channel), + {ok, Channel}. + +%%-------------------------------------------------------------------- +%% Handle timeout +%%-------------------------------------------------------------------- + +-spec handle_timeout(reference(), Msg :: term(), channel()) -> + {ok, channel()} + | {ok, replies(), channel()} + | {shutdown, Reason :: term(), channel()}. + +handle_timeout( + _TRef, + {keepalive, _StatVal}, + Channel = #channel{keepalive = undefined} +) -> + {ok, Channel}; +handle_timeout( + _TRef, + {keepalive, _StatVal}, + Channel = #channel{conn_state = disconnected} +) -> + {ok, Channel}; +handle_timeout( + _TRef, + {keepalive, StatVal}, + Channel = #channel{keepalive = Keepalive} +) -> + case emqx_keepalive:check(StatVal, Keepalive) of + {ok, NKeepalive} -> + NChannel = Channel#channel{keepalive = NKeepalive}, + {ok, reset_timer(alive_timer, NChannel)}; + {error, timeout} -> + shutdown(keepalive_timeout, Channel) + end; +handle_timeout( + _TRef, retry_delivery, Channel = #channel{inflight = Inflight, retx_interval = RetxInterval} +) -> + case emqx_inflight:is_empty(Inflight) of + true -> + {ok, clean_timer(retry_timer, Channel)}; + false -> + Frames = lists:sort(sortfun(), emqx_inflight:to_list(Inflight)), + {Outgoings, NInflight} = retry_delivery( + Frames, erlang:system_time(millisecond), RetxInterval, Inflight, [] + ), + {Outgoings2, NChannel} = dispatch_frame(Channel#channel{inflight = NInflight}), + {ok, [{outgoing, Outgoings ++ Outgoings2}], reset_timer(retry_timer, NChannel)} + end. + +sortfun() -> + fun({_, {_, _, Ts1}}, {_, {_, _, Ts2}}) -> Ts1 < Ts2 end. + +retry_delivery([], _Now, _Interval, Inflight, Acc) -> + {lists:reverse(Acc), Inflight}; +retry_delivery([{Key, {_Frame, 0, _}} | Frames], Now, Interval, Inflight, Acc) -> + %% todo log(error, "has arrived max re-send times, drop ~p", [Frame]), + NInflight = emqx_inflight:delete(Key, Inflight), + retry_delivery(Frames, Now, Interval, NInflight, Acc); +retry_delivery([{Key, {Frame, RetxCount, Ts}} | Frames], Now, Interval, Inflight, Acc) -> + Diff = Now - Ts, + case Diff >= Interval of + true -> + NInflight = emqx_inflight:update(Key, {Frame, RetxCount - 1, Now}, Inflight), + retry_delivery(Frames, Now, Interval, NInflight, [Frame | Acc]); + _ -> + retry_delivery(Frames, Now, Interval, Inflight, Acc) + end. + +dispatch_frame( + Channel = #channel{ + msg_sn = TxMsgSn, + mqueue = Queue, + inflight = Inflight, + retx_max_times = RetxMax + } +) -> + case emqx_inflight:is_full(Inflight) orelse queue:is_empty(Queue) of + true -> + {[], Channel}; + false -> + {{value, Frame}, NewQueue} = queue:out(Queue), + + log(debug, #{msg => "delivery", frame => Frame}, Channel), + + NewInflight = emqx_inflight:insert( + set_msg_ack(msgid(Frame), TxMsgSn), + {Frame, RetxMax, erlang:system_time(millisecond)}, + Inflight + ), + NChannel = Channel#channel{mqueue = NewQueue, inflight = NewInflight}, + {[Frame], ensure_timer(retry_timer, NChannel)} + end. + +%%-------------------------------------------------------------------- +%% Ensure timers +%%-------------------------------------------------------------------- + +ensure_timer(Name, Channel = #channel{timers = Timers}) -> + TRef = maps:get(Name, Timers, undefined), + Time = interval(Name, Channel), + case TRef == undefined andalso Time > 0 of + true -> ensure_timer(Name, Time, Channel); + %% Timer disabled or exists + false -> Channel + end. + +ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> + log(debug, #{msg => "start_timer", name => Name, time => Time}, Channel), + Msg = maps:get(Name, ?TIMER_TABLE), + TRef = emqx_utils:start_timer(Time, Msg), + Channel#channel{timers = Timers#{Name => TRef}}. + +reset_timer(Name, Channel) -> + ensure_timer(Name, clean_timer(Name, Channel)). + +clean_timer(Name, Channel = #channel{timers = Timers}) -> + Channel#channel{timers = maps:remove(Name, Timers)}. + +interval(alive_timer, #channel{keepalive = KeepAlive}) -> + emqx_keepalive:info(interval, KeepAlive); +interval(retry_timer, #channel{retx_interval = RetxIntv}) -> + RetxIntv. + +%%-------------------------------------------------------------------- +%% Terminate +%%-------------------------------------------------------------------- + +terminate(_Reason, #channel{clientinfo = #{clientid := undefined}}) -> + ok; +terminate(_Reason, #channel{conn_state = disconnected}) -> + ok; +terminate(Reason, #channel{clientinfo = ClientInfo, conninfo = ConnInfo}) -> + ?SLOG(info, #{msg => "connection_shutdown", reason => Reason}), + NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, + ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, NConnInfo]). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +maybe_fix_mountpoint(ClientInfo = #{mountpoint := undefined}) -> + ClientInfo; +maybe_fix_mountpoint(ClientInfo = #{mountpoint := Mountpoint}) -> + %% TODO: Enrich the variable replacement???? + %% i.e: ${ClientInfo.auth_result.productKey} + Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo), + ClientInfo#{mountpoint := Mountpoint1}. + +ensure_connected( + Channel = #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo + } +) -> + NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, + ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), + Channel#channel{ + conninfo = NConnInfo, + conn_state = connected + }. + +%% Ensure disconnected +ensure_disconnected( + Reason, + Channel = #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo + } +) -> + NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, + ok = run_hooks( + Ctx, + 'client.disconnected', + [ClientInfo, Reason, NConnInfo] + ), + Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. + +ack_msg(MsgId, KeyParam, Inflight) -> + Key = get_msg_ack(MsgId, KeyParam), + case emqx_inflight:contain(Key, Inflight) of + true -> emqx_inflight:delete(Key, Inflight); + false -> Inflight + end. + +set_msg_ack(?MS_SET_CLIENT_PARAM, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_SET_CLIENT_PARAM, MsgSn}}; +set_msg_ack(?MS_QUERY_CLIENT_ALL_PARAM, MsgSn) -> + {?MC_QUERY_PARAM_ACK, MsgSn}; +set_msg_ack(?MS_QUERY_CLIENT_PARAM, MsgSn) -> + {?MC_QUERY_PARAM_ACK, MsgSn}; +set_msg_ack(?MS_CLIENT_CONTROL, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_CLIENT_CONTROL, MsgSn}}; +set_msg_ack(?MS_QUERY_CLIENT_ATTRIB, _MsgSn) -> + {?MC_QUERY_ATTRIB_ACK, none}; +set_msg_ack(?MS_OTA, _MsgSn) -> + {?MC_OTA_ACK, none}; +set_msg_ack(?MS_QUERY_LOCATION, MsgSn) -> + {?MC_QUERY_LOCATION_ACK, MsgSn}; +set_msg_ack(?MS_TRACE_LOCATION, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_TRACE_LOCATION, MsgSn}}; +set_msg_ack(?MS_CONFIRM_ALARM, _MsgSn) -> + % TODO: how to ack this message? + {}; +set_msg_ack(?MS_SEND_TEXT, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_SEND_TEXT, MsgSn}}; +set_msg_ack(?MS_SET_EVENT, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_SET_EVENT, MsgSn}}; +set_msg_ack(?MS_SEND_QUESTION, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_SEND_QUESTION, MsgSn}}; +set_msg_ack(?MS_SET_MENU, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_SET_MENU, MsgSn}}; +set_msg_ack(?MS_INFO_CONTENT, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_INFO_CONTENT, MsgSn}}; +set_msg_ack(?MS_PHONE_CALLBACK, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_PHONE_CALLBACK, MsgSn}}; +set_msg_ack(?MS_SET_PHONE_NUMBER, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_SET_PHONE_NUMBER, MsgSn}}; +set_msg_ack(?MS_VEHICLE_CONTROL, MsgSn) -> + {?MC_VEHICLE_CTRL_ACK, MsgSn}; +set_msg_ack(?MS_SET_CIRCLE_AREA, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_SET_CIRCLE_AREA, MsgSn}}; +set_msg_ack(?MS_DEL_CIRCLE_AREA, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_DEL_CIRCLE_AREA, MsgSn}}; +set_msg_ack(?MS_SET_RECT_AREA, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_SET_RECT_AREA, MsgSn}}; +set_msg_ack(?MS_DEL_RECT_AREA, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_DEL_RECT_AREA, MsgSn}}; +set_msg_ack(?MS_SET_POLY_AREA, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_SET_POLY_AREA, MsgSn}}; +set_msg_ack(?MS_DEL_POLY_AREA, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_DEL_POLY_AREA, MsgSn}}; +set_msg_ack(?MS_SET_PATH, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_SET_PATH, MsgSn}}; +set_msg_ack(?MS_DEL_PATH, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_DEL_PATH, MsgSn}}; +set_msg_ack(?MS_DRIVE_RECORD_CAPTURE, MsgSn) -> + {?MC_DRIVE_RECORD_REPORT, MsgSn}; +set_msg_ack(?MS_DRIVE_REC_PARAM_SEND, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_DRIVE_REC_PARAM_SEND, MsgSn}}; +set_msg_ack(?MS_REQ_DRIVER_ID, _MsgSn) -> + {?MC_DRIVER_ID_REPORT, none}; +set_msg_ack(?MS_CAMERA_SHOT, MsgSn) -> + % TODO: spec has two conflicted statement about this ack + % section 7.9.3 requires general ack + % section 8.55 requires 0x0805 + {?MC_CAMERA_SHOT_ACK, MsgSn}; +set_msg_ack(?MS_MM_DATA_SEARCH, MsgSn) -> + {?MC_MM_DATA_SEARCH_ACK, MsgSn}; +set_msg_ack(?MS_MM_DATA_UPLOAD, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_MM_DATA_UPLOAD, MsgSn}}; +set_msg_ack(?MS_VOICE_RECORD, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_VOICE_RECORD, MsgSn}}; +set_msg_ack(?MS_SINGLE_MM_DATA_CTRL, MsgSn) -> + % TODO: right? + {?MC_MM_DATA_SEARCH_ACK, MsgSn}; +set_msg_ack(?MS_SEND_TRANSPARENT_DATA, MsgSn) -> + {?MC_GENERAL_RESPONSE, {?MS_SEND_TRANSPARENT_DATA, MsgSn}}; +set_msg_ack(MsgId, Param) -> + error({invalid_message_type, MsgId, Param}). + +get_msg_ack(?MC_GENERAL_RESPONSE, {MsgId, MsgSn}) -> + {?MC_GENERAL_RESPONSE, {MsgId, MsgSn}}; +get_msg_ack(?MC_QUERY_PARAM_ACK, MsgSn) -> + {?MC_QUERY_PARAM_ACK, MsgSn}; +get_msg_ack(?MC_QUERY_ATTRIB_ACK, _MsgSn) -> + {?MC_QUERY_ATTRIB_ACK, none}; +get_msg_ack(?MC_OTA_ACK, _MsgSn) -> + {?MC_OTA_ACK, none}; +get_msg_ack(?MC_QUERY_LOCATION_ACK, MsgSn) -> + {?MC_QUERY_LOCATION_ACK, MsgSn}; +get_msg_ack(?MC_QUESTION_ACK, MsgSn) -> + {?MC_QUESTION_ACK, MsgSn}; +get_msg_ack(?MC_VEHICLE_CTRL_ACK, MsgSn) -> + {?MC_VEHICLE_CTRL_ACK, MsgSn}; +get_msg_ack(?MC_DRIVE_RECORD_REPORT, MsgSn) -> + {?MC_DRIVE_RECORD_REPORT, MsgSn}; +get_msg_ack(?MC_CAMERA_SHOT_ACK, MsgSn) -> + {?MC_CAMERA_SHOT_ACK, MsgSn}; +get_msg_ack(?MC_MM_DATA_SEARCH_ACK, MsgSn) -> + {?MC_MM_DATA_SEARCH_ACK, MsgSn}; +get_msg_ack(?MC_DRIVER_ID_REPORT, _MsgSn) -> + {?MC_DRIVER_ID_REPORT, none}; +get_msg_ack(MsgId, MsgSn) -> + error({invalid_message_type, MsgId, MsgSn}). + +build_frame_header(MsgId, #channel{clientinfo = #{phone := Phone}, msg_sn = TxMsgSn}) -> + build_frame_header(MsgId, 0, Phone, TxMsgSn). + +build_frame_header(MsgId, Encrypt, Phone, TxMsgSn) -> + #{ + <<"msg_id">> => MsgId, + <<"encrypt">> => Encrypt, + <<"phone">> => Phone, + <<"msg_sn">> => TxMsgSn + }. + +seq(#{<<"body">> := #{<<"seq">> := MsgSn}}) -> MsgSn; +seq(#{}) -> 0. + +msgsn(#{<<"header">> := #{<<"msg_sn">> := MsgSn}}) -> MsgSn. + +msgid(#{<<"header">> := #{<<"msg_id">> := MsgId}}) -> MsgId. + +msgidsn(#{ + <<"header">> := #{ + <<"msg_id">> := MsgId, + <<"msg_sn">> := MsgSn + } +}) -> + {MsgId, MsgSn}. + +state_inc_sn(Channel = #channel{msg_sn = Sn}) -> + Channel#channel{msg_sn = next_msg_sn(Sn)}. + +next_msg_sn(16#FFFF) -> 0; +next_msg_sn(Sn) -> Sn + 1. + +is_general_response_needed(?MC_EVENT_REPORT) -> true; +is_general_response_needed(?MC_LOCATION_REPORT) -> true; +is_general_response_needed(?MC_INFO_REQ_CANCEL) -> true; +is_general_response_needed(?MC_WAYBILL_REPORT) -> true; +is_general_response_needed(?MC_BULK_LOCATION_REPORT) -> true; +is_general_response_needed(?MC_CAN_BUS_REPORT) -> true; +is_general_response_needed(?MC_MULTIMEDIA_EVENT_REPORT) -> true; +is_general_response_needed(?MC_SEND_TRANSPARENT_DATA) -> true; +is_general_response_needed(?MC_SEND_ZIP_DATA) -> true; +is_general_response_needed(_) -> false. + +is_driver_id_req_exist(#channel{inflight = Inflight}) -> + % if there is a MS_REQ_DRIVER_ID (0x8702) command in re-tx queue + Key = get_msg_ack(?MC_DRIVER_ID_REPORT, none), + emqx_inflight:contain(Key, Inflight). + +authenticate(_AuthFrame, #channel{authcode = anonymous}) -> + true; +authenticate(AuthFrame, #channel{authcode = undefined, auth = Auth}) -> + %% Try request authentication server + case emqx_jt808_auth:authenticate(AuthFrame, Auth) of + {ok, #{auth_result := IsAuth}} -> + IsAuth; + {error, Reason} -> + ?SLOG(error, #{msg => "request_auth_server_failed", reason => Reason}), + false + end; +authenticate( + #{<<"body">> := #{<<"code">> := InCode}}, + #channel{authcode = Authcode} +) -> + InCode == Authcode. + +enrich_conninfo( + #{<<"header">> := #{<<"phone">> := Phone}}, + Channel = #channel{conninfo = ConnInfo} +) -> + NConnInfo = ConnInfo#{ + proto_name => <<"jt808">>, + proto_ver => <<"2013">>, + clean_start => true, + clientid => Phone, + username => undefined, + conn_props => #{}, + connected => true, + connected_at => erlang:system_time(millisecond), + keepalive => ?DEFAULT_KEEPALIVE, + receive_maximum => 0, + expiry_interval => 0 + }, + Channel#channel{conninfo = NConnInfo}. + +%% Register +enrich_clientinfo( + #{ + <<"header">> := #{<<"phone">> := Phone}, + <<"body">> := #{ + <<"manufacturer">> := Manu, + <<"dev_id">> := DevId + } + }, + Channel = #channel{clientinfo = ClientInfo} +) -> + NClientInfo = maybe_fix_mountpoint(ClientInfo#{ + phone => Phone, + clientid => Phone, + manufacturer => Manu, + terminal_id => DevId + }), + Channel#channel{clientinfo = NClientInfo}; +%% Auth +enrich_clientinfo( + #{<<"header">> := #{<<"phone">> := Phone}}, + Channel = #channel{clientinfo = ClientInfo} +) -> + NClientInfo = ClientInfo#{ + phone => Phone, + clientid => Phone + }, + Channel#channel{clientinfo = NClientInfo}. + +prepare_adapter_topic(Channel = #channel{up_topic = UpTopic, dn_topic = DnTopic}) -> + Channel#channel{ + up_topic = replvar(UpTopic, Channel), + dn_topic = replvar(DnTopic, Channel) + }. + +replvar(undefined, _Channel) -> + undefined; +replvar(Topic, #channel{clientinfo = #{clientid := ClientId, phone := Phone}}) when + is_binary(Topic) +-> + do_replvar(Topic, #{clientid => ClientId, phone => Phone}). + +do_replvar(Topic, Vars) -> + ClientID = maps:get(clientid, Vars, undefined), + Phone = maps:get(phone, Vars, undefined), + List = [ + {?PH_CLIENTID, ClientID}, + {?PH_PHONE, Phone} + ], + lists:foldl(fun feed_var/2, Topic, List). + +feed_var({_PH, undefined}, Topic) -> + Topic; +feed_var({PH, Value}, Topic) -> + emqx_topic:feed_var(PH, Value, Topic). + +autosubcribe(#channel{dn_topic = Topic}) when + Topic == undefined; + Topic == "" +-> + ok; +autosubcribe(#channel{ + clientinfo = + ClientInfo = + #{clientid := ClientId}, + dn_topic = Topic +}) -> + SubOpts = #{rap => 0, nl => 0, qos => 0, rh => 0}, + emqx:subscribe(Topic, ClientId, SubOpts), + ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts#{is_new => true}]). + +start_keepalive(Secs, _Channel) when Secs > 0 -> + self() ! {keepalive, start, round(Secs) * 1000}. + +run_hooks(Ctx, Name, Args) -> + emqx_gateway_ctx:metrics_inc(Ctx, Name), + emqx_hooks:run(Name, Args). + +discard_downlink_messages([], _Channel) -> + ok; +discard_downlink_messages(Messages, Channel) -> + log( + error, + #{ + msg => "discard_new_downlink_messages", + reason => + "Discard new downlink messages due to that too" + " many messages are waiting their ACKs.", + messages => Messages + }, + Channel + ), + metrics_inc('delivery.dropped', Channel, erlang:length(Messages)). + +metrics_inc(Name, #channel{ctx = Ctx}, Oct) -> + emqx_gateway_ctx:metrics_inc(Ctx, Name, Oct). + +log(Level, Meta, #channel{clientinfo = #{clientid := ClientId, username := Username}} = _Channel) -> + ?SLOG(Level, Meta#{clientid => ClientId, username => Username}). + +reply(Reply, Channel) -> + {reply, Reply, Channel}. + +shutdown(Reason, Channel) -> + {shutdown, Reason, Channel}. + +shutdown(Reason, Reply, Channel) -> + {shutdown, Reason, Reply, Channel}. + +disconnect_and_shutdown(Reason, Reply, Channel) -> + shutdown(Reason, Reply, Channel). diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_frame.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_frame.erl new file mode 100644 index 000000000..02422235b --- /dev/null +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_frame.erl @@ -0,0 +1,1105 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_jt808_frame). + +-behaviour(emqx_gateway_frame). + +-include("emqx_jt808.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% emqx_gateway_frame callbacks +-export([ + initial_parse_state/1, + serialize_opts/0, + serialize_pkt/2, + parse/2, + format/1, + type/1, + is_message/1 +]). + +-define(FLAG, 1 / binary). +-define(BYTE, 8 / big - integer). +-define(WORD, 16 / big - integer). +-define(DWORD, 32 / big - integer). + +-define(NO_FRAGMENT, 0). +-define(HAS_FRAGMENT, 1). + +-type frame() :: map(). + +-type phase() :: searching_head_hex7e | {escaping_hex7d, binary()}. + +-type parser_state() :: #{ + data => binary(), + phase => phase() +}. + +-export_type([frame/0]). + +%%-------------------------------------------------------------------- +%% Callback APIs +%%-------------------------------------------------------------------- + +-spec initial_parse_state(map()) -> parser_state(). +initial_parse_state(_) -> + #{data => <<>>, phase => searching_head_hex7e}. + +-spec serialize_opts() -> emqx_gateway_frame:serialize_options(). +serialize_opts() -> + #{}. + +-spec parse(binary(), parser_state()) -> + emqx_gateway_frame:parse_result(). + +parse(Bin, State) -> + do_parse(Bin, State). + +serialize_pkt(Frame, _Opts) -> + serialize(Frame). + +format(Msg) -> + io_lib:format("~p", [Msg]). + +type(_) -> + jt808. + +is_message(#{}) -> + true; +is_message(_) -> + false. + +%%-------------------------------------------------------------------- +%% Parse Message +%%-------------------------------------------------------------------- +do_parse(Packet, State) -> + escape_head_hex7e(Packet, State). + +escape_head_hex7e(<<16#7e, Rest/binary>>, State = #{phase := searching_head_hex7e}) -> + %% 0x7e is start of a valid message + escape_frame(Rest, State); +escape_head_hex7e(<<_C, Rest/binary>>, State = #{phase := searching_head_hex7e}) -> + %% discard char other than 0x7e which is the start flag + escape_head_hex7e(Rest, State); +escape_head_hex7e(<<16#02, Rest/binary>>, State = #{data := Acc, phase := escaping_hex7d}) -> + %% corner case: 0x7d has been received in the end of last frame segment + escape_frame(Rest, State#{data => <>}); +escape_head_hex7e(<<16#01, Rest/binary>>, State = #{data := Acc, phase := escaping_hex7d}) -> + %% corner case: 0x7d has been received in the end of last frame segment + escape_frame(Rest, State#{data => <>}); +escape_head_hex7e(Rest, State = #{data := _Acc, phase := escaping_hex7d}) -> + %% continue parsing to escape 0x7d + escape_frame(Rest, State). + +escape_frame(Rest, State = #{data := Acc}) -> + case do_escape_frame(Rest, Acc) of + {ok, Msg, NRest} -> + {ok, parse_message(Msg), NRest, State#{data => <<>>, phase => searching_head_hex7e}}; + {error, _E} = Err -> + Err; + {more_data_follow, NRest} -> + {more, #{data => NRest, phase => escaping_hex7d}} + end. + +do_escape_frame(<<16#7d, 16#02, Rest/binary>>, Acc) -> + do_escape_frame(Rest, <>); +do_escape_frame(<<16#7d, 16#01, Rest/binary>>, Acc) -> + do_escape_frame(Rest, <>); +do_escape_frame(<<16#7d, _Other:8, _Rest/binary>>, _Acc) -> + %% only 0x02 and 0x01 is allowed to follow 0x7d + {error, invalid_message}; +do_escape_frame(<<16#7d>>, Acc) -> + %% corner case: last byte of the frame segment is 0x7d, + %% 0x01 or 0x02 is expected in next frame segment + {more_data_follow, Acc}; +do_escape_frame(<<16#7e, _Rest/binary>>, <<>>) -> + %% empty message + {error, invalid_message}; +do_escape_frame(<<16#7e, Rest/binary>>, Acc) -> + %% end of a normal message + case check(Acc) of + {error, _} = Err -> + Err; + Msg -> + {ok, Msg, Rest} + end; +do_escape_frame(<>, Acc) -> + do_escape_frame(Rest, <>); +do_escape_frame(<<>>, Acc) -> + {more_data_follow, Acc}. + +parse_message(Binary) -> + case parse_message_header(Binary) of + {ok, Header = #{<<"msg_id">> := MsgId}, RestBinary} -> + #{<<"header">> => Header, <<"body">> => parse_message_body(MsgId, RestBinary)}; + invalid_message -> + {error, invalid_message} + end. + +parse_message_header( + <> +) -> + {ok, + #{ + <<"msg_id">> => MsgId, + <<"encrypt">> => Encypt, + <<"len">> => Length, + <<"phone">> => from_bcd(PhoneBCD, []), + <<"msg_sn">> => MsgSn + }, + Rest}; +parse_message_header( + <> +) -> + {ok, + #{ + <<"msg_id">> => MsgId, + <<"encrypt">> => Encypt, + <<"len">> => Length, + <<"phone">> => from_bcd(PhoneBCD, []), + <<"msg_sn">> => MsgSn, + <<"frag_total">> => FragTotal, + <<"frag_sn">> => FragSeq + }, + Rest}; +parse_message_header(_) -> + invalid_message. + +parse_message_body(?MC_GENERAL_RESPONSE, <>) -> + #{<<"seq">> => Seq, <<"id">> => Id, <<"result">> => Result}; +parse_message_body(?MC_HEARTBEAT, <<>>) -> + #{}; +parse_message_body( + ?MC_REGISTER, + <> +) -> + #{ + <<"province">> => Province, + <<"city">> => City, + <<"manufacturer">> => Manufacturer, + <<"model">> => Model, + <<"dev_id">> => DevId, + <<"color">> => Color, + <<"license_number">> => LicNumber + }; +parse_message_body(?MC_DEREGISTER, <<>>) -> + #{}; +parse_message_body(?MC_AUTH, Binary) -> + #{<<"code">> => Binary}; +parse_message_body(?MC_QUERY_PARAM_ACK, <>) -> + {Length, Params} = parse_client_params(Rest), + #{<<"seq">> => Seq, <<"length">> => Length, <<"params">> => Params}; +parse_message_body( + ?MC_QUERY_ATTRIB_ACK, + <> +) -> + <> = Rest, + <> = Rest2, + #{ + <<"type">> => Type, + <<"manufacturer">> => Manufacturer, + <<"model">> => Model, + <<"id">> => Id, + <<"iccid">> => from_bcd(ICCID, []), + <<"hardware_version">> => HV, + <<"firmware_version">> => FV, + <<"gnss_prop">> => GNSSProp, + <<"comm_prop">> => CommProp + }; +parse_message_body(?MC_OTA_ACK, <>) -> + #{<<"type">> => Type, <<"result">> => Result}; +parse_message_body(?MC_LOCATION_REPORT, Binary) -> + parse_location_report(Binary); +parse_message_body(?MC_QUERY_LOCATION_ACK, <>) -> + Params = parse_location_report(Rest), + #{<<"seq">> => Seq, <<"params">> => Params}; +parse_message_body(?MC_EVENT_REPORT, <>) -> + #{<<"id">> => Id}; +parse_message_body(?MC_QUESTION_ACK, <>) -> + #{<<"seq">> => Seq, <<"id">> => Id}; +parse_message_body(?MC_INFO_REQ_CANCEL, <>) -> + #{<<"id">> => Id, <<"flag">> => Flag}; +parse_message_body(?MC_VEHICLE_CTRL_ACK, <>) -> + #{<<"seq">> => Seq, <<"location">> => parse_location_report(Location)}; +parse_message_body(?MC_DRIVE_RECORD_REPORT, <>) -> + #{<<"seq">> => Seq, <<"command">> => Command, <<"data">> => base64:encode(Data)}; +parse_message_body(?MC_WAYBILL_REPORT, <>) -> + #{<<"length">> => Length, <<"data">> => base64:encode(Data)}; +parse_message_body( + ?MC_DRIVER_ID_REPORT, + <> +) -> + <> = Rest, + <> = Rest2, + #{ + <<"status">> => Status, + <<"time">> => from_bcd(TimeBCD, []), + <<"ic_result">> => IcResult, + <<"driver_name">> => Name, + <<"certificate">> => Certificate, + <<"organization">> => Orgnization, + <<"cert_expiry">> => from_bcd(CertExpiryBCD, []) + }; +parse_message_body(?MC_BULK_LOCATION_REPORT, <>) -> + #{ + <<"type">> => Type, + <<"length">> => Count, + <<"location">> => parse_bulk_location_report(Count, Rest, []) + }; +parse_message_body(?MC_CAN_BUS_REPORT, <>) -> + CanData = parse_can_data(Count, Rest, []), + #{<<"length">> => Count, <<"time">> => from_bcd(TimeBCD, []), <<"can_data">> => CanData}; +parse_message_body( + ?MC_MULTIMEDIA_EVENT_REPORT, <> +) -> + #{ + <<"id">> => Id, + <<"type">> => Type, + <<"format">> => Format, + <<"event">> => Event, + <<"channel">> => Channel + }; +parse_message_body( + ?MC_MULTIMEDIA_DATA_REPORT, + <> +) -> + #{ + <<"id">> => Id, + <<"type">> => Type, + <<"format">> => Format, + <<"event">> => Event, + <<"channel">> => Channel, + <<"location">> => parse_location_report(Location), + <<"multimedia">> => base64:encode(Multimedia) + }; +parse_message_body(?MC_CAMERA_SHOT_ACK, <>) when + Result =:= 0 +-> + %% if Result is 0, means suceeded, "length" & "ids" present + {Array, _} = dword_array(Count, Rest, []), + #{<<"seq">> => Seq, <<"result">> => Result, <<"length">> => Count, <<"ids">> => Array}; +parse_message_body(?MC_CAMERA_SHOT_ACK, <>) -> + %% if Result is not 0, means failed, no "length" & "ids" + #{<<"seq">> => Seq, <<"result">> => Result}; +parse_message_body(?MC_MM_DATA_SEARCH_ACK, <>) -> + #{ + <<"seq">> => Seq, + <<"length">> => Count, + <<"result">> => parse_multimedia_search_result(Count, Rest, []) + }; +parse_message_body(?MC_SEND_TRANSPARENT_DATA, <>) -> + #{<<"type">> => Type, <<"data">> => base64:encode(Data)}; +parse_message_body(?MC_SEND_ZIP_DATA, <>) -> + #{<<"length">> => Length, <<"data">> => base64:encode(Data)}; +parse_message_body(?MC_RSA_KEY, <>) -> + #{<<"e">> => E, <<"n">> => base64:encode(N)}; +parse_message_body(UnknownId, Binary) -> + ?SLOG(error, #{msg => "unknow_message", id => UnknownId, msg_body => Binary}), + {error, invalid_message}. + +parse_client_params(<>) -> + {Count, parse_client_params2(Count, Rest, [])}. + +parse_client_params2(0, _Rest, Acc) -> + lists:reverse(Acc); +parse_client_params2(Count, <>, Acc) -> + {Value, Rest3} = + case client_param_data_type(Id) of + dword -> decode_cp_dword(Rest); + word -> decode_cp_word(Rest); + byte -> decode_cp_byte(Rest); + string -> decode_cp_string(Length, Rest); + reserved -> decode_cp_reserved(Length, Rest) + end, + parse_client_params2(Count - 1, Rest3, [#{<<"id">> => Id, <<"value">> => Value} | Acc]). + +decode_cp_dword(<>) -> + {Value, Rest}. + +decode_cp_word(<>) -> + {Value, Rest}. + +decode_cp_byte(<>) -> + {Value, Rest}. + +decode_cp_string(Length, Binary) -> + <> = Binary, + {Value, Rest}. + +decode_cp_reserved(Length, Binary) -> + <> = Binary, + {base64:encode(Value), Rest}. + +parse_location_report( + <> +) -> + Ret = #{ + <<"alarm">> => Alarm, + <<"status">> => Status, + <<"latitude">> => Latitude, + <<"longitude">> => Longitude, + <<"altitude">> => Altitude, + <<"speed">> => Speed, + <<"direction">> => Direction, + <<"time">> => from_bcd(TimeBCD, []) + }, + case Rest of + <<>> -> Ret; + _ -> Ret#{<<"extra">> => parse_location_report_extra(Rest, #{})} + end. + +parse_location_report_extra(<<>>, Acc) -> + Acc; +parse_location_report_extra( + <>, Acc +) -> + parse_location_report_extra(Rest, Acc#{<<"mileage">> => MileAge}); +parse_location_report_extra( + <>, Acc +) -> + parse_location_report_extra(Rest, Acc#{<<"fuel_meter">> => FuelMeter}); +parse_location_report_extra(<>, Acc) -> + parse_location_report_extra(Rest, Acc#{<<"speed">> => Speed}); +parse_location_report_extra( + <>, Acc +) -> + parse_location_report_extra(Rest, Acc#{<<"alarm_id">> => AlarmID}); +parse_location_report_extra( + <>, Acc +) -> + case Length of + 1 -> + <> = Rest, + parse_location_report_extra(Rest2, Acc#{<<"overspeed_alarm">> => #{<<"type">> => Type}}); + 5 -> + <> = Rest, + parse_location_report_extra(Rest2, Acc#{ + <<"overspeed_alarm">> => #{<<"type">> => Type, <<"id">> => Id} + }) + end; +parse_location_report_extra( + <>, + Acc +) -> + parse_location_report_extra(Rest, Acc#{ + <<"in_out_alarm">> => #{<<"type">> => Type, <<"id">> => Id, <<"direction">> => Direction} + }); +parse_location_report_extra( + <>, + Acc +) -> + parse_location_report_extra(Rest, Acc#{ + <<"path_time_alarm">> => #{<<"id">> => ID, <<"time">> => Time, <<"result">> => Result} + }); +parse_location_report_extra( + <>, Acc +) -> + <> = Signal, + parse_location_report_extra(Rest, Acc#{ + <<"signal">> => #{ + <<"low_beam">> => LowBeam, + <<"high_beam">> => HighBeam, + <<"right_turn">> => RightTurnSignal, + <<"left_turn">> => LeftTurnSignal, + <<"brake">> => Brake, + <<"reverse">> => Reverse, + <<"fog">> => Fog, + <<"side_marker">> => SideMarker, + <<"horn">> => Horn, + <<"air_conditioner">> => AirConditioner, + <<"neutral">> => Neutral, + <<"retarder">> => Retarder, + <<"abs">> => ABS, + <<"heater">> => Heater, + <<"cluth">> => Cluth + } + }); +parse_location_report_extra( + <>, Acc +) -> + parse_location_report_extra(Rest, Acc#{ + <<"io_status">> => #{<<"deep_sleep">> => DeepSleep, <<"sleep">> => Sleep} + }); +parse_location_report_extra( + <>, Acc +) -> + parse_location_report_extra(Rest, Acc#{<<"analog">> => #{<<"ad0">> => AD0, <<"ad1">> => AD1}}); +parse_location_report_extra(<>, Acc) -> + parse_location_report_extra(Rest, Acc#{<<"rssi">> => Rssi}); +parse_location_report_extra( + <>, Acc +) -> + parse_location_report_extra(Rest, Acc#{<<"gnss_sat_num">> => SatNum}); +%% TODO: ensure custom data +parse_location_report_extra(<>, Acc) -> + <> = Rest, + parse_location_report_extra(Rest, Acc#{<<"custome">> => base64:encode(Data)}); +parse_location_report_extra(<>, Acc) when + CustomeId >= 16#E0, CustomeId =< 16#FF +-> + <> = Rest, + Custome = maps:get(<<"custome">>, Acc, #{}), + NCustomeId = integer_to_binary(CustomeId), + parse_location_report_extra( + Rest2, + Acc#{<<"custome">> => maps:put(NCustomeId, base64:encode(Data), Custome)} + ); +parse_location_report_extra(<>, Acc) -> + <> = Rest, + ReservedId = integer_to_binary(ReservedId0), + parse_location_report_extra( + Rest2, + Acc#{ReservedId => base64:encode(Data)} + ). + +parse_bulk_location_report(0, _Binary, Acc) -> + lists:reverse(Acc); +parse_bulk_location_report(Count, <>, Acc) -> + <> = Rest, + parse_bulk_location_report(Count - 1, Rest2, [parse_location_report(Data) | Acc]). + +parse_can_data(0, _, Acc) -> + lists:reverse(Acc); +parse_can_data( + Count, + <>, + Acc +) -> + parse_can_data(Count - 1, Rest, [ + #{ + <<"channel">> => CanCh, + <<"frame_type">> => CanFrameType, + <<"data_method">> => CanDataMethod, + <<"id">> => CanId, + <<"data">> => base64:encode(Data) + } + | Acc + ]). + +dword_array(0, Binary, Acc) -> + {lists:reverse(Acc), Binary}; +dword_array(Count, <>, Acc) -> + dword_array(Count - 1, Rest, [Value | Acc]). + +parse_multimedia_search_result(0, _, Acc) -> + lists:reverse(Acc); +parse_multimedia_search_result( + Count, + <>, + Acc +) -> + parse_multimedia_search_result(Count - 1, Rest, [ + #{ + <<"id">> => Id, + <<"type">> => Type, + <<"channel">> => Channel, + <<"event">> => Event, + <<"location">> => parse_location_report(Location) + } + | Acc + ]). + +%%-------------------------------------------------------------------- +%% Serialize JT808 Message +%%-------------------------------------------------------------------- +serialize(Json) -> + Header = maps:get(<<"header">>, Json), + Body = + case maps:is_key(<<"body">>, Json) of + true -> maps:get(<<"body">>, Json); + false -> <<>> + end, + BodyStream = serialize_body(maps:get(<<"msg_id">>, Header), Body), + %% TODO: encrypt body here + Header2 = maps:put(<<"len">>, size(BodyStream), Header), + HeaderStream = serialize_header(Header2), + packet(<>). + +serialize_header( + Header = #{ + <<"msg_id">> := MsgId, + <<"encrypt">> := Encrypt, + <<"len">> := Length, + <<"phone">> := Phone, + <<"msg_sn">> := MsgSn + } +) -> + PhoneBCD = to_bcd(Phone, 6), + {Fragment, Total, Seq} = + case maps:is_key(<<"frag_total">>, Header) of + true -> {1, maps:get(<<"frag_total">>, Header), maps:get(<<"frag_sn">>, Header)}; + false -> {0, 0, 0} + end, + Binary = + <>, + case Fragment of + 0 -> Binary; + 1 -> <> + end. + +serialize_body(?MS_GENERAL_RESPONSE, Body) -> + Seq = maps:get(<<"seq">>, Body), + Id = maps:get(<<"id">>, Body), + Result = maps:get(<<"result">>, Body), + <>; +serialize_body(?MS_REQUEST_FRAGMENT, Body) -> + Seq = maps:get(<<"seq">>, Body), + Length = maps:get(<<"length">>, Body), + Ids = maps:get(<<"ids">>, Body), + LastStream = encode_word_array(Length, Ids, <<>>), + <>; +serialize_body(?MS_REGISTER_ACK, Body) -> + Seq = maps:get(<<"seq">>, Body), + %% XXX: replaced by maroc? + Result = maps:get(<<"result">>, Body), + case maps:is_key(<<"auth_code">>, Body) of + true -> + Code = maps:get(<<"auth_code">>, Body), + <>; + false -> + %% If the terminal regiter failed, it don't contain auth code + <> + end; +serialize_body(?MS_SET_CLIENT_PARAM, Body) -> + Length = maps:get(<<"length">>, Body), + ParamList = maps:get(<<"params">>, Body), + serialize_client_param(<>, ParamList); +serialize_body(?MS_QUERY_CLIENT_ALL_PARAM, _Body) -> + <<>>; +serialize_body(?MS_QUERY_CLIENT_PARAM, Body) -> + Length = maps:get(<<"length">>, Body), + List = maps:get(<<"ids">>, Body), + encode_dword_array(Length, List, <>); +serialize_body(?MS_CLIENT_CONTROL, Body) -> + Command = maps:get(<<"command">>, Body), + Param = maps:get(<<"param">>, Body), + <>; +serialize_body(?MS_QUERY_CLIENT_ATTRIB, _Body) -> + <<>>; +serialize_body(?MS_OTA, Body) -> + %% TODO: OTA in this way? + Type = maps:get(<<"type">>, Body), + Manuf = maps:get(<<"manufacturer">>, Body), + VerLength = maps:get(<<"ver_len">>, Body), + Version = maps:get(<<"version">>, Body), + FwLen = maps:get(<<"fw_len">>, Body), + Firmware = maps:get(<<"firmware">>, Body), + <>; +serialize_body(?MS_QUERY_LOCATION, _Body) -> + <<>>; +serialize_body(?MS_TRACE_LOCATION, Body) -> + Period = maps:get(<<"period">>, Body), + Expiry = maps:get(<<"expiry">>, Body), + <>; +serialize_body(?MS_CONFIRM_ALARM, Body) -> + Seq = maps:get(<<"seq">>, Body), + Type = maps:get(<<"type">>, Body), + <>; +serialize_body(?MS_SEND_TEXT, Body) -> + Flag = maps:get(<<"flag">>, Body), + Text = maps:get(<<"text">>, Body), + <>; +serialize_body(?MS_SET_EVENT, Body) -> + Type = maps:get(<<"type">>, Body), + %% FIXME: If the type is 0, the length and events is empty + Length = maps:get(<<"length">>, Body), + Events = maps:get(<<"events">>, Body), + serialize_events(Events, <>); +serialize_body(?MS_SEND_QUESTION, Body) -> + Flag = maps:get(<<"flag">>, Body), + Length = maps:get(<<"length">>, Body), + Question = maps:get(<<"question">>, Body), + Answers = maps:get(<<"answers">>, Body), + serialize_candidate_answers(Answers, <>); +serialize_body(?MS_SET_MENU, Body) -> + %% XXX: If the tpye is delete all menu, the remaining bytes should be drop? + Type = maps:get(<<"type">>, Body), + Length = maps:get(<<"length">>, Body), + Menus = maps:get(<<"menus">>, Body), + serialize_menus(Menus, <>); +serialize_body(?MS_INFO_CONTENT, Body) -> + Type = maps:get(<<"type">>, Body), + Length = maps:get(<<"length">>, Body), + Info = maps:get(<<"info">>, Body), + <>; +serialize_body(?MS_PHONE_CALLBACK, Body) -> + Type = maps:get(<<"type">>, Body), + Phone = maps:get(<<"phone">>, Body), + <>; +serialize_body(?MS_SET_PHONE_NUMBER, Body) -> + Type = maps:get(<<"type">>, Body), + Length = maps:get(<<"length">>, Body), + Contacts = maps:get(<<"contacts">>, Body), + serialize_contacts(Contacts, <>); +serialize_body(?MS_VEHICLE_CONTROL, Body) -> + Flag = maps:get(<<"flag">>, Body), + <>; +serialize_body(?MS_SET_CIRCLE_AREA, Body) -> + Type = maps:get(<<"type">>, Body), + Length = maps:get(<<"length">>, Body), + Areas = maps:get(<<"areas">>, Body), + serialize_circle_area(Areas, <>); +serialize_body(?MS_DEL_CIRCLE_AREA, Body) -> + Length = maps:get(<<"length">>, Body), + Ids = maps:get(<<"ids">>, Body), + encode_dword_array(Length, Ids, <>); +serialize_body(?MS_SET_RECT_AREA, Body) -> + Type = maps:get(<<"type">>, Body), + Length = maps:get(<<"length">>, Body), + Areas = maps:get(<<"areas">>, Body), + serialize_rect_area(Areas, <>); +serialize_body(?MS_DEL_RECT_AREA, Body) -> + Length = maps:get(<<"length">>, Body), + Ids = maps:get(<<"ids">>, Body), + encode_dword_array(Length, Ids, <>); +serialize_body(?MS_SET_POLY_AREA, Body) -> + Id = maps:get(<<"id">>, Body), + Flag = maps:get(<<"flag">>, Body), + StartTime = maps:get(<<"start_time">>, Body), + EndTime = maps:get(<<"end_time">>, Body), + MaxSpeed = maps:get(<<"max_speed">>, Body), + Overspeed = maps:get(<<"overspeed_duration">>, Body), + Length = maps:get(<<"length">>, Body), + Points = maps:get(<<"points">>, Body), + StartBCD = to_bcd(StartTime, 6), + EndBCD = to_bcd(EndTime, 6), + serialize_poly_point( + Length, + Points, + <> + ); +serialize_body(?MS_DEL_POLY_AREA, Body) -> + Length = maps:get(<<"length">>, Body), + Ids = maps:get(<<"ids">>, Body), + encode_dword_array(Length, Ids, <>); +serialize_body(?MS_SET_PATH, Body) -> + Id = maps:get(<<"id">>, Body), + Flag = maps:get(<<"flag">>, Body), + StartTime = maps:get(<<"start_time">>, Body), + EndTime = maps:get(<<"end_time">>, Body), + Length = maps:get(<<"length">>, Body), + Points = maps:get(<<"points">>, Body), + StartBCD = to_bcd(StartTime, 6), + EndBCD = to_bcd(EndTime, 6), + serialize_corner_point( + Length, Points, <> + ); +serialize_body(?MS_DEL_PATH, Body) -> + Length = maps:get(<<"length">>, Body), + Ids = maps:get(<<"ids">>, Body), + encode_dword_array(Length, Ids, <>); +serialize_body(?MS_DRIVE_RECORD_CAPTURE, Body) -> + Command = maps:get(<<"command">>, Body), + Param = maps:get(<<"param">>, Body), + RawParam = base64:decode(Param), + <>; +serialize_body(?MS_DRIVE_REC_PARAM_SEND, Body) -> + Command = maps:get(<<"command">>, Body), + Param = maps:get(<<"param">>, Body), + RawParam = base64:decode(Param), + <>; +serialize_body(?MS_REQ_DRIVER_ID, _Body) -> + <<>>; +serialize_body(?MS_MULTIMEDIA_DATA_ACK, Body) -> + MmId = maps:get(<<"mm_id">>, Body), + Length = maps:get(<<"length">>, Body), + RetxIds = maps:get(<<"retx_ids">>, Body), + encode_word_array(Length, RetxIds, <>); +serialize_body(?MS_CAMERA_SHOT, Body) -> + ChId = maps:get(<<"channel_id">>, Body), + Command = maps:get(<<"command">>, Body), + Period = maps:get(<<"period">>, Body), + Save = maps:get(<<"save">>, Body), + Resolution = maps:get(<<"resolution">>, Body), + Quality = maps:get(<<"quality">>, Body), + Bright = maps:get(<<"bright">>, Body), + Contrast = maps:get(<<"contrast">>, Body), + Saturate = maps:get(<<"saturate">>, Body), + Chromaticity = maps:get(<<"chromaticity">>, Body), + <>; +serialize_body(?MS_MM_DATA_SEARCH, Body) -> + Type = maps:get(<<"type">>, Body), + Channel = maps:get(<<"channel">>, Body), + Event = maps:get(<<"event">>, Body), + StartTime = maps:get(<<"start_time">>, Body), + EndTime = maps:get(<<"end_time">>, Body), + StartBCD = to_bcd(StartTime, 6), + EndBCD = to_bcd(EndTime, 6), + <>; +serialize_body(?MS_MM_DATA_UPLOAD, Body) -> + Type = maps:get(<<"type">>, Body), + ChId = maps:get(<<"channel">>, Body), + Event = maps:get(<<"event">>, Body), + Start = maps:get(<<"start_time">>, Body), + End = maps:get(<<"end_time">>, Body), + Delete = maps:get(<<"delete">>, Body), + StartBCD = to_bcd(Start, 6), + EndBCD = to_bcd(End, 6), + <>; +serialize_body(?MS_VOICE_RECORD, Body) -> + Command = maps:get(<<"command">>, Body), + Time = maps:get(<<"time">>, Body), + Save = maps:get(<<"save">>, Body), + Rate = maps:get(<<"rate">>, Body), + <>; +serialize_body(?MS_SINGLE_MM_DATA_CTRL, Body) -> + Id = maps:get(<<"id">>, Body), + Flag = maps:get(<<"flag">>, Body), + <>; +serialize_body(?MS_SEND_TRANSPARENT_DATA, Body) -> + Type = maps:get(<<"type">>, Body), + DataBase64 = maps:get(<<"data">>, Body), + Data = base64:decode(DataBase64), + <>; +serialize_body(?MS_RSA_KEY, Body) -> + E = maps:get(<<"e">>, Body), + N = maps:get(<<"n">>, Body), + <>; +serialize_body(_UnkonwnMsgId, _Body) -> + {error, invalid_input}. + +serialize_corner_point(0, [], Acc) -> + Acc; +serialize_corner_point( + Count, + [ + #{ + <<"point_id">> := PointId, + <<"path_id">> := PathId, + <<"point_lat">> := Lat, + <<"point_lng">> := Lng, + <<"width">> := Width, + <<"attrib">> := Attrib, + <<"passed">> := Passed, + <<"uncovered">> := Uncovered, + <<"max_speed">> := MaxSpeed, + <<"overspeed_duration">> := Overspeed + } + | T + ], + Acc +) -> + serialize_corner_point( + Count - 1, + T, + <> + ). + +serialize_poly_point(0, _, Acc) -> + Acc; +serialize_poly_point(Count, [#{<<"lat">> := Lat, <<"lng">> := Lng} | T], Acc) -> + serialize_poly_point(Count - 1, T, <>). + +serialize_rect_area([], Acc) -> + Acc; +serialize_rect_area( + [ + #{ + <<"id">> := Id, + <<"flag">> := Flag, + <<"lt_lat">> := LtLatitude, + <<"lt_lng">> := LtLongitude, + <<"rb_lat">> := RbLatitude, + <<"rb_lng">> := RbLongitude, + <<"start_time">> := StartTime, + <<"end_time">> := EndTime, + <<"max_speed">> := MaxSpeed, + <<"overspeed_duration">> := Overspeed + } + | T + ], + Acc +) -> + StartBCD = to_bcd(StartTime, 6), + EndBCD = to_bcd(EndTime, 6), + serialize_rect_area( + T, + <> + ). + +serialize_circle_area([], Acc) -> + Acc; +serialize_circle_area( + [ + H = #{ + <<"id">> := Id, + <<"flag">> := Flag, + <<"center_latitude">> := Latitude, + <<"center_longitude">> := Longitude, + <<"radius">> := Radius + } + | T + ], + Acc +) -> + First = <>, + Second = + case maps:is_key(<<"start_time">>, H) of + true -> + #{<<"start_time">> := StartTime, <<"end_time">> := EndTime} = H, + StartBCD = to_bcd(StartTime, 6), + EndBCD = to_bcd(EndTime, 6), + <>; + false -> + First + end, + Third = + case maps:is_key(<<"max_speed">>, H) of + true -> + #{<<"max_speed">> := MaxSpeed, <<"overspeed_duration">> := Overspeed} = H, + <>; + false -> + Second + end, + serialize_circle_area(T, Third). + +serialize_contacts([], Acc) -> + Acc; +serialize_contacts( + [ + #{ + <<"type">> := Type, + <<"phone_len">> := PhoneLen, + <<"phone">> := Phone, + <<"name_len">> := NameLen, + <<"name">> := Name + } + | T + ], + Acc +) -> + serialize_contacts( + T, <> + ). + +serialize_menus([], Acc) -> + Acc; +serialize_menus([#{<<"type">> := Type, <<"length">> := Length, <<"info">> := Info} | T], Acc) -> + serialize_menus(T, <>). + +serialize_candidate_answers([], Acc) -> + Acc; +serialize_candidate_answers([#{<<"id">> := Id, <<"len">> := Len, <<"answer">> := Answer} | T], Acc) -> + serialize_candidate_answers(T, <>). + +serialize_events([], Acc) -> + Acc; +serialize_events([#{<<"id">> := Id, <<"length">> := Len, <<"content">> := Content} | T], Acc) -> + serialize_events(T, <>). + +serialize_client_param(Acc, []) -> + Acc; +serialize_client_param(Acc, [#{<<"id">> := Id, <<"value">> := Value} | T]) -> + NewAcc = encode_client_param(Id, Value, Acc), + serialize_client_param(NewAcc, T). + +encode_client_param(Id, Value, Acc) -> + case client_param_data_type(Id) of + dword -> encode_cp_dword(Id, Value, Acc); + word -> encode_cp_word(Id, Value, Acc); + byte -> encode_cp_byte(Id, Value, Acc); + string -> encode_cp_string(Id, Value, Acc); + reserved -> encode_cp_reserved(Id, Value, Acc) + end. + +client_param_data_type(?CP_HEARTBEAT_DURATION) -> dword; +client_param_data_type(?CP_TCP_TIMEOUT) -> dword; +client_param_data_type(?CP_TCP_RETX) -> dword; +client_param_data_type(?CP_UDP_TIMEOUT) -> dword; +client_param_data_type(?CP_UDP_RETX) -> dword; +client_param_data_type(?CP_SMS_TIMEOUT) -> dword; +client_param_data_type(?CP_SMS_RETX) -> dword; +client_param_data_type(?CP_SERVER_APN) -> string; +client_param_data_type(?CP_DIAL_USERNAME) -> string; +client_param_data_type(?CP_DIAL_PASSWORD) -> string; +client_param_data_type(?CP_SERVER_ADDRESS) -> string; +client_param_data_type(?CP_BACKUP_SERVER_APN) -> string; +client_param_data_type(?CP_BACKUP_DIAL_USERNAME) -> string; +client_param_data_type(?CP_BACKUP_DIAL_PASSWORD) -> string; +client_param_data_type(?CP_BACKUP_SERVER_ADDRESS) -> string; +client_param_data_type(?CP_SERVER_TCP_PORT) -> dword; +client_param_data_type(?CP_SERVER_UDP_PORT) -> dword; +client_param_data_type(?CP_IC_CARD_SERVER_ADDRESS) -> string; +client_param_data_type(?CP_IC_CARD_SERVER_TCP_PORT) -> dword; +client_param_data_type(?CP_IC_CARD_SERVER_UDP_PORT) -> dword; +client_param_data_type(?CP_IC_CARD_BACKUP_SERVER_ADDRESS) -> string; +client_param_data_type(?CP_POS_REPORT_POLICY) -> dword; +client_param_data_type(?CP_POS_REPORT_CONTROL) -> dword; +client_param_data_type(?CP_DRIVER_NLOGIN_REPORT_INTERVAL) -> dword; +client_param_data_type(?CP_REPORT_INTERVAL_DURING_SLEEP) -> dword; +client_param_data_type(?CP_EMERGENCY_ALARM_REPORT_INTERVAL) -> dword; +client_param_data_type(?CP_DEFAULT_REPORT_INTERVAL) -> dword; +client_param_data_type(?CP_DEFAULT_DISTANCE_INTERVAL) -> dword; +client_param_data_type(?CP_DRIVER_NLOGIN_DISTANCE_INTERVAL) -> dword; +client_param_data_type(?CP_DISTANCE_INTERVAL_DURING_SLEEP) -> dword; +client_param_data_type(?CP_EMERGENCY_ALARM_DISTANCE_INTERVAL) -> dword; +client_param_data_type(?CP_SET_TURN_ANGLE) -> dword; +client_param_data_type(?CP_EFENCE_RADIUS) -> word; +client_param_data_type(?CP_MONITOR_PHONE) -> string; +client_param_data_type(?CP_RESETING_PHONE) -> string; +client_param_data_type(?CP_RECOVERY_PHONE) -> string; +client_param_data_type(?CP_SMS_MONITOR_PHONE) -> string; +client_param_data_type(?CP_EMERGENCY_SMS_PHONE) -> string; +client_param_data_type(?CP_ACCEPT_CALL_POLICY) -> dword; +client_param_data_type(?CP_MAX_CALL_DURATION) -> dword; +client_param_data_type(?CP_MAX_CALL_DURATION_OF_MONTH) -> dword; +client_param_data_type(?CP_SPY_PHONE) -> string; +client_param_data_type(?CP_PRIVILEGE_SMS_PHONE) -> string; +client_param_data_type(?CP_ALARM_MASK) -> dword; +client_param_data_type(?CP_ALARM_SEND_SMS_MASK) -> dword; +client_param_data_type(?CP_ALARM_CAMERA_SHOT_MASK) -> dword; +client_param_data_type(?CP_ALARM_PICTURE_SAVE_MASK) -> dword; +client_param_data_type(?CP_ALARM_KEY_MASK) -> dword; +client_param_data_type(?CP_MAX_SPEED) -> dword; +client_param_data_type(?CP_OVERSPEED_ELAPSED) -> dword; +client_param_data_type(?CP_CONT_DRIVE_THRESHOLD) -> dword; +client_param_data_type(?CP_ACC_DRIVE_TIME_ONE_DAY_THRESHOLD) -> dword; +client_param_data_type(?CP_MIN_BREAK_TIME) -> dword; +client_param_data_type(?CP_MAX_PARK_TIME) -> dword; +client_param_data_type(?CP_OVERSPEED_ALARM_DELTA) -> word; +client_param_data_type(?CP_DRIVER_FATIGUE_ALARM_DELTA) -> word; +client_param_data_type(?CP_SET_CRASH_ALARM_PARAM) -> word; +client_param_data_type(?CP_SET_ROLLOVER_PARAM) -> word; +client_param_data_type(?CP_TIME_CONTROLED_CAMERA) -> dword; +client_param_data_type(?CP_DISTANCE_CONTROLED_CAMERA) -> dword; +client_param_data_type(?CP_PICTURE_QUALITY) -> dword; +client_param_data_type(?CP_PICTURE_BRIGHTNESS) -> dword; +client_param_data_type(?CP_PICTURE_CONTRAST) -> dword; +client_param_data_type(?CP_PICTURE_SATURATE) -> dword; +client_param_data_type(?CP_PICTURE_CHROMATICITY) -> dword; +client_param_data_type(?CP_ODOMETER) -> dword; +client_param_data_type(?CP_REGISTERED_PROVINCE) -> word; +client_param_data_type(?CP_REGISTERED_CITY) -> word; +client_param_data_type(?CP_VEHICLE_LICENSE_NUMBER) -> string; +client_param_data_type(?CP_VEHICLE_LICENSE_PLATE_COLOR) -> byte; +client_param_data_type(?CP_GNSS_MODE) -> byte; +client_param_data_type(?CP_GNSS_BAUDRATE) -> byte; +client_param_data_type(?CP_GNSS_OUTPUT_RATE) -> byte; +client_param_data_type(?CP_GNSS_SAMPLING_RATE) -> dword; +client_param_data_type(?CP_GNSS_UPLOAD_MODE) -> byte; +client_param_data_type(?CP_GNSS_UPLOAD_UNIT) -> dword; +client_param_data_type(?CP_CAN_BUS_CH1_SAMPLING) -> dword; +client_param_data_type(?CP_CAN_BUS_CH1_UPLOAD) -> word; +client_param_data_type(?CP_CAN_BUS_CH2_SAMPLING) -> dword; +client_param_data_type(?CP_CAN_BUS_CH2_UPLOAD) -> word; +client_param_data_type(?CP_SET_CAN_BUS_ID_PARAM) -> string; +client_param_data_type(_) -> reserved. + +-spec encode_cp_byte(integer(), integer(), binary()) -> binary(). +encode_cp_byte(Id, Value, Acc) -> + <>. + +-spec encode_cp_word(integer(), integer(), binary()) -> binary(). +encode_cp_word(Id, Value, Acc) -> + <>. + +-spec encode_cp_dword(integer(), integer(), binary()) -> binary(). +encode_cp_dword(Id, Value, Acc) -> + <>. + +-spec encode_cp_string(integer(), binary(), binary()) -> binary(). +encode_cp_string(Id, StringBinary, Acc) -> + Length = size(StringBinary), + <>. + +-spec encode_cp_reserved(integer(), binary(), binary()) -> binary(). +encode_cp_reserved(Id, Base64Binary0, Acc) -> + Binary = base64:decode(Base64Binary0), + Length = size(Binary), + <>. + +packet(Binary) -> + packet2(Binary, undefined, <<16#7e:?BYTE>>). + +packet2(<<>>, Check, Acc) -> + Stream = pack(Acc, Check), + <>; +packet2(<>, Check, Acc) -> + NewCheck = cal_xor(C, Check), + packet2(Rest, NewCheck, pack(Acc, C)). + +pack(Stream, 16#7e) -> + <>; +pack(Stream, 16#7d) -> + <>; +pack(Stream, C) -> + <>. + +encode_word_array(0, _, Acc) -> + Acc; +encode_word_array(Count, [H | T], Acc) -> + encode_word_array(Count - 1, T, <>). + +encode_dword_array(0, _, Acc) -> + Acc; +encode_dword_array(Count, [H | T], Acc) -> + encode_dword_array(Count - 1, T, <>). + +from_bcd(<<>>, Acc) -> + list_to_binary(Acc); +from_bcd(<>, Acc) -> + from_bcd(Rest, Acc ++ [$0 + N1, $0 + N2]). + +to_bcd(String, BCDMaxSize) -> + StringSize = size(String), + Prefix = + case StringSize < BCDMaxSize of + true -> padding_zero(BCDMaxSize * 2 - StringSize, <<>>); + false -> <<>> + end, + encode_bcd(String, Prefix). + +padding_zero(0, Acc) -> + Acc; +padding_zero(Count, Acc) -> + padding_zero(Count - 1, <>). + +encode_bcd(<<>>, Acc) -> + Acc; +encode_bcd(<>, Acc) -> + C = H - $0, + encode_bcd(Rest, <>). + +check(Bin) -> + case check(Bin, undefined) of + true -> + Size = size(Bin) - 1, + <> = Bin, + Msg; + false -> + {error, invalid_message} + end. + +check(<<>>, _) -> + false; +check(<<_Byte:8>>, undefined) -> + false; +check(<>, XorValue) -> + Byte == XorValue; +check(<>, undefined) -> + check(Rest, Byte); +check(<>, XorValue) -> + check(Rest, Byte bxor XorValue). + +cal_xor(C, undefined) -> + C; +cal_xor(C, XorValue) -> + C bxor XorValue. diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_schema.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_schema.erl new file mode 100644 index 000000000..35d8f962d --- /dev/null +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_schema.erl @@ -0,0 +1,119 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_jt808_schema). + +-include("emqx_jt808.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("typerefl/include/types.hrl"). + +-export([fields/1, desc/1]). + +-define(NOT_EMPTY(MSG), emqx_resource_validator:not_empty(MSG)). + +fields(jt808) -> + [ + {frame, sc(ref(jt808_frame))}, + {proto, sc(ref(jt808_proto))}, + {mountpoint, emqx_gateway_schema:mountpoint(?DEFAULT_MOUNTPOINT)}, + {retry_interval, + sc( + emqx_schema:duration_ms(), + #{ + default => <<"8s">>, + desc => ?DESC(retry_interval) + } + )}, + {max_retry_times, + sc( + non_neg_integer(), + #{ + default => 3, + desc => ?DESC(max_retry_times) + } + )}, + {message_queue_len, + sc( + non_neg_integer(), + #{ + default => 10, + desc => ?DESC(message_queue_len) + } + )}, + {listeners, sc(ref(emqx_gateway_schema, tcp_listeners), #{desc => ?DESC(tcp_listeners)})} + ] ++ emqx_gateway_schema:gateway_common_options(); +fields(jt808_frame) -> + [ + {max_length, fun jt808_frame_max_length/1} + ]; +fields(jt808_proto) -> + [ + {allow_anonymous, fun allow_anonymous/1}, + {registry, fun registry_url/1}, + {authentication, fun authentication_url/1}, + {up_topic, fun up_topic/1}, + {dn_topic, fun dn_topic/1} + ]. + +jt808_frame_max_length(type) -> non_neg_integer(); +jt808_frame_max_length(desc) -> ?DESC(?FUNCTION_NAME); +jt808_frame_max_length(default) -> 8192; +jt808_frame_max_length(required) -> false; +jt808_frame_max_length(_) -> undefined. + +allow_anonymous(type) -> boolean(); +allow_anonymous(desc) -> ?DESC(?FUNCTION_NAME); +allow_anonymous(default) -> true; +allow_anonymous(required) -> false; +allow_anonymous(_) -> undefined. + +registry_url(type) -> binary(); +registry_url(desc) -> ?DESC(?FUNCTION_NAME); +registry_url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")]; +registry_url(required) -> false; +registry_url(_) -> undefined. + +authentication_url(type) -> binary(); +authentication_url(desc) -> ?DESC(?FUNCTION_NAME); +authentication_url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")]; +authentication_url(required) -> false; +authentication_url(_) -> undefined. + +up_topic(type) -> binary(); +up_topic(desc) -> ?DESC(?FUNCTION_NAME); +up_topic(default) -> ?DEFAULT_UP_TOPIC; +up_topic(validator) -> [?NOT_EMPTY("the value of the field 'up_topic' cannot be empty")]; +up_topic(required) -> true; +up_topic(_) -> undefined. + +dn_topic(type) -> binary(); +dn_topic(desc) -> ?DESC(?FUNCTION_NAME); +dn_topic(default) -> ?DEFAULT_DN_TOPIC; +dn_topic(validator) -> [?NOT_EMPTY("the value of the field 'dn_topic' cannot be empty")]; +dn_topic(required) -> true; +dn_topic(_) -> undefined. + +desc(jt808) -> + "The JT/T 808 protocol gateway provides EMQX with the ability to access JT/T 808 protocol devices."; +desc(jt808_frame) -> + "Limits for the JT/T 808 frames."; +desc(jt808_proto) -> + "The JT/T 808 protocol options."; +desc(_) -> + undefined. + +%%-------------------------------------------------------------------- +%% internal functions + +sc(Type) -> + sc(Type, #{}). + +sc(Type, Meta) -> + hoconsc:mk(Type, Meta). + +ref(StructName) -> + ref(?MODULE, StructName). + +ref(Mod, Field) -> + hoconsc:ref(Mod, Field). diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index 190ef1afa..ccb61d762 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -128,6 +128,7 @@ emqx_audit, emqx_gateway_gbt32960, emqx_gateway_ocpp, + emqx_gateway_jt808, emqx_bridge_syskeeper ], %% must always be of type `load' diff --git a/mix.exs b/mix.exs index 23cef80e3..8a0f93dd6 100644 --- a/mix.exs +++ b/mix.exs @@ -218,6 +218,7 @@ defmodule EMQXUmbrella.MixProject do :emqx_audit, :emqx_gateway_gbt32960, :emqx_gateway_ocpp, + :emqx_gateway_jt808, :emqx_bridge_syskeeper ]) end diff --git a/rebar.config.erl b/rebar.config.erl index 93ad9fa99..6bb2fb985 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -113,6 +113,7 @@ is_community_umbrella_app("apps/emqx_dashboard_sso") -> false; is_community_umbrella_app("apps/emqx_audit") -> false; is_community_umbrella_app("apps/emqx_gateway_gbt32960") -> false; is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false; +is_community_umbrella_app("apps/emqx_gateway_jt808") -> false; is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false; is_community_umbrella_app(_) -> true. diff --git a/rel/i18n/emqx_gateway_schema.hocon b/rel/i18n/emqx_gateway_schema.hocon index 2f0a012f2..1a04d57ac 100644 --- a/rel/i18n/emqx_gateway_schema.hocon +++ b/rel/i18n/emqx_gateway_schema.hocon @@ -44,7 +44,7 @@ gateway_common_listener_enable.desc: """Enable the listener.""" gateway_common_listener_enable_authn.desc: -"""Set true (default) to enable client authentication on this listener. +"""Set true (default) to enable client authentication on this listener. When set to false clients will be allowed to connect without authentication.""" gateway_common_listener_max_conn_rate.desc: @@ -62,9 +62,10 @@ then the client actually subscribes to the topic `some_tenant/t`. Similarly, if another client B (connected to the same listener as the client A) sends a message to topic `t`, the message is routed to all the clients subscribed `some_tenant/t`, so client A will receive the message, with topic name `t`. Set to `""` to disable the feature. -Variables in mountpoint string:
+Supported placeholders in mountpoint string:
- ${clientid}: clientid
- - ${username}: username""" + - ${username}: username
+ - ${endpoint_name}: endpoint name""" listener_name_to_settings_map.desc: """A map from listener names to listener settings.""" diff --git a/rel/i18n/emqx_jt808_schema.hocon b/rel/i18n/emqx_jt808_schema.hocon new file mode 100644 index 000000000..cd853df4d --- /dev/null +++ b/rel/i18n/emqx_jt808_schema.hocon @@ -0,0 +1,30 @@ +emqx_jt808_schema { + +jt808_frame_max_length.desc: +"""The maximum length of the JT/T 808 frame.""" + +jt808_allow_anonymous.desc: +"""Allow anonymous access to the JT/T 808 Gateway.""" + +registry_url.desc +"""The JT/T 808 device registry central URL.""" + +authentication_url.desc +"""The JT/T 808 device authentication central URL.""" + +jt808_up_topic.desc +"""The topic of the JT/T 808 protocol upstream message.""" + +jt808_dn_topic.desc +"""The topic of the JT/T 808 protocol downstream message.""" + +retry_interval.desc: +"""Re-send time interval""" + +max_retry_times.desc: +"""Re-send max times""" + +message_queue_len.desc: +"""Max message queue length""" + +}