From 0b1f0db73c1ca6b065875de70c15493e78c5586e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 15 Jul 2024 17:54:21 -0300 Subject: [PATCH] chore(cluster link): refactor HTTP API for CRUD operations Fixes https://emqx.atlassian.net/browse/EMQX-12627 --- .../src/emqx_cluster_link_api.erl | 168 +++++++++++++++--- .../src/emqx_cluster_link_config.erl | 102 ++++++++++- .../src/emqx_cluster_link_schema.erl | 5 +- .../test/emqx_cluster_link_api_SUITE.erl | 144 ++++++++++----- .../test/emqx_mgmt_api_test_util.erl | 27 +++ apps/emqx_utils/include/emqx_utils_api.hrl | 2 + 6 files changed, 380 insertions(+), 68 deletions(-) diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl index 33634607e..324d6dd68 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl @@ -7,6 +7,7 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/http_api.hrl"). +-include_lib("emqx_utils/include/emqx_utils_api.hrl"). -export([ api_spec/0, @@ -14,7 +15,10 @@ schema/1 ]). --export([config/2]). +-export([ + '/cluster/links'/2, + '/cluster/links/:name'/2 +]). -define(CONF_PATH, [cluster, links]). -define(TAGS, [<<"Cluster">>]). @@ -24,12 +28,13 @@ api_spec() -> paths() -> [ - "/cluster/links" + "/cluster/links", + "/cluster/links/:name" ]. schema("/cluster/links") -> #{ - 'operationId' => config, + 'operationId' => '/cluster/links', get => #{ description => "Get cluster links configuration", @@ -37,14 +42,63 @@ schema("/cluster/links") -> responses => #{200 => links_config_schema()} }, - put => + post => #{ - description => "Update cluster links configuration", + description => "Create a cluster link configuration", tags => ?TAGS, - 'requestBody' => links_config_schema(), + 'requestBody' => link_config_schema(), responses => #{ - 200 => links_config_schema(), + 200 => link_config_schema(), + 400 => + emqx_dashboard_swagger:error_codes( + [?BAD_REQUEST, ?ALREADY_EXISTS], + <<"Update Config Failed">> + ) + } + } + }; +schema("/cluster/links/:name") -> + #{ + 'operationId' => '/cluster/links/:name', + get => + #{ + description => "Get a cluster link configuration", + tags => ?TAGS, + parameters => [param_path_name()], + responses => + #{ + 200 => link_config_schema(), + 404 => emqx_dashboard_swagger:error_codes( + [?NOT_FOUND], <<"Cluster link not found">> + ) + } + }, + delete => + #{ + description => "Delete a cluster link configuration", + tags => ?TAGS, + parameters => [param_path_name()], + responses => + #{ + 204 => <<"Link deleted">>, + 404 => emqx_dashboard_swagger:error_codes( + [?NOT_FOUND], <<"Cluster link not found">> + ) + } + }, + put => + #{ + description => "Update a cluster link configuration", + tags => ?TAGS, + parameters => [param_path_name()], + 'requestBody' => update_link_config_schema(), + responses => + #{ + 200 => link_config_schema(), + 404 => emqx_dashboard_swagger:error_codes( + [?NOT_FOUND], <<"Cluster link not found">> + ), 400 => emqx_dashboard_swagger:error_codes( [?BAD_REQUEST], <<"Update Config Failed">> @@ -57,28 +111,66 @@ schema("/cluster/links") -> %% API Handler funcs %%-------------------------------------------------------------------- -config(get, _Params) -> - {200, get_raw()}; -config(put, #{body := Body}) -> - case emqx_cluster_link_config:update(Body) of - {ok, NewConfig} -> - {200, NewConfig}; - {error, Reason} -> - Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), - {400, ?BAD_REQUEST, Message} - end. +'/cluster/links'(get, _Params) -> + ?OK(get_raw()); +'/cluster/links'(post, #{body := Body = #{<<"name">> := Name}}) -> + with_link( + Name, + return(?BAD_REQUEST('ALREADY_EXISTS', <<"Cluster link already exists">>)), + fun() -> + case emqx_cluster_link_config:create(Body) of + {ok, Res} -> + ?CREATED(Res); + {error, Reason} -> + Message = list_to_binary(io_lib:format("Create link failed ~p", [Reason])), + ?BAD_REQUEST(Message) + end + end + ). + +'/cluster/links/:name'(get, #{bindings := #{name := Name}}) -> + with_link(Name, fun(Link) -> ?OK(Link) end, not_found()); +'/cluster/links/:name'(put, #{bindings := #{name := Name}, body := Params0}) -> + with_link( + Name, + fun(Link) -> + Params = Params0#{<<"name">> => Name}, + case emqx_cluster_link_config:update_one_link(Params) of + {ok, Res} -> + ?OK(Res); + {error, Reason} -> + Message = list_to_binary(io_lib:format("Update link failed ~p", [Reason])), + ?BAD_REQUEST(Message) + end + end, + not_found() + ); +'/cluster/links/:name'(delete, #{bindings := #{name := Name}}) -> + with_link( + Name, + fun() -> + case emqx_cluster_link_config:delete(Name) of + ok -> + ?NO_CONTENT; + {error, Reason} -> + Message = list_to_binary(io_lib:format("Delete link failed ~p", [Reason])), + ?BAD_REQUEST(Message) + end + end, + not_found() + ). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- get_raw() -> - #{<<"links">> := Conf} = + #{<<"cluster">> := #{<<"links">> := Links}} = emqx_config:fill_defaults( - #{<<"links">> => emqx_conf:get_raw(?CONF_PATH)}, + #{<<"cluster">> => #{<<"links">> => emqx_conf:get_raw(?CONF_PATH)}}, #{obfuscate_sensitive_values => true} ), - Conf. + Links. links_config_schema() -> emqx_cluster_link_schema:links_schema( @@ -87,6 +179,24 @@ links_config_schema() -> } ). +link_config_schema() -> + emqx_cluster_link_schema:link_schema(). + +param_path_name() -> + {name, + hoconsc:mk( + binary(), + #{ + in => path, + required => true, + example => <<"my_link">>, + desc => ?DESC("param_path_name") + } + )}. + +update_link_config_schema() -> + proplists:delete(name, emqx_cluster_link_schema:fields("link")). + links_config_example() -> [ #{ @@ -114,3 +224,21 @@ links_config_example() -> <<"name">> => <<"emqxcl_c">> } ]. + +with_link(Name, FoundFn, NotFoundFn) -> + case emqx_cluster_link_config:link_raw(Name) of + undefined -> + NotFoundFn(); + Link = #{} -> + {arity, Arity} = erlang:fun_info(FoundFn, arity), + case Arity of + 1 -> FoundFn(Link); + 0 -> FoundFn() + end + end. + +return(Response) -> + fun() -> Response end. + +not_found() -> + return(?NOT_FOUND(<<"Cluster link not found">>)). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl index f27c7702e..36655460b 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl @@ -4,6 +4,8 @@ -module(emqx_cluster_link_config). +-feature(maybe_expr, enable). + -behaviour(emqx_config_handler). -include_lib("emqx/include/logger.hrl"). @@ -28,11 +30,15 @@ -export([ %% General + create/1, + delete/1, + update_one_link/1, update/1, cluster/0, enabled_links/0, links/0, link/1, + link_raw/1, topic_filters/1, %% Connections emqtt_options/1, @@ -55,6 +61,52 @@ %% +create(LinkConfig) -> + #{<<"name">> := Name} = LinkConfig, + case + emqx_conf:update( + ?LINKS_PATH, + {create, LinkConfig}, + #{rawconf_with_defaults => true, override_to => cluster} + ) + of + {ok, #{raw_config := NewConfigRows}} -> + NewLinkConfig = find_link(Name, NewConfigRows), + {ok, NewLinkConfig}; + {error, Reason} -> + {error, Reason} + end. + +delete(Name) -> + case + emqx_conf:update( + ?LINKS_PATH, + {delete, Name}, + #{rawconf_with_defaults => true, override_to => cluster} + ) + of + {ok, _} -> + ok; + {error, Reason} -> + {error, Reason} + end. + +update_one_link(LinkConfig) -> + #{<<"name">> := Name} = LinkConfig, + case + emqx_conf:update( + ?LINKS_PATH, + {update, LinkConfig}, + #{rawconf_with_defaults => true, override_to => cluster} + ) + of + {ok, #{raw_config := NewConfigRows}} -> + NewLinkConfig = find_link(Name, NewConfigRows), + {ok, NewLinkConfig}; + {error, Reason} -> + {error, Reason} + end. + update(Config) -> case emqx_conf:update( @@ -75,11 +127,20 @@ cluster() -> links() -> emqx:get_config(?LINKS_PATH, []). +links_raw() -> + emqx:get_raw_config(?LINKS_PATH, []). + enabled_links() -> [L || L = #{enable := true} <- links()]. link(Name) -> - case lists:dropwhile(fun(L) -> Name =/= upstream_name(L) end, links()) of + find_link(Name, links()). + +link_raw(Name) -> + find_link(Name, links_raw()). + +find_link(Name, Links) -> + case lists:dropwhile(fun(L) -> Name =/= upstream_name(L) end, Links) of [LinkConf | _] -> LinkConf; [] -> undefined end. @@ -133,6 +194,37 @@ remove_handler() -> pre_config_update(?LINKS_PATH, RawConf, RawConf) -> {ok, RawConf}; +pre_config_update(?LINKS_PATH, {create, LinkRawConf}, OldRawConf) -> + #{<<"name">> := Name} = LinkRawConf, + maybe + undefined ?= find_link(Name, OldRawConf), + NewRawConf0 = OldRawConf ++ [LinkRawConf], + NewRawConf = convert_certs(maybe_increment_ps_actor_incr(NewRawConf0, OldRawConf)), + {ok, NewRawConf} + else + _ -> + {error, already_exists} + end; +pre_config_update(?LINKS_PATH, {update, LinkRawConf}, OldRawConf) -> + #{<<"name">> := Name} = LinkRawConf, + maybe + {ok, {_Found, Front, Rear}} = safe_take(Name, OldRawConf), + NewRawConf0 = Front ++ [LinkRawConf] ++ Rear, + NewRawConf = convert_certs(maybe_increment_ps_actor_incr(NewRawConf0, OldRawConf)), + {ok, NewRawConf} + else + not_found -> + {error, not_found} + end; +pre_config_update(?LINKS_PATH, {delete, Name}, OldRawConf) -> + maybe + {ok, {_Found, Front, Rear}} = safe_take(Name, OldRawConf), + NewRawConf = Front ++ Rear, + {ok, NewRawConf} + else + _ -> + {error, not_found} + end; pre_config_update(?LINKS_PATH, NewRawConf, OldRawConf) -> {ok, convert_certs(maybe_increment_ps_actor_incr(NewRawConf, OldRawConf))}. @@ -320,3 +412,11 @@ do_convert_certs(LinkName, SSLOpts) -> ), throw({bad_ssl_config, Reason}) end. + +safe_take(Name, Transformations) -> + case lists:splitwith(fun(#{<<"name">> := N}) -> N =/= Name end, Transformations) of + {_Front, []} -> + not_found; + {Front, [Found | Rear]} -> + {ok, {Found, Front, Rear}} + end. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl index f46249a4f..a6073d677 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl @@ -12,7 +12,7 @@ -export([injected_fields/0]). %% Used in emqx_cluster_link_api --export([links_schema/1]). +-export([links_schema/1, link_schema/0]). -export([ roots/0, @@ -37,6 +37,9 @@ links_schema(Meta) -> default => [], validator => fun links_validator/1, desc => ?DESC("links") }). +link_schema() -> + hoconsc:ref(?MODULE, "link"). + fields("link") -> [ {enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})}, diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl index c5ec8da6c..5bb1c377a 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl @@ -37,6 +37,10 @@ "-----END CERTIFICATE-----" >>). +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + all() -> emqx_common_test_helpers:all(?MODULE). @@ -47,7 +51,7 @@ init_per_suite(Config) -> [ emqx_conf, emqx_management, - {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}, + emqx_mgmt_api_test_util:emqx_dashboard(), emqx_cluster_link ], #{work_dir => emqx_cth_suite:work_dir(Config)} @@ -61,8 +65,7 @@ end_per_suite(Config) -> ok. auth_header() -> - {ok, API} = emqx_common_test_http:create_default_app(), - emqx_common_test_http:auth_header(API). + emqx_mgmt_api_test_util:auth_header_(). init_per_testcase(_TC, Config) -> {ok, _} = emqx_cluster_link_config:update([]), @@ -71,62 +74,111 @@ init_per_testcase(_TC, Config) -> end_per_testcase(_TC, _Config) -> ok. -t_put_get_valid(Config) -> - Auth = ?config(auth, Config), - Path = ?API_PATH, - {ok, Resp} = emqx_mgmt_api_test_util:request_api(get, Path, Auth), - ?assertMatch([], emqx_utils_json:decode(Resp)), +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ - Link1 = #{ +api_root() -> + <<"cluster/links">>. + +list() -> + Path = emqx_mgmt_api_test_util:api_path([api_root()]), + emqx_mgmt_api_test_util:simple_request(get, Path, _Params = ""). + +get_link(Name) -> + Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]), + emqx_mgmt_api_test_util:simple_request(get, Path, _Params = ""). + +delete_link(Name) -> + Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]), + emqx_mgmt_api_test_util:simple_request(delete, Path, _Params = ""). + +update_link(Name, Params) -> + Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]), + emqx_mgmt_api_test_util:simple_request(put, Path, Params). + +create_link(Name, Params0) -> + Params = Params0#{<<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path([api_root()]), + emqx_mgmt_api_test_util:simple_request(post, Path, Params). + +link_params() -> + link_params(_Overrides = #{}). + +link_params(Overrides) -> + Default = #{ + <<"clientid">> => <<"linkclientid">>, + <<"username">> => <<"myusername">>, <<"pool_size">> => 1, <<"server">> => <<"emqxcl_2.nohost:31883">>, - <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], - <<"name">> => <<"emqcl_1">> + <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>] }, - Link2 = #{ - <<"pool_size">> => 1, - <<"server">> => <<"emqxcl_2.nohost:41883">>, - <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], - <<"name">> => <<"emqcl_2">> - }, - ?assertMatch({ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link1, Link2])), + emqx_utils_maps:deep_merge(Default, Overrides). - {ok, Resp1} = emqx_mgmt_api_test_util:request_api(get, Path, Auth), - ?assertMatch([Link1, Link2], emqx_utils_json:decode(Resp1)), +%%------------------------------------------------------------------------------ +%% Test cases +%%------------------------------------------------------------------------------ + +t_put_get_valid(_Config) -> + ?assertMatch({200, []}, list()), + + Name1 = <<"emqcl_1">>, + Link1 = link_params(#{ + <<"server">> => <<"emqxcl_2.nohost:31883">>, + <<"name">> => Name1 + }), + Name2 = <<"emqcl_2">>, + Link2 = link_params(#{ + <<"server">> => <<"emqxcl_2.nohost:41883">>, + <<"name">> => Name2 + }), + ?assertMatch({201, _}, create_link(Name1, Link1)), + ?assertMatch({201, _}, create_link(Name2, Link2)), + ?assertMatch({200, [_, _]}, list()), DisabledLink1 = Link1#{<<"enable">> => false}, - ?assertMatch( - {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [DisabledLink1, Link2]) - ), - - {ok, Resp2} = emqx_mgmt_api_test_util:request_api(get, Path, Auth), - ?assertMatch([DisabledLink1, Link2], emqx_utils_json:decode(Resp2)), + ?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, DisabledLink1))), + ?assertMatch({200, #{<<"enable">> := false}}, get_link(Name1)), + ?assertMatch({200, #{<<"enable">> := true}}, get_link(Name2)), SSL = #{<<"enable">> => true, <<"cacertfile">> => ?CACERT}, SSLLink1 = Link1#{<<"ssl">> => SSL}, + ?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, SSLLink1))), ?assertMatch( - {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link2, SSLLink1]) + {200, #{<<"ssl">> := #{<<"enable">> := true, <<"cacertfile">> := _Path}}}, + get_link(Name1) ), - {ok, Resp3} = emqx_mgmt_api_test_util:request_api(get, Path, Auth), + ok. +t_put_invalid(_Config) -> + Name = <<"l1">>, + {201, _} = create_link(Name, link_params()), ?assertMatch( - [Link2, #{<<"ssl">> := #{<<"enable">> := true, <<"cacertfile">> := _Path}}], - emqx_utils_json:decode(Resp3) + {400, _}, + update_link(Name, maps:remove(<<"server">>, link_params())) ). -t_put_invalid(Config) -> - Auth = ?config(auth, Config), - Path = ?API_PATH, - Link = #{ - <<"pool_size">> => 1, - <<"server">> => <<"emqxcl_2.nohost:31883">>, - <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], - <<"name">> => <<"emqcl_1">> - }, - ?assertMatch( - {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link, Link]) - ), - ?assertMatch( - {error, {_, 400, _}}, - emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [maps:remove(<<"name">>, Link)]) - ). +t_crud(_Config) -> + %% No links initially. + ?assertMatch({200, []}, list()), + NameA = <<"a">>, + ?assertMatch({404, _}, get_link(NameA)), + ?assertMatch({404, _}, delete_link(NameA)), + ?assertMatch({404, _}, update_link(NameA, link_params())), + + Params1 = link_params(), + ?assertMatch({201, #{<<"name">> := NameA}}, create_link(NameA, Params1)), + ?assertMatch({400, #{<<"code">> := <<"ALREADY_EXISTS">>}}, create_link(NameA, Params1)), + ?assertMatch({200, [#{<<"name">> := NameA}]}, list()), + ?assertMatch({200, #{<<"name">> := NameA}}, get_link(NameA)), + + Params2 = Params1#{<<"pool_size">> := 2}, + ?assertMatch({200, #{<<"name">> := NameA}}, update_link(NameA, Params2)), + + ?assertMatch({204, _}, delete_link(NameA)), + ?assertMatch({404, _}, delete_link(NameA)), + ?assertMatch({404, _}, get_link(NameA)), + ?assertMatch({404, _}, update_link(NameA, Params1)), + ?assertMatch({200, []}, list()), + + ok. diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index 106a65a9c..4b1d40651 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -293,3 +293,30 @@ format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) -> FileNames ), erlang:iolist_to_binary([WithPaths, StartBoundary, <<"--">>, LineSeparator]). + +maybe_json_decode(X) -> + case emqx_utils_json:safe_decode(X, [return_maps]) of + {ok, Decoded} -> Decoded; + {error, _} -> X + end. + +simple_request(Method, Path, Params) -> + AuthHeader = auth_header_(), + Opts = #{return_all => true}, + case request_api(Method, Path, "", AuthHeader, Params, Opts) of + {ok, {{_, Status, _}, _Headers, Body0}} -> + Body = maybe_json_decode(Body0), + {Status, Body}; + {error, {{_, Status, _}, _Headers, Body0}} -> + Body = + case emqx_utils_json:safe_decode(Body0, [return_maps]) of + {ok, Decoded0 = #{<<"message">> := Msg0}} -> + Msg = maybe_json_decode(Msg0), + Decoded0#{<<"message">> := Msg}; + {ok, Decoded0} -> + Decoded0; + {error, _} -> + Body0 + end, + {Status, Body} + end. diff --git a/apps/emqx_utils/include/emqx_utils_api.hrl b/apps/emqx_utils/include/emqx_utils_api.hrl index ba2941a4f..0876b9829 100644 --- a/apps/emqx_utils/include/emqx_utils_api.hrl +++ b/apps/emqx_utils/include/emqx_utils_api.hrl @@ -21,6 +21,8 @@ -define(OK(CONTENT), {200, CONTENT}). +-define(CREATED(CONTENT), {201, CONTENT}). + -define(NO_CONTENT, 204). -define(BAD_REQUEST(CODE, REASON), {400, ?ERROR_MSG(CODE, REASON)}).