From adc6226eae9a3bb62d9e062db7d4bd07ff261b32 Mon Sep 17 00:00:00 2001 From: lafirest Date: Wed, 18 Aug 2021 12:05:19 +0800 Subject: [PATCH] refactor(emqx_retainer): emqx_retainer_api use openapi model --- .../src/emqx_mgmt_api_configs.erl | 2 +- apps/emqx_retainer/src/emqx_retainer.erl | 13 +- apps/emqx_retainer/src/emqx_retainer_api.erl | 231 +++++++++++++++--- .../src/emqx_retainer_mnesia.erl | 58 +++-- .../test/emqx_retainer_api_SUITE.erl | 17 +- 5 files changed, 256 insertions(+), 65 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index 6ff401048..1a89835ff 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -24,7 +24,7 @@ , config_reset/2 ]). --export([get_conf_schema/2]). +-export([get_conf_schema/2, gen_schema/1]). -define(PARAM_CONF_PATH, [#{ name => conf_path, diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index dfbe5cc69..4c36cb541 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -37,7 +37,8 @@ -export([ get_expiry_time/1 , update_config/1 , clean/0 - , delete/1]). + , delete/1 + , page_read/3]). %% gen_server callbacks -export([ init/1 @@ -66,6 +67,8 @@ -callback delete_message(context(), topic()) -> ok. -callback store_retained(context(), message()) -> ok. -callback read_message(context(), topic()) -> {ok, list()}. +-callback page_read(context(), topic(), non_neg_integer(), non_neg_integer()) -> + {ok, list()}. -callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}. -callback clear_expired(context()) -> ok. -callback clean(context()) -> ok. @@ -166,6 +169,9 @@ clean() -> delete(Topic) -> gen_server:call(?MODULE, {?FUNCTION_NAME, Topic}). +page_read(Topic, Page, Limit) -> + gen_server:call(?MODULE, {?FUNCTION_NAME, Topic, Page, Limit}). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -198,6 +204,11 @@ handle_call({delete, Topic}, _, #{context := Context} = State) -> delete_message(Context, Topic), {reply, ok, State}; +handle_call({page_read, Topic, Page, Limit}, _, #{context := Context} = State) -> + Mod = get_backend_module(), + Result = Mod:page_read(Context, Topic, Page, Limit), + {reply, Result, State}; + handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index a0b72c858..d766eab06 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -16,52 +16,213 @@ -module(emqx_retainer_api). --rest_api(#{name => lookup_config, - method => 'GET', - path => "/retainer", - func => lookup_config, - descr => "lookup retainer config" - }). +-behaviour(minirest_api). --rest_api(#{name => update_config, - method => 'PUT', - path => "/retainer", - func => update_config, - descr => "update retainer config" - }). +-include_lib("emqx/include/emqx.hrl"). --export([ lookup_config/2 - , update_config/2 - ]). +-export([api_spec/0]). -lookup_config(_Bindings, _Params) -> - Config = emqx:get_config([emqx_retainer]), - return({ok, Config}). +-export([ lookup_retained_warp/2 + , with_topic_warp/2 + , config/2]). -update_config(_Bindings, Params) -> +-import(emqx_mgmt_api_configs, [gen_schema/1]). +-import(emqx_mgmt_util, [ response_array_schema/2 + , response_schema/1 + , response_error_schema/2]). + +-define(CFG_BODY(DESCR), + #{description => list_to_binary(DESCR), + content => #{<<"application/json">> => + #{schema => gen_schema(emqx_config:get([emqx_retainer]))}}}). + +api_spec() -> + { + [ lookup_retained_api() + , with_topic_api() + , config_api() + ], + [ message_schema(message, fun message_properties/0) + , message_schema(detail_message, fun detail_message_properties/0) + ] + }. + +lookup_retained_api() -> + Metadata = + #{get => #{description => <<"lookup matching messages">>, + parameters => [ #{name => page, + in => query, + description => <<"Page">>, + schema => #{type => integer, default => 1}} + , #{name => limit, + in => query, + description => <<"Page size">>, + schema => #{type => integer, + default => emqx_mgmt:max_row_limit()}} + ], + responses => #{ <<"200">> => + response_array_schema("List retained messages", message) + , <<"405">> => response_schema(<<"NotAllowed">>) + }}}, + {"/mqtt/retainer/messages", Metadata, lookup_retained_warp}. + +with_topic_api() -> + MetaData = #{get => #{description => <<"lookup matching messages">>, + parameters => [ #{name => topic, + in => path, + required => true, + schema => #{type => "string"}} + , #{name => page, + in => query, + description => <<"Page">>, + schema => #{type => integer, default => 1}} + , #{name => limit, + in => query, + description => <<"Page size">>, + schema => #{type => integer, + default => emqx_mgmt:max_row_limit()}} + ], + responses => #{ <<"200">> => + response_array_schema("List retained messages", detail_message) + , <<"405">> => response_schema(<<"NotAllowed">>)}}, + delete => #{description => <<"delete matching messages">>, + parameters => [#{name => topic, + in => path, + required => true, + schema => #{type => "string"}}], + responses => #{ <<"200">> => response_schema(<<"Successed">>) + , <<"405">> => response_schema(<<"NotAllowed">>)}} + }, + {"/mqtt/retainer/message/:topic", MetaData, with_topic_warp}. + +config_api() -> + MetaData = #{ + get => #{ + description => <<"get retainer config">>, + responses => #{<<"200">> => ?CFG_BODY("Get configs successfully"), + <<"404">> => response_error_schema( + <<"Config not found">>, ['NOT_FOUND'])} + }, + put => #{ + description => <<"Update retainer config">>, + 'requestBody' => + ?CFG_BODY("The format of the request body is depend on the 'conf_path' parameter in the query string"), + responses => #{<<"200">> => response_schema("Update configs successfully"), + <<"400">> => response_error_schema( + <<"Update configs failed">>, ['UPDATE_FAILED'])} + } + }, + {"/mqtt/retainer", MetaData, config}. + +lookup_retained_warp(Type, Req) -> + check_backend(Type, Req, fun lookup_retained/2). + +with_topic_warp(Type, Req) -> + check_backend(Type, Req, fun with_topic/2). + +config(get, _) -> + Config = emqx_config:get([emqx_retainer]), + Body = emqx_json:encode(Config), + {200, Body}; + +config(put, Req) -> try - ConfigList = proplists:get_value(<<"emqx_retainer">>, Params), - {ok, RawConf} = hocon:binary(jsx:encode(#{<<"emqx_retainer">> => ConfigList}), + {ok, Body, _} = cowboy_req:read_body(Req), + Cfg = emqx_json:decode(Body), + {ok, RawConf} = hocon:binary(jsx:encode(#{<<"emqx_retainer">> => Cfg}), #{format => richmap}), RichConf = hocon_schema:check(emqx_retainer_schema, RawConf, #{atom_key => true}), #{emqx_retainer := Conf} = hocon_schema:richmap_to_map(RichConf), - Action = proplists:get_value(<<"action">>, Params, undefined), - do_update_config(Action, Conf), - return() - catch _:_:Reason -> - return({error, Reason}) + emqx_retainer:update_config(Conf), + {200, #{<<"content-type">> => <<"text/plain">>}, <<"Update configs successfully">>} + catch _:Reason:_ -> + {400, + #{code => 'UPDATE_FAILED', + message => erlang:list_to_binary(io_lib:format("~p~n", [Reason]))}} end. %%------------------------------------------------------------------------------ %% Interval Funcs %%------------------------------------------------------------------------------ -do_update_config(undefined, Config) -> - emqx_retainer:update_config(Config); -do_update_config(<<"test">>, _) -> - ok. +lookup_retained(get, Req) -> + lookup(undefined, Req, fun format_message/1). -%% TODO: V5 API -return() -> - ok. -return(_) -> - ok. +with_topic(get, Req) -> + Topic = cowboy_req:binding(topic, Req), + lookup(Topic, Req, fun format_detail_message/1); + +with_topic(delete, Req) -> + Topic = cowboy_req:binding(topic, Req), + emqx_retainer_mnesia:delete_message(undefined, Topic), + {200}. + +-spec lookup(undefined | binary(), + cowboy_req:req(), + fun((#message{}) -> map())) -> + {200, map()}. +lookup(Topic, Req, Formatter) -> + #{page := Page, + limit := Limit} = cowboy_req:match_qs([{page, int, 1}, + {limit, int, emqx_mgmt:max_row_limit()}], + Req), + {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit), + {200, format_message(Msgs, Formatter)}. + + +message_schema(Type, Properties) -> + #{Type => #{type => object, + properties => Properties()}}. + +message_properties() -> + #{msgid => #{type => string, + description => <<"Message ID">>}, + topic => #{type => string, + description => <<"Topic">>}, + qos => #{type => integer, + enum => [0, 1, 2], + description => <<"Qos">>}, + publish_at => #{type => string, + description => <<"publish datetime">>}, + from_clientid => #{type => string, + description => <<"Message from">>}, + from_username => #{type => string, + description => <<"publish username">>}}. + +detail_message_properties() -> + Base = message_properties(), + Base#{payload => #{type => string, + description => <<"Topic">>}}. + +format_message(Messages, Formatter) when is_list(Messages)-> + [Formatter(Message) || Message <- Messages]; + +format_message(Message, Formatter) -> + Formatter(Message). + +format_message(#message{id = ID, qos = Qos, topic = Topic, from = From, timestamp = Timestamp, headers = Headers}) -> + #{msgid => emqx_guid:to_hexstr(ID), + qos => Qos, + topic => Topic, + publish_at => erlang:list_to_binary(emqx_mgmt_util:strftime(Timestamp div 1000)), + from_clientid => to_bin_string(From), + from_username => maps:get(username, Headers, <<>>) + }. + +format_detail_message(#message{payload = Payload} = Msg) -> + Base = format_message(Msg), + Base#{payload => Payload}. + +to_bin_string(Data) when is_binary(Data) -> + Data; +to_bin_string(Data) -> + list_to_binary(io_lib:format("~p", [Data])). + +check_backend(Type, Req, Cont) -> + case emqx:get_config([emqx_retainer, config, type]) of + built_in_database -> + Cont(Type, Req); + _ -> + {405, + #{<<"content-type">> => <<"text/plain">>}, + <<"This API only for built in database">>} + end. diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 1c9956050..5f91a40c9 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -24,9 +24,10 @@ -include_lib("stdlib/include/qlc.hrl"). --export([delete_message/2 +-export([ delete_message/2 , store_retained/2 , read_message/2 + , page_read/4 , match_messages/3 , clear_expired/1 , clean/1]). @@ -129,6 +130,19 @@ delete_message(_, Topic) -> read_message(_, Topic) -> {ok, read_messages(Topic)}. +page_read(_, Topic, Page, Limit) -> + Cursor = make_cursor(Topic), + case Page > 1 of + true -> + _ = qlc:next_answers(Cursor, (Page - 1) * Limit), + ok; + _ -> + ok + end, + Rows = qlc:next_answers(Cursor, Limit), + qlc:delete_cursor(Cursor), + {ok, Rows}. + match_messages(_, Topic, Cursor) -> MaxReadNum = emqx:get_config([?APP, flow_control, max_read_number]), case Cursor of @@ -152,34 +166,28 @@ clean(_) -> sort_retained([]) -> []; sort_retained([Msg]) -> [Msg]; sort_retained(Msgs) -> - lists:sort(fun(#message{timestamp = Ts1}, #message{timestamp = Ts2}) -> - Ts1 =< Ts2 end, - Msgs). + lists:sort(fun compare_message/2, Msgs). + +compare_message(M1, M2) -> + M1#message.timestamp =< M2#message.timestamp. -%%-------------------------------------------------------------------- -%% Internal funcs -%%-------------------------------------------------------------------- topic2tokens(Topic) -> emqx_topic:words(Topic). -spec start_batch_read(topic(), pos_integer()) -> batch_read_result(). start_batch_read(Topic, MaxReadNum) -> - Ms = make_match_spec(Topic), - TabQH = ets:table(?TAB, [{traverse, {select, Ms}}]), - QH = qlc:q([E || E <- TabQH]), - Cursor = qlc:cursor(QH), + Cursor = make_cursor(Topic), batch_read_messages(Cursor, MaxReadNum). -spec batch_read_messages(emqx_retainer_storage:cursor(), pos_integer()) -> batch_read_result(). batch_read_messages(Cursor, MaxReadNum) -> Answers = qlc:next_answers(Cursor, MaxReadNum), - Orders = sort_retained(Answers), - case erlang:length(Orders) < MaxReadNum of + case erlang:length(Answers) < MaxReadNum of true -> qlc:delete_cursor(Cursor), - {ok, Orders, undefined}; + {ok, Answers, undefined}; _ -> - {ok, Orders, Cursor} + {ok, Answers, Cursor} end. -spec(read_messages(emqx_types:topic()) @@ -217,14 +225,28 @@ condition(Ws) -> _ -> (Ws1 -- ['#']) ++ '_' end. --spec make_match_spec(topic()) -> ets:match_spec(). -make_match_spec(Filter) -> +-spec make_match_spec(undefined | topic()) -> ets:match_spec(). +make_match_spec(Topic) -> NowMs = erlang:system_time(millisecond), - Cond = condition(emqx_topic:words(Filter)), + Cond = + case Topic of + undefined -> + '_'; + _ -> + condition(emqx_topic:words(Topic)) + end, MsHd = #retained{topic = Cond, msg = '$2', expiry_time = '$3'}, [{MsHd, [{'=:=', '$3', 0}], ['$2']}, {MsHd, [{'>', '$3', NowMs}], ['$2']}]. +-spec make_cursor(undefined | topic()) -> qlc:query_cursor(). +make_cursor(Topic) -> + Ms = make_match_spec(Topic), + TabQH = ets:table(?TAB, [{traverse, {select, Ms}}]), + QH = qlc:q([E || E <- TabQH]), + QH2 = qlc:sort(QH, {order, fun compare_message/2}), + qlc:cursor(QH2). + -spec is_table_full() -> boolean(). is_table_full() -> #{max_retained_messages := Limit} = emqx:get_config([?APP, config]), diff --git a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl index 6ce64ae2e..5d379e8cd 100644 --- a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl @@ -33,10 +33,11 @@ -define(HOST, "http://127.0.0.1:8081/"). -define(API_VERSION, "v4"). -define(BASE_PATH, "api"). +-define(CFG_URI, "/configs/retainer"). all() -> -%% TODO: V5 API -%% emqx_ct:all(?MODULE). + %% TODO: V5 API + %% emqx_ct:all(?MODULE). []. groups() -> @@ -69,16 +70,12 @@ set_special_configs(_) -> %%------------------------------------------------------------------------------ t_config(_Config) -> - {ok, Return} = request_http_rest_lookup(["retainer"]), + {ok, Return} = request_http_rest_lookup([?CFG_URI]), NowCfg = get_http_data(Return), NewCfg = NowCfg#{<<"msg_expiry_interval">> => timer:seconds(60)}, RetainerConf = #{<<"emqx_retainer">> => NewCfg}, - {ok, _} = request_http_rest_update(["retainer?action=test"], RetainerConf), - {ok, TestReturn} = request_http_rest_lookup(["retainer"]), - ?assertEqual(NowCfg, get_http_data(TestReturn)), - - {ok, _} = request_http_rest_update(["retainer"], RetainerConf), + {ok, _} = request_http_rest_update([?CFG_URI], RetainerConf), {ok, UpdateReturn} = request_http_rest_lookup(["retainer"]), ?assertEqual(NewCfg, get_http_data(UpdateReturn)), ok. @@ -141,12 +138,12 @@ receive_messages(Count, Msgs) -> end. switch_emqx_retainer(undefined, IsEnable) -> - {ok, Return} = request_http_rest_lookup(["retainer"]), + {ok, Return} = request_http_rest_lookup([?COMMON_SHARD]), NowCfg = get_http_data(Return), switch_emqx_retainer(NowCfg, IsEnable); switch_emqx_retainer(NowCfg, IsEnable) -> NewCfg = NowCfg#{<<"enable">> => IsEnable}, RetainerConf = #{<<"emqx_retainer">> => NewCfg}, - {ok, _} = request_http_rest_update(["retainer"], RetainerConf), + {ok, _} = request_http_rest_update([?CFG_URI], RetainerConf), NewCfg.