diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 97d4e7412..f04de7d90 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -131,7 +131,7 @@ -type transition() :: {add | del, site()}. -type update_cluster_result() :: - ok + {ok, unchanged | [site()]} | {error, {nonexistent_db, emqx_ds:db()}} | {error, {nonexistent_sites, [site()]}} | {error, {too_few_sites, [site()]}} @@ -449,7 +449,7 @@ allocate_shards_trans(DB) -> Allocation ). --spec assign_db_sites_trans(emqx_ds:db(), [site()]) -> ok. +-spec assign_db_sites_trans(emqx_ds:db(), [site()]) -> {ok, [site()]}. assign_db_sites_trans(DB, Sites) -> Opts = db_config_trans(DB), case [S || S <- Sites, mnesia:read(?NODE_TAB, S, read) == []] of @@ -466,21 +466,22 @@ assign_db_sites_trans(DB, Sites) -> %% 2. Ensure that sites are responsible for roughly the same number of shards. Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write), Reallocation = compute_allocation(Shards, Sites, Opts), - lists:foreach( + ok = lists:foreach( fun({Record, ReplicaSet}) -> ok = mnesia:write(Record#?SHARD_TAB{target_set = ReplicaSet}) end, Reallocation - ). + ), + {ok, Sites}. --spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> ok. +-spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> {ok, unchanged | [site()]}. modify_db_sites_trans(DB, Modifications) -> Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write), Sites0 = list_db_target_sites(Shards), Sites = lists:foldl(fun apply_transition/2, Sites0, Modifications), case Sites of Sites0 -> - ok; + {ok, unchanged}; _Changed -> assign_db_sites_trans(DB, Sites) end. diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 35b22cf32..8303ff861 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -239,7 +239,7 @@ t_rebalance(Config) -> ), %% Scale down the cluster by removing the first node. - ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])), + ?assertMatch({ok, _}, ds_repl_meta(N1, leave_db_site, [?DB, S1])), ct:pal("Transitions (~p -> ~p): ~p~n", [ Sites, tl(Sites), emqx_ds_test_helpers:transitions(N1, ?DB) ]), @@ -297,12 +297,12 @@ t_join_leave_errors(Config) -> ), %% NOTE: Leaving a non-existent site is not an error. ?assertEqual( - ok, + {ok, unchanged}, ds_repl_meta(N1, leave_db_site, [?DB, <<"NO-MANS-SITE">>]) ), %% Should be no-op. - ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])), + ?assertEqual({ok, unchanged}, ds_repl_meta(N1, join_db_site, [?DB, S1])), ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)), %% Impossible to leave the last site. @@ -312,13 +312,13 @@ t_join_leave_errors(Config) -> ), %% "Move" the DB to the other node. - ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])), - ?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), + ?assertMatch({ok, _}, ds_repl_meta(N1, join_db_site, [?DB, S2])), + ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?DB, S1])), ?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?DB)), ?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), %% Should be no-op. - ?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), + ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?DB, S1])), ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)). t_rebalance_chaotic_converges(init, Config) -> @@ -457,7 +457,7 @@ t_rebalance_offline_restarts(Config) -> %% Shut down N3 and then remove it from the DB. ok = emqx_cth_cluster:stop_node(N3), - ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])), + ?assertMatch({ok, _}, ds_repl_meta(N1, leave_db_site, [?DB, S3])), Transitions = emqx_ds_test_helpers:transitions(N1, ?DB), ct:pal("Transitions: ~p~n", [Transitions]), diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index 996f39626..f3ad1c151 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -164,8 +164,8 @@ apply_stream(DB, NodeStream0, Stream0, N) -> -> ?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}), %% Apply the transition. - ?assertEqual( - ok, + ?assertMatch( + {ok, _}, ?ON( Node, emqx_ds_replication_layer_meta:Operation(DB, Arg) diff --git a/apps/emqx_management/src/emqx_mgmt_api_ds.erl b/apps/emqx_management/src/emqx_mgmt_api_ds.erl index c1a03feb4..5f36bdce7 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_ds.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_ds.erl @@ -323,17 +323,11 @@ get_db(get, #{bindings := #{ds := DB}}) -> }). db_replicas(get, #{bindings := #{ds := DB}}) -> - Replicas = lists:flatmap( - fun(Shard) -> - #{replica_set := RS} = emqx_ds_replication_layer_meta:shard_info(DB, Shard), - maps:keys(RS) - end, - emqx_ds_replication_layer_meta:shards(DB) - ), - ?OK(lists:usort(Replicas)); + Replicas = emqx_ds_replication_layer_meta:db_sites(DB), + ?OK(Replicas); db_replicas(put, #{bindings := #{ds := DB}, body := Sites}) -> case update_db_sites(DB, Sites, rest) of - ok -> + {ok, _} -> {202, <<"OK">>}; {error, Description} -> ?BAD_REQUEST(400, Description) @@ -341,21 +335,23 @@ db_replicas(put, #{bindings := #{ds := DB}, body := Sites}) -> db_replica(put, #{bindings := #{ds := DB, site := Site}}) -> case join(DB, Site, rest) of - ok -> + {ok, _} -> {202, <<"OK">>}; {error, Description} -> ?BAD_REQUEST(400, Description) end; db_replica(delete, #{bindings := #{ds := DB, site := Site}}) -> case leave(DB, Site, rest) of - ok -> + {ok, Sites} when is_list(Sites) -> {202, <<"OK">>}; + {ok, unchanged} -> + ?NOT_FOUND(<<"Site is not part of replica set">>); {error, Description} -> ?BAD_REQUEST(400, Description) end. -spec update_db_sites(emqx_ds:db(), [emqx_ds_replication_layer_meta:site()], rest | cli) -> - ok | {error, binary()}. + {ok, [emqx_ds_replication_layer_meta:site()]} | {error, _}. update_db_sites(DB, Sites, Via) when is_list(Sites) -> ?SLOG(warning, #{ msg => "durable_storage_rebalance_request", ds => DB, sites => Sites, via => Via @@ -364,14 +360,16 @@ update_db_sites(DB, Sites, Via) when is_list(Sites) -> update_db_sites(_, _, _) -> {error, <<"Bad type">>}. --spec join(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}. +-spec join(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> + {ok, unchanged | [emqx_ds_replication_layer_meta:site()]} | {error, _}. join(DB, Site, Via) -> ?SLOG(warning, #{ msg => "durable_storage_join_request", ds => DB, site => Site, via => Via }), meta_result_to_binary(emqx_ds_replication_layer_meta:join_db_site(DB, Site)). --spec leave(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}. +-spec leave(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> + {ok, unchanged | [emqx_ds_replication_layer_meta:site()]} | {error, _}. leave(DB, Site, Via) -> ?SLOG(warning, #{ msg => "durable_storage_leave_request", ds => DB, site => Site, via => Via @@ -468,8 +466,8 @@ list_shards(DB) -> || Shard <- emqx_ds_replication_layer_meta:shards(DB) ]. -meta_result_to_binary(ok) -> - ok; +meta_result_to_binary({ok, Result}) -> + {ok, Result}; meta_result_to_binary({error, {nonexistent_sites, UnknownSites}}) -> Msg = ["Unknown sites: " | lists:join(", ", UnknownSites)], {error, iolist_to_binary(Msg)}; diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 32a24d9bd..fef2ff217 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -855,7 +855,7 @@ do_ds(["set_replicas", DBStr | SitesStr]) -> {ok, DB} -> Sites = lists:map(fun list_to_binary/1, SitesStr), case emqx_mgmt_api_ds:update_db_sites(DB, Sites, cli) of - ok -> + {ok, _} -> emqx_ctl:print("ok~n"); {error, Description} -> emqx_ctl:print("Unable to update replicas: ~s~n", [Description]) @@ -867,7 +867,9 @@ do_ds(["join", DBStr, Site]) -> case emqx_utils:safe_to_existing_atom(DBStr) of {ok, DB} -> case emqx_mgmt_api_ds:join(DB, list_to_binary(Site), cli) of - ok -> + {ok, unchanged} -> + emqx_ctl:print("unchanged~n"); + {ok, _} -> emqx_ctl:print("ok~n"); {error, Description} -> emqx_ctl:print("Unable to update replicas: ~s~n", [Description]) @@ -879,7 +881,9 @@ do_ds(["leave", DBStr, Site]) -> case emqx_utils:safe_to_existing_atom(DBStr) of {ok, DB} -> case emqx_mgmt_api_ds:leave(DB, list_to_binary(Site), cli) of - ok -> + {ok, unchanged} -> + emqx_ctl:print("unchanged~n"); + {ok, _} -> emqx_ctl:print("ok~n"); {error, Description} -> emqx_ctl:print("Unable to update replicas: ~s~n", [Description]) diff --git a/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl index fef9276ca..8048a6820 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl @@ -174,6 +174,14 @@ t_leave(_) -> request_api(delete, Path) ). +t_leave_notfound(_) -> + Site = "not_part_of_replica_set", + Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", Site]), + ?assertMatch( + {error, {_, 404, _}}, + request_api(delete, Path) + ). + parse_error({ok, Code, JSON}) -> {ok, Code, emqx_utils_json:decode(JSON)}; parse_error(Err) ->