From 4b6bba11eb3ddec8fbd64745f44b0bc9f42035d5 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 28 Dec 2021 23:46:22 +0800 Subject: [PATCH] feat(trace): struct log for trace --- apps/emqx/include/logger.hrl | 2 +- apps/emqx/src/emqx_broker.erl | 2 +- apps/emqx/src/emqx_channel.erl | 2 +- apps/emqx/src/emqx_connection.erl | 10 +-- apps/emqx/src/emqx_packet.erl | 85 +++++++++++-------- apps/emqx/src/emqx_schema.erl | 17 ++++ apps/emqx/src/emqx_trace/emqx_trace.erl | 15 ++-- .../src/emqx_trace/emqx_trace_formatter.erl | 43 +++++----- .../src/emqx_trace/emqx_trace_handler.erl | 11 ++- apps/emqx/src/emqx_ws_connection.erl | 10 +-- .../emqx_conf/test/emqx_cluster_rpc_SUITE.erl | 14 ++- .../src/emqx_connector_http.erl | 4 +- .../src/emqx_connector_ldap.erl | 9 +- .../src/emqx_connector_mongo.erl | 6 +- .../src/emqx_connector_mqtt.erl | 9 +- .../src/emqx_connector_mysql.erl | 5 +- .../src/emqx_connector_pgsql.erl | 6 +- .../src/emqx_connector_redis.erl | 6 +- .../src/emqx_rule_outputs.erl | 4 +- .../src/emqx_rule_runtime.erl | 2 +- .../src/emqx_rule_sqltester.erl | 2 +- 21 files changed, 154 insertions(+), 110 deletions(-) diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index ecedfafe7..42d598ef9 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -69,7 +69,7 @@ ok end). --define(TRACE(Action, Meta, Msg), emqx_trace:log(Action, Meta, Msg)). +-define(TRACE(Event, Msg, Meta), emqx_trace:log(Event, Msg, Meta)). %% print to 'user' group leader -define(ULOG(Fmt, Args), io:format(user, Fmt, Args)). diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 6dce69136..dec753fc2 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -205,7 +205,7 @@ publish(Msg) when is_record(Msg, message) -> emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'), case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of #message{headers = #{allow_publish := false}} -> - ?TRACE("NotAllow", #{payload => emqx_message:to_log_map(Msg)}, "message_not_published"), + ?TRACE("MQTT", "msg_publish_not_allowed", #{message => emqx_message:to_log_map(Msg)}), []; Msg1 = #message{topic = Topic} -> emqx_persistent_session:persist_message(Msg1), diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 7fe366ad1..0f83b04ff 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -292,7 +292,7 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) -> fun check_banned/2 ], ConnPkt, Channel#channel{conn_state = connecting}) of {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} -> - ?TRACE("RECV", #{}, Packet), + ?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}), NChannel1 = NChannel#channel{ will_msg = emqx_packet:will_msg(NConnPkt), alias_maximum = init_alias_maximum(NConnPkt, ClientInfo) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index b8a6b4b7b..37e15f522 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -526,7 +526,7 @@ handle_msg({connack, ConnAck}, State) -> handle_outgoing(ConnAck, State); handle_msg({close, Reason}, State) -> - ?TRACE("CLOSE", #{reason => Reason}, "force_socket_close"), + ?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}), handle_info({sock_closed, Reason}, close_socket(State)); handle_msg({event, connected}, State = #state{channel = Channel}) -> @@ -565,7 +565,7 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport, emqx_congestion:cancel_alarms(Socket, Transport, Channel1), emqx_channel:terminate(Reason, Channel1), close_socket_ok(State), - ?TRACE("TERMINATE", #{reason => Reason}, "terminated") + ?TRACE("SOCKET", "tcp_socket_terminated", #{reason => Reason}) catch E : C : S -> ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S}) @@ -715,7 +715,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) -> ok = inc_incoming_stats(Packet), - ?TRACE("RECV", #{}, Packet), + ?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}), with_channel(handle_in, [Packet], State); handle_incoming(FrameError, State) -> @@ -754,13 +754,13 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> <<>> -> ?SLOG(warning, #{ msg => "packet_is_discarded", reason => "frame_is_too_large", - packet => emqx_packet:format(Packet) + packet => emqx_packet:format(Packet, null) }), ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped'), <<>>; Data -> - ?TRACE("SEND", #{}, Packet), + ?TRACE("MQTT", "mqtt_packet_sent", #{packet => Packet}), ok = inc_outgoing_stats(Packet), Data catch diff --git a/apps/emqx/src/emqx_packet.erl b/apps/emqx/src/emqx_packet.erl index 02165a6b5..53e4eabfe 100644 --- a/apps/emqx/src/emqx_packet.erl +++ b/apps/emqx/src/emqx_packet.erl @@ -44,7 +44,9 @@ , will_msg/1 ]). --export([format/1]). +-export([ format/1 + , format/2 + ]). -define(TYPE_NAMES, { 'CONNECT' @@ -435,25 +437,28 @@ will_msg(#mqtt_packet_connect{clientid = ClientId, %% @doc Format packet -spec(format(emqx_types:packet()) -> iolist()). -format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) -> - format_header(Header, format_variable(Variable, Payload)). +format(Packet) -> format(Packet, emqx_trace_handler:payload_encode()). + +%% @doc Format packet +-spec(format(emqx_types:packet(), hex | text | null) -> iolist()). +format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, PayloadEncode) -> + HeaderIO = format_header(Header), + case format_variable(Variable, Payload, PayloadEncode) of + "" -> HeaderIO; + VarIO -> [HeaderIO,",", VarIO] + end. format_header(#mqtt_packet_header{type = Type, dup = Dup, qos = QoS, - retain = Retain}, S) -> - S1 = case S == undefined of - true -> <<>>; - false -> [", ", S] - end, - io_lib:format("~ts(Q~p, R~p, D~p~ts)", [type_name(Type), QoS, i(Retain), i(Dup), S1]). + retain = Retain}) -> + io_lib:format("~ts(Q~p, R~p, D~p)", [type_name(Type), QoS, i(Retain), i(Dup)]). -format_variable(undefined, _) -> - undefined; -format_variable(Variable, undefined) -> - format_variable(Variable); -format_variable(Variable, Payload) -> - io_lib:format("~ts, Payload=~ts", [format_variable(Variable), Payload]). +format_variable(undefined, _, _) -> ""; +format_variable(Variable, undefined, PayloadEncode) -> + format_variable(Variable, PayloadEncode); +format_variable(Variable, Payload, PayloadEncode) -> + [format_variable(Variable, PayloadEncode), format_payload(Payload, PayloadEncode)]. format_variable(#mqtt_packet_connect{ proto_ver = ProtoVer, @@ -467,55 +472,63 @@ format_variable(#mqtt_packet_connect{ will_topic = WillTopic, will_payload = WillPayload, username = Username, - password = Password}) -> - Format = "ClientId=~ts, ProtoName=~ts, ProtoVsn=~p, CleanStart=~ts, KeepAlive=~p, Username=~ts, Password=~ts", - Args = [ClientId, ProtoName, ProtoVer, CleanStart, KeepAlive, Username, format_password(Password)], - {Format1, Args1} = if - WillFlag -> {Format ++ ", Will(Q~p, R~p, Topic=~ts, Payload=~0p)", - Args ++ [WillQoS, i(WillRetain), WillTopic, WillPayload]}; - true -> {Format, Args} - end, - io_lib:format(Format1, Args1); + password = Password}, + PayloadEncode) -> + Base = io_lib:format( + "ClientId=~ts, ProtoName=~ts, ProtoVsn=~p, CleanStart=~ts, KeepAlive=~p, Username=~ts, Password=~ts", + [ClientId, ProtoName, ProtoVer, CleanStart, KeepAlive, Username, format_password(Password)]), + case WillFlag of + true -> + [Base, io_lib:format(", Will(Q~p, R~p, Topic=~ts ", + [WillQoS, i(WillRetain), WillTopic]), + format_payload(WillPayload, PayloadEncode), ")"]; + false -> + Base + end; format_variable(#mqtt_packet_disconnect - {reason_code = ReasonCode}) -> + {reason_code = ReasonCode}, _) -> io_lib:format("ReasonCode=~p", [ReasonCode]); format_variable(#mqtt_packet_connack{ack_flags = AckFlags, - reason_code = ReasonCode}) -> + reason_code = ReasonCode}, _) -> io_lib:format("AckFlags=~p, ReasonCode=~p", [AckFlags, ReasonCode]); format_variable(#mqtt_packet_publish{topic_name = TopicName, - packet_id = PacketId}) -> + packet_id = PacketId}, _) -> io_lib:format("Topic=~ts, PacketId=~p", [TopicName, PacketId]); format_variable(#mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode}) -> + reason_code = ReasonCode}, _) -> io_lib:format("PacketId=~p, ReasonCode=~p", [PacketId, ReasonCode]); format_variable(#mqtt_packet_subscribe{packet_id = PacketId, - topic_filters = TopicFilters}) -> + topic_filters = TopicFilters}, _) -> io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, TopicFilters]); format_variable(#mqtt_packet_unsubscribe{packet_id = PacketId, - topic_filters = Topics}) -> + topic_filters = Topics}, _) -> io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, Topics]); format_variable(#mqtt_packet_suback{packet_id = PacketId, - reason_codes = ReasonCodes}) -> + reason_codes = ReasonCodes}, _) -> io_lib:format("PacketId=~p, ReasonCodes=~p", [PacketId, ReasonCodes]); -format_variable(#mqtt_packet_unsuback{packet_id = PacketId}) -> +format_variable(#mqtt_packet_unsuback{packet_id = PacketId}, _) -> io_lib:format("PacketId=~p", [PacketId]); -format_variable(#mqtt_packet_auth{reason_code = ReasonCode}) -> +format_variable(#mqtt_packet_auth{reason_code = ReasonCode}, _) -> io_lib:format("ReasonCode=~p", [ReasonCode]); -format_variable(PacketId) when is_integer(PacketId) -> +format_variable(PacketId, _) when is_integer(PacketId) -> io_lib:format("PacketId=~p", [PacketId]). -format_password(undefined) -> undefined; -format_password(_Password) -> '******'. +format_password(undefined) -> "undefined"; +format_password(_Password) -> "******". + +format_payload(Payload, text) -> ["Payload=", io_lib:format("~ts", [Payload])]; +format_payload(Payload, hex) -> ["Payload(hex)=", binary:encode_hex(Payload)]; +format_payload(_, null) -> "Payload=******". i(true) -> 1; i(false) -> 0; diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 85b411217..cbf21683d 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -184,6 +184,12 @@ roots(low) -> , {"latency_stats", sc(ref("latency_stats"), #{})} + , {"trace", + sc(ref("trace"), + #{desc => """ +Real-time filtering logs for the ClientID or Topic or IP for debugging. +""" + })} ]. fields("persistent_session_store") -> @@ -981,6 +987,17 @@ when deactivated, but after the retention time. fields("latency_stats") -> [ {"samples", sc(integer(), #{default => 10, desc => "the number of smaples for calculate the average latency of delivery"})} + ]; +fields("trace") -> + [ {"payload_encode", sc(hoconsc:enum([hex, text, null]), #{ + default => text, + desc => """ +Determine the format of the payload format in the trace file.
+- `text`: Text-based protocol or plain text protocol. It is recommended when payload is json encode.
+- `hex`: Binary hexadecimal encode. It is recommended when payload is a custom binary protocol.
+- `null`: Don't show payload in trace log file. + """ + })} ]. mqtt_listener() -> diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index f4679e073..7bca3ca3c 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -84,22 +84,22 @@ mnesia(boot) -> publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore; publish(#message{from = From, topic = Topic, payload = Payload}) when is_binary(From); is_atom(From) -> - ?TRACE("PUBLISH", #{topic => Topic}, {publish, Payload}). + ?TRACE("PUBLISH", "publish_to", #{topic => Topic, payload => Payload}). subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> ignore; subscribe(Topic, SubId, SubOpts) -> - ?TRACE("SUBSCRIBE", #{topic => Topic}, {subscribe, SubId, SubOpts}). + ?TRACE("SUBSCRIBE", "subscribe", #{topic => Topic, sub_opts => SubOpts, sub_id => SubId}). unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore; unsubscribe(Topic, SubOpts) -> - ?TRACE("UNSUBSCRIBE", #{topic => Topic}, {unsubscribe, SubOpts}). + ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}). -log(Action, Meta0, Msg) -> +log(Event, Msg, Meta0) -> case persistent_term:get(?TRACE_FILTER, undefined) of undefined -> ok; List -> Meta = maps:merge(logger:get_process_metadata(), Meta0), - Log = #{level => trace, action => Action, meta => Meta, msg => Msg}, + Log = #{level => trace, event => Event, meta => Meta, msg => Msg}, log_filter(List, Log) end. @@ -112,11 +112,12 @@ log_filter([{Id, FilterFun, Filter, Name} | Rest], Log0) -> case logger_config:get(ets:whereis(logger), Id) of {ok, #{module := Module} = HandlerConfig0} -> HandlerConfig = maps:without(?OWN_KEYS, HandlerConfig0), + io:format("~p~n", [{Module, Log, HandlerConfig}]), try Module:log(Log, HandlerConfig) catch C:R:S -> case logger:remove_handler(Id) of ok -> - logger:internal_log(error, {removed_failing_handler, Id}); + logger:internal_log(error, {removed_failing_handler, Id, C, R, S}); {error,{not_found,_}} -> %% Probably already removed by other client %% Don't report again @@ -528,5 +529,5 @@ update_trace_handler() -> filter_cli_handler(Names) -> lists:filter(fun(Name) -> - notmatch =:= re:run(Name, "^CLI-+.", []) + nomatch =:= re:run(Name, "^CLI-+.", []) end, Names). diff --git a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl index 1b37b7130..a9c4ca31d 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl @@ -22,35 +22,36 @@ -spec format(LogEvent, Config) -> unicode:chardata() when LogEvent :: logger:log_event(), Config :: logger:config(). -format(#{level := trace, msg := Msg, meta := Meta, action := Action}, _Config) -> +format(#{level := trace, event := Event, meta := Meta, msg := Msg}, + #{payload_encode := PEncode}) -> Time = calendar:system_time_to_rfc3339(erlang:system_time(second)), ClientId = maps:get(clientid, Meta, ""), Peername = maps:get(peername, Meta, ""), - MsgBin = format_msg(Msg), - MetaBin = format_map(maps:without([clientid, peername], Meta)), - [Time, " [", Action, "] ", ClientId, "@", Peername, " ", MsgBin, " ( ", - MetaBin, ")\n"]; + MetaBin = format_meta(Meta, PEncode), + [Time, " [", Event, "] ", ClientId, "@", Peername, " msg: ", Msg, MetaBin, "\n"]; format(Event, Config) -> emqx_logger_textfmt:format(Event, Config). -format_msg(Bin)when is_binary(Bin) -> Bin; -format_msg(List) when is_list(List) -> List; -format_msg({publish, Payload}) -> - io_lib:format("Publish Payload:(~ts) TO ", [Payload]); -format_msg({subscribe, SubId, SubOpts}) -> - [io_lib:format("SUBSCRIBE ~ts, Opts( ", [SubId]), - format_map(SubOpts), ")"]; -format_msg({unsubscribe, SubOpts}) -> - [io_lib:format("UNSUBSCRIBE ~ts, Opts( ", [maps:get(subid, SubOpts, "undefined")]), - format_map(maps:without([subid], SubOpts)), ")"]; -format_msg(Packet) -> - emqx_packet:format(Packet). +format_meta(Meta0, Encode) -> + Packet = format_packet(maps:get(packet, Meta0, undefined), Encode), + Payload = format_payload(maps:get(payload, Meta0, undefined), Encode), + Meta1 = maps:without([msg, clientid, peername, packet, payload], Meta0), + case Meta1 =:= #{} of + true -> [Packet, Payload]; + false -> + Meta2 = lists:map(fun({K, V}) -> [to_iolist(K), ": ", to_iolist(V)] end, + maps:to_list(Meta1)), + [Packet, ", ", lists:join(",", Meta2), Payload] + end. -format_map(Map) -> - maps:fold(fun(K, V, Acc) -> - [to_iolist(K), ":", to_iolist(V), " "|Acc] - end, [], Map). +format_packet(undefined, _) -> ""; +format_packet(Packet, Encode) -> [", packet: ", emqx_packet:format(Packet, Encode)]. + +format_payload(undefined, _) -> ""; +format_payload(Payload, text) -> [", payload: ", io_lib:format("~ts", [Payload])]; +format_payload(Payload, hex) -> [", payload(hex): ", binary:encode_hex(Payload)]; +format_payload(_, null) -> ", payload=******". to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom); to_iolist(Int) when is_integer(Int) -> integer_to_list(Int); diff --git a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl index 9c991301c..d1fbd20b8 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl @@ -35,8 +35,10 @@ , filter_topic/2 , filter_ip_address/2 ]). +-export([template/1]). -export([handler_id/2]). +-export([payload_encode/0]). -type tracer() :: #{ name := binary(), @@ -145,13 +147,14 @@ filters(#{type := topic, filter := Filter, name := Name}) -> filters(#{type := ip_address, filter := Filter, name := Name}) -> [{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}]. -formatter(#{type := Type}) -> +formatter(#{type := _Type}) -> {emqx_trace_formatter, #{ - template => template(Type), + template => [], single_line => false, max_size => unlimited, - depth => unlimited + depth => unlimited, + payload_encode => payload_encode() } }. @@ -181,6 +184,8 @@ filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) Acc end. +payload_encode() -> emqx_config:get([trace, payload_encode], text). + handler_id(Name, Type) -> try do_handler_id(Name, Type) diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 6e6bc1c90..f70d7ac6c 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -431,11 +431,11 @@ websocket_info(Info, State) -> websocket_close({_, ReasonCode, _Payload}, State) when is_integer(ReasonCode) -> websocket_close(ReasonCode, State); websocket_close(Reason, State) -> - ?TRACE("CLOSED", #{transport => websocket, reason => Reason}, "websocket_closed"), + ?TRACE("SOCKET", "websocket_closed", #{reason => Reason}), handle_info({sock_closed, Reason}, State). terminate(Reason, _Req, #state{channel = Channel}) -> - ?TRACE("TERMINATE", #{transport => websocket, reason => Reason}, "webscoket_terminated"), + ?TRACE("SOCKET", "websocket_terminated", #{reason => Reason}), emqx_channel:terminate(Reason, Channel); terminate(_Reason, _Req, _UnExpectedState) -> @@ -479,7 +479,7 @@ handle_info({connack, ConnAck}, State) -> return(enqueue(ConnAck, State)); handle_info({close, Reason}, State) -> - ?TRACE("CLOSE", #{reason => Reason}, "force_socket_close"), + ?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}), return(enqueue({close, Reason}, State)); handle_info({event, connected}, State = #state{channel = Channel}) -> @@ -662,7 +662,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> handle_incoming(Packet, State = #state{listener = {Type, Listener}}) when is_record(Packet, mqtt_packet) -> - ?TRACE("RECV", #{transport => websocket}, Packet), + ?TRACE("WS-MQTT", "mqtt_packet_received", #{packet => Packet}), ok = inc_incoming_stats(Packet), NState = case emqx_pd:get_counter(incoming_pubs) > get_active_n(Type, Listener) of @@ -726,7 +726,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped'), <<>>; - Data -> ?TRACE("SEND", #{transport => websocket}, Packet), + Data -> ?TRACE("WS-MQTT", "mqtt_packet_sent", #{packet => Packet}), ok = inc_outgoing_stats(Packet), Data catch diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index ad74faf99..66b05c95a 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -74,9 +74,19 @@ t_base_test(_Config) -> ?assertEqual(node(), maps:get(initiator, Query)), ?assert(maps:is_key(created_at, Query)), ?assertEqual(ok, receive_msg(3, test)), + ?assertEqual({ok, 2, ok}, emqx_cluster_rpc:multicall(M, F, A)), {atomic, Status} = emqx_cluster_rpc:status(), - ?assertEqual(3, length(Status)), - ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 1 end, Status)), + case length(Status) =:= 3 of + true -> ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Status)); + false -> + %% wait for mnesia to write in. + ct:sleep(42), + {atomic, Status1} = emqx_cluster_rpc:status(), + ct:pal("status: ~p", Status), + ct:pal("status1: ~p", Status1), + ?assertEqual(3, length(Status1)), + ?assert(lists:all(fun(I) -> maps:get(tnx_id, I) =:= 2 end, Status)) + end, ok. t_commit_fail_test(_Config) -> diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index b54d87f12..7d7771503 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -199,8 +199,8 @@ on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) -> on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State); on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery, #{pool_name := PoolName, base_path := BasePath} = State) -> - ?TRACE("QUERY", #{request => Request, connector => InstId, state => State}, - "http connector received request"), + ?TRACE("QUERY", "http_connector_received", + #{request => Request, connector => InstId, state => State}), NRequest = update_path(BasePath, Request), case Result = ehttpc:request(case KeyOrNum of undefined -> PoolName; diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 8aa1f9319..97e963f18 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -87,16 +87,15 @@ on_stop(InstId, #{poolname := PoolName}) -> on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) -> Request = {Base, Filter, Attributes}, - ?TRACE("QUERY", #{request => Request, connector => InstId, state => State}, - "ldap connector received request"), + ?TRACE("QUERY", "ldap_connector_received", + #{request => Request, connector => InstId, state => State}), case Result = ecpool:pick_and_do( PoolName, {?MODULE, search, [Base, Filter, Attributes]}, no_handover) of {error, Reason} -> - ?SLOG(error, #{msg => "ldap connector do request failed", - request => Request, connector => InstId, - reason => Reason}), + ?SLOG(error, #{msg => "ldap_connector_do_request_failed", + request => Request, connector => InstId, reason => Reason}), emqx_resource:query_failed(AfterQuery); _ -> emqx_resource:query_success(AfterQuery) diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index d2594ab93..eacb3ec2d 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -137,13 +137,13 @@ on_query(InstId, AfterQuery, #{poolname := PoolName} = State) -> Request = {Action, Collection, Selector, Docs}, - ?TRACE("QUERY", #{request => Request, connector => InstId, state => State}, - "mongodb connector received request"), + ?TRACE("QUERY", "mongodb_connector_received", + #{request => Request, connector => InstId, state => State}), case ecpool:pick_and_do(PoolName, {?MODULE, mongo_query, [Action, Collection, Selector, Docs]}, no_handover) of {error, Reason} -> - ?SLOG(error, #{msg => "mongodb connector do query failed", + ?SLOG(error, #{msg => "mongodb_connector_do_query_failed", request => Request, reason => Reason, connector => InstId}), emqx_resource:query_failed(AfterQuery), diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 6d620cc14..27d001806 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -118,7 +118,7 @@ on_message_received(Msg, HookPoint) -> %% =================================================================== on_start(InstId, Conf) -> InstanceId = binary_to_atom(InstId, utf8), - ?SLOG(info, #{msg => "starting mqtt connector", + ?SLOG(info, #{msg => "starting_mqtt_connector", connector => InstanceId, config => Conf}), BasicConf = basic_config(Conf), BridgeConf = BasicConf#{ @@ -139,19 +139,18 @@ on_start(InstId, Conf) -> end. on_stop(_InstId, #{name := InstanceId}) -> - ?SLOG(info, #{msg => "stopping mqtt connector", + ?SLOG(info, #{msg => "stopping_mqtt_connector", connector => InstanceId}), case ?MODULE:drop_bridge(InstanceId) of ok -> ok; {error, not_found} -> ok; {error, Reason} -> - ?SLOG(error, #{msg => "stop mqtt connector", + ?SLOG(error, #{msg => "stop_mqtt_connector", connector => InstanceId, reason => Reason}) end. on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) -> - ?TRACE("QUERY", #{message => Msg, connector => InstanceId}, - "send msg to remote node"), + ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), emqx_resource:query_success(AfterQuery). diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index def3904b4..ae8239936 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -85,14 +85,13 @@ on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) -> on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _PoolName} = State) -> on_query(InstId, {sql, SQL, Params, default_timeout}, AfterQuery, State); on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) -> - ?TRACE("QUERY", #{connector => InstId, sql => SQL, state => State}, - "mysql connector received sql query"), + ?TRACE("QUERY", "mysql_connector_received", #{connector => InstId, sql => SQL, state => State}), case Result = ecpool:pick_and_do( PoolName, {mysql, query, [SQL, Params, Timeout]}, no_handover) of {error, Reason} -> - ?SLOG(error, #{msg => "mysql connector do sql query failed", + ?SLOG(error, #{msg => "mysql_connector_do_sql_query_failed", connector => InstId, sql => SQL, reason => Reason}), emqx_resource:query_failed(AfterQuery); _ -> diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index ac864a45d..9b6f559b4 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -83,12 +83,12 @@ on_stop(InstId, #{poolname := PoolName}) -> on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) -> on_query(InstId, {sql, SQL, []}, AfterQuery, State); on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> - ?TRACE("QUERY", #{connector => InstId, sql => SQL, state => State}, - "postgresql connector received sql query"), + ?TRACE("QUERY", "postgresql_connector_received", + #{connector => InstId, sql => SQL, state => State}), case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of {error, Reason} -> ?SLOG(error, #{ - msg => "postgresql connector do sql query failed", + msg => "postgresql_connector_do_sql_query_failed", connector => InstId, sql => SQL, reason => Reason}), emqx_resource:query_failed(AfterQuery); _ -> diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 61b716b8b..94f4eca3e 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -125,15 +125,15 @@ on_stop(InstId, #{poolname := PoolName}) -> emqx_plugin_libs_pool:stop_pool(PoolName). on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) -> - ?TRACE("QUERY", #{connector => InstId, sql => Command, state => State}, - "redis connector received cmd query"), + ?TRACE("QUERY", "redis_connector_received", + #{connector => InstId, sql => Command, state => State}), Result = case Type of cluster -> eredis_cluster:q(PoolName, Command); _ -> ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover) end, case Result of {error, Reason} -> - ?SLOG(error, #{msg => "redis connector do cmd query failed", + ?SLOG(error, #{msg => "redis_connector_do_cmd_query_failed", connector => InstId, sql => Command, reason => Reason}), emqx_resource:query_failed(AfterCommand); _ -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl index 70aa68cf5..d02f62d70 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl @@ -85,7 +85,7 @@ republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}}, Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected, 0), Retain = replace_simple_var(RetainTks, Selected, false), - ?TRACE("REPUBLISH", #{topic => Topic, payload => Payload}, "republish message"), + ?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}), safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload); %% in case this is a "$events/" event @@ -99,7 +99,7 @@ republish(Selected, #{metadata := #{rule_id := RuleId}}, Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected, 0), Retain = replace_simple_var(RetainTks, Selected, false), - ?TRACE("REPUBLISH", #{topic => Topic, payload => Payload}, "republish"), + ?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}), safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload). %%-------------------------------------------------------------------- diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index c61296d87..60a7cbaad 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -248,7 +248,7 @@ handle_output(OutId, Selected, Envs) -> end. do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) -> - ?TRACE("SEND", #{bridge_id => BridgeId}, "output to bridge"), + ?TRACE("BRIDGE", "output_to_bridge", #{bridge_id => BridgeId}), emqx_bridge:send_message(BridgeId, Selected); do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) -> Mod:Func(Selected, Envs, Args). diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 7a9da25a2..cd4d0ce6b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -77,7 +77,7 @@ flatten([D1 | L]) when is_list(D1) -> D1 ++ flatten(L). echo_action(Data, Envs) -> - ?TRACE("TEST", #{data => Data, envs => Envs}, "testing_rule_sql_ok"), + ?TRACE("RULE", "testing_rule_sql_ok", #{data => Data, envs => Envs}), Data. fill_default_values(Event, Context) ->