diff --git a/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf index 576d7d8fe..dd2d2a1dc 100644 --- a/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf +++ b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf @@ -33,4 +33,39 @@ emqx_ft_schema { } } + local_storage_gc { + desc { + en: "Garbage collection settings for the intermediate and temporary files in the local file system." + zh: "" + } + label: { + en: "Local Storage GC" + zh: "" + } + } + + storage_gc_interval { + desc { + en: "Interval of periodic garbage collection." + zh: "" + } + label: { + en: "GC Interval" + zh: "" + } + } + + storage_gc_max_segments_ttl { + desc { + en: "Maximum TTL of a segment kept in the local file system.
" + "This is a hard limit: no segment will outlive this TTL, even if some file transfer specifies a " + "TTL more than that." + zh: "" + } + label: { + en: "GC Interval" + zh: "" + } + } + } diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl index 444462716..e925ee376 100644 --- a/apps/emqx_ft/src/emqx_ft_conf.erl +++ b/apps/emqx_ft/src/emqx_ft_conf.erl @@ -50,17 +50,25 @@ storage() -> -spec gc_interval(_Storage) -> milliseconds(). gc_interval(_Storage) -> - % TODO: config wiring - application:get_env(emqx_ft, gc_interval, timer:minutes(10)). + Conf = assert_storage(local), + emqx_map_lib:deep_get([gc, interval], Conf). -spec segments_ttl(_Storage) -> {_Min :: seconds(), _Max :: seconds()}. segments_ttl(_Storage) -> - % TODO: config wiring + Conf = assert_storage(local), { - application:get_env(emqx_ft, min_segments_ttl, 60), - application:get_env(emqx_ft, max_segments_ttl, 72 * 3600) + emqx_map_lib:deep_get([gc, minimum_segments_ttl], Conf), + emqx_map_lib:deep_get([gc, maximum_segments_ttl], Conf) }. +assert_storage(Type) -> + case storage() of + Conf = #{type := Type} -> + Conf; + Conf -> + error({inapplicable, Conf}) + end. + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl index f17c957a9..d2b2b9299 100644 --- a/apps/emqx_ft/src/emqx_ft_schema.erl +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -65,13 +65,44 @@ fields(local_storage) -> type => binary(), desc => ?DESC("local_storage_root"), required => false + }}, + {gc, #{ + type => hoconsc:ref(?MODULE, local_storage_gc), + desc => ?DESC("local_storage_gc"), + required => false + }} + ]; +fields(local_storage_gc) -> + [ + {interval, #{ + type => emqx_schema:duration_ms(), + desc => ?DESC("storage_gc_interval"), + required => false, + default => "1h" + }}, + {maximum_segments_ttl, #{ + type => emqx_schema:duration_s(), + desc => ?DESC("storage_gc_max_segments_ttl"), + required => false, + default => "24h" + }}, + {minimum_segments_ttl, #{ + type => emqx_schema:duration_s(), + % desc => ?DESC("storage_gc_min_segments_ttl"), + required => false, + default => "5m", + % NOTE + % This setting does not seem to be useful to an end-user. + hidden => true }} ]. desc(file_transfer) -> "File transfer settings"; desc(local_storage) -> - "File transfer local storage settings". + "File transfer local storage settings"; +desc(local_storage_gc) -> + "Garbage collection settings for the File transfer local storage backend". schema(filemeta) -> #{ diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl index a0b969edd..a83d915e6 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl @@ -37,16 +37,22 @@ end_per_suite(_Config) -> ok. init_per_testcase(TC, Config) -> - _ = application:unset_env(emqx_ft, gc_interval), - _ = application:unset_env(emqx_ft, min_segments_ttl), - _ = application:unset_env(emqx_ft, max_segments_ttl), ok = emqx_common_test_helpers:start_app( emqx_ft, fun(emqx_ft) -> - ok = emqx_config:put([file_transfer, storage], #{ - type => local, - root => mk_root(TC, Config) - }) + emqx_common_test_helpers:load_config( + emqx_ft_schema, + iolist_to_binary([ + "file_transfer {" + " storage = {" + " type = \"local\"," + " root = \"", + mk_root(TC, Config), + "\"" + " }" + "}" + ]) + ) end ), Config. @@ -64,7 +70,7 @@ mk_root(TC, Config) -> t_gc_triggers_periodically(_Config) -> Interval = 500, - ok = application:set_env(emqx_ft, gc_interval, Interval), + ok = emqx_config:put([file_transfer, storage, gc, interval], Interval), ok = emqx_ft_storage_fs_gc:reset(emqx_ft_conf:storage()), ?check_trace( timer:sleep(Interval * 3), @@ -165,8 +171,8 @@ t_gc_complete_transfers(_Config) -> ). t_gc_incomplete_transfers(_Config) -> - _ = application:set_env(emqx_ft, min_segments_ttl, 0), - _ = application:set_env(emqx_ft, max_segments_ttl, 4), + ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0), + ok = emqx_config:put([file_transfer, storage, gc, maximum_segments_ttl], 4), Storage = emqx_ft_conf:storage(), Transfers = [ { @@ -195,7 +201,7 @@ t_gc_incomplete_transfers(_Config) -> ?check_trace( begin % 2. Enable periodic GC every 0.5 seconds. - ok = application:set_env(emqx_ft, gc_interval, 500), + ok = emqx_config:put([file_transfer, storage, gc, interval], 500), ok = emqx_ft_storage_fs_gc:reset(Storage), % 3. First we need the first transfer to be collected. {ok, _} = ?block_until( @@ -241,8 +247,8 @@ t_gc_incomplete_transfers(_Config) -> ). t_gc_handling_errors(_Config) -> - _ = application:set_env(emqx_ft, min_segments_ttl, 0), - _ = application:set_env(emqx_ft, max_segments_ttl, 0), + ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0), + ok = emqx_config:put([file_transfer, storage, gc, maximum_segments_ttl], 0), Storage = emqx_ft_conf:storage(), Transfer1 = {<<"client1">>, mk_file_id()}, Transfer2 = {<<"client2">>, mk_file_id()},