feat(ds): Add a CLI for managing DB replicas

This commit is contained in:
ieQu1 2024-04-04 22:12:59 +02:00
parent a62db08676
commit 46261440cb
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
2 changed files with 43 additions and 2 deletions

View File

@ -357,7 +357,7 @@ db_replica(delete, #{bindings := #{ds := DB, site := Site}}) ->
-spec update_db_sites(emqx_ds:db(), [emqx_ds_replication_layer_meta:site()], rest | cli) -> -spec update_db_sites(emqx_ds:db(), [emqx_ds_replication_layer_meta:site()], rest | cli) ->
ok | {error, binary()}. ok | {error, binary()}.
update_db_sites(DB, Sites, Via) when is_list(Sites) -> update_db_sites(DB, Sites, Via) when is_list(Sites) ->
UnknownSites = Sites -- emqx_ds_replication_layer_meta:sites(), UnknownSites = lists:usort(Sites) -- emqx_ds_replication_layer_meta:sites(),
case {UnknownSites, Sites} of case {UnknownSites, Sites} of
{[], [_ | _]} -> {[], [_ | _]} ->
?SLOG(warning, #{ ?SLOG(warning, #{

View File

@ -810,9 +810,50 @@ ds(CMD) ->
do_ds(["info"]) -> do_ds(["info"]) ->
emqx_ds_replication_layer_meta:print_status(); emqx_ds_replication_layer_meta:print_status();
do_ds(["set_replicas", DBStr | SitesStr]) ->
case emqx_utils:safe_to_existing_atom(DBStr) of
{ok, DB} ->
Sites = lists:map(fun list_to_binary/1, SitesStr),
case emqx_mgmt_api_ds:update_db_sites(DB, Sites, cli) of
ok ->
emqx_ctl:print("ok~n");
{error, Description} ->
emqx_ctl:print("Unable to update replicas: ~s~n", [Description])
end;
{error, _} ->
emqx_ctl:print("Unknown durable storage")
end;
do_ds(["join", DBStr, Site]) ->
case emqx_utils:safe_to_existing_atom(DBStr) of
{ok, DB} ->
case emqx_mgmt_api_ds:join(DB, list_to_binary(Site), cli) of
ok ->
emqx_ctl:print("ok~n");
{error, Description} ->
emqx_ctl:print("Unable to update replicas: ~s~n", [Description])
end;
{error, _} ->
emqx_ctl:print("Unknown durable storage~n")
end;
do_ds(["leave", DBStr, Site]) ->
case emqx_utils:safe_to_existing_atom(DBStr) of
{ok, DB} ->
case emqx_mgmt_api_ds:leave(DB, list_to_binary(Site), cli) of
ok ->
emqx_ctl:print("ok~n");
{error, Description} ->
emqx_ctl:print("Unable to update replicas: ~s~n", [Description])
end;
{error, _} ->
emqx_ctl:print("Unknown durable storage~n")
end;
do_ds(_) -> do_ds(_) ->
emqx_ctl:usage([ emqx_ctl:usage([
{"ds info", "Show overview of the embedded durable storage state"} {"ds info", "Show overview of the embedded durable storage state"},
{"ds set_replicas <storage> <site1> <site2> ...",
"Change the replica set of the durable storage"},
{"ds join <storage> <site>", "Add site to the replica set of the storage"},
{"ds leave <storage> <site>", "Remove site from the replica set of the storage"}
]). ]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------