From b4451823350ec46126c49ca915b4b169dd4cf49e Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sat, 23 Apr 2022 09:55:50 +0200 Subject: [PATCH] style: reformat emqx_machine, emqx_plugin_libs, and emqx_statsd --- apps/emqx_machine/rebar.config | 5 +- apps/emqx_machine/src/emqx_global_gc.erl | 28 +- apps/emqx_machine/src/emqx_machine.app.src | 30 +- apps/emqx_machine/src/emqx_machine.erl | 48 +-- apps/emqx_machine/src/emqx_machine_app.erl | 7 +- apps/emqx_machine/src/emqx_machine_boot.erl | 38 ++- .../src/emqx_machine_signal_handler.erl | 22 +- apps/emqx_machine/src/emqx_machine_sup.erl | 27 +- .../src/emqx_machine_terminator.erl | 42 +-- .../src/emqx_restricted_shell.erl | 31 +- apps/emqx_machine/test/emqx_machine_SUITE.erl | 35 +-- apps/emqx_machine/test/emqx_machine_tests.erl | 33 +- .../test/emqx_restricted_shell_SUITE.erl | 46 ++- apps/emqx_plugin_libs/rebar.config | 5 +- .../emqx_plugin_libs/src/emqx_placeholder.erl | 184 +++++------ .../src/emqx_plugin_libs.app.src | 14 +- .../src/emqx_plugin_libs_metrics.erl | 286 ++++++++++-------- .../src/emqx_plugin_libs_pool.erl | 38 ++- .../src/emqx_plugin_libs_rule.erl | 231 ++++++++------ .../src/proto/emqx_plugin_libs_proto_v1.erl | 16 +- .../test/emqx_placeholder_SUITE.erl | 119 +++++--- .../test/emqx_plugin_libs_metrics_SUITE.erl | 177 ++++++----- .../test/emqx_plugin_libs_rule_SUITE.erl | 21 +- apps/emqx_statsd/rebar.config | 11 +- apps/emqx_statsd/src/emqx_statsd.app.src | 32 +- apps/emqx_statsd/src/emqx_statsd.erl | 134 ++++---- apps/emqx_statsd/src/emqx_statsd_api.erl | 58 ++-- apps/emqx_statsd/src/emqx_statsd_app.erl | 39 +-- apps/emqx_statsd/src/emqx_statsd_schema.erl | 51 ++-- apps/emqx_statsd/src/emqx_statsd_sup.erl | 25 +- .../src/proto/emqx_statsd_proto_v1.erl | 11 +- apps/emqx_statsd/test/emqx_statsd_SUITE.erl | 5 +- scripts/check-format.sh | 1 + 33 files changed, 1080 insertions(+), 770 deletions(-) diff --git a/apps/emqx_machine/rebar.config b/apps/emqx_machine/rebar.config index 528efecb6..9f17b7657 100644 --- a/apps/emqx_machine/rebar.config +++ b/apps/emqx_machine/rebar.config @@ -1,4 +1,5 @@ %% -*- mode: erlang -*- -{deps, [ {emqx, {path, "../emqx"}} - ]}. +{deps, [{emqx, {path, "../emqx"}}]}. + +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_machine/src/emqx_global_gc.erl b/apps/emqx_machine/src/emqx_global_gc.erl index af92fcaef..74825c321 100644 --- a/apps/emqx_machine/src/emqx_global_gc.erl +++ b/apps/emqx_machine/src/emqx_global_gc.erl @@ -23,13 +23,14 @@ -export([run/0]). %% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). %% 5 minutes %% -define(DEFAULT_INTERVAL, 300000). @@ -38,14 +39,14 @@ %% APIs %%-------------------------------------------------------------------- --spec(start_link() -> {ok, pid()} | ignore | {error, term()}). +-spec start_link() -> {ok, pid()} | ignore | {error, term()}. start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). --spec(run() -> {ok, timer:time()}). +-spec run() -> {ok, timer:time()}. run() -> gen_server:call(?MODULE, run, infinity). --spec(stop() -> ok). +-spec stop() -> ok. stop() -> gen_server:stop(?MODULE). %%-------------------------------------------------------------------- @@ -58,7 +59,6 @@ init([]) -> handle_call(run, _From, State) -> {Time, ok} = timer:tc(fun run_gc/0), {reply, {ok, Time div 1000}, State, hibernate}; - handle_call(_Req, _From, State) -> {reply, ignored, State}. @@ -68,7 +68,6 @@ handle_cast(_Msg, State) -> handle_info({timeout, TRef, run}, State = #{timer := TRef}) -> ok = run_gc(), {noreply, ensure_timer(State), hibernate}; - handle_info(_Info, State) -> {noreply, State}. @@ -84,8 +83,9 @@ code_change(_OldVsn, State, _Extra) -> ensure_timer(State) -> case application:get_env(emqx_machine, global_gc_interval) of - undefined -> State; - {ok, Interval} -> + undefined -> + State; + {ok, Interval} -> TRef = emqx_misc:start_timer(Interval, run), State#{timer := TRef} end. diff --git a/apps/emqx_machine/src/emqx_machine.app.src b/apps/emqx_machine/src/emqx_machine.app.src index 15ce275ec..59a5ad4b8 100644 --- a/apps/emqx_machine/src/emqx_machine.app.src +++ b/apps/emqx_machine/src/emqx_machine.app.src @@ -1,16 +1,18 @@ %% -*- mode: erlang -*- -{application, emqx_machine, - [{id, "emqx_machine"}, - {description, "The EMQX Machine"}, - {vsn, "0.1.0"}, % strict semver, bump manually! - {modules, []}, - {registered, []}, - {applications, [kernel,stdlib]}, - {mod, {emqx_machine_app,[]}}, - {env, []}, - {licenses, ["Apache-2.0"]}, - {maintainers, ["EMQX Team "]}, - {links, [{"Homepage", "https://emqx.io/"}, - {"Github", "https://github.com/emqx/emqx"} - ]} +{application, emqx_machine, [ + {id, "emqx_machine"}, + {description, "The EMQX Machine"}, + % strict semver, bump manually! + {vsn, "0.1.0"}, + {modules, []}, + {registered, []}, + {applications, [kernel, stdlib]}, + {mod, {emqx_machine_app, []}}, + {env, []}, + {licenses, ["Apache-2.0"]}, + {maintainers, ["EMQX Team "]}, + {links, [ + {"Homepage", "https://emqx.io/"}, + {"Github", "https://github.com/emqx/emqx"} + ]} ]}. diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 0285e5141..2a4595bf9 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -16,23 +16,26 @@ -module(emqx_machine). --export([ start/0 - , graceful_shutdown/0 - , is_ready/0 +-export([ + start/0, + graceful_shutdown/0, + is_ready/0, - , node_status/0 - , update_vips/0 - ]). + node_status/0, + update_vips/0 +]). -include_lib("emqx/include/logger.hrl"). %% @doc EMQX boot entrypoint. start() -> case os:type() of - {win32, nt} -> ok; + {win32, nt} -> + ok; _Nix -> os:set_signal(sighup, ignore), - os:set_signal(sigterm, handle) %% default is handle + %% default is handle + os:set_signal(sigterm, handle) end, ok = set_backtrace_depth(), start_sysmon(), @@ -56,16 +59,19 @@ is_ready() -> print_otp_version_warning() -> ok. -else. print_otp_version_warning() -> - ?ULOG("WARNING: Running on Erlang/OTP version ~p. Recommended: 23~n", - [?OTP_RELEASE]). --endif. % OTP_RELEASE > 22 + ?ULOG( + "WARNING: Running on Erlang/OTP version ~p. Recommended: 23~n", + [?OTP_RELEASE] + ). +% OTP_RELEASE > 22 +-endif. start_sysmon() -> _ = application:load(system_monitor), application:set_env(system_monitor, node_status_fun, {?MODULE, node_status}), application:set_env(system_monitor, status_checks, [{?MODULE, update_vips, false, 10}]), case application:get_env(system_monitor, db_hostname) of - {ok, [_ | _]} -> + {ok, [_ | _]} -> application:set_env(system_monitor, callback_mod, system_monitor_pg), _ = application:ensure_all_started(system_monitor, temporary), ok; @@ -76,9 +82,10 @@ start_sysmon() -> end. node_status() -> - emqx_json:encode(#{ backend => mria_rlog:backend() - , role => mria_rlog:role() - }). + emqx_json:encode(#{ + backend => mria_rlog:backend(), + role => mria_rlog:role() + }). update_vips() -> system_monitor:add_vip(mria_status:shards_up()). @@ -86,8 +93,9 @@ update_vips() -> configure_shard_transports() -> ShardTransports = application:get_env(emqx_machine, custom_shard_transports, #{}), maps:foreach( - fun(ShardBin, Transport) -> - ShardName = binary_to_existing_atom(ShardBin), - mria_config:set_shard_transport(ShardName, Transport) - end, - ShardTransports). + fun(ShardBin, Transport) -> + ShardName = binary_to_existing_atom(ShardBin), + mria_config:set_shard_transport(ShardName, Transport) + end, + ShardTransports + ). diff --git a/apps/emqx_machine/src/emqx_machine_app.erl b/apps/emqx_machine/src/emqx_machine_app.erl index ae837db2c..bf3637567 100644 --- a/apps/emqx_machine/src/emqx_machine_app.erl +++ b/apps/emqx_machine/src/emqx_machine_app.erl @@ -16,9 +16,10 @@ -module(emqx_machine_app). --export([ start/2 - , stop/1 - ]). +-export([ + start/2, + stop/1 +]). -behaviour(application). diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl index 20d0aa4b6..638b1488f 100644 --- a/apps/emqx_machine/src/emqx_machine_boot.erl +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -37,16 +37,18 @@ post_boot() -> -ifdef(TEST). print_vsn() -> ok. --else. % TEST +% TEST +-else. print_vsn() -> ?ULOG("~ts ~ts is running now!~n", [emqx_app:get_description(), emqx_app:get_release()]). --endif. % TEST - +% TEST +-endif. start_autocluster() -> ekka:callback(stop, fun emqx_machine_boot:stop_apps/0), ekka:callback(start, fun emqx_machine_boot:ensure_apps_started/0), - _ = ekka:autocluster(emqx), %% returns 'ok' or a pid or 'any()' as in spec + %% returns 'ok' or a pid or 'any()' as in spec + _ = ekka:autocluster(emqx), ok. stop_apps() -> @@ -59,11 +61,13 @@ stop_one_app(App) -> try _ = application:stop(App) catch - C : E -> - ?SLOG(error, #{msg => "failed_to_stop_app", + C:E -> + ?SLOG(error, #{ + msg => "failed_to_stop_app", app => App, exception => C, - reason => E}) + reason => E + }) end. ensure_apps_started() -> @@ -121,16 +125,21 @@ sorted_reboot_apps(Apps) -> %% Isolated apps without which are not dependency of any other apps are %% put to the end of the list in the original order. add_apps_to_digraph(G, Apps) -> - lists:foldl(fun + lists:foldl( + fun ({App, undefined}, Acc) -> ?SLOG(debug, #{msg => "app_is_not_loaded", app => App}), Acc; ({App, []}, Acc) -> - Acc ++ [App]; %% use '++' to keep the original order + %% use '++' to keep the original order + Acc ++ [App]; ({App, Deps}, Acc) -> add_app_deps_to_digraph(G, App, Deps), Acc - end, [], Apps). + end, + [], + Apps + ). add_app_deps_to_digraph(G, App, undefined) -> ?SLOG(debug, #{msg => "app_is_not_loaded", app => App}), @@ -141,14 +150,17 @@ add_app_deps_to_digraph(_G, _App, []) -> add_app_deps_to_digraph(G, App, [Dep | Deps]) -> digraph:add_vertex(G, App), digraph:add_vertex(G, Dep), - digraph:add_edge(G, Dep, App), %% dep -> app as dependency + %% dep -> app as dependency + digraph:add_edge(G, Dep, App), add_app_deps_to_digraph(G, App, Deps). find_loops(G) -> lists:filtermap( - fun (App) -> + fun(App) -> case digraph:get_short_cycle(G, App) of false -> false; Apps -> {true, Apps} end - end, digraph:vertices(G)). + end, + digraph:vertices(G) + ). diff --git a/apps/emqx_machine/src/emqx_machine_signal_handler.erl b/apps/emqx_machine/src/emqx_machine_signal_handler.erl index e86724011..cdc21ab90 100644 --- a/apps/emqx_machine/src/emqx_machine_signal_handler.erl +++ b/apps/emqx_machine/src/emqx_machine_signal_handler.erl @@ -20,17 +20,25 @@ %% perform graceful shutdown. -module(emqx_machine_signal_handler). --export([start/0, init/1, format_status/2, - handle_event/2, handle_call/2, handle_info/2, - terminate/2, code_change/3]). +-export([ + start/0, + init/1, + format_status/2, + handle_event/2, + handle_call/2, + handle_info/2, + terminate/2, + code_change/3 +]). -include_lib("emqx/include/logger.hrl"). start() -> ok = gen_event:swap_sup_handler( - erl_signal_server, - {erl_signal_handler, []}, - {?MODULE, []}). + erl_signal_server, + {erl_signal_handler, []}, + {?MODULE, []} + ). init({[], _}) -> {ok, #{}}. @@ -53,7 +61,7 @@ handle_info(_Other, State) -> handle_call(_Request, State) -> {ok, ok, State}. -format_status(_Opt, [_Pdict,_S]) -> +format_status(_Opt, [_Pdict, _S]) -> ok. code_change(_OldVsn, State, _Extra) -> diff --git a/apps/emqx_machine/src/emqx_machine_sup.erl b/apps/emqx_machine/src/emqx_machine_sup.erl index f667ec6f7..14a7b4387 100644 --- a/apps/emqx_machine/src/emqx_machine_sup.erl +++ b/apps/emqx_machine/src/emqx_machine_sup.erl @@ -20,8 +20,7 @@ -behaviour(supervisor). --export([ start_link/0 - ]). +-export([start_link/0]). -export([init/1]). @@ -33,20 +32,22 @@ init([]) -> BootApps = child_worker(emqx_machine_boot, post_boot, [], temporary), GlobalGC = child_worker(emqx_global_gc, [], permanent), Children = [Terminator, BootApps, GlobalGC], - SupFlags = #{strategy => one_for_one, - intensity => 100, - period => 10 - }, + SupFlags = #{ + strategy => one_for_one, + intensity => 100, + period => 10 + }, {ok, {SupFlags, Children}}. child_worker(M, Args, Restart) -> child_worker(M, start_link, Args, Restart). child_worker(M, Func, Args, Restart) -> - #{id => M, - start => {M, Func, Args}, - restart => Restart, - shutdown => 5000, - type => worker, - modules => [M] - }. + #{ + id => M, + start => {M, Func, Args}, + restart => Restart, + shutdown => 5000, + type => worker, + modules => [M] + }. diff --git a/apps/emqx_machine/src/emqx_machine_terminator.erl b/apps/emqx_machine/src/emqx_machine_terminator.erl index 3479103c5..23fcb7df2 100644 --- a/apps/emqx_machine/src/emqx_machine_terminator.erl +++ b/apps/emqx_machine/src/emqx_machine_terminator.erl @@ -18,15 +18,22 @@ -behaviour(gen_server). --export([ start_link/0 - , graceful/0 - , graceful_wait/0 - , is_running/0 - ]). +-export([ + start_link/0, + graceful/0, + graceful_wait/0, + is_running/0 +]). --export([init/1, format_status/2, - handle_cast/2, handle_call/3, handle_info/2, - terminate/2, code_change/3]). +-export([ + init/1, + format_status/2, + handle_cast/2, + handle_call/3, + handle_info/2, + terminate/2, + code_change/3 +]). -include_lib("emqx/include/logger.hrl"). @@ -47,7 +54,7 @@ graceful() -> try _ = gen_server:call(?TERMINATOR, ?DO_IT, infinity) catch - _ : _ -> + _:_ -> %% failed to notify terminator, probably due to not started yet %% or node is going down, either case, the caller %% should issue a shutdown to be sure @@ -82,14 +89,15 @@ handle_call(?DO_IT, _From, State) -> try emqx_machine_boot:stop_apps() catch - C : E : St -> + C:E:St -> Apps = [element(1, A) || A <- application:which_applications()], - ?SLOG(error, #{msg => "failed_to_stop_apps", - exception => C, - reason => E, - stacktrace => St, - remaining_apps => Apps - }) + ?SLOG(error, #{ + msg => "failed_to_stop_apps", + exception => C, + reason => E, + stacktrace => St, + remaining_apps => Apps + }) after init:stop() end, @@ -97,7 +105,7 @@ handle_call(?DO_IT, _From, State) -> handle_call(_Call, _From, State) -> {noreply, State}. -format_status(_Opt, [_Pdict,_S]) -> +format_status(_Opt, [_Pdict, _S]) -> ok. code_change(_OldVsn, State, _Extra) -> diff --git a/apps/emqx_machine/src/emqx_restricted_shell.erl b/apps/emqx_machine/src/emqx_restricted_shell.erl index d855b7aff..977821247 100644 --- a/apps/emqx_machine/src/emqx_restricted_shell.erl +++ b/apps/emqx_machine/src/emqx_restricted_shell.erl @@ -46,7 +46,7 @@ prompt_func(PropList) -> Line = proplists:get_value(history, PropList, 1), Version = emqx_release:version(), case is_alive() of - true -> io_lib:format(<<"~ts(~s)~w> ">>, [Version, node(), Line]); + true -> io_lib:format(<<"~ts(~s)~w> ">>, [Version, node(), Line]); false -> io_lib:format(<<"~ts ~w> ">>, [Version, Line]) end. @@ -77,27 +77,35 @@ limit_warning(MF, Args) -> max_args_warning(MF, Args) -> ArgsSize = erts_debug:flat_size(Args), case ArgsSize < ?MAX_ARGS_SIZE of - true -> ok; + true -> + ok; false -> warning("[WARNING] current_args_size:~w, max_args_size:~w", [ArgsSize, ?MAX_ARGS_SIZE]), - ?SLOG(warning, #{msg => "execute_function_in_shell_max_args_size", + ?SLOG(warning, #{ + msg => "execute_function_in_shell_max_args_size", function => MF, args => Args, args_size => ArgsSize, - max_heap_size => ?MAX_ARGS_SIZE}) + max_heap_size => ?MAX_ARGS_SIZE + }) end. max_heap_size_warning(MF, Args) -> {heap_size, HeapSize} = erlang:process_info(self(), heap_size), case HeapSize < ?MAX_HEAP_SIZE of - true -> ok; + true -> + ok; false -> - warning("[WARNING] current_heap_size:~w, max_heap_size_warning:~w", [HeapSize, ?MAX_HEAP_SIZE]), - ?SLOG(warning, #{msg => "shell_process_exceed_max_heap_size", + warning("[WARNING] current_heap_size:~w, max_heap_size_warning:~w", [ + HeapSize, ?MAX_HEAP_SIZE + ]), + ?SLOG(warning, #{ + msg => "shell_process_exceed_max_heap_size", current_heap_size => HeapSize, function => MF, args => Args, - max_heap_size => ?MAX_HEAP_SIZE}) + max_heap_size => ?MAX_HEAP_SIZE + }) end. log(prohibited, MF, Args) -> @@ -105,8 +113,11 @@ log(prohibited, MF, Args) -> ?SLOG(error, #{msg => "execute_function_in_shell_prohibited", function => MF, args => Args}); log(exempted, MF, Args) -> limit_warning(MF, Args), - ?SLOG(error, #{msg => "execute_dangerous_function_in_shell_exempted", function => MF, args => Args}); -log(ignore, MF, Args) -> limit_warning(MF, Args). + ?SLOG(error, #{ + msg => "execute_dangerous_function_in_shell_exempted", function => MF, args => Args + }); +log(ignore, MF, Args) -> + limit_warning(MF, Args). warning(Format, Args) -> io:format(?RED_BG ++ Format ++ ?RESET ++ "~n", Args). diff --git a/apps/emqx_machine/test/emqx_machine_SUITE.erl b/apps/emqx_machine/test/emqx_machine_SUITE.erl index 01b2d59fc..f865b0f26 100644 --- a/apps/emqx_machine/test/emqx_machine_SUITE.erl +++ b/apps/emqx_machine/test/emqx_machine_SUITE.erl @@ -43,23 +43,24 @@ init_per_suite(Config) -> %% application:unload(emqx_authz), emqx_common_test_helpers:start_apps([emqx_conf]), - application:set_env(emqx_machine, applications, [ emqx_prometheus - , emqx_modules - , emqx_dashboard - , emqx_connector - , emqx_gateway - , emqx_statsd - , emqx_resource - , emqx_rule_engine - , emqx_bridge - , emqx_plugin_libs - , emqx_management - , emqx_retainer - , emqx_exhook - , emqx_authn - , emqx_authz - , emqx_plugin - ]), + application:set_env(emqx_machine, applications, [ + emqx_prometheus, + emqx_modules, + emqx_dashboard, + emqx_connector, + emqx_gateway, + emqx_statsd, + emqx_resource, + emqx_rule_engine, + emqx_bridge, + emqx_plugin_libs, + emqx_management, + emqx_retainer, + emqx_exhook, + emqx_authn, + emqx_authz, + emqx_plugin + ]), Config. end_per_suite(_Config) -> diff --git a/apps/emqx_machine/test/emqx_machine_tests.erl b/apps/emqx_machine/test/emqx_machine_tests.erl index f628e1840..c8d5fb7e8 100644 --- a/apps/emqx_machine/test/emqx_machine_tests.erl +++ b/apps/emqx_machine/test/emqx_machine_tests.erl @@ -19,22 +19,26 @@ -include_lib("eunit/include/eunit.hrl"). sorted_reboot_apps_test_() -> - Apps1 = [{1, [2, 3, 4]}, - {2, [3, 4]} - ], - Apps2 = [{1, [2, 3, 4]}, - {2, [3, 4]}, - {5, [4, 3, 2, 1, 1]} - ], - [fun() -> check_order(Apps1) end, - fun() -> check_order(Apps2) end + Apps1 = [ + {1, [2, 3, 4]}, + {2, [3, 4]} + ], + Apps2 = [ + {1, [2, 3, 4]}, + {2, [3, 4]}, + {5, [4, 3, 2, 1, 1]} + ], + [ + fun() -> check_order(Apps1) end, + fun() -> check_order(Apps2) end ]. sorted_reboot_apps_cycle_test() -> - Apps = [{1,[2]},{2, [1,3]}], - ?assertError({circular_application_dependency, [[1, 2, 1], [2, 1, 2]]}, - check_order(Apps)). - + Apps = [{1, [2]}, {2, [1, 3]}], + ?assertError( + {circular_application_dependency, [[1, 2, 1], [2, 1, 2]]}, + check_order(Apps) + ). check_order(Apps) -> AllApps = lists:usort(lists:append([[A | Deps] || {A, Deps} <- Apps])), @@ -47,7 +51,8 @@ check_order(Apps) -> lists:foldr(fun(A, {I, Acc}) -> {I + 1, [{A, I} | Acc]} end, {1, []}, Sorted), do_check_order(Apps, SortedWithIndex). -do_check_order([], _) -> ok; +do_check_order([], _) -> + ok; do_check_order([{A, Deps} | Rest], Sorted) -> case lists:filter(fun(Dep) -> is_sorted_before(Dep, A, Sorted) end, Deps) of [] -> do_check_order(Rest, Sorted); diff --git a/apps/emqx_machine/test/emqx_restricted_shell_SUITE.erl b/apps/emqx_machine/test/emqx_restricted_shell_SUITE.erl index 4ae0e66a2..8ab5637ee 100644 --- a/apps/emqx_machine/test/emqx_restricted_shell_SUITE.erl +++ b/apps/emqx_machine/test/emqx_restricted_shell_SUITE.erl @@ -35,25 +35,39 @@ end_per_suite(_Config) -> t_local_allowed(_Config) -> LocalProhibited = [halt, q], State = undefined, - lists:foreach(fun(LocalFunc) -> - ?assertEqual({false, State}, emqx_restricted_shell:local_allowed(LocalFunc, [], State)) - end, LocalProhibited), + lists:foreach( + fun(LocalFunc) -> + ?assertEqual({false, State}, emqx_restricted_shell:local_allowed(LocalFunc, [], State)) + end, + LocalProhibited + ), LocalAllowed = [ls, pwd], - lists:foreach(fun(LocalFunc) -> - ?assertEqual({true, State},emqx_restricted_shell:local_allowed(LocalFunc, [], State)) - end, LocalAllowed), + lists:foreach( + fun(LocalFunc) -> + ?assertEqual({true, State}, emqx_restricted_shell:local_allowed(LocalFunc, [], State)) + end, + LocalAllowed + ), ok. t_non_local_allowed(_Config) -> RemoteProhibited = [{erlang, halt}, {c, q}, {init, stop}, {init, restart}, {init, reboot}], State = undefined, - lists:foreach(fun(RemoteFunc) -> - ?assertEqual({false, State}, emqx_restricted_shell:non_local_allowed(RemoteFunc, [], State)) - end, RemoteProhibited), + lists:foreach( + fun(RemoteFunc) -> + ?assertEqual( + {false, State}, emqx_restricted_shell:non_local_allowed(RemoteFunc, [], State) + ) + end, + RemoteProhibited + ), RemoteAllowed = [{erlang, date}, {erlang, system_time}], - lists:foreach(fun(RemoteFunc) -> - ?assertEqual({true, State}, emqx_restricted_shell:local_allowed(RemoteFunc, [], State)) - end, RemoteAllowed), + lists:foreach( + fun(RemoteFunc) -> + ?assertEqual({true, State}, emqx_restricted_shell:local_allowed(RemoteFunc, [], State)) + end, + RemoteAllowed + ), ok. t_lock(_Config) -> @@ -62,11 +76,15 @@ t_lock(_Config) -> ?assertEqual({false, State}, emqx_restricted_shell:local_allowed(q, [], State)), ?assertEqual({true, State}, emqx_restricted_shell:local_allowed(ls, [], State)), ?assertEqual({false, State}, emqx_restricted_shell:non_local_allowed({init, stop}, [], State)), - ?assertEqual({true, State}, emqx_restricted_shell:non_local_allowed({inet, getifaddrs}, [], State)), + ?assertEqual( + {true, State}, emqx_restricted_shell:non_local_allowed({inet, getifaddrs}, [], State) + ), emqx_restricted_shell:unlock(), ?assertEqual({true, State}, emqx_restricted_shell:local_allowed(q, [], State)), ?assertEqual({true, State}, emqx_restricted_shell:local_allowed(ls, [], State)), ?assertEqual({true, State}, emqx_restricted_shell:non_local_allowed({init, stop}, [], State)), - ?assertEqual({true, State}, emqx_restricted_shell:non_local_allowed({inet, getifaddrs}, [], State)), + ?assertEqual( + {true, State}, emqx_restricted_shell:non_local_allowed({inet, getifaddrs}, [], State) + ), emqx_restricted_shell:lock(), ok. diff --git a/apps/emqx_plugin_libs/rebar.config b/apps/emqx_plugin_libs/rebar.config index 528efecb6..9f17b7657 100644 --- a/apps/emqx_plugin_libs/rebar.config +++ b/apps/emqx_plugin_libs/rebar.config @@ -1,4 +1,5 @@ %% -*- mode: erlang -*- -{deps, [ {emqx, {path, "../emqx"}} - ]}. +{deps, [{emqx, {path, "../emqx"}}]}. + +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_plugin_libs/src/emqx_placeholder.erl b/apps/emqx_plugin_libs/src/emqx_placeholder.erl index d72d6c58c..f63f5eefc 100644 --- a/apps/emqx_plugin_libs/src/emqx_placeholder.erl +++ b/apps/emqx_plugin_libs/src/emqx_placeholder.erl @@ -17,29 +17,31 @@ -module(emqx_placeholder). %% preprocess and process template string with place holders --export([ preproc_tmpl/1 - , preproc_tmpl/2 - , proc_tmpl/2 - , proc_tmpl/3 - , preproc_cmd/1 - , proc_cmd/2 - , proc_cmd/3 - , preproc_sql/1 - , preproc_sql/2 - , proc_sql/2 - , proc_sql_param_str/2 - , proc_cql_param_str/2 - , preproc_tmpl_deep/1 - , preproc_tmpl_deep/2 - , proc_tmpl_deep/2 - , proc_tmpl_deep/3 +-export([ + preproc_tmpl/1, + preproc_tmpl/2, + proc_tmpl/2, + proc_tmpl/3, + preproc_cmd/1, + proc_cmd/2, + proc_cmd/3, + preproc_sql/1, + preproc_sql/2, + proc_sql/2, + proc_sql_param_str/2, + proc_cql_param_str/2, + preproc_tmpl_deep/1, + preproc_tmpl_deep/2, + proc_tmpl_deep/2, + proc_tmpl_deep/3, - , bin/1 - , sql_data/1 - ]). + bin/1, + sql_data/1 +]). -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})"). --define(EX_WITHE_CHARS, "\\s"). %% Space and CRLF +%% Space and CRLF +-define(EX_WITHE_CHARS, "\\s"). -type tmpl_token() :: list({var, binary()} | {str, binary()}). @@ -48,26 +50,32 @@ -type prepare_statement_key() :: binary(). -type var_trans() :: - fun((FoundValue :: term()) -> binary()) | - fun((Placeholder :: term(), FoundValue :: term()) -> binary()). + fun((FoundValue :: term()) -> binary()) + | fun((Placeholder :: term(), FoundValue :: term()) -> binary()). -type preproc_tmpl_opts() :: #{placeholders => list(binary())}. --type preproc_sql_opts() :: #{placeholders => list(binary()), - replace_with => '?' | '$n'}. +-type preproc_sql_opts() :: #{ + placeholders => list(binary()), + replace_with => '?' | '$n' +}. --type preproc_deep_opts() :: #{placeholders => list(binary()), - process_keys => boolean()}. +-type preproc_deep_opts() :: #{ + placeholders => list(binary()), + process_keys => boolean() +}. --type proc_tmpl_opts() :: #{return => rawlist | full_binary, - var_trans => var_trans()}. +-type proc_tmpl_opts() :: #{ + return => rawlist | full_binary, + var_trans => var_trans() +}. -type deep_template() :: - #{deep_template() => deep_template()} | - {tuple, [deep_template()]} | - [deep_template()] | - {tmpl, tmpl_token()} | - {value, term()}. + #{deep_template() => deep_template()} + | {tuple, [deep_template()]} + | [deep_template()] + | {tmpl, tmpl_token()} + | {value, term()}. %%------------------------------------------------------------------------------ %% APIs @@ -83,7 +91,6 @@ preproc_tmpl(Str, Opts) -> Tokens = re:split(Str, RE, [{return, binary}, group, trim]), do_preproc_tmpl(Tokens, []). - -spec proc_tmpl(tmpl_token(), map()) -> binary(). proc_tmpl(Tokens, Data) -> proc_tmpl(Tokens, Data, #{return => full_binary}). @@ -92,24 +99,27 @@ proc_tmpl(Tokens, Data) -> proc_tmpl(Tokens, Data, Opts = #{return := full_binary}) -> Trans = maps:get(var_trans, Opts, fun emqx_plugin_libs_rule:bin/1), list_to_binary( - proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Trans})); - + proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Trans}) + ); proc_tmpl(Tokens, Data, Opts = #{return := rawlist}) -> Trans = maps:get(var_trans, Opts, undefined), lists:map( - fun ({str, Str}) -> Str; + fun + ({str, Str}) -> + Str; ({var, Phld}) when is_function(Trans, 1) -> Trans(get_phld_var(Phld, Data)); ({var, Phld}) when is_function(Trans, 2) -> Trans(Phld, get_phld_var(Phld, Data)); ({var, Phld}) -> get_phld_var(Phld, Data) - end, Tokens). - + end, + Tokens + ). -spec preproc_cmd(binary()) -> tmpl_cmd(). preproc_cmd(Str) -> - SubStrList = re:split(Str, ?EX_WITHE_CHARS, [{return,binary},trim]), + SubStrList = re:split(Str, ?EX_WITHE_CHARS, [{return, binary}, trim]), [preproc_tmpl(SubStr) || SubStr <- SubStrList]. -spec proc_cmd([tmpl_token()], map()) -> binary() | list(). @@ -119,17 +129,15 @@ proc_cmd(Tokens, Data) -> proc_cmd(Tokens, Data, Opts) -> [proc_tmpl(Tks, Data, Opts) || Tks <- Tokens]. - %% preprocess SQL with place holders --spec preproc_sql(Sql::binary()) -> {prepare_statement_key(), tmpl_token()}. +-spec preproc_sql(Sql :: binary()) -> {prepare_statement_key(), tmpl_token()}. preproc_sql(Sql) -> preproc_sql(Sql, '?'). -spec preproc_sql(binary(), '?' | '$n' | preproc_sql_opts()) -> - {prepare_statement_key(), tmpl_token()}. + {prepare_statement_key(), tmpl_token()}. preproc_sql(Sql, ReplaceWith) when is_atom(ReplaceWith) -> preproc_sql(Sql, #{replace_with => ReplaceWith}); - preproc_sql(Sql, Opts) -> RE = preproc_var_re(Opts), ReplaceWith = maps:get(replace_with, Opts, '?'), @@ -141,22 +149,18 @@ preproc_sql(Sql, Opts) -> {Sql, []} end. - -spec proc_sql(tmpl_token(), map()) -> list(). proc_sql(Tokens, Data) -> proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => fun sql_data/1}). - -spec proc_sql_param_str(tmpl_token(), map()) -> binary(). proc_sql_param_str(Tokens, Data) -> proc_param_str(Tokens, Data, fun quote_sql/1). - -spec proc_cql_param_str(tmpl_token(), map()) -> binary(). proc_cql_param_str(Tokens, Data) -> proc_param_str(Tokens, Data, fun quote_cql/1). - -spec preproc_tmpl_deep(term()) -> deep_template(). preproc_tmpl_deep(Data) -> preproc_tmpl_deep(Data, #{process_keys => true}). @@ -164,26 +168,22 @@ preproc_tmpl_deep(Data) -> -spec preproc_tmpl_deep(term(), preproc_deep_opts()) -> deep_template(). preproc_tmpl_deep(List, Opts) when is_list(List) -> [preproc_tmpl_deep(El, Opts) || El <- List]; - preproc_tmpl_deep(Map, Opts) when is_map(Map) -> maps:from_list( - lists:map( - fun({K, V}) -> - {preproc_tmpl_deep_map_key(K, Opts), - preproc_tmpl_deep(V, Opts)} - end, - maps:to_list(Map))); - + lists:map( + fun({K, V}) -> + {preproc_tmpl_deep_map_key(K, Opts), preproc_tmpl_deep(V, Opts)} + end, + maps:to_list(Map) + ) + ); preproc_tmpl_deep(Binary, Opts) when is_binary(Binary) -> {tmpl, preproc_tmpl(Binary, Opts)}; - preproc_tmpl_deep(Tuple, Opts) when is_tuple(Tuple) -> {tuple, preproc_tmpl_deep(tuple_to_list(Tuple), Opts)}; - preproc_tmpl_deep(Other, _Opts) -> {value, Other}. - -spec proc_tmpl_deep(deep_template(), map()) -> term(). proc_tmpl_deep(DeepTmpl, Data) -> proc_tmpl_deep(DeepTmpl, Data, #{return => full_binary}). @@ -191,24 +191,22 @@ proc_tmpl_deep(DeepTmpl, Data) -> -spec proc_tmpl_deep(deep_template(), map(), proc_tmpl_opts()) -> term(). proc_tmpl_deep(List, Data, Opts) when is_list(List) -> [proc_tmpl_deep(El, Data, Opts) || El <- List]; - proc_tmpl_deep(Map, Data, Opts) when is_map(Map) -> maps:from_list( - lists:map( - fun({K, V}) -> - {proc_tmpl_deep(K, Data, Opts), - proc_tmpl_deep(V, Data, Opts)} - end, - maps:to_list(Map))); - -proc_tmpl_deep({value, Value}, _Data, _Opts) -> Value; - -proc_tmpl_deep({tmpl, Tokens}, Data, Opts) -> proc_tmpl(Tokens, Data, Opts); - + lists:map( + fun({K, V}) -> + {proc_tmpl_deep(K, Data, Opts), proc_tmpl_deep(V, Data, Opts)} + end, + maps:to_list(Map) + ) + ); +proc_tmpl_deep({value, Value}, _Data, _Opts) -> + Value; +proc_tmpl_deep({tmpl, Tokens}, Data, Opts) -> + proc_tmpl(Tokens, Data, Opts); proc_tmpl_deep({tuple, Elements}, Data, Opts) -> list_to_tuple([proc_tmpl_deep(El, Data, Opts) || El <- Elements]). - -spec sql_data(term()) -> term(). sql_data(undefined) -> null; sql_data(List) when is_list(List) -> List; @@ -218,7 +216,6 @@ sql_data(Bool) when is_boolean(Bool) -> Bool; sql_data(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); sql_data(Map) when is_map(Map) -> emqx_json:encode(Map). - -spec bin(term()) -> binary(). bin(Val) -> emqx_plugin_libs_rule:bin(Val). @@ -228,7 +225,8 @@ bin(Val) -> emqx_plugin_libs_rule:bin(Val). proc_param_str(Tokens, Data, Quote) -> iolist_to_binary( - proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})). + proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote}) + ). %% backward compatibility for hot upgrading from =< e4.2.1 get_phld_var(Fun, Data) when is_function(Fun) -> @@ -248,23 +246,24 @@ do_preproc_tmpl([], Acc) -> lists:reverse(Acc); do_preproc_tmpl([[Str, Phld] | Tokens], Acc) -> do_preproc_tmpl( - Tokens, - put_head( - var, - parse_nested(unwrap(Phld)), - put_head(str, Str, Acc))); + Tokens, + put_head( + var, + parse_nested(unwrap(Phld)), + put_head(str, Str, Acc) + ) + ); do_preproc_tmpl([[Str] | Tokens], Acc) -> do_preproc_tmpl( - Tokens, - put_head(str, Str, Acc)). + Tokens, + put_head(str, Str, Acc) + ). put_head(_Type, <<>>, List) -> List; -put_head(Type, Term, List) -> - [{Type, Term} | List]. +put_head(Type, Term, List) -> [{Type, Term} | List]. preproc_tmpl_deep_map_key(Key, #{process_keys := true} = Opts) -> preproc_tmpl_deep(Key, Opts); - preproc_tmpl_deep_map_key(Key, _) -> {value, Key}. @@ -274,12 +273,16 @@ replace_with(Tmpl, RE, '$n') -> Parts = re:split(Tmpl, RE, [{return, binary}, trim, group]), {Res, _} = lists:foldl( - fun([Tkn, _Phld], {Acc, Seq}) -> - Seq1 = erlang:integer_to_binary(Seq), - {<>, Seq + 1}; + fun + ([Tkn, _Phld], {Acc, Seq}) -> + Seq1 = erlang:integer_to_binary(Seq), + {<>, Seq + 1}; ([Tkn], {Acc, Seq}) -> {<>, Seq} - end, {<<>>, 1}, Parts), + end, + {<<>>, 1}, + Parts + ), Res. parse_nested(Attr) -> @@ -289,7 +292,7 @@ parse_nested(Attr) -> end. unwrap(<<"${", Val/binary>>) -> - binary:part(Val, {0, byte_size(Val)-1}). + binary:part(Val, {0, byte_size(Val) - 1}). quote_sql(Str) -> quote(Str, <<"\\\\'">>). @@ -301,7 +304,8 @@ quote(Str, ReplaceWith) when is_list(Str); is_binary(Str); is_atom(Str); - is_map(Str) -> + is_map(Str) +-> [$', escape_apo(bin(Str), ReplaceWith), $']; quote(Val, _) -> bin(Val). diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src b/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src index 603bbb7c0..2f2976219 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src @@ -1,8 +1,8 @@ %% -*- mode: erlang -*- -{application, emqx_plugin_libs, - [{description, "EMQX Plugin utility libs"}, - {vsn, "4.3.1"}, - {modules, []}, - {applications, [kernel,stdlib]}, - {env, []} - ]}. +{application, emqx_plugin_libs, [ + {description, "EMQX Plugin utility libs"}, + {vsn, "4.3.1"}, + {modules, []}, + {applications, [kernel, stdlib]}, + {env, []} +]}. diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl index da0d0ab4f..d3cc3e63c 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl @@ -19,34 +19,36 @@ -behaviour(gen_server). %% API functions --export([ start_link/1 - , stop/1 - , child_spec/1 - ]). +-export([ + start_link/1, + stop/1, + child_spec/1 +]). --export([ inc/3 - , inc/4 - , get/3 - , get_rate/2 - , get_counters/2 - , create_metrics/3 - , create_metrics/4 - , clear_metrics/2 - , reset_metrics/2 - , has_metrics/2 - ]). +-export([ + inc/3, + inc/4, + get/3, + get_rate/2, + get_counters/2, + create_metrics/3, + create_metrics/4, + clear_metrics/2, + reset_metrics/2, + has_metrics/2 +]). --export([ get_metrics/2 - ]). +-export([get_metrics/2]). %% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_info/2 - , handle_cast/2 - , code_change/3 - , terminate/2 - ]). +-export([ + init/1, + handle_call/3, + handle_info/2, + handle_cast/2, + code_change/3, + terminate/2 +]). -ifndef(TEST). -define(SECS_5M, 300). @@ -75,87 +77,92 @@ -define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)). -record(rate, { - max = 0 :: number(), - current = 0 :: number(), - last5m = 0 :: number(), - %% metadata for calculating the avg rate - tick = 1 :: number(), - last_v = 0 :: number(), - %% metadata for calculating the 5min avg rate - last5m_acc = 0 :: number(), - last5m_smpl = [] :: list() - }). + max = 0 :: number(), + current = 0 :: number(), + last5m = 0 :: number(), + %% metadata for calculating the avg rate + tick = 1 :: number(), + last_v = 0 :: number(), + %% metadata for calculating the 5min avg rate + last5m_acc = 0 :: number(), + last5m_smpl = [] :: list() +}). -record(state, { - metric_ids = sets:new(), - rates :: undefined | #{metric_id() => #rate{}} - }). + metric_ids = sets:new(), + rates :: undefined | #{metric_id() => #rate{}} +}). %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ --spec(child_spec(handler_name()) -> supervisor:child_spec()). +-spec child_spec(handler_name()) -> supervisor:child_spec(). child_spec(Name) -> - #{ id => emqx_plugin_libs_metrics - , start => {emqx_plugin_libs_metrics, start_link, [Name]} - , restart => permanent - , shutdown => 5000 - , type => worker - , modules => [emqx_plugin_libs_metrics] - }. + #{ + id => emqx_plugin_libs_metrics, + start => {emqx_plugin_libs_metrics, start_link, [Name]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_plugin_libs_metrics] + }. --spec(create_metrics(handler_name(), metric_id(), [atom()]) -> ok | {error, term()}). +-spec create_metrics(handler_name(), metric_id(), [atom()]) -> ok | {error, term()}. create_metrics(Name, Id, Metrics) -> create_metrics(Name, Id, Metrics, Metrics). --spec(create_metrics(handler_name(), metric_id(), [atom()], [atom()]) -> ok | {error, term()}). +-spec create_metrics(handler_name(), metric_id(), [atom()], [atom()]) -> ok | {error, term()}. create_metrics(Name, Id, Metrics, RateMetrics) -> gen_server:call(Name, {create_metrics, Id, Metrics, RateMetrics}). --spec(clear_metrics(handler_name(), metric_id()) -> ok). +-spec clear_metrics(handler_name(), metric_id()) -> ok. clear_metrics(Name, Id) -> gen_server:call(Name, {delete_metrics, Id}). --spec(reset_metrics(handler_name(), metric_id()) -> ok). +-spec reset_metrics(handler_name(), metric_id()) -> ok. reset_metrics(Name, Id) -> gen_server:call(Name, {reset_metrics, Id}). --spec(has_metrics(handler_name(), metric_id()) -> boolean()). +-spec has_metrics(handler_name(), metric_id()) -> boolean(). has_metrics(Name, Id) -> case get_ref(Name, Id) of not_found -> false; _ -> true end. --spec(get(handler_name(), metric_id(), atom() | integer()) -> number()). +-spec get(handler_name(), metric_id(), atom() | integer()) -> number(). get(Name, Id, Metric) -> case get_ref(Name, Id) of - not_found -> 0; + not_found -> + 0; Ref when is_atom(Metric) -> counters:get(Ref, idx_metric(Name, Id, Metric)); Ref when is_integer(Metric) -> counters:get(Ref, Metric) end. --spec(get_rate(handler_name(), metric_id()) -> map()). +-spec get_rate(handler_name(), metric_id()) -> map(). get_rate(Name, Id) -> gen_server:call(Name, {get_rate, Id}). --spec(get_counters(handler_name(), metric_id()) -> map()). +-spec get_counters(handler_name(), metric_id()) -> map(). get_counters(Name, Id) -> - maps:map(fun(_Metric, Index) -> + maps:map( + fun(_Metric, Index) -> get(Name, Id, Index) - end, get_indexes(Name, Id)). + end, + get_indexes(Name, Id) + ). -spec reset_counters(handler_name(), metric_id()) -> ok. reset_counters(Name, Id) -> Indexes = maps:values(get_indexes(Name, Id)), Ref = get_ref(Name, Id), - [counters:put(Ref, Idx, 0) || Idx <- Indexes ], + [counters:put(Ref, Idx, 0) || Idx <- Indexes], ok. --spec(get_metrics(handler_name(), metric_id()) -> metrics()). +-spec get_metrics(handler_name(), metric_id()) -> metrics(). get_metrics(Name, Id) -> #{rate => get_rate(Name, Id), counters => get_counters(Name, Id)}. @@ -180,47 +187,63 @@ init(Name) -> handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) -> {reply, make_rate(0, 0, 0), State}; handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) -> - {reply, case maps:get(Id, Rates, undefined) of - undefined -> make_rate(0, 0, 0); - RatesPerId -> format_rates_of_id(RatesPerId) - end, State}; - -handle_call({create_metrics, Id, Metrics, RateMetrics}, _From, - State = #state{metric_ids = MIDs, rates = Rates}) -> + {reply, + case maps:get(Id, Rates, undefined) of + undefined -> make_rate(0, 0, 0); + RatesPerId -> format_rates_of_id(RatesPerId) + end, State}; +handle_call( + {create_metrics, Id, Metrics, RateMetrics}, + _From, + State = #state{metric_ids = MIDs, rates = Rates} +) -> case RateMetrics -- Metrics of [] -> RatePerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]), - Rate1 = case Rates of - undefined -> #{Id => RatePerId}; - _ -> Rates#{Id => RatePerId} - end, - {reply, create_counters(get_self_name(), Id, Metrics), - State#state{metric_ids = sets:add_element(Id, MIDs), - rates = Rate1}}; + Rate1 = + case Rates of + undefined -> #{Id => RatePerId}; + _ -> Rates#{Id => RatePerId} + end, + {reply, create_counters(get_self_name(), Id, Metrics), State#state{ + metric_ids = sets:add_element(Id, MIDs), + rates = Rate1 + }}; _ -> {reply, {error, not_super_set_of, {RateMetrics, Metrics}}, State} end; - -handle_call({delete_metrics, Id}, _From, - State = #state{metric_ids = MIDs, rates = Rates}) -> - {reply, delete_counters(get_self_name(), Id), - State#state{metric_ids = sets:del_element(Id, MIDs), - rates = case Rates of - undefined -> undefined; - _ -> maps:remove(Id, Rates) - end}}; - -handle_call({reset_metrics, Id}, _From, - State = #state{rates = Rates}) -> - {reply, reset_counters(get_self_name(), Id), - State#state{rates = case Rates of - undefined -> undefined; - _ -> ResetRate = - maps:map(fun(_Key, _Value) -> #rate{} end, - maps:get(Id, Rates, #{})), - maps:put(Id, ResetRate, Rates) - end}}; - +handle_call( + {delete_metrics, Id}, + _From, + State = #state{metric_ids = MIDs, rates = Rates} +) -> + {reply, delete_counters(get_self_name(), Id), State#state{ + metric_ids = sets:del_element(Id, MIDs), + rates = + case Rates of + undefined -> undefined; + _ -> maps:remove(Id, Rates) + end + }}; +handle_call( + {reset_metrics, Id}, + _From, + State = #state{rates = Rates} +) -> + {reply, reset_counters(get_self_name(), Id), State#state{ + rates = + case Rates of + undefined -> + undefined; + _ -> + ResetRate = + maps:map( + fun(_Key, _Value) -> #rate{} end, + maps:get(Id, Rates, #{}) + ), + maps:put(Id, ResetRate, Rates) + end + }}; handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -230,17 +253,21 @@ handle_cast(_Msg, State) -> handle_info(ticking, State = #state{rates = undefined}) -> erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), {noreply, State}; - handle_info(ticking, State = #state{rates = Rates0}) -> Rates = - maps:map(fun(Id, RatesPerID) -> - maps:map(fun(Metric, Rate) -> - calculate_rate(get(get_self_name(), Id, Metric), Rate) - end, RatesPerID) - end, Rates0), + maps:map( + fun(Id, RatesPerID) -> + maps:map( + fun(Metric, Rate) -> + calculate_rate(get(get_self_name(), Id, Metric), Rate) + end, + RatesPerID + ) + end, + Rates0 + ), erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), {noreply, State#state{rates = Rates}}; - handle_info(_Info, State) -> {noreply, State}. @@ -269,12 +296,17 @@ create_counters(Name, Id, Metrics) -> Indexes = maps:from_list(lists:zip(Metrics, lists:seq(1, Size))), Counters = get_pterm(Name), CntrRef = counters:new(Size, [write_concurrency]), - persistent_term:put(?CntrRef(Name), - Counters#{Id => #{ref => CntrRef, indexes => Indexes}}), + persistent_term:put( + ?CntrRef(Name), + Counters#{Id => #{ref => CntrRef, indexes => Indexes}} + ), %% restore the old counters - lists:foreach(fun({Metric, N}) -> + lists:foreach( + fun({Metric, N}) -> inc(Name, Id, Metric, N) - end, maps:to_list(OlderCounters)). + end, + maps:to_list(OlderCounters) + ). delete_counters(Name, Id) -> persistent_term:put(?CntrRef(Name), maps:remove(Id, get_pterm(Name))). @@ -299,9 +331,13 @@ get_pterm(Name) -> calculate_rate(_CurrVal, undefined) -> undefined; -calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal, - tick = Tick, last5m_acc = AccRate5Min0, - last5m_smpl = Last5MinSamples0}) -> +calculate_rate(CurrVal, #rate{ + max = MaxRate0, + last_v = LastVal, + tick = Tick, + last5m_acc = AccRate5Min0, + last5m_smpl = Last5MinSamples0 +}) -> %% calculate the current rate based on the last value of the counter CurrRate = (CurrVal - LastVal) / ?SAMPLING, @@ -317,32 +353,40 @@ calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal, case Tick =< ?SAMPCOUNT_5M of true -> Acc = AccRate5Min0 + CurrRate, - {lists:reverse([CurrRate | lists:reverse(Last5MinSamples0)]), - Acc, Acc / Tick}; + {lists:reverse([CurrRate | lists:reverse(Last5MinSamples0)]), Acc, Acc / Tick}; false -> [FirstRate | Rates] = Last5MinSamples0, - Acc = AccRate5Min0 + CurrRate - FirstRate, - {lists:reverse([CurrRate | lists:reverse(Rates)]), - Acc, Acc / ?SAMPCOUNT_5M} + Acc = AccRate5Min0 + CurrRate - FirstRate, + {lists:reverse([CurrRate | lists:reverse(Rates)]), Acc, Acc / ?SAMPCOUNT_5M} end, - #rate{max = MaxRate, current = CurrRate, last5m = Last5Min, - last_v = CurrVal, last5m_acc = Acc5Min, - last5m_smpl = Last5MinSamples, tick = Tick + 1}. + #rate{ + max = MaxRate, + current = CurrRate, + last5m = Last5Min, + last_v = CurrVal, + last5m_acc = Acc5Min, + last5m_smpl = Last5MinSamples, + tick = Tick + 1 + }. format_rates_of_id(RatesPerId) -> - maps:map(fun(_Metric, Rates) -> + maps:map( + fun(_Metric, Rates) -> format_rate(Rates) - end, RatesPerId). + end, + RatesPerId + ). format_rate(#rate{max = Max, current = Current, last5m = Last5Min}) -> make_rate(Current, Max, Last5Min). make_rate(Current, Max, Last5Min) -> - #{ current => precision(Current, 2) - , max => precision(Max, 2) - , last5m => precision(Last5Min, 2) - }. + #{ + current => precision(Current, 2), + max => precision(Max, 2), + last5m => precision(Last5Min, 2) + }. precision(Float, N) -> Base = math:pow(10, N), diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl index d462e3db2..edbd28242 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl @@ -16,11 +16,12 @@ -module(emqx_plugin_libs_pool). --export([ start_pool/3 - , stop_pool/1 - , pool_name/1 - , health_check/3 - ]). +-export([ + start_pool/3, + stop_pool/1, + pool_name/1, + health_check/3 +]). -include_lib("emqx/include/logger.hrl"). @@ -36,8 +37,11 @@ start_pool(Name, Mod, Options) -> stop_pool(Name), start_pool(Name, Mod, Options); {error, Reason} -> - ?SLOG(error, #{msg => "start_ecpool_error", pool_name => Name, - reason => Reason}), + ?SLOG(error, #{ + msg => "start_ecpool_error", + pool_name => Name, + reason => Reason + }), {error, {start_pool_failed, Name, Reason}} end. @@ -48,18 +52,24 @@ stop_pool(Name) -> {error, not_found} -> ok; {error, Reason} -> - ?SLOG(error, #{msg => "stop_ecpool_failed", pool_name => Name, - reason => Reason}), + ?SLOG(error, #{ + msg => "stop_ecpool_failed", + pool_name => Name, + reason => Reason + }), error({stop_pool_failed, Name, Reason}) end. health_check(PoolName, CheckFunc, State) when is_function(CheckFunc) -> - Status = [begin - case ecpool_worker:client(Worker) of - {ok, Conn} -> CheckFunc(Conn); - _ -> false + Status = [ + begin + case ecpool_worker:client(Worker) of + {ok, Conn} -> CheckFunc(Conn); + _ -> false + end end - end || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + || {_WorkerName, Worker} <- ecpool:workers(PoolName) + ], case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of true -> {ok, State}; false -> {error, health_check_failed, State} diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl index 0e90ce8f7..4d2ddf10d 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl @@ -18,106 +18,109 @@ -elvis([{elvis_style, god_modules, disable}]). %% preprocess and process template string with place holders --export([ preproc_tmpl/1 - , proc_tmpl/2 - , proc_tmpl/3 - , preproc_cmd/1 - , proc_cmd/2 - , proc_cmd/3 - , preproc_sql/1 - , preproc_sql/2 - , proc_sql/2 - , proc_sql_param_str/2 - , proc_cql_param_str/2 - ]). +-export([ + preproc_tmpl/1, + proc_tmpl/2, + proc_tmpl/3, + preproc_cmd/1, + proc_cmd/2, + proc_cmd/3, + preproc_sql/1, + preproc_sql/2, + proc_sql/2, + proc_sql_param_str/2, + proc_cql_param_str/2 +]). %% type converting --export([ str/1 - , bin/1 - , bool/1 - , int/1 - , float/1 - , map/1 - , utf8_bin/1 - , utf8_str/1 - , number_to_binary/1 - , atom_key/1 - , unsafe_atom_key/1 - ]). +-export([ + str/1, + bin/1, + bool/1, + int/1, + float/1, + map/1, + utf8_bin/1, + utf8_str/1, + number_to_binary/1, + atom_key/1, + unsafe_atom_key/1 +]). %% connectivity check --export([ http_connectivity/1 - , http_connectivity/2 - , tcp_connectivity/2 - , tcp_connectivity/3 - ]). +-export([ + http_connectivity/1, + http_connectivity/2, + tcp_connectivity/2, + tcp_connectivity/3 +]). --export([ now_ms/0 - , can_topic_match_oneof/2 - ]). +-export([ + now_ms/0, + can_topic_match_oneof/2 +]). -export([cluster_call/3]). --compile({no_auto_import, - [ float/1 - ]}). +-compile({no_auto_import, [float/1]}). -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})"). --define(EX_WITHE_CHARS, "\\s"). %% Space and CRLF +%% Space and CRLF +-define(EX_WITHE_CHARS, "\\s"). --type(uri_string() :: iodata()). +-type uri_string() :: iodata(). --type(tmpl_token() :: list({var, binary()} | {str, binary()})). +-type tmpl_token() :: list({var, binary()} | {str, binary()}). --type(tmpl_cmd() :: list(tmpl_token())). +-type tmpl_cmd() :: list(tmpl_token()). --type(prepare_statement_key() :: binary()). +-type prepare_statement_key() :: binary(). %% preprocess template string with place holders --spec(preproc_tmpl(binary()) -> tmpl_token()). +-spec preproc_tmpl(binary()) -> tmpl_token(). preproc_tmpl(Str) -> emqx_placeholder:preproc_tmpl(Str). --spec(proc_tmpl(tmpl_token(), map()) -> binary()). +-spec proc_tmpl(tmpl_token(), map()) -> binary(). proc_tmpl(Tokens, Data) -> emqx_placeholder:proc_tmpl(Tokens, Data). --spec(proc_tmpl(tmpl_token(), map(), map()) -> binary() | list()). +-spec proc_tmpl(tmpl_token(), map(), map()) -> binary() | list(). proc_tmpl(Tokens, Data, Opts) -> emqx_placeholder:proc_tmpl(Tokens, Data, Opts). --spec(preproc_cmd(binary()) -> tmpl_cmd()). +-spec preproc_cmd(binary()) -> tmpl_cmd(). preproc_cmd(Str) -> emqx_placeholder:preproc_cmd(Str). --spec(proc_cmd([tmpl_token()], map()) -> binary() | list()). +-spec proc_cmd([tmpl_token()], map()) -> binary() | list(). proc_cmd(Tokens, Data) -> emqx_placeholder:proc_cmd(Tokens, Data). --spec(proc_cmd([tmpl_token()], map(), map()) -> list()). +-spec proc_cmd([tmpl_token()], map(), map()) -> list(). proc_cmd(Tokens, Data, Opts) -> emqx_placeholder:proc_cmd(Tokens, Data, Opts). %% preprocess SQL with place holders --spec(preproc_sql(Sql::binary()) -> {prepare_statement_key(), tmpl_token()}). +-spec preproc_sql(Sql :: binary()) -> {prepare_statement_key(), tmpl_token()}. preproc_sql(Sql) -> emqx_placeholder:preproc_sql(Sql). --spec(preproc_sql(Sql::binary(), ReplaceWith :: '?' | '$n') - -> {prepare_statement_key(), tmpl_token()}). +-spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n') -> + {prepare_statement_key(), tmpl_token()}. preproc_sql(Sql, ReplaceWith) -> emqx_placeholder:preproc_sql(Sql, ReplaceWith). --spec(proc_sql(tmpl_token(), map()) -> list()). +-spec proc_sql(tmpl_token(), map()) -> list(). proc_sql(Tokens, Data) -> emqx_placeholder:proc_sql(Tokens, Data). --spec(proc_sql_param_str(tmpl_token(), map()) -> binary()). +-spec proc_sql_param_str(tmpl_token(), map()) -> binary(). proc_sql_param_str(Tokens, Data) -> emqx_placeholder:proc_sql_param_str(Tokens, Data). --spec(proc_cql_param_str(tmpl_token(), map()) -> binary()). +-spec proc_cql_param_str(tmpl_token(), map()) -> binary(). proc_cql_param_str(Tokens, Data) -> emqx_placeholder:proc_cql_param_str(Tokens, Data). @@ -133,19 +136,22 @@ unsafe_atom_key(Key) -> atom_key(Key) when is_atom(Key) -> Key; atom_key(Key) when is_binary(Key) -> - try binary_to_existing_atom(Key, utf8) - catch error:badarg -> error({invalid_key, Key}) + try + binary_to_existing_atom(Key, utf8) + catch + error:badarg -> error({invalid_key, Key}) end; -atom_key(Keys = [_Key | _]) -> %% nested keys +%% nested keys +atom_key(Keys = [_Key | _]) -> [atom_key(SubKey) || SubKey <- Keys]; atom_key(Key) -> error({invalid_key, Key}). --spec(http_connectivity(uri_string()) -> ok | {error, Reason :: term()}). +-spec http_connectivity(uri_string()) -> ok | {error, Reason :: term()}. http_connectivity(Url) -> http_connectivity(Url, 3000). --spec(http_connectivity(uri_string(), integer()) -> ok | {error, Reason :: term()}). +-spec http_connectivity(uri_string(), integer()) -> ok | {error, Reason :: term()}. http_connectivity(Url, Timeout) -> case emqx_http_lib:uri_parse(Url) of {ok, #{host := Host, port := Port}} -> @@ -154,20 +160,27 @@ http_connectivity(Url, Timeout) -> {error, Reason} end. --spec tcp_connectivity(Host :: inet:socket_address() | inet:hostname(), - Port :: inet:port_number()) - -> ok | {error, Reason :: term()}. +-spec tcp_connectivity( + Host :: inet:socket_address() | inet:hostname(), + Port :: inet:port_number() +) -> + ok | {error, Reason :: term()}. tcp_connectivity(Host, Port) -> tcp_connectivity(Host, Port, 3000). --spec(tcp_connectivity(Host :: inet:socket_address() | inet:hostname(), - Port :: inet:port_number(), - Timeout :: integer()) - -> ok | {error, Reason :: term()}). +-spec tcp_connectivity( + Host :: inet:socket_address() | inet:hostname(), + Port :: inet:port_number(), + Timeout :: integer() +) -> + ok | {error, Reason :: term()}. tcp_connectivity(Host, Port, Timeout) -> case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of - {ok, Sock} -> gen_tcp:close(Sock), ok; - {error, Reason} -> {error, Reason} + {ok, Sock} -> + gen_tcp:close(Sock), + ok; + {error, Reason} -> + {error, Reason} end. str(Bin) when is_binary(Bin) -> binary_to_list(Bin); @@ -177,9 +190,10 @@ str(Map) when is_map(Map) -> binary_to_list(emqx_json:encode(Map)); str(List) when is_list(List) -> case io_lib:printable_list(List) of true -> List; - false -> binary_to_list(emqx_json:encode(List)) + false -> binary_to_list(emqx_json:encode(List)) end; -str(Data) -> error({invalid_str, Data}). +str(Data) -> + error({invalid_str, Data}). utf8_bin(Str) when is_binary(Str); is_list(Str) -> unicode:characters_to_binary(Str); @@ -200,36 +214,49 @@ bin(List) when is_list(List) -> true -> list_to_binary(List); false -> emqx_json:encode(List) end; -bin(Data) -> error({invalid_bin, Data}). +bin(Data) -> + error({invalid_bin, Data}). int(List) when is_list(List) -> - try list_to_integer(List) - catch error:badarg -> - int(list_to_float(List)) + try + list_to_integer(List) + catch + error:badarg -> + int(list_to_float(List)) end; int(Bin) when is_binary(Bin) -> - try binary_to_integer(Bin) - catch error:badarg -> - int(binary_to_float(Bin)) + try + binary_to_integer(Bin) + catch + error:badarg -> + int(binary_to_float(Bin)) end; int(Int) when is_integer(Int) -> Int; int(Float) when is_float(Float) -> erlang:floor(Float); -int(true) -> 1; -int(false) -> 0; -int(Data) -> error({invalid_number, Data}). +int(true) -> + 1; +int(false) -> + 0; +int(Data) -> + error({invalid_number, Data}). float(List) when is_list(List) -> - try list_to_float(List) - catch error:badarg -> - float(list_to_integer(List)) + try + list_to_float(List) + catch + error:badarg -> + float(list_to_integer(List)) end; float(Bin) when is_binary(Bin) -> - try binary_to_float(Bin) - catch error:badarg -> - float(binary_to_integer(Bin)) + try + binary_to_float(Bin) + catch + error:badarg -> + float(binary_to_integer(Bin)) end; float(Num) when is_number(Num) -> erlang:float(Num); -float(Data) -> error({invalid_number, Data}). +float(Data) -> + error({invalid_number, Data}). map(Bin) when is_binary(Bin) -> case emqx_json:decode(Bin, [return_maps]) of @@ -238,16 +265,23 @@ map(Bin) when is_binary(Bin) -> end; map(List) when is_list(List) -> maps:from_list(List); map(Map) when is_map(Map) -> Map; -map(Data) -> error({invalid_map, Data}). +map(Data) -> + error({invalid_map, Data}). - -bool(Bool) when Bool == true; - Bool == <<"true">>; - Bool == 1 -> true; -bool(Bool) when Bool == false; - Bool == <<"false">>; - Bool == 0 -> false; -bool(Bool) -> error({invalid_boolean, Bool}). +bool(Bool) when + Bool == true; + Bool == <<"true">>; + Bool == 1 +-> + true; +bool(Bool) when + Bool == false; + Bool == <<"false">>; + Bool == 0 +-> + false; +bool(Bool) -> + error({invalid_boolean, Bool}). number_to_binary(Int) when is_integer(Int) -> integer_to_binary(Int); @@ -263,9 +297,12 @@ now_ms() -> erlang:system_time(millisecond). can_topic_match_oneof(Topic, Filters) -> - lists:any(fun(Fltr) -> - emqx_topic:match(Topic, Fltr) - end, Filters). + lists:any( + fun(Fltr) -> + emqx_topic:match(Topic, Fltr) + end, + Filters + ). cluster_call(Module, Func, Args) -> {ok, _TnxId, Result} = emqx_cluster_rpc:multicall(Module, Func, Args), diff --git a/apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl b/apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl index 9a7e8e6a5..fa7d9416d 100644 --- a/apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl +++ b/apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl @@ -18,19 +18,21 @@ -behaviour(emqx_bpapi). --export([ introduced_in/0 +-export([ + introduced_in/0, - , get_metrics/3 - ]). + get_metrics/3 +]). -include_lib("emqx/include/bpapi.hrl"). introduced_in() -> "5.0.0". --spec get_metrics( node() - , emqx_plugin_libs_metrics:handler_name() - , emqx_plugin_libs_metrics:metric_id() - ) -> emqx_plugin_libs_metrics:metrics() | {badrpc, _}. +-spec get_metrics( + node(), + emqx_plugin_libs_metrics:handler_name(), + emqx_plugin_libs_metrics:metric_id() +) -> emqx_plugin_libs_metrics:metrics() | {badrpc, _}. get_metrics(Node, HandlerName, MetricId) -> rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [HandlerName, MetricId]). diff --git a/apps/emqx_plugin_libs/test/emqx_placeholder_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_placeholder_SUITE.erl index c8de69d32..5fcfb23e4 100644 --- a/apps/emqx_plugin_libs/test/emqx_placeholder_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_placeholder_SUITE.erl @@ -23,54 +23,72 @@ all() -> emqx_common_test_helpers:all(?MODULE). - t_proc_tmpl(_) -> Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, Tks = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), - ?assertEqual(<<"a:1,b:1,c:1.0,d:{\"d1\":\"hi\"}">>, - emqx_placeholder:proc_tmpl(Tks, Selected)). + ?assertEqual( + <<"a:1,b:1,c:1.0,d:{\"d1\":\"hi\"}">>, + emqx_placeholder:proc_tmpl(Tks, Selected) + ). t_proc_tmpl_path(_) -> Selected = #{d => #{d1 => <<"hi">>}}, Tks = emqx_placeholder:preproc_tmpl(<<"d.d1:${d.d1}">>), - ?assertEqual(<<"d.d1:hi">>, - emqx_placeholder:proc_tmpl(Tks, Selected)). + ?assertEqual( + <<"d.d1:hi">>, + emqx_placeholder:proc_tmpl(Tks, Selected) + ). t_proc_tmpl_custom_ph(_) -> Selected = #{a => <<"a">>, b => <<"b">>}, Tks = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b}">>, #{placeholders => [<<"${a}">>]}), - ?assertEqual(<<"a:a,b:${b}">>, - emqx_placeholder:proc_tmpl(Tks, Selected)). + ?assertEqual( + <<"a:a,b:${b}">>, + emqx_placeholder:proc_tmpl(Tks, Selected) + ). t_proc_tmpl1(_) -> Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, Tks = emqx_placeholder:preproc_tmpl(<<"a:$a,b:b},c:{c},d:${d">>), - ?assertEqual(<<"a:$a,b:b},c:{c},d:${d">>, - emqx_placeholder:proc_tmpl(Tks, Selected)). + ?assertEqual( + <<"a:$a,b:b},c:{c},d:${d">>, + emqx_placeholder:proc_tmpl(Tks, Selected) + ). t_proc_cmd(_) -> Selected = #{v0 => <<"x">>, v1 => <<"1">>, v2 => #{d1 => <<"hi">>}}, Tks = emqx_placeholder:preproc_cmd(<<"hset name a:${v0} ${v1} b ${v2} ">>), - ?assertEqual([<<"hset">>, <<"name">>, - <<"a:x">>, <<"1">>, - <<"b">>, <<"{\"d1\":\"hi\"}">>], - emqx_placeholder:proc_cmd(Tks, Selected)). + ?assertEqual( + [ + <<"hset">>, + <<"name">>, + <<"a:x">>, + <<"1">>, + <<"b">>, + <<"{\"d1\":\"hi\"}">> + ], + emqx_placeholder:proc_cmd(Tks, Selected) + ). t_preproc_sql(_) -> Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, {PrepareStatement, ParamsTokens} = emqx_placeholder:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '?'), ?assertEqual(<<"a:?,b:?,c:?,d:?">>, PrepareStatement), - ?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>], - emqx_placeholder:proc_sql(ParamsTokens, Selected)). + ?assertEqual( + [<<"1">>, 1, 1.0, <<"{\"d1\":\"hi\"}">>], + emqx_placeholder:proc_sql(ParamsTokens, Selected) + ). t_preproc_sql1(_) -> Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, {PrepareStatement, ParamsTokens} = emqx_placeholder:preproc_sql(<<"a:${a},b:${b},c:${c},d:${d}">>, '$n'), ?assertEqual(<<"a:$1,b:$2,c:$3,d:$4">>, PrepareStatement), - ?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>], - emqx_placeholder:proc_sql(ParamsTokens, Selected)). + ?assertEqual( + [<<"1">>, 1, 1.0, <<"{\"d1\":\"hi\"}">>], + emqx_placeholder:proc_sql(ParamsTokens, Selected) + ). t_preproc_sql2(_) -> Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, @@ -82,49 +100,72 @@ t_preproc_sql2(_) -> t_preproc_sql3(_) -> Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, ParamsTokens = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), - ?assertEqual(<<"a:'1',b:1,c:1.0,d:'{\"d1\":\"hi\"}'">>, - emqx_placeholder:proc_sql_param_str(ParamsTokens, Selected)). + ?assertEqual( + <<"a:'1',b:1,c:1.0,d:'{\"d1\":\"hi\"}'">>, + emqx_placeholder:proc_sql_param_str(ParamsTokens, Selected) + ). t_preproc_sql4(_) -> %% with apostrophes %% https://github.com/emqx/emqx/issues/4135 - Selected = #{a => <<"1''2">>, b => 1, c => 1.0, - d => #{d1 => <<"someone's phone">>}}, + Selected = #{ + a => <<"1''2">>, + b => 1, + c => 1.0, + d => #{d1 => <<"someone's phone">>} + }, ParamsTokens = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), - ?assertEqual(<<"a:'1\\'\\'2',b:1,c:1.0,d:'{\"d1\":\"someone\\'s phone\"}'">>, - emqx_placeholder:proc_sql_param_str(ParamsTokens, Selected)). + ?assertEqual( + <<"a:'1\\'\\'2',b:1,c:1.0,d:'{\"d1\":\"someone\\'s phone\"}'">>, + emqx_placeholder:proc_sql_param_str(ParamsTokens, Selected) + ). t_preproc_sql5(_) -> %% with apostrophes for cassandra %% https://github.com/emqx/emqx/issues/4148 - Selected = #{a => <<"1''2">>, b => 1, c => 1.0, - d => #{d1 => <<"someone's phone">>}}, + Selected = #{ + a => <<"1''2">>, + b => 1, + c => 1.0, + d => #{d1 => <<"someone's phone">>} + }, ParamsTokens = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), - ?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>, - emqx_placeholder:proc_cql_param_str(ParamsTokens, Selected)). + ?assertEqual( + <<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>, + emqx_placeholder:proc_cql_param_str(ParamsTokens, Selected) + ). t_preproc_sql6(_) -> Selected = #{a => <<"a">>, b => <<"b">>}, {PrepareStatement, ParamsTokens} = emqx_placeholder:preproc_sql( - <<"a:${a},b:${b}">>, - #{replace_with => '$n', - placeholders => [<<"${a}">>]}), + <<"a:${a},b:${b}">>, + #{ + replace_with => '$n', + placeholders => [<<"${a}">>] + } + ), ?assertEqual(<<"a:$1,b:${b}">>, PrepareStatement), - ?assertEqual([<<"a">>], - emqx_placeholder:proc_sql(ParamsTokens, Selected)). + ?assertEqual( + [<<"a">>], + emqx_placeholder:proc_sql(ParamsTokens, Selected) + ). t_preproc_tmpl_deep(_) -> Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}}, Tmpl0 = emqx_placeholder:preproc_tmpl_deep( - #{<<"${a}">> => [<<"${b}">>, "c", 2, 3.0, '${d}', {[<<"${c}">>], 0}]}), + #{<<"${a}">> => [<<"${b}">>, "c", 2, 3.0, '${d}', {[<<"${c}">>], 0}]} + ), ?assertEqual( - #{<<"1">> => [<<"1">>, "c", 2, 3.0, '${d}', {[<<"1.0">>], 0}]}, - emqx_placeholder:proc_tmpl_deep(Tmpl0, Selected)), + #{<<"1">> => [<<"1">>, "c", 2, 3.0, '${d}', {[<<"1.0">>], 0}]}, + emqx_placeholder:proc_tmpl_deep(Tmpl0, Selected) + ), Tmpl1 = emqx_placeholder:preproc_tmpl_deep( - #{<<"${a}">> => [<<"${b}">>, "c", 2, 3.0, '${d}', {[<<"${c}">>], 0}]}, - #{process_keys => false}), + #{<<"${a}">> => [<<"${b}">>, "c", 2, 3.0, '${d}', {[<<"${c}">>], 0}]}, + #{process_keys => false} + ), ?assertEqual( - #{<<"${a}">> => [<<"1">>, "c", 2, 3.0, '${d}', {[<<"1.0">>], 0}]}, - emqx_placeholder:proc_tmpl_deep(Tmpl1, Selected)). + #{<<"${a}">> => [<<"1">>, "c", 2, 3.0, '${d}', {[<<"1.0">>], 0}]}, + emqx_placeholder:proc_tmpl_deep(Tmpl1, Selected) + ). diff --git a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl index 3ae480c22..cd6e1b0b2 100644 --- a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl @@ -52,100 +52,132 @@ t_get_metrics(_) -> Metrics = [a, b, c], ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics), %% all the metrics are set to zero at start - ?assertMatch(#{ - rate := #{ - a := #{current := 0.0, max := 0.0, last5m := 0.0}, - b := #{current := 0.0, max := 0.0, last5m := 0.0}, - c := #{current := 0.0, max := 0.0, last5m := 0.0} + ?assertMatch( + #{ + rate := #{ + a := #{current := 0.0, max := 0.0, last5m := 0.0}, + b := #{current := 0.0, max := 0.0, last5m := 0.0}, + c := #{current := 0.0, max := 0.0, last5m := 0.0} + }, + counters := #{ + a := 0, + b := 0, + c := 0 + } }, - counters := #{ - a := 0, - b := 0, - c := 0 - } - }, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)), + emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>) + ), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), ct:sleep(1500), - ?LET(#{ - rate := #{ - a := #{current := CurrA, max := MaxA, last5m := _}, - b := #{current := CurrB, max := MaxB, last5m := _}, - c := #{current := CurrC, max := MaxC, last5m := _} + ?LET( + #{ + rate := #{ + a := #{current := CurrA, max := MaxA, last5m := _}, + b := #{current := CurrB, max := MaxB, last5m := _}, + c := #{current := CurrC, max := MaxC, last5m := _} + }, + counters := #{ + a := 1, + b := 1, + c := 2 + } }, - counters := #{ - a := 1, - b := 1, - c := 2 + emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>), + { + ?assert(CurrA > 0), + ?assert(CurrB > 0), + ?assert(CurrC > 0), + ?assert(MaxA > 0), + ?assert(MaxB > 0), + ?assert(MaxC > 0) } - }, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>), - {?assert(CurrA > 0), ?assert(CurrB > 0), ?assert(CurrC > 0), - ?assert(MaxA > 0), ?assert(MaxB > 0), ?assert(MaxC > 0)}), + ), ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). t_reset_metrics(_) -> Metrics = [a, b, c], ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics), %% all the metrics are set to zero at start - ?assertMatch(#{ - rate := #{ - a := #{current := 0.0, max := 0.0, last5m := 0.0}, - b := #{current := 0.0, max := 0.0, last5m := 0.0}, - c := #{current := 0.0, max := 0.0, last5m := 0.0} + ?assertMatch( + #{ + rate := #{ + a := #{current := 0.0, max := 0.0, last5m := 0.0}, + b := #{current := 0.0, max := 0.0, last5m := 0.0}, + c := #{current := 0.0, max := 0.0, last5m := 0.0} + }, + counters := #{ + a := 0, + b := 0, + c := 0 + } }, - counters := #{ - a := 0, - b := 0, - c := 0 - } - }, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)), + emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>) + ), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), ct:sleep(1500), ok = emqx_plugin_libs_metrics:reset_metrics(?NAME, <<"testid">>), - ?LET(#{ - rate := #{ - a := #{current := CurrA, max := MaxA, last5m := _}, - b := #{current := CurrB, max := MaxB, last5m := _}, - c := #{current := CurrC, max := MaxC, last5m := _} + ?LET( + #{ + rate := #{ + a := #{current := CurrA, max := MaxA, last5m := _}, + b := #{current := CurrB, max := MaxB, last5m := _}, + c := #{current := CurrC, max := MaxC, last5m := _} + }, + counters := #{ + a := 0, + b := 0, + c := 0 + } }, - counters := #{ - a := 0, - b := 0, - c := 0 + emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>), + { + ?assert(CurrA == 0), + ?assert(CurrB == 0), + ?assert(CurrC == 0), + ?assert(MaxA == 0), + ?assert(MaxB == 0), + ?assert(MaxC == 0) } - }, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>), - {?assert(CurrA == 0), ?assert(CurrB == 0), ?assert(CurrC == 0), - ?assert(MaxA == 0), ?assert(MaxB == 0), ?assert(MaxC == 0)}), + ), ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). t_get_metrics_2(_) -> Metrics = [a, b, c], - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics, - [a]), + ok = emqx_plugin_libs_metrics:create_metrics( + ?NAME, + <<"testid">>, + Metrics, + [a] + ), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), - ?assertMatch(#{ - rate := Rate = #{ - a := #{current := _, max := _, last5m := _} - }, - counters := #{ - a := 1, - b := 1, - c := 1 - } - } when map_size(Rate) =:= 1, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)), + ?assertMatch( + #{ + rate := Rate = #{ + a := #{current := _, max := _, last5m := _} + }, + counters := #{ + a := 1, + b := 1, + c := 1 + } + } when map_size(Rate) =:= 1, + emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>) + ), ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). t_recreate_metrics(_) -> ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, [a]), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a), - ?assertMatch(#{ + ?assertMatch( + #{ rate := R = #{ a := #{current := _, max := _, last5m := _} }, @@ -153,12 +185,14 @@ t_recreate_metrics(_) -> a := 1 } } when map_size(R) == 1 andalso map_size(C) == 1, - emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)), + emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>) + ), %% we create the metrics again, to add some counters ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, [a, b, c]), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), - ?assertMatch(#{ + ?assertMatch( + #{ rate := R = #{ a := #{current := _, max := _, last5m := _}, b := #{current := _, max := _, last5m := _}, @@ -168,7 +202,8 @@ t_recreate_metrics(_) -> a := 1, b := 1, c := 1 } } when map_size(R) == 3 andalso map_size(C) == 3, - emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)), + emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>) + ), ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). t_inc_matched(_) -> @@ -192,15 +227,17 @@ t_rate(_) -> ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule:2">>, 'rules.matched'), ?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')), ct:sleep(1000), - ?LET(#{'rules.matched' := #{max := Max, current := Current}}, - emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>), - {?assert(Max =< 2), - ?assert(Current =< 2)}), + ?LET( + #{'rules.matched' := #{max := Max, current := Current}}, + emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>), + {?assert(Max =< 2), ?assert(Current =< 2)} + ), ct:sleep(2100), - ?LET(#{'rules.matched' := #{max := Max, current := Current, last5m := Last5Min}}, emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>), - {?assert(Max =< 2), - ?assert(Current == 0), - ?assert(Last5Min =< 0.67)}), + ?LET( + #{'rules.matched' := #{max := Max, current := Current, last5m := Last5Min}}, + emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>), + {?assert(Max =< 2), ?assert(Current == 0), ?assert(Last5Min =< 0.67)} + ), ct:sleep(3000), ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>), ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule:2">>). diff --git a/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl index 94060b7e3..9f7117324 100644 --- a/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_plugin_libs_rule_SUITE.erl @@ -28,10 +28,12 @@ all() -> emqx_common_test_helpers:all(?MODULE). t_http_connectivity(_) -> {ok, Socket} = gen_tcp:listen(?PORT, []), ok = emqx_plugin_libs_rule:http_connectivity( - "http://127.0.0.1:"++emqx_plugin_libs_rule:str(?PORT), 1000), + "http://127.0.0.1:" ++ emqx_plugin_libs_rule:str(?PORT), 1000 + ), gen_tcp:close(Socket), {error, _} = emqx_plugin_libs_rule:http_connectivity( - "http://127.0.0.1:"++emqx_plugin_libs_rule:str(?PORT), 1000). + "http://127.0.0.1:" ++ emqx_plugin_libs_rule:str(?PORT), 1000 + ). t_tcp_connectivity(_) -> {ok, Socket} = gen_tcp:listen(?PORT, []), @@ -59,7 +61,8 @@ t_bin(_) -> ?assertError(_, emqx_plugin_libs_rule:bin({a, v})). t_atom_key(_) -> - _ = erlang, _ = port, + _ = erlang, + _ = port, ?assertEqual([erlang], emqx_plugin_libs_rule:atom_key([<<"erlang">>])), ?assertEqual([erlang, port], emqx_plugin_libs_rule:atom_key([<<"erlang">>, port])), ?assertEqual([erlang, port], emqx_plugin_libs_rule:atom_key([<<"erlang">>, <<"port">>])), @@ -70,8 +73,12 @@ t_atom_key(_) -> t_unsafe_atom_key(_) -> ?assertEqual([xyz876gv], emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv">>])), - ?assertEqual([xyz876gv33, port], - emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv33">>, port])), - ?assertEqual([xyz876gv331, port1221], - emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv331">>, <<"port1221">>])), + ?assertEqual( + [xyz876gv33, port], + emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv33">>, port]) + ), + ?assertEqual( + [xyz876gv331, port1221], + emqx_plugin_libs_rule:unsafe_atom_key([<<"xyz876gv331">>, <<"port1221">>]) + ), ?assertEqual(xyz876gv3312, emqx_plugin_libs_rule:unsafe_atom_key(<<"xyz876gv3312">>)). diff --git a/apps/emqx_statsd/rebar.config b/apps/emqx_statsd/rebar.config index b6ad169f8..bb9a14272 100644 --- a/apps/emqx_statsd/rebar.config +++ b/apps/emqx_statsd/rebar.config @@ -1,11 +1,14 @@ %% -*- mode: erlang -*- {erl_opts, [debug_info]}. -{deps, [ {emqx, {path, "../emqx"}} - , {estatsd, {git, "https://github.com/emqx/estatsd", {tag, "0.1.0"}}} - ]}. +{deps, [ + {emqx, {path, "../emqx"}}, + {estatsd, {git, "https://github.com/emqx/estatsd", {tag, "0.1.0"}}} +]}. {shell, [ - % {config, "config/sys.config"}, + % {config, "config/sys.config"}, {apps, [emqx_statsd]} ]}. + +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_statsd/src/emqx_statsd.app.src b/apps/emqx_statsd/src/emqx_statsd.app.src index aa1c14ded..9e5829a11 100644 --- a/apps/emqx_statsd/src/emqx_statsd.app.src +++ b/apps/emqx_statsd/src/emqx_statsd.app.src @@ -1,17 +1,17 @@ %% -*- mode: erlang -*- -{application, emqx_statsd, - [{description, "An OTP application"}, - {vsn, "5.0.0"}, - {registered, []}, - {mod, {emqx_statsd_app, []}}, - {applications, - [kernel, - stdlib, - estatsd, - emqx - ]}, - {env,[]}, - {modules, []}, - {licenses, ["Apache 2.0"]}, - {links, []} - ]}. +{application, emqx_statsd, [ + {description, "An OTP application"}, + {vsn, "5.0.0"}, + {registered, []}, + {mod, {emqx_statsd_app, []}}, + {applications, [ + kernel, + stdlib, + estatsd, + emqx + ]}, + {env, []}, + {modules, []}, + {licenses, ["Apache 2.0"]}, + {links, []} +]}. diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 042322bf1..987f7e1bf 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -27,39 +27,45 @@ -include_lib("emqx/include/logger.hrl"). --export([ update/1 - , start/0 - , stop/0 - , restart/0 - %% for rpc - , do_start/0 - , do_stop/0 - , do_restart/0 - ]). +-export([ + update/1, + start/0, + stop/0, + restart/0, + %% for rpc + do_start/0, + do_stop/0, + do_restart/0 +]). %% Interface -export([start_link/1]). %% Internal Exports --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , code_change/3 - , terminate/2 - ]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + terminate/2 +]). -record(state, { - timer :: reference() | undefined, + timer :: reference() | undefined, sample_time_interval :: pos_integer(), - flush_time_interval :: pos_integer(), - estatsd_pid :: pid() + flush_time_interval :: pos_integer(), + estatsd_pid :: pid() }). update(Config) -> - case emqx_conf:update([statsd], - Config, - #{rawconf_with_defaults => true, override_to => cluster}) of + case + emqx_conf:update( + [statsd], + Config, + #{rawconf_with_defaults => true, override_to => cluster} + ) + of {ok, #{raw_config := NewConfigRows}} -> ok = stop(), case maps:get(<<"enable">>, Config, true) of @@ -73,8 +79,8 @@ update(Config) -> {error, Reason} end. -start() -> check_multicall_result(emqx_statsd_proto_v1:start(mria_mnesia:running_nodes())). -stop() -> check_multicall_result(emqx_statsd_proto_v1:stop(mria_mnesia:running_nodes())). +start() -> check_multicall_result(emqx_statsd_proto_v1:start(mria_mnesia:running_nodes())). +stop() -> check_multicall_result(emqx_statsd_proto_v1:stop(mria_mnesia:running_nodes())). restart() -> check_multicall_result(emqx_statsd_proto_v1:restart(mria_mnesia:running_nodes())). do_start() -> @@ -95,17 +101,27 @@ init([Opts]) -> process_flag(trap_exit, true), Tags = tags(maps:get(tags, Opts, #{})), {Host, Port} = maps:get(server, Opts, {?DEFAULT_HOST, ?DEFAULT_PORT}), - Opts1 = maps:without([sample_time_interval, - flush_time_interval], Opts#{tags => Tags, - host => Host, - port => Port, - prefix => <<"emqx">>}), + Opts1 = maps:without( + [ + sample_time_interval, + flush_time_interval + ], + Opts#{ + tags => Tags, + host => Host, + port => Port, + prefix => <<"emqx">> + } + ), {ok, Pid} = estatsd:start_link(maps:to_list(Opts1)), SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL), FlushTimeInterval = maps:get(flush_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL), - {ok, ensure_timer(#state{sample_time_interval = SampleTimeInterval, - flush_time_interval = FlushTimeInterval, - estatsd_pid = Pid})}. + {ok, + ensure_timer(#state{ + sample_time_interval = SampleTimeInterval, + flush_time_interval = FlushTimeInterval, + estatsd_pid = Pid + })}. handle_call(_Req, _From, State) -> {noreply, State}. @@ -113,20 +129,25 @@ handle_call(_Req, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({timeout, Ref, sample_timeout}, - State = #state{sample_time_interval = SampleTimeInterval, - flush_time_interval = FlushTimeInterval, - estatsd_pid = Pid, - timer = Ref}) -> +handle_info( + {timeout, Ref, sample_timeout}, + State = #state{ + sample_time_interval = SampleTimeInterval, + flush_time_interval = FlushTimeInterval, + estatsd_pid = Pid, + timer = Ref + } +) -> Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_vm_data(), SampleRate = SampleTimeInterval / FlushTimeInterval, - StatsdMetrics = [{gauge, trans_metrics_name(Name), Value, SampleRate, []} || {Name, Value} <- Metrics], + StatsdMetrics = [ + {gauge, trans_metrics_name(Name), Value, SampleRate, []} + || {Name, Value} <- Metrics + ], estatsd:submit(Pid, StatsdMetrics), {noreply, ensure_timer(State)}; - handle_info({'EXIT', Pid, Error}, State = #state{estatsd_pid = Pid}) -> {stop, {shutdown, Error}, State}; - handle_info(_Msg, State) -> {noreply, State}. @@ -145,26 +166,37 @@ trans_metrics_name(Name) -> binary_to_atom(<<"emqx.", Name0/binary>>, utf8). emqx_vm_data() -> - Idle = case cpu_sup:util([detailed]) of - {_, 0, 0, _} -> 0; %% Not support for Windows - {_Num, _Use, IdleList, _} -> proplists:get_value(idle, IdleList, 0) - end, + Idle = + case cpu_sup:util([detailed]) of + %% Not support for Windows + {_, 0, 0, _} -> 0; + {_Num, _Use, IdleList, _} -> proplists:get_value(idle, IdleList, 0) + end, RunQueue = erlang:statistics(run_queue), - [{run_queue, RunQueue}, - {cpu_idle, Idle}, - {cpu_use, 100 - Idle}] ++ emqx_vm:mem_info(). + [ + {run_queue, RunQueue}, + {cpu_idle, Idle}, + {cpu_use, 100 - Idle} + ] ++ emqx_vm:mem_info(). tags(Map) -> Tags = maps:to_list(Map), [{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags]. - -ensure_timer(State =#state{sample_time_interval = SampleTimeInterval}) -> +ensure_timer(State = #state{sample_time_interval = SampleTimeInterval}) -> State#state{timer = emqx_misc:start_timer(SampleTimeInterval, sample_timeout)}. check_multicall_result({Results, []}) -> - case lists:all(fun(ok) -> true; (_) -> false end, Results) of - true -> ok; + case + lists:all( + fun + (ok) -> true; + (_) -> false + end, + Results + ) + of + true -> ok; false -> error({bad_result, Results}) end; check_multicall_result({_, _}) -> diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl index 3b0142bd0..716147eca 100644 --- a/apps/emqx_statsd/src/emqx_statsd_api.erl +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -26,17 +26,17 @@ -export([statsd/2]). --export([ api_spec/0 - , paths/0 - , schema/1 - ]). +-export([ + api_spec/0, + paths/0, + schema/1 +]). -define(API_TAG_STATSD, [<<"statsd">>]). -define(SCHEMA_MODULE, emqx_statsd_schema). -define(INTERNAL_ERROR, 'INTERNAL_ERROR'). - api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). @@ -44,21 +44,24 @@ paths() -> ["/statsd"]. schema("/statsd") -> - #{ 'operationId' => statsd - , get => - #{ description => <<"Get statsd config">> - , tags => ?API_TAG_STATSD - , responses => - #{200 => statsd_config_schema()} + #{ + 'operationId' => statsd, + get => + #{ + description => <<"Get statsd config">>, + tags => ?API_TAG_STATSD, + responses => + #{200 => statsd_config_schema()} + }, + put => + #{ + description => <<"Set statsd config">>, + tags => ?API_TAG_STATSD, + 'requestBody' => statsd_config_schema(), + responses => + #{200 => statsd_config_schema()} } - , put => - #{ description => <<"Set statsd config">> - , tags => ?API_TAG_STATSD - , 'requestBody' => statsd_config_schema() - , responses => - #{200 => statsd_config_schema()} - } - }. + }. %%-------------------------------------------------------------------- %% Helper funcs @@ -66,19 +69,20 @@ schema("/statsd") -> statsd_config_schema() -> emqx_dashboard_swagger:schema_with_example( - ref(?SCHEMA_MODULE, "statsd"), - statsd_example()). + ref(?SCHEMA_MODULE, "statsd"), + statsd_example() + ). statsd_example() -> - #{ enable => true - , flush_time_interval => "32s" - , sample_time_interval => "32s" - , server => "127.0.0.1:8125" - }. + #{ + enable => true, + flush_time_interval => "32s", + sample_time_interval => "32s", + server => "127.0.0.1:8125" + }. statsd(get, _Params) -> {200, emqx:get_raw_config([<<"statsd">>], #{})}; - statsd(put, #{body := Body}) -> case emqx_statsd:update(Body) of {ok, NewConfig} -> diff --git a/apps/emqx_statsd/src/emqx_statsd_app.erl b/apps/emqx_statsd/src/emqx_statsd_app.erl index 16cdb2b80..4b34006ac 100644 --- a/apps/emqx_statsd/src/emqx_statsd_app.erl +++ b/apps/emqx_statsd/src/emqx_statsd_app.erl @@ -1,28 +1,29 @@ %%-------------------------------------------------------------------- - %% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. - %% - %% Licensed under the Apache License, Version 2.0 (the "License"); - %% you may not use this file except in compliance with the License. - %% You may obtain a copy of the License at - %% - %% http://www.apache.org/licenses/LICENSE-2.0 - %% - %% Unless required by applicable law or agreed to in writing, software - %% distributed under the License is distributed on an "AS IS" BASIS, - %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - %% See the License for the specific language governing permissions and - %% limitations under the License. - %%-------------------------------------------------------------------- +%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- - -module(emqx_statsd_app). +-module(emqx_statsd_app). - -behaviour(application). +-behaviour(application). -include("emqx_statsd.hrl"). - -export([ start/2 - , stop/1 - ]). +-export([ + start/2, + stop/1 +]). start(_StartType, _StartArgs) -> {ok, Sup} = emqx_statsd_sup:start_link(), diff --git a/apps/emqx_statsd/src/emqx_statsd_schema.erl b/apps/emqx_statsd/src/emqx_statsd_schema.erl index df98e4df1..e8cc32f99 100644 --- a/apps/emqx_statsd/src/emqx_statsd_schema.erl +++ b/apps/emqx_statsd/src/emqx_statsd_schema.erl @@ -23,10 +23,12 @@ -export([to_ip_port/1]). --export([ namespace/0 - , roots/0 - , fields/1 - , desc/1]). +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). -typerefl_from_string({ip_port/0, emqx_statsd_schema, to_ip_port}). @@ -35,19 +37,23 @@ namespace() -> "statsd". roots() -> ["statsd"]. fields("statsd") -> - [ {enable, hoconsc:mk(boolean(), - #{ default => false - , required => true - , desc => ?DESC(enable) - })} - , {server, fun server/1} - , {sample_time_interval, fun sample_interval/1} - , {flush_time_interval, fun flush_interval/1} + [ + {enable, + hoconsc:mk( + boolean(), + #{ + default => false, + required => true, + desc => ?DESC(enable) + } + )}, + {server, fun server/1}, + {sample_time_interval, fun sample_interval/1}, + {flush_time_interval, fun flush_interval/1} ]. desc("statsd") -> ?DESC(statsd); -desc(_) -> - undefined. +desc(_) -> undefined. server(type) -> emqx_schema:ip_port(); server(required) -> true; @@ -68,11 +74,12 @@ flush_interval(desc) -> ?DESC(?FUNCTION_NAME); flush_interval(_) -> undefined. to_ip_port(Str) -> - case string:tokens(Str, ":") of - [Ip, Port] -> - case inet:parse_address(Ip) of - {ok, R} -> {ok, {R, list_to_integer(Port)}}; - _ -> {error, Str} - end; - _ -> {error, Str} - end. + case string:tokens(Str, ":") of + [Ip, Port] -> + case inet:parse_address(Ip) of + {ok, R} -> {ok, {R, list_to_integer(Port)}}; + _ -> {error, Str} + end; + _ -> + {error, Str} + end. diff --git a/apps/emqx_statsd/src/emqx_statsd_sup.erl b/apps/emqx_statsd/src/emqx_statsd_sup.erl index 983cf97d4..851dbf8cc 100644 --- a/apps/emqx_statsd/src/emqx_statsd_sup.erl +++ b/apps/emqx_statsd/src/emqx_statsd_sup.erl @@ -7,21 +7,24 @@ -behaviour(supervisor). --export([ start_link/0 - , ensure_child_started/1 - , ensure_child_started/2 - , ensure_child_stopped/1 - ]). +-export([ + start_link/0, + ensure_child_started/1, + ensure_child_started/2, + ensure_child_stopped/1 +]). -export([init/1]). %% Helper macro for declaring children of supervisor --define(CHILD(Mod, Opts), #{id => Mod, - start => {Mod, start_link, [Opts]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [Mod]}). +-define(CHILD(Mod, Opts), #{ + id => Mod, + start => {Mod, start_link, [Opts]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [Mod] +}). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). diff --git a/apps/emqx_statsd/src/proto/emqx_statsd_proto_v1.erl b/apps/emqx_statsd/src/proto/emqx_statsd_proto_v1.erl index d3626b495..2e01f7d5e 100644 --- a/apps/emqx_statsd/src/proto/emqx_statsd_proto_v1.erl +++ b/apps/emqx_statsd/src/proto/emqx_statsd_proto_v1.erl @@ -18,12 +18,13 @@ -behaviour(emqx_bpapi). --export([ introduced_in/0 +-export([ + introduced_in/0, - , start/1 - , stop/1 - , restart/1 - ]). + start/1, + stop/1, + restart/1 +]). -include_lib("emqx/include/bpapi.hrl"). diff --git a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl index b0124e579..08c78dd07 100644 --- a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl +++ b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl @@ -21,9 +21,8 @@ t_statsd(_) -> receive {udp, _Socket, _Host, _Port, Bin} -> ?assert(length(Bin) > 50) - after - 11*1000 -> - ?assert(true, failed) + after 11 * 1000 -> + ?assert(true, failed) end, gen_udp:close(Socket). diff --git a/scripts/check-format.sh b/scripts/check-format.sh index 8289c27f5..7fa7ca7f0 100755 --- a/scripts/check-format.sh +++ b/scripts/check-format.sh @@ -15,6 +15,7 @@ APPS+=( 'apps/emqx_exhook') APPS+=( 'apps/emqx_retainer' 'apps/emqx_slow_subs') APPS+=( 'apps/emqx_management') APPS+=( 'apps/emqx_psk') +APPS+=( 'apps/emqx_plugin_libs' 'apps/emqx_machine' 'apps/emqx_statsd' ) for app in "${APPS[@]}"; do echo "$app ..."