diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index 42d598ef9..ddd4349bb 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -59,15 +59,19 @@ %% structured logging -define(SLOG(Level, Data), - %% check 'allow' here, only evaluate Data when necessary - case logger:allow(Level, ?MODULE) of - true -> - logger:log(Level, (Data), #{ mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} - , line => ?LINE - }); - false -> - ok - end). + ?SLOG(Level, Data, #{})). + +%% structured logging, meta is for handler's filter. +-define(SLOG(Level, Data, Meta), +%% check 'allow' here, only evaluate Data when necessary + case logger:allow(Level, ?MODULE) of + true -> + logger:log(Level, (Data), Meta#{ mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} + , line => ?LINE + }); + false -> + ok + end). -define(TRACE(Event, Msg, Meta), emqx_trace:log(Event, Msg, Meta)). diff --git a/apps/emqx/src/emqx_authentication_config.erl b/apps/emqx/src/emqx_authentication_config.erl index 795dd060e..9767a2265 100644 --- a/apps/emqx/src/emqx_authentication_config.erl +++ b/apps/emqx/src/emqx_authentication_config.erl @@ -187,7 +187,7 @@ convert_certs(CertsDir, Config) -> {ok, SSL} -> new_ssl_config(Config, SSL); {error, Reason} -> - ?SLOG(error, Reason#{msg => bad_ssl_config}), + ?SLOG(error, Reason#{msg => "bad_ssl_config"}), throw({bad_ssl_config, Reason}) end. @@ -199,7 +199,7 @@ convert_certs(CertsDir, NewConfig, OldConfig) -> ok = emqx_tls_lib:delete_ssl_files(CertsDir, NewSSL1, OldSSL), new_ssl_config(NewConfig, NewSSL1); {error, Reason} -> - ?SLOG(error, Reason#{msg => bad_ssl_config}), + ?SLOG(error, Reason#{msg => "bad_ssl_config"}), throw({bad_ssl_config, Reason}) end. diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index dec753fc2..4085b6130 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -204,8 +204,9 @@ publish(Msg) when is_record(Msg, message) -> _ = emqx_trace:publish(Msg), emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'), case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of - #message{headers = #{allow_publish := false}} -> - ?TRACE("MQTT", "msg_publish_not_allowed", #{message => emqx_message:to_log_map(Msg)}), + #message{headers = #{allow_publish := false}, topic = Topic} -> + Message = emqx_message:to_log_map(Msg), + ?TRACE("MQTT", "msg_publish_not_allowed", #{message => Message, topic => Topic}), []; Msg1 = #message{topic = Topic} -> emqx_persistent_session:persist_message(Msg1), @@ -225,7 +226,9 @@ safe_publish(Msg) when is_record(Msg, message) -> reason => Reason, payload => emqx_message:to_log_map(Msg), stacktrace => Stk - }), + }, + #{topic => Msg#message.topic} + ), [] end. @@ -279,7 +282,7 @@ forward(Node, To, Delivery, async) -> msg => "async_forward_msg_to_node_failed", node => Node, reason => Reason - }), + }, #{topic => To}), {error, badrpc} end; @@ -290,7 +293,7 @@ forward(Node, To, Delivery, sync) -> msg => "sync_forward_msg_to_node_failed", node => Node, reason => Reason - }), + }, #{topic => To}), {error, badrpc}; Result -> emqx_metrics:inc('messages.forward'), Result diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 0f83b04ff..d0cb07a68 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -552,7 +552,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> msg => "cannot_publish_to_topic", topic => Topic, reason => emqx_reason_codes:name(Rc) - }), + }, #{topic => Topic}), case emqx:get_config([authorization, deny_action], ignore) of ignore -> case QoS of @@ -570,7 +570,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> msg => "cannot_publish_to_topic", topic => Topic, reason => emqx_reason_codes:name(Rc) - }), + }, #{topic => Topic}), case QoS of ?QOS_0 -> ok = emqx_metrics:inc('packets.publish.dropped'), @@ -585,7 +585,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> msg => "cannot_publish_to_topic", topic => Topic, reason => emqx_reason_codes:name(Rc) - }), + }, #{topic => Topic}), handle_out(disconnect, Rc, NChannel) end. @@ -635,7 +635,7 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, msg => "dropped_qos2_packet", reason => emqx_reason_codes:name(RC), packet_id => PacketId - }), + }, #{topic => Msg#message.topic}), ok = emqx_metrics:inc('packets.publish.dropped'), handle_out(pubrec, {PacketId, RC}, Channel) end. @@ -687,7 +687,7 @@ process_subscribe([Topic = {TopicFilter, SubOpts} | More], SubProps, Channel, Ac ?SLOG(warning, #{ msg => "cannot_subscribe_topic_filter", reason => emqx_reason_codes:name(ReasonCode) - }), + }, #{topic => TopicFilter}), process_subscribe(More, SubProps, Channel, [{Topic, ReasonCode} | Acc]) end. @@ -703,7 +703,7 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = ?SLOG(warning, #{ msg => "cannot_subscribe_topic_filter", reason => emqx_reason_codes:text(RC) - }), + }, #{topic => NTopicFilter}), {RC, Channel} end. diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 162cff2e0..c44cfe15e 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -448,20 +448,23 @@ kick_session(Action, ClientId, ChanPid) -> , action => Action , error => Error , reason => Reason - }) + }, + #{clientid => unicode:characters_to_list(ClientId, utf8)}) end. kick_session(ClientId) -> case lookup_channels(ClientId) of [] -> ?SLOG(warning, #{msg => "kicked_an_unknown_session", - clientid => ClientId}), + clientid => ClientId}, + #{clientid => unicode:characters_to_list(ClientId, utf8)}), ok; ChanPids -> case length(ChanPids) > 1 of true -> ?SLOG(warning, #{msg => "more_than_one_channel_found", - chan_pids => ChanPids}); + chan_pids => ChanPids}, + #{clientid => unicode:characters_to_list(ClientId, utf8)}); false -> ok end, lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids) diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index f3e6e1366..35a6c048e 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -262,7 +262,7 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) -> {ok, RawRichConf} -> init_load(SchemaMod, RawRichConf); {error, Reason} -> - ?SLOG(error, #{msg => failed_to_load_hocon_conf, + ?SLOG(error, #{msg => "failed_to_load_hocon_conf", reason => Reason, include_dirs => IncDir }), @@ -396,7 +396,7 @@ save_to_override_conf(RawConf, Opts) -> case file:write_file(FileName, hocon_pp:do(RawConf, #{})) of ok -> ok; {error, Reason} -> - ?SLOG(error, #{msg => failed_to_write_override_file, + ?SLOG(error, #{msg => "failed_to_write_override_file", filename => FileName, reason => Reason}), {error, Reason} diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 37e15f522..e1dab3260 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -872,7 +872,7 @@ check_limiter(Needs, {ok, Limiter2} -> WhenOk(Data, Msgs, State#state{limiter = Limiter2}); {pause, Time, Limiter2} -> - ?SLOG(warning, #{msg => "pause time dueto rate limit", + ?SLOG(warning, #{msg => "pause_time_dueto_rate_limit", needs => Needs, time_in_ms => Time}), @@ -912,7 +912,7 @@ retry_limiter(#state{limiter = Limiter} = State) -> , limiter_timer = undefined }); {pause, Time, Limiter2} -> - ?SLOG(warning, #{msg => "pause time dueto rate limit", + ?SLOG(warning, #{msg => "pause_time_dueto_rate_limit", types => Types, time_in_ms => Time}), diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index 600144adc..cb3da361a 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -122,7 +122,7 @@ handle_cast({detected, #flapping{clientid = ClientId, peer_host => fmt_host(PeerHost), detect_cnt => DetectCnt, wind_time_in_ms => WindTime - }), + }, #{clientid => unicode:characters_to_list(ClientId, utf8)}), Now = erlang:system_time(second), Banned = #banned{who = {clientid, ClientId}, by = <<"flapping detector">>, @@ -138,7 +138,7 @@ handle_cast({detected, #flapping{clientid = ClientId, peer_host => fmt_host(PeerHost), detect_cnt => DetectCnt, interval => Interval - }) + }, #{clientid => unicode:characters_to_list(ClientId, utf8)}) end, {noreply, State}; diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index bf79085af..1695ed6ce 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -535,16 +535,20 @@ enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) -> (Dropped =/= undefined) andalso log_dropped(Dropped, Session), Session#session{mqueue = NewQ}. -log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) -> - case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of +log_dropped(Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) -> + Payload = emqx_message:to_log_map(Msg), + #{store_qos0 := StoreQos0} = QueueInfo = emqx_mqueue:info(Q), + case (QoS == ?QOS_0) andalso (not StoreQos0) of true -> ok = emqx_metrics:inc('delivery.dropped.qos0_msg'), ?SLOG(warning, #{msg => "dropped_qos0_msg", - payload => emqx_message:to_log_map(Msg)}); + queue => QueueInfo, + payload => Payload}, #{topic => Topic}); false -> ok = emqx_metrics:inc('delivery.dropped.queue_full'), ?SLOG(warning, #{msg => "dropped_msg_due_to_mqueue_is_full", - payload => emqx_message:to_log_map(Msg)}) + queue => QueueInfo, + payload => Payload}, #{topic => Topic}) end. enrich_fun(Session = #session{subscriptions = Subs}) -> diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl index aaaedcb12..3d3722c32 100644 --- a/apps/emqx/src/emqx_session_router.erl +++ b/apps/emqx/src/emqx_session_router.erl @@ -260,7 +260,7 @@ code_change(_OldVsn, State, _Extra) -> init_resume_worker(RemotePid, SessionID, #{ pmon := Pmon } = State) -> case emqx_session_router_worker_sup:start_worker(SessionID, RemotePid) of {error, What} -> - ?SLOG(error, #{msg => "Could not start resume worker", reason => What}), + ?SLOG(error, #{msg => "failed_to_start_resume_worker", reason => What}), error; {ok, Pid} -> Pmon1 = emqx_pmon:monitor(Pid, Pmon), diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 67542a2bf..3869d300d 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -98,7 +98,11 @@ log(Event, Msg, Meta0) -> case persistent_term:get(?TRACE_FILTER, undefined) of undefined -> ok; List -> - Meta = maps:merge(logger:get_process_metadata(), Meta0), + Meta = + case logger:get_process_metadata() of + undefined -> Meta0; + ProcMeta -> maps:merge(ProcMeta, Meta0) + end, Log = #{level => trace, event => Event, meta => Meta, msg => Msg}, log_filter(List, Log) end. diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index f70d7ac6c..e2bdf6c72 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -549,7 +549,7 @@ check_limiter(Needs, {ok, Limiter2} -> WhenOk(Data, Msgs, State#state{limiter = Limiter2}); {pause, Time, Limiter2} -> - ?SLOG(warning, #{msg => "pause time dueto rate limit", + ?SLOG(warning, #{msg => "pause_time_due_to_rate_limit", needs => Needs, time_in_ms => Time}), @@ -585,7 +585,7 @@ retry_limiter(#state{limiter = Limiter} = State) -> , limiter_timer = undefined }); {pause, Time, Limiter2} -> - ?SLOG(warning, #{msg => "pause time dueto rate limit", + ?SLOG(warning, #{msg => "pause_time_due_to_rate_limit", types => Types, time_in_ms => Time}), diff --git a/apps/emqx_authn/test/emqx_authn_api_SUITE.erl b/apps/emqx_authn/test/emqx_authn_api_SUITE.erl index 885811fec..820eeb859 100644 --- a/apps/emqx_authn/test/emqx_authn_api_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_api_SUITE.erl @@ -43,7 +43,6 @@ groups() -> []. init_per_testcase(_, Config) -> - {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), emqx_authn_test_lib:delete_authenticators( [?CONF_NS_ATOM], ?GLOBAL), @@ -56,9 +55,8 @@ init_per_testcase(_, Config) -> Config. init_per_suite(Config) -> - _ = application:load(emqx_conf), ok = emqx_common_test_helpers:start_apps( - [emqx_authn, emqx_dashboard], + [emqx_conf, emqx_authn, emqx_dashboard], fun set_special_configs/1), ?AUTHN:delete_chain(?GLOBAL), diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index d4fc3df2d..a6681d3f1 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -214,7 +214,7 @@ update(Type, Name, {OldConf, Conf}) -> case recreate(Type, Name, Conf) of {ok, _} -> maybe_disable_bridge(Type, Name, Conf); {error, not_found} -> - ?SLOG(warning, #{ msg => "updating a non-exist bridge, create a new one" + ?SLOG(warning, #{ msg => "updating_a_non-exist_bridge_need_create_a_new_one" , type => Type, name => Name, config => Conf}), create(Type, Name, Conf); {error, Reason} -> {update_bridge_failed, Reason} @@ -242,7 +242,7 @@ create_dry_run(Type, Conf) -> end. remove(Type, Name, _Conf) -> - ?SLOG(info, #{msg => "remove bridge", type => Type, name => Name}), + ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}), case emqx_resource:remove_local(resource_id(Type, Name)) of ok -> ok; {error, not_found} -> ok; diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 7ebe7645b..514b9156a 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -236,7 +236,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> false -> RetryMs end; {aborted, Reason} -> - ?SLOG(error, #{msg => "read_next_mfa transaction failed", error => Reason}), + ?SLOG(error, #{msg => "read_next_mfa_transaction_failed", error => Reason}), RetryMs end. @@ -248,7 +248,7 @@ read_next_mfa(Node) -> TnxId = max(LatestId - 1, 0), commit(Node, TnxId), ?SLOG(notice, #{ - msg => "New node first catch up and start commit.", + msg => "new_node_first_catch_up_and_start_commit.", node => Node, tnx_id => TnxId}), TnxId; [#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1 @@ -277,7 +277,7 @@ do_catch_up(ToTnxId, Node) -> io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)", [Node, LastAppliedId, ToTnxId])), ?SLOG(error, #{ - msg => "catch up failed!", + msg => "catch_up_failed!", last_applied_id => LastAppliedId, to_tnx_id => ToTnxId }), diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index dec07f35c..b8a8c211d 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -144,7 +144,7 @@ multicall(M, F, Args) -> {retry, TnxId, Res, Nodes} -> %% The init MFA return ok, but other nodes failed. %% We return ok and alert an alarm. - ?SLOG(error, #{msg => "failed to update config in cluster", nodes => Nodes, + ?SLOG(error, #{msg => "failed_to_update_config_in_cluster", nodes => Nodes, tnx_id => TnxId, mfa => {M, F, Args}}), Res; {error, Error} -> %% all MFA return not ok or {ok, term()}. diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 7d7771503..f597a5c1d 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -143,7 +143,7 @@ on_start(InstId, #{base_url := #{scheme := Scheme, retry_interval := RetryInterval, pool_type := PoolType, pool_size := PoolSize} = Config) -> - ?SLOG(info, #{msg => "starting http connector", + ?SLOG(info, #{msg => "starting_http_connector", connector => InstId, config => Config}), {Transport, TransportOpts} = case Scheme of http -> @@ -181,13 +181,13 @@ on_start(InstId, #{base_url := #{scheme := Scheme, end. on_stop(InstId, #{pool_name := PoolName}) -> - ?SLOG(info, #{msg => "stopping http connector", + ?SLOG(info, #{msg => "stopping_http_connector", connector => InstId}), ehttpc_sup:stop_pool(PoolName). on_query(InstId, {send_message, Msg}, AfterQuery, State) -> case maps:get(request, State, undefined) of - undefined -> ?SLOG(error, #{msg => "request not found", connector => InstId}); + undefined -> ?SLOG(error, #{msg => "request_not_found", connector => InstId}); Request -> #{method := Method, path := Path, body := Body, headers := Headers, request_timeout := Timeout} = process_request(Request, Msg), @@ -207,7 +207,7 @@ on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery, _ -> {PoolName, KeyOrNum} end, Method, NRequest, Timeout) of {error, Reason} -> - ?SLOG(error, #{msg => "http connector do reqeust failed", + ?SLOG(error, #{msg => "http_connector_do_reqeust_failed", request => NRequest, reason => Reason, connector => InstId}), emqx_resource:query_failed(AfterQuery); diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 97e963f18..c188837ba 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -55,7 +55,7 @@ on_start(InstId, #{servers := Servers0, pool_size := PoolSize, auto_reconnect := AutoReconn, ssl := SSL} = Config) -> - ?SLOG(info, #{msg => "starting ldap connector", + ?SLOG(info, #{msg => "starting_ldap_connector", connector => InstId, config => Config}), Servers = [begin proplists:get_value(host, S) end || S <- Servers0], SslOpts = case maps:get(enable, SSL) of @@ -81,7 +81,7 @@ on_start(InstId, #{servers := Servers0, {ok, #{poolname => PoolName}}. on_stop(InstId, #{poolname := PoolName}) -> - ?SLOG(info, #{msg => "stopping ldap connector", + ?SLOG(info, #{msg => "stopping_ldap_connector", connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index eacb3ec2d..5321ea459 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -128,7 +128,7 @@ on_start(InstId, Config = #{mongo_type := Type, {ok, #{poolname => PoolName, type => Type}}. on_stop(InstId, #{poolname := PoolName}) -> - ?SLOG(info, #{msg => "stopping mongodb connector", + ?SLOG(info, #{msg => "stopping_mongodb_connector", connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index ae8239936..265a6a01e 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -56,7 +56,7 @@ on_start(InstId, #{server := {Host, Port}, auto_reconnect := AutoReconn, pool_size := PoolSize, ssl := SSL } = Config) -> - ?SLOG(info, #{msg => "starting mysql connector", + ?SLOG(info, #{msg => "starting_mysql_connector", connector => InstId, config => Config}), SslOpts = case maps:get(enable, SSL) of true -> @@ -76,7 +76,7 @@ on_start(InstId, #{server := {Host, Port}, {ok, #{poolname => PoolName}}. on_stop(InstId, #{poolname := PoolName}) -> - ?SLOG(info, #{msg => "stopping mysql connector", + ?SLOG(info, #{msg => "stopping_mysql_connector", connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 9b6f559b4..36bff3386 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -56,7 +56,7 @@ on_start(InstId, #{server := {Host, Port}, auto_reconnect := AutoReconn, pool_size := PoolSize, ssl := SSL } = Config) -> - ?SLOG(info, #{msg => "starting postgresql connector", + ?SLOG(info, #{msg => "starting_postgresql_connector", connector => InstId, config => Config}), SslOpts = case maps:get(enable, SSL) of true -> diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 94f4eca3e..48001ca25 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -87,7 +87,7 @@ on_start(InstId, #{redis_type := Type, pool_size := PoolSize, auto_reconnect := AutoReconn, ssl := SSL } = Config) -> - ?SLOG(info, #{msg => "starting redis connector", + ?SLOG(info, #{msg => "starting_redis_connector", connector => InstId, config => Config}), Servers = case Type of single -> [{servers, [maps:get(server, Config)]}]; @@ -120,7 +120,7 @@ on_start(InstId, #{redis_type := Type, {ok, #{poolname => PoolName, type => Type}}. on_stop(InstId, #{poolname := PoolName}) -> - ?SLOG(info, #{msg => "stopping redis connector", + ?SLOG(info, #{msg => "stopping_redis_connector", connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 7d5bb1283..3ab410391 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -158,15 +158,15 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, Parent) RC =:= ?RC_NO_MATCHING_SUBSCRIBERS -> Parent ! {batch_ack, PktId}, ok; handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) -> - ?SLOG(warning, #{msg => "publish to remote node falied", + ?SLOG(warning, #{msg => "publish_to_remote_node_falied", packet_id => PktId, reason_code => RC}). handle_publish(Msg, undefined) -> - ?SLOG(error, #{msg => "cannot publish to local broker as" - " 'ingress' is not configured", + ?SLOG(error, #{msg => "cannot_publish_to_local_broker_as" + "_'ingress'_is_not_configured", message => Msg}); handle_publish(Msg, Vars) -> - ?SLOG(debug, #{msg => "publish to local broker", + ?SLOG(debug, #{msg => "publish_to_local_broker", message => Msg, vars => Vars}), emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1), case Vars of diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 5f6f4b69f..e0d5a2d77 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -188,7 +188,7 @@ callback_mode() -> [state_functions]. %% @doc Config should be a map(). init(#{name := Name} = ConnectOpts) -> - ?SLOG(debug, #{msg => "starting bridge worker", + ?SLOG(debug, #{msg => "starting_bridge_worker", name => Name}), erlang:process_flag(trap_exit, true), Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})), @@ -335,7 +335,7 @@ common(_StateName, cast, {send_to_remote, Msg}, #{replayq := Q} = State) -> NewQ = replayq:append(Q, [Msg]), {keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}}; common(StateName, Type, Content, #{name := Name} = State) -> - ?SLOG(notice, #{msg => "Bridge discarded event", + ?SLOG(notice, #{msg => "bridge_discarded_event", name => Name, type => Type, state_name => StateName, content => Content}), {keep_state, State}. @@ -349,7 +349,7 @@ do_connect(#{connect_opts := ConnectOpts, {ok, State#{connection => Conn}}; {error, Reason} -> ConnectOpts1 = obfuscate(ConnectOpts), - ?SLOG(error, #{msg => "Failed to connect", + ?SLOG(error, #{msg => "failed_to_connect", config => ConnectOpts1, reason => Reason}), {error, Reason, State} end. @@ -386,8 +386,8 @@ pop_and_send_loop(#{replayq := Q} = State, N) -> end. do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) -> - ?SLOG(error, #{msg => "cannot forward messages to remote broker" - " as 'egress' is not configured", + ?SLOG(error, #{msg => "cannot_forward_messages_to_remote_broker" + "_as_'egress'_is_not_configured", messages => Msg}); do_send(#{inflight := Inflight, connection := Connection, @@ -398,7 +398,7 @@ do_send(#{inflight := Inflight, emqx_metrics:inc('bridge.mqtt.message_sent_to_remote'), emqx_connector_mqtt_msg:to_remote_msg(Message, Vars) end, - ?SLOG(debug, #{msg => "publish to remote broker", + ?SLOG(debug, #{msg => "publish_to_remote_broker", message => Msg, vars => Vars}), case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(Msg)]) of {ok, Refs} ->