Merge pull request #7820 from zmstone/0428-fix-flaky-test
test: ensure emqx_metrics gen_server stopped after each test
This commit is contained in:
commit
f28559ef62
|
@ -582,7 +582,7 @@ handle_delete_authenticator(Chain, AuthenticatorID) ->
|
||||||
[] ->
|
[] ->
|
||||||
{error, {not_found, {authenticator, AuthenticatorID}}};
|
{error, {not_found, {authenticator, AuthenticatorID}}};
|
||||||
[AuthenticatorID] ->
|
[AuthenticatorID] ->
|
||||||
emqx_plugin_libs_metrics:clear_metrics(authn_metrics, AuthenticatorID),
|
emqx_metrics_worker:clear_metrics(authn_metrics, AuthenticatorID),
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -613,7 +613,7 @@ handle_create_authenticator(Chain, Config, Providers) ->
|
||||||
Chain#chain{authenticators = NAuthenticators}
|
Chain#chain{authenticators = NAuthenticators}
|
||||||
),
|
),
|
||||||
|
|
||||||
ok = emqx_plugin_libs_metrics:create_metrics(
|
ok = emqx_metrics_worker:create_metrics(
|
||||||
authn_metrics,
|
authn_metrics,
|
||||||
AuthenticatorID,
|
AuthenticatorID,
|
||||||
[matched, success, failed, ignore],
|
[matched, success, failed, ignore],
|
||||||
|
@ -628,10 +628,10 @@ handle_create_authenticator(Chain, Config, Providers) ->
|
||||||
do_authenticate([], _) ->
|
do_authenticate([], _) ->
|
||||||
{stop, {error, not_authorized}};
|
{stop, {error, not_authorized}};
|
||||||
do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | More], Credential) ->
|
do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | More], Credential) ->
|
||||||
emqx_plugin_libs_metrics:inc(authn_metrics, ID, matched),
|
emqx_metrics_worker:inc(authn_metrics, ID, matched),
|
||||||
try Provider:authenticate(Credential, State) of
|
try Provider:authenticate(Credential, State) of
|
||||||
ignore ->
|
ignore ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(authn_metrics, ID, ignore),
|
ok = emqx_metrics_worker:inc(authn_metrics, ID, ignore),
|
||||||
do_authenticate(More, Credential);
|
do_authenticate(More, Credential);
|
||||||
Result ->
|
Result ->
|
||||||
%% {ok, Extra}
|
%% {ok, Extra}
|
||||||
|
@ -641,9 +641,9 @@ do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | M
|
||||||
%% {error, Reason}
|
%% {error, Reason}
|
||||||
case Result of
|
case Result of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
emqx_plugin_libs_metrics:inc(authn_metrics, ID, success);
|
emqx_metrics_worker:inc(authn_metrics, ID, success);
|
||||||
{error, _} ->
|
{error, _} ->
|
||||||
emqx_plugin_libs_metrics:inc(authn_metrics, ID, failed);
|
emqx_metrics_worker:inc(authn_metrics, ID, failed);
|
||||||
_ ->
|
_ ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
|
@ -657,7 +657,7 @@ do_authenticate([#authenticator{id = ID, provider = Provider, state = State} | M
|
||||||
stacktrace => Stacktrace,
|
stacktrace => Stacktrace,
|
||||||
authenticator => ID
|
authenticator => ID
|
||||||
}),
|
}),
|
||||||
emqx_plugin_libs_metrics:inc(authn_metrics, ID, ignore),
|
emqx_metrics_worker:inc(authn_metrics, ID, ignore),
|
||||||
do_authenticate(More, Credential)
|
do_authenticate(More, Credential)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -26,8 +26,8 @@ start_link() ->
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
AuthnMetrics = emqx_plugin_libs_metrics:child_spec(emqx_authn_metrics, authn_metrics),
|
AuthnMetrics = emqx_metrics_worker:child_spec(emqx_authn_metrics, authn_metrics),
|
||||||
AuthzMetrics = emqx_plugin_libs_metrics:child_spec(eqmx_authz_metrics, authz_metrics),
|
AuthzMetrics = emqx_metrics_worker:child_spec(eqmx_authz_metrics, authz_metrics),
|
||||||
{ok,
|
{ok,
|
||||||
{
|
{
|
||||||
{one_for_one, 10, 100},
|
{one_for_one, 10, 100},
|
||||||
|
|
|
@ -279,7 +279,14 @@ start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
-spec stop() -> ok.
|
-spec stop() -> ok.
|
||||||
stop() -> gen_server:stop(?SERVER).
|
stop() ->
|
||||||
|
try
|
||||||
|
gen_server:stop(?SERVER)
|
||||||
|
catch
|
||||||
|
exit:R when R =:= noproc orelse R =:= timeout ->
|
||||||
|
%% pid is killed after timeout
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
%% BACKW: v4.3.0
|
%% BACKW: v4.3.0
|
||||||
upgrade_retained_delayed_counter_type() ->
|
upgrade_retained_delayed_counter_type() ->
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_plugin_libs_metrics).
|
-module(emqx_metrics_worker).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
@ -100,16 +100,16 @@
|
||||||
|
|
||||||
-spec child_spec(handler_name()) -> supervisor:child_spec().
|
-spec child_spec(handler_name()) -> supervisor:child_spec().
|
||||||
child_spec(Name) ->
|
child_spec(Name) ->
|
||||||
child_spec(emqx_plugin_libs_metrics, Name).
|
child_spec(emqx_metrics_worker, Name).
|
||||||
|
|
||||||
child_spec(ChldName, Name) ->
|
child_spec(ChldName, Name) ->
|
||||||
#{
|
#{
|
||||||
id => ChldName,
|
id => ChldName,
|
||||||
start => {emqx_plugin_libs_metrics, start_link, [Name]},
|
start => {emqx_metrics_worker, start_link, [Name]},
|
||||||
restart => permanent,
|
restart => permanent,
|
||||||
shutdown => 5000,
|
shutdown => 5000,
|
||||||
type => worker,
|
type => worker,
|
||||||
modules => [emqx_plugin_libs_metrics]
|
modules => [emqx_metrics_worker]
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-spec create_metrics(handler_name(), metric_id(), [atom()]) -> ok | {error, term()}.
|
-spec create_metrics(handler_name(), metric_id(), [atom()]) -> ok | {error, term()}.
|
||||||
|
@ -284,7 +284,15 @@ terminate(_Reason, #state{metric_ids = MIDs}) ->
|
||||||
persistent_term:erase(?CntrRef(Name)).
|
persistent_term:erase(?CntrRef(Name)).
|
||||||
|
|
||||||
stop(Name) ->
|
stop(Name) ->
|
||||||
gen_server:stop(Name).
|
try
|
||||||
|
gen_server:stop(Name)
|
||||||
|
catch
|
||||||
|
exit:noproc ->
|
||||||
|
ok;
|
||||||
|
exit:timeout ->
|
||||||
|
%% after timeout, the process killed by gen.erl
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal Functions
|
%% Internal Functions
|
|
@ -166,7 +166,16 @@ t_trans(_) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
with_metrics_server(Fun) ->
|
with_metrics_server(Fun) ->
|
||||||
_ = supervisor:terminate_child(emqx_kernel_sup, emqx_metrics),
|
try
|
||||||
|
supervisor:terminate_child(emqx_kernel_sup, emqx_metrics)
|
||||||
|
catch
|
||||||
|
exit:_ ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
{ok, _} = emqx_metrics:start_link(),
|
{ok, _} = emqx_metrics:start_link(),
|
||||||
_ = Fun(),
|
try
|
||||||
ok = emqx_metrics:stop().
|
_ = Fun(),
|
||||||
|
ok
|
||||||
|
after
|
||||||
|
ok = emqx_metrics:stop()
|
||||||
|
end.
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_plugin_libs_metrics_SUITE).
|
-module(emqx_metrics_worker_SUITE).
|
||||||
|
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
@ -31,18 +31,15 @@ suite() ->
|
||||||
-define(NAME, ?MODULE).
|
-define(NAME, ?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_common_test_helpers:start_apps([emqx_conf]),
|
{ok, _} = emqx_metrics_worker:start_link(?NAME),
|
||||||
{ok, _} = emqx_plugin_libs_metrics:start_link(?NAME),
|
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
catch emqx_plugin_libs_metrics:stop(?NAME),
|
ok = emqx_metrics_worker:stop(?NAME).
|
||||||
emqx_common_test_helpers:stop_apps([emqx_conf]),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
catch emqx_plugin_libs_metrics:stop(?NAME),
|
ok = emqx_metrics_worker:stop(?NAME),
|
||||||
{ok, _} = emqx_plugin_libs_metrics:start_link(?NAME),
|
{ok, _} = emqx_metrics_worker:start_link(?NAME),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_testcase(_, _Config) ->
|
end_per_testcase(_, _Config) ->
|
||||||
|
@ -50,7 +47,7 @@ end_per_testcase(_, _Config) ->
|
||||||
|
|
||||||
t_get_metrics(_) ->
|
t_get_metrics(_) ->
|
||||||
Metrics = [a, b, c],
|
Metrics = [a, b, c],
|
||||||
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics),
|
ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, Metrics),
|
||||||
%% all the metrics are set to zero at start
|
%% all the metrics are set to zero at start
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{
|
#{
|
||||||
|
@ -65,12 +62,12 @@ t_get_metrics(_) ->
|
||||||
c := 0
|
c := 0
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)
|
emqx_metrics_worker:get_metrics(?NAME, <<"testid">>)
|
||||||
),
|
),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a),
|
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b),
|
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
|
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
|
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
|
||||||
ct:sleep(1500),
|
ct:sleep(1500),
|
||||||
?LET(
|
?LET(
|
||||||
#{
|
#{
|
||||||
|
@ -85,7 +82,7 @@ t_get_metrics(_) ->
|
||||||
c := 2
|
c := 2
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>),
|
emqx_metrics_worker:get_metrics(?NAME, <<"testid">>),
|
||||||
{
|
{
|
||||||
?assert(CurrA > 0),
|
?assert(CurrA > 0),
|
||||||
?assert(CurrB > 0),
|
?assert(CurrB > 0),
|
||||||
|
@ -95,11 +92,11 @@ t_get_metrics(_) ->
|
||||||
?assert(MaxC > 0)
|
?assert(MaxC > 0)
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
|
ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>).
|
||||||
|
|
||||||
t_reset_metrics(_) ->
|
t_reset_metrics(_) ->
|
||||||
Metrics = [a, b, c],
|
Metrics = [a, b, c],
|
||||||
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics),
|
ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, Metrics),
|
||||||
%% all the metrics are set to zero at start
|
%% all the metrics are set to zero at start
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{
|
#{
|
||||||
|
@ -114,14 +111,14 @@ t_reset_metrics(_) ->
|
||||||
c := 0
|
c := 0
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)
|
emqx_metrics_worker:get_metrics(?NAME, <<"testid">>)
|
||||||
),
|
),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a),
|
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b),
|
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
|
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
|
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
|
||||||
ct:sleep(1500),
|
ct:sleep(1500),
|
||||||
ok = emqx_plugin_libs_metrics:reset_metrics(?NAME, <<"testid">>),
|
ok = emqx_metrics_worker:reset_metrics(?NAME, <<"testid">>),
|
||||||
?LET(
|
?LET(
|
||||||
#{
|
#{
|
||||||
rate := #{
|
rate := #{
|
||||||
|
@ -135,7 +132,7 @@ t_reset_metrics(_) ->
|
||||||
c := 0
|
c := 0
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>),
|
emqx_metrics_worker:get_metrics(?NAME, <<"testid">>),
|
||||||
{
|
{
|
||||||
?assert(CurrA == 0),
|
?assert(CurrA == 0),
|
||||||
?assert(CurrB == 0),
|
?assert(CurrB == 0),
|
||||||
|
@ -145,19 +142,19 @@ t_reset_metrics(_) ->
|
||||||
?assert(MaxC == 0)
|
?assert(MaxC == 0)
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
|
ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>).
|
||||||
|
|
||||||
t_get_metrics_2(_) ->
|
t_get_metrics_2(_) ->
|
||||||
Metrics = [a, b, c],
|
Metrics = [a, b, c],
|
||||||
ok = emqx_plugin_libs_metrics:create_metrics(
|
ok = emqx_metrics_worker:create_metrics(
|
||||||
?NAME,
|
?NAME,
|
||||||
<<"testid">>,
|
<<"testid">>,
|
||||||
Metrics,
|
Metrics,
|
||||||
[a]
|
[a]
|
||||||
),
|
),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a),
|
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b),
|
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
|
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{
|
#{
|
||||||
rate := Rate = #{
|
rate := Rate = #{
|
||||||
|
@ -169,13 +166,13 @@ t_get_metrics_2(_) ->
|
||||||
c := 1
|
c := 1
|
||||||
}
|
}
|
||||||
} when map_size(Rate) =:= 1,
|
} when map_size(Rate) =:= 1,
|
||||||
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)
|
emqx_metrics_worker:get_metrics(?NAME, <<"testid">>)
|
||||||
),
|
),
|
||||||
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
|
ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>).
|
||||||
|
|
||||||
t_recreate_metrics(_) ->
|
t_recreate_metrics(_) ->
|
||||||
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, [a]),
|
ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, [a]),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a),
|
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, a),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{
|
#{
|
||||||
rate := R = #{
|
rate := R = #{
|
||||||
|
@ -185,12 +182,12 @@ t_recreate_metrics(_) ->
|
||||||
a := 1
|
a := 1
|
||||||
}
|
}
|
||||||
} when map_size(R) == 1 andalso map_size(C) == 1,
|
} when map_size(R) == 1 andalso map_size(C) == 1,
|
||||||
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)
|
emqx_metrics_worker:get_metrics(?NAME, <<"testid">>)
|
||||||
),
|
),
|
||||||
%% we create the metrics again, to add some counters
|
%% we create the metrics again, to add some counters
|
||||||
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, [a, b, c]),
|
ok = emqx_metrics_worker:create_metrics(?NAME, <<"testid">>, [a, b, c]),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b),
|
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, b),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
|
ok = emqx_metrics_worker:inc(?NAME, <<"testid">>, c),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{
|
#{
|
||||||
rate := R = #{
|
rate := R = #{
|
||||||
|
@ -202,42 +199,42 @@ t_recreate_metrics(_) ->
|
||||||
a := 1, b := 1, c := 1
|
a := 1, b := 1, c := 1
|
||||||
}
|
}
|
||||||
} when map_size(R) == 3 andalso map_size(C) == 3,
|
} when map_size(R) == 3 andalso map_size(C) == 3,
|
||||||
emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)
|
emqx_metrics_worker:get_metrics(?NAME, <<"testid">>)
|
||||||
),
|
),
|
||||||
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
|
ok = emqx_metrics_worker:clear_metrics(?NAME, <<"testid">>).
|
||||||
|
|
||||||
t_inc_matched(_) ->
|
t_inc_matched(_) ->
|
||||||
Metrics = ['rules.matched'],
|
Metrics = ['rules.matched'],
|
||||||
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>, Metrics),
|
ok = emqx_metrics_worker:create_metrics(?NAME, <<"rule1">>, Metrics),
|
||||||
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule2">>, Metrics),
|
ok = emqx_metrics_worker:create_metrics(?NAME, <<"rule2">>, Metrics),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
|
ok = emqx_metrics_worker:inc(?NAME, <<"rule1">>, 'rules.matched'),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'),
|
ok = emqx_metrics_worker:inc(?NAME, <<"rule2">>, 'rules.matched'),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'),
|
ok = emqx_metrics_worker:inc(?NAME, <<"rule2">>, 'rules.matched'),
|
||||||
?assertEqual(1, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')),
|
?assertEqual(1, emqx_metrics_worker:get(?NAME, <<"rule1">>, 'rules.matched')),
|
||||||
?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule2">>, 'rules.matched')),
|
?assertEqual(2, emqx_metrics_worker:get(?NAME, <<"rule2">>, 'rules.matched')),
|
||||||
?assertEqual(0, emqx_plugin_libs_metrics:get(?NAME, <<"rule3">>, 'rules.matched')),
|
?assertEqual(0, emqx_metrics_worker:get(?NAME, <<"rule3">>, 'rules.matched')),
|
||||||
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>),
|
ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule1">>),
|
||||||
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule2">>).
|
ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule2">>).
|
||||||
|
|
||||||
t_rate(_) ->
|
t_rate(_) ->
|
||||||
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>, ['rules.matched']),
|
ok = emqx_metrics_worker:create_metrics(?NAME, <<"rule1">>, ['rules.matched']),
|
||||||
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>, ['rules.matched']),
|
ok = emqx_metrics_worker:create_metrics(?NAME, <<"rule:2">>, ['rules.matched']),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
|
ok = emqx_metrics_worker:inc(?NAME, <<"rule1">>, 'rules.matched'),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
|
ok = emqx_metrics_worker:inc(?NAME, <<"rule1">>, 'rules.matched'),
|
||||||
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule:2">>, 'rules.matched'),
|
ok = emqx_metrics_worker:inc(?NAME, <<"rule:2">>, 'rules.matched'),
|
||||||
?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')),
|
?assertEqual(2, emqx_metrics_worker:get(?NAME, <<"rule1">>, 'rules.matched')),
|
||||||
ct:sleep(1000),
|
ct:sleep(1000),
|
||||||
?LET(
|
?LET(
|
||||||
#{'rules.matched' := #{max := Max, current := Current}},
|
#{'rules.matched' := #{max := Max, current := Current}},
|
||||||
emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
|
emqx_metrics_worker:get_rate(?NAME, <<"rule1">>),
|
||||||
{?assert(Max =< 2), ?assert(Current =< 2)}
|
{?assert(Max =< 2), ?assert(Current =< 2)}
|
||||||
),
|
),
|
||||||
ct:sleep(2100),
|
ct:sleep(2100),
|
||||||
?LET(
|
?LET(
|
||||||
#{'rules.matched' := #{max := Max, current := Current, last5m := Last5Min}},
|
#{'rules.matched' := #{max := Max, current := Current, last5m := Last5Min}},
|
||||||
emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
|
emqx_metrics_worker:get_rate(?NAME, <<"rule1">>),
|
||||||
{?assert(Max =< 2), ?assert(Current == 0), ?assert(Last5Min =< 0.67)}
|
{?assert(Max =< 2), ?assert(Current == 0), ?assert(Last5Min =< 0.67)}
|
||||||
),
|
),
|
||||||
ct:sleep(3000),
|
ct:sleep(3000),
|
||||||
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>),
|
ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule1">>),
|
||||||
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule:2">>).
|
ok = emqx_metrics_worker:clear_metrics(?NAME, <<"rule:2">>).
|
|
@ -943,7 +943,7 @@ lookup_from_local_node(ChainName, AuthenticatorID) ->
|
||||||
NodeId = node(self()),
|
NodeId = node(self()),
|
||||||
case emqx_authentication:lookup_authenticator(ChainName, AuthenticatorID) of
|
case emqx_authentication:lookup_authenticator(ChainName, AuthenticatorID) of
|
||||||
{ok, #{provider := Provider, state := State}} ->
|
{ok, #{provider := Provider, state := State}} ->
|
||||||
Metrics = emqx_plugin_libs_metrics:get_metrics(authn_metrics, AuthenticatorID),
|
Metrics = emqx_metrics_worker:get_metrics(authn_metrics, AuthenticatorID),
|
||||||
case lists:member(Provider, resource_provider()) of
|
case lists:member(Provider, resource_provider()) of
|
||||||
false ->
|
false ->
|
||||||
{ok, {NodeId, connected, Metrics, #{}}};
|
{ok, {NodeId, connected, Metrics, #{}}};
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
|
|
||||||
-define(PATH, [?CONF_NS_ATOM]).
|
-define(PATH, [?CONF_NS_ATOM]).
|
||||||
|
|
||||||
-define(HTTP_PORT, 33333).
|
-define(HTTP_PORT, 32333).
|
||||||
-define(HTTP_PATH, "/auth").
|
-define(HTTP_PATH, "/auth").
|
||||||
-define(CREDENTIALS, #{
|
-define(CREDENTIALS, #{
|
||||||
username => <<"plain">>,
|
username => <<"plain">>,
|
||||||
|
@ -178,7 +178,7 @@ t_destroy(_Config) ->
|
||||||
t_update(_Config) ->
|
t_update(_Config) ->
|
||||||
CorrectConfig = raw_http_auth_config(),
|
CorrectConfig = raw_http_auth_config(),
|
||||||
IncorrectConfig =
|
IncorrectConfig =
|
||||||
CorrectConfig#{url => <<"http://127.0.0.1:33333/invalid">>},
|
CorrectConfig#{url => <<"http://127.0.0.1:32333/invalid">>},
|
||||||
|
|
||||||
{ok, _} = emqx:update_config(
|
{ok, _} = emqx:update_config(
|
||||||
?PATH,
|
?PATH,
|
||||||
|
@ -267,7 +267,7 @@ raw_http_auth_config() ->
|
||||||
|
|
||||||
backend => <<"http">>,
|
backend => <<"http">>,
|
||||||
method => <<"get">>,
|
method => <<"get">>,
|
||||||
url => <<"http://127.0.0.1:33333/auth">>,
|
url => <<"http://127.0.0.1:32333/auth">>,
|
||||||
body => #{<<"username">> => ?PH_USERNAME, <<"password">> => ?PH_PASSWORD},
|
body => #{<<"username">> => ?PH_USERNAME, <<"password">> => ?PH_PASSWORD},
|
||||||
headers => #{<<"X-Test-Header">> => <<"Test Value">>}
|
headers => #{<<"X-Test-Header">> => <<"Test Value">>}
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
|
|
||||||
-define(PATH, [?CONF_NS_ATOM]).
|
-define(PATH, [?CONF_NS_ATOM]).
|
||||||
|
|
||||||
-define(HTTPS_PORT, 33333).
|
-define(HTTPS_PORT, 32334).
|
||||||
-define(HTTPS_PATH, "/auth").
|
-define(HTTPS_PATH, "/auth").
|
||||||
-define(CREDENTIALS, #{
|
-define(CREDENTIALS, #{
|
||||||
username => <<"plain">>,
|
username => <<"plain">>,
|
||||||
|
@ -148,7 +148,7 @@ raw_https_auth_config(SpecificSSLOpts) ->
|
||||||
|
|
||||||
backend => <<"http">>,
|
backend => <<"http">>,
|
||||||
method => <<"get">>,
|
method => <<"get">>,
|
||||||
url => <<"https://127.0.0.1:33333/auth">>,
|
url => <<"https://127.0.0.1:32334/auth">>,
|
||||||
body => #{<<"username">> => ?PH_USERNAME, <<"password">> => ?PH_PASSWORD},
|
body => #{<<"username">> => ?PH_USERNAME, <<"password">> => ?PH_PASSWORD},
|
||||||
headers => #{<<"X-Test-Header">> => <<"Test Value">>},
|
headers => #{<<"X-Test-Header">> => <<"Test Value">>},
|
||||||
ssl => maps:merge(SSLOpts, SpecificSSLOpts)
|
ssl => maps:merge(SSLOpts, SpecificSSLOpts)
|
||||||
|
|
|
@ -25,7 +25,7 @@
|
||||||
|
|
||||||
-define(AUTHN_ID, <<"mechanism:jwt">>).
|
-define(AUTHN_ID, <<"mechanism:jwt">>).
|
||||||
|
|
||||||
-define(JWKS_PORT, 33333).
|
-define(JWKS_PORT, 31333).
|
||||||
-define(JWKS_PATH, "/jwks.json").
|
-define(JWKS_PATH, "/jwks.json").
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
|
|
|
@ -173,7 +173,7 @@ do_post_config_update({?CMD_PREPEND, RawNewSource}, Sources) ->
|
||||||
InitedNewSource = init_source(get_source_by_type(type(RawNewSource), Sources)),
|
InitedNewSource = init_source(get_source_by_type(type(RawNewSource), Sources)),
|
||||||
%% create metrics
|
%% create metrics
|
||||||
TypeName = type(RawNewSource),
|
TypeName = type(RawNewSource),
|
||||||
ok = emqx_plugin_libs_metrics:create_metrics(
|
ok = emqx_metrics_worker:create_metrics(
|
||||||
authz_metrics,
|
authz_metrics,
|
||||||
TypeName,
|
TypeName,
|
||||||
[matched, allow, deny, ignore],
|
[matched, allow, deny, ignore],
|
||||||
|
@ -194,7 +194,7 @@ do_post_config_update({{?CMD_DELETE, Type}, _RawNewSource}, _Sources) ->
|
||||||
OldInitedSources = lookup(),
|
OldInitedSources = lookup(),
|
||||||
{OldSource, Front, Rear} = take(Type, OldInitedSources),
|
{OldSource, Front, Rear} = take(Type, OldInitedSources),
|
||||||
%% delete metrics
|
%% delete metrics
|
||||||
ok = emqx_plugin_libs_metrics:clear_metrics(authz_metrics, Type),
|
ok = emqx_metrics_worker:clear_metrics(authz_metrics, Type),
|
||||||
ok = ensure_resource_deleted(OldSource),
|
ok = ensure_resource_deleted(OldSource),
|
||||||
clear_certs(OldSource),
|
clear_certs(OldSource),
|
||||||
Front ++ Rear;
|
Front ++ Rear;
|
||||||
|
@ -268,7 +268,7 @@ init_source(#{type := Type} = Source) ->
|
||||||
|
|
||||||
init_metrics(Source) ->
|
init_metrics(Source) ->
|
||||||
TypeName = type(Source),
|
TypeName = type(Source),
|
||||||
emqx_plugin_libs_metrics:create_metrics(
|
emqx_metrics_worker:create_metrics(
|
||||||
authz_metrics,
|
authz_metrics,
|
||||||
TypeName,
|
TypeName,
|
||||||
[matched, allow, deny, ignore],
|
[matched, allow, deny, ignore],
|
||||||
|
@ -310,7 +310,7 @@ authorize(
|
||||||
ipaddr => IpAddress,
|
ipaddr => IpAddress,
|
||||||
topic => Topic
|
topic => Topic
|
||||||
}),
|
}),
|
||||||
emqx_plugin_libs_metrics:inc(authz_metrics, AuthzSource, allow),
|
emqx_metrics_worker:inc(authz_metrics, AuthzSource, allow),
|
||||||
emqx_metrics:inc(?METRIC_ALLOW),
|
emqx_metrics:inc(?METRIC_ALLOW),
|
||||||
{stop, allow};
|
{stop, allow};
|
||||||
{{matched, deny}, AuthzSource} ->
|
{{matched, deny}, AuthzSource} ->
|
||||||
|
@ -324,7 +324,7 @@ authorize(
|
||||||
ipaddr => IpAddress,
|
ipaddr => IpAddress,
|
||||||
topic => Topic
|
topic => Topic
|
||||||
}),
|
}),
|
||||||
emqx_plugin_libs_metrics:inc(authz_metrics, AuthzSource, deny),
|
emqx_metrics_worker:inc(authz_metrics, AuthzSource, deny),
|
||||||
emqx_metrics:inc(?METRIC_DENY),
|
emqx_metrics:inc(?METRIC_DENY),
|
||||||
{stop, deny};
|
{stop, deny};
|
||||||
nomatch ->
|
nomatch ->
|
||||||
|
@ -354,10 +354,10 @@ do_authorize(
|
||||||
[Connector = #{type := Type} | Tail]
|
[Connector = #{type := Type} | Tail]
|
||||||
) ->
|
) ->
|
||||||
Module = authz_module(Type),
|
Module = authz_module(Type),
|
||||||
emqx_plugin_libs_metrics:inc(authz_metrics, Type, matched),
|
emqx_metrics_worker:inc(authz_metrics, Type, matched),
|
||||||
case Module:authorize(Client, PubSub, Topic, Connector) of
|
case Module:authorize(Client, PubSub, Topic, Connector) of
|
||||||
nomatch ->
|
nomatch ->
|
||||||
emqx_plugin_libs_metrics:inc(authz_metrics, Type, ignore),
|
emqx_metrics_worker:inc(authz_metrics, Type, ignore),
|
||||||
do_authorize(Client, PubSub, Topic, Tail);
|
do_authorize(Client, PubSub, Topic, Tail);
|
||||||
Matched ->
|
Matched ->
|
||||||
{Matched, Type}
|
{Matched, Type}
|
||||||
|
|
|
@ -308,7 +308,7 @@ lookup_from_local_node(Type) ->
|
||||||
NodeId = node(self()),
|
NodeId = node(self()),
|
||||||
try emqx_authz:lookup(Type) of
|
try emqx_authz:lookup(Type) of
|
||||||
#{annotations := #{id := ResourceId}} ->
|
#{annotations := #{id := ResourceId}} ->
|
||||||
Metrics = emqx_plugin_libs_metrics:get_metrics(authz_metrics, Type),
|
Metrics = emqx_metrics_worker:get_metrics(authz_metrics, Type),
|
||||||
case emqx_resource:get_instance(ResourceId) of
|
case emqx_resource:get_instance(ResourceId) of
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
{error, {NodeId, not_found_resource}};
|
{error, {NodeId, not_found_resource}};
|
||||||
|
@ -316,7 +316,7 @@ lookup_from_local_node(Type) ->
|
||||||
{ok, {NodeId, Status, Metrics, ResourceMetrics}}
|
{ok, {NodeId, Status, Metrics, ResourceMetrics}}
|
||||||
end;
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
Metrics = emqx_plugin_libs_metrics:get_metrics(authz_metrics, Type),
|
Metrics = emqx_metrics_worker:get_metrics(authz_metrics, Type),
|
||||||
{ok, {NodeId, connected, Metrics, #{}}}
|
{ok, {NodeId, connected, Metrics, #{}}}
|
||||||
catch
|
catch
|
||||||
_:Reason -> {error, {NodeId, list_to_binary(io_lib:format("~p", [Reason]))}}
|
_:Reason -> {error, {NodeId, list_to_binary(io_lib:format("~p", [Reason]))}}
|
||||||
|
|
|
@ -31,8 +31,8 @@ introduced_in() ->
|
||||||
|
|
||||||
-spec get_metrics(
|
-spec get_metrics(
|
||||||
node(),
|
node(),
|
||||||
emqx_plugin_libs_metrics:handler_name(),
|
emqx_metrics_worker:handler_name(),
|
||||||
emqx_plugin_libs_metrics:metric_id()
|
emqx_metrics_worker:metric_id()
|
||||||
) -> emqx_plugin_libs_metrics:metrics() | {badrpc, _}.
|
) -> emqx_metrics_worker:metrics() | {badrpc, _}.
|
||||||
get_metrics(Node, HandlerName, MetricId) ->
|
get_metrics(Node, HandlerName, MetricId) ->
|
||||||
rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [HandlerName, MetricId]).
|
rpc:call(Node, emqx_metrics_worker, get_metrics, [HandlerName, MetricId]).
|
||||||
|
|
|
@ -27,7 +27,7 @@
|
||||||
config := resource_config(),
|
config := resource_config(),
|
||||||
state := resource_state(),
|
state := resource_state(),
|
||||||
status := resource_connection_status(),
|
status := resource_connection_status(),
|
||||||
metrics := emqx_plugin_libs_metrics:metrics()
|
metrics := emqx_metrics_worker:metrics()
|
||||||
}.
|
}.
|
||||||
-type resource_group() :: binary().
|
-type resource_group() :: binary().
|
||||||
-type create_opts() :: #{
|
-type create_opts() :: #{
|
||||||
|
|
|
@ -250,12 +250,12 @@ query(InstId, Request, AfterQuery) ->
|
||||||
{ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
|
{ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
|
||||||
%% the resource state is readonly to Module:on_query/4
|
%% the resource state is readonly to Module:on_query/4
|
||||||
%% and the `after_query()` functions should be thread safe
|
%% and the `after_query()` functions should be thread safe
|
||||||
ok = emqx_plugin_libs_metrics:inc(resource_metrics, InstId, matched),
|
ok = emqx_metrics_worker:inc(resource_metrics, InstId, matched),
|
||||||
try
|
try
|
||||||
Mod:on_query(InstId, Request, AfterQuery, ResourceState)
|
Mod:on_query(InstId, Request, AfterQuery, ResourceState)
|
||||||
catch
|
catch
|
||||||
Err:Reason:ST ->
|
Err:Reason:ST ->
|
||||||
emqx_plugin_libs_metrics:inc(resource_metrics, InstId, exception),
|
emqx_metrics_worker:inc(resource_metrics, InstId, exception),
|
||||||
erlang:raise(Err, Reason, ST)
|
erlang:raise(Err, Reason, ST)
|
||||||
end;
|
end;
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
|
@ -418,8 +418,8 @@ filter_instances(Filter) ->
|
||||||
[Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
|
[Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
|
||||||
|
|
||||||
inc_metrics_funcs(InstId) ->
|
inc_metrics_funcs(InstId) ->
|
||||||
OnFailed = [{fun emqx_plugin_libs_metrics:inc/3, [resource_metrics, InstId, failed]}],
|
OnFailed = [{fun emqx_metrics_worker:inc/3, [resource_metrics, InstId, failed]}],
|
||||||
OnSucc = [{fun emqx_plugin_libs_metrics:inc/3, [resource_metrics, InstId, success]}],
|
OnSucc = [{fun emqx_metrics_worker:inc/3, [resource_metrics, InstId, success]}],
|
||||||
{OnSucc, OnFailed}.
|
{OnSucc, OnFailed}.
|
||||||
|
|
||||||
call_instance(InstId, Query) ->
|
call_instance(InstId, Query) ->
|
||||||
|
|
|
@ -82,10 +82,10 @@ make_test_id() ->
|
||||||
<<?TEST_ID_PREFIX, RandId/binary>>.
|
<<?TEST_ID_PREFIX, RandId/binary>>.
|
||||||
|
|
||||||
get_metrics(InstId) ->
|
get_metrics(InstId) ->
|
||||||
emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId).
|
emqx_metrics_worker:get_metrics(resource_metrics, InstId).
|
||||||
|
|
||||||
reset_metrics(InstId) ->
|
reset_metrics(InstId) ->
|
||||||
emqx_plugin_libs_metrics:reset_metrics(resource_metrics, InstId).
|
emqx_metrics_worker:reset_metrics(resource_metrics, InstId).
|
||||||
|
|
||||||
force_lookup(InstId) ->
|
force_lookup(InstId) ->
|
||||||
{ok, _Group, Data} = lookup(InstId),
|
{ok, _Group, Data} = lookup(InstId),
|
||||||
|
@ -200,7 +200,7 @@ do_create(InstId, Group, ResourceType, Config, Opts) ->
|
||||||
{ok, already_created};
|
{ok, already_created};
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
ok = do_start(InstId, Group, ResourceType, Config, Opts),
|
ok = do_start(InstId, Group, ResourceType, Config, Opts),
|
||||||
ok = emqx_plugin_libs_metrics:create_metrics(
|
ok = emqx_metrics_worker:create_metrics(
|
||||||
resource_metrics,
|
resource_metrics,
|
||||||
InstId,
|
InstId,
|
||||||
[matched, success, failed, exception],
|
[matched, success, failed, exception],
|
||||||
|
@ -243,7 +243,7 @@ do_remove(Group, #{id := InstId} = Data, ClearMetrics) ->
|
||||||
_ = do_stop(Group, Data),
|
_ = do_stop(Group, Data),
|
||||||
ets:delete(emqx_resource_instance, InstId),
|
ets:delete(emqx_resource_instance, InstId),
|
||||||
case ClearMetrics of
|
case ClearMetrics of
|
||||||
true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId);
|
true -> ok = emqx_metrics_worker:clear_metrics(resource_metrics, InstId);
|
||||||
false -> ok
|
false -> ok
|
||||||
end,
|
end,
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -33,7 +33,7 @@ init([]) ->
|
||||||
_ = ets:new(emqx_resource_instance, TabOpts),
|
_ = ets:new(emqx_resource_instance, TabOpts),
|
||||||
|
|
||||||
SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
|
SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
|
||||||
Metrics = emqx_plugin_libs_metrics:child_spec(resource_metrics),
|
Metrics = emqx_metrics_worker:child_spec(resource_metrics),
|
||||||
|
|
||||||
Pool = ?RESOURCE_INST_MOD,
|
Pool = ?RESOURCE_INST_MOD,
|
||||||
Mod = ?RESOURCE_INST_MOD,
|
Mod = ?RESOURCE_INST_MOD,
|
||||||
|
|
|
@ -214,19 +214,19 @@ load_hooks_for_rule(#{from := Topics}) ->
|
||||||
lists:foreach(fun emqx_rule_events:load/1, Topics).
|
lists:foreach(fun emqx_rule_events:load/1, Topics).
|
||||||
|
|
||||||
maybe_add_metrics_for_rule(Id) ->
|
maybe_add_metrics_for_rule(Id) ->
|
||||||
case emqx_plugin_libs_metrics:has_metrics(rule_metrics, Id) of
|
case emqx_metrics_worker:has_metrics(rule_metrics, Id) of
|
||||||
true ->
|
true ->
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, Id, ?METRICS, ?RATE_METRICS)
|
ok = emqx_metrics_worker:create_metrics(rule_metrics, Id, ?METRICS, ?RATE_METRICS)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
clear_metrics_for_rule(Id) ->
|
clear_metrics_for_rule(Id) ->
|
||||||
ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, Id).
|
ok = emqx_metrics_worker:clear_metrics(rule_metrics, Id).
|
||||||
|
|
||||||
-spec reset_metrics_for_rule(rule_id()) -> ok.
|
-spec reset_metrics_for_rule(rule_id()) -> ok.
|
||||||
reset_metrics_for_rule(Id) ->
|
reset_metrics_for_rule(Id) ->
|
||||||
emqx_plugin_libs_metrics:reset_metrics(rule_metrics, Id).
|
emqx_metrics_worker:reset_metrics(rule_metrics, Id).
|
||||||
|
|
||||||
unload_hooks_for_rule(#{id := Id, from := Topics}) ->
|
unload_hooks_for_rule(#{id := Id, from := Topics}) ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
|
|
|
@ -36,5 +36,5 @@ init([]) ->
|
||||||
type => worker,
|
type => worker,
|
||||||
modules => [emqx_rule_engine]
|
modules => [emqx_rule_engine]
|
||||||
},
|
},
|
||||||
Metrics = emqx_plugin_libs_metrics:child_spec(rule_metrics),
|
Metrics = emqx_metrics_worker:child_spec(rule_metrics),
|
||||||
{ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}.
|
{ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}.
|
||||||
|
|
|
@ -64,14 +64,14 @@ apply_rule_discard_result(Rule, Input) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
apply_rule(Rule = #{id := RuleID}, Input) ->
|
apply_rule(Rule = #{id := RuleID}, Input) ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.matched'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'sql.matched'),
|
||||||
clear_rule_payload(),
|
clear_rule_payload(),
|
||||||
try
|
try
|
||||||
do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID}))
|
do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID}))
|
||||||
catch
|
catch
|
||||||
%% ignore the errors if select or match failed
|
%% ignore the errors if select or match failed
|
||||||
_:Reason = {select_and_transform_error, Error} ->
|
_:Reason = {select_and_transform_error, Error} ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
msg => "SELECT_clause_exception",
|
msg => "SELECT_clause_exception",
|
||||||
rule_id => RuleID,
|
rule_id => RuleID,
|
||||||
|
@ -79,7 +79,7 @@ apply_rule(Rule = #{id := RuleID}, Input) ->
|
||||||
}),
|
}),
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
_:Reason = {match_conditions_error, Error} ->
|
_:Reason = {match_conditions_error, Error} ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
msg => "WHERE_clause_exception",
|
msg => "WHERE_clause_exception",
|
||||||
rule_id => RuleID,
|
rule_id => RuleID,
|
||||||
|
@ -87,7 +87,7 @@ apply_rule(Rule = #{id := RuleID}, Input) ->
|
||||||
}),
|
}),
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
_:Reason = {select_and_collect_error, Error} ->
|
_:Reason = {select_and_collect_error, Error} ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
msg => "FOREACH_clause_exception",
|
msg => "FOREACH_clause_exception",
|
||||||
rule_id => RuleID,
|
rule_id => RuleID,
|
||||||
|
@ -95,7 +95,7 @@ apply_rule(Rule = #{id := RuleID}, Input) ->
|
||||||
}),
|
}),
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
_:Reason = {match_incase_error, Error} ->
|
_:Reason = {match_incase_error, Error} ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
msg => "INCASE_clause_exception",
|
msg => "INCASE_clause_exception",
|
||||||
rule_id => RuleID,
|
rule_id => RuleID,
|
||||||
|
@ -103,7 +103,7 @@ apply_rule(Rule = #{id := RuleID}, Input) ->
|
||||||
}),
|
}),
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
Class:Error:StkTrace ->
|
Class:Error:StkTrace ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'sql.failed.exception'),
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "apply_rule_failed",
|
msg => "apply_rule_failed",
|
||||||
rule_id => RuleID,
|
rule_id => RuleID,
|
||||||
|
@ -141,13 +141,13 @@ do_apply_rule(
|
||||||
Collection2 = filter_collection(Input, InCase, DoEach, Collection),
|
Collection2 = filter_collection(Input, InCase, DoEach, Collection),
|
||||||
case Collection2 of
|
case Collection2 of
|
||||||
[] ->
|
[] ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result');
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'sql.failed.no_result');
|
||||||
_ ->
|
_ ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.passed')
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'sql.passed')
|
||||||
end,
|
end,
|
||||||
{ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]};
|
{ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]};
|
||||||
false ->
|
false ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'sql.failed.no_result'),
|
||||||
{error, nomatch}
|
{error, nomatch}
|
||||||
end;
|
end;
|
||||||
do_apply_rule(
|
do_apply_rule(
|
||||||
|
@ -171,10 +171,10 @@ do_apply_rule(
|
||||||
)
|
)
|
||||||
of
|
of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.passed'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'sql.passed'),
|
||||||
{ok, handle_output_list(RuleId, Outputs, Selected, Input)};
|
{ok, handle_output_list(RuleId, Outputs, Selected, Input)};
|
||||||
false ->
|
false ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'sql.failed.no_result'),
|
||||||
{error, nomatch}
|
{error, nomatch}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -316,21 +316,21 @@ handle_output_list(RuleId, Outputs, Selected, Envs) ->
|
||||||
[handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs].
|
[handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs].
|
||||||
|
|
||||||
handle_output(RuleId, OutId, Selected, Envs) ->
|
handle_output(RuleId, OutId, Selected, Envs) ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.total'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.total'),
|
||||||
try
|
try
|
||||||
Result = do_handle_output(OutId, Selected, Envs),
|
Result = do_handle_output(OutId, Selected, Envs),
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.success'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.success'),
|
||||||
Result
|
Result
|
||||||
catch
|
catch
|
||||||
throw:out_of_service ->
|
throw:out_of_service ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.failed'),
|
||||||
ok = emqx_plugin_libs_metrics:inc(
|
ok = emqx_metrics_worker:inc(
|
||||||
rule_metrics, RuleId, 'outputs.failed.out_of_service'
|
rule_metrics, RuleId, 'outputs.failed.out_of_service'
|
||||||
),
|
),
|
||||||
?SLOG(warning, #{msg => "out_of_service", output => OutId});
|
?SLOG(warning, #{msg => "out_of_service", output => OutId});
|
||||||
Err:Reason:ST ->
|
Err:Reason:ST ->
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.failed'),
|
||||||
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.unknown'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.failed.unknown'),
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "output_failed",
|
msg => "output_failed",
|
||||||
output => OutId,
|
output => OutId,
|
||||||
|
|
|
@ -2,10 +2,13 @@
|
||||||
|
|
||||||
set -euo pipefail
|
set -euo pipefail
|
||||||
|
|
||||||
files="$(git diff --cached --name-only | grep -E '.*\.erl' || true)"
|
OPT="${1:--c}"
|
||||||
if [[ "${files}" == '' ]]; then
|
|
||||||
|
files_dirty="$(git diff --name-only | grep -E '.*\.erl' || true)"
|
||||||
|
files_cached="$(git diff --cached --name-only | grep -E '.*\.erl' || true)"
|
||||||
|
if [[ "${files_dirty}" == '' ]] && [[ "${files_cached}" == '' ]]; then
|
||||||
exit 0
|
exit 0
|
||||||
fi
|
fi
|
||||||
files="$(echo -e "$files" | xargs)"
|
files="$(echo -e "${files_dirty} \n ${files_cached}" | xargs)"
|
||||||
# shellcheck disable=SC2086
|
# shellcheck disable=SC2086
|
||||||
./scripts/erlfmt -c $files
|
./scripts/erlfmt $OPT $files
|
||||||
|
|
Loading…
Reference in New Issue