fix(jt808): open jt808 conn session

This commit is contained in:
JimMoen 2023-12-12 18:25:57 +08:00
parent d5d05a1701
commit 01c6022104
No known key found for this signature in database
GPG Key ID: 87A520B4F76BA86D
1 changed files with 72 additions and 32 deletions

View File

@ -228,9 +228,8 @@ do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) ->
#{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame, #{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame,
case emqx_jt808_auth:register(Frame, Channel0#channel.auth) of case emqx_jt808_auth:register(Frame, Channel0#channel.auth) of
{ok, Authcode} -> {ok, Authcode} ->
Channel = enrich_clientinfo( {ok, Conninfo} = enrich_conninfo(Frame, Channel0#channel{authcode = Authcode}),
Frame, enrich_conninfo(Frame, Channel0#channel{authcode = Authcode}) {ok, Channel} = enrich_clientinfo(Frame, Conninfo),
),
handle_out({?MS_REGISTER_ACK, 0}, MsgSn, Channel); handle_out({?MS_REGISTER_ACK, 0}, MsgSn, Channel);
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{msg => "register_failed", reason => Reason}), ?SLOG(error, #{msg => "register_failed", reason => Reason}),
@ -241,27 +240,22 @@ do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) ->
end, end,
handle_out({?MS_REGISTER_ACK, ResCode}, MsgSn, Channel0) handle_out({?MS_REGISTER_ACK, ResCode}, MsgSn, Channel0)
end; end;
do_handle_in(Frame = ?MSG(?MC_AUTH), Channel0) -> do_handle_in(Frame = ?MSG(?MC_AUTH), Channel) ->
#{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame, case
Channel = emqx_utils:pipeline(
#channel{clientinfo = #{clientid := ClientId}} = [
enrich_clientinfo(Frame, enrich_conninfo(Frame, Channel0)), fun enrich_clientinfo/2,
authack( fun enrich_conninfo/2,
case authenticate(Frame, Channel0) of fun set_log_meta/2,
true -> fun auth_connect/2
NChannel = prepare_adapter_topic(ensure_connected(Channel)), ],
emqx_logger:set_metadata_clientid(ClientId), Frame,
%% Auto subscribe downlink topics Channel
autosubcribe(NChannel), )
_ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel), of
%% 0: Successful {ok, _NFrame, NChannel} ->
{0, MsgSn, NChannel}; _NChannel = process_connect(Frame, ensure_connected(NChannel))
false -> end;
?SLOG(error, #{msg => "authenticated_failed"}),
%% 1: Failure
{1, MsgSn, Channel}
end
);
do_handle_in(Frame = ?MSG(?MC_HEARTBEAT), Channel) -> do_handle_in(Frame = ?MSG(?MC_HEARTBEAT), Channel) ->
handle_out({?MS_GENERAL_RESPONSE, 0, ?MC_HEARTBEAT}, msgsn(Frame), Channel); handle_out({?MS_GENERAL_RESPONSE, 0, ?MC_HEARTBEAT}, msgsn(Frame), Channel);
do_handle_in(?MSG(?MC_RSA_KEY), Channel = #channel{rsa_key = [E, N]}) -> do_handle_in(?MSG(?MC_RSA_KEY), Channel = #channel{rsa_key = [E, N]}) ->
@ -615,6 +609,43 @@ maybe_fix_mountpoint(ClientInfo = #{mountpoint := Mountpoint}) ->
Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo), Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
ClientInfo#{mountpoint := Mountpoint1}. ClientInfo#{mountpoint := Mountpoint1}.
process_connect(
_Frame = #{<<"header">> := #{<<"msg_sn">> := MsgSn}},
Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo
}
) ->
SessFun = fun(_, _) -> #{} end,
case
emqx_gateway_ctx:open_session(
Ctx,
true,
ClientInfo,
ConnInfo,
SessFun
)
of
{ok, #{session := Session}} ->
NChannel = Channel#channel{session = Session},
%% Auto subscribe downlink topics
autosubcribe(NChannel),
_ = start_keepalive(?DEFAULT_KEEPALIVE, NChannel),
_ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, #{}]),
authack({0, MsgSn, NChannel});
{error, Reason} ->
log(
error,
#{
msg => "failed_to_open_session",
reason => Reason
},
Channel
),
shutdown(Reason, Channel)
end.
ensure_connected( ensure_connected(
Channel = #channel{ Channel = #channel{
ctx = Ctx, ctx = Ctx,
@ -624,10 +655,12 @@ ensure_connected(
) -> ) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{ prepare_adapter_topic(
conninfo = NConnInfo, Channel#channel{
conn_state = connected conninfo = NConnInfo,
}. conn_state = connected
}
).
%% Ensure disconnected %% Ensure disconnected
ensure_disconnected( ensure_disconnected(
@ -802,6 +835,9 @@ is_driver_id_req_exist(#channel{inflight = Inflight}) ->
Key = get_msg_ack(?MC_DRIVER_ID_REPORT, none), Key = get_msg_ack(?MC_DRIVER_ID_REPORT, none),
emqx_inflight:contain(Key, Inflight). emqx_inflight:contain(Key, Inflight).
auth_connect(Frame, Channel) ->
{ok, Channel#channel{auth = authenticate(Frame, Channel)}}.
authenticate(_AuthFrame, #channel{authcode = anonymous}) -> authenticate(_AuthFrame, #channel{authcode = anonymous}) ->
true; true;
authenticate(AuthFrame, #channel{authcode = undefined, auth = Auth}) -> authenticate(AuthFrame, #channel{authcode = undefined, auth = Auth}) ->
@ -836,7 +872,7 @@ enrich_conninfo(
receive_maximum => 0, receive_maximum => 0,
expiry_interval => 0 expiry_interval => 0
}, },
Channel#channel{conninfo = NConnInfo}. {ok, Channel#channel{conninfo = NConnInfo}}.
%% Register %% Register
enrich_clientinfo( enrich_clientinfo(
@ -855,7 +891,7 @@ enrich_clientinfo(
manufacturer => Manu, manufacturer => Manu,
terminal_id => DevId terminal_id => DevId
}), }),
Channel#channel{clientinfo = NClientInfo}; {ok, Channel#channel{clientinfo = NClientInfo}};
%% Auth %% Auth
enrich_clientinfo( enrich_clientinfo(
#{<<"header">> := #{<<"phone">> := Phone}}, #{<<"header">> := #{<<"phone">> := Phone}},
@ -865,7 +901,11 @@ enrich_clientinfo(
phone => Phone, phone => Phone,
clientid => Phone clientid => Phone
}, },
Channel#channel{clientinfo = NClientInfo}. {ok, Channel#channel{clientinfo = NClientInfo}}.
set_log_meta(_Packet, #channel{clientinfo = #{clientid := ClientId}}) ->
emqx_logger:set_metadata_clientid(ClientId),
ok.
prepare_adapter_topic(Channel = #channel{up_topic = UpTopic, dn_topic = DnTopic}) -> prepare_adapter_topic(Channel = #channel{up_topic = UpTopic, dn_topic = DnTopic}) ->
Channel#channel{ Channel#channel{
@ -906,7 +946,7 @@ autosubcribe(#channel{
dn_topic = Topic dn_topic = Topic
}) -> }) ->
SubOpts = #{rap => 0, nl => 0, qos => 0, rh => 0}, SubOpts = #{rap => 0, nl => 0, qos => 0, rh => 0},
emqx:subscribe(Topic, ClientId, SubOpts), _ = emqx_broker:subscribe(Topic, ClientId, SubOpts),
ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts#{is_new => true}]). ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts#{is_new => true}]).
start_keepalive(Secs, _Channel) when Secs > 0 -> start_keepalive(Secs, _Channel) when Secs > 0 ->