fix(ft-asm): ensure module follows statem behaviour
This commit is contained in:
parent
7b77e96ab9
commit
1fedae8a16
|
@ -45,7 +45,7 @@
|
||||||
%%
|
%%
|
||||||
|
|
||||||
start_link(Storage, Transfer, Callback) ->
|
start_link(Storage, Transfer, Callback) ->
|
||||||
gen_server:start_link(?MODULE, {Storage, Transfer, Callback}, []).
|
gen_statem:start_link(?MODULE, {Storage, Transfer, Callback}, []).
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ init({Storage, Transfer, Callback}) ->
|
||||||
},
|
},
|
||||||
{ok, list_local_fragments, St, ?internal([])}.
|
{ok, list_local_fragments, St, ?internal([])}.
|
||||||
|
|
||||||
handle_event(list_local_fragments, internal, _, St = #st{assembly = Asm}) ->
|
handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
|
||||||
% TODO: what we do with non-transients errors here (e.g. `eacces`)?
|
% TODO: what we do with non-transients errors here (e.g. `eacces`)?
|
||||||
{ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer),
|
{ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer),
|
||||||
NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
|
NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
|
||||||
|
@ -79,7 +79,7 @@ handle_event(list_local_fragments, internal, _, St = #st{assembly = Asm}) ->
|
||||||
% {error, _} = Reason ->
|
% {error, _} = Reason ->
|
||||||
% {stop, Reason}
|
% {stop, Reason}
|
||||||
end;
|
end;
|
||||||
handle_event({list_remote_fragments, Nodes}, internal, _, St) ->
|
handle_event(internal, _, {list_remote_fragments, Nodes}, St) ->
|
||||||
% TODO: portable "storage" ref
|
% TODO: portable "storage" ref
|
||||||
Args = [St#st.storage, St#st.transfer],
|
Args = [St#st.storage, St#st.transfer],
|
||||||
% TODO
|
% TODO
|
||||||
|
@ -109,13 +109,13 @@ handle_event({list_remote_fragments, Nodes}, internal, _, St) ->
|
||||||
{incomplete, _} = Status ->
|
{incomplete, _} = Status ->
|
||||||
{stop, {error, Status}}
|
{stop, {error, Status}}
|
||||||
end;
|
end;
|
||||||
handle_event(start_assembling, internal, _, St = #st{assembly = Asm}) ->
|
handle_event(internal, _, start_assembling, St = #st{assembly = Asm}) ->
|
||||||
Filemeta = emqx_ft_assembly:filemeta(Asm),
|
Filemeta = emqx_ft_assembly:filemeta(Asm),
|
||||||
Coverage = emqx_ft_assembly:coverage(Asm),
|
Coverage = emqx_ft_assembly:coverage(Asm),
|
||||||
% TODO: errors
|
% TODO: errors
|
||||||
{ok, Handle} = emqx_ft_storage_fs:open_file(St#st.storage, St#st.transfer, Filemeta),
|
{ok, Handle} = emqx_ft_storage_fs:open_file(St#st.storage, St#st.transfer, Filemeta),
|
||||||
{next_state, {assemble, Coverage}, St#st{file = Handle}, ?internal([])};
|
{next_state, {assemble, Coverage}, St#st{file = Handle}, ?internal([])};
|
||||||
handle_event({assemble, [{Node, Segment} | Rest]}, internal, _, St = #st{}) ->
|
handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
|
||||||
% TODO
|
% TODO
|
||||||
% Currently, race is possible between getting segment info from the remote node and
|
% Currently, race is possible between getting segment info from the remote node and
|
||||||
% this node garbage collecting the segment itself.
|
% this node garbage collecting the segment itself.
|
||||||
|
@ -128,9 +128,9 @@ handle_event({assemble, [{Node, Segment} | Rest]}, internal, _, St = #st{}) ->
|
||||||
% {error, _} ->
|
% {error, _} ->
|
||||||
% ...
|
% ...
|
||||||
end;
|
end;
|
||||||
handle_event({assemble, []}, internal, _, St = #st{}) ->
|
handle_event(internal, _, {assemble, []}, St = #st{}) ->
|
||||||
{next_state, complete, St, ?internal([])};
|
{next_state, complete, St, ?internal([])};
|
||||||
handle_event(complete, internal, _, St = #st{assembly = Asm, file = Handle, callback = Callback}) ->
|
handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle, callback = Callback}) ->
|
||||||
Filemeta = emqx_ft_assembly:filemeta(Asm),
|
Filemeta = emqx_ft_assembly:filemeta(Asm),
|
||||||
Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle),
|
Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle),
|
||||||
%% TODO: safe apply
|
%% TODO: safe apply
|
||||||
|
@ -160,5 +160,5 @@ handle_event(complete, internal, _, St = #st{assembly = Asm, file = Handle, call
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
segsize(#{fragment := {segmentinfo, Info}}) ->
|
segsize(#{fragment := {segment, Info}}) ->
|
||||||
maps:get(size, Info).
|
maps:get(size, Info).
|
||||||
|
|
Loading…
Reference in New Issue