From fe4640922d8e5ee1bc52fba8dd74c168a3376f2e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 7 Sep 2023 15:31:21 -0300 Subject: [PATCH] feat(ds): add delete callback --- .../src/emqx_ds_message_storage_bitmask.erl | 11 +++++++++- .../src/emqx_ds_storage_layer.erl | 20 +++++++++++++++++-- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index 57608e5cb..a97b89580 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -77,11 +77,14 @@ %% %%================================================================================ +-behaviour(emqx_ds_storage_layer). + %% API: -export([create_new/3, open/5]). -export([make_keymapper/1]). -export([store/5]). +-export([delete/4]). -export([make_iterator/2]). -export([make_iterator/3]). -export([next/1]). @@ -270,13 +273,19 @@ make_keymapper(#{ epoch = 1 bsl TimestampLSBs }. --spec store(db(), emqx_guid:guid(), time(), topic(), binary()) -> +-spec store(db(), emqx_guid:guid(), emqx_ds:time(), topic(), binary()) -> ok | {error, _TODO}. store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, MessagePayload) -> Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper), Value = make_message_value(Topic, MessagePayload), rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). +-spec delete(db(), emqx_guid:guid(), emqx_ds:time(), topic()) -> + ok | {error, _TODO}. +delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic) -> + Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper), + rocksdb:delete(DBHandle, CFHandle, Key, DB#db.write_options). + -spec make_iterator(db(), emqx_ds:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(DB, Replay) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 47c29e170..a16c9b476 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -10,6 +10,7 @@ -export([create_generation/3]). -export([store/5]). +-export([delete/4]). -export([make_iterator/2, next/1]). @@ -109,7 +110,16 @@ -callback open(emqx_ds:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> term(). --callback store(_Schema, binary(), emqx_ds:time(), emqx_ds:topic(), binary()) -> +-callback store( + _Schema, + _MessageID :: binary(), + emqx_ds:time(), + emqx_ds:topic(), + _Payload :: binary() +) -> + ok | {error, _}. + +-callback delete(_Schema, _MessageID :: binary(), emqx_ds:time(), emqx_ds:topic()) -> ok | {error, _}. -callback make_iterator(_Schema, emqx_ds:replay()) -> @@ -117,7 +127,7 @@ -callback restore_iterator(_Schema, emqx_ds:replay(), binary()) -> {ok, _It} | {error, _}. --callback preserve_iterator(_Schema, _It) -> term(). +-callback preserve_iterator(_It) -> term(). -callback next(It) -> {value, binary(), It} | none | {error, closed}. @@ -140,6 +150,12 @@ store(Shard, GUID, Time, Topic, Msg) -> {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), Mod:store(Data, GUID, Time, Topic, Msg). +-spec delete(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic()) -> + ok | {error, _}. +delete(Shard, GUID, Time, Topic) -> + {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), + Mod:delete(Data, GUID, Time, Topic). + -spec make_iterator(emqx_ds:shard(), emqx_ds:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(Shard, Replay = {_, StartTime}) ->