fix(kafka_producer): make status `connecting` while the client fails to connect

Fixes https://emqx.atlassian.net/browse/EMQX-11408

To make it consistent with the previous bridge behavior.

Also, introduces macros for resource status to avoid problems with typos.
This commit is contained in:
Thales Macedo Garitezi 2023-11-16 10:08:41 -03:00
parent 36b5d58957
commit b92821188b
7 changed files with 137 additions and 32 deletions

View File

@ -202,33 +202,36 @@ lookup(Type, Name) ->
%% The connector should always exist
%% ... but, in theory, there might be no channels associated to it when we try
%% to delete the connector, and then this reference will become dangling...
InstanceData =
ConnectorData =
case emqx_resource:get_instance(ConnectorId) of
{ok, _, Data} ->
Data;
{error, not_found} ->
#{}
end,
%% Find the Bridge V2 status from the InstanceData
Channels = maps:get(added_channels, InstanceData, #{}),
%% Find the Bridge V2 status from the ConnectorData
ConnectorStatus = maps:get(status, ConnectorData, undefined),
Channels = maps:get(added_channels, ConnectorData, #{}),
BridgeV2Id = id(Type, Name, BridgeConnector),
ChannelStatus = maps:get(BridgeV2Id, Channels, undefined),
{DisplayBridgeV2Status, ErrorMsg} =
case ChannelStatus of
#{status := connected} ->
{connected, <<"">>};
#{status := Status, error := undefined} ->
case {ChannelStatus, ConnectorStatus} of
{#{status := ?status_connected}, _} ->
{?status_connected, <<"">>};
{#{error := resource_not_operational}, ?status_connecting} ->
{?status_connecting, <<"Not installed">>};
{#{status := Status, error := undefined}, _} ->
{Status, <<"Unknown reason">>};
#{status := Status, error := Error} ->
{#{status := Status, error := Error}, _} ->
{Status, emqx_utils:readable_error_msg(Error)};
undefined ->
{disconnected, <<"Pending installation">>}
{undefined, _} ->
{?status_disconnected, <<"Not installed">>}
end,
{ok, #{
type => bin(Type),
name => bin(Name),
raw_config => RawConf,
resource_data => InstanceData,
resource_data => ConnectorData,
status => DisplayBridgeV2Status,
error => ErrorMsg
}}

View File

@ -20,6 +20,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
@ -43,7 +44,7 @@ con_schema() ->
{
con_type(),
hoconsc:mk(
hoconsc:map(name, typerefl:map()),
hoconsc:map(name, hoconsc:ref(?MODULE, connector_config)),
#{
desc => <<"Test Connector Config">>,
required => false
@ -52,6 +53,15 @@ con_schema() ->
}
].
fields(connector_config) ->
[
{enable, hoconsc:mk(typerefl:boolean(), #{})},
{resource_opts, hoconsc:mk(typerefl:map(), #{})},
{on_start_fun, hoconsc:mk(typerefl:binary(), #{})},
{on_get_status_fun, hoconsc:mk(typerefl:binary(), #{})},
{on_add_channel_fun, hoconsc:mk(typerefl:binary(), #{})}
].
con_config() ->
#{
<<"enable">> => true,
@ -112,6 +122,7 @@ setup_mocks() ->
catch meck:new(emqx_connector_schema, MeckOpts),
meck:expect(emqx_connector_schema, fields, 1, con_schema()),
meck:expect(emqx_connector_schema, connector_type_to_bridge_types, 1, [con_type()]),
catch meck:new(emqx_connector_resource, MeckOpts),
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()),
@ -159,15 +170,7 @@ init_per_testcase(_TestCase, Config) ->
ets:new(fun_table_name(), [named_table, public]),
%% Create a fake connector
{ok, _} = emqx_connector:create(con_type(), con_name(), con_config()),
[
{mocked_mods, [
emqx_connector_schema,
emqx_connector_resource,
emqx_bridge_v2
]}
| Config
].
Config.
end_per_testcase(_TestCase, _Config) ->
ets:delete(fun_table_name()),
@ -846,6 +849,51 @@ t_start_operation_when_on_add_channel_gives_error(_Config) ->
),
ok.
t_lookup_status_when_connecting(_Config) ->
ResponseETS = ets:new(response_ets, [public]),
ets:insert(ResponseETS, {on_get_status_value, ?status_connecting}),
OnGetStatusFun = wrap_fun(fun() ->
ets:lookup_element(ResponseETS, on_get_status_value, 2)
end),
ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{
<<"on_get_status_fun">> => OnGetStatusFun,
<<"resource_opts">> => #{<<"start_timeout">> => 100}
}),
ConnectorName = ?FUNCTION_NAME,
ct:pal("connector config:\n ~p", [ConnectorConfig]),
{ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig),
ActionName = my_test_action,
ChanStatusFun = wrap_fun(fun() -> ?status_disconnected end),
ActionConfig = (bridge_config())#{
<<"on_get_channel_status_fun">> => ChanStatusFun,
<<"connector">> => atom_to_binary(ConnectorName)
},
ct:pal("action config:\n ~p", [ActionConfig]),
{ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig),
%% Top-level status is connecting if the connector status is connecting, but the
%% channel is not yet installed. `resource_data.added_channels.$channel_id.status'
%% contains true internal status.
{ok, Res} = emqx_bridge_v2:lookup(bridge_type(), ActionName),
?assertMatch(
#{
%% This is the action's public status
status := ?status_connecting,
resource_data :=
#{
%% This is the connector's status
status := ?status_connecting
}
},
Res
),
#{resource_data := #{added_channels := Channels}} = Res,
[{_Id, ChannelData}] = maps:to_list(Channels),
?assertMatch(#{status := ?status_disconnected}, ChannelData),
ok.
%% Helper Functions
wait_until(Fun) ->

View File

@ -587,7 +587,7 @@ t_broken_bridge_config(Config) ->
<<"type">> := ?BRIDGE_TYPE,
<<"connector">> := <<"does_not_exist">>,
<<"status">> := <<"disconnected">>,
<<"error">> := <<"Pending installation">>
<<"error">> := <<"Not installed">>
}
]},
request_json(get, uri([?ROOT]), Config)
@ -640,7 +640,7 @@ t_fix_broken_bridge_config(Config) ->
<<"type">> := ?BRIDGE_TYPE,
<<"connector">> := <<"does_not_exist">>,
<<"status">> := <<"disconnected">>,
<<"error">> := <<"Pending installation">>
<<"error">> := <<"Not installed">>
}
]},
request_json(get, uri([?ROOT]), Config)

View File

@ -43,8 +43,8 @@ on_start(
) ->
Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef),
Fun(Conf);
on_start(_InstId, _Config) ->
{ok, #{}}.
on_start(_InstId, Config) ->
{ok, Config}.
on_add_channel(
_InstId,

View File

@ -481,11 +481,11 @@ on_get_status(
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
case wolff_client:check_connectivity(Pid) of
ok -> connected;
{error, Error} -> {connecting, State, Error}
ok -> ?status_connected;
{error, Error} -> {?status_connecting, State, Error}
end;
{error, _Reason} ->
connecting
?status_connecting
end.
on_get_channel_status(
@ -499,10 +499,10 @@ on_get_channel_status(
#{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels),
try
ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
connected
?status_connected
catch
throw:#{reason := restarting} ->
conneting
?status_connecting
end.
check_topic_and_leader_connections(ClientId, KafkaTopic) ->

View File

@ -23,6 +23,8 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("brod/include/brod.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
-define(TYPE, kafka_producer).
%%------------------------------------------------------------------------------
@ -55,6 +57,13 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, _Config) ->
emqx_common_test_helpers:call_janitor(60_000),
ok.
%%-------------------------------------------------------------------------------------
%% Helper fns
%%-------------------------------------------------------------------------------------
@ -163,6 +172,16 @@ kafka_hosts_string() ->
KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"),
KafkaHost ++ ":" ++ KafkaPort.
create_connector(Name, Config) ->
Res = emqx_connector:create(?TYPE, Name, Config),
on_exit(fun() -> emqx_connector:remove(?TYPE, Name) end),
Res.
create_action(Name, Config) ->
Res = emqx_bridge_v2:create(?TYPE, Name, Config),
on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end),
Res.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -302,3 +321,24 @@ t_unknown_topic(_Config) ->
emqx_bridge_v2_testlib:get_bridge_api(?TYPE, BridgeName)
),
ok.
t_bad_url(_Config) ->
ConnectorName = <<"test_connector">>,
ActionName = <<"test_action">>,
ActionConfig = bridge_v2_config(<<"test_connector">>),
ConnectorConfig0 = connector_config(),
ConnectorConfig = ConnectorConfig0#{<<"bootstrap_hosts">> := <<"bad_host:9092">>},
?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)),
?assertMatch({ok, _}, create_action(ActionName, ActionConfig)),
?assertMatch(
{ok, #{
resource_data :=
#{
status := connecting,
error := [#{reason := unresolvable_hostname}]
}
}},
emqx_connector:lookup(?TYPE, ConnectorName)
),
?assertMatch({ok, #{status := connecting}}, emqx_bridge_v2:lookup(?TYPE, ActionName)),
ok.

View File

@ -13,6 +13,16 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% bridge/connector/action status
-define(status_connected, connected).
-define(status_connecting, connecting).
-define(status_disconnected, disconnected).
%% Note: the `stopped' status can only be emitted by `emqx_resource_manager'... Modules
%% implementing `emqx_resource' behavior should not return it. The `rm_' prefix is to
%% remind us of that.
-define(rm_status_stopped, stopped).
-type resource_type() :: module().
-type resource_id() :: binary().
-type channel_id() :: binary().
@ -21,8 +31,12 @@
-type resource_config() :: term().
-type resource_spec() :: map().
-type resource_state() :: term().
-type resource_status() :: connected | disconnected | connecting | stopped.
-type channel_status() :: connected | connecting | disconnected.
%% Note: the `stopped' status can only be emitted by `emqx_resource_manager'... Modules
%% implementing `emqx_resource' behavior should not return it.
-type resource_status() ::
?status_connected | ?status_disconnected | ?status_connecting | ?rm_status_stopped.
-type health_check_status() :: ?status_connected | ?status_disconnected | ?status_connecting.
-type channel_status() :: ?status_connected | ?status_connecting | ?status_disconnected.
-type callback_mode() :: always_sync | async_if_possible.
-type query_mode() ::
simple_sync