From 3c13dd38f68dd745e5a7ab0d0b3dc52033841cd1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 17 Jan 2023 17:21:59 +0300 Subject: [PATCH] feat: make `create_generation` safer against bad input --- .../src/emqx_replay_local_store.erl | 44 ++++++++++++------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl index 77f0b2924..7148308b2 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -92,7 +92,7 @@ start_link(Zone) -> gen_server:start_link(?REF(Zone), ?MODULE, [Zone], []). -spec create_generation(emqx_types:zone(), emqx_replay:time(), emqx_replay_conf:backend_config()) -> - {ok, gen_id()}. + {ok, gen_id()} | {error, nonmonotonic}. create_generation(Zone, Since, Config = {_Module, _Options}) -> gen_server:call(?REF(Zone), {create_generation, Since, Config}). @@ -148,8 +148,12 @@ init([Zone]) -> {ok, S}. handle_call({create_generation, Since, Config}, _From, S) -> - {ok, GenId, NS} = create_new_gen(Since, Config, S), - {reply, {ok, GenId}, NS}; + case create_new_gen(Since, Config, S) of + {ok, GenId, NS} -> + {reply, {ok, GenId}, NS}; + {error, _} = Error -> + {reply, Error, S} + end; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. @@ -193,17 +197,22 @@ ensure_current_generation(S = #s{zone = Zone, db = DBHandle}) -> create_new_gen(Since, Config, S = #s{zone = Zone, db = DBHandle}) -> GenId = get_next_id(meta_get_current(Zone)), GenId = get_next_id(schema_get_current(DBHandle)), - % TODO: Propagate errors to clients. - true = is_gen_valid(Zone, GenId, Since), - {ok, Gen, NS} = create_gen(GenId, Since, Config, S), - %% TODO: Transaction? Column family creation can't be transactional, anyway. - ok = schema_put_gen(DBHandle, GenId, Gen), - ok = schema_put_current(DBHandle, GenId), - ok = meta_register_gen(Zone, GenId, Gen), - {ok, GenId, NS}. + case is_gen_valid(Zone, GenId, Since) of + ok -> + {ok, Gen, NS} = create_gen(GenId, Since, Config, S), + %% TODO: Transaction? Column family creation can't be transactional, anyway. + ok = schema_put_gen(DBHandle, GenId, Gen), + ok = schema_put_current(DBHandle, GenId), + ok = meta_register_gen(Zone, GenId, open_gen(GenId, Gen, NS)), + {ok, GenId, NS}; + {error, _} = Error -> + Error + end. -% -spec +-spec create_gen(gen_id(), emqx_replay:time(), emqx_replay_conf:backend_config(), state()) -> + {ok, generation(), state()}. create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, column_families = CFs}) -> + % TODO: Backend implementation should ensure idempotency. {Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options), Gen = #{ module => Module, @@ -360,10 +369,15 @@ get_next_id(undefined) -> 0; get_next_id(GenId) -> GenId + 1. is_gen_valid(Zone, GenId, Since) when GenId > 0 -> - [#{since := SincePrev} | _] = meta_lookup(Zone, GenId - 1), - Since > SincePrev; + [GenPrev | _] = meta_lookup(Zone, GenId - 1), + case GenPrev of + #{since := SincePrev} when Since > SincePrev -> + ok; + #{} -> + {error, nonmonotonic} + end; is_gen_valid(_Zone, 0, 0) -> - true. + ok. %% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok. %% store_cfs(DBHandle, CFRefs) ->