Merge pull request #13003 from thalesmg/fix-aggreg-name-r57-20240509

fix(aggregator): namespace aggregator ids with action type
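Previously, the connector addressed the aggregator by the bare action name, while the supervisor child was registered under {Type, Name}; two actions of different types sharing a name could therefore collide on the same aggregator. This change threads a single AggregId = {Type, Name} through the connector, the test suite, and the delivery process. A minimal sketch of the effect (the action types and names below are hypothetical):

    %% Before: aggregator ids were bare action names, so same-named actions clashed:
    OldIdA = <<"backup">>,                          %% "backup" action of one type
    OldIdB = <<"backup">>,                          %% "backup" action of another type
    true = (OldIdA =:= OldIdB),                     %% collision
    %% After: ids are namespaced by the action type:
    NewIdA = {s3_aggregated_upload, <<"backup">>},  %% hypothetical type atoms
    NewIdB = {azure_blob_storage, <<"backup">>},
    true = (NewIdA =/= NewIdB).                     %% distinct ids, no collision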
Thales Macedo Garitezi 2024-05-09 17:25:06 -03:00 committed by GitHub
commit e9af99c625
3 changed files with 55 additions and 39 deletions


@@ -209,6 +209,7 @@ start_channel(State, #{
         bucket := Bucket
     }
 }) ->
+    AggregId = {Type, Name},
     AggregOpts = #{
         time_interval => TimeInterval,
         max_records => MaxRecords,
@@ -223,19 +224,21 @@ start_channel(State, #{
         client_config => maps:get(client_config, State),
         uploader_config => maps:with([min_part_size, max_part_size], Parameters)
     },
-    _ = emqx_connector_aggreg_sup:delete_child({Type, Name}),
+    _ = emqx_connector_aggreg_sup:delete_child(AggregId),
     {ok, SupPid} = emqx_connector_aggreg_sup:start_child(#{
-        id => {Type, Name},
-        start => {emqx_connector_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]},
+        id => AggregId,
+        start =>
+            {emqx_connector_aggreg_upload_sup, start_link, [AggregId, AggregOpts, DeliveryOpts]},
         type => supervisor,
         restart => permanent
     }),
     #{
         type => ?ACTION_AGGREGATED_UPLOAD,
         name => Name,
+        aggreg_id => AggregId,
         bucket => Bucket,
         supervisor => SupPid,
-        on_stop => fun() -> emqx_connector_aggreg_sup:delete_child({Type, Name}) end
+        on_stop => fun() -> emqx_connector_aggreg_sup:delete_child(AggregId) end
     }.
 
 upload_options(Parameters) ->
@@ -254,12 +257,14 @@ channel_status(#{type := ?ACTION_UPLOAD}, _State) ->
     %% Since bucket name may be templated, we can't really provide any additional
     %% information regarding the channel health.
     ?status_connected;
-channel_status(#{type := ?ACTION_AGGREGATED_UPLOAD, name := Name, bucket := Bucket}, State) ->
+channel_status(
+    #{type := ?ACTION_AGGREGATED_UPLOAD, aggreg_id := AggregId, bucket := Bucket}, State
+) ->
     %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
     Timestamp = erlang:system_time(second),
-    ok = emqx_connector_aggregator:tick(Name, Timestamp),
+    ok = emqx_connector_aggregator:tick(AggregId, Timestamp),
     ok = check_bucket_accessible(Bucket, State),
-    ok = check_aggreg_upload_errors(Name),
+    ok = check_aggreg_upload_errors(AggregId),
     ?status_connected.
 
 check_bucket_accessible(Bucket, #{client_config := Config}) ->
@@ -278,8 +283,8 @@ check_bucket_accessible(Bucket, #{client_config := Config}) ->
         end
     end.
 
-check_aggreg_upload_errors(Name) ->
-    case emqx_connector_aggregator:take_error(Name) of
+check_aggreg_upload_errors(AggregId) ->
+    case emqx_connector_aggregator:take_error(AggregId) of
         [Error] ->
             %% TODO
             %% This approach means that, for example, 3 upload failures will cause
@@ -353,11 +358,11 @@ run_simple_upload(
             {error, map_error(Reason)}
     end.
 
-run_aggregated_upload(InstId, Records, #{name := Name}) ->
+run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) ->
     Timestamp = erlang:system_time(second),
-    case emqx_connector_aggregator:push_records(Name, Timestamp, Records) of
+    case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of
         ok ->
-            ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => Name}),
+            ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}),
             ok;
         {error, Reason} ->
             {error, {unrecoverable_error, Reason}}
@@ -406,8 +411,8 @@ init_transfer_state(BufferMap, Opts) ->
     Key = mk_object_key(BufferMap, Opts),
     emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).
 
-mk_object_key(BufferMap, #{action := Name, key := Template}) ->
-    emqx_template:render_strict(Template, {?MODULE, {Name, BufferMap}}).
+mk_object_key(BufferMap, #{action := AggregId, key := Template}) ->
+    emqx_template:render_strict(Template, {?MODULE, {AggregId, BufferMap}}).
 
 process_append(Writes, Upload0) ->
     {ok, Upload} = emqx_s3_upload:append(Writes, Upload0),
@@ -443,9 +448,9 @@ process_terminate(Upload) ->
 
 -spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) ->
     {ok, integer() | string()} | {error, undefined}.
-lookup([<<"action">>], {Name, _Buffer}) ->
+lookup([<<"action">>], {_AggregId = {_Type, Name}, _Buffer}) ->
     {ok, mk_fs_safe_string(Name)};
-lookup(Accessor, {_Name, Buffer = #{}}) ->
+lookup(Accessor, {_AggregId, Buffer = #{}}) ->
     lookup_buffer_var(Accessor, Buffer);
 lookup(_Accessor, _Context) ->
     {error, undefined}.
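Note that mk_object_key/2 now receives the tuple id under the action key, and the lookup/2 clause above destructures the bare name back out of it, so a ${action} placeholder in the S3 key template still renders the action name rather than the tuple. A sketch with hypothetical values:

    %% lookup/2 pattern-matches Name out of the new {Type, Name} id:
    lookup([<<"action">>], {{s3_aggregated_upload, <<"myaction">>}, #{}}).
    %% => {ok, <<"myaction">>} (modulo mk_fs_safe_string/1)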


@@ -158,6 +158,7 @@ t_on_get_status(Config) ->
 t_aggreg_upload(Config) ->
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     BridgeNameString = unicode:characters_to_list(BridgeName),
     NodeString = atom_to_list(node()),
     %% Create a bridge with the sample configuration.
@@ -170,7 +171,7 @@ t_aggreg_upload(Config) ->
     ]),
     ok = send_messages(BridgeName, MessageEvents),
     %% Wait until the delivery is completed.
-    ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
+    ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
     %% Check the uploaded objects.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     ?assertMatch(
@@ -196,6 +197,7 @@ t_aggreg_upload(Config) ->
 t_aggreg_upload_rule(Config) ->
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     ClientID = emqx_utils_conv:bin(?FUNCTION_NAME),
     %% Create a bridge with the sample configuration and a simple SQL rule.
     ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
@@ -217,7 +219,7 @@ t_aggreg_upload_rule(Config) ->
         emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>),
         emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>)
     ]),
-    ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
+    ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
     %% Check the uploaded objects.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     _CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key),
@@ -249,6 +251,7 @@ t_aggreg_upload_restart(Config) ->
     %% after a restart.
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     %% Create a bridge with the sample configuration.
     ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
     %% Send some sample messages that look like Rule SQL productions.
@@ -258,15 +261,15 @@ t_aggreg_upload_restart(Config) ->
         {<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>}
     ]),
     ok = send_messages(BridgeName, MessageEvents),
-    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}),
+    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := AggregId}),
     %% Restart the bridge.
     {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
     {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
     %% Send some more messages.
     ok = send_messages(BridgeName, MessageEvents),
-    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}),
+    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := AggregId}),
     %% Wait until the delivery is completed.
-    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
+    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
     %% Check there's still only one upload.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     _Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
@@ -289,6 +292,7 @@ t_aggreg_upload_restart_corrupted(Config) ->
     %% and does so while preserving uncompromised data.
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     BatchSize = ?CONF_MAX_RECORDS div 2,
     %% Create a bridge with the sample configuration.
     ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
@@ -300,13 +304,13 @@ t_aggreg_upload_restart_corrupted(Config) ->
     %% Ensure that they span multiple batch queries.
     ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1),
     {ok, _} = ?block_until(
-        #{?snk_kind := connector_aggreg_records_written, action := BridgeName},
+        #{?snk_kind := connector_aggreg_records_written, action := AggregId},
         infinity,
         0
     ),
     %% Find out the buffer file.
     {ok, #{filename := Filename}} = ?block_until(
-        #{?snk_kind := connector_aggreg_buffer_allocated, action := BridgeName}
+        #{?snk_kind := connector_aggreg_buffer_allocated, action := AggregId}
     ),
     %% Stop the bridge, corrupt the buffer file, and restart the bridge.
     {ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
@@ -320,7 +324,7 @@ t_aggreg_upload_restart_corrupted(Config) ->
     ],
     ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0),
     %% Wait until the delivery is completed.
-    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
+    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
     %% Check that upload contains part of the first batch and all of the second batch.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key),
@@ -341,6 +345,7 @@ t_aggreg_pending_upload_restart(Config) ->
     %% a restart.
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     %% Create a bridge with the sample configuration.
     ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
     %% Send few large messages that will require multipart upload.
@@ -362,7 +367,7 @@ t_aggreg_pending_upload_restart(Config) ->
     %% Restart the bridge.
     {ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
     %% Wait until the delivery is completed.
-    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
+    {ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
     %% Check that delivery contains all the messages.
     _Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
     [_Header | Rows] = fetch_parse_csv(Bucket, Key),
@@ -377,6 +382,7 @@ t_aggreg_next_rotate(Config) ->
     %% and windowing work correctly under high rate, high concurrency conditions.
     Bucket = ?config(s3_bucket, Config),
     BridgeName = ?config(bridge_name, Config),
+    AggregId = aggreg_id(BridgeName),
     NSenders = 4,
     %% Create a bridge with the sample configuration.
     ?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
@@ -393,7 +399,7 @@ t_aggreg_next_rotate(Config) ->
     %% Wait for the last delivery to complete.
     ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)),
     ?block_until(
-        #{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}, infinity, 0
+        #{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}, infinity, 0
     ),
     %% There should be at least 2 time windows of aggregated records.
     Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)],
@@ -465,3 +471,6 @@ fetch_parse_csv(Bucket, Key) ->
     #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
     {ok, CSV} = erl_csv:decode(Content),
     CSV.
+
+aggreg_id(BridgeName) ->
+    {?BRIDGE_TYPE, BridgeName}.
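The new aggreg_id/1 helper mirrors the id the connector now constructs, keeping the ?block_until trace-point matches above in sync with what the delivery process emits. For illustration (the concrete value of ?BRIDGE_TYPE is assumed):

    %% If ?BRIDGE_TYPE expands to s3_aggregated_upload:
    %% aggreg_id(<<"mybridge">>) =:= {s3_aggregated_upload, <<"mybridge">>}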


@@ -25,7 +25,7 @@
 ]).
 
 -record(delivery, {
-    name :: _Name,
+    id :: id(),
     callback_module :: module(),
     container :: emqx_connector_aggreg_csv:container(),
     reader :: emqx_connector_aggreg_buffer:reader(),
@@ -33,6 +33,8 @@
     empty :: boolean()
 }).
 
+-type id() :: term().
+
 -type state() :: #delivery{}.
 
 -type init_opts() :: #{
@@ -59,22 +61,22 @@
 %%
 
-start_link(Name, Buffer, Opts) ->
-    proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]).
+start_link(Id, Buffer, Opts) ->
+    proc_lib:start_link(?MODULE, init, [self(), Id, Buffer, Opts]).
 
 %%
 
--spec init(pid(), _Name, buffer(), init_opts()) -> no_return().
-init(Parent, Name, Buffer, Opts) ->
-    ?tp(connector_aggreg_delivery_started, #{action => Name, buffer => Buffer}),
+-spec init(pid(), id(), buffer(), init_opts()) -> no_return().
+init(Parent, Id, Buffer, Opts) ->
+    ?tp(connector_aggreg_delivery_started, #{action => Id, buffer => Buffer}),
     Reader = open_buffer(Buffer),
-    Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}),
+    Delivery = init_delivery(Id, Reader, Buffer, Opts#{action => Id}),
     _ = erlang:process_flag(trap_exit, true),
     ok = proc_lib:init_ack({ok, self()}),
     loop(Delivery, Parent, []).
 
 init_delivery(
-    Name,
+    Id,
     Reader,
     Buffer,
     Opts = #{
@@ -84,7 +86,7 @@ init_delivery(
 ) ->
     BufferMap = emqx_connector_aggregator:buffer_to_map(Buffer),
     #delivery{
-        name = Name,
+        id = Id,
         callback_module = Mod,
         container = mk_container(ContainerOpts),
         reader = Reader,
@@ -158,16 +160,16 @@ process_write(Delivery = #delivery{callback_module = Mod, transfer = Transfer0})
             error({transfer_failed, Reason})
     end.
 
-process_complete(#delivery{name = Name, empty = true}) ->
-    ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => empty}),
+process_complete(#delivery{id = Id, empty = true}) ->
+    ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => empty}),
     exit({shutdown, {skipped, empty}});
 process_complete(#delivery{
-    name = Name, callback_module = Mod, container = Container, transfer = Transfer0
+    id = Id, callback_module = Mod, container = Container, transfer = Transfer0
 }) ->
     Trailer = emqx_connector_aggreg_csv:close(Container),
     Transfer = Mod:process_append(Trailer, Transfer0),
     {ok, Completed} = Mod:process_complete(Transfer),
-    ?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => Completed}),
+    ?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => Completed}),
     ok.
 
 %%
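The delivery process never inspects the id itself; it only threads it into trace points and into the callback options, which is why id() is declared as a plain term(). A caller-side sketch (Buffer and Opts stand in for real arguments):

    %% Any term can serve as the delivery id; the S3 bridge now passes {Type, Name}:
    {ok, _Pid} = emqx_connector_aggreg_delivery:start_link(
        {s3_aggregated_upload, <<"myaction">>}, Buffer, Opts
    ).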