diff --git a/.github/workflows/run_api_tests.yaml b/.github/workflows/run_api_tests.yaml index e8ab99f57..c5a8b94cb 100644 --- a/.github/workflows/run_api_tests.yaml +++ b/.github/workflows/run_api_tests.yaml @@ -86,7 +86,7 @@ jobs: - uses: actions/checkout@v2 with: repository: emqx/emqx-fvt - ref: 1.0.3-dev2 + ref: 1.0.4-dev1 path: . - uses: actions/setup-java@v1 with: diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index ab333cf87..fded10e03 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -1799,6 +1799,7 @@ t_clients_api(_) -> %% kickout {204, _} = request(delete, "/gateway/mqttsn/clients/client_id_test1"), + timer:sleep(100), {200, #{data := []}} = request(get, "/gateway/mqttsn/clients"), send_disconnect_msg(Socket, undefined), diff --git a/apps/emqx_modules/src/emqx_topic_metrics_api.erl b/apps/emqx_modules/src/emqx_topic_metrics_api.erl index 9f826a8f9..24c516f60 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics_api.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics_api.erl @@ -31,6 +31,9 @@ , operate_topic_metrics/2 ]). +-export([ cluster_accumulation_metrics/0 + , cluster_accumulation_metrics/1]). + -export([ api_spec/0 , paths/0 , schema/1 @@ -40,6 +43,7 @@ -define(ERROR_TOPIC, 'ERROR_TOPIC'). -define(EXCEED_LIMIT, 'EXCEED_LIMIT'). -define(BAD_TOPIC, 'BAD_TOPIC'). +-define(BAD_RPC, 'BAD_RPC'). -define(BAD_REQUEST, 'BAD_REQUEST'). -define(API_TAG_MQTT, [<<"mqtt">>]). @@ -59,16 +63,16 @@ schema("/mqtt/topic_metrics") -> #{ description => <<"List topic metrics">> , tags => ?API_TAG_MQTT , responses => - #{200 => mk(array(hoconsc:ref(topic_metrics)), #{ desc => <<"List topic metrics">>})} + #{200 => mk(array(hoconsc:ref(topic_metrics)), #{ desc => <<"List all topic metrics">>})} } , put => - #{ description => <<"Reset topic metrics by topic name, or all">> + #{ description => <<"Reset topic metrics by topic name. Or reset all Topic Metrics">> , tags => ?API_TAG_MQTT , 'requestBody' => emqx_dashboard_swagger:schema_with_examples( ref(reset), reset_examples()) , responses => - #{ 204 => <<"Reset topic metrics success">> + #{ 204 => <<"Reset topic metrics successfully">> , 404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>) } } @@ -78,8 +82,8 @@ schema("/mqtt/topic_metrics") -> , 'requestBody' => [topic(body)] , responses => #{ 204 => <<"Create topic metrics success">> - , 409 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT], <<"Topic metrics max limit">>) - , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST, ?BAD_TOPIC], <<"Topic metrics already exist or bad topic">>) + , 409 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT], <<"Topic metrics exceeded max limit 512">>) + , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST, ?BAD_TOPIC], <<"Topic metrics already existed or bad topic">>) } } }; @@ -108,7 +112,7 @@ schema("/mqtt/topic_metrics/:topic") -> fields(reset) -> [ {topic , mk( binary() - , #{ desc => <<"Topic Name. If this paramter is not present, all created topic metrics will be reset.">> + , #{ desc => <<"Topic Name. If this paramter is not present, all created topic metrics will be reseted">> , example => <<"testtopic/1">> , nullable => true})} , {action @@ -122,80 +126,80 @@ fields(reset) -> fields(topic_metrics) -> [ { topic , mk( binary() - , #{ desc => <<"MQTT Topic">> + , #{ desc => <<"Topic Name">> , example => <<"testtopic/1">> , nullable => false})}, { create_time , mk( emqx_schema:rfc3339_system_time() - , #{ desc => <<"Date time, rfc3339">> + , #{ desc => <<"Topic Metrics created date time, in rfc3339">> , nullable => false , example => <<"2022-01-14T21:48:47+08:00">>})}, { reset_time , mk( emqx_schema:rfc3339_system_time() - , #{ desc => <<"Nullable. Date time, rfc3339.">> + , #{ desc => <<"Topic Metrics reset date time, in rfc3339. Nullable if never reseted">> , nullable => true , example => <<"2022-01-14T21:48:47+08:00">>})}, { metrics , mk( ref(metrics) - , #{ desc => <<"MQTT Topic Metrics">> + , #{ desc => <<"Topic Metrics fields">> , nullable => false}) } ]; fields(metrics) -> [ { 'messages.dropped.count' - , mk( integer(), #{ desc => <<"Message dropped count.">> + , mk( integer(), #{ desc => <<"Message dropped count">> , example => 0})}, { 'messages.dropped.rate' - , mk( number(), #{ desc => <<"Message dropped rate in 5s.">> + , mk( number(), #{ desc => <<"Message dropped rate in 5s">> , example => 0})}, { 'messages.in.count' - , mk( integer(), #{ desc => <<"Message received count.">> + , mk( integer(), #{ desc => <<"Message received count">> , example => 0})}, { 'messages.in.rate' , mk( number(), #{ desc => <<"Message received rate in 5s">> , example => 0})}, { 'messages.out.count' - , mk( integer(), #{ desc => <<"Message sent count.">> + , mk( integer(), #{ desc => <<"Message sent count">> , example => 0})}, { 'messages.out.rate' - , mk( number(), #{ desc => <<"Message sent rate in 5s.">> + , mk( number(), #{ desc => <<"Message sent rate in 5s">> , example => 0})}, { 'messages.qos0.in.count' - , mk( integer(), #{ desc => <<"Message with QoS 0 received count.">> + , mk( integer(), #{ desc => <<"Message with QoS 0 received count">> , example => 0})}, { 'messages.qos0.in.rate' - , mk( number(), #{ desc => <<"Message with QoS 0 received rate in 5s.">> + , mk( number(), #{ desc => <<"Message with QoS 0 received rate in 5s">> , example => 0})}, { 'messages.qos0.out.count' - , mk( integer(), #{ desc => <<"Message with QoS 0 sent count.">> + , mk( integer(), #{ desc => <<"Message with QoS 0 sent count">> , example => 0})}, { 'messages.qos0.out.rate' - , mk( number(), #{ desc => <<"Message with QoS 0 sent rate in 5s.">> + , mk( number(), #{ desc => <<"Message with QoS 0 sent rate in 5s">> , example => 0})}, { 'messages.qos1.in.count' - , mk( integer(), #{ desc => <<"Message with QoS 1 received count.">> + , mk( integer(), #{ desc => <<"Message with QoS 1 received count">> , example => 0})}, { 'messages.qos1.in.rate' - , mk( number(), #{ desc => <<"Message with QoS 1 received rate in 5s.">> + , mk( number(), #{ desc => <<"Message with QoS 1 received rate in 5s">> , example => 0})}, { 'messages.qos1.out.count' - , mk( integer(), #{ desc => <<"Message with QoS 1 sent count.">> + , mk( integer(), #{ desc => <<"Message with QoS 1 sent count">> , example => 0})}, { 'messages.qos1.out.rate' - , mk( number(), #{ desc => <<"Message with QoS 1 sent rate in 5s.">> + , mk( number(), #{ desc => <<"Message with QoS 1 sent rate in 5s">> , example => 0})}, { 'messages.qos2.in.count' - , mk( integer(), #{ desc => <<"Message with QoS 2 sent count.">> + , mk( integer(), #{ desc => <<"Message with QoS 2 sent count">> , example => 0})}, { 'messages.qos2.in.rate' - , mk( number(), #{ desc => <<"Message with QoS 2 received rate in 5s.">> + , mk( number(), #{ desc => <<"Message with QoS 2 received rate in 5s">> , example => 0})}, { 'messages.qos2.out.count' - , mk( integer(), #{ desc => <<"Message with QoS 2 sent count.">> + , mk( integer(), #{ desc => <<"Message with QoS 2 sent count">> , example => 0})}, { 'messages.qos2.out.rate' - , mk( number(), #{ desc => <<"Message with QoS 2 sent rate in 5s.">> + , mk( number(), #{ desc => <<"Message with QoS 2 sent rate in 5s">> , example => 0})} ]. @@ -205,7 +209,7 @@ topic(In) -> Desc = <<"Raw topic string">>, Example = "testtopic/1"; path -> - Desc = <<"Notice Topic string in url path must encode">>, + Desc = <<"Notice: Topic string in url path must be encoded">>, Example = "testtopic%2F1" end, { topic @@ -238,39 +242,31 @@ reset_examples() -> %%-------------------------------------------------------------------- topic_metrics(get, _) -> - case cluster_accumulation_metrics() of - {error, Reason} -> - {500, Reason}; - {ok, Metrics} -> - {200, Metrics} - end; + get_cluster_response([]); topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) -> case reset(Topic) of - ok -> {204}; - {error, Reason} -> reason2httpresp(Reason) + ok -> + get_cluster_response([Topic]); + {error, Reason} -> + reason2httpresp(Reason) end; topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) -> reset(), - {204}; + get_cluster_response([]); topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) -> {400, 'BAD_REQUEST', <<"Topic can not be empty">>}; topic_metrics(post, #{body := #{<<"topic">> := Topic}}) -> case emqx_modules_conf:add_topic_metrics(Topic) of {ok, Topic} -> - {204}; + get_cluster_response([Topic]); {error, Reason} -> reason2httpresp(Reason) end. operate_topic_metrics(get, #{bindings := #{topic := Topic0}}) -> - case cluster_accumulation_metrics(emqx_http_lib:uri_decode(Topic0)) of - {ok, Metrics} -> - {200, Metrics}; - {error, Reason} -> - reason2httpresp(Reason) - end; + get_cluster_response([emqx_http_lib:uri_decode(Topic0)]); operate_topic_metrics(delete, #{bindings := #{topic := Topic0}}) -> case emqx_modules_conf:remove_topic_metrics(emqx_http_lib:uri_decode(Topic0)) of @@ -297,7 +293,8 @@ cluster_accumulation_metrics(Topic) -> {SuccResList, []} -> case lists:filter(fun({error, _}) -> false; (_) -> true end, SuccResList) of - [] -> {error, topic_not_found}; + [] -> + {error, topic_not_found}; TopicMetrics -> NTopicMetrics = [ [T] || T <- TopicMetrics], [AccMetrics] = accumulate_nodes_metrics(NTopicMetrics), @@ -389,3 +386,13 @@ reason2httpresp(topic_not_found) -> reason2httpresp(not_found) -> Msg = <<"Topic not found">>, {404, #{code => ?ERROR_TOPIC, message => Msg}}. + +get_cluster_response(Args) -> + case erlang:apply(?MODULE, cluster_accumulation_metrics, Args) of + {error, {badrpc, RPCReason}} -> + {500, RPCReason}; + {error, Reason} when is_atom(Reason) -> + reason2httpresp(Reason); + {ok, Metrics} -> + {200, Metrics} + end.