%%-------------------------------------------------------------------- %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_rule_utils). -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). -export([ replace_var/2 ]). %% preprocess and process tempalte string with place holders -export([ preproc_tmpl/1 , proc_tmpl/2 , proc_tmpl/3 , preproc_cmd/1 , proc_cmd/2 , proc_cmd/3 , preproc_sql/1 , preproc_sql/2 , proc_sql/2 , proc_sql_param_str/2 , proc_cql_param_str/2 ]). %% type converting -export([ str/1 , float2str/2 , bin/1 , bool/1 , int/1 , float/1 , map/1 , utf8_bin/1 , utf8_str/1 , number_to_binary/1 , atom_key/1 , unsafe_atom_key/1 ]). %% connectivity check -export([ http_connectivity/1 , http_connectivity/2 , tcp_connectivity/2 , tcp_connectivity/3 ]). -export([ now_ms/0 , can_topic_match_oneof/2 ]). -export([ add_metadata/2 , log_action/4 ]). -compile({no_auto_import, [ float/1 ]}). -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})"). -define(EX_WITHE_CHARS, "\\s"). %% Space and CRLF -define(FLOAT_PRECISION, 17). -type(uri_string() :: iodata()). -type(tmpl_token() :: list({var, binary()} | {str, binary()})). -type(tmpl_cmd() :: list(tmpl_token())). -type(prepare_statement_key() :: binary()). %% preprocess template string with place holders -spec(preproc_tmpl(binary()) -> tmpl_token()). preproc_tmpl(Str) -> Tokens = re:split(Str, ?EX_PLACE_HOLDER, [{return,binary},group,trim]), preproc_tmpl(Tokens, []). preproc_tmpl([], Acc) -> lists:reverse(Acc); preproc_tmpl([[Str, Phld]| Tokens], Acc) -> preproc_tmpl(Tokens, put_head(var, parse_nested(unwrap(Phld)), put_head(str, Str, Acc))); preproc_tmpl([[Str]| Tokens], Acc) -> preproc_tmpl(Tokens, put_head(str, Str, Acc)). %% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result %% value will be an integer 1. replace_var(Tokens, Data) when is_list(Tokens) -> [Val] = proc_tmpl(Tokens, Data, #{return => rawlist}), Val; replace_var(Val, _Data) -> Val. put_head(_Type, <<>>, List) -> List; put_head(Type, Term, List) -> [{Type, Term} | List]. -spec(proc_tmpl(tmpl_token(), map()) -> binary()). proc_tmpl(Tokens, Data) -> proc_tmpl(Tokens, Data, #{return => full_binary}). -spec(proc_tmpl(tmpl_token(), map(), map()) -> binary() | list()). proc_tmpl(Tokens, Data, Opts = #{return := full_binary}) -> Trans = maps:get(var_trans, Opts, fun bin/1), list_to_binary( proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Trans})); proc_tmpl(Tokens, Data, Opts = #{return := rawlist}) -> Trans = maps:get(var_trans, Opts, undefined), lists:map( fun ({str, Str}) -> Str; ({var, Phld}) when is_function(Trans) -> Trans(get_phld_var(Phld, Data)); ({var, Phld}) -> get_phld_var(Phld, Data) end, Tokens). -spec(preproc_cmd(binary()) -> tmpl_cmd()). preproc_cmd(Str) -> SubStrList = re:split(Str, ?EX_WITHE_CHARS, [{return,binary},trim]), [preproc_tmpl(SubStr) || SubStr <- SubStrList]. -spec(proc_cmd([tmpl_token()], map()) -> binary() | list()). proc_cmd(Tokens, Data) -> proc_cmd(Tokens, Data, #{return => full_binary}). -spec(proc_cmd([tmpl_token()], map(), map()) -> list()). proc_cmd(Tokens, Data, Opts) -> [proc_tmpl(Tks, Data, Opts) || Tks <- Tokens]. %% preprocess SQL with place holders -spec(preproc_sql(Sql::binary()) -> {prepare_statement_key(), tmpl_token()}). preproc_sql(Sql) -> preproc_sql(Sql, '?'). -spec(preproc_sql(Sql::binary(), ReplaceWith :: '?' | '$n') -> {prepare_statement_key(), tmpl_token()}). preproc_sql(Sql, ReplaceWith) -> case re:run(Sql, ?EX_PLACE_HOLDER, [{capture, all_but_first, binary}, global]) of {match, PlaceHolders} -> PhKs = [parse_nested(unwrap(Phld)) || [Phld | _] <- PlaceHolders], {replace_with(Sql, ReplaceWith), [{var, Phld} || Phld <- PhKs]}; nomatch -> {Sql, []} end. -spec(proc_sql(tmpl_token(), map()) -> list()). proc_sql(Tokens, Data) -> proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => fun sql_data/1}). -spec(proc_sql_param_str(tmpl_token(), map()) -> binary()). proc_sql_param_str(Tokens, Data) -> proc_param_str(Tokens, Data, fun quote_sql/1). -spec(proc_cql_param_str(tmpl_token(), map()) -> binary()). proc_cql_param_str(Tokens, Data) -> proc_param_str(Tokens, Data, fun quote_cql/1). proc_param_str(Tokens, Data, Quote) -> iolist_to_binary( proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})). %% backward compatibility for hot upgrading from =< e4.2.1 get_phld_var(Fun, Data) when is_function(Fun) -> Fun(Data); get_phld_var(Phld, Data) -> emqx_rule_maps:nested_get(Phld, Data). replace_with(Tmpl, '?') -> re:replace(Tmpl, ?EX_PLACE_HOLDER, "?", [{return, binary}, global]); replace_with(Tmpl, '$n') -> Parts = re:split(Tmpl, ?EX_PLACE_HOLDER, [{return, binary}, trim, group]), {Res, _} = lists:foldl( fun([Tkn, _Phld], {Acc, Seq}) -> Seq1 = erlang:integer_to_binary(Seq), {<>, Seq + 1}; ([Tkn], {Acc, Seq}) -> {<>, Seq} end, {<<>>, 1}, Parts), Res. unsafe_atom_key(Key) when is_atom(Key) -> Key; unsafe_atom_key(Key) when is_binary(Key) -> binary_to_atom(Key, utf8); unsafe_atom_key(Keys = [_Key | _]) -> [unsafe_atom_key(SubKey) || SubKey <- Keys]; unsafe_atom_key(Key) -> error({invalid_key, Key}). atom_key(Key) when is_atom(Key) -> Key; atom_key(Key) when is_binary(Key) -> try binary_to_existing_atom(Key, utf8) catch error:badarg -> error({invalid_key, Key}) end; atom_key(Keys = [_Key | _]) -> %% nested keys [atom_key(SubKey) || SubKey <- Keys]; atom_key(Key) -> error({invalid_key, Key}). -spec(http_connectivity(uri_string()) -> ok | {error, Reason :: term()}). http_connectivity(Url) -> http_connectivity(Url, 3000). -spec(http_connectivity(uri_string(), integer()) -> ok | {error, Reason :: term()}). http_connectivity(Url, Timeout) -> case emqx_http_lib:uri_parse(Url) of {ok, #{host := Host, port := Port}} -> tcp_connectivity(Host, Port, Timeout); {error, Reason} -> {error, Reason} end. -spec tcp_connectivity(Host :: inet:socket_address() | inet:hostname(), Port :: inet:port_number()) -> ok | {error, Reason :: term()}. tcp_connectivity(Host, Port) -> tcp_connectivity(Host, Port, 3000). -spec(tcp_connectivity(Host :: inet:socket_address() | inet:hostname(), Port :: inet:port_number(), Timeout :: integer()) -> ok | {error, Reason :: term()}). tcp_connectivity(Host, Port, Timeout) -> case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of {ok, Sock} -> gen_tcp:close(Sock), ok; {error, Reason} -> {error, Reason} end. unwrap(<<"${", Val/binary>>) -> binary:part(Val, {0, byte_size(Val)-1}). sql_data(undefined) -> null; sql_data(List) when is_list(List) -> List; sql_data(Bin) when is_binary(Bin) -> Bin; sql_data(Num) when is_number(Num) -> Num; sql_data(Bool) when is_boolean(Bool) -> Bool; sql_data(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); sql_data(Map) when is_map(Map) -> emqx_json:encode(Map). quote_sql(Str) -> quote(Str, <<"\\\\'">>). quote_cql(Str) -> quote(Str, <<"''">>). quote(Str, ReplaceWith) when is_list(Str); is_binary(Str); is_atom(Str); is_map(Str) -> [$', escape_apo(bin(Str), ReplaceWith), $']; quote(Val, _) -> bin(Val). escape_apo(Str, ReplaceWith) -> re:replace(Str, <<"'">>, ReplaceWith, [{return, binary}, global]). str(Bin) when is_binary(Bin) -> binary_to_list(Bin); str(Num) when is_number(Num) -> number_to_list(Num); str(Atom) when is_atom(Atom) -> atom_to_list(Atom); str(Map) when is_map(Map) -> binary_to_list(emqx_json:encode(Map)); str(List) when is_list(List) -> case io_lib:printable_list(List) of true -> List; false -> binary_to_list(emqx_json:encode(List)) end; str(Data) -> error({invalid_str, Data}). float2str(Float, Precision) when is_float(Float) and is_integer(Precision)-> float_to_binary(Float, [{decimals, Precision}, compact]). utf8_bin(Str) when is_binary(Str); is_list(Str) -> unicode:characters_to_binary(Str); utf8_bin(Str) -> unicode:characters_to_binary(bin(Str)). utf8_str(Str) when is_binary(Str); is_list(Str) -> unicode:characters_to_list(Str); utf8_str(Str) -> unicode:characters_to_list(str(Str)). bin(Bin) when is_binary(Bin) -> Bin; bin(Num) when is_number(Num) -> number_to_binary(Num); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); bin(Map) when is_map(Map) -> emqx_json:encode(Map); bin(List) when is_list(List) -> case io_lib:printable_list(List) of true -> list_to_binary(List); false -> emqx_json:encode(List) end; bin(Data) -> error({invalid_bin, Data}). int(List) when is_list(List) -> try list_to_integer(List) catch error:badarg -> int(list_to_float(List)) end; int(Bin) when is_binary(Bin) -> try binary_to_integer(Bin) catch error:badarg -> int(binary_to_float(Bin)) end; int(Int) when is_integer(Int) -> Int; int(Float) when is_float(Float) -> erlang:floor(Float); int(true) -> 1; int(false) -> 0; int(Data) -> error({invalid_number, Data}). float(List) when is_list(List) -> try list_to_float(List) catch error:badarg -> float(list_to_integer(List)) end; float(Bin) when is_binary(Bin) -> try binary_to_float(Bin) catch error:badarg -> float(binary_to_integer(Bin)) end; float(Num) when is_number(Num) -> erlang:float(Num); float(Data) -> error({invalid_number, Data}). map(Bin) when is_binary(Bin) -> case emqx_json:decode(Bin, [return_maps]) of Map = #{} -> Map; _ -> error({invalid_map, Bin}) end; map(List) when is_list(List) -> maps:from_list(List); map(Map) when is_map(Map) -> Map; map(Data) -> error({invalid_map, Data}). bool(Bool) when Bool == true; Bool == <<"true">>; Bool == 1 -> true; bool(Bool) when Bool == false; Bool == <<"false">>; Bool == 0 -> false; bool(Bool) -> error({invalid_boolean, Bool}). number_to_binary(Int) when is_integer(Int) -> integer_to_binary(Int); number_to_binary(Float) when is_float(Float) -> float_to_binary(Float, [{decimals, ?FLOAT_PRECISION}, compact]). number_to_list(Int) when is_integer(Int) -> integer_to_list(Int); number_to_list(Float) when is_float(Float) -> float_to_list(Float, [{decimals, ?FLOAT_PRECISION}, compact]). parse_nested(Attr) -> case string:split(Attr, <<".">>, all) of [Attr] -> {var, Attr}; Nested -> {path, [{key, P} || P <- Nested]} end. now_ms() -> erlang:system_time(millisecond). can_topic_match_oneof(Topic, Filters) -> lists:any(fun(Fltr) -> emqx_topic:match(Topic, Fltr) end, Filters). add_metadata(Envs, Metadata) when is_map(Envs), is_map(Metadata) -> NMetadata = maps:merge(maps:get(metadata, Envs, #{}), Metadata), Envs#{metadata => NMetadata}; add_metadata(Envs, Action) when is_map(Envs), is_record(Action, action_instance)-> Metadata = gen_metadata_from_action(Action), NMetadata = maps:merge(maps:get(metadata, Envs, #{}), Metadata), Envs#{metadata => NMetadata}. gen_metadata_from_action(#action_instance{name = Name, args = undefined}) -> #{action_name => Name, resource_id => undefined}; gen_metadata_from_action(#action_instance{name = Name, args = Args}) when is_map(Args) -> #{action_name => Name, resource_id => maps:get(<<"$resource">>, Args, undefined)}; gen_metadata_from_action(#action_instance{name = Name}) -> #{action_name => Name, resource_id => undefined}. log_action(Level, Metadata, Fmt, Args) -> ?LOG(Level, "Rule: ~p; Action: ~p; Resource: ~p. " ++ Fmt, metadata_values(Metadata) ++ Args). metadata_values(Metadata) -> RuleId = maps:get(rule_id, Metadata, undefined), ActionName = maps:get(action_name, Metadata, undefined), ResourceName = maps:get(resource_id, Metadata, undefined), [RuleId, ActionName, ResourceName].