diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl index 164050932..bb786c1f0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl @@ -293,6 +293,21 @@ delete_next(Shard, S, It0, Selector, BatchSize, Now, IsCurrent) -> %% Internal functions %%================================================================================ +%% Loop context: +-record(ctx, { + shard, + %% Generation runtime state + s, + %% RocksDB iterators: + iters, + %% Cached topic structure for the static index: + topic_structure, + %% Maximum time: + tmax, + %% Compressed topic filter, split into words: + filter +}). + get_streams(Trie, TopicFilter) -> lists:map( fun({Static, _Varying}) -> @@ -316,34 +331,19 @@ serialize(#s{serialization_schema = SSchema, with_guid = WithGuid}, Varying, Msg emqx_ds_msg_serializer:serialize(SSchema, Msg). enrich( - Shard, - #s{trie = Trie, with_guid = WithGuid}, + #ctx{shard = Shard, topic_structure = Structure, s = #s{with_guid = WithGuid}}, DSKey, - StaticKey, Msg0 ) -> - case emqx_ds_lts:reverse_lookup(Trie, StaticKey) of - {ok, Structure} -> - %% Reconstruct the original topic from the static topic - %% index and varying parts: - Topic = emqx_topic:join( - emqx_ds_lts:decompress_topic(Structure, words(Msg0#message.topic)) - ), - Msg0#message{ - topic = Topic, - id = - case WithGuid of - true -> Msg0#message.id; - false -> fake_guid(Shard, DSKey) - end - }; - undefined -> - Err = #{ - msg => "LTS trie missing key", - key => StaticKey - }, - throw({unrecoverable, Err}) - end. + Topic = emqx_topic:join(emqx_ds_lts:decompress_topic(Structure, words(Msg0#message.topic))), + Msg0#message{ + topic = Topic, + id = + case WithGuid of + true -> Msg0#message.id; + false -> fake_guid(Shard, DSKey) + end + }. deserialize( #s{serialization_schema = SSchema}, @@ -437,15 +437,41 @@ do_init_iterators(S, Static, [], _WildcardLevel) -> } ]. -next_loop(Shard, S, It = #it{ts = TS}, Iterators, BatchSize, TMax) -> - next_loop(Shard, S, It, Iterators, BatchSize, TMax, {seek, TS}, []). +next_loop( + Shard, + S = #s{trie = Trie}, + It = #it{static_index = StaticIdx, ts = TS, compressed_tf = CompressedTF}, + Iterators, + BatchSize, + TMax +) -> + TopicStructure = + case emqx_ds_lts:reverse_lookup(Trie, StaticIdx) of + {ok, Rev} -> + Rev; + undefined -> + throw(#{ + msg => "LTS trie missing key", + key => StaticIdx + }) + end, + Ctx = #ctx{ + shard = Shard, + s = S, + iters = Iterators, + topic_structure = TopicStructure, + filter = words(CompressedTF), + tmax = TMax + }, + next_loop(Ctx, It, BatchSize, {seek, TS}, []). -next_loop(_Shard, _S, It, _Iterators, 0, _TMax, Op, Acc) -> +next_loop(_Ctx, It, 0, Op, Acc) -> finalize_loop(It, Op, Acc); -next_loop(Shard, S, It0, Iterators, BatchSize, TMax, Op, Acc) -> +next_loop(Ctx, It0, BatchSize, Op, Acc) -> %% ?tp(notice, skipstream_loop, #{ %% ts => It0#it.ts, tf => It0#it.compressed_tf, bs => BatchSize, tmax => TMax, op => Op %% }), + #ctx{s = S, tmax = TMax, iters = Iterators} = Ctx, #it{static_index = StaticIdx, compressed_tf = CompressedTF} = It0, case next_step(S, StaticIdx, CompressedTF, Iterators, undefined, Op) of none -> @@ -463,12 +489,12 @@ next_loop(Shard, S, It0, Iterators, BatchSize, TMax, Op, Acc) -> {seek, TS} -> %% ?tp(notice, skipstream_loop_result, #{r => seek, ts => TS}), It = It0#it{ts = TS}, - next_loop(Shard, S, It, Iterators, BatchSize, TMax, {seek, TS}, Acc); + next_loop(Ctx, It, BatchSize, {seek, TS}, Acc); {ok, TS, DSKey, Msg0} -> %% ?tp(notice, skipstream_loop_result, #{r => ok, ts => TS, key => Key}), - Message = enrich(Shard, S, DSKey, StaticIdx, Msg0), + Message = enrich(Ctx, DSKey, Msg0), It = It0#it{ts = TS}, - next_loop(Shard, S, It, Iterators, BatchSize - 1, TMax, next, [{DSKey, Message} | Acc]) + next_loop(Ctx, It, BatchSize - 1, next, [{DSKey, Message} | Acc]) end. finalize_loop(It0, Op, Acc) ->