feat(ds): add delete callback

This commit is contained in:
Thales Macedo Garitezi 2023-09-07 15:31:21 -03:00
parent 7805cc8c9b
commit fe4640922d
2 changed files with 28 additions and 3 deletions

View File

@ -77,11 +77,14 @@
%%
%%================================================================================
-behaviour(emqx_ds_storage_layer).
%% API:
-export([create_new/3, open/5]).
-export([make_keymapper/1]).
-export([store/5]).
-export([delete/4]).
-export([make_iterator/2]).
-export([make_iterator/3]).
-export([next/1]).
@ -270,13 +273,19 @@ make_keymapper(#{
epoch = 1 bsl TimestampLSBs
}.
-spec store(db(), emqx_guid:guid(), time(), topic(), binary()) ->
-spec store(db(), emqx_guid:guid(), emqx_ds:time(), topic(), binary()) ->
ok | {error, _TODO}.
store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, MessagePayload) ->
Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper),
Value = make_message_value(Topic, MessagePayload),
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
-spec delete(db(), emqx_guid:guid(), emqx_ds:time(), topic()) ->
ok | {error, _TODO}.
delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic) ->
Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper),
rocksdb:delete(DBHandle, CFHandle, Key, DB#db.write_options).
-spec make_iterator(db(), emqx_ds:replay()) ->
{ok, iterator()} | {error, _TODO}.
make_iterator(DB, Replay) ->

View File

@ -10,6 +10,7 @@
-export([create_generation/3]).
-export([store/5]).
-export([delete/4]).
-export([make_iterator/2, next/1]).
@ -109,7 +110,16 @@
-callback open(emqx_ds:shard(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
term().
-callback store(_Schema, binary(), emqx_ds:time(), emqx_ds:topic(), binary()) ->
-callback store(
_Schema,
_MessageID :: binary(),
emqx_ds:time(),
emqx_ds:topic(),
_Payload :: binary()
) ->
ok | {error, _}.
-callback delete(_Schema, _MessageID :: binary(), emqx_ds:time(), emqx_ds:topic()) ->
ok | {error, _}.
-callback make_iterator(_Schema, emqx_ds:replay()) ->
@ -117,7 +127,7 @@
-callback restore_iterator(_Schema, emqx_ds:replay(), binary()) -> {ok, _It} | {error, _}.
-callback preserve_iterator(_Schema, _It) -> term().
-callback preserve_iterator(_It) -> term().
-callback next(It) -> {value, binary(), It} | none | {error, closed}.
@ -140,6 +150,12 @@ store(Shard, GUID, Time, Topic, Msg) ->
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
Mod:store(Data, GUID, Time, Topic, Msg).
-spec delete(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic()) ->
ok | {error, _}.
delete(Shard, GUID, Time, Topic) ->
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
Mod:delete(Data, GUID, Time, Topic).
-spec make_iterator(emqx_ds:shard(), emqx_ds:replay()) ->
{ok, iterator()} | {error, _TODO}.
make_iterator(Shard, Replay = {_, StartTime}) ->