refactor(retainer_mnesia): Export transactions

This commit is contained in:
ieQu1 2022-08-22 10:56:26 +02:00
parent fa12c66ad9
commit 6f4d0e2ed5
2 changed files with 97 additions and 99 deletions

View File

@ -36,6 +36,15 @@
size/1
]).
%% Internal exports (RPC)
-export([
do_store_retained/1,
do_clear_expired/0,
do_delete_message/1,
do_populate_index_meta/1,
do_reindex_batch/2
]).
%% Management API:
-export([topics/0]).
@ -126,26 +135,8 @@ create_table(Table, RecordName, Attributes, Type, StorageType) ->
ok
end.
store_retained(_, #message{topic = Topic} = Msg) ->
ExpiryTime = emqx_retainer:get_expiry_time(Msg),
Tokens = topic_to_tokens(Topic),
Fun =
case is_table_full() of
false ->
fun() ->
store_retained(db_indices(write), Msg, Tokens, ExpiryTime)
end;
_ ->
fun() ->
case mnesia:read(?TAB_MESSAGE, Tokens, write) of
[_] ->
store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
[] ->
mnesia:abort(table_is_full)
end
end
end,
case mria:transaction(?RETAINER_SHARD, Fun) of
store_retained(_, Msg = #message{topic = Topic}) ->
case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_store_retained/1, [Msg]) of
{atomic, ok} ->
?tp(debug, message_retained, #{topic => Topic}),
ok;
@ -157,7 +148,26 @@ store_retained(_, #message{topic = Topic} = Msg) ->
})
end.
do_store_retained(#message{topic = Topic} = Msg) ->
ExpiryTime = emqx_retainer:get_expiry_time(Msg),
Tokens = topic_to_tokens(Topic),
case is_table_full() of
false ->
store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
_ ->
case mnesia:read(?TAB_MESSAGE, Tokens, write) of
[_] ->
store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
[] ->
mnesia:abort(table_is_full)
end
end.
clear_expired(_) ->
{atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear_expired/0),
ok.
do_clear_expired() ->
NowMs = erlang:system_time(millisecond),
QH = qlc:q([
TopicTokens
@ -167,36 +177,29 @@ clear_expired(_) ->
} <- mnesia:table(?TAB_MESSAGE, [{lock, write}]),
(ExpiryTime =/= 0) and (ExpiryTime < NowMs)
]),
Fun = fun() ->
QC = qlc:cursor(QH),
clear_batch(db_indices(write), QC)
end,
{atomic, _} = mria:transaction(?RETAINER_SHARD, Fun),
ok.
QC = qlc:cursor(QH),
clear_batch(db_indices(write), QC).
delete_message(_, Topic) ->
Tokens = topic_to_tokens(Topic),
DeleteFun =
case emqx_topic:wildcard(Topic) of
false ->
fun() ->
ok = delete_message_by_topic(Tokens, db_indices(write))
end;
true ->
fun() ->
QH = topic_search_table(Tokens),
qlc:fold(
fun(TopicTokens, _) ->
ok = delete_message_by_topic(TopicTokens, db_indices(write))
end,
undefined,
QH
)
end
end,
{atomic, _} = mria:transaction(?RETAINER_SHARD, DeleteFun),
{atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_delete_message/1, [Topic]),
ok.
do_delete_message(Topic) ->
Tokens = topic_to_tokens(Topic),
case emqx_topic:wildcard(Topic) of
false ->
ok = delete_message_by_topic(Tokens, db_indices(write));
true ->
QH = topic_search_table(Tokens),
qlc:fold(
fun(TopicTokens, _) ->
ok = delete_message_by_topic(TopicTokens, db_indices(write))
end,
undefined,
QH
)
end.
read_message(_, Topic) ->
{ok, read_messages(Topic)}.
@ -267,16 +270,11 @@ reindex(Force, StatusFun) ->
reindex(config_indices(), Force, StatusFun).
reindex_status() ->
Fun = fun() ->
mnesia:read(?TAB_INDEX_META, ?META_KEY)
end,
case mria:transaction(?RETAINER_SHARD, Fun) of
{atomic, [#retained_index_meta{reindexing = true}]} ->
case mnesia:dirty_read(?TAB_INDEX_META, ?META_KEY) of
[#retained_index_meta{reindexing = true}] ->
true;
{atomic, _} ->
false;
{aborted, Reason} ->
{error, Reason}
_ ->
false
end.
%%--------------------------------------------------------------------
@ -439,37 +437,7 @@ config_indices() ->
populate_index_meta() ->
ConfigIndices = config_indices(),
Fun = fun() ->
case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
[
#retained_index_meta{
read_indices = ReadIndices,
write_indices = WriteIndices,
reindexing = Reindexing
}
] ->
case {ReadIndices, WriteIndices, Reindexing} of
{_, _, true} ->
ok;
{ConfigIndices, ConfigIndices, false} ->
ok;
{DBWriteIndices, DBReadIndices, false} ->
{error, DBWriteIndices, DBReadIndices}
end;
[] ->
mnesia:write(
?TAB_INDEX_META,
#retained_index_meta{
key = ?META_KEY,
read_indices = ConfigIndices,
write_indices = ConfigIndices,
reindexing = false
},
write
)
end
end,
case mria:transaction(?RETAINER_SHARD, Fun) of
case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_populate_index_meta/1, [ConfigIndices]) of
{atomic, ok} ->
ok;
{atomic, {error, DBWriteIndices, DBReadIndices}} ->
@ -488,6 +456,36 @@ populate_index_meta() ->
{error, Reason}
end.
do_populate_index_meta(ConfigIndices) ->
case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of
[
#retained_index_meta{
read_indices = ReadIndices,
write_indices = WriteIndices,
reindexing = Reindexing
}
] ->
case {ReadIndices, WriteIndices, Reindexing} of
{_, _, true} ->
ok;
{ConfigIndices, ConfigIndices, false} ->
ok;
{DBWriteIndices, DBReadIndices, false} ->
{error, DBWriteIndices, DBReadIndices}
end;
[] ->
mnesia:write(
?TAB_INDEX_META,
#retained_index_meta{
key = ?META_KEY,
read_indices = ConfigIndices,
write_indices = ConfigIndices,
reindexing = false
},
write
)
end.
db_indices(Type) ->
case mnesia:read(?TAB_INDEX_META, ?META_KEY) of
[#retained_index_meta{read_indices = ReadIndices, write_indices = WriteIndices}] ->
@ -533,6 +531,7 @@ reindex(NewIndices, Force, StatusFun) when
end.
try_start_reindex(NewIndices, true) ->
%% Note: we don't expect reindexing during upgrade, so this function is internal
mria:transaction(
?RETAINER_SHARD,
fun() -> start_reindex(NewIndices) end
@ -566,6 +565,7 @@ start_reindex(NewIndices) ->
).
finalize_reindex() ->
%% Note: we don't expect reindexing during upgrade, so this function is internal
{atomic, ok} = mria:transaction(
?RETAINER_SHARD,
fun() ->
@ -601,16 +601,7 @@ reindex_topic(Indices, Topic) ->
end.
reindex_batch(QC, Done, StatusFun) ->
Fun = fun() ->
Indices = db_indices(write),
{Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE),
ok = lists:foreach(
fun(Topic) -> reindex_topic(Indices, Topic) end,
Topics
),
{Status, Done + length(Topics)}
end,
case mria:transaction(?RETAINER_SHARD, Fun) of
case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [QC, Done]) of
{atomic, {more, NewDone}} ->
_ = StatusFun(NewDone),
reindex_batch(QC, NewDone, StatusFun);
@ -625,6 +616,15 @@ reindex_batch(QC, Done, StatusFun) ->
{error, Reason}
end.
do_reindex_batch(QC, Done) ->
Indices = db_indices(write),
{Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE),
ok = lists:foreach(
fun(Topic) -> reindex_topic(Indices, Topic) end,
Topics
),
{Status, Done + length(Topics)}.
wait_dispatch_complete(Timeout) ->
Nodes = mria_mnesia:running_nodes(),
{Results, []} = emqx_retainer_proto_v1:wait_dispatch_complete(Nodes, Timeout),

View File

@ -45,9 +45,7 @@ retainer(["reindex", "status"]) ->
true ->
?PRINT_MSG("Reindexing is in progress~n");
false ->
?PRINT_MSG("Reindexing is not running~n");
{error, Reason} ->
?PRINT("Can't get reindex status: ~p~n", [Reason])
?PRINT_MSG("Reindexing is not running~n")
end;
retainer(["reindex", "start"]) ->
retainer(["reindex", "start", "false"]);