chore(retainer): refactor types; remove hotwiring to builtin config
This commit is contained in:
parent
aa2ee9c409
commit
d94066a10a
|
@ -13,6 +13,8 @@
|
||||||
%% See the License for the specific language governing permissions and
|
%% See the License for the specific language governing permissions and
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
-ifndef(EMQX_RETAINER_HRL).
|
||||||
|
-define(EMQX_RETAINER_HRL, true).
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
|
||||||
|
@ -22,22 +24,4 @@
|
||||||
-define(TAB_INDEX_META, emqx_retainer_index_meta).
|
-define(TAB_INDEX_META, emqx_retainer_index_meta).
|
||||||
-define(RETAINER_SHARD, emqx_retainer_shard).
|
-define(RETAINER_SHARD, emqx_retainer_shard).
|
||||||
|
|
||||||
-type topic() :: binary().
|
-endif.
|
||||||
-type payload() :: binary().
|
|
||||||
-type message() :: #message{}.
|
|
||||||
|
|
||||||
-type context() :: #{
|
|
||||||
context_id := pos_integer(),
|
|
||||||
atom() => term()
|
|
||||||
}.
|
|
||||||
|
|
||||||
-define(DELIVER_SEMAPHORE, deliver_remained_quota).
|
|
||||||
-type semaphore() :: ?DELIVER_SEMAPHORE.
|
|
||||||
-type cursor() :: undefined | term().
|
|
||||||
-type result() :: term().
|
|
||||||
|
|
||||||
-define(SHARED_CONTEXT_TAB, emqx_retainer_ctx).
|
|
||||||
-record(shared_context, {key :: atom(), value :: term()}).
|
|
||||||
-type shared_context_key() :: ?DELIVER_SEMAPHORE.
|
|
||||||
|
|
||||||
-type backend() :: emqx_retainer_storage_mnesia.
|
|
||||||
|
|
|
@ -58,6 +58,11 @@
|
||||||
code_change/3
|
code_change/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export_type([
|
||||||
|
cursor/0,
|
||||||
|
context/0
|
||||||
|
]).
|
||||||
|
|
||||||
%% exported for `emqx_telemetry'
|
%% exported for `emqx_telemetry'
|
||||||
-export([get_basic_usage_info/0]).
|
-export([get_basic_usage_info/0]).
|
||||||
|
|
||||||
|
@ -66,12 +71,19 @@
|
||||||
context := undefined | context(),
|
context := undefined | context(),
|
||||||
clear_timer := undefined | reference()
|
clear_timer := undefined | reference()
|
||||||
}.
|
}.
|
||||||
|
-type context() :: term().
|
||||||
|
|
||||||
|
-type topic() :: emqx_types:topic().
|
||||||
|
-type message() :: emqx_types:message().
|
||||||
|
-type cursor() :: undefined | term().
|
||||||
|
-type backend() :: emqx_retainer_mnesia | module().
|
||||||
|
|
||||||
-define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)).
|
-define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)).
|
||||||
-define(DEF_EXPIRY_INTERVAL, 0).
|
-define(DEF_EXPIRY_INTERVAL, 0).
|
||||||
-define(MAX_PAYLOAD_SIZE_CONFIG_PATH, [retainer, max_payload_size]).
|
-define(MAX_PAYLOAD_SIZE_CONFIG_PATH, [retainer, max_payload_size]).
|
||||||
|
|
||||||
-callback create(map()) -> context().
|
-callback create(map()) -> context().
|
||||||
|
-callback update(context(), map()) -> ok | need_recreate.
|
||||||
-callback close(context()) -> ok.
|
-callback close(context()) -> ok.
|
||||||
-callback delete_message(context(), topic()) -> ok.
|
-callback delete_message(context(), topic()) -> ok.
|
||||||
-callback store_retained(context(), message()) -> ok.
|
-callback store_retained(context(), message()) -> ok.
|
||||||
|
@ -180,8 +192,7 @@ stats_fun() ->
|
||||||
-spec get_basic_usage_info() -> #{retained_messages => non_neg_integer()}.
|
-spec get_basic_usage_info() -> #{retained_messages => non_neg_integer()}.
|
||||||
get_basic_usage_info() ->
|
get_basic_usage_info() ->
|
||||||
try
|
try
|
||||||
RetainedMessages = gen_server:call(?MODULE, retained_count),
|
#{retained_messages => retained_count()}
|
||||||
#{retained_messages => RetainedMessages}
|
|
||||||
catch
|
catch
|
||||||
_:_ ->
|
_:_ ->
|
||||||
#{retained_messages => 0}
|
#{retained_messages => 0}
|
||||||
|
@ -319,27 +330,25 @@ update_config(true, false, State, NewConf, _) ->
|
||||||
update_config(
|
update_config(
|
||||||
true,
|
true,
|
||||||
true,
|
true,
|
||||||
#{clear_timer := ClearTimer} = State,
|
#{clear_timer := ClearTimer, context := Context} = State,
|
||||||
NewConf,
|
NewConf,
|
||||||
OldConf
|
OldConf
|
||||||
) ->
|
) ->
|
||||||
#{
|
#{
|
||||||
backend := #{
|
backend := #{
|
||||||
type := BackendType,
|
type := BackendType
|
||||||
storage_type := StorageType
|
} = NewBackendConfig,
|
||||||
},
|
|
||||||
msg_clear_interval := ClearInterval
|
msg_clear_interval := ClearInterval
|
||||||
} = NewConf,
|
} = NewConf,
|
||||||
|
|
||||||
#{
|
#{
|
||||||
backend := #{
|
backend := #{
|
||||||
type := OldBackendType,
|
type := OldBackendType
|
||||||
storage_type := OldStorageType
|
|
||||||
}
|
}
|
||||||
} = OldConf,
|
} = OldConf,
|
||||||
SameBackendType = BackendType =:= OldBackendType,
|
SameBackendType = BackendType =:= OldBackendType,
|
||||||
SameStorageType = StorageType =:= OldStorageType,
|
Mod = get_backend_module(),
|
||||||
case SameBackendType andalso SameStorageType of
|
case SameBackendType andalso ok =:= Mod:update(Context, NewBackendConfig) of
|
||||||
true ->
|
true ->
|
||||||
State#{
|
State#{
|
||||||
clear_timer := check_timer(
|
clear_timer := check_timer(
|
||||||
|
@ -362,7 +371,7 @@ enable_retainer(
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
Context = create(BackendCfg),
|
Context = create(BackendCfg),
|
||||||
load(Context),
|
ok = load(Context),
|
||||||
State#{
|
State#{
|
||||||
enable := true,
|
enable := true,
|
||||||
context := Context,
|
context := Context,
|
||||||
|
@ -376,7 +385,7 @@ disable_retainer(
|
||||||
context := Context
|
context := Context
|
||||||
} = State
|
} = State
|
||||||
) ->
|
) ->
|
||||||
unload(),
|
ok = unload(),
|
||||||
ok = close(Context),
|
ok = close(Context),
|
||||||
State#{
|
State#{
|
||||||
enable := false,
|
enable := false,
|
||||||
|
|
|
@ -44,6 +44,9 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-type limiter() :: emqx_htb_limiter:limiter().
|
-type limiter() :: emqx_htb_limiter:limiter().
|
||||||
|
-type context() :: emqx_retainer:context().
|
||||||
|
-type topic() :: emqx_types:topic().
|
||||||
|
-type cursor() :: emqx_retainer:cursor().
|
||||||
|
|
||||||
-define(POOL, ?MODULE).
|
-define(POOL, ?MODULE).
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@
|
||||||
%% emqx_retainer callbacks
|
%% emqx_retainer callbacks
|
||||||
-export([
|
-export([
|
||||||
create/1,
|
create/1,
|
||||||
|
update/2,
|
||||||
close/1,
|
close/1,
|
||||||
delete_message/2,
|
delete_message/2,
|
||||||
store_retained/2,
|
store_retained/2,
|
||||||
|
@ -102,7 +103,7 @@ create(#{storage_type := StorageType}) ->
|
||||||
StorageType
|
StorageType
|
||||||
),
|
),
|
||||||
%% The context is not used by this backend
|
%% The context is not used by this backend
|
||||||
#{}.
|
#{storage_type => StorageType}.
|
||||||
|
|
||||||
create_table(Table, RecordName, Attributes, Type, StorageType) ->
|
create_table(Table, RecordName, Attributes, Type, StorageType) ->
|
||||||
Copies =
|
Copies =
|
||||||
|
@ -138,6 +139,11 @@ create_table(Table, RecordName, Attributes, Type, StorageType) ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
update(#{storage_type := StorageType}, #{storage_type := StorageType}) ->
|
||||||
|
ok;
|
||||||
|
update(_Context, _NewConfig) ->
|
||||||
|
need_recreate.
|
||||||
|
|
||||||
close(_Context) -> ok.
|
close(_Context) -> ok.
|
||||||
|
|
||||||
store_retained(_Context, Msg = #message{topic = Topic}) ->
|
store_retained(_Context, Msg = #message{topic = Topic}) ->
|
||||||
|
|
|
@ -19,8 +19,6 @@
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
|
|
||||||
|
|
||||||
-include("emqx_retainer.hrl").
|
-include("emqx_retainer.hrl").
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
@ -789,6 +787,11 @@ t_compatibility_for_deliver_rate(_) ->
|
||||||
Parser(DeliveryInf)
|
Parser(DeliveryInf)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
t_update_config(_) ->
|
||||||
|
OldConf = emqx_config:get([retainer]),
|
||||||
|
NewConf = emqx_utils_maps:deep_put([backend, storage_type], OldConf, disk),
|
||||||
|
emqx_retainer:update_config(NewConf).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -621,7 +621,7 @@ mock_httpc() ->
|
||||||
).
|
).
|
||||||
|
|
||||||
mock_advanced_mqtt_features() ->
|
mock_advanced_mqtt_features() ->
|
||||||
Context = undefined,
|
Context = emqx_retainer:context(),
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(N) ->
|
fun(N) ->
|
||||||
Num = integer_to_binary(N),
|
Num = integer_to_binary(N),
|
||||||
|
|
Loading…
Reference in New Issue