Merge pull request #13559 from keynslug/feat/EMQX-12309/raft-precond

feat(dsraft): support atomic batches + preconditions
commit 3b52b658cd, authored by Andrew Mayorov on 2024-08-06 09:17:16 +02:00, committed by GitHub
13 changed files with 828 additions and 269 deletions
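Note: the new API surface, distilled from the test cases below. This is a minimal sketch (the helper is hypothetical, not part of this diff): a batch becomes a #dsbatch{} record carrying preconditions alongside operations, and it is only accepted by a DB opened with `atomic_batches => true'.

    -include_lib("emqx_durable_storage/include/emqx_ds.hrl").
    -include_lib("emqx_utils/include/emqx_message.hrl").

    %% Insert Msg only if its (client, topic, timestamp) slot is still empty.
    %% Returns ok | {error, unrecoverable, {precondition_failed, _}}.
    put_if_absent(DB, Msg = #message{from = From, topic = Topic, timestamp = TS}) ->
        Matcher = #message_matcher{from = From, topic = Topic, timestamp = TS, payload = '_'},
        emqx_ds:store_batch(DB, #dsbatch{
            preconditions = [{unless_exists, Matcher}],
            operations = [Msg]
        }).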

==== changed file 1 of 13 ====

@@ -19,6 +19,7 @@
 -compile(nowarn_export_all).
 -include("../../emqx/include/emqx.hrl").
+-include("../../emqx_durable_storage/include/emqx_ds.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("stdlib/include/assert.hrl").
 -include("../../emqx/include/asserts.hrl").
@@ -145,7 +146,7 @@ t_06_smoke_add_generation(Config) ->
     ?assertMatch(ok, emqx_ds:add_generation(DB)),
     [
         {Gen1, #{created_at := Created1, since := Since1, until := Until1}},
-        {Gen2, #{created_at := Created2, since := Since2, until := undefined}}
+        {_Gen2, #{created_at := Created2, since := Since2, until := undefined}}
     ] = maps:to_list(emqx_ds:list_generations_with_lifetimes(DB)),
     %% Check units of the return values (+/- 10s from test begin time):
     ?give_or_take(BeginTime, 10_000, Created1),
@@ -234,8 +235,8 @@ t_09_atomic_store_batch(Config) ->
     DB = ?FUNCTION_NAME,
     ?check_trace(
         begin
-            application:set_env(emqx_durable_storage, egress_batch_size, 1),
-            ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
+            DBOpts = (opts(Config))#{atomic_batches => true},
+            ?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)),
             Msgs = [
                 message(<<"1">>, <<"1">>, 0),
                 message(<<"2">>, <<"2">>, 1),
@@ -243,13 +244,8 @@ t_09_atomic_store_batch(Config) ->
             ],
             ?assertEqual(
                 ok,
-                emqx_ds:store_batch(DB, Msgs, #{
-                    atomic => true,
-                    sync => true
-                })
-            ),
-            {ok, Flush} = ?block_until(#{?snk_kind := emqx_ds_buffer_flush}),
-            ?assertMatch(#{batch := [_, _, _]}, Flush)
+                emqx_ds:store_batch(DB, Msgs, #{sync => true})
+            )
         end,
         []
     ),
@@ -289,6 +285,124 @@ t_10_non_atomic_store_batch(Config) ->
     ),
     ok.
 
+t_11_batch_preconditions(Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            DBOpts = (opts(Config))#{
+                atomic_batches => true,
+                force_monotonic_timestamps => false
+            },
+            ?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)),
+
+            %% Conditional delete
+            TS = 42,
+            Batch1 = #dsbatch{
+                preconditions = [{if_exists, matcher(<<"c1">>, <<"t/a">>, '_', TS)}],
+                operations = [{delete, matcher(<<"c1">>, <<"t/a">>, '_', TS)}]
+            },
+            %% Conditional insert
+            M1 = message(<<"c1">>, <<"t/a">>, <<"M1">>, TS),
+            Batch2 = #dsbatch{
+                preconditions = [{unless_exists, matcher(<<"c1">>, <<"t/a">>, '_', TS)}],
+                operations = [M1]
+            },
+
+            %% No such message yet, precondition fails:
+            ?assertEqual(
+                {error, unrecoverable, {precondition_failed, not_found}},
+                emqx_ds:store_batch(DB, Batch1)
+            ),
+            %% No such message yet, `unless' precondition holds:
+            ?assertEqual(
+                ok,
+                emqx_ds:store_batch(DB, Batch2)
+            ),
+            %% Now there's such message, `unless' precondition now fails:
+            ?assertEqual(
+                {error, unrecoverable, {precondition_failed, M1}},
+                emqx_ds:store_batch(DB, Batch2)
+            ),
+            %% On the other hand, `if' precondition now holds:
+            ?assertEqual(
+                ok,
+                emqx_ds:store_batch(DB, Batch1)
+            ),
+
+            %% Wait at least until current epoch ends.
+            ct:sleep(1000),
+            %% There's no messages in the DB.
+            ?assertEqual(
+                [],
+                emqx_ds_test_helpers:consume(DB, emqx_topic:words(<<"t/#">>))
+            )
+        end,
+        []
+    ).
+
+t_12_batch_precondition_conflicts(Config) ->
+    DB = ?FUNCTION_NAME,
+    NBatches = 50,
+    NMessages = 10,
+    ?check_trace(
+        begin
+            DBOpts = (opts(Config))#{
+                atomic_batches => true,
+                force_monotonic_timestamps => false
+            },
+            ?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)),
+
+            ConflictBatches = [
+                #dsbatch{
+                    %% If the slot is free...
+                    preconditions = [{if_exists, matcher(<<"c1">>, <<"t/slot">>, _Free = <<>>, 0)}],
+                    %% Take it and write NMessages extra messages, so that batches take longer to
+                    %% process and have higher chances to conflict with each other.
+                    operations =
+                        [
+                            message(<<"c1">>, <<"t/slot">>, integer_to_binary(I), _TS = 0)
+                            | [
+                                message(<<"c1">>, {"t/owner/~p/~p", [I, J]}, <<>>, I * 100 + J)
+                             || J <- lists:seq(1, NMessages)
+                            ]
+                        ]
+                }
+             || I <- lists:seq(1, NBatches)
+            ],
+
+            %% Run those batches concurrently.
+            ok = emqx_ds:store_batch(DB, [message(<<"c1">>, <<"t/slot">>, <<>>, 0)]),
+            Results = emqx_utils:pmap(
+                fun(B) -> emqx_ds:store_batch(DB, B) end,
+                ConflictBatches,
+                infinity
+            ),
+
+            %% Only one should have succeeded.
+            ?assertEqual([ok], [Ok || Ok = ok <- Results]),
+
+            %% While other failed with an identical `precondition_failed'.
+            Failures = lists:usort([PreconditionFailed || {error, _, PreconditionFailed} <- Results]),
+            ?assertMatch(
+                [{precondition_failed, #message{topic = <<"t/slot">>, payload = <<_/bytes>>}}],
+                Failures
+            ),
+
+            %% Wait at least until current epoch ends.
+            ct:sleep(1000),
+            %% Storage should contain single batch's messages.
+            [{precondition_failed, #message{payload = IOwner}}] = Failures,
+            WinnerBatch = lists:nth(binary_to_integer(IOwner), ConflictBatches),
+            BatchMessages = lists:sort(WinnerBatch#dsbatch.operations),
+            DBMessages = emqx_ds_test_helpers:consume(DB, emqx_topic:words(<<"t/#">>)),
+            ?assertEqual(
+                BatchMessages,
+                DBMessages
+            )
+        end,
+        []
+    ).
+
 t_smoke_delete_next(Config) ->
     DB = ?FUNCTION_NAME,
     ?check_trace(
@@ -534,12 +648,25 @@ message(ClientId, Topic, Payload, PublishedAt) ->
 message(Topic, Payload, PublishedAt) ->
     #message{
-        topic = Topic,
-        payload = Payload,
+        topic = try_format(Topic),
+        payload = try_format(Payload),
         timestamp = PublishedAt,
         id = emqx_guid:gen()
     }.
 
+matcher(ClientID, Topic, Payload, Timestamp) ->
+    #message_matcher{
+        from = ClientID,
+        topic = try_format(Topic),
+        timestamp = Timestamp,
+        payload = Payload
+    }.
+
+try_format({Fmt, Args}) ->
+    emqx_utils:format(Fmt, Args);
+try_format(String) ->
+    String.
+
 delete(DB, It, Selector, BatchSize) ->
     delete(DB, It, Selector, BatchSize, 0).
@@ -562,9 +689,18 @@ all() ->
 
 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
+    %% TODO: Remove once builtin-local supports preconditions + atomic batches.
+    BuiltinLocalTCs =
+        TCs --
+            [
+                t_09_atomic_store_batch,
+                t_11_batch_preconditions,
+                t_12_batch_precondition_conflicts
+            ],
+    BuiltinRaftTCs = TCs,
     [
-        {builtin_local, TCs},
-        {builtin_raft, TCs}
+        {builtin_local, BuiltinLocalTCs},
+        {builtin_raft, BuiltinRaftTCs}
     ].
 
 init_per_group(builtin_local, Config) ->

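Note: t_12 above doubles as a recipe for optimistic concurrency control on top of preconditions. A sketch of the general pattern (hypothetical helper, not in this diff): all writers race with the same `if_exists' matcher on a placeholder value, and exactly one batch commits.

    %% Claim a singleton slot previously initialized with an empty payload;
    %% losers receive {error, _, {precondition_failed, WinnerMessage}}.
    claim_slot(DB, ClientId, Topic, Payload) ->
        Free = #message_matcher{from = ClientId, topic = Topic, timestamp = 0, payload = <<>>},
        emqx_ds:store_batch(DB, #dsbatch{
            preconditions = [{if_exists, Free}],
            operations = [#message{id = emqx_guid:gen(), qos = 0, from = ClientId,
                                   topic = Topic, timestamp = 0, payload = Payload}]
        }).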
==== changed file 2 of 13 ====

@@ -43,7 +43,7 @@
     %% `emqx_ds_buffer':
     init_buffer/3,
     flush_buffer/4,
-    shard_of_message/4
+    shard_of_operation/4
 ]).
 
 %% Internal exports:
@@ -55,6 +55,7 @@
 -export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).
 
 -include_lib("emqx_utils/include/emqx_message.hrl").
+-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
 
 %%================================================================================
 %% Type declarations
@@ -230,9 +231,9 @@ flush_buffer(DB, Shard, Messages, S0 = #bs{options = Options}) ->
 make_batch(_ForceMonotonic = true, Latest, Messages) ->
     assign_monotonic_timestamps(Latest, Messages, []);
 make_batch(false, Latest, Messages) ->
-    assign_message_timestamps(Latest, Messages, []).
+    assign_operation_timestamps(Latest, Messages, []).
 
-assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) ->
+assign_monotonic_timestamps(Latest0, [Message = #message{} | Rest], Acc0) ->
     case emqx_message:timestamp(Message, microsecond) of
         TimestampUs when TimestampUs > Latest0 ->
             Latest = TimestampUs;
@@ -241,28 +242,43 @@ assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) ->
     end,
     Acc = [assign_timestamp(Latest, Message) | Acc0],
     assign_monotonic_timestamps(Latest, Rest, Acc);
+assign_monotonic_timestamps(Latest, [Operation | Rest], Acc0) ->
+    Acc = [Operation | Acc0],
+    assign_monotonic_timestamps(Latest, Rest, Acc);
 assign_monotonic_timestamps(Latest, [], Acc) ->
     {Latest, lists:reverse(Acc)}.
 
-assign_message_timestamps(Latest0, [Message | Rest], Acc0) ->
-    TimestampUs = emqx_message:timestamp(Message, microsecond),
+assign_operation_timestamps(Latest0, [Message = #message{} | Rest], Acc0) ->
+    TimestampUs = emqx_message:timestamp(Message),
     Latest = max(TimestampUs, Latest0),
     Acc = [assign_timestamp(TimestampUs, Message) | Acc0],
-    assign_message_timestamps(Latest, Rest, Acc);
-assign_message_timestamps(Latest, [], Acc) ->
+    assign_operation_timestamps(Latest, Rest, Acc);
+assign_operation_timestamps(Latest, [Operation | Rest], Acc0) ->
+    Acc = [Operation | Acc0],
+    assign_operation_timestamps(Latest, Rest, Acc);
+assign_operation_timestamps(Latest, [], Acc) ->
     {Latest, lists:reverse(Acc)}.
 
 assign_timestamp(TimestampUs, Message) ->
     {TimestampUs, Message}.
 
--spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) -> shard().
-shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
+-spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic, _Options) -> shard().
+shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
+    case SerializeBy of
+        clientid -> Key = From;
+        topic -> Key = Topic
+    end,
+    shard_of_key(DB, Key);
+shard_of_operation(DB, {_, #message_matcher{from = From, topic = Topic}}, SerializeBy, _Options) ->
+    case SerializeBy of
+        clientid -> Key = From;
+        topic -> Key = Topic
+    end,
+    shard_of_key(DB, Key).
+
+shard_of_key(DB, Key) ->
     N = emqx_ds_builtin_local_meta:n_shards(DB),
-    Hash =
-        case SerializeBy of
-            clientid -> erlang:phash2(From, N);
-            topic -> erlang:phash2(Topic, N)
-        end,
+    Hash = erlang:phash2(Key, N),
     integer_to_binary(Hash).
 
 -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
@@ -288,7 +304,7 @@ get_streams(DB, TopicFilter, StartTime) ->
 -spec make_iterator(
     emqx_ds:db(), emqx_ds:ds_specific_stream(), emqx_ds:topic_filter(), emqx_ds:time()
 ) ->
-    emqx_ds:make_iterator_result(emqx_ds:ds_specific_iterator()).
+    emqx_ds:make_iterator_result(iterator()).
 make_iterator(DB, ?stream(Shard, InnerStream), TopicFilter, StartTime) ->
     ShardId = {DB, Shard},
     case
@@ -302,7 +318,7 @@ make_iterator(DB, ?stream(Shard, InnerStream), TopicFilter, StartTime) ->
         Error
     end.
 
--spec update_iterator(emqx_ds:db(), emqx_ds:ds_specific_iterator(), emqx_ds:message_key()) ->
+-spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) ->
     emqx_ds:make_iterator_result(iterator()).
 update_iterator(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, Key) ->
     case emqx_ds_storage_layer:update_iterator({DB, Shard}, StorageIter0, Key) of
@@ -380,7 +396,7 @@ do_next(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, N) ->
     end.
 
 -spec do_delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
-    emqx_ds:delete_next_result(emqx_ds:delete_iterator()).
+    emqx_ds:delete_next_result(delete_iterator()).
 do_delete_next(
     DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0}, Selector, N
 ) ->

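Note: the sharding refactoring above keeps one invariant worth spelling out: a write and the matcher guarding (or deleting) it hash to the same shard as long as they agree on the sharding key. A sketch, assuming this file is emqx_ds_builtin_local (the module name is not shown in the diff):

    %% Both the message and a matcher derived from it reduce to
    %% erlang:phash2(ClientId, NShards), so they land on one shard.
    same_shard(DB, Msg = #message{from = From, topic = Topic, timestamp = TS}) ->
        Matcher = #message_matcher{from = From, topic = Topic, timestamp = TS, payload = '_'},
        emqx_ds_builtin_local:shard_of_operation(DB, Msg, clientid, #{}) =:=
            emqx_ds_builtin_local:shard_of_operation(DB, {delete, Matcher}, clientid, #{}).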
==== changed file 3 of 13 ====

@@ -29,7 +29,7 @@
     current_timestamp/2,
-    shard_of_message/4,
+    shard_of_operation/4,
     flush_buffer/4,
     init_buffer/3
 ]).
@@ -83,6 +83,7 @@
     ra_state/0
 ]).
 
+-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
 -include_lib("emqx_utils/include/emqx_message.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 
 -include("emqx_ds_replication_layer.hrl").
@@ -100,7 +101,10 @@
     n_shards => pos_integer(),
     n_sites => pos_integer(),
     replication_factor => pos_integer(),
-    replication_options => _TODO :: #{}
+    replication_options => _TODO :: #{},
+    %% Inherited from `emqx_ds:generic_db_opts()'.
+    force_monotonic_timestamps => boolean(),
+    atomic_batches => boolean()
 }.
 
 %% This encapsulates the stream entity from the replication level.
@@ -135,11 +139,12 @@
     ?enc := emqx_ds_storage_layer:delete_iterator()
 }.
 
-%% TODO: this type is obsolete and is kept only for compatibility with
-%% BPAPIs. Remove it when emqx_ds_proto_v4 is gone (EMQX 5.6)
+%% Write batch.
+%% Instances of this type currently form the majority of the Raft log.
 -type batch() :: #{
     ?tag := ?BATCH,
-    ?batch_messages := [emqx_types:message()]
+    ?batch_operations := [emqx_ds:operation()],
+    ?batch_preconditions => [emqx_ds:precondition()]
 }.
 
 -type generation_rank() :: {shard_id(), term()}.
@@ -240,16 +245,45 @@ drop_db(DB) ->
     _ = emqx_ds_proto_v4:drop_db(list_nodes(), DB),
     emqx_ds_replication_layer_meta:drop_db(DB).
 
--spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) ->
+-spec store_batch(emqx_ds:db(), emqx_ds:batch(), emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
-store_batch(DB, Messages, Opts) ->
+store_batch(DB, Batch = #dsbatch{preconditions = [_ | _]}, Opts) ->
+    %% NOTE: Atomic batch is implied, will not check with DB config.
+    store_batch_atomic(DB, Batch, Opts);
+store_batch(DB, Batch, Opts) ->
+    case emqx_ds_replication_layer_meta:db_config(DB) of
+        #{atomic_batches := true} ->
+            store_batch_atomic(DB, Batch, Opts);
+        #{} ->
+            store_batch_buffered(DB, Batch, Opts)
+    end.
+
+store_batch_buffered(DB, #dsbatch{operations = Operations}, Opts) ->
+    store_batch_buffered(DB, Operations, Opts);
+store_batch_buffered(DB, Batch, Opts) ->
     try
-        emqx_ds_buffer:store_batch(DB, Messages, Opts)
+        emqx_ds_buffer:store_batch(DB, Batch, Opts)
     catch
         error:{Reason, _Call} when Reason == timeout; Reason == noproc ->
             {error, recoverable, Reason}
     end.
+
+store_batch_atomic(DB, Batch, _Opts) ->
+    Shards = shards_of_batch(DB, Batch),
+    case Shards of
+        [Shard] ->
+            case ra_store_batch(DB, Shard, Batch) of
+                {timeout, ServerId} ->
+                    {error, recoverable, {timeout, ServerId}};
+                Result ->
+                    Result
+            end;
+        [] ->
+            ok;
+        [_ | _] ->
+            {error, unrecoverable, atomic_batch_spans_multiple_shards}
+    end.
 
 -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     [{emqx_ds:stream_rank(), stream()}].
 get_streams(DB, TopicFilter, StartTime) ->
@@ -392,17 +426,49 @@ flush_buffer(DB, Shard, Messages, State) ->
     end,
     {State, Result}.
 
--spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) ->
+-spec shard_of_operation(
+    emqx_ds:db(),
+    emqx_ds:operation() | emqx_ds:precondition(),
+    clientid | topic,
+    _Options
+) ->
     emqx_ds_replication_layer:shard_id().
-shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
+shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
+    case SerializeBy of
+        clientid -> Key = From;
+        topic -> Key = Topic
+    end,
+    shard_of_key(DB, Key);
+shard_of_operation(DB, {_OpName, Matcher}, SerializeBy, _Options) ->
+    #message_matcher{from = From, topic = Topic} = Matcher,
+    case SerializeBy of
+        clientid -> Key = From;
+        topic -> Key = Topic
+    end,
+    shard_of_key(DB, Key).
+
+shard_of_key(DB, Key) ->
     N = emqx_ds_replication_shard_allocator:n_shards(DB),
-    Hash =
-        case SerializeBy of
-            clientid -> erlang:phash2(From, N);
-            topic -> erlang:phash2(Topic, N)
-        end,
+    Hash = erlang:phash2(Key, N),
     integer_to_binary(Hash).
+
+shards_of_batch(DB, #dsbatch{operations = Operations, preconditions = Preconditions}) ->
+    shards_of_batch(DB, Preconditions, shards_of_batch(DB, Operations, []));
+shards_of_batch(DB, Operations) ->
+    shards_of_batch(DB, Operations, []).
+
+shards_of_batch(DB, [Operation | Rest], Acc) ->
+    case shard_of_operation(DB, Operation, clientid, #{}) of
+        Shard when Shard =:= hd(Acc) ->
+            shards_of_batch(DB, Rest, Acc);
+        Shard when Acc =:= [] ->
+            shards_of_batch(DB, Rest, [Shard]);
+        ShardAnother ->
+            [ShardAnother | Acc]
+    end;
+shards_of_batch(_DB, [], Acc) ->
+    Acc.
 
 %%================================================================================
 %% Internal exports (RPC targets)
 %%================================================================================
@@ -612,7 +678,7 @@ list_nodes() ->
 -define(SHARD_RPC(DB, SHARD, NODE, BODY),
     case
         emqx_ds_replication_layer_shard:servers(
-            DB, SHARD, application:get_env(emqx_durable_storage, reads, leader_preferred)
+            DB, SHARD, application:get_env(emqx_ds_builtin_raft, reads, leader_preferred)
         )
     of
         [{_, NODE} | _] ->
@@ -624,13 +690,22 @@ list_nodes() ->
     end
 ).
 
--spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()]) ->
-    ok | {timeout, _} | {error, recoverable | unrecoverable, _Err}.
-ra_store_batch(DB, Shard, Messages) ->
-    Command = #{
-        ?tag => ?BATCH,
-        ?batch_messages => Messages
-    },
+-spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:batch()) ->
+    ok | {timeout, _} | emqx_ds:error(_).
+ra_store_batch(DB, Shard, Batch) ->
+    case Batch of
+        #dsbatch{operations = Operations, preconditions = Preconditions} ->
+            Command = #{
+                ?tag => ?BATCH,
+                ?batch_operations => Operations,
+                ?batch_preconditions => Preconditions
+            };
+        Operations ->
+            Command = #{
+                ?tag => ?BATCH,
+                ?batch_operations => Operations
+            }
+    end,
     Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
     case emqx_ds_replication_layer_shard:process_command(Servers, Command, ?RA_TIMEOUT) of
         {ok, Result, _Leader} ->
@@ -767,6 +842,7 @@ ra_drop_shard(DB, Shard) ->
 
 -define(pd_ra_idx_need_release, '$emqx_ds_raft_idx_need_release').
 -define(pd_ra_bytes_need_release, '$emqx_ds_raft_bytes_need_release').
+-define(pd_ra_force_monotonic, '$emqx_ds_raft_force_monotonic').
 
 -spec init(_Args :: map()) -> ra_state().
 init(#{db := DB, shard := Shard}) ->
@@ -776,18 +852,30 @@ init(#{db := DB, shard := Shard}) ->
     {ra_state(), _Reply, _Effects}.
 apply(
     RaftMeta,
-    #{
+    Command = #{
         ?tag := ?BATCH,
-        ?batch_messages := MessagesIn
+        ?batch_operations := OperationsIn
     },
     #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0
 ) ->
-    ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}),
-    {Stats, Latest, Messages} = assign_timestamps(Latest0, MessagesIn),
-    Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{durable => false}),
-    State = State0#{latest := Latest},
-    set_ts(DBShard, Latest),
-    Effects = try_release_log(Stats, RaftMeta, State),
+    ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => OperationsIn, latest => Latest0}),
+    Preconditions = maps:get(?batch_preconditions, Command, []),
+    {Stats, Latest, Operations} = assign_timestamps(DB, Latest0, OperationsIn),
+    %% FIXME
+    case emqx_ds_precondition:verify(emqx_ds_storage_layer, DBShard, Preconditions) of
+        ok ->
+            Result = emqx_ds_storage_layer:store_batch(DBShard, Operations, #{durable => false}),
+            State = State0#{latest := Latest},
+            set_ts(DBShard, Latest),
+            Effects = try_release_log(Stats, RaftMeta, State);
+        PreconditionFailed = {precondition_failed, _} ->
+            Result = {error, unrecoverable, PreconditionFailed},
+            State = State0,
+            Effects = [];
+        Result ->
+            State = State0,
+            Effects = []
+    end,
     Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
     {State, Result, Effects};
 apply(
@@ -862,6 +950,21 @@ apply(
     Effects = handle_custom_event(DBShard, Latest, CustomEvent),
     {State#{latest => Latest}, ok, Effects}.
 
+assign_timestamps(DB, Latest, Messages) ->
+    ForceMonotonic = force_monotonic_timestamps(DB),
+    assign_timestamps(ForceMonotonic, Latest, Messages, [], 0, 0).
+
+force_monotonic_timestamps(DB) ->
+    case erlang:get(?pd_ra_force_monotonic) of
+        undefined ->
+            DBConfig = emqx_ds_replication_layer_meta:db_config(DB),
+            Flag = maps:get(force_monotonic_timestamps, DBConfig),
+            erlang:put(?pd_ra_force_monotonic, Flag);
+        Flag ->
+            ok
+    end,
+    Flag.
+
 try_release_log({_N, BatchSize}, RaftMeta = #{index := CurrentIdx}, State) ->
     %% NOTE
     %% Because cursor release means storage flush (see
@@ -924,10 +1027,7 @@ tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
     ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, timestamp => Timestamp}),
     handle_custom_event(DBShard, Timestamp, tick).
 
-assign_timestamps(Latest, Messages) ->
-    assign_timestamps(Latest, Messages, [], 0, 0).
-
-assign_timestamps(Latest0, [Message0 | Rest], Acc, N, Sz) ->
+assign_timestamps(true, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) ->
     case emqx_message:timestamp(Message0, microsecond) of
         TimestampUs when TimestampUs > Latest0 ->
             Latest = TimestampUs,
@@ -936,8 +1036,17 @@ assign_timestamps(Latest0, [Message0 | Rest], Acc, N, Sz) ->
             Latest = Latest0 + 1,
             Message = assign_timestamp(Latest, Message0)
     end,
-    assign_timestamps(Latest, Rest, [Message | Acc], N + 1, Sz + approx_message_size(Message0));
-assign_timestamps(Latest, [], Acc, N, Size) ->
+    MSize = approx_message_size(Message0),
+    assign_timestamps(true, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
+assign_timestamps(false, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) ->
+    TimestampUs = emqx_message:timestamp(Message0),
+    Latest = max(Latest0, TimestampUs),
+    Message = assign_timestamp(TimestampUs, Message0),
+    MSize = approx_message_size(Message0),
+    assign_timestamps(false, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
+assign_timestamps(ForceMonotonic, Latest, [Operation | Rest], Acc, N, Sz) ->
+    assign_timestamps(ForceMonotonic, Latest, Rest, [Operation | Acc], N + 1, Sz);
+assign_timestamps(_ForceMonotonic, Latest, [], Acc, N, Size) ->
     {{N, Size}, Latest, lists:reverse(Acc)}.
 
 assign_timestamp(TimestampUs, Message) ->

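Note: with preconditions verified inside the Ra state machine, callers now see a new unrecoverable error besides the usual recoverable ones. A sketch of result handling at a call site (hypothetical helper; requires emqx_utils/include/emqx_message.hrl for #message{}):

    handle_store_result(ok) ->
        ok;
    handle_store_result({error, unrecoverable, {precondition_failed, not_found}}) ->
        %% An `if_exists' precondition matched nothing.
        {skip, not_found};
    handle_store_result({error, unrecoverable, {precondition_failed, #message{} = Conflict}}) ->
        %% `unless_exists' found a message, or `if_exists' found a different
        %% one; the offending stored message is returned for inspection.
        {conflict, Conflict};
    handle_store_result({error, recoverable, Reason}) ->
        %% E.g. {timeout, ServerId} surfaced by store_batch_atomic/3 above.
        {retry, Reason}.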
==== changed file 4 of 13 ====

@@ -19,7 +19,8 @@
 -define(enc, 3).
 
 %% ?BATCH
--define(batch_messages, 2).
+-define(batch_operations, 2).
+-define(batch_preconditions, 4).
 
 -define(timestamp, 3).
 
 %% add_generation / update_config

==== changed file 5 of 13 ====

@@ -56,6 +56,7 @@
     topic/0,
     batch/0,
     operation/0,
+    deletion/0,
     precondition/0,
     stream/0,
     delete_stream/0,
@@ -110,7 +111,9 @@
     message()
     %% Delete a message.
     %% Does nothing if the message does not exist.
-    | {delete, message_matcher('_')}.
+    | deletion().
+
+-type deletion() :: {delete, message_matcher('_')}.
 
 %% Precondition.
 %% Fails whole batch if the storage already has the matching message (`if_exists'),

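Note: after this change an operation() is either a message() or a deletion(), and a precondition() wraps a message_matcher. A sketch putting the vocabulary in one place (illustration only):

    -include_lib("emqx_utils/include/emqx_message.hrl").

    batch_elements(Msg = #message{from = From, topic = Topic, timestamp = TS}) ->
        Matcher = #message_matcher{from = From, topic = Topic, timestamp = TS, payload = '_'},
        Insert = Msg,                     %% operation(): a message()
        Delete = {delete, Matcher},       %% operation(): a deletion()
        Guard = {unless_exists, Matcher}, %% precondition()
        {Insert, Delete, Guard}.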
==== changed file 6 of 13 ====

@@ -21,7 +21,7 @@
 -behaviour(gen_server).
 
 %% API:
--export([start_link/4, store_batch/3, shard_of_message/3]).
+-export([start_link/4, store_batch/3, shard_of_operation/3]).
 -export([ls/0]).
 
 %% behavior callbacks:
@@ -46,19 +46,18 @@
 -define(cbm(DB), {?MODULE, DB}).
 
 -record(enqueue_req, {
-    messages :: [emqx_types:message()],
+    operations :: [emqx_ds:operation()],
     sync :: boolean(),
-    atomic :: boolean(),
-    n_messages :: non_neg_integer(),
+    n_operations :: non_neg_integer(),
     payload_bytes :: non_neg_integer()
 }).
 
 -callback init_buffer(emqx_ds:db(), _Shard, _Options) -> {ok, _State}.
 
--callback flush_buffer(emqx_ds:db(), _Shard, [emqx_types:message()], State) ->
+-callback flush_buffer(emqx_ds:db(), _Shard, [emqx_ds:operation()], State) ->
     {State, ok | {error, recoverable | unrecoverable, _}}.
 
--callback shard_of_message(emqx_ds:db(), emqx_types:message(), topic | clientid, _Options) ->
+-callback shard_of_operation(emqx_ds:db(), emqx_ds:operation(), topic | clientid, _Options) ->
     _Shard.
 
 %%================================================================================
@@ -77,39 +76,33 @@ start_link(CallbackModule, CallbackOptions, DB, Shard) ->
         ?via(DB, Shard), ?MODULE, [CallbackModule, CallbackOptions, DB, Shard], []
     ).
 
--spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
+-spec store_batch(emqx_ds:db(), [emqx_ds:operation()], emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
-store_batch(DB, Messages, Opts) ->
+store_batch(DB, Operations, Opts) ->
     Sync = maps:get(sync, Opts, true),
-    Atomic = maps:get(atomic, Opts, false),
     %% Usually we expect all messages in the batch to go into the
     %% single shard, so this function is optimized for the happy case.
-    case shards_of_batch(DB, Messages) of
-        [{Shard, {NMsgs, NBytes}}] ->
+    case shards_of_batch(DB, Operations) of
+        [{Shard, {NOps, NBytes}}] ->
             %% Happy case:
             enqueue_call_or_cast(
                 ?via(DB, Shard),
                 #enqueue_req{
-                    messages = Messages,
+                    operations = Operations,
                     sync = Sync,
-                    atomic = Atomic,
-                    n_messages = NMsgs,
+                    n_operations = NOps,
                     payload_bytes = NBytes
                 }
             );
-        [_, _ | _] when Atomic ->
-            %% It's impossible to commit a batch to multiple shards
-            %% atomically
-            {error, unrecoverable, atomic_commit_to_multiple_shards};
         _Shards ->
            %% Use a slower implementation for the unlikely case:
-            repackage_messages(DB, Messages, Sync)
+            repackage_messages(DB, Operations, Sync)
    end.
 
--spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) -> _Shard.
-shard_of_message(DB, Message, ShardBy) ->
+-spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic) -> _Shard.
+shard_of_operation(DB, Operation, ShardBy) ->
     {CBM, Options} = persistent_term:get(?cbm(DB)),
-    CBM:shard_of_message(DB, Message, ShardBy, Options).
+    CBM:shard_of_operation(DB, Operation, ShardBy, Options).
 
 %%================================================================================
 %% behavior callbacks
@@ -129,7 +122,7 @@ shard_of_message(DB, Message, ShardBy) ->
     n = 0 :: non_neg_integer(),
     n_bytes = 0 :: non_neg_integer(),
     tref :: undefined | reference(),
-    queue :: queue:queue(emqx_types:message()),
+    queue :: queue:queue(emqx_ds:operation()),
     pending_replies = [] :: [gen_server:from()]
 }).
 
@@ -168,31 +161,29 @@ format_status(Status) ->
 handle_call(
     #enqueue_req{
-        messages = Msgs,
+        operations = Operations,
         sync = Sync,
-        atomic = Atomic,
-        n_messages = NMsgs,
+        n_operations = NOps,
         payload_bytes = NBytes
     },
     From,
     S0 = #s{pending_replies = Replies0}
 ) ->
     S = S0#s{pending_replies = [From | Replies0]},
-    {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)};
+    {noreply, enqueue(Sync, Operations, NOps, NBytes, S)};
 handle_call(_Call, _From, S) ->
     {reply, {error, unknown_call}, S}.
 
 handle_cast(
     #enqueue_req{
-        messages = Msgs,
+        operations = Operations,
         sync = Sync,
-        atomic = Atomic,
-        n_messages = NMsgs,
+        n_operations = NOps,
         payload_bytes = NBytes
     },
     S
 ) ->
-    {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)};
+    {noreply, enqueue(Sync, Operations, NOps, NBytes, S)};
 handle_cast(_Cast, S) ->
     {noreply, S}.
 
@@ -215,11 +206,10 @@ terminate(_Reason, #s{db = DB}) ->
 enqueue(
     Sync,
-    Atomic,
-    Msgs,
+    Ops,
     BatchSize,
     BatchBytes,
-    S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0}
+    S0 = #s{n = NOps0, n_bytes = NBytes0, queue = Q0}
 ) ->
     %% At this point we don't split the batches, even when they aren't
     %% atomic. It wouldn't win us anything in terms of memory, and
@@ -227,18 +217,18 @@ enqueue(
     %% granularity should be fine enough.
     NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
     NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity),
-    NMsgs = NMsgs0 + BatchSize,
+    NMsgs = NOps0 + BatchSize,
     NBytes = NBytes0 + BatchBytes,
-    case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NMsgs0 > 0) of
+    case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NOps0 > 0) of
         true ->
             %% Adding this batch would cause buffer to overflow. Flush
             %% it now, and retry:
             S1 = flush(S0),
-            enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1);
+            enqueue(Sync, Ops, BatchSize, BatchBytes, S1);
         false ->
             %% The buffer is empty, we enqueue the atomic batch in its
             %% entirety:
-            Q1 = lists:foldl(fun queue:in/2, Q0, Msgs),
+            Q1 = lists:foldl(fun queue:in/2, Q0, Ops),
             S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1},
             case NMsgs >= NMax orelse NBytes >= NBytesMax of
                 true ->
@@ -336,18 +326,18 @@ do_flush(
             }
     end.
 
--spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) ->
+-spec shards_of_batch(emqx_ds:db(), [emqx_ds:operation()]) ->
     [{_ShardId, {NMessages, NBytes}}]
 when
     NMessages :: non_neg_integer(),
     NBytes :: non_neg_integer().
-shards_of_batch(DB, Messages) ->
+shards_of_batch(DB, Batch) ->
     maps:to_list(
         lists:foldl(
-            fun(Message, Acc) ->
+            fun(Operation, Acc) ->
                 %% TODO: sharding strategy must be part of the DS DB schema:
-                Shard = shard_of_message(DB, Message, clientid),
-                Size = payload_size(Message),
+                Shard = shard_of_operation(DB, Operation, clientid),
+                Size = payload_size(Operation),
                 maps:update_with(
                     Shard,
                     fun({N, S}) ->
@@ -358,36 +348,35 @@ shards_of_batch(DB, Messages) ->
                 )
             end,
             #{},
-            Messages
+            Batch
        )
    ).
 
-repackage_messages(DB, Messages, Sync) ->
+repackage_messages(DB, Batch, Sync) ->
     Batches = lists:foldl(
-        fun(Message, Acc) ->
-            Shard = shard_of_message(DB, Message, clientid),
-            Size = payload_size(Message),
+        fun(Operation, Acc) ->
+            Shard = shard_of_operation(DB, Operation, clientid),
+            Size = payload_size(Operation),
             maps:update_with(
                 Shard,
                 fun({N, S, Msgs}) ->
-                    {N + 1, S + Size, [Message | Msgs]}
+                    {N + 1, S + Size, [Operation | Msgs]}
                 end,
-                {1, Size, [Message]},
+                {1, Size, [Operation]},
                 Acc
             )
         end,
         #{},
-        Messages
+        Batch
     ),
     maps:fold(
-        fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) ->
+        fun(Shard, {NOps, ByteSize, RevOperations}, ErrAcc) ->
             Err = enqueue_call_or_cast(
                 ?via(DB, Shard),
                 #enqueue_req{
-                    messages = lists:reverse(RevMessages),
+                    operations = lists:reverse(RevOperations),
                     sync = Sync,
-                    atomic = false,
-                    n_messages = NMsgs,
+                    n_operations = NOps,
                     payload_bytes = ByteSize
                 }
             ),
@@ -427,4 +416,6 @@ cancel_timer(S = #s{tref = TRef}) ->
 %% @doc Return approximate size of the MQTT message (it doesn't take
 %% all things into account, for example headers and extras)
 payload_size(#message{payload = P, topic = T}) ->
-    size(P) + size(T).
+    size(P) + size(T);
+payload_size({_OpName, _}) ->
+    0.

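Note: the buffer now counts operations rather than messages, and deletions weigh in at zero bytes (payload_size/1 above), so only the operation-count limit throttles delete-heavy batches. The flush thresholds remain the pre-existing app-env knobs; a sketch making them explicit (these values are the defaults read by enqueue/5):

    set_buffer_limits() ->
        ok = application:set_env(emqx_durable_storage, egress_batch_size, 1000),
        ok = application:set_env(emqx_durable_storage, egress_batch_bytes, infinity).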
==== changed file 7 of 13 ====

@@ -0,0 +1,184 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_precondition).
-include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
-export([verify/3]).
-export([matches/2]).
-export_type([matcher/0, mismatch/0]).
-type matcher() :: #message_matcher{}.
-type mismatch() :: emqx_types:message() | not_found.
-callback lookup_message(_Ctx, matcher()) ->
emqx_types:message() | not_found | emqx_ds:error(_).
%%
-spec verify(module(), _Ctx, [emqx_ds:precondition()]) ->
ok | {precondition_failed, mismatch()} | emqx_ds:error(_).
verify(Mod, Ctx, [_Precondition = {Cond, Msg} | Rest]) ->
case verify_precondition(Mod, Ctx, Cond, Msg) of
ok ->
verify(Mod, Ctx, Rest);
Failed ->
Failed
end;
verify(_Mod, _Ctx, []) ->
ok.
verify_precondition(Mod, Ctx, if_exists, Matcher) ->
case Mod:lookup_message(Ctx, Matcher) of
Msg = #message{} ->
verify_match(Msg, Matcher);
not_found ->
{precondition_failed, not_found};
Error = {error, _, _} ->
Error
end;
verify_precondition(Mod, Ctx, unless_exists, Matcher) ->
case Mod:lookup_message(Ctx, Matcher) of
Msg = #message{} ->
verify_nomatch(Msg, Matcher);
not_found ->
ok;
Error = {error, _, _} ->
Error
end.
verify_match(Msg, Matcher) ->
case matches(Msg, Matcher) of
true -> ok;
false -> {precondition_failed, Msg}
end.
verify_nomatch(Msg, Matcher) ->
case matches(Msg, Matcher) of
false -> ok;
true -> {precondition_failed, Msg}
end.
-spec matches(emqx_types:message(), matcher()) -> boolean().
matches(
Message,
#message_matcher{from = From, topic = Topic, payload = Pat, headers = Headers}
) ->
case Message of
#message{from = From, topic = Topic} when Pat =:= '_' ->
matches_headers(Message, Headers);
#message{from = From, topic = Topic, payload = Pat} ->
matches_headers(Message, Headers);
_ ->
false
end.
matches_headers(_Message, MatchHeaders) when map_size(MatchHeaders) =:= 0 ->
true;
matches_headers(#message{headers = Headers}, MatchHeaders) ->
maps:intersect(MatchHeaders, Headers) =:= MatchHeaders.
%% Basic tests
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
conjunction_test() ->
%% Contradictory preconditions, always false.
Preconditions = [
{if_exists, matcher(<<"c1">>, <<"t/1">>, 0, '_')},
{unless_exists, matcher(<<"c1">>, <<"t/1">>, 0, '_')}
],
?assertEqual(
{precondition_failed, not_found},
verify(?MODULE, [], Preconditions)
),
%% Check that the order does not matter.
?assertEqual(
{precondition_failed, not_found},
verify(?MODULE, [], lists:reverse(Preconditions))
),
?assertEqual(
{precondition_failed, message(<<"c1">>, <<"t/1">>, 0, <<>>)},
verify(
?MODULE,
[message(<<"c1">>, <<"t/1">>, 0, <<>>)],
Preconditions
)
).
matches_test() ->
?assert(
matches(
message(<<"mtest1">>, <<"t/same">>, 12345, <<?MODULE_STRING>>),
matcher(<<"mtest1">>, <<"t/same">>, 12345, '_')
)
).
matches_headers_test() ->
?assert(
matches(
message(<<"mtest2">>, <<"t/same">>, 23456, <<?MODULE_STRING>>, #{h1 => 42, h2 => <<>>}),
matcher(<<"mtest2">>, <<"t/same">>, 23456, '_', #{h2 => <<>>})
)
).
mismatches_headers_test() ->
?assertNot(
matches(
message(<<"mtest3">>, <<"t/same">>, 23456, <<?MODULE_STRING>>, #{h1 => 42, h2 => <<>>}),
matcher(<<"mtest3">>, <<"t/same">>, 23456, '_', #{h2 => <<>>, h3 => <<"required">>})
)
).
matcher(ClientID, Topic, TS, Payload) ->
matcher(ClientID, Topic, TS, Payload, #{}).
matcher(ClientID, Topic, TS, Payload, Headers) ->
#message_matcher{
from = ClientID,
topic = Topic,
timestamp = TS,
payload = Payload,
headers = Headers
}.
message(ClientID, Topic, TS, Payload) ->
message(ClientID, Topic, TS, Payload, #{}).
message(ClientID, Topic, TS, Payload, Headers) ->
#message{
id = <<>>,
qos = 0,
from = ClientID,
topic = Topic,
timestamp = TS,
payload = Payload,
headers = Headers
}.
lookup_message(Messages, Matcher) ->
case lists:search(fun(M) -> matches(M, Matcher) end, Messages) of
{value, Message} ->
Message;
false ->
not_found
end.
-endif.

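Note: emqx_ds_precondition is deliberately backend-agnostic: anything that can look up one message can verify preconditions. Under TEST the module itself is such a backend (its lookup_message/2 searches a plain list), which allows a storage-free sketch:

    %% Assumes the TEST exports above; Messages is just a list of #message{}.
    verify_in_memory(Messages, Preconditions) ->
        emqx_ds_precondition:verify(emqx_ds_precondition, Messages, Preconditions).

In production the callback module is emqx_ds_storage_layer and the context is the {DB, Shard} pair, as seen in the replication layer's apply/3 above.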
==== changed file 8 of 13 ====

@@ -37,6 +37,7 @@
     update_iterator/4,
     next/6,
     delete_next/7,
+    lookup_message/3,
 
     handle_event/4
 ]).
@@ -46,6 +47,7 @@
 -export_type([options/0]).
 
+-include("emqx_ds.hrl").
 -include("emqx_ds_metrics.hrl").
 -include_lib("emqx_utils/include/emqx_message.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
@@ -68,10 +70,13 @@
 -define(start_time, 3).
 -define(storage_key, 4).
 -define(last_seen_key, 5).
--define(cooked_payloads, 6).
+-define(cooked_msg_ops, 6).
 -define(cooked_lts_ops, 7).
 -define(cooked_ts, 8).
 
+%% atoms:
+-define(delete, 100).
+
 -type options() ::
     #{
         bits_per_wildcard_level => pos_integer(),
@@ -110,7 +115,7 @@
 -type cooked_batch() ::
     #{
-        ?cooked_payloads := [{binary(), binary()}],
+        ?cooked_msg_ops := [{binary(), binary() | ?delete}],
         ?cooked_lts_ops := [{binary(), binary()}],
         ?cooked_ts := integer()
     }.
@@ -271,24 +276,28 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) ->
 -spec prepare_batch(
     emqx_ds_storage_layer:shard_id(),
     s(),
-    [{emqx_ds:time(), emqx_types:message()}, ...],
+    emqx_ds_storage_layer:batch(),
     emqx_ds_storage_layer:batch_store_opts()
 ) ->
     {ok, cooked_batch()}.
-prepare_batch(_ShardId, S, Messages, _Options) ->
+prepare_batch(_ShardId, S, Batch, _Options) ->
     _ = erase(?lts_persist_ops),
-    {Payloads, MaxTs} =
+    {Operations, MaxTs} =
         lists:mapfoldl(
-            fun({Timestamp, Msg}, Acc) ->
-                {Key, _} = make_key(S, Timestamp, Msg),
-                Payload = {Key, message_to_value_v1(Msg)},
-                {Payload, max(Acc, Timestamp)}
+            fun
+                ({Timestamp, Msg = #message{topic = Topic}}, Acc) ->
+                    {Key, _} = make_key(S, Timestamp, Topic),
+                    Op = {Key, message_to_value_v1(Msg)},
+                    {Op, max(Acc, Timestamp)};
+                ({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}, Acc) ->
+                    {Key, _} = make_key(S, Timestamp, Topic),
+                    {_Op = {Key, ?delete}, Acc}
             end,
             0,
-            Messages
+            Batch
        ),
     {ok, #{
-        ?cooked_payloads => Payloads,
+        ?cooked_msg_ops => Operations,
         ?cooked_lts_ops => pop_lts_persist_ops(),
         ?cooked_ts => MaxTs
     }}.
@@ -302,7 +311,7 @@ prepare_batch(_ShardId, S, Messages, _Options) ->
 commit_batch(
     _ShardId,
     _Data,
-    #{?cooked_payloads := [], ?cooked_lts_ops := LTS},
+    #{?cooked_msg_ops := [], ?cooked_lts_ops := LTS},
     _Options
 ) ->
     %% Assert:
@@ -311,7 +320,7 @@ commit_batch(
 commit_batch(
     _ShardId,
     #s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars},
-    #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs},
+    #{?cooked_lts_ops := LtsOps, ?cooked_msg_ops := Operations, ?cooked_ts := MaxTs},
     Options
 ) ->
     {ok, Batch} = rocksdb:batch(),
@@ -326,10 +335,13 @@ commit_batch(
     _ = emqx_ds_lts:trie_update(Trie, LtsOps),
     %% Commit payloads:
     lists:foreach(
-        fun({Key, Val}) ->
-            ok = rocksdb:batch_put(Batch, DataCF, Key, term_to_binary(Val))
+        fun
+            ({Key, Val}) when is_tuple(Val) ->
+                ok = rocksdb:batch_put(Batch, DataCF, Key, term_to_binary(Val));
+            ({Key, ?delete}) ->
+                ok = rocksdb:batch_delete(Batch, DataCF, Key)
         end,
-        Payloads
+        Operations
     ),
     Result = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)),
     rocksdb:release_batch(Batch),
@@ -556,6 +568,23 @@ delete_next_until(
         rocksdb:iterator_close(ITHandle)
     end.
 
+-spec lookup_message(emqx_ds_storage_layer:shard_id(), s(), emqx_ds_precondition:matcher()) ->
+    emqx_types:message() | not_found | emqx_ds:error(_).
+lookup_message(
+    _ShardId,
+    S = #s{db = DB, data = CF},
+    #message_matcher{topic = Topic, timestamp = Timestamp}
+) ->
+    {Key, _} = make_key(S, Timestamp, Topic),
+    case rocksdb:get(DB, CF, Key, _ReadOpts = []) of
+        {ok, Blob} ->
+            deserialize(Blob);
+        not_found ->
+            not_found;
+        Error ->
+            {error, unrecoverable, {rocksdb, Error}}
+    end.
+
 handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
     %% If the last message was published more than one epoch ago, and
     %% the shard remains idle, we need to advance safety cutoff
@@ -811,9 +840,9 @@ format_key(KeyMapper, Key) ->
     Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)],
     lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])).
 
--spec make_key(s(), emqx_ds:time(), emqx_types:message()) -> {binary(), [binary()]}.
-make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, #message{topic = TopicBin}) ->
-    Tokens = emqx_topic:words(TopicBin),
+-spec make_key(s(), emqx_ds:time(), emqx_types:topic()) -> {binary(), [binary()]}.
+make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, Topic) ->
+    Tokens = emqx_topic:words(Topic),
     {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
     VaryingHashes = [hash_topic_level(I) || I <- Varying],
     KeyMapper = array:get(length(Varying), KeyMappers),

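Note: because the bitfield-LTS key is derived from (topic, timestamp) alone, and make_key/3 now takes the topic rather than the whole message, a delete needs no tombstone: it cooks down to a rocksdb batch_delete on exactly the key the original write used. A sketch of a cooked batch mixing both kinds of operation (illustrative values, valid only inside this module where the ?cooked_* and ?delete macros are defined):

    cooked_example(KeyPut, Value, KeyDel) ->
        #{
            ?cooked_msg_ops => [{KeyPut, Value}, {KeyDel, ?delete}],
            ?cooked_lts_ops => [],
            ?cooked_ts => 0
        }.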
==== changed file 9 of 13 ====

@@ -37,6 +37,9 @@
     update_iterator/4,
     next/4,
+
+    %% Preconditions
+    lookup_message/2,
 
     %% Generations
     update_config/3,
     add_generation/2,
@@ -61,6 +64,7 @@
 -export_type([
     gen_id/0,
     generation/0,
+    batch/0,
     cf_refs/0,
     stream/0,
     delete_stream/0,
@@ -74,6 +78,7 @@
     batch_store_opts/0
 ]).
 
+-include("emqx_ds.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
@@ -115,6 +120,11 @@
 -type gen_id() :: 0..16#ffff.
 
+-type batch() :: [
+    {emqx_ds:time(), emqx_types:message()}
+    | emqx_ds:deletion()
+].
+
 %% Options affecting how batches should be stored.
 %% See also: `emqx_ds:message_store_opts()'.
 -type batch_store_opts() ::
@@ -294,6 +304,10 @@
     | {ok, end_of_stream}
     | emqx_ds:error(_).
 
+%% Lookup a single message, for preconditions to work.
+-callback lookup_message(shard_id(), generation_data(), emqx_ds_precondition:matcher()) ->
+    emqx_types:message() | not_found | emqx_ds:error(_).
+
 -callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) ->
     [CustomEvent].
 
@@ -317,14 +331,10 @@ drop_shard(Shard) ->
 
 %% @doc This is a convenience wrapper that combines `prepare' and
 %% `commit' operations.
--spec store_batch(
-    shard_id(),
-    [{emqx_ds:time(), emqx_types:message()}],
-    batch_store_opts()
-) ->
+-spec store_batch(shard_id(), batch(), batch_store_opts()) ->
     emqx_ds:store_batch_result().
-store_batch(Shard, Messages, Options) ->
-    case prepare_batch(Shard, Messages, #{}) of
+store_batch(Shard, Batch, Options) ->
+    case prepare_batch(Shard, Batch, #{}) of
         {ok, CookedBatch} ->
             commit_batch(Shard, CookedBatch, Options);
         ignore ->
@@ -342,23 +352,21 @@ store_batch(Shard, Messages, Options) ->
 %%
 %% The underlying storage layout MAY use timestamp as a unique message
 %% ID.
--spec prepare_batch(
-    shard_id(),
-    [{emqx_ds:time(), emqx_types:message()}],
-    batch_prepare_opts()
-) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_).
-prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
+-spec prepare_batch(shard_id(), batch(), batch_prepare_opts()) ->
+    {ok, cooked_batch()} | ignore | emqx_ds:error(_).
+prepare_batch(Shard, Batch, Options) ->
     %% NOTE
     %% We assume that batches do not span generations. Callers should enforce this.
     ?tp(emqx_ds_storage_layer_prepare_batch, #{
-        shard => Shard, messages => Messages, options => Options
+        shard => Shard, batch => Batch, options => Options
     }),
     %% FIXME: always store messages in the current generation
-    case generation_at(Shard, Time) of
+    Time = batch_starts_at(Batch),
+    case is_integer(Time) andalso generation_at(Shard, Time) of
         {GenId, #{module := Mod, data := GenData}} ->
             T0 = erlang:monotonic_time(microsecond),
             Result =
-                case Mod:prepare_batch(Shard, GenData, Messages, Options) of
+                case Mod:prepare_batch(Shard, GenData, Batch, Options) of
                     {ok, CookedBatch} ->
                         {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}};
                     Error = {error, _, _} ->
@@ -368,11 +376,21 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
             %% TODO store->prepare
             emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
             Result;
+        false ->
+            %% No write operations in this batch.
+            ignore;
         not_found ->
+            %% Generation is likely already GCed.
             ignore
-    end;
-prepare_batch(_Shard, [], _Options) ->
-    ignore.
+    end.
+
+-spec batch_starts_at(batch()) -> emqx_ds:time() | undefined.
+batch_starts_at([{Time, _Message} | _]) when is_integer(Time) ->
+    Time;
+batch_starts_at([{delete, #message_matcher{timestamp = Time}} | _]) ->
+    Time;
+batch_starts_at([]) ->
+    undefined.
 
 %% @doc Commit cooked batch to the storage.
 %%
@@ -559,6 +577,16 @@ update_config(ShardId, Since, Options) ->
 add_generation(ShardId, Since) ->
     gen_server:call(?REF(ShardId), #call_add_generation{since = Since}, infinity).
 
+-spec lookup_message(shard_id(), emqx_ds_precondition:matcher()) ->
+    emqx_types:message() | not_found | emqx_ds:error(_).
+lookup_message(ShardId, Matcher = #message_matcher{timestamp = Time}) ->
+    case generation_at(ShardId, Time) of
+        {_GenId, #{module := Mod, data := GenData}} ->
+            Mod:lookup_message(ShardId, GenData, Matcher);
+        not_found ->
+            not_found
+    end.
+
 -spec list_generations_with_lifetimes(shard_id()) ->
     #{
         gen_id() => #{

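Note: generation routing for mixed batches hinges on batch_starts_at/1 above, and lookup_message/2 resolves the generation the same way, from the matcher's timestamp. A sketch of a conditional delete built directly on this layer (hypothetical helper; unlike preconditions, this check-then-act sequence is not atomic):

    delete_if_present(ShardId, Matcher = #message_matcher{}) ->
        case emqx_ds_storage_layer:lookup_message(ShardId, Matcher) of
            #message{} ->
                emqx_ds_storage_layer:store_batch(ShardId, [{delete, Matcher}], #{});
            not_found ->
                ok;
            Error = {error, _, _} ->
                Error
        end.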
==== changed file 10 of 13 ====

@ -21,6 +21,8 @@
%% used for testing. %% used for testing.
-module(emqx_ds_storage_reference). -module(emqx_ds_storage_reference).
-include("emqx_ds.hrl").
-behaviour(emqx_ds_storage_layer). -behaviour(emqx_ds_storage_layer).
%% API: %% API:
@ -39,7 +41,8 @@
make_delete_iterator/5, make_delete_iterator/5,
update_iterator/4, update_iterator/4,
next/6, next/6,
delete_next/7 delete_next/7,
lookup_message/3
]). ]).
%% internal exports: %% internal exports:
@ -49,6 +52,8 @@
-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl").
-define(DB_KEY(TIMESTAMP), <<TIMESTAMP:64>>).
%%================================================================================ %%================================================================================
%% Type declarations %% Type declarations
%%================================================================================ %%================================================================================
@ -102,23 +107,22 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
ok = rocksdb:drop_column_family(DBHandle, CFHandle), ok = rocksdb:drop_column_family(DBHandle, CFHandle),
ok. ok.
prepare_batch(_ShardId, _Data, Messages, _Options) -> prepare_batch(_ShardId, _Data, Batch, _Options) ->
{ok, Messages}. {ok, Batch}.
commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, Options) -> commit_batch(_ShardId, S = #s{db = DB}, Batch, Options) ->
{ok, Batch} = rocksdb:batch(), {ok, BatchHandle} = rocksdb:batch(),
lists:foreach( lists:foreach(fun(Op) -> process_batch_operation(S, Op, BatchHandle) end, Batch),
fun({TS, Msg}) -> Res = rocksdb:write_batch(DB, BatchHandle, write_batch_opts(Options)),
Key = <<TS:64>>, rocksdb:release_batch(BatchHandle),
Val = term_to_binary(Msg),
rocksdb:batch_put(Batch, CF, Key, Val)
end,
Messages
),
Res = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)),
rocksdb:release_batch(Batch),
Res. Res.
process_batch_operation(S, {TS, Msg = #message{}}, BatchHandle) ->
Val = encode_message(Msg),
rocksdb:batch_put(BatchHandle, S#s.cf, ?DB_KEY(TS), Val);
process_batch_operation(S, {delete, #message_matcher{timestamp = TS}}, BatchHandle) ->
rocksdb:batch_delete(BatchHandle, S#s.cf, ?DB_KEY(TS)).
get_streams(_Shard, _Data, _TopicFilter, _StartTime) -> get_streams(_Shard, _Data, _TopicFilter, _StartTime) ->
[#stream{}]. [#stream{}].
@ -205,6 +209,16 @@ delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now, IsCurr
{ok, It, NumDeleted, NumIterated} {ok, It, NumDeleted, NumIterated}
end. end.
lookup_message(_ShardId, #s{db = DB, cf = CF}, #message_matcher{timestamp = TS}) ->
case rocksdb:get(DB, CF, ?DB_KEY(TS), _ReadOpts = []) of
{ok, Val} ->
decode_message(Val);
not_found ->
not_found;
{error, Reason} ->
{error, unrecoverable, Reason}
end.
%%================================================================================ %%================================================================================
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================
@ -214,7 +228,7 @@ do_next(_, _, _, _, 0, Key, Acc) ->
do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) -> do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) ->
case rocksdb:iterator_move(IT, Action) of case rocksdb:iterator_move(IT, Action) of
{ok, Key = <<TS:64>>, Blob} -> {ok, Key = <<TS:64>>, Blob} ->
Msg = #message{topic = Topic} = binary_to_term(Blob), Msg = #message{topic = Topic} = decode_message(Blob),
TopicWords = emqx_topic:words(Topic), TopicWords = emqx_topic:words(Topic),
case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of
true -> true ->
@ -234,7 +248,7 @@ do_delete_next(
) -> ) ->
case rocksdb:iterator_move(IT, Action) of case rocksdb:iterator_move(IT, Action) of
{ok, Key, Blob} -> {ok, Key, Blob} ->
Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob), Msg = #message{topic = Topic, timestamp = TS} = decode_message(Blob),
TopicWords = emqx_topic:words(Topic), TopicWords = emqx_topic:words(Topic),
case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of
true -> true ->
@ -285,6 +299,12 @@ do_delete_next(
{Key0, {AccDel, AccIter}} {Key0, {AccDel, AccIter}}
end. end.
encode_message(Msg) ->
term_to_binary(Msg).
decode_message(Val) ->
binary_to_term(Val).
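Wrapping term_to_binary/binary_to_term behind encode_message/decode_message is a small but deliberate seam: the write path and both iterator paths now share one encoder pair, so a future format change touches a single place. For instance (hypothetical, not part of the commit), a versioned encoding could slot in behind the same names:

    %% Hypothetical versioned encoding behind the same seam:
    encode_message(Msg) ->
        <<1, (term_to_binary(Msg))/binary>>.

    decode_message(<<1, Bin/binary>>) ->
        binary_to_term(Bin).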
%% @doc Generate a column family ID for the MQTT messages %% @doc Generate a column family ID for the MQTT messages
-spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
data_cf(GenId) -> data_cf(GenId) ->


@ -33,7 +33,8 @@
make_delete_iterator/5, make_delete_iterator/5,
update_iterator/4, update_iterator/4,
next/6, next/6,
delete_next/7 delete_next/7,
lookup_message/3
]). ]).
%% internal exports: %% internal exports:
@ -43,6 +44,7 @@
-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("snabbkaffe/include/trace.hrl"). -include_lib("snabbkaffe/include/trace.hrl").
-include("emqx_ds.hrl").
-include("emqx_ds_metrics.hrl"). -include("emqx_ds_metrics.hrl").
-ifdef(TEST). -ifdef(TEST).
@ -56,11 +58,12 @@
%%================================================================================ %%================================================================================
%% TLOG entry %% TLOG entry
%% keys: %% Keys:
-define(cooked_payloads, 6). -define(cooked_msg_ops, 6).
-define(cooked_lts_ops, 7). -define(cooked_lts_ops, 7).
%% Payload: %% Payload:
-define(cooked_payload(TIMESTAMP, STATIC, VARYING, VALUE), -define(cooked_delete, 100).
-define(cooked_msg_op(TIMESTAMP, STATIC, VARYING, VALUE),
{TIMESTAMP, STATIC, VARYING, VALUE} {TIMESTAMP, STATIC, VARYING, VALUE}
). ).
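The VALUE slot of ?cooked_msg_op/4 is now overloaded: a binary means "put this serialized message", while the ?cooked_delete marker (100) means "tombstone this key". commit_batch/4 below discriminates on exactly that via a binary guard. A tiny model of the dispatch:

    %% Sketch of the discrimination commit_batch/4 performs
    %% (100 is the value of ?cooked_delete in this diff).
    op_kind({_Timestamp, _Static, _Varying, Val}) when is_binary(Val) -> put;
    op_kind({_Timestamp, _Static, _Varying, 100}) -> delete.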
@ -176,25 +179,39 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{data_cf = DataCF, trie_cf = TrieCF,
ok = rocksdb:drop_column_family(DBHandle, TrieCF), ok = rocksdb:drop_column_family(DBHandle, TrieCF),
ok. ok.
prepare_batch(_ShardId, S = #s{trie = Trie}, Messages, _Options) -> prepare_batch(
_ShardId,
S = #s{trie = Trie},
Operations,
_Options
) ->
_ = erase(?lts_persist_ops), _ = erase(?lts_persist_ops),
Payloads = [ OperationsCooked = emqx_utils:flattermap(
begin fun
Tokens = words(Topic), ({Timestamp, Msg = #message{topic = Topic}}) ->
{Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens), Tokens = words(Topic),
?cooked_payload(Timestamp, Static, Varying, serialize(S, Varying, Msg)) {Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
end ?cooked_msg_op(Timestamp, Static, Varying, serialize(S, Varying, Msg));
|| {Timestamp, Msg = #message{topic = Topic}} <- Messages ({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}) ->
], case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of
{ok, {Static, Varying}} ->
?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete);
undefined ->
%% Topic is unknown, nothing to delete.
[]
end
end,
Operations
),
{ok, #{ {ok, #{
?cooked_payloads => Payloads, ?cooked_msg_ops => OperationsCooked,
?cooked_lts_ops => pop_lts_persist_ops() ?cooked_lts_ops => pop_lts_persist_ops()
}}. }}.
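Returning [] from the callback is what lets a delete against a topic the LTS trie has never seen vanish from the cooked batch instead of failing. A rough model of the behaviour assumed of emqx_utils:flattermap/2 (an assumption; the real implementation lives in emqx_utils):

    %% Rough model: each element maps to zero or more results.
    flattermap_model(Fun, List) ->
        lists:append([wrap(Fun(X)) || X <- List]).

    wrap(Xs) when is_list(Xs) -> Xs;
    wrap(X) -> [X].

Under this model, flattermap_model(fun(_) -> [] end, [a, b]) =:= [].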
commit_batch( commit_batch(
_ShardId, _ShardId,
#s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie, hash_bytes = HashBytes}, #s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie, hash_bytes = HashBytes},
#{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads}, #{?cooked_lts_ops := LtsOps, ?cooked_msg_ops := Operations},
Options Options
) -> ) ->
{ok, Batch} = rocksdb:batch(), {ok, Batch} = rocksdb:batch(),
@ -210,12 +227,17 @@ commit_batch(
_ = emqx_ds_lts:trie_update(Trie, LtsOps), _ = emqx_ds_lts:trie_update(Trie, LtsOps),
%% Commit payloads: %% Commit messages and deletes:
lists:foreach( lists:foreach(
fun(?cooked_payload(Timestamp, Static, Varying, ValBlob)) -> fun
MasterKey = mk_key(Static, 0, <<>>, Timestamp), (?cooked_msg_op(Timestamp, Static, Varying, ValBlob = <<_/bytes>>)) ->
ok = rocksdb:batch_put(Batch, DataCF, MasterKey, ValBlob), MasterKey = mk_key(Static, 0, <<>>, Timestamp),
mk_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp) ok = rocksdb:batch_put(Batch, DataCF, MasterKey, ValBlob),
mk_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp);
(?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete)) ->
MasterKey = mk_key(Static, 0, <<>>, Timestamp),
ok = rocksdb:batch_delete(Batch, DataCF, MasterKey),
delete_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp)
end, end,
Payloads Operations
), ),
Result = rocksdb:write_batch(DB, Batch, [ Result = rocksdb:write_batch(DB, Batch, [
{disable_wal, not maps:get(durable, Options, true)} {disable_wal, not maps:get(durable, Options, true)}
@ -285,6 +307,28 @@ delete_next(Shard, S, It0, Selector, BatchSize, Now, IsCurrent) ->
Ret Ret
end. end.
lookup_message(
Shard,
S = #s{db = DB, data_cf = CF, trie = Trie},
#message_matcher{topic = Topic, timestamp = Timestamp}
) ->
case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of
{ok, {StaticIdx, _Varying}} ->
DSKey = mk_key(StaticIdx, 0, <<>>, Timestamp),
case rocksdb:get(DB, CF, DSKey, _ReadOpts = []) of
{ok, Val} ->
{ok, TopicStructure} = emqx_ds_lts:reverse_lookup(Trie, StaticIdx),
Msg = deserialize(S, Val),
enrich(Shard, S, TopicStructure, DSKey, Msg);
not_found ->
not_found;
{error, Reason} ->
{error, unrecoverable, Reason}
end;
undefined ->
not_found
end.
%%================================================================================ %%================================================================================
%% Internal exports %% Internal exports
%%================================================================================ %%================================================================================
@ -330,12 +374,18 @@ serialize(#s{serialization_schema = SSchema, with_guid = WithGuid}, Varying, Msg
}, },
emqx_ds_msg_serializer:serialize(SSchema, Msg). emqx_ds_msg_serializer:serialize(SSchema, Msg).
enrich(#ctx{shard = Shard, s = S, topic_structure = TopicStructure}, DSKey, Msg0) ->
enrich(Shard, S, TopicStructure, DSKey, Msg0).
enrich( enrich(
#ctx{shard = Shard, topic_structure = Structure, s = #s{with_guid = WithGuid}}, Shard,
#s{with_guid = WithGuid},
TopicStructure,
DSKey, DSKey,
Msg0 Msg0
) -> ) ->
Topic = emqx_topic:join(emqx_ds_lts:decompress_topic(Structure, words(Msg0#message.topic))), Tokens = words(Msg0#message.topic),
Topic = emqx_topic:join(emqx_ds_lts:decompress_topic(TopicStructure, Tokens)),
Msg0#message{ Msg0#message{
topic = Topic, topic = Topic,
id = id =
@ -584,6 +634,16 @@ mk_index(Batch, CF, HashBytes, Static, Timestamp, N, [TopicLevel | Varying]) ->
mk_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) -> mk_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) ->
ok. ok.
delete_index(Batch, CF, HashBytes, Static, Varying, Timestamp) ->
delete_index(Batch, CF, HashBytes, Static, Timestamp, 1, Varying).
delete_index(Batch, CF, HashBytes, Static, Timestamp, N, [TopicLevel | Varying]) ->
Key = mk_key(Static, N, hash(HashBytes, TopicLevel), Timestamp),
ok = rocksdb:batch_delete(Batch, CF, Key),
delete_index(Batch, CF, HashBytes, Static, Timestamp, N + 1, Varying);
delete_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) ->
ok.
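delete_index/6 walks the varying topic levels exactly as mk_index/6 does, so every index row written on put gets a matching batch_delete on delete. A sketch of the full key set one message occupies under this layout (an assumption pieced together from the mk_key/4 usage in this diff):

    %% Sketch: master row plus one index row per varying topic level.
    keys_for(StaticIdx, Varying, Timestamp, HashBytes) ->
        Master = mk_key(StaticIdx, 0, <<>>, Timestamp),
        Index = [
            mk_key(StaticIdx, N, hash(HashBytes, Level), Timestamp)
         || {N, Level} <- lists:zip(lists:seq(1, length(Varying)), Varying)
        ],
        [Master | Index].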
%%%%%%%% Keys %%%%%%%%%% %%%%%%%% Keys %%%%%%%%%%
get_key_range(StaticIdx, WildcardIdx, Hash) -> get_key_range(StaticIdx, WildcardIdx, Hash) ->


@ -18,11 +18,14 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include("emqx_ds.hrl").
-include("../../emqx/include/emqx.hrl"). -include("../../emqx/include/emqx.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("stdlib/include/assert.hrl"). -include_lib("stdlib/include/assert.hrl").
-define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))).
-define(FUTURE, (1 bsl 64 - 1)). -define(FUTURE, (1 bsl 64 - 1)).
-define(SHARD, shard(?FUNCTION_NAME)). -define(SHARD, shard(?FUNCTION_NAME)).
@ -66,6 +69,30 @@ t_store(_Config) ->
}, },
?assertMatch(ok, emqx_ds:store_batch(?FUNCTION_NAME, [Msg])). ?assertMatch(ok, emqx_ds:store_batch(?FUNCTION_NAME, [Msg])).
%% Smoke test of applying batch operations
t_operations(db_config, _Config) ->
#{force_monotonic_timestamps => false}.
t_operations(_Config) ->
Batch1 = [
make_message(100, <<"t/1">>, <<"M1">>),
make_message(200, <<"t/2">>, <<"M2">>),
make_message(300, <<"t/3">>, <<"M3">>)
],
Batch2 = [
make_deletion(200, <<"t/2">>, <<"M2">>),
make_deletion(300, <<"t/3">>, '_'),
make_deletion(400, <<"t/4">>, '_')
],
?assertEqual(ok, emqx_ds:store_batch(?FUNCTION_NAME, Batch1)),
?assertEqual(ok, emqx_ds:store_batch(?FUNCTION_NAME, Batch2)),
?assertMatch(
[
#message{timestamp = 100, topic = <<"t/1">>, payload = <<"M1">>}
],
dump_messages(?SHARD, <<"t/#">>, 0)
).
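Worth noting: the matcher payload <<"M2">> must match exactly, '_' matches any payload, and the delete at timestamp 400 targets a message that was never stored, yet the batch still returns ok, so deletions are effectively idempotent. A follow-up assertion one could add (hypothetical, not in the commit):

    %% Hypothetical extra check: deleting a missing message is a no-op.
    ?assertEqual(ok, emqx_ds:store_batch(?FUNCTION_NAME, [
        make_deletion(500, <<"t/none">>, '_')
    ])),
    ?assertMatch(
        [#message{timestamp = 100}],
        dump_messages(?SHARD, <<"t/#">>, 0)
    ).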
%% Smoke test for iteration through a concrete topic %% Smoke test for iteration through a concrete topic
t_iterate(_Config) -> t_iterate(_Config) ->
%% Prepare data: %% Prepare data:
@ -124,8 +151,6 @@ t_delete(_Config) ->
?assertNot(is_map_key(TopicToDelete, MessagesByTopic), #{msgs => MessagesByTopic}), ?assertNot(is_map_key(TopicToDelete, MessagesByTopic), #{msgs => MessagesByTopic}),
?assertEqual(20, length(Messages)). ?assertEqual(20, length(Messages)).
-define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))).
%% Smoke test that verifies that concrete topics are mapped to %% Smoke test that verifies that concrete topics are mapped to
%% individual streams, unless there's too many of them. %% individual streams, unless there's too many of them.
t_get_streams(Config) -> t_get_streams(Config) ->
@ -417,79 +442,26 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) ->
%% || Topic <- Topics, PublishedAt <- Timestamps %% || Topic <- Topics, PublishedAt <- Timestamps
%% ]. %% ].
%% t_iterate_multigen(_Config) ->
%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG),
%% Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
%% Timestamps = lists:seq(1, 100),
%% _ = [
%% store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
%% || Topic <- Topics, PublishedAt <- Timestamps
%% ],
%% ?assertEqual(
%% lists:sort([
%% {Topic, PublishedAt}
%% || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
%% ]),
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
%% ),
%% ?assertEqual(
%% lists:sort([
%% {Topic, PublishedAt}
%% || Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100)
%% ]),
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)])
%% ).
%% t_iterate_multigen_preserve_restore(_Config) ->
%% ReplayID = atom_to_binary(?FUNCTION_NAME),
%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG),
%% Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
%% Timestamps = lists:seq(1, 100),
%% TopicFilter = "foo/#",
%% TopicsMatching = ["foo/bar", "foo/bar/baz"],
%% _ = [
%% store(?SHARD, TS, Topic, term_to_binary({Topic, TS}))
%% || Topic <- Topics, TS <- Timestamps
%% ],
%% It0 = iterator(?SHARD, TopicFilter, 0),
%% {It1, Res10} = iterate(It0, 10),
%% % preserve mid-generation
%% ok = emqx_ds_storage_layer:preserve_iterator(It1, ReplayID),
%% {ok, It2} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
%% {It3, Res100} = iterate(It2, 88),
%% % preserve on the generation boundary
%% ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID),
%% {ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
%% {It5, Res200} = iterate(It4, 1000),
%% ?assertEqual({end_of_stream, []}, iterate(It5, 1)),
%% ?assertEqual(
%% lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]),
%% lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200])
%% ),
%% ?assertEqual(
%% ok,
%% emqx_ds_storage_layer:discard_iterator(?SHARD, ReplayID)
%% ),
%% ?assertEqual(
%% {error, not_found},
%% emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID)
%% ).
make_message(PublishedAt, Topic, Payload) when is_list(Topic) -> make_message(PublishedAt, Topic, Payload) when is_list(Topic) ->
make_message(PublishedAt, list_to_binary(Topic), Payload); make_message(PublishedAt, list_to_binary(Topic), Payload);
make_message(PublishedAt, Topic, Payload) when is_binary(Topic) -> make_message(PublishedAt, Topic, Payload) when is_binary(Topic) ->
ID = emqx_guid:gen(), ID = emqx_guid:gen(),
#message{ #message{
id = ID, id = ID,
from = <<?MODULE_STRING>>,
topic = Topic, topic = Topic,
timestamp = PublishedAt, timestamp = PublishedAt,
payload = Payload payload = Payload
}. }.
make_deletion(Timestamp, Topic, Payload) ->
{delete, #message_matcher{
from = <<?MODULE_STRING>>,
topic = Topic,
timestamp = Timestamp,
payload = Payload
}}.
make_topic(Tokens = [_ | _]) -> make_topic(Tokens = [_ | _]) ->
emqx_topic:join([bin(T) || T <- Tokens]). emqx_topic:join([bin(T) || T <- Tokens]).
@ -535,13 +507,23 @@ end_per_suite(Config) ->
ok. ok.
init_per_testcase(TC, Config) -> init_per_testcase(TC, Config) ->
ok = emqx_ds:open_db(TC, ?DB_CONFIG(Config)), ok = emqx_ds:open_db(TC, db_config(TC, Config)),
Config. Config.
end_per_testcase(TC, _Config) -> end_per_testcase(TC, _Config) ->
emqx_ds:drop_db(TC), emqx_ds:drop_db(TC),
ok. ok.
db_config(TC, Config) ->
ConfigBase = ?DB_CONFIG(Config),
SpecificConfig =
try
?MODULE:TC(?FUNCTION_NAME, Config)
catch
error:undef -> #{}
end,
maps:merge(ConfigBase, SpecificConfig).
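This helper lets any testcase override DB options by exporting a TC(db_config, Config) clause, as t_operations does above to disable monotonic timestamps; testcases without such a clause fall back to ?DB_CONFIG via the error:undef catch. A hypothetical testcase using the hook:

    %% Hypothetical example of a per-testcase override:
    t_example(db_config, _Config) ->
        #{atomic_batches => true};
    t_example(_Config) ->
        ?assertMatch(ok, emqx_ds:store_batch(?FUNCTION_NAME, [])).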
shard(TC) -> shard(TC) ->
{TC, <<"0">>}. {TC, <<"0">>}.


@ -377,7 +377,7 @@ nodes_of_clientid(DB, ClientId, Nodes = [N0 | _]) ->
shard_of_clientid(DB, Node, ClientId) -> shard_of_clientid(DB, Node, ClientId) ->
?ON( ?ON(
Node, Node,
emqx_ds_buffer:shard_of_message(DB, #message{from = ClientId}, clientid) emqx_ds_buffer:shard_of_operation(DB, #message{from = ClientId}, clientid)
). ).
%% Consume eagerly: %% Consume eagerly: