feat(kafka): ensure allocated resources are removed on failures

This commit is contained in:
Thales Macedo Garitezi 2023-05-24 16:28:08 -03:00
parent 42b37690c7
commit 5df7314255
6 changed files with 174 additions and 24 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_kafka, [ {application, emqx_bridge_kafka, [
{description, "EMQX Enterprise Kafka Bridge"}, {description, "EMQX Enterprise Kafka Bridge"},
{vsn, "0.1.2"}, {vsn, "0.1.3"},
{registered, [emqx_bridge_kafka_consumer_sup]}, {registered, [emqx_bridge_kafka_consumer_sup]},
{applications, [ {applications, [
kernel, kernel,

View File

@ -101,6 +101,10 @@
" the connection parameters." " the connection parameters."
). ).
%% Allocatable resources
-define(kafka_client_id, kafka_client_id).
-define(kafka_subscriber_id, kafka_subscriber_id).
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
@ -140,6 +144,7 @@ on_start(ResourceId, Config) ->
Auth -> [{sasl, emqx_bridge_kafka_impl:sasl(Auth)}] Auth -> [{sasl, emqx_bridge_kafka_impl:sasl(Auth)}]
end, end,
ClientOpts = add_ssl_opts(ClientOpts0, SSL), ClientOpts = add_ssl_opts(ClientOpts0, SSL),
ok = emqx_resource:allocate_resource(ResourceId, ?kafka_client_id, ClientID),
case brod:start_client(BootstrapHosts, ClientID, ClientOpts) of case brod:start_client(BootstrapHosts, ClientID, ClientOpts) of
ok -> ok ->
?tp( ?tp(
@ -163,7 +168,21 @@ on_start(ResourceId, Config) ->
start_consumer(Config, ResourceId, ClientID). start_consumer(Config, ResourceId, ClientID).
-spec on_stop(resource_id(), state()) -> ok. -spec on_stop(resource_id(), state()) -> ok.
on_stop(_ResourceID, State) -> on_stop(ResourceId, _State = undefined) ->
case emqx_resource:get_allocated_resources(ResourceId) of
#{?kafka_client_id := ClientID, ?kafka_subscriber_id := SubscriberId} ->
stop_subscriber(SubscriberId),
stop_client(ClientID),
?tp(kafka_consumer_subcriber_and_client_stopped, #{}),
ok;
#{?kafka_client_id := ClientID} ->
stop_client(ClientID),
?tp(kafka_consumer_just_client_stopped, #{}),
ok;
_ ->
ok
end;
on_stop(_ResourceId, State) ->
#{ #{
subscriber_id := SubscriberId, subscriber_id := SubscriberId,
kafka_client_id := ClientID kafka_client_id := ClientID
@ -333,6 +352,9 @@ start_consumer(Config, ResourceId, ClientID) ->
%% spawns one worker for each assigned topic-partition %% spawns one worker for each assigned topic-partition
%% automatically, so we should not spawn duplicate workers. %% automatically, so we should not spawn duplicate workers.
SubscriberId = make_subscriber_id(BridgeName), SubscriberId = make_subscriber_id(BridgeName),
?tp(kafka_consumer_about_to_start_subscriber, #{}),
ok = emqx_resource:allocate_resource(ResourceId, ?kafka_subscriber_id, SubscriberId),
?tp(kafka_consumer_subscriber_allocated, #{}),
case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of
{ok, _ConsumerPid} -> {ok, _ConsumerPid} ->
?tp( ?tp(

View File

@ -23,6 +23,11 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
%% Allocatable resources
-define(kafka_resource_id, kafka_resource_id).
-define(kafka_client_id, kafka_client_id).
-define(kafka_producers, kafka_producers).
%% TODO: rename this to `kafka_producer' after alias support is added %% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility. %% to hocon; keeping this as just `kafka' for backwards compatibility.
-define(BRIDGE_TYPE, kafka). -define(BRIDGE_TYPE, kafka).
@ -46,9 +51,11 @@ on_start(InstId, Config) ->
} = Config, } = Config,
BridgeType = ?BRIDGE_TYPE, BridgeType = ?BRIDGE_TYPE,
ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId),
_ = maybe_install_wolff_telemetry_handlers(ResourceId), _ = maybe_install_wolff_telemetry_handlers(ResourceId),
Hosts = emqx_bridge_kafka_impl:hosts(Hosts0), Hosts = emqx_bridge_kafka_impl:hosts(Hosts0),
ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName),
ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId),
ClientConfig = #{ ClientConfig = #{
min_metadata_refresh_interval => MinMetaRefreshInterval, min_metadata_refresh_interval => MinMetaRefreshInterval,
connect_timeout => ConnTimeout, connect_timeout => ConnTimeout,
@ -86,6 +93,7 @@ on_start(InstId, Config) ->
WolffProducerConfig = producers_config(BridgeName, ClientId, KafkaConfig, IsDryRun), WolffProducerConfig = producers_config(BridgeName, ClientId, KafkaConfig, IsDryRun),
case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
{ok, Producers} -> {ok, Producers} ->
ok = emqx_resource:allocate_resource(InstId, ?kafka_producers, Producers),
{ok, #{ {ok, #{
message_template => compile_message_template(MessageTemplate), message_template => compile_message_template(MessageTemplate),
client_id => ClientId, client_id => ClientId,
@ -120,28 +128,63 @@ on_start(InstId, Config) ->
) )
end. end.
on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_id := ResourceID}) -> on_stop(InstanceId, _State) ->
case emqx_resource:get_allocated_resources(InstanceId) of
#{
?kafka_client_id := ClientId,
?kafka_producers := Producers,
?kafka_resource_id := ResourceId
} ->
_ = with_log_at_error( _ = with_log_at_error(
fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, fun() -> wolff:stop_and_delete_supervised_producers(Producers) end,
#{ #{
msg => "failed_to_delete_kafka_producer", msg => "failed_to_delete_kafka_producer",
client_id => ClientID client_id => ClientId
} }
), ),
_ = with_log_at_error( _ = with_log_at_error(
fun() -> wolff:stop_and_delete_supervised_client(ClientID) end, fun() -> wolff:stop_and_delete_supervised_client(ClientId) end,
#{ #{
msg => "failed_to_delete_kafka_client", msg => "failed_to_delete_kafka_client",
client_id => ClientID client_id => ClientId
} }
), ),
with_log_at_error( _ = with_log_at_error(
fun() -> uninstall_telemetry_handlers(ResourceID) end, fun() -> uninstall_telemetry_handlers(ResourceId) end,
#{ #{
msg => "failed_to_uninstall_telemetry_handlers", msg => "failed_to_uninstall_telemetry_handlers",
client_id => ClientID resource_id => ResourceId
} }
). ),
ok;
#{?kafka_client_id := ClientId, ?kafka_resource_id := ResourceId} ->
_ = with_log_at_error(
fun() -> wolff:stop_and_delete_supervised_client(ClientId) end,
#{
msg => "failed_to_delete_kafka_client",
client_id => ClientId
}
),
_ = with_log_at_error(
fun() -> uninstall_telemetry_handlers(ResourceId) end,
#{
msg => "failed_to_uninstall_telemetry_handlers",
resource_id => ResourceId
}
),
ok;
#{?kafka_resource_id := ResourceId} ->
_ = with_log_at_error(
fun() -> uninstall_telemetry_handlers(ResourceId) end,
#{
msg => "failed_to_uninstall_telemetry_handlers",
resource_id => ResourceId
}
),
ok;
_ ->
ok
end.
on_query( on_query(
_InstId, _InstId,

View File

@ -59,7 +59,9 @@ only_once_tests() ->
t_cluster_group, t_cluster_group,
t_node_joins_existing_cluster, t_node_joins_existing_cluster,
t_cluster_node_down, t_cluster_node_down,
t_multiple_topic_mappings t_multiple_topic_mappings,
t_resource_manager_crash_after_subscriber_started,
t_resource_manager_crash_before_subscriber_started
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
@ -1118,6 +1120,24 @@ stop_async_publisher(Pid) ->
end, end,
ok. ok.
%% Brutally kills every resource manager process currently supervised by
%% `emqx_resource_manager_sup' and waits until each one is confirmed down.
%% Fails the running test case if a process is still alive 500 ms after the
%% kill signal was sent.  Returns `ok'.
kill_resource_managers() ->
    ct:pal("gonna kill resource managers"),
    %% `supervisor:which_children/1' reports `restarting' or `undefined' in the
    %% pid slot for children that are not currently running; there is nothing
    %% to kill for those, and `exit/2' would raise `badarg' on them.
    Pids = [
        Pid
     || {_, Pid, _, _} <- supervisor:which_children(emqx_resource_manager_sup),
        is_pid(Pid)
    ],
    lists:foreach(fun kill_and_await_down/1, Pids).

%% Kills a single process and blocks until its `DOWN' message arrives.
kill_and_await_down(Pid) ->
    ct:pal("terminating resource manager ~p", [Pid]),
    Ref = monitor(process, Pid),
    exit(Pid, kill),
    receive
        %% `noproc' means the process had already terminated when the monitor
        %% was set up, which serves the caller just as well as `killed'.
        {'DOWN', Ref, process, Pid, Reason} when Reason =:= killed; Reason =:= noproc ->
            ok
    after 500 ->
        ct:fail("pid ~p didn't die!", [Pid])
    end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -2024,3 +2044,65 @@ t_begin_offset_earliest(Config) ->
end end
), ),
ok. ok.
%% Checks that both the Kafka subscriber and the Kafka client are stopped when
%% the resource manager is killed in the window right after the subscriber id
%% has been registered as an allocated resource.
t_resource_manager_crash_after_subscriber_started(Config) ->
?check_trace(
begin
%% Hold the killer process back until the consumer has emitted
%% `kafka_consumer_subscriber_allocated', i.e. until the subscriber id
%% has been recorded via `emqx_resource:allocate_resource/3'.
?force_ordering(
#{?snk_kind := kafka_consumer_subscriber_allocated},
#{?snk_kind := will_kill_resource_manager}
),
%% Conversely, hold `kafka_consumer_subscriber_started' back until the
%% resource managers have been killed.
%% NOTE(review): presumably that event is emitted by the consumer once
%% the subscriber child has started -- confirm in the consumer module.
?force_ordering(
#{?snk_kind := resource_manager_killed},
#{?snk_kind := kafka_consumer_subscriber_started}
),
%% The kill runs in a separate, linked process so that it can happen
%% while `create_bridge/1' below is still in progress.
spawn_link(fun() ->
?tp(will_kill_resource_manager, #{}),
kill_resource_managers(),
?tp(resource_manager_killed, #{}),
ok
end),
%% even if the resource manager is dead, we can still
%% clear the allocated resources.
%% The first tuple element is the result of `create_bridge/1', which is
%% expected to report the crash; the second is the outcome of waiting
%% (up to 10 s) for the cleanup trace event.
{{error, {config_update_crashed, {killed, _}}}, {ok, _}} =
?wait_async_action(
create_bridge(Config),
%% The spelling `subcriber' is deliberate here: it has to match the
%% trace point emitted by the consumer's `on_stop/2'.
#{?snk_kind := kafka_consumer_subcriber_and_client_stopped},
10_000
),
ok
end,
%% No additional checks are run on the collected trace.
[]
),
ok.
%% Checks that the Kafka client alone is stopped when the resource manager is
%% killed before the subscriber id has been registered as an allocated
%% resource.
t_resource_manager_crash_before_subscriber_started(Config) ->
?check_trace(
begin
%% Hold the killer process back until `kafka_consumer_client_started'
%% has been emitted.
%% NOTE(review): presumably that event is emitted by the consumer after
%% `brod:start_client/3' succeeds -- confirm in the consumer module.
?force_ordering(
#{?snk_kind := kafka_consumer_client_started},
#{?snk_kind := will_kill_resource_manager}
),
%% Hold `kafka_consumer_about_to_start_subscriber' back until the
%% resource managers have been killed.  The consumer emits that event
%% just before it allocates the subscriber id, so at the time of the
%% kill only the client id is recorded as an allocated resource.
?force_ordering(
#{?snk_kind := resource_manager_killed},
#{?snk_kind := kafka_consumer_about_to_start_subscriber}
),
%% The kill runs in a separate, linked process so that it can happen
%% while `create_bridge/1' below is still in progress.
spawn_link(fun() ->
?tp(will_kill_resource_manager, #{}),
kill_resource_managers(),
?tp(resource_manager_killed, #{}),
ok
end),
%% even if the resource manager is dead, we can still
%% clear the allocated resources.
%% The first tuple element is the result of `create_bridge/1', which is
%% expected to report the crash; the second is the outcome of waiting
%% (up to 10 s) for the cleanup trace event.
{{error, {config_update_crashed, {killed, _}}}, {ok, _}} =
?wait_async_action(
create_bridge(Config),
%% Emitted by the consumer's `on_stop/2' when only the client id is
%% found among the allocated resources.
#{?snk_kind := kafka_consumer_just_client_stopped},
10_000
),
ok
end,
%% No additional checks are run on the collected trace.
[]
),
ok.

View File

@ -446,6 +446,8 @@ t_failed_creation_then_fix(Config) ->
?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
%% TODO: refactor those into init/end per testcase %% TODO: refactor those into init/end per testcase
ok = ?PRODUCER:on_stop(ResourceId, State), ok = ?PRODUCER:on_stop(ResourceId, State),
?assertEqual([], supervisor:which_children(wolff_client_sup)),
?assertEqual([], supervisor:which_children(wolff_producers_sup)),
ok = emqx_bridge_resource:remove(BridgeId), ok = emqx_bridge_resource:remove(BridgeId),
delete_all_bridges(), delete_all_bridges(),
ok. ok.

View File

@ -0,0 +1 @@
Refactored Kafka Producer and Consumer bridges to avoid leaking resources during crashes at creation.