diff --git a/apps/emqx/src/emqx_hooks.erl b/apps/emqx/src/emqx_hooks.erl index 088bb4085..f056b754e 100644 --- a/apps/emqx/src/emqx_hooks.erl +++ b/apps/emqx/src/emqx_hooks.erl @@ -67,7 +67,7 @@ %% - The execution order is the adding order of callbacks if they have %% equal priority values. --type(hookpoint() :: atom()). +-type(hookpoint() :: atom() | binary()). -type(action() :: {module(), atom(), [term()] | undefined}). -type(filter() :: {module(), atom(), [term()] | undefined}). @@ -158,12 +158,12 @@ del(HookPoint, Action) -> gen_server:cast(?SERVER, {del, HookPoint, Action}). %% @doc Run hooks. --spec(run(atom(), list(Arg::term())) -> ok). +-spec(run(hookpoint(), list(Arg::term())) -> ok). run(HookPoint, Args) -> do_run(lookup(HookPoint), Args). %% @doc Run hooks with Accumulator. --spec(run_fold(atom(), list(Arg::term()), Acc::term()) -> Acc::term()). +-spec(run_fold(hookpoint(), list(Arg::term()), Acc::term()) -> Acc::term()). run_fold(HookPoint, Args, Acc) -> do_run_fold(lookup(HookPoint), Args, Acc). diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index a03c888d3..431e94b1e 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -89,12 +89,13 @@ drop_bridge(Name) -> %% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called %% if the bridge received msgs from the remote broker. on_message_received(Msg, ChannelName) -> - emqx:run_hook(ChannelName, [Msg]). + Name = atom_to_binary(ChannelName, utf8), + emqx:run_hook(<<"$bridges/", Name/binary>>, [Msg]). %% =================================================================== on_start(InstId, Conf) -> logger:info("starting mqtt connector: ~p, ~p", [InstId, Conf]), - NamePrefix = binary_to_list(InstId), + "bridge:" ++ NamePrefix = binary_to_list(InstId), BasicConf = basic_config(Conf), InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, channels => []}}, InOutConfigs = taged_map_list(ingress_channels, maps:get(ingress_channels, Conf, #{})) diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 8b0aa5051..560500d3d 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -162,7 +162,7 @@ handle_publish(Msg, undefined) -> handle_publish(Msg, #{on_message_received := {OnMsgRcvdFunc, Args}} = Vars) -> ?LOG(debug, "publish to local broker, msg: ~p, vars: ~p", [Msg, Vars]), emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1), - _ = erlang:apply(OnMsgRcvdFunc, [Msg, Args]), + _ = erlang:apply(OnMsgRcvdFunc, [Msg] ++ Args), case maps:get(local_topic, Vars, undefined) of undefined -> ok; _Topic -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index 6b3b3061f..e9b71f89d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -23,6 +23,7 @@ -export([stop/1]). start(_Type, _Args) -> + ok = emqx_rule_events:reload(), emqx_rule_engine_sup:start_link(). stop(_State) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 151af84f0..d030917ef 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -21,7 +21,8 @@ -include_lib("emqx/include/logger.hrl"). --export([ load/1 +-export([ reload/0 + , load/1 , unload/0 , unload/1 , event_name/1 @@ -36,6 +37,7 @@ , on_message_dropped/4 , on_message_delivered/3 , on_message_acked/3 + , on_bridge_message_received/2 ]). -export([ event_info/0 @@ -61,6 +63,12 @@ ]). -endif. +reload() -> + emqx_rule_registry:load_hooks_for_rule(emqx_rule_registry:get_rules()). + +load(<<"$bridges/", _ChannelId/binary>> = BridgeTopic) -> + emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received, + [#{bridge_topic => BridgeTopic}]}); load(Topic) -> HookPoint = event_name(Topic), emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [[]]}). @@ -77,6 +85,12 @@ unload(Topic) -> %%-------------------------------------------------------------------- %% Callbacks %%-------------------------------------------------------------------- +on_bridge_message_received(Message, #{bridge_topic := BridgeTopic}) -> + case emqx_rule_registry:get_rules_for_topic(BridgeTopic) of + [] -> ok; + Rules -> emqx_rule_runtime:apply_rules(Rules, Message) + end. + on_message_publish(Message = #message{topic = Topic}, _Env) -> case ignore_sys_message(Message) of true -> ok; diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 9d8c9eece..a7be19b54 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -238,8 +238,11 @@ handle_output(OutId, Selected, Envs) -> ?LOG(warning, "Output to ~p failed, ~p", [OutId, {Err, Reason, ST}]) end. -do_handle_output(#{type := bridge, target := ChannelId}, _Selected, _Envs) -> - ?LOG(warning, "calling bridge from rules has not been implemented yet! ~p", [ChannelId]); +do_handle_output(#{type := bridge, target := ChannelId}, Selected, _Envs) -> + ?LOG(debug, "output to bridge: ~p", [ChannelId]), + [Type, BridgeName | _] = string:split(ChannelId, ":", all), + ResId = emqx_bridge:resource_id(<>), + emqx_resource:query(ResId, {send_to_remote, ChannelId, Selected}); do_handle_output(#{type := func, target := Func} = Out, Selected, Envs) -> erlang:apply(Func, [Selected, Envs, maps:get(args, Out, #{})]); do_handle_output(#{type := builtin, target := Output} = Out, Selected, Envs)