From 1fedae8a1633b54f30ca9fdc16a1a5732870e238 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 2 Feb 2023 21:29:35 +0300 Subject: [PATCH] fix(ft-asm): ensure module follows statem behaviour --- apps/emqx_ft/src/emqx_ft_assembler.erl | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index dfbb2bd3e..4fd8d6e75 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -45,7 +45,7 @@ %% start_link(Storage, Transfer, Callback) -> - gen_server:start_link(?MODULE, {Storage, Transfer, Callback}, []). + gen_statem:start_link(?MODULE, {Storage, Transfer, Callback}, []). %% @@ -64,7 +64,7 @@ init({Storage, Transfer, Callback}) -> }, {ok, list_local_fragments, St, ?internal([])}. -handle_event(list_local_fragments, internal, _, St = #st{assembly = Asm}) -> +handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) -> % TODO: what we do with non-transients errors here (e.g. `eacces`)? {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer), NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)), @@ -79,7 +79,7 @@ handle_event(list_local_fragments, internal, _, St = #st{assembly = Asm}) -> % {error, _} = Reason -> % {stop, Reason} end; -handle_event({list_remote_fragments, Nodes}, internal, _, St) -> +handle_event(internal, _, {list_remote_fragments, Nodes}, St) -> % TODO: portable "storage" ref Args = [St#st.storage, St#st.transfer], % TODO @@ -109,13 +109,13 @@ handle_event({list_remote_fragments, Nodes}, internal, _, St) -> {incomplete, _} = Status -> {stop, {error, Status}} end; -handle_event(start_assembling, internal, _, St = #st{assembly = Asm}) -> +handle_event(internal, _, start_assembling, St = #st{assembly = Asm}) -> Filemeta = emqx_ft_assembly:filemeta(Asm), Coverage = emqx_ft_assembly:coverage(Asm), % TODO: errors {ok, Handle} = emqx_ft_storage_fs:open_file(St#st.storage, St#st.transfer, Filemeta), {next_state, {assemble, Coverage}, St#st{file = Handle}, ?internal([])}; -handle_event({assemble, [{Node, Segment} | Rest]}, internal, _, St = #st{}) -> +handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) -> % TODO % Currently, race is possible between getting segment info from the remote node and % this node garbage collecting the segment itself. @@ -128,9 +128,9 @@ handle_event({assemble, [{Node, Segment} | Rest]}, internal, _, St = #st{}) -> % {error, _} -> % ... end; -handle_event({assemble, []}, internal, _, St = #st{}) -> +handle_event(internal, _, {assemble, []}, St = #st{}) -> {next_state, complete, St, ?internal([])}; -handle_event(complete, internal, _, St = #st{assembly = Asm, file = Handle, callback = Callback}) -> +handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle, callback = Callback}) -> Filemeta = emqx_ft_assembly:filemeta(Asm), Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle), %% TODO: safe apply @@ -160,5 +160,5 @@ handle_event(complete, internal, _, St = #st{assembly = Asm, file = Handle, call %% -segsize(#{fragment := {segmentinfo, Info}}) -> +segsize(#{fragment := {segment, Info}}) -> maps:get(size, Info).