diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index a9d904da1..ab72eb7f3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -109,6 +109,7 @@ list_shards(DB) -> -spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. open_db(DB, CreateOpts) -> + ok = emqx_ds_sup:ensure_workers(), Opts = emqx_ds_replication_layer_meta:open_db(DB, CreateOpts), MyShards = emqx_ds_replication_layer_meta:my_shards(DB), lists:foreach( diff --git a/apps/emqx_durable_storage/src/emqx_ds_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_sup.erl index 081557a46..819d7d874 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_sup.erl @@ -1,12 +1,12 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_ds_sup). -behaviour(supervisor). %% API: --export([start_link/0]). +-export([start_link/0, ensure_workers/0]). %% behaviour callbacks: -export([init/1]). @@ -23,21 +23,41 @@ -spec start_link() -> {ok, pid()}. start_link() -> - supervisor:start_link({local, ?SUP}, ?MODULE, []). + supervisor:start_link({local, ?SUP}, ?MODULE, top). + +-spec ensure_workers() -> ok. +ensure_workers() -> + ChildSpec = #{ + id => workers_sup, + restart => temporary, + type => supervisor, + start => {supervisor, start_link, [?MODULE, workers]} + }, + case supervisor:start_child(?SUP, ChildSpec) of + {ok, _} -> + ok; + {error, already_present} -> + ok; + {error, {already_started, _}} -> + ok + end. %%================================================================================ %% behaviour callbacks %%================================================================================ -dialyzer({nowarn_function, init/1}). -init([]) -> +init(top) -> + SupFlags = #{ + strategy => one_for_all, + intensity => 10, + period => 1 + }, + {ok, {SupFlags, []}}; +init(workers) -> %% TODO: technically, we don't need rocksDB for the alternative %% backends. But right now we have any: - Children = - case mria:rocksdb_backend_available() of - true -> [meta(), storage_layer_sup()]; - false -> [] - end, + Children = [meta(), storage_layer_sup()], SupFlags = #{ strategy => one_for_all, intensity => 0, diff --git a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src index fd2c56293..f6f757cfd 100644 --- a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src +++ b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src @@ -2,7 +2,7 @@ {application, emqx_durable_storage, [ {description, "Message persistence and subscription replays for EMQX"}, % strict semver, bump manually! - {vsn, "0.1.9"}, + {vsn, "0.1.10"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, rocksdb, gproc, mria, emqx_utils]}, diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 7b733406d..22fad903e 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_ds_storage_bitfield_lts_SUITE). @@ -378,6 +378,7 @@ suite() -> [{timetrap, {seconds, 20}}]. init_per_suite(Config) -> {ok, _} = application:ensure_all_started(emqx_durable_storage), + emqx_ds_sup:ensure_workers(), Config. end_per_suite(_Config) ->