diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 8e67856d7..d4492f721 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -54,7 +54,7 @@ init() -> -spec is_persistence_enabled() -> boolean(). is_persistence_enabled() -> - persistent_term:get(?PERSISTENCE_ENABLED). + persistent_term:get(?PERSISTENCE_ENABLED, false). -spec is_persistence_enabled(emqx_types:zone()) -> boolean(). is_persistence_enabled(Zone) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index b2a461e7a..747f8241f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -79,11 +79,15 @@ start_shard({DB, Shard}) -> start_egress({DB, Shard}) -> supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)). --spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok. +-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, not_found}. stop_shard({DB, Shard}) -> Sup = ?via(#?shards_sup{db = DB}), - ok = supervisor:terminate_child(Sup, Shard), - ok = supervisor:delete_child(Sup, Shard). + case supervisor:terminate_child(Sup, Shard) of + ok -> + supervisor:delete_child(Sup, Shard); + {error, Reason} -> + {error, Reason} + end. -spec terminate_storage(emqx_ds_storage_layer:shard_id()) -> ok | {error, _Reason}. terminate_storage({DB, Shard}) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl index d198b2ddd..cbaafc718 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl @@ -41,6 +41,7 @@ -define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}). -define(ALLOCATE_RETRY_TIMEOUT, 1_000). +-define(TRIGGER_PENDING_TIMEOUT, 60_000). -define(TRANS_RETRY_TIMEOUT, 5_000). -define(CRASH_RETRY_DELAY, 20_000). @@ -106,7 +107,7 @@ handle_call(_Call, _From, State) -> -spec handle_cast(_Cast, state()) -> {noreply, state()}. handle_cast(#trigger_transitions{}, State) -> - {noreply, handle_pending_transitions(State)}; + {noreply, handle_pending_transitions(State), ?TRIGGER_PENDING_TIMEOUT}; handle_cast(_Cast, State) -> {noreply, State}. @@ -118,13 +119,15 @@ handle_cast(_Cast, State) -> handle_info({timeout, _TRef, allocate}, State) -> {noreply, handle_allocate_shards(State)}; handle_info({changed, {shard, DB, Shard}}, State = #{db := DB}) -> - {noreply, handle_shard_changed(Shard, State)}; + {noreply, handle_shard_changed(Shard, State), ?TRIGGER_PENDING_TIMEOUT}; handle_info({changed, _}, State) -> - {noreply, State}; + {noreply, State, ?TRIGGER_PENDING_TIMEOUT}; handle_info({'EXIT', Pid, Reason}, State) -> - {noreply, handle_exit(Pid, Reason, State)}; + {noreply, handle_exit(Pid, Reason, State), ?TRIGGER_PENDING_TIMEOUT}; +handle_info(timeout, State) -> + {noreply, handle_pending_transitions(State), ?TRIGGER_PENDING_TIMEOUT}; handle_info(_Info, State) -> - {noreply, State}. + {noreply, State, ?TRIGGER_PENDING_TIMEOUT}. -spec terminate(_Reason, state()) -> _Ok. terminate(_Reason, State = #{db := DB, shards := Shards}) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl index cb44a43f6..dc7085c42 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_publish_SUITE.erl @@ -29,11 +29,18 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_mgmt_api_test_util:init_suite(), - Config. + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. -end_per_suite(_) -> - emqx_mgmt_api_test_util:end_suite(). +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)). init_per_testcase(Case, Config) -> ?MODULE:Case({init, Config}). diff --git a/apps/emqx_management/test/emqx_mgmt_api_stats_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_stats_SUITE.erl index 1db30a73b..05eaa830c 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_stats_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_stats_SUITE.erl @@ -33,7 +33,6 @@ init_per_suite(Config) -> ], #{work_dir => emqx_cth_suite:work_dir(Config)} ), - {ok, _Api} = emqx_common_test_http:create_default_app(), [{apps, Apps} | Config]. end_per_suite(Config) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_status_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_status_SUITE.erl index 36ee4830d..64d9b032f 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_status_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_status_SUITE.erl @@ -51,11 +51,18 @@ groups() -> ]. init_per_suite(Config) -> - emqx_mgmt_api_test_util:init_suite(), - Config. + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. -end_per_suite(_) -> - emqx_mgmt_api_test_util:end_suite(). +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)). init_per_group(api_status_endpoint, Config) -> [{get_status_path, ["api", "v5", "status"]} | Config]; diff --git a/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl index b1d646b40..f896468b7 100644 --- a/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl @@ -25,12 +25,19 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]), + Apps = emqx_cth_suite:start( + [ + emqx_conf, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), ok = emqx_mgmt_cli:load(), - Config. + [{apps, Apps} | Config]. -end_per_suite(_) -> - emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]). +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)). init_per_testcase(t_autocluster_leave = TC, Config) -> [Core1, Core2, Repl1, Repl2] =