chore(ds): Simplify iteration-related typespecs
This commit is contained in:
parent
cf6a5e1643
commit
bf9d57f6a9
|
@ -98,7 +98,7 @@ store(Zone, GUID, Time, Topic, Msg) ->
|
||||||
#{module := Mod, data := Data} = meta_lookup_gen(Zone, Time),
|
#{module := Mod, data := Data} = meta_lookup_gen(Zone, Time),
|
||||||
Mod:store(Data, GUID, Time, Topic, Msg).
|
Mod:store(Data, GUID, Time, Topic, Msg).
|
||||||
|
|
||||||
-spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay_message_storage:time()) ->
|
-spec make_iterator(emqx_types:zone(), emqx_topic:words(), emqx_replay:time()) ->
|
||||||
{ok, _TODO} | {error, _TODO}.
|
{ok, _TODO} | {error, _TODO}.
|
||||||
make_iterator(Zone, TopicFilter, StartTime) ->
|
make_iterator(Zone, TopicFilter, StartTime) ->
|
||||||
%% TODO: this is not supposed to work like this. Just a mock-up
|
%% TODO: this is not supposed to work like this. Just a mock-up
|
||||||
|
|
|
@ -140,11 +140,8 @@
|
||||||
%% Type declarations
|
%% Type declarations
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
%% parsed
|
-type topic() :: emqx_replay:topic().
|
||||||
-type topic() :: list(binary()).
|
-type time() :: emqx_replay:time().
|
||||||
|
|
||||||
%% TODO granularity?
|
|
||||||
-type time() :: integer().
|
|
||||||
|
|
||||||
%% Number of bits
|
%% Number of bits
|
||||||
-type bits() :: non_neg_integer().
|
-type bits() :: non_neg_integer().
|
||||||
|
@ -292,20 +289,19 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
|
||||||
Value = make_message_value(Topic, MessagePayload),
|
Value = make_message_value(Topic, MessagePayload),
|
||||||
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
|
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
|
||||||
|
|
||||||
-spec make_iterator(db(), emqx_topic:words(), time() | earliest) ->
|
-spec make_iterator(db(), emqx_topic:words(), time()) ->
|
||||||
{ok, iterator()} | {error, _TODO}.
|
{ok, iterator()} | {error, _TODO}.
|
||||||
make_iterator(DB, TopicFilter, StartTime) ->
|
make_iterator(DB, TopicFilter, StartTime) ->
|
||||||
Options = emqx_replay_conf:zone_iteration_options(DB#db.zone),
|
Options = emqx_replay_conf:zone_iteration_options(DB#db.zone),
|
||||||
make_iterator(DB, TopicFilter, StartTime, Options).
|
make_iterator(DB, TopicFilter, StartTime, Options).
|
||||||
|
|
||||||
-spec make_iterator(db(), emqx_topic:words(), time() | earliest, iteration_options()) ->
|
-spec make_iterator(db(), emqx_topic:words(), time(), iteration_options()) ->
|
||||||
% {error, invalid_start_time}? might just start from the beginning of time
|
% {error, invalid_start_time}? might just start from the beginning of time
|
||||||
% and call it a day: client violated the contract anyway.
|
% and call it a day: client violated the contract anyway.
|
||||||
{ok, iterator()} | {error, _TODO}.
|
{ok, iterator()} | {error, _TODO}.
|
||||||
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime, Options) ->
|
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime, Options) ->
|
||||||
case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
|
case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
|
||||||
{ok, ITHandle} ->
|
{ok, ITHandle} ->
|
||||||
% TODO earliest
|
|
||||||
Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper),
|
Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper),
|
||||||
InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper),
|
InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper),
|
||||||
RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)),
|
RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)),
|
||||||
|
|
Loading…
Reference in New Issue