fix(topic_metrics): `POST` `PUT` need response

This commit is contained in:
JimMoen 2022-01-17 15:18:35 +08:00
parent f5cfefc0a5
commit 31edd49f76
3 changed files with 54 additions and 46 deletions

View File

@ -86,7 +86,7 @@ jobs:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
with: with:
repository: emqx/emqx-fvt repository: emqx/emqx-fvt
ref: 1.0.3-dev2 ref: 1.0.4-dev1
path: . path: .
- uses: actions/setup-java@v1 - uses: actions/setup-java@v1
with: with:

View File

@ -1799,6 +1799,7 @@ t_clients_api(_) ->
%% kickout %% kickout
{204, _} = {204, _} =
request(delete, "/gateway/mqttsn/clients/client_id_test1"), request(delete, "/gateway/mqttsn/clients/client_id_test1"),
timer:sleep(100),
{200, #{data := []}} = request(get, "/gateway/mqttsn/clients"), {200, #{data := []}} = request(get, "/gateway/mqttsn/clients"),
send_disconnect_msg(Socket, undefined), send_disconnect_msg(Socket, undefined),

View File

@ -31,6 +31,9 @@
, operate_topic_metrics/2 , operate_topic_metrics/2
]). ]).
-export([ cluster_accumulation_metrics/0
, cluster_accumulation_metrics/1]).
-export([ api_spec/0 -export([ api_spec/0
, paths/0 , paths/0
, schema/1 , schema/1
@ -40,6 +43,7 @@
-define(ERROR_TOPIC, 'ERROR_TOPIC'). -define(ERROR_TOPIC, 'ERROR_TOPIC').
-define(EXCEED_LIMIT, 'EXCEED_LIMIT'). -define(EXCEED_LIMIT, 'EXCEED_LIMIT').
-define(BAD_TOPIC, 'BAD_TOPIC'). -define(BAD_TOPIC, 'BAD_TOPIC').
-define(BAD_RPC, 'BAD_RPC').
-define(BAD_REQUEST, 'BAD_REQUEST'). -define(BAD_REQUEST, 'BAD_REQUEST').
-define(API_TAG_MQTT, [<<"mqtt">>]). -define(API_TAG_MQTT, [<<"mqtt">>]).
@ -59,16 +63,16 @@ schema("/mqtt/topic_metrics") ->
#{ description => <<"List topic metrics">> #{ description => <<"List topic metrics">>
, tags => ?API_TAG_MQTT , tags => ?API_TAG_MQTT
, responses => , responses =>
#{200 => mk(array(hoconsc:ref(topic_metrics)), #{ desc => <<"List topic metrics">>})} #{200 => mk(array(hoconsc:ref(topic_metrics)), #{ desc => <<"List all topic metrics">>})}
} }
, put => , put =>
#{ description => <<"Reset topic metrics by topic name, or all">> #{ description => <<"Reset topic metrics by topic name. Or reset all Topic Metrics">>
, tags => ?API_TAG_MQTT , tags => ?API_TAG_MQTT
, 'requestBody' => emqx_dashboard_swagger:schema_with_examples( , 'requestBody' => emqx_dashboard_swagger:schema_with_examples(
ref(reset), ref(reset),
reset_examples()) reset_examples())
, responses => , responses =>
#{ 204 => <<"Reset topic metrics success">> #{ 204 => <<"Reset topic metrics successfully">>
, 404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>) , 404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>)
} }
} }
@ -78,8 +82,8 @@ schema("/mqtt/topic_metrics") ->
, 'requestBody' => [topic(body)] , 'requestBody' => [topic(body)]
, responses => , responses =>
#{ 204 => <<"Create topic metrics success">> #{ 204 => <<"Create topic metrics success">>
, 409 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT], <<"Topic metrics max limit">>) , 409 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT], <<"Topic metrics exceeded max limit 512">>)
, 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST, ?BAD_TOPIC], <<"Topic metrics already exist or bad topic">>) , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST, ?BAD_TOPIC], <<"Topic metrics already existed or bad topic">>)
} }
} }
}; };
@ -108,7 +112,7 @@ schema("/mqtt/topic_metrics/:topic") ->
fields(reset) -> fields(reset) ->
[ {topic [ {topic
, mk( binary() , mk( binary()
, #{ desc => <<"Topic Name. If this paramter is not present, all created topic metrics will be reset.">> , #{ desc => <<"Topic Name. If this paramter is not present, all created topic metrics will be reseted">>
, example => <<"testtopic/1">> , example => <<"testtopic/1">>
, nullable => true})} , nullable => true})}
, {action , {action
@ -122,80 +126,80 @@ fields(reset) ->
fields(topic_metrics) -> fields(topic_metrics) ->
[ { topic [ { topic
, mk( binary() , mk( binary()
, #{ desc => <<"MQTT Topic">> , #{ desc => <<"Topic Name">>
, example => <<"testtopic/1">> , example => <<"testtopic/1">>
, nullable => false})}, , nullable => false})},
{ create_time { create_time
, mk( emqx_schema:rfc3339_system_time() , mk( emqx_schema:rfc3339_system_time()
, #{ desc => <<"Date time, rfc3339">> , #{ desc => <<"Topic Metrics created date time, in rfc3339">>
, nullable => false , nullable => false
, example => <<"2022-01-14T21:48:47+08:00">>})}, , example => <<"2022-01-14T21:48:47+08:00">>})},
{ reset_time { reset_time
, mk( emqx_schema:rfc3339_system_time() , mk( emqx_schema:rfc3339_system_time()
, #{ desc => <<"Nullable. Date time, rfc3339.">> , #{ desc => <<"Topic Metrics reset date time, in rfc3339. Nullable if never reseted">>
, nullable => true , nullable => true
, example => <<"2022-01-14T21:48:47+08:00">>})}, , example => <<"2022-01-14T21:48:47+08:00">>})},
{ metrics { metrics
, mk( ref(metrics) , mk( ref(metrics)
, #{ desc => <<"MQTT Topic Metrics">> , #{ desc => <<"Topic Metrics fields">>
, nullable => false}) , nullable => false})
} }
]; ];
fields(metrics) -> fields(metrics) ->
[ { 'messages.dropped.count' [ { 'messages.dropped.count'
, mk( integer(), #{ desc => <<"Message dropped count.">> , mk( integer(), #{ desc => <<"Message dropped count">>
, example => 0})}, , example => 0})},
{ 'messages.dropped.rate' { 'messages.dropped.rate'
, mk( number(), #{ desc => <<"Message dropped rate in 5s.">> , mk( number(), #{ desc => <<"Message dropped rate in 5s">>
, example => 0})}, , example => 0})},
{ 'messages.in.count' { 'messages.in.count'
, mk( integer(), #{ desc => <<"Message received count.">> , mk( integer(), #{ desc => <<"Message received count">>
, example => 0})}, , example => 0})},
{ 'messages.in.rate' { 'messages.in.rate'
, mk( number(), #{ desc => <<"Message received rate in 5s">> , mk( number(), #{ desc => <<"Message received rate in 5s">>
, example => 0})}, , example => 0})},
{ 'messages.out.count' { 'messages.out.count'
, mk( integer(), #{ desc => <<"Message sent count.">> , mk( integer(), #{ desc => <<"Message sent count">>
, example => 0})}, , example => 0})},
{ 'messages.out.rate' { 'messages.out.rate'
, mk( number(), #{ desc => <<"Message sent rate in 5s.">> , mk( number(), #{ desc => <<"Message sent rate in 5s">>
, example => 0})}, , example => 0})},
{ 'messages.qos0.in.count' { 'messages.qos0.in.count'
, mk( integer(), #{ desc => <<"Message with QoS 0 received count.">> , mk( integer(), #{ desc => <<"Message with QoS 0 received count">>
, example => 0})}, , example => 0})},
{ 'messages.qos0.in.rate' { 'messages.qos0.in.rate'
, mk( number(), #{ desc => <<"Message with QoS 0 received rate in 5s.">> , mk( number(), #{ desc => <<"Message with QoS 0 received rate in 5s">>
, example => 0})}, , example => 0})},
{ 'messages.qos0.out.count' { 'messages.qos0.out.count'
, mk( integer(), #{ desc => <<"Message with QoS 0 sent count.">> , mk( integer(), #{ desc => <<"Message with QoS 0 sent count">>
, example => 0})}, , example => 0})},
{ 'messages.qos0.out.rate' { 'messages.qos0.out.rate'
, mk( number(), #{ desc => <<"Message with QoS 0 sent rate in 5s.">> , mk( number(), #{ desc => <<"Message with QoS 0 sent rate in 5s">>
, example => 0})}, , example => 0})},
{ 'messages.qos1.in.count' { 'messages.qos1.in.count'
, mk( integer(), #{ desc => <<"Message with QoS 1 received count.">> , mk( integer(), #{ desc => <<"Message with QoS 1 received count">>
, example => 0})}, , example => 0})},
{ 'messages.qos1.in.rate' { 'messages.qos1.in.rate'
, mk( number(), #{ desc => <<"Message with QoS 1 received rate in 5s.">> , mk( number(), #{ desc => <<"Message with QoS 1 received rate in 5s">>
, example => 0})}, , example => 0})},
{ 'messages.qos1.out.count' { 'messages.qos1.out.count'
, mk( integer(), #{ desc => <<"Message with QoS 1 sent count.">> , mk( integer(), #{ desc => <<"Message with QoS 1 sent count">>
, example => 0})}, , example => 0})},
{ 'messages.qos1.out.rate' { 'messages.qos1.out.rate'
, mk( number(), #{ desc => <<"Message with QoS 1 sent rate in 5s.">> , mk( number(), #{ desc => <<"Message with QoS 1 sent rate in 5s">>
, example => 0})}, , example => 0})},
{ 'messages.qos2.in.count' { 'messages.qos2.in.count'
, mk( integer(), #{ desc => <<"Message with QoS 2 sent count.">> , mk( integer(), #{ desc => <<"Message with QoS 2 sent count">>
, example => 0})}, , example => 0})},
{ 'messages.qos2.in.rate' { 'messages.qos2.in.rate'
, mk( number(), #{ desc => <<"Message with QoS 2 received rate in 5s.">> , mk( number(), #{ desc => <<"Message with QoS 2 received rate in 5s">>
, example => 0})}, , example => 0})},
{ 'messages.qos2.out.count' { 'messages.qos2.out.count'
, mk( integer(), #{ desc => <<"Message with QoS 2 sent count.">> , mk( integer(), #{ desc => <<"Message with QoS 2 sent count">>
, example => 0})}, , example => 0})},
{ 'messages.qos2.out.rate' { 'messages.qos2.out.rate'
, mk( number(), #{ desc => <<"Message with QoS 2 sent rate in 5s.">> , mk( number(), #{ desc => <<"Message with QoS 2 sent rate in 5s">>
, example => 0})} , example => 0})}
]. ].
@ -205,7 +209,7 @@ topic(In) ->
Desc = <<"Raw topic string">>, Desc = <<"Raw topic string">>,
Example = "testtopic/1"; Example = "testtopic/1";
path -> path ->
Desc = <<"Notice Topic string in url path must encode">>, Desc = <<"Notice: Topic string in url path must be encoded">>,
Example = "testtopic%2F1" Example = "testtopic%2F1"
end, end,
{ topic { topic
@ -238,39 +242,31 @@ reset_examples() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
topic_metrics(get, _) -> topic_metrics(get, _) ->
case cluster_accumulation_metrics() of get_cluster_response([]);
{error, Reason} ->
{500, Reason};
{ok, Metrics} ->
{200, Metrics}
end;
topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) -> topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) ->
case reset(Topic) of case reset(Topic) of
ok -> {204}; ok ->
{error, Reason} -> reason2httpresp(Reason) get_cluster_response([Topic]);
{error, Reason} ->
reason2httpresp(Reason)
end; end;
topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) -> topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) ->
reset(), reset(),
{204}; get_cluster_response([]);
topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) -> topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) ->
{400, 'BAD_REQUEST', <<"Topic can not be empty">>}; {400, 'BAD_REQUEST', <<"Topic can not be empty">>};
topic_metrics(post, #{body := #{<<"topic">> := Topic}}) -> topic_metrics(post, #{body := #{<<"topic">> := Topic}}) ->
case emqx_modules_conf:add_topic_metrics(Topic) of case emqx_modules_conf:add_topic_metrics(Topic) of
{ok, Topic} -> {ok, Topic} ->
{204}; get_cluster_response([Topic]);
{error, Reason} -> {error, Reason} ->
reason2httpresp(Reason) reason2httpresp(Reason)
end. end.
operate_topic_metrics(get, #{bindings := #{topic := Topic0}}) -> operate_topic_metrics(get, #{bindings := #{topic := Topic0}}) ->
case cluster_accumulation_metrics(emqx_http_lib:uri_decode(Topic0)) of get_cluster_response([emqx_http_lib:uri_decode(Topic0)]);
{ok, Metrics} ->
{200, Metrics};
{error, Reason} ->
reason2httpresp(Reason)
end;
operate_topic_metrics(delete, #{bindings := #{topic := Topic0}}) -> operate_topic_metrics(delete, #{bindings := #{topic := Topic0}}) ->
case emqx_modules_conf:remove_topic_metrics(emqx_http_lib:uri_decode(Topic0)) of case emqx_modules_conf:remove_topic_metrics(emqx_http_lib:uri_decode(Topic0)) of
@ -297,7 +293,8 @@ cluster_accumulation_metrics(Topic) ->
{SuccResList, []} -> {SuccResList, []} ->
case lists:filter(fun({error, _}) -> false; (_) -> true case lists:filter(fun({error, _}) -> false; (_) -> true
end, SuccResList) of end, SuccResList) of
[] -> {error, topic_not_found}; [] ->
{error, topic_not_found};
TopicMetrics -> TopicMetrics ->
NTopicMetrics = [ [T] || T <- TopicMetrics], NTopicMetrics = [ [T] || T <- TopicMetrics],
[AccMetrics] = accumulate_nodes_metrics(NTopicMetrics), [AccMetrics] = accumulate_nodes_metrics(NTopicMetrics),
@ -389,3 +386,13 @@ reason2httpresp(topic_not_found) ->
reason2httpresp(not_found) -> reason2httpresp(not_found) ->
Msg = <<"Topic not found">>, Msg = <<"Topic not found">>,
{404, #{code => ?ERROR_TOPIC, message => Msg}}. {404, #{code => ?ERROR_TOPIC, message => Msg}}.
get_cluster_response(Args) ->
case erlang:apply(?MODULE, cluster_accumulation_metrics, Args) of
{error, {badrpc, RPCReason}} ->
{500, RPCReason};
{error, Reason} when is_atom(Reason) ->
reason2httpresp(Reason);
{ok, Metrics} ->
{200, Metrics}
end.