From 616f14ae535c7c8736d73948a5ec9896e02079e8 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 May 2024 14:59:01 -0300 Subject: [PATCH] fix: avoid uploading blocks too frequently and splitting large buffers Fixes https://github.com/emqx/emqx/pull/13069#discussion_r1613706747 --- ...ridge_azure_blob_storage_action_schema.erl | 14 ++- ...qx_bridge_azure_blob_storage_connector.erl | 118 +++++++----------- .../emqx_bridge_azure_blob_storage_SUITE.erl | 4 +- .../emqx_bridge_azure_blob_storage_tests.erl | 99 --------------- 4 files changed, 61 insertions(+), 174 deletions(-) delete mode 100644 apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_tests.erl diff --git a/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_action_schema.erl b/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_action_schema.erl index d1243f1e6..f50f1bd03 100644 --- a/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_action_schema.erl +++ b/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_action_schema.erl @@ -144,6 +144,16 @@ fields(aggregation) -> default => 1_000_000, desc => ?DESC("aggregation_max_records") } + )}, + {min_block_size, + mk( + emqx_schema:bytesize(), + #{ + default => <<"10mb">>, + importance => ?IMPORTANCE_HIDDEN, + required => true, + validator => fun block_size_validator/1 + } )} ]; fields(common_action_parameters) -> @@ -156,7 +166,7 @@ fields(common_action_parameters) -> importance => ?IMPORTANCE_HIDDEN, desc => ?DESC("max_block_size"), required => true, - validator => fun max_block_size_validator/1 + validator => fun block_size_validator/1 } )} ]; @@ -293,7 +303,7 @@ scunion(Field, Schemas, {value, Value}) -> throw(#{field_name => Field, expected => maps:keys(Schemas)}) end. -max_block_size_validator(SizeLimit) -> +block_size_validator(SizeLimit) -> case SizeLimit =< 4_000 * 1024 * 1024 of true -> ok; false -> {error, "must be less than 4000 MiB"} diff --git a/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl b/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl index 95d88ab28..4a66f039f 100644 --- a/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl +++ b/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl @@ -55,10 +55,6 @@ %% `emqx_template' API -export([lookup/2]). --ifdef(TEST). --export([take_chunk/2]). --endif. - %%------------------------------------------------------------------------------ %% Type declarations %%------------------------------------------------------------------------------ @@ -131,6 +127,7 @@ action := binary(), blob := emqx_template:t(), container := string(), + min_block_size := pos_integer(), max_block_size := pos_integer(), pool := connector_resource_id() } @@ -144,6 +141,8 @@ buffer_size := non_neg_integer(), container := container(), max_block_size := pos_integer(), + min_block_size := pos_integer(), + next_block := queue:queue(iolist()), num_blocks := non_neg_integer(), pool := pool_name(), started := boolean() @@ -348,6 +347,7 @@ init_transfer_state(Buffer, Opts) -> blob := BlobTemplate, container := Container, max_block_size := MaxBlockSize, + min_block_size := MinBlockSize, pool := Pool } } = Opts, @@ -358,6 +358,8 @@ init_transfer_state(Buffer, Opts) -> buffer_size => 0, container => Container, max_block_size => MaxBlockSize, + min_block_size => MinBlockSize, + next_block => queue:new(), num_blocks => 0, pool => Pool, started => false @@ -371,12 +373,25 @@ mk_blob_name_key(Buffer, ActionName, BlobTemplate) -> process_append(IOData, TransferState0) -> #{ buffer := Buffer, - buffer_size := BufferSize0 + buffer_size := BufferSize0, + min_block_size := MinBlockSize, + next_block := NextBlock0 } = TransferState0, - TransferState0#{ - buffer := [Buffer, IOData], - buffer_size := BufferSize0 + iolist_size(IOData) - }. + Size = iolist_size(IOData), + case Size + BufferSize0 >= MinBlockSize of + true -> + %% Block is ready to be written. + TransferState0#{ + buffer := [], + buffer_size := 0, + next_block := queue:in([Buffer, IOData], NextBlock0) + }; + false -> + TransferState0#{ + buffer := [Buffer, IOData], + buffer_size := BufferSize0 + Size + } + end. -spec process_write(transfer_state()) -> {ok, transfer_state()} | {error, term()}. @@ -396,25 +411,19 @@ process_write(TransferState0 = #{started := false}) -> {error, Reason} -> {error, Reason} end; -process_write(TransferState = #{started := true, buffer_size := 0}) -> - {ok, TransferState}; process_write(TransferState0 = #{started := true}) -> #{ - buffer := Buffer, - buffer_size := BufferSize, - max_block_size := MaxBlockSize + next_block := NextBlock0 } = TransferState0, - case BufferSize > MaxBlockSize of - true -> - {IOData, NewBuffer} = take_chunk(Buffer, MaxBlockSize), + case queue:out(NextBlock0) of + {{value, Block}, NextBlock} -> ?tp(azure_blob_storage_will_write_chunk, #{}), - do_process_write(IOData, NewBuffer, TransferState0); - false -> - NewBuffer = [], - do_process_write(Buffer, NewBuffer, TransferState0) + do_process_write(Block, TransferState0#{next_block := NextBlock}); + {empty, _} -> + {ok, TransferState0} end. -do_process_write(IOData, NewBuffer, TransferState0 = #{started := true}) -> +do_process_write(IOData, TransferState0 = #{started := true}) -> #{ blob := Blob, container := Container, @@ -423,12 +432,7 @@ do_process_write(IOData, NewBuffer, TransferState0 = #{started := true}) -> } = TransferState0, case append_data(Pool, Container, Blob, block_id(NumBlocks), IOData) of {ok, _} -> - BufferSize = iolist_size(NewBuffer), - TransferState = TransferState0#{ - buffer := NewBuffer, - buffer_size := BufferSize, - num_blocks := NumBlocks + 1 - }, + TransferState = TransferState0#{num_blocks := NumBlocks + 1}, process_write(TransferState); {error, Reason} -> {error, Reason} @@ -439,10 +443,21 @@ do_process_write(IOData, NewBuffer, TransferState0 = #{started := true}) -> process_complete(TransferState) -> #{ blob := Blob, + buffer := Buffer, + buffer_size := BufferSize, container := Container, - num_blocks := NumBlocks, + num_blocks := NumBlocks0, pool := Pool } = TransferState, + %% Flush any left-over data + NumBlocks = + case BufferSize > 0 of + true -> + {ok, #{num_blocks := NumBlocks1}} = do_process_write(Buffer, TransferState), + NumBlocks1; + false -> + NumBlocks0 + end, BlockRefs = [{block_id(N), latest} || N <- lists:seq(0, NumBlocks - 1)], case put_block_list(Pool, Container, Blob, BlockRefs) of {ok, _} -> @@ -511,6 +526,7 @@ install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState) aggregation := #{ container := ContainerOpts, max_records := MaxRecords, + min_block_size := MinBlockSize, time_interval := TimeInterval }, container := ContainerName, @@ -531,6 +547,7 @@ install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState) blob => Blob, container => ContainerName, max_block_size => MaxBlockSize, + min_block_size => MinBlockSize, pool => Pool }, DeliveryOpts = #{ @@ -707,49 +724,6 @@ check_aggreg_upload_errors(AggregId) -> check_container_accessible(Pool, Container) -> list_blobs(Pool, Container). --spec take_chunk(transfer_buffer(), pos_integer()) -> {_IOList :: iolist(), transfer_buffer()}. -take_chunk(Buffer, MaxBlockSize) -> - RemainingBytes = MaxBlockSize, - take_chunk(Buffer, RemainingBytes, _IOListAcc = []). - --spec take_chunk(transfer_buffer(), non_neg_integer(), IOList) -> - {IOList, transfer_buffer()} -when - IOList :: iolist(). -take_chunk(RemainingBuffer = [], _RemainingBytes, IOListAcc) -> - {IOListAcc, RemainingBuffer}; -take_chunk(RemainingBuffer, RemainingBytes, IOListAcc) when RemainingBytes =< 0 -> - {IOListAcc, [RemainingBuffer]}; -take_chunk([Data0], RemainingBytes, IOListAcc) -> - case do_take_chunk([Data0], RemainingBytes) of - {done, Data} -> - {[IOListAcc, Data], _RemainingBuffer = []}; - {more, {Data, Rest}} -> - {[IOListAcc, Data], [Rest]} - end; -take_chunk([Data0, Rest0], RemainingBytes0, IOListAcc) -> - case do_take_chunk(Data0, RemainingBytes0) of - {done, Data} -> - RemainingBytes = RemainingBytes0 - iolist_size(Data), - take_chunk([Rest0], RemainingBytes, [IOListAcc, Data]); - {more, {Data, Rest}} -> - {[IOListAcc, Data], [Rest, Rest0]} - end. - -do_take_chunk(Data, ChunkSize) -> - case iolist_size(Data) =< ChunkSize of - true -> - {done, Data}; - false -> - BinData0 = iolist_to_binary(Data), - {more, split_binary(BinData0, ChunkSize)} - end. - -%% ensure_transfer_buffer_shape([_, _] = Buffer) -> -%% Buffer; -%% ensure_transfer_buffer_shape(Data) -> -%% [Data]. - block_id(N) -> NumDigits = 32, list_to_binary(string:pad(integer_to_list(N), NumDigits, leading, $0)). diff --git a/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl b/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl index 223115991..89a9db41c 100644 --- a/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl +++ b/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl @@ -615,7 +615,9 @@ t_aggreg_pending_upload_restart(Config) -> #{ <<"parameters">> => #{ - <<"max_block_size">> => <<"1024B">> + <<"aggregation">> => #{ + <<"min_block_size">> => <<"1024B">> + } } } ) diff --git a/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_tests.erl b/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_tests.erl deleted file mode 100644 index dd7484aa2..000000000 --- a/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_tests.erl +++ /dev/null @@ -1,99 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_bridge_azure_blob_storage_tests). - --include_lib("proper/include/proper.hrl"). --include_lib("eunit/include/eunit.hrl"). - -%%------------------------------------------------------------------------------ -%% Helper fns -%%------------------------------------------------------------------------------ - -take_chunk(Buffer, BlockSize) -> - emqx_bridge_azure_blob_storage_connector:take_chunk(Buffer, BlockSize). - -%%------------------------------------------------------------------------------ -%% Generators -%%------------------------------------------------------------------------------ - -iodata_gen() -> - frequency([ - {10, []}, - {10, binary()}, - {10, list(oneof([binary(), byte()]))}, - {1, ?SIZED(Size, resize(Size div 3, list(iodata_gen())))} - ]). - -buffer_gen() -> - ?LET( - DataParts, - list(iodata_gen()), - lists:foldl( - fun(DataPart, Buffer) -> - [Buffer, DataPart] - end, - _InitialBuffer = [], - DataParts - ) - ). - -%%------------------------------------------------------------------------------ -%% Properties -%%------------------------------------------------------------------------------ - -%% Verifies that we can take several chunks from the buffer, and that data is not lost nor -%% created. -take_chunk_preserves_data_prop() -> - ?FORALL( - {Buffer, BlockSize, Steps}, - {buffer_gen(), non_neg_integer(), pos_integer()}, - begin - {Chunks, FinalBuffer} = - lists:mapfoldl( - fun(_, Acc) -> - take_chunk(Acc, BlockSize) - end, - Buffer, - lists:seq(1, Steps) - ), - %% Original buffer is preserved - BufferBin = iolist_to_binary(Buffer), - ConcatenatedBin = iolist_to_binary([Chunks, FinalBuffer]), - ?WHENFAIL( - ct:pal( - "block size: ~b\nsteps: ~b\noriginal buffer:\n ~p\nchunks + final buffer:\n ~p", - [BlockSize, Steps, Buffer, {Chunks, FinalBuffer}] - ), - BufferBin =:= ConcatenatedBin - ) - end - ). - -%% Verifies that the produced chunk has at most the requested size. -take_chunk_size_prop() -> - ?FORALL( - {Buffer, BlockSize}, - {buffer_gen(), non_neg_integer()}, - begin - {Chunk, FinalBuffer} = take_chunk(Buffer, BlockSize), - ?WHENFAIL( - ct:pal( - "block size: ~b\n\noriginal buffer:\n ~p\nchunk + final buffer:\n ~p" - "\nchunk size: ~b", - [BlockSize, Buffer, {Chunk, FinalBuffer}, iolist_size(Chunk)] - ), - iolist_size(Chunk) =< BlockSize - ) - end - ). - -%%------------------------------------------------------------------------------ -%% Tests -%%------------------------------------------------------------------------------ - -take_chunk_test_() -> - Props = [take_chunk_preserves_data_prop(), take_chunk_size_prop()], - Opts = [{numtests, 1000}, {to_file, user}, {max_size, 100}], - {timeout, 300, [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}.