refactor(pluglib): move `emqx_placeholder` to utils app

Also make user that existing code calls it directly.
This commit is contained in:
Andrew Mayorov 2023-06-07 21:19:52 +03:00
parent 3a8332811a
commit d6c1ee183f
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
32 changed files with 151 additions and 203 deletions

View File

@ -225,21 +225,19 @@ without_password(Credential, [Name | Rest]) ->
without_password(Credential, Rest) without_password(Credential, Rest)
end. end.
urlencode_var({var, _} = Var, Value) ->
emqx_http_lib:uri_encode(handle_var(Var, Value));
urlencode_var(Var, Value) -> urlencode_var(Var, Value) ->
handle_var(Var, Value). emqx_http_lib:uri_encode(handle_var(Var, Value)).
handle_var({var, _Name}, undefined) -> handle_var(_Name, undefined) ->
<<>>; <<>>;
handle_var({var, <<"peerhost">>}, PeerHost) -> handle_var([<<"peerhost">>], PeerHost) ->
emqx_placeholder:bin(inet:ntoa(PeerHost)); emqx_placeholder:bin(inet:ntoa(PeerHost));
handle_var(_, Value) -> handle_var(_, Value) ->
emqx_placeholder:bin(Value). emqx_placeholder:bin(Value).
handle_sql_var({var, _Name}, undefined) -> handle_sql_var(_Name, undefined) ->
<<>>; <<>>;
handle_sql_var({var, <<"peerhost">>}, PeerHost) -> handle_sql_var([<<"peerhost">>], PeerHost) ->
emqx_placeholder:bin(inet:ntoa(PeerHost)); emqx_placeholder:bin(inet:ntoa(PeerHost));
handle_sql_var(_, Value) -> handle_sql_var(_, Value) ->
emqx_placeholder:sql_data(Value). emqx_placeholder:sql_data(Value).

View File

@ -188,21 +188,19 @@ convert_client_var({dn, DN}) -> {cert_subject, DN};
convert_client_var({protocol, Proto}) -> {proto_name, Proto}; convert_client_var({protocol, Proto}) -> {proto_name, Proto};
convert_client_var(Other) -> Other. convert_client_var(Other) -> Other.
urlencode_var({var, _} = Var, Value) ->
emqx_http_lib:uri_encode(handle_var(Var, Value));
urlencode_var(Var, Value) -> urlencode_var(Var, Value) ->
handle_var(Var, Value). emqx_http_lib:uri_encode(handle_var(Var, Value)).
handle_var({var, _Name}, undefined) -> handle_var(_Name, undefined) ->
<<>>; <<>>;
handle_var({var, <<"peerhost">>}, IpAddr) -> handle_var([<<"peerhost">>], IpAddr) ->
inet_parse:ntoa(IpAddr); inet_parse:ntoa(IpAddr);
handle_var(_Name, Value) -> handle_var(_Name, Value) ->
emqx_placeholder:bin(Value). emqx_placeholder:bin(Value).
handle_sql_var({var, _Name}, undefined) -> handle_sql_var(_Name, undefined) ->
<<>>; <<>>;
handle_sql_var({var, <<"peerhost">>}, IpAddr) -> handle_sql_var([<<"peerhost">>], IpAddr) ->
inet_parse:ntoa(IpAddr); inet_parse:ntoa(IpAddr);
handle_sql_var(_Name, Value) -> handle_sql_var(_Name, Value) ->
emqx_placeholder:sql_data(Value). emqx_placeholder:sql_data(Value).

View File

@ -81,7 +81,7 @@ t_compile(_) ->
{{127, 0, 0, 1}, {127, 0, 0, 1}, 32}, {{127, 0, 0, 1}, {127, 0, 0, 1}, 32},
{{192, 168, 1, 0}, {192, 168, 1, 255}, 24} {{192, 168, 1, 0}, {192, 168, 1, 255}, 24}
]}, ]},
subscribe, [{pattern, [{var, {var, <<"clientid">>}}]}]}, subscribe, [{pattern, [{var, [<<"clientid">>]}]}]},
emqx_authz_rule:compile(?SOURCE3) emqx_authz_rule:compile(?SOURCE3)
), ),
@ -99,14 +99,14 @@ t_compile(_) ->
{clientid, {re_pattern, _, _, _, _}} {clientid, {re_pattern, _, _, _, _}}
]}, ]},
publish, [ publish, [
{pattern, [{var, {var, <<"username">>}}]}, {pattern, [{var, {var, <<"clientid">>}}]} {pattern, [{var, [<<"username">>]}]}, {pattern, [{var, [<<"clientid">>]}]}
]}, ]},
emqx_authz_rule:compile(?SOURCE5) emqx_authz_rule:compile(?SOURCE5)
), ),
?assertEqual( ?assertEqual(
{allow, {username, {eq, <<"test">>}}, publish, [ {allow, {username, {eq, <<"test">>}}, publish, [
{pattern, [{str, <<"t/foo">>}, {var, {var, <<"username">>}}, {str, <<"boo">>}]} {pattern, [{str, <<"t/foo">>}, {var, [<<"username">>]}, {str, <<"boo">>}]}
]}, ]},
emqx_authz_rule:compile(?SOURCE6) emqx_authz_rule:compile(?SOURCE6)
), ),

View File

@ -273,10 +273,10 @@ proc_cql_params(
%% assert %% assert
_PreparedKey = maps:get(PreparedKey0, Prepares), _PreparedKey = maps:get(PreparedKey0, Prepares),
Tokens = maps:get(PreparedKey0, ParamsTokens), Tokens = maps:get(PreparedKey0, ParamsTokens),
{PreparedKey0, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}; {PreparedKey0, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))};
proc_cql_params(query, SQL, Params, _State) -> proc_cql_params(query, SQL, Params, _State) ->
{SQL1, Tokens} = emqx_plugin_libs_rule:preproc_sql(SQL, '?'), {SQL1, Tokens} = emqx_placeholder:preproc_sql(SQL, '?'),
{SQL1, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}. {SQL1, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}.
exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when
Type == query; Type == prepared_query Type == query; Type == prepared_query
@ -403,7 +403,7 @@ parse_prepare_cql(_) ->
#{prepare_cql => #{}, params_tokens => #{}}. #{prepare_cql => #{}, params_tokens => #{}}.
parse_prepare_cql([{Key, H} | T], Prepares, Tokens) -> parse_prepare_cql([{Key, H} | T], Prepares, Tokens) ->
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '?'), {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '?'),
parse_prepare_cql( parse_prepare_cql(
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
); );

View File

@ -193,7 +193,7 @@ prepare_sql_templates(#{
batch_value_separator := Separator batch_value_separator := Separator
}) -> }) ->
InsertTemplate = InsertTemplate =
emqx_plugin_libs_rule:preproc_tmpl(Template), emqx_placeholder:preproc_tmpl(Template),
BulkExtendInsertTemplate = BulkExtendInsertTemplate =
prepare_sql_bulk_extend_template(Template, Separator), prepare_sql_bulk_extend_template(Template, Separator),
#{ #{
@ -210,7 +210,7 @@ prepare_sql_bulk_extend_template(Template, Separator) ->
%% Add separator before ValuesTemplate so that one can append it %% Add separator before ValuesTemplate so that one can append it
%% to an insert template %% to an insert template
ExtendParamTemplate = iolist_to_binary([Separator, ValuesTemplate]), ExtendParamTemplate = iolist_to_binary([Separator, ValuesTemplate]),
emqx_plugin_libs_rule:preproc_tmpl(ExtendParamTemplate). emqx_placeholder:preproc_tmpl(ExtendParamTemplate).
%% This function is similar to emqx_plugin_libs_rule:split_insert_sql/1 but can %% This function is similar to emqx_plugin_libs_rule:split_insert_sql/1 but can
%% also handle Clickhouse's SQL extension for INSERT statments that allows the %% also handle Clickhouse's SQL extension for INSERT statments that allows the
@ -363,7 +363,7 @@ on_query(
transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL). transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL).
get_sql(send_message, #{send_message_template := PreparedSQL}, Data) -> get_sql(send_message, #{send_message_template := PreparedSQL}, Data) ->
emqx_plugin_libs_rule:proc_tmpl(PreparedSQL, Data); emqx_placeholder:proc_tmpl(PreparedSQL, Data);
get_sql(_, _, SQL) -> get_sql(_, _, SQL) ->
SQL. SQL.
@ -421,10 +421,10 @@ objects_to_sql(
} }
) -> ) ->
%% Prepare INSERT-statement and the first row after VALUES %% Prepare INSERT-statement and the first row after VALUES
InsertStatementHead = emqx_plugin_libs_rule:proc_tmpl(InsertTemplate, FirstObject), InsertStatementHead = emqx_placeholder:proc_tmpl(InsertTemplate, FirstObject),
FormatObjectDataFunction = FormatObjectDataFunction =
fun(Object) -> fun(Object) ->
emqx_plugin_libs_rule:proc_tmpl(BulkExtendInsertTemplate, Object) emqx_placeholder:proc_tmpl(BulkExtendInsertTemplate, Object)
end, end,
InsertStatementTail = lists:map(FormatObjectDataFunction, RemainingObjects), InsertStatementTail = lists:map(FormatObjectDataFunction, RemainingObjects),
CompleteStatement = erlang:iolist_to_binary([InsertStatementHead, InsertStatementTail]), CompleteStatement = erlang:iolist_to_binary([InsertStatementHead, InsertStatementTail]),

View File

@ -200,7 +200,7 @@ parse_template(Config) ->
parse_template(maps:to_list(Templates), #{}). parse_template(maps:to_list(Templates), #{}).
parse_template([{Key, H} | T], Templates) -> parse_template([{Key, H} | T], Templates) ->
ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(H), ParamsTks = emqx_placeholder:preproc_tmpl(H),
parse_template( parse_template(
T, T,
Templates#{Key => ParamsTks} Templates#{Key => ParamsTks}

View File

@ -144,7 +144,7 @@ apply_template({Key, Msg} = Req, Templates) ->
undefined -> undefined ->
Req; Req;
Template -> Template ->
{Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)} {Key, emqx_placeholder:proc_tmpl(Template, Msg)}
end; end;
%% now there is no batch delete, so %% now there is no batch delete, so
%% 1. we can simply replace the `send_message` to `put` %% 1. we can simply replace the `send_message` to `put`

View File

@ -40,7 +40,7 @@
connect_timeout := timer:time(), connect_timeout := timer:time(),
jwt_config := emqx_connector_jwt:jwt_config(), jwt_config := emqx_connector_jwt:jwt_config(),
max_retries := non_neg_integer(), max_retries := non_neg_integer(),
payload_template := emqx_plugin_libs_rule:tmpl_token(), payload_template := emqx_placeholder:tmpl_token(),
pool_name := binary(), pool_name := binary(),
project_id := binary(), project_id := binary(),
pubsub_topic := binary(), pubsub_topic := binary(),
@ -104,7 +104,7 @@ on_start(
connect_timeout => ConnectTimeout, connect_timeout => ConnectTimeout,
jwt_config => JWTConfig, jwt_config => JWTConfig,
max_retries => MaxRetries, max_retries => MaxRetries,
payload_template => emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate), payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
pool_name => ResourceId, pool_name => ResourceId,
project_id => ProjectId, project_id => ProjectId,
pubsub_topic => PubSubTopic, pubsub_topic => PubSubTopic,
@ -294,7 +294,7 @@ encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) ->
Interpolated = Interpolated =
case PayloadTemplate of case PayloadTemplate of
[] -> emqx_utils_json:encode(Selected); [] -> emqx_utils_json:encode(Selected);
_ -> emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, Selected) _ -> emqx_placeholder:proc_tmpl(PayloadTemplate, Selected)
end, end,
#{data => base64:encode(Interpolated)}. #{data => base64:encode(Interpolated)}.

View File

@ -436,7 +436,7 @@ to_config([Item0 | Rest], Acc, Precision) ->
Ts0 = maps:get(timestamp, Item0, undefined), Ts0 = maps:get(timestamp, Item0, undefined),
{Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision), {Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision),
Item = #{ Item = #{
measurement => emqx_plugin_libs_rule:preproc_tmpl(maps:get(measurement, Item0)), measurement => emqx_placeholder:preproc_tmpl(maps:get(measurement, Item0)),
timestamp => Ts, timestamp => Ts,
precision => {FromPrecision, ToPrecision}, precision => {FromPrecision, ToPrecision},
tags => to_kv_config(maps:get(tags, Item0)), tags => to_kv_config(maps:get(tags, Item0)),
@ -458,18 +458,18 @@ preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) ->
preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) -> preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) ->
preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision); preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision);
preproc_tmpl_timestamp(<<?DEFAULT_TIMESTAMP_TMPL>> = Ts, Precision) -> preproc_tmpl_timestamp(<<?DEFAULT_TIMESTAMP_TMPL>> = Ts, Precision) ->
{emqx_plugin_libs_rule:preproc_tmpl(Ts), ms, Precision}; {emqx_placeholder:preproc_tmpl(Ts), ms, Precision};
preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) -> preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) ->
%% a placehold is in use. e.g. ${payload.my_timestamp} %% a placehold is in use. e.g. ${payload.my_timestamp}
%% we can only hope it the value will be of the same precision in the configs %% we can only hope it the value will be of the same precision in the configs
{emqx_plugin_libs_rule:preproc_tmpl(Ts), Precision, Precision}. {emqx_placeholder:preproc_tmpl(Ts), Precision, Precision}.
to_kv_config(KVfields) -> to_kv_config(KVfields) ->
maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)). maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)).
to_maps_config(K, V, Res) -> to_maps_config(K, V, Res) ->
NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)), NK = emqx_placeholder:preproc_tmpl(bin(K)),
NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)), NV = emqx_placeholder:preproc_tmpl(bin(V)),
Res#{NK => NV}. Res#{NK => NV}.
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
@ -505,7 +505,7 @@ parse_batch_data(InstId, BatchData, SyntaxLines) ->
fields := [{binary(), binary()}], fields := [{binary(), binary()}],
measurement := binary(), measurement := binary(),
tags := [{binary(), binary()}], tags := [{binary(), binary()}],
timestamp := emqx_plugin_libs_rule:tmpl_token() | integer(), timestamp := emqx_placeholder:tmpl_token() | integer(),
precision := {From :: ts_precision(), To :: ts_precision()} precision := {From :: ts_precision(), To :: ts_precision()}
} }
]) -> {ok, [map()]} | {error, term()}. ]) -> {ok, [map()]} | {error, term()}.
@ -526,7 +526,7 @@ lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, Error
is_list(Ts) is_list(Ts)
-> ->
TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
case parse_timestamp(emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions)) of case parse_timestamp(emqx_placeholder:proc_tmpl(Ts, Data, TransOptions)) of
{ok, TsInt} -> {ok, TsInt} ->
Item1 = Item#{timestamp => TsInt}, Item1 = Item#{timestamp => TsInt},
continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc); continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc);
@ -573,7 +573,7 @@ line_to_point(
{_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags),
{_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields),
maps:without([precision], Item#{ maps:without([precision], Item#{
measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data), measurement => emqx_placeholder:proc_tmpl(Measurement, Data),
tags => EncodedTags, tags => EncodedTags,
fields => EncodedFields, fields => EncodedFields,
timestamp => maybe_convert_time_unit(Ts, Precision) timestamp => maybe_convert_time_unit(Ts, Precision)
@ -590,8 +590,8 @@ time_unit(ns) -> nanosecond.
maps_config_to_data(K, V, {Data, Res}) -> maps_config_to_data(K, V, {Data, Res}) ->
KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},
VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
NK0 = emqx_plugin_libs_rule:proc_tmpl(K, Data, KTransOptions), NK0 = emqx_placeholder:proc_tmpl(K, Data, KTransOptions),
NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, VTransOptions), NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions),
case {NK0, NV} of case {NK0, NV} of
{[undefined], _} -> {[undefined], _} ->
{Data, Res}; {Data, Res};

View File

@ -185,7 +185,7 @@ preproc_data(
timestamp => maybe_preproc_tmpl( timestamp => maybe_preproc_tmpl(
maps:get(<<"timestamp">>, Data, <<"now">>) maps:get(<<"timestamp">>, Data, <<"now">>)
), ),
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), measurement => emqx_placeholder:preproc_tmpl(Measurement),
data_type => DataType, data_type => DataType,
value => maybe_preproc_tmpl(Value) value => maybe_preproc_tmpl(Value)
} }
@ -203,7 +203,7 @@ preproc_data(_NoMatch, Acc) ->
Acc. Acc.
maybe_preproc_tmpl(Value) when is_binary(Value) -> maybe_preproc_tmpl(Value) when is_binary(Value) ->
emqx_plugin_libs_rule:preproc_tmpl(Value); emqx_placeholder:preproc_tmpl(Value);
maybe_preproc_tmpl(Value) -> maybe_preproc_tmpl(Value) ->
Value. Value.
@ -225,7 +225,7 @@ proc_data(PreProcessedData, Msg) ->
) -> ) ->
#{ #{
timestamp => iot_timestamp(TimestampTkn, Msg, Nows), timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Msg), measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
data_type => DataType, data_type => DataType,
value => proc_value(DataType, ValueTkn, Msg) value => proc_value(DataType, ValueTkn, Msg)
} }
@ -236,7 +236,7 @@ proc_data(PreProcessedData, Msg) ->
iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) -> iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
Timestamp; Timestamp;
iot_timestamp(TimestampTkn, Msg, Nows) -> iot_timestamp(TimestampTkn, Msg, Nows) ->
iot_timestamp(emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows). iot_timestamp(emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
iot_timestamp(Timestamp, #{now_ms := NowMs}) when iot_timestamp(Timestamp, #{now_ms := NowMs}) when
Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>> Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
@ -250,7 +250,7 @@ iot_timestamp(Timestamp, _) when is_binary(Timestamp) ->
binary_to_integer(Timestamp). binary_to_integer(Timestamp).
proc_value(<<"TEXT">>, ValueTkn, Msg) -> proc_value(<<"TEXT">>, ValueTkn, Msg) ->
case emqx_plugin_libs_rule:proc_tmpl(ValueTkn, Msg) of case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of
<<"undefined">> -> null; <<"undefined">> -> null;
Val -> Val Val -> Val
end; end;
@ -262,7 +262,7 @@ proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> ->
convert_float(replace_var(ValueTkn, Msg)). convert_float(replace_var(ValueTkn, Msg)).
replace_var(Tokens, Data) when is_list(Tokens) -> replace_var(Tokens, Data) when is_list(Tokens) ->
[Val] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), [Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
Val; Val;
replace_var(Val, _Data) -> replace_var(Val, _Data) ->
Val. Val.
@ -410,8 +410,8 @@ device_id(Message, Payloads, State) ->
%% [FIXME] there could be conflicting device-ids in the Payloads %% [FIXME] there could be conflicting device-ids in the Payloads
maps:get(<<"device_id">>, hd(Payloads), undefined); maps:get(<<"device_id">>, hd(Payloads), undefined);
DeviceId -> DeviceId ->
DeviceIdTkn = emqx_plugin_libs_rule:preproc_tmpl(DeviceId), DeviceIdTkn = emqx_placeholder:preproc_tmpl(DeviceId),
emqx_plugin_libs_rule:proc_tmpl(DeviceIdTkn, Message) emqx_placeholder:proc_tmpl(DeviceIdTkn, Message)
end. end.
handle_response({ok, 200, _Headers, Body} = Resp) -> handle_response({ok, 200, _Headers, Body} = Resp) ->

View File

@ -68,7 +68,7 @@
resource_id := resource_id(), resource_id := resource_id(),
topic_mapping := #{ topic_mapping := #{
kafka_topic() := #{ kafka_topic() := #{
payload_template := emqx_plugin_libs_rule:tmpl_token(), payload_template := emqx_placeholder:tmpl_token(),
mqtt_topic => emqx_types:topic(), mqtt_topic => emqx_types:topic(),
qos => emqx_types:qos() qos => emqx_types:qos()
} }
@ -82,7 +82,7 @@
resource_id := resource_id(), resource_id := resource_id(),
topic_mapping := #{ topic_mapping := #{
kafka_topic() := #{ kafka_topic() := #{
payload_template := emqx_plugin_libs_rule:tmpl_token(), payload_template := emqx_placeholder:tmpl_token(),
mqtt_topic => emqx_types:topic(), mqtt_topic => emqx_types:topic(),
qos => emqx_types:qos() qos => emqx_types:qos()
} }
@ -536,7 +536,7 @@ convert_topic_mapping(TopicMappingList) ->
qos := QoS, qos := QoS,
payload_template := PayloadTemplate0 payload_template := PayloadTemplate0
} = Fields, } = Fields,
PayloadTemplate = emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate0), PayloadTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate0),
Acc#{ Acc#{
KafkaTopic => #{ KafkaTopic => #{
payload_template => PayloadTemplate, payload_template => PayloadTemplate,
@ -559,7 +559,7 @@ render(FullMessage, PayloadTemplate) ->
emqx_plugin_libs_rule:bin(X) emqx_plugin_libs_rule:bin(X)
end end
}, },
emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, FullMessage, Opts). emqx_placeholder:proc_tmpl(PayloadTemplate, FullMessage, Opts).
encode(Value, none) -> encode(Value, none) ->
Value; Value;

View File

@ -240,7 +240,7 @@ compile_message_template(T) ->
}. }.
preproc_tmpl(Tmpl) -> preproc_tmpl(Tmpl) ->
emqx_plugin_libs_rule:preproc_tmpl(Tmpl). emqx_placeholder:preproc_tmpl(Tmpl).
render_message( render_message(
#{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message
@ -259,7 +259,7 @@ render(Template, Message) ->
end, end,
return => full_binary return => full_binary
}, },
emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts). emqx_placeholder:proc_tmpl(Template, Message, Opts).
render_timestamp(Template, Message) -> render_timestamp(Template, Message) ->
try try

View File

@ -21,7 +21,7 @@
-export_type([msgvars/0]). -export_type([msgvars/0]).
-type template() :: emqx_plugin_libs_rule:tmpl_token(). -type template() :: emqx_placeholder:tmpl_token().
-type msgvars() :: #{ -type msgvars() :: #{
topic => template(), topic => template(),
@ -48,7 +48,7 @@ parse(Conf) ->
parse_field(Key, Conf, Acc) -> parse_field(Key, Conf, Acc) ->
case Conf of case Conf of
#{Key := Val} when is_binary(Val) -> #{Key := Val} when is_binary(Val) ->
Acc#{Key => emqx_plugin_libs_rule:preproc_tmpl(Val)}; Acc#{Key => emqx_placeholder:preproc_tmpl(Val)};
#{Key := Val} -> #{Key := Val} ->
Acc#{Key => Val}; Acc#{Key => Val};
#{} -> #{} ->

View File

@ -34,8 +34,8 @@
value := binary() value := binary()
}. }.
-type message_template() :: #{ -type message_template() :: #{
key := emqx_plugin_libs_rule:tmpl_token(), key := emqx_placeholder:tmpl_token(),
value := emqx_plugin_libs_rule:tmpl_token() value := emqx_placeholder:tmpl_token()
}. }.
-type config() :: #{ -type config() :: #{
authentication := _, authentication := _,
@ -421,7 +421,7 @@ compile_message_template(TemplateOpts) ->
}. }.
preproc_tmpl(Template) -> preproc_tmpl(Template) ->
emqx_plugin_libs_rule:preproc_tmpl(Template). emqx_placeholder:preproc_tmpl(Template).
render_message( render_message(
Message, #{key := KeyTemplate, value := ValueTemplate} Message, #{key := KeyTemplate, value := ValueTemplate}
@ -439,7 +439,7 @@ render(Message, Template) ->
end, end,
return => full_binary return => full_binary
}, },
emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts). emqx_placeholder:proc_tmpl(Template, Message, Opts).
get_producer_status(Producers) -> get_producer_status(Producers) ->
case pulsar_producers:all_connected(Producers) of case pulsar_producers:all_connected(Producers) of

View File

@ -225,7 +225,7 @@ on_start(
{pool_size, PoolSize}, {pool_size, PoolSize},
{pool, InstanceID} {pool, InstanceID}
], ],
ProcessedTemplate = emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate), ProcessedTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate),
State = #{ State = #{
poolname => InstanceID, poolname => InstanceID,
processed_payload_template => ProcessedTemplate, processed_payload_template => ProcessedTemplate,
@ -547,7 +547,7 @@ is_send_message_atom(_) ->
format_data([], Msg) -> format_data([], Msg) ->
emqx_utils_json:encode(Msg); emqx_utils_json:encode(Msg);
format_data(Tokens, Msg) -> format_data(Tokens, Msg) ->
emqx_plugin_libs_rule:proc_tmpl(Tokens, Msg). emqx_placeholder:proc_tmpl(Tokens, Msg).
handle_result({error, ecpool_empty}) -> handle_result({error, ecpool_empty}) ->
{error, {recoverable_error, ecpool_empty}}; {error, {recoverable_error, ecpool_empty}};

View File

@ -102,7 +102,7 @@ on_start(
emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS) emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS)
), ),
ClientId = client_id(InstanceId), ClientId = client_id(InstanceId),
TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic), TopicTks = emqx_placeholder:preproc_tmpl(Topic),
#{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config), #{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config),
ClientCfg = #{acl_info => AclInfo}, ClientCfg = #{acl_info => AclInfo},
Templates = parse_template(Config), Templates = parse_template(Config),
@ -240,7 +240,7 @@ parse_template(Config) ->
parse_template(maps:to_list(Templates), #{}). parse_template(maps:to_list(Templates), #{}).
parse_template([{Key, H} | T], Templates) -> parse_template([{Key, H} | T], Templates) ->
ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(H), ParamsTks = emqx_placeholder:preproc_tmpl(H),
parse_template( parse_template(
T, T,
Templates#{Key => ParamsTks} Templates#{Key => ParamsTks}
@ -249,7 +249,7 @@ parse_template([], Templates) ->
Templates. Templates.
get_topic_key({_, Msg}, TopicTks) -> get_topic_key({_, Msg}, TopicTks) ->
emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg); emqx_placeholder:proc_tmpl(TopicTks, Msg);
get_topic_key([Query | _], TopicTks) -> get_topic_key([Query | _], TopicTks) ->
get_topic_key(Query, TopicTks). get_topic_key(Query, TopicTks).
@ -258,14 +258,14 @@ apply_template({Key, Msg} = _Req, Templates) ->
undefined -> undefined ->
emqx_utils_json:encode(Msg); emqx_utils_json:encode(Msg);
Template -> Template ->
emqx_plugin_libs_rule:proc_tmpl(Template, Msg) emqx_placeholder:proc_tmpl(Template, Msg)
end; end;
apply_template([{Key, _} | _] = Reqs, Templates) -> apply_template([{Key, _} | _] = Reqs, Templates) ->
case maps:get(Key, Templates, undefined) of case maps:get(Key, Templates, undefined) of
undefined -> undefined ->
[emqx_utils_json:encode(Msg) || {_, Msg} <- Reqs]; [emqx_utils_json:encode(Msg) || {_, Msg} <- Reqs];
Template -> Template ->
[emqx_plugin_libs_rule:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs] [emqx_placeholder:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs]
end. end.
client_id(ResourceId) -> client_id(ResourceId) ->

View File

@ -455,9 +455,7 @@ parse_sql_template([{Key, H} | T], BatchInsertTks) ->
Key => Key =>
#{ #{
?BATCH_INSERT_PART => InsertSQL, ?BATCH_INSERT_PART => InsertSQL,
?BATCH_PARAMS_TOKENS => emqx_plugin_libs_rule:preproc_tmpl( ?BATCH_PARAMS_TOKENS => emqx_placeholder:preproc_tmpl(Params)
Params
)
} }
} }
); );
@ -478,7 +476,7 @@ parse_sql_template([], BatchInsertTks) ->
apply_template( apply_template(
{?ACTION_SEND_MESSAGE = _Key, _Msg} = Query, Templates {?ACTION_SEND_MESSAGE = _Key, _Msg} = Query, Templates
) -> ) ->
%% TODO: fix emqx_plugin_libs_rule:proc_tmpl/2 %% TODO: fix emqx_placeholder:proc_tmpl/2
%% it won't add single quotes for string %% it won't add single quotes for string
apply_template([Query], Templates); apply_template([Query], Templates);
%% batch inserts %% batch inserts

View File

@ -126,8 +126,8 @@ on_query(InstanceId, {query, SQL}, State) ->
do_query(InstanceId, SQL, State); do_query(InstanceId, SQL, State);
on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) -> on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) ->
case maps:find(Key, InsertTksMap) of case maps:find(Key, InsertTksMap) of
{ok, Tokens} -> {ok, Tokens} when is_map(Data) ->
SQL = emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Data), SQL = emqx_placeholder:proc_sql_param_str(Tokens, Data),
do_query(InstanceId, SQL, State); do_query(InstanceId, SQL, State);
_ -> _ ->
{error, {unrecoverable_error, invalid_request}} {error, {unrecoverable_error, invalid_request}}
@ -136,7 +136,7 @@ on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) ->
%% aggregate the batch queries to one SQL is a heavy job, we should put it in the worker process %% aggregate the batch queries to one SQL is a heavy job, we should put it in the worker process
on_batch_query( on_batch_query(
InstanceId, InstanceId,
[{Key, _} | _] = BatchReq, [{Key, _Data = #{}} | _] = BatchReq,
#{batch_tokens := BatchTksMap, query_opts := Opts} = State #{batch_tokens := BatchTksMap, query_opts := Opts} = State
) -> ) ->
case maps:find(Key, BatchTksMap) of case maps:find(Key, BatchTksMap) of
@ -231,8 +231,8 @@ do_batch_insert(Conn, Tokens, BatchReqs, Opts) ->
aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) -> aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) ->
lists:foldl( lists:foldl(
fun({_, Data}, Acc) -> fun({_, Data}, Acc) ->
InsertPart = emqx_plugin_libs_rule:proc_sql_param_str(InsertPartTks, Data), InsertPart = emqx_placeholder:proc_sql_param_str(InsertPartTks, Data),
ParamsPart = emqx_plugin_libs_rule:proc_sql_param_str(ParamsPartTks, Data), ParamsPart = emqx_placeholder:proc_sql_param_str(ParamsPartTks, Data),
Values = maps:get(InsertPart, Acc, []), Values = maps:get(InsertPart, Acc, []),
maps:put(InsertPart, [ParamsPart | Values], Acc) maps:put(InsertPart, [ParamsPart | Values], Acc)
end, end,
@ -260,12 +260,12 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) ->
{ok, select} -> {ok, select} ->
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap); parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap);
{ok, insert} -> {ok, insert} ->
InsertTks = emqx_plugin_libs_rule:preproc_tmpl(H), InsertTks = emqx_placeholder:preproc_tmpl(H),
H1 = string:trim(H, trailing, ";"), H1 = string:trim(H, trailing, ";"),
case split_insert_sql(H1) of case split_insert_sql(H1) of
[_InsertStr, InsertPart, _ValuesStr, ParamsPart] -> [_InsertStr, InsertPart, _ValuesStr, ParamsPart] ->
InsertPartTks = emqx_plugin_libs_rule:preproc_tmpl(InsertPart), InsertPartTks = emqx_placeholder:preproc_tmpl(InsertPart),
ParamsPartTks = emqx_plugin_libs_rule:preproc_tmpl(ParamsPart), ParamsPartTks = emqx_placeholder:preproc_tmpl(ParamsPart),
parse_batch_prepare_sql( parse_batch_prepare_sql(
T, T,
InsertTksMap#{Key => InsertTks}, InsertTksMap#{Key => InsertTks},

View File

@ -496,7 +496,7 @@ t_simple_sql_query(Config) ->
), ),
case EnableBatch of case EnableBatch of
true -> true ->
?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result); ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
false -> false ->
?assertMatch({ok, #{<<"code">> := 0, <<"data">> := [[1]]}}, Result) ?assertMatch({ok, #{<<"code">> := 0, <<"data">> := [[1]]}}, Result)
end, end,
@ -535,7 +535,7 @@ t_bad_sql_parameter(Config) ->
2_000 2_000
), ),
?assertMatch({error, #{<<"code">> := _}}, Result), ?assertMatch({error, {unrecoverable_error, invalid_request}}, Result),
ok. ok.
t_nasty_sql_string(Config) -> t_nasty_sql_string(Config) ->

View File

@ -464,8 +464,8 @@ preprocess_request(
} = Req } = Req
) -> ) ->
#{ #{
method => emqx_plugin_libs_rule:preproc_tmpl(to_bin(Method)), method => emqx_placeholder:preproc_tmpl(to_bin(Method)),
path => emqx_plugin_libs_rule:preproc_tmpl(Path), path => emqx_placeholder:preproc_tmpl(Path),
body => maybe_preproc_tmpl(body, Req), body => maybe_preproc_tmpl(body, Req),
headers => wrap_auth_header(preproc_headers(Headers)), headers => wrap_auth_header(preproc_headers(Headers)),
request_timeout => maps:get(request_timeout, Req, ?DEFAULT_REQUEST_TIMEOUT_MS), request_timeout => maps:get(request_timeout, Req, ?DEFAULT_REQUEST_TIMEOUT_MS),
@ -477,8 +477,8 @@ preproc_headers(Headers) when is_map(Headers) ->
fun(K, V, Acc) -> fun(K, V, Acc) ->
[ [
{ {
emqx_plugin_libs_rule:preproc_tmpl(to_bin(K)), emqx_placeholder:preproc_tmpl(to_bin(K)),
emqx_plugin_libs_rule:preproc_tmpl(to_bin(V)) emqx_placeholder:preproc_tmpl(to_bin(V))
} }
| Acc | Acc
] ]
@ -490,8 +490,8 @@ preproc_headers(Headers) when is_list(Headers) ->
lists:map( lists:map(
fun({K, V}) -> fun({K, V}) ->
{ {
emqx_plugin_libs_rule:preproc_tmpl(to_bin(K)), emqx_placeholder:preproc_tmpl(to_bin(K)),
emqx_plugin_libs_rule:preproc_tmpl(to_bin(V)) emqx_placeholder:preproc_tmpl(to_bin(V))
} }
end, end,
Headers Headers
@ -530,7 +530,7 @@ try_bin_to_lower(Bin) ->
maybe_preproc_tmpl(Key, Conf) -> maybe_preproc_tmpl(Key, Conf) ->
case maps:get(Key, Conf, undefined) of case maps:get(Key, Conf, undefined) of
undefined -> undefined; undefined -> undefined;
Val -> emqx_plugin_libs_rule:preproc_tmpl(Val) Val -> emqx_placeholder:preproc_tmpl(Val)
end. end.
process_request( process_request(
@ -544,8 +544,8 @@ process_request(
Msg Msg
) -> ) ->
Conf#{ Conf#{
method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg)), method => make_method(emqx_placeholder:proc_tmpl(MethodTks, Msg)),
path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg), path => emqx_placeholder:proc_tmpl(PathTks, Msg),
body => process_request_body(BodyTks, Msg), body => process_request_body(BodyTks, Msg),
headers => proc_headers(HeadersTks, Msg), headers => proc_headers(HeadersTks, Msg),
request_timeout => ReqTimeout request_timeout => ReqTimeout
@ -554,14 +554,14 @@ process_request(
process_request_body(undefined, Msg) -> process_request_body(undefined, Msg) ->
emqx_utils_json:encode(Msg); emqx_utils_json:encode(Msg);
process_request_body(BodyTks, Msg) -> process_request_body(BodyTks, Msg) ->
emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg). emqx_placeholder:proc_tmpl(BodyTks, Msg).
proc_headers(HeaderTks, Msg) -> proc_headers(HeaderTks, Msg) ->
lists:map( lists:map(
fun({K, V}) -> fun({K, V}) ->
{ {
emqx_plugin_libs_rule:proc_tmpl(K, Msg), emqx_placeholder:proc_tmpl(K, Msg),
emqx_plugin_libs_rule:proc_tmpl(emqx_secret:unwrap(V), Msg) emqx_placeholder:proc_tmpl(emqx_secret:unwrap(V), Msg)
} }
end, end,
HeaderTks HeaderTks

View File

@ -344,7 +344,7 @@ parse_prepare_sql(Config) ->
parse_prepare_sql(maps:to_list(SQL), #{}, #{}, #{}, #{}). parse_prepare_sql(maps:to_list(SQL), #{}, #{}, #{}, #{}).
parse_prepare_sql([{Key, H} | _] = L, Prepares, Tokens, BatchInserts, BatchTks) -> parse_prepare_sql([{Key, H} | _] = L, Prepares, Tokens, BatchInserts, BatchTks) ->
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H), {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H),
parse_batch_prepare_sql( parse_batch_prepare_sql(
L, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}, BatchInserts, BatchTks L, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}, BatchInserts, BatchTks
); );
@ -363,7 +363,7 @@ parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks
{ok, insert} -> {ok, insert} ->
case emqx_plugin_libs_rule:split_insert_sql(H) of case emqx_plugin_libs_rule:split_insert_sql(H) of
{ok, {InsertSQL, Params}} -> {ok, {InsertSQL, Params}} ->
ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(Params), ParamsTks = emqx_placeholder:preproc_tmpl(Params),
parse_prepare_sql( parse_prepare_sql(
T, T,
Prepares, Prepares,
@ -389,7 +389,7 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens})
undefined -> undefined ->
{SQLOrData, Params}; {SQLOrData, Params};
Tokens -> Tokens ->
{TypeOrKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} {TypeOrKey, emqx_placeholder:proc_sql(Tokens, SQLOrData)}
end. end.
on_batch_insert(InstId, BatchReqs, InsertPart, Tokens, State) -> on_batch_insert(InstId, BatchReqs, InsertPart, Tokens, State) ->

View File

@ -188,7 +188,7 @@ on_batch_query(
{error, {unrecoverable_error, batch_prepare_not_implemented}}; {error, {unrecoverable_error, batch_prepare_not_implemented}};
TokenList -> TokenList ->
{_, Datas} = lists:unzip(BatchReq), {_, Datas} = lists:unzip(BatchReq),
Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas],
St = maps:get(BinKey, Sts), St = maps:get(BinKey, Sts),
case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of
{error, _Error} = Result -> {error, _Error} = Result ->
@ -218,7 +218,7 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens})
undefined -> undefined ->
{SQLOrData, Params}; {SQLOrData, Params};
Tokens -> Tokens ->
{Key, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} {Key, emqx_placeholder:proc_sql(Tokens, SQLOrData)}
end. end.
on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
@ -350,7 +350,7 @@ parse_prepare_sql(Config) ->
parse_prepare_sql(maps:to_list(SQL), #{}, #{}). parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
parse_prepare_sql([{Key, H} | T], Prepares, Tokens) -> parse_prepare_sql([{Key, H} | T], Prepares, Tokens) ->
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '$n'), {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '$n'),
parse_prepare_sql( parse_prepare_sql(
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
); );

View File

@ -301,8 +301,8 @@ feedvar(Override, Packet, ConnInfo, ClientInfo) ->
}, },
maps:map( maps:map(
fun(_K, V) -> fun(_K, V) ->
Tokens = emqx_plugin_libs_rule:preproc_tmpl(V), Tokens = emqx_placeholder:preproc_tmpl(V),
emqx_plugin_libs_rule:proc_tmpl(Tokens, Envs) emqx_placeholder:proc_tmpl(Tokens, Envs)
end, end,
Override Override
). ).

View File

@ -283,8 +283,8 @@ feedvar(Override, Packet, ConnInfo, ClientInfo) ->
}, },
maps:map( maps:map(
fun(_K, V) -> fun(_K, V) ->
Tokens = emqx_plugin_libs_rule:preproc_tmpl(V), Tokens = emqx_placeholder:preproc_tmpl(V),
emqx_plugin_libs_rule:proc_tmpl(Tokens, Envs) emqx_placeholder:proc_tmpl(Tokens, Envs)
end, end,
Override Override
). ).

View File

@ -168,7 +168,7 @@ on_batch_query(
{error, {unrecoverable_error, batch_prepare_not_implemented}}; {error, {unrecoverable_error, batch_prepare_not_implemented}};
TokenList -> TokenList ->
{_, Datas} = lists:unzip(BatchReq), {_, Datas} = lists:unzip(BatchReq),
Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas],
St = maps:get(BinKey, Sts), St = maps:get(BinKey, Sts),
case case
on_sql_query(InstId, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2) on_sql_query(InstId, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2)
@ -204,7 +204,7 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{
undefined -> undefined ->
{SQLOrData, Params}; {SQLOrData, Params};
Sql -> Sql ->
{Sql, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} {Sql, emqx_placeholder:proc_sql(Tokens, SQLOrData)}
end end
end. end.
@ -305,7 +305,7 @@ parse_prepare_sql(Config) ->
parse_prepare_sql(maps:to_list(SQL), #{}, #{}). parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
parse_prepare_sql([{Key, H} | T], Prepares, Tokens) -> parse_prepare_sql([{Key, H} | T], Prepares, Tokens) ->
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, ':n'), {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, ':n'),
parse_prepare_sql( parse_prepare_sql(
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
); );

View File

@ -19,17 +19,6 @@
%% preprocess and process template string with place holders %% preprocess and process template string with place holders
-export([ -export([
preproc_tmpl/1,
proc_tmpl/2,
proc_tmpl/3,
preproc_cmd/1,
proc_cmd/2,
proc_cmd/3,
preproc_sql/1,
preproc_sql/2,
proc_sql/2,
proc_sql_param_str/2,
proc_cql_param_str/2,
split_insert_sql/1, split_insert_sql/1,
detect_sql_type/1, detect_sql_type/1,
proc_batch_sql/3, proc_batch_sql/3,
@ -61,68 +50,15 @@
]). ]).
-export([ -export([
now_ms/0,
can_topic_match_oneof/2 can_topic_match_oneof/2
]). ]).
-export_type([tmpl_token/0]).
-compile({no_auto_import, [float/1]}). -compile({no_auto_import, [float/1]}).
-type uri_string() :: iodata(). -type uri_string() :: iodata().
-type tmpl_token() :: list({var, binary()} | {str, binary()}). -type tmpl_token() :: list({var, binary()} | {str, binary()}).
-type tmpl_cmd() :: list(tmpl_token()).
-type prepare_statement_key() :: binary().
%% preprocess template string with place holders
-spec preproc_tmpl(binary()) -> tmpl_token().
preproc_tmpl(Str) ->
emqx_placeholder:preproc_tmpl(Str).
-spec proc_tmpl(tmpl_token(), map()) -> binary().
proc_tmpl(Tokens, Data) ->
emqx_placeholder:proc_tmpl(Tokens, Data).
-spec proc_tmpl(tmpl_token(), map(), map()) -> binary() | list().
proc_tmpl(Tokens, Data, Opts) ->
emqx_placeholder:proc_tmpl(Tokens, Data, Opts).
-spec preproc_cmd(binary()) -> tmpl_cmd().
preproc_cmd(Str) ->
emqx_placeholder:preproc_cmd(Str).
-spec proc_cmd([tmpl_token()], map()) -> binary() | list().
proc_cmd(Tokens, Data) ->
emqx_placeholder:proc_cmd(Tokens, Data).
-spec proc_cmd([tmpl_token()], map(), map()) -> list().
proc_cmd(Tokens, Data, Opts) ->
emqx_placeholder:proc_cmd(Tokens, Data, Opts).
%% preprocess SQL with place holders
-spec preproc_sql(Sql :: binary()) -> {prepare_statement_key(), tmpl_token()}.
preproc_sql(Sql) ->
emqx_placeholder:preproc_sql(Sql).
-spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n' | ':n') ->
{prepare_statement_key(), tmpl_token()}.
preproc_sql(Sql, ReplaceWith) ->
emqx_placeholder:preproc_sql(Sql, ReplaceWith).
-spec proc_sql(tmpl_token(), map()) -> list().
proc_sql(Tokens, Data) ->
emqx_placeholder:proc_sql(Tokens, Data).
-spec proc_sql_param_str(tmpl_token(), map()) -> binary().
proc_sql_param_str(Tokens, Data) ->
emqx_placeholder:proc_sql_param_str(Tokens, Data).
-spec proc_cql_param_str(tmpl_token(), map()) -> binary().
proc_cql_param_str(Tokens, Data) ->
emqx_placeholder:proc_cql_param_str(Tokens, Data).
%% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">> %% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">>
-spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when -spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when
InsertSQL :: binary(), InsertSQL :: binary(),
@ -169,7 +105,7 @@ detect_sql_type(SQL) ->
proc_batch_sql(BatchReqs, InsertPart, Tokens) -> proc_batch_sql(BatchReqs, InsertPart, Tokens) ->
ValuesPart = erlang:iolist_to_binary( ValuesPart = erlang:iolist_to_binary(
lists:join($,, [ lists:join($,, [
proc_sql_param_str(Tokens, Msg) emqx_placeholder:proc_sql_param_str(Tokens, Msg)
|| {_, Msg} <- BatchReqs || {_, Msg} <- BatchReqs
]) ])
), ),
@ -353,9 +289,6 @@ number_to_list(Int) when is_integer(Int) ->
number_to_list(Float) when is_float(Float) -> number_to_list(Float) when is_float(Float) ->
float_to_list(Float, [{decimals, 10}, compact]). float_to_list(Float, [{decimals, 10}, compact]).
now_ms() ->
erlang:system_time(millisecond).
can_topic_match_oneof(Topic, Filters) -> can_topic_match_oneof(Topic, Filters) ->
lists:any( lists:any(
fun(Fltr) -> fun(Fltr) ->

View File

@ -65,10 +65,10 @@ pre_process_action_args(
) -> ) ->
Args#{ Args#{
preprocessed_tmpl => #{ preprocessed_tmpl => #{
topic => emqx_plugin_libs_rule:preproc_tmpl(Topic), topic => emqx_placeholder:preproc_tmpl(Topic),
qos => preproc_vars(QoS), qos => preproc_vars(QoS),
retain => preproc_vars(Retain), retain => preproc_vars(Retain),
payload => emqx_plugin_libs_rule:preproc_tmpl(Payload), payload => emqx_placeholder:preproc_tmpl(Payload),
user_properties => preproc_user_properties(UserProperties) user_properties => preproc_user_properties(UserProperties)
} }
}; };
@ -110,7 +110,7 @@ republish(
} }
} }
) -> ) ->
Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected), Topic = emqx_placeholder:proc_tmpl(TopicTks, Selected),
Payload = format_msg(PayloadTks, Selected), Payload = format_msg(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected, 0), QoS = replace_simple_var(QoSTks, Selected, 0),
Retain = replace_simple_var(RetainTks, Selected, false), Retain = replace_simple_var(RetainTks, Selected, false),
@ -189,7 +189,7 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
emqx_metrics:inc_msg(Msg). emqx_metrics:inc_msg(Msg).
preproc_vars(Data) when is_binary(Data) -> preproc_vars(Data) when is_binary(Data) ->
emqx_plugin_libs_rule:preproc_tmpl(Data); emqx_placeholder:preproc_tmpl(Data);
preproc_vars(Data) -> preproc_vars(Data) ->
Data. Data.
@ -201,13 +201,13 @@ preproc_user_properties(<<"${pub_props.'User-Property'}">>) ->
?ORIGINAL_USER_PROPERTIES; ?ORIGINAL_USER_PROPERTIES;
preproc_user_properties(<<"${", _/binary>> = V) -> preproc_user_properties(<<"${", _/binary>> = V) ->
%% use a variable %% use a variable
emqx_plugin_libs_rule:preproc_tmpl(V); emqx_placeholder:preproc_tmpl(V);
preproc_user_properties(_) -> preproc_user_properties(_) ->
%% invalid, discard %% invalid, discard
undefined. undefined.
replace_simple_var(Tokens, Data, Default) when is_list(Tokens) -> replace_simple_var(Tokens, Data, Default) when is_list(Tokens) ->
[Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), [Var] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
case Var of case Var of
%% cannot find the variable from Data %% cannot find the variable from Data
undefined -> Default; undefined -> Default;
@ -219,7 +219,7 @@ replace_simple_var(Val, _Data, _Default) ->
format_msg([], Selected) -> format_msg([], Selected) ->
emqx_utils_json:encode(Selected); emqx_utils_json:encode(Selected);
format_msg(Tokens, Selected) -> format_msg(Tokens, Selected) ->
emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected). emqx_placeholder:proc_tmpl(Tokens, Selected).
format_pub_props(UserPropertiesTks, Selected, Env) -> format_pub_props(UserPropertiesTks, Selected, Env) ->
UserProperties = UserProperties =

View File

@ -46,7 +46,7 @@
quote_mysql/1 quote_mysql/1
]). ]).
-include_lib("emqx/include/emqx_placeholder.hrl"). -define(PH_VAR_THIS, '$this').
-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})"). -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
@ -55,7 +55,7 @@
%% Space and CRLF %% Space and CRLF
-define(EX_WITHE_CHARS, "\\s"). -define(EX_WITHE_CHARS, "\\s").
-type tmpl_token() :: list({var, binary()} | {str, binary()}). -type tmpl_token() :: list({var, ?PH_VAR_THIS | [binary()]} | {str, binary()}).
-type tmpl_cmd() :: list(tmpl_token()). -type tmpl_cmd() :: list(tmpl_token()).
@ -123,11 +123,11 @@ proc_tmpl(Tokens, Data, Opts = #{return := rawlist}) ->
({str, Str}) -> ({str, Str}) ->
Str; Str;
({var, Phld}) when is_function(Trans, 1) -> ({var, Phld}) when is_function(Trans, 1) ->
Trans(get_phld_var(Phld, Data)); Trans(lookup_var(Phld, Data));
({var, Phld}) when is_function(Trans, 2) -> ({var, Phld}) when is_function(Trans, 2) ->
Trans(Phld, get_phld_var(Phld, Data)); Trans(Phld, lookup_var(Phld, Data));
({var, Phld}) -> ({var, Phld}) ->
get_phld_var(Phld, Data) lookup_var(Phld, Data)
end, end,
Tokens Tokens
). ).
@ -264,13 +264,35 @@ quote_mysql(Str) when is_binary(Str) ->
quote_mysql(Str) -> quote_mysql(Str) ->
quote_escape(Str, fun escape_mysql/1). quote_escape(Str, fun escape_mysql/1).
lookup_var(Var, Value) when Var == ?PH_VAR_THIS orelse Var == [] ->
Value;
lookup_var([Prop | Rest], Data) ->
case lookup(Prop, Data) of
{ok, Value} ->
lookup_var(Rest, Value);
{error, _} ->
undefined
end.
lookup(Prop, Data) when is_binary(Prop) ->
case maps:get(Prop, Data, undefined) of
undefined ->
try
{ok, maps:get(binary_to_existing_atom(Prop, utf8), Data)}
catch
error:{badkey, _} ->
{error, undefined};
error:badarg ->
{error, undefined}
end;
Value ->
{ok, Value}
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Internal functions %% Internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
get_phld_var(Phld, Data) ->
emqx_rule_maps:nested_get(Phld, Data).
preproc_var_re(#{placeholders := PHs, strip_double_quote := true}) -> preproc_var_re(#{placeholders := PHs, strip_double_quote := true}) ->
Res = [ph_to_re(PH) || PH <- PHs], Res = [ph_to_re(PH) || PH <- PHs],
QuoteRes = ["\"" ++ Re ++ "\"" || Re <- Res], QuoteRes = ["\"" ++ Re ++ "\"" || Re <- Res],
@ -340,9 +362,8 @@ parse_nested(<<".", R/binary>>) ->
parse_nested(R); parse_nested(R);
parse_nested(Attr) -> parse_nested(Attr) ->
case string:split(Attr, <<".">>, all) of case string:split(Attr, <<".">>, all) of
[<<>>] -> {var, ?PH_VAR_THIS}; [<<>>] -> ?PH_VAR_THIS;
[Attr] -> {var, Attr}; Nested -> Nested
Nested -> {path, [{key, P} || P <- Nested]}
end. end.
unwrap(<<"\"${", Val/binary>>, _StripDoubleQuote = true) -> unwrap(<<"\"${", Val/binary>>, _StripDoubleQuote = true) ->

View File

@ -225,9 +225,9 @@ start_producer(
msg => "hstreamdb connector: producer started" msg => "hstreamdb connector: producer started"
}), }),
EnableBatch = maps:get(enable_batch, Options, false), EnableBatch = maps:get(enable_batch, Options, false),
Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin), Payload = emqx_placeholder:preproc_tmpl(PayloadBin),
OrderingKeyBin = maps:get(ordering_key, Options, <<"">>), OrderingKeyBin = maps:get(ordering_key, Options, <<"">>),
OrderingKey = emqx_plugin_libs_rule:preproc_tmpl(OrderingKeyBin), OrderingKey = emqx_placeholder:preproc_tmpl(OrderingKeyBin),
State = #{ State = #{
client => Client, client => Client,
producer => Producer, producer => Producer,
@ -254,8 +254,8 @@ start_producer(
end. end.
to_record(OrderingKeyTmpl, PayloadTmpl, Data) -> to_record(OrderingKeyTmpl, PayloadTmpl, Data) ->
OrderingKey = emqx_plugin_libs_rule:proc_tmpl(OrderingKeyTmpl, Data), OrderingKey = emqx_placeholder:proc_tmpl(OrderingKeyTmpl, Data),
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTmpl, Data), Payload = emqx_placeholder:proc_tmpl(PayloadTmpl, Data),
to_record(OrderingKey, Payload). to_record(OrderingKey, Payload).
to_record(OrderingKey, Payload) when is_binary(OrderingKey) -> to_record(OrderingKey, Payload) when is_binary(OrderingKey) ->

View File

@ -57,7 +57,7 @@ on_query(InstanceId, {send_message, Message0}, State) ->
connector_state := ConnectorState connector_state := ConnectorState
} = State, } = State,
NewConnectorState = ConnectorState#{ NewConnectorState = ConnectorState#{
collection => emqx_plugin_libs_rule:proc_tmpl(CollectionTemplate, Message0) collection => emqx_placeholder:proc_tmpl(CollectionTemplate, Message0)
}, },
Message = render_message(PayloadTemplate, Message0), Message = render_message(PayloadTemplate, Message0),
Res = emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, NewConnectorState), Res = emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, NewConnectorState),
@ -76,7 +76,7 @@ on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) ->
preprocess_template(undefined = _PayloadTemplate) -> preprocess_template(undefined = _PayloadTemplate) ->
undefined; undefined;
preprocess_template(PayloadTemplate) -> preprocess_template(PayloadTemplate) ->
emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate). emqx_placeholder:preproc_tmpl(PayloadTemplate).
render_message(undefined = _PayloadTemplate, Message) -> render_message(undefined = _PayloadTemplate, Message) ->
Message; Message;
@ -102,14 +102,14 @@ format_data(PayloadTks, Msg) ->
case maps:size(PreparedTupleMap) of case maps:size(PreparedTupleMap) of
% If no tuples were found simply proceed with the json decoding and be done with it % If no tuples were found simply proceed with the json decoding and be done with it
0 -> 0 ->
emqx_utils_json:decode(emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Msg), [return_maps]); emqx_utils_json:decode(emqx_placeholder:proc_tmpl(PayloadTks, Msg), [return_maps]);
_ -> _ ->
% If tuples were found, replace the tuple values with the references created, run % If tuples were found, replace the tuple values with the references created, run
% the modified message through the json parser, and then at the end replace the % the modified message through the json parser, and then at the end replace the
% references with the actual tuple values. % references with the actual tuple values.
ProcessedMessage = replace_message_values_with_references(Msg, PreparedTupleMap), ProcessedMessage = replace_message_values_with_references(Msg, PreparedTupleMap),
DecodedMap = emqx_utils_json:decode( DecodedMap = emqx_utils_json:decode(
emqx_plugin_libs_rule:proc_tmpl(PayloadTks, ProcessedMessage), [return_maps] emqx_placeholder:proc_tmpl(PayloadTks, ProcessedMessage), [return_maps]
), ),
populate_map_with_tuple_values(PreparedTupleMap, DecodedMap) populate_map_with_tuple_values(PreparedTupleMap, DecodedMap)
end. end.

View File

@ -126,13 +126,13 @@ process_batch_data(BatchData, CommandTemplate) ->
proc_command_template(CommandTemplate, Msg) -> proc_command_template(CommandTemplate, Msg) ->
lists:map( lists:map(
fun(ArgTks) -> fun(ArgTks) ->
emqx_plugin_libs_rule:proc_tmpl(ArgTks, Msg, #{return => full_binary}) emqx_placeholder:proc_tmpl(ArgTks, Msg, #{return => full_binary})
end, end,
CommandTemplate CommandTemplate
). ).
preproc_command_template(CommandTemplate) -> preproc_command_template(CommandTemplate) ->
lists:map( lists:map(
fun emqx_plugin_libs_rule:preproc_tmpl/1, fun emqx_placeholder:preproc_tmpl/1,
CommandTemplate CommandTemplate
). ).