From d6c1ee183f2e685c4ac83e8d755d40e2f24e0f35 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 7 Jun 2023 21:19:52 +0300 Subject: [PATCH 1/7] refactor(pluglib): move `emqx_placeholder` to utils app Also make user that existing code calls it directly. --- apps/emqx_authn/src/emqx_authn_utils.erl | 12 ++-- apps/emqx_authz/src/emqx_authz_utils.erl | 12 ++-- .../emqx_authz/test/emqx_authz_rule_SUITE.erl | 6 +- .../src/emqx_bridge_cassandra_connector.erl | 8 +-- .../src/emqx_bridge_clickhouse_connector.erl | 10 +-- .../src/emqx_bridge_dynamo_connector.erl | 2 +- .../emqx_bridge_dynamo_connector_client.erl | 2 +- .../src/emqx_bridge_gcp_pubsub_connector.erl | 6 +- .../src/emqx_bridge_influxdb_connector.erl | 20 +++--- .../src/emqx_bridge_iotdb_impl.erl | 16 ++--- .../src/emqx_bridge_kafka_impl_consumer.erl | 8 +-- .../src/emqx_bridge_kafka_impl_producer.erl | 4 +- .../src/emqx_bridge_mqtt_msg.erl | 4 +- .../src/emqx_bridge_pulsar_impl_producer.erl | 8 +-- .../src/emqx_bridge_rabbitmq_connector.erl | 4 +- .../src/emqx_bridge_rocketmq_connector.erl | 10 +-- .../src/emqx_bridge_sqlserver_connector.erl | 6 +- .../src/emqx_bridge_tdengine_connector.erl | 16 ++--- .../test/emqx_bridge_tdengine_SUITE.erl | 4 +- .../src/emqx_connector_http.erl | 24 +++---- .../src/emqx_connector_mysql.erl | 6 +- .../src/emqx_connector_pgsql.erl | 6 +- .../src/emqx_mqttsn_channel.erl | 4 +- .../src/emqx_stomp_channel.erl | 4 +- apps/emqx_oracle/src/emqx_oracle.erl | 6 +- .../src/emqx_plugin_libs_rule.erl | 69 +------------------ .../src/emqx_rule_actions.erl | 14 ++-- .../src/emqx_placeholder.erl | 43 +++++++++--- .../test/emqx_placeholder_SUITE.erl | 0 .../src/emqx_ee_connector_hstreamdb.erl | 8 +-- .../src/emqx_ee_connector_mongodb.erl | 8 +-- .../src/emqx_ee_connector_redis.erl | 4 +- 32 files changed, 151 insertions(+), 203 deletions(-) rename apps/{emqx_plugin_libs => emqx_utils}/src/emqx_placeholder.erl (92%) rename apps/{emqx_plugin_libs => emqx_utils}/test/emqx_placeholder_SUITE.erl (100%) diff --git a/apps/emqx_authn/src/emqx_authn_utils.erl b/apps/emqx_authn/src/emqx_authn_utils.erl index 12520251e..8e168bb5d 100644 --- a/apps/emqx_authn/src/emqx_authn_utils.erl +++ b/apps/emqx_authn/src/emqx_authn_utils.erl @@ -225,21 +225,19 @@ without_password(Credential, [Name | Rest]) -> without_password(Credential, Rest) end. -urlencode_var({var, _} = Var, Value) -> - emqx_http_lib:uri_encode(handle_var(Var, Value)); urlencode_var(Var, Value) -> - handle_var(Var, Value). + emqx_http_lib:uri_encode(handle_var(Var, Value)). -handle_var({var, _Name}, undefined) -> +handle_var(_Name, undefined) -> <<>>; -handle_var({var, <<"peerhost">>}, PeerHost) -> +handle_var([<<"peerhost">>], PeerHost) -> emqx_placeholder:bin(inet:ntoa(PeerHost)); handle_var(_, Value) -> emqx_placeholder:bin(Value). -handle_sql_var({var, _Name}, undefined) -> +handle_sql_var(_Name, undefined) -> <<>>; -handle_sql_var({var, <<"peerhost">>}, PeerHost) -> +handle_sql_var([<<"peerhost">>], PeerHost) -> emqx_placeholder:bin(inet:ntoa(PeerHost)); handle_sql_var(_, Value) -> emqx_placeholder:sql_data(Value). diff --git a/apps/emqx_authz/src/emqx_authz_utils.erl b/apps/emqx_authz/src/emqx_authz_utils.erl index c01505680..ec112070e 100644 --- a/apps/emqx_authz/src/emqx_authz_utils.erl +++ b/apps/emqx_authz/src/emqx_authz_utils.erl @@ -188,21 +188,19 @@ convert_client_var({dn, DN}) -> {cert_subject, DN}; convert_client_var({protocol, Proto}) -> {proto_name, Proto}; convert_client_var(Other) -> Other. -urlencode_var({var, _} = Var, Value) -> - emqx_http_lib:uri_encode(handle_var(Var, Value)); urlencode_var(Var, Value) -> - handle_var(Var, Value). + emqx_http_lib:uri_encode(handle_var(Var, Value)). -handle_var({var, _Name}, undefined) -> +handle_var(_Name, undefined) -> <<>>; -handle_var({var, <<"peerhost">>}, IpAddr) -> +handle_var([<<"peerhost">>], IpAddr) -> inet_parse:ntoa(IpAddr); handle_var(_Name, Value) -> emqx_placeholder:bin(Value). -handle_sql_var({var, _Name}, undefined) -> +handle_sql_var(_Name, undefined) -> <<>>; -handle_sql_var({var, <<"peerhost">>}, IpAddr) -> +handle_sql_var([<<"peerhost">>], IpAddr) -> inet_parse:ntoa(IpAddr); handle_sql_var(_Name, Value) -> emqx_placeholder:sql_data(Value). diff --git a/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl b/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl index 76e5677ce..fbfb84785 100644 --- a/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl @@ -81,7 +81,7 @@ t_compile(_) -> {{127, 0, 0, 1}, {127, 0, 0, 1}, 32}, {{192, 168, 1, 0}, {192, 168, 1, 255}, 24} ]}, - subscribe, [{pattern, [{var, {var, <<"clientid">>}}]}]}, + subscribe, [{pattern, [{var, [<<"clientid">>]}]}]}, emqx_authz_rule:compile(?SOURCE3) ), @@ -99,14 +99,14 @@ t_compile(_) -> {clientid, {re_pattern, _, _, _, _}} ]}, publish, [ - {pattern, [{var, {var, <<"username">>}}]}, {pattern, [{var, {var, <<"clientid">>}}]} + {pattern, [{var, [<<"username">>]}]}, {pattern, [{var, [<<"clientid">>]}]} ]}, emqx_authz_rule:compile(?SOURCE5) ), ?assertEqual( {allow, {username, {eq, <<"test">>}}, publish, [ - {pattern, [{str, <<"t/foo">>}, {var, {var, <<"username">>}}, {str, <<"boo">>}]} + {pattern, [{str, <<"t/foo">>}, {var, [<<"username">>]}, {str, <<"boo">>}]} ]}, emqx_authz_rule:compile(?SOURCE6) ), diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index 285825714..ad41329d2 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -273,10 +273,10 @@ proc_cql_params( %% assert _PreparedKey = maps:get(PreparedKey0, Prepares), Tokens = maps:get(PreparedKey0, ParamsTokens), - {PreparedKey0, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}; + {PreparedKey0, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}; proc_cql_params(query, SQL, Params, _State) -> - {SQL1, Tokens} = emqx_plugin_libs_rule:preproc_sql(SQL, '?'), - {SQL1, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}. + {SQL1, Tokens} = emqx_placeholder:preproc_sql(SQL, '?'), + {SQL1, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}. exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when Type == query; Type == prepared_query @@ -403,7 +403,7 @@ parse_prepare_cql(_) -> #{prepare_cql => #{}, params_tokens => #{}}. parse_prepare_cql([{Key, H} | T], Prepares, Tokens) -> - {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '?'), + {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '?'), parse_prepare_cql( T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} ); diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index 211e41174..002460c95 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -193,7 +193,7 @@ prepare_sql_templates(#{ batch_value_separator := Separator }) -> InsertTemplate = - emqx_plugin_libs_rule:preproc_tmpl(Template), + emqx_placeholder:preproc_tmpl(Template), BulkExtendInsertTemplate = prepare_sql_bulk_extend_template(Template, Separator), #{ @@ -210,7 +210,7 @@ prepare_sql_bulk_extend_template(Template, Separator) -> %% Add separator before ValuesTemplate so that one can append it %% to an insert template ExtendParamTemplate = iolist_to_binary([Separator, ValuesTemplate]), - emqx_plugin_libs_rule:preproc_tmpl(ExtendParamTemplate). + emqx_placeholder:preproc_tmpl(ExtendParamTemplate). %% This function is similar to emqx_plugin_libs_rule:split_insert_sql/1 but can %% also handle Clickhouse's SQL extension for INSERT statments that allows the @@ -363,7 +363,7 @@ on_query( transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL). get_sql(send_message, #{send_message_template := PreparedSQL}, Data) -> - emqx_plugin_libs_rule:proc_tmpl(PreparedSQL, Data); + emqx_placeholder:proc_tmpl(PreparedSQL, Data); get_sql(_, _, SQL) -> SQL. @@ -421,10 +421,10 @@ objects_to_sql( } ) -> %% Prepare INSERT-statement and the first row after VALUES - InsertStatementHead = emqx_plugin_libs_rule:proc_tmpl(InsertTemplate, FirstObject), + InsertStatementHead = emqx_placeholder:proc_tmpl(InsertTemplate, FirstObject), FormatObjectDataFunction = fun(Object) -> - emqx_plugin_libs_rule:proc_tmpl(BulkExtendInsertTemplate, Object) + emqx_placeholder:proc_tmpl(BulkExtendInsertTemplate, Object) end, InsertStatementTail = lists:map(FormatObjectDataFunction, RemainingObjects), CompleteStatement = erlang:iolist_to_binary([InsertStatementHead, InsertStatementTail]), diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 0cc3f993b..b0e145e63 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -200,7 +200,7 @@ parse_template(Config) -> parse_template(maps:to_list(Templates), #{}). parse_template([{Key, H} | T], Templates) -> - ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(H), + ParamsTks = emqx_placeholder:preproc_tmpl(H), parse_template( T, Templates#{Key => ParamsTks} diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl index faaef9df4..1b379298f 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl @@ -144,7 +144,7 @@ apply_template({Key, Msg} = Req, Templates) -> undefined -> Req; Template -> - {Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)} + {Key, emqx_placeholder:proc_tmpl(Template, Msg)} end; %% now there is no batch delete, so %% 1. we can simply replace the `send_message` to `put` diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl index 65a0336ec..58a3f6612 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl @@ -40,7 +40,7 @@ connect_timeout := timer:time(), jwt_config := emqx_connector_jwt:jwt_config(), max_retries := non_neg_integer(), - payload_template := emqx_plugin_libs_rule:tmpl_token(), + payload_template := emqx_placeholder:tmpl_token(), pool_name := binary(), project_id := binary(), pubsub_topic := binary(), @@ -104,7 +104,7 @@ on_start( connect_timeout => ConnectTimeout, jwt_config => JWTConfig, max_retries => MaxRetries, - payload_template => emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate), + payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate), pool_name => ResourceId, project_id => ProjectId, pubsub_topic => PubSubTopic, @@ -294,7 +294,7 @@ encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) -> Interpolated = case PayloadTemplate of [] -> emqx_utils_json:encode(Selected); - _ -> emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, Selected) + _ -> emqx_placeholder:proc_tmpl(PayloadTemplate, Selected) end, #{data => base64:encode(Interpolated)}. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 05e7c11b2..71ab85c58 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -436,7 +436,7 @@ to_config([Item0 | Rest], Acc, Precision) -> Ts0 = maps:get(timestamp, Item0, undefined), {Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision), Item = #{ - measurement => emqx_plugin_libs_rule:preproc_tmpl(maps:get(measurement, Item0)), + measurement => emqx_placeholder:preproc_tmpl(maps:get(measurement, Item0)), timestamp => Ts, precision => {FromPrecision, ToPrecision}, tags => to_kv_config(maps:get(tags, Item0)), @@ -458,18 +458,18 @@ preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) -> preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) -> preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision); preproc_tmpl_timestamp(<> = Ts, Precision) -> - {emqx_plugin_libs_rule:preproc_tmpl(Ts), ms, Precision}; + {emqx_placeholder:preproc_tmpl(Ts), ms, Precision}; preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) -> %% a placehold is in use. e.g. ${payload.my_timestamp} %% we can only hope it the value will be of the same precision in the configs - {emqx_plugin_libs_rule:preproc_tmpl(Ts), Precision, Precision}. + {emqx_placeholder:preproc_tmpl(Ts), Precision, Precision}. to_kv_config(KVfields) -> maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)). to_maps_config(K, V, Res) -> - NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)), - NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)), + NK = emqx_placeholder:preproc_tmpl(bin(K)), + NV = emqx_placeholder:preproc_tmpl(bin(V)), Res#{NK => NV}. %% ------------------------------------------------------------------------------------------------- @@ -505,7 +505,7 @@ parse_batch_data(InstId, BatchData, SyntaxLines) -> fields := [{binary(), binary()}], measurement := binary(), tags := [{binary(), binary()}], - timestamp := emqx_plugin_libs_rule:tmpl_token() | integer(), + timestamp := emqx_placeholder:tmpl_token() | integer(), precision := {From :: ts_precision(), To :: ts_precision()} } ]) -> {ok, [map()]} | {error, term()}. @@ -526,7 +526,7 @@ lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, Error is_list(Ts) -> TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, - case parse_timestamp(emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions)) of + case parse_timestamp(emqx_placeholder:proc_tmpl(Ts, Data, TransOptions)) of {ok, TsInt} -> Item1 = Item#{timestamp => TsInt}, continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc); @@ -573,7 +573,7 @@ line_to_point( {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields), maps:without([precision], Item#{ - measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data), + measurement => emqx_placeholder:proc_tmpl(Measurement, Data), tags => EncodedTags, fields => EncodedFields, timestamp => maybe_convert_time_unit(Ts, Precision) @@ -590,8 +590,8 @@ time_unit(ns) -> nanosecond. maps_config_to_data(K, V, {Data, Res}) -> KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, - NK0 = emqx_plugin_libs_rule:proc_tmpl(K, Data, KTransOptions), - NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, VTransOptions), + NK0 = emqx_placeholder:proc_tmpl(K, Data, KTransOptions), + NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions), case {NK0, NV} of {[undefined], _} -> {Data, Res}; diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index 0e1934525..087906549 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -185,7 +185,7 @@ preproc_data( timestamp => maybe_preproc_tmpl( maps:get(<<"timestamp">>, Data, <<"now">>) ), - measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), + measurement => emqx_placeholder:preproc_tmpl(Measurement), data_type => DataType, value => maybe_preproc_tmpl(Value) } @@ -203,7 +203,7 @@ preproc_data(_NoMatch, Acc) -> Acc. maybe_preproc_tmpl(Value) when is_binary(Value) -> - emqx_plugin_libs_rule:preproc_tmpl(Value); + emqx_placeholder:preproc_tmpl(Value); maybe_preproc_tmpl(Value) -> Value. @@ -225,7 +225,7 @@ proc_data(PreProcessedData, Msg) -> ) -> #{ timestamp => iot_timestamp(TimestampTkn, Msg, Nows), - measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Msg), + measurement => emqx_placeholder:proc_tmpl(Measurement, Msg), data_type => DataType, value => proc_value(DataType, ValueTkn, Msg) } @@ -236,7 +236,7 @@ proc_data(PreProcessedData, Msg) -> iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) -> Timestamp; iot_timestamp(TimestampTkn, Msg, Nows) -> - iot_timestamp(emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows). + iot_timestamp(emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows). iot_timestamp(Timestamp, #{now_ms := NowMs}) when Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>> @@ -250,7 +250,7 @@ iot_timestamp(Timestamp, _) when is_binary(Timestamp) -> binary_to_integer(Timestamp). proc_value(<<"TEXT">>, ValueTkn, Msg) -> - case emqx_plugin_libs_rule:proc_tmpl(ValueTkn, Msg) of + case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of <<"undefined">> -> null; Val -> Val end; @@ -262,7 +262,7 @@ proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> -> convert_float(replace_var(ValueTkn, Msg)). replace_var(Tokens, Data) when is_list(Tokens) -> - [Val] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), + [Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}), Val; replace_var(Val, _Data) -> Val. @@ -410,8 +410,8 @@ device_id(Message, Payloads, State) -> %% [FIXME] there could be conflicting device-ids in the Payloads maps:get(<<"device_id">>, hd(Payloads), undefined); DeviceId -> - DeviceIdTkn = emqx_plugin_libs_rule:preproc_tmpl(DeviceId), - emqx_plugin_libs_rule:proc_tmpl(DeviceIdTkn, Message) + DeviceIdTkn = emqx_placeholder:preproc_tmpl(DeviceId), + emqx_placeholder:proc_tmpl(DeviceIdTkn, Message) end. handle_response({ok, 200, _Headers, Body} = Resp) -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index c0de23d94..9d03d53f9 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -68,7 +68,7 @@ resource_id := resource_id(), topic_mapping := #{ kafka_topic() := #{ - payload_template := emqx_plugin_libs_rule:tmpl_token(), + payload_template := emqx_placeholder:tmpl_token(), mqtt_topic => emqx_types:topic(), qos => emqx_types:qos() } @@ -82,7 +82,7 @@ resource_id := resource_id(), topic_mapping := #{ kafka_topic() := #{ - payload_template := emqx_plugin_libs_rule:tmpl_token(), + payload_template := emqx_placeholder:tmpl_token(), mqtt_topic => emqx_types:topic(), qos => emqx_types:qos() } @@ -536,7 +536,7 @@ convert_topic_mapping(TopicMappingList) -> qos := QoS, payload_template := PayloadTemplate0 } = Fields, - PayloadTemplate = emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate0), + PayloadTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate0), Acc#{ KafkaTopic => #{ payload_template => PayloadTemplate, @@ -559,7 +559,7 @@ render(FullMessage, PayloadTemplate) -> emqx_plugin_libs_rule:bin(X) end }, - emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, FullMessage, Opts). + emqx_placeholder:proc_tmpl(PayloadTemplate, FullMessage, Opts). encode(Value, none) -> Value; diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 8b8337b09..cb8427cc5 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -240,7 +240,7 @@ compile_message_template(T) -> }. preproc_tmpl(Tmpl) -> - emqx_plugin_libs_rule:preproc_tmpl(Tmpl). + emqx_placeholder:preproc_tmpl(Tmpl). render_message( #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message @@ -259,7 +259,7 @@ render(Template, Message) -> end, return => full_binary }, - emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts). + emqx_placeholder:proc_tmpl(Template, Message, Opts). render_timestamp(Template, Message) -> try diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_msg.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_msg.erl index 8a8cffe55..48cae70d7 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_msg.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_msg.erl @@ -21,7 +21,7 @@ -export_type([msgvars/0]). --type template() :: emqx_plugin_libs_rule:tmpl_token(). +-type template() :: emqx_placeholder:tmpl_token(). -type msgvars() :: #{ topic => template(), @@ -48,7 +48,7 @@ parse(Conf) -> parse_field(Key, Conf, Acc) -> case Conf of #{Key := Val} when is_binary(Val) -> - Acc#{Key => emqx_plugin_libs_rule:preproc_tmpl(Val)}; + Acc#{Key => emqx_placeholder:preproc_tmpl(Val)}; #{Key := Val} -> Acc#{Key => Val}; #{} -> diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index b8157d4fc..0b5885c16 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -34,8 +34,8 @@ value := binary() }. -type message_template() :: #{ - key := emqx_plugin_libs_rule:tmpl_token(), - value := emqx_plugin_libs_rule:tmpl_token() + key := emqx_placeholder:tmpl_token(), + value := emqx_placeholder:tmpl_token() }. -type config() :: #{ authentication := _, @@ -421,7 +421,7 @@ compile_message_template(TemplateOpts) -> }. preproc_tmpl(Template) -> - emqx_plugin_libs_rule:preproc_tmpl(Template). + emqx_placeholder:preproc_tmpl(Template). render_message( Message, #{key := KeyTemplate, value := ValueTemplate} @@ -439,7 +439,7 @@ render(Message, Template) -> end, return => full_binary }, - emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts). + emqx_placeholder:proc_tmpl(Template, Message, Opts). get_producer_status(Producers) -> case pulsar_producers:all_connected(Producers) of diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 0dc73b5f6..c7aca7633 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -225,7 +225,7 @@ on_start( {pool_size, PoolSize}, {pool, InstanceID} ], - ProcessedTemplate = emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate), + ProcessedTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate), State = #{ poolname => InstanceID, processed_payload_template => ProcessedTemplate, @@ -547,7 +547,7 @@ is_send_message_atom(_) -> format_data([], Msg) -> emqx_utils_json:encode(Msg); format_data(Tokens, Msg) -> - emqx_plugin_libs_rule:proc_tmpl(Tokens, Msg). + emqx_placeholder:proc_tmpl(Tokens, Msg). handle_result({error, ecpool_empty}) -> {error, {recoverable_error, ecpool_empty}}; diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index a7d01960e..6167b7749 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -102,7 +102,7 @@ on_start( emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS) ), ClientId = client_id(InstanceId), - TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic), + TopicTks = emqx_placeholder:preproc_tmpl(Topic), #{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config), ClientCfg = #{acl_info => AclInfo}, Templates = parse_template(Config), @@ -240,7 +240,7 @@ parse_template(Config) -> parse_template(maps:to_list(Templates), #{}). parse_template([{Key, H} | T], Templates) -> - ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(H), + ParamsTks = emqx_placeholder:preproc_tmpl(H), parse_template( T, Templates#{Key => ParamsTks} @@ -249,7 +249,7 @@ parse_template([], Templates) -> Templates. get_topic_key({_, Msg}, TopicTks) -> - emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg); + emqx_placeholder:proc_tmpl(TopicTks, Msg); get_topic_key([Query | _], TopicTks) -> get_topic_key(Query, TopicTks). @@ -258,14 +258,14 @@ apply_template({Key, Msg} = _Req, Templates) -> undefined -> emqx_utils_json:encode(Msg); Template -> - emqx_plugin_libs_rule:proc_tmpl(Template, Msg) + emqx_placeholder:proc_tmpl(Template, Msg) end; apply_template([{Key, _} | _] = Reqs, Templates) -> case maps:get(Key, Templates, undefined) of undefined -> [emqx_utils_json:encode(Msg) || {_, Msg} <- Reqs]; Template -> - [emqx_plugin_libs_rule:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs] + [emqx_placeholder:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs] end. client_id(ResourceId) -> diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 52bd910db..edd8770f5 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -455,9 +455,7 @@ parse_sql_template([{Key, H} | T], BatchInsertTks) -> Key => #{ ?BATCH_INSERT_PART => InsertSQL, - ?BATCH_PARAMS_TOKENS => emqx_plugin_libs_rule:preproc_tmpl( - Params - ) + ?BATCH_PARAMS_TOKENS => emqx_placeholder:preproc_tmpl(Params) } } ); @@ -478,7 +476,7 @@ parse_sql_template([], BatchInsertTks) -> apply_template( {?ACTION_SEND_MESSAGE = _Key, _Msg} = Query, Templates ) -> - %% TODO: fix emqx_plugin_libs_rule:proc_tmpl/2 + %% TODO: fix emqx_placeholder:proc_tmpl/2 %% it won't add single quotes for string apply_template([Query], Templates); %% batch inserts diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 8fd41443c..6c1d06429 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -126,8 +126,8 @@ on_query(InstanceId, {query, SQL}, State) -> do_query(InstanceId, SQL, State); on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) -> case maps:find(Key, InsertTksMap) of - {ok, Tokens} -> - SQL = emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Data), + {ok, Tokens} when is_map(Data) -> + SQL = emqx_placeholder:proc_sql_param_str(Tokens, Data), do_query(InstanceId, SQL, State); _ -> {error, {unrecoverable_error, invalid_request}} @@ -136,7 +136,7 @@ on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) -> %% aggregate the batch queries to one SQL is a heavy job, we should put it in the worker process on_batch_query( InstanceId, - [{Key, _} | _] = BatchReq, + [{Key, _Data = #{}} | _] = BatchReq, #{batch_tokens := BatchTksMap, query_opts := Opts} = State ) -> case maps:find(Key, BatchTksMap) of @@ -231,8 +231,8 @@ do_batch_insert(Conn, Tokens, BatchReqs, Opts) -> aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) -> lists:foldl( fun({_, Data}, Acc) -> - InsertPart = emqx_plugin_libs_rule:proc_sql_param_str(InsertPartTks, Data), - ParamsPart = emqx_plugin_libs_rule:proc_sql_param_str(ParamsPartTks, Data), + InsertPart = emqx_placeholder:proc_sql_param_str(InsertPartTks, Data), + ParamsPart = emqx_placeholder:proc_sql_param_str(ParamsPartTks, Data), Values = maps:get(InsertPart, Acc, []), maps:put(InsertPart, [ParamsPart | Values], Acc) end, @@ -260,12 +260,12 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) -> {ok, select} -> parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap); {ok, insert} -> - InsertTks = emqx_plugin_libs_rule:preproc_tmpl(H), + InsertTks = emqx_placeholder:preproc_tmpl(H), H1 = string:trim(H, trailing, ";"), case split_insert_sql(H1) of [_InsertStr, InsertPart, _ValuesStr, ParamsPart] -> - InsertPartTks = emqx_plugin_libs_rule:preproc_tmpl(InsertPart), - ParamsPartTks = emqx_plugin_libs_rule:preproc_tmpl(ParamsPart), + InsertPartTks = emqx_placeholder:preproc_tmpl(InsertPart), + ParamsPartTks = emqx_placeholder:preproc_tmpl(ParamsPart), parse_batch_prepare_sql( T, InsertTksMap#{Key => InsertTks}, diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 55f8bb69e..7644921f0 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -496,7 +496,7 @@ t_simple_sql_query(Config) -> ), case EnableBatch of true -> - ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result); + ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result); false -> ?assertMatch({ok, #{<<"code">> := 0, <<"data">> := [[1]]}}, Result) end, @@ -535,7 +535,7 @@ t_bad_sql_parameter(Config) -> 2_000 ), - ?assertMatch({error, #{<<"code">> := _}}, Result), + ?assertMatch({error, {unrecoverable_error, invalid_request}}, Result), ok. t_nasty_sql_string(Config) -> diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index c44212cc3..149704f76 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -464,8 +464,8 @@ preprocess_request( } = Req ) -> #{ - method => emqx_plugin_libs_rule:preproc_tmpl(to_bin(Method)), - path => emqx_plugin_libs_rule:preproc_tmpl(Path), + method => emqx_placeholder:preproc_tmpl(to_bin(Method)), + path => emqx_placeholder:preproc_tmpl(Path), body => maybe_preproc_tmpl(body, Req), headers => wrap_auth_header(preproc_headers(Headers)), request_timeout => maps:get(request_timeout, Req, ?DEFAULT_REQUEST_TIMEOUT_MS), @@ -477,8 +477,8 @@ preproc_headers(Headers) when is_map(Headers) -> fun(K, V, Acc) -> [ { - emqx_plugin_libs_rule:preproc_tmpl(to_bin(K)), - emqx_plugin_libs_rule:preproc_tmpl(to_bin(V)) + emqx_placeholder:preproc_tmpl(to_bin(K)), + emqx_placeholder:preproc_tmpl(to_bin(V)) } | Acc ] @@ -490,8 +490,8 @@ preproc_headers(Headers) when is_list(Headers) -> lists:map( fun({K, V}) -> { - emqx_plugin_libs_rule:preproc_tmpl(to_bin(K)), - emqx_plugin_libs_rule:preproc_tmpl(to_bin(V)) + emqx_placeholder:preproc_tmpl(to_bin(K)), + emqx_placeholder:preproc_tmpl(to_bin(V)) } end, Headers @@ -530,7 +530,7 @@ try_bin_to_lower(Bin) -> maybe_preproc_tmpl(Key, Conf) -> case maps:get(Key, Conf, undefined) of undefined -> undefined; - Val -> emqx_plugin_libs_rule:preproc_tmpl(Val) + Val -> emqx_placeholder:preproc_tmpl(Val) end. process_request( @@ -544,8 +544,8 @@ process_request( Msg ) -> Conf#{ - method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg)), - path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg), + method => make_method(emqx_placeholder:proc_tmpl(MethodTks, Msg)), + path => emqx_placeholder:proc_tmpl(PathTks, Msg), body => process_request_body(BodyTks, Msg), headers => proc_headers(HeadersTks, Msg), request_timeout => ReqTimeout @@ -554,14 +554,14 @@ process_request( process_request_body(undefined, Msg) -> emqx_utils_json:encode(Msg); process_request_body(BodyTks, Msg) -> - emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg). + emqx_placeholder:proc_tmpl(BodyTks, Msg). proc_headers(HeaderTks, Msg) -> lists:map( fun({K, V}) -> { - emqx_plugin_libs_rule:proc_tmpl(K, Msg), - emqx_plugin_libs_rule:proc_tmpl(emqx_secret:unwrap(V), Msg) + emqx_placeholder:proc_tmpl(K, Msg), + emqx_placeholder:proc_tmpl(emqx_secret:unwrap(V), Msg) } end, HeaderTks diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 25db669ce..a46b21d92 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -344,7 +344,7 @@ parse_prepare_sql(Config) -> parse_prepare_sql(maps:to_list(SQL), #{}, #{}, #{}, #{}). parse_prepare_sql([{Key, H} | _] = L, Prepares, Tokens, BatchInserts, BatchTks) -> - {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H), + {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H), parse_batch_prepare_sql( L, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}, BatchInserts, BatchTks ); @@ -363,7 +363,7 @@ parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks {ok, insert} -> case emqx_plugin_libs_rule:split_insert_sql(H) of {ok, {InsertSQL, Params}} -> - ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(Params), + ParamsTks = emqx_placeholder:preproc_tmpl(Params), parse_prepare_sql( T, Prepares, @@ -389,7 +389,7 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) undefined -> {SQLOrData, Params}; Tokens -> - {TypeOrKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} + {TypeOrKey, emqx_placeholder:proc_sql(Tokens, SQLOrData)} end. on_batch_insert(InstId, BatchReqs, InsertPart, Tokens, State) -> diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index cbe95cb33..8fb60f102 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -188,7 +188,7 @@ on_batch_query( {error, {unrecoverable_error, batch_prepare_not_implemented}}; TokenList -> {_, Datas} = lists:unzip(BatchReq), - Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], + Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas], St = maps:get(BinKey, Sts), case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of {error, _Error} = Result -> @@ -218,7 +218,7 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) undefined -> {SQLOrData, Params}; Tokens -> - {Key, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} + {Key, emqx_placeholder:proc_sql(Tokens, SQLOrData)} end. on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> @@ -350,7 +350,7 @@ parse_prepare_sql(Config) -> parse_prepare_sql(maps:to_list(SQL), #{}, #{}). parse_prepare_sql([{Key, H} | T], Prepares, Tokens) -> - {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '$n'), + {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '$n'), parse_prepare_sql( T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} ); diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index 84334875b..71a891850 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -301,8 +301,8 @@ feedvar(Override, Packet, ConnInfo, ClientInfo) -> }, maps:map( fun(_K, V) -> - Tokens = emqx_plugin_libs_rule:preproc_tmpl(V), - emqx_plugin_libs_rule:proc_tmpl(Tokens, Envs) + Tokens = emqx_placeholder:preproc_tmpl(V), + emqx_placeholder:proc_tmpl(Tokens, Envs) end, Override ). diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index 316432dea..854ccad88 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -283,8 +283,8 @@ feedvar(Override, Packet, ConnInfo, ClientInfo) -> }, maps:map( fun(_K, V) -> - Tokens = emqx_plugin_libs_rule:preproc_tmpl(V), - emqx_plugin_libs_rule:proc_tmpl(Tokens, Envs) + Tokens = emqx_placeholder:preproc_tmpl(V), + emqx_placeholder:proc_tmpl(Tokens, Envs) end, Override ). diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 0f543badd..5d80c0d56 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -168,7 +168,7 @@ on_batch_query( {error, {unrecoverable_error, batch_prepare_not_implemented}}; TokenList -> {_, Datas} = lists:unzip(BatchReq), - Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], + Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas], St = maps:get(BinKey, Sts), case on_sql_query(InstId, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2) @@ -204,7 +204,7 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{ undefined -> {SQLOrData, Params}; Sql -> - {Sql, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} + {Sql, emqx_placeholder:proc_sql(Tokens, SQLOrData)} end end. @@ -305,7 +305,7 @@ parse_prepare_sql(Config) -> parse_prepare_sql(maps:to_list(SQL), #{}, #{}). parse_prepare_sql([{Key, H} | T], Prepares, Tokens) -> - {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, ':n'), + {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, ':n'), parse_prepare_sql( T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} ); diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl index 3bfac1ec4..b3e3507f0 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl @@ -19,17 +19,6 @@ %% preprocess and process template string with place holders -export([ - preproc_tmpl/1, - proc_tmpl/2, - proc_tmpl/3, - preproc_cmd/1, - proc_cmd/2, - proc_cmd/3, - preproc_sql/1, - preproc_sql/2, - proc_sql/2, - proc_sql_param_str/2, - proc_cql_param_str/2, split_insert_sql/1, detect_sql_type/1, proc_batch_sql/3, @@ -61,68 +50,15 @@ ]). -export([ - now_ms/0, can_topic_match_oneof/2 ]). --export_type([tmpl_token/0]). - -compile({no_auto_import, [float/1]}). -type uri_string() :: iodata(). -type tmpl_token() :: list({var, binary()} | {str, binary()}). --type tmpl_cmd() :: list(tmpl_token()). - --type prepare_statement_key() :: binary(). - -%% preprocess template string with place holders --spec preproc_tmpl(binary()) -> tmpl_token(). -preproc_tmpl(Str) -> - emqx_placeholder:preproc_tmpl(Str). - --spec proc_tmpl(tmpl_token(), map()) -> binary(). -proc_tmpl(Tokens, Data) -> - emqx_placeholder:proc_tmpl(Tokens, Data). - --spec proc_tmpl(tmpl_token(), map(), map()) -> binary() | list(). -proc_tmpl(Tokens, Data, Opts) -> - emqx_placeholder:proc_tmpl(Tokens, Data, Opts). - --spec preproc_cmd(binary()) -> tmpl_cmd(). -preproc_cmd(Str) -> - emqx_placeholder:preproc_cmd(Str). - --spec proc_cmd([tmpl_token()], map()) -> binary() | list(). -proc_cmd(Tokens, Data) -> - emqx_placeholder:proc_cmd(Tokens, Data). --spec proc_cmd([tmpl_token()], map(), map()) -> list(). -proc_cmd(Tokens, Data, Opts) -> - emqx_placeholder:proc_cmd(Tokens, Data, Opts). - -%% preprocess SQL with place holders --spec preproc_sql(Sql :: binary()) -> {prepare_statement_key(), tmpl_token()}. -preproc_sql(Sql) -> - emqx_placeholder:preproc_sql(Sql). - --spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n' | ':n') -> - {prepare_statement_key(), tmpl_token()}. -preproc_sql(Sql, ReplaceWith) -> - emqx_placeholder:preproc_sql(Sql, ReplaceWith). - --spec proc_sql(tmpl_token(), map()) -> list(). -proc_sql(Tokens, Data) -> - emqx_placeholder:proc_sql(Tokens, Data). - --spec proc_sql_param_str(tmpl_token(), map()) -> binary(). -proc_sql_param_str(Tokens, Data) -> - emqx_placeholder:proc_sql_param_str(Tokens, Data). - --spec proc_cql_param_str(tmpl_token(), map()) -> binary(). -proc_cql_param_str(Tokens, Data) -> - emqx_placeholder:proc_cql_param_str(Tokens, Data). - %% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">> -spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when InsertSQL :: binary(), @@ -169,7 +105,7 @@ detect_sql_type(SQL) -> proc_batch_sql(BatchReqs, InsertPart, Tokens) -> ValuesPart = erlang:iolist_to_binary( lists:join($,, [ - proc_sql_param_str(Tokens, Msg) + emqx_placeholder:proc_sql_param_str(Tokens, Msg) || {_, Msg} <- BatchReqs ]) ), @@ -353,9 +289,6 @@ number_to_list(Int) when is_integer(Int) -> number_to_list(Float) when is_float(Float) -> float_to_list(Float, [{decimals, 10}, compact]). -now_ms() -> - erlang:system_time(millisecond). - can_topic_match_oneof(Topic, Filters) -> lists:any( fun(Fltr) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index b562af09d..a5ad35066 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -65,10 +65,10 @@ pre_process_action_args( ) -> Args#{ preprocessed_tmpl => #{ - topic => emqx_plugin_libs_rule:preproc_tmpl(Topic), + topic => emqx_placeholder:preproc_tmpl(Topic), qos => preproc_vars(QoS), retain => preproc_vars(Retain), - payload => emqx_plugin_libs_rule:preproc_tmpl(Payload), + payload => emqx_placeholder:preproc_tmpl(Payload), user_properties => preproc_user_properties(UserProperties) } }; @@ -110,7 +110,7 @@ republish( } } ) -> - Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected), + Topic = emqx_placeholder:proc_tmpl(TopicTks, Selected), Payload = format_msg(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected, 0), Retain = replace_simple_var(RetainTks, Selected, false), @@ -189,7 +189,7 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) -> emqx_metrics:inc_msg(Msg). preproc_vars(Data) when is_binary(Data) -> - emqx_plugin_libs_rule:preproc_tmpl(Data); + emqx_placeholder:preproc_tmpl(Data); preproc_vars(Data) -> Data. @@ -201,13 +201,13 @@ preproc_user_properties(<<"${pub_props.'User-Property'}">>) -> ?ORIGINAL_USER_PROPERTIES; preproc_user_properties(<<"${", _/binary>> = V) -> %% use a variable - emqx_plugin_libs_rule:preproc_tmpl(V); + emqx_placeholder:preproc_tmpl(V); preproc_user_properties(_) -> %% invalid, discard undefined. replace_simple_var(Tokens, Data, Default) when is_list(Tokens) -> - [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), + [Var] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}), case Var of %% cannot find the variable from Data undefined -> Default; @@ -219,7 +219,7 @@ replace_simple_var(Val, _Data, _Default) -> format_msg([], Selected) -> emqx_utils_json:encode(Selected); format_msg(Tokens, Selected) -> - emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected). + emqx_placeholder:proc_tmpl(Tokens, Selected). format_pub_props(UserPropertiesTks, Selected, Env) -> UserProperties = diff --git a/apps/emqx_plugin_libs/src/emqx_placeholder.erl b/apps/emqx_utils/src/emqx_placeholder.erl similarity index 92% rename from apps/emqx_plugin_libs/src/emqx_placeholder.erl rename to apps/emqx_utils/src/emqx_placeholder.erl index dcd666f5b..babac8a84 100644 --- a/apps/emqx_plugin_libs/src/emqx_placeholder.erl +++ b/apps/emqx_utils/src/emqx_placeholder.erl @@ -46,7 +46,7 @@ quote_mysql/1 ]). --include_lib("emqx/include/emqx_placeholder.hrl"). +-define(PH_VAR_THIS, '$this'). -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})"). @@ -55,7 +55,7 @@ %% Space and CRLF -define(EX_WITHE_CHARS, "\\s"). --type tmpl_token() :: list({var, binary()} | {str, binary()}). +-type tmpl_token() :: list({var, ?PH_VAR_THIS | [binary()]} | {str, binary()}). -type tmpl_cmd() :: list(tmpl_token()). @@ -123,11 +123,11 @@ proc_tmpl(Tokens, Data, Opts = #{return := rawlist}) -> ({str, Str}) -> Str; ({var, Phld}) when is_function(Trans, 1) -> - Trans(get_phld_var(Phld, Data)); + Trans(lookup_var(Phld, Data)); ({var, Phld}) when is_function(Trans, 2) -> - Trans(Phld, get_phld_var(Phld, Data)); + Trans(Phld, lookup_var(Phld, Data)); ({var, Phld}) -> - get_phld_var(Phld, Data) + lookup_var(Phld, Data) end, Tokens ). @@ -264,13 +264,35 @@ quote_mysql(Str) when is_binary(Str) -> quote_mysql(Str) -> quote_escape(Str, fun escape_mysql/1). +lookup_var(Var, Value) when Var == ?PH_VAR_THIS orelse Var == [] -> + Value; +lookup_var([Prop | Rest], Data) -> + case lookup(Prop, Data) of + {ok, Value} -> + lookup_var(Rest, Value); + {error, _} -> + undefined + end. + +lookup(Prop, Data) when is_binary(Prop) -> + case maps:get(Prop, Data, undefined) of + undefined -> + try + {ok, maps:get(binary_to_existing_atom(Prop, utf8), Data)} + catch + error:{badkey, _} -> + {error, undefined}; + error:badarg -> + {error, undefined} + end; + Value -> + {ok, Value} + end. + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ -get_phld_var(Phld, Data) -> - emqx_rule_maps:nested_get(Phld, Data). - preproc_var_re(#{placeholders := PHs, strip_double_quote := true}) -> Res = [ph_to_re(PH) || PH <- PHs], QuoteRes = ["\"" ++ Re ++ "\"" || Re <- Res], @@ -340,9 +362,8 @@ parse_nested(<<".", R/binary>>) -> parse_nested(R); parse_nested(Attr) -> case string:split(Attr, <<".">>, all) of - [<<>>] -> {var, ?PH_VAR_THIS}; - [Attr] -> {var, Attr}; - Nested -> {path, [{key, P} || P <- Nested]} + [<<>>] -> ?PH_VAR_THIS; + Nested -> Nested end. unwrap(<<"\"${", Val/binary>>, _StripDoubleQuote = true) -> diff --git a/apps/emqx_plugin_libs/test/emqx_placeholder_SUITE.erl b/apps/emqx_utils/test/emqx_placeholder_SUITE.erl similarity index 100% rename from apps/emqx_plugin_libs/test/emqx_placeholder_SUITE.erl rename to apps/emqx_utils/test/emqx_placeholder_SUITE.erl diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl index 0ae9bb2dc..70eca83d7 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl @@ -225,9 +225,9 @@ start_producer( msg => "hstreamdb connector: producer started" }), EnableBatch = maps:get(enable_batch, Options, false), - Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin), + Payload = emqx_placeholder:preproc_tmpl(PayloadBin), OrderingKeyBin = maps:get(ordering_key, Options, <<"">>), - OrderingKey = emqx_plugin_libs_rule:preproc_tmpl(OrderingKeyBin), + OrderingKey = emqx_placeholder:preproc_tmpl(OrderingKeyBin), State = #{ client => Client, producer => Producer, @@ -254,8 +254,8 @@ start_producer( end. to_record(OrderingKeyTmpl, PayloadTmpl, Data) -> - OrderingKey = emqx_plugin_libs_rule:proc_tmpl(OrderingKeyTmpl, Data), - Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTmpl, Data), + OrderingKey = emqx_placeholder:proc_tmpl(OrderingKeyTmpl, Data), + Payload = emqx_placeholder:proc_tmpl(PayloadTmpl, Data), to_record(OrderingKey, Payload). to_record(OrderingKey, Payload) when is_binary(OrderingKey) -> diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl index 59f763904..20985e961 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl @@ -57,7 +57,7 @@ on_query(InstanceId, {send_message, Message0}, State) -> connector_state := ConnectorState } = State, NewConnectorState = ConnectorState#{ - collection => emqx_plugin_libs_rule:proc_tmpl(CollectionTemplate, Message0) + collection => emqx_placeholder:proc_tmpl(CollectionTemplate, Message0) }, Message = render_message(PayloadTemplate, Message0), Res = emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, NewConnectorState), @@ -76,7 +76,7 @@ on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) -> preprocess_template(undefined = _PayloadTemplate) -> undefined; preprocess_template(PayloadTemplate) -> - emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate). + emqx_placeholder:preproc_tmpl(PayloadTemplate). render_message(undefined = _PayloadTemplate, Message) -> Message; @@ -102,14 +102,14 @@ format_data(PayloadTks, Msg) -> case maps:size(PreparedTupleMap) of % If no tuples were found simply proceed with the json decoding and be done with it 0 -> - emqx_utils_json:decode(emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Msg), [return_maps]); + emqx_utils_json:decode(emqx_placeholder:proc_tmpl(PayloadTks, Msg), [return_maps]); _ -> % If tuples were found, replace the tuple values with the references created, run % the modified message through the json parser, and then at the end replace the % references with the actual tuple values. ProcessedMessage = replace_message_values_with_references(Msg, PreparedTupleMap), DecodedMap = emqx_utils_json:decode( - emqx_plugin_libs_rule:proc_tmpl(PayloadTks, ProcessedMessage), [return_maps] + emqx_placeholder:proc_tmpl(PayloadTks, ProcessedMessage), [return_maps] ), populate_map_with_tuple_values(PreparedTupleMap, DecodedMap) end. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_redis.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_redis.erl index 4ce96d5c7..9caba2beb 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_redis.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_redis.erl @@ -126,13 +126,13 @@ process_batch_data(BatchData, CommandTemplate) -> proc_command_template(CommandTemplate, Msg) -> lists:map( fun(ArgTks) -> - emqx_plugin_libs_rule:proc_tmpl(ArgTks, Msg, #{return => full_binary}) + emqx_placeholder:proc_tmpl(ArgTks, Msg, #{return => full_binary}) end, CommandTemplate ). preproc_command_template(CommandTemplate) -> lists:map( - fun emqx_plugin_libs_rule:preproc_tmpl/1, + fun emqx_placeholder:preproc_tmpl/1, CommandTemplate ). From 7d0abb6146f976be25437179593d1e74dc880656 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 8 Jun 2023 08:56:24 +0300 Subject: [PATCH 2/7] feat(emqx): add `emqx_topic:match_any/2` utility --- apps/emqx/src/emqx_topic.erl | 7 +++++++ apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl | 12 ------------ apps/emqx_rule_engine/src/emqx_rule_engine.erl | 2 +- apps/emqx_rule_engine/src/emqx_rule_sqltester.erl | 3 +-- 4 files changed, 9 insertions(+), 15 deletions(-) diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index 62dca99c7..c1515e14b 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -21,6 +21,7 @@ %% APIs -export([ match/2, + match_any/2, validate/1, validate/2, levels/1, @@ -86,6 +87,12 @@ match([_H1 | _], []) -> match([], [_H | _T2]) -> false. +-spec match_any(Name, [Filter]) -> boolean() when + Name :: topic() | words(), + Filter :: topic() | words(). +match_any(Topic, Filters) -> + lists:any(fun(Filter) -> match(Topic, Filter) end, Filters). + %% @doc Validate topic name or filter -spec validate(topic() | {name | filter, topic()}) -> true. validate(Topic) when is_binary(Topic) -> diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl index b3e3507f0..fd792310b 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl @@ -49,10 +49,6 @@ tcp_connectivity/3 ]). --export([ - can_topic_match_oneof/2 -]). - -compile({no_auto_import, [float/1]}). -type uri_string() :: iodata(). @@ -288,11 +284,3 @@ number_to_list(Int) when is_integer(Int) -> integer_to_list(Int); number_to_list(Float) when is_float(Float) -> float_to_list(Float, [{decimals, 10}, compact]). - -can_topic_match_oneof(Topic, Filters) -> - lists:any( - fun(Fltr) -> - emqx_topic:match(Topic, Fltr) - end, - Filters - ). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 24ad2c5f0..ff6636b9a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -206,7 +206,7 @@ get_rules_for_topic(Topic) -> [ Rule || Rule = #{from := From} <- get_rules(), - emqx_plugin_libs_rule:can_topic_match_oneof(Topic, From) + emqx_topic:match_any(Topic, From) ]. -spec get_rules_with_same_event(Topic :: binary()) -> [rule()]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 455efe389..f3b4e2790 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -14,7 +14,6 @@ -module(emqx_rule_sqltester). --include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). -export([ @@ -31,7 +30,7 @@ test(#{sql := Sql, context := Context}) -> case lists:all(fun is_publish_topic/1, EventTopics) of true -> %% test if the topic matches the topic filters in the rule - case emqx_plugin_libs_rule:can_topic_match_oneof(InTopic, EventTopics) of + case emqx_topic:match_any(InTopic, EventTopics) of true -> test_rule(Sql, Select, Context, EventTopics); false -> {error, nomatch} end; From a51baaa206bbbb084a7b9568e1f391fc4722762e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 8 Jun 2023 14:42:20 +0300 Subject: [PATCH 3/7] refactor(pluglib): move conversion utils to `emqx_utils_conv` --- .../src/emqx_bridge_influxdb_connector.erl | 4 +- .../src/emqx_bridge_kafka_impl_consumer.erl | 2 +- .../src/emqx_bridge_kafka_impl_producer.erl | 2 +- .../src/emqx_bridge_pulsar_impl_producer.erl | 2 +- .../src/emqx_bridge_sqlserver_connector.erl | 2 +- apps/emqx_oracle/src/emqx_oracle.erl | 10 +- .../src/emqx_plugin_libs_rule.erl | 41 +----- .../test/emqx_plugin_libs_rule_SUITE.erl | 23 +--- apps/emqx_rule_engine/src/emqx_rule_funcs.erl | 40 +++--- apps/emqx_rule_engine/src/emqx_rule_maps.erl | 2 +- .../test/emqx_rule_funcs_SUITE.erl | 6 +- apps/emqx_utils/src/emqx_placeholder.erl | 4 +- apps/emqx_utils/src/emqx_utils_conv.erl | 125 ++++++++++++++++++ .../emqx_utils/test/emqx_utils_conv_tests.erl | 44 ++++++ 14 files changed, 215 insertions(+), 92 deletions(-) create mode 100644 apps/emqx_utils/src/emqx_utils_conv.erl create mode 100644 apps/emqx_utils/test/emqx_utils_conv_tests.erl diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 71ab85c58..adf61918a 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -637,7 +637,7 @@ value_type(Val) -> Val. key_filter(undefined) -> undefined; -key_filter(Value) -> emqx_plugin_libs_rule:bin(Value). +key_filter(Value) -> emqx_utils_conv:bin(Value). data_filter(undefined) -> undefined; data_filter(Int) when is_integer(Int) -> Int; @@ -645,7 +645,7 @@ data_filter(Number) when is_number(Number) -> Number; data_filter(Bool) when is_boolean(Bool) -> Bool; data_filter(Data) -> bin(Data). -bin(Data) -> emqx_plugin_libs_rule:bin(Data). +bin(Data) -> emqx_utils_conv:bin(Data). %% helper funcs log_error_points(InstId, Errs) -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index 9d03d53f9..a95e67b8d 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -556,7 +556,7 @@ render(FullMessage, PayloadTemplate) -> (undefined) -> <<>>; (X) -> - emqx_plugin_libs_rule:bin(X) + emqx_utils_conv:bin(X) end }, emqx_placeholder:proc_tmpl(PayloadTemplate, FullMessage, Opts). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index cb8427cc5..2d48c788b 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -255,7 +255,7 @@ render(Template, Message) -> Opts = #{ var_trans => fun (undefined) -> <<"">>; - (X) -> emqx_plugin_libs_rule:bin(X) + (X) -> emqx_utils_conv:bin(X) end, return => full_binary }, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index 0b5885c16..133b98710 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -435,7 +435,7 @@ render(Message, Template) -> Opts = #{ var_trans => fun (undefined) -> <<"">>; - (X) -> emqx_plugin_libs_rule:bin(X) + (X) -> emqx_utils_conv:bin(X) end, return => full_binary }, diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index edd8770f5..cb0c0e16e 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -44,7 +44,7 @@ %% Internal exports used to execute code with ecpool worker -export([do_get_status/1, worker_do_insert/3]). --import(emqx_plugin_libs_rule, [str/1]). +-import(emqx_utils_conv, [str/1]). -import(hoconsc, [mk/2, enum/1, ref/2]). -define(ACTION_SEND_MESSAGE, send_message). diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 5d80c0d56..d6fd51847 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -93,14 +93,14 @@ on_start( ServiceName = case maps:get(service_name, Config, undefined) of undefined -> undefined; - ServiceName0 -> emqx_plugin_libs_rule:str(ServiceName0) + ServiceName0 -> emqx_utils_conv:str(ServiceName0) end, Options = [ {host, Host}, {port, Port}, - {user, emqx_plugin_libs_rule:str(User)}, + {user, emqx_utils_conv:str(User)}, {password, jamdb_secret:wrap(maps:get(password, Config, ""))}, - {sid, emqx_plugin_libs_rule:str(Sid)}, + {sid, emqx_utils_conv:str(Sid)}, {service_name, ServiceName}, {pool_size, maps:get(<<"pool_size">>, Config, ?DEFAULT_POOL_SIZE)}, {timeout, ?OPT_TIMEOUT}, @@ -268,14 +268,14 @@ connect(Opts) -> jamdb_oracle:start_link(Opts). sql_query_to_str(SqlQuery) -> - emqx_plugin_libs_rule:str(SqlQuery). + emqx_utils_conv:str(SqlQuery). sql_params_to_str(Params) when is_list(Params) -> lists:map( fun (false) -> "0"; (true) -> "1"; - (Value) -> emqx_plugin_libs_rule:str(Value) + (Value) -> emqx_utils_conv:str(Value) end, Params ). diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl index fd792310b..aa9e38316 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl @@ -27,8 +27,6 @@ %% type converting -export([ - str/1, - bin/1, bool/1, int/1, float/1, @@ -36,7 +34,6 @@ map/1, utf8_bin/1, utf8_str/1, - number_to_binary/1, atom_key/1, unsafe_atom_key/1 ]). @@ -172,39 +169,15 @@ tcp_connectivity(Host, Port, Timeout) -> {error, Reason} end. -str(Bin) when is_binary(Bin) -> binary_to_list(Bin); -str(Num) when is_number(Num) -> number_to_list(Num); -str(Atom) when is_atom(Atom) -> atom_to_list(Atom); -str(Map) when is_map(Map) -> binary_to_list(emqx_utils_json:encode(Map)); -str(List) when is_list(List) -> - case io_lib:printable_list(List) of - true -> List; - false -> binary_to_list(emqx_utils_json:encode(List)) - end; -str(Data) -> - error({invalid_str, Data}). - utf8_bin(Str) when is_binary(Str); is_list(Str) -> unicode:characters_to_binary(Str); utf8_bin(Str) -> - unicode:characters_to_binary(bin(Str)). + unicode:characters_to_binary(emqx_utils_conv:bin(Str)). utf8_str(Str) when is_binary(Str); is_list(Str) -> unicode:characters_to_list(Str); utf8_str(Str) -> - unicode:characters_to_list(str(Str)). - -bin(Bin) when is_binary(Bin) -> Bin; -bin(Num) when is_number(Num) -> number_to_binary(Num); -bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); -bin(Map) when is_map(Map) -> emqx_utils_json:encode(Map); -bin(List) when is_list(List) -> - case io_lib:printable_list(List) of - true -> list_to_binary(List); - false -> emqx_utils_json:encode(List) - end; -bin(Data) -> - error({invalid_bin, Data}). + unicode:characters_to_list(emqx_utils_conv:str(Str)). int(List) when is_list(List) -> try @@ -274,13 +247,3 @@ bool(Bool) when false; bool(Bool) -> error({invalid_boolean, Bool}). - -number_to_binary(Int) when is_integer(Int) -> - integer_to_binary(Int); -number_to_binary(Float) when is_float(Float) -> - float_to_binary(Float, [{decimals, 10}, compact]). - -number_to_list(Int) when is_integer(Int) -> - integer_to_list(Int); -number_to_list(Float) when is_float(Float) -> - float_to_list(Float, [{decimals, 10}, compact]). diff --git a/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl index bf509f362..bb1ba838b 100644 --- a/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl @@ -28,11 +28,11 @@ all() -> emqx_common_test_helpers:all(?MODULE). t_http_connectivity(_) -> {ok, Socket} = gen_tcp:listen(?PORT, []), ok = emqx_plugin_libs_rule:http_connectivity( - "http://127.0.0.1:" ++ emqx_plugin_libs_rule:str(?PORT), 1000 + "http://127.0.0.1:" ++ integer_to_list(?PORT), 1000 ), gen_tcp:close(Socket), {error, _} = emqx_plugin_libs_rule:http_connectivity( - "http://127.0.0.1:" ++ emqx_plugin_libs_rule:str(?PORT), 1000 + "http://127.0.0.1:" ++ integer_to_list(?PORT), 1000 ). t_tcp_connectivity(_) -> @@ -41,25 +41,6 @@ t_tcp_connectivity(_) -> gen_tcp:close(Socket), {error, _} = emqx_plugin_libs_rule:tcp_connectivity("127.0.0.1", ?PORT, 1000). -t_str(_) -> - ?assertEqual("abc", emqx_plugin_libs_rule:str("abc")), - ?assertEqual("abc", emqx_plugin_libs_rule:str(abc)), - ?assertEqual("{\"a\":1}", emqx_plugin_libs_rule:str(#{a => 1})), - ?assertEqual("1", emqx_plugin_libs_rule:str(1)), - ?assertEqual("2.0", emqx_plugin_libs_rule:str(2.0)), - ?assertEqual("true", emqx_plugin_libs_rule:str(true)), - ?assertError(_, emqx_plugin_libs_rule:str({a, v})). - -t_bin(_) -> - ?assertEqual(<<"abc">>, emqx_plugin_libs_rule:bin("abc")), - ?assertEqual(<<"abc">>, emqx_plugin_libs_rule:bin(abc)), - ?assertEqual(<<"{\"a\":1}">>, emqx_plugin_libs_rule:bin(#{a => 1})), - ?assertEqual(<<"[{\"a\":1}]">>, emqx_plugin_libs_rule:bin([#{a => 1}])), - ?assertEqual(<<"1">>, emqx_plugin_libs_rule:bin(1)), - ?assertEqual(<<"2.0">>, emqx_plugin_libs_rule:bin(2.0)), - ?assertEqual(<<"true">>, emqx_plugin_libs_rule:bin(true)), - ?assertError(_, emqx_plugin_libs_rule:bin({a, v})). - t_atom_key(_) -> _ = erlang, _ = port, diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index a94aa0c8a..de9bf0485 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -17,7 +17,6 @@ -module(emqx_rule_funcs). -include("rule_engine.hrl"). --include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -elvis([{elvis_style, god_modules, disable}]). @@ -266,8 +265,6 @@ ]} ). --define(is_var(X), is_binary(X)). - %% @doc "msgid()" Func msgid() -> fun @@ -631,29 +628,42 @@ do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"signed">>, <<"little">>) -> %%------------------------------------------------------------------------------ str(Data) -> - emqx_plugin_libs_rule:bin(Data). + emqx_utils_conv:bin(Data). +str_utf8(Data) when is_binary(Data); is_list(Data) -> + unicode:characters_to_binary(Data); str_utf8(Data) -> - emqx_plugin_libs_rule:utf8_bin(Data). + unicode:characters_to_binary(str(Data)). bool(Data) -> - emqx_plugin_libs_rule:bool(Data). + emqx_utils_conv:bool(Data). int(Data) -> - emqx_plugin_libs_rule:int(Data). + emqx_utils_conv:int(Data). float(Data) -> - emqx_plugin_libs_rule:float(Data). + emqx_utils_conv:float(Data). float(Data, Decimals) when Decimals > 0 -> - Data1 = ?MODULE:float(Data), + Data1 = emqx_utils_conv:float(Data), list_to_float(float_to_list(Data1, [{decimals, Decimals}])). float2str(Float, Precision) -> - emqx_plugin_libs_rule:float2str(Float, Precision). + float_to_binary(Float, [{decimals, Precision}, compact]). +map(Bin) when is_binary(Bin) -> + case emqx_utils_json:decode(Bin) of + Map = #{} -> + Map; + _ -> + error(badarg, [Bin]) + end; +map(List) when is_list(List) -> + maps:from_list(List); +map(Map = #{}) -> + Map; map(Data) -> - emqx_plugin_libs_rule:map(Data). + error(badarg, [Data]). bin2hexstr(Bin) when is_binary(Bin) -> emqx_utils:bin_to_hexstr(Bin, upper). @@ -895,7 +905,7 @@ mget(Key, Map, Default) -> Val; error when is_atom(Key) -> %% the map may have an equivalent binary-form key - BinKey = emqx_plugin_libs_rule:bin(Key), + BinKey = emqx_utils_conv:bin(Key), case maps:find(BinKey, Map) of {ok, Val} -> Val; error -> Default @@ -922,7 +932,7 @@ mput(Key, Val, Map) -> maps:put(Key, Val, Map); error when is_atom(Key) -> %% the map may have an equivalent binary-form key - BinKey = emqx_plugin_libs_rule:bin(Key), + BinKey = emqx_utils_conv:bin(Key), case maps:find(BinKey, Map) of {ok, _} -> maps:put(BinKey, Val, Map); error -> maps:put(Key, Val, Map) @@ -1053,7 +1063,7 @@ unix_ts_to_rfc3339(Epoch) -> unix_ts_to_rfc3339(Epoch, <<"second">>). unix_ts_to_rfc3339(Epoch, Unit) when is_integer(Epoch) -> - emqx_plugin_libs_rule:bin( + emqx_utils_conv:bin( calendar:system_time_to_rfc3339( Epoch, [{unit, time_unit(Unit)}] ) @@ -1090,7 +1100,7 @@ format_date(TimeUnit, Offset, FormatString) -> format_date(TimeUnit, Offset, FormatString, TimeEpoch) -> Unit = time_unit(TimeUnit), - emqx_plugin_libs_rule:bin( + emqx_utils_conv:bin( lists:concat( emqx_calendar:format(TimeEpoch, Unit, Offset, FormatString) ) diff --git a/apps/emqx_rule_engine/src/emqx_rule_maps.erl b/apps/emqx_rule_engine/src/emqx_rule_maps.erl index 3dfffca46..baf83ff6f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_maps.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_maps.erl @@ -97,7 +97,7 @@ general_find({key, Key}, Map, _OrgData, Handler) when is_map(Map) -> Handler({found, {{key, Key}, Val}}); error when is_atom(Key) -> %% the map may have an equivalent binary-form key - BinKey = emqx_plugin_libs_rule:bin(Key), + BinKey = atom_to_binary(Key), case maps:find(BinKey, Map) of {ok, Val} -> Handler({equivalent, {{key, BinKey}, Val}}); error -> Handler(not_found) diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index 12110b206..6e67f6cd1 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -126,7 +126,7 @@ t_int(_) -> ?assertEqual(1, emqx_rule_funcs:int(1.0001)), ?assertEqual(1, emqx_rule_funcs:int(true)), ?assertEqual(0, emqx_rule_funcs:int(false)), - ?assertError({invalid_number, {a, v}}, emqx_rule_funcs:int({a, v})), + ?assertError(badarg, emqx_rule_funcs:int({a, v})), ?assertError(_, emqx_rule_funcs:int("a")). t_float(_) -> @@ -137,7 +137,7 @@ t_float(_) -> ?assertEqual(1.9, emqx_rule_funcs:float(1.9)), ?assertEqual(1.0001, emqx_rule_funcs:float(1.0001)), ?assertEqual(1.0000000001, emqx_rule_funcs:float(1.0000000001)), - ?assertError({invalid_number, {a, v}}, emqx_rule_funcs:float({a, v})), + ?assertError(badarg, emqx_rule_funcs:float({a, v})), ?assertError(_, emqx_rule_funcs:float("a")). t_map(_) -> @@ -158,7 +158,7 @@ t_bool(_) -> ?assertEqual(true, emqx_rule_funcs:bool(<<"true">>)), ?assertEqual(false, emqx_rule_funcs:bool(false)), ?assertEqual(false, emqx_rule_funcs:bool(<<"false">>)), - ?assertError({invalid_boolean, _}, emqx_rule_funcs:bool(3)). + ?assertError(badarg, emqx_rule_funcs:bool(3)). t_proc_dict_put_get_del(_) -> ?assertEqual(undefined, emqx_rule_funcs:proc_dict_get(<<"abc">>)), diff --git a/apps/emqx_utils/src/emqx_placeholder.erl b/apps/emqx_utils/src/emqx_placeholder.erl index babac8a84..861548f93 100644 --- a/apps/emqx_utils/src/emqx_placeholder.erl +++ b/apps/emqx_utils/src/emqx_placeholder.erl @@ -112,7 +112,7 @@ proc_tmpl(Tokens, Data) -> -spec proc_tmpl(tmpl_token(), map(), proc_tmpl_opts()) -> binary() | list(). proc_tmpl(Tokens, Data, Opts = #{return := full_binary}) -> - Trans = maps:get(var_trans, Opts, fun emqx_plugin_libs_rule:bin/1), + Trans = maps:get(var_trans, Opts, fun emqx_utils_conv:bin/1), list_to_binary( proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Trans}) ); @@ -243,7 +243,7 @@ sql_data(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); sql_data(Map) when is_map(Map) -> emqx_utils_json:encode(Map). -spec bin(term()) -> binary(). -bin(Val) -> emqx_plugin_libs_rule:bin(Val). +bin(Val) -> emqx_utils_conv:bin(Val). -spec quote_sql(_Value) -> iolist(). quote_sql(Str) -> diff --git a/apps/emqx_utils/src/emqx_utils_conv.erl b/apps/emqx_utils/src/emqx_utils_conv.erl new file mode 100644 index 000000000..9879d3a9d --- /dev/null +++ b/apps/emqx_utils/src/emqx_utils_conv.erl @@ -0,0 +1,125 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_utils_conv). + +-export([bin/1]). +-export([str/1]). +-export([bool/1]). +-export([int/1]). +-export([float/1]). + +-compile({no_auto_import, [float/1]}). + +-type scalar() :: binary() | number() | atom() | string(). + +-spec bin(Term) -> binary() when + Term :: scalar() | #{scalar() => Term} | [Term]. +bin(Bin) when is_binary(Bin) -> Bin; +bin(Num) when is_number(Num) -> number_to_binary(Num); +bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); +bin(Map) when is_map(Map) -> emqx_utils_json:encode(Map); +bin(List) when is_list(List) -> + case io_lib:printable_list(List) of + true -> list_to_binary(List); + false -> emqx_utils_json:encode(List) + end; +bin(Data) -> + error({invalid_bin, Data}). + +-spec str(Term) -> string() when + Term :: scalar() | #{scalar() => Term} | [Term]. +str(Bin) when is_binary(Bin) -> binary_to_list(Bin); +str(Num) when is_number(Num) -> number_to_list(Num); +str(Atom) when is_atom(Atom) -> atom_to_list(Atom); +str(Map) when is_map(Map) -> binary_to_list(emqx_utils_json:encode(Map)); +str(List) when is_list(List) -> + case io_lib:printable_list(List) of + true -> List; + false -> binary_to_list(emqx_utils_json:encode(List)) + end; +str(Data) -> + error({invalid_str, Data}). + +-spec number_to_binary(number()) -> binary(). +number_to_binary(Int) when is_integer(Int) -> + integer_to_binary(Int); +number_to_binary(Float) when is_float(Float) -> + float_to_binary(Float, [{decimals, 10}, compact]). + +-spec number_to_list(number()) -> string(). +number_to_list(Int) when is_integer(Int) -> + integer_to_list(Int); +number_to_list(Float) when is_float(Float) -> + float_to_list(Float, [{decimals, 10}, compact]). + +-spec bool(Term) -> boolean() when + Term :: boolean() | binary() | 0..1. +bool(true) -> true; +bool(<<"true">>) -> true; +bool(N) when N == 1 -> true; +bool(false) -> false; +bool(<<"false">>) -> false; +bool(N) when N == 0 -> false; +bool(Data) -> error(badarg, [Data]). + +-spec int(Term) -> integer() when + Term :: binary() | string() | number() | boolean(). +int(List) when is_list(List) -> + try + list_to_integer(List) + catch + error:badarg -> + int(list_to_float(List)) + end; +int(Bin) when is_binary(Bin) -> + try + binary_to_integer(Bin) + catch + error:badarg -> + int(binary_to_float(Bin)) + end; +int(Int) when is_integer(Int) -> + Int; +int(Float) when is_float(Float) -> + erlang:floor(Float); +int(true) -> + 1; +int(false) -> + 0; +int(Data) -> + error(badarg, [Data]). + +-spec float(Term) -> float() when + Term :: binary() | string() | number(). +float(List) when is_list(List) -> + try + list_to_float(List) + catch + error:badarg -> + float(list_to_integer(List)) + end; +float(Bin) when is_binary(Bin) -> + try + binary_to_float(Bin) + catch + error:badarg -> + float(binary_to_integer(Bin)) + end; +float(Num) when is_number(Num) -> + erlang:float(Num); +float(Data) -> + error(badarg, [Data]). diff --git a/apps/emqx_utils/test/emqx_utils_conv_tests.erl b/apps/emqx_utils/test/emqx_utils_conv_tests.erl new file mode 100644 index 000000000..b60467264 --- /dev/null +++ b/apps/emqx_utils/test/emqx_utils_conv_tests.erl @@ -0,0 +1,44 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_utils_conv_tests). + +-import(emqx_utils_conv, [bin/1, str/1]). + +-include_lib("eunit/include/eunit.hrl"). + +bin_test_() -> + [ + ?_assertEqual(<<"abc">>, bin("abc")), + ?_assertEqual(<<"abc">>, bin(abc)), + ?_assertEqual(<<"{\"a\":1}">>, bin(#{a => 1})), + ?_assertEqual(<<"[{\"a\":1}]">>, bin([#{a => 1}])), + ?_assertEqual(<<"1">>, bin(1)), + ?_assertEqual(<<"2.0">>, bin(2.0)), + ?_assertEqual(<<"true">>, bin(true)), + ?_assertError(_, bin({a, v})) + ]. + +str_test_() -> + [ + ?_assertEqual("abc", str("abc")), + ?_assertEqual("abc", str(abc)), + ?_assertEqual("{\"a\":1}", str(#{a => 1})), + ?_assertEqual("1", str(1)), + ?_assertEqual("2.0", str(2.0)), + ?_assertEqual("true", str(true)), + ?_assertError(_, str({a, v})) + ]. From 8919a6ef93f78774e2e23c74903b635f4602c616 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 9 Jun 2023 00:21:39 +0300 Subject: [PATCH 4/7] refactor(pluglib): provide SQL related utils in `emqx_utils_sql` --- .../src/emqx_bridge_clickhouse_connector.erl | 2 +- .../src/emqx_bridge_sqlserver_connector.erl | 22 ++- .../src/emqx_bridge_tdengine_connector.erl | 17 +- .../src/emqx_connector_mysql.erl | 11 +- .../src/emqx_plugin_libs_rule.erl | 49 +----- apps/emqx_utils/src/emqx_placeholder.erl | 69 +------- apps/emqx_utils/src/emqx_utils_sql.erl | 157 ++++++++++++++++++ 7 files changed, 199 insertions(+), 128 deletions(-) create mode 100644 apps/emqx_utils/src/emqx_utils_sql.erl diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index 002460c95..2d9af6bee 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -212,7 +212,7 @@ prepare_sql_bulk_extend_template(Template, Separator) -> ExtendParamTemplate = iolist_to_binary([Separator, ValuesTemplate]), emqx_placeholder:preproc_tmpl(ExtendParamTemplate). -%% This function is similar to emqx_plugin_libs_rule:split_insert_sql/1 but can +%% This function is similar to emqx_utils_sql:parse_insert/1 but can %% also handle Clickhouse's SQL extension for INSERT statments that allows the %% user to specify different formats: %% diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index cb0c0e16e..9512d2f6b 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -443,11 +443,11 @@ parse_sql_template(Config) -> parse_sql_template(maps:to_list(RawSQLTemplates), BatchInsertTks). parse_sql_template([{Key, H} | T], BatchInsertTks) -> - case emqx_plugin_libs_rule:detect_sql_type(H) of - {ok, select} -> + case emqx_utils_sql:get_statement_type(H) of + select -> parse_sql_template(T, BatchInsertTks); - {ok, insert} -> - case emqx_plugin_libs_rule:split_insert_sql(H) of + insert -> + case emqx_utils_sql:parse_insert(H) of {ok, {InsertSQL, Params}} -> parse_sql_template( T, @@ -463,6 +463,9 @@ parse_sql_template([{Key, H} | T], BatchInsertTks) -> ?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}), parse_sql_template(T, BatchInsertTks) end; + Type when is_atom(Type) -> + ?SLOG(error, #{msg => "detect sql type unsupported", sql => H, type => Type}), + parse_sql_template(T, BatchInsertTks); {error, Reason} -> ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}), parse_sql_template(T, BatchInsertTks) @@ -488,10 +491,19 @@ apply_template( undefined -> BatchReqs; #{?BATCH_INSERT_PART := BatchInserts, ?BATCH_PARAMS_TOKENS := BatchParamsTks} -> - SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks), + SQL = proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks), {Key, SQL} end; apply_template(Query, Templates) -> %% TODO: more detail infomatoin ?SLOG(error, #{msg => "apply sql template failed", query => Query, templates => Templates}), {error, failed_to_apply_sql_template}. + +proc_batch_sql(BatchReqs, BatchInserts, Tokens) -> + Values = erlang:iolist_to_binary( + lists:join($,, [ + emqx_placeholder:proc_sql_param_str(Tokens, Msg) + || {_, Msg} <- BatchReqs + ]) + ), + <>. diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 6c1d06429..e47a2d083 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -256,10 +256,10 @@ parse_prepare_sql(Config) -> parse_batch_prepare_sql(maps:to_list(SQL), #{}, #{}). parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) -> - case emqx_plugin_libs_rule:detect_sql_type(H) of - {ok, select} -> + case emqx_utils_sql:get_statement_type(H) of + select -> parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap); - {ok, insert} -> + insert -> InsertTks = emqx_placeholder:preproc_tmpl(H), H1 = string:trim(H, trailing, ";"), case split_insert_sql(H1) of @@ -275,6 +275,9 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) -> ?SLOG(error, #{msg => "split sql failed", sql => H, result => Result}), parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap) end; + Type when is_atom(Type) -> + ?SLOG(error, #{msg => "detect sql type unsupported", sql => H, type => Type}), + parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap); {error, Reason} -> ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}), parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap) @@ -289,7 +292,7 @@ to_bin(List) when is_list(List) -> unicode:characters_to_binary(List, utf8). split_insert_sql(SQL0) -> - SQL = emqx_plugin_libs_rule:formalize_sql(SQL0), + SQL = formalize_sql(SQL0), lists:filtermap( fun(E) -> case string:trim(E) of @@ -301,3 +304,9 @@ split_insert_sql(SQL0) -> end, re:split(SQL, "(?i)(insert into)|(?i)(values)") ). + +formalize_sql(Input) -> + %% 1. replace all whitespaces like '\r' '\n' or spaces to a single space char. + SQL = re:replace(Input, "\\s+", " ", [global, {return, binary}]), + %% 2. trims the result + string:trim(SQL). diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index a46b21d92..9c40919f2 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -357,11 +357,11 @@ parse_prepare_sql([], Prepares, Tokens, BatchInserts, BatchTks) -> }. parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks) -> - case emqx_plugin_libs_rule:detect_sql_type(H) of - {ok, select} -> + case emqx_utils_sql:get_statement_type(H) of + select -> parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks); - {ok, insert} -> - case emqx_plugin_libs_rule:split_insert_sql(H) of + insert -> + case emqx_utils_sql:parse_insert(H) of {ok, {InsertSQL, Params}} -> ParamsTks = emqx_placeholder:preproc_tmpl(Params), parse_prepare_sql( @@ -375,6 +375,9 @@ parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks ?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}), parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks) end; + Type when is_atom(Type) -> + ?SLOG(error, #{msg => "detect sql type unsupported", sql => H, type => Type}), + parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks); {error, Reason} -> ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}), parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks) diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl index aa9e38316..38dfb492f 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl @@ -19,10 +19,7 @@ %% preprocess and process template string with place holders -export([ - split_insert_sql/1, - detect_sql_type/1, - proc_batch_sql/3, - formalize_sql/1 + proc_batch_sql/3 ]). %% type converting @@ -52,44 +49,6 @@ -type tmpl_token() :: list({var, binary()} | {str, binary()}). -%% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">> --spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when - InsertSQL :: binary(), - Params :: binary(). -split_insert_sql(SQL0) -> - SQL = formalize_sql(SQL0), - case re:split(SQL, "((?i)values)", [{return, binary}]) of - [Part1, _, Part3] -> - case string:trim(Part1, leading) of - <<"insert", _/binary>> = InsertSQL -> - {ok, {InsertSQL, Part3}}; - <<"INSERT", _/binary>> = InsertSQL -> - {ok, {InsertSQL, Part3}}; - _ -> - {error, not_insert_sql} - end; - _ -> - {error, not_insert_sql} - end. - --spec detect_sql_type(binary()) -> {ok, Type} | {error, atom()} when - Type :: insert | select. -detect_sql_type(SQL) -> - case re:run(SQL, "^\\s*([a-zA-Z]+)", [{capture, all_but_first, list}]) of - {match, [First]} -> - Types = [select, insert], - PropTypes = [{erlang:atom_to_list(Type), Type} || Type <- Types], - LowFirst = string:lowercase(First), - case proplists:lookup(LowFirst, PropTypes) of - {LowFirst, Type} -> - {ok, Type}; - _ -> - {error, invalid_sql} - end; - _ -> - {error, invalid_sql} - end. - -spec proc_batch_sql( BatchReqs :: list({atom(), map()}), InsertPart :: binary(), @@ -104,12 +63,6 @@ proc_batch_sql(BatchReqs, InsertPart, Tokens) -> ), <>. -formalize_sql(Input) -> - %% 1. replace all whitespaces like '\r' '\n' or spaces to a single space char. - SQL = re:replace(Input, "\\s+", " ", [global, {return, binary}]), - %% 2. trims the result - string:trim(SQL). - unsafe_atom_key(Key) when is_atom(Key) -> Key; unsafe_atom_key(Key) when is_binary(Key) -> diff --git a/apps/emqx_utils/src/emqx_placeholder.erl b/apps/emqx_utils/src/emqx_placeholder.erl index 861548f93..fd40b906d 100644 --- a/apps/emqx_utils/src/emqx_placeholder.erl +++ b/apps/emqx_utils/src/emqx_placeholder.erl @@ -90,8 +90,6 @@ | {tmpl, tmpl_token()} | {value, term()}. --dialyzer({no_improper_lists, [quote_mysql/1, escape_mysql/4, escape_prepend/4]}). - %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ @@ -247,22 +245,15 @@ bin(Val) -> emqx_utils_conv:bin(Val). -spec quote_sql(_Value) -> iolist(). quote_sql(Str) -> - quote_escape(Str, fun escape_sql/1). + emqx_utils_sql:to_sql_string(Str, #{escaping => sql}). -spec quote_cql(_Value) -> iolist(). quote_cql(Str) -> - quote_escape(Str, fun escape_cql/1). + emqx_utils_sql:to_sql_string(Str, #{escaping => cql}). -spec quote_mysql(_Value) -> iolist(). -quote_mysql(Str) when is_binary(Str) -> - try - escape_mysql(Str) - catch - throw:invalid_utf8 -> - [<<"0x">> | binary:encode_hex(Str)] - end; quote_mysql(Str) -> - quote_escape(Str, fun escape_mysql/1). + emqx_utils_sql:to_sql_string(Str, #{escaping => mysql}). lookup_var(Var, Value) when Var == ?PH_VAR_THIS orelse Var == [] -> Value; @@ -370,57 +361,3 @@ unwrap(<<"\"${", Val/binary>>, _StripDoubleQuote = true) -> binary:part(Val, {0, byte_size(Val) - 2}); unwrap(<<"${", Val/binary>>, _StripDoubleQuote) -> binary:part(Val, {0, byte_size(Val) - 1}). - --spec quote_escape(_Value, fun((binary()) -> iodata())) -> iodata(). -quote_escape(Str, EscapeFun) when is_binary(Str) -> - EscapeFun(Str); -quote_escape(Str, EscapeFun) when is_list(Str) -> - case unicode:characters_to_binary(Str) of - Bin when is_binary(Bin) -> - EscapeFun(Bin); - Otherwise -> - error(Otherwise) - end; -quote_escape(Str, EscapeFun) when is_atom(Str) orelse is_map(Str) -> - EscapeFun(bin(Str)); -quote_escape(Val, _EscapeFun) -> - bin(Val). - --spec escape_sql(binary()) -> iolist(). -escape_sql(S) -> - ES = binary:replace(S, [<<"\\">>, <<"'">>], <<"\\">>, [global, {insert_replaced, 1}]), - [$', ES, $']. - --spec escape_cql(binary()) -> iolist(). -escape_cql(S) -> - ES = binary:replace(S, <<"'">>, <<"'">>, [global, {insert_replaced, 1}]), - [$', ES, $']. - --spec escape_mysql(binary()) -> iolist(). -escape_mysql(S0) -> - % https://dev.mysql.com/doc/refman/8.0/en/string-literals.html - [$', escape_mysql(S0, 0, 0, S0), $']. - -%% NOTE -%% This thing looks more complicated than needed because it's optimized for as few -%% intermediate memory (re)allocations as possible. -escape_mysql(<<$', Rest/binary>>, I, Run, Src) -> - escape_prepend(I, Run, Src, [<<"\\'">> | escape_mysql(Rest, I + Run + 1, 0, Src)]); -escape_mysql(<<$\\, Rest/binary>>, I, Run, Src) -> - escape_prepend(I, Run, Src, [<<"\\\\">> | escape_mysql(Rest, I + Run + 1, 0, Src)]); -escape_mysql(<<0, Rest/binary>>, I, Run, Src) -> - escape_prepend(I, Run, Src, [<<"\\0">> | escape_mysql(Rest, I + Run + 1, 0, Src)]); -escape_mysql(<<_/utf8, Rest/binary>> = S, I, Run, Src) -> - CWidth = byte_size(S) - byte_size(Rest), - escape_mysql(Rest, I, Run + CWidth, Src); -escape_mysql(<<>>, 0, _, Src) -> - Src; -escape_mysql(<<>>, I, Run, Src) -> - binary:part(Src, I, Run); -escape_mysql(_, _I, _Run, _Src) -> - throw(invalid_utf8). - -escape_prepend(_RunI, 0, _Src, Tail) -> - Tail; -escape_prepend(I, Run, Src, Tail) -> - [binary:part(Src, I, Run) | Tail]. diff --git a/apps/emqx_utils/src/emqx_utils_sql.erl b/apps/emqx_utils/src/emqx_utils_sql.erl new file mode 100644 index 000000000..3caed6b62 --- /dev/null +++ b/apps/emqx_utils/src/emqx_utils_sql.erl @@ -0,0 +1,157 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_utils_sql). + +-export([get_statement_type/1]). +-export([parse_insert/1]). + +-export([to_sql_value/1]). +-export([to_sql_string/2]). + +-export([escape_sql/1]). +-export([escape_cql/1]). +-export([escape_mysql/1]). + +-export_type([value/0]). + +-type statement_type() :: select | insert | delete. +-type value() :: null | binary() | number() | boolean() | [value()]. + +-dialyzer({no_improper_lists, [escape_mysql/4, escape_prepend/4]}). + +-spec get_statement_type(iodata()) -> statement_type() | {error, unknown}. +get_statement_type(Query) -> + KnownTypes = #{ + <<"select">> => select, + <<"insert">> => insert, + <<"delete">> => delete + }, + case re:run(Query, <<"^\\s*([a-zA-Z]+)">>, [{capture, all_but_first, binary}]) of + {match, [Token]} -> + maps:get(string:lowercase(Token), KnownTypes, {error, unknown}); + _ -> + {error, unknown} + end. + +%% @doc Parse an INSERT SQL statement into its INSERT part and the VALUES part. +%% SQL = <<"INSERT INTO \"abc\" (c1, c2, c3) VALUES (${a}, ${b}, ${c.prop})">> +%% {ok, {<<"INSERT INTO \"abc\" (c1, c2, c3)">>, <<"(${a}, ${b}, ${c.prop})">>}} +-spec parse_insert(iodata()) -> + {ok, {_Statement :: binary(), _Rows :: binary()}} | {error, not_insert_sql}. +parse_insert(SQL) -> + case re:split(SQL, "((?i)values)", [{return, binary}]) of + [Part1, _, Part3] -> + case string:trim(Part1, leading) of + <<"insert", _/binary>> = InsertSQL -> + {ok, {InsertSQL, Part3}}; + <<"INSERT", _/binary>> = InsertSQL -> + {ok, {InsertSQL, Part3}}; + _ -> + {error, not_insert_sql} + end; + _ -> + {error, not_insert_sql} + end. + +%% @doc Convert an Erlang term to a value that can be used primarily in +%% prepared SQL statements. +-spec to_sql_value(term()) -> value(). +to_sql_value(undefined) -> null; +to_sql_value(List) when is_list(List) -> List; +to_sql_value(Bin) when is_binary(Bin) -> Bin; +to_sql_value(Num) when is_number(Num) -> Num; +to_sql_value(Bool) when is_boolean(Bool) -> Bool; +to_sql_value(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); +to_sql_value(Map) when is_map(Map) -> emqx_utils_json:encode(Map). + +%% @doc Convert an Erlang term to a string that can be interpolated in literal +%% SQL statements. The value is escaped if necessary. +-spec to_sql_string(term(), Options) -> iodata() when + Options :: #{ + escaping => cql | mysql | sql + }. +to_sql_string(String, #{escaping := mysql}) when is_binary(String) -> + try + escape_mysql(String) + catch + throw:invalid_utf8 -> + [<<"0x">>, binary:encode_hex(String)] + end; +to_sql_string(Term, #{escaping := mysql}) -> + maybe_escape(Term, fun escape_mysql/1); +to_sql_string(Term, #{escaping := cql}) -> + maybe_escape(Term, fun escape_cql/1); +to_sql_string(Term, #{}) -> + maybe_escape(Term, fun escape_sql/1). + +-spec maybe_escape(_Value, fun((binary()) -> iodata())) -> iodata(). +maybe_escape(Str, EscapeFun) when is_binary(Str) -> + EscapeFun(Str); +maybe_escape(Str, EscapeFun) when is_list(Str) -> + case unicode:characters_to_binary(Str) of + Bin when is_binary(Bin) -> + EscapeFun(Bin); + Otherwise -> + error(Otherwise) + end; +maybe_escape(Val, EscapeFun) when is_atom(Val) orelse is_map(Val) -> + EscapeFun(emqx_utils_conv:bin(Val)); +maybe_escape(Val, _EscapeFun) -> + emqx_utils_conv:bin(Val). + +-spec escape_sql(binary()) -> iodata(). +escape_sql(S) -> + % NOTE + % This is a bit misleading: currently, escaping logic in `escape_sql/1` likely + % won't work with pgsql since it does not support C-style escapes by default. + % https://www.postgresql.org/docs/14/sql-syntax-lexical.html#SQL-SYNTAX-CONSTANTS + ES = binary:replace(S, [<<"\\">>, <<"'">>], <<"\\">>, [global, {insert_replaced, 1}]), + [$', ES, $']. + +-spec escape_cql(binary()) -> iodata(). +escape_cql(S) -> + ES = binary:replace(S, <<"'">>, <<"'">>, [global, {insert_replaced, 1}]), + [$', ES, $']. + +-spec escape_mysql(binary()) -> iodata(). +escape_mysql(S0) -> + % https://dev.mysql.com/doc/refman/8.0/en/string-literals.html + [$', escape_mysql(S0, 0, 0, S0), $']. + +%% NOTE +%% This thing looks more complicated than needed because it's optimized for as few +%% intermediate memory (re)allocations as possible. +escape_mysql(<<$', Rest/binary>>, I, Run, Src) -> + escape_prepend(I, Run, Src, [<<"\\'">> | escape_mysql(Rest, I + Run + 1, 0, Src)]); +escape_mysql(<<$\\, Rest/binary>>, I, Run, Src) -> + escape_prepend(I, Run, Src, [<<"\\\\">> | escape_mysql(Rest, I + Run + 1, 0, Src)]); +escape_mysql(<<0, Rest/binary>>, I, Run, Src) -> + escape_prepend(I, Run, Src, [<<"\\0">> | escape_mysql(Rest, I + Run + 1, 0, Src)]); +escape_mysql(<<_/utf8, Rest/binary>> = S, I, Run, Src) -> + CWidth = byte_size(S) - byte_size(Rest), + escape_mysql(Rest, I, Run + CWidth, Src); +escape_mysql(<<>>, 0, _, Src) -> + Src; +escape_mysql(<<>>, I, Run, Src) -> + binary:part(Src, I, Run); +escape_mysql(_, _I, _Run, _Src) -> + throw(invalid_utf8). + +escape_prepend(_RunI, 0, _Src, Tail) -> + Tail; +escape_prepend(I, Run, Src, Tail) -> + [binary:part(Src, I, Run) | Tail]. From e6fb0203b424eaf53ed559b0ba824c013203d2cd Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 9 Jun 2023 00:22:53 +0300 Subject: [PATCH 5/7] refactor(pluglib): move connectivity checks to `emqx_connector_lib` --- .../src/emqx_bridge_opents_connector.erl | 4 +- .../emqx_connector/src/emqx_connector_lib.erl | 31 +++++++++++ .../test/emqx_connector_lib_tests.erl | 54 +++++++++++++++++++ .../test/emqx_plugin_libs_rule_SUITE.erl | 18 ------- 4 files changed, 88 insertions(+), 19 deletions(-) create mode 100644 apps/emqx_connector/test/emqx_connector_lib_tests.erl diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index 71184e872..555921c93 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -47,6 +47,8 @@ fields(config) -> %% `emqx_resource' API %%======================================================================================== +-define(HTTP_CONNECT_TIMEOUT, 1000). + callback_mode() -> always_sync. is_buffer_supported() -> false. @@ -171,7 +173,7 @@ opentsdb_connectivity(Server) -> <<"https://", _/binary>> -> Server; _ -> "http://" ++ Server end, - emqx_plugin_libs_rule:http_connectivity(SvrUrl). + emqx_connector_lib:http_connectivity(SvrUrl, ?HTTP_CONNECT_TIMEOUT). format_opentsdb_msg(Msg) -> maps:with( diff --git a/apps/emqx_connector/src/emqx_connector_lib.erl b/apps/emqx_connector/src/emqx_connector_lib.erl index d554ca5ab..8fbe7597a 100644 --- a/apps/emqx_connector/src/emqx_connector_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_lib.erl @@ -15,8 +15,39 @@ %%-------------------------------------------------------------------- -module(emqx_connector_lib). +%% connectivity check +-export([ + http_connectivity/2, + tcp_connectivity/3 +]). + -export([resolve_dns/2]). +-spec http_connectivity(uri_string:uri_string(), timeout()) -> + ok | {error, Reason :: term()}. +http_connectivity(Url, Timeout) -> + case emqx_http_lib:uri_parse(Url) of + {ok, #{host := Host, port := Port}} -> + tcp_connectivity(Host, Port, Timeout); + {error, Reason} -> + {error, Reason} + end. + +-spec tcp_connectivity( + Host :: inet:socket_address() | inet:hostname(), + Port :: inet:port_number(), + timeout() +) -> + ok | {error, Reason :: term()}. +tcp_connectivity(Host, Port, Timeout) -> + case gen_tcp:connect(Host, Port, emqx_utils:ipv6_probe([]), Timeout) of + {ok, Sock} -> + gen_tcp:close(Sock), + ok; + {error, Reason} -> + {error, Reason} + end. + %% @doc Mostly for meck. resolve_dns(DNS, Type) -> inet_res:lookup(DNS, in, Type). diff --git a/apps/emqx_connector/test/emqx_connector_lib_tests.erl b/apps/emqx_connector/test/emqx_connector_lib_tests.erl new file mode 100644 index 000000000..5add35d31 --- /dev/null +++ b/apps/emqx_connector/test/emqx_connector_lib_tests.erl @@ -0,0 +1,54 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_connector_lib_tests). + +-include_lib("eunit/include/eunit.hrl"). + +http_connectivity_ok_test() -> + {ok, Socket} = gen_tcp:listen(0, []), + {ok, Port} = inet:port(Socket), + ?assertEqual( + ok, + emqx_connector_lib:http_connectivity("http://127.0.0.1:" ++ integer_to_list(Port), 1000) + ), + gen_tcp:close(Socket). + +http_connectivity_error_test() -> + {ok, Socket} = gen_tcp:listen(0, []), + {ok, Port} = inet:port(Socket), + ok = gen_tcp:close(Socket), + ?assertEqual( + {error, econnrefused}, + emqx_connector_lib:http_connectivity("http://127.0.0.1:" ++ integer_to_list(Port), 1000) + ). + +tcp_connectivity_ok_test() -> + {ok, Socket} = gen_tcp:listen(0, []), + {ok, Port} = inet:port(Socket), + ?assertEqual( + ok, + emqx_connector_lib:tcp_connectivity("127.0.0.1", Port, 1000) + ), + ok = gen_tcp:close(Socket). + +tcp_connectivity_error_test() -> + {ok, Socket} = gen_tcp:listen(0, []), + {ok, Port} = inet:port(Socket), + ok = gen_tcp:close(Socket), + ?assertEqual( + {error, econnrefused}, + emqx_connector_lib:tcp_connectivity("127.0.0.1", Port, 1000) + ). diff --git a/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl index bb1ba838b..73b700974 100644 --- a/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl @@ -21,26 +21,8 @@ -include_lib("eunit/include/eunit.hrl"). --define(PORT, 9876). - all() -> emqx_common_test_helpers:all(?MODULE). -t_http_connectivity(_) -> - {ok, Socket} = gen_tcp:listen(?PORT, []), - ok = emqx_plugin_libs_rule:http_connectivity( - "http://127.0.0.1:" ++ integer_to_list(?PORT), 1000 - ), - gen_tcp:close(Socket), - {error, _} = emqx_plugin_libs_rule:http_connectivity( - "http://127.0.0.1:" ++ integer_to_list(?PORT), 1000 - ). - -t_tcp_connectivity(_) -> - {ok, Socket} = gen_tcp:listen(?PORT, []), - ok = emqx_plugin_libs_rule:tcp_connectivity("127.0.0.1", ?PORT, 1000), - gen_tcp:close(Socket), - {error, _} = emqx_plugin_libs_rule:tcp_connectivity("127.0.0.1", ?PORT, 1000). - t_atom_key(_) -> _ = erlang, _ = port, From 830ba54721b20c1923d590150b55a1b2911087d8 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 9 Jun 2023 00:33:05 +0300 Subject: [PATCH 6/7] refactor(pluglib): remove `emqx_plugin_libs` application --- .github/CODEOWNERS | 1 - apps/emqx_machine/src/emqx_machine_boot.erl | 1 - apps/emqx_machine/test/emqx_machine_SUITE.erl | 1 - apps/emqx_plugin_libs/rebar.config | 8 - .../src/emqx_plugin_libs.app.src | 8 - .../src/emqx_plugin_libs.appup.src | 13 -- .../emqx_plugin_libs/src/emqx_plugin_libs.erl | 17 -- .../src/emqx_plugin_libs_rule.erl | 202 ------------------ .../test/emqx_plugin_libs_rule_SUITE.erl | 47 ---- mix.exs | 1 - rebar.config.erl | 1 - 11 files changed, 300 deletions(-) delete mode 100644 apps/emqx_plugin_libs/rebar.config delete mode 100644 apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src delete mode 100644 apps/emqx_plugin_libs/src/emqx_plugin_libs.appup.src delete mode 100644 apps/emqx_plugin_libs/src/emqx_plugin_libs.erl delete mode 100644 apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl delete mode 100644 apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index a45d9af59..4ad6049f3 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -11,7 +11,6 @@ /apps/emqx_ft/ @emqx/emqx-review-board @savonarola @keynslug /apps/emqx_gateway/ @emqx/emqx-review-board @lafirest /apps/emqx_management/ @emqx/emqx-review-board @lafirest @sstrigler -/apps/emqx_plugin_libs/ @emqx/emqx-review-board @lafirest /apps/emqx_plugins/ @emqx/emqx-review-board @JimMoen /apps/emqx_prometheus/ @emqx/emqx-review-board @JimMoen /apps/emqx_psk/ @emqx/emqx-review-board @lafirest diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl index 4c5740bad..b2b0823ce 100644 --- a/apps/emqx_machine/src/emqx_machine_boot.erl +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -137,7 +137,6 @@ basic_reboot_apps() -> emqx_resource, emqx_rule_engine, emqx_bridge, - emqx_plugin_libs, emqx_management, emqx_retainer, emqx_exhook, diff --git a/apps/emqx_machine/test/emqx_machine_SUITE.erl b/apps/emqx_machine/test/emqx_machine_SUITE.erl index d8613a537..bd18c67aa 100644 --- a/apps/emqx_machine/test/emqx_machine_SUITE.erl +++ b/apps/emqx_machine/test/emqx_machine_SUITE.erl @@ -51,7 +51,6 @@ init_per_suite(Config) -> emqx_resource, emqx_rule_engine, emqx_bridge, - emqx_plugin_libs, emqx_management, emqx_retainer, emqx_exhook, diff --git a/apps/emqx_plugin_libs/rebar.config b/apps/emqx_plugin_libs/rebar.config deleted file mode 100644 index dee2902a5..000000000 --- a/apps/emqx_plugin_libs/rebar.config +++ /dev/null @@ -1,8 +0,0 @@ -%% -*- mode: erlang -*- - -{deps, [ - {emqx, {path, "../emqx"}}, - {emqx_utils, {path, "../emqx_utils"}} -]}. - -{project_plugins, [erlfmt]}. diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src b/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src deleted file mode 100644 index 6953acdf1..000000000 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src +++ /dev/null @@ -1,8 +0,0 @@ -%% -*- mode: erlang -*- -{application, emqx_plugin_libs, [ - {description, "EMQX Plugin utility libs"}, - {vsn, "4.3.12"}, - {modules, []}, - {applications, [kernel, stdlib]}, - {env, []} -]}. diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs.appup.src b/apps/emqx_plugin_libs/src/emqx_plugin_libs.appup.src deleted file mode 100644 index 7ae0f0317..000000000 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs.appup.src +++ /dev/null @@ -1,13 +0,0 @@ -%% -*- mode: erlang -*- -{VSN, - [ {"4.3.0", - [ {add_module, emqx_plugin_libs_pool} - ]}, - {<<".*">>, []} - ], - [ {"4.3.0", - [ {delete_module, emqx_plugin_libs_pool} - ]}, - {<<".*">>, []} - ] -}. diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs.erl deleted file mode 100644 index e237ffe82..000000000 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs.erl +++ /dev/null @@ -1,17 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_plugin_libs). diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl deleted file mode 100644 index 38dfb492f..000000000 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl +++ /dev/null @@ -1,202 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_plugin_libs_rule). --elvis([{elvis_style, god_modules, disable}]). - -%% preprocess and process template string with place holders --export([ - proc_batch_sql/3 -]). - -%% type converting --export([ - bool/1, - int/1, - float/1, - float2str/2, - map/1, - utf8_bin/1, - utf8_str/1, - atom_key/1, - unsafe_atom_key/1 -]). - -%% connectivity check --export([ - http_connectivity/1, - http_connectivity/2, - tcp_connectivity/2, - tcp_connectivity/3 -]). - --compile({no_auto_import, [float/1]}). - --type uri_string() :: iodata(). - --type tmpl_token() :: list({var, binary()} | {str, binary()}). - --spec proc_batch_sql( - BatchReqs :: list({atom(), map()}), - InsertPart :: binary(), - Tokens :: tmpl_token() -) -> InsertSQL :: binary(). -proc_batch_sql(BatchReqs, InsertPart, Tokens) -> - ValuesPart = erlang:iolist_to_binary( - lists:join($,, [ - emqx_placeholder:proc_sql_param_str(Tokens, Msg) - || {_, Msg} <- BatchReqs - ]) - ), - <>. - -unsafe_atom_key(Key) when is_atom(Key) -> - Key; -unsafe_atom_key(Key) when is_binary(Key) -> - binary_to_atom(Key, utf8); -unsafe_atom_key(Keys = [_Key | _]) -> - [unsafe_atom_key(SubKey) || SubKey <- Keys]; -unsafe_atom_key(Key) -> - error({invalid_key, Key}). - -atom_key(Key) when is_atom(Key) -> - Key; -atom_key(Key) when is_binary(Key) -> - try - binary_to_existing_atom(Key, utf8) - catch - error:badarg -> error({invalid_key, Key}) - end; -%% nested keys -atom_key(Keys = [_Key | _]) -> - [atom_key(SubKey) || SubKey <- Keys]; -atom_key(Key) -> - error({invalid_key, Key}). - --spec http_connectivity(uri_string()) -> ok | {error, Reason :: term()}. -http_connectivity(Url) -> - http_connectivity(Url, 3000). - --spec http_connectivity(uri_string(), integer()) -> ok | {error, Reason :: term()}. -http_connectivity(Url, Timeout) -> - case emqx_http_lib:uri_parse(Url) of - {ok, #{host := Host, port := Port}} -> - tcp_connectivity(Host, Port, Timeout); - {error, Reason} -> - {error, Reason} - end. - --spec tcp_connectivity( - Host :: inet:socket_address() | inet:hostname(), - Port :: inet:port_number() -) -> - ok | {error, Reason :: term()}. -tcp_connectivity(Host, Port) -> - tcp_connectivity(Host, Port, 3000). - --spec tcp_connectivity( - Host :: inet:socket_address() | inet:hostname(), - Port :: inet:port_number(), - Timeout :: integer() -) -> - ok | {error, Reason :: term()}. -tcp_connectivity(Host, Port, Timeout) -> - case gen_tcp:connect(Host, Port, emqx_utils:ipv6_probe([]), Timeout) of - {ok, Sock} -> - gen_tcp:close(Sock), - ok; - {error, Reason} -> - {error, Reason} - end. - -utf8_bin(Str) when is_binary(Str); is_list(Str) -> - unicode:characters_to_binary(Str); -utf8_bin(Str) -> - unicode:characters_to_binary(emqx_utils_conv:bin(Str)). - -utf8_str(Str) when is_binary(Str); is_list(Str) -> - unicode:characters_to_list(Str); -utf8_str(Str) -> - unicode:characters_to_list(emqx_utils_conv:str(Str)). - -int(List) when is_list(List) -> - try - list_to_integer(List) - catch - error:badarg -> - int(list_to_float(List)) - end; -int(Bin) when is_binary(Bin) -> - try - binary_to_integer(Bin) - catch - error:badarg -> - int(binary_to_float(Bin)) - end; -int(Int) when is_integer(Int) -> Int; -int(Float) when is_float(Float) -> erlang:floor(Float); -int(true) -> - 1; -int(false) -> - 0; -int(Data) -> - error({invalid_number, Data}). - -float(List) when is_list(List) -> - try - list_to_float(List) - catch - error:badarg -> - float(list_to_integer(List)) - end; -float(Bin) when is_binary(Bin) -> - try - binary_to_float(Bin) - catch - error:badarg -> - float(binary_to_integer(Bin)) - end; -float(Num) when is_number(Num) -> erlang:float(Num); -float(Data) -> - error({invalid_number, Data}). - -float2str(Float, Precision) when is_float(Float) and is_integer(Precision) -> - float_to_binary(Float, [{decimals, Precision}, compact]). - -map(Bin) when is_binary(Bin) -> - case emqx_utils_json:decode(Bin, [return_maps]) of - Map = #{} -> Map; - _ -> error({invalid_map, Bin}) - end; -map(List) when is_list(List) -> maps:from_list(List); -map(Map) when is_map(Map) -> Map; -map(Data) -> - error({invalid_map, Data}). - -bool(Bool) when - Bool == true; - Bool == <<"true">>; - Bool == 1 --> - true; -bool(Bool) when - Bool == false; - Bool == <<"false">>; - Bool == 0 --> - false; -bool(Bool) -> - error({invalid_boolean, Bool}). diff --git a/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl deleted file mode 100644 index 73b700974..000000000 --- a/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl +++ /dev/null @@ -1,47 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_plugin_libs_rule_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). - -all() -> emqx_common_test_helpers:all(?MODULE). - -t_atom_key(_) -> - _ = erlang, - _ = port, - ?assertEqual([erlang], emqx_plugin_libs_rule:atom_key([<<"erlang">>])), - ?assertEqual([erlang, port], emqx_plugin_libs_rule:atom_key([<<"erlang">>, port])), - ?assertEqual([erlang, port], emqx_plugin_libs_rule:atom_key([<<"erlang">>, <<"port">>])), - ?assertEqual(erlang, emqx_plugin_libs_rule:atom_key(<<"erlang">>)), - ?assertError({invalid_key, {a, v}}, emqx_plugin_libs_rule:atom_key({a, v})), - _ = xyz876gv123, - ?assertEqual([xyz876gv123, port], emqx_plugin_libs_rule:atom_key([<<"xyz876gv123">>, port])). - -t_unsafe_atom_key(_) -> - ?assertEqual([xyz876gv], emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv">>])), - ?assertEqual( - [xyz876gv33, port], - emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv33">>, port]) - ), - ?assertEqual( - [xyz876gv331, port1221], - emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv331">>, <<"port1221">>]) - ), - ?assertEqual(xyz876gv3312, emqx_plugin_libs_rule:unsafe_atom_key(<<"xyz876gv3312">>)). diff --git a/mix.exs b/mix.exs index fbd88e61d..5fdf5c565 100644 --- a/mix.exs +++ b/mix.exs @@ -352,7 +352,6 @@ defmodule EMQXUmbrella.MixProject do [ mnesia: :load, ekka: :load, - emqx_plugin_libs: :load, esasl: :load, observer_cli: :permanent, tools: :permanent, diff --git a/rebar.config.erl b/rebar.config.erl index d265f53cd..81b5fba5f 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -405,7 +405,6 @@ relx_apps(ReleaseType, Edition) -> [ {mnesia, load}, {ekka, load}, - {emqx_plugin_libs, load}, {esasl, load}, observer_cli, tools, From c8360f86b78c1eff104033926934ce67ea4c0b53 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 9 Jun 2023 01:02:51 +0300 Subject: [PATCH 7/7] chore: bump application versions --- apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src | 2 +- apps/emqx_machine/src/emqx_machine.app.src | 2 +- lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src b/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src index 38da1e18b..1fda99700 100644 --- a/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src +++ b/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src @@ -1,6 +1,6 @@ {application, emqx_gateway_stomp, [ {description, "Stomp Gateway"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_machine/src/emqx_machine.app.src b/apps/emqx_machine/src/emqx_machine.app.src index 5cfa80369..b9138d07d 100644 --- a/apps/emqx_machine/src/emqx_machine.app.src +++ b/apps/emqx_machine/src/emqx_machine.app.src @@ -3,7 +3,7 @@ {id, "emqx_machine"}, {description, "The EMQX Machine"}, % strict semver, bump manually! - {vsn, "0.2.5"}, + {vsn, "0.2.6"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, emqx_ctl]}, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 3ed460492..702b9ff09 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_connector, [ {description, "EMQX Enterprise connectors"}, - {vsn, "0.1.13"}, + {vsn, "0.1.14"}, {registered, []}, {applications, [ kernel,