fix(ft-conf): separate local segments storage settings

To make things less ambiguous.
This commit is contained in:
Andrew Mayorov 2023-03-28 18:49:02 +03:00 committed by Ilya Averyanov
parent 45e3b62dc4
commit 6ad7ce55d2
11 changed files with 96 additions and 48 deletions

View File

@ -22,13 +22,24 @@ emqx_ft_schema {
} }
} }
local_storage_root { local_storage_segments {
desc {
en: "Settings for local segments storage, which include uploaded transfer fragments and temporary data."
zh: "保存上传文件和临时数据的文件系统路径。"
}
label: {
en: "Local Segments Storage"
zh: "本地存储根"
}
}
local_storage_segments_root {
desc { desc {
en: "File system path to keep uploaded fragments and temporary data." en: "File system path to keep uploaded fragments and temporary data."
zh: "保存上传文件和临时数据的文件系统路径。" zh: "保存上传文件和临时数据的文件系统路径。"
} }
label: { label: {
en: "Local Storage Root" en: "Local Segments Storage Filesystem Root"
zh: "本地存储根" zh: "本地存储根"
} }
} }
@ -67,7 +78,7 @@ emqx_ft_schema {
} }
} }
local_storage_gc { local_storage_segments_gc {
desc { desc {
en: "Garbage collection settings for the intermediate and temporary files in the local file system." en: "Garbage collection settings for the intermediate and temporary files in the local file system."
zh: "" zh: ""

View File

@ -22,6 +22,7 @@
%% Accessors %% Accessors
-export([storage/0]). -export([storage/0]).
-export([segments_root/1]).
-export([gc_interval/1]). -export([gc_interval/1]).
-export([segments_ttl/1]). -export([segments_ttl/1]).
@ -48,17 +49,27 @@
storage() -> storage() ->
emqx_config:get([file_transfer, storage], disabled). emqx_config:get([file_transfer, storage], disabled).
-spec segments_root(_Storage) -> file:name().
segments_root(_Storage) ->
Conf = assert_storage(local),
case emqx_map_lib:deep_find([segments, root], Conf) of
{ok, Root} ->
Root;
{not_found, _, _} ->
filename:join([emqx:data_dir(), file_transfer, segments])
end.
-spec gc_interval(_Storage) -> milliseconds(). -spec gc_interval(_Storage) -> milliseconds().
gc_interval(_Storage) -> gc_interval(_Storage) ->
Conf = assert_storage(local), Conf = assert_storage(local),
emqx_map_lib:deep_get([gc, interval], Conf). emqx_map_lib:deep_get([segments, gc, interval], Conf).
-spec segments_ttl(_Storage) -> {_Min :: seconds(), _Max :: seconds()}. -spec segments_ttl(_Storage) -> {_Min :: seconds(), _Max :: seconds()}.
segments_ttl(_Storage) -> segments_ttl(_Storage) ->
Conf = assert_storage(local), Conf = assert_storage(local),
{ {
emqx_map_lib:deep_get([gc, minimum_segments_ttl], Conf), emqx_map_lib:deep_get([segments, gc, minimum_segments_ttl], Conf),
emqx_map_lib:deep_get([gc, maximum_segments_ttl], Conf) emqx_map_lib:deep_get([segments, gc, maximum_segments_ttl], Conf)
}. }.
assert_storage(Type) -> assert_storage(Type) ->

View File

@ -61,9 +61,9 @@ fields(local_storage) ->
required => false, required => false,
desc => ?DESC("local_type") desc => ?DESC("local_type")
}}, }},
{root, #{ {segments, #{
type => binary(), type => ?REF(local_storage_segments),
desc => ?DESC("local_storage_root"), desc => ?DESC("local_storage_segments"),
required => false required => false
}}, }},
{exporter, #{ {exporter, #{
@ -72,10 +72,18 @@ fields(local_storage) ->
]), ]),
desc => ?DESC("local_storage_exporter"), desc => ?DESC("local_storage_exporter"),
required => true required => true
}}
];
fields(local_storage_segments) ->
[
{root, #{
type => binary(),
desc => ?DESC("local_storage_segments_root"),
required => false
}}, }},
{gc, #{ {gc, #{
type => ?REF(local_storage_gc), type => ?REF(local_storage_segments_gc),
desc => ?DESC("local_storage_gc"), desc => ?DESC("local_storage_segments_gc"),
required => false required => false
}} }}
]; ];
@ -93,7 +101,7 @@ fields(local_storage_exporter) ->
required => false required => false
}} }}
]; ];
fields(local_storage_gc) -> fields(local_storage_segments_gc) ->
[ [
{interval, #{ {interval, #{
type => emqx_schema:duration_ms(), type => emqx_schema:duration_ms(),
@ -122,10 +130,12 @@ desc(file_transfer) ->
"File transfer settings"; "File transfer settings";
desc(local_storage) -> desc(local_storage) ->
"File transfer local storage settings"; "File transfer local storage settings";
desc(local_storage_segments) ->
"File transfer local segments storage settings";
desc(local_storage_exporter) -> desc(local_storage_exporter) ->
"Exporter settings for the File transfer local storage backend"; "Exporter settings for the File transfer local storage backend";
desc(local_storage_gc) -> desc(local_storage_segments_gc) ->
"Garbage collection settings for the File transfer local storage backend". "Garbage collection settings for the File transfer local segments storage".
schema(filemeta) -> schema(filemeta) ->
#{ #{

View File

@ -220,7 +220,7 @@ transfers(Storage) ->
% TODO `Continuation` % TODO `Continuation`
% There might be millions of transfers on the node, we need a protocol and % There might be millions of transfers on the node, we need a protocol and
% storage schema to iterate through them effectively. % storage schema to iterate through them effectively.
ClientIds = try_list_dir(get_storage_root(Storage)), ClientIds = try_list_dir(get_segments_root(Storage)),
{ok, {ok,
lists:foldl( lists:foldl(
fun(ClientId, Acc) -> transfers(Storage, ClientId, Acc) end, fun(ClientId, Acc) -> transfers(Storage, ClientId, Acc) end,
@ -229,7 +229,7 @@ transfers(Storage) ->
)}. )}.
transfers(Storage, ClientId, AccIn) -> transfers(Storage, ClientId, AccIn) ->
Dirname = filename:join(get_storage_root(Storage), ClientId), Dirname = filename:join(get_segments_root(Storage), ClientId),
case file:list_dir(Dirname) of case file:list_dir(Dirname) of
{ok, FileIds} -> {ok, FileIds} ->
lists:foldl( lists:foldl(
@ -307,7 +307,7 @@ break_segment_filename(Filename) ->
mk_filedir(Storage, {ClientId, FileId}, SubDirs) -> mk_filedir(Storage, {ClientId, FileId}, SubDirs) ->
filename:join([ filename:join([
get_storage_root(Storage), get_segments_root(Storage),
emqx_ft_fs_util:escape_filename(ClientId), emqx_ft_fs_util:escape_filename(ClientId),
emqx_ft_fs_util:escape_filename(FileId) emqx_ft_fs_util:escape_filename(FileId)
| SubDirs | SubDirs
@ -325,8 +325,8 @@ try_list_dir(Dirname) ->
{error, _} -> [] {error, _} -> []
end. end.
get_storage_root(Storage) -> get_segments_root(Storage) ->
maps:get(root, Storage, filename:join([emqx:data_dir(), "ft", "transfers"])). emqx_ft_conf:segments_root(Storage).
-include_lib("kernel/include/file.hrl"). -include_lib("kernel/include/file.hrl").

View File

@ -65,7 +65,7 @@ collect(Storage, Transfer, Nodes) ->
mk_server_ref(Storage) -> mk_server_ref(Storage) ->
% TODO % TODO
{via, gproc, {n, l, {?MODULE, get_storage_root(Storage)}}}. {via, gproc, {n, l, {?MODULE, get_segments_root(Storage)}}}.
%% %%
@ -223,7 +223,7 @@ collect_transfer_directory(Storage, Transfer, Cutoff, Stats) ->
end, end,
case collect_empty_directory(Dirname, Filter, Stats) of case collect_empty_directory(Dirname, Filter, Stats) of
{true, StatsNext} -> {true, StatsNext} ->
collect_parents(Dirname, get_storage_root(Storage), StatsNext); collect_parents(Dirname, get_segments_root(Storage), StatsNext);
{false, StatsNext} -> {false, StatsNext} ->
StatsNext StatsNext
end. end.
@ -373,5 +373,5 @@ register_gcstat_error(Subject, Error, Stats = #gcstats{errors = Errors}) ->
%% %%
get_storage_root(Storage) -> get_segments_root(Storage) ->
maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")). emqx_ft_conf:segments_root(Storage).

View File

@ -63,7 +63,10 @@ set_special_configs(Config) ->
% NOTE % NOTE
% Inhibit local fs GC to simulate it isn't fast enough to collect % Inhibit local fs GC to simulate it isn't fast enough to collect
% complete transfers. % complete transfers.
storage => Storage#{gc => #{interval => 0}} storage => emqx_map_lib:deep_merge(
Storage,
#{segments => #{gc => #{interval => 0}}}
)
}); });
(_) -> (_) ->
ok ok

View File

@ -248,7 +248,9 @@ exporter(Config) ->
storage(Config) -> storage(Config) ->
#{ #{
type => local, type => local,
root => ?config(storage_root, Config), segments => #{
root => ?config(storage_root, Config)
},
exporter => #{ exporter => #{
type => local, type => local,
root => ?config(exports_root, Config) root => ?config(exports_root, Config)

View File

@ -59,7 +59,12 @@ t_update_config(_Config) ->
#{ #{
<<"storage">> => #{ <<"storage">> => #{
<<"type">> => <<"local">>, <<"type">> => <<"local">>,
<<"root">> => <<"/tmp/path">>, <<"segments">> => #{
<<"root">> => <<"/tmp/path">>,
<<"gc">> => #{
<<"interval">> => <<"5m">>
}
},
<<"exporter">> => #{ <<"exporter">> => #{
<<"type">> => <<"local">>, <<"type">> => <<"local">>,
<<"root">> => <<"/tmp/exports">> <<"root">> => <<"/tmp/exports">>
@ -71,5 +76,9 @@ t_update_config(_Config) ->
), ),
?assertEqual( ?assertEqual(
<<"/tmp/path">>, <<"/tmp/path">>,
emqx_config:get([file_transfer, storage, root]) emqx_config:get([file_transfer, storage, segments, root])
),
?assertEqual(
5 * 60 * 1000,
emqx_ft_conf:gc_interval(emqx_ft_conf:storage())
). ).

View File

@ -94,14 +94,7 @@ client_id(Config) ->
atom_to_binary(?config(tc, Config), utf8). atom_to_binary(?config(tc, Config), utf8).
storage(Config) -> storage(Config) ->
#{ emqx_ft_test_helpers:local_storage(Config).
type => local,
root => emqx_ft_test_helpers:root(Config, node(), ["transfers"]),
exporter => #{
type => local,
root => emqx_ft_test_helpers:root(Config, node(), ["exports"])
}
}.
list_files(Config) -> list_files(Config) ->
{ok, Files} = emqx_ft_storage_fs:files(storage(Config)), {ok, Files} = emqx_ft_storage_fs:files(storage(Config)),

View File

@ -20,7 +20,6 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("emqx_ft/include/emqx_ft_storage_fs.hrl"). -include_lib("emqx_ft/include/emqx_ft_storage_fs.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl"). -include_lib("stdlib/include/assert.hrl").
-include_lib("snabbkaffe/include/test_macros.hrl"). -include_lib("snabbkaffe/include/test_macros.hrl").
@ -43,7 +42,9 @@ init_per_testcase(TC, Config) ->
emqx_ft_test_helpers:load_config(#{ emqx_ft_test_helpers:load_config(#{
storage => #{ storage => #{
type => local, type => local,
root => emqx_ft_test_helpers:root(Config, node(), [TC, transfers]), segments => #{
root => emqx_ft_test_helpers:root(Config, node(), [TC, segments])
},
exporter => #{ exporter => #{
type => local, type => local,
root => emqx_ft_test_helpers:root(Config, node(), [TC, exports]) root => emqx_ft_test_helpers:root(Config, node(), [TC, exports])
@ -66,7 +67,7 @@ end_per_testcase(_TC, _Config) ->
t_gc_triggers_periodically(_Config) -> t_gc_triggers_periodically(_Config) ->
Interval = 500, Interval = 500,
ok = emqx_config:put([file_transfer, storage, gc, interval], Interval), ok = set_gc_config(interval, Interval),
ok = emqx_ft_storage_fs_gc:reset(emqx_ft_conf:storage()), ok = emqx_ft_storage_fs_gc:reset(emqx_ft_conf:storage()),
?check_trace( ?check_trace(
timer:sleep(Interval * 3), timer:sleep(Interval * 3),
@ -104,9 +105,9 @@ t_gc_triggers_manually(_Config) ->
t_gc_complete_transfers(_Config) -> t_gc_complete_transfers(_Config) ->
Storage = emqx_ft_conf:storage(), Storage = emqx_ft_conf:storage(),
ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0), ok = set_gc_config(minimum_segments_ttl, 0),
ok = emqx_config:put([file_transfer, storage, gc, maximum_segments_ttl], 3), ok = set_gc_config(maximum_segments_ttl, 3),
ok = emqx_config:put([file_transfer, storage, gc, interval], 500), ok = set_gc_config(interval, 500),
ok = emqx_ft_storage_fs_gc:reset(Storage), ok = emqx_ft_storage_fs_gc:reset(Storage),
Transfers = [ Transfers = [
{ {
@ -194,8 +195,8 @@ t_gc_complete_transfers(_Config) ->
). ).
t_gc_incomplete_transfers(_Config) -> t_gc_incomplete_transfers(_Config) ->
ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0), ok = set_gc_config(minimum_segments_ttl, 0),
ok = emqx_config:put([file_transfer, storage, gc, maximum_segments_ttl], 4), ok = set_gc_config(maximum_segments_ttl, 4),
Storage = emqx_ft_conf:storage(), Storage = emqx_ft_conf:storage(),
Transfers = [ Transfers = [
{ {
@ -222,7 +223,7 @@ t_gc_incomplete_transfers(_Config) ->
% 1. Start transfers, send all the segments but don't trigger completion. % 1. Start transfers, send all the segments but don't trigger completion.
_ = emqx_misc:pmap(fun(Transfer) -> start_transfer(Storage, Transfer) end, Transfers), _ = emqx_misc:pmap(fun(Transfer) -> start_transfer(Storage, Transfer) end, Transfers),
% 2. Enable periodic GC every 0.5 seconds. % 2. Enable periodic GC every 0.5 seconds.
ok = emqx_config:put([file_transfer, storage, gc, interval], 500), ok = set_gc_config(interval, 500),
ok = emqx_ft_storage_fs_gc:reset(Storage), ok = emqx_ft_storage_fs_gc:reset(Storage),
% 3. First we need the first transfer to be collected. % 3. First we need the first transfer to be collected.
{ok, _} = ?block_until( {ok, _} = ?block_until(
@ -265,8 +266,8 @@ t_gc_incomplete_transfers(_Config) ->
). ).
t_gc_handling_errors(_Config) -> t_gc_handling_errors(_Config) ->
ok = emqx_config:put([file_transfer, storage, gc, minimum_segments_ttl], 0), ok = set_gc_config(minimum_segments_ttl, 0),
ok = emqx_config:put([file_transfer, storage, gc, maximum_segments_ttl], 0), ok = set_gc_config(maximum_segments_ttl, 0),
Storage = emqx_ft_conf:storage(), Storage = emqx_ft_conf:storage(),
Transfer1 = {<<"client1">>, mk_file_id()}, Transfer1 = {<<"client1">>, mk_file_id()},
Transfer2 = {<<"client2">>, mk_file_id()}, Transfer2 = {<<"client2">>, mk_file_id()},
@ -322,6 +323,9 @@ t_gc_handling_errors(_Config) ->
%% %%
set_gc_config(Name, Value) ->
emqx_config:put([file_transfer, storage, segments, gc, Name], Value).
start_transfer(Storage, {Transfer, Meta, Gen}) -> start_transfer(Storage, {Transfer, Meta, Gen}) ->
?assertEqual( ?assertEqual(
ok, ok,

View File

@ -46,8 +46,13 @@ stop_additional_node(Node) ->
local_storage(Config) -> local_storage(Config) ->
#{ #{
type => local, type => local,
root => root(Config, node(), [transfers]), segments => #{
exporter => #{type => local, root => root(Config, node(), [exports])} root => root(Config, node(), [segments])
},
exporter => #{
type => local,
root => root(Config, node(), [exports])
}
}. }.
load_config(Config) -> load_config(Config) ->