From 6c73679b02fb3e9cf4cd78b31917acfedf2f5590 Mon Sep 17 00:00:00 2001 From: lafirest Date: Tue, 10 Aug 2021 16:32:07 +0800 Subject: [PATCH] chore(emqx_retainer): simplified the retainer config --- apps/emqx_retainer/etc/emqx_retainer.conf | 27 +++++++++---------- apps/emqx_retainer/src/emqx_retainer.erl | 25 ++++++++++------- .../src/emqx_retainer_mnesia.erl | 7 +---- .../src/emqx_retainer_schema.erl | 19 +++++-------- .../test/emqx_retainer_SUITE.erl | 9 +++---- 5 files changed, 38 insertions(+), 49 deletions(-) diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index 3a96909ff..4a4308b2e 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -61,21 +61,18 @@ emqx_retainer: { ## Storage connect parameters ## - ## Value: mnesia + ## Value: built_in_database ## - connector: - [ - { - type: mnesia - config: { - ## storage_type: ram | disc | disc_only - storage_type: ram + config: { - ## Maximum number of retained messages. 0 means no limit. - ## - ## Value: Number >= 0 - max_retained_messages: 0 - } - } - ] + type: built_in_database + + ## storage_type: ram | disc | disc_only + storage_type: ram + + ## Maximum number of retained messages. 0 means no limit. + ## + ## Value: Number >= 0 + max_retained_messages: 0 + } } diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 86cf98af6..acd119dee 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -338,16 +338,16 @@ get_msg_deliver_quota() -> update_config(#{clear_timer := ClearTimer, release_quota_timer := QuotaTimer} = State, Conf) -> #{enable := Enable, - connector := [Connector | _], + config := Config, flow_control := #{quota_release_interval := QuotaInterval}, msg_clear_interval := ClearInterval} = Conf, - #{connector := [OldConnector | _]} = emqx_config:get([?APP]), + #{config := OldConfig} = emqx_config:get([?APP]), case Enable of true -> - StorageType = maps:get(type, Connector), - OldStrorageType = maps:get(type, OldConnector), + StorageType = maps:get(type, Config), + OldStrorageType = maps:get(type, OldConfig), case OldStrorageType of StorageType -> State#{clear_timer := check_timer(ClearTimer, @@ -368,9 +368,9 @@ update_config(#{clear_timer := ClearTimer, enable_retainer(#{context_id := ContextId} = State, #{msg_clear_interval := ClearInterval, flow_control := #{quota_release_interval := ReleaseInterval}, - connector := [Connector | _]}) -> + config := Config}) -> NewContextId = ContextId + 1, - Context = create_resource(new_context(NewContextId), Connector), + Context = create_resource(new_context(NewContextId), Config), load(Context), State#{enable := true, context_id := NewContextId, @@ -416,14 +416,19 @@ check_timer(Timer, _, _) -> -spec get_backend_module() -> backend(). get_backend_module() -> - [#{type := Backend} | _] = emqx_config:get([?APP, connector]), - erlang:list_to_existing_atom(io_lib:format("~s_~s", [?APP, Backend])). + #{type := Backend} = emqx_config:get([?APP, config]), + ModName = if Backend =:= built_in_database -> + mnesia; + true -> + Backend + end, + erlang:list_to_existing_atom(io_lib:format("~s_~s", [?APP, ModName])). -create_resource(Context, #{type := mnesia, config := Cfg}) -> +create_resource(Context, #{type := built_in_database} = Cfg) -> emqx_retainer_mnesia:create_resource(Cfg), Context; -create_resource(Context, #{type := DB, config := Config}) -> +create_resource(Context, #{type := DB} = Config) -> ResourceID = erlang:iolist_to_binary([io_lib:format("~s_~s", [?APP, DB])]), case emqx_resource:create( ResourceID, diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index dcfeffa8c..34e3e49db 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -33,8 +33,6 @@ -export([create_resource/1]). --define(DEF_MAX_RETAINED_MESSAGES, 0). - -rlog_shard({?RETAINER_SHARD, ?TAB}). -record(retained, {topic, msg, expiry_time}). @@ -229,10 +227,7 @@ make_match_spec(Filter) -> -spec is_table_full() -> boolean(). is_table_full() -> - [#{config := Cfg} | _] = emqx_config:get([?APP, connector]), - Limit = maps:get(max_retained_messages, - Cfg, - ?DEF_MAX_RETAINED_MESSAGES), + #{max_retained_messages := Limit} = emqx_config:get([?APP, config]), Limit > 0 andalso (table_size() >= Limit). -spec table_size() -> non_neg_integer(). diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 96cf80846..df31f647f 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -12,18 +12,14 @@ fields("emqx_retainer") -> [ {enable, t(boolean(), false)} , {msg_expiry_interval, t(emqx_schema:duration_ms(), "0s")} , {msg_clear_interval, t(emqx_schema:duration_ms(), "0s")} - , {connector, connector()} , {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))} , {max_payload_size, t(emqx_schema:bytesize(), "1MB")} + , {config, config()} ]; -fields(mnesia_connector) -> - [ {type, ?TYPE(hoconsc:union([mnesia]))} - , {config, ?TYPE(hoconsc:ref(?MODULE, mnesia_connector_cfg))} - ]; - -fields(mnesia_connector_cfg) -> - [ {storage_type, t(hoconsc:union([ram, disc, disc_only]), ram)} +fields(mnesia_config) -> + [ {type, ?TYPE(hoconsc:union([built_in_database]))} + , {storage_type, t(hoconsc:union([ram, disc, disc_only]), ram)} , {max_retained_messages, t(integer(), 0, fun is_pos_integer/1)} ]; @@ -43,11 +39,8 @@ t(Type, Default, Validator) -> hoconsc:t(Type, #{default => Default, validator => Validator}). -union_array(Item) when is_list(Item) -> - hoconsc:array(hoconsc:union(Item)). - is_pos_integer(V) -> V >= 0. -connector() -> - #{type => union_array([hoconsc:ref(?MODULE, mnesia_connector)])}. +config() -> + #{type => hoconsc:union([hoconsc:ref(?MODULE, mnesia_config)])}. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index de2481580..a2efd0357 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -67,14 +67,13 @@ new_emqx_retainer_conf() -> #{enable => true, msg_expiry_interval => 0, msg_clear_interval => 0, - connector => [#{type => mnesia, - config => - #{max_retained_messages => 0, - storage_type => ram}}], + config => #{type => built_in_database, + max_retained_messages => 0, + storage_type => ram}, flow_control => #{max_read_number => 0, msg_deliver_quota => 0, quota_release_interval => 0}, - max_payload_size => 1024 * 1024}. + max_payload_size => 1024 * 1024}. %%-------------------------------------------------------------------- %% Test Cases