diff --git a/apps/emqx_management/src/emqx_mgmt_api_ds.erl b/apps/emqx_management/src/emqx_mgmt_api_ds.erl index acd6cf462..8e64a7de5 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_ds.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_ds.erl @@ -357,7 +357,7 @@ db_replica(delete, #{bindings := #{ds := DB, site := Site}}) -> -spec update_db_sites(emqx_ds:db(), [emqx_ds_replication_layer_meta:site()], rest | cli) -> ok | {error, binary()}. update_db_sites(DB, Sites, Via) when is_list(Sites) -> - UnknownSites = Sites -- emqx_ds_replication_layer_meta:sites(), + UnknownSites = lists:usort(Sites) -- emqx_ds_replication_layer_meta:sites(), case {UnknownSites, Sites} of {[], [_ | _]} -> ?SLOG(warning, #{ diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index ddbc60d5c..12dd23d77 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -810,9 +810,50 @@ ds(CMD) -> do_ds(["info"]) -> emqx_ds_replication_layer_meta:print_status(); +do_ds(["set_replicas", DBStr | SitesStr]) -> + case emqx_utils:safe_to_existing_atom(DBStr) of + {ok, DB} -> + Sites = lists:map(fun list_to_binary/1, SitesStr), + case emqx_mgmt_api_ds:update_db_sites(DB, Sites, cli) of + ok -> + emqx_ctl:print("ok~n"); + {error, Description} -> + emqx_ctl:print("Unable to update replicas: ~s~n", [Description]) + end; + {error, _} -> + emqx_ctl:print("Unknown durable storage") + end; +do_ds(["join", DBStr, Site]) -> + case emqx_utils:safe_to_existing_atom(DBStr) of + {ok, DB} -> + case emqx_mgmt_api_ds:join(DB, list_to_binary(Site), cli) of + ok -> + emqx_ctl:print("ok~n"); + {error, Description} -> + emqx_ctl:print("Unable to update replicas: ~s~n", [Description]) + end; + {error, _} -> + emqx_ctl:print("Unknown durable storage~n") + end; +do_ds(["leave", DBStr, Site]) -> + case emqx_utils:safe_to_existing_atom(DBStr) of + {ok, DB} -> + case emqx_mgmt_api_ds:leave(DB, list_to_binary(Site), cli) of + ok -> + emqx_ctl:print("ok~n"); + {error, Description} -> + emqx_ctl:print("Unable to update replicas: ~s~n", [Description]) + end; + {error, _} -> + emqx_ctl:print("Unknown durable storage~n") + end; do_ds(_) -> emqx_ctl:usage([ - {"ds info", "Show overview of the embedded durable storage state"} + {"ds info", "Show overview of the embedded durable storage state"}, + {"ds set_replicas ...", + "Change the replica set of the durable storage"}, + {"ds join ", "Add site to the replica set of the storage"}, + {"ds leave ", "Remove site from the replica set of the storage"} ]). %%--------------------------------------------------------------------