refactor(mgmt_trace): Decorate RPCs

This commit is contained in:
k32 2022-01-18 16:35:48 +01:00
parent 6e3b05d665
commit 4e9fb00a0e
4 changed files with 70 additions and 10 deletions

View File

@ -30,6 +30,7 @@
-export_type([ badrpc/0 -export_type([ badrpc/0
, call_result/0 , call_result/0
, cast_result/0 , cast_result/0
, multicall_result/1
, multicall_result/0 , multicall_result/0
, erpc/1 , erpc/1
, erpc_multicall/1 , erpc_multicall/1
@ -48,7 +49,9 @@
-type cast_result() :: true. -type cast_result() :: true.
-type multicall_result() :: {_Results :: [term()], _BadNodes :: [node()]}. -type multicall_result(Result) :: {[Result], _BadNodes :: [node()]}.
-type multicall_result() :: multicall_result(term()).
-type erpc(Ret) :: {ok, Ret} -type erpc(Ret) :: {ok, Ret}
| {throw, _Err} | {throw, _Err}

View File

@ -204,7 +204,7 @@ get_trace_filename(Name) ->
end end, end end,
transaction(Tran). transaction(Tran).
-spec trace_file(File :: list()) -> -spec trace_file(File :: file:filename_all()) ->
{ok, Node :: list(), Binary :: binary()} | {ok, Node :: list(), Binary :: binary()} |
{error, Node :: list(), Reason :: term()}. {error, Node :: list(), Reason :: term()}.
trace_file(File) -> trace_file(File) ->

View File

@ -241,7 +241,7 @@ trace(get, _Params) ->
List = lists:sort(fun(#{start_at := A}, #{start_at := B}) -> A > B end, List = lists:sort(fun(#{start_at := A}, #{start_at := B}) -> A > B end,
emqx_trace:format(List0)), emqx_trace:format(List0)),
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
TraceSize = cluster_call(?MODULE, get_trace_size, [], 30000), TraceSize = wrap_rpc(emqx_mgmt_trace_proto_v1:get_trace_size(Nodes)),
AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize), AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize),
Now = erlang:system_time(second), Now = erlang:system_time(second),
Traces = Traces =
@ -333,13 +333,12 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) ->
end, [], TraceFiles). end, [], TraceFiles).
collect_trace_file(TraceLog) -> collect_trace_file(TraceLog) ->
cluster_call(emqx_trace, trace_file, [TraceLog], 60000).
cluster_call(Mod, Fun, Args, Timeout) ->
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
{GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout), wrap_rpc(emqx_mgmt_trace_proto_v1:trace_file(Nodes, TraceLog)).
wrap_rpc({GoodRes, BadNodes}) ->
BadNodes =/= [] andalso BadNodes =/= [] andalso
?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes, mfa => {Mod, Fun, Args}}), ?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes}),
GoodRes. GoodRes.
stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) -> stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
@ -348,7 +347,7 @@ stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
Bytes = maps:get(<<"bytes">>, Query, 1000), Bytes = maps:get(<<"bytes">>, Query, 1000),
case to_node(Node0) of case to_node(Node0) of
{ok, Node} -> {ok, Node} ->
case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of case emqx_mgmt_trace_proto_v1:read_trace_file(Node, Name, Position, Bytes) of
{ok, Bin} -> {ok, Bin} ->
Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes}, Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
{200, #{meta => Meta, items => Bin}}; {200, #{meta => Meta, items => Bin}};
@ -368,6 +367,7 @@ stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
{error, not_found} -> {400, #{code => 'NODE_ERROR', message => <<"Node not found">>}} {error, not_found} -> {400, #{code => 'NODE_ERROR', message => <<"Node not found">>}}
end. end.
-spec get_trace_size() -> #{{node(), file:name_all()} => non_neg_integer()}.
get_trace_size() -> get_trace_size() ->
TraceDir = emqx_trace:trace_dir(), TraceDir = emqx_trace:trace_dir(),
Node = node(), Node = node(),
@ -381,6 +381,12 @@ get_trace_size() ->
end. end.
%% this is an rpc call for stream_log_file/2 %% this is an rpc call for stream_log_file/2
-spec read_trace_file( binary()
, non_neg_integer()
, non_neg_integer()
) -> {ok, binary()}
| {error, _}
| {eof, non_neg_integer()}.
read_trace_file(Name, Position, Limit) -> read_trace_file(Name, Position, Limit) ->
case emqx_trace:get_trace_filename(Name) of case emqx_trace:get_trace_filename(Name) of
{error, _} = Error -> Error; {error, _} = Error -> Error;

View File

@ -0,0 +1,51 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_trace_proto_v1).
-behaviour(emqx_bpapi).
-export([ introduced_in/0
, trace_file/2
, get_trace_size/1
, read_trace_file/4
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.0.0".
-spec get_trace_size([node()]) ->
emqx_rpc:multicall_result(#{{node(), file:name_all()} => non_neg_integer()}).
get_trace_size(Nodes) ->
rpc:multicall(Nodes, emqx_mgmt_api_trace, get_trace_size, [], 30000).
-spec trace_file([node()], file:name_all()) ->
emqx_rpc:multicall_result(
{ok, Node :: list(), Binary :: binary()} |
{error, Node :: list(), Reason :: term()}).
trace_file(Nodes, File) ->
rpc:multicall(Nodes, emqx_trace, trace_file, [File], 60000).
-spec read_trace_file(node(), binary(), non_neg_integer(), non_neg_integer()) ->
{ok, binary()}
| {error, _}
| {eof, non_neg_integer()}
| {badrpc, _}.
read_trace_file(Node, Name, Position, Limit) ->
rpc:call(Node, emqx_mgmt_api_trace, read_trace_file, [Name, Position, Limit]).