feat(api-ds): provide more information on nonexistent site leave
This commit is contained in:
parent
15acd86b31
commit
35e360fcbe
|
@ -131,7 +131,7 @@
|
|||
-type transition() :: {add | del, site()}.
|
||||
|
||||
-type update_cluster_result() ::
|
||||
ok
|
||||
{ok, unchanged | [site()]}
|
||||
| {error, {nonexistent_db, emqx_ds:db()}}
|
||||
| {error, {nonexistent_sites, [site()]}}
|
||||
| {error, {too_few_sites, [site()]}}
|
||||
|
@ -449,7 +449,7 @@ allocate_shards_trans(DB) ->
|
|||
Allocation
|
||||
).
|
||||
|
||||
-spec assign_db_sites_trans(emqx_ds:db(), [site()]) -> ok.
|
||||
-spec assign_db_sites_trans(emqx_ds:db(), [site()]) -> {ok, [site()]}.
|
||||
assign_db_sites_trans(DB, Sites) ->
|
||||
Opts = db_config_trans(DB),
|
||||
case [S || S <- Sites, mnesia:read(?NODE_TAB, S, read) == []] of
|
||||
|
@ -466,21 +466,22 @@ assign_db_sites_trans(DB, Sites) ->
|
|||
%% 2. Ensure that sites are responsible for roughly the same number of shards.
|
||||
Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
|
||||
Reallocation = compute_allocation(Shards, Sites, Opts),
|
||||
lists:foreach(
|
||||
ok = lists:foreach(
|
||||
fun({Record, ReplicaSet}) ->
|
||||
ok = mnesia:write(Record#?SHARD_TAB{target_set = ReplicaSet})
|
||||
end,
|
||||
Reallocation
|
||||
).
|
||||
),
|
||||
{ok, Sites}.
|
||||
|
||||
-spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> ok.
|
||||
-spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> {ok, unchanged | [site()]}.
|
||||
modify_db_sites_trans(DB, Modifications) ->
|
||||
Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
|
||||
Sites0 = list_db_target_sites(Shards),
|
||||
Sites = lists:foldl(fun apply_transition/2, Sites0, Modifications),
|
||||
case Sites of
|
||||
Sites0 ->
|
||||
ok;
|
||||
{ok, unchanged};
|
||||
_Changed ->
|
||||
assign_db_sites_trans(DB, Sites)
|
||||
end.
|
||||
|
|
|
@ -239,7 +239,7 @@ t_rebalance(Config) ->
|
|||
),
|
||||
|
||||
%% Scale down the cluster by removing the first node.
|
||||
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
|
||||
?assertMatch({ok, _}, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
|
||||
ct:pal("Transitions (~p -> ~p): ~p~n", [
|
||||
Sites, tl(Sites), emqx_ds_test_helpers:transitions(N1, ?DB)
|
||||
]),
|
||||
|
@ -297,12 +297,12 @@ t_join_leave_errors(Config) ->
|
|||
),
|
||||
%% NOTE: Leaving a non-existent site is not an error.
|
||||
?assertEqual(
|
||||
ok,
|
||||
{ok, unchanged},
|
||||
ds_repl_meta(N1, leave_db_site, [?DB, <<"NO-MANS-SITE">>])
|
||||
),
|
||||
|
||||
%% Should be no-op.
|
||||
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])),
|
||||
?assertEqual({ok, unchanged}, ds_repl_meta(N1, join_db_site, [?DB, S1])),
|
||||
?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)),
|
||||
|
||||
%% Impossible to leave the last site.
|
||||
|
@ -312,13 +312,13 @@ t_join_leave_errors(Config) ->
|
|||
),
|
||||
|
||||
%% "Move" the DB to the other node.
|
||||
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])),
|
||||
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
|
||||
?assertMatch({ok, _}, ds_repl_meta(N1, join_db_site, [?DB, S2])),
|
||||
?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
|
||||
?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?DB)),
|
||||
?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
|
||||
|
||||
%% Should be no-op.
|
||||
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
|
||||
?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
|
||||
?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)).
|
||||
|
||||
t_rebalance_chaotic_converges(init, Config) ->
|
||||
|
@ -457,7 +457,7 @@ t_rebalance_offline_restarts(Config) ->
|
|||
|
||||
%% Shut down N3 and then remove it from the DB.
|
||||
ok = emqx_cth_cluster:stop_node(N3),
|
||||
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])),
|
||||
?assertMatch({ok, _}, ds_repl_meta(N1, leave_db_site, [?DB, S3])),
|
||||
Transitions = emqx_ds_test_helpers:transitions(N1, ?DB),
|
||||
ct:pal("Transitions: ~p~n", [Transitions]),
|
||||
|
||||
|
|
|
@ -164,8 +164,8 @@ apply_stream(DB, NodeStream0, Stream0, N) ->
|
|||
->
|
||||
?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}),
|
||||
%% Apply the transition.
|
||||
?assertEqual(
|
||||
ok,
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
?ON(
|
||||
Node,
|
||||
emqx_ds_replication_layer_meta:Operation(DB, Arg)
|
||||
|
|
|
@ -323,17 +323,11 @@ get_db(get, #{bindings := #{ds := DB}}) ->
|
|||
}).
|
||||
|
||||
db_replicas(get, #{bindings := #{ds := DB}}) ->
|
||||
Replicas = lists:flatmap(
|
||||
fun(Shard) ->
|
||||
#{replica_set := RS} = emqx_ds_replication_layer_meta:shard_info(DB, Shard),
|
||||
maps:keys(RS)
|
||||
end,
|
||||
emqx_ds_replication_layer_meta:shards(DB)
|
||||
),
|
||||
?OK(lists:usort(Replicas));
|
||||
Replicas = emqx_ds_replication_layer_meta:db_sites(DB),
|
||||
?OK(Replicas);
|
||||
db_replicas(put, #{bindings := #{ds := DB}, body := Sites}) ->
|
||||
case update_db_sites(DB, Sites, rest) of
|
||||
ok ->
|
||||
{ok, _} ->
|
||||
{202, <<"OK">>};
|
||||
{error, Description} ->
|
||||
?BAD_REQUEST(400, Description)
|
||||
|
@ -341,39 +335,43 @@ db_replicas(put, #{bindings := #{ds := DB}, body := Sites}) ->
|
|||
|
||||
db_replica(put, #{bindings := #{ds := DB, site := Site}}) ->
|
||||
case join(DB, Site, rest) of
|
||||
ok ->
|
||||
{ok, _} ->
|
||||
{202, <<"OK">>};
|
||||
{error, Description} ->
|
||||
?BAD_REQUEST(400, Description)
|
||||
end;
|
||||
db_replica(delete, #{bindings := #{ds := DB, site := Site}}) ->
|
||||
case leave(DB, Site, rest) of
|
||||
ok ->
|
||||
{ok, Sites} when is_list(Sites) ->
|
||||
{202, <<"OK">>};
|
||||
{ok, unchanged} ->
|
||||
?NOT_FOUND(<<"Site is not part of replica set">>);
|
||||
{error, Description} ->
|
||||
?BAD_REQUEST(400, Description)
|
||||
end.
|
||||
|
||||
-spec update_db_sites(emqx_ds:db(), [emqx_ds_replication_layer_meta:site()], rest | cli) ->
|
||||
ok | {error, binary()}.
|
||||
{ok, [emqx_ds_replication_layer_meta:site()]} | {error, _}.
|
||||
update_db_sites(DB, Sites, Via) when is_list(Sites) ->
|
||||
?SLOG(warning, #{
|
||||
?SLOG(notice, #{
|
||||
msg => "durable_storage_rebalance_request", ds => DB, sites => Sites, via => Via
|
||||
}),
|
||||
meta_result_to_binary(emqx_ds_replication_layer_meta:assign_db_sites(DB, Sites));
|
||||
update_db_sites(_, _, _) ->
|
||||
{error, <<"Bad type">>}.
|
||||
|
||||
-spec join(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}.
|
||||
-spec join(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) ->
|
||||
{ok, unchanged | [emqx_ds_replication_layer_meta:site()]} | {error, _}.
|
||||
join(DB, Site, Via) ->
|
||||
?SLOG(warning, #{
|
||||
?SLOG(notice, #{
|
||||
msg => "durable_storage_join_request", ds => DB, site => Site, via => Via
|
||||
}),
|
||||
meta_result_to_binary(emqx_ds_replication_layer_meta:join_db_site(DB, Site)).
|
||||
|
||||
-spec leave(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}.
|
||||
-spec leave(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) ->
|
||||
{ok, unchanged | [emqx_ds_replication_layer_meta:site()]} | {error, _}.
|
||||
leave(DB, Site, Via) ->
|
||||
?SLOG(warning, #{
|
||||
?SLOG(notice, #{
|
||||
msg => "durable_storage_leave_request", ds => DB, site => Site, via => Via
|
||||
}),
|
||||
meta_result_to_binary(emqx_ds_replication_layer_meta:leave_db_site(DB, Site)).
|
||||
|
@ -468,8 +466,8 @@ list_shards(DB) ->
|
|||
|| Shard <- emqx_ds_replication_layer_meta:shards(DB)
|
||||
].
|
||||
|
||||
meta_result_to_binary(ok) ->
|
||||
ok;
|
||||
meta_result_to_binary({ok, Result}) ->
|
||||
{ok, Result};
|
||||
meta_result_to_binary({error, {nonexistent_sites, UnknownSites}}) ->
|
||||
Msg = ["Unknown sites: " | lists:join(", ", UnknownSites)],
|
||||
{error, iolist_to_binary(Msg)};
|
||||
|
|
|
@ -174,6 +174,14 @@ t_leave(_) ->
|
|||
request_api(delete, Path)
|
||||
).
|
||||
|
||||
t_leave_notfound(_) ->
|
||||
Site = "not_part_of_replica_set",
|
||||
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", Site]),
|
||||
?assertMatch(
|
||||
{error, {_, 404, _}},
|
||||
request_api(delete, Path)
|
||||
).
|
||||
|
||||
parse_error({ok, Code, JSON}) ->
|
||||
{ok, Code, emqx_utils_json:decode(JSON)};
|
||||
parse_error(Err) ->
|
||||
|
|
Loading…
Reference in New Issue