fix(dsstore): avoid storing `cf_refs()` in the RocksDB itself
This is both pointless and confusing.
This commit is contained in:
parent
80ea2e62f7
commit
2180cc7c26
|
@ -111,7 +111,8 @@
|
|||
|
||||
-type shard_id() :: {emqx_ds:db(), binary()}.
|
||||
|
||||
-type cf_refs() :: [{string(), rocksdb:cf_handle()}].
|
||||
-type cf_ref() :: {string(), rocksdb:cf_handle()}.
|
||||
-type cf_refs() :: [cf_ref()].
|
||||
|
||||
-type gen_id() :: 0..16#ffff.
|
||||
|
||||
|
@ -179,7 +180,7 @@
|
|||
%% Module-specific data defined at generation creation time:
|
||||
data := Data,
|
||||
%% Column families used by this generation
|
||||
cf_refs := cf_refs(),
|
||||
cf_names := [string()],
|
||||
%% Time at which this was created. Might differ from `since', in particular for the
|
||||
%% first generation.
|
||||
created_at := emqx_message:timestamp(),
|
||||
|
@ -789,9 +790,9 @@ handle_drop_generation(S0, GenId) ->
|
|||
#s{
|
||||
shard_id = ShardId,
|
||||
db = DB,
|
||||
schema = #{?GEN_KEY(GenId) := GenSchema} = OldSchema,
|
||||
shard = OldShard,
|
||||
cf_refs = OldCFRefs
|
||||
schema = #{?GEN_KEY(GenId) := GenSchema} = Schema0,
|
||||
shard = #{?GEN_KEY(GenId) := #{data := RuntimeData}} = Shard0,
|
||||
cf_refs = CFRefs0
|
||||
} = S0,
|
||||
%% 1. Commit the metadata first, so other functions are less
|
||||
%% likely to see stale data, and replicas don't end up
|
||||
|
@ -800,16 +801,16 @@ handle_drop_generation(S0, GenId) ->
|
|||
%%
|
||||
%% Note: in theory, this operation may be interrupted in the
|
||||
%% middle. This will leave column families hanging.
|
||||
Shard = maps:remove(?GEN_KEY(GenId), OldShard),
|
||||
Schema = maps:remove(?GEN_KEY(GenId), OldSchema),
|
||||
Shard = maps:remove(?GEN_KEY(GenId), Shard0),
|
||||
Schema = maps:remove(?GEN_KEY(GenId), Schema0),
|
||||
S1 = S0#s{
|
||||
shard = Shard,
|
||||
schema = Schema
|
||||
},
|
||||
commit_metadata(S1),
|
||||
%% 2. Now, actually drop the data from RocksDB:
|
||||
#{module := Mod, cf_refs := GenCFRefs} = GenSchema,
|
||||
#{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard,
|
||||
#{module := Mod, cf_names := GenCFNames} = GenSchema,
|
||||
GenCFRefs = [cf_ref(Name, CFRefs0) || Name <- GenCFNames],
|
||||
try
|
||||
Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData)
|
||||
catch
|
||||
|
@ -826,7 +827,7 @@ handle_drop_generation(S0, GenId) ->
|
|||
}
|
||||
)
|
||||
end,
|
||||
CFRefs = OldCFRefs -- GenCFRefs,
|
||||
CFRefs = CFRefs0 -- GenCFRefs,
|
||||
S = S1#s{cf_refs = CFRefs},
|
||||
{ok, S}.
|
||||
|
||||
|
@ -878,7 +879,7 @@ new_generation(ShardId, DB, Schema0, Shard0, Since) ->
|
|||
GenSchema = #{
|
||||
module => Mod,
|
||||
data => GenData,
|
||||
cf_refs => NewCFRefs,
|
||||
cf_names => cf_names(NewCFRefs),
|
||||
created_at => erlang:system_time(millisecond),
|
||||
since => Since,
|
||||
until => undefined
|
||||
|
@ -964,18 +965,22 @@ update_last_until(Schema = #{current_generation := GenId}, Until) ->
|
|||
{error, overlaps_existing_generations}
|
||||
end.
|
||||
|
||||
handle_flush(S = #s{db = DB, cf_need_flush = NeedFlushGenId, schema = Schema}) ->
|
||||
handle_flush(S = #s{db = DB, cf_refs = CFRefs, cf_need_flush = NeedFlushGenId, shard = Shard}) ->
|
||||
%% NOTE
|
||||
%% There could have been few generations added since the last time `flush/1` was
|
||||
%% called. Strictly speaking, we don't need to flush them all at once as part of
|
||||
%% a single atomic flush, but the error handling is a bit easier this way.
|
||||
CurrentGenId = maps:get(current_generation, Schema),
|
||||
CurrentGenId = maps:get(current_generation, Shard),
|
||||
GenIds = lists:seq(NeedFlushGenId, CurrentGenId),
|
||||
CFHandles = lists:flatmap(
|
||||
fun(GenId) ->
|
||||
#{?GEN_KEY(GenId) := #{cf_refs := CFRefs}} = Schema,
|
||||
{_, CFHandles} = lists:unzip(CFRefs),
|
||||
CFHandles
|
||||
case Shard of
|
||||
#{?GEN_KEY(GenId) := #{cf_names := CFNames}} ->
|
||||
[cf_handle(N, CFRefs) || N <- CFNames];
|
||||
#{} ->
|
||||
%% Generation was probably dropped.
|
||||
[]
|
||||
end
|
||||
end,
|
||||
GenIds
|
||||
),
|
||||
|
@ -1028,6 +1033,19 @@ mk_write_options(#{durable := false}) ->
|
|||
mk_write_options(#{}) ->
|
||||
[].
|
||||
|
||||
-spec cf_names(cf_refs()) -> [string()].
|
||||
cf_names(CFRefs) ->
|
||||
{CFNames, _CFHandles} = lists:unzip(CFRefs),
|
||||
CFNames.
|
||||
|
||||
-spec cf_ref(_Name :: string(), cf_refs()) -> cf_ref().
|
||||
cf_ref(Name, CFRefs) ->
|
||||
lists:keyfind(Name, 1, CFRefs).
|
||||
|
||||
-spec cf_handle(_Name :: string(), cf_refs()) -> rocksdb:cf_handle().
|
||||
cf_handle(Name, CFRefs) ->
|
||||
element(2, cf_ref(Name, CFRefs)).
|
||||
|
||||
%%--------------------------------------------------------------------------------
|
||||
%% Schema access
|
||||
%%--------------------------------------------------------------------------------
|
||||
|
|
Loading…
Reference in New Issue