Merge pull request #6727 from k32/bpapi-topic-metrics
refactor(emqx_topic_metrics): Decorate remote procedure calls
This commit is contained in:
commit
727dec14c7
|
@ -256,7 +256,7 @@ uptime() ->
|
||||||
nodes_uuid() ->
|
nodes_uuid() ->
|
||||||
Nodes = lists:delete(node(), mria_mnesia:running_nodes()),
|
Nodes = lists:delete(node(), mria_mnesia:running_nodes()),
|
||||||
lists:foldl(fun(Node, Acc) ->
|
lists:foldl(fun(Node, Acc) ->
|
||||||
case emqx_modules_proto_v1:get_uuid(Node) of
|
case emqx_telemetry_proto_v1:get_uuid(Node) of
|
||||||
{badrpc, _Reason} ->
|
{badrpc, _Reason} ->
|
||||||
Acc;
|
Acc;
|
||||||
{ok, UUID} ->
|
{ok, UUID} ->
|
||||||
|
|
|
@ -153,15 +153,10 @@ enable_telemetry(Enable) ->
|
||||||
enable_telemetry(Node, Enable)
|
enable_telemetry(Node, Enable)
|
||||||
end, mria_mnesia:running_nodes()).
|
end, mria_mnesia:running_nodes()).
|
||||||
|
|
||||||
enable_telemetry(Node, Enable) when Node =:= node() ->
|
enable_telemetry(Node, true) ->
|
||||||
case Enable of
|
is_ok(emqx_telemetry_proto_v1:enable_telemetry(Node));
|
||||||
true ->
|
enable_telemetry(Node, false) ->
|
||||||
emqx_telemetry:enable();
|
is_ok(emqx_telemetry_proto_v1:disable_telemetry(Node)).
|
||||||
false ->
|
|
||||||
emqx_telemetry:disable()
|
|
||||||
end;
|
|
||||||
enable_telemetry(Node, Enable) ->
|
|
||||||
rpc_call(Node, ?MODULE, enable_telemetry, [Node, Enable]).
|
|
||||||
|
|
||||||
get_telemetry_status() ->
|
get_telemetry_status() ->
|
||||||
#{enabled => emqx_telemetry:get_status()}.
|
#{enabled => emqx_telemetry:get_status()}.
|
||||||
|
@ -170,8 +165,8 @@ get_telemetry_data() ->
|
||||||
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
|
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
|
||||||
TelemetryData.
|
TelemetryData.
|
||||||
|
|
||||||
rpc_call(Node, Module, Fun, Args) ->
|
is_ok(Result) ->
|
||||||
case rpc:call(Node, Module, Fun, Args) of
|
case Result of
|
||||||
{badrpc, Reason} -> {error, Reason};
|
{badrpc, Reason} -> {error, Reason};
|
||||||
Result -> Result
|
Result -> Result
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -183,7 +183,8 @@ operate_topic_metrics(delete, #{bindings := #{topic := Topic0}}) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
cluster_accumulation_metrics() ->
|
cluster_accumulation_metrics() ->
|
||||||
case multicall(emqx_topic_metrics, metrics, []) of
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
|
case emqx_topic_metrics_proto_v1:metrics(Nodes) of
|
||||||
{SuccResList, []} ->
|
{SuccResList, []} ->
|
||||||
{ok, accumulate_nodes_metrics(SuccResList)};
|
{ok, accumulate_nodes_metrics(SuccResList)};
|
||||||
{_, FailedNodes} ->
|
{_, FailedNodes} ->
|
||||||
|
@ -191,7 +192,8 @@ cluster_accumulation_metrics() ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
cluster_accumulation_metrics(Topic) ->
|
cluster_accumulation_metrics(Topic) ->
|
||||||
case multicall(emqx_topic_metrics, metrics, [Topic]) of
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
|
case emqx_topic_metrics_proto_v1:metrics(Nodes, Topic) of
|
||||||
{SuccResList, []} ->
|
{SuccResList, []} ->
|
||||||
case lists:filter(fun({error, _}) -> false; (_) -> true
|
case lists:filter(fun({error, _}) -> false; (_) -> true
|
||||||
end, SuccResList) of
|
end, SuccResList) of
|
||||||
|
@ -244,11 +246,13 @@ do_accumulation_metrics(MetricsIn, {MetricsAcc, _}) ->
|
||||||
end, #{}, Keys).
|
end, #{}, Keys).
|
||||||
|
|
||||||
reset() ->
|
reset() ->
|
||||||
_ = multicall(emqx_topic_metrics, reset, []),
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
|
_ = emqx_topic_metrics_proto_v1:reset(Nodes),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
reset(Topic) ->
|
reset(Topic) ->
|
||||||
case multicall(emqx_topic_metrics, reset, [Topic]) of
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
|
case emqx_topic_metrics_proto_v1:reset(Nodes, Topic) of
|
||||||
{SuccResList, []} ->
|
{SuccResList, []} ->
|
||||||
case lists:filter(fun({error, _}) -> true; (_) -> false
|
case lists:filter(fun({error, _}) -> true; (_) -> false
|
||||||
end, SuccResList) of
|
end, SuccResList) of
|
||||||
|
@ -262,9 +266,6 @@ reset(Topic) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% utils
|
%% utils
|
||||||
|
|
||||||
multicall(M, F, A) ->
|
|
||||||
emqx_rpc:multicall(mria_mnesia:running_nodes(), M, F, A).
|
|
||||||
|
|
||||||
reason2httpresp(quota_exceeded) ->
|
reason2httpresp(quota_exceeded) ->
|
||||||
Msg = list_to_binary(
|
Msg = list_to_binary(
|
||||||
io_lib:format("Max topic metrics count is ~p",
|
io_lib:format("Max topic metrics count is ~p",
|
||||||
|
|
|
@ -14,13 +14,15 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_modules_proto_v1).
|
-module(emqx_telemetry_proto_v1).
|
||||||
|
|
||||||
-behaviour(emqx_bpapi).
|
-behaviour(emqx_bpapi).
|
||||||
|
|
||||||
-export([ introduced_in/0
|
-export([ introduced_in/0
|
||||||
|
|
||||||
, get_uuid/1
|
, get_uuid/1
|
||||||
|
, enable_telemetry/1
|
||||||
|
, disable_telemetry/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("emqx/include/bpapi.hrl").
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
@ -31,3 +33,11 @@ introduced_in() ->
|
||||||
-spec get_uuid(node()) -> {ok, binary()} | emqx_rpc:badrpc().
|
-spec get_uuid(node()) -> {ok, binary()} | emqx_rpc:badrpc().
|
||||||
get_uuid(Node) ->
|
get_uuid(Node) ->
|
||||||
rpc:call(Node, emqx_telemetry, get_uuid, []).
|
rpc:call(Node, emqx_telemetry, get_uuid, []).
|
||||||
|
|
||||||
|
-spec enable_telemetry(node()) -> _.
|
||||||
|
enable_telemetry(Node) ->
|
||||||
|
rpc:call(Node, emqx_telemetry, enable, []).
|
||||||
|
|
||||||
|
-spec disable_telemetry(node()) -> _.
|
||||||
|
disable_telemetry(Node) ->
|
||||||
|
rpc:call(Node, emqx_telemetry, disable, []).
|
|
@ -0,0 +1,48 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_topic_metrics_proto_v1).
|
||||||
|
|
||||||
|
-behaviour(emqx_bpapi).
|
||||||
|
|
||||||
|
-export([ introduced_in/0
|
||||||
|
|
||||||
|
, metrics/1
|
||||||
|
, metrics/2
|
||||||
|
, reset/1
|
||||||
|
, reset/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
|
||||||
|
introduced_in() ->
|
||||||
|
"5.0.0".
|
||||||
|
|
||||||
|
-spec metrics([node()]) -> emqx_rpc:multicall_result().
|
||||||
|
metrics(Nodes) ->
|
||||||
|
emqx_rpc:multicall(Nodes, emqx_topic_metrics, metrics, []).
|
||||||
|
|
||||||
|
-spec metrics([node()], emqx_types:topic()) -> emqx_rpc:multicall_result().
|
||||||
|
metrics(Nodes, Topic) ->
|
||||||
|
emqx_rpc:multicall(Nodes, emqx_topic_metrics, metrics, [Topic]).
|
||||||
|
|
||||||
|
-spec reset([node()]) -> emqx_rpc:multicall_result().
|
||||||
|
reset(Nodes) ->
|
||||||
|
emqx_rpc:multicall(Nodes, emqx_topic_metrics, reset, []).
|
||||||
|
|
||||||
|
-spec reset([node()], emqx_types:topic()) -> emqx_rpc:multicall_result().
|
||||||
|
reset(Nodes, Topic) ->
|
||||||
|
emqx_rpc:multicall(Nodes, emqx_topic_metrics, reset, [Topic]).
|
|
@ -43,11 +43,13 @@ t_uuid(_) ->
|
||||||
{ok, UUID2} = emqx_telemetry:get_uuid(),
|
{ok, UUID2} = emqx_telemetry:get_uuid(),
|
||||||
emqx_telemetry:disable(),
|
emqx_telemetry:disable(),
|
||||||
emqx_telemetry:enable(),
|
emqx_telemetry:enable(),
|
||||||
|
emqx_telemetry_proto_v1:disable_telemetry(node()),
|
||||||
|
emqx_telemetry_proto_v1:enable_telemetry(node()),
|
||||||
{ok, UUID3} = emqx_telemetry:get_uuid(),
|
{ok, UUID3} = emqx_telemetry:get_uuid(),
|
||||||
{ok, UUID4} = emqx_modules_proto_v1:get_uuid(node()),
|
{ok, UUID4} = emqx_telemetry_proto_v1:get_uuid(node()),
|
||||||
?assertEqual(UUID2, UUID3),
|
?assertEqual(UUID2, UUID3),
|
||||||
?assertEqual(UUID3, UUID4),
|
?assertEqual(UUID3, UUID4),
|
||||||
?assertMatch({badrpc, nodedown}, emqx_modules_proto_v1:get_uuid('fake@node')).
|
?assertMatch({badrpc, nodedown}, emqx_telemetry_proto_v1:get_uuid('fake@node')).
|
||||||
|
|
||||||
t_official_version(_) ->
|
t_official_version(_) ->
|
||||||
true = emqx_telemetry:official_version("0.0.0"),
|
true = emqx_telemetry:official_version("0.0.0"),
|
||||||
|
|
Loading…
Reference in New Issue