feat(retainer): make additional implementations pluggable

Co-authored-by: Thales Macedo Garitezi <thalesmg@gmail.com>
This commit is contained in:
Ilya Averyanov 2024-02-24 02:58:48 +03:00
parent d94066a10a
commit dfdf7455d3
12 changed files with 523 additions and 180 deletions

View File

@ -32,8 +32,7 @@
-export([ -export([
delete_message/2, delete_message/2,
store_retained/2, store_retained/2
get_backend_module/0
]). ]).
-export([ -export([
@ -45,7 +44,15 @@
page_read/3, page_read/3,
post_config_update/5, post_config_update/5,
stats_fun/0, stats_fun/0,
retained_count/0 retained_count/0,
backend_module/0,
backend_module/1,
enabled/0
]).
%% For testing only
-export([
context/0
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -54,8 +61,7 @@
handle_call/3, handle_call/3,
handle_cast/2, handle_cast/2,
handle_info/2, handle_info/2,
terminate/2, terminate/2
code_change/3
]). ]).
-export_type([ -export_type([
@ -71,29 +77,35 @@
context := undefined | context(), context := undefined | context(),
clear_timer := undefined | reference() clear_timer := undefined | reference()
}. }.
-type context() :: term().
-type backend_state() :: term().
-type context() :: #{
module := module(),
state := backend_state()
}.
-type topic() :: emqx_types:topic(). -type topic() :: emqx_types:topic().
-type message() :: emqx_types:message(). -type message() :: emqx_types:message().
-type cursor() :: undefined | term(). -type cursor() :: undefined | term().
-type backend() :: emqx_retainer_mnesia | module(). -type has_next() :: boolean().
-define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)). -define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)).
-define(DEF_EXPIRY_INTERVAL, 0). -define(DEF_EXPIRY_INTERVAL, 0).
-define(MAX_PAYLOAD_SIZE_CONFIG_PATH, [retainer, max_payload_size]). -define(MAX_PAYLOAD_SIZE_CONFIG_PATH, [retainer, max_payload_size]).
-callback create(map()) -> context(). -callback create(hocon:config()) -> backend_state().
-callback update(context(), map()) -> ok | need_recreate. -callback update(backend_state(), hocon:config()) -> ok | need_recreate.
-callback close(context()) -> ok. -callback close(backend_state()) -> ok.
-callback delete_message(context(), topic()) -> ok. -callback delete_message(backend_state(), topic()) -> ok.
-callback store_retained(context(), message()) -> ok. -callback store_retained(backend_state(), message()) -> ok.
-callback read_message(context(), topic()) -> {ok, list()}. -callback read_message(backend_state(), topic()) -> {ok, list(message())}.
-callback page_read(context(), topic(), non_neg_integer(), non_neg_integer()) -> -callback page_read(backend_state(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) ->
{ok, HasNext :: boolean(), list()}. {ok, has_next(), list(message())}.
-callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}. -callback match_messages(backend_state(), topic(), cursor()) -> {ok, list(message()), cursor()}.
-callback clear_expired(context()) -> ok. -callback clear_expired(backend_state()) -> ok.
-callback clean(context()) -> ok. -callback clean(backend_state()) -> ok.
-callback size(context()) -> non_neg_integer(). -callback size(backend_state()) -> non_neg_integer().
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Hook API %% Hook API
@ -131,6 +143,13 @@ on_message_publish(Msg = #message{flags = #{retain := true}}, Context) ->
on_message_publish(Msg, _) -> on_message_publish(Msg, _) ->
{ok, Msg}. {ok, Msg}.
%%------------------------------------------------------------------------------
%% Config API
%%------------------------------------------------------------------------------
post_config_update(_, _UpdateReq, NewConf, OldConf, _AppEnvs) ->
call({update_config, NewConf, OldConf}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% APIs %% APIs
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -140,6 +159,7 @@ on_message_publish(Msg, _) ->
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec get_expiry_time(message()) -> non_neg_integer().
get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) -> get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) ->
0; 0;
get_expiry_time(#message{ get_expiry_time(#message{
@ -154,41 +174,46 @@ get_expiry_time(#message{timestamp = Ts}) ->
_ -> Ts + Interval _ -> Ts + Interval
end. end.
get_stop_publish_clear_msg() ->
emqx_conf:get([retainer, stop_publish_clear_msg], false).
-spec update_config(hocon:config()) -> {ok, _} | {error, _}. -spec update_config(hocon:config()) -> {ok, _} | {error, _}.
update_config(Conf) -> update_config(Conf) ->
emqx_conf:update([retainer], Conf, #{override_to => cluster}). emqx_conf:update([retainer], Conf, #{override_to => cluster}).
-spec clean() -> ok.
clean() -> clean() ->
call(?FUNCTION_NAME). call(?FUNCTION_NAME).
-spec delete(topic()) -> ok.
delete(Topic) -> delete(Topic) ->
call({?FUNCTION_NAME, Topic}). call({?FUNCTION_NAME, Topic}).
-spec retained_count() -> non_neg_integer().
retained_count() -> retained_count() ->
call(?FUNCTION_NAME). call(?FUNCTION_NAME).
-spec read_message(topic()) -> {ok, list(message())}.
read_message(Topic) -> read_message(Topic) ->
call({?FUNCTION_NAME, Topic}). call({?FUNCTION_NAME, Topic}).
-spec page_read(emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) ->
{ok, has_next(), list(message())}.
page_read(Topic, Page, Limit) -> page_read(Topic, Page, Limit) ->
call({?FUNCTION_NAME, Topic, Page, Limit}). call({?FUNCTION_NAME, Topic, Page, Limit}).
post_config_update(_, _UpdateReq, NewConf, OldConf, _AppEnvs) -> -spec enabled() -> boolean().
call({update_config, NewConf, OldConf}). enabled() ->
call(?FUNCTION_NAME).
call(Req) -> -spec context() -> ok.
gen_server:call(?MODULE, Req, infinity). context() ->
call(?FUNCTION_NAME).
%%------------------------------------------------------------------------------
%% Internal APIs
%%------------------------------------------------------------------------------
stats_fun() -> stats_fun() ->
gen_server:cast(?MODULE, ?FUNCTION_NAME). gen_server:cast(?MODULE, ?FUNCTION_NAME).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
-spec get_basic_usage_info() -> #{retained_messages => non_neg_integer()}. -spec get_basic_usage_info() -> #{retained_messages => non_neg_integer()}.
get_basic_usage_info() -> get_basic_usage_info() ->
try try
@ -206,13 +231,14 @@ init([]) ->
erlang:process_flag(trap_exit, true), erlang:process_flag(trap_exit, true),
emqx_conf:add_handler([retainer], ?MODULE), emqx_conf:add_handler([retainer], ?MODULE),
State = new_state(), State = new_state(),
#{enable := Enable} = Cfg = emqx:get_config([retainer]), RetainerConfig = emqx:get_config([retainer]),
{ok, {ok,
case Enable of case maps:get(enable, RetainerConfig) of
false ->
State;
true -> true ->
enable_retainer(State, Cfg); BackendConfig = enabled_backend_config(RetainerConfig),
_ -> enable_retainer(State, RetainerConfig, BackendConfig)
State
end}. end}.
handle_call({update_config, NewConf, OldConf}, _, State) -> handle_call({update_config, NewConf, OldConf}, _, State) ->
@ -220,39 +246,34 @@ handle_call({update_config, NewConf, OldConf}, _, State) ->
emqx_retainer_dispatcher:refresh_limiter(NewConf), emqx_retainer_dispatcher:refresh_limiter(NewConf),
{reply, ok, State2}; {reply, ok, State2};
handle_call(clean, _, #{context := Context} = State) -> handle_call(clean, _, #{context := Context} = State) ->
clean(Context), _ = clean(Context),
{reply, ok, State}; {reply, ok, State};
handle_call({delete, Topic}, _, #{context := Context} = State) -> handle_call({delete, Topic}, _, #{context := Context} = State) ->
delete_message(Context, Topic), _ = delete_message(Context, Topic),
{reply, ok, State}; {reply, ok, State};
handle_call({read_message, Topic}, _, #{context := Context} = State) -> handle_call({read_message, Topic}, _, #{context := Context} = State) ->
Mod = get_backend_module(), {reply, read_message(Context, Topic), State};
Result = Mod:read_message(Context, Topic),
{reply, Result, State};
handle_call({page_read, Topic, Page, Limit}, _, #{context := Context} = State) -> handle_call({page_read, Topic, Page, Limit}, _, #{context := Context} = State) ->
Mod = get_backend_module(), {reply, page_read(Context, Topic, Page, Limit), State};
Result = Mod:page_read(Context, Topic, Page, Limit),
{reply, Result, State};
handle_call(retained_count, _From, State = #{context := Context}) -> handle_call(retained_count, _From, State = #{context := Context}) ->
Mod = get_backend_module(), {reply, count(Context), State};
RetainedCount = Mod:size(Context), handle_call(enabled, _From, State = #{enable := Enable}) ->
{reply, RetainedCount, State}; {reply, Enable, State};
handle_call(context, _From, State = #{context := Context}) ->
{reply, Context, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast(stats_fun, #{context := Context} = State) -> handle_cast(stats_fun, #{context := Context} = State) ->
Mod = get_backend_module(), emqx_stats:setstat('retained.count', 'retained.max', count(Context)),
Size = Mod:size(Context),
emqx_stats:setstat('retained.count', 'retained.max', Size),
{noreply, State}; {noreply, State};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
handle_info(clear_expired, #{context := Context} = State) -> handle_info(clear_expired, #{context := Context} = State) ->
Mod = get_backend_module(), ok = clear_expired(Context),
Mod:clear_expired(Context),
Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL), Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL),
{noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate}; {noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
@ -264,12 +285,16 @@ terminate(_Reason, #{clear_timer := ClearTimer}) ->
_ = stop_timer(ClearTimer), _ = stop_timer(ClearTimer),
ok. ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Internal functions %% Internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
call(Req) ->
gen_server:call(?MODULE, Req, infinity).
get_stop_publish_clear_msg() ->
emqx_conf:get([retainer, stop_publish_clear_msg], false).
-spec new_state() -> state(). -spec new_state() -> state().
new_state() -> new_state() ->
#{ #{
@ -287,8 +312,34 @@ dispatch(Context, Topic) ->
-spec delete_message(context(), topic()) -> ok. -spec delete_message(context(), topic()) -> ok.
delete_message(Context, Topic) -> delete_message(Context, Topic) ->
Mod = get_backend_module(), Mod = backend_module(Context),
Mod:delete_message(Context, Topic). BackendState = backend_state(Context),
Mod:delete_message(BackendState, Topic).
-spec read_message(context(), topic()) -> {ok, list(message())}.
read_message(Context, Topic) ->
Mod = backend_module(Context),
BackendState = backend_state(Context),
Mod:read_message(BackendState, Topic).
-spec page_read(context(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) ->
{ok, has_next(), list(message())}.
page_read(Context, Topic, Page, Limit) ->
Mod = backend_module(Context),
BackendState = backend_state(Context),
Mod:page_read(BackendState, Topic, Page, Limit).
-spec count(context()) -> non_neg_integer().
count(Context) ->
Mod = backend_module(Context),
BackendState = backend_state(Context),
Mod:size(BackendState).
-spec clear_expired(context()) -> ok.
clear_expired(Context) ->
Mod = backend_module(Context),
BackendState = backend_state(Context),
Mod:clear_expired(BackendState).
-spec store_retained(context(), message()) -> ok. -spec store_retained(context(), message()) -> ok.
store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) -> store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) ->
@ -303,52 +354,47 @@ store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) ->
limit => Limit limit => Limit
}); });
_ -> _ ->
Mod = get_backend_module(), Mod = backend_module(Context),
Mod:store_retained(Context, Msg) BackendState = backend_state(Context),
Mod:store_retained(BackendState, Msg)
end. end.
-spec clean(context()) -> ok. -spec clean(context()) -> ok.
clean(Context) -> clean(Context) ->
Mod = get_backend_module(), Mod = backend_module(Context),
Mod:clean(Context). BackendState = backend_state(Context),
Mod:clean(BackendState).
-spec update_config(state(), hocon:config(), hocon:config()) -> state(). -spec update_config(state(), hocon:config(), hocon:config()) -> state().
update_config(State, Conf, OldConf) -> update_config(State, NewConfig, OldConfig) ->
update_config( update_config(
maps:get(enable, Conf), maps:get(enable, NewConfig),
maps:get(enable, OldConf), maps:get(enable, OldConfig),
State, State,
Conf, NewConfig,
OldConf OldConfig
). ).
-spec update_config(boolean(), boolean(), state(), hocon:config(), hocon:config()) -> state(). -spec update_config(boolean(), boolean(), state(), hocon:config(), hocon:config()) -> state().
update_config(false, _, State, _, _) -> update_config(false, _, State, _, _) ->
disable_retainer(State); disable_retainer(State);
update_config(true, false, State, NewConf, _) -> update_config(true, false, State, NewConfig, _) ->
enable_retainer(State, NewConf); enable_retainer(State, NewConfig, enabled_backend_config(NewConfig));
update_config( update_config(
true, true,
true, true,
#{clear_timer := ClearTimer, context := Context} = State, #{clear_timer := ClearTimer, context := Context} = State,
NewConf, NewConfig,
OldConf OldConfig
) -> ) ->
#{ #{msg_clear_interval := ClearInterval} = NewConfig,
backend := #{ OldBackendConfig = enabled_backend_config(OldConfig),
type := BackendType NewBackendConfig = enabled_backend_config(NewConfig),
} = NewBackendConfig, OldMod = config_backend_module(OldBackendConfig),
msg_clear_interval := ClearInterval NewMod = config_backend_module(NewBackendConfig),
} = NewConf,
#{ SameBackendType = NewMod =:= OldMod,
backend := #{ case SameBackendType andalso ok =:= OldMod:update(Context, NewBackendConfig) of
type := OldBackendType
}
} = OldConf,
SameBackendType = BackendType =:= OldBackendType,
Mod = get_backend_module(),
case SameBackendType andalso ok =:= Mod:update(Context, NewBackendConfig) of
true -> true ->
State#{ State#{
clear_timer := check_timer( clear_timer := check_timer(
@ -359,18 +405,16 @@ update_config(
}; };
false -> false ->
State2 = disable_retainer(State), State2 = disable_retainer(State),
enable_retainer(State2, NewConf) enable_retainer(State2, NewConfig, NewBackendConfig)
end. end.
-spec enable_retainer(state(), hocon:config()) -> state(). -spec enable_retainer(state(), hocon:config(), hocon:config()) -> state().
enable_retainer( enable_retainer(
State, State,
#{ #{msg_clear_interval := ClearInterval} = _RetainerConfig,
msg_clear_interval := ClearInterval, BackendConfig
backend := BackendCfg
}
) -> ) ->
Context = create(BackendCfg), Context = create(BackendConfig),
ok = load(Context), ok = load(Context),
State#{ State#{
enable := true, enable := true,
@ -415,20 +459,43 @@ check_timer(Timer, undefined, _) ->
check_timer(Timer, _, _) -> check_timer(Timer, _, _) ->
Timer. Timer.
-spec get_backend_module() -> backend(). -spec enabled_backend_config(hocon:config()) -> hocon:config() | no_return().
get_backend_module() -> enabled_backend_config(#{backend := Backend, external_backends := ExternalBackends} = Config) ->
case emqx:get_config([retainer, backend]) of AllBackends = [Backend | maps:values(ExternalBackends)],
#{type := built_in_database} -> emqx_retainer_mnesia; case lists:search(fun(#{enable := Enable}) -> Enable end, AllBackends) of
#{type := Backend} -> Backend {value, EnabledBackend} -> EnabledBackend;
false -> error({no_enabled_backend, Config})
end. end.
create(#{type := built_in_database} = Cfg) -> -spec config_backend_module(hocon:config()) -> module().
Mod = get_backend_module(), config_backend_module(Config) ->
Mod:create(Cfg). case Config of
#{type := built_in_database} -> emqx_retainer_mnesia;
#{module := Module} -> Module
end.
-spec backend_module(context()) -> module().
backend_module(#{module := Module}) -> Module.
-spec backend_state(context()) -> backend_state().
backend_state(#{state := State}) -> State.
-spec backend_module() -> module() | no_return().
backend_module() ->
Config = enabled_backend_config(emqx:get_config([retainer])),
config_backend_module(Config).
-spec create(hocon:config()) -> context().
create(Cfg) ->
Mod = config_backend_module(Cfg),
#{
module => Mod,
state => Mod:create(Cfg)
}.
-spec close(context()) -> ok | {error, term()}. -spec close(context()) -> ok | {error, term()}.
close(Context) -> close(Context) ->
Mod = get_backend_module(), Mod = backend_module(Context),
Mod:close(Context). Mod:close(Context).
-spec load(context()) -> ok. -spec load(context()) -> ok.

View File

@ -185,11 +185,11 @@ lookup_retained(get, #{query_string := Qs}) ->
Page = maps:get(<<"page">>, Qs, 1), Page = maps:get(<<"page">>, Qs, 1),
Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()), Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()),
Topic = maps:get(<<"topic">>, Qs, undefined), Topic = maps:get(<<"topic">>, Qs, undefined),
{ok, HasNext, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit), {ok, HasNext, Msgs} = emqx_retainer:page_read(Topic, Page, Limit),
Meta = Meta =
case Topic of case Topic of
undefined -> undefined ->
#{count => emqx_retainer_mnesia:size(?TAB_MESSAGE)}; #{count => emqx_retainer:retained_count()};
_ -> _ ->
#{} #{}
end, end,
@ -205,7 +205,7 @@ lookup_retained(get, #{query_string := Qs}) ->
with_topic(get, #{bindings := Bindings}) -> with_topic(get, #{bindings := Bindings}) ->
Topic = maps:get(topic, Bindings), Topic = maps:get(topic, Bindings),
{ok, _, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1), {ok, _, Msgs} = emqx_retainer:page_read(Topic, 1, 1),
case Msgs of case Msgs of
[H | _] -> [H | _] ->
{200, format_detail_message(H)}; {200, format_detail_message(H)};
@ -217,14 +217,14 @@ with_topic(get, #{bindings := Bindings}) ->
end; end;
with_topic(delete, #{bindings := Bindings}) -> with_topic(delete, #{bindings := Bindings}) ->
Topic = maps:get(topic, Bindings), Topic = maps:get(topic, Bindings),
case emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1) of case emqx_retainer:page_read(Topic, 1, 1) of
{ok, _, []} -> {ok, _, []} ->
{404, #{ {404, #{
code => <<"NOT_FOUND">>, code => <<"NOT_FOUND">>,
message => <<"Viewed message doesn't exist">> message => <<"Viewed message doesn't exist">>
}}; }};
{ok, _, _} -> {ok, _, _} ->
emqx_retainer_mnesia:delete_message(undefined, Topic), emqx_retainer:delete(Topic),
{204} {204}
end. end.
@ -265,8 +265,8 @@ to_bin_string(Data) ->
list_to_binary(io_lib:format("~p", [Data])). list_to_binary(io_lib:format("~p", [Data])).
check_backend(Type, Params, Cont) -> check_backend(Type, Params, Cont) ->
case emqx:get_config([retainer, backend, type]) of case emqx_retainer:backend_module() of
built_in_database -> emqx_retainer_mnesia ->
Cont(Type, Params); Cont(Type, Params);
_ -> _ ->
{400, 'BAD_REQUEST', <<"This API only support built in database">>} {400, 'BAD_REQUEST', <<"This API only support built in database">>}

View File

@ -26,12 +26,12 @@
]). ]).
start(_Type, _Args) -> start(_Type, _Args) ->
ok = emqx_retainer_mnesia_cli:load(), ok = emqx_retainer_cli:load(),
init_bucket(), init_bucket(),
emqx_retainer_sup:start_link(). emqx_retainer_sup:start_link().
stop(_State) -> stop(_State) ->
ok = emqx_retainer_mnesia_cli:unload(), ok = emqx_retainer_cli:unload(),
delete_bucket(), delete_bucket(),
ok. ok.

View File

@ -14,7 +14,7 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_retainer_mnesia_cli). -module(emqx_retainer_cli).
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
@ -32,25 +32,40 @@ load() ->
ok = emqx_ctl:register_command(retainer, {?MODULE, retainer}, []). ok = emqx_ctl:register_command(retainer, {?MODULE, retainer}, []).
retainer(["info"]) -> retainer(["info"]) ->
count(); if_enabled(fun() ->
count()
end);
retainer(["topics"]) -> retainer(["topics"]) ->
topic(1, 1000); if_enabled(fun() ->
topic(1, 1000)
end);
retainer(["topics", Start, Len]) -> retainer(["topics", Start, Len]) ->
topic(list_to_integer(Start), list_to_integer(Len)); if_enabled(fun() ->
topic(list_to_integer(Start), list_to_integer(Len))
end);
retainer(["clean", Topic]) -> retainer(["clean", Topic]) ->
emqx_retainer:delete(list_to_binary(Topic)); if_enabled(fun() ->
emqx_retainer:delete(list_to_binary(Topic))
end);
retainer(["clean"]) -> retainer(["clean"]) ->
emqx_retainer:clean(); if_enabled(fun() ->
emqx_retainer:clean()
end);
retainer(["reindex", "status"]) -> retainer(["reindex", "status"]) ->
if_mnesia_enabled(fun() ->
case emqx_retainer_mnesia:reindex_status() of case emqx_retainer_mnesia:reindex_status() of
true -> true ->
?PRINT_MSG("Reindexing is in progress~n"); ?PRINT_MSG("Reindexing is in progress~n");
false -> false ->
?PRINT_MSG("Reindexing is not running~n") ?PRINT_MSG("Reindexing is not running~n")
end; end
end);
retainer(["reindex", "start"]) -> retainer(["reindex", "start"]) ->
retainer(["reindex", "start", "false"]); if_mnesia_enabled(fun() ->
retainer(["reindex", "start", "false"])
end);
retainer(["reindex", "start", ForceParam]) -> retainer(["reindex", "start", ForceParam]) ->
if_mnesia_enabled(fun() ->
case mria_rlog:role() of case mria_rlog:role() of
core -> core ->
Force = Force =
@ -61,7 +76,8 @@ retainer(["reindex", "start", ForceParam]) ->
do_reindex(Force); do_reindex(Force);
replicant -> replicant ->
?PRINT_MSG("Can't run reindex on a replicant node") ?PRINT_MSG("Can't run reindex on a replicant node")
end; end
end);
retainer(_) -> retainer(_) ->
emqx_ctl:usage( emqx_ctl:usage(
[ [
@ -71,10 +87,12 @@ retainer(_) ->
"Show topics of retained messages by the specified range"}, "Show topics of retained messages by the specified range"},
{"retainer clean", "Clean all retained messages"}, {"retainer clean", "Clean all retained messages"},
{"retainer clean <Topic>", "Clean retained messages by the specified topic filter"}, {"retainer clean <Topic>", "Clean retained messages by the specified topic filter"},
{"retainer reindex status", "Show reindex status"}, {"retainer reindex status",
"Show reindex status.\nOnly available for built-in backend."},
{"retainer reindex start [force]", {"retainer reindex start [force]",
"Generate new retainer topic indices from config settings.\n" "Generate new retainer topic indices from config settings.\n"
"Pass true as <Force> to ignore previously started reindexing"} "Pass true as <Force> to ignore previously started reindexing.\n"
"Only available for built-in backend."}
] ]
). ).
@ -107,6 +125,23 @@ count() ->
topic(Start, Len) -> topic(Start, Len) ->
count(), count(),
Topics = lists:sublist(emqx_retainer_mnesia:topics(), Start, Len), {ok, _HasNext, Messages} = emqx_retainer:page_read(<<"#">>, Start, Len),
[?PRINT("~ts~n", [I]) || I <- Topics], [?PRINT("~ts~n", [emqx_message:topic(M)]) || M <- Messages],
ok. ok.
if_enabled(Fun) ->
case emqx_retainer:enabled() of
true -> Fun();
false -> ?PRINT_MSG("Retainer is not enabled~n")
end.
if_mnesia_backend(Fun) ->
case emqx_retainer:backend_module() of
emqx_retainer_mnesia -> Fun();
_ -> ?PRINT_MSG("Command only applicable for builtin backend~n")
end.
if_mnesia_enabled(Fun) ->
if_enabled(fun() ->
if_mnesia_backend(Fun)
end).

View File

@ -236,7 +236,7 @@ cast(Msg) ->
-spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}. -spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}.
dispatch(Context, Pid, Topic, Cursor, Limiter) -> dispatch(Context, Pid, Topic, Cursor, Limiter) ->
Mod = emqx_retainer:get_backend_module(), Mod = emqx_retainer:backend_module(Context),
case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
false -> false ->
{ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]), {ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]),

View File

@ -51,10 +51,8 @@
-export([reindex/2, reindex_status/0]). -export([reindex/2, reindex_status/0]).
-ifdef(TEST).
-export([populate_index_meta/0]). -export([populate_index_meta/0]).
-export([reindex/3]). -export([reindex/3]).
-endif.
-record(retained_message, {topic, msg, expiry_time}). -record(retained_message, {topic, msg, expiry_time}).
-record(retained_index, {key, expiry_time}). -record(retained_index, {key, expiry_time}).
@ -79,7 +77,7 @@ topics() ->
%% emqx_retainer callbacks %% emqx_retainer callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
create(#{storage_type := StorageType}) -> create(#{storage_type := StorageType, max_retained_messages := MaxRetainedMessages} = Config) ->
ok = create_table( ok = create_table(
?TAB_INDEX_META, ?TAB_INDEX_META,
retained_index_meta, retained_index_meta,
@ -87,7 +85,7 @@ create(#{storage_type := StorageType}) ->
set, set,
StorageType StorageType
), ),
ok = populate_index_meta(), ok = populate_index_meta(Config),
ok = create_table( ok = create_table(
?TAB_MESSAGE, ?TAB_MESSAGE,
retained_message, retained_message,
@ -102,8 +100,7 @@ create(#{storage_type := StorageType}) ->
ordered_set, ordered_set,
StorageType StorageType
), ),
%% The context is not used by this backend #{storage_type => StorageType, max_retained_messages => MaxRetainedMessages}.
#{storage_type => StorageType}.
create_table(Table, RecordName, Attributes, Type, StorageType) -> create_table(Table, RecordName, Attributes, Type, StorageType) ->
Copies = Copies =
@ -139,17 +136,15 @@ create_table(Table, RecordName, Attributes, Type, StorageType) ->
ok ok
end. end.
update(#{storage_type := StorageType}, #{storage_type := StorageType}) -> update(_State, _NewConfig) ->
ok;
update(_Context, _NewConfig) ->
need_recreate. need_recreate.
close(_Context) -> ok. close(_State) -> ok.
store_retained(_Context, Msg = #message{topic = Topic}) -> store_retained(State, Msg = #message{topic = Topic}) ->
ExpiryTime = emqx_retainer:get_expiry_time(Msg), ExpiryTime = emqx_retainer:get_expiry_time(Msg),
Tokens = topic_to_tokens(Topic), Tokens = topic_to_tokens(Topic),
case is_table_full() andalso is_new_topic(Tokens) of case is_table_full(State) andalso is_new_topic(Tokens) of
true -> true ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "failed_to_retain_message", msg => "failed_to_retain_message",
@ -182,7 +177,7 @@ clear_expired() ->
QC = qlc:cursor(QH), QC = qlc:cursor(QH),
clear_batch(dirty_indices(write), QC). clear_batch(dirty_indices(write), QC).
delete_message(_Context, Topic) -> delete_message(_State, Topic) ->
Tokens = topic_to_tokens(Topic), Tokens = topic_to_tokens(Topic),
case emqx_topic:wildcard(Topic) of case emqx_topic:wildcard(Topic) of
false -> false ->
@ -198,10 +193,10 @@ delete_message(_Context, Topic) ->
) )
end. end.
read_message(_Context, Topic) -> read_message(_State, Topic) ->
{ok, read_messages(Topic)}. {ok, read_messages(Topic)}.
match_messages(_Context, Topic, undefined) -> match_messages(_State, Topic, undefined) ->
Tokens = topic_to_tokens(Topic), Tokens = topic_to_tokens(Topic),
Now = erlang:system_time(millisecond), Now = erlang:system_time(millisecond),
QH = msg_table(search_table(Tokens, Now)), QH = msg_table(search_table(Tokens, Now)),
@ -212,7 +207,7 @@ match_messages(_Context, Topic, undefined) ->
Cursor = qlc:cursor(QH), Cursor = qlc:cursor(QH),
match_messages(undefined, Topic, {Cursor, BatchNum}) match_messages(undefined, Topic, {Cursor, BatchNum})
end; end;
match_messages(_Context, _Topic, {Cursor, BatchNum}) -> match_messages(_State, _Topic, {Cursor, BatchNum}) ->
case qlc_next_answers(Cursor, BatchNum) of case qlc_next_answers(Cursor, BatchNum) of
{closed, Rows} -> {closed, Rows} ->
{ok, Rows, undefined}; {ok, Rows, undefined};
@ -220,7 +215,7 @@ match_messages(_Context, _Topic, {Cursor, BatchNum}) ->
{ok, Rows, {Cursor, BatchNum}} {ok, Rows, {Cursor, BatchNum}}
end. end.
page_read(_Context, Topic, Page, Limit) -> page_read(_State, Topic, Page, Limit) ->
Now = erlang:system_time(millisecond), Now = erlang:system_time(millisecond),
QH = QH =
case Topic of case Topic of
@ -263,7 +258,8 @@ size(_) ->
table_size(). table_size().
reindex(Force, StatusFun) -> reindex(Force, StatusFun) ->
reindex(config_indices(), Force, StatusFun). Config = emqx:get_config([retainer, backend]),
reindex(config_indices(Config), Force, StatusFun).
reindex_status() -> reindex_status() ->
case mnesia:dirty_read(?TAB_INDEX_META, ?META_KEY) of case mnesia:dirty_read(?TAB_INDEX_META, ?META_KEY) of
@ -438,9 +434,8 @@ make_index_match_spec(Index, Tokens, NowMs) ->
MsHd = #retained_index{key = Cond, expiry_time = '$3'}, MsHd = #retained_index{key = Cond, expiry_time = '$3'},
{[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}], IsExact}. {[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}], IsExact}.
is_table_full() -> is_table_full(#{max_retained_messages := MaxRetainedMessages} = _State) ->
Limit = emqx:get_config([retainer, backend, max_retained_messages]), MaxRetainedMessages > 0 andalso (table_size() >= MaxRetainedMessages).
Limit > 0 andalso (table_size() >= Limit).
is_new_topic(Tokens) -> is_new_topic(Tokens) ->
case mnesia:dirty_read(?TAB_MESSAGE, Tokens) of case mnesia:dirty_read(?TAB_MESSAGE, Tokens) of
@ -453,11 +448,15 @@ is_new_topic(Tokens) ->
table_size() -> table_size() ->
mnesia:table_info(?TAB_MESSAGE, size). mnesia:table_info(?TAB_MESSAGE, size).
config_indices() -> config_indices(#{index_specs := IndexSpecs}) ->
lists:sort(emqx_config:get([retainer, backend, index_specs])). IndexSpecs.
populate_index_meta() -> populate_index_meta() ->
ConfigIndices = config_indices(), Config = emqx:get_config([retainer, backend]),
populate_index_meta(Config).
populate_index_meta(Config) ->
ConfigIndices = config_indices(Config),
case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_populate_index_meta/1, [ConfigIndices]) of case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_populate_index_meta/1, [ConfigIndices]) of
{atomic, ok} -> {atomic, ok} ->
ok; ok;

View File

@ -36,7 +36,8 @@ roots() ->
[ [
{"retainer", {"retainer",
hoconsc:mk(hoconsc:ref(?MODULE, "retainer"), #{ hoconsc:mk(hoconsc:ref(?MODULE, "retainer"), #{
converter => fun retainer_converter/2 converter => fun retainer_converter/2,
validator => fun validate_backends_enabled/1
})} })}
]. ].
@ -86,11 +87,29 @@ fields("retainer") ->
aliases => [deliver_rate] aliases => [deliver_rate]
} }
)}, )},
{backend, backend_config()} {backend, backend_config()},
{external_backends,
?HOCON(
hoconsc:ref(?MODULE, external_backends),
#{
desc => ?DESC(backends),
required => false,
default => #{},
importance => ?IMPORTANCE_HIDDEN
}
)}
]; ];
fields(mnesia_config) -> fields(mnesia_config) ->
[ [
{type, sc(built_in_database, mnesia_config_type, built_in_database)}, {type,
?HOCON(
built_in_database,
#{
desc => ?DESC(mnesia_config_type),
required => false,
default => built_in_database
}
)},
{storage_type, {storage_type,
sc( sc(
hoconsc:enum([ram, disc]), hoconsc:enum([ram, disc]),
@ -103,7 +122,13 @@ fields(mnesia_config) ->
max_retained_messages, max_retained_messages,
0 0
)}, )},
{index_specs, fun retainer_indices/1} {index_specs, fun retainer_indices/1},
{enable,
?HOCON(boolean(), #{
desc => ?DESC(mnesia_enable),
required => false,
default => true
})}
]; ];
fields(flow_control) -> fields(flow_control) ->
[ [
@ -125,7 +150,9 @@ fields(flow_control) ->
batch_deliver_limiter, batch_deliver_limiter,
undefined undefined
)} )}
]. ];
fields(external_backends) ->
emqx_schema_hooks:injection_point('retainer.external_backends').
desc("retainer") -> desc("retainer") ->
"Configuration related to handling `PUBLISH` packets with a `retain` flag set to 1."; "Configuration related to handling `PUBLISH` packets with a `retain` flag set to 1.";
@ -140,9 +167,6 @@ desc(_) ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%%sc(Type, DescId) ->
%% hoconsc:mk(Type, #{desc => ?DESC(DescId)}).
sc(Type, DescId, Default) -> sc(Type, DescId, Default) ->
sc(Type, DescId, Default, ?DEFAULT_IMPORTANCE). sc(Type, DescId, Default, ?DEFAULT_IMPORTANCE).
sc(Type, DescId, Default, Importance) -> sc(Type, DescId, Default, Importance) ->
@ -214,3 +238,16 @@ retainer_converter(#{<<"deliver_rate">> := Delivery} = Conf, Opts) ->
retainer_converter(Conf1#{<<"delivery_rate">> => Delivery}, Opts); retainer_converter(Conf1#{<<"delivery_rate">> => Delivery}, Opts);
retainer_converter(Conf, _Opts) -> retainer_converter(Conf, _Opts) ->
Conf. Conf.
validate_backends_enabled(Config) ->
BuiltInBackend = maps:get(<<"backend">>, Config, #{}),
ExternalBackends = maps:values(maps:get(<<"external_backends">>, Config, #{})),
Enabled = lists:filter(fun(#{<<"enable">> := E}) -> E end, [BuiltInBackend | ExternalBackends]),
case Enabled of
[#{}] ->
ok;
_Conflicts = [_ | _] ->
{error, multiple_enabled_backends};
_None = [] ->
{error, no_enabled_backend}
end.

View File

@ -644,7 +644,7 @@ t_reindex(_) ->
t_get_basic_usage_info(_Config) -> t_get_basic_usage_info(_Config) ->
?assertEqual(#{retained_messages => 0}, emqx_retainer:get_basic_usage_info()), ?assertEqual(#{retained_messages => 0}, emqx_retainer:get_basic_usage_info()),
Context = undefined, Context = emqx_retainer:context(),
lists:foreach( lists:foreach(
fun(N) -> fun(N) ->
Num = integer_to_binary(N), Num = integer_to_binary(N),
@ -788,8 +788,8 @@ t_compatibility_for_deliver_rate(_) ->
). ).
t_update_config(_) -> t_update_config(_) ->
OldConf = emqx_config:get([retainer]), OldConf = emqx_config:get_raw([retainer]),
NewConf = emqx_utils_maps:deep_put([backend, storage_type], OldConf, disk), NewConf = emqx_utils_maps:deep_put([<<"backend">>, <<"storage_type">>], OldConf, <<"disk">>),
emqx_retainer:update_config(NewConf). emqx_retainer:update_config(NewConf).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -836,7 +836,7 @@ with_conf(ConfMod, Case) ->
emqx_retainer:update_config(NewConf), emqx_retainer:update_config(NewConf),
try try
Case(), Case(),
emqx_retainer:update_config(Conf) {ok, _} = emqx_retainer:update_config(Conf)
catch catch
Type:Error:Strace -> Type:Error:Strace ->
emqx_retainer:update_config(Conf), emqx_retainer:update_config(Conf),

View File

@ -0,0 +1,104 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_retainer_backend_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() -> emqx_common_test_helpers:all(?MODULE).
-define(BASE_CONF, #{
<<"retainer">> =>
#{
<<"enable">> => true,
<<"backend">> =>
#{
<<"type">> => <<"built_in_database">>,
<<"storage_type">> => <<"ram">>,
<<"max_retained_messages">> => 0
}
}
}).
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
{emqx, #{
before_start => fun() ->
ok = emqx_schema_hooks:inject_from_modules([emqx_retainer_dummy])
end
}},
emqx_conf,
{emqx_retainer, ?BASE_CONF}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)).
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
t_external_backend(_Config) ->
{ok, _} = emqx_retainer:update_config(#{
<<"enable">> => true,
<<"backend">> => #{
<<"enable">> => false
},
<<"external_backends">> =>
#{
<<"dummy">> => #{<<"enable">> => true}
}
}),
?assertMatch(
ok,
emqx_retainer:clean()
),
?assertMatch(
ok,
emqx_retainer:delete(<<"topic">>)
),
?assertMatch(
{ok, []},
emqx_retainer:read_message(<<"topic">>)
),
?assertMatch(
{ok, false, []},
emqx_retainer:page_read(<<"topic">>, 0, 10)
),
?assertEqual(
0,
emqx_retainer:retained_count()
),
?assertEqual(
emqx_retainer_dummy,
emqx_retainer:backend_module()
),
?assertEqual(
true,
emqx_retainer:enabled()
).

View File

@ -38,22 +38,22 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)). emqx_cth_suite:stop(?config(suite_apps, Config)).
t_reindex_status(_Config) -> t_reindex_status(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["reindex", "status"]). ok = emqx_retainer_cli:retainer(["reindex", "status"]).
t_info(_Config) -> t_info(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["info"]). ok = emqx_retainer_cli:retainer(["info"]).
t_topics(_Config) -> t_topics(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["topics"]). ok = emqx_retainer_cli:retainer(["topics"]).
t_topics_with_len(_Config) -> t_topics_with_len(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["topics", "100", "200"]). ok = emqx_retainer_cli:retainer(["topics", "100", "200"]).
t_clean(_Config) -> t_clean(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["clean"]). ok = emqx_retainer_cli:retainer(["clean"]).
t_topic(_Config) -> t_topic(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["clean", "foo/bar"]). ok = emqx_retainer_cli:retainer(["clean", "foo/bar"]).
t_reindex(_Config) -> t_reindex(_Config) ->
{ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
@ -84,6 +84,6 @@ t_reindex(_Config) ->
), ),
emqx_config:put([retainer, backend, index_specs], [[4, 5]]), emqx_config:put([retainer, backend, index_specs], [[4, 5]]),
ok = emqx_retainer_mnesia_cli:retainer(["reindex", "start"]), ok = emqx_retainer_cli:retainer(["reindex", "start"]),
?assertEqual(1000, mnesia:table_info(?TAB_INDEX, size)). ?assertEqual(1000, mnesia:table_info(?TAB_INDEX, size)).

View File

@ -0,0 +1,98 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_retainer_dummy).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-behaviour(emqx_retainer).
-export([
create/1,
update/2,
close/1,
delete_message/2,
store_retained/2,
read_message/2,
page_read/4,
match_messages/3,
clear_expired/1,
clean/1,
size/1
]).
-behaviour(emqx_schema_hooks).
-export([
fields/1,
injected_fields/0
]).
injected_fields() ->
#{
'retainer.external_backends' => external_backend_fields()
}.
create(_Config) -> #{}.
update(_Context, _Config) -> ok.
close(_Context) -> ok.
delete_message(_Context, _Topic) -> ok.
store_retained(_Context, _Message) -> ok.
read_message(_Context, _Topic) -> {ok, []}.
page_read(_Context, _Topic, _Offset, _Limit) -> {ok, false, []}.
match_messages(_Context, _Topic, _Cursor) -> {ok, [], 0}.
clear_expired(_Context) -> ok.
clean(_Context) -> ok.
size(_Context) -> 0.
external_backend_fields() ->
[
{dummy, hoconsc:ref(?MODULE, dummy)}
].
fields(dummy) ->
[
{module,
hoconsc:mk(
emqx_retainer_dummy,
#{
desc => <<"dummy backend mod">>,
required => false,
default => <<"emqx_retainer_dummy">>,
importance => ?IMPORTANCE_HIDDEN
}
)},
{enable,
hoconsc:mk(
boolean(),
#{
desc => <<"enable dummy backend">>,
required => true,
default => false
}
)}
].

View File

@ -33,6 +33,9 @@ mnesia_config_storage_type.desc:
mnesia_config_type.desc: mnesia_config_type.desc:
"""Backend type.""" """Backend type."""
mnesia_enable.desc:
"""Enable built-in Mnesia backend."""
msg_clear_interval.desc: msg_clear_interval.desc:
"""Interval for EMQX to scan expired messages and delete them. Never scan if the value is 0.""" """Interval for EMQX to scan expired messages and delete them. Never scan if the value is 0."""