chore: refactor azure blob connector to use new `erlazure` without `gen_server`
https://github.com/dkataskin/erlazure/pull/43 removes unnecessary usage of `gen_server` from the driver.
This commit is contained in:
parent
c0c5545c21
commit
e586178479
|
@ -12,5 +12,5 @@
|
|||
{deps, [
|
||||
{emqx_resource, {path, "../../apps/emqx_resource"}},
|
||||
{emqx_connector_aggregator, {path, "../../apps/emqx_connector_aggregator"}},
|
||||
{erlazure, {git, "https://github.com/emqx/erlazure.git", {tag, "0.3.0.2"}}}
|
||||
{erlazure, {git, "https://github.com/emqx/erlazure.git", {tag, "0.4.0.0"}}}
|
||||
]}.
|
||||
|
|
|
@ -32,18 +32,6 @@
|
|||
on_batch_query/3
|
||||
]).
|
||||
|
||||
%% `ecpool_worker' API
|
||||
-export([
|
||||
connect/1,
|
||||
do_create_append_blob/3,
|
||||
do_create_block_blob/3,
|
||||
do_append_data/5,
|
||||
do_put_block_list/4,
|
||||
do_put_block_blob/4,
|
||||
do_health_check/1,
|
||||
do_list_blobs/2
|
||||
]).
|
||||
|
||||
%% `emqx_connector_aggreg_delivery' API
|
||||
-export([
|
||||
init_transfer_state/2,
|
||||
|
@ -71,7 +59,7 @@
|
|||
}.
|
||||
|
||||
-type connector_state() :: #{
|
||||
pool_name := connector_resource_id(),
|
||||
driver_state := driver_state(),
|
||||
installed_actions := #{action_resource_id() => action_state()}
|
||||
}.
|
||||
|
||||
|
@ -124,7 +112,7 @@
|
|||
|
||||
-type query() :: {_Tag :: channel_id(), _Data :: emqx_jsonish:t()}.
|
||||
|
||||
-type pool_name() :: connector_resource_id().
|
||||
-type driver_state() :: _.
|
||||
|
||||
-type transfer_opts() :: #{
|
||||
upload_options := #{
|
||||
|
@ -133,7 +121,7 @@
|
|||
container := string(),
|
||||
min_block_size := pos_integer(),
|
||||
max_block_size := pos_integer(),
|
||||
pool := connector_resource_id()
|
||||
driver_state := driver_state()
|
||||
}
|
||||
}.
|
||||
|
||||
|
@ -148,7 +136,7 @@
|
|||
min_block_size := pos_integer(),
|
||||
next_block := queue:queue(iolist()),
|
||||
num_blocks := non_neg_integer(),
|
||||
pool := pool_name(),
|
||||
driver_state := driver_state(),
|
||||
started := boolean()
|
||||
}.
|
||||
|
||||
|
@ -162,38 +150,32 @@ callback_mode() ->
|
|||
|
||||
-spec on_start(connector_resource_id(), connector_config()) ->
|
||||
{ok, connector_state()} | {error, _Reason}.
|
||||
on_start(ConnResId, ConnConfig) ->
|
||||
on_start(_ConnResId, ConnConfig) ->
|
||||
#{
|
||||
account_name := AccountName,
|
||||
account_key := AccountKey
|
||||
} = ConnConfig,
|
||||
Endpoint = maps:get(endpoint, ConnConfig, undefined),
|
||||
ClientOpts = [
|
||||
{account_name, AccountName},
|
||||
{account_key, AccountKey},
|
||||
{endpoint, Endpoint}
|
||||
],
|
||||
case emqx_resource_pool:start(ConnResId, ?MODULE, ClientOpts) of
|
||||
ok ->
|
||||
{ok, DriverState} = erlazure:new(#{
|
||||
account => AccountName,
|
||||
key => AccountKey,
|
||||
endpoint => Endpoint
|
||||
}),
|
||||
State = #{
|
||||
pool_name => ConnResId,
|
||||
driver_state => DriverState,
|
||||
installed_actions => #{}
|
||||
},
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
{ok, State}.
|
||||
|
||||
-spec on_stop(connector_resource_id(), connector_state()) -> ok.
|
||||
on_stop(ConnResId, _ConnState) ->
|
||||
Res = emqx_resource_pool:stop(ConnResId),
|
||||
?tp(azure_blob_storage_stop, #{instance_id => ConnResId}),
|
||||
Res.
|
||||
on_stop(_ConnResId, _ConnState) ->
|
||||
?tp(azure_blob_storage_stop, #{instance_id => _ConnResId}),
|
||||
ok.
|
||||
|
||||
-spec on_get_status(connector_resource_id(), connector_state()) ->
|
||||
?status_connected | ?status_disconnected.
|
||||
on_get_status(ConnResId, _ConnState) ->
|
||||
health_check(ConnResId).
|
||||
on_get_status(_ConnResId, _ConnState = #{driver_state := DriverState}) ->
|
||||
health_check(DriverState).
|
||||
|
||||
-spec on_add_channel(
|
||||
connector_resource_id(),
|
||||
|
@ -236,22 +218,22 @@ on_get_channels(ConnResId) ->
|
|||
) ->
|
||||
?status_connected | ?status_disconnected.
|
||||
on_get_channel_status(
|
||||
ConnResId,
|
||||
_ConnResId,
|
||||
ActionResId,
|
||||
_ConnectorState = #{installed_actions := InstalledActions}
|
||||
ConnectorState = #{installed_actions := InstalledActions}
|
||||
) when is_map_key(ActionResId, InstalledActions) ->
|
||||
#{ActionResId := ActionConfig} = InstalledActions,
|
||||
channel_status(ActionConfig, ConnResId);
|
||||
channel_status(ActionConfig, ConnectorState);
|
||||
on_get_channel_status(_ConnResId, _ActionResId, _ConnState) ->
|
||||
?status_disconnected.
|
||||
|
||||
-spec on_query(connector_resource_id(), query(), connector_state()) ->
|
||||
{ok, _Result} | {error, _Reason}.
|
||||
on_query(ConnResId, {Tag, Data}, #{installed_actions := InstalledActions}) ->
|
||||
on_query(ConnResId, {Tag, Data}, #{installed_actions := InstalledActions} = ConnState) ->
|
||||
case maps:get(Tag, InstalledActions, undefined) of
|
||||
ChannelState = #{mode := direct} ->
|
||||
?tp(azure_blob_storage_bridge_on_query_enter, #{mode => direct}),
|
||||
run_direct_transfer(Data, ConnResId, Tag, ChannelState);
|
||||
run_direct_transfer(Data, ConnResId, Tag, ChannelState, ConnState);
|
||||
ChannelState = #{mode := aggregated} ->
|
||||
?tp(azure_blob_storage_bridge_on_query_enter, #{mode => aggregated}),
|
||||
run_aggregated_transfer([Data], ChannelState);
|
||||
|
@ -271,71 +253,32 @@ on_batch_query(_ConnResId, [{Tag, Data0} | Rest], #{installed_actions := Install
|
|||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% `ecpool_worker' API
|
||||
%% Driver calls
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
connect(Opts0) ->
|
||||
#{
|
||||
account_name := AccountName,
|
||||
account_key := AccountKey,
|
||||
endpoint := Endpoint
|
||||
} = maps:from_list(Opts0),
|
||||
erlazure:start(#{account => AccountName, key => AccountKey, endpoint => Endpoint}).
|
||||
|
||||
do_create_append_blob(Worker, Container, Blob) ->
|
||||
do_create_block_blob(DriverState, Container, Blob) ->
|
||||
%% TODO: check container type before setting content type
|
||||
Opts = [{content_type, "text/csv"}],
|
||||
erlazure:put_append_blob(Worker, Container, Blob, Opts, infinity).
|
||||
erlazure:put_block_blob(DriverState, Container, Blob, <<>>, Opts).
|
||||
|
||||
create_block_blob(Pool, Container, Blob) ->
|
||||
ecpool:pick_and_do(Pool, {?MODULE, do_create_block_blob, [Container, Blob]}, no_handover).
|
||||
do_append_data(DriverState, Container, Blob, BlockId, IOData) ->
|
||||
erlazure:put_block(DriverState, Container, Blob, BlockId, IOData, []).
|
||||
|
||||
do_create_block_blob(Worker, Container, Blob) ->
|
||||
%% TODO: check container type before setting content type
|
||||
Opts = [{content_type, "text/csv"}],
|
||||
erlazure:put_block_blob(Worker, Container, Blob, <<>>, Opts, infinity).
|
||||
|
||||
append_data(Pool, Container, Blob, BlockId, IOData) ->
|
||||
ecpool:pick_and_do(
|
||||
Pool, {?MODULE, do_append_data, [Container, Blob, BlockId, IOData]}, no_handover
|
||||
).
|
||||
|
||||
do_append_data(Worker, Container, Blob, BlockId, IOData) ->
|
||||
erlazure:put_block(Worker, Container, Blob, BlockId, IOData, [], infinity).
|
||||
|
||||
put_block_list(Pool, Container, Blob, BlockRefs) ->
|
||||
ecpool:pick_and_do(
|
||||
Pool, {?MODULE, do_put_block_list, [Container, Blob, BlockRefs]}, no_handover
|
||||
).
|
||||
|
||||
do_put_block_list(Worker, Container, Blob, BlockRefs) ->
|
||||
do_put_block_list(DriverState, Container, Blob, BlockRefs) ->
|
||||
%% TODO: check container type before setting content type
|
||||
Opts = [{req_opts, [{headers, [{"x-ms-blob-content-type", "text/csv"}]}]}],
|
||||
erlazure:put_block_list(Worker, Container, Blob, BlockRefs, Opts, infinity).
|
||||
erlazure:put_block_list(DriverState, Container, Blob, BlockRefs, Opts).
|
||||
|
||||
put_block_blob(Pool, Container, Blob, IOData) ->
|
||||
ecpool:pick_and_do(Pool, {?MODULE, do_put_block_blob, [Container, Blob, IOData]}, no_handover).
|
||||
do_put_block_blob(DriverState, Container, Blob, IOData) ->
|
||||
erlazure:put_block_blob(DriverState, Container, Blob, IOData, []).
|
||||
|
||||
do_put_block_blob(Worker, Container, Blob, IOData) ->
|
||||
erlazure:put_block_blob(Worker, Container, Blob, IOData, [], infinity).
|
||||
|
||||
do_health_check(Worker) ->
|
||||
case erlazure:list_containers(Worker, [], infinity) of
|
||||
{error, _} ->
|
||||
error;
|
||||
{L, _} when is_list(L) ->
|
||||
ok
|
||||
end.
|
||||
|
||||
list_blobs(Pool, Container) ->
|
||||
ecpool:pick_and_do(Pool, {?MODULE, do_list_blobs, [Container]}, no_handover).
|
||||
|
||||
do_list_blobs(Worker, Container) ->
|
||||
case erlazure:list_blobs(Worker, Container, [], infinity) of
|
||||
{error, _} ->
|
||||
error;
|
||||
do_list_blobs(DriverState, Container) ->
|
||||
try erlazure:list_blobs(DriverState, Container, []) of
|
||||
{L, _} when is_list(L) ->
|
||||
ok
|
||||
catch
|
||||
_:_ ->
|
||||
error
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -352,7 +295,7 @@ init_transfer_state(Buffer, Opts) ->
|
|||
container := Container,
|
||||
max_block_size := MaxBlockSize,
|
||||
min_block_size := MinBlockSize,
|
||||
pool := Pool
|
||||
driver_state := DriverState
|
||||
}
|
||||
} = Opts,
|
||||
Blob = mk_blob_name_key(Buffer, ActionName, BlobTemplate),
|
||||
|
@ -365,7 +308,7 @@ init_transfer_state(Buffer, Opts) ->
|
|||
min_block_size => MinBlockSize,
|
||||
next_block => queue:new(),
|
||||
num_blocks => 0,
|
||||
pool => Pool,
|
||||
driver_state => DriverState,
|
||||
started => false
|
||||
}.
|
||||
|
||||
|
@ -401,14 +344,14 @@ process_append(IOData, TransferState0) ->
|
|||
{ok, transfer_state()} | {error, term()}.
|
||||
process_write(TransferState0 = #{started := false}) ->
|
||||
#{
|
||||
pool := Pool,
|
||||
driver_state := DriverState,
|
||||
blob := Blob,
|
||||
container := Container
|
||||
} = TransferState0,
|
||||
%% TODO
|
||||
%% Possible optimization: if the whole buffer fits the 5000 MiB `put_block_blob'
|
||||
%% limit, we could upload the whole thing here.
|
||||
case create_block_blob(Pool, Container, Blob) of
|
||||
case do_create_block_blob(DriverState, Container, Blob) of
|
||||
{ok, _} ->
|
||||
TransferState = TransferState0#{started := true},
|
||||
process_write(TransferState);
|
||||
|
@ -432,9 +375,9 @@ do_process_write(IOData, TransferState0 = #{started := true}) ->
|
|||
blob := Blob,
|
||||
container := Container,
|
||||
num_blocks := NumBlocks,
|
||||
pool := Pool
|
||||
driver_state := DriverState
|
||||
} = TransferState0,
|
||||
case append_data(Pool, Container, Blob, block_id(NumBlocks), IOData) of
|
||||
case do_append_data(DriverState, Container, Blob, block_id(NumBlocks), IOData) of
|
||||
{ok, _} ->
|
||||
TransferState = TransferState0#{num_blocks := NumBlocks + 1},
|
||||
process_write(TransferState);
|
||||
|
@ -451,7 +394,7 @@ process_complete(TransferState) ->
|
|||
buffer_size := BufferSize,
|
||||
container := Container,
|
||||
num_blocks := NumBlocks0,
|
||||
pool := Pool
|
||||
driver_state := DriverState
|
||||
} = TransferState,
|
||||
%% Flush any left-over data
|
||||
NumBlocks =
|
||||
|
@ -463,7 +406,7 @@ process_complete(TransferState) ->
|
|||
NumBlocks0
|
||||
end,
|
||||
BlockRefs = [{block_id(N), latest} || N <- lists:seq(0, NumBlocks - 1)],
|
||||
case put_block_list(Pool, Container, Blob, BlockRefs) of
|
||||
case do_put_block_list(DriverState, Container, Blob, BlockRefs) of
|
||||
{ok, _} ->
|
||||
{ok, #{num_blocks => NumBlocks}};
|
||||
{error, Reason} ->
|
||||
|
@ -524,7 +467,7 @@ install_action(#{parameters := #{mode := direct}} = ActionConfig, _ConnState) ->
|
|||
max_block_size => MaxBlockSize
|
||||
};
|
||||
install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState) ->
|
||||
#{pool_name := Pool} = ConnState,
|
||||
#{driver_state := DriverState} = ConnState,
|
||||
#{
|
||||
bridge_name := Name,
|
||||
parameters := #{
|
||||
|
@ -554,7 +497,7 @@ install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState)
|
|||
container => ContainerName,
|
||||
max_block_size => MaxBlockSize,
|
||||
min_block_size => MinBlockSize,
|
||||
pool => Pool
|
||||
driver_state => DriverState
|
||||
},
|
||||
DeliveryOpts = #{
|
||||
callback_module => ?MODULE,
|
||||
|
@ -584,7 +527,8 @@ stop_action(#{on_stop := {M, F, A}}) ->
|
|||
stop_action(_) ->
|
||||
ok.
|
||||
|
||||
run_direct_transfer(Data, ConnResId, ActionResId, ActionState) ->
|
||||
run_direct_transfer(Data, ConnResId, ActionResId, ActionState, ConnState) ->
|
||||
#{driver_state := DriverState} = ConnState,
|
||||
#{
|
||||
container := ContainerTemplate,
|
||||
blob := BlobTemplate,
|
||||
|
@ -608,7 +552,7 @@ run_direct_transfer(Data, ConnResId, ActionResId, ActionState) ->
|
|||
false ->
|
||||
ok
|
||||
end,
|
||||
case put_block_blob(ConnResId, Container, Blob, Content) of
|
||||
case do_put_block_blob(DriverState, Container, Blob, Content) of
|
||||
{ok, created} ->
|
||||
?tp(azure_blob_storage_bridge_connector_upload_ok, #{instance_id => ConnResId}),
|
||||
ok;
|
||||
|
@ -681,40 +625,26 @@ render_content(Template, Data) ->
|
|||
iolist_to_string(IOList) ->
|
||||
unicode:characters_to_list(IOList).
|
||||
|
||||
channel_status(#{mode := direct}, _ConnResId) ->
|
||||
channel_status(#{mode := direct}, _ConnState) ->
|
||||
%% There's nothing in particular to check for in this mode; the connector health check
|
||||
%% already verifies that we're able to use the client to list containers.
|
||||
?status_connected;
|
||||
channel_status(#{mode := aggregated} = ActionState, ConnResId) ->
|
||||
channel_status(#{mode := aggregated} = ActionState, ConnState) ->
|
||||
#{driver_state := DriverState} = ConnState,
|
||||
#{container := Container, aggreg_id := AggregId} = ActionState,
|
||||
%% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
|
||||
Timestamp = erlang:system_time(second),
|
||||
ok = emqx_connector_aggregator:tick(AggregId, Timestamp),
|
||||
ok = check_container_accessible(ConnResId, Container),
|
||||
ok = check_container_accessible(DriverState, Container),
|
||||
ok = check_aggreg_upload_errors(AggregId),
|
||||
?status_connected.
|
||||
|
||||
health_check(ConnResId) ->
|
||||
case
|
||||
emqx_resource_pool:health_check_workers(
|
||||
ConnResId,
|
||||
fun ?MODULE:do_health_check/1,
|
||||
emqx_resource_pool:health_check_timeout(),
|
||||
#{return_values => true}
|
||||
)
|
||||
of
|
||||
{ok, []} ->
|
||||
?status_disconnected;
|
||||
{ok, Values} ->
|
||||
AllOk = lists:all(fun(S) -> S =:= ok end, Values),
|
||||
case AllOk of
|
||||
true ->
|
||||
?status_connected;
|
||||
false ->
|
||||
?status_disconnected
|
||||
end;
|
||||
health_check(DriverState) ->
|
||||
case erlazure:list_containers(DriverState, []) of
|
||||
{error, _} ->
|
||||
?status_disconnected
|
||||
?status_disconnected;
|
||||
{L, _} when is_list(L) ->
|
||||
?status_connected
|
||||
end.
|
||||
|
||||
map_error({failed_connect, _} = Reason) ->
|
||||
|
@ -734,8 +664,8 @@ check_aggreg_upload_errors(AggregId) ->
|
|||
ok
|
||||
end.
|
||||
|
||||
check_container_accessible(Pool, Container) ->
|
||||
list_blobs(Pool, Container).
|
||||
check_container_accessible(DriverState, Container) ->
|
||||
do_list_blobs(DriverState, Container).
|
||||
|
||||
block_id(N) ->
|
||||
NumDigits = 32,
|
||||
|
|
|
@ -105,7 +105,7 @@ init_per_testcase(TestCase, Config0) ->
|
|||
parameters => #{container => ContainerName}
|
||||
})
|
||||
end,
|
||||
Client = start_control_client(Endpoint),
|
||||
Client = new_control_driver(Endpoint),
|
||||
ct:pal("container name: ~s", [ContainerName]),
|
||||
ok = ensure_new_container(ContainerName, Client),
|
||||
Config = [
|
||||
|
@ -123,14 +123,12 @@ init_per_testcase(TestCase, Config0) ->
|
|||
Config.
|
||||
|
||||
end_per_testcase(_Testcase, Config) ->
|
||||
Client = ?config(client, Config),
|
||||
ProxyHost = ?config(proxy_host, Config),
|
||||
ProxyPort = ?config(proxy_port, Config),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
|
||||
emqx_common_test_helpers:call_janitor(),
|
||||
ok = snabbkaffe:stop(),
|
||||
stop_control_client(Client),
|
||||
ok.
|
||||
|
||||
direct_action_cases() ->
|
||||
|
@ -144,17 +142,14 @@ direct_action_cases() ->
|
|||
%% Helper fns
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
start_control_client(Endpoint) ->
|
||||
{ok, Client} = erlazure:start(#{
|
||||
new_control_driver(Endpoint) ->
|
||||
{ok, Client} = erlazure:new(#{
|
||||
endpoint => Endpoint,
|
||||
account => binary_to_list(?ACCOUNT_NAME_BIN),
|
||||
key => binary_to_list(?ACCOUNT_KEY_BIN)
|
||||
}),
|
||||
Client.
|
||||
|
||||
stop_control_client(Client) ->
|
||||
gen_server:stop(Client).
|
||||
|
||||
container_name(Name) ->
|
||||
IOList = re:replace(bin(Name), <<"[^a-z0-9-]">>, <<"-">>, [global]),
|
||||
iolist_to_binary(IOList).
|
||||
|
|
Loading…
Reference in New Issue