refactor: rm `emqx_ds_replay`
This commit is contained in:
parent
c46b8de938
commit
3344bfb0bd
|
@ -42,6 +42,8 @@
|
||||||
message_stats/0,
|
message_stats/0,
|
||||||
message_store_opts/0,
|
message_store_opts/0,
|
||||||
session_id/0,
|
session_id/0,
|
||||||
|
replay/0,
|
||||||
|
replay_id/0,
|
||||||
iterator_id/0,
|
iterator_id/0,
|
||||||
iterator/0,
|
iterator/0,
|
||||||
shard/0,
|
shard/0,
|
||||||
|
@ -80,6 +82,13 @@
|
||||||
%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps.
|
%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps.
|
||||||
-type time() :: non_neg_integer().
|
-type time() :: non_neg_integer().
|
||||||
|
|
||||||
|
-type replay_id() :: binary().
|
||||||
|
|
||||||
|
-type replay() :: {
|
||||||
|
_TopicFilter :: emqx_topic:words(),
|
||||||
|
_StartTime :: time()
|
||||||
|
}.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API funcions
|
%% API funcions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
|
@ -277,13 +277,13 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
|
||||||
Value = make_message_value(Topic, MessagePayload),
|
Value = make_message_value(Topic, MessagePayload),
|
||||||
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
|
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
|
||||||
|
|
||||||
-spec make_iterator(db(), emqx_ds_replay:replay()) ->
|
-spec make_iterator(db(), emqx_ds:replay()) ->
|
||||||
{ok, iterator()} | {error, _TODO}.
|
{ok, iterator()} | {error, _TODO}.
|
||||||
make_iterator(DB, Replay) ->
|
make_iterator(DB, Replay) ->
|
||||||
Options = emqx_ds_conf:shard_iteration_options(DB#db.shard),
|
Options = emqx_ds_conf:shard_iteration_options(DB#db.shard),
|
||||||
make_iterator(DB, Replay, Options).
|
make_iterator(DB, Replay, Options).
|
||||||
|
|
||||||
-spec make_iterator(db(), emqx_ds_replay:replay(), iteration_options()) ->
|
-spec make_iterator(db(), emqx_ds:replay(), iteration_options()) ->
|
||||||
% {error, invalid_start_time}? might just start from the beginning of time
|
% {error, invalid_start_time}? might just start from the beginning of time
|
||||||
% and call it a day: client violated the contract anyway.
|
% and call it a day: client violated the contract anyway.
|
||||||
{ok, iterator()} | {error, _TODO}.
|
{ok, iterator()} | {error, _TODO}.
|
||||||
|
@ -337,7 +337,7 @@ preserve_iterator(#it{cursor = Cursor}) ->
|
||||||
},
|
},
|
||||||
term_to_binary(State).
|
term_to_binary(State).
|
||||||
|
|
||||||
-spec restore_iterator(db(), emqx_ds_replay:replay(), binary()) ->
|
-spec restore_iterator(db(), emqx_ds:replay(), binary()) ->
|
||||||
{ok, iterator()} | {error, _TODO}.
|
{ok, iterator()} | {error, _TODO}.
|
||||||
restore_iterator(DB, Replay, Serial) when is_binary(Serial) ->
|
restore_iterator(DB, Replay, Serial) when is_binary(Serial) ->
|
||||||
State = binary_to_term(Serial),
|
State = binary_to_term(Serial),
|
||||||
|
@ -419,7 +419,7 @@ hash(Input, Bits) ->
|
||||||
% at most 32 bits
|
% at most 32 bits
|
||||||
erlang:phash2(Input, 1 bsl Bits).
|
erlang:phash2(Input, 1 bsl Bits).
|
||||||
|
|
||||||
-spec make_keyspace_filter(emqx_ds_replay:replay(), keymapper()) -> keyspace_filter().
|
-spec make_keyspace_filter(emqx_ds:replay(), keymapper()) -> keyspace_filter().
|
||||||
make_keyspace_filter({TopicFilter, StartTime}, Keymapper) ->
|
make_keyspace_filter({TopicFilter, StartTime}, Keymapper) ->
|
||||||
Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper),
|
Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper),
|
||||||
HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper),
|
HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper),
|
||||||
|
|
|
@ -1,36 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
-module(emqx_ds_replay).
|
|
||||||
|
|
||||||
%% API:
|
|
||||||
-export([]).
|
|
||||||
|
|
||||||
-export_type([replay_id/0, replay/0]).
|
|
||||||
|
|
||||||
%%================================================================================
|
|
||||||
%% Type declarations
|
|
||||||
%%================================================================================
|
|
||||||
|
|
||||||
-type replay_id() :: binary().
|
|
||||||
|
|
||||||
-type replay() :: {
|
|
||||||
_TopicFilter :: emqx_ds:words(),
|
|
||||||
_StartTime :: emqx_ds:time()
|
|
||||||
}.
|
|
||||||
|
|
||||||
%%================================================================================
|
|
||||||
%% API funcions
|
|
||||||
%%================================================================================
|
|
||||||
|
|
||||||
%%================================================================================
|
|
||||||
%% behaviour callbacks
|
|
||||||
%%================================================================================
|
|
||||||
|
|
||||||
%%================================================================================
|
|
||||||
%% Internal exports
|
|
||||||
%%================================================================================
|
|
||||||
|
|
||||||
%%================================================================================
|
|
||||||
%% Internal functions
|
|
||||||
%%================================================================================
|
|
|
@ -71,7 +71,7 @@
|
||||||
-record(it, {
|
-record(it, {
|
||||||
shard :: emqx_ds:shard(),
|
shard :: emqx_ds:shard(),
|
||||||
gen :: gen_id(),
|
gen :: gen_id(),
|
||||||
replay :: emqx_ds_replay:replay(),
|
replay :: emqx_ds:replay(),
|
||||||
module :: module(),
|
module :: module(),
|
||||||
data :: term()
|
data :: term()
|
||||||
}).
|
}).
|
||||||
|
@ -112,10 +112,10 @@
|
||||||
-callback store(_Schema, binary(), emqx_ds:time(), emqx_ds:topic(), binary()) ->
|
-callback store(_Schema, binary(), emqx_ds:time(), emqx_ds:topic(), binary()) ->
|
||||||
ok | {error, _}.
|
ok | {error, _}.
|
||||||
|
|
||||||
-callback make_iterator(_Schema, emqx_ds_replay:replay()) ->
|
-callback make_iterator(_Schema, emqx_ds:replay()) ->
|
||||||
{ok, _It} | {error, _}.
|
{ok, _It} | {error, _}.
|
||||||
|
|
||||||
-callback restore_iterator(_Schema, emqx_ds_replay:replay(), binary()) -> {ok, _It} | {error, _}.
|
-callback restore_iterator(_Schema, emqx_ds:replay(), binary()) -> {ok, _It} | {error, _}.
|
||||||
|
|
||||||
-callback preserve_iterator(_Schema, _It) -> term().
|
-callback preserve_iterator(_Schema, _It) -> term().
|
||||||
|
|
||||||
|
@ -140,7 +140,7 @@ store(Shard, GUID, Time, Topic, Msg) ->
|
||||||
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
|
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
|
||||||
Mod:store(Data, GUID, Time, Topic, Msg).
|
Mod:store(Data, GUID, Time, Topic, Msg).
|
||||||
|
|
||||||
-spec make_iterator(emqx_ds:shard(), emqx_ds_replay:replay()) ->
|
-spec make_iterator(emqx_ds:shard(), emqx_ds:replay()) ->
|
||||||
{ok, iterator()} | {error, _TODO}.
|
{ok, iterator()} | {error, _TODO}.
|
||||||
make_iterator(Shard, Replay = {_, StartTime}) ->
|
make_iterator(Shard, Replay = {_, StartTime}) ->
|
||||||
{GenId, Gen} = meta_lookup_gen(Shard, StartTime),
|
{GenId, Gen} = meta_lookup_gen(Shard, StartTime),
|
||||||
|
@ -173,7 +173,7 @@ next(It = #it{module = Mod, data = ItData}) ->
|
||||||
preserve_iterator(It = #it{}, IteratorID) ->
|
preserve_iterator(It = #it{}, IteratorID) ->
|
||||||
iterator_put_state(IteratorID, It).
|
iterator_put_state(IteratorID, It).
|
||||||
|
|
||||||
-spec restore_iterator(emqx_ds:shard(), emqx_ds_replay:replay_id()) ->
|
-spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
|
||||||
{ok, iterator()} | {error, _TODO}.
|
{ok, iterator()} | {error, _TODO}.
|
||||||
restore_iterator(Shard, ReplayID) ->
|
restore_iterator(Shard, ReplayID) ->
|
||||||
case iterator_get_state(Shard, ReplayID) of
|
case iterator_get_state(Shard, ReplayID) of
|
||||||
|
@ -185,7 +185,7 @@ restore_iterator(Shard, ReplayID) ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec is_iterator_present(emqx_ds:shard(), emqx_ds_replay:replay_id()) ->
|
-spec is_iterator_present(emqx_ds:shard(), emqx_ds:replay_id()) ->
|
||||||
boolean().
|
boolean().
|
||||||
is_iterator_present(Shard, ReplayID) ->
|
is_iterator_present(Shard, ReplayID) ->
|
||||||
%% TODO: use keyMayExist after added to wrapper?
|
%% TODO: use keyMayExist after added to wrapper?
|
||||||
|
@ -196,7 +196,7 @@ is_iterator_present(Shard, ReplayID) ->
|
||||||
false
|
false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec discard_iterator(emqx_ds:shard(), emqx_ds_replay:replay_id()) ->
|
-spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
|
||||||
ok | {error, _TODO}.
|
ok | {error, _TODO}.
|
||||||
discard_iterator(Shard, ReplayID) ->
|
discard_iterator(Shard, ReplayID) ->
|
||||||
iterator_delete(Shard, ReplayID).
|
iterator_delete(Shard, ReplayID).
|
||||||
|
|
|
@ -31,7 +31,7 @@ store(Tab, MessageID, PublishedAt, Topic, Payload) ->
|
||||||
true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}),
|
true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec iterate(t(), emqx_ds_replay:replay()) ->
|
-spec iterate(t(), emqx_ds:replay()) ->
|
||||||
[binary()].
|
[binary()].
|
||||||
iterate(Tab, {TopicFilter, StartTime}) ->
|
iterate(Tab, {TopicFilter, StartTime}) ->
|
||||||
ets:foldr(
|
ets:foldr(
|
||||||
|
|
Loading…
Reference in New Issue