Merge pull request #13059 from keynslug/feat/EMQX-12204/single-action-schema
feat(s3-bridge): meld 2 separate actions into a single one
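In short: the previously separate `s3` and `s3_aggregated_upload` action types are folded into a single `s3` action whose required `parameters.mode` field (`direct` or `aggregated`) selects the behaviour; for direct mode a config converter additionally pins the batching options so direct uploads are never batched. A minimal sketch of the two resulting parameter shapes, using field names from the schema changes below (the values are illustrative only):

    %% Illustrative values only; field names come from the schema diff below.
    DirectParams = #{
        mode => <<"direct">>,
        bucket => <<"${clientid}">>,
        key => <<"${topic}">>,
        content => <<"${payload}">>
    },
    AggregParams = #{
        mode => <<"aggregated">>,
        bucket => <<"mqtt-aggregated">>,
        key => <<"${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv">>,
        aggregation => #{time_interval => <<"15m">>, max_records => 100_000},
        container => #{type => <<"csv">>}
    }.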
This commit is contained in:
commit 4d1db9f847
@@ -118,8 +118,7 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_pulsar_action_info,
         emqx_bridge_greptimedb_action_info,
         emqx_bridge_tdengine_action_info,
-        emqx_bridge_s3_upload_action_info,
-        emqx_bridge_s3_aggreg_upload_action_info
+        emqx_bridge_s3_upload_action_info
     ].
 -else.
 hard_coded_action_info_modules_ee() ->

@@ -12,8 +12,7 @@
     ]},
     {env, [
         {emqx_action_info_modules, [
-            emqx_bridge_s3_upload_action_info,
-            emqx_bridge_s3_aggreg_upload_action_info
+            emqx_bridge_s3_upload_action_info
         ]},
         {emqx_connector_info_modules, [
             emqx_bridge_s3_connector_info

@@ -8,8 +8,6 @@
 %% Actions
 -define(ACTION_UPLOAD, s3).
 -define(BRIDGE_TYPE_UPLOAD, <<"s3">>).
--define(ACTION_AGGREGATED_UPLOAD, s3_aggregated_upload).
--define(BRIDGE_TYPE_AGGREGATED_UPLOAD, <<"s3_aggregated_upload">>).
 
 -define(CONNECTOR, s3).
 

@@ -1,275 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
-
--module(emqx_bridge_s3_aggreg_upload).
-
--include_lib("typerefl/include/types.hrl").
--include_lib("hocon/include/hoconsc.hrl").
--include("emqx_bridge_s3.hrl").
-
--define(ACTION, ?ACTION_AGGREGATED_UPLOAD).
-
--define(DEFAULT_BATCH_SIZE, 100).
--define(DEFAULT_BATCH_TIME, <<"10ms">>).
-
--behaviour(hocon_schema).
--export([
-    namespace/0,
-    roots/0,
-    fields/1,
-    desc/1
-]).
-
-%% Interpreting options
--export([
-    mk_key_template/1,
-    mk_upload_options/1
-]).
-
-%% emqx_bridge_v2_schema API
--export([bridge_v2_examples/1]).
-
-%%-------------------------------------------------------------------------------------------------
-%% `hocon_schema' API
-%%-------------------------------------------------------------------------------------------------
-
-namespace() ->
-    "bridge_s3_aggreg_upload".
-
-roots() ->
-    [].
-
-fields(Field) when
-    Field == "get_bridge_v2";
-    Field == "put_bridge_v2";
-    Field == "post_bridge_v2"
--->
-    emqx_bridge_v2_schema:api_fields(Field, ?ACTION, fields(?ACTION));
-fields(action) ->
-    {?ACTION,
-        hoconsc:mk(
-            hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)),
-            #{
-                desc => <<"S3 Aggregated Upload Action Config">>,
-                required => false
-            }
-        )};
-fields(?ACTION) ->
-    emqx_bridge_v2_schema:make_producer_action_schema(
-        hoconsc:mk(
-            ?R_REF(s3_aggregated_upload_parameters),
-            #{
-                required => true,
-                desc => ?DESC(s3_aggregated_upload_parameters)
-            }
-        ),
-        #{
-            resource_opts_ref => ?R_REF(s3_aggreg_upload_resource_opts)
-        }
-    );
-fields(s3_aggregated_upload_parameters) ->
-    lists:append([
-        [
-            {container,
-                hoconsc:mk(
-                    %% TODO: Support selectors once there are more than one container.
-                    hoconsc:union(fun
-                        (all_union_members) -> [?REF(s3_aggregated_container_csv)];
-                        ({value, _Valur}) -> [?REF(s3_aggregated_container_csv)]
-                    end),
-                    #{
-                        required => true,
-                        default => #{<<"type">> => <<"csv">>},
-                        desc => ?DESC(s3_aggregated_container)
-                    }
-                )},
-            {aggregation,
-                hoconsc:mk(
-                    ?REF(s3_aggregation),
-                    #{
-                        required => true,
-                        desc => ?DESC(s3_aggregation)
-                    }
-                )}
-        ],
-        emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [
-            {key, #{desc => ?DESC(s3_aggregated_upload_key)}}
-        ]),
-        emqx_s3_schema:fields(s3_uploader)
-    ]);
-fields(s3_aggregated_container_csv) ->
-    [
-        {type,
-            hoconsc:mk(
-                csv,
-                #{
-                    required => true,
-                    desc => ?DESC(s3_aggregated_container_csv)
-                }
-            )},
-        {column_order,
-            hoconsc:mk(
-                hoconsc:array(string()),
-                #{
-                    required => false,
-                    default => [],
-                    desc => ?DESC(s3_aggregated_container_csv_column_order)
-                }
-            )}
-    ];
-fields(s3_aggregation) ->
-    [
-        %% TODO: Needs bucketing? (e.g. messages falling in this 1h interval)
-        {time_interval,
-            hoconsc:mk(
-                emqx_schema:duration_s(),
-                #{
-                    required => false,
-                    default => <<"1h">>,
-                    desc => ?DESC(s3_aggregation_interval)
-                }
-            )},
-        {max_records,
-            hoconsc:mk(
-                pos_integer(),
-                #{
-                    required => false,
-                    default => <<"1000000">>,
-                    desc => ?DESC(s3_aggregation_max_records)
-                }
-            )}
-    ];
-fields(s3_aggreg_upload_resource_opts) ->
-    %% NOTE: This action should benefit from generous batching defaults.
-    emqx_bridge_v2_schema:action_resource_opts_fields([
-        {batch_size, #{default => ?DEFAULT_BATCH_SIZE}},
-        {batch_time, #{default => ?DEFAULT_BATCH_TIME}}
-    ]).
-
-desc(Name) when
-    Name == s3_aggregated_upload;
-    Name == s3_aggregated_upload_parameters;
-    Name == s3_aggregation;
-    Name == s3_aggregated_container_csv
--->
-    ?DESC(Name);
-desc(s3_aggreg_upload_resource_opts) ->
-    ?DESC(emqx_resource_schema, resource_opts);
-desc(_Name) ->
-    undefined.
-
-%% Interpreting options
-
--spec mk_key_template(_Parameters :: map()) -> emqx_template:str().
-mk_key_template(#{key := Key}) ->
-    Template = emqx_template:parse(Key),
-    {_, BindingErrors} = emqx_template:render(Template, #{}),
-    {UsedBindings, _} = lists:unzip(BindingErrors),
-    SuffixTemplate = mk_suffix_template(UsedBindings),
-    case emqx_template:is_const(SuffixTemplate) of
-        true ->
-            Template;
-        false ->
-            Template ++ SuffixTemplate
-    end.
-
-mk_suffix_template(UsedBindings) ->
-    RequiredBindings = ["action", "node", "datetime.", "sequence"],
-    SuffixBindings = [
-        mk_default_binding(RB)
-     || RB <- RequiredBindings,
-        lists:all(fun(UB) -> string:prefix(UB, RB) == nomatch end, UsedBindings)
-    ],
-    SuffixTemplate = [["/", B] || B <- SuffixBindings],
-    emqx_template:parse(SuffixTemplate).
-
-mk_default_binding("datetime.") ->
-    "${datetime.rfc3339utc}";
-mk_default_binding(Binding) ->
-    "${" ++ Binding ++ "}".
-
--spec mk_upload_options(_Parameters :: map()) -> emqx_s3_client:upload_options().
-mk_upload_options(Parameters) ->
-    Headers = mk_upload_headers(Parameters),
-    #{
-        headers => Headers,
-        acl => maps:get(acl, Parameters, undefined)
-    }.
-
-mk_upload_headers(Parameters = #{container := Container}) ->
-    Headers = normalize_headers(maps:get(headers, Parameters, #{})),
-    ContainerHeaders = mk_container_headers(Container),
-    maps:merge(ContainerHeaders, Headers).
-
-normalize_headers(Headers) ->
-    maps:fold(
-        fun(Header, Value, Acc) ->
-            maps:put(string:lowercase(emqx_utils_conv:str(Header)), Value, Acc)
-        end,
-        #{},
-        Headers
-    ).
-
-mk_container_headers(#{type := csv}) ->
-    #{"content-type" => "text/csv"};
-mk_container_headers(#{}) ->
-    #{}.
-
-%% Examples
-
-bridge_v2_examples(Method) ->
-    [
-        #{
-            <<"s3_aggregated_upload">> => #{
-                summary => <<"S3 Aggregated Upload">>,
-                value => s3_action_example(Method)
-            }
-        }
-    ].
-
-s3_action_example(post) ->
-    maps:merge(
-        s3_action_example(put),
-        #{
-            type => atom_to_binary(?ACTION_UPLOAD),
-            name => <<"my_s3_action">>
-        }
-    );
-s3_action_example(get) ->
-    maps:merge(
-        s3_action_example(put),
-        #{
-            status => <<"connected">>,
-            node_status => [
-                #{
-                    node => <<"emqx@localhost">>,
-                    status => <<"connected">>
-                }
-            ]
-        }
-    );
-s3_action_example(put) ->
-    #{
-        enable => true,
-        connector => <<"my_s3_connector">>,
-        description => <<"My action">>,
-        parameters => #{
-            bucket => <<"mqtt-aggregated">>,
-            key => <<"${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv">>,
-            acl => <<"public_read">>,
-            aggregation => #{
-                time_interval => <<"15m">>,
-                max_records => 100_000
-            },
-            <<"container">> => #{
-                type => <<"csv">>,
-                column_order => [<<"clientid">>, <<"topic">>, <<"publish_received_at">>]
-            }
-        },
-        resource_opts => #{
-            health_check_interval => <<"10s">>,
-            query_mode => <<"async">>,
-            inflight_window => 100
-        }
-    }.
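Note that `mk_key_template/1` and `mk_upload_options/1` are not lost with this module; the hunks further down move them verbatim into `emqx_bridge_s3_upload`. A self-contained sketch of the key-suffix rule they implement (hypothetical module name; a string-level approximation, since the real code operates on parsed `emqx_template` terms):

    -module(s3_key_suffix_sketch).
    -export([demo/0]).

    required_bindings() -> ["action", "node", "datetime.", "sequence"].

    default_binding("datetime.") -> "${datetime.rfc3339utc}";
    default_binding(Binding) -> "${" ++ Binding ++ "}".

    %% UsedBindings corresponds to the unresolved bindings reported by
    %% emqx_template:render/2 inside mk_key_template/1.
    suffix(UsedBindings) ->
        [
            ["/", default_binding(RB)]
         || RB <- required_bindings(),
            lists:all(fun(UB) -> string:prefix(UB, RB) == nomatch end, UsedBindings)
        ].

    demo() ->
        %% A key template such as <<"files/${topic}">> uses none of the required
        %% bindings, so all of them are appended as path segments:
        "/${action}/${node}/${datetime.rfc3339utc}/${sequence}" =
            lists:flatten(suffix(["topic"])),
        ok.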

@@ -1,21 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
-
--module(emqx_bridge_s3_aggreg_upload_action_info).
-
--behaviour(emqx_action_info).
-
--include("emqx_bridge_s3.hrl").
-
--export([
-    action_type_name/0,
-    connector_type_name/0,
-    schema_module/0
-]).
-
-action_type_name() -> ?ACTION_AGGREGATED_UPLOAD.
-
-connector_type_name() -> s3.
-
-schema_module() -> emqx_bridge_s3_aggreg_upload.

@@ -52,6 +52,7 @@
 }.
 
 -type s3_upload_parameters() :: #{
+    mode := direct,
     bucket := string(),
     key := string(),
     content := string(),
@@ -59,6 +60,7 @@
 }.
 
 -type s3_aggregated_upload_parameters() :: #{
+    mode := aggregated,
     bucket := string(),
     key := string(),
     acl => emqx_s3:acl(),
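With `mode` now part of the parameter types, channel state and queries can dispatch on it directly instead of on the old per-type tag, which is exactly what the `start_channel`, `channel_status` and `on_query` hunks below do. In sketch form (hypothetical names):

    -module(s3_mode_dispatch_sketch).
    -export([upload_style/1]).

    %% Hypothetical sketch of the dispatch style the new mode field enables:
    -spec upload_style(#{mode := direct | aggregated, _ => _}) -> string().
    upload_style(#{mode := direct}) -> "one S3 object per message";
    upload_style(#{mode := aggregated}) -> "many messages aggregated into one S3 object".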
@@ -187,22 +189,24 @@ on_get_channel_status(_InstId, ChannelId, State = #{channels := Channels}) ->
 start_channel(_State, #{
     bridge_type := ?BRIDGE_TYPE_UPLOAD,
     parameters := Parameters = #{
+        mode := Mode = direct,
         bucket := Bucket,
         key := Key,
         content := Content
     }
 }) ->
     #{
-        type => ?ACTION_UPLOAD,
+        mode => Mode,
         bucket => emqx_template:parse(Bucket),
         key => emqx_template:parse(Key),
         content => emqx_template:parse(Content),
         upload_options => upload_options(Parameters)
     };
 start_channel(State, #{
-    bridge_type := Type = ?BRIDGE_TYPE_AGGREGATED_UPLOAD,
+    bridge_type := Type = ?BRIDGE_TYPE_UPLOAD,
     bridge_name := Name,
     parameters := Parameters = #{
+        mode := Mode = aggregated,
         aggregation := #{
             time_interval := TimeInterval,
             max_records := MaxRecords
@@ -219,9 +223,9 @@ start_channel(State, #{
     },
     DeliveryOpts = #{
         bucket => Bucket,
-        key => emqx_bridge_s3_aggreg_upload:mk_key_template(Parameters),
+        key => emqx_bridge_s3_upload:mk_key_template(Parameters),
         container => Container,
-        upload_options => emqx_bridge_s3_aggreg_upload:mk_upload_options(Parameters),
+        upload_options => emqx_bridge_s3_upload:mk_upload_options(Parameters),
         callback_module => ?MODULE,
         client_config => maps:get(client_config, State),
         uploader_config => maps:with([min_part_size, max_part_size], Parameters)
@@ -235,7 +239,7 @@ start_channel(State, #{
         restart => permanent
     }),
     #{
-        type => ?ACTION_AGGREGATED_UPLOAD,
+        mode => Mode,
         name => Name,
         aggreg_id => AggregId,
         bucket => Bucket,
@@ -254,14 +258,12 @@ stop_channel(#{on_stop := OnStop}) ->
 stop_channel(_ChannelState) ->
     ok.
 
-channel_status(#{type := ?ACTION_UPLOAD}, _State) ->
+channel_status(#{mode := direct}, _State) ->
     %% TODO
     %% Since bucket name may be templated, we can't really provide any additional
     %% information regarding the channel health.
     ?status_connected;
-channel_status(
-    #{type := ?ACTION_AGGREGATED_UPLOAD, aggreg_id := AggregId, bucket := Bucket}, State
-) ->
+channel_status(#{mode := aggregated, aggreg_id := AggregId, bucket := Bucket}, State) ->
     %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
     Timestamp = erlang:system_time(second),
     ok = emqx_connector_aggregator:tick(AggregId, Timestamp),
@@ -305,9 +307,9 @@ check_aggreg_upload_errors(AggregId) ->
     {ok, _Result} | {error, _Reason}.
 on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) ->
     case maps:get(Tag, Channels, undefined) of
-        ChannelState = #{type := ?ACTION_UPLOAD} ->
+        ChannelState = #{mode := direct} ->
             run_simple_upload(InstId, Tag, Data, ChannelState, Config);
-        ChannelState = #{type := ?ACTION_AGGREGATED_UPLOAD} ->
+        ChannelState = #{mode := aggregated} ->
             run_aggregated_upload(InstId, [Data], ChannelState);
         undefined ->
             {error, {unrecoverable_error, {invalid_message_tag, Tag}}}
@@ -317,7 +319,7 @@ on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels})
     {ok, _Result} | {error, _Reason}.
 on_batch_query(InstId, [{Tag, Data0} | Rest], #{channels := Channels}) ->
     case maps:get(Tag, Channels, undefined) of
-        ChannelState = #{type := ?ACTION_AGGREGATED_UPLOAD} ->
+        ChannelState = #{mode := aggregated} ->
             Records = [Data0 | [Data || {_, Data} <- Rest]],
             run_aggregated_upload(InstId, Records, ChannelState);
         undefined ->

@@ -18,10 +18,22 @@
     desc/1
 ]).
 
+%% Interpreting options
+-export([
+    mk_key_template/1,
+    mk_upload_options/1
+]).
+
 -export([
     bridge_v2_examples/1
 ]).
 
+%% Internal exports
+-export([convert_actions/2]).
+
+-define(DEFAULT_AGGREG_BATCH_SIZE, 100).
+-define(DEFAULT_AGGREG_BATCH_TIME, <<"10ms">>).
+
 %%-------------------------------------------------------------------------------------------------
 %% `hocon_schema' API
 %%-------------------------------------------------------------------------------------------------
@@ -44,25 +56,37 @@ fields(action) ->
             hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION)),
             #{
                 desc => <<"S3 Upload Action Config">>,
-                required => false
+                required => false,
+                converter => fun ?MODULE:convert_actions/2
             }
         )};
 fields(?ACTION) ->
     emqx_bridge_v2_schema:make_producer_action_schema(
         hoconsc:mk(
-            ?R_REF(s3_upload_parameters),
+            mkunion(mode, #{
+                <<"direct">> => ?R_REF(s3_direct_upload_parameters),
+                <<"aggregated">> => ?R_REF(s3_aggregated_upload_parameters)
+            }),
             #{
                 required => true,
                 desc => ?DESC(s3_upload)
             }
         ),
         #{
-            resource_opts_ref => ?R_REF(s3_action_resource_opts)
+            resource_opts_ref => ?R_REF(s3_upload_resource_opts)
         }
     );
-fields(s3_upload_parameters) ->
+fields(s3_direct_upload_parameters) ->
     emqx_s3_schema:fields(s3_upload) ++
         [
+            {mode,
+                hoconsc:mk(
+                    direct,
+                    #{
+                        required => true,
+                        desc => ?DESC(s3_direct_upload_mode)
+                    }
+                )},
             {content,
                 hoconsc:mk(
                     emqx_schema:template(),
@@ -73,49 +97,224 @@ fields(s3_upload_parameters) ->
                 }
             )}
         ];
-fields(s3_action_resource_opts) ->
-    UnsupportedOpts = [batch_size, batch_time],
-    lists:filter(
-        fun({N, _}) -> not lists:member(N, UnsupportedOpts) end,
-        emqx_bridge_v2_schema:action_resource_opts_fields()
-    ).
+fields(s3_aggregated_upload_parameters) ->
+    lists:append([
+        [
+            {mode,
+                hoconsc:mk(
+                    aggregated,
+                    #{
+                        required => true,
+                        desc => ?DESC(s3_aggregated_upload_mode)
+                    }
+                )},
+            {container,
+                hoconsc:mk(
+                    mkunion(type, #{
+                        <<"csv">> => ?REF(s3_aggregated_container_csv)
+                    }),
+                    #{
+                        required => true,
+                        default => #{<<"type">> => <<"csv">>},
+                        desc => ?DESC(s3_aggregated_container)
+                    }
+                )},
+            {aggregation,
+                hoconsc:mk(
+                    ?REF(s3_aggregation),
+                    #{
+                        required => true,
+                        desc => ?DESC(s3_aggregation)
+                    }
+                )}
+        ],
+        emqx_resource_schema:override(emqx_s3_schema:fields(s3_upload), [
+            {key, #{desc => ?DESC(s3_aggregated_upload_key)}}
+        ]),
+        emqx_s3_schema:fields(s3_uploader)
+    ]);
+fields(s3_aggregated_container_csv) ->
+    [
+        {type,
+            hoconsc:mk(
+                csv,
+                #{
+                    required => true,
+                    desc => ?DESC(s3_aggregated_container_csv)
+                }
+            )},
+        {column_order,
+            hoconsc:mk(
+                hoconsc:array(string()),
+                #{
+                    required => false,
+                    default => [],
+                    desc => ?DESC(s3_aggregated_container_csv_column_order)
+                }
+            )}
+    ];
+fields(s3_aggregation) ->
+    [
+        %% TODO: Needs bucketing? (e.g. messages falling in this 1h interval)
+        {time_interval,
+            hoconsc:mk(
+                emqx_schema:duration_s(),
+                #{
+                    required => false,
+                    default => <<"1h">>,
+                    desc => ?DESC(s3_aggregation_interval)
+                }
+            )},
+        {max_records,
+            hoconsc:mk(
+                pos_integer(),
+                #{
+                    required => false,
+                    default => <<"1000000">>,
+                    desc => ?DESC(s3_aggregation_max_records)
+                }
+            )}
+    ];
+fields(s3_upload_resource_opts) ->
+    %% NOTE: Aggregated action should benefit from generous batching defaults.
+    emqx_bridge_v2_schema:action_resource_opts_fields([
+        {batch_size, #{default => ?DEFAULT_AGGREG_BATCH_SIZE}},
+        {batch_time, #{default => ?DEFAULT_AGGREG_BATCH_TIME}}
+    ]).
+
+mkunion(Field, Schemas) ->
+    hoconsc:union(fun(Arg) -> scunion(Field, Schemas, Arg) end).
+
+scunion(_Field, Schemas, all_union_members) ->
+    maps:values(Schemas);
+scunion(Field, Schemas, {value, Value}) ->
+    Selector = maps:get(emqx_utils_conv:bin(Field), Value, undefined),
+    case Selector == undefined orelse maps:find(emqx_utils_conv:bin(Selector), Schemas) of
+        {ok, Schema} ->
+            [Schema];
+        _Error ->
+            throw(#{field_name => Field, expected => maps:keys(Schemas)})
+    end.
 
+desc(s3) ->
+    ?DESC(s3_upload);
 desc(Name) when
     Name == s3_upload;
-    Name == s3_upload_parameters
+    Name == s3_direct_upload_parameters;
+    Name == s3_aggregated_upload_parameters;
+    Name == s3_aggregation;
+    Name == s3_aggregated_container_csv
 ->
     ?DESC(Name);
-desc(s3_action_resource_opts) ->
+desc(s3_upload_resource_opts) ->
     ?DESC(emqx_resource_schema, resource_opts);
 desc(_Name) ->
     undefined.
 
+convert_actions(Conf = #{}, Opts) ->
+    maps:map(fun(_Name, ConfAction) -> convert_action(ConfAction, Opts) end, Conf);
+convert_actions(undefined, _) ->
+    undefined.
+
+convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := ResourceOpts}, _) ->
+    case Params of
+        #{<<"mode">> := <<"direct">>} ->
+            %% NOTE: Disable batching for direct uploads.
+            NResourceOpts = ResourceOpts#{<<"batch_size">> => 1, <<"batch_time">> => 0},
+            Conf#{<<"resource_opts">> := NResourceOpts};
+        #{} ->
+            Conf
+    end.
+
+%% Interpreting options
+
+-spec mk_key_template(_Parameters :: map()) -> emqx_template:str().
+mk_key_template(#{key := Key}) ->
+    Template = emqx_template:parse(Key),
+    {_, BindingErrors} = emqx_template:render(Template, #{}),
+    {UsedBindings, _} = lists:unzip(BindingErrors),
+    SuffixTemplate = mk_suffix_template(UsedBindings),
+    case emqx_template:is_const(SuffixTemplate) of
+        true ->
+            Template;
+        false ->
+            Template ++ SuffixTemplate
+    end.
+
+mk_suffix_template(UsedBindings) ->
+    RequiredBindings = ["action", "node", "datetime.", "sequence"],
+    SuffixBindings = [
+        mk_default_binding(RB)
+     || RB <- RequiredBindings,
+        lists:all(fun(UB) -> string:prefix(UB, RB) == nomatch end, UsedBindings)
+    ],
+    SuffixTemplate = [["/", B] || B <- SuffixBindings],
+    emqx_template:parse(SuffixTemplate).
+
+mk_default_binding("datetime.") ->
+    "${datetime.rfc3339utc}";
+mk_default_binding(Binding) ->
+    "${" ++ Binding ++ "}".
+
+-spec mk_upload_options(_Parameters :: map()) -> emqx_s3_client:upload_options().
+mk_upload_options(Parameters) ->
+    Headers = mk_upload_headers(Parameters),
+    #{
+        headers => Headers,
+        acl => maps:get(acl, Parameters, undefined)
+    }.
+
+mk_upload_headers(Parameters = #{container := Container}) ->
+    Headers = normalize_headers(maps:get(headers, Parameters, #{})),
+    ContainerHeaders = mk_container_headers(Container),
+    maps:merge(ContainerHeaders, Headers).
+
+normalize_headers(Headers) ->
+    maps:fold(
+        fun(Header, Value, Acc) ->
+            maps:put(string:lowercase(emqx_utils_conv:str(Header)), Value, Acc)
+        end,
+        #{},
+        Headers
+    ).
+
+mk_container_headers(#{type := csv}) ->
+    #{"content-type" => "text/csv"};
+mk_container_headers(#{}) ->
+    #{}.
+
 %% Examples
 
 bridge_v2_examples(Method) ->
     [
         #{
             <<"s3">> => #{
-                summary => <<"S3 Simple Upload">>,
-                value => s3_upload_action_example(Method)
+                summary => <<"S3 Direct Upload">>,
+                value => s3_upload_action_example(Method, direct)
+            },
+            <<"s3_aggreg">> => #{
+                summary => <<"S3 Aggregated Upload">>,
+                value => s3_upload_action_example(Method, aggreg)
             }
         }
     ].
 
-s3_upload_action_example(post) ->
+s3_upload_action_example(post, Mode) ->
     maps:merge(
-        s3_upload_action_example(put),
+        s3_upload_action_example(put, Mode),
         #{
             type => atom_to_binary(?ACTION_UPLOAD),
-            name => <<"my_s3_action">>
+            name => <<"my_s3_action">>,
+            enable => true,
+            connector => <<"my_s3_connector">>
         }
     );
-s3_upload_action_example(get) ->
+s3_upload_action_example(get, Mode) ->
     maps:merge(
-        s3_upload_action_example(put),
+        s3_upload_action_example(put, Mode),
         #{
+            enable => true,
+            connector => <<"my_s3_connector">>,
             status => <<"connected">>,
             node_status => [
                 #{
@@ -125,12 +324,11 @@ s3_upload_action_example(get) ->
             ]
         }
     );
-s3_upload_action_example(put) ->
+s3_upload_action_example(put, direct) ->
     #{
-        enable => true,
-        connector => <<"my_s3_connector">>,
-        description => <<"My action">>,
+        description => <<"My upload action">>,
         parameters => #{
+            mode => <<"direct">>,
             bucket => <<"${clientid}">>,
             key => <<"${topic}">>,
             content => <<"${payload}">>,
@@ -140,4 +338,27 @@ s3_upload_action_example(put) ->
             query_mode => <<"sync">>,
             inflight_window => 10
         }
-    }.
+    };
+s3_upload_action_example(put, aggreg) ->
+    #{
+        description => <<"My aggregated upload action">>,
+        parameters => #{
+            mode => <<"aggregated">>,
+            bucket => <<"mqtt-aggregated">>,
+            key => <<"${action}/${node}/${datetime.rfc3339utc}_N${sequence}.csv">>,
+            acl => <<"public_read">>,
+            aggregation => #{
+                time_interval => <<"15m">>,
+                max_records => 100_000
+            },
+            <<"container">> => #{
+                type => <<"csv">>,
+                column_order => [<<"clientid">>, <<"topic">>, <<"publish_received_at">>]
+            }
+        },
+        resource_opts => #{
+            health_check_interval => <<"10s">>,
+            query_mode => <<"async">>,
+            inflight_window => 100
+        }
+    }.
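The `mkunion/2` helper above builds a HOCON union whose branch is chosen by a discriminator field. A self-contained sketch of that selection logic (hypothetical module; `select/3` is adapted from `scunion/3` in the diff, with `emqx_utils_conv:bin/1` replaced by `atom_to_binary/1` to avoid the dependency):

    -module(s3_mode_union_sketch).
    -export([demo/0]).

    %% Adapted from scunion/3: return the schema branch named by the value's
    %% discriminator field, or throw a schema error for unknown selectors.
    select(Field, Schemas, #{} = Value) ->
        Selector = maps:get(atom_to_binary(Field), Value, undefined),
        case Selector =/= undefined andalso maps:find(Selector, Schemas) of
            {ok, Schema} ->
                [Schema];
            _Error ->
                throw(#{field_name => Field, expected => maps:keys(Schemas)})
        end.

    demo() ->
        Schemas = #{<<"direct">> => direct_ref, <<"aggregated">> => aggregated_ref},
        %% mode=direct resolves to exactly one union member:
        [direct_ref] = select(mode, Schemas, #{<<"mode">> => <<"direct">>}),
        %% an unknown mode surfaces as a schema error listing the valid choices:
        {error, Expected} =
            try select(mode, Schemas, #{<<"mode">> => <<"gzip">>})
            catch throw:#{expected := E} -> {error, E}
            end,
        [<<"aggregated">>, <<"direct">>] = lists:sort(Expected),
        ok.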

@@ -108,6 +108,7 @@ action_config(Name, ConnectorId) ->
         <<"enable">> => true,
         <<"connector">> => ConnectorId,
         <<"parameters">> => #{
+            <<"mode">> => <<"direct">>,
             <<"bucket">> => <<"${clientid}">>,
             <<"key">> => <<"${topic}">>,
             <<"content">> => <<"${payload}">>,
@@ -122,6 +123,8 @@ action_config(Name, ConnectorId) ->
             <<"metrics_flush_interval">> => <<"1s">>,
             <<"query_mode">> => <<"sync">>,
             <<"request_ttl">> => <<"60s">>,
+            <<"batch_size">> => 42,
+            <<"batch_time">> => <<"100ms">>,
             <<"resume_interval">> => <<"3s">>,
             <<"worker_pool_size">> => <<"4">>
         }
@@ -131,6 +134,13 @@ action_config(Name, ConnectorId) ->
 t_start_stop(Config) ->
     emqx_bridge_v2_testlib:t_start_stop(Config, s3_bridge_stopped).
 
+t_ignore_batch_opts(Config) ->
+    {ok, {_Status, _, Bridge}} = emqx_bridge_v2_testlib:create_bridge_api(Config),
+    ?assertMatch(
+        #{<<"resource_opts">> := #{<<"batch_size">> := 1, <<"batch_time">> := 0}},
+        Bridge
+    ).
+
 t_start_broken_update_restart(Config) ->
     Name = ?config(connector_name, Config),
     Type = ?config(connector_type, Config),
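The new `t_ignore_batch_opts` case above asserts the converter behaviour: whatever batching options a direct-mode action is created with, the stored config comes back with batching disabled. A minimal self-contained sketch of the same check (hypothetical module; `convert_action/2` copied from the schema diff):

    -module(s3_convert_sketch).
    -export([demo/0]).

    %% Copied from the schema diff: for mode=direct, force per-message uploads.
    convert_action(Conf = #{<<"parameters">> := Params, <<"resource_opts">> := ResourceOpts}, _) ->
        case Params of
            #{<<"mode">> := <<"direct">>} ->
                %% NOTE: Disable batching for direct uploads.
                NResourceOpts = ResourceOpts#{<<"batch_size">> => 1, <<"batch_time">> => 0},
                Conf#{<<"resource_opts">> := NResourceOpts};
            #{} ->
                Conf
        end.

    demo() ->
        In = #{
            <<"parameters">> => #{<<"mode">> => <<"direct">>},
            <<"resource_opts">> => #{<<"batch_size">> => 42, <<"batch_time">> => <<"100ms">>}
        },
        %% Mirrors the t_ignore_batch_opts assertion: batch opts are pinned.
        #{<<"resource_opts">> := #{<<"batch_size">> := 1, <<"batch_time">> := 0}} =
            convert_action(In, unused),
        ok.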

@@ -14,7 +14,7 @@
 -import(emqx_utils_conv, [bin/1]).
 
 %% See `emqx_bridge_s3.hrl`.
--define(BRIDGE_TYPE, <<"s3_aggregated_upload">>).
+-define(BRIDGE_TYPE, <<"s3">>).
 -define(CONNECTOR_TYPE, <<"s3">>).
 
 -define(PROXY_NAME, "minio_tcp").
@@ -122,6 +122,7 @@ action_config(Name, ConnectorId, Bucket) ->
         <<"enable">> => true,
         <<"connector">> => ConnectorId,
         <<"parameters">> => #{
+            <<"mode">> => <<"aggregated">>,
            <<"bucket">> => unicode:characters_to_binary(Bucket),
            <<"key">> => <<"${action}/${node}/${datetime.rfc3339}">>,
            <<"acl">> => <<"public_read">>,

@@ -1,64 +0,0 @@
-emqx_bridge_s3_aggreg_upload {
-
-s3_aggregated_upload.label:
-"""S3 Aggregated Upload"""
-s3_aggregated_upload.desc:
-"""Action that enables time-based aggregation of incoming events and uploading them to the S3 service as a single object."""
-
-s3_aggregated_upload_parameters.label:
-"""S3 Aggregated Upload action parameters"""
-s3_aggregated_upload_parameters.desc:
-"""Set of parameters for the aggregated upload action."""
-
-s3_aggregation.label:
-"""Aggregation parameters"""
-s3_aggregation.desc:
-"""Set of parameters governing the aggregation process."""
-
-s3_aggregation_interval.label:
-"""Time interval"""
-s3_aggregation_interval.desc:
-"""Amount of time events will be aggregated in a single object before uploading."""
-
-s3_aggregation_max_records.label:
-"""Maximum number of records"""
-s3_aggregation_max_records.desc:
-"""Number of records (events) allowed per each aggregated object. Each aggregated upload will contain no more than that number of events, but may contain less.<br/>
-If event rate is high enough, there obviously may be more than one aggregated upload during the same time interval. These uploads will have different, but consecutive sequence numbers, which will be a part of S3 object key."""
-
-s3_aggregated_container.label:
-"""Container for aggregated events"""
-s3_aggregated_container.desc:
-"""Settings governing the file format of an upload containing aggregated events."""
-
-s3_aggregated_container_csv.label:
-"""CSV container"""
-s3_aggregated_container_csv.desc:
-"""Records (events) will be aggregated and uploaded as a CSV file."""
-
-s3_aggregated_container_csv_column_order.label:
-"""CSV column order"""
-s3_aggregated_container_csv_column_order.desc:
-"""Event fields that will be ordered first as columns in the resulting CSV file.<br/>
-Regardless of this setting, resulting CSV will contain all the fields of aggregated events, but all the columns not explicitly mentioned here will be ordered after the ones listed here in the lexicographical order."""
-
-s3_aggregated_upload_key.label:
-"""S3 object key template"""
-s3_aggregated_upload_key.desc:
-"""Template for the S3 object key of an aggregated upload.<br/>
-Template may contain placeholders for the following variables:
-<ul>
-<li><code>${action}</code>: name of the action (required).<li/>
-<li><code>${node}</code>: name of the EMQX node conducting the upload (required).<li/>
-<li><code>${datetime.{format}}</code>: date and time when aggregation started, formatted according to the <code>{format}</code> string (required):
-<ul>
-<li><code>${datetime.rfc3339utc}</code>: RFC3339-formatted date and time in UTC,<li/>
-<li><code>${datetime.rfc3339}</code>: RFC3339-formatted date and time in local timezone,<li/>
-<li><code>${datetime.unix}</code>: Unix timestamp.<li/>
-</ul>
-<li/>
-<li><code>${datetime_until.{format}}</code>: date and time when aggregation ended, with the same formatting options.<li/>
-<li><code>${sequence}</code>: sequence number of the aggregated upload within the same time interval (required).<li/>
-</ul>
-All other placeholders are considered invalid. Note that placeholders marked as required will be added as a path suffix to the S3 object key if they are missing from the template."""
-}

@@ -1,13 +1,23 @@
 emqx_bridge_s3_upload {
 
 s3_upload.label:
-"""S3 Simple Upload"""
+"""Upload to S3"""
 s3_upload.desc:
-"""Action to upload a single object to the S3 service."""
+"""Action that takes incoming events and uploads them to the S3 API compatible service."""
 
-s3_upload_parameters.label:
-"""S3 Upload action parameters"""
-s3_upload_parameters.desc:
+s3_parameters.label:
+"""S3 Upload parameters"""
+s3_parameters.desc:
 """Set of parameters for the upload action."""
 
+s3_direct_upload_mode.label:
+"""Direct S3 Upload"""
+s3_direct_upload_mode.desc:
+"""Enables uploading of events to the S3 service as separate objects."""
+
+s3_direct_upload_parameters.label:
+"""Direct S3 Upload action parameters"""
+s3_direct_upload_parameters.desc:
+"""Set of parameters for the upload action. Action supports templates in S3 bucket name, object key and object content."""
+
 s3_object_content.label:
@@ -15,4 +25,66 @@ s3_object_content.label:
 s3_object_content.desc:
 """Content of the S3 object being uploaded. Supports templates."""
 
+s3_aggregated_upload_mode.label:
+"""Aggregated S3 Upload"""
+s3_aggregated_upload_mode.desc:
+"""Enables time-based aggregation of incoming events and uploading them to the S3 service as a single object."""
+
+s3_aggregated_upload_parameters.label:
+"""Aggregated S3 Upload action parameters"""
+s3_aggregated_upload_parameters.desc:
+"""Set of parameters for the aggregated upload action."""
+
+s3_aggregation.label:
+"""Aggregation parameters"""
+s3_aggregation.desc:
+"""Set of parameters governing the aggregation process."""
+
+s3_aggregation_interval.label:
+"""Time interval"""
+s3_aggregation_interval.desc:
+"""Amount of time events will be aggregated in a single object before uploading."""
+
+s3_aggregation_max_records.label:
+"""Maximum number of records"""
+s3_aggregation_max_records.desc:
+"""Number of records (events) allowed per each aggregated object. Each aggregated upload will contain no more than that number of events, but may contain less.<br/>
+If event rate is high enough, there obviously may be more than one aggregated upload during the same time interval. These uploads will have different, but consecutive sequence numbers, which will be a part of S3 object key."""
+
+s3_aggregated_container.label:
+"""Container for aggregated events"""
+s3_aggregated_container.desc:
+"""Settings governing the file format of an upload containing aggregated events."""
+
+s3_aggregated_container_csv.label:
+"""CSV container"""
+s3_aggregated_container_csv.desc:
+"""Records (events) will be aggregated and uploaded as a CSV file."""
+
+s3_aggregated_container_csv_column_order.label:
+"""CSV column order"""
+s3_aggregated_container_csv_column_order.desc:
+"""Event fields that will be ordered first as columns in the resulting CSV file.<br/>
+Regardless of this setting, resulting CSV will contain all the fields of aggregated events, but all the columns not explicitly mentioned here will be ordered after the ones listed here in the lexicographical order."""
+
+s3_aggregated_upload_key.label:
+"""S3 object key template"""
+s3_aggregated_upload_key.desc:
+"""Template for the S3 object key of an aggregated upload.<br/>
+Template may contain placeholders for the following variables:
+<ul>
+<li><code>${action}</code>: name of the action (required).<li/>
+<li><code>${node}</code>: name of the EMQX node conducting the upload (required).<li/>
+<li><code>${datetime.{format}}</code>: date and time when aggregation started, formatted according to the <code>{format}</code> string (required):
+<ul>
+<li><code>${datetime.rfc3339utc}</code>: RFC3339-formatted date and time in UTC,<li/>
+<li><code>${datetime.rfc3339}</code>: RFC3339-formatted date and time in local timezone,<li/>
+<li><code>${datetime.unix}</code>: Unix timestamp.<li/>
+</ul>
+<li/>
+<li><code>${datetime_until.{format}}</code>: date and time when aggregation ended, with the same formatting options.<li/>
+<li><code>${sequence}</code>: sequence number of the aggregated upload within the same time interval (required).<li/>
+</ul>
+All other placeholders are considered invalid. Note that placeholders marked as required will be added as a path suffix to the S3 object key if they are missing from the template."""
+
 }