style: reformat emqx_machine, emqx_plugin_libs, and emqx_statsd

This commit is contained in:
Zaiming (Stone) Shi 2022-04-23 09:55:50 +02:00
parent c32fc33c1a
commit b445182335
33 changed files with 1080 additions and 770 deletions

View File

@ -1,4 +1,5 @@
%% -*- mode: erlang -*-
{deps, [ {emqx, {path, "../emqx"}}
]}.
{deps, [{emqx, {path, "../emqx"}}]}.
{project_plugins, [erlfmt]}.

View File

@ -23,13 +23,14 @@
-export([run/0]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
%% 5 minutes
%% -define(DEFAULT_INTERVAL, 300000).
@ -38,14 +39,14 @@
%% APIs
%%--------------------------------------------------------------------
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec(run() -> {ok, timer:time()}).
-spec run() -> {ok, timer:time()}.
run() -> gen_server:call(?MODULE, run, infinity).
-spec(stop() -> ok).
-spec stop() -> ok.
stop() -> gen_server:stop(?MODULE).
%%--------------------------------------------------------------------
@ -58,7 +59,6 @@ init([]) ->
handle_call(run, _From, State) ->
{Time, ok} = timer:tc(fun run_gc/0),
{reply, {ok, Time div 1000}, State, hibernate};
handle_call(_Req, _From, State) ->
{reply, ignored, State}.
@ -68,7 +68,6 @@ handle_cast(_Msg, State) ->
handle_info({timeout, TRef, run}, State = #{timer := TRef}) ->
ok = run_gc(),
{noreply, ensure_timer(State), hibernate};
handle_info(_Info, State) ->
{noreply, State}.
@ -84,7 +83,8 @@ code_change(_OldVsn, State, _Extra) ->
ensure_timer(State) ->
case application:get_env(emqx_machine, global_gc_interval) of
undefined -> State;
undefined ->
State;
{ok, Interval} ->
TRef = emqx_misc:start_timer(Interval, run),
State#{timer := TRef}

View File

@ -1,16 +1,18 @@
%% -*- mode: erlang -*-
{application, emqx_machine,
[{id, "emqx_machine"},
{application, emqx_machine, [
{id, "emqx_machine"},
{description, "The EMQX Machine"},
{vsn, "0.1.0"}, % strict semver, bump manually!
% strict semver, bump manually!
{vsn, "0.1.0"},
{modules, []},
{registered, []},
{applications, [kernel,stdlib]},
{mod, {emqx_machine_app,[]}},
{applications, [kernel, stdlib]},
{mod, {emqx_machine_app, []}},
{env, []},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQX Team <contact@emqx.io>"]},
{links, [{"Homepage", "https://emqx.io/"},
{links, [
{"Homepage", "https://emqx.io/"},
{"Github", "https://github.com/emqx/emqx"}
]}
]}.

View File

@ -16,23 +16,26 @@
-module(emqx_machine).
-export([ start/0
, graceful_shutdown/0
, is_ready/0
-export([
start/0,
graceful_shutdown/0,
is_ready/0,
, node_status/0
, update_vips/0
]).
node_status/0,
update_vips/0
]).
-include_lib("emqx/include/logger.hrl").
%% @doc EMQX boot entrypoint.
start() ->
case os:type() of
{win32, nt} -> ok;
{win32, nt} ->
ok;
_Nix ->
os:set_signal(sighup, ignore),
os:set_signal(sigterm, handle) %% default is handle
%% default is handle
os:set_signal(sigterm, handle)
end,
ok = set_backtrace_depth(),
start_sysmon(),
@ -56,9 +59,12 @@ is_ready() ->
print_otp_version_warning() -> ok.
-else.
print_otp_version_warning() ->
?ULOG("WARNING: Running on Erlang/OTP version ~p. Recommended: 23~n",
[?OTP_RELEASE]).
-endif. % OTP_RELEASE > 22
?ULOG(
"WARNING: Running on Erlang/OTP version ~p. Recommended: 23~n",
[?OTP_RELEASE]
).
% OTP_RELEASE > 22
-endif.
start_sysmon() ->
_ = application:load(system_monitor),
@ -76,8 +82,9 @@ start_sysmon() ->
end.
node_status() ->
emqx_json:encode(#{ backend => mria_rlog:backend()
, role => mria_rlog:role()
emqx_json:encode(#{
backend => mria_rlog:backend(),
role => mria_rlog:role()
}).
update_vips() ->
@ -90,4 +97,5 @@ configure_shard_transports() ->
ShardName = binary_to_existing_atom(ShardBin),
mria_config:set_shard_transport(ShardName, Transport)
end,
ShardTransports).
ShardTransports
).

View File

@ -16,9 +16,10 @@
-module(emqx_machine_app).
-export([ start/2
, stop/1
]).
-export([
start/2,
stop/1
]).
-behaviour(application).

View File

@ -37,16 +37,18 @@ post_boot() ->
-ifdef(TEST).
print_vsn() -> ok.
-else. % TEST
% TEST
-else.
print_vsn() ->
?ULOG("~ts ~ts is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]).
-endif. % TEST
% TEST
-endif.
start_autocluster() ->
ekka:callback(stop, fun emqx_machine_boot:stop_apps/0),
ekka:callback(start, fun emqx_machine_boot:ensure_apps_started/0),
_ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec
%% returns 'ok' or a pid or 'any()' as in spec
_ = ekka:autocluster(emqx),
ok.
stop_apps() ->
@ -59,11 +61,13 @@ stop_one_app(App) ->
try
_ = application:stop(App)
catch
C : E ->
?SLOG(error, #{msg => "failed_to_stop_app",
C:E ->
?SLOG(error, #{
msg => "failed_to_stop_app",
app => App,
exception => C,
reason => E})
reason => E
})
end.
ensure_apps_started() ->
@ -121,16 +125,21 @@ sorted_reboot_apps(Apps) ->
%% Isolated apps without which are not dependency of any other apps are
%% put to the end of the list in the original order.
add_apps_to_digraph(G, Apps) ->
lists:foldl(fun
lists:foldl(
fun
({App, undefined}, Acc) ->
?SLOG(debug, #{msg => "app_is_not_loaded", app => App}),
Acc;
({App, []}, Acc) ->
Acc ++ [App]; %% use '++' to keep the original order
%% use '++' to keep the original order
Acc ++ [App];
({App, Deps}, Acc) ->
add_app_deps_to_digraph(G, App, Deps),
Acc
end, [], Apps).
end,
[],
Apps
).
add_app_deps_to_digraph(G, App, undefined) ->
?SLOG(debug, #{msg => "app_is_not_loaded", app => App}),
@ -141,14 +150,17 @@ add_app_deps_to_digraph(_G, _App, []) ->
add_app_deps_to_digraph(G, App, [Dep | Deps]) ->
digraph:add_vertex(G, App),
digraph:add_vertex(G, Dep),
digraph:add_edge(G, Dep, App), %% dep -> app as dependency
%% dep -> app as dependency
digraph:add_edge(G, Dep, App),
add_app_deps_to_digraph(G, App, Deps).
find_loops(G) ->
lists:filtermap(
fun (App) ->
fun(App) ->
case digraph:get_short_cycle(G, App) of
false -> false;
Apps -> {true, Apps}
end
end, digraph:vertices(G)).
end,
digraph:vertices(G)
).

View File

@ -20,9 +20,16 @@
%% perform graceful shutdown.
-module(emqx_machine_signal_handler).
-export([start/0, init/1, format_status/2,
handle_event/2, handle_call/2, handle_info/2,
terminate/2, code_change/3]).
-export([
start/0,
init/1,
format_status/2,
handle_event/2,
handle_call/2,
handle_info/2,
terminate/2,
code_change/3
]).
-include_lib("emqx/include/logger.hrl").
@ -30,7 +37,8 @@ start() ->
ok = gen_event:swap_sup_handler(
erl_signal_server,
{erl_signal_handler, []},
{?MODULE, []}).
{?MODULE, []}
).
init({[], _}) -> {ok, #{}}.
@ -53,7 +61,7 @@ handle_info(_Other, State) ->
handle_call(_Request, State) ->
{ok, ok, State}.
format_status(_Opt, [_Pdict,_S]) ->
format_status(_Opt, [_Pdict, _S]) ->
ok.
code_change(_OldVsn, State, _Extra) ->

View File

@ -20,8 +20,7 @@
-behaviour(supervisor).
-export([ start_link/0
]).
-export([start_link/0]).
-export([init/1]).
@ -33,7 +32,8 @@ init([]) ->
BootApps = child_worker(emqx_machine_boot, post_boot, [], temporary),
GlobalGC = child_worker(emqx_global_gc, [], permanent),
Children = [Terminator, BootApps, GlobalGC],
SupFlags = #{strategy => one_for_one,
SupFlags = #{
strategy => one_for_one,
intensity => 100,
period => 10
},
@ -43,7 +43,8 @@ child_worker(M, Args, Restart) ->
child_worker(M, start_link, Args, Restart).
child_worker(M, Func, Args, Restart) ->
#{id => M,
#{
id => M,
start => {M, Func, Args},
restart => Restart,
shutdown => 5000,

View File

@ -18,15 +18,22 @@
-behaviour(gen_server).
-export([ start_link/0
, graceful/0
, graceful_wait/0
, is_running/0
]).
-export([
start_link/0,
graceful/0,
graceful_wait/0,
is_running/0
]).
-export([init/1, format_status/2,
handle_cast/2, handle_call/3, handle_info/2,
terminate/2, code_change/3]).
-export([
init/1,
format_status/2,
handle_cast/2,
handle_call/3,
handle_info/2,
terminate/2,
code_change/3
]).
-include_lib("emqx/include/logger.hrl").
@ -47,7 +54,7 @@ graceful() ->
try
_ = gen_server:call(?TERMINATOR, ?DO_IT, infinity)
catch
_ : _ ->
_:_ ->
%% failed to notify terminator, probably due to not started yet
%% or node is going down, either case, the caller
%% should issue a shutdown to be sure
@ -82,9 +89,10 @@ handle_call(?DO_IT, _From, State) ->
try
emqx_machine_boot:stop_apps()
catch
C : E : St ->
C:E:St ->
Apps = [element(1, A) || A <- application:which_applications()],
?SLOG(error, #{msg => "failed_to_stop_apps",
?SLOG(error, #{
msg => "failed_to_stop_apps",
exception => C,
reason => E,
stacktrace => St,
@ -97,7 +105,7 @@ handle_call(?DO_IT, _From, State) ->
handle_call(_Call, _From, State) ->
{noreply, State}.
format_status(_Opt, [_Pdict,_S]) ->
format_status(_Opt, [_Pdict, _S]) ->
ok.
code_change(_OldVsn, State, _Extra) ->

View File

@ -77,27 +77,35 @@ limit_warning(MF, Args) ->
max_args_warning(MF, Args) ->
ArgsSize = erts_debug:flat_size(Args),
case ArgsSize < ?MAX_ARGS_SIZE of
true -> ok;
true ->
ok;
false ->
warning("[WARNING] current_args_size:~w, max_args_size:~w", [ArgsSize, ?MAX_ARGS_SIZE]),
?SLOG(warning, #{msg => "execute_function_in_shell_max_args_size",
?SLOG(warning, #{
msg => "execute_function_in_shell_max_args_size",
function => MF,
args => Args,
args_size => ArgsSize,
max_heap_size => ?MAX_ARGS_SIZE})
max_heap_size => ?MAX_ARGS_SIZE
})
end.
max_heap_size_warning(MF, Args) ->
{heap_size, HeapSize} = erlang:process_info(self(), heap_size),
case HeapSize < ?MAX_HEAP_SIZE of
true -> ok;
true ->
ok;
false ->
warning("[WARNING] current_heap_size:~w, max_heap_size_warning:~w", [HeapSize, ?MAX_HEAP_SIZE]),
?SLOG(warning, #{msg => "shell_process_exceed_max_heap_size",
warning("[WARNING] current_heap_size:~w, max_heap_size_warning:~w", [
HeapSize, ?MAX_HEAP_SIZE
]),
?SLOG(warning, #{
msg => "shell_process_exceed_max_heap_size",
current_heap_size => HeapSize,
function => MF,
args => Args,
max_heap_size => ?MAX_HEAP_SIZE})
max_heap_size => ?MAX_HEAP_SIZE
})
end.
log(prohibited, MF, Args) ->
@ -105,8 +113,11 @@ log(prohibited, MF, Args) ->
?SLOG(error, #{msg => "execute_function_in_shell_prohibited", function => MF, args => Args});
log(exempted, MF, Args) ->
limit_warning(MF, Args),
?SLOG(error, #{msg => "execute_dangerous_function_in_shell_exempted", function => MF, args => Args});
log(ignore, MF, Args) -> limit_warning(MF, Args).
?SLOG(error, #{
msg => "execute_dangerous_function_in_shell_exempted", function => MF, args => Args
});
log(ignore, MF, Args) ->
limit_warning(MF, Args).
warning(Format, Args) ->
io:format(?RED_BG ++ Format ++ ?RESET ++ "~n", Args).

View File

@ -43,22 +43,23 @@ init_per_suite(Config) ->
%%
application:unload(emqx_authz),
emqx_common_test_helpers:start_apps([emqx_conf]),
application:set_env(emqx_machine, applications, [ emqx_prometheus
, emqx_modules
, emqx_dashboard
, emqx_connector
, emqx_gateway
, emqx_statsd
, emqx_resource
, emqx_rule_engine
, emqx_bridge
, emqx_plugin_libs
, emqx_management
, emqx_retainer
, emqx_exhook
, emqx_authn
, emqx_authz
, emqx_plugin
application:set_env(emqx_machine, applications, [
emqx_prometheus,
emqx_modules,
emqx_dashboard,
emqx_connector,
emqx_gateway,
emqx_statsd,
emqx_resource,
emqx_rule_engine,
emqx_bridge,
emqx_plugin_libs,
emqx_management,
emqx_retainer,
emqx_exhook,
emqx_authn,
emqx_authz,
emqx_plugin
]),
Config.

View File

@ -19,22 +19,26 @@
-include_lib("eunit/include/eunit.hrl").
sorted_reboot_apps_test_() ->
Apps1 = [{1, [2, 3, 4]},
Apps1 = [
{1, [2, 3, 4]},
{2, [3, 4]}
],
Apps2 = [{1, [2, 3, 4]},
Apps2 = [
{1, [2, 3, 4]},
{2, [3, 4]},
{5, [4, 3, 2, 1, 1]}
],
[fun() -> check_order(Apps1) end,
[
fun() -> check_order(Apps1) end,
fun() -> check_order(Apps2) end
].
sorted_reboot_apps_cycle_test() ->
Apps = [{1,[2]},{2, [1,3]}],
?assertError({circular_application_dependency, [[1, 2, 1], [2, 1, 2]]},
check_order(Apps)).
Apps = [{1, [2]}, {2, [1, 3]}],
?assertError(
{circular_application_dependency, [[1, 2, 1], [2, 1, 2]]},
check_order(Apps)
).
check_order(Apps) ->
AllApps = lists:usort(lists:append([[A | Deps] || {A, Deps} <- Apps])),
@ -47,7 +51,8 @@ check_order(Apps) ->
lists:foldr(fun(A, {I, Acc}) -> {I + 1, [{A, I} | Acc]} end, {1, []}, Sorted),
do_check_order(Apps, SortedWithIndex).
do_check_order([], _) -> ok;
do_check_order([], _) ->
ok;
do_check_order([{A, Deps} | Rest], Sorted) ->
case lists:filter(fun(Dep) -> is_sorted_before(Dep, A, Sorted) end, Deps) of
[] -> do_check_order(Rest, Sorted);

View File

@ -35,25 +35,39 @@ end_per_suite(_Config) ->
t_local_allowed(_Config) ->
LocalProhibited = [halt, q],
State = undefined,
lists:foreach(fun(LocalFunc) ->
lists:foreach(
fun(LocalFunc) ->
?assertEqual({false, State}, emqx_restricted_shell:local_allowed(LocalFunc, [], State))
end, LocalProhibited),
end,
LocalProhibited
),
LocalAllowed = [ls, pwd],
lists:foreach(fun(LocalFunc) ->
?assertEqual({true, State},emqx_restricted_shell:local_allowed(LocalFunc, [], State))
end, LocalAllowed),
lists:foreach(
fun(LocalFunc) ->
?assertEqual({true, State}, emqx_restricted_shell:local_allowed(LocalFunc, [], State))
end,
LocalAllowed
),
ok.
t_non_local_allowed(_Config) ->
RemoteProhibited = [{erlang, halt}, {c, q}, {init, stop}, {init, restart}, {init, reboot}],
State = undefined,
lists:foreach(fun(RemoteFunc) ->
?assertEqual({false, State}, emqx_restricted_shell:non_local_allowed(RemoteFunc, [], State))
end, RemoteProhibited),
lists:foreach(
fun(RemoteFunc) ->
?assertEqual(
{false, State}, emqx_restricted_shell:non_local_allowed(RemoteFunc, [], State)
)
end,
RemoteProhibited
),
RemoteAllowed = [{erlang, date}, {erlang, system_time}],
lists:foreach(fun(RemoteFunc) ->
lists:foreach(
fun(RemoteFunc) ->
?assertEqual({true, State}, emqx_restricted_shell:local_allowed(RemoteFunc, [], State))
end, RemoteAllowed),
end,
RemoteAllowed
),
ok.
t_lock(_Config) ->
@ -62,11 +76,15 @@ t_lock(_Config) ->
?assertEqual({false, State}, emqx_restricted_shell:local_allowed(q, [], State)),
?assertEqual({true, State}, emqx_restricted_shell:local_allowed(ls, [], State)),
?assertEqual({false, State}, emqx_restricted_shell:non_local_allowed({init, stop}, [], State)),
?assertEqual({true, State}, emqx_restricted_shell:non_local_allowed({inet, getifaddrs}, [], State)),
?assertEqual(
{true, State}, emqx_restricted_shell:non_local_allowed({inet, getifaddrs}, [], State)
),
emqx_restricted_shell:unlock(),
?assertEqual({true, State}, emqx_restricted_shell:local_allowed(q, [], State)),
?assertEqual({true, State}, emqx_restricted_shell:local_allowed(ls, [], State)),
?assertEqual({true, State}, emqx_restricted_shell:non_local_allowed({init, stop}, [], State)),
?assertEqual({true, State}, emqx_restricted_shell:non_local_allowed({inet, getifaddrs}, [], State)),
?assertEqual(
{true, State}, emqx_restricted_shell:non_local_allowed({inet, getifaddrs}, [], State)
),
emqx_restricted_shell:lock(),
ok.

View File

@ -1,4 +1,5 @@
%% -*- mode: erlang -*-
{deps, [ {emqx, {path, "../emqx"}}
]}.
{deps, [{emqx, {path, "../emqx"}}]}.
{project_plugins, [erlfmt]}.

View File

@ -17,29 +17,31 @@
-module(emqx_placeholder).
%% preprocess and process template string with place holders
-export([ preproc_tmpl/1
, preproc_tmpl/2
, proc_tmpl/2
, proc_tmpl/3
, preproc_cmd/1
, proc_cmd/2
, proc_cmd/3
, preproc_sql/1
, preproc_sql/2
, proc_sql/2
, proc_sql_param_str/2
, proc_cql_param_str/2
, preproc_tmpl_deep/1
, preproc_tmpl_deep/2
, proc_tmpl_deep/2
, proc_tmpl_deep/3
-export([
preproc_tmpl/1,
preproc_tmpl/2,
proc_tmpl/2,
proc_tmpl/3,
preproc_cmd/1,
proc_cmd/2,
proc_cmd/3,
preproc_sql/1,
preproc_sql/2,
proc_sql/2,
proc_sql_param_str/2,
proc_cql_param_str/2,
preproc_tmpl_deep/1,
preproc_tmpl_deep/2,
proc_tmpl_deep/2,
proc_tmpl_deep/3,
, bin/1
, sql_data/1
]).
bin/1,
sql_data/1
]).
-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
-define(EX_WITHE_CHARS, "\\s"). %% Space and CRLF
%% Space and CRLF
-define(EX_WITHE_CHARS, "\\s").
-type tmpl_token() :: list({var, binary()} | {str, binary()}).
@ -48,26 +50,32 @@
-type prepare_statement_key() :: binary().
-type var_trans() ::
fun((FoundValue :: term()) -> binary()) |
fun((Placeholder :: term(), FoundValue :: term()) -> binary()).
fun((FoundValue :: term()) -> binary())
| fun((Placeholder :: term(), FoundValue :: term()) -> binary()).
-type preproc_tmpl_opts() :: #{placeholders => list(binary())}.
-type preproc_sql_opts() :: #{placeholders => list(binary()),
replace_with => '?' | '$n'}.
-type preproc_sql_opts() :: #{
placeholders => list(binary()),
replace_with => '?' | '$n'
}.
-type preproc_deep_opts() :: #{placeholders => list(binary()),
process_keys => boolean()}.
-type preproc_deep_opts() :: #{
placeholders => list(binary()),
process_keys => boolean()
}.
-type proc_tmpl_opts() :: #{return => rawlist | full_binary,
var_trans => var_trans()}.
-type proc_tmpl_opts() :: #{
return => rawlist | full_binary,
var_trans => var_trans()
}.
-type deep_template() ::
#{deep_template() => deep_template()} |
{tuple, [deep_template()]} |
[deep_template()] |
{tmpl, tmpl_token()} |
{value, term()}.
#{deep_template() => deep_template()}
| {tuple, [deep_template()]}
| [deep_template()]
| {tmpl, tmpl_token()}
| {value, term()}.
%%------------------------------------------------------------------------------
%% APIs
@ -83,7 +91,6 @@ preproc_tmpl(Str, Opts) ->
Tokens = re:split(Str, RE, [{return, binary}, group, trim]),
do_preproc_tmpl(Tokens, []).
-spec proc_tmpl(tmpl_token(), map()) -> binary().
proc_tmpl(Tokens, Data) ->
proc_tmpl(Tokens, Data, #{return => full_binary}).
@ -92,24 +99,27 @@ proc_tmpl(Tokens, Data) ->
proc_tmpl(Tokens, Data, Opts = #{return := full_binary}) ->
Trans = maps:get(var_trans, Opts, fun emqx_plugin_libs_rule:bin/1),
list_to_binary(
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Trans}));
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Trans})
);
proc_tmpl(Tokens, Data, Opts = #{return := rawlist}) ->
Trans = maps:get(var_trans, Opts, undefined),
lists:map(
fun ({str, Str}) -> Str;
fun
({str, Str}) ->
Str;
({var, Phld}) when is_function(Trans, 1) ->
Trans(get_phld_var(Phld, Data));
({var, Phld}) when is_function(Trans, 2) ->
Trans(Phld, get_phld_var(Phld, Data));
({var, Phld}) ->
get_phld_var(Phld, Data)
end, Tokens).
end,
Tokens
).
-spec preproc_cmd(binary()) -> tmpl_cmd().
preproc_cmd(Str) ->
SubStrList = re:split(Str, ?EX_WITHE_CHARS, [{return,binary},trim]),
SubStrList = re:split(Str, ?EX_WITHE_CHARS, [{return, binary}, trim]),
[preproc_tmpl(SubStr) || SubStr <- SubStrList].
-spec proc_cmd([tmpl_token()], map()) -> binary() | list().
@ -119,9 +129,8 @@ proc_cmd(Tokens, Data) ->
proc_cmd(Tokens, Data, Opts) ->
[proc_tmpl(Tks, Data, Opts) || Tks <- Tokens].
%% preprocess SQL with place holders
-spec preproc_sql(Sql::binary()) -> {prepare_statement_key(), tmpl_token()}.
-spec preproc_sql(Sql :: binary()) -> {prepare_statement_key(), tmpl_token()}.
preproc_sql(Sql) ->
preproc_sql(Sql, '?').
@ -129,7 +138,6 @@ preproc_sql(Sql) ->
{prepare_statement_key(), tmpl_token()}.
preproc_sql(Sql, ReplaceWith) when is_atom(ReplaceWith) ->
preproc_sql(Sql, #{replace_with => ReplaceWith});
preproc_sql(Sql, Opts) ->
RE = preproc_var_re(Opts),
ReplaceWith = maps:get(replace_with, Opts, '?'),
@ -141,22 +149,18 @@ preproc_sql(Sql, Opts) ->
{Sql, []}
end.
-spec proc_sql(tmpl_token(), map()) -> list().
proc_sql(Tokens, Data) ->
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => fun sql_data/1}).
-spec proc_sql_param_str(tmpl_token(), map()) -> binary().
proc_sql_param_str(Tokens, Data) ->
proc_param_str(Tokens, Data, fun quote_sql/1).
-spec proc_cql_param_str(tmpl_token(), map()) -> binary().
proc_cql_param_str(Tokens, Data) ->
proc_param_str(Tokens, Data, fun quote_cql/1).
-spec preproc_tmpl_deep(term()) -> deep_template().
preproc_tmpl_deep(Data) ->
preproc_tmpl_deep(Data, #{process_keys => true}).
@ -164,26 +168,22 @@ preproc_tmpl_deep(Data) ->
-spec preproc_tmpl_deep(term(), preproc_deep_opts()) -> deep_template().
preproc_tmpl_deep(List, Opts) when is_list(List) ->
[preproc_tmpl_deep(El, Opts) || El <- List];
preproc_tmpl_deep(Map, Opts) when is_map(Map) ->
maps:from_list(
lists:map(
fun({K, V}) ->
{preproc_tmpl_deep_map_key(K, Opts),
preproc_tmpl_deep(V, Opts)}
{preproc_tmpl_deep_map_key(K, Opts), preproc_tmpl_deep(V, Opts)}
end,
maps:to_list(Map)));
maps:to_list(Map)
)
);
preproc_tmpl_deep(Binary, Opts) when is_binary(Binary) ->
{tmpl, preproc_tmpl(Binary, Opts)};
preproc_tmpl_deep(Tuple, Opts) when is_tuple(Tuple) ->
{tuple, preproc_tmpl_deep(tuple_to_list(Tuple), Opts)};
preproc_tmpl_deep(Other, _Opts) ->
{value, Other}.
-spec proc_tmpl_deep(deep_template(), map()) -> term().
proc_tmpl_deep(DeepTmpl, Data) ->
proc_tmpl_deep(DeepTmpl, Data, #{return => full_binary}).
@ -191,24 +191,22 @@ proc_tmpl_deep(DeepTmpl, Data) ->
-spec proc_tmpl_deep(deep_template(), map(), proc_tmpl_opts()) -> term().
proc_tmpl_deep(List, Data, Opts) when is_list(List) ->
[proc_tmpl_deep(El, Data, Opts) || El <- List];
proc_tmpl_deep(Map, Data, Opts) when is_map(Map) ->
maps:from_list(
lists:map(
fun({K, V}) ->
{proc_tmpl_deep(K, Data, Opts),
proc_tmpl_deep(V, Data, Opts)}
{proc_tmpl_deep(K, Data, Opts), proc_tmpl_deep(V, Data, Opts)}
end,
maps:to_list(Map)));
proc_tmpl_deep({value, Value}, _Data, _Opts) -> Value;
proc_tmpl_deep({tmpl, Tokens}, Data, Opts) -> proc_tmpl(Tokens, Data, Opts);
maps:to_list(Map)
)
);
proc_tmpl_deep({value, Value}, _Data, _Opts) ->
Value;
proc_tmpl_deep({tmpl, Tokens}, Data, Opts) ->
proc_tmpl(Tokens, Data, Opts);
proc_tmpl_deep({tuple, Elements}, Data, Opts) ->
list_to_tuple([proc_tmpl_deep(El, Data, Opts) || El <- Elements]).
-spec sql_data(term()) -> term().
sql_data(undefined) -> null;
sql_data(List) when is_list(List) -> List;
@ -218,7 +216,6 @@ sql_data(Bool) when is_boolean(Bool) -> Bool;
sql_data(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
sql_data(Map) when is_map(Map) -> emqx_json:encode(Map).
-spec bin(term()) -> binary().
bin(Val) -> emqx_plugin_libs_rule:bin(Val).
@ -228,7 +225,8 @@ bin(Val) -> emqx_plugin_libs_rule:bin(Val).
proc_param_str(Tokens, Data, Quote) ->
iolist_to_binary(
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})).
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})
).
%% backward compatibility for hot upgrading from =< e4.2.1
get_phld_var(Fun, Data) when is_function(Fun) ->
@ -252,19 +250,20 @@ do_preproc_tmpl([[Str, Phld] | Tokens], Acc) ->
put_head(
var,
parse_nested(unwrap(Phld)),
put_head(str, Str, Acc)));
put_head(str, Str, Acc)
)
);
do_preproc_tmpl([[Str] | Tokens], Acc) ->
do_preproc_tmpl(
Tokens,
put_head(str, Str, Acc)).
put_head(str, Str, Acc)
).
put_head(_Type, <<>>, List) -> List;
put_head(Type, Term, List) ->
[{Type, Term} | List].
put_head(Type, Term, List) -> [{Type, Term} | List].
preproc_tmpl_deep_map_key(Key, #{process_keys := true} = Opts) ->
preproc_tmpl_deep(Key, Opts);
preproc_tmpl_deep_map_key(Key, _) ->
{value, Key}.
@ -274,12 +273,16 @@ replace_with(Tmpl, RE, '$n') ->
Parts = re:split(Tmpl, RE, [{return, binary}, trim, group]),
{Res, _} =
lists:foldl(
fun([Tkn, _Phld], {Acc, Seq}) ->
fun
([Tkn, _Phld], {Acc, Seq}) ->
Seq1 = erlang:integer_to_binary(Seq),
{<<Acc/binary, Tkn/binary, "$", Seq1/binary>>, Seq + 1};
([Tkn], {Acc, Seq}) ->
{<<Acc/binary, Tkn/binary>>, Seq}
end, {<<>>, 1}, Parts),
end,
{<<>>, 1},
Parts
),
Res.
parse_nested(Attr) ->
@ -289,7 +292,7 @@ parse_nested(Attr) ->
end.
unwrap(<<"${", Val/binary>>) ->
binary:part(Val, {0, byte_size(Val)-1}).
binary:part(Val, {0, byte_size(Val) - 1}).
quote_sql(Str) ->
quote(Str, <<"\\\\'">>).
@ -301,7 +304,8 @@ quote(Str, ReplaceWith) when
is_list(Str);
is_binary(Str);
is_atom(Str);
is_map(Str) ->
is_map(Str)
->
[$', escape_apo(bin(Str), ReplaceWith), $'];
quote(Val, _) ->
bin(Val).

View File

@ -1,8 +1,8 @@
%% -*- mode: erlang -*-
{application, emqx_plugin_libs,
[{description, "EMQX Plugin utility libs"},
{application, emqx_plugin_libs, [
{description, "EMQX Plugin utility libs"},
{vsn, "4.3.1"},
{modules, []},
{applications, [kernel,stdlib]},
{applications, [kernel, stdlib]},
{env, []}
]}.
]}.

View File

@ -19,34 +19,36 @@
-behaviour(gen_server).
%% API functions
-export([ start_link/1
, stop/1
, child_spec/1
]).
-export([
start_link/1,
stop/1,
child_spec/1
]).
-export([ inc/3
, inc/4
, get/3
, get_rate/2
, get_counters/2
, create_metrics/3
, create_metrics/4
, clear_metrics/2
, reset_metrics/2
, has_metrics/2
]).
-export([
inc/3,
inc/4,
get/3,
get_rate/2,
get_counters/2,
create_metrics/3,
create_metrics/4,
clear_metrics/2,
reset_metrics/2,
has_metrics/2
]).
-export([ get_metrics/2
]).
-export([get_metrics/2]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_info/2
, handle_cast/2
, code_change/3
, terminate/2
]).
-export([
init/1,
handle_call/3,
handle_info/2,
handle_cast/2,
code_change/3,
terminate/2
]).
-ifndef(TEST).
-define(SECS_5M, 300).
@ -84,78 +86,83 @@
%% metadata for calculating the 5min avg rate
last5m_acc = 0 :: number(),
last5m_smpl = [] :: list()
}).
}).
-record(state, {
metric_ids = sets:new(),
rates :: undefined | #{metric_id() => #rate{}}
}).
}).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
-spec(child_spec(handler_name()) -> supervisor:child_spec()).
-spec child_spec(handler_name()) -> supervisor:child_spec().
child_spec(Name) ->
#{ id => emqx_plugin_libs_metrics
, start => {emqx_plugin_libs_metrics, start_link, [Name]}
, restart => permanent
, shutdown => 5000
, type => worker
, modules => [emqx_plugin_libs_metrics]
#{
id => emqx_plugin_libs_metrics,
start => {emqx_plugin_libs_metrics, start_link, [Name]},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_plugin_libs_metrics]
}.
-spec(create_metrics(handler_name(), metric_id(), [atom()]) -> ok | {error, term()}).
-spec create_metrics(handler_name(), metric_id(), [atom()]) -> ok | {error, term()}.
create_metrics(Name, Id, Metrics) ->
create_metrics(Name, Id, Metrics, Metrics).
-spec(create_metrics(handler_name(), metric_id(), [atom()], [atom()]) -> ok | {error, term()}).
-spec create_metrics(handler_name(), metric_id(), [atom()], [atom()]) -> ok | {error, term()}.
create_metrics(Name, Id, Metrics, RateMetrics) ->
gen_server:call(Name, {create_metrics, Id, Metrics, RateMetrics}).
-spec(clear_metrics(handler_name(), metric_id()) -> ok).
-spec clear_metrics(handler_name(), metric_id()) -> ok.
clear_metrics(Name, Id) ->
gen_server:call(Name, {delete_metrics, Id}).
-spec(reset_metrics(handler_name(), metric_id()) -> ok).
-spec reset_metrics(handler_name(), metric_id()) -> ok.
reset_metrics(Name, Id) ->
gen_server:call(Name, {reset_metrics, Id}).
-spec(has_metrics(handler_name(), metric_id()) -> boolean()).
-spec has_metrics(handler_name(), metric_id()) -> boolean().
has_metrics(Name, Id) ->
case get_ref(Name, Id) of
not_found -> false;
_ -> true
end.
-spec(get(handler_name(), metric_id(), atom() | integer()) -> number()).
-spec get(handler_name(), metric_id(), atom() | integer()) -> number().
get(Name, Id, Metric) ->
case get_ref(Name, Id) of
not_found -> 0;
not_found ->
0;
Ref when is_atom(Metric) ->
counters:get(Ref, idx_metric(Name, Id, Metric));
Ref when is_integer(Metric) ->
counters:get(Ref, Metric)
end.
-spec(get_rate(handler_name(), metric_id()) -> map()).
-spec get_rate(handler_name(), metric_id()) -> map().
get_rate(Name, Id) ->
gen_server:call(Name, {get_rate, Id}).
-spec(get_counters(handler_name(), metric_id()) -> map()).
-spec get_counters(handler_name(), metric_id()) -> map().
get_counters(Name, Id) ->
maps:map(fun(_Metric, Index) ->
maps:map(
fun(_Metric, Index) ->
get(Name, Id, Index)
end, get_indexes(Name, Id)).
end,
get_indexes(Name, Id)
).
-spec reset_counters(handler_name(), metric_id()) -> ok.
reset_counters(Name, Id) ->
Indexes = maps:values(get_indexes(Name, Id)),
Ref = get_ref(Name, Id),
[counters:put(Ref, Idx, 0) || Idx <- Indexes ],
[counters:put(Ref, Idx, 0) || Idx <- Indexes],
ok.
-spec(get_metrics(handler_name(), metric_id()) -> metrics()).
-spec get_metrics(handler_name(), metric_id()) -> metrics().
get_metrics(Name, Id) ->
#{rate => get_rate(Name, Id), counters => get_counters(Name, Id)}.
@ -180,47 +187,63 @@ init(Name) ->
handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) ->
{reply, make_rate(0, 0, 0), State};
handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) ->
{reply, case maps:get(Id, Rates, undefined) of
{reply,
case maps:get(Id, Rates, undefined) of
undefined -> make_rate(0, 0, 0);
RatesPerId -> format_rates_of_id(RatesPerId)
end, State};
handle_call({create_metrics, Id, Metrics, RateMetrics}, _From,
State = #state{metric_ids = MIDs, rates = Rates}) ->
handle_call(
{create_metrics, Id, Metrics, RateMetrics},
_From,
State = #state{metric_ids = MIDs, rates = Rates}
) ->
case RateMetrics -- Metrics of
[] ->
RatePerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]),
Rate1 = case Rates of
Rate1 =
case Rates of
undefined -> #{Id => RatePerId};
_ -> Rates#{Id => RatePerId}
end,
{reply, create_counters(get_self_name(), Id, Metrics),
State#state{metric_ids = sets:add_element(Id, MIDs),
rates = Rate1}};
{reply, create_counters(get_self_name(), Id, Metrics), State#state{
metric_ids = sets:add_element(Id, MIDs),
rates = Rate1
}};
_ ->
{reply, {error, not_super_set_of, {RateMetrics, Metrics}}, State}
end;
handle_call({delete_metrics, Id}, _From,
State = #state{metric_ids = MIDs, rates = Rates}) ->
{reply, delete_counters(get_self_name(), Id),
State#state{metric_ids = sets:del_element(Id, MIDs),
rates = case Rates of
handle_call(
{delete_metrics, Id},
_From,
State = #state{metric_ids = MIDs, rates = Rates}
) ->
{reply, delete_counters(get_self_name(), Id), State#state{
metric_ids = sets:del_element(Id, MIDs),
rates =
case Rates of
undefined -> undefined;
_ -> maps:remove(Id, Rates)
end}};
handle_call({reset_metrics, Id}, _From,
State = #state{rates = Rates}) ->
{reply, reset_counters(get_self_name(), Id),
State#state{rates = case Rates of
undefined -> undefined;
_ -> ResetRate =
maps:map(fun(_Key, _Value) -> #rate{} end,
maps:get(Id, Rates, #{})),
end
}};
handle_call(
{reset_metrics, Id},
_From,
State = #state{rates = Rates}
) ->
{reply, reset_counters(get_self_name(), Id), State#state{
rates =
case Rates of
undefined ->
undefined;
_ ->
ResetRate =
maps:map(
fun(_Key, _Value) -> #rate{} end,
maps:get(Id, Rates, #{})
),
maps:put(Id, ResetRate, Rates)
end}};
end
}};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
@ -230,17 +253,21 @@ handle_cast(_Msg, State) ->
handle_info(ticking, State = #state{rates = undefined}) ->
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
{noreply, State};
handle_info(ticking, State = #state{rates = Rates0}) ->
Rates =
maps:map(fun(Id, RatesPerID) ->
maps:map(fun(Metric, Rate) ->
maps:map(
fun(Id, RatesPerID) ->
maps:map(
fun(Metric, Rate) ->
calculate_rate(get(get_self_name(), Id, Metric), Rate)
end, RatesPerID)
end, Rates0),
end,
RatesPerID
)
end,
Rates0
),
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
{noreply, State#state{rates = Rates}};
handle_info(_Info, State) ->
{noreply, State}.
@ -269,12 +296,17 @@ create_counters(Name, Id, Metrics) ->
Indexes = maps:from_list(lists:zip(Metrics, lists:seq(1, Size))),
Counters = get_pterm(Name),
CntrRef = counters:new(Size, [write_concurrency]),
persistent_term:put(?CntrRef(Name),
Counters#{Id => #{ref => CntrRef, indexes => Indexes}}),
persistent_term:put(
?CntrRef(Name),
Counters#{Id => #{ref => CntrRef, indexes => Indexes}}
),
%% restore the old counters
lists:foreach(fun({Metric, N}) ->
lists:foreach(
fun({Metric, N}) ->
inc(Name, Id, Metric, N)
end, maps:to_list(OlderCounters)).
end,
maps:to_list(OlderCounters)
).
delete_counters(Name, Id) ->
persistent_term:put(?CntrRef(Name), maps:remove(Id, get_pterm(Name))).
@ -299,9 +331,13 @@ get_pterm(Name) ->
calculate_rate(_CurrVal, undefined) ->
undefined;
calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal,
tick = Tick, last5m_acc = AccRate5Min0,
last5m_smpl = Last5MinSamples0}) ->
calculate_rate(CurrVal, #rate{
max = MaxRate0,
last_v = LastVal,
tick = Tick,
last5m_acc = AccRate5Min0,
last5m_smpl = Last5MinSamples0
}) ->
%% calculate the current rate based on the last value of the counter
CurrRate = (CurrVal - LastVal) / ?SAMPLING,
@ -317,31 +353,39 @@ calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal,
case Tick =< ?SAMPCOUNT_5M of
true ->
Acc = AccRate5Min0 + CurrRate,
{lists:reverse([CurrRate | lists:reverse(Last5MinSamples0)]),
Acc, Acc / Tick};
{lists:reverse([CurrRate | lists:reverse(Last5MinSamples0)]), Acc, Acc / Tick};
false ->
[FirstRate | Rates] = Last5MinSamples0,
Acc = AccRate5Min0 + CurrRate - FirstRate,
{lists:reverse([CurrRate | lists:reverse(Rates)]),
Acc, Acc / ?SAMPCOUNT_5M}
{lists:reverse([CurrRate | lists:reverse(Rates)]), Acc, Acc / ?SAMPCOUNT_5M}
end,
#rate{max = MaxRate, current = CurrRate, last5m = Last5Min,
last_v = CurrVal, last5m_acc = Acc5Min,
last5m_smpl = Last5MinSamples, tick = Tick + 1}.
#rate{
max = MaxRate,
current = CurrRate,
last5m = Last5Min,
last_v = CurrVal,
last5m_acc = Acc5Min,
last5m_smpl = Last5MinSamples,
tick = Tick + 1
}.
format_rates_of_id(RatesPerId) ->
maps:map(fun(_Metric, Rates) ->
maps:map(
fun(_Metric, Rates) ->
format_rate(Rates)
end, RatesPerId).
end,
RatesPerId
).
format_rate(#rate{max = Max, current = Current, last5m = Last5Min}) ->
make_rate(Current, Max, Last5Min).
make_rate(Current, Max, Last5Min) ->
#{ current => precision(Current, 2)
, max => precision(Max, 2)
, last5m => precision(Last5Min, 2)
#{
current => precision(Current, 2),
max => precision(Max, 2),
last5m => precision(Last5Min, 2)
}.
precision(Float, N) ->

View File

@ -16,11 +16,12 @@
-module(emqx_plugin_libs_pool).
-export([ start_pool/3
, stop_pool/1
, pool_name/1
, health_check/3
]).
-export([
start_pool/3,
stop_pool/1,
pool_name/1,
health_check/3
]).
-include_lib("emqx/include/logger.hrl").
@ -36,8 +37,11 @@ start_pool(Name, Mod, Options) ->
stop_pool(Name),
start_pool(Name, Mod, Options);
{error, Reason} ->
?SLOG(error, #{msg => "start_ecpool_error", pool_name => Name,
reason => Reason}),
?SLOG(error, #{
msg => "start_ecpool_error",
pool_name => Name,
reason => Reason
}),
{error, {start_pool_failed, Name, Reason}}
end.
@ -48,18 +52,24 @@ stop_pool(Name) ->
{error, not_found} ->
ok;
{error, Reason} ->
?SLOG(error, #{msg => "stop_ecpool_failed", pool_name => Name,
reason => Reason}),
?SLOG(error, #{
msg => "stop_ecpool_failed",
pool_name => Name,
reason => Reason
}),
error({stop_pool_failed, Name, Reason})
end.
health_check(PoolName, CheckFunc, State) when is_function(CheckFunc) ->
Status = [begin
Status = [
begin
case ecpool_worker:client(Worker) of
{ok, Conn} -> CheckFunc(Conn);
_ -> false
end
end || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
end
|| {_WorkerName, Worker} <- ecpool:workers(PoolName)
],
case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of
true -> {ok, State};
false -> {error, health_check_failed, State}

View File

@ -18,106 +18,109 @@
-elvis([{elvis_style, god_modules, disable}]).
%% preprocess and process template string with place holders
-export([ preproc_tmpl/1
, proc_tmpl/2
, proc_tmpl/3
, preproc_cmd/1
, proc_cmd/2
, proc_cmd/3
, preproc_sql/1
, preproc_sql/2
, proc_sql/2
, proc_sql_param_str/2
, proc_cql_param_str/2
]).
-export([
preproc_tmpl/1,
proc_tmpl/2,
proc_tmpl/3,
preproc_cmd/1,
proc_cmd/2,
proc_cmd/3,
preproc_sql/1,
preproc_sql/2,
proc_sql/2,
proc_sql_param_str/2,
proc_cql_param_str/2
]).
%% type converting
-export([ str/1
, bin/1
, bool/1
, int/1
, float/1
, map/1
, utf8_bin/1
, utf8_str/1
, number_to_binary/1
, atom_key/1
, unsafe_atom_key/1
]).
-export([
str/1,
bin/1,
bool/1,
int/1,
float/1,
map/1,
utf8_bin/1,
utf8_str/1,
number_to_binary/1,
atom_key/1,
unsafe_atom_key/1
]).
%% connectivity check
-export([ http_connectivity/1
, http_connectivity/2
, tcp_connectivity/2
, tcp_connectivity/3
]).
-export([
http_connectivity/1,
http_connectivity/2,
tcp_connectivity/2,
tcp_connectivity/3
]).
-export([ now_ms/0
, can_topic_match_oneof/2
]).
-export([
now_ms/0,
can_topic_match_oneof/2
]).
-export([cluster_call/3]).
-compile({no_auto_import,
[ float/1
]}).
-compile({no_auto_import, [float/1]}).
-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
-define(EX_WITHE_CHARS, "\\s"). %% Space and CRLF
%% Space and CRLF
-define(EX_WITHE_CHARS, "\\s").
-type(uri_string() :: iodata()).
-type uri_string() :: iodata().
-type(tmpl_token() :: list({var, binary()} | {str, binary()})).
-type tmpl_token() :: list({var, binary()} | {str, binary()}).
-type(tmpl_cmd() :: list(tmpl_token())).
-type tmpl_cmd() :: list(tmpl_token()).
-type(prepare_statement_key() :: binary()).
-type prepare_statement_key() :: binary().
%% preprocess template string with place holders
-spec(preproc_tmpl(binary()) -> tmpl_token()).
-spec preproc_tmpl(binary()) -> tmpl_token().
preproc_tmpl(Str) ->
emqx_placeholder:preproc_tmpl(Str).
-spec(proc_tmpl(tmpl_token(), map()) -> binary()).
-spec proc_tmpl(tmpl_token(), map()) -> binary().
proc_tmpl(Tokens, Data) ->
emqx_placeholder:proc_tmpl(Tokens, Data).
-spec(proc_tmpl(tmpl_token(), map(), map()) -> binary() | list()).
-spec proc_tmpl(tmpl_token(), map(), map()) -> binary() | list().
proc_tmpl(Tokens, Data, Opts) ->
emqx_placeholder:proc_tmpl(Tokens, Data, Opts).
-spec(preproc_cmd(binary()) -> tmpl_cmd()).
-spec preproc_cmd(binary()) -> tmpl_cmd().
preproc_cmd(Str) ->
emqx_placeholder:preproc_cmd(Str).
-spec(proc_cmd([tmpl_token()], map()) -> binary() | list()).
-spec proc_cmd([tmpl_token()], map()) -> binary() | list().
proc_cmd(Tokens, Data) ->
emqx_placeholder:proc_cmd(Tokens, Data).
-spec(proc_cmd([tmpl_token()], map(), map()) -> list()).
-spec proc_cmd([tmpl_token()], map(), map()) -> list().
proc_cmd(Tokens, Data, Opts) ->
emqx_placeholder:proc_cmd(Tokens, Data, Opts).
%% preprocess SQL with place holders
-spec(preproc_sql(Sql::binary()) -> {prepare_statement_key(), tmpl_token()}).
-spec preproc_sql(Sql :: binary()) -> {prepare_statement_key(), tmpl_token()}.
preproc_sql(Sql) ->
emqx_placeholder:preproc_sql(Sql).
-spec(preproc_sql(Sql::binary(), ReplaceWith :: '?' | '$n')
-> {prepare_statement_key(), tmpl_token()}).
-spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n') ->
{prepare_statement_key(), tmpl_token()}.
preproc_sql(Sql, ReplaceWith) ->
emqx_placeholder:preproc_sql(Sql, ReplaceWith).
-spec(proc_sql(tmpl_token(), map()) -> list()).
-spec proc_sql(tmpl_token(), map()) -> list().
proc_sql(Tokens, Data) ->
emqx_placeholder:proc_sql(Tokens, Data).
-spec(proc_sql_param_str(tmpl_token(), map()) -> binary()).
-spec proc_sql_param_str(tmpl_token(), map()) -> binary().
proc_sql_param_str(Tokens, Data) ->
emqx_placeholder:proc_sql_param_str(Tokens, Data).
-spec(proc_cql_param_str(tmpl_token(), map()) -> binary()).
-spec proc_cql_param_str(tmpl_token(), map()) -> binary().
proc_cql_param_str(Tokens, Data) ->
emqx_placeholder:proc_cql_param_str(Tokens, Data).
@ -133,19 +136,22 @@ unsafe_atom_key(Key) ->
atom_key(Key) when is_atom(Key) ->
Key;
atom_key(Key) when is_binary(Key) ->
try binary_to_existing_atom(Key, utf8)
catch error:badarg -> error({invalid_key, Key})
try
binary_to_existing_atom(Key, utf8)
catch
error:badarg -> error({invalid_key, Key})
end;
atom_key(Keys = [_Key | _]) -> %% nested keys
%% nested keys
atom_key(Keys = [_Key | _]) ->
[atom_key(SubKey) || SubKey <- Keys];
atom_key(Key) ->
error({invalid_key, Key}).
-spec(http_connectivity(uri_string()) -> ok | {error, Reason :: term()}).
-spec http_connectivity(uri_string()) -> ok | {error, Reason :: term()}.
http_connectivity(Url) ->
http_connectivity(Url, 3000).
-spec(http_connectivity(uri_string(), integer()) -> ok | {error, Reason :: term()}).
-spec http_connectivity(uri_string(), integer()) -> ok | {error, Reason :: term()}.
http_connectivity(Url, Timeout) ->
case emqx_http_lib:uri_parse(Url) of
{ok, #{host := Host, port := Port}} ->
@ -154,20 +160,27 @@ http_connectivity(Url, Timeout) ->
{error, Reason}
end.
-spec tcp_connectivity(Host :: inet:socket_address() | inet:hostname(),
Port :: inet:port_number())
-> ok | {error, Reason :: term()}.
-spec tcp_connectivity(
Host :: inet:socket_address() | inet:hostname(),
Port :: inet:port_number()
) ->
ok | {error, Reason :: term()}.
tcp_connectivity(Host, Port) ->
tcp_connectivity(Host, Port, 3000).
-spec(tcp_connectivity(Host :: inet:socket_address() | inet:hostname(),
-spec tcp_connectivity(
Host :: inet:socket_address() | inet:hostname(),
Port :: inet:port_number(),
Timeout :: integer())
-> ok | {error, Reason :: term()}).
Timeout :: integer()
) ->
ok | {error, Reason :: term()}.
tcp_connectivity(Host, Port, Timeout) ->
case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of
{ok, Sock} -> gen_tcp:close(Sock), ok;
{error, Reason} -> {error, Reason}
{ok, Sock} ->
gen_tcp:close(Sock),
ok;
{error, Reason} ->
{error, Reason}
end.
str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
@ -179,7 +192,8 @@ str(List) when is_list(List) ->
true -> List;
false -> binary_to_list(emqx_json:encode(List))
end;
str(Data) -> error({invalid_str, Data}).
str(Data) ->
error({invalid_str, Data}).
utf8_bin(Str) when is_binary(Str); is_list(Str) ->
unicode:characters_to_binary(Str);
@ -200,36 +214,49 @@ bin(List) when is_list(List) ->
true -> list_to_binary(List);
false -> emqx_json:encode(List)
end;
bin(Data) -> error({invalid_bin, Data}).
bin(Data) ->
error({invalid_bin, Data}).
int(List) when is_list(List) ->
try list_to_integer(List)
catch error:badarg ->
try
list_to_integer(List)
catch
error:badarg ->
int(list_to_float(List))
end;
int(Bin) when is_binary(Bin) ->
try binary_to_integer(Bin)
catch error:badarg ->
try
binary_to_integer(Bin)
catch
error:badarg ->
int(binary_to_float(Bin))
end;
int(Int) when is_integer(Int) -> Int;
int(Float) when is_float(Float) -> erlang:floor(Float);
int(true) -> 1;
int(false) -> 0;
int(Data) -> error({invalid_number, Data}).
int(true) ->
1;
int(false) ->
0;
int(Data) ->
error({invalid_number, Data}).
float(List) when is_list(List) ->
try list_to_float(List)
catch error:badarg ->
try
list_to_float(List)
catch
error:badarg ->
float(list_to_integer(List))
end;
float(Bin) when is_binary(Bin) ->
try binary_to_float(Bin)
catch error:badarg ->
try
binary_to_float(Bin)
catch
error:badarg ->
float(binary_to_integer(Bin))
end;
float(Num) when is_number(Num) -> erlang:float(Num);
float(Data) -> error({invalid_number, Data}).
float(Data) ->
error({invalid_number, Data}).
map(Bin) when is_binary(Bin) ->
case emqx_json:decode(Bin, [return_maps]) of
@ -238,16 +265,23 @@ map(Bin) when is_binary(Bin) ->
end;
map(List) when is_list(List) -> maps:from_list(List);
map(Map) when is_map(Map) -> Map;
map(Data) -> error({invalid_map, Data}).
map(Data) ->
error({invalid_map, Data}).
bool(Bool) when Bool == true;
bool(Bool) when
Bool == true;
Bool == <<"true">>;
Bool == 1 -> true;
bool(Bool) when Bool == false;
Bool == 1
->
true;
bool(Bool) when
Bool == false;
Bool == <<"false">>;
Bool == 0 -> false;
bool(Bool) -> error({invalid_boolean, Bool}).
Bool == 0
->
false;
bool(Bool) ->
error({invalid_boolean, Bool}).
number_to_binary(Int) when is_integer(Int) ->
integer_to_binary(Int);
@ -263,9 +297,12 @@ now_ms() ->
erlang:system_time(millisecond).
can_topic_match_oneof(Topic, Filters) ->
lists:any(fun(Fltr) ->
lists:any(
fun(Fltr) ->
emqx_topic:match(Topic, Fltr)
end, Filters).
end,
Filters
).
cluster_call(Module, Func, Args) ->
{ok, _TnxId, Result} = emqx_cluster_rpc:multicall(Module, Func, Args),

View File

@ -18,19 +18,21 @@
-behaviour(emqx_bpapi).
-export([ introduced_in/0
-export([
introduced_in/0,
, get_metrics/3
]).
get_metrics/3
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.0.0".
-spec get_metrics( node()
, emqx_plugin_libs_metrics:handler_name()
, emqx_plugin_libs_metrics:metric_id()
) -> emqx_plugin_libs_metrics:metrics() | {badrpc, _}.
-spec get_metrics(
node(),
emqx_plugin_libs_metrics:handler_name(),
emqx_plugin_libs_metrics:metric_id()
) -> emqx_plugin_libs_metrics:metrics() | {badrpc, _}.
get_metrics(Node, HandlerName, MetricId) ->
rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [HandlerName, MetricId]).

View File

@ -23,54 +23,72 @@
all() -> emqx_common_test_helpers:all(?MODULE).
t_proc_tmpl(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
Tks = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
?assertEqual(<<"a:1,b:1,c:1.0,d:{\"d1\":\"hi\"}">>,
emqx_placeholder:proc_tmpl(Tks, Selected)).
?assertEqual(
<<"a:1,b:1,c:1.0,d:{\"d1\":\"hi\"}">>,
emqx_placeholder:proc_tmpl(Tks, Selected)
).
t_proc_tmpl_path(_) ->
Selected = #{d => #{d1 => <<"hi">>}},
Tks = emqx_placeholder:preproc_tmpl(<<"d.d1:${d.d1}">>),
?assertEqual(<<"d.d1:hi">>,
emqx_placeholder:proc_tmpl(Tks, Selected)).
?assertEqual(
<<"d.d1:hi">>,
emqx_placeholder:proc_tmpl(Tks, Selected)
).
t_proc_tmpl_custom_ph(_) ->
Selected = #{a => <<"a">>, b => <<"b">>},
Tks = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b}">>, #{placeholders => [<<"${a}">>]}),
?assertEqual(<<"a:a,b:${b}">>,
emqx_placeholder:proc_tmpl(Tks, Selected)).
?assertEqual(
<<"a:a,b:${b}">>,
emqx_placeholder:proc_tmpl(Tks, Selected)
).
t_proc_tmpl1(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
Tks = emqx_placeholder:preproc_tmpl(<<"a:$a,b:b},c:{c},d:${d">>),
?assertEqual(<<"a:$a,b:b},c:{c},d:${d">>,
emqx_placeholder:proc_tmpl(Tks, Selected)).
?assertEqual(
<<"a:$a,b:b},c:{c},d:${d">>,
emqx_placeholder:proc_tmpl(Tks, Selected)
).
t_proc_cmd(_) ->
Selected = #{v0 => <<"x">>, v1 => <<"1">>, v2 => #{d1 => <<"hi">>}},
Tks = emqx_placeholder:preproc_cmd(<<"hset name a:${v0} ${v1} b ${v2} ">>),
?assertEqual([<<"hset">>, <<"name">>,
<<"a:x">>, <<"1">>,
<<"b">>, <<"{\"d1\":\"hi\"}">>],
emqx_placeholder:proc_cmd(Tks, Selected)).
?assertEqual(
[
<<"hset">>,
<<"name">>,
<<"a:x">>,
<<"1">>,
<<"b">>,
<<"{\"d1\":\"hi\"}">>
],
emqx_placeholder:proc_cmd(Tks, Selected)
).
t_preproc_sql(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
{PrepareStatement, ParamsTokens} =
emqx_placeholder:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '?'),
?assertEqual(<<"a:?,b:?,c:?,d:?">>, PrepareStatement),
?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>],
emqx_placeholder:proc_sql(ParamsTokens, Selected)).
?assertEqual(
[<<"1">>, 1, 1.0, <<"{\"d1\":\"hi\"}">>],
emqx_placeholder:proc_sql(ParamsTokens, Selected)
).
t_preproc_sql1(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
{PrepareStatement, ParamsTokens} =
emqx_placeholder:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '$n'),
?assertEqual(<<"a:$1,b:$2,c:$3,d:$4">>, PrepareStatement),
?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>],
emqx_placeholder:proc_sql(ParamsTokens, Selected)).
?assertEqual(
[<<"1">>, 1, 1.0, <<"{\"d1\":\"hi\"}">>],
emqx_placeholder:proc_sql(ParamsTokens, Selected)
).
t_preproc_sql2(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
@ -82,49 +100,72 @@ t_preproc_sql2(_) ->
t_preproc_sql3(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
ParamsTokens = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
?assertEqual(<<"a:'1',b:1,c:1.0,d:'{\"d1\":\"hi\"}'">>,
emqx_placeholder:proc_sql_param_str(ParamsTokens, Selected)).
?assertEqual(
<<"a:'1',b:1,c:1.0,d:'{\"d1\":\"hi\"}'">>,
emqx_placeholder:proc_sql_param_str(ParamsTokens, Selected)
).
t_preproc_sql4(_) ->
%% with apostrophes
%% https://github.com/emqx/emqx/issues/4135
Selected = #{a => <<"1''2">>, b => 1, c => 1.0,
d => #{d1 => <<"someone's phone">>}},
Selected = #{
a => <<"1''2">>,
b => 1,
c => 1.0,
d => #{d1 => <<"someone's phone">>}
},
ParamsTokens = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
?assertEqual(<<"a:'1\\'\\'2',b:1,c:1.0,d:'{\"d1\":\"someone\\'s phone\"}'">>,
emqx_placeholder:proc_sql_param_str(ParamsTokens, Selected)).
?assertEqual(
<<"a:'1\\'\\'2',b:1,c:1.0,d:'{\"d1\":\"someone\\'s phone\"}'">>,
emqx_placeholder:proc_sql_param_str(ParamsTokens, Selected)
).
t_preproc_sql5(_) ->
%% with apostrophes for cassandra
%% https://github.com/emqx/emqx/issues/4148
Selected = #{a => <<"1''2">>, b => 1, c => 1.0,
d => #{d1 => <<"someone's phone">>}},
Selected = #{
a => <<"1''2">>,
b => 1,
c => 1.0,
d => #{d1 => <<"someone's phone">>}
},
ParamsTokens = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>,
emqx_placeholder:proc_cql_param_str(ParamsTokens, Selected)).
?assertEqual(
<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>,
emqx_placeholder:proc_cql_param_str(ParamsTokens, Selected)
).
t_preproc_sql6(_) ->
Selected = #{a => <<"a">>, b => <<"b">>},
{PrepareStatement, ParamsTokens} = emqx_placeholder:preproc_sql(
<<"a:${a},b:${b}">>,
#{replace_with => '$n',
placeholders => [<<"${a}">>]}),
#{
replace_with => '$n',
placeholders => [<<"${a}">>]
}
),
?assertEqual(<<"a:$1,b:${b}">>, PrepareStatement),
?assertEqual([<<"a">>],
emqx_placeholder:proc_sql(ParamsTokens, Selected)).
?assertEqual(
[<<"a">>],
emqx_placeholder:proc_sql(ParamsTokens, Selected)
).
t_preproc_tmpl_deep(_) ->
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
Tmpl0 = emqx_placeholder:preproc_tmpl_deep(
#{<<"${a}">> => [<<"${b}">>, "c", 2, 3.0, '${d}', {[<<"${c}">>], 0}]}),
#{<<"${a}">> => [<<"${b}">>, "c", 2, 3.0, '${d}', {[<<"${c}">>], 0}]}
),
?assertEqual(
#{<<"1">> => [<<"1">>, "c", 2, 3.0, '${d}', {[<<"1.0">>], 0}]},
emqx_placeholder:proc_tmpl_deep(Tmpl0, Selected)),
emqx_placeholder:proc_tmpl_deep(Tmpl0, Selected)
),
Tmpl1 = emqx_placeholder:preproc_tmpl_deep(
#{<<"${a}">> => [<<"${b}">>, "c", 2, 3.0, '${d}', {[<<"${c}">>], 0}]},
#{process_keys => false}),
#{process_keys => false}
),
?assertEqual(
#{<<"${a}">> => [<<"1">>, "c", 2, 3.0, '${d}', {[<<"1.0">>], 0}]},
emqx_placeholder:proc_tmpl_deep(Tmpl1, Selected)).
emqx_placeholder:proc_tmpl_deep(Tmpl1, Selected)
).

View File

@ -52,7 +52,8 @@ t_get_metrics(_) ->
Metrics = [a, b, c],
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics),
%% all the metrics are set to zero at start
?assertMatch(#{
?assertMatch(
#{
rate := #{
a := #{current := 0.0, max := 0.0, last5m := 0.0},
b := #{current := 0.0, max := 0.0, last5m := 0.0},
@ -63,13 +64,16 @@ t_get_metrics(_) ->
b := 0,
c := 0
}
}, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)),
},
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)
),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
ct:sleep(1500),
?LET(#{
?LET(
#{
rate := #{
a := #{current := CurrA, max := MaxA, last5m := _},
b := #{current := CurrB, max := MaxB, last5m := _},
@ -80,16 +84,25 @@ t_get_metrics(_) ->
b := 1,
c := 2
}
}, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>),
{?assert(CurrA > 0), ?assert(CurrB > 0), ?assert(CurrC > 0),
?assert(MaxA > 0), ?assert(MaxB > 0), ?assert(MaxC > 0)}),
},
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>),
{
?assert(CurrA > 0),
?assert(CurrB > 0),
?assert(CurrC > 0),
?assert(MaxA > 0),
?assert(MaxB > 0),
?assert(MaxC > 0)
}
),
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
t_reset_metrics(_) ->
Metrics = [a, b, c],
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics),
%% all the metrics are set to zero at start
?assertMatch(#{
?assertMatch(
#{
rate := #{
a := #{current := 0.0, max := 0.0, last5m := 0.0},
b := #{current := 0.0, max := 0.0, last5m := 0.0},
@ -100,14 +113,17 @@ t_reset_metrics(_) ->
b := 0,
c := 0
}
}, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)),
},
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)
),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
ct:sleep(1500),
ok = emqx_plugin_libs_metrics:reset_metrics(?NAME, <<"testid">>),
?LET(#{
?LET(
#{
rate := #{
a := #{current := CurrA, max := MaxA, last5m := _},
b := #{current := CurrB, max := MaxB, last5m := _},
@ -118,19 +134,32 @@ t_reset_metrics(_) ->
b := 0,
c := 0
}
}, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>),
{?assert(CurrA == 0), ?assert(CurrB == 0), ?assert(CurrC == 0),
?assert(MaxA == 0), ?assert(MaxB == 0), ?assert(MaxC == 0)}),
},
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>),
{
?assert(CurrA == 0),
?assert(CurrB == 0),
?assert(CurrC == 0),
?assert(MaxA == 0),
?assert(MaxB == 0),
?assert(MaxC == 0)
}
),
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
t_get_metrics_2(_) ->
Metrics = [a, b, c],
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics,
[a]),
ok = emqx_plugin_libs_metrics:create_metrics(
?NAME,
<<"testid">>,
Metrics,
[a]
),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
?assertMatch(#{
?assertMatch(
#{
rate := Rate = #{
a := #{current := _, max := _, last5m := _}
},
@ -139,13 +168,16 @@ t_get_metrics_2(_) ->
b := 1,
c := 1
}
} when map_size(Rate) =:= 1, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)),
} when map_size(Rate) =:= 1,
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)
),
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
t_recreate_metrics(_) ->
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, [a]),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a),
?assertMatch(#{
?assertMatch(
#{
rate := R = #{
a := #{current := _, max := _, last5m := _}
},
@ -153,12 +185,14 @@ t_recreate_metrics(_) ->
a := 1
}
} when map_size(R) == 1 andalso map_size(C) == 1,
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)),
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)
),
%% we create the metrics again, to add some counters
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, [a, b, c]),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
?assertMatch(#{
?assertMatch(
#{
rate := R = #{
a := #{current := _, max := _, last5m := _},
b := #{current := _, max := _, last5m := _},
@ -168,7 +202,8 @@ t_recreate_metrics(_) ->
a := 1, b := 1, c := 1
}
} when map_size(R) == 3 andalso map_size(C) == 3,
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)),
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)
),
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
t_inc_matched(_) ->
@ -192,15 +227,17 @@ t_rate(_) ->
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule:2">>, 'rules.matched'),
?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')),
ct:sleep(1000),
?LET(#{'rules.matched' := #{max := Max, current := Current}},
?LET(
#{'rules.matched' := #{max := Max, current := Current}},
emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
{?assert(Max =< 2),
?assert(Current =< 2)}),
{?assert(Max =< 2), ?assert(Current =< 2)}
),
ct:sleep(2100),
?LET(#{'rules.matched' := #{max := Max, current := Current, last5m := Last5Min}}, emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
{?assert(Max =< 2),
?assert(Current == 0),
?assert(Last5Min =< 0.67)}),
?LET(
#{'rules.matched' := #{max := Max, current := Current, last5m := Last5Min}},
emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
{?assert(Max =< 2), ?assert(Current == 0), ?assert(Last5Min =< 0.67)}
),
ct:sleep(3000),
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>),
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule:2">>).

View File

@ -28,10 +28,12 @@ all() -> emqx_common_test_helpers:all(?MODULE).
t_http_connectivity(_) ->
{ok, Socket} = gen_tcp:listen(?PORT, []),
ok = emqx_plugin_libs_rule:http_connectivity(
"http://127.0.0.1:"++emqx_plugin_libs_rule:str(?PORT), 1000),
"http://127.0.0.1:" ++ emqx_plugin_libs_rule:str(?PORT), 1000
),
gen_tcp:close(Socket),
{error, _} = emqx_plugin_libs_rule:http_connectivity(
"http://127.0.0.1:"++emqx_plugin_libs_rule:str(?PORT), 1000).
"http://127.0.0.1:" ++ emqx_plugin_libs_rule:str(?PORT), 1000
).
t_tcp_connectivity(_) ->
{ok, Socket} = gen_tcp:listen(?PORT, []),
@ -59,7 +61,8 @@ t_bin(_) ->
?assertError(_, emqx_plugin_libs_rule:bin({a, v})).
t_atom_key(_) ->
_ = erlang, _ = port,
_ = erlang,
_ = port,
?assertEqual([erlang], emqx_plugin_libs_rule:atom_key([<<"erlang">>])),
?assertEqual([erlang, port], emqx_plugin_libs_rule:atom_key([<<"erlang">>, port])),
?assertEqual([erlang, port], emqx_plugin_libs_rule:atom_key([<<"erlang">>, <<"port">>])),
@ -70,8 +73,12 @@ t_atom_key(_) ->
t_unsafe_atom_key(_) ->
?assertEqual([xyz876gv], emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv">>])),
?assertEqual([xyz876gv33, port],
emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv33">>, port])),
?assertEqual([xyz876gv331, port1221],
emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv331">>, <<"port1221">>])),
?assertEqual(
[xyz876gv33, port],
emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv33">>, port])
),
?assertEqual(
[xyz876gv331, port1221],
emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv331">>, <<"port1221">>])
),
?assertEqual(xyz876gv3312, emqx_plugin_libs_rule:unsafe_atom_key(<<"xyz876gv3312">>)).

View File

@ -1,11 +1,14 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [ {emqx, {path, "../emqx"}}
, {estatsd, {git, "https://github.com/emqx/estatsd", {tag, "0.1.0"}}}
]}.
{deps, [
{emqx, {path, "../emqx"}},
{estatsd, {git, "https://github.com/emqx/estatsd", {tag, "0.1.0"}}}
]}.
{shell, [
% {config, "config/sys.config"},
{apps, [emqx_statsd]}
]}.
{project_plugins, [erlfmt]}.

View File

@ -1,17 +1,17 @@
%% -*- mode: erlang -*-
{application, emqx_statsd,
[{description, "An OTP application"},
{application, emqx_statsd, [
{description, "An OTP application"},
{vsn, "5.0.0"},
{registered, []},
{mod, {emqx_statsd_app, []}},
{applications,
[kernel,
{applications, [
kernel,
stdlib,
estatsd,
emqx
]},
{env,[]},
{env, []},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.
]}.

View File

@ -27,27 +27,29 @@
-include_lib("emqx/include/logger.hrl").
-export([ update/1
, start/0
, stop/0
, restart/0
-export([
update/1,
start/0,
stop/0,
restart/0,
%% for rpc
, do_start/0
, do_stop/0
, do_restart/0
]).
do_start/0,
do_stop/0,
do_restart/0
]).
%% Interface
-export([start_link/1]).
%% Internal Exports
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, code_change/3
, terminate/2
]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3,
terminate/2
]).
-record(state, {
timer :: reference() | undefined,
@ -57,9 +59,13 @@
}).
update(Config) ->
case emqx_conf:update([statsd],
case
emqx_conf:update(
[statsd],
Config,
#{rawconf_with_defaults => true, override_to => cluster}) of
#{rawconf_with_defaults => true, override_to => cluster}
)
of
{ok, #{raw_config := NewConfigRows}} ->
ok = stop(),
case maps:get(<<"enable">>, Config, true) of
@ -95,17 +101,27 @@ init([Opts]) ->
process_flag(trap_exit, true),
Tags = tags(maps:get(tags, Opts, #{})),
{Host, Port} = maps:get(server, Opts, {?DEFAULT_HOST, ?DEFAULT_PORT}),
Opts1 = maps:without([sample_time_interval,
flush_time_interval], Opts#{tags => Tags,
Opts1 = maps:without(
[
sample_time_interval,
flush_time_interval
],
Opts#{
tags => Tags,
host => Host,
port => Port,
prefix => <<"emqx">>}),
prefix => <<"emqx">>
}
),
{ok, Pid} = estatsd:start_link(maps:to_list(Opts1)),
SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL),
FlushTimeInterval = maps:get(flush_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL),
{ok, ensure_timer(#state{sample_time_interval = SampleTimeInterval,
{ok,
ensure_timer(#state{
sample_time_interval = SampleTimeInterval,
flush_time_interval = FlushTimeInterval,
estatsd_pid = Pid})}.
estatsd_pid = Pid
})}.
handle_call(_Req, _From, State) ->
{noreply, State}.
@ -113,20 +129,25 @@ handle_call(_Req, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({timeout, Ref, sample_timeout},
State = #state{sample_time_interval = SampleTimeInterval,
handle_info(
{timeout, Ref, sample_timeout},
State = #state{
sample_time_interval = SampleTimeInterval,
flush_time_interval = FlushTimeInterval,
estatsd_pid = Pid,
timer = Ref}) ->
timer = Ref
}
) ->
Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_vm_data(),
SampleRate = SampleTimeInterval / FlushTimeInterval,
StatsdMetrics = [{gauge, trans_metrics_name(Name), Value, SampleRate, []} || {Name, Value} <- Metrics],
StatsdMetrics = [
{gauge, trans_metrics_name(Name), Value, SampleRate, []}
|| {Name, Value} <- Metrics
],
estatsd:submit(Pid, StatsdMetrics),
{noreply, ensure_timer(State)};
handle_info({'EXIT', Pid, Error}, State = #state{estatsd_pid = Pid}) ->
{stop, {shutdown, Error}, State};
handle_info(_Msg, State) ->
{noreply, State}.
@ -145,25 +166,36 @@ trans_metrics_name(Name) ->
binary_to_atom(<<"emqx.", Name0/binary>>, utf8).
emqx_vm_data() ->
Idle = case cpu_sup:util([detailed]) of
{_, 0, 0, _} -> 0; %% Not support for Windows
Idle =
case cpu_sup:util([detailed]) of
%% Not support for Windows
{_, 0, 0, _} -> 0;
{_Num, _Use, IdleList, _} -> proplists:get_value(idle, IdleList, 0)
end,
RunQueue = erlang:statistics(run_queue),
[{run_queue, RunQueue},
[
{run_queue, RunQueue},
{cpu_idle, Idle},
{cpu_use, 100 - Idle}] ++ emqx_vm:mem_info().
{cpu_use, 100 - Idle}
] ++ emqx_vm:mem_info().
tags(Map) ->
Tags = maps:to_list(Map),
[{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags].
ensure_timer(State =#state{sample_time_interval = SampleTimeInterval}) ->
ensure_timer(State = #state{sample_time_interval = SampleTimeInterval}) ->
State#state{timer = emqx_misc:start_timer(SampleTimeInterval, sample_timeout)}.
check_multicall_result({Results, []}) ->
case lists:all(fun(ok) -> true; (_) -> false end, Results) of
case
lists:all(
fun
(ok) -> true;
(_) -> false
end,
Results
)
of
true -> ok;
false -> error({bad_result, Results})
end;

View File

@ -26,17 +26,17 @@
-export([statsd/2]).
-export([ api_spec/0
, paths/0
, schema/1
]).
-export([
api_spec/0,
paths/0,
schema/1
]).
-define(API_TAG_STATSD, [<<"statsd">>]).
-define(SCHEMA_MODULE, emqx_statsd_schema).
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
@ -44,18 +44,21 @@ paths() ->
["/statsd"].
schema("/statsd") ->
#{ 'operationId' => statsd
, get =>
#{ description => <<"Get statsd config">>
, tags => ?API_TAG_STATSD
, responses =>
#{
'operationId' => statsd,
get =>
#{
description => <<"Get statsd config">>,
tags => ?API_TAG_STATSD,
responses =>
#{200 => statsd_config_schema()}
}
, put =>
#{ description => <<"Set statsd config">>
, tags => ?API_TAG_STATSD
, 'requestBody' => statsd_config_schema()
, responses =>
},
put =>
#{
description => <<"Set statsd config">>,
tags => ?API_TAG_STATSD,
'requestBody' => statsd_config_schema(),
responses =>
#{200 => statsd_config_schema()}
}
}.
@ -67,18 +70,19 @@ schema("/statsd") ->
statsd_config_schema() ->
emqx_dashboard_swagger:schema_with_example(
ref(?SCHEMA_MODULE, "statsd"),
statsd_example()).
statsd_example()
).
statsd_example() ->
#{ enable => true
, flush_time_interval => "32s"
, sample_time_interval => "32s"
, server => "127.0.0.1:8125"
#{
enable => true,
flush_time_interval => "32s",
sample_time_interval => "32s",
server => "127.0.0.1:8125"
}.
statsd(get, _Params) ->
{200, emqx:get_raw_config([<<"statsd">>], #{})};
statsd(put, #{body := Body}) ->
case emqx_statsd:update(Body) of
{ok, NewConfig} ->

View File

@ -1,28 +1,29 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_statsd_app).
-module(emqx_statsd_app).
-behaviour(application).
-behaviour(application).
-include("emqx_statsd.hrl").
-export([ start/2
, stop/1
]).
-export([
start/2,
stop/1
]).
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_statsd_sup:start_link(),

View File

@ -23,10 +23,12 @@
-export([to_ip_port/1]).
-export([ namespace/0
, roots/0
, fields/1
, desc/1]).
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
-typerefl_from_string({ip_port/0, emqx_statsd_schema, to_ip_port}).
@ -35,19 +37,23 @@ namespace() -> "statsd".
roots() -> ["statsd"].
fields("statsd") ->
[ {enable, hoconsc:mk(boolean(),
#{ default => false
, required => true
, desc => ?DESC(enable)
})}
, {server, fun server/1}
, {sample_time_interval, fun sample_interval/1}
, {flush_time_interval, fun flush_interval/1}
[
{enable,
hoconsc:mk(
boolean(),
#{
default => false,
required => true,
desc => ?DESC(enable)
}
)},
{server, fun server/1},
{sample_time_interval, fun sample_interval/1},
{flush_time_interval, fun flush_interval/1}
].
desc("statsd") -> ?DESC(statsd);
desc(_) ->
undefined.
desc(_) -> undefined.
server(type) -> emqx_schema:ip_port();
server(required) -> true;
@ -74,5 +80,6 @@ to_ip_port(Str) ->
{ok, R} -> {ok, {R, list_to_integer(Port)}};
_ -> {error, Str}
end;
_ -> {error, Str}
_ ->
{error, Str}
end.

View File

@ -7,21 +7,24 @@
-behaviour(supervisor).
-export([ start_link/0
, ensure_child_started/1
, ensure_child_started/2
, ensure_child_stopped/1
]).
-export([
start_link/0,
ensure_child_started/1,
ensure_child_started/2,
ensure_child_stopped/1
]).
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(Mod, Opts), #{id => Mod,
-define(CHILD(Mod, Opts), #{
id => Mod,
start => {Mod, start_link, [Opts]},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [Mod]}).
modules => [Mod]
}).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

View File

@ -18,12 +18,13 @@
-behaviour(emqx_bpapi).
-export([ introduced_in/0
-export([
introduced_in/0,
, start/1
, stop/1
, restart/1
]).
start/1,
stop/1,
restart/1
]).
-include_lib("emqx/include/bpapi.hrl").

View File

@ -21,8 +21,7 @@ t_statsd(_) ->
receive
{udp, _Socket, _Host, _Port, Bin} ->
?assert(length(Bin) > 50)
after
11*1000 ->
after 11 * 1000 ->
?assert(true, failed)
end,
gen_udp:close(Socket).

View File

@ -15,6 +15,7 @@ APPS+=( 'apps/emqx_exhook')
APPS+=( 'apps/emqx_retainer' 'apps/emqx_slow_subs')
APPS+=( 'apps/emqx_management')
APPS+=( 'apps/emqx_psk')
APPS+=( 'apps/emqx_plugin_libs' 'apps/emqx_machine' 'apps/emqx_statsd' )
for app in "${APPS[@]}"; do
echo "$app ..."