Merge pull request #6792 from lafirest/fix/slow_subs_api
fix(emqx_slow_subs_api): return error when the module is not enable
This commit is contained in:
commit
48b7cc34b6
|
@ -50,27 +50,7 @@ clear_history(_Bindings, _Params) ->
|
|||
return(ok).
|
||||
|
||||
get_history(_Bindings, _Params) ->
|
||||
Nodes = ekka_mnesia:running_nodes(),
|
||||
Fun = fun(Node, Acc) ->
|
||||
NodeRankL = rpc_call(Node,
|
||||
?MODULE,
|
||||
?FUNCTION_NAME,
|
||||
[],
|
||||
[],
|
||||
?DEFAULT_RPC_TIMEOUT),
|
||||
NodeRankL ++ Acc
|
||||
end,
|
||||
|
||||
RankL = lists:foldl(Fun, [], Nodes),
|
||||
|
||||
SortFun = fun(#{timespan := A}, #{timespan := B}) ->
|
||||
A > B
|
||||
end,
|
||||
|
||||
SortedL = lists:sort(SortFun, RankL),
|
||||
SortedL2 = lists:sublist(SortedL, ?MAX_SIZE),
|
||||
|
||||
return({ok, SortedL2}).
|
||||
execute_when_enabled(fun do_get_history/0).
|
||||
|
||||
get_history() ->
|
||||
Node = node(),
|
||||
|
@ -84,10 +64,36 @@ get_history() ->
|
|||
, timespan => TimeSpan
|
||||
, last_update_time => LastUpdateTime
|
||||
}
|
||||
end,
|
||||
end,
|
||||
|
||||
lists:map(ConvFun, RankL).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
do_get_history() ->
|
||||
Nodes = ekka_mnesia:running_nodes(),
|
||||
Fun = fun(Node, Acc) ->
|
||||
NodeRankL = rpc_call(Node,
|
||||
?MODULE,
|
||||
get_history,
|
||||
[],
|
||||
[],
|
||||
?DEFAULT_RPC_TIMEOUT),
|
||||
NodeRankL ++ Acc
|
||||
end,
|
||||
|
||||
RankL = lists:foldl(Fun, [], Nodes),
|
||||
|
||||
SortFun = fun(#{timespan := A}, #{timespan := B}) ->
|
||||
A > B
|
||||
end,
|
||||
|
||||
SortedL = lists:sort(SortFun, RankL),
|
||||
SortedL2 = lists:sublist(SortedL, ?MAX_SIZE),
|
||||
|
||||
return({ok, SortedL2}).
|
||||
|
||||
rpc_call(Node, M, F, A, _ErrorR, _T) when Node =:= node() ->
|
||||
erlang:apply(M, F, A);
|
||||
|
||||
|
@ -96,3 +102,15 @@ rpc_call(Node, M, F, A, ErrorR, T) ->
|
|||
{badrpc, _} -> ErrorR;
|
||||
Res -> Res
|
||||
end.
|
||||
|
||||
-ifdef(EMQX_ENTERPRISE).
|
||||
execute_when_enabled(Fun) ->
|
||||
Fun().
|
||||
-else.
|
||||
%% this code from emqx_mod_api_topics_metrics:execute_when_enabled
|
||||
execute_when_enabled(Fun) ->
|
||||
case emqx_modules:find_module(emqx_mod_slow_subs) of
|
||||
[{_, true}] -> Fun();
|
||||
_ -> return({error, module_not_loaded})
|
||||
end.
|
||||
-endif.
|
||||
|
|
|
@ -39,6 +39,8 @@ all() ->
|
|||
emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
ok = meck:new([emqx_modules], [passthrough, no_history, no_link]),
|
||||
ok = meck:expect(emqx_modules, find_module, fun(_) -> [{true, true}] end),
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
application:load(emqx_plugin_libs),
|
||||
emqx_ct_helpers:start_apps([emqx_modules, emqx_management, emqx_dashboard]),
|
||||
|
@ -46,6 +48,7 @@ init_per_suite(Config) ->
|
|||
|
||||
end_per_suite(Config) ->
|
||||
emqx_ct_helpers:stop_apps([emqx_management]),
|
||||
ok = meck:unload(emqx_modules),
|
||||
Config.
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
|
|
Loading…
Reference in New Issue