From aa2ee9c4096e4cdb186855fe102679e57354454e Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 22 Feb 2024 19:26:52 +0300 Subject: [PATCH] chore(retainer): add backend creation to the behaviour --- apps/emqx_retainer/src/emqx_retainer.erl | 43 +++++++------------ .../src/emqx_retainer_mnesia.erl | 24 ++++++----- 2 files changed, 30 insertions(+), 37 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 39a9de550..dd31d4c78 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -63,7 +63,6 @@ -type state() :: #{ enable := boolean(), - context_id := non_neg_integer(), context := undefined | context(), clear_timer := undefined | reference() }. @@ -72,8 +71,8 @@ -define(DEF_EXPIRY_INTERVAL, 0). -define(MAX_PAYLOAD_SIZE_CONFIG_PATH, [retainer, max_payload_size]). --define(CAST(Msg), gen_server:cast(?MODULE, Msg)). - +-callback create(map()) -> context(). +-callback close(context()) -> ok. -callback delete_message(context(), topic()) -> ok. -callback store_retained(context(), message()) -> ok. -callback read_message(context(), topic()) -> {ok, list()}. @@ -264,15 +263,10 @@ code_change(_OldVsn, State, _Extra) -> new_state() -> #{ enable => false, - context_id => 0, context => undefined, clear_timer => undefined }. --spec new_context(pos_integer()) -> context(). -new_context(Id) -> - #{context_id => Id}. - payload_size_limit() -> emqx_conf:get(?MAX_PAYLOAD_SIZE_CONFIG_PATH, ?DEF_MAX_PAYLOAD_SIZE). @@ -361,18 +355,16 @@ update_config( -spec enable_retainer(state(), hocon:config()) -> state(). enable_retainer( - #{context_id := ContextId} = State, + State, #{ msg_clear_interval := ClearInterval, backend := BackendCfg } ) -> - NewContextId = ContextId + 1, - Context = create_resource(new_context(NewContextId), BackendCfg), + Context = create(BackendCfg), load(Context), State#{ enable := true, - context_id := NewContextId, context := Context, clear_timer := add_timer(ClearInterval, clear_expired) }. @@ -385,7 +377,7 @@ disable_retainer( } = State ) -> unload(), - ok = close_resource(Context), + ok = close(Context), State#{ enable := false, clear_timer := stop_timer(ClearTimer) @@ -416,22 +408,19 @@ check_timer(Timer, _, _) -> -spec get_backend_module() -> backend(). get_backend_module() -> - ModName = - case emqx:get_config([retainer, backend]) of - #{type := built_in_database} -> mnesia; - #{type := Backend} -> Backend - end, - erlang:list_to_existing_atom(io_lib:format("~ts_~ts", [?APP, ModName])). + case emqx:get_config([retainer, backend]) of + #{type := built_in_database} -> emqx_retainer_mnesia; + #{type := Backend} -> Backend + end. -create_resource(Context, #{type := built_in_database} = Cfg) -> - emqx_retainer_mnesia:create_resource(Cfg), - Context. +create(#{type := built_in_database} = Cfg) -> + Mod = get_backend_module(), + Mod:create(Cfg). --spec close_resource(context()) -> ok | {error, term()}. -close_resource(#{resource_id := ResourceId}) -> - emqx_resource:stop(ResourceId); -close_resource(_) -> - ok. +-spec close(context()) -> ok | {error, term()}. +close(Context) -> + Mod = get_backend_module(), + Mod:close(Context). -spec load(context()) -> ok. load(Context) -> diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 4a685dd23..6725c15cb 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -26,6 +26,8 @@ %% emqx_retainer callbacks -export([ + create/1, + close/1, delete_message/2, store_retained/2, read_message/2, @@ -46,8 +48,6 @@ %% Management API: -export([topics/0]). --export([create_resource/1]). - -export([reindex/2, reindex_status/0]). -ifdef(TEST). @@ -78,7 +78,7 @@ topics() -> %% emqx_retainer callbacks %%-------------------------------------------------------------------- -create_resource(#{storage_type := StorageType}) -> +create(#{storage_type := StorageType}) -> ok = create_table( ?TAB_INDEX_META, retained_index_meta, @@ -100,7 +100,9 @@ create_resource(#{storage_type := StorageType}) -> record_info(fields, retained_index), ordered_set, StorageType - ). + ), + %% The context is not used by this backend + #{}. create_table(Table, RecordName, Attributes, Type, StorageType) -> Copies = @@ -136,7 +138,9 @@ create_table(Table, RecordName, Attributes, Type, StorageType) -> ok end. -store_retained(_, Msg = #message{topic = Topic}) -> +close(_Context) -> ok. + +store_retained(_Context, Msg = #message{topic = Topic}) -> ExpiryTime = emqx_retainer:get_expiry_time(Msg), Tokens = topic_to_tokens(Topic), case is_table_full() andalso is_new_topic(Tokens) of @@ -172,7 +176,7 @@ clear_expired() -> QC = qlc:cursor(QH), clear_batch(dirty_indices(write), QC). -delete_message(_, Topic) -> +delete_message(_Context, Topic) -> Tokens = topic_to_tokens(Topic), case emqx_topic:wildcard(Topic) of false -> @@ -188,10 +192,10 @@ delete_message(_, Topic) -> ) end. -read_message(_, Topic) -> +read_message(_Context, Topic) -> {ok, read_messages(Topic)}. -match_messages(_, Topic, undefined) -> +match_messages(_Context, Topic, undefined) -> Tokens = topic_to_tokens(Topic), Now = erlang:system_time(millisecond), QH = msg_table(search_table(Tokens, Now)), @@ -202,7 +206,7 @@ match_messages(_, Topic, undefined) -> Cursor = qlc:cursor(QH), match_messages(undefined, Topic, {Cursor, BatchNum}) end; -match_messages(_, _Topic, {Cursor, BatchNum}) -> +match_messages(_Context, _Topic, {Cursor, BatchNum}) -> case qlc_next_answers(Cursor, BatchNum) of {closed, Rows} -> {ok, Rows, undefined}; @@ -210,7 +214,7 @@ match_messages(_, _Topic, {Cursor, BatchNum}) -> {ok, Rows, {Cursor, BatchNum}} end. -page_read(_, Topic, Page, Limit) -> +page_read(_Context, Topic, Page, Limit) -> Now = erlang:system_time(millisecond), QH = case Topic of