diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index c861d27e4..c236b9c28 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -36,6 +36,15 @@ size/1 ]). +%% Internal exports (RPC) +-export([ + do_store_retained/1, + do_clear_expired/0, + do_delete_message/1, + do_populate_index_meta/1, + do_reindex_batch/2 +]). + %% Management API: -export([topics/0]). @@ -126,26 +135,8 @@ create_table(Table, RecordName, Attributes, Type, StorageType) -> ok end. -store_retained(_, #message{topic = Topic} = Msg) -> - ExpiryTime = emqx_retainer:get_expiry_time(Msg), - Tokens = topic_to_tokens(Topic), - Fun = - case is_table_full() of - false -> - fun() -> - store_retained(db_indices(write), Msg, Tokens, ExpiryTime) - end; - _ -> - fun() -> - case mnesia:read(?TAB_MESSAGE, Tokens, write) of - [_] -> - store_retained(db_indices(write), Msg, Tokens, ExpiryTime); - [] -> - mnesia:abort(table_is_full) - end - end - end, - case mria:transaction(?RETAINER_SHARD, Fun) of +store_retained(_, Msg = #message{topic = Topic}) -> + case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_store_retained/1, [Msg]) of {atomic, ok} -> ?tp(debug, message_retained, #{topic => Topic}), ok; @@ -157,7 +148,26 @@ store_retained(_, #message{topic = Topic} = Msg) -> }) end. +do_store_retained(#message{topic = Topic} = Msg) -> + ExpiryTime = emqx_retainer:get_expiry_time(Msg), + Tokens = topic_to_tokens(Topic), + case is_table_full() of + false -> + store_retained(db_indices(write), Msg, Tokens, ExpiryTime); + _ -> + case mnesia:read(?TAB_MESSAGE, Tokens, write) of + [_] -> + store_retained(db_indices(write), Msg, Tokens, ExpiryTime); + [] -> + mnesia:abort(table_is_full) + end + end. + clear_expired(_) -> + {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear_expired/0), + ok. + +do_clear_expired() -> NowMs = erlang:system_time(millisecond), QH = qlc:q([ TopicTokens @@ -167,36 +177,29 @@ clear_expired(_) -> } <- mnesia:table(?TAB_MESSAGE, [{lock, write}]), (ExpiryTime =/= 0) and (ExpiryTime < NowMs) ]), - Fun = fun() -> - QC = qlc:cursor(QH), - clear_batch(db_indices(write), QC) - end, - {atomic, _} = mria:transaction(?RETAINER_SHARD, Fun), - ok. + QC = qlc:cursor(QH), + clear_batch(db_indices(write), QC). delete_message(_, Topic) -> - Tokens = topic_to_tokens(Topic), - DeleteFun = - case emqx_topic:wildcard(Topic) of - false -> - fun() -> - ok = delete_message_by_topic(Tokens, db_indices(write)) - end; - true -> - fun() -> - QH = topic_search_table(Tokens), - qlc:fold( - fun(TopicTokens, _) -> - ok = delete_message_by_topic(TopicTokens, db_indices(write)) - end, - undefined, - QH - ) - end - end, - {atomic, _} = mria:transaction(?RETAINER_SHARD, DeleteFun), + {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_delete_message/1, [Topic]), ok. +do_delete_message(Topic) -> + Tokens = topic_to_tokens(Topic), + case emqx_topic:wildcard(Topic) of + false -> + ok = delete_message_by_topic(Tokens, db_indices(write)); + true -> + QH = topic_search_table(Tokens), + qlc:fold( + fun(TopicTokens, _) -> + ok = delete_message_by_topic(TopicTokens, db_indices(write)) + end, + undefined, + QH + ) + end. + read_message(_, Topic) -> {ok, read_messages(Topic)}. @@ -267,16 +270,11 @@ reindex(Force, StatusFun) -> reindex(config_indices(), Force, StatusFun). reindex_status() -> - Fun = fun() -> - mnesia:read(?TAB_INDEX_META, ?META_KEY) - end, - case mria:transaction(?RETAINER_SHARD, Fun) of - {atomic, [#retained_index_meta{reindexing = true}]} -> + case mnesia:dirty_read(?TAB_INDEX_META, ?META_KEY) of + [#retained_index_meta{reindexing = true}] -> true; - {atomic, _} -> - false; - {aborted, Reason} -> - {error, Reason} + _ -> + false end. %%-------------------------------------------------------------------- @@ -439,37 +437,7 @@ config_indices() -> populate_index_meta() -> ConfigIndices = config_indices(), - Fun = fun() -> - case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of - [ - #retained_index_meta{ - read_indices = ReadIndices, - write_indices = WriteIndices, - reindexing = Reindexing - } - ] -> - case {ReadIndices, WriteIndices, Reindexing} of - {_, _, true} -> - ok; - {ConfigIndices, ConfigIndices, false} -> - ok; - {DBWriteIndices, DBReadIndices, false} -> - {error, DBWriteIndices, DBReadIndices} - end; - [] -> - mnesia:write( - ?TAB_INDEX_META, - #retained_index_meta{ - key = ?META_KEY, - read_indices = ConfigIndices, - write_indices = ConfigIndices, - reindexing = false - }, - write - ) - end - end, - case mria:transaction(?RETAINER_SHARD, Fun) of + case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_populate_index_meta/1, [ConfigIndices]) of {atomic, ok} -> ok; {atomic, {error, DBWriteIndices, DBReadIndices}} -> @@ -488,6 +456,36 @@ populate_index_meta() -> {error, Reason} end. +do_populate_index_meta(ConfigIndices) -> + case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of + [ + #retained_index_meta{ + read_indices = ReadIndices, + write_indices = WriteIndices, + reindexing = Reindexing + } + ] -> + case {ReadIndices, WriteIndices, Reindexing} of + {_, _, true} -> + ok; + {ConfigIndices, ConfigIndices, false} -> + ok; + {DBWriteIndices, DBReadIndices, false} -> + {error, DBWriteIndices, DBReadIndices} + end; + [] -> + mnesia:write( + ?TAB_INDEX_META, + #retained_index_meta{ + key = ?META_KEY, + read_indices = ConfigIndices, + write_indices = ConfigIndices, + reindexing = false + }, + write + ) + end. + db_indices(Type) -> case mnesia:read(?TAB_INDEX_META, ?META_KEY) of [#retained_index_meta{read_indices = ReadIndices, write_indices = WriteIndices}] -> @@ -533,6 +531,7 @@ reindex(NewIndices, Force, StatusFun) when end. try_start_reindex(NewIndices, true) -> + %% Note: we don't expect reindexing during upgrade, so this function is internal mria:transaction( ?RETAINER_SHARD, fun() -> start_reindex(NewIndices) end @@ -566,6 +565,7 @@ start_reindex(NewIndices) -> ). finalize_reindex() -> + %% Note: we don't expect reindexing during upgrade, so this function is internal {atomic, ok} = mria:transaction( ?RETAINER_SHARD, fun() -> @@ -601,16 +601,7 @@ reindex_topic(Indices, Topic) -> end. reindex_batch(QC, Done, StatusFun) -> - Fun = fun() -> - Indices = db_indices(write), - {Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE), - ok = lists:foreach( - fun(Topic) -> reindex_topic(Indices, Topic) end, - Topics - ), - {Status, Done + length(Topics)} - end, - case mria:transaction(?RETAINER_SHARD, Fun) of + case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [QC, Done]) of {atomic, {more, NewDone}} -> _ = StatusFun(NewDone), reindex_batch(QC, NewDone, StatusFun); @@ -625,6 +616,15 @@ reindex_batch(QC, Done, StatusFun) -> {error, Reason} end. +do_reindex_batch(QC, Done) -> + Indices = db_indices(write), + {Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE), + ok = lists:foreach( + fun(Topic) -> reindex_topic(Indices, Topic) end, + Topics + ), + {Status, Done + length(Topics)}. + wait_dispatch_complete(Timeout) -> Nodes = mria_mnesia:running_nodes(), {Results, []} = emqx_retainer_proto_v1:wait_dispatch_complete(Nodes, Timeout), diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl index 22eeafe08..a576b953d 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl @@ -45,9 +45,7 @@ retainer(["reindex", "status"]) -> true -> ?PRINT_MSG("Reindexing is in progress~n"); false -> - ?PRINT_MSG("Reindexing is not running~n"); - {error, Reason} -> - ?PRINT("Can't get reindex status: ~p~n", [Reason]) + ?PRINT_MSG("Reindexing is not running~n") end; retainer(["reindex", "start"]) -> retainer(["reindex", "start", "false"]);