feat: http trace api for 5.0 (#6200)
* feat: port log trace http api for 5.0 * fix: name must printable unicode len<256 * fix: check-nl-at-eof warning * fix: handler_id always atom
This commit is contained in:
parent
6a60c17970
commit
ef0e440d27
|
@ -81,7 +81,7 @@
|
||||||
-define(SUBSCRIPTION, emqx_subscription).
|
-define(SUBSCRIPTION, emqx_subscription).
|
||||||
|
|
||||||
%% Guards
|
%% Guards
|
||||||
-define(is_subid(Id), (is_binary(Id) orelse is_atom(Id))).
|
-define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))).
|
||||||
|
|
||||||
-spec(start_link(atom(), pos_integer()) -> startlink_ret()).
|
-spec(start_link(atom(), pos_integer()) -> startlink_ret()).
|
||||||
start_link(Pool, Id) ->
|
start_link(Pool, Id) ->
|
||||||
|
@ -117,15 +117,17 @@ subscribe(Topic) when is_binary(Topic) ->
|
||||||
subscribe(Topic, undefined).
|
subscribe(Topic, undefined).
|
||||||
|
|
||||||
-spec(subscribe(emqx_types:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok).
|
-spec(subscribe(emqx_types:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok).
|
||||||
subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
|
subscribe(Topic, SubId) when is_binary(Topic), ?IS_SUBID(SubId) ->
|
||||||
subscribe(Topic, SubId, ?DEFAULT_SUBOPTS);
|
subscribe(Topic, SubId, ?DEFAULT_SUBOPTS);
|
||||||
subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) ->
|
subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) ->
|
||||||
subscribe(Topic, undefined, SubOpts).
|
subscribe(Topic, undefined, SubOpts).
|
||||||
|
|
||||||
-spec(subscribe(emqx_types:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok).
|
-spec(subscribe(emqx_types:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok).
|
||||||
subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts0) ->
|
subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?IS_SUBID(SubId), is_map(SubOpts0) ->
|
||||||
SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
|
SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
|
||||||
case ets:member(?SUBOPTION, {SubPid = self(), Topic}) of
|
_ = emqx_trace:subscribe(Topic, SubId, SubOpts),
|
||||||
|
SubPid = self(),
|
||||||
|
case ets:member(?SUBOPTION, {SubPid, Topic}) of
|
||||||
false -> %% New
|
false -> %% New
|
||||||
ok = emqx_broker_helper:register_sub(SubPid, SubId),
|
ok = emqx_broker_helper:register_sub(SubPid, SubId),
|
||||||
do_subscribe(Topic, SubPid, with_subid(SubId, SubOpts));
|
do_subscribe(Topic, SubPid, with_subid(SubId, SubOpts));
|
||||||
|
@ -171,6 +173,7 @@ unsubscribe(Topic) when is_binary(Topic) ->
|
||||||
case ets:lookup(?SUBOPTION, {SubPid, Topic}) of
|
case ets:lookup(?SUBOPTION, {SubPid, Topic}) of
|
||||||
[{_, SubOpts}] ->
|
[{_, SubOpts}] ->
|
||||||
_ = emqx_broker_helper:reclaim_seq(Topic),
|
_ = emqx_broker_helper:reclaim_seq(Topic),
|
||||||
|
_ = emqx_trace:unsubscribe(Topic, SubOpts),
|
||||||
do_unsubscribe(Topic, SubPid, SubOpts);
|
do_unsubscribe(Topic, SubPid, SubOpts);
|
||||||
[] -> ok
|
[] -> ok
|
||||||
end.
|
end.
|
||||||
|
@ -198,7 +201,7 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) ->
|
||||||
|
|
||||||
-spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
|
-spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
|
||||||
publish(Msg) when is_record(Msg, message) ->
|
publish(Msg) when is_record(Msg, message) ->
|
||||||
_ = emqx_tracer:trace(publish, Msg),
|
_ = emqx_trace:publish(Msg),
|
||||||
emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
|
emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
|
||||||
case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
|
case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
|
||||||
#message{headers = #{allow_publish := false}} ->
|
#message{headers = #{allow_publish := false}} ->
|
||||||
|
@ -267,7 +270,7 @@ aggre(Routes) ->
|
||||||
end, [], Routes).
|
end, [], Routes).
|
||||||
|
|
||||||
%% @doc Forward message to another node.
|
%% @doc Forward message to another node.
|
||||||
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync|async)
|
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync | async)
|
||||||
-> emqx_types:deliver_result()).
|
-> emqx_types:deliver_result()).
|
||||||
forward(Node, To, Delivery, async) ->
|
forward(Node, To, Delivery, async) ->
|
||||||
case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of
|
case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of
|
||||||
|
@ -380,14 +383,14 @@ subscriptions(SubId) ->
|
||||||
-spec(subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean()).
|
-spec(subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean()).
|
||||||
subscribed(SubPid, Topic) when is_pid(SubPid) ->
|
subscribed(SubPid, Topic) when is_pid(SubPid) ->
|
||||||
ets:member(?SUBOPTION, {SubPid, Topic});
|
ets:member(?SUBOPTION, {SubPid, Topic});
|
||||||
subscribed(SubId, Topic) when ?is_subid(SubId) ->
|
subscribed(SubId, Topic) when ?IS_SUBID(SubId) ->
|
||||||
SubPid = emqx_broker_helper:lookup_subpid(SubId),
|
SubPid = emqx_broker_helper:lookup_subpid(SubId),
|
||||||
ets:member(?SUBOPTION, {SubPid, Topic}).
|
ets:member(?SUBOPTION, {SubPid, Topic}).
|
||||||
|
|
||||||
-spec(get_subopts(pid(), emqx_types:topic()) -> maybe(emqx_types:subopts())).
|
-spec(get_subopts(pid(), emqx_types:topic()) -> maybe(emqx_types:subopts())).
|
||||||
get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) ->
|
get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) ->
|
||||||
lookup_value(?SUBOPTION, {SubPid, Topic});
|
lookup_value(?SUBOPTION, {SubPid, Topic});
|
||||||
get_subopts(SubId, Topic) when ?is_subid(SubId) ->
|
get_subopts(SubId, Topic) when ?IS_SUBID(SubId) ->
|
||||||
case emqx_broker_helper:lookup_subpid(SubId) of
|
case emqx_broker_helper:lookup_subpid(SubId) of
|
||||||
SubPid when is_pid(SubPid) ->
|
SubPid when is_pid(SubPid) ->
|
||||||
get_subopts(SubPid, Topic);
|
get_subopts(SubPid, Topic);
|
||||||
|
@ -455,7 +458,8 @@ handle_call({subscribe, Topic}, _From, State) ->
|
||||||
{reply, Ok, State};
|
{reply, Ok, State};
|
||||||
|
|
||||||
handle_call({subscribe, Topic, I}, _From, State) ->
|
handle_call({subscribe, Topic, I}, _From, State) ->
|
||||||
Ok = case get(Shard = {Topic, I}) of
|
Shard = {Topic, I},
|
||||||
|
Ok = case get(Shard) of
|
||||||
undefined ->
|
undefined ->
|
||||||
_ = put(Shard, true),
|
_ = put(Shard, true),
|
||||||
true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}),
|
true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}),
|
||||||
|
@ -512,4 +516,3 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
|
@ -287,7 +287,7 @@ handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = ConnState})
|
||||||
handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
|
handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
|
||||||
handle_out(connack, ?RC_PROTOCOL_ERROR, Channel);
|
handle_out(connack, ?RC_PROTOCOL_ERROR, Channel);
|
||||||
|
|
||||||
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
|
||||||
case pipeline([fun overload_protection/2,
|
case pipeline([fun overload_protection/2,
|
||||||
fun enrich_conninfo/2,
|
fun enrich_conninfo/2,
|
||||||
fun run_conn_hooks/2,
|
fun run_conn_hooks/2,
|
||||||
|
@ -297,6 +297,7 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
||||||
fun check_banned/2
|
fun check_banned/2
|
||||||
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
||||||
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
|
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
|
||||||
|
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
|
||||||
NChannel1 = NChannel#channel{
|
NChannel1 = NChannel#channel{
|
||||||
will_msg = emqx_packet:will_msg(NConnPkt),
|
will_msg = emqx_packet:will_msg(NConnPkt),
|
||||||
alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
|
alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
|
||||||
|
|
|
@ -679,7 +679,7 @@ next_incoming_msgs(Packets) ->
|
||||||
|
|
||||||
handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
|
handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
|
||||||
ok = inc_incoming_stats(Packet),
|
ok = inc_incoming_stats(Packet),
|
||||||
?SLOG(debug, #{msg => "RECV_packet", packet => Packet}),
|
?SLOG(debug, #{msg => "RECV_packet", packet => emqx_packet:format(Packet)}),
|
||||||
with_channel(handle_in, [Packet], State);
|
with_channel(handle_in, [Packet], State);
|
||||||
|
|
||||||
handle_incoming(FrameError, State) ->
|
handle_incoming(FrameError, State) ->
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
-behaviour(emqx_config_handler).
|
-behaviour(emqx_config_handler).
|
||||||
|
-elvis([{elvis_style, god_modules, disable}]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([ start_link/0
|
-export([ start_link/0
|
||||||
|
@ -78,10 +79,11 @@
|
||||||
id := logger:handler_id(),
|
id := logger:handler_id(),
|
||||||
level := logger:level(),
|
level := logger:level(),
|
||||||
dst := logger_dst(),
|
dst := logger_dst(),
|
||||||
|
filters := [{logger:filter_id(), logger:filter()}],
|
||||||
status := started | stopped
|
status := started | stopped
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-define(stopped_handlers, {?MODULE, stopped_handlers}).
|
-define(STOPPED_HANDLERS, {?MODULE, stopped_handlers}).
|
||||||
-define(CONF_PATH, [log]).
|
-define(CONF_PATH, [log]).
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
|
@ -238,19 +240,19 @@ get_log_handlers() ->
|
||||||
|
|
||||||
-spec(get_log_handlers(started | stopped) -> [logger_handler_info()]).
|
-spec(get_log_handlers(started | stopped) -> [logger_handler_info()]).
|
||||||
get_log_handlers(started) ->
|
get_log_handlers(started) ->
|
||||||
[log_hanlder_info(Conf, started) || Conf <- logger:get_handler_config()];
|
[log_handler_info(Conf, started) || Conf <- logger:get_handler_config()];
|
||||||
get_log_handlers(stopped) ->
|
get_log_handlers(stopped) ->
|
||||||
[log_hanlder_info(Conf, stopped) || Conf <- list_stopped_handler_config()].
|
[log_handler_info(Conf, stopped) || Conf <- list_stopped_handler_config()].
|
||||||
|
|
||||||
-spec(get_log_handler(logger:handler_id()) -> logger_handler_info()).
|
-spec(get_log_handler(logger:handler_id()) -> logger_handler_info()).
|
||||||
get_log_handler(HandlerId) ->
|
get_log_handler(HandlerId) ->
|
||||||
case logger:get_handler_config(HandlerId) of
|
case logger:get_handler_config(HandlerId) of
|
||||||
{ok, Conf} ->
|
{ok, Conf} ->
|
||||||
log_hanlder_info(Conf, started);
|
log_handler_info(Conf, started);
|
||||||
{error, _} ->
|
{error, _} ->
|
||||||
case read_stopped_handler_config(HandlerId) of
|
case read_stopped_handler_config(HandlerId) of
|
||||||
error -> {error, {not_found, HandlerId}};
|
error -> {error, {not_found, HandlerId}};
|
||||||
{ok, Conf} -> log_hanlder_info(Conf, stopped)
|
{ok, Conf} -> log_handler_info(Conf, stopped)
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -305,21 +307,21 @@ set_log_level(Level) ->
|
||||||
%% Internal Functions
|
%% Internal Functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
log_hanlder_info(#{id := Id, level := Level, module := logger_std_h,
|
log_handler_info(#{id := Id, level := Level, module := logger_std_h,
|
||||||
config := #{type := Type}}, Status) when
|
filters := Filters, config := #{type := Type}}, Status) when
|
||||||
Type =:= standard_io;
|
Type =:= standard_io;
|
||||||
Type =:= standard_error ->
|
Type =:= standard_error ->
|
||||||
#{id => Id, level => Level, dst => console, status => Status};
|
#{id => Id, level => Level, dst => console, status => Status, filters => Filters};
|
||||||
log_hanlder_info(#{id := Id, level := Level, module := logger_std_h,
|
log_handler_info(#{id := Id, level := Level, module := logger_std_h,
|
||||||
config := Config = #{type := file}}, Status) ->
|
filters := Filters, config := Config = #{type := file}}, Status) ->
|
||||||
#{id => Id, level => Level, status => Status,
|
#{id => Id, level => Level, status => Status, filters => Filters,
|
||||||
dst => maps:get(file, Config, atom_to_list(Id))};
|
dst => maps:get(file, Config, atom_to_list(Id))};
|
||||||
|
|
||||||
log_hanlder_info(#{id := Id, level := Level, module := logger_disk_log_h,
|
log_handler_info(#{id := Id, level := Level, module := logger_disk_log_h,
|
||||||
config := #{file := Filename}}, Status) ->
|
filters := Filters, config := #{file := Filename}}, Status) ->
|
||||||
#{id => Id, level => Level, dst => Filename, status => Status};
|
#{id => Id, level => Level, dst => Filename, status => Status, filters => Filters};
|
||||||
log_hanlder_info(#{id := Id, level := Level, module := _OtherModule}, Status) ->
|
log_handler_info(#{id := Id, level := Level, filters := Filters}, Status) ->
|
||||||
#{id => Id, level => Level, dst => unknown, status => Status}.
|
#{id => Id, level => Level, dst => unknown, status => Status, filters => Filters}.
|
||||||
|
|
||||||
%% set level for all log handlers in one command
|
%% set level for all log handlers in one command
|
||||||
set_all_log_handlers_level(Level) ->
|
set_all_log_handlers_level(Level) ->
|
||||||
|
@ -341,29 +343,29 @@ rollback([{ID, Level} | List]) ->
|
||||||
rollback([]) -> ok.
|
rollback([]) -> ok.
|
||||||
|
|
||||||
save_stopped_handler_config(HandlerId, Config) ->
|
save_stopped_handler_config(HandlerId, Config) ->
|
||||||
case persistent_term:get(?stopped_handlers, undefined) of
|
case persistent_term:get(?STOPPED_HANDLERS, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
persistent_term:put(?stopped_handlers, #{HandlerId => Config});
|
persistent_term:put(?STOPPED_HANDLERS, #{HandlerId => Config});
|
||||||
ConfList ->
|
ConfList ->
|
||||||
persistent_term:put(?stopped_handlers, ConfList#{HandlerId => Config})
|
persistent_term:put(?STOPPED_HANDLERS, ConfList#{HandlerId => Config})
|
||||||
end.
|
end.
|
||||||
read_stopped_handler_config(HandlerId) ->
|
read_stopped_handler_config(HandlerId) ->
|
||||||
case persistent_term:get(?stopped_handlers, undefined) of
|
case persistent_term:get(?STOPPED_HANDLERS, undefined) of
|
||||||
undefined -> error;
|
undefined -> error;
|
||||||
ConfList -> maps:find(HandlerId, ConfList)
|
ConfList -> maps:find(HandlerId, ConfList)
|
||||||
end.
|
end.
|
||||||
remove_stopped_handler_config(HandlerId) ->
|
remove_stopped_handler_config(HandlerId) ->
|
||||||
case persistent_term:get(?stopped_handlers, undefined) of
|
case persistent_term:get(?STOPPED_HANDLERS, undefined) of
|
||||||
undefined -> ok;
|
undefined -> ok;
|
||||||
ConfList ->
|
ConfList ->
|
||||||
case maps:find(HandlerId, ConfList) of
|
case maps:find(HandlerId, ConfList) of
|
||||||
error -> ok;
|
error -> ok;
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
persistent_term:put(?stopped_handlers, maps:remove(HandlerId, ConfList))
|
persistent_term:put(?STOPPED_HANDLERS, maps:remove(HandlerId, ConfList))
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
list_stopped_handler_config() ->
|
list_stopped_handler_config() ->
|
||||||
case persistent_term:get(?stopped_handlers, undefined) of
|
case persistent_term:get(?STOPPED_HANDLERS, undefined) of
|
||||||
undefined -> [];
|
undefined -> [];
|
||||||
ConfList -> maps:values(ConfList)
|
ConfList -> maps:values(ConfList)
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -0,0 +1,486 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_trace).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
%% Mnesia bootstrap
|
||||||
|
-export([mnesia/1]).
|
||||||
|
|
||||||
|
-boot_mnesia({mnesia, [boot]}).
|
||||||
|
|
||||||
|
-export([ publish/1
|
||||||
|
, subscribe/3
|
||||||
|
, unsubscribe/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ start_link/0
|
||||||
|
, list/0
|
||||||
|
, list/1
|
||||||
|
, get_trace_filename/1
|
||||||
|
, create/1
|
||||||
|
, delete/1
|
||||||
|
, clear/0
|
||||||
|
, update/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ format/1
|
||||||
|
, zip_dir/0
|
||||||
|
, filename/2
|
||||||
|
, trace_dir/0
|
||||||
|
, trace_file/1
|
||||||
|
, delete_files_after_send/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-define(TRACE, ?MODULE).
|
||||||
|
-define(MAX_SIZE, 30).
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-export([log_file/2]).
|
||||||
|
-endif.
|
||||||
|
|
||||||
|
-export_type([ip_address/0]).
|
||||||
|
-type ip_address() :: string().
|
||||||
|
|
||||||
|
-record(?TRACE,
|
||||||
|
{ name :: binary() | undefined | '_'
|
||||||
|
, type :: clientid | topic | ip_address | undefined | '_'
|
||||||
|
, filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_'
|
||||||
|
, enable = true :: boolean() | '_'
|
||||||
|
, start_at :: integer() | undefined | '_'
|
||||||
|
, end_at :: integer() | undefined | '_'
|
||||||
|
}).
|
||||||
|
|
||||||
|
mnesia(boot) ->
|
||||||
|
ok = mria:create_table(?TRACE, [
|
||||||
|
{type, set},
|
||||||
|
{rlog_shard, ?COMMON_SHARD},
|
||||||
|
{storage, disc_copies},
|
||||||
|
{record_name, ?TRACE},
|
||||||
|
{attributes, record_info(fields, ?TRACE)}]).
|
||||||
|
|
||||||
|
publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore;
|
||||||
|
publish(#message{from = From, topic = Topic, payload = Payload}) when
|
||||||
|
is_binary(From); is_atom(From) ->
|
||||||
|
emqx_logger:info(
|
||||||
|
#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}},
|
||||||
|
"PUBLISH to ~s: ~0p",
|
||||||
|
[Topic, Payload]
|
||||||
|
).
|
||||||
|
|
||||||
|
subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> ignore;
|
||||||
|
subscribe(Topic, SubId, SubOpts) ->
|
||||||
|
emqx_logger:info(
|
||||||
|
#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}},
|
||||||
|
"~ts SUBSCRIBE ~ts: Options: ~0p",
|
||||||
|
[SubId, Topic, SubOpts]
|
||||||
|
).
|
||||||
|
|
||||||
|
unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore;
|
||||||
|
unsubscribe(Topic, SubOpts) ->
|
||||||
|
emqx_logger:info(
|
||||||
|
#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}},
|
||||||
|
"~ts UNSUBSCRIBE ~ts: Options: ~0p",
|
||||||
|
[maps:get(subid, SubOpts, ""), Topic, SubOpts]
|
||||||
|
).
|
||||||
|
|
||||||
|
-spec(start_link() -> emqx_types:startlink_ret()).
|
||||||
|
start_link() ->
|
||||||
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
-spec list() -> [tuple()].
|
||||||
|
list() ->
|
||||||
|
ets:match_object(?TRACE, #?TRACE{_ = '_'}).
|
||||||
|
|
||||||
|
-spec list(boolean()) -> [tuple()].
|
||||||
|
list(Enable) ->
|
||||||
|
ets:match_object(?TRACE, #?TRACE{enable = Enable, _ = '_'}).
|
||||||
|
|
||||||
|
-spec create([{Key :: binary(), Value :: binary()}] | #{atom() => binary()}) ->
|
||||||
|
ok | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}.
|
||||||
|
create(Trace) ->
|
||||||
|
case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of
|
||||||
|
true ->
|
||||||
|
case to_trace(Trace) of
|
||||||
|
{ok, TraceRec} -> insert_new_trace(TraceRec);
|
||||||
|
{error, Reason} -> {error, Reason}
|
||||||
|
end;
|
||||||
|
false ->
|
||||||
|
{error, "The number of traces created has reache the maximum"
|
||||||
|
" please delete the useless ones first"}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec delete(Name :: binary()) -> ok | {error, not_found}.
|
||||||
|
delete(Name) ->
|
||||||
|
Tran = fun() ->
|
||||||
|
case mnesia:read(?TRACE, Name) of
|
||||||
|
[_] -> mnesia:delete(?TRACE, Name, write);
|
||||||
|
[] -> mnesia:abort(not_found)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
transaction(Tran).
|
||||||
|
|
||||||
|
-spec clear() -> ok | {error, Reason :: term()}.
|
||||||
|
clear() ->
|
||||||
|
case mria:clear_table(?TRACE) of
|
||||||
|
{atomic, ok} -> ok;
|
||||||
|
{aborted, Reason} -> {error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec update(Name :: binary(), Enable :: boolean()) ->
|
||||||
|
ok | {error, not_found | finished}.
|
||||||
|
update(Name, Enable) ->
|
||||||
|
Tran = fun() ->
|
||||||
|
case mnesia:read(?TRACE, Name) of
|
||||||
|
[] -> mnesia:abort(not_found);
|
||||||
|
[#?TRACE{enable = Enable}] -> ok;
|
||||||
|
[Rec] ->
|
||||||
|
case erlang:system_time(second) >= Rec#?TRACE.end_at of
|
||||||
|
false -> mnesia:write(?TRACE, Rec#?TRACE{enable = Enable}, write);
|
||||||
|
true -> mnesia:abort(finished)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
transaction(Tran).
|
||||||
|
|
||||||
|
-spec get_trace_filename(Name :: binary()) ->
|
||||||
|
{ok, FileName :: string()} | {error, not_found}.
|
||||||
|
get_trace_filename(Name) ->
|
||||||
|
Tran = fun() ->
|
||||||
|
case mnesia:read(?TRACE, Name, read) of
|
||||||
|
[] -> mnesia:abort(not_found);
|
||||||
|
[#?TRACE{start_at = Start}] -> {ok, filename(Name, Start)}
|
||||||
|
end end,
|
||||||
|
transaction(Tran).
|
||||||
|
|
||||||
|
-spec trace_file(File :: list()) ->
|
||||||
|
{ok, Node :: list(), Binary :: binary()} |
|
||||||
|
{error, Node :: list(), Reason :: term()}.
|
||||||
|
trace_file(File) ->
|
||||||
|
FileName = filename:join(trace_dir(), File),
|
||||||
|
Node = atom_to_list(node()),
|
||||||
|
case file:read_file(FileName) of
|
||||||
|
{ok, Bin} -> {ok, Node, Bin};
|
||||||
|
{error, Reason} -> {error, Node, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
delete_files_after_send(TraceLog, Zips) ->
|
||||||
|
gen_server:cast(?MODULE, {delete_tag, self(), [TraceLog | Zips]}).
|
||||||
|
|
||||||
|
-spec format(list(#?TRACE{})) -> list(map()).
|
||||||
|
format(Traces) ->
|
||||||
|
Fields = record_info(fields, ?TRACE),
|
||||||
|
lists:map(fun(Trace0 = #?TRACE{}) ->
|
||||||
|
[_ | Values] = tuple_to_list(Trace0),
|
||||||
|
maps:from_list(lists:zip(Fields, Values))
|
||||||
|
end, Traces).
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
erlang:process_flag(trap_exit, true),
|
||||||
|
OriginLogLevel = emqx_logger:get_primary_log_level(),
|
||||||
|
ok = filelib:ensure_dir(trace_dir()),
|
||||||
|
ok = filelib:ensure_dir(zip_dir()),
|
||||||
|
{ok, _} = mnesia:subscribe({table, ?TRACE, simple}),
|
||||||
|
Traces = get_enable_trace(),
|
||||||
|
ok = update_log_primary_level(Traces, OriginLogLevel),
|
||||||
|
TRef = update_trace(Traces),
|
||||||
|
{ok, #{timer => TRef, monitors => #{}, primary_log_level => OriginLogLevel}}.
|
||||||
|
|
||||||
|
handle_call(Req, _From, State) ->
|
||||||
|
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||||
|
{reply, ok, State}.
|
||||||
|
|
||||||
|
handle_cast({delete_tag, Pid, Files}, State = #{monitors := Monitors}) ->
|
||||||
|
erlang:monitor(process, Pid),
|
||||||
|
{noreply, State#{monitors => Monitors#{Pid => Files}}};
|
||||||
|
handle_cast(Msg, State) ->
|
||||||
|
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitors}) ->
|
||||||
|
case maps:take(Pid, Monitors) of
|
||||||
|
error -> {noreply, State};
|
||||||
|
{Files, NewMonitors} ->
|
||||||
|
lists:foreach(fun file:delete/1, Files),
|
||||||
|
{noreply, State#{monitors => NewMonitors}}
|
||||||
|
end;
|
||||||
|
handle_info({timeout, TRef, update_trace},
|
||||||
|
#{timer := TRef, primary_log_level := OriginLogLevel} = State) ->
|
||||||
|
Traces = get_enable_trace(),
|
||||||
|
ok = update_log_primary_level(Traces, OriginLogLevel),
|
||||||
|
NextTRef = update_trace(Traces),
|
||||||
|
{noreply, State#{timer => NextTRef}};
|
||||||
|
|
||||||
|
handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) ->
|
||||||
|
emqx_misc:cancel_timer(TRef),
|
||||||
|
handle_info({timeout, TRef, update_trace}, State);
|
||||||
|
|
||||||
|
handle_info(Info, State) ->
|
||||||
|
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
terminate(_Reason, #{timer := TRef, primary_log_level := OriginLogLevel}) ->
|
||||||
|
ok = set_log_primary_level(OriginLogLevel),
|
||||||
|
_ = mnesia:unsubscribe({table, ?TRACE, simple}),
|
||||||
|
emqx_misc:cancel_timer(TRef),
|
||||||
|
stop_all_trace_handler(),
|
||||||
|
_ = file:del_dir_r(zip_dir()),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
code_change(_, State, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
insert_new_trace(Trace) ->
|
||||||
|
Tran = fun() ->
|
||||||
|
case mnesia:read(?TRACE, Trace#?TRACE.name) of
|
||||||
|
[] ->
|
||||||
|
#?TRACE{start_at = StartAt, type = Type, filter = Filter} = Trace,
|
||||||
|
Match = #?TRACE{_ = '_', start_at = StartAt, type = Type, filter = Filter},
|
||||||
|
case mnesia:match_object(?TRACE, Match, read) of
|
||||||
|
[] -> mnesia:write(?TRACE, Trace, write);
|
||||||
|
[#?TRACE{name = Name}] -> mnesia:abort({duplicate_condition, Name})
|
||||||
|
end;
|
||||||
|
[#?TRACE{name = Name}] -> mnesia:abort({already_existed, Name})
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
transaction(Tran).
|
||||||
|
|
||||||
|
update_trace(Traces) ->
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
{_Waiting, Running, Finished} = classify_by_time(Traces, Now),
|
||||||
|
disable_finished(Finished),
|
||||||
|
Started = emqx_trace_handler:running(),
|
||||||
|
{NeedRunning, AllStarted} = start_trace(Running, Started),
|
||||||
|
NeedStop = AllStarted -- NeedRunning,
|
||||||
|
ok = stop_trace(NeedStop, Started),
|
||||||
|
clean_stale_trace_files(),
|
||||||
|
NextTime = find_closest_time(Traces, Now),
|
||||||
|
emqx_misc:start_timer(NextTime, update_trace).
|
||||||
|
|
||||||
|
stop_all_trace_handler() ->
|
||||||
|
lists:foreach(fun(#{id := Id}) -> emqx_trace_handler:uninstall(Id) end,
|
||||||
|
emqx_trace_handler:running()).
|
||||||
|
get_enable_trace() ->
|
||||||
|
{atomic, Traces} =
|
||||||
|
mria:transaction(?COMMON_SHARD, fun() ->
|
||||||
|
mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read)
|
||||||
|
end),
|
||||||
|
Traces.
|
||||||
|
|
||||||
|
find_closest_time(Traces, Now) ->
|
||||||
|
Sec =
|
||||||
|
lists:foldl(
|
||||||
|
fun(#?TRACE{start_at = Start, end_at = End}, Closest)
|
||||||
|
when Start >= Now andalso Now < End -> %% running
|
||||||
|
min(End - Now, Closest);
|
||||||
|
(#?TRACE{start_at = Start}, Closest) when Start < Now -> %% waiting
|
||||||
|
min(Now - Start, Closest);
|
||||||
|
(_, Closest) -> Closest %% finished
|
||||||
|
end, 60 * 15, Traces),
|
||||||
|
timer:seconds(Sec).
|
||||||
|
|
||||||
|
disable_finished([]) -> ok;
|
||||||
|
disable_finished(Traces) ->
|
||||||
|
transaction(fun() ->
|
||||||
|
lists:map(fun(#?TRACE{name = Name}) ->
|
||||||
|
case mnesia:read(?TRACE, Name, write) of
|
||||||
|
[] -> ok;
|
||||||
|
[Trace] -> mnesia:write(?TRACE, Trace#?TRACE{enable = false}, write)
|
||||||
|
end end, Traces)
|
||||||
|
end).
|
||||||
|
|
||||||
|
start_trace(Traces, Started0) ->
|
||||||
|
Started = lists:map(fun(#{name := Name}) -> Name end, Started0),
|
||||||
|
lists:foldl(fun(#?TRACE{name = Name} = Trace, {Running, StartedAcc}) ->
|
||||||
|
case lists:member(Name, StartedAcc) of
|
||||||
|
true ->
|
||||||
|
{[Name | Running], StartedAcc};
|
||||||
|
false ->
|
||||||
|
case start_trace(Trace) of
|
||||||
|
ok -> {[Name | Running], [Name | StartedAcc]};
|
||||||
|
{error, _Reason} -> {[Name | Running], StartedAcc}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end, {[], Started}, Traces).
|
||||||
|
|
||||||
|
start_trace(Trace) ->
|
||||||
|
#?TRACE{name = Name
|
||||||
|
, type = Type
|
||||||
|
, filter = Filter
|
||||||
|
, start_at = Start
|
||||||
|
} = Trace,
|
||||||
|
Who = #{name => Name, type => Type, filter => Filter},
|
||||||
|
emqx_trace_handler:install(Who, debug, log_file(Name, Start)).
|
||||||
|
|
||||||
|
stop_trace(Finished, Started) ->
|
||||||
|
lists:foreach(fun(#{name := Name, type := Type}) ->
|
||||||
|
case lists:member(Name, Finished) of
|
||||||
|
true -> emqx_trace_handler:uninstall(Type, Name);
|
||||||
|
false -> ok
|
||||||
|
end
|
||||||
|
end, Started).
|
||||||
|
|
||||||
|
clean_stale_trace_files() ->
|
||||||
|
TraceDir = trace_dir(),
|
||||||
|
case file:list_dir(TraceDir) of
|
||||||
|
{ok, AllFiles} when AllFiles =/= ["zip"] ->
|
||||||
|
FileFun = fun(#?TRACE{name = Name, start_at = StartAt}) -> filename(Name, StartAt) end,
|
||||||
|
KeepFiles = lists:map(FileFun, list()),
|
||||||
|
case AllFiles -- ["zip" | KeepFiles] of
|
||||||
|
[] -> ok;
|
||||||
|
DeleteFiles ->
|
||||||
|
DelFun = fun(F) -> file:delete(filename:join(TraceDir, F)) end,
|
||||||
|
lists:foreach(DelFun, DeleteFiles)
|
||||||
|
end;
|
||||||
|
_ -> ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
classify_by_time(Traces, Now) ->
|
||||||
|
classify_by_time(Traces, Now, [], [], []).
|
||||||
|
|
||||||
|
classify_by_time([], _Now, Wait, Run, Finish) -> {Wait, Run, Finish};
|
||||||
|
classify_by_time([Trace = #?TRACE{start_at = Start} | Traces],
|
||||||
|
Now, Wait, Run, Finish) when Start > Now ->
|
||||||
|
classify_by_time(Traces, Now, [Trace | Wait], Run, Finish);
|
||||||
|
classify_by_time([Trace = #?TRACE{end_at = End} | Traces],
|
||||||
|
Now, Wait, Run, Finish) when End =< Now ->
|
||||||
|
classify_by_time(Traces, Now, Wait, Run, [Trace | Finish]);
|
||||||
|
classify_by_time([Trace | Traces], Now, Wait, Run, Finish) ->
|
||||||
|
classify_by_time(Traces, Now, Wait, [Trace | Run], Finish).
|
||||||
|
|
||||||
|
to_trace(TraceParam) ->
|
||||||
|
case to_trace(ensure_proplists(TraceParam), #?TRACE{}) of
|
||||||
|
{error, Reason} -> {error, Reason};
|
||||||
|
{ok, #?TRACE{name = undefined}} ->
|
||||||
|
{error, "name required"};
|
||||||
|
{ok, #?TRACE{type = undefined}} ->
|
||||||
|
{error, "type=[topic,clientid,ip_address] required"};
|
||||||
|
{ok, #?TRACE{filter = undefined}} ->
|
||||||
|
{error, "topic/clientid/ip_address filter required"};
|
||||||
|
{ok, TraceRec0} ->
|
||||||
|
case fill_default(TraceRec0) of
|
||||||
|
#?TRACE{start_at = Start, end_at = End} when End =< Start ->
|
||||||
|
{error, "failed by start_at >= end_at"};
|
||||||
|
TraceRec -> {ok, TraceRec}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
ensure_proplists(#{} = Trace) -> maps:to_list(Trace);
|
||||||
|
ensure_proplists(Trace) when is_list(Trace) ->
|
||||||
|
lists:foldl(
|
||||||
|
fun({K, V}, Acc) when is_binary(K) -> [{binary_to_existing_atom(K), V} | Acc];
|
||||||
|
({K, V}, Acc) when is_atom(K) -> [{K, V} | Acc];
|
||||||
|
(_, Acc) -> Acc
|
||||||
|
end, [], Trace).
|
||||||
|
|
||||||
|
fill_default(Trace = #?TRACE{start_at = undefined}) ->
|
||||||
|
fill_default(Trace#?TRACE{start_at = erlang:system_time(second)});
|
||||||
|
fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) ->
|
||||||
|
fill_default(Trace#?TRACE{end_at = StartAt + 10 * 60});
|
||||||
|
fill_default(Trace) -> Trace.
|
||||||
|
|
||||||
|
to_trace([], Rec) -> {ok, Rec};
|
||||||
|
to_trace([{name, Name} | Trace], Rec) ->
|
||||||
|
case io_lib:printable_unicode_list(unicode:characters_to_list(Name, utf8)) of
|
||||||
|
true ->
|
||||||
|
case binary:match(Name, [<<"/">>], []) of
|
||||||
|
nomatch -> to_trace(Trace, Rec#?TRACE{name = Name});
|
||||||
|
_ -> {error, "name cannot contain /"}
|
||||||
|
end;
|
||||||
|
false -> {error, "name must printable unicode"}
|
||||||
|
end;
|
||||||
|
to_trace([{type, Type} | Trace], Rec) ->
|
||||||
|
case lists:member(Type, [<<"clientid">>, <<"topic">>, <<"ip_address">>]) of
|
||||||
|
true -> to_trace(Trace, Rec#?TRACE{type = binary_to_existing_atom(Type)});
|
||||||
|
false -> {error, "incorrect type: only support clientid/topic/ip_address"}
|
||||||
|
end;
|
||||||
|
to_trace([{topic, Topic} | Trace], Rec) ->
|
||||||
|
case validate_topic(Topic) of
|
||||||
|
ok -> to_trace(Trace, Rec#?TRACE{filter = Topic});
|
||||||
|
{error, Reason} -> {error, Reason}
|
||||||
|
end;
|
||||||
|
to_trace([{clientid, ClientId} | Trace], Rec) ->
|
||||||
|
to_trace(Trace, Rec#?TRACE{filter = ClientId});
|
||||||
|
to_trace([{ip_address, IP} | Trace], Rec) ->
|
||||||
|
case inet:parse_address(binary_to_list(IP)) of
|
||||||
|
{ok, _} -> to_trace(Trace, Rec#?TRACE{filter = binary_to_list(IP)});
|
||||||
|
{error, Reason} -> {error, lists:flatten(io_lib:format("ip address: ~p", [Reason]))}
|
||||||
|
end;
|
||||||
|
to_trace([{start_at, StartAt} | Trace], Rec) ->
|
||||||
|
case to_system_second(StartAt) of
|
||||||
|
{ok, Sec} -> to_trace(Trace, Rec#?TRACE{start_at = Sec});
|
||||||
|
{error, Reason} -> {error, Reason}
|
||||||
|
end;
|
||||||
|
to_trace([{end_at, EndAt} | Trace], Rec) ->
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
case to_system_second(EndAt) of
|
||||||
|
{ok, Sec} when Sec > Now ->
|
||||||
|
to_trace(Trace, Rec#?TRACE{end_at = Sec});
|
||||||
|
{ok, _Sec} ->
|
||||||
|
{error, "end_at time has already passed"};
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end;
|
||||||
|
to_trace([Unknown | _Trace], _Rec) -> {error, io_lib:format("unknown field: ~p", [Unknown])}.
|
||||||
|
|
||||||
|
validate_topic(TopicName) ->
|
||||||
|
try emqx_topic:validate(filter, TopicName) of
|
||||||
|
true -> ok
|
||||||
|
catch
|
||||||
|
error:Error ->
|
||||||
|
{error, io_lib:format("topic: ~s invalid by ~p", [TopicName, Error])}
|
||||||
|
end.
|
||||||
|
|
||||||
|
to_system_second(At) ->
|
||||||
|
try
|
||||||
|
Sec = calendar:rfc3339_to_system_time(binary_to_list(At), [{unit, second}]),
|
||||||
|
{ok, Sec}
|
||||||
|
catch error: {badmatch, _} ->
|
||||||
|
{error, ["The rfc3339 specification not satisfied: ", At]}
|
||||||
|
end.
|
||||||
|
|
||||||
|
zip_dir() ->
|
||||||
|
trace_dir() ++ "zip/".
|
||||||
|
|
||||||
|
trace_dir() ->
|
||||||
|
filename:join(emqx:data_dir(), "trace") ++ "/".
|
||||||
|
|
||||||
|
log_file(Name, Start) ->
|
||||||
|
filename:join(trace_dir(), filename(Name, Start)).
|
||||||
|
|
||||||
|
filename(Name, Start) ->
|
||||||
|
[Time, _] = string:split(calendar:system_time_to_rfc3339(Start), "T", leading),
|
||||||
|
lists:flatten(["trace_", binary_to_list(Name), "_", Time, ".log"]).
|
||||||
|
|
||||||
|
transaction(Tran) ->
|
||||||
|
case mria:transaction(?COMMON_SHARD, Tran) of
|
||||||
|
{atomic, Res} -> Res;
|
||||||
|
{aborted, Reason} -> {error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
update_log_primary_level([], OriginLevel) -> set_log_primary_level(OriginLevel);
|
||||||
|
update_log_primary_level(_, _) -> set_log_primary_level(debug).
|
||||||
|
|
||||||
|
set_log_primary_level(NewLevel) ->
|
||||||
|
case NewLevel =/= emqx_logger:get_primary_log_level() of
|
||||||
|
true -> emqx_logger:set_primary_log_level(NewLevel);
|
||||||
|
false -> ok
|
||||||
|
end.
|
|
@ -0,0 +1,210 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_trace_api).
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("kernel/include/file.hrl").
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([ list_trace/2
|
||||||
|
, create_trace/2
|
||||||
|
, update_trace/2
|
||||||
|
, delete_trace/2
|
||||||
|
, clear_traces/2
|
||||||
|
, download_zip_log/2
|
||||||
|
, stream_log_file/2
|
||||||
|
]).
|
||||||
|
-export([ read_trace_file/3
|
||||||
|
, get_trace_size/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
-define(TO_BIN(_B_), iolist_to_binary(_B_)).
|
||||||
|
-define(NOT_FOUND(N), {error, 'NOT_FOUND', ?TO_BIN([N, " NOT FOUND"])}).
|
||||||
|
|
||||||
|
list_trace(_, _Params) ->
|
||||||
|
case emqx_trace:list() of
|
||||||
|
[] -> {ok, []};
|
||||||
|
List0 ->
|
||||||
|
List = lists:sort(fun(#{start_at := A}, #{start_at := B}) -> A > B end, List0),
|
||||||
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
|
TraceSize = cluster_call(?MODULE, get_trace_size, [], 30000),
|
||||||
|
AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize),
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
Traces =
|
||||||
|
lists:map(fun(Trace = #{name := Name, start_at := Start,
|
||||||
|
end_at := End, enable := Enable, type := Type, filter := Filter}) ->
|
||||||
|
FileName = emqx_trace:filename(Name, Start),
|
||||||
|
LogSize = collect_file_size(Nodes, FileName, AllFileSize),
|
||||||
|
Trace0 = maps:without([enable, filter], Trace),
|
||||||
|
Trace0#{ log_size => LogSize
|
||||||
|
, Type => Filter
|
||||||
|
, start_at => list_to_binary(calendar:system_time_to_rfc3339(Start))
|
||||||
|
, end_at => list_to_binary(calendar:system_time_to_rfc3339(End))
|
||||||
|
, status => status(Enable, Start, End, Now)
|
||||||
|
}
|
||||||
|
end, emqx_trace:format(List)),
|
||||||
|
{ok, Traces}
|
||||||
|
end.
|
||||||
|
|
||||||
|
create_trace(_, Param) ->
|
||||||
|
case emqx_trace:create(Param) of
|
||||||
|
ok -> ok;
|
||||||
|
{error, {already_existed, Name}} ->
|
||||||
|
{error, 'ALREADY_EXISTED', ?TO_BIN([Name, "Already Exists"])};
|
||||||
|
{error, {duplicate_condition, Name}} ->
|
||||||
|
{error, 'DUPLICATE_CONDITION', ?TO_BIN([Name, "Duplication Condition"])};
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, 'INCORRECT_PARAMS', ?TO_BIN(Reason)}
|
||||||
|
end.
|
||||||
|
|
||||||
|
delete_trace(#{name := Name}, _Param) ->
|
||||||
|
case emqx_trace:delete(Name) of
|
||||||
|
ok -> ok;
|
||||||
|
{error, not_found} -> ?NOT_FOUND(Name)
|
||||||
|
end.
|
||||||
|
|
||||||
|
clear_traces(_, _) ->
|
||||||
|
emqx_trace:clear().
|
||||||
|
|
||||||
|
update_trace(#{name := Name, operation := Operation}, _Param) ->
|
||||||
|
Enable = case Operation of disable -> false; enable -> true end,
|
||||||
|
case emqx_trace:update(Name, Enable) of
|
||||||
|
ok -> {ok, #{enable => Enable, name => Name}};
|
||||||
|
{error, not_found} -> ?NOT_FOUND(Name)
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% if HTTP request headers include accept-encoding: gzip and file size > 300 bytes.
|
||||||
|
%% cowboy_compress_h will auto encode gzip format.
|
||||||
|
download_zip_log(#{name := Name}, _Param) ->
|
||||||
|
case emqx_trace:get_trace_filename(Name) of
|
||||||
|
{ok, TraceLog} ->
|
||||||
|
TraceFiles = collect_trace_file(TraceLog),
|
||||||
|
ZipDir = emqx_trace:zip_dir(),
|
||||||
|
Zips = group_trace_file(ZipDir, TraceLog, TraceFiles),
|
||||||
|
ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip",
|
||||||
|
{ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]),
|
||||||
|
emqx_trace:delete_files_after_send(ZipFileName, Zips),
|
||||||
|
{ok, ZipFile};
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
group_trace_file(ZipDir, TraceLog, TraceFiles) ->
|
||||||
|
lists:foldl(fun(Res, Acc) ->
|
||||||
|
case Res of
|
||||||
|
{ok, Node, Bin} ->
|
||||||
|
ZipName = ZipDir ++ Node ++ "-" ++ TraceLog,
|
||||||
|
ok = file:write_file(ZipName, Bin),
|
||||||
|
[Node ++ "-" ++ TraceLog | Acc];
|
||||||
|
{error, Node, Reason} ->
|
||||||
|
?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]),
|
||||||
|
Acc
|
||||||
|
end
|
||||||
|
end, [], TraceFiles).
|
||||||
|
|
||||||
|
collect_trace_file(TraceLog) ->
|
||||||
|
cluster_call(emqx_trace, trace_file, [TraceLog], 60000).
|
||||||
|
|
||||||
|
cluster_call(Mod, Fun, Args, Timeout) ->
|
||||||
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
|
{GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout),
|
||||||
|
BadNodes =/= [] andalso ?LOG(error, "rpc call failed on ~p ~p", [BadNodes, {Mod, Fun, Args}]),
|
||||||
|
GoodRes.
|
||||||
|
|
||||||
|
stream_log_file(#{name := Name}, Params) ->
|
||||||
|
Node0 = proplists:get_value(<<"node">>, Params, atom_to_binary(node())),
|
||||||
|
Position0 = proplists:get_value(<<"position">>, Params, <<"0">>),
|
||||||
|
Bytes0 = proplists:get_value(<<"bytes">>, Params, <<"1000">>),
|
||||||
|
case to_node(Node0) of
|
||||||
|
{ok, Node} ->
|
||||||
|
Position = binary_to_integer(Position0),
|
||||||
|
Bytes = binary_to_integer(Bytes0),
|
||||||
|
case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of
|
||||||
|
{ok, Bin} ->
|
||||||
|
Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
|
||||||
|
{ok, #{meta => Meta, items => Bin}};
|
||||||
|
{eof, Size} ->
|
||||||
|
Meta = #{<<"position">> => Size, <<"bytes">> => Bytes},
|
||||||
|
{ok, #{meta => Meta, items => <<"">>}};
|
||||||
|
{error, Reason} ->
|
||||||
|
logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]),
|
||||||
|
{error, Reason};
|
||||||
|
{badrpc, nodedown} ->
|
||||||
|
{error, "BadRpc node down"}
|
||||||
|
end;
|
||||||
|
{error, Reason} -> {error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
get_trace_size() ->
|
||||||
|
TraceDir = emqx_trace:trace_dir(),
|
||||||
|
Node = node(),
|
||||||
|
case file:list_dir(TraceDir) of
|
||||||
|
{ok, AllFiles} ->
|
||||||
|
lists:foldl(fun(File, Acc) ->
|
||||||
|
FullFileName = filename:join(TraceDir, File),
|
||||||
|
Acc#{{Node, File} => filelib:file_size(FullFileName)}
|
||||||
|
end, #{}, lists:delete("zip", AllFiles));
|
||||||
|
_ -> #{}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% this is an rpc call for stream_log_file/2
|
||||||
|
read_trace_file(Name, Position, Limit) ->
|
||||||
|
TraceDir = emqx_trace:trace_dir(),
|
||||||
|
{ok, AllFiles} = file:list_dir(TraceDir),
|
||||||
|
TracePrefix = "trace_" ++ binary_to_list(Name) ++ "_",
|
||||||
|
Filter = fun(FileName) -> nomatch =/= string:prefix(FileName, TracePrefix) end,
|
||||||
|
case lists:filter(Filter, AllFiles) of
|
||||||
|
[TraceFile] ->
|
||||||
|
TracePath = filename:join([TraceDir, TraceFile]),
|
||||||
|
read_file(TracePath, Position, Limit);
|
||||||
|
[] -> {error, not_found}
|
||||||
|
end.
|
||||||
|
|
||||||
|
read_file(Path, Offset, Bytes) ->
|
||||||
|
{ok, IoDevice} = file:open(Path, [read, raw, binary]),
|
||||||
|
try
|
||||||
|
_ = case Offset of
|
||||||
|
0 -> ok;
|
||||||
|
_ -> file:position(IoDevice, {bof, Offset})
|
||||||
|
end,
|
||||||
|
case file:read(IoDevice, Bytes) of
|
||||||
|
{ok, Bin} -> {ok, Bin};
|
||||||
|
{error, Reason} -> {error, Reason};
|
||||||
|
eof ->
|
||||||
|
{ok, #file_info{size = Size}} = file:read_file_info(IoDevice),
|
||||||
|
{eof, Size}
|
||||||
|
end
|
||||||
|
after
|
||||||
|
file:close(IoDevice)
|
||||||
|
end.
|
||||||
|
|
||||||
|
to_node(Node) ->
|
||||||
|
try {ok, binary_to_existing_atom(Node)}
|
||||||
|
catch _:_ ->
|
||||||
|
{error, "node not found"}
|
||||||
|
end.
|
||||||
|
|
||||||
|
collect_file_size(Nodes, FileName, AllFiles) ->
|
||||||
|
lists:foldl(fun(Node, Acc) ->
|
||||||
|
Size = maps:get({Node, FileName}, AllFiles, 0),
|
||||||
|
Acc#{Node => Size}
|
||||||
|
end, #{}, Nodes).
|
||||||
|
|
||||||
|
%% status(false, _Start, End, Now) when End > Now -> <<"stopped">>;
|
||||||
|
status(false, _Start, _End, _Now) -> <<"stopped">>;
|
||||||
|
status(true, Start, _End, Now) when Now < Start -> <<"waiting">>;
|
||||||
|
status(true, _Start, End, Now) when Now >= End -> <<"stopped">>;
|
||||||
|
status(true, _Start, _End, _Now) -> <<"running">>.
|
|
@ -0,0 +1,218 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_trace_handler).
|
||||||
|
|
||||||
|
-include("emqx.hrl").
|
||||||
|
-include("logger.hrl").
|
||||||
|
|
||||||
|
-logger_header("[Tracer]").
|
||||||
|
|
||||||
|
%% APIs
|
||||||
|
-export([ running/0
|
||||||
|
, install/3
|
||||||
|
, install/4
|
||||||
|
, uninstall/1
|
||||||
|
, uninstall/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% For logger handler filters callbacks
|
||||||
|
-export([ filter_clientid/2
|
||||||
|
, filter_topic/2
|
||||||
|
, filter_ip_address/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([handler_id/2]).
|
||||||
|
|
||||||
|
-type tracer() :: #{
|
||||||
|
name := binary(),
|
||||||
|
type := clientid | topic | ip_address,
|
||||||
|
filter := emqx_types:clientid() | emqx_types:topic() | emqx_trace:ip_address()
|
||||||
|
}.
|
||||||
|
|
||||||
|
-define(FORMAT,
|
||||||
|
{logger_formatter, #{
|
||||||
|
template => [
|
||||||
|
time, " [", level, "] ",
|
||||||
|
{clientid,
|
||||||
|
[{peername, [clientid, "@", peername, " "], [clientid, " "]}],
|
||||||
|
[{peername, [peername, " "], []}]
|
||||||
|
},
|
||||||
|
msg, "\n"
|
||||||
|
],
|
||||||
|
single_line => false,
|
||||||
|
max_size => unlimited,
|
||||||
|
depth => unlimited
|
||||||
|
}}
|
||||||
|
).
|
||||||
|
|
||||||
|
-define(CONFIG(_LogFile_), #{
|
||||||
|
type => halt,
|
||||||
|
file => _LogFile_,
|
||||||
|
max_no_bytes => 512 * 1024 * 1024,
|
||||||
|
overload_kill_enable => true,
|
||||||
|
overload_kill_mem_size => 50 * 1024 * 1024,
|
||||||
|
overload_kill_qlen => 20000,
|
||||||
|
%% disable restart
|
||||||
|
overload_kill_restart_after => infinity
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% APIs
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec install(Name :: binary() | list(),
|
||||||
|
Type :: clientid | topic | ip_address,
|
||||||
|
Filter ::emqx_types:clientid() | emqx_types:topic() | string(),
|
||||||
|
Level :: logger:level() | all,
|
||||||
|
LogFilePath :: string()) -> ok | {error, term()}.
|
||||||
|
install(Name, Type, Filter, Level, LogFile) ->
|
||||||
|
Who = #{type => Type, filter => ensure_bin(Filter), name => ensure_bin(Name)},
|
||||||
|
install(Who, Level, LogFile).
|
||||||
|
|
||||||
|
-spec install(Type :: clientid | topic | ip_address,
|
||||||
|
Filter ::emqx_types:clientid() | emqx_types:topic() | string(),
|
||||||
|
Level :: logger:level() | all,
|
||||||
|
LogFilePath :: string()) -> ok | {error, term()}.
|
||||||
|
install(Type, Filter, Level, LogFile) ->
|
||||||
|
install(Filter, Type, Filter, Level, LogFile).
|
||||||
|
|
||||||
|
-spec install(tracer(), logger:level() | all, string()) -> ok | {error, term()}.
|
||||||
|
install(Who, all, LogFile) ->
|
||||||
|
install(Who, debug, LogFile);
|
||||||
|
install(Who, Level, LogFile) ->
|
||||||
|
PrimaryLevel = emqx_logger:get_primary_log_level(),
|
||||||
|
try logger:compare_levels(Level, PrimaryLevel) of
|
||||||
|
lt ->
|
||||||
|
{error,
|
||||||
|
io_lib:format(
|
||||||
|
"Cannot trace at a log level (~s) "
|
||||||
|
"lower than the primary log level (~s)",
|
||||||
|
[Level, PrimaryLevel]
|
||||||
|
)};
|
||||||
|
_GtOrEq ->
|
||||||
|
install_handler(Who, Level, LogFile)
|
||||||
|
catch
|
||||||
|
error:badarg ->
|
||||||
|
{error, {invalid_log_level, Level}}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec uninstall(Type :: clientid | topic | ip_address,
|
||||||
|
Name :: binary() | list()) -> ok | {error, term()}.
|
||||||
|
uninstall(Type, Name) ->
|
||||||
|
HandlerId = handler_id(ensure_bin(Name), Type),
|
||||||
|
uninstall(HandlerId).
|
||||||
|
|
||||||
|
-spec uninstall(HandlerId :: atom()) -> ok | {error, term()}.
|
||||||
|
uninstall(HandlerId) ->
|
||||||
|
Res = logger:remove_handler(HandlerId),
|
||||||
|
show_prompts(Res, HandlerId, "Stop trace"),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
%% @doc Return all running trace handlers information.
|
||||||
|
-spec running() ->
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
name => binary(),
|
||||||
|
type => topic | clientid | ip_address,
|
||||||
|
id => atom(),
|
||||||
|
filter => emqx_types:topic() | emqx_types:clienetid() | emqx_trace:ip_address(),
|
||||||
|
level => logger:level(),
|
||||||
|
dst => file:filename() | console | unknown
|
||||||
|
}
|
||||||
|
].
|
||||||
|
running() ->
|
||||||
|
lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)).
|
||||||
|
|
||||||
|
-spec filter_clientid(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore.
|
||||||
|
filter_clientid(#{meta := #{clientid := ClientId}} = Log, {ClientId, _Name}) -> Log;
|
||||||
|
filter_clientid(_Log, _ExpectId) -> ignore.
|
||||||
|
|
||||||
|
-spec filter_topic(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore.
|
||||||
|
filter_topic(#{meta := #{topic := Topic}} = Log, {TopicFilter, _Name}) ->
|
||||||
|
case emqx_topic:match(Topic, TopicFilter) of
|
||||||
|
true -> Log;
|
||||||
|
false -> ignore
|
||||||
|
end;
|
||||||
|
filter_topic(_Log, _ExpectId) -> ignore.
|
||||||
|
|
||||||
|
-spec filter_ip_address(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore.
|
||||||
|
filter_ip_address(#{meta := #{peername := Peername}} = Log, {IP, _Name}) ->
|
||||||
|
case lists:prefix(IP, Peername) of
|
||||||
|
true -> Log;
|
||||||
|
false -> ignore
|
||||||
|
end;
|
||||||
|
filter_ip_address(_Log, _ExpectId) -> ignore.
|
||||||
|
|
||||||
|
install_handler(Who = #{name := Name, type := Type}, Level, LogFile) ->
|
||||||
|
HandlerId = handler_id(Name, Type),
|
||||||
|
Config = #{ level => Level,
|
||||||
|
formatter => ?FORMAT,
|
||||||
|
filter_default => stop,
|
||||||
|
filters => filters(Who),
|
||||||
|
config => ?CONFIG(LogFile)
|
||||||
|
},
|
||||||
|
Res = logger:add_handler(HandlerId, logger_disk_log_h, Config),
|
||||||
|
show_prompts(Res, Who, "Start trace"),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
filters(#{type := clientid, filter := Filter, name := Name}) ->
|
||||||
|
[{clientid, {fun ?MODULE:filter_clientid/2, {ensure_list(Filter), Name}}}];
|
||||||
|
filters(#{type := topic, filter := Filter, name := Name}) ->
|
||||||
|
[{topic, {fun ?MODULE:filter_topic/2, {ensure_bin(Filter), Name}}}];
|
||||||
|
filters(#{type := ip_address, filter := Filter, name := Name}) ->
|
||||||
|
[{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}].
|
||||||
|
|
||||||
|
filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) ->
|
||||||
|
Init = #{id => Id, level => Level, dst => Dst},
|
||||||
|
case Filters of
|
||||||
|
[{Type, {_FilterFun, {Filter, Name}}}] when
|
||||||
|
Type =:= topic orelse
|
||||||
|
Type =:= clientid orelse
|
||||||
|
Type =:= ip_address ->
|
||||||
|
[Init#{type => Type, filter => Filter, name => Name} | Acc];
|
||||||
|
_ ->
|
||||||
|
Acc
|
||||||
|
end.
|
||||||
|
|
||||||
|
handler_id(Name, Type) ->
|
||||||
|
try
|
||||||
|
do_handler_id(Name, Type)
|
||||||
|
catch
|
||||||
|
_ : _ ->
|
||||||
|
Hash = emqx_misc:bin2hexstr_a_f(crypto:hash(md5, Name)),
|
||||||
|
do_handler_id(Hash, Type)
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% Handler ID must be an atom.
|
||||||
|
do_handler_id(Name, Type) ->
|
||||||
|
TypeStr = atom_to_list(Type),
|
||||||
|
NameStr = unicode:characters_to_list(Name, utf8),
|
||||||
|
FullNameStr = "trace_" ++ TypeStr ++ "_" ++ NameStr,
|
||||||
|
true = io_lib:printable_unicode_list(FullNameStr),
|
||||||
|
FullNameBin = unicode:characters_to_binary(FullNameStr, utf8),
|
||||||
|
binary_to_atom(FullNameBin, utf8).
|
||||||
|
|
||||||
|
ensure_bin(List) when is_list(List) -> iolist_to_binary(List);
|
||||||
|
ensure_bin(Bin) when is_binary(Bin) -> Bin.
|
||||||
|
|
||||||
|
ensure_list(Bin) when is_binary(Bin) -> binary_to_list(Bin);
|
||||||
|
ensure_list(List) when is_list(List) -> List.
|
||||||
|
|
||||||
|
show_prompts(ok, Who, Msg) ->
|
||||||
|
?LOG(info, Msg ++ " ~p " ++ "successfully~n", [Who]);
|
||||||
|
show_prompts({error, Reason}, Who, Msg) ->
|
||||||
|
?LOG(error, Msg ++ " ~p " ++ "failed with ~p~n", [Who, Reason]).
|
|
@ -1,167 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_tracer).
|
|
||||||
|
|
||||||
-include("emqx.hrl").
|
|
||||||
-include("logger.hrl").
|
|
||||||
|
|
||||||
|
|
||||||
%% APIs
|
|
||||||
-export([ trace/2
|
|
||||||
, start_trace/3
|
|
||||||
, lookup_traces/0
|
|
||||||
, stop_trace/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-type(trace_who() :: {clientid | topic, binary()}).
|
|
||||||
|
|
||||||
-define(TRACER, ?MODULE).
|
|
||||||
-define(FORMAT, {logger_formatter,
|
|
||||||
#{template =>
|
|
||||||
[time, " [", level, "] ",
|
|
||||||
{clientid,
|
|
||||||
[{peername,
|
|
||||||
[clientid, "@", peername, " "],
|
|
||||||
[clientid, " "]}],
|
|
||||||
[{peername,
|
|
||||||
[peername, " "],
|
|
||||||
[]}]},
|
|
||||||
msg, "\n"],
|
|
||||||
single_line => false
|
|
||||||
}}).
|
|
||||||
-define(TOPIC_TRACE_ID(T), "trace_topic_"++T).
|
|
||||||
-define(CLIENT_TRACE_ID(C), "trace_clientid_"++C).
|
|
||||||
-define(TOPIC_TRACE(T), {topic, T}).
|
|
||||||
-define(CLIENT_TRACE(C), {clientid, C}).
|
|
||||||
|
|
||||||
-define(IS_LOG_LEVEL(L),
|
|
||||||
L =:= emergency orelse
|
|
||||||
L =:= alert orelse
|
|
||||||
L =:= critical orelse
|
|
||||||
L =:= error orelse
|
|
||||||
L =:= warning orelse
|
|
||||||
L =:= notice orelse
|
|
||||||
L =:= info orelse
|
|
||||||
L =:= debug).
|
|
||||||
|
|
||||||
-dialyzer({nowarn_function, [install_trace_handler/3]}).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% APIs
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
|
|
||||||
%% Do not trace '$SYS' publish
|
|
||||||
ignore;
|
|
||||||
trace(publish, #message{from = From, topic = Topic, payload = Payload})
|
|
||||||
when is_binary(From); is_atom(From) ->
|
|
||||||
emqx_logger:info(#{topic => Topic,
|
|
||||||
mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} },
|
|
||||||
"PUBLISH to ~ts: ~0p", [Topic, Payload]).
|
|
||||||
|
|
||||||
%% @doc Start to trace clientid or topic.
|
|
||||||
-spec(start_trace(trace_who(), logger:level() | all, string()) -> ok | {error, term()}).
|
|
||||||
start_trace(Who, all, LogFile) ->
|
|
||||||
start_trace(Who, debug, LogFile);
|
|
||||||
start_trace(Who, Level, LogFile) ->
|
|
||||||
case ?IS_LOG_LEVEL(Level) of
|
|
||||||
true ->
|
|
||||||
#{level := PrimaryLevel} = logger:get_primary_config(),
|
|
||||||
try logger:compare_levels(Level, PrimaryLevel) of
|
|
||||||
lt ->
|
|
||||||
{error,
|
|
||||||
io_lib:format("Cannot trace at a log level (~ts) "
|
|
||||||
"lower than the primary log level (~ts)",
|
|
||||||
[Level, PrimaryLevel])};
|
|
||||||
_GtOrEq ->
|
|
||||||
install_trace_handler(Who, Level, LogFile)
|
|
||||||
catch
|
|
||||||
_:Error ->
|
|
||||||
{error, Error}
|
|
||||||
end;
|
|
||||||
false -> {error, {invalid_log_level, Level}}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @doc Stop tracing clientid or topic.
|
|
||||||
-spec(stop_trace(trace_who()) -> ok | {error, term()}).
|
|
||||||
stop_trace(Who) ->
|
|
||||||
uninstall_trance_handler(Who).
|
|
||||||
|
|
||||||
%% @doc Lookup all traces
|
|
||||||
-spec(lookup_traces() -> [{Who :: trace_who(), LogFile :: string()}]).
|
|
||||||
lookup_traces() ->
|
|
||||||
lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)).
|
|
||||||
|
|
||||||
install_trace_handler(Who, Level, LogFile) ->
|
|
||||||
case logger:add_handler(handler_id(Who), logger_disk_log_h,
|
|
||||||
#{level => Level,
|
|
||||||
formatter => ?FORMAT,
|
|
||||||
config => #{type => halt, file => LogFile},
|
|
||||||
filter_default => stop,
|
|
||||||
filters => [{meta_key_filter,
|
|
||||||
{fun filter_by_meta_key/2, Who}}]})
|
|
||||||
of
|
|
||||||
ok ->
|
|
||||||
?SLOG(info, #{msg => "start_trace", who => Who});
|
|
||||||
{error, Reason} ->
|
|
||||||
?SLOG(error, #{msg => "failed_to_trace", who => Who, reason => Reason}),
|
|
||||||
{error, Reason}
|
|
||||||
end.
|
|
||||||
|
|
||||||
uninstall_trance_handler(Who) ->
|
|
||||||
case logger:remove_handler(handler_id(Who)) of
|
|
||||||
ok ->
|
|
||||||
?SLOG(info, #{msg => "stop_trace", who => Who});
|
|
||||||
{error, Reason} ->
|
|
||||||
?SLOG(error, #{msg => "failed_to_stop_trace", who => Who, reason => Reason}),
|
|
||||||
{error, Reason}
|
|
||||||
end.
|
|
||||||
|
|
||||||
filter_traces(#{id := Id, level := Level, dst := Dst}, Acc) ->
|
|
||||||
case atom_to_list(Id) of
|
|
||||||
?TOPIC_TRACE_ID(T)->
|
|
||||||
[{?TOPIC_TRACE(T), {Level, Dst}} | Acc];
|
|
||||||
?CLIENT_TRACE_ID(C) ->
|
|
||||||
[{?CLIENT_TRACE(C), {Level, Dst}} | Acc];
|
|
||||||
_ -> Acc
|
|
||||||
end.
|
|
||||||
|
|
||||||
handler_id(?TOPIC_TRACE(Topic)) ->
|
|
||||||
list_to_atom(?TOPIC_TRACE_ID(handler_name(Topic)));
|
|
||||||
handler_id(?CLIENT_TRACE(ClientId)) ->
|
|
||||||
list_to_atom(?CLIENT_TRACE_ID(handler_name(ClientId))).
|
|
||||||
|
|
||||||
filter_by_meta_key(#{meta := Meta} = Log, {Key, Value}) ->
|
|
||||||
case is_meta_match(Key, Value, Meta) of
|
|
||||||
true -> Log;
|
|
||||||
false -> ignore
|
|
||||||
end.
|
|
||||||
|
|
||||||
is_meta_match(clientid, ClientId, #{clientid := ClientIdStr}) ->
|
|
||||||
ClientId =:= iolist_to_binary(ClientIdStr);
|
|
||||||
is_meta_match(topic, TopicFilter, #{topic := TopicMeta}) ->
|
|
||||||
emqx_topic:match(TopicMeta, TopicFilter);
|
|
||||||
is_meta_match(_, _, _) ->
|
|
||||||
false.
|
|
||||||
|
|
||||||
handler_name(Bin) ->
|
|
||||||
case byte_size(Bin) of
|
|
||||||
Size when Size =< 200 -> binary_to_list(Bin);
|
|
||||||
_ -> hashstr(Bin)
|
|
||||||
end.
|
|
||||||
|
|
||||||
hashstr(Bin) ->
|
|
||||||
binary_to_list(emqx_misc:bin2hexstr_A_F(Bin)).
|
|
|
@ -0,0 +1,318 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_trace_SUITE).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
|
||||||
|
-record(emqx_trace, {name, type, filter, enable = true, start_at, end_at}).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Setups
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
application:load(emqx_plugin_libs),
|
||||||
|
emqx_common_test_helpers:start_apps([]),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
emqx_common_test_helpers:stop_apps([]).
|
||||||
|
|
||||||
|
t_base_create_delete(_Config) ->
|
||||||
|
ok = emqx_trace:clear(),
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
Start = to_rfc3339(Now),
|
||||||
|
End = to_rfc3339(Now + 30 * 60),
|
||||||
|
Name = <<"name1">>,
|
||||||
|
ClientId = <<"test-device">>,
|
||||||
|
Trace = #{
|
||||||
|
name => Name,
|
||||||
|
type => <<"clientid">>,
|
||||||
|
clientid => ClientId,
|
||||||
|
start_at => Start,
|
||||||
|
end_at => End
|
||||||
|
},
|
||||||
|
AnotherTrace = Trace#{name => <<"anotherTrace">>},
|
||||||
|
ok = emqx_trace:create(Trace),
|
||||||
|
?assertEqual({error, {already_existed, Name}}, emqx_trace:create(Trace)),
|
||||||
|
?assertEqual({error, {duplicate_condition, Name}}, emqx_trace:create(AnotherTrace)),
|
||||||
|
[TraceRec] = emqx_trace:list(),
|
||||||
|
Expect = #emqx_trace{
|
||||||
|
name = Name,
|
||||||
|
type = clientid,
|
||||||
|
filter = ClientId,
|
||||||
|
start_at = Now,
|
||||||
|
end_at = Now + 30 * 60
|
||||||
|
},
|
||||||
|
?assertEqual(Expect, TraceRec),
|
||||||
|
ExpectFormat = [
|
||||||
|
#{
|
||||||
|
filter => <<"test-device">>,
|
||||||
|
enable => true,
|
||||||
|
type => clientid,
|
||||||
|
name => <<"name1">>,
|
||||||
|
start_at => Now,
|
||||||
|
end_at => Now + 30 * 60
|
||||||
|
}
|
||||||
|
],
|
||||||
|
?assertEqual(ExpectFormat, emqx_trace:format([TraceRec])),
|
||||||
|
?assertEqual(ok, emqx_trace:delete(Name)),
|
||||||
|
?assertEqual({error, not_found}, emqx_trace:delete(Name)),
|
||||||
|
?assertEqual([], emqx_trace:list()),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_create_size_max(_Config) ->
|
||||||
|
emqx_trace:clear(),
|
||||||
|
lists:map(fun(Seq) ->
|
||||||
|
Name = list_to_binary("name" ++ integer_to_list(Seq)),
|
||||||
|
Trace = [{name, Name}, {type, <<"topic">>},
|
||||||
|
{topic, list_to_binary("/x/y/" ++ integer_to_list(Seq))}],
|
||||||
|
ok = emqx_trace:create(Trace)
|
||||||
|
end, lists:seq(1, 30)),
|
||||||
|
Trace31 = [{<<"name">>, <<"name31">>},
|
||||||
|
{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/31">>}],
|
||||||
|
{error, _} = emqx_trace:create(Trace31),
|
||||||
|
ok = emqx_trace:delete(<<"name30">>),
|
||||||
|
ok = emqx_trace:create(Trace31),
|
||||||
|
?assertEqual(30, erlang:length(emqx_trace:list())),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_create_failed(_Config) ->
|
||||||
|
ok = emqx_trace:clear(),
|
||||||
|
UnknownField = [{<<"unknown">>, 12}],
|
||||||
|
{error, Reason1} = emqx_trace:create(UnknownField),
|
||||||
|
?assertEqual(<<"unknown field: {unknown,12}">>, iolist_to_binary(Reason1)),
|
||||||
|
|
||||||
|
InvalidTopic = [{<<"topic">>, "#/#//"}],
|
||||||
|
{error, Reason2} = emqx_trace:create(InvalidTopic),
|
||||||
|
?assertEqual(<<"topic: #/#// invalid by function_clause">>, iolist_to_binary(Reason2)),
|
||||||
|
|
||||||
|
InvalidStart = [{<<"start_at">>, <<"2021-12-3:12">>}],
|
||||||
|
{error, Reason3} = emqx_trace:create(InvalidStart),
|
||||||
|
?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>,
|
||||||
|
iolist_to_binary(Reason3)),
|
||||||
|
|
||||||
|
InvalidEnd = [{<<"end_at">>, <<"2021-12-3:12">>}],
|
||||||
|
{error, Reason4} = emqx_trace:create(InvalidEnd),
|
||||||
|
?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>,
|
||||||
|
iolist_to_binary(Reason4)),
|
||||||
|
|
||||||
|
{error, Reason7} = emqx_trace:create([{<<"name">>, <<"test">>}, {<<"type">>, <<"clientid">>}]),
|
||||||
|
?assertEqual(<<"topic/clientid/ip_address filter required">>, iolist_to_binary(Reason7)),
|
||||||
|
|
||||||
|
InvalidPackets4 = [{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>},
|
||||||
|
{<<"type">>, <<"clientid">>}],
|
||||||
|
{error, Reason9} = emqx_trace:create(InvalidPackets4),
|
||||||
|
?assertEqual(<<"name cannot contain /">>, iolist_to_binary(Reason9)),
|
||||||
|
|
||||||
|
?assertEqual({error, "type=[topic,clientid,ip_address] required"},
|
||||||
|
emqx_trace:create([{<<"name">>, <<"test-name">>}, {<<"clientid">>, <<"good">>}])),
|
||||||
|
|
||||||
|
?assertEqual({error, "incorrect type: only support clientid/topic/ip_address"},
|
||||||
|
emqx_trace:create([{<<"name">>, <<"test-name">>},
|
||||||
|
{<<"clientid">>, <<"good">>}, {<<"type">>, <<"typeerror">> }])),
|
||||||
|
|
||||||
|
?assertEqual({error, "ip address: einval"},
|
||||||
|
emqx_trace:create([{<<"ip_address">>, <<"test-name">>}])),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_create_default(_Config) ->
|
||||||
|
ok = emqx_trace:clear(),
|
||||||
|
{error, "name required"} = emqx_trace:create([]),
|
||||||
|
ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
|
||||||
|
{<<"type">>, <<"clientid">>}, {<<"clientid">>, <<"good">>}]),
|
||||||
|
[#emqx_trace{name = <<"test-name">>}] = emqx_trace:list(),
|
||||||
|
ok = emqx_trace:clear(),
|
||||||
|
Trace = [
|
||||||
|
{<<"name">>, <<"test-name">>},
|
||||||
|
{<<"type">>, <<"topic">>},
|
||||||
|
{<<"topic">>, <<"/x/y/z">>},
|
||||||
|
{<<"start_at">>, <<"2021-10-28T10:54:47+08:00">>},
|
||||||
|
{<<"end_at">>, <<"2021-10-27T10:54:47+08:00">>}
|
||||||
|
],
|
||||||
|
{error, "end_at time has already passed"} = emqx_trace:create(Trace),
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
Trace2 = [
|
||||||
|
{<<"name">>, <<"test-name">>},
|
||||||
|
{<<"type">>, <<"topic">>},
|
||||||
|
{<<"topic">>, <<"/x/y/z">>},
|
||||||
|
{<<"start_at">>, to_rfc3339(Now + 10)},
|
||||||
|
{<<"end_at">>, to_rfc3339(Now + 3)}
|
||||||
|
],
|
||||||
|
{error, "failed by start_at >= end_at"} = emqx_trace:create(Trace2),
|
||||||
|
ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
|
||||||
|
{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}]),
|
||||||
|
[#emqx_trace{start_at = Start, end_at = End}] = emqx_trace:list(),
|
||||||
|
?assertEqual(10 * 60, End - Start),
|
||||||
|
?assertEqual(true, Start - erlang:system_time(second) < 5),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_update_enable(_Config) ->
|
||||||
|
ok = emqx_trace:clear(),
|
||||||
|
Name = <<"test-name">>,
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)),
|
||||||
|
ok = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, <<"topic">>},
|
||||||
|
{<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]),
|
||||||
|
[#emqx_trace{enable = Enable}] = emqx_trace:list(),
|
||||||
|
?assertEqual(Enable, true),
|
||||||
|
ok = emqx_trace:update(Name, false),
|
||||||
|
[#emqx_trace{enable = false}] = emqx_trace:list(),
|
||||||
|
ok = emqx_trace:update(Name, false),
|
||||||
|
[#emqx_trace{enable = false}] = emqx_trace:list(),
|
||||||
|
ok = emqx_trace:update(Name, true),
|
||||||
|
[#emqx_trace{enable = true}] = emqx_trace:list(),
|
||||||
|
ok = emqx_trace:update(Name, false),
|
||||||
|
[#emqx_trace{enable = false}] = emqx_trace:list(),
|
||||||
|
?assertEqual({error, not_found}, emqx_trace:update(<<"Name not found">>, true)),
|
||||||
|
ct:sleep(2100),
|
||||||
|
?assertEqual({error, finished}, emqx_trace:update(Name, true)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_load_state(_Config) ->
|
||||||
|
emqx_trace:clear(),
|
||||||
|
load(),
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
Running = [{<<"name">>, <<"Running">>}, {<<"type">>, <<"topic">>},
|
||||||
|
{<<"topic">>, <<"/x/y/1">>}, {<<"start_at">>, to_rfc3339(Now - 1)},
|
||||||
|
{<<"end_at">>, to_rfc3339(Now + 2)}],
|
||||||
|
Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, <<"topic">>},
|
||||||
|
{<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, to_rfc3339(Now + 3)},
|
||||||
|
{<<"end_at">>, to_rfc3339(Now + 8)}],
|
||||||
|
Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, <<"topic">>},
|
||||||
|
{<<"topic">>, <<"/x/y/3">>}, {<<"start_at">>, to_rfc3339(Now - 5)},
|
||||||
|
{<<"end_at">>, to_rfc3339(Now)}],
|
||||||
|
ok = emqx_trace:create(Running),
|
||||||
|
ok = emqx_trace:create(Waiting),
|
||||||
|
{error, "end_at time has already passed"} = emqx_trace:create(Finished),
|
||||||
|
Traces = emqx_trace:format(emqx_trace:list()),
|
||||||
|
?assertEqual(2, erlang:length(Traces)),
|
||||||
|
Enables = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces),
|
||||||
|
ExpectEnables = [{<<"Running">>, true}, {<<"Waiting">>, true}],
|
||||||
|
?assertEqual(ExpectEnables, lists:sort(Enables)),
|
||||||
|
ct:sleep(3500),
|
||||||
|
Traces2 = emqx_trace:format(emqx_trace:list()),
|
||||||
|
?assertEqual(2, erlang:length(Traces2)),
|
||||||
|
Enables2 = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces2),
|
||||||
|
ExpectEnables2 = [{<<"Running">>, false}, {<<"Waiting">>, true}],
|
||||||
|
?assertEqual(ExpectEnables2, lists:sort(Enables2)),
|
||||||
|
unload(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_client_event(_Config) ->
|
||||||
|
application:set_env(emqx, allow_anonymous, true),
|
||||||
|
emqx_trace:clear(),
|
||||||
|
ClientId = <<"client-test">>,
|
||||||
|
load(),
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
Start = to_rfc3339(Now),
|
||||||
|
Name = <<"test_client_id_event">>,
|
||||||
|
ok = emqx_trace:create([{<<"name">>, Name},
|
||||||
|
{<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
|
||||||
|
ct:sleep(200),
|
||||||
|
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
emqtt:ping(Client),
|
||||||
|
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]),
|
||||||
|
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]),
|
||||||
|
ct:sleep(200),
|
||||||
|
ok = emqx_trace:create([{<<"name">>, <<"test_topic">>},
|
||||||
|
{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]),
|
||||||
|
ct:sleep(200),
|
||||||
|
{ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)),
|
||||||
|
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]),
|
||||||
|
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"4">>, [{qos, 0}]),
|
||||||
|
ok = emqtt:disconnect(Client),
|
||||||
|
ct:sleep(200),
|
||||||
|
{ok, Bin2} = file:read_file(emqx_trace:log_file(Name, Now)),
|
||||||
|
{ok, Bin3} = file:read_file(emqx_trace:log_file(<<"test_topic">>, Now)),
|
||||||
|
ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]),
|
||||||
|
?assert(erlang:byte_size(Bin) > 0),
|
||||||
|
?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)),
|
||||||
|
?assert(erlang:byte_size(Bin3) > 0),
|
||||||
|
unload(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_get_log_filename(_Config) ->
|
||||||
|
ok = emqx_trace:clear(),
|
||||||
|
load(),
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
Start = calendar:system_time_to_rfc3339(Now),
|
||||||
|
End = calendar:system_time_to_rfc3339(Now + 2),
|
||||||
|
Name = <<"name1">>,
|
||||||
|
Trace = [
|
||||||
|
{<<"name">>, Name},
|
||||||
|
{<<"type">>, <<"ip_address">>},
|
||||||
|
{<<"ip_address">>, <<"127.0.0.1">>},
|
||||||
|
{<<"start_at">>, list_to_binary(Start)},
|
||||||
|
{<<"end_at">>, list_to_binary(End)}
|
||||||
|
],
|
||||||
|
ok = emqx_trace:create(Trace),
|
||||||
|
?assertEqual({error, not_found}, emqx_trace:get_trace_filename(<<"test">>)),
|
||||||
|
?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
|
||||||
|
ct:sleep(3000),
|
||||||
|
?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
|
||||||
|
unload(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_trace_file(_Config) ->
|
||||||
|
FileName = "test.log",
|
||||||
|
Content = <<"test \n test">>,
|
||||||
|
TraceDir = emqx_trace:trace_dir(),
|
||||||
|
File = filename:join(TraceDir, FileName),
|
||||||
|
ok = file:write_file(File, Content),
|
||||||
|
{ok, Node, Bin} = emqx_trace:trace_file(FileName),
|
||||||
|
?assertEqual(Node, atom_to_list(node())),
|
||||||
|
?assertEqual(Content, Bin),
|
||||||
|
ok = file:delete(File),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_download_log(_Config) ->
|
||||||
|
emqx_trace:clear(),
|
||||||
|
load(),
|
||||||
|
ClientId = <<"client-test">>,
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
Start = to_rfc3339(Now),
|
||||||
|
Name = <<"test_client_id">>,
|
||||||
|
ok = emqx_trace:create([{<<"name">>, Name},
|
||||||
|
{<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
|
||||||
|
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
[begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)],
|
||||||
|
ct:sleep(100),
|
||||||
|
{ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []),
|
||||||
|
?assert(filelib:file_size(ZipFile) > 0),
|
||||||
|
ok = emqtt:disconnect(Client),
|
||||||
|
unload(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
to_rfc3339(Second) ->
|
||||||
|
list_to_binary(calendar:system_time_to_rfc3339(Second)).
|
||||||
|
|
||||||
|
load() ->
|
||||||
|
emqx_trace:start_link().
|
||||||
|
|
||||||
|
unload() ->
|
||||||
|
gen_server:stop(emqx_trace).
|
|
@ -0,0 +1,191 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_trace_handler_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-define(CLIENT, [{host, "localhost"},
|
||||||
|
{clientid, <<"client">>},
|
||||||
|
{username, <<"testuser">>},
|
||||||
|
{password, <<"pass">>}
|
||||||
|
]).
|
||||||
|
|
||||||
|
all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address].
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
emqx_common_test_helpers:boot_modules(all),
|
||||||
|
emqx_common_test_helpers:start_apps([]),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
emqx_common_test_helpers:stop_apps([]).
|
||||||
|
|
||||||
|
init_per_testcase(t_trace_clientid, Config) ->
|
||||||
|
Config;
|
||||||
|
init_per_testcase(_Case, Config) ->
|
||||||
|
ok = emqx_logger:set_log_level(debug),
|
||||||
|
_ = [logger:remove_handler(Id) ||#{id := Id} <- emqx_trace_handler:running()],
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(_Case, _Config) ->
|
||||||
|
ok = emqx_logger:set_log_level(warning),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_trace_clientid(_Config) ->
|
||||||
|
%% Start tracing
|
||||||
|
emqx_logger:set_log_level(error),
|
||||||
|
{error, _} = emqx_trace_handler:install(clientid, <<"client">>, debug, "tmp/client.log"),
|
||||||
|
emqx_logger:set_log_level(debug),
|
||||||
|
%% add list clientid
|
||||||
|
ok = emqx_trace_handler:install(clientid, "client", debug, "tmp/client.log"),
|
||||||
|
ok = emqx_trace_handler:install(clientid, <<"client2">>, all, "tmp/client2.log"),
|
||||||
|
ok = emqx_trace_handler:install(clientid, <<"client3">>, all, "tmp/client3.log"),
|
||||||
|
{error, {invalid_log_level, bad_level}} =
|
||||||
|
emqx_trace_handler:install(clientid, <<"client4">>, bad_level, "tmp/client4.log"),
|
||||||
|
{error, {handler_not_added, {file_error, ".", eisdir}}} =
|
||||||
|
emqx_trace_handler:install(clientid, <<"client5">>, debug, "."),
|
||||||
|
ct:sleep(100),
|
||||||
|
|
||||||
|
%% Verify the tracing file exits
|
||||||
|
?assert(filelib:is_regular("tmp/client.log")),
|
||||||
|
?assert(filelib:is_regular("tmp/client2.log")),
|
||||||
|
?assert(filelib:is_regular("tmp/client3.log")),
|
||||||
|
|
||||||
|
%% Get current traces
|
||||||
|
?assertMatch([#{type := clientid, filter := "client", name := <<"client">>,
|
||||||
|
level := debug, dst := "tmp/client.log"},
|
||||||
|
#{type := clientid, filter := "client2", name := <<"client2">>
|
||||||
|
, level := debug, dst := "tmp/client2.log"},
|
||||||
|
#{type := clientid, filter := "client3", name := <<"client3">>,
|
||||||
|
level := debug, dst := "tmp/client3.log"}
|
||||||
|
], emqx_trace_handler:running()),
|
||||||
|
|
||||||
|
%% Client with clientid = "client" publishes a "hi" message to "a/b/c".
|
||||||
|
{ok, T} = emqtt:start_link(?CLIENT),
|
||||||
|
emqtt:connect(T),
|
||||||
|
emqtt:publish(T, <<"a/b/c">>, <<"hi">>),
|
||||||
|
emqtt:ping(T),
|
||||||
|
ct:sleep(200),
|
||||||
|
|
||||||
|
%% Verify messages are logged to "tmp/client.log" but not "tmp/client2.log".
|
||||||
|
{ok, Bin} = file:read_file("tmp/client.log"),
|
||||||
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"CONNECT">>])),
|
||||||
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"CONNACK">>])),
|
||||||
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])),
|
||||||
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"PINGREQ">>])),
|
||||||
|
?assert(filelib:file_size("tmp/client2.log") == 0),
|
||||||
|
|
||||||
|
%% Stop tracing
|
||||||
|
ok = emqx_trace_handler:uninstall(clientid, <<"client">>),
|
||||||
|
ok = emqx_trace_handler:uninstall(clientid, <<"client2">>),
|
||||||
|
ok = emqx_trace_handler:uninstall(clientid, <<"client3">>),
|
||||||
|
|
||||||
|
emqtt:disconnect(T),
|
||||||
|
?assertEqual([], emqx_trace_handler:running()).
|
||||||
|
|
||||||
|
t_trace_topic(_Config) ->
|
||||||
|
{ok, T} = emqtt:start_link(?CLIENT),
|
||||||
|
emqtt:connect(T),
|
||||||
|
|
||||||
|
%% Start tracing
|
||||||
|
emqx_logger:set_log_level(debug),
|
||||||
|
ok = emqx_trace_handler:install(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"),
|
||||||
|
ok = emqx_trace_handler:install(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"),
|
||||||
|
ct:sleep(100),
|
||||||
|
|
||||||
|
%% Verify the tracing file exits
|
||||||
|
?assert(filelib:is_regular("tmp/topic_trace_x.log")),
|
||||||
|
?assert(filelib:is_regular("tmp/topic_trace_y.log")),
|
||||||
|
|
||||||
|
%% Get current traces
|
||||||
|
?assertMatch([#{type := topic, filter := <<"x/#">>,
|
||||||
|
level := debug, dst := "tmp/topic_trace_x.log", name := <<"x/#">>},
|
||||||
|
#{type := topic, filter := <<"y/#">>,
|
||||||
|
name := <<"y/#">>, level := debug, dst := "tmp/topic_trace_y.log"}
|
||||||
|
],
|
||||||
|
emqx_trace_handler:running()),
|
||||||
|
|
||||||
|
%% Client with clientid = "client" publishes a "hi" message to "x/y/z".
|
||||||
|
emqtt:publish(T, <<"x/y/z">>, <<"hi1">>),
|
||||||
|
emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
|
||||||
|
emqtt:subscribe(T, <<"x/y/z">>),
|
||||||
|
emqtt:unsubscribe(T, <<"x/y/z">>),
|
||||||
|
ct:sleep(200),
|
||||||
|
|
||||||
|
{ok, Bin} = file:read_file("tmp/topic_trace_x.log"),
|
||||||
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
|
||||||
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi2">>])),
|
||||||
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])),
|
||||||
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"SUBSCRIBE">>])),
|
||||||
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"UNSUBSCRIBE">>])),
|
||||||
|
?assert(filelib:file_size("tmp/topic_trace_y.log") =:= 0),
|
||||||
|
|
||||||
|
%% Stop tracing
|
||||||
|
ok = emqx_trace_handler:uninstall(topic, <<"x/#">>),
|
||||||
|
ok = emqx_trace_handler:uninstall(topic, <<"y/#">>),
|
||||||
|
{error, _Reason} = emqx_trace_handler:uninstall(topic, <<"z/#">>),
|
||||||
|
?assertEqual([], emqx_trace_handler:running()),
|
||||||
|
emqtt:disconnect(T).
|
||||||
|
|
||||||
|
t_trace_ip_address(_Config) ->
|
||||||
|
{ok, T} = emqtt:start_link(?CLIENT),
|
||||||
|
emqtt:connect(T),
|
||||||
|
|
||||||
|
%% Start tracing
|
||||||
|
ok = emqx_trace_handler:install(ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"),
|
||||||
|
ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"),
|
||||||
|
ct:sleep(100),
|
||||||
|
|
||||||
|
%% Verify the tracing file exits
|
||||||
|
?assert(filelib:is_regular("tmp/ip_trace_x.log")),
|
||||||
|
?assert(filelib:is_regular("tmp/ip_trace_y.log")),
|
||||||
|
|
||||||
|
%% Get current traces
|
||||||
|
?assertMatch([#{type := ip_address, filter := "127.0.0.1",
|
||||||
|
name := <<"127.0.0.1">>,
|
||||||
|
level := debug, dst := "tmp/ip_trace_x.log"},
|
||||||
|
#{type := ip_address, filter := "192.168.1.1",
|
||||||
|
name := <<"192.168.1.1">>,
|
||||||
|
level := debug, dst := "tmp/ip_trace_y.log"}
|
||||||
|
],
|
||||||
|
emqx_trace_handler:running()),
|
||||||
|
|
||||||
|
%% Client with clientid = "client" publishes a "hi" message to "x/y/z".
|
||||||
|
emqtt:publish(T, <<"x/y/z">>, <<"hi1">>),
|
||||||
|
emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
|
||||||
|
emqtt:subscribe(T, <<"x/y/z">>),
|
||||||
|
emqtt:unsubscribe(T, <<"x/y/z">>),
|
||||||
|
ct:sleep(200),
|
||||||
|
|
||||||
|
{ok, Bin} = file:read_file("tmp/ip_trace_x.log"),
|
||||||
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
|
||||||
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi2">>])),
|
||||||
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"PUBLISH">>])),
|
||||||
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"SUBSCRIBE">>])),
|
||||||
|
?assertNotEqual(nomatch, binary:match(Bin, [<<"UNSUBSCRIBE">>])),
|
||||||
|
?assert(filelib:file_size("tmp/ip_trace_y.log") =:= 0),
|
||||||
|
|
||||||
|
%% Stop tracing
|
||||||
|
ok = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.1">>),
|
||||||
|
ok = emqx_trace_handler:uninstall(ip_address, <<"192.168.1.1">>),
|
||||||
|
{error, _Reason} = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.2">>),
|
||||||
|
emqtt:disconnect(T),
|
||||||
|
?assertEqual([], emqx_trace_handler:running()).
|
|
@ -1,120 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_tracer_SUITE).
|
|
||||||
|
|
||||||
-compile(export_all).
|
|
||||||
-compile(nowarn_export_all).
|
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
|
||||||
|
|
||||||
-include_lib("common_test/include/ct.hrl").
|
|
||||||
|
|
||||||
all() -> [t_trace_clientid, t_trace_topic].
|
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
|
||||||
emqx_common_test_helpers:boot_modules(all),
|
|
||||||
emqx_common_test_helpers:start_apps([]),
|
|
||||||
Config.
|
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
|
||||||
emqx_common_test_helpers:stop_apps([]).
|
|
||||||
|
|
||||||
t_trace_clientid(_Config) ->
|
|
||||||
{ok, T} = emqtt:start_link([{host, "localhost"},
|
|
||||||
{clientid, <<"client">>},
|
|
||||||
{username, <<"testuser">>},
|
|
||||||
{password, <<"pass">>}
|
|
||||||
]),
|
|
||||||
emqtt:connect(T),
|
|
||||||
|
|
||||||
%% Start tracing
|
|
||||||
emqx_logger:set_log_level(error),
|
|
||||||
{error, _} = emqx_tracer:start_trace({clientid, <<"client">>}, debug, "tmp/client.log"),
|
|
||||||
emqx_logger:set_log_level(debug),
|
|
||||||
ok = emqx_tracer:start_trace({clientid, <<"client">>}, debug, "tmp/client.log"),
|
|
||||||
ok = emqx_tracer:start_trace({clientid, <<"client2">>}, all, "tmp/client2.log"),
|
|
||||||
ok = emqx_tracer:start_trace({clientid, <<"client3">>}, all, "tmp/client3.log"),
|
|
||||||
{error, {invalid_log_level, bad_level}} = emqx_tracer:start_trace({clientid, <<"client4">>}, bad_level, "tmp/client4.log"),
|
|
||||||
{error, {handler_not_added, {file_error,".",eisdir}}} = emqx_tracer:start_trace({clientid, <<"client5">>}, debug, "."),
|
|
||||||
ct:sleep(100),
|
|
||||||
|
|
||||||
%% Verify the tracing file exits
|
|
||||||
?assert(filelib:is_regular("tmp/client.log")),
|
|
||||||
?assert(filelib:is_regular("tmp/client2.log")),
|
|
||||||
|
|
||||||
%% Get current traces
|
|
||||||
?assertEqual([{{clientid,"client"},{debug,"tmp/client.log"}},
|
|
||||||
{{clientid,"client2"},{debug,"tmp/client2.log"}},
|
|
||||||
{{clientid,"client3"},{debug,"tmp/client3.log"}}
|
|
||||||
], emqx_tracer:lookup_traces()),
|
|
||||||
|
|
||||||
%% set the overall log level to debug
|
|
||||||
emqx_logger:set_log_level(debug),
|
|
||||||
|
|
||||||
%% Client with clientid = "client" publishes a "hi" message to "a/b/c".
|
|
||||||
emqtt:publish(T, <<"a/b/c">>, <<"hi">>),
|
|
||||||
ct:sleep(200),
|
|
||||||
|
|
||||||
%% Verify messages are logged to "tmp/client.log" but not "tmp/client2.log".
|
|
||||||
?assert(filelib:file_size("tmp/client.log") > 0),
|
|
||||||
?assert(filelib:file_size("tmp/client2.log") == 0),
|
|
||||||
|
|
||||||
%% Stop tracing
|
|
||||||
ok = emqx_tracer:stop_trace({clientid, <<"client">>}),
|
|
||||||
ok = emqx_tracer:stop_trace({clientid, <<"client2">>}),
|
|
||||||
ok = emqx_tracer:stop_trace({clientid, <<"client3">>}),
|
|
||||||
emqtt:disconnect(T),
|
|
||||||
|
|
||||||
emqx_logger:set_log_level(warning).
|
|
||||||
|
|
||||||
t_trace_topic(_Config) ->
|
|
||||||
{ok, T} = emqtt:start_link([{host, "localhost"},
|
|
||||||
{clientid, <<"client">>},
|
|
||||||
{username, <<"testuser">>},
|
|
||||||
{password, <<"pass">>}
|
|
||||||
]),
|
|
||||||
emqtt:connect(T),
|
|
||||||
|
|
||||||
%% Start tracing
|
|
||||||
emqx_logger:set_log_level(debug),
|
|
||||||
ok = emqx_tracer:start_trace({topic, <<"x/#">>}, all, "tmp/topic_trace.log"),
|
|
||||||
ok = emqx_tracer:start_trace({topic, <<"y/#">>}, all, "tmp/topic_trace.log"),
|
|
||||||
ct:sleep(100),
|
|
||||||
|
|
||||||
%% Verify the tracing file exits
|
|
||||||
?assert(filelib:is_regular("tmp/topic_trace.log")),
|
|
||||||
|
|
||||||
%% Get current traces
|
|
||||||
?assertEqual([{{topic,"x/#"},{debug,"tmp/topic_trace.log"}},
|
|
||||||
{{topic,"y/#"},{debug,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()),
|
|
||||||
|
|
||||||
%% set the overall log level to debug
|
|
||||||
emqx_logger:set_log_level(debug),
|
|
||||||
|
|
||||||
%% Client with clientid = "client" publishes a "hi" message to "x/y/z".
|
|
||||||
emqtt:publish(T, <<"x/y/z">>, <<"hi">>),
|
|
||||||
ct:sleep(200),
|
|
||||||
|
|
||||||
?assert(filelib:file_size("tmp/topic_trace.log") > 0),
|
|
||||||
|
|
||||||
%% Stop tracing
|
|
||||||
ok = emqx_tracer:stop_trace({topic, <<"x/#">>}),
|
|
||||||
ok = emqx_tracer:stop_trace({topic, <<"y/#">>}),
|
|
||||||
{error, _Reason} = emqx_tracer:stop_trace({topic, <<"z/#">>}),
|
|
||||||
emqtt:disconnect(T),
|
|
||||||
|
|
||||||
emqx_logger:set_log_level(warning).
|
|
|
@ -36,6 +36,7 @@
|
||||||
, vm/1
|
, vm/1
|
||||||
, mnesia/1
|
, mnesia/1
|
||||||
, trace/1
|
, trace/1
|
||||||
|
, traces/1
|
||||||
, log/1
|
, log/1
|
||||||
, authz/1
|
, authz/1
|
||||||
, olp/1
|
, olp/1
|
||||||
|
@ -376,54 +377,133 @@ log(_) ->
|
||||||
%% @doc Trace Command
|
%% @doc Trace Command
|
||||||
|
|
||||||
trace(["list"]) ->
|
trace(["list"]) ->
|
||||||
lists:foreach(fun({{Who, Name}, {Level, LogFile}}) ->
|
lists:foreach(fun(Trace) ->
|
||||||
emqx_ctl:print(
|
#{type := Type, filter := Filter, level := Level, dst := Dst} = Trace,
|
||||||
"Trace(~ts=~ts, level=~ts, destination=~p)~n",
|
emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Filter, Level, Dst])
|
||||||
[Who, Name, Level, LogFile]
|
end, emqx_trace_handler:running());
|
||||||
)
|
|
||||||
end, emqx_tracer:lookup_traces());
|
|
||||||
|
|
||||||
trace(["stop", "client", ClientId]) ->
|
trace(["stop", Operation, ClientId]) ->
|
||||||
trace_off(clientid, ClientId);
|
case trace_type(Operation) of
|
||||||
|
{ok, Type} -> trace_off(Type, ClientId);
|
||||||
|
error -> trace([])
|
||||||
|
end;
|
||||||
|
|
||||||
trace(["start", "client", ClientId, LogFile]) ->
|
trace(["start", Operation, ClientId, LogFile]) ->
|
||||||
trace_on(clientid, ClientId, all, LogFile);
|
trace(["start", Operation, ClientId, LogFile, "all"]);
|
||||||
|
|
||||||
trace(["start", "client", ClientId, LogFile, Level]) ->
|
trace(["start", Operation, ClientId, LogFile, Level]) ->
|
||||||
trace_on(clientid, ClientId, list_to_atom(Level), LogFile);
|
case trace_type(Operation) of
|
||||||
|
{ok, Type} -> trace_on(Type, ClientId, list_to_existing_atom(Level), LogFile);
|
||||||
trace(["stop", "topic", Topic]) ->
|
error -> trace([])
|
||||||
trace_off(topic, Topic);
|
end;
|
||||||
|
|
||||||
trace(["start", "topic", Topic, LogFile]) ->
|
|
||||||
trace_on(topic, Topic, all, LogFile);
|
|
||||||
|
|
||||||
trace(["start", "topic", Topic, LogFile, Level]) ->
|
|
||||||
trace_on(topic, Topic, list_to_atom(Level), LogFile);
|
|
||||||
|
|
||||||
trace(_) ->
|
trace(_) ->
|
||||||
emqx_ctl:usage([{"trace list", "List all traces started"},
|
emqx_ctl:usage([{"trace list", "List all traces started on local node"},
|
||||||
{"trace start client <ClientId> <File> [<Level>]", "Traces for a client"},
|
{"trace start client <ClientId> <File> [<Level>]",
|
||||||
{"trace stop client <ClientId>", "Stop tracing for a client"},
|
"Traces for a client on local node"},
|
||||||
{"trace start topic <Topic> <File> [<Level>] ", "Traces for a topic"},
|
{"trace stop client <ClientId>",
|
||||||
{"trace stop topic <Topic> ", "Stop tracing for a topic"}]).
|
"Stop tracing for a client on local node"},
|
||||||
|
{"trace start topic <Topic> <File> [<Level>] ",
|
||||||
|
"Traces for a topic on local node"},
|
||||||
|
{"trace stop topic <Topic> ",
|
||||||
|
"Stop tracing for a topic on local node"},
|
||||||
|
{"trace start ip_address <IP> <File> [<Level>] ",
|
||||||
|
"Traces for a client ip on local node"},
|
||||||
|
{"trace stop ip_addresss <IP> ",
|
||||||
|
"Stop tracing for a client ip on local node"}
|
||||||
|
]).
|
||||||
|
|
||||||
trace_on(Who, Name, Level, LogFile) ->
|
trace_on(Who, Name, Level, LogFile) ->
|
||||||
case emqx_tracer:start_trace({Who, iolist_to_binary(Name)}, Level, LogFile) of
|
case emqx_trace_handler:install(Who, Name, Level, LogFile) of
|
||||||
ok ->
|
ok ->
|
||||||
emqx_ctl:print("trace ~ts ~ts successfully~n", [Who, Name]);
|
emqx_ctl:print("trace ~s ~s successfully~n", [Who, Name]);
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
emqx_ctl:print("[error] trace ~ts ~ts: ~p~n", [Who, Name, Error])
|
emqx_ctl:print("[error] trace ~s ~s: ~p~n", [Who, Name, Error])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
trace_off(Who, Name) ->
|
trace_off(Who, Name) ->
|
||||||
case emqx_tracer:stop_trace({Who, iolist_to_binary(Name)}) of
|
case emqx_trace_handler:uninstall(Who, Name) of
|
||||||
ok ->
|
ok ->
|
||||||
emqx_ctl:print("stop tracing ~ts ~ts successfully~n", [Who, Name]);
|
emqx_ctl:print("stop tracing ~s ~s successfully~n", [Who, Name]);
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
emqx_ctl:print("[error] stop tracing ~ts ~ts: ~p~n", [Who, Name, Error])
|
emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Who, Name, Error])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% @doc Trace Cluster Command
|
||||||
|
traces(["list"]) ->
|
||||||
|
{ok, List} = emqx_trace_api:list_trace(get, []),
|
||||||
|
case List of
|
||||||
|
[] ->
|
||||||
|
emqx_ctl:print("Cluster Trace is empty~n", []);
|
||||||
|
_ ->
|
||||||
|
lists:foreach(fun(Trace) ->
|
||||||
|
#{type := Type, name := Name, status := Status,
|
||||||
|
log_size := LogSize} = Trace,
|
||||||
|
emqx_ctl:print("Trace(~s: ~s=~s, ~s, LogSize:~p)~n",
|
||||||
|
[Name, Type, maps:get(Type, Trace), Status, LogSize])
|
||||||
|
end, List)
|
||||||
|
end,
|
||||||
|
length(List);
|
||||||
|
|
||||||
|
traces(["stop", Name]) ->
|
||||||
|
trace_cluster_off(Name);
|
||||||
|
|
||||||
|
traces(["delete", Name]) ->
|
||||||
|
trace_cluster_del(Name);
|
||||||
|
|
||||||
|
traces(["start", Name, Operation, Filter]) ->
|
||||||
|
traces(["start", Name, Operation, Filter, "900"]);
|
||||||
|
|
||||||
|
traces(["start", Name, Operation, Filter, DurationS]) ->
|
||||||
|
case trace_type(Operation) of
|
||||||
|
{ok, Type} -> trace_cluster_on(Name, Type, Filter, DurationS);
|
||||||
|
error -> traces([])
|
||||||
|
end;
|
||||||
|
|
||||||
|
traces(_) ->
|
||||||
|
emqx_ctl:usage([{"traces list", "List all cluster traces started"},
|
||||||
|
{"traces start <Name> client <ClientId>", "Traces for a client in cluster"},
|
||||||
|
{"traces start <Name> topic <Topic>", "Traces for a topic in cluster"},
|
||||||
|
{"traces start <Name> ip_address <IPAddr>", "Traces for a IP in cluster"},
|
||||||
|
{"traces stop <Name>", "Stop trace in cluster"},
|
||||||
|
{"traces delete <Name>", "Delete trace in cluster"}
|
||||||
|
]).
|
||||||
|
|
||||||
|
trace_cluster_on(Name, Type, Filter, DurationS0) ->
|
||||||
|
DurationS = list_to_integer(DurationS0),
|
||||||
|
Now = erlang:system_time(second),
|
||||||
|
Trace = #{ name => list_to_binary(Name)
|
||||||
|
, type => atom_to_binary(Type)
|
||||||
|
, Type => list_to_binary(Filter)
|
||||||
|
, start_at => list_to_binary(calendar:system_time_to_rfc3339(Now))
|
||||||
|
, end_at => list_to_binary(calendar:system_time_to_rfc3339(Now + DurationS))
|
||||||
|
},
|
||||||
|
case emqx_trace:create(Trace) of
|
||||||
|
ok ->
|
||||||
|
emqx_ctl:print("cluster_trace ~p ~s ~s successfully~n", [Type, Filter, Name]);
|
||||||
|
{error, Error} ->
|
||||||
|
emqx_ctl:print("[error] cluster_trace ~s ~s=~s ~p~n",
|
||||||
|
[Name, Type, Filter, Error])
|
||||||
|
end.
|
||||||
|
|
||||||
|
trace_cluster_del(Name) ->
|
||||||
|
case emqx_trace:delete(list_to_binary(Name)) of
|
||||||
|
ok -> emqx_ctl:print("Del cluster_trace ~s successfully~n", [Name]);
|
||||||
|
{error, Error} -> emqx_ctl:print("[error] Del cluster_trace ~s: ~p~n", [Name, Error])
|
||||||
|
end.
|
||||||
|
|
||||||
|
trace_cluster_off(Name) ->
|
||||||
|
case emqx_trace:update(list_to_binary(Name), false) of
|
||||||
|
ok -> emqx_ctl:print("Stop cluster_trace ~s successfully~n", [Name]);
|
||||||
|
{error, Error} -> emqx_ctl:print("[error] Stop cluster_trace ~s: ~p~n", [Name, Error])
|
||||||
|
end.
|
||||||
|
|
||||||
|
trace_type("client") -> {ok, clientid};
|
||||||
|
trace_type("topic") -> {ok, topic};
|
||||||
|
trace_type("ip_address") -> {ok, ip_address};
|
||||||
|
trace_type(_) -> error.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @doc Listeners Command
|
%% @doc Listeners Command
|
||||||
|
|
||||||
|
|
|
@ -38,6 +38,7 @@ start_link() ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
init([]) ->
|
init([]) ->
|
||||||
{ok, {{one_for_one, 10, 3600},
|
{ok, {{one_for_one, 10, 3600},
|
||||||
[?CHILD(emqx_telemetry),
|
[ ?CHILD(emqx_telemetry)
|
||||||
?CHILD(emqx_topic_metrics),
|
, ?CHILD(emqx_topic_metrics)
|
||||||
?CHILD(emqx_delayed)]}}.
|
, ?CHILD(emqx_trace)
|
||||||
|
, ?CHILD(emqx_delayed)]}}.
|
||||||
|
|
Loading…
Reference in New Issue