diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index d9576c253..34e6ec8d6 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -233,8 +233,8 @@ feedvar(Override, Packet, ConnInfo, ClientInfo) -> , 'Packet' => connect_packet_to_map(Packet) }, maps:map(fun(_K, V) -> - Tokens = emqx_rule_utils:preproc_tmpl(V), - emqx_rule_utils:proc_tmpl(Tokens, Envs) + Tokens = emqx_plugin_libs_rule:preproc_tmpl(V), + emqx_plugin_libs_rule:proc_tmpl(Tokens, Envs) end, Override). connect_packet_to_map(#mqtt_sn_message{}) -> diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index 673535ebd..e55e0a580 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -231,8 +231,8 @@ feedvar(Override, Packet, ConnInfo, ClientInfo) -> , 'Packet' => connect_packet_to_map(Packet) }, maps:map(fun(_K, V) -> - Tokens = emqx_rule_utils:preproc_tmpl(V), - emqx_rule_utils:proc_tmpl(Tokens, Envs) + Tokens = emqx_plugin_libs_rule:preproc_tmpl(V), + emqx_plugin_libs_rule:proc_tmpl(Tokens, Envs) end, Override). connect_packet_to_map(#stomp_frame{headers = Headers}) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl similarity index 99% rename from apps/emqx_rule_engine/src/emqx_rule_utils.erl rename to apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl index 2d978ee0d..b3393de04 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_utils.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_rule_utils). +-module(emqx_plugin_libs_rule). %% preprocess and process tempalte string with place holders -export([ preproc_tmpl/1 diff --git a/apps/emqx_plugin_libs/test/emqx_rule_libs_rule_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_rule_libs_rule_SUITE.erl new file mode 100644 index 000000000..e4c358695 --- /dev/null +++ b/apps/emqx_plugin_libs/test/emqx_rule_libs_rule_SUITE.erl @@ -0,0 +1,136 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_utils_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-define(PORT, 9876). + +all() -> emqx_ct:all(?MODULE). + +t_http_connectivity(_) -> + {ok, Socket} = gen_tcp:listen(?PORT, []), + ok = emqx_plugin_libs_rule:http_connectivity("http://127.0.0.1:"++emqx_plugin_libs_rule:str(?PORT), 1000), + gen_tcp:close(Socket), + {error, _} = emqx_plugin_libs_rule:http_connectivity("http://127.0.0.1:"++emqx_plugin_libs_rule:str(?PORT), 1000). + +t_tcp_connectivity(_) -> + {ok, Socket} = gen_tcp:listen(?PORT, []), + ok = emqx_plugin_libs_rule:tcp_connectivity("127.0.0.1", ?PORT, 1000), + gen_tcp:close(Socket), + {error, _} = emqx_plugin_libs_rule:tcp_connectivity("127.0.0.1", ?PORT, 1000). + +t_str(_) -> + ?assertEqual("abc", emqx_plugin_libs_rule:str("abc")), + ?assertEqual("abc", emqx_plugin_libs_rule:str(abc)), + ?assertEqual("{\"a\":1}", emqx_plugin_libs_rule:str(#{a => 1})), + ?assertEqual("1", emqx_plugin_libs_rule:str(1)), + ?assertEqual("2.0", emqx_plugin_libs_rule:str(2.0)), + ?assertEqual("true", emqx_plugin_libs_rule:str(true)), + ?assertError(_, emqx_plugin_libs_rule:str({a, v})). + +t_bin(_) -> + ?assertEqual(<<"abc">>, emqx_plugin_libs_rule:bin("abc")), + ?assertEqual(<<"abc">>, emqx_plugin_libs_rule:bin(abc)), + ?assertEqual(<<"{\"a\":1}">>, emqx_plugin_libs_rule:bin(#{a => 1})), + ?assertEqual(<<"[{\"a\":1}]">>, emqx_plugin_libs_rule:bin([#{a => 1}])), + ?assertEqual(<<"1">>, emqx_plugin_libs_rule:bin(1)), + ?assertEqual(<<"2.0">>, emqx_plugin_libs_rule:bin(2.0)), + ?assertEqual(<<"true">>, emqx_plugin_libs_rule:bin(true)), + ?assertError(_, emqx_plugin_libs_rule:bin({a, v})). + +t_atom_key(_) -> + _ = erlang, _ = port, + ?assertEqual([erlang], emqx_plugin_libs_rule:atom_key([<<"erlang">>])), + ?assertEqual([erlang, port], emqx_plugin_libs_rule:atom_key([<<"erlang">>, port])), + ?assertEqual([erlang, port], emqx_plugin_libs_rule:atom_key([<<"erlang">>, <<"port">>])), + ?assertEqual(erlang, emqx_plugin_libs_rule:atom_key(<<"erlang">>)), + ?assertError({invalid_key, {a, v}}, emqx_plugin_libs_rule:atom_key({a, v})), + _ = xyz876gv123, + ?assertEqual([xyz876gv123, port], emqx_plugin_libs_rule:atom_key([<<"xyz876gv123">>, port])). + +t_unsafe_atom_key(_) -> + ?assertEqual([xyz876gv], emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv">>])), + ?assertEqual([xyz876gv33, port], emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv33">>, port])), + ?assertEqual([xyz876gv331, port1221], emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv331">>, <<"port1221">>])), + ?assertEqual(xyz876gv3312, emqx_plugin_libs_rule:unsafe_atom_key(<<"xyz876gv3312">>)). + +t_proc_tmpl(_) -> + Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, + Tks = emqx_plugin_libs_rule:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), + ?assertEqual(<<"a:1,b:1,c:1.0,d:{\"d1\":\"hi\"}">>, + emqx_plugin_libs_rule:proc_tmpl(Tks, Selected)). + +t_proc_tmpl1(_) -> + Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, + Tks = emqx_plugin_libs_rule:preproc_tmpl(<<"a:$a,b:b},c:{c},d:${d">>), + ?assertEqual(<<"a:$a,b:b},c:{c},d:${d">>, + emqx_plugin_libs_rule:proc_tmpl(Tks, Selected)). + +t_proc_cmd(_) -> + Selected = #{v0 => <<"x">>, v1 => <<"1">>, v2 => #{d1 => <<"hi">>}}, + Tks = emqx_plugin_libs_rule:preproc_cmd(<<"hset name a:${v0} ${v1} b ${v2} ">>), + ?assertEqual([<<"hset">>, <<"name">>, + <<"a:x">>, <<"1">>, + <<"b">>, <<"{\"d1\":\"hi\"}">>], + emqx_plugin_libs_rule:proc_cmd(Tks, Selected)). + +t_preproc_sql(_) -> + Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, + {PrepareStatement, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '?'), + ?assertEqual(<<"a:?,b:?,c:?,d:?">>, PrepareStatement), + ?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>], + emqx_plugin_libs_rule:proc_sql(ParamsTokens, Selected)). + +t_preproc_sql1(_) -> + Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, + {PrepareStatement, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '$n'), + ?assertEqual(<<"a:$1,b:$2,c:$3,d:$4">>, PrepareStatement), + ?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>], + emqx_plugin_libs_rule:proc_sql(ParamsTokens, Selected)). +t_preproc_sql2(_) -> + Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, + {PrepareStatement, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(<<"a:$a,b:b},c:{c},d:${d">>, '?'), + ?assertEqual(<<"a:$a,b:b},c:{c},d:${d">>, PrepareStatement), + ?assertEqual([], emqx_plugin_libs_rule:proc_sql(ParamsTokens, Selected)). + +t_preproc_sql3(_) -> + Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, + ParamsTokens = emqx_plugin_libs_rule:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), + ?assertEqual(<<"a:'1',b:1,c:1.0,d:'{\"d1\":\"hi\"}'">>, + emqx_plugin_libs_rule:proc_sql_param_str(ParamsTokens, Selected)). + +t_preproc_sql4(_) -> + %% with apostrophes + %% https://github.com/emqx/emqx/issues/4135 + Selected = #{a => <<"1''2">>, b => 1, c => 1.0, + d => #{d1 => <<"someone's phone">>}}, + ParamsTokens = emqx_plugin_libs_rule:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), + ?assertEqual(<<"a:'1\\'\\'2',b:1,c:1.0,d:'{\"d1\":\"someone\\'s phone\"}'">>, + emqx_plugin_libs_rule:proc_sql_param_str(ParamsTokens, Selected)). + +t_preproc_sql5(_) -> + %% with apostrophes for cassandra + %% https://github.com/emqx/emqx/issues/4148 + Selected = #{a => <<"1''2">>, b => 1, c => 1.0, + d => #{d1 => <<"someone's phone">>}}, + ParamsTokens = emqx_plugin_libs_rule:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), + ?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>, + emqx_plugin_libs_rule:proc_cql_param_str(ParamsTokens, Selected)). diff --git a/apps/emqx_rule_actions/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_rule_actions/src/emqx_bridge_mqtt_actions.erl index 194fc4096..fc5f89091 100644 --- a/apps/emqx_rule_actions/src/emqx_bridge_mqtt_actions.erl +++ b/apps/emqx_rule_actions/src/emqx_bridge_mqtt_actions.erl @@ -22,7 +22,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_rule_engine/include/rule_actions.hrl"). --import(emqx_rule_utils, [str/1]). +-import(emqx_plugin_libs_rule, [str/1]). -export([ on_resource_create/2 , on_get_resource_status/2 @@ -443,10 +443,10 @@ on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName, <<"forward_topic">> := ForwardTopic, <<"payload_tmpl">> := PayloadTmpl}) -> ?LOG(info, "Initiating Action ~p.", [?FUNCTION_NAME]), - PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl), + PayloadTks = emqx_plugin_libs_rule:preproc_tmpl(PayloadTmpl), TopicTks = case ForwardTopic == <<"">> of true -> undefined; - false -> emqx_rule_utils:preproc_tmpl(ForwardTopic) + false -> emqx_plugin_libs_rule:preproc_tmpl(ForwardTopic) end, Opts. @@ -461,7 +461,7 @@ on_action_data_to_mqtt_broker(Msg, _Env = }}) -> Topic1 = case TopicTks =:= undefined of true -> Topic; - false -> emqx_rule_utils:proc_tmpl(TopicTks, Msg) + false -> emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg) end, BrokerMsg = #message{id = Id, qos = QoS, @@ -480,7 +480,7 @@ format_data([], Msg) -> emqx_json:encode(Msg); format_data(Tokens, Msg) -> - emqx_rule_utils:proc_tmpl(Tokens, Msg). + emqx_plugin_libs_rule:proc_tmpl(Tokens, Msg). subscriptions(Subscriptions) -> scan_binary(<<"[", Subscriptions/binary, "].">>). diff --git a/apps/emqx_rule_actions/src/emqx_web_hook_actions.erl b/apps/emqx_rule_actions/src/emqx_web_hook_actions.erl index ef21ab864..1b68ad5b0 100644 --- a/apps/emqx_rule_actions/src/emqx_web_hook_actions.erl +++ b/apps/emqx_rule_actions/src/emqx_web_hook_actions.erl @@ -237,8 +237,8 @@ on_action_create_data_to_webserver(Id, Params) -> body := Body, pool := Pool, request_timeout := RequestTimeout} = parse_action_params(Params), - BodyTokens = emqx_rule_utils:preproc_tmpl(Body), - PathTokens = emqx_rule_utils:preproc_tmpl(Path), + BodyTokens = emqx_plugin_libs_rule:preproc_tmpl(Body), + PathTokens = emqx_plugin_libs_rule:preproc_tmpl(Path), Params. on_action_data_to_webserver(Selected, _Envs = @@ -252,7 +252,7 @@ on_action_data_to_webserver(Selected, _Envs = 'RequestTimeout' := RequestTimeout}, clientid := ClientID}) -> NBody = format_msg(BodyTokens, Selected), - NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected), + NPath = emqx_plugin_libs_rule:proc_tmpl(PathTokens, Selected), Req = create_req(Method, NPath, Headers, NBody), case ehttpc:request(ehttpc_pool:pick_worker(Pool, ClientID), Method, Req, RequestTimeout) of {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> @@ -273,7 +273,7 @@ on_action_data_to_webserver(Selected, _Envs = format_msg([], Data) -> emqx_json:encode(Data); format_msg(Tokens, Data) -> - emqx_rule_utils:proc_tmpl(Tokens, Data). + emqx_plugin_libs_rule:proc_tmpl(Tokens, Data). %%------------------------------------------------------------------------------ %% Internal functions @@ -366,7 +366,7 @@ get_ssl_opts(Opts, ResId) -> test_http_connect(Conf) -> Url = fun() -> maps:get(<<"url">>, Conf) end, try - emqx_rule_utils:http_connectivity(Url()) + emqx_plugin_libs_rule:http_connectivity(Url()) of ok -> true; {error, _Reason} -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index ad035356f..7ac45633c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -135,8 +135,8 @@ on_action_create_republish(Id, Params = #{ <<"target_qos">> := TargetQoS, <<"payload_tmpl">> := PayloadTmpl }) -> - TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic), - PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl), + TopicTks = emqx_plugin_libs_rule:preproc_tmpl(TargetTopic), + PayloadTks = emqx_plugin_libs_rule:preproc_tmpl(PayloadTmpl), Params. -spec on_action_republish(selected_data(), env_vars()) -> any(). @@ -167,8 +167,8 @@ on_action_republish(Selected, _Envs = #{ from = ActId, flags = Flags, headers = #{republish_by => ActId}, - topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), - payload = emqx_rule_utils:proc_tmpl(PayloadTks, Selected), + topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected), + payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), timestamp = Timestamp }); @@ -190,8 +190,8 @@ on_action_republish(Selected, _Envs = #{ from = ActId, flags = #{dup => false, retain => false}, headers = #{republish_by => ActId}, - topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), - payload = emqx_rule_utils:proc_tmpl(PayloadTks, Selected), + topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected), + payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), timestamp = erlang:system_time(millisecond) }). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 0b3ffb603..5095bc5b1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -216,7 +216,7 @@ delete_rule(RuleId) -> case emqx_rule_registry:get_rule(RuleId) of {ok, Rule = #rule{actions = Actions}} -> try - _ = emqx_rule_utils:cluster_call(?MODULE, clear_rule, [Rule]), + _ = emqx_plugin_libs_rule:cluster_call(?MODULE, clear_rule, [Rule]), ok = emqx_rule_registry:remove_rule(Rule) catch Error:Reason:ST -> @@ -242,7 +242,7 @@ create_resource(#{type := Type, config := Config0} = Params) -> ok = emqx_rule_registry:add_resource(Resource), %% Note that we will return OK in case of resource creation failure, %% A timer is started to re-start the resource later. - catch _ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [M, F, ResId, Config]), + catch _ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_resource, [M, F, ResId, Config]), {ok, Resource}; not_found -> {error, {resource_type_not_found, Type}} @@ -280,7 +280,7 @@ do_check_and_update_resource(#{id := Id, type := Type, description := NewDescrip Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec), case test_resource(#{type => Type, config => NewConfig}) of ok -> - _ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [Module, Create, Id, Config]), + _ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_resource, [Module, Create, Id, Config]), emqx_rule_registry:add_resource(#resource{ id = Id, type = Type, @@ -319,8 +319,8 @@ test_resource(#{type := Type, config := Config0}) -> Config = emqx_rule_validator:validate_params(Config0, ParamSpec), ResId = resource_id(), try - _ = emqx_rule_utils:cluster_call(?MODULE, init_resource, [ModC, Create, ResId, Config]), - _ = emqx_rule_utils:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]), + _ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_resource, [ModC, Create, ResId, Config]), + _ = emqx_plugin_libs_rule:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]), ok catch throw:Reason -> {error, Reason} @@ -359,7 +359,7 @@ delete_resource(ResId) -> try case emqx_rule_registry:remove_resource(ResId) of ok -> - _ = emqx_rule_utils:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]), + _ = emqx_plugin_libs_rule:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]), ok; {error, _} = R -> R end @@ -426,7 +426,7 @@ prepare_action(#{name := Name, args := Args0} = Action, NeedInit) -> ActionInstId = maps:get(id, Action, action_instance_id(Name)), case NeedInit of true -> - _ = emqx_rule_utils:cluster_call(?MODULE, init_action, [Mod, Create, ActionInstId, + _ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_action, [Mod, Create, ActionInstId, with_resource_params(Args)]), ok; false -> ok @@ -485,7 +485,7 @@ may_update_rule_params(Rule, Params = #{on_action_failed := OnFailed}) -> may_update_rule_params(Rule = #rule{actions = OldActions}, Params = #{actions := Actions}) -> %% prepare new actions before removing old ones NewActions = prepare_actions(Actions, maps:get(enabled, Params, true)), - _ = emqx_rule_utils:cluster_call(?MODULE, clear_actions, [OldActions]), + _ = emqx_plugin_libs_rule:cluster_call(?MODULE, clear_actions, [OldActions]), may_update_rule_params(Rule#rule{actions = NewActions}, maps:remove(actions, Params)); may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params Rule. @@ -631,7 +631,7 @@ refresh_actions(Actions, Pred) -> true -> {ok, #action{module = Mod, on_create = Create}} = emqx_rule_registry:find_action(ActName), - _ = emqx_rule_utils:cluster_call(?MODULE, init_action, [Mod, Create, Id, with_resource_params(Args)]), + _ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_action, [Mod, Create, Id, with_resource_params(Args)]), refresh_actions(Fallbacks, Pred); false -> ok end diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 89fdde579..21c715745 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -503,26 +503,26 @@ do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"signed">>, <<"little">>) -> %%------------------------------------------------------------------------------ str(Data) -> - emqx_rule_utils:bin(Data). + emqx_plugin_libs_rule:bin(Data). str_utf8(Data) -> - emqx_rule_utils:utf8_bin(Data). + emqx_plugin_libs_rule:utf8_bin(Data). bool(Data) -> - emqx_rule_utils:bool(Data). + emqx_plugin_libs_rule:bool(Data). int(Data) -> - emqx_rule_utils:int(Data). + emqx_plugin_libs_rule:int(Data). float(Data) -> - emqx_rule_utils:float(Data). + emqx_plugin_libs_rule:float(Data). float(Data, Decimals) when Decimals > 0 -> Data1 = ?MODULE:float(Data), list_to_float(float_to_list(Data1, [{decimals, Decimals}])). map(Data) -> - emqx_rule_utils:map(Data). + emqx_plugin_libs_rule:map(Data). bin2hexstr(Bin) when is_binary(Bin) -> emqx_misc:bin2hexstr_A_F(Bin). @@ -730,7 +730,7 @@ mget(Key, Map, Default) -> {ok, Val} -> Val; error when is_atom(Key) -> %% the map may have an equivalent binary-form key - BinKey = emqx_rule_utils:bin(Key), + BinKey = emqx_plugin_libs_rule:bin(Key), case maps:find(BinKey, Map) of {ok, Val} -> Val; error -> Default @@ -754,7 +754,7 @@ mput(Key, Val, Map) -> {ok, _} -> maps:put(Key, Val, Map); error when is_atom(Key) -> %% the map may have an equivalent binary-form key - BinKey = emqx_rule_utils:bin(Key), + BinKey = emqx_plugin_libs_rule:bin(Key), case maps:find(BinKey, Map) of {ok, _} -> maps:put(BinKey, Val, Map); error -> maps:put(Key, Val, Map) @@ -853,7 +853,7 @@ unix_ts_to_rfc3339(Epoch) -> unix_ts_to_rfc3339(Epoch, <<"second">>). unix_ts_to_rfc3339(Epoch, Unit) when is_integer(Epoch) -> - emqx_rule_utils:bin( + emqx_plugin_libs_rule:bin( calendar:system_time_to_rfc3339( Epoch, [{unit, time_unit(Unit)}])). diff --git a/apps/emqx_rule_engine/src/emqx_rule_maps.erl b/apps/emqx_rule_engine/src/emqx_rule_maps.erl index 512ae5c74..4bb104f7f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_maps.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_maps.erl @@ -85,7 +85,7 @@ general_find({key, Key}, Map, _OrgData, Handler) when is_map(Map) -> {ok, Val} -> Handler({found, {{key, Key}, Val}}); error when is_atom(Key) -> %% the map may have an equivalent binary-form key - BinKey = emqx_rule_utils:bin(Key), + BinKey = emqx_plugin_libs_rule:bin(Key), case maps:find(BinKey, Map) of {ok, Val} -> Handler({equivalent, {{key, BinKey}, Val}}); error -> Handler(not_found) diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index 096534585..d8cc53c33 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -185,7 +185,7 @@ get_rules_ordered_by_ts() -> -spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())). get_rules_for(Topic) -> [Rule || Rule = #rule{for = For} <- get_rules(), - emqx_rule_utils:can_topic_match_oneof(Topic, For)]. + emqx_plugin_libs_rule:can_topic_match_oneof(Topic, For)]. -spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())). get_rules_with_same_event(Topic) -> @@ -221,7 +221,7 @@ remove_rules(Rules) -> %% @private insert_rule(Rule) -> - _ = emqx_rule_utils:cluster_call(?MODULE, load_hooks_for_rule, [Rule]), + _ = emqx_plugin_libs_rule:cluster_call(?MODULE, load_hooks_for_rule, [Rule]), mnesia:write(?RULE_TAB, Rule, write). %% @private @@ -231,7 +231,7 @@ delete_rule(RuleId) when is_binary(RuleId) -> not_found -> ok end; delete_rule(Rule) -> - _ = emqx_rule_utils:cluster_call(?MODULE, unload_hooks_for_rule, [Rule]), + _ = emqx_plugin_libs_rule:cluster_call(?MODULE, unload_hooks_for_rule, [Rule]), mnesia:delete_object(?RULE_TAB, Rule, write). load_hooks_for_rule(#rule{for = Topics}) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 760205d62..1ebc808eb 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -38,7 +38,7 @@ test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) -> case lists:all(fun is_publish_topic/1, EventTopics) of true -> %% test if the topic matches the topic filters in the rule - case emqx_rule_utils:can_topic_match_oneof(InTopic, EventTopics) of + case emqx_plugin_libs_rule:can_topic_match_oneof(InTopic, EventTopics) of true -> test_rule(Sql, Select, Context, EventTopics); false -> {error, nomatch} end; @@ -112,7 +112,7 @@ envs_examp(_) -> topic => <<"t/a">>, qos => 1, flags => #{sys => true, event => true}, - publish_received_at => emqx_rule_utils:now_ms(), - timestamp => emqx_rule_utils:now_ms(), + publish_received_at => emqx_plugin_libs_rule:now_ms(), + timestamp => emqx_plugin_libs_rule:now_ms(), node => node() }. diff --git a/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl deleted file mode 100644 index 002d47c8c..000000000 --- a/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl +++ /dev/null @@ -1,136 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_rule_utils_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). - --define(PORT, 9876). - -all() -> emqx_ct:all(?MODULE). - -t_http_connectivity(_) -> - {ok, Socket} = gen_tcp:listen(?PORT, []), - ok = emqx_rule_utils:http_connectivity("http://127.0.0.1:"++emqx_rule_utils:str(?PORT), 1000), - gen_tcp:close(Socket), - {error, _} = emqx_rule_utils:http_connectivity("http://127.0.0.1:"++emqx_rule_utils:str(?PORT), 1000). - -t_tcp_connectivity(_) -> - {ok, Socket} = gen_tcp:listen(?PORT, []), - ok = emqx_rule_utils:tcp_connectivity("127.0.0.1", ?PORT, 1000), - gen_tcp:close(Socket), - {error, _} = emqx_rule_utils:tcp_connectivity("127.0.0.1", ?PORT, 1000). - -t_str(_) -> - ?assertEqual("abc", emqx_rule_utils:str("abc")), - ?assertEqual("abc", emqx_rule_utils:str(abc)), - ?assertEqual("{\"a\":1}", emqx_rule_utils:str(#{a => 1})), - ?assertEqual("1", emqx_rule_utils:str(1)), - ?assertEqual("2.0", emqx_rule_utils:str(2.0)), - ?assertEqual("true", emqx_rule_utils:str(true)), - ?assertError(_, emqx_rule_utils:str({a, v})). - -t_bin(_) -> - ?assertEqual(<<"abc">>, emqx_rule_utils:bin("abc")), - ?assertEqual(<<"abc">>, emqx_rule_utils:bin(abc)), - ?assertEqual(<<"{\"a\":1}">>, emqx_rule_utils:bin(#{a => 1})), - ?assertEqual(<<"[{\"a\":1}]">>, emqx_rule_utils:bin([#{a => 1}])), - ?assertEqual(<<"1">>, emqx_rule_utils:bin(1)), - ?assertEqual(<<"2.0">>, emqx_rule_utils:bin(2.0)), - ?assertEqual(<<"true">>, emqx_rule_utils:bin(true)), - ?assertError(_, emqx_rule_utils:bin({a, v})). - -t_atom_key(_) -> - _ = erlang, _ = port, - ?assertEqual([erlang], emqx_rule_utils:atom_key([<<"erlang">>])), - ?assertEqual([erlang, port], emqx_rule_utils:atom_key([<<"erlang">>, port])), - ?assertEqual([erlang, port], emqx_rule_utils:atom_key([<<"erlang">>, <<"port">>])), - ?assertEqual(erlang, emqx_rule_utils:atom_key(<<"erlang">>)), - ?assertError({invalid_key, {a, v}}, emqx_rule_utils:atom_key({a, v})), - _ = xyz876gv123, - ?assertEqual([xyz876gv123, port], emqx_rule_utils:atom_key([<<"xyz876gv123">>, port])). - -t_unsafe_atom_key(_) -> - ?assertEqual([xyz876gv], emqx_rule_utils:unsafe_atom_key([<<"xyz876gv">>])), - ?assertEqual([xyz876gv33, port], emqx_rule_utils:unsafe_atom_key([<<"xyz876gv33">>, port])), - ?assertEqual([xyz876gv331, port1221], emqx_rule_utils:unsafe_atom_key([<<"xyz876gv331">>, <<"port1221">>])), - ?assertEqual(xyz876gv3312, emqx_rule_utils:unsafe_atom_key(<<"xyz876gv3312">>)). - -t_proc_tmpl(_) -> - Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, - Tks = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), - ?assertEqual(<<"a:1,b:1,c:1.0,d:{\"d1\":\"hi\"}">>, - emqx_rule_utils:proc_tmpl(Tks, Selected)). - -t_proc_tmpl1(_) -> - Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, - Tks = emqx_rule_utils:preproc_tmpl(<<"a:$a,b:b},c:{c},d:${d">>), - ?assertEqual(<<"a:$a,b:b},c:{c},d:${d">>, - emqx_rule_utils:proc_tmpl(Tks, Selected)). - -t_proc_cmd(_) -> - Selected = #{v0 => <<"x">>, v1 => <<"1">>, v2 => #{d1 => <<"hi">>}}, - Tks = emqx_rule_utils:preproc_cmd(<<"hset name a:${v0} ${v1} b ${v2} ">>), - ?assertEqual([<<"hset">>, <<"name">>, - <<"a:x">>, <<"1">>, - <<"b">>, <<"{\"d1\":\"hi\"}">>], - emqx_rule_utils:proc_cmd(Tks, Selected)). - -t_preproc_sql(_) -> - Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, - {PrepareStatement, ParamsTokens} = emqx_rule_utils:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '?'), - ?assertEqual(<<"a:?,b:?,c:?,d:?">>, PrepareStatement), - ?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>], - emqx_rule_utils:proc_sql(ParamsTokens, Selected)). - -t_preproc_sql1(_) -> - Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, - {PrepareStatement, ParamsTokens} = emqx_rule_utils:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '$n'), - ?assertEqual(<<"a:$1,b:$2,c:$3,d:$4">>, PrepareStatement), - ?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>], - emqx_rule_utils:proc_sql(ParamsTokens, Selected)). -t_preproc_sql2(_) -> - Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, - {PrepareStatement, ParamsTokens} = emqx_rule_utils:preproc_sql(<<"a:$a,b:b},c:{c},d:${d">>, '?'), - ?assertEqual(<<"a:$a,b:b},c:{c},d:${d">>, PrepareStatement), - ?assertEqual([], emqx_rule_utils:proc_sql(ParamsTokens, Selected)). - -t_preproc_sql3(_) -> - Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, - ParamsTokens = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), - ?assertEqual(<<"a:'1',b:1,c:1.0,d:'{\"d1\":\"hi\"}'">>, - emqx_rule_utils:proc_sql_param_str(ParamsTokens, Selected)). - -t_preproc_sql4(_) -> - %% with apostrophes - %% https://github.com/emqx/emqx/issues/4135 - Selected = #{a => <<"1''2">>, b => 1, c => 1.0, - d => #{d1 => <<"someone's phone">>}}, - ParamsTokens = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), - ?assertEqual(<<"a:'1\\'\\'2',b:1,c:1.0,d:'{\"d1\":\"someone\\'s phone\"}'">>, - emqx_rule_utils:proc_sql_param_str(ParamsTokens, Selected)). - -t_preproc_sql5(_) -> - %% with apostrophes for cassandra - %% https://github.com/emqx/emqx/issues/4148 - Selected = #{a => <<"1''2">>, b => 1, c => 1.0, - d => #{d1 => <<"someone's phone">>}}, - ParamsTokens = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), - ?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>, - emqx_rule_utils:proc_cql_param_str(ParamsTokens, Selected)).