feat(ft-s3): extract checksum verification

This commit is contained in:
Ilya Averyanov 2023-03-30 23:09:13 +03:00
parent 818a5cacf2
commit 8361223648
3 changed files with 84 additions and 61 deletions

View File

@ -45,7 +45,8 @@
bytes/0, bytes/0,
offset/0, offset/0,
filemeta/0, filemeta/0,
segment/0 segment/0,
checksum/0
]). ]).
%% Number of bytes %% Number of bytes
@ -57,6 +58,7 @@
-type fileid() :: binary(). -type fileid() :: binary().
-type transfer() :: {clientid(), fileid()}. -type transfer() :: {clientid(), fileid()}.
-type offset() :: bytes(). -type offset() :: bytes().
-type checksum() :: {_Algo :: atom(), _Digest :: binary()}.
-type filemeta() :: #{ -type filemeta() :: #{
%% Display name %% Display name
@ -68,7 +70,7 @@
%% currently do not consider that an error (or, specifically, a signal that %% currently do not consider that an error (or, specifically, a signal that
%% the resulting file is corrupted during transmission). %% the resulting file is corrupted during transmission).
size => _Bytes :: non_neg_integer(), size => _Bytes :: non_neg_integer(),
checksum => {sha256, <<_:256>>}, checksum => checksum(),
expire_at := emqx_datetime:epoch_second(), expire_at := emqx_datetime:epoch_second(),
%% TTL of individual segments %% TTL of individual segments
%% Somewhat confusing that we won't know it on the nodes where the filemeta %% Somewhat confusing that we won't know it on the nodes where the filemeta

View File

@ -39,10 +39,21 @@
-type storage() :: emxt_ft_storage_fs:storage(). -type storage() :: emxt_ft_storage_fs:storage().
-type transfer() :: emqx_ft:transfer(). -type transfer() :: emqx_ft:transfer().
-type filemeta() :: emqx_ft:filemeta(). -type filemeta() :: emqx_ft:filemeta().
-type checksum() :: emqx_ft:checksum().
-type exporter_conf() :: map(). -type exporter_conf() :: map().
-type export_st() :: term(). -type export_st() :: term().
-opaque export() :: {module(), export_st()}. -type hash_state() :: term().
-opaque export() :: #{
mod := module(),
st := export_st(),
hash := hash_state(),
filemeta := filemeta()
}.
%%------------------------------------------------------------------------------
%% Behaviour
%%------------------------------------------------------------------------------
-callback start_export(exporter_conf(), transfer(), filemeta()) -> -callback start_export(exporter_conf(), transfer(), filemeta()) ->
{ok, export_st()} | {error, _Reason}. {ok, export_st()} | {error, _Reason}.
@ -51,7 +62,7 @@
-callback write(ExportSt :: export_st(), iodata()) -> -callback write(ExportSt :: export_st(), iodata()) ->
{ok, ExportSt :: export_st()} | {error, _Reason}. {ok, ExportSt :: export_st()} | {error, _Reason}.
-callback complete(ExportSt :: export_st()) -> -callback complete(_ExportSt :: export_st(), _Checksum :: checksum()) ->
ok | {error, _Reason}. ok | {error, _Reason}.
-callback discard(ExportSt :: export_st()) -> -callback discard(ExportSt :: export_st()) ->
@ -60,7 +71,9 @@
-callback list(storage()) -> -callback list(storage()) ->
{ok, [emqx_ft_storage:file_info()]} | {error, _Reason}. {ok, [emqx_ft_storage:file_info()]} | {error, _Reason}.
%% %%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
-spec start_export(storage(), transfer(), filemeta()) -> -spec start_export(storage(), transfer(), filemeta()) ->
{ok, export()} | {error, _Reason}. {ok, export()} | {error, _Reason}.
@ -68,29 +81,43 @@ start_export(Storage, Transfer, Filemeta) ->
{ExporterMod, ExporterConf} = exporter(Storage), {ExporterMod, ExporterConf} = exporter(Storage),
case ExporterMod:start_export(ExporterConf, Transfer, Filemeta) of case ExporterMod:start_export(ExporterConf, Transfer, Filemeta) of
{ok, ExportSt} -> {ok, ExportSt} ->
{ok, {ExporterMod, ExportSt}}; {ok, #{
mod => ExporterMod,
st => ExportSt,
hash => init_checksum(Filemeta),
filemeta => Filemeta
}};
{error, _} = Error -> {error, _} = Error ->
Error Error
end. end.
-spec write(export(), iodata()) -> -spec write(export(), iodata()) ->
{ok, export()} | {error, _Reason}. {ok, export()} | {error, _Reason}.
write({ExporterMod, ExportSt}, Content) -> write(#{mod := ExporterMod, st := ExportSt, hash := Hash} = Export, Content) ->
case ExporterMod:write(ExportSt, Content) of case ExporterMod:write(ExportSt, Content) of
{ok, ExportStNext} -> {ok, ExportStNext} ->
{ok, {ExporterMod, ExportStNext}}; {ok, Export#{
st := ExportStNext,
hash := update_checksum(Hash, Content)
}};
{error, _} = Error -> {error, _} = Error ->
Error Error
end. end.
-spec complete(export()) -> -spec complete(export()) ->
ok | {error, _Reason}. ok | {error, _Reason}.
complete({ExporterMod, ExportSt}) -> complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemeta}) ->
ExporterMod:complete(ExportSt). case verify_checksum(Hash, Filemeta) of
{ok, Checksum} ->
ExporterMod:complete(ExportSt, Checksum);
{error, _} = Error ->
_ = ExporterMod:discard(ExportSt),
Error
end.
-spec discard(export()) -> -spec discard(export()) ->
ok | {error, _Reason}. ok | {error, _Reason}.
discard({ExporterMod, ExportSt}) -> discard(#{mod := ExporterMod, st := ExportSt}) ->
ExporterMod:discard(ExportSt). ExporterMod:discard(ExportSt).
-spec list(storage()) -> -spec list(storage()) ->
@ -99,7 +126,10 @@ list(Storage) ->
{ExporterMod, ExporterOpts} = exporter(Storage), {ExporterMod, ExporterOpts} = exporter(Storage),
ExporterMod:list(ExporterOpts). ExporterMod:list(ExporterOpts).
-spec exporter(storage()) -> {module(), _ExporterOptions}. %%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
exporter(Storage) -> exporter(Storage) ->
case maps:get(exporter, Storage) of case maps:get(exporter, Storage) of
#{type := local} = Options -> #{type := local} = Options ->
@ -111,3 +141,22 @@ exporter(Storage) ->
-spec without_type(exporter_conf()) -> exporter_conf(). -spec without_type(exporter_conf()) -> exporter_conf().
without_type(#{type := _} = Options) -> without_type(#{type := _} = Options) ->
maps:without([type], Options). maps:without([type], Options).
%% Start an incremental hash for an export.
%% Uses the algorithm named in the filemeta `checksum` entry when one is
%% present as a `{Algo, Digest}` pair, and falls back to `sha256` otherwise.
init_checksum(#{} = Filemeta) ->
    Algo =
        case Filemeta of
            #{checksum := {Declared, _Digest}} -> Declared;
            #{} -> sha256
        end,
    crypto:hash_init(Algo).
%% Feed one more chunk of written content into the running hash and return
%% the updated hash state.
update_checksum(HashState, Chunk) ->
    HashStateNext = crypto:hash_update(HashState, Chunk),
    HashStateNext.
%% Finalize the running hash and compare it with the checksum declared in
%% the filemeta, if any.
%%   * Declared checksum matches: `{ok, DeclaredChecksum}`.
%%   * Declared checksum differs: `{error, {checksum, Algo, HexOfComputed}}`.
%%   * No checksum declared: `{ok, {sha256, ComputedDigest}}`.
verify_checksum(HashState, #{checksum := {Algo, Expected} = Declared}) ->
    Computed = crypto:hash_final(HashState),
    case Computed =:= Expected of
        true ->
            {ok, Declared};
        false ->
            {error, {checksum, Algo, binary:encode_hex(Computed)}}
    end;
verify_checksum(HashState, #{}) ->
    {ok, {sha256, crypto:hash_final(HashState)}}.

View File

@ -24,7 +24,7 @@
-export([start_export/3]). -export([start_export/3]).
-export([write/2]). -export([write/2]).
-export([complete/1]). -export([complete/2]).
-export([discard/1]). -export([discard/1]).
-export([list/1]). -export([list/1]).
@ -50,12 +50,11 @@
-type file_error() :: emqx_ft_storage_fs:file_error(). -type file_error() :: emqx_ft_storage_fs:file_error().
-type export() :: #{ -type export_st() :: #{
path := file:name(), path := file:name(),
handle := io:device(), handle := io:device(),
result := file:name(), result := file:name(),
meta := filemeta(), meta := filemeta()
hash := crypto:hash_state()
}. }.
-type reader() :: pid(). -type reader() :: pid().
@ -92,7 +91,7 @@
%% %%
-spec start_export(options(), transfer(), filemeta()) -> -spec start_export(options(), transfer(), filemeta()) ->
{ok, export()} | {error, file_error()}. {ok, export_st()} | {error, file_error()}.
start_export(Options, Transfer, Filemeta = #{name := Filename}) -> start_export(Options, Transfer, Filemeta = #{name := Filename}) ->
TempFilepath = mk_temp_absfilepath(Options, Transfer, Filename), TempFilepath = mk_temp_absfilepath(Options, Transfer, Filename),
ResultFilepath = mk_absfilepath(Options, Transfer, result, Filename), ResultFilepath = mk_absfilepath(Options, Transfer, result, Filename),
@ -103,47 +102,41 @@ start_export(Options, Transfer, Filemeta = #{name := Filename}) ->
path => TempFilepath, path => TempFilepath,
handle => Handle, handle => Handle,
result => ResultFilepath, result => ResultFilepath,
meta => Filemeta, meta => Filemeta
hash => init_checksum(Filemeta)
}}; }};
{error, _} = Error -> {error, _} = Error ->
Error Error
end. end.
-spec write(export(), iodata()) -> -spec write(export_st(), iodata()) ->
{ok, export()} | {error, file_error()}. {ok, export_st()} | {error, file_error()}.
write(Export = #{handle := Handle, hash := Ctx}, IoData) -> write(ExportSt = #{handle := Handle}, IoData) ->
case file:write(Handle, IoData) of case file:write(Handle, IoData) of
ok -> ok ->
{ok, Export#{hash := update_checksum(Ctx, IoData)}}; {ok, ExportSt};
{error, _} = Error -> {error, _} = Error ->
_ = discard(Export), _ = discard(ExportSt),
Error Error
end. end.
-spec complete(export()) -> -spec complete(export_st(), emqx_ft:checksum()) ->
ok | {error, {checksum, _Algo, _Computed}} | {error, file_error()}. ok | {error, {checksum, _Algo, _Computed}} | {error, file_error()}.
complete( complete(
Export = #{ #{
path := Filepath, path := Filepath,
handle := Handle, handle := Handle,
result := ResultFilepath, result := ResultFilepath,
meta := FilemetaIn, meta := FilemetaIn
hash := Ctx },
} Checksum
) -> ) ->
case verify_checksum(Ctx, FilemetaIn) of Filemeta = FilemetaIn#{checksum => Checksum},
{ok, Filemeta} -> ok = file:close(Handle),
ok = file:close(Handle), _ = filelib:ensure_dir(ResultFilepath),
_ = filelib:ensure_dir(ResultFilepath), _ = file:write_file(mk_manifest_filename(ResultFilepath), encode_filemeta(Filemeta)),
_ = file:write_file(mk_manifest_filename(ResultFilepath), encode_filemeta(Filemeta)), file:rename(Filepath, ResultFilepath).
file:rename(Filepath, ResultFilepath);
{error, _} = Error ->
_ = discard(Export),
Error
end.
-spec discard(export()) -> -spec discard(export_st()) ->
ok. ok.
discard(#{path := Filepath, handle := Handle}) -> discard(#{path := Filepath, handle := Handle}) ->
ok = file:close(Handle), ok = file:close(Handle),
@ -297,27 +290,6 @@ list(_Options) ->
%% %%
init_checksum(#{checksum := {Algo, _}}) ->
crypto:hash_init(Algo);
init_checksum(#{}) ->
crypto:hash_init(sha256).
update_checksum(Ctx, IoData) ->
crypto:hash_update(Ctx, IoData).
verify_checksum(Ctx, Filemeta = #{checksum := {Algo, Digest}}) ->
case crypto:hash_final(Ctx) of
Digest ->
{ok, Filemeta};
Mismatch ->
{error, {checksum, Algo, binary:encode_hex(Mismatch)}}
end;
verify_checksum(Ctx, Filemeta = #{}) ->
Digest = crypto:hash_final(Ctx),
{ok, Filemeta#{checksum => {sha256, Digest}}}.
%%
-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]). -define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
encode_filemeta(Meta) -> encode_filemeta(Meta) ->