chore(log): add SLOG/3 to add meta info
This commit is contained in:
parent
8b5b3a448a
commit
121d906992
|
@ -59,15 +59,19 @@
|
|||
|
||||
%% structured logging
|
||||
-define(SLOG(Level, Data),
|
||||
%% check 'allow' here, only evaluate Data when necessary
|
||||
case logger:allow(Level, ?MODULE) of
|
||||
true ->
|
||||
logger:log(Level, (Data), #{ mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}
|
||||
, line => ?LINE
|
||||
});
|
||||
false ->
|
||||
ok
|
||||
end).
|
||||
?SLOG(Level, Data, #{})).
|
||||
|
||||
%% structured logging, meta is for handler's filter.
|
||||
-define(SLOG(Level, Data, Meta),
|
||||
%% check 'allow' here, only evaluate Data when necessary
|
||||
case logger:allow(Level, ?MODULE) of
|
||||
true ->
|
||||
logger:log(Level, (Data), Meta#{ mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}
|
||||
, line => ?LINE
|
||||
});
|
||||
false ->
|
||||
ok
|
||||
end).
|
||||
|
||||
-define(TRACE(Event, Msg, Meta), emqx_trace:log(Event, Msg, Meta)).
|
||||
|
||||
|
|
|
@ -187,7 +187,7 @@ convert_certs(CertsDir, Config) ->
|
|||
{ok, SSL} ->
|
||||
new_ssl_config(Config, SSL);
|
||||
{error, Reason} ->
|
||||
?SLOG(error, Reason#{msg => bad_ssl_config}),
|
||||
?SLOG(error, Reason#{msg => "bad_ssl_config"}),
|
||||
throw({bad_ssl_config, Reason})
|
||||
end.
|
||||
|
||||
|
@ -199,7 +199,7 @@ convert_certs(CertsDir, NewConfig, OldConfig) ->
|
|||
ok = emqx_tls_lib:delete_ssl_files(CertsDir, NewSSL1, OldSSL),
|
||||
new_ssl_config(NewConfig, NewSSL1);
|
||||
{error, Reason} ->
|
||||
?SLOG(error, Reason#{msg => bad_ssl_config}),
|
||||
?SLOG(error, Reason#{msg => "bad_ssl_config"}),
|
||||
throw({bad_ssl_config, Reason})
|
||||
end.
|
||||
|
||||
|
|
|
@ -204,8 +204,9 @@ publish(Msg) when is_record(Msg, message) ->
|
|||
_ = emqx_trace:publish(Msg),
|
||||
emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
|
||||
case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
|
||||
#message{headers = #{allow_publish := false}} ->
|
||||
?TRACE("MQTT", "msg_publish_not_allowed", #{message => emqx_message:to_log_map(Msg)}),
|
||||
#message{headers = #{allow_publish := false}, topic = Topic} ->
|
||||
Message = emqx_message:to_log_map(Msg),
|
||||
?TRACE("MQTT", "msg_publish_not_allowed", #{message => Message, topic => Topic}),
|
||||
[];
|
||||
Msg1 = #message{topic = Topic} ->
|
||||
emqx_persistent_session:persist_message(Msg1),
|
||||
|
@ -225,7 +226,9 @@ safe_publish(Msg) when is_record(Msg, message) ->
|
|||
reason => Reason,
|
||||
payload => emqx_message:to_log_map(Msg),
|
||||
stacktrace => Stk
|
||||
}),
|
||||
},
|
||||
#{topic => Msg#message.topic}
|
||||
),
|
||||
[]
|
||||
end.
|
||||
|
||||
|
@ -279,7 +282,7 @@ forward(Node, To, Delivery, async) ->
|
|||
msg => "async_forward_msg_to_node_failed",
|
||||
node => Node,
|
||||
reason => Reason
|
||||
}),
|
||||
}, #{topic => To}),
|
||||
{error, badrpc}
|
||||
end;
|
||||
|
||||
|
@ -290,7 +293,7 @@ forward(Node, To, Delivery, sync) ->
|
|||
msg => "sync_forward_msg_to_node_failed",
|
||||
node => Node,
|
||||
reason => Reason
|
||||
}),
|
||||
}, #{topic => To}),
|
||||
{error, badrpc};
|
||||
Result ->
|
||||
emqx_metrics:inc('messages.forward'), Result
|
||||
|
|
|
@ -552,7 +552,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
|
|||
msg => "cannot_publish_to_topic",
|
||||
topic => Topic,
|
||||
reason => emqx_reason_codes:name(Rc)
|
||||
}),
|
||||
}, #{topic => Topic}),
|
||||
case emqx:get_config([authorization, deny_action], ignore) of
|
||||
ignore ->
|
||||
case QoS of
|
||||
|
@ -570,7 +570,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
|
|||
msg => "cannot_publish_to_topic",
|
||||
topic => Topic,
|
||||
reason => emqx_reason_codes:name(Rc)
|
||||
}),
|
||||
}, #{topic => Topic}),
|
||||
case QoS of
|
||||
?QOS_0 ->
|
||||
ok = emqx_metrics:inc('packets.publish.dropped'),
|
||||
|
@ -585,7 +585,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
|
|||
msg => "cannot_publish_to_topic",
|
||||
topic => Topic,
|
||||
reason => emqx_reason_codes:name(Rc)
|
||||
}),
|
||||
}, #{topic => Topic}),
|
||||
handle_out(disconnect, Rc, NChannel)
|
||||
end.
|
||||
|
||||
|
@ -635,7 +635,7 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
|||
msg => "dropped_qos2_packet",
|
||||
reason => emqx_reason_codes:name(RC),
|
||||
packet_id => PacketId
|
||||
}),
|
||||
}, #{topic => Msg#message.topic}),
|
||||
ok = emqx_metrics:inc('packets.publish.dropped'),
|
||||
handle_out(pubrec, {PacketId, RC}, Channel)
|
||||
end.
|
||||
|
@ -687,7 +687,7 @@ process_subscribe([Topic = {TopicFilter, SubOpts} | More], SubProps, Channel, Ac
|
|||
?SLOG(warning, #{
|
||||
msg => "cannot_subscribe_topic_filter",
|
||||
reason => emqx_reason_codes:name(ReasonCode)
|
||||
}),
|
||||
}, #{topic => TopicFilter}),
|
||||
process_subscribe(More, SubProps, Channel, [{Topic, ReasonCode} | Acc])
|
||||
end.
|
||||
|
||||
|
@ -703,7 +703,7 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
|
|||
?SLOG(warning, #{
|
||||
msg => "cannot_subscribe_topic_filter",
|
||||
reason => emqx_reason_codes:text(RC)
|
||||
}),
|
||||
}, #{topic => NTopicFilter}),
|
||||
{RC, Channel}
|
||||
end.
|
||||
|
||||
|
|
|
@ -448,20 +448,23 @@ kick_session(Action, ClientId, ChanPid) ->
|
|||
, action => Action
|
||||
, error => Error
|
||||
, reason => Reason
|
||||
})
|
||||
},
|
||||
#{clientid => unicode:characters_to_list(ClientId, utf8)})
|
||||
end.
|
||||
|
||||
kick_session(ClientId) ->
|
||||
case lookup_channels(ClientId) of
|
||||
[] ->
|
||||
?SLOG(warning, #{msg => "kicked_an_unknown_session",
|
||||
clientid => ClientId}),
|
||||
clientid => ClientId},
|
||||
#{clientid => unicode:characters_to_list(ClientId, utf8)}),
|
||||
ok;
|
||||
ChanPids ->
|
||||
case length(ChanPids) > 1 of
|
||||
true ->
|
||||
?SLOG(warning, #{msg => "more_than_one_channel_found",
|
||||
chan_pids => ChanPids});
|
||||
chan_pids => ChanPids},
|
||||
#{clientid => unicode:characters_to_list(ClientId, utf8)});
|
||||
false -> ok
|
||||
end,
|
||||
lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids)
|
||||
|
|
|
@ -262,7 +262,7 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) ->
|
|||
{ok, RawRichConf} ->
|
||||
init_load(SchemaMod, RawRichConf);
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => failed_to_load_hocon_conf,
|
||||
?SLOG(error, #{msg => "failed_to_load_hocon_conf",
|
||||
reason => Reason,
|
||||
include_dirs => IncDir
|
||||
}),
|
||||
|
@ -396,7 +396,7 @@ save_to_override_conf(RawConf, Opts) ->
|
|||
case file:write_file(FileName, hocon_pp:do(RawConf, #{})) of
|
||||
ok -> ok;
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => failed_to_write_override_file,
|
||||
?SLOG(error, #{msg => "failed_to_write_override_file",
|
||||
filename => FileName,
|
||||
reason => Reason}),
|
||||
{error, Reason}
|
||||
|
|
|
@ -872,7 +872,7 @@ check_limiter(Needs,
|
|||
{ok, Limiter2} ->
|
||||
WhenOk(Data, Msgs, State#state{limiter = Limiter2});
|
||||
{pause, Time, Limiter2} ->
|
||||
?SLOG(warning, #{msg => "pause time dueto rate limit",
|
||||
?SLOG(warning, #{msg => "pause_time_dueto_rate_limit",
|
||||
needs => Needs,
|
||||
time_in_ms => Time}),
|
||||
|
||||
|
@ -912,7 +912,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
|
|||
, limiter_timer = undefined
|
||||
});
|
||||
{pause, Time, Limiter2} ->
|
||||
?SLOG(warning, #{msg => "pause time dueto rate limit",
|
||||
?SLOG(warning, #{msg => "pause_time_dueto_rate_limit",
|
||||
types => Types,
|
||||
time_in_ms => Time}),
|
||||
|
||||
|
|
|
@ -122,7 +122,7 @@ handle_cast({detected, #flapping{clientid = ClientId,
|
|||
peer_host => fmt_host(PeerHost),
|
||||
detect_cnt => DetectCnt,
|
||||
wind_time_in_ms => WindTime
|
||||
}),
|
||||
}, #{clientid => unicode:characters_to_list(ClientId, utf8)}),
|
||||
Now = erlang:system_time(second),
|
||||
Banned = #banned{who = {clientid, ClientId},
|
||||
by = <<"flapping detector">>,
|
||||
|
@ -138,7 +138,7 @@ handle_cast({detected, #flapping{clientid = ClientId,
|
|||
peer_host => fmt_host(PeerHost),
|
||||
detect_cnt => DetectCnt,
|
||||
interval => Interval
|
||||
})
|
||||
}, #{clientid => unicode:characters_to_list(ClientId, utf8)})
|
||||
end,
|
||||
{noreply, State};
|
||||
|
||||
|
|
|
@ -535,16 +535,20 @@ enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) ->
|
|||
(Dropped =/= undefined) andalso log_dropped(Dropped, Session),
|
||||
Session#session{mqueue = NewQ}.
|
||||
|
||||
log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) ->
|
||||
case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of
|
||||
log_dropped(Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) ->
|
||||
Payload = emqx_message:to_log_map(Msg),
|
||||
#{store_qos0 := StoreQos0} = QueueInfo = emqx_mqueue:info(Q),
|
||||
case (QoS == ?QOS_0) andalso (not StoreQos0) of
|
||||
true ->
|
||||
ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
|
||||
?SLOG(warning, #{msg => "dropped_qos0_msg",
|
||||
payload => emqx_message:to_log_map(Msg)});
|
||||
queue => QueueInfo,
|
||||
payload => Payload}, #{topic => Topic});
|
||||
false ->
|
||||
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
|
||||
?SLOG(warning, #{msg => "dropped_msg_due_to_mqueue_is_full",
|
||||
payload => emqx_message:to_log_map(Msg)})
|
||||
queue => QueueInfo,
|
||||
payload => Payload}, #{topic => Topic})
|
||||
end.
|
||||
|
||||
enrich_fun(Session = #session{subscriptions = Subs}) ->
|
||||
|
|
|
@ -260,7 +260,7 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
init_resume_worker(RemotePid, SessionID, #{ pmon := Pmon } = State) ->
|
||||
case emqx_session_router_worker_sup:start_worker(SessionID, RemotePid) of
|
||||
{error, What} ->
|
||||
?SLOG(error, #{msg => "Could not start resume worker", reason => What}),
|
||||
?SLOG(error, #{msg => "failed_to_start_resume_worker", reason => What}),
|
||||
error;
|
||||
{ok, Pid} ->
|
||||
Pmon1 = emqx_pmon:monitor(Pid, Pmon),
|
||||
|
|
|
@ -98,7 +98,11 @@ log(Event, Msg, Meta0) ->
|
|||
case persistent_term:get(?TRACE_FILTER, undefined) of
|
||||
undefined -> ok;
|
||||
List ->
|
||||
Meta = maps:merge(logger:get_process_metadata(), Meta0),
|
||||
Meta =
|
||||
case logger:get_process_metadata() of
|
||||
undefined -> Meta0;
|
||||
ProcMeta -> maps:merge(ProcMeta, Meta0)
|
||||
end,
|
||||
Log = #{level => trace, event => Event, meta => Meta, msg => Msg},
|
||||
log_filter(List, Log)
|
||||
end.
|
||||
|
|
|
@ -549,7 +549,7 @@ check_limiter(Needs,
|
|||
{ok, Limiter2} ->
|
||||
WhenOk(Data, Msgs, State#state{limiter = Limiter2});
|
||||
{pause, Time, Limiter2} ->
|
||||
?SLOG(warning, #{msg => "pause time dueto rate limit",
|
||||
?SLOG(warning, #{msg => "pause_time_due_to_rate_limit",
|
||||
needs => Needs,
|
||||
time_in_ms => Time}),
|
||||
|
||||
|
@ -585,7 +585,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
|
|||
, limiter_timer = undefined
|
||||
});
|
||||
{pause, Time, Limiter2} ->
|
||||
?SLOG(warning, #{msg => "pause time dueto rate limit",
|
||||
?SLOG(warning, #{msg => "pause_time_due_to_rate_limit",
|
||||
types => Types,
|
||||
time_in_ms => Time}),
|
||||
|
||||
|
|
|
@ -43,7 +43,6 @@ groups() ->
|
|||
[].
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
||||
emqx_authn_test_lib:delete_authenticators(
|
||||
[?CONF_NS_ATOM],
|
||||
?GLOBAL),
|
||||
|
@ -56,9 +55,8 @@ init_per_testcase(_, Config) ->
|
|||
Config.
|
||||
|
||||
init_per_suite(Config) ->
|
||||
_ = application:load(emqx_conf),
|
||||
ok = emqx_common_test_helpers:start_apps(
|
||||
[emqx_authn, emqx_dashboard],
|
||||
[emqx_conf, emqx_authn, emqx_dashboard],
|
||||
fun set_special_configs/1),
|
||||
|
||||
?AUTHN:delete_chain(?GLOBAL),
|
||||
|
|
|
@ -214,7 +214,7 @@ update(Type, Name, {OldConf, Conf}) ->
|
|||
case recreate(Type, Name, Conf) of
|
||||
{ok, _} -> maybe_disable_bridge(Type, Name, Conf);
|
||||
{error, not_found} ->
|
||||
?SLOG(warning, #{ msg => "updating a non-exist bridge, create a new one"
|
||||
?SLOG(warning, #{ msg => "updating_a_non-exist_bridge_need_create_a_new_one"
|
||||
, type => Type, name => Name, config => Conf}),
|
||||
create(Type, Name, Conf);
|
||||
{error, Reason} -> {update_bridge_failed, Reason}
|
||||
|
@ -242,7 +242,7 @@ create_dry_run(Type, Conf) ->
|
|||
end.
|
||||
|
||||
remove(Type, Name, _Conf) ->
|
||||
?SLOG(info, #{msg => "remove bridge", type => Type, name => Name}),
|
||||
?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}),
|
||||
case emqx_resource:remove_local(resource_id(Type, Name)) of
|
||||
ok -> ok;
|
||||
{error, not_found} -> ok;
|
||||
|
|
|
@ -236,7 +236,7 @@ catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) ->
|
|||
false -> RetryMs
|
||||
end;
|
||||
{aborted, Reason} ->
|
||||
?SLOG(error, #{msg => "read_next_mfa transaction failed", error => Reason}),
|
||||
?SLOG(error, #{msg => "read_next_mfa_transaction_failed", error => Reason}),
|
||||
RetryMs
|
||||
end.
|
||||
|
||||
|
@ -248,7 +248,7 @@ read_next_mfa(Node) ->
|
|||
TnxId = max(LatestId - 1, 0),
|
||||
commit(Node, TnxId),
|
||||
?SLOG(notice, #{
|
||||
msg => "New node first catch up and start commit.",
|
||||
msg => "new_node_first_catch_up_and_start_commit.",
|
||||
node => Node, tnx_id => TnxId}),
|
||||
TnxId;
|
||||
[#cluster_rpc_commit{tnx_id = LastAppliedID}] -> LastAppliedID + 1
|
||||
|
@ -277,7 +277,7 @@ do_catch_up(ToTnxId, Node) ->
|
|||
io_lib:format("~p catch up failed by LastAppliedId(~p) > ToTnxId(~p)",
|
||||
[Node, LastAppliedId, ToTnxId])),
|
||||
?SLOG(error, #{
|
||||
msg => "catch up failed!",
|
||||
msg => "catch_up_failed!",
|
||||
last_applied_id => LastAppliedId,
|
||||
to_tnx_id => ToTnxId
|
||||
}),
|
||||
|
|
|
@ -144,7 +144,7 @@ multicall(M, F, Args) ->
|
|||
{retry, TnxId, Res, Nodes} ->
|
||||
%% The init MFA return ok, but other nodes failed.
|
||||
%% We return ok and alert an alarm.
|
||||
?SLOG(error, #{msg => "failed to update config in cluster", nodes => Nodes,
|
||||
?SLOG(error, #{msg => "failed_to_update_config_in_cluster", nodes => Nodes,
|
||||
tnx_id => TnxId, mfa => {M, F, Args}}),
|
||||
Res;
|
||||
{error, Error} -> %% all MFA return not ok or {ok, term()}.
|
||||
|
|
|
@ -143,7 +143,7 @@ on_start(InstId, #{base_url := #{scheme := Scheme,
|
|||
retry_interval := RetryInterval,
|
||||
pool_type := PoolType,
|
||||
pool_size := PoolSize} = Config) ->
|
||||
?SLOG(info, #{msg => "starting http connector",
|
||||
?SLOG(info, #{msg => "starting_http_connector",
|
||||
connector => InstId, config => Config}),
|
||||
{Transport, TransportOpts} = case Scheme of
|
||||
http ->
|
||||
|
@ -181,13 +181,13 @@ on_start(InstId, #{base_url := #{scheme := Scheme,
|
|||
end.
|
||||
|
||||
on_stop(InstId, #{pool_name := PoolName}) ->
|
||||
?SLOG(info, #{msg => "stopping http connector",
|
||||
?SLOG(info, #{msg => "stopping_http_connector",
|
||||
connector => InstId}),
|
||||
ehttpc_sup:stop_pool(PoolName).
|
||||
|
||||
on_query(InstId, {send_message, Msg}, AfterQuery, State) ->
|
||||
case maps:get(request, State, undefined) of
|
||||
undefined -> ?SLOG(error, #{msg => "request not found", connector => InstId});
|
||||
undefined -> ?SLOG(error, #{msg => "request_not_found", connector => InstId});
|
||||
Request ->
|
||||
#{method := Method, path := Path, body := Body, headers := Headers,
|
||||
request_timeout := Timeout} = process_request(Request, Msg),
|
||||
|
@ -207,7 +207,7 @@ on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery,
|
|||
_ -> {PoolName, KeyOrNum}
|
||||
end, Method, NRequest, Timeout) of
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "http connector do reqeust failed",
|
||||
?SLOG(error, #{msg => "http_connector_do_reqeust_failed",
|
||||
request => NRequest, reason => Reason,
|
||||
connector => InstId}),
|
||||
emqx_resource:query_failed(AfterQuery);
|
||||
|
|
|
@ -55,7 +55,7 @@ on_start(InstId, #{servers := Servers0,
|
|||
pool_size := PoolSize,
|
||||
auto_reconnect := AutoReconn,
|
||||
ssl := SSL} = Config) ->
|
||||
?SLOG(info, #{msg => "starting ldap connector",
|
||||
?SLOG(info, #{msg => "starting_ldap_connector",
|
||||
connector => InstId, config => Config}),
|
||||
Servers = [begin proplists:get_value(host, S) end || S <- Servers0],
|
||||
SslOpts = case maps:get(enable, SSL) of
|
||||
|
@ -81,7 +81,7 @@ on_start(InstId, #{servers := Servers0,
|
|||
{ok, #{poolname => PoolName}}.
|
||||
|
||||
on_stop(InstId, #{poolname := PoolName}) ->
|
||||
?SLOG(info, #{msg => "stopping ldap connector",
|
||||
?SLOG(info, #{msg => "stopping_ldap_connector",
|
||||
connector => InstId}),
|
||||
emqx_plugin_libs_pool:stop_pool(PoolName).
|
||||
|
||||
|
|
|
@ -128,7 +128,7 @@ on_start(InstId, Config = #{mongo_type := Type,
|
|||
{ok, #{poolname => PoolName, type => Type}}.
|
||||
|
||||
on_stop(InstId, #{poolname := PoolName}) ->
|
||||
?SLOG(info, #{msg => "stopping mongodb connector",
|
||||
?SLOG(info, #{msg => "stopping_mongodb_connector",
|
||||
connector => InstId}),
|
||||
emqx_plugin_libs_pool:stop_pool(PoolName).
|
||||
|
||||
|
|
|
@ -56,7 +56,7 @@ on_start(InstId, #{server := {Host, Port},
|
|||
auto_reconnect := AutoReconn,
|
||||
pool_size := PoolSize,
|
||||
ssl := SSL } = Config) ->
|
||||
?SLOG(info, #{msg => "starting mysql connector",
|
||||
?SLOG(info, #{msg => "starting_mysql_connector",
|
||||
connector => InstId, config => Config}),
|
||||
SslOpts = case maps:get(enable, SSL) of
|
||||
true ->
|
||||
|
@ -76,7 +76,7 @@ on_start(InstId, #{server := {Host, Port},
|
|||
{ok, #{poolname => PoolName}}.
|
||||
|
||||
on_stop(InstId, #{poolname := PoolName}) ->
|
||||
?SLOG(info, #{msg => "stopping mysql connector",
|
||||
?SLOG(info, #{msg => "stopping_mysql_connector",
|
||||
connector => InstId}),
|
||||
emqx_plugin_libs_pool:stop_pool(PoolName).
|
||||
|
||||
|
|
|
@ -56,7 +56,7 @@ on_start(InstId, #{server := {Host, Port},
|
|||
auto_reconnect := AutoReconn,
|
||||
pool_size := PoolSize,
|
||||
ssl := SSL } = Config) ->
|
||||
?SLOG(info, #{msg => "starting postgresql connector",
|
||||
?SLOG(info, #{msg => "starting_postgresql_connector",
|
||||
connector => InstId, config => Config}),
|
||||
SslOpts = case maps:get(enable, SSL) of
|
||||
true ->
|
||||
|
|
|
@ -87,7 +87,7 @@ on_start(InstId, #{redis_type := Type,
|
|||
pool_size := PoolSize,
|
||||
auto_reconnect := AutoReconn,
|
||||
ssl := SSL } = Config) ->
|
||||
?SLOG(info, #{msg => "starting redis connector",
|
||||
?SLOG(info, #{msg => "starting_redis_connector",
|
||||
connector => InstId, config => Config}),
|
||||
Servers = case Type of
|
||||
single -> [{servers, [maps:get(server, Config)]}];
|
||||
|
@ -120,7 +120,7 @@ on_start(InstId, #{redis_type := Type,
|
|||
{ok, #{poolname => PoolName, type => Type}}.
|
||||
|
||||
on_stop(InstId, #{poolname := PoolName}) ->
|
||||
?SLOG(info, #{msg => "stopping redis connector",
|
||||
?SLOG(info, #{msg => "stopping_redis_connector",
|
||||
connector => InstId}),
|
||||
emqx_plugin_libs_pool:stop_pool(PoolName).
|
||||
|
||||
|
|
|
@ -158,15 +158,15 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, Parent)
|
|||
RC =:= ?RC_NO_MATCHING_SUBSCRIBERS ->
|
||||
Parent ! {batch_ack, PktId}, ok;
|
||||
handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) ->
|
||||
?SLOG(warning, #{msg => "publish to remote node falied",
|
||||
?SLOG(warning, #{msg => "publish_to_remote_node_falied",
|
||||
packet_id => PktId, reason_code => RC}).
|
||||
|
||||
handle_publish(Msg, undefined) ->
|
||||
?SLOG(error, #{msg => "cannot publish to local broker as"
|
||||
" 'ingress' is not configured",
|
||||
?SLOG(error, #{msg => "cannot_publish_to_local_broker_as"
|
||||
"_'ingress'_is_not_configured",
|
||||
message => Msg});
|
||||
handle_publish(Msg, Vars) ->
|
||||
?SLOG(debug, #{msg => "publish to local broker",
|
||||
?SLOG(debug, #{msg => "publish_to_local_broker",
|
||||
message => Msg, vars => Vars}),
|
||||
emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1),
|
||||
case Vars of
|
||||
|
|
|
@ -188,7 +188,7 @@ callback_mode() -> [state_functions].
|
|||
|
||||
%% @doc Config should be a map().
|
||||
init(#{name := Name} = ConnectOpts) ->
|
||||
?SLOG(debug, #{msg => "starting bridge worker",
|
||||
?SLOG(debug, #{msg => "starting_bridge_worker",
|
||||
name => Name}),
|
||||
erlang:process_flag(trap_exit, true),
|
||||
Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})),
|
||||
|
@ -335,7 +335,7 @@ common(_StateName, cast, {send_to_remote, Msg}, #{replayq := Q} = State) ->
|
|||
NewQ = replayq:append(Q, [Msg]),
|
||||
{keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}};
|
||||
common(StateName, Type, Content, #{name := Name} = State) ->
|
||||
?SLOG(notice, #{msg => "Bridge discarded event",
|
||||
?SLOG(notice, #{msg => "bridge_discarded_event",
|
||||
name => Name, type => Type, state_name => StateName,
|
||||
content => Content}),
|
||||
{keep_state, State}.
|
||||
|
@ -349,7 +349,7 @@ do_connect(#{connect_opts := ConnectOpts,
|
|||
{ok, State#{connection => Conn}};
|
||||
{error, Reason} ->
|
||||
ConnectOpts1 = obfuscate(ConnectOpts),
|
||||
?SLOG(error, #{msg => "Failed to connect",
|
||||
?SLOG(error, #{msg => "failed_to_connect",
|
||||
config => ConnectOpts1, reason => Reason}),
|
||||
{error, Reason, State}
|
||||
end.
|
||||
|
@ -386,8 +386,8 @@ pop_and_send_loop(#{replayq := Q} = State, N) ->
|
|||
end.
|
||||
|
||||
do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) ->
|
||||
?SLOG(error, #{msg => "cannot forward messages to remote broker"
|
||||
" as 'egress' is not configured",
|
||||
?SLOG(error, #{msg => "cannot_forward_messages_to_remote_broker"
|
||||
"_as_'egress'_is_not_configured",
|
||||
messages => Msg});
|
||||
do_send(#{inflight := Inflight,
|
||||
connection := Connection,
|
||||
|
@ -398,7 +398,7 @@ do_send(#{inflight := Inflight,
|
|||
emqx_metrics:inc('bridge.mqtt.message_sent_to_remote'),
|
||||
emqx_connector_mqtt_msg:to_remote_msg(Message, Vars)
|
||||
end,
|
||||
?SLOG(debug, #{msg => "publish to remote broker",
|
||||
?SLOG(debug, #{msg => "publish_to_remote_broker",
|
||||
message => Msg, vars => Vars}),
|
||||
case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(Msg)]) of
|
||||
{ok, Refs} ->
|
||||
|
|
Loading…
Reference in New Issue