From d631b5b29671089933a87bb8674e238a91ecae5b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 31 Jul 2024 18:44:57 +0200 Subject: [PATCH] feat(ds): support deletions + precondition-related API in bitfield-lts --- .../src/emqx_ds_storage_bitfield_lts.erl | 67 +++++++++++++------ .../src/emqx_ds_storage_layer.erl | 1 + 2 files changed, 49 insertions(+), 19 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index fb831318e..28a4d54c2 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -37,6 +37,7 @@ update_iterator/4, next/6, delete_next/7, + lookup_message/3, handle_event/4 ]). @@ -46,6 +47,7 @@ -export_type([options/0]). +-include("emqx_ds.hrl"). -include("emqx_ds_metrics.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). @@ -68,10 +70,13 @@ -define(start_time, 3). -define(storage_key, 4). -define(last_seen_key, 5). --define(cooked_payloads, 6). +-define(cooked_msg_ops, 6). -define(cooked_lts_ops, 7). -define(cooked_ts, 8). +%% atoms: +-define(delete, 100). + -type options() :: #{ bits_per_wildcard_level => pos_integer(), @@ -110,7 +115,7 @@ -type cooked_batch() :: #{ - ?cooked_payloads := [{binary(), binary()}], + ?cooked_msg_ops := [{binary(), binary() | ?delete}], ?cooked_lts_ops := [{binary(), binary()}], ?cooked_ts := integer() }. @@ -271,24 +276,28 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) -> -spec prepare_batch( emqx_ds_storage_layer:shard_id(), s(), - [{emqx_ds:time(), emqx_types:message()}, ...], + emqx_ds_storage_layer:batch(), emqx_ds_storage_layer:batch_store_opts() ) -> {ok, cooked_batch()}. -prepare_batch(_ShardId, S, Messages, _Options) -> +prepare_batch(_ShardId, S, Batch, _Options) -> _ = erase(?lts_persist_ops), - {Payloads, MaxTs} = + {Operations, MaxTs} = lists:mapfoldl( - fun({Timestamp, Msg}, Acc) -> - {Key, _} = make_key(S, Timestamp, Msg), - Payload = {Key, message_to_value_v1(Msg)}, - {Payload, max(Acc, Timestamp)} + fun + ({Timestamp, Msg = #message{topic = Topic}}, Acc) -> + {Key, _} = make_key(S, Timestamp, Topic), + Op = {Key, message_to_value_v1(Msg)}, + {Op, max(Acc, Timestamp)}; + ({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}, Acc) -> + {Key, _} = make_key(S, Timestamp, Topic), + {_Op = {Key, ?delete}, Acc} end, 0, - Messages + Batch ), {ok, #{ - ?cooked_payloads => Payloads, + ?cooked_msg_ops => Operations, ?cooked_lts_ops => pop_lts_persist_ops(), ?cooked_ts => MaxTs }}. @@ -302,7 +311,7 @@ prepare_batch(_ShardId, S, Messages, _Options) -> commit_batch( _ShardId, _Data, - #{?cooked_payloads := [], ?cooked_lts_ops := LTS}, + #{?cooked_msg_ops := [], ?cooked_lts_ops := LTS}, _Options ) -> %% Assert: @@ -311,7 +320,7 @@ commit_batch( commit_batch( _ShardId, #s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars}, - #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs}, + #{?cooked_lts_ops := LtsOps, ?cooked_msg_ops := Operations, ?cooked_ts := MaxTs}, Options ) -> {ok, Batch} = rocksdb:batch(), @@ -326,10 +335,13 @@ commit_batch( _ = emqx_ds_lts:trie_update(Trie, LtsOps), %% Commit payloads: lists:foreach( - fun({Key, Val}) -> - ok = rocksdb:batch_put(Batch, DataCF, Key, term_to_binary(Val)) + fun + ({Key, Val}) when is_tuple(Val) -> + ok = rocksdb:batch_put(Batch, DataCF, Key, term_to_binary(Val)); + ({Key, ?delete}) -> + ok = rocksdb:batch_delete(Batch, DataCF, Key) end, - Payloads + Operations ), Result = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)), rocksdb:release_batch(Batch), @@ -556,6 +568,23 @@ delete_next_until( rocksdb:iterator_close(ITHandle) end. +-spec lookup_message(emqx_ds_storage_layer:shard_id(), s(), emqx_ds_precondition:matcher()) -> + emqx_types:message() | not_found | emqx_ds:error(_). +lookup_message( + _ShardId, + S = #s{db = DB, data = CF}, + #message_matcher{topic = Topic, timestamp = Timestamp} +) -> + {Key, _} = make_key(S, Timestamp, Topic), + case rocksdb:get(DB, CF, Key, _ReadOpts = []) of + {ok, Blob} -> + deserialize(Blob); + not_found -> + not_found; + Error -> + {error, unrecoverable, {rocksdb, Error}} + end. + handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) -> %% If the last message was published more than one epoch ago, and %% the shard remains idle, we need to advance safety cutoff @@ -811,9 +840,9 @@ format_key(KeyMapper, Key) -> Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)], lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])). --spec make_key(s(), emqx_ds:time(), emqx_types:message()) -> {binary(), [binary()]}. -make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, #message{topic = TopicBin}) -> - Tokens = emqx_topic:words(TopicBin), +-spec make_key(s(), emqx_ds:time(), emqx_types:topic()) -> {binary(), [binary()]}. +make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, Topic) -> + Tokens = emqx_topic:words(Topic), {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens), VaryingHashes = [hash_topic_level(I) || I <- Varying], KeyMapper = array:get(length(Varying), KeyMappers), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index ac2edc254..3afdad01a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -64,6 +64,7 @@ -export_type([ gen_id/0, generation/0, + batch/0, cf_refs/0, stream/0, delete_stream/0,