diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 26689b022..824cfdcb1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -63,10 +63,8 @@ -endif. load(Topic) -> - IgnoreSys = proplists:get_value(ignore_sys_message, env(), true), HookPoint = event_name(Topic), - emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), - [#{ignore_sys_message => IgnoreSys}]}). + emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [[]]}). unload() -> lists:foreach(fun(HookPoint) -> @@ -77,23 +75,18 @@ unload(Topic) -> HookPoint = event_name(Topic), emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}). -env() -> - application:get_all_env(?APP). - %%-------------------------------------------------------------------- %% Callbacks %%-------------------------------------------------------------------- - -on_message_publish(Message = #message{flags = #{event := true}}, - _Env) -> - {ok, Message}; -on_message_publish(Message = #message{flags = #{sys := true}}, - #{ignore_sys_message := true}) -> - {ok, Message}; on_message_publish(Message = #message{topic = Topic}, _Env) -> - case emqx_rule_registry:get_rules_for(Topic) of - [] -> ok; - Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message)) + case ignore_sys_message(Message) of + true -> + ok; + false -> + case emqx_rule_registry:get_rules_for(Topic) of + [] -> ok; + Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message)) + end end, {ok, Message}. @@ -113,28 +106,31 @@ on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env) -> apply_event('session.unsubscribed', fun() -> eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts) end, Env). -on_message_dropped(Message = #message{flags = #{sys := true}}, - _, _, #{ignore_sys_message := true}) -> - {ok, Message}; on_message_dropped(Message, _, Reason, Env) -> - apply_event('message.dropped', - fun() -> eventmsg_dropped(Message, Reason) end, Env), + case ignore_sys_message(Message) of + true -> ok; + false -> + apply_event('message.dropped', + fun() -> eventmsg_dropped(Message, Reason) end, Env) + end, {ok, Message}. -on_message_delivered(_ClientInfo, Message = #message{flags = #{sys := true}}, - #{ignore_sys_message := true}) -> - {ok, Message}; on_message_delivered(ClientInfo, Message, Env) -> - apply_event('message.delivered', - fun() -> eventmsg_delivered(ClientInfo, Message) end, Env), + case ignore_sys_message(Message) of + true -> ok; + false -> + apply_event('message.delivered', + fun() -> eventmsg_delivered(ClientInfo, Message) end, Env) + end, {ok, Message}. -on_message_acked(_ClientInfo, Message = #message{flags = #{sys := true}}, - #{ignore_sys_message := true}) -> - {ok, Message}; on_message_acked(ClientInfo, Message, Env) -> - apply_event('message.acked', - fun() -> eventmsg_acked(ClientInfo, Message) end, Env), + case ignore_sys_message(Message) of + true -> ok; + false -> + apply_event('message.acked', + fun() -> eventmsg_acked(ClientInfo, Message) end, Env) + end, {ok, Message}. %%-------------------------------------------------------------------- @@ -597,3 +593,7 @@ printable_maps(Headers) -> }; (K, V0, AccIn) -> AccIn#{K => V0} end, #{}, Headers). + +ignore_sys_message(#message{flags = Flags}) -> + maps:get(sys, Flags, false) andalso + emqx_config:get([emqx_rule_engine, ignore_sys_message]).