From 68990f1538dc0b450b6e63fb8a412498fe21e935 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 30 Jul 2024 16:56:29 +0200 Subject: [PATCH] feat(ds): support operations + preconditions in skipstream-lts --- .../src/emqx_ds_storage_skipstream_lts.erl | 104 ++++++++++++++---- 1 file changed, 82 insertions(+), 22 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl index cb87b8a6f..b466983b7 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl @@ -33,7 +33,8 @@ make_delete_iterator/5, update_iterator/4, next/6, - delete_next/7 + delete_next/7, + lookup_message/3 ]). %% internal exports: @@ -43,6 +44,7 @@ -include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). +-include("emqx_ds.hrl"). -include("emqx_ds_metrics.hrl"). -ifdef(TEST). @@ -56,11 +58,12 @@ %%================================================================================ %% TLOG entry -%% keys: --define(cooked_payloads, 6). +%% Keys: +-define(cooked_msg_ops, 6). -define(cooked_lts_ops, 7). %% Payload: --define(cooked_payload(TIMESTAMP, STATIC, VARYING, VALUE), +-define(cooked_delete, 100). +-define(cooked_msg_op(TIMESTAMP, STATIC, VARYING, VALUE), {TIMESTAMP, STATIC, VARYING, VALUE} ). @@ -176,25 +179,39 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{data_cf = DataCF, trie_cf = TrieCF, ok = rocksdb:drop_column_family(DBHandle, TrieCF), ok. -prepare_batch(_ShardId, S = #s{trie = Trie}, Messages, _Options) -> +prepare_batch( + _ShardId, + S = #s{trie = Trie}, + Operations, + _Options +) -> _ = erase(?lts_persist_ops), - Payloads = [ - begin - Tokens = words(Topic), - {Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens), - ?cooked_payload(Timestamp, Static, Varying, serialize(S, Varying, Msg)) - end - || {Timestamp, Msg = #message{topic = Topic}} <- Messages - ], + OperationsCooked = emqx_utils:flattermap( + fun + ({Timestamp, Msg = #message{topic = Topic}}) -> + Tokens = words(Topic), + {Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens), + ?cooked_msg_op(Timestamp, Static, Varying, serialize(S, Varying, Msg)); + ({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}) -> + case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of + {ok, {Static, Varying}} -> + ?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete); + undefined -> + %% Topic is unknown, nothing to delete. + [] + end + end, + Operations + ), {ok, #{ - ?cooked_payloads => Payloads, + ?cooked_msg_ops => OperationsCooked, ?cooked_lts_ops => pop_lts_persist_ops() }}. commit_batch( _ShardId, #s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie, hash_bytes = HashBytes}, - #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads}, + #{?cooked_lts_ops := LtsOps, ?cooked_msg_ops := Operations}, Options ) -> {ok, Batch} = rocksdb:batch(), @@ -210,12 +227,17 @@ commit_batch( _ = emqx_ds_lts:trie_update(Trie, LtsOps), %% Commit payloads: lists:foreach( - fun(?cooked_payload(Timestamp, Static, Varying, ValBlob)) -> - MasterKey = mk_key(Static, 0, <<>>, Timestamp), - ok = rocksdb:batch_put(Batch, DataCF, MasterKey, ValBlob), - mk_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp) + fun + (?cooked_msg_op(Timestamp, Static, Varying, ValBlob = <<_/bytes>>)) -> + MasterKey = mk_key(Static, 0, <<>>, Timestamp), + ok = rocksdb:batch_put(Batch, DataCF, MasterKey, ValBlob), + mk_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp); + (?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete)) -> + MasterKey = mk_key(Static, 0, <<>>, Timestamp), + ok = rocksdb:batch_delete(Batch, DataCF, MasterKey), + delete_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp) end, - Payloads + Operations ), Result = rocksdb:write_batch(DB, Batch, [ {disable_wal, not maps:get(durable, Options, true)} @@ -285,6 +307,28 @@ delete_next(Shard, S, It0, Selector, BatchSize, Now, IsCurrent) -> Ret end. +lookup_message( + Shard, + S = #s{db = DB, data_cf = CF, trie = Trie}, + #message_matcher{topic = Topic, timestamp = Timestamp} +) -> + case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of + {ok, {StaticIdx, _Varying}} -> + DSKey = mk_key(StaticIdx, 0, <<>>, Timestamp), + case rocksdb:get(DB, CF, DSKey, _ReadOpts = []) of + {ok, Val} -> + {ok, TopicStructure} = emqx_ds_lts:reverse_lookup(Trie, StaticIdx), + Msg = deserialize(S, Val), + enrich(Shard, S, TopicStructure, DSKey, Msg); + not_found -> + not_found; + {error, Reason} -> + {error, unrecoverable, Reason} + end; + undefined -> + not_found + end. + %%================================================================================ %% Internal exports %%================================================================================ @@ -330,12 +374,18 @@ serialize(#s{serialization_schema = SSchema, with_guid = WithGuid}, Varying, Msg }, emqx_ds_msg_serializer:serialize(SSchema, Msg). +enrich(#ctx{shard = Shard, s = S, topic_structure = TopicStructure}, DSKey, Msg0) -> + enrich(Shard, S, TopicStructure, DSKey, Msg0). + enrich( - #ctx{shard = Shard, topic_structure = Structure, s = #s{with_guid = WithGuid}}, + Shard, + #s{with_guid = WithGuid}, + TopicStructure, DSKey, Msg0 ) -> - Topic = emqx_topic:join(emqx_ds_lts:decompress_topic(Structure, words(Msg0#message.topic))), + Tokens = words(Msg0#message.topic), + Topic = emqx_topic:join(emqx_ds_lts:decompress_topic(TopicStructure, Tokens)), Msg0#message{ topic = Topic, id = @@ -584,6 +634,16 @@ mk_index(Batch, CF, HashBytes, Static, Timestamp, N, [TopicLevel | Varying]) -> mk_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) -> ok. +delete_index(Batch, CF, HashBytes, Static, Varying, Timestamp) -> + delete_index(Batch, CF, HashBytes, Static, Timestamp, 1, Varying). + +delete_index(Batch, CF, HashBytes, Static, Timestamp, N, [TopicLevel | Varying]) -> + Key = mk_key(Static, N, hash(HashBytes, TopicLevel), Timestamp), + ok = rocksdb:batch_delete(Batch, CF, Key), + delete_index(Batch, CF, HashBytes, Static, Timestamp, N + 1, Varying); +delete_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) -> + ok. + %%%%%%%% Keys %%%%%%%%%% get_key_range(StaticIdx, WildcardIdx, Hash) ->