refactor(emqx_retainer): emqx_retainer_api use openapi model

This commit is contained in:
lafirest 2021-08-18 12:05:19 +08:00
parent 213b7c2501
commit adc6226eae
5 changed files with 256 additions and 65 deletions

View File

@ -24,7 +24,7 @@
, config_reset/2 , config_reset/2
]). ]).
-export([get_conf_schema/2]). -export([get_conf_schema/2, gen_schema/1]).
-define(PARAM_CONF_PATH, [#{ -define(PARAM_CONF_PATH, [#{
name => conf_path, name => conf_path,

View File

@ -37,7 +37,8 @@
-export([ get_expiry_time/1 -export([ get_expiry_time/1
, update_config/1 , update_config/1
, clean/0 , clean/0
, delete/1]). , delete/1
, page_read/3]).
%% gen_server callbacks %% gen_server callbacks
-export([ init/1 -export([ init/1
@ -66,6 +67,8 @@
-callback delete_message(context(), topic()) -> ok. -callback delete_message(context(), topic()) -> ok.
-callback store_retained(context(), message()) -> ok. -callback store_retained(context(), message()) -> ok.
-callback read_message(context(), topic()) -> {ok, list()}. -callback read_message(context(), topic()) -> {ok, list()}.
-callback page_read(context(), topic(), non_neg_integer(), non_neg_integer()) ->
{ok, list()}.
-callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}. -callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}.
-callback clear_expired(context()) -> ok. -callback clear_expired(context()) -> ok.
-callback clean(context()) -> ok. -callback clean(context()) -> ok.
@ -166,6 +169,9 @@ clean() ->
delete(Topic) -> delete(Topic) ->
gen_server:call(?MODULE, {?FUNCTION_NAME, Topic}). gen_server:call(?MODULE, {?FUNCTION_NAME, Topic}).
page_read(Topic, Page, Limit) ->
gen_server:call(?MODULE, {?FUNCTION_NAME, Topic, Page, Limit}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -198,6 +204,11 @@ handle_call({delete, Topic}, _, #{context := Context} = State) ->
delete_message(Context, Topic), delete_message(Context, Topic),
{reply, ok, State}; {reply, ok, State};
handle_call({page_read, Topic, Page, Limit}, _, #{context := Context} = State) ->
Mod = get_backend_module(),
Result = Mod:page_read(Context, Topic, Page, Limit),
{reply, Result, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]), ?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}. {reply, ignored, State}.

View File

@ -16,52 +16,213 @@
-module(emqx_retainer_api). -module(emqx_retainer_api).
-rest_api(#{name => lookup_config, -behaviour(minirest_api).
method => 'GET',
path => "/retainer",
func => lookup_config,
descr => "lookup retainer config"
}).
-rest_api(#{name => update_config, -include_lib("emqx/include/emqx.hrl").
method => 'PUT',
path => "/retainer",
func => update_config,
descr => "update retainer config"
}).
-export([ lookup_config/2 -export([api_spec/0]).
, update_config/2
]).
lookup_config(_Bindings, _Params) -> -export([ lookup_retained_warp/2
Config = emqx:get_config([emqx_retainer]), , with_topic_warp/2
return({ok, Config}). , config/2]).
update_config(_Bindings, Params) -> -import(emqx_mgmt_api_configs, [gen_schema/1]).
-import(emqx_mgmt_util, [ response_array_schema/2
, response_schema/1
, response_error_schema/2]).
-define(CFG_BODY(DESCR),
#{description => list_to_binary(DESCR),
content => #{<<"application/json">> =>
#{schema => gen_schema(emqx_config:get([emqx_retainer]))}}}).
api_spec() ->
{
[ lookup_retained_api()
, with_topic_api()
, config_api()
],
[ message_schema(message, fun message_properties/0)
, message_schema(detail_message, fun detail_message_properties/0)
]
}.
lookup_retained_api() ->
Metadata =
#{get => #{description => <<"lookup matching messages">>,
parameters => [ #{name => page,
in => query,
description => <<"Page">>,
schema => #{type => integer, default => 1}}
, #{name => limit,
in => query,
description => <<"Page size">>,
schema => #{type => integer,
default => emqx_mgmt:max_row_limit()}}
],
responses => #{ <<"200">> =>
response_array_schema("List retained messages", message)
, <<"405">> => response_schema(<<"NotAllowed">>)
}}},
{"/mqtt/retainer/messages", Metadata, lookup_retained_warp}.
with_topic_api() ->
MetaData = #{get => #{description => <<"lookup matching messages">>,
parameters => [ #{name => topic,
in => path,
required => true,
schema => #{type => "string"}}
, #{name => page,
in => query,
description => <<"Page">>,
schema => #{type => integer, default => 1}}
, #{name => limit,
in => query,
description => <<"Page size">>,
schema => #{type => integer,
default => emqx_mgmt:max_row_limit()}}
],
responses => #{ <<"200">> =>
response_array_schema("List retained messages", detail_message)
, <<"405">> => response_schema(<<"NotAllowed">>)}},
delete => #{description => <<"delete matching messages">>,
parameters => [#{name => topic,
in => path,
required => true,
schema => #{type => "string"}}],
responses => #{ <<"200">> => response_schema(<<"Successed">>)
, <<"405">> => response_schema(<<"NotAllowed">>)}}
},
{"/mqtt/retainer/message/:topic", MetaData, with_topic_warp}.
config_api() ->
MetaData = #{
get => #{
description => <<"get retainer config">>,
responses => #{<<"200">> => ?CFG_BODY("Get configs successfully"),
<<"404">> => response_error_schema(
<<"Config not found">>, ['NOT_FOUND'])}
},
put => #{
description => <<"Update retainer config">>,
'requestBody' =>
?CFG_BODY("The format of the request body is depend on the 'conf_path' parameter in the query string"),
responses => #{<<"200">> => response_schema("Update configs successfully"),
<<"400">> => response_error_schema(
<<"Update configs failed">>, ['UPDATE_FAILED'])}
}
},
{"/mqtt/retainer", MetaData, config}.
lookup_retained_warp(Type, Req) ->
check_backend(Type, Req, fun lookup_retained/2).
with_topic_warp(Type, Req) ->
check_backend(Type, Req, fun with_topic/2).
config(get, _) ->
Config = emqx_config:get([emqx_retainer]),
Body = emqx_json:encode(Config),
{200, Body};
config(put, Req) ->
try try
ConfigList = proplists:get_value(<<"emqx_retainer">>, Params), {ok, Body, _} = cowboy_req:read_body(Req),
{ok, RawConf} = hocon:binary(jsx:encode(#{<<"emqx_retainer">> => ConfigList}), Cfg = emqx_json:decode(Body),
{ok, RawConf} = hocon:binary(jsx:encode(#{<<"emqx_retainer">> => Cfg}),
#{format => richmap}), #{format => richmap}),
RichConf = hocon_schema:check(emqx_retainer_schema, RawConf, #{atom_key => true}), RichConf = hocon_schema:check(emqx_retainer_schema, RawConf, #{atom_key => true}),
#{emqx_retainer := Conf} = hocon_schema:richmap_to_map(RichConf), #{emqx_retainer := Conf} = hocon_schema:richmap_to_map(RichConf),
Action = proplists:get_value(<<"action">>, Params, undefined), emqx_retainer:update_config(Conf),
do_update_config(Action, Conf), {200, #{<<"content-type">> => <<"text/plain">>}, <<"Update configs successfully">>}
return() catch _:Reason:_ ->
catch _:_:Reason -> {400,
return({error, Reason}) #{code => 'UPDATE_FAILED',
message => erlang:list_to_binary(io_lib:format("~p~n", [Reason]))}}
end. end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Interval Funcs %% Interval Funcs
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
do_update_config(undefined, Config) -> lookup_retained(get, Req) ->
emqx_retainer:update_config(Config); lookup(undefined, Req, fun format_message/1).
do_update_config(<<"test">>, _) ->
ok.
%% TODO: V5 API with_topic(get, Req) ->
return() -> Topic = cowboy_req:binding(topic, Req),
ok. lookup(Topic, Req, fun format_detail_message/1);
return(_) ->
ok. with_topic(delete, Req) ->
Topic = cowboy_req:binding(topic, Req),
emqx_retainer_mnesia:delete_message(undefined, Topic),
{200}.
-spec lookup(undefined | binary(),
cowboy_req:req(),
fun((#message{}) -> map())) ->
{200, map()}.
lookup(Topic, Req, Formatter) ->
#{page := Page,
limit := Limit} = cowboy_req:match_qs([{page, int, 1},
{limit, int, emqx_mgmt:max_row_limit()}],
Req),
{ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit),
{200, format_message(Msgs, Formatter)}.
message_schema(Type, Properties) ->
#{Type => #{type => object,
properties => Properties()}}.
message_properties() ->
#{msgid => #{type => string,
description => <<"Message ID">>},
topic => #{type => string,
description => <<"Topic">>},
qos => #{type => integer,
enum => [0, 1, 2],
description => <<"Qos">>},
publish_at => #{type => string,
description => <<"publish datetime">>},
from_clientid => #{type => string,
description => <<"Message from">>},
from_username => #{type => string,
description => <<"publish username">>}}.
detail_message_properties() ->
Base = message_properties(),
Base#{payload => #{type => string,
description => <<"Topic">>}}.
format_message(Messages, Formatter) when is_list(Messages)->
[Formatter(Message) || Message <- Messages];
format_message(Message, Formatter) ->
Formatter(Message).
format_message(#message{id = ID, qos = Qos, topic = Topic, from = From, timestamp = Timestamp, headers = Headers}) ->
#{msgid => emqx_guid:to_hexstr(ID),
qos => Qos,
topic => Topic,
publish_at => erlang:list_to_binary(emqx_mgmt_util:strftime(Timestamp div 1000)),
from_clientid => to_bin_string(From),
from_username => maps:get(username, Headers, <<>>)
}.
format_detail_message(#message{payload = Payload} = Msg) ->
Base = format_message(Msg),
Base#{payload => Payload}.
to_bin_string(Data) when is_binary(Data) ->
Data;
to_bin_string(Data) ->
list_to_binary(io_lib:format("~p", [Data])).
check_backend(Type, Req, Cont) ->
case emqx:get_config([emqx_retainer, config, type]) of
built_in_database ->
Cont(Type, Req);
_ ->
{405,
#{<<"content-type">> => <<"text/plain">>},
<<"This API only for built in database">>}
end.

View File

@ -24,9 +24,10 @@
-include_lib("stdlib/include/qlc.hrl"). -include_lib("stdlib/include/qlc.hrl").
-export([delete_message/2 -export([ delete_message/2
, store_retained/2 , store_retained/2
, read_message/2 , read_message/2
, page_read/4
, match_messages/3 , match_messages/3
, clear_expired/1 , clear_expired/1
, clean/1]). , clean/1]).
@ -129,6 +130,19 @@ delete_message(_, Topic) ->
read_message(_, Topic) -> read_message(_, Topic) ->
{ok, read_messages(Topic)}. {ok, read_messages(Topic)}.
page_read(_, Topic, Page, Limit) ->
Cursor = make_cursor(Topic),
case Page > 1 of
true ->
_ = qlc:next_answers(Cursor, (Page - 1) * Limit),
ok;
_ ->
ok
end,
Rows = qlc:next_answers(Cursor, Limit),
qlc:delete_cursor(Cursor),
{ok, Rows}.
match_messages(_, Topic, Cursor) -> match_messages(_, Topic, Cursor) ->
MaxReadNum = emqx:get_config([?APP, flow_control, max_read_number]), MaxReadNum = emqx:get_config([?APP, flow_control, max_read_number]),
case Cursor of case Cursor of
@ -152,34 +166,28 @@ clean(_) ->
sort_retained([]) -> []; sort_retained([]) -> [];
sort_retained([Msg]) -> [Msg]; sort_retained([Msg]) -> [Msg];
sort_retained(Msgs) -> sort_retained(Msgs) ->
lists:sort(fun(#message{timestamp = Ts1}, #message{timestamp = Ts2}) -> lists:sort(fun compare_message/2, Msgs).
Ts1 =< Ts2 end,
Msgs). compare_message(M1, M2) ->
M1#message.timestamp =< M2#message.timestamp.
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
topic2tokens(Topic) -> topic2tokens(Topic) ->
emqx_topic:words(Topic). emqx_topic:words(Topic).
-spec start_batch_read(topic(), pos_integer()) -> batch_read_result(). -spec start_batch_read(topic(), pos_integer()) -> batch_read_result().
start_batch_read(Topic, MaxReadNum) -> start_batch_read(Topic, MaxReadNum) ->
Ms = make_match_spec(Topic), Cursor = make_cursor(Topic),
TabQH = ets:table(?TAB, [{traverse, {select, Ms}}]),
QH = qlc:q([E || E <- TabQH]),
Cursor = qlc:cursor(QH),
batch_read_messages(Cursor, MaxReadNum). batch_read_messages(Cursor, MaxReadNum).
-spec batch_read_messages(emqx_retainer_storage:cursor(), pos_integer()) -> batch_read_result(). -spec batch_read_messages(emqx_retainer_storage:cursor(), pos_integer()) -> batch_read_result().
batch_read_messages(Cursor, MaxReadNum) -> batch_read_messages(Cursor, MaxReadNum) ->
Answers = qlc:next_answers(Cursor, MaxReadNum), Answers = qlc:next_answers(Cursor, MaxReadNum),
Orders = sort_retained(Answers), case erlang:length(Answers) < MaxReadNum of
case erlang:length(Orders) < MaxReadNum of
true -> true ->
qlc:delete_cursor(Cursor), qlc:delete_cursor(Cursor),
{ok, Orders, undefined}; {ok, Answers, undefined};
_ -> _ ->
{ok, Orders, Cursor} {ok, Answers, Cursor}
end. end.
-spec(read_messages(emqx_types:topic()) -spec(read_messages(emqx_types:topic())
@ -217,14 +225,28 @@ condition(Ws) ->
_ -> (Ws1 -- ['#']) ++ '_' _ -> (Ws1 -- ['#']) ++ '_'
end. end.
-spec make_match_spec(topic()) -> ets:match_spec(). -spec make_match_spec(undefined | topic()) -> ets:match_spec().
make_match_spec(Filter) -> make_match_spec(Topic) ->
NowMs = erlang:system_time(millisecond), NowMs = erlang:system_time(millisecond),
Cond = condition(emqx_topic:words(Filter)), Cond =
case Topic of
undefined ->
'_';
_ ->
condition(emqx_topic:words(Topic))
end,
MsHd = #retained{topic = Cond, msg = '$2', expiry_time = '$3'}, MsHd = #retained{topic = Cond, msg = '$2', expiry_time = '$3'},
[{MsHd, [{'=:=', '$3', 0}], ['$2']}, [{MsHd, [{'=:=', '$3', 0}], ['$2']},
{MsHd, [{'>', '$3', NowMs}], ['$2']}]. {MsHd, [{'>', '$3', NowMs}], ['$2']}].
-spec make_cursor(undefined | topic()) -> qlc:query_cursor().
make_cursor(Topic) ->
Ms = make_match_spec(Topic),
TabQH = ets:table(?TAB, [{traverse, {select, Ms}}]),
QH = qlc:q([E || E <- TabQH]),
QH2 = qlc:sort(QH, {order, fun compare_message/2}),
qlc:cursor(QH2).
-spec is_table_full() -> boolean(). -spec is_table_full() -> boolean().
is_table_full() -> is_table_full() ->
#{max_retained_messages := Limit} = emqx:get_config([?APP, config]), #{max_retained_messages := Limit} = emqx:get_config([?APP, config]),

View File

@ -33,10 +33,11 @@
-define(HOST, "http://127.0.0.1:8081/"). -define(HOST, "http://127.0.0.1:8081/").
-define(API_VERSION, "v4"). -define(API_VERSION, "v4").
-define(BASE_PATH, "api"). -define(BASE_PATH, "api").
-define(CFG_URI, "/configs/retainer").
all() -> all() ->
%% TODO: V5 API %% TODO: V5 API
%% emqx_ct:all(?MODULE). %% emqx_ct:all(?MODULE).
[]. [].
groups() -> groups() ->
@ -69,16 +70,12 @@ set_special_configs(_) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
t_config(_Config) -> t_config(_Config) ->
{ok, Return} = request_http_rest_lookup(["retainer"]), {ok, Return} = request_http_rest_lookup([?CFG_URI]),
NowCfg = get_http_data(Return), NowCfg = get_http_data(Return),
NewCfg = NowCfg#{<<"msg_expiry_interval">> => timer:seconds(60)}, NewCfg = NowCfg#{<<"msg_expiry_interval">> => timer:seconds(60)},
RetainerConf = #{<<"emqx_retainer">> => NewCfg}, RetainerConf = #{<<"emqx_retainer">> => NewCfg},
{ok, _} = request_http_rest_update(["retainer?action=test"], RetainerConf), {ok, _} = request_http_rest_update([?CFG_URI], RetainerConf),
{ok, TestReturn} = request_http_rest_lookup(["retainer"]),
?assertEqual(NowCfg, get_http_data(TestReturn)),
{ok, _} = request_http_rest_update(["retainer"], RetainerConf),
{ok, UpdateReturn} = request_http_rest_lookup(["retainer"]), {ok, UpdateReturn} = request_http_rest_lookup(["retainer"]),
?assertEqual(NewCfg, get_http_data(UpdateReturn)), ?assertEqual(NewCfg, get_http_data(UpdateReturn)),
ok. ok.
@ -141,12 +138,12 @@ receive_messages(Count, Msgs) ->
end. end.
switch_emqx_retainer(undefined, IsEnable) -> switch_emqx_retainer(undefined, IsEnable) ->
{ok, Return} = request_http_rest_lookup(["retainer"]), {ok, Return} = request_http_rest_lookup([?COMMON_SHARD]),
NowCfg = get_http_data(Return), NowCfg = get_http_data(Return),
switch_emqx_retainer(NowCfg, IsEnable); switch_emqx_retainer(NowCfg, IsEnable);
switch_emqx_retainer(NowCfg, IsEnable) -> switch_emqx_retainer(NowCfg, IsEnable) ->
NewCfg = NowCfg#{<<"enable">> => IsEnable}, NewCfg = NowCfg#{<<"enable">> => IsEnable},
RetainerConf = #{<<"emqx_retainer">> => NewCfg}, RetainerConf = #{<<"emqx_retainer">> => NewCfg},
{ok, _} = request_http_rest_update(["retainer"], RetainerConf), {ok, _} = request_http_rest_update([?CFG_URI], RetainerConf),
NewCfg. NewCfg.