From 01c60221040234af77df2a312d91787154652104 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 12 Dec 2023 18:25:57 +0800 Subject: [PATCH 1/6] fix(jt808): open jt808 conn session --- .../src/emqx_jt808_channel.erl | 104 ++++++++++++------ 1 file changed, 72 insertions(+), 32 deletions(-) diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl index 5708efe55..2cafb9022 100644 --- a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl @@ -228,9 +228,8 @@ do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) -> #{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame, case emqx_jt808_auth:register(Frame, Channel0#channel.auth) of {ok, Authcode} -> - Channel = enrich_clientinfo( - Frame, enrich_conninfo(Frame, Channel0#channel{authcode = Authcode}) - ), + {ok, Conninfo} = enrich_conninfo(Frame, Channel0#channel{authcode = Authcode}), + {ok, Channel} = enrich_clientinfo(Frame, Conninfo), handle_out({?MS_REGISTER_ACK, 0}, MsgSn, Channel); {error, Reason} -> ?SLOG(error, #{msg => "register_failed", reason => Reason}), @@ -241,27 +240,22 @@ do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) -> end, handle_out({?MS_REGISTER_ACK, ResCode}, MsgSn, Channel0) end; -do_handle_in(Frame = ?MSG(?MC_AUTH), Channel0) -> - #{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame, - Channel = - #channel{clientinfo = #{clientid := ClientId}} = - enrich_clientinfo(Frame, enrich_conninfo(Frame, Channel0)), - authack( - case authenticate(Frame, Channel0) of - true -> - NChannel = prepare_adapter_topic(ensure_connected(Channel)), - emqx_logger:set_metadata_clientid(ClientId), - %% Auto subscribe downlink topics - autosubcribe(NChannel), - _ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel), - %% 0: Successful - {0, MsgSn, NChannel}; - false -> - ?SLOG(error, #{msg => "authenticated_failed"}), - %% 1: Failure - {1, MsgSn, Channel} - end - ); +do_handle_in(Frame = ?MSG(?MC_AUTH), Channel) -> + case + emqx_utils:pipeline( + [ + fun enrich_clientinfo/2, + fun enrich_conninfo/2, + fun set_log_meta/2, + fun auth_connect/2 + ], + Frame, + Channel + ) + of + {ok, _NFrame, NChannel} -> + _NChannel = process_connect(Frame, ensure_connected(NChannel)) + end; do_handle_in(Frame = ?MSG(?MC_HEARTBEAT), Channel) -> handle_out({?MS_GENERAL_RESPONSE, 0, ?MC_HEARTBEAT}, msgsn(Frame), Channel); do_handle_in(?MSG(?MC_RSA_KEY), Channel = #channel{rsa_key = [E, N]}) -> @@ -615,6 +609,43 @@ maybe_fix_mountpoint(ClientInfo = #{mountpoint := Mountpoint}) -> Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo), ClientInfo#{mountpoint := Mountpoint1}. +process_connect( + _Frame = #{<<"header">> := #{<<"msg_sn">> := MsgSn}}, + Channel = #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo + } +) -> + SessFun = fun(_, _) -> #{} end, + case + emqx_gateway_ctx:open_session( + Ctx, + true, + ClientInfo, + ConnInfo, + SessFun + ) + of + {ok, #{session := Session}} -> + NChannel = Channel#channel{session = Session}, + %% Auto subscribe downlink topics + autosubcribe(NChannel), + _ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel), + _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, #{}]), + authack({0, MsgSn, NChannel}); + {error, Reason} -> + log( + error, + #{ + msg => "failed_to_open_session", + reason => Reason + }, + Channel + ), + shutdown(Reason, Channel) + end. + ensure_connected( Channel = #channel{ ctx = Ctx, @@ -624,10 +655,12 @@ ensure_connected( ) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), - Channel#channel{ - conninfo = NConnInfo, - conn_state = connected - }. + prepare_adapter_topic( + Channel#channel{ + conninfo = NConnInfo, + conn_state = connected + } + ). %% Ensure disconnected ensure_disconnected( @@ -802,6 +835,9 @@ is_driver_id_req_exist(#channel{inflight = Inflight}) -> Key = get_msg_ack(?MC_DRIVER_ID_REPORT, none), emqx_inflight:contain(Key, Inflight). +auth_connect(Frame, Channel) -> + {ok, Channel#channel{auth = authenticate(Frame, Channel)}}. + authenticate(_AuthFrame, #channel{authcode = anonymous}) -> true; authenticate(AuthFrame, #channel{authcode = undefined, auth = Auth}) -> @@ -836,7 +872,7 @@ enrich_conninfo( receive_maximum => 0, expiry_interval => 0 }, - Channel#channel{conninfo = NConnInfo}. + {ok, Channel#channel{conninfo = NConnInfo}}. %% Register enrich_clientinfo( @@ -855,7 +891,7 @@ enrich_clientinfo( manufacturer => Manu, terminal_id => DevId }), - Channel#channel{clientinfo = NClientInfo}; + {ok, Channel#channel{clientinfo = NClientInfo}}; %% Auth enrich_clientinfo( #{<<"header">> := #{<<"phone">> := Phone}}, @@ -865,7 +901,11 @@ enrich_clientinfo( phone => Phone, clientid => Phone }, - Channel#channel{clientinfo = NClientInfo}. + {ok, Channel#channel{clientinfo = NClientInfo}}. + +set_log_meta(_Packet, #channel{clientinfo = #{clientid := ClientId}}) -> + emqx_logger:set_metadata_clientid(ClientId), + ok. prepare_adapter_topic(Channel = #channel{up_topic = UpTopic, dn_topic = DnTopic}) -> Channel#channel{ @@ -906,7 +946,7 @@ autosubcribe(#channel{ dn_topic = Topic }) -> SubOpts = #{rap => 0, nl => 0, qos => 0, rh => 0}, - emqx:subscribe(Topic, ClientId, SubOpts), + _ = emqx_broker:subscribe(Topic, ClientId, SubOpts), ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts#{is_new => true}]). start_keepalive(Secs, _Channel) when Secs > 0 -> From d4e964f633221903ed4b29bc7b819099af95dbc4 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 12 Dec 2023 18:37:56 +0800 Subject: [PATCH 2/6] refactor: move func section --- .../emqx_gateway_jt808/test/emqx_jt808_SUITE.erl | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/emqx_gateway_jt808/test/emqx_jt808_SUITE.erl b/apps/emqx_gateway_jt808/test/emqx_jt808_SUITE.erl index 08ba9b8f8..d9200bc9e 100644 --- a/apps/emqx_gateway_jt808/test/emqx_jt808_SUITE.erl +++ b/apps/emqx_gateway_jt808/test/emqx_jt808_SUITE.erl @@ -324,6 +324,14 @@ location_report_28bytes() -> binary_to_hex_string(Data) -> lists:flatten([io_lib:format("~2.16.0B ", [X]) || <> <= Data]). +receive_msg() -> + receive + {deliver, Topic, #message{payload = Payload}} -> + {Topic, Payload} + after 100 -> + {error, timeout} + end. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%% test cases %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% t_case00_register(_) -> @@ -2677,11 +2685,3 @@ t_case34_dl_0x8805_single_mm_data_ctrl(_Config) -> {error, timeout} = gen_tcp:recv(Socket, 0, 500), ok = gen_tcp:close(Socket). - -receive_msg() -> - receive - {deliver, Topic, #message{payload = Payload}} -> - {Topic, Payload} - after 100 -> - {error, timeout} - end. From ec83ec77306c508e273b586c5849bc73347980c2 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 13 Dec 2023 09:26:04 +0800 Subject: [PATCH 3/6] fix(gw_jt808): insert channel info --- .../src/emqx_jt808_channel.erl | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl index 2cafb9022..fd83cb291 100644 --- a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl @@ -115,8 +115,8 @@ info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> ClientId; info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; -info(session, _) -> - #{}; +info(session, #channel{session = Session}) -> + Session; info(conn_state, #channel{conn_state = ConnState}) -> ConnState; info(authcode, #channel{authcode = AuthCode}) -> @@ -254,7 +254,7 @@ do_handle_in(Frame = ?MSG(?MC_AUTH), Channel) -> ) of {ok, _NFrame, NChannel} -> - _NChannel = process_connect(Frame, ensure_connected(NChannel)) + _ = process_connect(Frame, ensure_connected(NChannel)) end; do_handle_in(Frame = ?MSG(?MC_HEARTBEAT), Channel) -> handle_out({?MS_GENERAL_RESPONSE, 0, ?MC_HEARTBEAT}, msgsn(Frame), Channel); @@ -614,7 +614,7 @@ process_connect( Channel = #channel{ ctx = Ctx, conninfo = ConnInfo, - clientinfo = ClientInfo + clientinfo = ClientInfo = #{clientid := ClientId} } ) -> SessFun = fun(_, _) -> #{} end, @@ -630,9 +630,10 @@ process_connect( {ok, #{session := Session}} -> NChannel = Channel#channel{session = Session}, %% Auto subscribe downlink topics - autosubcribe(NChannel), + ok = autosubcribe(NChannel), _ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel), _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, #{}]), + _ = emqx_gateway_ctx:insert_channel_info(Ctx, ClientId, info(NChannel), undefined), authack({0, MsgSn, NChannel}); {error, Reason} -> log( @@ -655,12 +656,7 @@ ensure_connected( ) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), - prepare_adapter_topic( - Channel#channel{ - conninfo = NConnInfo, - conn_state = connected - } - ). + prepare_adapter_topic(Channel#channel{conninfo = NConnInfo, conn_state = connected}). %% Ensure disconnected ensure_disconnected( From 0b1838a5ffa5ee4f6314f1d97580c61670931269 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 13 Dec 2023 22:38:13 +0800 Subject: [PATCH 4/6] fix(gw_jt808): subscriptions and channel stats --- .../src/emqx_jt808_channel.erl | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl index fd83cb291..45b85f92e 100644 --- a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl @@ -85,6 +85,8 @@ -define(INFO_KEYS, [ctx, conninfo, zone, clientid, clientinfo, session, conn_state, authcode]). +-define(DN_TOPIC_SUBOPTS, #{rap => 0, nl => 0, qos => 0, rh => 0}). + -define(RETX_INTERVAL, 8000). -define(RETX_MAX_TIME, 5). @@ -122,8 +124,21 @@ info(conn_state, #channel{conn_state = ConnState}) -> info(authcode, #channel{authcode = AuthCode}) -> AuthCode. -stats(_Channel) -> - []. +-spec stats(channel()) -> emqx_types:stats(). +stats(#channel{inflight = Inflight, mqueue = Queue}) -> + %% XXX: A fake stats for managed by emqx_management + [ + {subscriptions_cnt, 1}, + {subscriptions_max, 1}, + {inflight_cnt, emqx_inflight:size(Inflight)}, + {inflight_max, emqx_inflight:max_size(Inflight)}, + {mqueue_len, queue:len(Queue)}, + {mqueue_max, 0}, + {mqueue_dropped, 0}, + {next_pkt_id, 0}, + {awaiting_rel_cnt, 0}, + {awaiting_rel_max, 0} + ]. %%-------------------------------------------------------------------- %% Init the Channel @@ -422,6 +437,8 @@ handle_call(kick, _From, Channel) -> disconnect_and_shutdown(kicked, ok, Channel1); handle_call(discard, _From, Channel) -> disconnect_and_shutdown(discarded, ok, Channel); +handle_call(subscriptions, _From, Channel = #channel{dn_topic = DnTopic}) -> + reply({ok, [{DnTopic, ?DN_TOPIC_SUBOPTS}]}, Channel); handle_call(Req, _From, Channel) -> log(error, #{msg => "unexpected_call", call => Req}, Channel), reply(ignored, Channel). @@ -633,7 +650,9 @@ process_connect( ok = autosubcribe(NChannel), _ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel), _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, #{}]), - _ = emqx_gateway_ctx:insert_channel_info(Ctx, ClientId, info(NChannel), undefined), + _ = emqx_gateway_ctx:insert_channel_info( + Ctx, ClientId, info(NChannel), stats(NChannel) + ), authack({0, MsgSn, NChannel}); {error, Reason} -> log( @@ -941,9 +960,10 @@ autosubcribe(#channel{ #{clientid := ClientId}, dn_topic = Topic }) -> - SubOpts = #{rap => 0, nl => 0, qos => 0, rh => 0}, - _ = emqx_broker:subscribe(Topic, ClientId, SubOpts), - ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts#{is_new => true}]). + _ = emqx_broker:subscribe(Topic, ClientId, ?DN_TOPIC_SUBOPTS), + ok = emqx_hooks:run('session.subscribed', [ + ClientInfo, Topic, ?DN_TOPIC_SUBOPTS#{is_new => true} + ]). start_keepalive(Secs, _Channel) when Secs > 0 -> self() ! {keepalive, start, round(Secs) * 1000}. From d1adcd464efcc30afa14e5dba955ac2fe10462c2 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 14 Dec 2023 02:27:14 +0800 Subject: [PATCH 5/6] fix(gw_jt808): client keepalive timer --- apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl | 2 +- apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl index 65a696a16..3f71f5a3a 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl +++ b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl @@ -416,7 +416,7 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected} log(error, #{msg => "unexpected_sock_closed", reason => Reason}, Channel), {ok, Channel}; handle_info(Info, Channel) -> - log(error, #{msg => "unexpected_info}", info => Info}, Channel), + log(error, #{msg => "unexpected_info", info => Info}, Channel), {ok, Channel}. %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl index 45b85f92e..e78d6667e 100644 --- a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl @@ -48,7 +48,7 @@ %% AuthCode authcode :: undefined | anonymous | binary(), %% Keepalive - keepalive, + keepalive :: maybe(emqx_keepalive:keepalive()), %% Msg SN msg_sn, %% Down Topic @@ -188,7 +188,7 @@ init( conn_state = idle, timers = #{}, authcode = undefined, - keepalive = maps:get(keepalive, Options, ?DEFAULT_KEEPALIVE), + keepalive = undefined, msg_sn = 0, % TODO: init rsa_key from user input dn_topic = maps:get(dn_topic, ProtoConf, ?DEFAULT_DN_TOPIC), @@ -475,6 +475,9 @@ handle_info( handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> log(error, #{msg => "unexpected_sock_closed", reason => Reason}, Channel), {ok, Channel}; +handle_info({keepalive, start, Interval}, Channel) -> + NChannel = Channel#channel{keepalive = emqx_keepalive:init(Interval)}, + {ok, ensure_timer(alive_timer, NChannel)}; handle_info(Info, Channel) -> log(error, #{msg => "unexpected_info", info => Info}, Channel), {ok, Channel}. From 944bb596c08ea90c62450b6ae65a4e5b92f98347 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 14 Dec 2023 14:51:01 +0800 Subject: [PATCH 6/6] fix(gw_jt808): make static check happy --- .../src/emqx_jt808_channel.erl | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl index e78d6667e..052065ef5 100644 --- a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl @@ -255,21 +255,27 @@ do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) -> end, handle_out({?MS_REGISTER_ACK, ResCode}, MsgSn, Channel0) end; -do_handle_in(Frame = ?MSG(?MC_AUTH), Channel) -> +do_handle_in(Frame = ?MSG(?MC_AUTH), Channel0) -> + #{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame, case emqx_utils:pipeline( [ fun enrich_clientinfo/2, fun enrich_conninfo/2, - fun set_log_meta/2, - fun auth_connect/2 + fun set_log_meta/2 ], Frame, - Channel + Channel0 ) of - {ok, _NFrame, NChannel} -> - _ = process_connect(Frame, ensure_connected(NChannel)) + {ok, _NFrame, Channel} -> + case authenticate(Frame, Channel) of + true -> + NChannel = process_connect(Frame, ensure_connected(Channel)), + authack({0, MsgSn, NChannel}); + false -> + authack({1, MsgSn, Channel}) + end end; do_handle_in(Frame = ?MSG(?MC_HEARTBEAT), Channel) -> handle_out({?MS_GENERAL_RESPONSE, 0, ?MC_HEARTBEAT}, msgsn(Frame), Channel); @@ -630,7 +636,7 @@ maybe_fix_mountpoint(ClientInfo = #{mountpoint := Mountpoint}) -> ClientInfo#{mountpoint := Mountpoint1}. process_connect( - _Frame = #{<<"header">> := #{<<"msg_sn">> := MsgSn}}, + _Frame, Channel = #channel{ ctx = Ctx, conninfo = ConnInfo, @@ -656,7 +662,7 @@ process_connect( _ = emqx_gateway_ctx:insert_channel_info( Ctx, ClientId, info(NChannel), stats(NChannel) ), - authack({0, MsgSn, NChannel}); + NChannel; {error, Reason} -> log( error, @@ -853,9 +859,6 @@ is_driver_id_req_exist(#channel{inflight = Inflight}) -> Key = get_msg_ack(?MC_DRIVER_ID_REPORT, none), emqx_inflight:contain(Key, Inflight). -auth_connect(Frame, Channel) -> - {ok, Channel#channel{auth = authenticate(Frame, Channel)}}. - authenticate(_AuthFrame, #channel{authcode = anonymous}) -> true; authenticate(AuthFrame, #channel{authcode = undefined, auth = Auth}) ->