From d94066a10a6256c6f98184c19829d4c19d2d3d6a Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 22 Feb 2024 22:32:12 +0300 Subject: [PATCH] chore(retainer): refactor types; remove hotwiring to builtin config --- apps/emqx_retainer/include/emqx_retainer.hrl | 22 ++----------- apps/emqx_retainer/src/emqx_retainer.erl | 33 ++++++++++++------- .../src/emqx_retainer_dispatcher.erl | 3 ++ .../src/emqx_retainer_mnesia.erl | 8 ++++- .../test/emqx_retainer_SUITE.erl | 7 ++-- .../test/emqx_telemetry_SUITE.erl | 2 +- 6 files changed, 40 insertions(+), 35 deletions(-) diff --git a/apps/emqx_retainer/include/emqx_retainer.hrl b/apps/emqx_retainer/include/emqx_retainer.hrl index 40aec6f55..e12f0571f 100644 --- a/apps/emqx_retainer/include/emqx_retainer.hrl +++ b/apps/emqx_retainer/include/emqx_retainer.hrl @@ -13,6 +13,8 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- +-ifndef(EMQX_RETAINER_HRL). +-define(EMQX_RETAINER_HRL, true). -include_lib("emqx/include/emqx.hrl"). @@ -22,22 +24,4 @@ -define(TAB_INDEX_META, emqx_retainer_index_meta). -define(RETAINER_SHARD, emqx_retainer_shard). --type topic() :: binary(). --type payload() :: binary(). --type message() :: #message{}. - --type context() :: #{ - context_id := pos_integer(), - atom() => term() -}. - --define(DELIVER_SEMAPHORE, deliver_remained_quota). --type semaphore() :: ?DELIVER_SEMAPHORE. --type cursor() :: undefined | term(). --type result() :: term(). - --define(SHARED_CONTEXT_TAB, emqx_retainer_ctx). --record(shared_context, {key :: atom(), value :: term()}). --type shared_context_key() :: ?DELIVER_SEMAPHORE. - --type backend() :: emqx_retainer_storage_mnesia. +-endif. diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index dd31d4c78..92846f5a4 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -58,6 +58,11 @@ code_change/3 ]). +-export_type([ + cursor/0, + context/0 +]). + %% exported for `emqx_telemetry' -export([get_basic_usage_info/0]). @@ -66,12 +71,19 @@ context := undefined | context(), clear_timer := undefined | reference() }. +-type context() :: term(). + +-type topic() :: emqx_types:topic(). +-type message() :: emqx_types:message(). +-type cursor() :: undefined | term(). +-type backend() :: emqx_retainer_mnesia | module(). -define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)). -define(DEF_EXPIRY_INTERVAL, 0). -define(MAX_PAYLOAD_SIZE_CONFIG_PATH, [retainer, max_payload_size]). -callback create(map()) -> context(). +-callback update(context(), map()) -> ok | need_recreate. -callback close(context()) -> ok. -callback delete_message(context(), topic()) -> ok. -callback store_retained(context(), message()) -> ok. @@ -180,8 +192,7 @@ stats_fun() -> -spec get_basic_usage_info() -> #{retained_messages => non_neg_integer()}. get_basic_usage_info() -> try - RetainedMessages = gen_server:call(?MODULE, retained_count), - #{retained_messages => RetainedMessages} + #{retained_messages => retained_count()} catch _:_ -> #{retained_messages => 0} @@ -319,27 +330,25 @@ update_config(true, false, State, NewConf, _) -> update_config( true, true, - #{clear_timer := ClearTimer} = State, + #{clear_timer := ClearTimer, context := Context} = State, NewConf, OldConf ) -> #{ backend := #{ - type := BackendType, - storage_type := StorageType - }, + type := BackendType + } = NewBackendConfig, msg_clear_interval := ClearInterval } = NewConf, #{ backend := #{ - type := OldBackendType, - storage_type := OldStorageType + type := OldBackendType } } = OldConf, SameBackendType = BackendType =:= OldBackendType, - SameStorageType = StorageType =:= OldStorageType, - case SameBackendType andalso SameStorageType of + Mod = get_backend_module(), + case SameBackendType andalso ok =:= Mod:update(Context, NewBackendConfig) of true -> State#{ clear_timer := check_timer( @@ -362,7 +371,7 @@ enable_retainer( } ) -> Context = create(BackendCfg), - load(Context), + ok = load(Context), State#{ enable := true, context := Context, @@ -376,7 +385,7 @@ disable_retainer( context := Context } = State ) -> - unload(), + ok = unload(), ok = close(Context), State#{ enable := false, diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index 656794827..feef32393 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -44,6 +44,9 @@ ]). -type limiter() :: emqx_htb_limiter:limiter(). +-type context() :: emqx_retainer:context(). +-type topic() :: emqx_types:topic(). +-type cursor() :: emqx_retainer:cursor(). -define(POOL, ?MODULE). diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 6725c15cb..99fe64dcb 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -27,6 +27,7 @@ %% emqx_retainer callbacks -export([ create/1, + update/2, close/1, delete_message/2, store_retained/2, @@ -102,7 +103,7 @@ create(#{storage_type := StorageType}) -> StorageType ), %% The context is not used by this backend - #{}. + #{storage_type => StorageType}. create_table(Table, RecordName, Attributes, Type, StorageType) -> Copies = @@ -138,6 +139,11 @@ create_table(Table, RecordName, Attributes, Type, StorageType) -> ok end. +update(#{storage_type := StorageType}, #{storage_type := StorageType}) -> + ok; +update(_Context, _NewConfig) -> + need_recreate. + close(_Context) -> ok. store_retained(_Context, Msg = #message{topic = Topic}) -> diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index cbb65436a..5dd8a82fe 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -19,8 +19,6 @@ -compile(export_all). -compile(nowarn_export_all). --define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). - -include("emqx_retainer.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -789,6 +787,11 @@ t_compatibility_for_deliver_rate(_) -> Parser(DeliveryInf) ). +t_update_config(_) -> + OldConf = emqx_config:get([retainer]), + NewConf = emqx_utils_maps:deep_put([backend, storage_type], OldConf, disk), + emqx_retainer:update_config(NewConf). + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl b/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl index 8831329c5..f7916265d 100644 --- a/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl @@ -621,7 +621,7 @@ mock_httpc() -> ). mock_advanced_mqtt_features() -> - Context = undefined, + Context = emqx_retainer:context(), lists:foreach( fun(N) -> Num = integer_to_binary(N),