fix(emqx_slow_subs): change rpc call to bpapi
This commit is contained in:
parent
b9884de1d0
commit
361ca5be42
|
@ -16,3 +16,4 @@
|
|||
{emqx_statsd,1}.
|
||||
{emqx_telemetry,1}.
|
||||
{emqx_topic_metrics,1}.
|
||||
{emqx_slow_subs,1}.
|
||||
|
|
|
@ -337,30 +337,6 @@ normal_topic_filter() ->
|
|||
end
|
||||
end).
|
||||
|
||||
%% Type defined emqx_message_lantency_stats.erl - stats()
|
||||
latency_stats() ->
|
||||
Keys = [{threshold, number()},
|
||||
{ema, exp_moving_average()},
|
||||
{last_update_time, non_neg_integer()},
|
||||
{last_access_time, non_neg_integer()},
|
||||
{last_insert_value, non_neg_integer()}
|
||||
],
|
||||
?LET({Ks, M}, {Keys, map(limited_atom(), limited_any_term())},
|
||||
begin
|
||||
maps:merge(maps:from_list(Ks), M)
|
||||
end).
|
||||
|
||||
%% Type defined emqx_moving_average.erl - ema()
|
||||
exp_moving_average() ->
|
||||
Keys = [{type, exponential},
|
||||
{average, number()},
|
||||
{coefficient, float()}
|
||||
],
|
||||
?LET({Ks, M}, {Keys, map(limited_atom(), limited_any_term())},
|
||||
begin
|
||||
maps:merge(maps:from_list(Ks), M)
|
||||
end).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Basic Types
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -90,24 +90,15 @@ conf_schema() ->
|
|||
hoconsc:mk(Ref, #{}).
|
||||
|
||||
slow_subs(delete, _) ->
|
||||
Nodes = mria_mnesia:running_nodes(),
|
||||
_ = [rpc_call(Node, emqx_slow_subs, clear_history, [], ok, ?DEFAULT_RPC_TIMEOUT)
|
||||
|| Node <- Nodes],
|
||||
_ = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:clear_history(Nodes) end),
|
||||
{204};
|
||||
|
||||
slow_subs(get, _) ->
|
||||
Nodes = mria_mnesia:running_nodes(),
|
||||
Fun = fun(Node, Acc) ->
|
||||
NodeRankL = rpc_call(Node,
|
||||
?MODULE,
|
||||
get_history,
|
||||
[],
|
||||
[],
|
||||
?DEFAULT_RPC_TIMEOUT),
|
||||
NodeRankL ++ Acc
|
||||
NodeRankL = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:get_history(Nodes) end),
|
||||
Fun = fun({ok, L}, Acc) -> L ++ Acc;
|
||||
(_, Acc) -> Acc
|
||||
end,
|
||||
|
||||
RankL = lists:foldl(Fun, [], Nodes),
|
||||
RankL = lists:foldl(Fun, [], NodeRankL),
|
||||
|
||||
SortFun = fun(#{timespan := A}, #{timespan := B}) ->
|
||||
A > B
|
||||
|
@ -141,11 +132,6 @@ settings(put, #{body := Body}) ->
|
|||
_ = emqx_slow_subs:update_settings(Body),
|
||||
{200, emqx:get_raw_config([slow_subs], #{})}.
|
||||
|
||||
rpc_call(Node, M, F, A, _ErrorR, _T) when Node =:= node() ->
|
||||
erlang:apply(M, F, A);
|
||||
|
||||
rpc_call(Node, M, F, A, ErrorR, T) ->
|
||||
case rpc:call(Node, M, F, A, T) of
|
||||
{badrpc, _} -> ErrorR;
|
||||
Res -> Res
|
||||
end.
|
||||
rpc_call(Fun) ->
|
||||
Nodes = mria_mnesia:running_nodes(),
|
||||
Fun(Nodes).
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_slow_subs_proto_v1).
|
||||
|
||||
-behaviour(emqx_bpapi).
|
||||
|
||||
-export([introduced_in/0]).
|
||||
|
||||
-export([clear_history/1, get_history/1]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
introduced_in() ->
|
||||
"5.0.0".
|
||||
|
||||
-spec clear_history([node()]) -> emqx_rpc:erpc_multicall(map()).
|
||||
clear_history(Nodes) ->
|
||||
erpc:multicall(Nodes, emqx_slow_subs, clear_history, []).
|
||||
|
||||
-spec get_history([node()]) -> emqx_rpc:erpc_multicall(map()).
|
||||
get_history(Nodes) ->
|
||||
erpc:multicall(Nodes, emqx_slow_subs_api, get_history, []).
|
Loading…
Reference in New Issue