Merge pull request #7601 from lafirest/fix/coap_disconnected

fix(coap): fix CoAP incorrect disconnected event
This commit is contained in:
JianBo He 2022-04-13 13:43:58 +08:00 committed by GitHub
commit 587fbf07ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 28 deletions

View File

@ -591,11 +591,18 @@ ensure_disconnected(
Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo
clientinfo = ClientInfo,
conn_state = ConnState
}
) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, Reason, NConnInfo]),
case ConnState of
connected ->
ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, Reason, NConnInfo]);
_ ->
ok
end,
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
shutdown_and_reply(Reason, Reply, Channel) ->
@ -713,11 +720,8 @@ process_connection(
{error, ReasonCode, NChannel} ->
ErrMsg = io_lib:format("Login Failed: ~ts", [ReasonCode]),
Payload = iolist_to_binary(ErrMsg),
iter(
Iter,
reply({error, bad_request}, Payload, Req, Result),
NChannel
)
Reply = emqx_coap_message:piggyback({error, bad_request}, Payload, Req),
process_shutdown(Reply, Result, NChannel, Iter)
end;
process_connection(
{open, Req},
@ -763,3 +767,10 @@ process_reply(Reply, Result, #channel{session = Session} = Channel, _) ->
Outs2 = lists:reverse(Outs),
Events = maps:get(events, Result, []),
{ok, [{outgoing, [Reply | Outs2]}] ++ Events, Channel#channel{session = Session2}}.
%% leaf node
process_shutdown(Reply, _Result, Channel, _) ->
% Outs = maps:get(out, Result, []),
% Outs2 = lists:reverse(Outs),
% Events = maps:get(events, Result, []),
{shutdown, normal, Reply, Channel}.

View File

@ -58,12 +58,30 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
Config.
end_per_suite(_) ->
meck:unload(emqx_access_control),
{ok, _} = emqx:remove_config([<<"gateway">>, <<"coap">>]),
emqx_mgmt_api_test_util:end_suite([emqx_gateway]).
init_per_testcase(t_connection_with_authn_failed, Config) ->
ok = meck:expect(
emqx_access_control,
authenticate,
fun(_) -> {error, bad_username_or_password} end
),
Config;
init_per_testcase(_, Config) ->
Config.
end_per_testcase(t_connection_with_authn_failed, Config) ->
ok = meck:delete(emqx_access_control, authenticate, 1),
Config;
end_per_testcase(_, Config) ->
Config.
default_config() ->
?CONF_DEFAULT.
@ -104,6 +122,23 @@ t_connection(_) ->
end,
do(Action).
t_connection_with_authn_failed(_) ->
ChId = {{127, 0, 0, 1}, 5683},
{ok, Sock} = er_coap_udp_socket:start_link(),
{ok, Channel} = er_coap_udp_socket:get_channel(Sock, ChId),
URI =
?MQTT_PREFIX ++
"/connection?clientid=client1&username=admin&password=public",
Req = make_req(post),
?assertMatch({error, bad_request, _}, do_request(Channel, URI, Req)),
timer:sleep(100),
?assertEqual(
[],
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
),
ok.
t_publish(_) ->
Action = fun(Channel, Token) ->
Topic = <<"/abc">>,
@ -128,27 +163,6 @@ t_publish(_) ->
end,
with_connection(Action).
%t_publish_authz_deny(_) ->
% Action = fun(Channel, Token) ->
% Topic = <<"/abc">>,
% Payload = <<"123">>,
% InvalidToken = lists:reverse(Token),
%
% TopicStr = binary_to_list(Topic),
% URI = ?PS_PREFIX ++
% TopicStr ++
% "?clientid=client1&token=" ++ InvalidToken,
%
% %% Sub topic first
% emqx:subscribe(Topic),
%
% Req = make_req(post, Payload),
% Result = do_request(Channel, URI, Req),
% ?assertEqual({error, reset}, Result)
% end,
%
% with_connection(Action).
t_subscribe(_) ->
Topic = <<"/abc">>,
Fun = fun(Channel, Token) ->