From f5cfefc0a55c7a05a5bd4f82fad2defc2a22ce36 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Sun, 16 Jan 2022 22:35:57 +0800 Subject: [PATCH 1/2] refactor(api): topic_metrics api swagger spec --- apps/emqx_modules/src/emqx_telemetry_api.erl | 4 +- .../src/emqx_topic_metrics_api.erl | 304 ++++++++++++------ 2 files changed, 204 insertions(+), 104 deletions(-) diff --git a/apps/emqx_modules/src/emqx_telemetry_api.erl b/apps/emqx_modules/src/emqx_telemetry_api.erl index db4e4620c..d879b083f 100644 --- a/apps/emqx_modules/src/emqx_telemetry_api.erl +++ b/apps/emqx_modules/src/emqx_telemetry_api.erl @@ -71,13 +71,13 @@ schema("/telemetry/data") -> }. status_schema(Desc) -> - mk(ref(?MODULE, status), #{desc => Desc}). + mk(ref(?MODULE, status), #{in => body, desc => Desc}). fields(status) -> [ { enable , mk( boolean() , #{ desc => <<"Telemetry status">> - , default => false + , default => true , example => false }) } diff --git a/apps/emqx_modules/src/emqx_topic_metrics_api.erl b/apps/emqx_modules/src/emqx_topic_metrics_api.erl index 1e82f7a94..9f826a8f9 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics_api.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics_api.erl @@ -18,121 +18,221 @@ -behaviour(minirest_api). --import(emqx_mgmt_util, [ properties/1 - , schema/1 - , object_schema/1 - , object_schema/2 - , object_array_schema/2 - , error_schema/2 - ]). +-include_lib("typerefl/include/types.hrl"). --export([api_spec/0]). +-import( hoconsc + , [ mk/2 + , ref/1 + , ref/2 + , array/1 + , map/2]). -export([ topic_metrics/2 , operate_topic_metrics/2 ]). +-export([ api_spec/0 + , paths/0 + , schema/1 + , fields/1 + ]). + -define(ERROR_TOPIC, 'ERROR_TOPIC'). - -define(EXCEED_LIMIT, 'EXCEED_LIMIT'). - -define(BAD_TOPIC, 'BAD_TOPIC'). - -define(BAD_REQUEST, 'BAD_REQUEST'). +-define(API_TAG_MQTT, [<<"mqtt">>]). + api_spec() -> - {[ - topic_metrics_api(), - operation_topic_metrics_api() - ],[]}. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). -properties() -> - properties([ - {topic, string}, - {create_time, string, <<"Date time, rfc3339">>}, - {reset_time, string, <<"Nullable. Date time, rfc3339.">>}, - {metrics, object, [{'messages.dropped.count', integer}, - {'messages.dropped.rate', number}, - {'messages.in.count', integer}, - {'messages.in.rate', number}, - {'messages.out.count', integer}, - {'messages.out.rate', number}, - {'messages.qos0.in.count', integer}, - {'messages.qos0.in.rate', number}, - {'messages.qos0.out.count', integer}, - {'messages.qos0.out.rate', number}, - {'messages.qos1.in.count', integer}, - {'messages.qos1.in.rate', number}, - {'messages.qos1.out.count', integer}, - {'messages.qos1.out.rate', number}, - {'messages.qos2.in.count', integer}, - {'messages.qos2.in.rate', number}, - {'messages.qos2.out.count', integer}, - {'messages.qos2.out.rate', number}]} - ]). +paths() -> + [ "/mqtt/topic_metrics" + , "/mqtt/topic_metrics/:topic" + ]. -topic_metrics_api() -> - MetaData = #{ - %% Get all nodes metrics and accumulate all of these - get => #{ - description => <<"List topic metrics">>, - responses => #{ - <<"200">> => object_array_schema(properties(), <<"List topic metrics">>) - } - }, - put => #{ - description => <<"Reset topic metrics by topic name, or all">>, - 'requestBody' => object_schema(properties([ - {topic, string, <<"no topic will reset all">>}, - {action, string, <<"Action, default reset">>, [reset]} - ])), - responses => #{ - <<"200">> => schema(<<"Reset topic metrics success">>), - <<"404">> => error_schema(<<"Topic not found">>, [?ERROR_TOPIC]) - } - }, - post => #{ - description => <<"Create topic metrics">>, - 'requestBody' => object_schema(properties([{topic, string}])), - responses => #{ - <<"200">> => schema(<<"Create topic metrics success">>), - <<"409">> => error_schema(<<"Topic metrics max limit">>, [?EXCEED_LIMIT]), - <<"400">> => error_schema( <<"Topic metrics already exist or bad topic">> - , [?BAD_REQUEST, ?BAD_TOPIC]) - } - } - }, - {"/mqtt/topic_metrics", MetaData, topic_metrics}. -operation_topic_metrics_api() -> - MetaData = #{ - get => #{ - description => <<"Get topic metrics">>, - parameters => [topic_param()], - responses => #{ - <<"200">> => object_schema(properties(), <<"Topic metrics">>), - <<"404">> => error_schema(<<"Topic not found">>, [?ERROR_TOPIC]) - }}, - delete => #{ - description => <<"Deregister topic metrics">>, - parameters => [topic_param()], - responses => #{ - <<"204">> => schema(<<"Deregister topic metrics">>), - <<"404">> => error_schema(<<"Topic not found">>, [?ERROR_TOPIC]) +schema("/mqtt/topic_metrics") -> + #{ 'operationId' => topic_metrics + , get => + #{ description => <<"List topic metrics">> + , tags => ?API_TAG_MQTT + , responses => + #{200 => mk(array(hoconsc:ref(topic_metrics)), #{ desc => <<"List topic metrics">>})} } - } - }, - {"/mqtt/topic_metrics/:topic", MetaData, operate_topic_metrics}. + , put => + #{ description => <<"Reset topic metrics by topic name, or all">> + , tags => ?API_TAG_MQTT + , 'requestBody' => emqx_dashboard_swagger:schema_with_examples( + ref(reset), + reset_examples()) + , responses => + #{ 204 => <<"Reset topic metrics success">> + , 404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>) + } + } + , post => + #{ description => <<"Create topic metrics">> + , tags => ?API_TAG_MQTT + , 'requestBody' => [topic(body)] + , responses => + #{ 204 => <<"Create topic metrics success">> + , 409 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT], <<"Topic metrics max limit">>) + , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST, ?BAD_TOPIC], <<"Topic metrics already exist or bad topic">>) + } + } + }; +schema("/mqtt/topic_metrics/:topic") -> + #{ 'operationId' => operate_topic_metrics + , get => + #{ description => <<"Get topic metrics">> + , tags => ?API_TAG_MQTT + , parameters => [topic(path)] + , responses => + #{ 200 => mk(ref(topic_metrics), #{ desc => <<"Topic metrics">> }) + , 404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>) + } + } + , delete => + #{ description => <<"Remove the topic metrics">> + , tags => ?API_TAG_MQTT + , parameters => [topic(path)] + , responses => + #{ 204 => <<"Removed topic metrics successfully">>, + 404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>) + } + } + }. -topic_param() -> - #{ - name => topic, - in => path, - required => true, - description => <<"Notice: Topic string url must encode">>, - schema => #{type => string} +fields(reset) -> + [ {topic + , mk( binary() + , #{ desc => <<"Topic Name. If this paramter is not present, all created topic metrics will be reset.">> + , example => <<"testtopic/1">> + , nullable => true})} + , {action + , mk( string() + , #{ desc => <<"Action Name. Only as a \"reset\"">> + , enum => [reset] + , nullable => false + , example => <<"reset">>})} + ]; + +fields(topic_metrics) -> + [ { topic + , mk( binary() + , #{ desc => <<"MQTT Topic">> + , example => <<"testtopic/1">> + , nullable => false})}, + { create_time + , mk( emqx_schema:rfc3339_system_time() + , #{ desc => <<"Date time, rfc3339">> + , nullable => false + , example => <<"2022-01-14T21:48:47+08:00">>})}, + { reset_time + , mk( emqx_schema:rfc3339_system_time() + , #{ desc => <<"Nullable. Date time, rfc3339.">> + , nullable => true + , example => <<"2022-01-14T21:48:47+08:00">>})}, + { metrics + , mk( ref(metrics) + , #{ desc => <<"MQTT Topic Metrics">> + , nullable => false}) + } + ]; + +fields(metrics) -> + [ { 'messages.dropped.count' + , mk( integer(), #{ desc => <<"Message dropped count.">> + , example => 0})}, + { 'messages.dropped.rate' + , mk( number(), #{ desc => <<"Message dropped rate in 5s.">> + , example => 0})}, + { 'messages.in.count' + , mk( integer(), #{ desc => <<"Message received count.">> + , example => 0})}, + { 'messages.in.rate' + , mk( number(), #{ desc => <<"Message received rate in 5s">> + , example => 0})}, + { 'messages.out.count' + , mk( integer(), #{ desc => <<"Message sent count.">> + , example => 0})}, + { 'messages.out.rate' + , mk( number(), #{ desc => <<"Message sent rate in 5s.">> + , example => 0})}, + { 'messages.qos0.in.count' + , mk( integer(), #{ desc => <<"Message with QoS 0 received count.">> + , example => 0})}, + { 'messages.qos0.in.rate' + , mk( number(), #{ desc => <<"Message with QoS 0 received rate in 5s.">> + , example => 0})}, + { 'messages.qos0.out.count' + , mk( integer(), #{ desc => <<"Message with QoS 0 sent count.">> + , example => 0})}, + { 'messages.qos0.out.rate' + , mk( number(), #{ desc => <<"Message with QoS 0 sent rate in 5s.">> + , example => 0})}, + { 'messages.qos1.in.count' + , mk( integer(), #{ desc => <<"Message with QoS 1 received count.">> + , example => 0})}, + { 'messages.qos1.in.rate' + , mk( number(), #{ desc => <<"Message with QoS 1 received rate in 5s.">> + , example => 0})}, + { 'messages.qos1.out.count' + , mk( integer(), #{ desc => <<"Message with QoS 1 sent count.">> + , example => 0})}, + { 'messages.qos1.out.rate' + , mk( number(), #{ desc => <<"Message with QoS 1 sent rate in 5s.">> + , example => 0})}, + { 'messages.qos2.in.count' + , mk( integer(), #{ desc => <<"Message with QoS 2 sent count.">> + , example => 0})}, + { 'messages.qos2.in.rate' + , mk( number(), #{ desc => <<"Message with QoS 2 received rate in 5s.">> + , example => 0})}, + { 'messages.qos2.out.count' + , mk( integer(), #{ desc => <<"Message with QoS 2 sent count.">> + , example => 0})}, + { 'messages.qos2.out.rate' + , mk( number(), #{ desc => <<"Message with QoS 2 sent rate in 5s.">> + , example => 0})} + ]. + +topic(In) -> + case In of + body -> + Desc = <<"Raw topic string">>, + Example = "testtopic/1"; + path -> + Desc = <<"Notice Topic string in url path must encode">>, + Example = "testtopic%2F1" + end, + { topic + , mk( binary(), + #{ desc => Desc + , required => true + , in => In + , example => Example + }) }. +reset_examples() -> + #{ reset_specific_one_topic_metrics => + #{ summary => <<"reset_specific_one_topic_metrics">> + , value => + #{ topic => "testtopic/1" + , action => "reset" + } + } + , reset_all_topic_metrics => + #{ summary => <<"reset_all_topic_metrics">> + , value => + #{ action => "reset" + } + } + }. + %%-------------------------------------------------------------------- %% HTTP Callbacks %%-------------------------------------------------------------------- @@ -147,19 +247,19 @@ topic_metrics(get, _) -> topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) -> case reset(Topic) of - ok -> {200}; + ok -> {204}; {error, Reason} -> reason2httpresp(Reason) end; topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) -> reset(), - {200}; + {204}; topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) -> {400, 'BAD_REQUEST', <<"Topic can not be empty">>}; topic_metrics(post, #{body := #{<<"topic">> := Topic}}) -> case emqx_modules_conf:add_topic_metrics(Topic) of {ok, Topic} -> - {200}; + {204}; {error, Reason} -> reason2httpresp(Reason) end. @@ -174,7 +274,7 @@ operate_topic_metrics(get, #{bindings := #{topic := Topic0}}) -> operate_topic_metrics(delete, #{bindings := #{topic := Topic0}}) -> case emqx_modules_conf:remove_topic_metrics(emqx_http_lib:uri_decode(Topic0)) of - ok -> {200}; + ok -> {204}; {error, Reason} -> reason2httpresp(Reason) end. @@ -277,8 +377,8 @@ reason2httpresp(bad_topic) -> reason2httpresp({quota_exceeded, bad_topic}) -> Msg = list_to_binary( io_lib:format( - "Max topic metrics count is ~p, and topic cannot have wildcard", - [emqx_topic_metrics:max_limit()])), + "Max topic metrics count is ~p, and topic cannot have wildcard", + [emqx_topic_metrics:max_limit()])), {400, #{code => ?BAD_REQUEST, message => Msg}}; reason2httpresp(already_existed) -> Msg = <<"Topic already registered">>, From 31edd49f76c9b141332b6b55c00d3619d3de5036 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 17 Jan 2022 15:18:35 +0800 Subject: [PATCH 2/2] fix(topic_metrics): `POST` `PUT` need response --- .github/workflows/run_api_tests.yaml | 2 +- .../test/emqx_sn_protocol_SUITE.erl | 1 + .../src/emqx_topic_metrics_api.erl | 97 ++++++++++--------- 3 files changed, 54 insertions(+), 46 deletions(-) diff --git a/.github/workflows/run_api_tests.yaml b/.github/workflows/run_api_tests.yaml index e8ab99f57..c5a8b94cb 100644 --- a/.github/workflows/run_api_tests.yaml +++ b/.github/workflows/run_api_tests.yaml @@ -86,7 +86,7 @@ jobs: - uses: actions/checkout@v2 with: repository: emqx/emqx-fvt - ref: 1.0.3-dev2 + ref: 1.0.4-dev1 path: . - uses: actions/setup-java@v1 with: diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index ab333cf87..fded10e03 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -1799,6 +1799,7 @@ t_clients_api(_) -> %% kickout {204, _} = request(delete, "/gateway/mqttsn/clients/client_id_test1"), + timer:sleep(100), {200, #{data := []}} = request(get, "/gateway/mqttsn/clients"), send_disconnect_msg(Socket, undefined), diff --git a/apps/emqx_modules/src/emqx_topic_metrics_api.erl b/apps/emqx_modules/src/emqx_topic_metrics_api.erl index 9f826a8f9..24c516f60 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics_api.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics_api.erl @@ -31,6 +31,9 @@ , operate_topic_metrics/2 ]). +-export([ cluster_accumulation_metrics/0 + , cluster_accumulation_metrics/1]). + -export([ api_spec/0 , paths/0 , schema/1 @@ -40,6 +43,7 @@ -define(ERROR_TOPIC, 'ERROR_TOPIC'). -define(EXCEED_LIMIT, 'EXCEED_LIMIT'). -define(BAD_TOPIC, 'BAD_TOPIC'). +-define(BAD_RPC, 'BAD_RPC'). -define(BAD_REQUEST, 'BAD_REQUEST'). -define(API_TAG_MQTT, [<<"mqtt">>]). @@ -59,16 +63,16 @@ schema("/mqtt/topic_metrics") -> #{ description => <<"List topic metrics">> , tags => ?API_TAG_MQTT , responses => - #{200 => mk(array(hoconsc:ref(topic_metrics)), #{ desc => <<"List topic metrics">>})} + #{200 => mk(array(hoconsc:ref(topic_metrics)), #{ desc => <<"List all topic metrics">>})} } , put => - #{ description => <<"Reset topic metrics by topic name, or all">> + #{ description => <<"Reset topic metrics by topic name. Or reset all Topic Metrics">> , tags => ?API_TAG_MQTT , 'requestBody' => emqx_dashboard_swagger:schema_with_examples( ref(reset), reset_examples()) , responses => - #{ 204 => <<"Reset topic metrics success">> + #{ 204 => <<"Reset topic metrics successfully">> , 404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>) } } @@ -78,8 +82,8 @@ schema("/mqtt/topic_metrics") -> , 'requestBody' => [topic(body)] , responses => #{ 204 => <<"Create topic metrics success">> - , 409 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT], <<"Topic metrics max limit">>) - , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST, ?BAD_TOPIC], <<"Topic metrics already exist or bad topic">>) + , 409 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT], <<"Topic metrics exceeded max limit 512">>) + , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST, ?BAD_TOPIC], <<"Topic metrics already existed or bad topic">>) } } }; @@ -108,7 +112,7 @@ schema("/mqtt/topic_metrics/:topic") -> fields(reset) -> [ {topic , mk( binary() - , #{ desc => <<"Topic Name. If this paramter is not present, all created topic metrics will be reset.">> + , #{ desc => <<"Topic Name. If this paramter is not present, all created topic metrics will be reseted">> , example => <<"testtopic/1">> , nullable => true})} , {action @@ -122,80 +126,80 @@ fields(reset) -> fields(topic_metrics) -> [ { topic , mk( binary() - , #{ desc => <<"MQTT Topic">> + , #{ desc => <<"Topic Name">> , example => <<"testtopic/1">> , nullable => false})}, { create_time , mk( emqx_schema:rfc3339_system_time() - , #{ desc => <<"Date time, rfc3339">> + , #{ desc => <<"Topic Metrics created date time, in rfc3339">> , nullable => false , example => <<"2022-01-14T21:48:47+08:00">>})}, { reset_time , mk( emqx_schema:rfc3339_system_time() - , #{ desc => <<"Nullable. Date time, rfc3339.">> + , #{ desc => <<"Topic Metrics reset date time, in rfc3339. Nullable if never reseted">> , nullable => true , example => <<"2022-01-14T21:48:47+08:00">>})}, { metrics , mk( ref(metrics) - , #{ desc => <<"MQTT Topic Metrics">> + , #{ desc => <<"Topic Metrics fields">> , nullable => false}) } ]; fields(metrics) -> [ { 'messages.dropped.count' - , mk( integer(), #{ desc => <<"Message dropped count.">> + , mk( integer(), #{ desc => <<"Message dropped count">> , example => 0})}, { 'messages.dropped.rate' - , mk( number(), #{ desc => <<"Message dropped rate in 5s.">> + , mk( number(), #{ desc => <<"Message dropped rate in 5s">> , example => 0})}, { 'messages.in.count' - , mk( integer(), #{ desc => <<"Message received count.">> + , mk( integer(), #{ desc => <<"Message received count">> , example => 0})}, { 'messages.in.rate' , mk( number(), #{ desc => <<"Message received rate in 5s">> , example => 0})}, { 'messages.out.count' - , mk( integer(), #{ desc => <<"Message sent count.">> + , mk( integer(), #{ desc => <<"Message sent count">> , example => 0})}, { 'messages.out.rate' - , mk( number(), #{ desc => <<"Message sent rate in 5s.">> + , mk( number(), #{ desc => <<"Message sent rate in 5s">> , example => 0})}, { 'messages.qos0.in.count' - , mk( integer(), #{ desc => <<"Message with QoS 0 received count.">> + , mk( integer(), #{ desc => <<"Message with QoS 0 received count">> , example => 0})}, { 'messages.qos0.in.rate' - , mk( number(), #{ desc => <<"Message with QoS 0 received rate in 5s.">> + , mk( number(), #{ desc => <<"Message with QoS 0 received rate in 5s">> , example => 0})}, { 'messages.qos0.out.count' - , mk( integer(), #{ desc => <<"Message with QoS 0 sent count.">> + , mk( integer(), #{ desc => <<"Message with QoS 0 sent count">> , example => 0})}, { 'messages.qos0.out.rate' - , mk( number(), #{ desc => <<"Message with QoS 0 sent rate in 5s.">> + , mk( number(), #{ desc => <<"Message with QoS 0 sent rate in 5s">> , example => 0})}, { 'messages.qos1.in.count' - , mk( integer(), #{ desc => <<"Message with QoS 1 received count.">> + , mk( integer(), #{ desc => <<"Message with QoS 1 received count">> , example => 0})}, { 'messages.qos1.in.rate' - , mk( number(), #{ desc => <<"Message with QoS 1 received rate in 5s.">> + , mk( number(), #{ desc => <<"Message with QoS 1 received rate in 5s">> , example => 0})}, { 'messages.qos1.out.count' - , mk( integer(), #{ desc => <<"Message with QoS 1 sent count.">> + , mk( integer(), #{ desc => <<"Message with QoS 1 sent count">> , example => 0})}, { 'messages.qos1.out.rate' - , mk( number(), #{ desc => <<"Message with QoS 1 sent rate in 5s.">> + , mk( number(), #{ desc => <<"Message with QoS 1 sent rate in 5s">> , example => 0})}, { 'messages.qos2.in.count' - , mk( integer(), #{ desc => <<"Message with QoS 2 sent count.">> + , mk( integer(), #{ desc => <<"Message with QoS 2 sent count">> , example => 0})}, { 'messages.qos2.in.rate' - , mk( number(), #{ desc => <<"Message with QoS 2 received rate in 5s.">> + , mk( number(), #{ desc => <<"Message with QoS 2 received rate in 5s">> , example => 0})}, { 'messages.qos2.out.count' - , mk( integer(), #{ desc => <<"Message with QoS 2 sent count.">> + , mk( integer(), #{ desc => <<"Message with QoS 2 sent count">> , example => 0})}, { 'messages.qos2.out.rate' - , mk( number(), #{ desc => <<"Message with QoS 2 sent rate in 5s.">> + , mk( number(), #{ desc => <<"Message with QoS 2 sent rate in 5s">> , example => 0})} ]. @@ -205,7 +209,7 @@ topic(In) -> Desc = <<"Raw topic string">>, Example = "testtopic/1"; path -> - Desc = <<"Notice Topic string in url path must encode">>, + Desc = <<"Notice: Topic string in url path must be encoded">>, Example = "testtopic%2F1" end, { topic @@ -238,39 +242,31 @@ reset_examples() -> %%-------------------------------------------------------------------- topic_metrics(get, _) -> - case cluster_accumulation_metrics() of - {error, Reason} -> - {500, Reason}; - {ok, Metrics} -> - {200, Metrics} - end; + get_cluster_response([]); topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) -> case reset(Topic) of - ok -> {204}; - {error, Reason} -> reason2httpresp(Reason) + ok -> + get_cluster_response([Topic]); + {error, Reason} -> + reason2httpresp(Reason) end; topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) -> reset(), - {204}; + get_cluster_response([]); topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) -> {400, 'BAD_REQUEST', <<"Topic can not be empty">>}; topic_metrics(post, #{body := #{<<"topic">> := Topic}}) -> case emqx_modules_conf:add_topic_metrics(Topic) of {ok, Topic} -> - {204}; + get_cluster_response([Topic]); {error, Reason} -> reason2httpresp(Reason) end. operate_topic_metrics(get, #{bindings := #{topic := Topic0}}) -> - case cluster_accumulation_metrics(emqx_http_lib:uri_decode(Topic0)) of - {ok, Metrics} -> - {200, Metrics}; - {error, Reason} -> - reason2httpresp(Reason) - end; + get_cluster_response([emqx_http_lib:uri_decode(Topic0)]); operate_topic_metrics(delete, #{bindings := #{topic := Topic0}}) -> case emqx_modules_conf:remove_topic_metrics(emqx_http_lib:uri_decode(Topic0)) of @@ -297,7 +293,8 @@ cluster_accumulation_metrics(Topic) -> {SuccResList, []} -> case lists:filter(fun({error, _}) -> false; (_) -> true end, SuccResList) of - [] -> {error, topic_not_found}; + [] -> + {error, topic_not_found}; TopicMetrics -> NTopicMetrics = [ [T] || T <- TopicMetrics], [AccMetrics] = accumulate_nodes_metrics(NTopicMetrics), @@ -389,3 +386,13 @@ reason2httpresp(topic_not_found) -> reason2httpresp(not_found) -> Msg = <<"Topic not found">>, {404, #{code => ?ERROR_TOPIC, message => Msg}}. + +get_cluster_response(Args) -> + case erlang:apply(?MODULE, cluster_accumulation_metrics, Args) of + {error, {badrpc, RPCReason}} -> + {500, RPCReason}; + {error, Reason} when is_atom(Reason) -> + reason2httpresp(Reason); + {ok, Metrics} -> + {200, Metrics} + end.