feat(ds): support operations + preconditions in skipstream-lts
This commit is contained in:
parent
5356d678cc
commit
68990f1538
|
@ -33,7 +33,8 @@
|
||||||
make_delete_iterator/5,
|
make_delete_iterator/5,
|
||||||
update_iterator/4,
|
update_iterator/4,
|
||||||
next/6,
|
next/6,
|
||||||
delete_next/7
|
delete_next/7,
|
||||||
|
lookup_message/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% internal exports:
|
%% internal exports:
|
||||||
|
@ -43,6 +44,7 @@
|
||||||
|
|
||||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||||
-include_lib("snabbkaffe/include/trace.hrl").
|
-include_lib("snabbkaffe/include/trace.hrl").
|
||||||
|
-include("emqx_ds.hrl").
|
||||||
-include("emqx_ds_metrics.hrl").
|
-include("emqx_ds_metrics.hrl").
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
@ -56,11 +58,12 @@
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
%% TLOG entry
|
%% TLOG entry
|
||||||
%% keys:
|
%% Keys:
|
||||||
-define(cooked_payloads, 6).
|
-define(cooked_msg_ops, 6).
|
||||||
-define(cooked_lts_ops, 7).
|
-define(cooked_lts_ops, 7).
|
||||||
%% Payload:
|
%% Payload:
|
||||||
-define(cooked_payload(TIMESTAMP, STATIC, VARYING, VALUE),
|
-define(cooked_delete, 100).
|
||||||
|
-define(cooked_msg_op(TIMESTAMP, STATIC, VARYING, VALUE),
|
||||||
{TIMESTAMP, STATIC, VARYING, VALUE}
|
{TIMESTAMP, STATIC, VARYING, VALUE}
|
||||||
).
|
).
|
||||||
|
|
||||||
|
@ -176,25 +179,39 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{data_cf = DataCF, trie_cf = TrieCF,
|
||||||
ok = rocksdb:drop_column_family(DBHandle, TrieCF),
|
ok = rocksdb:drop_column_family(DBHandle, TrieCF),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
prepare_batch(_ShardId, S = #s{trie = Trie}, Messages, _Options) ->
|
prepare_batch(
|
||||||
|
_ShardId,
|
||||||
|
S = #s{trie = Trie},
|
||||||
|
Operations,
|
||||||
|
_Options
|
||||||
|
) ->
|
||||||
_ = erase(?lts_persist_ops),
|
_ = erase(?lts_persist_ops),
|
||||||
Payloads = [
|
OperationsCooked = emqx_utils:flattermap(
|
||||||
begin
|
fun
|
||||||
|
({Timestamp, Msg = #message{topic = Topic}}) ->
|
||||||
Tokens = words(Topic),
|
Tokens = words(Topic),
|
||||||
{Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
|
{Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
|
||||||
?cooked_payload(Timestamp, Static, Varying, serialize(S, Varying, Msg))
|
?cooked_msg_op(Timestamp, Static, Varying, serialize(S, Varying, Msg));
|
||||||
|
({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}) ->
|
||||||
|
case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of
|
||||||
|
{ok, {Static, Varying}} ->
|
||||||
|
?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete);
|
||||||
|
undefined ->
|
||||||
|
%% Topic is unknown, nothing to delete.
|
||||||
|
[]
|
||||||
end
|
end
|
||||||
|| {Timestamp, Msg = #message{topic = Topic}} <- Messages
|
end,
|
||||||
],
|
Operations
|
||||||
|
),
|
||||||
{ok, #{
|
{ok, #{
|
||||||
?cooked_payloads => Payloads,
|
?cooked_msg_ops => OperationsCooked,
|
||||||
?cooked_lts_ops => pop_lts_persist_ops()
|
?cooked_lts_ops => pop_lts_persist_ops()
|
||||||
}}.
|
}}.
|
||||||
|
|
||||||
commit_batch(
|
commit_batch(
|
||||||
_ShardId,
|
_ShardId,
|
||||||
#s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie, hash_bytes = HashBytes},
|
#s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie, hash_bytes = HashBytes},
|
||||||
#{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads},
|
#{?cooked_lts_ops := LtsOps, ?cooked_msg_ops := Operations},
|
||||||
Options
|
Options
|
||||||
) ->
|
) ->
|
||||||
{ok, Batch} = rocksdb:batch(),
|
{ok, Batch} = rocksdb:batch(),
|
||||||
|
@ -210,12 +227,17 @@ commit_batch(
|
||||||
_ = emqx_ds_lts:trie_update(Trie, LtsOps),
|
_ = emqx_ds_lts:trie_update(Trie, LtsOps),
|
||||||
%% Commit payloads:
|
%% Commit payloads:
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(?cooked_payload(Timestamp, Static, Varying, ValBlob)) ->
|
fun
|
||||||
|
(?cooked_msg_op(Timestamp, Static, Varying, ValBlob = <<_/bytes>>)) ->
|
||||||
MasterKey = mk_key(Static, 0, <<>>, Timestamp),
|
MasterKey = mk_key(Static, 0, <<>>, Timestamp),
|
||||||
ok = rocksdb:batch_put(Batch, DataCF, MasterKey, ValBlob),
|
ok = rocksdb:batch_put(Batch, DataCF, MasterKey, ValBlob),
|
||||||
mk_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp)
|
mk_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp);
|
||||||
|
(?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete)) ->
|
||||||
|
MasterKey = mk_key(Static, 0, <<>>, Timestamp),
|
||||||
|
ok = rocksdb:batch_delete(Batch, DataCF, MasterKey),
|
||||||
|
delete_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp)
|
||||||
end,
|
end,
|
||||||
Payloads
|
Operations
|
||||||
),
|
),
|
||||||
Result = rocksdb:write_batch(DB, Batch, [
|
Result = rocksdb:write_batch(DB, Batch, [
|
||||||
{disable_wal, not maps:get(durable, Options, true)}
|
{disable_wal, not maps:get(durable, Options, true)}
|
||||||
|
@ -285,6 +307,28 @@ delete_next(Shard, S, It0, Selector, BatchSize, Now, IsCurrent) ->
|
||||||
Ret
|
Ret
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
lookup_message(
|
||||||
|
Shard,
|
||||||
|
S = #s{db = DB, data_cf = CF, trie = Trie},
|
||||||
|
#message_matcher{topic = Topic, timestamp = Timestamp}
|
||||||
|
) ->
|
||||||
|
case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of
|
||||||
|
{ok, {StaticIdx, _Varying}} ->
|
||||||
|
DSKey = mk_key(StaticIdx, 0, <<>>, Timestamp),
|
||||||
|
case rocksdb:get(DB, CF, DSKey, _ReadOpts = []) of
|
||||||
|
{ok, Val} ->
|
||||||
|
{ok, TopicStructure} = emqx_ds_lts:reverse_lookup(Trie, StaticIdx),
|
||||||
|
Msg = deserialize(S, Val),
|
||||||
|
enrich(Shard, S, TopicStructure, DSKey, Msg);
|
||||||
|
not_found ->
|
||||||
|
not_found;
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, unrecoverable, Reason}
|
||||||
|
end;
|
||||||
|
undefined ->
|
||||||
|
not_found
|
||||||
|
end.
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Internal exports
|
%% Internal exports
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -330,12 +374,18 @@ serialize(#s{serialization_schema = SSchema, with_guid = WithGuid}, Varying, Msg
|
||||||
},
|
},
|
||||||
emqx_ds_msg_serializer:serialize(SSchema, Msg).
|
emqx_ds_msg_serializer:serialize(SSchema, Msg).
|
||||||
|
|
||||||
|
enrich(#ctx{shard = Shard, s = S, topic_structure = TopicStructure}, DSKey, Msg0) ->
|
||||||
|
enrich(Shard, S, TopicStructure, DSKey, Msg0).
|
||||||
|
|
||||||
enrich(
|
enrich(
|
||||||
#ctx{shard = Shard, topic_structure = Structure, s = #s{with_guid = WithGuid}},
|
Shard,
|
||||||
|
#s{with_guid = WithGuid},
|
||||||
|
TopicStructure,
|
||||||
DSKey,
|
DSKey,
|
||||||
Msg0
|
Msg0
|
||||||
) ->
|
) ->
|
||||||
Topic = emqx_topic:join(emqx_ds_lts:decompress_topic(Structure, words(Msg0#message.topic))),
|
Tokens = words(Msg0#message.topic),
|
||||||
|
Topic = emqx_topic:join(emqx_ds_lts:decompress_topic(TopicStructure, Tokens)),
|
||||||
Msg0#message{
|
Msg0#message{
|
||||||
topic = Topic,
|
topic = Topic,
|
||||||
id =
|
id =
|
||||||
|
@ -584,6 +634,16 @@ mk_index(Batch, CF, HashBytes, Static, Timestamp, N, [TopicLevel | Varying]) ->
|
||||||
mk_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) ->
|
mk_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
delete_index(Batch, CF, HashBytes, Static, Varying, Timestamp) ->
|
||||||
|
delete_index(Batch, CF, HashBytes, Static, Timestamp, 1, Varying).
|
||||||
|
|
||||||
|
delete_index(Batch, CF, HashBytes, Static, Timestamp, N, [TopicLevel | Varying]) ->
|
||||||
|
Key = mk_key(Static, N, hash(HashBytes, TopicLevel), Timestamp),
|
||||||
|
ok = rocksdb:batch_delete(Batch, CF, Key),
|
||||||
|
delete_index(Batch, CF, HashBytes, Static, Timestamp, N + 1, Varying);
|
||||||
|
delete_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
%%%%%%%% Keys %%%%%%%%%%
|
%%%%%%%% Keys %%%%%%%%%%
|
||||||
|
|
||||||
get_key_range(StaticIdx, WildcardIdx, Hash) ->
|
get_key_range(StaticIdx, WildcardIdx, Hash) ->
|
||||||
|
|
Loading…
Reference in New Issue