Merge pull request #12589 from savonarola/0222-retainer-interface

Make retainer pluggable
This commit is contained in:
Ilya Averyanov 2024-02-29 10:31:15 +02:00 committed by GitHub
commit d4519bda82
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 559 additions and 218 deletions

View File

@ -13,6 +13,8 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-ifndef(EMQX_RETAINER_HRL).
-define(EMQX_RETAINER_HRL, true).
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
@ -22,22 +24,4 @@
-define(TAB_INDEX_META, emqx_retainer_index_meta). -define(TAB_INDEX_META, emqx_retainer_index_meta).
-define(RETAINER_SHARD, emqx_retainer_shard). -define(RETAINER_SHARD, emqx_retainer_shard).
-type topic() :: binary(). -endif.
-type payload() :: binary().
-type message() :: #message{}.
-type context() :: #{
context_id := pos_integer(),
atom() => term()
}.
-define(DELIVER_SEMAPHORE, deliver_remained_quota).
-type semaphore() :: ?DELIVER_SEMAPHORE.
-type cursor() :: undefined | term().
-type result() :: term().
-define(SHARED_CONTEXT_TAB, emqx_retainer_ctx).
-record(shared_context, {key :: atom(), value :: term()}).
-type shared_context_key() :: ?DELIVER_SEMAPHORE.
-type backend() :: emqx_retainer_storage_mnesia.

View File

@ -32,8 +32,7 @@
-export([ -export([
delete_message/2, delete_message/2,
store_retained/2, store_retained/2
get_backend_module/0
]). ]).
-export([ -export([
@ -45,7 +44,15 @@
page_read/3, page_read/3,
post_config_update/5, post_config_update/5,
stats_fun/0, stats_fun/0,
retained_count/0 retained_count/0,
backend_module/0,
backend_module/1,
enabled/0
]).
%% For testing only
-export([
context/0
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -54,8 +61,12 @@
handle_call/3, handle_call/3,
handle_cast/2, handle_cast/2,
handle_info/2, handle_info/2,
terminate/2, terminate/2
code_change/3 ]).
-export_type([
cursor/0,
context/0
]). ]).
%% exported for `emqx_telemetry' %% exported for `emqx_telemetry'
@ -63,26 +74,38 @@
-type state() :: #{ -type state() :: #{
enable := boolean(), enable := boolean(),
context_id := non_neg_integer(),
context := undefined | context(), context := undefined | context(),
clear_timer := undefined | reference() clear_timer := undefined | reference()
}. }.
-type backend_state() :: term().
-type context() :: #{
module := module(),
state := backend_state()
}.
-type topic() :: emqx_types:topic().
-type message() :: emqx_types:message().
-type cursor() :: undefined | term().
-type has_next() :: boolean().
-define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)). -define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)).
-define(DEF_EXPIRY_INTERVAL, 0). -define(DEF_EXPIRY_INTERVAL, 0).
-define(MAX_PAYLOAD_SIZE_CONFIG_PATH, [retainer, max_payload_size]). -define(MAX_PAYLOAD_SIZE_CONFIG_PATH, [retainer, max_payload_size]).
-define(CAST(Msg), gen_server:cast(?MODULE, Msg)). -callback create(hocon:config()) -> backend_state().
-callback update(backend_state(), hocon:config()) -> ok | need_recreate.
-callback delete_message(context(), topic()) -> ok. -callback close(backend_state()) -> ok.
-callback store_retained(context(), message()) -> ok. -callback delete_message(backend_state(), topic()) -> ok.
-callback read_message(context(), topic()) -> {ok, list()}. -callback store_retained(backend_state(), message()) -> ok.
-callback page_read(context(), topic(), non_neg_integer(), non_neg_integer()) -> -callback read_message(backend_state(), topic()) -> {ok, list(message())}.
{ok, HasNext :: boolean(), list()}. -callback page_read(backend_state(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) ->
-callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}. {ok, has_next(), list(message())}.
-callback clear_expired(context()) -> ok. -callback match_messages(backend_state(), topic(), cursor()) -> {ok, list(message()), cursor()}.
-callback clean(context()) -> ok. -callback clear_expired(backend_state()) -> ok.
-callback size(context()) -> non_neg_integer(). -callback clean(backend_state()) -> ok.
-callback size(backend_state()) -> non_neg_integer().
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Hook API %% Hook API
@ -120,6 +143,13 @@ on_message_publish(Msg = #message{flags = #{retain := true}}, Context) ->
on_message_publish(Msg, _) -> on_message_publish(Msg, _) ->
{ok, Msg}. {ok, Msg}.
%%------------------------------------------------------------------------------
%% Config API
%%------------------------------------------------------------------------------
post_config_update(_, _UpdateReq, NewConf, OldConf, _AppEnvs) ->
call({update_config, NewConf, OldConf}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% APIs %% APIs
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -129,6 +159,7 @@ on_message_publish(Msg, _) ->
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec get_expiry_time(message()) -> non_neg_integer().
get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) -> get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) ->
0; 0;
get_expiry_time(#message{ get_expiry_time(#message{
@ -143,46 +174,50 @@ get_expiry_time(#message{timestamp = Ts}) ->
_ -> Ts + Interval _ -> Ts + Interval
end. end.
get_stop_publish_clear_msg() ->
emqx_conf:get([retainer, stop_publish_clear_msg], false).
-spec update_config(hocon:config()) -> {ok, _} | {error, _}. -spec update_config(hocon:config()) -> {ok, _} | {error, _}.
update_config(Conf) -> update_config(Conf) ->
emqx_conf:update([retainer], Conf, #{override_to => cluster}). emqx_conf:update([retainer], Conf, #{override_to => cluster}).
-spec clean() -> ok.
clean() -> clean() ->
call(?FUNCTION_NAME). call(?FUNCTION_NAME).
-spec delete(topic()) -> ok.
delete(Topic) -> delete(Topic) ->
call({?FUNCTION_NAME, Topic}). call({?FUNCTION_NAME, Topic}).
-spec retained_count() -> non_neg_integer().
retained_count() -> retained_count() ->
call(?FUNCTION_NAME). call(?FUNCTION_NAME).
-spec read_message(topic()) -> {ok, list(message())}.
read_message(Topic) -> read_message(Topic) ->
call({?FUNCTION_NAME, Topic}). call({?FUNCTION_NAME, Topic}).
-spec page_read(emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) ->
{ok, has_next(), list(message())}.
page_read(Topic, Page, Limit) -> page_read(Topic, Page, Limit) ->
call({?FUNCTION_NAME, Topic, Page, Limit}). call({?FUNCTION_NAME, Topic, Page, Limit}).
post_config_update(_, _UpdateReq, NewConf, OldConf, _AppEnvs) -> -spec enabled() -> boolean().
call({update_config, NewConf, OldConf}). enabled() ->
call(?FUNCTION_NAME).
call(Req) -> -spec context() -> ok.
gen_server:call(?MODULE, Req, infinity). context() ->
call(?FUNCTION_NAME).
%%------------------------------------------------------------------------------
%% Internal APIs
%%------------------------------------------------------------------------------
stats_fun() -> stats_fun() ->
gen_server:cast(?MODULE, ?FUNCTION_NAME). gen_server:cast(?MODULE, ?FUNCTION_NAME).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
-spec get_basic_usage_info() -> #{retained_messages => non_neg_integer()}. -spec get_basic_usage_info() -> #{retained_messages => non_neg_integer()}.
get_basic_usage_info() -> get_basic_usage_info() ->
try try
RetainedMessages = gen_server:call(?MODULE, retained_count), #{retained_messages => retained_count()}
#{retained_messages => RetainedMessages}
catch catch
_:_ -> _:_ ->
#{retained_messages => 0} #{retained_messages => 0}
@ -196,13 +231,14 @@ init([]) ->
erlang:process_flag(trap_exit, true), erlang:process_flag(trap_exit, true),
emqx_conf:add_handler([retainer], ?MODULE), emqx_conf:add_handler([retainer], ?MODULE),
State = new_state(), State = new_state(),
#{enable := Enable} = Cfg = emqx:get_config([retainer]), RetainerConfig = emqx:get_config([retainer]),
{ok, {ok,
case Enable of case maps:get(enable, RetainerConfig) of
false ->
State;
true -> true ->
enable_retainer(State, Cfg); BackendConfig = enabled_backend_config(RetainerConfig),
_ -> enable_retainer(State, RetainerConfig, BackendConfig)
State
end}. end}.
handle_call({update_config, NewConf, OldConf}, _, State) -> handle_call({update_config, NewConf, OldConf}, _, State) ->
@ -210,39 +246,34 @@ handle_call({update_config, NewConf, OldConf}, _, State) ->
emqx_retainer_dispatcher:refresh_limiter(NewConf), emqx_retainer_dispatcher:refresh_limiter(NewConf),
{reply, ok, State2}; {reply, ok, State2};
handle_call(clean, _, #{context := Context} = State) -> handle_call(clean, _, #{context := Context} = State) ->
clean(Context), _ = clean(Context),
{reply, ok, State}; {reply, ok, State};
handle_call({delete, Topic}, _, #{context := Context} = State) -> handle_call({delete, Topic}, _, #{context := Context} = State) ->
delete_message(Context, Topic), _ = delete_message(Context, Topic),
{reply, ok, State}; {reply, ok, State};
handle_call({read_message, Topic}, _, #{context := Context} = State) -> handle_call({read_message, Topic}, _, #{context := Context} = State) ->
Mod = get_backend_module(), {reply, read_message(Context, Topic), State};
Result = Mod:read_message(Context, Topic),
{reply, Result, State};
handle_call({page_read, Topic, Page, Limit}, _, #{context := Context} = State) -> handle_call({page_read, Topic, Page, Limit}, _, #{context := Context} = State) ->
Mod = get_backend_module(), {reply, page_read(Context, Topic, Page, Limit), State};
Result = Mod:page_read(Context, Topic, Page, Limit),
{reply, Result, State};
handle_call(retained_count, _From, State = #{context := Context}) -> handle_call(retained_count, _From, State = #{context := Context}) ->
Mod = get_backend_module(), {reply, count(Context), State};
RetainedCount = Mod:size(Context), handle_call(enabled, _From, State = #{enable := Enable}) ->
{reply, RetainedCount, State}; {reply, Enable, State};
handle_call(context, _From, State = #{context := Context}) ->
{reply, Context, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast(stats_fun, #{context := Context} = State) -> handle_cast(stats_fun, #{context := Context} = State) ->
Mod = get_backend_module(), emqx_stats:setstat('retained.count', 'retained.max', count(Context)),
Size = Mod:size(Context),
emqx_stats:setstat('retained.count', 'retained.max', Size),
{noreply, State}; {noreply, State};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
handle_info(clear_expired, #{context := Context} = State) -> handle_info(clear_expired, #{context := Context} = State) ->
Mod = get_backend_module(), ok = clear_expired(Context),
Mod:clear_expired(Context),
Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL), Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL),
{noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate}; {noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
@ -254,25 +285,24 @@ terminate(_Reason, #{clear_timer := ClearTimer}) ->
_ = stop_timer(ClearTimer), _ = stop_timer(ClearTimer),
ok. ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Internal functions %% Internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
call(Req) ->
gen_server:call(?MODULE, Req, infinity).
get_stop_publish_clear_msg() ->
emqx_conf:get([retainer, stop_publish_clear_msg], false).
-spec new_state() -> state(). -spec new_state() -> state().
new_state() -> new_state() ->
#{ #{
enable => false, enable => false,
context_id => 0,
context => undefined, context => undefined,
clear_timer => undefined clear_timer => undefined
}. }.
-spec new_context(pos_integer()) -> context().
new_context(Id) ->
#{context_id => Id}.
payload_size_limit() -> payload_size_limit() ->
emqx_conf:get(?MAX_PAYLOAD_SIZE_CONFIG_PATH, ?DEF_MAX_PAYLOAD_SIZE). emqx_conf:get(?MAX_PAYLOAD_SIZE_CONFIG_PATH, ?DEF_MAX_PAYLOAD_SIZE).
@ -282,8 +312,34 @@ dispatch(Context, Topic) ->
-spec delete_message(context(), topic()) -> ok. -spec delete_message(context(), topic()) -> ok.
delete_message(Context, Topic) -> delete_message(Context, Topic) ->
Mod = get_backend_module(), Mod = backend_module(Context),
Mod:delete_message(Context, Topic). BackendState = backend_state(Context),
Mod:delete_message(BackendState, Topic).
-spec read_message(context(), topic()) -> {ok, list(message())}.
read_message(Context, Topic) ->
Mod = backend_module(Context),
BackendState = backend_state(Context),
Mod:read_message(BackendState, Topic).
-spec page_read(context(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) ->
{ok, has_next(), list(message())}.
page_read(Context, Topic, Page, Limit) ->
Mod = backend_module(Context),
BackendState = backend_state(Context),
Mod:page_read(BackendState, Topic, Page, Limit).
-spec count(context()) -> non_neg_integer().
count(Context) ->
Mod = backend_module(Context),
BackendState = backend_state(Context),
Mod:size(BackendState).
-spec clear_expired(context()) -> ok.
clear_expired(Context) ->
Mod = backend_module(Context),
BackendState = backend_state(Context),
Mod:clear_expired(BackendState).
-spec store_retained(context(), message()) -> ok. -spec store_retained(context(), message()) -> ok.
store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) -> store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) ->
@ -298,54 +354,47 @@ store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) ->
limit => Limit limit => Limit
}); });
_ -> _ ->
Mod = get_backend_module(), Mod = backend_module(Context),
Mod:store_retained(Context, Msg) BackendState = backend_state(Context),
Mod:store_retained(BackendState, Msg)
end. end.
-spec clean(context()) -> ok. -spec clean(context()) -> ok.
clean(Context) -> clean(Context) ->
Mod = get_backend_module(), Mod = backend_module(Context),
Mod:clean(Context). BackendState = backend_state(Context),
Mod:clean(BackendState).
-spec update_config(state(), hocon:config(), hocon:config()) -> state(). -spec update_config(state(), hocon:config(), hocon:config()) -> state().
update_config(State, Conf, OldConf) -> update_config(State, NewConfig, OldConfig) ->
update_config( update_config(
maps:get(enable, Conf), maps:get(enable, NewConfig),
maps:get(enable, OldConf), maps:get(enable, OldConfig),
State, State,
Conf, NewConfig,
OldConf OldConfig
). ).
-spec update_config(boolean(), boolean(), state(), hocon:config(), hocon:config()) -> state(). -spec update_config(boolean(), boolean(), state(), hocon:config(), hocon:config()) -> state().
update_config(false, _, State, _, _) -> update_config(false, _, State, _, _) ->
disable_retainer(State); disable_retainer(State);
update_config(true, false, State, NewConf, _) -> update_config(true, false, State, NewConfig, _) ->
enable_retainer(State, NewConf); enable_retainer(State, NewConfig, enabled_backend_config(NewConfig));
update_config( update_config(
true, true,
true, true,
#{clear_timer := ClearTimer} = State, #{clear_timer := ClearTimer, context := Context} = State,
NewConf, NewConfig,
OldConf OldConfig
) -> ) ->
#{ #{msg_clear_interval := ClearInterval} = NewConfig,
backend := #{ OldBackendConfig = enabled_backend_config(OldConfig),
type := BackendType, NewBackendConfig = enabled_backend_config(NewConfig),
storage_type := StorageType OldMod = config_backend_module(OldBackendConfig),
}, NewMod = config_backend_module(NewBackendConfig),
msg_clear_interval := ClearInterval
} = NewConf,
#{ SameBackendType = NewMod =:= OldMod,
backend := #{ case SameBackendType andalso ok =:= OldMod:update(Context, NewBackendConfig) of
type := OldBackendType,
storage_type := OldStorageType
}
} = OldConf,
SameBackendType = BackendType =:= OldBackendType,
SameStorageType = StorageType =:= OldStorageType,
case SameBackendType andalso SameStorageType of
true -> true ->
State#{ State#{
clear_timer := check_timer( clear_timer := check_timer(
@ -356,23 +405,19 @@ update_config(
}; };
false -> false ->
State2 = disable_retainer(State), State2 = disable_retainer(State),
enable_retainer(State2, NewConf) enable_retainer(State2, NewConfig, NewBackendConfig)
end. end.
-spec enable_retainer(state(), hocon:config()) -> state(). -spec enable_retainer(state(), hocon:config(), hocon:config()) -> state().
enable_retainer( enable_retainer(
#{context_id := ContextId} = State, State,
#{ #{msg_clear_interval := ClearInterval} = _RetainerConfig,
msg_clear_interval := ClearInterval, BackendConfig
backend := BackendCfg
}
) -> ) ->
NewContextId = ContextId + 1, Context = create(BackendConfig),
Context = create_resource(new_context(NewContextId), BackendCfg), ok = load(Context),
load(Context),
State#{ State#{
enable := true, enable := true,
context_id := NewContextId,
context := Context, context := Context,
clear_timer := add_timer(ClearInterval, clear_expired) clear_timer := add_timer(ClearInterval, clear_expired)
}. }.
@ -384,8 +429,8 @@ disable_retainer(
context := Context context := Context
} = State } = State
) -> ) ->
unload(), ok = unload(),
ok = close_resource(Context), ok = close(Context),
State#{ State#{
enable := false, enable := false,
clear_timer := stop_timer(ClearTimer) clear_timer := stop_timer(ClearTimer)
@ -414,24 +459,44 @@ check_timer(Timer, undefined, _) ->
check_timer(Timer, _, _) -> check_timer(Timer, _, _) ->
Timer. Timer.
-spec get_backend_module() -> backend(). -spec enabled_backend_config(hocon:config()) -> hocon:config() | no_return().
get_backend_module() -> enabled_backend_config(#{backend := Backend, external_backends := ExternalBackends} = Config) ->
ModName = AllBackends = [Backend | maps:values(ExternalBackends)],
case emqx:get_config([retainer, backend]) of case lists:search(fun(#{enable := Enable}) -> Enable end, AllBackends) of
#{type := built_in_database} -> mnesia; {value, EnabledBackend} -> EnabledBackend;
#{type := Backend} -> Backend false -> error({no_enabled_backend, Config})
end, end.
erlang:list_to_existing_atom(io_lib:format("~ts_~ts", [?APP, ModName])).
create_resource(Context, #{type := built_in_database} = Cfg) -> -spec config_backend_module(hocon:config()) -> module().
emqx_retainer_mnesia:create_resource(Cfg), config_backend_module(Config) ->
Context. case Config of
#{type := built_in_database} -> emqx_retainer_mnesia;
#{module := Module} -> Module
end.
-spec close_resource(context()) -> ok | {error, term()}. -spec backend_module(context()) -> module().
close_resource(#{resource_id := ResourceId}) -> backend_module(#{module := Module}) -> Module.
emqx_resource:stop(ResourceId);
close_resource(_) -> -spec backend_state(context()) -> backend_state().
ok. backend_state(#{state := State}) -> State.
-spec backend_module() -> module() | no_return().
backend_module() ->
Config = enabled_backend_config(emqx:get_config([retainer])),
config_backend_module(Config).
-spec create(hocon:config()) -> context().
create(Cfg) ->
Mod = config_backend_module(Cfg),
#{
module => Mod,
state => Mod:create(Cfg)
}.
-spec close(context()) -> ok | {error, term()}.
close(Context) ->
Mod = backend_module(Context),
Mod:close(Context).
-spec load(context()) -> ok. -spec load(context()) -> ok.
load(Context) -> load(Context) ->

View File

@ -185,11 +185,11 @@ lookup_retained(get, #{query_string := Qs}) ->
Page = maps:get(<<"page">>, Qs, 1), Page = maps:get(<<"page">>, Qs, 1),
Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()), Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()),
Topic = maps:get(<<"topic">>, Qs, undefined), Topic = maps:get(<<"topic">>, Qs, undefined),
{ok, HasNext, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit), {ok, HasNext, Msgs} = emqx_retainer:page_read(Topic, Page, Limit),
Meta = Meta =
case Topic of case Topic of
undefined -> undefined ->
#{count => emqx_retainer_mnesia:size(?TAB_MESSAGE)}; #{count => emqx_retainer:retained_count()};
_ -> _ ->
#{} #{}
end, end,
@ -205,7 +205,7 @@ lookup_retained(get, #{query_string := Qs}) ->
with_topic(get, #{bindings := Bindings}) -> with_topic(get, #{bindings := Bindings}) ->
Topic = maps:get(topic, Bindings), Topic = maps:get(topic, Bindings),
{ok, _, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1), {ok, _, Msgs} = emqx_retainer:page_read(Topic, 1, 1),
case Msgs of case Msgs of
[H | _] -> [H | _] ->
{200, format_detail_message(H)}; {200, format_detail_message(H)};
@ -217,14 +217,14 @@ with_topic(get, #{bindings := Bindings}) ->
end; end;
with_topic(delete, #{bindings := Bindings}) -> with_topic(delete, #{bindings := Bindings}) ->
Topic = maps:get(topic, Bindings), Topic = maps:get(topic, Bindings),
case emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1) of case emqx_retainer:page_read(Topic, 1, 1) of
{ok, _, []} -> {ok, _, []} ->
{404, #{ {404, #{
code => <<"NOT_FOUND">>, code => <<"NOT_FOUND">>,
message => <<"Viewed message doesn't exist">> message => <<"Viewed message doesn't exist">>
}}; }};
{ok, _, _} -> {ok, _, _} ->
emqx_retainer_mnesia:delete_message(undefined, Topic), emqx_retainer:delete(Topic),
{204} {204}
end. end.
@ -265,8 +265,8 @@ to_bin_string(Data) ->
list_to_binary(io_lib:format("~p", [Data])). list_to_binary(io_lib:format("~p", [Data])).
check_backend(Type, Params, Cont) -> check_backend(Type, Params, Cont) ->
case emqx:get_config([retainer, backend, type]) of case emqx_retainer:backend_module() of
built_in_database -> emqx_retainer_mnesia ->
Cont(Type, Params); Cont(Type, Params);
_ -> _ ->
{400, 'BAD_REQUEST', <<"This API only support built in database">>} {400, 'BAD_REQUEST', <<"This API only support built in database">>}

View File

@ -26,12 +26,12 @@
]). ]).
start(_Type, _Args) -> start(_Type, _Args) ->
ok = emqx_retainer_mnesia_cli:load(), ok = emqx_retainer_cli:load(),
init_bucket(), init_bucket(),
emqx_retainer_sup:start_link(). emqx_retainer_sup:start_link().
stop(_State) -> stop(_State) ->
ok = emqx_retainer_mnesia_cli:unload(), ok = emqx_retainer_cli:unload(),
delete_bucket(), delete_bucket(),
ok. ok.

View File

@ -14,7 +14,7 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_retainer_mnesia_cli). -module(emqx_retainer_cli).
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
@ -32,36 +32,52 @@ load() ->
ok = emqx_ctl:register_command(retainer, {?MODULE, retainer}, []). ok = emqx_ctl:register_command(retainer, {?MODULE, retainer}, []).
retainer(["info"]) -> retainer(["info"]) ->
count(); if_enabled(fun() ->
count()
end);
retainer(["topics"]) -> retainer(["topics"]) ->
topic(1, 1000); if_enabled(fun() ->
topic(1, 1000)
end);
retainer(["topics", Start, Len]) -> retainer(["topics", Start, Len]) ->
topic(list_to_integer(Start), list_to_integer(Len)); if_enabled(fun() ->
topic(list_to_integer(Start), list_to_integer(Len))
end);
retainer(["clean", Topic]) -> retainer(["clean", Topic]) ->
emqx_retainer:delete(list_to_binary(Topic)); if_enabled(fun() ->
emqx_retainer:delete(list_to_binary(Topic))
end);
retainer(["clean"]) -> retainer(["clean"]) ->
emqx_retainer:clean(); if_enabled(fun() ->
emqx_retainer:clean()
end);
retainer(["reindex", "status"]) -> retainer(["reindex", "status"]) ->
case emqx_retainer_mnesia:reindex_status() of if_mnesia_enabled(fun() ->
true -> case emqx_retainer_mnesia:reindex_status() of
?PRINT_MSG("Reindexing is in progress~n"); true ->
false -> ?PRINT_MSG("Reindexing is in progress~n");
?PRINT_MSG("Reindexing is not running~n") false ->
end; ?PRINT_MSG("Reindexing is not running~n")
end
end);
retainer(["reindex", "start"]) -> retainer(["reindex", "start"]) ->
retainer(["reindex", "start", "false"]); if_mnesia_enabled(fun() ->
retainer(["reindex", "start", "false"])
end);
retainer(["reindex", "start", ForceParam]) -> retainer(["reindex", "start", ForceParam]) ->
case mria_rlog:role() of if_mnesia_enabled(fun() ->
core -> case mria_rlog:role() of
Force = core ->
case ForceParam of Force =
"true" -> true; case ForceParam of
_ -> false "true" -> true;
end, _ -> false
do_reindex(Force); end,
replicant -> do_reindex(Force);
?PRINT_MSG("Can't run reindex on a replicant node") replicant ->
end; ?PRINT_MSG("Can't run reindex on a replicant node")
end
end);
retainer(_) -> retainer(_) ->
emqx_ctl:usage( emqx_ctl:usage(
[ [
@ -71,10 +87,12 @@ retainer(_) ->
"Show topics of retained messages by the specified range"}, "Show topics of retained messages by the specified range"},
{"retainer clean", "Clean all retained messages"}, {"retainer clean", "Clean all retained messages"},
{"retainer clean <Topic>", "Clean retained messages by the specified topic filter"}, {"retainer clean <Topic>", "Clean retained messages by the specified topic filter"},
{"retainer reindex status", "Show reindex status"}, {"retainer reindex status",
"Show reindex status.\nOnly available for built-in backend."},
{"retainer reindex start [force]", {"retainer reindex start [force]",
"Generate new retainer topic indices from config settings.\n" "Generate new retainer topic indices from config settings.\n"
"Pass true as <Force> to ignore previously started reindexing"} "Pass true as <Force> to ignore previously started reindexing.\n"
"Only available for built-in backend."}
] ]
). ).
@ -107,6 +125,23 @@ count() ->
topic(Start, Len) -> topic(Start, Len) ->
count(), count(),
Topics = lists:sublist(emqx_retainer_mnesia:topics(), Start, Len), {ok, _HasNext, Messages} = emqx_retainer:page_read(<<"#">>, Start, Len),
[?PRINT("~ts~n", [I]) || I <- Topics], [?PRINT("~ts~n", [emqx_message:topic(M)]) || M <- Messages],
ok. ok.
if_enabled(Fun) ->
case emqx_retainer:enabled() of
true -> Fun();
false -> ?PRINT_MSG("Retainer is not enabled~n")
end.
if_mnesia_backend(Fun) ->
case emqx_retainer:backend_module() of
emqx_retainer_mnesia -> Fun();
_ -> ?PRINT_MSG("Command only applicable for builtin backend~n")
end.
if_mnesia_enabled(Fun) ->
if_enabled(fun() ->
if_mnesia_backend(Fun)
end).

View File

@ -44,6 +44,9 @@
]). ]).
-type limiter() :: emqx_htb_limiter:limiter(). -type limiter() :: emqx_htb_limiter:limiter().
-type context() :: emqx_retainer:context().
-type topic() :: emqx_types:topic().
-type cursor() :: emqx_retainer:cursor().
-define(POOL, ?MODULE). -define(POOL, ?MODULE).
@ -233,7 +236,7 @@ cast(Msg) ->
-spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}. -spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}.
dispatch(Context, Pid, Topic, Cursor, Limiter) -> dispatch(Context, Pid, Topic, Cursor, Limiter) ->
Mod = emqx_retainer:get_backend_module(), Mod = emqx_retainer:backend_module(Context),
case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
false -> false ->
{ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]), {ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]),

View File

@ -26,6 +26,9 @@
%% emqx_retainer callbacks %% emqx_retainer callbacks
-export([ -export([
create/1,
update/2,
close/1,
delete_message/2, delete_message/2,
store_retained/2, store_retained/2,
read_message/2, read_message/2,
@ -46,14 +49,10 @@
%% Management API: %% Management API:
-export([topics/0]). -export([topics/0]).
-export([create_resource/1]).
-export([reindex/2, reindex_status/0]). -export([reindex/2, reindex_status/0]).
-ifdef(TEST).
-export([populate_index_meta/0]). -export([populate_index_meta/0]).
-export([reindex/3]). -export([reindex/3]).
-endif.
-record(retained_message, {topic, msg, expiry_time}). -record(retained_message, {topic, msg, expiry_time}).
-record(retained_index, {key, expiry_time}). -record(retained_index, {key, expiry_time}).
@ -78,7 +77,7 @@ topics() ->
%% emqx_retainer callbacks %% emqx_retainer callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
create_resource(#{storage_type := StorageType}) -> create(#{storage_type := StorageType, max_retained_messages := MaxRetainedMessages} = Config) ->
ok = create_table( ok = create_table(
?TAB_INDEX_META, ?TAB_INDEX_META,
retained_index_meta, retained_index_meta,
@ -86,7 +85,7 @@ create_resource(#{storage_type := StorageType}) ->
set, set,
StorageType StorageType
), ),
ok = populate_index_meta(), ok = populate_index_meta(Config),
ok = create_table( ok = create_table(
?TAB_MESSAGE, ?TAB_MESSAGE,
retained_message, retained_message,
@ -100,7 +99,8 @@ create_resource(#{storage_type := StorageType}) ->
record_info(fields, retained_index), record_info(fields, retained_index),
ordered_set, ordered_set,
StorageType StorageType
). ),
#{storage_type => StorageType, max_retained_messages => MaxRetainedMessages}.
create_table(Table, RecordName, Attributes, Type, StorageType) -> create_table(Table, RecordName, Attributes, Type, StorageType) ->
Copies = Copies =
@ -136,10 +136,15 @@ create_table(Table, RecordName, Attributes, Type, StorageType) ->
ok ok
end. end.
store_retained(_, Msg = #message{topic = Topic}) -> update(_State, _NewConfig) ->
need_recreate.
close(_State) -> ok.
store_retained(State, Msg = #message{topic = Topic}) ->
ExpiryTime = emqx_retainer:get_expiry_time(Msg), ExpiryTime = emqx_retainer:get_expiry_time(Msg),
Tokens = topic_to_tokens(Topic), Tokens = topic_to_tokens(Topic),
case is_table_full() andalso is_new_topic(Tokens) of case is_table_full(State) andalso is_new_topic(Tokens) of
true -> true ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "failed_to_retain_message", msg => "failed_to_retain_message",
@ -172,7 +177,7 @@ clear_expired() ->
QC = qlc:cursor(QH), QC = qlc:cursor(QH),
clear_batch(dirty_indices(write), QC). clear_batch(dirty_indices(write), QC).
delete_message(_, Topic) -> delete_message(_State, Topic) ->
Tokens = topic_to_tokens(Topic), Tokens = topic_to_tokens(Topic),
case emqx_topic:wildcard(Topic) of case emqx_topic:wildcard(Topic) of
false -> false ->
@ -188,10 +193,10 @@ delete_message(_, Topic) ->
) )
end. end.
read_message(_, Topic) -> read_message(_State, Topic) ->
{ok, read_messages(Topic)}. {ok, read_messages(Topic)}.
match_messages(_, Topic, undefined) -> match_messages(_State, Topic, undefined) ->
Tokens = topic_to_tokens(Topic), Tokens = topic_to_tokens(Topic),
Now = erlang:system_time(millisecond), Now = erlang:system_time(millisecond),
QH = msg_table(search_table(Tokens, Now)), QH = msg_table(search_table(Tokens, Now)),
@ -202,7 +207,7 @@ match_messages(_, Topic, undefined) ->
Cursor = qlc:cursor(QH), Cursor = qlc:cursor(QH),
match_messages(undefined, Topic, {Cursor, BatchNum}) match_messages(undefined, Topic, {Cursor, BatchNum})
end; end;
match_messages(_, _Topic, {Cursor, BatchNum}) -> match_messages(_State, _Topic, {Cursor, BatchNum}) ->
case qlc_next_answers(Cursor, BatchNum) of case qlc_next_answers(Cursor, BatchNum) of
{closed, Rows} -> {closed, Rows} ->
{ok, Rows, undefined}; {ok, Rows, undefined};
@ -210,7 +215,7 @@ match_messages(_, _Topic, {Cursor, BatchNum}) ->
{ok, Rows, {Cursor, BatchNum}} {ok, Rows, {Cursor, BatchNum}}
end. end.
page_read(_, Topic, Page, Limit) -> page_read(_State, Topic, Page, Limit) ->
Now = erlang:system_time(millisecond), Now = erlang:system_time(millisecond),
QH = QH =
case Topic of case Topic of
@ -253,7 +258,8 @@ size(_) ->
table_size(). table_size().
reindex(Force, StatusFun) -> reindex(Force, StatusFun) ->
reindex(config_indices(), Force, StatusFun). Config = emqx:get_config([retainer, backend]),
reindex(config_indices(Config), Force, StatusFun).
reindex_status() -> reindex_status() ->
case mnesia:dirty_read(?TAB_INDEX_META, ?META_KEY) of case mnesia:dirty_read(?TAB_INDEX_META, ?META_KEY) of
@ -428,9 +434,8 @@ make_index_match_spec(Index, Tokens, NowMs) ->
MsHd = #retained_index{key = Cond, expiry_time = '$3'}, MsHd = #retained_index{key = Cond, expiry_time = '$3'},
{[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}], IsExact}. {[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}], IsExact}.
is_table_full() -> is_table_full(#{max_retained_messages := MaxRetainedMessages} = _State) ->
Limit = emqx:get_config([retainer, backend, max_retained_messages]), MaxRetainedMessages > 0 andalso (table_size() >= MaxRetainedMessages).
Limit > 0 andalso (table_size() >= Limit).
is_new_topic(Tokens) -> is_new_topic(Tokens) ->
case mnesia:dirty_read(?TAB_MESSAGE, Tokens) of case mnesia:dirty_read(?TAB_MESSAGE, Tokens) of
@ -443,11 +448,15 @@ is_new_topic(Tokens) ->
table_size() -> table_size() ->
mnesia:table_info(?TAB_MESSAGE, size). mnesia:table_info(?TAB_MESSAGE, size).
config_indices() -> config_indices(#{index_specs := IndexSpecs}) ->
lists:sort(emqx_config:get([retainer, backend, index_specs])). IndexSpecs.
populate_index_meta() -> populate_index_meta() ->
ConfigIndices = config_indices(), Config = emqx:get_config([retainer, backend]),
populate_index_meta(Config).
populate_index_meta(Config) ->
ConfigIndices = config_indices(Config),
case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_populate_index_meta/1, [ConfigIndices]) of case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_populate_index_meta/1, [ConfigIndices]) of
{atomic, ok} -> {atomic, ok} ->
ok; ok;

View File

@ -36,7 +36,8 @@ roots() ->
[ [
{"retainer", {"retainer",
hoconsc:mk(hoconsc:ref(?MODULE, "retainer"), #{ hoconsc:mk(hoconsc:ref(?MODULE, "retainer"), #{
converter => fun retainer_converter/2 converter => fun retainer_converter/2,
validator => fun validate_backends_enabled/1
})} })}
]. ].
@ -86,11 +87,29 @@ fields("retainer") ->
aliases => [deliver_rate] aliases => [deliver_rate]
} }
)}, )},
{backend, backend_config()} {backend, backend_config()},
{external_backends,
?HOCON(
hoconsc:ref(?MODULE, external_backends),
#{
desc => ?DESC(backends),
required => false,
default => #{},
importance => ?IMPORTANCE_HIDDEN
}
)}
]; ];
fields(mnesia_config) -> fields(mnesia_config) ->
[ [
{type, sc(built_in_database, mnesia_config_type, built_in_database)}, {type,
?HOCON(
built_in_database,
#{
desc => ?DESC(mnesia_config_type),
required => false,
default => built_in_database
}
)},
{storage_type, {storage_type,
sc( sc(
hoconsc:enum([ram, disc]), hoconsc:enum([ram, disc]),
@ -103,7 +122,13 @@ fields(mnesia_config) ->
max_retained_messages, max_retained_messages,
0 0
)}, )},
{index_specs, fun retainer_indices/1} {index_specs, fun retainer_indices/1},
{enable,
?HOCON(boolean(), #{
desc => ?DESC(mnesia_enable),
required => false,
default => true
})}
]; ];
fields(flow_control) -> fields(flow_control) ->
[ [
@ -125,7 +150,9 @@ fields(flow_control) ->
batch_deliver_limiter, batch_deliver_limiter,
undefined undefined
)} )}
]. ];
fields(external_backends) ->
emqx_schema_hooks:injection_point('retainer.external_backends').
desc("retainer") -> desc("retainer") ->
"Configuration related to handling `PUBLISH` packets with a `retain` flag set to 1."; "Configuration related to handling `PUBLISH` packets with a `retain` flag set to 1.";
@ -140,9 +167,6 @@ desc(_) ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%%sc(Type, DescId) ->
%% hoconsc:mk(Type, #{desc => ?DESC(DescId)}).
sc(Type, DescId, Default) -> sc(Type, DescId, Default) ->
sc(Type, DescId, Default, ?DEFAULT_IMPORTANCE). sc(Type, DescId, Default, ?DEFAULT_IMPORTANCE).
sc(Type, DescId, Default, Importance) -> sc(Type, DescId, Default, Importance) ->
@ -214,3 +238,16 @@ retainer_converter(#{<<"deliver_rate">> := Delivery} = Conf, Opts) ->
retainer_converter(Conf1#{<<"delivery_rate">> => Delivery}, Opts); retainer_converter(Conf1#{<<"delivery_rate">> => Delivery}, Opts);
retainer_converter(Conf, _Opts) -> retainer_converter(Conf, _Opts) ->
Conf. Conf.
validate_backends_enabled(Config) ->
BuiltInBackend = maps:get(<<"backend">>, Config, #{}),
ExternalBackends = maps:values(maps:get(<<"external_backends">>, Config, #{})),
Enabled = lists:filter(fun(#{<<"enable">> := E}) -> E end, [BuiltInBackend | ExternalBackends]),
case Enabled of
[#{}] ->
ok;
_Conflicts = [_ | _] ->
{error, multiple_enabled_backends};
_None = [] ->
{error, no_enabled_backend}
end.

View File

@ -19,8 +19,6 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-include("emqx_retainer.hrl"). -include("emqx_retainer.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -646,7 +644,7 @@ t_reindex(_) ->
t_get_basic_usage_info(_Config) -> t_get_basic_usage_info(_Config) ->
?assertEqual(#{retained_messages => 0}, emqx_retainer:get_basic_usage_info()), ?assertEqual(#{retained_messages => 0}, emqx_retainer:get_basic_usage_info()),
Context = undefined, Context = emqx_retainer:context(),
lists:foreach( lists:foreach(
fun(N) -> fun(N) ->
Num = integer_to_binary(N), Num = integer_to_binary(N),
@ -789,6 +787,11 @@ t_compatibility_for_deliver_rate(_) ->
Parser(DeliveryInf) Parser(DeliveryInf)
). ).
t_update_config(_) ->
OldConf = emqx_config:get_raw([retainer]),
NewConf = emqx_utils_maps:deep_put([<<"backend">>, <<"storage_type">>], OldConf, <<"disk">>),
emqx_retainer:update_config(NewConf).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -833,7 +836,7 @@ with_conf(ConfMod, Case) ->
emqx_retainer:update_config(NewConf), emqx_retainer:update_config(NewConf),
try try
Case(), Case(),
emqx_retainer:update_config(Conf) {ok, _} = emqx_retainer:update_config(Conf)
catch catch
Type:Error:Strace -> Type:Error:Strace ->
emqx_retainer:update_config(Conf), emqx_retainer:update_config(Conf),

View File

@ -0,0 +1,104 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_retainer_backend_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() -> emqx_common_test_helpers:all(?MODULE).
-define(BASE_CONF, #{
<<"retainer">> =>
#{
<<"enable">> => true,
<<"backend">> =>
#{
<<"type">> => <<"built_in_database">>,
<<"storage_type">> => <<"ram">>,
<<"max_retained_messages">> => 0
}
}
}).
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
{emqx, #{
before_start => fun() ->
ok = emqx_schema_hooks:inject_from_modules([emqx_retainer_dummy])
end
}},
emqx_conf,
{emqx_retainer, ?BASE_CONF}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{suite_apps, Apps} | Config].
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)).
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
t_external_backend(_Config) ->
{ok, _} = emqx_retainer:update_config(#{
<<"enable">> => true,
<<"backend">> => #{
<<"enable">> => false
},
<<"external_backends">> =>
#{
<<"dummy">> => #{<<"enable">> => true}
}
}),
?assertMatch(
ok,
emqx_retainer:clean()
),
?assertMatch(
ok,
emqx_retainer:delete(<<"topic">>)
),
?assertMatch(
{ok, []},
emqx_retainer:read_message(<<"topic">>)
),
?assertMatch(
{ok, false, []},
emqx_retainer:page_read(<<"topic">>, 0, 10)
),
?assertEqual(
0,
emqx_retainer:retained_count()
),
?assertEqual(
emqx_retainer_dummy,
emqx_retainer:backend_module()
),
?assertEqual(
true,
emqx_retainer:enabled()
).

View File

@ -38,22 +38,22 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)). emqx_cth_suite:stop(?config(suite_apps, Config)).
t_reindex_status(_Config) -> t_reindex_status(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["reindex", "status"]). ok = emqx_retainer_cli:retainer(["reindex", "status"]).
t_info(_Config) -> t_info(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["info"]). ok = emqx_retainer_cli:retainer(["info"]).
t_topics(_Config) -> t_topics(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["topics"]). ok = emqx_retainer_cli:retainer(["topics"]).
t_topics_with_len(_Config) -> t_topics_with_len(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["topics", "100", "200"]). ok = emqx_retainer_cli:retainer(["topics", "100", "200"]).
t_clean(_Config) -> t_clean(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["clean"]). ok = emqx_retainer_cli:retainer(["clean"]).
t_topic(_Config) -> t_topic(_Config) ->
ok = emqx_retainer_mnesia_cli:retainer(["clean", "foo/bar"]). ok = emqx_retainer_cli:retainer(["clean", "foo/bar"]).
t_reindex(_Config) -> t_reindex(_Config) ->
{ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
@ -84,6 +84,6 @@ t_reindex(_Config) ->
), ),
emqx_config:put([retainer, backend, index_specs], [[4, 5]]), emqx_config:put([retainer, backend, index_specs], [[4, 5]]),
ok = emqx_retainer_mnesia_cli:retainer(["reindex", "start"]), ok = emqx_retainer_cli:retainer(["reindex", "start"]),
?assertEqual(1000, mnesia:table_info(?TAB_INDEX, size)). ?assertEqual(1000, mnesia:table_info(?TAB_INDEX, size)).

View File

@ -0,0 +1,98 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_retainer_dummy).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-behaviour(emqx_retainer).
-export([
create/1,
update/2,
close/1,
delete_message/2,
store_retained/2,
read_message/2,
page_read/4,
match_messages/3,
clear_expired/1,
clean/1,
size/1
]).
-behaviour(emqx_schema_hooks).
-export([
fields/1,
injected_fields/0
]).
injected_fields() ->
#{
'retainer.external_backends' => external_backend_fields()
}.
create(_Config) -> #{}.
update(_Context, _Config) -> ok.
close(_Context) -> ok.
delete_message(_Context, _Topic) -> ok.
store_retained(_Context, _Message) -> ok.
read_message(_Context, _Topic) -> {ok, []}.
page_read(_Context, _Topic, _Offset, _Limit) -> {ok, false, []}.
match_messages(_Context, _Topic, _Cursor) -> {ok, [], 0}.
clear_expired(_Context) -> ok.
clean(_Context) -> ok.
size(_Context) -> 0.
external_backend_fields() ->
[
{dummy, hoconsc:ref(?MODULE, dummy)}
].
fields(dummy) ->
[
{module,
hoconsc:mk(
emqx_retainer_dummy,
#{
desc => <<"dummy backend mod">>,
required => false,
default => <<"emqx_retainer_dummy">>,
importance => ?IMPORTANCE_HIDDEN
}
)},
{enable,
hoconsc:mk(
boolean(),
#{
desc => <<"enable dummy backend">>,
required => true,
default => false
}
)}
].

View File

@ -621,7 +621,7 @@ mock_httpc() ->
). ).
mock_advanced_mqtt_features() -> mock_advanced_mqtt_features() ->
Context = undefined, Context = emqx_retainer:context(),
lists:foreach( lists:foreach(
fun(N) -> fun(N) ->
Num = integer_to_binary(N), Num = integer_to_binary(N),

View File

@ -33,6 +33,9 @@ mnesia_config_storage_type.desc:
mnesia_config_type.desc: mnesia_config_type.desc:
"""Backend type.""" """Backend type."""
mnesia_enable.desc:
"""Enable built-in Mnesia backend."""
msg_clear_interval.desc: msg_clear_interval.desc:
"""Interval for EMQX to scan expired messages and delete them. Never scan if the value is 0.""" """Interval for EMQX to scan expired messages and delete them. Never scan if the value is 0."""