refactor(rule): move emqx_rule_utils.erl -> emqx_plugin_libs_rule.erl

This commit is contained in:
Shawn 2021-09-07 19:56:33 +08:00
parent 39bb1b8d9d
commit 1dae970cd3
13 changed files with 183 additions and 183 deletions

View File

@ -233,8 +233,8 @@ feedvar(Override, Packet, ConnInfo, ClientInfo) ->
, 'Packet' => connect_packet_to_map(Packet)
},
maps:map(fun(_K, V) ->
Tokens = emqx_rule_utils:preproc_tmpl(V),
emqx_rule_utils:proc_tmpl(Tokens, Envs)
Tokens = emqx_plugin_libs_rule:preproc_tmpl(V),
emqx_plugin_libs_rule:proc_tmpl(Tokens, Envs)
end, Override).
connect_packet_to_map(#mqtt_sn_message{}) ->

View File

@ -231,8 +231,8 @@ feedvar(Override, Packet, ConnInfo, ClientInfo) ->
, 'Packet' => connect_packet_to_map(Packet)
},
maps:map(fun(_K, V) ->
Tokens = emqx_rule_utils:preproc_tmpl(V),
emqx_rule_utils:proc_tmpl(Tokens, Envs)
Tokens = emqx_plugin_libs_rule:preproc_tmpl(V),
emqx_plugin_libs_rule:proc_tmpl(Tokens, Envs)
end, Override).
connect_packet_to_map(#stomp_frame{headers = Headers}) ->

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_utils).
-module(emqx_plugin_libs_rule).
%% preprocess and process tempalte string with place holders
-export([ preproc_tmpl/1

View File

@ -0,0 +1,136 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_utils_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-define(PORT, 9876).
all() -> emqx_ct:all(?MODULE).
t_http_connectivity(_) ->
{ok, Socket} = gen_tcp:listen(?PORT, []),
ok = emqx_plugin_libs_rule:http_connectivity("http://127.0.0.1:"++emqx_plugin_libs_rule:str(?PORT), 1000),
gen_tcp:close(Socket),
{error, _} = emqx_plugin_libs_rule:http_connectivity("http://127.0.0.1:"++emqx_plugin_libs_rule:str(?PORT), 1000).
t_tcp_connectivity(_) ->
{ok, Socket} = gen_tcp:listen(?PORT, []),
ok = emqx_plugin_libs_rule:tcp_connectivity("127.0.0.1", ?PORT, 1000),
gen_tcp:close(Socket),
{error, _} = emqx_plugin_libs_rule:tcp_connectivity("127.0.0.1", ?PORT, 1000).
t_str(_) ->
?assertEqual("abc", emqx_plugin_libs_rule:str("abc")),
?assertEqual("abc", emqx_plugin_libs_rule:str(abc)),
?assertEqual("{\"a\":1}", emqx_plugin_libs_rule:str(#{a => 1})),
?assertEqual("1", emqx_plugin_libs_rule:str(1)),
?assertEqual("2.0", emqx_plugin_libs_rule:str(2.0)),
?assertEqual("true", emqx_plugin_libs_rule:str(true)),
?assertError(_, emqx_plugin_libs_rule:str({a, v})).
t_bin(_) ->
?assertEqual(<<"abc">>, emqx_plugin_libs_rule:bin("abc")),
?assertEqual(<<"abc">>, emqx_plugin_libs_rule:bin(abc)),
?assertEqual(<<"{\"a\":1}">>, emqx_plugin_libs_rule:bin(#{a => 1})),
?assertEqual(<<"[{\"a\":1}]">>, emqx_plugin_libs_rule:bin([#{a => 1}])),
?assertEqual(<<"1">>, emqx_plugin_libs_rule:bin(1)),
?assertEqual(<<"2.0">>, emqx_plugin_libs_rule:bin(2.0)),
?assertEqual(<<"true">>, emqx_plugin_libs_rule:bin(true)),
?assertError(_, emqx_plugin_libs_rule:bin({a, v})).
t_atom_key(_) ->
_ = erlang, _ = port,
?assertEqual([erlang], emqx_plugin_libs_rule:atom_key([<<"erlang">>])),
?assertEqual([erlang, port], emqx_plugin_libs_rule:atom_key([<<"erlang">>, port])),
?assertEqual([erlang, port], emqx_plugin_libs_rule:atom_key([<<"erlang">>, <<"port">>])),
?assertEqual(erlang, emqx_plugin_libs_rule:atom_key(<<"erlang">>)),
?assertError({invalid_key, {a, v}}, emqx_plugin_libs_rule:atom_key({a, v})),
_ = xyz876gv123,
?assertEqual([xyz876gv123, port], emqx_plugin_libs_rule:atom_key([<<"xyz876gv123">>, port])).
t_unsafe_atom_key(_) ->
?assertEqual([xyz876gv], emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv">>])),
?assertEqual([xyz876gv33, port], emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv33">>, port])),
?assertEqual([xyz876gv331, port1221], emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv331">>, <<"port1221">>])),
?assertEqual(xyz876gv3312, emqx_plugin_libs_rule:unsafe_atom_key(<<"xyz876gv3312">>)).
t_proc_tmpl(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
Tks = emqx_plugin_libs_rule:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
?assertEqual(<<"a:1,b:1,c:1.0,d:{\"d1\":\"hi\"}">>,
emqx_plugin_libs_rule:proc_tmpl(Tks, Selected)).
t_proc_tmpl1(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
Tks = emqx_plugin_libs_rule:preproc_tmpl(<<"a:$a,b:b},c:{c},d:${d">>),
?assertEqual(<<"a:$a,b:b},c:{c},d:${d">>,
emqx_plugin_libs_rule:proc_tmpl(Tks, Selected)).
t_proc_cmd(_) ->
Selected = #{v0 => <<"x">>, v1 => <<"1">>, v2 => #{d1 => <<"hi">>}},
Tks = emqx_plugin_libs_rule:preproc_cmd(<<"hset name a:${v0} ${v1} b ${v2} ">>),
?assertEqual([<<"hset">>, <<"name">>,
<<"a:x">>, <<"1">>,
<<"b">>, <<"{\"d1\":\"hi\"}">>],
emqx_plugin_libs_rule:proc_cmd(Tks, Selected)).
t_preproc_sql(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
{PrepareStatement, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '?'),
?assertEqual(<<"a:?,b:?,c:?,d:?">>, PrepareStatement),
?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>],
emqx_plugin_libs_rule:proc_sql(ParamsTokens, Selected)).
t_preproc_sql1(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
{PrepareStatement, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '$n'),
?assertEqual(<<"a:$1,b:$2,c:$3,d:$4">>, PrepareStatement),
?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>],
emqx_plugin_libs_rule:proc_sql(ParamsTokens, Selected)).
t_preproc_sql2(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
{PrepareStatement, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(<<"a:$a,b:b},c:{c},d:${d">>, '?'),
?assertEqual(<<"a:$a,b:b},c:{c},d:${d">>, PrepareStatement),
?assertEqual([], emqx_plugin_libs_rule:proc_sql(ParamsTokens, Selected)).
t_preproc_sql3(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
ParamsTokens = emqx_plugin_libs_rule:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
?assertEqual(<<"a:'1',b:1,c:1.0,d:'{\"d1\":\"hi\"}'">>,
emqx_plugin_libs_rule:proc_sql_param_str(ParamsTokens, Selected)).
t_preproc_sql4(_) ->
%% with apostrophes
%% https://github.com/emqx/emqx/issues/4135
Selected = #{a => <<"1''2">>, b => 1, c => 1.0,
d => #{d1 => <<"someone's phone">>}},
ParamsTokens = emqx_plugin_libs_rule:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
?assertEqual(<<"a:'1\\'\\'2',b:1,c:1.0,d:'{\"d1\":\"someone\\'s phone\"}'">>,
emqx_plugin_libs_rule:proc_sql_param_str(ParamsTokens, Selected)).
t_preproc_sql5(_) ->
%% with apostrophes for cassandra
%% https://github.com/emqx/emqx/issues/4148
Selected = #{a => <<"1''2">>, b => 1, c => 1.0,
d => #{d1 => <<"someone's phone">>}},
ParamsTokens = emqx_plugin_libs_rule:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>,
emqx_plugin_libs_rule:proc_cql_param_str(ParamsTokens, Selected)).

View File

@ -22,7 +22,7 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_rule_engine/include/rule_actions.hrl").
-import(emqx_rule_utils, [str/1]).
-import(emqx_plugin_libs_rule, [str/1]).
-export([ on_resource_create/2
, on_get_resource_status/2
@ -443,10 +443,10 @@ on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName,
<<"forward_topic">> := ForwardTopic,
<<"payload_tmpl">> := PayloadTmpl}) ->
?LOG(info, "Initiating Action ~p.", [?FUNCTION_NAME]),
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
PayloadTks = emqx_plugin_libs_rule:preproc_tmpl(PayloadTmpl),
TopicTks = case ForwardTopic == <<"">> of
true -> undefined;
false -> emqx_rule_utils:preproc_tmpl(ForwardTopic)
false -> emqx_plugin_libs_rule:preproc_tmpl(ForwardTopic)
end,
Opts.
@ -461,7 +461,7 @@ on_action_data_to_mqtt_broker(Msg, _Env =
}}) ->
Topic1 = case TopicTks =:= undefined of
true -> Topic;
false -> emqx_rule_utils:proc_tmpl(TopicTks, Msg)
false -> emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg)
end,
BrokerMsg = #message{id = Id,
qos = QoS,
@ -480,7 +480,7 @@ format_data([], Msg) ->
emqx_json:encode(Msg);
format_data(Tokens, Msg) ->
emqx_rule_utils:proc_tmpl(Tokens, Msg).
emqx_plugin_libs_rule:proc_tmpl(Tokens, Msg).
subscriptions(Subscriptions) ->
scan_binary(<<"[", Subscriptions/binary, "].">>).

View File

@ -237,8 +237,8 @@ on_action_create_data_to_webserver(Id, Params) ->
body := Body,
pool := Pool,
request_timeout := RequestTimeout} = parse_action_params(Params),
BodyTokens = emqx_rule_utils:preproc_tmpl(Body),
PathTokens = emqx_rule_utils:preproc_tmpl(Path),
BodyTokens = emqx_plugin_libs_rule:preproc_tmpl(Body),
PathTokens = emqx_plugin_libs_rule:preproc_tmpl(Path),
Params.
on_action_data_to_webserver(Selected, _Envs =
@ -252,7 +252,7 @@ on_action_data_to_webserver(Selected, _Envs =
'RequestTimeout' := RequestTimeout},
clientid := ClientID}) ->
NBody = format_msg(BodyTokens, Selected),
NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected),
NPath = emqx_plugin_libs_rule:proc_tmpl(PathTokens, Selected),
Req = create_req(Method, NPath, Headers, NBody),
case ehttpc:request(ehttpc_pool:pick_worker(Pool, ClientID), Method, Req, RequestTimeout) of
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
@ -273,7 +273,7 @@ on_action_data_to_webserver(Selected, _Envs =
format_msg([], Data) ->
emqx_json:encode(Data);
format_msg(Tokens, Data) ->
emqx_rule_utils:proc_tmpl(Tokens, Data).
emqx_plugin_libs_rule:proc_tmpl(Tokens, Data).
%%------------------------------------------------------------------------------
%% Internal functions
@ -366,7 +366,7 @@ get_ssl_opts(Opts, ResId) ->
test_http_connect(Conf) ->
Url = fun() -> maps:get(<<"url">>, Conf) end,
try
emqx_rule_utils:http_connectivity(Url())
emqx_plugin_libs_rule:http_connectivity(Url())
of
ok -> true;
{error, _Reason} ->

View File

@ -135,8 +135,8 @@ on_action_create_republish(Id, Params = #{
<<"target_qos">> := TargetQoS,
<<"payload_tmpl">> := PayloadTmpl
}) ->
TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic),
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
TopicTks = emqx_plugin_libs_rule:preproc_tmpl(TargetTopic),
PayloadTks = emqx_plugin_libs_rule:preproc_tmpl(PayloadTmpl),
Params.
-spec on_action_republish(selected_data(), env_vars()) -> any().
@ -167,8 +167,8 @@ on_action_republish(Selected, _Envs = #{
from = ActId,
flags = Flags,
headers = #{republish_by => ActId},
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
payload = emqx_rule_utils:proc_tmpl(PayloadTks, Selected),
topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
timestamp = Timestamp
});
@ -190,8 +190,8 @@ on_action_republish(Selected, _Envs = #{
from = ActId,
flags = #{dup => false, retain => false},
headers = #{republish_by => ActId},
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
payload = emqx_rule_utils:proc_tmpl(PayloadTks, Selected),
topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
timestamp = erlang:system_time(millisecond)
}).

View File

@ -216,7 +216,7 @@ delete_rule(RuleId) ->
case emqx_rule_registry:get_rule(RuleId) of
{ok, Rule = #rule{actions = Actions}} ->
try
_ = emqx_rule_utils:cluster_call(?MODULE, clear_rule, [Rule]),
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, clear_rule, [Rule]),
ok = emqx_rule_registry:remove_rule(Rule)
catch
Error:Reason:ST ->
@ -242,7 +242,7 @@ create_resource(#{type := Type, config := Config0} = Params) ->
ok = emqx_rule_registry:add_resource(Resource),
%% Note that we will return OK in case of resource creation failure,
%% A timer is started to re-start the resource later.
catch _ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [M, F, ResId, Config]),
catch _ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_resource, [M, F, ResId, Config]),
{ok, Resource};
not_found ->
{error, {resource_type_not_found, Type}}
@ -280,7 +280,7 @@ do_check_and_update_resource(#{id := Id, type := Type, description := NewDescrip
Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec),
case test_resource(#{type => Type, config => NewConfig}) of
ok ->
_ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [Module, Create, Id, Config]),
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_resource, [Module, Create, Id, Config]),
emqx_rule_registry:add_resource(#resource{
id = Id,
type = Type,
@ -319,8 +319,8 @@ test_resource(#{type := Type, config := Config0}) ->
Config = emqx_rule_validator:validate_params(Config0, ParamSpec),
ResId = resource_id(),
try
_ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [ModC, Create, ResId, Config]),
_ = emqx_rule_utils:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]),
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_resource, [ModC, Create, ResId, Config]),
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]),
ok
catch
throw:Reason -> {error, Reason}
@ -359,7 +359,7 @@ delete_resource(ResId) ->
try
case emqx_rule_registry:remove_resource(ResId) of
ok ->
_ = emqx_rule_utils:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]),
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]),
ok;
{error, _} = R -> R
end
@ -426,7 +426,7 @@ prepare_action(#{name := Name, args := Args0} = Action, NeedInit) ->
ActionInstId = maps:get(id, Action, action_instance_id(Name)),
case NeedInit of
true ->
_ = emqx_rule_utils:cluster_call(?MODULE, init_action, [Mod, Create, ActionInstId,
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_action, [Mod, Create, ActionInstId,
with_resource_params(Args)]),
ok;
false -> ok
@ -485,7 +485,7 @@ may_update_rule_params(Rule, Params = #{on_action_failed := OnFailed}) ->
may_update_rule_params(Rule = #rule{actions = OldActions}, Params = #{actions := Actions}) ->
%% prepare new actions before removing old ones
NewActions = prepare_actions(Actions, maps:get(enabled, Params, true)),
_ = emqx_rule_utils:cluster_call(?MODULE, clear_actions, [OldActions]),
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, clear_actions, [OldActions]),
may_update_rule_params(Rule#rule{actions = NewActions}, maps:remove(actions, Params));
may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params
Rule.
@ -631,7 +631,7 @@ refresh_actions(Actions, Pred) ->
true ->
{ok, #action{module = Mod, on_create = Create}}
= emqx_rule_registry:find_action(ActName),
_ = emqx_rule_utils:cluster_call(?MODULE, init_action, [Mod, Create, Id, with_resource_params(Args)]),
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_action, [Mod, Create, Id, with_resource_params(Args)]),
refresh_actions(Fallbacks, Pred);
false -> ok
end

View File

@ -503,26 +503,26 @@ do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"signed">>, <<"little">>) ->
%%------------------------------------------------------------------------------
str(Data) ->
emqx_rule_utils:bin(Data).
emqx_plugin_libs_rule:bin(Data).
str_utf8(Data) ->
emqx_rule_utils:utf8_bin(Data).
emqx_plugin_libs_rule:utf8_bin(Data).
bool(Data) ->
emqx_rule_utils:bool(Data).
emqx_plugin_libs_rule:bool(Data).
int(Data) ->
emqx_rule_utils:int(Data).
emqx_plugin_libs_rule:int(Data).
float(Data) ->
emqx_rule_utils:float(Data).
emqx_plugin_libs_rule:float(Data).
float(Data, Decimals) when Decimals > 0 ->
Data1 = ?MODULE:float(Data),
list_to_float(float_to_list(Data1, [{decimals, Decimals}])).
map(Data) ->
emqx_rule_utils:map(Data).
emqx_plugin_libs_rule:map(Data).
bin2hexstr(Bin) when is_binary(Bin) ->
emqx_misc:bin2hexstr_A_F(Bin).
@ -730,7 +730,7 @@ mget(Key, Map, Default) ->
{ok, Val} -> Val;
error when is_atom(Key) ->
%% the map may have an equivalent binary-form key
BinKey = emqx_rule_utils:bin(Key),
BinKey = emqx_plugin_libs_rule:bin(Key),
case maps:find(BinKey, Map) of
{ok, Val} -> Val;
error -> Default
@ -754,7 +754,7 @@ mput(Key, Val, Map) ->
{ok, _} -> maps:put(Key, Val, Map);
error when is_atom(Key) ->
%% the map may have an equivalent binary-form key
BinKey = emqx_rule_utils:bin(Key),
BinKey = emqx_plugin_libs_rule:bin(Key),
case maps:find(BinKey, Map) of
{ok, _} -> maps:put(BinKey, Val, Map);
error -> maps:put(Key, Val, Map)
@ -853,7 +853,7 @@ unix_ts_to_rfc3339(Epoch) ->
unix_ts_to_rfc3339(Epoch, <<"second">>).
unix_ts_to_rfc3339(Epoch, Unit) when is_integer(Epoch) ->
emqx_rule_utils:bin(
emqx_plugin_libs_rule:bin(
calendar:system_time_to_rfc3339(
Epoch, [{unit, time_unit(Unit)}])).

View File

@ -85,7 +85,7 @@ general_find({key, Key}, Map, _OrgData, Handler) when is_map(Map) ->
{ok, Val} -> Handler({found, {{key, Key}, Val}});
error when is_atom(Key) ->
%% the map may have an equivalent binary-form key
BinKey = emqx_rule_utils:bin(Key),
BinKey = emqx_plugin_libs_rule:bin(Key),
case maps:find(BinKey, Map) of
{ok, Val} -> Handler({equivalent, {{key, BinKey}, Val}});
error -> Handler(not_found)

View File

@ -185,7 +185,7 @@ get_rules_ordered_by_ts() ->
-spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())).
get_rules_for(Topic) ->
[Rule || Rule = #rule{for = For} <- get_rules(),
emqx_rule_utils:can_topic_match_oneof(Topic, For)].
emqx_plugin_libs_rule:can_topic_match_oneof(Topic, For)].
-spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())).
get_rules_with_same_event(Topic) ->
@ -221,7 +221,7 @@ remove_rules(Rules) ->
%% @private
insert_rule(Rule) ->
_ = emqx_rule_utils:cluster_call(?MODULE, load_hooks_for_rule, [Rule]),
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, load_hooks_for_rule, [Rule]),
mnesia:write(?RULE_TAB, Rule, write).
%% @private
@ -231,7 +231,7 @@ delete_rule(RuleId) when is_binary(RuleId) ->
not_found -> ok
end;
delete_rule(Rule) ->
_ = emqx_rule_utils:cluster_call(?MODULE, unload_hooks_for_rule, [Rule]),
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, unload_hooks_for_rule, [Rule]),
mnesia:delete_object(?RULE_TAB, Rule, write).
load_hooks_for_rule(#rule{for = Topics}) ->

View File

@ -38,7 +38,7 @@ test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) ->
case lists:all(fun is_publish_topic/1, EventTopics) of
true ->
%% test if the topic matches the topic filters in the rule
case emqx_rule_utils:can_topic_match_oneof(InTopic, EventTopics) of
case emqx_plugin_libs_rule:can_topic_match_oneof(InTopic, EventTopics) of
true -> test_rule(Sql, Select, Context, EventTopics);
false -> {error, nomatch}
end;
@ -112,7 +112,7 @@ envs_examp(_) ->
topic => <<"t/a">>,
qos => 1,
flags => #{sys => true, event => true},
publish_received_at => emqx_rule_utils:now_ms(),
timestamp => emqx_rule_utils:now_ms(),
publish_received_at => emqx_plugin_libs_rule:now_ms(),
timestamp => emqx_plugin_libs_rule:now_ms(),
node => node()
}.

View File

@ -1,136 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_utils_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-define(PORT, 9876).
all() -> emqx_ct:all(?MODULE).
t_http_connectivity(_) ->
{ok, Socket} = gen_tcp:listen(?PORT, []),
ok = emqx_rule_utils:http_connectivity("http://127.0.0.1:"++emqx_rule_utils:str(?PORT), 1000),
gen_tcp:close(Socket),
{error, _} = emqx_rule_utils:http_connectivity("http://127.0.0.1:"++emqx_rule_utils:str(?PORT), 1000).
t_tcp_connectivity(_) ->
{ok, Socket} = gen_tcp:listen(?PORT, []),
ok = emqx_rule_utils:tcp_connectivity("127.0.0.1", ?PORT, 1000),
gen_tcp:close(Socket),
{error, _} = emqx_rule_utils:tcp_connectivity("127.0.0.1", ?PORT, 1000).
t_str(_) ->
?assertEqual("abc", emqx_rule_utils:str("abc")),
?assertEqual("abc", emqx_rule_utils:str(abc)),
?assertEqual("{\"a\":1}", emqx_rule_utils:str(#{a => 1})),
?assertEqual("1", emqx_rule_utils:str(1)),
?assertEqual("2.0", emqx_rule_utils:str(2.0)),
?assertEqual("true", emqx_rule_utils:str(true)),
?assertError(_, emqx_rule_utils:str({a, v})).
t_bin(_) ->
?assertEqual(<<"abc">>, emqx_rule_utils:bin("abc")),
?assertEqual(<<"abc">>, emqx_rule_utils:bin(abc)),
?assertEqual(<<"{\"a\":1}">>, emqx_rule_utils:bin(#{a => 1})),
?assertEqual(<<"[{\"a\":1}]">>, emqx_rule_utils:bin([#{a => 1}])),
?assertEqual(<<"1">>, emqx_rule_utils:bin(1)),
?assertEqual(<<"2.0">>, emqx_rule_utils:bin(2.0)),
?assertEqual(<<"true">>, emqx_rule_utils:bin(true)),
?assertError(_, emqx_rule_utils:bin({a, v})).
t_atom_key(_) ->
_ = erlang, _ = port,
?assertEqual([erlang], emqx_rule_utils:atom_key([<<"erlang">>])),
?assertEqual([erlang, port], emqx_rule_utils:atom_key([<<"erlang">>, port])),
?assertEqual([erlang, port], emqx_rule_utils:atom_key([<<"erlang">>, <<"port">>])),
?assertEqual(erlang, emqx_rule_utils:atom_key(<<"erlang">>)),
?assertError({invalid_key, {a, v}}, emqx_rule_utils:atom_key({a, v})),
_ = xyz876gv123,
?assertEqual([xyz876gv123, port], emqx_rule_utils:atom_key([<<"xyz876gv123">>, port])).
t_unsafe_atom_key(_) ->
?assertEqual([xyz876gv], emqx_rule_utils:unsafe_atom_key([<<"xyz876gv">>])),
?assertEqual([xyz876gv33, port], emqx_rule_utils:unsafe_atom_key([<<"xyz876gv33">>, port])),
?assertEqual([xyz876gv331, port1221], emqx_rule_utils:unsafe_atom_key([<<"xyz876gv331">>, <<"port1221">>])),
?assertEqual(xyz876gv3312, emqx_rule_utils:unsafe_atom_key(<<"xyz876gv3312">>)).
t_proc_tmpl(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
Tks = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
?assertEqual(<<"a:1,b:1,c:1.0,d:{\"d1\":\"hi\"}">>,
emqx_rule_utils:proc_tmpl(Tks, Selected)).
t_proc_tmpl1(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
Tks = emqx_rule_utils:preproc_tmpl(<<"a:$a,b:b},c:{c},d:${d">>),
?assertEqual(<<"a:$a,b:b},c:{c},d:${d">>,
emqx_rule_utils:proc_tmpl(Tks, Selected)).
t_proc_cmd(_) ->
Selected = #{v0 => <<"x">>, v1 => <<"1">>, v2 => #{d1 => <<"hi">>}},
Tks = emqx_rule_utils:preproc_cmd(<<"hset name a:${v0} ${v1} b ${v2} ">>),
?assertEqual([<<"hset">>, <<"name">>,
<<"a:x">>, <<"1">>,
<<"b">>, <<"{\"d1\":\"hi\"}">>],
emqx_rule_utils:proc_cmd(Tks, Selected)).
t_preproc_sql(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
{PrepareStatement, ParamsTokens} = emqx_rule_utils:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '?'),
?assertEqual(<<"a:?,b:?,c:?,d:?">>, PrepareStatement),
?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>],
emqx_rule_utils:proc_sql(ParamsTokens, Selected)).
t_preproc_sql1(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
{PrepareStatement, ParamsTokens} = emqx_rule_utils:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '$n'),
?assertEqual(<<"a:$1,b:$2,c:$3,d:$4">>, PrepareStatement),
?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>],
emqx_rule_utils:proc_sql(ParamsTokens, Selected)).
t_preproc_sql2(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
{PrepareStatement, ParamsTokens} = emqx_rule_utils:preproc_sql(<<"a:$a,b:b},c:{c},d:${d">>, '?'),
?assertEqual(<<"a:$a,b:b},c:{c},d:${d">>, PrepareStatement),
?assertEqual([], emqx_rule_utils:proc_sql(ParamsTokens, Selected)).
t_preproc_sql3(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
ParamsTokens = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
?assertEqual(<<"a:'1',b:1,c:1.0,d:'{\"d1\":\"hi\"}'">>,
emqx_rule_utils:proc_sql_param_str(ParamsTokens, Selected)).
t_preproc_sql4(_) ->
%% with apostrophes
%% https://github.com/emqx/emqx/issues/4135
Selected = #{a => <<"1''2">>, b => 1, c => 1.0,
d => #{d1 => <<"someone's phone">>}},
ParamsTokens = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
?assertEqual(<<"a:'1\\'\\'2',b:1,c:1.0,d:'{\"d1\":\"someone\\'s phone\"}'">>,
emqx_rule_utils:proc_sql_param_str(ParamsTokens, Selected)).
t_preproc_sql5(_) ->
%% with apostrophes for cassandra
%% https://github.com/emqx/emqx/issues/4148
Selected = #{a => <<"1''2">>, b => 1, c => 1.0,
d => #{d1 => <<"someone's phone">>}},
ParamsTokens = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>,
emqx_rule_utils:proc_cql_param_str(ParamsTokens, Selected)).