diff --git a/apps/emqx/src/emqx_maybe.erl b/apps/emqx/src/emqx_maybe.erl index 5b5d5e94b..af2fd04a7 100644 --- a/apps/emqx/src/emqx_maybe.erl +++ b/apps/emqx/src/emqx_maybe.erl @@ -45,8 +45,8 @@ define(Term, _) -> Term. %% @doc Apply a function to a maybe argument. --spec apply(fun((A) -> maybe(A)), maybe(A)) -> - maybe(A). +-spec apply(fun((A) -> B), maybe(A)) -> + maybe(B). apply(_Fun, undefined) -> undefined; apply(Fun, Term) when is_function(Fun) -> diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index 898203b51..34dfc09a7 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -45,7 +45,8 @@ offset/0, filemeta/0, segment/0, - checksum/0 + checksum/0, + finopts/0 ]). %% Number of bytes @@ -80,6 +81,10 @@ -type segment() :: {offset(), _Content :: binary()}. +-type finopts() :: #{ + checksum => checksum() +}. + %%-------------------------------------------------------------------- %% API for app %%-------------------------------------------------------------------- @@ -170,8 +175,8 @@ on_file_command(PacketId, FileId, Msg, FileCommand) -> ChecksumBin = emqx_maybe:from_list(MaybeChecksum), validate( [{size, FinalSizeBin}, {{maybe, checksum}, ChecksumBin}], - fun([FinalSize, Checksum]) -> - on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) + fun([FinalSize, FinalChecksum]) -> + on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) end ); [<<"abort">>] -> @@ -251,13 +256,13 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) -> end end). -on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) -> +on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) -> ?tp(info, "file_transfer_fin", #{ mqtt_msg => Msg, packet_id => PacketId, transfer => Transfer, final_size => FinalSize, - checksum => Checksum + checksum => FinalChecksum }), %% TODO: handle checksum? Do we need it? FinPacketKey = {self(), PacketId}, @@ -265,7 +270,7 @@ on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) -> ?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result) end, with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() -> - case assemble(Transfer, FinalSize) of + case assemble(Transfer, FinalSize, FinalChecksum) of %% Assembling completed, ack through the responder right away ok -> emqx_ft_responder:ack(FinPacketKey, ok); @@ -314,9 +319,10 @@ store_segment(Transfer, Segment) -> {error, {internal_error, E}} end. -assemble(Transfer, FinalSize) -> +assemble(Transfer, FinalSize, FinalChecksum) -> try - emqx_ft_storage:assemble(Transfer, FinalSize) + FinOpts = [{checksum, FinalChecksum} || FinalChecksum /= undefined], + emqx_ft_storage:assemble(Transfer, FinalSize, maps:from_list(FinOpts)) catch C:E:S -> ?tp(error, "start_assemble_failed", #{ @@ -397,8 +403,8 @@ do_validate([{checksum, Checksum} | Rest], Parsed) -> {error, _Reason} -> {error, {invalid_checksum, Checksum}} end; -do_validate([{integrity, Payload, Checksum} | Rest], Parsed) -> - case crypto:hash(sha256, Payload) of +do_validate([{integrity, Payload, {Algo, Checksum}} | Rest], Parsed) -> + case crypto:hash(Algo, Payload) of Checksum -> do_validate(Rest, [Payload | Parsed]); Mismatch -> @@ -411,7 +417,7 @@ do_validate([{{maybe, T}, Value} | Rest], Parsed) -> parse_checksum(Checksum) when is_binary(Checksum) andalso byte_size(Checksum) =:= 64 -> try - {ok, binary:decode_hex(Checksum)} + {ok, {sha256, binary:decode_hex(Checksum)}} catch error:badarg -> {error, invalid_checksum} diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 873efc6ff..c96df224c 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -16,7 +16,7 @@ -module(emqx_ft_assembler). --export([start_link/3]). +-export([start_link/4]). -behaviour(gen_statem). -export([callback_mode/0]). @@ -29,6 +29,7 @@ -type stdata() :: #{ storage := emqx_ft_storage_fs:storage(), transfer := emqx_ft:transfer(), + finopts := emqx_ft:finopts(), assembly := emqx_ft_assembly:t(), export => emqx_ft_storage_exporter:export() }. @@ -38,8 +39,8 @@ %% -start_link(Storage, Transfer, Size) -> - gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size}, []). +start_link(Storage, Transfer, Size, Opts) -> + gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size, Opts}, []). where(Transfer) -> gproc:where(?NAME(Transfer)). @@ -60,11 +61,12 @@ callback_mode() -> handle_event_function. -spec init(_Args) -> {ok, state(), stdata()}. -init({Storage, Transfer, Size}) -> +init({Storage, Transfer, Size, Opts}) -> _ = erlang:process_flag(trap_exit, true), St = #{ storage => Storage, transfer => Transfer, + finopts => Opts, assembly => emqx_ft_assembly:new(Size) }, {ok, idle, St}. @@ -164,8 +166,8 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export := end; handle_event(internal, _, {assemble, []}, St = #{}) -> {next_state, complete, St, ?internal([])}; -handle_event(internal, _, complete, St = #{export := Export}) -> - Result = emqx_ft_storage_exporter:complete(Export), +handle_event(internal, _, complete, St = #{export := Export, finopts := Opts}) -> + Result = emqx_ft_storage_exporter:complete(Export, Opts), _ = maybe_garbage_collect(Result, St), {stop, {shutdown, Result}, maps:remove(export, St)}. diff --git a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl index 4ba65c290..e6e689a8c 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler_sup.erl @@ -17,7 +17,7 @@ -module(emqx_ft_assembler_sup). -export([start_link/0]). --export([ensure_child/3]). +-export([ensure_child/4]). -behaviour(supervisor). -export([init/1]). @@ -25,10 +25,10 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -ensure_child(Storage, Transfer, Size) -> +ensure_child(Storage, Transfer, Size, Opts) -> Childspec = #{ id => Transfer, - start => {emqx_ft_assembler, start_link, [Storage, Transfer, Size]}, + start => {emqx_ft_assembler, start_link, [Storage, Transfer, Size, Opts]}, restart => temporary }, case supervisor:start_child(?MODULE, Childspec) of diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 4e1060d88..007b47db9 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -20,7 +20,7 @@ [ store_filemeta/2, store_segment/2, - assemble/2, + assemble/3, files/0, files/1, @@ -88,7 +88,7 @@ ok | {async, pid()} | {error, term()}. -callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) -> ok | {async, pid()} | {error, term()}. --callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes()) -> +-callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes(), emqx_ft:finopts()) -> ok | {async, pid()} | {error, term()}. -callback files(storage(), query(Cursor)) -> @@ -114,10 +114,10 @@ store_filemeta(Transfer, FileMeta) -> store_segment(Transfer, Segment) -> dispatch(store_segment, [Transfer, Segment]). --spec assemble(emqx_ft:transfer(), emqx_ft:bytes()) -> +-spec assemble(emqx_ft:transfer(), emqx_ft:bytes(), emqx_ft:finopts()) -> ok | {async, pid()} | {error, term()}. -assemble(Transfer, Size) -> - dispatch(assemble, [Transfer, Size]). +assemble(Transfer, Size, FinOpts) -> + dispatch(assemble, [Transfer, Size, FinOpts]). -spec files() -> {ok, page(file_info(), _)} | {error, term()}. diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl index 601f8b112..6f2b9bea1 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl @@ -24,7 +24,7 @@ %% Export API -export([start_export/3]). -export([write/2]). --export([complete/1]). +-export([complete/2]). -export([discard/1]). %% Listing API @@ -117,12 +117,19 @@ write(#{mod := ExporterMod, st := ExportSt, hash := Hash} = Export, Content) -> Error end. --spec complete(export()) -> +-spec complete(export(), emqx_ft:finopts()) -> ok | {error, _Reason}. -complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemeta}) -> - case verify_checksum(Hash, Filemeta) of - {ok, Checksum} -> - ExporterMod:complete(ExportSt, Checksum); +complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemeta}, Opts) -> + Checksum = emqx_maybe:define( + % NOTE + % Checksum in `Opts` takes precedence over one in `Filemeta` according to the spec. + % We do not care if they differ. + maps:get(checksum, Opts, undefined), + maps:get(checksum, Filemeta, undefined) + ), + case verify_checksum(Hash, Checksum) of + {ok, ExportChecksum} -> + ExporterMod:complete(ExportSt, ExportChecksum); {error, _} = Error -> _ = ExporterMod:discard(ExportSt), Error @@ -183,13 +190,13 @@ init_checksum(#{}) -> update_checksum(Ctx, IoData) -> crypto:hash_update(Ctx, IoData). -verify_checksum(Ctx, #{checksum := {Algo, Digest} = Checksum}) -> +verify_checksum(Ctx, {Algo, Digest} = Checksum) -> case crypto:hash_final(Ctx) of Digest -> {ok, Checksum}; Mismatch -> {error, {checksum, Algo, binary:encode_hex(Mismatch)}} end; -verify_checksum(Ctx, #{}) -> +verify_checksum(Ctx, undefined) -> Digest = crypto:hash_final(Ctx), {ok, {sha256, Digest}}. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 7a0a6b3b4..99720f521 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -36,7 +36,7 @@ -export([list/3]). -export([pread/5]). -export([lookup_local_assembler/1]). --export([assemble/3]). +-export([assemble/4]). -export([transfers/1]). @@ -211,14 +211,14 @@ pread(_Storage, _Transfer, Frag, Offset, Size) -> {error, Reason} end. --spec assemble(storage(), transfer(), emqx_ft:bytes()) -> +-spec assemble(storage(), transfer(), emqx_ft:bytes(), emqx_ft:finopts()) -> {async, _Assembler :: pid()} | ok | {error, _TODO}. -assemble(Storage, Transfer, Size) -> +assemble(Storage, Transfer, Size, Opts) -> LookupSources = [ fun() -> lookup_local_assembler(Transfer) end, fun() -> lookup_remote_assembler(Transfer) end, fun() -> check_if_already_exported(Storage, Transfer) end, - fun() -> ensure_local_assembler(Storage, Transfer, Size) end + fun() -> ensure_local_assembler(Storage, Transfer, Size, Opts) end ], lookup_assembler(LookupSources). @@ -295,8 +295,8 @@ lookup_remote_assembler(Transfer) -> _ -> {error, not_found} end. -ensure_local_assembler(Storage, Transfer, Size) -> - {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size), +ensure_local_assembler(Storage, Transfer, Size, Opts) -> + {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size, Opts), {async, Pid}. -spec transfers(storage()) -> diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index e582db01f..ae274cd86 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -159,6 +159,10 @@ t_invalid_topic_format(Config) -> unspecified_error, emqtt:publish(C, <<"$file/fileid/fin/offset">>, <<>>, 1) ), + ?assertRCName( + unspecified_error, + emqtt:publish(C, <<"$file/fileid/fin/42/xyz">>, <<>>, 1) + ), ?assertRCName( unspecified_error, emqtt:publish(C, <<"$file/">>, <<>>, 1) @@ -390,9 +394,18 @@ t_invalid_checksum(Config) -> with_offsets(Data) ), + % Send `fin` w/o checksum, should fail since filemeta checksum is invalid + FinTopic = mk_fin_topic(FileId, Filesize), ?assertRCName( unspecified_error, - emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1) + emqtt:publish(C, FinTopic, <<>>, 1) + ), + + % Send `fin` with the correct checksum + Checksum = binary:encode_hex(sha256(Data)), + ?assertRCName( + success, + emqtt:publish(C, <>, <<>>, 1) ). t_corrupted_segment_retry(Config) -> @@ -507,7 +520,7 @@ t_assemble_crash(Config) -> C = ?config(client, Config), meck:new(emqx_ft_storage_fs), - meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _) -> meck:exception(error, oops) end), + meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _, _) -> meck:exception(error, oops) end), ?assertRCName( unspecified_error, diff --git a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl index c1deeb3bc..1dcc8a79d 100644 --- a/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_assembler_SUITE.erl @@ -178,7 +178,7 @@ complete_assemble(Storage, Transfer, Size) -> complete_assemble(Storage, Transfer, Size, 1000). complete_assemble(Storage, Transfer, Size, Timeout) -> - {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size), + {async, Pid} = emqx_ft_storage_fs:assemble(Storage, Transfer, Size, #{}), MRef = erlang:monitor(process, Pid), Pid ! kickoff, receive diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl index 842ae6bad..12f91c808 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_gc_SUITE.erl @@ -381,7 +381,7 @@ complete_transfer(Storage, Transfer, Size) -> complete_transfer(Storage, Transfer, Size, 100). complete_transfer(Storage, Transfer, Size, Timeout) -> - case emqx_ft_storage_fs:assemble(Storage, Transfer, Size) of + case emqx_ft_storage_fs:assemble(Storage, Transfer, Size, #{}) of ok -> ok; {async, Pid} ->