diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions
index 55b4b9ecd..9f1373f5a 100644
--- a/apps/emqx/priv/bpapi.versions
+++ b/apps/emqx/priv/bpapi.versions
@@ -23,3 +23,4 @@
{emqx_topic_metrics,1}.
{emqx_delayed,1}.
{emqx_mgmt_cluster,1}.
+{emqx_retainer,1}.
diff --git a/apps/emqx/src/emqx_rpc.erl b/apps/emqx/src/emqx_rpc.erl
index b8d74fd3a..590730e66 100644
--- a/apps/emqx/src/emqx_rpc.erl
+++ b/apps/emqx/src/emqx_rpc.erl
@@ -51,11 +51,13 @@
-type badrpc() :: {badrpc, term()} | {badtcp, term()}.
--type call_result() :: term() | badrpc().
+-type call_result(Result) :: Result | badrpc().
+
+-type call_result() :: call_result(term()).
-type cast_result() :: true.
--type multicall_result(Result) :: {[Result], _BadNodes :: [node()]}.
+-type multicall_result(Result) :: {[call_result(Result)], _BadNodes :: [node()]}.
-type multicall_result() :: multicall_result(term()).
diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl
index cda953a76..2169f42ba 100644
--- a/apps/emqx/test/emqx_common_test_helpers.erl
+++ b/apps/emqx/test/emqx_common_test_helpers.erl
@@ -488,8 +488,9 @@ is_tcp_server_available(Host, Port, Timeout) ->
start_ekka() ->
try mnesia_hook:module_info() of
_ -> ekka:start()
- catch _:_ ->
- %% Falling back to using Mnesia DB backend.
- application:set_env(mria, db_backend, mnesia),
- ekka:start()
+ catch
+ _:_ ->
+ %% Falling back to using Mnesia DB backend.
+ application:set_env(mria, db_backend, mnesia),
+ ekka:start()
end.
diff --git a/apps/emqx/test/emqx_config_handler_SUITE.erl b/apps/emqx/test/emqx_config_handler_SUITE.erl
index 72e3ed62f..ae34bee7a 100644
--- a/apps/emqx/test/emqx_config_handler_SUITE.erl
+++ b/apps/emqx/test/emqx_config_handler_SUITE.erl
@@ -223,7 +223,9 @@ t_callback_crash(_Config) ->
Opts = #{rawconf_with_defaults => true},
ok = emqx_config_handler:add_handler(CrashPath, ?MODULE),
Old = emqx:get_raw_config(CrashPath),
- ?assertMatch({error, {config_update_crashed, _}}, emqx:update_config(CrashPath, <<"89%">>, Opts)),
+ ?assertMatch(
+ {error, {config_update_crashed, _}}, emqx:update_config(CrashPath, <<"89%">>, Opts)
+ ),
New = emqx:get_raw_config(CrashPath),
?assertEqual(Old, New),
ok = emqx_config_handler:remove_handler(CrashPath),
diff --git a/apps/emqx_retainer/include/emqx_retainer.hrl b/apps/emqx_retainer/include/emqx_retainer.hrl
index 95d5eb9fb..e5d8a3112 100644
--- a/apps/emqx_retainer/include/emqx_retainer.hrl
+++ b/apps/emqx_retainer/include/emqx_retainer.hrl
@@ -17,7 +17,9 @@
-include_lib("emqx/include/emqx.hrl").
-define(APP, emqx_retainer).
--define(TAB, ?APP).
+-define(TAB_MESSAGE, emqx_retainer_message).
+-define(TAB_INDEX, emqx_retainer_index).
+-define(TAB_INDEX_META, emqx_retainer_index_meta).
-define(RETAINER_SHARD, emqx_retainer_shard).
-type topic() :: binary().
diff --git a/apps/emqx_retainer/src/emqx_retainer_app.erl b/apps/emqx_retainer/src/emqx_retainer_app.erl
index 2a7620664..2285d4551 100644
--- a/apps/emqx_retainer/src/emqx_retainer_app.erl
+++ b/apps/emqx_retainer/src/emqx_retainer_app.erl
@@ -24,7 +24,9 @@
]).
start(_Type, _Args) ->
+ ok = emqx_retainer_mnesia_cli:load(),
emqx_retainer_sup:start_link().
stop(_State) ->
+ ok = emqx_retainer_mnesia_cli:unload(),
ok.
diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl
index 1bdf11432..1c39958b2 100644
--- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl
+++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl
@@ -26,6 +26,7 @@
start_link/2,
dispatch/2,
refresh_limiter/0,
+ wait_dispatch_complete/1,
worker/0
]).
@@ -61,6 +62,15 @@ refresh_limiter() ->
Workers
).
+wait_dispatch_complete(Timeout) ->
+ Workers = gproc_pool:active_workers(?POOL),
+ lists:foreach(
+ fun({_, Pid}) ->
+ ok = gen_server:call(Pid, ?FUNCTION_NAME, Timeout)
+ end,
+ Workers
+ ).
+
worker() ->
gproc_pool:pick_worker(?POOL, self()).
@@ -120,6 +130,8 @@ init([Pool, Id]) ->
| {noreply, NewState :: term(), hibernate}
| {stop, Reason :: term(), Reply :: term(), NewState :: term()}
| {stop, Reason :: term(), NewState :: term()}.
+handle_call(wait_dispatch_complete, _From, State) ->
+ {reply, ok, State};
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}.
diff --git a/apps/emqx_retainer/src/emqx_retainer_index.erl b/apps/emqx_retainer/src/emqx_retainer_index.erl
new file mode 100644
index 000000000..0d739e2b0
--- /dev/null
+++ b/apps/emqx_retainer/src/emqx_retainer_index.erl
@@ -0,0 +1,194 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_retainer_index).
+
+-export([
+ foreach_index_key/3,
+ to_index_key/2,
+ index_score/2,
+ select_index/2,
+ condition/1,
+ condition/2,
+ restore_topic/1
+]).
+
+-export_type([index/0]).
+
+-type index() :: list(pos_integer()).
+
+%% @doc Index key is a term that can be effectively searched in the index table.
+-type index_key() :: {index(), {emqx_topic:words(), emqx_topic:words()}}.
+
+-type match_pattern_part() :: term().
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+%% @doc Given words of a concrete topic (`Tokens') and a list of `Indices',
+%% constructs index keys for the topic and each of the indices.
+%% `Fun' is called with each of these keys.
+-spec foreach_index_key(fun((index_key()) -> any()), list(index()), emqx_topic:words()) -> ok.
+foreach_index_key(_Fun, [], _Tokens) ->
+ ok;
+foreach_index_key(Fun, [Index | Indices], Tokens) ->
+ Key = to_index_key(Index, Tokens),
+ _ = Fun(Key),
+ foreach_index_key(Fun, Indices, Tokens).
+
+%% @doc Given a concrete topic and an index
+%% returns the corresponding index key.
+%%
+%% In an index key words from indexed and unindexed positions are split.
+%%
+%% E.g given `[2, 3]' index and `[<<"a">>, <<"b">>, <<"c">>, <<"d">>]' topic,
+%% returns `{[2, 3], {[<<"b">>, <<"c">>], [<<"a">>, <<"d">>]}}' term.
+%%
+%% @see foreach_index_key/3
+-spec to_index_key(index(), emqx_topic:words()) -> index_key().
+to_index_key(Index, Tokens) ->
+ {Index, split_index_tokens(Index, Tokens, 1, [], [])}.
+
+%% @doc Given an index and a wildcard topic
+%% returns the length of the constant prefix of the
+%% according index key.
+%%
+%% E.g. for `[2,3]' index and ['+', <<"b">>, '+', <<"d">>]
wildcard topic
+%% the score is `1', because the according index key pattern is
+%% {[<<"b">>, '_'], ['_', <<"d">>]}
.
+%%
+%% @see foreach_index_key/3
+%% @see to_index_key/2
+-spec index_score(index(), emqx_topic:words()) -> non_neg_integer().
+index_score(Index, Tokens) ->
+ index_score(Index, Tokens, 1, 0).
+
+%% @doc Given a list of indices and a wildcard topic
+%% returns index with the best score.
+%%
+%% Returns `undefined' if there are no indices with score `> 0'.
+%%
+%% @see index_score/2
+-spec select_index(emqx:words(), list(index())) -> index() | undefined.
+select_index(Tokens, Indices) ->
+ select_index(Tokens, Indices, 0, undefined).
+
+%% @doc For an index and a wildcard topic
+%% returns a matchspec pattern for the corresponding index key.
+%%
+%% E.g. for `[2, 3]' index and ['+', <<"b">>, '+', <<"d">>]
wildcard topic
+%% returns {[2, 3], {[<<"b">>, '_'], ['_', <<"d">>]}}
pattern.
+-spec condition(index(), emqx_topic:words()) -> match_pattern_part().
+condition(Index, Tokens) ->
+ {Index, condition(Index, Tokens, 1, [], [])}.
+
+%% @doc Returns a matchspec pattern for a wildcard topic.
+%%
+%% E.g. for ['+', <<"b">>, '+', <<"d">>, '#']
wildcard topic
+%% returns ['_', <<"b">>, '_', <<"d">> | '_']
pattern.
+-spec condition(emqx_topic:words()) -> match_pattern_part().
+condition(Tokens) ->
+ Tokens1 = [
+ case W =:= '+' of
+ true -> '_';
+ _ -> W
+ end
+ || W <- Tokens
+ ],
+ case length(Tokens1) > 0 andalso lists:last(Tokens1) =:= '#' of
+ false -> Tokens1;
+ _ -> (Tokens1 -- ['#']) ++ '_'
+ end.
+
+%% @doc Restores concrete topic from its index key representation.
+%%
+%% E.g given `{[2, 3], {[<<"b">>, <<"c">>], [<<"a">>, <<"d">>]}}' index key
+%% returns `[<<"a">>, <<"b">>, <<"c">>, <<"d">>]' topic.
+-spec restore_topic(index_key()) -> emqx_topic:words().
+restore_topic({Index, {IndexTokens, OtherTokens}}) ->
+ restore_topic(Index, IndexTokens, OtherTokens, 1, []).
+
+%%--------------------------------------------------------------------
+%% Private
+%%--------------------------------------------------------------------
+
+split_index_tokens([NIndex | OtherIndex], [Token | Tokens], N, IndexTokens, OtherTokens) when
+ NIndex == N
+->
+ split_index_tokens(OtherIndex, Tokens, N + 1, [Token | IndexTokens], OtherTokens);
+split_index_tokens([_NIndex | _] = Index, [Token | Tokens], N, IndexTokens, OtherTokens) ->
+ split_index_tokens(Index, Tokens, N + 1, IndexTokens, [Token | OtherTokens]);
+split_index_tokens([], Tokens, _N, IndexTokens, OtherTokens) ->
+ {lists:reverse(IndexTokens), lists:reverse(OtherTokens) ++ Tokens};
+split_index_tokens(_Index, [], _N, IndexTokens, OtherTokens) ->
+ {lists:reverse(IndexTokens), lists:reverse(OtherTokens)}.
+
+index_score([N | _Index], [Ph | _Tokens], N, Score) when
+ Ph =:= '+'; Ph =:= '#'
+->
+ Score;
+index_score([N | Index], [_Word | Tokens], N, Score) ->
+ index_score(Index, Tokens, N + 1, Score + 1);
+index_score(Index, [_Word | Tokens], N, Score) ->
+ index_score(Index, Tokens, N + 1, Score);
+index_score([], _Tokens, _N, Score) ->
+ Score;
+index_score(_Index, [], _N, Score) ->
+ Score.
+
+select_index(_Tokens, [], _MaxScore, SelectedIndex) ->
+ SelectedIndex;
+select_index(Tokens, [Index | Indices], MaxScore, SelectedIndex) ->
+ Score = index_score(Index, Tokens),
+ case Score > MaxScore of
+ true ->
+ select_index(Tokens, Indices, Score, Index);
+ false ->
+ select_index(Tokens, Indices, MaxScore, SelectedIndex)
+ end.
+
+condition([_NIndex | _OtherIndex], ['#' | _OtherTokens], _N, IndexMatch, OtherMatch) ->
+ {lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch) ++ '_'};
+condition([], ['#' | _OtherTokens], _N, IndexMatch, OtherMatch) ->
+ {lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ '_'};
+condition([], Tokens, _N, IndexMatch, OtherMatch) ->
+ {lists:reverse(IndexMatch), lists:reverse(OtherMatch) ++ condition(Tokens)};
+condition([_NIndex | _OtherIndex], [], _N, IndexMatch, OtherMatch) ->
+ {lists:reverse(IndexMatch) ++ '_', lists:reverse(OtherMatch)};
+condition([NIndex | OtherIndex], ['+' | OtherTokens], N, IndexMatch, OtherMatch) when
+ NIndex =:= N
+->
+ condition(OtherIndex, OtherTokens, N + 1, ['_' | IndexMatch], OtherMatch);
+condition(Index, ['+' | OtherTokens], N, IndexMatch, OtherMatch) ->
+ condition(Index, OtherTokens, N + 1, IndexMatch, ['_' | OtherMatch]);
+condition([NIndex | OtherIndex], [Token | OtherTokens], N, IndexMatch, OtherMatch) when
+ NIndex =:= N, is_binary(Token)
+->
+ condition(OtherIndex, OtherTokens, N + 1, [Token | IndexMatch], OtherMatch);
+condition(Index, [Token | OtherTokens], N, IndexMatch, OtherMatch) when
+ is_binary(Token)
+->
+ condition(Index, OtherTokens, N + 1, IndexMatch, [Token | OtherMatch]).
+
+restore_topic(_Index, [], OtherTokens, _N, Tokens) ->
+ lists:reverse(Tokens) ++ OtherTokens;
+restore_topic([NIndex | OtherIndex], [IndexToken | OtherIndexTokens], OtherTokens, N, Tokens) when
+ NIndex =:= N
+->
+ restore_topic(OtherIndex, OtherIndexTokens, OtherTokens, N + 1, [IndexToken | Tokens]);
+restore_topic(OtherIndex, IndexTokens, [Token | OtherTokens], N, Tokens) ->
+ restore_topic(OtherIndex, IndexTokens, OtherTokens, N + 1, [Token | Tokens]).
diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl
index fd76442b9..235d30cc0 100644
--- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl
+++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl
@@ -20,6 +20,7 @@
-include("emqx_retainer.hrl").
-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -36,15 +37,52 @@
-export([create_resource/1]).
--record(retained, {topic, msg, expiry_time}).
+-export([reindex/2, reindex_status/0]).
--type batch_read_result() ::
- {ok, list(emqx:message()), cursor()}.
+-ifdef(TEST).
+-export([populate_index_meta/0]).
+-export([reindex/3]).
+-endif.
+
+-record(retained_message, {topic, msg, expiry_time}).
+-record(retained_index, {key, expiry_time}).
+-record(retained_index_meta, {key, read_indices, write_indices, reindexing, extra}).
+
+-define(META_KEY, index_meta).
+
+-define(CLEAR_BATCH_SIZE, 1000).
+-define(REINDEX_BATCH_SIZE, 1000).
+-define(REINDEX_DISPATCH_WAIT, 30000).
%%--------------------------------------------------------------------
-%% emqx_retainer_storage callbacks
+%% emqx_retainer callbacks
%%--------------------------------------------------------------------
+
create_resource(#{storage_type := StorageType}) ->
+ ok = create_table(
+ ?TAB_INDEX_META,
+ retained_index_meta,
+ record_info(fields, retained_index_meta),
+ set,
+ StorageType
+ ),
+ ok = populate_index_meta(),
+ ok = create_table(
+ ?TAB_MESSAGE,
+ retained_message,
+ record_info(fields, retained_message),
+ ordered_set,
+ StorageType
+ ),
+ ok = create_table(
+ ?TAB_INDEX,
+ retained_index,
+ record_info(fields, retained_index),
+ ordered_set,
+ StorageType
+ ).
+
+create_table(Table, RecordName, Attributes, Type, StorageType) ->
Copies =
case StorageType of
ram -> ram_copies;
@@ -60,221 +98,321 @@ create_resource(#{storage_type := StorageType}) ->
{dets, [{auto_save, 1000}]}
],
- ok = mria:create_table(?TAB, [
- {type, ordered_set},
+ ok = mria:create_table(Table, [
+ {type, Type},
{rlog_shard, ?RETAINER_SHARD},
{storage, Copies},
- {record_name, retained},
- {attributes, record_info(fields, retained)},
+ {record_name, RecordName},
+ {attributes, Attributes},
{storage_properties, StoreProps}
]),
ok = mria_rlog:wait_for_shards([?RETAINER_SHARD], infinity),
- case mnesia:table_info(?TAB, storage_type) of
+ case mnesia:table_info(Table, storage_type) of
Copies ->
ok;
_Other ->
- {atomic, ok} = mnesia:change_table_copy_type(?TAB, node(), Copies),
+ {atomic, ok} = mnesia:change_table_copy_type(Table, node(), Copies),
ok
end.
-store_retained(_, Msg = #message{topic = Topic}) ->
+store_retained(_, #message{topic = Topic} = Msg) ->
ExpiryTime = emqx_retainer:get_expiry_time(Msg),
- case is_table_full() of
- false ->
- mria:dirty_write(
- ?TAB,
- #retained{
- topic = topic2tokens(Topic),
- msg = Msg,
- expiry_time = ExpiryTime
- }
- );
- _ ->
- Tokens = topic2tokens(Topic),
- Fun = fun() ->
- case mnesia:read(?TAB, Tokens) of
- [_] ->
- mnesia:write(
- ?TAB,
- #retained{
- topic = Tokens,
- msg = Msg,
- expiry_time = ExpiryTime
- },
- write
- );
- [] ->
- mnesia:abort(table_is_full)
+ Tokens = topic_to_tokens(Topic),
+ Fun =
+ case is_table_full() of
+ false ->
+ fun() ->
+ store_retained(db_indices(write), Msg, Tokens, ExpiryTime)
+ end;
+ _ ->
+ fun() ->
+ case mnesia:read(?TAB_MESSAGE, Tokens, write) of
+ [_] ->
+ store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
+ [] ->
+ mnesia:abort(table_is_full)
+ end
end
- end,
- case mria:transaction(?RETAINER_SHARD, Fun) of
- {atomic, ok} ->
- ok;
- {aborted, Reason} ->
- ?SLOG(error, #{
- msg => "failed_to_retain_message",
- topic => Topic,
- reason => Reason
- })
- end
+ end,
+ case mria:transaction(?RETAINER_SHARD, Fun) of
+ {atomic, ok} ->
+ ?tp(debug, message_retained, #{topic => Topic}),
+ ok;
+ {aborted, Reason} ->
+ ?SLOG(error, #{
+ msg => "failed_to_retain_message",
+ topic => Topic,
+ reason => Reason
+ })
end.
clear_expired(_) ->
NowMs = erlang:system_time(millisecond),
- MsHd = #retained{topic = '$1', msg = '_', expiry_time = '$3'},
- Ms = [{MsHd, [{'=/=', '$3', 0}, {'<', '$3', NowMs}], ['$1']}],
+ QH = qlc:q([
+ TopicTokens
+ || #retained_message{
+ topic = TopicTokens,
+ expiry_time = ExpiryTime
+ } <- mnesia:table(?TAB_MESSAGE, [{lock, write}]),
+ (ExpiryTime =/= 0) and (ExpiryTime < NowMs)
+ ]),
Fun = fun() ->
- Keys = mnesia:select(?TAB, Ms, write),
- lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys)
+ QC = qlc:cursor(QH),
+ clear_batch(db_indices(write), QC)
end,
{atomic, _} = mria:transaction(?RETAINER_SHARD, Fun),
ok.
delete_message(_, Topic) ->
- case emqx_topic:wildcard(Topic) of
- true ->
- match_delete_messages(Topic);
- false ->
- Tokens = topic2tokens(Topic),
- Fun = fun() ->
- mnesia:delete({?TAB, Tokens})
- end,
- _ = mria:transaction(?RETAINER_SHARD, Fun),
- ok
- end,
+ Tokens = topic_to_tokens(Topic),
+ DeleteFun =
+ case emqx_topic:wildcard(Topic) of
+ false ->
+ fun() ->
+ ok = delete_message_by_topic(Tokens, db_indices(write))
+ end;
+ true ->
+ fun() ->
+ QH = topic_search_table(Tokens),
+ qlc:fold(
+ fun(TopicTokens, _) ->
+ ok = delete_message_by_topic(TopicTokens, db_indices(write))
+ end,
+ undefined,
+ QH
+ )
+ end
+ end,
+ {atomic, _} = mria:transaction(?RETAINER_SHARD, DeleteFun),
ok.
read_message(_, Topic) ->
{ok, read_messages(Topic)}.
-page_read(_, Topic, Page, Limit) ->
- Cursor = make_cursor(Topic),
- case Page > 1 of
- true ->
- _ = qlc:next_answers(Cursor, (Page - 1) * Limit),
- ok;
- _ ->
- ok
- end,
- Rows = qlc:next_answers(Cursor, Limit),
- qlc:delete_cursor(Cursor),
- {ok, Rows}.
-
-match_messages(_, Topic, Cursor) ->
- BatchReadNum = emqx:get_config([retainer, flow_control, batch_read_number]),
- case Cursor of
- undefined ->
- case BatchReadNum of
- 0 ->
- {ok, sort_retained(match_messages(Topic)), undefined};
- _ ->
- start_batch_read(Topic, BatchReadNum)
- end;
- _ ->
- batch_read_messages(Cursor, BatchReadNum)
+match_messages(_, Topic, undefined) ->
+ Tokens = topic_to_tokens(Topic),
+ Now = erlang:system_time(millisecond),
+ QH = search_table(Tokens, Now),
+ case batch_read_number() of
+ all_remaining ->
+ {ok, qlc:eval(QH), undefined};
+ BatchNum when is_integer(BatchNum) ->
+ Cursor = qlc:cursor(QH),
+ match_messages(undefined, Topic, {Cursor, BatchNum})
+ end;
+match_messages(_, _Topic, {Cursor, BatchNum}) ->
+ case qlc_next_answers(Cursor, BatchNum) of
+ {closed, Rows} ->
+ {ok, Rows, undefined};
+ {more, Rows} ->
+ {ok, Rows, {Cursor, BatchNum}}
end.
+page_read(_, Topic, Page, Limit) ->
+ Now = erlang:system_time(millisecond),
+ QH =
+ case Topic of
+ undefined ->
+ search_table(undefined, ['#'], Now);
+ _ ->
+ Tokens = topic_to_tokens(Topic),
+ search_table(Tokens, Now)
+ end,
+ OrderedQH = qlc:sort(QH, {order, fun compare_message/2}),
+ Cursor = qlc:cursor(OrderedQH),
+ NSkip = (Page - 1) * Limit,
+ SkipResult =
+ case NSkip > 0 of
+ true ->
+ {Result, _} = qlc_next_answers(Cursor, NSkip),
+ Result;
+ false ->
+ more
+ end,
+ PageRows =
+ case SkipResult of
+ closed ->
+ [];
+ more ->
+ case qlc_next_answers(Cursor, Limit) of
+ {closed, Rows} ->
+ Rows;
+ {more, Rows} ->
+ qlc:delete_cursor(Cursor),
+ Rows
+ end
+ end,
+ {ok, PageRows}.
+
clean(_) ->
- _ = mria:clear_table(?TAB),
+ _ = mria:clear_table(?TAB_MESSAGE),
+ _ = mria:clear_table(?TAB_INDEX),
ok.
size(_) ->
table_size().
+reindex(Force, StatusFun) ->
+ reindex(config_indices(), Force, StatusFun).
+
+reindex_status() ->
+ Fun = fun() ->
+ mnesia:read(?TAB_INDEX_META, ?META_KEY)
+ end,
+ case mria:transaction(?RETAINER_SHARD, Fun) of
+ {atomic, [#retained_index_meta{reindexing = true}]} ->
+ true;
+ {atomic, _} ->
+ false;
+ {aborted, Reason} ->
+ {error, Reason}
+ end.
+
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
-sort_retained([]) -> [];
-sort_retained([Msg]) -> [Msg];
-sort_retained(Msgs) -> lists:sort(fun compare_message/2, Msgs).
+
+store_retained(Indices, Msg, Tokens, ExpiryTime) ->
+ ok = store_retained_message(Msg, Tokens, ExpiryTime),
+ ok = emqx_retainer_index:foreach_index_key(
+ fun(Key) -> store_retained_index(Key, ExpiryTime) end,
+ Indices,
+ Tokens
+ ).
+
+store_retained_message(Msg, Tokens, ExpiryTime) ->
+ RetainedMessage = #retained_message{
+ topic = Tokens,
+ msg = Msg,
+ expiry_time = ExpiryTime
+ },
+ mnesia:write(?TAB_MESSAGE, RetainedMessage, write).
+
+store_retained_index(Key, ExpiryTime) ->
+ RetainedIndex = #retained_index{
+ key = Key,
+ expiry_time = ExpiryTime
+ },
+ mnesia:write(?TAB_INDEX, RetainedIndex, write).
+
+topic_search_table(Tokens) ->
+ Index = emqx_retainer_index:select_index(Tokens, db_indices(read)),
+ topic_search_table(Index, Tokens).
+
+topic_search_table(undefined, Tokens) ->
+ Cond = emqx_retainer_index:condition(Tokens),
+ Ms = [{#retained_message{topic = Cond, msg = '_', expiry_time = '_'}, [], ['$_']}],
+ MsgQH = mnesia:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]),
+ qlc:q([Topic || #retained_message{topic = Topic} <- MsgQH]);
+topic_search_table(Index, Tokens) ->
+ Cond = emqx_retainer_index:condition(Index, Tokens),
+ Ms = [{#retained_index{key = Cond, expiry_time = '_'}, [], ['$_']}],
+ IndexQH = mnesia:table(?TAB_INDEX, [{traverse, {select, Ms}}]),
+ qlc:q([
+ emqx_retainer_index:restore_topic(Key)
+ || #retained_index{key = Key} <- IndexQH
+ ]).
+
+search_table(Tokens, Now) ->
+ Indices = dirty_read_indices(),
+ Index = emqx_retainer_index:select_index(Tokens, Indices),
+ search_table(Index, Tokens, Now).
+
+search_table(undefined, Tokens, Now) ->
+ Ms = make_message_match_spec(Tokens, Now),
+ ets:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]);
+search_table(Index, Tokens, Now) ->
+ Ms = make_index_match_spec(Index, Tokens, Now),
+ Topics = [
+ emqx_retainer_index:restore_topic(Key)
+ || #retained_index{key = Key} <- ets:select(?TAB_INDEX, Ms)
+ ],
+ RetainedMsgQH = qlc:q([
+ ets:lookup(?TAB_MESSAGE, TopicTokens)
+ || TopicTokens <- Topics
+ ]),
+ qlc:q([
+ Msg
+ || [
+ #retained_message{
+ msg = Msg,
+ expiry_time = ExpiryTime
+ }
+ ] <- RetainedMsgQH,
+ (ExpiryTime == 0) or (ExpiryTime > Now)
+ ]).
+
+dirty_read_indices() ->
+ case ets:lookup(?TAB_INDEX_META, ?META_KEY) of
+ [#retained_index_meta{read_indices = ReadIndices}] -> ReadIndices;
+ [] -> []
+ end.
+
+clear_batch(Indices, QC) ->
+ {Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE),
+ lists:foreach(
+ fun(TopicTokens) -> delete_message_by_topic(TopicTokens, Indices) end,
+ Rows
+ ),
+ case Result of
+ closed -> ok;
+ more -> clear_batch(Indices, QC)
+ end.
+
+delete_message_by_topic(TopicTokens, Indices) ->
+ ok = emqx_retainer_index:foreach_index_key(
+ fun(Key) ->
+ mnesia:delete({?TAB_INDEX, Key})
+ end,
+ Indices,
+ TopicTokens
+ ),
+ ok = mnesia:delete({?TAB_MESSAGE, TopicTokens}).
compare_message(M1, M2) ->
M1#message.timestamp =< M2#message.timestamp.
-topic2tokens(Topic) ->
+topic_to_tokens(Topic) ->
emqx_topic:words(Topic).
--spec start_batch_read(topic(), pos_integer()) -> batch_read_result().
-start_batch_read(Topic, MaxReadNum) ->
- Cursor = make_cursor(Topic),
- batch_read_messages(Cursor, MaxReadNum).
-
--spec batch_read_messages(emqx_retainer_storage:cursor(), pos_integer()) -> batch_read_result().
-batch_read_messages(Cursor, MaxReadNum) ->
- Answers = qlc:next_answers(Cursor, MaxReadNum),
- case erlang:length(Answers) < MaxReadNum of
- true ->
- qlc:delete_cursor(Cursor),
- {ok, Answers, undefined};
- _ ->
- {ok, Answers, Cursor}
- end.
-
-spec read_messages(emqx_types:topic()) ->
[emqx_types:message()].
read_messages(Topic) ->
- Tokens = topic2tokens(Topic),
- case mnesia:dirty_read(?TAB, Tokens) of
+ Tokens = topic_to_tokens(Topic),
+ case mnesia:dirty_read(?TAB_MESSAGE, Tokens) of
[] ->
[];
- [#retained{msg = Msg, expiry_time = Et}] ->
+ [#retained_message{msg = Msg, expiry_time = Et}] ->
case Et =:= 0 orelse Et >= erlang:system_time(millisecond) of
true -> [Msg];
false -> []
end
end.
--spec match_messages(emqx_types:topic()) ->
- [emqx_types:message()].
-match_messages(Filter) ->
- Ms = make_match_spec(Filter),
- mnesia:dirty_select(?TAB, Ms).
-
--spec match_delete_messages(emqx_types:topic()) -> ok.
-match_delete_messages(Filter) ->
- Cond = condition(emqx_topic:words(Filter)),
- MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'},
- Ms = [{MsHd, [], ['$_']}],
- Rs = mnesia:dirty_select(?TAB, Ms),
- lists:foreach(fun(R) -> mria:dirty_delete_object(?TAB, R) end, Rs).
-
-%% @private
-condition(Ws) ->
- Ws1 = [
- case W =:= '+' of
- true -> '_';
- _ -> W
- end
- || W <- Ws
- ],
- case lists:last(Ws1) =:= '#' of
- false -> Ws1;
- _ -> (Ws1 -- ['#']) ++ '_'
+qlc_next_answers(QC, N) ->
+ case qlc:next_answers(QC, N) of
+ NextAnswers when
+ is_list(NextAnswers) andalso
+ length(NextAnswers) < N
+ ->
+ qlc:delete_cursor(QC),
+ {closed, NextAnswers};
+ NextAnswers when is_list(NextAnswers) ->
+ {more, NextAnswers};
+ {error, Module, Reason} ->
+ qlc:delete_cursor(QC),
+ error({qlc_error, Module, Reason})
end.
--spec make_match_spec(undefined | topic()) -> ets:match_spec().
-make_match_spec(Topic) ->
- NowMs = erlang:system_time(millisecond),
- Cond =
- case Topic of
- undefined ->
- '_';
- _ ->
- condition(emqx_topic:words(Topic))
- end,
- MsHd = #retained{topic = Cond, msg = '$2', expiry_time = '$3'},
- [
- {MsHd, [{'=:=', '$3', 0}], ['$2']},
- {MsHd, [{'>', '$3', NowMs}], ['$2']}
- ].
+make_message_match_spec(Tokens, NowMs) ->
+ Cond = emqx_retainer_index:condition(Tokens),
+ MsHd = #retained_message{topic = Cond, msg = '$2', expiry_time = '$3'},
+ [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$2']}].
--spec make_cursor(undefined | topic()) -> qlc:query_cursor().
-make_cursor(Topic) ->
- Ms = make_match_spec(Topic),
- TabQH = ets:table(?TAB, [{traverse, {select, Ms}}]),
- QH = qlc:q([E || E <- TabQH]),
- QH2 = qlc:sort(QH, {order, fun compare_message/2}),
- qlc:cursor(QH2).
+make_index_match_spec(Index, Tokens, NowMs) ->
+ Cond = emqx_retainer_index:condition(Index, Tokens),
+ MsHd = #retained_index{key = Cond, expiry_time = '$3'},
+ [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
-spec is_table_full() -> boolean().
is_table_full() ->
@@ -283,4 +421,203 @@ is_table_full() ->
-spec table_size() -> non_neg_integer().
table_size() ->
- mnesia:table_info(?TAB, size).
+ mnesia:table_info(?TAB_MESSAGE, size).
+
+config_indices() ->
+ lists:sort(emqx_config:get([retainer, backend, index_specs])).
+
+populate_index_meta() ->
+ ConfigIndices = config_indices(),
+ Fun = fun() ->
+ case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
+ [
+ #retained_index_meta{
+ read_indices = ReadIndices,
+ write_indices = WriteIndices,
+ reindexing = Reindexing
+ }
+ ] ->
+ case {ReadIndices, WriteIndices, Reindexing} of
+ {_, _, true} ->
+ ok;
+ {ConfigIndices, ConfigIndices, false} ->
+ ok;
+ {DBWriteIndices, DBReadIndices, false} ->
+ {error, DBWriteIndices, DBReadIndices}
+ end;
+ [] ->
+ mnesia:write(
+ ?TAB_INDEX_META,
+ #retained_index_meta{
+ key = ?META_KEY,
+ read_indices = ConfigIndices,
+ write_indices = ConfigIndices,
+ reindexing = false
+ },
+ write
+ )
+ end
+ end,
+ case mria:transaction(?RETAINER_SHARD, Fun) of
+ {atomic, ok} ->
+ ok;
+ {atomic, {error, DBWriteIndices, DBReadIndices}} ->
+ ?SLOG(warning, #{
+ msg => "emqx_retainer_outdated_indices",
+ config_indices => ConfigIndices,
+ db_write_indices => DBWriteIndices,
+ db_read_indices => DBReadIndices
+ }),
+ ok;
+ {aborted, Reason} ->
+ ?SLOG(error, #{
+ msg => "failed_to_populate_emqx_retainer_indices",
+ reason => Reason
+ }),
+ {error, Reason}
+ end.
+
+db_indices(Type) ->
+ case mnesia:read(?TAB_INDEX_META, ?META_KEY) of
+ [#retained_index_meta{read_indices = ReadIndices, write_indices = WriteIndices}] ->
+ case Type of
+ read -> ReadIndices;
+ write -> WriteIndices
+ end;
+ [] ->
+ []
+ end.
+
+batch_read_number() ->
+ case emqx:get_config([retainer, flow_control, batch_read_number]) of
+ 0 -> all_remaining;
+ BatchNum when is_integer(BatchNum) -> BatchNum
+ end.
+
+reindex(NewIndices, Force, StatusFun) when
+ is_boolean(Force) andalso is_function(StatusFun, 1)
+->
+ %% Disable read indices and update write indices so that new records are written
+ %% with correct indices. Also block parallel reindexing.
+ case try_start_reindex(NewIndices, Force) of
+ {atomic, ok} ->
+ %% Wait for all dispatch operations to be completed to avoid
+ %% inconsistent results.
+ true = wait_dispatch_complete(?REINDEX_DISPATCH_WAIT),
+
+ %% All new dispatch operations will see
+ %% indices disabled, so we feel free to clear index table.
+
+ %% Clear old index records.
+ {atomic, ok} = mria:clear_table(?TAB_INDEX),
+
+ %% Fill index records in batches.
+ QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]),
+ ok = reindex_batch(qlc:cursor(QH), 0, StatusFun),
+
+ %% Enable read indices and unlock reindexing.
+ finalize_reindex();
+ {atomic, Reason} ->
+ Reason
+ end.
+
+try_start_reindex(NewIndices, true) ->
+ mria:transaction(
+ ?RETAINER_SHARD,
+ fun() -> start_reindex(NewIndices) end
+ );
+try_start_reindex(NewIndices, false) ->
+ mria:transaction(
+ ?RETAINER_SHARD,
+ fun() ->
+ case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
+ [#retained_index_meta{reindexing = false}] ->
+ start_reindex(NewIndices);
+ _ ->
+ {error, already_started}
+ end
+ end
+ ).
+
+start_reindex(NewIndices) ->
+ ?tp(warning, retainer_message_reindexing_started, #{
+ hint => <<"during reindexing, subscription performance may degrade">>
+ }),
+ mnesia:write(
+ ?TAB_INDEX_META,
+ #retained_index_meta{
+ key = ?META_KEY,
+ read_indices = [],
+ write_indices = NewIndices,
+ reindexing = true
+ },
+ write
+ ).
+
+finalize_reindex() ->
+ {atomic, ok} = mria:transaction(
+ ?RETAINER_SHARD,
+ fun() ->
+ case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
+ [#retained_index_meta{write_indices = WriteIndices} = Meta] ->
+ mnesia:write(
+ ?TAB_INDEX_META,
+ Meta#retained_index_meta{
+ key = ?META_KEY,
+ read_indices = WriteIndices,
+ reindexing = false
+ },
+ write
+ );
+ [] ->
+ ok
+ end
+ end
+ ),
+ ?tp(warning, retainer_message_reindexing_finished, #{}),
+ ok.
+
+reindex_topic(Indices, Topic) ->
+ case mnesia:read(?TAB_MESSAGE, Topic, read) of
+ [#retained_message{expiry_time = ExpiryTime}] ->
+ ok = emqx_retainer_index:foreach_index_key(
+ fun(Key) -> store_retained_index(Key, ExpiryTime) end,
+ Indices,
+ Topic
+ );
+ [] ->
+ ok
+ end.
+
+reindex_batch(QC, Done, StatusFun) ->
+ Fun = fun() ->
+ Indices = db_indices(write),
+ {Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE),
+ ok = lists:foreach(
+ fun(Topic) -> reindex_topic(Indices, Topic) end,
+ Topics
+ ),
+ {Status, Done + length(Topics)}
+ end,
+ case mria:transaction(?RETAINER_SHARD, Fun) of
+ {atomic, {more, NewDone}} ->
+ _ = StatusFun(NewDone),
+ reindex_batch(QC, NewDone, StatusFun);
+ {atomic, {closed, NewDone}} ->
+ _ = StatusFun(NewDone),
+ ok;
+ {aborted, Reason} ->
+ ?SLOG(error, #{
+ msg => "failed_to_reindex_retained_messages",
+ reason => Reason
+ }),
+ {error, Reason}
+ end.
+
+wait_dispatch_complete(Timeout) ->
+ Nodes = mria_mnesia:running_nodes(),
+ {Results, []} = emqx_retainer_proto_v1:wait_dispatch_complete(Nodes, Timeout),
+ lists:all(
+ fun(Result) -> Result =:= ok end,
+ Results
+ ).
diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl
new file mode 100644
index 000000000..8e0c81a6d
--- /dev/null
+++ b/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl
@@ -0,0 +1,77 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_retainer_mnesia_cli).
+
+-include_lib("emqx/include/logger.hrl").
+
+-export([load/0, retainer/1, unload/0]).
+
+-define(PRINT_MSG(Msg), io:format(Msg)).
+
+-define(PRINT(Format, Args), io:format(Format, Args)).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+load() ->
+ ok = emqx_ctl:register_command(retainer, {?MODULE, retainer}, []).
+
+retainer(["reindex", "status"]) ->
+ case emqx_retainer_mnesia:reindex_status() of
+ true ->
+ ?PRINT_MSG("Reindexing is in progress~n");
+ false ->
+ ?PRINT_MSG("Reindexing is not running~n");
+ {error, Reason} ->
+ ?PRINT("Can't get reindex status: ~p~n", [Reason])
+ end;
+retainer(["reindex", "start"]) ->
+ retainer(["reindex", "start", "false"]);
+retainer(["reindex", "start", ForceParam]) ->
+ Force =
+ case ForceParam of
+ "true" -> true;
+ _ -> false
+ end,
+ ?PRINT_MSG("Starting reindexing~n"),
+ emqx_retainer_mnesia:reindex(
+ Force,
+ fun(Done) ->
+ ?SLOG(
+ info,
+ #{
+ msg => "retainer_message_record_reindexing_progress",
+ done => Done
+ }
+ ),
+ ?PRINT("Reindexed ~p messages~n", [Done])
+ end
+ ),
+ ?PRINT_MSG("Reindexing finished~n");
+retainer(_) ->
+ emqx_ctl:usage(
+ [
+ {"retainer reindex status", "Show reindex status"},
+ {"retainer reindex start [force]",
+ "Generate new retainer topic indices config settings.\n"
+ "Pass true as to ignore previously started reindexing"}
+ ]
+ ).
+
+unload() ->
+ ok = emqx_ctl:unregister_command(retainer).
diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl
index a15892453..7b46f147c 100644
--- a/apps/emqx_retainer/src/emqx_retainer_schema.erl
+++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl
@@ -5,6 +5,13 @@
-export([roots/0, fields/1, desc/1, namespace/0]).
+-define(DEFAULT_INDICES, [
+ [1, 2, 3],
+ [1, 3],
+ [2, 3],
+ [3]
+]).
+
namespace() -> "retainer".
roots() -> ["retainer"].
@@ -54,7 +61,8 @@ fields(mnesia_config) ->
max_retained_messages,
0,
fun is_pos_integer/1
- )}
+ )},
+ {index_specs, fun retainer_indices/1}
];
fields(flow_control) ->
[
@@ -113,3 +121,36 @@ backend_config() ->
backend,
mnesia_config
).
+
+retainer_indices(type) ->
+ list(list(integer()));
+retainer_indices(desc) ->
+ "Retainer index specifications: list of arrays of positive ascending integers. "
+ "Each array specifies an index. Numbers in an index specification are 1-based "
+ "word positions in topics. Words from specified positions will be used for indexing.
"
+ "For example, it is good to have [2, 4]
index to optimize "
+ "+/X/+/Y/...
topic wildcard subscriptions.";
+retainer_indices(example) ->
+ [[2, 4], [1, 3]];
+retainer_indices(default) ->
+ ?DEFAULT_INDICES;
+retainer_indices(validator) ->
+ fun is_valid_index_specs/1;
+retainer_indices(_) ->
+ undefined.
+
+is_valid_index_specs(IndexSpecs) ->
+ case lists:all(fun is_valid_index_spec/1, IndexSpecs) of
+ true ->
+ case length(IndexSpecs) =:= ordsets:size(ordsets:from_list(IndexSpecs)) of
+ true -> ok;
+ false -> {error, duplicate_index_specs}
+ end;
+ false ->
+ {error, invalid_index_spec}
+ end.
+
+is_valid_index_spec(IndexSpec) ->
+ length(IndexSpec) > 0 andalso
+ lists:all(fun(Idx) -> Idx > 0 end, IndexSpec) andalso
+ IndexSpec =:= ordsets:to_list(ordsets:from_list(IndexSpec)).
diff --git a/apps/emqx_retainer/src/proto/emqx_retainer_proto_v1.erl b/apps/emqx_retainer/src/proto/emqx_retainer_proto_v1.erl
new file mode 100644
index 000000000..dfe71b196
--- /dev/null
+++ b/apps/emqx_retainer/src/proto/emqx_retainer_proto_v1.erl
@@ -0,0 +1,33 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_retainer_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+-export([
+ introduced_in/0,
+ wait_dispatch_complete/2
+]).
+
+introduced_in() ->
+ "5.0.0".
+
+-spec wait_dispatch_complete(list(node()), timeout()) -> emqx_rpc:multicall_result(ok).
+wait_dispatch_complete(Nodes, Timeout) ->
+ rpc:multicall(Nodes, emqx_retainer_dispatcher, wait_dispatch_complete, [Timeout]).
diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl
index 30201bdef..f207bdd04 100644
--- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl
+++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl
@@ -19,13 +19,30 @@
-compile(export_all).
-compile(nowarn_export_all).
--define(APP, emqx_retainer).
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
+-include("emqx_retainer.hrl").
+
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-all() -> emqx_common_test_helpers:all(?MODULE).
+all() ->
+ [
+ {group, mnesia_without_indices},
+ {group, mnesia_with_indices},
+ {group, mnesia_reindex}
+ ].
+
+groups() ->
+ [
+ {mnesia_without_indices, [sequence], common_tests()},
+ {mnesia_with_indices, [sequence], common_tests()},
+ {mnesia_reindex, [sequence], [t_reindex]}
+ ].
+
+common_tests() ->
+ emqx_common_test_helpers:all(?MODULE) -- [t_reindex].
-define(BASE_CONF, <<
""
@@ -54,13 +71,9 @@ all() -> emqx_common_test_helpers:all(?MODULE).
%%--------------------------------------------------------------------
init_per_suite(Config) ->
- application:load(emqx_conf),
- ok = ekka:start(),
- ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
-
load_base_conf(),
emqx_ratelimiter_SUITE:base_conf(),
- emqx_common_test_helpers:start_apps([emqx_retainer]),
+ emqx_common_test_helpers:start_apps([emqx_conf, ?APP]),
Config.
end_per_suite(_Config) ->
@@ -68,11 +81,26 @@ end_per_suite(_Config) ->
mria:stop(),
mria_mnesia:delete_schema(),
- emqx_common_test_helpers:stop_apps([emqx_retainer]).
+ emqx_common_test_helpers:stop_apps([?APP, emqx_conf]).
-init_per_testcase(_, Config) ->
- {ok, _} = emqx_cluster_rpc:start_link(),
- timer:sleep(200),
+init_per_group(mnesia_without_indices, Config) ->
+ mnesia:clear_table(?TAB_INDEX_META),
+ mnesia:clear_table(?TAB_INDEX),
+ mnesia:clear_table(?TAB_MESSAGE),
+ Config;
+init_per_group(mnesia_reindex, Config) ->
+ emqx_retainer_mnesia:populate_index_meta(),
+ mnesia:clear_table(?TAB_INDEX),
+ mnesia:clear_table(?TAB_MESSAGE),
+ Config;
+init_per_group(_, Config) ->
+ emqx_retainer_mnesia:populate_index_meta(),
+ mnesia:clear_table(?TAB_INDEX),
+ mnesia:clear_table(?TAB_MESSAGE),
+ Config.
+
+end_per_group(_Group, Config) ->
+ emqx_retainer_mnesia:populate_index_meta(),
Config.
load_base_conf() ->
@@ -116,6 +144,8 @@ t_retain_handling(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
+ ok = emqx_retainer:clean(),
+
%% rh = 0, no wildcard, and with empty retained message
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
?assertEqual(0, length(receive_messages(1))),
@@ -247,21 +277,26 @@ t_message_expiry(_) ->
ok = emqtt:disconnect(C1).
t_message_expiry_2(_) ->
- emqx_retainer:update_config(#{<<"msg_expiry_interval">> => <<"2s">>}),
- {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
- {ok, _} = emqtt:connect(C1),
- emqtt:publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}]),
+ ConfMod = fun(Conf) ->
+ Conf#{<<"msg_expiry_interval">> := <<"2s">>}
+ end,
+ Case = fun() ->
+ {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+ {ok, _} = emqtt:connect(C1),
+ emqtt:publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}]),
- {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
- ?assertEqual(1, length(receive_messages(1))),
- timer:sleep(4000),
- {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
- ?assertEqual(0, length(receive_messages(1))),
- {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
+ {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
+ ?assertEqual(1, length(receive_messages(1))),
+ timer:sleep(4000),
+ {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
+ ?assertEqual(0, length(receive_messages(1))),
+ {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
- emqtt:publish(C1, <<"retained">>, <<"">>, [{qos, 0}, {retain, true}]),
+ emqtt:publish(C1, <<"retained">>, <<"">>, [{qos, 0}, {retain, true}]),
- ok = emqtt:disconnect(C1).
+ ok = emqtt:disconnect(C1)
+ end,
+ with_conf(ConfMod, Case).
t_clean(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
@@ -488,10 +523,107 @@ t_only_for_coverage(_) ->
true = erlang:exit(Dispatcher, normal),
ok.
+t_reindex(_) ->
+ {ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+ {ok, _} = emqtt:connect(C),
+
+ ok = emqx_retainer:clean(),
+ ok = emqx_retainer_mnesia:reindex([[1, 3]], false, fun(_Done) -> ok end),
+
+ %% Prepare retained messages for "retained/N1/N2" topics
+ ?check_trace(
+ ?wait_async_action(
+ lists:foreach(
+ fun(N1) ->
+ lists:foreach(
+ fun(N2) ->
+ emqtt:publish(
+ C,
+ erlang:iolist_to_binary([
+ <<"retained/">>,
+ io_lib:format("~5..0w", [N1]),
+ <<"/">>,
+ io_lib:format("~5..0w", [N2])
+ ]),
+ <<"this is a retained message">>,
+ [{qos, 0}, {retain, true}]
+ )
+ end,
+ lists:seq(1, 10)
+ )
+ end,
+ lists:seq(1, 1000)
+ ),
+ #{?snk_kind := message_retained, topic := <<"retained/01000/00010">>},
+ 5000
+ ),
+ []
+ ),
+
+ ?check_trace(
+ ?wait_async_action(
+ begin
+ %% Spawn reindexing in the background
+ spawn_link(
+ fun() ->
+ timer:sleep(1000),
+ emqx_retainer_mnesia:reindex(
+ [[1, 4]],
+ false,
+ fun(Done) ->
+ ?tp(
+ info,
+ reindexing_progress,
+ #{done => Done}
+ )
+ end
+ )
+ end
+ ),
+
+ %% Subscribe to "retained/N/+" for some time, while reindexing is in progress
+ T = erlang:monotonic_time(millisecond),
+ ok = test_retain_while_reindexing(C, T + 3000)
+ end,
+ #{?snk_kind := reindexing_progress, done := 10000},
+ 10000
+ ),
+ fun(Trace) ->
+ ?assertMatch(
+ [_ | _],
+ lists:filter(
+ fun
+ (#{done := 10000}) -> true;
+ (_) -> false
+ end,
+ ?of_kind(reindexing_progress, Trace)
+ )
+ )
+ end
+ ).
+
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
+test_retain_while_reindexing(C, Deadline) ->
+ case erlang:monotonic_time(millisecond) > Deadline of
+ true ->
+ ok;
+ false ->
+ N = rand:uniform(1000),
+ Topic = iolist_to_binary([
+ <<"retained/">>,
+ io_lib:format("~5..0w", [N]),
+ <<"/+">>
+ ]),
+ {ok, #{}, [0]} = emqtt:subscribe(C, Topic, [{qos, 0}, {rh, 0}]),
+ Messages = receive_messages(10),
+ ?assertEqual(10, length(Messages)),
+ {ok, #{}, [0]} = emqtt:unsubscribe(C, Topic),
+ test_retain_while_reindexing(C, Deadline)
+ end.
+
receive_messages(Count) ->
receive_messages(Count, []).
receive_messages(0, Msgs) ->
diff --git a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl
index ba062a883..9b80a1cc3 100644
--- a/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl
+++ b/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl
@@ -21,6 +21,7 @@
-include("emqx_retainer.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
@@ -89,7 +90,6 @@ t_messages(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqx_retainer:clean(),
- timer:sleep(500),
Each = fun(I) ->
emqtt:publish(
@@ -100,8 +100,14 @@ t_messages(_) ->
)
end,
- lists:foreach(Each, lists:seq(1, 5)),
- timer:sleep(500),
+ ?check_trace(
+ ?wait_async_action(
+ lists:foreach(Each, lists:seq(1, 5)),
+ #{?snk_kind := message_retained, topic := <<"retained/A">>},
+ 500
+ ),
+ []
+ ),
{ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
Msgs = decode_json(MsgsJson),
diff --git a/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl
index 69c4c6801..a6902c428 100644
--- a/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl
+++ b/apps/emqx_retainer/test/emqx_retainer_cli_SUITE.erl
@@ -19,21 +19,54 @@
-compile(export_all).
-compile(nowarn_export_all).
+-include("emqx_retainer.hrl").
+
-include_lib("eunit/include/eunit.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
-init_per_testcase(_TestCase, Config) ->
+init_per_suite(Config) ->
+ emqx_retainer_SUITE:load_base_conf(),
+ %% Start Apps
+ emqx_common_test_helpers:start_apps([emqx_retainer]),
Config.
-end_per_testcase(_TestCase, Config) ->
- Config.
+end_per_suite(_Config) ->
+ emqx_common_test_helpers:stop_apps([emqx_retainer]).
-% t_cmd(_) ->
-% error('TODO').
+t_reindex_status(_Config) ->
+ ok = emqx_retainer_mnesia_cli:retainer(["reindex", "status"]).
-% t_unload(_) ->
-% error('TODO').
+t_reindex(_Config) ->
+ {ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+ {ok, _} = emqtt:connect(C),
-% t_load(_) ->
-% error('TODO').
+ ok = emqx_retainer:clean(),
+
+ ?check_trace(
+ ?wait_async_action(
+ lists:foreach(
+ fun(N) ->
+ emqtt:publish(
+ C,
+ erlang:iolist_to_binary([
+ <<"retained/">>,
+ io_lib:format("~5..0w", [N])
+ ]),
+ <<"this is a retained message">>,
+ [{qos, 0}, {retain, true}]
+ )
+ end,
+ lists:seq(1, 1000)
+ ),
+ #{?snk_kind := message_retained, topic := <<"retained/01000">>},
+ 1000
+ ),
+ []
+ ),
+
+ emqx_config:put([retainer, backend, index_specs], [[4, 5]]),
+ ok = emqx_retainer_mnesia_cli:retainer(["reindex", "start"]),
+
+ ?assertEqual(1000, mnesia:table_info(?TAB_INDEX, size)).
diff --git a/apps/emqx_retainer/test/emqx_retainer_index_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_index_SUITE.erl
new file mode 100644
index 000000000..ce37f2788
--- /dev/null
+++ b/apps/emqx_retainer/test/emqx_retainer_index_SUITE.erl
@@ -0,0 +1,218 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_retainer_index_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_testcase(_TestCase, Config) ->
+ Config.
+
+end_per_testcase(_TestCase, Config) ->
+ Config.
+
+t_foreach_index_key(_Config) ->
+ put(index_key, undefined),
+ ok = emqx_retainer_index:foreach_index_key(
+ fun(IndexKey) -> put(index_key, IndexKey) end,
+ [[1, 3]],
+ [<<"a">>, <<"b">>, <<"c">>]
+ ),
+
+ ?assertEqual(
+ {[1, 3], {[<<"a">>, <<"c">>], [<<"b">>]}},
+ get(index_key)
+ ).
+
+t_to_index_key(_Config) ->
+ ?assertEqual(
+ {[1, 3], {[<<"a">>, <<"c">>], [<<"b">>]}},
+ emqx_retainer_index:to_index_key(
+ [1, 3],
+ [<<"a">>, <<"b">>, <<"c">>]
+ )
+ ),
+
+ ?assertEqual(
+ {[1, 4], {[<<"a">>], [<<"b">>, <<"c">>]}},
+ emqx_retainer_index:to_index_key(
+ [1, 4],
+ [<<"a">>, <<"b">>, <<"c">>]
+ )
+ ).
+
+t_index_score(_Config) ->
+ ?assertEqual(
+ 0,
+ emqx_retainer_index:index_score(
+ [1, 4],
+ ['+', <<"a">>, <<"b">>, '+']
+ )
+ ),
+
+ ?assertEqual(
+ 0,
+ emqx_retainer_index:index_score(
+ [1, 2],
+ ['+', <<"a">>, <<"b">>, '+']
+ )
+ ),
+
+ ?assertEqual(
+ 2,
+ emqx_retainer_index:index_score(
+ [1, 2],
+ [<<"a">>, <<"b">>, '+']
+ )
+ ),
+
+ ?assertEqual(
+ 1,
+ emqx_retainer_index:index_score(
+ [1, 2],
+ [<<"a">>]
+ )
+ ),
+
+ ?assertEqual(
+ 1,
+ emqx_retainer_index:index_score(
+ [2, 3, 4, 5],
+ ['+', <<"a">>, '#']
+ )
+ ),
+
+ ?assertEqual(
+ 2,
+ emqx_retainer_index:index_score(
+ [2, 3, 4, 5],
+ ['+', <<"a">>, <<"b">>, '+']
+ )
+ ).
+
+t_select_index(_Config) ->
+ ?assertEqual(
+ [2, 3, 4, 5],
+ emqx_retainer_index:select_index(
+ ['+', <<"a">>, <<"b">>, '+'],
+ [
+ [1, 4],
+ [2, 3, 4, 5],
+ [1, 2]
+ ]
+ )
+ ),
+
+ ?assertEqual(
+ undefined,
+ emqx_retainer_index:select_index(
+ ['+', <<"a">>, <<"b">>, '+'],
+ [
+ [1, 4]
+ ]
+ )
+ ).
+
+t_condition(_Config) ->
+ ?assertEqual(
+ ['_', <<"a">>, <<"b">>, '_'],
+ emqx_retainer_index:condition(
+ ['+', <<"a">>, <<"b">>, '+']
+ )
+ ),
+
+ ?assertEqual(
+ ['_', <<"a">> | '_'],
+ emqx_retainer_index:condition(
+ ['+', <<"a">>, '#']
+ )
+ ).
+
+t_condition_index(_Config) ->
+ ?assertEqual(
+ {[2, 3], {[<<"a">>, <<"b">>], ['_', '_']}},
+ emqx_retainer_index:condition(
+ [2, 3],
+ ['+', <<"a">>, <<"b">>, '+']
+ )
+ ),
+
+ ?assertEqual(
+ {[3, 4], {[<<"b">>, '_'], ['_', <<"a">>]}},
+ emqx_retainer_index:condition(
+ [3, 4],
+ ['+', <<"a">>, <<"b">>, '+']
+ )
+ ),
+
+ ?assertEqual(
+ {[3, 5], {[<<"b">> | '_'], ['_', <<"a">>, '_']}},
+ emqx_retainer_index:condition(
+ [3, 5],
+ ['+', <<"a">>, <<"b">>, '+']
+ )
+ ),
+
+ ?assertEqual(
+ {[3, 5], {[<<"b">> | '_'], ['_', <<"a">> | '_']}},
+ emqx_retainer_index:condition(
+ [3, 5],
+ ['+', <<"a">>, <<"b">>, '#']
+ )
+ ),
+
+ ?assertEqual(
+ {[3, 4], {[<<"b">> | '_'], ['_', <<"a">> | '_']}},
+ emqx_retainer_index:condition(
+ [3, 4],
+ ['+', <<"a">>, <<"b">>, '#']
+ )
+ ),
+
+ ?assertEqual(
+ {[1], {[<<"a">>], '_'}},
+ emqx_retainer_index:condition(
+ [1],
+ [<<"a">>, '#']
+ )
+ ).
+
+t_restore_topic(_Config) ->
+ ?assertEqual(
+ [<<"x">>, <<"a">>, <<"b">>, <<"y">>],
+ emqx_retainer_index:restore_topic(
+ {[2, 3], {[<<"a">>, <<"b">>], [<<"x">>, <<"y">>]}}
+ )
+ ),
+
+ ?assertEqual(
+ [<<"x">>, <<"a">>, <<"b">>, <<"y">>],
+ emqx_retainer_index:restore_topic(
+ {[3, 4], {[<<"b">>, <<"y">>], [<<"x">>, <<"a">>]}}
+ )
+ ),
+
+ ?assertEqual(
+ [<<"x">>, <<"a">>, <<"b">>, <<"y">>],
+ emqx_retainer_index:restore_topic(
+ {[3, 5], {[<<"b">>], [<<"x">>, <<"a">>, <<"y">>]}}
+ )
+ ).