diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index e3f6cbe1c..064b6e066 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -45,7 +45,8 @@ bytes/0, offset/0, filemeta/0, - segment/0 + segment/0, + checksum/0 ]). %% Number of bytes @@ -57,6 +58,7 @@ -type fileid() :: binary(). -type transfer() :: {clientid(), fileid()}. -type offset() :: bytes(). +-type checksum() :: {_Algo :: atom(), _Digest :: binary()}. -type filemeta() :: #{ %% Display name @@ -68,7 +70,7 @@ %% currently do not condider that an error (or, specifically, a signal that %% the resulting file is corrupted during transmission). size => _Bytes :: non_neg_integer(), - checksum => {sha256, <<_:256>>}, + checksum => checksum(), expire_at := emqx_datetime:epoch_second(), %% TTL of individual segments %% Somewhat confusing that we won't know it on the nodes where the filemeta diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl index 297762d28..444d686a8 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl @@ -39,10 +39,21 @@ -type storage() :: emxt_ft_storage_fs:storage(). -type transfer() :: emqx_ft:transfer(). -type filemeta() :: emqx_ft:filemeta(). +-type checksum() :: emqx_ft:checksum(). -type exporter_conf() :: map(). -type export_st() :: term(). --opaque export() :: {module(), export_st()}. +-type hash_state() :: term(). +-opaque export() :: #{ + mod := module(), + st := export_st(), + hash := hash_state(), + filemeta := filemeta() +}. + +%%------------------------------------------------------------------------------ +%% Behaviour +%%------------------------------------------------------------------------------ -callback start_export(exporter_conf(), transfer(), filemeta()) -> {ok, export_st()} | {error, _Reason}. @@ -51,7 +62,7 @@ -callback write(ExportSt :: export_st(), iodata()) -> {ok, ExportSt :: export_st()} | {error, _Reason}. --callback complete(ExportSt :: export_st()) -> +-callback complete(_ExportSt :: export_st(), _Checksum :: checksum()) -> ok | {error, _Reason}. -callback discard(ExportSt :: export_st()) -> @@ -60,7 +71,9 @@ -callback list(storage()) -> {ok, [emqx_ft_storage:file_info()]} | {error, _Reason}. -%% +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ -spec start_export(storage(), transfer(), filemeta()) -> {ok, export()} | {error, _Reason}. @@ -68,29 +81,43 @@ start_export(Storage, Transfer, Filemeta) -> {ExporterMod, ExporterConf} = exporter(Storage), case ExporterMod:start_export(ExporterConf, Transfer, Filemeta) of {ok, ExportSt} -> - {ok, {ExporterMod, ExportSt}}; + {ok, #{ + mod => ExporterMod, + st => ExportSt, + hash => init_checksum(Filemeta), + filemeta => Filemeta + }}; {error, _} = Error -> Error end. -spec write(export(), iodata()) -> {ok, export()} | {error, _Reason}. -write({ExporterMod, ExportSt}, Content) -> +write(#{mod := ExporterMod, st := ExportSt, hash := Hash} = Export, Content) -> case ExporterMod:write(ExportSt, Content) of {ok, ExportStNext} -> - {ok, {ExporterMod, ExportStNext}}; + {ok, Export#{ + st := ExportStNext, + hash := update_checksum(Hash, Content) + }}; {error, _} = Error -> Error end. -spec complete(export()) -> ok | {error, _Reason}. -complete({ExporterMod, ExportSt}) -> - ExporterMod:complete(ExportSt). +complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemeta}) -> + case verify_checksum(Hash, Filemeta) of + {ok, Checksum} -> + ExporterMod:complete(ExportSt, Checksum); + {error, _} = Error -> + _ = ExporterMod:discard(ExportSt), + Error + end. -spec discard(export()) -> ok | {error, _Reason}. -discard({ExporterMod, ExportSt}) -> +discard(#{mod := ExporterMod, st := ExportSt}) -> ExporterMod:discard(ExportSt). -spec list(storage()) -> @@ -99,7 +126,10 @@ list(Storage) -> {ExporterMod, ExporterOpts} = exporter(Storage), ExporterMod:list(ExporterOpts). --spec exporter(storage()) -> {module(), _ExporterOptions}. +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + exporter(Storage) -> case maps:get(exporter, Storage) of #{type := local} = Options -> @@ -111,3 +141,22 @@ exporter(Storage) -> -spec without_type(exporter_conf()) -> exporter_conf(). without_type(#{type := _} = Options) -> maps:without([type], Options). + +init_checksum(#{checksum := {Algo, _}}) -> + crypto:hash_init(Algo); +init_checksum(#{}) -> + crypto:hash_init(sha256). + +update_checksum(Ctx, IoData) -> + crypto:hash_update(Ctx, IoData). + +verify_checksum(Ctx, #{checksum := {Algo, Digest} = Checksum}) -> + case crypto:hash_final(Ctx) of + Digest -> + {ok, Checksum}; + Mismatch -> + {error, {checksum, Algo, binary:encode_hex(Mismatch)}} + end; +verify_checksum(Ctx, #{}) -> + Digest = crypto:hash_final(Ctx), + {ok, {sha256, Digest}}. diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl index 64c0e325a..fd1956009 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl @@ -24,7 +24,7 @@ -export([start_export/3]). -export([write/2]). --export([complete/1]). +-export([complete/2]). -export([discard/1]). -export([list/1]). @@ -50,12 +50,11 @@ -type file_error() :: emqx_ft_storage_fs:file_error(). --type export() :: #{ +-type export_st() :: #{ path := file:name(), handle := io:device(), result := file:name(), - meta := filemeta(), - hash := crypto:hash_state() + meta := filemeta() }. -type reader() :: pid(). @@ -92,7 +91,7 @@ %% -spec start_export(options(), transfer(), filemeta()) -> - {ok, export()} | {error, file_error()}. + {ok, export_st()} | {error, file_error()}. start_export(Options, Transfer, Filemeta = #{name := Filename}) -> TempFilepath = mk_temp_absfilepath(Options, Transfer, Filename), ResultFilepath = mk_absfilepath(Options, Transfer, result, Filename), @@ -103,47 +102,41 @@ start_export(Options, Transfer, Filemeta = #{name := Filename}) -> path => TempFilepath, handle => Handle, result => ResultFilepath, - meta => Filemeta, - hash => init_checksum(Filemeta) + meta => Filemeta }}; {error, _} = Error -> Error end. --spec write(export(), iodata()) -> - {ok, export()} | {error, file_error()}. -write(Export = #{handle := Handle, hash := Ctx}, IoData) -> +-spec write(export_st(), iodata()) -> + {ok, export_st()} | {error, file_error()}. +write(ExportSt = #{handle := Handle}, IoData) -> case file:write(Handle, IoData) of ok -> - {ok, Export#{hash := update_checksum(Ctx, IoData)}}; + {ok, ExportSt}; {error, _} = Error -> - _ = discard(Export), + _ = discard(ExportSt), Error end. --spec complete(export()) -> +-spec complete(export_st(), emqx_ft:checksum()) -> ok | {error, {checksum, _Algo, _Computed}} | {error, file_error()}. complete( - Export = #{ + #{ path := Filepath, handle := Handle, result := ResultFilepath, - meta := FilemetaIn, - hash := Ctx - } + meta := FilemetaIn + }, + Checksum ) -> - case verify_checksum(Ctx, FilemetaIn) of - {ok, Filemeta} -> - ok = file:close(Handle), - _ = filelib:ensure_dir(ResultFilepath), - _ = file:write_file(mk_manifest_filename(ResultFilepath), encode_filemeta(Filemeta)), - file:rename(Filepath, ResultFilepath); - {error, _} = Error -> - _ = discard(Export), - Error - end. + Filemeta = FilemetaIn#{checksum => Checksum}, + ok = file:close(Handle), + _ = filelib:ensure_dir(ResultFilepath), + _ = file:write_file(mk_manifest_filename(ResultFilepath), encode_filemeta(Filemeta)), + file:rename(Filepath, ResultFilepath). --spec discard(export()) -> +-spec discard(export_st()) -> ok. discard(#{path := Filepath, handle := Handle}) -> ok = file:close(Handle), @@ -297,27 +290,6 @@ list(_Options) -> %% -init_checksum(#{checksum := {Algo, _}}) -> - crypto:hash_init(Algo); -init_checksum(#{}) -> - crypto:hash_init(sha256). - -update_checksum(Ctx, IoData) -> - crypto:hash_update(Ctx, IoData). - -verify_checksum(Ctx, Filemeta = #{checksum := {Algo, Digest}}) -> - case crypto:hash_final(Ctx) of - Digest -> - {ok, Filemeta}; - Mismatch -> - {error, {checksum, Algo, binary:encode_hex(Mismatch)}} - end; -verify_checksum(Ctx, Filemeta = #{}) -> - Digest = crypto:hash_final(Ctx), - {ok, Filemeta#{checksum => {sha256, Digest}}}. - -%% - -define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]). encode_filemeta(Meta) ->