From b010efb6474c7552e37597747e3cc7d6c606e047 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 24 Jul 2024 10:23:47 +0200 Subject: [PATCH] fix(ds): Improve logic of skipstream LTS layout Iterators: Previously it used timestamp of the next message as a reference. This won't work well for the upcoming beamformer/beamsplitter feature. This commit changes the logic so iterators store timestamp of the last seen message. Cooked batches: Cooked batches no longer store index entries. Creation of indexes has been delegated to commit callback. --- .../src/emqx_ds_storage_skipstream_lts.erl | 118 +++++++++++------- 1 file changed, 70 insertions(+), 48 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl index f555241f2..cb87b8a6f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl @@ -55,9 +55,14 @@ %% Type declarations %%================================================================================ -%% keys: +%% TLOG entry +%% keys: -define(cooked_payloads, 6). -define(cooked_lts_ops, 7). +%% Payload: +-define(cooked_payload(TIMESTAMP, STATIC, VARYING, VALUE), + {TIMESTAMP, STATIC, VARYING, VALUE} +). -define(lts_persist_ops, emqx_ds_storage_skipstream_lts_ops). @@ -101,10 +106,11 @@ -record(it, { static_index :: emqx_ds_lts:static_key(), - %% Minimal timestamp of the next message: + %% Timestamp of the last visited message: ts :: ts(), %% Compressed topic filter: - compressed_tf :: binary() + compressed_tf :: binary(), + misc = [] }). %% Level iterator: @@ -170,28 +176,16 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{data_cf = DataCF, trie_cf = TrieCF, ok = rocksdb:drop_column_family(DBHandle, TrieCF), ok. -prepare_batch( - _ShardId, - S = #s{trie = Trie, hash_bytes = HashBytes}, - Messages, - _Options -) -> +prepare_batch(_ShardId, S = #s{trie = Trie}, Messages, _Options) -> _ = erase(?lts_persist_ops), - Payloads = - lists:flatmap( - fun({Timestamp, Msg = #message{topic = Topic}}) -> - Tokens = words(Topic), - {Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens), - %% TODO: is it possible to create index during the - %% commit phase to avoid transferring indexes through - %% the translog? - [ - {mk_key(Static, 0, <<>>, Timestamp), serialize(S, Varying, Msg)} - | mk_index(HashBytes, Static, Timestamp, Varying) - ] - end, - Messages - ), + Payloads = [ + begin + Tokens = words(Topic), + {Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens), + ?cooked_payload(Timestamp, Static, Varying, serialize(S, Varying, Msg)) + end + || {Timestamp, Msg = #message{topic = Topic}} <- Messages + ], {ok, #{ ?cooked_payloads => Payloads, ?cooked_lts_ops => pop_lts_persist_ops() @@ -199,7 +193,7 @@ prepare_batch( commit_batch( _ShardId, - #s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie}, + #s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie, hash_bytes = HashBytes}, #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads}, Options ) -> @@ -216,8 +210,10 @@ commit_batch( _ = emqx_ds_lts:trie_update(Trie, LtsOps), %% Commit payloads: lists:foreach( - fun({Key, Val}) -> - ok = rocksdb:batch_put(Batch, DataCF, Key, Val) + fun(?cooked_payload(Timestamp, Static, Varying, ValBlob)) -> + MasterKey = mk_key(Static, 0, <<>>, Timestamp), + ok = rocksdb:batch_put(Batch, DataCF, MasterKey, ValBlob), + mk_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp) end, Payloads ), @@ -243,12 +239,14 @@ get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) -> get_delete_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) -> get_streams(Trie, TopicFilter). +make_iterator(_Shard, _State, _Stream, _TopicFilter, TS) when TS >= ?max_ts -> + {error, unrecoverable, "Timestamp is too large"}; make_iterator(_Shard, #s{trie = Trie}, #stream{static_index = StaticIdx}, TopicFilter, StartTime) -> {ok, TopicStructure} = emqx_ds_lts:reverse_lookup(Trie, StaticIdx), CompressedTF = emqx_ds_lts:compress_topic(StaticIdx, TopicStructure, TopicFilter), {ok, #it{ static_index = StaticIdx, - ts = StartTime, + ts = dec_ts(StartTime), compressed_tf = emqx_topic:join(CompressedTF) }}. @@ -442,7 +440,7 @@ do_init_iterators(S, Static, [], _WildcardLevel) -> next_loop( Shard, S = #s{trie = Trie}, - It = #it{static_index = StaticIdx, ts = TS, compressed_tf = CompressedTF}, + It = #it{static_index = StaticIdx, ts = LastTS, compressed_tf = CompressedTF}, Iterators, BatchSize, TMax @@ -465,10 +463,10 @@ next_loop( filter = words(CompressedTF), tmax = TMax }, - next_loop(Ctx, It, BatchSize, {seek, TS}, []). + next_loop(Ctx, It, BatchSize, {seek, inc_ts(LastTS)}, []). -next_loop(_Ctx, It, 0, Op, Acc) -> - finalize_loop(It, Op, Acc); +next_loop(_Ctx, It, 0, _Op, Acc) -> + finalize_loop(It, Acc); next_loop(Ctx, It0, BatchSize, Op, Acc) -> %% ?tp(notice, skipstream_loop, #{ %% ts => It0#it.ts, tf => It0#it.compressed_tf, bs => BatchSize, tmax => TMax, op => Op @@ -479,15 +477,15 @@ next_loop(Ctx, It0, BatchSize, Op, Acc) -> none -> %% ?tp(notice, skipstream_loop_result, #{r => none}), inc_counter(?DS_SKIPSTREAM_LTS_EOS), - finalize_loop(It0, Op, Acc); + finalize_loop(It0, Acc); {seek, TS} when TS > TMax -> %% ?tp(notice, skipstream_loop_result, #{r => seek_future, ts => TS}), inc_counter(?DS_SKIPSTREAM_LTS_FUTURE), - finalize_loop(It0, {seek, TS}, Acc); + finalize_loop(It0, Acc); {ok, TS, _Key, _Msg0} when TS > TMax -> %% ?tp(notice, skipstream_loop_result, #{r => ok_future, ts => TS, key => _Key}), inc_counter(?DS_SKIPSTREAM_LTS_FUTURE), - finalize_loop(It0, {seek, TS}, Acc); + finalize_loop(It0, Acc); {seek, TS} -> %% ?tp(notice, skipstream_loop_result, #{r => seek, ts => TS}), It = It0#it{ts = TS}, @@ -499,12 +497,7 @@ next_loop(Ctx, It0, BatchSize, Op, Acc) -> next_loop(Ctx, It, BatchSize - 1, next, [{DSKey, Message} | Acc]) end. -finalize_loop(It0, Op, Acc) -> - case Op of - next -> NextTS = It0#it.ts + 1; - {seek, NextTS} -> ok - end, - It = It0#it{ts = NextTS}, +finalize_loop(It, Acc) -> {ok, It, lists:reverse(Acc)}. next_step( @@ -581,14 +574,15 @@ free_iterators(Its) -> %%%%%%%% Indexes %%%%%%%%%% -mk_index(HashBytes, Static, Timestamp, Varying) -> - mk_index(HashBytes, Static, Timestamp, 1, Varying, []). +mk_index(Batch, CF, HashBytes, Static, Varying, Timestamp) -> + mk_index(Batch, CF, HashBytes, Static, Timestamp, 1, Varying). -mk_index(HashBytes, Static, Timestamp, N, [TopicLevel | Varying], Acc) -> - Op = {mk_key(Static, N, hash(HashBytes, TopicLevel), Timestamp), <<>>}, - mk_index(HashBytes, Static, Timestamp, N + 1, Varying, [Op | Acc]); -mk_index(_HashBytes, _Static, _Timestamp, _N, [], Acc) -> - Acc. +mk_index(Batch, CF, HashBytes, Static, Timestamp, N, [TopicLevel | Varying]) -> + Key = mk_key(Static, N, hash(HashBytes, TopicLevel), Timestamp), + ok = rocksdb:batch_put(Batch, CF, Key, <<>>), + mk_index(Batch, CF, HashBytes, Static, Timestamp, N + 1, Varying); +mk_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) -> + ok. %%%%%%%% Keys %%%%%%%%%% @@ -747,3 +741,31 @@ collect_counters(Shard) -> end, ?COUNTERS ). + +inc_ts(?max_ts) -> 0; +inc_ts(TS) when TS >= 0, TS < ?max_ts -> TS + 1. + +dec_ts(0) -> ?max_ts; +dec_ts(TS) when TS > 0, TS =< ?max_ts -> TS - 1. + +%%================================================================================ +%% Tests +%%================================================================================ + +-ifdef(TEST). + +inc_dec_test_() -> + Numbers = [0, 1, 100, ?max_ts - 1, ?max_ts], + [ + ?_assertEqual(N, dec_ts(inc_ts(N))) + || N <- Numbers + ]. + +dec_inc_test_() -> + Numbers = [0, 1, 100, ?max_ts - 1, ?max_ts], + [ + ?_assertEqual(N, inc_ts(dec_ts(N))) + || N <- Numbers + ]. + +-endif.