diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl index 5708efe55..2cafb9022 100644 --- a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl @@ -228,9 +228,8 @@ do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) -> #{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame, case emqx_jt808_auth:register(Frame, Channel0#channel.auth) of {ok, Authcode} -> - Channel = enrich_clientinfo( - Frame, enrich_conninfo(Frame, Channel0#channel{authcode = Authcode}) - ), + {ok, Conninfo} = enrich_conninfo(Frame, Channel0#channel{authcode = Authcode}), + {ok, Channel} = enrich_clientinfo(Frame, Conninfo), handle_out({?MS_REGISTER_ACK, 0}, MsgSn, Channel); {error, Reason} -> ?SLOG(error, #{msg => "register_failed", reason => Reason}), @@ -241,27 +240,22 @@ do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) -> end, handle_out({?MS_REGISTER_ACK, ResCode}, MsgSn, Channel0) end; -do_handle_in(Frame = ?MSG(?MC_AUTH), Channel0) -> - #{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame, - Channel = - #channel{clientinfo = #{clientid := ClientId}} = - enrich_clientinfo(Frame, enrich_conninfo(Frame, Channel0)), - authack( - case authenticate(Frame, Channel0) of - true -> - NChannel = prepare_adapter_topic(ensure_connected(Channel)), - emqx_logger:set_metadata_clientid(ClientId), - %% Auto subscribe downlink topics - autosubcribe(NChannel), - _ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel), - %% 0: Successful - {0, MsgSn, NChannel}; - false -> - ?SLOG(error, #{msg => "authenticated_failed"}), - %% 1: Failure - {1, MsgSn, Channel} - end - ); +do_handle_in(Frame = ?MSG(?MC_AUTH), Channel) -> + case + emqx_utils:pipeline( + [ + fun enrich_clientinfo/2, + fun enrich_conninfo/2, + fun set_log_meta/2, + fun auth_connect/2 + ], + Frame, + Channel + ) + of + {ok, _NFrame, NChannel} -> + _NChannel = process_connect(Frame, ensure_connected(NChannel)) + end; do_handle_in(Frame = ?MSG(?MC_HEARTBEAT), Channel) -> handle_out({?MS_GENERAL_RESPONSE, 0, ?MC_HEARTBEAT}, msgsn(Frame), Channel); do_handle_in(?MSG(?MC_RSA_KEY), Channel = #channel{rsa_key = [E, N]}) -> @@ -615,6 +609,43 @@ maybe_fix_mountpoint(ClientInfo = #{mountpoint := Mountpoint}) -> Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo), ClientInfo#{mountpoint := Mountpoint1}. +process_connect( + _Frame = #{<<"header">> := #{<<"msg_sn">> := MsgSn}}, + Channel = #channel{ + ctx = Ctx, + conninfo = ConnInfo, + clientinfo = ClientInfo + } +) -> + SessFun = fun(_, _) -> #{} end, + case + emqx_gateway_ctx:open_session( + Ctx, + true, + ClientInfo, + ConnInfo, + SessFun + ) + of + {ok, #{session := Session}} -> + NChannel = Channel#channel{session = Session}, + %% Auto subscribe downlink topics + autosubcribe(NChannel), + _ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel), + _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, #{}]), + authack({0, MsgSn, NChannel}); + {error, Reason} -> + log( + error, + #{ + msg => "failed_to_open_session", + reason => Reason + }, + Channel + ), + shutdown(Reason, Channel) + end. + ensure_connected( Channel = #channel{ ctx = Ctx, @@ -624,10 +655,12 @@ ensure_connected( ) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), - Channel#channel{ - conninfo = NConnInfo, - conn_state = connected - }. + prepare_adapter_topic( + Channel#channel{ + conninfo = NConnInfo, + conn_state = connected + } + ). %% Ensure disconnected ensure_disconnected( @@ -802,6 +835,9 @@ is_driver_id_req_exist(#channel{inflight = Inflight}) -> Key = get_msg_ack(?MC_DRIVER_ID_REPORT, none), emqx_inflight:contain(Key, Inflight). +auth_connect(Frame, Channel) -> + {ok, Channel#channel{auth = authenticate(Frame, Channel)}}. + authenticate(_AuthFrame, #channel{authcode = anonymous}) -> true; authenticate(AuthFrame, #channel{authcode = undefined, auth = Auth}) -> @@ -836,7 +872,7 @@ enrich_conninfo( receive_maximum => 0, expiry_interval => 0 }, - Channel#channel{conninfo = NConnInfo}. + {ok, Channel#channel{conninfo = NConnInfo}}. %% Register enrich_clientinfo( @@ -855,7 +891,7 @@ enrich_clientinfo( manufacturer => Manu, terminal_id => DevId }), - Channel#channel{clientinfo = NClientInfo}; + {ok, Channel#channel{clientinfo = NClientInfo}}; %% Auth enrich_clientinfo( #{<<"header">> := #{<<"phone">> := Phone}}, @@ -865,7 +901,11 @@ enrich_clientinfo( phone => Phone, clientid => Phone }, - Channel#channel{clientinfo = NClientInfo}. + {ok, Channel#channel{clientinfo = NClientInfo}}. + +set_log_meta(_Packet, #channel{clientinfo = #{clientid := ClientId}}) -> + emqx_logger:set_metadata_clientid(ClientId), + ok. prepare_adapter_topic(Channel = #channel{up_topic = UpTopic, dn_topic = DnTopic}) -> Channel#channel{ @@ -906,7 +946,7 @@ autosubcribe(#channel{ dn_topic = Topic }) -> SubOpts = #{rap => 0, nl => 0, qos => 0, rh => 0}, - emqx:subscribe(Topic, ClientId, SubOpts), + _ = emqx_broker:subscribe(Topic, ClientId, SubOpts), ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts#{is_new => true}]). start_keepalive(Secs, _Channel) when Secs > 0 ->