chore(cluster link): refactor HTTP API for CRUD operations

Fixes https://emqx.atlassian.net/browse/EMQX-12627
This commit is contained in:
Thales Macedo Garitezi 2024-07-15 17:54:21 -03:00
parent d1c218303d
commit 0b1f0db73c
6 changed files with 380 additions and 68 deletions

View File

@ -7,6 +7,7 @@
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/http_api.hrl").
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
-export([
api_spec/0,
@ -14,7 +15,10 @@
schema/1
]).
-export([config/2]).
-export([
'/cluster/links'/2,
'/cluster/links/:name'/2
]).
-define(CONF_PATH, [cluster, links]).
-define(TAGS, [<<"Cluster">>]).
@ -24,12 +28,13 @@ api_spec() ->
paths() ->
[
"/cluster/links"
"/cluster/links",
"/cluster/links/:name"
].
schema("/cluster/links") ->
#{
'operationId' => config,
'operationId' => '/cluster/links',
get =>
#{
description => "Get cluster links configuration",
@ -37,14 +42,63 @@ schema("/cluster/links") ->
responses =>
#{200 => links_config_schema()}
},
put =>
post =>
#{
description => "Update cluster links configuration",
description => "Create a cluster link configuration",
tags => ?TAGS,
'requestBody' => links_config_schema(),
'requestBody' => link_config_schema(),
responses =>
#{
200 => links_config_schema(),
200 => link_config_schema(),
400 =>
emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST, ?ALREADY_EXISTS],
<<"Update Config Failed">>
)
}
}
};
schema("/cluster/links/:name") ->
#{
'operationId' => '/cluster/links/:name',
get =>
#{
description => "Get a cluster link configuration",
tags => ?TAGS,
parameters => [param_path_name()],
responses =>
#{
200 => link_config_schema(),
404 => emqx_dashboard_swagger:error_codes(
[?NOT_FOUND], <<"Cluster link not found">>
)
}
},
delete =>
#{
description => "Delete a cluster link configuration",
tags => ?TAGS,
parameters => [param_path_name()],
responses =>
#{
204 => <<"Link deleted">>,
404 => emqx_dashboard_swagger:error_codes(
[?NOT_FOUND], <<"Cluster link not found">>
)
}
},
put =>
#{
description => "Update a cluster link configuration",
tags => ?TAGS,
parameters => [param_path_name()],
'requestBody' => update_link_config_schema(),
responses =>
#{
200 => link_config_schema(),
404 => emqx_dashboard_swagger:error_codes(
[?NOT_FOUND], <<"Cluster link not found">>
),
400 =>
emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST], <<"Update Config Failed">>
@ -57,28 +111,66 @@ schema("/cluster/links") ->
%% API Handler funcs
%%--------------------------------------------------------------------
config(get, _Params) ->
{200, get_raw()};
config(put, #{body := Body}) ->
case emqx_cluster_link_config:update(Body) of
{ok, NewConfig} ->
{200, NewConfig};
{error, Reason} ->
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
{400, ?BAD_REQUEST, Message}
end.
'/cluster/links'(get, _Params) ->
?OK(get_raw());
'/cluster/links'(post, #{body := Body = #{<<"name">> := Name}}) ->
with_link(
Name,
return(?BAD_REQUEST('ALREADY_EXISTS', <<"Cluster link already exists">>)),
fun() ->
case emqx_cluster_link_config:create(Body) of
{ok, Res} ->
?CREATED(Res);
{error, Reason} ->
Message = list_to_binary(io_lib:format("Create link failed ~p", [Reason])),
?BAD_REQUEST(Message)
end
end
).
'/cluster/links/:name'(get, #{bindings := #{name := Name}}) ->
with_link(Name, fun(Link) -> ?OK(Link) end, not_found());
'/cluster/links/:name'(put, #{bindings := #{name := Name}, body := Params0}) ->
with_link(
Name,
fun(Link) ->
Params = Params0#{<<"name">> => Name},
case emqx_cluster_link_config:update_one_link(Params) of
{ok, Res} ->
?OK(Res);
{error, Reason} ->
Message = list_to_binary(io_lib:format("Update link failed ~p", [Reason])),
?BAD_REQUEST(Message)
end
end,
not_found()
);
'/cluster/links/:name'(delete, #{bindings := #{name := Name}}) ->
with_link(
Name,
fun() ->
case emqx_cluster_link_config:delete(Name) of
ok ->
?NO_CONTENT;
{error, Reason} ->
Message = list_to_binary(io_lib:format("Delete link failed ~p", [Reason])),
?BAD_REQUEST(Message)
end
end,
not_found()
).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
get_raw() ->
#{<<"links">> := Conf} =
#{<<"cluster">> := #{<<"links">> := Links}} =
emqx_config:fill_defaults(
#{<<"links">> => emqx_conf:get_raw(?CONF_PATH)},
#{<<"cluster">> => #{<<"links">> => emqx_conf:get_raw(?CONF_PATH)}},
#{obfuscate_sensitive_values => true}
),
Conf.
Links.
links_config_schema() ->
emqx_cluster_link_schema:links_schema(
@ -87,6 +179,24 @@ links_config_schema() ->
}
).
link_config_schema() ->
emqx_cluster_link_schema:link_schema().
param_path_name() ->
{name,
hoconsc:mk(
binary(),
#{
in => path,
required => true,
example => <<"my_link">>,
desc => ?DESC("param_path_name")
}
)}.
update_link_config_schema() ->
proplists:delete(name, emqx_cluster_link_schema:fields("link")).
links_config_example() ->
[
#{
@ -114,3 +224,21 @@ links_config_example() ->
<<"name">> => <<"emqxcl_c">>
}
].
with_link(Name, FoundFn, NotFoundFn) ->
case emqx_cluster_link_config:link_raw(Name) of
undefined ->
NotFoundFn();
Link = #{} ->
{arity, Arity} = erlang:fun_info(FoundFn, arity),
case Arity of
1 -> FoundFn(Link);
0 -> FoundFn()
end
end.
return(Response) ->
fun() -> Response end.
not_found() ->
return(?NOT_FOUND(<<"Cluster link not found">>)).

View File

@ -4,6 +4,8 @@
-module(emqx_cluster_link_config).
-feature(maybe_expr, enable).
-behaviour(emqx_config_handler).
-include_lib("emqx/include/logger.hrl").
@ -28,11 +30,15 @@
-export([
%% General
create/1,
delete/1,
update_one_link/1,
update/1,
cluster/0,
enabled_links/0,
links/0,
link/1,
link_raw/1,
topic_filters/1,
%% Connections
emqtt_options/1,
@ -55,6 +61,52 @@
%%
create(LinkConfig) ->
#{<<"name">> := Name} = LinkConfig,
case
emqx_conf:update(
?LINKS_PATH,
{create, LinkConfig},
#{rawconf_with_defaults => true, override_to => cluster}
)
of
{ok, #{raw_config := NewConfigRows}} ->
NewLinkConfig = find_link(Name, NewConfigRows),
{ok, NewLinkConfig};
{error, Reason} ->
{error, Reason}
end.
delete(Name) ->
case
emqx_conf:update(
?LINKS_PATH,
{delete, Name},
#{rawconf_with_defaults => true, override_to => cluster}
)
of
{ok, _} ->
ok;
{error, Reason} ->
{error, Reason}
end.
update_one_link(LinkConfig) ->
#{<<"name">> := Name} = LinkConfig,
case
emqx_conf:update(
?LINKS_PATH,
{update, LinkConfig},
#{rawconf_with_defaults => true, override_to => cluster}
)
of
{ok, #{raw_config := NewConfigRows}} ->
NewLinkConfig = find_link(Name, NewConfigRows),
{ok, NewLinkConfig};
{error, Reason} ->
{error, Reason}
end.
update(Config) ->
case
emqx_conf:update(
@ -75,11 +127,20 @@ cluster() ->
links() ->
emqx:get_config(?LINKS_PATH, []).
links_raw() ->
emqx:get_raw_config(?LINKS_PATH, []).
enabled_links() ->
[L || L = #{enable := true} <- links()].
link(Name) ->
case lists:dropwhile(fun(L) -> Name =/= upstream_name(L) end, links()) of
find_link(Name, links()).
link_raw(Name) ->
find_link(Name, links_raw()).
find_link(Name, Links) ->
case lists:dropwhile(fun(L) -> Name =/= upstream_name(L) end, Links) of
[LinkConf | _] -> LinkConf;
[] -> undefined
end.
@ -133,6 +194,37 @@ remove_handler() ->
pre_config_update(?LINKS_PATH, RawConf, RawConf) ->
{ok, RawConf};
pre_config_update(?LINKS_PATH, {create, LinkRawConf}, OldRawConf) ->
#{<<"name">> := Name} = LinkRawConf,
maybe
undefined ?= find_link(Name, OldRawConf),
NewRawConf0 = OldRawConf ++ [LinkRawConf],
NewRawConf = convert_certs(maybe_increment_ps_actor_incr(NewRawConf0, OldRawConf)),
{ok, NewRawConf}
else
_ ->
{error, already_exists}
end;
pre_config_update(?LINKS_PATH, {update, LinkRawConf}, OldRawConf) ->
#{<<"name">> := Name} = LinkRawConf,
maybe
{ok, {_Found, Front, Rear}} = safe_take(Name, OldRawConf),
NewRawConf0 = Front ++ [LinkRawConf] ++ Rear,
NewRawConf = convert_certs(maybe_increment_ps_actor_incr(NewRawConf0, OldRawConf)),
{ok, NewRawConf}
else
not_found ->
{error, not_found}
end;
pre_config_update(?LINKS_PATH, {delete, Name}, OldRawConf) ->
maybe
{ok, {_Found, Front, Rear}} = safe_take(Name, OldRawConf),
NewRawConf = Front ++ Rear,
{ok, NewRawConf}
else
_ ->
{error, not_found}
end;
pre_config_update(?LINKS_PATH, NewRawConf, OldRawConf) ->
{ok, convert_certs(maybe_increment_ps_actor_incr(NewRawConf, OldRawConf))}.
@ -320,3 +412,11 @@ do_convert_certs(LinkName, SSLOpts) ->
),
throw({bad_ssl_config, Reason})
end.
safe_take(Name, Transformations) ->
case lists:splitwith(fun(#{<<"name">> := N}) -> N =/= Name end, Transformations) of
{_Front, []} ->
not_found;
{Front, [Found | Rear]} ->
{ok, {Found, Front, Rear}}
end.

View File

@ -12,7 +12,7 @@
-export([injected_fields/0]).
%% Used in emqx_cluster_link_api
-export([links_schema/1]).
-export([links_schema/1, link_schema/0]).
-export([
roots/0,
@ -37,6 +37,9 @@ links_schema(Meta) ->
default => [], validator => fun links_validator/1, desc => ?DESC("links")
}).
link_schema() ->
hoconsc:ref(?MODULE, "link").
fields("link") ->
[
{enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})},

View File

@ -37,6 +37,10 @@
"-----END CERTIFICATE-----"
>>).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -47,7 +51,7 @@ init_per_suite(Config) ->
[
emqx_conf,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"},
emqx_mgmt_api_test_util:emqx_dashboard(),
emqx_cluster_link
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
@ -61,8 +65,7 @@ end_per_suite(Config) ->
ok.
auth_header() ->
{ok, API} = emqx_common_test_http:create_default_app(),
emqx_common_test_http:auth_header(API).
emqx_mgmt_api_test_util:auth_header_().
init_per_testcase(_TC, Config) ->
{ok, _} = emqx_cluster_link_config:update([]),
@ -71,62 +74,111 @@ init_per_testcase(_TC, Config) ->
end_per_testcase(_TC, _Config) ->
ok.
t_put_get_valid(Config) ->
Auth = ?config(auth, Config),
Path = ?API_PATH,
{ok, Resp} = emqx_mgmt_api_test_util:request_api(get, Path, Auth),
?assertMatch([], emqx_utils_json:decode(Resp)),
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
Link1 = #{
api_root() ->
<<"cluster/links">>.
list() ->
Path = emqx_mgmt_api_test_util:api_path([api_root()]),
emqx_mgmt_api_test_util:simple_request(get, Path, _Params = "").
get_link(Name) ->
Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]),
emqx_mgmt_api_test_util:simple_request(get, Path, _Params = "").
delete_link(Name) ->
Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]),
emqx_mgmt_api_test_util:simple_request(delete, Path, _Params = "").
update_link(Name, Params) ->
Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]),
emqx_mgmt_api_test_util:simple_request(put, Path, Params).
create_link(Name, Params0) ->
Params = Params0#{<<"name">> => Name},
Path = emqx_mgmt_api_test_util:api_path([api_root()]),
emqx_mgmt_api_test_util:simple_request(post, Path, Params).
link_params() ->
link_params(_Overrides = #{}).
link_params(Overrides) ->
Default = #{
<<"clientid">> => <<"linkclientid">>,
<<"username">> => <<"myusername">>,
<<"pool_size">> => 1,
<<"server">> => <<"emqxcl_2.nohost:31883">>,
<<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
<<"name">> => <<"emqcl_1">>
<<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>]
},
Link2 = #{
<<"pool_size">> => 1,
<<"server">> => <<"emqxcl_2.nohost:41883">>,
<<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
<<"name">> => <<"emqcl_2">>
},
?assertMatch({ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link1, Link2])),
emqx_utils_maps:deep_merge(Default, Overrides).
{ok, Resp1} = emqx_mgmt_api_test_util:request_api(get, Path, Auth),
?assertMatch([Link1, Link2], emqx_utils_json:decode(Resp1)),
%%------------------------------------------------------------------------------
%% Test cases
%%------------------------------------------------------------------------------
t_put_get_valid(_Config) ->
?assertMatch({200, []}, list()),
Name1 = <<"emqcl_1">>,
Link1 = link_params(#{
<<"server">> => <<"emqxcl_2.nohost:31883">>,
<<"name">> => Name1
}),
Name2 = <<"emqcl_2">>,
Link2 = link_params(#{
<<"server">> => <<"emqxcl_2.nohost:41883">>,
<<"name">> => Name2
}),
?assertMatch({201, _}, create_link(Name1, Link1)),
?assertMatch({201, _}, create_link(Name2, Link2)),
?assertMatch({200, [_, _]}, list()),
DisabledLink1 = Link1#{<<"enable">> => false},
?assertMatch(
{ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [DisabledLink1, Link2])
),
{ok, Resp2} = emqx_mgmt_api_test_util:request_api(get, Path, Auth),
?assertMatch([DisabledLink1, Link2], emqx_utils_json:decode(Resp2)),
?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, DisabledLink1))),
?assertMatch({200, #{<<"enable">> := false}}, get_link(Name1)),
?assertMatch({200, #{<<"enable">> := true}}, get_link(Name2)),
SSL = #{<<"enable">> => true, <<"cacertfile">> => ?CACERT},
SSLLink1 = Link1#{<<"ssl">> => SSL},
?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, SSLLink1))),
?assertMatch(
{ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link2, SSLLink1])
{200, #{<<"ssl">> := #{<<"enable">> := true, <<"cacertfile">> := _Path}}},
get_link(Name1)
),
{ok, Resp3} = emqx_mgmt_api_test_util:request_api(get, Path, Auth),
ok.
t_put_invalid(_Config) ->
Name = <<"l1">>,
{201, _} = create_link(Name, link_params()),
?assertMatch(
[Link2, #{<<"ssl">> := #{<<"enable">> := true, <<"cacertfile">> := _Path}}],
emqx_utils_json:decode(Resp3)
{400, _},
update_link(Name, maps:remove(<<"server">>, link_params()))
).
t_put_invalid(Config) ->
Auth = ?config(auth, Config),
Path = ?API_PATH,
Link = #{
<<"pool_size">> => 1,
<<"server">> => <<"emqxcl_2.nohost:31883">>,
<<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>],
<<"name">> => <<"emqcl_1">>
},
?assertMatch(
{error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link, Link])
),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [maps:remove(<<"name">>, Link)])
).
t_crud(_Config) ->
%% No links initially.
?assertMatch({200, []}, list()),
NameA = <<"a">>,
?assertMatch({404, _}, get_link(NameA)),
?assertMatch({404, _}, delete_link(NameA)),
?assertMatch({404, _}, update_link(NameA, link_params())),
Params1 = link_params(),
?assertMatch({201, #{<<"name">> := NameA}}, create_link(NameA, Params1)),
?assertMatch({400, #{<<"code">> := <<"ALREADY_EXISTS">>}}, create_link(NameA, Params1)),
?assertMatch({200, [#{<<"name">> := NameA}]}, list()),
?assertMatch({200, #{<<"name">> := NameA}}, get_link(NameA)),
Params2 = Params1#{<<"pool_size">> := 2},
?assertMatch({200, #{<<"name">> := NameA}}, update_link(NameA, Params2)),
?assertMatch({204, _}, delete_link(NameA)),
?assertMatch({404, _}, delete_link(NameA)),
?assertMatch({404, _}, get_link(NameA)),
?assertMatch({404, _}, update_link(NameA, Params1)),
?assertMatch({200, []}, list()),
ok.

View File

@ -293,3 +293,30 @@ format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) ->
FileNames
),
erlang:iolist_to_binary([WithPaths, StartBoundary, <<"--">>, LineSeparator]).
maybe_json_decode(X) ->
case emqx_utils_json:safe_decode(X, [return_maps]) of
{ok, Decoded} -> Decoded;
{error, _} -> X
end.
simple_request(Method, Path, Params) ->
AuthHeader = auth_header_(),
Opts = #{return_all => true},
case request_api(Method, Path, "", AuthHeader, Params, Opts) of
{ok, {{_, Status, _}, _Headers, Body0}} ->
Body = maybe_json_decode(Body0),
{Status, Body};
{error, {{_, Status, _}, _Headers, Body0}} ->
Body =
case emqx_utils_json:safe_decode(Body0, [return_maps]) of
{ok, Decoded0 = #{<<"message">> := Msg0}} ->
Msg = maybe_json_decode(Msg0),
Decoded0#{<<"message">> := Msg};
{ok, Decoded0} ->
Decoded0;
{error, _} ->
Body0
end,
{Status, Body}
end.

View File

@ -21,6 +21,8 @@
-define(OK(CONTENT), {200, CONTENT}).
-define(CREATED(CONTENT), {201, CONTENT}).
-define(NO_CONTENT, 204).
-define(BAD_REQUEST(CODE, REASON), {400, ?ERROR_MSG(CODE, REASON)}).