Merge pull request #13069 from thalesmg/azure-blob-storage-action-m-20240508

feat: implement azure blob storage action
Thales Macedo Garitezi 2024-06-03 08:56:37 -03:00 committed by GitHub
commit c5547543e0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 2692 additions and 43 deletions

View File

@@ -0,0 +1,24 @@
version: '3.9'
services:
azurite:
container_name: azurite
image: mcr.microsoft.com/azure-storage/azurite:3.30.0
restart: always
expose:
- "10000"
# ports:
# - "10000:10000"
networks:
- emqx_bridge
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:10000"]
interval: 30s
timeout: 5s
retries: 4
command:
- azurite-blob
- "--blobHost"
- 0.0.0.0
- "-d"
- debug.log

View File

@@ -215,5 +215,11 @@
"listen": "0.0.0.0:9200",
"upstream": "elasticsearch:9200",
"enabled": true
},
{
"name": "azurite_plain",
"listen": "0.0.0.0:10000",
"upstream": "azurite:10000",
"enabled": true
}
]

View File

@@ -90,6 +90,7 @@
hard_coded_action_info_modules_ee() ->
[
emqx_bridge_azure_event_hub_action_info,
emqx_bridge_azure_blob_storage_action_info,
emqx_bridge_confluent_producer_action_info,
emqx_bridge_dynamo_action_info,
emqx_bridge_gcp_pubsub_consumer_action_info,

View File

@@ -162,8 +162,8 @@ do_parse_and_check(RootBin, TypeBin, NameBin, SchemaMod, RawConf) ->
InnerConfigMap.
bridge_id(Config) ->
BridgeType = ?config(bridge_type, Config),
BridgeName = ?config(bridge_name, Config),
BridgeType = get_ct_config_with_fallback(Config, [action_type, bridge_type]),
BridgeName = get_ct_config_with_fallback(Config, [action_name, bridge_name]),
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
ConnectorId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
<<"action:", BridgeId/binary, ":", ConnectorId/binary>>.
@@ -536,6 +536,36 @@ list_connectors_http_api() ->
ct:pal("list connectors result:\n ~p", [Res]),
Res.
enable_kind_http_api(Config) ->
do_enable_disable_kind_http_api(enable, Config).
disable_kind_http_api(Config) ->
do_enable_disable_kind_http_api(disable, Config).
do_enable_disable_kind_http_api(Op, Config) ->
#{
kind := Kind,
type := Type,
name := Name
} = get_common_values(Config),
RootBin =
case Kind of
action -> <<"actions">>;
source -> <<"sources">>
end,
BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
OpPath =
case Op of
enable -> "true";
disable -> "false"
end,
Path = emqx_mgmt_api_test_util:api_path([RootBin, BridgeId, "enable", OpPath]),
OpStr = emqx_utils_conv:str(Op),
ct:pal(OpStr ++ " action ~p (http v2)", [{Type, Name}]),
Res = request(put, Path, _Params = []),
ct:pal(OpStr ++ " action ~p (http v2) result:\n ~p", [{Type, Name}, Res]),
Res.
update_rule_http(RuleId, Params) ->
Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId]),
ct:pal("update rule ~p:\n ~p", [RuleId, Params]),
@@ -585,7 +615,7 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config) ->
create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}).
create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) ->
BridgeName = ?config(bridge_name, Config),
BridgeName = get_ct_config_with_fallback(Config, [action_name, bridge_name]),
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
SQL = maps:get(sql, Opts, <<"SELECT * FROM \"", RuleTopic/binary, "\"">>),
Params0 = #{
@@ -742,6 +772,85 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
end,
ok.
%% Like `t_sync_query', but we send the message while the connector is
%% `?status_disconnected' and check that, after recovery, the buffered message is
%% eventually sent.
%% * `make_message_fn' is a function that receives the rule topic and returns a
%% `#message{}' record.
%% * `enter_tp_filter' is a function that selects a tracepoint that indicates the point
%% inside the connector's `on_query' callback function before actually trying to push
%% the data.
%% * `error_tp_filter' is a function that selects a tracepoint indicating that
%% at least one attempt to send the message was made and failed.
%% * `success_tp_filter' is a function that selects a tracepoint that indicates the
%% point where the message was acknowledged as successfully written.
t_sync_query_down(Config, Opts) ->
#{
make_message_fn := MakeMessageFn,
enter_tp_filter := EnterTPFilter,
error_tp_filter := ErrorTPFilter,
success_tp_filter := SuccessTPFilter
} = Opts,
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
{CTTimetrap, _} = ct:get_timetrap_info(),
?check_trace(
#{timetrap => max(0, CTTimetrap - 500)},
begin
#{type := Type} = get_common_values(Config),
?assertMatch({ok, _}, create_bridge_api(Config)),
RuleTopic = emqx_topic:join([<<"test">>, emqx_utils_conv:bin(Type)]),
{ok, _} = create_rule_and_action_http(Type, RuleTopic, Config),
ResourceId = resource_id(Config),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
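%% Make sure the connection is cut only after the buffer worker has entered
%% `call_query', so that the request is already in flight when the failure hits.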
?force_ordering(
#{?snk_kind := call_query},
#{?snk_kind := cut_connection, ?snk_span := start}
),
%% Note: order of arguments here is reversed compared to `?force_ordering'.
snabbkaffe_nemesis:force_ordering(
EnterTPFilter,
_NEvents = 1,
fun(Event1, Event2) ->
EnterTPFilter(Event1) andalso
?match_event(#{
?snk_kind := cut_connection,
?snk_span := {complete, _}
})(
Event2
)
end
),
spawn_link(fun() ->
?tp_span(
cut_connection,
#{},
emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort)
)
end),
try
{_, {ok, _}} =
snabbkaffe:wait_async_action(
fun() -> spawn(fun() -> emqx:publish(MakeMessageFn(RuleTopic)) end) end,
ErrorTPFilter,
infinity
)
after
emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort)
end,
{ok, _} = snabbkaffe:block_until(SuccessTPFilter, infinity),
ok
end,
[]
),
ok.
%% - `ProduceFn': produces a message in the remote system that shall be consumed. May be
%% a `{function(), integer()}' tuple.
%% - `Tracepoint': marks the end of consumed message processing.
@@ -945,7 +1054,7 @@ t_on_get_status(Config, Opts) ->
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
FailureStatus = maps:get(failure_status, Opts, disconnected),
?assertMatch({ok, _}, create_bridge(Config)),
?assertMatch({ok, _}, create_bridge_api(Config)),
ResourceId = resource_id(Config),
%% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness.

View File

@@ -0,0 +1,94 @@
Business Source License 1.1
Licensor: Hangzhou EMQ Technologies Co., Ltd.
Licensed Work: EMQX Enterprise Edition
The Licensed Work is (c) 2024
Hangzhou EMQ Technologies Co., Ltd.
Additional Use Grant: Students and educators are granted right to copy,
modify, and create derivative work for research
or education.
Change Date: 2028-05-17
Change License: Apache License, Version 2.0
For information about alternative licensing arrangements for the Software,
please contact Licensor: https://www.emqx.com/en/contact
Notice
The Business Source License (this document, or the “License”) is not an Open
Source license. However, the Licensed Work will eventually be made available
under an Open Source License, as stated in this License.
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
“Business Source License” is a trademark of MariaDB Corporation Ab.
-----------------------------------------------------------------------------
Business Source License 1.1
Terms
The Licensor hereby grants you the right to copy, modify, create derivative
works, redistribute, and make non-production use of the Licensed Work. The
Licensor may make an Additional Use Grant, above, permitting limited
production use.
Effective on the Change Date, or the fourth anniversary of the first publicly
available distribution of a specific version of the Licensed Work under this
License, whichever comes first, the Licensor hereby grants you rights under
the terms of the Change License, and the rights granted in the paragraph
above terminate.
If your use of the Licensed Work does not comply with the requirements
currently in effect as described in this License, you must purchase a
commercial license from the Licensor, its affiliated entities, or authorized
resellers, or you must refrain from using the Licensed Work.
All copies of the original and modified Licensed Work, and derivative works
of the Licensed Work, are subject to this License. This License applies
separately for each version of the Licensed Work and the Change Date may vary
for each version of the Licensed Work released by Licensor.
You must conspicuously display this License on each original or modified copy
of the Licensed Work. If you receive the Licensed Work in original or
modified form from a third party, the terms and conditions set forth in this
License apply to your use of that work.
Any use of the Licensed Work in violation of this License will automatically
terminate your rights under this License for the current and all other
versions of the Licensed Work.
This License does not grant you any right in any trademark or logo of
Licensor or its affiliates (provided that you may use a trademark or logo of
Licensor as expressly required by this License).
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
TITLE.
MariaDB hereby grants you permission to use this License’s text to license
your works, and to refer to it using the trademark “Business Source License”,
as long as you comply with the Covenants of Licensor below.
Covenants of Licensor
In consideration of the right to use this License’s text and the “Business
Source License” name and trademark, Licensor covenants to MariaDB, and to all
other recipients of the licensed work to be provided by Licensor:
1. To specify as the Change License the GPL Version 2.0 or any later version,
or a license that is compatible with GPL Version 2.0 or a later version,
where “compatible” means that software provided under the Change License can
be included in a program with software provided under GPL Version 2.0 or a
later version. Licensor may specify additional Change Licenses without
limitation.
2. To either: (a) specify an additional grant of rights to use that does not
impose any additional restriction on the right granted in this License, as
the Additional Use Grant; or (b) insert the text “None”.
3. To specify a Change Date.
4. Not to modify this License in any other way.

View File

@@ -0,0 +1,16 @@
# EMQX Azure Blob Storage Bridge
This application provides connector and action implementations for EMQX to integrate with Azure Blob Storage as part of EMQX data integration pipelines.
Users can leverage [EMQX Rule Engine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html) to create rules that publish message data to Azure Blob Storage.
## Documentation
Refer to [Rules engine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html) for an introduction to the EMQX rules engine.
## Contributing
Please see our [contributing.md](../../CONTRIBUTING.md).
## License
EMQ Business Source License 1.1; see [LICENSE](BSL.txt).

View File

@@ -0,0 +1,2 @@
azurite
toxiproxy

View File

@@ -0,0 +1,16 @@
%% -*- mode: erlang; -*-
{erl_opts, [
warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
warnings_as_errors,
debug_info
]}.
{deps, [
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_connector_aggregator, {path, "../../apps/emqx_connector_aggregator"}},
{erlazure, {git, "https://github.com/emqx/erlazure.git", {tag, "0.3.0.2"}}}
]}.

View File

@@ -0,0 +1,23 @@
{application, emqx_bridge_azure_blob_storage, [
{description, "EMQX Enterprise Azure Blob Storage Bridge"},
{vsn, "0.1.0"},
{registered, [emqx_bridge_azure_blob_storage_sup]},
{applications, [
kernel,
stdlib,
erlazure,
emqx_resource,
emqx_connector_aggregator
]},
{env, [
{emqx_action_info_modules, [
emqx_bridge_azure_blob_storage_action_info
]},
{emqx_connector_info_modules, [
emqx_bridge_azure_blob_storage_connector_info
]}
]},
{mod, {emqx_bridge_azure_blob_storage_app, []}},
{modules, []},
{links, []}
]}.

View File

@@ -0,0 +1,16 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-ifndef(__EMQX_BRIDGE_ABS_HRL__).
-define(__EMQX_BRIDGE_ABS_HRL__, true).
-define(CONNECTOR_TYPE, azure_blob_storage).
-define(CONNECTOR_TYPE_BIN, <<"azure_blob_storage">>).
-define(ACTION_TYPE, azure_blob_storage).
-define(ACTION_TYPE_BIN, <<"azure_blob_storage">>).
-define(AGGREG_SUP, emqx_bridge_azure_blob_storage_sup).
%% END ifndef(__EMQX_BRIDGE_ABS_HRL__)
-endif.

View File

@@ -0,0 +1,34 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_action_info).
-behaviour(emqx_action_info).
-include("emqx_bridge_azure_blob_storage.hrl").
%% `emqx_action_info' API
-export([
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% `emqx_action_info' API
%%------------------------------------------------------------------------------
action_type_name() -> ?ACTION_TYPE.
connector_type_name() -> ?CONNECTOR_TYPE.
schema_module() -> emqx_bridge_azure_blob_storage_action_schema.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

View File

@@ -0,0 +1,310 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_action_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include("emqx_bridge_azure_blob_storage.hrl").
%% `hocon_schema' API
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
%% `emqx_bridge_v2_schema' "unofficial" API
-export([
bridge_v2_examples/1
]).
%% API
-export([]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
namespace() ->
"action_azure_blob_storage".
roots() ->
[].
fields(Field) when
Field == "get_bridge_v2";
Field == "put_bridge_v2";
Field == "post_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(?ACTION_TYPE));
fields(action) ->
{?ACTION_TYPE,
mk(
hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION_TYPE)),
#{
desc => <<"Azure Blob Storage Action Config">>,
required => false
}
)};
fields(?ACTION_TYPE) ->
emqx_bridge_v2_schema:make_producer_action_schema(
mk(
mkunion(mode, #{
<<"direct">> => ref(direct_parameters),
<<"aggregated">> => ref(aggreg_parameters)
}),
#{
required => true,
desc => ?DESC("parameters")
}
),
#{
resource_opts_ref => ref(action_resource_opts)
}
);
fields(direct_parameters) ->
[
{mode, mk(direct, #{required => true, desc => ?DESC("direct_mode")})},
%% Container in the Azure Blob Storage domain, not aggregation.
{container,
mk(
emqx_schema:template_str(),
#{
desc => ?DESC("direct_container_template"),
required => true
}
)},
{blob,
mk(
emqx_schema:template_str(),
#{
desc => ?DESC("direct_blob_template"),
required => true
}
)},
{content,
mk(
emqx_schema:template(),
#{
required => false,
default => <<"${.}">>,
desc => ?DESC("direct_content_template")
}
)}
| fields(common_action_parameters)
];
fields(aggreg_parameters) ->
[
{mode, mk(aggregated, #{required => true, desc => ?DESC("aggregated_mode")})},
{aggregation, mk(ref(aggregation), #{required => true, desc => ?DESC("aggregation")})},
%% Container in the Azure Blob Storage domain, not aggregation.
{container,
mk(
string(),
#{
desc => ?DESC("aggregated_container_name"),
required => true
}
)},
{blob,
mk(
emqx_schema:template_str(),
#{
desc => ?DESC("aggregated_blob_template"),
required => true
}
)},
{min_block_size,
mk(
emqx_schema:bytesize(),
#{
default => <<"10mb">>,
importance => ?IMPORTANCE_HIDDEN,
required => true,
validator => fun block_size_validator/1
}
)}
| fields(common_action_parameters)
];
fields(aggregation) ->
[
emqx_connector_aggregator_schema:container(),
%% TODO: Needs bucketing? (e.g. messages falling in this 1h interval)
{time_interval,
hoconsc:mk(
emqx_schema:duration_s(),
#{
required => false,
default => <<"1h">>,
desc => ?DESC("aggregation_interval")
}
)},
{max_records,
hoconsc:mk(
pos_integer(),
#{
required => false,
default => 1_000_000,
desc => ?DESC("aggregation_max_records")
}
)}
];
fields(common_action_parameters) ->
[
{max_block_size,
mk(
emqx_schema:bytesize(),
#{
default => <<"4000mb">>,
importance => ?IMPORTANCE_HIDDEN,
desc => ?DESC("max_block_size"),
required => true,
validator => fun block_size_validator/1
}
)}
];
fields(action_resource_opts) ->
%% NOTE: This action should benefit from generous batching defaults.
emqx_bridge_v2_schema:action_resource_opts_fields([
{batch_size, #{default => 100}},
{batch_time, #{default => <<"10ms">>}}
]).
desc(Name) when
Name =:= ?ACTION_TYPE;
Name =:= aggregation;
Name =:= aggreg_parameters;
Name =:= direct_parameters
->
?DESC(Name);
desc(action_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(_Name) ->
undefined.
%%-------------------------------------------------------------------------------------------------
%% `emqx_bridge_v2_schema' "unofficial" API
%%-------------------------------------------------------------------------------------------------
bridge_v2_examples(Method) ->
[
#{
<<"aggregated_", ?ACTION_TYPE_BIN/binary>> => #{
summary => <<"Azure Blob Storage Aggregated Upload Action">>,
value => action_example(Method, aggregated)
},
<<"direct_", ?ACTION_TYPE_BIN/binary>> => #{
summary => <<"Azure Blob Storage Direct Upload Action">>,
value => action_example(Method, direct)
}
}
].
action_example(post, Mode) ->
maps:merge(
action_example(put, Mode),
#{
type => ?ACTION_TYPE_BIN,
name => <<"my_action">>
}
);
action_example(get, Mode) ->
maps:merge(
action_example(put, Mode),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
}
);
action_example(put, aggregated) ->
#{
enable => true,
description => <<"my action">>,
connector => <<"my_connector">>,
parameters =>
#{
mode => <<"aggregated">>,
aggregation => #{
container => #{
type => <<"csv">>,
column_order => [<<"a">>, <<"b">>]
},
time_interval => <<"4s">>,
max_records => 10_000
},
container => <<"mycontainer">>,
blob => <<"${action}/${node}/${datetime.rfc3339}/${sequence}">>
},
resource_opts =>
#{
batch_time => <<"10ms">>,
batch_size => 100,
health_check_interval => <<"30s">>,
inflight_window => 100,
query_mode => <<"sync">>,
request_ttl => <<"45s">>,
worker_pool_size => 16
}
};
action_example(put, direct) ->
#{
enable => true,
description => <<"my action">>,
connector => <<"my_connector">>,
parameters =>
#{
mode => <<"direct">>,
container => <<"${.payload.container}">>,
blob => <<"${.payload.blob}">>,
content => <<"${.payload}">>
},
resource_opts =>
#{
batch_time => <<"0ms">>,
batch_size => 1,
health_check_interval => <<"30s">>,
inflight_window => 100,
query_mode => <<"sync">>,
request_ttl => <<"45s">>,
worker_pool_size => 16
}
}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
ref(Name) -> hoconsc:ref(?MODULE, Name).
mk(Type, Meta) -> hoconsc:mk(Type, Meta).
mkunion(Field, Schemas) ->
hoconsc:union(fun(Arg) -> scunion(Field, Schemas, Arg) end).
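%% Called by hocon either to enumerate all union members, or to select the
%% member whose tag field (here, `mode') matches the value being validated.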
scunion(_Field, Schemas, all_union_members) ->
maps:values(Schemas);
scunion(Field, Schemas, {value, Value}) ->
Selector = maps:get(emqx_utils_conv:bin(Field), Value, undefined),
case Selector == undefined orelse maps:find(emqx_utils_conv:bin(Selector), Schemas) of
{ok, Schema} ->
[Schema];
_Error ->
throw(#{field_name => Field, expected => maps:keys(Schemas)})
end.
block_size_validator(SizeLimit) ->
case SizeLimit =< 4_000 * 1024 * 1024 of
true -> ok;
false -> {error, "must be at most 4000 MiB"}
end.

View File

@@ -0,0 +1,28 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_app).
-behaviour(application).
%% `application' API
-export([start/2, stop/1]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% `application' API
%%------------------------------------------------------------------------------
start(_StartType, _StartArgs) ->
emqx_bridge_azure_blob_storage_sup:start_link().
stop(_State) ->
ok.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

View File

@@ -0,0 +1,742 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_connector).
-behaviour(emqx_resource).
-behaviour(emqx_connector_aggreg_delivery).
-behaviour(emqx_template).
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("emqx_connector_aggregator/include/emqx_connector_aggregator.hrl").
-include_lib("emqx/include/emqx_trace.hrl").
-include("emqx_bridge_azure_blob_storage.hrl").
%% `emqx_resource' API
-export([
callback_mode/0,
on_start/2,
on_stop/2,
on_get_status/2,
on_get_channels/1,
on_add_channel/4,
on_remove_channel/3,
on_get_channel_status/3,
on_query/3,
on_batch_query/3
]).
%% `ecpool_worker' API
-export([
connect/1,
do_create_append_blob/3,
do_create_block_blob/3,
do_append_data/5,
do_put_block_list/4,
do_put_block_blob/4,
do_health_check/1,
do_list_blobs/2
]).
%% `emqx_connector_aggreg_delivery' API
-export([
init_transfer_state/2,
process_append/2,
process_write/1,
process_complete/1
]).
%% `emqx_template' API
-export([lookup/2]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
-type container() :: string().
-type blob() :: string().
-type connector_config() :: #{
endpoint => string(),
account_name := string(),
account_key := emqx_secret:t(string()),
resource_opts := map(),
any() => term()
}.
-type connector_state() :: #{
pool_name := connector_resource_id(),
installed_actions := #{action_resource_id() => action_state()}
}.
-type action_config() :: direct_action_config() | aggreg_action_config().
-type direct_action_config() :: #{
parameters := #{
mode := direct,
container := template_str(),
blob := template_str(),
content := template_str(),
max_block_size := pos_integer()
}
}.
-type aggreg_action_config() :: #{
parameters := #{
mode := aggregated,
aggregation := #{
%% TODO: other containers
container := #{type := csv},
time_interval := pos_integer(),
max_records := pos_integer()
},
container := string(),
blob := template_str(),
max_block_size := pos_integer(),
min_block_size := pos_integer()
},
any() => term()
}.
-type template_str() :: unicode:chardata().
-type action_state() :: direct_action_state() | aggreg_action_state().
-type direct_action_state() :: #{
mode := direct,
container := emqx_template:t(),
blob := emqx_template:t(),
content := emqx_template:t(),
max_block_size := pos_integer()
}.
-type aggreg_action_state() :: #{
mode := aggregated,
name := binary(),
container := string(),
aggreg_id := aggreg_id(),
supervisor := pid(),
on_stop := {module(), atom(), [term()]}
}.
-type aggreg_id() :: {binary(), binary()}.
-type query() :: {_Tag :: channel_id(), _Data :: emqx_jsonish:t()}.
-type pool_name() :: connector_resource_id().
-type transfer_opts() :: #{
upload_options := #{
action := binary(),
blob := emqx_template:t(),
container := string(),
min_block_size := pos_integer(),
max_block_size := pos_integer(),
pool := connector_resource_id()
}
}.
-type transfer_buffer() :: iolist().
-type transfer_state() :: #{
blob := blob(),
buffer := transfer_buffer(),
buffer_size := non_neg_integer(),
container := container(),
max_block_size := pos_integer(),
min_block_size := pos_integer(),
next_block := queue:queue(iolist()),
num_blocks := non_neg_integer(),
pool := pool_name(),
started := boolean()
}.
%%------------------------------------------------------------------------------
%% `emqx_resource' API
%%------------------------------------------------------------------------------
-spec callback_mode() -> callback_mode().
callback_mode() ->
always_sync.
-spec on_start(connector_resource_id(), connector_config()) ->
{ok, connector_state()} | {error, _Reason}.
on_start(ConnResId, ConnConfig) ->
#{
account_name := AccountName,
account_key := AccountKey
} = ConnConfig,
Endpoint = maps:get(endpoint, ConnConfig, undefined),
ClientOpts = [
{account_name, AccountName},
{account_key, AccountKey},
{endpoint, Endpoint}
],
case emqx_resource_pool:start(ConnResId, ?MODULE, ClientOpts) of
ok ->
State = #{
pool_name => ConnResId,
installed_actions => #{}
},
{ok, State};
{error, Reason} ->
{error, Reason}
end.
-spec on_stop(connector_resource_id(), connector_state()) -> ok.
on_stop(ConnResId, _ConnState) ->
Res = emqx_resource_pool:stop(ConnResId),
?tp(azure_blob_storage_stop, #{instance_id => ConnResId}),
Res.
-spec on_get_status(connector_resource_id(), connector_state()) ->
?status_connected | ?status_disconnected.
on_get_status(ConnResId, _ConnState) ->
health_check(ConnResId).
-spec on_add_channel(
connector_resource_id(),
connector_state(),
action_resource_id(),
action_config()
) ->
{ok, connector_state()}.
on_add_channel(_ConnResId, ConnState0, ActionResId, ActionConfig) ->
ActionState = install_action(ActionConfig, ConnState0),
ConnState = emqx_utils_maps:deep_put([installed_actions, ActionResId], ConnState0, ActionState),
{ok, ConnState}.
-spec on_remove_channel(
connector_resource_id(),
connector_state(),
action_resource_id()
) ->
{ok, connector_state()}.
on_remove_channel(_ConnResId, ConnState0, ActionResId) ->
#{installed_actions := InstalledActions0} = ConnState0,
case maps:take(ActionResId, InstalledActions0) of
{ActionState, InstalledActions} ->
ok = stop_action(ActionState),
ConnState = ConnState0#{installed_actions := InstalledActions},
{ok, ConnState};
error ->
{ok, ConnState0}
end.
-spec on_get_channels(connector_resource_id()) ->
[{action_resource_id(), action_config()}].
on_get_channels(ConnResId) ->
emqx_bridge_v2:get_channels_for_connector(ConnResId).
-spec on_get_channel_status(
connector_resource_id(),
action_resource_id(),
connector_state()
) ->
?status_connected | ?status_disconnected.
on_get_channel_status(
ConnResId,
ActionResId,
_ConnectorState = #{installed_actions := InstalledActions}
) when is_map_key(ActionResId, InstalledActions) ->
#{ActionResId := ActionConfig} = InstalledActions,
channel_status(ActionConfig, ConnResId);
on_get_channel_status(_ConnResId, _ActionResId, _ConnState) ->
?status_disconnected.
-spec on_query(connector_resource_id(), query(), connector_state()) ->
{ok, _Result} | {error, _Reason}.
on_query(ConnResId, {Tag, Data}, #{installed_actions := InstalledActions}) ->
case maps:get(Tag, InstalledActions, undefined) of
ChannelState = #{mode := direct} ->
?tp(azure_blob_storage_bridge_on_query_enter, #{mode => direct}),
run_direct_transfer(Data, ConnResId, Tag, ChannelState);
ChannelState = #{mode := aggregated} ->
?tp(azure_blob_storage_bridge_on_query_enter, #{mode => aggregated}),
run_aggregated_transfer([Data], ChannelState);
undefined ->
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
end.
-spec on_batch_query(connector_resource_id(), [query()], connector_state()) ->
{ok, _Result} | {error, _Reason}.
on_batch_query(_ConnResId, [{Tag, Data0} | Rest], #{installed_actions := InstalledActions}) ->
case maps:get(Tag, InstalledActions, undefined) of
ActionState = #{mode := aggregated} ->
Records = [Data0 | [Data || {_, Data} <- Rest]],
run_aggregated_transfer(Records, ActionState);
undefined ->
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
end.
%%------------------------------------------------------------------------------
%% `ecpool_worker' API
%%------------------------------------------------------------------------------
connect(Opts0) ->
#{
account_name := AccountName,
account_key := AccountKey,
endpoint := Endpoint
} = maps:from_list(Opts0),
erlazure:start(#{account => AccountName, key => AccountKey, endpoint => Endpoint}).
do_create_append_blob(Worker, Container, Blob) ->
%% TODO: check container type before setting content type
Opts = [{content_type, "text/csv"}],
erlazure:put_append_blob(Worker, Container, Blob, Opts, infinity).
create_block_blob(Pool, Container, Blob) ->
ecpool:pick_and_do(Pool, {?MODULE, do_create_block_blob, [Container, Blob]}, no_handover).
do_create_block_blob(Worker, Container, Blob) ->
%% TODO: check container type before setting content type
Opts = [{content_type, "text/csv"}],
erlazure:put_block_blob(Worker, Container, Blob, <<>>, Opts, infinity).
append_data(Pool, Container, Blob, BlockId, IOData) ->
ecpool:pick_and_do(
Pool, {?MODULE, do_append_data, [Container, Blob, BlockId, IOData]}, no_handover
).
do_append_data(Worker, Container, Blob, BlockId, IOData) ->
erlazure:put_block(Worker, Container, Blob, BlockId, IOData, [], infinity).
put_block_list(Pool, Container, Blob, BlockRefs) ->
ecpool:pick_and_do(
Pool, {?MODULE, do_put_block_list, [Container, Blob, BlockRefs]}, no_handover
).
do_put_block_list(Worker, Container, Blob, BlockRefs) ->
%% TODO: check container type before setting content type
Opts = [{req_opts, [{headers, [{"x-ms-blob-content-type", "text/csv"}]}]}],
erlazure:put_block_list(Worker, Container, Blob, BlockRefs, Opts, infinity).
put_block_blob(Pool, Container, Blob, IOData) ->
ecpool:pick_and_do(Pool, {?MODULE, do_put_block_blob, [Container, Blob, IOData]}, no_handover).
do_put_block_blob(Worker, Container, Blob, IOData) ->
erlazure:put_block_blob(Worker, Container, Blob, IOData, [], infinity).
do_health_check(Worker) ->
case erlazure:list_containers(Worker, [], infinity) of
{error, _} ->
error;
{L, _} when is_list(L) ->
ok
end.
list_blobs(Pool, Container) ->
ecpool:pick_and_do(Pool, {?MODULE, do_list_blobs, [Container]}, no_handover).
do_list_blobs(Worker, Container) ->
case erlazure:list_blobs(Worker, Container, [], infinity) of
{error, _} ->
error;
{L, _} when is_list(L) ->
ok
end.
%%------------------------------------------------------------------------------
%% `emqx_connector_aggreg_delivery' API
%%------------------------------------------------------------------------------
-spec init_transfer_state(buffer(), transfer_opts()) ->
transfer_state().
init_transfer_state(Buffer, Opts) ->
#{
upload_options := #{
action := ActionName,
blob := BlobTemplate,
container := Container,
max_block_size := MaxBlockSize,
min_block_size := MinBlockSize,
pool := Pool
}
} = Opts,
Blob = mk_blob_name_key(Buffer, ActionName, BlobTemplate),
#{
blob => Blob,
buffer => [],
buffer_size => 0,
container => Container,
max_block_size => MaxBlockSize,
min_block_size => MinBlockSize,
next_block => queue:new(),
num_blocks => 0,
pool => Pool,
started => false
}.
mk_blob_name_key(Buffer, ActionName, BlobTemplate) ->
emqx_template:render_strict(BlobTemplate, {?MODULE, {ActionName, Buffer}}).
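%% Appended data is buffered in memory until at least `min_block_size' bytes
%% accumulate; only then is a block queued for upload, which keeps the number
%% of staged blocks low.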
-spec process_append(iodata(), transfer_state()) ->
transfer_state().
process_append(IOData, TransferState0) ->
#{
buffer := Buffer,
buffer_size := BufferSize0,
min_block_size := MinBlockSize,
next_block := NextBlock0
} = TransferState0,
Size = iolist_size(IOData),
case Size + BufferSize0 >= MinBlockSize of
true ->
%% Block is ready to be written.
TransferState0#{
buffer := [],
buffer_size := 0,
next_block := queue:in([Buffer, IOData], NextBlock0)
};
false ->
TransferState0#{
buffer := [Buffer, IOData],
buffer_size := BufferSize0 + Size
}
end.
-spec process_write(transfer_state()) ->
{ok, transfer_state()} | {error, term()}.
process_write(TransferState0 = #{started := false}) ->
#{
pool := Pool,
blob := Blob,
container := Container
} = TransferState0,
%% TODO
%% Possible optimization: if the whole buffer fits the 5000 MiB `put_block_blob'
%% limit, we could upload the whole thing here.
case create_block_blob(Pool, Container, Blob) of
{ok, _} ->
TransferState = TransferState0#{started := true},
process_write(TransferState);
{error, Reason} ->
{error, Reason}
end;
process_write(TransferState0 = #{started := true}) ->
#{
next_block := NextBlock0
} = TransferState0,
case queue:out(NextBlock0) of
{{value, Block}, NextBlock} ->
?tp(azure_blob_storage_will_write_chunk, #{}),
do_process_write(Block, TransferState0#{next_block := NextBlock});
{empty, _} ->
{ok, TransferState0}
end.
do_process_write(IOData, TransferState0 = #{started := true}) ->
#{
blob := Blob,
container := Container,
num_blocks := NumBlocks,
pool := Pool
} = TransferState0,
case append_data(Pool, Container, Blob, block_id(NumBlocks), IOData) of
{ok, _} ->
TransferState = TransferState0#{num_blocks := NumBlocks + 1},
process_write(TransferState);
{error, Reason} ->
{error, Reason}
end.
-spec process_complete(transfer_state()) ->
{ok, term()}.
process_complete(TransferState) ->
#{
blob := Blob,
buffer := Buffer,
buffer_size := BufferSize,
container := Container,
num_blocks := NumBlocks0,
pool := Pool
} = TransferState,
%% Flush any left-over data
NumBlocks =
case BufferSize > 0 of
true ->
{ok, #{num_blocks := NumBlocks1}} = do_process_write(Buffer, TransferState),
NumBlocks1;
false ->
NumBlocks0
end,
BlockRefs = [{block_id(N), latest} || N <- lists:seq(0, NumBlocks - 1)],
case put_block_list(Pool, Container, Blob, BlockRefs) of
{ok, _} ->
{ok, #{num_blocks => NumBlocks}};
{error, Reason} ->
exit({upload_failed, Reason})
end.
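%% For reference, the three callbacks above boil down to Azure's staged
%% block-blob upload: create the blob, stage blocks, then commit the block
%% list. A minimal sketch (hypothetical helper, not part of this module's API;
%% assumes a started `erlazure' worker and an existing container):
staged_upload_sketch(Worker, Container, Blob, Chunks) ->
    %% Create an empty block blob to stage blocks against.
    {ok, _} = erlazure:put_block_blob(Worker, Container, Blob, <<>>, [], infinity),
    %% Stage each chunk as a block under a fixed-width block id.
    Ids = [block_id(N) || N <- lists:seq(0, length(Chunks) - 1)],
    lists:foreach(
        fun({Id, Chunk}) ->
            {ok, _} = erlazure:put_block(Worker, Container, Blob, Id, Chunk, [], infinity)
        end,
        lists:zip(Ids, Chunks)
    ),
    %% Commit the staged blocks in order; only then does the blob become readable.
    erlazure:put_block_list(Worker, Container, Blob, [{Id, latest} || Id <- Ids], [], infinity).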
%%------------------------------------------------------------------------------
%% `emqx_template' API
%%------------------------------------------------------------------------------
-spec lookup(emqx_template:accessor(), {_Name, buffer()}) ->
{ok, integer() | string()} | {error, undefined}.
lookup([<<"action">>], {ActionName, _Buffer}) ->
{ok, mk_fs_safe_string(ActionName)};
lookup([<<"node">>], {_ActionName, _Buffer}) ->
{ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
lookup(Accessor, {_ActionName, Buffer}) ->
lookup_buffer_var(Accessor, Buffer);
lookup(_Accessor, _Context) ->
{error, undefined}.
lookup_buffer_var(Accessor, Buffer) ->
case emqx_connector_aggreg_buffer_ctx:lookup(Accessor, Buffer) of
{ok, String} when is_list(String) ->
{ok, mk_fs_safe_string(String)};
{ok, Value} ->
{ok, Value};
{error, Reason} ->
{error, Reason}
end.
mk_fs_safe_string(String) ->
unicode:characters_to_binary(string:replace(String, ":", "_", all)).
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
-spec install_action(action_config(), connector_state()) -> action_state().
install_action(#{parameters := #{mode := direct}} = ActionConfig, _ConnState) ->
#{
parameters := #{
mode := Mode = direct,
container := ContainerTemplateStr,
blob := BlobTemplateStr,
content := ContentTemplateStr,
max_block_size := MaxBlockSize
}
} = ActionConfig,
ContainerTemplate = emqx_template:parse(ContainerTemplateStr),
BlobTemplate = emqx_template:parse(BlobTemplateStr),
ContentTemplate = emqx_template:parse(ContentTemplateStr),
#{
mode => Mode,
container => ContainerTemplate,
blob => BlobTemplate,
content => ContentTemplate,
max_block_size => MaxBlockSize
};
install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState) ->
#{pool_name := Pool} = ConnState,
#{
bridge_name := Name,
parameters := #{
mode := Mode = aggregated,
aggregation := #{
container := ContainerOpts,
max_records := MaxRecords,
time_interval := TimeInterval
},
container := ContainerName,
blob := BlobTemplateStr,
max_block_size := MaxBlockSize,
min_block_size := MinBlockSize
}
} = ActionConfig,
Type = ?ACTION_TYPE_BIN,
AggregId = {Type, Name},
Blob = mk_blob_name_template(BlobTemplateStr),
AggregOpts = #{
max_records => MaxRecords,
time_interval => TimeInterval,
work_dir => work_dir(Type, Name)
},
TransferOpts = #{
action => Name,
blob => Blob,
container => ContainerName,
max_block_size => MaxBlockSize,
min_block_size => MinBlockSize,
pool => Pool
},
DeliveryOpts = #{
callback_module => ?MODULE,
container => ContainerOpts,
upload_options => TransferOpts
},
_ = ?AGGREG_SUP:delete_child(AggregId),
{ok, SupPid} = ?AGGREG_SUP:start_child(#{
id => AggregId,
start =>
{emqx_connector_aggreg_upload_sup, start_link, [AggregId, AggregOpts, DeliveryOpts]},
type => supervisor,
restart => permanent
}),
#{
mode => Mode,
name => Name,
container => ContainerName,
aggreg_id => AggregId,
supervisor => SupPid,
on_stop => {?AGGREG_SUP, delete_child, [AggregId]}
}.
-spec stop_action(action_config()) -> ok | {error, any()}.
stop_action(#{on_stop := {M, F, A}}) ->
apply(M, F, A);
stop_action(_) ->
ok.
run_direct_transfer(Data, ConnResId, ActionResId, ActionState) ->
#{
container := ContainerTemplate,
blob := BlobTemplate,
content := ContentTemplate,
max_block_size := MaxBlockSize
} = ActionState,
Container = render_container(ContainerTemplate, Data),
Blob = render_blob(BlobTemplate, Data),
Content = render_content(ContentTemplate, Data),
emqx_trace:rendered_action_template(ActionResId, #{
container => Container,
blob => Blob,
content => #emqx_trace_format_func_data{
function = fun unicode:characters_to_binary/1,
data = Content
}
}),
case iolist_size(Content) > MaxBlockSize of
true ->
error({unrecoverable_error, payload_too_large});
false ->
ok
end,
case put_block_blob(ConnResId, Container, Blob, Content) of
{ok, created} ->
?tp(azure_blob_storage_bridge_connector_upload_ok, #{instance_id => ConnResId}),
ok;
{error, Reason} ->
?tp(
azure_blob_storage_bridge_direct_upload_error,
#{instance_id => ConnResId, reason => Reason}
),
{error, map_error(Reason)}
end.
run_aggregated_transfer(Records, #{aggreg_id := AggregId}) ->
Timestamp = erlang:system_time(second),
case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of
ok ->
ok;
{error, Reason} ->
{error, {unrecoverable_error, Reason}}
end.
work_dir(Type, Name) ->
filename:join([emqx:data_dir(), bridge, Type, Name]).
-spec mk_blob_name_template(template_str()) -> emqx_template:str().
mk_blob_name_template(TemplateStr) ->
Template = emqx_template:parse(TemplateStr),
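%% Rendering against an empty context reports every template variable as a
%% binding error, which gives us the set of variables the template uses.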
{_, BindingErrors} = emqx_template:render(Template, #{}),
{UsedBindings, _} = lists:unzip(BindingErrors),
SuffixTemplate = mk_suffix_template(UsedBindings),
case emqx_template:is_const(SuffixTemplate) of
true ->
Template;
false ->
Template ++ SuffixTemplate
end.
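%% E.g. a constant blob template `"out/log"' expands to
%% `"out/log/${action}/${node}/${datetime.rfc3339utc}/${sequence}"', keeping
%% blob names unique across actions, nodes and uploads; a template that already
%% uses all required bindings is left untouched.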
mk_suffix_template(UsedBindings) ->
RequiredBindings = ["action", "node", "datetime.", "sequence"],
SuffixBindings = [
mk_default_binding(RB)
|| RB <- RequiredBindings,
lists:all(fun(UB) -> string:prefix(UB, RB) == nomatch end, UsedBindings)
],
SuffixTemplate = [["/", B] || B <- SuffixBindings],
emqx_template:parse(SuffixTemplate).
mk_default_binding("datetime.") ->
"${datetime.rfc3339utc}";
mk_default_binding(Binding) ->
"${" ++ Binding ++ "}".
render_container(Template, Data) ->
case emqx_template:render(Template, {emqx_jsonish, Data}) of
{Result, []} ->
iolist_to_string(Result);
{_, Errors} ->
error({unrecoverable_error, {container_undefined, Errors}})
end.
render_blob(Template, Data) ->
%% NOTE: ignoring errors here, missing variables will be rendered as `"undefined"`.
{Result, _Errors} = emqx_template:render(Template, {emqx_jsonish, Data}),
iolist_to_string(Result).
render_content(Template, Data) ->
%% NOTE: ignoring errors here, missing variables will be rendered as `"undefined"`.
{Result, _Errors} = emqx_template:render(Template, {emqx_jsonish, Data}),
Result.
iolist_to_string(IOList) ->
unicode:characters_to_list(IOList).
channel_status(#{mode := direct}, _ConnResId) ->
%% There's nothing in particular to check for in this mode; the connector health check
%% already verifies that we're able to use the client to list containers.
?status_connected;
channel_status(#{mode := aggregated} = ActionState, ConnResId) ->
#{container := Container, aggreg_id := AggregId} = ActionState,
%% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
Timestamp = erlang:system_time(second),
ok = emqx_connector_aggregator:tick(AggregId, Timestamp),
ok = check_container_accessible(ConnResId, Container),
ok = check_aggreg_upload_errors(AggregId),
?status_connected.
health_check(ConnResId) ->
case
emqx_resource_pool:health_check_workers(
ConnResId,
fun ?MODULE:do_health_check/1,
emqx_resource_pool:health_check_timeout(),
#{return_values => true}
)
of
{ok, []} ->
?status_disconnected;
{ok, Values} ->
AllOk = lists:all(fun(S) -> S =:= ok end, Values),
case AllOk of
true ->
?status_connected;
false ->
?status_disconnected
end;
{error, _} ->
?status_disconnected
end.
map_error({failed_connect, _} = Reason) ->
{recoverable_error, Reason};
map_error(Reason) ->
{unrecoverable_error, Reason}.
check_aggreg_upload_errors(AggregId) ->
case emqx_connector_aggregator:take_error(AggregId) of
[Error] ->
%% TODO
%% This approach means that, for example, 3 upload failures will cause
%% the channel to be marked as unhealthy for 3 consecutive health checks.
ErrorMessage = emqx_utils:format(Error),
throw({unhealthy_target, ErrorMessage});
[] ->
ok
end.
check_container_accessible(Pool, Container) ->
list_blobs(Pool, Container).
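%% Azure requires all block ids within a blob to be of equal length, hence the
%% fixed-width, zero-padded sequence number.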
block_id(N) ->
NumDigits = 32,
list_to_binary(string:pad(integer_to_list(N), NumDigits, leading, $0)).

View File

@@ -0,0 +1,73 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_connector_info).
-behaviour(emqx_connector_info).
-include("emqx_bridge_azure_blob_storage.hrl").
%% `emqx_connector_info' API
-export([
type_name/0,
bridge_types/0,
resource_callback_module/0,
config_schema/0,
schema_module/0,
api_schema/1
]).
%% API
-export([]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
-define(SCHEMA_MOD, emqx_bridge_azure_blob_storage_connector_schema).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% `emqx_connector_info' API
%%------------------------------------------------------------------------------
type_name() ->
?CONNECTOR_TYPE.
bridge_types() ->
[?ACTION_TYPE].
resource_callback_module() ->
emqx_bridge_azure_blob_storage_connector.
config_schema() ->
{?CONNECTOR_TYPE,
hoconsc:mk(
hoconsc:map(
name,
hoconsc:ref(
?SCHEMA_MOD,
"config_connector"
)
),
#{
desc => <<"Azure Blob Storage Connector Config">>,
required => false
}
)}.
schema_module() ->
?SCHEMA_MOD.
api_schema(Method) ->
emqx_connector_schema:api_ref(
?SCHEMA_MOD, ?CONNECTOR_TYPE_BIN, Method ++ "_connector"
).
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

View File

@@ -0,0 +1,142 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_connector_schema).
-behaviour(hocon_schema).
-behaviour(emqx_connector_examples).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
%% `hocon_schema' API
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
%% `emqx_connector_examples' API
-export([
connector_examples/1
]).
%% API
-export([]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
-define(CONNECTOR_TYPE, azure_blob_storage).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
namespace() ->
"connector_azure_blob_storage".
roots() ->
[].
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, fields(connector_config));
fields("config_connector") ->
emqx_connector_schema:common_fields() ++ fields(connector_config);
fields(connector_config) ->
[
{account_name,
mk(string(), #{
required => true,
desc => ?DESC("account_name")
})},
{account_key,
emqx_schema_secret:mk(
#{
desc => ?DESC("account_key")
}
)},
{endpoint,
mk(
string(),
#{
required => false,
importance => ?IMPORTANCE_HIDDEN
}
)}
] ++
emqx_connector_schema:resource_opts_ref(?MODULE, resource_opts);
fields(resource_opts) ->
emqx_connector_schema:resource_opts_fields().
desc("config_connector") ->
?DESC("config_connector");
desc(resource_opts) ->
?DESC(emqx_resource_schema, resource_opts);
desc(_Name) ->
undefined.
%%-------------------------------------------------------------------------------------------------
%% `emqx_connector_examples' API
%%-------------------------------------------------------------------------------------------------
connector_examples(Method) ->
[
#{
<<"abs">> => #{
summary => <<"Azure Blob Storage Connector">>,
value => connector_example(Method)
}
}
].
connector_example(get) ->
maps:merge(
connector_example(put),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
}
);
connector_example(post) ->
maps:merge(
connector_example(put),
#{
type => atom_to_binary(?CONNECTOR_TYPE),
name => <<"my_connector">>
}
);
connector_example(put) ->
#{
enable => true,
description => <<"My connector">>,
account_name => <<"my_account_name">>,
account_key => <<"******">>,
resource_opts => #{
health_check_interval => <<"45s">>,
start_after_created => true,
start_timeout => <<"5s">>
}
}.
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
mk(Type, Meta) -> hoconsc:mk(Type, Meta).

View File

@@ -0,0 +1,54 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_sup).
%% API
-export([
start_link/0,
start_child/1,
delete_child/1
]).
%% `supervisor' API
-export([init/1]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_child(ChildSpec) ->
supervisor:start_child(?MODULE, ChildSpec).
delete_child(ChildId) ->
case supervisor:terminate_child(?MODULE, ChildId) of
ok ->
supervisor:delete_child(?MODULE, ChildId);
Error ->
Error
end.
%%------------------------------------------------------------------------------
%% `supervisor' API
%%------------------------------------------------------------------------------
init([]) ->
SupFlags = #{
strategy => one_for_one,
intensity => 1,
period => 1
},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

View File

@@ -0,0 +1,708 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_azure_blob_storage_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_common_test_helpers, [on_exit/1]).
-import(emqx_utils_conv, [bin/1, str/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("erlazure/include/erlazure.hrl").
-include("emqx_bridge_azure_blob_storage.hrl").
-define(ACCOUNT_NAME_BIN, <<"devstoreaccount1">>).
-define(ACCOUNT_KEY_BIN, <<
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsu"
"Fq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
>>).
-define(CONF_MAX_RECORDS, 100).
-define(CONF_COLUMN_ORDER, ?CONF_COLUMN_ORDER([])).
-define(CONF_COLUMN_ORDER(T), [
<<"publish_received_at">>,
<<"clientid">>,
<<"topic">>,
<<"payload">>,
<<"empty">>
| T
]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Endpoint = os:getenv("AZURITE_ENDPOINT", "http://toxiproxy:10000/"),
#{host := Host, port := Port} = uri_string:parse(Endpoint),
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
ProxyName = "azurite_plain",
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
true ->
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_bridge_azure_blob_storage,
emqx_bridge,
emqx_rule_engine,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _Api} = emqx_common_test_http:create_default_app(),
[
{apps, Apps},
{proxy_name, ProxyName},
{proxy_host, ProxyHost},
{proxy_port, ProxyPort},
{endpoint, Endpoint}
| Config
];
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_azurite);
_ ->
{skip, no_azurite}
end
end.
end_per_suite(Config) ->
Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(TestCase, Config0) ->
ct:timetrap(timer:seconds(31)),
Endpoint = ?config(endpoint, Config0),
UniqueNum = integer_to_binary(erlang:unique_integer()),
Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>,
ConnectorConfig = connector_config(Name, Endpoint),
ContainerName = container_name(Name),
%% TODO: switch based on test
ActionConfig =
case lists:member(TestCase, direct_action_cases()) of
true ->
direct_action_config(#{
connector => Name,
parameters => #{container => ContainerName}
});
false ->
aggreg_action_config(#{
connector => Name,
parameters => #{container => ContainerName}
})
end,
Client = start_control_client(Endpoint),
ct:pal("container name: ~s", [ContainerName]),
ok = ensure_new_container(ContainerName, Client),
Config = [
{bridge_kind, action},
{action_type, ?ACTION_TYPE_BIN},
{action_name, Name},
{action_config, ActionConfig},
{connector_name, Name},
{connector_type, ?CONNECTOR_TYPE_BIN},
{connector_config, ConnectorConfig},
{container_name, ContainerName},
{client, Client}
| Config0
],
Config.
end_per_testcase(_Testcase, Config) ->
Client = ?config(client, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_common_test_helpers:call_janitor(),
ok = snabbkaffe:stop(),
stop_control_client(Client),
ok.
direct_action_cases() ->
[
t_sync_query,
t_sync_query_down,
t_max_block_size_direct_transfer
].
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
start_control_client(Endpoint) ->
{ok, Client} = erlazure:start(#{
endpoint => Endpoint,
account => binary_to_list(?ACCOUNT_NAME_BIN),
key => binary_to_list(?ACCOUNT_KEY_BIN)
}),
Client.
stop_control_client(Client) ->
gen_server:stop(Client).
container_name(Name) ->
IOList = re:replace(bin(Name), <<"[^a-z0-9-]">>, <<"-">>, [global]),
iolist_to_binary(IOList).
ensure_new_container(Name0, Client) ->
Name = str(Name0),
case erlazure:create_container(Client, Name) of
{ok, created} ->
ok;
{error, #{code := "ContainerAlreadyExists"}} ->
{ok, deleted} = erlazure:delete_container(Client, Name),
{ok, created} = erlazure:create_container(Client, Name),
ok
end,
on_exit(fun() -> {ok, deleted} = erlazure:delete_container(Client, Name) end),
ok.
connector_config(Name, Endpoint) ->
InnerConfigMap0 =
#{
<<"enable">> => true,
<<"tags">> => [<<"bridge">>],
<<"description">> => <<"my cool bridge">>,
<<"endpoint">> => Endpoint,
%% Default Azurite credentials
%% See: https://github.com/Azure/Azurite/blob/main/README.md#default-storage-account
<<"account_name">> => ?ACCOUNT_NAME_BIN,
<<"account_key">> => ?ACCOUNT_KEY_BIN,
<<"resource_opts">> =>
#{
<<"health_check_interval">> => <<"1s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>
}
},
emqx_bridge_v2_testlib:parse_and_check_connector(?ACTION_TYPE_BIN, Name, InnerConfigMap0).
direct_action_config(Overrides0) ->
Overrides = emqx_utils_maps:binary_key_map(Overrides0),
CommonConfig =
#{
<<"enable">> => true,
<<"connector">> => <<"please override">>,
<<"parameters">> =>
#{
<<"mode">> => <<"direct">>,
<<"container">> => <<"${payload.c}">>,
<<"blob">> => <<"${payload.b}">>,
<<"content">> => <<"${.}">>
},
<<"resource_opts">> => #{
<<"batch_size">> => 1,
<<"batch_time">> => <<"0ms">>,
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"1s">>,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"15s">>,
<<"resume_interval">> => <<"1s">>,
<<"worker_pool_size">> => <<"1">>
}
},
emqx_utils_maps:deep_merge(CommonConfig, Overrides).
aggreg_action_config(Overrides0) ->
Overrides = emqx_utils_maps:binary_key_map(Overrides0),
CommonConfig =
#{
<<"enable">> => true,
<<"connector">> => <<"please override">>,
<<"parameters">> =>
#{
<<"mode">> => <<"aggregated">>,
<<"aggregation">> => #{
<<"container">> => #{
<<"type">> => <<"csv">>,
<<"column_order">> => ?CONF_COLUMN_ORDER
},
<<"time_interval">> => <<"4s">>,
<<"max_records">> => ?CONF_MAX_RECORDS
},
<<"container">> => <<"mycontainer">>,
<<"blob">> => <<"${action}/${node}/${datetime.rfc3339}/${sequence}">>
},
<<"resource_opts">> => #{
<<"batch_size">> => 100,
<<"batch_time">> => <<"10ms">>,
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"1s">>,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"15s">>,
<<"resume_interval">> => <<"1s">>,
<<"worker_pool_size">> => <<"1">>
}
},
emqx_utils_maps:deep_merge(CommonConfig, Overrides).
aggreg_id(BridgeName) ->
{?ACTION_TYPE_BIN, BridgeName}.
mk_message_event(ClientId, Topic, Payload) ->
Message = emqx_message:make(bin(ClientId), bin(Topic), Payload),
{Event, _} = emqx_rule_events:eventmsg_publish(Message),
emqx_utils_maps:binary_key_map(Event).
mk_message({ClientId, Topic, Payload}) ->
emqx_message:make(bin(ClientId), bin(Topic), Payload).
publish_messages(MessageEvents) ->
lists:foreach(fun emqx:publish/1, MessageEvents).
publish_messages_delayed(MessageEvents, Delay) ->
lists:foreach(
fun(Msg) ->
emqx:publish(Msg),
ct:sleep(Delay)
end,
MessageEvents
).
list_blobs(Config) ->
Client = ?config(client, Config),
ContainerName = ?config(container_name, Config),
{Blobs, _} = erlazure:list_blobs(Client, str(ContainerName)),
Blobs.
get_blob(BlobName, Config) ->
Client = ?config(client, Config),
ContainerName = ?config(container_name, Config),
{ok, Blob} = erlazure:get_blob(Client, str(ContainerName), str(BlobName)),
Blob.
get_and_decode_event(BlobName, Config) ->
maps:update_with(
<<"payload">>,
fun(Raw) -> emqx_utils_json:decode(Raw, [return_maps]) end,
emqx_utils_json:decode(get_blob(BlobName, Config), [return_maps])
).
list_committed_blocks(Config) ->
Client = ?config(client, Config),
Container0 = ?config(container_name, Config),
Container = emqx_utils_conv:str(Container0),
{Blobs, _} = erlazure:list_blobs(Client, Container),
lists:map(
fun(#cloud_blob{name = BlobName}) ->
{Blocks, _} = erlazure:get_block_list(Client, Container, BlobName),
{BlobName, [{Id, Type} || #blob_block{id = Id, type = Type} <- Blocks]}
end,
Blobs
).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_start_stop(Config) ->
ok = emqx_bridge_v2_testlib:t_start_stop(Config, azure_blob_storage_stop),
ok.
t_create_via_http(Config) ->
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
ok.
t_on_get_status(Config) ->
ok = emqx_bridge_v2_testlib:t_on_get_status(Config),
ok.
%% Testing non-aggregated / direct action
t_sync_query(Config) ->
ContainerName = ?config(container_name, Config),
Topic = <<"t/a">>,
Payload0 = #{
<<"b">> => <<"myblob">>,
<<"c">> => ContainerName,
<<"x">> => <<"first data">>
},
Payload0Bin = emqx_utils_json:encode(Payload0),
ClientId = <<"some_client">>,
MsgEvent0 = mk_message_event(ClientId, Topic, Payload0Bin),
ok = emqx_bridge_v2_testlib:t_sync_query(
Config,
fun() -> MsgEvent0 end,
fun(Res) -> ?assertMatch(ok, Res) end,
azure_blob_storage_bridge_connector_upload_ok
),
Decoded0 = get_and_decode_event(<<"myblob">>, Config),
?assertMatch(#{<<"payload">> := #{<<"x">> := <<"first data">>}}, Decoded0),
%% Test sending the same payload again, so that the same blob is written to.
ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
Payload1 = Payload0#{<<"x">> => <<"new data">>},
Payload1Bin = emqx_utils_json:encode(Payload1),
MsgEvent1 = mk_message_event(ClientId, Topic, Payload1Bin),
Message = {BridgeId, MsgEvent1},
?assertMatch(ok, emqx_resource:simple_sync_query(ResourceId, Message)),
Decoded1 = get_and_decode_event(<<"myblob">>, Config),
?assertMatch(#{<<"payload">> := #{<<"x">> := <<"new data">>}}, Decoded1),
ok.
%% Testing non-aggregated / direct action
t_sync_query_down(Config) ->
ContainerName = ?config(container_name, Config),
Payload0 = #{
<<"b">> => <<"myblob">>,
<<"c">> => ContainerName,
<<"x">> => <<"first data">>
},
Payload0Bin = emqx_utils_json:encode(Payload0),
ClientId = <<"some_client">>,
ok = emqx_bridge_v2_testlib:t_sync_query_down(
Config,
#{
make_message_fn => fun(Topic) -> mk_message({ClientId, Topic, Payload0Bin}) end,
enter_tp_filter =>
?match_event(#{
?snk_kind := azure_blob_storage_bridge_on_query_enter,
mode := direct
}),
error_tp_filter =>
?match_event(#{?snk_kind := azure_blob_storage_bridge_direct_upload_error}),
success_tp_filter =>
?match_event(#{?snk_kind := azure_blob_storage_bridge_connector_upload_ok})
}
),
ok.
t_aggreg_upload(Config) ->
ActionName = ?config(action_name, Config),
AggregId = aggreg_id(ActionName),
?check_trace(
#{timetrap => timer:seconds(30)},
begin
ActionNameString = unicode:characters_to_list(ActionName),
NodeString = atom_to_list(node()),
%% Create a bridge with the sample configuration.
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge_api(Config)),
{ok, _Rule} =
emqx_bridge_v2_testlib:create_rule_and_action_http(
?ACTION_TYPE_BIN, <<"">>, Config, #{
sql => <<
"SELECT"
" *,"
" strlen(payload) as psize,"
" unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at"
" FROM 'abs/#'"
>>
}
),
Messages = lists:map(fun mk_message/1, [
{<<"C1">>, T1 = <<"abs/a/b/c">>, P1 = <<"{\"hello\":\"world\"}">>},
{<<"C2">>, T2 = <<"abs/foo/bar">>, P2 = <<"baz">>},
{<<"C3">>, T3 = <<"abs/t/42">>, P3 = <<"">>},
%% Won't match rule filter
{<<"C4">>, <<"t/42">>, <<"won't appear in results">>}
]),
ok = publish_messages(Messages),
%% Wait until the delivery is completed.
?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
%% Check the uploaded objects.
_Uploads =
[#cloud_blob{name = BlobName, properties = UploadProps}] = list_blobs(Config),
?assertMatch(#{content_type := "text/csv"}, maps:from_list(UploadProps)),
?assertMatch(
[ActionNameString, NodeString, _Datetime, _Seq = "0"],
string:split(BlobName, "/", all)
),
Content = get_blob(BlobName, Config),
%% Verify that column order is respected.
?assertMatch(
{ok, [
?CONF_COLUMN_ORDER(_),
[_TS1, <<"C1">>, T1, P1, <<>> | _],
[_TS2, <<"C2">>, T2, P2, <<>> | _],
[_TS3, <<"C3">>, T3, P3, <<>> | _]
]},
erl_csv:decode(Content)
),
ok
end,
[]
),
ok.
%% This test verifies that the bridge will reuse an existing aggregation buffer after a
%% restart.
t_aggreg_upload_restart(Config) ->
ActionName = ?config(action_name, Config),
AggregId = aggreg_id(ActionName),
?check_trace(
#{timetrap => timer:seconds(30)},
begin
ActionNameString = unicode:characters_to_list(ActionName),
NodeString = atom_to_list(node()),
%% Create a bridge with the sample configuration.
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge_api(Config)),
{ok, _Rule} =
emqx_bridge_v2_testlib:create_rule_and_action_http(
?ACTION_TYPE_BIN, <<"">>, Config, #{
sql => <<
"SELECT"
" *,"
" strlen(payload) as psize,"
" unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at"
" FROM 'abs/#'"
>>
}
),
Messages = lists:map(fun mk_message/1, [
{<<"C1">>, T1 = <<"abs/a/b/c">>, P1 = <<"{\"hello\":\"world\"}">>},
{<<"C2">>, T2 = <<"abs/foo/bar">>, P2 = <<"baz">>},
{<<"C3">>, T3 = <<"abs/t/42">>, P3 = <<"">>}
]),
ok = publish_messages(Messages),
{ok, _} = ?block_until(#{
?snk_kind := connector_aggreg_records_written, action := AggregId
}),
{ok, _} =
?wait_async_action(
begin
%% Restart the bridge.
{ok, {{_, 204, _}, _, _}} = emqx_bridge_v2_testlib:disable_kind_http_api(
Config
),
{ok, {{_, 204, _}, _, _}} = emqx_bridge_v2_testlib:enable_kind_http_api(
Config
),
%% Send some more messages.
ok = publish_messages(Messages)
end,
#{?snk_kind := connector_aggreg_records_written, action := AggregId}
),
%% Wait until the delivery is completed.
?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
%% Check there's still only one upload.
[#cloud_blob{name = BlobName, properties = UploadProps}] = list_blobs(Config),
?assertMatch(#{content_type := "text/csv"}, maps:from_list(UploadProps)),
?assertMatch(
[ActionNameString, NodeString, _Datetime, _Seq = "0"],
string:split(BlobName, "/", all)
),
Content = get_blob(BlobName, Config),
?assertMatch(
{ok, [
?CONF_COLUMN_ORDER(_),
[_TS1, <<"C1">>, T1, P1, <<>> | _],
[_TS2, <<"C2">>, T2, P2, <<>> | _],
[_TS3, <<"C3">>, T3, P3, <<>> | _],
[_TS1, <<"C1">>, T1, P1, <<>> | _],
[_TS2, <<"C2">>, T2, P2, <<>> | _],
[_TS3, <<"C3">>, T3, P3, <<>> | _]
]},
erl_csv:decode(Content)
),
ok
end,
[]
),
ok.
%% This test verifies that the bridge can recover from buffer file corruption, and does
%% so while preserving uncompromised data.
t_aggreg_upload_restart_corrupted(Config) ->
ActionName = ?config(action_name, Config),
AggregId = aggreg_id(ActionName),
BatchSize = ?CONF_MAX_RECORDS div 2,
?check_trace(
#{timetrap => timer:seconds(30)},
begin
%% Create a bridge with the sample configuration.
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge_api(Config)),
{ok, _Rule} =
emqx_bridge_v2_testlib:create_rule_and_action_http(
?ACTION_TYPE_BIN, <<"">>, Config, #{
sql => <<
"SELECT"
" *,"
" strlen(payload) as psize,"
" unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at"
" FROM 'abs/#'"
>>
}
),
Messages1 = [
{integer_to_binary(N), <<"abs/a/b/c">>, <<"{\"hello\":\"world\"}">>}
|| N <- lists:seq(1, BatchSize)
],
%% Ensure that they span multiple batch queries.
{ok, {ok, _}} =
?wait_async_action(
publish_messages_delayed(lists:map(fun mk_message/1, Messages1), 1),
#{?snk_kind := connector_aggreg_records_written, action := AggregId}
),
ct:pal("first batch's records have been written"),
%% Find out the buffer file.
{ok, #{filename := Filename}} = ?block_until(
#{?snk_kind := connector_aggreg_buffer_allocated, action := AggregId}
),
ct:pal("new buffer allocated"),
%% Stop the bridge, corrupt the buffer file, and restart the bridge.
{ok, {{_, 204, _}, _, _}} = emqx_bridge_v2_testlib:disable_kind_http_api(Config),
BufferFileSize = filelib:file_size(Filename),
ok = emqx_connector_aggregator_test_helpers:truncate_at(Filename, BufferFileSize div 2),
{ok, {{_, 204, _}, _, _}} = emqx_bridge_v2_testlib:enable_kind_http_api(Config),
%% Send some more messages.
Messages2 = [
{integer_to_binary(N), <<"abs/a/b/c">>, <<"{\"hello\":\"world\"}">>}
|| N <- lists:seq(1, BatchSize)
],
ok = publish_messages_delayed(lists:map(fun mk_message/1, Messages2), 1),
ct:pal("published second batch"),
%% Wait until the delivery is completed.
{ok, _} = ?block_until(#{
?snk_kind := connector_aggreg_delivery_completed, action := AggregId
}),
ct:pal("delivery completed"),
%% Check that the upload contains part of the first batch and all of the second batch.
[#cloud_blob{name = BlobName}] = list_blobs(Config),
{ok, CSV = [_Header | Rows]} = erl_csv:decode(get_blob(BlobName, Config)),
NRows = length(Rows),
?assert(NRows > BatchSize, CSV),
?assertEqual(
lists:sublist(Messages1, NRows - BatchSize) ++ Messages2,
[{ClientID, Topic, Payload} || [_TS, ClientID, Topic, Payload | _] <- Rows],
CSV
),
ok
end,
[]
),
ok.
%% This test verifies that the bridge will finish uploading a buffer file after a restart.
t_aggreg_pending_upload_restart(Config) ->
ActionName = ?config(action_name, Config),
AggregId = aggreg_id(ActionName),
?check_trace(
#{timetrap => timer:seconds(30)},
begin
%% Create a bridge with the sample configuration.
?assertMatch(
{ok, _Bridge},
emqx_bridge_v2_testlib:create_bridge_api(
Config,
#{
<<"parameters">> =>
#{
<<"min_block_size">> => <<"1024B">>
}
}
)
),
{ok, _Rule} =
emqx_bridge_v2_testlib:create_rule_and_action_http(
?ACTION_TYPE_BIN, <<"">>, Config, #{
sql => <<
"SELECT"
" *,"
" strlen(payload) as psize,"
" unix_ts_to_rfc3339(publish_received_at, 'millisecond') as publish_received_at"
" FROM 'abs/#'"
>>
}
),
%% Send a few large messages that will require multipart upload.
%% Ensure that they span multiple batch queries.
Payload0 = iolist_to_binary(lists:duplicate(128, "PAYLOAD!")),
%% Payload0 = iolist_to_binary(lists:duplicate(128 * 1024, "PAYLOAD!")),
Messages = [
{integer_to_binary(N), <<"abs/a/b/c">>, Payload0}
|| N <- lists:seq(1, 10)
],
{ok, {ok, _}} =
?wait_async_action(
publish_messages_delayed(lists:map(fun mk_message/1, Messages), 10),
%% Wait until the multipart upload is started.
#{?snk_kind := azure_blob_storage_will_write_chunk}
),
ct:pal("published messages"),
%% Stop the bridge.
{ok, {{_, 204, _}, _, _}} = emqx_bridge_v2_testlib:disable_kind_http_api(
Config
),
ct:pal("stopped bridge"),
%% Verify that pending uploads have been gracefully aborted.
?assertMatch([{_Name, []}], list_committed_blocks(Config)),
%% Restart the bridge.
{ok, {{_, 204, _}, _, _}} = emqx_bridge_v2_testlib:enable_kind_http_api(
Config
),
ct:pal("restarted bridge"),
%% Wait until the delivery is completed.
{ok, _} = ?block_until(#{
?snk_kind := connector_aggreg_delivery_completed, action := AggregId
}),
ct:pal("delivery complete"),
%% Check that the delivery contains all the messages.
?assertMatch([{_Name, [_ | _]}], list_committed_blocks(Config)),
[#cloud_blob{name = BlobName}] = list_blobs(Config),
{ok, CSV = [_Header | Rows]} = erl_csv:decode(get_blob(BlobName, Config)),
?assertEqual(
Messages,
[{ClientID, Topic, Payload} || [_TS, ClientID, Topic, Payload | _] <- Rows],
CSV
),
ok
end,
[]
),
ok.
%% Checks that we return an unrecoverable error if the payload exceeds `max_block_size'.
t_max_block_size_direct_transfer(Config) ->
{ok, _Bridge} = emqx_bridge_v2_testlib:create_bridge_api(
Config,
#{<<"parameters">> => #{<<"max_block_size">> => <<"1B">>}}
),
Topic = <<"t/a">>,
ClientId = <<"myclient">>,
ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
Payload = <<"too large">>,
PayloadBin = emqx_utils_json:encode(Payload),
MsgEvent = mk_message_event(ClientId, Topic, PayloadBin),
Message = {BridgeId, MsgEvent},
?assertMatch(
{error, {unrecoverable_error, payload_too_large}},
emqx_resource:simple_sync_query(ResourceId, Message)
),
ok.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_s3, [
{description, "EMQX Enterprise S3 Bridge"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [
kernel,

View File

@ -44,11 +44,7 @@ fields(s3_connector_config) ->
emqx_s3_schema:fields(s3_client) ++
emqx_connector_schema:resource_opts_ref(?MODULE, s3_connector_resource_opts);
fields(s3_connector_resource_opts) ->
CommonOpts = emqx_connector_schema:common_resource_opts_subfields(),
lists:filter(
fun({N, _}) -> lists:member(N, CommonOpts) end,
emqx_connector_schema:resource_opts_fields()
).
emqx_connector_schema:resource_opts_fields().
desc("config_connector") ->
?DESC(config_connector);

View File

@ -407,8 +407,8 @@ iolist_to_string(IOList) ->
%% `emqx_connector_aggreg_delivery` APIs
-spec init_transfer_state(buffer_map(), map()) -> emqx_s3_upload:t().
init_transfer_state(BufferMap, Opts) ->
-spec init_transfer_state(buffer(), map()) -> emqx_s3_upload:t().
init_transfer_state(Buffer, Opts) ->
#{
bucket := Bucket,
upload_options := UploadOpts,
@ -416,11 +416,11 @@ init_transfer_state(BufferMap, Opts) ->
uploader_config := UploaderConfig
} = Opts,
Client = emqx_s3_client:create(Bucket, Config),
Key = mk_object_key(BufferMap, Opts),
Key = mk_object_key(Buffer, Opts),
emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).
mk_object_key(BufferMap, #{action := AggregId, key := Template}) ->
emqx_template:render_strict(Template, {?MODULE, {AggregId, BufferMap}}).
mk_object_key(Buffer, #{action := AggregId, key := Template}) ->
emqx_template:render_strict(Template, {?MODULE, {AggregId, Buffer}}).
process_append(Writes, Upload0) ->
{ok, Upload} = emqx_s3_upload:append(Writes, Upload0),
@ -454,34 +454,26 @@ process_terminate(Upload) ->
%% `emqx_template` APIs
-spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) ->
-spec lookup(emqx_template:accessor(), {_Name, buffer()}) ->
{ok, integer() | string()} | {error, undefined}.
lookup([<<"action">>], {_AggregId = {_Type, Name}, _Buffer}) ->
{ok, mk_fs_safe_string(Name)};
lookup(Accessor, {_AggregId, Buffer = #{}}) ->
lookup([<<"node">>], {_AggregId, _Buffer}) ->
{ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
lookup(Accessor, {_AggregId, Buffer}) ->
lookup_buffer_var(Accessor, Buffer);
lookup(_Accessor, _Context) ->
{error, undefined}.
lookup_buffer_var([<<"datetime">>, Format], #{since := Since}) ->
{ok, format_timestamp(Since, Format)};
lookup_buffer_var([<<"datetime_until">>, Format], #{until := Until}) ->
{ok, format_timestamp(Until, Format)};
lookup_buffer_var([<<"sequence">>], #{seq := Seq}) ->
{ok, Seq};
lookup_buffer_var([<<"node">>], #{}) ->
{ok, mk_fs_safe_string(atom_to_binary(erlang:node()))};
lookup_buffer_var(_Binding, _Context) ->
{error, undefined}.
format_timestamp(Timestamp, <<"rfc3339utc">>) ->
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]),
mk_fs_safe_string(String);
format_timestamp(Timestamp, <<"rfc3339">>) ->
String = calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]),
mk_fs_safe_string(String);
format_timestamp(Timestamp, <<"unix">>) ->
Timestamp.
lookup_buffer_var(Accessor, Buffer) ->
case emqx_connector_aggreg_buffer_ctx:lookup(Accessor, Buffer) of
{ok, String} when is_list(String) ->
{ok, mk_fs_safe_string(String)};
{ok, Value} ->
{ok, Value};
{error, Reason} ->
{error, Reason}
end.
mk_fs_safe_string(String) ->
unicode:characters_to_binary(string:replace(String, ":", "_", all)).

View File

@ -418,7 +418,7 @@ format_primitive_type_desc(TypeStr, DescResolver) ->
get_primitive_typespec(TypeStr) ->
emqx_conf_schema_types:readable_docgen(?MODULE, TypeStr).
%% All types should have a namespace to avlid name clashing.
%% All types should have a namespace to avoid name clashing.
is_missing_namespace(ShortName, FullName, RootNames) ->
case lists:member(ShortName, RootNames) of
true ->

View File

@ -107,7 +107,8 @@ hard_coded_connector_info_modules_ee() ->
emqx_bridge_pulsar_connector_info,
emqx_bridge_tdengine_connector_info,
emqx_bridge_rabbitmq_connector_info,
emqx_bridge_s3_connector_info
emqx_bridge_s3_connector_info,
emqx_bridge_azure_blob_storage_connector_info
].
-else.
hard_coded_connector_info_modules_ee() ->

View File

@ -0,0 +1,42 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_connector_aggreg_buffer_ctx).
-behaviour(emqx_template).
-include("emqx_connector_aggregator.hrl").
%% `emqx_template' API
-export([lookup/2]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% `emqx_template' API
%%------------------------------------------------------------------------------
-spec lookup(emqx_template:accessor(), buffer()) ->
{ok, integer() | string()} | {error, undefined}.
lookup([<<"datetime">>, Format], #buffer{since = Since}) ->
{ok, format_timestamp(Since, Format)};
lookup([<<"datetime_until">>, Format], #buffer{until = Until}) ->
{ok, format_timestamp(Until, Format)};
lookup([<<"sequence">>], #buffer{seq = Seq}) ->
{ok, Seq};
lookup(_Binding, _Context) ->
{error, undefined}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
format_timestamp(Timestamp, <<"rfc3339utc">>) ->
calendar:system_time_to_rfc3339(Timestamp, [{unit, second}, {offset, "Z"}]);
format_timestamp(Timestamp, <<"rfc3339">>) ->
calendar:system_time_to_rfc3339(Timestamp, [{unit, second}]);
format_timestamp(Timestamp, <<"unix">>) ->
Timestamp.
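Below is a minimal usage sketch for this context module (an illustration, assuming the #buffer{} record from emqx_connector_aggregator.hrl; the timestamp values are arbitrary):
%% -include("emqx_connector_aggregator.hrl").
buffer_ctx_example() ->
    Buffer = #buffer{since = 1717401600, until = 1717401900, seq = 0},
    %% 1717401600 seconds resolves to {ok, "2024-06-03T08:00:00Z"}:
    {ok, _Datetime} = emqx_connector_aggreg_buffer_ctx:lookup(
        [<<"datetime">>, <<"rfc3339utc">>], Buffer
    ),
    {ok, 0} = emqx_connector_aggreg_buffer_ctx:lookup([<<"sequence">>], Buffer),
    ok.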

View File

@ -24,6 +24,8 @@
format_status/2
]).
-export_type([buffer_map/0]).
-record(delivery, {
id :: id(),
callback_module :: module(),
@ -46,7 +48,7 @@
%% @doc Initialize the transfer state, such as blob storage path, transfer options, client
%% credentials, etc.
-callback init_transfer_state(buffer_map(), map()) -> transfer_state().
-callback init_transfer_state(buffer(), map()) -> transfer_state().
%% @doc Append data to the transfer before sending. Usually should not fail.
-callback process_append(iodata(), transfer_state()) -> transfer_state().
@ -84,13 +86,12 @@ init_delivery(
callback_module := Mod
}
) ->
BufferMap = emqx_connector_aggregator:buffer_to_map(Buffer),
#delivery{
id = Id,
callback_module = Mod,
container = mk_container(ContainerOpts),
reader = Reader,
transfer = Mod:init_transfer_state(BufferMap, Opts),
transfer = Mod:init_transfer_state(Buffer, Opts),
empty = true
}.

View File

@ -1,6 +1,6 @@
{application, emqx_connector_aggregator, [
{description, "EMQX Enterprise Connector Data Aggregator"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [
kernel,

View File

@ -0,0 +1,89 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_connector_aggregator_schema).
-behaviour(hocon_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
%% API
-export([
container/0
]).
%% `hocon_schema' API
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
container() ->
{container,
hoconsc:mk(
%% TODO: Support selectors once there is more than one container.
hoconsc:union(fun
(all_union_members) -> [ref(container_csv)];
({value, _Value}) -> [ref(container_csv)]
end),
#{
required => true,
default => #{<<"type">> => <<"csv">>},
desc => ?DESC("container")
}
)}.
%%------------------------------------------------------------------------------
%% `hocon_schema' API
%%------------------------------------------------------------------------------
namespace() -> "connector_aggregator".
roots() -> [].
fields(container_csv) ->
[
{type,
mk(
csv,
#{
required => true,
desc => ?DESC("container_csv")
}
)},
{column_order,
mk(
hoconsc:array(string()),
#{
required => false,
default => [],
desc => ?DESC("container_csv_column_order")
}
)}
].
desc(Name) when
Name == container_csv
->
?DESC(Name);
desc(_Name) ->
undefined.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
mk(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Name) -> hoconsc:ref(?MODULE, Name).

View File

@ -117,6 +117,7 @@
emqx_bridge_rabbitmq,
emqx_bridge_azure_event_hub,
emqx_bridge_s3,
emqx_bridge_azure_blob_storage,
emqx_schema_registry,
emqx_eviction_agent,
emqx_node_rebalance,

View File

@ -3,7 +3,7 @@
{id, "emqx_machine"},
{description, "The EMQX Machine"},
% strict semver, bump manually!
{vsn, "0.3.0"},
{vsn, "0.3.1"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, emqx_ctl]},

View File

@ -0,0 +1 @@
Implemented Azure Blob Storage data integration.

View File

@ -189,6 +189,7 @@ defmodule EMQXUmbrella.MixProject do
:emqx_license,
:emqx_s3,
:emqx_bridge_s3,
:emqx_bridge_azure_blob_storage,
:emqx_schema_registry,
:emqx_schema_validation,
:emqx_enterprise,

View File

@ -104,6 +104,7 @@ is_community_umbrella_app("apps/emqx_bridge_rabbitmq") -> false;
is_community_umbrella_app("apps/emqx_ft") -> false;
is_community_umbrella_app("apps/emqx_s3") -> false;
is_community_umbrella_app("apps/emqx_bridge_s3") -> false;
is_community_umbrella_app("apps/emqx_bridge_azure_blob_storage") -> false;
is_community_umbrella_app("apps/emqx_schema_registry") -> false;
is_community_umbrella_app("apps/emqx_enterprise") -> false;
is_community_umbrella_app("apps/emqx_bridge_kinesis") -> false;

View File

@ -0,0 +1,88 @@
emqx_bridge_azure_blob_storage_action_schema {
azure_blob_storage.label:
"""Upload to Azure Blob Storage"""
azure_blob_storage.desc:
"""Action that takes incoming events and uploads them to the Azure Blob Storage service."""
direct_parameters.label:
"""Direct Azure Blob Storage Upload action parameters"""
direct_parameters.desc:
"""Set of parameters for the upload action. Action supports templates in Azure Blob Storage container name, blob name and blob content."""
direct_container_template.desc:
"""The name of the Azure Blob Storage container name."""
direct_container_template.label:
"""Container Name"""
direct_blob_template.desc:
"""The name of the Azure Blob Storage blob name."""
direct_blob_template.label:
"""Blob Name"""
direct_content_template.label:
"""Azure Blob Storage Blob Content"""
direct_content_template.desc:
"""Content of the Azure Blob Storage blob being uploaded. Supports templates."""
parameters.label:
"""Azure Blob Storage action parameters"""
parameters.desc:
"""Set of parameters for the action."""
aggreg_parameters.label:
"""Azure Blob Storage Aggregated Mode action parameters"""
aggreg_parameters.desc:
"""Set of parameters for the action in aggregated mode."""
direct_mode.label:
"""Direct Azure Blob Storage Upload"""
direct_mode.desc:
"""Enables uploading of events to the Azure Blob Storage service as separate objects."""
aggregated_mode.label:
"""Aggregated Azure Blob Storage Upload"""
aggregated_mode.desc:
"""Enables time-based aggregation of incoming events and uploading them to the Azure Blob Storage service as a single object."""
aggregation.label:
"""Aggregation parameters"""
aggregation.desc:
"""Set of parameters governing the aggregation process."""
aggregation_interval.label:
"""Time interval"""
aggregation_interval.desc:
"""Amount of time events will be aggregated in a single object before uploading."""
aggregation_max_records.label:
"""Maximum number of records"""
aggregation_max_records.desc:
"""Number of records (events) allowed per each aggregated object. Each aggregated upload will contain no more than that number of events, but may contain less.<br/>
If event rate is high enough, there obviously may be more than one aggregated upload during the same time interval. These uploads will have different, but consecutive sequence numbers, which will be a part of Azure Blob Storage blob name."""
aggregated_container_name.label:
"""Azure Blob Storage Container name"""
aggregated_container_name.desc:
"""The Azure Blob Storage container name. Does not support templates."""
aggregated_blob_template.label:
"""Azure Blob Storage blob name template"""
aggregated_blob_template.desc:
"""Template for the Azure Blob Storage blob name of an aggregated upload.<br/>
The template may contain placeholders for the following variables:
<ul>
<li><code>${action}</code>: name of the action (required).</li>
<li><code>${node}</code>: name of the EMQX node conducting the upload (required).</li>
<li><code>${datetime.{format}}</code>: date and time when aggregation started, formatted according to the <code>{format}</code> string (required):
<ul>
<li><code>${datetime.rfc3339utc}</code>: RFC3339-formatted date and time in UTC,</li>
<li><code>${datetime.rfc3339}</code>: RFC3339-formatted date and time in local timezone,</li>
<li><code>${datetime.unix}</code>: Unix timestamp.</li>
</ul>
</li>
<li><code>${datetime_until.{format}}</code>: date and time when aggregation ended, with the same formatting options.</li>
<li><code>${sequence}</code>: sequence number of the aggregated upload within the same time interval (required).</li>
</ul>
All other placeholders are considered invalid. Note that placeholders marked as required will be added as a path suffix to the Azure Blob Storage blob name if they are missing from the template."""
}
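For illustration (an assumed rendering, not taken from the upstream files; the action and node names are placeholders): with the UTC variant of the template exercised by the test suite above,

Template: ${action}/${node}/${datetime.rfc3339utc}/${sequence}
Rendered: myaction/emqx@127.0.0.1/2024-06-03T08_00_00Z/0

where ":" is replaced with "_", as in the mk_fs_safe_string helper shown earlier, to keep the name filesystem-safe.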

View File

@ -0,0 +1,16 @@
emqx_bridge_azure_blob_storage_connector_schema {
config_connector.label:
"""Azure Blob Storage Connector Configuration"""
config_connector.desc:
"""Configuration for a connector to Azure Blob Storage service."""
account_name.label:
"""Account Name"""
account_name.desc:
"""Account name for Azure Blob Storage service."""
account_key.label:
"""Account Key"""
account_key.desc:
"""Account key for Azure Blob Storage service."""
}
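A hedged HOCON sketch of a matching connector declaration (the connector type name and local endpoint are assumptions for illustration; the credentials are the well-known Azurite development defaults used by the test suite, with the key truncated):

connectors.azure_blob_storage.my_connector {
  endpoint = "http://localhost:10000/devstoreaccount1"
  account_name = "devstoreaccount1"
  # Truncated well-known Azurite development key:
  account_key = "Eby8vdM02xNOcqF..."
}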

View File

@ -0,0 +1,19 @@
emqx_connector_aggregator_schema {
container.label:
"""Container for aggregated events"""
container.desc:
"""Settings governing the file format of an upload containing aggregated events."""
container_csv.label:
"""CSV container"""
container_csv.desc:
"""Records (events) will be aggregated and uploaded as a CSV file."""
container_csv_column_order.label:
"""CSV column order"""
container_csv_column_order.desc:
"""Event fields that will be ordered first as columns in the resulting CSV file.<br/>
Regardless of this setting, the resulting CSV will contain all the fields of the aggregated events; columns not explicitly listed here will be ordered after the listed ones in lexicographical order."""
}
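An illustrative example of the resulting column order (field names assumed): with

column_order = ["clientid", "topic"]

events carrying the fields clientid, payload, publish_received_at, and topic produce the CSV header

clientid,topic,payload,publish_received_at

that is, the listed columns first, the remaining ones in lexicographical order.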

View File

@ -249,6 +249,9 @@ for dep in ${CT_DEPS}; do
elasticsearch)
FILES+=( '.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml' )
;;
azurite)
FILES+=( '.ci/docker-compose-file/docker-compose-azurite.yaml' )
;;
*)
echo "unknown_ct_dependency $dep"
exit 1