diff --git a/apps/emqx/src/emqx_ds_schema.erl b/apps/emqx/src/emqx_ds_schema.erl index 1c932c822..1cda81d1d 100644 --- a/apps/emqx/src/emqx_ds_schema.erl +++ b/apps/emqx/src/emqx_ds_schema.erl @@ -263,7 +263,7 @@ common_builtin_fields() -> sc( pos_integer(), #{ - default => 12, + default => 16, importance => ?IMPORTANCE_MEDIUM, desc => ?DESC(builtin_n_shards) } diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl index 3bb2ba4c4..abe154807 100644 --- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl @@ -57,12 +57,12 @@ appspec(emqx_durable_storage) -> }}. t_metadata(init, Config) -> - emqx_cth_suite:start([emqx_ds_builtin_raft], #{ + Apps = emqx_cth_suite:start([emqx_ds_builtin_raft], #{ work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config) }), - Config; + [{apps, Apps} | Config]; t_metadata('end', Config) -> - emqx_cth_suite:stop([emqx_ds_builtin_raft]), + emqx_cth_suite:stop(?config(apps, Config)), Config. t_metadata(_Config) -> @@ -203,18 +203,23 @@ t_rebalance(Config) -> ?check_trace( #{timetrap => 30_000}, begin + Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes], %% 1. Initialize DB on the first node. Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), - ?assertEqual(ok, ?ON(N1, emqx_ds:open_db(?DB, Opts))), - ?assertMatch(Shards when length(Shards) == 16, shards_online(N1, ?DB)), - - %% 1.1 Open DB on the rest of the nodes: [ ?assertEqual(ok, ?ON(Node, emqx_ds:open_db(?DB, Opts))) || Node <- Nodes ], - Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes], + %% 1.1 Kick all sites except S1 from the replica set as + %% the initial condition: + ?assertMatch( + {ok, [_]}, + ?ON(N1, emqx_ds_replication_layer_meta:assign_db_sites(?DB, [S1])) + ), + ?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), + ?retry(500, 10, ?assertMatch(Shards when length(Shards) == 16, shards_online(N1, ?DB))), + ct:pal("Sites: ~p~n", [Sites]), Sequence = [ @@ -319,16 +324,15 @@ t_join_leave_errors('end', Config) -> t_join_leave_errors(Config) -> %% This testcase verifies that logical errors arising during handling of %% join/leave operations are reported correctly. - [N1, N2] = ?config(nodes, Config), Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), - ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])), - ?assertEqual(ok, erpc:call(N2, emqx_ds, open_db, [?DB, Opts])), + ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?FUNCTION_NAME, Opts])), + ?assertEqual(ok, erpc:call(N2, emqx_ds, open_db, [?FUNCTION_NAME, Opts])), [S1, S2] = [ds_repl_meta(N, this_site) || N <- [N1, N2]], - ?assertEqual([S1], ds_repl_meta(N1, db_sites, [?DB])), + ?assertEqual(lists:sort([S1, S2]), lists:sort(ds_repl_meta(N1, db_sites, [?FUNCTION_NAME]))), %% Attempts to join a nonexistent DB / site. ?assertEqual( @@ -337,33 +341,40 @@ t_join_leave_errors(Config) -> ), ?assertEqual( {error, {nonexistent_sites, [<<"NO-MANS-SITE">>]}}, - ds_repl_meta(N1, join_db_site, [?DB, <<"NO-MANS-SITE">>]) + ds_repl_meta(N1, join_db_site, [?FUNCTION_NAME, <<"NO-MANS-SITE">>]) ), %% NOTE: Leaving a non-existent site is not an error. ?assertEqual( {ok, unchanged}, - ds_repl_meta(N1, leave_db_site, [?DB, <<"NO-MANS-SITE">>]) + ds_repl_meta(N1, leave_db_site, [?FUNCTION_NAME, <<"NO-MANS-SITE">>]) ), %% Should be no-op. - ?assertEqual({ok, unchanged}, ds_repl_meta(N1, join_db_site, [?DB, S1])), - ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)), + ?assertEqual({ok, unchanged}, ds_repl_meta(N1, join_db_site, [?FUNCTION_NAME, S1])), + ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?FUNCTION_NAME)), - %% Impossible to leave the last site. + %% Leave S2: + ?assertEqual( + {ok, [S1]}, + ds_repl_meta(N1, leave_db_site, [?FUNCTION_NAME, S2]) + ), + %% Impossible to leave the last site: ?assertEqual( {error, {too_few_sites, []}}, - ds_repl_meta(N1, leave_db_site, [?DB, S1]) + ds_repl_meta(N1, leave_db_site, [?FUNCTION_NAME, S1]) ), %% "Move" the DB to the other node. - ?assertMatch({ok, _}, ds_repl_meta(N1, join_db_site, [?DB, S2])), - ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?DB, S1])), - ?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?DB)), - ?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), + ?assertMatch({ok, _}, ds_repl_meta(N1, join_db_site, [?FUNCTION_NAME, S2])), + ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?FUNCTION_NAME, S1])), + ?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?FUNCTION_NAME)), + ?retry( + 1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?FUNCTION_NAME)) + ), %% Should be no-op. - ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?DB, S1])), - ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)). + ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?FUNCTION_NAME, S1])), + ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?FUNCTION_NAME)). t_rebalance_chaotic_converges(init, Config) -> Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft], @@ -395,23 +406,24 @@ t_rebalance_chaotic_converges(Config) -> ?check_trace( #{}, begin + Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes], + ct:pal("Sites: ~p~n", [Sites]), + %% Initialize DB on first two nodes. Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}), + %% Open DB: ?assertEqual( - [{ok, ok}, {ok, ok}], - erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts]) + [{ok, ok}, {ok, ok}, {ok, ok}], + erpc:multicall([N1, N2, N3], emqx_ds, open_db, [?DB, Opts]) ), - %% Open DB on the last node. - ?assertEqual( - ok, - erpc:call(N3, emqx_ds, open_db, [?DB, Opts]) + %% Kick N3 from the replica set as the initial condition: + ?assertMatch( + {ok, [_, _]}, + ?ON(N1, emqx_ds_replication_layer_meta:assign_db_sites(?DB, [S1, S2])) ), - - %% Find out which sites there are. - Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes], - ct:pal("Sites: ~p~n", [Sites]), + ?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), Sequence = [ {N1, join_db_site, S3}, @@ -600,12 +612,12 @@ t_drop_generation(Config) -> ). t_error_mapping_replication_layer(init, Config) -> - emqx_cth_suite:start([emqx_ds_builtin_raft], #{ + Apps = emqx_cth_suite:start([emqx_ds_builtin_raft], #{ work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config) }), - Config; + [{apps, Apps} | Config]; t_error_mapping_replication_layer('end', Config) -> - emqx_cth_suite:stop([emqx_ds_builtin_raft]), + emqx_cth_suite:stop(?config(apps, Config)), Config. t_error_mapping_replication_layer(_Config) -> @@ -701,6 +713,15 @@ t_error_mapping_replication_layer(_Config) -> %% This testcase verifies the behavior of `store_batch' operation %% when the underlying code experiences recoverable or unrecoverable %% problems. +t_store_batch_fail(init, Config) -> + Apps = emqx_cth_suite:start([emqx_ds_builtin_raft], #{ + work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config) + }), + [{apps, Apps} | Config]; +t_store_batch_fail('end', Config) -> + emqx_cth_suite:stop(?config(apps, Config)), + Config. + t_store_batch_fail(_Config) -> ?check_trace( #{timetrap => 15_000}, diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl index bca8eb0eb..f18114918 100644 --- a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl @@ -26,7 +26,7 @@ init_per_suite(Config) -> }, <<"durable_storage">> => #{ <<"messages">> => #{ - <<"backend">> => <<"builtin">> + <<"backend">> => <<"builtin_raft">> } } }