From a62db08676a17a52688a9968a37c52cb0c595f95 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 4 Apr 2024 12:17:14 +0200 Subject: [PATCH] feat(ds): Add REST API for durable storage --- .../src/emqx_ds_replication_layer_meta.erl | 22 +- apps/emqx_management/src/emqx_mgmt_api_ds.erl | 490 ++++++++++++++++++ .../test/emqx_mgmt_api_ds_SUITE.erl | 180 +++++++ 3 files changed, 691 insertions(+), 1 deletion(-) create mode 100644 apps/emqx_management/src/emqx_mgmt_api_ds.erl create mode 100644 apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 2fdd1c39d..66029d4ca 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -29,7 +29,9 @@ -export([ shards/1, my_shards/1, + shard_info/2, allocate_shards/1, + replica_set/2, sites/0, node/1, this_site/0, @@ -52,7 +54,6 @@ replica_set_transitions/2, update_replica_set/3, db_sites/1, - replica_set/2, target_set/2 ]). @@ -188,6 +189,25 @@ shards(DB) -> Recs = mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'})), [Shard || #?SHARD_TAB{shard = {_, Shard}} <- Recs]. +-spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> + #{replica_set := #{site() => #{status => up | joining}}} + | undefined. +shard_info(DB, Shard) -> + case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of + [] -> + undefined; + [#?SHARD_TAB{replica_set = Replicas}] -> + ReplicaSet = maps:from_list([ + begin + %% TODO: + ReplInfo = #{status => up}, + {I, ReplInfo} + end + || I <- Replicas + ]), + #{replica_set => ReplicaSet} + end. + -spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. my_shards(DB) -> Site = this_site(), diff --git a/apps/emqx_management/src/emqx_mgmt_api_ds.erl b/apps/emqx_management/src/emqx_mgmt_api_ds.erl new file mode 100644 index 000000000..acd6cf462 --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_api_ds.erl @@ -0,0 +1,490 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_ds). + +-behaviour(minirest_api). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_utils/include/emqx_utils_api.hrl"). + +-import(hoconsc, [mk/2, ref/1, enum/1, array/1]). + +%% API: +-export([ + list_sites/2, + get_site/2, + list_dbs/2, + get_db/2, + db_replicas/2, + db_replica/2, + + update_db_sites/3, + join/3, + leave/3 +]). + +%% behavior callbacks: +-export([ + namespace/0, + api_spec/0, + schema/1, + paths/0, + fields/1 +]). + +%% internal exports: +-export([]). + +-export_type([]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(TAGS, [<<"Durable storage">>]). + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +namespace() -> + undefined. + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). + +paths() -> + [ + "/ds/sites", + "/ds/sites/:site", + "/ds/storages", + "/ds/storages/:ds", + "/ds/storages/:ds/replicas", + "/ds/storages/:ds/replicas/:site" + ]. + +schema("/ds/sites") -> + #{ + 'operationId' => list_sites, + get => + #{ + description => <<"List sites">>, + tags => ?TAGS, + responses => + #{ + 200 => mk(array(binary()), #{desc => <<"List sites">>}) + } + } + }; +schema("/ds/sites/:site") -> + #{ + 'operationId' => get_site, + get => + #{ + description => <<"Get sites">>, + parameters => [param_site_id()], + tags => ?TAGS, + responses => + #{ + 200 => mk(ref(site), #{desc => <<"Get information about the site">>}), + 404 => not_found(<<"Site">>) + } + } + }; +schema("/ds/storages") -> + #{ + 'operationId' => list_dbs, + get => + #{ + description => <<"List durable storages">>, + tags => ?TAGS, + responses => + #{ + 200 => mk(array(atom()), #{desc => <<"List durable storages">>}) + } + } + }; +schema("/ds/storages/:ds") -> + #{ + 'operationId' => get_db, + get => + #{ + description => <<"Get durable storage">>, + tags => ?TAGS, + parameters => [param_storage_id()], + responses => + #{ + 200 => mk(ref(db), #{desc => <<"Get information about a durable storage">>}), + 400 => not_found(<<"Durable storage">>) + } + } + }; +schema("/ds/storages/:ds/replicas") -> + Parameters = [param_storage_id()], + #{ + 'operationId' => db_replicas, + get => + #{ + description => <<"List replicas of the durable storage">>, + tags => ?TAGS, + parameters => Parameters, + responses => + #{ + 200 => mk(array(binary()), #{ + desc => <<"List sites that contain replicas of the durable storage">> + }), + 400 => not_found(<<"Durable storage">>) + } + }, + put => + #{ + description => <<"Update replicas of the durable storage">>, + tags => ?TAGS, + parameters => Parameters, + responses => + #{ + 202 => mk(array(binary()), #{}), + 400 => bad_request() + }, + 'requestBody' => mk(array(binary()), #{desc => <<"New list of sites">>}) + } + }; +schema("/ds/storages/:ds/replicas/:site") -> + Parameters = [param_storage_id(), param_site_id()], + #{ + 'operationId' => db_replica, + put => + #{ + description => <<"Add site as a replica for the durable storage">>, + tags => ?TAGS, + parameters => Parameters, + responses => + #{ + 202 => <<"OK">>, + 400 => bad_request(), + 404 => not_found(<<"Object">>) + } + }, + delete => + #{ + description => <<"Remove site as a replica for the durable storage">>, + tags => ?TAGS, + parameters => Parameters, + responses => + #{ + 202 => <<"OK">>, + 400 => bad_request(), + 404 => not_found(<<"Object">>) + } + } + }. + +fields(site) -> + [ + {node, + mk( + atom(), + #{ + desc => <<"Name of the EMQX handling the site">>, + example => <<"'emqx@example.com'">> + } + )}, + {up, + mk( + boolean(), + #{desc => <<"Site is up and running">>} + )}, + {shards, + mk( + array(ref(sites_shard)), + #{desc => <<"Durable storages that have replicas at the site">>} + )} + ]; +fields(sites_shard) -> + [ + {storage, + mk( + atom(), + #{ + desc => <<"Durable storage ID">>, + example => 'emqx_persistent_message' + } + )}, + {id, + mk( + binary(), + #{ + desc => <<"Shard ID">>, + example => <<"1">> + } + )}, + {status, + mk( + atom(), + #{ + desc => <<"Shard status">>, + example => up + } + )} + ]; +fields(db) -> + [ + {name, + mk( + atom(), + #{ + desc => <<"Name of the durable storage">>, + example => 'emqx_persistent_message' + } + )}, + {shards, + mk( + array(ref(db_shard)), + #{desc => <<"List of storage shards">>} + )} + ]; +fields(db_shard) -> + [ + {id, + mk( + binary(), + #{ + desc => <<"Shard ID">>, + example => <<"1">> + } + )}, + {replicas, + mk( + hoconsc:array(ref(db_site)), + #{desc => <<"List of sites containing replicas of the storage">>} + )} + ]; +fields(db_site) -> + [ + {site, + mk( + binary(), + #{ + desc => <<"Site ID">>, + example => example_site() + } + )}, + {status, + mk( + enum([up, joining]), + #{desc => <<"Status of the replica">>} + )} + ]. + +%%================================================================================ +%% Internal exports +%%================================================================================ + +list_sites(get, _Params) -> + {200, emqx_ds_replication_layer_meta:sites()}. + +get_site(get, #{bindings := #{site := Site}}) -> + case lists:member(Site, emqx_ds_replication_layer_meta:sites()) of + false -> + ?NOT_FOUND(<<"Site not found: ", Site/binary>>); + true -> + Node = emqx_ds_replication_layer_meta:node(Site), + IsUp = lists:member(Node, [node() | nodes()]), + Shards = shards_of_site(Site), + ?OK(#{ + node => Node, + up => IsUp, + shards => Shards + }) + end. + +list_dbs(get, _Params) -> + ?OK(dbs()). + +get_db(get, #{bindings := #{ds := DB}}) -> + ?OK(#{ + name => DB, + shards => list_shards(DB) + }). + +db_replicas(get, #{bindings := #{ds := DB}}) -> + Replicas = lists:flatmap( + fun(Shard) -> + #{replica_set := RS} = emqx_ds_replication_layer_meta:shard_info(DB, Shard), + maps:keys(RS) + end, + emqx_ds_replication_layer_meta:shards(DB) + ), + ?OK(lists:usort(Replicas)); +db_replicas(put, #{bindings := #{ds := DB}, body := Sites}) -> + case update_db_sites(DB, Sites, rest) of + ok -> + {202, <<"OK">>}; + {error, Description} -> + ?BAD_REQUEST(400, Description) + end. + +db_replica(put, #{bindings := #{ds := DB, site := Site}}) -> + case join(DB, Site, rest) of + ok -> + {202, <<"OK">>}; + {error, Description} -> + ?BAD_REQUEST(400, Description) + end; +db_replica(delete, #{bindings := #{ds := DB, site := Site}}) -> + case leave(DB, Site, rest) of + ok -> + {202, <<"OK">>} + %% {error, Description} -> + %% ?BAD_REQUEST(400, Description) + end. + +-spec update_db_sites(emqx_ds:db(), [emqx_ds_replication_layer_meta:site()], rest | cli) -> + ok | {error, binary()}. +update_db_sites(DB, Sites, Via) when is_list(Sites) -> + UnknownSites = Sites -- emqx_ds_replication_layer_meta:sites(), + case {UnknownSites, Sites} of + {[], [_ | _]} -> + ?SLOG(warning, #{ + msg => "durable_storage_rebalance_request", ds => DB, sites => Sites, via => Via + }), + %% TODO: Do stuff + ok; + {_, []} -> + {error, <<"Empty replica list">>}; + {UnknownSites, _} -> + Message = io_lib:format( + "Unknown sites: ~p", + [lists:map(fun binary_to_list/1, UnknownSites)] + ), + {error, iolist_to_binary(Message)} + end; +update_db_sites(_, _, _) -> + {error, <<"Bad type">>}. + +-spec join(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}. +join(DB, Site, Via) -> + case lists:member(Site, emqx_ds_replication_layer_meta:sites()) of + true -> + ?SLOG(warning, #{ + msg => "durable_storage_join_request", ds => DB, site => Site, via => Via + }), + %% TODO: Do stuff + ok; + false -> + Message = io_lib:format("Unknown site: ~s", [Site]), + {error, iolist_to_binary(Message)} + end. + +-spec leave(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}. +leave(DB, Site, Via) -> + %% TODO: Do stuff + ?SLOG(warning, #{ + msg => "durable_storage_leave_request", ds => DB, site => Site, via => Via + }), + ok. + +%%================================================================================ +%% Internal functions +%%================================================================================ + +%% site_info(Site) -> +%% #{}. + +not_found(What) -> + emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <>). + +bad_request() -> + emqx_dashboard_swagger:error_codes(['BAD_REQUEST'], <<"Bad request">>). + +param_site_id() -> + Info = #{ + required => true, + in => path, + desc => <<"Site ID">>, + example => example_site() + }, + {site, mk(binary(), Info)}. + +param_storage_id() -> + Info = #{ + required => true, + in => path, + desc => <<"Durable storage ID">>, + example => emqx_persistent_message + }, + {ds, mk(enum(dbs()), Info)}. + +example_site() -> + try + emqx_ds_replication_layer_meta:this_site() + catch + _:_ -> + <<"AFA18CB1C22F0157">> + end. + +dbs() -> + [emqx_persistent_message]. + +shards_of_site(Site) -> + lists:flatmap( + fun({DB, Shard}) -> + case emqx_ds_replication_layer_meta:shard_info(DB, Shard) of + #{replica_set := #{Site := Info}} -> + [ + #{ + storage => DB, + id => Shard, + status => maps:get(status, Info) + } + ]; + _ -> + [] + end + end, + [ + {DB, Shard} + || DB <- dbs(), + Shard <- emqx_ds_replication_layer_meta:shards(DB) + ] + ). + +list_shards(DB) -> + [ + begin + #{replica_set := RS} = emqx_ds_replication_layer_meta:shard_info(DB, Shard), + Replicas = maps:fold( + fun(Site, #{status := Status}, Acc) -> + [ + #{ + site => Site, + status => Status + } + | Acc + ] + end, + [], + RS + ), + #{ + id => Shard, + replicas => Replicas + } + end + || Shard <- emqx_ds_replication_layer_meta:shards(DB) + ]. diff --git a/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl new file mode 100644 index 000000000..ee0544730 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl @@ -0,0 +1,180 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_ds_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-import(emqx_mgmt_api_test_util, [api_path/1, request_api/2, request_api_with_body/3]). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, "session_persistence.enable = true"}, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _} = emqx_common_test_http:create_default_app(), + [{suite_apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(suite_apps, Config)). + +init_per_testcase(_, Config) -> + Config. + +end_per_testcase(_, Config) -> + Config. + +t_get_sites(_) -> + Path = api_path(["ds", "sites"]), + {ok, Response} = request_api(get, Path), + ?assertEqual( + [emqx_ds_replication_layer_meta:this_site()], + emqx_utils_json:decode(Response, [return_maps]) + ). + +t_get_storages(_) -> + Path = api_path(["ds", "storages"]), + {ok, Response} = request_api(get, Path), + ?assertEqual( + [<<"emqx_persistent_message">>], + emqx_utils_json:decode(Response, [return_maps]) + ). + +t_get_site(_) -> + %% Unknown sites must result in error 404: + Path404 = api_path(["ds", "sites", "unknown_site"]), + ?assertMatch( + {error, {_, 404, _}}, + request_api(get, Path404) + ), + %% Valid path: + Path = api_path(["ds", "sites", emqx_ds_replication_layer_meta:this_site()]), + {ok, Response} = request_api(get, Path), + ThisNode = atom_to_binary(node()), + ?assertMatch( + #{ + <<"node">> := ThisNode, + <<"up">> := true, + <<"shards">> := + [ + #{ + <<"storage">> := <<"emqx_persistent_message">>, + <<"id">> := _, + <<"status">> := <<"up">> + } + | _ + ] + }, + emqx_utils_json:decode(Response, [return_maps]) + ). + +t_get_db(_) -> + %% Unknown DBs must result in error 400 (since the DS parameter is an enum): + Path400 = api_path(["ds", "storages", "unknown_ds"]), + ?assertMatch( + {error, {_, 400, _}}, + request_api(get, Path400) + ), + %% Valid path: + Path = api_path(["ds", "storages", "emqx_persistent_message"]), + {ok, Response} = request_api(get, Path), + ThisSite = emqx_ds_replication_layer_meta:this_site(), + ?assertMatch( + #{ + <<"name">> := <<"emqx_persistent_message">>, + <<"shards">> := + [ + #{ + <<"id">> := _, + <<"replicas">> := + [ + #{ + <<"site">> := ThisSite, + <<"status">> := <<"up">> + } + | _ + ] + } + | _ + ] + }, + emqx_utils_json:decode(Response) + ). + +t_get_replicas(_) -> + %% Unknown DBs must result in error 400 (since the DS parameter is an enum): + Path400 = api_path(["ds", "storages", "unknown_ds", "replicas"]), + ?assertMatch( + {error, {_, 400, _}}, + request_api(get, Path400) + ), + %% Valid path: + Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]), + {ok, Response} = request_api(get, Path), + ThisSite = emqx_ds_replication_layer_meta:this_site(), + ?assertEqual( + [ThisSite], + emqx_utils_json:decode(Response) + ). + +t_put_replicas(_) -> + Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]), + %% Error cases: + ?assertMatch( + {ok, 400, #{<<"message">> := <<"Unknown sites: [\"invalid_site\"]">>}}, + parse_error(request_api_with_body(put, Path, [<<"invalid_site">>])) + ), + %% Success case: + ?assertMatch( + {ok, 202, <<"OK">>}, + request_api_with_body(put, Path, [emqx_ds_replication_layer_meta:this_site()]) + ). + +t_join(_) -> + Path400 = api_path(["ds", "storages", "emqx_persistent_message", "replicas", "unknown_site"]), + ?assertMatch( + {error, {_, 400, _}}, + parse_error(request_api(put, Path400)) + ), + ThisSite = emqx_ds_replication_layer_meta:this_site(), + Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]), + ?assertMatch( + {ok, "OK"}, + request_api(put, Path) + ). + +t_leave(_) -> + ThisSite = emqx_ds_replication_layer_meta:this_site(), + Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]), + ?assertMatch( + {ok, "OK"}, + request_api(delete, Path) + ). + +parse_error({ok, Code, JSON}) -> + {ok, Code, emqx_utils_json:decode(JSON)}; +parse_error(Err) -> + Err.