feat(ft): add segment checksum validation

Also downgrade validation errors to mere info messages.
This commit is contained in:
Andrew Mayorov 2023-02-20 20:09:29 +03:00 committed by Ilya Averyanov
parent 5998961f9f
commit 15d967459c
2 changed files with 157 additions and 99 deletions

View File

@ -20,6 +20,7 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-export([
hook/0,
@ -156,7 +157,12 @@ on_file_command(PacketId, FileId, Msg, FileCommand) ->
Transfer = transfer(Msg, FileId),
case FileCommand of
[<<"init">>] ->
on_init(PacketId, Msg, Transfer);
validate(
[{filemeta, Msg#message.payload}],
fun([Meta]) ->
on_init(PacketId, Msg, Transfer, Meta)
end
);
[<<"fin">>, FinalSizeBin | MaybeChecksum] when length(MaybeChecksum) =< 1 ->
ChecksumBin = emqx_maybe:from_list(MaybeChecksum),
validate(
@ -172,51 +178,47 @@ on_file_command(PacketId, FileId, Msg, FileCommand) ->
on_segment(PacketId, Msg, Transfer, Offset, undefined)
end);
[OffsetBin, ChecksumBin] ->
validate([{offset, OffsetBin}, {checksum, ChecksumBin}], fun([Offset, Checksum]) ->
on_segment(PacketId, Msg, Transfer, Offset, Checksum)
end);
validate(
[{offset, OffsetBin}, {checksum, ChecksumBin}],
fun([Offset, Checksum]) ->
validate(
[{integrity, Msg#message.payload, Checksum}],
fun(_) ->
on_segment(PacketId, Msg, Transfer, Offset, Checksum)
end
)
end
);
_ ->
?RC_UNSPECIFIED_ERROR
end.
on_init(PacketId, Msg, Transfer) ->
on_init(PacketId, Msg, Transfer, Meta) ->
?SLOG(info, #{
msg => "on_init",
mqtt_msg => Msg,
packet_id => PacketId,
transfer => Transfer
transfer => Transfer,
filemeta => Meta
}),
Payload = Msg#message.payload,
PacketKey = {self(), PacketId},
% %% Add validations here
case decode_filemeta(Payload) of
{ok, Meta} ->
Callback = fun(Result) ->
?MODULE:on_complete("store_filemeta", PacketKey, Transfer, Result)
end,
with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() ->
case store_filemeta(Transfer, Meta) of
% Stored, ack through the responder right away
ok ->
emqx_ft_responder:ack(PacketKey, ok);
% Storage operation started, packet will be acked by the responder
% {async, Pid} ->
% ok = emqx_ft_responder:kickoff(PacketKey, Pid),
% ok;
%% Storage operation failed, ack through the responder
{error, _} = Error ->
emqx_ft_responder:ack(PacketKey, Error)
end
end);
{error, Reason} ->
?SLOG(error, #{
msg => "on_init: invalid filemeta",
mqtt_msg => Msg,
transfer => Transfer,
reason => Reason
}),
?RC_UNSPECIFIED_ERROR
end.
Callback = fun(Result) ->
?MODULE:on_complete("store_filemeta", PacketKey, Transfer, Result)
end,
with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() ->
case store_filemeta(Transfer, Meta) of
% Stored, ack through the responder right away
ok ->
emqx_ft_responder:ack(PacketKey, ok);
% Storage operation started, packet will be acked by the responder
% {async, Pid} ->
% ok = emqx_ft_responder:kickoff(PacketKey, Pid),
% ok;
%% Storage operation failed, ack through the responder
{error, _} = Error ->
emqx_ft_responder:ack(PacketKey, Error)
end
end).
on_abort(_Msg, _FileId) ->
%% TODO
@ -231,14 +233,11 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) ->
offset => Offset,
checksum => Checksum
}),
%% TODO: handle checksum
Payload = Msg#message.payload,
Segment = {Offset, Payload},
Segment = {Offset, Msg#message.payload},
PacketKey = {self(), PacketId},
Callback = fun(Result) ->
?MODULE:on_complete("store_segment", PacketKey, Transfer, Result)
end,
%% Add offset/checksum validations
with_responder(PacketKey, Callback, ?STORE_SEGMENT_TIMEOUT, fun() ->
case store_segment(Transfer, Segment) of
ok ->
@ -360,10 +359,7 @@ validate(Validations, Fun) ->
{ok, Parsed} ->
Fun(Parsed);
{error, Reason} ->
?SLOG(error, #{
msg => "validate: invalid $file command",
reason => Reason
}),
?tp(info, "client_violated_protocol", #{reason => Reason}),
?RC_UNSPECIFIED_ERROR
end.
@ -376,6 +372,13 @@ do_validate([{fileid, FileId} | Rest], Parsed) ->
0 ->
{error, {invalid_fileid, FileId}}
end;
do_validate([{filemeta, Payload} | Rest], Parsed) ->
case decode_filemeta(Payload) of
{ok, Meta} ->
do_validate(Rest, [Meta | Parsed]);
{error, Reason} ->
{error, {invalid_filemeta, Reason}}
end;
do_validate([{offset, Offset} | Rest], Parsed) ->
case string:to_integer(Offset) of
{Int, <<>>} ->
@ -397,6 +400,13 @@ do_validate([{checksum, Checksum} | Rest], Parsed) ->
{error, _Reason} ->
{error, {invalid_checksum, Checksum}}
end;
do_validate([{integrity, Payload, Checksum} | Rest], Parsed) ->
case crypto:hash(sha256, Payload) of
Checksum ->
do_validate(Rest, [Payload | Parsed]);
Mismatch ->
{error, {checksum_mismatch, binary:encode_hex(Mismatch)}}
end;
do_validate([{{maybe, _}, undefined} | Rest], Parsed) ->
do_validate(Rest, [undefined | Parsed]);
do_validate([{{maybe, T}, Value} | Rest], Parsed) ->

View File

@ -37,8 +37,8 @@ all() ->
groups() ->
[
{single_node, [sequence], emqx_common_test_helpers:all(?MODULE) -- group_cluster()},
{cluster, [sequence], group_cluster()}
{single_node, [], emqx_common_test_helpers:all(?MODULE) -- group_cluster()},
{cluster, [], group_cluster()}
].
group_cluster() ->
@ -177,32 +177,28 @@ t_simple_transfer(Config) ->
Meta = #{size := Filesize} = meta(Filename, Data),
MetaPayload = emqx_json:encode(Meta),
MetaTopic = <<"$file/", FileId/binary, "/init">>,
?assertRCName(
success,
emqtt:publish(C, MetaTopic, MetaPayload, 1)
emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1)
),
lists:foreach(
fun({Chunk, Offset}) ->
SegmentTopic = <<"$file/", FileId/binary, "/", Offset/binary>>,
?assertRCName(
success,
emqtt:publish(C, SegmentTopic, Chunk, 1)
emqtt:publish(C, mk_segment_topic(FileId, Offset), Chunk, 1)
)
end,
with_offsets(Data)
),
FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>,
?assertRCName(
success,
emqtt:publish(C, FinTopic, <<>>, 1)
emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1)
),
{ok, [{ReadyTransferId, _}]} = emqx_ft_storage:ready_transfers(),
[ReadyTransferId] = list_ready_transfers(?config(clientid, Config)),
{ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId),
?assertEqual(
iolist_to_binary(Data),
iolist_to_binary(qlc:eval(TableQH))
@ -217,10 +213,9 @@ t_meta_conflict(Config) ->
Meta = meta(Filename, [<<"x">>]),
MetaPayload = emqx_json:encode(Meta),
MetaTopic = <<"$file/", FileId/binary, "/init">>,
?assertRCName(
success,
emqtt:publish(C, MetaTopic, MetaPayload, 1)
emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1)
),
ConflictMeta = Meta#{name => <<"conflict.pdf">>},
@ -228,7 +223,7 @@ t_meta_conflict(Config) ->
?assertRCName(
unspecified_error,
emqtt:publish(C, MetaTopic, ConflictMetaPayload, 1)
emqtt:publish(C, mk_init_topic(FileId), ConflictMetaPayload, 1)
).
t_no_meta(Config) ->
@ -237,16 +232,14 @@ t_no_meta(Config) ->
FileId = <<"f1">>,
Data = <<"first">>,
SegmentTopic = <<"$file/", FileId/binary, "/0">>,
?assertRCName(
success,
emqtt:publish(C, SegmentTopic, Data, 1)
emqtt:publish(C, mk_segment_topic(FileId, 0), Data, 1)
),
FinTopic = <<"$file/", FileId/binary, "/fin/42">>,
?assertRCName(
unspecified_error,
emqtt:publish(C, FinTopic, <<>>, 1)
emqtt:publish(C, mk_fin_topic(FileId, 42), <<>>, 1)
).
t_no_segment(Config) ->
@ -260,28 +253,25 @@ t_no_segment(Config) ->
Meta = #{size := Filesize} = meta(Filename, Data),
MetaPayload = emqx_json:encode(Meta),
MetaTopic = <<"$file/", FileId/binary, "/init">>,
?assertRCName(
success,
emqtt:publish(C, MetaTopic, MetaPayload, 1)
emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1)
),
lists:foreach(
fun({Chunk, Offset}) ->
SegmentTopic = <<"$file/", FileId/binary, "/", Offset/binary>>,
?assertRCName(
success,
emqtt:publish(C, SegmentTopic, Chunk, 1)
emqtt:publish(C, mk_segment_topic(FileId, Offset), Chunk, 1)
)
end,
%% Skip the first segment
tl(with_offsets(Data))
),
FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>,
?assertRCName(
unspecified_error,
emqtt:publish(C, FinTopic, <<>>, 1)
emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1)
).
t_invalid_meta(Config) ->
@ -289,20 +279,18 @@ t_invalid_meta(Config) ->
FileId = <<"f1">>,
MetaTopic = <<"$file/", FileId/binary, "/init">>,
%% Invalid schema
Meta = #{foo => <<"bar">>},
MetaPayload = emqx_json:encode(Meta),
?assertRCName(
unspecified_error,
emqtt:publish(C, MetaTopic, MetaPayload, 1)
emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1)
),
%% Invalid JSON
?assertRCName(
unspecified_error,
emqtt:publish(C, MetaTopic, <<"{oops;">>, 1)
emqtt:publish(C, mk_init_topic(FileId), <<"{oops;">>, 1)
).
t_invalid_checksum(Config) ->
@ -316,27 +304,73 @@ t_invalid_checksum(Config) ->
Meta = #{size := Filesize} = meta(Filename, Data),
MetaPayload = emqx_json:encode(Meta#{checksum => sha256hex(<<"invalid">>)}),
MetaTopic = <<"$file/", FileId/binary, "/init">>,
?assertRCName(
success,
emqtt:publish(C, MetaTopic, MetaPayload, 1)
emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1)
),
lists:foreach(
fun({Chunk, Offset}) ->
SegmentTopic = <<"$file/", FileId/binary, "/", Offset/binary>>,
?assertRCName(
success,
emqtt:publish(C, SegmentTopic, Chunk, 1)
emqtt:publish(C, mk_segment_topic(FileId, Offset), Chunk, 1)
)
end,
with_offsets(Data)
),
FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>,
?assertRCName(
unspecified_error,
emqtt:publish(C, FinTopic, <<>>, 1)
emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1)
).
t_corrupted_segment_retry(Config) ->
C = ?config(client, Config),
Filename = <<"corruption.pdf">>,
FileId = <<"4242-4242">>,
Data = [<<"first">>, <<"second">>, <<"third">>],
[
{Seg1, Offset1},
{Seg2, Offset2},
{Seg3, Offset3}
] = with_offsets(Data),
[
Checksum1,
Checksum2,
Checksum3
] = [sha256hex(S) || S <- Data],
Meta = #{size := Filesize} = meta(Filename, Data),
?assertRCName(success, emqtt:publish(C, mk_init_topic(FileId), emqx_json:encode(Meta), 1)),
?assertRCName(
success,
emqtt:publish(C, mk_segment_topic(FileId, Offset1, Checksum1), Seg1, 1)
),
% segment is corrupted
?assertRCName(
unspecified_error,
emqtt:publish(C, mk_segment_topic(FileId, Offset2, Checksum2), <<Seg2/binary, 42>>, 1)
),
% retry
?assertRCName(
success,
emqtt:publish(C, mk_segment_topic(FileId, Offset2, Checksum2), Seg2, 1)
),
?assertRCName(
success,
emqtt:publish(C, mk_segment_topic(FileId, Offset3, Checksum3), Seg3, 1)
),
?assertRCName(
success,
emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1)
).
t_switch_node(Config) ->
@ -359,14 +393,13 @@ t_switch_node(Config) ->
Meta = #{size := Filesize} = meta(Filename, Data),
MetaPayload = emqx_json:encode(Meta),
MetaTopic = <<"$file/", FileId/binary, "/init">>,
?assertRCName(
success,
emqtt:publish(C1, MetaTopic, MetaPayload, 1)
emqtt:publish(C1, mk_init_topic(FileId), MetaPayload, 1)
),
?assertRCName(
success,
emqtt:publish(C1, <<"$file/", FileId/binary, "/", Offset0/binary>>, Data0, 1)
emqtt:publish(C1, mk_segment_topic(FileId, Offset0), Data0, 1)
),
%% Then, switch the client to the main node
@ -378,29 +411,24 @@ t_switch_node(Config) ->
?assertRCName(
success,
emqtt:publish(C2, <<"$file/", FileId/binary, "/", Offset1/binary>>, Data1, 1)
emqtt:publish(C2, mk_segment_topic(FileId, Offset1), Data1, 1)
),
?assertRCName(
success,
emqtt:publish(C2, <<"$file/", FileId/binary, "/", Offset2/binary>>, Data2, 1)
emqtt:publish(C2, mk_segment_topic(FileId, Offset2), Data2, 1)
),
FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>,
?assertRCName(
success,
emqtt:publish(C2, FinTopic, <<>>, 1)
emqtt:publish(C2, mk_fin_topic(FileId, Filesize), <<>>, 1)
),
ok = emqtt:stop(C2),
%% Now check consistency of the file
{ok, ReadyTransfers} = emqx_ft_storage:ready_transfers(),
{ReadyTransferIds, _} = lists:unzip(ReadyTransfers),
[ReadyTransferId] = [Id || #{<<"clientid">> := CId} = Id <- ReadyTransferIds, CId == ClientId],
[ReadyTransferId] = list_ready_transfers(ClientId),
{ok, TableQH} = emqx_ft_storage:get_ready_transfer(ReadyTransferId),
?assertEqual(
iolist_to_binary(Data),
iolist_to_binary(qlc:eval(TableQH))
@ -476,9 +504,7 @@ t_unreliable_migrating_client(Config) ->
],
_Context = run_commands(Commands, Context),
{ok, ReadyTransfers} = emqx_ft_storage:ready_transfers(),
ReadyTransferIds =
[Id || {#{<<"clientid">> := CId} = Id, _Info} <- ReadyTransfers, CId == ClientId],
ReadyTransferIds = list_ready_transfers(?config(clientid, Config)),
% NOTE
% The cluster had 2 assemblers running on two different nodes, because client sent `fin`
@ -526,16 +552,13 @@ disown_mqtt_client(Context = #{}) ->
Context.
send_filemeta(Meta, Context = #{client := Client, fileid := FileId}) ->
Topic = <<"$file/", FileId/binary, "/init">>,
MetaPayload = emqx_json:encode(Meta),
?assertRCName(
success,
emqtt:publish(Client, Topic, MetaPayload, 1)
emqtt:publish(Client, mk_init_topic(FileId), emqx_json:encode(Meta), 1)
),
Context.
send_segment(Offset, Size, Context = #{client := Client, fileid := FileId, payload := Payload}) ->
Topic = <<"$file/", FileId/binary, "/", (integer_to_binary(Offset))/binary>>,
Data =
case Size of
eof ->
@ -545,15 +568,14 @@ send_segment(Offset, Size, Context = #{client := Client, fileid := FileId, paylo
end,
?assertRCName(
success,
emqtt:publish(Client, Topic, Data, 1)
emqtt:publish(Client, mk_segment_topic(FileId, Offset), Data, 1)
),
Context.
send_finish(Context = #{client := Client, fileid := FileId, filesize := Filesize}) ->
Topic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Filesize))/binary>>,
?assertRCName(
success,
emqtt:publish(Client, Topic, <<>>, 1)
emqtt:publish(Client, mk_fin_topic(FileId, Filesize), <<>>, 1)
),
Context.
@ -561,6 +583,24 @@ send_finish(Context = #{client := Client, fileid := FileId, filesize := Filesize
%% Helpers
%%--------------------------------------------------------------------
mk_init_topic(FileId) ->
<<"$file/", FileId/binary, "/init">>.
mk_segment_topic(FileId, Offset) when is_integer(Offset) ->
mk_segment_topic(FileId, integer_to_binary(Offset));
mk_segment_topic(FileId, Offset) when is_binary(Offset) ->
<<"$file/", FileId/binary, "/", Offset/binary>>.
mk_segment_topic(FileId, Offset, Checksum) when is_integer(Offset) ->
mk_segment_topic(FileId, integer_to_binary(Offset), Checksum);
mk_segment_topic(FileId, Offset, Checksum) when is_binary(Offset) ->
<<"$file/", FileId/binary, "/", Offset/binary, "/", Checksum/binary>>.
mk_fin_topic(FileId, Size) when is_integer(Size) ->
mk_fin_topic(FileId, integer_to_binary(Size));
mk_fin_topic(FileId, Size) when is_binary(Size) ->
<<"$file/", FileId/binary, "/fin/", Size/binary>>.
with_offsets(Items) ->
{List, _} = lists:mapfoldl(
fun(Item, Offset) ->
@ -582,3 +622,11 @@ meta(FileName, Data) ->
expire_at => erlang:system_time(_Unit = second) + 3600,
size => byte_size(FullData)
}.
list_ready_transfers(ClientId) ->
{ok, ReadyTransfers} = emqx_ft_storage:ready_transfers(),
[
Id
|| {#{<<"clientid">> := CId} = Id, _Info} <- ReadyTransfers,
CId == ClientId
].