feat(rules): hook on bridges events and query bridges from rules

This commit is contained in:
Shawn 2021-09-26 18:53:59 +08:00
parent e630e23846
commit 69f3cce75d
6 changed files with 28 additions and 9 deletions

View File

@ -67,7 +67,7 @@
%% - The execution order is the adding order of callbacks if they have %% - The execution order is the adding order of callbacks if they have
%% equal priority values. %% equal priority values.
-type(hookpoint() :: atom()). -type(hookpoint() :: atom() | binary()).
-type(action() :: {module(), atom(), [term()] | undefined}). -type(action() :: {module(), atom(), [term()] | undefined}).
-type(filter() :: {module(), atom(), [term()] | undefined}). -type(filter() :: {module(), atom(), [term()] | undefined}).
@ -158,12 +158,12 @@ del(HookPoint, Action) ->
gen_server:cast(?SERVER, {del, HookPoint, Action}). gen_server:cast(?SERVER, {del, HookPoint, Action}).
%% @doc Run hooks. %% @doc Run hooks.
-spec(run(atom(), list(Arg::term())) -> ok). -spec(run(hookpoint(), list(Arg::term())) -> ok).
run(HookPoint, Args) -> run(HookPoint, Args) ->
do_run(lookup(HookPoint), Args). do_run(lookup(HookPoint), Args).
%% @doc Run hooks with Accumulator. %% @doc Run hooks with Accumulator.
-spec(run_fold(atom(), list(Arg::term()), Acc::term()) -> Acc::term()). -spec(run_fold(hookpoint(), list(Arg::term()), Acc::term()) -> Acc::term()).
run_fold(HookPoint, Args, Acc) -> run_fold(HookPoint, Args, Acc) ->
do_run_fold(lookup(HookPoint), Args, Acc). do_run_fold(lookup(HookPoint), Args, Acc).

View File

@ -89,12 +89,13 @@ drop_bridge(Name) ->
%% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called %% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called
%% if the bridge received msgs from the remote broker. %% if the bridge received msgs from the remote broker.
on_message_received(Msg, ChannelName) -> on_message_received(Msg, ChannelName) ->
emqx:run_hook(ChannelName, [Msg]). Name = atom_to_binary(ChannelName, utf8),
emqx:run_hook(<<"$bridges/", Name/binary>>, [Msg]).
%% =================================================================== %% ===================================================================
on_start(InstId, Conf) -> on_start(InstId, Conf) ->
logger:info("starting mqtt connector: ~p, ~p", [InstId, Conf]), logger:info("starting mqtt connector: ~p, ~p", [InstId, Conf]),
NamePrefix = binary_to_list(InstId), "bridge:" ++ NamePrefix = binary_to_list(InstId),
BasicConf = basic_config(Conf), BasicConf = basic_config(Conf),
InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, channels => []}}, InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, channels => []}},
InOutConfigs = taged_map_list(ingress_channels, maps:get(ingress_channels, Conf, #{})) InOutConfigs = taged_map_list(ingress_channels, maps:get(ingress_channels, Conf, #{}))

View File

@ -162,7 +162,7 @@ handle_publish(Msg, undefined) ->
handle_publish(Msg, #{on_message_received := {OnMsgRcvdFunc, Args}} = Vars) -> handle_publish(Msg, #{on_message_received := {OnMsgRcvdFunc, Args}} = Vars) ->
?LOG(debug, "publish to local broker, msg: ~p, vars: ~p", [Msg, Vars]), ?LOG(debug, "publish to local broker, msg: ~p, vars: ~p", [Msg, Vars]),
emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1), emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1),
_ = erlang:apply(OnMsgRcvdFunc, [Msg, Args]), _ = erlang:apply(OnMsgRcvdFunc, [Msg] ++ Args),
case maps:get(local_topic, Vars, undefined) of case maps:get(local_topic, Vars, undefined) of
undefined -> ok; undefined -> ok;
_Topic -> _Topic ->

View File

@ -23,6 +23,7 @@
-export([stop/1]). -export([stop/1]).
start(_Type, _Args) -> start(_Type, _Args) ->
ok = emqx_rule_events:reload(),
emqx_rule_engine_sup:start_link(). emqx_rule_engine_sup:start_link().
stop(_State) -> stop(_State) ->

View File

@ -21,7 +21,8 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-export([ load/1 -export([ reload/0
, load/1
, unload/0 , unload/0
, unload/1 , unload/1
, event_name/1 , event_name/1
@ -36,6 +37,7 @@
, on_message_dropped/4 , on_message_dropped/4
, on_message_delivered/3 , on_message_delivered/3
, on_message_acked/3 , on_message_acked/3
, on_bridge_message_received/2
]). ]).
-export([ event_info/0 -export([ event_info/0
@ -61,6 +63,12 @@
]). ]).
-endif. -endif.
reload() ->
emqx_rule_registry:load_hooks_for_rule(emqx_rule_registry:get_rules()).
load(<<"$bridges/", _ChannelId/binary>> = BridgeTopic) ->
emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received,
[#{bridge_topic => BridgeTopic}]});
load(Topic) -> load(Topic) ->
HookPoint = event_name(Topic), HookPoint = event_name(Topic),
emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [[]]}). emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [[]]}).
@ -77,6 +85,12 @@ unload(Topic) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Callbacks %% Callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
on_bridge_message_received(Message, #{bridge_topic := BridgeTopic}) ->
case emqx_rule_registry:get_rules_for_topic(BridgeTopic) of
[] -> ok;
Rules -> emqx_rule_runtime:apply_rules(Rules, Message)
end.
on_message_publish(Message = #message{topic = Topic}, _Env) -> on_message_publish(Message = #message{topic = Topic}, _Env) ->
case ignore_sys_message(Message) of case ignore_sys_message(Message) of
true -> ok; true -> ok;

View File

@ -238,8 +238,11 @@ handle_output(OutId, Selected, Envs) ->
?LOG(warning, "Output to ~p failed, ~p", [OutId, {Err, Reason, ST}]) ?LOG(warning, "Output to ~p failed, ~p", [OutId, {Err, Reason, ST}])
end. end.
do_handle_output(#{type := bridge, target := ChannelId}, _Selected, _Envs) -> do_handle_output(#{type := bridge, target := ChannelId}, Selected, _Envs) ->
?LOG(warning, "calling bridge from rules has not been implemented yet! ~p", [ChannelId]); ?LOG(debug, "output to bridge: ~p", [ChannelId]),
[Type, BridgeName | _] = string:split(ChannelId, ":", all),
ResId = emqx_bridge:resource_id(<<Type/binary, ":", BridgeName/binary>>),
emqx_resource:query(ResId, {send_to_remote, ChannelId, Selected});
do_handle_output(#{type := func, target := Func} = Out, Selected, Envs) -> do_handle_output(#{type := func, target := Func} = Out, Selected, Envs) ->
erlang:apply(Func, [Selected, Envs, maps:get(args, Out, #{})]); erlang:apply(Func, [Selected, Envs, maps:get(args, Out, #{})]);
do_handle_output(#{type := builtin, target := Output} = Out, Selected, Envs) do_handle_output(#{type := builtin, target := Output} = Out, Selected, Envs)