fix: check `max_block_size` for direct transfers

This commit is contained in:
Thales Macedo Garitezi 2024-05-29 16:08:13 -03:00
parent 4859cebd9f
commit 347971e24a
2 changed files with 42 additions and 8 deletions

View File

@ -81,7 +81,8 @@
mode := direct, mode := direct,
container := template_str(), container := template_str(),
blob := template_str(), blob := template_str(),
content := template_str() content := template_str(),
max_block_size := pos_integer()
} }
}. }.
-type aggreg_action_config() :: #{ -type aggreg_action_config() :: #{
@ -94,7 +95,9 @@
max_records := pos_integer() max_records := pos_integer()
}, },
container := string(), container := string(),
blob := template_str() blob := template_str(),
max_block_size := pos_integer(),
min_block_size := pos_integer()
}, },
any() => term() any() => term()
}. }.
@ -106,7 +109,8 @@
mode := direct, mode := direct,
container := emqx_template:t(), container := emqx_template:t(),
blob := emqx_template:t(), blob := emqx_template:t(),
content := emqx_template:t() content := emqx_template:t(),
max_block_size := pos_integer()
}. }.
-type aggreg_action_state() :: #{ -type aggreg_action_state() :: #{
mode := aggregated, mode := aggregated,
@ -505,7 +509,8 @@ install_action(#{parameters := #{mode := direct}} = ActionConfig, _ConnState) ->
mode := Mode = direct, mode := Mode = direct,
container := ContainerTemplateStr, container := ContainerTemplateStr,
blob := BlobTemplateStr, blob := BlobTemplateStr,
content := ContentTemplateStr content := ContentTemplateStr,
max_block_size := MaxBlockSize
} }
} = ActionConfig, } = ActionConfig,
ContainerTemplate = emqx_template:parse(ContainerTemplateStr), ContainerTemplate = emqx_template:parse(ContainerTemplateStr),
@ -515,7 +520,8 @@ install_action(#{parameters := #{mode := direct}} = ActionConfig, _ConnState) ->
mode => Mode, mode => Mode,
container => ContainerTemplate, container => ContainerTemplate,
blob => BlobTemplate, blob => BlobTemplate,
content => ContentTemplate content => ContentTemplate,
max_block_size => MaxBlockSize
}; };
install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState) -> install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState) ->
#{pool_name := Pool} = ConnState, #{pool_name := Pool} = ConnState,
@ -582,7 +588,8 @@ run_direct_transfer(Data, ConnResId, ActionResId, ActionState) ->
#{ #{
container := ContainerTemplate, container := ContainerTemplate,
blob := BlobTemplate, blob := BlobTemplate,
content := ContentTemplate content := ContentTemplate,
max_block_size := MaxBlockSize
} = ActionState, } = ActionState,
Container = render_container(ContainerTemplate, Data), Container = render_container(ContainerTemplate, Data),
Blob = render_blob(BlobTemplate, Data), Blob = render_blob(BlobTemplate, Data),
@ -595,6 +602,12 @@ run_direct_transfer(Data, ConnResId, ActionResId, ActionState) ->
data = Content data = Content
} }
}), }),
case iolist_size(Content) > MaxBlockSize of
true ->
error({unrecoverable_error, payload_too_large});
false ->
ok
end,
case put_block_blob(ConnResId, Container, Blob, Content) of case put_block_blob(ConnResId, Container, Blob, Content) of
{ok, created} -> {ok, created} ->
?tp(azure_blob_storage_bridge_connector_upload_ok, #{instance_id => ConnResId}), ?tp(azure_blob_storage_bridge_connector_upload_ok, #{instance_id => ConnResId}),
@ -652,7 +665,7 @@ render_container(Template, Data) ->
{Result, []} -> {Result, []} ->
iolist_to_string(Result); iolist_to_string(Result);
{_, Errors} -> {_, Errors} ->
erlang:error({unrecoverable_error, {container_undefined, Errors}}) error({unrecoverable_error, {container_undefined, Errors}})
end. end.
render_blob(Template, Data) -> render_blob(Template, Data) ->

View File

@ -136,7 +136,8 @@ end_per_testcase(_Testcase, Config) ->
direct_action_cases() -> direct_action_cases() ->
[ [
t_sync_query, t_sync_query,
t_sync_query_down t_sync_query_down,
t_max_block_size_direct_transfer
]. ].
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -685,3 +686,23 @@ t_aggreg_pending_upload_restart(Config) ->
[] []
), ),
ok. ok.
%% Checks that we return an unrecoverable error if the payload exceeds `max_block_size'.
t_max_block_size_direct_transfer(Config) ->
{ok, _Bridge} = emqx_bridge_v2_testlib:create_bridge_api(
Config,
#{<<"parameters">> => #{<<"max_block_size">> => <<"1B">>}}
),
Topic = <<"t/a">>,
ClientId = <<"myclient">>,
ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
Payload = <<"too large">>,
PayloadBin = emqx_utils_json:encode(Payload),
MsgEvent = mk_message_event(ClientId, Topic, PayloadBin),
Message = {BridgeId, MsgEvent},
?assertMatch(
{error, {unrecoverable_error, payload_too_large}},
emqx_resource:simple_sync_query(ResourceId, Message)
),
ok.