diff --git a/apps/emqx_modules/src/emqx_event_message.erl b/apps/emqx_modules/src/emqx_event_message.erl index e02a5e676..b71f107fd 100644 --- a/apps/emqx_modules/src/emqx_event_message.erl +++ b/apps/emqx_modules/src/emqx_event_message.erl @@ -24,6 +24,8 @@ , update/1 , enable/0 , disable/0 + , post_config_update/5 + , init_conf_handler/0 ]). -export([ on_client_connected/2 @@ -39,66 +41,31 @@ -export([reason/1]). -endif. +init_conf_handler() -> + emqx_conf:add_handler([event_message], ?MODULE). + list() -> emqx_conf:get([event_message], #{}). update(Params) -> - disable(), case emqx_conf:update([event_message], - Params, - #{rawconf_with_defaults => true, override_to => cluster}) of + Params, + #{rawconf_with_defaults => true, override_to => cluster}) of {ok, #{raw_config := NewEventMessage}} -> - enable(), {ok, NewEventMessage}; {error, Reason} -> {error, Reason} end. +post_config_update(_KeyPath, _Config, NewConf, _OldConf, _AppEnvs) -> + disable(), + enable(maps:to_list(NewConf)). + enable() -> - lists:foreach(fun({_Topic, false}) -> ok; - ({Topic, true}) -> - case Topic of - client_connected -> - emqx_hooks:put('client.connected', {?MODULE, on_client_connected, []}); - client_disconnected -> - emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, []}); - client_subscribed -> - emqx_hooks:put('session.subscribed', {?MODULE, on_client_subscribed, []}); - client_unsubscribed -> - emqx_hooks:put('session.unsubscribed', {?MODULE, on_client_unsubscribed, []}); - message_delivered -> - emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}); - message_acked -> - emqx_hooks:put('message.acked', {?MODULE, on_message_acked, []}); - message_dropped -> - emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}); - _ -> - ok - end - end, maps:to_list(list())). + enable(maps:to_list(list())). disable() -> - lists:foreach(fun({_Topic, false}) -> ok; - ({Topic, true}) -> - case Topic of - client_connected -> - emqx_hooks:del('client.connected', {?MODULE, on_client_connected}); - client_disconnected -> - emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}); - client_subscribed -> - emqx_hooks:del('session.subscribed', {?MODULE, on_client_subscribed}); - client_unsubscribed -> - emqx_hooks:del('session.unsubscribed', {?MODULE, on_client_unsubscribed}); - message_delivered -> - emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}); - message_acked -> - emqx_hooks:del('message.acked', {?MODULE, on_message_acked}); - message_dropped -> - emqx_hooks:del('message.dropped', {?MODULE, on_message_dropped}); - _ -> - ok - end - end, maps:to_list(list())). + foreach_with(fun check_enable/2, fun emqx_hooks:del/2, maps:to_list(list())). %%-------------------------------------------------------------------- %% Callbacks @@ -107,10 +74,10 @@ disable() -> on_client_connected(ClientInfo, ConnInfo) -> Payload0 = common_infos(ClientInfo, ConnInfo), Payload = Payload0#{ - keepalive => maps:get(keepalive, ConnInfo, 0), - clean_start => maps:get(clean_start, ConnInfo, true), - expiry_interval => maps:get(expiry_interval, ConnInfo, 0) - }, + keepalive => maps:get(keepalive, ConnInfo, 0), + clean_start => maps:get(clean_start, ConnInfo, true), + expiry_interval => maps:get(expiry_interval, ConnInfo, 0) + }, publish_event_msg(<<"$event/client_connected">>, Payload). on_client_disconnected(ClientInfo, @@ -118,14 +85,14 @@ on_client_disconnected(ClientInfo, Payload0 = common_infos(ClientInfo, ConnInfo), Payload = Payload0#{ - reason => reason(Reason), - disconnected_at => DisconnectedAt - }, + reason => reason(Reason), + disconnected_at => DisconnectedAt + }, publish_event_msg(<<"$event/client_disconnected">>, Payload). on_client_subscribed(_ClientInfo = #{clientid := ClientId, username := Username}, - Topic, SubOpts) -> + Topic, SubOpts) -> Payload = #{clientid => ClientId, username => Username, topic => Topic, @@ -136,7 +103,7 @@ on_client_subscribed(_ClientInfo = #{clientid := ClientId, on_client_unsubscribed(_ClientInfo = #{clientid := ClientId, username := Username}, - Topic, _SubOpts) -> + Topic, _SubOpts) -> Payload = #{clientid => ClientId, username => Username, topic => Topic, @@ -281,3 +248,29 @@ ignore_sys_message(#message{flags = Flags}) -> publish_event_msg(Topic, Payload) -> _ = emqx_broker:safe_publish(make_msg(Topic, emqx_json:encode(Payload))), ok. + +enable(List) -> + foreach_with(fun check_enable/2, fun emqx_hooks:put/2, List). + +check_enable(Handler, {client_connected, true}) -> + Handler('client.connected', {?MODULE, on_client_connected, []}); +check_enable(Handler, {client_disconnected, true}) -> + Handler('client.disconnected', {?MODULE, on_client_disconnected, []}); +check_enable(Handler, {client_subscribed, true}) -> + Handler('session.subscribed', {?MODULE, on_client_subscribed, []}); +check_enable(Handler, {client_unsubscribed, true}) -> + Handler('session.unsubscribed', {?MODULE, on_client_unsubscribed, []}); +check_enable(Handler, {message_delivered, true}) -> + Handler('message.delivered', {?MODULE, on_message_delivered, []}); +check_enable(Handler, {message_acked, true}) -> + Handler('message.acked', {?MODULE, on_message_acked, []}); +check_enable(Handler, {message_dropped, true}) -> + Handler('message.dropped', {?MODULE, on_message_dropped, []}); +check_enable(_Handler, {_Topic, _Enable}) -> + ok. + +foreach_with(Fun, With, [H | T]) -> + Fun(With, H), + foreach_with(Fun, With, T); +foreach_with(_Fun, _With, []) -> + ok. diff --git a/apps/emqx_modules/src/emqx_modules_sup.erl b/apps/emqx_modules/src/emqx_modules_sup.erl index 6438f1114..b92058c6d 100644 --- a/apps/emqx_modules/src/emqx_modules_sup.erl +++ b/apps/emqx_modules/src/emqx_modules_sup.erl @@ -37,6 +37,7 @@ start_link() -> %% Supervisor callbacks %%-------------------------------------------------------------------- init([]) -> + emqx_event_message:init_conf_handler(), {ok, {{one_for_one, 10, 3600}, [ ?CHILD(emqx_telemetry) , ?CHILD(emqx_topic_metrics)