diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 06e852dcd..69f5b8231 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -410,12 +410,14 @@ delete_next( {ok, end_of_stream} end. --spec update_config(shard_id(), emqx_ds:time(), emqx_ds:create_db_opts()) -> ok. +-spec update_config(shard_id(), emqx_ds:time(), emqx_ds:create_db_opts()) -> + ok | {error, overlaps_existing_generations}. update_config(ShardId, Since, Options) -> Call = #call_update_config{since = Since, options = Options}, gen_server:call(?REF(ShardId), Call, infinity). --spec add_generation(shard_id(), emqx_ds:time()) -> ok. +-spec add_generation(shard_id(), emqx_ds:time()) -> + ok | {error, overlaps_existing_generations}. add_generation(ShardId, Since) -> gen_server:call(?REF(ShardId), #call_add_generation{since = Since}, infinity). @@ -481,13 +483,21 @@ init({ShardId, Options}) -> {ok, S}. handle_call(#call_update_config{since = Since, options = Options}, _From, S0) -> - S = #s{} = handle_update_config(S0, Since, Options), - commit_metadata(S), - {reply, ok, S}; + case handle_update_config(S0, Since, Options) of + S = #s{} -> + commit_metadata(S), + {reply, ok, S}; + Error = {error, _} -> + {reply, Error, S0} + end; handle_call(#call_add_generation{since = Since}, _From, S0) -> - S = #s{} = handle_add_generation(S0, Since), - commit_metadata(S), - {reply, ok, S}; + case handle_add_generation(S0, Since) of + S = #s{} -> + commit_metadata(S), + {reply, ok, S}; + Error = {error, _} -> + {reply, Error, S0} + end; handle_call(#call_list_generations_with_lifetimes{}, _From, S) -> Generations = handle_list_generations_with_lifetimes(S), {reply, Generations, S}; @@ -531,7 +541,7 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) -> ). -spec handle_add_generation(server_state(), emqx_ds:time()) -> - server_state() | {error, nonmonotonic}. + server_state() | {error, overlaps_existing_generations}. handle_add_generation(S0, Since) -> #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0, @@ -583,7 +593,7 @@ handle_add_generation(S0, Since) -> end. -spec handle_update_config(server_state(), emqx_ds:time(), emqx_ds:create_db_opts()) -> - server_state(). + server_state() | {error, overlaps_existing_generations}. handle_update_config(S0 = #s{schema = Schema}, Since, Options) -> Prototype = maps:get(storage, Options), S = S0#s{schema = Schema#{prototype := Prototype}}, @@ -719,7 +729,7 @@ db_dir({DB, ShardId}) -> filename:join([emqx_ds:base_dir(), atom_to_list(DB), binary_to_list(ShardId)]). -spec update_last_until(Schema, emqx_ds:time()) -> - Schema | {error, exists | nonmonotonic} + Schema | {error, exists | overlaps_existing_generations} when Schema :: shard_schema() | shard(). update_last_until(Schema = #{current_generation := GenId}, Until) -> @@ -729,7 +739,7 @@ update_last_until(Schema = #{current_generation := GenId}, Until) -> #{since := Until} -> {error, exists}; #{since := CurrentSince} when CurrentSince > Until -> - {error, nonmonotonic} + {error, overlaps_existing_generations} end. run_post_creation_actions(