Merge pull request #9372 from savonarola/optimize-retainer
chore(retainer): optimize index writes
This commit is contained in:
commit
ecb2f85746
|
@ -27,6 +27,7 @@
|
|||
{emqx_prometheus,1}.
|
||||
{emqx_resource,1}.
|
||||
{emqx_retainer,1}.
|
||||
{emqx_retainer,2}.
|
||||
{emqx_rule_engine,1}.
|
||||
{emqx_shared_sub,1}.
|
||||
{emqx_slow_subs,1}.
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
|
||||
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}},
|
||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}},
|
||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.6"}}},
|
||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.7"}}},
|
||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
|
||||
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.31.2"}}},
|
||||
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
|
||||
|
|
|
@ -199,6 +199,7 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
|
|||
Reason =:= listener_disabled;
|
||||
Reason =:= quic_app_missing
|
||||
->
|
||||
?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Reason}}),
|
||||
console_print(
|
||||
"Listener ~ts is NOT started due to: ~p.~n",
|
||||
[listener_id(Type, ListenerName), Reason]
|
||||
|
@ -212,8 +213,12 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
|
|||
),
|
||||
ok;
|
||||
{error, {already_started, Pid}} ->
|
||||
?tp(listener_not_started, #{
|
||||
type => Type, bind => Bind, status => {already_started, Pid}
|
||||
}),
|
||||
{error, {already_started, Pid}};
|
||||
{error, Reason} ->
|
||||
?tp(listener_not_started, #{type => Type, bind => Bind, status => {error, Reason}}),
|
||||
ListenerId = listener_id(Type, ListenerName),
|
||||
BindStr = format_bind(Bind),
|
||||
?ELOG(
|
||||
|
|
|
@ -43,6 +43,9 @@ init_per_suite(Config) ->
|
|||
timer:seconds(100)
|
||||
),
|
||||
fun(Trace) ->
|
||||
ct:pal("listener start statuses: ~p", [
|
||||
?of_kind([listener_started, listener_not_started], Trace)
|
||||
]),
|
||||
%% more than one listener
|
||||
?assertMatch([_ | _], ?of_kind(listener_started, Trace))
|
||||
end
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
{application, emqx_retainer, [
|
||||
{description, "EMQX Retainer"},
|
||||
% strict semver, bump manually!
|
||||
{vsn, "5.0.7"},
|
||||
{vsn, "5.0.8"},
|
||||
{modules, []},
|
||||
{registered, [emqx_retainer_sup]},
|
||||
{applications, [kernel, stdlib, emqx]},
|
||||
|
|
|
@ -38,11 +38,9 @@
|
|||
|
||||
%% Internal exports (RPC)
|
||||
-export([
|
||||
do_store_retained/1,
|
||||
do_clear_expired/0,
|
||||
do_delete_message/1,
|
||||
do_populate_index_meta/1,
|
||||
do_reindex_batch/2
|
||||
do_reindex_batch/2,
|
||||
active_indices/0
|
||||
]).
|
||||
|
||||
%% Management API:
|
||||
|
@ -66,6 +64,8 @@
|
|||
-define(CLEAR_BATCH_SIZE, 1000).
|
||||
-define(REINDEX_BATCH_SIZE, 1000).
|
||||
-define(REINDEX_DISPATCH_WAIT, 30000).
|
||||
-define(REINDEX_RPC_RETRY_INTERVAL, 1000).
|
||||
-define(REINDEX_INDEX_UPDATE_WAIT, 30000).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Management API
|
||||
|
@ -136,64 +136,41 @@ create_table(Table, RecordName, Attributes, Type, StorageType) ->
|
|||
end.
|
||||
|
||||
store_retained(_, Msg = #message{topic = Topic}) ->
|
||||
case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_store_retained/1, [Msg]) of
|
||||
{atomic, ok} ->
|
||||
?tp(debug, message_retained, #{topic => Topic}),
|
||||
ok;
|
||||
{aborted, Reason} ->
|
||||
ExpiryTime = emqx_retainer:get_expiry_time(Msg),
|
||||
Tokens = topic_to_tokens(Topic),
|
||||
case is_table_full() andalso is_new_topic(Tokens) of
|
||||
true ->
|
||||
?SLOG(error, #{
|
||||
msg => "failed_to_retain_message",
|
||||
topic => Topic,
|
||||
reason => Reason
|
||||
})
|
||||
end.
|
||||
|
||||
do_store_retained(#message{topic = Topic} = Msg) ->
|
||||
ExpiryTime = emqx_retainer:get_expiry_time(Msg),
|
||||
Tokens = topic_to_tokens(Topic),
|
||||
case is_table_full() of
|
||||
reason => table_is_full
|
||||
});
|
||||
false ->
|
||||
store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
|
||||
_ ->
|
||||
case mnesia:read(?TAB_MESSAGE, Tokens, write) of
|
||||
[_] ->
|
||||
store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
|
||||
[] ->
|
||||
mnesia:abort(table_is_full)
|
||||
end
|
||||
do_store_retained(Msg, Tokens, ExpiryTime)
|
||||
end.
|
||||
|
||||
clear_expired(_) ->
|
||||
{atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear_expired/0),
|
||||
ok.
|
||||
|
||||
do_clear_expired() ->
|
||||
NowMs = erlang:system_time(millisecond),
|
||||
QH = qlc:q([
|
||||
TopicTokens
|
||||
RetainedMsg
|
||||
|| #retained_message{
|
||||
topic = TopicTokens,
|
||||
expiry_time = ExpiryTime
|
||||
} <- mnesia:table(?TAB_MESSAGE, [{lock, write}]),
|
||||
} = RetainedMsg <- ets:table(?TAB_MESSAGE),
|
||||
(ExpiryTime =/= 0) and (ExpiryTime < NowMs)
|
||||
]),
|
||||
QC = qlc:cursor(QH),
|
||||
clear_batch(db_indices(write), QC).
|
||||
clear_batch(dirty_indices(write), QC).
|
||||
|
||||
delete_message(_, Topic) ->
|
||||
{atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_delete_message/1, [Topic]),
|
||||
ok.
|
||||
|
||||
do_delete_message(Topic) ->
|
||||
Tokens = topic_to_tokens(Topic),
|
||||
case emqx_topic:wildcard(Topic) of
|
||||
false ->
|
||||
ok = delete_message_by_topic(Tokens, db_indices(write));
|
||||
ok = delete_message_by_topic(Tokens, dirty_indices(write));
|
||||
true ->
|
||||
QH = topic_search_table(Tokens),
|
||||
QH = search_table(Tokens, 0),
|
||||
qlc:fold(
|
||||
fun(TopicTokens, _) ->
|
||||
ok = delete_message_by_topic(TopicTokens, db_indices(write))
|
||||
fun(RetainedMsg, _) ->
|
||||
ok = delete_message_with_indices(RetainedMsg, dirty_indices(write))
|
||||
end,
|
||||
undefined,
|
||||
QH
|
||||
|
@ -206,7 +183,7 @@ read_message(_, Topic) ->
|
|||
match_messages(_, Topic, undefined) ->
|
||||
Tokens = topic_to_tokens(Topic),
|
||||
Now = erlang:system_time(millisecond),
|
||||
QH = search_table(Tokens, Now),
|
||||
QH = msg_table(search_table(Tokens, Now)),
|
||||
case batch_read_number() of
|
||||
all_remaining ->
|
||||
{ok, qlc:eval(QH), undefined};
|
||||
|
@ -227,10 +204,10 @@ page_read(_, Topic, Page, Limit) ->
|
|||
QH =
|
||||
case Topic of
|
||||
undefined ->
|
||||
search_table(undefined, ['#'], Now);
|
||||
msg_table(search_table(undefined, ['#'], Now));
|
||||
_ ->
|
||||
Tokens = topic_to_tokens(Topic),
|
||||
search_table(Tokens, Now)
|
||||
msg_table(search_table(Tokens, Now))
|
||||
end,
|
||||
OrderedQH = qlc:sort(QH, {order, fun compare_message/2}),
|
||||
Cursor = qlc:cursor(OrderedQH),
|
||||
|
@ -281,49 +258,49 @@ reindex_status() ->
|
|||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
store_retained(Indices, Msg, Tokens, ExpiryTime) ->
|
||||
ok = store_retained_message(Msg, Tokens, ExpiryTime),
|
||||
ok = emqx_retainer_index:foreach_index_key(
|
||||
fun(Key) -> store_retained_index(Key, ExpiryTime) end,
|
||||
Indices,
|
||||
Tokens
|
||||
).
|
||||
do_store_retained(Msg, TopicTokens, ExpiryTime) ->
|
||||
%% Retained message is stored syncronously on all core nodes
|
||||
ok = do_store_retained_message(Msg, TopicTokens, ExpiryTime),
|
||||
%% Since retained message was stored syncronously on all core nodes,
|
||||
%% now we are sure that
|
||||
%% * either we will write correct indices
|
||||
%% * or if we a replicant with outdated write indices due to reindexing,
|
||||
%% the correct indices will be added by reindexing
|
||||
ok = do_store_retained_indices(TopicTokens, ExpiryTime).
|
||||
|
||||
store_retained_message(Msg, Tokens, ExpiryTime) ->
|
||||
do_store_retained_message(Msg, TopicTokens, ExpiryTime) ->
|
||||
RetainedMessage = #retained_message{
|
||||
topic = Tokens,
|
||||
topic = TopicTokens,
|
||||
msg = Msg,
|
||||
expiry_time = ExpiryTime
|
||||
},
|
||||
mnesia:write(?TAB_MESSAGE, RetainedMessage, write).
|
||||
ok = mria:dirty_write_sync(?TAB_MESSAGE, RetainedMessage).
|
||||
|
||||
store_retained_index(Key, ExpiryTime) ->
|
||||
do_store_retained_indices(TopicTokens, ExpiryTime) ->
|
||||
Indices = dirty_indices(write),
|
||||
ok = emqx_retainer_index:foreach_index_key(
|
||||
fun(Key) -> do_store_retained_index(Key, ExpiryTime) end,
|
||||
Indices,
|
||||
TopicTokens
|
||||
).
|
||||
|
||||
do_store_retained_index(Key, ExpiryTime) ->
|
||||
RetainedIndex = #retained_index{
|
||||
key = Key,
|
||||
expiry_time = ExpiryTime
|
||||
},
|
||||
mnesia:write(?TAB_INDEX, RetainedIndex, write).
|
||||
mria:dirty_write(?TAB_INDEX, RetainedIndex).
|
||||
|
||||
topic_search_table(Tokens) ->
|
||||
Index = emqx_retainer_index:select_index(Tokens, db_indices(read)),
|
||||
topic_search_table(Index, Tokens).
|
||||
|
||||
topic_search_table(undefined, Tokens) ->
|
||||
Cond = emqx_retainer_index:condition(Tokens),
|
||||
Ms = [{#retained_message{topic = Cond, msg = '_', expiry_time = '_'}, [], ['$_']}],
|
||||
MsgQH = mnesia:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]),
|
||||
qlc:q([Topic || #retained_message{topic = Topic} <- MsgQH]);
|
||||
topic_search_table(Index, Tokens) ->
|
||||
Cond = emqx_retainer_index:condition(Index, Tokens),
|
||||
Ms = [{#retained_index{key = Cond, expiry_time = '_'}, [], ['$_']}],
|
||||
IndexQH = mnesia:table(?TAB_INDEX, [{traverse, {select, Ms}}]),
|
||||
msg_table(SearchTable) ->
|
||||
qlc:q([
|
||||
emqx_retainer_index:restore_topic(Key)
|
||||
|| #retained_index{key = Key} <- IndexQH
|
||||
Msg
|
||||
|| #retained_message{
|
||||
msg = Msg
|
||||
} <- SearchTable
|
||||
]).
|
||||
|
||||
search_table(Tokens, Now) ->
|
||||
Indices = dirty_read_indices(),
|
||||
Indices = dirty_indices(read),
|
||||
Index = emqx_retainer_index:select_index(Tokens, Indices),
|
||||
search_table(Index, Tokens, Now).
|
||||
|
||||
|
@ -341,26 +318,21 @@ search_table(Index, Tokens, Now) ->
|
|||
|| TopicTokens <- Topics
|
||||
]),
|
||||
qlc:q([
|
||||
Msg
|
||||
RetainedMsg
|
||||
|| [
|
||||
#retained_message{
|
||||
msg = Msg,
|
||||
expiry_time = ExpiryTime
|
||||
}
|
||||
} = RetainedMsg
|
||||
] <- RetainedMsgQH,
|
||||
(ExpiryTime == 0) or (ExpiryTime > Now)
|
||||
]).
|
||||
|
||||
dirty_read_indices() ->
|
||||
case ets:lookup(?TAB_INDEX_META, ?META_KEY) of
|
||||
[#retained_index_meta{read_indices = ReadIndices}] -> ReadIndices;
|
||||
[] -> []
|
||||
end.
|
||||
|
||||
clear_batch(Indices, QC) ->
|
||||
{Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE),
|
||||
lists:foreach(
|
||||
fun(TopicTokens) -> delete_message_by_topic(TopicTokens, Indices) end,
|
||||
fun(RetainedMsg) ->
|
||||
delete_message_with_indices(RetainedMsg, Indices)
|
||||
end,
|
||||
Rows
|
||||
),
|
||||
case Result of
|
||||
|
@ -369,14 +341,23 @@ clear_batch(Indices, QC) ->
|
|||
end.
|
||||
|
||||
delete_message_by_topic(TopicTokens, Indices) ->
|
||||
case mnesia:dirty_read(?TAB_MESSAGE, TopicTokens) of
|
||||
[] -> ok;
|
||||
[RetainedMsg] -> delete_message_with_indices(RetainedMsg, Indices)
|
||||
end.
|
||||
|
||||
delete_message_with_indices(RetainedMsg, Indices) ->
|
||||
#retained_message{topic = TopicTokens, expiry_time = ExpiryTime} = RetainedMsg,
|
||||
ok = emqx_retainer_index:foreach_index_key(
|
||||
fun(Key) ->
|
||||
mnesia:delete({?TAB_INDEX, Key})
|
||||
mria:dirty_delete_object(?TAB_INDEX, #retained_index{
|
||||
key = Key, expiry_time = ExpiryTime
|
||||
})
|
||||
end,
|
||||
Indices,
|
||||
TopicTokens
|
||||
),
|
||||
ok = mnesia:delete({?TAB_MESSAGE, TopicTokens}).
|
||||
ok = mria:dirty_delete_object(?TAB_MESSAGE, RetainedMsg).
|
||||
|
||||
compare_message(M1, M2) ->
|
||||
M1#message.timestamp =< M2#message.timestamp.
|
||||
|
@ -415,20 +396,26 @@ qlc_next_answers(QC, N) ->
|
|||
|
||||
make_message_match_spec(Tokens, NowMs) ->
|
||||
Cond = emqx_retainer_index:condition(Tokens),
|
||||
MsHd = #retained_message{topic = Cond, msg = '$2', expiry_time = '$3'},
|
||||
[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$2']}].
|
||||
MsHd = #retained_message{topic = Cond, msg = '_', expiry_time = '$3'},
|
||||
[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
|
||||
|
||||
make_index_match_spec(Index, Tokens, NowMs) ->
|
||||
Cond = emqx_retainer_index:condition(Index, Tokens),
|
||||
MsHd = #retained_index{key = Cond, expiry_time = '$3'},
|
||||
[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
|
||||
|
||||
-spec is_table_full() -> boolean().
|
||||
is_table_full() ->
|
||||
Limit = emqx:get_config([retainer, backend, max_retained_messages]),
|
||||
Limit > 0 andalso (table_size() >= Limit).
|
||||
|
||||
-spec table_size() -> non_neg_integer().
|
||||
is_new_topic(Tokens) ->
|
||||
case mnesia:dirty_read(?TAB_MESSAGE, Tokens) of
|
||||
[_] ->
|
||||
false;
|
||||
[] ->
|
||||
true
|
||||
end.
|
||||
|
||||
table_size() ->
|
||||
mnesia:table_info(?TAB_MESSAGE, size).
|
||||
|
||||
|
@ -486,8 +473,14 @@ do_populate_index_meta(ConfigIndices) ->
|
|||
)
|
||||
end.
|
||||
|
||||
dirty_indices(Type) ->
|
||||
indices(ets:lookup(?TAB_INDEX_META, ?META_KEY), Type).
|
||||
|
||||
db_indices(Type) ->
|
||||
case mnesia:read(?TAB_INDEX_META, ?META_KEY) of
|
||||
indices(mnesia:read(?TAB_INDEX_META, ?META_KEY), Type).
|
||||
|
||||
indices(IndexRecords, Type) ->
|
||||
case IndexRecords of
|
||||
[#retained_index_meta{read_indices = ReadIndices, write_indices = WriteIndices}] ->
|
||||
case Type of
|
||||
read -> ReadIndices;
|
||||
|
@ -506,10 +499,15 @@ batch_read_number() ->
|
|||
reindex(NewIndices, Force, StatusFun) when
|
||||
is_boolean(Force) andalso is_function(StatusFun, 1)
|
||||
->
|
||||
%% Do not run on replicants
|
||||
core = mria_rlog:role(),
|
||||
%% Disable read indices and update write indices so that new records are written
|
||||
%% with correct indices. Also block parallel reindexing.
|
||||
case try_start_reindex(NewIndices, Force) of
|
||||
{atomic, ok} ->
|
||||
%% Wait for all nodes to have new indices, including rlog nodes
|
||||
true = wait_indices_updated({[], NewIndices}, ?REINDEX_INDEX_UPDATE_WAIT),
|
||||
|
||||
%% Wait for all dispatch operations to be completed to avoid
|
||||
%% inconsistent results.
|
||||
true = wait_dispatch_complete(?REINDEX_DISPATCH_WAIT),
|
||||
|
@ -592,7 +590,7 @@ reindex_topic(Indices, Topic) ->
|
|||
case mnesia:read(?TAB_MESSAGE, Topic, read) of
|
||||
[#retained_message{expiry_time = ExpiryTime}] ->
|
||||
ok = emqx_retainer_index:foreach_index_key(
|
||||
fun(Key) -> store_retained_index(Key, ExpiryTime) end,
|
||||
fun(Key) -> do_store_retained_index(Key, ExpiryTime) end,
|
||||
Indices,
|
||||
Topic
|
||||
);
|
||||
|
@ -627,8 +625,35 @@ do_reindex_batch(QC, Done) ->
|
|||
|
||||
wait_dispatch_complete(Timeout) ->
|
||||
Nodes = mria_mnesia:running_nodes(),
|
||||
{Results, []} = emqx_retainer_proto_v1:wait_dispatch_complete(Nodes, Timeout),
|
||||
{Results, []} = emqx_retainer_proto_v2:wait_dispatch_complete(Nodes, Timeout),
|
||||
lists:all(
|
||||
fun(Result) -> Result =:= ok end,
|
||||
Results
|
||||
).
|
||||
|
||||
wait_indices_updated(_Indices, TimeLeft) when TimeLeft < 0 -> false;
|
||||
wait_indices_updated(Indices, TimeLeft) ->
|
||||
case timer:tc(fun() -> are_indices_updated(Indices) end) of
|
||||
{_, true} ->
|
||||
true;
|
||||
{TimePassed, false} ->
|
||||
timer:sleep(?REINDEX_RPC_RETRY_INTERVAL),
|
||||
wait_indices_updated(
|
||||
Indices, TimeLeft - ?REINDEX_RPC_RETRY_INTERVAL - TimePassed / 1000
|
||||
)
|
||||
end.
|
||||
|
||||
active_indices() ->
|
||||
{dirty_indices(read), dirty_indices(write)}.
|
||||
|
||||
are_indices_updated(Indices) ->
|
||||
Nodes = mria_mnesia:running_nodes(),
|
||||
case emqx_retainer_proto_v2:active_mnesia_indices(Nodes) of
|
||||
{Results, []} ->
|
||||
lists:all(
|
||||
fun(NodeIndices) -> NodeIndices =:= Indices end,
|
||||
Results
|
||||
);
|
||||
_ ->
|
||||
false
|
||||
end.
|
||||
|
|
|
@ -50,11 +50,39 @@ retainer(["reindex", "status"]) ->
|
|||
retainer(["reindex", "start"]) ->
|
||||
retainer(["reindex", "start", "false"]);
|
||||
retainer(["reindex", "start", ForceParam]) ->
|
||||
Force =
|
||||
case ForceParam of
|
||||
"true" -> true;
|
||||
_ -> false
|
||||
end,
|
||||
case mria_rlog:role() of
|
||||
core ->
|
||||
Force =
|
||||
case ForceParam of
|
||||
"true" -> true;
|
||||
_ -> false
|
||||
end,
|
||||
do_reindex(Force);
|
||||
replicant ->
|
||||
?PRINT_MSG("Can't run reindex on a replicant node")
|
||||
end;
|
||||
retainer(_) ->
|
||||
emqx_ctl:usage(
|
||||
[
|
||||
{"retainer info", "Show the count of retained messages"},
|
||||
{"retainer topics", "Show all topics of retained messages"},
|
||||
{"retainer clean", "Clean all retained messages"},
|
||||
{"retainer clean <Topic>", "Clean retained messages by the specified topic filter"},
|
||||
{"retainer reindex status", "Show reindex status"},
|
||||
{"retainer reindex start [force]",
|
||||
"Generate new retainer topic indices from config settings.\n"
|
||||
"Pass true as <Force> to ignore previously started reindexing"}
|
||||
]
|
||||
).
|
||||
|
||||
unload() ->
|
||||
ok = emqx_ctl:unregister_command(retainer).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Private
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
do_reindex(Force) ->
|
||||
?PRINT_MSG("Starting reindexing~n"),
|
||||
emqx_retainer_mnesia:reindex(
|
||||
Force,
|
||||
|
@ -69,20 +97,4 @@ retainer(["reindex", "start", ForceParam]) ->
|
|||
?PRINT("Reindexed ~p messages~n", [Done])
|
||||
end
|
||||
),
|
||||
?PRINT_MSG("Reindexing finished~n");
|
||||
retainer(_) ->
|
||||
emqx_ctl:usage(
|
||||
[
|
||||
{"retainer info", "Show the count of retained messages"},
|
||||
{"retainer topics", "Show all topics of retained messages"},
|
||||
{"retainer clean", "Clean all retained messages"},
|
||||
{"retainer clean <Topic>", "Clean retained messages by the specified topic filter"},
|
||||
{"retainer reindex status", "Show reindex status"},
|
||||
{"retainer reindex start [force]",
|
||||
"Generate new retainer topic indices config settings.\n"
|
||||
"Pass true as <Force> to ignore previously started reindexing"}
|
||||
]
|
||||
).
|
||||
|
||||
unload() ->
|
||||
ok = emqx_ctl:unregister_command(retainer).
|
||||
?PRINT_MSG("Reindexing finished~n").
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_retainer_proto_v2).
|
||||
|
||||
-behaviour(emqx_bpapi).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
-export([
|
||||
introduced_in/0,
|
||||
wait_dispatch_complete/2,
|
||||
active_mnesia_indices/1
|
||||
]).
|
||||
|
||||
-define(TIMEOUT, 5000).
|
||||
|
||||
introduced_in() ->
|
||||
"5.0.13".
|
||||
|
||||
-spec wait_dispatch_complete(list(node()), timeout()) -> emqx_rpc:multicall_result(ok).
|
||||
wait_dispatch_complete(Nodes, Timeout) ->
|
||||
rpc:multicall(Nodes, emqx_retainer_dispatcher, wait_dispatch_complete, [Timeout]).
|
||||
|
||||
-spec active_mnesia_indices(list(node())) ->
|
||||
emqx_rpc:multicall_result({list(emqx_retainer_index:index()), list(emqx_retainer_index:index())}).
|
||||
active_mnesia_indices(Nodes) ->
|
||||
rpc:multicall(Nodes, emqx_retainer_mnesia, active_indices, [], ?TIMEOUT).
|
|
@ -318,6 +318,25 @@ t_message_expiry_2(_) ->
|
|||
end,
|
||||
with_conf(ConfMod, Case).
|
||||
|
||||
t_table_full(_) ->
|
||||
ConfMod = fun(Conf) ->
|
||||
Conf#{<<"backend">> => #{<<"max_retained_messages">> => <<"1">>}}
|
||||
end,
|
||||
Case = fun() ->
|
||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
emqtt:publish(C1, <<"retained/t/1">>, <<"a">>, [{qos, 0}, {retain, true}]),
|
||||
emqtt:publish(C1, <<"retained/t/2">>, <<"b">>, [{qos, 0}, {retain, true}]),
|
||||
|
||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/t/1">>, [{qos, 0}, {rh, 0}]),
|
||||
?assertEqual(1, length(receive_messages(1))),
|
||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/t/2">>, [{qos, 0}, {rh, 0}]),
|
||||
?assertEqual(0, length(receive_messages(1))),
|
||||
|
||||
ok = emqtt:disconnect(C1)
|
||||
end,
|
||||
with_conf(ConfMod, Case).
|
||||
|
||||
t_clean(_) ->
|
||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
- Add more PSK ciphers support [#9505](https://github.com/emqx/emqx/pull/9505).
|
||||
|
||||
- Improve `emqx_retainer` write performance: get rid of transactions on write [#9372](https://github.com/emqx/emqx/pull/9372).
|
||||
|
||||
## Bug fixes
|
||||
|
||||
- Fix that the obsolete SSL files aren't deleted after the ExHook config update [#9432](https://github.com/emqx/emqx/pull/9432).
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
|
||||
- 支持更多的 PSK 密码套件[#9505](https://github.com/emqx/emqx/pull/9505)。
|
||||
|
||||
- 提高 `emqx_retainer` 写入性能:摆脱写入时的事务 [#9372](https://github.com/emqx/emqx/pull/9372)。
|
||||
|
||||
## 修复
|
||||
|
||||
- 修复 ExHook 更新 SSL 相关配置后,过时的 SSL 文件没有被删除的问题 [#9432](https://github.com/emqx/emqx/pull/9432)。
|
||||
|
|
2
mix.exs
2
mix.exs
|
@ -52,7 +52,7 @@ defmodule EMQXUmbrella.MixProject do
|
|||
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
|
||||
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
|
||||
{:esockd, github: "emqx/esockd", tag: "5.9.4", override: true},
|
||||
{:ekka, github: "emqx/ekka", tag: "0.13.6", override: true},
|
||||
{:ekka, github: "emqx/ekka", tag: "0.13.7", override: true},
|
||||
{:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
|
||||
{:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
|
||||
{:minirest, github: "emqx/minirest", tag: "1.3.7", override: true},
|
||||
|
|
|
@ -54,7 +54,7 @@
|
|||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
|
||||
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}
|
||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.6"}}}
|
||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.7"}}}
|
||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
|
||||
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
|
||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}}
|
||||
|
|
Loading…
Reference in New Issue