From 2180cc7c269c73be1ce31864d888335d80438136 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 13 Jun 2024 15:26:34 +0200 Subject: [PATCH] fix(dsstore): avoid storing `cf_refs()` in the RocksDB itself This is both pointless and confusing. --- .../src/emqx_ds_storage_layer.erl | 50 +++++++++++++------ 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index d8624f56b..e0e0256c4 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -111,7 +111,8 @@ -type shard_id() :: {emqx_ds:db(), binary()}. --type cf_refs() :: [{string(), rocksdb:cf_handle()}]. +-type cf_ref() :: {string(), rocksdb:cf_handle()}. +-type cf_refs() :: [cf_ref()]. -type gen_id() :: 0..16#ffff. @@ -179,7 +180,7 @@ %% Module-specific data defined at generation creation time: data := Data, %% Column families used by this generation - cf_refs := cf_refs(), + cf_names := [string()], %% Time at which this was created. Might differ from `since', in particular for the %% first generation. created_at := emqx_message:timestamp(), @@ -789,9 +790,9 @@ handle_drop_generation(S0, GenId) -> #s{ shard_id = ShardId, db = DB, - schema = #{?GEN_KEY(GenId) := GenSchema} = OldSchema, - shard = OldShard, - cf_refs = OldCFRefs + schema = #{?GEN_KEY(GenId) := GenSchema} = Schema0, + shard = #{?GEN_KEY(GenId) := #{data := RuntimeData}} = Shard0, + cf_refs = CFRefs0 } = S0, %% 1. Commit the metadata first, so other functions are less %% likely to see stale data, and replicas don't end up @@ -800,16 +801,16 @@ handle_drop_generation(S0, GenId) -> %% %% Note: in theory, this operation may be interrupted in the %% middle. This will leave column families hanging. - Shard = maps:remove(?GEN_KEY(GenId), OldShard), - Schema = maps:remove(?GEN_KEY(GenId), OldSchema), + Shard = maps:remove(?GEN_KEY(GenId), Shard0), + Schema = maps:remove(?GEN_KEY(GenId), Schema0), S1 = S0#s{ shard = Shard, schema = Schema }, commit_metadata(S1), %% 2. Now, actually drop the data from RocksDB: - #{module := Mod, cf_refs := GenCFRefs} = GenSchema, - #{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard, + #{module := Mod, cf_names := GenCFNames} = GenSchema, + GenCFRefs = [cf_ref(Name, CFRefs0) || Name <- GenCFNames], try Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData) catch @@ -826,7 +827,7 @@ handle_drop_generation(S0, GenId) -> } ) end, - CFRefs = OldCFRefs -- GenCFRefs, + CFRefs = CFRefs0 -- GenCFRefs, S = S1#s{cf_refs = CFRefs}, {ok, S}. @@ -878,7 +879,7 @@ new_generation(ShardId, DB, Schema0, Shard0, Since) -> GenSchema = #{ module => Mod, data => GenData, - cf_refs => NewCFRefs, + cf_names => cf_names(NewCFRefs), created_at => erlang:system_time(millisecond), since => Since, until => undefined @@ -964,18 +965,22 @@ update_last_until(Schema = #{current_generation := GenId}, Until) -> {error, overlaps_existing_generations} end. -handle_flush(S = #s{db = DB, cf_need_flush = NeedFlushGenId, schema = Schema}) -> +handle_flush(S = #s{db = DB, cf_refs = CFRefs, cf_need_flush = NeedFlushGenId, shard = Shard}) -> %% NOTE %% There could have been few generations added since the last time `flush/1` was %% called. Strictly speaking, we don't need to flush them all at once as part of %% a single atomic flush, but the error handling is a bit easier this way. - CurrentGenId = maps:get(current_generation, Schema), + CurrentGenId = maps:get(current_generation, Shard), GenIds = lists:seq(NeedFlushGenId, CurrentGenId), CFHandles = lists:flatmap( fun(GenId) -> - #{?GEN_KEY(GenId) := #{cf_refs := CFRefs}} = Schema, - {_, CFHandles} = lists:unzip(CFRefs), - CFHandles + case Shard of + #{?GEN_KEY(GenId) := #{cf_names := CFNames}} -> + [cf_handle(N, CFRefs) || N <- CFNames]; + #{} -> + %% Generation was probably dropped. + [] + end end, GenIds ), @@ -1028,6 +1033,19 @@ mk_write_options(#{durable := false}) -> mk_write_options(#{}) -> []. +-spec cf_names(cf_refs()) -> [string()]. +cf_names(CFRefs) -> + {CFNames, _CFHandles} = lists:unzip(CFRefs), + CFNames. + +-spec cf_ref(_Name :: string(), cf_refs()) -> cf_ref(). +cf_ref(Name, CFRefs) -> + lists:keyfind(Name, 1, CFRefs). + +-spec cf_handle(_Name :: string(), cf_refs()) -> rocksdb:cf_handle(). +cf_handle(Name, CFRefs) -> + element(2, cf_ref(Name, CFRefs)). + %%-------------------------------------------------------------------------------- %% Schema access %%--------------------------------------------------------------------------------