Merge pull request #6547 from zhongwencool/trace-formatter

feat(trace): replace logger_formatter by emqx_trace_formatter
This commit is contained in:
zhongwencool 2021-12-30 23:40:59 +08:00 committed by GitHub
commit d16362af96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 652 additions and 379 deletions

View File

@ -59,15 +59,32 @@
%% structured logging
-define(SLOG(Level, Data),
%% check 'allow' here, only evaluate Data when necessary
case logger:allow(Level, ?MODULE) of
true ->
logger:log(Level, (Data), #{ mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}
, line => ?LINE
});
false ->
ok
end).
?SLOG(Level, Data, #{})).
%% structured logging, meta is for handler's filter.
-define(SLOG(Level, Data, Meta),
%% check 'allow' here, only evaluate Data and Meta when necessary
case logger:allow(Level, ?MODULE) of
true ->
logger:log(Level, (Data), (Meta#{ mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}
, line => ?LINE
}));
false ->
ok
end).
-define(TRACE_FILTER, emqx_trace_filter).
%% Only evaluate when necessary
-define(TRACE(Event, Msg, Meta),
begin
case persistent_term:get(?TRACE_FILTER, undefined) of
undefined -> ok;
[] -> ok;
List ->
emqx_trace:log(List, Event, Msg, Meta)
end
end).
%% print to 'user' group leader
-define(ULOG(Fmt, Args), io:format(user, Fmt, Args)).

View File

@ -187,7 +187,7 @@ convert_certs(CertsDir, Config) ->
{ok, SSL} ->
new_ssl_config(Config, SSL);
{error, Reason} ->
?SLOG(error, Reason#{msg => bad_ssl_config}),
?SLOG(error, Reason#{msg => "bad_ssl_config"}),
throw({bad_ssl_config, Reason})
end.
@ -199,7 +199,7 @@ convert_certs(CertsDir, NewConfig, OldConfig) ->
ok = emqx_tls_lib:delete_ssl_files(CertsDir, NewSSL1, OldSSL),
new_ssl_config(NewConfig, NewSSL1);
{error, Reason} ->
?SLOG(error, Reason#{msg => bad_ssl_config}),
?SLOG(error, Reason#{msg => "bad_ssl_config"}),
throw({bad_ssl_config, Reason})
end.

View File

@ -204,9 +204,9 @@ publish(Msg) when is_record(Msg, message) ->
_ = emqx_trace:publish(Msg),
emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
#message{headers = #{allow_publish := false}} ->
?SLOG(debug, #{msg => "message_not_published",
payload => emqx_message:to_log_map(Msg)}),
#message{headers = #{allow_publish := false}, topic = Topic} ->
?TRACE("MQTT", "msg_publish_not_allowed", #{message => emqx_message:to_log_map(Msg),
topic => Topic}),
[];
Msg1 = #message{topic = Topic} ->
emqx_persistent_session:persist_message(Msg1),
@ -226,7 +226,9 @@ safe_publish(Msg) when is_record(Msg, message) ->
reason => Reason,
payload => emqx_message:to_log_map(Msg),
stacktrace => Stk
}),
},
#{topic => Msg#message.topic}
),
[]
end.
@ -280,7 +282,7 @@ forward(Node, To, Delivery, async) ->
msg => "async_forward_msg_to_node_failed",
node => Node,
reason => Reason
}),
}, #{topic => To}),
{error, badrpc}
end;
@ -291,7 +293,7 @@ forward(Node, To, Delivery, sync) ->
msg => "sync_forward_msg_to_node_failed",
node => Node,
reason => Reason
}),
}, #{topic => To}),
{error, badrpc};
Result ->
emqx_metrics:inc('messages.forward'), Result

View File

@ -292,7 +292,7 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
fun check_banned/2
], ConnPkt, Channel#channel{conn_state = connecting}) of
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
NChannel1 = NChannel#channel{
will_msg = emqx_packet:will_msg(NConnPkt),
alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
@ -550,9 +550,8 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
{error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
?SLOG(warning, #{
msg => "cannot_publish_to_topic",
topic => Topic,
reason => emqx_reason_codes:name(Rc)
}),
}, #{topic => Topic}),
case emqx:get_config([authorization, deny_action], ignore) of
ignore ->
case QoS of
@ -568,9 +567,8 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
{error, Rc = ?RC_QUOTA_EXCEEDED, NChannel} ->
?SLOG(warning, #{
msg => "cannot_publish_to_topic",
topic => Topic,
reason => emqx_reason_codes:name(Rc)
}),
}, #{topic => Topic}),
case QoS of
?QOS_0 ->
ok = emqx_metrics:inc('packets.publish.dropped'),
@ -585,7 +583,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
msg => "cannot_publish_to_topic",
topic => Topic,
reason => emqx_reason_codes:name(Rc)
}),
}, #{topic => Topic}),
handle_out(disconnect, Rc, NChannel)
end.
@ -635,7 +633,7 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
msg => "dropped_qos2_packet",
reason => emqx_reason_codes:name(RC),
packet_id => PacketId
}),
}, #{topic => Msg#message.topic}),
ok = emqx_metrics:inc('packets.publish.dropped'),
handle_out(pubrec, {PacketId, RC}, Channel)
end.
@ -687,7 +685,7 @@ process_subscribe([Topic = {TopicFilter, SubOpts} | More], SubProps, Channel, Ac
?SLOG(warning, #{
msg => "cannot_subscribe_topic_filter",
reason => emqx_reason_codes:name(ReasonCode)
}),
}, #{topic => TopicFilter}),
process_subscribe(More, SubProps, Channel, [{Topic, ReasonCode} | Acc])
end.
@ -703,7 +701,7 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
?SLOG(warning, #{
msg => "cannot_subscribe_topic_filter",
reason => emqx_reason_codes:text(RC)
}),
}, #{topic => NTopicFilter}),
{RC, Channel}
end.

View File

@ -375,7 +375,7 @@ discard_session(ClientId) when is_binary(ClientId) ->
-spec kick_or_kill(kick | discard, module(), pid()) -> ok.
kick_or_kill(Action, ConnMod, Pid) ->
try
%% this is essentailly a gen_server:call implemented in emqx_connection
%% this is essentially a gen_server:call implemented in emqx_connection
%% and emqx_ws_connection.
%% the handle_call is implemented in emqx_channel
ok = apply(ConnMod, call, [Pid, Action, ?T_KICK])
@ -390,19 +390,12 @@ kick_or_kill(Action, ConnMod, Pid) ->
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
_ : {timeout, {gen_server, call, _}} ->
?tp(warning, "session_kick_timeout",
#{pid => Pid,
action => Action,
stale_channel => stale_channel_info(Pid)
}),
#{pid => Pid, action => Action, stale_channel => stale_channel_info(Pid)}),
ok = force_kill(Pid);
_ : Error : St ->
?tp(error, "session_kick_exception",
#{pid => Pid,
action => Action,
reason => Error,
stacktrace => St,
stale_channel => stale_channel_info(Pid)
}),
#{pid => Pid, action => Action, reason => Error, stacktrace => St,
stale_channel => stale_channel_info(Pid)}),
ok = force_kill(Pid)
end.
@ -448,20 +441,22 @@ kick_session(Action, ClientId, ChanPid) ->
, action => Action
, error => Error
, reason => Reason
})
},
#{clientid => ClientId})
end.
kick_session(ClientId) ->
case lookup_channels(ClientId) of
[] ->
?SLOG(warning, #{msg => "kicked_an_unknown_session",
clientid => ClientId}),
?SLOG(warning, #{msg => "kicked_an_unknown_session"},
#{clientid => ClientId}),
ok;
ChanPids ->
case length(ChanPids) > 1 of
true ->
?SLOG(warning, #{msg => "more_than_one_channel_found",
chan_pids => ChanPids});
chan_pids => ChanPids},
#{clientid => ClientId});
false -> ok
end,
lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids)
@ -478,12 +473,12 @@ with_channel(ClientId, Fun) ->
Pids -> Fun(lists:last(Pids))
end.
%% @doc Get all registed channel pids. Debugg/test interface
%% @doc Get all registered channel pids. Debug/test interface
all_channels() ->
Pat = [{{'_', '$1'}, [], ['$1']}],
ets:select(?CHAN_TAB, Pat).
%% @doc Get all registed clientIDs. Debugg/test interface
%% @doc Get all registered clientIDs. Debug/test interface
all_client_ids() ->
Pat = [{{'$1', '_'}, [], ['$1']}],
ets:select(?CHAN_TAB, Pat).
@ -511,7 +506,7 @@ lookup_channels(local, ClientId) ->
rpc_call(Node, Fun, Args, Timeout) ->
case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of
{badrpc, Reason} ->
%% since eqmx app 4.3.10, the 'kick' and 'discard' calls hanndler
%% since emqx app 4.3.10, the 'kick' and 'discard' calls handler
%% should catch all exceptions and always return 'ok'.
%% This leaves 'badrpc' only possible when there is problem
%% calling the remote node.

View File

@ -262,7 +262,7 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) ->
{ok, RawRichConf} ->
init_load(SchemaMod, RawRichConf);
{error, Reason} ->
?SLOG(error, #{msg => failed_to_load_hocon_conf,
?SLOG(error, #{msg => "failed_to_load_hocon_conf",
reason => Reason,
pwd => file:get_cwd(),
include_dirs => IncDir
@ -397,7 +397,7 @@ save_to_override_conf(RawConf, Opts) ->
case file:write_file(FileName, hocon_pp:do(RawConf, #{})) of
ok -> ok;
{error, Reason} ->
?SLOG(error, #{msg => failed_to_write_override_file,
?SLOG(error, #{msg => "failed_to_write_override_file",
filename => FileName,
reason => Reason}),
{error, Reason}

View File

@ -449,14 +449,12 @@ handle_msg({'$gen_cast', Req}, State) ->
{ok, NewState};
handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
?SLOG(debug, #{msg => "RECV_data", data => Data, transport => Inet}),
Oct = iolist_size(Data),
inc_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct),
when_bytes_in(Oct, Data, State);
handle_msg({quic, Data, _Sock, _, _, _}, State) ->
?SLOG(debug, #{msg => "RECV_data", data => Data, transport => quic}),
Oct = iolist_size(Data),
inc_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct),
@ -528,7 +526,7 @@ handle_msg({connack, ConnAck}, State) ->
handle_outgoing(ConnAck, State);
handle_msg({close, Reason}, State) ->
?SLOG(debug, #{msg => "force_socket_close", reason => Reason}),
?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}),
handle_info({sock_closed, Reason}, close_socket(State));
handle_msg({event, connected}, State = #state{channel = Channel}) ->
@ -566,7 +564,8 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport,
Channel1 = emqx_channel:set_conn_state(disconnected, Channel),
emqx_congestion:cancel_alarms(Socket, Transport, Channel1),
emqx_channel:terminate(Reason, Channel1),
close_socket_ok(State)
close_socket_ok(State),
?TRACE("SOCKET", "tcp_socket_terminated", #{reason => Reason})
catch
E : C : S ->
?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S})
@ -716,7 +715,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
ok = inc_incoming_stats(Packet),
?SLOG(debug, #{msg => "RECV_packet", packet => emqx_packet:format(Packet)}),
?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
with_channel(handle_in, [Packet], State);
handle_incoming(FrameError, State) ->
@ -755,15 +754,13 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
<<>> -> ?SLOG(warning, #{
msg => "packet_is_discarded",
reason => "frame_is_too_large",
packet => emqx_packet:format(Packet)
packet => emqx_packet:format(Packet, hidden)
}),
ok = emqx_metrics:inc('delivery.dropped.too_large'),
ok = emqx_metrics:inc('delivery.dropped'),
<<>>;
Data -> ?SLOG(debug, #{
msg => "SEND_packet",
packet => emqx_packet:format(Packet)
}),
Data ->
?TRACE("MQTT", "mqtt_packet_sent", #{packet => Packet}),
ok = inc_outgoing_stats(Packet),
Data
catch
@ -875,7 +872,7 @@ check_limiter(Needs,
{ok, Limiter2} ->
WhenOk(Data, Msgs, State#state{limiter = Limiter2});
{pause, Time, Limiter2} ->
?SLOG(warning, #{msg => "pause time dueto rate limit",
?SLOG(warning, #{msg => "pause_time_dueto_rate_limit",
needs => Needs,
time_in_ms => Time}),
@ -915,7 +912,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
, limiter_timer = undefined
});
{pause, Time, Limiter2} ->
?SLOG(warning, #{msg => "pause time dueto rate limit",
?SLOG(warning, #{msg => "pause_time_dueto_rate_limit",
types => Types,
time_in_ms => Time}),

View File

@ -118,11 +118,10 @@ handle_cast({detected, #flapping{clientid = ClientId,
true -> %% Flapping happened:(
?SLOG(warning, #{
msg => "flapping_detected",
client_id => ClientId,
peer_host => fmt_host(PeerHost),
detect_cnt => DetectCnt,
wind_time_in_ms => WindTime
}),
}, #{clientid => ClientId}),
Now = erlang:system_time(second),
Banned = #banned{who = {clientid, ClientId},
by = <<"flapping detector">>,
@ -134,11 +133,10 @@ handle_cast({detected, #flapping{clientid = ClientId,
false ->
?SLOG(warning, #{
msg => "client_disconnected",
client_id => ClientId,
peer_host => fmt_host(PeerHost),
detect_cnt => DetectCnt,
interval => Interval
})
}, #{clientid => ClientId})
end,
{noreply, State};

View File

@ -197,15 +197,7 @@ critical(Metadata, Format, Args) when is_map(Metadata) ->
set_metadata_clientid(<<>>) ->
ok;
set_metadata_clientid(ClientId) ->
try
%% try put string format client-id metadata so
%% so the log is not like <<"...">>
Id = unicode:characters_to_list(ClientId, utf8),
set_proc_metadata(#{clientid => Id})
catch
_: _->
ok
end.
set_proc_metadata(#{clientid => ClientId}).
-spec(set_metadata_peername(peername_str()) -> ok).
set_metadata_peername(Peername) ->

View File

@ -18,22 +18,77 @@
-export([format/2]).
-export([check_config/1]).
-export([try_format_unicode/1]).
check_config(X) -> logger_formatter:check_config(X).
format(#{msg := {report, Report}, meta := Meta} = Event, Config) when is_map(Report) ->
logger_formatter:format(Event#{msg := {report, enrich(Report, Meta)}}, Config);
format(#{msg := Msg, meta := Meta} = Event, Config) ->
NewMsg = enrich_fmt(Msg, Meta),
logger_formatter:format(Event#{msg := NewMsg}, Config).
format(#{msg := {report, Report0}, meta := Meta} = Event, Config) when is_map(Report0) ->
Report1 = enrich_report_mfa(Report0, Meta),
Report2 = enrich_report_clientid(Report1, Meta),
Report3 = enrich_report_peername(Report2, Meta),
Report4 = enrich_report_topic(Report3, Meta),
logger_formatter:format(Event#{msg := {report, Report4}}, Config);
format(#{msg := {string, String}} = Event, Config) ->
format(Event#{msg => {"~ts ", String}}, Config);
format(#{msg := Msg0, meta := Meta} = Event, Config) ->
Msg1 = enrich_client_info(Msg0, Meta),
Msg2 = enrich_mfa(Msg1, Meta),
Msg3 = enrich_topic(Msg2, Meta),
logger_formatter:format(Event#{msg := Msg3}, Config).
enrich(Report, #{mfa := Mfa, line := Line}) ->
try_format_unicode(Char) ->
List =
try
case unicode:characters_to_list(Char) of
{error, _, _} -> error;
{incomplete, _, _} -> error;
Binary -> Binary
end
catch _:_ ->
error
end,
case List of
error -> io_lib:format("~0p", [Char]);
_ -> List
end.
enrich_report_mfa(Report, #{mfa := Mfa, line := Line}) ->
Report#{mfa => mfa(Mfa), line => Line};
enrich(Report, _) -> Report.
enrich_report_mfa(Report, _) -> Report.
enrich_fmt({Fmt, Args}, #{mfa := Mfa, line := Line}) when is_list(Fmt) ->
enrich_report_clientid(Report, #{clientid := ClientId}) ->
Report#{clientid => try_format_unicode(ClientId)};
enrich_report_clientid(Report, _) -> Report.
enrich_report_peername(Report, #{peername := Peername}) ->
Report#{peername => Peername};
enrich_report_peername(Report, _) -> Report.
%% clientid and peername always in emqx_conn's process metadata.
%% topic can be put in meta using ?SLOG/3, or put in msg's report by ?SLOG/2
enrich_report_topic(Report, #{topic := Topic}) ->
Report#{topic => try_format_unicode(Topic)};
enrich_report_topic(Report = #{topic := Topic}, _) ->
Report#{topic => try_format_unicode(Topic)};
enrich_report_topic(Report, _) -> Report.
enrich_mfa({Fmt, Args}, #{mfa := Mfa, line := Line}) when is_list(Fmt) ->
{Fmt ++ " mfa: ~ts line: ~w", Args ++ [mfa(Mfa), Line]};
enrich_fmt(Msg, _) ->
enrich_mfa(Msg, _) ->
Msg.
enrich_client_info({Fmt, Args}, #{clientid := ClientId, peername := Peer}) when is_list(Fmt) ->
{" ~ts@~ts " ++ Fmt, [ClientId, Peer | Args] };
enrich_client_info({Fmt, Args}, #{clientid := ClientId}) when is_list(Fmt) ->
{" ~ts " ++ Fmt, [ClientId | Args]};
enrich_client_info({Fmt, Args}, #{peername := Peer}) when is_list(Fmt) ->
{" ~ts " ++ Fmt, [Peer | Args]};
enrich_client_info(Msg, _) ->
Msg.
enrich_topic({Fmt, Args}, #{topic := Topic}) when is_list(Fmt) ->
{" topic: ~ts" ++ Fmt, [Topic | Args]};
enrich_topic(Msg, _) ->
Msg.
mfa({M, F, A}) -> atom_to_list(M) ++ ":" ++ atom_to_list(F) ++ "/" ++ integer_to_list(A).

View File

@ -44,7 +44,11 @@
, will_msg/1
]).
-export([format/1]).
-export([ format/1
, format/2
]).
-export([encode_hex/1]).
-define(TYPE_NAMES,
{ 'CONNECT'
@ -435,25 +439,28 @@ will_msg(#mqtt_packet_connect{clientid = ClientId,
%% @doc Format packet
-spec(format(emqx_types:packet()) -> iolist()).
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) ->
format_header(Header, format_variable(Variable, Payload)).
format(Packet) -> format(Packet, emqx_trace_handler:payload_encode()).
%% @doc Format packet
-spec(format(emqx_types:packet(), hex | text | hidden) -> iolist()).
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, PayloadEncode) ->
HeaderIO = format_header(Header),
case format_variable(Variable, Payload, PayloadEncode) of
"" -> HeaderIO;
VarIO -> [HeaderIO,",", VarIO]
end.
format_header(#mqtt_packet_header{type = Type,
dup = Dup,
qos = QoS,
retain = Retain}, S) ->
S1 = case S == undefined of
true -> <<>>;
false -> [", ", S]
end,
io_lib:format("~ts(Q~p, R~p, D~p~ts)", [type_name(Type), QoS, i(Retain), i(Dup), S1]).
retain = Retain}) ->
io_lib:format("~ts(Q~p, R~p, D~p)", [type_name(Type), QoS, i(Retain), i(Dup)]).
format_variable(undefined, _) ->
undefined;
format_variable(Variable, undefined) ->
format_variable(Variable);
format_variable(Variable, Payload) ->
io_lib:format("~ts, Payload=~0p", [format_variable(Variable), Payload]).
format_variable(undefined, _, _) -> "";
format_variable(Variable, undefined, PayloadEncode) ->
format_variable(Variable, PayloadEncode);
format_variable(Variable, Payload, PayloadEncode) ->
[format_variable(Variable, PayloadEncode), format_payload(Payload, PayloadEncode)].
format_variable(#mqtt_packet_connect{
proto_ver = ProtoVer,
@ -467,57 +474,140 @@ format_variable(#mqtt_packet_connect{
will_topic = WillTopic,
will_payload = WillPayload,
username = Username,
password = Password}) ->
Format = "ClientId=~ts, ProtoName=~ts, ProtoVsn=~p, CleanStart=~ts, KeepAlive=~p, Username=~ts, Password=~ts",
Args = [ClientId, ProtoName, ProtoVer, CleanStart, KeepAlive, Username, format_password(Password)],
{Format1, Args1} = if
WillFlag -> {Format ++ ", Will(Q~p, R~p, Topic=~ts, Payload=~0p)",
Args ++ [WillQoS, i(WillRetain), WillTopic, WillPayload]};
true -> {Format, Args}
end,
io_lib:format(Format1, Args1);
password = Password},
PayloadEncode) ->
Base = io_lib:format(
"ClientId=~ts, ProtoName=~ts, ProtoVsn=~p, CleanStart=~ts, KeepAlive=~p, Username=~ts, Password=~ts",
[ClientId, ProtoName, ProtoVer, CleanStart, KeepAlive, Username, format_password(Password)]),
case WillFlag of
true ->
[Base, io_lib:format(", Will(Q~p, R~p, Topic=~ts ",
[WillQoS, i(WillRetain), WillTopic]),
format_payload(WillPayload, PayloadEncode), ")"];
false ->
Base
end;
format_variable(#mqtt_packet_disconnect
{reason_code = ReasonCode}) ->
{reason_code = ReasonCode}, _) ->
io_lib:format("ReasonCode=~p", [ReasonCode]);
format_variable(#mqtt_packet_connack{ack_flags = AckFlags,
reason_code = ReasonCode}) ->
reason_code = ReasonCode}, _) ->
io_lib:format("AckFlags=~p, ReasonCode=~p", [AckFlags, ReasonCode]);
format_variable(#mqtt_packet_publish{topic_name = TopicName,
packet_id = PacketId}) ->
packet_id = PacketId}, _) ->
io_lib:format("Topic=~ts, PacketId=~p", [TopicName, PacketId]);
format_variable(#mqtt_packet_puback{packet_id = PacketId,
reason_code = ReasonCode}) ->
reason_code = ReasonCode}, _) ->
io_lib:format("PacketId=~p, ReasonCode=~p", [PacketId, ReasonCode]);
format_variable(#mqtt_packet_subscribe{packet_id = PacketId,
topic_filters = TopicFilters}) ->
io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, TopicFilters]);
topic_filters = TopicFilters}, _) ->
[io_lib:format("PacketId=~p ", [PacketId]), "TopicFilters=",
format_topic_filters(TopicFilters)];
format_variable(#mqtt_packet_unsubscribe{packet_id = PacketId,
topic_filters = Topics}) ->
io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, Topics]);
topic_filters = Topics}, _) ->
[io_lib:format("PacketId=~p ", [PacketId]), "TopicFilters=",
format_topic_filters(Topics)];
format_variable(#mqtt_packet_suback{packet_id = PacketId,
reason_codes = ReasonCodes}) ->
reason_codes = ReasonCodes}, _) ->
io_lib:format("PacketId=~p, ReasonCodes=~p", [PacketId, ReasonCodes]);
format_variable(#mqtt_packet_unsuback{packet_id = PacketId}) ->
format_variable(#mqtt_packet_unsuback{packet_id = PacketId}, _) ->
io_lib:format("PacketId=~p", [PacketId]);
format_variable(#mqtt_packet_auth{reason_code = ReasonCode}) ->
format_variable(#mqtt_packet_auth{reason_code = ReasonCode}, _) ->
io_lib:format("ReasonCode=~p", [ReasonCode]);
format_variable(PacketId) when is_integer(PacketId) ->
format_variable(PacketId, _) when is_integer(PacketId) ->
io_lib:format("PacketId=~p", [PacketId]).
format_password(undefined) -> undefined;
format_password(_Password) -> '******'.
format_password(undefined) -> "undefined";
format_password(_Password) -> "******".
format_payload(Payload, text) -> ["Payload=", io_lib:format("~ts", [Payload])];
format_payload(Payload, hex) -> ["Payload(hex)=", encode_hex(Payload)];
format_payload(_, hidden) -> "Payload=******".
i(true) -> 1;
i(false) -> 0;
i(I) when is_integer(I) -> I.
format_topic_filters(Filters) ->
["[",
lists:join(",",
lists:map(
fun({TopicFilter, SubOpts}) ->
io_lib:format("~ts(~p)", [TopicFilter, SubOpts]);
(TopicFilter) ->
io_lib:format("~ts", [TopicFilter])
end, Filters)),
"]"].
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Hex encoding functions
%% Copy from binary:encode_hex/1 (was only introduced in OTP24).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-define(HEX(X), (hex(X)):16).
-compile({inline,[hex/1]}).
-spec encode_hex(Bin) -> Bin2 when
Bin :: binary(),
Bin2 :: <<_:_*16>>.
encode_hex(Data) when byte_size(Data) rem 8 =:= 0 ->
<< <<?HEX(A),?HEX(B),?HEX(C),?HEX(D),?HEX(E),?HEX(F),?HEX(G),?HEX(H)>> || <<A,B,C,D,E,F,G,H>> <= Data >>;
encode_hex(Data) when byte_size(Data) rem 7 =:= 0 ->
<< <<?HEX(A),?HEX(B),?HEX(C),?HEX(D),?HEX(E),?HEX(F),?HEX(G)>> || <<A,B,C,D,E,F,G>> <= Data >>;
encode_hex(Data) when byte_size(Data) rem 6 =:= 0 ->
<< <<?HEX(A),?HEX(B),?HEX(C),?HEX(D),?HEX(E),?HEX(F)>> || <<A,B,C,D,E,F>> <= Data >>;
encode_hex(Data) when byte_size(Data) rem 5 =:= 0 ->
<< <<?HEX(A),?HEX(B),?HEX(C),?HEX(D),?HEX(E)>> || <<A,B,C,D,E>> <= Data >>;
encode_hex(Data) when byte_size(Data) rem 4 =:= 0 ->
<< <<?HEX(A),?HEX(B),?HEX(C),?HEX(D)>> || <<A,B,C,D>> <= Data >>;
encode_hex(Data) when byte_size(Data) rem 3 =:= 0 ->
<< <<?HEX(A),?HEX(B),?HEX(C)>> || <<A,B,C>> <= Data >>;
encode_hex(Data) when byte_size(Data) rem 2 =:= 0 ->
<< <<?HEX(A),?HEX(B)>> || <<A,B>> <= Data >>;
encode_hex(Data) when is_binary(Data) ->
<< <<?HEX(N)>> || <<N>> <= Data >>;
encode_hex(Bin) ->
erlang:error(badarg, [Bin]).
hex(X) ->
element(
X+1, {16#3030, 16#3031, 16#3032, 16#3033, 16#3034, 16#3035, 16#3036, 16#3037, 16#3038, 16#3039, 16#3041,
16#3042, 16#3043, 16#3044, 16#3045, 16#3046,
16#3130, 16#3131, 16#3132, 16#3133, 16#3134, 16#3135, 16#3136, 16#3137, 16#3138, 16#3139, 16#3141,
16#3142, 16#3143, 16#3144, 16#3145, 16#3146,
16#3230, 16#3231, 16#3232, 16#3233, 16#3234, 16#3235, 16#3236, 16#3237, 16#3238, 16#3239, 16#3241,
16#3242, 16#3243, 16#3244, 16#3245, 16#3246,
16#3330, 16#3331, 16#3332, 16#3333, 16#3334, 16#3335, 16#3336, 16#3337, 16#3338, 16#3339, 16#3341,
16#3342, 16#3343, 16#3344, 16#3345, 16#3346,
16#3430, 16#3431, 16#3432, 16#3433, 16#3434, 16#3435, 16#3436, 16#3437, 16#3438, 16#3439, 16#3441,
16#3442, 16#3443, 16#3444, 16#3445, 16#3446,
16#3530, 16#3531, 16#3532, 16#3533, 16#3534, 16#3535, 16#3536, 16#3537, 16#3538, 16#3539, 16#3541,
16#3542, 16#3543, 16#3544, 16#3545, 16#3546,
16#3630, 16#3631, 16#3632, 16#3633, 16#3634, 16#3635, 16#3636, 16#3637, 16#3638, 16#3639, 16#3641,
16#3642, 16#3643, 16#3644, 16#3645, 16#3646,
16#3730, 16#3731, 16#3732, 16#3733, 16#3734, 16#3735, 16#3736, 16#3737, 16#3738, 16#3739, 16#3741,
16#3742, 16#3743, 16#3744, 16#3745, 16#3746,
16#3830, 16#3831, 16#3832, 16#3833, 16#3834, 16#3835, 16#3836, 16#3837, 16#3838, 16#3839, 16#3841,
16#3842, 16#3843, 16#3844, 16#3845, 16#3846,
16#3930, 16#3931, 16#3932, 16#3933, 16#3934, 16#3935, 16#3936, 16#3937, 16#3938, 16#3939, 16#3941,
16#3942, 16#3943, 16#3944, 16#3945, 16#3946,
16#4130, 16#4131, 16#4132, 16#4133, 16#4134, 16#4135, 16#4136, 16#4137, 16#4138, 16#4139, 16#4141,
16#4142, 16#4143, 16#4144, 16#4145, 16#4146,
16#4230, 16#4231, 16#4232, 16#4233, 16#4234, 16#4235, 16#4236, 16#4237, 16#4238, 16#4239, 16#4241,
16#4242, 16#4243, 16#4244, 16#4245, 16#4246,
16#4330, 16#4331, 16#4332, 16#4333, 16#4334, 16#4335, 16#4336, 16#4337, 16#4338, 16#4339, 16#4341,
16#4342, 16#4343, 16#4344, 16#4345, 16#4346,
16#4430, 16#4431, 16#4432, 16#4433, 16#4434, 16#4435, 16#4436, 16#4437, 16#4438, 16#4439, 16#4441,
16#4442, 16#4443, 16#4444, 16#4445, 16#4446,
16#4530, 16#4531, 16#4532, 16#4533, 16#4534, 16#4535, 16#4536, 16#4537, 16#4538, 16#4539, 16#4541,
16#4542, 16#4543, 16#4544, 16#4545, 16#4546,
16#4630, 16#4631, 16#4632, 16#4633, 16#4634, 16#4635, 16#4636, 16#4637, 16#4638, 16#4639, 16#4641,
16#4642, 16#4643, 16#4644, 16#4645, 16#4646}).

View File

@ -180,6 +180,12 @@ roots(low) ->
, {"latency_stats",
sc(ref("latency_stats"),
#{})}
, {"trace",
sc(ref("trace"),
#{desc => """
Real-time filtering logs for the ClientID or Topic or IP for debugging.
"""
})}
].
fields("persistent_session_store") ->
@ -977,6 +983,17 @@ when deactivated, but after the retention time.
fields("latency_stats") ->
[ {"samples", sc(integer(), #{default => 10,
desc => "the number of smaples for calculate the average latency of delivery"})}
];
fields("trace") ->
[ {"payload_encode", sc(hoconsc:enum([hex, text, hidden]), #{
default => text,
desc => """
Determine the format of the payload format in the trace file.<br>
`text`: Text-based protocol or plain text protocol. It is recommended when payload is json encode.<br>
`hex`: Binary hexadecimal encode. It is recommended when payload is a custom binary protocol.<br>
`hidden`: payload is obfuscated as `******`
"""
})}
].
mqtt_listener() ->

View File

@ -535,16 +535,20 @@ enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) ->
(Dropped =/= undefined) andalso log_dropped(Dropped, Session),
Session#session{mqueue = NewQ}.
log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) ->
case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of
log_dropped(Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) ->
Payload = emqx_message:to_log_map(Msg),
#{store_qos0 := StoreQos0} = QueueInfo = emqx_mqueue:info(Q),
case (QoS == ?QOS_0) andalso (not StoreQos0) of
true ->
ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
?SLOG(warning, #{msg => "dropped_qos0_msg",
payload => emqx_message:to_log_map(Msg)});
queue => QueueInfo,
payload => Payload}, #{topic => Topic});
false ->
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
?SLOG(warning, #{msg => "dropped_msg_due_to_mqueue_is_full",
payload => emqx_message:to_log_map(Msg)})
queue => QueueInfo,
payload => Payload}, #{topic => Topic})
end.
enrich_fun(Session = #session{subscriptions = Subs}) ->

View File

@ -260,7 +260,7 @@ code_change(_OldVsn, State, _Extra) ->
init_resume_worker(RemotePid, SessionID, #{ pmon := Pmon } = State) ->
case emqx_session_router_worker_sup:start_worker(SessionID, RemotePid) of
{error, What} ->
?SLOG(error, #{msg => "Could not start resume worker", reason => What}),
?SLOG(error, #{msg => "failed_to_start_resume_worker", reason => What}),
error;
{ok, Pid} ->
Pmon1 = emqx_pmon:monitor(Pid, Pmon),

View File

@ -26,6 +26,7 @@
-export([ publish/1
, subscribe/3
, unsubscribe/2
, log/4
]).
-export([ start_link/0
@ -36,6 +37,7 @@
, delete/1
, clear/0
, update/2
, check/0
]).
-export([ format/1
@ -50,6 +52,7 @@
-define(TRACE, ?MODULE).
-define(MAX_SIZE, 30).
-define(OWN_KEYS, [level, filters, filter_default, handlers]).
-ifdef(TEST).
-export([ log_file/2
@ -80,27 +83,53 @@ mnesia(boot) ->
publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore;
publish(#message{from = From, topic = Topic, payload = Payload}) when
is_binary(From); is_atom(From) ->
emqx_logger:info(
#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}},
"PUBLISH to ~s: ~0p",
[Topic, Payload]
).
?TRACE("PUBLISH", "publish_to", #{topic => Topic, payload => Payload}).
subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> ignore;
subscribe(Topic, SubId, SubOpts) ->
emqx_logger:info(
#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}},
"~ts SUBSCRIBE ~ts: Options: ~0p",
[SubId, Topic, SubOpts]
).
?TRACE("SUBSCRIBE", "subscribe", #{topic => Topic, sub_opts => SubOpts, sub_id => SubId}).
unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore;
unsubscribe(Topic, SubOpts) ->
emqx_logger:info(
#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}},
"~ts UNSUBSCRIBE ~ts: Options: ~0p",
[maps:get(subid, SubOpts, ""), Topic, SubOpts]
).
?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}).
log(List, Event, Msg, Meta0) ->
Meta =
case logger:get_process_metadata() of
undefined -> Meta0;
ProcMeta -> maps:merge(ProcMeta, Meta0)
end,
Log = #{level => trace, event => Event, meta => Meta, msg => Msg},
log_filter(List, Log).
log_filter([], _Log) -> ok;
log_filter([{Id, FilterFun, Filter, Name} | Rest], Log0) ->
case FilterFun(Log0, {Filter, Name}) of
stop -> stop;
ignore -> ignore;
Log ->
case logger_config:get(ets:whereis(logger), Id) of
{ok, #{module := Module} = HandlerConfig0} ->
HandlerConfig = maps:without(?OWN_KEYS, HandlerConfig0),
try Module:log(Log, HandlerConfig)
catch C:R:S ->
case logger:remove_handler(Id) of
ok ->
logger:internal_log(error, {removed_failing_handler, Id, C, R, S});
{error,{not_found,_}} ->
%% Probably already removed by other client
%% Don't report again
ok;
{error,Reason} ->
logger:internal_log(error,
{removed_handler_failed, Id, Reason, C, R, S})
end
end;
{error, {not_found, Id}} -> ok;
{error, Reason} -> logger:internal_log(error, {find_handle_id_failed, Id, Reason})
end
end,
log_filter(Rest, Log0).
-spec(start_link() -> emqx_types:startlink_ret()).
start_link() ->
@ -161,6 +190,9 @@ update(Name, Enable) ->
end,
transaction(Tran).
check() ->
gen_server:call(?MODULE, check).
-spec get_trace_filename(Name :: binary()) ->
{ok, FileName :: string()} | {error, not_found}.
get_trace_filename(Name) ->
@ -196,15 +228,17 @@ format(Traces) ->
init([]) ->
ok = mria:wait_for_tables([?TRACE]),
erlang:process_flag(trap_exit, true),
OriginLogLevel = emqx_logger:get_primary_log_level(),
ok = filelib:ensure_dir(trace_dir()),
ok = filelib:ensure_dir(zip_dir()),
{ok, _} = mnesia:subscribe({table, ?TRACE, simple}),
Traces = get_enable_trace(),
ok = update_log_primary_level(Traces, OriginLogLevel),
TRef = update_trace(Traces),
{ok, #{timer => TRef, monitors => #{}, primary_log_level => OriginLogLevel}}.
update_trace_handler(),
{ok, #{timer => TRef, monitors => #{}}}.
handle_call(check, _From, State) ->
{_, NewState} = handle_info({mnesia_table_event, check}, State),
{reply, ok, NewState};
handle_call(Req, _From, State) ->
?SLOG(error, #{unexpected_call => Req}),
{reply, ok, State}.
@ -223,11 +257,10 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitor
lists:foreach(fun file:delete/1, Files),
{noreply, State#{monitors => NewMonitors}}
end;
handle_info({timeout, TRef, update_trace},
#{timer := TRef, primary_log_level := OriginLogLevel} = State) ->
handle_info({timeout, TRef, update_trace}, #{timer := TRef} = State) ->
Traces = get_enable_trace(),
ok = update_log_primary_level(Traces, OriginLogLevel),
NextTRef = update_trace(Traces),
update_trace_handler(),
{noreply, State#{timer => NextTRef}};
handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) ->
@ -238,11 +271,11 @@ handle_info(Info, State) ->
?SLOG(error, #{unexpected_info => Info}),
{noreply, State}.
terminate(_Reason, #{timer := TRef, primary_log_level := OriginLogLevel}) ->
ok = set_log_primary_level(OriginLogLevel),
terminate(_Reason, #{timer := TRef}) ->
_ = mnesia:unsubscribe({table, ?TRACE, simple}),
emqx_misc:cancel_timer(TRef),
stop_all_trace_handler(),
update_trace_handler(),
_ = file:del_dir_r(zip_dir()),
ok.
@ -270,7 +303,7 @@ update_trace(Traces) ->
disable_finished(Finished),
Started = emqx_trace_handler:running(),
{NeedRunning, AllStarted} = start_trace(Running, Started),
NeedStop = AllStarted -- NeedRunning,
NeedStop = filter_cli_handler(AllStarted) -- NeedRunning,
ok = stop_trace(NeedStop, Started),
clean_stale_trace_files(),
NextTime = find_closest_time(Traces, Now),
@ -308,10 +341,10 @@ disable_finished(Traces) ->
start_trace(Traces, Started0) ->
Started = lists:map(fun(#{name := Name}) -> Name end, Started0),
lists:foldl(fun(#?TRACE{name = Name} = Trace, {Running, StartedAcc}) ->
lists:foldl(fun(#?TRACE{name = Name} = Trace,
{Running, StartedAcc}) ->
case lists:member(Name, StartedAcc) of
true ->
{[Name | Running], StartedAcc};
true -> {[Name | Running], StartedAcc};
false ->
case start_trace(Trace) of
ok -> {[Name | Running], [Name | StartedAcc]};
@ -330,9 +363,11 @@ start_trace(Trace) ->
emqx_trace_handler:install(Who, debug, log_file(Name, Start)).
stop_trace(Finished, Started) ->
lists:foreach(fun(#{name := Name, type := Type}) ->
lists:foreach(fun(#{name := Name, type := Type, filter := Filter}) ->
case lists:member(Name, Finished) of
true -> emqx_trace_handler:uninstall(Type, Name);
true ->
?TRACE("API", "trace_stopping", #{Type => Filter}),
emqx_trace_handler:uninstall(Type, Name);
false -> ok
end
end, Started).
@ -419,7 +454,7 @@ to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) ->
case validate_ip_address(Filter) of
ok ->
Trace0 = maps:without([type, ip_address], Trace),
to_trace(Trace0, Rec#?TRACE{type = ip_address, filter = Filter});
to_trace(Trace0, Rec#?TRACE{type = ip_address, filter = binary_to_list(Filter)});
Error -> Error
end;
to_trace(#{type := Type}, _Rec) -> {error, io_lib:format("required ~s field", [Type])};
@ -481,11 +516,20 @@ transaction(Tran) ->
{aborted, Reason} -> {error, Reason}
end.
update_log_primary_level([], OriginLevel) -> set_log_primary_level(OriginLevel);
update_log_primary_level(_, _) -> set_log_primary_level(debug).
set_log_primary_level(NewLevel) ->
case NewLevel =/= emqx_logger:get_primary_log_level() of
true -> emqx_logger:set_primary_log_level(NewLevel);
false -> ok
update_trace_handler() ->
case emqx_trace_handler:running() of
[] -> persistent_term:erase(?TRACE_FILTER);
Running ->
List = lists:map(fun(#{id := Id, filter_fun := FilterFun,
filter := Filter, name := Name}) ->
{Id, FilterFun, Filter, Name} end, Running),
case List =/= persistent_term:get(?TRACE_FILTER, undefined) of
true -> persistent_term:put(?TRACE_FILTER, List);
false -> ok
end
end.
filter_cli_handler(Names) ->
lists:filter(fun(Name) ->
nomatch =:= re:run(Name, "^CLI-+.", [])
end, Names).

View File

@ -0,0 +1,62 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_trace_formatter).
-export([format/2]).
%%%-----------------------------------------------------------------
%%% API
-spec format(LogEvent, Config) -> unicode:chardata() when
LogEvent :: logger:log_event(),
Config :: logger:config().
format(#{level := trace, event := Event, meta := Meta, msg := Msg},
#{payload_encode := PEncode}) ->
Time = calendar:system_time_to_rfc3339(erlang:system_time(second)),
ClientId = to_iolist(maps:get(clientid, Meta, "")),
Peername = maps:get(peername, Meta, ""),
MetaBin = format_meta(Meta, PEncode),
[Time, " [", Event, "] ", ClientId, "@", Peername, " msg: ", Msg, MetaBin, "\n"];
format(Event, Config) ->
emqx_logger_textfmt:format(Event, Config).
format_meta(Meta0, Encode) ->
Packet = format_packet(maps:get(packet, Meta0, undefined), Encode),
Payload = format_payload(maps:get(payload, Meta0, undefined), Encode),
Meta1 = maps:without([msg, clientid, peername, packet, payload], Meta0),
case Meta1 =:= #{} of
true -> [Packet, Payload];
false -> [Packet, ", ", map_to_iolist(Meta1), Payload]
end.
format_packet(undefined, _) -> "";
format_packet(Packet, Encode) -> [", packet: ", emqx_packet:format(Packet, Encode)].
format_payload(undefined, _) -> "";
format_payload(Payload, text) -> [", payload: ", io_lib:format("~ts", [Payload])];
format_payload(Payload, hex) -> [", payload(hex): ", emqx_packet:encode_hex(Payload)];
format_payload(_, hidden) -> ", payload=******".
to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom);
to_iolist(Int) when is_integer(Int) -> integer_to_list(Int);
to_iolist(Float) when is_float(Float) -> float_to_list(Float, [{decimals, 2}]);
to_iolist(SubMap) when is_map(SubMap) -> ["[", map_to_iolist(SubMap), "]"];
to_iolist(Char) -> emqx_logger_textfmt:try_format_unicode(Char).
map_to_iolist(Map) ->
lists:join(",",
lists:map(fun({K, V}) -> [to_iolist(K), ": ", to_iolist(V)] end,
maps:to_list(Map))).

View File

@ -25,6 +25,7 @@
-export([ running/0
, install/3
, install/4
, install/5
, uninstall/1
, uninstall/2
]).
@ -36,6 +37,7 @@
]).
-export([handler_id/2]).
-export([payload_encode/0]).
-type tracer() :: #{
name := binary(),
@ -77,22 +79,18 @@ install(Type, Filter, Level, LogFile) ->
-spec install(tracer(), logger:level() | all, string()) -> ok | {error, term()}.
install(Who, all, LogFile) ->
install(Who, debug, LogFile);
install(Who, Level, LogFile) ->
PrimaryLevel = emqx_logger:get_primary_log_level(),
try logger:compare_levels(Level, PrimaryLevel) of
lt ->
{error,
io_lib:format(
"Cannot trace at a log level (~s) "
"lower than the primary log level (~s)",
[Level, PrimaryLevel]
)};
_GtOrEq ->
install_handler(Who, Level, LogFile)
catch
error:badarg ->
{error, {invalid_log_level, Level}}
end.
install(Who = #{name := Name, type := Type}, Level, LogFile) ->
HandlerId = handler_id(Name, Type),
Config = #{
level => Level,
formatter => formatter(Who),
filter_default => stop,
filters => filters(Who),
config => ?CONFIG(LogFile)
},
Res = logger:add_handler(HandlerId, logger_disk_log_h, Config),
show_prompts(Res, Who, "start_trace"),
Res.
-spec uninstall(Type :: clientid | topic | ip_address,
Name :: binary() | list()) -> ok | {error, term()}.
@ -121,83 +119,59 @@ uninstall(HandlerId) ->
running() ->
lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)).
-spec filter_clientid(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore.
-spec filter_clientid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
filter_clientid(#{meta := #{clientid := ClientId}} = Log, {ClientId, _Name}) -> Log;
filter_clientid(_Log, _ExpectId) -> ignore.
filter_clientid(_Log, _ExpectId) -> stop.
-spec filter_topic(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore.
-spec filter_topic(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
filter_topic(#{meta := #{topic := Topic}} = Log, {TopicFilter, _Name}) ->
case emqx_topic:match(Topic, TopicFilter) of
true -> Log;
false -> ignore
false -> stop
end;
filter_topic(_Log, _ExpectId) -> ignore.
filter_topic(_Log, _ExpectId) -> stop.
-spec filter_ip_address(logger:log_event(), {string(), atom()}) -> logger:log_event() | ignore.
-spec filter_ip_address(logger:log_event(), {string(), atom()}) -> logger:log_event() | stop.
filter_ip_address(#{meta := #{peername := Peername}} = Log, {IP, _Name}) ->
case lists:prefix(IP, Peername) of
true -> Log;
false -> ignore
false -> stop
end;
filter_ip_address(_Log, _ExpectId) -> ignore.
install_handler(Who = #{name := Name, type := Type}, Level, LogFile) ->
HandlerId = handler_id(Name, Type),
Config = #{
level => Level,
formatter => formatter(Who),
filter_default => stop,
filters => filters(Who),
config => ?CONFIG(LogFile)
},
Res = logger:add_handler(HandlerId, logger_disk_log_h, Config),
show_prompts(Res, Who, "Start trace"),
Res.
filter_ip_address(_Log, _ExpectId) -> stop.
filters(#{type := clientid, filter := Filter, name := Name}) ->
[{clientid, {fun ?MODULE:filter_clientid/2, {ensure_list(Filter), Name}}}];
[{clientid, {fun ?MODULE:filter_clientid/2, {Filter, Name}}}];
filters(#{type := topic, filter := Filter, name := Name}) ->
[{topic, {fun ?MODULE:filter_topic/2, {ensure_bin(Filter), Name}}}];
filters(#{type := ip_address, filter := Filter, name := Name}) ->
[{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}].
formatter(#{type := Type}) ->
{logger_formatter,
formatter(#{type := _Type}) ->
{emqx_trace_formatter,
#{
template => template(Type),
single_line => false,
%% template is for ?SLOG message not ?TRACE.
template => [time," [",level,"] ", msg,"\n"],
single_line => true,
max_size => unlimited,
depth => unlimited
depth => unlimited,
payload_encode => payload_encode()
}
}.
%% Don't log clientid since clientid only supports exact match, all client ids are the same.
%% if clientid is not latin characters. the logger_formatter restricts the output must be `~tp`
%% (actually should use `~ts`), the utf8 characters clientid will become very difficult to read.
template(clientid) ->
[time, " [", level, "] ", {peername, [peername, " "], []}, msg, "\n"];
%% TODO better format when clientid is utf8.
template(_) ->
[time, " [", level, "] ",
{clientid,
[{peername, [clientid, "@", peername, " "], [clientid, " "]}],
[{peername, [peername, " "], []}]
},
msg, "\n"
].
filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) ->
Init = #{id => Id, level => Level, dst => Dst},
case Filters of
[{Type, {_FilterFun, {Filter, Name}}}] when
[{Type, {FilterFun, {Filter, Name}}}] when
Type =:= topic orelse
Type =:= clientid orelse
Type =:= ip_address ->
[Init#{type => Type, filter => Filter, name => Name} | Acc];
[Init#{type => Type, filter => Filter, name => Name, filter_fun => FilterFun} | Acc];
_ ->
Acc
end.
payload_encode() -> emqx_config:get([trace, payload_encode], text).
handler_id(Name, Type) ->
try
do_handler_id(Name, Type)

View File

@ -347,7 +347,6 @@ websocket_handle({binary, Data}, State) when is_list(Data) ->
websocket_handle({binary, iolist_to_binary(Data)}, State);
websocket_handle({binary, Data}, State) ->
?SLOG(debug, #{msg => "RECV_data", data => Data, transport => websocket}),
State2 = ensure_stats_timer(State),
{Packets, State3} = parse_incoming(Data, [], State2),
LenMsg = erlang:length(Packets),
@ -432,11 +431,11 @@ websocket_info(Info, State) ->
websocket_close({_, ReasonCode, _Payload}, State) when is_integer(ReasonCode) ->
websocket_close(ReasonCode, State);
websocket_close(Reason, State) ->
?SLOG(debug, #{msg => "websocket_closed", reason => Reason}),
?TRACE("SOCKET", "websocket_closed", #{reason => Reason}),
handle_info({sock_closed, Reason}, State).
terminate(Reason, _Req, #state{channel = Channel}) ->
?SLOG(debug, #{msg => "terminated", reason => Reason}),
?TRACE("SOCKET", "websocket_terminated", #{reason => Reason}),
emqx_channel:terminate(Reason, Channel);
terminate(_Reason, _Req, _UnExpectedState) ->
@ -480,7 +479,7 @@ handle_info({connack, ConnAck}, State) ->
return(enqueue(ConnAck, State));
handle_info({close, Reason}, State) ->
?SLOG(debug, #{msg => "force_socket_close", reason => Reason}),
?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}),
return(enqueue({close, Reason}, State));
handle_info({event, connected}, State = #state{channel = Channel}) ->
@ -550,7 +549,7 @@ check_limiter(Needs,
{ok, Limiter2} ->
WhenOk(Data, Msgs, State#state{limiter = Limiter2});
{pause, Time, Limiter2} ->
?SLOG(warning, #{msg => "pause time dueto rate limit",
?SLOG(warning, #{msg => "pause_time_due_to_rate_limit",
needs => Needs,
time_in_ms => Time}),
@ -586,7 +585,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
, limiter_timer = undefined
});
{pause, Time, Limiter2} ->
?SLOG(warning, #{msg => "pause time dueto rate limit",
?SLOG(warning, #{msg => "pause_time_due_to_rate_limit",
types => Types,
time_in_ms => Time}),
@ -663,7 +662,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
handle_incoming(Packet, State = #state{listener = {Type, Listener}})
when is_record(Packet, mqtt_packet) ->
?SLOG(debug, #{msg => "RECV", packet => emqx_packet:format(Packet)}),
?TRACE("WS-MQTT", "mqtt_packet_received", #{packet => Packet}),
ok = inc_incoming_stats(Packet),
NState = case emqx_pd:get_counter(incoming_pubs) >
get_active_n(Type, Listener) of
@ -727,7 +726,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
ok = emqx_metrics:inc('delivery.dropped.too_large'),
ok = emqx_metrics:inc('delivery.dropped'),
<<>>;
Data -> ?SLOG(debug, #{msg => "SEND", packet => Packet}),
Data -> ?TRACE("WS-MQTT", "mqtt_packet_sent", #{packet => Packet}),
ok = inc_outgoing_stats(Packet),
Data
catch

View File

@ -39,32 +39,29 @@ end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
init_per_testcase(t_trace_clientid, Config) ->
init(),
Config;
init_per_testcase(_Case, Config) ->
ok = emqx_logger:set_log_level(debug),
_ = [logger:remove_handler(Id) ||#{id := Id} <- emqx_trace_handler:running()],
init(),
Config.
end_per_testcase(_Case, _Config) ->
ok = emqx_logger:set_log_level(warning),
terminate(),
ok.
t_trace_clientid(_Config) ->
%% Start tracing
emqx_logger:set_log_level(error),
{error, _} = emqx_trace_handler:install(clientid, <<"client">>, debug, "tmp/client.log"),
emqx_logger:set_log_level(debug),
%% add list clientid
ok = emqx_trace_handler:install(clientid, "client", debug, "tmp/client.log"),
ok = emqx_trace_handler:install(clientid, <<"client2">>, all, "tmp/client2.log"),
ok = emqx_trace_handler:install(clientid, <<"client3">>, all, "tmp/client3.log"),
{error, {invalid_log_level, bad_level}} =
emqx_trace_handler:install(clientid, <<"client4">>, bad_level, "tmp/client4.log"),
ok = emqx_trace_handler:install("CLI-client1", clientid, "client", debug, "tmp/client.log"),
ok = emqx_trace_handler:install("CLI-client2", clientid, <<"client2">>, all, "tmp/client2.log"),
ok = emqx_trace_handler:install("CLI-client3", clientid, <<"client3">>, all, "tmp/client3.log"),
{error, {handler_not_added, {file_error, ".", eisdir}}} =
emqx_trace_handler:install(clientid, <<"client5">>, debug, "."),
ok = filesync(<<"client">>, clientid),
ok = filesync(<<"client2">>, clientid),
ok = filesync(<<"client3">>, clientid),
emqx_trace:check(),
ok = filesync(<<"CLI-client1">>, clientid),
ok = filesync(<<"CLI-client2">>, clientid),
ok = filesync(<<"CLI-client3">>, clientid),
%% Verify the tracing file exits
?assert(filelib:is_regular("tmp/client.log")),
@ -72,11 +69,11 @@ t_trace_clientid(_Config) ->
?assert(filelib:is_regular("tmp/client3.log")),
%% Get current traces
?assertMatch([#{type := clientid, filter := "client", name := <<"client">>,
?assertMatch([#{type := clientid, filter := <<"client">>, name := <<"CLI-client1">>,
level := debug, dst := "tmp/client.log"},
#{type := clientid, filter := "client2", name := <<"client2">>
#{type := clientid, filter := <<"client2">>, name := <<"CLI-client2">>
, level := debug, dst := "tmp/client2.log"},
#{type := clientid, filter := "client3", name := <<"client3">>,
#{type := clientid, filter := <<"client3">>, name := <<"CLI-client3">>,
level := debug, dst := "tmp/client3.log"}
], emqx_trace_handler:running()),
@ -85,9 +82,9 @@ t_trace_clientid(_Config) ->
emqtt:connect(T),
emqtt:publish(T, <<"a/b/c">>, <<"hi">>),
emqtt:ping(T),
ok = filesync(<<"client">>, clientid),
ok = filesync(<<"client2">>, clientid),
ok = filesync(<<"client3">>, clientid),
ok = filesync(<<"CLI-client1">>, clientid),
ok = filesync(<<"CLI-client2">>, clientid),
ok = filesync(<<"CLI-client3">>, clientid),
%% Verify messages are logged to "tmp/client.log" but not "tmp/client2.log".
{ok, Bin} = file:read_file("tmp/client.log"),
@ -98,25 +95,24 @@ t_trace_clientid(_Config) ->
?assert(filelib:file_size("tmp/client2.log") == 0),
%% Stop tracing
ok = emqx_trace_handler:uninstall(clientid, <<"client">>),
ok = emqx_trace_handler:uninstall(clientid, <<"client2">>),
ok = emqx_trace_handler:uninstall(clientid, <<"client3">>),
ok = emqx_trace_handler:uninstall(clientid, <<"CLI-client1">>),
ok = emqx_trace_handler:uninstall(clientid, <<"CLI-client2">>),
ok = emqx_trace_handler:uninstall(clientid, <<"CLI-client3">>),
emqtt:disconnect(T),
?assertEqual([], emqx_trace_handler:running()).
t_trace_clientid_utf8(_) ->
emqx_logger:set_log_level(debug),
Utf8Id = <<"client 漢字編碼"/utf8>>,
ok = emqx_trace_handler:install(clientid, Utf8Id, debug, "tmp/client-utf8.log"),
ok = emqx_trace_handler:install("CLI-UTF8", clientid, Utf8Id, debug, "tmp/client-utf8.log"),
emqx_trace:check(),
{ok, T} = emqtt:start_link([{clientid, Utf8Id}]),
emqtt:connect(T),
[begin emqtt:publish(T, <<"a/b/c">>, <<"hi">>) end|| _ <- lists:seq(1, 10)],
emqtt:ping(T),
ok = filesync(Utf8Id, clientid),
ok = emqx_trace_handler:uninstall(clientid, Utf8Id),
ok = filesync("CLI-UTF8", clientid),
ok = emqx_trace_handler:uninstall(clientid, "CLI-UTF8"),
emqtt:disconnect(T),
?assertEqual([], emqx_trace_handler:running()),
ok.
@ -126,11 +122,11 @@ t_trace_topic(_Config) ->
emqtt:connect(T),
%% Start tracing
emqx_logger:set_log_level(debug),
ok = emqx_trace_handler:install(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"),
ok = emqx_trace_handler:install(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"),
ok = filesync(<<"x/#">>, topic),
ok = filesync(<<"y/#">>, topic),
ok = emqx_trace_handler:install("CLI-TOPIC-1", topic, <<"x/#">>, all, "tmp/topic_trace_x.log"),
ok = emqx_trace_handler:install("CLI-TOPIC-2", topic, <<"y/#">>, all, "tmp/topic_trace_y.log"),
emqx_trace:check(),
ok = filesync("CLI-TOPIC-1", topic),
ok = filesync("CLI-TOPIC-2", topic),
%% Verify the tracing file exits
?assert(filelib:is_regular("tmp/topic_trace_x.log")),
@ -138,9 +134,9 @@ t_trace_topic(_Config) ->
%% Get current traces
?assertMatch([#{type := topic, filter := <<"x/#">>,
level := debug, dst := "tmp/topic_trace_x.log", name := <<"x/#">>},
level := debug, dst := "tmp/topic_trace_x.log", name := <<"CLI-TOPIC-1">>},
#{type := topic, filter := <<"y/#">>,
name := <<"y/#">>, level := debug, dst := "tmp/topic_trace_y.log"}
name := <<"CLI-TOPIC-2">>, level := debug, dst := "tmp/topic_trace_y.log"}
],
emqx_trace_handler:running()),
@ -149,8 +145,8 @@ t_trace_topic(_Config) ->
emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
emqtt:subscribe(T, <<"x/y/z">>),
emqtt:unsubscribe(T, <<"x/y/z">>),
ok = filesync(<<"x/#">>, topic),
ok = filesync(<<"y/#">>, topic),
ok = filesync("CLI-TOPIC-1", topic),
ok = filesync("CLI-TOPIC-2", topic),
{ok, Bin} = file:read_file("tmp/topic_trace_x.log"),
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
@ -161,8 +157,8 @@ t_trace_topic(_Config) ->
?assert(filelib:file_size("tmp/topic_trace_y.log") =:= 0),
%% Stop tracing
ok = emqx_trace_handler:uninstall(topic, <<"x/#">>),
ok = emqx_trace_handler:uninstall(topic, <<"y/#">>),
ok = emqx_trace_handler:uninstall(topic, <<"CLI-TOPIC-1">>),
ok = emqx_trace_handler:uninstall(topic, <<"CLI-TOPIC-2">>),
{error, _Reason} = emqx_trace_handler:uninstall(topic, <<"z/#">>),
?assertEqual([], emqx_trace_handler:running()),
emqtt:disconnect(T).
@ -172,10 +168,12 @@ t_trace_ip_address(_Config) ->
emqtt:connect(T),
%% Start tracing
ok = emqx_trace_handler:install(ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"),
ok = emqx_trace_handler:install(ip_address, "192.168.1.1", all, "tmp/ip_trace_y.log"),
ok = filesync(<<"127.0.0.1">>, ip_address),
ok = filesync(<<"192.168.1.1">>, ip_address),
ok = emqx_trace_handler:install("CLI-IP-1", ip_address, "127.0.0.1", all, "tmp/ip_trace_x.log"),
ok = emqx_trace_handler:install("CLI-IP-2", ip_address,
"192.168.1.1", all, "tmp/ip_trace_y.log"),
emqx_trace:check(),
ok = filesync(<<"CLI-IP-1">>, ip_address),
ok = filesync(<<"CLI-IP-2">>, ip_address),
%% Verify the tracing file exits
?assert(filelib:is_regular("tmp/ip_trace_x.log")),
@ -183,10 +181,10 @@ t_trace_ip_address(_Config) ->
%% Get current traces
?assertMatch([#{type := ip_address, filter := "127.0.0.1",
name := <<"127.0.0.1">>,
name := <<"CLI-IP-1">>,
level := debug, dst := "tmp/ip_trace_x.log"},
#{type := ip_address, filter := "192.168.1.1",
name := <<"192.168.1.1">>,
name := <<"CLI-IP-2">>,
level := debug, dst := "tmp/ip_trace_y.log"}
],
emqx_trace_handler:running()),
@ -196,8 +194,8 @@ t_trace_ip_address(_Config) ->
emqtt:publish(T, <<"x/y/z">>, <<"hi2">>),
emqtt:subscribe(T, <<"x/y/z">>),
emqtt:unsubscribe(T, <<"x/y/z">>),
ok = filesync(<<"127.0.0.1">>, ip_address),
ok = filesync(<<"192.168.1.1">>, ip_address),
ok = filesync(<<"CLI-IP-1">>, ip_address),
ok = filesync(<<"CLI-IP-2">>, ip_address),
{ok, Bin} = file:read_file("tmp/ip_trace_x.log"),
?assertNotEqual(nomatch, binary:match(Bin, [<<"hi1">>])),
@ -208,8 +206,8 @@ t_trace_ip_address(_Config) ->
?assert(filelib:file_size("tmp/ip_trace_y.log") =:= 0),
%% Stop tracing
ok = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.1">>),
ok = emqx_trace_handler:uninstall(ip_address, <<"192.168.1.1">>),
ok = emqx_trace_handler:uninstall(ip_address, <<"CLI-IP-1">>),
ok = emqx_trace_handler:uninstall(ip_address, <<"CLI-IP-2">>),
{error, _Reason} = emqx_trace_handler:uninstall(ip_address, <<"127.0.0.2">>),
emqtt:disconnect(T),
?assertEqual([], emqx_trace_handler:running()).
@ -221,7 +219,12 @@ filesync(Name, Type) ->
%% sometime the handler process is not started yet.
filesync(_Name, _Type, 0) -> ok;
filesync(Name, Type, Retry) ->
filesync(Name0, Type, Retry) ->
Name =
case is_binary(Name0) of
true -> Name0;
false -> list_to_binary(Name0)
end,
try
Handler = binary_to_atom(<<"trace_",
(atom_to_binary(Type))/binary, "_", Name/binary>>),
@ -231,3 +234,9 @@ filesync(Name, Type, Retry) ->
ct:sleep(100),
filesync(Name, Type, Retry - 1)
end.
init() ->
emqx_trace:start_link().
terminate() ->
catch ok = gen_server:stop(emqx_trace, normal, 5000).

View File

@ -67,7 +67,7 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_authn, emqx_dashboard]),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authn]),
ok.
set_special_configs(emqx_dashboard) ->

View File

@ -214,7 +214,7 @@ update(Type, Name, {OldConf, Conf}) ->
case recreate(Type, Name, Conf) of
{ok, _} -> maybe_disable_bridge(Type, Name, Conf);
{error, not_found} ->
?SLOG(warning, #{ msg => "updating a non-exist bridge, create a new one"
?SLOG(warning, #{ msg => "updating_a_non-exist_bridge_need_create_a_new_one"
, type => Type, name => Name, config => Conf}),
create(Type, Name, Conf);
{error, Reason} -> {update_bridge_failed, Reason}
@ -242,7 +242,7 @@ create_dry_run(Type, Conf) ->
end.
remove(Type, Name, _Conf) ->
?SLOG(info, #{msg => "remove bridge", type => Type, name => Name}),
?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}),
case emqx_resource:remove_local(resource_id(Type, Name)) of
ok -> ok;
{error, not_found} -> ok;

View File

@ -236,7 +236,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) ->
false -> RetryMs
end;
{aborted, Reason} ->
?SLOG(error, #{msg => "read_next_mfa transaction failed", error => Reason}),
?SLOG(error, #{msg => "read_next_mfa_transaction_failed", error => Reason}),
RetryMs
end.
@ -248,7 +248,7 @@ read_next_mfa(Node) ->
TnxId = max(LatestId - 1, 0),
commit(Node, TnxId),
?SLOG(notice, #{
msg => "New node first catch up and start commit.",
msg => "new_node_first_catch_up_and_start_commit.",
node => Node, tnx_id => TnxId}),
TnxId;
[#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1
@ -277,7 +277,7 @@ do_catch_up(ToTnxId, Node) ->
io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)",
[Node, LastAppliedId, ToTnxId])),
?SLOG(error, #{
msg => "catch up failed!",
msg => "catch_up_failed!",
last_applied_id => LastAppliedId,
to_tnx_id => ToTnxId
}),

View File

@ -144,7 +144,7 @@ multicall(M, F, Args) ->
{retry, TnxId, Res, Nodes} ->
%% The init MFA return ok, but other nodes failed.
%% We return ok and alert an alarm.
?SLOG(error, #{msg => "failed to update config in cluster", nodes => Nodes,
?SLOG(error, #{msg => "failed_to_update_config_in_cluster", nodes => Nodes,
tnx_id => TnxId, mfa => {M, F, Args}}),
Res;
{error, Error} -> %% all MFA return not ok or {ok, term()}.

View File

@ -792,16 +792,7 @@ do_formatter(json, CharsLimit, SingleLine, TimeOffSet, Depth) ->
}};
do_formatter(text, CharsLimit, SingleLine, TimeOffSet, Depth) ->
{emqx_logger_textfmt,
#{template =>
[time," [",level,"] ",
{clientid,
[{peername,
[clientid,"@",peername," "],
[clientid, " "]}],
[{peername,
[peername," "],
[]}]},
msg,"\n"],
#{template => [time," [",level,"] ", msg,"\n"],
chars_limit => CharsLimit,
single_line => SingleLine,
time_offset => TimeOffSet,

View File

@ -74,9 +74,19 @@ t_base_test(_Config) ->
?assertEqual(node(), maps:get(initiator, Query)),
?assert(maps:is_key(created_at, Query)),
?assertEqual(ok, receive_msg(3, test)),
?assertEqual({ok, 2, ok}, emqx_cluster_rpc:multicall(M, F, A)),
{atomic, Status} = emqx_cluster_rpc:status(),
?assertEqual(3, length(Status)),
?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 1 end, Status)),
case length(Status) =:= 3 of
true -> ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Status));
false ->
%% wait for mnesia to write in.
ct:sleep(42),
{atomic, Status1} = emqx_cluster_rpc:status(),
ct:pal("status: ~p", Status),
ct:pal("status1: ~p", Status1),
?assertEqual(3, length(Status1)),
?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Status))
end,
ok.
t_commit_fail_test(_Config) ->

View File

@ -143,7 +143,7 @@ on_start(InstId, #{base_url := #{scheme := Scheme,
retry_interval := RetryInterval,
pool_type := PoolType,
pool_size := PoolSize} = Config) ->
?SLOG(info, #{msg => "starting http connector",
?SLOG(info, #{msg => "starting_http_connector",
connector => InstId, config => Config}),
{Transport, TransportOpts} = case Scheme of
http ->
@ -181,13 +181,13 @@ on_start(InstId, #{base_url := #{scheme := Scheme,
end.
on_stop(InstId, #{pool_name := PoolName}) ->
?SLOG(info, #{msg => "stopping http connector",
?SLOG(info, #{msg => "stopping_http_connector",
connector => InstId}),
ehttpc_sup:stop_pool(PoolName).
on_query(InstId, {send_message, Msg}, AfterQuery, State) ->
case maps:get(request, State, undefined) of
undefined -> ?SLOG(error, #{msg => "request not found", connector => InstId});
undefined -> ?SLOG(error, #{msg => "request_not_found", connector => InstId});
Request ->
#{method := Method, path := Path, body := Body, headers := Headers,
request_timeout := Timeout} = process_request(Request, Msg),
@ -199,16 +199,15 @@ on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State);
on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery,
#{pool_name := PoolName, base_path := BasePath} = State) ->
?SLOG(debug, #{msg => "http connector received request",
request => Request, connector => InstId,
state => State}),
?TRACE("QUERY", "http_connector_received",
#{request => Request, connector => InstId, state => State}),
NRequest = update_path(BasePath, Request),
case Result = ehttpc:request(case KeyOrNum of
undefined -> PoolName;
_ -> {PoolName, KeyOrNum}
end, Method, NRequest, Timeout) of
{error, Reason} ->
?SLOG(error, #{msg => "http connector do reqeust failed",
?SLOG(error, #{msg => "http_connector_do_reqeust_failed",
request => NRequest, reason => Reason,
connector => InstId}),
emqx_resource:query_failed(AfterQuery);

View File

@ -55,7 +55,7 @@ on_start(InstId, #{servers := Servers0,
pool_size := PoolSize,
auto_reconnect := AutoReconn,
ssl := SSL} = Config) ->
?SLOG(info, #{msg => "starting ldap connector",
?SLOG(info, #{msg => "starting_ldap_connector",
connector => InstId, config => Config}),
Servers = [begin proplists:get_value(host, S) end || S <- Servers0],
SslOpts = case maps:get(enable, SSL) of
@ -81,23 +81,21 @@ on_start(InstId, #{servers := Servers0,
{ok, #{poolname => PoolName}}.
on_stop(InstId, #{poolname := PoolName}) ->
?SLOG(info, #{msg => "stopping ldap connector",
?SLOG(info, #{msg => "stopping_ldap_connector",
connector => InstId}),
emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) ->
Request = {Base, Filter, Attributes},
?SLOG(debug, #{msg => "ldap connector received request",
request => Request, connector => InstId,
state => State}),
?TRACE("QUERY", "ldap_connector_received",
#{request => Request, connector => InstId, state => State}),
case Result = ecpool:pick_and_do(
PoolName,
{?MODULE, search, [Base, Filter, Attributes]},
no_handover) of
{error, Reason} ->
?SLOG(error, #{msg => "ldap connector do request failed",
request => Request, connector => InstId,
reason => Reason}),
?SLOG(error, #{msg => "ldap_connector_do_request_failed",
request => Request, connector => InstId, reason => Reason}),
emqx_resource:query_failed(AfterQuery);
_ ->
emqx_resource:query_success(AfterQuery)

View File

@ -128,7 +128,7 @@ on_start(InstId, Config = #{mongo_type := Type,
{ok, #{poolname => PoolName, type => Type}}.
on_stop(InstId, #{poolname := PoolName}) ->
?SLOG(info, #{msg => "stopping mongodb connector",
?SLOG(info, #{msg => "stopping_mongodb_connector",
connector => InstId}),
emqx_plugin_libs_pool:stop_pool(PoolName).
@ -137,14 +137,13 @@ on_query(InstId,
AfterQuery,
#{poolname := PoolName} = State) ->
Request = {Action, Collection, Selector, Docs},
?SLOG(debug, #{msg => "mongodb connector received request",
request => Request, connector => InstId,
state => State}),
?TRACE("QUERY", "mongodb_connector_received",
#{request => Request, connector => InstId, state => State}),
case ecpool:pick_and_do(PoolName,
{?MODULE, mongo_query, [Action, Collection, Selector, Docs]},
no_handover) of
{error, Reason} ->
?SLOG(error, #{msg => "mongodb connector do query failed",
?SLOG(error, #{msg => "mongodb_connector_do_query_failed",
request => Request, reason => Reason,
connector => InstId}),
emqx_resource:query_failed(AfterQuery),

View File

@ -118,7 +118,7 @@ on_message_received(Msg, HookPoint) ->
%% ===================================================================
on_start(InstId, Conf) ->
InstanceId = binary_to_atom(InstId, utf8),
?SLOG(info, #{msg => "starting mqtt connector",
?SLOG(info, #{msg => "starting_mqtt_connector",
connector => InstanceId, config => Conf}),
BasicConf = basic_config(Conf),
BridgeConf = BasicConf#{
@ -139,19 +139,18 @@ on_start(InstId, Conf) ->
end.
on_stop(_InstId, #{name := InstanceId}) ->
?SLOG(info, #{msg => "stopping mqtt connector",
?SLOG(info, #{msg => "stopping_mqtt_connector",
connector => InstanceId}),
case ?MODULE:drop_bridge(InstanceId) of
ok -> ok;
{error, not_found} -> ok;
{error, Reason} ->
?SLOG(error, #{msg => "stop mqtt connector",
?SLOG(error, #{msg => "stop_mqtt_connector",
connector => InstanceId, reason => Reason})
end.
on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) ->
?SLOG(debug, #{msg => "send msg to remote node", message => Msg,
connector => InstanceId}),
?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
emqx_resource:query_success(AfterQuery).

View File

@ -56,7 +56,7 @@ on_start(InstId, #{server := {Host, Port},
auto_reconnect := AutoReconn,
pool_size := PoolSize,
ssl := SSL } = Config) ->
?SLOG(info, #{msg => "starting mysql connector",
?SLOG(info, #{msg => "starting_mysql_connector",
connector => InstId, config => Config}),
SslOpts = case maps:get(enable, SSL) of
true ->
@ -76,7 +76,7 @@ on_start(InstId, #{server := {Host, Port},
{ok, #{poolname => PoolName}}.
on_stop(InstId, #{poolname := PoolName}) ->
?SLOG(info, #{msg => "stopping mysql connector",
?SLOG(info, #{msg => "stopping_mysql_connector",
connector => InstId}),
emqx_plugin_libs_pool:stop_pool(PoolName).
@ -85,14 +85,13 @@ on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) ->
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _PoolName} = State) ->
on_query(InstId, {sql, SQL, Params, default_timeout}, AfterQuery, State);
on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) ->
?SLOG(debug, #{msg => "mysql connector received sql query",
connector => InstId, sql => SQL, state => State}),
?TRACE("QUERY", "mysql_connector_received", #{connector => InstId, sql => SQL, state => State}),
case Result = ecpool:pick_and_do(
PoolName,
{mysql, query, [SQL, Params, Timeout]},
no_handover) of
{error, Reason} ->
?SLOG(error, #{msg => "mysql connector do sql query failed",
?SLOG(error, #{msg => "mysql_connector_do_sql_query_failed",
connector => InstId, sql => SQL, reason => Reason}),
emqx_resource:query_failed(AfterQuery);
_ ->

View File

@ -58,7 +58,7 @@ on_start(InstId, #{server := {Host, Port},
auto_reconnect := AutoReconn,
pool_size := PoolSize,
ssl := SSL } = Config) ->
?SLOG(info, #{msg => "starting postgresql connector",
?SLOG(info, #{msg => "starting_postgresql_connector",
connector => InstId, config => Config}),
SslOpts = case maps:get(enable, SSL) of
true ->
@ -88,12 +88,12 @@ on_query(InstId, QueryParams, AfterQuery, #{poolname := PoolName} = State) ->
{prepared_query, Name, SQL} -> {prepared_query, [Name, SQL, []]};
{prepared_query, Name, SQL, Params} -> {prepared_query, [Name, SQL, Params]}
end,
?SLOG(debug, #{msg => "postgresql connector received sql query",
connector => InstId, command => Command, args => Args, state => State}),
?TRACE("QUERY", "postgresql_connector_received",
#{connector => InstId, command => Command, args => Args, state => State}),
case Result = ecpool:pick_and_do(PoolName, {?MODULE, Command, Args}, no_handover) of
{error, Reason} ->
?SLOG(error, #{
msg => "postgresql connector do sql query failed",
msg => "postgresql_connector_do_sql_query_failed",
connector => InstId, sql => SQL, reason => Reason}),
emqx_resource:query_failed(AfterQuery);
_ ->

View File

@ -94,7 +94,7 @@ on_start(InstId, #{redis_type := Type,
pool_size := PoolSize,
auto_reconnect := AutoReconn,
ssl := SSL } = Config) ->
?SLOG(info, #{msg => "starting redis connector",
?SLOG(info, #{msg => "starting_redis_connector",
connector => InstId, config => Config}),
Servers = case Type of
single -> [{servers, [maps:get(server, Config)]}];
@ -127,20 +127,20 @@ on_start(InstId, #{redis_type := Type,
{ok, #{poolname => PoolName, type => Type}}.
on_stop(InstId, #{poolname := PoolName}) ->
?SLOG(info, #{msg => "stopping redis connector",
?SLOG(info, #{msg => "stopping_redis_connector",
connector => InstId}),
emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) ->
?SLOG(debug, #{msg => "redis connector received cmd query",
connector => InstId, sql => Command, state => State}),
?TRACE("QUERY", "redis_connector_received",
#{connector => InstId, sql => Command, state => State}),
Result = case Type of
cluster -> eredis_cluster:q(PoolName, Command);
_ -> ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover)
end,
case Result of
{error, Reason} ->
?SLOG(error, #{msg => "redis connector do cmd query failed",
?SLOG(error, #{msg => "redis_connector_do_cmd_query_failed",
connector => InstId, sql => Command, reason => Reason}),
emqx_resource:query_failed(AfterCommand);
_ ->

View File

@ -158,15 +158,15 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, Parent)
RC =:= ?RC_NO_MATCHING_SUBSCRIBERS ->
Parent ! {batch_ack, PktId}, ok;
handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) ->
?SLOG(warning, #{msg => "publish to remote node falied",
?SLOG(warning, #{msg => "publish_to_remote_node_falied",
packet_id => PktId, reason_code => RC}).
handle_publish(Msg, undefined) ->
?SLOG(error, #{msg => "cannot publish to local broker as"
" 'ingress' is not configured",
?SLOG(error, #{msg => "cannot_publish_to_local_broker_as"
"_'ingress'_is_not_configured",
message => Msg});
handle_publish(Msg, Vars) ->
?SLOG(debug, #{msg => "publish to local broker",
?SLOG(debug, #{msg => "publish_to_local_broker",
message => Msg, vars => Vars}),
emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1),
case Vars of

View File

@ -188,7 +188,7 @@ callback_mode() -> [state_functions].
%% @doc Config should be a map().
init(#{name := Name} = ConnectOpts) ->
?SLOG(debug, #{msg => "starting bridge worker",
?SLOG(debug, #{msg => "starting_bridge_worker",
name => Name}),
erlang:process_flag(trap_exit, true),
Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})),
@ -335,7 +335,7 @@ common(_StateName, cast, {send_to_remote, Msg}, #{replayq := Q} = State) ->
NewQ = replayq:append(Q, [Msg]),
{keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}};
common(StateName, Type, Content, #{name := Name} = State) ->
?SLOG(notice, #{msg => "Bridge discarded event",
?SLOG(notice, #{msg => "bridge_discarded_event",
name => Name, type => Type, state_name => StateName,
content => Content}),
{keep_state, State}.
@ -349,7 +349,7 @@ do_connect(#{connect_opts := ConnectOpts,
{ok, State#{connection => Conn}};
{error, Reason} ->
ConnectOpts1 = obfuscate(ConnectOpts),
?SLOG(error, #{msg => "Failed to connect",
?SLOG(error, #{msg => "failed_to_connect",
config => ConnectOpts1, reason => Reason}),
{error, Reason, State}
end.
@ -386,8 +386,8 @@ pop_and_send_loop(#{replayq := Q} = State, N) ->
end.
do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) ->
?SLOG(error, #{msg => "cannot forward messages to remote broker"
" as 'egress' is not configured",
?SLOG(error, #{msg => "cannot_forward_messages_to_remote_broker"
"_as_'egress'_is_not_configured",
messages => Msg});
do_send(#{inflight := Inflight,
connection := Connection,
@ -398,7 +398,7 @@ do_send(#{inflight := Inflight,
emqx_metrics:inc('bridge.mqtt.message_sent_to_remote'),
emqx_connector_mqtt_msg:to_remote_msg(Message, Vars)
end,
?SLOG(debug, #{msg => "publish to remote broker",
?SLOG(debug, #{msg => "publish_to_remote_broker",
message => Msg, vars => Vars}),
case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(Msg)]) of
{ok, Refs} ->

View File

@ -312,6 +312,9 @@ responses(Responses, Module) ->
response(Status, Bin, {Acc, RefsAcc, Module}) when is_binary(Bin) ->
{Acc#{integer_to_binary(Status) => #{description => Bin}}, RefsAcc, Module};
%% Support swagger raw object(file download).
response(Status, #{content := _} = Content, {Acc, RefsAcc, Module}) ->
{Acc#{integer_to_binary(Status) => Content}, RefsAcc, Module};
response(Status, ?REF(StructName), {Acc, RefsAcc, Module}) ->
response(Status, ?R_REF(Module, StructName), {Acc, RefsAcc, Module});
response(Status, ?R_REF(_Mod, _Name) = RRef, {Acc, RefsAcc, Module}) ->

View File

@ -149,7 +149,7 @@ api_key(post, #{body := App}) ->
Desc = unicode:characters_to_binary(Desc0, unicode),
case emqx_mgmt_auth:create(Name, Enable, ExpiredAt, Desc) of
{ok, NewApp} -> {200, format(NewApp)};
{error, Reason} -> {400, Reason}
{error, Reason} -> {400, io_lib:format("~p", [Reason])}
end.
api_key_by_name(get, #{bindings := #{name := Name}}) ->

View File

@ -107,9 +107,14 @@ schema("/trace/:name/download") ->
get => #{
description => "Download trace log by name",
parameters => [hoconsc:ref(name)],
%% todo zip file octet-stream
responses => #{
200 => <<"TODO octet-stream">>
200 =>
#{description => "A trace zip file",
content => #{
'application/octet-stream' =>
#{schema => #{type => "string", format => "binary"}}
}
}
}
}
};
@ -124,9 +129,12 @@ schema("/trace/:name/log") ->
hoconsc:ref(position),
hoconsc:ref(node)
],
%% todo response data
responses => #{
200 => <<"TODO">>
200 =>
[
{items, hoconsc:mk(binary(), #{example => "TEXT-LOG-ITEMS"})}
| fields(bytes) ++ fields(position)
]
}
}
}.
@ -209,6 +217,7 @@ fields(position) ->
default => 0
})}].
-define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$").
validate_name(Name) ->
@ -296,7 +305,12 @@ download_trace_log(get, #{bindings := #{name := Name}}) ->
ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip",
{ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]),
emqx_trace:delete_files_after_send(ZipFileName, Zips),
{200, ZipFile};
Headers = #{
<<"content-type">> => <<"application/x-zip">>,
<<"content-disposition">> =>
iolist_to_binary("attachment; filename=" ++ filename:basename(ZipFile))
},
{200, Headers, {file, ZipFile}};
{error, not_found} -> ?NOT_FOUND(Name)
end.
@ -324,11 +338,10 @@ cluster_call(Mod, Fun, Args, Timeout) ->
BadNodes =/= [] andalso ?LOG(error, "rpc call failed on ~p ~p", [BadNodes, {Mod, Fun, Args}]),
GoodRes.
stream_log_file(get, #{bindings := #{name := Name}, query_string := Query} = T) ->
stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
Node0 = maps:get(<<"node">>, Query, atom_to_binary(node())),
Position = maps:get(<<"position">>, Query, 0),
Bytes = maps:get(<<"bytes">>, Query, 1000),
logger:error("~p", [T]),
case to_node(Node0) of
{ok, Node} ->
case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of

View File

@ -18,6 +18,7 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-include("emqx_mgmt.hrl").
@ -386,18 +387,20 @@ trace(["list"]) ->
emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Filter, Level, Dst])
end, emqx_trace_handler:running());
trace(["stop", Operation, ClientId]) ->
case trace_type(Operation) of
{ok, Type} -> trace_off(Type, ClientId);
trace(["stop", Operation, Filter0]) ->
case trace_type(Operation, Filter0) of
{ok, Type, Filter} -> trace_off(Type, Filter);
error -> trace([])
end;
trace(["start", Operation, ClientId, LogFile]) ->
trace(["start", Operation, ClientId, LogFile, "all"]);
trace(["start", Operation, ClientId, LogFile, Level]) ->
case trace_type(Operation) of
{ok, Type} -> trace_on(Type, ClientId, list_to_existing_atom(Level), LogFile);
trace(["start", Operation, Filter0, LogFile, Level]) ->
case trace_type(Operation, Filter0) of
{ok, Type, Filter} ->
trace_on(name(Filter0), Type, Filter,
list_to_existing_atom(Level), LogFile);
error -> trace([])
end;
@ -417,20 +420,23 @@ trace(_) ->
"Stop tracing for a client ip on local node"}
]).
trace_on(Who, Name, Level, LogFile) ->
case emqx_trace_handler:install(Who, Name, Level, LogFile) of
trace_on(Name, Type, Filter, Level, LogFile) ->
case emqx_trace_handler:install(Name, Type, Filter, Level, LogFile) of
ok ->
emqx_ctl:print("trace ~s ~s successfully~n", [Who, Name]);
emqx_trace:check(),
emqx_ctl:print("trace ~s ~s successfully~n", [Filter, Name]);
{error, Error} ->
emqx_ctl:print("[error] trace ~s ~s: ~p~n", [Who, Name, Error])
emqx_ctl:print("[error] trace ~s ~s: ~p~n", [Filter, Name, Error])
end.
trace_off(Who, Name) ->
case emqx_trace_handler:uninstall(Who, Name) of
trace_off(Type, Filter) ->
?TRACE("CLI", "trace_stopping", #{Type => Filter}),
case emqx_trace_handler:uninstall(Type, name(Filter)) of
ok ->
emqx_ctl:print("stop tracing ~s ~s successfully~n", [Who, Name]);
emqx_trace:check(),
emqx_ctl:print("stop tracing ~s ~s successfully~n", [Type, Filter]);
{error, Error} ->
emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Who, Name, Error])
emqx_ctl:print("[error] stop tracing ~s ~s: ~p~n", [Type, Filter, Error])
end.
%%--------------------------------------------------------------------
@ -459,9 +465,9 @@ traces(["delete", Name]) ->
traces(["start", Name, Operation, Filter]) ->
traces(["start", Name, Operation, Filter, "900"]);
traces(["start", Name, Operation, Filter, DurationS]) ->
case trace_type(Operation) of
{ok, Type} -> trace_cluster_on(Name, Type, Filter, DurationS);
traces(["start", Name, Operation, Filter0, DurationS]) ->
case trace_type(Operation, Filter0) of
{ok, Type, Filter} -> trace_cluster_on(Name, Type, Filter, DurationS);
error -> traces([])
end;
@ -503,10 +509,10 @@ trace_cluster_off(Name) ->
{error, Error} -> emqx_ctl:print("[error] Stop cluster_trace ~s: ~p~n", [Name, Error])
end.
trace_type("client") -> {ok, clientid};
trace_type("topic") -> {ok, topic};
trace_type("ip_address") -> {ok, ip_address};
trace_type(_) -> error.
trace_type("client", ClientId) -> {ok, clientid, list_to_binary(ClientId)};
trace_type("topic", Topic) -> {ok, topic, list_to_binary(Topic)};
trace_type("ip_address", IP) -> {ok, ip_address, IP};
trace_type(_, _) -> error.
%%--------------------------------------------------------------------
%% @doc Listeners Command
@ -716,3 +722,6 @@ format_listen_on({Addr, Port}) when is_list(Addr) ->
io_lib:format("~ts:~w", [Addr, Port]);
format_listen_on({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]).
name(Filter) ->
iolist_to_binary(["CLI-", Filter]).

View File

@ -85,7 +85,7 @@ republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}},
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected, 0),
Retain = replace_simple_var(RetainTks, Selected, false),
?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}),
?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}),
safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload);
%% in case this is a "$events/" event
@ -99,7 +99,7 @@ republish(Selected, #{metadata := #{rule_id := RuleId}},
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected, 0),
Retain = replace_simple_var(RetainTks, Selected, false),
?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}),
?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}),
safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload).
%%--------------------------------------------------------------------

View File

@ -248,7 +248,7 @@ handle_output(OutId, Selected, Envs) ->
end.
do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
?SLOG(debug, #{msg => "output to bridge", bridge_id => BridgeId}),
?TRACE("BRIDGE", "output_to_bridge", #{bridge_id => BridgeId}),
emqx_bridge:send_message(BridgeId, Selected);
do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
Mod:Func(Selected, Envs, Args).

View File

@ -77,7 +77,7 @@ flatten([D1 | L]) when is_list(D1) ->
D1 ++ flatten(L).
echo_action(Data, Envs) ->
?SLOG(debug, #{msg => "testing_rule_sql_ok", data => Data, envs => Envs}),
?TRACE("RULE", "testing_rule_sql_ok", #{data => Data, envs => Envs}),
Data.
fill_default_values(Event, Context) ->

View File

@ -55,7 +55,7 @@
, {mria, {git, "https://github.com/emqx/mria", {tag, "0.1.5"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.7"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.9"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
, {replayq, "0.3.3"}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}