From 29fe201676703668e5b279eef26e63238c23fbf8 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 31 May 2023 17:08:42 +0300 Subject: [PATCH] fix(ruleeng): parse bridge id on create / update Instead of parsing it every time in the `send_message/2` hot path. --- apps/emqx_bridge/src/emqx_bridge.erl | 12 +- apps/emqx_bridge/src/emqx_bridge_resource.erl | 5 +- .../src/emqx_rule_actions.erl | 10 +- .../emqx_rule_engine/src/emqx_rule_engine.erl | 59 +++++---- .../src/emqx_rule_engine_api.erl | 6 +- .../src/emqx_rule_runtime.erl | 15 +-- .../test/emqx_rule_engine_SUITE.erl | 112 +++++++----------- 7 files changed, 109 insertions(+), 110 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index cbbe937aa..46c822ed0 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -42,7 +42,10 @@ list/0 ]). --export([send_message/2]). +-export([ + send_message/2, + send_message/4 +]). -export([config_key_path/0]). @@ -199,14 +202,17 @@ send_to_matched_egress_bridges(Topic, Msg) -> send_message(BridgeId, Message) -> {BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId), ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), + send_message(BridgeType, BridgeName, ResId, Message). + +send_message(BridgeType, BridgeName, ResId, Message) -> case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of not_found -> - {error, {bridge_not_found, BridgeId}}; + {error, bridge_not_found}; #{enable := true} = Config -> QueryOpts = query_opts(Config), emqx_resource:query(ResId, {send_message, Message}, QueryOpts); #{enable := false} -> - {error, {bridge_stopped, BridgeId}} + {error, bridge_stopped} end. query_opts(Config) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 1a3edb484..eee39bd56 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -112,7 +112,10 @@ validate_name(Name0, Opts) -> case lists:all(fun is_id_char/1, Name) of true -> case maps:get(atom_name, Opts, true) of - true -> list_to_existing_atom(Name); + % NOTE + % Rule may be created before bridge, thus not `list_to_existing_atom/1`, + % also it is infrequent user input anyway. + true -> list_to_atom(Name); false -> Name0 end; false -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index a9f24ddcd..b562af09d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -146,21 +146,25 @@ get_action_mod_func(ActionFunc) when is_binary(ActionFunc) -> try binary_to_existing_atom(Bin) of Atom -> Atom catch - error:badarg -> error({unknown_action_function, ActionFunc}) + error:badarg -> validation_error(unknown_action_function) end end, case string:split(ActionFunc, ":", all) of [Func1] -> {emqx_rule_actions, ToAtom(Func1)}; [Mod1, Func1] -> {ToAtom(Mod1), ToAtom(Func1)}; - _ -> error({invalid_action_function, ActionFunc}) + _ -> validation_error(invalid_action_function) end. assert_function_supported(Mod, Func) -> case erlang:function_exported(Mod, Func, 3) of true -> ok; - false -> error({action_function_not_supported, Func}) + false -> validation_error(action_function_not_supported) end. +-spec validation_error(any()) -> no_return(). +validation_error(Reason) -> + throw(#{kind => validation_error, reason => Reason}). + pre_process_args(Mod, Func, Args) -> case erlang:function_exported(Mod, pre_process_action_args, 2) of true -> Mod:pre_process_action_args(Func, Args); diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 9dd94970b..d8a798eb8 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -36,7 +36,6 @@ -export([ create_rule/1, - insert_rule/1, update_rule/1, delete_rule/1, get_rule/1 @@ -116,25 +115,30 @@ start_link() -> post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) -> #{added := Added, removed := Removed, changed := Updated} = emqx_utils_maps:diff_maps(NewRules, OldRules), - maps_foreach( - fun({Id, {_Old, New}}) -> - {ok, _} = update_rule(New#{id => bin(Id)}) - end, - Updated - ), - maps_foreach( - fun({Id, _Rule}) -> - ok = delete_rule(bin(Id)) - end, - Removed - ), - maps_foreach( - fun({Id, Rule}) -> - {ok, _} = create_rule(Rule#{id => bin(Id)}) - end, - Added - ), - {ok, get_rules()}. + try + maps_foreach( + fun({Id, {_Old, New}}) -> + {ok, _} = update_rule(New#{id => bin(Id)}) + end, + Updated + ), + maps_foreach( + fun({Id, _Rule}) -> + ok = delete_rule(bin(Id)) + end, + Removed + ), + maps_foreach( + fun({Id, Rule}) -> + {ok, _} = create_rule(Rule#{id => bin(Id)}) + end, + Added + ), + {ok, get_rules()} + catch + throw:#{kind := _} = Error -> + {error, Error} + end. %%------------------------------------------------------------------------------ %% APIs for rules @@ -154,7 +158,7 @@ load_rules() -> -spec create_rule(map()) -> {ok, rule()} | {error, term()}. create_rule(Params) -> - create_rule(Params, now_ms()). + create_rule(Params, maps:get(created_at, Params, now_ms())). create_rule(Params = #{id := RuleId}, CreatedAt) when is_binary(RuleId) -> case get_rule(RuleId) of @@ -451,8 +455,9 @@ parse_actions(Actions) -> do_parse_action(Action) when is_map(Action) -> emqx_rule_actions:parse_action(Action); -do_parse_action(BridgeChannelId) when is_binary(BridgeChannelId) -> - BridgeChannelId. +do_parse_action(BridgeId) when is_binary(BridgeId) -> + {Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId), + {bridge, Type, Name, emqx_bridge_resource:resource_id(Type, Name)}. get_all_records(Tab) -> [Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)]. @@ -484,7 +489,8 @@ contains_actions(Actions, Mod0, Func0) -> ). forwards_to_bridge(Actions, BridgeId) -> - lists:any(fun(A) -> A =:= BridgeId end, Actions). + Action = do_parse_action(BridgeId), + lists:any(fun(A) -> A =:= Action end, Actions). references_ingress_bridge(Froms, BridgeId) -> lists:member( @@ -506,4 +512,7 @@ get_referenced_hookpoints(Froms) -> ]. get_egress_bridges(Actions) -> - lists:filter(fun is_binary/1, Actions). + [ + emqx_bridge_resource:bridge_id(BridgeType, BridgeName) + || {bridge, BridgeType, BridgeName, _ResId} <- Actions + ]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 2583a5d03..b53763d47 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -515,13 +515,13 @@ format_datetime(Timestamp, Unit) -> format_action(Actions) -> [do_format_action(Act) || Act <- Actions]. +do_format_action({bridge, BridgeType, BridgeName, _ResId}) -> + emqx_bridge_resource:bridge_id(BridgeType, BridgeName); do_format_action(#{mod := Mod, func := Func, args := Args}) -> #{ function => printable_function_name(Mod, Func), args => maps:remove(preprocessed_tmpl, Args) - }; -do_format_action(BridgeChannelId) when is_binary(BridgeChannelId) -> - BridgeChannelId. + }. printable_function_name(emqx_rule_actions, Func) -> Func; diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 602a7f91e..c0afa0939 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -17,7 +17,6 @@ -module(emqx_rule_runtime). -include("rule_engine.hrl"). --include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_resource/include/emqx_resource_errors.hrl"). @@ -50,8 +49,6 @@ iolist_to_binary(io_lib:format("_v_~ts_~p_~p", [TYPE, NAME, erlang:system_time()])) ). --define(ActionMaxRetry, 3). - %%------------------------------------------------------------------------------ %% Apply rules %%------------------------------------------------------------------------------ @@ -348,10 +345,14 @@ handle_action(RuleId, ActId, Selected, Envs) -> }) end. -do_handle_action(BridgeId, Selected, _Envs) when is_binary(BridgeId) -> - ?TRACE("BRIDGE", "bridge_action", #{bridge_id => BridgeId}), - case emqx_bridge:send_message(BridgeId, Selected) of - {error, {Err, _}} when Err == bridge_not_found; Err == bridge_stopped -> +do_handle_action({bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) -> + ?TRACE( + "BRIDGE", + "bridge_action", + #{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)} + ), + case emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected) of + {error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped -> throw(out_of_service); Result -> Result diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index e88529ad1..ca7832717 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -250,14 +250,14 @@ t_kv_store(_) -> t_add_get_remove_rule(_Config) -> RuleId0 = <<"rule-debug-0">>, - ok = emqx_rule_engine:insert_rule(make_simple_rule(RuleId0)), + ok = create_rule(make_simple_rule(RuleId0)), ?assertMatch({ok, #{id := RuleId0}}, emqx_rule_engine:get_rule(RuleId0)), ok = delete_rule(RuleId0), ?assertEqual(not_found, emqx_rule_engine:get_rule(RuleId0)), RuleId1 = <<"rule-debug-1">>, Rule1 = make_simple_rule(RuleId1), - ok = emqx_rule_engine:insert_rule(Rule1), + ok = create_rule(Rule1), ?assertMatch({ok, #{id := RuleId1}}, emqx_rule_engine:get_rule(RuleId1)), ok = delete_rule(Rule1), ?assertEqual(not_found, emqx_rule_engine:get_rule(RuleId1)), @@ -265,7 +265,7 @@ t_add_get_remove_rule(_Config) -> t_add_get_remove_rules(_Config) -> delete_rules_by_ids([Id || #{id := Id} <- emqx_rule_engine:get_rules()]), - ok = insert_rules( + ok = create_rules( [ make_simple_rule(<<"rule-debug-1">>), make_simple_rule(<<"rule-debug-2">>) @@ -294,7 +294,7 @@ t_create_existing_rule(_Config) -> t_get_rules_for_topic(_Config) -> Len0 = length(emqx_rule_engine:get_rules_for_topic(<<"simple/topic">>)), - ok = insert_rules( + ok = create_rules( [ make_simple_rule(<<"rule-debug-1">>), make_simple_rule(<<"rule-debug-2">>) @@ -305,12 +305,12 @@ t_get_rules_for_topic(_Config) -> ok. t_get_rules_ordered_by_ts(_Config) -> - Now = fun() -> erlang:system_time(nanosecond) end, - ok = insert_rules( + Now = erlang:system_time(microsecond), + ok = create_rules( [ - make_simple_rule_with_ts(<<"rule-debug-0">>, Now()), - make_simple_rule_with_ts(<<"rule-debug-1">>, Now()), - make_simple_rule_with_ts(<<"rule-debug-2">>, Now()) + make_simple_rule_with_ts(<<"rule-debug-0">>, Now + 1), + make_simple_rule_with_ts(<<"rule-debug-1">>, Now + 2), + make_simple_rule_with_ts(<<"rule-debug-2">>, Now + 3) ] ), ?assertMatch( @@ -324,23 +324,19 @@ t_get_rules_ordered_by_ts(_Config) -> t_get_rules_for_topic_2(_Config) -> Len0 = length(emqx_rule_engine:get_rules_for_topic(<<"simple/1">>)), - ok = insert_rules( + ok = create_rules( [ - make_simple_rule(<<"rule-debug-1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]), - make_simple_rule(<<"rule-debug-2">>, <<"select * from \"simple/+\"">>, [<<"simple/+">>]), - make_simple_rule(<<"rule-debug-3">>, <<"select * from \"simple/+/1\"">>, [ - <<"simple/+/1">> - ]), - make_simple_rule(<<"rule-debug-4">>, <<"select * from \"simple/1\"">>, [<<"simple/1">>]), + make_simple_rule(<<"rule-debug-1">>, _1 = <<"select * from \"simple/#\"">>), + make_simple_rule(<<"rule-debug-2">>, _2 = <<"select * from \"simple/+\"">>), + make_simple_rule(<<"rule-debug-3">>, <<"select * from \"simple/+/1\"">>), + make_simple_rule(<<"rule-debug-4">>, _3 = <<"select * from \"simple/1\"">>), make_simple_rule( <<"rule-debug-5">>, - <<"select * from \"simple/2,simple/+,simple/3\"">>, - [<<"simple/2">>, <<"simple/+">>, <<"simple/3">>] + _4 = <<"select * from \"simple/2\", \"simple/+\", \"simple/3\"">> ), make_simple_rule( <<"rule-debug-6">>, - <<"select * from \"simple/2,simple/3,simple/4\"">>, - [<<"simple/2">>, <<"simple/3">>, <<"simple/4">>] + <<"select * from \"simple/2\", \"simple/3\", \"simple/4\"">> ) ] ), @@ -367,52 +363,44 @@ t_get_rules_with_same_event(_Config) -> ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_delivered">>)), ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_acked">>)), ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_dropped">>)), - ok = insert_rules( + ok = create_rules( [ - make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]), - make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>, [<<"abc/+">>]), + make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>), + make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>), make_simple_rule( <<"r3">>, - <<"select * from \"$events/client_connected\"">>, - [<<"$events/client_connected">>] + <<"select * from \"$events/client_connected\"">> ), make_simple_rule( <<"r4">>, - <<"select * from \"$events/client_disconnected\"">>, - [<<"$events/client_disconnected">>] + <<"select * from \"$events/client_disconnected\"">> ), make_simple_rule( <<"r5">>, - <<"select * from \"$events/session_subscribed\"">>, - [<<"$events/session_subscribed">>] + <<"select * from \"$events/session_subscribed\"">> ), make_simple_rule( <<"r6">>, - <<"select * from \"$events/session_unsubscribed\"">>, - [<<"$events/session_unsubscribed">>] + <<"select * from \"$events/session_unsubscribed\"">> ), make_simple_rule( <<"r7">>, - <<"select * from \"$events/message_delivered\"">>, - [<<"$events/message_delivered">>] + <<"select * from \"$events/message_delivered\"">> ), make_simple_rule( <<"r8">>, - <<"select * from \"$events/message_acked\"">>, - [<<"$events/message_acked">>] + <<"select * from \"$events/message_acked\"">> ), make_simple_rule( <<"r9">>, - <<"select * from \"$events/message_dropped\"">>, - [<<"$events/message_dropped">>] + <<"select * from \"$events/message_dropped\"">> ), make_simple_rule( <<"r10">>, << - "select * from \"t/1, " - "$events/session_subscribed, $events/client_connected\"" - >>, - [<<"t/1">>, <<"$events/session_subscribed">>, <<"$events/client_connected">>] + "select * from \"t/1\", " + "\"$events/session_subscribed\", \"$events/client_connected\"" + >> ) ] ), @@ -455,23 +443,18 @@ t_get_rules_with_same_event(_Config) -> t_get_rule_ids_by_action(_) -> ID = <<"t_get_rule_ids_by_action">>, Rule1 = #{ - enable => false, id => ID, sql => <<"SELECT * FROM \"t\"">>, - from => [<<"t">>], - fields => [<<"*">>], - is_foreach => false, - conditions => {}, actions => [ - #{mod => emqx_rule_actions, func => console, args => #{}}, - #{mod => emqx_rule_actions, func => republish, args => #{}}, + #{function => console, args => #{}}, + #{function => republish, args => #{}}, <<"mqtt:my_mqtt_bridge">>, <<"mysql:foo">> ], description => ID, created_at => erlang:system_time(millisecond) }, - ok = insert_rules([Rule1]), + ok = create_rules([Rule1]), ?assertMatch( [ID], emqx_rule_engine:get_rule_ids_by_action(#{function => <<"emqx_rule_actions:console">>}) @@ -2834,26 +2817,20 @@ republish_action(Topic, Payload, UserProperties) -> make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) -> SQL = <<"select * from \"simple/topic\"">>, - Topics = [<<"simple/topic">>], - make_simple_rule(RuleId, SQL, Topics, Ts). + make_simple_rule(RuleId, SQL, Ts). make_simple_rule(RuleId) when is_binary(RuleId) -> SQL = <<"select * from \"simple/topic\"">>, - Topics = [<<"simple/topic">>], - make_simple_rule(RuleId, SQL, Topics). + make_simple_rule(RuleId, SQL). -make_simple_rule(RuleId, SQL, Topics) when is_binary(RuleId) -> - make_simple_rule(RuleId, SQL, Topics, erlang:system_time(millisecond)). +make_simple_rule(RuleId, SQL) when is_binary(RuleId) -> + make_simple_rule(RuleId, SQL, erlang:system_time(millisecond)). -make_simple_rule(RuleId, SQL, Topics, Ts) when is_binary(RuleId) -> +make_simple_rule(RuleId, SQL, Ts) when is_binary(RuleId) -> #{ id => RuleId, sql => SQL, - from => Topics, - fields => [<<"*">>], - is_foreach => false, - conditions => {}, - actions => [#{mod => emqx_rule_actions, func => console, args => #{}}], + actions => [#{function => console, args => #{}}], description => <<"simple rule">>, created_at => Ts }. @@ -3233,13 +3210,12 @@ deps_path(App, RelativePath) -> local_path(RelativePath) -> deps_path(emqx_rule_engine, RelativePath). -insert_rules(Rules) -> - lists:foreach( - fun(Rule) -> - ok = emqx_rule_engine:insert_rule(Rule) - end, - Rules - ). +create_rules(Rules) -> + lists:foreach(fun create_rule/1, Rules). + +create_rule(Rule) -> + {ok, _} = emqx_rule_engine:create_rule(Rule), + ok. delete_rules_by_ids(Ids) -> lists:foreach(