fix(coap): fix CoAP incorrect disconnected event
This commit is contained in:
parent
b9f2aa36ea
commit
62fa500f85
|
@ -591,11 +591,18 @@ ensure_disconnected(
|
||||||
Channel = #channel{
|
Channel = #channel{
|
||||||
ctx = Ctx,
|
ctx = Ctx,
|
||||||
conninfo = ConnInfo,
|
conninfo = ConnInfo,
|
||||||
clientinfo = ClientInfo
|
clientinfo = ClientInfo,
|
||||||
|
conn_state = ConnState
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
|
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
|
||||||
ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, Reason, NConnInfo]),
|
|
||||||
|
case ConnState of
|
||||||
|
connected ->
|
||||||
|
ok = run_hooks(Ctx, 'client.disconnected', [ClientInfo, Reason, NConnInfo]);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
|
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
|
||||||
|
|
||||||
shutdown_and_reply(Reason, Reply, Channel) ->
|
shutdown_and_reply(Reason, Reply, Channel) ->
|
||||||
|
@ -713,11 +720,8 @@ process_connection(
|
||||||
{error, ReasonCode, NChannel} ->
|
{error, ReasonCode, NChannel} ->
|
||||||
ErrMsg = io_lib:format("Login Failed: ~ts", [ReasonCode]),
|
ErrMsg = io_lib:format("Login Failed: ~ts", [ReasonCode]),
|
||||||
Payload = iolist_to_binary(ErrMsg),
|
Payload = iolist_to_binary(ErrMsg),
|
||||||
iter(
|
Reply = emqx_coap_message:piggyback({error, bad_request}, Payload, Req),
|
||||||
Iter,
|
process_shutdown(Reply, Result, NChannel, Iter)
|
||||||
reply({error, bad_request}, Payload, Req, Result),
|
|
||||||
NChannel
|
|
||||||
)
|
|
||||||
end;
|
end;
|
||||||
process_connection(
|
process_connection(
|
||||||
{open, Req},
|
{open, Req},
|
||||||
|
@ -763,3 +767,10 @@ process_reply(Reply, Result, #channel{session = Session} = Channel, _) ->
|
||||||
Outs2 = lists:reverse(Outs),
|
Outs2 = lists:reverse(Outs),
|
||||||
Events = maps:get(events, Result, []),
|
Events = maps:get(events, Result, []),
|
||||||
{ok, [{outgoing, [Reply | Outs2]}] ++ Events, Channel#channel{session = Session2}}.
|
{ok, [{outgoing, [Reply | Outs2]}] ++ Events, Channel#channel{session = Session2}}.
|
||||||
|
|
||||||
|
%% leaf node
|
||||||
|
process_shutdown(Reply, _Result, Channel, _) ->
|
||||||
|
% Outs = maps:get(out, Result, []),
|
||||||
|
% Outs2 = lists:reverse(Outs),
|
||||||
|
% Events = maps:get(events, Result, []),
|
||||||
|
{shutdown, normal, Reply, Channel}.
|
||||||
|
|
|
@ -58,12 +58,30 @@ all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
|
ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
|
||||||
emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
|
emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
|
||||||
|
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_) ->
|
end_per_suite(_) ->
|
||||||
|
meck:unload(emqx_access_control),
|
||||||
{ok, _} = emqx:remove_config([<<"gateway">>, <<"coap">>]),
|
{ok, _} = emqx:remove_config([<<"gateway">>, <<"coap">>]),
|
||||||
emqx_mgmt_api_test_util:end_suite([emqx_gateway]).
|
emqx_mgmt_api_test_util:end_suite([emqx_gateway]).
|
||||||
|
|
||||||
|
init_per_testcase(t_connection_with_authn_failed, Config) ->
|
||||||
|
ok = meck:expect(
|
||||||
|
emqx_access_control,
|
||||||
|
authenticate,
|
||||||
|
fun(_) -> {error, bad_username_or_password} end
|
||||||
|
),
|
||||||
|
Config;
|
||||||
|
init_per_testcase(_, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(t_connection_with_authn_failed, Config) ->
|
||||||
|
ok = meck:delete(emqx_access_control, authenticate, 1),
|
||||||
|
Config;
|
||||||
|
end_per_testcase(_, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
default_config() ->
|
default_config() ->
|
||||||
?CONF_DEFAULT.
|
?CONF_DEFAULT.
|
||||||
|
|
||||||
|
@ -104,6 +122,23 @@ t_connection(_) ->
|
||||||
end,
|
end,
|
||||||
do(Action).
|
do(Action).
|
||||||
|
|
||||||
|
t_connection_with_authn_failed(_) ->
|
||||||
|
ChId = {{127, 0, 0, 1}, 5683},
|
||||||
|
{ok, Sock} = er_coap_udp_socket:start_link(),
|
||||||
|
{ok, Channel} = er_coap_udp_socket:get_channel(Sock, ChId),
|
||||||
|
URI =
|
||||||
|
?MQTT_PREFIX ++
|
||||||
|
"/connection?clientid=client1&username=admin&password=public",
|
||||||
|
Req = make_req(post),
|
||||||
|
?assertMatch({error, bad_request, _}, do_request(Channel, URI, Req)),
|
||||||
|
|
||||||
|
timer:sleep(100),
|
||||||
|
?assertEqual(
|
||||||
|
[],
|
||||||
|
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_publish(_) ->
|
t_publish(_) ->
|
||||||
Action = fun(Channel, Token) ->
|
Action = fun(Channel, Token) ->
|
||||||
Topic = <<"/abc">>,
|
Topic = <<"/abc">>,
|
||||||
|
@ -128,27 +163,6 @@ t_publish(_) ->
|
||||||
end,
|
end,
|
||||||
with_connection(Action).
|
with_connection(Action).
|
||||||
|
|
||||||
%t_publish_authz_deny(_) ->
|
|
||||||
% Action = fun(Channel, Token) ->
|
|
||||||
% Topic = <<"/abc">>,
|
|
||||||
% Payload = <<"123">>,
|
|
||||||
% InvalidToken = lists:reverse(Token),
|
|
||||||
%
|
|
||||||
% TopicStr = binary_to_list(Topic),
|
|
||||||
% URI = ?PS_PREFIX ++
|
|
||||||
% TopicStr ++
|
|
||||||
% "?clientid=client1&token=" ++ InvalidToken,
|
|
||||||
%
|
|
||||||
% %% Sub topic first
|
|
||||||
% emqx:subscribe(Topic),
|
|
||||||
%
|
|
||||||
% Req = make_req(post, Payload),
|
|
||||||
% Result = do_request(Channel, URI, Req),
|
|
||||||
% ?assertEqual({error, reset}, Result)
|
|
||||||
% end,
|
|
||||||
%
|
|
||||||
% with_connection(Action).
|
|
||||||
|
|
||||||
t_subscribe(_) ->
|
t_subscribe(_) ->
|
||||||
Topic = <<"/abc">>,
|
Topic = <<"/abc">>,
|
||||||
Fun = fun(Channel, Token) ->
|
Fun = fun(Channel, Token) ->
|
||||||
|
|
Loading…
Reference in New Issue