diff --git a/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl b/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl index e9ac07b53..cdd25ad7d 100644 --- a/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl +++ b/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl @@ -81,7 +81,8 @@ mode := direct, container := template_str(), blob := template_str(), - content := template_str() + content := template_str(), + max_block_size := pos_integer() } }. -type aggreg_action_config() :: #{ @@ -94,7 +95,9 @@ max_records := pos_integer() }, container := string(), - blob := template_str() + blob := template_str(), + max_block_size := pos_integer(), + min_block_size := pos_integer() }, any() => term() }. @@ -106,7 +109,8 @@ mode := direct, container := emqx_template:t(), blob := emqx_template:t(), - content := emqx_template:t() + content := emqx_template:t(), + max_block_size := pos_integer() }. -type aggreg_action_state() :: #{ mode := aggregated, @@ -505,7 +509,8 @@ install_action(#{parameters := #{mode := direct}} = ActionConfig, _ConnState) -> mode := Mode = direct, container := ContainerTemplateStr, blob := BlobTemplateStr, - content := ContentTemplateStr + content := ContentTemplateStr, + max_block_size := MaxBlockSize } } = ActionConfig, ContainerTemplate = emqx_template:parse(ContainerTemplateStr), @@ -515,7 +520,8 @@ install_action(#{parameters := #{mode := direct}} = ActionConfig, _ConnState) -> mode => Mode, container => ContainerTemplate, blob => BlobTemplate, - content => ContentTemplate + content => ContentTemplate, + max_block_size => MaxBlockSize }; install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState) -> #{pool_name := Pool} = ConnState, @@ -582,7 +588,8 @@ run_direct_transfer(Data, ConnResId, ActionResId, ActionState) -> #{ container := ContainerTemplate, blob := BlobTemplate, - content := ContentTemplate + content := ContentTemplate, + max_block_size := MaxBlockSize } = ActionState, Container = render_container(ContainerTemplate, Data), Blob = render_blob(BlobTemplate, Data), @@ -595,6 +602,12 @@ run_direct_transfer(Data, ConnResId, ActionResId, ActionState) -> data = Content } }), + case iolist_size(Content) > MaxBlockSize of + true -> + error({unrecoverable_error, payload_too_large}); + false -> + ok + end, case put_block_blob(ConnResId, Container, Blob, Content) of {ok, created} -> ?tp(azure_blob_storage_bridge_connector_upload_ok, #{instance_id => ConnResId}), @@ -652,7 +665,7 @@ render_container(Template, Data) -> {Result, []} -> iolist_to_string(Result); {_, Errors} -> - erlang:error({unrecoverable_error, {container_undefined, Errors}}) + error({unrecoverable_error, {container_undefined, Errors}}) end. render_blob(Template, Data) -> diff --git a/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl b/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl index 71c1d3a29..9e165c8e8 100644 --- a/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl +++ b/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl @@ -136,7 +136,8 @@ end_per_testcase(_Testcase, Config) -> direct_action_cases() -> [ t_sync_query, - t_sync_query_down + t_sync_query_down, + t_max_block_size_direct_transfer ]. %%------------------------------------------------------------------------------ @@ -685,3 +686,23 @@ t_aggreg_pending_upload_restart(Config) -> [] ), ok. + +%% Checks that we return an unrecoverable error if the payload exceeds `max_block_size'. +t_max_block_size_direct_transfer(Config) -> + {ok, _Bridge} = emqx_bridge_v2_testlib:create_bridge_api( + Config, + #{<<"parameters">> => #{<<"max_block_size">> => <<"1B">>}} + ), + Topic = <<"t/a">>, + ClientId = <<"myclient">>, + ResourceId = emqx_bridge_v2_testlib:resource_id(Config), + BridgeId = emqx_bridge_v2_testlib:bridge_id(Config), + Payload = <<"too large">>, + PayloadBin = emqx_utils_json:encode(Payload), + MsgEvent = mk_message_event(ClientId, Topic, PayloadBin), + Message = {BridgeId, MsgEvent}, + ?assertMatch( + {error, {unrecoverable_error, payload_too_large}}, + emqx_resource:simple_sync_query(ResourceId, Message) + ), + ok.