diff --git a/apps/emqx_ft/src/emqx_ft.app.src b/apps/emqx_ft/src/emqx_ft.app.src
index 855451bfb..80b4b47dd 100644
--- a/apps/emqx_ft/src/emqx_ft.app.src
+++ b/apps/emqx_ft/src/emqx_ft.app.src
@@ -5,7 +5,8 @@
     {mod, {emqx_ft_app, []}},
     {applications, [
         kernel,
-        stdlib
+        stdlib,
+        gproc
     ]},
     {env, []},
     {modules, []}
diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl
index 0ba9c17a6..1f8a90a6f 100644
--- a/apps/emqx_ft/src/emqx_ft.erl
+++ b/apps/emqx_ft/src/emqx_ft.erl
@@ -30,6 +30,20 @@
     on_channel_takeovered/3
 ]).
 
+-export_type([clientid/0]).
+-export_type([transfer/0]).
+-export_type([offset/0]).
+
+%% Number of bytes
+-type bytes() :: non_neg_integer().
+
+%% MQTT Client ID
+-type clientid() :: emqx_types:clientid().
+
+-type fileid() :: binary().
+-type transfer() :: {clientid(), fileid()}.
+-type offset() :: bytes().
+
 -type ft_data() :: #{
     nodes := list(node())
 }.
diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl
new file mode 100644
index 000000000..2bd01c8b0
--- /dev/null
+++ b/apps/emqx_ft/src/emqx_ft_assembler.erl
@@ -0,0 +1,160 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_assembler).
+
+-export([start_link/2]).
+
+-behaviour(gen_statem).
+-export([callback_mode/0]).
+-export([init/1]).
+% -export([list_local_fragments/3]).
+% -export([list_remote_fragments/3]).
+% -export([start_assembling/3]).
+-export([handle_event/4]).
+
+% -export([handle_continue/2]).
+% -export([handle_call/3]).
+% -export([handle_cast/2]).
+
+-record(st, {
+    storage :: _Storage,
+    transfer :: emqx_ft:transfer(),
+    assembly :: _TODO,
+    file :: io:device(),
+    hash
+}).
+
+-define(RPC_LIST_TIMEOUT, 1000).
+-define(RPC_READSEG_TIMEOUT, 5000).
+
+%%
+
+start_link(Storage, Transfer) ->
+    gen_statem:start_link(?MODULE, {Storage, Transfer}, []).
+
+%%
+
+-define(internal(C), {next_event, internal, C}).
+
+callback_mode() ->
+    handle_event_function.
+
+init({Storage, Transfer}) ->
+    St = #st{
+        storage = Storage,
+        transfer = Transfer,
+        assembly = emqx_ft_assembly:new(),
+        hash = crypto:hash_init(sha256)
+    },
+    {ok, list_local_fragments, St, ?internal([])}.
+
+handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
+    % TODO: what do we do with non-transient errors here (e.g. `eacces`)?
+    {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer),
+    NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
+    NSt = St#st{assembly = NAsm},
+    case emqx_ft_assembly:status(NAsm) of
+        complete ->
+            {next_state, start_assembling, NSt, ?internal([])};
+        {incomplete, _} ->
+            Nodes = ekka:nodelist() -- [node()],
+            {next_state, {list_remote_fragments, Nodes}, NSt, ?internal([])}
+        % TODO: recovery?
+        % {error, _} = Reason ->
+        %     {stop, Reason}
+    end;
+handle_event(internal, _, {list_remote_fragments, Nodes}, St) ->
+    % TODO: portable "storage" ref
+    Args = [St#st.storage, St#st.transfer],
+    % TODO
+    % Async would be better because we would not need to wait for lagging nodes if
+    % the coverage is already complete.
+    % TODO: BP API?
+    Results = erpc:multicall(Nodes, emqx_ft_storage_fs, list, Args, ?RPC_LIST_TIMEOUT),
+    NodeResults = lists:zip(Nodes, Results),
+    NAsm = emqx_ft_assembly:update(
+        lists:foldl(
+            fun
+                ({Node, {ok, {ok, Fragments}}}, Asm) ->
+                    emqx_ft_assembly:append(Asm, Node, Fragments);
+                ({_Node, _Result}, Asm) ->
+                    % TODO: log?
+                    Asm
+            end,
+            St#st.assembly,
+            NodeResults
+        )
+    ),
+    NSt = St#st{assembly = NAsm},
+    case emqx_ft_assembly:status(NAsm) of
+        complete ->
+            {next_state, start_assembling, NSt, ?internal([])};
+        % TODO: retries / recovery?
+        {incomplete, _} = Status ->
+            {stop, {error, Status}}
+    end;
+handle_event(internal, _, start_assembling, St = #st{assembly = Asm}) ->
+    Filemeta = emqx_ft_assembly:filemeta(Asm),
+    Coverage = emqx_ft_assembly:coverage(Asm),
+    % TODO: errors
+    {ok, Handle} = emqx_ft_storage_fs:open_file(St#st.storage, St#st.transfer, Filemeta),
+    {next_state, {assemble, Coverage}, St#st{file = Handle}, ?internal([])};
+handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
+    % TODO
+    % Currently, a race is possible between getting segment info from the remote node
+    % and this node garbage collecting the segment itself.
+    Args = [St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)],
+    % TODO: pipelining
+    case erpc:call(Node, emqx_ft_storage_fs, read_segment, Args, ?RPC_READSEG_TIMEOUT) of
+        {ok, Content} ->
+            {ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content),
+            {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])}
+        % {error, _} ->
+        %     ...
+    end;
+handle_event(internal, _, {assemble, []}, St = #st{}) ->
+    {next_state, complete, St, ?internal([])};
+handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle}) ->
+    Filemeta = emqx_ft_assembly:filemeta(Asm),
+    ok = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle),
+    {stop, shutdown}.
+
+% handle_continue(list_local, St = #st{storage = Storage, transfer = Transfer, assembly = Asm}) ->
+%     % TODO: what do we do with non-transient errors here (e.g. `eacces`)?
+%     {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer),
+%     NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
+%     NSt = St#st{assembly = NAsm},
+%     case emqx_ft_assembly:status(NAsm) of
+%         complete ->
+%             {noreply, NSt, {continue}};
+%         {more, _} ->
+%             error(noimpl);
+%         {error, _} ->
+%             error(noimpl)
+%     end,
+%     {noreply, St}.
+
+% handle_call(_Call, _From, St) ->
+%     {reply, {error, badcall}, St}.
+
+% handle_cast(_Cast, St) ->
+%     {noreply, St}.
+
+%%
+
+segsize(#{fragment := {segmentinfo, Info}}) ->
+    maps:get(size, Info).
diff --git a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl
new file mode 100644
index 000000000..fbf948fbb
--- /dev/null
+++ b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl
@@ -0,0 +1,44 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_assembler_sup).
+
+-export([start_link/1]).
+-export([start_child/3]).
+
+-behaviour(supervisor).
+-export([init/1]).
+
+-define(REF(ID), {via, gproc, {n, l, {?MODULE, ID}}}).
+
+start_link(ID) ->
+    supervisor:start_link(?REF(ID), ?MODULE, []).
+
+start_child(ID, Storage, Transfer) ->
+    Childspec = #{
+        id => {Storage, Transfer},
+        start => {emqx_ft_assembler, start_link, [Storage, Transfer]},
+        restart => transient
+    },
+    supervisor:start_child(?REF(ID), Childspec).
+
+init(_) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 100,
+        period => 1000
+    },
+    {ok, {SupFlags, []}}.
diff --git a/apps/emqx_ft/src/emqx_ft_assembly.erl b/apps/emqx_ft/src/emqx_ft_assembly.erl
new file mode 100644
index 000000000..854b420e0
--- /dev/null
+++ b/apps/emqx_ft/src/emqx_ft_assembly.erl
@@ -0,0 +1,364 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_assembly).
+
+-export([new/0]).
+-export([append/3]).
+-export([update/1]).
+
+-export([status/1]).
+-export([filemeta/1]).
+-export([coverage/1]).
+-export([properties/1]).
+
+-record(asm, {
+    status :: _TODO,
+    coverage :: _TODO,
+    properties :: _TODO,
+    meta :: _TODO,
+    % orddict:orddict(K, V)
+    segs :: _TODO,
+    size
+}).
+
+new() ->
+    #asm{
+        status = {incomplete, {missing, filemeta}},
+        meta = orddict:new(),
+        segs = orddict:new(),
+        size = 0
+    }.
+
+append(Asm, Node, Fragments) when is_list(Fragments) ->
+    lists:foldl(fun(F, AsmIn) -> append(AsmIn, Node, F) end, Asm, Fragments);
+append(Asm, Node, Fragment = #{fragment := {filemeta, _}}) ->
+    append_filemeta(Asm, Node, Fragment);
+append(Asm, Node, Segment = #{fragment := {segmentinfo, _}}) ->
+    append_segmentinfo(Asm, Node, Segment).
+
+update(Asm) ->
+    case status(meta, Asm) of
+        {complete, _Meta} ->
+            case status(coverage, Asm) of
+                {complete, Coverage, Props} ->
+                    Asm#asm{
+                        status = complete,
+                        coverage = Coverage,
+                        properties = Props
+                    };
+                Status ->
+                    Asm#asm{status = Status}
+            end;
+        Status ->
+            Asm#asm{status = Status}
+    end.
+
+status(#asm{status = Status}) ->
+    Status.
+
+filemeta(Asm) ->
+    case status(meta, Asm) of
+        {complete, Meta} -> Meta;
+        _Other -> undefined
+    end.
+
+coverage(#asm{coverage = Coverage}) ->
+    Coverage.
+
+properties(#asm{properties = Properties}) ->
+    Properties.
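A usage sketch (illustrative only; the helper name and remote node name are made up) of how a caller is expected to drive this API, in the same shape as the unit tests further down: build an assembly, append fragments per node, run `update/1`, then inspect `status/1` before asking for `filemeta/1` and `coverage/1`.

assembly_example(LocalFragments, RemoteFragments) ->
    Asm0 = emqx_ft_assembly:new(),
    Asm1 = emqx_ft_assembly:append(Asm0, node(), LocalFragments),
    Asm2 = emqx_ft_assembly:append(Asm1, 'other@127.0.0.1', RemoteFragments),
    Asm = emqx_ft_assembly:update(Asm2),
    case emqx_ft_assembly:status(Asm) of
        complete ->
            %% Safe to read the decoded filemeta and the chosen segment coverage.
            {emqx_ft_assembly:filemeta(Asm), emqx_ft_assembly:coverage(Asm)};
        {incomplete, _} = Incomplete ->
            Incomplete;
        {error, _} = Error ->
            Error
    end.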
+
+status(meta, #asm{meta = Meta}) ->
+    status(meta, orddict:to_list(Meta));
+status(meta, [{Meta, {_Node, _Frag}}]) ->
+    {complete, Meta};
+status(meta, []) ->
+    {incomplete, {missing, filemeta}};
+status(meta, [_M1, _M2 | _] = Metas) ->
+    {error, {inconsistent, [Frag#{node => Node} || {_, {Node, Frag}} <- Metas]}};
+status(coverage, #asm{segs = Segments, size = Size}) ->
+    case coverage(orddict:to_list(Segments), 0, Size) of
+        Coverage when is_list(Coverage) ->
+            {complete, Coverage, #{
+                dominant => dominant(Coverage)
+            }};
+        Missing = {missing, _} ->
+            {incomplete, Missing}
+    end.
+
+append_filemeta(Asm, Node, Fragment = #{fragment := {filemeta, Meta}}) ->
+    Asm#asm{
+        meta = orddict:store(Meta, {Node, Fragment}, Asm#asm.meta)
+    }.
+
+append_segmentinfo(Asm, Node, Fragment = #{fragment := {segmentinfo, Info}}) ->
+    Offset = maps:get(offset, Info),
+    Size = maps:get(size, Info),
+    End = Offset + Size,
+    Asm#asm{
+        % TODO
+        % In theory it's possible to have two segments with the same offset + size on
+        % different nodes but with differing content. We'd need a checksum to
+        % be able to disambiguate them though.
+        segs = orddict:store({Offset, locality(Node), -End, Node}, Fragment, Asm#asm.segs),
+        size = max(End, Asm#asm.size)
+    }.
+
+coverage([{{Offset, _, _, _}, _Segment} | Rest], Cursor, Sz) when Offset < Cursor ->
+    coverage(Rest, Cursor, Sz);
+coverage([{{Cursor, _Locality, MEnd, Node}, Segment} | Rest], Cursor, Sz) ->
+    % NOTE
+    % We consider only whole fragments here, so for example from the point of view of
+    % this algo `[{Offset1 = 0, Size1 = 15}, {Offset2 = 10, Size2 = 10}]` has no
+    % coverage.
+    case coverage(Rest, -MEnd, Sz) of
+        Coverage when is_list(Coverage) ->
+            [{Node, Segment} | Coverage];
+        Missing = {missing, _} ->
+            case coverage(Rest, Cursor, Sz) of
+                CoverageAlt when is_list(CoverageAlt) ->
+                    CoverageAlt;
+                {missing, _} ->
+                    Missing
+            end
+    end;
+coverage([{{Offset, _Locality, _MEnd, _Node}, _Segment} | _], Cursor, _Sz) when Offset > Cursor ->
+    {missing, {segment, Cursor, Offset}};
+coverage([], Cursor, Sz) when Cursor < Sz ->
+    {missing, {segment, Cursor, Sz}};
+coverage([], Cursor, Cursor) ->
+    [].
+
+dominant(Coverage) ->
+    % TODO: needs improvement, better defined _dominance_, maybe some score
+    Freqs = frequencies(fun({Node, Segment}) -> {Node, segsize(Segment)} end, Coverage),
+    maxfreq(Freqs, node()).
+
+frequencies(Fun, List) ->
+    lists:foldl(
+        fun(E, Acc) ->
+            {K, N} = Fun(E),
+            maps:update_with(K, fun(M) -> M + N end, N, Acc)
+        end,
+        #{},
+        List
+    ).
+
+maxfreq(Freqs, Init) ->
+    {_, Max} = maps:fold(
+        fun
+            (F, N, {M, _MF}) when N > M -> {N, F};
+            (_F, _N, {M, MF}) -> {M, MF}
+        end,
+        {0, Init},
+        Freqs
+    ),
+    Max.
+
+locality(Node) when Node =:= node() ->
+    % NOTE
+    % This should prioritize locally available segments over those on remote nodes.
+    0;
+locality(_RemoteNode) ->
+    1.
+
+segsize(#{fragment := {segmentinfo, Info}}) ->
+    maps:get(size, Info).
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+incomplete_new_test() ->
+    ?assertEqual(
+        {incomplete, {missing, filemeta}},
+        status(update(new()))
+    ).
+
+incomplete_test() ->
+    ?assertEqual(
+        {incomplete, {missing, filemeta}},
+        status(
+            update(
+                append(new(), node(), [
+                    segmentinfo(p1, 0, 42),
+                    segmentinfo(p1, 42, 100)
+                ])
+            )
+        )
+    ).
+
+consistent_test() ->
+    Asm1 = append(new(), n1, [filemeta(m1, "blarg")]),
+    Asm2 = append(Asm1, n2, [segmentinfo(s2, 0, 42)]),
+    Asm3 = append(Asm2, n3, [filemeta(m3, "blarg")]),
+    ?assertMatch({complete, _}, status(meta, Asm3)).
+
+inconsistent_test() ->
+    Asm1 = append(new(), node(), [segmentinfo(s1, 0, 42)]),
+    Asm2 = append(Asm1, n1, [filemeta(m1, "blarg")]),
+    Asm3 = append(Asm2, n2, [segmentinfo(s2, 0, 42), filemeta(m1, "blorg")]),
+    Asm4 = append(Asm3, n3, [filemeta(m3, "blarg")]),
+    ?assertMatch(
+        {error,
+            {inconsistent, [
+                % blarg < blorg
+                #{node := n3, path := m3, fragment := {filemeta, #{name := "blarg"}}},
+                #{node := n2, path := m1, fragment := {filemeta, #{name := "blorg"}}}
+            ]}},
+        status(meta, Asm4)
+    ).
+
+simple_coverage_test() ->
+    Node = node(),
+    Segs = [
+        {node42, segmentinfo(n1, 20, 30)},
+        {Node, segmentinfo(n2, 0, 10)},
+        {Node, segmentinfo(n3, 50, 50)},
+        {Node, segmentinfo(n4, 10, 10)}
+    ],
+    Asm = append_many(new(), Segs),
+    ?assertMatch(
+        {complete,
+            [
+                {Node, #{path := n2}},
+                {Node, #{path := n4}},
+                {node42, #{path := n1}},
+                {Node, #{path := n3}}
+            ],
+            #{dominant := Node}},
+        status(coverage, Asm)
+    ).
+
+redundant_coverage_test() ->
+    Node = node(),
+    Segs = [
+        {Node, segmentinfo(n1, 0, 20)},
+        {node1, segmentinfo(n2, 0, 10)},
+        {Node, segmentinfo(n3, 20, 40)},
+        {node2, segmentinfo(n4, 10, 10)},
+        {node2, segmentinfo(n5, 50, 20)},
+        {node3, segmentinfo(n6, 20, 20)},
+        {Node, segmentinfo(n7, 50, 10)},
+        {node1, segmentinfo(n8, 40, 10)}
+    ],
+    Asm = append_many(new(), Segs),
+    ?assertMatch(
+        {complete,
+            [
+                {Node, #{path := n1}},
+                {node3, #{path := n6}},
+                {node1, #{path := n8}},
+                {node2, #{path := n5}}
+            ],
+            #{dominant := _}},
+        status(coverage, Asm)
+    ).
+
+redundant_coverage_prefer_local_test() ->
+    Node = node(),
+    Segs = [
+        {node1, segmentinfo(n1, 0, 20)},
+        {Node, segmentinfo(n2, 0, 10)},
+        {Node, segmentinfo(n3, 10, 10)},
+        {node2, segmentinfo(n4, 20, 20)},
+        {Node, segmentinfo(n5, 30, 10)},
+        {Node, segmentinfo(n6, 20, 10)}
+    ],
+    Asm = append_many(new(), Segs),
+    ?assertMatch(
+        {complete,
+            [
+                {Node, #{path := n2}},
+                {Node, #{path := n3}},
+                {Node, #{path := n6}},
+                {Node, #{path := n5}}
+            ],
+            #{dominant := Node}},
+        status(coverage, Asm)
+    ).
+
+missing_coverage_test() ->
+    Node = node(),
+    Segs = [
+        {Node, segmentinfo(n1, 0, 10)},
+        {node1, segmentinfo(n3, 10, 20)},
+        {Node, segmentinfo(n2, 0, 20)},
+        {node2, segmentinfo(n4, 50, 50)},
+        {Node, segmentinfo(n5, 40, 60)}
+    ],
+    Asm = append_many(new(), Segs),
+    ?assertEqual(
+        % {incomplete, {missing, {segment, 30, 40}}}, ???
+        {incomplete, {missing, {segment, 20, 40}}},
+        status(coverage, Asm)
+    ).
+
+missing_end_coverage_test() ->
+    Node = node(),
+    Segs = [
+        {Node, segmentinfo(n1, 0, 15)},
+        {node1, segmentinfo(n3, 10, 10)}
+    ],
+    Asm = append_many(new(), Segs),
+    ?assertEqual(
+        {incomplete, {missing, {segment, 15, 20}}},
+        status(coverage, Asm)
+    ).
+
+missing_coverage_with_redundancy_test() ->
+    Segs = [
+        {node(), segmentinfo(n1, 0, 10)},
+        {node(), segmentinfo(n2, 0, 20)},
+        {node42, segmentinfo(n3, 10, 20)},
+        {node43, segmentinfo(n4, 10, 50)},
+        {node(), segmentinfo(n5, 40, 60)}
+    ],
+    Asm = append_many(new(), Segs),
+    ?assertEqual(
+        % {incomplete, {missing, {segment, 50, 60}}}, ???
+        {incomplete, {missing, {segment, 20, 40}}},
+        status(coverage, Asm)
+    ).
+
+append_many(Asm, List) ->
+    lists:foldl(
+        fun({Node, Frag}, Acc) -> append(Acc, Node, Frag) end,
+        Asm,
+        List
+    ).
+
+filemeta(Path, Name) ->
+    #{
+        path => Path,
+        fragment =>
+            {filemeta, #{
+                name => Name
+            }}
+    }.
+
+segmentinfo(Path, Offset, Size) ->
+    #{
+        path => Path,
+        fragment =>
+            {segmentinfo, #{
+                offset => Offset,
+                size => Size
+            }}
+    }.
+
+-endif.
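A worked example of the segment ordering the assembly relies on (hypothetical helper, made-up node name): `append_segmentinfo/3` keys each segment as `{Offset, Locality, -End, Node}`, so plain Erlang term ordering lists, for the same offset, local segments before remote ones and longer segments before shorter ones; this is the preference exercised by `redundant_coverage_prefer_local_test` above.

ordering_example() ->
    Local = node(),
    Keys = [
        {0, 1, -20, 'remote@127.0.0.1'},
        {0, 0, -10, Local},
        {0, 0, -20, Local}
    ],
    %% Sorts to [{0, 0, -20, Local}, {0, 0, -10, Local}, {0, 1, -20, 'remote@127.0.0.1'}]:
    %% the local, longest candidate for offset 0 comes first.
    lists:sort(Keys).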
diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl
new file mode 100644
index 000000000..2d50e1b18
--- /dev/null
+++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl
@@ -0,0 +1,425 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_storage_fs).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+% -compile(export_all).
+
+-export([store_filemeta/3]).
+-export([store_segment/3]).
+-export([list/2]).
+-export([assemble/2]).
+
+-export([open_file/3]).
+-export([complete/4]).
+-export([write/2]).
+-export([discard/1]).
+
+% -behaviour(gen_server).
+% -export([init/1]).
+% -export([handle_call/3]).
+% -export([handle_cast/2]).
+
+-type json_value() ::
+    null
+    | boolean()
+    | binary()
+    | number()
+    | [json_value()]
+    | #{binary() => json_value()}.
+
+-reflect_type([json_value/0]).
+
+-type transfer() :: emqx_ft:transfer().
+-type offset() :: emqx_ft:offset().
+
+%% TODO: move to `emqx_ft` interface module
+% -type sha256_hex() :: <<_:512>>.
+
+-type filemeta() :: #{
+    %% Display name
+    name := string(),
+    %% Size in bytes, as advertised by the client.
+    %% The client is free to specify whatever it wants here, which means we can end
+    %% up with a file of a different size after assembly. It's not clear from the
+    %% specification what that means (e.g. what clients' expectations are); we
+    %% currently do not consider that an error (or, specifically, a signal that
+    %% the resulting file was corrupted during transmission).
+    size => _Bytes :: non_neg_integer(),
+    checksum => {sha256, <<_:256>>},
+    expire_at := emqx_datetime:epoch_second(),
+    %% TTL of individual segments
+    %% Somewhat confusing that we won't know it on the nodes where the filemeta
+    %% is missing.
+    segments_ttl => _Seconds :: pos_integer(),
+    user_data => json_value()
+}.
+
+-type segment() :: {offset(), _Content :: binary()}.
+
+-type segmentinfo() :: #{
+    offset := offset(),
+    size := _Bytes :: non_neg_integer()
+}.
+
+-type filefrag(T) :: #{
+    path := file:name(),
+    timestamp := emqx_datetime:epoch_second(),
+    fragment := T
+}.
+
+-type filefrag() :: filefrag({filemeta, filemeta()} | {segment, segmentinfo()}).
+
+-define(MANIFEST, "MANIFEST.json").
+-define(SEGMENT, "SEG").
+-define(TEMP, "TMP").
+
+-type root() :: file:name().
+
+% -record(st, {
+%     root :: file:name()
+% }).
+
+%% TODO
+-type storage() :: root().
+
+%%
+
+-define(PROCREF(Root), {via, gproc, {n, l, {?MODULE, Root}}}).
+
+-spec start_link(root()) ->
+    {ok, pid()} | {error, already_started}.
+start_link(Root) ->
+    gen_server:start_link(?PROCREF(Root), ?MODULE, [], []).
+
+%% Store manifest in the backing filesystem.
+%% Atomic operation.
+-spec store_filemeta(storage(), transfer(), filemeta()) ->
+    % Quota? Some lower level errors?
+    ok | {error, conflict} | {error, _TODO}.
+store_filemeta(Storage, Transfer, Meta) ->
+    Filepath = mk_filepath(Storage, Transfer, ?MANIFEST),
+    case read_file(Filepath, fun decode_filemeta/1) of
+        {ok, Meta} ->
+            _ = touch_file(Filepath),
+            ok;
+        {ok, _Conflict} ->
+            % TODO
+            % We won't see conflicts in case of concurrent `store_filemeta`
+            % requests. It's a rather odd scenario, so it's fine not to worry
+            % about it too much now.
+            {error, conflict};
+        {error, Reason} when Reason =:= enoent; Reason =:= corrupted ->
+            write_file_atomic(Filepath, encode_filemeta(Meta))
+    end.
+
+%% Store a segment in the backing filesystem.
+%% Atomic operation.
+-spec store_segment(storage(), transfer(), segment()) ->
+    % Where does the checksum get verified? At the upper level, probably.
+    % Quota? Some lower level errors?
+    ok | {error, _TODO}.
+store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
+    Filepath = mk_filepath(Storage, Transfer, mk_segment_filename(Segment)),
+    write_file_atomic(Filepath, Content).
+
+-spec list(storage(), transfer()) ->
+    % Some lower level errors? {error, notfound}?
+    % Result will contain zero or only one filemeta.
+    {ok, list(filefrag())} | {error, _TODO}.
+list(Storage, Transfer) ->
+    Dirname = mk_filedir(Storage, Transfer),
+    case file:list_dir(Dirname) of
+        {ok, Filenames} ->
+            {ok, filtermap_files(fun mk_filefrag/2, Dirname, Filenames)};
+        {error, enoent} ->
+            {ok, []};
+        {error, _} = Error ->
+            Error
+    end.
+
+-spec assemble(storage(), transfer()) ->
+    % {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}.
+    {ok, _Assembler :: pid()} | {error, _TODO}.
+assemble(Storage, Transfer) ->
+    emqx_ft_assembler_sup:start_child(Storage, Storage, Transfer).
+
+%%
+
+-opaque handle() :: {file:name(), io:device(), crypto:hash_state()}.
+
+-spec open_file(storage(), transfer(), filemeta()) ->
+    {ok, handle()} | {error, _TODO}.
+open_file(Storage, Transfer, Filemeta) ->
+    Filename = maps:get(name, Filemeta),
+    Filepath = mk_filepath(Storage, Transfer, Filename),
+    {_, TempFilepath} = mk_temp_filepath(Filepath),
+    case file:open(TempFilepath, [write, raw]) of
+        {ok, Handle} ->
+            _ = file:truncate(Handle),
+            {ok, {TempFilepath, Handle, init_checksum(Filemeta)}};
+        {error, _} = Error ->
+            Error
+    end.
+
+-spec write(handle(), iodata()) ->
+    {ok, handle()} | {error, _TODO}.
+write({Filepath, IoDevice, Ctx}, IoData) ->
+    case file:write(IoDevice, IoData) of
+        ok ->
+            {ok, {Filepath, IoDevice, update_checksum(Ctx, IoData)}};
+        {error, _} = Error ->
+            Error
+    end.
+
+-spec complete(storage(), transfer(), filemeta(), handle()) ->
+    ok | {error, {checksum, _Algo, _Computed}} | {error, _TODO}.
+complete(Storage, Transfer, Filemeta, Handle = {Filepath, IoDevice, Ctx}) ->
+    TargetFilepath = mk_filepath(Storage, Transfer, maps:get(name, Filemeta)),
+    case verify_checksum(Ctx, Filemeta) of
+        ok ->
+            ok = file:close(IoDevice),
+            file:rename(Filepath, TargetFilepath);
+        {error, _} = Error ->
+            _ = discard(Handle),
+            Error
+    end.
+
+-spec discard(handle()) ->
+    ok.
+discard({Filepath, IoDevice, _Ctx}) ->
+    ok = file:close(IoDevice),
+    file:delete(Filepath).
+
+init_checksum(#{checksum := {Algo, _}}) ->
+    crypto:hash_init(Algo);
+init_checksum(#{}) ->
+    undefined.
+
+update_checksum(Ctx, IoData) when Ctx /= undefined ->
+    crypto:hash_update(Ctx, IoData);
+update_checksum(undefined, _IoData) ->
+    undefined.
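A sketch (hypothetical helper, not part of this module) of how the handle returned by `open_file/3` is meant to be threaded through `write/2` and finished with `complete/4`, mirroring what `emqx_ft_assembler` does one segment at a time:

write_assembled_file(Storage, Transfer, Filemeta, SegmentContents) ->
    {ok, Handle0} = open_file(Storage, Transfer, Filemeta),
    %% Every successful write returns a new handle carrying the running checksum.
    HandleN = lists:foldl(
        fun(Content, Handle) ->
            {ok, NHandle} = write(Handle, Content),
            NHandle
        end,
        Handle0,
        SegmentContents
    ),
    %% Verifies the checksum (if any) and renames the temp file into place.
    complete(Storage, Transfer, Filemeta, HandleN).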
+
+verify_checksum(Ctx, #{checksum := {Algo, Digest}}) when Ctx /= undefined ->
+    case crypto:hash_final(Ctx) of
+        Digest ->
+            ok;
+        Mismatch ->
+            {error, {checksum, Algo, binary:encode_hex(Mismatch)}}
+    end;
+verify_checksum(undefined, _) ->
+    ok.
+
+%%
+
+-spec init(root()) -> {ok, storage()}.
+init(Root) ->
+    % TODO: garbage_collect(...)
+    {ok, Root}.
+
+%%
+
+-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
+
+schema() ->
+    #{
+        roots => [
+            {name, hoconsc:mk(string(), #{required => true})},
+            {size, hoconsc:mk(non_neg_integer())},
+            {expire_at, hoconsc:mk(non_neg_integer(), #{required => true})},
+            {checksum, hoconsc:mk({atom(), binary()}, #{converter => converter(checksum)})},
+            {segments_ttl, hoconsc:mk(pos_integer())},
+            {user_data, hoconsc:mk(json_value())}
+        ]
+    }.
+
+% encode_filemeta(Meta) ->
+%     emqx_json:encode(
+%         ?PRELUDE(
+%             _Vsn = 1,
+%             maps:map(
+%                 fun
+%                     (name, Name) ->
+%                         {<<"name">>, Name};
+%                     (size, Size) ->
+%                         {<<"size">>, Size};
+%                     (checksum, {sha256, Hash}) ->
+%                         {<<"checksum">>, <<"sha256:", (binary:encode_hex(Hash))/binary>>};
+%                     (expire_at, ExpiresAt) ->
+%                         {<<"expire_at">>, ExpiresAt};
+%                     (segments_ttl, TTL) ->
+%                         {<<"segments_ttl">>, TTL};
+%                     (user_data, UserData) ->
+%                         {<<"user_data">>, UserData}
+%                 end,
+%                 Meta
+%             )
+%         )
+%     ).
+
+encode_filemeta(Meta) ->
+    % TODO: Looks like this should be hocon's responsibility.
+    Term = hocon_tconf:make_serializable(schema(), emqx_map_lib:binary_key_map(Meta), #{}),
+    emqx_json:encode(?PRELUDE(_Vsn = 1, Term)).
+
+decode_filemeta(Binary) ->
+    ?PRELUDE(_Vsn = 1, Term) = emqx_json:decode(Binary, [return_maps]),
+    hocon_tconf:check_plain(schema(), Term, #{atom_key => true, required => false}).
+
+converter(checksum) ->
+    fun
+        (undefined, #{}) ->
+            undefined;
+        ({sha256, Bin}, #{make_serializable := true}) ->
+            _ = is_binary(Bin) orelse throw({expected_type, string}),
+            _ = byte_size(Bin) =:= 32 orelse throw({expected_length, 32}),
+            binary:encode_hex(Bin);
+        (Hex, #{}) ->
+            _ = is_binary(Hex) orelse throw({expected_type, string}),
+            _ = byte_size(Hex) =:= 64 orelse throw({expected_length, 64}),
+            {sha256, binary:decode_hex(Hex)}
+    end.
+
+% map_into(Fun, Into, Ks, Map) ->
+%     map_foldr(map_into_fn(Fun, Into), Into, Ks, Map).
+
+% map_into_fn(Fun, L) when is_list(L) ->
+%     fun(K, V, Acc) -> [{K, Fun(K, V)} || Acc] end.
+
+% map_foldr(_Fun, Acc, [], _) ->
+%     Acc;
+% map_foldr(Fun, Acc, [K | Ks], Map) when is_map_key(K, Map) ->
+%     Fun(K, maps:get(K, Map), map_foldr(Fun, Acc, Ks, Map));
+% map_foldr(Fun, Acc, [_ | Ks], Map) ->
+%     map_foldr(Fun, Acc, Ks, Map).
+
+%%
+
+mk_segment_filename({Offset, Content}) ->
+    lists:concat([?SEGMENT, ".", Offset, ".", byte_size(Content)]).
+
+break_segment_filename(Filename) ->
+    Regex = "^" ?SEGMENT "[.]([0-9]+)[.]([0-9]+)$",
+    Result = re:run(Filename, Regex, [{capture, all_but_first, list}]),
+    case Result of
+        {match, [Offset, Size]} ->
+            {ok, #{offset => list_to_integer(Offset), size => list_to_integer(Size)}};
+        nomatch ->
+            {error, invalid}
+    end.
+
+mk_filedir(Storage, {ClientId, FileId}) ->
+    filename:join([get_storage_root(Storage), ClientId, FileId]).
+
+mk_filepath(Storage, Transfer, Filename) ->
+    filename:join(mk_filedir(Storage, Transfer), Filename).
+
+get_storage_root(Storage) ->
+    Storage.
+
+%%
+
+-include_lib("kernel/include/file.hrl").
+
+read_file(Filepath) ->
+    file:read_file(Filepath).
+
+read_file(Filepath, DecodeFun) ->
+    case read_file(Filepath) of
+        {ok, Content} ->
+            safe_decode(Content, DecodeFun);
+        {error, _} = Error ->
+            Error
+    end.
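For orientation, an illustrative on-disk layout (root path and sizes are made up) implied by `mk_filedir/2`, `mk_segment_filename/1` and the `?MANIFEST` / `?PRELUDE` definitions above, for the transfer `{<<"client1">>, <<"file1">>}`:

%% /var/lib/emqx/ft/client1/file1/MANIFEST.json  -- filemeta, JSON-encoded as ["filemeta", 1, Map]
%% /var/lib/emqx/ft/client1/file1/SEG.0.1024     -- segment at offset 0, 1024 bytes of content
%% /var/lib/emqx/ft/client1/file1/SEG.1024.512   -- segment at offset 1024, 512 bytes of content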
+
+safe_decode(Content, DecodeFun) ->
+    try
+        {ok, DecodeFun(Content)}
+    catch
+        _C:_R:_Stacktrace ->
+            % TODO: Log?
+            {error, corrupted}
+    end.
+
+write_file_atomic(Filepath, Content) when is_binary(Content) ->
+    Result = emqx_misc:pipeline(
+        [
+            fun filelib:ensure_dir/1,
+            fun mk_temp_filepath/1,
+            fun write_contents/2,
+            fun mv_temp_file/1
+        ],
+        Filepath,
+        Content
+    ),
+    case Result of
+        {ok, {Filepath, TempFilepath}, _} ->
+            _ = file:delete(TempFilepath),
+            ok;
+        {error, Reason, _} ->
+            {error, Reason}
+    end.
+
+mk_temp_filepath(Filepath) ->
+    Dirname = filename:dirname(Filepath),
+    Filename = filename:basename(Filepath),
+    Unique = erlang:unique_integer([positive]),
+    TempFilepath = filename:join(Dirname, ?TEMP ++ integer_to_list(Unique) ++ "." ++ Filename),
+    {Filepath, TempFilepath}.
+
+write_contents({_Filepath, TempFilepath}, Content) ->
+    file:write_file(TempFilepath, Content).
+
+mv_temp_file({Filepath, TempFilepath}) ->
+    file:rename(TempFilepath, Filepath).
+
+touch_file(Filepath) ->
+    Now = erlang:localtime(),
+    file:change_time(Filepath, _Mtime = Now, _Atime = Now).
+
+filtermap_files(Fun, Dirname, Filenames) ->
+    lists:filtermap(fun(Filename) -> Fun(Dirname, Filename) end, Filenames).
+
+mk_filefrag(Dirname, Filename = ?MANIFEST) ->
+    mk_filefrag(Dirname, Filename, fun read_filemeta/2);
+mk_filefrag(Dirname, Filename = ?SEGMENT ++ _) ->
+    mk_filefrag(Dirname, Filename, fun read_segmentinfo/2);
+mk_filefrag(_Dirname, _) ->
+    false.
+
+mk_filefrag(Dirname, Filename, Fun) ->
+    Filepath = filename:join(Dirname, Filename),
+    {ok, Fileinfo} = file:read_file_info(Filepath),
+    case Fun(Filename, Filepath) of
+        {ok, Frag} ->
+            {true, #{
+                path => Filepath,
+                timestamp => Fileinfo#file_info.mtime,
+                fragment => Frag
+            }};
+        {error, _Reason} ->
+            false
+    end.
+
+read_filemeta(_Filename, Filepath) ->
+    read_file(Filepath, fun decode_filemeta/1).
+
+read_segmentinfo(Filename, _Filepath) ->
+    break_segment_filename(Filename).
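An end-to-end sketch of the intended flow (hypothetical module and values; assumes the assembler supervisor registered for this storage root is already running): store the manifest and the segments on the receiving node, then kick off assembly.

-module(emqx_ft_storage_fs_example).

-export([transfer_example/0]).

transfer_example() ->
    Storage = "/var/lib/emqx/ft",
    Transfer = {<<"client1">>, <<"file1">>},
    Filemeta = #{name => "file1.txt", expire_at => 1700000000},
    ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Filemeta),
    ok = emqx_ft_storage_fs:store_segment(Storage, Transfer, {0, <<"hello, ">>}),
    ok = emqx_ft_storage_fs:store_segment(Storage, Transfer, {7, <<"world">>}),
    %% Starts an emqx_ft_assembler that gathers fragments from this node and
    %% its peers and writes out "file1.txt" next to the stored segments.
    emqx_ft_storage_fs:assemble(Storage, Transfer).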