diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index a86116b02..17fed012a 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -24,12 +24,12 @@ -export([handle_event/4]). -export([terminate/3]). --record(st, { - storage :: _Storage, - transfer :: emqx_ft:transfer(), - assembly :: emqx_ft_assembly:t(), - export :: _Export | undefined -}). +-type stdata() :: #{ + storage := emqx_ft_storage_fs:storage(), + transfer := emqx_ft:transfer(), + assembly := emqx_ft_assembly:t(), + export => emqx_ft_storage_exporter:export() +}. -define(NAME(Transfer), {n, l, {?MODULE, Transfer}}). -define(REF(Transfer), {via, gproc, ?NAME(Transfer)}). @@ -41,31 +41,48 @@ start_link(Storage, Transfer, Size) -> %% +-type state() :: + idle + | list_local_fragments + | {list_remote_fragments, [node()]} + | start_assembling + | {assemble, [{node(), emqx_ft_storage_fs:filefrag()}]} + | complete. + -define(internal(C), {next_event, internal, C}). callback_mode() -> handle_event_function. +-spec init(_Args) -> {ok, state(), stdata()}. init({Storage, Transfer, Size}) -> _ = erlang:process_flag(trap_exit, true), - St = #st{ - storage = Storage, - transfer = Transfer, - assembly = emqx_ft_assembly:new(Size) + St = #{ + storage => Storage, + transfer => Transfer, + assembly => emqx_ft_assembly:new(Size) }, {ok, idle, St}. +-spec handle_event(info | internal, _, state(), stdata()) -> + {next_state, state(), stdata(), {next_event, internal, _}} + | {stop, {shutdown, ok | {error, _}}, stdata()}. handle_event(info, kickoff, idle, St) -> % NOTE % Someone's told us to start the work, which usually means that it has set up a monitor. % We could wait for this message and handle it at the end of the assembling rather than at % the beginning, however it would make error handling much more messier. {next_state, list_local_fragments, St, ?internal([])}; -handle_event(internal, _, list_local_fragments, St = #st{}) -> +handle_event( + internal, + _, + list_local_fragments, + St = #{storage := Storage, transfer := Transfer, assembly := Asm} +) -> % TODO: what we do with non-transients errors here (e.g. `eacces`)? - {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer, fragment), - NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(St#st.assembly, node(), Fragments)), - NSt = St#st{assembly = NAsm}, + {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer, fragment), + NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)), + NSt = St#{assembly := NAsm}, case emqx_ft_assembly:status(NAsm) of complete -> {next_state, start_assembling, NSt, ?internal([])}; @@ -76,27 +93,32 @@ handle_event(internal, _, list_local_fragments, St = #st{}) -> {error, _} = Error -> {stop, {shutdown, Error}} end; -handle_event(internal, _, {list_remote_fragments, Nodes}, St) -> +handle_event( + internal, + _, + {list_remote_fragments, Nodes}, + St = #{transfer := Transfer, assembly := Asm} +) -> % TODO % Async would better because we would not need to wait for some lagging nodes if % the coverage is already complete. % TODO: portable "storage" ref - Results = emqx_ft_storage_fs_proto_v1:multilist(Nodes, St#st.transfer, fragment), + Results = emqx_ft_storage_fs_proto_v1:multilist(Nodes, Transfer, fragment), NodeResults = lists:zip(Nodes, Results), NAsm = emqx_ft_assembly:update( lists:foldl( fun - ({Node, {ok, {ok, Fragments}}}, Asm) -> - emqx_ft_assembly:append(Asm, Node, Fragments); - ({_Node, _Result}, Asm) -> + ({Node, {ok, {ok, Fragments}}}, Acc) -> + emqx_ft_assembly:append(Acc, Node, Fragments); + ({_Node, _Result}, Acc) -> % TODO: log? - Asm + Acc end, - St#st.assembly, + Asm, NodeResults ) ), - NSt = St#st{assembly = NAsm}, + NSt = St#{assembly := NAsm}, case emqx_ft_assembly:status(NAsm) of complete -> {next_state, start_assembling, NSt, ?internal([])}; @@ -106,43 +128,51 @@ handle_event(internal, _, {list_remote_fragments, Nodes}, St) -> {error, _} = Error -> {stop, {shutdown, Error}} end; -handle_event(internal, _, start_assembling, St = #st{}) -> - Filemeta = emqx_ft_assembly:filemeta(St#st.assembly), - Coverage = emqx_ft_assembly:coverage(St#st.assembly), +handle_event( + internal, + _, + start_assembling, + St = #{storage := Storage, transfer := Transfer, assembly := Asm} +) -> + Filemeta = emqx_ft_assembly:filemeta(Asm), + Coverage = emqx_ft_assembly:coverage(Asm), % TODO: better error handling {ok, Export} = emqx_ft_storage_exporter:start_export( - St#st.storage, - St#st.transfer, + Storage, + Transfer, Filemeta ), - {next_state, {assemble, Coverage}, St#st{export = Export}, ?internal([])}; -handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) -> + {next_state, {assemble, Coverage}, St#{export => Export}, ?internal([])}; +handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export := Export}) -> % TODO % Currently, race is possible between getting segment info from the remote node and % this node garbage collecting the segment itself. % TODO: pipelining % TODO: better error handling {ok, Content} = pread(Node, Segment, St), - {ok, NExport} = emqx_ft_storage_exporter:write(St#st.export, Content), - {next_state, {assemble, Rest}, St#st{export = NExport}, ?internal([])}; -handle_event(internal, _, {assemble, []}, St = #st{}) -> + {ok, NExport} = emqx_ft_storage_exporter:write(Export, Content), + {next_state, {assemble, Rest}, St#{export := NExport}, ?internal([])}; +handle_event(internal, _, {assemble, []}, St = #{}) -> {next_state, complete, St, ?internal([])}; -handle_event(internal, _, complete, St = #st{}) -> - Result = emqx_ft_storage_exporter:complete(St#st.export), +handle_event(internal, _, complete, St = #{export := Export}) -> + Result = emqx_ft_storage_exporter:complete(Export), ok = maybe_garbage_collect(Result, St), - {stop, {shutdown, Result}, St#st{export = undefined}}. + {stop, {shutdown, Result}, maps:remove(export, St)}. -terminate(_Reason, _StateName, #st{export = Export}) -> - Export /= undefined andalso emqx_ft_storage_exporter:discard(Export). +-spec terminate(_Reason, state(), stdata()) -> _. +terminate(_Reason, _StateName, #{export := Export}) -> + emqx_ft_storage_exporter:discard(Export); +terminate(_Reason, _StateName, #{}) -> + ok. -pread(Node, Segment, St) when Node =:= node() -> - emqx_ft_storage_fs:pread(St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)); -pread(Node, Segment, St) -> - emqx_ft_storage_fs_proto_v1:pread(Node, St#st.transfer, Segment, 0, segsize(Segment)). +pread(Node, Segment, #{storage := Storage, transfer := Transfer}) when Node =:= node() -> + emqx_ft_storage_fs:pread(Storage, Transfer, Segment, 0, segsize(Segment)); +pread(Node, Segment, #{transfer := Transfer}) -> + emqx_ft_storage_fs_proto_v1:pread(Node, Transfer, Segment, 0, segsize(Segment)). %% -maybe_garbage_collect(ok, #st{storage = Storage, transfer = Transfer, assembly = Asm}) -> +maybe_garbage_collect(ok, #{storage := Storage, transfer := Transfer, assembly := Asm}) -> Nodes = emqx_ft_assembly:nodes(Asm), emqx_ft_storage_fs_gc:collect(Storage, Transfer, Nodes); maybe_garbage_collect({error, _}, _St) -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl index 97e260e3a..954423fba 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl @@ -34,6 +34,9 @@ -export([exporter/1]). +-export_type([options/0]). +-export_type([export/0]). + -type storage() :: emxt_ft_storage_fs:storage(). -type transfer() :: emqx_ft:transfer(). -type filemeta() :: emqx_ft:filemeta().