Merge pull request #13200 from thalesmg/update-erlazure-no-gen-server-m-20240606

chore: refactor azure blob connector to use new `erlazure` without `gen_server`
This commit is contained in:
Thales Macedo Garitezi 2024-06-11 13:30:14 -03:00 committed by GitHub
commit b2d716909f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 67 additions and 142 deletions

View File

@ -12,5 +12,5 @@
{deps, [ {deps, [
{emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_connector_aggregator, {path, "../../apps/emqx_connector_aggregator"}}, {emqx_connector_aggregator, {path, "../../apps/emqx_connector_aggregator"}},
{erlazure, {git, "https://github.com/emqx/erlazure.git", {tag, "0.3.0.2"}}} {erlazure, {git, "https://github.com/emqx/erlazure.git", {tag, "0.4.0.0"}}}
]}. ]}.

View File

@ -32,18 +32,6 @@
on_batch_query/3 on_batch_query/3
]). ]).
%% `ecpool_worker' API
-export([
connect/1,
do_create_append_blob/3,
do_create_block_blob/3,
do_append_data/5,
do_put_block_list/4,
do_put_block_blob/4,
do_health_check/1,
do_list_blobs/2
]).
%% `emqx_connector_aggreg_delivery' API %% `emqx_connector_aggreg_delivery' API
-export([ -export([
init_transfer_state/2, init_transfer_state/2,
@ -71,7 +59,7 @@
}. }.
-type connector_state() :: #{ -type connector_state() :: #{
pool_name := connector_resource_id(), driver_state := driver_state(),
installed_actions := #{action_resource_id() => action_state()} installed_actions := #{action_resource_id() => action_state()}
}. }.
@ -124,7 +112,7 @@
-type query() :: {_Tag :: channel_id(), _Data :: emqx_jsonish:t()}. -type query() :: {_Tag :: channel_id(), _Data :: emqx_jsonish:t()}.
-type pool_name() :: connector_resource_id(). -type driver_state() :: _.
-type transfer_opts() :: #{ -type transfer_opts() :: #{
upload_options := #{ upload_options := #{
@ -133,7 +121,7 @@
container := string(), container := string(),
min_block_size := pos_integer(), min_block_size := pos_integer(),
max_block_size := pos_integer(), max_block_size := pos_integer(),
pool := connector_resource_id() driver_state := driver_state()
} }
}. }.
@ -148,7 +136,7 @@
min_block_size := pos_integer(), min_block_size := pos_integer(),
next_block := queue:queue(iolist()), next_block := queue:queue(iolist()),
num_blocks := non_neg_integer(), num_blocks := non_neg_integer(),
pool := pool_name(), driver_state := driver_state(),
started := boolean() started := boolean()
}. }.
@ -162,38 +150,32 @@ callback_mode() ->
-spec on_start(connector_resource_id(), connector_config()) -> -spec on_start(connector_resource_id(), connector_config()) ->
{ok, connector_state()} | {error, _Reason}. {ok, connector_state()} | {error, _Reason}.
on_start(ConnResId, ConnConfig) -> on_start(_ConnResId, ConnConfig) ->
#{ #{
account_name := AccountName, account_name := AccountName,
account_key := AccountKey account_key := AccountKey
} = ConnConfig, } = ConnConfig,
Endpoint = maps:get(endpoint, ConnConfig, undefined), Endpoint = maps:get(endpoint, ConnConfig, undefined),
ClientOpts = [ {ok, DriverState} = erlazure:new(#{
{account_name, AccountName}, account => AccountName,
{account_key, AccountKey}, key => AccountKey,
{endpoint, Endpoint} endpoint => Endpoint
], }),
case emqx_resource_pool:start(ConnResId, ?MODULE, ClientOpts) of
ok ->
State = #{ State = #{
pool_name => ConnResId, driver_state => DriverState,
installed_actions => #{} installed_actions => #{}
}, },
{ok, State}; {ok, State}.
{error, Reason} ->
{error, Reason}
end.
-spec on_stop(connector_resource_id(), connector_state()) -> ok. -spec on_stop(connector_resource_id(), connector_state()) -> ok.
on_stop(ConnResId, _ConnState) -> on_stop(_ConnResId, _ConnState) ->
Res = emqx_resource_pool:stop(ConnResId), ?tp(azure_blob_storage_stop, #{instance_id => _ConnResId}),
?tp(azure_blob_storage_stop, #{instance_id => ConnResId}), ok.
Res.
-spec on_get_status(connector_resource_id(), connector_state()) -> -spec on_get_status(connector_resource_id(), connector_state()) ->
?status_connected | ?status_disconnected. ?status_connected | ?status_disconnected.
on_get_status(ConnResId, _ConnState) -> on_get_status(_ConnResId, _ConnState = #{driver_state := DriverState}) ->
health_check(ConnResId). health_check(DriverState).
-spec on_add_channel( -spec on_add_channel(
connector_resource_id(), connector_resource_id(),
@ -236,22 +218,22 @@ on_get_channels(ConnResId) ->
) -> ) ->
?status_connected | ?status_disconnected. ?status_connected | ?status_disconnected.
on_get_channel_status( on_get_channel_status(
ConnResId, _ConnResId,
ActionResId, ActionResId,
_ConnectorState = #{installed_actions := InstalledActions} ConnectorState = #{installed_actions := InstalledActions}
) when is_map_key(ActionResId, InstalledActions) -> ) when is_map_key(ActionResId, InstalledActions) ->
#{ActionResId := ActionConfig} = InstalledActions, #{ActionResId := ActionConfig} = InstalledActions,
channel_status(ActionConfig, ConnResId); channel_status(ActionConfig, ConnectorState);
on_get_channel_status(_ConnResId, _ActionResId, _ConnState) -> on_get_channel_status(_ConnResId, _ActionResId, _ConnState) ->
?status_disconnected. ?status_disconnected.
-spec on_query(connector_resource_id(), query(), connector_state()) -> -spec on_query(connector_resource_id(), query(), connector_state()) ->
{ok, _Result} | {error, _Reason}. {ok, _Result} | {error, _Reason}.
on_query(ConnResId, {Tag, Data}, #{installed_actions := InstalledActions}) -> on_query(ConnResId, {Tag, Data}, #{installed_actions := InstalledActions} = ConnState) ->
case maps:get(Tag, InstalledActions, undefined) of case maps:get(Tag, InstalledActions, undefined) of
ChannelState = #{mode := direct} -> ChannelState = #{mode := direct} ->
?tp(azure_blob_storage_bridge_on_query_enter, #{mode => direct}), ?tp(azure_blob_storage_bridge_on_query_enter, #{mode => direct}),
run_direct_transfer(Data, ConnResId, Tag, ChannelState); run_direct_transfer(Data, ConnResId, Tag, ChannelState, ConnState);
ChannelState = #{mode := aggregated} -> ChannelState = #{mode := aggregated} ->
?tp(azure_blob_storage_bridge_on_query_enter, #{mode => aggregated}), ?tp(azure_blob_storage_bridge_on_query_enter, #{mode => aggregated}),
run_aggregated_transfer([Data], ChannelState); run_aggregated_transfer([Data], ChannelState);
@ -271,71 +253,32 @@ on_batch_query(_ConnResId, [{Tag, Data0} | Rest], #{installed_actions := Install
end. end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% `ecpool_worker' API %% Driver calls
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
connect(Opts0) -> do_create_block_blob(DriverState, Container, Blob) ->
#{
account_name := AccountName,
account_key := AccountKey,
endpoint := Endpoint
} = maps:from_list(Opts0),
erlazure:start(#{account => AccountName, key => AccountKey, endpoint => Endpoint}).
do_create_append_blob(Worker, Container, Blob) ->
%% TODO: check container type before setting content type %% TODO: check container type before setting content type
Opts = [{content_type, "text/csv"}], Opts = [{content_type, "text/csv"}],
erlazure:put_append_blob(Worker, Container, Blob, Opts, infinity). erlazure:put_block_blob(DriverState, Container, Blob, <<>>, Opts).
create_block_blob(Pool, Container, Blob) -> do_append_data(DriverState, Container, Blob, BlockId, IOData) ->
ecpool:pick_and_do(Pool, {?MODULE, do_create_block_blob, [Container, Blob]}, no_handover). erlazure:put_block(DriverState, Container, Blob, BlockId, IOData, []).
do_create_block_blob(Worker, Container, Blob) -> do_put_block_list(DriverState, Container, Blob, BlockRefs) ->
%% TODO: check container type before setting content type
Opts = [{content_type, "text/csv"}],
erlazure:put_block_blob(Worker, Container, Blob, <<>>, Opts, infinity).
append_data(Pool, Container, Blob, BlockId, IOData) ->
ecpool:pick_and_do(
Pool, {?MODULE, do_append_data, [Container, Blob, BlockId, IOData]}, no_handover
).
do_append_data(Worker, Container, Blob, BlockId, IOData) ->
erlazure:put_block(Worker, Container, Blob, BlockId, IOData, [], infinity).
put_block_list(Pool, Container, Blob, BlockRefs) ->
ecpool:pick_and_do(
Pool, {?MODULE, do_put_block_list, [Container, Blob, BlockRefs]}, no_handover
).
do_put_block_list(Worker, Container, Blob, BlockRefs) ->
%% TODO: check container type before setting content type %% TODO: check container type before setting content type
Opts = [{req_opts, [{headers, [{"x-ms-blob-content-type", "text/csv"}]}]}], Opts = [{req_opts, [{headers, [{"x-ms-blob-content-type", "text/csv"}]}]}],
erlazure:put_block_list(Worker, Container, Blob, BlockRefs, Opts, infinity). erlazure:put_block_list(DriverState, Container, Blob, BlockRefs, Opts).
put_block_blob(Pool, Container, Blob, IOData) -> do_put_block_blob(DriverState, Container, Blob, IOData) ->
ecpool:pick_and_do(Pool, {?MODULE, do_put_block_blob, [Container, Blob, IOData]}, no_handover). erlazure:put_block_blob(DriverState, Container, Blob, IOData, []).
do_put_block_blob(Worker, Container, Blob, IOData) -> do_list_blobs(DriverState, Container) ->
erlazure:put_block_blob(Worker, Container, Blob, IOData, [], infinity). try erlazure:list_blobs(DriverState, Container, []) of
do_health_check(Worker) ->
case erlazure:list_containers(Worker, [], infinity) of
{error, _} ->
error;
{L, _} when is_list(L) ->
ok
end.
list_blobs(Pool, Container) ->
ecpool:pick_and_do(Pool, {?MODULE, do_list_blobs, [Container]}, no_handover).
do_list_blobs(Worker, Container) ->
case erlazure:list_blobs(Worker, Container, [], infinity) of
{error, _} ->
error;
{L, _} when is_list(L) -> {L, _} when is_list(L) ->
ok ok
catch
_:_ ->
error
end. end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -352,7 +295,7 @@ init_transfer_state(Buffer, Opts) ->
container := Container, container := Container,
max_block_size := MaxBlockSize, max_block_size := MaxBlockSize,
min_block_size := MinBlockSize, min_block_size := MinBlockSize,
pool := Pool driver_state := DriverState
} }
} = Opts, } = Opts,
Blob = mk_blob_name_key(Buffer, ActionName, BlobTemplate), Blob = mk_blob_name_key(Buffer, ActionName, BlobTemplate),
@ -365,7 +308,7 @@ init_transfer_state(Buffer, Opts) ->
min_block_size => MinBlockSize, min_block_size => MinBlockSize,
next_block => queue:new(), next_block => queue:new(),
num_blocks => 0, num_blocks => 0,
pool => Pool, driver_state => DriverState,
started => false started => false
}. }.
@ -401,14 +344,14 @@ process_append(IOData, TransferState0) ->
{ok, transfer_state()} | {error, term()}. {ok, transfer_state()} | {error, term()}.
process_write(TransferState0 = #{started := false}) -> process_write(TransferState0 = #{started := false}) ->
#{ #{
pool := Pool, driver_state := DriverState,
blob := Blob, blob := Blob,
container := Container container := Container
} = TransferState0, } = TransferState0,
%% TODO %% TODO
%% Possible optimization: if the whole buffer fits the 5000 MiB `put_block_blob' %% Possible optimization: if the whole buffer fits the 5000 MiB `put_block_blob'
%% limit, we could upload the whole thing here. %% limit, we could upload the whole thing here.
case create_block_blob(Pool, Container, Blob) of case do_create_block_blob(DriverState, Container, Blob) of
{ok, _} -> {ok, _} ->
TransferState = TransferState0#{started := true}, TransferState = TransferState0#{started := true},
process_write(TransferState); process_write(TransferState);
@ -432,9 +375,9 @@ do_process_write(IOData, TransferState0 = #{started := true}) ->
blob := Blob, blob := Blob,
container := Container, container := Container,
num_blocks := NumBlocks, num_blocks := NumBlocks,
pool := Pool driver_state := DriverState
} = TransferState0, } = TransferState0,
case append_data(Pool, Container, Blob, block_id(NumBlocks), IOData) of case do_append_data(DriverState, Container, Blob, block_id(NumBlocks), IOData) of
{ok, _} -> {ok, _} ->
TransferState = TransferState0#{num_blocks := NumBlocks + 1}, TransferState = TransferState0#{num_blocks := NumBlocks + 1},
process_write(TransferState); process_write(TransferState);
@ -451,7 +394,7 @@ process_complete(TransferState) ->
buffer_size := BufferSize, buffer_size := BufferSize,
container := Container, container := Container,
num_blocks := NumBlocks0, num_blocks := NumBlocks0,
pool := Pool driver_state := DriverState
} = TransferState, } = TransferState,
%% Flush any left-over data %% Flush any left-over data
NumBlocks = NumBlocks =
@ -463,7 +406,7 @@ process_complete(TransferState) ->
NumBlocks0 NumBlocks0
end, end,
BlockRefs = [{block_id(N), latest} || N <- lists:seq(0, NumBlocks - 1)], BlockRefs = [{block_id(N), latest} || N <- lists:seq(0, NumBlocks - 1)],
case put_block_list(Pool, Container, Blob, BlockRefs) of case do_put_block_list(DriverState, Container, Blob, BlockRefs) of
{ok, _} -> {ok, _} ->
{ok, #{num_blocks => NumBlocks}}; {ok, #{num_blocks => NumBlocks}};
{error, Reason} -> {error, Reason} ->
@ -524,7 +467,7 @@ install_action(#{parameters := #{mode := direct}} = ActionConfig, _ConnState) ->
max_block_size => MaxBlockSize max_block_size => MaxBlockSize
}; };
install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState) -> install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState) ->
#{pool_name := Pool} = ConnState, #{driver_state := DriverState} = ConnState,
#{ #{
bridge_name := Name, bridge_name := Name,
parameters := #{ parameters := #{
@ -554,7 +497,7 @@ install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState)
container => ContainerName, container => ContainerName,
max_block_size => MaxBlockSize, max_block_size => MaxBlockSize,
min_block_size => MinBlockSize, min_block_size => MinBlockSize,
pool => Pool driver_state => DriverState
}, },
DeliveryOpts = #{ DeliveryOpts = #{
callback_module => ?MODULE, callback_module => ?MODULE,
@ -584,7 +527,8 @@ stop_action(#{on_stop := {M, F, A}}) ->
stop_action(_) -> stop_action(_) ->
ok. ok.
run_direct_transfer(Data, ConnResId, ActionResId, ActionState) -> run_direct_transfer(Data, ConnResId, ActionResId, ActionState, ConnState) ->
#{driver_state := DriverState} = ConnState,
#{ #{
container := ContainerTemplate, container := ContainerTemplate,
blob := BlobTemplate, blob := BlobTemplate,
@ -608,7 +552,7 @@ run_direct_transfer(Data, ConnResId, ActionResId, ActionState) ->
false -> false ->
ok ok
end, end,
case put_block_blob(ConnResId, Container, Blob, Content) of case do_put_block_blob(DriverState, Container, Blob, Content) of
{ok, created} -> {ok, created} ->
?tp(azure_blob_storage_bridge_connector_upload_ok, #{instance_id => ConnResId}), ?tp(azure_blob_storage_bridge_connector_upload_ok, #{instance_id => ConnResId}),
ok; ok;
@ -681,40 +625,26 @@ render_content(Template, Data) ->
iolist_to_string(IOList) -> iolist_to_string(IOList) ->
unicode:characters_to_list(IOList). unicode:characters_to_list(IOList).
channel_status(#{mode := direct}, _ConnResId) -> channel_status(#{mode := direct}, _ConnState) ->
%% There's nothing in particular to check for in this mode; the connector health check %% There's nothing in particular to check for in this mode; the connector health check
%% already verifies that we're able to use the client to list containers. %% already verifies that we're able to use the client to list containers.
?status_connected; ?status_connected;
channel_status(#{mode := aggregated} = ActionState, ConnResId) -> channel_status(#{mode := aggregated} = ActionState, ConnState) ->
#{driver_state := DriverState} = ConnState,
#{container := Container, aggreg_id := AggregId} = ActionState, #{container := Container, aggreg_id := AggregId} = ActionState,
%% NOTE: This will effectively trigger uploads of buffers yet to be uploaded. %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
Timestamp = erlang:system_time(second), Timestamp = erlang:system_time(second),
ok = emqx_connector_aggregator:tick(AggregId, Timestamp), ok = emqx_connector_aggregator:tick(AggregId, Timestamp),
ok = check_container_accessible(ConnResId, Container), ok = check_container_accessible(DriverState, Container),
ok = check_aggreg_upload_errors(AggregId), ok = check_aggreg_upload_errors(AggregId),
?status_connected. ?status_connected.
health_check(ConnResId) -> health_check(DriverState) ->
case case erlazure:list_containers(DriverState, []) of
emqx_resource_pool:health_check_workers(
ConnResId,
fun ?MODULE:do_health_check/1,
emqx_resource_pool:health_check_timeout(),
#{return_values => true}
)
of
{ok, []} ->
?status_disconnected;
{ok, Values} ->
AllOk = lists:all(fun(S) -> S =:= ok end, Values),
case AllOk of
true ->
?status_connected;
false ->
?status_disconnected
end;
{error, _} -> {error, _} ->
?status_disconnected ?status_disconnected;
{L, _} when is_list(L) ->
?status_connected
end. end.
map_error({failed_connect, _} = Reason) -> map_error({failed_connect, _} = Reason) ->
@ -734,8 +664,8 @@ check_aggreg_upload_errors(AggregId) ->
ok ok
end. end.
check_container_accessible(Pool, Container) -> check_container_accessible(DriverState, Container) ->
list_blobs(Pool, Container). do_list_blobs(DriverState, Container).
block_id(N) -> block_id(N) ->
NumDigits = 32, NumDigits = 32,

View File

@ -105,7 +105,7 @@ init_per_testcase(TestCase, Config0) ->
parameters => #{container => ContainerName} parameters => #{container => ContainerName}
}) })
end, end,
Client = start_control_client(Endpoint), Client = new_control_driver(Endpoint),
ct:pal("container name: ~s", [ContainerName]), ct:pal("container name: ~s", [ContainerName]),
ok = ensure_new_container(ContainerName, Client), ok = ensure_new_container(ContainerName, Client),
Config = [ Config = [
@ -123,14 +123,12 @@ init_per_testcase(TestCase, Config0) ->
Config. Config.
end_per_testcase(_Testcase, Config) -> end_per_testcase(_Testcase, Config) ->
Client = ?config(client, Config),
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_common_test_helpers:call_janitor(), emqx_common_test_helpers:call_janitor(),
ok = snabbkaffe:stop(), ok = snabbkaffe:stop(),
stop_control_client(Client),
ok. ok.
direct_action_cases() -> direct_action_cases() ->
@ -144,17 +142,14 @@ direct_action_cases() ->
%% Helper fns %% Helper fns
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
start_control_client(Endpoint) -> new_control_driver(Endpoint) ->
{ok, Client} = erlazure:start(#{ {ok, Client} = erlazure:new(#{
endpoint => Endpoint, endpoint => Endpoint,
account => binary_to_list(?ACCOUNT_NAME_BIN), account => binary_to_list(?ACCOUNT_NAME_BIN),
key => binary_to_list(?ACCOUNT_KEY_BIN) key => binary_to_list(?ACCOUNT_KEY_BIN)
}), }),
Client. Client.
stop_control_client(Client) ->
gen_server:stop(Client).
container_name(Name) -> container_name(Name) ->
IOList = re:replace(bin(Name), <<"[^a-z0-9-]">>, <<"-">>, [global]), IOList = re:replace(bin(Name), <<"[^a-z0-9-]">>, <<"-">>, [global]),
iolist_to_binary(IOList). iolist_to_binary(IOList).