From 4e6532266757ee567d02e4ac41f903e6d487ffc2 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 28 Apr 2022 21:43:34 +0200 Subject: [PATCH 1/4] refactor: move emqx_plugin_libs_metrics to emqx app because it can not depend on other apps --- apps/emqx/src/emqx_authentication.erl | 14 +-- .../emqx/src/emqx_authn_authz_metrics_sup.erl | 4 +- .../src/emqx_metrics_worker.erl} | 18 ++- .../test/emqx_metrics_worker_SUITE.erl} | 115 +++++++++--------- apps/emqx_authn/src/emqx_authn_api.erl | 2 +- .../src/proto/emqx_plugin_libs_proto_v1.erl | 8 +- apps/emqx_resource/include/emqx_resource.hrl | 2 +- apps/emqx_resource/src/emqx_resource.erl | 8 +- .../src/emqx_resource_instance.erl | 8 +- apps/emqx_resource/src/emqx_resource_sup.erl | 2 +- .../emqx_rule_engine/src/emqx_rule_engine.erl | 8 +- .../src/emqx_rule_engine_sup.erl | 2 +- .../src/emqx_rule_runtime.erl | 34 +++--- 13 files changed, 115 insertions(+), 110 deletions(-) rename apps/{emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl => emqx/src/emqx_metrics_worker.erl} (96%) rename apps/{emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl => emqx/test/emqx_metrics_worker_SUITE.erl} (55%) diff --git a/apps/emqx/src/emqx_authentication.erl b/apps/emqx/src/emqx_authentication.erl index 3c1305f76..745eaa715 100644 --- a/apps/emqx/src/emqx_authentication.erl +++ b/apps/emqx/src/emqx_authentication.erl @@ -582,7 +582,7 @@ handle_delete_authenticator(Chain, AuthenticatorID) -> [] -> {error, {not_found, {authenticator, AuthenticatorID}}}; [AuthenticatorID] -> - emqx_plugin_libs_metrics:clear_metrics(authn_metrics, AuthenticatorID), + emqx_metrics_worker:clear_metrics(authn_metrics, AuthenticatorID), ok end. @@ -613,7 +613,7 @@ handle_create_authenticator(Chain, Config, Providers) -> Chain#chain{authenticators = NAuthenticators} ), - ok = emqx_plugin_libs_metrics:create_metrics( + ok = emqx_metrics_worker:create_metrics( authn_metrics, AuthenticatorID, [matched, success, failed, ignore], @@ -628,10 +628,10 @@ handle_create_authenticator(Chain, Config, Providers) -> do_authenticate([], _) -> {stop, {error, not_authorized}}; do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | More], Credential) -> - emqx_plugin_libs_metrics:inc(authn_metrics, ID, matched), + emqx_metrics_worker:inc(authn_metrics, ID, matched), try Provider:authenticate(Credential, State) of ignore -> - ok = emqx_plugin_libs_metrics:inc(authn_metrics, ID, ignore), + ok = emqx_metrics_worker:inc(authn_metrics, ID, ignore), do_authenticate(More, Credential); Result -> %% {ok, Extra} @@ -641,9 +641,9 @@ do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | M %% {error, Reason} case Result of {ok, _} -> - emqx_plugin_libs_metrics:inc(authn_metrics, ID, success); + emqx_metrics_worker:inc(authn_metrics, ID, success); {error, _} -> - emqx_plugin_libs_metrics:inc(authn_metrics, ID, failed); + emqx_metrics_worker:inc(authn_metrics, ID, failed); _ -> ok end, @@ -657,7 +657,7 @@ do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | M stacktrace => Stacktrace, authenticator => ID }), - emqx_plugin_libs_metrics:inc(authn_metrics, ID, ignore), + emqx_metrics_worker:inc(authn_metrics, ID, ignore), do_authenticate(More, Credential) end. diff --git a/apps/emqx/src/emqx_authn_authz_metrics_sup.erl b/apps/emqx/src/emqx_authn_authz_metrics_sup.erl index ccffc604b..1c4bf06d5 100644 --- a/apps/emqx/src/emqx_authn_authz_metrics_sup.erl +++ b/apps/emqx/src/emqx_authn_authz_metrics_sup.erl @@ -26,8 +26,8 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - AuthnMetrics = emqx_plugin_libs_metrics:child_spec(emqx_authn_metrics, authn_metrics), - AuthzMetrics = emqx_plugin_libs_metrics:child_spec(eqmx_authz_metrics, authz_metrics), + AuthnMetrics = emqx_metrics_worker:child_spec(emqx_authn_metrics, authn_metrics), + AuthzMetrics = emqx_metrics_worker:child_spec(eqmx_authz_metrics, authz_metrics), {ok, { {one_for_one, 10, 100}, diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl b/apps/emqx/src/emqx_metrics_worker.erl similarity index 96% rename from apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl rename to apps/emqx/src/emqx_metrics_worker.erl index aea4d0d5d..575dcca6c 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl +++ b/apps/emqx/src/emqx_metrics_worker.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_plugin_libs_metrics). +-module(emqx_metrics_worker). -behaviour(gen_server). @@ -100,16 +100,16 @@ -spec child_spec(handler_name()) -> supervisor:child_spec(). child_spec(Name) -> - child_spec(emqx_plugin_libs_metrics, Name). + child_spec(emqx_metrics_worker, Name). child_spec(ChldName, Name) -> #{ id => ChldName, - start => {emqx_plugin_libs_metrics, start_link, [Name]}, + start => {emqx_metrics_worker, start_link, [Name]}, restart => permanent, shutdown => 5000, type => worker, - modules => [emqx_plugin_libs_metrics] + modules => [emqx_metrics_worker] }. -spec create_metrics(handler_name(), metric_id(), [atom()]) -> ok | {error, term()}. @@ -284,7 +284,15 @@ terminate(_Reason, #state{metric_ids = MIDs}) -> persistent_term:erase(?CntrRef(Name)). stop(Name) -> - gen_server:stop(Name). + try + gen_server:stop(Name) + catch + exit:noproc -> + ok; + exit:timeout -> + %% after timeout, the process killed by gen.erl + ok + end. %%------------------------------------------------------------------------------ %% Internal Functions diff --git a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl b/apps/emqx/test/emqx_metrics_worker_SUITE.erl similarity index 55% rename from apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl rename to apps/emqx/test/emqx_metrics_worker_SUITE.erl index cd6e1b0b2..326b0be1e 100644 --- a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl +++ b/apps/emqx/test/emqx_metrics_worker_SUITE.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_plugin_libs_metrics_SUITE). +-module(emqx_metrics_worker_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -31,18 +31,15 @@ suite() -> -define(NAME, ?MODULE). init_per_suite(Config) -> - emqx_common_test_helpers:start_apps([emqx_conf]), - {ok, _} = emqx_plugin_libs_metrics:start_link(?NAME), + {ok, _} = emqx_metrics_worker:start_link(?NAME), Config. end_per_suite(_Config) -> - catch emqx_plugin_libs_metrics:stop(?NAME), - emqx_common_test_helpers:stop_apps([emqx_conf]), - ok. + ok = emqx_metrics_worker:stop(?NAME). init_per_testcase(_, Config) -> - catch emqx_plugin_libs_metrics:stop(?NAME), - {ok, _} = emqx_plugin_libs_metrics:start_link(?NAME), + ok = emqx_metrics_worker:stop(?NAME), + {ok, _} = emqx_metrics_worker:start_link(?NAME), Config. end_per_testcase(_, _Config) -> @@ -50,7 +47,7 @@ end_per_testcase(_, _Config) -> t_get_metrics(_) -> Metrics = [a, b, c], - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics), + ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, Metrics), %% all the metrics are set to zero at start ?assertMatch( #{ @@ -65,12 +62,12 @@ t_get_metrics(_) -> c := 0 } }, - emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) ), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), + ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), + ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), ct:sleep(1500), ?LET( #{ @@ -85,7 +82,7 @@ t_get_metrics(_) -> c := 2 } }, - emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>), + emqx_metrics_worker:get_metrics(?NAME, <<"testid">>), { ?assert(CurrA > 0), ?assert(CurrB > 0), @@ -95,11 +92,11 @@ t_get_metrics(_) -> ?assert(MaxC > 0) } ), - ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). t_reset_metrics(_) -> Metrics = [a, b, c], - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics), + ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, Metrics), %% all the metrics are set to zero at start ?assertMatch( #{ @@ -114,14 +111,14 @@ t_reset_metrics(_) -> c := 0 } }, - emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) ), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), + ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), + ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), ct:sleep(1500), - ok = emqx_plugin_libs_metrics:reset_metrics(?NAME, <<"testid">>), + ok = emqx_metrics_worker:reset_metrics(?NAME, <<"testid">>), ?LET( #{ rate := #{ @@ -135,7 +132,7 @@ t_reset_metrics(_) -> c := 0 } }, - emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>), + emqx_metrics_worker:get_metrics(?NAME, <<"testid">>), { ?assert(CurrA == 0), ?assert(CurrB == 0), @@ -145,19 +142,19 @@ t_reset_metrics(_) -> ?assert(MaxC == 0) } ), - ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). t_get_metrics_2(_) -> Metrics = [a, b, c], - ok = emqx_plugin_libs_metrics:create_metrics( + ok = emqx_metrics_worker:create_metrics( ?NAME, <<"testid">>, Metrics, [a] ), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), + ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), + ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), ?assertMatch( #{ rate := Rate = #{ @@ -169,13 +166,13 @@ t_get_metrics_2(_) -> c := 1 } } when map_size(Rate) =:= 1, - emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) ), - ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). t_recreate_metrics(_) -> - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, [a]), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a), + ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, [a]), + ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a), ?assertMatch( #{ rate := R = #{ @@ -185,12 +182,12 @@ t_recreate_metrics(_) -> a := 1 } } when map_size(R) == 1 andalso map_size(C) == 1, - emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) ), %% we create the metrics again, to add some counters - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, [a, b, c]), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), + ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, [a, b, c]), + ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b), + ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c), ?assertMatch( #{ rate := R = #{ @@ -202,42 +199,42 @@ t_recreate_metrics(_) -> a := 1, b := 1, c := 1 } } when map_size(R) == 3 andalso map_size(C) == 3, - emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>) + emqx_metrics_worker:get_metrics(?NAME, <<"testid">>) ), - ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>). t_inc_matched(_) -> Metrics = ['rules.matched'], - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>, Metrics), - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule2">>, Metrics), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'), - ?assertEqual(1, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')), - ?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule2">>, 'rules.matched')), - ?assertEqual(0, emqx_plugin_libs_metrics:get(?NAME, <<"rule3">>, 'rules.matched')), - ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>), - ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule2">>). + ok = emqx_metrics_worker:create_metrics(?NAME, <<"rule1">>, Metrics), + ok = emqx_metrics_worker:create_metrics(?NAME, <<"rule2">>, Metrics), + ok = emqx_metrics_worker:inc(?NAME, <<"rule1">>, 'rules.matched'), + ok = emqx_metrics_worker:inc(?NAME, <<"rule2">>, 'rules.matched'), + ok = emqx_metrics_worker:inc(?NAME, <<"rule2">>, 'rules.matched'), + ?assertEqual(1, emqx_metrics_worker:get(?NAME, <<"rule1">>, 'rules.matched')), + ?assertEqual(2, emqx_metrics_worker:get(?NAME, <<"rule2">>, 'rules.matched')), + ?assertEqual(0, emqx_metrics_worker:get(?NAME, <<"rule3">>, 'rules.matched')), + ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule1">>), + ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule2">>). t_rate(_) -> - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>, ['rules.matched']), - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>, ['rules.matched']), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'), - ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule:2">>, 'rules.matched'), - ?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')), + ok = emqx_metrics_worker:create_metrics(?NAME, <<"rule1">>, ['rules.matched']), + ok = emqx_metrics_worker:create_metrics(?NAME, <<"rule:2">>, ['rules.matched']), + ok = emqx_metrics_worker:inc(?NAME, <<"rule1">>, 'rules.matched'), + ok = emqx_metrics_worker:inc(?NAME, <<"rule1">>, 'rules.matched'), + ok = emqx_metrics_worker:inc(?NAME, <<"rule:2">>, 'rules.matched'), + ?assertEqual(2, emqx_metrics_worker:get(?NAME, <<"rule1">>, 'rules.matched')), ct:sleep(1000), ?LET( #{'rules.matched' := #{max := Max, current := Current}}, - emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>), + emqx_metrics_worker:get_rate(?NAME, <<"rule1">>), {?assert(Max =< 2), ?assert(Current =< 2)} ), ct:sleep(2100), ?LET( #{'rules.matched' := #{max := Max, current := Current, last5m := Last5Min}}, - emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>), + emqx_metrics_worker:get_rate(?NAME, <<"rule1">>), {?assert(Max =< 2), ?assert(Current == 0), ?assert(Last5Min =< 0.67)} ), ct:sleep(3000), - ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>), - ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule:2">>). + ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule1">>), + ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule:2">>). diff --git a/apps/emqx_authn/src/emqx_authn_api.erl b/apps/emqx_authn/src/emqx_authn_api.erl index ea81fd6cd..96f0093b2 100644 --- a/apps/emqx_authn/src/emqx_authn_api.erl +++ b/apps/emqx_authn/src/emqx_authn_api.erl @@ -943,7 +943,7 @@ lookup_from_local_node(ChainName, AuthenticatorID) -> NodeId = node(self()), case emqx_authentication:lookup_authenticator(ChainName, AuthenticatorID) of {ok, #{provider := Provider, state := State}} -> - Metrics = emqx_plugin_libs_metrics:get_metrics(authn_metrics, AuthenticatorID), + Metrics = emqx_metrics_worker:get_metrics(authn_metrics, AuthenticatorID), case lists:member(Provider, resource_provider()) of false -> {ok, {NodeId, connected, Metrics, #{}}}; diff --git a/apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl b/apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl index fa7d9416d..b8936a94f 100644 --- a/apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl +++ b/apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl @@ -31,8 +31,8 @@ introduced_in() -> -spec get_metrics( node(), - emqx_plugin_libs_metrics:handler_name(), - emqx_plugin_libs_metrics:metric_id() -) -> emqx_plugin_libs_metrics:metrics() | {badrpc, _}. + emqx_metrics_worker:handler_name(), + emqx_metrics_worker:metric_id() +) -> emqx_metrics_worker:metrics() | {badrpc, _}. get_metrics(Node, HandlerName, MetricId) -> - rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [HandlerName, MetricId]). + rpc:call(Node, emqx_metrics_worker, get_metrics, [HandlerName, MetricId]). diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 41cbc872e..ca8b661d4 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -27,7 +27,7 @@ config := resource_config(), state := resource_state(), status := resource_connection_status(), - metrics := emqx_plugin_libs_metrics:metrics() + metrics := emqx_metrics_worker:metrics() }. -type resource_group() :: binary(). -type create_opts() :: #{ diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 12e02ce0a..ba03494fa 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -250,12 +250,12 @@ query(InstId, Request, AfterQuery) -> {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} -> %% the resource state is readonly to Module:on_query/4 %% and the `after_query()` functions should be thread safe - ok = emqx_plugin_libs_metrics:inc(resource_metrics, InstId, matched), + ok = emqx_metrics_worker:inc(resource_metrics, InstId, matched), try Mod:on_query(InstId, Request, AfterQuery, ResourceState) catch Err:Reason:ST -> - emqx_plugin_libs_metrics:inc(resource_metrics, InstId, exception), + emqx_metrics_worker:inc(resource_metrics, InstId, exception), erlang:raise(Err, Reason, ST) end; {error, not_found} -> @@ -418,8 +418,8 @@ filter_instances(Filter) -> [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)]. inc_metrics_funcs(InstId) -> - OnFailed = [{fun emqx_plugin_libs_metrics:inc/3, [resource_metrics, InstId, failed]}], - OnSucc = [{fun emqx_plugin_libs_metrics:inc/3, [resource_metrics, InstId, success]}], + OnFailed = [{fun emqx_metrics_worker:inc/3, [resource_metrics, InstId, failed]}], + OnSucc = [{fun emqx_metrics_worker:inc/3, [resource_metrics, InstId, success]}], {OnSucc, OnFailed}. call_instance(InstId, Query) -> diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 04592159c..a52c6d229 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -82,10 +82,10 @@ make_test_id() -> <>. get_metrics(InstId) -> - emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId). + emqx_metrics_worker:get_metrics(resource_metrics, InstId). reset_metrics(InstId) -> - emqx_plugin_libs_metrics:reset_metrics(resource_metrics, InstId). + emqx_metrics_worker:reset_metrics(resource_metrics, InstId). force_lookup(InstId) -> {ok, _Group, Data} = lookup(InstId), @@ -200,7 +200,7 @@ do_create(InstId, Group, ResourceType, Config, Opts) -> {ok, already_created}; {error, not_found} -> ok = do_start(InstId, Group, ResourceType, Config, Opts), - ok = emqx_plugin_libs_metrics:create_metrics( + ok = emqx_metrics_worker:create_metrics( resource_metrics, InstId, [matched, success, failed, exception], @@ -243,7 +243,7 @@ do_remove(Group, #{id := InstId} = Data, ClearMetrics) -> _ = do_stop(Group, Data), ets:delete(emqx_resource_instance, InstId), case ClearMetrics of - true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId); + true -> ok = emqx_metrics_worker:clear_metrics(resource_metrics, InstId); false -> ok end, ok. diff --git a/apps/emqx_resource/src/emqx_resource_sup.erl b/apps/emqx_resource/src/emqx_resource_sup.erl index 770ca1fed..3f35c2399 100644 --- a/apps/emqx_resource/src/emqx_resource_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_sup.erl @@ -33,7 +33,7 @@ init([]) -> _ = ets:new(emqx_resource_instance, TabOpts), SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, - Metrics = emqx_plugin_libs_metrics:child_spec(resource_metrics), + Metrics = emqx_metrics_worker:child_spec(resource_metrics), Pool = ?RESOURCE_INST_MOD, Mod = ?RESOURCE_INST_MOD, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index b08964646..95dac3807 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -214,19 +214,19 @@ load_hooks_for_rule(#{from := Topics}) -> lists:foreach(fun emqx_rule_events:load/1, Topics). maybe_add_metrics_for_rule(Id) -> - case emqx_plugin_libs_metrics:has_metrics(rule_metrics, Id) of + case emqx_metrics_worker:has_metrics(rule_metrics, Id) of true -> ok; false -> - ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, Id, ?METRICS, ?RATE_METRICS) + ok = emqx_metrics_worker:create_metrics(rule_metrics, Id, ?METRICS, ?RATE_METRICS) end. clear_metrics_for_rule(Id) -> - ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, Id). + ok = emqx_metrics_worker:clear_metrics(rule_metrics, Id). -spec reset_metrics_for_rule(rule_id()) -> ok. reset_metrics_for_rule(Id) -> - emqx_plugin_libs_metrics:reset_metrics(rule_metrics, Id). + emqx_metrics_worker:reset_metrics(rule_metrics, Id). unload_hooks_for_rule(#{id := Id, from := Topics}) -> lists:foreach( diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl index 621766945..4818727b4 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl @@ -36,5 +36,5 @@ init([]) -> type => worker, modules => [emqx_rule_engine] }, - Metrics = emqx_plugin_libs_metrics:child_spec(rule_metrics), + Metrics = emqx_metrics_worker:child_spec(rule_metrics), {ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 81cabd40b..fc2401156 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -64,14 +64,14 @@ apply_rule_discard_result(Rule, Input) -> ok. apply_rule(Rule = #{id := RuleID}, Input) -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.matched'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'sql.matched'), clear_rule_payload(), try do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})) catch %% ignore the errors if select or match failed _:Reason = {select_and_transform_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'sql.failed.exception'), ?SLOG(warning, #{ msg => "SELECT_clause_exception", rule_id => RuleID, @@ -79,7 +79,7 @@ apply_rule(Rule = #{id := RuleID}, Input) -> }), {error, Reason}; _:Reason = {match_conditions_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'sql.failed.exception'), ?SLOG(warning, #{ msg => "WHERE_clause_exception", rule_id => RuleID, @@ -87,7 +87,7 @@ apply_rule(Rule = #{id := RuleID}, Input) -> }), {error, Reason}; _:Reason = {select_and_collect_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'sql.failed.exception'), ?SLOG(warning, #{ msg => "FOREACH_clause_exception", rule_id => RuleID, @@ -95,7 +95,7 @@ apply_rule(Rule = #{id := RuleID}, Input) -> }), {error, Reason}; _:Reason = {match_incase_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'sql.failed.exception'), ?SLOG(warning, #{ msg => "INCASE_clause_exception", rule_id => RuleID, @@ -103,7 +103,7 @@ apply_rule(Rule = #{id := RuleID}, Input) -> }), {error, Reason}; Class:Error:StkTrace -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'sql.failed.exception'), ?SLOG(error, #{ msg => "apply_rule_failed", rule_id => RuleID, @@ -141,13 +141,13 @@ do_apply_rule( Collection2 = filter_collection(Input, InCase, DoEach, Collection), case Collection2 of [] -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result'); + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'sql.failed.no_result'); _ -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.passed') + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'sql.passed') end, {ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]}; false -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'sql.failed.no_result'), {error, nomatch} end; do_apply_rule( @@ -171,10 +171,10 @@ do_apply_rule( ) of true -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.passed'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'sql.passed'), {ok, handle_output_list(RuleId, Outputs, Selected, Input)}; false -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'sql.failed.no_result'), {error, nomatch} end. @@ -316,21 +316,21 @@ handle_output_list(RuleId, Outputs, Selected, Envs) -> [handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs]. handle_output(RuleId, OutId, Selected, Envs) -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.total'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.total'), try Result = do_handle_output(OutId, Selected, Envs), - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.success'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.success'), Result catch throw:out_of_service -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'), - ok = emqx_plugin_libs_metrics:inc( + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.failed'), + ok = emqx_metrics_worker:inc( rule_metrics, RuleId, 'outputs.failed.out_of_service' ), ?SLOG(warning, #{msg => "out_of_service", output => OutId}); Err:Reason:ST -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'), - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.unknown'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.failed'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.failed.unknown'), ?SLOG(error, #{ msg => "output_failed", output => OutId, From 1e170da9e88565f369cdb06878a090c16d3ac417 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 28 Apr 2022 13:36:46 +0200 Subject: [PATCH 2/4] test: ensure emqx_metrics gen_server stopped after each test --- apps/emqx/src/emqx_metrics.erl | 9 ++++++++- apps/emqx/test/emqx_metrics_SUITE.erl | 15 ++++++++++++--- scripts/git-hook-pre-commit.sh | 11 +++++++---- 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_metrics.erl b/apps/emqx/src/emqx_metrics.erl index a5d63cb09..f7d3a5dc9 100644 --- a/apps/emqx/src/emqx_metrics.erl +++ b/apps/emqx/src/emqx_metrics.erl @@ -279,7 +279,14 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -spec stop() -> ok. -stop() -> gen_server:stop(?SERVER). +stop() -> + try + gen_server:stop(?SERVER) + catch + exit:R when R =:= noproc orelse R =:= timeout -> + %% pid is killed after timeout + ok + end. %% BACKW: v4.3.0 upgrade_retained_delayed_counter_type() -> diff --git a/apps/emqx/test/emqx_metrics_SUITE.erl b/apps/emqx/test/emqx_metrics_SUITE.erl index ababd359d..10bbeda27 100644 --- a/apps/emqx/test/emqx_metrics_SUITE.erl +++ b/apps/emqx/test/emqx_metrics_SUITE.erl @@ -166,7 +166,16 @@ t_trans(_) -> ). with_metrics_server(Fun) -> - _ = supervisor:terminate_child(emqx_kernel_sup, emqx_metrics), + try + supervisor:terminate_child(emqx_kernel_sup, emqx_metrics) + catch + exit:_ -> + ok + end, {ok, _} = emqx_metrics:start_link(), - _ = Fun(), - ok = emqx_metrics:stop(). + try + _ = Fun(), + ok + after + ok = emqx_metrics:stop() + end. diff --git a/scripts/git-hook-pre-commit.sh b/scripts/git-hook-pre-commit.sh index 7b9e06385..dc4e13e93 100755 --- a/scripts/git-hook-pre-commit.sh +++ b/scripts/git-hook-pre-commit.sh @@ -2,10 +2,13 @@ set -euo pipefail -files="$(git diff --cached --name-only | grep -E '.*\.erl' || true)" -if [[ "${files}" == '' ]]; then +OPT="${1:--c}" + +files_dirty="$(git diff --name-only | grep -E '.*\.erl' || true)" +files_cached="$(git diff --cached --name-only | grep -E '.*\.erl' || true)" +if [[ "${files_dirty}" == '' ]] && [[ "${files_cached}" == '' ]]; then exit 0 fi -files="$(echo -e "$files" | xargs)" +files="$(echo -e "${files_dirty} \n ${files_cached}" | xargs)" # shellcheck disable=SC2086 -./scripts/erlfmt -c $files +./scripts/erlfmt $OPT $files From 4a6dabbe57ad9edb7dfa33ee19baf8b9b2a1372c Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Fri, 29 Apr 2022 11:44:15 +0800 Subject: [PATCH 3/4] fix: rename to emqx_metrics_worker --- apps/emqx_authz/src/emqx_authz.erl | 14 +++++++------- apps/emqx_authz/src/emqx_authz_api_sources.erl | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index a493e339d..4e498c237 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -173,7 +173,7 @@ do_post_config_update({?CMD_PREPEND, RawNewSource}, Sources) -> InitedNewSource = init_source(get_source_by_type(type(RawNewSource), Sources)), %% create metrics TypeName = type(RawNewSource), - ok = emqx_plugin_libs_metrics:create_metrics( + ok = emqx_metrics_worker:create_metrics( authz_metrics, TypeName, [matched, allow, deny, ignore], @@ -194,7 +194,7 @@ do_post_config_update({{?CMD_DELETE, Type}, _RawNewSource}, _Sources) -> OldInitedSources = lookup(), {OldSource, Front, Rear} = take(Type, OldInitedSources), %% delete metrics - ok = emqx_plugin_libs_metrics:clear_metrics(authz_metrics, Type), + ok = emqx_metrics_worker:clear_metrics(authz_metrics, Type), ok = ensure_resource_deleted(OldSource), clear_certs(OldSource), Front ++ Rear; @@ -268,7 +268,7 @@ init_source(#{type := Type} = Source) -> init_metrics(Source) -> TypeName = type(Source), - emqx_plugin_libs_metrics:create_metrics( + emqx_metrics_worker:create_metrics( authz_metrics, TypeName, [matched, allow, deny, ignore], @@ -310,7 +310,7 @@ authorize( ipaddr => IpAddress, topic => Topic }), - emqx_plugin_libs_metrics:inc(authz_metrics, AuthzSource, allow), + emqx_metrics_worker:inc(authz_metrics, AuthzSource, allow), emqx_metrics:inc(?METRIC_ALLOW), {stop, allow}; {{matched, deny}, AuthzSource} -> @@ -324,7 +324,7 @@ authorize( ipaddr => IpAddress, topic => Topic }), - emqx_plugin_libs_metrics:inc(authz_metrics, AuthzSource, deny), + emqx_metrics_worker:inc(authz_metrics, AuthzSource, deny), emqx_metrics:inc(?METRIC_DENY), {stop, deny}; nomatch -> @@ -354,10 +354,10 @@ do_authorize( [Connector = #{type := Type} | Tail] ) -> Module = authz_module(Type), - emqx_plugin_libs_metrics:inc(authz_metrics, Type, matched), + emqx_metrics_worker:inc(authz_metrics, Type, matched), case Module:authorize(Client, PubSub, Topic, Connector) of nomatch -> - emqx_plugin_libs_metrics:inc(authz_metrics, Type, ignore), + emqx_metrics_worker:inc(authz_metrics, Type, ignore), do_authorize(Client, PubSub, Topic, Tail); Matched -> {Matched, Type} diff --git a/apps/emqx_authz/src/emqx_authz_api_sources.erl b/apps/emqx_authz/src/emqx_authz_api_sources.erl index c5920dd14..72fa10c98 100644 --- a/apps/emqx_authz/src/emqx_authz_api_sources.erl +++ b/apps/emqx_authz/src/emqx_authz_api_sources.erl @@ -308,7 +308,7 @@ lookup_from_local_node(Type) -> NodeId = node(self()), try emqx_authz:lookup(Type) of #{annotations := #{id := ResourceId}} -> - Metrics = emqx_plugin_libs_metrics:get_metrics(authz_metrics, Type), + Metrics = emqx_metrics_worker:get_metrics(authz_metrics, Type), case emqx_resource:get_instance(ResourceId) of {error, not_found} -> {error, {NodeId, not_found_resource}}; @@ -316,7 +316,7 @@ lookup_from_local_node(Type) -> {ok, {NodeId, Status, Metrics, ResourceMetrics}} end; _ -> - Metrics = emqx_plugin_libs_metrics:get_metrics(authz_metrics, Type), + Metrics = emqx_metrics_worker:get_metrics(authz_metrics, Type), {ok, {NodeId, connected, Metrics, #{}}} catch _:Reason -> {error, {NodeId, list_to_binary(io_lib:format("~p", [Reason]))}} From fc73f96aeb7084b7af3210ed20f4128a432a4f02 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 29 Apr 2022 11:52:31 +0800 Subject: [PATCH 4/4] fix(test): use different http port for testing authz and authn --- apps/emqx_authn/test/emqx_authn_http_SUITE.erl | 6 +++--- apps/emqx_authn/test/emqx_authn_https_SUITE.erl | 4 ++-- apps/emqx_authn/test/emqx_authn_jwt_SUITE.erl | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/emqx_authn/test/emqx_authn_http_SUITE.erl b/apps/emqx_authn/test/emqx_authn_http_SUITE.erl index 2ccb0c9df..dce4522f3 100644 --- a/apps/emqx_authn/test/emqx_authn_http_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_http_SUITE.erl @@ -26,7 +26,7 @@ -define(PATH, [?CONF_NS_ATOM]). --define(HTTP_PORT, 33333). +-define(HTTP_PORT, 32333). -define(HTTP_PATH, "/auth"). -define(CREDENTIALS, #{ username => <<"plain">>, @@ -178,7 +178,7 @@ t_destroy(_Config) -> t_update(_Config) -> CorrectConfig = raw_http_auth_config(), IncorrectConfig = - CorrectConfig#{url => <<"http://127.0.0.1:33333/invalid">>}, + CorrectConfig#{url => <<"http://127.0.0.1:32333/invalid">>}, {ok, _} = emqx:update_config( ?PATH, @@ -267,7 +267,7 @@ raw_http_auth_config() -> backend => <<"http">>, method => <<"get">>, - url => <<"http://127.0.0.1:33333/auth">>, + url => <<"http://127.0.0.1:32333/auth">>, body => #{<<"username">> => ?PH_USERNAME, <<"password">> => ?PH_PASSWORD}, headers => #{<<"X-Test-Header">> => <<"Test Value">>} }. diff --git a/apps/emqx_authn/test/emqx_authn_https_SUITE.erl b/apps/emqx_authn/test/emqx_authn_https_SUITE.erl index b67f040ea..c3327c31b 100644 --- a/apps/emqx_authn/test/emqx_authn_https_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_https_SUITE.erl @@ -26,7 +26,7 @@ -define(PATH, [?CONF_NS_ATOM]). --define(HTTPS_PORT, 33333). +-define(HTTPS_PORT, 32334). -define(HTTPS_PATH, "/auth"). -define(CREDENTIALS, #{ username => <<"plain">>, @@ -148,7 +148,7 @@ raw_https_auth_config(SpecificSSLOpts) -> backend => <<"http">>, method => <<"get">>, - url => <<"https://127.0.0.1:33333/auth">>, + url => <<"https://127.0.0.1:32334/auth">>, body => #{<<"username">> => ?PH_USERNAME, <<"password">> => ?PH_PASSWORD}, headers => #{<<"X-Test-Header">> => <<"Test Value">>}, ssl => maps:merge(SSLOpts, SpecificSSLOpts) diff --git a/apps/emqx_authn/test/emqx_authn_jwt_SUITE.erl b/apps/emqx_authn/test/emqx_authn_jwt_SUITE.erl index dbdacaa6b..b7002c9e5 100644 --- a/apps/emqx_authn/test/emqx_authn_jwt_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_jwt_SUITE.erl @@ -25,7 +25,7 @@ -define(AUTHN_ID, <<"mechanism:jwt">>). --define(JWKS_PORT, 33333). +-define(JWKS_PORT, 31333). -define(JWKS_PATH, "/jwks.json"). all() ->