From 3344bfb0bd74c5fa9542a54e278f72eb39424cdf Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 16 Aug 2023 15:56:29 -0300 Subject: [PATCH] refactor: rm `emqx_ds_replay` --- apps/emqx_durable_storage/src/emqx_ds.erl | 9 +++++ .../src/emqx_ds_message_storage_bitmask.erl | 8 ++--- .../src/emqx_ds_replay.erl | 36 ------------------- .../src/emqx_ds_storage_layer.erl | 14 ++++---- .../emqx_ds_message_storage_bitmask_shim.erl | 2 +- 5 files changed, 21 insertions(+), 48 deletions(-) delete mode 100644 apps/emqx_durable_storage/src/emqx_ds_replay.erl diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 3cc7ca886..7ec9f3801 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -42,6 +42,8 @@ message_stats/0, message_store_opts/0, session_id/0, + replay/0, + replay_id/0, iterator_id/0, iterator/0, shard/0, @@ -80,6 +82,13 @@ %% use in emqx_guid. Otherwise, the iterators won't match the message timestamps. -type time() :: non_neg_integer(). +-type replay_id() :: binary(). + +-type replay() :: { + _TopicFilter :: emqx_topic:words(), + _StartTime :: time() +}. + %%================================================================================ %% API funcions %%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index 74a50c302..57608e5cb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -277,13 +277,13 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, Value = make_message_value(Topic, MessagePayload), rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). --spec make_iterator(db(), emqx_ds_replay:replay()) -> +-spec make_iterator(db(), emqx_ds:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(DB, Replay) -> Options = emqx_ds_conf:shard_iteration_options(DB#db.shard), make_iterator(DB, Replay, Options). --spec make_iterator(db(), emqx_ds_replay:replay(), iteration_options()) -> +-spec make_iterator(db(), emqx_ds:replay(), iteration_options()) -> % {error, invalid_start_time}? might just start from the beginning of time % and call it a day: client violated the contract anyway. {ok, iterator()} | {error, _TODO}. @@ -337,7 +337,7 @@ preserve_iterator(#it{cursor = Cursor}) -> }, term_to_binary(State). --spec restore_iterator(db(), emqx_ds_replay:replay(), binary()) -> +-spec restore_iterator(db(), emqx_ds:replay(), binary()) -> {ok, iterator()} | {error, _TODO}. restore_iterator(DB, Replay, Serial) when is_binary(Serial) -> State = binary_to_term(Serial), @@ -419,7 +419,7 @@ hash(Input, Bits) -> % at most 32 bits erlang:phash2(Input, 1 bsl Bits). --spec make_keyspace_filter(emqx_ds_replay:replay(), keymapper()) -> keyspace_filter(). +-spec make_keyspace_filter(emqx_ds:replay(), keymapper()) -> keyspace_filter(). make_keyspace_filter({TopicFilter, StartTime}, Keymapper) -> Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper), HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper), diff --git a/apps/emqx_durable_storage/src/emqx_ds_replay.erl b/apps/emqx_durable_storage/src/emqx_ds_replay.erl deleted file mode 100644 index b9ffa32ac..000000000 --- a/apps/emqx_durable_storage/src/emqx_ds_replay.erl +++ /dev/null @@ -1,36 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- --module(emqx_ds_replay). - -%% API: --export([]). - --export_type([replay_id/0, replay/0]). - -%%================================================================================ -%% Type declarations -%%================================================================================ - --type replay_id() :: binary(). - --type replay() :: { - _TopicFilter :: emqx_ds:words(), - _StartTime :: emqx_ds:time() -}. - -%%================================================================================ -%% API funcions -%%================================================================================ - -%%================================================================================ -%% behaviour callbacks -%%================================================================================ - -%%================================================================================ -%% Internal exports -%%================================================================================ - -%%================================================================================ -%% Internal functions -%%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index adede5322..69c0e008c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -71,7 +71,7 @@ -record(it, { shard :: emqx_ds:shard(), gen :: gen_id(), - replay :: emqx_ds_replay:replay(), + replay :: emqx_ds:replay(), module :: module(), data :: term() }). @@ -112,10 +112,10 @@ -callback store(_Schema, binary(), emqx_ds:time(), emqx_ds:topic(), binary()) -> ok | {error, _}. --callback make_iterator(_Schema, emqx_ds_replay:replay()) -> +-callback make_iterator(_Schema, emqx_ds:replay()) -> {ok, _It} | {error, _}. --callback restore_iterator(_Schema, emqx_ds_replay:replay(), binary()) -> {ok, _It} | {error, _}. +-callback restore_iterator(_Schema, emqx_ds:replay(), binary()) -> {ok, _It} | {error, _}. -callback preserve_iterator(_Schema, _It) -> term(). @@ -140,7 +140,7 @@ store(Shard, GUID, Time, Topic, Msg) -> {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), Mod:store(Data, GUID, Time, Topic, Msg). --spec make_iterator(emqx_ds:shard(), emqx_ds_replay:replay()) -> +-spec make_iterator(emqx_ds:shard(), emqx_ds:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(Shard, Replay = {_, StartTime}) -> {GenId, Gen} = meta_lookup_gen(Shard, StartTime), @@ -173,7 +173,7 @@ next(It = #it{module = Mod, data = ItData}) -> preserve_iterator(It = #it{}, IteratorID) -> iterator_put_state(IteratorID, It). --spec restore_iterator(emqx_ds:shard(), emqx_ds_replay:replay_id()) -> +-spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> {ok, iterator()} | {error, _TODO}. restore_iterator(Shard, ReplayID) -> case iterator_get_state(Shard, ReplayID) of @@ -185,7 +185,7 @@ restore_iterator(Shard, ReplayID) -> Error end. --spec is_iterator_present(emqx_ds:shard(), emqx_ds_replay:replay_id()) -> +-spec is_iterator_present(emqx_ds:shard(), emqx_ds:replay_id()) -> boolean(). is_iterator_present(Shard, ReplayID) -> %% TODO: use keyMayExist after added to wrapper? @@ -196,7 +196,7 @@ is_iterator_present(Shard, ReplayID) -> false end. --spec discard_iterator(emqx_ds:shard(), emqx_ds_replay:replay_id()) -> +-spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> ok | {error, _TODO}. discard_iterator(Shard, ReplayID) -> iterator_delete(Shard, ReplayID). diff --git a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl index 10431eb1a..e9daf2581 100644 --- a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl +++ b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl @@ -31,7 +31,7 @@ store(Tab, MessageID, PublishedAt, Topic, Payload) -> true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}), ok. --spec iterate(t(), emqx_ds_replay:replay()) -> +-spec iterate(t(), emqx_ds:replay()) -> [binary()]. iterate(Tab, {TopicFilter, StartTime}) -> ets:foldr(