fix(ft-test): unbreak testcase by inhibiting local fs storage GC
This commit is contained in:
parent
23cd78b8d6
commit
c24c7eca34
|
@ -81,12 +81,17 @@ handle_call(Call, From, St) ->
|
||||||
{noreply, St}.
|
{noreply, St}.
|
||||||
|
|
||||||
handle_cast({collect, Transfer, [Node | Rest]}, St) ->
|
handle_cast({collect, Transfer, [Node | Rest]}, St) ->
|
||||||
ok = do_collect_transfer(Transfer, Node, St),
|
case gc_enabled(St) of
|
||||||
case Rest of
|
true ->
|
||||||
[_ | _] ->
|
ok = do_collect_transfer(Transfer, Node, St),
|
||||||
gen_server:cast(self(), {collect, Transfer, Rest});
|
case Rest of
|
||||||
[] ->
|
[_ | _] ->
|
||||||
ok
|
gen_server:cast(self(), {collect, Transfer, Rest});
|
||||||
|
[] ->
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
false ->
|
||||||
|
skip
|
||||||
end,
|
end,
|
||||||
{noreply, St};
|
{noreply, St};
|
||||||
handle_cast(reset, St) ->
|
handle_cast(reset, St) ->
|
||||||
|
@ -127,9 +132,14 @@ maybe_report(#gcstats{errors = Errors}, #st{storage = Storage}) when map_size(Er
|
||||||
maybe_report(#gcstats{} = _Stats, #st{storage = _Storage}) ->
|
maybe_report(#gcstats{} = _Stats, #st{storage = _Storage}) ->
|
||||||
?tp(garbage_collection, #{stats => _Stats, storage => _Storage}).
|
?tp(garbage_collection, #{stats => _Stats, storage => _Storage}).
|
||||||
|
|
||||||
start_timer(St = #st{next_gc_timer = undefined}) ->
|
start_timer(St = #st{storage = Storage, next_gc_timer = undefined}) ->
|
||||||
Delay = emqx_ft_conf:gc_interval(St#st.storage),
|
case emqx_ft_conf:gc_interval(Storage) of
|
||||||
St#st{next_gc_timer = emqx_misc:start_timer(Delay, collect)}.
|
Delay when Delay > 0 ->
|
||||||
|
St#st{next_gc_timer = emqx_misc:start_timer(Delay, collect)};
|
||||||
|
0 ->
|
||||||
|
?SLOG(warning, #{msg => "periodic_gc_disabled"}),
|
||||||
|
St
|
||||||
|
end.
|
||||||
|
|
||||||
reset_timer(St = #st{next_gc_timer = undefined}) ->
|
reset_timer(St = #st{next_gc_timer = undefined}) ->
|
||||||
start_timer(St);
|
start_timer(St);
|
||||||
|
@ -137,6 +147,9 @@ reset_timer(St = #st{next_gc_timer = TRef}) ->
|
||||||
ok = emqx_misc:cancel_timer(TRef),
|
ok = emqx_misc:cancel_timer(TRef),
|
||||||
start_timer(St#st{next_gc_timer = undefined}).
|
start_timer(St#st{next_gc_timer = undefined}).
|
||||||
|
|
||||||
|
gc_enabled(St) ->
|
||||||
|
emqx_ft_conf:gc_interval(St#st.storage) > 0.
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
collect_garbage(Storage) ->
|
collect_garbage(Storage) ->
|
||||||
|
|
|
@ -58,8 +58,12 @@ end_per_suite(_Config) ->
|
||||||
set_special_configs(Config) ->
|
set_special_configs(Config) ->
|
||||||
fun
|
fun
|
||||||
(emqx_ft) ->
|
(emqx_ft) ->
|
||||||
|
Storage = emqx_ft_test_helpers:local_storage(Config),
|
||||||
emqx_ft_test_helpers:load_config(#{
|
emqx_ft_test_helpers:load_config(#{
|
||||||
storage => emqx_ft_test_helpers:local_storage(Config)
|
% NOTE
|
||||||
|
% Inhibit local fs GC to simulate it isn't fast enough to collect
|
||||||
|
% complete transfers.
|
||||||
|
storage => Storage#{gc => #{interval => 0}}
|
||||||
});
|
});
|
||||||
(_) ->
|
(_) ->
|
||||||
ok
|
ok
|
||||||
|
@ -107,14 +111,7 @@ mk_cluster_specs(Config) ->
|
||||||
{env, [{emqx, boot_modules, [broker, listeners]}]},
|
{env, [{emqx, boot_modules, [broker, listeners]}]},
|
||||||
{apps, [emqx_ft]},
|
{apps, [emqx_ft]},
|
||||||
{conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]},
|
{conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]},
|
||||||
{env_handler, fun
|
{env_handler, set_special_configs(Config)}
|
||||||
(emqx_ft) ->
|
|
||||||
emqx_ft_test_helpers:load_config(#{
|
|
||||||
storage => emqx_ft_test_helpers:local_storage(Config)
|
|
||||||
});
|
|
||||||
(_) ->
|
|
||||||
ok
|
|
||||||
end}
|
|
||||||
],
|
],
|
||||||
emqx_common_test_helpers:emqx_cluster(
|
emqx_common_test_helpers:emqx_cluster(
|
||||||
Specs,
|
Specs,
|
||||||
|
@ -549,22 +546,20 @@ t_unreliable_migrating_client(Config) ->
|
||||||
% twice. This is currently expected, files must be identical anyway.
|
% twice. This is currently expected, files must be identical anyway.
|
||||||
Node1Str = atom_to_list(Node1),
|
Node1Str = atom_to_list(Node1),
|
||||||
NodeSelfStr = atom_to_list(NodeSelf),
|
NodeSelfStr = atom_to_list(NodeSelf),
|
||||||
|
% TODO: this testcase is specific to local fs storage backend
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
[#{"node" := Node1Str}, #{"node" := NodeSelfStr}],
|
[#{"node" := Node1Str}, #{"node" := NodeSelfStr}],
|
||||||
lists:map(
|
lists:map(
|
||||||
fun(#{uri := URIString}) ->
|
fun(#{uri := URIString}) ->
|
||||||
#{query := QS} = uri_string:parse(URIString),
|
#{query := QS} = uri_string:parse(URIString),
|
||||||
uri_string:dissect_query(QS)
|
maps:from_list(uri_string:dissect_query(QS))
|
||||||
end,
|
end,
|
||||||
lists:sort(Exports)
|
lists:sort(Exports)
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
|
||||||
[
|
[
|
||||||
?assertEqual(
|
?assertEqual({ok, Payload}, read_export(Export))
|
||||||
{ok, Payload},
|
|
||||||
read_export(Export)
|
|
||||||
)
|
|
||||||
|| Export <- Exports
|
|| Export <- Exports
|
||||||
].
|
].
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue