Merge pull request #13425 from kjellwinblad/kjell/review_connector_error_logs_mqtt_etc/EMQX-12555/EMQX-12657
fix: make MQTT connector error log messages easier to understand
This commit is contained in:
commit
613fc644f5
|
@ -1154,7 +1154,7 @@ t_bridges_probe(Config) ->
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, 400, #{
|
{ok, 400, #{
|
||||||
<<"code">> := <<"TEST_FAILED">>,
|
<<"code">> := <<"TEST_FAILED">>,
|
||||||
<<"message">> := <<"Connection refused">>
|
<<"message">> := <<"Connection refused", _/binary>>
|
||||||
}},
|
}},
|
||||||
request_json(
|
request_json(
|
||||||
post,
|
post,
|
||||||
|
|
|
@ -98,7 +98,7 @@ on_start(ResourceId, #{server := Server} = Conf) ->
|
||||||
server => Server
|
server => Server
|
||||||
}};
|
}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, Reason}
|
{error, emqx_maybe:define(explain_error(Reason), Reason)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_add_channel(
|
on_add_channel(
|
||||||
|
@ -200,7 +200,7 @@ on_get_channel_status(
|
||||||
} = _State
|
} = _State
|
||||||
) when is_map_key(ChannelId, Channels) ->
|
) when is_map_key(ChannelId, Channels) ->
|
||||||
%% The channel should be ok as long as the MQTT client is ok
|
%% The channel should be ok as long as the MQTT client is ok
|
||||||
connected.
|
?status_connected.
|
||||||
|
|
||||||
on_get_channels(ResId) ->
|
on_get_channels(ResId) ->
|
||||||
emqx_bridge_v2:get_channels_for_connector(ResId).
|
emqx_bridge_v2:get_channels_for_connector(ResId).
|
||||||
|
@ -356,10 +356,15 @@ on_get_status(_ResourceId, State) ->
|
||||||
Workers = [{Pool, Worker} || {Pool, PN} <- Pools, {_Name, Worker} <- ecpool:workers(PN)],
|
Workers = [{Pool, Worker} || {Pool, PN} <- Pools, {_Name, Worker} <- ecpool:workers(PN)],
|
||||||
try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of
|
try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of
|
||||||
Statuses ->
|
Statuses ->
|
||||||
combine_status(Statuses)
|
case combine_status(Statuses) of
|
||||||
|
{Status, Msg} ->
|
||||||
|
{Status, State, Msg};
|
||||||
|
Status ->
|
||||||
|
Status
|
||||||
|
end
|
||||||
catch
|
catch
|
||||||
exit:timeout ->
|
exit:timeout ->
|
||||||
connecting
|
?status_connecting
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_status({_Pool, Worker}) ->
|
get_status({_Pool, Worker}) ->
|
||||||
|
@ -367,7 +372,7 @@ get_status({_Pool, Worker}) ->
|
||||||
{ok, Client} ->
|
{ok, Client} ->
|
||||||
emqx_bridge_mqtt_ingress:status(Client);
|
emqx_bridge_mqtt_ingress:status(Client);
|
||||||
{error, _} ->
|
{error, _} ->
|
||||||
disconnected
|
?status_disconnected
|
||||||
end.
|
end.
|
||||||
|
|
||||||
combine_status(Statuses) ->
|
combine_status(Statuses) ->
|
||||||
|
@ -375,11 +380,25 @@ combine_status(Statuses) ->
|
||||||
%% Natural order of statuses: [connected, connecting, disconnected]
|
%% Natural order of statuses: [connected, connecting, disconnected]
|
||||||
%% * `disconnected` wins over any other status
|
%% * `disconnected` wins over any other status
|
||||||
%% * `connecting` wins over `connected`
|
%% * `connecting` wins over `connected`
|
||||||
case lists:reverse(lists:usort(Statuses)) of
|
ToStatus = fun
|
||||||
|
({S, _Reason}) -> S;
|
||||||
|
(S) when is_atom(S) -> S
|
||||||
|
end,
|
||||||
|
CompareFn = fun(S1A, S2A) ->
|
||||||
|
S1 = ToStatus(S1A),
|
||||||
|
S2 = ToStatus(S2A),
|
||||||
|
S1 > S2
|
||||||
|
end,
|
||||||
|
case lists:usort(CompareFn, Statuses) of
|
||||||
|
[{Status, Reason} | _] ->
|
||||||
|
case explain_error(Reason) of
|
||||||
|
undefined -> Status;
|
||||||
|
Msg -> {Status, Msg}
|
||||||
|
end;
|
||||||
[Status | _] ->
|
[Status | _] ->
|
||||||
Status;
|
Status;
|
||||||
[] ->
|
[] ->
|
||||||
disconnected
|
?status_disconnected
|
||||||
end.
|
end.
|
||||||
|
|
||||||
mk_ingress_config(
|
mk_ingress_config(
|
||||||
|
@ -514,15 +533,54 @@ connect(Pid, Name) ->
|
||||||
{ok, Pid};
|
{ok, Pid};
|
||||||
{error, Reason} = Error ->
|
{error, Reason} = Error ->
|
||||||
IsDryRun = emqx_resource:is_dry_run(Name),
|
IsDryRun = emqx_resource:is_dry_run(Name),
|
||||||
?SLOG(?LOG_LEVEL(IsDryRun), #{
|
log_connect_error_reason(?LOG_LEVEL(IsDryRun), Reason, Name),
|
||||||
msg => "ingress_client_connect_failed",
|
|
||||||
reason => Reason,
|
|
||||||
resource_id => Name
|
|
||||||
}),
|
|
||||||
_ = catch emqtt:stop(Pid),
|
_ = catch emqtt:stop(Pid),
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
log_connect_error_reason(Level, {tcp_closed, _} = Reason, Name) ->
|
||||||
|
?tp(emqx_bridge_mqtt_connector_tcp_closed, #{}),
|
||||||
|
?SLOG(Level, #{
|
||||||
|
msg => "ingress_client_connect_failed",
|
||||||
|
reason => Reason,
|
||||||
|
name => Name,
|
||||||
|
explain => explain_error(Reason)
|
||||||
|
});
|
||||||
|
log_connect_error_reason(Level, econnrefused = Reason, Name) ->
|
||||||
|
?tp(emqx_bridge_mqtt_connector_econnrefused_error, #{}),
|
||||||
|
?SLOG(Level, #{
|
||||||
|
msg => "ingress_client_connect_failed",
|
||||||
|
reason => Reason,
|
||||||
|
name => Name,
|
||||||
|
explain => explain_error(Reason)
|
||||||
|
});
|
||||||
|
log_connect_error_reason(Level, Reason, Name) ->
|
||||||
|
?SLOG(Level, #{
|
||||||
|
msg => "ingress_client_connect_failed",
|
||||||
|
reason => Reason,
|
||||||
|
name => Name
|
||||||
|
}).
|
||||||
|
|
||||||
|
explain_error(econnrefused) ->
|
||||||
|
<<
|
||||||
|
"Connection refused. "
|
||||||
|
"This error indicates that your connection attempt to the MQTT server was rejected. "
|
||||||
|
"In simpler terms, the server you tried to connect to refused your request. "
|
||||||
|
"There can be multiple reasons for this. "
|
||||||
|
"For example, the MQTT server you're trying to connect to might be down or not "
|
||||||
|
"running at all or you might have provided the wrong address "
|
||||||
|
"or port number for the server."
|
||||||
|
>>;
|
||||||
|
explain_error({tcp_closed, _}) ->
|
||||||
|
<<
|
||||||
|
"Your MQTT connection attempt was unsuccessful. "
|
||||||
|
"It might be at its maximum capacity for handling new connections. "
|
||||||
|
"To diagnose the issue further, you can check the server logs for "
|
||||||
|
"any specific messages related to the unavailability or connection limits."
|
||||||
|
>>;
|
||||||
|
explain_error(_Reason) ->
|
||||||
|
undefined.
|
||||||
|
|
||||||
handle_disconnect(_Reason) ->
|
handle_disconnect(_Reason) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||||
|
|
||||||
%% management APIs
|
%% management APIs
|
||||||
-export([
|
-export([
|
||||||
|
@ -234,13 +235,13 @@ status(Pid) ->
|
||||||
try
|
try
|
||||||
case proplists:get_value(socket, info(Pid)) of
|
case proplists:get_value(socket, info(Pid)) of
|
||||||
Socket when Socket /= undefined ->
|
Socket when Socket /= undefined ->
|
||||||
connected;
|
?status_connected;
|
||||||
undefined ->
|
undefined ->
|
||||||
connecting
|
?status_connecting
|
||||||
end
|
end
|
||||||
catch
|
catch
|
||||||
exit:{noproc, _} ->
|
exit:{noproc, _} ->
|
||||||
disconnected
|
?status_disconnected
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
|
@ -1025,31 +1025,39 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) ->
|
||||||
ct:sleep(1000),
|
ct:sleep(1000),
|
||||||
|
|
||||||
%% stop the listener 1883 to make the bridge disconnected
|
%% stop the listener 1883 to make the bridge disconnected
|
||||||
ok = emqx_listeners:stop_listener('tcp:default'),
|
?check_trace(
|
||||||
ct:sleep(1500),
|
begin
|
||||||
?assertMatch(
|
ok = emqx_listeners:stop_listener('tcp:default'),
|
||||||
#{<<"status">> := Status} when
|
ct:sleep(1500),
|
||||||
Status == <<"connecting">> orelse Status == <<"disconnected">>,
|
?assertMatch(
|
||||||
request_bridge(BridgeIDEgress)
|
#{<<"status">> := Status} when
|
||||||
|
Status == <<"connecting">> orelse Status == <<"disconnected">>,
|
||||||
|
request_bridge(BridgeIDEgress)
|
||||||
|
),
|
||||||
|
|
||||||
|
%% start the listener 1883 to make the bridge reconnected
|
||||||
|
ok = emqx_listeners:start_listener('tcp:default'),
|
||||||
|
timer:sleep(1500),
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"status">> := <<"connected">>},
|
||||||
|
request_bridge(BridgeIDEgress)
|
||||||
|
),
|
||||||
|
|
||||||
|
N = stop_publisher(Publisher),
|
||||||
|
|
||||||
|
%% all those messages should eventually be delivered
|
||||||
|
[
|
||||||
|
assert_mqtt_msg_received(RemoteTopic, Payload)
|
||||||
|
|| I <- lists:seq(1, N),
|
||||||
|
Payload <- [integer_to_binary(I)]
|
||||||
|
],
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
fun(Trace) ->
|
||||||
|
?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_econnrefused_error, Trace)),
|
||||||
|
ok
|
||||||
|
end
|
||||||
),
|
),
|
||||||
|
|
||||||
%% start the listener 1883 to make the bridge reconnected
|
|
||||||
ok = emqx_listeners:start_listener('tcp:default'),
|
|
||||||
timer:sleep(1500),
|
|
||||||
?assertMatch(
|
|
||||||
#{<<"status">> := <<"connected">>},
|
|
||||||
request_bridge(BridgeIDEgress)
|
|
||||||
),
|
|
||||||
|
|
||||||
N = stop_publisher(Publisher),
|
|
||||||
|
|
||||||
%% all those messages should eventually be delivered
|
|
||||||
[
|
|
||||||
assert_mqtt_msg_received(RemoteTopic, Payload)
|
|
||||||
|| I <- lists:seq(1, N),
|
|
||||||
Payload <- [integer_to_binary(I)]
|
|
||||||
],
|
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
start_publisher(Topic, Interval, CtrlPid) ->
|
start_publisher(Topic, Interval, CtrlPid) ->
|
||||||
|
|
|
@ -131,6 +131,9 @@ hookpoint(Config) ->
|
||||||
BridgeId = bridge_id(Config),
|
BridgeId = bridge_id(Config),
|
||||||
emqx_bridge_resource:bridge_hookpoint(BridgeId).
|
emqx_bridge_resource:bridge_hookpoint(BridgeId).
|
||||||
|
|
||||||
|
simplify_result(Res) ->
|
||||||
|
emqx_bridge_v2_testlib:simplify_result(Res).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Testcases
|
%% Testcases
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -246,3 +249,46 @@ t_receive_via_rule(Config) ->
|
||||||
end
|
end
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_connect_with_more_clients_than_the_broker_accepts(Config) ->
|
||||||
|
Name = ?config(connector_name, Config),
|
||||||
|
OrgConf = emqx_mgmt_listeners_conf:get_raw(tcp, default),
|
||||||
|
on_exit(fun() ->
|
||||||
|
emqx_mgmt_listeners_conf:update(tcp, default, OrgConf)
|
||||||
|
end),
|
||||||
|
NewConf = OrgConf#{<<"max_connections">> => 3},
|
||||||
|
{ok, _} = emqx_mgmt_listeners_conf:update(tcp, default, NewConf),
|
||||||
|
?check_trace(
|
||||||
|
#{timetrap => 10_000},
|
||||||
|
begin
|
||||||
|
?assertMatch(
|
||||||
|
{201, #{
|
||||||
|
<<"status">> := <<"disconnected">>,
|
||||||
|
<<"status_reason">> :=
|
||||||
|
<<"Your MQTT connection attempt was unsuccessful", _/binary>>
|
||||||
|
}},
|
||||||
|
simplify_result(
|
||||||
|
emqx_bridge_v2_testlib:create_connector_api(
|
||||||
|
Config,
|
||||||
|
#{<<"pool_size">> => 100}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?block_until(#{?snk_kind := emqx_bridge_mqtt_connector_tcp_closed}),
|
||||||
|
?assertMatch(
|
||||||
|
{200, #{
|
||||||
|
<<"status">> := <<"disconnected">>,
|
||||||
|
<<"status_reason">> :=
|
||||||
|
<<"Your MQTT connection attempt was unsuccessful", _/binary>>
|
||||||
|
}},
|
||||||
|
simplify_result(emqx_bridge_v2_testlib:get_connector_api(mqtt, Name))
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
fun(Trace) ->
|
||||||
|
?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_tcp_closed, Trace)),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
The MQTT connector error log messages have been improved to provide clearer and more detailed information.
|
Loading…
Reference in New Issue