From c44fe92ef181aa81acc2193efed516552f5a2113 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Fri, 10 Feb 2023 14:57:10 +0200 Subject: [PATCH] feat(ft): add assembler tests --- apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf | 11 +++++ apps/emqx_ft/src/emqx_ft_assembler.erl | 35 +++++++++++---- apps/emqx_ft/src/emqx_ft_assembler_sup.erl | 2 +- apps/emqx_ft/src/emqx_ft_schema.erl | 5 +++ apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl | 43 +++++++++++++++++++ 5 files changed, 86 insertions(+), 10 deletions(-) diff --git a/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf index 481ad8154..7e057fdf8 100644 --- a/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf +++ b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf @@ -11,4 +11,15 @@ emqx_ft_schema { } } + local_storage_root { + desc { + en: "File system path to keep uploaded files and temporary data." + zh: "保存上传文件和临时数据的文件系统路径。" + } + label: { + en: "Local Storage Root" + zh: "本地存储根" + } + } + } diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 623e11714..38faebf03 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -16,6 +16,8 @@ -module(emqx_ft_assembler). +-include_lib("emqx/include/logger.hrl"). + -export([start_link/3]). -behaviour(gen_statem). @@ -73,7 +75,7 @@ handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) -> complete -> {next_state, start_assembling, NSt, ?internal([])}; {incomplete, _} -> - Nodes = ekka:nodelist() -- [node()], + Nodes = mria_mnesia:running_nodes() -- [node()], {next_state, {list_remote_fragments, Nodes}, NSt, ?internal([])} % TODO: recovery? % {error, _} = Reason -> @@ -105,7 +107,7 @@ handle_event(internal, _, {list_remote_fragments, Nodes}, St) -> {next_state, start_assembling, NSt, ?internal([])}; % TODO: retries / recovery? {incomplete, _} = Status -> - {stop, {error, Status}} + {next_state, {failure, {error, Status}}, NSt, ?internal([])} end; handle_event(internal, _, start_assembling, St = #st{assembly = Asm}) -> Filemeta = emqx_ft_assembly:filemeta(Asm), @@ -124,21 +126,23 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) -> {ok, NHandle} -> {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])}; %% TODO: better error handling - {error, Error} -> - error(Error) + {error, _} = Error -> + {next_state, {failure, Error}, St, ?internal([])} end; - {error, Error} -> + {error, _} = Error -> %% TODO: better error handling - error(Error) + {next_state, {failure, Error}, St, ?internal([])} end; handle_event(internal, _, {assemble, []}, St = #st{}) -> {next_state, complete, St, ?internal([])}; handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle, callback = Callback}) -> Filemeta = emqx_ft_assembly:filemeta(Asm), Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle), - %% TODO: safe apply - _ = Callback(Result), - {stop, shutdown}. + _ = safe_apply(Callback, Result), + {stop, shutdown}; +handle_event(internal, _, {failure, Error}, #st{callback = Callback}) -> + _ = safe_apply(Callback, Error), + {stop, Error}. % handle_continue(list_local, St = #st{storage = Storage, transfer = Transfer, assembly = Asm}) -> % % TODO: what we do with non-transients errors here (e.g. `eacces`)? @@ -170,3 +174,16 @@ pread(Node, Segment, St) -> segsize(#{fragment := {segment, Info}}) -> maps:get(size, Info). + +safe_apply(Callback, Result) -> + try apply(Callback, [Result]) of + _ -> ok + catch + Class:Reason:Stacktrace -> + ?SLOG(error, #{ + msg => "safe_apply_failed", + class => Class, + reason => Reason, + stacktrace => Stacktrace + }) + end. diff --git a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl index b60949a5e..17aa4d998 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl @@ -29,7 +29,7 @@ start_child(Storage, Transfer, Callback) -> Childspec = #{ id => {Storage, Transfer}, start => {emqx_ft_assembler, start_link, [Storage, Transfer, Callback]}, - restart => transient + restart => temporary }, supervisor:start_child(?MODULE, Childspec). diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl index 70acb8322..deb2cae6f 100644 --- a/apps/emqx_ft/src/emqx_ft_schema.erl +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -59,6 +59,11 @@ fields(local_storage) -> default => local, required => false, desc => ?DESC("local") + }}, + {root, #{ + type => binary(), + desc => ?DESC("local_storage_root"), + required => false }} ]. diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index 336580584..dd3ffedad 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -28,6 +28,8 @@ all() -> [ t_assemble_empty_transfer, t_assemble_complete_local_transfer, + t_assemble_incomplete_transfer, + t_assemble_no_meta, % NOTE % It depends on the side effects of all previous testcases. @@ -155,6 +157,46 @@ t_assemble_complete_local_transfer(Config) -> AssemblyFilename ). +t_assemble_incomplete_transfer(Config) -> + Storage = storage(Config), + Transfer = {?CLIENTID2, ?config(file_id, Config)}, + Filename = "incomplete.pdf", + TransferSize = 10000 + rand:uniform(50000), + SegmentSize = 4096, + Gen = emqx_ft_content_gen:new({Transfer, TransferSize}, SegmentSize), + Hash = emqx_ft_content_gen:hash(Gen, crypto:hash_init(sha256)), + Meta = #{ + name => Filename, + checksum => {sha256, Hash}, + size => TransferSize, + expire_at => 42 + }, + ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta), + Self = self(), + {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun(Result) -> + Self ! {test_assembly_finished, Result} + end), + receive + {test_assembly_finished, Result} -> + ?assertMatch({error, _}, Result) + after 1000 -> + ct:fail("Assembler did not called callback") + end. + +t_assemble_no_meta(Config) -> + Storage = storage(Config), + Transfer = {?CLIENTID2, ?config(file_id, Config)}, + Self = self(), + {ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun(Result) -> + Self ! {test_assembly_finished, Result} + end), + receive + {test_assembly_finished, Result} -> + ?assertMatch({error, _}, Result) + after 1000 -> + ct:fail("Assembler did not called callback") + end. + mk_assembly_filename(Config, {ClientID, FileID}, Filename) -> filename:join([?config(storage_root, Config), ClientID, FileID, result, Filename]). @@ -205,5 +247,6 @@ mk_fileid() -> storage(Config) -> #{ + type => local, root => ?config(storage_root, Config) }.