chore: All placeholders are placed in the same header file (#6190)
* chore: All placeholders are placed in the same header file * feat: Separate placeholders from the rules engine
This commit is contained in:
parent
a7a5e2d1f4
commit
04c204374a
|
@ -0,0 +1,76 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-ifndef(EMQ_X_PLACEHOLDER_HRL).
|
||||||
|
-define(EMQ_X_PLACEHOLDER_HRL, true).
|
||||||
|
|
||||||
|
-define(PH(Type), <<"${", Type/binary, "}">>).
|
||||||
|
|
||||||
|
%% action: publish/subscribe/all
|
||||||
|
-define(PH_ACTION, ?PH(<<"action">>)).
|
||||||
|
|
||||||
|
%% cert
|
||||||
|
-define(PH_CRET_SUBJECT, ?PH(<<"cert_subject">>)).
|
||||||
|
-define(PH_CRET_CN_NAME, ?PH(<<"cert_common_name">>)).
|
||||||
|
|
||||||
|
%% MQTT
|
||||||
|
-define(PH_PASSWORD, ?PH(<<"password">>)).
|
||||||
|
-define(PH_CLIENTID, ?PH(<<"clientid">>)).
|
||||||
|
-define(PH_FROM_CLIENTID, ?PH(<<"from_clienid">>)).
|
||||||
|
-define(PH_USERNAME, ?PH(<<"username">>)).
|
||||||
|
-define(PH_FROM_USERNAME, ?PH(<<"from_username">>)).
|
||||||
|
-define(PH_TOPIC, ?PH(<<"topic">>)).
|
||||||
|
%% MQTT payload
|
||||||
|
-define(PH_PAYLOAD, ?PH(<<"payload">>)).
|
||||||
|
%% client IPAddress
|
||||||
|
-define(PH_PEERHOST, ?PH(<<"peerhost">>)).
|
||||||
|
%% Enumeration of message QoS 0,1,2
|
||||||
|
-define(PH_QOS, ?PH(<<"qos">>)).
|
||||||
|
-define(PH_FLAGS, ?PH(<<"flags">>)).
|
||||||
|
%% Additional data related to process within the MQTT message
|
||||||
|
-define(PH_HEADERS, ?PH(<<"hearders">>)).
|
||||||
|
%% protocol name
|
||||||
|
-define(PH_PROTONAME, ?PH(<<"proto_name">>)).
|
||||||
|
%% protocol version
|
||||||
|
-define(PH_PROTOVER, ?PH(<<"proto_ver">>)).
|
||||||
|
%% MQTT keepalive interval
|
||||||
|
-define(PH_KEEPALIVE, ?PH(<<"keepalive">>)).
|
||||||
|
%% MQTT clean_start
|
||||||
|
-define(PH_CLEAR_START, ?PH(<<"clean_start">>)).
|
||||||
|
%% MQTT Session Expiration time
|
||||||
|
-define(PH_EXPIRY_INTERVAL, ?PH(<<"expiry_interval">>)).
|
||||||
|
|
||||||
|
%% Time when PUBLISH message reaches Broker (ms)
|
||||||
|
-define(PH_PUBLISH_RECEIVED_AT, ?PH(<<"publish_received_at">>)).
|
||||||
|
%% Mountpoint for bridging messages
|
||||||
|
-define(PH_MOUNTPOINT, ?PH(<<"mountpoint">>)).
|
||||||
|
%% IPAddress and Port of terminal
|
||||||
|
-define(PH_PEERNAME, ?PH(<<"peername">>)).
|
||||||
|
%% IPAddress and Port listened by emqx
|
||||||
|
-define(PH_SOCKNAME, ?PH(<<"sockname">>)).
|
||||||
|
%% whether it is MQTT bridge connection
|
||||||
|
-define(PH_IS_BRIDGE, ?PH(<<"is_bridge">>)).
|
||||||
|
%% Terminal connection completion time (s)
|
||||||
|
-define(PH_CONNECTED_AT, ?PH(<<"connected_at">>)).
|
||||||
|
%% Event trigger time(millisecond)
|
||||||
|
-define(PH_TIMESTAMP, ?PH(<<"timestamp">>)).
|
||||||
|
%% Terminal disconnection completion time (s)
|
||||||
|
-define(PH_DISCONNECTED_AT, ?PH(<<"disconnected_at">>)).
|
||||||
|
|
||||||
|
-define(PH_NODE, ?PH(<<"node">>)).
|
||||||
|
-define(PH_REASON, ?PH(<<"reason">>)).
|
||||||
|
|
||||||
|
-endif.
|
|
@ -68,8 +68,8 @@ end_per_suite(_Config) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
t_authz(_) ->
|
t_authz(_) ->
|
||||||
ClientInfo = #{clientid => <<"clientid">>,
|
ClientInfo = #{clientid => <<"my-clientid">>,
|
||||||
username => <<"username">>,
|
username => <<"my-username">>,
|
||||||
peerhost => {127,0,0,1},
|
peerhost => {127,0,0,1},
|
||||||
protocol => mqtt,
|
protocol => mqtt,
|
||||||
mountpoint => <<"fake">>,
|
mountpoint => <<"fake">>,
|
||||||
|
@ -92,4 +92,3 @@ t_authz(_) ->
|
||||||
?assertEqual(deny,
|
?assertEqual(deny,
|
||||||
emqx_access_control:authorize(ClientInfo, publish, <<"+">>)),
|
emqx_access_control:authorize(ClientInfo, publish, <<"+">>)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,182 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_placeholder).
|
||||||
|
|
||||||
|
%% preprocess and process template string with place holders
|
||||||
|
-export([ preproc_tmpl/1
|
||||||
|
, proc_tmpl/2
|
||||||
|
, proc_tmpl/3
|
||||||
|
, preproc_cmd/1
|
||||||
|
, proc_cmd/2
|
||||||
|
, proc_cmd/3
|
||||||
|
, preproc_sql/1
|
||||||
|
, preproc_sql/2
|
||||||
|
, proc_sql/2
|
||||||
|
, proc_sql_param_str/2
|
||||||
|
, proc_cql_param_str/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-import(emqx_plugin_libs_rule, [bin/1]).
|
||||||
|
|
||||||
|
-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
|
||||||
|
-define(EX_WITHE_CHARS, "\\s"). %% Space and CRLF
|
||||||
|
|
||||||
|
-type(tmpl_token() :: list({var, binary()} | {str, binary()})).
|
||||||
|
|
||||||
|
-type(tmpl_cmd() :: list(tmpl_token())).
|
||||||
|
|
||||||
|
-type(prepare_statement_key() :: binary()).
|
||||||
|
|
||||||
|
%% preprocess template string with place holders
|
||||||
|
-spec(preproc_tmpl(binary()) -> tmpl_token()).
|
||||||
|
preproc_tmpl(Str) ->
|
||||||
|
Tokens = re:split(Str, ?EX_PLACE_HOLDER, [{return,binary},group,trim]),
|
||||||
|
preproc_tmpl(Tokens, []).
|
||||||
|
|
||||||
|
preproc_tmpl([], Acc) ->
|
||||||
|
lists:reverse(Acc);
|
||||||
|
preproc_tmpl([[Str, Phld] | Tokens], Acc) ->
|
||||||
|
preproc_tmpl(Tokens,
|
||||||
|
put_head(var, parse_nested(unwrap(Phld)),
|
||||||
|
put_head(str, Str, Acc)));
|
||||||
|
preproc_tmpl([[Str] | Tokens], Acc) ->
|
||||||
|
preproc_tmpl(Tokens, put_head(str, Str, Acc)).
|
||||||
|
|
||||||
|
put_head(_Type, <<>>, List) -> List;
|
||||||
|
put_head(Type, Term, List) ->
|
||||||
|
[{Type, Term} | List].
|
||||||
|
|
||||||
|
-spec(proc_tmpl(tmpl_token(), map()) -> binary()).
|
||||||
|
proc_tmpl(Tokens, Data) ->
|
||||||
|
proc_tmpl(Tokens, Data, #{return => full_binary}).
|
||||||
|
|
||||||
|
-spec(proc_tmpl(tmpl_token(), map(), map()) -> binary() | list()).
|
||||||
|
proc_tmpl(Tokens, Data, Opts = #{return := full_binary}) ->
|
||||||
|
Trans = maps:get(var_trans, Opts, fun emqx_plugin_libs_rule:bin/1),
|
||||||
|
list_to_binary(
|
||||||
|
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Trans}));
|
||||||
|
|
||||||
|
proc_tmpl(Tokens, Data, Opts = #{return := rawlist}) ->
|
||||||
|
Trans = maps:get(var_trans, Opts, undefined),
|
||||||
|
lists:map(
|
||||||
|
fun ({str, Str}) -> Str;
|
||||||
|
({var, Phld}) when is_function(Trans) ->
|
||||||
|
Trans(get_phld_var(Phld, Data));
|
||||||
|
({var, Phld}) ->
|
||||||
|
get_phld_var(Phld, Data)
|
||||||
|
end, Tokens).
|
||||||
|
|
||||||
|
|
||||||
|
-spec(preproc_cmd(binary()) -> tmpl_cmd()).
|
||||||
|
preproc_cmd(Str) ->
|
||||||
|
SubStrList = re:split(Str, ?EX_WITHE_CHARS, [{return,binary},trim]),
|
||||||
|
[preproc_tmpl(SubStr) || SubStr <- SubStrList].
|
||||||
|
|
||||||
|
-spec(proc_cmd([tmpl_token()], map()) -> binary() | list()).
|
||||||
|
proc_cmd(Tokens, Data) ->
|
||||||
|
proc_cmd(Tokens, Data, #{return => full_binary}).
|
||||||
|
-spec(proc_cmd([tmpl_token()], map(), map()) -> list()).
|
||||||
|
proc_cmd(Tokens, Data, Opts) ->
|
||||||
|
[proc_tmpl(Tks, Data, Opts) || Tks <- Tokens].
|
||||||
|
|
||||||
|
%% preprocess SQL with place holders
|
||||||
|
-spec(preproc_sql(Sql::binary()) -> {prepare_statement_key(), tmpl_token()}).
|
||||||
|
preproc_sql(Sql) ->
|
||||||
|
preproc_sql(Sql, '?').
|
||||||
|
|
||||||
|
-spec(preproc_sql(Sql::binary(), ReplaceWith :: '?' | '$n')
|
||||||
|
-> {prepare_statement_key(), tmpl_token()}).
|
||||||
|
|
||||||
|
preproc_sql(Sql, ReplaceWith) ->
|
||||||
|
case re:run(Sql, ?EX_PLACE_HOLDER, [{capture, all_but_first, binary}, global]) of
|
||||||
|
{match, PlaceHolders} ->
|
||||||
|
PhKs = [parse_nested(unwrap(Phld)) || [Phld | _] <- PlaceHolders],
|
||||||
|
{replace_with(Sql, ReplaceWith), [{var, Phld} || Phld <- PhKs]};
|
||||||
|
nomatch ->
|
||||||
|
{Sql, []}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec(proc_sql(tmpl_token(), map()) -> list()).
|
||||||
|
proc_sql(Tokens, Data) ->
|
||||||
|
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => fun sql_data/1}).
|
||||||
|
|
||||||
|
-spec(proc_sql_param_str(tmpl_token(), map()) -> binary()).
|
||||||
|
proc_sql_param_str(Tokens, Data) ->
|
||||||
|
proc_param_str(Tokens, Data, fun quote_sql/1).
|
||||||
|
|
||||||
|
-spec(proc_cql_param_str(tmpl_token(), map()) -> binary()).
|
||||||
|
proc_cql_param_str(Tokens, Data) ->
|
||||||
|
proc_param_str(Tokens, Data, fun quote_cql/1).
|
||||||
|
|
||||||
|
proc_param_str(Tokens, Data, Quote) ->
|
||||||
|
iolist_to_binary(
|
||||||
|
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})).
|
||||||
|
|
||||||
|
%% backward compatibility for hot upgrading from =< e4.2.1
|
||||||
|
get_phld_var(Fun, Data) when is_function(Fun) ->
|
||||||
|
Fun(Data);
|
||||||
|
get_phld_var(Phld, Data) ->
|
||||||
|
emqx_rule_maps:nested_get(Phld, Data).
|
||||||
|
|
||||||
|
replace_with(Tmpl, '?') ->
|
||||||
|
re:replace(Tmpl, ?EX_PLACE_HOLDER, "?", [{return, binary}, global]);
|
||||||
|
replace_with(Tmpl, '$n') ->
|
||||||
|
Parts = re:split(Tmpl, ?EX_PLACE_HOLDER, [{return, binary}, trim, group]),
|
||||||
|
{Res, _} =
|
||||||
|
lists:foldl(
|
||||||
|
fun([Tkn, _Phld], {Acc, Seq}) ->
|
||||||
|
Seq1 = erlang:integer_to_binary(Seq),
|
||||||
|
{<<Acc/binary, Tkn/binary, "$", Seq1/binary>>, Seq + 1};
|
||||||
|
([Tkn], {Acc, Seq}) ->
|
||||||
|
{<<Acc/binary, Tkn/binary>>, Seq}
|
||||||
|
end, {<<>>, 1}, Parts),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
parse_nested(Attr) ->
|
||||||
|
case string:split(Attr, <<".">>, all) of
|
||||||
|
[Attr] -> {var, Attr};
|
||||||
|
Nested -> {path, [{key, P} || P <- Nested]}
|
||||||
|
end.
|
||||||
|
|
||||||
|
unwrap(<<"${", Val/binary>>) ->
|
||||||
|
binary:part(Val, {0, byte_size(Val)-1}).
|
||||||
|
|
||||||
|
sql_data(undefined) -> null;
|
||||||
|
sql_data(List) when is_list(List) -> List;
|
||||||
|
sql_data(Bin) when is_binary(Bin) -> Bin;
|
||||||
|
sql_data(Num) when is_number(Num) -> Num;
|
||||||
|
sql_data(Bool) when is_boolean(Bool) -> Bool;
|
||||||
|
sql_data(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
|
||||||
|
sql_data(Map) when is_map(Map) -> emqx_json:encode(Map).
|
||||||
|
|
||||||
|
quote_sql(Str) ->
|
||||||
|
quote(Str, <<"\\\\'">>).
|
||||||
|
|
||||||
|
quote_cql(Str) ->
|
||||||
|
quote(Str, <<"''">>).
|
||||||
|
|
||||||
|
quote(Str, ReplaceWith) when
|
||||||
|
is_list(Str);
|
||||||
|
is_binary(Str);
|
||||||
|
is_atom(Str);
|
||||||
|
is_map(Str) ->
|
||||||
|
[$', escape_apo(bin(Str), ReplaceWith), $'];
|
||||||
|
quote(Val, _) ->
|
||||||
|
bin(Val).
|
||||||
|
|
||||||
|
escape_apo(Str, ReplaceWith) ->
|
||||||
|
re:replace(Str, <<"'">>, ReplaceWith, [{return, binary}, global]).
|
|
@ -15,8 +15,9 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_plugin_libs_rule).
|
-module(emqx_plugin_libs_rule).
|
||||||
|
-elvis([{elvis_style, god_modules, disable}]).
|
||||||
|
|
||||||
%% preprocess and process tempalte string with place holders
|
%% preprocess and process template string with place holders
|
||||||
-export([ preproc_tmpl/1
|
-export([ preproc_tmpl/1
|
||||||
, proc_tmpl/2
|
, proc_tmpl/2
|
||||||
, proc_tmpl/3
|
, proc_tmpl/3
|
||||||
|
@ -76,107 +77,49 @@
|
||||||
%% preprocess template string with place holders
|
%% preprocess template string with place holders
|
||||||
-spec(preproc_tmpl(binary()) -> tmpl_token()).
|
-spec(preproc_tmpl(binary()) -> tmpl_token()).
|
||||||
preproc_tmpl(Str) ->
|
preproc_tmpl(Str) ->
|
||||||
Tokens = re:split(Str, ?EX_PLACE_HOLDER, [{return,binary},group,trim]),
|
emqx_placeholder:preproc_tmpl(Str).
|
||||||
preproc_tmpl(Tokens, []).
|
|
||||||
|
|
||||||
preproc_tmpl([], Acc) ->
|
|
||||||
lists:reverse(Acc);
|
|
||||||
preproc_tmpl([[Str, Phld]| Tokens], Acc) ->
|
|
||||||
preproc_tmpl(Tokens,
|
|
||||||
put_head(var, parse_nested(unwrap(Phld)),
|
|
||||||
put_head(str, Str, Acc)));
|
|
||||||
preproc_tmpl([[Str]| Tokens], Acc) ->
|
|
||||||
preproc_tmpl(Tokens, put_head(str, Str, Acc)).
|
|
||||||
|
|
||||||
put_head(_Type, <<>>, List) -> List;
|
|
||||||
put_head(Type, Term, List) ->
|
|
||||||
[{Type, Term} | List].
|
|
||||||
|
|
||||||
-spec(proc_tmpl(tmpl_token(), map()) -> binary()).
|
-spec(proc_tmpl(tmpl_token(), map()) -> binary()).
|
||||||
proc_tmpl(Tokens, Data) ->
|
proc_tmpl(Tokens, Data) ->
|
||||||
proc_tmpl(Tokens, Data, #{return => full_binary}).
|
emqx_placeholder:proc_tmpl(Tokens, Data).
|
||||||
|
|
||||||
-spec(proc_tmpl(tmpl_token(), map(), map()) -> binary() | list()).
|
-spec(proc_tmpl(tmpl_token(), map(), map()) -> binary() | list()).
|
||||||
proc_tmpl(Tokens, Data, Opts = #{return := full_binary}) ->
|
proc_tmpl(Tokens, Data, Opts) ->
|
||||||
Trans = maps:get(var_trans, Opts, fun bin/1),
|
emqx_placeholder:proc_tmpl(Tokens, Data, Opts).
|
||||||
list_to_binary(
|
|
||||||
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Trans}));
|
|
||||||
|
|
||||||
proc_tmpl(Tokens, Data, Opts = #{return := rawlist}) ->
|
|
||||||
Trans = maps:get(var_trans, Opts, undefined),
|
|
||||||
lists:map(
|
|
||||||
fun ({str, Str}) -> Str;
|
|
||||||
({var, Phld}) when is_function(Trans) ->
|
|
||||||
Trans(get_phld_var(Phld, Data));
|
|
||||||
({var, Phld}) ->
|
|
||||||
get_phld_var(Phld, Data)
|
|
||||||
end, Tokens).
|
|
||||||
|
|
||||||
|
|
||||||
-spec(preproc_cmd(binary()) -> tmpl_cmd()).
|
-spec(preproc_cmd(binary()) -> tmpl_cmd()).
|
||||||
preproc_cmd(Str) ->
|
preproc_cmd(Str) ->
|
||||||
SubStrList = re:split(Str, ?EX_WITHE_CHARS, [{return,binary},trim]),
|
emqx_placeholder:preproc_cmd(Str).
|
||||||
[preproc_tmpl(SubStr) || SubStr <- SubStrList].
|
|
||||||
|
|
||||||
-spec(proc_cmd([tmpl_token()], map()) -> binary() | list()).
|
-spec(proc_cmd([tmpl_token()], map()) -> binary() | list()).
|
||||||
proc_cmd(Tokens, Data) ->
|
proc_cmd(Tokens, Data) ->
|
||||||
proc_cmd(Tokens, Data, #{return => full_binary}).
|
emqx_placeholder:proc_cmd(Tokens, Data).
|
||||||
-spec(proc_cmd([tmpl_token()], map(), map()) -> list()).
|
-spec(proc_cmd([tmpl_token()], map(), map()) -> list()).
|
||||||
proc_cmd(Tokens, Data, Opts) ->
|
proc_cmd(Tokens, Data, Opts) ->
|
||||||
[proc_tmpl(Tks, Data, Opts) || Tks <- Tokens].
|
emqx_placeholder:proc_cmd(Tokens, Data, Opts).
|
||||||
|
|
||||||
%% preprocess SQL with place holders
|
%% preprocess SQL with place holders
|
||||||
-spec(preproc_sql(Sql::binary()) -> {prepare_statement_key(), tmpl_token()}).
|
-spec(preproc_sql(Sql::binary()) -> {prepare_statement_key(), tmpl_token()}).
|
||||||
preproc_sql(Sql) ->
|
preproc_sql(Sql) ->
|
||||||
preproc_sql(Sql, '?').
|
emqx_placeholder:preproc_sql(Sql).
|
||||||
|
|
||||||
-spec(preproc_sql(Sql::binary(), ReplaceWith :: '?' | '$n')
|
-spec(preproc_sql(Sql::binary(), ReplaceWith :: '?' | '$n')
|
||||||
-> {prepare_statement_key(), tmpl_token()}).
|
-> {prepare_statement_key(), tmpl_token()}).
|
||||||
|
|
||||||
preproc_sql(Sql, ReplaceWith) ->
|
preproc_sql(Sql, ReplaceWith) ->
|
||||||
case re:run(Sql, ?EX_PLACE_HOLDER, [{capture, all_but_first, binary}, global]) of
|
emqx_placeholder:preproc_sql(Sql, ReplaceWith).
|
||||||
{match, PlaceHolders} ->
|
|
||||||
PhKs = [parse_nested(unwrap(Phld)) || [Phld | _] <- PlaceHolders],
|
|
||||||
{replace_with(Sql, ReplaceWith), [{var, Phld} || Phld <- PhKs]};
|
|
||||||
nomatch ->
|
|
||||||
{Sql, []}
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec(proc_sql(tmpl_token(), map()) -> list()).
|
-spec(proc_sql(tmpl_token(), map()) -> list()).
|
||||||
proc_sql(Tokens, Data) ->
|
proc_sql(Tokens, Data) ->
|
||||||
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => fun sql_data/1}).
|
emqx_placeholder:proc_sql(Tokens, Data).
|
||||||
|
|
||||||
-spec(proc_sql_param_str(tmpl_token(), map()) -> binary()).
|
-spec(proc_sql_param_str(tmpl_token(), map()) -> binary()).
|
||||||
proc_sql_param_str(Tokens, Data) ->
|
proc_sql_param_str(Tokens, Data) ->
|
||||||
proc_param_str(Tokens, Data, fun quote_sql/1).
|
emqx_placeholder:proc_sql_param_str(Tokens, Data).
|
||||||
|
|
||||||
-spec(proc_cql_param_str(tmpl_token(), map()) -> binary()).
|
-spec(proc_cql_param_str(tmpl_token(), map()) -> binary()).
|
||||||
proc_cql_param_str(Tokens, Data) ->
|
proc_cql_param_str(Tokens, Data) ->
|
||||||
proc_param_str(Tokens, Data, fun quote_cql/1).
|
emqx_placeholder:proc_cql_param_str(Tokens, Data).
|
||||||
|
|
||||||
proc_param_str(Tokens, Data, Quote) ->
|
|
||||||
iolist_to_binary(
|
|
||||||
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})).
|
|
||||||
|
|
||||||
%% backward compatibility for hot upgrading from =< e4.2.1
|
|
||||||
get_phld_var(Fun, Data) when is_function(Fun) ->
|
|
||||||
Fun(Data);
|
|
||||||
get_phld_var(Phld, Data) ->
|
|
||||||
emqx_rule_maps:nested_get(Phld, Data).
|
|
||||||
|
|
||||||
replace_with(Tmpl, '?') ->
|
|
||||||
re:replace(Tmpl, ?EX_PLACE_HOLDER, "?", [{return, binary}, global]);
|
|
||||||
replace_with(Tmpl, '$n') ->
|
|
||||||
Parts = re:split(Tmpl, ?EX_PLACE_HOLDER, [{return, binary}, trim, group]),
|
|
||||||
{Res, _} =
|
|
||||||
lists:foldl(
|
|
||||||
fun([Tkn, _Phld], {Acc, Seq}) ->
|
|
||||||
Seq1 = erlang:integer_to_binary(Seq),
|
|
||||||
{<<Acc/binary, Tkn/binary, "$", Seq1/binary>>, Seq + 1};
|
|
||||||
([Tkn], {Acc, Seq}) ->
|
|
||||||
{<<Acc/binary, Tkn/binary>>, Seq}
|
|
||||||
end, {<<>>, 1}, Parts),
|
|
||||||
Res.
|
|
||||||
|
|
||||||
unsafe_atom_key(Key) when is_atom(Key) ->
|
unsafe_atom_key(Key) when is_atom(Key) ->
|
||||||
Key;
|
Key;
|
||||||
|
@ -227,35 +170,6 @@ tcp_connectivity(Host, Port, Timeout) ->
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
unwrap(<<"${", Val/binary>>) ->
|
|
||||||
binary:part(Val, {0, byte_size(Val)-1}).
|
|
||||||
|
|
||||||
sql_data(undefined) -> null;
|
|
||||||
sql_data(List) when is_list(List) -> List;
|
|
||||||
sql_data(Bin) when is_binary(Bin) -> Bin;
|
|
||||||
sql_data(Num) when is_number(Num) -> Num;
|
|
||||||
sql_data(Bool) when is_boolean(Bool) -> Bool;
|
|
||||||
sql_data(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
|
|
||||||
sql_data(Map) when is_map(Map) -> emqx_json:encode(Map).
|
|
||||||
|
|
||||||
quote_sql(Str) ->
|
|
||||||
quote(Str, <<"\\\\'">>).
|
|
||||||
|
|
||||||
quote_cql(Str) ->
|
|
||||||
quote(Str, <<"''">>).
|
|
||||||
|
|
||||||
quote(Str, ReplaceWith) when
|
|
||||||
is_list(Str);
|
|
||||||
is_binary(Str);
|
|
||||||
is_atom(Str);
|
|
||||||
is_map(Str) ->
|
|
||||||
[$', escape_apo(bin(Str), ReplaceWith), $'];
|
|
||||||
quote(Val, _) ->
|
|
||||||
bin(Val).
|
|
||||||
|
|
||||||
escape_apo(Str, ReplaceWith) ->
|
|
||||||
re:replace(Str, <<"'">>, ReplaceWith, [{return, binary}, global]).
|
|
||||||
|
|
||||||
str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
|
str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
|
||||||
str(Num) when is_number(Num) -> number_to_list(Num);
|
str(Num) when is_number(Num) -> number_to_list(Num);
|
||||||
str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
|
str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
|
||||||
|
@ -345,12 +259,6 @@ number_to_list(Int) when is_integer(Int) ->
|
||||||
number_to_list(Float) when is_float(Float) ->
|
number_to_list(Float) when is_float(Float) ->
|
||||||
float_to_list(Float, [{decimals, 10}, compact]).
|
float_to_list(Float, [{decimals, 10}, compact]).
|
||||||
|
|
||||||
parse_nested(Attr) ->
|
|
||||||
case string:split(Attr, <<".">>, all) of
|
|
||||||
[Attr] -> {var, Attr};
|
|
||||||
Nested -> {path, [{key, P} || P <- Nested]}
|
|
||||||
end.
|
|
||||||
|
|
||||||
now_ms() ->
|
now_ms() ->
|
||||||
erlang:system_time(millisecond).
|
erlang:system_time(millisecond).
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,91 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_placeholder_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
|
||||||
|
t_proc_tmpl(_) ->
|
||||||
|
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
|
||||||
|
Tks = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
|
||||||
|
?assertEqual(<<"a:1,b:1,c:1.0,d:{\"d1\":\"hi\"}">>,
|
||||||
|
emqx_placeholder:proc_tmpl(Tks, Selected)).
|
||||||
|
|
||||||
|
t_proc_tmpl1(_) ->
|
||||||
|
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
|
||||||
|
Tks = emqx_placeholder:preproc_tmpl(<<"a:$a,b:b},c:{c},d:${d">>),
|
||||||
|
?assertEqual(<<"a:$a,b:b},c:{c},d:${d">>,
|
||||||
|
emqx_placeholder:proc_tmpl(Tks, Selected)).
|
||||||
|
|
||||||
|
t_proc_cmd(_) ->
|
||||||
|
Selected = #{v0 => <<"x">>, v1 => <<"1">>, v2 => #{d1 => <<"hi">>}},
|
||||||
|
Tks = emqx_placeholder:preproc_cmd(<<"hset name a:${v0} ${v1} b ${v2} ">>),
|
||||||
|
?assertEqual([<<"hset">>, <<"name">>,
|
||||||
|
<<"a:x">>, <<"1">>,
|
||||||
|
<<"b">>, <<"{\"d1\":\"hi\"}">>],
|
||||||
|
emqx_placeholder:proc_cmd(Tks, Selected)).
|
||||||
|
|
||||||
|
t_preproc_sql(_) ->
|
||||||
|
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
|
||||||
|
{PrepareStatement, ParamsTokens} =
|
||||||
|
emqx_placeholder:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '?'),
|
||||||
|
?assertEqual(<<"a:?,b:?,c:?,d:?">>, PrepareStatement),
|
||||||
|
?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>],
|
||||||
|
emqx_placeholder:proc_sql(ParamsTokens, Selected)).
|
||||||
|
|
||||||
|
t_preproc_sql1(_) ->
|
||||||
|
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
|
||||||
|
{PrepareStatement, ParamsTokens} =
|
||||||
|
emqx_placeholder:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '$n'),
|
||||||
|
?assertEqual(<<"a:$1,b:$2,c:$3,d:$4">>, PrepareStatement),
|
||||||
|
?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>],
|
||||||
|
emqx_placeholder:proc_sql(ParamsTokens, Selected)).
|
||||||
|
t_preproc_sql2(_) ->
|
||||||
|
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
|
||||||
|
{PrepareStatement, ParamsTokens} =
|
||||||
|
emqx_placeholder:preproc_sql(<<"a:$a,b:b},c:{c},d:${d">>, '?'),
|
||||||
|
?assertEqual(<<"a:$a,b:b},c:{c},d:${d">>, PrepareStatement),
|
||||||
|
?assertEqual([], emqx_placeholder:proc_sql(ParamsTokens, Selected)).
|
||||||
|
|
||||||
|
t_preproc_sql3(_) ->
|
||||||
|
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
|
||||||
|
ParamsTokens = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
|
||||||
|
?assertEqual(<<"a:'1',b:1,c:1.0,d:'{\"d1\":\"hi\"}'">>,
|
||||||
|
emqx_placeholder:proc_sql_param_str(ParamsTokens, Selected)).
|
||||||
|
|
||||||
|
t_preproc_sql4(_) ->
|
||||||
|
%% with apostrophes
|
||||||
|
%% https://github.com/emqx/emqx/issues/4135
|
||||||
|
Selected = #{a => <<"1''2">>, b => 1, c => 1.0,
|
||||||
|
d => #{d1 => <<"someone's phone">>}},
|
||||||
|
ParamsTokens = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
|
||||||
|
?assertEqual(<<"a:'1\\'\\'2',b:1,c:1.0,d:'{\"d1\":\"someone\\'s phone\"}'">>,
|
||||||
|
emqx_placeholder:proc_sql_param_str(ParamsTokens, Selected)).
|
||||||
|
|
||||||
|
t_preproc_sql5(_) ->
|
||||||
|
%% with apostrophes for cassandra
|
||||||
|
%% https://github.com/emqx/emqx/issues/4148
|
||||||
|
Selected = #{a => <<"1''2">>, b => 1, c => 1.0,
|
||||||
|
d => #{d1 => <<"someone's phone">>}},
|
||||||
|
ParamsTokens = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
|
||||||
|
?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>,
|
||||||
|
emqx_placeholder:proc_cql_param_str(ParamsTokens, Selected)).
|
|
@ -27,9 +27,11 @@ all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
t_http_connectivity(_) ->
|
t_http_connectivity(_) ->
|
||||||
{ok, Socket} = gen_tcp:listen(?PORT, []),
|
{ok, Socket} = gen_tcp:listen(?PORT, []),
|
||||||
ok = emqx_plugin_libs_rule:http_connectivity("http://127.0.0.1:"++emqx_plugin_libs_rule:str(?PORT), 1000),
|
ok = emqx_plugin_libs_rule:http_connectivity(
|
||||||
|
"http://127.0.0.1:"++emqx_plugin_libs_rule:str(?PORT), 1000),
|
||||||
gen_tcp:close(Socket),
|
gen_tcp:close(Socket),
|
||||||
{error, _} = emqx_plugin_libs_rule:http_connectivity("http://127.0.0.1:"++emqx_plugin_libs_rule:str(?PORT), 1000).
|
{error, _} = emqx_plugin_libs_rule:http_connectivity(
|
||||||
|
"http://127.0.0.1:"++emqx_plugin_libs_rule:str(?PORT), 1000).
|
||||||
|
|
||||||
t_tcp_connectivity(_) ->
|
t_tcp_connectivity(_) ->
|
||||||
{ok, Socket} = gen_tcp:listen(?PORT, []),
|
{ok, Socket} = gen_tcp:listen(?PORT, []),
|
||||||
|
@ -68,69 +70,8 @@ t_atom_key(_) ->
|
||||||
|
|
||||||
t_unsafe_atom_key(_) ->
|
t_unsafe_atom_key(_) ->
|
||||||
?assertEqual([xyz876gv], emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv">>])),
|
?assertEqual([xyz876gv], emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv">>])),
|
||||||
?assertEqual([xyz876gv33, port], emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv33">>, port])),
|
?assertEqual([xyz876gv33, port],
|
||||||
?assertEqual([xyz876gv331, port1221], emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv331">>, <<"port1221">>])),
|
emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv33">>, port])),
|
||||||
|
?assertEqual([xyz876gv331, port1221],
|
||||||
|
emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv331">>, <<"port1221">>])),
|
||||||
?assertEqual(xyz876gv3312, emqx_plugin_libs_rule:unsafe_atom_key(<<"xyz876gv3312">>)).
|
?assertEqual(xyz876gv3312, emqx_plugin_libs_rule:unsafe_atom_key(<<"xyz876gv3312">>)).
|
||||||
|
|
||||||
t_proc_tmpl(_) ->
|
|
||||||
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
|
|
||||||
Tks = emqx_plugin_libs_rule:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
|
|
||||||
?assertEqual(<<"a:1,b:1,c:1.0,d:{\"d1\":\"hi\"}">>,
|
|
||||||
emqx_plugin_libs_rule:proc_tmpl(Tks, Selected)).
|
|
||||||
|
|
||||||
t_proc_tmpl1(_) ->
|
|
||||||
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
|
|
||||||
Tks = emqx_plugin_libs_rule:preproc_tmpl(<<"a:$a,b:b},c:{c},d:${d">>),
|
|
||||||
?assertEqual(<<"a:$a,b:b},c:{c},d:${d">>,
|
|
||||||
emqx_plugin_libs_rule:proc_tmpl(Tks, Selected)).
|
|
||||||
|
|
||||||
t_proc_cmd(_) ->
|
|
||||||
Selected = #{v0 => <<"x">>, v1 => <<"1">>, v2 => #{d1 => <<"hi">>}},
|
|
||||||
Tks = emqx_plugin_libs_rule:preproc_cmd(<<"hset name a:${v0} ${v1} b ${v2} ">>),
|
|
||||||
?assertEqual([<<"hset">>, <<"name">>,
|
|
||||||
<<"a:x">>, <<"1">>,
|
|
||||||
<<"b">>, <<"{\"d1\":\"hi\"}">>],
|
|
||||||
emqx_plugin_libs_rule:proc_cmd(Tks, Selected)).
|
|
||||||
|
|
||||||
t_preproc_sql(_) ->
|
|
||||||
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
|
|
||||||
{PrepareStatement, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '?'),
|
|
||||||
?assertEqual(<<"a:?,b:?,c:?,d:?">>, PrepareStatement),
|
|
||||||
?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>],
|
|
||||||
emqx_plugin_libs_rule:proc_sql(ParamsTokens, Selected)).
|
|
||||||
|
|
||||||
t_preproc_sql1(_) ->
|
|
||||||
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
|
|
||||||
{PrepareStatement, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '$n'),
|
|
||||||
?assertEqual(<<"a:$1,b:$2,c:$3,d:$4">>, PrepareStatement),
|
|
||||||
?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>],
|
|
||||||
emqx_plugin_libs_rule:proc_sql(ParamsTokens, Selected)).
|
|
||||||
t_preproc_sql2(_) ->
|
|
||||||
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
|
|
||||||
{PrepareStatement, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(<<"a:$a,b:b},c:{c},d:${d">>, '?'),
|
|
||||||
?assertEqual(<<"a:$a,b:b},c:{c},d:${d">>, PrepareStatement),
|
|
||||||
?assertEqual([], emqx_plugin_libs_rule:proc_sql(ParamsTokens, Selected)).
|
|
||||||
|
|
||||||
t_preproc_sql3(_) ->
|
|
||||||
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
|
|
||||||
ParamsTokens = emqx_plugin_libs_rule:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
|
|
||||||
?assertEqual(<<"a:'1',b:1,c:1.0,d:'{\"d1\":\"hi\"}'">>,
|
|
||||||
emqx_plugin_libs_rule:proc_sql_param_str(ParamsTokens, Selected)).
|
|
||||||
|
|
||||||
t_preproc_sql4(_) ->
|
|
||||||
%% with apostrophes
|
|
||||||
%% https://github.com/emqx/emqx/issues/4135
|
|
||||||
Selected = #{a => <<"1''2">>, b => 1, c => 1.0,
|
|
||||||
d => #{d1 => <<"someone's phone">>}},
|
|
||||||
ParamsTokens = emqx_plugin_libs_rule:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
|
|
||||||
?assertEqual(<<"a:'1\\'\\'2',b:1,c:1.0,d:'{\"d1\":\"someone\\'s phone\"}'">>,
|
|
||||||
emqx_plugin_libs_rule:proc_sql_param_str(ParamsTokens, Selected)).
|
|
||||||
|
|
||||||
t_preproc_sql5(_) ->
|
|
||||||
%% with apostrophes for cassandra
|
|
||||||
%% https://github.com/emqx/emqx/issues/4148
|
|
||||||
Selected = #{a => <<"1''2">>, b => 1, c => 1.0,
|
|
||||||
d => #{d1 => <<"someone's phone">>}},
|
|
||||||
ParamsTokens = emqx_plugin_libs_rule:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
|
|
||||||
?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>,
|
|
||||||
emqx_plugin_libs_rule:proc_cql_param_str(ParamsTokens, Selected)).
|
|
||||||
|
|
Loading…
Reference in New Issue