Merge pull request #11389 from keynslug/perf/EMQX-10706/squash-retained-index

perf(retainer): squash index updates into single dirty activity
This commit is contained in:
Andrew Mayorov 2023-08-03 21:32:04 +04:00 committed by GitHub
commit 420653e5a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 31 additions and 10 deletions

View File

@ -28,7 +28,7 @@
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.9"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.10"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.14"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.14"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},

View File

@ -2,7 +2,7 @@
{application, emqx_retainer, [ {application, emqx_retainer, [
{description, "EMQX Retainer"}, {description, "EMQX Retainer"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.15"}, {vsn, "5.0.16"},
{modules, []}, {modules, []},
{registered, [emqx_retainer_sup]}, {registered, [emqx_retainer_sup]},
{applications, [kernel, stdlib, emqx, emqx_ctl]}, {applications, [kernel, stdlib, emqx, emqx_ctl]},

View File

@ -153,6 +153,14 @@ store_retained(_, Msg = #message{topic = Topic}) ->
end. end.
clear_expired(_) -> clear_expired(_) ->
case mria_rlog:role() of
core ->
clear_expired();
_ ->
ok
end.
clear_expired() ->
NowMs = erlang:system_time(millisecond), NowMs = erlang:system_time(millisecond),
QH = qlc:q([ QH = qlc:q([
RetainedMsg RetainedMsg
@ -263,12 +271,22 @@ reindex_status() ->
do_store_retained(Msg, TopicTokens, ExpiryTime) -> do_store_retained(Msg, TopicTokens, ExpiryTime) ->
%% Retained message is stored syncronously on all core nodes %% Retained message is stored syncronously on all core nodes
%%
%% No transaction, meaning that concurrent writes in the cluster may
%% lead to inconsistent replicas. This could manifest in two clients
%% getting different retained messages for the same topic, depending
%% on which node they are connected to. We tolerate that.
ok = do_store_retained_message(Msg, TopicTokens, ExpiryTime), ok = do_store_retained_message(Msg, TopicTokens, ExpiryTime),
%% Since retained message was stored syncronously on all core nodes, %% Since retained message was stored syncronously on all core nodes,
%% now we are sure that %% now we are sure that
%% * either we will write correct indices %% * either we will write correct indices
%% * or if we a replicant with outdated write indices due to reindexing, %% * or if we a replicant with outdated write indices due to reindexing,
%% the correct indices will be added by reindexing %% the correct indices will be added by reindexing
%%
%% No transacation as well, meaning that concurrent writes in the cluster
%% may lead to inconsistent index replicas. This essentially allows for
%% inconsistent query results, where index entry has different expiry time
%% than the message it points to.
ok = do_store_retained_indices(TopicTokens, ExpiryTime). ok = do_store_retained_indices(TopicTokens, ExpiryTime).
do_store_retained_message(Msg, TopicTokens, ExpiryTime) -> do_store_retained_message(Msg, TopicTokens, ExpiryTime) ->
@ -281,18 +299,20 @@ do_store_retained_message(Msg, TopicTokens, ExpiryTime) ->
do_store_retained_indices(TopicTokens, ExpiryTime) -> do_store_retained_indices(TopicTokens, ExpiryTime) ->
Indices = dirty_indices(write), Indices = dirty_indices(write),
ok = emqx_retainer_index:foreach_index_key( ok = mria:async_dirty(?RETAINER_SHARD, fun() ->
fun(Key) -> do_store_retained_index(Key, ExpiryTime) end, emqx_retainer_index:foreach_index_key(
Indices, fun(Key) -> do_store_retained_index(Key, ExpiryTime) end,
TopicTokens Indices,
). TopicTokens
)
end).
do_store_retained_index(Key, ExpiryTime) -> do_store_retained_index(Key, ExpiryTime) ->
RetainedIndex = #retained_index{ RetainedIndex = #retained_index{
key = Key, key = Key,
expiry_time = ExpiryTime expiry_time = ExpiryTime
}, },
mria:dirty_write(?TAB_INDEX, RetainedIndex). mnesia:write(?TAB_INDEX, RetainedIndex, write).
msg_table(SearchTable) -> msg_table(SearchTable) ->
qlc:q([ qlc:q([

View File

@ -0,0 +1 @@
Improved retained message publishing latency by consolidating multiple index update operations into a single mnesia activity, leveraging the new APIs introduced in mria 0.6.0.

View File

@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do
{:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
{:esockd, github: "emqx/esockd", tag: "5.9.6", override: true}, {:esockd, github: "emqx/esockd", tag: "5.9.6", override: true},
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-1", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-1", override: true},
{:ekka, github: "emqx/ekka", tag: "0.15.9", override: true}, {:ekka, github: "emqx/ekka", tag: "0.15.10", override: true},
{:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
{:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true},
{:minirest, github: "emqx/minirest", tag: "1.3.11", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.11", override: true},

View File

@ -62,7 +62,7 @@
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}
, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-1"}}} , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-1"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.9"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.10"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.11"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.11"}}}