diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl index 065eb0f25..c8041297b 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -84,6 +84,28 @@ -define(REF(Shard), {via, gproc, {n, l, {?MODULE, Shard}}}). +%%================================================================================ +%% Callbacks +%%================================================================================ + +-callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) -> + {_Schema, cf_refs()}. + +-callback open(emqx_replay:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> + term(). + +-callback store(_Schema, binary(), emqx_replay:time(), emqx_replay:topic(), binary()) -> + ok | {error, _}. + +-callback make_iterator(_Schema, emqx_replay:replay()) -> + {ok, _It} | {error, _}. + +-callback restore_iterator(_Schema, emqx_replay:replay(), binary()) -> {ok, _It} | {error, _}. + +-callback preserve_iterator(_Schema, _It) -> term(). + +-callback next(It) -> {value, binary(), It} | none | {error, closed}. + %%================================================================================ %% API funcions %%================================================================================ @@ -100,7 +122,7 @@ create_generation(Shard, Since, Config = {_Module, _Options}) -> -spec store( emqx_replay:shard(), emqx_guid:guid(), emqx_replay:time(), emqx_replay:topic(), binary() ) -> - ok | {error, _TODO}. + ok | {error, _}. store(Shard, GUID, Time, Topic, Msg) -> {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), Mod:store(Data, GUID, Time, Topic, Msg).