From 15d967459c7f3e8092465033afdae29c24ba6ab3 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 20 Feb 2023 20:09:29 +0300 Subject: [PATCH] feat(ft): add segment checksum validation Also downgrade validation errors to mere info messages. --- apps/emqx_ft/src/emqx_ft.erl | 98 +++++++++-------- apps/emqx_ft/test/emqx_ft_SUITE.erl | 158 ++++++++++++++++++---------- 2 files changed, 157 insertions(+), 99 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index baa0126d6..2ccd5db15 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -20,6 +20,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). -export([ hook/0, @@ -156,7 +157,12 @@ on_file_command(PacketId, FileId, Msg, FileCommand) -> Transfer = transfer(Msg, FileId), case FileCommand of [<<"init">>] -> - on_init(PacketId, Msg, Transfer); + validate( + [{filemeta, Msg#message.payload}], + fun([Meta]) -> + on_init(PacketId, Msg, Transfer, Meta) + end + ); [<<"fin">>, FinalSizeBin | MaybeChecksum] when length(MaybeChecksum) =< 1 -> ChecksumBin = emqx_maybe:from_list(MaybeChecksum), validate( @@ -172,51 +178,47 @@ on_file_command(PacketId, FileId, Msg, FileCommand) -> on_segment(PacketId, Msg, Transfer, Offset, undefined) end); [OffsetBin, ChecksumBin] -> - validate([{offset, OffsetBin}, {checksum, ChecksumBin}], fun([Offset, Checksum]) -> - on_segment(PacketId, Msg, Transfer, Offset, Checksum) - end); + validate( + [{offset, OffsetBin}, {checksum, ChecksumBin}], + fun([Offset, Checksum]) -> + validate( + [{integrity, Msg#message.payload, Checksum}], + fun(_) -> + on_segment(PacketId, Msg, Transfer, Offset, Checksum) + end + ) + end + ); _ -> ?RC_UNSPECIFIED_ERROR end. -on_init(PacketId, Msg, Transfer) -> +on_init(PacketId, Msg, Transfer, Meta) -> ?SLOG(info, #{ msg => "on_init", mqtt_msg => Msg, packet_id => PacketId, - transfer => Transfer + transfer => Transfer, + filemeta => Meta }), - Payload = Msg#message.payload, PacketKey = {self(), PacketId}, - % %% Add validations here - case decode_filemeta(Payload) of - {ok, Meta} -> - Callback = fun(Result) -> - ?MODULE:on_complete("store_filemeta", PacketKey, Transfer, Result) - end, - with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() -> - case store_filemeta(Transfer, Meta) of - % Stored, ack through the responder right away - ok -> - emqx_ft_responder:ack(PacketKey, ok); - % Storage operation started, packet will be acked by the responder - % {async, Pid} -> - % ok = emqx_ft_responder:kickoff(PacketKey, Pid), - % ok; - %% Storage operation failed, ack through the responder - {error, _} = Error -> - emqx_ft_responder:ack(PacketKey, Error) - end - end); - {error, Reason} -> - ?SLOG(error, #{ - msg => "on_init: invalid filemeta", - mqtt_msg => Msg, - transfer => Transfer, - reason => Reason - }), - ?RC_UNSPECIFIED_ERROR - end. + Callback = fun(Result) -> + ?MODULE:on_complete("store_filemeta", PacketKey, Transfer, Result) + end, + with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() -> + case store_filemeta(Transfer, Meta) of + % Stored, ack through the responder right away + ok -> + emqx_ft_responder:ack(PacketKey, ok); + % Storage operation started, packet will be acked by the responder + % {async, Pid} -> + % ok = emqx_ft_responder:kickoff(PacketKey, Pid), + % ok; + %% Storage operation failed, ack through the responder + {error, _} = Error -> + emqx_ft_responder:ack(PacketKey, Error) + end + end). on_abort(_Msg, _FileId) -> %% TODO @@ -231,14 +233,11 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) -> offset => Offset, checksum => Checksum }), - %% TODO: handle checksum - Payload = Msg#message.payload, - Segment = {Offset, Payload}, + Segment = {Offset, Msg#message.payload}, PacketKey = {self(), PacketId}, Callback = fun(Result) -> ?MODULE:on_complete("store_segment", PacketKey, Transfer, Result) end, - %% Add offset/checksum validations with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() -> case store_segment(Transfer, Segment) of ok -> @@ -360,10 +359,7 @@ validate(Validations, Fun) -> {ok, Parsed} -> Fun(Parsed); {error, Reason} -> - ?SLOG(error, #{ - msg => "validate: invalid $file command", - reason => Reason - }), + ?tp(info, "client_violated_protocol", #{reason => Reason}), ?RC_UNSPECIFIED_ERROR end. @@ -376,6 +372,13 @@ do_validate([{fileid, FileId} | Rest], Parsed) -> 0 -> {error, {invalid_fileid, FileId}} end; +do_validate([{filemeta, Payload} | Rest], Parsed) -> + case decode_filemeta(Payload) of + {ok, Meta} -> + do_validate(Rest, [Meta | Parsed]); + {error, Reason} -> + {error, {invalid_filemeta, Reason}} + end; do_validate([{offset, Offset} | Rest], Parsed) -> case string:to_integer(Offset) of {Int, <<>>} -> @@ -397,6 +400,13 @@ do_validate([{checksum, Checksum} | Rest], Parsed) -> {error, _Reason} -> {error, {invalid_checksum, Checksum}} end; +do_validate([{integrity, Payload, Checksum} | Rest], Parsed) -> + case crypto:hash(sha256, Payload) of + Checksum -> + do_validate(Rest, [Payload | Parsed]); + Mismatch -> + {error, {checksum_mismatch, binary:encode_hex(Mismatch)}} + end; do_validate([{{maybe, _}, undefined} | Rest], Parsed) -> do_validate(Rest, [undefined | Parsed]); do_validate([{{maybe, T}, Value} | Rest], Parsed) -> diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 37cc38d4f..bb67d3c56 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -37,8 +37,8 @@ all() -> groups() -> [ - {single_node, [sequence], emqx_common_test_helpers:all(?MODULE) -- group_cluster()}, - {cluster, [sequence], group_cluster()} + {single_node, [], emqx_common_test_helpers:all(?MODULE) -- group_cluster()}, + {cluster, [], group_cluster()} ]. group_cluster() -> @@ -177,32 +177,28 @@ t_simple_transfer(Config) -> Meta = #{size := Filesize} = meta(Filename, Data), MetaPayload = emqx_json:encode(Meta), - MetaTopic = <<"$file/", FileId/binary, "/init">>, ?assertRCName( success, - emqtt:publish(C, MetaTopic, MetaPayload, 1) + emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1) ), lists:foreach( fun({Chunk, Offset}) -> - SegmentTopic = <<"$file/", FileId/binary, "/", Offset/binary>>, ?assertRCName( success, - emqtt:publish(C, SegmentTopic, Chunk, 1) + emqtt:publish(C, mk_segment_topic(FileId, Offset), Chunk, 1) ) end, with_offsets(Data) ), - FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>, ?assertRCName( success, - emqtt:publish(C, FinTopic, <<>>, 1) + emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1) ), - {ok, [{ReadyTransferId, _}]} = emqx_ft_storage:ready_transfers(), + [ReadyTransferId] = list_ready_transfers(?config(clientid, Config)), {ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId), - ?assertEqual( iolist_to_binary(Data), iolist_to_binary(qlc:eval(TableQH)) @@ -217,10 +213,9 @@ t_meta_conflict(Config) -> Meta = meta(Filename, [<<"x">>]), MetaPayload = emqx_json:encode(Meta), - MetaTopic = <<"$file/", FileId/binary, "/init">>, ?assertRCName( success, - emqtt:publish(C, MetaTopic, MetaPayload, 1) + emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1) ), ConflictMeta = Meta#{name => <<"conflict.pdf">>}, @@ -228,7 +223,7 @@ t_meta_conflict(Config) -> ?assertRCName( unspecified_error, - emqtt:publish(C, MetaTopic, ConflictMetaPayload, 1) + emqtt:publish(C, mk_init_topic(FileId), ConflictMetaPayload, 1) ). t_no_meta(Config) -> @@ -237,16 +232,14 @@ t_no_meta(Config) -> FileId = <<"f1">>, Data = <<"first">>, - SegmentTopic = <<"$file/", FileId/binary, "/0">>, ?assertRCName( success, - emqtt:publish(C, SegmentTopic, Data, 1) + emqtt:publish(C, mk_segment_topic(FileId, 0), Data, 1) ), - FinTopic = <<"$file/", FileId/binary, "/fin/42">>, ?assertRCName( unspecified_error, - emqtt:publish(C, FinTopic, <<>>, 1) + emqtt:publish(C, mk_fin_topic(FileId, 42), <<>>, 1) ). t_no_segment(Config) -> @@ -260,28 +253,25 @@ t_no_segment(Config) -> Meta = #{size := Filesize} = meta(Filename, Data), MetaPayload = emqx_json:encode(Meta), - MetaTopic = <<"$file/", FileId/binary, "/init">>, ?assertRCName( success, - emqtt:publish(C, MetaTopic, MetaPayload, 1) + emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1) ), lists:foreach( fun({Chunk, Offset}) -> - SegmentTopic = <<"$file/", FileId/binary, "/", Offset/binary>>, ?assertRCName( success, - emqtt:publish(C, SegmentTopic, Chunk, 1) + emqtt:publish(C, mk_segment_topic(FileId, Offset), Chunk, 1) ) end, %% Skip the first segment tl(with_offsets(Data)) ), - FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>, ?assertRCName( unspecified_error, - emqtt:publish(C, FinTopic, <<>>, 1) + emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1) ). t_invalid_meta(Config) -> @@ -289,20 +279,18 @@ t_invalid_meta(Config) -> FileId = <<"f1">>, - MetaTopic = <<"$file/", FileId/binary, "/init">>, - %% Invalid schema Meta = #{foo => <<"bar">>}, MetaPayload = emqx_json:encode(Meta), ?assertRCName( unspecified_error, - emqtt:publish(C, MetaTopic, MetaPayload, 1) + emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1) ), %% Invalid JSON ?assertRCName( unspecified_error, - emqtt:publish(C, MetaTopic, <<"{oops;">>, 1) + emqtt:publish(C, mk_init_topic(FileId), <<"{oops;">>, 1) ). t_invalid_checksum(Config) -> @@ -316,27 +304,73 @@ t_invalid_checksum(Config) -> Meta = #{size := Filesize} = meta(Filename, Data), MetaPayload = emqx_json:encode(Meta#{checksum => sha256hex(<<"invalid">>)}), - MetaTopic = <<"$file/", FileId/binary, "/init">>, ?assertRCName( success, - emqtt:publish(C, MetaTopic, MetaPayload, 1) + emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1) ), lists:foreach( fun({Chunk, Offset}) -> - SegmentTopic = <<"$file/", FileId/binary, "/", Offset/binary>>, ?assertRCName( success, - emqtt:publish(C, SegmentTopic, Chunk, 1) + emqtt:publish(C, mk_segment_topic(FileId, Offset), Chunk, 1) ) end, with_offsets(Data) ), - FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>, ?assertRCName( unspecified_error, - emqtt:publish(C, FinTopic, <<>>, 1) + emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1) + ). + +t_corrupted_segment_retry(Config) -> + C = ?config(client, Config), + + Filename = <<"corruption.pdf">>, + FileId = <<"4242-4242">>, + + Data = [<<"first">>, <<"second">>, <<"third">>], + [ + {Seg1, Offset1}, + {Seg2, Offset2}, + {Seg3, Offset3} + ] = with_offsets(Data), + [ + Checksum1, + Checksum2, + Checksum3 + ] = [sha256hex(S) || S <- Data], + + Meta = #{size := Filesize} = meta(Filename, Data), + + ?assertRCName(success, emqtt:publish(C, mk_init_topic(FileId), emqx_json:encode(Meta), 1)), + + ?assertRCName( + success, + emqtt:publish(C, mk_segment_topic(FileId, Offset1, Checksum1), Seg1, 1) + ), + + % segment is corrupted + ?assertRCName( + unspecified_error, + emqtt:publish(C, mk_segment_topic(FileId, Offset2, Checksum2), <>, 1) + ), + + % retry + ?assertRCName( + success, + emqtt:publish(C, mk_segment_topic(FileId, Offset2, Checksum2), Seg2, 1) + ), + + ?assertRCName( + success, + emqtt:publish(C, mk_segment_topic(FileId, Offset3, Checksum3), Seg3, 1) + ), + + ?assertRCName( + success, + emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1) ). t_switch_node(Config) -> @@ -359,14 +393,13 @@ t_switch_node(Config) -> Meta = #{size := Filesize} = meta(Filename, Data), MetaPayload = emqx_json:encode(Meta), - MetaTopic = <<"$file/", FileId/binary, "/init">>, ?assertRCName( success, - emqtt:publish(C1, MetaTopic, MetaPayload, 1) + emqtt:publish(C1, mk_init_topic(FileId), MetaPayload, 1) ), ?assertRCName( success, - emqtt:publish(C1, <<"$file/", FileId/binary, "/", Offset0/binary>>, Data0, 1) + emqtt:publish(C1, mk_segment_topic(FileId, Offset0), Data0, 1) ), %% Then, switch the client to the main node @@ -378,29 +411,24 @@ t_switch_node(Config) -> ?assertRCName( success, - emqtt:publish(C2, <<"$file/", FileId/binary, "/", Offset1/binary>>, Data1, 1) + emqtt:publish(C2, mk_segment_topic(FileId, Offset1), Data1, 1) ), ?assertRCName( success, - emqtt:publish(C2, <<"$file/", FileId/binary, "/", Offset2/binary>>, Data2, 1) + emqtt:publish(C2, mk_segment_topic(FileId, Offset2), Data2, 1) ), - FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>, ?assertRCName( success, - emqtt:publish(C2, FinTopic, <<>>, 1) + emqtt:publish(C2, mk_fin_topic(FileId, Filesize), <<>>, 1) ), ok = emqtt:stop(C2), %% Now check consistency of the file - {ok, ReadyTransfers} = emqx_ft_storage:ready_transfers(), - {ReadyTransferIds, _} = lists:unzip(ReadyTransfers), - [ReadyTransferId] = [Id || #{<<"clientid">> := CId} = Id <- ReadyTransferIds, CId == ClientId], - + [ReadyTransferId] = list_ready_transfers(ClientId), {ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId), - ?assertEqual( iolist_to_binary(Data), iolist_to_binary(qlc:eval(TableQH)) @@ -476,9 +504,7 @@ t_unreliable_migrating_client(Config) -> ], _Context = run_commands(Commands, Context), - {ok, ReadyTransfers} = emqx_ft_storage:ready_transfers(), - ReadyTransferIds = - [Id || {#{<<"clientid">> := CId} = Id, _Info} <- ReadyTransfers, CId == ClientId], + ReadyTransferIds = list_ready_transfers(?config(clientid, Config)), % NOTE % The cluster had 2 assemblers running on two different nodes, because client sent `fin` @@ -526,16 +552,13 @@ disown_mqtt_client(Context = #{}) -> Context. send_filemeta(Meta, Context = #{client := Client, fileid := FileId}) -> - Topic = <<"$file/", FileId/binary, "/init">>, - MetaPayload = emqx_json:encode(Meta), ?assertRCName( success, - emqtt:publish(Client, Topic, MetaPayload, 1) + emqtt:publish(Client, mk_init_topic(FileId), emqx_json:encode(Meta), 1) ), Context. send_segment(Offset, Size, Context = #{client := Client, fileid := FileId, payload := Payload}) -> - Topic = <<"$file/", FileId/binary, "/", (integer_to_binary(Offset))/binary>>, Data = case Size of eof -> @@ -545,15 +568,14 @@ send_segment(Offset, Size, Context = #{client := Client, fileid := FileId, paylo end, ?assertRCName( success, - emqtt:publish(Client, Topic, Data, 1) + emqtt:publish(Client, mk_segment_topic(FileId, Offset), Data, 1) ), Context. send_finish(Context = #{client := Client, fileid := FileId, filesize := Filesize}) -> - Topic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>, ?assertRCName( success, - emqtt:publish(Client, Topic, <<>>, 1) + emqtt:publish(Client, mk_fin_topic(FileId, Filesize), <<>>, 1) ), Context. @@ -561,6 +583,24 @@ send_finish(Context = #{client := Client, fileid := FileId, filesize := Filesize %% Helpers %%-------------------------------------------------------------------- +mk_init_topic(FileId) -> + <<"$file/", FileId/binary, "/init">>. + +mk_segment_topic(FileId, Offset) when is_integer(Offset) -> + mk_segment_topic(FileId, integer_to_binary(Offset)); +mk_segment_topic(FileId, Offset) when is_binary(Offset) -> + <<"$file/", FileId/binary, "/", Offset/binary>>. + +mk_segment_topic(FileId, Offset, Checksum) when is_integer(Offset) -> + mk_segment_topic(FileId, integer_to_binary(Offset), Checksum); +mk_segment_topic(FileId, Offset, Checksum) when is_binary(Offset) -> + <<"$file/", FileId/binary, "/", Offset/binary, "/", Checksum/binary>>. + +mk_fin_topic(FileId, Size) when is_integer(Size) -> + mk_fin_topic(FileId, integer_to_binary(Size)); +mk_fin_topic(FileId, Size) when is_binary(Size) -> + <<"$file/", FileId/binary, "/fin/", Size/binary>>. + with_offsets(Items) -> {List, _} = lists:mapfoldl( fun(Item, Offset) -> @@ -582,3 +622,11 @@ meta(FileName, Data) -> expire_at => erlang:system_time(_Unit = second) + 3600, size => byte_size(FullData) }. + +list_ready_transfers(ClientId) -> + {ok, ReadyTransfers} = emqx_ft_storage:ready_transfers(), + [ + Id + || {#{<<"clientid">> := CId} = Id, _Info} <- ReadyTransfers, + CId == ClientId + ].