feat(rule-engine): Update the configuration file to hocon

This commit is contained in:
Turtle 2021-06-28 14:02:37 +08:00 committed by turtleDeng
parent 54aeacee14
commit 434beef3ad
1 changed files with 31 additions and 31 deletions

View File

@ -63,10 +63,8 @@
-endif.
load(Topic) ->
IgnoreSys = proplists:get_value(ignore_sys_message, env(), true),
HookPoint = event_name(Topic),
emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint),
[#{ignore_sys_message => IgnoreSys}]}).
emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [[]]}).
unload() ->
lists:foreach(fun(HookPoint) ->
@ -77,23 +75,18 @@ unload(Topic) ->
HookPoint = event_name(Topic),
emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}).
env() ->
application:get_all_env(?APP).
%%--------------------------------------------------------------------
%% Callbacks
%%--------------------------------------------------------------------
on_message_publish(Message = #message{flags = #{event := true}},
_Env) ->
{ok, Message};
on_message_publish(Message = #message{flags = #{sys := true}},
#{ignore_sys_message := true}) ->
{ok, Message};
on_message_publish(Message = #message{topic = Topic}, _Env) ->
case emqx_rule_registry:get_rules_for(Topic) of
[] -> ok;
Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message))
case ignore_sys_message(Message) of
true ->
ok;
false ->
case emqx_rule_registry:get_rules_for(Topic) of
[] -> ok;
Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message))
end
end,
{ok, Message}.
@ -113,28 +106,31 @@ on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env) ->
apply_event('session.unsubscribed',
fun() -> eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts) end, Env).
on_message_dropped(Message = #message{flags = #{sys := true}},
_, _, #{ignore_sys_message := true}) ->
{ok, Message};
on_message_dropped(Message, _, Reason, Env) ->
apply_event('message.dropped',
fun() -> eventmsg_dropped(Message, Reason) end, Env),
case ignore_sys_message(Message) of
true -> ok;
false ->
apply_event('message.dropped',
fun() -> eventmsg_dropped(Message, Reason) end, Env)
end,
{ok, Message}.
on_message_delivered(_ClientInfo, Message = #message{flags = #{sys := true}},
#{ignore_sys_message := true}) ->
{ok, Message};
on_message_delivered(ClientInfo, Message, Env) ->
apply_event('message.delivered',
fun() -> eventmsg_delivered(ClientInfo, Message) end, Env),
case ignore_sys_message(Message) of
true -> ok;
false ->
apply_event('message.delivered',
fun() -> eventmsg_delivered(ClientInfo, Message) end, Env)
end,
{ok, Message}.
on_message_acked(_ClientInfo, Message = #message{flags = #{sys := true}},
#{ignore_sys_message := true}) ->
{ok, Message};
on_message_acked(ClientInfo, Message, Env) ->
apply_event('message.acked',
fun() -> eventmsg_acked(ClientInfo, Message) end, Env),
case ignore_sys_message(Message) of
true -> ok;
false ->
apply_event('message.acked',
fun() -> eventmsg_acked(ClientInfo, Message) end, Env)
end,
{ok, Message}.
%%--------------------------------------------------------------------
@ -597,3 +593,7 @@ printable_maps(Headers) ->
};
(K, V0, AccIn) -> AccIn#{K => V0}
end, #{}, Headers).
ignore_sys_message(#message{flags = Flags}) ->
maps:get(sys, Flags, false) andalso
emqx_config:get([emqx_rule_engine, ignore_sys_message]).