diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 6b160f3b3..fdbbc5376 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -1154,7 +1154,7 @@ t_bridges_probe(Config) -> ?assertMatch( {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, - <<"message">> := <<"Connection refused">> + <<"message">> := <<"Connection refused", _/binary>> }}, request_json( post, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 118542356..6b9a40123 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -98,7 +98,7 @@ on_start(ResourceId, #{server := Server} = Conf) -> server => Server }}; {error, Reason} -> - {error, Reason} + {error, emqx_maybe:define(explain_error(Reason), Reason)} end. on_add_channel( @@ -200,7 +200,7 @@ on_get_channel_status( } = _State ) when is_map_key(ChannelId, Channels) -> %% The channel should be ok as long as the MQTT client is ok - connected. + ?status_connected. on_get_channels(ResId) -> emqx_bridge_v2:get_channels_for_connector(ResId). @@ -356,10 +356,15 @@ on_get_status(_ResourceId, State) -> Workers = [{Pool, Worker} || {Pool, PN} <- Pools, {_Name, Worker} <- ecpool:workers(PN)], try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of Statuses -> - combine_status(Statuses) + case combine_status(Statuses) of + {Status, Msg} -> + {Status, State, Msg}; + Status -> + Status + end catch exit:timeout -> - connecting + ?status_connecting end. get_status({_Pool, Worker}) -> @@ -367,7 +372,7 @@ get_status({_Pool, Worker}) -> {ok, Client} -> emqx_bridge_mqtt_ingress:status(Client); {error, _} -> - disconnected + ?status_disconnected end. combine_status(Statuses) -> @@ -375,11 +380,25 @@ combine_status(Statuses) -> %% Natural order of statuses: [connected, connecting, disconnected] %% * `disconnected` wins over any other status %% * `connecting` wins over `connected` - case lists:reverse(lists:usort(Statuses)) of + ToStatus = fun + ({S, _Reason}) -> S; + (S) when is_atom(S) -> S + end, + CompareFn = fun(S1A, S2A) -> + S1 = ToStatus(S1A), + S2 = ToStatus(S2A), + S1 > S2 + end, + case lists:usort(CompareFn, Statuses) of + [{Status, Reason} | _] -> + case explain_error(Reason) of + undefined -> Status; + Msg -> {Status, Msg} + end; [Status | _] -> Status; [] -> - disconnected + ?status_disconnected end. mk_ingress_config( @@ -514,15 +533,54 @@ connect(Pid, Name) -> {ok, Pid}; {error, Reason} = Error -> IsDryRun = emqx_resource:is_dry_run(Name), - ?SLOG(?LOG_LEVEL(IsDryRun), #{ - msg => "ingress_client_connect_failed", - reason => Reason, - resource_id => Name - }), + log_connect_error_reason(?LOG_LEVEL(IsDryRun), Reason, Name), _ = catch emqtt:stop(Pid), Error end. +log_connect_error_reason(Level, {tcp_closed, _} = Reason, Name) -> + ?tp(emqx_bridge_mqtt_connector_tcp_closed, #{}), + ?SLOG(Level, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name, + explain => explain_error(Reason) + }); +log_connect_error_reason(Level, econnrefused = Reason, Name) -> + ?tp(emqx_bridge_mqtt_connector_econnrefused_error, #{}), + ?SLOG(Level, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name, + explain => explain_error(Reason) + }); +log_connect_error_reason(Level, Reason, Name) -> + ?SLOG(Level, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name + }). + +explain_error(econnrefused) -> + << + "Connection refused. " + "This error indicates that your connection attempt to the MQTT server was rejected. " + "In simpler terms, the server you tried to connect to refused your request. " + "There can be multiple reasons for this. " + "For example, the MQTT server you're trying to connect to might be down or not " + "running at all or you might have provided the wrong address " + "or port number for the server." + >>; +explain_error({tcp_closed, _}) -> + << + "Your MQTT connection attempt was unsuccessful. " + "It might be at its maximum capacity for handling new connections. " + "To diagnose the issue further, you can check the server logs for " + "any specific messages related to the unavailability or connection limits." + >>; +explain_error(_Reason) -> + undefined. + handle_disconnect(_Reason) -> ok. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl index 1749d4194..35aea67a6 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl @@ -19,6 +19,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). %% management APIs -export([ @@ -234,13 +235,13 @@ status(Pid) -> try case proplists:get_value(socket, info(Pid)) of Socket when Socket /= undefined -> - connected; + ?status_connected; undefined -> - connecting + ?status_connecting end catch exit:{noproc, _} -> - disconnected + ?status_disconnected end. %% diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index a220eb9f7..42cf9d2b8 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -1025,31 +1025,39 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) -> ct:sleep(1000), %% stop the listener 1883 to make the bridge disconnected - ok = emqx_listeners:stop_listener('tcp:default'), - ct:sleep(1500), - ?assertMatch( - #{<<"status">> := Status} when - Status == <<"connecting">> orelse Status == <<"disconnected">>, - request_bridge(BridgeIDEgress) + ?check_trace( + begin + ok = emqx_listeners:stop_listener('tcp:default'), + ct:sleep(1500), + ?assertMatch( + #{<<"status">> := Status} when + Status == <<"connecting">> orelse Status == <<"disconnected">>, + request_bridge(BridgeIDEgress) + ), + + %% start the listener 1883 to make the bridge reconnected + ok = emqx_listeners:start_listener('tcp:default'), + timer:sleep(1500), + ?assertMatch( + #{<<"status">> := <<"connected">>}, + request_bridge(BridgeIDEgress) + ), + + N = stop_publisher(Publisher), + + %% all those messages should eventually be delivered + [ + assert_mqtt_msg_received(RemoteTopic, Payload) + || I <- lists:seq(1, N), + Payload <- [integer_to_binary(I)] + ], + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_econnrefused_error, Trace)), + ok + end ), - - %% start the listener 1883 to make the bridge reconnected - ok = emqx_listeners:start_listener('tcp:default'), - timer:sleep(1500), - ?assertMatch( - #{<<"status">> := <<"connected">>}, - request_bridge(BridgeIDEgress) - ), - - N = stop_publisher(Publisher), - - %% all those messages should eventually be delivered - [ - assert_mqtt_msg_received(RemoteTopic, Payload) - || I <- lists:seq(1, N), - Payload <- [integer_to_binary(I)] - ], - ok. start_publisher(Topic, Interval, CtrlPid) -> diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl index b9097b9c3..e0598fa1e 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl @@ -131,6 +131,9 @@ hookpoint(Config) -> BridgeId = bridge_id(Config), emqx_bridge_resource:bridge_hookpoint(BridgeId). +simplify_result(Res) -> + emqx_bridge_v2_testlib:simplify_result(Res). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -246,3 +249,46 @@ t_receive_via_rule(Config) -> end ), ok. + +t_connect_with_more_clients_than_the_broker_accepts(Config) -> + Name = ?config(connector_name, Config), + OrgConf = emqx_mgmt_listeners_conf:get_raw(tcp, default), + on_exit(fun() -> + emqx_mgmt_listeners_conf:update(tcp, default, OrgConf) + end), + NewConf = OrgConf#{<<"max_connections">> => 3}, + {ok, _} = emqx_mgmt_listeners_conf:update(tcp, default, NewConf), + ?check_trace( + #{timetrap => 10_000}, + begin + ?assertMatch( + {201, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := + <<"Your MQTT connection attempt was unsuccessful", _/binary>> + }}, + simplify_result( + emqx_bridge_v2_testlib:create_connector_api( + Config, + #{<<"pool_size">> => 100} + ) + ) + ), + ?block_until(#{?snk_kind := emqx_bridge_mqtt_connector_tcp_closed}), + ?assertMatch( + {200, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := + <<"Your MQTT connection attempt was unsuccessful", _/binary>> + }}, + simplify_result(emqx_bridge_v2_testlib:get_connector_api(mqtt, Name)) + ), + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_tcp_closed, Trace)), + ok + end + ), + + ok. diff --git a/changes/ce/fix-13425.en.md b/changes/ce/fix-13425.en.md new file mode 100644 index 000000000..e02e99c0a --- /dev/null +++ b/changes/ce/fix-13425.en.md @@ -0,0 +1 @@ +The MQTT connector error log messages have been improved to provide clearer and more detailed information.