feat(ds): Pass mgmt_ds REST API calls to the application

This commit is contained in:
ieQu1 2024-04-05 13:34:32 +02:00
parent 46261440cb
commit 2504b8126b
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
2 changed files with 26 additions and 35 deletions

View File

@ -349,55 +349,34 @@ db_replica(put, #{bindings := #{ds := DB, site := Site}}) ->
db_replica(delete, #{bindings := #{ds := DB, site := Site}}) ->
case leave(DB, Site, rest) of
ok ->
{202, <<"OK">>}
%% {error, Description} ->
%% ?BAD_REQUEST(400, Description)
{202, <<"OK">>};
{error, Description} ->
?BAD_REQUEST(400, Description)
end.
-spec update_db_sites(emqx_ds:db(), [emqx_ds_replication_layer_meta:site()], rest | cli) ->
ok | {error, binary()}.
update_db_sites(DB, Sites, Via) when is_list(Sites) ->
UnknownSites = lists:usort(Sites) -- emqx_ds_replication_layer_meta:sites(),
case {UnknownSites, Sites} of
{[], [_ | _]} ->
?SLOG(warning, #{
msg => "durable_storage_rebalance_request", ds => DB, sites => Sites, via => Via
}),
%% TODO: Do stuff
ok;
{_, []} ->
{error, <<"Empty replica list">>};
{UnknownSites, _} ->
Message = io_lib:format(
"Unknown sites: ~p",
[lists:map(fun binary_to_list/1, UnknownSites)]
),
{error, iolist_to_binary(Message)}
end;
?SLOG(warning, #{
msg => "durable_storage_rebalance_request", ds => DB, sites => Sites, via => Via
}),
meta_result_to_binary(emqx_ds_replication_layer_meta:assign_db_sites(DB, Sites));
update_db_sites(_, _, _) ->
{error, <<"Bad type">>}.
-spec join(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}.
join(DB, Site, Via) ->
case lists:member(Site, emqx_ds_replication_layer_meta:sites()) of
true ->
?SLOG(warning, #{
msg => "durable_storage_join_request", ds => DB, site => Site, via => Via
}),
%% TODO: Do stuff
ok;
false ->
Message = io_lib:format("Unknown site: ~s", [Site]),
{error, iolist_to_binary(Message)}
end.
?SLOG(warning, #{
msg => "durable_storage_join_request", ds => DB, site => Site, via => Via
}),
meta_result_to_binary(emqx_ds_replication_layer_meta:join_db_site(DB, Site)).
-spec leave(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}.
leave(DB, Site, Via) ->
%% TODO: Do stuff
?SLOG(warning, #{
msg => "durable_storage_leave_request", ds => DB, site => Site, via => Via
}),
ok.
meta_result_to_binary(emqx_ds_replication_layer_meta:leave_db_site(DB, Site)).
%%================================================================================
%% Internal functions
@ -488,3 +467,15 @@ list_shards(DB) ->
end
|| Shard <- emqx_ds_replication_layer_meta:shards(DB)
].
meta_result_to_binary(ok) ->
ok;
meta_result_to_binary({error, {nonexistent_sites, UnknownSites}}) ->
Msg = ["Unknown sites: " | lists:join(", ", UnknownSites)],
{error, iolist_to_binary(Msg)};
meta_result_to_binary({error, {nonexistent_db, DB}}) ->
IOList = io_lib:format("Unknown storage: ~p", [DB]),
{error, iolist_to_binary(IOList)};
meta_result_to_binary({error, Err}) ->
IOList = io_lib:format("Error: ~p", [Err]),
{error, iolist_to_binary(IOList)}.

View File

@ -144,7 +144,7 @@ t_put_replicas(_) ->
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]),
%% Error cases:
?assertMatch(
{ok, 400, #{<<"message">> := <<"Unknown sites: [\"invalid_site\"]">>}},
{ok, 400, #{<<"message">> := <<"Unknown sites: invalid_site">>}},
parse_error(request_api_with_body(put, Path, [<<"invalid_site">>]))
),
%% Success case:
@ -170,7 +170,7 @@ t_leave(_) ->
ThisSite = emqx_ds_replication_layer_meta:this_site(),
Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]),
?assertMatch(
{ok, "OK"},
{error, {_, 400, _}},
request_api(delete, Path)
).