feat(trace): struct log for trace
This commit is contained in:
parent
668180388c
commit
4b6bba11eb
|
@ -69,7 +69,7 @@
|
|||
ok
|
||||
end).
|
||||
|
||||
-define(TRACE(Action, Meta, Msg), emqx_trace:log(Action, Meta, Msg)).
|
||||
-define(TRACE(Event, Msg, Meta), emqx_trace:log(Event, Msg, Meta)).
|
||||
|
||||
%% print to 'user' group leader
|
||||
-define(ULOG(Fmt, Args), io:format(user, Fmt, Args)).
|
||||
|
|
|
@ -205,7 +205,7 @@ publish(Msg) when is_record(Msg, message) ->
|
|||
emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
|
||||
case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
|
||||
#message{headers = #{allow_publish := false}} ->
|
||||
?TRACE("NotAllow", #{payload => emqx_message:to_log_map(Msg)}, "message_not_published"),
|
||||
?TRACE("MQTT", "msg_publish_not_allowed", #{message => emqx_message:to_log_map(Msg)}),
|
||||
[];
|
||||
Msg1 = #message{topic = Topic} ->
|
||||
emqx_persistent_session:persist_message(Msg1),
|
||||
|
|
|
@ -292,7 +292,7 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
|
|||
fun check_banned/2
|
||||
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
||||
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
|
||||
?TRACE("RECV", #{}, Packet),
|
||||
?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
|
||||
NChannel1 = NChannel#channel{
|
||||
will_msg = emqx_packet:will_msg(NConnPkt),
|
||||
alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
|
||||
|
|
|
@ -526,7 +526,7 @@ handle_msg({connack, ConnAck}, State) ->
|
|||
handle_outgoing(ConnAck, State);
|
||||
|
||||
handle_msg({close, Reason}, State) ->
|
||||
?TRACE("CLOSE", #{reason => Reason}, "force_socket_close"),
|
||||
?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}),
|
||||
handle_info({sock_closed, Reason}, close_socket(State));
|
||||
|
||||
handle_msg({event, connected}, State = #state{channel = Channel}) ->
|
||||
|
@ -565,7 +565,7 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport,
|
|||
emqx_congestion:cancel_alarms(Socket, Transport, Channel1),
|
||||
emqx_channel:terminate(Reason, Channel1),
|
||||
close_socket_ok(State),
|
||||
?TRACE("TERMINATE", #{reason => Reason}, "terminated")
|
||||
?TRACE("SOCKET", "tcp_socket_terminated", #{reason => Reason})
|
||||
catch
|
||||
E : C : S ->
|
||||
?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S})
|
||||
|
@ -715,7 +715,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
|||
|
||||
handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
|
||||
ok = inc_incoming_stats(Packet),
|
||||
?TRACE("RECV", #{}, Packet),
|
||||
?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
|
||||
with_channel(handle_in, [Packet], State);
|
||||
|
||||
handle_incoming(FrameError, State) ->
|
||||
|
@ -754,13 +754,13 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
|||
<<>> -> ?SLOG(warning, #{
|
||||
msg => "packet_is_discarded",
|
||||
reason => "frame_is_too_large",
|
||||
packet => emqx_packet:format(Packet)
|
||||
packet => emqx_packet:format(Packet, null)
|
||||
}),
|
||||
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
||||
ok = emqx_metrics:inc('delivery.dropped'),
|
||||
<<>>;
|
||||
Data ->
|
||||
?TRACE("SEND", #{}, Packet),
|
||||
?TRACE("MQTT", "mqtt_packet_sent", #{packet => Packet}),
|
||||
ok = inc_outgoing_stats(Packet),
|
||||
Data
|
||||
catch
|
||||
|
|
|
@ -44,7 +44,9 @@
|
|||
, will_msg/1
|
||||
]).
|
||||
|
||||
-export([format/1]).
|
||||
-export([ format/1
|
||||
, format/2
|
||||
]).
|
||||
|
||||
-define(TYPE_NAMES,
|
||||
{ 'CONNECT'
|
||||
|
@ -435,25 +437,28 @@ will_msg(#mqtt_packet_connect{clientid = ClientId,
|
|||
|
||||
%% @doc Format packet
|
||||
-spec(format(emqx_types:packet()) -> iolist()).
|
||||
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) ->
|
||||
format_header(Header, format_variable(Variable, Payload)).
|
||||
format(Packet) -> format(Packet, emqx_trace_handler:payload_encode()).
|
||||
|
||||
%% @doc Format packet
|
||||
-spec(format(emqx_types:packet(), hex | text | null) -> iolist()).
|
||||
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, PayloadEncode) ->
|
||||
HeaderIO = format_header(Header),
|
||||
case format_variable(Variable, Payload, PayloadEncode) of
|
||||
"" -> HeaderIO;
|
||||
VarIO -> [HeaderIO,",", VarIO]
|
||||
end.
|
||||
|
||||
format_header(#mqtt_packet_header{type = Type,
|
||||
dup = Dup,
|
||||
qos = QoS,
|
||||
retain = Retain}, S) ->
|
||||
S1 = case S == undefined of
|
||||
true -> <<>>;
|
||||
false -> [", ", S]
|
||||
end,
|
||||
io_lib:format("~ts(Q~p, R~p, D~p~ts)", [type_name(Type), QoS, i(Retain), i(Dup), S1]).
|
||||
retain = Retain}) ->
|
||||
io_lib:format("~ts(Q~p, R~p, D~p)", [type_name(Type), QoS, i(Retain), i(Dup)]).
|
||||
|
||||
format_variable(undefined, _) ->
|
||||
undefined;
|
||||
format_variable(Variable, undefined) ->
|
||||
format_variable(Variable);
|
||||
format_variable(Variable, Payload) ->
|
||||
io_lib:format("~ts, Payload=~ts", [format_variable(Variable), Payload]).
|
||||
format_variable(undefined, _, _) -> "";
|
||||
format_variable(Variable, undefined, PayloadEncode) ->
|
||||
format_variable(Variable, PayloadEncode);
|
||||
format_variable(Variable, Payload, PayloadEncode) ->
|
||||
[format_variable(Variable, PayloadEncode), format_payload(Payload, PayloadEncode)].
|
||||
|
||||
format_variable(#mqtt_packet_connect{
|
||||
proto_ver = ProtoVer,
|
||||
|
@ -467,55 +472,63 @@ format_variable(#mqtt_packet_connect{
|
|||
will_topic = WillTopic,
|
||||
will_payload = WillPayload,
|
||||
username = Username,
|
||||
password = Password}) ->
|
||||
Format = "ClientId=~ts, ProtoName=~ts, ProtoVsn=~p, CleanStart=~ts, KeepAlive=~p, Username=~ts, Password=~ts",
|
||||
Args = [ClientId, ProtoName, ProtoVer, CleanStart, KeepAlive, Username, format_password(Password)],
|
||||
{Format1, Args1} = if
|
||||
WillFlag -> {Format ++ ", Will(Q~p, R~p, Topic=~ts, Payload=~0p)",
|
||||
Args ++ [WillQoS, i(WillRetain), WillTopic, WillPayload]};
|
||||
true -> {Format, Args}
|
||||
end,
|
||||
io_lib:format(Format1, Args1);
|
||||
password = Password},
|
||||
PayloadEncode) ->
|
||||
Base = io_lib:format(
|
||||
"ClientId=~ts, ProtoName=~ts, ProtoVsn=~p, CleanStart=~ts, KeepAlive=~p, Username=~ts, Password=~ts",
|
||||
[ClientId, ProtoName, ProtoVer, CleanStart, KeepAlive, Username, format_password(Password)]),
|
||||
case WillFlag of
|
||||
true ->
|
||||
[Base, io_lib:format(", Will(Q~p, R~p, Topic=~ts ",
|
||||
[WillQoS, i(WillRetain), WillTopic]),
|
||||
format_payload(WillPayload, PayloadEncode), ")"];
|
||||
false ->
|
||||
Base
|
||||
end;
|
||||
|
||||
format_variable(#mqtt_packet_disconnect
|
||||
{reason_code = ReasonCode}) ->
|
||||
{reason_code = ReasonCode}, _) ->
|
||||
io_lib:format("ReasonCode=~p", [ReasonCode]);
|
||||
|
||||
format_variable(#mqtt_packet_connack{ack_flags = AckFlags,
|
||||
reason_code = ReasonCode}) ->
|
||||
reason_code = ReasonCode}, _) ->
|
||||
io_lib:format("AckFlags=~p, ReasonCode=~p", [AckFlags, ReasonCode]);
|
||||
|
||||
format_variable(#mqtt_packet_publish{topic_name = TopicName,
|
||||
packet_id = PacketId}) ->
|
||||
packet_id = PacketId}, _) ->
|
||||
io_lib:format("Topic=~ts, PacketId=~p", [TopicName, PacketId]);
|
||||
|
||||
format_variable(#mqtt_packet_puback{packet_id = PacketId,
|
||||
reason_code = ReasonCode}) ->
|
||||
reason_code = ReasonCode}, _) ->
|
||||
io_lib:format("PacketId=~p, ReasonCode=~p", [PacketId, ReasonCode]);
|
||||
|
||||
format_variable(#mqtt_packet_subscribe{packet_id = PacketId,
|
||||
topic_filters = TopicFilters}) ->
|
||||
topic_filters = TopicFilters}, _) ->
|
||||
io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, TopicFilters]);
|
||||
|
||||
format_variable(#mqtt_packet_unsubscribe{packet_id = PacketId,
|
||||
topic_filters = Topics}) ->
|
||||
topic_filters = Topics}, _) ->
|
||||
io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, Topics]);
|
||||
|
||||
format_variable(#mqtt_packet_suback{packet_id = PacketId,
|
||||
reason_codes = ReasonCodes}) ->
|
||||
reason_codes = ReasonCodes}, _) ->
|
||||
io_lib:format("PacketId=~p, ReasonCodes=~p", [PacketId, ReasonCodes]);
|
||||
|
||||
format_variable(#mqtt_packet_unsuback{packet_id = PacketId}) ->
|
||||
format_variable(#mqtt_packet_unsuback{packet_id = PacketId}, _) ->
|
||||
io_lib:format("PacketId=~p", [PacketId]);
|
||||
|
||||
format_variable(#mqtt_packet_auth{reason_code = ReasonCode}) ->
|
||||
format_variable(#mqtt_packet_auth{reason_code = ReasonCode}, _) ->
|
||||
io_lib:format("ReasonCode=~p", [ReasonCode]);
|
||||
|
||||
format_variable(PacketId) when is_integer(PacketId) ->
|
||||
format_variable(PacketId, _) when is_integer(PacketId) ->
|
||||
io_lib:format("PacketId=~p", [PacketId]).
|
||||
|
||||
format_password(undefined) -> undefined;
|
||||
format_password(_Password) -> '******'.
|
||||
format_password(undefined) -> "undefined";
|
||||
format_password(_Password) -> "******".
|
||||
|
||||
format_payload(Payload, text) -> ["Payload=", io_lib:format("~ts", [Payload])];
|
||||
format_payload(Payload, hex) -> ["Payload(hex)=", binary:encode_hex(Payload)];
|
||||
format_payload(_, null) -> "Payload=******".
|
||||
|
||||
i(true) -> 1;
|
||||
i(false) -> 0;
|
||||
|
|
|
@ -184,6 +184,12 @@ roots(low) ->
|
|||
, {"latency_stats",
|
||||
sc(ref("latency_stats"),
|
||||
#{})}
|
||||
, {"trace",
|
||||
sc(ref("trace"),
|
||||
#{desc => """
|
||||
Real-time filtering logs for the ClientID or Topic or IP for debugging.
|
||||
"""
|
||||
})}
|
||||
].
|
||||
|
||||
fields("persistent_session_store") ->
|
||||
|
@ -981,6 +987,17 @@ when deactivated, but after the retention time.
|
|||
fields("latency_stats") ->
|
||||
[ {"samples", sc(integer(), #{default => 10,
|
||||
desc => "the number of smaples for calculate the average latency of delivery"})}
|
||||
];
|
||||
fields("trace") ->
|
||||
[ {"payload_encode", sc(hoconsc:enum([hex, text, null]), #{
|
||||
default => text,
|
||||
desc => """
|
||||
Determine the format of the payload format in the trace file.</br>
|
||||
- `text`: Text-based protocol or plain text protocol. It is recommended when payload is json encode.</br>
|
||||
- `hex`: Binary hexadecimal encode. It is recommended when payload is a custom binary protocol.</br>
|
||||
- `null`: Don't show payload in trace log file.
|
||||
"""
|
||||
})}
|
||||
].
|
||||
|
||||
mqtt_listener() ->
|
||||
|
|
|
@ -84,22 +84,22 @@ mnesia(boot) ->
|
|||
publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore;
|
||||
publish(#message{from = From, topic = Topic, payload = Payload}) when
|
||||
is_binary(From); is_atom(From) ->
|
||||
?TRACE("PUBLISH", #{topic => Topic}, {publish, Payload}).
|
||||
?TRACE("PUBLISH", "publish_to", #{topic => Topic, payload => Payload}).
|
||||
|
||||
subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> ignore;
|
||||
subscribe(Topic, SubId, SubOpts) ->
|
||||
?TRACE("SUBSCRIBE", #{topic => Topic}, {subscribe, SubId, SubOpts}).
|
||||
?TRACE("SUBSCRIBE", "subscribe", #{topic => Topic, sub_opts => SubOpts, sub_id => SubId}).
|
||||
|
||||
unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore;
|
||||
unsubscribe(Topic, SubOpts) ->
|
||||
?TRACE("UNSUBSCRIBE", #{topic => Topic}, {unsubscribe, SubOpts}).
|
||||
?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}).
|
||||
|
||||
log(Action, Meta0, Msg) ->
|
||||
log(Event, Msg, Meta0) ->
|
||||
case persistent_term:get(?TRACE_FILTER, undefined) of
|
||||
undefined -> ok;
|
||||
List ->
|
||||
Meta = maps:merge(logger:get_process_metadata(), Meta0),
|
||||
Log = #{level => trace, action => Action, meta => Meta, msg => Msg},
|
||||
Log = #{level => trace, event => Event, meta => Meta, msg => Msg},
|
||||
log_filter(List, Log)
|
||||
end.
|
||||
|
||||
|
@ -112,11 +112,12 @@ log_filter([{Id, FilterFun, Filter, Name} | Rest], Log0) ->
|
|||
case logger_config:get(ets:whereis(logger), Id) of
|
||||
{ok, #{module := Module} = HandlerConfig0} ->
|
||||
HandlerConfig = maps:without(?OWN_KEYS, HandlerConfig0),
|
||||
io:format("~p~n", [{Module, Log, HandlerConfig}]),
|
||||
try Module:log(Log, HandlerConfig)
|
||||
catch C:R:S ->
|
||||
case logger:remove_handler(Id) of
|
||||
ok ->
|
||||
logger:internal_log(error, {removed_failing_handler, Id});
|
||||
logger:internal_log(error, {removed_failing_handler, Id, C, R, S});
|
||||
{error,{not_found,_}} ->
|
||||
%% Probably already removed by other client
|
||||
%% Don't report again
|
||||
|
@ -528,5 +529,5 @@ update_trace_handler() ->
|
|||
|
||||
filter_cli_handler(Names) ->
|
||||
lists:filter(fun(Name) ->
|
||||
notmatch =:= re:run(Name, "^CLI-+.", [])
|
||||
nomatch =:= re:run(Name, "^CLI-+.", [])
|
||||
end, Names).
|
||||
|
|
|
@ -22,35 +22,36 @@
|
|||
-spec format(LogEvent, Config) -> unicode:chardata() when
|
||||
LogEvent :: logger:log_event(),
|
||||
Config :: logger:config().
|
||||
format(#{level := trace, msg := Msg, meta := Meta, action := Action}, _Config) ->
|
||||
format(#{level := trace, event := Event, meta := Meta, msg := Msg},
|
||||
#{payload_encode := PEncode}) ->
|
||||
Time = calendar:system_time_to_rfc3339(erlang:system_time(second)),
|
||||
ClientId = maps:get(clientid, Meta, ""),
|
||||
Peername = maps:get(peername, Meta, ""),
|
||||
MsgBin = format_msg(Msg),
|
||||
MetaBin = format_map(maps:without([clientid, peername], Meta)),
|
||||
[Time, " [", Action, "] ", ClientId, "@", Peername, " ", MsgBin, " ( ",
|
||||
MetaBin, ")\n"];
|
||||
MetaBin = format_meta(Meta, PEncode),
|
||||
[Time, " [", Event, "] ", ClientId, "@", Peername, " msg: ", Msg, MetaBin, "\n"];
|
||||
|
||||
format(Event, Config) ->
|
||||
emqx_logger_textfmt:format(Event, Config).
|
||||
|
||||
format_msg(Bin)when is_binary(Bin) -> Bin;
|
||||
format_msg(List) when is_list(List) -> List;
|
||||
format_msg({publish, Payload}) ->
|
||||
io_lib:format("Publish Payload:(~ts) TO ", [Payload]);
|
||||
format_msg({subscribe, SubId, SubOpts}) ->
|
||||
[io_lib:format("SUBSCRIBE ~ts, Opts( ", [SubId]),
|
||||
format_map(SubOpts), ")"];
|
||||
format_msg({unsubscribe, SubOpts}) ->
|
||||
[io_lib:format("UNSUBSCRIBE ~ts, Opts( ", [maps:get(subid, SubOpts, "undefined")]),
|
||||
format_map(maps:without([subid], SubOpts)), ")"];
|
||||
format_msg(Packet) ->
|
||||
emqx_packet:format(Packet).
|
||||
format_meta(Meta0, Encode) ->
|
||||
Packet = format_packet(maps:get(packet, Meta0, undefined), Encode),
|
||||
Payload = format_payload(maps:get(payload, Meta0, undefined), Encode),
|
||||
Meta1 = maps:without([msg, clientid, peername, packet, payload], Meta0),
|
||||
case Meta1 =:= #{} of
|
||||
true -> [Packet, Payload];
|
||||
false ->
|
||||
Meta2 = lists:map(fun({K, V}) -> [to_iolist(K), ": ", to_iolist(V)] end,
|
||||
maps:to_list(Meta1)),
|
||||
[Packet, ", ", lists:join(",", Meta2), Payload]
|
||||
end.
|
||||
|
||||
format_map(Map) ->
|
||||
maps:fold(fun(K, V, Acc) ->
|
||||
[to_iolist(K), ":", to_iolist(V), " "|Acc]
|
||||
end, [], Map).
|
||||
format_packet(undefined, _) -> "";
|
||||
format_packet(Packet, Encode) -> [", packet: ", emqx_packet:format(Packet, Encode)].
|
||||
|
||||
format_payload(undefined, _) -> "";
|
||||
format_payload(Payload, text) -> [", payload: ", io_lib:format("~ts", [Payload])];
|
||||
format_payload(Payload, hex) -> [", payload(hex): ", binary:encode_hex(Payload)];
|
||||
format_payload(_, null) -> ", payload=******".
|
||||
|
||||
to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom);
|
||||
to_iolist(Int) when is_integer(Int) -> integer_to_list(Int);
|
||||
|
|
|
@ -35,8 +35,10 @@
|
|||
, filter_topic/2
|
||||
, filter_ip_address/2
|
||||
]).
|
||||
-export([template/1]).
|
||||
|
||||
-export([handler_id/2]).
|
||||
-export([payload_encode/0]).
|
||||
|
||||
-type tracer() :: #{
|
||||
name := binary(),
|
||||
|
@ -145,13 +147,14 @@ filters(#{type := topic, filter := Filter, name := Name}) ->
|
|||
filters(#{type := ip_address, filter := Filter, name := Name}) ->
|
||||
[{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}].
|
||||
|
||||
formatter(#{type := Type}) ->
|
||||
formatter(#{type := _Type}) ->
|
||||
{emqx_trace_formatter,
|
||||
#{
|
||||
template => template(Type),
|
||||
template => [],
|
||||
single_line => false,
|
||||
max_size => unlimited,
|
||||
depth => unlimited
|
||||
depth => unlimited,
|
||||
payload_encode => payload_encode()
|
||||
}
|
||||
}.
|
||||
|
||||
|
@ -181,6 +184,8 @@ filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc)
|
|||
Acc
|
||||
end.
|
||||
|
||||
payload_encode() -> emqx_config:get([trace, payload_encode], text).
|
||||
|
||||
handler_id(Name, Type) ->
|
||||
try
|
||||
do_handler_id(Name, Type)
|
||||
|
|
|
@ -431,11 +431,11 @@ websocket_info(Info, State) ->
|
|||
websocket_close({_, ReasonCode, _Payload}, State) when is_integer(ReasonCode) ->
|
||||
websocket_close(ReasonCode, State);
|
||||
websocket_close(Reason, State) ->
|
||||
?TRACE("CLOSED", #{transport => websocket, reason => Reason}, "websocket_closed"),
|
||||
?TRACE("SOCKET", "websocket_closed", #{reason => Reason}),
|
||||
handle_info({sock_closed, Reason}, State).
|
||||
|
||||
terminate(Reason, _Req, #state{channel = Channel}) ->
|
||||
?TRACE("TERMINATE", #{transport => websocket, reason => Reason}, "webscoket_terminated"),
|
||||
?TRACE("SOCKET", "websocket_terminated", #{reason => Reason}),
|
||||
emqx_channel:terminate(Reason, Channel);
|
||||
|
||||
terminate(_Reason, _Req, _UnExpectedState) ->
|
||||
|
@ -479,7 +479,7 @@ handle_info({connack, ConnAck}, State) ->
|
|||
return(enqueue(ConnAck, State));
|
||||
|
||||
handle_info({close, Reason}, State) ->
|
||||
?TRACE("CLOSE", #{reason => Reason}, "force_socket_close"),
|
||||
?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}),
|
||||
return(enqueue({close, Reason}, State));
|
||||
|
||||
handle_info({event, connected}, State = #state{channel = Channel}) ->
|
||||
|
@ -662,7 +662,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
|||
|
||||
handle_incoming(Packet, State = #state{listener = {Type, Listener}})
|
||||
when is_record(Packet, mqtt_packet) ->
|
||||
?TRACE("RECV", #{transport => websocket}, Packet),
|
||||
?TRACE("WS-MQTT", "mqtt_packet_received", #{packet => Packet}),
|
||||
ok = inc_incoming_stats(Packet),
|
||||
NState = case emqx_pd:get_counter(incoming_pubs) >
|
||||
get_active_n(Type, Listener) of
|
||||
|
@ -726,7 +726,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
|||
ok = emqx_metrics:inc('delivery.dropped.too_large'),
|
||||
ok = emqx_metrics:inc('delivery.dropped'),
|
||||
<<>>;
|
||||
Data -> ?TRACE("SEND", #{transport => websocket}, Packet),
|
||||
Data -> ?TRACE("WS-MQTT", "mqtt_packet_sent", #{packet => Packet}),
|
||||
ok = inc_outgoing_stats(Packet),
|
||||
Data
|
||||
catch
|
||||
|
|
|
@ -74,9 +74,19 @@ t_base_test(_Config) ->
|
|||
?assertEqual(node(), maps:get(initiator, Query)),
|
||||
?assert(maps:is_key(created_at, Query)),
|
||||
?assertEqual(ok, receive_msg(3, test)),
|
||||
?assertEqual({ok, 2, ok}, emqx_cluster_rpc:multicall(M, F, A)),
|
||||
{atomic, Status} = emqx_cluster_rpc:status(),
|
||||
?assertEqual(3, length(Status)),
|
||||
?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 1 end, Status)),
|
||||
case length(Status) =:= 3 of
|
||||
true -> ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Status));
|
||||
false ->
|
||||
%% wait for mnesia to write in.
|
||||
ct:sleep(42),
|
||||
{atomic, Status1} = emqx_cluster_rpc:status(),
|
||||
ct:pal("status: ~p", Status),
|
||||
ct:pal("status1: ~p", Status1),
|
||||
?assertEqual(3, length(Status1)),
|
||||
?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Status))
|
||||
end,
|
||||
ok.
|
||||
|
||||
t_commit_fail_test(_Config) ->
|
||||
|
|
|
@ -199,8 +199,8 @@ on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
|
|||
on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State);
|
||||
on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery,
|
||||
#{pool_name := PoolName, base_path := BasePath} = State) ->
|
||||
?TRACE("QUERY", #{request => Request, connector => InstId, state => State},
|
||||
"http connector received request"),
|
||||
?TRACE("QUERY", "http_connector_received",
|
||||
#{request => Request, connector => InstId, state => State}),
|
||||
NRequest = update_path(BasePath, Request),
|
||||
case Result = ehttpc:request(case KeyOrNum of
|
||||
undefined -> PoolName;
|
||||
|
|
|
@ -87,16 +87,15 @@ on_stop(InstId, #{poolname := PoolName}) ->
|
|||
|
||||
on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) ->
|
||||
Request = {Base, Filter, Attributes},
|
||||
?TRACE("QUERY", #{request => Request, connector => InstId, state => State},
|
||||
"ldap connector received request"),
|
||||
?TRACE("QUERY", "ldap_connector_received",
|
||||
#{request => Request, connector => InstId, state => State}),
|
||||
case Result = ecpool:pick_and_do(
|
||||
PoolName,
|
||||
{?MODULE, search, [Base, Filter, Attributes]},
|
||||
no_handover) of
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "ldap connector do request failed",
|
||||
request => Request, connector => InstId,
|
||||
reason => Reason}),
|
||||
?SLOG(error, #{msg => "ldap_connector_do_request_failed",
|
||||
request => Request, connector => InstId, reason => Reason}),
|
||||
emqx_resource:query_failed(AfterQuery);
|
||||
_ ->
|
||||
emqx_resource:query_success(AfterQuery)
|
||||
|
|
|
@ -137,13 +137,13 @@ on_query(InstId,
|
|||
AfterQuery,
|
||||
#{poolname := PoolName} = State) ->
|
||||
Request = {Action, Collection, Selector, Docs},
|
||||
?TRACE("QUERY", #{request => Request, connector => InstId, state => State},
|
||||
"mongodb connector received request"),
|
||||
?TRACE("QUERY", "mongodb_connector_received",
|
||||
#{request => Request, connector => InstId, state => State}),
|
||||
case ecpool:pick_and_do(PoolName,
|
||||
{?MODULE, mongo_query, [Action, Collection, Selector, Docs]},
|
||||
no_handover) of
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "mongodb connector do query failed",
|
||||
?SLOG(error, #{msg => "mongodb_connector_do_query_failed",
|
||||
request => Request, reason => Reason,
|
||||
connector => InstId}),
|
||||
emqx_resource:query_failed(AfterQuery),
|
||||
|
|
|
@ -118,7 +118,7 @@ on_message_received(Msg, HookPoint) ->
|
|||
%% ===================================================================
|
||||
on_start(InstId, Conf) ->
|
||||
InstanceId = binary_to_atom(InstId, utf8),
|
||||
?SLOG(info, #{msg => "starting mqtt connector",
|
||||
?SLOG(info, #{msg => "starting_mqtt_connector",
|
||||
connector => InstanceId, config => Conf}),
|
||||
BasicConf = basic_config(Conf),
|
||||
BridgeConf = BasicConf#{
|
||||
|
@ -139,19 +139,18 @@ on_start(InstId, Conf) ->
|
|||
end.
|
||||
|
||||
on_stop(_InstId, #{name := InstanceId}) ->
|
||||
?SLOG(info, #{msg => "stopping mqtt connector",
|
||||
?SLOG(info, #{msg => "stopping_mqtt_connector",
|
||||
connector => InstanceId}),
|
||||
case ?MODULE:drop_bridge(InstanceId) of
|
||||
ok -> ok;
|
||||
{error, not_found} -> ok;
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "stop mqtt connector",
|
||||
?SLOG(error, #{msg => "stop_mqtt_connector",
|
||||
connector => InstanceId, reason => Reason})
|
||||
end.
|
||||
|
||||
on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) ->
|
||||
?TRACE("QUERY", #{message => Msg, connector => InstanceId},
|
||||
"send msg to remote node"),
|
||||
?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
|
||||
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
|
||||
emqx_resource:query_success(AfterQuery).
|
||||
|
||||
|
|
|
@ -85,14 +85,13 @@ on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) ->
|
|||
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _PoolName} = State) ->
|
||||
on_query(InstId, {sql, SQL, Params, default_timeout}, AfterQuery, State);
|
||||
on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) ->
|
||||
?TRACE("QUERY", #{connector => InstId, sql => SQL, state => State},
|
||||
"mysql connector received sql query"),
|
||||
?TRACE("QUERY", "mysql_connector_received", #{connector => InstId, sql => SQL, state => State}),
|
||||
case Result = ecpool:pick_and_do(
|
||||
PoolName,
|
||||
{mysql, query, [SQL, Params, Timeout]},
|
||||
no_handover) of
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "mysql connector do sql query failed",
|
||||
?SLOG(error, #{msg => "mysql_connector_do_sql_query_failed",
|
||||
connector => InstId, sql => SQL, reason => Reason}),
|
||||
emqx_resource:query_failed(AfterQuery);
|
||||
_ ->
|
||||
|
|
|
@ -83,12 +83,12 @@ on_stop(InstId, #{poolname := PoolName}) ->
|
|||
on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) ->
|
||||
on_query(InstId, {sql, SQL, []}, AfterQuery, State);
|
||||
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
|
||||
?TRACE("QUERY", #{connector => InstId, sql => SQL, state => State},
|
||||
"postgresql connector received sql query"),
|
||||
?TRACE("QUERY", "postgresql_connector_received",
|
||||
#{connector => InstId, sql => SQL, state => State}),
|
||||
case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{
|
||||
msg => "postgresql connector do sql query failed",
|
||||
msg => "postgresql_connector_do_sql_query_failed",
|
||||
connector => InstId, sql => SQL, reason => Reason}),
|
||||
emqx_resource:query_failed(AfterQuery);
|
||||
_ ->
|
||||
|
|
|
@ -125,15 +125,15 @@ on_stop(InstId, #{poolname := PoolName}) ->
|
|||
emqx_plugin_libs_pool:stop_pool(PoolName).
|
||||
|
||||
on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) ->
|
||||
?TRACE("QUERY", #{connector => InstId, sql => Command, state => State},
|
||||
"redis connector received cmd query"),
|
||||
?TRACE("QUERY", "redis_connector_received",
|
||||
#{connector => InstId, sql => Command, state => State}),
|
||||
Result = case Type of
|
||||
cluster -> eredis_cluster:q(PoolName, Command);
|
||||
_ -> ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover)
|
||||
end,
|
||||
case Result of
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "redis connector do cmd query failed",
|
||||
?SLOG(error, #{msg => "redis_connector_do_cmd_query_failed",
|
||||
connector => InstId, sql => Command, reason => Reason}),
|
||||
emqx_resource:query_failed(AfterCommand);
|
||||
_ ->
|
||||
|
|
|
@ -85,7 +85,7 @@ republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}},
|
|||
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
|
||||
QoS = replace_simple_var(QoSTks, Selected, 0),
|
||||
Retain = replace_simple_var(RetainTks, Selected, false),
|
||||
?TRACE("REPUBLISH", #{topic => Topic, payload => Payload}, "republish message"),
|
||||
?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}),
|
||||
safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload);
|
||||
|
||||
%% in case this is a "$events/" event
|
||||
|
@ -99,7 +99,7 @@ republish(Selected, #{metadata := #{rule_id := RuleId}},
|
|||
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
|
||||
QoS = replace_simple_var(QoSTks, Selected, 0),
|
||||
Retain = replace_simple_var(RetainTks, Selected, false),
|
||||
?TRACE("REPUBLISH", #{topic => Topic, payload => Payload}, "republish"),
|
||||
?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}),
|
||||
safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -248,7 +248,7 @@ handle_output(OutId, Selected, Envs) ->
|
|||
end.
|
||||
|
||||
do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
|
||||
?TRACE("SEND", #{bridge_id => BridgeId}, "output to bridge"),
|
||||
?TRACE("BRIDGE", "output_to_bridge", #{bridge_id => BridgeId}),
|
||||
emqx_bridge:send_message(BridgeId, Selected);
|
||||
do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
|
||||
Mod:Func(Selected, Envs, Args).
|
||||
|
|
|
@ -77,7 +77,7 @@ flatten([D1 | L]) when is_list(D1) ->
|
|||
D1 ++ flatten(L).
|
||||
|
||||
echo_action(Data, Envs) ->
|
||||
?TRACE("TEST", #{data => Data, envs => Envs}, "testing_rule_sql_ok"),
|
||||
?TRACE("RULE", "testing_rule_sql_ok", #{data => Data, envs => Envs}),
|
||||
Data.
|
||||
|
||||
fill_default_values(Event, Context) ->
|
||||
|
|
Loading…
Reference in New Issue