From e513583e709b2d428f6a38d716fc0852da2ed829 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Wed, 12 Jan 2022 23:52:32 +0100 Subject: [PATCH 1/2] refactor(emqx_topic_metrics): Decorate remote procedure calls --- .../src/emqx_topic_metrics_api.erl | 15 +++--- .../src/proto/emqx_topic_metrics_proto_v1.erl | 48 +++++++++++++++++++ 2 files changed, 56 insertions(+), 7 deletions(-) create mode 100644 apps/emqx_modules/src/proto/emqx_topic_metrics_proto_v1.erl diff --git a/apps/emqx_modules/src/emqx_topic_metrics_api.erl b/apps/emqx_modules/src/emqx_topic_metrics_api.erl index a59822795..1e82f7a94 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics_api.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics_api.erl @@ -183,7 +183,8 @@ operate_topic_metrics(delete, #{bindings := #{topic := Topic0}}) -> %%-------------------------------------------------------------------- cluster_accumulation_metrics() -> - case multicall(emqx_topic_metrics, metrics, []) of + Nodes = mria_mnesia:running_nodes(), + case emqx_topic_metrics_proto_v1:metrics(Nodes) of {SuccResList, []} -> {ok, accumulate_nodes_metrics(SuccResList)}; {_, FailedNodes} -> @@ -191,7 +192,8 @@ cluster_accumulation_metrics() -> end. cluster_accumulation_metrics(Topic) -> - case multicall(emqx_topic_metrics, metrics, [Topic]) of + Nodes = mria_mnesia:running_nodes(), + case emqx_topic_metrics_proto_v1:metrics(Nodes, Topic) of {SuccResList, []} -> case lists:filter(fun({error, _}) -> false; (_) -> true end, SuccResList) of @@ -244,11 +246,13 @@ do_accumulation_metrics(MetricsIn, {MetricsAcc, _}) -> end, #{}, Keys). reset() -> - _ = multicall(emqx_topic_metrics, reset, []), + Nodes = mria_mnesia:running_nodes(), + _ = emqx_topic_metrics_proto_v1:reset(Nodes), ok. reset(Topic) -> - case multicall(emqx_topic_metrics, reset, [Topic]) of + Nodes = mria_mnesia:running_nodes(), + case emqx_topic_metrics_proto_v1:reset(Nodes, Topic) of {SuccResList, []} -> case lists:filter(fun({error, _}) -> true; (_) -> false end, SuccResList) of @@ -262,9 +266,6 @@ reset(Topic) -> %%-------------------------------------------------------------------- %% utils -multicall(M, F, A) -> - emqx_rpc:multicall(mria_mnesia:running_nodes(), M, F, A). - reason2httpresp(quota_exceeded) -> Msg = list_to_binary( io_lib:format("Max topic metrics count is ~p", diff --git a/apps/emqx_modules/src/proto/emqx_topic_metrics_proto_v1.erl b/apps/emqx_modules/src/proto/emqx_topic_metrics_proto_v1.erl new file mode 100644 index 000000000..643cb4c3c --- /dev/null +++ b/apps/emqx_modules/src/proto/emqx_topic_metrics_proto_v1.erl @@ -0,0 +1,48 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_topic_metrics_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + + , metrics/1 + , metrics/2 + , reset/1 + , reset/2 + ]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec metrics([node()]) -> emqx_rpc:multicall_result(). +metrics(Nodes) -> + emqx_rpc:multicall(Nodes, emqx_topic_metrics, metrics, []). + +-spec metrics([node()], emqx_types:topic()) -> emqx_rpc:multicall_result(). +metrics(Nodes, Topic) -> + emqx_rpc:multicall(Nodes, emqx_topic_metrics, metrics, [Topic]). + +-spec reset([node()]) -> emqx_rpc:multicall_result(). +reset(Nodes) -> + emqx_rpc:multicall(Nodes, emqx_topic_metrics, reset, []). + +-spec reset([node()], emqx_types:topic()) -> emqx_rpc:multicall_result(). +reset(Nodes, Topic) -> + emqx_rpc:multicall(Nodes, emqx_topic_metrics, reset, [Topic]). From 8570df075c247f7f4028b60fa4dc48cb1205e1ea Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Thu, 13 Jan 2022 00:35:49 +0100 Subject: [PATCH 2/2] refactor(emqx_telemetry): Decorate remote procedure calls --- apps/emqx_modules/src/emqx_telemetry.erl | 2 +- apps/emqx_modules/src/emqx_telemetry_api.erl | 17 ++++++----------- ...proto_v1.erl => emqx_telemetry_proto_v1.erl} | 12 +++++++++++- apps/emqx_modules/test/emqx_telemetry_SUITE.erl | 6 ++++-- 4 files changed, 22 insertions(+), 15 deletions(-) rename apps/emqx_modules/src/proto/{emqx_modules_proto_v1.erl => emqx_telemetry_proto_v1.erl} (76%) diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 3aa7829e7..3da9a6d9d 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -256,7 +256,7 @@ uptime() -> nodes_uuid() -> Nodes = lists:delete(node(), mria_mnesia:running_nodes()), lists:foldl(fun(Node, Acc) -> - case emqx_modules_proto_v1:get_uuid(Node) of + case emqx_telemetry_proto_v1:get_uuid(Node) of {badrpc, _Reason} -> Acc; {ok, UUID} -> diff --git a/apps/emqx_modules/src/emqx_telemetry_api.erl b/apps/emqx_modules/src/emqx_telemetry_api.erl index 887de4305..bc5404099 100644 --- a/apps/emqx_modules/src/emqx_telemetry_api.erl +++ b/apps/emqx_modules/src/emqx_telemetry_api.erl @@ -153,15 +153,10 @@ enable_telemetry(Enable) -> enable_telemetry(Node, Enable) end, mria_mnesia:running_nodes()). -enable_telemetry(Node, Enable) when Node =:= node() -> - case Enable of - true -> - emqx_telemetry:enable(); - false -> - emqx_telemetry:disable() - end; -enable_telemetry(Node, Enable) -> - rpc_call(Node, ?MODULE, enable_telemetry, [Node, Enable]). +enable_telemetry(Node, true) -> + is_ok(emqx_telemetry_proto_v1:enable_telemetry(Node)); +enable_telemetry(Node, false) -> + is_ok(emqx_telemetry_proto_v1:disable_telemetry(Node)). get_telemetry_status() -> #{enabled => emqx_telemetry:get_status()}. @@ -170,8 +165,8 @@ get_telemetry_data() -> {ok, TelemetryData} = emqx_telemetry:get_telemetry(), TelemetryData. -rpc_call(Node, Module, Fun, Args) -> - case rpc:call(Node, Module, Fun, Args) of +is_ok(Result) -> + case Result of {badrpc, Reason} -> {error, Reason}; Result -> Result end. diff --git a/apps/emqx_modules/src/proto/emqx_modules_proto_v1.erl b/apps/emqx_modules/src/proto/emqx_telemetry_proto_v1.erl similarity index 76% rename from apps/emqx_modules/src/proto/emqx_modules_proto_v1.erl rename to apps/emqx_modules/src/proto/emqx_telemetry_proto_v1.erl index b9cfaa7f5..81aa07b65 100644 --- a/apps/emqx_modules/src/proto/emqx_modules_proto_v1.erl +++ b/apps/emqx_modules/src/proto/emqx_telemetry_proto_v1.erl @@ -14,13 +14,15 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_modules_proto_v1). +-module(emqx_telemetry_proto_v1). -behaviour(emqx_bpapi). -export([ introduced_in/0 , get_uuid/1 + , enable_telemetry/1 + , disable_telemetry/1 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -31,3 +33,11 @@ introduced_in() -> -spec get_uuid(node()) -> {ok, binary()} | emqx_rpc:badrpc(). get_uuid(Node) -> rpc:call(Node, emqx_telemetry, get_uuid, []). + +-spec enable_telemetry(node()) -> _. +enable_telemetry(Node) -> + rpc:call(Node, emqx_telemetry, enable, []). + +-spec disable_telemetry(node()) -> _. +disable_telemetry(Node) -> + rpc:call(Node, emqx_telemetry, disable, []). diff --git a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl index a32895fc1..43c262989 100644 --- a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -43,11 +43,13 @@ t_uuid(_) -> {ok, UUID2} = emqx_telemetry:get_uuid(), emqx_telemetry:disable(), emqx_telemetry:enable(), + emqx_telemetry_proto_v1:disable_telemetry(node()), + emqx_telemetry_proto_v1:enable_telemetry(node()), {ok, UUID3} = emqx_telemetry:get_uuid(), - {ok, UUID4} = emqx_modules_proto_v1:get_uuid(node()), + {ok, UUID4} = emqx_telemetry_proto_v1:get_uuid(node()), ?assertEqual(UUID2, UUID3), ?assertEqual(UUID3, UUID4), - ?assertMatch({badrpc, nodedown}, emqx_modules_proto_v1:get_uuid('fake@node')). + ?assertMatch({badrpc, nodedown}, emqx_telemetry_proto_v1:get_uuid('fake@node')). t_official_version(_) -> true = emqx_telemetry:official_version("0.0.0"),