Merge pull request #7464 from savonarola/emqx_topic_metrics_api-cov

chore(emqx_modules): add emqx_topic_metrics_api tests
This commit is contained in:
Ilya Averyanov 2022-03-31 11:27:58 +03:00 committed by GitHub
commit a6031d6695
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 408 additions and 265 deletions

View File

@ -18,15 +18,12 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/3, uri/1]).
-include("emqx_authn.hrl"). -include("emqx_authn.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5").
-define(BASE_PATH, "api").
-define(TCP_DEFAULT, 'tcp:default'). -define(TCP_DEFAULT, 'tcp:default').
-define( -define(
@ -71,16 +68,7 @@ end_per_suite(_Config) ->
ok. ok.
set_special_configs(emqx_dashboard) -> set_special_configs(emqx_dashboard) ->
Config = #{ emqx_dashboard_api_test_helpers:set_default_config();
default_username => <<"admin">>,
default_password => <<"public">>,
listeners => [#{
protocol => http,
port => 18083
}]
},
emqx_config:put([dashboard], Config),
ok;
set_special_configs(_App) -> set_special_configs(_App) ->
ok. ok.
@ -467,34 +455,3 @@ test_authenticator_import_users(PathPrefix) ->
request(Method, Url) -> request(Method, Url) ->
request(Method, Url, []). request(Method, Url, []).
request(Method, Url, Body) ->
Request =
case Body of
[] ->
{Url, [auth_header()]};
_ ->
{Url, [auth_header()], "application/json", to_json(Body)}
end,
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], [{body_format, binary}]) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } ->
{ok, Code, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
uri() -> uri([]).
uri(Parts) when is_list(Parts) ->
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | Parts]).
auth_header() ->
Username = <<"admin">>,
Password = <<"public">>,
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
to_json(Map) ->
jiffy:encode(Map).

View File

@ -22,9 +22,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(HOST, "http://127.0.0.1:18083/"). -import(emqx_dashboard_api_test_helpers, [request/3, uri/1]).
-define(API_VERSION, "v5").
-define(BASE_PATH, "api").
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -48,16 +46,7 @@ end_per_suite(_Config) ->
ok. ok.
set_special_configs(emqx_dashboard) -> set_special_configs(emqx_dashboard) ->
Config = #{ emqx_dashboard_api_test_helpers:set_default_config();
default_username => <<"admin">>,
default_password => <<"public">>,
listeners => [#{
protocol => http,
port => 18083
}]
},
emqx_config:put([emqx_dashboard], Config),
ok;
set_special_configs(emqx_authz) -> set_special_configs(emqx_authz) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false), {ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny), {ok, _} = emqx:update_config([authorization, no_match], deny),
@ -246,35 +235,3 @@ t_api(_) ->
, []), , []),
?assertEqual(0, emqx_authz_mnesia:record_count()), ?assertEqual(0, emqx_authz_mnesia:record_count()),
ok. ok.
%%--------------------------------------------------------------------
%% HTTP Request
%%--------------------------------------------------------------------
request(Method, Url, Body) ->
Request = case Body of
[] -> {Url, [auth_header_()]};
_ -> {Url, [auth_header_()], "application/json", jsx:encode(Body)}
end,
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], [{body_format, binary}]) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } ->
{ok, Code, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
uri() -> uri([]).
uri(Parts) when is_list(Parts) ->
NParts = [E || E <- Parts],
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
get_sources(Result) -> jsx:decode(Result).
auth_header_() ->
Username = <<"admin">>,
Password = <<"public">>,
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.

View File

@ -18,14 +18,11 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-include("emqx_authz.hrl"). -import(emqx_dashboard_api_test_helpers, [request/3, uri/1]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5").
-define(BASE_PATH, "api").
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -49,16 +46,7 @@ end_per_suite(_Config) ->
ok. ok.
set_special_configs(emqx_dashboard) -> set_special_configs(emqx_dashboard) ->
Config = #{ emqx_dashboard_api_test_helpers:set_default_config();
default_username => <<"admin">>,
default_password => <<"public">>,
listeners => [#{
protocol => http,
port => 18083
}]
},
emqx_config:put([emqx_dashboard], Config),
ok;
set_special_configs(emqx_authz) -> set_special_configs(emqx_authz) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false), {ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny), {ok, _} = emqx:update_config([authorization, no_match], deny),
@ -100,38 +88,5 @@ t_api(_) ->
ok. ok.
%%--------------------------------------------------------------------
%% HTTP Request
%%--------------------------------------------------------------------
request(Method, Url, Body) ->
Request = case Body of
[] -> {Url, [auth_header_()]};
_ -> {Url, [auth_header_()], "application/json", jsx:encode(Body)}
end,
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], [{body_format, binary}]) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } ->
{ok, Code, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
uri() -> uri([]).
uri(Parts) when is_list(Parts) ->
NParts = [E || E <- Parts],
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
get_sources(Result) ->
maps:get(<<"sources">>, jsx:decode(Result), []).
auth_header_() ->
Username = <<"admin">>,
Password = <<"public">>,
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
stop_apps(Apps) -> stop_apps(Apps) ->
lists:foreach(fun application:stop/1, Apps). lists:foreach(fun application:stop/1, Apps).

View File

@ -18,14 +18,12 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-include("emqx_authz.hrl"). -import(emqx_dashboard_api_test_helpers, [request/3, uri/1]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_placeholder.hrl"). -include_lib("emqx/include/emqx_placeholder.hrl").
-define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5").
-define(BASE_PATH, "api").
-define(MONGO_SINGLE_HOST, "mongo"). -define(MONGO_SINGLE_HOST, "mongo").
-define(MYSQL_HOST, "mysql:3306"). -define(MYSQL_HOST, "mysql:3306").
-define(PGSQL_HOST, "pgsql"). -define(PGSQL_HOST, "pgsql").
@ -133,16 +131,7 @@ end_per_suite(_Config) ->
ok. ok.
set_special_configs(emqx_dashboard) -> set_special_configs(emqx_dashboard) ->
Config = #{ emqx_dashboard_api_test_helpers:set_default_config();
default_username => <<"admin">>,
default_password => <<"public">>,
listeners => [#{
protocol => http,
port => 18083
}]
},
emqx_config:put([emqx_dashboard], Config),
ok;
set_special_configs(emqx_authz) -> set_special_configs(emqx_authz) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false), {ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny), {ok, _} = emqx:update_config([authorization, no_match], deny),
@ -363,39 +352,9 @@ t_move_source(_) ->
ok. ok.
%%--------------------------------------------------------------------
%% HTTP Request
%%--------------------------------------------------------------------
request(Method, Url, Body) ->
Request = case Body of
[] -> {Url, [auth_header_()]};
_ -> {Url, [auth_header_()], "application/json", jsx:encode(Body)}
end,
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], [{body_format, binary}]) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } ->
{ok, Code, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
uri() -> uri([]).
uri(Parts) when is_list(Parts) ->
NParts = [E || E <- Parts],
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
get_sources(Result) -> get_sources(Result) ->
maps:get(<<"sources">>, jsx:decode(Result), []). maps:get(<<"sources">>, jsx:decode(Result), []).
auth_header_() ->
Username = <<"admin">>,
Password = <<"public">>,
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
data_dir() -> emqx:data_dir(). data_dir() -> emqx:data_dir().
start_apps(Apps) -> start_apps(Apps) ->

View File

@ -18,6 +18,8 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/4, uri/1]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"bridges: {}">>). -define(CONF_DEFAULT, <<"bridges: {}">>).
@ -65,13 +67,7 @@ end_per_suite(_Config) ->
ok. ok.
set_special_configs(emqx_dashboard) -> set_special_configs(emqx_dashboard) ->
Listeners = [#{protocol => http, port => 18083}], emqx_dashboard_api_test_helpers:set_default_config(<<"bridge_admin">>);
Config = #{listeners => Listeners,
default_username => <<"bridge_admin">>,
default_password => <<"public">>
},
emqx_config:put([dashboard], Config),
ok;
set_special_configs(_) -> set_special_configs(_) ->
ok. ok.
@ -337,38 +333,8 @@ t_enable_disable_bridges(_) ->
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
%%--------------------------------------------------------------------
%% HTTP Request
%%--------------------------------------------------------------------
-define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5").
-define(BASE_PATH, "api").
request(Method, Url, Body) -> request(Method, Url, Body) ->
Request = case Body of request(<<"bridge_admin">>, Method, Url, Body).
[] -> {Url, [auth_header_()]};
_ -> {Url, [auth_header_()], "application/json", jsx:encode(Body)}
end,
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], [{body_format, binary}]) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } ->
{ok, Code, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
uri() -> uri([]).
uri(Parts) when is_list(Parts) ->
NParts = [E || E <- Parts],
?HOST ++ str(filename:join([?BASE_PATH, ?API_VERSION | NParts])).
auth_header_() ->
Username = <<"bridge_admin">>,
Password = <<"public">>,
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
operation_path(node, Oper, BridgeID) -> operation_path(node, Oper, BridgeID) ->
uri(["nodes", node(), "bridges", BridgeID, "operation", Oper]); uri(["nodes", node(), "bridges", BridgeID, "operation", Oper]);

View File

@ -18,6 +18,8 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/4, uri/1]).
-include("emqx/include/emqx.hrl"). -include("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
@ -102,13 +104,7 @@ end_per_suite(_Config) ->
ok. ok.
set_special_configs(emqx_dashboard) -> set_special_configs(emqx_dashboard) ->
Listeners = [#{protocol => http, port => 18083}], emqx_dashboard_api_test_helpers:set_default_config(<<"connector_admin">>);
Config = #{listeners => Listeners,
default_username => <<"connector_admin">>,
default_password => <<"public">>
},
emqx_config:put([dashboard], Config),
ok;
set_special_configs(_) -> set_special_configs(_) ->
ok. ok.
@ -648,38 +644,8 @@ t_egress_mqtt_bridge_with_rules(_) ->
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
%%--------------------------------------------------------------------
%% HTTP Request
%%--------------------------------------------------------------------
-define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5").
-define(BASE_PATH, "api").
request(Method, Url, Body) -> request(Method, Url, Body) ->
Request = case Body of request(<<"connector_admin">>, Method, Url, Body).
[] -> {Url, [auth_header_()]};
_ -> {Url, [auth_header_()], "application/json", jsx:encode(Body)}
end,
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], [{body_format, binary}]) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } ->
{ok, Code, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
uri() -> uri([]).
uri(Parts) when is_list(Parts) ->
NParts = [E || E <- Parts],
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
auth_header_() ->
Username = <<"connector_admin">>,
Password = <<"public">>,
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
wait_for_resource_ready(InstId, 0) -> wait_for_resource_ready(InstId, 0) ->
ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]), ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]),

View File

@ -0,0 +1,68 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_dashboard_api_test_helpers).
-export([set_default_config/0,
set_default_config/1,
request/3,
request/4,
uri/0,
uri/1]).
-define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5").
-define(BASE_PATH, "api").
set_default_config() ->
set_default_config(<<"admin">>).
set_default_config(DefaultUsername) ->
Config = #{listeners => [#{protocol => http,
port => 18083}],
default_username => DefaultUsername,
default_password => <<"public">>
},
emqx_config:put([dashboard], Config),
ok.
request(Method, Url, Body) ->
request(<<"admin">>, Method, Url, Body).
request(Username, Method, Url, Body) ->
Request = case Body of
[] -> {Url, [auth_header(Username)]};
_ -> {Url, [auth_header(Username)], "application/json", jsx:encode(Body)}
end,
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], [{body_format, binary}]) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } ->
{ok, Code, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
uri() -> uri([]).
uri(Parts) when is_list(Parts) ->
NParts = [E || E <- Parts],
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
auth_header(Username) ->
Password = <<"public">>,
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.

View File

@ -361,12 +361,11 @@ fields(metrics) ->
]. ].
topic(In) -> topic(In) ->
case In of Desc =
body -> case In of
Desc = <<"Raw topic string">>; body -> <<"Raw topic string">>;
path -> path -> <<"Notice: Topic string in url path must be encoded">>
Desc = <<"Notice: Topic string in url path must be encoded">> end,
end,
{topic, {topic,
mk( mk(
binary(), binary(),

View File

@ -0,0 +1,316 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_topic_metrics_api_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/3, uri/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(BASE_CONF, #{
<<"topic_metrics">> => []
}).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
lists:foreach(
fun emqx_modules_conf:remove_topic_metrics/1,
emqx_modules_conf:topic_metrics()
),
Config.
init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF),
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_modules, emqx_dashboard],
fun set_special_configs/1
),
%% When many tests run in an obscure order, it may occur that
%% `gen_rpc` started with its default settings before `emqx_conf`.
%% `gen_rpc` and `emqx_conf` have different default `port_discovery` modes,
%% so we reinitialize `gen_rpc` explicitly.
ok = application:stop(gen_rpc),
ok = application:start(gen_rpc),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_dashboard, emqx_modules]),
ok.
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config();
set_special_configs(_App) ->
ok.
%%------------------------------------------------------------------------------
%% Tests
%%------------------------------------------------------------------------------
t_mqtt_topic_metrics_collection(_) ->
{ok, 200, Result0} = request(
get,
uri(["mqtt", "topic_metrics"])
),
?assertEqual(
[],
jsx:decode(Result0)
),
{ok, 200, _} = request(
post,
uri(["mqtt", "topic_metrics"]),
#{<<"topic">> => <<"topic/1/2">>}
),
{ok, 200, Result1} = request(
get,
uri(["mqtt", "topic_metrics"])
),
?assertMatch(
[
#{
<<"topic">> := <<"topic/1/2">>,
<<"metrics">> := #{}
}
],
jsx:decode(Result1)
),
?assertMatch(
{ok, 200, _},
request(
put,
uri(["mqtt", "topic_metrics"]),
#{
<<"topic">> => <<"topic/1/2">>,
<<"action">> => <<"reset">>
}
)
),
?assertMatch(
{ok, 200, _},
request(
put,
uri(["mqtt", "topic_metrics"]),
#{<<"action">> => <<"reset">>}
)
),
?assertMatch(
{ok, 404, _},
request(
put,
uri(["mqtt", "topic_metrics"]),
#{
<<"topic">> => <<"unknown_topic/1/2">>,
<<"action">> => <<"reset">>
}
)
),
?assertMatch(
{ok, 204, _},
request(
delete,
uri(["mqtt", "topic_metrics", emqx_http_lib:uri_encode("topic/1/2")])
)
).
t_mqtt_topic_metrics(_) ->
{ok, 200, _} = request(
post,
uri(["mqtt", "topic_metrics"]),
#{<<"topic">> => <<"topic/1/2">>}
),
{ok, 200, Result0} = request(
get,
uri(["mqtt", "topic_metrics"])
),
?assertMatch([_], jsx:decode(Result0)),
{ok, 200, Result1} = request(
get,
uri(["mqtt", "topic_metrics", emqx_http_lib:uri_encode("topic/1/2")])
),
?assertMatch(
#{
<<"topic">> := <<"topic/1/2">>,
<<"metrics">> := #{}
},
jsx:decode(Result1)
),
?assertMatch(
{ok, 204, _},
request(
delete,
uri(["mqtt", "topic_metrics", emqx_http_lib:uri_encode("topic/1/2")])
)
),
?assertMatch(
{ok, 404, _},
request(
get,
uri(["mqtt", "topic_metrics", emqx_http_lib:uri_encode("topic/1/2")])
)
),
?assertMatch(
{ok, 404, _},
request(
delete,
uri(["mqtt", "topic_metrics", emqx_http_lib:uri_encode("topic/1/2")])
)
).
t_bad_reqs(_) ->
%% empty topic
?assertMatch(
{ok, 400, _},
request(
post,
uri(["mqtt", "topic_metrics"]),
#{<<"topic">> => <<"">>}
)
),
%% wildcard
?assertMatch(
{ok, 400, _},
request(
post,
uri(["mqtt", "topic_metrics"]),
#{<<"topic">> => <<"foo/+/bar">>}
)
),
{ok, 200, _} = request(
post,
uri(["mqtt", "topic_metrics"]),
#{<<"topic">> => <<"topic/1/2">>}
),
%% existing topic
?assertMatch(
{ok, 400, _},
request(
post,
uri(["mqtt", "topic_metrics"]),
#{<<"topic">> => <<"topic/1/2">>}
)
),
ok = emqx_modules_conf:remove_topic_metrics(<<"topic/1/2">>),
%% limit
Responses = lists:map(
fun(N) ->
Topic = iolist_to_binary([
<<"topic/">>,
integer_to_binary(N)
]),
request(
post,
uri(["mqtt", "topic_metrics"]),
#{<<"topic">> => Topic}
)
end,
lists:seq(1, 513)
),
?assertMatch(
[{ok, 409, _}, {ok, 200, _} | _],
lists:reverse(Responses)
),
%% limit && wildcard
?assertMatch(
{ok, 400, _},
request(
post,
uri(["mqtt", "topic_metrics"]),
#{<<"topic">> => <<"a/+">>}
)
).
t_node_aggregation(_) ->
TwoNodeResult = [
#{
create_time => <<"2022-03-30T13:54:10+03:00">>,
metrics => #{'messages.dropped.count' => 1},
reset_time => <<"2022-03-30T13:54:10+03:00">>,
topic => <<"topic/1/2">>
},
#{
create_time => <<"2022-03-30T13:54:10+03:00">>,
metrics => #{'messages.dropped.count' => 2},
reset_time => <<"2022-03-30T13:54:10+03:00">>,
topic => <<"topic/1/2">>
}
],
meck:new(emqx_topic_metrics_proto_v1, [passthrough]),
meck:expect(emqx_topic_metrics_proto_v1, metrics, 2, {TwoNodeResult, []}),
{ok, 200, Result} = request(
get,
uri(["mqtt", "topic_metrics", emqx_http_lib:uri_encode("topic/1/2")])
),
?assertMatch(
#{
<<"topic">> := <<"topic/1/2">>,
<<"metrics">> := #{<<"messages.dropped.count">> := 3}
},
jsx:decode(Result)
),
meck:unload(emqx_topic_metrics_proto_v1).
t_badrpc(_) ->
meck:new(emqx_topic_metrics_proto_v1, [passthrough]),
meck:expect(emqx_topic_metrics_proto_v1, metrics, 2, {[], [node()]}),
?assertMatch(
{ok, 500, _},
request(
get,
uri(["mqtt", "topic_metrics", emqx_http_lib:uri_encode("topic/1/2")])
)
),
meck:unload(emqx_topic_metrics_proto_v1).
%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------
request(Method, Url) ->
request(Method, Url, []).