Merge pull request #13047 from keynslug/fix/EMQX-12366/api-ds-leave

feat(api-ds): provide more information on nonexistent site leave
This commit is contained in:
Andrew Mayorov 2024-05-14 23:21:07 +02:00 committed by GitHub
commit 3e1e4bab17
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 45 additions and 34 deletions

View File

@ -131,7 +131,7 @@
-type transition() :: {add | del, site()}.
-type update_cluster_result() ::
ok
{ok, unchanged | [site()]}
| {error, {nonexistent_db, emqx_ds:db()}}
| {error, {nonexistent_sites, [site()]}}
| {error, {too_few_sites, [site()]}}
@ -449,7 +449,7 @@ allocate_shards_trans(DB) ->
Allocation
).
-spec assign_db_sites_trans(emqx_ds:db(), [site()]) -> ok.
-spec assign_db_sites_trans(emqx_ds:db(), [site()]) -> {ok, [site()]}.
assign_db_sites_trans(DB, Sites) ->
Opts = db_config_trans(DB),
case [S || S <- Sites, mnesia:read(?NODE_TAB, S, read) == []] of
@ -466,21 +466,22 @@ assign_db_sites_trans(DB, Sites) ->
%% 2. Ensure that sites are responsible for roughly the same number of shards.
Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
Reallocation = compute_allocation(Shards, Sites, Opts),
lists:foreach(
ok = lists:foreach(
fun({Record, ReplicaSet}) ->
ok = mnesia:write(Record#?SHARD_TAB{target_set = ReplicaSet})
end,
Reallocation
).
),
{ok, Sites}.
-spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> ok.
-spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> {ok, unchanged | [site()]}.
modify_db_sites_trans(DB, Modifications) ->
Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
Sites0 = list_db_target_sites(Shards),
Sites = lists:foldl(fun apply_transition/2, Sites0, Modifications),
case Sites of
Sites0 ->
ok;
{ok, unchanged};
_Changed ->
assign_db_sites_trans(DB, Sites)
end.

View File

@ -239,7 +239,7 @@ t_rebalance(Config) ->
),
%% Scale down the cluster by removing the first node.
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
?assertMatch({ok, _}, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
ct:pal("Transitions (~p -> ~p): ~p~n", [
Sites, tl(Sites), emqx_ds_test_helpers:transitions(N1, ?DB)
]),
@ -297,12 +297,12 @@ t_join_leave_errors(Config) ->
),
%% NOTE: Leaving a non-existent site is not an error.
?assertEqual(
ok,
{ok, unchanged},
ds_repl_meta(N1, leave_db_site, [?DB, <<"NO-MANS-SITE">>])
),
%% Should be no-op.
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])),
?assertEqual({ok, unchanged}, ds_repl_meta(N1, join_db_site, [?DB, S1])),
?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)),
%% Impossible to leave the last site.
@ -312,13 +312,13 @@ t_join_leave_errors(Config) ->
),
%% "Move" the DB to the other node.
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])),
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertMatch({ok, _}, ds_repl_meta(N1, join_db_site, [?DB, S2])),
?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?DB)),
?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
%% Should be no-op.
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)).
t_rebalance_chaotic_converges(init, Config) ->
@ -457,7 +457,7 @@ t_rebalance_offline_restarts(Config) ->
%% Shut down N3 and then remove it from the DB.
ok = emqx_cth_cluster:stop_node(N3),
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])),
?assertMatch({ok, _}, ds_repl_meta(N1, leave_db_site, [?DB, S3])),
Transitions = emqx_ds_test_helpers:transitions(N1, ?DB),
ct:pal("Transitions: ~p~n", [Transitions]),

View File

@ -164,8 +164,8 @@ apply_stream(DB, NodeStream0, Stream0, N) ->
->
?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}),
%% Apply the transition.
?assertEqual(
ok,
?assertMatch(
{ok, _},
?ON(
Node,
emqx_ds_replication_layer_meta:Operation(DB, Arg)

View File

@ -323,17 +323,11 @@ get_db(get, #{bindings := #{ds := DB}}) ->
}).
db_replicas(get, #{bindings := #{ds := DB}}) ->
Replicas = lists:flatmap(
fun(Shard) ->
#{replica_set := RS} = emqx_ds_replication_layer_meta:shard_info(DB, Shard),
maps:keys(RS)
end,
emqx_ds_replication_layer_meta:shards(DB)
),
?OK(lists:usort(Replicas));
Replicas = emqx_ds_replication_layer_meta:db_sites(DB),
?OK(Replicas);
db_replicas(put, #{bindings := #{ds := DB}, body := Sites}) ->
case update_db_sites(DB, Sites, rest) of
ok ->
{ok, _} ->
{202, <<"OK">>};
{error, Description} ->
?BAD_REQUEST(400, Description)
@ -341,21 +335,23 @@ db_replicas(put, #{bindings := #{ds := DB}, body := Sites}) ->
db_replica(put, #{bindings := #{ds := DB, site := Site}}) ->
case join(DB, Site, rest) of
ok ->
{ok, _} ->
{202, <<"OK">>};
{error, Description} ->
?BAD_REQUEST(400, Description)
end;
db_replica(delete, #{bindings := #{ds := DB, site := Site}}) ->
case leave(DB, Site, rest) of
ok ->
{ok, Sites} when is_list(Sites) ->
{202, <<"OK">>};
{ok, unchanged} ->
?NOT_FOUND(<<"Site is not part of replica set">>);
{error, Description} ->
?BAD_REQUEST(400, Description)
end.
-spec update_db_sites(emqx_ds:db(), [emqx_ds_replication_layer_meta:site()], rest | cli) ->
ok | {error, binary()}.
{ok, [emqx_ds_replication_layer_meta:site()]} | {error, _}.
update_db_sites(DB, Sites, Via) when is_list(Sites) ->
?SLOG(warning, #{
msg => "durable_storage_rebalance_request", ds => DB, sites => Sites, via => Via
@ -364,14 +360,16 @@ update_db_sites(DB, Sites, Via) when is_list(Sites) ->
update_db_sites(_, _, _) ->
{error, <<"Bad type">>}.
-spec join(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}.
-spec join(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) ->
{ok, unchanged | [emqx_ds_replication_layer_meta:site()]} | {error, _}.
join(DB, Site, Via) ->
?SLOG(warning, #{
msg => "durable_storage_join_request", ds => DB, site => Site, via => Via
}),
meta_result_to_binary(emqx_ds_replication_layer_meta:join_db_site(DB, Site)).
-spec leave(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}.
-spec leave(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) ->
{ok, unchanged | [emqx_ds_replication_layer_meta:site()]} | {error, _}.
leave(DB, Site, Via) ->
?SLOG(warning, #{
msg => "durable_storage_leave_request", ds => DB, site => Site, via => Via
@ -468,8 +466,8 @@ list_shards(DB) ->
|| Shard <- emqx_ds_replication_layer_meta:shards(DB)
].
meta_result_to_binary(ok) ->
ok;
meta_result_to_binary({ok, Result}) ->
{ok, Result};
meta_result_to_binary({error, {nonexistent_sites, UnknownSites}}) ->
Msg = ["Unknown sites: " | lists:join(", ", UnknownSites)],
{error, iolist_to_binary(Msg)};

View File

@ -855,7 +855,7 @@ do_ds(["set_replicas", DBStr | SitesStr]) ->
{ok, DB} ->
Sites = lists:map(fun list_to_binary/1, SitesStr),
case emqx_mgmt_api_ds:update_db_sites(DB, Sites, cli) of
ok ->
{ok, _} ->
emqx_ctl:print("ok~n");
{error, Description} ->
emqx_ctl:print("Unable to update replicas: ~s~n", [Description])
@ -867,7 +867,9 @@ do_ds(["join", DBStr, Site]) ->
case emqx_utils:safe_to_existing_atom(DBStr) of
{ok, DB} ->
case emqx_mgmt_api_ds:join(DB, list_to_binary(Site), cli) of
ok ->
{ok, unchanged} ->
emqx_ctl:print("unchanged~n");
{ok, _} ->
emqx_ctl:print("ok~n");
{error, Description} ->
emqx_ctl:print("Unable to update replicas: ~s~n", [Description])
@ -879,7 +881,9 @@ do_ds(["leave", DBStr, Site]) ->
case emqx_utils:safe_to_existing_atom(DBStr) of
{ok, DB} ->
case emqx_mgmt_api_ds:leave(DB, list_to_binary(Site), cli) of
ok ->
{ok, unchanged} ->
emqx_ctl:print("unchanged~n");
{ok, _} ->
emqx_ctl:print("ok~n");
{error, Description} ->
emqx_ctl:print("Unable to update replicas: ~s~n", [Description])

View File

@ -174,6 +174,14 @@ t_leave(_) ->
request_api(delete, Path)
).
t_leave_notfound(_) ->
Site = "not_part_of_replica_set",
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", Site]),
?assertMatch(
{error, {_, 404, _}},
request_api(delete, Path)
).
parse_error({ok, Code, JSON}) ->
{ok, Code, emqx_utils_json:decode(JSON)};
parse_error(Err) ->