746 lines
23 KiB
Erlang
746 lines
23 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_retainer_mnesia).
|
|
|
|
-behaviour(emqx_retainer).
|
|
-behaviour(emqx_db_backup).
|
|
|
|
-include("emqx_retainer.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
-include_lib("stdlib/include/ms_transform.hrl").
|
|
|
|
%% emqx_retainer callbacks
|
|
-export([
|
|
create/1,
|
|
update/2,
|
|
close/1,
|
|
delete_message/2,
|
|
store_retained/2,
|
|
read_message/2,
|
|
page_read/4,
|
|
match_messages/3,
|
|
delete_cursor/2,
|
|
clear_expired/1,
|
|
clean/1,
|
|
size/1
|
|
]).
|
|
|
|
%% Internal exports (RPC)
|
|
-export([
|
|
do_populate_index_meta/1,
|
|
do_reindex_batch/2,
|
|
active_indices/0
|
|
]).
|
|
|
|
%% Management API:
|
|
-export([topics/0]).
|
|
|
|
-export([reindex/2, reindex_status/0]).
|
|
|
|
-export([populate_index_meta/0]).
|
|
-export([reindex/3]).
|
|
|
|
-export([
|
|
backup_tables/0,
|
|
on_backup_table_imported/2
|
|
]).
|
|
|
|
-record(retained_message, {topic, msg, expiry_time}).
|
|
-record(retained_index, {key, expiry_time}).
|
|
-record(retained_index_meta, {key, read_indices, write_indices, reindexing, extra}).
|
|
|
|
-define(META_KEY, index_meta).
|
|
|
|
-define(REINDEX_BATCH_SIZE, 1000).
|
|
-define(REINDEX_DISPATCH_WAIT, 30000).
|
|
-define(REINDEX_RPC_RETRY_INTERVAL, 1000).
|
|
-define(REINDEX_INDEX_UPDATE_WAIT, 30000).
|
|
|
|
-define(MESSAGE_SCAN_BATCH_SIZE, 100).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Management API
|
|
%%--------------------------------------------------------------------
|
|
|
|
topics() ->
|
|
[emqx_topic:join(I) || I <- mnesia:dirty_all_keys(?TAB_MESSAGE)].
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Data backup
|
|
%%--------------------------------------------------------------------
|
|
|
|
backup_tables() ->
|
|
[?TAB_MESSAGE || is_enabled()].
|
|
|
|
on_backup_table_imported(?TAB_MESSAGE, Opts) ->
|
|
case is_enabled() of
|
|
true ->
|
|
maybe_print("Starting reindexing retained messages ~n", [], Opts),
|
|
Res = reindex(false, mk_status_fun(Opts)),
|
|
maybe_print("Reindexing retained messages finished~n", [], Opts),
|
|
Res;
|
|
false ->
|
|
ok
|
|
end;
|
|
on_backup_table_imported(_Tab, _Opts) ->
|
|
ok.
|
|
|
|
mk_status_fun(Opts) ->
|
|
fun(Done) ->
|
|
log_status(Done),
|
|
maybe_print("Reindexed ~p messages~n", [Done], Opts)
|
|
end.
|
|
|
|
maybe_print(Fmt, Args, #{print_fun := Fun}) when is_function(Fun, 2) ->
|
|
Fun(Fmt, Args);
|
|
maybe_print(_Fmt, _Args, _Opts) ->
|
|
ok.
|
|
|
|
log_status(Done) ->
|
|
?SLOG(
|
|
info,
|
|
#{
|
|
msg => "retainer_message_record_reindexing_progress",
|
|
done => Done
|
|
}
|
|
).
|
|
|
|
is_enabled() ->
|
|
emqx_retainer:enabled() andalso emqx_retainer:backend_module() =:= ?MODULE.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% emqx_retainer callbacks
|
|
%%--------------------------------------------------------------------
|
|
|
|
create(#{storage_type := StorageType, max_retained_messages := MaxRetainedMessages} = Config) ->
|
|
ok = create_table(
|
|
?TAB_INDEX_META,
|
|
retained_index_meta,
|
|
record_info(fields, retained_index_meta),
|
|
set,
|
|
StorageType
|
|
),
|
|
ok = populate_index_meta(Config),
|
|
ok = create_table(
|
|
?TAB_MESSAGE,
|
|
retained_message,
|
|
record_info(fields, retained_message),
|
|
ordered_set,
|
|
StorageType
|
|
),
|
|
ok = create_table(
|
|
?TAB_INDEX,
|
|
retained_index,
|
|
record_info(fields, retained_index),
|
|
ordered_set,
|
|
StorageType
|
|
),
|
|
#{storage_type => StorageType, max_retained_messages => MaxRetainedMessages}.
|
|
|
|
create_table(Table, RecordName, Attributes, Type, StorageType) ->
|
|
Copies =
|
|
case StorageType of
|
|
ram -> ram_copies;
|
|
disc -> disc_copies
|
|
end,
|
|
|
|
StoreProps = [
|
|
{ets, [
|
|
compressed,
|
|
{read_concurrency, true},
|
|
{write_concurrency, true}
|
|
]},
|
|
{dets, [{auto_save, 1000}]}
|
|
],
|
|
|
|
ok = mria:create_table(Table, [
|
|
{type, Type},
|
|
{rlog_shard, ?RETAINER_SHARD},
|
|
{storage, Copies},
|
|
{record_name, RecordName},
|
|
{attributes, Attributes},
|
|
{storage_properties, StoreProps}
|
|
]),
|
|
ok = mria_rlog:wait_for_shards([?RETAINER_SHARD], infinity),
|
|
ok = mria:wait_for_tables([Table]),
|
|
case mnesia:table_info(Table, storage_type) of
|
|
Copies ->
|
|
ok;
|
|
_Other ->
|
|
{atomic, ok} = mnesia:change_table_copy_type(Table, node(), Copies),
|
|
ok
|
|
end.
|
|
|
|
update(_State, _NewConfig) ->
|
|
need_recreate.
|
|
|
|
close(_State) -> ok.
|
|
|
|
store_retained(State, Msg = #message{topic = Topic}) ->
|
|
ExpiryTime = emqx_retainer:get_expiry_time(Msg),
|
|
Tokens = topic_to_tokens(Topic),
|
|
case is_table_full(State) andalso is_new_topic(Tokens) of
|
|
true ->
|
|
?SLOG(error, #{
|
|
msg => "failed_to_retain_message",
|
|
topic => Topic,
|
|
reason => table_is_full
|
|
});
|
|
false ->
|
|
do_store_retained(Msg, Tokens, ExpiryTime),
|
|
?tp(message_retained, #{topic => Topic}),
|
|
ok
|
|
end.
|
|
|
|
clear_expired(_) ->
|
|
case mria_rlog:role() of
|
|
core ->
|
|
clear_expired();
|
|
_ ->
|
|
ok
|
|
end.
|
|
|
|
clear_expired() ->
|
|
NowMs = erlang:system_time(millisecond),
|
|
S0 = ets_stream(?TAB_MESSAGE),
|
|
S1 = emqx_utils_stream:filter(
|
|
fun(#retained_message{expiry_time = ExpiryTime}) ->
|
|
ExpiryTime =/= 0 andalso ExpiryTime < NowMs
|
|
end,
|
|
S0
|
|
),
|
|
DirtyWriteIndices = dirty_indices(write),
|
|
emqx_utils_stream:foreach(
|
|
fun(RetainedMsg) ->
|
|
delete_message_with_indices(RetainedMsg, DirtyWriteIndices)
|
|
end,
|
|
S1
|
|
).
|
|
|
|
delete_message(_State, Topic) ->
|
|
Tokens = topic_to_tokens(Topic),
|
|
case emqx_topic:wildcard(Topic) of
|
|
false ->
|
|
ok = delete_message_by_topic(Tokens, dirty_indices(write));
|
|
true ->
|
|
S = search_stream(Tokens, 0),
|
|
DirtyWriteIndices = dirty_indices(write),
|
|
emqx_utils_stream:foreach(
|
|
fun(RetainedMsg) -> delete_message_with_indices(RetainedMsg, DirtyWriteIndices) end,
|
|
S
|
|
)
|
|
end.
|
|
|
|
read_message(_State, Topic) ->
|
|
{ok, read_messages(Topic)}.
|
|
|
|
match_messages(State, Topic, undefined) ->
|
|
Tokens = topic_to_tokens(Topic),
|
|
Now = erlang:system_time(millisecond),
|
|
S = msg_stream(search_stream(Tokens, Now)),
|
|
case batch_read_number() of
|
|
all_remaining ->
|
|
{ok, emqx_utils_stream:consume(S), undefined};
|
|
BatchNum when is_integer(BatchNum) ->
|
|
match_messages(State, Topic, {S, BatchNum})
|
|
end;
|
|
match_messages(_State, _Topic, {S0, BatchNum}) ->
|
|
case emqx_utils_stream:consume(BatchNum, S0) of
|
|
{Rows, S1} ->
|
|
{ok, Rows, {S1, BatchNum}};
|
|
Rows when is_list(Rows) ->
|
|
{ok, Rows, undefined}
|
|
end.
|
|
|
|
delete_cursor(_State, _Cursor) ->
|
|
ok.
|
|
|
|
page_read(_State, Topic, Page, Limit) ->
|
|
Now = erlang:system_time(millisecond),
|
|
S0 =
|
|
case Topic of
|
|
undefined ->
|
|
msg_stream(search_stream(undefined, ['#'], Now));
|
|
_ ->
|
|
Tokens = topic_to_tokens(Topic),
|
|
msg_stream(search_stream(Tokens, Now))
|
|
end,
|
|
%% This is very inefficient, but we are limited with inherited API
|
|
S1 = emqx_utils_stream:list(
|
|
lists:sort(
|
|
fun compare_message/2,
|
|
emqx_utils_stream:consume(S0)
|
|
)
|
|
),
|
|
NSkip = (Page - 1) * Limit,
|
|
S2 = emqx_utils_stream:drop(NSkip, S1),
|
|
case emqx_utils_stream:consume(Limit, S2) of
|
|
{Rows, _S3} ->
|
|
{ok, true, Rows};
|
|
Rows when is_list(Rows) ->
|
|
{ok, false, Rows}
|
|
end.
|
|
|
|
clean(_) ->
|
|
_ = mria:clear_table(?TAB_MESSAGE),
|
|
_ = mria:clear_table(?TAB_INDEX),
|
|
ok.
|
|
|
|
size(_) ->
|
|
table_size().
|
|
|
|
reindex(Force, StatusFun) ->
|
|
Config = emqx:get_config([retainer, backend]),
|
|
reindex(config_indices(Config), Force, StatusFun).
|
|
|
|
reindex_status() ->
|
|
case mnesia:dirty_read(?TAB_INDEX_META, ?META_KEY) of
|
|
[#retained_index_meta{reindexing = true}] ->
|
|
true;
|
|
_ ->
|
|
false
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
do_store_retained(Msg, TopicTokens, ExpiryTime) ->
|
|
%% Retained message is stored syncronously on all core nodes
|
|
%%
|
|
%% No transaction, meaning that concurrent writes in the cluster may
|
|
%% lead to inconsistent replicas. This could manifest in two clients
|
|
%% getting different retained messages for the same topic, depending
|
|
%% on which node they are connected to. We tolerate that.
|
|
ok = do_store_retained_message(Msg, TopicTokens, ExpiryTime),
|
|
%% Since retained message was stored syncronously on all core nodes,
|
|
%% now we are sure that
|
|
%% * either we will write correct indices
|
|
%% * or if we a replicant with outdated write indices due to reindexing,
|
|
%% the correct indices will be added by reindexing
|
|
%%
|
|
%% No transacation as well, meaning that concurrent writes in the cluster
|
|
%% may lead to inconsistent index replicas. This essentially allows for
|
|
%% inconsistent query results, where index entry has different expiry time
|
|
%% than the message it points to.
|
|
ok = do_store_retained_indices(TopicTokens, ExpiryTime).
|
|
|
|
do_store_retained_message(Msg, TopicTokens, ExpiryTime) ->
|
|
RetainedMessage = #retained_message{
|
|
topic = TopicTokens,
|
|
msg = Msg,
|
|
expiry_time = ExpiryTime
|
|
},
|
|
ok = mria:dirty_write_sync(?TAB_MESSAGE, RetainedMessage).
|
|
|
|
do_store_retained_indices(TopicTokens, ExpiryTime) ->
|
|
Indices = dirty_indices(write),
|
|
ok = mria:async_dirty(?RETAINER_SHARD, fun() ->
|
|
emqx_retainer_index:foreach_index_key(
|
|
fun(Key) -> do_store_retained_index(Key, ExpiryTime) end,
|
|
Indices,
|
|
TopicTokens
|
|
)
|
|
end).
|
|
|
|
do_store_retained_index(Key, ExpiryTime) ->
|
|
RetainedIndex = #retained_index{
|
|
key = Key,
|
|
expiry_time = ExpiryTime
|
|
},
|
|
mnesia:write(?TAB_INDEX, RetainedIndex, write).
|
|
|
|
msg_stream(SearchStream) ->
|
|
emqx_utils_stream:map(
|
|
fun(#retained_message{msg = Msg}) -> Msg end,
|
|
SearchStream
|
|
).
|
|
|
|
search_stream(Tokens, Now) ->
|
|
Indices = dirty_indices(read),
|
|
Index = emqx_retainer_index:select_index(Tokens, Indices),
|
|
search_stream(Index, Tokens, Now).
|
|
|
|
search_stream(undefined, Tokens, Now) ->
|
|
Ms = make_message_match_spec(Tokens, Now),
|
|
emqx_utils_stream:ets(
|
|
fun
|
|
(undefined) -> ets:select(?TAB_MESSAGE, Ms, 1);
|
|
(Cont) -> ets:select(Cont)
|
|
end
|
|
);
|
|
search_stream(Index, FilterTokens, Now) ->
|
|
{Ms, IsExactMs} = make_index_match_spec(Index, FilterTokens, Now),
|
|
IndexRecordStream = emqx_utils_stream:ets(
|
|
fun
|
|
(undefined) -> ets:select(?TAB_INDEX, Ms, 1);
|
|
(Cont) -> ets:select(Cont)
|
|
end
|
|
),
|
|
TopicStream = emqx_utils_stream:map(
|
|
fun(#retained_index{key = Key}) -> emqx_retainer_index:restore_topic(Key) end,
|
|
IndexRecordStream
|
|
),
|
|
MatchingTopicStream = emqx_utils_stream:filter(
|
|
fun(TopicTokens) -> match(IsExactMs, TopicTokens, FilterTokens) end,
|
|
TopicStream
|
|
),
|
|
RetainMsgStream = emqx_utils_stream:chainmap(
|
|
fun(TopicTokens) -> emqx_utils_stream:list(ets:lookup(?TAB_MESSAGE, TopicTokens)) end,
|
|
MatchingTopicStream
|
|
),
|
|
ValidRetainMsgStream = emqx_utils_stream:filter(
|
|
fun(#retained_message{expiry_time = ExpiryTime}) ->
|
|
ExpiryTime =:= 0 orelse ExpiryTime > Now
|
|
end,
|
|
RetainMsgStream
|
|
),
|
|
ValidRetainMsgStream.
|
|
|
|
match(_IsExactMs = true, _TopicTokens, _FilterTokens) -> true;
|
|
match(_IsExactMs = false, TopicTokens, FilterTokens) -> emqx_topic:match(TopicTokens, FilterTokens).
|
|
|
|
delete_message_by_topic(TopicTokens, Indices) ->
|
|
case mnesia:dirty_read(?TAB_MESSAGE, TopicTokens) of
|
|
[] -> ok;
|
|
[RetainedMsg] -> delete_message_with_indices(RetainedMsg, Indices)
|
|
end.
|
|
|
|
delete_message_with_indices(RetainedMsg, Indices) ->
|
|
#retained_message{topic = TopicTokens, expiry_time = ExpiryTime} = RetainedMsg,
|
|
ok = emqx_retainer_index:foreach_index_key(
|
|
fun(Key) ->
|
|
mria:dirty_delete_object(?TAB_INDEX, #retained_index{
|
|
key = Key, expiry_time = ExpiryTime
|
|
})
|
|
end,
|
|
Indices,
|
|
TopicTokens
|
|
),
|
|
ok = mria:dirty_delete_object(?TAB_MESSAGE, RetainedMsg).
|
|
|
|
compare_message(M1, M2) ->
|
|
M1#message.timestamp =< M2#message.timestamp.
|
|
|
|
topic_to_tokens(Topic) ->
|
|
emqx_topic:words(Topic).
|
|
|
|
-spec read_messages(emqx_types:topic()) ->
|
|
[emqx_types:message()].
|
|
read_messages(Topic) ->
|
|
Tokens = topic_to_tokens(Topic),
|
|
case mnesia:dirty_read(?TAB_MESSAGE, Tokens) of
|
|
[] ->
|
|
[];
|
|
[#retained_message{msg = Msg, expiry_time = Et}] ->
|
|
case Et =:= 0 orelse Et >= erlang:system_time(millisecond) of
|
|
true -> [Msg];
|
|
false -> []
|
|
end
|
|
end.
|
|
|
|
make_message_match_spec(Tokens, NowMs) ->
|
|
Cond = emqx_retainer_index:condition(Tokens),
|
|
MsHd = #retained_message{topic = Cond, msg = '_', expiry_time = '$3'},
|
|
[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
|
|
|
|
make_index_match_spec(Index, Tokens, NowMs) ->
|
|
{Cond, IsExact} = emqx_retainer_index:condition(Index, Tokens),
|
|
MsHd = #retained_index{key = Cond, expiry_time = '$3'},
|
|
{[{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}], IsExact}.
|
|
|
|
is_table_full(#{max_retained_messages := MaxRetainedMessages} = _State) ->
|
|
MaxRetainedMessages > 0 andalso (table_size() >= MaxRetainedMessages).
|
|
|
|
is_new_topic(Tokens) ->
|
|
case mnesia:dirty_read(?TAB_MESSAGE, Tokens) of
|
|
[_] ->
|
|
false;
|
|
[] ->
|
|
true
|
|
end.
|
|
|
|
table_size() ->
|
|
mnesia:table_info(?TAB_MESSAGE, size).
|
|
|
|
config_indices(#{index_specs := IndexSpecs}) ->
|
|
IndexSpecs.
|
|
|
|
populate_index_meta() ->
|
|
Config = emqx:get_config([retainer, backend]),
|
|
populate_index_meta(Config).
|
|
|
|
populate_index_meta(Config) ->
|
|
ConfigIndices = config_indices(Config),
|
|
case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_populate_index_meta/1, [ConfigIndices]) of
|
|
{atomic, ok} ->
|
|
ok;
|
|
{atomic, {error, DBWriteIndices, DBReadIndices}} ->
|
|
?SLOG(warning, #{
|
|
msg => "emqx_retainer_outdated_indices",
|
|
config_indices => ConfigIndices,
|
|
db_write_indices => DBWriteIndices,
|
|
db_read_indices => DBReadIndices
|
|
}),
|
|
ok;
|
|
{aborted, Reason} ->
|
|
?SLOG(error, #{
|
|
msg => "failed_to_populate_emqx_retainer_indices",
|
|
reason => Reason
|
|
}),
|
|
{error, Reason}
|
|
end.
|
|
|
|
do_populate_index_meta(ConfigIndices) ->
|
|
case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
|
|
[
|
|
#retained_index_meta{
|
|
read_indices = ReadIndices,
|
|
write_indices = WriteIndices,
|
|
reindexing = Reindexing
|
|
}
|
|
] ->
|
|
case {ReadIndices, WriteIndices, Reindexing} of
|
|
{_, _, true} ->
|
|
ok;
|
|
{ConfigIndices, ConfigIndices, false} ->
|
|
ok;
|
|
{DBWriteIndices, DBReadIndices, false} ->
|
|
{error, DBWriteIndices, DBReadIndices}
|
|
end;
|
|
[] ->
|
|
mnesia:write(
|
|
?TAB_INDEX_META,
|
|
#retained_index_meta{
|
|
key = ?META_KEY,
|
|
read_indices = ConfigIndices,
|
|
write_indices = ConfigIndices,
|
|
reindexing = false
|
|
},
|
|
write
|
|
)
|
|
end.
|
|
|
|
dirty_indices(Type) ->
|
|
indices(ets:lookup(?TAB_INDEX_META, ?META_KEY), Type).
|
|
|
|
db_indices(Type) ->
|
|
indices(mnesia:read(?TAB_INDEX_META, ?META_KEY), Type).
|
|
|
|
indices(IndexRecords, Type) ->
|
|
case IndexRecords of
|
|
[#retained_index_meta{read_indices = ReadIndices, write_indices = WriteIndices}] ->
|
|
case Type of
|
|
read -> ReadIndices;
|
|
write -> WriteIndices
|
|
end;
|
|
[] ->
|
|
[]
|
|
end.
|
|
|
|
batch_read_number() ->
|
|
case emqx:get_config([retainer, flow_control, batch_read_number]) of
|
|
0 -> all_remaining;
|
|
BatchNum when is_integer(BatchNum) -> BatchNum
|
|
end.
|
|
|
|
reindex(NewIndices, Force, StatusFun) when
|
|
is_boolean(Force) andalso is_function(StatusFun, 1)
|
|
->
|
|
%% Do not run on replicants
|
|
core = mria_rlog:role(),
|
|
%% Disable read indices and update write indices so that new records are written
|
|
%% with correct indices. Also block parallel reindexing.
|
|
case try_start_reindex(NewIndices, Force) of
|
|
{atomic, ok} ->
|
|
%% Wait for all nodes to have new indices, including rlog nodes
|
|
true = wait_indices_updated({[], NewIndices}, ?REINDEX_INDEX_UPDATE_WAIT),
|
|
|
|
%% Wait for all dispatch operations to be completed to avoid
|
|
%% inconsistent results.
|
|
true = wait_dispatch_complete(?REINDEX_DISPATCH_WAIT),
|
|
|
|
%% All new dispatch operations will see
|
|
%% indices disabled, so we feel free to clear index table.
|
|
|
|
%% Clear old index records.
|
|
{atomic, ok} = mria:clear_table(?TAB_INDEX),
|
|
|
|
%% Fill index records in batches.
|
|
TopicStream = emqx_utils_stream:map(
|
|
fun(#retained_message{topic = Topic}) -> Topic end,
|
|
ets_stream(?TAB_MESSAGE)
|
|
),
|
|
ok = reindex_batch(TopicStream, 0, StatusFun),
|
|
|
|
%% Enable read indices and unlock reindexing.
|
|
finalize_reindex();
|
|
{atomic, Reason} ->
|
|
Reason
|
|
end.
|
|
|
|
try_start_reindex(NewIndices, true) ->
|
|
%% Note: we don't expect reindexing during upgrade, so this function is internal
|
|
mria:transaction(
|
|
?RETAINER_SHARD,
|
|
fun() -> start_reindex(NewIndices) end
|
|
);
|
|
try_start_reindex(NewIndices, false) ->
|
|
mria:transaction(
|
|
?RETAINER_SHARD,
|
|
fun() ->
|
|
case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
|
|
[#retained_index_meta{reindexing = false}] ->
|
|
start_reindex(NewIndices);
|
|
_ ->
|
|
{error, already_started}
|
|
end
|
|
end
|
|
).
|
|
|
|
start_reindex(NewIndices) ->
|
|
?tp(warning, retainer_message_reindexing_started, #{
|
|
hint => <<"during reindexing, subscription performance may degrade">>
|
|
}),
|
|
mnesia:write(
|
|
?TAB_INDEX_META,
|
|
#retained_index_meta{
|
|
key = ?META_KEY,
|
|
read_indices = [],
|
|
write_indices = NewIndices,
|
|
reindexing = true
|
|
},
|
|
write
|
|
).
|
|
|
|
finalize_reindex() ->
|
|
%% Note: we don't expect reindexing during upgrade, so this function is internal
|
|
{atomic, ok} = mria:transaction(
|
|
?RETAINER_SHARD,
|
|
fun() ->
|
|
case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
|
|
[#retained_index_meta{write_indices = WriteIndices} = Meta] ->
|
|
mnesia:write(
|
|
?TAB_INDEX_META,
|
|
Meta#retained_index_meta{
|
|
key = ?META_KEY,
|
|
read_indices = WriteIndices,
|
|
reindexing = false
|
|
},
|
|
write
|
|
);
|
|
[] ->
|
|
ok
|
|
end
|
|
end
|
|
),
|
|
?tp(warning, retainer_message_reindexing_finished, #{}),
|
|
ok.
|
|
|
|
reindex_topic(Indices, Topic) ->
|
|
case mnesia:read(?TAB_MESSAGE, Topic, read) of
|
|
[#retained_message{expiry_time = ExpiryTime}] ->
|
|
ok = emqx_retainer_index:foreach_index_key(
|
|
fun(Key) -> do_store_retained_index(Key, ExpiryTime) end,
|
|
Indices,
|
|
Topic
|
|
);
|
|
[] ->
|
|
ok
|
|
end.
|
|
|
|
reindex_batch(Stream0, Done, StatusFun) ->
|
|
case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [Stream0, Done]) of
|
|
{atomic, {more, NewDone, Stream1}} ->
|
|
_ = StatusFun(NewDone),
|
|
reindex_batch(Stream1, NewDone, StatusFun);
|
|
{atomic, {done, NewDone}} ->
|
|
_ = StatusFun(NewDone),
|
|
ok;
|
|
{aborted, Reason} ->
|
|
?SLOG(error, #{
|
|
msg => "failed_to_reindex_retained_messages",
|
|
reason => Reason
|
|
}),
|
|
{error, Reason}
|
|
end.
|
|
|
|
do_reindex_batch(Stream0, Done) ->
|
|
Indices = db_indices(write),
|
|
Result = emqx_utils_stream:consume(?REINDEX_BATCH_SIZE, Stream0),
|
|
Topics =
|
|
case Result of
|
|
{Rows, _Stream1} ->
|
|
Rows;
|
|
Rows when is_list(Rows) ->
|
|
Rows
|
|
end,
|
|
ok = lists:foreach(
|
|
fun(Topic) -> reindex_topic(Indices, Topic) end,
|
|
Topics
|
|
),
|
|
case Result of
|
|
{_Rows, Stream1} ->
|
|
{more, Done + length(Topics), Stream1};
|
|
_Rows ->
|
|
{done, Done + length(Topics)}
|
|
end.
|
|
|
|
wait_dispatch_complete(Timeout) ->
|
|
Nodes = mria:running_nodes(),
|
|
{Results, []} = emqx_retainer_proto_v2:wait_dispatch_complete(Nodes, Timeout),
|
|
lists:all(
|
|
fun(Result) -> Result =:= ok end,
|
|
Results
|
|
).
|
|
|
|
wait_indices_updated(_Indices, TimeLeft) when TimeLeft < 0 -> false;
|
|
wait_indices_updated(Indices, TimeLeft) ->
|
|
case timer:tc(fun() -> are_indices_updated(Indices) end) of
|
|
{_, true} ->
|
|
true;
|
|
{TimePassed, false} ->
|
|
timer:sleep(?REINDEX_RPC_RETRY_INTERVAL),
|
|
wait_indices_updated(
|
|
Indices, TimeLeft - ?REINDEX_RPC_RETRY_INTERVAL - TimePassed / 1000
|
|
)
|
|
end.
|
|
|
|
active_indices() ->
|
|
{dirty_indices(read), dirty_indices(write)}.
|
|
|
|
are_indices_updated(Indices) ->
|
|
Nodes = mria:running_nodes(),
|
|
case emqx_retainer_proto_v2:active_mnesia_indices(Nodes) of
|
|
{Results, []} ->
|
|
lists:all(
|
|
fun(NodeIndices) -> NodeIndices =:= Indices end,
|
|
Results
|
|
);
|
|
_ ->
|
|
false
|
|
end.
|
|
|
|
ets_stream(Tab) ->
|
|
emqx_utils_stream:ets(
|
|
fun
|
|
(undefined) -> ets:match_object(Tab, '_', ?MESSAGE_SCAN_BATCH_SIZE);
|
|
(Cont) -> ets:match_object(Cont)
|
|
end
|
|
).
|