fix(ds): Workaround for the idempotency error when dropping gens

This commit is contained in:
ieQu1 2024-05-19 13:21:10 +02:00
parent eb7c43ee9d
commit acdae4fba3
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
1 changed file with 46 additions and 24 deletions

View File

@ -564,6 +564,7 @@ start_link(Shard = {_, _}, Options) ->
init({ShardId, Options}) -> init({ShardId, Options}) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
?tp(info, ds_storage_init, #{shard => ShardId}),
logger:set_process_metadata(#{shard_id => ShardId, domain => [ds, storage_layer, shard]}), logger:set_process_metadata(#{shard_id => ShardId, domain => [ds, storage_layer, shard]}),
erase_schema_runtime(ShardId), erase_schema_runtime(ShardId),
clear_all_checkpoints(ShardId), clear_all_checkpoints(ShardId),
@ -777,18 +778,31 @@ handle_drop_generation(S0, GenId) ->
} = S0, } = S0,
#{module := Mod, cf_refs := GenCFRefs} = GenSchema, #{module := Mod, cf_refs := GenCFRefs} = GenSchema,
#{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard, #{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard,
case Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData) of try
ok -> Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData)
CFRefs = OldCFRefs -- GenCFRefs, catch
Shard = maps:remove(?GEN_KEY(GenId), OldShard), EC:Err:Stack ->
Schema = maps:remove(?GEN_KEY(GenId), OldSchema), ?tp(
S = S0#s{ error,
cf_refs = CFRefs, ds_storage_layer_failed_to_drop_generation,
shard = Shard, #{
schema = Schema shard => ShardId,
}, EC => Err,
{ok, S} stacktrace => Stack,
end. generation => GenId,
s => format_status(S0)
}
)
end,
CFRefs = OldCFRefs -- GenCFRefs,
Shard = maps:remove(?GEN_KEY(GenId), OldShard),
Schema = maps:remove(?GEN_KEY(GenId), OldSchema),
S = S0#s{
cf_refs = CFRefs,
shard = Shard,
schema = Schema
},
{ok, S}.
-spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) -> -spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) ->
generation(). generation().
@ -940,14 +954,18 @@ handle_accept_snapshot(ShardId) ->
%% general. %% general.
%% %%
%% The mechanism of storage layer events should be refined later. %% The mechanism of storage layer events should be refined later.
-spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. -spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> {gen_id(), [CustomEvent]}.
handle_event(Shard, Time, Event) -> handle_event(Shard, Time, Event) ->
{_GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time), case generation_at(Shard, Time) of
?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}), {_GenId, #{module := Mod, data := GenData}} ->
case erlang:function_exported(Mod, handle_event, 4) of ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}),
true -> case erlang:function_exported(Mod, handle_event, 4) of
Mod:handle_event(Shard, GenData, Time, Event); true ->
false -> Mod:handle_event(Shard, GenData, Time, Event);
false ->
[]
end;
_ ->
[] []
end. end.
@ -989,12 +1007,16 @@ generation_at(Shard, Time) ->
generation_at(Time, Current, Schema). generation_at(Time, Current, Schema).
generation_at(Time, GenId, Schema) -> generation_at(Time, GenId, Schema) ->
#{?GEN_KEY(GenId) := Gen} = Schema, case Schema of
case Gen of #{?GEN_KEY(GenId) := Gen} ->
#{since := Since} when Time < Since andalso GenId > 0 -> case Gen of
generation_at(Time, prev_generation_id(GenId), Schema); #{since := Since} when Time < Since andalso GenId > 0 ->
generation_at(Time, prev_generation_id(GenId), Schema);
_ ->
{GenId, Gen}
end;
_ -> _ ->
{GenId, Gen} not_found
end. end.
-define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}). -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}).