From 42be694d40835595c3309db77be52fc3774dda4b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 3 Aug 2023 12:51:36 +0400 Subject: [PATCH] perf(retainer): squash index updates into single dirty activity In order to minimize number of round trips to core nodes and between them, improving publishing latency. This shouldn't make consistency worse than it was before. --- .../src/emqx_retainer_mnesia.erl | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 0152c240e..0d407ea78 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -263,12 +263,22 @@ reindex_status() -> do_store_retained(Msg, TopicTokens, ExpiryTime) -> %% Retained message is stored syncronously on all core nodes + %% + %% No transaction, meaning that concurrent writes in the cluster may + %% lead to inconsistent replicas. This could manifest in two clients + %% getting different retained messages for the same topic, depending + %% on which node they are connected to. We tolerate that. ok = do_store_retained_message(Msg, TopicTokens, ExpiryTime), %% Since retained message was stored syncronously on all core nodes, %% now we are sure that %% * either we will write correct indices %% * or if we a replicant with outdated write indices due to reindexing, %% the correct indices will be added by reindexing + %% + %% No transacation as well, meaning that concurrent writes in the cluster + %% may lead to inconsistent index replicas. This essentially allows for + %% inconsistent query results, where index entry has different expiry time + %% than the message it points to. ok = do_store_retained_indices(TopicTokens, ExpiryTime). do_store_retained_message(Msg, TopicTokens, ExpiryTime) -> @@ -281,18 +291,20 @@ do_store_retained_message(Msg, TopicTokens, ExpiryTime) -> do_store_retained_indices(TopicTokens, ExpiryTime) -> Indices = dirty_indices(write), - ok = emqx_retainer_index:foreach_index_key( - fun(Key) -> do_store_retained_index(Key, ExpiryTime) end, - Indices, - TopicTokens - ). + ok = mria:async_dirty(?RETAINER_SHARD, fun() -> + emqx_retainer_index:foreach_index_key( + fun(Key) -> do_store_retained_index(Key, ExpiryTime) end, + Indices, + TopicTokens + ) + end). do_store_retained_index(Key, ExpiryTime) -> RetainedIndex = #retained_index{ key = Key, expiry_time = ExpiryTime }, - mria:dirty_write(?TAB_INDEX, RetainedIndex). + mnesia:write(?TAB_INDEX, RetainedIndex, write). msg_table(SearchTable) -> qlc:q([