Merge remote-tracking branch 'origin/release-51' into 0610-merge-release-51-to-master

This commit is contained in:
Zaiming (Stone) Shi 2023-06-10 12:23:55 +02:00
commit 97850de524
94 changed files with 1281 additions and 1037 deletions

1
.github/CODEOWNERS vendored
View File

@ -11,7 +11,6 @@
/apps/emqx_ft/ @emqx/emqx-review-board @savonarola @keynslug
/apps/emqx_gateway/ @emqx/emqx-review-board @lafirest
/apps/emqx_management/ @emqx/emqx-review-board @lafirest @sstrigler
/apps/emqx_plugin_libs/ @emqx/emqx-review-board @lafirest
/apps/emqx_plugins/ @emqx/emqx-review-board @JimMoen
/apps/emqx_prometheus/ @emqx/emqx-review-board @JimMoen
/apps/emqx_psk/ @emqx/emqx-review-board @lafirest

View File

@ -32,10 +32,10 @@
%% `apps/emqx/src/bpapi/README.md'
%% Opensource edition
-define(EMQX_RELEASE_CE, "5.1.0-alpha.3").
-define(EMQX_RELEASE_CE, "5.1.0-alpha.4").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.1.0-alpha.3").
-define(EMQX_RELEASE_EE, "5.1.0-alpha.4").
%% The HTTP API version
-define(EMQX_API_VERSION, "5.0").

View File

@ -28,6 +28,7 @@
{emqx_management,2}.
{emqx_management,3}.
{emqx_management,4}.
{emqx_metrics,1}.
{emqx_mgmt_api_plugins,1}.
{emqx_mgmt_api_plugins,2}.
{emqx_mgmt_cluster,1}.
@ -38,7 +39,6 @@
{emqx_node_rebalance_evacuation,1}.
{emqx_node_rebalance_status,1}.
{emqx_persistent_session,1}.
{emqx_plugin_libs,1}.
{emqx_plugins,1}.
{emqx_prometheus,1}.
{emqx_resource,1}.

View File

@ -485,7 +485,8 @@ to_server_opts(Type, Opts) ->
cacertfile => Path(cacertfile),
ciphers => Ciphers,
versions => Versions
})
}),
Versions
).
%% @doc Convert hocon-checked tls client options (map()) to
@ -510,19 +511,22 @@ to_client_opts(Type, Opts) ->
SNI = ensure_sni(Get(server_name_indication)),
Versions = integral_versions(Type, Get(versions)),
Ciphers = integral_ciphers(Versions, Get(ciphers)),
filter([
{keyfile, KeyFile},
{certfile, CertFile},
{cacertfile, CAFile},
{verify, Verify},
{server_name_indication, SNI},
{versions, Versions},
{ciphers, Ciphers},
{reuse_sessions, Get(reuse_sessions)},
{depth, Get(depth)},
{password, ensure_str(Get(password))},
{secure_renegotiate, Get(secure_renegotiate)}
]);
filter(
[
{keyfile, KeyFile},
{certfile, CertFile},
{cacertfile, CAFile},
{verify, Verify},
{server_name_indication, SNI},
{versions, Versions},
{ciphers, Ciphers},
{reuse_sessions, Get(reuse_sessions)},
{depth, Get(depth)},
{password, ensure_str(Get(password))},
{secure_renegotiate, Get(secure_renegotiate)}
],
Versions
);
false ->
[]
end.
@ -552,10 +556,35 @@ resolve_cert_path_for_read_strict(Path) ->
resolve_cert_path_for_read(Path) ->
emqx_schema:naive_env_interpolation(Path).
filter([]) -> [];
filter([{_, undefined} | T]) -> filter(T);
filter([{_, ""} | T]) -> filter(T);
filter([H | T]) -> [H | filter(T)].
filter([], _) ->
[];
filter([{_, undefined} | T], Versions) ->
filter(T, Versions);
filter([{_, ""} | T], Versions) ->
filter(T, Versions);
filter([{K, V} | T], Versions) ->
case tls_option_compatible_versions(K) of
all ->
[{K, V} | filter(T, Versions)];
CompatibleVersions ->
case CompatibleVersions -- (CompatibleVersions -- Versions) of
[] ->
filter(T, Versions);
_ ->
[{K, V} | filter(T, Versions)]
end
end.
tls_option_compatible_versions(reuse_sessions) ->
[dtlsv1, 'dtlsv1.2', 'tlsv1', 'tlsv1.1', 'tlsv1.2'];
tls_option_compatible_versions(secure_renegotiate) ->
[dtlsv1, 'dtlsv1.2', 'tlsv1', 'tlsv1.1', 'tlsv1.2'];
tls_option_compatible_versions(user_lookup_fun) ->
[dtlsv1, 'dtlsv1.2', 'tlsv1', 'tlsv1.1', 'tlsv1.2'];
tls_option_compatible_versions(client_renegotiation) ->
[dtlsv1, 'dtlsv1.2', 'tlsv1', 'tlsv1.1', 'tlsv1.2'];
tls_option_compatible_versions(_) ->
all.
-spec fuzzy_map_get(atom() | binary(), map(), any()) -> any().
fuzzy_map_get(Key, Options, Default) ->

View File

@ -21,6 +21,7 @@
%% APIs
-export([
match/2,
match_any/2,
validate/1,
validate/2,
levels/1,
@ -86,6 +87,12 @@ match([_H1 | _], []) ->
match([], [_H | _T2]) ->
false.
-spec match_any(Name, [Filter]) -> boolean() when
Name :: topic() | words(),
Filter :: topic() | words().
match_any(Topic, Filters) ->
lists:any(fun(Filter) -> match(Topic, Filter) end, Filters).
%% @doc Validate topic name or filter
-spec validate(topic() | {name | filter, topic()}) -> true.
validate(Topic) when is_binary(Topic) ->

View File

@ -14,25 +14,26 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_plugin_libs_proto_v1).
-module(emqx_metrics_proto_v1).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
get_metrics/3
get_metrics/4
]).
-include_lib("emqx/include/bpapi.hrl").
-include("bpapi.hrl").
introduced_in() ->
"5.0.0".
"5.1.0".
-spec get_metrics(
node(),
[node()],
emqx_metrics_worker:handler_name(),
emqx_metrics_worker:metric_id()
) -> emqx_metrics_worker:metrics() | {badrpc, _}.
get_metrics(Node, HandlerName, MetricId) ->
rpc:call(Node, emqx_metrics_worker, get_metrics, [HandlerName, MetricId]).
emqx_metrics_worker:metric_id(),
timeout()
) -> emqx_rpc:erpc_multicall(emqx_metrics_worker:metrics()).
get_metrics(Nodes, HandlerName, MetricId, Timeout) ->
erpc:multicall(Nodes, emqx_metrics_worker, get_metrics, [HandlerName, MetricId], Timeout).

View File

@ -51,8 +51,14 @@
"gen_rpc, recon, redbug, observer_cli, snabbkaffe, ekka, mria, amqp_client, rabbit_common"
).
-define(IGNORED_MODULES, "emqx_rpc").
-define(FORCE_DELETED_MODULES, [emqx_statsd, emqx_statsd_proto_v1]).
-define(FORCE_DELETED_APIS, [{emqx_statsd, 1}]).
-define(FORCE_DELETED_MODULES, [
emqx_statsd,
emqx_statsd_proto_v1
]).
-define(FORCE_DELETED_APIS, [
{emqx_statsd, 1},
{emqx_plugin_libs, 1}
]).
%% List of known RPC backend modules:
-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
%% List of known functions also known to do RPC:

View File

@ -224,6 +224,71 @@ ssl_file_deterministic_names_test() ->
),
_ = file:del_dir_r(filename:join(["/tmp", ?FUNCTION_NAME])).
to_client_opts_test() ->
VersionsAll = [tlsv1, 'tlsv1.1', 'tlsv1.2', 'tlsv1.3'],
Versions13Only = ['tlsv1.3'],
Options = #{
enable => true,
verify => "Verify",
server_name_indication => "SNI",
ciphers => "Ciphers",
depth => "depth",
password => "password",
versions => VersionsAll,
secure_renegotiate => "secure_renegotiate",
reuse_sessions => "reuse_sessions"
},
Expected1 = lists:usort(maps:keys(Options) -- [enable]),
?assertEqual(
Expected1, lists:usort(proplists:get_keys(emqx_tls_lib:to_client_opts(tls, Options)))
),
Expected2 =
lists:usort(
maps:keys(Options) --
[enable, reuse_sessions, secure_renegotiate]
),
?assertEqual(
Expected2,
lists:usort(
proplists:get_keys(
emqx_tls_lib:to_client_opts(tls, Options#{versions := Versions13Only})
)
)
),
Expected3 = lists:usort(maps:keys(Options) -- [enable, depth, password]),
?assertEqual(
Expected3,
lists:usort(
proplists:get_keys(
emqx_tls_lib:to_client_opts(tls, Options#{depth := undefined, password := ""})
)
)
).
to_server_opts_test() ->
VersionsAll = [tlsv1, 'tlsv1.1', 'tlsv1.2', 'tlsv1.3'],
Versions13Only = ['tlsv1.3'],
Options = #{
verify => "Verify",
ciphers => "Ciphers",
versions => VersionsAll,
user_lookup_fun => "funfunfun",
client_renegotiation => "client_renegotiation"
},
Expected1 = lists:usort(maps:keys(Options)),
?assertEqual(
Expected1, lists:usort(proplists:get_keys(emqx_tls_lib:to_server_opts(tls, Options)))
),
Expected2 = lists:usort(maps:keys(Options) -- [user_lookup_fun, client_renegotiation]),
?assertEqual(
Expected2,
lists:usort(
proplists:get_keys(
emqx_tls_lib:to_server_opts(tls, Options#{versions := Versions13Only})
)
)
).
bin(X) -> iolist_to_binary(X).
test_key() ->

View File

@ -225,21 +225,19 @@ without_password(Credential, [Name | Rest]) ->
without_password(Credential, Rest)
end.
urlencode_var({var, _} = Var, Value) ->
emqx_http_lib:uri_encode(handle_var(Var, Value));
urlencode_var(Var, Value) ->
handle_var(Var, Value).
emqx_http_lib:uri_encode(handle_var(Var, Value)).
handle_var({var, _Name}, undefined) ->
handle_var(_Name, undefined) ->
<<>>;
handle_var({var, <<"peerhost">>}, PeerHost) ->
handle_var([<<"peerhost">>], PeerHost) ->
emqx_placeholder:bin(inet:ntoa(PeerHost));
handle_var(_, Value) ->
emqx_placeholder:bin(Value).
handle_sql_var({var, _Name}, undefined) ->
handle_sql_var(_Name, undefined) ->
<<>>;
handle_sql_var({var, <<"peerhost">>}, PeerHost) ->
handle_sql_var([<<"peerhost">>], PeerHost) ->
emqx_placeholder:bin(inet:ntoa(PeerHost));
handle_sql_var(_, Value) ->
emqx_placeholder:sql_data(Value).

View File

@ -188,21 +188,19 @@ convert_client_var({dn, DN}) -> {cert_subject, DN};
convert_client_var({protocol, Proto}) -> {proto_name, Proto};
convert_client_var(Other) -> Other.
urlencode_var({var, _} = Var, Value) ->
emqx_http_lib:uri_encode(handle_var(Var, Value));
urlencode_var(Var, Value) ->
handle_var(Var, Value).
emqx_http_lib:uri_encode(handle_var(Var, Value)).
handle_var({var, _Name}, undefined) ->
handle_var(_Name, undefined) ->
<<>>;
handle_var({var, <<"peerhost">>}, IpAddr) ->
handle_var([<<"peerhost">>], IpAddr) ->
inet_parse:ntoa(IpAddr);
handle_var(_Name, Value) ->
emqx_placeholder:bin(Value).
handle_sql_var({var, _Name}, undefined) ->
handle_sql_var(_Name, undefined) ->
<<>>;
handle_sql_var({var, <<"peerhost">>}, IpAddr) ->
handle_sql_var([<<"peerhost">>], IpAddr) ->
inet_parse:ntoa(IpAddr);
handle_sql_var(_Name, Value) ->
emqx_placeholder:sql_data(Value).

View File

@ -81,7 +81,7 @@ t_compile(_) ->
{{127, 0, 0, 1}, {127, 0, 0, 1}, 32},
{{192, 168, 1, 0}, {192, 168, 1, 255}, 24}
]},
subscribe, [{pattern, [{var, {var, <<"clientid">>}}]}]},
subscribe, [{pattern, [{var, [<<"clientid">>]}]}]},
emqx_authz_rule:compile(?SOURCE3)
),
@ -99,14 +99,14 @@ t_compile(_) ->
{clientid, {re_pattern, _, _, _, _}}
]},
publish, [
{pattern, [{var, {var, <<"username">>}}]}, {pattern, [{var, {var, <<"clientid">>}}]}
{pattern, [{var, [<<"username">>]}]}, {pattern, [{var, [<<"clientid">>]}]}
]},
emqx_authz_rule:compile(?SOURCE5)
),
?assertEqual(
{allow, {username, {eq, <<"test">>}}, publish, [
{pattern, [{str, <<"t/foo">>}, {var, {var, <<"username">>}}, {str, <<"boo">>}]}
{pattern, [{str, <<"t/foo">>}, {var, [<<"username">>]}, {str, <<"boo">>}]}
]},
emqx_authz_rule:compile(?SOURCE6)
),

View File

@ -273,10 +273,10 @@ proc_cql_params(
%% assert
_PreparedKey = maps:get(PreparedKey0, Prepares),
Tokens = maps:get(PreparedKey0, ParamsTokens),
{PreparedKey0, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))};
{PreparedKey0, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))};
proc_cql_params(query, SQL, Params, _State) ->
{SQL1, Tokens} = emqx_plugin_libs_rule:preproc_sql(SQL, '?'),
{SQL1, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}.
{SQL1, Tokens} = emqx_placeholder:preproc_sql(SQL, '?'),
{SQL1, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}.
exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when
Type == query; Type == prepared_query
@ -403,7 +403,7 @@ parse_prepare_cql(_) ->
#{prepare_cql => #{}, params_tokens => #{}}.
parse_prepare_cql([{Key, H} | T], Prepares, Tokens) ->
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '?'),
{PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '?'),
parse_prepare_cql(
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
);

View File

@ -193,7 +193,7 @@ prepare_sql_templates(#{
batch_value_separator := Separator
}) ->
InsertTemplate =
emqx_plugin_libs_rule:preproc_tmpl(Template),
emqx_placeholder:preproc_tmpl(Template),
BulkExtendInsertTemplate =
prepare_sql_bulk_extend_template(Template, Separator),
#{
@ -210,9 +210,9 @@ prepare_sql_bulk_extend_template(Template, Separator) ->
%% Add separator before ValuesTemplate so that one can append it
%% to an insert template
ExtendParamTemplate = iolist_to_binary([Separator, ValuesTemplate]),
emqx_plugin_libs_rule:preproc_tmpl(ExtendParamTemplate).
emqx_placeholder:preproc_tmpl(ExtendParamTemplate).
%% This function is similar to emqx_plugin_libs_rule:split_insert_sql/1 but can
%% This function is similar to emqx_utils_sql:parse_insert/1 but can
%% also handle Clickhouse's SQL extension for INSERT statments that allows the
%% user to specify different formats:
%%
@ -363,7 +363,7 @@ on_query(
transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL).
get_sql(send_message, #{send_message_template := PreparedSQL}, Data) ->
emqx_plugin_libs_rule:proc_tmpl(PreparedSQL, Data);
emqx_placeholder:proc_tmpl(PreparedSQL, Data);
get_sql(_, _, SQL) ->
SQL.
@ -421,10 +421,10 @@ objects_to_sql(
}
) ->
%% Prepare INSERT-statement and the first row after VALUES
InsertStatementHead = emqx_plugin_libs_rule:proc_tmpl(InsertTemplate, FirstObject),
InsertStatementHead = emqx_placeholder:proc_tmpl(InsertTemplate, FirstObject),
FormatObjectDataFunction =
fun(Object) ->
emqx_plugin_libs_rule:proc_tmpl(BulkExtendInsertTemplate, Object)
emqx_placeholder:proc_tmpl(BulkExtendInsertTemplate, Object)
end,
InsertStatementTail = lists:map(FormatObjectDataFunction, RemainingObjects),
CompleteStatement = erlang:iolist_to_binary([InsertStatementHead, InsertStatementTail]),
@ -450,10 +450,14 @@ execute_sql_in_clickhouse_server_using_connection(Connection, SQL) ->
%% This function transforms the result received from clickhouse to something
%% that is a little bit more readable and creates approprieate log messages
transform_and_log_clickhouse_result({ok, 200, <<"">>} = _ClickhouseResult, _, _) ->
transform_and_log_clickhouse_result({ok, ResponseCode, <<"">>} = _ClickhouseResult, _, _) when
ResponseCode =:= 200; ResponseCode =:= 204
->
snabbkaffe_log_return(ok),
ok;
transform_and_log_clickhouse_result({ok, 200, Data}, _, _) ->
transform_and_log_clickhouse_result({ok, ResponseCode, Data}, _, _) when
ResponseCode =:= 200; ResponseCode =:= 204
->
Result = {ok, Data},
snabbkaffe_log_return(Result),
Result;
@ -464,13 +468,58 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) ->
sql => SQL,
reason => ClickhouseErrorResult
}),
case ClickhouseErrorResult of
{error, ecpool_empty} ->
{error, {recoverable_error, ecpool_empty}};
_ ->
{error, ClickhouseErrorResult}
case is_recoverable_error(ClickhouseErrorResult) of
%% TODO: The hackeny errors that the clickhouse library forwards are
%% very loosely defined. We should try to make sure that the following
%% handles all error cases that we need to handle as recoverable_error
true ->
?SLOG(warning, #{
msg => "clickhouse connector: sql query failed (recoverable)",
recoverable_error => true,
connector => ResourceID,
sql => SQL,
reason => ClickhouseErrorResult
}),
to_recoverable_error(ClickhouseErrorResult);
false ->
?SLOG(error, #{
msg => "clickhouse connector: sql query failed (unrecoverable)",
recoverable_error => false,
connector => ResourceID,
sql => SQL,
reason => ClickhouseErrorResult
}),
to_error_tuple(ClickhouseErrorResult)
end.
to_recoverable_error({error, Reason}) ->
{error, {recoverable_error, Reason}};
to_recoverable_error(Error) ->
{error, {recoverable_error, Error}}.
to_error_tuple({error, Reason}) ->
{error, {unrecoverable_error, Reason}};
to_error_tuple(Error) ->
{error, {unrecoverable_error, Error}}.
is_recoverable_error({error, Reason}) ->
is_recoverable_error_reason(Reason);
is_recoverable_error(_) ->
false.
is_recoverable_error_reason(ecpool_empty) ->
true;
is_recoverable_error_reason(econnrefused) ->
true;
is_recoverable_error_reason(closed) ->
true;
is_recoverable_error_reason({closed, _PartialBody}) ->
true;
is_recoverable_error_reason(disconnected) ->
true;
is_recoverable_error_reason(_) ->
false.
snabbkaffe_log_return(_Result) ->
?tp(
clickhouse_connector_query_return,

View File

@ -17,7 +17,6 @@
%% `emqx_resource' API
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_query/3,
@ -64,8 +63,6 @@ fields(config) ->
callback_mode() -> always_sync.
is_buffer_supported() -> false.
on_start(
InstanceId,
#{
@ -200,7 +197,7 @@ parse_template(Config) ->
parse_template(maps:to_list(Templates), #{}).
parse_template([{Key, H} | T], Templates) ->
ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(H),
ParamsTks = emqx_placeholder:preproc_tmpl(H),
parse_template(
T,
Templates#{Key => ParamsTks}

View File

@ -144,7 +144,7 @@ apply_template({Key, Msg} = Req, Templates) ->
undefined ->
Req;
Template ->
{Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)}
{Key, emqx_placeholder:proc_tmpl(Template, Msg)}
end;
%% now there is no batch delete, so
%% 1. we can simply replace the `send_message` to `put`

View File

@ -22,8 +22,7 @@
on_query_async/4,
on_batch_query/3,
on_batch_query_async/4,
on_get_status/2,
is_buffer_supported/0
on_get_status/2
]).
-export([reply_delegator/3]).
@ -40,7 +39,7 @@
connect_timeout := timer:time(),
jwt_config := emqx_connector_jwt:jwt_config(),
max_retries := non_neg_integer(),
payload_template := emqx_plugin_libs_rule:tmpl_token(),
payload_template := emqx_placeholder:tmpl_token(),
pool_name := binary(),
project_id := binary(),
pubsub_topic := binary(),
@ -56,8 +55,6 @@
%% emqx_resource API
%%-------------------------------------------------------------------------------------------------
is_buffer_supported() -> false.
callback_mode() -> async_if_possible.
-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.
@ -104,7 +101,7 @@ on_start(
connect_timeout => ConnectTimeout,
jwt_config => JWTConfig,
max_retries => MaxRetries,
payload_template => emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate),
payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
pool_name => ResourceId,
project_id => ProjectId,
pubsub_topic => PubSubTopic,
@ -294,7 +291,7 @@ encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) ->
Interpolated =
case PayloadTemplate of
[] -> emqx_utils_json:encode(Selected);
_ -> emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, Selected)
_ -> emqx_placeholder:proc_tmpl(PayloadTemplate, Selected)
end,
#{data => base64:encode(Interpolated)}.

View File

@ -436,7 +436,7 @@ to_config([Item0 | Rest], Acc, Precision) ->
Ts0 = maps:get(timestamp, Item0, undefined),
{Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision),
Item = #{
measurement => emqx_plugin_libs_rule:preproc_tmpl(maps:get(measurement, Item0)),
measurement => emqx_placeholder:preproc_tmpl(maps:get(measurement, Item0)),
timestamp => Ts,
precision => {FromPrecision, ToPrecision},
tags => to_kv_config(maps:get(tags, Item0)),
@ -458,18 +458,18 @@ preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) ->
preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) ->
preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision);
preproc_tmpl_timestamp(<<?DEFAULT_TIMESTAMP_TMPL>> = Ts, Precision) ->
{emqx_plugin_libs_rule:preproc_tmpl(Ts), ms, Precision};
{emqx_placeholder:preproc_tmpl(Ts), ms, Precision};
preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) ->
%% a placehold is in use. e.g. ${payload.my_timestamp}
%% we can only hope it the value will be of the same precision in the configs
{emqx_plugin_libs_rule:preproc_tmpl(Ts), Precision, Precision}.
{emqx_placeholder:preproc_tmpl(Ts), Precision, Precision}.
to_kv_config(KVfields) ->
maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)).
to_maps_config(K, V, Res) ->
NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)),
NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)),
NK = emqx_placeholder:preproc_tmpl(bin(K)),
NV = emqx_placeholder:preproc_tmpl(bin(V)),
Res#{NK => NV}.
%% -------------------------------------------------------------------------------------------------
@ -505,7 +505,7 @@ parse_batch_data(InstId, BatchData, SyntaxLines) ->
fields := [{binary(), binary()}],
measurement := binary(),
tags := [{binary(), binary()}],
timestamp := emqx_plugin_libs_rule:tmpl_token() | integer(),
timestamp := emqx_placeholder:tmpl_token() | integer(),
precision := {From :: ts_precision(), To :: ts_precision()}
}
]) -> {ok, [map()]} | {error, term()}.
@ -526,7 +526,7 @@ lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, Error
is_list(Ts)
->
TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
case parse_timestamp(emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions)) of
case parse_timestamp(emqx_placeholder:proc_tmpl(Ts, Data, TransOptions)) of
{ok, TsInt} ->
Item1 = Item#{timestamp => TsInt},
continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc);
@ -573,7 +573,7 @@ line_to_point(
{_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags),
{_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields),
maps:without([precision], Item#{
measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data),
measurement => emqx_placeholder:proc_tmpl(Measurement, Data),
tags => EncodedTags,
fields => EncodedFields,
timestamp => maybe_convert_time_unit(Ts, Precision)
@ -590,8 +590,8 @@ time_unit(ns) -> nanosecond.
maps_config_to_data(K, V, {Data, Res}) ->
KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},
VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
NK0 = emqx_plugin_libs_rule:proc_tmpl(K, Data, KTransOptions),
NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, VTransOptions),
NK0 = emqx_placeholder:proc_tmpl(K, Data, KTransOptions),
NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions),
case {NK0, NV} of
{[undefined], _} ->
{Data, Res};
@ -637,7 +637,7 @@ value_type(Val) ->
Val.
key_filter(undefined) -> undefined;
key_filter(Value) -> emqx_plugin_libs_rule:bin(Value).
key_filter(Value) -> emqx_utils_conv:bin(Value).
data_filter(undefined) -> undefined;
data_filter(Int) when is_integer(Int) -> Int;
@ -645,7 +645,7 @@ data_filter(Number) when is_number(Number) -> Number;
data_filter(Bool) when is_boolean(Bool) -> Bool;
data_filter(Data) -> bin(Data).
bin(Data) -> emqx_plugin_libs_rule:bin(Data).
bin(Data) -> emqx_utils_conv:bin(Data).
%% helper funcs
log_error_points(InstId, Errs) ->

View File

@ -185,7 +185,7 @@ preproc_data(
timestamp => maybe_preproc_tmpl(
maps:get(<<"timestamp">>, Data, <<"now">>)
),
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
measurement => emqx_placeholder:preproc_tmpl(Measurement),
data_type => DataType,
value => maybe_preproc_tmpl(Value)
}
@ -203,7 +203,7 @@ preproc_data(_NoMatch, Acc) ->
Acc.
maybe_preproc_tmpl(Value) when is_binary(Value) ->
emqx_plugin_libs_rule:preproc_tmpl(Value);
emqx_placeholder:preproc_tmpl(Value);
maybe_preproc_tmpl(Value) ->
Value.
@ -225,7 +225,7 @@ proc_data(PreProcessedData, Msg) ->
) ->
#{
timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Msg),
measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
data_type => DataType,
value => proc_value(DataType, ValueTkn, Msg)
}
@ -236,7 +236,7 @@ proc_data(PreProcessedData, Msg) ->
iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
Timestamp;
iot_timestamp(TimestampTkn, Msg, Nows) ->
iot_timestamp(emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows).
iot_timestamp(emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
iot_timestamp(Timestamp, #{now_ms := NowMs}) when
Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
@ -250,7 +250,7 @@ iot_timestamp(Timestamp, _) when is_binary(Timestamp) ->
binary_to_integer(Timestamp).
proc_value(<<"TEXT">>, ValueTkn, Msg) ->
case emqx_plugin_libs_rule:proc_tmpl(ValueTkn, Msg) of
case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of
<<"undefined">> -> null;
Val -> Val
end;
@ -262,7 +262,7 @@ proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> ->
convert_float(replace_var(ValueTkn, Msg)).
replace_var(Tokens, Data) when is_list(Tokens) ->
[Val] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
[Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
Val;
replace_var(Val, _Data) ->
Val.
@ -410,8 +410,8 @@ device_id(Message, Payloads, State) ->
%% [FIXME] there could be conflicting device-ids in the Payloads
maps:get(<<"device_id">>, hd(Payloads), undefined);
DeviceId ->
DeviceIdTkn = emqx_plugin_libs_rule:preproc_tmpl(DeviceId),
emqx_plugin_libs_rule:proc_tmpl(DeviceIdTkn, Message)
DeviceIdTkn = emqx_placeholder:preproc_tmpl(DeviceId),
emqx_placeholder:proc_tmpl(DeviceIdTkn, Message)
end.
handle_response({ok, 200, _Headers, Body} = Resp) ->

View File

@ -301,7 +301,23 @@ fields(producer_kafka_opts) ->
mk(ref(producer_buffer), #{
required => false,
desc => ?DESC(producer_buffer)
})}
})},
{query_mode,
mk(
enum([async, sync]),
#{
default => async,
desc => ?DESC(query_mode)
}
)},
{sync_query_timeout,
mk(
emqx_schema:timeout_duration_ms(),
#{
default => <<"5s">>,
desc => ?DESC(sync_query_timeout)
}
)}
];
fields(kafka_message) ->
[

View File

@ -8,7 +8,7 @@
%% `emqx_resource' API
-export([
callback_mode/0,
is_buffer_supported/0,
query_mode/1,
on_start/2,
on_stop/2,
on_get_status/2
@ -68,7 +68,7 @@
resource_id := resource_id(),
topic_mapping := #{
kafka_topic() := #{
payload_template := emqx_plugin_libs_rule:tmpl_token(),
payload_template := emqx_placeholder:tmpl_token(),
mqtt_topic => emqx_types:topic(),
qos => emqx_types:qos()
}
@ -82,7 +82,7 @@
resource_id := resource_id(),
topic_mapping := #{
kafka_topic() := #{
payload_template := emqx_plugin_libs_rule:tmpl_token(),
payload_template := emqx_placeholder:tmpl_token(),
mqtt_topic => emqx_types:topic(),
qos => emqx_types:qos()
}
@ -112,11 +112,9 @@
callback_mode() ->
async_if_possible.
%% there are no queries to be made to this bridge, so we say that
%% buffer is supported so we don't spawn unused resource buffer
%% workers.
is_buffer_supported() ->
true.
%% consumer bridges don't need resource workers
query_mode(_Config) ->
no_queries.
-spec on_start(resource_id(), config()) -> {ok, state()}.
on_start(ResourceId, Config) ->
@ -539,7 +537,7 @@ convert_topic_mapping(TopicMappingList) ->
qos := QoS,
payload_template := PayloadTemplate0
} = Fields,
PayloadTemplate = emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate0),
PayloadTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate0),
Acc#{
KafkaTopic => #{
payload_template => PayloadTemplate,
@ -559,10 +557,10 @@ render(FullMessage, PayloadTemplate) ->
(undefined) ->
<<>>;
(X) ->
emqx_plugin_libs_rule:bin(X)
emqx_utils_conv:bin(X)
end
},
emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, FullMessage, Opts).
emqx_placeholder:proc_tmpl(PayloadTemplate, FullMessage, Opts).
encode(Value, none) ->
Value;

View File

@ -4,10 +4,11 @@
-module(emqx_bridge_kafka_impl_producer).
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
%% callbacks of behaviour emqx_resource
-export([
is_buffer_supported/0,
query_mode/1,
callback_mode/0,
on_start/2,
on_stop/2,
@ -32,7 +33,10 @@
%% to hocon; keeping this as just `kafka' for backwards compatibility.
-define(BRIDGE_TYPE, kafka).
is_buffer_supported() -> true.
query_mode(#{kafka := #{query_mode := sync}}) ->
simple_sync;
query_mode(_) ->
simple_async.
callback_mode() -> async_if_possible.
@ -43,7 +47,11 @@ on_start(InstId, Config) ->
bootstrap_hosts := Hosts0,
bridge_name := BridgeName,
connect_timeout := ConnTimeout,
kafka := KafkaConfig = #{message := MessageTemplate, topic := KafkaTopic},
kafka := KafkaConfig = #{
message := MessageTemplate,
topic := KafkaTopic,
sync_query_timeout := SyncQueryTimeout
},
metadata_request_timeout := MetaReqTimeout,
min_metadata_refresh_interval := MinMetaRefreshInterval,
socket_opts := SocketOpts,
@ -99,7 +107,8 @@ on_start(InstId, Config) ->
client_id => ClientId,
kafka_topic => KafkaTopic,
producers => Producers,
resource_id => ResourceId
resource_id => ResourceId,
sync_query_timeout => SyncQueryTimeout
}};
{error, Reason2} ->
?SLOG(error, #{
@ -189,14 +198,16 @@ on_stop(InstanceId, _State) ->
on_query(
_InstId,
{send_message, Message},
#{message_template := Template, producers := Producers}
#{
message_template := Template,
producers := Producers,
sync_query_timeout := SyncTimeout
}
) ->
?tp(emqx_bridge_kafka_impl_producer_sync_query, #{}),
KafkaMessage = render_message(Template, Message),
%% TODO: this function is not used so far,
%% timeout should be configurable
%% or the on_query/3 should be on_query/4 instead.
try
{_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], 5000),
{_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout),
ok
catch
error:{producer_down, _} = Reason ->
@ -217,6 +228,7 @@ on_query_async(
AsyncReplyFn,
#{message_template := Template, producers := Producers}
) ->
?tp(emqx_bridge_kafka_impl_producer_async_query, #{}),
KafkaMessage = render_message(Template, Message),
%% * Must be a batch because wolff:send and wolff:send_sync are batch APIs
%% * Must be a single element batch because wolff books calls, but not batch sizes
@ -240,7 +252,7 @@ compile_message_template(T) ->
}.
preproc_tmpl(Tmpl) ->
emqx_plugin_libs_rule:preproc_tmpl(Tmpl).
emqx_placeholder:preproc_tmpl(Tmpl).
render_message(
#{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message
@ -255,11 +267,11 @@ render(Template, Message) ->
Opts = #{
var_trans => fun
(undefined) -> <<"">>;
(X) -> emqx_plugin_libs_rule:bin(X)
(X) -> emqx_utils_conv:bin(X)
end,
return => full_binary
},
emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts).
emqx_placeholder:proc_tmpl(Template, Message, Opts).
render_timestamp(Template, Message) ->
try

View File

@ -120,6 +120,34 @@ set_special_configs(emqx_dashboard) ->
ok;
set_special_configs(_) ->
ok.
%%------------------------------------------------------------------------------
%% Test case for the query_mode parameter
%%------------------------------------------------------------------------------
t_query_mode(CtConfig) ->
%% We need this because on_query_async is in a different group
CtConfig1 = [{query_api, none} | CtConfig],
?check_trace(
begin
publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "sync"})
end,
fun(RunStageResult, Trace) ->
%% We should have a sync Snabbkaffe trace
?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_sync_query, Trace))
end
),
?check_trace(
begin
publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "async"})
end,
fun(RunStageResult, Trace) ->
%% We should have a sync Snabbkaffe trace
?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_async_query, Trace))
end
),
ok.
%%------------------------------------------------------------------------------
%% Test cases for all combinations of SSL, no SSL and authentication types
%%------------------------------------------------------------------------------
@ -473,6 +501,16 @@ do_send(Ref, Config, ResourceId, Msg, State) when is_list(Config) ->
ok
end.
publish_with_config_template_parameters(CtConfig, ConfigTemplateParameters) ->
publish_helper(
CtConfig,
#{
auth_settings => "none",
ssl_settings => #{}
},
ConfigTemplateParameters
).
publish_with_and_without_ssl(CtConfig, AuthSettings) ->
publish_with_and_without_ssl(CtConfig, AuthSettings, #{}).
@ -537,21 +575,25 @@ publish_helper(
{ok, _} = emqx_bridge:create(
<<?BRIDGE_TYPE>>, list_to_binary(Name), Conf
),
Time = erlang:unique_integer(),
BinTime = integer_to_binary(Time),
Partition = 0,
Msg = #{
clientid => BinTime,
payload => <<"payload">>,
timestamp => Time
},
{ok, Offset0} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition),
ct:pal("base offset before testing ~p", [Offset0]),
{ok, _Group, #{state := State}} = emqx_resource:get_instance(InstId),
ok = send(CtConfig, InstId, Msg, State),
{ok, {_, [KafkaMsg0]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset0),
?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0),
case proplists:get_value(query_api, CtConfig) of
none ->
ok;
_ ->
Time = erlang:unique_integer(),
BinTime = integer_to_binary(Time),
Msg = #{
clientid => BinTime,
payload => <<"payload">>,
timestamp => Time
},
{ok, Offset0} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition),
ct:pal("base offset before testing ~p", [Offset0]),
{ok, _Group, #{state := State}} = emqx_resource:get_instance(InstId),
ok = send(CtConfig, InstId, Msg, State),
{ok, {_, [KafkaMsg0]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset0),
?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0)
end,
%% test that it forwards from local mqtt topic as well
{ok, Offset1} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition),
ct:pal("base offset before testing (2) ~p", [Offset1]),
@ -596,13 +638,15 @@ hocon_config(Args) ->
AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf),
SSLConf = maps:get("ssl", Args, #{}),
SSLTemplate = iolist_to_binary(hocon_config_template_ssl(SSLConf)),
QueryMode = maps:get("query_mode", Args, <<"async">>),
SSLConfRendered = bbmustache:render(SSLTemplate, SSLConf),
Hocon = bbmustache:render(
iolist_to_binary(hocon_config_template()),
Args#{
"authentication" => AuthConfRendered,
"bridge_name" => Name,
"ssl" => SSLConfRendered
"ssl" => SSLConfRendered,
"query_mode" => QueryMode
}
),
Hocon.
@ -630,6 +674,7 @@ bridges.kafka.{{ bridge_name }} {
}
partition_strategy = {{ partition_strategy }}
topic = \"{{ kafka_topic }}\"
query_mode = {{ query_mode }}
}
metadata_request_timeout = 5s
min_metadata_refresh_interval = 3s

View File

@ -21,7 +21,7 @@
-export_type([msgvars/0]).
-type template() :: emqx_plugin_libs_rule:tmpl_token().
-type template() :: emqx_placeholder:tmpl_token().
-type msgvars() :: #{
topic => template(),
@ -48,7 +48,7 @@ parse(Conf) ->
parse_field(Key, Conf, Acc) ->
case Conf of
#{Key := Val} when is_binary(Val) ->
Acc#{Key => emqx_plugin_libs_rule:preproc_tmpl(Val)};
Acc#{Key => emqx_placeholder:preproc_tmpl(Val)};
#{Key := Val} ->
Acc#{Key => Val};
#{} ->

View File

@ -17,7 +17,6 @@
%% `emqx_resource' API
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_query/3,
@ -47,9 +46,9 @@ fields(config) ->
%% `emqx_resource' API
%%========================================================================================
callback_mode() -> always_sync.
-define(HTTP_CONNECT_TIMEOUT, 1000).
is_buffer_supported() -> false.
callback_mode() -> always_sync.
on_start(
InstanceId,
@ -171,7 +170,7 @@ opentsdb_connectivity(Server) ->
<<"https://", _/binary>> -> Server;
_ -> "http://" ++ Server
end,
emqx_plugin_libs_rule:http_connectivity(SvrUrl).
emqx_connector_lib:http_connectivity(SvrUrl, ?HTTP_CONNECT_TIMEOUT).
format_opentsdb_msg(Msg) ->
maps:with(

View File

@ -11,7 +11,7 @@
%% `emqx_resource' API
-export([
callback_mode/0,
is_buffer_supported/0,
query_mode/1,
on_start/2,
on_stop/2,
on_get_status/2,
@ -34,8 +34,8 @@
value := binary()
}.
-type message_template() :: #{
key := emqx_plugin_libs_rule:tmpl_token(),
value := emqx_plugin_libs_rule:tmpl_token()
key := emqx_placeholder:tmpl_token(),
value := emqx_placeholder:tmpl_token()
}.
-type config() :: #{
authentication := _,
@ -70,10 +70,8 @@
callback_mode() -> async_if_possible.
%% there are no queries to be made to this bridge, so we say that
%% buffer is supported so we don't spawn unused resource buffer
%% workers.
is_buffer_supported() -> true.
query_mode(_Config) ->
simple_async.
-spec on_start(resource_id(), config()) -> {ok, state()}.
on_start(InstanceId, Config) ->
@ -421,7 +419,7 @@ compile_message_template(TemplateOpts) ->
}.
preproc_tmpl(Template) ->
emqx_plugin_libs_rule:preproc_tmpl(Template).
emqx_placeholder:preproc_tmpl(Template).
render_message(
Message, #{key := KeyTemplate, value := ValueTemplate}
@ -435,11 +433,11 @@ render(Message, Template) ->
Opts = #{
var_trans => fun
(undefined) -> <<"">>;
(X) -> emqx_plugin_libs_rule:bin(X)
(X) -> emqx_utils_conv:bin(X)
end,
return => full_binary
},
emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts).
emqx_placeholder:proc_tmpl(Template, Message, Opts).
get_producer_status(Producers) ->
case pulsar_producers:all_connected(Producers) of

View File

@ -34,7 +34,6 @@
%% Optional callbacks
on_get_status/2,
on_query/3,
is_buffer_supported/0,
on_batch_query/3
]).
@ -187,11 +186,6 @@ callback_mode() -> always_sync.
%% emqx_resource callback
-spec is_buffer_supported() -> boolean().
is_buffer_supported() ->
%% We want to make use of EMQX's buffer mechanism
false.
%% emqx_resource callback called when the resource is started
-spec on_start(resource_id(), term()) -> {ok, resource_state()} | {error, _}.
@ -225,7 +219,7 @@ on_start(
{pool_size, PoolSize},
{pool, InstanceID}
],
ProcessedTemplate = emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate),
ProcessedTemplate = emqx_placeholder:preproc_tmpl(PayloadTemplate),
State = #{
poolname => InstanceID,
processed_payload_template => ProcessedTemplate,
@ -547,7 +541,7 @@ is_send_message_atom(_) ->
format_data([], Msg) ->
emqx_utils_json:encode(Msg);
format_data(Tokens, Msg) ->
emqx_plugin_libs_rule:proc_tmpl(Tokens, Msg).
emqx_placeholder:proc_tmpl(Tokens, Msg).
handle_result({error, ecpool_empty}) ->
{error, {recoverable_error, ecpool_empty}};

View File

@ -17,7 +17,6 @@
%% `emqx_resource' API
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_query/3,
@ -86,8 +85,6 @@ servers() ->
callback_mode() -> always_sync.
is_buffer_supported() -> false.
on_start(
InstanceId,
#{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config
@ -102,7 +99,7 @@ on_start(
emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS)
),
ClientId = client_id(InstanceId),
TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic),
TopicTks = emqx_placeholder:preproc_tmpl(Topic),
#{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config),
ClientCfg = #{acl_info => AclInfo},
Templates = parse_template(Config),
@ -240,7 +237,7 @@ parse_template(Config) ->
parse_template(maps:to_list(Templates), #{}).
parse_template([{Key, H} | T], Templates) ->
ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(H),
ParamsTks = emqx_placeholder:preproc_tmpl(H),
parse_template(
T,
Templates#{Key => ParamsTks}
@ -249,7 +246,7 @@ parse_template([], Templates) ->
Templates.
get_topic_key({_, Msg}, TopicTks) ->
emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg);
emqx_placeholder:proc_tmpl(TopicTks, Msg);
get_topic_key([Query | _], TopicTks) ->
get_topic_key(Query, TopicTks).
@ -258,14 +255,14 @@ apply_template({Key, Msg} = _Req, Templates) ->
undefined ->
emqx_utils_json:encode(Msg);
Template ->
emqx_plugin_libs_rule:proc_tmpl(Template, Msg)
emqx_placeholder:proc_tmpl(Template, Msg)
end;
apply_template([{Key, _} | _] = Reqs, Templates) ->
case maps:get(Key, Templates, undefined) of
undefined ->
[emqx_utils_json:encode(Msg) || {_, Msg} <- Reqs];
Template ->
[emqx_plugin_libs_rule:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs]
[emqx_placeholder:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs]
end.
client_id(ResourceId) ->

View File

@ -30,7 +30,6 @@
%% callbacks for behaviour emqx_resource
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_query/3,
@ -44,7 +43,7 @@
%% Internal exports used to execute code with ecpool worker
-export([do_get_status/1, worker_do_insert/3]).
-import(emqx_plugin_libs_rule, [str/1]).
-import(emqx_utils_conv, [str/1]).
-import(hoconsc, [mk/2, enum/1, ref/2]).
-define(ACTION_SEND_MESSAGE, send_message).
@ -169,8 +168,6 @@ server() ->
callback_mode() -> always_sync.
is_buffer_supported() -> false.
on_start(
ResourceId = PoolName,
#{
@ -443,11 +440,11 @@ parse_sql_template(Config) ->
parse_sql_template(maps:to_list(RawSQLTemplates), BatchInsertTks).
parse_sql_template([{Key, H} | T], BatchInsertTks) ->
case emqx_plugin_libs_rule:detect_sql_type(H) of
{ok, select} ->
case emqx_utils_sql:get_statement_type(H) of
select ->
parse_sql_template(T, BatchInsertTks);
{ok, insert} ->
case emqx_plugin_libs_rule:split_insert_sql(H) of
insert ->
case emqx_utils_sql:parse_insert(H) of
{ok, {InsertSQL, Params}} ->
parse_sql_template(
T,
@ -455,9 +452,7 @@ parse_sql_template([{Key, H} | T], BatchInsertTks) ->
Key =>
#{
?BATCH_INSERT_PART => InsertSQL,
?BATCH_PARAMS_TOKENS => emqx_plugin_libs_rule:preproc_tmpl(
Params
)
?BATCH_PARAMS_TOKENS => emqx_placeholder:preproc_tmpl(Params)
}
}
);
@ -465,6 +460,9 @@ parse_sql_template([{Key, H} | T], BatchInsertTks) ->
?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}),
parse_sql_template(T, BatchInsertTks)
end;
Type when is_atom(Type) ->
?SLOG(error, #{msg => "detect sql type unsupported", sql => H, type => Type}),
parse_sql_template(T, BatchInsertTks);
{error, Reason} ->
?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}),
parse_sql_template(T, BatchInsertTks)
@ -478,7 +476,7 @@ parse_sql_template([], BatchInsertTks) ->
apply_template(
{?ACTION_SEND_MESSAGE = _Key, _Msg} = Query, Templates
) ->
%% TODO: fix emqx_plugin_libs_rule:proc_tmpl/2
%% TODO: fix emqx_placeholder:proc_tmpl/2
%% it won't add single quotes for string
apply_template([Query], Templates);
%% batch inserts
@ -490,10 +488,19 @@ apply_template(
undefined ->
BatchReqs;
#{?BATCH_INSERT_PART := BatchInserts, ?BATCH_PARAMS_TOKENS := BatchParamsTks} ->
SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks),
SQL = proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks),
{Key, SQL}
end;
apply_template(Query, Templates) ->
%% TODO: more detail infomatoin
?SLOG(error, #{msg => "apply sql template failed", query => Query, templates => Templates}),
{error, failed_to_apply_sql_template}.
proc_batch_sql(BatchReqs, BatchInserts, Tokens) ->
Values = erlang:iolist_to_binary(
lists:join($,, [
emqx_placeholder:proc_sql_param_str(Tokens, Msg)
|| {_, Msg} <- BatchReqs
])
),
<<BatchInserts/binary, " values ", Values/binary>>.

View File

@ -17,7 +17,6 @@
%% `emqx_resource' API
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_query/3,
@ -79,8 +78,6 @@ server() ->
callback_mode() -> always_sync.
is_buffer_supported() -> false.
on_start(
InstanceId,
#{
@ -126,8 +123,8 @@ on_query(InstanceId, {query, SQL}, State) ->
do_query(InstanceId, SQL, State);
on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) ->
case maps:find(Key, InsertTksMap) of
{ok, Tokens} ->
SQL = emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Data),
{ok, Tokens} when is_map(Data) ->
SQL = emqx_placeholder:proc_sql_param_str(Tokens, Data),
do_query(InstanceId, SQL, State);
_ ->
{error, {unrecoverable_error, invalid_request}}
@ -136,7 +133,7 @@ on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) ->
%% aggregate the batch queries to one SQL is a heavy job, we should put it in the worker process
on_batch_query(
InstanceId,
[{Key, _} | _] = BatchReq,
[{Key, _Data = #{}} | _] = BatchReq,
#{batch_tokens := BatchTksMap, query_opts := Opts} = State
) ->
case maps:find(Key, BatchTksMap) of
@ -231,8 +228,8 @@ do_batch_insert(Conn, Tokens, BatchReqs, Opts) ->
aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) ->
lists:foldl(
fun({_, Data}, Acc) ->
InsertPart = emqx_plugin_libs_rule:proc_sql_param_str(InsertPartTks, Data),
ParamsPart = emqx_plugin_libs_rule:proc_sql_param_str(ParamsPartTks, Data),
InsertPart = emqx_placeholder:proc_sql_param_str(InsertPartTks, Data),
ParamsPart = emqx_placeholder:proc_sql_param_str(ParamsPartTks, Data),
Values = maps:get(InsertPart, Acc, []),
maps:put(InsertPart, [ParamsPart | Values], Acc)
end,
@ -256,16 +253,16 @@ parse_prepare_sql(Config) ->
parse_batch_prepare_sql(maps:to_list(SQL), #{}, #{}).
parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) ->
case emqx_plugin_libs_rule:detect_sql_type(H) of
{ok, select} ->
case emqx_utils_sql:get_statement_type(H) of
select ->
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap);
{ok, insert} ->
InsertTks = emqx_plugin_libs_rule:preproc_tmpl(H),
insert ->
InsertTks = emqx_placeholder:preproc_tmpl(H),
H1 = string:trim(H, trailing, ";"),
case split_insert_sql(H1) of
[_InsertStr, InsertPart, _ValuesStr, ParamsPart] ->
InsertPartTks = emqx_plugin_libs_rule:preproc_tmpl(InsertPart),
ParamsPartTks = emqx_plugin_libs_rule:preproc_tmpl(ParamsPart),
InsertPartTks = emqx_placeholder:preproc_tmpl(InsertPart),
ParamsPartTks = emqx_placeholder:preproc_tmpl(ParamsPart),
parse_batch_prepare_sql(
T,
InsertTksMap#{Key => InsertTks},
@ -275,6 +272,9 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) ->
?SLOG(error, #{msg => "split sql failed", sql => H, result => Result}),
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap)
end;
Type when is_atom(Type) ->
?SLOG(error, #{msg => "detect sql type unsupported", sql => H, type => Type}),
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap);
{error, Reason} ->
?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}),
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap)
@ -289,7 +289,7 @@ to_bin(List) when is_list(List) ->
unicode:characters_to_binary(List, utf8).
split_insert_sql(SQL0) ->
SQL = emqx_plugin_libs_rule:formalize_sql(SQL0),
SQL = formalize_sql(SQL0),
lists:filtermap(
fun(E) ->
case string:trim(E) of
@ -301,3 +301,9 @@ split_insert_sql(SQL0) ->
end,
re:split(SQL, "(?i)(insert into)|(?i)(values)")
).
formalize_sql(Input) ->
%% 1. replace all whitespaces like '\r' '\n' or spaces to a single space char.
SQL = re:replace(Input, "\\s+", " ", [global, {return, binary}]),
%% 2. trims the result
string:trim(SQL).

View File

@ -496,7 +496,7 @@ t_simple_sql_query(Config) ->
),
case EnableBatch of
true ->
?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
false ->
?assertMatch({ok, #{<<"code">> := 0, <<"data">> := [[1]]}}, Result)
end,
@ -535,7 +535,7 @@ t_bad_sql_parameter(Config) ->
2_000
),
?assertMatch({error, #{<<"code">> := _}}, Result),
?assertMatch({error, {unrecoverable_error, invalid_request}}, Result),
ok.
t_nasty_sql_string(Config) ->

View File

@ -320,10 +320,16 @@ typename_to_spec("atom()", _Mod) ->
#{type => string};
typename_to_spec("duration()", _Mod) ->
#{type => duration};
typename_to_spec("timeout_duration()", _Mod) ->
#{type => duration};
typename_to_spec("duration_s()", _Mod) ->
#{type => duration};
typename_to_spec("timeout_duration_s()", _Mod) ->
#{type => duration};
typename_to_spec("duration_ms()", _Mod) ->
#{type => duration};
typename_to_spec("timeout_duration_ms()", _Mod) ->
#{type => duration};
typename_to_spec("percent()", _Mod) ->
#{type => percent};
typename_to_spec("file()", _Mod) ->

View File

@ -464,8 +464,8 @@ preprocess_request(
} = Req
) ->
#{
method => emqx_plugin_libs_rule:preproc_tmpl(to_bin(Method)),
path => emqx_plugin_libs_rule:preproc_tmpl(Path),
method => emqx_placeholder:preproc_tmpl(to_bin(Method)),
path => emqx_placeholder:preproc_tmpl(Path),
body => maybe_preproc_tmpl(body, Req),
headers => wrap_auth_header(preproc_headers(Headers)),
request_timeout => maps:get(request_timeout, Req, ?DEFAULT_REQUEST_TIMEOUT_MS),
@ -477,8 +477,8 @@ preproc_headers(Headers) when is_map(Headers) ->
fun(K, V, Acc) ->
[
{
emqx_plugin_libs_rule:preproc_tmpl(to_bin(K)),
emqx_plugin_libs_rule:preproc_tmpl(to_bin(V))
emqx_placeholder:preproc_tmpl(to_bin(K)),
emqx_placeholder:preproc_tmpl(to_bin(V))
}
| Acc
]
@ -490,8 +490,8 @@ preproc_headers(Headers) when is_list(Headers) ->
lists:map(
fun({K, V}) ->
{
emqx_plugin_libs_rule:preproc_tmpl(to_bin(K)),
emqx_plugin_libs_rule:preproc_tmpl(to_bin(V))
emqx_placeholder:preproc_tmpl(to_bin(K)),
emqx_placeholder:preproc_tmpl(to_bin(V))
}
end,
Headers
@ -530,7 +530,7 @@ try_bin_to_lower(Bin) ->
maybe_preproc_tmpl(Key, Conf) ->
case maps:get(Key, Conf, undefined) of
undefined -> undefined;
Val -> emqx_plugin_libs_rule:preproc_tmpl(Val)
Val -> emqx_placeholder:preproc_tmpl(Val)
end.
process_request(
@ -544,8 +544,8 @@ process_request(
Msg
) ->
Conf#{
method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg)),
path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg),
method => make_method(emqx_placeholder:proc_tmpl(MethodTks, Msg)),
path => emqx_placeholder:proc_tmpl(PathTks, Msg),
body => process_request_body(BodyTks, Msg),
headers => proc_headers(HeadersTks, Msg),
request_timeout => ReqTimeout
@ -554,14 +554,14 @@ process_request(
process_request_body(undefined, Msg) ->
emqx_utils_json:encode(Msg);
process_request_body(BodyTks, Msg) ->
emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg).
emqx_placeholder:proc_tmpl(BodyTks, Msg).
proc_headers(HeaderTks, Msg) ->
lists:map(
fun({K, V}) ->
{
emqx_plugin_libs_rule:proc_tmpl(K, Msg),
emqx_plugin_libs_rule:proc_tmpl(emqx_secret:unwrap(V), Msg)
emqx_placeholder:proc_tmpl(K, Msg),
emqx_placeholder:proc_tmpl(emqx_secret:unwrap(V), Msg)
}
end,
HeaderTks

View File

@ -15,8 +15,39 @@
%%--------------------------------------------------------------------
-module(emqx_connector_lib).
%% connectivity check
-export([
http_connectivity/2,
tcp_connectivity/3
]).
-export([resolve_dns/2]).
-spec http_connectivity(uri_string:uri_string(), timeout()) ->
ok | {error, Reason :: term()}.
http_connectivity(Url, Timeout) ->
case emqx_http_lib:uri_parse(Url) of
{ok, #{host := Host, port := Port}} ->
tcp_connectivity(Host, Port, Timeout);
{error, Reason} ->
{error, Reason}
end.
-spec tcp_connectivity(
Host :: inet:socket_address() | inet:hostname(),
Port :: inet:port_number(),
timeout()
) ->
ok | {error, Reason :: term()}.
tcp_connectivity(Host, Port, Timeout) ->
case gen_tcp:connect(Host, Port, emqx_utils:ipv6_probe([]), Timeout) of
{ok, Sock} ->
gen_tcp:close(Sock),
ok;
{error, Reason} ->
{error, Reason}
end.
%% @doc Mostly for meck.
resolve_dns(DNS, Type) ->
inet_res:lookup(DNS, in, Type).

View File

@ -344,7 +344,7 @@ parse_prepare_sql(Config) ->
parse_prepare_sql(maps:to_list(SQL), #{}, #{}, #{}, #{}).
parse_prepare_sql([{Key, H} | _] = L, Prepares, Tokens, BatchInserts, BatchTks) ->
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H),
{PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H),
parse_batch_prepare_sql(
L, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}, BatchInserts, BatchTks
);
@ -357,13 +357,13 @@ parse_prepare_sql([], Prepares, Tokens, BatchInserts, BatchTks) ->
}.
parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks) ->
case emqx_plugin_libs_rule:detect_sql_type(H) of
{ok, select} ->
case emqx_utils_sql:get_statement_type(H) of
select ->
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks);
{ok, insert} ->
case emqx_plugin_libs_rule:split_insert_sql(H) of
insert ->
case emqx_utils_sql:parse_insert(H) of
{ok, {InsertSQL, Params}} ->
ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(Params),
ParamsTks = emqx_placeholder:preproc_tmpl(Params),
parse_prepare_sql(
T,
Prepares,
@ -375,6 +375,9 @@ parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks
?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}),
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks)
end;
Type when is_atom(Type) ->
?SLOG(error, #{msg => "detect sql type unsupported", sql => H, type => Type}),
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks);
{error, Reason} ->
?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}),
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks)
@ -389,7 +392,7 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens})
undefined ->
{SQLOrData, Params};
Tokens ->
{TypeOrKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
{TypeOrKey, emqx_placeholder:proc_sql(Tokens, SQLOrData)}
end.
on_batch_insert(InstId, BatchReqs, InsertPart, Tokens, State) ->

View File

@ -188,7 +188,7 @@ on_batch_query(
{error, {unrecoverable_error, batch_prepare_not_implemented}};
TokenList ->
{_, Datas} = lists:unzip(BatchReq),
Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas],
St = maps:get(BinKey, Sts),
case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of
{error, _Error} = Result ->
@ -218,7 +218,7 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens})
undefined ->
{SQLOrData, Params};
Tokens ->
{Key, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
{Key, emqx_placeholder:proc_sql(Tokens, SQLOrData)}
end.
on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
@ -350,7 +350,7 @@ parse_prepare_sql(Config) ->
parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
parse_prepare_sql([{Key, H} | T], Prepares, Tokens) ->
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '$n'),
{PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '$n'),
parse_prepare_sql(
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
);

View File

@ -0,0 +1,54 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_lib_tests).
-include_lib("eunit/include/eunit.hrl").
http_connectivity_ok_test() ->
{ok, Socket} = gen_tcp:listen(0, []),
{ok, Port} = inet:port(Socket),
?assertEqual(
ok,
emqx_connector_lib:http_connectivity("http://127.0.0.1:" ++ integer_to_list(Port), 1000)
),
gen_tcp:close(Socket).
http_connectivity_error_test() ->
{ok, Socket} = gen_tcp:listen(0, []),
{ok, Port} = inet:port(Socket),
ok = gen_tcp:close(Socket),
?assertEqual(
{error, econnrefused},
emqx_connector_lib:http_connectivity("http://127.0.0.1:" ++ integer_to_list(Port), 1000)
).
tcp_connectivity_ok_test() ->
{ok, Socket} = gen_tcp:listen(0, []),
{ok, Port} = inet:port(Socket),
?assertEqual(
ok,
emqx_connector_lib:tcp_connectivity("127.0.0.1", Port, 1000)
),
ok = gen_tcp:close(Socket).
tcp_connectivity_error_test() ->
{ok, Socket} = gen_tcp:listen(0, []),
{ok, Port} = inet:port(Socket),
ok = gen_tcp:close(Socket),
?assertEqual(
{error, econnrefused},
emqx_connector_lib:tcp_connectivity("127.0.0.1", Port, 1000)
).

View File

@ -29,6 +29,7 @@ file_transfer {
enable = true
storage {
local {
enable = true
exporter {
local { root = "/var/lib/emqx/transfers" }
}
@ -50,7 +51,9 @@ file_transfer {
enable = true
storage {
local {
enable = true
exporter {
enable = true
s3 {
host = "s3.us-east-1.amazonaws.com"
port = "443"

View File

@ -25,6 +25,10 @@
-export([schema/1]).
%% Utilities
-export([backend/1]).
%% Test-only helpers
-export([translate/1]).
-type json_value() ::
@ -145,7 +149,7 @@ fields(local_storage) ->
}
}
)}
];
] ++ common_backend_fields();
fields(local_storage_segments) ->
[
{root,
@ -193,9 +197,9 @@ fields(local_storage_exporter) ->
required => false
}
)}
];
] ++ common_backend_fields();
fields(s3_exporter) ->
emqx_s3_schema:fields(s3);
emqx_s3_schema:fields(s3) ++ common_backend_fields();
fields(local_storage_segments_gc) ->
[
{interval,
@ -232,6 +236,18 @@ fields(local_storage_segments_gc) ->
)}
].
common_backend_fields() ->
[
{enable,
mk(
boolean(), #{
desc => ?DESC("backend_enable"),
required => false,
default => true
}
)}
].
desc(file_transfer) ->
"File transfer settings";
desc(local_storage) ->
@ -278,11 +294,14 @@ validator(filename) ->
];
validator(backend) ->
fun(Config) ->
case maps:keys(Config) of
[_Type] ->
Enabled = maps:filter(fun(_, #{<<"enable">> := E}) -> E end, Config),
case maps:to_list(Enabled) of
[{_Type, _BackendConfig}] ->
ok;
_Conflicts = [_ | _] ->
{error, multiple_conflicting_backends}
{error, multiple_enabled_backends};
_None = [] ->
{error, no_enabled_backend}
end
end.
@ -314,11 +333,24 @@ converter(unicode_string) ->
ref(Ref) ->
ref(?MODULE, Ref).
%% Utilities
-spec backend(emqx_config:config()) ->
{_Type :: atom(), emqx_config:config()}.
backend(Config) ->
catch maps:foreach(fun emit_enabled/2, Config).
-spec emit_enabled(atom(), emqx_config:config()) ->
no_return().
emit_enabled(Type, BConf = #{enable := Enabled}) ->
Enabled andalso throw({Type, BConf}).
%% Test-only helpers
-spec translate(emqx_config:raw_config()) ->
emqx_config:config().
translate(Conf) ->
[Root] = roots(),
maps:get(
Root,
hocon_tconf:check_plain(
?MODULE, #{atom_to_binary(Root) => Conf}, #{atom_key => true}, [Root]
)
).
RootRaw = atom_to_binary(Root),
ConfChecked = hocon_tconf:check_plain(?MODULE, #{RootRaw => Conf}, #{}, [Root]),
emqx_utils_maps:unsafe_atom_key_map(maps:get(RootRaw, ConfChecked)).

View File

@ -182,8 +182,8 @@ on_backend_update(BackendOld, BackendNew) when
%%--------------------------------------------------------------------
-spec backend(config()) -> backend().
backend(#{local := Storage}) ->
{local, Storage}.
backend(Config) ->
emqx_ft_schema:backend(Config).
on_storage_start({Type, Storage}) ->
(mod(Type)):start(Storage).

View File

@ -175,10 +175,10 @@ stop({ExporterMod, ExporterOpts}) ->
%%------------------------------------------------------------------------------
exporter(Storage) ->
case maps:get(exporter, Storage) of
#{local := Options} ->
case emqx_ft_schema:backend(maps:get(exporter, Storage)) of
{local, Options} ->
{emqx_ft_storage_exporter_fs, Options};
#{s3 := Options} ->
{s3, Options} ->
{emqx_ft_storage_exporter_s3, Options}
end.

View File

@ -46,6 +46,7 @@
-export_type([options/0]).
-type options() :: #{
enable := true,
root => file:name(),
_ => _
}.

View File

@ -102,6 +102,7 @@
-type storage() :: #{
type := 'local',
enable := true,
segments := segments(),
exporter := emqx_ft_storage_exporter:exporter()
}.

View File

@ -256,6 +256,7 @@ storage(Config) ->
},
<<"exporter">> => #{
<<"local">> => #{
<<"enable">> => true,
<<"root">> => ?config(exports_root, Config)
}
}

View File

@ -77,6 +77,7 @@ t_update_config(_Config) ->
},
<<"exporter">> => #{
<<"local">> => #{
<<"enable">> => true,
<<"root">> => <<"/tmp/exports">>
}
}
@ -104,7 +105,10 @@ t_disable_restore_config(Config) ->
{ok, _},
emqx_conf:update(
[file_transfer],
#{<<"enable">> => true, <<"storage">> => #{<<"local">> => #{}}},
#{
<<"enable">> => true,
<<"storage">> => #{<<"local">> => #{}}
},
#{}
)
),
@ -207,6 +211,7 @@ t_switch_exporter(_Config) ->
[file_transfer, storage, local, exporter],
#{
<<"s3">> => #{
<<"enable">> => true,
<<"bucket">> => <<"emqx">>,
<<"host">> => <<"https://localhost">>,
<<"port">> => 9000,
@ -234,7 +239,7 @@ t_switch_exporter(_Config) ->
{ok, _},
emqx_conf:update(
[file_transfer, storage, local, exporter],
#{<<"local">> => #{}},
#{<<"local">> => #{<<"enable">> => true}},
#{}
)
),

View File

@ -45,9 +45,10 @@ init_per_testcase(TC, Config) ->
<<"enable">> => true,
<<"storage">> => #{
<<"local">> => #{
<<"enable">> => true,
<<"segments">> => #{<<"root">> => SegmentsRoot},
<<"exporter">> => #{
<<"local">> => #{<<"root">> => ExportsRoot}
<<"local">> => #{<<"enable">> => true, <<"root">> => ExportsRoot}
}
}
}

View File

@ -55,17 +55,24 @@ local_storage(Config) ->
local_storage(Config, Opts) ->
#{
<<"local">> => #{
<<"enable">> => true,
<<"segments">> => #{<<"root">> => root(Config, node(), [segments])},
<<"exporter">> => exporter(Config, Opts)
}
}.
exporter(Config, #{exporter := local}) ->
#{<<"local">> => #{<<"root">> => root(Config, node(), [exports])}};
#{
<<"local">> => #{
<<"enable">> => true,
<<"root">> => root(Config, node(), [exports])
}
};
exporter(_Config, #{exporter := s3, bucket_name := BucketName}) ->
BaseConfig = emqx_s3_test_helpers:base_raw_config(tcp),
#{
<<"s3">> => BaseConfig#{
<<"enable">> => true,
<<"bucket">> => list_to_binary(BucketName),
<<"host">> => ?S3_HOST,
<<"port">> => ?S3_PORT

View File

@ -300,8 +300,8 @@ feedvar(Override, Packet, ConnInfo, ClientInfo) ->
},
maps:map(
fun(_K, V) ->
Tokens = emqx_plugin_libs_rule:preproc_tmpl(V),
emqx_plugin_libs_rule:proc_tmpl(Tokens, Envs)
Tokens = emqx_placeholder:preproc_tmpl(V),
emqx_placeholder:proc_tmpl(Tokens, Envs)
end,
Override
).

View File

@ -283,8 +283,8 @@ feedvar(Override, Packet, ConnInfo, ClientInfo) ->
},
maps:map(
fun(_K, V) ->
Tokens = emqx_plugin_libs_rule:preproc_tmpl(V),
emqx_plugin_libs_rule:proc_tmpl(Tokens, Envs)
Tokens = emqx_placeholder:preproc_tmpl(V),
emqx_placeholder:proc_tmpl(Tokens, Envs)
end,
Override
).

View File

@ -3,7 +3,7 @@
{id, "emqx_machine"},
{description, "The EMQX Machine"},
% strict semver, bump manually!
{vsn, "0.2.5"},
{vsn, "0.2.6"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, emqx_ctl]},

View File

@ -137,7 +137,6 @@ basic_reboot_apps() ->
emqx_resource,
emqx_rule_engine,
emqx_bridge,
emqx_plugin_libs,
emqx_management,
emqx_retainer,
emqx_exhook,

View File

@ -51,7 +51,6 @@ init_per_suite(Config) ->
emqx_resource,
emqx_rule_engine,
emqx_bridge,
emqx_plugin_libs,
emqx_management,
emqx_retainer,
emqx_exhook,

View File

@ -48,8 +48,8 @@
-type start_opts() :: #{
conn_evict_rate => pos_integer(),
sess_evict_rate => pos_integer(),
wait_health_check => pos_integer(),
wait_takeover => pos_integer(),
wait_health_check => number(),
wait_takeover => number(),
abs_conn_threshold => pos_integer(),
rel_conn_threshold => number(),
abs_sess_threshold => pos_integer(),
@ -438,7 +438,7 @@ is_node_available() ->
node().
all_nodes() ->
mria_mnesia:running_nodes().
emqx:running_nodes().
seconds(Sec) ->
round(timer:seconds(Sec)).

View File

@ -202,10 +202,10 @@ schema("/load_rebalance/:node/evacuation/stop") ->
}}.
'/load_rebalance/availability_check'(get, #{}) ->
case emqx_eviction_agent:status() of
case emqx_node_rebalance_status:local_status() of
disabled ->
{200, #{}};
{enabled, _Stats} ->
_ ->
error_response(503, ?NODE_EVACUATING, <<"Node Evacuating">>)
end.
@ -258,11 +258,11 @@ wrap_rpc(Node, RPCResult) ->
{200, #{}};
{error, Reason} ->
error_response(
400, ?BAD_REQUEST, io_lib:format("error on node ~p: ~p", [Node, Reason])
400, ?BAD_REQUEST, binfmt("error on node ~p: ~p", [Node, Reason])
);
{badrpc, Reason} ->
error_response(
503, ?RPC_ERROR, io_lib:format("RPC error on node ~p: ~p", [Node, Reason])
503, ?RPC_ERROR, binfmt("RPC error on node ~p: ~p", [Node, Reason])
)
end.
@ -299,9 +299,9 @@ with_nodes_at_key(Key, Params, Fun) ->
{ok, Params1} ->
Fun(Params1);
{error, {unavailable, Nodes}} ->
error_response(400, ?NOT_FOUND, io_lib:format("Nodes unavailable: ~p", [Nodes]));
error_response(400, ?NOT_FOUND, binfmt("Nodes unavailable: ~p", [Nodes]));
{error, {invalid, Nodes}} ->
error_response(400, ?BAD_REQUEST, io_lib:format("Invalid nodes: ~p", [Nodes]))
error_response(400, ?BAD_REQUEST, binfmt("Invalid nodes: ~p", [Nodes]))
end.
parse_node(Bin) when is_binary(Bin) ->
@ -331,6 +331,8 @@ without(Keys, Props) ->
Props
).
binfmt(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)).
%%------------------------------------------------------------------------------
%% Schema
%%------------------------------------------------------------------------------
@ -432,6 +434,14 @@ fields(rebalance_start) ->
];
fields(rebalance_evacuation_start) ->
[
{"wait_health_check",
mk(
emqx_schema:timeout_duration_s(),
#{
desc => ?DESC(wait_health_check),
required => false
}
)},
{"conn_evict_rate",
mk(
pos_integer(),
@ -712,6 +722,7 @@ rebalance_example() ->
rebalance_evacuation_example() ->
#{
wait_health_check => 10,
conn_evict_rate => 100,
sess_evict_rate => 100,
redirect_to => <<"othernode:1883">>,

View File

@ -103,6 +103,7 @@ cli(_) ->
[
{
"rebalance start --evacuation \\\n"
" [--wait-health-check Secs] \\\n"
" [--redirect-to \"Host1:Port1 Host2:Port2 ...\"] \\\n"
" [--conn-evict-rate CountPerSec] \\\n"
" [--migrate-to \"node1@host1 node2@host2 ...\"] \\\n"
@ -182,8 +183,6 @@ collect_args(["--migrate-to", MigrateTo | Args], Map) ->
%% rebalance
collect_args(["--nodes", Nodes | Args], Map) ->
collect_args(Args, Map#{"--nodes" => Nodes});
collect_args(["--wait-health-check", WaitHealthCheck | Args], Map) ->
collect_args(Args, Map#{"--wait-health-check" => WaitHealthCheck});
collect_args(["--abs-conn-threshold", AbsConnThres | Args], Map) ->
collect_args(Args, Map#{"--abs-conn-threshold" => AbsConnThres});
collect_args(["--rel-conn-threshold", RelConnThres | Args], Map) ->
@ -193,6 +192,8 @@ collect_args(["--abs-sess-threshold", AbsSessThres | Args], Map) ->
collect_args(["--rel-sess-threshold", RelSessThres | Args], Map) ->
collect_args(Args, Map#{"--rel-sess-threshold" => RelSessThres});
%% common
collect_args(["--wait-health-check", WaitHealthCheck | Args], Map) ->
collect_args(Args, Map#{"--wait-health-check" => WaitHealthCheck});
collect_args(["--conn-evict-rate", ConnEvictRate | Args], Map) ->
collect_args(Args, Map#{"--conn-evict-rate" => ConnEvictRate});
collect_args(["--wait-takeover", WaitTakeover | Args], Map) ->
@ -207,6 +208,8 @@ validate_evacuation([], Map) ->
{ok, Map};
validate_evacuation([{"--evacuation", _} | Rest], Map) ->
validate_evacuation(Rest, Map);
validate_evacuation([{"--wait-health-check", _} | _] = Opts, Map) ->
validate_pos_int(wait_health_check, Opts, Map, fun validate_evacuation/2);
validate_evacuation([{"--redirect-to", ServerReference} | Rest], Map) ->
validate_evacuation(Rest, Map#{server_reference => list_to_binary(ServerReference)});
validate_evacuation([{"--conn-evict-rate", _} | _] = Opts, Map) ->

View File

@ -53,10 +53,11 @@
server_reference => emqx_eviction_agent:server_reference(),
conn_evict_rate => pos_integer(),
sess_evict_rate => pos_integer(),
wait_takeover => pos_integer(),
migrate_to => migrate_to()
wait_takeover => number(),
migrate_to => migrate_to(),
wait_health_check => number()
}.
-type start_error() :: already_started | eviction_agent_busy.
-type start_error() :: already_started.
-type stats() :: #{
initial_conns := non_neg_integer(),
initial_sessions := non_neg_integer(),
@ -97,7 +98,7 @@ available_nodes(Nodes) when is_list(Nodes) ->
callback_mode() -> handle_event_function.
%% states: disabled, evicting_conns, waiting_takeover, evicting_sessions, prohibiting
%% states: disabled, waiting_health_check, evicting_conns, waiting_takeover, evicting_sessions, prohibiting
init([]) ->
case emqx_node_rebalance_evacuation_persist:read(default_opts()) of
@ -119,25 +120,20 @@ init([]) ->
%% start
handle_event(
{call, From},
{start, #{server_reference := ServerReference} = Opts},
{start, #{wait_health_check := WaitHealthCheck} = Opts},
disabled,
#{} = Data
) ->
case emqx_eviction_agent:enable(?MODULE, ServerReference) of
ok ->
NewData = init_data(Data, Opts),
ok = emqx_node_rebalance_evacuation_persist:save(Opts),
?SLOG(warning, #{
msg => "node_evacuation_started",
opts => Opts
}),
{next_state, evicting_conns, NewData, [
{state_timeout, 0, evict_conns},
{reply, From, ok}
]};
{error, eviction_agent_busy} ->
{keep_state_and_data, [{reply, From, {error, eviction_agent_busy}}]}
end;
?SLOG(warning, #{
msg => "node_evacuation_started",
opts => Opts
}),
NewData = init_data(Data, Opts),
ok = emqx_node_rebalance_evacuation_persist:save(Opts),
{next_state, waiting_health_check, NewData, [
{state_timeout, seconds(WaitHealthCheck), start_eviction},
{reply, From, ok}
]};
handle_event({call, From}, {start, _Opts}, _State, #{}) ->
{keep_state_and_data, [{reply, From, {error, already_started}}]};
%% stop
@ -167,6 +163,27 @@ handle_event({call, From}, status, State, #{migrate_to := MigrateTo} = Data) ->
{keep_state_and_data, [
{reply, From, {enabled, Stats#{state => State, migrate_to => migrate_to(MigrateTo)}}}
]};
%% start eviction
handle_event(
state_timeout,
start_eviction,
waiting_health_check,
#{server_reference := ServerReference} = Data
) ->
case emqx_eviction_agent:enable(?MODULE, ServerReference) of
ok ->
?tp(debug, eviction_agent_started, #{
data => Data
}),
{next_state, evicting_conns, Data, [
{state_timeout, 0, evict_conns}
]};
{error, eviction_agent_busy} ->
?tp(warning, eviction_agent_busy, #{
data => Data
}),
{next_state, disabled, deinit(Data)}
end;
%% conn eviction
handle_event(
state_timeout,
@ -270,12 +287,14 @@ default_opts() ->
conn_evict_rate => ?DEFAULT_CONN_EVICT_RATE,
sess_evict_rate => ?DEFAULT_SESS_EVICT_RATE,
wait_takeover => ?DEFAULT_WAIT_TAKEOVER,
wait_health_check => ?DEFAULT_WAIT_HEALTH_CHECK,
migrate_to => undefined
}.
init_data(Data0, Opts) ->
Data1 = maps:merge(Data0, Opts),
{enabled, #{connections := ConnCount, sessions := SessCount}} = emqx_eviction_agent:status(),
ConnCount = emqx_eviction_agent:connection_count(),
SessCount = emqx_eviction_agent:session_count(),
Data1#{
initial_conns => ConnCount,
current_conns => ConnCount,
@ -305,4 +324,7 @@ is_node_available() ->
node().
all_nodes() ->
mria_mnesia:running_nodes() -- [node()].
emqx:running_nodes() -- [node()].
seconds(Sec) ->
round(timer:seconds(Sec)).

View File

@ -21,24 +21,16 @@
%% APIs
%%--------------------------------------------------------------------
%% do not persist `migrate_to`:
%% * after restart there is nothing to migrate
%% * this value may be invalid after node was offline
-type persisted_start_opts() :: #{
server_reference => emqx_eviction_agent:server_reference(),
conn_evict_rate => pos_integer(),
sess_evict_rate => pos_integer(),
wait_takeover => pos_integer()
}.
-type start_opts() :: #{
server_reference => emqx_eviction_agent:server_reference(),
conn_evict_rate => pos_integer(),
sess_evict_rate => pos_integer(),
wait_takeover => pos_integer(),
migrate_to => emqx_node_rebalance_evacuation:migrate_to()
wait_takeover => number(),
migrate_to => emqx_node_rebalance_evacuation:migrate_to(),
wait_health_check => number()
}.
-spec save(persisted_start_opts()) -> ok_or_error(term()).
-spec save(start_opts()) -> ok_or_error(term()).
save(
#{
server_reference := ServerReference,
@ -50,7 +42,7 @@ save(
(is_binary(ServerReference) orelse ServerReference =:= undefined) andalso
is_integer(ConnEvictRate) andalso ConnEvictRate > 0 andalso
is_integer(SessEvictRate) andalso SessEvictRate > 0 andalso
is_integer(WaitTakeover) andalso WaitTakeover >= 0
is_number(WaitTakeover) andalso WaitTakeover >= 0
->
Filepath = evacuation_filepath(),
case filelib:ensure_dir(Filepath) of

View File

@ -69,6 +69,7 @@ t_start_evacuation_validation(Config) ->
#{sess_evict_rate => <<"sess">>},
#{redirect_to => 123},
#{wait_takeover => <<"wait">>},
#{wait_health_check => <<"wait">>},
#{migrate_to => []},
#{migrate_to => <<"migrate_to">>},
#{migrate_to => [<<"bad_node">>]},
@ -103,6 +104,7 @@ t_start_evacuation_validation(Config) ->
conn_evict_rate => 10,
sess_evict_rate => 10,
wait_takeover => 10,
wait_health_check => 10,
redirect_to => <<"srv">>,
migrate_to => [atom_to_binary(RecipientNode)]
}

View File

@ -86,11 +86,13 @@ end_per_testcase(_Case, Config) ->
t_agent_busy(Config) ->
[{DonorNode, _DonorPort}] = ?config(cluster_nodes, Config),
ok = rpc:call(DonorNode, emqx_eviction_agent, enable, [other_rebalance, undefined]),
?assertEqual(
{error, eviction_agent_busy},
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)])
?assertWaitEvent(
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
#{?snk_kind := eviction_agent_busy},
5000
).
t_already_started(Config) ->
@ -115,7 +117,12 @@ t_start(Config) ->
[{DonorNode, DonorPort}] = ?config(cluster_nodes, Config),
ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
?assertWaitEvent(
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
#{?snk_kind := eviction_agent_started},
5000
),
?assertMatch(
{error, {use_another_server, #{}}},
emqtt_try_connect([{port, DonorPort}])
@ -126,7 +133,11 @@ t_persistence(Config) ->
[{DonorNode, DonorPort}] = ?config(cluster_nodes, Config),
ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
?assertWaitEvent(
rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
#{?snk_kind := eviction_agent_started},
5000
),
?assertMatch(
{error, {use_another_server, #{}}},
@ -179,7 +190,7 @@ t_conn_evicted(Config) ->
?assertWaitEvent(
ok = rpc:call(DonorNode, emqx_node_rebalance_evacuation, start, [opts(Config)]),
#{?snk_kind := node_evacuation_evict_conn},
1000
5000
),
?assertMatch(
@ -251,6 +262,7 @@ opts(Config) ->
conn_evict_rate => 10,
sess_evict_rate => 10,
wait_takeover => 1,
wait_health_check => 1,
migrate_to => migrate_to(Config)
}.

View File

@ -18,7 +18,6 @@
%% callbacks for behaviour emqx_resource
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_query/3,
@ -68,8 +67,6 @@
% be sync for now.
callback_mode() -> always_sync.
is_buffer_supported() -> false.
-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
on_start(
InstId,
@ -93,14 +90,14 @@ on_start(
ServiceName =
case maps:get(service_name, Config, undefined) of
undefined -> undefined;
ServiceName0 -> emqx_plugin_libs_rule:str(ServiceName0)
ServiceName0 -> emqx_utils_conv:str(ServiceName0)
end,
Options = [
{host, Host},
{port, Port},
{user, emqx_plugin_libs_rule:str(User)},
{user, emqx_utils_conv:str(User)},
{password, jamdb_secret:wrap(maps:get(password, Config, ""))},
{sid, emqx_plugin_libs_rule:str(Sid)},
{sid, emqx_utils_conv:str(Sid)},
{service_name, ServiceName},
{pool_size, maps:get(<<"pool_size">>, Config, ?DEFAULT_POOL_SIZE)},
{timeout, ?OPT_TIMEOUT},
@ -168,7 +165,7 @@ on_batch_query(
{error, {unrecoverable_error, batch_prepare_not_implemented}};
TokenList ->
{_, Datas} = lists:unzip(BatchReq),
Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas],
St = maps:get(BinKey, Sts),
case
on_sql_query(InstId, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2)
@ -204,7 +201,7 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{
undefined ->
{SQLOrData, Params};
Sql ->
{Sql, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
{Sql, emqx_placeholder:proc_sql(Tokens, SQLOrData)}
end
end.
@ -268,14 +265,14 @@ connect(Opts) ->
jamdb_oracle:start_link(Opts).
sql_query_to_str(SqlQuery) ->
emqx_plugin_libs_rule:str(SqlQuery).
emqx_utils_conv:str(SqlQuery).
sql_params_to_str(Params) when is_list(Params) ->
lists:map(
fun
(false) -> "0";
(true) -> "1";
(Value) -> emqx_plugin_libs_rule:str(Value)
(Value) -> emqx_utils_conv:str(Value)
end,
Params
).
@ -305,7 +302,7 @@ parse_prepare_sql(Config) ->
parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
parse_prepare_sql([{Key, H} | T], Prepares, Tokens) ->
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, ':n'),
{PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, ':n'),
parse_prepare_sql(
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
);

View File

@ -1,8 +0,0 @@
%% -*- mode: erlang -*-
{deps, [
{emqx, {path, "../emqx"}},
{emqx_utils, {path, "../emqx_utils"}}
]}.
{project_plugins, [erlfmt]}.

View File

@ -1,8 +0,0 @@
%% -*- mode: erlang -*-
{application, emqx_plugin_libs, [
{description, "EMQX Plugin utility libs"},
{vsn, "4.3.11"},
{modules, []},
{applications, [kernel, stdlib]},
{env, []}
]}.

View File

@ -1,13 +0,0 @@
%% -*- mode: erlang -*-
{VSN,
[ {"4.3.0",
[ {add_module, emqx_plugin_libs_pool}
]},
{<<".*">>, []}
],
[ {"4.3.0",
[ {delete_module, emqx_plugin_libs_pool}
]},
{<<".*">>, []}
]
}.

View File

@ -1,17 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_plugin_libs).

View File

@ -1,365 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_plugin_libs_rule).
-elvis([{elvis_style, god_modules, disable}]).
%% preprocess and process template string with place holders
-export([
preproc_tmpl/1,
proc_tmpl/2,
proc_tmpl/3,
preproc_cmd/1,
proc_cmd/2,
proc_cmd/3,
preproc_sql/1,
preproc_sql/2,
proc_sql/2,
proc_sql_param_str/2,
proc_cql_param_str/2,
split_insert_sql/1,
detect_sql_type/1,
proc_batch_sql/3,
formalize_sql/1
]).
%% type converting
-export([
str/1,
bin/1,
bool/1,
int/1,
float/1,
float2str/2,
map/1,
utf8_bin/1,
utf8_str/1,
number_to_binary/1,
atom_key/1,
unsafe_atom_key/1
]).
%% connectivity check
-export([
http_connectivity/1,
http_connectivity/2,
tcp_connectivity/2,
tcp_connectivity/3
]).
-export([
now_ms/0,
can_topic_match_oneof/2
]).
-export_type([tmpl_token/0]).
-compile({no_auto_import, [float/1]}).
-type uri_string() :: iodata().
-type tmpl_token() :: list({var, binary()} | {str, binary()}).
-type tmpl_cmd() :: list(tmpl_token()).
-type prepare_statement_key() :: binary().
%% preprocess template string with place holders
-spec preproc_tmpl(binary()) -> tmpl_token().
preproc_tmpl(Str) ->
emqx_placeholder:preproc_tmpl(Str).
-spec proc_tmpl(tmpl_token(), map()) -> binary().
proc_tmpl(Tokens, Data) ->
emqx_placeholder:proc_tmpl(Tokens, Data).
-spec proc_tmpl(tmpl_token(), map(), map()) -> binary() | list().
proc_tmpl(Tokens, Data, Opts) ->
emqx_placeholder:proc_tmpl(Tokens, Data, Opts).
-spec preproc_cmd(binary()) -> tmpl_cmd().
preproc_cmd(Str) ->
emqx_placeholder:preproc_cmd(Str).
-spec proc_cmd([tmpl_token()], map()) -> binary() | list().
proc_cmd(Tokens, Data) ->
emqx_placeholder:proc_cmd(Tokens, Data).
-spec proc_cmd([tmpl_token()], map(), map()) -> list().
proc_cmd(Tokens, Data, Opts) ->
emqx_placeholder:proc_cmd(Tokens, Data, Opts).
%% preprocess SQL with place holders
-spec preproc_sql(Sql :: binary()) -> {prepare_statement_key(), tmpl_token()}.
preproc_sql(Sql) ->
emqx_placeholder:preproc_sql(Sql).
-spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n' | ':n') ->
{prepare_statement_key(), tmpl_token()}.
preproc_sql(Sql, ReplaceWith) ->
emqx_placeholder:preproc_sql(Sql, ReplaceWith).
-spec proc_sql(tmpl_token(), map()) -> list().
proc_sql(Tokens, Data) ->
emqx_placeholder:proc_sql(Tokens, Data).
-spec proc_sql_param_str(tmpl_token(), map()) -> binary().
proc_sql_param_str(Tokens, Data) ->
emqx_placeholder:proc_sql_param_str(Tokens, Data).
-spec proc_cql_param_str(tmpl_token(), map()) -> binary().
proc_cql_param_str(Tokens, Data) ->
emqx_placeholder:proc_cql_param_str(Tokens, Data).
%% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">>
-spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when
InsertSQL :: binary(),
Params :: binary().
split_insert_sql(SQL0) ->
SQL = formalize_sql(SQL0),
case re:split(SQL, "((?i)values)", [{return, binary}]) of
[Part1, _, Part3] ->
case string:trim(Part1, leading) of
<<"insert", _/binary>> = InsertSQL ->
{ok, {InsertSQL, Part3}};
<<"INSERT", _/binary>> = InsertSQL ->
{ok, {InsertSQL, Part3}};
_ ->
{error, not_insert_sql}
end;
_ ->
{error, not_insert_sql}
end.
-spec detect_sql_type(binary()) -> {ok, Type} | {error, atom()} when
Type :: insert | select.
detect_sql_type(SQL) ->
case re:run(SQL, "^\\s*([a-zA-Z]+)", [{capture, all_but_first, list}]) of
{match, [First]} ->
Types = [select, insert],
PropTypes = [{erlang:atom_to_list(Type), Type} || Type <- Types],
LowFirst = string:lowercase(First),
case proplists:lookup(LowFirst, PropTypes) of
{LowFirst, Type} ->
{ok, Type};
_ ->
{error, invalid_sql}
end;
_ ->
{error, invalid_sql}
end.
-spec proc_batch_sql(
BatchReqs :: list({atom(), map()}),
InsertPart :: binary(),
Tokens :: tmpl_token()
) -> InsertSQL :: binary().
proc_batch_sql(BatchReqs, InsertPart, Tokens) ->
ValuesPart = erlang:iolist_to_binary(
lists:join($,, [
proc_sql_param_str(Tokens, Msg)
|| {_, Msg} <- BatchReqs
])
),
<<InsertPart/binary, " values ", ValuesPart/binary>>.
formalize_sql(Input) ->
%% 1. replace all whitespaces like '\r' '\n' or spaces to a single space char.
SQL = re:replace(Input, "\\s+", " ", [global, {return, binary}]),
%% 2. trims the result
string:trim(SQL).
unsafe_atom_key(Key) when is_atom(Key) ->
Key;
unsafe_atom_key(Key) when is_binary(Key) ->
binary_to_atom(Key, utf8);
unsafe_atom_key(Keys = [_Key | _]) ->
[unsafe_atom_key(SubKey) || SubKey <- Keys];
unsafe_atom_key(Key) ->
error({invalid_key, Key}).
atom_key(Key) when is_atom(Key) ->
Key;
atom_key(Key) when is_binary(Key) ->
try
binary_to_existing_atom(Key, utf8)
catch
error:badarg -> error({invalid_key, Key})
end;
%% nested keys
atom_key(Keys = [_Key | _]) ->
[atom_key(SubKey) || SubKey <- Keys];
atom_key(Key) ->
error({invalid_key, Key}).
-spec http_connectivity(uri_string()) -> ok | {error, Reason :: term()}.
http_connectivity(Url) ->
http_connectivity(Url, 3000).
-spec http_connectivity(uri_string(), integer()) -> ok | {error, Reason :: term()}.
http_connectivity(Url, Timeout) ->
case emqx_http_lib:uri_parse(Url) of
{ok, #{host := Host, port := Port}} ->
tcp_connectivity(Host, Port, Timeout);
{error, Reason} ->
{error, Reason}
end.
-spec tcp_connectivity(
Host :: inet:socket_address() | inet:hostname(),
Port :: inet:port_number()
) ->
ok | {error, Reason :: term()}.
tcp_connectivity(Host, Port) ->
tcp_connectivity(Host, Port, 3000).
-spec tcp_connectivity(
Host :: inet:socket_address() | inet:hostname(),
Port :: inet:port_number(),
Timeout :: integer()
) ->
ok | {error, Reason :: term()}.
tcp_connectivity(Host, Port, Timeout) ->
case gen_tcp:connect(Host, Port, emqx_utils:ipv6_probe([]), Timeout) of
{ok, Sock} ->
gen_tcp:close(Sock),
ok;
{error, Reason} ->
{error, Reason}
end.
str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
str(Num) when is_number(Num) -> number_to_list(Num);
str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
str(Map) when is_map(Map) -> binary_to_list(emqx_utils_json:encode(Map));
str(List) when is_list(List) ->
case io_lib:printable_list(List) of
true -> List;
false -> binary_to_list(emqx_utils_json:encode(List))
end;
str(Data) ->
error({invalid_str, Data}).
utf8_bin(Str) when is_binary(Str); is_list(Str) ->
unicode:characters_to_binary(Str);
utf8_bin(Str) ->
unicode:characters_to_binary(bin(Str)).
utf8_str(Str) when is_binary(Str); is_list(Str) ->
unicode:characters_to_list(Str);
utf8_str(Str) ->
unicode:characters_to_list(str(Str)).
bin(Bin) when is_binary(Bin) -> Bin;
bin(Num) when is_number(Num) -> number_to_binary(Num);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
bin(Map) when is_map(Map) -> emqx_utils_json:encode(Map);
bin(List) when is_list(List) ->
case io_lib:printable_list(List) of
true -> list_to_binary(List);
false -> emqx_utils_json:encode(List)
end;
bin(Data) ->
error({invalid_bin, Data}).
int(List) when is_list(List) ->
try
list_to_integer(List)
catch
error:badarg ->
int(list_to_float(List))
end;
int(Bin) when is_binary(Bin) ->
try
binary_to_integer(Bin)
catch
error:badarg ->
int(binary_to_float(Bin))
end;
int(Int) when is_integer(Int) -> Int;
int(Float) when is_float(Float) -> erlang:floor(Float);
int(true) ->
1;
int(false) ->
0;
int(Data) ->
error({invalid_number, Data}).
float(List) when is_list(List) ->
try
list_to_float(List)
catch
error:badarg ->
float(list_to_integer(List))
end;
float(Bin) when is_binary(Bin) ->
try
binary_to_float(Bin)
catch
error:badarg ->
float(binary_to_integer(Bin))
end;
float(Num) when is_number(Num) -> erlang:float(Num);
float(Data) ->
error({invalid_number, Data}).
float2str(Float, Precision) when is_float(Float) and is_integer(Precision) ->
float_to_binary(Float, [{decimals, Precision}, compact]).
map(Bin) when is_binary(Bin) ->
case emqx_utils_json:decode(Bin, [return_maps]) of
Map = #{} -> Map;
_ -> error({invalid_map, Bin})
end;
map(List) when is_list(List) -> maps:from_list(List);
map(Map) when is_map(Map) -> Map;
map(Data) ->
error({invalid_map, Data}).
bool(Bool) when
Bool == true;
Bool == <<"true">>;
Bool == 1
->
true;
bool(Bool) when
Bool == false;
Bool == <<"false">>;
Bool == 0
->
false;
bool(Bool) ->
error({invalid_boolean, Bool}).
number_to_binary(Int) when is_integer(Int) ->
integer_to_binary(Int);
number_to_binary(Float) when is_float(Float) ->
float_to_binary(Float, [{decimals, 10}, compact]).
number_to_list(Int) when is_integer(Int) ->
integer_to_list(Int);
number_to_list(Float) when is_float(Float) ->
float_to_list(Float, [{decimals, 10}, compact]).
now_ms() ->
erlang:system_time(millisecond).
can_topic_match_oneof(Topic, Filters) ->
lists:any(
fun(Fltr) ->
emqx_topic:match(Topic, Fltr)
end,
Filters
).

View File

@ -1,84 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_plugin_libs_rule_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-define(PORT, 9876).
all() -> emqx_common_test_helpers:all(?MODULE).
t_http_connectivity(_) ->
{ok, Socket} = gen_tcp:listen(?PORT, []),
ok = emqx_plugin_libs_rule:http_connectivity(
"http://127.0.0.1:" ++ emqx_plugin_libs_rule:str(?PORT), 1000
),
gen_tcp:close(Socket),
{error, _} = emqx_plugin_libs_rule:http_connectivity(
"http://127.0.0.1:" ++ emqx_plugin_libs_rule:str(?PORT), 1000
).
t_tcp_connectivity(_) ->
{ok, Socket} = gen_tcp:listen(?PORT, []),
ok = emqx_plugin_libs_rule:tcp_connectivity("127.0.0.1", ?PORT, 1000),
gen_tcp:close(Socket),
{error, _} = emqx_plugin_libs_rule:tcp_connectivity("127.0.0.1", ?PORT, 1000).
t_str(_) ->
?assertEqual("abc", emqx_plugin_libs_rule:str("abc")),
?assertEqual("abc", emqx_plugin_libs_rule:str(abc)),
?assertEqual("{\"a\":1}", emqx_plugin_libs_rule:str(#{a => 1})),
?assertEqual("1", emqx_plugin_libs_rule:str(1)),
?assertEqual("2.0", emqx_plugin_libs_rule:str(2.0)),
?assertEqual("true", emqx_plugin_libs_rule:str(true)),
?assertError(_, emqx_plugin_libs_rule:str({a, v})).
t_bin(_) ->
?assertEqual(<<"abc">>, emqx_plugin_libs_rule:bin("abc")),
?assertEqual(<<"abc">>, emqx_plugin_libs_rule:bin(abc)),
?assertEqual(<<"{\"a\":1}">>, emqx_plugin_libs_rule:bin(#{a => 1})),
?assertEqual(<<"[{\"a\":1}]">>, emqx_plugin_libs_rule:bin([#{a => 1}])),
?assertEqual(<<"1">>, emqx_plugin_libs_rule:bin(1)),
?assertEqual(<<"2.0">>, emqx_plugin_libs_rule:bin(2.0)),
?assertEqual(<<"true">>, emqx_plugin_libs_rule:bin(true)),
?assertError(_, emqx_plugin_libs_rule:bin({a, v})).
t_atom_key(_) ->
_ = erlang,
_ = port,
?assertEqual([erlang], emqx_plugin_libs_rule:atom_key([<<"erlang">>])),
?assertEqual([erlang, port], emqx_plugin_libs_rule:atom_key([<<"erlang">>, port])),
?assertEqual([erlang, port], emqx_plugin_libs_rule:atom_key([<<"erlang">>, <<"port">>])),
?assertEqual(erlang, emqx_plugin_libs_rule:atom_key(<<"erlang">>)),
?assertError({invalid_key, {a, v}}, emqx_plugin_libs_rule:atom_key({a, v})),
_ = xyz876gv123,
?assertEqual([xyz876gv123, port], emqx_plugin_libs_rule:atom_key([<<"xyz876gv123">>, port])).
t_unsafe_atom_key(_) ->
?assertEqual([xyz876gv], emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv">>])),
?assertEqual(
[xyz876gv33, port],
emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv33">>, port])
),
?assertEqual(
[xyz876gv331, port1221],
emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv331">>, <<"port1221">>])
),
?assertEqual(xyz876gv3312, emqx_plugin_libs_rule:unsafe_atom_key(<<"xyz876gv3312">>)).

View File

@ -18,7 +18,6 @@
-behaviour(minirest_api).
-include("emqx_prometheus.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [ref/2]).
@ -128,7 +127,8 @@ prometheus_config_example() ->
prometheus_data_schema() ->
#{
description => <<"Get Prometheus Data">>,
description =>
<<"Get Prometheus Data. Note that support for JSON output is deprecated and will be removed in v5.2.">>,
content =>
#{
'application/json' =>

View File

@ -22,7 +22,7 @@
-type resource_state() :: term().
-type resource_status() :: connected | disconnected | connecting | stopped.
-type callback_mode() :: always_sync | async_if_possible.
-type query_mode() :: async | sync | dynamic.
-type query_mode() :: async | sync | simple_async | simple_sync | dynamic.
-type result() :: term().
-type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined.
-type query_opts() :: #{

View File

@ -100,7 +100,8 @@
call_health_check/3,
%% stop the instance
call_stop/3,
is_buffer_supported/1
%% get the query mode of the resource
query_mode/3
]).
%% list all the instances, id only.
@ -132,7 +133,7 @@
on_query_async/4,
on_batch_query_async/4,
on_get_status/2,
is_buffer_supported/0
query_mode/1
]).
%% when calling emqx_resource:start/1
@ -173,7 +174,8 @@
| {resource_status(), resource_state()}
| {resource_status(), resource_state(), term()}.
-callback is_buffer_supported() -> boolean().
-callback query_mode(Config :: term()) ->
simple_sync | simple_async | sync | async | no_queries.
-spec list_types() -> [module()].
list_types() ->
@ -276,16 +278,20 @@ query(ResId, Request) ->
Result :: term().
query(ResId, Request, Opts) ->
case emqx_resource_manager:lookup_cached(ResId) of
{ok, _Group, #{query_mode := QM, mod := Module}} ->
IsBufferSupported = is_buffer_supported(Module),
case {IsBufferSupported, QM} of
{true, _} ->
%% only Kafka producer so far
{ok, _Group, #{query_mode := QM}} ->
case QM of
simple_async ->
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
Opts1 = Opts#{is_buffer_supported => true},
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1);
{false, sync} ->
simple_sync ->
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_sync_query(ResId, Request);
sync ->
emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
{false, async} ->
async ->
emqx_resource_buffer_worker:async_query(ResId, Request, Opts)
end;
{error, not_found} ->
@ -367,15 +373,6 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group).
get_callback_mode(Mod) ->
Mod:callback_mode().
-spec is_buffer_supported(module()) -> boolean().
is_buffer_supported(Module) ->
try
Module:is_buffer_supported()
catch
_:_ ->
false
end.
-spec call_start(resource_id(), module(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}.
call_start(ResId, Mod, Config) ->
@ -416,6 +413,17 @@ call_stop(ResId, Mod, ResourceState) ->
Res
end).
-spec query_mode(module(), term(), creation_opts()) ->
simple_sync | simple_async | sync | async | no_queries.
query_mode(Mod, Config, Opts) ->
case erlang:function_exported(Mod, query_mode, 1) of
true ->
Mod:query_mode(Config);
false ->
maps:get(query_mode, Opts, sync)
end.
-spec check_config(resource_type(), raw_resource_config()) ->
{ok, resource_config()} | {error, term()}.
check_config(ResourceType, Conf) ->

View File

@ -144,12 +144,18 @@ create(ResId, Group, ResourceType, Config, Opts) ->
],
[matched]
),
case emqx_resource:is_buffer_supported(ResourceType) of
true ->
%% the resource it self supports
%% buffer, so there is no need for resource workers
QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts),
case QueryMode of
%% the resource has built-in buffer, so there is no need for resource workers
simple_sync ->
ok;
false ->
simple_async ->
ok;
%% The resource is a consumer resource, so there is no need for resource workers
no_queries ->
ok;
_ ->
%% start resource workers as the query type requires them
ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts),
case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
true ->
@ -288,16 +294,17 @@ health_check(ResId) ->
%% @doc Function called from the supervisor to actually start the server
start_link(ResId, Group, ResourceType, Config, Opts) ->
QueryMode = emqx_resource:query_mode(
ResourceType,
Config,
Opts
),
Data = #data{
id = ResId,
group = Group,
mod = ResourceType,
callback_mode = emqx_resource:get_callback_mode(ResourceType),
%% query_mode = dynamic | sync | async
%% TODO:
%% dynamic mode is async mode when things are going well, but becomes sync mode
%% if the resource worker is overloaded
query_mode = maps:get(query_mode, Opts, sync),
query_mode = QueryMode,
config = Config,
opts = Opts,
state = undefined,

View File

@ -65,10 +65,10 @@ pre_process_action_args(
) ->
Args#{
preprocessed_tmpl => #{
topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
topic => emqx_placeholder:preproc_tmpl(Topic),
qos => preproc_vars(QoS),
retain => preproc_vars(Retain),
payload => emqx_plugin_libs_rule:preproc_tmpl(Payload),
payload => emqx_placeholder:preproc_tmpl(Payload),
user_properties => preproc_user_properties(UserProperties)
}
};
@ -110,7 +110,7 @@ republish(
}
}
) ->
Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
Topic = emqx_placeholder:proc_tmpl(TopicTks, Selected),
Payload = format_msg(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected, 0),
Retain = replace_simple_var(RetainTks, Selected, false),
@ -189,7 +189,7 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
emqx_metrics:inc_msg(Msg).
preproc_vars(Data) when is_binary(Data) ->
emqx_plugin_libs_rule:preproc_tmpl(Data);
emqx_placeholder:preproc_tmpl(Data);
preproc_vars(Data) ->
Data.
@ -201,13 +201,13 @@ preproc_user_properties(<<"${pub_props.'User-Property'}">>) ->
?ORIGINAL_USER_PROPERTIES;
preproc_user_properties(<<"${", _/binary>> = V) ->
%% use a variable
emqx_plugin_libs_rule:preproc_tmpl(V);
emqx_placeholder:preproc_tmpl(V);
preproc_user_properties(_) ->
%% invalid, discard
undefined.
replace_simple_var(Tokens, Data, Default) when is_list(Tokens) ->
[Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
[Var] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
case Var of
%% cannot find the variable from Data
undefined -> Default;
@ -219,7 +219,7 @@ replace_simple_var(Val, _Data, _Default) ->
format_msg([], Selected) ->
emqx_utils_json:encode(Selected);
format_msg(Tokens, Selected) ->
emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected).
emqx_placeholder:proc_tmpl(Tokens, Selected).
format_pub_props(UserPropertiesTks, Selected, Env) ->
UserProperties =

View File

@ -212,7 +212,7 @@ get_rules_for_topic(Topic) ->
[
Rule
|| Rule = #{from := From} <- get_rules(),
emqx_plugin_libs_rule:can_topic_match_oneof(Topic, From)
emqx_topic:match_any(Topic, From)
].
-spec get_rules_with_same_event(Topic :: binary()) -> [rule()].

View File

@ -44,10 +44,13 @@
%% query callback
-export([qs2ms/2, run_fuzzy_match/2, format_rule_info_resp/1]).
-define(RPC_GET_METRICS_TIMEOUT, 5000).
-define(ERR_BADARGS(REASON), begin
R0 = err_msg(REASON),
<<"Bad Arguments: ", R0/binary>>
end).
-define(CHECK_PARAMS(PARAMS, TAG, EXPR),
case emqx_rule_api_schema:check_params(PARAMS, TAG) of
{ok, CheckedParams} ->
@ -56,6 +59,7 @@ end).
{400, #{code => 'BAD_REQUEST', message => ?ERR_BADARGS(REASON)}}
end
).
-define(METRICS(
MATCH,
PASS,
@ -87,6 +91,7 @@ end).
'matched.rate.last5m' => RATE_5
}
).
-define(metrics(
MATCH,
PASS,
@ -248,6 +253,7 @@ schema("/rules/:id") ->
summary => <<"Delete rule">>,
parameters => param_path_id(),
responses => #{
404 => error_schema('NOT_FOUND', "Rule not found"),
204 => <<"Delete rule successfully">>
}
}
@ -527,74 +533,77 @@ printable_function_name(Mod, Func) ->
list_to_binary(lists:concat([Mod, ":", Func])).
get_rule_metrics(Id) ->
Format = fun
(
Node,
#{
counters :=
#{
'matched' := Matched,
'passed' := Passed,
'failed' := Failed,
'failed.exception' := FailedEx,
'failed.no_result' := FailedNoRes,
'actions.total' := OTotal,
'actions.failed' := OFailed,
'actions.failed.out_of_service' := OFailedOOS,
'actions.failed.unknown' := OFailedUnknown,
'actions.success' := OFailedSucc
},
rate :=
#{
'matched' :=
#{current := Current, max := Max, last5m := Last5M}
}
}
) ->
#{
metrics => ?METRICS(
Matched,
Passed,
Failed,
FailedEx,
FailedNoRes,
OTotal,
OFailed,
OFailedOOS,
OFailedUnknown,
OFailedSucc,
Current,
Max,
Last5M
),
node => Node
};
(Node, _Metrics) ->
%% Empty metrics: can happen when a node joins another and a bridge is not yet
%% replicated to it, so the counters map is empty.
#{
metrics => ?METRICS(
_Matched = 0,
_Passed = 0,
_Failed = 0,
_FailedEx = 0,
_FailedNoRes = 0,
_OTotal = 0,
_OFailed = 0,
_OFailedOOS = 0,
_OFailedUnknown = 0,
_OFailedSucc = 0,
_Current = 0,
_Max = 0,
_Last5M = 0
),
node => Node
}
end,
[
Format(Node, emqx_plugin_libs_proto_v1:get_metrics(Node, rule_metrics, Id))
|| Node <- mria:running_nodes()
].
Nodes = emqx:running_nodes(),
Results = emqx_metrics_proto_v1:get_metrics(Nodes, rule_metrics, Id, ?RPC_GET_METRICS_TIMEOUT),
NodeResults = lists:zip(Nodes, Results),
NodeMetrics = [format_metrics(Node, Metrics) || {Node, {ok, Metrics}} <- NodeResults],
NodeErrors = [Result || Result = {_Node, {NOk, _}} <- NodeResults, NOk =/= ok],
NodeErrors == [] orelse
?SLOG(warning, #{
msg => "rpc_get_rule_metrics_errors",
errors => NodeErrors
}),
NodeMetrics.
format_metrics(Node, #{
counters :=
#{
'matched' := Matched,
'passed' := Passed,
'failed' := Failed,
'failed.exception' := FailedEx,
'failed.no_result' := FailedNoRes,
'actions.total' := OTotal,
'actions.failed' := OFailed,
'actions.failed.out_of_service' := OFailedOOS,
'actions.failed.unknown' := OFailedUnknown,
'actions.success' := OFailedSucc
},
rate :=
#{
'matched' :=
#{current := Current, max := Max, last5m := Last5M}
}
}) ->
#{
metrics => ?METRICS(
Matched,
Passed,
Failed,
FailedEx,
FailedNoRes,
OTotal,
OFailed,
OFailedOOS,
OFailedUnknown,
OFailedSucc,
Current,
Max,
Last5M
),
node => Node
};
format_metrics(Node, _Metrics) ->
%% Empty metrics: can happen when a node joins another and a bridge is not yet
%% replicated to it, so the counters map is empty.
#{
metrics => ?METRICS(
_Matched = 0,
_Passed = 0,
_Failed = 0,
_FailedEx = 0,
_FailedNoRes = 0,
_OTotal = 0,
_OFailed = 0,
_OFailedOOS = 0,
_OFailedUnknown = 0,
_OFailedSucc = 0,
_Current = 0,
_Max = 0,
_Last5M = 0
),
node => Node
}.
aggregate_metrics(AllMetrics) ->
InitMetrics = ?METRICS(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),

View File

@ -17,7 +17,6 @@
-module(emqx_rule_funcs).
-include("rule_engine.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-elvis([{elvis_style, god_modules, disable}]).
@ -266,8 +265,6 @@
]}
).
-define(is_var(X), is_binary(X)).
%% @doc "msgid()" Func
msgid() ->
fun
@ -631,29 +628,42 @@ do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"signed">>, <<"little">>) ->
%%------------------------------------------------------------------------------
str(Data) ->
emqx_plugin_libs_rule:bin(Data).
emqx_utils_conv:bin(Data).
str_utf8(Data) when is_binary(Data); is_list(Data) ->
unicode:characters_to_binary(Data);
str_utf8(Data) ->
emqx_plugin_libs_rule:utf8_bin(Data).
unicode:characters_to_binary(str(Data)).
bool(Data) ->
emqx_plugin_libs_rule:bool(Data).
emqx_utils_conv:bool(Data).
int(Data) ->
emqx_plugin_libs_rule:int(Data).
emqx_utils_conv:int(Data).
float(Data) ->
emqx_plugin_libs_rule:float(Data).
emqx_utils_conv:float(Data).
float(Data, Decimals) when Decimals > 0 ->
Data1 = ?MODULE:float(Data),
Data1 = emqx_utils_conv:float(Data),
list_to_float(float_to_list(Data1, [{decimals, Decimals}])).
float2str(Float, Precision) ->
emqx_plugin_libs_rule:float2str(Float, Precision).
float_to_binary(Float, [{decimals, Precision}, compact]).
map(Bin) when is_binary(Bin) ->
case emqx_utils_json:decode(Bin) of
Map = #{} ->
Map;
_ ->
error(badarg, [Bin])
end;
map(List) when is_list(List) ->
maps:from_list(List);
map(Map = #{}) ->
Map;
map(Data) ->
emqx_plugin_libs_rule:map(Data).
error(badarg, [Data]).
bin2hexstr(Bin) when is_binary(Bin) ->
emqx_utils:bin_to_hexstr(Bin, upper).
@ -895,7 +905,7 @@ mget(Key, Map, Default) ->
Val;
error when is_atom(Key) ->
%% the map may have an equivalent binary-form key
BinKey = emqx_plugin_libs_rule:bin(Key),
BinKey = emqx_utils_conv:bin(Key),
case maps:find(BinKey, Map) of
{ok, Val} -> Val;
error -> Default
@ -922,7 +932,7 @@ mput(Key, Val, Map) ->
maps:put(Key, Val, Map);
error when is_atom(Key) ->
%% the map may have an equivalent binary-form key
BinKey = emqx_plugin_libs_rule:bin(Key),
BinKey = emqx_utils_conv:bin(Key),
case maps:find(BinKey, Map) of
{ok, _} -> maps:put(BinKey, Val, Map);
error -> maps:put(Key, Val, Map)
@ -1053,7 +1063,7 @@ unix_ts_to_rfc3339(Epoch) ->
unix_ts_to_rfc3339(Epoch, <<"second">>).
unix_ts_to_rfc3339(Epoch, Unit) when is_integer(Epoch) ->
emqx_plugin_libs_rule:bin(
emqx_utils_conv:bin(
calendar:system_time_to_rfc3339(
Epoch, [{unit, time_unit(Unit)}]
)
@ -1090,7 +1100,7 @@ format_date(TimeUnit, Offset, FormatString) ->
format_date(TimeUnit, Offset, FormatString, TimeEpoch) ->
Unit = time_unit(TimeUnit),
emqx_plugin_libs_rule:bin(
emqx_utils_conv:bin(
lists:concat(
emqx_calendar:format(TimeEpoch, Unit, Offset, FormatString)
)

View File

@ -97,7 +97,7 @@ general_find({key, Key}, Map, _OrgData, Handler) when is_map(Map) ->
Handler({found, {{key, Key}, Val}});
error when is_atom(Key) ->
%% the map may have an equivalent binary-form key
BinKey = emqx_plugin_libs_rule:bin(Key),
BinKey = atom_to_binary(Key),
case maps:find(BinKey, Map) of
{ok, Val} -> Handler({equivalent, {{key, BinKey}, Val}});
error -> Handler(not_found)

View File

@ -14,7 +14,6 @@
-module(emqx_rule_sqltester).
-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
-export([
@ -31,7 +30,7 @@ test(#{sql := Sql, context := Context}) ->
case lists:all(fun is_publish_topic/1, EventTopics) of
true ->
%% test if the topic matches the topic filters in the rule
case emqx_plugin_libs_rule:can_topic_match_oneof(InTopic, EventTopics) of
case emqx_topic:match_any(InTopic, EventTopics) of
true -> test_rule(Sql, Select, Context, EventTopics);
false -> {error, nomatch}
end;

View File

@ -126,7 +126,7 @@ t_int(_) ->
?assertEqual(1, emqx_rule_funcs:int(1.0001)),
?assertEqual(1, emqx_rule_funcs:int(true)),
?assertEqual(0, emqx_rule_funcs:int(false)),
?assertError({invalid_number, {a, v}}, emqx_rule_funcs:int({a, v})),
?assertError(badarg, emqx_rule_funcs:int({a, v})),
?assertError(_, emqx_rule_funcs:int("a")).
t_float(_) ->
@ -137,7 +137,7 @@ t_float(_) ->
?assertEqual(1.9, emqx_rule_funcs:float(1.9)),
?assertEqual(1.0001, emqx_rule_funcs:float(1.0001)),
?assertEqual(1.0000000001, emqx_rule_funcs:float(1.0000000001)),
?assertError({invalid_number, {a, v}}, emqx_rule_funcs:float({a, v})),
?assertError(badarg, emqx_rule_funcs:float({a, v})),
?assertError(_, emqx_rule_funcs:float("a")).
t_map(_) ->
@ -158,7 +158,7 @@ t_bool(_) ->
?assertEqual(true, emqx_rule_funcs:bool(<<"true">>)),
?assertEqual(false, emqx_rule_funcs:bool(false)),
?assertEqual(false, emqx_rule_funcs:bool(<<"false">>)),
?assertError({invalid_boolean, _}, emqx_rule_funcs:bool(3)).
?assertError(badarg, emqx_rule_funcs:bool(3)).
t_proc_dict_put_get_del(_) ->
?assertEqual(undefined, emqx_rule_funcs:proc_dict_get(<<"abc">>)),

View File

@ -46,7 +46,7 @@
quote_mysql/1
]).
-include_lib("emqx/include/emqx_placeholder.hrl").
-define(PH_VAR_THIS, '$this').
-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
@ -55,7 +55,7 @@
%% Space and CRLF
-define(EX_WITHE_CHARS, "\\s").
-type tmpl_token() :: list({var, binary()} | {str, binary()}).
-type tmpl_token() :: list({var, ?PH_VAR_THIS | [binary()]} | {str, binary()}).
-type tmpl_cmd() :: list(tmpl_token()).
@ -90,8 +90,6 @@
| {tmpl, tmpl_token()}
| {value, term()}.
-dialyzer({no_improper_lists, [quote_mysql/1, escape_mysql/4, escape_prepend/4]}).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
@ -112,7 +110,7 @@ proc_tmpl(Tokens, Data) ->
-spec proc_tmpl(tmpl_token(), map(), proc_tmpl_opts()) -> binary() | list().
proc_tmpl(Tokens, Data, Opts = #{return := full_binary}) ->
Trans = maps:get(var_trans, Opts, fun emqx_plugin_libs_rule:bin/1),
Trans = maps:get(var_trans, Opts, fun emqx_utils_conv:bin/1),
list_to_binary(
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Trans})
);
@ -123,11 +121,11 @@ proc_tmpl(Tokens, Data, Opts = #{return := rawlist}) ->
({str, Str}) ->
Str;
({var, Phld}) when is_function(Trans, 1) ->
Trans(get_phld_var(Phld, Data));
Trans(lookup_var(Phld, Data));
({var, Phld}) when is_function(Trans, 2) ->
Trans(Phld, get_phld_var(Phld, Data));
Trans(Phld, lookup_var(Phld, Data));
({var, Phld}) ->
get_phld_var(Phld, Data)
lookup_var(Phld, Data)
end,
Tokens
).
@ -243,34 +241,49 @@ sql_data(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
sql_data(Map) when is_map(Map) -> emqx_utils_json:encode(Map).
-spec bin(term()) -> binary().
bin(Val) -> emqx_plugin_libs_rule:bin(Val).
bin(Val) -> emqx_utils_conv:bin(Val).
-spec quote_sql(_Value) -> iolist().
quote_sql(Str) ->
quote_escape(Str, fun escape_sql/1).
emqx_utils_sql:to_sql_string(Str, #{escaping => sql}).
-spec quote_cql(_Value) -> iolist().
quote_cql(Str) ->
quote_escape(Str, fun escape_cql/1).
emqx_utils_sql:to_sql_string(Str, #{escaping => cql}).
-spec quote_mysql(_Value) -> iolist().
quote_mysql(Str) when is_binary(Str) ->
try
escape_mysql(Str)
catch
throw:invalid_utf8 ->
[<<"0x">> | binary:encode_hex(Str)]
end;
quote_mysql(Str) ->
quote_escape(Str, fun escape_mysql/1).
emqx_utils_sql:to_sql_string(Str, #{escaping => mysql}).
lookup_var(Var, Value) when Var == ?PH_VAR_THIS orelse Var == [] ->
Value;
lookup_var([Prop | Rest], Data) ->
case lookup(Prop, Data) of
{ok, Value} ->
lookup_var(Rest, Value);
{error, _} ->
undefined
end.
lookup(Prop, Data) when is_binary(Prop) ->
case maps:get(Prop, Data, undefined) of
undefined ->
try
{ok, maps:get(binary_to_existing_atom(Prop, utf8), Data)}
catch
error:{badkey, _} ->
{error, undefined};
error:badarg ->
{error, undefined}
end;
Value ->
{ok, Value}
end.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
get_phld_var(Phld, Data) ->
emqx_rule_maps:nested_get(Phld, Data).
preproc_var_re(#{placeholders := PHs, strip_double_quote := true}) ->
Res = [ph_to_re(PH) || PH <- PHs],
QuoteRes = ["\"" ++ Re ++ "\"" || Re <- Res],
@ -340,66 +353,11 @@ parse_nested(<<".", R/binary>>) ->
parse_nested(R);
parse_nested(Attr) ->
case string:split(Attr, <<".">>, all) of
[<<>>] -> {var, ?PH_VAR_THIS};
[Attr] -> {var, Attr};
Nested -> {path, [{key, P} || P <- Nested]}
[<<>>] -> ?PH_VAR_THIS;
Nested -> Nested
end.
unwrap(<<"\"${", Val/binary>>, _StripDoubleQuote = true) ->
binary:part(Val, {0, byte_size(Val) - 2});
unwrap(<<"${", Val/binary>>, _StripDoubleQuote) ->
binary:part(Val, {0, byte_size(Val) - 1}).
-spec quote_escape(_Value, fun((binary()) -> iodata())) -> iodata().
quote_escape(Str, EscapeFun) when is_binary(Str) ->
EscapeFun(Str);
quote_escape(Str, EscapeFun) when is_list(Str) ->
case unicode:characters_to_binary(Str) of
Bin when is_binary(Bin) ->
EscapeFun(Bin);
Otherwise ->
error(Otherwise)
end;
quote_escape(Str, EscapeFun) when is_atom(Str) orelse is_map(Str) ->
EscapeFun(bin(Str));
quote_escape(Val, _EscapeFun) ->
bin(Val).
-spec escape_sql(binary()) -> iolist().
escape_sql(S) ->
ES = binary:replace(S, [<<"\\">>, <<"'">>], <<"\\">>, [global, {insert_replaced, 1}]),
[$', ES, $'].
-spec escape_cql(binary()) -> iolist().
escape_cql(S) ->
ES = binary:replace(S, <<"'">>, <<"'">>, [global, {insert_replaced, 1}]),
[$', ES, $'].
-spec escape_mysql(binary()) -> iolist().
escape_mysql(S0) ->
% https://dev.mysql.com/doc/refman/8.0/en/string-literals.html
[$', escape_mysql(S0, 0, 0, S0), $'].
%% NOTE
%% This thing looks more complicated than needed because it's optimized for as few
%% intermediate memory (re)allocations as possible.
escape_mysql(<<$', Rest/binary>>, I, Run, Src) ->
escape_prepend(I, Run, Src, [<<"\\'">> | escape_mysql(Rest, I + Run + 1, 0, Src)]);
escape_mysql(<<$\\, Rest/binary>>, I, Run, Src) ->
escape_prepend(I, Run, Src, [<<"\\\\">> | escape_mysql(Rest, I + Run + 1, 0, Src)]);
escape_mysql(<<0, Rest/binary>>, I, Run, Src) ->
escape_prepend(I, Run, Src, [<<"\\0">> | escape_mysql(Rest, I + Run + 1, 0, Src)]);
escape_mysql(<<_/utf8, Rest/binary>> = S, I, Run, Src) ->
CWidth = byte_size(S) - byte_size(Rest),
escape_mysql(Rest, I, Run + CWidth, Src);
escape_mysql(<<>>, 0, _, Src) ->
Src;
escape_mysql(<<>>, I, Run, Src) ->
binary:part(Src, I, Run);
escape_mysql(_, _I, _Run, _Src) ->
throw(invalid_utf8).
escape_prepend(_RunI, 0, _Src, Tail) ->
Tail;
escape_prepend(I, Run, Src, Tail) ->
[binary:part(Src, I, Run) | Tail].

View File

@ -0,0 +1,125 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_utils_conv).
-export([bin/1]).
-export([str/1]).
-export([bool/1]).
-export([int/1]).
-export([float/1]).
-compile({no_auto_import, [float/1]}).
-type scalar() :: binary() | number() | atom() | string().
-spec bin(Term) -> binary() when
Term :: scalar() | #{scalar() => Term} | [Term].
bin(Bin) when is_binary(Bin) -> Bin;
bin(Num) when is_number(Num) -> number_to_binary(Num);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
bin(Map) when is_map(Map) -> emqx_utils_json:encode(Map);
bin(List) when is_list(List) ->
case io_lib:printable_list(List) of
true -> list_to_binary(List);
false -> emqx_utils_json:encode(List)
end;
bin(Data) ->
error({invalid_bin, Data}).
-spec str(Term) -> string() when
Term :: scalar() | #{scalar() => Term} | [Term].
str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
str(Num) when is_number(Num) -> number_to_list(Num);
str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
str(Map) when is_map(Map) -> binary_to_list(emqx_utils_json:encode(Map));
str(List) when is_list(List) ->
case io_lib:printable_list(List) of
true -> List;
false -> binary_to_list(emqx_utils_json:encode(List))
end;
str(Data) ->
error({invalid_str, Data}).
-spec number_to_binary(number()) -> binary().
number_to_binary(Int) when is_integer(Int) ->
integer_to_binary(Int);
number_to_binary(Float) when is_float(Float) ->
float_to_binary(Float, [{decimals, 10}, compact]).
-spec number_to_list(number()) -> string().
number_to_list(Int) when is_integer(Int) ->
integer_to_list(Int);
number_to_list(Float) when is_float(Float) ->
float_to_list(Float, [{decimals, 10}, compact]).
-spec bool(Term) -> boolean() when
Term :: boolean() | binary() | 0..1.
bool(true) -> true;
bool(<<"true">>) -> true;
bool(N) when N == 1 -> true;
bool(false) -> false;
bool(<<"false">>) -> false;
bool(N) when N == 0 -> false;
bool(Data) -> error(badarg, [Data]).
-spec int(Term) -> integer() when
Term :: binary() | string() | number() | boolean().
int(List) when is_list(List) ->
try
list_to_integer(List)
catch
error:badarg ->
int(list_to_float(List))
end;
int(Bin) when is_binary(Bin) ->
try
binary_to_integer(Bin)
catch
error:badarg ->
int(binary_to_float(Bin))
end;
int(Int) when is_integer(Int) ->
Int;
int(Float) when is_float(Float) ->
erlang:floor(Float);
int(true) ->
1;
int(false) ->
0;
int(Data) ->
error(badarg, [Data]).
-spec float(Term) -> float() when
Term :: binary() | string() | number().
float(List) when is_list(List) ->
try
list_to_float(List)
catch
error:badarg ->
float(list_to_integer(List))
end;
float(Bin) when is_binary(Bin) ->
try
binary_to_float(Bin)
catch
error:badarg ->
float(binary_to_integer(Bin))
end;
float(Num) when is_number(Num) ->
erlang:float(Num);
float(Data) ->
error(badarg, [Data]).

View File

@ -0,0 +1,157 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_utils_sql).
-export([get_statement_type/1]).
-export([parse_insert/1]).
-export([to_sql_value/1]).
-export([to_sql_string/2]).
-export([escape_sql/1]).
-export([escape_cql/1]).
-export([escape_mysql/1]).
-export_type([value/0]).
-type statement_type() :: select | insert | delete.
-type value() :: null | binary() | number() | boolean() | [value()].
-dialyzer({no_improper_lists, [escape_mysql/4, escape_prepend/4]}).
-spec get_statement_type(iodata()) -> statement_type() | {error, unknown}.
get_statement_type(Query) ->
KnownTypes = #{
<<"select">> => select,
<<"insert">> => insert,
<<"delete">> => delete
},
case re:run(Query, <<"^\\s*([a-zA-Z]+)">>, [{capture, all_but_first, binary}]) of
{match, [Token]} ->
maps:get(string:lowercase(Token), KnownTypes, {error, unknown});
_ ->
{error, unknown}
end.
%% @doc Parse an INSERT SQL statement into its INSERT part and the VALUES part.
%% SQL = <<"INSERT INTO \"abc\" (c1, c2, c3) VALUES (${a}, ${b}, ${c.prop})">>
%% {ok, {<<"INSERT INTO \"abc\" (c1, c2, c3)">>, <<"(${a}, ${b}, ${c.prop})">>}}
-spec parse_insert(iodata()) ->
{ok, {_Statement :: binary(), _Rows :: binary()}} | {error, not_insert_sql}.
parse_insert(SQL) ->
case re:split(SQL, "((?i)values)", [{return, binary}]) of
[Part1, _, Part3] ->
case string:trim(Part1, leading) of
<<"insert", _/binary>> = InsertSQL ->
{ok, {InsertSQL, Part3}};
<<"INSERT", _/binary>> = InsertSQL ->
{ok, {InsertSQL, Part3}};
_ ->
{error, not_insert_sql}
end;
_ ->
{error, not_insert_sql}
end.
%% @doc Convert an Erlang term to a value that can be used primarily in
%% prepared SQL statements.
-spec to_sql_value(term()) -> value().
to_sql_value(undefined) -> null;
to_sql_value(List) when is_list(List) -> List;
to_sql_value(Bin) when is_binary(Bin) -> Bin;
to_sql_value(Num) when is_number(Num) -> Num;
to_sql_value(Bool) when is_boolean(Bool) -> Bool;
to_sql_value(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
to_sql_value(Map) when is_map(Map) -> emqx_utils_json:encode(Map).
%% @doc Convert an Erlang term to a string that can be interpolated in literal
%% SQL statements. The value is escaped if necessary.
-spec to_sql_string(term(), Options) -> iodata() when
Options :: #{
escaping => cql | mysql | sql
}.
to_sql_string(String, #{escaping := mysql}) when is_binary(String) ->
try
escape_mysql(String)
catch
throw:invalid_utf8 ->
[<<"0x">>, binary:encode_hex(String)]
end;
to_sql_string(Term, #{escaping := mysql}) ->
maybe_escape(Term, fun escape_mysql/1);
to_sql_string(Term, #{escaping := cql}) ->
maybe_escape(Term, fun escape_cql/1);
to_sql_string(Term, #{}) ->
maybe_escape(Term, fun escape_sql/1).
-spec maybe_escape(_Value, fun((binary()) -> iodata())) -> iodata().
maybe_escape(Str, EscapeFun) when is_binary(Str) ->
EscapeFun(Str);
maybe_escape(Str, EscapeFun) when is_list(Str) ->
case unicode:characters_to_binary(Str) of
Bin when is_binary(Bin) ->
EscapeFun(Bin);
Otherwise ->
error(Otherwise)
end;
maybe_escape(Val, EscapeFun) when is_atom(Val) orelse is_map(Val) ->
EscapeFun(emqx_utils_conv:bin(Val));
maybe_escape(Val, _EscapeFun) ->
emqx_utils_conv:bin(Val).
-spec escape_sql(binary()) -> iodata().
escape_sql(S) ->
% NOTE
% This is a bit misleading: currently, escaping logic in `escape_sql/1` likely
% won't work with pgsql since it does not support C-style escapes by default.
% https://www.postgresql.org/docs/14/sql-syntax-lexical.html#SQL-SYNTAX-CONSTANTS
ES = binary:replace(S, [<<"\\">>, <<"'">>], <<"\\">>, [global, {insert_replaced, 1}]),
[$', ES, $'].
-spec escape_cql(binary()) -> iodata().
escape_cql(S) ->
ES = binary:replace(S, <<"'">>, <<"'">>, [global, {insert_replaced, 1}]),
[$', ES, $'].
-spec escape_mysql(binary()) -> iodata().
escape_mysql(S0) ->
% https://dev.mysql.com/doc/refman/8.0/en/string-literals.html
[$', escape_mysql(S0, 0, 0, S0), $'].
%% NOTE
%% This thing looks more complicated than needed because it's optimized for as few
%% intermediate memory (re)allocations as possible.
escape_mysql(<<$', Rest/binary>>, I, Run, Src) ->
escape_prepend(I, Run, Src, [<<"\\'">> | escape_mysql(Rest, I + Run + 1, 0, Src)]);
escape_mysql(<<$\\, Rest/binary>>, I, Run, Src) ->
escape_prepend(I, Run, Src, [<<"\\\\">> | escape_mysql(Rest, I + Run + 1, 0, Src)]);
escape_mysql(<<0, Rest/binary>>, I, Run, Src) ->
escape_prepend(I, Run, Src, [<<"\\0">> | escape_mysql(Rest, I + Run + 1, 0, Src)]);
escape_mysql(<<_/utf8, Rest/binary>> = S, I, Run, Src) ->
CWidth = byte_size(S) - byte_size(Rest),
escape_mysql(Rest, I, Run + CWidth, Src);
escape_mysql(<<>>, 0, _, Src) ->
Src;
escape_mysql(<<>>, I, Run, Src) ->
binary:part(Src, I, Run);
escape_mysql(_, _I, _Run, _Src) ->
throw(invalid_utf8).
escape_prepend(_RunI, 0, _Src, Tail) ->
Tail;
escape_prepend(I, Run, Src, Tail) ->
[binary:part(Src, I, Run) | Tail].

View File

@ -0,0 +1,44 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_utils_conv_tests).
-import(emqx_utils_conv, [bin/1, str/1]).
-include_lib("eunit/include/eunit.hrl").
bin_test_() ->
[
?_assertEqual(<<"abc">>, bin("abc")),
?_assertEqual(<<"abc">>, bin(abc)),
?_assertEqual(<<"{\"a\":1}">>, bin(#{a => 1})),
?_assertEqual(<<"[{\"a\":1}]">>, bin([#{a => 1}])),
?_assertEqual(<<"1">>, bin(1)),
?_assertEqual(<<"2.0">>, bin(2.0)),
?_assertEqual(<<"true">>, bin(true)),
?_assertError(_, bin({a, v}))
].
str_test_() ->
[
?_assertEqual("abc", str("abc")),
?_assertEqual("abc", str(abc)),
?_assertEqual("{\"a\":1}", str(#{a => 1})),
?_assertEqual("1", str(1)),
?_assertEqual("2.0", str(2.0)),
?_assertEqual("true", str(true)),
?_assertError(_, str({a, v}))
].

View File

@ -0,0 +1,3 @@
Fix issue when mqtt clients could not connect over TLS if the listener was configured to use TLS v1.3 only.
The problem was that TLS connection was trying to use options incompatible with TLS v1.3.

View File

@ -0,0 +1 @@
A query_mode parameter has been added to the Kafka producer bridge. This parameter allows you to specify if the bridge should use the asynchronous or synchronous mode when sending data to Kafka. The default is asynchronous mode.

View File

@ -0,0 +1,3 @@
Fixed error message formatting in rebalance API: previously they could be displayed as unclear dumps of internal Erlang structures.
Added `wait_health_check` option to node evacuation CLI and API. This is a time interval when the node reports "unhealthy status" without beginning actual evacuation. We need this to allow a Load Balancer (if any) to remove the evacuated node from balancing and not forward (re)connecting clients to the evacuated node.

View File

@ -0,0 +1 @@
The ClickHouse bridge had a problem that could cause messages to be dropped when the ClickHouse server is closed while sending messages even when the request_ttl is set to infinity. This has been fixed by treating errors due to a closed connection as recoverable errors.

View File

@ -225,9 +225,9 @@ start_producer(
msg => "hstreamdb connector: producer started"
}),
EnableBatch = maps:get(enable_batch, Options, false),
Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin),
Payload = emqx_placeholder:preproc_tmpl(PayloadBin),
OrderingKeyBin = maps:get(ordering_key, Options, <<"">>),
OrderingKey = emqx_plugin_libs_rule:preproc_tmpl(OrderingKeyBin),
OrderingKey = emqx_placeholder:preproc_tmpl(OrderingKeyBin),
State = #{
client => Client,
producer => Producer,
@ -254,8 +254,8 @@ start_producer(
end.
to_record(OrderingKeyTmpl, PayloadTmpl, Data) ->
OrderingKey = emqx_plugin_libs_rule:proc_tmpl(OrderingKeyTmpl, Data),
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTmpl, Data),
OrderingKey = emqx_placeholder:proc_tmpl(OrderingKeyTmpl, Data),
Payload = emqx_placeholder:proc_tmpl(PayloadTmpl, Data),
to_record(OrderingKey, Payload).
to_record(OrderingKey, Payload) when is_binary(OrderingKey) ->

View File

@ -15,7 +15,6 @@
%% `emqx_resource' API
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_query/3,
@ -28,8 +27,6 @@
callback_mode() -> emqx_connector_mongo:callback_mode().
is_buffer_supported() -> false.
on_start(InstanceId, Config) ->
case emqx_connector_mongo:on_start(InstanceId, Config) of
{ok, ConnectorState} ->
@ -57,7 +54,7 @@ on_query(InstanceId, {send_message, Message0}, State) ->
connector_state := ConnectorState
} = State,
NewConnectorState = ConnectorState#{
collection => emqx_plugin_libs_rule:proc_tmpl(CollectionTemplate, Message0)
collection => emqx_placeholder:proc_tmpl(CollectionTemplate, Message0)
},
Message = render_message(PayloadTemplate, Message0),
Res = emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, NewConnectorState),
@ -76,7 +73,7 @@ on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) ->
preprocess_template(undefined = _PayloadTemplate) ->
undefined;
preprocess_template(PayloadTemplate) ->
emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate).
emqx_placeholder:preproc_tmpl(PayloadTemplate).
render_message(undefined = _PayloadTemplate, Message) ->
Message;
@ -102,14 +99,14 @@ format_data(PayloadTks, Msg) ->
case maps:size(PreparedTupleMap) of
% If no tuples were found simply proceed with the json decoding and be done with it
0 ->
emqx_utils_json:decode(emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Msg), [return_maps]);
emqx_utils_json:decode(emqx_placeholder:proc_tmpl(PayloadTks, Msg), [return_maps]);
_ ->
% If tuples were found, replace the tuple values with the references created, run
% the modified message through the json parser, and then at the end replace the
% references with the actual tuple values.
ProcessedMessage = replace_message_values_with_references(Msg, PreparedTupleMap),
DecodedMap = emqx_utils_json:decode(
emqx_plugin_libs_rule:proc_tmpl(PayloadTks, ProcessedMessage), [return_maps]
emqx_placeholder:proc_tmpl(PayloadTks, ProcessedMessage), [return_maps]
),
populate_map_with_tuple_values(PreparedTupleMap, DecodedMap)
end.

View File

@ -128,13 +128,13 @@ process_batch_data(BatchData, CommandTemplate) ->
proc_command_template(CommandTemplate, Msg) ->
lists:map(
fun(ArgTks) ->
emqx_plugin_libs_rule:proc_tmpl(ArgTks, Msg, #{return => full_binary})
emqx_placeholder:proc_tmpl(ArgTks, Msg, #{return => full_binary})
end,
CommandTemplate
).
preproc_command_template(CommandTemplate) ->
lists:map(
fun emqx_plugin_libs_rule:preproc_tmpl/1,
fun emqx_placeholder:preproc_tmpl/1,
CommandTemplate
).

View File

@ -352,7 +352,6 @@ defmodule EMQXUmbrella.MixProject do
[
mnesia: :load,
ekka: :load,
emqx_plugin_libs: :load,
esasl: :load,
observer_cli: :permanent,
tools: :permanent,

View File

@ -409,7 +409,6 @@ relx_apps(ReleaseType, Edition) ->
[
{mnesia, load},
{ekka, load},
{emqx_plugin_libs, load},
{esasl, load},
observer_cli,
tools,

View File

@ -178,7 +178,7 @@ consumer_offset_reset_policy.label:
partition_count_refresh_interval.desc:
"""The time interval for Kafka producer to discover increased number of partitions.
After the number of partitions is increased in Kafka, EMQX will start taking the
After the number of partitions is increased in Kafka, EMQX will start taking the
discovered partitions into account when dispatching messages per <code>partition_strategy</code>."""
partition_count_refresh_interval.label:
@ -370,4 +370,16 @@ compression.desc:
compression.label:
"""Compression"""
query_mode.desc:
"""Query mode. Optional 'sync/async', default 'async'."""
query_mode.label:
"""Query mode"""
sync_query_timeout.desc:
"""This parameter defines the timeout limit for synchronous queries. It applies only when the bridge query mode is configured to 'sync'."""
sync_query_timeout.label:
"""Synchronous Query Timeout"""
}

View File

@ -18,6 +18,9 @@ store_segment_timeout.desc:
"""Timeout for storing a file segment.<br/>
After reaching the timeout (e.g. due to system overloaded), the PUBACK message will contain error code (0x80)."""
backend_enable.desc:
"""Whether to enable this backend."""
storage_backend.desc:
"""Storage settings for file transfer."""

View File

@ -49,7 +49,7 @@ param_node.label:
"""Node name"""
wait_health_check.desc:
"""Time to wait before starting the rebalance process, in seconds"""
"""Time to wait before starting the rebalance/evacuation process, in seconds"""
wait_health_check.label:
"""Wait health check"""

View File

@ -92,7 +92,8 @@ mqtt_max_topic_alias.label:
"""Max Topic Alias"""
common_ssl_opts_schema_user_lookup_fun.desc:
"""EMQX-internal callback that is used to lookup pre-shared key (PSK) identity."""
"""EMQX-internal callback that is used to lookup pre-shared key (PSK) identity.</br>
Has no effect when TLS version is configured (or negotiated) to 1.3"""
common_ssl_opts_schema_user_lookup_fun.label:
"""SSL PSK user lookup fun"""
@ -1240,7 +1241,8 @@ The SSL application already takes measures to counter-act such attempts,
but client-initiated renegotiation can be strictly disabled by setting this option to false.
The default value is true. Note that disabling renegotiation can result in
long-lived connections becoming unusable due to limits on
the number of messages the underlying cipher suite can encipher."""
the number of messages the underlying cipher suite can encipher.</br>
Has no effect when TLS version is configured (or negotiated) to 1.3"""
server_ssl_opts_schema_client_renegotiation.label:
"""SSL client renegotiation"""
@ -1326,7 +1328,8 @@ common_ssl_opts_schema_secure_renegotiate.desc:
"""SSL parameter renegotiation is a feature that allows a client and a server
to renegotiate the parameters of the SSL connection on the fly.
RFC 5746 defines a more secure way of doing this. By enabling secure renegotiation,
you drop support for the insecure renegotiation, prone to MitM attacks."""
you drop support for the insecure renegotiation, prone to MitM attacks.</br>
Has no effect when TLS version is configured (or negotiated) to 1.3"""
common_ssl_opts_schema_secure_renegotiate.label:
"""SSL renegotiate"""
@ -1361,7 +1364,8 @@ mqtt_max_packet_size.label:
"""Max Packet Size"""
common_ssl_opts_schema_reuse_sessions.desc:
"""Enable TLS session reuse."""
"""Enable TLS session reuse.</br>
Has no effect when TLS version is configured (or negotiated) to 1.3"""
common_ssl_opts_schema_reuse_sessions.label:
"""TLS session reuse"""