%% emqx/apps/emqx_ft/src/emqx_ft_storage_fs.erl
%%
%% 505 lines
%% 16 KiB
%% Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Filesystem storage backend
%%
%% NOTE
%% If you plan to change storage layout please consult `emqx_ft_storage_fs_gc`
%% to see how much it would break or impair GC.
-module(emqx_ft_storage_fs).
-behaviour(emqx_ft_storage).
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-export([child_spec/1]).
% Segments-related API
-export([store_filemeta/3]).
-export([store_segment/3]).
-export([read_filemeta/2]).
-export([list/3]).
-export([pread/5]).
-export([lookup_local_assembler/1]).
-export([assemble/4]).
-export([transfers/1]).
% GC API
% TODO: This is quickly becoming hairy.
-export([get_root/1]).
-export([get_subdir/2]).
-export([get_subdir/3]).
-export([files/2]).
-export([start/1]).
-export([stop/1]).
-export([update_config/2]).
-export_type([storage/0]).
-export_type([filefrag/1]).
-export_type([filefrag/0]).
-export_type([transferinfo/0]).
-export_type([segmentinfo/0]).
-export_type([file_error/0]).
-type transfer() :: emqx_ft:transfer().
-type offset() :: emqx_ft:offset().
-type filemeta() :: emqx_ft:filemeta().
-type segment() :: emqx_ft:segment().
%% Offset and size of a stored segment, as parsed back from the segment's
%% file name (`SEG.<offset>.<size>`).
-type segmentinfo() :: #{
offset := offset(),
size := _Bytes :: non_neg_integer()
}.
%% Per-transfer information reported by `transfers/1`. The `filemeta` key is
%% absent when no manifest has been stored for the transfer yet.
-type transferinfo() :: #{
filemeta => filemeta()
}.
% TODO naming
%% A file found in a transfer's fragments directory: its path, timestamp,
%% size in bytes and the fragment parsed out of it.
%% NOTE(review): `mk_filefrag/4` fills `timestamp` from `#file_info.mtime` as
%% returned by `file:read_file_info/1`, which is a local datetime tuple by
%% default, not an epoch second as declared here. Confirm which representation
%% consumers rely on.
-type filefrag(T) :: #{
path := file:name(),
timestamp := emqx_utils_calendar:epoch_second(),
size := _Bytes :: non_neg_integer(),
fragment := T
}.
-type filefrag() :: filefrag(
{filemeta, filemeta()}
| {segment, segmentinfo()}
).
%% On-disk layout of a transfer directory `<root>/<clientid>/<fileid>/`
%% (client and file ids are escaped):
%%   frags/  the manifest (`MANIFEST.json`) and the segments (`SEG.*`);
%%   tmp/    files being written, before they are renamed into `frags/`.
-define(FRAGDIR, frags).
-define(TEMPDIR, tmp).
-define(MANIFEST, "MANIFEST.json").
-define(SEGMENT, "SEG").
%% Segments storage configuration: root directory and GC settings.
-type segments() :: #{
root := file:name(),
gc := #{
interval := non_neg_integer(),
maximum_segments_ttl := non_neg_integer(),
minimum_segments_ttl := non_neg_integer()
}
}.
%% Configuration of the local filesystem storage backend.
-type storage() :: #{
type := 'local',
enable := true,
segments := segments(),
exporter := emqx_ft_storage_exporter:exporter_conf()
}.
%% Errors that the file operations of this backend may return.
-type file_error() ::
file:posix()
%% Filename is incompatible with the backing filesystem.
| badarg
%% System limit (e.g. number of ports) reached.
| system_limit.
%% Child specs of the processes this storage backend needs running.
%% Currently this is only the segments GC, which is started with the whole
%% storage configuration as its argument.
-spec child_spec(storage()) ->
    [supervisor:child_spec()].
child_spec(Storage) ->
    GcSpec = #{
        id => emqx_ft_storage_fs_gc,
        start => {emqx_ft_storage_fs_gc, start_link, [Storage]},
        restart => permanent
    },
    [GcSpec].
%% Store the transfer's filemeta (manifest) in the backing filesystem.
%% Atomic operation: the manifest is written to a temporary file and then
%% renamed into place.
%%
%% Outcomes:
%% * The identical filemeta is already stored: the file's mtime is bumped and
%%   `ok` is returned.
%% * A different filemeta is already stored: `{error, filemeta_conflict}`.
%% * No usable filemeta is stored (missing or corrupted): the new one is
%%   written.
%% * Any other read error is returned unchanged.
-spec store_filemeta(storage(), transfer(), filemeta()) ->
    % Quota? Some lower level errors?
    ok | {error, filemeta_conflict} | {error, file_error()}.
store_filemeta(Storage, Transfer, Meta) ->
    Filepath = mk_filepath(Storage, Transfer, get_subdirs_for(fragment), ?MANIFEST),
    case read_file(Filepath, fun decode_filemeta/1) of
        {ok, Meta} ->
            % Same manifest re-submitted. Refreshing the mtime presumably keeps
            % the transfer looking recent to GC; confirm against
            % `emqx_ft_storage_fs_gc`.
            _ = touch_file(Filepath),
            ok;
        {ok, Conflict} ->
            ?SLOG(warning, #{
                msg => "filemeta_conflict", transfer => Transfer, new => Meta, old => Conflict
            }),
            % TODO
            % We won't see conflicts in case of concurrent `store_filemeta`
            % requests. It's rather odd scenario so it's fine not to worry
            % about it too much now.
            {error, filemeta_conflict};
        {error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent ->
            write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta));
        {error, _} = Error ->
            Error
    end.
%% Store one segment of a transfer in the backing filesystem.
%% Atomic operation: the content goes to a temporary file, which is then
%% renamed to a file name encoding the segment's offset and size.
-spec store_segment(storage(), transfer(), segment()) ->
    % Where does the checksum get verified? Upper level probably.
    % Quota? Some lower level errors?
    ok | {error, file_error()}.
store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
    FragDir = get_subdirs_for(fragment),
    SegmentPath = mk_filepath(Storage, Transfer, FragDir, mk_segment_filename(Segment)),
    write_file_atomic(Storage, Transfer, SegmentPath, Content).
%% Read and decode the filemeta (manifest) stored for the transfer.
-spec read_filemeta(storage(), transfer()) ->
    {ok, filemeta()} | {error, corrupted} | {error, file_error()}.
read_filemeta(Storage, Transfer) ->
    FragDir = get_subdirs_for(fragment),
    ManifestPath = mk_filepath(Storage, Transfer, FragDir, ?MANIFEST),
    read_file(ManifestPath, fun decode_filemeta/1).
%% List the fragments (filemeta and segments) stored for the transfer.
%% A missing fragments directory is reported as an empty list: nothing has
%% been stored yet. Files whose names match neither the manifest nor a
%% segment, and fragments whose contents fail to parse, are left out (see
%% `mk_filefrag/2`).
-spec list(storage(), transfer(), _What :: fragment) ->
    % Some lower level errors? {error, notfound}?
    % Result will contain zero or only one filemeta.
    {ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]}
    | {error, file_error()}.
list(Storage, Transfer, What = fragment) ->
    FragDir = mk_filedir(Storage, Transfer, get_subdirs_for(What)),
    case file:list_dir(FragDir) of
        {error, enoent} ->
            {ok, []};
        {error, _} = Error ->
            Error;
        {ok, Names} ->
            {ok, filtermap_files(fun mk_filefrag/2, FragDir, Names)}
    end.
%% Read up to `Size` bytes at `Offset` from the file backing a fragment.
%% Storage and transfer are unused: the fragment already carries its path.
%% A read starting at or past the end of the file (so any read of an empty
%% file) yields `{error, eof}`; a read crossing the end returns what is there.
-spec pread(storage(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
    {ok, _Content :: iodata()} | {error, eof} | {error, file_error()}.
pread(_Storage, _Transfer, Frag, Offset, Size) ->
    Filepath = maps:get(path, Frag),
    case file:open(Filepath, [read, raw, binary]) of
        {ok, IoDevice} ->
            % `after` guarantees the descriptor is closed even if the read
            % raises, so a caller that survives the exception leaks nothing.
            try file:pread(IoDevice, Offset, Size) of
                {ok, Content} ->
                    {ok, Content};
                eof ->
                    {error, eof};
                {error, Reason} ->
                    {error, Reason}
            after
                ok = file:close(IoDevice)
            end;
        {error, Reason} ->
            {error, Reason}
    end.
%% Get the transfer's file assembled out of its fragments.
%% Sources are consulted in order, and the first answer other than
%% `{error, not_found}` wins: an assembler on this node, an assembler on
%% another running node, an already exported file (answers `ok`). As a last
%% resort a new local assembler is started.
-spec assemble(storage(), transfer(), emqx_ft:bytes(), emqx_ft:finopts()) ->
    {async, _Assembler :: pid()} | ok | {error, _TODO}.
assemble(Storage, Transfer, Size, Opts) ->
    lookup_assembler([
        fun() -> lookup_local_assembler(Transfer) end,
        fun() -> lookup_remote_assembler(Transfer) end,
        fun() -> check_if_already_exported(Storage, Transfer) end,
        fun() -> ensure_local_assembler(Storage, Transfer, Size, Opts) end
    ]).
%%
%% List exported files matching `Query`, as reported by the exporter.
files(Storage, Query) -> emqx_ft_storage_exporter:list(Storage, Query).
%%
%% Apply a storage configuration change: first reset the segments GC with the
%% new configuration, then let the exporter handle the old -> new transition.
update_config(StorageOld, StorageNew) ->
    % NOTE: each reset restarts the GC timer, so frequent configuration
    % changes could postpone GC indefinitely.
    ok = emqx_ft_storage_fs_gc:reset(StorageNew),
    emqx_ft_storage_exporter:update_config(StorageOld, StorageNew).
%% Start the backend: start its supporting children under `emqx_ft_sup`, then
%% initialise the exporter with this configuration. Crashes (badmatch) if a
%% child fails to start or the exporter update does not return `ok`.
start(Storage) ->
    StartChild = fun(ChildSpec) ->
        {ok, _Child} = supervisor:start_child(emqx_ft_sup, ChildSpec),
        ok
    end,
    ok = lists:foreach(StartChild, child_spec(Storage)),
    ok = emqx_ft_storage_exporter:update_config(undefined, Storage).
%% Stop the backend, reversing `start/1`: detach the exporter first, then
%% terminate and remove the supporting children from `emqx_ft_sup`.
%% The termination result is ignored; the deletion is asserted to succeed.
stop(Storage) ->
    ok = emqx_ft_storage_exporter:update_config(Storage, undefined),
    RemoveChild = fun(#{id := ChildId}) ->
        _ = supervisor:terminate_child(emqx_ft_sup, ChildId),
        ok = supervisor:delete_child(emqx_ft_sup, ChildId)
    end,
    ok = lists:foreach(RemoveChild, child_spec(Storage)).
%%
%% Call lookup sources in order. A source answering `{error, not_found}`
%% hands over to the next one; any other answer is returned immediately.
%% The last source's answer is returned whatever it is.
lookup_assembler([Source | Rest]) ->
    case {Source(), Rest} of
        {{error, not_found}, [_ | _]} -> lookup_assembler(Rest);
        {Result, _} -> Result
    end.
%% Answer `ok` when the exporter already lists files for this transfer, and
%% garbage collect the local fragments in that case. Otherwise answer
%% `{error, not_found}` so that the next lookup source is tried.
check_if_already_exported(Storage, Transfer) ->
    Query = #{transfer => Transfer},
    case files(Storage, Query) of
        {ok, #{items := [_ | _]}} ->
            % NOTE: we don't know coverage here, let's just clean up locally.
            _ = emqx_ft_storage_fs_gc:collect(Storage, Transfer, [node()]),
            ok;
        _NothingExported ->
            {error, not_found}
    end.
%% Find an assembler process for the transfer on this node.
lookup_local_assembler(Transfer) ->
    Found = emqx_ft_assembler:where(Transfer),
    if
        is_pid(Found) -> {async, Found};
        true -> {error, not_found}
    end.
%% Ask every other running node for an assembler of the transfer and pick
%% the first one reported; replies of any other shape are ignored.
lookup_remote_assembler(Transfer) ->
    Peers = emqx:running_nodes() -- [node()],
    Replies = emqx_ft_storage_fs_proto_v1:list_assemblers(Peers, Transfer),
    case [Pid || {ok, {async, Pid}} <- Replies] of
        [First | _] -> {async, First};
        [] -> {error, not_found}
    end.
%% Ensure an assembler child for the transfer exists under the local
%% assembler supervisor, and answer with its pid. Crashes if it cannot.
ensure_local_assembler(Storage, Transfer, Size, Opts) ->
    {ok, AssemblerPid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size, Opts),
    {async, AssemblerPid}.
%% List every transfer found under the storage root, with its filemeta when
%% one is stored. Transfers are discovered from the directory layout
%% `<root>/<clientid>/<fileid>` (escaped names). An unreadable root yields
%% an empty result.
-spec transfers(storage()) ->
    {ok, #{transfer() => transferinfo()}}.
transfers(Storage) ->
    % TODO `Continuation`
    % There might be millions of transfers on the node, we need a protocol and
    % storage schema to iterate through them effectively.
    ClientDirs = try_list_dir(get_root(Storage)),
    CollectClient = fun(ClientId, Acc) -> transfers(Storage, ClientId, Acc) end,
    {ok, lists:foldl(CollectClient, #{}, ClientDirs)}.
%% Add the transfers of one client directory to `AccIn`: every entry under
%% `<root>/<ClientId>` is taken as a file id. A client directory that cannot
%% be listed is reported, together with the reason, and skipped.
transfers(Storage, ClientId, AccIn) ->
    Dirname = filename:join(get_root(Storage), ClientId),
    case file:list_dir(Dirname) of
        {ok, FileIds} ->
            lists:foldl(
                fun(FileId, Acc) ->
                    Transfer = dirnames_to_transfer(ClientId, FileId),
                    read_transferinfo(Storage, Transfer, Acc)
                end,
                AccIn,
                FileIds
            );
        {error, Reason} ->
            % Include the reason so that different failures (e.g. `enotdir`
            % for a stray file in the root vs. `eacces`) can be told apart.
            ?tp(warning, "list_dir_failed", #{
                storage => Storage,
                directory => Dirname,
                reason => Reason
            }),
            AccIn
    end.
%% Add the transfer to `Acc` together with its filemeta. A transfer with no
%% stored filemeta is still added, with empty info. A transfer whose filemeta
%% fails to be read for any other reason is reported and left out.
read_transferinfo(Storage, Transfer, Acc) ->
    case read_filemeta(Storage, Transfer) of
        {error, enoent} ->
            maps:put(Transfer, #{}, Acc);
        {ok, Filemeta} ->
            maps:put(Transfer, #{filemeta => Filemeta}, Acc);
        {error, Reason} ->
            ?tp(warning, "read_transferinfo_failed", #{
                storage => Storage,
                transfer => Transfer,
                reason => Reason
            }),
            Acc
    end.
%% Root directory of the segments storage. This is the configured
%% `segments.root`, or `<data_dir>/file_transfer/segments` when none is set.
-spec get_root(storage()) ->
    file:name().
get_root(Storage) ->
    Default = fun() -> filename:join([emqx:data_dir(), file_transfer, segments]) end,
    case emqx_utils_maps:deep_find([segments, root], Storage) of
        {ok, Root} -> Root;
        {not_found, _, _} -> Default()
    end.
%% The transfer's own directory, `<root>/<clientid>/<fileid>`.
-spec get_subdir(storage(), transfer()) ->
    file:name().
get_subdir(Storage, Transfer) ->
    NoSubdirs = [],
    mk_filedir(Storage, Transfer, NoSubdirs).
%% The directory of the given kind (`fragment` or `temporary`) inside the
%% transfer's directory.
-spec get_subdir(storage(), transfer(), fragment | temporary) ->
    file:name().
get_subdir(Storage, Transfer, What) ->
    Subdirs = get_subdirs_for(What),
    mk_filedir(Storage, Transfer, Subdirs).
%% Path components, relative to the transfer directory, of a storage area.
get_subdirs_for(temporary) ->
    [?TEMPDIR];
get_subdirs_for(fragment) ->
    [?FRAGDIR].
%% Versioned envelope for stored filemeta: `["filemeta", Vsn, Meta]`.
-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
%% Serialize a filemeta to JSON inside a version-1 envelope.
encode_filemeta(Meta) ->
    Encoded = emqx_ft:encode_filemeta(Meta),
    emqx_utils_json:encode(?PRELUDE(1, Encoded)).
%% Decode a filemeta stored by `encode_filemeta/1`. Raises on an envelope with
%% another tag or version, or when `emqx_ft:decode_filemeta/1` rejects the
%% payload (its error reason is raised). Malformed JSON presumably raises
%% inside `emqx_utils_json:decode/2`.
decode_filemeta(Binary) when is_binary(Binary) ->
    Decoded = emqx_utils_json:decode(Binary, [return_maps]),
    ?PRELUDE(_Vsn = 1, Map) = Decoded,
    case emqx_ft:decode_filemeta(Map) of
        {error, Reason} -> error(Reason);
        {ok, Meta} -> Meta
    end.
%% Segment file name, `SEG.<offset>.<content size in bytes>`.
mk_segment_filename({Offset, Content}) ->
    Size = byte_size(Content),
    lists:concat([?SEGMENT, ".", Offset, ".", Size]).
%% Inverse of `mk_segment_filename/1`: parse offset and size out of a segment
%% file name, or return `{error, invalid}` if the name has another shape.
break_segment_filename(Filename) ->
    Regex = "^" ?SEGMENT "[.]([0-9]+)[.]([0-9]+)$",
    case re:run(Filename, Regex, [{capture, all_but_first, list}]) of
        nomatch ->
            {error, invalid};
        {match, [OffsetStr, SizeStr]} ->
            {ok, #{offset => list_to_integer(OffsetStr), size => list_to_integer(SizeStr)}}
    end.
%% Build `<root>/<escaped clientid>/<escaped fileid>/<SubDirs...>`.
mk_filedir(Storage, {ClientId, FileId}, SubDirs) ->
    Root = get_root(Storage),
    TransferDir = [
        emqx_ft_fs_util:escape_filename(ClientId),
        emqx_ft_fs_util:escape_filename(FileId)
    ],
    filename:join([Root | TransferDir ++ SubDirs]).
%% Undo the escaping applied by `mk_filedir/3`, recovering the transfer id
%% from a client directory name and a file directory name.
dirnames_to_transfer(ClientDir, FileDir) ->
    ClientId = emqx_ft_fs_util:unescape_filename(ClientDir),
    FileId = emqx_ft_fs_util:unescape_filename(FileDir),
    {ClientId, FileId}.
%% Path of `Filename` inside the given subdirectories of the transfer directory.
mk_filepath(Storage, Transfer, SubDirs, Filename) ->
    Dir = mk_filedir(Storage, Transfer, SubDirs),
    filename:join(Dir, Filename).
%% List directory entries. Any error, including a missing directory, is
%% treated as an empty directory.
try_list_dir(Dirname) ->
    case file:list_dir(Dirname) of
        {error, _Reason} -> [];
        {ok, Entries} -> Entries
    end.
-include_lib("kernel/include/file.hrl").
%% Read a file and decode its contents with `Decode`; the error reasons are
%% those of `emqx_ft_fs_util:read_decode_file/2`.
read_file(Path, Decode) -> emqx_ft_fs_util:read_decode_file(Path, Decode).
%% Write `Content` to `Filepath` atomically. The content is first written to
%% a temporary file in the transfer's temporary directory, which is then
%% renamed to `Filepath`. The file therefore shows up at `Filepath` only once
%% it is complete, assuming rename is atomic on the backing filesystem.
%%
%% On failure the temporary file is removed (best effort), so failed writes
%% do not leave stray files behind in the temporary directory. On success
%% there is nothing to clean up: the rename already moved the file away.
write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content) ->
    TempFilepath = mk_temp_filepath(Storage, Transfer, filename:basename(Filepath)),
    Result = emqx_utils:pipeline(
        [
            fun filelib:ensure_dir/1,
            fun write_contents/2,
            fun(_) -> mv_temp_file(TempFilepath, Filepath) end
        ],
        TempFilepath,
        Content
    ),
    case Result of
        {ok, _, _} ->
            ok;
        {error, Reason, _} ->
            % The rename did not happen, so a partially or fully written
            % temporary file may be left behind. The delete result is ignored:
            % the file may never have been created (e.g. `ensure_dir` failed).
            _ = file:delete(TempFilepath),
            {error, Reason}
    end.
%% Path, in the transfer's temporary directory, of a temporary file derived
%% from `Filename` by `emqx_ft_fs_util:mk_temp_filename/1`.
mk_temp_filepath(Storage, Transfer, Filename) ->
    TempDir = get_subdir(Storage, Transfer, temporary),
    filename:join(TempDir, emqx_ft_fs_util:mk_temp_filename(Filename)).
%% Pipeline step: write `Data` to `Path`, replacing any existing content.
write_contents(Path, Data) -> file:write_file(Path, Data).
%% Move `Source` to `Destination`, creating the destination's parent
%% directories first. The `ensure_dir` result is ignored; if the parent
%% could not be created, the rename itself fails and reports the error.
mv_temp_file(Source, Destination) ->
    _ = filelib:ensure_dir(Destination),
    file:rename(Source, Destination).
%% Set both the modification and access time of the file to the current
%% local time.
touch_file(Path) ->
    LocalNow = calendar:local_time(),
    file:change_time(Path, LocalNow, LocalNow).
%% `lists:filtermap/2` over file names, with `Fun` also receiving the
%% directory the names belong to: `Fun(Dirname, Filename)`.
filtermap_files(Fun, Dirname, Filenames) ->
    InDir = fun(Filename) -> Fun(Dirname, Filename) end,
    lists:filtermap(InDir, Filenames).
%% Classify a file from the fragments directory by its name and describe it
%% as a fragment (manifest or segment). Any other file is reported as rogue
%% and skipped (`false`, for `lists:filtermap/2`).
mk_filefrag(Dir, Name = ?MANIFEST) ->
    mk_filefrag(Dir, Name, filemeta, fun read_frag_filemeta/2);
mk_filefrag(Dir, Name = ?SEGMENT ++ _) ->
    mk_filefrag(Dir, Name, segment, fun read_frag_segmentinfo/2);
mk_filefrag(_Dir, _Name) ->
    ?tp(warning, "rogue_file_found", #{
        directory => _Dir,
        filename => _Name
    }),
    false.
%% Describe a fragment file: its path, mtime, size, and the fragment info
%% that `Fun(Filename, Filepath)` extracts, tagged with `Tag`. Returns
%% `{true, FileFrag}` for `lists:filtermap/2`. Returns `false`, after
%% reporting, when the file cannot be stat'ed or `Fun` fails. A stat failure
%% is expected when the file disappears between the directory listing and
%% this call (e.g. removed by GC); it must not fail the whole listing.
mk_filefrag(Dirname, Filename, Tag, Fun) ->
    Filepath = filename:join(Dirname, Filename),
    case file:read_file_info(Filepath) of
        {ok, Fileinfo} ->
            case Fun(Filename, Filepath) of
                {ok, Frag} ->
                    {true, #{
                        path => Filepath,
                        % NOTE(review): `mtime` is a local datetime tuple here,
                        % while `filefrag()` declares an epoch second; confirm
                        % which representation consumers rely on.
                        timestamp => Fileinfo#file_info.mtime,
                        size => Fileinfo#file_info.size,
                        fragment => {Tag, Frag}
                    }};
                {error, Reason} ->
                    filefrag_failed(Dirname, Filename, Tag, Reason)
            end;
        {error, Reason} ->
            filefrag_failed(Dirname, Filename, Tag, Reason)
    end.

%% Report a fragment file that could not be described; always `false`.
filefrag_failed(Dirname, Filename, Tag, Reason) ->
    ?tp(warning, "mk_filefrag_failed", #{
        directory => Dirname,
        filename => Filename,
        type => Tag,
        reason => Reason
    }),
    false.
%% Fragment reader for the manifest: decode the filemeta from the contents.
read_frag_filemeta(_Name, Path) -> read_file(Path, fun decode_filemeta/1).
%% Fragment reader for segments: offset and size are encoded in the file
%% name, so the contents are not read.
read_frag_segmentinfo(Name, _Path) -> break_segment_filename(Name).