diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 9def284d9..99011deea 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -454,6 +454,24 @@ probe_bridge_api(Kind, BridgeType, BridgeName, BridgeConfig) -> ct:pal("bridge probe (~s, http) result:\n ~p", [Kind, Res]), Res. +probe_connector_api(Config) -> + probe_connector_api(Config, _Overrides = #{}). + +probe_connector_api(Config, Overrides) -> + #{ + connector_type := Type, + connector_name := Name + } = get_common_values(Config), + ConnectorConfig0 = get_value(connector_config, Config), + ConnectorConfig1 = emqx_utils_maps:deep_merge(ConnectorConfig0, Overrides), + Params = ConnectorConfig1#{<<"type">> => Type, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["connectors_probe"]), + ct:pal("probing connector (~s, http):\n ~p", [Type, Params]), + Method = post, + Res = request(Method, Path, Params), + ct:pal("probing connector (~s, http) result:\n ~p", [Type, Res]), + Res. + list_bridges_http_api_v1() -> Path = emqx_mgmt_api_test_util:api_path(["bridges"]), ct:pal("list bridges (http v1)"), diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index 6cfcf7d5d..c4f66dfff 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -220,10 +220,17 @@ on_stop(ConnectorResId, State) -> -spec on_get_status(connector_resource_id(), connector_state()) -> ?status_connected | ?status_disconnected. -on_get_status(_ConnectorResId, _State = #{kafka_client_id := ClientID}) -> - case brod_sup:find_client(ClientID) of - [_Pid] -> ?status_connected; - _ -> ?status_disconnected +on_get_status(_ConnectorResId, State = #{kafka_client_id := ClientID}) -> + case whereis(ClientID) of + Pid when is_pid(Pid) -> + case check_client_connectivity(Pid) of + {Status, Reason} -> + {Status, State, Reason}; + Status -> + Status + end; + _ -> + ?status_disconnected end; on_get_status(_ConnectorResId, _State) -> ?status_disconnected. @@ -631,6 +638,39 @@ is_dry_run(ConnectorResId) -> string:equal(TestIdStart, ConnectorResId) end. +-spec check_client_connectivity(pid()) -> + ?status_connected + | ?status_disconnected + | {?status_disconnected, term()}. +check_client_connectivity(ClientPid) -> + %% We use a fake group id just to probe the connection, as `get_group_coordinator' + %% will ensure a connection to the broker. + FakeGroupId = <<"____emqx_consumer_probe">>, + case brod_client:get_group_coordinator(ClientPid, FakeGroupId) of + {error, client_down} -> + ?status_disconnected; + {error, {client_down, Reason}} -> + %% `brod' should have already logged the client being down. + {?status_disconnected, maybe_clean_error(Reason)}; + {error, Reason} -> + %% `brod' should have already logged the client being down. + {?status_disconnected, maybe_clean_error(Reason)}; + {ok, _Metadata} -> + ?status_connected + end. + +%% Attempt to make the returned error a bit more friendly. +maybe_clean_error(Reason) -> + case Reason of + [{{Host, Port}, {nxdomain, _Stacktrace}} | _] when is_integer(Port) -> + HostPort = iolist_to_binary([Host, ":", integer_to_binary(Port)]), + {HostPort, nxdomain}; + [{error_code, Code}, {error_msg, Msg} | _] -> + {Code, Msg}; + _ -> + Reason + end. + -spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom(). make_client_id(ConnectorResId, BridgeType, BridgeName) -> case is_dry_run(ConnectorResId) of diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 23a8b4828..402841f99 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -2071,6 +2071,7 @@ t_begin_offset_earliest(Config) -> {ok, _} = create_bridge(Config, #{ <<"kafka">> => #{<<"offset_reset_policy">> => <<"earliest">>} }), + ?retry(500, 20, ?assertEqual({ok, connected}, health_check(Config))), #{num_published => NumMessages} end, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl index 02a7a6279..8568e2f62 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl @@ -339,3 +339,15 @@ t_update_topic(Config) -> emqx_bridge_v2_testlib:get_source_api(?SOURCE_TYPE_BIN, Name) ), ok. + +t_bad_bootstrap_host(Config) -> + ?assertMatch( + {error, {{_, 400, _}, _, _}}, + emqx_bridge_v2_testlib:probe_connector_api( + Config, + #{ + <<"bootstrap_hosts">> => <<"bad_host:9999">> + } + ) + ), + ok.