diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index 5ed706511..5906cc57a 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -81,6 +81,7 @@ on_start(InstanceId, Config) -> } = Config, Servers = format_servers(Servers0), ClientId = make_client_id(InstanceId, BridgeName), + ok = emqx_resource:allocate_resource(InstanceId, pulsar_client_id, ClientId), SSLOpts = emqx_tls_lib:to_client_opts(SSL), ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)), ClientOpts = #{ @@ -116,15 +117,29 @@ on_start(InstanceId, Config) -> start_producer(Config, InstanceId, ClientId, ClientOpts). -spec on_stop(resource_id(), state()) -> ok. -on_stop(_InstanceId, State) -> - #{ - pulsar_client_id := ClientId, - producers := Producers - } = State, - stop_producers(ClientId, Producers), - stop_client(ClientId), - ?tp(pulsar_bridge_stopped, #{instance_id => _InstanceId}), - ok. +on_stop(InstanceId, _State) -> + case emqx_resource:get_allocated_resources(InstanceId) of + #{pulsar_client_id := ClientId, pulsar_producers := Producers} -> + stop_producers(ClientId, Producers), + stop_client(ClientId), + ?tp(pulsar_bridge_stopped, #{ + instance_id => InstanceId, + pulsar_client_id => ClientId, + pulsar_producers => Producers + }), + ok; + #{pulsar_client_id := ClientId} -> + stop_client(ClientId), + ?tp(pulsar_bridge_stopped, #{ + instance_id => InstanceId, + pulsar_client_id => ClientId, + pulsar_producers => undefined + }), + ok; + _ -> + ?tp(pulsar_bridge_stopped, #{instance_id => InstanceId}), + ok + end. -spec on_get_status(resource_id(), state()) -> connected | disconnected. on_get_status(_InstanceId, State = #{}) -> @@ -325,6 +340,8 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> ?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}), try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of {ok, Producers} -> + ok = emqx_resource:allocate_resource(InstanceId, pulsar_producers, Producers), + ?tp(pulsar_producer_producers_allocated, #{}), State = #{ pulsar_client_id => ClientId, producers => Producers, diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 76d9f94e1..3605baaab 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -43,7 +43,9 @@ only_once_tests() -> t_send_when_down, t_send_when_timeout, t_failure_to_start_producer, - t_producer_process_crash + t_producer_process_crash, + t_resource_manager_crash_after_producers_started, + t_resource_manager_crash_before_producers_started ]. init_per_suite(Config) -> @@ -429,7 +431,19 @@ wait_until_producer_connected() -> wait_until_connected(pulsar_producers_sup, pulsar_producer). wait_until_connected(SupMod, Mod) -> - Pids = [ + Pids = get_pids(SupMod, Mod), + ?retry( + _Sleep = 300, + _Attempts0 = 20, + lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids) + ), + ok. + +get_pulsar_producers() -> + get_pids(pulsar_producers_sup, pulsar_producer). + +get_pids(SupMod, Mod) -> + [ P || {_Name, SupPid, _Type, _Mods} <- supervisor:which_children(SupMod), P <- element(2, process_info(SupPid, links)), @@ -437,13 +451,7 @@ wait_until_connected(SupMod, Mod) -> {Mod, init, _} -> true; _ -> false end - ], - ?retry( - _Sleep = 300, - _Attempts0 = 20, - lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids) - ), - ok. + ]. create_rule_and_action_http(Config) -> PulsarName = ?config(pulsar_name, Config), @@ -528,6 +536,18 @@ start_cluster(Cluster) -> end), Nodes. +kill_resource_managers() -> + ct:pal("gonna kill resource managers"), + lists:foreach( + fun({_, Pid, _, _}) -> + ct:pal("terminating resource manager ~p", [Pid]), + %% sys:terminate(Pid, stop), + exit(Pid, kill), + ok + end, + supervisor:which_children(emqx_resource_manager_sup) + ). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -921,7 +941,11 @@ t_producer_process_crash(Config) -> ok after 1_000 -> ct:fail("pid didn't die") end, - ?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId)), + ?retry( + _Sleep0 = 50, + _Attempts0 = 50, + ?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId)) + ), %% Should recover given enough time. ?retry( _Sleep = 1_000, @@ -952,6 +976,69 @@ t_producer_process_crash(Config) -> ), ok. +t_resource_manager_crash_after_producers_started(Config) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := pulsar_producer_producers_allocated}, + #{?snk_kind := will_kill_resource_manager} + ), + ?force_ordering( + #{?snk_kind := resource_manager_killed}, + #{?snk_kind := pulsar_producer_bridge_started} + ), + spawn_link(fun() -> + ?tp(will_kill_resource_manager, #{}), + kill_resource_managers(), + ?tp(resource_manager_killed, #{}), + ok + end), + %% even if the resource manager is dead, we can still + %% clear the allocated resources. + {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := Producers} when + Producers =/= undefined, + 10_000 + ), + ok + end, + [] + ), + ok. + +t_resource_manager_crash_before_producers_started(Config) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := pulsar_producer_capture_name}, + #{?snk_kind := will_kill_resource_manager} + ), + ?force_ordering( + #{?snk_kind := resource_manager_killed}, + #{?snk_kind := pulsar_producer_about_to_start_producers} + ), + spawn_link(fun() -> + ?tp(will_kill_resource_manager, #{}), + kill_resource_managers(), + ?tp(resource_manager_killed, #{}), + ok + end), + %% even if the resource manager is dead, we can still + %% clear the allocated resources. + {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined}, + 10_000 + ), + ok + end, + [] + ), + ok. + t_cluster(Config) -> MQTTTopic = ?config(mqtt_topic, Config), ResourceId = resource_id(Config), diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 7f3ac580d..ce3ee73a9 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -121,3 +121,5 @@ -define(TEST_ID_PREFIX, "_probe_:"). -define(RES_METRICS, resource_metrics). + +-define(RESOURCE_ALLOCATION_TAB, emqx_resource_allocations). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 80f270b13..10f1de6c4 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -79,7 +79,13 @@ query/2, query/3, %% query the instance without batching and queuing messages. - simple_sync_query/2 + simple_sync_query/2, + %% functions used by connectors to register resources that must be + %% freed when stopping or even when a resource manager crashes. + allocate_resource/3, + has_allocated_resources/1, + get_allocated_resources/1, + forget_allocated_resources/1 ]). %% Direct calls to the callback module @@ -372,6 +378,9 @@ is_buffer_supported(Module) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(ResId, Mod, Config) -> try + %% If the previous manager process crashed without cleaning up + %% allocated resources, clean them up. + clean_allocated_resources(ResId, Mod), Mod:on_start(ResId, Config) catch throw:Error -> @@ -390,7 +399,16 @@ call_health_check(ResId, Mod, ResourceState) -> -spec call_stop(resource_id(), module(), resource_state()) -> term(). call_stop(ResId, Mod, ResourceState) -> - ?SAFE_CALL(Mod:on_stop(ResId, ResourceState)). + ?SAFE_CALL(begin + Res = Mod:on_stop(ResId, ResourceState), + case Res of + ok -> + emqx_resource:forget_allocated_resources(ResId); + _ -> + ok + end, + Res + end). -spec check_config(resource_type(), raw_resource_config()) -> {ok, resource_config()} | {error, term()}. @@ -486,7 +504,37 @@ apply_reply_fun({F, A}, Result) when is_function(F) -> apply_reply_fun(From, Result) -> gen_server:reply(From, Result). +-spec allocate_resource(resource_id(), any(), term()) -> ok. +allocate_resource(InstanceId, Key, Value) -> + true = ets:insert(?RESOURCE_ALLOCATION_TAB, {InstanceId, Key, Value}), + ok. + +-spec has_allocated_resources(resource_id()) -> boolean(). +has_allocated_resources(InstanceId) -> + ets:member(?RESOURCE_ALLOCATION_TAB, InstanceId). + +-spec get_allocated_resources(resource_id()) -> map(). +get_allocated_resources(InstanceId) -> + Objects = ets:lookup(?RESOURCE_ALLOCATION_TAB, InstanceId), + maps:from_list([{K, V} || {_InstanceId, K, V} <- Objects]). + +-spec forget_allocated_resources(resource_id()) -> ok. +forget_allocated_resources(InstanceId) -> + true = ets:delete(?RESOURCE_ALLOCATION_TAB, InstanceId), + ok. + %% ================================================================================= filter_instances(Filter) -> [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)]. + +clean_allocated_resources(ResourceId, ResourceMod) -> + case emqx_resource:has_allocated_resources(ResourceId) of + true -> + %% The resource entries in the ETS table are erased inside + %% `call_stop' if the call is successful. + ok = emqx_resource:call_stop(ResourceId, ResourceMod, _ResourceState = undefined), + ok; + false -> + ok + end. diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 97ac355f4..7a54bfa97 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -500,8 +500,10 @@ stop_resource(#data{state = ResState, id = ResId} = Data) -> %% We don't care the return value of the Mod:on_stop/2. %% The callback mod should make sure the resource is stopped after on_stop/2 %% is returned. - case ResState /= undefined of + HasAllocatedResources = emqx_resource:has_allocated_resources(ResId), + case ResState =/= undefined orelse HasAllocatedResources of true -> + %% we clear the allocated resources after stop is successful emqx_resource:call_stop(Data#data.id, Data#data.mod, ResState); false -> ok diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index 73f1988c6..9e86e6363 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -17,6 +17,8 @@ -behaviour(supervisor). +-include("emqx_resource.hrl"). + -export([ensure_child/5, delete_child/1]). -export([start_link/0]). @@ -36,6 +38,12 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> + %% Maps resource_id() to one or more allocated resources. + emqx_utils_ets:new(?RESOURCE_ALLOCATION_TAB, [ + bag, + public, + {read_concurrency, true} + ]), ChildSpecs = [ #{ id => emqx_resource_manager, diff --git a/changes/ee/feat-10778.en.md b/changes/ee/feat-10778.en.md new file mode 100644 index 000000000..3084d2959 --- /dev/null +++ b/changes/ee/feat-10778.en.md @@ -0,0 +1 @@ +Refactored Pulsar Producer bridge to avoid leaking resources during crashes.