feat: make `create_generation` safer against bad input
This commit is contained in:
parent
418ecbcbbc
commit
3c13dd38f6
|
@ -92,7 +92,7 @@ start_link(Zone) ->
|
|||
gen_server:start_link(?REF(Zone), ?MODULE, [Zone], []).
|
||||
|
||||
-spec create_generation(emqx_types:zone(), emqx_replay:time(), emqx_replay_conf:backend_config()) ->
|
||||
{ok, gen_id()}.
|
||||
{ok, gen_id()} | {error, nonmonotonic}.
|
||||
create_generation(Zone, Since, Config = {_Module, _Options}) ->
|
||||
gen_server:call(?REF(Zone), {create_generation, Since, Config}).
|
||||
|
||||
|
@ -148,8 +148,12 @@ init([Zone]) ->
|
|||
{ok, S}.
|
||||
|
||||
handle_call({create_generation, Since, Config}, _From, S) ->
|
||||
{ok, GenId, NS} = create_new_gen(Since, Config, S),
|
||||
{reply, {ok, GenId}, NS};
|
||||
case create_new_gen(Since, Config, S) of
|
||||
{ok, GenId, NS} ->
|
||||
{reply, {ok, GenId}, NS};
|
||||
{error, _} = Error ->
|
||||
{reply, Error, S}
|
||||
end;
|
||||
handle_call(_Call, _From, S) ->
|
||||
{reply, {error, unknown_call}, S}.
|
||||
|
||||
|
@ -193,17 +197,22 @@ ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) ->
|
|||
create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) ->
|
||||
GenId = get_next_id(meta_get_current(Zone)),
|
||||
GenId = get_next_id(schema_get_current(DBHandle)),
|
||||
% TODO: Propagate errors to clients.
|
||||
true = is_gen_valid(Zone, GenId, Since),
|
||||
{ok, Gen, NS} = create_gen(GenId, Since, Config, S),
|
||||
%% TODO: Transaction? Column family creation can't be transactional, anyway.
|
||||
ok = schema_put_gen(DBHandle, GenId, Gen),
|
||||
ok = schema_put_current(DBHandle, GenId),
|
||||
ok = meta_register_gen(Zone, GenId, Gen),
|
||||
{ok, GenId, NS}.
|
||||
case is_gen_valid(Zone, GenId, Since) of
|
||||
ok ->
|
||||
{ok, Gen, NS} = create_gen(GenId, Since, Config, S),
|
||||
%% TODO: Transaction? Column family creation can't be transactional, anyway.
|
||||
ok = schema_put_gen(DBHandle, GenId, Gen),
|
||||
ok = schema_put_current(DBHandle, GenId),
|
||||
ok = meta_register_gen(Zone, GenId, open_gen(GenId, Gen, NS)),
|
||||
{ok, GenId, NS};
|
||||
{error, _} = Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
% -spec
|
||||
-spec create_gen(gen_id(), emqx_replay:time(), emqx_replay_conf:backend_config(), state()) ->
|
||||
{ok, generation(), state()}.
|
||||
create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, column_families = CFs}) ->
|
||||
% TODO: Backend implementation should ensure idempotency.
|
||||
{Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options),
|
||||
Gen = #{
|
||||
module => Module,
|
||||
|
@ -360,10 +369,15 @@ get_next_id(undefined) -> 0;
|
|||
get_next_id(GenId) -> GenId + 1.
|
||||
|
||||
is_gen_valid(Zone, GenId, Since) when GenId > 0 ->
|
||||
[#{since := SincePrev} | _] = meta_lookup(Zone, GenId - 1),
|
||||
Since > SincePrev;
|
||||
[GenPrev | _] = meta_lookup(Zone, GenId - 1),
|
||||
case GenPrev of
|
||||
#{since := SincePrev} when Since > SincePrev ->
|
||||
ok;
|
||||
#{} ->
|
||||
{error, nonmonotonic}
|
||||
end;
|
||||
is_gen_valid(_Zone, 0, 0) ->
|
||||
true.
|
||||
ok.
|
||||
|
||||
%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok.
|
||||
%% store_cfs(DBHandle, CFRefs) ->
|
||||
|
|
Loading…
Reference in New Issue