diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl index 0138ffed5..35a31e65c 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -98,7 +98,7 @@ store(Zone, GUID, Time, Topic, Msg) -> #{module := Mod, data := Data} = meta_lookup_gen(Zone, Time), Mod:store(Data, GUID, Time, Topic, Msg). --spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay_message_storage:time()) -> +-spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay:time()) -> {ok, _TODO} | {error, _TODO}. make_iterator(Zone, TopicFilter, StartTime) -> %% TODO: this is not supposed to work like this. Just a mock-up diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index f2a6afaa6..fb96863d1 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -140,11 +140,8 @@ %% Type declarations %%================================================================================ -%% parsed --type topic() :: list(binary()). - -%% TODO granularity? --type time() :: integer(). +-type topic() :: emqx_replay:topic(). +-type time() :: emqx_replay:time(). %% Number of bits -type bits() :: non_neg_integer(). @@ -292,20 +289,19 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, Value = make_message_value(Topic, MessagePayload), rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). --spec make_iterator(db(), emqx_topic:words(), time() | earliest) -> +-spec make_iterator(db(), emqx_topic:words(), time()) -> {ok, iterator()} | {error, _TODO}. make_iterator(DB, TopicFilter, StartTime) -> Options = emqx_replay_conf:zone_iteration_options(DB#db.zone), make_iterator(DB, TopicFilter, StartTime, Options). --spec make_iterator(db(), emqx_topic:words(), time() | earliest, iteration_options()) -> +-spec make_iterator(db(), emqx_topic:words(), time(), iteration_options()) -> % {error, invalid_start_time}? might just start from the beginning of time % and call it a day: client violated the contract anyway. {ok, iterator()} | {error, _TODO}. make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime, Options) -> case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of {ok, ITHandle} -> - % TODO earliest Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper), InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper), RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)),