fix(dsrepl): properly handle error conditions in generation mgmt

Also update few outdated typespecs. Also make error reasons easier
to comprehend.
This commit is contained in:
Andrew Mayorov 2024-03-19 15:20:58 +01:00
parent f2268aa69a
commit 35b18f9125
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 22 additions and 12 deletions

View File

@ -410,12 +410,14 @@ delete_next(
{ok, end_of_stream} {ok, end_of_stream}
end. end.
-spec update_config(shard_id(), emqx_ds:time(), emqx_ds:create_db_opts()) -> ok. -spec update_config(shard_id(), emqx_ds:time(), emqx_ds:create_db_opts()) ->
ok | {error, overlaps_existing_generations}.
update_config(ShardId, Since, Options) -> update_config(ShardId, Since, Options) ->
Call = #call_update_config{since = Since, options = Options}, Call = #call_update_config{since = Since, options = Options},
gen_server:call(?REF(ShardId), Call, infinity). gen_server:call(?REF(ShardId), Call, infinity).
-spec add_generation(shard_id(), emqx_ds:time()) -> ok. -spec add_generation(shard_id(), emqx_ds:time()) ->
ok | {error, overlaps_existing_generations}.
add_generation(ShardId, Since) -> add_generation(ShardId, Since) ->
gen_server:call(?REF(ShardId), #call_add_generation{since = Since}, infinity). gen_server:call(?REF(ShardId), #call_add_generation{since = Since}, infinity).
@ -481,13 +483,21 @@ init({ShardId, Options}) ->
{ok, S}. {ok, S}.
handle_call(#call_update_config{since = Since, options = Options}, _From, S0) -> handle_call(#call_update_config{since = Since, options = Options}, _From, S0) ->
S = #s{} = handle_update_config(S0, Since, Options), case handle_update_config(S0, Since, Options) of
commit_metadata(S), S = #s{} ->
{reply, ok, S}; commit_metadata(S),
{reply, ok, S};
Error = {error, _} ->
{reply, Error, S0}
end;
handle_call(#call_add_generation{since = Since}, _From, S0) -> handle_call(#call_add_generation{since = Since}, _From, S0) ->
S = #s{} = handle_add_generation(S0, Since), case handle_add_generation(S0, Since) of
commit_metadata(S), S = #s{} ->
{reply, ok, S}; commit_metadata(S),
{reply, ok, S};
Error = {error, _} ->
{reply, Error, S0}
end;
handle_call(#call_list_generations_with_lifetimes{}, _From, S) -> handle_call(#call_list_generations_with_lifetimes{}, _From, S) ->
Generations = handle_list_generations_with_lifetimes(S), Generations = handle_list_generations_with_lifetimes(S),
{reply, Generations, S}; {reply, Generations, S};
@ -531,7 +541,7 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) ->
). ).
-spec handle_add_generation(server_state(), emqx_ds:time()) -> -spec handle_add_generation(server_state(), emqx_ds:time()) ->
server_state() | {error, nonmonotonic}. server_state() | {error, overlaps_existing_generations}.
handle_add_generation(S0, Since) -> handle_add_generation(S0, Since) ->
#s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0, #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0,
@ -583,7 +593,7 @@ handle_add_generation(S0, Since) ->
end. end.
-spec handle_update_config(server_state(), emqx_ds:time(), emqx_ds:create_db_opts()) -> -spec handle_update_config(server_state(), emqx_ds:time(), emqx_ds:create_db_opts()) ->
server_state(). server_state() | {error, overlaps_existing_generations}.
handle_update_config(S0 = #s{schema = Schema}, Since, Options) -> handle_update_config(S0 = #s{schema = Schema}, Since, Options) ->
Prototype = maps:get(storage, Options), Prototype = maps:get(storage, Options),
S = S0#s{schema = Schema#{prototype := Prototype}}, S = S0#s{schema = Schema#{prototype := Prototype}},
@ -719,7 +729,7 @@ db_dir({DB, ShardId}) ->
filename:join([emqx_ds:base_dir(), atom_to_list(DB), binary_to_list(ShardId)]). filename:join([emqx_ds:base_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
-spec update_last_until(Schema, emqx_ds:time()) -> -spec update_last_until(Schema, emqx_ds:time()) ->
Schema | {error, exists | nonmonotonic} Schema | {error, exists | overlaps_existing_generations}
when when
Schema :: shard_schema() | shard(). Schema :: shard_schema() | shard().
update_last_until(Schema = #{current_generation := GenId}, Until) -> update_last_until(Schema = #{current_generation := GenId}, Until) ->
@ -729,7 +739,7 @@ update_last_until(Schema = #{current_generation := GenId}, Until) ->
#{since := Until} -> #{since := Until} ->
{error, exists}; {error, exists};
#{since := CurrentSince} when CurrentSince > Until -> #{since := CurrentSince} when CurrentSince > Until ->
{error, nonmonotonic} {error, overlaps_existing_generations}
end. end.
run_post_creation_actions( run_post_creation_actions(