Merge pull request #5446 from lafirest/chore/emqx_retainer

chore(emqx_retainer): simplified the retainer config
This commit is contained in:
lafirest 2021-08-11 09:46:21 +08:00 committed by GitHub
commit 69a202e4c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 38 additions and 49 deletions

View File

@ -61,21 +61,18 @@ emqx_retainer: {
## Storage connect parameters
##
## Value: mnesia
## Value: built_in_database
##
connector:
[
{
type: mnesia
config: {
## storage_type: ram | disc | disc_only
storage_type: ram
config: {
## Maximum number of retained messages. 0 means no limit.
##
## Value: Number >= 0
max_retained_messages: 0
}
}
]
type: built_in_database
## storage_type: ram | disc | disc_only
storage_type: ram
## Maximum number of retained messages. 0 means no limit.
##
## Value: Number >= 0
max_retained_messages: 0
}
}

View File

@ -338,16 +338,16 @@ get_msg_deliver_quota() ->
update_config(#{clear_timer := ClearTimer,
release_quota_timer := QuotaTimer} = State, Conf) ->
#{enable := Enable,
connector := [Connector | _],
config := Config,
flow_control := #{quota_release_interval := QuotaInterval},
msg_clear_interval := ClearInterval} = Conf,
#{connector := [OldConnector | _]} = emqx_config:get([?APP]),
#{config := OldConfig} = emqx_config:get([?APP]),
case Enable of
true ->
StorageType = maps:get(type, Connector),
OldStrorageType = maps:get(type, OldConnector),
StorageType = maps:get(type, Config),
OldStrorageType = maps:get(type, OldConfig),
case OldStrorageType of
StorageType ->
State#{clear_timer := check_timer(ClearTimer,
@ -368,9 +368,9 @@ update_config(#{clear_timer := ClearTimer,
enable_retainer(#{context_id := ContextId} = State,
#{msg_clear_interval := ClearInterval,
flow_control := #{quota_release_interval := ReleaseInterval},
connector := [Connector | _]}) ->
config := Config}) ->
NewContextId = ContextId + 1,
Context = create_resource(new_context(NewContextId), Connector),
Context = create_resource(new_context(NewContextId), Config),
load(Context),
State#{enable := true,
context_id := NewContextId,
@ -416,14 +416,19 @@ check_timer(Timer, _, _) ->
-spec get_backend_module() -> backend().
get_backend_module() ->
[#{type := Backend} | _] = emqx_config:get([?APP, connector]),
erlang:list_to_existing_atom(io_lib:format("~s_~s", [?APP, Backend])).
#{type := Backend} = emqx_config:get([?APP, config]),
ModName = if Backend =:= built_in_database ->
mnesia;
true ->
Backend
end,
erlang:list_to_existing_atom(io_lib:format("~s_~s", [?APP, ModName])).
create_resource(Context, #{type := mnesia, config := Cfg}) ->
create_resource(Context, #{type := built_in_database} = Cfg) ->
emqx_retainer_mnesia:create_resource(Cfg),
Context;
create_resource(Context, #{type := DB, config := Config}) ->
create_resource(Context, #{type := DB} = Config) ->
ResourceID = erlang:iolist_to_binary([io_lib:format("~s_~s", [?APP, DB])]),
case emqx_resource:create(
ResourceID,

View File

@ -33,8 +33,6 @@
-export([create_resource/1]).
-define(DEF_MAX_RETAINED_MESSAGES, 0).
-rlog_shard({?RETAINER_SHARD, ?TAB}).
-record(retained, {topic, msg, expiry_time}).
@ -229,10 +227,7 @@ make_match_spec(Filter) ->
-spec is_table_full() -> boolean().
is_table_full() ->
[#{config := Cfg} | _] = emqx_config:get([?APP, connector]),
Limit = maps:get(max_retained_messages,
Cfg,
?DEF_MAX_RETAINED_MESSAGES),
#{max_retained_messages := Limit} = emqx_config:get([?APP, config]),
Limit > 0 andalso (table_size() >= Limit).
-spec table_size() -> non_neg_integer().

View File

@ -12,18 +12,14 @@ fields("emqx_retainer") ->
[ {enable, t(boolean(), false)}
, {msg_expiry_interval, t(emqx_schema:duration_ms(), "0s")}
, {msg_clear_interval, t(emqx_schema:duration_ms(), "0s")}
, {connector, connector()}
, {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))}
, {max_payload_size, t(emqx_schema:bytesize(), "1MB")}
, {config, config()}
];
fields(mnesia_connector) ->
[ {type, ?TYPE(hoconsc:union([mnesia]))}
, {config, ?TYPE(hoconsc:ref(?MODULE, mnesia_connector_cfg))}
];
fields(mnesia_connector_cfg) ->
[ {storage_type, t(hoconsc:union([ram, disc, disc_only]), ram)}
fields(mnesia_config) ->
[ {type, ?TYPE(hoconsc:union([built_in_database]))}
, {storage_type, t(hoconsc:union([ram, disc, disc_only]), ram)}
, {max_retained_messages, t(integer(), 0, fun is_pos_integer/1)}
];
@ -43,11 +39,8 @@ t(Type, Default, Validator) ->
hoconsc:t(Type, #{default => Default,
validator => Validator}).
union_array(Item) when is_list(Item) ->
hoconsc:array(hoconsc:union(Item)).
is_pos_integer(V) ->
V >= 0.
connector() ->
#{type => union_array([hoconsc:ref(?MODULE, mnesia_connector)])}.
config() ->
#{type => hoconsc:union([hoconsc:ref(?MODULE, mnesia_config)])}.

View File

@ -67,14 +67,13 @@ new_emqx_retainer_conf() ->
#{enable => true,
msg_expiry_interval => 0,
msg_clear_interval => 0,
connector => [#{type => mnesia,
config =>
#{max_retained_messages => 0,
storage_type => ram}}],
config => #{type => built_in_database,
max_retained_messages => 0,
storage_type => ram},
flow_control => #{max_read_number => 0,
msg_deliver_quota => 0,
quota_release_interval => 0},
max_payload_size => 1024 * 1024}.
max_payload_size => 1024 * 1024}.
%%--------------------------------------------------------------------
%% Test Cases