Merge pull request #7372 from lafirest/fix/coap_events

fix(coap): trigger disconnect logic when the client manually disconne…
This commit is contained in:
JianBo He 2022-03-23 19:36:23 +08:00 committed by GitHub
commit 617022f276
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 3 deletions

View File

@ -638,7 +638,8 @@ process_connection({open, Req}, Result,
Channel); Channel);
process_connection({close, Msg}, _, Channel, _) -> process_connection({close, Msg}, _, Channel, _) ->
Reply = emqx_coap_message:piggyback({ok, deleted}, Msg), Reply = emqx_coap_message:piggyback({ok, deleted}, Msg),
{shutdown, close, Reply, Channel}. NChannel = ensure_disconnected(normal, Channel),
{shutdown, normal, Reply, NChannel}.
process_subscribe({Sub, Msg}, Result, #channel{session = Session} = Channel, Iter) -> process_subscribe({Sub, Msg}, Result, #channel{session = Session} = Channel, Iter) ->
Result2 = emqx_coap_session:process_subscribe(Sub, Msg, Result, Session), Result2 = emqx_coap_session:process_subscribe(Sub, Msg, Result, Session),

View File

@ -292,12 +292,41 @@ t_clients_get_subscription_api(_) ->
end, end,
with_connection(Fun). with_connection(Fun).
t_on_offline_event(_) ->
Fun = fun(Channel) ->
emqx_hooks:add('client.connected', {emqx_sys, on_client_connected, []}),
emqx_hooks:add('client.disconnected', {emqx_sys, on_client_disconnected, []}),
ConnectedSub = <<"$SYS/brokers/+/gateway/coap/clients/+/connected">>,
emqx_broker:subscribe(ConnectedSub),
timer:sleep(100),
Token = connection(Channel),
?assertMatch(#message{}, receive_deliver(500)),
DisconnectedSub = <<"$SYS/brokers/+/gateway/coap/clients/+/disconnected">>,
emqx_broker:subscribe(DisconnectedSub),
timer:sleep(100),
disconnection(Channel, Token),
?assertMatch(#message{}, receive_deliver(500)),
emqx_broker:unsubscribe(ConnectedSub),
emqx_broker:unsubscribe(DisconnectedSub),
emqx_hooks:del('client.connected', {emqx_sys, on_client_connected}),
emqx_hooks:del('client.disconnected', {emqx_sys, on_client_disconnected}),
timer:sleep(500)
end,
do(Fun).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% helpers %% helpers
connection(Channel) -> connection(Channel) ->
URI = ?MQTT_PREFIX ++ URI = ?MQTT_PREFIX ++
"/connection?clientid=client1&username=admin&password=public", "/connection?clientid=client1&username=admin&password=public",
Req = make_req(post), Req = make_req(post),
{ok, created, Data} = do_request(Channel, URI, Req), {ok, created, Data} = do_request(Channel, URI, Req),
#coap_content{payload = BinToken} = Data, #coap_content{payload = BinToken} = Data,
@ -378,3 +407,11 @@ with_connection(Action) ->
timer:sleep(100) timer:sleep(100)
end, end,
do(Fun). do(Fun).
receive_deliver(Wait) ->
receive
{deliver, _, Msg} ->
Msg
after Wait ->
{error, timeout}
end.

View File

@ -42,9 +42,12 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([emqx_modules]), emqx_common_test_helpers:start_apps([emqx_modules]),
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?EVENT_MESSAGE), load_config(),
Config. Config.
load_config() ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?EVENT_MESSAGE).
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_modules]). emqx_common_test_helpers:stop_apps([emqx_modules]).