diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl index eb274a4d5..258f3a7f3 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl @@ -81,12 +81,17 @@ handle_call(Call, From, St) -> {noreply, St}. handle_cast({collect, Transfer, [Node | Rest]}, St) -> - ok = do_collect_transfer(Transfer, Node, St), - case Rest of - [_ | _] -> - gen_server:cast(self(), {collect, Transfer, Rest}); - [] -> - ok + case gc_enabled(St) of + true -> + ok = do_collect_transfer(Transfer, Node, St), + case Rest of + [_ | _] -> + gen_server:cast(self(), {collect, Transfer, Rest}); + [] -> + ok + end; + false -> + skip end, {noreply, St}; handle_cast(reset, St) -> @@ -127,9 +132,14 @@ maybe_report(#gcstats{errors = Errors}, #st{storage = Storage}) when map_size(Er maybe_report(#gcstats{} = _Stats, #st{storage = _Storage}) -> ?tp(garbage_collection, #{stats => _Stats, storage => _Storage}). -start_timer(St = #st{next_gc_timer = undefined}) -> - Delay = emqx_ft_conf:gc_interval(St#st.storage), - St#st{next_gc_timer = emqx_misc:start_timer(Delay, collect)}. +start_timer(St = #st{storage = Storage, next_gc_timer = undefined}) -> + case emqx_ft_conf:gc_interval(Storage) of + Delay when Delay > 0 -> + St#st{next_gc_timer = emqx_misc:start_timer(Delay, collect)}; + 0 -> + ?SLOG(warning, #{msg => "periodic_gc_disabled"}), + St + end. reset_timer(St = #st{next_gc_timer = undefined}) -> start_timer(St); @@ -137,6 +147,9 @@ reset_timer(St = #st{next_gc_timer = TRef}) -> ok = emqx_misc:cancel_timer(TRef), start_timer(St#st{next_gc_timer = undefined}). +gc_enabled(St) -> + emqx_ft_conf:gc_interval(St#st.storage) > 0. + %% collect_garbage(Storage) -> diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 1a886c00d..c493a78c5 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -58,8 +58,12 @@ end_per_suite(_Config) -> set_special_configs(Config) -> fun (emqx_ft) -> + Storage = emqx_ft_test_helpers:local_storage(Config), emqx_ft_test_helpers:load_config(#{ - storage => emqx_ft_test_helpers:local_storage(Config) + % NOTE + % Inhibit local fs GC to simulate it isn't fast enough to collect + % complete transfers. + storage => Storage#{gc => #{interval => 0}} }); (_) -> ok @@ -107,14 +111,7 @@ mk_cluster_specs(Config) -> {env, [{emqx, boot_modules, [broker, listeners]}]}, {apps, [emqx_ft]}, {conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]}, - {env_handler, fun - (emqx_ft) -> - emqx_ft_test_helpers:load_config(#{ - storage => emqx_ft_test_helpers:local_storage(Config) - }); - (_) -> - ok - end} + {env_handler, set_special_configs(Config)} ], emqx_common_test_helpers:emqx_cluster( Specs, @@ -549,22 +546,20 @@ t_unreliable_migrating_client(Config) -> % twice. This is currently expected, files must be identical anyway. Node1Str = atom_to_list(Node1), NodeSelfStr = atom_to_list(NodeSelf), + % TODO: this testcase is specific to local fs storage backend ?assertMatch( [#{"node" := Node1Str}, #{"node" := NodeSelfStr}], lists:map( fun(#{uri := URIString}) -> #{query := QS} = uri_string:parse(URIString), - uri_string:dissect_query(QS) + maps:from_list(uri_string:dissect_query(QS)) end, lists:sort(Exports) ) ), [ - ?assertEqual( - {ok, Payload}, - read_export(Export) - ) + ?assertEqual({ok, Payload}, read_export(Export)) || Export <- Exports ].