%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

%% Filesystem storage backend
%%
%% NOTE
%% If you plan to change the storage layout, please consult
%% `emqx_ft_storage_fs_gc` to see how much it would break or impair GC.

-module(emqx_ft_storage_fs).

-behaviour(emqx_ft_storage).

-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/trace.hrl").

-export([child_spec/1]).

% Segments-related API
-export([store_filemeta/3]).
-export([store_segment/3]).
-export([read_filemeta/2]).
-export([list/3]).
-export([pread/5]).
-export([lookup_local_assembler/1]).
-export([assemble/4]).

-export([transfers/1]).

% GC API
% TODO: This quickly becomes hairy.
-export([get_root/1]).
-export([get_subdir/2]).
-export([get_subdir/3]).

-export([files/2]).

-export([start/1]).
-export([stop/1]).
-export([update_config/2]).

-export_type([storage/0]).
-export_type([filefrag/1]).
-export_type([filefrag/0]).
-export_type([transferinfo/0]).
-export_type([segmentinfo/0]).

-export_type([file_error/0]).

% Local aliases for the types declared by `emqx_ft`.
-type transfer() :: emqx_ft:transfer().
-type offset() :: emqx_ft:offset().
-type filemeta() :: emqx_ft:filemeta().
-type segment() :: emqx_ft:segment().

% Offset and size of a stored segment; both are parsed back from the segment's
% filename (see `break_segment_filename/1`).
-type segmentinfo() :: #{
    offset := offset(),
    size := _Bytes :: non_neg_integer()
}.
% Per-transfer summary built by `transfers/1`: contains `filemeta` only when
% the transfer's manifest could be read.
-type transferinfo() :: #{
    filemeta => filemeta()
}.

% TODO naming
% Description of a single file found in a transfer's fragment directory.
-type filefrag(T) :: #{
    path := file:name(),
    timestamp := emqx_utils_calendar:epoch_second(),
    size := _Bytes :: non_neg_integer(),
    fragment := T
}.

-type filefrag() :: filefrag(
    {filemeta, filemeta()}
    | {segment, segmentinfo()}
).

% Names of the per-transfer subdirectories (see `get_subdirs_for/1`).
-define(FRAGDIR, frags).
-define(TEMPDIR, tmp).
% Filename of the manifest (encoded filemeta) of a transfer.
-define(MANIFEST, "MANIFEST.json").
% Filename prefix of stored segments (see `mk_segment_filename/1`).
-define(SEGMENT, "SEG").

-type segments() :: #{
    root := file:name(),
    gc := #{
        interval := non_neg_integer(),
        maximum_segments_ttl := non_neg_integer(),
        minimum_segments_ttl := non_neg_integer()
    }
}.

-type storage() :: #{
    type := 'local',
    enable := true,
    segments := segments(),
    exporter := emqx_ft_storage_exporter:exporter_conf()
}.

-type file_error() ::
    file:posix()
    %% Filename is incompatible with the backing filesystem.
    | badarg
    %% System limit (e.g. number of ports) reached.
    | system_limit.

%% Related resources childspecs
%% Currently a single permanent child: the `emqx_ft_storage_fs_gc` process,
%% started with the storage configuration as its only argument.
-spec child_spec(storage()) ->
    [supervisor:child_spec()].
child_spec(Storage) ->
    [
        #{
            id => emqx_ft_storage_fs_gc,
            start => {emqx_ft_storage_fs_gc, start_link, [Storage]},
            restart => permanent
        }
    ].

%% Store manifest in the backing filesystem.
%% Atomic operation.
%%
%% Outcomes:
%%  * the same filemeta is already stored: the manifest file times are
%%    refreshed and `ok` is returned;
%%  * a different filemeta is already stored: a warning is logged and
%%    `{error, conflict}` is returned;
%%  * reading the manifest fails with `notfound`, `corrupted` or `enoent`:
%%    the manifest is (re)written through `write_file_atomic/4`;
%%  * any other read error is returned as is.
-spec store_filemeta(storage(), transfer(), filemeta()) ->
    % Quota? Some lower level errors?
    ok | {error, conflict} | {error, file_error()}.
store_filemeta(Storage, Transfer, Meta) ->
    Filepath = mk_filepath(Storage, Transfer, get_subdirs_for(fragment), ?MANIFEST),
    case read_file(Filepath, fun decode_filemeta/1) of
        {ok, Meta} ->
            % `Meta` is already bound: this clause matches only an identical manifest.
            _ = touch_file(Filepath),
            ok;
        {ok, Conflict} ->
            ?SLOG(warning, #{
                msg => "filemeta_conflict", transfer => Transfer, new => Meta, old => Conflict
            }),
            % TODO
            % We won't see conflicts in case of concurrent `store_filemeta`
            % requests. It's a rather odd scenario so it's fine not to worry
            % about it too much now.
            % NOTE
            % The reason must be exactly `conflict`, as declared in the spec.
            {error, conflict};
        {error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent ->
            write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta));
        {error, _} = Error ->
            Error
    end.

%% Store a segment in the backing filesystem.
%% Atomic operation.
%% The segment's filename is derived from its offset and content size by
%% `mk_segment_filename/1`.
-spec store_segment(storage(), transfer(), segment()) ->
    % Where does the checksum get verified? Upper level probably.
    % Quota? Some lower level errors?
    ok | {error, file_error()}.
store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
    Filename = mk_segment_filename(Segment),
    Filepath = mk_filepath(Storage, Transfer, get_subdirs_for(fragment), Filename),
    write_file_atomic(Storage, Transfer, Filepath, Content).

%% Read and decode the manifest of the given transfer.
-spec read_filemeta(storage(), transfer()) ->
    {ok, filemeta()} | {error, corrupted} | {error, file_error()}.
read_filemeta(Storage, Transfer) ->
    Filepath = mk_filepath(Storage, Transfer, get_subdirs_for(fragment), ?MANIFEST),
    read_file(Filepath, fun decode_filemeta/1).

%% List the fragments (manifest and segments) stored for the given transfer.
%% A missing fragment directory is reported as an empty list.
-spec list(storage(), transfer(), _What :: fragment) ->
    % Some lower level errors? {error, notfound}?
    % Result will contain zero or only one filemeta.
    {ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]}
    | {error, file_error()}.
list(Storage, Transfer, What = fragment) ->
    Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(What)),
    case file:list_dir(Dirname) of
        {ok, Filenames} ->
            % TODO
            % In case of `What = result` there might be more than one file (though
            % extremely bad luck is needed for that, e.g. concurrent assemblers with
            % different filemetas from different nodes). This might be unexpected for a
            % client given the current protocol, yet might be helpful in the future.
            {ok, filtermap_files(fun mk_filefrag/2, Dirname, Filenames)};
        {error, enoent} ->
            {ok, []};
        {error, _} = Error ->
            Error
    end.

-spec pread(storage(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
    {ok, _Content :: iodata()} | {error, eof} | {error, file_error()}.
%% Read up to `Size` bytes at `Offset` from the file referenced by the `path`
%% key of the fragment. The file is opened in raw binary mode and closed
%% before the result is returned; `eof` is reported as `{error, eof}`.
pread(_Storage, _Transfer, Frag, Offset, Size) ->
    case file:open(maps:get(path, Frag), [read, raw, binary]) of
        {error, _} = OpenError ->
            OpenError;
        {ok, Fd} ->
            Outcome = file:pread(Fd, Offset, Size),
            ok = file:close(Fd),
            % NOTE
            % Reading empty file is always `eof`.
            case Outcome of
                eof -> {error, eof};
                {ok, _Content} = Ok -> Ok;
                {error, _} = ReadError -> ReadError
            end
    end.

%% Find or start an assembler for the transfer. Sources are consulted in
%% order: a local assembler, a remote assembler, an already exported result,
%% and finally a freshly ensured local assembler.
-spec assemble(storage(), transfer(), emqx_ft:bytes(), emqx_ft:finopts()) ->
    {async, _Assembler :: pid()} | ok | {error, _TODO}.
assemble(Storage, Transfer, Size, Opts) ->
    lookup_assembler([
        fun() -> lookup_local_assembler(Transfer) end,
        fun() -> lookup_remote_assembler(Transfer) end,
        fun() -> check_if_already_exported(Storage, Transfer) end,
        fun() -> ensure_local_assembler(Storage, Transfer, Size, Opts) end
    ]).

%%

%% Delegates listing of files to the exporter.
files(Storage, Query) ->
    emqx_ft_storage_exporter:list(Storage, Query).

%%

%% Apply a configuration change: reset the GC with the new storage config,
%% then hand both configs over to the exporter.
update_config(StorageOld, StorageNew) ->
    % NOTE: this will reset GC timer, frequent changes would postpone GC indefinitely
    ok = emqx_ft_storage_fs_gc:reset(StorageNew),
    emqx_ft_storage_exporter:update_config(StorageOld, StorageNew).

%% Start every child from `child_spec/1` under `emqx_ft_sup`, then let the
%% exporter pick up the storage config. Crashes if a child fails to start.
start(Storage) ->
    _ = [
        {ok, _Child} = supervisor:start_child(emqx_ft_sup, ChildSpec)
     || ChildSpec <- child_spec(Storage)
    ],
    ok = emqx_ft_storage_exporter:update_config(undefined, Storage),
    ok.

%% Counterpart of `start/1`: tell the exporter the storage is gone, then
%% terminate and delete every child from `child_spec/1`.
stop(Storage) ->
    ok = emqx_ft_storage_exporter:update_config(Storage, undefined),
    lists:foreach(fun stop_child/1, child_spec(Storage)).

%% Terminate (result ignored) and delete (must succeed) a single child.
stop_child(#{id := ChildId}) ->
    _ = supervisor:terminate_child(emqx_ft_sup, ChildId),
    ok = supervisor:delete_child(emqx_ft_sup, ChildId).

%%

%% Call the sources one by one and return the first result that is not
%% `{error, not_found}`; the result of the last source is returned as is.
lookup_assembler([Fallback]) ->
    Fallback();
lookup_assembler([Probe | Rest]) ->
    lookup_assembler(Probe(), Rest).

lookup_assembler({error, not_found}, Rest) ->
    lookup_assembler(Rest);
lookup_assembler(Found, _Rest) ->
    Found.
%% Report `ok` when the exporter already lists at least one file for the
%% transfer (cleaning up local fragments on the way), `{error, not_found}`
%% otherwise.
check_if_already_exported(Storage, Transfer) ->
    Listing = files(Storage, #{transfer => Transfer}),
    handle_export_listing(Listing, Storage, Transfer).

handle_export_listing({ok, #{items := [_ | _]}}, Storage, Transfer) ->
    % NOTE: we don't know coverage here, let's just clean up locally.
    _ = emqx_ft_storage_fs_gc:collect(Storage, Transfer, [node()]),
    ok;
handle_export_listing(_NothingExported, _Storage, _Transfer) ->
    {error, not_found}.

%% Look for an assembler of this transfer through `emqx_ft_assembler:where/1`.
lookup_local_assembler(Transfer) ->
    Found = emqx_ft_assembler:where(Transfer),
    case is_pid(Found) of
        true -> {async, Found};
        false -> {error, not_found}
    end.

%% Ask every other running node for an assembler of this transfer and pick
%% the first one reported; replies of any other shape are ignored.
lookup_remote_assembler(Transfer) ->
    OtherNodes = emqx:running_nodes() -- [node()],
    Replies = emqx_ft_storage_fs_proto_v1:list_assemblers(OtherNodes, Transfer),
    case [Pid || {ok, {async, Pid}} <- Replies] of
        [First | _] -> {async, First};
        _ -> {error, not_found}
    end.

%% Ensure an assembler child exists for the transfer; crashes unless
%% `emqx_ft_assembler_sup:ensure_child/4` returns `{ok, Pid}`.
ensure_local_assembler(Storage, Transfer, Size, Opts) ->
    {ok, Assembler} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size, Opts),
    {async, Assembler}.

%% Collect information about all transfers found under the storage root.
%% The first directory level is treated as client ids, the second one as
%% file ids.
-spec transfers(storage()) ->
    {ok, #{transfer() => transferinfo()}}.
transfers(Storage) ->
    % TODO `Continuation`
    % There might be millions of transfers on the node, we need a protocol and
    % storage schema to iterate through them effectively.
    PerClient = fun(ClientId, Acc) -> transfers(Storage, ClientId, Acc) end,
    Transfers = lists:foldl(PerClient, #{}, try_list_dir(get_root(Storage))),
    {ok, Transfers}.

%% Add the transfers of a single client directory to the accumulator. If the
%% directory cannot be listed the accumulator is returned unchanged.
transfers(Storage, ClientId, AccIn) ->
    Dirname = filename:join(get_root(Storage), ClientId),
    case file:list_dir(Dirname) of
        {error, _Reason} ->
            ?tp(warning, "list_dir_failed", #{
                storage => Storage,
                directory => Dirname
            }),
            AccIn;
        {ok, FileIds} ->
            PerFile = fun(FileId, Acc) ->
                read_transferinfo(Storage, dirnames_to_transfer(ClientId, FileId), Acc)
            end,
            lists:foldl(PerFile, AccIn, FileIds)
    end.

%% Add one transfer to the accumulator: with its filemeta when the manifest
%% is readable, with an empty info map when the manifest does not exist, and
%% not at all on any other error.
read_transferinfo(Storage, Transfer, Acc) ->
    add_transferinfo(read_filemeta(Storage, Transfer), Storage, Transfer, Acc).

add_transferinfo({ok, Filemeta}, _Storage, Transfer, Acc) ->
    Acc#{Transfer => #{filemeta => Filemeta}};
add_transferinfo({error, enoent}, _Storage, Transfer, Acc) ->
    Acc#{Transfer => #{}};
add_transferinfo({error, Reason}, Storage, Transfer, Acc) ->
    ?tp(warning, "read_transferinfo_failed", #{
        storage => Storage,
        transfer => Transfer,
        reason => Reason
    }),
    Acc.
%% Root directory of the segments storage: the configured `segments.root`,
%% or `file_transfer/segments` under the EMQX data dir when it is not set.
-spec get_root(storage()) ->
    file:name().
get_root(Storage) ->
    case emqx_utils_maps:deep_find([segments, root], Storage) of
        {not_found, _, _} ->
            filename:join([emqx:data_dir(), file_transfer, segments]);
        {ok, Root} ->
            Root
    end.

%% Directory of the given transfer.
-spec get_subdir(storage(), transfer()) ->
    file:name().
get_subdir(Storage, Transfer) ->
    mk_filedir(Storage, Transfer, []).

%% Directory holding either the fragments or the temporary files of the
%% given transfer.
-spec get_subdir(storage(), transfer(), fragment | temporary) ->
    file:name().
get_subdir(Storage, Transfer, What) ->
    SubDirs = get_subdirs_for(What),
    mk_filedir(Storage, Transfer, SubDirs).

%% Path components, relative to the transfer directory, for each kind of file.
get_subdirs_for(fragment) ->
    [?FRAGDIR];
get_subdirs_for(temporary) ->
    [?TEMPDIR].

%% On-disk envelope of a manifest: a tag, a version and the filemeta itself.
-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).

%% Encode filemeta as JSON wrapped in a version 1 prelude.
encode_filemeta(Meta) ->
    Encoded = emqx_ft:encode_filemeta(Meta),
    emqx_utils_json:encode(?PRELUDE(_Vsn = 1, Encoded)).

%% Decode a manifest written by `encode_filemeta/1`. Raises an error when the
%% prelude does not match or when `emqx_ft:decode_filemeta/1` rejects the
%% payload.
decode_filemeta(Binary) when is_binary(Binary) ->
    ?PRELUDE(_Vsn = 1, Encoded) = emqx_utils_json:decode(Binary, [return_maps]),
    case emqx_ft:decode_filemeta(Encoded) of
        {error, Reason} ->
            error(Reason);
        {ok, Meta} ->
            Meta
    end.

%% Segment filename: the segment prefix, the offset and the content size in
%% bytes, separated by dots.
mk_segment_filename({Offset, Content}) ->
    Size = byte_size(Content),
    lists:concat([?SEGMENT, ".", Offset, ".", Size]).

%% Inverse of `mk_segment_filename/1`: extract offset and size from a segment
%% filename, or return `{error, invalid}` when the name has another shape.
break_segment_filename(Filename) ->
    Regex = "^" ?SEGMENT "[.]([0-9]+)[.]([0-9]+)$",
    case re:run(Filename, Regex, [{capture, all_but_first, list}]) of
        nomatch ->
            {error, invalid};
        {match, [Offset, Size]} ->
            {ok, #{offset => list_to_integer(Offset), size => list_to_integer(Size)}}
    end.

%% Directory for a transfer: root, escaped client id, escaped file id and
%% then the given subdirectories.
mk_filedir(Storage, {ClientId, FileId}, SubDirs) ->
    Root = get_root(Storage),
    ClientDir = emqx_ft_fs_util:escape_filename(ClientId),
    FileDir = emqx_ft_fs_util:escape_filename(FileId),
    filename:join([Root, ClientDir, FileDir | SubDirs]).

%% Turn a pair of directory names back into a transfer by unescaping them.
dirnames_to_transfer(ClientId, FileId) ->
    {
        emqx_ft_fs_util:unescape_filename(ClientId),
        emqx_ft_fs_util:unescape_filename(FileId)
    }.

%% Full path of a file inside one of the transfer's subdirectories.
mk_filepath(Storage, Transfer, SubDirs, Filename) ->
    Dirname = mk_filedir(Storage, Transfer, SubDirs),
    filename:join(Dirname, Filename).

%% List a directory, treating any error as an empty directory.
try_list_dir(Dirname) ->
    case file:list_dir(Dirname) of
        {error, _} -> [];
        {ok, Entries} -> Entries
    end.

-include_lib("kernel/include/file.hrl").
%% Read a file and decode its contents; delegates to
%% `emqx_ft_fs_util:read_decode_file/2`.
read_file(Filepath, DecodeFun) ->
    emqx_ft_fs_util:read_decode_file(Filepath, DecodeFun).

%% Write binary content to `Filepath` by first writing it to a temporary file
%% in the transfer's temporary directory and then renaming it into place.
%% The steps are run through `emqx_utils:pipeline/3`
%% (NOTE(review): presumably it stops at the first failing step — confirm).
write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content) ->
    TempFilepath = mk_temp_filepath(Storage, Transfer, filename:basename(Filepath)),
    Steps = [
        fun filelib:ensure_dir/1,
        fun write_contents/2,
        fun(_) -> mv_temp_file(TempFilepath, Filepath) end
    ],
    case emqx_utils:pipeline(Steps, TempFilepath, Content) of
        {error, Reason, _} ->
            {error, Reason};
        {ok, _, _} ->
            % Best-effort removal; the result is deliberately ignored.
            _ = file:delete(TempFilepath),
            ok
    end.

%% Path of a temporary file for `Filename` in the transfer's temporary directory.
mk_temp_filepath(Storage, Transfer, Filename) ->
    TempDir = get_subdir(Storage, Transfer, temporary),
    filename:join(TempDir, emqx_ft_fs_util:mk_temp_filename(Filename)).

%% Pipeline step: write the content to the given path.
write_contents(Filepath, Content) ->
    file:write_file(Filepath, Content).

%% Pipeline step: make sure the target directory exists (result ignored) and
%% rename the temporary file into place.
mv_temp_file(TempFilepath, Filepath) ->
    _ = filelib:ensure_dir(Filepath),
    file:rename(TempFilepath, Filepath).

%% Set both file times of `Filepath` to the current local time.
touch_file(Filepath) ->
    Timestamp = erlang:localtime(),
    file:change_time(Filepath, _Mtime = Timestamp, _Atime = Timestamp).

%% `lists:filtermap/2` over filenames, passing the directory name along.
filtermap_files(Fun, Dirname, Filenames) ->
    WithDirname = fun(Filename) -> Fun(Dirname, Filename) end,
    lists:filtermap(WithDirname, Filenames).

%% Describe a file from a fragment directory: the manifest becomes a
%% `filemeta` fragment, files with the segment prefix become `segment`
%% fragments, anything else is reported and skipped.
mk_filefrag(Dirname, Filename = ?MANIFEST) ->
    mk_filefrag(Dirname, Filename, filemeta, fun read_frag_filemeta/2);
mk_filefrag(Dirname, Filename = ?SEGMENT ++ _) ->
    mk_filefrag(Dirname, Filename, segment, fun read_frag_segmentinfo/2);
mk_filefrag(_Dirname, _Filename) ->
    ?tp(warning, "rogue_file_found", #{
        directory => _Dirname,
        filename => _Filename
    }),
    false.

%% Build the `filefrag()` for one file. `Fun` extracts the fragment payload;
%% when it fails the file is reported and skipped. Crashes if the file info
%% cannot be read.
mk_filefrag(Dirname, Filename, Tag, Fun) ->
    Filepath = filename:join(Dirname, Filename),
    % TODO error handling?
    {ok, #file_info{mtime = Mtime, size = Size}} = file:read_file_info(Filepath),
    case Fun(Filename, Filepath) of
        {error, _Reason} ->
            ?tp(warning, "mk_filefrag_failed", #{
                directory => Dirname,
                filename => Filename,
                type => Tag,
                reason => _Reason
            }),
            false;
        {ok, Frag} ->
            {true, #{
                path => Filepath,
                timestamp => Mtime,
                size => Size,
                fragment => {Tag, Frag}
            }}
    end.
%% Fragment reader for the manifest: decode the filemeta stored in the file.
%% The filename itself carries no information and is ignored.
read_frag_filemeta(_ManifestName, Filepath) ->
    emqx_ft_fs_util:read_decode_file(Filepath, fun decode_filemeta/1).

%% Fragment reader for segments: offset and size come from the filename
%% alone, so the file is never opened.
read_frag_segmentinfo(Filename, _SegmentPath) ->
    break_segment_filename(Filename).