diff --git a/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf index 15c42dcfa..941611424 100644 --- a/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf +++ b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf @@ -22,13 +22,24 @@ emqx_ft_schema { } } - local_storage_root { + local_storage_segments { + desc { + en: "Settings for local segments storage, which include uploaded transfer fragments and temporary data." + zh: "保存上传文件和临时数据的文件系统路径。" + } + label: { + en: "Local Segments Storage" + zh: "本地存储根" + } + } + + local_storage_segments_root { desc { en: "File system path to keep uploaded fragments and temporary data." zh: "保存上传文件和临时数据的文件系统路径。" } label: { - en: "Local Storage Root" + en: "Local Segments Storage Filesystem Root" zh: "本地存储根" } } @@ -67,7 +78,7 @@ emqx_ft_schema { } } - local_storage_gc { + local_storage_segments_gc { desc { en: "Garbage collection settings for the intermediate and temporary files in the local file system." zh: "" diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl index e925ee376..b0e73cda4 100644 --- a/apps/emqx_ft/src/emqx_ft_conf.erl +++ b/apps/emqx_ft/src/emqx_ft_conf.erl @@ -22,6 +22,7 @@ %% Accessors -export([storage/0]). +-export([segments_root/1]). -export([gc_interval/1]). -export([segments_ttl/1]). @@ -48,17 +49,27 @@ storage() -> emqx_config:get([file_transfer, storage], disabled). +-spec segments_root(_Storage) -> file:name(). +segments_root(_Storage) -> + Conf = assert_storage(local), + case emqx_map_lib:deep_find([segments, root], Conf) of + {ok, Root} -> + Root; + {not_found, _, _} -> + filename:join([emqx:data_dir(), file_transfer, segments]) + end. + -spec gc_interval(_Storage) -> milliseconds(). gc_interval(_Storage) -> Conf = assert_storage(local), - emqx_map_lib:deep_get([gc, interval], Conf). + emqx_map_lib:deep_get([segments, gc, interval], Conf). -spec segments_ttl(_Storage) -> {_Min :: seconds(), _Max :: seconds()}. segments_ttl(_Storage) -> Conf = assert_storage(local), { - emqx_map_lib:deep_get([gc, minimum_segments_ttl], Conf), - emqx_map_lib:deep_get([gc, maximum_segments_ttl], Conf) + emqx_map_lib:deep_get([segments, gc, minimum_segments_ttl], Conf), + emqx_map_lib:deep_get([segments, gc, maximum_segments_ttl], Conf) }. assert_storage(Type) -> diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl index 5c0a764a2..2abbe4c45 100644 --- a/apps/emqx_ft/src/emqx_ft_schema.erl +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -61,9 +61,9 @@ fields(local_storage) -> required => false, desc => ?DESC("local_type") }}, - {root, #{ - type => binary(), - desc => ?DESC("local_storage_root"), + {segments, #{ + type => ?REF(local_storage_segments), + desc => ?DESC("local_storage_segments"), required => false }}, {exporter, #{ @@ -72,10 +72,18 @@ fields(local_storage) -> ]), desc => ?DESC("local_storage_exporter"), required => true + }} + ]; +fields(local_storage_segments) -> + [ + {root, #{ + type => binary(), + desc => ?DESC("local_storage_segments_root"), + required => false }}, {gc, #{ - type => ?REF(local_storage_gc), - desc => ?DESC("local_storage_gc"), + type => ?REF(local_storage_segments_gc), + desc => ?DESC("local_storage_segments_gc"), required => false }} ]; @@ -93,7 +101,7 @@ fields(local_storage_exporter) -> required => false }} ]; -fields(local_storage_gc) -> +fields(local_storage_segments_gc) -> [ {interval, #{ type => emqx_schema:duration_ms(), @@ -122,10 +130,12 @@ desc(file_transfer) -> "File transfer settings"; desc(local_storage) -> "File transfer local storage settings"; +desc(local_storage_segments) -> + "File transfer local segments storage settings"; desc(local_storage_exporter) -> "Exporter settings for the File transfer local storage backend"; -desc(local_storage_gc) -> - "Garbage collection settings for the File transfer local storage backend". +desc(local_storage_segments_gc) -> + "Garbage collection settings for the File transfer local segments storage". schema(filemeta) -> #{ diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 10ca263da..f0bc9ac67 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -220,7 +220,7 @@ transfers(Storage) -> % TODO `Continuation` % There might be millions of transfers on the node, we need a protocol and % storage schema to iterate through them effectively. - ClientIds = try_list_dir(get_storage_root(Storage)), + ClientIds = try_list_dir(get_segments_root(Storage)), {ok, lists:foldl( fun(ClientId, Acc) -> transfers(Storage, ClientId, Acc) end, @@ -229,7 +229,7 @@ transfers(Storage) -> )}. transfers(Storage, ClientId, AccIn) -> - Dirname = filename:join(get_storage_root(Storage), ClientId), + Dirname = filename:join(get_segments_root(Storage), ClientId), case file:list_dir(Dirname) of {ok, FileIds} -> lists:foldl( @@ -307,7 +307,7 @@ break_segment_filename(Filename) -> mk_filedir(Storage, {ClientId, FileId}, SubDirs) -> filename:join([ - get_storage_root(Storage), + get_segments_root(Storage), emqx_ft_fs_util:escape_filename(ClientId), emqx_ft_fs_util:escape_filename(FileId) | SubDirs @@ -325,8 +325,8 @@ try_list_dir(Dirname) -> {error, _} -> [] end. -get_storage_root(Storage) -> - maps:get(root, Storage, filename:join([emqx:data_dir(), "ft", "transfers"])). +get_segments_root(Storage) -> + emqx_ft_conf:segments_root(Storage). -include_lib("kernel/include/file.hrl"). diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl index 258f3a7f3..63b0ab500 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_gc.erl @@ -65,7 +65,7 @@ collect(Storage, Transfer, Nodes) -> mk_server_ref(Storage) -> % TODO - {via, gproc, {n, l, {?MODULE, get_storage_root(Storage)}}}. + {via, gproc, {n, l, {?MODULE, get_segments_root(Storage)}}}. %% @@ -223,7 +223,7 @@ collect_transfer_directory(Storage, Transfer, Cutoff, Stats) -> end, case collect_empty_directory(Dirname, Filter, Stats) of {true, StatsNext} -> - collect_parents(Dirname, get_storage_root(Storage), StatsNext); + collect_parents(Dirname, get_segments_root(Storage), StatsNext); {false, StatsNext} -> StatsNext end. @@ -373,5 +373,5 @@ register_gcstat_error(Subject, Error, Stats = #gcstats{errors = Errors}) -> %% -get_storage_root(Storage) -> - maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")). +get_segments_root(Storage) -> + emqx_ft_conf:segments_root(Storage). diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 3cfcebf93..e365cba85 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -63,7 +63,10 @@ set_special_configs(Config) -> % NOTE % Inhibit local fs GC to simulate it isn't fast enough to collect % complete transfers. - storage => Storage#{gc => #{interval => 0}} + storage => emqx_map_lib:deep_merge( + Storage, + #{segments => #{gc => #{interval => 0}}} + ) }); (_) -> ok diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index 7c5d14b38..4b5610f51 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -248,7 +248,9 @@ exporter(Config) -> storage(Config) -> #{ type => local, - root => ?config(storage_root, Config), + segments => #{ + root => ?config(storage_root, Config) + }, exporter => #{ type => local, root => ?config(exports_root, Config) diff --git a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl index e43abf095..3811c1ef4 100644 --- a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl @@ -59,7 +59,12 @@ t_update_config(_Config) -> #{ <<"storage">> => #{ <<"type">> => <<"local">>, - <<"root">> => <<"/tmp/path">>, + <<"segments">> => #{ + <<"root">> => <<"/tmp/path">>, + <<"gc">> => #{ + <<"interval">> => <<"5m">> + } + }, <<"exporter">> => #{ <<"type">> => <<"local">>, <<"root">> => <<"/tmp/exports">> @@ -71,5 +76,9 @@ t_update_config(_Config) -> ), ?assertEqual( <<"/tmp/path">>, - emqx_config:get([file_transfer, storage, root]) + emqx_config:get([file_transfer, storage, segments, root]) + ), + ?assertEqual( + 5 * 60 * 1000, + emqx_ft_conf:gc_interval(emqx_ft_conf:storage()) ). diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl index e1574da0d..e3decf0f5 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl @@ -94,14 +94,7 @@ client_id(Config) -> atom_to_binary(?config(tc, Config), utf8). storage(Config) -> - #{ - type => local, - root => emqx_ft_test_helpers:root(Config, node(), ["transfers"]), - exporter => #{ - type => local, - root => emqx_ft_test_helpers:root(Config, node(), ["exports"]) - } - }. + emqx_ft_test_helpers:local_storage(Config). list_files(Config) -> {ok, Files} = emqx_ft_storage_fs:files(storage(Config)), diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl index 53a22b8b6..90039cd96 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl @@ -20,7 +20,6 @@ -compile(nowarn_export_all). -include_lib("emqx_ft/include/emqx_ft_storage_fs.hrl"). --include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). -include_lib("snabbkaffe/include/test_macros.hrl"). @@ -43,7 +42,9 @@ init_per_testcase(TC, Config) -> emqx_ft_test_helpers:load_config(#{ storage => #{ type => local, - root => emqx_ft_test_helpers:root(Config, node(), [TC, transfers]), + segments => #{ + root => emqx_ft_test_helpers:root(Config, node(), [TC, segments]) + }, exporter => #{ type => local, root => emqx_ft_test_helpers:root(Config, node(), [TC, exports]) @@ -66,7 +67,7 @@ end_per_testcase(_TC, _Config) -> t_gc_triggers_periodically(_Config) -> Interval = 500, - ok = emqx_config:put([file_transfer, storage, gc, interval], Interval), + ok = set_gc_config(interval, Interval), ok = emqx_ft_storage_fs_gc:reset(emqx_ft_conf:storage()), ?check_trace( timer:sleep(Interval * 3), @@ -104,9 +105,9 @@ t_gc_triggers_manually(_Config) -> t_gc_complete_transfers(_Config) -> Storage = emqx_ft_conf:storage(), - ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0), - ok = emqx_config:put([file_transfer, storage, gc, maximum_segments_ttl], 3), - ok = emqx_config:put([file_transfer, storage, gc, interval], 500), + ok = set_gc_config(minimum_segments_ttl, 0), + ok = set_gc_config(maximum_segments_ttl, 3), + ok = set_gc_config(interval, 500), ok = emqx_ft_storage_fs_gc:reset(Storage), Transfers = [ { @@ -194,8 +195,8 @@ t_gc_complete_transfers(_Config) -> ). t_gc_incomplete_transfers(_Config) -> - ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0), - ok = emqx_config:put([file_transfer, storage, gc, maximum_segments_ttl], 4), + ok = set_gc_config(minimum_segments_ttl, 0), + ok = set_gc_config(maximum_segments_ttl, 4), Storage = emqx_ft_conf:storage(), Transfers = [ { @@ -222,7 +223,7 @@ t_gc_incomplete_transfers(_Config) -> % 1. Start transfers, send all the segments but don't trigger completion. _ = emqx_misc:pmap(fun(Transfer) -> start_transfer(Storage, Transfer) end, Transfers), % 2. Enable periodic GC every 0.5 seconds. - ok = emqx_config:put([file_transfer, storage, gc, interval], 500), + ok = set_gc_config(interval, 500), ok = emqx_ft_storage_fs_gc:reset(Storage), % 3. First we need the first transfer to be collected. {ok, _} = ?block_until( @@ -265,8 +266,8 @@ t_gc_incomplete_transfers(_Config) -> ). t_gc_handling_errors(_Config) -> - ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0), - ok = emqx_config:put([file_transfer, storage, gc, maximum_segments_ttl], 0), + ok = set_gc_config(minimum_segments_ttl, 0), + ok = set_gc_config(maximum_segments_ttl, 0), Storage = emqx_ft_conf:storage(), Transfer1 = {<<"client1">>, mk_file_id()}, Transfer2 = {<<"client2">>, mk_file_id()}, @@ -322,6 +323,9 @@ t_gc_handling_errors(_Config) -> %% +set_gc_config(Name, Value) -> + emqx_config:put([file_transfer, storage, segments, gc, Name], Value). + start_transfer(Storage, {Transfer, Meta, Gen}) -> ?assertEqual( ok, diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl index a38242629..b8ee45b15 100644 --- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl +++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl @@ -46,8 +46,13 @@ stop_additional_node(Node) -> local_storage(Config) -> #{ type => local, - root => root(Config, node(), [transfers]), - exporter => #{type => local, root => root(Config, node(), [exports])} + segments => #{ + root => root(Config, node(), [segments]) + }, + exporter => #{ + type => local, + root => root(Config, node(), [exports]) + } }. load_config(Config) ->