From 11aaa7b07d8b7e69bfb6f5c361d6d950caec20b2 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 5 Jul 2024 17:55:48 +0200 Subject: [PATCH 1/7] fix: make MQTT connector error log messages easier to understand Fixes: https://emqx.atlassian.net/browse/EMQX-12555 https://emqx.atlassian.net/browse/EMQX-12657 --- .../src/emqx_bridge_mqtt_connector.erl | 43 +++++++++++-- .../test/emqx_bridge_mqtt_SUITE.erl | 62 +++++++++++++++++++ 2 files changed, 100 insertions(+), 5 deletions(-) diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 118542356..358856c35 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -514,15 +514,48 @@ connect(Pid, Name) -> {ok, Pid}; {error, Reason} = Error -> IsDryRun = emqx_resource:is_dry_run(Name), - ?SLOG(?LOG_LEVEL(IsDryRun), #{ - msg => "ingress_client_connect_failed", - reason => Reason, - resource_id => Name - }), + log_connect_error_reason(?LOG_LEVEL(IsDryRun), Reason, Name), _ = catch emqtt:stop(Pid), Error end. +log_connect_error_reason(Level, {tcp_closed, _} = Reason, Name) -> + ?tp(emqx_bridge_mqtt_connector_tcp_closed, #{}), + ?SLOG(Level, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name, + explain => + << + "Your MQTT connection attempt was unsuccessful. " + "It might be at its maximum capacity for handling new connections. " + "To diagnose the issue further, you can check the server logs for " + "any specific messages related to the unavailability or connection limits." + >> + }); +log_connect_error_reason(Level, econnrefused = Reason, Name) -> + ?tp(emqx_bridge_mqtt_connector_econnrefused_error, #{}), + ?SLOG(Level, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name, + explain => + << + "This error indicates that your connection attempt to the MQTT server was rejected. " + "In simpler terms, the server you tried to connect to refused your request. " + "There can be multiple reasons for this. " + "For example, the MQTT server you're trying to connect to might be down or not " + "running at all or you might have provided the wrong address " + "or port number for the server." + >> + }); +log_connect_error_reason(Level, Reason, Name) -> + ?SLOG(Level, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name + }). + handle_disconnect(_Reason) -> ok. diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index a220eb9f7..574851f70 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -27,6 +27,8 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-import(emqx_common_test_helpers, [on_exit/1]). + %% output functions -export([inspect/3]). @@ -399,6 +401,56 @@ t_mqtt_conn_bridge_ingress_shared_subscription(_) -> {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), ok. +t_connect_with_more_clients_than_the_broker_accepts(_) -> + PoolSize = 100, + OrgConf = emqx_mgmt_listeners_conf:get_raw(tcp, default), + on_exit(fun() -> + emqx_mgmt_listeners_conf:update(tcp, default, OrgConf) + end), + NewConf = OrgConf#{<<"max_connections">> => 3}, + {ok, _} = emqx_mgmt_listeners_conf:update(tcp, default, NewConf), + BridgeName = atom_to_binary(?FUNCTION_NAME), + BridgeID = create_bridge( + ?SERVER_CONF#{ + <<"name">> => BridgeName, + <<"ingress">> => #{ + <<"pool_size">> => PoolSize, + <<"remote">> => #{ + <<"topic">> => <<"$share/ingress/", ?INGRESS_REMOTE_TOPIC, "/#">>, + <<"qos">> => 1 + }, + <<"local">> => #{ + <<"topic">> => <>, + <<"qos">> => <<"${qos}">>, + <<"payload">> => <<"${clientid}">>, + <<"retain">> => <<"${retain}">> + } + } + } + ), + snabbkaffe:block_until( + fun + (#{msg := emqx_bridge_mqtt_connector_tcp_closed}) -> + true; + (_) -> + false + end, + 5000 + ), + Trace = snabbkaffe:collect_trace(), + ?assert( + lists:any( + fun(K) -> + maps:get(msg, K, not_found) =:= + emqx_bridge_mqtt_connector_tcp_closed + end, + Trace + ) + ), + + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), + ok. + t_mqtt_egress_bridge_warns_clean_start(_) -> BridgeName = atom_to_binary(?FUNCTION_NAME), Action = fun() -> @@ -1050,6 +1102,16 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) -> Payload <- [integer_to_binary(I)] ], + Trace = snabbkaffe:collect_trace(50), + ?assert( + lists:any( + fun(K) -> + maps:get(msg, K, not_found) =:= + emqx_bridge_mqtt_connector_econnrefused_error + end, + Trace + ) + ), ok. start_publisher(Topic, Interval, CtrlPid) -> From ba2d4f3df3f723be4c42992f6321a72183be30f1 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 5 Jul 2024 18:00:33 +0200 Subject: [PATCH 2/7] docs: add change log entry --- changes/ce/fix-13425.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/fix-13425.en.md diff --git a/changes/ce/fix-13425.en.md b/changes/ce/fix-13425.en.md new file mode 100644 index 000000000..e02e99c0a --- /dev/null +++ b/changes/ce/fix-13425.en.md @@ -0,0 +1 @@ +The MQTT connector error log messages have been improved to provide clearer and more detailed information. From baf2b96cbc12adbf9a66fba7a9b05868e1a8f08f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 1 Aug 2024 14:27:25 -0300 Subject: [PATCH 3/7] test: refactor test structure --- .../test/emqx_bridge_mqtt_SUITE.erl | 62 ++++++++----------- 1 file changed, 27 insertions(+), 35 deletions(-) diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index 574851f70..438b1f601 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -410,45 +410,37 @@ t_connect_with_more_clients_than_the_broker_accepts(_) -> NewConf = OrgConf#{<<"max_connections">> => 3}, {ok, _} = emqx_mgmt_listeners_conf:update(tcp, default, NewConf), BridgeName = atom_to_binary(?FUNCTION_NAME), - BridgeID = create_bridge( - ?SERVER_CONF#{ - <<"name">> => BridgeName, - <<"ingress">> => #{ - <<"pool_size">> => PoolSize, - <<"remote">> => #{ - <<"topic">> => <<"$share/ingress/", ?INGRESS_REMOTE_TOPIC, "/#">>, - <<"qos">> => 1 - }, - <<"local">> => #{ - <<"topic">> => <>, - <<"qos">> => <<"${qos}">>, - <<"payload">> => <<"${clientid}">>, - <<"retain">> => <<"${retain}">> + ?check_trace( + #{timetrap => 10_000}, + begin + BridgeID = create_bridge( + ?SERVER_CONF#{ + <<"name">> => BridgeName, + <<"ingress">> => #{ + <<"pool_size">> => PoolSize, + <<"remote">> => #{ + <<"topic">> => <<"$share/ingress/", ?INGRESS_REMOTE_TOPIC, "/#">>, + <<"qos">> => 1 + }, + <<"local">> => #{ + <<"topic">> => <>, + <<"qos">> => <<"${qos}">>, + <<"payload">> => <<"${clientid}">>, + <<"retain">> => <<"${retain}">> + } + } } - } - } - ), - snabbkaffe:block_until( - fun - (#{msg := emqx_bridge_mqtt_connector_tcp_closed}) -> - true; - (_) -> - false + ), + ?block_until(#{?snk_kind := emqx_bridge_mqtt_connector_tcp_closed}), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), + ok end, - 5000 - ), - Trace = snabbkaffe:collect_trace(), - ?assert( - lists:any( - fun(K) -> - maps:get(msg, K, not_found) =:= - emqx_bridge_mqtt_connector_tcp_closed - end, - Trace - ) + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_tcp_closed, Trace)), + ok + end ), - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), ok. t_mqtt_egress_bridge_warns_clean_start(_) -> From 44e7f2e9b2555a61716daac5996ff98916814630 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 1 Aug 2024 14:49:43 -0300 Subject: [PATCH 4/7] refactor: use macros for status to avoid typos --- apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl | 8 ++++---- apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl | 7 ++++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 358856c35..eb8b4e1ad 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -200,7 +200,7 @@ on_get_channel_status( } = _State ) when is_map_key(ChannelId, Channels) -> %% The channel should be ok as long as the MQTT client is ok - connected. + ?status_connected. on_get_channels(ResId) -> emqx_bridge_v2:get_channels_for_connector(ResId). @@ -359,7 +359,7 @@ on_get_status(_ResourceId, State) -> combine_status(Statuses) catch exit:timeout -> - connecting + ?status_connecting end. get_status({_Pool, Worker}) -> @@ -367,7 +367,7 @@ get_status({_Pool, Worker}) -> {ok, Client} -> emqx_bridge_mqtt_ingress:status(Client); {error, _} -> - disconnected + ?status_disconnected end. combine_status(Statuses) -> @@ -379,7 +379,7 @@ combine_status(Statuses) -> [Status | _] -> Status; [] -> - disconnected + ?status_disconnected end. mk_ingress_config( diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl index 1749d4194..35aea67a6 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl @@ -19,6 +19,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). %% management APIs -export([ @@ -234,13 +235,13 @@ status(Pid) -> try case proplists:get_value(socket, info(Pid)) of Socket when Socket /= undefined -> - connected; + ?status_connected; undefined -> - connecting + ?status_connecting end catch exit:{noproc, _} -> - disconnected + ?status_disconnected end. %% From 52b2d73b2802176c035d0ca1f0a777cb980fd694 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 1 Aug 2024 15:13:25 -0300 Subject: [PATCH 5/7] test: move new test to newer module and use current apis --- .../test/emqx_bridge_mqtt_SUITE.erl | 42 ------------------- .../emqx_bridge_mqtt_v2_subscriber_SUITE.erl | 25 +++++++++++ 2 files changed, 25 insertions(+), 42 deletions(-) diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index 438b1f601..e1af22c3d 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -401,48 +401,6 @@ t_mqtt_conn_bridge_ingress_shared_subscription(_) -> {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), ok. -t_connect_with_more_clients_than_the_broker_accepts(_) -> - PoolSize = 100, - OrgConf = emqx_mgmt_listeners_conf:get_raw(tcp, default), - on_exit(fun() -> - emqx_mgmt_listeners_conf:update(tcp, default, OrgConf) - end), - NewConf = OrgConf#{<<"max_connections">> => 3}, - {ok, _} = emqx_mgmt_listeners_conf:update(tcp, default, NewConf), - BridgeName = atom_to_binary(?FUNCTION_NAME), - ?check_trace( - #{timetrap => 10_000}, - begin - BridgeID = create_bridge( - ?SERVER_CONF#{ - <<"name">> => BridgeName, - <<"ingress">> => #{ - <<"pool_size">> => PoolSize, - <<"remote">> => #{ - <<"topic">> => <<"$share/ingress/", ?INGRESS_REMOTE_TOPIC, "/#">>, - <<"qos">> => 1 - }, - <<"local">> => #{ - <<"topic">> => <>, - <<"qos">> => <<"${qos}">>, - <<"payload">> => <<"${clientid}">>, - <<"retain">> => <<"${retain}">> - } - } - } - ), - ?block_until(#{?snk_kind := emqx_bridge_mqtt_connector_tcp_closed}), - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), - ok - end, - fun(Trace) -> - ?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_tcp_closed, Trace)), - ok - end - ), - - ok. - t_mqtt_egress_bridge_warns_clean_start(_) -> BridgeName = atom_to_binary(?FUNCTION_NAME), Action = fun() -> diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl index b9097b9c3..9030d2ac7 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl @@ -246,3 +246,28 @@ t_receive_via_rule(Config) -> end ), ok. + +t_connect_with_more_clients_than_the_broker_accepts(Config0) -> + OrgConf = emqx_mgmt_listeners_conf:get_raw(tcp, default), + on_exit(fun() -> + emqx_mgmt_listeners_conf:update(tcp, default, OrgConf) + end), + NewConf = OrgConf#{<<"max_connections">> => 3}, + {ok, _} = emqx_mgmt_listeners_conf:update(tcp, default, NewConf), + ConnectorConfig0 = ?config(connector_config, Config0), + ConnectorConfig = ConnectorConfig0#{<<"pool_size">> := 100}, + Config = emqx_utils:merge_opts(Config0, [{connector_config, ConnectorConfig}]), + ?check_trace( + #{timetrap => 10_000}, + begin + {ok, _} = emqx_bridge_v2_testlib:create_bridge_api(Config), + ?block_until(#{?snk_kind := emqx_bridge_mqtt_connector_tcp_closed}), + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_tcp_closed, Trace)), + ok + end + ), + + ok. From 3162fe7a27931d6d5094786d48aea846f0e3d69a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 1 Aug 2024 15:23:14 -0300 Subject: [PATCH 6/7] feat: prettify some error explanations --- .../src/emqx_bridge_mqtt_connector.erl | 62 +++++++++++++------ .../emqx_bridge_mqtt_v2_subscriber_SUITE.erl | 31 ++++++++-- 2 files changed, 69 insertions(+), 24 deletions(-) diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index eb8b4e1ad..02cf6595f 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -98,7 +98,7 @@ on_start(ResourceId, #{server := Server} = Conf) -> server => Server }}; {error, Reason} -> - {error, Reason} + {error, emqx_maybe:define(explain_error(Reason), Reason)} end. on_add_channel( @@ -356,7 +356,12 @@ on_get_status(_ResourceId, State) -> Workers = [{Pool, Worker} || {Pool, PN} <- Pools, {_Name, Worker} <- ecpool:workers(PN)], try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of Statuses -> - combine_status(Statuses) + case combine_status(Statuses) of + {Status, Msg} -> + {Status, State, Msg}; + Status -> + Status + end catch exit:timeout -> ?status_connecting @@ -375,7 +380,21 @@ combine_status(Statuses) -> %% Natural order of statuses: [connected, connecting, disconnected] %% * `disconnected` wins over any other status %% * `connecting` wins over `connected` - case lists:reverse(lists:usort(Statuses)) of + ToStatus = fun + ({S, _Reason}) -> S; + (S) when is_atom(S) -> S + end, + CompareFn = fun(S1A, S2A) -> + S1 = ToStatus(S1A), + S2 = ToStatus(S2A), + S1 > S2 + end, + case lists:usort(CompareFn, Statuses) of + [{Status, Reason} | _] -> + case explain_error(Reason) of + undefined -> Status; + Msg -> {Status, Msg} + end; [Status | _] -> Status; [] -> @@ -525,13 +544,7 @@ log_connect_error_reason(Level, {tcp_closed, _} = Reason, Name) -> msg => "ingress_client_connect_failed", reason => Reason, name => Name, - explain => - << - "Your MQTT connection attempt was unsuccessful. " - "It might be at its maximum capacity for handling new connections. " - "To diagnose the issue further, you can check the server logs for " - "any specific messages related to the unavailability or connection limits." - >> + explain => explain_error(Reason) }); log_connect_error_reason(Level, econnrefused = Reason, Name) -> ?tp(emqx_bridge_mqtt_connector_econnrefused_error, #{}), @@ -539,15 +552,7 @@ log_connect_error_reason(Level, econnrefused = Reason, Name) -> msg => "ingress_client_connect_failed", reason => Reason, name => Name, - explain => - << - "This error indicates that your connection attempt to the MQTT server was rejected. " - "In simpler terms, the server you tried to connect to refused your request. " - "There can be multiple reasons for this. " - "For example, the MQTT server you're trying to connect to might be down or not " - "running at all or you might have provided the wrong address " - "or port number for the server." - >> + explain => explain_error(Reason) }); log_connect_error_reason(Level, Reason, Name) -> ?SLOG(Level, #{ @@ -556,6 +561,25 @@ log_connect_error_reason(Level, Reason, Name) -> name => Name }). +explain_error(econnrefused) -> + << + "This error indicates that your connection attempt to the MQTT server was rejected. " + "In simpler terms, the server you tried to connect to refused your request. " + "There can be multiple reasons for this. " + "For example, the MQTT server you're trying to connect to might be down or not " + "running at all or you might have provided the wrong address " + "or port number for the server." + >>; +explain_error({tcp_closed, _}) -> + << + "Your MQTT connection attempt was unsuccessful. " + "It might be at its maximum capacity for handling new connections. " + "To diagnose the issue further, you can check the server logs for " + "any specific messages related to the unavailability or connection limits." + >>; +explain_error(_Reason) -> + undefined. + handle_disconnect(_Reason) -> ok. diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl index 9030d2ac7..e0598fa1e 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl @@ -131,6 +131,9 @@ hookpoint(Config) -> BridgeId = bridge_id(Config), emqx_bridge_resource:bridge_hookpoint(BridgeId). +simplify_result(Res) -> + emqx_bridge_v2_testlib:simplify_result(Res). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -247,21 +250,39 @@ t_receive_via_rule(Config) -> ), ok. -t_connect_with_more_clients_than_the_broker_accepts(Config0) -> +t_connect_with_more_clients_than_the_broker_accepts(Config) -> + Name = ?config(connector_name, Config), OrgConf = emqx_mgmt_listeners_conf:get_raw(tcp, default), on_exit(fun() -> emqx_mgmt_listeners_conf:update(tcp, default, OrgConf) end), NewConf = OrgConf#{<<"max_connections">> => 3}, {ok, _} = emqx_mgmt_listeners_conf:update(tcp, default, NewConf), - ConnectorConfig0 = ?config(connector_config, Config0), - ConnectorConfig = ConnectorConfig0#{<<"pool_size">> := 100}, - Config = emqx_utils:merge_opts(Config0, [{connector_config, ConnectorConfig}]), ?check_trace( #{timetrap => 10_000}, begin - {ok, _} = emqx_bridge_v2_testlib:create_bridge_api(Config), + ?assertMatch( + {201, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := + <<"Your MQTT connection attempt was unsuccessful", _/binary>> + }}, + simplify_result( + emqx_bridge_v2_testlib:create_connector_api( + Config, + #{<<"pool_size">> => 100} + ) + ) + ), ?block_until(#{?snk_kind := emqx_bridge_mqtt_connector_tcp_closed}), + ?assertMatch( + {200, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := + <<"Your MQTT connection attempt was unsuccessful", _/binary>> + }}, + simplify_result(emqx_bridge_v2_testlib:get_connector_api(mqtt, Name)) + ), ok end, fun(Trace) -> From bba9d085d6a0e03e90fca5ca7e87e964df269a87 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 1 Aug 2024 15:34:58 -0300 Subject: [PATCH 7/7] test: refactor test structure --- .../test/emqx_bridge_api_SUITE.erl | 2 +- .../src/emqx_bridge_mqtt_connector.erl | 1 + .../test/emqx_bridge_mqtt_SUITE.erl | 62 +++++++++---------- 3 files changed, 31 insertions(+), 34 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 6b160f3b3..fdbbc5376 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -1154,7 +1154,7 @@ t_bridges_probe(Config) -> ?assertMatch( {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, - <<"message">> := <<"Connection refused">> + <<"message">> := <<"Connection refused", _/binary>> }}, request_json( post, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 02cf6595f..6b9a40123 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -563,6 +563,7 @@ log_connect_error_reason(Level, Reason, Name) -> explain_error(econnrefused) -> << + "Connection refused. " "This error indicates that your connection attempt to the MQTT server was rejected. " "In simpler terms, the server you tried to connect to refused your request. " "There can be multiple reasons for this. " diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index e1af22c3d..42cf9d2b8 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -27,8 +27,6 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --import(emqx_common_test_helpers, [on_exit/1]). - %% output functions -export([inspect/3]). @@ -1027,40 +1025,38 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) -> ct:sleep(1000), %% stop the listener 1883 to make the bridge disconnected - ok = emqx_listeners:stop_listener('tcp:default'), - ct:sleep(1500), - ?assertMatch( - #{<<"status">> := Status} when - Status == <<"connecting">> orelse Status == <<"disconnected">>, - request_bridge(BridgeIDEgress) - ), + ?check_trace( + begin + ok = emqx_listeners:stop_listener('tcp:default'), + ct:sleep(1500), + ?assertMatch( + #{<<"status">> := Status} when + Status == <<"connecting">> orelse Status == <<"disconnected">>, + request_bridge(BridgeIDEgress) + ), - %% start the listener 1883 to make the bridge reconnected - ok = emqx_listeners:start_listener('tcp:default'), - timer:sleep(1500), - ?assertMatch( - #{<<"status">> := <<"connected">>}, - request_bridge(BridgeIDEgress) - ), + %% start the listener 1883 to make the bridge reconnected + ok = emqx_listeners:start_listener('tcp:default'), + timer:sleep(1500), + ?assertMatch( + #{<<"status">> := <<"connected">>}, + request_bridge(BridgeIDEgress) + ), - N = stop_publisher(Publisher), + N = stop_publisher(Publisher), - %% all those messages should eventually be delivered - [ - assert_mqtt_msg_received(RemoteTopic, Payload) - || I <- lists:seq(1, N), - Payload <- [integer_to_binary(I)] - ], - - Trace = snabbkaffe:collect_trace(50), - ?assert( - lists:any( - fun(K) -> - maps:get(msg, K, not_found) =:= - emqx_bridge_mqtt_connector_econnrefused_error - end, - Trace - ) + %% all those messages should eventually be delivered + [ + assert_mqtt_msg_received(RemoteTopic, Payload) + || I <- lists:seq(1, N), + Payload <- [integer_to_binary(I)] + ], + ok + end, + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(emqx_bridge_mqtt_connector_econnrefused_error, Trace)), + ok + end ), ok.