Merge pull request #13005 from thalesmg/fix-aggreg-sup-tree-r57-20240509
fix(aggregator): refactor supervision tree
This commit is contained in:
commit
0eaef18391
|
@ -19,6 +19,7 @@
|
||||||
emqx_bridge_s3_connector_info
|
emqx_bridge_s3_connector_info
|
||||||
]}
|
]}
|
||||||
]},
|
]},
|
||||||
|
{mod, {emqx_bridge_s3_app, []}},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{links, []}
|
{links, []}
|
||||||
]}.
|
]}.
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_connector_aggreg_app).
|
-module(emqx_bridge_s3_app).
|
||||||
|
|
||||||
-behaviour(application).
|
-behaviour(application).
|
||||||
-export([start/2, stop/1]).
|
-export([start/2, stop/1]).
|
||||||
|
@ -15,7 +15,7 @@
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
start(_StartType, _StartArgs) ->
|
start(_StartType, _StartArgs) ->
|
||||||
emqx_connector_aggreg_sup:start_link().
|
emqx_bridge_s3_sup:start_link().
|
||||||
|
|
||||||
stop(_State) ->
|
stop(_State) ->
|
||||||
ok.
|
ok.
|
|
@ -87,6 +87,8 @@
|
||||||
channels := #{channel_id() => channel_state()}
|
channels := #{channel_id() => channel_state()}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
-define(AGGREG_SUP, emqx_bridge_s3_sup).
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
-spec callback_mode() -> callback_mode().
|
-spec callback_mode() -> callback_mode().
|
||||||
|
@ -224,8 +226,8 @@ start_channel(State, #{
|
||||||
client_config => maps:get(client_config, State),
|
client_config => maps:get(client_config, State),
|
||||||
uploader_config => maps:with([min_part_size, max_part_size], Parameters)
|
uploader_config => maps:with([min_part_size, max_part_size], Parameters)
|
||||||
},
|
},
|
||||||
_ = emqx_connector_aggreg_sup:delete_child(AggregId),
|
_ = ?AGGREG_SUP:delete_child(AggregId),
|
||||||
{ok, SupPid} = emqx_connector_aggreg_sup:start_child(#{
|
{ok, SupPid} = ?AGGREG_SUP:start_child(#{
|
||||||
id => AggregId,
|
id => AggregId,
|
||||||
start =>
|
start =>
|
||||||
{emqx_connector_aggreg_upload_sup, start_link, [AggregId, AggregOpts, DeliveryOpts]},
|
{emqx_connector_aggreg_upload_sup, start_link, [AggregId, AggregOpts, DeliveryOpts]},
|
||||||
|
@ -238,7 +240,7 @@ start_channel(State, #{
|
||||||
aggreg_id => AggregId,
|
aggreg_id => AggregId,
|
||||||
bucket => Bucket,
|
bucket => Bucket,
|
||||||
supervisor => SupPid,
|
supervisor => SupPid,
|
||||||
on_stop => fun() -> emqx_connector_aggreg_sup:delete_child(AggregId) end
|
on_stop => fun() -> ?AGGREG_SUP:delete_child(AggregId) end
|
||||||
}.
|
}.
|
||||||
|
|
||||||
upload_options(Parameters) ->
|
upload_options(Parameters) ->
|
||||||
|
|
|
@ -0,0 +1,54 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bridge_s3_sup).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([
|
||||||
|
start_link/0,
|
||||||
|
start_child/1,
|
||||||
|
delete_child/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% `supervisor' API
|
||||||
|
-export([init/1]).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Type declarations
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% API
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
start_link() ->
|
||||||
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
|
start_child(ChildSpec) ->
|
||||||
|
supervisor:start_child(?MODULE, ChildSpec).
|
||||||
|
|
||||||
|
delete_child(ChildId) ->
|
||||||
|
case supervisor:terminate_child(?MODULE, ChildId) of
|
||||||
|
ok ->
|
||||||
|
supervisor:delete_child(?MODULE, ChildId);
|
||||||
|
Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% `supervisor' API
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
SupFlags = #{
|
||||||
|
strategy => one_for_one,
|
||||||
|
intensity => 1,
|
||||||
|
period => 1
|
||||||
|
},
|
||||||
|
ChildSpecs = [],
|
||||||
|
{ok, {SupFlags, ChildSpecs}}.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Internal fns
|
||||||
|
%%------------------------------------------------------------------------------
|
|
@ -1,42 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_connector_aggreg_sup).
|
|
||||||
|
|
||||||
-export([
|
|
||||||
start_link/0,
|
|
||||||
start_child/1,
|
|
||||||
delete_child/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-behaviour(supervisor).
|
|
||||||
-export([init/1]).
|
|
||||||
|
|
||||||
-define(SUPREF, ?MODULE).
|
|
||||||
|
|
||||||
%%
|
|
||||||
|
|
||||||
start_link() ->
|
|
||||||
supervisor:start_link({local, ?SUPREF}, ?MODULE, root).
|
|
||||||
|
|
||||||
start_child(ChildSpec) ->
|
|
||||||
supervisor:start_child(?SUPREF, ChildSpec).
|
|
||||||
|
|
||||||
delete_child(ChildId) ->
|
|
||||||
case supervisor:terminate_child(?SUPREF, ChildId) of
|
|
||||||
ok ->
|
|
||||||
supervisor:delete_child(?SUPREF, ChildId);
|
|
||||||
Error ->
|
|
||||||
Error
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%
|
|
||||||
|
|
||||||
init(root) ->
|
|
||||||
SupFlags = #{
|
|
||||||
strategy => one_for_one,
|
|
||||||
intensity => 1,
|
|
||||||
period => 1
|
|
||||||
},
|
|
||||||
{ok, {SupFlags, []}}.
|
|
|
@ -7,7 +7,6 @@
|
||||||
stdlib
|
stdlib
|
||||||
]},
|
]},
|
||||||
{env, []},
|
{env, []},
|
||||||
{mod, {emqx_connector_aggreg_app, []}},
|
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{links, []}
|
{links, []}
|
||||||
]}.
|
]}.
|
||||||
|
|
Loading…
Reference in New Issue