From 4dafbf21f664737dc71b0da2bda1f848ba95dc79 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 2 Feb 2024 17:21:26 +0100 Subject: [PATCH] fix(dsrepl): make db + shard part of machine state It doesn't feel right, but right now is the easiest way to have it in the scope of `apply/3`, because `init/1` doesn't actually invoked for ra machines recovered from the existing log / snapshot. --- .../src/emqx_ds_replication_layer.erl | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 53a491d1f..c091392b1 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -574,8 +574,7 @@ ra_drop_shard(DB, Shard) -> %% init(#{db := DB, shard := Shard}) -> - _ = erlang:put(emqx_ds_db_shard, {DB, Shard}), - #{latest => 0}. + #{db_shard => {DB, Shard}, latest => 0}. apply( #{index := RaftIdx}, @@ -583,7 +582,7 @@ apply( ?tag := ?BATCH, ?batch_messages := MessagesIn }, - #{latest := Latest} = State + #{db_shard := DBShard, latest := Latest} = State ) -> %% NOTE %% Unique timestamp tracking real time closely. @@ -597,7 +596,7 @@ apply( %% currently relies on wall clock time to decide if it's safe to iterate over %% next epoch, this is likely wrong. Ideally it should rely on consensus clock %% time instead. - Result = emqx_ds_storage_layer:store_batch(erlang:get(emqx_ds_db_shard), Messages, #{}), + Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), NState = State#{latest := NLatest}, %% TODO: Need to measure effects of changing frequency of `release_cursor`. Effect = {release_cursor, RaftIdx, NState}, @@ -605,23 +604,23 @@ apply( apply( _RaftMeta, #{?tag := add_generation}, - State + #{db_shard := DBShard} = State ) -> - Result = emqx_ds_storage_layer:add_generation(erlang:get(emqx_ds_db_shard)), + Result = emqx_ds_storage_layer:add_generation(DBShard), {State, Result}; apply( _RaftMeta, #{?tag := update_config, ?config := Opts}, - State + #{db_shard := DBShard} = State ) -> - Result = emqx_ds_storage_layer:update_config(erlang:get(emqx_ds_db_shard), Opts), + Result = emqx_ds_storage_layer:update_config(DBShard, Opts), {State, Result}; apply( _RaftMeta, #{?tag := drop_generation, ?generation := GenId}, - State + #{db_shard := DBShard} = State ) -> - Result = emqx_ds_storage_layer:drop_generation(erlang:get(emqx_ds_db_shard), GenId), + Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId), {State, Result}. assign_timestamps(Latest, Messages) ->