feat: implement azure blob storage action

Fixes https://emqx.atlassian.net/browse/EMQX-12280
This commit is contained in:
Thales Macedo Garitezi 2024-05-10 17:47:17 -03:00
parent 60d24c6ad5
commit c916c83c7c
29 changed files with 2607 additions and 6 deletions

View File

@ -0,0 +1,24 @@
version: '3.9'
services:
azurite:
container_name: azurite
image: mcr.microsoft.com/azure-storage/azurite:3.30.0
restart: always
expose:
- "10000"
# ports:
# - "10000:10000"
networks:
- emqx_bridge
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:10000"]
interval: 30s
timeout: 5s
retries: 4
command:
- azurite-blob
- "--blobHost"
- 0.0.0.0
- "-d"
- debug.log

View File

@ -215,5 +215,11 @@
"listen": "0.0.0.0:9200", "listen": "0.0.0.0:9200",
"upstream": "elasticsearch:9200", "upstream": "elasticsearch:9200",
"enabled": true "enabled": true
},
{
"name": "azurite_plain",
"listen": "0.0.0.0:10000",
"upstream": "azurite:10000",
"enabled": true
} }
] ]

View File

@ -90,6 +90,7 @@
hard_coded_action_info_modules_ee() -> hard_coded_action_info_modules_ee() ->
[ [
emqx_bridge_azure_event_hub_action_info, emqx_bridge_azure_event_hub_action_info,
emqx_bridge_azure_blob_storage_action_info,
emqx_bridge_confluent_producer_action_info, emqx_bridge_confluent_producer_action_info,
emqx_bridge_dynamo_action_info, emqx_bridge_dynamo_action_info,
emqx_bridge_gcp_pubsub_consumer_action_info, emqx_bridge_gcp_pubsub_consumer_action_info,

View File

@ -162,8 +162,8 @@ do_parse_and_check(RootBin, TypeBin, NameBin, SchemaMod, RawConf) ->
InnerConfigMap. InnerConfigMap.
bridge_id(Config) -> bridge_id(Config) ->
BridgeType = ?config(bridge_type, Config), BridgeType = get_ct_config_with_fallback(Config, [action_type, bridge_type]),
BridgeName = ?config(bridge_name, Config), BridgeName = get_ct_config_with_fallback(Config, [action_name, bridge_name]),
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
ConnectorId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), ConnectorId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
<<"action:", BridgeId/binary, ":", ConnectorId/binary>>. <<"action:", BridgeId/binary, ":", ConnectorId/binary>>.
@ -536,6 +536,36 @@ list_connectors_http_api() ->
ct:pal("list connectors result:\n ~p", [Res]), ct:pal("list connectors result:\n ~p", [Res]),
Res. Res.
enable_kind_http_api(Config) ->
do_enable_disable_kind_http_api(enable, Config).
disable_kind_http_api(Config) ->
do_enable_disable_kind_http_api(disable, Config).
do_enable_disable_kind_http_api(Op, Config) ->
#{
kind := Kind,
type := Type,
name := Name
} = get_common_values(Config),
RootBin =
case Kind of
action -> <<"actions">>;
source -> <<"sources">>
end,
BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
OpPath =
case Op of
enable -> "true";
disable -> "false"
end,
Path = emqx_mgmt_api_test_util:api_path([RootBin, BridgeId, "enable", OpPath]),
OpStr = emqx_utils_conv:str(Op),
ct:pal(OpStr ++ " action ~p (http v2)", [{Type, Name}]),
Res = request(put, Path, _Params = []),
ct:pal(OpStr ++ " action ~p (http v2) result:\n ~p", [{Type, Name}, Res]),
Res.
update_rule_http(RuleId, Params) -> update_rule_http(RuleId, Params) ->
Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId]), Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId]),
ct:pal("update rule ~p:\n ~p", [RuleId, Params]), ct:pal("update rule ~p:\n ~p", [RuleId, Params]),
@ -585,7 +615,7 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config) ->
create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}). create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}).
create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) -> create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) ->
BridgeName = ?config(bridge_name, Config), BridgeName = get_ct_config_with_fallback(Config, [action_name, bridge_name]),
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
SQL = maps:get(sql, Opts, <<"SELECT * FROM \"", RuleTopic/binary, "\"">>), SQL = maps:get(sql, Opts, <<"SELECT * FROM \"", RuleTopic/binary, "\"">>),
Params0 = #{ Params0 = #{
@ -742,6 +772,85 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
end, end,
ok. ok.
%% Like `t_sync_query', but we send the message while the connector is
%% `?status_disconnected' and test that, after recovery, the buffered message eventually
%% is sent.
%% * `make_message_fn' is a function that receives the rule topic and returns a
%% `#message{}' record.
%% * `enter_tp_filter' is a function that selects a tracepoint that indicates the point
%% inside the connector's `on_query' callback function before actually trying to push
%% the data.
%% * `error_tp_filter' is a function that selects a tracepoint that indicates the
%% message was attempted to be sent at least once and failed.
%% * `success_tp_filter' is a function that selects a tracepoint that indicates the
%% point where the message was acknowledged as successfully written.
t_sync_query_down(Config, Opts) ->
#{
make_message_fn := MakeMessageFn,
enter_tp_filter := EnterTPFilter,
error_tp_filter := ErrorTPFilter,
success_tp_filter := SuccessTPFilter
} = Opts,
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
{CTTimetrap, _} = ct:get_timetrap_info(),
?check_trace(
#{timetrap => max(0, CTTimetrap - 500)},
begin
#{type := Type} = get_common_values(Config),
?assertMatch({ok, _}, create_bridge_api(Config)),
RuleTopic = emqx_topic:join([<<"test">>, emqx_utils_conv:bin(Type)]),
{ok, _} = create_rule_and_action_http(Type, RuleTopic, Config),
ResourceId = resource_id(Config),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
?force_ordering(
#{?snk_kind := call_query},
#{?snk_kind := cut_connection, ?snk_span := start}
),
%% Note: order of arguments here is reversed compared to `?force_ordering'.
snabbkaffe_nemesis:force_ordering(
EnterTPFilter,
_NEvents = 1,
fun(Event1, Event2) ->
EnterTPFilter(Event1) andalso
?match_event(#{
?snk_kind := cut_connection,
?snk_span := {complete, _}
})(
Event2
)
end
),
spawn_link(fun() ->
?tp_span(
cut_connection,
#{},
emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort)
)
end),
try
{_, {ok, _}} =
snabbkaffe:wait_async_action(
fun() -> spawn(fun() -> emqx:publish(MakeMessageFn(RuleTopic)) end) end,
ErrorTPFilter,
infinity
)
after
emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort)
end,
{ok, _} = snabbkaffe:block_until(SuccessTPFilter, infinity),
ok
end,
[]
),
ok.
%% - `ProduceFn': produces a message in the remote system that shall be consumed. May be %% - `ProduceFn': produces a message in the remote system that shall be consumed. May be
%% a `{function(), integer()}' tuple. %% a `{function(), integer()}' tuple.
%% - `Tracepoint': marks the end of consumed message processing. %% - `Tracepoint': marks the end of consumed message processing.
@ -945,7 +1054,7 @@ t_on_get_status(Config, Opts) ->
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config), ProxyName = ?config(proxy_name, Config),
FailureStatus = maps:get(failure_status, Opts, disconnected), FailureStatus = maps:get(failure_status, Opts, disconnected),
?assertMatch({ok, _}, create_bridge(Config)), ?assertMatch({ok, _}, create_bridge_api(Config)),
ResourceId = resource_id(Config), ResourceId = resource_id(Config),
%% Since the connection process is async, we give it some time to %% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness. %% stabilize and avoid flakiness.

View File

@ -0,0 +1,94 @@
Business Source License 1.1
Licensor: Hangzhou EMQ Technologies Co., Ltd.
Licensed Work: EMQX Enterprise Edition
The Licensed Work is (c) 2024
Hangzhou EMQ Technologies Co., Ltd.
Additional Use Grant: Students and educators are granted right to copy,
modify, and create derivative work for research
or education.
Change Date: 2028-05-17
Change License: Apache License, Version 2.0
For information about alternative licensing arrangements for the Software,
please contact Licensor: https://www.emqx.com/en/contact
Notice
The Business Source License (this document, or the “License”) is not an Open
Source license. However, the Licensed Work will eventually be made available
under an Open Source License, as stated in this License.
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
“Business Source License” is a trademark of MariaDB Corporation Ab.
-----------------------------------------------------------------------------
Business Source License 1.1
Terms
The Licensor hereby grants you the right to copy, modify, create derivative
works, redistribute, and make non-production use of the Licensed Work. The
Licensor may make an Additional Use Grant, above, permitting limited
production use.
Effective on the Change Date, or the fourth anniversary of the first publicly
available distribution of a specific version of the Licensed Work under this
License, whichever comes first, the Licensor hereby grants you rights under
the terms of the Change License, and the rights granted in the paragraph
above terminate.
If your use of the Licensed Work does not comply with the requirements
currently in effect as described in this License, you must purchase a
commercial license from the Licensor, its affiliated entities, or authorized
resellers, or you must refrain from using the Licensed Work.
All copies of the original and modified Licensed Work, and derivative works
of the Licensed Work, are subject to this License. This License applies
separately for each version of the Licensed Work and the Change Date may vary
for each version of the Licensed Work released by Licensor.
You must conspicuously display this License on each original or modified copy
of the Licensed Work. If you receive the Licensed Work in original or
modified form from a third party, the terms and conditions set forth in this
License apply to your use of that work.
Any use of the Licensed Work in violation of this License will automatically
terminate your rights under this License for the current and all other
versions of the Licensed Work.
This License does not grant you any right in any trademark or logo of
Licensor or its affiliates (provided that you may use a trademark or logo of
Licensor as expressly required by this License).
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
TITLE.
MariaDB hereby grants you permission to use this Licenses text to license
your works, and to refer to it using the trademark “Business Source License”,
as long as you comply with the Covenants of Licensor below.
Covenants of Licensor
In consideration of the right to use this Licenses text and the “Business
Source License” name and trademark, Licensor covenants to MariaDB, and to all
other recipients of the licensed work to be provided by Licensor:
1. To specify as the Change License the GPL Version 2.0 or any later version,
or a license that is compatible with GPL Version 2.0 or a later version,
where “compatible” means that software provided under the Change License can
be included in a program with software provided under GPL Version 2.0 or a
later version. Licensor may specify additional Change Licenses without
limitation.
2. To either: (a) specify an additional grant of rights to use that does not
impose any additional restriction on the right granted in this License, as
the Additional Use Grant; or (b) insert the text “None”.
3. To specify a Change Date.
4. Not to modify this License in any other way.

View File

@ -0,0 +1,16 @@
# EMQX Azure Blob Storage Bridge
This application provides connector and action implementations for the EMQX to integrate with Azure Blob Storage as part of the EMQX data integration pipelines.
Users can leverage [EMQX Rule Engine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html) to create rules that publish message data to Azure Blob Storage.
## Documentation
Refer to [Rules engine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html) for the EMQX rules engine introduction.
## Contributing
Please see our [contributing.md](../../CONTRIBUTING.md).
## License
EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

View File

@ -0,0 +1,2 @@
azurite
toxiproxy

View File

@ -0,0 +1,16 @@
%% -*- mode: erlang; -*-
{erl_opts, [
warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
warnings_as_errors,
debug_info
]}.
{deps, [
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_connector_aggregator, {path, "../../apps/emqx_connector_aggregator"}},
{erlazure, {git, "https://github.com/emqx/erlazure.git", {tag, "0.3.0.2"}}}
]}.

View File

@ -0,0 +1,23 @@
{application, emqx_bridge_azure_blob_storage, [
{description, "EMQX Enterprise Azure Blob Storage Bridge"},
{vsn, "0.1.0"},
{registered, [emqx_bridge_azure_blob_storage_sup]},
{applications, [
kernel,
stdlib,
erlazure,
emqx_resource,
emqx_connector_aggregator
]},
{env, [
{emqx_action_info_modules, [
emqx_bridge_azure_blob_storage_action_info
]},
{emqx_connector_info_modules, [
emqx_bridge_azure_blob_storage_connector_info
]}
]},
{mod, {emqx_bridge_azure_blob_storage_app, []}},
{modules, []},
{links, []}
]}.

View File

@ -0,0 +1,16 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-ifndef(__EMQX_BRIDGE_ABS_HRL__).
-define(__EMQX_BRIDGE_ABS_HRL__, true).
-define(CONNECTOR_TYPE, azure_blob_storage).
-define(CONNECTOR_TYPE_BIN, <<"azure_blob_storage">>).
-define(ACTION_TYPE, azure_blob_storage).
-define(ACTION_TYPE_BIN, <<"azure_blob_storage">>).
-define(AGGREG_SUP, emqx_bridge_azure_blob_storage_sup).
%% END ifndef(__EMQX_BRIDGE_ABS_HRL__)
-endif.

View File

@ -0,0 +1,34 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_action_info).
-behaviour(emqx_action_info).
-include("emqx_bridge_azure_blob_storage.hrl").
%% `emqx_action_info' API
-export([
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% `emqx_action_info' API
%%------------------------------------------------------------------------------
action_type_name() -> ?ACTION_TYPE.
connector_type_name() -> ?CONNECTOR_TYPE.
schema_module() -> emqx_bridge_azure_blob_storage_action_schema.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

View File

@ -0,0 +1,300 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_action_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include("emqx_bridge_azure_blob_storage.hrl").
%% `hocon_schema' API
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
%% `emqx_bridge_v2_schema' "unofficial" API
-export([
bridge_v2_examples/1
]).
%% API
-export([]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
namespace() ->
"action_azure_blob_storage".
roots() ->
[].
fields(Field) when
Field == "get_bridge_v2";
Field == "put_bridge_v2";
Field == "post_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(?ACTION_TYPE));
fields(action) ->
{?ACTION_TYPE,
mk(
hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION_TYPE)),
#{
desc => <<"Azure Blob Storage Action Config">>,
required => false
}
)};
fields(?ACTION_TYPE) ->
emqx_bridge_v2_schema:make_producer_action_schema(
mk(
mkunion(mode, #{
<<"direct">> => ref(direct_parameters),
<<"aggregated">> => ref(aggreg_parameters)
}),
#{
required => true,
desc => ?DESC("parameters")
}
),
#{
resource_opts_ref => ref(action_resource_opts)
}
);
fields(direct_parameters) ->
[
{mode, mk(direct, #{required => true, desc => ?DESC("direct_mode")})},
%% Container in the Azure Blob Storage domain, not aggregation.
{container,
mk(
emqx_schema:template_str(),
#{
desc => ?DESC("direct_container_template"),
required => true
}
)},
{blob,
mk(
emqx_schema:template_str(),
#{
desc => ?DESC("direct_blob_template"),
required => true
}
)},
{content,
mk(
emqx_schema:template(),
#{
required => false,
default => <<"${.}">>,
desc => ?DESC("direct_content_template")
}
)}
| fields(common_action_parameters)
];
fields(aggreg_parameters) ->
[
{mode, mk(aggregated, #{required => true, desc => ?DESC("aggregated_mode")})},
{aggregation, mk(ref(aggregation), #{required => true, desc => ?DESC("aggregation")})},
%% Container in the Azure Blob Storage domain, not aggregation.
{container,
mk(
string(),
#{
desc => ?DESC("aggregated_container_name"),
required => true
}
)},
{blob,
mk(
emqx_schema:template_str(),
#{
desc => ?DESC("aggregated_blob_template"),
required => true
}
)}
| fields(common_action_parameters)
];
fields(aggregation) ->
[
emqx_connector_aggregator_schema:container(),
%% TODO: Needs bucketing? (e.g. messages falling in this 1h interval)
{time_interval,
hoconsc:mk(
emqx_schema:duration_s(),
#{
required => false,
default => <<"1h">>,
desc => ?DESC("aggregation_interval")
}
)},
{max_records,
hoconsc:mk(
pos_integer(),
#{
required => false,
default => 1_000_000,
desc => ?DESC("aggregation_max_records")
}
)}
];
fields(common_action_parameters) ->
[
{max_block_size,
mk(
emqx_schema:bytesize(),
#{
default => <<"4000mb">>,
importance => ?IMPORTANCE_HIDDEN,
desc => ?DESC("max_block_size"),
required => true,
validator => fun max_block_size_validator/1
}
)}
];
fields(action_resource_opts) ->
%% NOTE: This action should benefit from generous batching defaults.
emqx_bridge_v2_schema:action_resource_opts_fields([
{batch_size, #{default => 100}},
{batch_time, #{default => <<"10ms">>}}
]).
desc(Name) when
Name =:= ?ACTION_TYPE;
Name =:= aggregation;
Name =:= aggreg_parameters;
Name =:= direct_parameters
->
?DESC(Name);
desc(action_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(_Name) ->
undefined.
%%-------------------------------------------------------------------------------------------------
%% `emqx_bridge_v2_schema' "unofficial" API
%%-------------------------------------------------------------------------------------------------
bridge_v2_examples(Method) ->
[
#{
<<"aggregated_", ?ACTION_TYPE_BIN/binary>> => #{
summary => <<"Azure Blob Storage Aggregated Upload Action">>,
value => action_example(Method, aggregated)
},
<<"direct_", ?ACTION_TYPE_BIN/binary>> => #{
summary => <<"Azure Blob Storage Direct Upload Action">>,
value => action_example(Method, direct)
}
}
].
action_example(post, Mode) ->
maps:merge(
action_example(put, Mode),
#{
type => ?ACTION_TYPE_BIN,
name => <<"my_action">>
}
);
action_example(get, Mode) ->
maps:merge(
action_example(put, Mode),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
}
);
action_example(put, aggregated) ->
#{
enable => true,
description => <<"my action">>,
connector => <<"my_connector">>,
parameters =>
#{
mode => <<"aggregated">>,
aggregation => #{
container => #{
type => <<"csv">>,
column_order => [<<"a">>, <<"b">>]
},
time_interval => <<"4s">>,
max_records => 10_000
},
container => <<"mycontainer">>,
blob => <<"${action}/${node}/${datetime.rfc3339}/${sequence}">>
},
resource_opts =>
#{
batch_time => <<"10ms">>,
batch_size => 100,
health_check_interval => <<"30s">>,
inflight_window => 100,
query_mode => <<"sync">>,
request_ttl => <<"45s">>,
worker_pool_size => 16
}
};
action_example(put, direct) ->
#{
enable => true,
description => <<"my action">>,
connector => <<"my_connector">>,
parameters =>
#{
mode => <<"direct">>,
container => <<"${.payload.container}">>,
blob => <<"${.payload.blob}">>,
content => <<"${.payload}">>
},
resource_opts =>
#{
batch_time => <<"0ms">>,
batch_size => 1,
health_check_interval => <<"30s">>,
inflight_window => 100,
query_mode => <<"sync">>,
request_ttl => <<"45s">>,
worker_pool_size => 16
}
}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
ref(Name) -> hoconsc:ref(?MODULE, Name).
mk(Type, Meta) -> hoconsc:mk(Type, Meta).
mkunion(Field, Schemas) ->
hoconsc:union(fun(Arg) -> scunion(Field, Schemas, Arg) end).
scunion(_Field, Schemas, all_union_members) ->
maps:values(Schemas);
scunion(Field, Schemas, {value, Value}) ->
Selector = maps:get(emqx_utils_conv:bin(Field), Value, undefined),
case Selector == undefined orelse maps:find(emqx_utils_conv:bin(Selector), Schemas) of
{ok, Schema} ->
[Schema];
_Error ->
throw(#{field_name => Field, expected => maps:keys(Schemas)})
end.
max_block_size_validator(SizeLimit) ->
case SizeLimit =< 4_000 * 1024 * 1024 of
true -> ok;
false -> {error, "must be less than 4000 MiB"}
end.

View File

@ -0,0 +1,28 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_app).
-behaviour(application).
%% `application' API
-export([start/2, stop/1]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% `application' API
%%------------------------------------------------------------------------------
start(_StartType, _StartArgs) ->
emqx_bridge_azure_blob_storage_sup:start_link().
stop(_State) ->
ok.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

View File

@ -0,0 +1,763 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_connector).
-behaviour(emqx_resource).
-behaviour(emqx_connector_aggreg_delivery).
-behaviour(emqx_template).
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("emqx_connector_aggregator/include/emqx_connector_aggregator.hrl").
-include_lib("emqx/include/emqx_trace.hrl").
-include("emqx_bridge_azure_blob_storage.hrl").
%% `emqx_resource' API
-export([
callback_mode/0,
on_start/2,
on_stop/2,
on_get_status/2,
on_get_channels/1,
on_add_channel/4,
on_remove_channel/3,
on_get_channel_status/3,
on_query/3,
on_batch_query/3
]).
%% `ecpool_worker' API
-export([
connect/1,
do_create_append_blob/3,
do_create_block_blob/3,
do_append_data/5,
do_put_block_list/4,
do_put_block_blob/4,
do_health_check/1,
do_list_blobs/2
]).
%% `emqx_connector_aggreg_delivery' API
-export([
init_transfer_state/2,
process_append/2,
process_write/1,
process_complete/1
]).
%% `emqx_template' API
-export([lookup/2]).
-ifdef(TEST).
-export([take_chunk/2]).
-endif.
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
-type container() :: string().
-type blob() :: string().
-type connector_config() :: #{
endpoint => string(),
account_name := string(),
account_key := emqx_secret:t(string()),
resource_opts := map(),
any() => term()
}.
-type connector_state() :: #{
pool_name := connector_resource_id(),
installed_actions := #{action_resource_id() => action_state()}
}.
-type action_config() :: direct_action_config() | aggreg_action_config().
-type direct_action_config() :: #{
parameters := #{
mode := direct,
container := template_str(),
blob := template_str(),
content := template_str()
}
}.
-type aggreg_action_config() :: #{
parameters := #{
mode := aggregated,
aggregation := #{
%% TODO: other containers
container := #{type := csv},
time_interval := pos_integer(),
max_records := pos_integer()
},
container := string(),
blob := template_str()
},
any() => term()
}.
-type template_str() :: unicode:chardata().
-type action_state() :: direct_action_state() | aggreg_action_state().
-type direct_action_state() :: #{
mode := direct,
container := emqx_template:t(),
blob := emqx_template:t(),
content := emqx_template:t()
}.
-type aggreg_action_state() :: #{
mode := aggregated,
name := binary(),
container := string(),
aggreg_id := aggreg_id(),
supervisor := pid(),
on_stop := {module(), atom(), [term()]}
}.
-type aggreg_id() :: {binary(), binary()}.
-type query() :: {_Tag :: channel_id(), _Data :: emqx_jsonish:t()}.
-type pool_name() :: connector_resource_id().
-type transfer_opts() :: #{
upload_options := #{
action := binary(),
blob := emqx_template:t(),
container := string(),
max_block_size := pos_integer(),
pool := connector_resource_id()
}
}.
-type transfer_buffer() :: iolist().
-type transfer_state() :: #{
blob := blob(),
buffer := transfer_buffer(),
buffer_size := non_neg_integer(),
container := container(),
max_block_size := pos_integer(),
num_blocks := non_neg_integer(),
pool := pool_name(),
started := boolean()
}.
%%------------------------------------------------------------------------------
%% `emqx_resource' API
%%------------------------------------------------------------------------------
-spec callback_mode() -> callback_mode().
callback_mode() ->
always_sync.
-spec on_start(connector_resource_id(), connector_config()) ->
{ok, connector_state()} | {error, _Reason}.
on_start(ConnResId, ConnConfig) ->
#{
account_name := AccountName,
account_key := AccountKey
} = ConnConfig,
Endpoint = maps:get(endpoint, ConnConfig, undefined),
ClientOpts = [
{account_name, AccountName},
{account_key, AccountKey},
{endpoint, Endpoint}
],
case emqx_resource_pool:start(ConnResId, ?MODULE, ClientOpts) of
ok ->
State = #{
pool_name => ConnResId,
installed_actions => #{}
},
{ok, State};
{error, Reason} ->
{error, Reason}
end.
-spec on_stop(connector_resource_id(), connector_state()) -> ok.
on_stop(ConnResId, _ConnState) ->
Res = emqx_resource_pool:stop(ConnResId),
?tp(azure_blob_storage_stop, #{instance_id => ConnResId}),
Res.
-spec on_get_status(connector_resource_id(), connector_state()) ->
?status_connected | ?status_disconnected.
on_get_status(ConnResId, _ConnState) ->
health_check(ConnResId).
-spec on_add_channel(
connector_resource_id(),
connector_state(),
action_resource_id(),
action_config()
) ->
{ok, connector_state()}.
on_add_channel(_ConnResId, ConnState0, ActionResId, ActionConfig) ->
ActionState = install_action(ActionConfig, ConnState0),
ConnState = emqx_utils_maps:deep_put([installed_actions, ActionResId], ConnState0, ActionState),
{ok, ConnState}.
-spec on_remove_channel(
connector_resource_id(),
connector_state(),
action_resource_id()
) ->
{ok, connector_state()}.
on_remove_channel(_ConnResId, ConnState0, ActionResId) ->
#{installed_actions := InstalledActions0} = ConnState0,
case maps:take(ActionResId, InstalledActions0) of
{ActionState, InstalledActions} ->
ok = stop_action(ActionState),
ConnState = ConnState0#{installed_actions := InstalledActions},
{ok, ConnState};
error ->
{ok, ConnState0}
end.
-spec on_get_channels(connector_resource_id()) ->
[{action_resource_id(), action_config()}].
on_get_channels(ConnResId) ->
emqx_bridge_v2:get_channels_for_connector(ConnResId).
-spec on_get_channel_status(
connector_resource_id(),
action_resource_id(),
connector_state()
) ->
?status_connected | ?status_disconnected.
on_get_channel_status(
ConnResId,
ActionResId,
_ConnectorState = #{installed_actions := InstalledActions}
) when is_map_key(ActionResId, InstalledActions) ->
#{ActionResId := ActionConfig} = InstalledActions,
channel_status(ActionConfig, ConnResId);
on_get_channel_status(_ConnResId, _ActionResId, _ConnState) ->
?status_disconnected.
-spec on_query(connector_resource_id(), query(), connector_state()) ->
{ok, _Result} | {error, _Reason}.
on_query(ConnResId, {Tag, Data}, #{installed_actions := InstalledActions}) ->
case maps:get(Tag, InstalledActions, undefined) of
ChannelState = #{mode := direct} ->
?tp(azure_blob_storage_bridge_on_query_enter, #{mode => direct}),
run_direct_transfer(Data, ConnResId, Tag, ChannelState);
ChannelState = #{mode := aggregated} ->
?tp(azure_blob_storage_bridge_on_query_enter, #{mode => aggregated}),
run_aggregated_transfer([Data], ChannelState);
undefined ->
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
end.
-spec on_batch_query(connector_resource_id(), [query()], connector_state()) ->
{ok, _Result} | {error, _Reason}.
on_batch_query(_ConnResId, [{Tag, Data0} | Rest], #{installed_actions := InstalledActions}) ->
case maps:get(Tag, InstalledActions, undefined) of
ActionState = #{mode := aggregated} ->
Records = [Data0 | [Data || {_, Data} <- Rest]],
run_aggregated_transfer(Records, ActionState);
undefined ->
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
end.
%%------------------------------------------------------------------------------
%% `ecpool_worker' API
%%------------------------------------------------------------------------------
connect(Opts0) ->
#{
account_name := AccountName,
account_key := AccountKey,
endpoint := Endpoint
} = maps:from_list(Opts0),
erlazure:start(#{account => AccountName, key => AccountKey, endpoint => Endpoint}).
do_create_append_blob(Worker, Container, Blob) ->
%% TODO: check container type before setting content type
Opts = [{content_type, "text/csv"}],
erlazure:put_append_blob(Worker, Container, Blob, Opts, infinity).
create_block_blob(Pool, Container, Blob) ->
ecpool:pick_and_do(Pool, {?MODULE, do_create_block_blob, [Container, Blob]}, no_handover).
do_create_block_blob(Worker, Container, Blob) ->
%% TODO: check container type before setting content type
Opts = [{content_type, "text/csv"}],
erlazure:put_block_blob(Worker, Container, Blob, <<>>, Opts, infinity).
append_data(Pool, Container, Blob, BlockId, IOData) ->
ecpool:pick_and_do(
Pool, {?MODULE, do_append_data, [Container, Blob, BlockId, IOData]}, no_handover
).
do_append_data(Worker, Container, Blob, BlockId, IOData) ->
erlazure:put_block(Worker, Container, Blob, BlockId, IOData, [], infinity).
put_block_list(Pool, Container, Blob, BlockRefs) ->
ecpool:pick_and_do(
Pool, {?MODULE, do_put_block_list, [Container, Blob, BlockRefs]}, no_handover
).
do_put_block_list(Worker, Container, Blob, BlockRefs) ->
%% TODO: check container type before setting content type
Opts = [{req_opts, [{headers, [{"x-ms-blob-content-type", "text/csv"}]}]}],
erlazure:put_block_list(Worker, Container, Blob, BlockRefs, Opts, infinity).
put_block_blob(Pool, Container, Blob, IOData) ->
ecpool:pick_and_do(Pool, {?MODULE, do_put_block_blob, [Container, Blob, IOData]}, no_handover).
do_put_block_blob(Worker, Container, Blob, IOData) ->
erlazure:put_block_blob(Worker, Container, Blob, IOData, [], infinity).
do_health_check(Worker) ->
case erlazure:list_containers(Worker, [], infinity) of
{error, _} ->
error;
{L, _} when is_list(L) ->
ok
end.
list_blobs(Pool, Container) ->
ecpool:pick_and_do(Pool, {?MODULE, do_list_blobs, [Container]}, no_handover).
do_list_blobs(Worker, Container) ->
case erlazure:list_blobs(Worker, Container, [], infinity) of
{error, _} ->
error;
{L, _} when is_list(L) ->
ok
end.
%%------------------------------------------------------------------------------
%% `emqx_connector_aggreg_delivery' API
%%------------------------------------------------------------------------------
-spec init_transfer_state(emqx_connector_aggreg_delivery:buffer_map(), transfer_opts()) ->
transfer_state().
init_transfer_state(BufferMap, Opts) ->
#{
upload_options := #{
action := ActionName,
blob := BlobTemplate,
container := Container,
max_block_size := MaxBlockSize,
pool := Pool
}
} = Opts,
Blob = mk_blob_name_key(BufferMap, ActionName, BlobTemplate),
#{
blob => Blob,
buffer => [],
buffer_size => 0,
container => Container,
max_block_size => MaxBlockSize,
num_blocks => 0,
pool => Pool,
started => false
}.
mk_blob_name_key(BufferMap, ActionName, BlobTemplate) ->
emqx_template:render_strict(BlobTemplate, {?MODULE, {ActionName, BufferMap}}).
-spec process_append(iodata(), transfer_state()) ->
transfer_state().
process_append(IOData, TransferState0) ->
#{
buffer := Buffer,
buffer_size := BufferSize0
} = TransferState0,
TransferState0#{
buffer := [Buffer, IOData],
buffer_size := BufferSize0 + iolist_size(IOData)
}.
-spec process_write(transfer_state()) ->
{ok, transfer_state()} | {error, term()}.
process_write(TransferState0 = #{started := false}) ->
#{
pool := Pool,
blob := Blob,
container := Container
} = TransferState0,
%% TODO
%% Possible optimization: if the whole buffer fits the 5000 MiB `put_block_blob'
%% limit, we could upload the whole thing here.
case create_block_blob(Pool, Container, Blob) of
{ok, _} ->
TransferState = TransferState0#{started := true},
process_write(TransferState);
{error, Reason} ->
{error, Reason}
end;
process_write(TransferState = #{started := true, buffer_size := 0}) ->
{ok, TransferState};
process_write(TransferState0 = #{started := true}) ->
#{
buffer := Buffer,
buffer_size := BufferSize,
max_block_size := MaxBlockSize
} = TransferState0,
case BufferSize > MaxBlockSize of
true ->
{IOData, NewBuffer} = take_chunk(Buffer, MaxBlockSize),
?tp(azure_blob_storage_will_write_chunk, #{}),
do_process_write(IOData, NewBuffer, TransferState0);
false ->
NewBuffer = [],
do_process_write(Buffer, NewBuffer, TransferState0)
end.
do_process_write(IOData, NewBuffer, TransferState0 = #{started := true}) ->
#{
blob := Blob,
container := Container,
num_blocks := NumBlocks,
pool := Pool
} = TransferState0,
case append_data(Pool, Container, Blob, block_id(NumBlocks), IOData) of
{ok, _} ->
BufferSize = iolist_size(NewBuffer),
TransferState = TransferState0#{
buffer := NewBuffer,
buffer_size := BufferSize,
num_blocks := NumBlocks + 1
},
process_write(TransferState);
{error, Reason} ->
{error, Reason}
end.
-spec process_complete(transfer_state()) ->
{ok, term()}.
process_complete(TransferState) ->
#{
blob := Blob,
container := Container,
num_blocks := NumBlocks,
pool := Pool
} = TransferState,
BlockRefs = [{block_id(N), latest} || N <- lists:seq(0, NumBlocks - 1)],
case put_block_list(Pool, Container, Blob, BlockRefs) of
{ok, _} ->
{ok, #{num_blocks => NumBlocks}};
{error, Reason} ->
exit({upload_failed, Reason})
end.
%%------------------------------------------------------------------------------
%% `emqx_template' API
%%------------------------------------------------------------------------------
-spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) ->
{ok, integer() | string()} | {error, undefined}.
lookup([<<"action">>], {ActionName, _BufferMap}) ->
{ok, mk_fs_safe_string(ActionName)};
lookup(Accessor, {_ActionName, BufferMap = #{}}) ->
lookup_buffer_var(Accessor, BufferMap);
lookup(_Accessor, _Context) ->
{error, undefined}.
lookup_buffer_var([<<"datetime">>, Format], #{since := Since}) ->
{ok, format_timestamp(Since, Format)};
lookup_buffer_var([<<"datetime_until">>, Format], #{until := Until}) ->
{ok, format_timestamp(Until, Format)};
lookup_buffer_var([<<"sequence">>], #{seq := Seq}) ->
{ok, Seq};
lookup_buffer_var([<<"node">>], #{}) ->
{ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
lookup_buffer_var(_Binding, _Context) ->
{error, undefined}.
format_timestamp(Timestamp, <<"rfc3339utc">>) ->
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]),
mk_fs_safe_string(String);
format_timestamp(Timestamp, <<"rfc3339">>) ->
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]),
mk_fs_safe_string(String);
format_timestamp(Timestamp, <<"unix">>) ->
Timestamp.
mk_fs_safe_string(String) ->
unicode:characters_to_binary(string:replace(String, ":", "_", all)).
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
-spec install_action(action_config(), connector_state()) -> action_state().
install_action(#{parameters := #{mode := direct}} = ActionConfig, _ConnState) ->
#{
parameters := #{
mode := Mode = direct,
container := ContainerTemplateStr,
blob := BlobTemplateStr,
content := ContentTemplateStr
}
} = ActionConfig,
ContainerTemplate = emqx_template:parse(ContainerTemplateStr),
BlobTemplate = emqx_template:parse(BlobTemplateStr),
ContentTemplate = emqx_template:parse(ContentTemplateStr),
#{
mode => Mode,
container => ContainerTemplate,
blob => BlobTemplate,
content => ContentTemplate
};
install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState) ->
#{pool_name := Pool} = ConnState,
#{
bridge_name := Name,
parameters := #{
mode := Mode = aggregated,
aggregation := #{
container := ContainerOpts,
max_records := MaxRecords,
time_interval := TimeInterval
},
container := ContainerName,
blob := BlobTemplateStr,
max_block_size := MaxBlockSize
}
} = ActionConfig,
Type = ?ACTION_TYPE_BIN,
AggregId = {Type, Name},
Blob = mk_blob_name_template(BlobTemplateStr),
AggregOpts = #{
max_records => MaxRecords,
time_interval => TimeInterval,
work_dir => work_dir(Type, Name)
},
TransferOpts = #{
action => Name,
blob => Blob,
container => ContainerName,
max_block_size => MaxBlockSize,
pool => Pool
},
DeliveryOpts = #{
callback_module => ?MODULE,
container => ContainerOpts,
upload_options => TransferOpts
},
_ = ?AGGREG_SUP:delete_child(AggregId),
{ok, SupPid} = ?AGGREG_SUP:start_child(#{
id => AggregId,
start =>
{emqx_connector_aggreg_upload_sup, start_link, [AggregId, AggregOpts, DeliveryOpts]},
type => supervisor,
restart => permanent
}),
#{
mode => Mode,
name => Name,
container => ContainerName,
aggreg_id => AggregId,
supervisor => SupPid,
on_stop => {?AGGREG_SUP, delete_child, [AggregId]}
}.
-spec stop_action(action_config()) -> ok | {error, any()}.
stop_action(#{on_stop := {M, F, A}}) ->
apply(M, F, A);
stop_action(_) ->
ok.
run_direct_transfer(Data, ConnResId, ActionResId, ActionState) ->
#{
container := ContainerTemplate,
blob := BlobTemplate,
content := ContentTemplate
} = ActionState,
Container = render_container(ContainerTemplate, Data),
Blob = render_blob(BlobTemplate, Data),
Content = render_content(ContentTemplate, Data),
emqx_trace:rendered_action_template(ActionResId, #{
container => Container,
blob => Blob,
content => #emqx_trace_format_func_data{
function = fun unicode:characters_to_binary/1,
data = Content
}
}),
case put_block_blob(ConnResId, Container, Blob, Content) of
{ok, created} ->
?tp(azure_blob_storage_bridge_connector_upload_ok, #{instance_id => ConnResId}),
ok;
{error, Reason} ->
?tp(
azure_blob_storage_bridge_direct_upload_error,
#{instance_id => ConnResId, reason => Reason}
),
{error, map_error(Reason)}
end.
run_aggregated_transfer(Records, #{aggreg_id := AggregId}) ->
Timestamp = erlang:system_time(second),
case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of
ok ->
ok;
{error, Reason} ->
{error, {unrecoverable_error, Reason}}
end.
work_dir(Type, Name) ->
filename:join([emqx:data_dir(), bridge, Type, Name]).
-spec mk_blob_name_template(template_str()) -> emqx_template:str().
mk_blob_name_template(TemplateStr) ->
Template = emqx_template:parse(TemplateStr),
{_, BindingErrors} = emqx_template:render(Template, #{}),
{UsedBindings, _} = lists:unzip(BindingErrors),
SuffixTemplate = mk_suffix_template(UsedBindings),
case emqx_template:is_const(SuffixTemplate) of
true ->
Template;
false ->
Template ++ SuffixTemplate
end.
mk_suffix_template(UsedBindings) ->
RequiredBindings = ["action", "node", "datetime.", "sequence"],
SuffixBindings = [
mk_default_binding(RB)
|| RB <- RequiredBindings,
lists:all(fun(UB) -> string:prefix(UB, RB) == nomatch end, UsedBindings)
],
SuffixTemplate = [["/", B] || B <- SuffixBindings],
emqx_template:parse(SuffixTemplate).
mk_default_binding("datetime.") ->
"${datetime.rfc3339utc}";
mk_default_binding(Binding) ->
"${" ++ Binding ++ "}".
render_container(Template, Data) ->
case emqx_template:render(Template, {emqx_jsonish, Data}) of
{Result, []} ->
iolist_to_string(Result);
{_, Errors} ->
erlang:error({unrecoverable_error, {container_undefined, Errors}})
end.
render_blob(Template, Data) ->
%% NOTE: ignoring errors here, missing variables will be rendered as `"undefined"`.
{Result, _Errors} = emqx_template:render(Template, {emqx_jsonish, Data}),
iolist_to_string(Result).
render_content(Template, Data) ->
%% NOTE: ignoring errors here, missing variables will be rendered as `"undefined"`.
{Result, _Errors} = emqx_template:render(Template, {emqx_jsonish, Data}),
Result.
iolist_to_string(IOList) ->
unicode:characters_to_list(IOList).
channel_status(#{mode := direct}, _ConnResId) ->
%% There's nothing in particular to check for in this mode; the connector health check
%% already verifies that we're able to use the client to list containers.
?status_connected;
channel_status(#{mode := aggregated} = ActionState, ConnResId) ->
#{container := Container, aggreg_id := AggregId} = ActionState,
%% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
Timestamp = erlang:system_time(second),
ok = emqx_connector_aggregator:tick(AggregId, Timestamp),
ok = check_container_accessible(ConnResId, Container),
ok = check_aggreg_upload_errors(AggregId),
?status_connected.
health_check(ConnResId) ->
case
emqx_resource_pool:health_check_workers(
ConnResId,
fun ?MODULE:do_health_check/1,
emqx_resource_pool:health_check_timeout(),
#{return_values => true}
)
of
{ok, []} ->
?status_disconnected;
{ok, Values} ->
AllOk = lists:all(fun(S) -> S =:= ok end, Values),
case AllOk of
true ->
?status_connected;
false ->
?status_disconnected
end;
{error, _} ->
?status_disconnected
end.
map_error({failed_connect, _} = Reason) ->
{recoverable_error, Reason};
map_error(Reason) ->
{unrecoverable_error, Reason}.
check_aggreg_upload_errors(AggregId) ->
case emqx_connector_aggregator:take_error(AggregId) of
[Error] ->
%% TODO
%% This approach means that, for example, 3 upload failures will cause
%% the channel to be marked as unhealthy for 3 consecutive health checks.
ErrorMessage = emqx_utils:format(Error),
throw({unhealthy_target, ErrorMessage});
[] ->
ok
end.
check_container_accessible(Pool, Container) ->
list_blobs(Pool, Container).
-spec take_chunk(transfer_buffer(), pos_integer()) -> {_IOList :: iolist(), transfer_buffer()}.
take_chunk(Buffer, MaxBlockSize) ->
RemainingBytes = MaxBlockSize,
take_chunk(Buffer, RemainingBytes, _IOListAcc = []).
-spec take_chunk(transfer_buffer(), non_neg_integer(), IOList) ->
{IOList, transfer_buffer()}
when
IOList :: iolist().
take_chunk(RemainingBuffer = [], _RemainingBytes, IOListAcc) ->
{IOListAcc, RemainingBuffer};
take_chunk(RemainingBuffer, RemainingBytes, IOListAcc) when RemainingBytes =< 0 ->
{IOListAcc, [RemainingBuffer]};
take_chunk([Data0], RemainingBytes, IOListAcc) ->
case do_take_chunk([Data0], RemainingBytes) of
{done, Data} ->
{[IOListAcc, Data], _RemainingBuffer = []};
{more, {Data, Rest}} ->
{[IOListAcc, Data], [Rest]}
end;
take_chunk([Data0, Rest0], RemainingBytes0, IOListAcc) ->
case do_take_chunk(Data0, RemainingBytes0) of
{done, Data} ->
RemainingBytes = RemainingBytes0 - iolist_size(Data),
take_chunk([Rest0], RemainingBytes, [IOListAcc, Data]);
{more, {Data, Rest}} ->
{[IOListAcc, Data], [Rest, Rest0]}
end.
do_take_chunk(Data, ChunkSize) ->
case iolist_size(Data) =< ChunkSize of
true ->
{done, Data};
false ->
BinData0 = iolist_to_binary(Data),
{more, split_binary(BinData0, ChunkSize)}
end.
%% ensure_transfer_buffer_shape([_, _] = Buffer) ->
%% Buffer;
%% ensure_transfer_buffer_shape(Data) ->
%% [Data].
block_id(N) ->
NumDigits = 32,
list_to_binary(string:pad(integer_to_list(N), NumDigits, leading, $0)).

View File

@ -0,0 +1,73 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_connector_info).
-behaviour(emqx_connector_info).
-include("emqx_bridge_azure_blob_storage.hrl").
%% `emqx_connector_info' API
-export([
type_name/0,
bridge_types/0,
resource_callback_module/0,
config_schema/0,
schema_module/0,
api_schema/1
]).
%% API
-export([]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
-define(SCHEMA_MOD, emqx_bridge_azure_blob_storage_connector_schema).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% `emqx_connector_info' API
%%------------------------------------------------------------------------------
type_name() ->
?CONNECTOR_TYPE.
bridge_types() ->
[?ACTION_TYPE].
resource_callback_module() ->
emqx_bridge_azure_blob_storage_connector.
config_schema() ->
{?CONNECTOR_TYPE,
hoconsc:mk(
hoconsc:map(
name,
hoconsc:ref(
?SCHEMA_MOD,
"config_connector"
)
),
#{
desc => <<"Azure Blob Storage Connector Config">>,
required => false
}
)}.
schema_module() ->
?SCHEMA_MOD.
api_schema(Method) ->
emqx_connector_schema:api_ref(
?SCHEMA_MOD, ?CONNECTOR_TYPE_BIN, Method ++ "_connector"
).
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

View File

@ -0,0 +1,142 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_connector_schema).
-behaviour(hocon_schema).
-behaviour(emqx_connector_examples).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
%% `hocon_schema' API
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
%% `emqx_connector_examples' API
-export([
connector_examples/1
]).
%% API
-export([]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
-define(CONNECTOR_TYPE, azure_blob_storage).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
namespace() ->
"connector_azure_blob_storage".
roots() ->
[].
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, fields(connector_config));
fields("config_connector") ->
emqx_connector_schema:common_fields() ++ fields(connector_config);
fields(connector_config) ->
[
{account_name,
mk(string(), #{
required => true,
desc => ?DESC("account_name")
})},
{account_key,
emqx_schema_secret:mk(
#{
desc => ?DESC("account_key")
}
)},
{endpoint,
mk(
string(),
#{
required => false,
importance => ?IMPORTANCE_HIDDEN
}
)}
] ++
emqx_connector_schema:resource_opts_ref(?MODULE, resource_opts);
fields(resource_opts) ->
emqx_connector_schema:resource_opts_fields().
desc("config_connector") ->
?DESC("config_connector");
desc(resource_opts) ->
?DESC(emqx_resource_schema, resource_opts);
desc(_Name) ->
undefined.
%%-------------------------------------------------------------------------------------------------
%% `emqx_connector_examples' API
%%-------------------------------------------------------------------------------------------------
connector_examples(Method) ->
[
#{
<<"abs">> => #{
summary => <<"Azure Blob Storage Connector">>,
value => connector_example(Method)
}
}
].
connector_example(get) ->
maps:merge(
connector_example(put),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
}
);
connector_example(post) ->
maps:merge(
connector_example(put),
#{
type => atom_to_binary(?CONNECTOR_TYPE),
name => <<"my_connector">>
}
);
connector_example(put) ->
#{
enable => true,
description => <<"My connector">>,
account_name => <<"my_account_name">>,
account_key => <<"******">>,
resource_opts => #{
health_check_interval => <<"45s">>,
start_after_created => true,
start_timeout => <<"5s">>
}
}.
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
mk(Type, Meta) -> hoconsc:mk(Type, Meta).

View File

@ -0,0 +1,54 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_sup).
%% API
-export([
start_link/0,
start_child/1,
delete_child/1
]).
%% `supervisor' API
-export([init/1]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_child(ChildSpec) ->
supervisor:start_child(?MODULE, ChildSpec).
delete_child(ChildId) ->
case supervisor:terminate_child(?MODULE, ChildId) of
ok ->
supervisor:delete_child(?MODULE, ChildId);
Error ->
Error
end.
%%------------------------------------------------------------------------------
%% `supervisor' API
%%------------------------------------------------------------------------------
init([]) ->
SupFlags = #{
strategy => one_for_one,
intensity => 1,
period => 1
},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

View File

@ -0,0 +1,687 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_common_test_helpers, [on_exit/1]).
-import(emqx_utils_conv, [bin/1, str/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("erlazure/include/erlazure.hrl").
-include("emqx_bridge_azure_blob_storage.hrl").
-define(ACCOUNT_NAME_BIN, <<"devstoreaccount1">>).
-define(ACCOUNT_KEY_BIN, <<
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsu"
"Fq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
>>).
-define(CONF_MAX_RECORDS, 100).
-define(CONF_COLUMN_ORDER, ?CONF_COLUMN_ORDER([])).
-define(CONF_COLUMN_ORDER(T), [
<<"publish_received_at">>,
<<"clientid">>,
<<"topic">>,
<<"payload">>,
<<"empty">>
| T
]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Endpoint = os:getenv("AZURITE_ENDPOINT", "http://toxiproxy:10000/"),
#{host := Host, port := Port} = uri_string:parse(Endpoint),
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
ProxyName = "azurite_plain",
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
true ->
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_bridge_azure_blob_storage,
emqx_bridge,
emqx_rule_engine,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _Api} = emqx_common_test_http:create_default_app(),
[
{apps, Apps},
{proxy_name, ProxyName},
{proxy_host, ProxyHost},
{proxy_port, ProxyPort},
{endpoint, Endpoint}
| Config
];
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_azurite);
_ ->
{skip, no_azurite}
end
end.
end_per_suite(Config) ->
Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(TestCase, Config0) ->
ct:timetrap(timer:seconds(31)),
Endpoint = ?config(endpoint, Config0),
UniqueNum = integer_to_binary(erlang:unique_integer()),
Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>,
ConnectorConfig = connector_config(Name, Endpoint),
ContainerName = container_name(Name),
%% TODO: switch based on test
ActionConfig =
case lists:member(TestCase, direct_action_cases()) of
true ->
direct_action_config(#{
connector => Name,
parameters => #{container => ContainerName}
});
false ->
aggreg_action_config(#{
connector => Name,
parameters => #{container => ContainerName}
})
end,
Client = start_control_client(Endpoint),
ct:pal("container name: ~s", [ContainerName]),
ok = ensure_new_container(ContainerName, Client),
Config = [
{bridge_kind, action},
{action_type, ?ACTION_TYPE_BIN},
{action_name, Name},
{action_config, ActionConfig},
{connector_name, Name},
{connector_type, ?CONNECTOR_TYPE_BIN},
{connector_config, ConnectorConfig},
{container_name, ContainerName},
{client, Client}
| Config0
],
Config.
end_per_testcase(_Testcase, Config) ->
Client = ?config(client, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_common_test_helpers:call_janitor(),
ok = snabbkaffe:stop(),
stop_control_client(Client),
ok.
direct_action_cases() ->
[
t_sync_query,
t_sync_query_down
].
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
start_control_client(Endpoint) ->
{ok, Client} = erlazure:start(#{
endpoint => Endpoint,
account => binary_to_list(?ACCOUNT_NAME_BIN),
key => binary_to_list(?ACCOUNT_KEY_BIN)
}),
Client.
stop_control_client(Client) ->
gen_server:stop(Client).
container_name(Name) ->
IOList = re:replace(bin(Name), <<"[^a-z0-9-]">>, <<"-">>, [global]),
iolist_to_binary(IOList).
ensure_new_container(Name0, Client) ->
Name = str(Name0),
case erlazure:create_container(Client, Name) of
{ok, created} ->
ok;
{error, #{code := "ContainerAlreadyExists"}} ->
{ok, deleted} = erlazure:delete_container(Client, Name),
{ok, created} = erlazure:create_container(Client, Name),
ok
end,
on_exit(fun() -> {ok, deleted} = erlazure:delete_container(Client, Name) end),
ok.
connector_config(Name, Endpoint) ->
InnerConfigMap0 =
#{
<<"enable">> => true,
<<"tags">> => [<<"bridge">>],
<<"description">> => <<"my cool bridge">>,
<<"endpoint">> => Endpoint,
%% Default Azurite credentials
%% See: https://github.com/Azure/Azurite/blob/main/README.md#default-storage-account
<<"account_name">> => ?ACCOUNT_NAME_BIN,
<<"account_key">> => ?ACCOUNT_KEY_BIN,
<<"resource_opts">> =>
#{
<<"health_check_interval">> => <<"1s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>
}
},
emqx_bridge_v2_testlib:parse_and_check_connector(?ACTION_TYPE_BIN, Name, InnerConfigMap0).
direct_action_config(Overrides0) ->
Overrides = emqx_utils_maps:binary_key_map(Overrides0),
CommonConfig =
#{
<<"enable">> => true,
<<"connector">> => <<"please override">>,
<<"parameters">> =>
#{
<<"mode">> => <<"direct">>,
<<"container">> => <<"${payload.c}">>,
<<"blob">> => <<"${payload.b}">>,
<<"content">> => <<"${.}">>
},
<<"resource_opts">> => #{
<<"batch_size">> => 1,
<<"batch_time">> => <<"0ms">>,
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"1s">>,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"15s">>,
<<"resume_interval">> => <<"1s">>,
<<"worker_pool_size">> => <<"1">>
}
},
emqx_utils_maps:deep_merge(CommonConfig, Overrides).
aggreg_action_config(Overrides0) ->
Overrides = emqx_utils_maps:binary_key_map(Overrides0),
CommonConfig =
#{
<<"enable">> => true,
<<"connector">> => <<"please override">>,
<<"parameters">> =>
#{
<<"mode">> => <<"aggregated">>,
<<"aggregation">> => #{
<<"container">> => #{
<<"type">> => <<"csv">>,
<<"column_order">> => ?CONF_COLUMN_ORDER
},
<<"time_interval">> => <<"4s">>,
<<"max_records">> => ?CONF_MAX_RECORDS
},
<<"container">> => <<"mycontainer">>,
<<"blob">> => <<"${action}/${node}/${datetime.rfc3339}/${sequence}">>
},
<<"resource_opts">> => #{
<<"batch_size">> => 100,
<<"batch_time">> => <<"10ms">>,
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"1s">>,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"15s">>,
<<"resume_interval">> => <<"1s">>,
<<"worker_pool_size">> => <<"1">>
}
},
emqx_utils_maps:deep_merge(CommonConfig, Overrides).
aggreg_id(BridgeName) ->
{?ACTION_TYPE_BIN, BridgeName}.
mk_message_event(ClientId, Topic, Payload) ->
Message = emqx_message:make(bin(ClientId), bin(Topic), Payload),
{Event, _} = emqx_rule_events:eventmsg_publish(Message),
emqx_utils_maps:binary_key_map(Event).
mk_message({ClientId, Topic, Payload}) ->
emqx_message:make(bin(ClientId), bin(Topic), Payload).
publish_messages(MessageEvents) ->
lists:foreach(fun emqx:publish/1, MessageEvents).
publish_messages_delayed(MessageEvents, Delay) ->
lists:foreach(
fun(Msg) ->
emqx:publish(Msg),
ct:sleep(Delay)
end,
MessageEvents
).
list_blobs(Config) ->
Client = ?config(client, Config),
ContainerName = ?config(container_name, Config),
{Blobs, _} = erlazure:list_blobs(Client, str(ContainerName)),
Blobs.
get_blob(BlobName, Config) ->
Client = ?config(client, Config),
ContainerName = ?config(container_name, Config),
{ok, Blob} = erlazure:get_blob(Client, str(ContainerName), str(BlobName)),
Blob.
get_and_decode_event(BlobName, Config) ->
maps:update_with(
<<"payload">>,
fun(Raw) -> emqx_utils_json:decode(Raw, [return_maps]) end,
emqx_utils_json:decode(get_blob(BlobName, Config), [return_maps])
).
list_committed_blocks(Config) ->
Client = ?config(client, Config),
Container0 = ?config(container_name, Config),
Container = emqx_utils_conv:str(Container0),
{Blobs, _} = erlazure:list_blobs(Client, Container),
lists:map(
fun(#cloud_blob{name = BlobName}) ->
{Blocks, _} = erlazure:get_block_list(Client, Container, BlobName),
{BlobName, [{Id, Type} || #blob_block{id = Id, type = Type} <- Blocks]}
end,
Blobs
).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_start_stop(Config) ->
ok = emqx_bridge_v2_testlib:t_start_stop(Config, azure_blob_storage_stop),
ok.
t_create_via_http(Config) ->
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
ok.
t_on_get_status(Config) ->
ok = emqx_bridge_v2_testlib:t_on_get_status(Config),
ok.
%% Testing non-aggregated / direct action
t_sync_query(Config) ->
ContainerName = ?config(container_name, Config),
Topic = <<"t/a">>,
Payload0 = #{
<<"b">> => <<"myblob">>,
<<"c">> => ContainerName,
<<"x">> => <<"first data">>
},
Payload0Bin = emqx_utils_json:encode(Payload0),
ClientId = <<"some_client">>,
MsgEvent0 = mk_message_event(ClientId, Topic, Payload0Bin),
ok = emqx_bridge_v2_testlib:t_sync_query(
Config,
fun() -> MsgEvent0 end,
fun(Res) -> ?assertMatch(ok, Res) end,
azure_blob_storage_bridge_connector_upload_ok
),
Decoded0 = get_and_decode_event(<<"myblob">>, Config),
?assertMatch(#{<<"payload">> := #{<<"x">> := <<"first data">>}}, Decoded0),
%% Test sending the same payload again, so that the same blob is written to.
ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
Payload1 = Payload0#{<<"x">> => <<"new data">>},
Payload1Bin = emqx_utils_json:encode(Payload1),
MsgEvent1 = mk_message_event(ClientId, Topic, Payload1Bin),
Message = {BridgeId, MsgEvent1},
?assertMatch(ok, emqx_resource:simple_sync_query(ResourceId, Message)),
Decoded1 = get_and_decode_event(<<"myblob">>, Config),
?assertMatch(#{<<"payload">> := #{<<"x">> := <<"new data">>}}, Decoded1),
ok.
%% Testing non-aggregated / direct action
t_sync_query_down(Config) ->
ContainerName = ?config(container_name, Config),
Payload0 = #{
<<"b">> => <<"myblob">>,
<<"c">> => ContainerName,
<<"x">> => <<"first data">>
},
Payload0Bin = emqx_utils_json:encode(Payload0),
ClientId = <<"some_client">>,
ok = emqx_bridge_v2_testlib:t_sync_query_down(
Config,
#{
make_message_fn => fun(Topic) -> mk_message({ClientId, Topic, Payload0Bin}) end,
enter_tp_filter =>
?match_event(#{
?snk_kind := azure_blob_storage_bridge_on_query_enter,
mode := direct
}),
error_tp_filter =>
?match_event(#{?snk_kind := azure_blob_storage_bridge_direct_upload_error}),
success_tp_filter =>
?match_event(#{?snk_kind := azure_blob_storage_bridge_connector_upload_ok})
}
),
ok.
t_aggreg_upload(Config) ->
ActionName = ?config(action_name, Config),
AggregId = aggreg_id(ActionName),
?check_trace(
#{timetrap => timer:seconds(30)},
begin
ActionNameString = unicode:characters_to_list(ActionName),
NodeString = atom_to_list(node()),
%% Create a bridge with the sample configuration.
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge_api(Config)),
{ok, _Rule} =
emqx_bridge_v2_testlib:create_rule_and_action_http(
?ACTION_TYPE_BIN, <<"">>, Config, #{
sql => <<
"SELECT"
" *,"
" strlen(payload) as psize,"
" unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at"
" FROM 'abs/#'"
>>
}
),
Messages = lists:map(fun mk_message/1, [
{<<"C1">>, T1 = <<"abs/a/b/c">>, P1 = <<"{\"hello\":\"world\"}">>},
{<<"C2">>, T2 = <<"abs/foo/bar">>, P2 = <<"baz">>},
{<<"C3">>, T3 = <<"abs/t/42">>, P3 = <<"">>},
%% Won't match rule filter
{<<"C4">>, <<"t/42">>, <<"won't appear in results">>}
]),
ok = publish_messages(Messages),
%% Wait until the delivery is completed.
?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
%% Check the uploaded objects.
_Uploads =
[#cloud_blob{name = BlobName, properties = UploadProps}] = list_blobs(Config),
?assertMatch(#{content_type := "text/csv"}, maps:from_list(UploadProps)),
?assertMatch(
[ActionNameString, NodeString, _Datetime, _Seq = "0"],
string:split(BlobName, "/", all)
),
Content = get_blob(BlobName, Config),
%% Verify that column order is respected.
?assertMatch(
{ok, [
?CONF_COLUMN_ORDER(_),
[_TS1, <<"C1">>, T1, P1, <<>> | _],
[_TS2, <<"C2">>, T2, P2, <<>> | _],
[_TS3, <<"C3">>, T3, P3, <<>> | _]
]},
erl_csv:decode(Content)
),
ok
end,
[]
),
ok.
%% This test verifies that the bridge will reuse existing aggregation buffer after a
%% restart.
t_aggreg_upload_restart(Config) ->
ActionName = ?config(action_name, Config),
AggregId = aggreg_id(ActionName),
?check_trace(
#{timetrap => timer:seconds(30)},
begin
ActionNameString = unicode:characters_to_list(ActionName),
NodeString = atom_to_list(node()),
%% Create a bridge with the sample configuration.
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge_api(Config)),
{ok, _Rule} =
emqx_bridge_v2_testlib:create_rule_and_action_http(
?ACTION_TYPE_BIN, <<"">>, Config, #{
sql => <<
"SELECT"
" *,"
" strlen(payload) as psize,"
" unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at"
" FROM 'abs/#'"
>>
}
),
Messages = lists:map(fun mk_message/1, [
{<<"C1">>, T1 = <<"abs/a/b/c">>, P1 = <<"{\"hello\":\"world\"}">>},
{<<"C2">>, T2 = <<"abs/foo/bar">>, P2 = <<"baz">>},
{<<"C3">>, T3 = <<"abs/t/42">>, P3 = <<"">>}
]),
ok = publish_messages(Messages),
{ok, _} = ?block_until(#{
?snk_kind := connector_aggreg_records_written, action := AggregId
}),
{ok, _} =
?wait_async_action(
begin
%% Restart the bridge.
{ok, {{_, 204, _}, _, _}} = emqx_bridge_v2_testlib:disable_kind_http_api(
Config
),
{ok, {{_, 204, _}, _, _}} = emqx_bridge_v2_testlib:enable_kind_http_api(
Config
),
%% Send some more messages.
ok = publish_messages(Messages)
end,
#{?snk_kind := connector_aggreg_records_written, action := AggregId}
),
%% Wait until the delivery is completed.
?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
%% Check there's still only one upload.
[#cloud_blob{name = BlobName, properties = UploadProps}] = list_blobs(Config),
?assertMatch(#{content_type := "text/csv"}, maps:from_list(UploadProps)),
?assertMatch(
[ActionNameString, NodeString, _Datetime, _Seq = "0"],
string:split(BlobName, "/", all)
),
Content = get_blob(BlobName, Config),
?assertMatch(
{ok, [
?CONF_COLUMN_ORDER(_),
[_TS1, <<"C1">>, T1, P1, <<>> | _],
[_TS2, <<"C2">>, T2, P2, <<>> | _],
[_TS3, <<"C3">>, T3, P3, <<>> | _],
[_TS1, <<"C1">>, T1, P1, <<>> | _],
[_TS2, <<"C2">>, T2, P2, <<>> | _],
[_TS3, <<"C3">>, T3, P3, <<>> | _]
]},
erl_csv:decode(Content)
),
ok
end,
[]
),
ok.
%% This test verifies that the bridge can recover from a buffer file corruption, and does
%% so while preserving uncompromised data.
t_aggreg_upload_restart_corrupted(Config) ->
ActionName = ?config(action_name, Config),
AggregId = aggreg_id(ActionName),
BatchSize = ?CONF_MAX_RECORDS div 2,
?check_trace(
#{timetrap => timer:seconds(30)},
begin
%% Create a bridge with the sample configuration.
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge_api(Config)),
{ok, _Rule} =
emqx_bridge_v2_testlib:create_rule_and_action_http(
?ACTION_TYPE_BIN, <<"">>, Config, #{
sql => <<
"SELECT"
" *,"
" strlen(payload) as psize,"
" unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at"
" FROM 'abs/#'"
>>
}
),
Messages1 = [
{integer_to_binary(N), <<"abs/a/b/c">>, <<"{\"hello\":\"world\"}">>}
|| N <- lists:seq(1, BatchSize)
],
%% Ensure that they span multiple batch queries.
{ok, {ok, _}} =
?wait_async_action(
publish_messages_delayed(lists:map(fun mk_message/1, Messages1), 1),
#{?snk_kind := connector_aggreg_records_written, action := AggregId}
),
ct:pal("first batch's records have been written"),
%% Find out the buffer file.
{ok, #{filename := Filename}} = ?block_until(
#{?snk_kind := connector_aggreg_buffer_allocated, action := AggregId}
),
ct:pal("new buffer allocated"),
%% Stop the bridge, corrupt the buffer file, and restart the bridge.
{ok, {{_, 204, _}, _, _}} = emqx_bridge_v2_testlib:disable_kind_http_api(Config),
BufferFileSize = filelib:file_size(Filename),
ok = emqx_connector_aggregator_test_helpers:truncate_at(Filename, BufferFileSize div 2),
{ok, {{_, 204, _}, _, _}} = emqx_bridge_v2_testlib:enable_kind_http_api(Config),
%% Send some more messages.
Messages2 = [
{integer_to_binary(N), <<"abs/a/b/c">>, <<"{\"hello\":\"world\"}">>}
|| N <- lists:seq(1, BatchSize)
],
ok = publish_messages_delayed(lists:map(fun mk_message/1, Messages2), 1),
ct:pal("published second batch"),
%% Wait until the delivery is completed.
{ok, _} = ?block_until(#{
?snk_kind := connector_aggreg_delivery_completed, action := AggregId
}),
ct:pal("delivery completed"),
%% Check that upload contains part of the first batch and all of the second batch.
[#cloud_blob{name = BlobName}] = list_blobs(Config),
{ok, CSV = [_Header | Rows]} = erl_csv:decode(get_blob(BlobName, Config)),
NRows = length(Rows),
?assert(NRows > BatchSize, CSV),
?assertEqual(
lists:sublist(Messages1, NRows - BatchSize) ++ Messages2,
[{ClientID, Topic, Payload} || [_TS, ClientID, Topic, Payload | _] <- Rows],
CSV
),
ok
end,
[]
),
ok.
%% This test verifies that the bridge will finish uploading a buffer file after a restart.
t_aggreg_pending_upload_restart(Config) ->
ActionName = ?config(action_name, Config),
AggregId = aggreg_id(ActionName),
?check_trace(
#{timetrap => timer:seconds(30)},
begin
%% Create a bridge with the sample configuration.
?assertMatch(
{ok, _Bridge},
emqx_bridge_v2_testlib:create_bridge_api(
Config,
#{
<<"parameters">> =>
#{
<<"max_block_size">> => <<"1024B">>
}
}
)
),
{ok, _Rule} =
emqx_bridge_v2_testlib:create_rule_and_action_http(
?ACTION_TYPE_BIN, <<"">>, Config, #{
sql => <<
"SELECT"
" *,"
" strlen(payload) as psize,"
" unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at"
" FROM 'abs/#'"
>>
}
),
%% Send few large messages that will require multipart upload.
%% Ensure that they span multiple batch queries.
Payload0 = iolist_to_binary(lists:duplicate(128, "PAYLOAD!")),
%% Payload0 = iolist_to_binary(lists:duplicate(128 * 1024, "PAYLOAD!")),
Messages = [
{integer_to_binary(N), <<"abs/a/b/c">>, Payload0}
|| N <- lists:seq(1, 10)
],
{ok, {ok, _}} =
?wait_async_action(
publish_messages_delayed(lists:map(fun mk_message/1, Messages), 10),
%% Wait until the multipart upload is started.
#{?snk_kind := azure_blob_storage_will_write_chunk}
),
ct:pal("published messages"),
%% Stop the bridge.
{ok, {{_, 204, _}, _, _}} = emqx_bridge_v2_testlib:disable_kind_http_api(
Config
),
ct:pal("stopped bridge"),
%% Verify that pending uploads have been gracefully aborted.
?assertMatch([{_Name, []}], list_committed_blocks(Config)),
%% Restart the bridge.
{ok, {{_, 204, _}, _, _}} = emqx_bridge_v2_testlib:enable_kind_http_api(
Config
),
ct:pal("restarted bridge"),
%% Wait until the delivery is completed.
{ok, _} = ?block_until(#{
?snk_kind := connector_aggreg_delivery_completed, action := AggregId
}),
ct:pal("delivery complete"),
%% Check that delivery contains all the messages.
?assertMatch([{_Name, [_ | _]}], list_committed_blocks(Config)),
[#cloud_blob{name = BlobName}] = list_blobs(Config),
{ok, CSV = [_Header | Rows]} = erl_csv:decode(get_blob(BlobName, Config)),
?assertEqual(
Messages,
[{ClientID, Topic, Payload} || [_TS, ClientID, Topic, Payload | _] <- Rows],
CSV
),
ok
end,
[]
),
ok.

View File

@ -0,0 +1,99 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_tests).
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
take_chunk(Buffer, BlockSize) ->
emqx_bridge_azure_blob_storage_connector:take_chunk(Buffer, BlockSize).
%%------------------------------------------------------------------------------
%% Generators
%%------------------------------------------------------------------------------
iodata_gen() ->
frequency([
{10, []},
{10, binary()},
{10, list(oneof([binary(), byte()]))},
{1, ?SIZED(Size, resize(Size div 3, list(iodata_gen())))}
]).
buffer_gen() ->
?LET(
DataParts,
list(iodata_gen()),
lists:foldl(
fun(DataPart, Buffer) ->
[Buffer, DataPart]
end,
_InitialBuffer = [],
DataParts
)
).
%%------------------------------------------------------------------------------
%% Properties
%%------------------------------------------------------------------------------
%% Verifies that we can take several chunks from the buffer, and that data is not lost nor
%% created.
take_chunk_preserves_data_prop() ->
?FORALL(
{Buffer, BlockSize, Steps},
{buffer_gen(), non_neg_integer(), pos_integer()},
begin
{Chunks, FinalBuffer} =
lists:mapfoldl(
fun(_, Acc) ->
take_chunk(Acc, BlockSize)
end,
Buffer,
lists:seq(1, Steps)
),
%% Original buffer is preserved
BufferBin = iolist_to_binary(Buffer),
ConcatenatedBin = iolist_to_binary([Chunks, FinalBuffer]),
?WHENFAIL(
ct:pal(
"block size: ~b\nsteps: ~b\noriginal buffer:\n ~p\nchunks + final buffer:\n ~p",
[BlockSize, Steps, Buffer, {Chunks, FinalBuffer}]
),
BufferBin =:= ConcatenatedBin
)
end
).
%% Verifies that the produced chunk has at most the requested size.
take_chunk_size_prop() ->
?FORALL(
{Buffer, BlockSize},
{buffer_gen(), non_neg_integer()},
begin
{Chunk, FinalBuffer} = take_chunk(Buffer, BlockSize),
?WHENFAIL(
ct:pal(
"block size: ~b\n\noriginal buffer:\n ~p\nchunk + final buffer:\n ~p"
"\nchunk size: ~b",
[BlockSize, Buffer, {Chunk, FinalBuffer}, iolist_size(Chunk)]
),
iolist_size(Chunk) =< BlockSize
)
end
).
%%------------------------------------------------------------------------------
%% Tests
%%------------------------------------------------------------------------------
take_chunk_test_() ->
Props = [take_chunk_preserves_data_prop(), take_chunk_size_prop()],
Opts = [{numtests, 1000}, {to_file, user}, {max_size, 100}],
{timeout, 300, [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}.

View File

@ -418,7 +418,7 @@ format_primitive_type_desc(TypeStr, DescResolver) ->
get_primitive_typespec(TypeStr) -> get_primitive_typespec(TypeStr) ->
emqx_conf_schema_types:readable_docgen(?MODULE, TypeStr). emqx_conf_schema_types:readable_docgen(?MODULE, TypeStr).
%% All types should have a namespace to avlid name clashing. %% All types should have a namespace to avoid name clashing.
is_missing_namespace(ShortName, FullName, RootNames) -> is_missing_namespace(ShortName, FullName, RootNames) ->
case lists:member(ShortName, RootNames) of case lists:member(ShortName, RootNames) of
true -> true ->

View File

@ -107,7 +107,8 @@ hard_coded_connector_info_modules_ee() ->
emqx_bridge_pulsar_connector_info, emqx_bridge_pulsar_connector_info,
emqx_bridge_tdengine_connector_info, emqx_bridge_tdengine_connector_info,
emqx_bridge_rabbitmq_connector_info, emqx_bridge_rabbitmq_connector_info,
emqx_bridge_s3_connector_info emqx_bridge_s3_connector_info,
emqx_bridge_azure_blob_storage_connector_info
]. ].
-else. -else.
hard_coded_connector_info_modules_ee() -> hard_coded_connector_info_modules_ee() ->

View File

@ -24,6 +24,8 @@
format_status/2 format_status/2
]). ]).
-export_type([buffer_map/0]).
-record(delivery, { -record(delivery, {
id :: id(), id :: id(),
callback_module :: module(), callback_module :: module(),

View File

@ -117,6 +117,7 @@
emqx_bridge_rabbitmq, emqx_bridge_rabbitmq,
emqx_bridge_azure_event_hub, emqx_bridge_azure_event_hub,
emqx_bridge_s3, emqx_bridge_s3,
emqx_bridge_azure_blob_storage,
emqx_schema_registry, emqx_schema_registry,
emqx_eviction_agent, emqx_eviction_agent,
emqx_node_rebalance, emqx_node_rebalance,

View File

@ -0,0 +1 @@
Implemented Azure Blob Storage data integration.

View File

@ -189,6 +189,7 @@ defmodule EMQXUmbrella.MixProject do
:emqx_license, :emqx_license,
:emqx_s3, :emqx_s3,
:emqx_bridge_s3, :emqx_bridge_s3,
:emqx_bridge_azure_blob_storage,
:emqx_schema_registry, :emqx_schema_registry,
:emqx_schema_validation, :emqx_schema_validation,
:emqx_enterprise, :emqx_enterprise,

View File

@ -104,6 +104,7 @@ is_community_umbrella_app("apps/emqx_bridge_rabbitmq") -> false;
is_community_umbrella_app("apps/emqx_ft") -> false; is_community_umbrella_app("apps/emqx_ft") -> false;
is_community_umbrella_app("apps/emqx_s3") -> false; is_community_umbrella_app("apps/emqx_s3") -> false;
is_community_umbrella_app("apps/emqx_bridge_s3") -> false; is_community_umbrella_app("apps/emqx_bridge_s3") -> false;
is_community_umbrella_app("apps/emqx_bridge_azure_blob_storage") -> false;
is_community_umbrella_app("apps/emqx_schema_registry") -> false; is_community_umbrella_app("apps/emqx_schema_registry") -> false;
is_community_umbrella_app("apps/emqx_enterprise") -> false; is_community_umbrella_app("apps/emqx_enterprise") -> false;
is_community_umbrella_app("apps/emqx_bridge_kinesis") -> false; is_community_umbrella_app("apps/emqx_bridge_kinesis") -> false;

View File

@ -0,0 +1,88 @@
emqx_bridge_azure_blob_storage_action_schema {
azure_blob_storage.label:
"""Upload to Azure Blob Storage"""
azure_blob_storage.desc:
"""Action that takes incoming events and uploads them to the Azure Blob Storage service."""
direct_parameters.label:
"""Direct Azure Blob Storage Upload action parameters"""
direct_parameters.desc:
"""Set of parameters for the upload action. Action supports templates in Azure Blob Storage container name, blob name and blob content."""
direct_container_template.desc:
"""The name of the Azure Blob Storage container name."""
direct_container_template.label:
"""Container Name"""
direct_blob_template.desc:
"""The name of the Azure Blob Storage blob name."""
direct_blob_template.label:
"""Blob Name"""
direct_content_template.label:
"""Azure Blob Storage Blob Content"""
direct_content_template.desc:
"""Content of the Azure Blob Storage blob being uploaded. Supports templates."""
parameters.label:
"""Azure Blob Storage action parameters"""
parameters.desc:
"""Set of parameters for the action."""
aggreg_parameters.label:
"""Azure Blob Storage Aggregated Mode action parameters"""
aggreg_parameters.desc:
"""Set of parameters for the action in aggregated mode."""
direct_mode.label:
"""Direct Azure Blob Storage Upload"""
direct_mode.desc:
"""Enables uploading of events to the Azure Blob Storage service as separate objects."""
aggregated_mode.label:
"""Aggregated Azure Blob Storage Upload"""
aggregated_mode.desc:
"""Enables time-based aggregation of incoming events and uploading them to the Azure Blob Storage service as a single object."""
aggregation.label:
"""Aggregation parameters"""
aggregation.desc:
"""Set of parameters governing the aggregation process."""
aggregation_interval.label:
"""Time interval"""
aggregation_interval.desc:
"""Amount of time events will be aggregated in a single object before uploading."""
aggregation_max_records.label:
"""Maximum number of records"""
aggregation_max_records.desc:
"""Number of records (events) allowed per each aggregated object. Each aggregated upload will contain no more than that number of events, but may contain less.<br/>
If event rate is high enough, there obviously may be more than one aggregated upload during the same time interval. These uploads will have different, but consecutive sequence numbers, which will be a part of Azure Blob Storage blob name."""
aggregated_container_name.label:
"""Container for aggregated events"""
aggregated_container_name.desc:
"""Settings governing the file format of an upload containing aggregated events. Does not support templates."""
aggregated_blob_template.label:
"""Azure Blob Storage blob name template"""
aggregated_blob_template.desc:
"""Template for the Azure Blob Storage blob name of an aggregated upload.<br/>
Template may contain placeholders for the following variables:
<ul>
<li><code>${action}</code>: name of the action (required).<li/>
<li><code>${node}</code>: name of the EMQX node conducting the upload (required).<li/>
<li><code>${datetime.{format}}</code>: date and time when aggregation started, formatted according to the <code>{format}</code> string (required):
<ul>
<li><code>${datetime.rfc3339utc}</code>: RFC3339-formatted date and time in UTC,<li/>
<li><code>${datetime.rfc3339}</code>: RFC3339-formatted date and time in local timezone,<li/>
<li><code>${datetime.unix}</code>: Unix timestamp.<li/>
</ul>
<li/>
<li><code>${datetime_until.{format}}</code>: date and time when aggregation ended, with the same formatting options.<li/>
<li><code>${sequence}</code>: sequence number of the aggregated upload within the same time interval (required).<li/>
</ul>
All other placeholders are considered invalid. Note that placeholders marked as required will be added as a path suffix to the Azure Blob Storage blob name if they are missing from the template."""
}

View File

@ -0,0 +1,16 @@
emqx_bridge_azure_blob_storage_connector_schema {
config_connector.label:
"""Azure Blob Storage Connector Configuration"""
config_connector.desc:
"""Configuration for a connector to Azure Blob Storage service."""
account_name.label:
"""Account Name"""
account_name.desc:
"""Account name for Azure Blob Storage service."""
account_key.label:
"""Account Key"""
account_key.desc:
"""Account key for Azure Blob Storage service."""
}

View File

@ -249,6 +249,9 @@ for dep in ${CT_DEPS}; do
elasticsearch) elasticsearch)
FILES+=( '.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml' ) FILES+=( '.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml' )
;; ;;
azurite)
FILES+=( '.ci/docker-compose-file/docker-compose-azurite.yaml' )
;;
*) *)
echo "unknown_ct_dependency $dep" echo "unknown_ct_dependency $dep"
exit 1 exit 1