From e299d8d138ab8477638fd14317344ed117272bf4 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 31 Dec 2021 15:47:03 +0800 Subject: [PATCH] fix(rule): rules not triggered after the ingress mqtt bridge received some msg --- apps/emqx_bridge/src/emqx_bridge.erl | 23 +++++++++++++------ .../src/emqx_rule_runtime.erl | 11 +++++---- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index d46ce217e..1814a11fe 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -90,7 +90,14 @@ on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> send_message(BridgeId, Message) -> {BridgeType, BridgeName} = parse_bridge_id(BridgeId), ResId = emqx_bridge:resource_id(BridgeType, BridgeName), - emqx_resource:query(ResId, {send_message, Message}). + case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of + not_found -> + throw({bridge_not_found, BridgeId}); + #{enable := true} -> + emqx_resource:query(ResId, {send_message, Message}); + #{enable := false} -> + throw({bridge_stopped, BridgeId}) + end. config_key_path() -> [bridges]. @@ -279,6 +286,8 @@ get_matched_bridges(Topic) -> end, Acc0, Conf) end, [], Bridges). +get_matched_bridge_id(#{enable := false}, _Topic, _BType, _BName, Acc) -> + Acc; get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) -> case emqx_topic:match(Topic, Filter) of true -> [bridge_id(BType, BName) | Acc]; @@ -309,21 +318,21 @@ parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) {Type, ConnName} -> ConnectorConfs = emqx:get_config([connectors, Type, ConnName]), make_resource_confs(Direction, ConnectorConfs, - maps:without([connector, direction], Conf), Name); + maps:without([connector, direction], Conf), Type, Name); {_ConnType, _ConnName} -> error({cannot_use_connector_with_different_type, ConnId}) end; -parse_confs(_Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) +parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) when is_map(ConnectorConfs) -> make_resource_confs(Direction, ConnectorConfs, - maps:without([connector, direction], Conf), Name). + maps:without([connector, direction], Conf), Type, Name). -make_resource_confs(ingress, ConnectorConfs, BridgeConf, Name) -> - BName = bin(Name), +make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) -> + BName = bridge_id(Type, Name), ConnectorConfs#{ ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>} }; -make_resource_confs(egress, ConnectorConfs, BridgeConf, _Name) -> +make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) -> ConnectorConfs#{ egress => BridgeConf }. diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 60a7cbaad..428a1ef8c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -101,7 +101,7 @@ do_apply_rule(#{ true -> ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId), Collection2 = filter_collection(Input, InCase, DoEach, Collection), - {ok, [handle_output_list(Outputs, Coll, Input) || Coll <- Collection2]}; + {ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]}; false -> {error, nomatch} end; @@ -118,7 +118,7 @@ do_apply_rule(#{id := RuleId, {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId), - {ok, handle_output_list(Outputs, Selected, Input)}; + {ok, handle_output_list(RuleId, Outputs, Selected, Input)}; false -> {error, nomatch} end. @@ -231,14 +231,15 @@ number(Bin) -> catch error:badarg -> binary_to_float(Bin) end. -handle_output_list(Outputs, Selected, Envs) -> - [handle_output(Out, Selected, Envs) || Out <- Outputs]. +handle_output_list(RuleId, Outputs, Selected, Envs) -> + [handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs]. -handle_output(OutId, Selected, Envs) -> +handle_output(RuleId, OutId, Selected, Envs) -> try do_handle_output(OutId, Selected, Envs) catch Err:Reason:ST -> + ok = emqx_plugin_libs_metrics:inc_failed(rule_metrics, RuleId), ?SLOG(error, #{msg => "output_failed", output => OutId, exception => Err,