fix(ft-asm): ensure module follows statem behaviour

This commit is contained in:
Andrew Mayorov 2023-02-02 21:29:35 +03:00 committed by Ilya Averyanov
parent 37d8930341
commit d2bb574921
1 changed files with 8 additions and 8 deletions

View File

@ -45,7 +45,7 @@
%% %%
start_link(Storage, Transfer, Callback) -> start_link(Storage, Transfer, Callback) ->
gen_server:start_link(?MODULE, {Storage, Transfer, Callback}, []). gen_statem:start_link(?MODULE, {Storage, Transfer, Callback}, []).
%% %%
@ -64,7 +64,7 @@ init({Storage, Transfer, Callback}) ->
}, },
{ok, list_local_fragments, St, ?internal([])}. {ok, list_local_fragments, St, ?internal([])}.
handle_event(list_local_fragments, internal, _, St = #st{assembly = Asm}) -> handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
% TODO: what we do with non-transients errors here (e.g. `eacces`)? % TODO: what we do with non-transients errors here (e.g. `eacces`)?
{ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer), {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer),
NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)), NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
@ -79,7 +79,7 @@ handle_event(list_local_fragments, internal, _, St = #st{assembly = Asm}) ->
% {error, _} = Reason -> % {error, _} = Reason ->
% {stop, Reason} % {stop, Reason}
end; end;
handle_event({list_remote_fragments, Nodes}, internal, _, St) -> handle_event(internal, _, {list_remote_fragments, Nodes}, St) ->
% TODO: portable "storage" ref % TODO: portable "storage" ref
Args = [St#st.storage, St#st.transfer], Args = [St#st.storage, St#st.transfer],
% TODO % TODO
@ -109,13 +109,13 @@ handle_event({list_remote_fragments, Nodes}, internal, _, St) ->
{incomplete, _} = Status -> {incomplete, _} = Status ->
{stop, {error, Status}} {stop, {error, Status}}
end; end;
handle_event(start_assembling, internal, _, St = #st{assembly = Asm}) -> handle_event(internal, _, start_assembling, St = #st{assembly = Asm}) ->
Filemeta = emqx_ft_assembly:filemeta(Asm), Filemeta = emqx_ft_assembly:filemeta(Asm),
Coverage = emqx_ft_assembly:coverage(Asm), Coverage = emqx_ft_assembly:coverage(Asm),
% TODO: errors % TODO: errors
{ok, Handle} = emqx_ft_storage_fs:open_file(St#st.storage, St#st.transfer, Filemeta), {ok, Handle} = emqx_ft_storage_fs:open_file(St#st.storage, St#st.transfer, Filemeta),
{next_state, {assemble, Coverage}, St#st{file = Handle}, ?internal([])}; {next_state, {assemble, Coverage}, St#st{file = Handle}, ?internal([])};
handle_event({assemble, [{Node, Segment} | Rest]}, internal, _, St = #st{}) -> handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
% TODO % TODO
% Currently, race is possible between getting segment info from the remote node and % Currently, race is possible between getting segment info from the remote node and
% this node garbage collecting the segment itself. % this node garbage collecting the segment itself.
@ -128,9 +128,9 @@ handle_event({assemble, [{Node, Segment} | Rest]}, internal, _, St = #st{}) ->
% {error, _} -> % {error, _} ->
% ... % ...
end; end;
handle_event({assemble, []}, internal, _, St = #st{}) -> handle_event(internal, _, {assemble, []}, St = #st{}) ->
{next_state, complete, St, ?internal([])}; {next_state, complete, St, ?internal([])};
handle_event(complete, internal, _, St = #st{assembly = Asm, file = Handle, callback = Callback}) -> handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle, callback = Callback}) ->
Filemeta = emqx_ft_assembly:filemeta(Asm), Filemeta = emqx_ft_assembly:filemeta(Asm),
Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle), Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle),
%% TODO: safe apply %% TODO: safe apply
@ -160,5 +160,5 @@ handle_event(complete, internal, _, St = #st{assembly = Asm, file = Handle, call
%% %%
segsize(#{fragment := {segmentinfo, Info}}) -> segsize(#{fragment := {segment, Info}}) ->
maps:get(size, Info). maps:get(size, Info).