feat(ds): support operations + preconditions in skipstream-lts

This commit is contained in:
Andrew Mayorov 2024-07-30 16:56:29 +02:00
parent fcf76d28ba
commit 7b243ef7ad
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 82 additions and 22 deletions

View File

@ -33,7 +33,8 @@
make_delete_iterator/5,
update_iterator/4,
next/6,
delete_next/7
delete_next/7,
lookup_message/3
]).
%% internal exports:
@ -43,6 +44,7 @@
-include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-include("emqx_ds.hrl").
-include("emqx_ds_metrics.hrl").
-ifdef(TEST).
@ -56,11 +58,12 @@
%%================================================================================
%% TLOG entry
%% keys:
-define(cooked_payloads, 6).
%% Keys:
-define(cooked_msg_ops, 6).
-define(cooked_lts_ops, 7).
%% Payload:
-define(cooked_payload(TIMESTAMP, STATIC, VARYING, VALUE),
-define(cooked_delete, 100).
-define(cooked_msg_op(TIMESTAMP, STATIC, VARYING, VALUE),
{TIMESTAMP, STATIC, VARYING, VALUE}
).
@ -176,25 +179,39 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{data_cf = DataCF, trie_cf = TrieCF,
ok = rocksdb:drop_column_family(DBHandle, TrieCF),
ok.
prepare_batch(_ShardId, S = #s{trie = Trie}, Messages, _Options) ->
prepare_batch(
_ShardId,
S = #s{trie = Trie},
Operations,
_Options
) ->
_ = erase(?lts_persist_ops),
Payloads = [
begin
OperationsCooked = emqx_utils:flattermap(
fun
({Timestamp, Msg = #message{topic = Topic}}) ->
Tokens = words(Topic),
{Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
?cooked_payload(Timestamp, Static, Varying, serialize(S, Varying, Msg))
?cooked_msg_op(Timestamp, Static, Varying, serialize(S, Varying, Msg));
({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}) ->
case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of
{ok, {Static, Varying}} ->
?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete);
undefined ->
%% Topic is unknown, nothing to delete.
[]
end
|| {Timestamp, Msg = #message{topic = Topic}} <- Messages
],
end,
Operations
),
{ok, #{
?cooked_payloads => Payloads,
?cooked_msg_ops => OperationsCooked,
?cooked_lts_ops => pop_lts_persist_ops()
}}.
commit_batch(
_ShardId,
#s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie, hash_bytes = HashBytes},
#{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads},
#{?cooked_lts_ops := LtsOps, ?cooked_msg_ops := Operations},
Options
) ->
{ok, Batch} = rocksdb:batch(),
@ -210,12 +227,17 @@ commit_batch(
_ = emqx_ds_lts:trie_update(Trie, LtsOps),
%% Commit payloads:
lists:foreach(
fun(?cooked_payload(Timestamp, Static, Varying, ValBlob)) ->
fun
(?cooked_msg_op(Timestamp, Static, Varying, ValBlob = <<_/bytes>>)) ->
MasterKey = mk_key(Static, 0, <<>>, Timestamp),
ok = rocksdb:batch_put(Batch, DataCF, MasterKey, ValBlob),
mk_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp)
mk_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp);
(?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete)) ->
MasterKey = mk_key(Static, 0, <<>>, Timestamp),
ok = rocksdb:batch_delete(Batch, DataCF, MasterKey),
delete_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp)
end,
Payloads
Operations
),
Result = rocksdb:write_batch(DB, Batch, [
{disable_wal, not maps:get(durable, Options, true)}
@ -285,6 +307,28 @@ delete_next(Shard, S, It0, Selector, BatchSize, Now, IsCurrent) ->
Ret
end.
lookup_message(
Shard,
S = #s{db = DB, data_cf = CF, trie = Trie},
#message_matcher{topic = Topic, timestamp = Timestamp}
) ->
case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of
{ok, {StaticIdx, _Varying}} ->
DSKey = mk_key(StaticIdx, 0, <<>>, Timestamp),
case rocksdb:get(DB, CF, DSKey, _ReadOpts = []) of
{ok, Val} ->
{ok, TopicStructure} = emqx_ds_lts:reverse_lookup(Trie, StaticIdx),
Msg = deserialize(S, Val),
enrich(Shard, S, TopicStructure, DSKey, Msg);
not_found ->
not_found;
{error, Reason} ->
{error, unrecoverable, Reason}
end;
undefined ->
not_found
end.
%%================================================================================
%% Internal exports
%%================================================================================
@ -330,12 +374,18 @@ serialize(#s{serialization_schema = SSchema, with_guid = WithGuid}, Varying, Msg
},
emqx_ds_msg_serializer:serialize(SSchema, Msg).
enrich(#ctx{shard = Shard, s = S, topic_structure = TopicStructure}, DSKey, Msg0) ->
enrich(Shard, S, TopicStructure, DSKey, Msg0).
enrich(
#ctx{shard = Shard, topic_structure = Structure, s = #s{with_guid = WithGuid}},
Shard,
#s{with_guid = WithGuid},
TopicStructure,
DSKey,
Msg0
) ->
Topic = emqx_topic:join(emqx_ds_lts:decompress_topic(Structure, words(Msg0#message.topic))),
Tokens = words(Msg0#message.topic),
Topic = emqx_topic:join(emqx_ds_lts:decompress_topic(TopicStructure, Tokens)),
Msg0#message{
topic = Topic,
id =
@ -584,6 +634,16 @@ mk_index(Batch, CF, HashBytes, Static, Timestamp, N, [TopicLevel | Varying]) ->
mk_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) ->
ok.
delete_index(Batch, CF, HashBytes, Static, Varying, Timestamp) ->
delete_index(Batch, CF, HashBytes, Static, Timestamp, 1, Varying).
delete_index(Batch, CF, HashBytes, Static, Timestamp, N, [TopicLevel | Varying]) ->
Key = mk_key(Static, N, hash(HashBytes, TopicLevel), Timestamp),
ok = rocksdb:batch_delete(Batch, CF, Key),
delete_index(Batch, CF, HashBytes, Static, Timestamp, N + 1, Varying);
delete_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) ->
ok.
%%%%%%%% Keys %%%%%%%%%%
get_key_range(StaticIdx, WildcardIdx, Hash) ->