Merge pull request #13514 from ieQu1/skip-streams-improvement

fix(ds): Improve logic of skipstream LTS layout
This commit is contained in:
ieQu1 2024-07-24 13:28:44 +02:00 committed by GitHub
commit d1edf8aad2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 70 additions and 48 deletions

View File

@ -55,9 +55,14 @@
%% Type declarations
%%================================================================================
%% TLOG entry
%% keys:
-define(cooked_payloads, 6).
-define(cooked_lts_ops, 7).
%% Payload:
-define(cooked_payload(TIMESTAMP, STATIC, VARYING, VALUE),
{TIMESTAMP, STATIC, VARYING, VALUE}
).
-define(lts_persist_ops, emqx_ds_storage_skipstream_lts_ops).
@ -101,10 +106,11 @@
-record(it, {
static_index :: emqx_ds_lts:static_key(),
%% Minimal timestamp of the next message:
%% Timestamp of the last visited message:
ts :: ts(),
%% Compressed topic filter:
compressed_tf :: binary()
compressed_tf :: binary(),
misc = []
}).
%% Level iterator:
@ -170,28 +176,16 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{data_cf = DataCF, trie_cf = TrieCF,
ok = rocksdb:drop_column_family(DBHandle, TrieCF),
ok.
prepare_batch(
_ShardId,
S = #s{trie = Trie, hash_bytes = HashBytes},
Messages,
_Options
) ->
prepare_batch(_ShardId, S = #s{trie = Trie}, Messages, _Options) ->
_ = erase(?lts_persist_ops),
Payloads =
lists:flatmap(
fun({Timestamp, Msg = #message{topic = Topic}}) ->
Payloads = [
begin
Tokens = words(Topic),
{Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
%% TODO: is it possible to create index during the
%% commit phase to avoid transferring indexes through
%% the translog?
[
{mk_key(Static, 0, <<>>, Timestamp), serialize(S, Varying, Msg)}
| mk_index(HashBytes, Static, Timestamp, Varying)
]
end,
Messages
),
?cooked_payload(Timestamp, Static, Varying, serialize(S, Varying, Msg))
end
|| {Timestamp, Msg = #message{topic = Topic}} <- Messages
],
{ok, #{
?cooked_payloads => Payloads,
?cooked_lts_ops => pop_lts_persist_ops()
@ -199,7 +193,7 @@ prepare_batch(
commit_batch(
_ShardId,
#s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie},
#s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie, hash_bytes = HashBytes},
#{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads},
Options
) ->
@ -216,8 +210,10 @@ commit_batch(
_ = emqx_ds_lts:trie_update(Trie, LtsOps),
%% Commit payloads:
lists:foreach(
fun({Key, Val}) ->
ok = rocksdb:batch_put(Batch, DataCF, Key, Val)
fun(?cooked_payload(Timestamp, Static, Varying, ValBlob)) ->
MasterKey = mk_key(Static, 0, <<>>, Timestamp),
ok = rocksdb:batch_put(Batch, DataCF, MasterKey, ValBlob),
mk_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp)
end,
Payloads
),
@ -243,12 +239,14 @@ get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
get_delete_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
get_streams(Trie, TopicFilter).
make_iterator(_Shard, _State, _Stream, _TopicFilter, TS) when TS >= ?max_ts ->
{error, unrecoverable, "Timestamp is too large"};
make_iterator(_Shard, #s{trie = Trie}, #stream{static_index = StaticIdx}, TopicFilter, StartTime) ->
{ok, TopicStructure} = emqx_ds_lts:reverse_lookup(Trie, StaticIdx),
CompressedTF = emqx_ds_lts:compress_topic(StaticIdx, TopicStructure, TopicFilter),
{ok, #it{
static_index = StaticIdx,
ts = StartTime,
ts = dec_ts(StartTime),
compressed_tf = emqx_topic:join(CompressedTF)
}}.
@ -442,7 +440,7 @@ do_init_iterators(S, Static, [], _WildcardLevel) ->
next_loop(
Shard,
S = #s{trie = Trie},
It = #it{static_index = StaticIdx, ts = TS, compressed_tf = CompressedTF},
It = #it{static_index = StaticIdx, ts = LastTS, compressed_tf = CompressedTF},
Iterators,
BatchSize,
TMax
@ -465,10 +463,10 @@ next_loop(
filter = words(CompressedTF),
tmax = TMax
},
next_loop(Ctx, It, BatchSize, {seek, TS}, []).
next_loop(Ctx, It, BatchSize, {seek, inc_ts(LastTS)}, []).
next_loop(_Ctx, It, 0, Op, Acc) ->
finalize_loop(It, Op, Acc);
next_loop(_Ctx, It, 0, _Op, Acc) ->
finalize_loop(It, Acc);
next_loop(Ctx, It0, BatchSize, Op, Acc) ->
%% ?tp(notice, skipstream_loop, #{
%% ts => It0#it.ts, tf => It0#it.compressed_tf, bs => BatchSize, tmax => TMax, op => Op
@ -479,15 +477,15 @@ next_loop(Ctx, It0, BatchSize, Op, Acc) ->
none ->
%% ?tp(notice, skipstream_loop_result, #{r => none}),
inc_counter(?DS_SKIPSTREAM_LTS_EOS),
finalize_loop(It0, Op, Acc);
finalize_loop(It0, Acc);
{seek, TS} when TS > TMax ->
%% ?tp(notice, skipstream_loop_result, #{r => seek_future, ts => TS}),
inc_counter(?DS_SKIPSTREAM_LTS_FUTURE),
finalize_loop(It0, {seek, TS}, Acc);
finalize_loop(It0, Acc);
{ok, TS, _Key, _Msg0} when TS > TMax ->
%% ?tp(notice, skipstream_loop_result, #{r => ok_future, ts => TS, key => _Key}),
inc_counter(?DS_SKIPSTREAM_LTS_FUTURE),
finalize_loop(It0, {seek, TS}, Acc);
finalize_loop(It0, Acc);
{seek, TS} ->
%% ?tp(notice, skipstream_loop_result, #{r => seek, ts => TS}),
It = It0#it{ts = TS},
@ -499,12 +497,7 @@ next_loop(Ctx, It0, BatchSize, Op, Acc) ->
next_loop(Ctx, It, BatchSize - 1, next, [{DSKey, Message} | Acc])
end.
finalize_loop(It0, Op, Acc) ->
case Op of
next -> NextTS = It0#it.ts + 1;
{seek, NextTS} -> ok
end,
It = It0#it{ts = NextTS},
finalize_loop(It, Acc) ->
{ok, It, lists:reverse(Acc)}.
next_step(
@ -581,14 +574,15 @@ free_iterators(Its) ->
%%%%%%%% Indexes %%%%%%%%%%
%% NOTE(review): this span is a rendered diff — the removed and the added
%% versions of mk_index are shown interleaved, so the clauses below do not
%% form a single compilable function.
%% Removed version: mk_index/4 built a list of {Key, <<>>} index ops to be
%% carried in the cooked batch (cf. the removed TODO about transferring
%% indexes through the translog).
mk_index(HashBytes, Static, Timestamp, Varying) ->
mk_index(HashBytes, Static, Timestamp, 1, Varying, []).
%% Added version: mk_index/6 instead writes each index entry directly into
%% the RocksDB write batch at commit time (see the commit_batch hunk above,
%% which now passes Batch/DataCF/HashBytes here).
mk_index(Batch, CF, HashBytes, Static, Varying, Timestamp) ->
mk_index(Batch, CF, HashBytes, Static, Timestamp, 1, Varying).
%% Removed recursive worker: accumulates {Key, <<>>} pairs, one per varying
%% topic level, numbering wildcard levels from 1.
mk_index(HashBytes, Static, Timestamp, N, [TopicLevel | Varying], Acc) ->
Op = {mk_key(Static, N, hash(HashBytes, TopicLevel), Timestamp), <<>>},
mk_index(HashBytes, Static, Timestamp, N + 1, Varying, [Op | Acc]);
mk_index(_HashBytes, _Static, _Timestamp, _N, [], Acc) ->
Acc.
%% Added recursive worker: same keys, but each entry is batch_put directly;
%% returns ok instead of an accumulator.
mk_index(Batch, CF, HashBytes, Static, Timestamp, N, [TopicLevel | Varying]) ->
Key = mk_key(Static, N, hash(HashBytes, TopicLevel), Timestamp),
ok = rocksdb:batch_put(Batch, CF, Key, <<>>),
mk_index(Batch, CF, HashBytes, Static, Timestamp, N + 1, Varying);
mk_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) ->
ok.
%%%%%%%% Keys %%%%%%%%%%
@ -747,3 +741,31 @@ collect_counters(Shard) ->
end,
?COUNTERS
).
%% Successor of a timestamp in the circular timestamp space:
%% ordinary values advance by one, ?max_ts wraps around to 0.
inc_ts(TS) when TS >= 0, TS < ?max_ts ->
    TS + 1;
inc_ts(?max_ts) ->
    0.
%% Predecessor of a timestamp in the circular timestamp space:
%% ordinary values step back by one, 0 wraps around to ?max_ts.
dec_ts(TS) when TS > 0, TS =< ?max_ts ->
    TS - 1;
dec_ts(0) ->
    ?max_ts.
%%================================================================================
%% Tests
%%================================================================================
-ifdef(TEST).
%% dec_ts/1 must undo inc_ts/1 for ordinary and boundary timestamps.
inc_dec_test_() ->
    [
        ?_assertEqual(N, dec_ts(inc_ts(N)))
     || N <- [0, 1, 100, ?max_ts - 1, ?max_ts]
    ].
%% inc_ts/1 must undo dec_ts/1 for ordinary and boundary timestamps.
dec_inc_test_() ->
    [
        ?_assertEqual(N, inc_ts(dec_ts(N)))
     || N <- [0, 1, 100, ?max_ts - 1, ?max_ts]
    ].
-endif.