Merge pull request #10900 from keynslug/fix/EEC-856/parsed-bridge-id

fix(ruleeng): parse bridge id on create / update
This commit is contained in:
Andrew Mayorov 2023-06-01 10:21:26 +03:00 committed by GitHub
commit b5f24c4f88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 109 additions and 110 deletions

View File

@ -42,7 +42,10 @@
list/0
]).
-export([send_message/2]).
-export([
send_message/2,
send_message/4
]).
-export([config_key_path/0]).
@ -199,14 +202,17 @@ send_to_matched_egress_bridges(Topic, Msg) ->
send_message(BridgeId, Message) ->
{BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
send_message(BridgeType, BridgeName, ResId, Message).
send_message(BridgeType, BridgeName, ResId, Message) ->
case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
not_found ->
{error, {bridge_not_found, BridgeId}};
{error, bridge_not_found};
#{enable := true} = Config ->
QueryOpts = query_opts(Config),
emqx_resource:query(ResId, {send_message, Message}, QueryOpts);
#{enable := false} ->
{error, {bridge_stopped, BridgeId}}
{error, bridge_stopped}
end.
query_opts(Config) ->

View File

@ -112,7 +112,10 @@ validate_name(Name0, Opts) ->
case lists:all(fun is_id_char/1, Name) of
true ->
case maps:get(atom_name, Opts, true) of
true -> list_to_existing_atom(Name);
% NOTE
% Rule may be created before bridge, thus not `list_to_existing_atom/1`,
% also it is infrequent user input anyway.
true -> list_to_atom(Name);
false -> Name0
end;
false ->

View File

@ -146,21 +146,25 @@ get_action_mod_func(ActionFunc) when is_binary(ActionFunc) ->
try binary_to_existing_atom(Bin) of
Atom -> Atom
catch
error:badarg -> error({unknown_action_function, ActionFunc})
error:badarg -> validation_error(unknown_action_function)
end
end,
case string:split(ActionFunc, ":", all) of
[Func1] -> {emqx_rule_actions, ToAtom(Func1)};
[Mod1, Func1] -> {ToAtom(Mod1), ToAtom(Func1)};
_ -> error({invalid_action_function, ActionFunc})
_ -> validation_error(invalid_action_function)
end.
assert_function_supported(Mod, Func) ->
case erlang:function_exported(Mod, Func, 3) of
true -> ok;
false -> error({action_function_not_supported, Func})
false -> validation_error(action_function_not_supported)
end.
-spec validation_error(any()) -> no_return().
validation_error(Reason) ->
throw(#{kind => validation_error, reason => Reason}).
pre_process_args(Mod, Func, Args) ->
case erlang:function_exported(Mod, pre_process_action_args, 2) of
true -> Mod:pre_process_action_args(Func, Args);

View File

@ -36,7 +36,6 @@
-export([
create_rule/1,
insert_rule/1,
update_rule/1,
delete_rule/1,
get_rule/1
@ -116,25 +115,30 @@ start_link() ->
post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) ->
#{added := Added, removed := Removed, changed := Updated} =
emqx_utils_maps:diff_maps(NewRules, OldRules),
maps_foreach(
fun({Id, {_Old, New}}) ->
{ok, _} = update_rule(New#{id => bin(Id)})
end,
Updated
),
maps_foreach(
fun({Id, _Rule}) ->
ok = delete_rule(bin(Id))
end,
Removed
),
maps_foreach(
fun({Id, Rule}) ->
{ok, _} = create_rule(Rule#{id => bin(Id)})
end,
Added
),
{ok, get_rules()}.
try
maps_foreach(
fun({Id, {_Old, New}}) ->
{ok, _} = update_rule(New#{id => bin(Id)})
end,
Updated
),
maps_foreach(
fun({Id, _Rule}) ->
ok = delete_rule(bin(Id))
end,
Removed
),
maps_foreach(
fun({Id, Rule}) ->
{ok, _} = create_rule(Rule#{id => bin(Id)})
end,
Added
),
{ok, get_rules()}
catch
throw:#{kind := _} = Error ->
{error, Error}
end.
%%------------------------------------------------------------------------------
%% APIs for rules
@ -154,7 +158,7 @@ load_rules() ->
-spec create_rule(map()) -> {ok, rule()} | {error, term()}.
create_rule(Params) ->
create_rule(Params, now_ms()).
create_rule(Params, maps:get(created_at, Params, now_ms())).
create_rule(Params = #{id := RuleId}, CreatedAt) when is_binary(RuleId) ->
case get_rule(RuleId) of
@ -451,8 +455,9 @@ parse_actions(Actions) ->
do_parse_action(Action) when is_map(Action) ->
emqx_rule_actions:parse_action(Action);
do_parse_action(BridgeChannelId) when is_binary(BridgeChannelId) ->
BridgeChannelId.
do_parse_action(BridgeId) when is_binary(BridgeId) ->
{Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId),
{bridge, Type, Name, emqx_bridge_resource:resource_id(Type, Name)}.
get_all_records(Tab) ->
[Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)].
@ -484,7 +489,8 @@ contains_actions(Actions, Mod0, Func0) ->
).
forwards_to_bridge(Actions, BridgeId) ->
lists:any(fun(A) -> A =:= BridgeId end, Actions).
Action = do_parse_action(BridgeId),
lists:any(fun(A) -> A =:= Action end, Actions).
references_ingress_bridge(Froms, BridgeId) ->
lists:member(
@ -506,4 +512,7 @@ get_referenced_hookpoints(Froms) ->
].
get_egress_bridges(Actions) ->
lists:filter(fun is_binary/1, Actions).
[
emqx_bridge_resource:bridge_id(BridgeType, BridgeName)
|| {bridge, BridgeType, BridgeName, _ResId} <- Actions
].

View File

@ -515,13 +515,13 @@ format_datetime(Timestamp, Unit) ->
format_action(Actions) ->
[do_format_action(Act) || Act <- Actions].
do_format_action({bridge, BridgeType, BridgeName, _ResId}) ->
emqx_bridge_resource:bridge_id(BridgeType, BridgeName);
do_format_action(#{mod := Mod, func := Func, args := Args}) ->
#{
function => printable_function_name(Mod, Func),
args => maps:remove(preprocessed_tmpl, Args)
};
do_format_action(BridgeChannelId) when is_binary(BridgeChannelId) ->
BridgeChannelId.
}.
printable_function_name(emqx_rule_actions, Func) ->
Func;

View File

@ -17,7 +17,6 @@
-module(emqx_rule_runtime).
-include("rule_engine.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_resource/include/emqx_resource_errors.hrl").
@ -50,8 +49,6 @@
iolist_to_binary(io_lib:format("_v_~ts_~p_~p", [TYPE, NAME, erlang:system_time()]))
).
-define(ActionMaxRetry, 3).
%%------------------------------------------------------------------------------
%% Apply rules
%%------------------------------------------------------------------------------
@ -348,10 +345,14 @@ handle_action(RuleId, ActId, Selected, Envs) ->
})
end.
do_handle_action(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
?TRACE("BRIDGE", "bridge_action", #{bridge_id => BridgeId}),
case emqx_bridge:send_message(BridgeId, Selected) of
{error, {Err, _}} when Err == bridge_not_found; Err == bridge_stopped ->
do_handle_action({bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) ->
?TRACE(
"BRIDGE",
"bridge_action",
#{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)}
),
case emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected) of
{error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped ->
throw(out_of_service);
Result ->
Result

View File

@ -250,14 +250,14 @@ t_kv_store(_) ->
t_add_get_remove_rule(_Config) ->
RuleId0 = <<"rule-debug-0">>,
ok = emqx_rule_engine:insert_rule(make_simple_rule(RuleId0)),
ok = create_rule(make_simple_rule(RuleId0)),
?assertMatch({ok, #{id := RuleId0}}, emqx_rule_engine:get_rule(RuleId0)),
ok = delete_rule(RuleId0),
?assertEqual(not_found, emqx_rule_engine:get_rule(RuleId0)),
RuleId1 = <<"rule-debug-1">>,
Rule1 = make_simple_rule(RuleId1),
ok = emqx_rule_engine:insert_rule(Rule1),
ok = create_rule(Rule1),
?assertMatch({ok, #{id := RuleId1}}, emqx_rule_engine:get_rule(RuleId1)),
ok = delete_rule(Rule1),
?assertEqual(not_found, emqx_rule_engine:get_rule(RuleId1)),
@ -265,7 +265,7 @@ t_add_get_remove_rule(_Config) ->
t_add_get_remove_rules(_Config) ->
delete_rules_by_ids([Id || #{id := Id} <- emqx_rule_engine:get_rules()]),
ok = insert_rules(
ok = create_rules(
[
make_simple_rule(<<"rule-debug-1">>),
make_simple_rule(<<"rule-debug-2">>)
@ -294,7 +294,7 @@ t_create_existing_rule(_Config) ->
t_get_rules_for_topic(_Config) ->
Len0 = length(emqx_rule_engine:get_rules_for_topic(<<"simple/topic">>)),
ok = insert_rules(
ok = create_rules(
[
make_simple_rule(<<"rule-debug-1">>),
make_simple_rule(<<"rule-debug-2">>)
@ -305,12 +305,12 @@ t_get_rules_for_topic(_Config) ->
ok.
t_get_rules_ordered_by_ts(_Config) ->
Now = fun() -> erlang:system_time(nanosecond) end,
ok = insert_rules(
Now = erlang:system_time(microsecond),
ok = create_rules(
[
make_simple_rule_with_ts(<<"rule-debug-0">>, Now()),
make_simple_rule_with_ts(<<"rule-debug-1">>, Now()),
make_simple_rule_with_ts(<<"rule-debug-2">>, Now())
make_simple_rule_with_ts(<<"rule-debug-0">>, Now + 1),
make_simple_rule_with_ts(<<"rule-debug-1">>, Now + 2),
make_simple_rule_with_ts(<<"rule-debug-2">>, Now + 3)
]
),
?assertMatch(
@ -324,23 +324,19 @@ t_get_rules_ordered_by_ts(_Config) ->
t_get_rules_for_topic_2(_Config) ->
Len0 = length(emqx_rule_engine:get_rules_for_topic(<<"simple/1">>)),
ok = insert_rules(
ok = create_rules(
[
make_simple_rule(<<"rule-debug-1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]),
make_simple_rule(<<"rule-debug-2">>, <<"select * from \"simple/+\"">>, [<<"simple/+">>]),
make_simple_rule(<<"rule-debug-3">>, <<"select * from \"simple/+/1\"">>, [
<<"simple/+/1">>
]),
make_simple_rule(<<"rule-debug-4">>, <<"select * from \"simple/1\"">>, [<<"simple/1">>]),
make_simple_rule(<<"rule-debug-1">>, _1 = <<"select * from \"simple/#\"">>),
make_simple_rule(<<"rule-debug-2">>, _2 = <<"select * from \"simple/+\"">>),
make_simple_rule(<<"rule-debug-3">>, <<"select * from \"simple/+/1\"">>),
make_simple_rule(<<"rule-debug-4">>, _3 = <<"select * from \"simple/1\"">>),
make_simple_rule(
<<"rule-debug-5">>,
<<"select * from \"simple/2,simple/+,simple/3\"">>,
[<<"simple/2">>, <<"simple/+">>, <<"simple/3">>]
_4 = <<"select * from \"simple/2\", \"simple/+\", \"simple/3\"">>
),
make_simple_rule(
<<"rule-debug-6">>,
<<"select * from \"simple/2,simple/3,simple/4\"">>,
[<<"simple/2">>, <<"simple/3">>, <<"simple/4">>]
<<"select * from \"simple/2\", \"simple/3\", \"simple/4\"">>
)
]
),
@ -367,52 +363,44 @@ t_get_rules_with_same_event(_Config) ->
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_delivered">>)),
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_acked">>)),
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_dropped">>)),
ok = insert_rules(
ok = create_rules(
[
make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]),
make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>, [<<"abc/+">>]),
make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>),
make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>),
make_simple_rule(
<<"r3">>,
<<"select * from \"$events/client_connected\"">>,
[<<"$events/client_connected">>]
<<"select * from \"$events/client_connected\"">>
),
make_simple_rule(
<<"r4">>,
<<"select * from \"$events/client_disconnected\"">>,
[<<"$events/client_disconnected">>]
<<"select * from \"$events/client_disconnected\"">>
),
make_simple_rule(
<<"r5">>,
<<"select * from \"$events/session_subscribed\"">>,
[<<"$events/session_subscribed">>]
<<"select * from \"$events/session_subscribed\"">>
),
make_simple_rule(
<<"r6">>,
<<"select * from \"$events/session_unsubscribed\"">>,
[<<"$events/session_unsubscribed">>]
<<"select * from \"$events/session_unsubscribed\"">>
),
make_simple_rule(
<<"r7">>,
<<"select * from \"$events/message_delivered\"">>,
[<<"$events/message_delivered">>]
<<"select * from \"$events/message_delivered\"">>
),
make_simple_rule(
<<"r8">>,
<<"select * from \"$events/message_acked\"">>,
[<<"$events/message_acked">>]
<<"select * from \"$events/message_acked\"">>
),
make_simple_rule(
<<"r9">>,
<<"select * from \"$events/message_dropped\"">>,
[<<"$events/message_dropped">>]
<<"select * from \"$events/message_dropped\"">>
),
make_simple_rule(
<<"r10">>,
<<
"select * from \"t/1, "
"$events/session_subscribed, $events/client_connected\""
>>,
[<<"t/1">>, <<"$events/session_subscribed">>, <<"$events/client_connected">>]
"select * from \"t/1\", "
"\"$events/session_subscribed\", \"$events/client_connected\""
>>
)
]
),
@ -455,23 +443,18 @@ t_get_rules_with_same_event(_Config) ->
t_get_rule_ids_by_action(_) ->
ID = <<"t_get_rule_ids_by_action">>,
Rule1 = #{
enable => false,
id => ID,
sql => <<"SELECT * FROM \"t\"">>,
from => [<<"t">>],
fields => [<<"*">>],
is_foreach => false,
conditions => {},
actions => [
#{mod => emqx_rule_actions, func => console, args => #{}},
#{mod => emqx_rule_actions, func => republish, args => #{}},
#{function => console, args => #{}},
#{function => republish, args => #{}},
<<"mqtt:my_mqtt_bridge">>,
<<"mysql:foo">>
],
description => ID,
created_at => erlang:system_time(millisecond)
},
ok = insert_rules([Rule1]),
ok = create_rules([Rule1]),
?assertMatch(
[ID],
emqx_rule_engine:get_rule_ids_by_action(#{function => <<"emqx_rule_actions:console">>})
@ -2834,26 +2817,20 @@ republish_action(Topic, Payload, UserProperties) ->
make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->
SQL = <<"select * from \"simple/topic\"">>,
Topics = [<<"simple/topic">>],
make_simple_rule(RuleId, SQL, Topics, Ts).
make_simple_rule(RuleId, SQL, Ts).
make_simple_rule(RuleId) when is_binary(RuleId) ->
SQL = <<"select * from \"simple/topic\"">>,
Topics = [<<"simple/topic">>],
make_simple_rule(RuleId, SQL, Topics).
make_simple_rule(RuleId, SQL).
make_simple_rule(RuleId, SQL, Topics) when is_binary(RuleId) ->
make_simple_rule(RuleId, SQL, Topics, erlang:system_time(millisecond)).
make_simple_rule(RuleId, SQL) when is_binary(RuleId) ->
make_simple_rule(RuleId, SQL, erlang:system_time(millisecond)).
make_simple_rule(RuleId, SQL, Topics, Ts) when is_binary(RuleId) ->
make_simple_rule(RuleId, SQL, Ts) when is_binary(RuleId) ->
#{
id => RuleId,
sql => SQL,
from => Topics,
fields => [<<"*">>],
is_foreach => false,
conditions => {},
actions => [#{mod => emqx_rule_actions, func => console, args => #{}}],
actions => [#{function => console, args => #{}}],
description => <<"simple rule">>,
created_at => Ts
}.
@ -3233,13 +3210,12 @@ deps_path(App, RelativePath) ->
local_path(RelativePath) ->
deps_path(emqx_rule_engine, RelativePath).
insert_rules(Rules) ->
lists:foreach(
fun(Rule) ->
ok = emqx_rule_engine:insert_rule(Rule)
end,
Rules
).
create_rules(Rules) ->
lists:foreach(fun create_rule/1, Rules).
create_rule(Rule) ->
{ok, _} = emqx_rule_engine:create_rule(Rule),
ok.
delete_rules_by_ids(Ids) ->
lists:foreach(