fix(ft): respect checksum in `fin` packets

This commit is contained in:
Andrew Mayorov 2023-05-31 18:00:43 +03:00
parent 036f180c27
commit 0293b54211
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
10 changed files with 73 additions and 45 deletions

View File

@ -45,8 +45,8 @@ define(Term, _) ->
Term.
%% @doc Apply a function to a maybe argument.
-spec apply(fun((A) -> maybe(A)), maybe(A)) ->
maybe(A).
-spec apply(fun((A) -> B), maybe(A)) ->
maybe(B).
apply(_Fun, undefined) ->
undefined;
apply(Fun, Term) when is_function(Fun) ->

View File

@ -45,7 +45,8 @@
offset/0,
filemeta/0,
segment/0,
checksum/0
checksum/0,
finopts/0
]).
%% Number of bytes
@ -80,6 +81,10 @@
-type segment() :: {offset(), _Content :: binary()}.
-type finopts() :: #{
checksum => checksum()
}.
%%--------------------------------------------------------------------
%% API for app
%%--------------------------------------------------------------------
@ -170,8 +175,8 @@ on_file_command(PacketId, FileId, Msg, FileCommand) ->
ChecksumBin = emqx_maybe:from_list(MaybeChecksum),
validate(
[{size, FinalSizeBin}, {{maybe, checksum}, ChecksumBin}],
fun([FinalSize, Checksum]) ->
on_fin(PacketId, Msg, Transfer, FinalSize, Checksum)
fun([FinalSize, FinalChecksum]) ->
on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum)
end
);
[<<"abort">>] ->
@ -251,13 +256,13 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
end
end).
on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) ->
on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) ->
?tp(info, "file_transfer_fin", #{
mqtt_msg => Msg,
packet_id => PacketId,
transfer => Transfer,
final_size => FinalSize,
checksum => Checksum
checksum => FinalChecksum
}),
%% TODO: handle checksum? Do we need it?
FinPacketKey = {self(), PacketId},
@ -265,7 +270,7 @@ on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) ->
?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result)
end,
with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() ->
case assemble(Transfer, FinalSize) of
case assemble(Transfer, FinalSize, FinalChecksum) of
%% Assembling completed, ack through the responder right away
ok ->
emqx_ft_responder:ack(FinPacketKey, ok);
@ -314,9 +319,10 @@ store_segment(Transfer, Segment) ->
{error, {internal_error, E}}
end.
assemble(Transfer, FinalSize) ->
assemble(Transfer, FinalSize, FinalChecksum) ->
try
emqx_ft_storage:assemble(Transfer, FinalSize)
FinOpts = [{checksum, FinalChecksum} || FinalChecksum /= undefined],
emqx_ft_storage:assemble(Transfer, FinalSize, maps:from_list(FinOpts))
catch
C:E:S ->
?tp(error, "start_assemble_failed", #{
@ -397,8 +403,8 @@ do_validate([{checksum, Checksum} | Rest], Parsed) ->
{error, _Reason} ->
{error, {invalid_checksum, Checksum}}
end;
do_validate([{integrity, Payload, Checksum} | Rest], Parsed) ->
case crypto:hash(sha256, Payload) of
do_validate([{integrity, Payload, {Algo, Checksum}} | Rest], Parsed) ->
case crypto:hash(Algo, Payload) of
Checksum ->
do_validate(Rest, [Payload | Parsed]);
Mismatch ->
@ -411,7 +417,7 @@ do_validate([{{maybe, T}, Value} | Rest], Parsed) ->
parse_checksum(Checksum) when is_binary(Checksum) andalso byte_size(Checksum) =:= 64 ->
try
{ok, binary:decode_hex(Checksum)}
{ok, {sha256, binary:decode_hex(Checksum)}}
catch
error:badarg ->
{error, invalid_checksum}

View File

@ -16,7 +16,7 @@
-module(emqx_ft_assembler).
-export([start_link/3]).
-export([start_link/4]).
-behaviour(gen_statem).
-export([callback_mode/0]).
@ -29,6 +29,7 @@
-type stdata() :: #{
storage := emqx_ft_storage_fs:storage(),
transfer := emqx_ft:transfer(),
finopts := emqx_ft:finopts(),
assembly := emqx_ft_assembly:t(),
export => emqx_ft_storage_exporter:export()
}.
@ -38,8 +39,8 @@
%%
start_link(Storage, Transfer, Size) ->
gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size}, []).
start_link(Storage, Transfer, Size, Opts) ->
gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size, Opts}, []).
where(Transfer) ->
gproc:where(?NAME(Transfer)).
@ -60,11 +61,12 @@ callback_mode() ->
handle_event_function.
-spec init(_Args) -> {ok, state(), stdata()}.
init({Storage, Transfer, Size}) ->
init({Storage, Transfer, Size, Opts}) ->
_ = erlang:process_flag(trap_exit, true),
St = #{
storage => Storage,
transfer => Transfer,
finopts => Opts,
assembly => emqx_ft_assembly:new(Size)
},
{ok, idle, St}.
@ -164,8 +166,8 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export :=
end;
handle_event(internal, _, {assemble, []}, St = #{}) ->
{next_state, complete, St, ?internal([])};
handle_event(internal, _, complete, St = #{export := Export}) ->
Result = emqx_ft_storage_exporter:complete(Export),
handle_event(internal, _, complete, St = #{export := Export, finopts := Opts}) ->
Result = emqx_ft_storage_exporter:complete(Export, Opts),
_ = maybe_garbage_collect(Result, St),
{stop, {shutdown, Result}, maps:remove(export, St)}.

View File

@ -17,7 +17,7 @@
-module(emqx_ft_assembler_sup).
-export([start_link/0]).
-export([ensure_child/3]).
-export([ensure_child/4]).
-behaviour(supervisor).
-export([init/1]).
@ -25,10 +25,10 @@
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
ensure_child(Storage, Transfer, Size) ->
ensure_child(Storage, Transfer, Size, Opts) ->
Childspec = #{
id => Transfer,
start => {emqx_ft_assembler, start_link, [Storage, Transfer, Size]},
start => {emqx_ft_assembler, start_link, [Storage, Transfer, Size, Opts]},
restart => temporary
},
case supervisor:start_child(?MODULE, Childspec) of

View File

@ -20,7 +20,7 @@
[
store_filemeta/2,
store_segment/2,
assemble/2,
assemble/3,
files/0,
files/1,
@ -88,7 +88,7 @@
ok | {async, pid()} | {error, term()}.
-callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) ->
ok | {async, pid()} | {error, term()}.
-callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes()) ->
-callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes(), emqx_ft:finopts()) ->
ok | {async, pid()} | {error, term()}.
-callback files(storage(), query(Cursor)) ->
@ -114,10 +114,10 @@ store_filemeta(Transfer, FileMeta) ->
store_segment(Transfer, Segment) ->
dispatch(store_segment, [Transfer, Segment]).
-spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) ->
-spec assemble(emqx_ft:transfer(), emqx_ft:bytes(), emqx_ft:finopts()) ->
ok | {async, pid()} | {error, term()}.
assemble(Transfer, Size) ->
dispatch(assemble, [Transfer, Size]).
assemble(Transfer, Size, FinOpts) ->
dispatch(assemble, [Transfer, Size, FinOpts]).
-spec files() ->
{ok, page(file_info(), _)} | {error, term()}.

View File

@ -24,7 +24,7 @@
%% Export API
-export([start_export/3]).
-export([write/2]).
-export([complete/1]).
-export([complete/2]).
-export([discard/1]).
%% Listing API
@ -117,12 +117,19 @@ write(#{mod := ExporterMod, st := ExportSt, hash := Hash} = Export, Content) ->
Error
end.
-spec complete(export()) ->
-spec complete(export(), emqx_ft:finopts()) ->
ok | {error, _Reason}.
complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemeta}) ->
case verify_checksum(Hash, Filemeta) of
{ok, Checksum} ->
ExporterMod:complete(ExportSt, Checksum);
complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemeta}, Opts) ->
Checksum = emqx_maybe:define(
% NOTE
% Checksum in `Opts` takes precedence over one in `Filemeta` according to the spec.
% We do not care if they differ.
maps:get(checksum, Opts, undefined),
maps:get(checksum, Filemeta, undefined)
),
case verify_checksum(Hash, Checksum) of
{ok, ExportChecksum} ->
ExporterMod:complete(ExportSt, ExportChecksum);
{error, _} = Error ->
_ = ExporterMod:discard(ExportSt),
Error
@ -183,13 +190,13 @@ init_checksum(#{}) ->
update_checksum(Ctx, IoData) ->
crypto:hash_update(Ctx, IoData).
verify_checksum(Ctx, #{checksum := {Algo, Digest} = Checksum}) ->
verify_checksum(Ctx, {Algo, Digest} = Checksum) ->
case crypto:hash_final(Ctx) of
Digest ->
{ok, Checksum};
Mismatch ->
{error, {checksum, Algo, binary:encode_hex(Mismatch)}}
end;
verify_checksum(Ctx, #{}) ->
verify_checksum(Ctx, undefined) ->
Digest = crypto:hash_final(Ctx),
{ok, {sha256, Digest}}.

View File

@ -36,7 +36,7 @@
-export([list/3]).
-export([pread/5]).
-export([lookup_local_assembler/1]).
-export([assemble/3]).
-export([assemble/4]).
-export([transfers/1]).
@ -211,14 +211,14 @@ pread(_Storage, _Transfer, Frag, Offset, Size) ->
{error, Reason}
end.
-spec assemble(storage(), transfer(), emqx_ft:bytes()) ->
-spec assemble(storage(), transfer(), emqx_ft:bytes(), emqx_ft:finopts()) ->
{async, _Assembler :: pid()} | ok | {error, _TODO}.
assemble(Storage, Transfer, Size) ->
assemble(Storage, Transfer, Size, Opts) ->
LookupSources = [
fun() -> lookup_local_assembler(Transfer) end,
fun() -> lookup_remote_assembler(Transfer) end,
fun() -> check_if_already_exported(Storage, Transfer) end,
fun() -> ensure_local_assembler(Storage, Transfer, Size) end
fun() -> ensure_local_assembler(Storage, Transfer, Size, Opts) end
],
lookup_assembler(LookupSources).
@ -295,8 +295,8 @@ lookup_remote_assembler(Transfer) ->
_ -> {error, not_found}
end.
ensure_local_assembler(Storage, Transfer, Size) ->
{ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size),
ensure_local_assembler(Storage, Transfer, Size, Opts) ->
{ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size, Opts),
{async, Pid}.
-spec transfers(storage()) ->

View File

@ -159,6 +159,10 @@ t_invalid_topic_format(Config) ->
unspecified_error,
emqtt:publish(C, <<"$file/fileid/fin/offset">>, <<>>, 1)
),
?assertRCName(
unspecified_error,
emqtt:publish(C, <<"$file/fileid/fin/42/xyz">>, <<>>, 1)
),
?assertRCName(
unspecified_error,
emqtt:publish(C, <<"$file/">>, <<>>, 1)
@ -390,9 +394,18 @@ t_invalid_checksum(Config) ->
with_offsets(Data)
),
% Send `fin` w/o checksum, should fail since filemeta checksum is invalid
FinTopic = mk_fin_topic(FileId, Filesize),
?assertRCName(
unspecified_error,
emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1)
emqtt:publish(C, FinTopic, <<>>, 1)
),
% Send `fin` with the correct checksum
Checksum = binary:encode_hex(sha256(Data)),
?assertRCName(
success,
emqtt:publish(C, <<FinTopic/binary, "/", Checksum/binary>>, <<>>, 1)
).
t_corrupted_segment_retry(Config) ->
@ -507,7 +520,7 @@ t_assemble_crash(Config) ->
C = ?config(client, Config),
meck:new(emqx_ft_storage_fs),
meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _) -> meck:exception(error, oops) end),
meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _, _) -> meck:exception(error, oops) end),
?assertRCName(
unspecified_error,

View File

@ -178,7 +178,7 @@ complete_assemble(Storage, Transfer, Size) ->
complete_assemble(Storage, Transfer, Size, 1000).
complete_assemble(Storage, Transfer, Size, Timeout) ->
{async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size),
{async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size, #{}),
MRef = erlang:monitor(process, Pid),
Pid ! kickoff,
receive

View File

@ -381,7 +381,7 @@ complete_transfer(Storage, Transfer, Size) ->
complete_transfer(Storage, Transfer, Size, 100).
complete_transfer(Storage, Transfer, Size, Timeout) ->
case emqx_ft_storage_fs:assemble(Storage, Transfer, Size) of
case emqx_ft_storage_fs:assemble(Storage, Transfer, Size, #{}) of
ok ->
ok;
{async, Pid} ->