Merge pull request #10987 from keynslug/fix/EMQX-9257/sep-placeholder
refactor: tear `emqx_plugin_libs` application apart
This commit is contained in:
commit
b930f4cc73
|
@ -11,7 +11,6 @@
|
|||
/apps/emqx_ft/ @emqx/emqx-review-board @savonarola @keynslug
|
||||
/apps/emqx_gateway/ @emqx/emqx-review-board @lafirest
|
||||
/apps/emqx_management/ @emqx/emqx-review-board @lafirest @sstrigler
|
||||
/apps/emqx_plugin_libs/ @emqx/emqx-review-board @lafirest
|
||||
/apps/emqx_plugins/ @emqx/emqx-review-board @JimMoen
|
||||
/apps/emqx_prometheus/ @emqx/emqx-review-board @JimMoen
|
||||
/apps/emqx_psk/ @emqx/emqx-review-board @lafirest
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
%% APIs
|
||||
-export([
|
||||
match/2,
|
||||
match_any/2,
|
||||
validate/1,
|
||||
validate/2,
|
||||
levels/1,
|
||||
|
@ -86,6 +87,12 @@ match([_H1 | _], []) ->
|
|||
match([], [_H | _T2]) ->
|
||||
false.
|
||||
|
||||
-spec match_any(Name, [Filter]) -> boolean() when
|
||||
Name :: topic() | words(),
|
||||
Filter :: topic() | words().
|
||||
match_any(Topic, Filters) ->
|
||||
lists:any(fun(Filter) -> match(Topic, Filter) end, Filters).
|
||||
|
||||
%% @doc Validate topic name or filter
|
||||
-spec validate(topic() | {name | filter, topic()}) -> true.
|
||||
validate(Topic) when is_binary(Topic) ->
|
||||
|
|
|
@ -225,21 +225,19 @@ without_password(Credential, [Name | Rest]) ->
|
|||
without_password(Credential, Rest)
|
||||
end.
|
||||
|
||||
urlencode_var({var, _} = Var, Value) ->
|
||||
emqx_http_lib:uri_encode(handle_var(Var, Value));
|
||||
urlencode_var(Var, Value) ->
|
||||
handle_var(Var, Value).
|
||||
emqx_http_lib:uri_encode(handle_var(Var, Value)).
|
||||
|
||||
handle_var({var, _Name}, undefined) ->
|
||||
handle_var(_Name, undefined) ->
|
||||
<<>>;
|
||||
handle_var({var, <<"peerhost">>}, PeerHost) ->
|
||||
handle_var([<<"peerhost">>], PeerHost) ->
|
||||
emqx_placeholder:bin(inet:ntoa(PeerHost));
|
||||
handle_var(_, Value) ->
|
||||
emqx_placeholder:bin(Value).
|
||||
|
||||
handle_sql_var({var, _Name}, undefined) ->
|
||||
handle_sql_var(_Name, undefined) ->
|
||||
<<>>;
|
||||
handle_sql_var({var, <<"peerhost">>}, PeerHost) ->
|
||||
handle_sql_var([<<"peerhost">>], PeerHost) ->
|
||||
emqx_placeholder:bin(inet:ntoa(PeerHost));
|
||||
handle_sql_var(_, Value) ->
|
||||
emqx_placeholder:sql_data(Value).
|
||||
|
|
|
@ -188,21 +188,19 @@ convert_client_var({dn, DN}) -> {cert_subject, DN};
|
|||
convert_client_var({protocol, Proto}) -> {proto_name, Proto};
|
||||
convert_client_var(Other) -> Other.
|
||||
|
||||
urlencode_var({var, _} = Var, Value) ->
|
||||
emqx_http_lib:uri_encode(handle_var(Var, Value));
|
||||
urlencode_var(Var, Value) ->
|
||||
handle_var(Var, Value).
|
||||
emqx_http_lib:uri_encode(handle_var(Var, Value)).
|
||||
|
||||
handle_var({var, _Name}, undefined) ->
|
||||
handle_var(_Name, undefined) ->
|
||||
<<>>;
|
||||
handle_var({var, <<"peerhost">>}, IpAddr) ->
|
||||
handle_var([<<"peerhost">>], IpAddr) ->
|
||||
inet_parse:ntoa(IpAddr);
|
||||
handle_var(_Name, Value) ->
|
||||
emqx_placeholder:bin(Value).
|
||||
|
||||
handle_sql_var({var, _Name}, undefined) ->
|
||||
handle_sql_var(_Name, undefined) ->
|
||||
<<>>;
|
||||
handle_sql_var({var, <<"peerhost">>}, IpAddr) ->
|
||||
handle_sql_var([<<"peerhost">>], IpAddr) ->
|
||||
inet_parse:ntoa(IpAddr);
|
||||
handle_sql_var(_Name, Value) ->
|
||||
emqx_placeholder:sql_data(Value).
|
||||
|
|
|
@ -81,7 +81,7 @@ t_compile(_) ->
|
|||
{{127, 0, 0, 1}, {127, 0, 0, 1}, 32},
|
||||
{{192, 168, 1, 0}, {192, 168, 1, 255}, 24}
|
||||
]},
|
||||
subscribe, [{pattern, [{var, {var, <<"clientid">>}}]}]},
|
||||
subscribe, [{pattern, [{var, [<<"clientid">>]}]}]},
|
||||
emqx_authz_rule:compile(?SOURCE3)
|
||||
),
|
||||
|
||||
|
@ -99,14 +99,14 @@ t_compile(_) ->
|
|||
{clientid, {re_pattern, _, _, _, _}}
|
||||
]},
|
||||
publish, [
|
||||
{pattern, [{var, {var, <<"username">>}}]}, {pattern, [{var, {var, <<"clientid">>}}]}
|
||||
{pattern, [{var, [<<"username">>]}]}, {pattern, [{var, [<<"clientid">>]}]}
|
||||
]},
|
||||
emqx_authz_rule:compile(?SOURCE5)
|
||||
),
|
||||
|
||||
?assertEqual(
|
||||
{allow, {username, {eq, <<"test">>}}, publish, [
|
||||
{pattern, [{str, <<"t/foo">>}, {var, {var, <<"username">>}}, {str, <<"boo">>}]}
|
||||
{pattern, [{str, <<"t/foo">>}, {var, [<<"username">>]}, {str, <<"boo">>}]}
|
||||
]},
|
||||
emqx_authz_rule:compile(?SOURCE6)
|
||||
),
|
||||
|
|
|
@ -273,10 +273,10 @@ proc_cql_params(
|
|||
%% assert
|
||||
_PreparedKey = maps:get(PreparedKey0, Prepares),
|
||||
Tokens = maps:get(PreparedKey0, ParamsTokens),
|
||||
{PreparedKey0, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))};
|
||||
{PreparedKey0, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))};
|
||||
proc_cql_params(query, SQL, Params, _State) ->
|
||||
{SQL1, Tokens} = emqx_plugin_libs_rule:preproc_sql(SQL, '?'),
|
||||
{SQL1, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}.
|
||||
{SQL1, Tokens} = emqx_placeholder:preproc_sql(SQL, '?'),
|
||||
{SQL1, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}.
|
||||
|
||||
exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when
|
||||
Type == query; Type == prepared_query
|
||||
|
@ -403,7 +403,7 @@ parse_prepare_cql(_) ->
|
|||
#{prepare_cql => #{}, params_tokens => #{}}.
|
||||
|
||||
parse_prepare_cql([{Key, H} | T], Prepares, Tokens) ->
|
||||
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '?'),
|
||||
{PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '?'),
|
||||
parse_prepare_cql(
|
||||
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
|
||||
);
|
||||
|
|
|
@ -193,7 +193,7 @@ prepare_sql_templates(#{
|
|||
batch_value_separator := Separator
|
||||
}) ->
|
||||
InsertTemplate =
|
||||
emqx_plugin_libs_rule:preproc_tmpl(Template),
|
||||
emqx_placeholder:preproc_tmpl(Template),
|
||||
BulkExtendInsertTemplate =
|
||||
prepare_sql_bulk_extend_template(Template, Separator),
|
||||
#{
|
||||
|
@ -210,9 +210,9 @@ prepare_sql_bulk_extend_template(Template, Separator) ->
|
|||
%% Add separator before ValuesTemplate so that one can append it
|
||||
%% to an insert template
|
||||
ExtendParamTemplate = iolist_to_binary([Separator, ValuesTemplate]),
|
||||
emqx_plugin_libs_rule:preproc_tmpl(ExtendParamTemplate).
|
||||
emqx_placeholder:preproc_tmpl(ExtendParamTemplate).
|
||||
|
||||
%% This function is similar to emqx_plugin_libs_rule:split_insert_sql/1 but can
|
||||
%% This function is similar to emqx_utils_sql:parse_insert/1 but can
|
||||
%% also handle Clickhouse's SQL extension for INSERT statments that allows the
|
||||
%% user to specify different formats:
|
||||
%%
|
||||
|
@ -363,7 +363,7 @@ on_query(
|
|||
transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL).
|
||||
|
||||
get_sql(send_message, #{send_message_template := PreparedSQL}, Data) ->
|
||||
emqx_plugin_libs_rule:proc_tmpl(PreparedSQL, Data);
|
||||
emqx_placeholder:proc_tmpl(PreparedSQL, Data);
|
||||
get_sql(_, _, SQL) ->
|
||||
SQL.
|
||||
|
||||
|
@ -421,10 +421,10 @@ objects_to_sql(
|
|||
}
|
||||
) ->
|
||||
%% Prepare INSERT-statement and the first row after VALUES
|
||||
InsertStatementHead = emqx_plugin_libs_rule:proc_tmpl(InsertTemplate, FirstObject),
|
||||
InsertStatementHead = emqx_placeholder:proc_tmpl(InsertTemplate, FirstObject),
|
||||
FormatObjectDataFunction =
|
||||
fun(Object) ->
|
||||
emqx_plugin_libs_rule:proc_tmpl(BulkExtendInsertTemplate, Object)
|
||||
emqx_placeholder:proc_tmpl(BulkExtendInsertTemplate, Object)
|
||||
end,
|
||||
InsertStatementTail = lists:map(FormatObjectDataFunction, RemainingObjects),
|
||||
CompleteStatement = erlang:iolist_to_binary([InsertStatementHead, InsertStatementTail]),
|
||||
|
|
|
@ -197,7 +197,7 @@ parse_template(Config) ->
|
|||
parse_template(maps:to_list(Templates), #{}).
|
||||
|
||||
parse_template([{Key, H} | T], Templates) ->
|
||||
ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(H),
|
||||
ParamsTks = emqx_placeholder:preproc_tmpl(H),
|
||||
parse_template(
|
||||
T,
|
||||
Templates#{Key => ParamsTks}
|
||||
|
|
|
@ -144,7 +144,7 @@ apply_template({Key, Msg} = Req, Templates) ->
|
|||
undefined ->
|
||||
Req;
|
||||
Template ->
|
||||
{Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)}
|
||||
{Key, emqx_placeholder:proc_tmpl(Template, Msg)}
|
||||
end;
|
||||
%% now there is no batch delete, so
|
||||
%% 1. we can simply replace the `send_message` to `put`
|
||||
|
|
|
@ -39,7 +39,7 @@
|
|||
connect_timeout := timer:time(),
|
||||
jwt_config := emqx_connector_jwt:jwt_config(),
|
||||
max_retries := non_neg_integer(),
|
||||
payload_template := emqx_plugin_libs_rule:tmpl_token(),
|
||||
payload_template := emqx_placeholder:tmpl_token(),
|
||||
pool_name := binary(),
|
||||
project_id := binary(),
|
||||
pubsub_topic := binary(),
|
||||
|
@ -101,7 +101,7 @@ on_start(
|
|||
connect_timeout => ConnectTimeout,
|
||||
jwt_config => JWTConfig,
|
||||
max_retries => MaxRetries,
|
||||
payload_template => emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate),
|
||||
payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
|
||||
pool_name => ResourceId,
|
||||
project_id => ProjectId,
|
||||
pubsub_topic => PubSubTopic,
|
||||
|
@ -291,7 +291,7 @@ encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) ->
|
|||
Interpolated =
|
||||
case PayloadTemplate of
|
||||
[] -> emqx_utils_json:encode(Selected);
|
||||
_ -> emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, Selected)
|
||||
_ -> emqx_placeholder:proc_tmpl(PayloadTemplate, Selected)
|
||||
end,
|
||||
#{data => base64:encode(Interpolated)}.
|
||||
|
||||
|
|
|
@ -436,7 +436,7 @@ to_config([Item0 | Rest], Acc, Precision) ->
|
|||
Ts0 = maps:get(timestamp, Item0, undefined),
|
||||
{Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision),
|
||||
Item = #{
|
||||
measurement => emqx_plugin_libs_rule:preproc_tmpl(maps:get(measurement, Item0)),
|
||||
measurement => emqx_placeholder:preproc_tmpl(maps:get(measurement, Item0)),
|
||||
timestamp => Ts,
|
||||
precision => {FromPrecision, ToPrecision},
|
||||
tags => to_kv_config(maps:get(tags, Item0)),
|
||||
|
@ -458,18 +458,18 @@ preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) ->
|
|||
preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) ->
|
||||
preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision);
|
||||
preproc_tmpl_timestamp(<<?DEFAULT_TIMESTAMP_TMPL>> = Ts, Precision) ->
|
||||
{emqx_plugin_libs_rule:preproc_tmpl(Ts), ms, Precision};
|
||||
{emqx_placeholder:preproc_tmpl(Ts), ms, Precision};
|
||||
preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) ->
|
||||
%% a placehold is in use. e.g. ${payload.my_timestamp}
|
||||
%% we can only hope it the value will be of the same precision in the configs
|
||||
{emqx_plugin_libs_rule:preproc_tmpl(Ts), Precision, Precision}.
|
||||
{emqx_placeholder:preproc_tmpl(Ts), Precision, Precision}.
|
||||
|
||||
to_kv_config(KVfields) ->
|
||||
maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)).
|
||||
|
||||
to_maps_config(K, V, Res) ->
|
||||
NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)),
|
||||
NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)),
|
||||
NK = emqx_placeholder:preproc_tmpl(bin(K)),
|
||||
NV = emqx_placeholder:preproc_tmpl(bin(V)),
|
||||
Res#{NK => NV}.
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
|
@ -505,7 +505,7 @@ parse_batch_data(InstId, BatchData, SyntaxLines) ->
|
|||
fields := [{binary(), binary()}],
|
||||
measurement := binary(),
|
||||
tags := [{binary(), binary()}],
|
||||
timestamp := emqx_plugin_libs_rule:tmpl_token() | integer(),
|
||||
timestamp := emqx_placeholder:tmpl_token() | integer(),
|
||||
precision := {From :: ts_precision(), To :: ts_precision()}
|
||||
}
|
||||
]) -> {ok, [map()]} | {error, term()}.
|
||||
|
@ -526,7 +526,7 @@ lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, Error
|
|||
is_list(Ts)
|
||||
->
|
||||
TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
|
||||
case parse_timestamp(emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions)) of
|
||||
case parse_timestamp(emqx_placeholder:proc_tmpl(Ts, Data, TransOptions)) of
|
||||
{ok, TsInt} ->
|
||||
Item1 = Item#{timestamp => TsInt},
|
||||
continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc);
|
||||
|
@ -573,7 +573,7 @@ line_to_point(
|
|||
{_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags),
|
||||
{_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields),
|
||||
maps:without([precision], Item#{
|
||||
measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data),
|
||||
measurement => emqx_placeholder:proc_tmpl(Measurement, Data),
|
||||
tags => EncodedTags,
|
||||
fields => EncodedFields,
|
||||
timestamp => maybe_convert_time_unit(Ts, Precision)
|
||||
|
@ -590,8 +590,8 @@ time_unit(ns) -> nanosecond.
|
|||
maps_config_to_data(K, V, {Data, Res}) ->
|
||||
KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},
|
||||
VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
|
||||
NK0 = emqx_plugin_libs_rule:proc_tmpl(K, Data, KTransOptions),
|
||||
NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, VTransOptions),
|
||||
NK0 = emqx_placeholder:proc_tmpl(K, Data, KTransOptions),
|
||||
NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions),
|
||||
case {NK0, NV} of
|
||||
{[undefined], _} ->
|
||||
{Data, Res};
|
||||
|
@ -637,7 +637,7 @@ value_type(Val) ->
|
|||
Val.
|
||||
|
||||
key_filter(undefined) -> undefined;
|
||||
key_filter(Value) -> emqx_plugin_libs_rule:bin(Value).
|
||||
key_filter(Value) -> emqx_utils_conv:bin(Value).
|
||||
|
||||
data_filter(undefined) -> undefined;
|
||||
data_filter(Int) when is_integer(Int) -> Int;
|
||||
|
@ -645,7 +645,7 @@ data_filter(Number) when is_number(Number) -> Number;
|
|||
data_filter(Bool) when is_boolean(Bool) -> Bool;
|
||||
data_filter(Data) -> bin(Data).
|
||||
|
||||
bin(Data) -> emqx_plugin_libs_rule:bin(Data).
|
||||
bin(Data) -> emqx_utils_conv:bin(Data).
|
||||
|
||||
%% helper funcs
|
||||
log_error_points(InstId, Errs) ->
|
||||
|
|
|
@ -185,7 +185,7 @@ preproc_data(
|
|||
timestamp => maybe_preproc_tmpl(
|
||||
maps:get(<<"timestamp">>, Data, <<"now">>)
|
||||
),
|
||||
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
|
||||
measurement => emqx_placeholder:preproc_tmpl(Measurement),
|
||||
data_type => DataType,
|
||||
value => maybe_preproc_tmpl(Value)
|
||||
}
|
||||
|
@ -203,7 +203,7 @@ preproc_data(_NoMatch, Acc) ->
|
|||
Acc.
|
||||
|
||||
maybe_preproc_tmpl(Value) when is_binary(Value) ->
|
||||
emqx_plugin_libs_rule:preproc_tmpl(Value);
|
||||
emqx_placeholder:preproc_tmpl(Value);
|
||||
maybe_preproc_tmpl(Value) ->
|
||||
Value.
|
||||
|
||||
|
@ -225,7 +225,7 @@ proc_data(PreProcessedData, Msg) ->
|
|||
) ->
|
||||
#{
|
||||
timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
|
||||
measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Msg),
|
||||
measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
|
||||
data_type => DataType,
|
||||
value => proc_value(DataType, ValueTkn, Msg)
|
||||
}
|
||||
|
@ -236,7 +236,7 @@ proc_data(PreProcessedData, Msg) ->
|
|||
iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
|
||||
Timestamp;
|
||||
iot_timestamp(TimestampTkn, Msg, Nows) ->
|
||||
iot_timestamp(emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows).
|
||||
iot_timestamp(emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
|
||||
|
||||
iot_timestamp(Timestamp, #{now_ms := NowMs}) when
|
||||
Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
|
||||
|
@ -250,7 +250,7 @@ iot_timestamp(Timestamp, _) when is_binary(Timestamp) ->
|
|||
binary_to_integer(Timestamp).
|
||||
|
||||
proc_value(<<"TEXT">>, ValueTkn, Msg) ->
|
||||
case emqx_plugin_libs_rule:proc_tmpl(ValueTkn, Msg) of
|
||||
case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of
|
||||
<<"undefined">> -> null;
|
||||
Val -> Val
|
||||
end;
|
||||
|
@ -262,7 +262,7 @@ proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> ->
|
|||
convert_float(replace_var(ValueTkn, Msg)).
|
||||
|
||||
replace_var(Tokens, Data) when is_list(Tokens) ->
|
||||
[Val] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
|
||||
[Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
|
||||
Val;
|
||||
replace_var(Val, _Data) ->
|
||||
Val.
|
||||
|
@ -410,8 +410,8 @@ device_id(Message, Payloads, State) ->
|
|||
%% [FIXME] there could be conflicting device-ids in the Payloads
|
||||
maps:get(<<"device_id">>, hd(Payloads), undefined);
|
||||
DeviceId ->
|
||||
DeviceIdTkn = emqx_plugin_libs_rule:preproc_tmpl(DeviceId),
|
||||
emqx_plugin_libs_rule:proc_tmpl(DeviceIdTkn, Message)
|
||||
DeviceIdTkn = emqx_placeholder:preproc_tmpl(DeviceId),
|
||||
emqx_placeholder:proc_tmpl(DeviceIdTkn, Message)
|
||||
end.
|
||||
|
||||
handle_response({ok, 200, _Headers, Body} = Resp) ->
|
||||
|
|
|
@ -68,7 +68,7 @@
|
|||
resource_id := resource_id(),
|
||||
topic_mapping := #{
|
||||
kafka_topic() := #{
|
||||
payload_template := emqx_plugin_libs_rule:tmpl_token(),
|
||||
payload_template := emqx_placeholder:tmpl_token(),
|
||||
mqtt_topic => emqx_types:topic(),
|
||||
qos => emqx_types:qos()
|
||||
}
|
||||
|
@ -82,7 +82,7 @@
|
|||
resource_id := resource_id(),
|
||||
topic_mapping := #{
|
||||
kafka_topic() := #{
|
||||
payload_template := emqx_plugin_libs_rule:tmpl_token(),
|
||||
payload_template := emqx_placeholder:tmpl_token(),
|
||||
mqtt_topic => emqx_types:topic(),
|
||||
qos => emqx_types:qos()
|
||||
}
|
||||
|
@ -534,7 +534,7 @@ convert_topic_mapping(TopicMappingList) ->
|
|||
qos := QoS,
|
||||
payload_template := PayloadTemplate0
|
||||
} = Fields,
|
||||
PayloadTemplate = emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate0),
|
||||
PayloadTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate0),
|
||||
Acc#{
|
||||
KafkaTopic => #{
|
||||
payload_template => PayloadTemplate,
|
||||
|
@ -554,10 +554,10 @@ render(FullMessage, PayloadTemplate) ->
|
|||
(undefined) ->
|
||||
<<>>;
|
||||
(X) ->
|
||||
emqx_plugin_libs_rule:bin(X)
|
||||
emqx_utils_conv:bin(X)
|
||||
end
|
||||
},
|
||||
emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, FullMessage, Opts).
|
||||
emqx_placeholder:proc_tmpl(PayloadTemplate, FullMessage, Opts).
|
||||
|
||||
encode(Value, none) ->
|
||||
Value;
|
||||
|
|
|
@ -252,7 +252,7 @@ compile_message_template(T) ->
|
|||
}.
|
||||
|
||||
preproc_tmpl(Tmpl) ->
|
||||
emqx_plugin_libs_rule:preproc_tmpl(Tmpl).
|
||||
emqx_placeholder:preproc_tmpl(Tmpl).
|
||||
|
||||
render_message(
|
||||
#{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message
|
||||
|
@ -267,11 +267,11 @@ render(Template, Message) ->
|
|||
Opts = #{
|
||||
var_trans => fun
|
||||
(undefined) -> <<"">>;
|
||||
(X) -> emqx_plugin_libs_rule:bin(X)
|
||||
(X) -> emqx_utils_conv:bin(X)
|
||||
end,
|
||||
return => full_binary
|
||||
},
|
||||
emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts).
|
||||
emqx_placeholder:proc_tmpl(Template, Message, Opts).
|
||||
|
||||
render_timestamp(Template, Message) ->
|
||||
try
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-export_type([msgvars/0]).
|
||||
|
||||
-type template() :: emqx_plugin_libs_rule:tmpl_token().
|
||||
-type template() :: emqx_placeholder:tmpl_token().
|
||||
|
||||
-type msgvars() :: #{
|
||||
topic => template(),
|
||||
|
@ -48,7 +48,7 @@ parse(Conf) ->
|
|||
parse_field(Key, Conf, Acc) ->
|
||||
case Conf of
|
||||
#{Key := Val} when is_binary(Val) ->
|
||||
Acc#{Key => emqx_plugin_libs_rule:preproc_tmpl(Val)};
|
||||
Acc#{Key => emqx_placeholder:preproc_tmpl(Val)};
|
||||
#{Key := Val} ->
|
||||
Acc#{Key => Val};
|
||||
#{} ->
|
||||
|
|
|
@ -46,6 +46,8 @@ fields(config) ->
|
|||
%% `emqx_resource' API
|
||||
%%========================================================================================
|
||||
|
||||
-define(HTTP_CONNECT_TIMEOUT, 1000).
|
||||
|
||||
callback_mode() -> always_sync.
|
||||
|
||||
on_start(
|
||||
|
@ -168,7 +170,7 @@ opentsdb_connectivity(Server) ->
|
|||
<<"https://", _/binary>> -> Server;
|
||||
_ -> "http://" ++ Server
|
||||
end,
|
||||
emqx_plugin_libs_rule:http_connectivity(SvrUrl).
|
||||
emqx_connector_lib:http_connectivity(SvrUrl, ?HTTP_CONNECT_TIMEOUT).
|
||||
|
||||
format_opentsdb_msg(Msg) ->
|
||||
maps:with(
|
||||
|
|
|
@ -34,8 +34,8 @@
|
|||
value := binary()
|
||||
}.
|
||||
-type message_template() :: #{
|
||||
key := emqx_plugin_libs_rule:tmpl_token(),
|
||||
value := emqx_plugin_libs_rule:tmpl_token()
|
||||
key := emqx_placeholder:tmpl_token(),
|
||||
value := emqx_placeholder:tmpl_token()
|
||||
}.
|
||||
-type config() :: #{
|
||||
authentication := _,
|
||||
|
@ -419,7 +419,7 @@ compile_message_template(TemplateOpts) ->
|
|||
}.
|
||||
|
||||
preproc_tmpl(Template) ->
|
||||
emqx_plugin_libs_rule:preproc_tmpl(Template).
|
||||
emqx_placeholder:preproc_tmpl(Template).
|
||||
|
||||
render_message(
|
||||
Message, #{key := KeyTemplate, value := ValueTemplate}
|
||||
|
@ -433,11 +433,11 @@ render(Message, Template) ->
|
|||
Opts = #{
|
||||
var_trans => fun
|
||||
(undefined) -> <<"">>;
|
||||
(X) -> emqx_plugin_libs_rule:bin(X)
|
||||
(X) -> emqx_utils_conv:bin(X)
|
||||
end,
|
||||
return => full_binary
|
||||
},
|
||||
emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts).
|
||||
emqx_placeholder:proc_tmpl(Template, Message, Opts).
|
||||
|
||||
get_producer_status(Producers) ->
|
||||
case pulsar_producers:all_connected(Producers) of
|
||||
|
|
|
@ -219,7 +219,7 @@ on_start(
|
|||
{pool_size, PoolSize},
|
||||
{pool, InstanceID}
|
||||
],
|
||||
ProcessedTemplate = emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate),
|
||||
ProcessedTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate),
|
||||
State = #{
|
||||
poolname => InstanceID,
|
||||
processed_payload_template => ProcessedTemplate,
|
||||
|
@ -541,7 +541,7 @@ is_send_message_atom(_) ->
|
|||
format_data([], Msg) ->
|
||||
emqx_utils_json:encode(Msg);
|
||||
format_data(Tokens, Msg) ->
|
||||
emqx_plugin_libs_rule:proc_tmpl(Tokens, Msg).
|
||||
emqx_placeholder:proc_tmpl(Tokens, Msg).
|
||||
|
||||
handle_result({error, ecpool_empty}) ->
|
||||
{error, {recoverable_error, ecpool_empty}};
|
||||
|
|
|
@ -99,7 +99,7 @@ on_start(
|
|||
emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS)
|
||||
),
|
||||
ClientId = client_id(InstanceId),
|
||||
TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic),
|
||||
TopicTks = emqx_placeholder:preproc_tmpl(Topic),
|
||||
#{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config),
|
||||
ClientCfg = #{acl_info => AclInfo},
|
||||
Templates = parse_template(Config),
|
||||
|
@ -237,7 +237,7 @@ parse_template(Config) ->
|
|||
parse_template(maps:to_list(Templates), #{}).
|
||||
|
||||
parse_template([{Key, H} | T], Templates) ->
|
||||
ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(H),
|
||||
ParamsTks = emqx_placeholder:preproc_tmpl(H),
|
||||
parse_template(
|
||||
T,
|
||||
Templates#{Key => ParamsTks}
|
||||
|
@ -246,7 +246,7 @@ parse_template([], Templates) ->
|
|||
Templates.
|
||||
|
||||
get_topic_key({_, Msg}, TopicTks) ->
|
||||
emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg);
|
||||
emqx_placeholder:proc_tmpl(TopicTks, Msg);
|
||||
get_topic_key([Query | _], TopicTks) ->
|
||||
get_topic_key(Query, TopicTks).
|
||||
|
||||
|
@ -255,14 +255,14 @@ apply_template({Key, Msg} = _Req, Templates) ->
|
|||
undefined ->
|
||||
emqx_utils_json:encode(Msg);
|
||||
Template ->
|
||||
emqx_plugin_libs_rule:proc_tmpl(Template, Msg)
|
||||
emqx_placeholder:proc_tmpl(Template, Msg)
|
||||
end;
|
||||
apply_template([{Key, _} | _] = Reqs, Templates) ->
|
||||
case maps:get(Key, Templates, undefined) of
|
||||
undefined ->
|
||||
[emqx_utils_json:encode(Msg) || {_, Msg} <- Reqs];
|
||||
Template ->
|
||||
[emqx_plugin_libs_rule:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs]
|
||||
[emqx_placeholder:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs]
|
||||
end.
|
||||
|
||||
client_id(ResourceId) ->
|
||||
|
|
|
@ -43,7 +43,7 @@
|
|||
%% Internal exports used to execute code with ecpool worker
|
||||
-export([do_get_status/1, worker_do_insert/3]).
|
||||
|
||||
-import(emqx_plugin_libs_rule, [str/1]).
|
||||
-import(emqx_utils_conv, [str/1]).
|
||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||
|
||||
-define(ACTION_SEND_MESSAGE, send_message).
|
||||
|
@ -440,11 +440,11 @@ parse_sql_template(Config) ->
|
|||
parse_sql_template(maps:to_list(RawSQLTemplates), BatchInsertTks).
|
||||
|
||||
parse_sql_template([{Key, H} | T], BatchInsertTks) ->
|
||||
case emqx_plugin_libs_rule:detect_sql_type(H) of
|
||||
{ok, select} ->
|
||||
case emqx_utils_sql:get_statement_type(H) of
|
||||
select ->
|
||||
parse_sql_template(T, BatchInsertTks);
|
||||
{ok, insert} ->
|
||||
case emqx_plugin_libs_rule:split_insert_sql(H) of
|
||||
insert ->
|
||||
case emqx_utils_sql:parse_insert(H) of
|
||||
{ok, {InsertSQL, Params}} ->
|
||||
parse_sql_template(
|
||||
T,
|
||||
|
@ -452,9 +452,7 @@ parse_sql_template([{Key, H} | T], BatchInsertTks) ->
|
|||
Key =>
|
||||
#{
|
||||
?BATCH_INSERT_PART => InsertSQL,
|
||||
?BATCH_PARAMS_TOKENS => emqx_plugin_libs_rule:preproc_tmpl(
|
||||
Params
|
||||
)
|
||||
?BATCH_PARAMS_TOKENS => emqx_placeholder:preproc_tmpl(Params)
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@ -462,6 +460,9 @@ parse_sql_template([{Key, H} | T], BatchInsertTks) ->
|
|||
?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}),
|
||||
parse_sql_template(T, BatchInsertTks)
|
||||
end;
|
||||
Type when is_atom(Type) ->
|
||||
?SLOG(error, #{msg => "detect sql type unsupported", sql => H, type => Type}),
|
||||
parse_sql_template(T, BatchInsertTks);
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}),
|
||||
parse_sql_template(T, BatchInsertTks)
|
||||
|
@ -475,7 +476,7 @@ parse_sql_template([], BatchInsertTks) ->
|
|||
apply_template(
|
||||
{?ACTION_SEND_MESSAGE = _Key, _Msg} = Query, Templates
|
||||
) ->
|
||||
%% TODO: fix emqx_plugin_libs_rule:proc_tmpl/2
|
||||
%% TODO: fix emqx_placeholder:proc_tmpl/2
|
||||
%% it won't add single quotes for string
|
||||
apply_template([Query], Templates);
|
||||
%% batch inserts
|
||||
|
@ -487,10 +488,19 @@ apply_template(
|
|||
undefined ->
|
||||
BatchReqs;
|
||||
#{?BATCH_INSERT_PART := BatchInserts, ?BATCH_PARAMS_TOKENS := BatchParamsTks} ->
|
||||
SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks),
|
||||
SQL = proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks),
|
||||
{Key, SQL}
|
||||
end;
|
||||
apply_template(Query, Templates) ->
|
||||
%% TODO: more detail infomatoin
|
||||
?SLOG(error, #{msg => "apply sql template failed", query => Query, templates => Templates}),
|
||||
{error, failed_to_apply_sql_template}.
|
||||
|
||||
proc_batch_sql(BatchReqs, BatchInserts, Tokens) ->
|
||||
Values = erlang:iolist_to_binary(
|
||||
lists:join($,, [
|
||||
emqx_placeholder:proc_sql_param_str(Tokens, Msg)
|
||||
|| {_, Msg} <- BatchReqs
|
||||
])
|
||||
),
|
||||
<<BatchInserts/binary, " values ", Values/binary>>.
|
||||
|
|
|
@ -123,8 +123,8 @@ on_query(InstanceId, {query, SQL}, State) ->
|
|||
do_query(InstanceId, SQL, State);
|
||||
on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) ->
|
||||
case maps:find(Key, InsertTksMap) of
|
||||
{ok, Tokens} ->
|
||||
SQL = emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Data),
|
||||
{ok, Tokens} when is_map(Data) ->
|
||||
SQL = emqx_placeholder:proc_sql_param_str(Tokens, Data),
|
||||
do_query(InstanceId, SQL, State);
|
||||
_ ->
|
||||
{error, {unrecoverable_error, invalid_request}}
|
||||
|
@ -133,7 +133,7 @@ on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) ->
|
|||
%% aggregate the batch queries to one SQL is a heavy job, we should put it in the worker process
|
||||
on_batch_query(
|
||||
InstanceId,
|
||||
[{Key, _} | _] = BatchReq,
|
||||
[{Key, _Data = #{}} | _] = BatchReq,
|
||||
#{batch_tokens := BatchTksMap, query_opts := Opts} = State
|
||||
) ->
|
||||
case maps:find(Key, BatchTksMap) of
|
||||
|
@ -228,8 +228,8 @@ do_batch_insert(Conn, Tokens, BatchReqs, Opts) ->
|
|||
aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) ->
|
||||
lists:foldl(
|
||||
fun({_, Data}, Acc) ->
|
||||
InsertPart = emqx_plugin_libs_rule:proc_sql_param_str(InsertPartTks, Data),
|
||||
ParamsPart = emqx_plugin_libs_rule:proc_sql_param_str(ParamsPartTks, Data),
|
||||
InsertPart = emqx_placeholder:proc_sql_param_str(InsertPartTks, Data),
|
||||
ParamsPart = emqx_placeholder:proc_sql_param_str(ParamsPartTks, Data),
|
||||
Values = maps:get(InsertPart, Acc, []),
|
||||
maps:put(InsertPart, [ParamsPart | Values], Acc)
|
||||
end,
|
||||
|
@ -253,16 +253,16 @@ parse_prepare_sql(Config) ->
|
|||
parse_batch_prepare_sql(maps:to_list(SQL), #{}, #{}).
|
||||
|
||||
parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) ->
|
||||
case emqx_plugin_libs_rule:detect_sql_type(H) of
|
||||
{ok, select} ->
|
||||
case emqx_utils_sql:get_statement_type(H) of
|
||||
select ->
|
||||
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap);
|
||||
{ok, insert} ->
|
||||
InsertTks = emqx_plugin_libs_rule:preproc_tmpl(H),
|
||||
insert ->
|
||||
InsertTks = emqx_placeholder:preproc_tmpl(H),
|
||||
H1 = string:trim(H, trailing, ";"),
|
||||
case split_insert_sql(H1) of
|
||||
[_InsertStr, InsertPart, _ValuesStr, ParamsPart] ->
|
||||
InsertPartTks = emqx_plugin_libs_rule:preproc_tmpl(InsertPart),
|
||||
ParamsPartTks = emqx_plugin_libs_rule:preproc_tmpl(ParamsPart),
|
||||
InsertPartTks = emqx_placeholder:preproc_tmpl(InsertPart),
|
||||
ParamsPartTks = emqx_placeholder:preproc_tmpl(ParamsPart),
|
||||
parse_batch_prepare_sql(
|
||||
T,
|
||||
InsertTksMap#{Key => InsertTks},
|
||||
|
@ -272,6 +272,9 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) ->
|
|||
?SLOG(error, #{msg => "split sql failed", sql => H, result => Result}),
|
||||
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap)
|
||||
end;
|
||||
Type when is_atom(Type) ->
|
||||
?SLOG(error, #{msg => "detect sql type unsupported", sql => H, type => Type}),
|
||||
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap);
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}),
|
||||
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap)
|
||||
|
@ -286,7 +289,7 @@ to_bin(List) when is_list(List) ->
|
|||
unicode:characters_to_binary(List, utf8).
|
||||
|
||||
split_insert_sql(SQL0) ->
|
||||
SQL = emqx_plugin_libs_rule:formalize_sql(SQL0),
|
||||
SQL = formalize_sql(SQL0),
|
||||
lists:filtermap(
|
||||
fun(E) ->
|
||||
case string:trim(E) of
|
||||
|
@ -298,3 +301,9 @@ split_insert_sql(SQL0) ->
|
|||
end,
|
||||
re:split(SQL, "(?i)(insert into)|(?i)(values)")
|
||||
).
|
||||
|
||||
formalize_sql(Input) ->
|
||||
%% 1. replace all whitespaces like '\r' '\n' or spaces to a single space char.
|
||||
SQL = re:replace(Input, "\\s+", " ", [global, {return, binary}]),
|
||||
%% 2. trims the result
|
||||
string:trim(SQL).
|
||||
|
|
|
@ -496,7 +496,7 @@ t_simple_sql_query(Config) ->
|
|||
),
|
||||
case EnableBatch of
|
||||
true ->
|
||||
?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
|
||||
?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
|
||||
false ->
|
||||
?assertMatch({ok, #{<<"code">> := 0, <<"data">> := [[1]]}}, Result)
|
||||
end,
|
||||
|
@ -535,7 +535,7 @@ t_bad_sql_parameter(Config) ->
|
|||
2_000
|
||||
),
|
||||
|
||||
?assertMatch({error, #{<<"code">> := _}}, Result),
|
||||
?assertMatch({error, {unrecoverable_error, invalid_request}}, Result),
|
||||
ok.
|
||||
|
||||
t_nasty_sql_string(Config) ->
|
||||
|
|
|
@ -464,8 +464,8 @@ preprocess_request(
|
|||
} = Req
|
||||
) ->
|
||||
#{
|
||||
method => emqx_plugin_libs_rule:preproc_tmpl(to_bin(Method)),
|
||||
path => emqx_plugin_libs_rule:preproc_tmpl(Path),
|
||||
method => emqx_placeholder:preproc_tmpl(to_bin(Method)),
|
||||
path => emqx_placeholder:preproc_tmpl(Path),
|
||||
body => maybe_preproc_tmpl(body, Req),
|
||||
headers => wrap_auth_header(preproc_headers(Headers)),
|
||||
request_timeout => maps:get(request_timeout, Req, ?DEFAULT_REQUEST_TIMEOUT_MS),
|
||||
|
@ -477,8 +477,8 @@ preproc_headers(Headers) when is_map(Headers) ->
|
|||
fun(K, V, Acc) ->
|
||||
[
|
||||
{
|
||||
emqx_plugin_libs_rule:preproc_tmpl(to_bin(K)),
|
||||
emqx_plugin_libs_rule:preproc_tmpl(to_bin(V))
|
||||
emqx_placeholder:preproc_tmpl(to_bin(K)),
|
||||
emqx_placeholder:preproc_tmpl(to_bin(V))
|
||||
}
|
||||
| Acc
|
||||
]
|
||||
|
@ -490,8 +490,8 @@ preproc_headers(Headers) when is_list(Headers) ->
|
|||
lists:map(
|
||||
fun({K, V}) ->
|
||||
{
|
||||
emqx_plugin_libs_rule:preproc_tmpl(to_bin(K)),
|
||||
emqx_plugin_libs_rule:preproc_tmpl(to_bin(V))
|
||||
emqx_placeholder:preproc_tmpl(to_bin(K)),
|
||||
emqx_placeholder:preproc_tmpl(to_bin(V))
|
||||
}
|
||||
end,
|
||||
Headers
|
||||
|
@ -530,7 +530,7 @@ try_bin_to_lower(Bin) ->
|
|||
maybe_preproc_tmpl(Key, Conf) ->
|
||||
case maps:get(Key, Conf, undefined) of
|
||||
undefined -> undefined;
|
||||
Val -> emqx_plugin_libs_rule:preproc_tmpl(Val)
|
||||
Val -> emqx_placeholder:preproc_tmpl(Val)
|
||||
end.
|
||||
|
||||
process_request(
|
||||
|
@ -544,8 +544,8 @@ process_request(
|
|||
Msg
|
||||
) ->
|
||||
Conf#{
|
||||
method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg)),
|
||||
path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg),
|
||||
method => make_method(emqx_placeholder:proc_tmpl(MethodTks, Msg)),
|
||||
path => emqx_placeholder:proc_tmpl(PathTks, Msg),
|
||||
body => process_request_body(BodyTks, Msg),
|
||||
headers => proc_headers(HeadersTks, Msg),
|
||||
request_timeout => ReqTimeout
|
||||
|
@ -554,14 +554,14 @@ process_request(
|
|||
process_request_body(undefined, Msg) ->
|
||||
emqx_utils_json:encode(Msg);
|
||||
process_request_body(BodyTks, Msg) ->
|
||||
emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg).
|
||||
emqx_placeholder:proc_tmpl(BodyTks, Msg).
|
||||
|
||||
proc_headers(HeaderTks, Msg) ->
|
||||
lists:map(
|
||||
fun({K, V}) ->
|
||||
{
|
||||
emqx_plugin_libs_rule:proc_tmpl(K, Msg),
|
||||
emqx_plugin_libs_rule:proc_tmpl(emqx_secret:unwrap(V), Msg)
|
||||
emqx_placeholder:proc_tmpl(K, Msg),
|
||||
emqx_placeholder:proc_tmpl(emqx_secret:unwrap(V), Msg)
|
||||
}
|
||||
end,
|
||||
HeaderTks
|
||||
|
|
|
@ -15,8 +15,39 @@
|
|||
%%--------------------------------------------------------------------
|
||||
-module(emqx_connector_lib).
|
||||
|
||||
%% connectivity check
|
||||
-export([
|
||||
http_connectivity/2,
|
||||
tcp_connectivity/3
|
||||
]).
|
||||
|
||||
-export([resolve_dns/2]).
|
||||
|
||||
-spec http_connectivity(uri_string:uri_string(), timeout()) ->
|
||||
ok | {error, Reason :: term()}.
|
||||
http_connectivity(Url, Timeout) ->
|
||||
case emqx_http_lib:uri_parse(Url) of
|
||||
{ok, #{host := Host, port := Port}} ->
|
||||
tcp_connectivity(Host, Port, Timeout);
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
-spec tcp_connectivity(
|
||||
Host :: inet:socket_address() | inet:hostname(),
|
||||
Port :: inet:port_number(),
|
||||
timeout()
|
||||
) ->
|
||||
ok | {error, Reason :: term()}.
|
||||
tcp_connectivity(Host, Port, Timeout) ->
|
||||
case gen_tcp:connect(Host, Port, emqx_utils:ipv6_probe([]), Timeout) of
|
||||
{ok, Sock} ->
|
||||
gen_tcp:close(Sock),
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%% @doc Mostly for meck.
|
||||
resolve_dns(DNS, Type) ->
|
||||
inet_res:lookup(DNS, in, Type).
|
||||
|
|
|
@ -344,7 +344,7 @@ parse_prepare_sql(Config) ->
|
|||
parse_prepare_sql(maps:to_list(SQL), #{}, #{}, #{}, #{}).
|
||||
|
||||
parse_prepare_sql([{Key, H} | _] = L, Prepares, Tokens, BatchInserts, BatchTks) ->
|
||||
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H),
|
||||
{PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H),
|
||||
parse_batch_prepare_sql(
|
||||
L, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}, BatchInserts, BatchTks
|
||||
);
|
||||
|
@ -357,13 +357,13 @@ parse_prepare_sql([], Prepares, Tokens, BatchInserts, BatchTks) ->
|
|||
}.
|
||||
|
||||
parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks) ->
|
||||
case emqx_plugin_libs_rule:detect_sql_type(H) of
|
||||
{ok, select} ->
|
||||
case emqx_utils_sql:get_statement_type(H) of
|
||||
select ->
|
||||
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks);
|
||||
{ok, insert} ->
|
||||
case emqx_plugin_libs_rule:split_insert_sql(H) of
|
||||
insert ->
|
||||
case emqx_utils_sql:parse_insert(H) of
|
||||
{ok, {InsertSQL, Params}} ->
|
||||
ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(Params),
|
||||
ParamsTks = emqx_placeholder:preproc_tmpl(Params),
|
||||
parse_prepare_sql(
|
||||
T,
|
||||
Prepares,
|
||||
|
@ -375,6 +375,9 @@ parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks
|
|||
?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}),
|
||||
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks)
|
||||
end;
|
||||
Type when is_atom(Type) ->
|
||||
?SLOG(error, #{msg => "detect sql type unsupported", sql => H, type => Type}),
|
||||
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks);
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}),
|
||||
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks)
|
||||
|
@ -389,7 +392,7 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens})
|
|||
undefined ->
|
||||
{SQLOrData, Params};
|
||||
Tokens ->
|
||||
{TypeOrKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
|
||||
{TypeOrKey, emqx_placeholder:proc_sql(Tokens, SQLOrData)}
|
||||
end.
|
||||
|
||||
on_batch_insert(InstId, BatchReqs, InsertPart, Tokens, State) ->
|
||||
|
|
|
@ -188,7 +188,7 @@ on_batch_query(
|
|||
{error, {unrecoverable_error, batch_prepare_not_implemented}};
|
||||
TokenList ->
|
||||
{_, Datas} = lists:unzip(BatchReq),
|
||||
Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
|
||||
Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas],
|
||||
St = maps:get(BinKey, Sts),
|
||||
case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of
|
||||
{error, _Error} = Result ->
|
||||
|
@ -218,7 +218,7 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens})
|
|||
undefined ->
|
||||
{SQLOrData, Params};
|
||||
Tokens ->
|
||||
{Key, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
|
||||
{Key, emqx_placeholder:proc_sql(Tokens, SQLOrData)}
|
||||
end.
|
||||
|
||||
on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
|
||||
|
@ -350,7 +350,7 @@ parse_prepare_sql(Config) ->
|
|||
parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
|
||||
|
||||
parse_prepare_sql([{Key, H} | T], Prepares, Tokens) ->
|
||||
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '$n'),
|
||||
{PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '$n'),
|
||||
parse_prepare_sql(
|
||||
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
|
||||
);
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_connector_lib_tests).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
http_connectivity_ok_test() ->
|
||||
{ok, Socket} = gen_tcp:listen(0, []),
|
||||
{ok, Port} = inet:port(Socket),
|
||||
?assertEqual(
|
||||
ok,
|
||||
emqx_connector_lib:http_connectivity("http://127.0.0.1:" ++ integer_to_list(Port), 1000)
|
||||
),
|
||||
gen_tcp:close(Socket).
|
||||
|
||||
http_connectivity_error_test() ->
|
||||
{ok, Socket} = gen_tcp:listen(0, []),
|
||||
{ok, Port} = inet:port(Socket),
|
||||
ok = gen_tcp:close(Socket),
|
||||
?assertEqual(
|
||||
{error, econnrefused},
|
||||
emqx_connector_lib:http_connectivity("http://127.0.0.1:" ++ integer_to_list(Port), 1000)
|
||||
).
|
||||
|
||||
tcp_connectivity_ok_test() ->
|
||||
{ok, Socket} = gen_tcp:listen(0, []),
|
||||
{ok, Port} = inet:port(Socket),
|
||||
?assertEqual(
|
||||
ok,
|
||||
emqx_connector_lib:tcp_connectivity("127.0.0.1", Port, 1000)
|
||||
),
|
||||
ok = gen_tcp:close(Socket).
|
||||
|
||||
tcp_connectivity_error_test() ->
|
||||
{ok, Socket} = gen_tcp:listen(0, []),
|
||||
{ok, Port} = inet:port(Socket),
|
||||
ok = gen_tcp:close(Socket),
|
||||
?assertEqual(
|
||||
{error, econnrefused},
|
||||
emqx_connector_lib:tcp_connectivity("127.0.0.1", Port, 1000)
|
||||
).
|
|
@ -301,8 +301,8 @@ feedvar(Override, Packet, ConnInfo, ClientInfo) ->
|
|||
},
|
||||
maps:map(
|
||||
fun(_K, V) ->
|
||||
Tokens = emqx_plugin_libs_rule:preproc_tmpl(V),
|
||||
emqx_plugin_libs_rule:proc_tmpl(Tokens, Envs)
|
||||
Tokens = emqx_placeholder:preproc_tmpl(V),
|
||||
emqx_placeholder:proc_tmpl(Tokens, Envs)
|
||||
end,
|
||||
Override
|
||||
).
|
||||
|
|
|
@ -283,8 +283,8 @@ feedvar(Override, Packet, ConnInfo, ClientInfo) ->
|
|||
},
|
||||
maps:map(
|
||||
fun(_K, V) ->
|
||||
Tokens = emqx_plugin_libs_rule:preproc_tmpl(V),
|
||||
emqx_plugin_libs_rule:proc_tmpl(Tokens, Envs)
|
||||
Tokens = emqx_placeholder:preproc_tmpl(V),
|
||||
emqx_placeholder:proc_tmpl(Tokens, Envs)
|
||||
end,
|
||||
Override
|
||||
).
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
{id, "emqx_machine"},
|
||||
{description, "The EMQX Machine"},
|
||||
% strict semver, bump manually!
|
||||
{vsn, "0.2.5"},
|
||||
{vsn, "0.2.6"},
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, emqx_ctl]},
|
||||
|
|
|
@ -137,7 +137,6 @@ basic_reboot_apps() ->
|
|||
emqx_resource,
|
||||
emqx_rule_engine,
|
||||
emqx_bridge,
|
||||
emqx_plugin_libs,
|
||||
emqx_management,
|
||||
emqx_retainer,
|
||||
emqx_exhook,
|
||||
|
|
|
@ -51,7 +51,6 @@ init_per_suite(Config) ->
|
|||
emqx_resource,
|
||||
emqx_rule_engine,
|
||||
emqx_bridge,
|
||||
emqx_plugin_libs,
|
||||
emqx_management,
|
||||
emqx_retainer,
|
||||
emqx_exhook,
|
||||
|
|
|
@ -90,14 +90,14 @@ on_start(
|
|||
ServiceName =
|
||||
case maps:get(service_name, Config, undefined) of
|
||||
undefined -> undefined;
|
||||
ServiceName0 -> emqx_plugin_libs_rule:str(ServiceName0)
|
||||
ServiceName0 -> emqx_utils_conv:str(ServiceName0)
|
||||
end,
|
||||
Options = [
|
||||
{host, Host},
|
||||
{port, Port},
|
||||
{user, emqx_plugin_libs_rule:str(User)},
|
||||
{user, emqx_utils_conv:str(User)},
|
||||
{password, jamdb_secret:wrap(maps:get(password, Config, ""))},
|
||||
{sid, emqx_plugin_libs_rule:str(Sid)},
|
||||
{sid, emqx_utils_conv:str(Sid)},
|
||||
{service_name, ServiceName},
|
||||
{pool_size, maps:get(<<"pool_size">>, Config, ?DEFAULT_POOL_SIZE)},
|
||||
{timeout, ?OPT_TIMEOUT},
|
||||
|
@ -165,7 +165,7 @@ on_batch_query(
|
|||
{error, {unrecoverable_error, batch_prepare_not_implemented}};
|
||||
TokenList ->
|
||||
{_, Datas} = lists:unzip(BatchReq),
|
||||
Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
|
||||
Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas],
|
||||
St = maps:get(BinKey, Sts),
|
||||
case
|
||||
on_sql_query(InstId, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2)
|
||||
|
@ -201,7 +201,7 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{
|
|||
undefined ->
|
||||
{SQLOrData, Params};
|
||||
Sql ->
|
||||
{Sql, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
|
||||
{Sql, emqx_placeholder:proc_sql(Tokens, SQLOrData)}
|
||||
end
|
||||
end.
|
||||
|
||||
|
@ -265,14 +265,14 @@ connect(Opts) ->
|
|||
jamdb_oracle:start_link(Opts).
|
||||
|
||||
sql_query_to_str(SqlQuery) ->
|
||||
emqx_plugin_libs_rule:str(SqlQuery).
|
||||
emqx_utils_conv:str(SqlQuery).
|
||||
|
||||
sql_params_to_str(Params) when is_list(Params) ->
|
||||
lists:map(
|
||||
fun
|
||||
(false) -> "0";
|
||||
(true) -> "1";
|
||||
(Value) -> emqx_plugin_libs_rule:str(Value)
|
||||
(Value) -> emqx_utils_conv:str(Value)
|
||||
end,
|
||||
Params
|
||||
).
|
||||
|
@ -302,7 +302,7 @@ parse_prepare_sql(Config) ->
|
|||
parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
|
||||
|
||||
parse_prepare_sql([{Key, H} | T], Prepares, Tokens) ->
|
||||
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, ':n'),
|
||||
{PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, ':n'),
|
||||
parse_prepare_sql(
|
||||
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
|
||||
);
|
||||
|
|
|
@ -1,8 +0,0 @@
|
|||
%% -*- mode: erlang -*-
|
||||
|
||||
{deps, [
|
||||
{emqx, {path, "../emqx"}},
|
||||
{emqx_utils, {path, "../emqx_utils"}}
|
||||
]}.
|
||||
|
||||
{project_plugins, [erlfmt]}.
|
|
@ -1,8 +0,0 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_plugin_libs, [
|
||||
{description, "EMQX Plugin utility libs"},
|
||||
{vsn, "4.3.12"},
|
||||
{modules, []},
|
||||
{applications, [kernel, stdlib]},
|
||||
{env, []}
|
||||
]}.
|
|
@ -1,13 +0,0 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{VSN,
|
||||
[ {"4.3.0",
|
||||
[ {add_module, emqx_plugin_libs_pool}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
],
|
||||
[ {"4.3.0",
|
||||
[ {delete_module, emqx_plugin_libs_pool}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
]
|
||||
}.
|
|
@ -1,17 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_plugin_libs).
|
|
@ -1,365 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_plugin_libs_rule).
|
||||
-elvis([{elvis_style, god_modules, disable}]).
|
||||
|
||||
%% preprocess and process template string with place holders
|
||||
-export([
|
||||
preproc_tmpl/1,
|
||||
proc_tmpl/2,
|
||||
proc_tmpl/3,
|
||||
preproc_cmd/1,
|
||||
proc_cmd/2,
|
||||
proc_cmd/3,
|
||||
preproc_sql/1,
|
||||
preproc_sql/2,
|
||||
proc_sql/2,
|
||||
proc_sql_param_str/2,
|
||||
proc_cql_param_str/2,
|
||||
split_insert_sql/1,
|
||||
detect_sql_type/1,
|
||||
proc_batch_sql/3,
|
||||
formalize_sql/1
|
||||
]).
|
||||
|
||||
%% type converting
|
||||
-export([
|
||||
str/1,
|
||||
bin/1,
|
||||
bool/1,
|
||||
int/1,
|
||||
float/1,
|
||||
float2str/2,
|
||||
map/1,
|
||||
utf8_bin/1,
|
||||
utf8_str/1,
|
||||
number_to_binary/1,
|
||||
atom_key/1,
|
||||
unsafe_atom_key/1
|
||||
]).
|
||||
|
||||
%% connectivity check
|
||||
-export([
|
||||
http_connectivity/1,
|
||||
http_connectivity/2,
|
||||
tcp_connectivity/2,
|
||||
tcp_connectivity/3
|
||||
]).
|
||||
|
||||
-export([
|
||||
now_ms/0,
|
||||
can_topic_match_oneof/2
|
||||
]).
|
||||
|
||||
-export_type([tmpl_token/0]).
|
||||
|
||||
-compile({no_auto_import, [float/1]}).
|
||||
|
||||
-type uri_string() :: iodata().
|
||||
|
||||
-type tmpl_token() :: list({var, binary()} | {str, binary()}).
|
||||
|
||||
-type tmpl_cmd() :: list(tmpl_token()).
|
||||
|
||||
-type prepare_statement_key() :: binary().
|
||||
|
||||
%% preprocess template string with place holders
|
||||
-spec preproc_tmpl(binary()) -> tmpl_token().
|
||||
preproc_tmpl(Str) ->
|
||||
emqx_placeholder:preproc_tmpl(Str).
|
||||
|
||||
-spec proc_tmpl(tmpl_token(), map()) -> binary().
|
||||
proc_tmpl(Tokens, Data) ->
|
||||
emqx_placeholder:proc_tmpl(Tokens, Data).
|
||||
|
||||
-spec proc_tmpl(tmpl_token(), map(), map()) -> binary() | list().
|
||||
proc_tmpl(Tokens, Data, Opts) ->
|
||||
emqx_placeholder:proc_tmpl(Tokens, Data, Opts).
|
||||
|
||||
-spec preproc_cmd(binary()) -> tmpl_cmd().
|
||||
preproc_cmd(Str) ->
|
||||
emqx_placeholder:preproc_cmd(Str).
|
||||
|
||||
-spec proc_cmd([tmpl_token()], map()) -> binary() | list().
|
||||
proc_cmd(Tokens, Data) ->
|
||||
emqx_placeholder:proc_cmd(Tokens, Data).
|
||||
-spec proc_cmd([tmpl_token()], map(), map()) -> list().
|
||||
proc_cmd(Tokens, Data, Opts) ->
|
||||
emqx_placeholder:proc_cmd(Tokens, Data, Opts).
|
||||
|
||||
%% preprocess SQL with place holders
|
||||
-spec preproc_sql(Sql :: binary()) -> {prepare_statement_key(), tmpl_token()}.
|
||||
preproc_sql(Sql) ->
|
||||
emqx_placeholder:preproc_sql(Sql).
|
||||
|
||||
-spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n' | ':n') ->
|
||||
{prepare_statement_key(), tmpl_token()}.
|
||||
preproc_sql(Sql, ReplaceWith) ->
|
||||
emqx_placeholder:preproc_sql(Sql, ReplaceWith).
|
||||
|
||||
-spec proc_sql(tmpl_token(), map()) -> list().
|
||||
proc_sql(Tokens, Data) ->
|
||||
emqx_placeholder:proc_sql(Tokens, Data).
|
||||
|
||||
-spec proc_sql_param_str(tmpl_token(), map()) -> binary().
|
||||
proc_sql_param_str(Tokens, Data) ->
|
||||
emqx_placeholder:proc_sql_param_str(Tokens, Data).
|
||||
|
||||
-spec proc_cql_param_str(tmpl_token(), map()) -> binary().
|
||||
proc_cql_param_str(Tokens, Data) ->
|
||||
emqx_placeholder:proc_cql_param_str(Tokens, Data).
|
||||
|
||||
%% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">>
|
||||
-spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when
|
||||
InsertSQL :: binary(),
|
||||
Params :: binary().
|
||||
split_insert_sql(SQL0) ->
|
||||
SQL = formalize_sql(SQL0),
|
||||
case re:split(SQL, "((?i)values)", [{return, binary}]) of
|
||||
[Part1, _, Part3] ->
|
||||
case string:trim(Part1, leading) of
|
||||
<<"insert", _/binary>> = InsertSQL ->
|
||||
{ok, {InsertSQL, Part3}};
|
||||
<<"INSERT", _/binary>> = InsertSQL ->
|
||||
{ok, {InsertSQL, Part3}};
|
||||
_ ->
|
||||
{error, not_insert_sql}
|
||||
end;
|
||||
_ ->
|
||||
{error, not_insert_sql}
|
||||
end.
|
||||
|
||||
-spec detect_sql_type(binary()) -> {ok, Type} | {error, atom()} when
|
||||
Type :: insert | select.
|
||||
detect_sql_type(SQL) ->
|
||||
case re:run(SQL, "^\\s*([a-zA-Z]+)", [{capture, all_but_first, list}]) of
|
||||
{match, [First]} ->
|
||||
Types = [select, insert],
|
||||
PropTypes = [{erlang:atom_to_list(Type), Type} || Type <- Types],
|
||||
LowFirst = string:lowercase(First),
|
||||
case proplists:lookup(LowFirst, PropTypes) of
|
||||
{LowFirst, Type} ->
|
||||
{ok, Type};
|
||||
_ ->
|
||||
{error, invalid_sql}
|
||||
end;
|
||||
_ ->
|
||||
{error, invalid_sql}
|
||||
end.
|
||||
|
||||
-spec proc_batch_sql(
|
||||
BatchReqs :: list({atom(), map()}),
|
||||
InsertPart :: binary(),
|
||||
Tokens :: tmpl_token()
|
||||
) -> InsertSQL :: binary().
|
||||
proc_batch_sql(BatchReqs, InsertPart, Tokens) ->
|
||||
ValuesPart = erlang:iolist_to_binary(
|
||||
lists:join($,, [
|
||||
proc_sql_param_str(Tokens, Msg)
|
||||
|| {_, Msg} <- BatchReqs
|
||||
])
|
||||
),
|
||||
<<InsertPart/binary, " values ", ValuesPart/binary>>.
|
||||
|
||||
formalize_sql(Input) ->
|
||||
%% 1. replace all whitespaces like '\r' '\n' or spaces to a single space char.
|
||||
SQL = re:replace(Input, "\\s+", " ", [global, {return, binary}]),
|
||||
%% 2. trims the result
|
||||
string:trim(SQL).
|
||||
|
||||
unsafe_atom_key(Key) when is_atom(Key) ->
|
||||
Key;
|
||||
unsafe_atom_key(Key) when is_binary(Key) ->
|
||||
binary_to_atom(Key, utf8);
|
||||
unsafe_atom_key(Keys = [_Key | _]) ->
|
||||
[unsafe_atom_key(SubKey) || SubKey <- Keys];
|
||||
unsafe_atom_key(Key) ->
|
||||
error({invalid_key, Key}).
|
||||
|
||||
atom_key(Key) when is_atom(Key) ->
|
||||
Key;
|
||||
atom_key(Key) when is_binary(Key) ->
|
||||
try
|
||||
binary_to_existing_atom(Key, utf8)
|
||||
catch
|
||||
error:badarg -> error({invalid_key, Key})
|
||||
end;
|
||||
%% nested keys
|
||||
atom_key(Keys = [_Key | _]) ->
|
||||
[atom_key(SubKey) || SubKey <- Keys];
|
||||
atom_key(Key) ->
|
||||
error({invalid_key, Key}).
|
||||
|
||||
-spec http_connectivity(uri_string()) -> ok | {error, Reason :: term()}.
|
||||
http_connectivity(Url) ->
|
||||
http_connectivity(Url, 3000).
|
||||
|
||||
-spec http_connectivity(uri_string(), integer()) -> ok | {error, Reason :: term()}.
|
||||
http_connectivity(Url, Timeout) ->
|
||||
case emqx_http_lib:uri_parse(Url) of
|
||||
{ok, #{host := Host, port := Port}} ->
|
||||
tcp_connectivity(Host, Port, Timeout);
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
-spec tcp_connectivity(
|
||||
Host :: inet:socket_address() | inet:hostname(),
|
||||
Port :: inet:port_number()
|
||||
) ->
|
||||
ok | {error, Reason :: term()}.
|
||||
tcp_connectivity(Host, Port) ->
|
||||
tcp_connectivity(Host, Port, 3000).
|
||||
|
||||
-spec tcp_connectivity(
|
||||
Host :: inet:socket_address() | inet:hostname(),
|
||||
Port :: inet:port_number(),
|
||||
Timeout :: integer()
|
||||
) ->
|
||||
ok | {error, Reason :: term()}.
|
||||
tcp_connectivity(Host, Port, Timeout) ->
|
||||
case gen_tcp:connect(Host, Port, emqx_utils:ipv6_probe([]), Timeout) of
|
||||
{ok, Sock} ->
|
||||
gen_tcp:close(Sock),
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
|
||||
str(Num) when is_number(Num) -> number_to_list(Num);
|
||||
str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
|
||||
str(Map) when is_map(Map) -> binary_to_list(emqx_utils_json:encode(Map));
|
||||
str(List) when is_list(List) ->
|
||||
case io_lib:printable_list(List) of
|
||||
true -> List;
|
||||
false -> binary_to_list(emqx_utils_json:encode(List))
|
||||
end;
|
||||
str(Data) ->
|
||||
error({invalid_str, Data}).
|
||||
|
||||
utf8_bin(Str) when is_binary(Str); is_list(Str) ->
|
||||
unicode:characters_to_binary(Str);
|
||||
utf8_bin(Str) ->
|
||||
unicode:characters_to_binary(bin(Str)).
|
||||
|
||||
utf8_str(Str) when is_binary(Str); is_list(Str) ->
|
||||
unicode:characters_to_list(Str);
|
||||
utf8_str(Str) ->
|
||||
unicode:characters_to_list(str(Str)).
|
||||
|
||||
bin(Bin) when is_binary(Bin) -> Bin;
|
||||
bin(Num) when is_number(Num) -> number_to_binary(Num);
|
||||
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
|
||||
bin(Map) when is_map(Map) -> emqx_utils_json:encode(Map);
|
||||
bin(List) when is_list(List) ->
|
||||
case io_lib:printable_list(List) of
|
||||
true -> list_to_binary(List);
|
||||
false -> emqx_utils_json:encode(List)
|
||||
end;
|
||||
bin(Data) ->
|
||||
error({invalid_bin, Data}).
|
||||
|
||||
int(List) when is_list(List) ->
|
||||
try
|
||||
list_to_integer(List)
|
||||
catch
|
||||
error:badarg ->
|
||||
int(list_to_float(List))
|
||||
end;
|
||||
int(Bin) when is_binary(Bin) ->
|
||||
try
|
||||
binary_to_integer(Bin)
|
||||
catch
|
||||
error:badarg ->
|
||||
int(binary_to_float(Bin))
|
||||
end;
|
||||
int(Int) when is_integer(Int) -> Int;
|
||||
int(Float) when is_float(Float) -> erlang:floor(Float);
|
||||
int(true) ->
|
||||
1;
|
||||
int(false) ->
|
||||
0;
|
||||
int(Data) ->
|
||||
error({invalid_number, Data}).
|
||||
|
||||
float(List) when is_list(List) ->
|
||||
try
|
||||
list_to_float(List)
|
||||
catch
|
||||
error:badarg ->
|
||||
float(list_to_integer(List))
|
||||
end;
|
||||
float(Bin) when is_binary(Bin) ->
|
||||
try
|
||||
binary_to_float(Bin)
|
||||
catch
|
||||
error:badarg ->
|
||||
float(binary_to_integer(Bin))
|
||||
end;
|
||||
float(Num) when is_number(Num) -> erlang:float(Num);
|
||||
float(Data) ->
|
||||
error({invalid_number, Data}).
|
||||
|
||||
float2str(Float, Precision) when is_float(Float) and is_integer(Precision) ->
|
||||
float_to_binary(Float, [{decimals, Precision}, compact]).
|
||||
|
||||
map(Bin) when is_binary(Bin) ->
|
||||
case emqx_utils_json:decode(Bin, [return_maps]) of
|
||||
Map = #{} -> Map;
|
||||
_ -> error({invalid_map, Bin})
|
||||
end;
|
||||
map(List) when is_list(List) -> maps:from_list(List);
|
||||
map(Map) when is_map(Map) -> Map;
|
||||
map(Data) ->
|
||||
error({invalid_map, Data}).
|
||||
|
||||
bool(Bool) when
|
||||
Bool == true;
|
||||
Bool == <<"true">>;
|
||||
Bool == 1
|
||||
->
|
||||
true;
|
||||
bool(Bool) when
|
||||
Bool == false;
|
||||
Bool == <<"false">>;
|
||||
Bool == 0
|
||||
->
|
||||
false;
|
||||
bool(Bool) ->
|
||||
error({invalid_boolean, Bool}).
|
||||
|
||||
number_to_binary(Int) when is_integer(Int) ->
|
||||
integer_to_binary(Int);
|
||||
number_to_binary(Float) when is_float(Float) ->
|
||||
float_to_binary(Float, [{decimals, 10}, compact]).
|
||||
|
||||
number_to_list(Int) when is_integer(Int) ->
|
||||
integer_to_list(Int);
|
||||
number_to_list(Float) when is_float(Float) ->
|
||||
float_to_list(Float, [{decimals, 10}, compact]).
|
||||
|
||||
now_ms() ->
|
||||
erlang:system_time(millisecond).
|
||||
|
||||
can_topic_match_oneof(Topic, Filters) ->
|
||||
lists:any(
|
||||
fun(Fltr) ->
|
||||
emqx_topic:match(Topic, Fltr)
|
||||
end,
|
||||
Filters
|
||||
).
|
|
@ -1,84 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_plugin_libs_rule_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(PORT, 9876).
|
||||
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_http_connectivity(_) ->
|
||||
{ok, Socket} = gen_tcp:listen(?PORT, []),
|
||||
ok = emqx_plugin_libs_rule:http_connectivity(
|
||||
"http://127.0.0.1:" ++ emqx_plugin_libs_rule:str(?PORT), 1000
|
||||
),
|
||||
gen_tcp:close(Socket),
|
||||
{error, _} = emqx_plugin_libs_rule:http_connectivity(
|
||||
"http://127.0.0.1:" ++ emqx_plugin_libs_rule:str(?PORT), 1000
|
||||
).
|
||||
|
||||
t_tcp_connectivity(_) ->
|
||||
{ok, Socket} = gen_tcp:listen(?PORT, []),
|
||||
ok = emqx_plugin_libs_rule:tcp_connectivity("127.0.0.1", ?PORT, 1000),
|
||||
gen_tcp:close(Socket),
|
||||
{error, _} = emqx_plugin_libs_rule:tcp_connectivity("127.0.0.1", ?PORT, 1000).
|
||||
|
||||
t_str(_) ->
|
||||
?assertEqual("abc", emqx_plugin_libs_rule:str("abc")),
|
||||
?assertEqual("abc", emqx_plugin_libs_rule:str(abc)),
|
||||
?assertEqual("{\"a\":1}", emqx_plugin_libs_rule:str(#{a => 1})),
|
||||
?assertEqual("1", emqx_plugin_libs_rule:str(1)),
|
||||
?assertEqual("2.0", emqx_plugin_libs_rule:str(2.0)),
|
||||
?assertEqual("true", emqx_plugin_libs_rule:str(true)),
|
||||
?assertError(_, emqx_plugin_libs_rule:str({a, v})).
|
||||
|
||||
t_bin(_) ->
|
||||
?assertEqual(<<"abc">>, emqx_plugin_libs_rule:bin("abc")),
|
||||
?assertEqual(<<"abc">>, emqx_plugin_libs_rule:bin(abc)),
|
||||
?assertEqual(<<"{\"a\":1}">>, emqx_plugin_libs_rule:bin(#{a => 1})),
|
||||
?assertEqual(<<"[{\"a\":1}]">>, emqx_plugin_libs_rule:bin([#{a => 1}])),
|
||||
?assertEqual(<<"1">>, emqx_plugin_libs_rule:bin(1)),
|
||||
?assertEqual(<<"2.0">>, emqx_plugin_libs_rule:bin(2.0)),
|
||||
?assertEqual(<<"true">>, emqx_plugin_libs_rule:bin(true)),
|
||||
?assertError(_, emqx_plugin_libs_rule:bin({a, v})).
|
||||
|
||||
t_atom_key(_) ->
|
||||
_ = erlang,
|
||||
_ = port,
|
||||
?assertEqual([erlang], emqx_plugin_libs_rule:atom_key([<<"erlang">>])),
|
||||
?assertEqual([erlang, port], emqx_plugin_libs_rule:atom_key([<<"erlang">>, port])),
|
||||
?assertEqual([erlang, port], emqx_plugin_libs_rule:atom_key([<<"erlang">>, <<"port">>])),
|
||||
?assertEqual(erlang, emqx_plugin_libs_rule:atom_key(<<"erlang">>)),
|
||||
?assertError({invalid_key, {a, v}}, emqx_plugin_libs_rule:atom_key({a, v})),
|
||||
_ = xyz876gv123,
|
||||
?assertEqual([xyz876gv123, port], emqx_plugin_libs_rule:atom_key([<<"xyz876gv123">>, port])).
|
||||
|
||||
t_unsafe_atom_key(_) ->
|
||||
?assertEqual([xyz876gv], emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv">>])),
|
||||
?assertEqual(
|
||||
[xyz876gv33, port],
|
||||
emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv33">>, port])
|
||||
),
|
||||
?assertEqual(
|
||||
[xyz876gv331, port1221],
|
||||
emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv331">>, <<"port1221">>])
|
||||
),
|
||||
?assertEqual(xyz876gv3312, emqx_plugin_libs_rule:unsafe_atom_key(<<"xyz876gv3312">>)).
|
|
@ -65,10 +65,10 @@ pre_process_action_args(
|
|||
) ->
|
||||
Args#{
|
||||
preprocessed_tmpl => #{
|
||||
topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
|
||||
topic => emqx_placeholder:preproc_tmpl(Topic),
|
||||
qos => preproc_vars(QoS),
|
||||
retain => preproc_vars(Retain),
|
||||
payload => emqx_plugin_libs_rule:preproc_tmpl(Payload),
|
||||
payload => emqx_placeholder:preproc_tmpl(Payload),
|
||||
user_properties => preproc_user_properties(UserProperties)
|
||||
}
|
||||
};
|
||||
|
@ -110,7 +110,7 @@ republish(
|
|||
}
|
||||
}
|
||||
) ->
|
||||
Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
|
||||
Topic = emqx_placeholder:proc_tmpl(TopicTks, Selected),
|
||||
Payload = format_msg(PayloadTks, Selected),
|
||||
QoS = replace_simple_var(QoSTks, Selected, 0),
|
||||
Retain = replace_simple_var(RetainTks, Selected, false),
|
||||
|
@ -189,7 +189,7 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
|
|||
emqx_metrics:inc_msg(Msg).
|
||||
|
||||
preproc_vars(Data) when is_binary(Data) ->
|
||||
emqx_plugin_libs_rule:preproc_tmpl(Data);
|
||||
emqx_placeholder:preproc_tmpl(Data);
|
||||
preproc_vars(Data) ->
|
||||
Data.
|
||||
|
||||
|
@ -201,13 +201,13 @@ preproc_user_properties(<<"${pub_props.'User-Property'}">>) ->
|
|||
?ORIGINAL_USER_PROPERTIES;
|
||||
preproc_user_properties(<<"${", _/binary>> = V) ->
|
||||
%% use a variable
|
||||
emqx_plugin_libs_rule:preproc_tmpl(V);
|
||||
emqx_placeholder:preproc_tmpl(V);
|
||||
preproc_user_properties(_) ->
|
||||
%% invalid, discard
|
||||
undefined.
|
||||
|
||||
replace_simple_var(Tokens, Data, Default) when is_list(Tokens) ->
|
||||
[Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
|
||||
[Var] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
|
||||
case Var of
|
||||
%% cannot find the variable from Data
|
||||
undefined -> Default;
|
||||
|
@ -219,7 +219,7 @@ replace_simple_var(Val, _Data, _Default) ->
|
|||
format_msg([], Selected) ->
|
||||
emqx_utils_json:encode(Selected);
|
||||
format_msg(Tokens, Selected) ->
|
||||
emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected).
|
||||
emqx_placeholder:proc_tmpl(Tokens, Selected).
|
||||
|
||||
format_pub_props(UserPropertiesTks, Selected, Env) ->
|
||||
UserProperties =
|
||||
|
|
|
@ -206,7 +206,7 @@ get_rules_for_topic(Topic) ->
|
|||
[
|
||||
Rule
|
||||
|| Rule = #{from := From} <- get_rules(),
|
||||
emqx_plugin_libs_rule:can_topic_match_oneof(Topic, From)
|
||||
emqx_topic:match_any(Topic, From)
|
||||
].
|
||||
|
||||
-spec get_rules_with_same_event(Topic :: binary()) -> [rule()].
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
-module(emqx_rule_funcs).
|
||||
|
||||
-include("rule_engine.hrl").
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-elvis([{elvis_style, god_modules, disable}]).
|
||||
|
@ -266,8 +265,6 @@
|
|||
]}
|
||||
).
|
||||
|
||||
-define(is_var(X), is_binary(X)).
|
||||
|
||||
%% @doc "msgid()" Func
|
||||
msgid() ->
|
||||
fun
|
||||
|
@ -631,29 +628,42 @@ do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"signed">>, <<"little">>) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
str(Data) ->
|
||||
emqx_plugin_libs_rule:bin(Data).
|
||||
emqx_utils_conv:bin(Data).
|
||||
|
||||
str_utf8(Data) when is_binary(Data); is_list(Data) ->
|
||||
unicode:characters_to_binary(Data);
|
||||
str_utf8(Data) ->
|
||||
emqx_plugin_libs_rule:utf8_bin(Data).
|
||||
unicode:characters_to_binary(str(Data)).
|
||||
|
||||
bool(Data) ->
|
||||
emqx_plugin_libs_rule:bool(Data).
|
||||
emqx_utils_conv:bool(Data).
|
||||
|
||||
int(Data) ->
|
||||
emqx_plugin_libs_rule:int(Data).
|
||||
emqx_utils_conv:int(Data).
|
||||
|
||||
float(Data) ->
|
||||
emqx_plugin_libs_rule:float(Data).
|
||||
emqx_utils_conv:float(Data).
|
||||
|
||||
float(Data, Decimals) when Decimals > 0 ->
|
||||
Data1 = ?MODULE:float(Data),
|
||||
Data1 = emqx_utils_conv:float(Data),
|
||||
list_to_float(float_to_list(Data1, [{decimals, Decimals}])).
|
||||
|
||||
float2str(Float, Precision) ->
|
||||
emqx_plugin_libs_rule:float2str(Float, Precision).
|
||||
float_to_binary(Float, [{decimals, Precision}, compact]).
|
||||
|
||||
map(Bin) when is_binary(Bin) ->
|
||||
case emqx_utils_json:decode(Bin) of
|
||||
Map = #{} ->
|
||||
Map;
|
||||
_ ->
|
||||
error(badarg, [Bin])
|
||||
end;
|
||||
map(List) when is_list(List) ->
|
||||
maps:from_list(List);
|
||||
map(Map = #{}) ->
|
||||
Map;
|
||||
map(Data) ->
|
||||
emqx_plugin_libs_rule:map(Data).
|
||||
error(badarg, [Data]).
|
||||
|
||||
bin2hexstr(Bin) when is_binary(Bin) ->
|
||||
emqx_utils:bin_to_hexstr(Bin, upper).
|
||||
|
@ -895,7 +905,7 @@ mget(Key, Map, Default) ->
|
|||
Val;
|
||||
error when is_atom(Key) ->
|
||||
%% the map may have an equivalent binary-form key
|
||||
BinKey = emqx_plugin_libs_rule:bin(Key),
|
||||
BinKey = emqx_utils_conv:bin(Key),
|
||||
case maps:find(BinKey, Map) of
|
||||
{ok, Val} -> Val;
|
||||
error -> Default
|
||||
|
@ -922,7 +932,7 @@ mput(Key, Val, Map) ->
|
|||
maps:put(Key, Val, Map);
|
||||
error when is_atom(Key) ->
|
||||
%% the map may have an equivalent binary-form key
|
||||
BinKey = emqx_plugin_libs_rule:bin(Key),
|
||||
BinKey = emqx_utils_conv:bin(Key),
|
||||
case maps:find(BinKey, Map) of
|
||||
{ok, _} -> maps:put(BinKey, Val, Map);
|
||||
error -> maps:put(Key, Val, Map)
|
||||
|
@ -1053,7 +1063,7 @@ unix_ts_to_rfc3339(Epoch) ->
|
|||
unix_ts_to_rfc3339(Epoch, <<"second">>).
|
||||
|
||||
unix_ts_to_rfc3339(Epoch, Unit) when is_integer(Epoch) ->
|
||||
emqx_plugin_libs_rule:bin(
|
||||
emqx_utils_conv:bin(
|
||||
calendar:system_time_to_rfc3339(
|
||||
Epoch, [{unit, time_unit(Unit)}]
|
||||
)
|
||||
|
@ -1090,7 +1100,7 @@ format_date(TimeUnit, Offset, FormatString) ->
|
|||
|
||||
format_date(TimeUnit, Offset, FormatString, TimeEpoch) ->
|
||||
Unit = time_unit(TimeUnit),
|
||||
emqx_plugin_libs_rule:bin(
|
||||
emqx_utils_conv:bin(
|
||||
lists:concat(
|
||||
emqx_calendar:format(TimeEpoch, Unit, Offset, FormatString)
|
||||
)
|
||||
|
|
|
@ -97,7 +97,7 @@ general_find({key, Key}, Map, _OrgData, Handler) when is_map(Map) ->
|
|||
Handler({found, {{key, Key}, Val}});
|
||||
error when is_atom(Key) ->
|
||||
%% the map may have an equivalent binary-form key
|
||||
BinKey = emqx_plugin_libs_rule:bin(Key),
|
||||
BinKey = atom_to_binary(Key),
|
||||
case maps:find(BinKey, Map) of
|
||||
{ok, Val} -> Handler({equivalent, {{key, BinKey}, Val}});
|
||||
error -> Handler(not_found)
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
|
||||
-module(emqx_rule_sqltester).
|
||||
|
||||
-include("rule_engine.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-export([
|
||||
|
@ -31,7 +30,7 @@ test(#{sql := Sql, context := Context}) ->
|
|||
case lists:all(fun is_publish_topic/1, EventTopics) of
|
||||
true ->
|
||||
%% test if the topic matches the topic filters in the rule
|
||||
case emqx_plugin_libs_rule:can_topic_match_oneof(InTopic, EventTopics) of
|
||||
case emqx_topic:match_any(InTopic, EventTopics) of
|
||||
true -> test_rule(Sql, Select, Context, EventTopics);
|
||||
false -> {error, nomatch}
|
||||
end;
|
||||
|
|
|
@ -126,7 +126,7 @@ t_int(_) ->
|
|||
?assertEqual(1, emqx_rule_funcs:int(1.0001)),
|
||||
?assertEqual(1, emqx_rule_funcs:int(true)),
|
||||
?assertEqual(0, emqx_rule_funcs:int(false)),
|
||||
?assertError({invalid_number, {a, v}}, emqx_rule_funcs:int({a, v})),
|
||||
?assertError(badarg, emqx_rule_funcs:int({a, v})),
|
||||
?assertError(_, emqx_rule_funcs:int("a")).
|
||||
|
||||
t_float(_) ->
|
||||
|
@ -137,7 +137,7 @@ t_float(_) ->
|
|||
?assertEqual(1.9, emqx_rule_funcs:float(1.9)),
|
||||
?assertEqual(1.0001, emqx_rule_funcs:float(1.0001)),
|
||||
?assertEqual(1.0000000001, emqx_rule_funcs:float(1.0000000001)),
|
||||
?assertError({invalid_number, {a, v}}, emqx_rule_funcs:float({a, v})),
|
||||
?assertError(badarg, emqx_rule_funcs:float({a, v})),
|
||||
?assertError(_, emqx_rule_funcs:float("a")).
|
||||
|
||||
t_map(_) ->
|
||||
|
@ -158,7 +158,7 @@ t_bool(_) ->
|
|||
?assertEqual(true, emqx_rule_funcs:bool(<<"true">>)),
|
||||
?assertEqual(false, emqx_rule_funcs:bool(false)),
|
||||
?assertEqual(false, emqx_rule_funcs:bool(<<"false">>)),
|
||||
?assertError({invalid_boolean, _}, emqx_rule_funcs:bool(3)).
|
||||
?assertError(badarg, emqx_rule_funcs:bool(3)).
|
||||
|
||||
t_proc_dict_put_get_del(_) ->
|
||||
?assertEqual(undefined, emqx_rule_funcs:proc_dict_get(<<"abc">>)),
|
||||
|
|
|
@ -46,7 +46,7 @@
|
|||
quote_mysql/1
|
||||
]).
|
||||
|
||||
-include_lib("emqx/include/emqx_placeholder.hrl").
|
||||
-define(PH_VAR_THIS, '$this').
|
||||
|
||||
-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
|
||||
|
||||
|
@ -55,7 +55,7 @@
|
|||
%% Space and CRLF
|
||||
-define(EX_WITHE_CHARS, "\\s").
|
||||
|
||||
-type tmpl_token() :: list({var, binary()} | {str, binary()}).
|
||||
-type tmpl_token() :: list({var, ?PH_VAR_THIS | [binary()]} | {str, binary()}).
|
||||
|
||||
-type tmpl_cmd() :: list(tmpl_token()).
|
||||
|
||||
|
@ -90,8 +90,6 @@
|
|||
| {tmpl, tmpl_token()}
|
||||
| {value, term()}.
|
||||
|
||||
-dialyzer({no_improper_lists, [quote_mysql/1, escape_mysql/4, escape_prepend/4]}).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -112,7 +110,7 @@ proc_tmpl(Tokens, Data) ->
|
|||
|
||||
-spec proc_tmpl(tmpl_token(), map(), proc_tmpl_opts()) -> binary() | list().
|
||||
proc_tmpl(Tokens, Data, Opts = #{return := full_binary}) ->
|
||||
Trans = maps:get(var_trans, Opts, fun emqx_plugin_libs_rule:bin/1),
|
||||
Trans = maps:get(var_trans, Opts, fun emqx_utils_conv:bin/1),
|
||||
list_to_binary(
|
||||
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Trans})
|
||||
);
|
||||
|
@ -123,11 +121,11 @@ proc_tmpl(Tokens, Data, Opts = #{return := rawlist}) ->
|
|||
({str, Str}) ->
|
||||
Str;
|
||||
({var, Phld}) when is_function(Trans, 1) ->
|
||||
Trans(get_phld_var(Phld, Data));
|
||||
Trans(lookup_var(Phld, Data));
|
||||
({var, Phld}) when is_function(Trans, 2) ->
|
||||
Trans(Phld, get_phld_var(Phld, Data));
|
||||
Trans(Phld, lookup_var(Phld, Data));
|
||||
({var, Phld}) ->
|
||||
get_phld_var(Phld, Data)
|
||||
lookup_var(Phld, Data)
|
||||
end,
|
||||
Tokens
|
||||
).
|
||||
|
@ -243,34 +241,49 @@ sql_data(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
|
|||
sql_data(Map) when is_map(Map) -> emqx_utils_json:encode(Map).
|
||||
|
||||
-spec bin(term()) -> binary().
|
||||
bin(Val) -> emqx_plugin_libs_rule:bin(Val).
|
||||
bin(Val) -> emqx_utils_conv:bin(Val).
|
||||
|
||||
-spec quote_sql(_Value) -> iolist().
|
||||
quote_sql(Str) ->
|
||||
quote_escape(Str, fun escape_sql/1).
|
||||
emqx_utils_sql:to_sql_string(Str, #{escaping => sql}).
|
||||
|
||||
-spec quote_cql(_Value) -> iolist().
|
||||
quote_cql(Str) ->
|
||||
quote_escape(Str, fun escape_cql/1).
|
||||
emqx_utils_sql:to_sql_string(Str, #{escaping => cql}).
|
||||
|
||||
-spec quote_mysql(_Value) -> iolist().
|
||||
quote_mysql(Str) when is_binary(Str) ->
|
||||
try
|
||||
escape_mysql(Str)
|
||||
catch
|
||||
throw:invalid_utf8 ->
|
||||
[<<"0x">> | binary:encode_hex(Str)]
|
||||
end;
|
||||
quote_mysql(Str) ->
|
||||
quote_escape(Str, fun escape_mysql/1).
|
||||
emqx_utils_sql:to_sql_string(Str, #{escaping => mysql}).
|
||||
|
||||
lookup_var(Var, Value) when Var == ?PH_VAR_THIS orelse Var == [] ->
|
||||
Value;
|
||||
lookup_var([Prop | Rest], Data) ->
|
||||
case lookup(Prop, Data) of
|
||||
{ok, Value} ->
|
||||
lookup_var(Rest, Value);
|
||||
{error, _} ->
|
||||
undefined
|
||||
end.
|
||||
|
||||
lookup(Prop, Data) when is_binary(Prop) ->
|
||||
case maps:get(Prop, Data, undefined) of
|
||||
undefined ->
|
||||
try
|
||||
{ok, maps:get(binary_to_existing_atom(Prop, utf8), Data)}
|
||||
catch
|
||||
error:{badkey, _} ->
|
||||
{error, undefined};
|
||||
error:badarg ->
|
||||
{error, undefined}
|
||||
end;
|
||||
Value ->
|
||||
{ok, Value}
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
get_phld_var(Phld, Data) ->
|
||||
emqx_rule_maps:nested_get(Phld, Data).
|
||||
|
||||
preproc_var_re(#{placeholders := PHs, strip_double_quote := true}) ->
|
||||
Res = [ph_to_re(PH) || PH <- PHs],
|
||||
QuoteRes = ["\"" ++ Re ++ "\"" || Re <- Res],
|
||||
|
@ -340,66 +353,11 @@ parse_nested(<<".", R/binary>>) ->
|
|||
parse_nested(R);
|
||||
parse_nested(Attr) ->
|
||||
case string:split(Attr, <<".">>, all) of
|
||||
[<<>>] -> {var, ?PH_VAR_THIS};
|
||||
[Attr] -> {var, Attr};
|
||||
Nested -> {path, [{key, P} || P <- Nested]}
|
||||
[<<>>] -> ?PH_VAR_THIS;
|
||||
Nested -> Nested
|
||||
end.
|
||||
|
||||
unwrap(<<"\"${", Val/binary>>, _StripDoubleQuote = true) ->
|
||||
binary:part(Val, {0, byte_size(Val) - 2});
|
||||
unwrap(<<"${", Val/binary>>, _StripDoubleQuote) ->
|
||||
binary:part(Val, {0, byte_size(Val) - 1}).
|
||||
|
||||
-spec quote_escape(_Value, fun((binary()) -> iodata())) -> iodata().
|
||||
quote_escape(Str, EscapeFun) when is_binary(Str) ->
|
||||
EscapeFun(Str);
|
||||
quote_escape(Str, EscapeFun) when is_list(Str) ->
|
||||
case unicode:characters_to_binary(Str) of
|
||||
Bin when is_binary(Bin) ->
|
||||
EscapeFun(Bin);
|
||||
Otherwise ->
|
||||
error(Otherwise)
|
||||
end;
|
||||
quote_escape(Str, EscapeFun) when is_atom(Str) orelse is_map(Str) ->
|
||||
EscapeFun(bin(Str));
|
||||
quote_escape(Val, _EscapeFun) ->
|
||||
bin(Val).
|
||||
|
||||
-spec escape_sql(binary()) -> iolist().
|
||||
escape_sql(S) ->
|
||||
ES = binary:replace(S, [<<"\\">>, <<"'">>], <<"\\">>, [global, {insert_replaced, 1}]),
|
||||
[$', ES, $'].
|
||||
|
||||
-spec escape_cql(binary()) -> iolist().
|
||||
escape_cql(S) ->
|
||||
ES = binary:replace(S, <<"'">>, <<"'">>, [global, {insert_replaced, 1}]),
|
||||
[$', ES, $'].
|
||||
|
||||
-spec escape_mysql(binary()) -> iolist().
|
||||
escape_mysql(S0) ->
|
||||
% https://dev.mysql.com/doc/refman/8.0/en/string-literals.html
|
||||
[$', escape_mysql(S0, 0, 0, S0), $'].
|
||||
|
||||
%% NOTE
|
||||
%% This thing looks more complicated than needed because it's optimized for as few
|
||||
%% intermediate memory (re)allocations as possible.
|
||||
escape_mysql(<<$', Rest/binary>>, I, Run, Src) ->
|
||||
escape_prepend(I, Run, Src, [<<"\\'">> | escape_mysql(Rest, I + Run + 1, 0, Src)]);
|
||||
escape_mysql(<<$\\, Rest/binary>>, I, Run, Src) ->
|
||||
escape_prepend(I, Run, Src, [<<"\\\\">> | escape_mysql(Rest, I + Run + 1, 0, Src)]);
|
||||
escape_mysql(<<0, Rest/binary>>, I, Run, Src) ->
|
||||
escape_prepend(I, Run, Src, [<<"\\0">> | escape_mysql(Rest, I + Run + 1, 0, Src)]);
|
||||
escape_mysql(<<_/utf8, Rest/binary>> = S, I, Run, Src) ->
|
||||
CWidth = byte_size(S) - byte_size(Rest),
|
||||
escape_mysql(Rest, I, Run + CWidth, Src);
|
||||
escape_mysql(<<>>, 0, _, Src) ->
|
||||
Src;
|
||||
escape_mysql(<<>>, I, Run, Src) ->
|
||||
binary:part(Src, I, Run);
|
||||
escape_mysql(_, _I, _Run, _Src) ->
|
||||
throw(invalid_utf8).
|
||||
|
||||
escape_prepend(_RunI, 0, _Src, Tail) ->
|
||||
Tail;
|
||||
escape_prepend(I, Run, Src, Tail) ->
|
||||
[binary:part(Src, I, Run) | Tail].
|
|
@ -0,0 +1,125 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_utils_conv).
|
||||
|
||||
-export([bin/1]).
|
||||
-export([str/1]).
|
||||
-export([bool/1]).
|
||||
-export([int/1]).
|
||||
-export([float/1]).
|
||||
|
||||
-compile({no_auto_import, [float/1]}).
|
||||
|
||||
-type scalar() :: binary() | number() | atom() | string().
|
||||
|
||||
-spec bin(Term) -> binary() when
|
||||
Term :: scalar() | #{scalar() => Term} | [Term].
|
||||
bin(Bin) when is_binary(Bin) -> Bin;
|
||||
bin(Num) when is_number(Num) -> number_to_binary(Num);
|
||||
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
|
||||
bin(Map) when is_map(Map) -> emqx_utils_json:encode(Map);
|
||||
bin(List) when is_list(List) ->
|
||||
case io_lib:printable_list(List) of
|
||||
true -> list_to_binary(List);
|
||||
false -> emqx_utils_json:encode(List)
|
||||
end;
|
||||
bin(Data) ->
|
||||
error({invalid_bin, Data}).
|
||||
|
||||
-spec str(Term) -> string() when
|
||||
Term :: scalar() | #{scalar() => Term} | [Term].
|
||||
str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
|
||||
str(Num) when is_number(Num) -> number_to_list(Num);
|
||||
str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
|
||||
str(Map) when is_map(Map) -> binary_to_list(emqx_utils_json:encode(Map));
|
||||
str(List) when is_list(List) ->
|
||||
case io_lib:printable_list(List) of
|
||||
true -> List;
|
||||
false -> binary_to_list(emqx_utils_json:encode(List))
|
||||
end;
|
||||
str(Data) ->
|
||||
error({invalid_str, Data}).
|
||||
|
||||
-spec number_to_binary(number()) -> binary().
|
||||
number_to_binary(Int) when is_integer(Int) ->
|
||||
integer_to_binary(Int);
|
||||
number_to_binary(Float) when is_float(Float) ->
|
||||
float_to_binary(Float, [{decimals, 10}, compact]).
|
||||
|
||||
-spec number_to_list(number()) -> string().
|
||||
number_to_list(Int) when is_integer(Int) ->
|
||||
integer_to_list(Int);
|
||||
number_to_list(Float) when is_float(Float) ->
|
||||
float_to_list(Float, [{decimals, 10}, compact]).
|
||||
|
||||
-spec bool(Term) -> boolean() when
|
||||
Term :: boolean() | binary() | 0..1.
|
||||
bool(true) -> true;
|
||||
bool(<<"true">>) -> true;
|
||||
bool(N) when N == 1 -> true;
|
||||
bool(false) -> false;
|
||||
bool(<<"false">>) -> false;
|
||||
bool(N) when N == 0 -> false;
|
||||
bool(Data) -> error(badarg, [Data]).
|
||||
|
||||
-spec int(Term) -> integer() when
|
||||
Term :: binary() | string() | number() | boolean().
|
||||
int(List) when is_list(List) ->
|
||||
try
|
||||
list_to_integer(List)
|
||||
catch
|
||||
error:badarg ->
|
||||
int(list_to_float(List))
|
||||
end;
|
||||
int(Bin) when is_binary(Bin) ->
|
||||
try
|
||||
binary_to_integer(Bin)
|
||||
catch
|
||||
error:badarg ->
|
||||
int(binary_to_float(Bin))
|
||||
end;
|
||||
int(Int) when is_integer(Int) ->
|
||||
Int;
|
||||
int(Float) when is_float(Float) ->
|
||||
erlang:floor(Float);
|
||||
int(true) ->
|
||||
1;
|
||||
int(false) ->
|
||||
0;
|
||||
int(Data) ->
|
||||
error(badarg, [Data]).
|
||||
|
||||
-spec float(Term) -> float() when
|
||||
Term :: binary() | string() | number().
|
||||
float(List) when is_list(List) ->
|
||||
try
|
||||
list_to_float(List)
|
||||
catch
|
||||
error:badarg ->
|
||||
float(list_to_integer(List))
|
||||
end;
|
||||
float(Bin) when is_binary(Bin) ->
|
||||
try
|
||||
binary_to_float(Bin)
|
||||
catch
|
||||
error:badarg ->
|
||||
float(binary_to_integer(Bin))
|
||||
end;
|
||||
float(Num) when is_number(Num) ->
|
||||
erlang:float(Num);
|
||||
float(Data) ->
|
||||
error(badarg, [Data]).
|
|
@ -0,0 +1,157 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_utils_sql).
|
||||
|
||||
-export([get_statement_type/1]).
|
||||
-export([parse_insert/1]).
|
||||
|
||||
-export([to_sql_value/1]).
|
||||
-export([to_sql_string/2]).
|
||||
|
||||
-export([escape_sql/1]).
|
||||
-export([escape_cql/1]).
|
||||
-export([escape_mysql/1]).
|
||||
|
||||
-export_type([value/0]).
|
||||
|
||||
-type statement_type() :: select | insert | delete.
|
||||
-type value() :: null | binary() | number() | boolean() | [value()].
|
||||
|
||||
-dialyzer({no_improper_lists, [escape_mysql/4, escape_prepend/4]}).
|
||||
|
||||
-spec get_statement_type(iodata()) -> statement_type() | {error, unknown}.
|
||||
get_statement_type(Query) ->
|
||||
KnownTypes = #{
|
||||
<<"select">> => select,
|
||||
<<"insert">> => insert,
|
||||
<<"delete">> => delete
|
||||
},
|
||||
case re:run(Query, <<"^\\s*([a-zA-Z]+)">>, [{capture, all_but_first, binary}]) of
|
||||
{match, [Token]} ->
|
||||
maps:get(string:lowercase(Token), KnownTypes, {error, unknown});
|
||||
_ ->
|
||||
{error, unknown}
|
||||
end.
|
||||
|
||||
%% @doc Parse an INSERT SQL statement into its INSERT part and the VALUES part.
|
||||
%% SQL = <<"INSERT INTO \"abc\" (c1, c2, c3) VALUES (${a}, ${b}, ${c.prop})">>
|
||||
%% {ok, {<<"INSERT INTO \"abc\" (c1, c2, c3)">>, <<"(${a}, ${b}, ${c.prop})">>}}
|
||||
-spec parse_insert(iodata()) ->
|
||||
{ok, {_Statement :: binary(), _Rows :: binary()}} | {error, not_insert_sql}.
|
||||
parse_insert(SQL) ->
|
||||
case re:split(SQL, "((?i)values)", [{return, binary}]) of
|
||||
[Part1, _, Part3] ->
|
||||
case string:trim(Part1, leading) of
|
||||
<<"insert", _/binary>> = InsertSQL ->
|
||||
{ok, {InsertSQL, Part3}};
|
||||
<<"INSERT", _/binary>> = InsertSQL ->
|
||||
{ok, {InsertSQL, Part3}};
|
||||
_ ->
|
||||
{error, not_insert_sql}
|
||||
end;
|
||||
_ ->
|
||||
{error, not_insert_sql}
|
||||
end.
|
||||
|
||||
%% @doc Convert an Erlang term to a value that can be used primarily in
|
||||
%% prepared SQL statements.
|
||||
-spec to_sql_value(term()) -> value().
|
||||
to_sql_value(undefined) -> null;
|
||||
to_sql_value(List) when is_list(List) -> List;
|
||||
to_sql_value(Bin) when is_binary(Bin) -> Bin;
|
||||
to_sql_value(Num) when is_number(Num) -> Num;
|
||||
to_sql_value(Bool) when is_boolean(Bool) -> Bool;
|
||||
to_sql_value(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
|
||||
to_sql_value(Map) when is_map(Map) -> emqx_utils_json:encode(Map).
|
||||
|
||||
%% @doc Convert an Erlang term to a string that can be interpolated in literal
|
||||
%% SQL statements. The value is escaped if necessary.
|
||||
-spec to_sql_string(term(), Options) -> iodata() when
|
||||
Options :: #{
|
||||
escaping => cql | mysql | sql
|
||||
}.
|
||||
to_sql_string(String, #{escaping := mysql}) when is_binary(String) ->
|
||||
try
|
||||
escape_mysql(String)
|
||||
catch
|
||||
throw:invalid_utf8 ->
|
||||
[<<"0x">>, binary:encode_hex(String)]
|
||||
end;
|
||||
to_sql_string(Term, #{escaping := mysql}) ->
|
||||
maybe_escape(Term, fun escape_mysql/1);
|
||||
to_sql_string(Term, #{escaping := cql}) ->
|
||||
maybe_escape(Term, fun escape_cql/1);
|
||||
to_sql_string(Term, #{}) ->
|
||||
maybe_escape(Term, fun escape_sql/1).
|
||||
|
||||
-spec maybe_escape(_Value, fun((binary()) -> iodata())) -> iodata().
|
||||
maybe_escape(Str, EscapeFun) when is_binary(Str) ->
|
||||
EscapeFun(Str);
|
||||
maybe_escape(Str, EscapeFun) when is_list(Str) ->
|
||||
case unicode:characters_to_binary(Str) of
|
||||
Bin when is_binary(Bin) ->
|
||||
EscapeFun(Bin);
|
||||
Otherwise ->
|
||||
error(Otherwise)
|
||||
end;
|
||||
maybe_escape(Val, EscapeFun) when is_atom(Val) orelse is_map(Val) ->
|
||||
EscapeFun(emqx_utils_conv:bin(Val));
|
||||
maybe_escape(Val, _EscapeFun) ->
|
||||
emqx_utils_conv:bin(Val).
|
||||
|
||||
-spec escape_sql(binary()) -> iodata().
|
||||
escape_sql(S) ->
|
||||
% NOTE
|
||||
% This is a bit misleading: currently, escaping logic in `escape_sql/1` likely
|
||||
% won't work with pgsql since it does not support C-style escapes by default.
|
||||
% https://www.postgresql.org/docs/14/sql-syntax-lexical.html#SQL-SYNTAX-CONSTANTS
|
||||
ES = binary:replace(S, [<<"\\">>, <<"'">>], <<"\\">>, [global, {insert_replaced, 1}]),
|
||||
[$', ES, $'].
|
||||
|
||||
-spec escape_cql(binary()) -> iodata().
|
||||
escape_cql(S) ->
|
||||
ES = binary:replace(S, <<"'">>, <<"'">>, [global, {insert_replaced, 1}]),
|
||||
[$', ES, $'].
|
||||
|
||||
-spec escape_mysql(binary()) -> iodata().
|
||||
escape_mysql(S0) ->
|
||||
% https://dev.mysql.com/doc/refman/8.0/en/string-literals.html
|
||||
[$', escape_mysql(S0, 0, 0, S0), $'].
|
||||
|
||||
%% NOTE
|
||||
%% This thing looks more complicated than needed because it's optimized for as few
|
||||
%% intermediate memory (re)allocations as possible.
|
||||
escape_mysql(<<$', Rest/binary>>, I, Run, Src) ->
|
||||
escape_prepend(I, Run, Src, [<<"\\'">> | escape_mysql(Rest, I + Run + 1, 0, Src)]);
|
||||
escape_mysql(<<$\\, Rest/binary>>, I, Run, Src) ->
|
||||
escape_prepend(I, Run, Src, [<<"\\\\">> | escape_mysql(Rest, I + Run + 1, 0, Src)]);
|
||||
escape_mysql(<<0, Rest/binary>>, I, Run, Src) ->
|
||||
escape_prepend(I, Run, Src, [<<"\\0">> | escape_mysql(Rest, I + Run + 1, 0, Src)]);
|
||||
escape_mysql(<<_/utf8, Rest/binary>> = S, I, Run, Src) ->
|
||||
CWidth = byte_size(S) - byte_size(Rest),
|
||||
escape_mysql(Rest, I, Run + CWidth, Src);
|
||||
escape_mysql(<<>>, 0, _, Src) ->
|
||||
Src;
|
||||
escape_mysql(<<>>, I, Run, Src) ->
|
||||
binary:part(Src, I, Run);
|
||||
escape_mysql(_, _I, _Run, _Src) ->
|
||||
throw(invalid_utf8).
|
||||
|
||||
escape_prepend(_RunI, 0, _Src, Tail) ->
|
||||
Tail;
|
||||
escape_prepend(I, Run, Src, Tail) ->
|
||||
[binary:part(Src, I, Run) | Tail].
|
|
@ -0,0 +1,44 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_utils_conv_tests).
|
||||
|
||||
-import(emqx_utils_conv, [bin/1, str/1]).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
bin_test_() ->
|
||||
[
|
||||
?_assertEqual(<<"abc">>, bin("abc")),
|
||||
?_assertEqual(<<"abc">>, bin(abc)),
|
||||
?_assertEqual(<<"{\"a\":1}">>, bin(#{a => 1})),
|
||||
?_assertEqual(<<"[{\"a\":1}]">>, bin([#{a => 1}])),
|
||||
?_assertEqual(<<"1">>, bin(1)),
|
||||
?_assertEqual(<<"2.0">>, bin(2.0)),
|
||||
?_assertEqual(<<"true">>, bin(true)),
|
||||
?_assertError(_, bin({a, v}))
|
||||
].
|
||||
|
||||
str_test_() ->
|
||||
[
|
||||
?_assertEqual("abc", str("abc")),
|
||||
?_assertEqual("abc", str(abc)),
|
||||
?_assertEqual("{\"a\":1}", str(#{a => 1})),
|
||||
?_assertEqual("1", str(1)),
|
||||
?_assertEqual("2.0", str(2.0)),
|
||||
?_assertEqual("true", str(true)),
|
||||
?_assertError(_, str({a, v}))
|
||||
].
|
|
@ -225,9 +225,9 @@ start_producer(
|
|||
msg => "hstreamdb connector: producer started"
|
||||
}),
|
||||
EnableBatch = maps:get(enable_batch, Options, false),
|
||||
Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin),
|
||||
Payload = emqx_placeholder:preproc_tmpl(PayloadBin),
|
||||
OrderingKeyBin = maps:get(ordering_key, Options, <<"">>),
|
||||
OrderingKey = emqx_plugin_libs_rule:preproc_tmpl(OrderingKeyBin),
|
||||
OrderingKey = emqx_placeholder:preproc_tmpl(OrderingKeyBin),
|
||||
State = #{
|
||||
client => Client,
|
||||
producer => Producer,
|
||||
|
@ -254,8 +254,8 @@ start_producer(
|
|||
end.
|
||||
|
||||
to_record(OrderingKeyTmpl, PayloadTmpl, Data) ->
|
||||
OrderingKey = emqx_plugin_libs_rule:proc_tmpl(OrderingKeyTmpl, Data),
|
||||
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTmpl, Data),
|
||||
OrderingKey = emqx_placeholder:proc_tmpl(OrderingKeyTmpl, Data),
|
||||
Payload = emqx_placeholder:proc_tmpl(PayloadTmpl, Data),
|
||||
to_record(OrderingKey, Payload).
|
||||
|
||||
to_record(OrderingKey, Payload) when is_binary(OrderingKey) ->
|
||||
|
|
|
@ -54,7 +54,7 @@ on_query(InstanceId, {send_message, Message0}, State) ->
|
|||
connector_state := ConnectorState
|
||||
} = State,
|
||||
NewConnectorState = ConnectorState#{
|
||||
collection => emqx_plugin_libs_rule:proc_tmpl(CollectionTemplate, Message0)
|
||||
collection => emqx_placeholder:proc_tmpl(CollectionTemplate, Message0)
|
||||
},
|
||||
Message = render_message(PayloadTemplate, Message0),
|
||||
Res = emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, NewConnectorState),
|
||||
|
@ -73,7 +73,7 @@ on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) ->
|
|||
preprocess_template(undefined = _PayloadTemplate) ->
|
||||
undefined;
|
||||
preprocess_template(PayloadTemplate) ->
|
||||
emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate).
|
||||
emqx_placeholder:preproc_tmpl(PayloadTemplate).
|
||||
|
||||
render_message(undefined = _PayloadTemplate, Message) ->
|
||||
Message;
|
||||
|
@ -99,14 +99,14 @@ format_data(PayloadTks, Msg) ->
|
|||
case maps:size(PreparedTupleMap) of
|
||||
% If no tuples were found simply proceed with the json decoding and be done with it
|
||||
0 ->
|
||||
emqx_utils_json:decode(emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Msg), [return_maps]);
|
||||
emqx_utils_json:decode(emqx_placeholder:proc_tmpl(PayloadTks, Msg), [return_maps]);
|
||||
_ ->
|
||||
% If tuples were found, replace the tuple values with the references created, run
|
||||
% the modified message through the json parser, and then at the end replace the
|
||||
% references with the actual tuple values.
|
||||
ProcessedMessage = replace_message_values_with_references(Msg, PreparedTupleMap),
|
||||
DecodedMap = emqx_utils_json:decode(
|
||||
emqx_plugin_libs_rule:proc_tmpl(PayloadTks, ProcessedMessage), [return_maps]
|
||||
emqx_placeholder:proc_tmpl(PayloadTks, ProcessedMessage), [return_maps]
|
||||
),
|
||||
populate_map_with_tuple_values(PreparedTupleMap, DecodedMap)
|
||||
end.
|
||||
|
|
|
@ -126,13 +126,13 @@ process_batch_data(BatchData, CommandTemplate) ->
|
|||
proc_command_template(CommandTemplate, Msg) ->
|
||||
lists:map(
|
||||
fun(ArgTks) ->
|
||||
emqx_plugin_libs_rule:proc_tmpl(ArgTks, Msg, #{return => full_binary})
|
||||
emqx_placeholder:proc_tmpl(ArgTks, Msg, #{return => full_binary})
|
||||
end,
|
||||
CommandTemplate
|
||||
).
|
||||
|
||||
preproc_command_template(CommandTemplate) ->
|
||||
lists:map(
|
||||
fun emqx_plugin_libs_rule:preproc_tmpl/1,
|
||||
fun emqx_placeholder:preproc_tmpl/1,
|
||||
CommandTemplate
|
||||
).
|
||||
|
|
1
mix.exs
1
mix.exs
|
@ -352,7 +352,6 @@ defmodule EMQXUmbrella.MixProject do
|
|||
[
|
||||
mnesia: :load,
|
||||
ekka: :load,
|
||||
emqx_plugin_libs: :load,
|
||||
esasl: :load,
|
||||
observer_cli: :permanent,
|
||||
tools: :permanent,
|
||||
|
|
|
@ -409,7 +409,6 @@ relx_apps(ReleaseType, Edition) ->
|
|||
[
|
||||
{mnesia, load},
|
||||
{ekka, load},
|
||||
{emqx_plugin_libs, load},
|
||||
{esasl, load},
|
||||
observer_cli,
|
||||
tools,
|
||||
|
|
Loading…
Reference in New Issue