From dfdf7455d3a7b7faad071fad47684753cf9e36d6 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Sat, 24 Feb 2024 02:58:48 +0300 Subject: [PATCH] feat(retainer): make additional implementations pluggable Co-authored-by: Thales Macedo Garitezi --- apps/emqx_retainer/src/emqx_retainer.erl | 265 +++++++++++------- apps/emqx_retainer/src/emqx_retainer_api.erl | 14 +- apps/emqx_retainer/src/emqx_retainer_app.erl | 4 +- ...r_mnesia_cli.erl => emqx_retainer_cli.erl} | 91 ++++-- .../src/emqx_retainer_dispatcher.erl | 2 +- .../src/emqx_retainer_mnesia.erl | 47 ++-- .../src/emqx_retainer_schema.erl | 53 +++- .../test/emqx_retainer_SUITE.erl | 8 +- .../test/emqx_retainer_backend_SUITE.erl | 104 +++++++ .../test/emqx_retainer_cli_SUITE.erl | 14 +- .../test/emqx_retainer_dummy.erl | 98 +++++++ rel/i18n/emqx_retainer_schema.hocon | 3 + 12 files changed, 523 insertions(+), 180 deletions(-) rename apps/emqx_retainer/src/{emqx_retainer_mnesia_cli.erl => emqx_retainer_cli.erl} (60%) create mode 100644 apps/emqx_retainer/test/emqx_retainer_backend_SUITE.erl create mode 100644 apps/emqx_retainer/test/emqx_retainer_dummy.erl diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 92846f5a4..b375f30ad 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -32,8 +32,7 @@ -export([ delete_message/2, - store_retained/2, - get_backend_module/0 + store_retained/2 ]). -export([ @@ -45,7 +44,15 @@ page_read/3, post_config_update/5, stats_fun/0, - retained_count/0 + retained_count/0, + backend_module/0, + backend_module/1, + enabled/0 +]). + +%% For testing only +-export([ + context/0 ]). %% gen_server callbacks @@ -54,8 +61,7 @@ handle_call/3, handle_cast/2, handle_info/2, - terminate/2, - code_change/3 + terminate/2 ]). -export_type([ @@ -71,29 +77,35 @@ context := undefined | context(), clear_timer := undefined | reference() }. --type context() :: term(). + +-type backend_state() :: term(). + +-type context() :: #{ + module := module(), + state := backend_state() +}. -type topic() :: emqx_types:topic(). -type message() :: emqx_types:message(). -type cursor() :: undefined | term(). --type backend() :: emqx_retainer_mnesia | module(). +-type has_next() :: boolean(). -define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)). -define(DEF_EXPIRY_INTERVAL, 0). -define(MAX_PAYLOAD_SIZE_CONFIG_PATH, [retainer, max_payload_size]). --callback create(map()) -> context(). --callback update(context(), map()) -> ok | need_recreate. --callback close(context()) -> ok. --callback delete_message(context(), topic()) -> ok. --callback store_retained(context(), message()) -> ok. --callback read_message(context(), topic()) -> {ok, list()}. --callback page_read(context(), topic(), non_neg_integer(), non_neg_integer()) -> - {ok, HasNext :: boolean(), list()}. --callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}. --callback clear_expired(context()) -> ok. --callback clean(context()) -> ok. --callback size(context()) -> non_neg_integer(). +-callback create(hocon:config()) -> backend_state(). +-callback update(backend_state(), hocon:config()) -> ok | need_recreate. +-callback close(backend_state()) -> ok. +-callback delete_message(backend_state(), topic()) -> ok. +-callback store_retained(backend_state(), message()) -> ok. +-callback read_message(backend_state(), topic()) -> {ok, list(message())}. +-callback page_read(backend_state(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) -> + {ok, has_next(), list(message())}. +-callback match_messages(backend_state(), topic(), cursor()) -> {ok, list(message()), cursor()}. +-callback clear_expired(backend_state()) -> ok. +-callback clean(backend_state()) -> ok. +-callback size(backend_state()) -> non_neg_integer(). %%------------------------------------------------------------------------------ %% Hook API @@ -131,6 +143,13 @@ on_message_publish(Msg = #message{flags = #{retain := true}}, Context) -> on_message_publish(Msg, _) -> {ok, Msg}. +%%------------------------------------------------------------------------------ +%% Config API +%%------------------------------------------------------------------------------ + +post_config_update(_, _UpdateReq, NewConf, OldConf, _AppEnvs) -> + call({update_config, NewConf, OldConf}). + %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ @@ -140,6 +159,7 @@ on_message_publish(Msg, _) -> start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec get_expiry_time(message()) -> non_neg_integer(). get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) -> 0; get_expiry_time(#message{ @@ -154,41 +174,46 @@ get_expiry_time(#message{timestamp = Ts}) -> _ -> Ts + Interval end. -get_stop_publish_clear_msg() -> - emqx_conf:get([retainer, stop_publish_clear_msg], false). - -spec update_config(hocon:config()) -> {ok, _} | {error, _}. update_config(Conf) -> emqx_conf:update([retainer], Conf, #{override_to => cluster}). +-spec clean() -> ok. clean() -> call(?FUNCTION_NAME). +-spec delete(topic()) -> ok. delete(Topic) -> call({?FUNCTION_NAME, Topic}). +-spec retained_count() -> non_neg_integer(). retained_count() -> call(?FUNCTION_NAME). +-spec read_message(topic()) -> {ok, list(message())}. read_message(Topic) -> call({?FUNCTION_NAME, Topic}). +-spec page_read(emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) -> + {ok, has_next(), list(message())}. page_read(Topic, Page, Limit) -> call({?FUNCTION_NAME, Topic, Page, Limit}). -post_config_update(_, _UpdateReq, NewConf, OldConf, _AppEnvs) -> - call({update_config, NewConf, OldConf}). +-spec enabled() -> boolean(). +enabled() -> + call(?FUNCTION_NAME). -call(Req) -> - gen_server:call(?MODULE, Req, infinity). +-spec context() -> ok. +context() -> + call(?FUNCTION_NAME). + +%%------------------------------------------------------------------------------ +%% Internal APIs +%%------------------------------------------------------------------------------ stats_fun() -> gen_server:cast(?MODULE, ?FUNCTION_NAME). -%%------------------------------------------------------------------------------ -%% APIs -%%------------------------------------------------------------------------------ - -spec get_basic_usage_info() -> #{retained_messages => non_neg_integer()}. get_basic_usage_info() -> try @@ -206,13 +231,14 @@ init([]) -> erlang:process_flag(trap_exit, true), emqx_conf:add_handler([retainer], ?MODULE), State = new_state(), - #{enable := Enable} = Cfg = emqx:get_config([retainer]), + RetainerConfig = emqx:get_config([retainer]), {ok, - case Enable of + case maps:get(enable, RetainerConfig) of + false -> + State; true -> - enable_retainer(State, Cfg); - _ -> - State + BackendConfig = enabled_backend_config(RetainerConfig), + enable_retainer(State, RetainerConfig, BackendConfig) end}. handle_call({update_config, NewConf, OldConf}, _, State) -> @@ -220,39 +246,34 @@ handle_call({update_config, NewConf, OldConf}, _, State) -> emqx_retainer_dispatcher:refresh_limiter(NewConf), {reply, ok, State2}; handle_call(clean, _, #{context := Context} = State) -> - clean(Context), + _ = clean(Context), {reply, ok, State}; handle_call({delete, Topic}, _, #{context := Context} = State) -> - delete_message(Context, Topic), + _ = delete_message(Context, Topic), {reply, ok, State}; handle_call({read_message, Topic}, _, #{context := Context} = State) -> - Mod = get_backend_module(), - Result = Mod:read_message(Context, Topic), - {reply, Result, State}; + {reply, read_message(Context, Topic), State}; handle_call({page_read, Topic, Page, Limit}, _, #{context := Context} = State) -> - Mod = get_backend_module(), - Result = Mod:page_read(Context, Topic, Page, Limit), - {reply, Result, State}; + {reply, page_read(Context, Topic, Page, Limit), State}; handle_call(retained_count, _From, State = #{context := Context}) -> - Mod = get_backend_module(), - RetainedCount = Mod:size(Context), - {reply, RetainedCount, State}; + {reply, count(Context), State}; +handle_call(enabled, _From, State = #{enable := Enable}) -> + {reply, Enable, State}; +handle_call(context, _From, State = #{context := Context}) -> + {reply, Context, State}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. handle_cast(stats_fun, #{context := Context} = State) -> - Mod = get_backend_module(), - Size = Mod:size(Context), - emqx_stats:setstat('retained.count', 'retained.max', Size), + emqx_stats:setstat('retained.count', 'retained.max', count(Context)), {noreply, State}; handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. handle_info(clear_expired, #{context := Context} = State) -> - Mod = get_backend_module(), - Mod:clear_expired(Context), + ok = clear_expired(Context), Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL), {noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate}; handle_info(Info, State) -> @@ -264,12 +285,16 @@ terminate(_Reason, #{clear_timer := ClearTimer}) -> _ = stop_timer(ClearTimer), ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ + +call(Req) -> + gen_server:call(?MODULE, Req, infinity). + +get_stop_publish_clear_msg() -> + emqx_conf:get([retainer, stop_publish_clear_msg], false). + -spec new_state() -> state(). new_state() -> #{ @@ -287,8 +312,34 @@ dispatch(Context, Topic) -> -spec delete_message(context(), topic()) -> ok. delete_message(Context, Topic) -> - Mod = get_backend_module(), - Mod:delete_message(Context, Topic). + Mod = backend_module(Context), + BackendState = backend_state(Context), + Mod:delete_message(BackendState, Topic). + +-spec read_message(context(), topic()) -> {ok, list(message())}. +read_message(Context, Topic) -> + Mod = backend_module(Context), + BackendState = backend_state(Context), + Mod:read_message(BackendState, Topic). + +-spec page_read(context(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) -> + {ok, has_next(), list(message())}. +page_read(Context, Topic, Page, Limit) -> + Mod = backend_module(Context), + BackendState = backend_state(Context), + Mod:page_read(BackendState, Topic, Page, Limit). + +-spec count(context()) -> non_neg_integer(). +count(Context) -> + Mod = backend_module(Context), + BackendState = backend_state(Context), + Mod:size(BackendState). + +-spec clear_expired(context()) -> ok. +clear_expired(Context) -> + Mod = backend_module(Context), + BackendState = backend_state(Context), + Mod:clear_expired(BackendState). -spec store_retained(context(), message()) -> ok. store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) -> @@ -303,52 +354,47 @@ store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) -> limit => Limit }); _ -> - Mod = get_backend_module(), - Mod:store_retained(Context, Msg) + Mod = backend_module(Context), + BackendState = backend_state(Context), + Mod:store_retained(BackendState, Msg) end. -spec clean(context()) -> ok. clean(Context) -> - Mod = get_backend_module(), - Mod:clean(Context). + Mod = backend_module(Context), + BackendState = backend_state(Context), + Mod:clean(BackendState). -spec update_config(state(), hocon:config(), hocon:config()) -> state(). -update_config(State, Conf, OldConf) -> +update_config(State, NewConfig, OldConfig) -> update_config( - maps:get(enable, Conf), - maps:get(enable, OldConf), + maps:get(enable, NewConfig), + maps:get(enable, OldConfig), State, - Conf, - OldConf + NewConfig, + OldConfig ). -spec update_config(boolean(), boolean(), state(), hocon:config(), hocon:config()) -> state(). update_config(false, _, State, _, _) -> disable_retainer(State); -update_config(true, false, State, NewConf, _) -> - enable_retainer(State, NewConf); +update_config(true, false, State, NewConfig, _) -> + enable_retainer(State, NewConfig, enabled_backend_config(NewConfig)); update_config( true, true, #{clear_timer := ClearTimer, context := Context} = State, - NewConf, - OldConf + NewConfig, + OldConfig ) -> - #{ - backend := #{ - type := BackendType - } = NewBackendConfig, - msg_clear_interval := ClearInterval - } = NewConf, + #{msg_clear_interval := ClearInterval} = NewConfig, + OldBackendConfig = enabled_backend_config(OldConfig), + NewBackendConfig = enabled_backend_config(NewConfig), + OldMod = config_backend_module(OldBackendConfig), + NewMod = config_backend_module(NewBackendConfig), - #{ - backend := #{ - type := OldBackendType - } - } = OldConf, - SameBackendType = BackendType =:= OldBackendType, - Mod = get_backend_module(), - case SameBackendType andalso ok =:= Mod:update(Context, NewBackendConfig) of + SameBackendType = NewMod =:= OldMod, + case SameBackendType andalso ok =:= OldMod:update(Context, NewBackendConfig) of true -> State#{ clear_timer := check_timer( @@ -359,18 +405,16 @@ update_config( }; false -> State2 = disable_retainer(State), - enable_retainer(State2, NewConf) + enable_retainer(State2, NewConfig, NewBackendConfig) end. --spec enable_retainer(state(), hocon:config()) -> state(). +-spec enable_retainer(state(), hocon:config(), hocon:config()) -> state(). enable_retainer( State, - #{ - msg_clear_interval := ClearInterval, - backend := BackendCfg - } + #{msg_clear_interval := ClearInterval} = _RetainerConfig, + BackendConfig ) -> - Context = create(BackendCfg), + Context = create(BackendConfig), ok = load(Context), State#{ enable := true, @@ -415,20 +459,43 @@ check_timer(Timer, undefined, _) -> check_timer(Timer, _, _) -> Timer. --spec get_backend_module() -> backend(). -get_backend_module() -> - case emqx:get_config([retainer, backend]) of - #{type := built_in_database} -> emqx_retainer_mnesia; - #{type := Backend} -> Backend +-spec enabled_backend_config(hocon:config()) -> hocon:config() | no_return(). +enabled_backend_config(#{backend := Backend, external_backends := ExternalBackends} = Config) -> + AllBackends = [Backend | maps:values(ExternalBackends)], + case lists:search(fun(#{enable := Enable}) -> Enable end, AllBackends) of + {value, EnabledBackend} -> EnabledBackend; + false -> error({no_enabled_backend, Config}) end. -create(#{type := built_in_database} = Cfg) -> - Mod = get_backend_module(), - Mod:create(Cfg). +-spec config_backend_module(hocon:config()) -> module(). +config_backend_module(Config) -> + case Config of + #{type := built_in_database} -> emqx_retainer_mnesia; + #{module := Module} -> Module + end. + +-spec backend_module(context()) -> module(). +backend_module(#{module := Module}) -> Module. + +-spec backend_state(context()) -> backend_state(). +backend_state(#{state := State}) -> State. + +-spec backend_module() -> module() | no_return(). +backend_module() -> + Config = enabled_backend_config(emqx:get_config([retainer])), + config_backend_module(Config). + +-spec create(hocon:config()) -> context(). +create(Cfg) -> + Mod = config_backend_module(Cfg), + #{ + module => Mod, + state => Mod:create(Cfg) + }. -spec close(context()) -> ok | {error, term()}. close(Context) -> - Mod = get_backend_module(), + Mod = backend_module(Context), Mod:close(Context). -spec load(context()) -> ok. diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index 6d5eee477..dad4c73d8 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -185,11 +185,11 @@ lookup_retained(get, #{query_string := Qs}) -> Page = maps:get(<<"page">>, Qs, 1), Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()), Topic = maps:get(<<"topic">>, Qs, undefined), - {ok, HasNext, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit), + {ok, HasNext, Msgs} = emqx_retainer:page_read(Topic, Page, Limit), Meta = case Topic of undefined -> - #{count => emqx_retainer_mnesia:size(?TAB_MESSAGE)}; + #{count => emqx_retainer:retained_count()}; _ -> #{} end, @@ -205,7 +205,7 @@ lookup_retained(get, #{query_string := Qs}) -> with_topic(get, #{bindings := Bindings}) -> Topic = maps:get(topic, Bindings), - {ok, _, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1), + {ok, _, Msgs} = emqx_retainer:page_read(Topic, 1, 1), case Msgs of [H | _] -> {200, format_detail_message(H)}; @@ -217,14 +217,14 @@ with_topic(get, #{bindings := Bindings}) -> end; with_topic(delete, #{bindings := Bindings}) -> Topic = maps:get(topic, Bindings), - case emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1) of + case emqx_retainer:page_read(Topic, 1, 1) of {ok, _, []} -> {404, #{ code => <<"NOT_FOUND">>, message => <<"Viewed message doesn't exist">> }}; {ok, _, _} -> - emqx_retainer_mnesia:delete_message(undefined, Topic), + emqx_retainer:delete(Topic), {204} end. @@ -265,8 +265,8 @@ to_bin_string(Data) -> list_to_binary(io_lib:format("~p", [Data])). check_backend(Type, Params, Cont) -> - case emqx:get_config([retainer, backend, type]) of - built_in_database -> + case emqx_retainer:backend_module() of + emqx_retainer_mnesia -> Cont(Type, Params); _ -> {400, 'BAD_REQUEST', <<"This API only support built in database">>} diff --git a/apps/emqx_retainer/src/emqx_retainer_app.erl b/apps/emqx_retainer/src/emqx_retainer_app.erl index f76978042..58c74bbae 100644 --- a/apps/emqx_retainer/src/emqx_retainer_app.erl +++ b/apps/emqx_retainer/src/emqx_retainer_app.erl @@ -26,12 +26,12 @@ ]). start(_Type, _Args) -> - ok = emqx_retainer_mnesia_cli:load(), + ok = emqx_retainer_cli:load(), init_bucket(), emqx_retainer_sup:start_link(). stop(_State) -> - ok = emqx_retainer_mnesia_cli:unload(), + ok = emqx_retainer_cli:unload(), delete_bucket(), ok. diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl b/apps/emqx_retainer/src/emqx_retainer_cli.erl similarity index 60% rename from apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl rename to apps/emqx_retainer/src/emqx_retainer_cli.erl index e1f35a454..4c8731566 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl +++ b/apps/emqx_retainer/src/emqx_retainer_cli.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_retainer_mnesia_cli). +-module(emqx_retainer_cli). -include_lib("emqx/include/logger.hrl"). @@ -32,36 +32,52 @@ load() -> ok = emqx_ctl:register_command(retainer, {?MODULE, retainer}, []). retainer(["info"]) -> - count(); + if_enabled(fun() -> + count() + end); retainer(["topics"]) -> - topic(1, 1000); + if_enabled(fun() -> + topic(1, 1000) + end); retainer(["topics", Start, Len]) -> - topic(list_to_integer(Start), list_to_integer(Len)); + if_enabled(fun() -> + topic(list_to_integer(Start), list_to_integer(Len)) + end); retainer(["clean", Topic]) -> - emqx_retainer:delete(list_to_binary(Topic)); + if_enabled(fun() -> + emqx_retainer:delete(list_to_binary(Topic)) + end); retainer(["clean"]) -> - emqx_retainer:clean(); + if_enabled(fun() -> + emqx_retainer:clean() + end); retainer(["reindex", "status"]) -> - case emqx_retainer_mnesia:reindex_status() of - true -> - ?PRINT_MSG("Reindexing is in progress~n"); - false -> - ?PRINT_MSG("Reindexing is not running~n") - end; + if_mnesia_enabled(fun() -> + case emqx_retainer_mnesia:reindex_status() of + true -> + ?PRINT_MSG("Reindexing is in progress~n"); + false -> + ?PRINT_MSG("Reindexing is not running~n") + end + end); retainer(["reindex", "start"]) -> - retainer(["reindex", "start", "false"]); + if_mnesia_enabled(fun() -> + retainer(["reindex", "start", "false"]) + end); retainer(["reindex", "start", ForceParam]) -> - case mria_rlog:role() of - core -> - Force = - case ForceParam of - "true" -> true; - _ -> false - end, - do_reindex(Force); - replicant -> - ?PRINT_MSG("Can't run reindex on a replicant node") - end; + if_mnesia_enabled(fun() -> + case mria_rlog:role() of + core -> + Force = + case ForceParam of + "true" -> true; + _ -> false + end, + do_reindex(Force); + replicant -> + ?PRINT_MSG("Can't run reindex on a replicant node") + end + end); retainer(_) -> emqx_ctl:usage( [ @@ -71,10 +87,12 @@ retainer(_) -> "Show topics of retained messages by the specified range"}, {"retainer clean", "Clean all retained messages"}, {"retainer clean ", "Clean retained messages by the specified topic filter"}, - {"retainer reindex status", "Show reindex status"}, + {"retainer reindex status", + "Show reindex status.\nOnly available for built-in backend."}, {"retainer reindex start [force]", "Generate new retainer topic indices from config settings.\n" - "Pass true as to ignore previously started reindexing"} + "Pass true as to ignore previously started reindexing.\n" + "Only available for built-in backend."} ] ). @@ -107,6 +125,23 @@ count() -> topic(Start, Len) -> count(), - Topics = lists:sublist(emqx_retainer_mnesia:topics(), Start, Len), - [?PRINT("~ts~n", [I]) || I <- Topics], + {ok, _HasNext, Messages} = emqx_retainer:page_read(<<"#">>, Start, Len), + [?PRINT("~ts~n", [emqx_message:topic(M)]) || M <- Messages], ok. + +if_enabled(Fun) -> + case emqx_retainer:enabled() of + true -> Fun(); + false -> ?PRINT_MSG("Retainer is not enabled~n") + end. + +if_mnesia_backend(Fun) -> + case emqx_retainer:backend_module() of + emqx_retainer_mnesia -> Fun(); + _ -> ?PRINT_MSG("Command only applicable for builtin backend~n") + end. + +if_mnesia_enabled(Fun) -> + if_enabled(fun() -> + if_mnesia_backend(Fun) + end). diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index feef32393..e918d8d52 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -236,7 +236,7 @@ cast(Msg) -> -spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}. dispatch(Context, Pid, Topic, Cursor, Limiter) -> - Mod = emqx_retainer:get_backend_module(), + Mod = emqx_retainer:backend_module(Context), case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of false -> {ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]), diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 99fe64dcb..bdc1f2c67 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -51,10 +51,8 @@ -export([reindex/2, reindex_status/0]). --ifdef(TEST). -export([populate_index_meta/0]). -export([reindex/3]). --endif. -record(retained_message, {topic, msg, expiry_time}). -record(retained_index, {key, expiry_time}). @@ -79,7 +77,7 @@ topics() -> %% emqx_retainer callbacks %%-------------------------------------------------------------------- -create(#{storage_type := StorageType}) -> +create(#{storage_type := StorageType, max_retained_messages := MaxRetainedMessages} = Config) -> ok = create_table( ?TAB_INDEX_META, retained_index_meta, @@ -87,7 +85,7 @@ create(#{storage_type := StorageType}) -> set, StorageType ), - ok = populate_index_meta(), + ok = populate_index_meta(Config), ok = create_table( ?TAB_MESSAGE, retained_message, @@ -102,8 +100,7 @@ create(#{storage_type := StorageType}) -> ordered_set, StorageType ), - %% The context is not used by this backend - #{storage_type => StorageType}. + #{storage_type => StorageType, max_retained_messages => MaxRetainedMessages}. create_table(Table, RecordName, Attributes, Type, StorageType) -> Copies = @@ -139,17 +136,15 @@ create_table(Table, RecordName, Attributes, Type, StorageType) -> ok end. -update(#{storage_type := StorageType}, #{storage_type := StorageType}) -> - ok; -update(_Context, _NewConfig) -> +update(_State, _NewConfig) -> need_recreate. -close(_Context) -> ok. +close(_State) -> ok. -store_retained(_Context, Msg = #message{topic = Topic}) -> +store_retained(State, Msg = #message{topic = Topic}) -> ExpiryTime = emqx_retainer:get_expiry_time(Msg), Tokens = topic_to_tokens(Topic), - case is_table_full() andalso is_new_topic(Tokens) of + case is_table_full(State) andalso is_new_topic(Tokens) of true -> ?SLOG(error, #{ msg => "failed_to_retain_message", @@ -182,7 +177,7 @@ clear_expired() -> QC = qlc:cursor(QH), clear_batch(dirty_indices(write), QC). -delete_message(_Context, Topic) -> +delete_message(_State, Topic) -> Tokens = topic_to_tokens(Topic), case emqx_topic:wildcard(Topic) of false -> @@ -198,10 +193,10 @@ delete_message(_Context, Topic) -> ) end. -read_message(_Context, Topic) -> +read_message(_State, Topic) -> {ok, read_messages(Topic)}. -match_messages(_Context, Topic, undefined) -> +match_messages(_State, Topic, undefined) -> Tokens = topic_to_tokens(Topic), Now = erlang:system_time(millisecond), QH = msg_table(search_table(Tokens, Now)), @@ -212,7 +207,7 @@ match_messages(_Context, Topic, undefined) -> Cursor = qlc:cursor(QH), match_messages(undefined, Topic, {Cursor, BatchNum}) end; -match_messages(_Context, _Topic, {Cursor, BatchNum}) -> +match_messages(_State, _Topic, {Cursor, BatchNum}) -> case qlc_next_answers(Cursor, BatchNum) of {closed, Rows} -> {ok, Rows, undefined}; @@ -220,7 +215,7 @@ match_messages(_Context, _Topic, {Cursor, BatchNum}) -> {ok, Rows, {Cursor, BatchNum}} end. -page_read(_Context, Topic, Page, Limit) -> +page_read(_State, Topic, Page, Limit) -> Now = erlang:system_time(millisecond), QH = case Topic of @@ -263,7 +258,8 @@ size(_) -> table_size(). reindex(Force, StatusFun) -> - reindex(config_indices(), Force, StatusFun). + Config = emqx:get_config([retainer, backend]), + reindex(config_indices(Config), Force, StatusFun). reindex_status() -> case mnesia:dirty_read(?TAB_INDEX_META, ?META_KEY) of @@ -438,9 +434,8 @@ make_index_match_spec(Index, Tokens, NowMs) -> MsHd = #retained_index{key = Cond, expiry_time = '$3'}, {[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}], IsExact}. -is_table_full() -> - Limit = emqx:get_config([retainer, backend, max_retained_messages]), - Limit > 0 andalso (table_size() >= Limit). +is_table_full(#{max_retained_messages := MaxRetainedMessages} = _State) -> + MaxRetainedMessages > 0 andalso (table_size() >= MaxRetainedMessages). is_new_topic(Tokens) -> case mnesia:dirty_read(?TAB_MESSAGE, Tokens) of @@ -453,11 +448,15 @@ is_new_topic(Tokens) -> table_size() -> mnesia:table_info(?TAB_MESSAGE, size). -config_indices() -> - lists:sort(emqx_config:get([retainer, backend, index_specs])). +config_indices(#{index_specs := IndexSpecs}) -> + IndexSpecs. populate_index_meta() -> - ConfigIndices = config_indices(), + Config = emqx:get_config([retainer, backend]), + populate_index_meta(Config). + +populate_index_meta(Config) -> + ConfigIndices = config_indices(Config), case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_populate_index_meta/1, [ConfigIndices]) of {atomic, ok} -> ok; diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 4158e2e95..b89ee7db6 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -36,7 +36,8 @@ roots() -> [ {"retainer", hoconsc:mk(hoconsc:ref(?MODULE, "retainer"), #{ - converter => fun retainer_converter/2 + converter => fun retainer_converter/2, + validator => fun validate_backends_enabled/1 })} ]. @@ -86,11 +87,29 @@ fields("retainer") -> aliases => [deliver_rate] } )}, - {backend, backend_config()} + {backend, backend_config()}, + {external_backends, + ?HOCON( + hoconsc:ref(?MODULE, external_backends), + #{ + desc => ?DESC(backends), + required => false, + default => #{}, + importance => ?IMPORTANCE_HIDDEN + } + )} ]; fields(mnesia_config) -> [ - {type, sc(built_in_database, mnesia_config_type, built_in_database)}, + {type, + ?HOCON( + built_in_database, + #{ + desc => ?DESC(mnesia_config_type), + required => false, + default => built_in_database + } + )}, {storage_type, sc( hoconsc:enum([ram, disc]), @@ -103,7 +122,13 @@ fields(mnesia_config) -> max_retained_messages, 0 )}, - {index_specs, fun retainer_indices/1} + {index_specs, fun retainer_indices/1}, + {enable, + ?HOCON(boolean(), #{ + desc => ?DESC(mnesia_enable), + required => false, + default => true + })} ]; fields(flow_control) -> [ @@ -125,7 +150,9 @@ fields(flow_control) -> batch_deliver_limiter, undefined )} - ]. + ]; +fields(external_backends) -> + emqx_schema_hooks:injection_point('retainer.external_backends'). desc("retainer") -> "Configuration related to handling `PUBLISH` packets with a `retain` flag set to 1."; @@ -140,9 +167,6 @@ desc(_) -> %% Internal functions %%-------------------------------------------------------------------- -%%sc(Type, DescId) -> -%% hoconsc:mk(Type, #{desc => ?DESC(DescId)}). - sc(Type, DescId, Default) -> sc(Type, DescId, Default, ?DEFAULT_IMPORTANCE). sc(Type, DescId, Default, Importance) -> @@ -214,3 +238,16 @@ retainer_converter(#{<<"deliver_rate">> := Delivery} = Conf, Opts) -> retainer_converter(Conf1#{<<"delivery_rate">> => Delivery}, Opts); retainer_converter(Conf, _Opts) -> Conf. + +validate_backends_enabled(Config) -> + BuiltInBackend = maps:get(<<"backend">>, Config, #{}), + ExternalBackends = maps:values(maps:get(<<"external_backends">>, Config, #{})), + Enabled = lists:filter(fun(#{<<"enable">> := E}) -> E end, [BuiltInBackend | ExternalBackends]), + case Enabled of + [#{}] -> + ok; + _Conflicts = [_ | _] -> + {error, multiple_enabled_backends}; + _None = [] -> + {error, no_enabled_backend} + end. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 5dd8a82fe..f42255832 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -644,7 +644,7 @@ t_reindex(_) -> t_get_basic_usage_info(_Config) -> ?assertEqual(#{retained_messages => 0}, emqx_retainer:get_basic_usage_info()), - Context = undefined, + Context = emqx_retainer:context(), lists:foreach( fun(N) -> Num = integer_to_binary(N), @@ -788,8 +788,8 @@ t_compatibility_for_deliver_rate(_) -> ). t_update_config(_) -> - OldConf = emqx_config:get([retainer]), - NewConf = emqx_utils_maps:deep_put([backend, storage_type], OldConf, disk), + OldConf = emqx_config:get_raw([retainer]), + NewConf = emqx_utils_maps:deep_put([<<"backend">>, <<"storage_type">>], OldConf, <<"disk">>), emqx_retainer:update_config(NewConf). %%-------------------------------------------------------------------- @@ -836,7 +836,7 @@ with_conf(ConfMod, Case) -> emqx_retainer:update_config(NewConf), try Case(), - emqx_retainer:update_config(Conf) + {ok, _} = emqx_retainer:update_config(Conf) catch Type:Error:Strace -> emqx_retainer:update_config(Conf), diff --git a/apps/emqx_retainer/test/emqx_retainer_backend_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_backend_SUITE.erl new file mode 100644 index 000000000..b276189b3 --- /dev/null +++ b/apps/emqx_retainer/test/emqx_retainer_backend_SUITE.erl @@ -0,0 +1,104 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_retainer_backend_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +%%-------------------------------------------------------------------- +%% Setups +%%-------------------------------------------------------------------- + +all() -> emqx_common_test_helpers:all(?MODULE). + +-define(BASE_CONF, #{ + <<"retainer">> => + #{ + <<"enable">> => true, + <<"backend">> => + #{ + <<"type">> => <<"built_in_database">>, + <<"storage_type">> => <<"ram">>, + <<"max_retained_messages">> => 0 + } + } +}). + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, #{ + before_start => fun() -> + ok = emqx_schema_hooks:inject_from_modules([emqx_retainer_dummy]) + end + }}, + emqx_conf, + {emqx_retainer, ?BASE_CONF} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{suite_apps, Apps} | Config]. + +end_per_suite(Config) -> + emqx_cth_suite:stop(?config(suite_apps, Config)). + +%%-------------------------------------------------------------------- +%% Test Cases +%%-------------------------------------------------------------------- + +t_external_backend(_Config) -> + {ok, _} = emqx_retainer:update_config(#{ + <<"enable">> => true, + <<"backend">> => #{ + <<"enable">> => false + }, + <<"external_backends">> => + #{ + <<"dummy">> => #{<<"enable">> => true} + } + }), + ?assertMatch( + ok, + emqx_retainer:clean() + ), + ?assertMatch( + ok, + emqx_retainer:delete(<<"topic">>) + ), + ?assertMatch( + {ok, []}, + emqx_retainer:read_message(<<"topic">>) + ), + ?assertMatch( + {ok, false, []}, + emqx_retainer:page_read(<<"topic">>, 0, 10) + ), + ?assertEqual( + 0, + emqx_retainer:retained_count() + ), + ?assertEqual( + emqx_retainer_dummy, + emqx_retainer:backend_module() + ), + ?assertEqual( + true, + emqx_retainer:enabled() + ). diff --git a/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl index db69abff1..7ea32ac71 100644 --- a/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl @@ -38,22 +38,22 @@ end_per_suite(Config) -> emqx_cth_suite:stop(?config(suite_apps, Config)). t_reindex_status(_Config) -> - ok = emqx_retainer_mnesia_cli:retainer(["reindex", "status"]). + ok = emqx_retainer_cli:retainer(["reindex", "status"]). t_info(_Config) -> - ok = emqx_retainer_mnesia_cli:retainer(["info"]). + ok = emqx_retainer_cli:retainer(["info"]). t_topics(_Config) -> - ok = emqx_retainer_mnesia_cli:retainer(["topics"]). + ok = emqx_retainer_cli:retainer(["topics"]). t_topics_with_len(_Config) -> - ok = emqx_retainer_mnesia_cli:retainer(["topics", "100", "200"]). + ok = emqx_retainer_cli:retainer(["topics", "100", "200"]). t_clean(_Config) -> - ok = emqx_retainer_mnesia_cli:retainer(["clean"]). + ok = emqx_retainer_cli:retainer(["clean"]). t_topic(_Config) -> - ok = emqx_retainer_mnesia_cli:retainer(["clean", "foo/bar"]). + ok = emqx_retainer_cli:retainer(["clean", "foo/bar"]). t_reindex(_Config) -> {ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), @@ -84,6 +84,6 @@ t_reindex(_Config) -> ), emqx_config:put([retainer, backend, index_specs], [[4, 5]]), - ok = emqx_retainer_mnesia_cli:retainer(["reindex", "start"]), + ok = emqx_retainer_cli:retainer(["reindex", "start"]), ?assertEqual(1000, mnesia:table_info(?TAB_INDEX, size)). diff --git a/apps/emqx_retainer/test/emqx_retainer_dummy.erl b/apps/emqx_retainer/test/emqx_retainer_dummy.erl new file mode 100644 index 000000000..e72a2d0cf --- /dev/null +++ b/apps/emqx_retainer/test/emqx_retainer_dummy.erl @@ -0,0 +1,98 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_retainer_dummy). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-behaviour(emqx_retainer). + +-export([ + create/1, + update/2, + close/1, + delete_message/2, + store_retained/2, + read_message/2, + page_read/4, + match_messages/3, + clear_expired/1, + clean/1, + size/1 +]). + +-behaviour(emqx_schema_hooks). + +-export([ + fields/1, + injected_fields/0 +]). + +injected_fields() -> + #{ + 'retainer.external_backends' => external_backend_fields() + }. + +create(_Config) -> #{}. + +update(_Context, _Config) -> ok. + +close(_Context) -> ok. + +delete_message(_Context, _Topic) -> ok. + +store_retained(_Context, _Message) -> ok. + +read_message(_Context, _Topic) -> {ok, []}. + +page_read(_Context, _Topic, _Offset, _Limit) -> {ok, false, []}. + +match_messages(_Context, _Topic, _Cursor) -> {ok, [], 0}. + +clear_expired(_Context) -> ok. + +clean(_Context) -> ok. + +size(_Context) -> 0. + +external_backend_fields() -> + [ + {dummy, hoconsc:ref(?MODULE, dummy)} + ]. + +fields(dummy) -> + [ + {module, + hoconsc:mk( + emqx_retainer_dummy, + #{ + desc => <<"dummy backend mod">>, + required => false, + default => <<"emqx_retainer_dummy">>, + importance => ?IMPORTANCE_HIDDEN + } + )}, + {enable, + hoconsc:mk( + boolean(), + #{ + desc => <<"enable dummy backend">>, + required => true, + default => false + } + )} + ]. diff --git a/rel/i18n/emqx_retainer_schema.hocon b/rel/i18n/emqx_retainer_schema.hocon index e6679dbf7..b1b570285 100644 --- a/rel/i18n/emqx_retainer_schema.hocon +++ b/rel/i18n/emqx_retainer_schema.hocon @@ -33,6 +33,9 @@ mnesia_config_storage_type.desc: mnesia_config_type.desc: """Backend type.""" +mnesia_enable.desc: +"""Enable built-in Mnesia backend.""" + msg_clear_interval.desc: """Interval for EMQX to scan expired messages and delete them. Never scan if the value is 0."""