fix(aggregator): namespace aggregator ids with action type
Otherwise, actions of different types but same names will clash when starting the aggregator supervision tree.
This commit is contained in:
parent
57dda70d2f
commit
7b80a9aa44
|
@ -209,6 +209,7 @@ start_channel(State, #{
|
||||||
bucket := Bucket
|
bucket := Bucket
|
||||||
}
|
}
|
||||||
}) ->
|
}) ->
|
||||||
|
AggregId = {Type, Name},
|
||||||
AggregOpts = #{
|
AggregOpts = #{
|
||||||
time_interval => TimeInterval,
|
time_interval => TimeInterval,
|
||||||
max_records => MaxRecords,
|
max_records => MaxRecords,
|
||||||
|
@ -223,19 +224,21 @@ start_channel(State, #{
|
||||||
client_config => maps:get(client_config, State),
|
client_config => maps:get(client_config, State),
|
||||||
uploader_config => maps:with([min_part_size, max_part_size], Parameters)
|
uploader_config => maps:with([min_part_size, max_part_size], Parameters)
|
||||||
},
|
},
|
||||||
_ = emqx_connector_aggreg_sup:delete_child({Type, Name}),
|
_ = emqx_connector_aggreg_sup:delete_child(AggregId),
|
||||||
{ok, SupPid} = emqx_connector_aggreg_sup:start_child(#{
|
{ok, SupPid} = emqx_connector_aggreg_sup:start_child(#{
|
||||||
id => {Type, Name},
|
id => AggregId,
|
||||||
start => {emqx_connector_aggreg_upload_sup, start_link, [Name, AggregOpts, DeliveryOpts]},
|
start =>
|
||||||
|
{emqx_connector_aggreg_upload_sup, start_link, [AggregId, AggregOpts, DeliveryOpts]},
|
||||||
type => supervisor,
|
type => supervisor,
|
||||||
restart => permanent
|
restart => permanent
|
||||||
}),
|
}),
|
||||||
#{
|
#{
|
||||||
type => ?ACTION_AGGREGATED_UPLOAD,
|
type => ?ACTION_AGGREGATED_UPLOAD,
|
||||||
name => Name,
|
name => Name,
|
||||||
|
aggreg_id => AggregId,
|
||||||
bucket => Bucket,
|
bucket => Bucket,
|
||||||
supervisor => SupPid,
|
supervisor => SupPid,
|
||||||
on_stop => fun() -> emqx_connector_aggreg_sup:delete_child({Type, Name}) end
|
on_stop => fun() -> emqx_connector_aggreg_sup:delete_child(AggregId) end
|
||||||
}.
|
}.
|
||||||
|
|
||||||
upload_options(Parameters) ->
|
upload_options(Parameters) ->
|
||||||
|
@ -254,12 +257,14 @@ channel_status(#{type := ?ACTION_UPLOAD}, _State) ->
|
||||||
%% Since bucket name may be templated, we can't really provide any additional
|
%% Since bucket name may be templated, we can't really provide any additional
|
||||||
%% information regarding the channel health.
|
%% information regarding the channel health.
|
||||||
?status_connected;
|
?status_connected;
|
||||||
channel_status(#{type := ?ACTION_AGGREGATED_UPLOAD, name := Name, bucket := Bucket}, State) ->
|
channel_status(
|
||||||
|
#{type := ?ACTION_AGGREGATED_UPLOAD, aggreg_id := AggregId, bucket := Bucket}, State
|
||||||
|
) ->
|
||||||
%% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
|
%% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
|
||||||
Timestamp = erlang:system_time(second),
|
Timestamp = erlang:system_time(second),
|
||||||
ok = emqx_connector_aggregator:tick(Name, Timestamp),
|
ok = emqx_connector_aggregator:tick(AggregId, Timestamp),
|
||||||
ok = check_bucket_accessible(Bucket, State),
|
ok = check_bucket_accessible(Bucket, State),
|
||||||
ok = check_aggreg_upload_errors(Name),
|
ok = check_aggreg_upload_errors(AggregId),
|
||||||
?status_connected.
|
?status_connected.
|
||||||
|
|
||||||
check_bucket_accessible(Bucket, #{client_config := Config}) ->
|
check_bucket_accessible(Bucket, #{client_config := Config}) ->
|
||||||
|
@ -278,8 +283,8 @@ check_bucket_accessible(Bucket, #{client_config := Config}) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_aggreg_upload_errors(Name) ->
|
check_aggreg_upload_errors(AggregId) ->
|
||||||
case emqx_connector_aggregator:take_error(Name) of
|
case emqx_connector_aggregator:take_error(AggregId) of
|
||||||
[Error] ->
|
[Error] ->
|
||||||
%% TODO
|
%% TODO
|
||||||
%% This approach means that, for example, 3 upload failures will cause
|
%% This approach means that, for example, 3 upload failures will cause
|
||||||
|
@ -353,11 +358,11 @@ run_simple_upload(
|
||||||
{error, map_error(Reason)}
|
{error, map_error(Reason)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
run_aggregated_upload(InstId, Records, #{name := Name}) ->
|
run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) ->
|
||||||
Timestamp = erlang:system_time(second),
|
Timestamp = erlang:system_time(second),
|
||||||
case emqx_connector_aggregator:push_records(Name, Timestamp, Records) of
|
case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of
|
||||||
ok ->
|
ok ->
|
||||||
?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => Name}),
|
?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}),
|
||||||
ok;
|
ok;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, {unrecoverable_error, Reason}}
|
{error, {unrecoverable_error, Reason}}
|
||||||
|
@ -406,8 +411,8 @@ init_transfer_state(BufferMap, Opts) ->
|
||||||
Key = mk_object_key(BufferMap, Opts),
|
Key = mk_object_key(BufferMap, Opts),
|
||||||
emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).
|
emqx_s3_upload:new(Client, Key, UploadOpts, UploaderConfig).
|
||||||
|
|
||||||
mk_object_key(BufferMap, #{action := Name, key := Template}) ->
|
mk_object_key(BufferMap, #{action := AggregId, key := Template}) ->
|
||||||
emqx_template:render_strict(Template, {?MODULE, {Name, BufferMap}}).
|
emqx_template:render_strict(Template, {?MODULE, {AggregId, BufferMap}}).
|
||||||
|
|
||||||
process_append(Writes, Upload0) ->
|
process_append(Writes, Upload0) ->
|
||||||
{ok, Upload} = emqx_s3_upload:append(Writes, Upload0),
|
{ok, Upload} = emqx_s3_upload:append(Writes, Upload0),
|
||||||
|
@ -443,9 +448,9 @@ process_terminate(Upload) ->
|
||||||
|
|
||||||
-spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) ->
|
-spec lookup(emqx_template:accessor(), {_Name, buffer_map()}) ->
|
||||||
{ok, integer() | string()} | {error, undefined}.
|
{ok, integer() | string()} | {error, undefined}.
|
||||||
lookup([<<"action">>], {Name, _Buffer}) ->
|
lookup([<<"action">>], {_AggregId = {_Type, Name}, _Buffer}) ->
|
||||||
{ok, mk_fs_safe_string(Name)};
|
{ok, mk_fs_safe_string(Name)};
|
||||||
lookup(Accessor, {_Name, Buffer = #{}}) ->
|
lookup(Accessor, {_AggregId, Buffer = #{}}) ->
|
||||||
lookup_buffer_var(Accessor, Buffer);
|
lookup_buffer_var(Accessor, Buffer);
|
||||||
lookup(_Accessor, _Context) ->
|
lookup(_Accessor, _Context) ->
|
||||||
{error, undefined}.
|
{error, undefined}.
|
||||||
|
|
|
@ -158,6 +158,7 @@ t_on_get_status(Config) ->
|
||||||
t_aggreg_upload(Config) ->
|
t_aggreg_upload(Config) ->
|
||||||
Bucket = ?config(s3_bucket, Config),
|
Bucket = ?config(s3_bucket, Config),
|
||||||
BridgeName = ?config(bridge_name, Config),
|
BridgeName = ?config(bridge_name, Config),
|
||||||
|
AggregId = aggreg_id(BridgeName),
|
||||||
BridgeNameString = unicode:characters_to_list(BridgeName),
|
BridgeNameString = unicode:characters_to_list(BridgeName),
|
||||||
NodeString = atom_to_list(node()),
|
NodeString = atom_to_list(node()),
|
||||||
%% Create a bridge with the sample configuration.
|
%% Create a bridge with the sample configuration.
|
||||||
|
@ -170,7 +171,7 @@ t_aggreg_upload(Config) ->
|
||||||
]),
|
]),
|
||||||
ok = send_messages(BridgeName, MessageEvents),
|
ok = send_messages(BridgeName, MessageEvents),
|
||||||
%% Wait until the delivery is completed.
|
%% Wait until the delivery is completed.
|
||||||
?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
|
?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
|
||||||
%% Check the uploaded objects.
|
%% Check the uploaded objects.
|
||||||
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
|
@ -196,6 +197,7 @@ t_aggreg_upload(Config) ->
|
||||||
t_aggreg_upload_rule(Config) ->
|
t_aggreg_upload_rule(Config) ->
|
||||||
Bucket = ?config(s3_bucket, Config),
|
Bucket = ?config(s3_bucket, Config),
|
||||||
BridgeName = ?config(bridge_name, Config),
|
BridgeName = ?config(bridge_name, Config),
|
||||||
|
AggregId = aggreg_id(BridgeName),
|
||||||
ClientID = emqx_utils_conv:bin(?FUNCTION_NAME),
|
ClientID = emqx_utils_conv:bin(?FUNCTION_NAME),
|
||||||
%% Create a bridge with the sample configuration and a simple SQL rule.
|
%% Create a bridge with the sample configuration and a simple SQL rule.
|
||||||
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
||||||
|
@ -217,7 +219,7 @@ t_aggreg_upload_rule(Config) ->
|
||||||
emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>),
|
emqx_message:make(?FUNCTION_NAME, T3 = <<"s3/empty">>, P3 = <<>>),
|
||||||
emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>)
|
emqx_message:make(?FUNCTION_NAME, <<"not/s3">>, <<"should not be here">>)
|
||||||
]),
|
]),
|
||||||
?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
|
?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
|
||||||
%% Check the uploaded objects.
|
%% Check the uploaded objects.
|
||||||
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
||||||
_CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key),
|
_CSV = [Header | Rows] = fetch_parse_csv(Bucket, Key),
|
||||||
|
@ -249,6 +251,7 @@ t_aggreg_upload_restart(Config) ->
|
||||||
%% after a restart.
|
%% after a restart.
|
||||||
Bucket = ?config(s3_bucket, Config),
|
Bucket = ?config(s3_bucket, Config),
|
||||||
BridgeName = ?config(bridge_name, Config),
|
BridgeName = ?config(bridge_name, Config),
|
||||||
|
AggregId = aggreg_id(BridgeName),
|
||||||
%% Create a bridge with the sample configuration.
|
%% Create a bridge with the sample configuration.
|
||||||
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
||||||
%% Send some sample messages that look like Rule SQL productions.
|
%% Send some sample messages that look like Rule SQL productions.
|
||||||
|
@ -258,15 +261,15 @@ t_aggreg_upload_restart(Config) ->
|
||||||
{<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>}
|
{<<"C3">>, T3 = <<"t/42">>, P3 = <<"">>}
|
||||||
]),
|
]),
|
||||||
ok = send_messages(BridgeName, MessageEvents),
|
ok = send_messages(BridgeName, MessageEvents),
|
||||||
{ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}),
|
{ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := AggregId}),
|
||||||
%% Restart the bridge.
|
%% Restart the bridge.
|
||||||
{ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
|
{ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
|
||||||
{ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
|
{ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
|
||||||
%% Send some more messages.
|
%% Send some more messages.
|
||||||
ok = send_messages(BridgeName, MessageEvents),
|
ok = send_messages(BridgeName, MessageEvents),
|
||||||
{ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := BridgeName}),
|
{ok, _} = ?block_until(#{?snk_kind := connector_aggreg_records_written, action := AggregId}),
|
||||||
%% Wait until the delivery is completed.
|
%% Wait until the delivery is completed.
|
||||||
{ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
|
{ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
|
||||||
%% Check there's still only one upload.
|
%% Check there's still only one upload.
|
||||||
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
||||||
_Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
|
_Upload = #{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
|
||||||
|
@ -289,6 +292,7 @@ t_aggreg_upload_restart_corrupted(Config) ->
|
||||||
%% and does so while preserving uncompromised data.
|
%% and does so while preserving uncompromised data.
|
||||||
Bucket = ?config(s3_bucket, Config),
|
Bucket = ?config(s3_bucket, Config),
|
||||||
BridgeName = ?config(bridge_name, Config),
|
BridgeName = ?config(bridge_name, Config),
|
||||||
|
AggregId = aggreg_id(BridgeName),
|
||||||
BatchSize = ?CONF_MAX_RECORDS div 2,
|
BatchSize = ?CONF_MAX_RECORDS div 2,
|
||||||
%% Create a bridge with the sample configuration.
|
%% Create a bridge with the sample configuration.
|
||||||
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
||||||
|
@ -300,13 +304,13 @@ t_aggreg_upload_restart_corrupted(Config) ->
|
||||||
%% Ensure that they span multiple batch queries.
|
%% Ensure that they span multiple batch queries.
|
||||||
ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1),
|
ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages1), 1),
|
||||||
{ok, _} = ?block_until(
|
{ok, _} = ?block_until(
|
||||||
#{?snk_kind := connector_aggreg_records_written, action := BridgeName},
|
#{?snk_kind := connector_aggreg_records_written, action := AggregId},
|
||||||
infinity,
|
infinity,
|
||||||
0
|
0
|
||||||
),
|
),
|
||||||
%% Find out the buffer file.
|
%% Find out the buffer file.
|
||||||
{ok, #{filename := Filename}} = ?block_until(
|
{ok, #{filename := Filename}} = ?block_until(
|
||||||
#{?snk_kind := connector_aggreg_buffer_allocated, action := BridgeName}
|
#{?snk_kind := connector_aggreg_buffer_allocated, action := AggregId}
|
||||||
),
|
),
|
||||||
%% Stop the bridge, corrupt the buffer file, and restart the bridge.
|
%% Stop the bridge, corrupt the buffer file, and restart the bridge.
|
||||||
{ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
|
{ok, _} = emqx_bridge_v2:disable_enable(disable, ?BRIDGE_TYPE, BridgeName),
|
||||||
|
@ -320,7 +324,7 @@ t_aggreg_upload_restart_corrupted(Config) ->
|
||||||
],
|
],
|
||||||
ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0),
|
ok = send_messages_delayed(BridgeName, lists:map(fun mk_message_event/1, Messages2), 0),
|
||||||
%% Wait until the delivery is completed.
|
%% Wait until the delivery is completed.
|
||||||
{ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
|
{ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
|
||||||
%% Check that upload contains part of the first batch and all of the second batch.
|
%% Check that upload contains part of the first batch and all of the second batch.
|
||||||
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
||||||
CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key),
|
CSV = [_Header | Rows] = fetch_parse_csv(Bucket, Key),
|
||||||
|
@ -341,6 +345,7 @@ t_aggreg_pending_upload_restart(Config) ->
|
||||||
%% a restart.
|
%% a restart.
|
||||||
Bucket = ?config(s3_bucket, Config),
|
Bucket = ?config(s3_bucket, Config),
|
||||||
BridgeName = ?config(bridge_name, Config),
|
BridgeName = ?config(bridge_name, Config),
|
||||||
|
AggregId = aggreg_id(BridgeName),
|
||||||
%% Create a bridge with the sample configuration.
|
%% Create a bridge with the sample configuration.
|
||||||
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
||||||
%% Send few large messages that will require multipart upload.
|
%% Send few large messages that will require multipart upload.
|
||||||
|
@ -362,7 +367,7 @@ t_aggreg_pending_upload_restart(Config) ->
|
||||||
%% Restart the bridge.
|
%% Restart the bridge.
|
||||||
{ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
|
{ok, _} = emqx_bridge_v2:disable_enable(enable, ?BRIDGE_TYPE, BridgeName),
|
||||||
%% Wait until the delivery is completed.
|
%% Wait until the delivery is completed.
|
||||||
{ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}),
|
{ok, _} = ?block_until(#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}),
|
||||||
%% Check that delivery contains all the messages.
|
%% Check that delivery contains all the messages.
|
||||||
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
_Uploads = [#{key := Key}] = emqx_bridge_s3_test_helpers:list_objects(Bucket),
|
||||||
[_Header | Rows] = fetch_parse_csv(Bucket, Key),
|
[_Header | Rows] = fetch_parse_csv(Bucket, Key),
|
||||||
|
@ -377,6 +382,7 @@ t_aggreg_next_rotate(Config) ->
|
||||||
%% and windowing work correctly under high rate, high concurrency conditions.
|
%% and windowing work correctly under high rate, high concurrency conditions.
|
||||||
Bucket = ?config(s3_bucket, Config),
|
Bucket = ?config(s3_bucket, Config),
|
||||||
BridgeName = ?config(bridge_name, Config),
|
BridgeName = ?config(bridge_name, Config),
|
||||||
|
AggregId = aggreg_id(BridgeName),
|
||||||
NSenders = 4,
|
NSenders = 4,
|
||||||
%% Create a bridge with the sample configuration.
|
%% Create a bridge with the sample configuration.
|
||||||
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
?assertMatch({ok, _Bridge}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
||||||
|
@ -393,7 +399,7 @@ t_aggreg_next_rotate(Config) ->
|
||||||
%% Wait for the last delivery to complete.
|
%% Wait for the last delivery to complete.
|
||||||
ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)),
|
ok = timer:sleep(round(?CONF_TIME_INTERVAL * 0.5)),
|
||||||
?block_until(
|
?block_until(
|
||||||
#{?snk_kind := connector_aggreg_delivery_completed, action := BridgeName}, infinity, 0
|
#{?snk_kind := connector_aggreg_delivery_completed, action := AggregId}, infinity, 0
|
||||||
),
|
),
|
||||||
%% There should be at least 2 time windows of aggregated records.
|
%% There should be at least 2 time windows of aggregated records.
|
||||||
Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)],
|
Uploads = [K || #{key := K} <- emqx_bridge_s3_test_helpers:list_objects(Bucket)],
|
||||||
|
@ -465,3 +471,6 @@ fetch_parse_csv(Bucket, Key) ->
|
||||||
#{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
|
#{content := Content} = emqx_bridge_s3_test_helpers:get_object(Bucket, Key),
|
||||||
{ok, CSV} = erl_csv:decode(Content),
|
{ok, CSV} = erl_csv:decode(Content),
|
||||||
CSV.
|
CSV.
|
||||||
|
|
||||||
|
aggreg_id(BridgeName) ->
|
||||||
|
{?BRIDGE_TYPE, BridgeName}.
|
||||||
|
|
|
@ -25,7 +25,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-record(delivery, {
|
-record(delivery, {
|
||||||
name :: _Name,
|
id :: id(),
|
||||||
callback_module :: module(),
|
callback_module :: module(),
|
||||||
container :: emqx_connector_aggreg_csv:container(),
|
container :: emqx_connector_aggreg_csv:container(),
|
||||||
reader :: emqx_connector_aggreg_buffer:reader(),
|
reader :: emqx_connector_aggreg_buffer:reader(),
|
||||||
|
@ -33,6 +33,8 @@
|
||||||
empty :: boolean()
|
empty :: boolean()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-type id() :: term().
|
||||||
|
|
||||||
-type state() :: #delivery{}.
|
-type state() :: #delivery{}.
|
||||||
|
|
||||||
-type init_opts() :: #{
|
-type init_opts() :: #{
|
||||||
|
@ -59,22 +61,22 @@
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
start_link(Name, Buffer, Opts) ->
|
start_link(Id, Buffer, Opts) ->
|
||||||
proc_lib:start_link(?MODULE, init, [self(), Name, Buffer, Opts]).
|
proc_lib:start_link(?MODULE, init, [self(), Id, Buffer, Opts]).
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
-spec init(pid(), _Name, buffer(), init_opts()) -> no_return().
|
-spec init(pid(), id(), buffer(), init_opts()) -> no_return().
|
||||||
init(Parent, Name, Buffer, Opts) ->
|
init(Parent, Id, Buffer, Opts) ->
|
||||||
?tp(connector_aggreg_delivery_started, #{action => Name, buffer => Buffer}),
|
?tp(connector_aggreg_delivery_started, #{action => Id, buffer => Buffer}),
|
||||||
Reader = open_buffer(Buffer),
|
Reader = open_buffer(Buffer),
|
||||||
Delivery = init_delivery(Name, Reader, Buffer, Opts#{action => Name}),
|
Delivery = init_delivery(Id, Reader, Buffer, Opts#{action => Id}),
|
||||||
_ = erlang:process_flag(trap_exit, true),
|
_ = erlang:process_flag(trap_exit, true),
|
||||||
ok = proc_lib:init_ack({ok, self()}),
|
ok = proc_lib:init_ack({ok, self()}),
|
||||||
loop(Delivery, Parent, []).
|
loop(Delivery, Parent, []).
|
||||||
|
|
||||||
init_delivery(
|
init_delivery(
|
||||||
Name,
|
Id,
|
||||||
Reader,
|
Reader,
|
||||||
Buffer,
|
Buffer,
|
||||||
Opts = #{
|
Opts = #{
|
||||||
|
@ -84,7 +86,7 @@ init_delivery(
|
||||||
) ->
|
) ->
|
||||||
BufferMap = emqx_connector_aggregator:buffer_to_map(Buffer),
|
BufferMap = emqx_connector_aggregator:buffer_to_map(Buffer),
|
||||||
#delivery{
|
#delivery{
|
||||||
name = Name,
|
id = Id,
|
||||||
callback_module = Mod,
|
callback_module = Mod,
|
||||||
container = mk_container(ContainerOpts),
|
container = mk_container(ContainerOpts),
|
||||||
reader = Reader,
|
reader = Reader,
|
||||||
|
@ -158,16 +160,16 @@ process_write(Delivery = #delivery{callback_module = Mod, transfer = Transfer0})
|
||||||
error({transfer_failed, Reason})
|
error({transfer_failed, Reason})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
process_complete(#delivery{name = Name, empty = true}) ->
|
process_complete(#delivery{id = Id, empty = true}) ->
|
||||||
?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => empty}),
|
?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => empty}),
|
||||||
exit({shutdown, {skipped, empty}});
|
exit({shutdown, {skipped, empty}});
|
||||||
process_complete(#delivery{
|
process_complete(#delivery{
|
||||||
name = Name, callback_module = Mod, container = Container, transfer = Transfer0
|
id = Id, callback_module = Mod, container = Container, transfer = Transfer0
|
||||||
}) ->
|
}) ->
|
||||||
Trailer = emqx_connector_aggreg_csv:close(Container),
|
Trailer = emqx_connector_aggreg_csv:close(Container),
|
||||||
Transfer = Mod:process_append(Trailer, Transfer0),
|
Transfer = Mod:process_append(Trailer, Transfer0),
|
||||||
{ok, Completed} = Mod:process_complete(Transfer),
|
{ok, Completed} = Mod:process_complete(Transfer),
|
||||||
?tp(connector_aggreg_delivery_completed, #{action => Name, transfer => Completed}),
|
?tp(connector_aggreg_delivery_completed, #{action => Id, transfer => Completed}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
Loading…
Reference in New Issue