From 2504b8126b5cb551173bdb722e017f9bbff03db0 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 5 Apr 2024 13:34:32 +0200 Subject: [PATCH] feat(ds): Pass mgmt_ds REST API calls to the application --- apps/emqx_management/src/emqx_mgmt_api_ds.erl | 57 ++++++++----------- .../test/emqx_mgmt_api_ds_SUITE.erl | 4 +- 2 files changed, 26 insertions(+), 35 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_ds.erl b/apps/emqx_management/src/emqx_mgmt_api_ds.erl index 8e64a7de5..c1a03feb4 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_ds.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_ds.erl @@ -349,55 +349,34 @@ db_replica(put, #{bindings := #{ds := DB, site := Site}}) -> db_replica(delete, #{bindings := #{ds := DB, site := Site}}) -> case leave(DB, Site, rest) of ok -> - {202, <<"OK">>} - %% {error, Description} -> - %% ?BAD_REQUEST(400, Description) + {202, <<"OK">>}; + {error, Description} -> + ?BAD_REQUEST(400, Description) end. -spec update_db_sites(emqx_ds:db(), [emqx_ds_replication_layer_meta:site()], rest | cli) -> ok | {error, binary()}. update_db_sites(DB, Sites, Via) when is_list(Sites) -> - UnknownSites = lists:usort(Sites) -- emqx_ds_replication_layer_meta:sites(), - case {UnknownSites, Sites} of - {[], [_ | _]} -> - ?SLOG(warning, #{ - msg => "durable_storage_rebalance_request", ds => DB, sites => Sites, via => Via - }), - %% TODO: Do stuff - ok; - {_, []} -> - {error, <<"Empty replica list">>}; - {UnknownSites, _} -> - Message = io_lib:format( - "Unknown sites: ~p", - [lists:map(fun binary_to_list/1, UnknownSites)] - ), - {error, iolist_to_binary(Message)} - end; + ?SLOG(warning, #{ + msg => "durable_storage_rebalance_request", ds => DB, sites => Sites, via => Via + }), + meta_result_to_binary(emqx_ds_replication_layer_meta:assign_db_sites(DB, Sites)); update_db_sites(_, _, _) -> {error, <<"Bad type">>}. -spec join(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}. join(DB, Site, Via) -> - case lists:member(Site, emqx_ds_replication_layer_meta:sites()) of - true -> - ?SLOG(warning, #{ - msg => "durable_storage_join_request", ds => DB, site => Site, via => Via - }), - %% TODO: Do stuff - ok; - false -> - Message = io_lib:format("Unknown site: ~s", [Site]), - {error, iolist_to_binary(Message)} - end. + ?SLOG(warning, #{ + msg => "durable_storage_join_request", ds => DB, site => Site, via => Via + }), + meta_result_to_binary(emqx_ds_replication_layer_meta:join_db_site(DB, Site)). -spec leave(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}. leave(DB, Site, Via) -> - %% TODO: Do stuff ?SLOG(warning, #{ msg => "durable_storage_leave_request", ds => DB, site => Site, via => Via }), - ok. + meta_result_to_binary(emqx_ds_replication_layer_meta:leave_db_site(DB, Site)). %%================================================================================ %% Internal functions @@ -488,3 +467,15 @@ list_shards(DB) -> end || Shard <- emqx_ds_replication_layer_meta:shards(DB) ]. + +meta_result_to_binary(ok) -> + ok; +meta_result_to_binary({error, {nonexistent_sites, UnknownSites}}) -> + Msg = ["Unknown sites: " | lists:join(", ", UnknownSites)], + {error, iolist_to_binary(Msg)}; +meta_result_to_binary({error, {nonexistent_db, DB}}) -> + IOList = io_lib:format("Unknown storage: ~p", [DB]), + {error, iolist_to_binary(IOList)}; +meta_result_to_binary({error, Err}) -> + IOList = io_lib:format("Error: ~p", [Err]), + {error, iolist_to_binary(IOList)}. diff --git a/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl index ee0544730..fef9276ca 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl @@ -144,7 +144,7 @@ t_put_replicas(_) -> Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]), %% Error cases: ?assertMatch( - {ok, 400, #{<<"message">> := <<"Unknown sites: [\"invalid_site\"]">>}}, + {ok, 400, #{<<"message">> := <<"Unknown sites: invalid_site">>}}, parse_error(request_api_with_body(put, Path, [<<"invalid_site">>])) ), %% Success case: @@ -170,7 +170,7 @@ t_leave(_) -> ThisSite = emqx_ds_replication_layer_meta:this_site(), Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]), ?assertMatch( - {ok, "OK"}, + {error, {_, 400, _}}, request_api(delete, Path) ).