From ba6cfd595be75101e43063b7cfa9e872ee3e9839 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 4 Mar 2022 14:26:44 +0800 Subject: [PATCH 1/6] feat(sys): support client events notification --- apps/emqx/etc/emqx.conf | 61 ++++++++++---- apps/emqx/src/emqx_schema.erl | 64 ++++++++++++--- apps/emqx/src/emqx_sys.erl | 146 +++++++++++++++++++++++++++++++--- 3 files changed, 235 insertions(+), 36 deletions(-) diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 7bd8cfe73..e47e8d04e 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -1009,22 +1009,6 @@ zones.default { ## Broker ##================================================================== broker { - ## System interval of publishing $SYS messages. - ## - ## @doc broker.sys_msg_interval - ## ValueType: Duration | disabled - ## Default: 1m - sys_msg_interval = 1m - - ## System heartbeat interval of publishing following heart beat message: - ## - "$SYS/brokers//uptime" - ## - "$SYS/brokers//datetime" - ## - ## @doc broker.sys_heartbeat_interval - ## ValueType: Duration - ## Default: 30s | disabled - sys_heartbeat_interval = 30s - ## Session locking strategy in a cluster. ## ## @doc broker.session_locking_strategy @@ -1095,6 +1079,51 @@ broker { perf.trie_compaction = true } +##================================================================== +## System Topic +##================================================================== + +sys_topic { + ## System interval of publishing $SYS messages. + ## + ## @doc broker.sys_msg_interval + ## ValueType: Duration | disabled + ## Default: 1m + sys_msg_interval = 1m + + ## System heartbeat interval of publishing following heart beat message: + ## - "$SYS/brokers//uptime" + ## - "$SYS/brokers//datetime" + ## + ## @doc broker.sys_heartbeat_interval + ## ValueType: Duration + ## Default: 30s | disabled + sys_heartbeat_interval = 30s + + ## Whether to enable Client lifecycle event messages publish. + ## The following options are not only for enabling MQTT client event messages + ## publish but also for Gateway clients. However, these kinds of clients type + ## are distinguished by the Topic prefix: + ## - For the MQTT client, its event topic format is: + ## $SYS/broker//clients// + ## - For the Gateway client, it is + ## $SYS/broker//gateway//clients// + sys_event_messages { + ## Enable to publish client connected event messages. + ## - Topic: "$SYS/broker//clients//connected" + client_connected = true + ## Enable to publish client disconnected event messages. + ## - Topic: "$SYS/broker//clients//disconnected" + client_disconnected = true + ## Enable to publish event message that client subscribed a topic successfully. + ## - Topic: "$SYS/broker//clients//subscribed" + client_subscribed = false + ## Enable to publish event message that client unsubscribed a topic successfully. + ## - Topic: "$SYS/broker//clients//unsubscribed" + client_unsubscribed = false + } +} + ##================================================================== ## System Monitor ##================================================================== diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index d09acd25e..01b93cc06 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -144,6 +144,9 @@ roots(medium) -> [ {"broker", sc(ref("broker"), #{})} + , {"sys_topic", + sc(ref("sys_topic"), + #{})} , {"rate_limit", sc(ref("rate_limit"), #{})} @@ -857,17 +860,7 @@ fields("deflate_opts") -> ]; fields("broker") -> - [ {"sys_msg_interval", - sc(hoconsc:union([disabled, duration()]), - #{ default => "1m" - }) - } - , {"sys_heartbeat_interval", - sc(hoconsc:union([disabled, duration()]), - #{ default => "30s" - }) - } - , {"enable_session_registry", + [ {"enable_session_registry", sc(boolean(), #{ default => true }) @@ -909,6 +902,55 @@ fields("broker_perf") -> })} ]; +fields("sys_topic") -> + [ {"sys_msg_interval", + sc(hoconsc:union([disabled, duration()]), + #{ default => "1m" + }) + } + , {"sys_heartbeat_interval", + sc(hoconsc:union([disabled, duration()]), + #{ default => "30s" + }) + } + , {"sys_event_messages", + sc(ref("event_names"), + #{ desc => + """Whether to enable Client lifecycle event messages publish.
+The following options are not only for enabling MQTT client event messages +publish but also for Gateway clients. However, these kinds of clients type +are distinguished by the Topic prefix: +- For the MQTT client, its event topic format is:
+ $SYS/broker//clients//
+- For the Gateway client, it is + $SYS/broker//gateway//clients//""" + }) + } + ]; + +fields("event_names") -> + [ {"client_connected", + sc(boolean(), + #{default => true + }) + } + , {"client_disconnected", + sc(boolean(), + #{default => true + }) + } + , {"client_subscribed", + sc(boolean(), + #{default => false + }) + } + , {"client_unsubscribed", + sc(boolean(), + #{default => false + }) + } + ]; + fields("sysmon") -> [ {"vm", sc(ref("sysmon_vm"), diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index 1beac2092..0d3a78fef 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -43,6 +43,12 @@ , terminate/2 ]). +-export([ on_client_connected/2 + , on_client_disconnected/3 + , on_client_subscribed/3 + , on_client_unsubscribed/3 + ]). + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -67,9 +73,9 @@ , sysdescr % Broker description ]). -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% APIs -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- -spec(start_link() -> {ok, pid()} | ignore | {error, any()}). start_link() -> @@ -101,10 +107,13 @@ datetime() -> "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). sys_interval() -> - emqx:get_config([broker, sys_msg_interval]). + emqx:get_config([sys_topic, sys_msg_interval]). sys_heatbeat_interval() -> - emqx:get_config([broker, sys_heartbeat_interval]). + emqx:get_config([sys_topic, sys_heartbeat_interval]). + +sys_event_message() -> + emqx:get_config([sys_topic, sys_event_messages]). %% @doc Get sys info -spec(info() -> list(tuple())). @@ -114,12 +123,13 @@ info() -> {uptime, uptime()}, {datetime, datetime()}]. -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- %% gen_server callbacks -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------- init([]) -> State = #state{sysdescr = iolist_to_binary(sysdescr())}, + load_event_hooks(), {ok, heartbeat(tick(State))}. heartbeat(State) -> @@ -127,6 +137,14 @@ heartbeat(State) -> tick(State) -> State#state{ticker = start_timer(sys_interval(), tick)}. +load_event_hooks() -> + maps:foreach( + fun(_, false) -> ok; + (K, true) -> + {HookPoint, Fun} = hook_and_fun(K), + emqx_hooks:put(HookPoint, {?MODULE, Fun, []}) + end, sys_event_message()). + handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -153,11 +171,81 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> + unload_event_hooks(), lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]). -%%----------------------------------------------------------------------------- +unload_event_hooks() -> + lists:foreach(fun(K, _) -> + {HookPoint, Fun} = hook_and_fun(K), + emqx_hooks:del(HookPoint, {?MODULE, Fun}) + end, sys_event_message()). + +%%-------------------------------------------------------------------- +%% hook callbacks +%%-------------------------------------------------------------------- + +on_client_connected(ClientInfo, ConnInfo) -> + Payload0 = common_infos(ClientInfo, ConnInfo), + Payload = Payload0#{ + keepalive => maps:get(keepalive, ConnInfo, 0), + clean_start => maps:get(clean_start, ConnInfo, true), + expiry_interval => maps:get(expiry_interval, ConnInfo, 0) + }, + publish(connected, Payload). + +on_client_disconnected(ClientInfo, Reason, + ConnInfo = #{disconnected_at := DisconnectedAt}) -> + + Payload0 = common_infos(ClientInfo, ConnInfo), + Payload = Payload0#{ + reason => reason(Reason), + disconnected_at => DisconnectedAt + }, + publish(disconnected, Payload). + +-compile({inline, [reason/1]}). +reason(Reason) when is_atom(Reason) -> Reason; +reason({shutdown, Reason}) when is_atom(Reason) -> Reason; +reason({Error, _}) when is_atom(Error) -> Error; +reason(_) -> internal_error. + +on_client_subscribed(_ClientInfo = #{clientid := ClientId, + username := Username, + protocol := Protocol}, + Topic, SubOpts) -> + Payload = #{clientid => ClientId, + username => Username, + protocol => Protocol, + topic => Topic, + subopts => SubOpts, + ts => erlang:system_time(millisecond) + }, + publish(subscribed, Payload). + +on_client_unsubscribed(_ClientInfo = #{clientid := ClientId, + username := Username, + protocol := Protocol}, + Topic, _SubOpts) -> + Payload = #{clientid => ClientId, + username => Username, + protocol => Protocol, + topic => Topic, + ts => erlang:system_time(millisecond) + }, + publish(unsubscribed, Payload). + +%%-------------------------------------------------------------------- %% Internal functions -%%----------------------------------------------------------------------------- +%%-------------------------------------------------------------------- + +hook_and_fun(client_connected) -> + {'client.connected', on_client_connected}; +hook_and_fun(client_disconnected) -> + {'client.disconnected', on_client_disconnected}; +hook_and_fun(client_subscribed) -> + {'session.subscribed', on_client_subscribed}; +hook_and_fun(client_unsubscribed) -> + {'session.unsubscribed', on_client_unsubscribed}. publish_any(Name, Value) -> _ = publish(Name, Value), @@ -179,7 +267,11 @@ publish(stats, Stats) -> || {Stat, Val} <- Stats, is_atom(Stat), is_integer(Val)]; publish(metrics, Metrics) -> [safe_publish(systop(metric_topic(Name)), integer_to_binary(Val)) - || {Name, Val} <- Metrics, is_atom(Name), is_integer(Val)]. + || {Name, Val} <- Metrics, is_atom(Name), is_integer(Val)]; +publish(Event, Payload) when Event == connected; Event == disconnected; + Event == subscribed; Event == unsubscribed -> + Topic = event_topic(Event, Payload), + safe_publish(Topic, emqx_json:encode(Payload)). metric_topic(Name) -> lists:concat(["metrics/", string:replace(atom_to_list(Name), ".", "/", all)]). @@ -191,3 +283,39 @@ safe_publish(Topic, Flags, Payload) -> emqx_message:set_flags( maps:merge(#{sys => true}, Flags), emqx_message:make(?SYS, Topic, iolist_to_binary(Payload)))). + +common_infos( + _ClientInfo = #{clientid := ClientId, + username := Username, + peerhost := PeerHost, + sockport := SockPort, + protocol := Protocol + }, + _ConnInfo = #{proto_name := ProtoName, + proto_ver := ProtoVer, + connected_at := ConnectedAt + }) -> + #{clientid => ClientId, + username => Username, + ipaddress => ntoa(PeerHost), + sockport => SockPort, + protocol => Protocol, + proto_name => ProtoName, + proto_ver => ProtoVer, + connected_at => ConnectedAt, + ts => erlang:system_time(millisecond) + }. + +ntoa(undefined) -> undefined; +ntoa({IpAddr, Port}) -> + iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]); +ntoa(IpAddr) -> + iolist_to_binary(inet:ntoa(IpAddr)). + +event_topic(Event, #{clientid := ClientId, protocol := mqtt}) -> + iolist_to_binary( + [systop("clients"), "/", ClientId, "/", atom_to_binary(Event)]); +event_topic(Event, #{clientid := ClientId, protocol := GwName}) -> + iolist_to_binary( + [systop("gateway"), "/", atom_to_binary(GwName), + "/clients/", ClientId, "/", atom_to_binary(Event)]). From ecc8d92e6c1709b9df13d579b161d05f6c3b4ac5 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 4 Mar 2022 14:45:52 +0800 Subject: [PATCH 2/6] test(sys): refine prop_emqx_sys --- apps/emqx/src/emqx_sys.erl | 2 +- apps/emqx/test/emqx_sys_SUITE.erl | 18 ------------------ apps/emqx/test/props/prop_emqx_sys.erl | 13 ++++++++++--- 3 files changed, 11 insertions(+), 22 deletions(-) diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index 0d3a78fef..3ba331b69 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -175,7 +175,7 @@ terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]). unload_event_hooks() -> - lists:foreach(fun(K, _) -> + maps:foreach(fun(K, _) -> {HookPoint, Fun} = hook_and_fun(K), emqx_hooks:del(HookPoint, {?MODULE, Fun}) end, sys_event_message()). diff --git a/apps/emqx/test/emqx_sys_SUITE.erl b/apps/emqx/test/emqx_sys_SUITE.erl index 094778c0e..6af42ca85 100644 --- a/apps/emqx/test/emqx_sys_SUITE.erl +++ b/apps/emqx/test/emqx_sys_SUITE.erl @@ -32,21 +32,3 @@ end_per_suite(_Config) -> application:unload(emqx), ok = emqx_logger:set_log_level(error), ok. - -% t_version(_) -> -% error('TODO'). - -% t_sysdescr(_) -> -% error('TODO'). - -% t_datetime(_) -> -% error('TODO'). - -% t_sys_interval(_) -> -% error('TODO'). - -% t_sys_heatbeat_interval(_) -> -% error('TODO'). - -% t_info(_) -> -% error('TODO'). diff --git a/apps/emqx/test/props/prop_emqx_sys.erl b/apps/emqx/test/props/prop_emqx_sys.erl index 780992951..12ca0842c 100644 --- a/apps/emqx/test/props/prop_emqx_sys.erl +++ b/apps/emqx/test/props/prop_emqx_sys.erl @@ -30,6 +30,7 @@ , emqx_stats , emqx_broker , mria_mnesia + , emqx_hooks ]). -define(ALL(Vars, Types, Exprs), @@ -59,8 +60,11 @@ prop_sys() -> do_setup() -> ok = emqx_logger:set_log_level(emergency), - emqx_config:put([broker, sys_msg_interval], 60000), - emqx_config:put([broker, sys_heartbeat_interval], 30000), + emqx_config:put([sys_topic, sys_msg_interval], 60000), + emqx_config:put([sys_topic, sys_heartbeat_interval], 30000), + emqx_config:put([sys_topic, sys_event_messages], + #{client_connected => true, client_disconnected => true, + client_subscribed => true, client_unsubscribed => true}), [mock(Mod) || Mod <- ?mock_modules], ok. @@ -83,7 +87,10 @@ do_mock(emqx_stats) -> do_mock(mria_mnesia) -> meck:expect(mria_mnesia, running_nodes, fun() -> [node()] end); do_mock(emqx_metrics) -> - meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end). + meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end); +do_mock(emqx_hooks) -> + meck:expect(emqx_hooks, put, fun(_HookPoint, _MFA) -> ok end), + meck:expect(emqx_hooks, del, fun(_HookPoint, _MF) -> ok end). %%-------------------------------------------------------------------- %% MODEL From 6dac422c9348ffd9c812ac1350c17aa137920441 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 4 Mar 2022 15:08:28 +0800 Subject: [PATCH 3/6] chore: rename sys_topic to sys_topics --- apps/emqx/etc/emqx.conf | 2 +- apps/emqx/src/emqx_schema.erl | 6 +++--- apps/emqx/src/emqx_sys.erl | 18 +++++++++--------- apps/emqx/test/props/prop_emqx_sys.erl | 6 +++--- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index e47e8d04e..e6249a039 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -1083,7 +1083,7 @@ broker { ## System Topic ##================================================================== -sys_topic { +sys_topics { ## System interval of publishing $SYS messages. ## ## @doc broker.sys_msg_interval diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 01b93cc06..94de42e31 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -144,8 +144,8 @@ roots(medium) -> [ {"broker", sc(ref("broker"), #{})} - , {"sys_topic", - sc(ref("sys_topic"), + , {"sys_topics", + sc(ref("sys_topics"), #{})} , {"rate_limit", sc(ref("rate_limit"), @@ -902,7 +902,7 @@ fields("broker_perf") -> })} ]; -fields("sys_topic") -> +fields("sys_topics") -> [ {"sys_msg_interval", sc(hoconsc:union([disabled, duration()]), #{ default => "1m" diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index 3ba331b69..9b10ba23b 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -107,13 +107,13 @@ datetime() -> "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). sys_interval() -> - emqx:get_config([sys_topic, sys_msg_interval]). + emqx:get_config([sys_topics, sys_msg_interval]). sys_heatbeat_interval() -> - emqx:get_config([sys_topic, sys_heartbeat_interval]). + emqx:get_config([sys_topics, sys_heartbeat_interval]). sys_event_message() -> - emqx:get_config([sys_topic, sys_event_messages]). + emqx:get_config([sys_topics, sys_event_messages]). %% @doc Get sys info -spec(info() -> list(tuple())). @@ -138,12 +138,12 @@ tick(State) -> State#state{ticker = start_timer(sys_interval(), tick)}. load_event_hooks() -> - maps:foreach( - fun(_, false) -> ok; - (K, true) -> + lists:foreach( + fun({_, false}) -> ok; + ({K, true}) -> {HookPoint, Fun} = hook_and_fun(K), emqx_hooks:put(HookPoint, {?MODULE, Fun, []}) - end, sys_event_message()). + end, maps:to_list(sys_event_message())). handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), @@ -175,10 +175,10 @@ terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]). unload_event_hooks() -> - maps:foreach(fun(K, _) -> + lists:foreach(fun({K, _}) -> {HookPoint, Fun} = hook_and_fun(K), emqx_hooks:del(HookPoint, {?MODULE, Fun}) - end, sys_event_message()). + end, maps:to_list(sys_event_message())). %%-------------------------------------------------------------------- %% hook callbacks diff --git a/apps/emqx/test/props/prop_emqx_sys.erl b/apps/emqx/test/props/prop_emqx_sys.erl index 12ca0842c..c65383d81 100644 --- a/apps/emqx/test/props/prop_emqx_sys.erl +++ b/apps/emqx/test/props/prop_emqx_sys.erl @@ -60,9 +60,9 @@ prop_sys() -> do_setup() -> ok = emqx_logger:set_log_level(emergency), - emqx_config:put([sys_topic, sys_msg_interval], 60000), - emqx_config:put([sys_topic, sys_heartbeat_interval], 30000), - emqx_config:put([sys_topic, sys_event_messages], + emqx_config:put([sys_topics, sys_msg_interval], 60000), + emqx_config:put([sys_topics, sys_heartbeat_interval], 30000), + emqx_config:put([sys_topics, sys_event_messages], #{client_connected => true, client_disconnected => true, client_subscribed => true, client_unsubscribed => true}), [mock(Mod) || Mod <- ?mock_modules], From 0c8d739e2fb8dc78a52313e1b511a64e7797156b Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 4 Mar 2022 17:16:46 +0800 Subject: [PATCH 4/6] test: change assert condition --- apps/emqx/test/emqx_config_SUITE.erl | 4 +--- apps/emqx_exhook/test/emqx_exhook_SUITE.erl | 6 ++++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index a9ee5645f..46ef7cba8 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -45,9 +45,7 @@ t_fill_default_values(_) -> <<"route_batch_clean">> := false, <<"session_locking_strategy">> := quorum, <<"shared_dispatch_ack_enabled">> := false, - <<"shared_subscription_strategy">> := round_robin, - <<"sys_heartbeat_interval">> := <<"30s">>, - <<"sys_msg_interval">> := <<"1m">>}}, WithDefaults), + <<"shared_subscription_strategy">> := round_robin}}, WithDefaults), %% ensure JSON compatible _ = emqx_json:encode(WithDefaults), ok. diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index 26fe652d9..64a81525d 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -24,6 +24,12 @@ -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CONF_DEFAULT, <<" +sys_topics { + sys_event_messages { + client_connected = false + client_disconnected = false + } +} exhook { servers = [ { name = default, From 857bd1e12a470454a0fedc094e9d05084aec10f6 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 7 Mar 2022 11:11:15 +0800 Subject: [PATCH 5/6] test(exhook): remove helpless test case --- apps/emqx/src/emqx_sys.erl | 6 +++--- apps/emqx_exhook/test/emqx_exhook_SUITE.erl | 12 ------------ 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index 9b10ba23b..2deb250d8 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -112,7 +112,7 @@ sys_interval() -> sys_heatbeat_interval() -> emqx:get_config([sys_topics, sys_heartbeat_interval]). -sys_event_message() -> +sys_event_messages() -> emqx:get_config([sys_topics, sys_event_messages]). %% @doc Get sys info @@ -143,7 +143,7 @@ load_event_hooks() -> ({K, true}) -> {HookPoint, Fun} = hook_and_fun(K), emqx_hooks:put(HookPoint, {?MODULE, Fun, []}) - end, maps:to_list(sys_event_message())). + end, maps:to_list(sys_event_messages())). handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), @@ -178,7 +178,7 @@ unload_event_hooks() -> lists:foreach(fun({K, _}) -> {HookPoint, Fun} = hook_and_fun(K), emqx_hooks:del(HookPoint, {?MODULE, Fun}) - end, maps:to_list(sys_event_message())). + end, maps:to_list(sys_event_messages())). %%-------------------------------------------------------------------- %% hook callbacks diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index 64a81525d..d89983449 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -24,12 +24,6 @@ -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CONF_DEFAULT, <<" -sys_topics { - sys_event_messages { - client_connected = false - client_disconnected = false - } -} exhook { servers = [ { name = default, @@ -96,12 +90,6 @@ end_per_testcase(_, Config) -> %% Test cases %%-------------------------------------------------------------------- -t_noserver_nohook(_) -> - emqx_exhook_mgr:disable(<<"default">>), - ?assertEqual([], ets:tab2list(emqx_hooks)), - {ok, _} = emqx_exhook_mgr:enable(<<"default">>), - ?assertNotEqual([], ets:tab2list(emqx_hooks)). - t_access_failed_if_no_server_running(_) -> emqx_exhook_mgr:disable(<<"default">>), ClientInfo = #{clientid => <<"user-id-1">>, From 321ad09882cdceeea9b8a34ea09bb7285ea537bc Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 9 Mar 2022 16:46:19 +0800 Subject: [PATCH 6/6] chore: fix elvis warnings --- apps/emqx_exhook/test/emqx_exhook_SUITE.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index d89983449..ed3e529f6 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -146,7 +146,9 @@ t_error_update_conf(_) -> ErrorAnd = #{<<"name">> => Name, <<"url">> => <<"http://127.0.0.1:9001">>}, {ok, _} = emqx_exhook_mgr:update_config(Path, {add, ErrorAnd}), - DisableAnd = #{<<"name">> => Name, <<"url">> => <<"http://127.0.0.1:9001">>, <<"enable">> => false}, + DisableAnd = #{<<"name">> => Name, + <<"url">> => <<"http://127.0.0.1:9001">>, + <<"enable">> => false}, {ok, _} = emqx_exhook_mgr:update_config(Path, {add, DisableAnd}), {ok, _} = emqx_exhook_mgr:update_config(Path, {delete, <<"error">>}),