Merge pull request #7467 from lafirest/test/exhook_metrics
test(exhook): add test case for exhook metrics
This commit is contained in:
commit
9ffc58071d
|
@ -33,7 +33,7 @@ cast(Hookpoint, Req) ->
|
||||||
|
|
||||||
cast(_, _, []) ->
|
cast(_, _, []) ->
|
||||||
ok;
|
ok;
|
||||||
cast(Hookpoint, Req, [ServerName|More]) ->
|
cast(Hookpoint, Req, [ServerName | More]) ->
|
||||||
%% XXX: Need a real asynchronous running
|
%% XXX: Need a real asynchronous running
|
||||||
_ = emqx_exhook_server:call(Hookpoint, Req,
|
_ = emqx_exhook_server:call(Hookpoint, Req,
|
||||||
emqx_exhook_mgr:server(ServerName)),
|
emqx_exhook_mgr:server(ServerName)),
|
||||||
|
@ -51,7 +51,7 @@ call_fold(Hookpoint, Req, AccFun) ->
|
||||||
|
|
||||||
call_fold(_, Req, _, []) ->
|
call_fold(_, Req, _, []) ->
|
||||||
{ok, Req};
|
{ok, Req};
|
||||||
call_fold(Hookpoint, Req, AccFun, [ServerName|More]) ->
|
call_fold(Hookpoint, Req, AccFun, [ServerName | More]) ->
|
||||||
Server = emqx_exhook_mgr:server(ServerName),
|
Server = emqx_exhook_mgr:server(ServerName),
|
||||||
case emqx_exhook_server:call(Hookpoint, Req, Server) of
|
case emqx_exhook_server:call(Hookpoint, Req, Server) of
|
||||||
{ok, Resp} ->
|
{ok, Resp} ->
|
||||||
|
|
|
@ -34,6 +34,7 @@
|
||||||
, window_rate :: integer()
|
, window_rate :: integer()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-type metrics() :: #metrics{}.
|
||||||
-type server_name() :: emqx_exhook_mgr:server_name().
|
-type server_name() :: emqx_exhook_mgr:server_name().
|
||||||
-type hookpoint() :: emqx_exhook_server:hookpoint().
|
-type hookpoint() :: emqx_exhook_server:hookpoint().
|
||||||
-type index() :: {server_name(), hookpoint()}.
|
-type index() :: {server_name(), hookpoint()}.
|
||||||
|
@ -72,16 +73,16 @@ new_metric_info() ->
|
||||||
succeed(Server, Hook) ->
|
succeed(Server, Hook) ->
|
||||||
inc(Server, Hook, #metrics.succeed,
|
inc(Server, Hook, #metrics.succeed,
|
||||||
#metrics{ index = {Server, Hook}
|
#metrics{ index = {Server, Hook}
|
||||||
, window_rate = 1
|
, window_rate = 0
|
||||||
, succeed = 1
|
, succeed = 0
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-spec failed(server_name(), hookpoint()) -> ok.
|
-spec failed(server_name(), hookpoint()) -> ok.
|
||||||
failed(Server, Hook) ->
|
failed(Server, Hook) ->
|
||||||
inc(Server, Hook, #metrics.failed,
|
inc(Server, Hook, #metrics.failed,
|
||||||
#metrics{ index = {Server, Hook}
|
#metrics{ index = {Server, Hook}
|
||||||
, window_rate = 1
|
, window_rate = 0
|
||||||
, failed = 1
|
, failed = 0
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-spec update(pos_integer()) -> true.
|
-spec update(pos_integer()) -> true.
|
||||||
|
@ -187,7 +188,7 @@ metrics_aggregate_by_key(Key, MetricsL) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-spec inc(server_name(), hookpoint(), pos_integer(), #metrics{}) -> ok.
|
-spec inc(server_name(), hookpoint(), pos_integer(), metrics()) -> ok.
|
||||||
inc(Server, Hook, Pos, Default) ->
|
inc(Server, Hook, Pos, Default) ->
|
||||||
Index = {Server, Hook},
|
Index = {Server, Hook},
|
||||||
_ = ets:update_counter(?HOOKS_METRICS,
|
_ = ets:update_counter(?HOOKS_METRICS,
|
||||||
|
|
|
@ -36,6 +36,9 @@
|
||||||
, failed_action/1
|
, failed_action/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-export([hk2func/1]).
|
||||||
|
-endif.
|
||||||
|
|
||||||
-type server() :: #{%% Server name (equal to grpc client channel name)
|
-type server() :: #{%% Server name (equal to grpc client channel name)
|
||||||
name := binary(),
|
name := binary(),
|
||||||
|
@ -105,7 +108,8 @@ load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts)
|
||||||
hookspec => HookSpecs,
|
hookspec => HookSpecs,
|
||||||
prefix => Prefix }};
|
prefix => Prefix }};
|
||||||
{error, _} = E ->
|
{error, _} = E ->
|
||||||
emqx_exhook_sup:stop_grpc_client_channel(Name), E
|
emqx_exhook_sup:stop_grpc_client_channel(Name),
|
||||||
|
E
|
||||||
end;
|
end;
|
||||||
{error, _} = E -> E
|
{error, _} = E -> E
|
||||||
end.
|
end.
|
||||||
|
@ -287,12 +291,20 @@ match_topic_filter(_, []) ->
|
||||||
match_topic_filter(TopicName, TopicFilter) ->
|
match_topic_filter(TopicName, TopicFilter) ->
|
||||||
lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter).
|
lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter).
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-define(CALL_PB_CLIENT(ChanneName, Fun, Req, Options),
|
||||||
|
apply(?PB_CLIENT_MOD, Fun, [Req, #{<<"channel">> => ChannName}, Options])).
|
||||||
|
-else.
|
||||||
|
-define(CALL_PB_CLIENT(ChanneName, Fun, Req, Options),
|
||||||
|
apply(?PB_CLIENT_MOD, Fun, [Req, Options])).
|
||||||
|
-endif.
|
||||||
|
|
||||||
-spec do_call(binary(), atom(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
|
-spec do_call(binary(), atom(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
|
||||||
do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) ->
|
do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) ->
|
||||||
Options = ReqOpts#{channel => ChannName},
|
Options = ReqOpts#{channel => ChannName},
|
||||||
?SLOG(debug, #{msg => "do_call", module => ?PB_CLIENT_MOD, function => Fun,
|
?SLOG(debug, #{msg => "do_call", module => ?PB_CLIENT_MOD, function => Fun,
|
||||||
req => Req, options => Options}),
|
req => Req, options => Options}),
|
||||||
case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of
|
case catch ?CALL_PB_CLIENT(ChanneName, Fun, Req, Options) of
|
||||||
{ok, Resp, Metadata} ->
|
{ok, Resp, Metadata} ->
|
||||||
?SLOG(debug, #{msg => "do_call_ok", resp => Resp, metadata => Metadata}),
|
?SLOG(debug, #{msg => "do_call_ok", resp => Resp, metadata => Metadata}),
|
||||||
update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:succeed/2),
|
update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:succeed/2),
|
||||||
|
|
|
@ -59,7 +59,7 @@ init_per_suite(Cfg) ->
|
||||||
meck:expect(emqx_alarm, deactivate, 3, ok),
|
meck:expect(emqx_alarm, deactivate, 3, ok),
|
||||||
|
|
||||||
_ = emqx_exhook_demo_svr:start(),
|
_ = emqx_exhook_demo_svr:start(),
|
||||||
ok = emqx_common_test_helpers:load_config(emqx_exhook_schema, ?CONF_DEFAULT),
|
load_cfg(?CONF_DEFAULT),
|
||||||
emqx_common_test_helpers:start_apps([emqx_exhook]),
|
emqx_common_test_helpers:start_apps([emqx_exhook]),
|
||||||
Cfg.
|
Cfg.
|
||||||
|
|
||||||
|
@ -86,6 +86,9 @@ end_per_testcase(_, Config) ->
|
||||||
end,
|
end,
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
load_cfg(Cfg) ->
|
||||||
|
ok = emqx_common_test_helpers:load_config(emqx_exhook_schema, Cfg).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases
|
%% Test cases
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -0,0 +1,220 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_exhook_metrics_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("emqx_exhook/include/emqx_exhook.hrl").
|
||||||
|
-define(SvrFun(SvrName, FuncName), {SvrName, FuncName}).
|
||||||
|
|
||||||
|
-define(TARGET_HOOK, 'message.publish').
|
||||||
|
|
||||||
|
-define(CONF, <<"
|
||||||
|
exhook {
|
||||||
|
servers = [
|
||||||
|
{ name = succed,
|
||||||
|
url = \"http://127.0.0.1:9000\"
|
||||||
|
},
|
||||||
|
{ name = failed,
|
||||||
|
failed_action = ignore,
|
||||||
|
url = \"http://127.0.0.1:9001\"
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
">>).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Setups
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Cfg) ->
|
||||||
|
application:load(emqx_conf),
|
||||||
|
meck:new(emqx_exhook_mgr, [non_strict, passthrough, no_link]),
|
||||||
|
meck:new(emqx_exhook_demo_svr, [non_strict, passthrough, no_link]),
|
||||||
|
meck:expect(emqx_exhook_mgr, refresh_tick, fun() -> ok end),
|
||||||
|
init_injections(hook_injects()),
|
||||||
|
|
||||||
|
emqx_exhook_SUITE:load_cfg(?CONF),
|
||||||
|
_ = emqx_exhook_demo_svr:start(),
|
||||||
|
_ = emqx_exhook_demo_svr:start(failed, 9001),
|
||||||
|
emqx_common_test_helpers:start_apps([emqx_exhook]),
|
||||||
|
Cfg.
|
||||||
|
|
||||||
|
end_per_suite(Cfg) ->
|
||||||
|
meck:unload(emqx_exhook_demo_svr),
|
||||||
|
meck:unload(emqx_exhook_mgr),
|
||||||
|
emqx_exhook_demo_svr:stop(),
|
||||||
|
emqx_exhook_demo_svr:stop(failed),
|
||||||
|
emqx_common_test_helpers:stop_apps([emqx_exhook]).
|
||||||
|
|
||||||
|
init_per_testcase(_, Config) ->
|
||||||
|
clear_metrics(),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(_, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Test cases
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
t_servers_metrics(_Cfg) ->
|
||||||
|
Test = fun(C) ->
|
||||||
|
Repeat = fun() ->
|
||||||
|
emqtt:publish(C, <<"/exhook/metrics">>, <<>>, qos0)
|
||||||
|
end,
|
||||||
|
repeat(Repeat, 10)
|
||||||
|
end,
|
||||||
|
with_connection(Test),
|
||||||
|
|
||||||
|
timer:sleep(200),
|
||||||
|
SM = emqx_exhook_metrics:server_metrics(<<"succed">>),
|
||||||
|
?assertMatch(#{failed := 0, succeed := 10}, SM),
|
||||||
|
|
||||||
|
FM = emqx_exhook_metrics:server_metrics(<<"failed">>),
|
||||||
|
?assertMatch(#{failed := 10, succeed := 0}, FM),
|
||||||
|
|
||||||
|
SvrsM = emqx_exhook_metrics:servers_metrics(),
|
||||||
|
?assertEqual(SM, maps:get(<<"succed">>, SvrsM)),
|
||||||
|
?assertEqual(FM, maps:get(<<"failed">>, SvrsM)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_rate(_) ->
|
||||||
|
Test = fun(C) ->
|
||||||
|
Repeat = fun() ->
|
||||||
|
emqtt:publish(C, <<"/exhook/metrics">>, <<>>, qos0)
|
||||||
|
end,
|
||||||
|
|
||||||
|
repeat(Repeat, 5),
|
||||||
|
timer:sleep(200),
|
||||||
|
emqx_exhook_metrics:update(timer:seconds(1)),
|
||||||
|
SM = emqx_exhook_metrics:server_metrics(<<"succed">>),
|
||||||
|
?assertMatch(#{rate := 5, max_rate := 5}, SM),
|
||||||
|
|
||||||
|
repeat(Repeat, 6),
|
||||||
|
timer:sleep(200),
|
||||||
|
emqx_exhook_metrics:update(timer:seconds(1)),
|
||||||
|
SM2 = emqx_exhook_metrics:server_metrics(<<"succed">>),
|
||||||
|
?assertMatch(#{rate := 6, max_rate := 6}, SM2),
|
||||||
|
|
||||||
|
repeat(Repeat, 3),
|
||||||
|
timer:sleep(200),
|
||||||
|
emqx_exhook_metrics:update(timer:seconds(1)),
|
||||||
|
SM3 = emqx_exhook_metrics:server_metrics(<<"succed">>),
|
||||||
|
?assertMatch(#{rate := 3, max_rate := 6}, SM3)
|
||||||
|
end,
|
||||||
|
with_connection(Test),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_hooks_metrics(_) ->
|
||||||
|
Test = fun(C) ->
|
||||||
|
Repeat = fun() ->
|
||||||
|
emqtt:publish(C, <<"/exhook/metrics">>, <<>>, qos0)
|
||||||
|
end,
|
||||||
|
|
||||||
|
repeat(Repeat, 5),
|
||||||
|
timer:sleep(200),
|
||||||
|
HM = emqx_exhook_metrics:hooks_metrics(<<"succed">>),
|
||||||
|
?assertMatch(#{'message.publish' :=
|
||||||
|
#{failed := 0, succeed := 5}}, HM)
|
||||||
|
end,
|
||||||
|
with_connection(Test),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_on_server_deleted(_) ->
|
||||||
|
|
||||||
|
Test = fun(C) ->
|
||||||
|
Repeat = fun() ->
|
||||||
|
emqtt:publish(C, <<"/exhook/metrics">>, <<>>, qos0)
|
||||||
|
end,
|
||||||
|
repeat(Repeat, 10)
|
||||||
|
end,
|
||||||
|
with_connection(Test),
|
||||||
|
|
||||||
|
timer:sleep(200),
|
||||||
|
SM = emqx_exhook_metrics:server_metrics(<<"succed">>),
|
||||||
|
?assertMatch(#{failed := 0, succeed := 10}, SM),
|
||||||
|
|
||||||
|
emqx_exhook_metrics:on_server_deleted(<<"succed">>),
|
||||||
|
SM2 = emqx_exhook_metrics:server_metrics(<<"succed">>),
|
||||||
|
?assertMatch(#{failed := 0, succeed := 0}, SM2),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Utils
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
clear_metrics() ->
|
||||||
|
ets:delete_all_objects(?HOOKS_METRICS).
|
||||||
|
|
||||||
|
init_injections(Injects) ->
|
||||||
|
lists:map(fun({Name, _}) ->
|
||||||
|
Str = erlang:atom_to_list(Name),
|
||||||
|
case lists:prefix("on_", Str) of
|
||||||
|
true ->
|
||||||
|
Action = fun(Req, #{<<"channel">> := SvrName} = Md) ->
|
||||||
|
case maps:get(?SvrFun(SvrName, Name), Injects, undefined) of
|
||||||
|
undefined ->
|
||||||
|
meck:passthrough([Req, Md]);
|
||||||
|
Injection ->
|
||||||
|
Injection(Req, Md)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
|
||||||
|
meck:expect(emqx_exhook_demo_svr, Name, Action);
|
||||||
|
_ ->
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
emqx_exhook_demo_svr:module_info(exports)).
|
||||||
|
|
||||||
|
hook_injects() ->
|
||||||
|
#{?SvrFun(<<"failed">>, emqx_exhook_server:hk2func(?TARGET_HOOK)) =>
|
||||||
|
fun(_Req, _Md) ->
|
||||||
|
{error, "Error due to test"}
|
||||||
|
end,
|
||||||
|
?SvrFun(<<"failed">>, on_provider_loaded) =>
|
||||||
|
fun(_Req, Md) ->
|
||||||
|
{ok, #{hooks => [#{name => <<"message.publish">>}]}, Md}
|
||||||
|
end,
|
||||||
|
?SvrFun(<<"succed">>, on_provider_loaded) =>
|
||||||
|
fun(_Req, Md) ->
|
||||||
|
{ok, #{hooks => [#{name => <<"message.publish">>}]}, Md}
|
||||||
|
end
|
||||||
|
}.
|
||||||
|
|
||||||
|
with_connection(Fun) ->
|
||||||
|
{ok, C} = emqtt:start_link([{host, "localhost"},
|
||||||
|
{port, 1883},
|
||||||
|
{username, <<"admin">>},
|
||||||
|
{clientid, <<"exhook_tester">>}]),
|
||||||
|
{ok, _} = emqtt:connect(C),
|
||||||
|
try
|
||||||
|
Fun(C)
|
||||||
|
catch Type:Error:Trace ->
|
||||||
|
emqtt:stop(C),
|
||||||
|
erlang:raise(Type, Error, Trace)
|
||||||
|
end.
|
||||||
|
|
||||||
|
repeat(_Fun, 0) ->
|
||||||
|
ok;
|
||||||
|
repeat(Fun, N) ->
|
||||||
|
Fun(),
|
||||||
|
repeat(Fun, N - 1).
|
Loading…
Reference in New Issue