fix(kafka_consumer): check client connectivity
Fixes https://emqx.atlassian.net/browse/EMQX-11945
This commit is contained in:
parent
fc8b5d4522
commit
963e0de0c3
|
@ -454,6 +454,24 @@ probe_bridge_api(Kind, BridgeType, BridgeName, BridgeConfig) ->
|
||||||
ct:pal("bridge probe (~s, http) result:\n ~p", [Kind, Res]),
|
ct:pal("bridge probe (~s, http) result:\n ~p", [Kind, Res]),
|
||||||
Res.
|
Res.
|
||||||
|
|
||||||
|
probe_connector_api(Config) ->
|
||||||
|
probe_connector_api(Config, _Overrides = #{}).
|
||||||
|
|
||||||
|
probe_connector_api(Config, Overrides) ->
|
||||||
|
#{
|
||||||
|
connector_type := Type,
|
||||||
|
connector_name := Name
|
||||||
|
} = get_common_values(Config),
|
||||||
|
ConnectorConfig0 = get_value(connector_config, Config),
|
||||||
|
ConnectorConfig1 = emqx_utils_maps:deep_merge(ConnectorConfig0, Overrides),
|
||||||
|
Params = ConnectorConfig1#{<<"type">> => Type, <<"name">> => Name},
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["connectors_probe"]),
|
||||||
|
ct:pal("probing connector (~s, http):\n ~p", [Type, Params]),
|
||||||
|
Method = post,
|
||||||
|
Res = request(Method, Path, Params),
|
||||||
|
ct:pal("probing connector (~s, http) result:\n ~p", [Type, Res]),
|
||||||
|
Res.
|
||||||
|
|
||||||
list_bridges_http_api_v1() ->
|
list_bridges_http_api_v1() ->
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
|
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
|
||||||
ct:pal("list bridges (http v1)"),
|
ct:pal("list bridges (http v1)"),
|
||||||
|
|
|
@ -220,10 +220,17 @@ on_stop(ConnectorResId, State) ->
|
||||||
|
|
||||||
-spec on_get_status(connector_resource_id(), connector_state()) ->
|
-spec on_get_status(connector_resource_id(), connector_state()) ->
|
||||||
?status_connected | ?status_disconnected.
|
?status_connected | ?status_disconnected.
|
||||||
on_get_status(_ConnectorResId, _State = #{kafka_client_id := ClientID}) ->
|
on_get_status(_ConnectorResId, State = #{kafka_client_id := ClientID}) ->
|
||||||
case brod_sup:find_client(ClientID) of
|
case whereis(ClientID) of
|
||||||
[_Pid] -> ?status_connected;
|
Pid when is_pid(Pid) ->
|
||||||
_ -> ?status_disconnected
|
case check_client_connectivity(Pid) of
|
||||||
|
{Status, Reason} ->
|
||||||
|
{Status, State, Reason};
|
||||||
|
Status ->
|
||||||
|
Status
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
?status_disconnected
|
||||||
end;
|
end;
|
||||||
on_get_status(_ConnectorResId, _State) ->
|
on_get_status(_ConnectorResId, _State) ->
|
||||||
?status_disconnected.
|
?status_disconnected.
|
||||||
|
@ -631,6 +638,39 @@ is_dry_run(ConnectorResId) ->
|
||||||
string:equal(TestIdStart, ConnectorResId)
|
string:equal(TestIdStart, ConnectorResId)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec check_client_connectivity(pid()) ->
|
||||||
|
?status_connected
|
||||||
|
| ?status_disconnected
|
||||||
|
| {?status_disconnected, term()}.
|
||||||
|
check_client_connectivity(ClientPid) ->
|
||||||
|
%% We use a fake group id just to probe the connection, as `get_group_coordinator'
|
||||||
|
%% will ensure a connection to the broker.
|
||||||
|
FakeGroupId = <<"____emqx_consumer_probe">>,
|
||||||
|
case brod_client:get_group_coordinator(ClientPid, FakeGroupId) of
|
||||||
|
{error, client_down} ->
|
||||||
|
?status_disconnected;
|
||||||
|
{error, {client_down, Reason}} ->
|
||||||
|
%% `brod' should have already logged the client being down.
|
||||||
|
{?status_disconnected, maybe_clean_error(Reason)};
|
||||||
|
{error, Reason} ->
|
||||||
|
%% `brod' should have already logged the client being down.
|
||||||
|
{?status_disconnected, maybe_clean_error(Reason)};
|
||||||
|
{ok, _Metadata} ->
|
||||||
|
?status_connected
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% Attempt to make the returned error a bit more friendly.
|
||||||
|
maybe_clean_error(Reason) ->
|
||||||
|
case Reason of
|
||||||
|
[{{Host, Port}, {nxdomain, _Stacktrace}} | _] when is_integer(Port) ->
|
||||||
|
HostPort = iolist_to_binary([Host, ":", integer_to_binary(Port)]),
|
||||||
|
{HostPort, nxdomain};
|
||||||
|
[{error_code, Code}, {error_msg, Msg} | _] ->
|
||||||
|
{Code, Msg};
|
||||||
|
_ ->
|
||||||
|
Reason
|
||||||
|
end.
|
||||||
|
|
||||||
-spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom().
|
-spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom().
|
||||||
make_client_id(ConnectorResId, BridgeType, BridgeName) ->
|
make_client_id(ConnectorResId, BridgeType, BridgeName) ->
|
||||||
case is_dry_run(ConnectorResId) of
|
case is_dry_run(ConnectorResId) of
|
||||||
|
|
|
@ -2071,6 +2071,7 @@ t_begin_offset_earliest(Config) ->
|
||||||
{ok, _} = create_bridge(Config, #{
|
{ok, _} = create_bridge(Config, #{
|
||||||
<<"kafka">> => #{<<"offset_reset_policy">> => <<"earliest">>}
|
<<"kafka">> => #{<<"offset_reset_policy">> => <<"earliest">>}
|
||||||
}),
|
}),
|
||||||
|
?retry(500, 20, ?assertEqual({ok, connected}, health_check(Config))),
|
||||||
|
|
||||||
#{num_published => NumMessages}
|
#{num_published => NumMessages}
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -339,3 +339,15 @@ t_update_topic(Config) ->
|
||||||
emqx_bridge_v2_testlib:get_source_api(?SOURCE_TYPE_BIN, Name)
|
emqx_bridge_v2_testlib:get_source_api(?SOURCE_TYPE_BIN, Name)
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_bad_bootstrap_host(Config) ->
|
||||||
|
?assertMatch(
|
||||||
|
{error, {{_, 400, _}, _, _}},
|
||||||
|
emqx_bridge_v2_testlib:probe_connector_api(
|
||||||
|
Config,
|
||||||
|
#{
|
||||||
|
<<"bootstrap_hosts">> => <<"bad_host:9999">>
|
||||||
|
}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
Loading…
Reference in New Issue