From 459d2154c79d2f74e6c2e9ca813705375ff13db8 Mon Sep 17 00:00:00 2001 From: Turtle Date: Thu, 29 Jul 2021 23:13:11 +0800 Subject: [PATCH] feat(event-topic): add more test cases --- apps/emqx_modules/include/emqx_modules.hrl | 8 + apps/emqx_modules/src/emqx_event_topic.erl | 80 +++++++--- .../test/emqx_event_topic_SUITE.erl | 143 ++++++++++++++++++ .../emqx_modules/test/emqx_presence_SUITE.erl | 85 ----------- 4 files changed, 209 insertions(+), 107 deletions(-) create mode 100644 apps/emqx_modules/test/emqx_event_topic_SUITE.erl delete mode 100644 apps/emqx_modules/test/emqx_presence_SUITE.erl diff --git a/apps/emqx_modules/include/emqx_modules.hrl b/apps/emqx_modules/include/emqx_modules.hrl index 334173015..b7cdb154e 100644 --- a/apps/emqx_modules/include/emqx_modules.hrl +++ b/apps/emqx_modules/include/emqx_modules.hrl @@ -3,3 +3,11 @@ %% Interval for reporting telemetry data, Default: 7d -define(REPORT_INTERVAR, 604800). + +-define(BASE_TOPICS, [<<"$event/client_connected">>, + <<"$event/client_disconnected">>, + <<"$event/session_subscribed">>, + <<"$event/session_unsubscribed">>, + <<"$event/message_delivered">>, + <<"$event/message_acked">>, + <<"$event/message_dropped">>]). diff --git a/apps/emqx_modules/src/emqx_event_topic.erl b/apps/emqx_modules/src/emqx_event_topic.erl index e84a3d0ec..41dc76746 100644 --- a/apps/emqx_modules/src/emqx_event_topic.erl +++ b/apps/emqx_modules/src/emqx_event_topic.erl @@ -18,6 +18,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). +-include("emqx_modules.hrl"). -export([ enable/0 , disable/0 @@ -37,22 +38,50 @@ -endif. enable() -> - emqx_hooks:put('client.connected', {?MODULE, on_client_connected, []}), - emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, []}), - emqx_hooks:put('session.subscribed', {?MODULE, on_session_subscribed, []}), - emqx_hooks:put('session.unsubscribed', {?MODULE, on_session_unsubscribed, []}), - emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}), - emqx_hooks:put('message.acked', {?MODULE, on_message_acked, []}), - emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}). + Topics = emqx_config:get([event_topic, topics], []), + lists:foreach(fun(Topic) -> + case Topic of + <<"$event/client_connected">> -> + emqx_hooks:put('client.connected', {?MODULE, on_client_connected, []}); + <<"$event/client_disconnected">> -> + emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, []}); + <<"$event/session_subscribed">> -> + emqx_hooks:put('session.subscribed', {?MODULE, on_session_subscribed, []}); + <<"$event/session_unsubscribed">> -> + emqx_hooks:put('session.unsubscribed', {?MODULE, on_session_unsubscribed, []}); + <<"$event/message_delivered">> -> + emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}); + <<"$event/message_acked">> -> + emqx_hooks:put('message.acked', {?MODULE, on_message_acked, []}); + <<"$event/message_dropped">> -> + emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}); + _ -> + ok + end + end, Topics). disable() -> - emqx_hooks:del('client.connected', {?MODULE, on_client_connected}), - emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}), - emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}), - emqx_hooks:del('session.unsubscribed', {?MODULE, session_unsubscribed}), - emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}), - emqx_hooks:del('message.acked', {?MODULE, on_message_acked}), - emqx_hooks:del('message.dropped', {?MODULE, on_message_dropped}). + Topics = emqx_config:get([event_topic, topics], []), + lists:foreach(fun(Topic) -> + case Topic of + <<"$event/client_connected">> -> + emqx_hooks:del('client.connected', {?MODULE, on_client_connected}); + <<"$event/client_disconnected">> -> + emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}); + <<"$event/session_subscribed">> -> + emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}); + <<"$event/session_unsubscribed">> -> + emqx_hooks:del('session.unsubscribed', {?MODULE, session_unsubscribed}); + <<"$event/message_delivered">> -> + emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}); + <<"$event/message_acked">> -> + emqx_hooks:del('message.acked', {?MODULE, on_message_acked}); + <<"$event/message_dropped">> -> + emqx_hooks:del('message.dropped', {?MODULE, on_message_dropped}); + _ -> + ok + end + end, ?BASE_TOPICS -- Topics). %%-------------------------------------------------------------------- %% Callbacks @@ -62,7 +91,8 @@ on_client_connected(ClientInfo, ConnInfo) -> Payload0 = connected_payload(ClientInfo, ConnInfo), emqx_broker:safe_publish( make_msg(<<"$event/client_connected">>, - emqx_json:encode(Payload0))). + emqx_json:encode(Payload0))), + ok. on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username}, Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}) -> @@ -73,8 +103,9 @@ on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Usernam ts => erlang:system_time(millisecond) }, emqx_broker:safe_publish( - make_msg(<<"$event/client_connected">>, - emqx_json:encode(Payload0))). + make_msg(<<"$event/client_disconnected">>, + emqx_json:encode(Payload0))), + ok. on_session_subscribed(_ClientInfo = #{clientid := ClientId, username := Username}, @@ -87,7 +118,8 @@ on_session_subscribed(_ClientInfo = #{clientid := ClientId, }, emqx_broker:safe_publish( make_msg(<<"$event/session_subscribed">>, - emqx_json:encode(Payload0))). + emqx_json:encode(Payload0))), + ok. on_session_unsubscribed(_ClientInfo = #{clientid := ClientId, username := Username}, @@ -99,7 +131,8 @@ on_session_unsubscribed(_ClientInfo = #{clientid := ClientId, }, emqx_broker:safe_publish( make_msg(<<"$event/session_unsubscribed">>, - emqx_json:encode(Payload0))). + emqx_json:encode(Payload0))), + ok. on_message_dropped(Message = #message{from = ClientId}, _, Reason) -> case ignore_sys_message(Message) of @@ -113,7 +146,8 @@ on_message_dropped(Message = #message{from = ClientId}, _, Reason) -> peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)) }, emqx_broker:safe_publish( - make_msg(<<"$event/message_dropped">>, emqx_json:encode(Payload1))) + make_msg(<<"$event/message_dropped">>, emqx_json:encode(Payload1))), + ok end, {ok, Message}. @@ -134,7 +168,8 @@ on_message_delivered(_ClientInfo = #{ peerhost => ntoa(PeerHost) }, emqx_broker:safe_publish( - make_msg(<<"$event/message_delivered">>, emqx_json:encode(Payload1))) + make_msg(<<"$event/message_delivered">>, emqx_json:encode(Payload1))), + ok end, {ok, Message}. @@ -155,7 +190,8 @@ on_message_acked(_ClientInfo = #{ peerhost => ntoa(PeerHost) }, emqx_broker:safe_publish( - make_msg(<<"$event/message_acked">>, emqx_json:encode(Payload1))) + make_msg(<<"$event/message_acked">>, emqx_json:encode(Payload1))), + ok end, {ok, Message}. diff --git a/apps/emqx_modules/test/emqx_event_topic_SUITE.erl b/apps/emqx_modules/test/emqx_event_topic_SUITE.erl new file mode 100644 index 000000000..26898730a --- /dev/null +++ b/apps/emqx_modules/test/emqx_event_topic_SUITE.erl @@ -0,0 +1,143 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_event_topic_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([emqx_modules]), + meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]), + meck:expect(emqx_schema, includes, fun() -> ["event_topic"] end ), + meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_modules_schema:fields(FieldName) end), + ok = emqx_config:update([event_topic, topics], [<<"$event/client_connected">>, + <<"$event/client_disconnected">>, + <<"$event/session_subscribed">>, + <<"$event/session_unsubscribed">>, + <<"$event/message_delivered">>, + <<"$event/message_acked">>, + <<"$event/message_dropped">>]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([emqx_modules]), + meck:unload(emqx_schema). + +t_event_topic(_) -> + ok = emqx_event_topic:enable(), + {ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]), + {ok, _} = emqtt:connect(C1), + {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_connected">>, qos1), + {ok, C2} = emqtt:start_link([{clientid, <<"clientid">>}, + {username, <<"username">>}]), + {ok, _} = emqtt:connect(C2), + ok = recv_connected(<<"clientid">>), + + {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/session_subscribed">>, qos1), + _ = receive_publish(100), + timer:sleep(50), + {ok, _, [?QOS_1]} = emqtt:subscribe(C2, <<"test_sub">>, qos1), + ok = recv_subscribed(<<"clientid">>), + emqtt:unsubscribe(C1, <<"$event/session_subscribed">>), + timer:sleep(50), + + {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_delivered">>, qos1), + {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_acked">>, qos1), + _ = emqx:publish(emqx_message:make(<<"test">>, ?QOS_1, <<"test_sub">>, <<"test">>)), + recv_message_publish(<<"clientid">>), + recv_message_delivered(<<"clientid">>), + recv_message_acked(<<"clientid">>), + + {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_dropped">>, qos1), + ok= emqtt:publish(C2, <<"test_sub1">>, <<"test">>), + recv_message_dropped(<<"clientid">>), + + {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/session_unsubscribed">>, qos1), + _ = emqtt:unsubscribe(C2, <<"test_sub">>), + ok = recv_unsubscribed(<<"clientid">>), + + {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_disconnected">>, qos1), + ok = emqtt:disconnect(C2), + ok = recv_disconnected(<<"clientid">>), + ok = emqtt:disconnect(C1), + ok = emqx_event_topic:disable(). + +t_reason(_) -> + ?assertEqual(normal, emqx_event_topic:reason(normal)), + ?assertEqual(discarded, emqx_event_topic:reason({shutdown, discarded})), + ?assertEqual(tcp_error, emqx_event_topic:reason({tcp_error, einval})), + ?assertEqual(internal_error, emqx_event_topic:reason(<<"unknown error">>)). + +recv_connected(ClientId) -> + {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100), + ?assertMatch(<<"$event/client_connected">>, Topic), + ?assertMatch(#{<<"clientid">> := ClientId, + <<"username">> := <<"username">>, + <<"ipaddress">> := <<"127.0.0.1">>, + <<"proto_name">> := <<"MQTT">>, + <<"proto_ver">> := ?MQTT_PROTO_V4, + <<"connack">> := ?RC_SUCCESS, + <<"clean_start">> := true}, emqx_json:decode(Payload, [return_maps])). + +recv_subscribed(_ClientId) -> + {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), + ?assertMatch(<<"$event/session_subscribed">>, Topic). + +recv_message_dropped(_ClientId) -> + {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), + ?assertMatch(<<"$event/message_dropped">>, Topic). + +recv_message_delivered(_ClientId) -> + {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), + ?assertMatch(<<"$event/message_delivered">>, Topic). + +recv_message_publish(_ClientId) -> + {ok, #{qos := ?QOS_1, topic := Topic}} = receive_publish(100), + ?assertMatch(<<"test_sub">>, Topic). + +recv_message_acked(_ClientId) -> + {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), + ?assertMatch(<<"$event/message_acked">>, Topic). + +recv_unsubscribed(_ClientId) -> + {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), + ?assertMatch(<<"$event/session_unsubscribed">>, Topic). + +recv_disconnected(ClientId) -> + {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100), + ?assertMatch(<<"$event/client_disconnected">>, Topic), + ?assertMatch(#{<<"clientid">> := ClientId, + <<"username">> := <<"username">>, + <<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps])). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +receive_publish(Timeout) -> + receive + {publish, Publish} -> + {ok, Publish} + after + Timeout -> {error, timeout} + end. diff --git a/apps/emqx_modules/test/emqx_presence_SUITE.erl b/apps/emqx_modules/test/emqx_presence_SUITE.erl deleted file mode 100644 index de6e7bccc..000000000 --- a/apps/emqx_modules/test/emqx_presence_SUITE.erl +++ /dev/null @@ -1,85 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_presence_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("emqx/include/emqx_mqtt.hrl"). --include_lib("eunit/include/eunit.hrl"). - -all() -> emqx_ct:all(?MODULE). - -init_per_suite(Config) -> - emqx_ct_helpers:boot_modules(all), - emqx_ct_helpers:start_apps([emqx_modules]), - Config. - -end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([emqx_modules]). - -t_mod_presence(_) -> - ok = emqx_presence:enable(), - {ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]), - {ok, _} = emqtt:connect(C1), - {ok, _Props, [?QOS_1]} = emqtt:subscribe(C1, <<"$SYS/brokers/+/clients/#">>, qos1), - %% Connected Presence - {ok, C2} = emqtt:start_link([{clientid, <<"clientid">>}, - {username, <<"username">>}]), - {ok, _} = emqtt:connect(C2), - ok = recv_and_check_presence(<<"clientid">>, <<"connected">>), - %% Disconnected Presence - ok = emqtt:disconnect(C2), - ok = recv_and_check_presence(<<"clientid">>, <<"disconnected">>), - ok = emqtt:disconnect(C1), - ok = emqx_presence:disable(). - -t_mod_presence_reason(_) -> - ?assertEqual(normal, emqx_presence:reason(normal)), - ?assertEqual(discarded, emqx_presence:reason({shutdown, discarded})), - ?assertEqual(tcp_error, emqx_presence:reason({tcp_error, einval})), - ?assertEqual(internal_error, emqx_presence:reason(<<"unknown error">>)). - -recv_and_check_presence(ClientId, Presence) -> - {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100), - ?assertMatch([<<"$SYS">>, <<"brokers">>, _Node, <<"clients">>, ClientId, Presence], - binary:split(Topic, <<"/">>, [global])), - case Presence of - <<"connected">> -> - ?assertMatch(#{<<"clientid">> := <<"clientid">>, - <<"username">> := <<"username">>, - <<"ipaddress">> := <<"127.0.0.1">>, - <<"proto_name">> := <<"MQTT">>, - <<"proto_ver">> := ?MQTT_PROTO_V4, - <<"connack">> := ?RC_SUCCESS, - <<"clean_start">> := true}, emqx_json:decode(Payload, [return_maps])); - <<"disconnected">> -> - ?assertMatch(#{<<"clientid">> := <<"clientid">>, - <<"username">> := <<"username">>, - <<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps])) - end. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -receive_publish(Timeout) -> - receive - {publish, Publish} -> {ok, Publish} - after - Timeout -> {error, timeout} - end.