diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_api.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_api.erl new file mode 100644 index 000000000..0a8d41116 --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_api.erl @@ -0,0 +1,218 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_api). + +-behaviour(minirest_api). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% Swagger specs from hocon schema +-export([ + api_spec/0, + paths/0, + schema/1, + namespace/0 +]). + +-export([ + fields/1, + roots/0 +]). + +-define(TAGS, [<<"Durable Queues">>]). + +%% API callbacks +-export([ + '/durable_queues'/2, + '/durable_queues/:id'/2 +]). + +-import(hoconsc, [mk/2, ref/1, ref/2]). +-import(emqx_dashboard_swagger, [error_codes/2]). + +namespace() -> "durable_queues". + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). + +paths() -> + [ + "/durable_queues", + "/durable_queues/:id" + ]. + +-define(NOT_FOUND, 'NOT_FOUND'). + +schema("/durable_queues") -> + #{ + 'operationId' => '/durable_queues', + get => #{ + tags => ?TAGS, + summary => <<"List declared durable queues">>, + description => ?DESC("durable_queues_get"), + responses => #{ + 200 => emqx_dashboard_swagger:schema_with_example( + durable_queues_get(), + durable_queues_get_example() + ) + } + } + }; +schema("/durable_queues/:id") -> + #{ + 'operationId' => '/durable_queues/:id', + get => #{ + tags => ?TAGS, + summary => <<"Get a declared durable queue">>, + description => ?DESC("durable_queue_get"), + parameters => [param_queue_id()], + responses => #{ + 200 => emqx_dashboard_swagger:schema_with_example( + durable_queue_get(), + durable_queue_get_example() + ), + 404 => error_codes([?NOT_FOUND], <<"Queue Not Found">>) + } + }, + delete => #{ + tags => ?TAGS, + summary => <<"Delete a declared durable queue">>, + description => ?DESC("durable_queue_delete"), + parameters => [param_queue_id()], + responses => #{ + 200 => <<"Queue deleted">>, + 404 => error_codes([?NOT_FOUND], <<"Queue Not Found">>) + } + }, + put => #{ + tags => ?TAGS, + summary => <<"Declare a durable queue">>, + description => ?DESC("durable_queues_put"), + parameters => [param_queue_id()], + 'requestBody' => durable_queue_put(), + responses => #{ + 200 => emqx_dashboard_swagger:schema_with_example( + durable_queue_get(), + durable_queue_get_example() + ) + } + } + }. + +'/durable_queues'(get, _Params) -> + {200, queue_list()}. + +'/durable_queues/:id'(get, Params) -> + case queue_get(Params) of + {ok, Queue} -> {200, Queue}; + not_found -> serialize_error(not_found) + end; +'/durable_queues/:id'(delete, Params) -> + case queue_delete(Params) of + ok -> {200, <<"Queue deleted">>}; + not_found -> serialize_error(not_found) + end; +'/durable_queues/:id'(put, Params) -> + {200, queue_put(Params)}. + +%%-------------------------------------------------------------------- +%% Actual handlers: stubs +%%-------------------------------------------------------------------- + +queue_list() -> + persistent_term:get({?MODULE, queues}, []). + +queue_get(#{bindings := #{id := ReqId}}) -> + case [Q || #{id := Id} = Q <- queue_list(), Id =:= ReqId] of + [Queue] -> {ok, Queue}; + [] -> not_found + end. + +queue_delete(#{bindings := #{id := ReqId}}) -> + Queues0 = queue_list(), + Queues1 = [Q || #{id := Id} = Q <- Queues0, Id =/= ReqId], + persistent_term:put({?MODULE, queues}, Queues1), + case Queues0 =:= Queues1 of + true -> not_found; + false -> ok + end. + +queue_put(#{bindings := #{id := ReqId}}) -> + Queues0 = queue_list(), + Queues1 = [Q || #{id := Id} = Q <- Queues0, Id =/= ReqId], + NewQueue = #{ + id => ReqId + }, + Queues2 = [NewQueue | Queues1], + persistent_term:put({?MODULE, queues}, Queues2), + NewQueue. + +%%-------------------------------------------------------------------- +%% Schemas +%%-------------------------------------------------------------------- + +param_queue_id() -> + { + id, + mk(binary(), #{ + in => path, + desc => ?DESC(param_queue_id), + required => true, + validator => fun validate_queue_id/1 + }) + }. + +validate_queue_id(Id) -> + case emqx_topic:words(Id) of + [Segment] when is_binary(Segment) -> true; + _ -> {error, <<"Invalid queue id">>} + end. + +durable_queues_get() -> + hoconsc:array(ref(durable_queue_get)). + +durable_queue_get() -> + ref(durable_queue_get). + +durable_queue_put() -> + map(). + +roots() -> []. + +fields(durable_queue_get) -> + [ + {id, mk(binary(), #{})} + ]. + +%%-------------------------------------------------------------------- +%% Examples +%%-------------------------------------------------------------------- + +durable_queue_get_example() -> + #{ + id => <<"queue1">> + }. + +durable_queues_get_example() -> + [ + #{ + id => <<"queue1">> + }, + #{ + id => <<"queue2">> + } + ]. + +%%-------------------------------------------------------------------- +%% Error codes +%%-------------------------------------------------------------------- + +serialize_error(not_found) -> + {404, #{ + code => <<"NOT_FOUND">>, + message => <<"Queue Not Found">> + }}. diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_api_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_api_SUITE.erl new file mode 100644 index 000000000..0969bdcd1 --- /dev/null +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_api_SUITE.erl @@ -0,0 +1,140 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-import( + emqx_mgmt_api_test_util, + [ + request_api/2, + request/3, + uri/1 + ] +). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, #{ + config => #{ + <<"durable_sessions">> => #{ + <<"enable">> => true, + <<"renew_streams_interval">> => "100ms" + }, + <<"durable_storage">> => #{ + <<"messages">> => #{ + <<"backend">> => <<"builtin_raft">> + } + } + } + }}, + emqx_ds_shared_sub, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ], + #{work_dir => ?config(priv_dir, Config)} + ), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)), + ok. + +init_per_testcase(_TC, Config) -> + ok = snabbkaffe:start_trace(), + Config. + +end_per_testcase(_TC, _Config) -> + ok = snabbkaffe:stop(), + ok = terminate_leaders(), + ok. +%%-------------------------------------------------------------------- +%% Tests +%%-------------------------------------------------------------------- + +t_basic_crud(_Config) -> + ?assertMatch( + {ok, []}, + api_get(["durable_queues"]) + ), + + ?assertMatch( + {ok, 200, #{ + <<"id">> := <<"q1">> + }}, + api(put, ["durable_queues", "q1"], #{}) + ), + + ?assertMatch( + {error, {_, 404, _}}, + api_get(["durable_queues", "q2"]) + ), + + ?assertMatch( + {ok, 200, #{ + <<"id">> := <<"q2">> + }}, + api(put, ["durable_queues", "q2"], #{}) + ), + + ?assertMatch( + {ok, #{ + <<"id">> := <<"q2">> + }}, + api_get(["durable_queues", "q2"]) + ), + + ?assertMatch( + {ok, [#{<<"id">> := <<"q2">>}, #{<<"id">> := <<"q1">>}]}, + api_get(["durable_queues"]) + ), + + ?assertMatch( + {ok, 200, <<"Queue deleted">>}, + api(delete, ["durable_queues", "q2"], #{}) + ), + + ?assertMatch( + {ok, [#{<<"id">> := <<"q1">>}]}, + api_get(["durable_queues"]) + ). + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +api_get(Path) -> + case request_api(get, uri(Path)) of + {ok, ResponseBody} -> + {ok, jiffy:decode(list_to_binary(ResponseBody), [return_maps])}; + {error, _} = Error -> + Error + end. + +api(Method, Path, Data) -> + case request(Method, uri(Path), Data) of + {ok, Code, ResponseBody} -> + Res = + case emqx_utils_json:safe_decode(ResponseBody, [return_maps]) of + {ok, Decoded} -> Decoded; + {error, _} -> ResponseBody + end, + {ok, Code, Res}; + {error, _} = Error -> + Error + end. + +terminate_leaders() -> + ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup), + {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup), + ok. diff --git a/rel/i18n/emqx_ds_shared_sub_api.hocon b/rel/i18n/emqx_ds_shared_sub_api.hocon new file mode 100644 index 000000000..369aeb88e --- /dev/null +++ b/rel/i18n/emqx_ds_shared_sub_api.hocon @@ -0,0 +1,34 @@ +emqx_ds_shared_sub_api { + +param_queue_id.desc: +"""The ID of the durable queue.""" + +param_queue_id.label: +"""Queue ID""" + +durable_queues_get.desc: +"""Get the list of durable queues.""" + +durable_queues_get.label: +"""Durable Queues""" + +durable_queue_get.desc: +"""Get the information of a durable queue.""" + +durable_queue_get.label: +"""Durable Queue""" + +durable_queue_delete.desc: +"""Delete a durable queue.""" + +durable_queue_delete.label: +"""Delete Durable Queue""" + +durable_queues_put.desc: +"""Create a durable queue.""" + +durable_queues_put.label: +"""Create Durable Queue""" + + +}