From acdae4fba38f235cf860b3801f5532bd82678498 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sun, 19 May 2024 13:21:10 +0200 Subject: [PATCH] fix(ds): Workaround for the idempotency error when dropping gens --- .../src/emqx_ds_storage_layer.erl | 70 ++++++++++++------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index d36d8e96f..68e2f4597 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -564,6 +564,7 @@ start_link(Shard = {_, _}, Options) -> init({ShardId, Options}) -> process_flag(trap_exit, true), + ?tp(info, ds_storage_init, #{shard => ShardId}), logger:set_process_metadata(#{shard_id => ShardId, domain => [ds, storage_layer, shard]}), erase_schema_runtime(ShardId), clear_all_checkpoints(ShardId), @@ -777,18 +778,31 @@ handle_drop_generation(S0, GenId) -> } = S0, #{module := Mod, cf_refs := GenCFRefs} = GenSchema, #{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard, - case Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData) of - ok -> - CFRefs = OldCFRefs -- GenCFRefs, - Shard = maps:remove(?GEN_KEY(GenId), OldShard), - Schema = maps:remove(?GEN_KEY(GenId), OldSchema), - S = S0#s{ - cf_refs = CFRefs, - shard = Shard, - schema = Schema - }, - {ok, S} - end. + try + Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData) + catch + EC:Err:Stack -> + ?tp( + error, + ds_storage_layer_failed_to_drop_generation, + #{ + shard => ShardId, + EC => Err, + stacktrace => Stack, + generation => GenId, + s => format_status(S0) + } + ) + end, + CFRefs = OldCFRefs -- GenCFRefs, + Shard = maps:remove(?GEN_KEY(GenId), OldShard), + Schema = maps:remove(?GEN_KEY(GenId), OldSchema), + S = S0#s{ + cf_refs = CFRefs, + shard = Shard, + schema = Schema + }, + {ok, S}. -spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) -> generation(). @@ -940,14 +954,18 @@ handle_accept_snapshot(ShardId) -> %% general. %% %% The mechanism of storage layer events should be refined later. --spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. +-spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> {gen_id(), [CustomEvent]}. handle_event(Shard, Time, Event) -> - {_GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time), - ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}), - case erlang:function_exported(Mod, handle_event, 4) of - true -> - Mod:handle_event(Shard, GenData, Time, Event); - false -> + case generation_at(Shard, Time) of + {_GenId, #{module := Mod, data := GenData}} -> + ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}), + case erlang:function_exported(Mod, handle_event, 4) of + true -> + Mod:handle_event(Shard, GenData, Time, Event); + false -> + [] + end; + _ -> [] end. @@ -989,12 +1007,16 @@ generation_at(Shard, Time) -> generation_at(Time, Current, Schema). generation_at(Time, GenId, Schema) -> - #{?GEN_KEY(GenId) := Gen} = Schema, - case Gen of - #{since := Since} when Time < Since andalso GenId > 0 -> - generation_at(Time, prev_generation_id(GenId), Schema); + case Schema of + #{?GEN_KEY(GenId) := Gen} -> + case Gen of + #{since := Since} when Time < Since andalso GenId > 0 -> + generation_at(Time, prev_generation_id(GenId), Schema); + _ -> + {GenId, Gen} + end; _ -> - {GenId, Gen} + not_found end. -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}).