From d6c1ee183f2e685c4ac83e8d755d40e2f24e0f35 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 7 Jun 2023 21:19:52 +0300 Subject: [PATCH] refactor(pluglib): move `emqx_placeholder` to utils app Also make user that existing code calls it directly. --- apps/emqx_authn/src/emqx_authn_utils.erl | 12 ++-- apps/emqx_authz/src/emqx_authz_utils.erl | 12 ++-- .../emqx_authz/test/emqx_authz_rule_SUITE.erl | 6 +- .../src/emqx_bridge_cassandra_connector.erl | 8 +-- .../src/emqx_bridge_clickhouse_connector.erl | 10 +-- .../src/emqx_bridge_dynamo_connector.erl | 2 +- .../emqx_bridge_dynamo_connector_client.erl | 2 +- .../src/emqx_bridge_gcp_pubsub_connector.erl | 6 +- .../src/emqx_bridge_influxdb_connector.erl | 20 +++--- .../src/emqx_bridge_iotdb_impl.erl | 16 ++--- .../src/emqx_bridge_kafka_impl_consumer.erl | 8 +-- .../src/emqx_bridge_kafka_impl_producer.erl | 4 +- .../src/emqx_bridge_mqtt_msg.erl | 4 +- .../src/emqx_bridge_pulsar_impl_producer.erl | 8 +-- .../src/emqx_bridge_rabbitmq_connector.erl | 4 +- .../src/emqx_bridge_rocketmq_connector.erl | 10 +-- .../src/emqx_bridge_sqlserver_connector.erl | 6 +- .../src/emqx_bridge_tdengine_connector.erl | 16 ++--- .../test/emqx_bridge_tdengine_SUITE.erl | 4 +- .../src/emqx_connector_http.erl | 24 +++---- .../src/emqx_connector_mysql.erl | 6 +- .../src/emqx_connector_pgsql.erl | 6 +- .../src/emqx_mqttsn_channel.erl | 4 +- .../src/emqx_stomp_channel.erl | 4 +- apps/emqx_oracle/src/emqx_oracle.erl | 6 +- .../src/emqx_plugin_libs_rule.erl | 69 +------------------ .../src/emqx_rule_actions.erl | 14 ++-- .../src/emqx_placeholder.erl | 43 +++++++++--- .../test/emqx_placeholder_SUITE.erl | 0 .../src/emqx_ee_connector_hstreamdb.erl | 8 +-- .../src/emqx_ee_connector_mongodb.erl | 8 +-- .../src/emqx_ee_connector_redis.erl | 4 +- 32 files changed, 151 insertions(+), 203 deletions(-) rename apps/{emqx_plugin_libs => emqx_utils}/src/emqx_placeholder.erl (92%) rename apps/{emqx_plugin_libs => emqx_utils}/test/emqx_placeholder_SUITE.erl (100%) diff --git a/apps/emqx_authn/src/emqx_authn_utils.erl b/apps/emqx_authn/src/emqx_authn_utils.erl index 12520251e..8e168bb5d 100644 --- a/apps/emqx_authn/src/emqx_authn_utils.erl +++ b/apps/emqx_authn/src/emqx_authn_utils.erl @@ -225,21 +225,19 @@ without_password(Credential, [Name | Rest]) -> without_password(Credential, Rest) end. -urlencode_var({var, _} = Var, Value) -> - emqx_http_lib:uri_encode(handle_var(Var, Value)); urlencode_var(Var, Value) -> - handle_var(Var, Value). + emqx_http_lib:uri_encode(handle_var(Var, Value)). -handle_var({var, _Name}, undefined) -> +handle_var(_Name, undefined) -> <<>>; -handle_var({var, <<"peerhost">>}, PeerHost) -> +handle_var([<<"peerhost">>], PeerHost) -> emqx_placeholder:bin(inet:ntoa(PeerHost)); handle_var(_, Value) -> emqx_placeholder:bin(Value). -handle_sql_var({var, _Name}, undefined) -> +handle_sql_var(_Name, undefined) -> <<>>; -handle_sql_var({var, <<"peerhost">>}, PeerHost) -> +handle_sql_var([<<"peerhost">>], PeerHost) -> emqx_placeholder:bin(inet:ntoa(PeerHost)); handle_sql_var(_, Value) -> emqx_placeholder:sql_data(Value). diff --git a/apps/emqx_authz/src/emqx_authz_utils.erl b/apps/emqx_authz/src/emqx_authz_utils.erl index c01505680..ec112070e 100644 --- a/apps/emqx_authz/src/emqx_authz_utils.erl +++ b/apps/emqx_authz/src/emqx_authz_utils.erl @@ -188,21 +188,19 @@ convert_client_var({dn, DN}) -> {cert_subject, DN}; convert_client_var({protocol, Proto}) -> {proto_name, Proto}; convert_client_var(Other) -> Other. -urlencode_var({var, _} = Var, Value) -> - emqx_http_lib:uri_encode(handle_var(Var, Value)); urlencode_var(Var, Value) -> - handle_var(Var, Value). + emqx_http_lib:uri_encode(handle_var(Var, Value)). -handle_var({var, _Name}, undefined) -> +handle_var(_Name, undefined) -> <<>>; -handle_var({var, <<"peerhost">>}, IpAddr) -> +handle_var([<<"peerhost">>], IpAddr) -> inet_parse:ntoa(IpAddr); handle_var(_Name, Value) -> emqx_placeholder:bin(Value). -handle_sql_var({var, _Name}, undefined) -> +handle_sql_var(_Name, undefined) -> <<>>; -handle_sql_var({var, <<"peerhost">>}, IpAddr) -> +handle_sql_var([<<"peerhost">>], IpAddr) -> inet_parse:ntoa(IpAddr); handle_sql_var(_Name, Value) -> emqx_placeholder:sql_data(Value). diff --git a/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl b/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl index 76e5677ce..fbfb84785 100644 --- a/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl @@ -81,7 +81,7 @@ t_compile(_) -> {{127, 0, 0, 1}, {127, 0, 0, 1}, 32}, {{192, 168, 1, 0}, {192, 168, 1, 255}, 24} ]}, - subscribe, [{pattern, [{var, {var, <<"clientid">>}}]}]}, + subscribe, [{pattern, [{var, [<<"clientid">>]}]}]}, emqx_authz_rule:compile(?SOURCE3) ), @@ -99,14 +99,14 @@ t_compile(_) -> {clientid, {re_pattern, _, _, _, _}} ]}, publish, [ - {pattern, [{var, {var, <<"username">>}}]}, {pattern, [{var, {var, <<"clientid">>}}]} + {pattern, [{var, [<<"username">>]}]}, {pattern, [{var, [<<"clientid">>]}]} ]}, emqx_authz_rule:compile(?SOURCE5) ), ?assertEqual( {allow, {username, {eq, <<"test">>}}, publish, [ - {pattern, [{str, <<"t/foo">>}, {var, {var, <<"username">>}}, {str, <<"boo">>}]} + {pattern, [{str, <<"t/foo">>}, {var, [<<"username">>]}, {str, <<"boo">>}]} ]}, emqx_authz_rule:compile(?SOURCE6) ), diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index 285825714..ad41329d2 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -273,10 +273,10 @@ proc_cql_params( %% assert _PreparedKey = maps:get(PreparedKey0, Prepares), Tokens = maps:get(PreparedKey0, ParamsTokens), - {PreparedKey0, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}; + {PreparedKey0, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}; proc_cql_params(query, SQL, Params, _State) -> - {SQL1, Tokens} = emqx_plugin_libs_rule:preproc_sql(SQL, '?'), - {SQL1, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}. + {SQL1, Tokens} = emqx_placeholder:preproc_sql(SQL, '?'), + {SQL1, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}. exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when Type == query; Type == prepared_query @@ -403,7 +403,7 @@ parse_prepare_cql(_) -> #{prepare_cql => #{}, params_tokens => #{}}. parse_prepare_cql([{Key, H} | T], Prepares, Tokens) -> - {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '?'), + {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '?'), parse_prepare_cql( T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} ); diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index 211e41174..002460c95 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -193,7 +193,7 @@ prepare_sql_templates(#{ batch_value_separator := Separator }) -> InsertTemplate = - emqx_plugin_libs_rule:preproc_tmpl(Template), + emqx_placeholder:preproc_tmpl(Template), BulkExtendInsertTemplate = prepare_sql_bulk_extend_template(Template, Separator), #{ @@ -210,7 +210,7 @@ prepare_sql_bulk_extend_template(Template, Separator) -> %% Add separator before ValuesTemplate so that one can append it %% to an insert template ExtendParamTemplate = iolist_to_binary([Separator, ValuesTemplate]), - emqx_plugin_libs_rule:preproc_tmpl(ExtendParamTemplate). + emqx_placeholder:preproc_tmpl(ExtendParamTemplate). %% This function is similar to emqx_plugin_libs_rule:split_insert_sql/1 but can %% also handle Clickhouse's SQL extension for INSERT statments that allows the @@ -363,7 +363,7 @@ on_query( transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL). get_sql(send_message, #{send_message_template := PreparedSQL}, Data) -> - emqx_plugin_libs_rule:proc_tmpl(PreparedSQL, Data); + emqx_placeholder:proc_tmpl(PreparedSQL, Data); get_sql(_, _, SQL) -> SQL. @@ -421,10 +421,10 @@ objects_to_sql( } ) -> %% Prepare INSERT-statement and the first row after VALUES - InsertStatementHead = emqx_plugin_libs_rule:proc_tmpl(InsertTemplate, FirstObject), + InsertStatementHead = emqx_placeholder:proc_tmpl(InsertTemplate, FirstObject), FormatObjectDataFunction = fun(Object) -> - emqx_plugin_libs_rule:proc_tmpl(BulkExtendInsertTemplate, Object) + emqx_placeholder:proc_tmpl(BulkExtendInsertTemplate, Object) end, InsertStatementTail = lists:map(FormatObjectDataFunction, RemainingObjects), CompleteStatement = erlang:iolist_to_binary([InsertStatementHead, InsertStatementTail]), diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 0cc3f993b..b0e145e63 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -200,7 +200,7 @@ parse_template(Config) -> parse_template(maps:to_list(Templates), #{}). parse_template([{Key, H} | T], Templates) -> - ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(H), + ParamsTks = emqx_placeholder:preproc_tmpl(H), parse_template( T, Templates#{Key => ParamsTks} diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl index faaef9df4..1b379298f 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl @@ -144,7 +144,7 @@ apply_template({Key, Msg} = Req, Templates) -> undefined -> Req; Template -> - {Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)} + {Key, emqx_placeholder:proc_tmpl(Template, Msg)} end; %% now there is no batch delete, so %% 1. we can simply replace the `send_message` to `put` diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl index 65a0336ec..58a3f6612 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl @@ -40,7 +40,7 @@ connect_timeout := timer:time(), jwt_config := emqx_connector_jwt:jwt_config(), max_retries := non_neg_integer(), - payload_template := emqx_plugin_libs_rule:tmpl_token(), + payload_template := emqx_placeholder:tmpl_token(), pool_name := binary(), project_id := binary(), pubsub_topic := binary(), @@ -104,7 +104,7 @@ on_start( connect_timeout => ConnectTimeout, jwt_config => JWTConfig, max_retries => MaxRetries, - payload_template => emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate), + payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate), pool_name => ResourceId, project_id => ProjectId, pubsub_topic => PubSubTopic, @@ -294,7 +294,7 @@ encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) -> Interpolated = case PayloadTemplate of [] -> emqx_utils_json:encode(Selected); - _ -> emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, Selected) + _ -> emqx_placeholder:proc_tmpl(PayloadTemplate, Selected) end, #{data => base64:encode(Interpolated)}. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 05e7c11b2..71ab85c58 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -436,7 +436,7 @@ to_config([Item0 | Rest], Acc, Precision) -> Ts0 = maps:get(timestamp, Item0, undefined), {Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision), Item = #{ - measurement => emqx_plugin_libs_rule:preproc_tmpl(maps:get(measurement, Item0)), + measurement => emqx_placeholder:preproc_tmpl(maps:get(measurement, Item0)), timestamp => Ts, precision => {FromPrecision, ToPrecision}, tags => to_kv_config(maps:get(tags, Item0)), @@ -458,18 +458,18 @@ preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) -> preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) -> preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision); preproc_tmpl_timestamp(<> = Ts, Precision) -> - {emqx_plugin_libs_rule:preproc_tmpl(Ts), ms, Precision}; + {emqx_placeholder:preproc_tmpl(Ts), ms, Precision}; preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) -> %% a placehold is in use. e.g. ${payload.my_timestamp} %% we can only hope it the value will be of the same precision in the configs - {emqx_plugin_libs_rule:preproc_tmpl(Ts), Precision, Precision}. + {emqx_placeholder:preproc_tmpl(Ts), Precision, Precision}. to_kv_config(KVfields) -> maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)). to_maps_config(K, V, Res) -> - NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)), - NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)), + NK = emqx_placeholder:preproc_tmpl(bin(K)), + NV = emqx_placeholder:preproc_tmpl(bin(V)), Res#{NK => NV}. %% ------------------------------------------------------------------------------------------------- @@ -505,7 +505,7 @@ parse_batch_data(InstId, BatchData, SyntaxLines) -> fields := [{binary(), binary()}], measurement := binary(), tags := [{binary(), binary()}], - timestamp := emqx_plugin_libs_rule:tmpl_token() | integer(), + timestamp := emqx_placeholder:tmpl_token() | integer(), precision := {From :: ts_precision(), To :: ts_precision()} } ]) -> {ok, [map()]} | {error, term()}. @@ -526,7 +526,7 @@ lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, Error is_list(Ts) -> TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, - case parse_timestamp(emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions)) of + case parse_timestamp(emqx_placeholder:proc_tmpl(Ts, Data, TransOptions)) of {ok, TsInt} -> Item1 = Item#{timestamp => TsInt}, continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc); @@ -573,7 +573,7 @@ line_to_point( {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), maps:without([precision], Item#{ - measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data), + measurement => emqx_placeholder:proc_tmpl(Measurement, Data), tags => EncodedTags, fields => EncodedFields, timestamp => maybe_convert_time_unit(Ts, Precision) @@ -590,8 +590,8 @@ time_unit(ns) -> nanosecond. maps_config_to_data(K, V, {Data, Res}) -> KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, - NK0 = emqx_plugin_libs_rule:proc_tmpl(K, Data, KTransOptions), - NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, VTransOptions), + NK0 = emqx_placeholder:proc_tmpl(K, Data, KTransOptions), + NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions), case {NK0, NV} of {[undefined], _} -> {Data, Res}; diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index 0e1934525..087906549 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -185,7 +185,7 @@ preproc_data( timestamp => maybe_preproc_tmpl( maps:get(<<"timestamp">>, Data, <<"now">>) ), - measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), + measurement => emqx_placeholder:preproc_tmpl(Measurement), data_type => DataType, value => maybe_preproc_tmpl(Value) } @@ -203,7 +203,7 @@ preproc_data(_NoMatch, Acc) -> Acc. maybe_preproc_tmpl(Value) when is_binary(Value) -> - emqx_plugin_libs_rule:preproc_tmpl(Value); + emqx_placeholder:preproc_tmpl(Value); maybe_preproc_tmpl(Value) -> Value. @@ -225,7 +225,7 @@ proc_data(PreProcessedData, Msg) -> ) -> #{ timestamp => iot_timestamp(TimestampTkn, Msg, Nows), - measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Msg), + measurement => emqx_placeholder:proc_tmpl(Measurement, Msg), data_type => DataType, value => proc_value(DataType, ValueTkn, Msg) } @@ -236,7 +236,7 @@ proc_data(PreProcessedData, Msg) -> iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) -> Timestamp; iot_timestamp(TimestampTkn, Msg, Nows) -> - iot_timestamp(emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows). + iot_timestamp(emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows). iot_timestamp(Timestamp, #{now_ms := NowMs}) when Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>> @@ -250,7 +250,7 @@ iot_timestamp(Timestamp, _) when is_binary(Timestamp) -> binary_to_integer(Timestamp). proc_value(<<"TEXT">>, ValueTkn, Msg) -> - case emqx_plugin_libs_rule:proc_tmpl(ValueTkn, Msg) of + case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of <<"undefined">> -> null; Val -> Val end; @@ -262,7 +262,7 @@ proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> -> convert_float(replace_var(ValueTkn, Msg)). replace_var(Tokens, Data) when is_list(Tokens) -> - [Val] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), + [Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}), Val; replace_var(Val, _Data) -> Val. @@ -410,8 +410,8 @@ device_id(Message, Payloads, State) -> %% [FIXME] there could be conflicting device-ids in the Payloads maps:get(<<"device_id">>, hd(Payloads), undefined); DeviceId -> - DeviceIdTkn = emqx_plugin_libs_rule:preproc_tmpl(DeviceId), - emqx_plugin_libs_rule:proc_tmpl(DeviceIdTkn, Message) + DeviceIdTkn = emqx_placeholder:preproc_tmpl(DeviceId), + emqx_placeholder:proc_tmpl(DeviceIdTkn, Message) end. handle_response({ok, 200, _Headers, Body} = Resp) -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index c0de23d94..9d03d53f9 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -68,7 +68,7 @@ resource_id := resource_id(), topic_mapping := #{ kafka_topic() := #{ - payload_template := emqx_plugin_libs_rule:tmpl_token(), + payload_template := emqx_placeholder:tmpl_token(), mqtt_topic => emqx_types:topic(), qos => emqx_types:qos() } @@ -82,7 +82,7 @@ resource_id := resource_id(), topic_mapping := #{ kafka_topic() := #{ - payload_template := emqx_plugin_libs_rule:tmpl_token(), + payload_template := emqx_placeholder:tmpl_token(), mqtt_topic => emqx_types:topic(), qos => emqx_types:qos() } @@ -536,7 +536,7 @@ convert_topic_mapping(TopicMappingList) -> qos := QoS, payload_template := PayloadTemplate0 } = Fields, - PayloadTemplate = emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate0), + PayloadTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate0), Acc#{ KafkaTopic => #{ payload_template => PayloadTemplate, @@ -559,7 +559,7 @@ render(FullMessage, PayloadTemplate) -> emqx_plugin_libs_rule:bin(X) end }, - emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, FullMessage, Opts). + emqx_placeholder:proc_tmpl(PayloadTemplate, FullMessage, Opts). encode(Value, none) -> Value; diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 8b8337b09..cb8427cc5 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -240,7 +240,7 @@ compile_message_template(T) -> }. preproc_tmpl(Tmpl) -> - emqx_plugin_libs_rule:preproc_tmpl(Tmpl). + emqx_placeholder:preproc_tmpl(Tmpl). render_message( #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message @@ -259,7 +259,7 @@ render(Template, Message) -> end, return => full_binary }, - emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts). + emqx_placeholder:proc_tmpl(Template, Message, Opts). render_timestamp(Template, Message) -> try diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_msg.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_msg.erl index 8a8cffe55..48cae70d7 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_msg.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_msg.erl @@ -21,7 +21,7 @@ -export_type([msgvars/0]). --type template() :: emqx_plugin_libs_rule:tmpl_token(). +-type template() :: emqx_placeholder:tmpl_token(). -type msgvars() :: #{ topic => template(), @@ -48,7 +48,7 @@ parse(Conf) -> parse_field(Key, Conf, Acc) -> case Conf of #{Key := Val} when is_binary(Val) -> - Acc#{Key => emqx_plugin_libs_rule:preproc_tmpl(Val)}; + Acc#{Key => emqx_placeholder:preproc_tmpl(Val)}; #{Key := Val} -> Acc#{Key => Val}; #{} -> diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index b8157d4fc..0b5885c16 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -34,8 +34,8 @@ value := binary() }. -type message_template() :: #{ - key := emqx_plugin_libs_rule:tmpl_token(), - value := emqx_plugin_libs_rule:tmpl_token() + key := emqx_placeholder:tmpl_token(), + value := emqx_placeholder:tmpl_token() }. -type config() :: #{ authentication := _, @@ -421,7 +421,7 @@ compile_message_template(TemplateOpts) -> }. preproc_tmpl(Template) -> - emqx_plugin_libs_rule:preproc_tmpl(Template). + emqx_placeholder:preproc_tmpl(Template). render_message( Message, #{key := KeyTemplate, value := ValueTemplate} @@ -439,7 +439,7 @@ render(Message, Template) -> end, return => full_binary }, - emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts). + emqx_placeholder:proc_tmpl(Template, Message, Opts). get_producer_status(Producers) -> case pulsar_producers:all_connected(Producers) of diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 0dc73b5f6..c7aca7633 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -225,7 +225,7 @@ on_start( {pool_size, PoolSize}, {pool, InstanceID} ], - ProcessedTemplate = emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate), + ProcessedTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate), State = #{ poolname => InstanceID, processed_payload_template => ProcessedTemplate, @@ -547,7 +547,7 @@ is_send_message_atom(_) -> format_data([], Msg) -> emqx_utils_json:encode(Msg); format_data(Tokens, Msg) -> - emqx_plugin_libs_rule:proc_tmpl(Tokens, Msg). + emqx_placeholder:proc_tmpl(Tokens, Msg). handle_result({error, ecpool_empty}) -> {error, {recoverable_error, ecpool_empty}}; diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index a7d01960e..6167b7749 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -102,7 +102,7 @@ on_start( emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS) ), ClientId = client_id(InstanceId), - TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic), + TopicTks = emqx_placeholder:preproc_tmpl(Topic), #{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config), ClientCfg = #{acl_info => AclInfo}, Templates = parse_template(Config), @@ -240,7 +240,7 @@ parse_template(Config) -> parse_template(maps:to_list(Templates), #{}). parse_template([{Key, H} | T], Templates) -> - ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(H), + ParamsTks = emqx_placeholder:preproc_tmpl(H), parse_template( T, Templates#{Key => ParamsTks} @@ -249,7 +249,7 @@ parse_template([], Templates) -> Templates. get_topic_key({_, Msg}, TopicTks) -> - emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg); + emqx_placeholder:proc_tmpl(TopicTks, Msg); get_topic_key([Query | _], TopicTks) -> get_topic_key(Query, TopicTks). @@ -258,14 +258,14 @@ apply_template({Key, Msg} = _Req, Templates) -> undefined -> emqx_utils_json:encode(Msg); Template -> - emqx_plugin_libs_rule:proc_tmpl(Template, Msg) + emqx_placeholder:proc_tmpl(Template, Msg) end; apply_template([{Key, _} | _] = Reqs, Templates) -> case maps:get(Key, Templates, undefined) of undefined -> [emqx_utils_json:encode(Msg) || {_, Msg} <- Reqs]; Template -> - [emqx_plugin_libs_rule:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs] + [emqx_placeholder:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs] end. client_id(ResourceId) -> diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 52bd910db..edd8770f5 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -455,9 +455,7 @@ parse_sql_template([{Key, H} | T], BatchInsertTks) -> Key => #{ ?BATCH_INSERT_PART => InsertSQL, - ?BATCH_PARAMS_TOKENS => emqx_plugin_libs_rule:preproc_tmpl( - Params - ) + ?BATCH_PARAMS_TOKENS => emqx_placeholder:preproc_tmpl(Params) } } ); @@ -478,7 +476,7 @@ parse_sql_template([], BatchInsertTks) -> apply_template( {?ACTION_SEND_MESSAGE = _Key, _Msg} = Query, Templates ) -> - %% TODO: fix emqx_plugin_libs_rule:proc_tmpl/2 + %% TODO: fix emqx_placeholder:proc_tmpl/2 %% it won't add single quotes for string apply_template([Query], Templates); %% batch inserts diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 8fd41443c..6c1d06429 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -126,8 +126,8 @@ on_query(InstanceId, {query, SQL}, State) -> do_query(InstanceId, SQL, State); on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) -> case maps:find(Key, InsertTksMap) of - {ok, Tokens} -> - SQL = emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Data), + {ok, Tokens} when is_map(Data) -> + SQL = emqx_placeholder:proc_sql_param_str(Tokens, Data), do_query(InstanceId, SQL, State); _ -> {error, {unrecoverable_error, invalid_request}} @@ -136,7 +136,7 @@ on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) -> %% aggregate the batch queries to one SQL is a heavy job, we should put it in the worker process on_batch_query( InstanceId, - [{Key, _} | _] = BatchReq, + [{Key, _Data = #{}} | _] = BatchReq, #{batch_tokens := BatchTksMap, query_opts := Opts} = State ) -> case maps:find(Key, BatchTksMap) of @@ -231,8 +231,8 @@ do_batch_insert(Conn, Tokens, BatchReqs, Opts) -> aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) -> lists:foldl( fun({_, Data}, Acc) -> - InsertPart = emqx_plugin_libs_rule:proc_sql_param_str(InsertPartTks, Data), - ParamsPart = emqx_plugin_libs_rule:proc_sql_param_str(ParamsPartTks, Data), + InsertPart = emqx_placeholder:proc_sql_param_str(InsertPartTks, Data), + ParamsPart = emqx_placeholder:proc_sql_param_str(ParamsPartTks, Data), Values = maps:get(InsertPart, Acc, []), maps:put(InsertPart, [ParamsPart | Values], Acc) end, @@ -260,12 +260,12 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) -> {ok, select} -> parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap); {ok, insert} -> - InsertTks = emqx_plugin_libs_rule:preproc_tmpl(H), + InsertTks = emqx_placeholder:preproc_tmpl(H), H1 = string:trim(H, trailing, ";"), case split_insert_sql(H1) of [_InsertStr, InsertPart, _ValuesStr, ParamsPart] -> - InsertPartTks = emqx_plugin_libs_rule:preproc_tmpl(InsertPart), - ParamsPartTks = emqx_plugin_libs_rule:preproc_tmpl(ParamsPart), + InsertPartTks = emqx_placeholder:preproc_tmpl(InsertPart), + ParamsPartTks = emqx_placeholder:preproc_tmpl(ParamsPart), parse_batch_prepare_sql( T, InsertTksMap#{Key => InsertTks}, diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 55f8bb69e..7644921f0 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -496,7 +496,7 @@ t_simple_sql_query(Config) -> ), case EnableBatch of true -> - ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result); + ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result); false -> ?assertMatch({ok, #{<<"code">> := 0, <<"data">> := [[1]]}}, Result) end, @@ -535,7 +535,7 @@ t_bad_sql_parameter(Config) -> 2_000 ), - ?assertMatch({error, #{<<"code">> := _}}, Result), + ?assertMatch({error, {unrecoverable_error, invalid_request}}, Result), ok. t_nasty_sql_string(Config) -> diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index c44212cc3..149704f76 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -464,8 +464,8 @@ preprocess_request( } = Req ) -> #{ - method => emqx_plugin_libs_rule:preproc_tmpl(to_bin(Method)), - path => emqx_plugin_libs_rule:preproc_tmpl(Path), + method => emqx_placeholder:preproc_tmpl(to_bin(Method)), + path => emqx_placeholder:preproc_tmpl(Path), body => maybe_preproc_tmpl(body, Req), headers => wrap_auth_header(preproc_headers(Headers)), request_timeout => maps:get(request_timeout, Req, ?DEFAULT_REQUEST_TIMEOUT_MS), @@ -477,8 +477,8 @@ preproc_headers(Headers) when is_map(Headers) -> fun(K, V, Acc) -> [ { - emqx_plugin_libs_rule:preproc_tmpl(to_bin(K)), - emqx_plugin_libs_rule:preproc_tmpl(to_bin(V)) + emqx_placeholder:preproc_tmpl(to_bin(K)), + emqx_placeholder:preproc_tmpl(to_bin(V)) } | Acc ] @@ -490,8 +490,8 @@ preproc_headers(Headers) when is_list(Headers) -> lists:map( fun({K, V}) -> { - emqx_plugin_libs_rule:preproc_tmpl(to_bin(K)), - emqx_plugin_libs_rule:preproc_tmpl(to_bin(V)) + emqx_placeholder:preproc_tmpl(to_bin(K)), + emqx_placeholder:preproc_tmpl(to_bin(V)) } end, Headers @@ -530,7 +530,7 @@ try_bin_to_lower(Bin) -> maybe_preproc_tmpl(Key, Conf) -> case maps:get(Key, Conf, undefined) of undefined -> undefined; - Val -> emqx_plugin_libs_rule:preproc_tmpl(Val) + Val -> emqx_placeholder:preproc_tmpl(Val) end. process_request( @@ -544,8 +544,8 @@ process_request( Msg ) -> Conf#{ - method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg)), - path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg), + method => make_method(emqx_placeholder:proc_tmpl(MethodTks, Msg)), + path => emqx_placeholder:proc_tmpl(PathTks, Msg), body => process_request_body(BodyTks, Msg), headers => proc_headers(HeadersTks, Msg), request_timeout => ReqTimeout @@ -554,14 +554,14 @@ process_request( process_request_body(undefined, Msg) -> emqx_utils_json:encode(Msg); process_request_body(BodyTks, Msg) -> - emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg). + emqx_placeholder:proc_tmpl(BodyTks, Msg). proc_headers(HeaderTks, Msg) -> lists:map( fun({K, V}) -> { - emqx_plugin_libs_rule:proc_tmpl(K, Msg), - emqx_plugin_libs_rule:proc_tmpl(emqx_secret:unwrap(V), Msg) + emqx_placeholder:proc_tmpl(K, Msg), + emqx_placeholder:proc_tmpl(emqx_secret:unwrap(V), Msg) } end, HeaderTks diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 25db669ce..a46b21d92 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -344,7 +344,7 @@ parse_prepare_sql(Config) -> parse_prepare_sql(maps:to_list(SQL), #{}, #{}, #{}, #{}). parse_prepare_sql([{Key, H} | _] = L, Prepares, Tokens, BatchInserts, BatchTks) -> - {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H), + {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H), parse_batch_prepare_sql( L, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}, BatchInserts, BatchTks ); @@ -363,7 +363,7 @@ parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks {ok, insert} -> case emqx_plugin_libs_rule:split_insert_sql(H) of {ok, {InsertSQL, Params}} -> - ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(Params), + ParamsTks = emqx_placeholder:preproc_tmpl(Params), parse_prepare_sql( T, Prepares, @@ -389,7 +389,7 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) undefined -> {SQLOrData, Params}; Tokens -> - {TypeOrKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} + {TypeOrKey, emqx_placeholder:proc_sql(Tokens, SQLOrData)} end. on_batch_insert(InstId, BatchReqs, InsertPart, Tokens, State) -> diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index cbe95cb33..8fb60f102 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -188,7 +188,7 @@ on_batch_query( {error, {unrecoverable_error, batch_prepare_not_implemented}}; TokenList -> {_, Datas} = lists:unzip(BatchReq), - Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], + Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas], St = maps:get(BinKey, Sts), case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of {error, _Error} = Result -> @@ -218,7 +218,7 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) undefined -> {SQLOrData, Params}; Tokens -> - {Key, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} + {Key, emqx_placeholder:proc_sql(Tokens, SQLOrData)} end. on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> @@ -350,7 +350,7 @@ parse_prepare_sql(Config) -> parse_prepare_sql(maps:to_list(SQL), #{}, #{}). parse_prepare_sql([{Key, H} | T], Prepares, Tokens) -> - {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '$n'), + {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '$n'), parse_prepare_sql( T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} ); diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index 84334875b..71a891850 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -301,8 +301,8 @@ feedvar(Override, Packet, ConnInfo, ClientInfo) -> }, maps:map( fun(_K, V) -> - Tokens = emqx_plugin_libs_rule:preproc_tmpl(V), - emqx_plugin_libs_rule:proc_tmpl(Tokens, Envs) + Tokens = emqx_placeholder:preproc_tmpl(V), + emqx_placeholder:proc_tmpl(Tokens, Envs) end, Override ). diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index 316432dea..854ccad88 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -283,8 +283,8 @@ feedvar(Override, Packet, ConnInfo, ClientInfo) -> }, maps:map( fun(_K, V) -> - Tokens = emqx_plugin_libs_rule:preproc_tmpl(V), - emqx_plugin_libs_rule:proc_tmpl(Tokens, Envs) + Tokens = emqx_placeholder:preproc_tmpl(V), + emqx_placeholder:proc_tmpl(Tokens, Envs) end, Override ). diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 0f543badd..5d80c0d56 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -168,7 +168,7 @@ on_batch_query( {error, {unrecoverable_error, batch_prepare_not_implemented}}; TokenList -> {_, Datas} = lists:unzip(BatchReq), - Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], + Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas], St = maps:get(BinKey, Sts), case on_sql_query(InstId, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2) @@ -204,7 +204,7 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{ undefined -> {SQLOrData, Params}; Sql -> - {Sql, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} + {Sql, emqx_placeholder:proc_sql(Tokens, SQLOrData)} end end. @@ -305,7 +305,7 @@ parse_prepare_sql(Config) -> parse_prepare_sql(maps:to_list(SQL), #{}, #{}). parse_prepare_sql([{Key, H} | T], Prepares, Tokens) -> - {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, ':n'), + {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, ':n'), parse_prepare_sql( T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} ); diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl index 3bfac1ec4..b3e3507f0 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl @@ -19,17 +19,6 @@ %% preprocess and process template string with place holders -export([ - preproc_tmpl/1, - proc_tmpl/2, - proc_tmpl/3, - preproc_cmd/1, - proc_cmd/2, - proc_cmd/3, - preproc_sql/1, - preproc_sql/2, - proc_sql/2, - proc_sql_param_str/2, - proc_cql_param_str/2, split_insert_sql/1, detect_sql_type/1, proc_batch_sql/3, @@ -61,68 +50,15 @@ ]). -export([ - now_ms/0, can_topic_match_oneof/2 ]). --export_type([tmpl_token/0]). - -compile({no_auto_import, [float/1]}). -type uri_string() :: iodata(). -type tmpl_token() :: list({var, binary()} | {str, binary()}). --type tmpl_cmd() :: list(tmpl_token()). - --type prepare_statement_key() :: binary(). - -%% preprocess template string with place holders --spec preproc_tmpl(binary()) -> tmpl_token(). -preproc_tmpl(Str) -> - emqx_placeholder:preproc_tmpl(Str). - --spec proc_tmpl(tmpl_token(), map()) -> binary(). -proc_tmpl(Tokens, Data) -> - emqx_placeholder:proc_tmpl(Tokens, Data). - --spec proc_tmpl(tmpl_token(), map(), map()) -> binary() | list(). -proc_tmpl(Tokens, Data, Opts) -> - emqx_placeholder:proc_tmpl(Tokens, Data, Opts). - --spec preproc_cmd(binary()) -> tmpl_cmd(). -preproc_cmd(Str) -> - emqx_placeholder:preproc_cmd(Str). - --spec proc_cmd([tmpl_token()], map()) -> binary() | list(). -proc_cmd(Tokens, Data) -> - emqx_placeholder:proc_cmd(Tokens, Data). --spec proc_cmd([tmpl_token()], map(), map()) -> list(). -proc_cmd(Tokens, Data, Opts) -> - emqx_placeholder:proc_cmd(Tokens, Data, Opts). - -%% preprocess SQL with place holders --spec preproc_sql(Sql :: binary()) -> {prepare_statement_key(), tmpl_token()}. -preproc_sql(Sql) -> - emqx_placeholder:preproc_sql(Sql). - --spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n' | ':n') -> - {prepare_statement_key(), tmpl_token()}. -preproc_sql(Sql, ReplaceWith) -> - emqx_placeholder:preproc_sql(Sql, ReplaceWith). - --spec proc_sql(tmpl_token(), map()) -> list(). -proc_sql(Tokens, Data) -> - emqx_placeholder:proc_sql(Tokens, Data). - --spec proc_sql_param_str(tmpl_token(), map()) -> binary(). -proc_sql_param_str(Tokens, Data) -> - emqx_placeholder:proc_sql_param_str(Tokens, Data). - --spec proc_cql_param_str(tmpl_token(), map()) -> binary(). -proc_cql_param_str(Tokens, Data) -> - emqx_placeholder:proc_cql_param_str(Tokens, Data). - %% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">> -spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when InsertSQL :: binary(), @@ -169,7 +105,7 @@ detect_sql_type(SQL) -> proc_batch_sql(BatchReqs, InsertPart, Tokens) -> ValuesPart = erlang:iolist_to_binary( lists:join($,, [ - proc_sql_param_str(Tokens, Msg) + emqx_placeholder:proc_sql_param_str(Tokens, Msg) || {_, Msg} <- BatchReqs ]) ), @@ -353,9 +289,6 @@ number_to_list(Int) when is_integer(Int) -> number_to_list(Float) when is_float(Float) -> float_to_list(Float, [{decimals, 10}, compact]). -now_ms() -> - erlang:system_time(millisecond). - can_topic_match_oneof(Topic, Filters) -> lists:any( fun(Fltr) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index b562af09d..a5ad35066 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -65,10 +65,10 @@ pre_process_action_args( ) -> Args#{ preprocessed_tmpl => #{ - topic => emqx_plugin_libs_rule:preproc_tmpl(Topic), + topic => emqx_placeholder:preproc_tmpl(Topic), qos => preproc_vars(QoS), retain => preproc_vars(Retain), - payload => emqx_plugin_libs_rule:preproc_tmpl(Payload), + payload => emqx_placeholder:preproc_tmpl(Payload), user_properties => preproc_user_properties(UserProperties) } }; @@ -110,7 +110,7 @@ republish( } } ) -> - Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected), + Topic = emqx_placeholder:proc_tmpl(TopicTks, Selected), Payload = format_msg(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected, 0), Retain = replace_simple_var(RetainTks, Selected, false), @@ -189,7 +189,7 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) -> emqx_metrics:inc_msg(Msg). preproc_vars(Data) when is_binary(Data) -> - emqx_plugin_libs_rule:preproc_tmpl(Data); + emqx_placeholder:preproc_tmpl(Data); preproc_vars(Data) -> Data. @@ -201,13 +201,13 @@ preproc_user_properties(<<"${pub_props.'User-Property'}">>) -> ?ORIGINAL_USER_PROPERTIES; preproc_user_properties(<<"${", _/binary>> = V) -> %% use a variable - emqx_plugin_libs_rule:preproc_tmpl(V); + emqx_placeholder:preproc_tmpl(V); preproc_user_properties(_) -> %% invalid, discard undefined. replace_simple_var(Tokens, Data, Default) when is_list(Tokens) -> - [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), + [Var] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}), case Var of %% cannot find the variable from Data undefined -> Default; @@ -219,7 +219,7 @@ replace_simple_var(Val, _Data, _Default) -> format_msg([], Selected) -> emqx_utils_json:encode(Selected); format_msg(Tokens, Selected) -> - emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected). + emqx_placeholder:proc_tmpl(Tokens, Selected). format_pub_props(UserPropertiesTks, Selected, Env) -> UserProperties = diff --git a/apps/emqx_plugin_libs/src/emqx_placeholder.erl b/apps/emqx_utils/src/emqx_placeholder.erl similarity index 92% rename from apps/emqx_plugin_libs/src/emqx_placeholder.erl rename to apps/emqx_utils/src/emqx_placeholder.erl index dcd666f5b..babac8a84 100644 --- a/apps/emqx_plugin_libs/src/emqx_placeholder.erl +++ b/apps/emqx_utils/src/emqx_placeholder.erl @@ -46,7 +46,7 @@ quote_mysql/1 ]). --include_lib("emqx/include/emqx_placeholder.hrl"). +-define(PH_VAR_THIS, '$this'). -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})"). @@ -55,7 +55,7 @@ %% Space and CRLF -define(EX_WITHE_CHARS, "\\s"). --type tmpl_token() :: list({var, binary()} | {str, binary()}). +-type tmpl_token() :: list({var, ?PH_VAR_THIS | [binary()]} | {str, binary()}). -type tmpl_cmd() :: list(tmpl_token()). @@ -123,11 +123,11 @@ proc_tmpl(Tokens, Data, Opts = #{return := rawlist}) -> ({str, Str}) -> Str; ({var, Phld}) when is_function(Trans, 1) -> - Trans(get_phld_var(Phld, Data)); + Trans(lookup_var(Phld, Data)); ({var, Phld}) when is_function(Trans, 2) -> - Trans(Phld, get_phld_var(Phld, Data)); + Trans(Phld, lookup_var(Phld, Data)); ({var, Phld}) -> - get_phld_var(Phld, Data) + lookup_var(Phld, Data) end, Tokens ). @@ -264,13 +264,35 @@ quote_mysql(Str) when is_binary(Str) -> quote_mysql(Str) -> quote_escape(Str, fun escape_mysql/1). +lookup_var(Var, Value) when Var == ?PH_VAR_THIS orelse Var == [] -> + Value; +lookup_var([Prop | Rest], Data) -> + case lookup(Prop, Data) of + {ok, Value} -> + lookup_var(Rest, Value); + {error, _} -> + undefined + end. + +lookup(Prop, Data) when is_binary(Prop) -> + case maps:get(Prop, Data, undefined) of + undefined -> + try + {ok, maps:get(binary_to_existing_atom(Prop, utf8), Data)} + catch + error:{badkey, _} -> + {error, undefined}; + error:badarg -> + {error, undefined} + end; + Value -> + {ok, Value} + end. + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ -get_phld_var(Phld, Data) -> - emqx_rule_maps:nested_get(Phld, Data). - preproc_var_re(#{placeholders := PHs, strip_double_quote := true}) -> Res = [ph_to_re(PH) || PH <- PHs], QuoteRes = ["\"" ++ Re ++ "\"" || Re <- Res], @@ -340,9 +362,8 @@ parse_nested(<<".", R/binary>>) -> parse_nested(R); parse_nested(Attr) -> case string:split(Attr, <<".">>, all) of - [<<>>] -> {var, ?PH_VAR_THIS}; - [Attr] -> {var, Attr}; - Nested -> {path, [{key, P} || P <- Nested]} + [<<>>] -> ?PH_VAR_THIS; + Nested -> Nested end. unwrap(<<"\"${", Val/binary>>, _StripDoubleQuote = true) -> diff --git a/apps/emqx_plugin_libs/test/emqx_placeholder_SUITE.erl b/apps/emqx_utils/test/emqx_placeholder_SUITE.erl similarity index 100% rename from apps/emqx_plugin_libs/test/emqx_placeholder_SUITE.erl rename to apps/emqx_utils/test/emqx_placeholder_SUITE.erl diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl index 0ae9bb2dc..70eca83d7 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl @@ -225,9 +225,9 @@ start_producer( msg => "hstreamdb connector: producer started" }), EnableBatch = maps:get(enable_batch, Options, false), - Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin), + Payload = emqx_placeholder:preproc_tmpl(PayloadBin), OrderingKeyBin = maps:get(ordering_key, Options, <<"">>), - OrderingKey = emqx_plugin_libs_rule:preproc_tmpl(OrderingKeyBin), + OrderingKey = emqx_placeholder:preproc_tmpl(OrderingKeyBin), State = #{ client => Client, producer => Producer, @@ -254,8 +254,8 @@ start_producer( end. to_record(OrderingKeyTmpl, PayloadTmpl, Data) -> - OrderingKey = emqx_plugin_libs_rule:proc_tmpl(OrderingKeyTmpl, Data), - Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTmpl, Data), + OrderingKey = emqx_placeholder:proc_tmpl(OrderingKeyTmpl, Data), + Payload = emqx_placeholder:proc_tmpl(PayloadTmpl, Data), to_record(OrderingKey, Payload). to_record(OrderingKey, Payload) when is_binary(OrderingKey) -> diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl index 59f763904..20985e961 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl @@ -57,7 +57,7 @@ on_query(InstanceId, {send_message, Message0}, State) -> connector_state := ConnectorState } = State, NewConnectorState = ConnectorState#{ - collection => emqx_plugin_libs_rule:proc_tmpl(CollectionTemplate, Message0) + collection => emqx_placeholder:proc_tmpl(CollectionTemplate, Message0) }, Message = render_message(PayloadTemplate, Message0), Res = emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, NewConnectorState), @@ -76,7 +76,7 @@ on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) -> preprocess_template(undefined = _PayloadTemplate) -> undefined; preprocess_template(PayloadTemplate) -> - emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate). + emqx_placeholder:preproc_tmpl(PayloadTemplate). render_message(undefined = _PayloadTemplate, Message) -> Message; @@ -102,14 +102,14 @@ format_data(PayloadTks, Msg) -> case maps:size(PreparedTupleMap) of % If no tuples were found simply proceed with the json decoding and be done with it 0 -> - emqx_utils_json:decode(emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Msg), [return_maps]); + emqx_utils_json:decode(emqx_placeholder:proc_tmpl(PayloadTks, Msg), [return_maps]); _ -> % If tuples were found, replace the tuple values with the references created, run % the modified message through the json parser, and then at the end replace the % references with the actual tuple values. ProcessedMessage = replace_message_values_with_references(Msg, PreparedTupleMap), DecodedMap = emqx_utils_json:decode( - emqx_plugin_libs_rule:proc_tmpl(PayloadTks, ProcessedMessage), [return_maps] + emqx_placeholder:proc_tmpl(PayloadTks, ProcessedMessage), [return_maps] ), populate_map_with_tuple_values(PreparedTupleMap, DecodedMap) end. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_redis.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_redis.erl index 4ce96d5c7..9caba2beb 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_redis.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_redis.erl @@ -126,13 +126,13 @@ process_batch_data(BatchData, CommandTemplate) -> proc_command_template(CommandTemplate, Msg) -> lists:map( fun(ArgTks) -> - emqx_plugin_libs_rule:proc_tmpl(ArgTks, Msg, #{return => full_binary}) + emqx_placeholder:proc_tmpl(ArgTks, Msg, #{return => full_binary}) end, CommandTemplate ). preproc_command_template(CommandTemplate) -> lists:map( - fun emqx_plugin_libs_rule:preproc_tmpl/1, + fun emqx_placeholder:preproc_tmpl/1, CommandTemplate ).