From 11aaa7b07d8b7e69bfb6f5c361d6d950caec20b2 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 5 Jul 2024 17:55:48 +0200 Subject: [PATCH] fix: make MQTT connector error log messages easier to understand Fixes: https://emqx.atlassian.net/browse/EMQX-12555 https://emqx.atlassian.net/browse/EMQX-12657 --- .../src/emqx_bridge_mqtt_connector.erl | 43 +++++++++++-- .../test/emqx_bridge_mqtt_SUITE.erl | 62 +++++++++++++++++++ 2 files changed, 100 insertions(+), 5 deletions(-) diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 118542356..358856c35 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -514,15 +514,48 @@ connect(Pid, Name) -> {ok, Pid}; {error, Reason} = Error -> IsDryRun = emqx_resource:is_dry_run(Name), - ?SLOG(?LOG_LEVEL(IsDryRun), #{ - msg => "ingress_client_connect_failed", - reason => Reason, - resource_id => Name - }), + log_connect_error_reason(?LOG_LEVEL(IsDryRun), Reason, Name), _ = catch emqtt:stop(Pid), Error end. +log_connect_error_reason(Level, {tcp_closed, _} = Reason, Name) -> + ?tp(emqx_bridge_mqtt_connector_tcp_closed, #{}), + ?SLOG(Level, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name, + explain => + << + "Your MQTT connection attempt was unsuccessful. " + "It might be at its maximum capacity for handling new connections. " + "To diagnose the issue further, you can check the server logs for " + "any specific messages related to the unavailability or connection limits." + >> + }); +log_connect_error_reason(Level, econnrefused = Reason, Name) -> + ?tp(emqx_bridge_mqtt_connector_econnrefused_error, #{}), + ?SLOG(Level, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name, + explain => + << + "This error indicates that your connection attempt to the MQTT server was rejected. " + "In simpler terms, the server you tried to connect to refused your request. " + "There can be multiple reasons for this. " + "For example, the MQTT server you're trying to connect to might be down or not " + "running at all or you might have provided the wrong address " + "or port number for the server." + >> + }); +log_connect_error_reason(Level, Reason, Name) -> + ?SLOG(Level, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name + }). + handle_disconnect(_Reason) -> ok. diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index a220eb9f7..574851f70 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -27,6 +27,8 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-import(emqx_common_test_helpers, [on_exit/1]). + %% output functions -export([inspect/3]). @@ -399,6 +401,56 @@ t_mqtt_conn_bridge_ingress_shared_subscription(_) -> {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), ok. +t_connect_with_more_clients_than_the_broker_accepts(_) -> + PoolSize = 100, + OrgConf = emqx_mgmt_listeners_conf:get_raw(tcp, default), + on_exit(fun() -> + emqx_mgmt_listeners_conf:update(tcp, default, OrgConf) + end), + NewConf = OrgConf#{<<"max_connections">> => 3}, + {ok, _} = emqx_mgmt_listeners_conf:update(tcp, default, NewConf), + BridgeName = atom_to_binary(?FUNCTION_NAME), + BridgeID = create_bridge( + ?SERVER_CONF#{ + <<"name">> => BridgeName, + <<"ingress">> => #{ + <<"pool_size">> => PoolSize, + <<"remote">> => #{ + <<"topic">> => <<"$share/ingress/", ?INGRESS_REMOTE_TOPIC, "/#">>, + <<"qos">> => 1 + }, + <<"local">> => #{ + <<"topic">> => <>, + <<"qos">> => <<"${qos}">>, + <<"payload">> => <<"${clientid}">>, + <<"retain">> => <<"${retain}">> + } + } + } + ), + snabbkaffe:block_until( + fun + (#{msg := emqx_bridge_mqtt_connector_tcp_closed}) -> + true; + (_) -> + false + end, + 5000 + ), + Trace = snabbkaffe:collect_trace(), + ?assert( + lists:any( + fun(K) -> + maps:get(msg, K, not_found) =:= + emqx_bridge_mqtt_connector_tcp_closed + end, + Trace + ) + ), + + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), + ok. + t_mqtt_egress_bridge_warns_clean_start(_) -> BridgeName = atom_to_binary(?FUNCTION_NAME), Action = fun() -> @@ -1050,6 +1102,16 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) -> Payload <- [integer_to_binary(I)] ], + Trace = snabbkaffe:collect_trace(50), + ?assert( + lists:any( + fun(K) -> + maps:get(msg, K, not_found) =:= + emqx_bridge_mqtt_connector_econnrefused_error + end, + Trace + ) + ), ok. start_publisher(Topic, Interval, CtrlPid) ->