From 361ca5be422add97e6d61ec401f7a8e8de26ed44 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 10 Feb 2022 17:40:19 +0800 Subject: [PATCH] fix(emqx_slow_subs): change rpc call to bpapi --- apps/emqx/priv/bpapi.versions | 1 + apps/emqx/test/emqx_proper_types.erl | 24 ------------- .../emqx_slow_subs/src/emqx_slow_subs_api.erl | 30 +++++----------- .../src/proto/emqx_slow_subs_proto_v1.erl | 36 +++++++++++++++++++ 4 files changed, 45 insertions(+), 46 deletions(-) create mode 100644 apps/emqx_slow_subs/src/proto/emqx_slow_subs_proto_v1.erl diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 6b1926b30..1f040e4b3 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -16,3 +16,4 @@ {emqx_statsd,1}. {emqx_telemetry,1}. {emqx_topic_metrics,1}. +{emqx_slow_subs,1}. diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 5ddaa4d0d..4f54b9fed 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -337,30 +337,6 @@ normal_topic_filter() -> end end). -%% Type defined emqx_message_lantency_stats.erl - stats() -latency_stats() -> - Keys = [{threshold, number()}, - {ema, exp_moving_average()}, - {last_update_time, non_neg_integer()}, - {last_access_time, non_neg_integer()}, - {last_insert_value, non_neg_integer()} - ], - ?LET({Ks, M}, {Keys, map(limited_atom(), limited_any_term())}, - begin - maps:merge(maps:from_list(Ks), M) - end). - -%% Type defined emqx_moving_average.erl - ema() -exp_moving_average() -> - Keys = [{type, exponential}, - {average, number()}, - {coefficient, float()} - ], - ?LET({Ks, M}, {Keys, map(limited_atom(), limited_any_term())}, - begin - maps:merge(maps:from_list(Ks), M) - end). - %%-------------------------------------------------------------------- %% Basic Types %%-------------------------------------------------------------------- diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl index 271afa626..99adf1e53 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl @@ -90,24 +90,15 @@ conf_schema() -> hoconsc:mk(Ref, #{}). slow_subs(delete, _) -> - Nodes = mria_mnesia:running_nodes(), - _ = [rpc_call(Node, emqx_slow_subs, clear_history, [], ok, ?DEFAULT_RPC_TIMEOUT) - || Node <- Nodes], + _ = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:clear_history(Nodes) end), {204}; slow_subs(get, _) -> - Nodes = mria_mnesia:running_nodes(), - Fun = fun(Node, Acc) -> - NodeRankL = rpc_call(Node, - ?MODULE, - get_history, - [], - [], - ?DEFAULT_RPC_TIMEOUT), - NodeRankL ++ Acc + NodeRankL = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:get_history(Nodes) end), + Fun = fun({ok, L}, Acc) -> L ++ Acc; + (_, Acc) -> Acc end, - - RankL = lists:foldl(Fun, [], Nodes), + RankL = lists:foldl(Fun, [], NodeRankL), SortFun = fun(#{timespan := A}, #{timespan := B}) -> A > B @@ -141,11 +132,6 @@ settings(put, #{body := Body}) -> _ = emqx_slow_subs:update_settings(Body), {200, emqx:get_raw_config([slow_subs], #{})}. -rpc_call(Node, M, F, A, _ErrorR, _T) when Node =:= node() -> - erlang:apply(M, F, A); - -rpc_call(Node, M, F, A, ErrorR, T) -> - case rpc:call(Node, M, F, A, T) of - {badrpc, _} -> ErrorR; - Res -> Res - end. +rpc_call(Fun) -> + Nodes = mria_mnesia:running_nodes(), + Fun(Nodes). diff --git a/apps/emqx_slow_subs/src/proto/emqx_slow_subs_proto_v1.erl b/apps/emqx_slow_subs/src/proto/emqx_slow_subs_proto_v1.erl new file mode 100644 index 000000000..2e6fc7044 --- /dev/null +++ b/apps/emqx_slow_subs/src/proto/emqx_slow_subs_proto_v1.erl @@ -0,0 +1,36 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_slow_subs_proto_v1). + +-behaviour(emqx_bpapi). + +-export([introduced_in/0]). + +-export([clear_history/1, get_history/1]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec clear_history([node()]) -> emqx_rpc:erpc_multicall(map()). +clear_history(Nodes) -> + erpc:multicall(Nodes, emqx_slow_subs, clear_history, []). + +-spec get_history([node()]) -> emqx_rpc:erpc_multicall(map()). +get_history(Nodes) -> + erpc:multicall(Nodes, emqx_slow_subs_api, get_history, []).