fix: avoid uploading blocks too frequently and splitting large buffers

Fixes https://github.com/emqx/emqx/pull/13069#discussion_r1613706747
This commit is contained in:
Thales Macedo Garitezi 2024-05-27 14:59:01 -03:00
parent af99829a21
commit 616f14ae53
4 changed files with 61 additions and 174 deletions

View File

@@ -144,6 +144,16 @@ fields(aggregation) ->
default => 1_000_000,
desc => ?DESC("aggregation_max_records")
}
)},
{min_block_size,
mk(
emqx_schema:bytesize(),
#{
default => <<"10mb">>,
importance => ?IMPORTANCE_HIDDEN,
required => true,
validator => fun block_size_validator/1
}
)}
];
fields(common_action_parameters) ->
@@ -156,7 +166,7 @@ fields(common_action_parameters) ->
importance => ?IMPORTANCE_HIDDEN,
desc => ?DESC("max_block_size"),
required => true,
validator => fun max_block_size_validator/1
validator => fun block_size_validator/1
}
)}
];
@@ -293,7 +303,7 @@ scunion(Field, Schemas, {value, Value}) ->
throw(#{field_name => Field, expected => maps:keys(Schemas)})
end.
max_block_size_validator(SizeLimit) ->
block_size_validator(SizeLimit) ->
case SizeLimit =< 4_000 * 1024 * 1024 of
true -> ok;
false -> {error, "must be less than 4000 MiB"}

View File

@@ -55,10 +55,6 @@
%% `emqx_template' API
-export([lookup/2]).
-ifdef(TEST).
-export([take_chunk/2]).
-endif.
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
@@ -131,6 +127,7 @@
action := binary(),
blob := emqx_template:t(),
container := string(),
min_block_size := pos_integer(),
max_block_size := pos_integer(),
pool := connector_resource_id()
}
@@ -144,6 +141,8 @@
buffer_size := non_neg_integer(),
container := container(),
max_block_size := pos_integer(),
min_block_size := pos_integer(),
next_block := queue:queue(iolist()),
num_blocks := non_neg_integer(),
pool := pool_name(),
started := boolean()
@@ -348,6 +347,7 @@ init_transfer_state(Buffer, Opts) ->
blob := BlobTemplate,
container := Container,
max_block_size := MaxBlockSize,
min_block_size := MinBlockSize,
pool := Pool
}
} = Opts,
@@ -358,6 +358,8 @@ init_transfer_state(Buffer, Opts) ->
buffer_size => 0,
container => Container,
max_block_size => MaxBlockSize,
min_block_size => MinBlockSize,
next_block => queue:new(),
num_blocks => 0,
pool => Pool,
started => false
@@ -371,12 +373,25 @@ mk_blob_name_key(Buffer, ActionName, BlobTemplate) ->
process_append(IOData, TransferState0) ->
#{
buffer := Buffer,
buffer_size := BufferSize0
buffer_size := BufferSize0,
min_block_size := MinBlockSize,
next_block := NextBlock0
} = TransferState0,
TransferState0#{
buffer := [Buffer, IOData],
buffer_size := BufferSize0 + iolist_size(IOData)
}.
Size = iolist_size(IOData),
case Size + BufferSize0 >= MinBlockSize of
true ->
%% Block is ready to be written.
TransferState0#{
buffer := [],
buffer_size := 0,
next_block := queue:in([Buffer, IOData], NextBlock0)
};
false ->
TransferState0#{
buffer := [Buffer, IOData],
buffer_size := BufferSize0 + Size
}
end.
-spec process_write(transfer_state()) ->
{ok, transfer_state()} | {error, term()}.
@@ -396,25 +411,19 @@ process_write(TransferState0 = #{started := false}) ->
{error, Reason} ->
{error, Reason}
end;
process_write(TransferState = #{started := true, buffer_size := 0}) ->
{ok, TransferState};
process_write(TransferState0 = #{started := true}) ->
#{
buffer := Buffer,
buffer_size := BufferSize,
max_block_size := MaxBlockSize
next_block := NextBlock0
} = TransferState0,
case BufferSize > MaxBlockSize of
true ->
{IOData, NewBuffer} = take_chunk(Buffer, MaxBlockSize),
case queue:out(NextBlock0) of
{{value, Block}, NextBlock} ->
?tp(azure_blob_storage_will_write_chunk, #{}),
do_process_write(IOData, NewBuffer, TransferState0);
false ->
NewBuffer = [],
do_process_write(Buffer, NewBuffer, TransferState0)
do_process_write(Block, TransferState0#{next_block := NextBlock});
{empty, _} ->
{ok, TransferState0}
end.
do_process_write(IOData, NewBuffer, TransferState0 = #{started := true}) ->
do_process_write(IOData, TransferState0 = #{started := true}) ->
#{
blob := Blob,
container := Container,
@@ -423,12 +432,7 @@ do_process_write(IOData, NewBuffer, TransferState0 = #{started := true}) ->
} = TransferState0,
case append_data(Pool, Container, Blob, block_id(NumBlocks), IOData) of
{ok, _} ->
BufferSize = iolist_size(NewBuffer),
TransferState = TransferState0#{
buffer := NewBuffer,
buffer_size := BufferSize,
num_blocks := NumBlocks + 1
},
TransferState = TransferState0#{num_blocks := NumBlocks + 1},
process_write(TransferState);
{error, Reason} ->
{error, Reason}
@@ -439,10 +443,21 @@ do_process_write(IOData, NewBuffer, TransferState0 = #{started := true}) ->
process_complete(TransferState) ->
#{
blob := Blob,
buffer := Buffer,
buffer_size := BufferSize,
container := Container,
num_blocks := NumBlocks,
num_blocks := NumBlocks0,
pool := Pool
} = TransferState,
%% Flush any left-over data
NumBlocks =
case BufferSize > 0 of
true ->
{ok, #{num_blocks := NumBlocks1}} = do_process_write(Buffer, TransferState),
NumBlocks1;
false ->
NumBlocks0
end,
BlockRefs = [{block_id(N), latest} || N <- lists:seq(0, NumBlocks - 1)],
case put_block_list(Pool, Container, Blob, BlockRefs) of
{ok, _} ->
@@ -511,6 +526,7 @@ install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState)
aggregation := #{
container := ContainerOpts,
max_records := MaxRecords,
min_block_size := MinBlockSize,
time_interval := TimeInterval
},
container := ContainerName,
@@ -531,6 +547,7 @@ install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState)
blob => Blob,
container => ContainerName,
max_block_size => MaxBlockSize,
min_block_size => MinBlockSize,
pool => Pool
},
DeliveryOpts = #{
@@ -707,49 +724,6 @@ check_aggreg_upload_errors(AggregId) ->
check_container_accessible(Pool, Container) ->
list_blobs(Pool, Container).
%% Extract up to `MaxBlockSize' bytes from the transfer buffer.
%% Returns `{Chunk, RemainingBuffer}' where `Chunk' is an iolist of at most
%% `MaxBlockSize' bytes; delegates to `take_chunk/3' with a fresh accumulator.
%% (Removed in this commit: superseded by min-block-size batching upstream.)
-spec take_chunk(transfer_buffer(), pos_integer()) -> {_IOList :: iolist(), transfer_buffer()}.
take_chunk(Buffer, MaxBlockSize) ->
    RemainingBytes = MaxBlockSize,
    take_chunk(Buffer, RemainingBytes, _IOListAcc = []).
-spec take_chunk(transfer_buffer(), non_neg_integer(), IOList) ->
    {IOList, transfer_buffer()}
when
    IOList :: iolist().
%% Buffer exhausted: return the accumulated chunk and the (empty) buffer.
take_chunk(RemainingBuffer = [], _RemainingBytes, IOListAcc) ->
    {IOListAcc, RemainingBuffer};
%% Byte budget spent: stop and re-wrap the remainder into buffer shape.
take_chunk(RemainingBuffer, RemainingBytes, IOListAcc) when RemainingBytes =< 0 ->
    {IOListAcc, [RemainingBuffer]};
%% Single-element buffer: either it fits whole, or split it at the budget.
take_chunk([Data0], RemainingBytes, IOListAcc) ->
    case do_take_chunk([Data0], RemainingBytes) of
        {done, Data} ->
            {[IOListAcc, Data], _RemainingBuffer = []};
        {more, {Data, Rest}} ->
            {[IOListAcc, Data], [Rest]}
    end;
%% Nested `[Older, Newer]' buffer (as built by repeated appends): consume the
%% older half first; on `done' recurse into the newer half with the budget
%% reduced by the bytes just taken, on `more' keep the split remainder in order.
take_chunk([Data0, Rest0], RemainingBytes0, IOListAcc) ->
    case do_take_chunk(Data0, RemainingBytes0) of
        {done, Data} ->
            RemainingBytes = RemainingBytes0 - iolist_size(Data),
            take_chunk([Rest0], RemainingBytes, [IOListAcc, Data]);
        {more, {Data, Rest}} ->
            {[IOListAcc, Data], [Rest, Rest0]}
    end.
%% Fit `Data' into a chunk of at most `ChunkSize' bytes.
%% Returns `{done, Data}' untouched when the whole payload already fits,
%% otherwise `{more, {Head, Tail}}' where `Head' is exactly `ChunkSize'
%% bytes (the iolist is flattened only in the split case).
do_take_chunk(Data, ChunkSize) ->
    DataSize = iolist_size(Data),
    if
        DataSize =< ChunkSize ->
            {done, Data};
        true ->
            {more, split_binary(iolist_to_binary(Data), ChunkSize)}
    end.
%% ensure_transfer_buffer_shape([_, _] = Buffer) ->
%% Buffer;
%% ensure_transfer_buffer_shape(Data) ->
%% [Data].
%% Render block index `N' as the fixed-width block id used with the blob
%% service: 32 decimal digits, zero-padded on the left, as a binary.
block_id(N) ->
    iolist_to_binary(io_lib:format("~32..0b", [N])).

View File

@@ -615,7 +615,9 @@ t_aggreg_pending_upload_restart(Config) ->
#{
<<"parameters">> =>
#{
<<"max_block_size">> => <<"1024B">>
<<"aggregation">> => #{
<<"min_block_size">> => <<"1024B">>
}
}
}
)

View File

@@ -1,99 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_tests).
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
%% Thin local alias for the connector's `take_chunk/2' (exported under
%% `-ifdef(TEST)') so the properties below read tersely.
take_chunk(Buffer, BlockSize) ->
    emqx_bridge_azure_blob_storage_connector:take_chunk(Buffer, BlockSize).
%%------------------------------------------------------------------------------
%% Generators
%%------------------------------------------------------------------------------
%% PropEr generator for arbitrary `iodata()': weighted mostly toward the flat
%% cases (empty, a binary, a list of binaries/bytes), with a rare recursive
%% case whose size budget is cut to a third to keep nested terms small.
iodata_gen() ->
    frequency([
        {10, []},
        {10, binary()},
        {10, list(oneof([binary(), byte()]))},
        {1, ?SIZED(Size, resize(Size div 3, list(iodata_gen())))}
    ]).
%% Generator for a transfer buffer shaped the way the connector builds it:
%% starting from `[]', each appended part wraps the previous buffer as
%% `[PreviousBuffer, DataPart]', producing the left-nested two-element lists
%% that `take_chunk/2' pattern-matches on.
buffer_gen() ->
    ?LET(
        DataParts,
        list(iodata_gen()),
        lists:foldl(
            fun(DataPart, Buffer) ->
                [Buffer, DataPart]
            end,
            _InitialBuffer = [],
            DataParts
        )
    ).
%%------------------------------------------------------------------------------
%% Properties
%%------------------------------------------------------------------------------
%% Verifies that we can take several chunks from the buffer, and that data is not lost nor
%% created.
take_chunk_preserves_data_prop() ->
    ?FORALL(
        {Buffer, BlockSize, Steps},
        {buffer_gen(), non_neg_integer(), pos_integer()},
        begin
            %% Take `Steps' successive chunks, threading the shrinking
            %% buffer through as the fold accumulator.
            {Chunks, FinalBuffer} =
                lists:mapfoldl(
                    fun(_, Acc) ->
                        take_chunk(Acc, BlockSize)
                    end,
                    Buffer,
                    lists:seq(1, Steps)
                ),
            %% Original buffer is preserved
            BufferBin = iolist_to_binary(Buffer),
            ConcatenatedBin = iolist_to_binary([Chunks, FinalBuffer]),
            ?WHENFAIL(
                ct:pal(
                    "block size: ~b\nsteps: ~b\noriginal buffer:\n ~p\nchunks + final buffer:\n ~p",
                    [BlockSize, Steps, Buffer, {Chunks, FinalBuffer}]
                ),
                BufferBin =:= ConcatenatedBin
            )
        end
    ).
%% Verifies that the produced chunk has at most the requested size.
take_chunk_size_prop() ->
    ?FORALL(
        {Buffer, BlockSize},
        {buffer_gen(), non_neg_integer()},
        begin
            %% A single take must never exceed the requested block size.
            {Chunk, FinalBuffer} = take_chunk(Buffer, BlockSize),
            ?WHENFAIL(
                ct:pal(
                    "block size: ~b\n\noriginal buffer:\n ~p\nchunk + final buffer:\n ~p"
                    "\nchunk size: ~b",
                    [BlockSize, Buffer, {Chunk, FinalBuffer}, iolist_size(Chunk)]
                ),
                iolist_size(Chunk) =< BlockSize
            )
        end
    ).
%%------------------------------------------------------------------------------
%% Tests
%%------------------------------------------------------------------------------
%% EUnit generator: run both PropEr properties, 1000 cases each with terms
%% capped at size 100, under a 300-second timeout for the whole group.
take_chunk_test_() ->
    Props = [take_chunk_preserves_data_prop(), take_chunk_size_prop()],
    Opts = [{numtests, 1000}, {to_file, user}, {max_size, 100}],
    {timeout, 300, [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}.