Merge pull request #13578 from thalesmg/20240806-r58-port-raft-precond

feat(dsraft): support atomic batches + preconditions (release-58)

Commit: cf608a73a5
@@ -19,6 +19,7 @@
 -compile(nowarn_export_all).
 
 -include("../../emqx/include/emqx.hrl").
+-include("../../emqx_durable_storage/include/emqx_ds.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("stdlib/include/assert.hrl").
 -include("../../emqx/include/asserts.hrl").
@@ -145,7 +146,7 @@ t_06_smoke_add_generation(Config) ->
     ?assertMatch(ok, emqx_ds:add_generation(DB)),
     [
         {Gen1, #{created_at := Created1, since := Since1, until := Until1}},
-        {Gen2, #{created_at := Created2, since := Since2, until := undefined}}
+        {_Gen2, #{created_at := Created2, since := Since2, until := undefined}}
     ] = maps:to_list(emqx_ds:list_generations_with_lifetimes(DB)),
     %% Check units of the return values (+/- 10s from test begin time):
     ?give_or_take(BeginTime, 10_000, Created1),

@@ -234,8 +235,8 @@ t_09_atomic_store_batch(Config) ->
     DB = ?FUNCTION_NAME,
     ?check_trace(
         begin
-            application:set_env(emqx_durable_storage, egress_batch_size, 1),
-            ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
+            DBOpts = (opts(Config))#{atomic_batches => true},
+            ?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)),
             Msgs = [
                 message(<<"1">>, <<"1">>, 0),
                 message(<<"2">>, <<"2">>, 1),
@@ -243,13 +244,8 @@ t_09_atomic_store_batch(Config) ->
             ],
             ?assertEqual(
                 ok,
-                emqx_ds:store_batch(DB, Msgs, #{
-                    atomic => true,
-                    sync => true
-                })
-            ),
-            {ok, Flush} = ?block_until(#{?snk_kind := emqx_ds_buffer_flush}),
-            ?assertMatch(#{batch := [_, _, _]}, Flush)
+                emqx_ds:store_batch(DB, Msgs, #{sync => true})
+            )
         end,
         []
     ),
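Note on the change above: once a DB is opened with `atomic_batches => true`, a plain list of messages passed to `emqx_ds:store_batch/3` is committed as one atomic batch, which is why the per-call `atomic => true` flag is dropped here. A minimal sketch of the new call shape, reusing the suite's `opts/1` helper (the DB name is arbitrary):

    ok = emqx_ds:open_db(my_db, (opts(Config))#{atomic_batches => true}),
    ok = emqx_ds:store_batch(my_db, Msgs, #{sync => true})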
@@ -289,6 +285,124 @@ t_10_non_atomic_store_batch(Config) ->
    ),
    ok.

t_11_batch_preconditions(Config) ->
    DB = ?FUNCTION_NAME,
    ?check_trace(
        begin
            DBOpts = (opts(Config))#{
                atomic_batches => true,
                force_monotonic_timestamps => false
            },
            ?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)),

            %% Conditional delete
            TS = 42,
            Batch1 = #dsbatch{
                preconditions = [{if_exists, matcher(<<"c1">>, <<"t/a">>, '_', TS)}],
                operations = [{delete, matcher(<<"c1">>, <<"t/a">>, '_', TS)}]
            },
            %% Conditional insert
            M1 = message(<<"c1">>, <<"t/a">>, <<"M1">>, TS),
            Batch2 = #dsbatch{
                preconditions = [{unless_exists, matcher(<<"c1">>, <<"t/a">>, '_', TS)}],
                operations = [M1]
            },

            %% No such message yet, precondition fails:
            ?assertEqual(
                {error, unrecoverable, {precondition_failed, not_found}},
                emqx_ds:store_batch(DB, Batch1)
            ),
            %% No such message yet, `unless` precondition holds:
            ?assertEqual(
                ok,
                emqx_ds:store_batch(DB, Batch2)
            ),
            %% Now that such a message exists, the `unless` precondition fails:
            ?assertEqual(
                {error, unrecoverable, {precondition_failed, M1}},
                emqx_ds:store_batch(DB, Batch2)
            ),
            %% On the other hand, the `if` precondition now holds:
            ?assertEqual(
                ok,
                emqx_ds:store_batch(DB, Batch1)
            ),

            %% Wait at least until the current epoch ends.
            ct:sleep(1000),
            %% There are no messages in the DB.
            ?assertEqual(
                [],
                emqx_ds_test_helpers:consume(DB, emqx_topic:words(<<"t/#">>))
            )
        end,
        []
    ).
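Outside the test suite, the same records give a compact insert-unless-present primitive. A minimal sketch, assuming a DB opened with `atomic_batches => true`; the helper name and the way the message is constructed are illustrative, not part of this change:

    %% Store Payload under Topic for ClientId unless a message already exists
    %% for that client/topic/timestamp.
    put_unless_exists(DB, ClientId, Topic, Payload, TS) ->
        Msg = #message{
            id = emqx_guid:gen(),
            from = ClientId,
            topic = Topic,
            payload = Payload,
            timestamp = TS
        },
        Matcher = #message_matcher{from = ClientId, topic = Topic, timestamp = TS, payload = '_'},
        Batch = #dsbatch{preconditions = [{unless_exists, Matcher}], operations = [Msg]},
        case emqx_ds:store_batch(DB, Batch) of
            ok ->
                inserted;
            {error, unrecoverable, {precondition_failed, Existing}} ->
                {already_exists, Existing}
        end.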
t_12_batch_precondition_conflicts(Config) ->
    DB = ?FUNCTION_NAME,
    NBatches = 50,
    NMessages = 10,
    ?check_trace(
        begin
            DBOpts = (opts(Config))#{
                atomic_batches => true,
                force_monotonic_timestamps => false
            },
            ?assertMatch(ok, emqx_ds:open_db(DB, DBOpts)),

            ConflictBatches = [
                #dsbatch{
                    %% If the slot is free...
                    preconditions = [{if_exists, matcher(<<"c1">>, <<"t/slot">>, _Free = <<>>, 0)}],
                    %% Take it and write NMessages extra messages, so that batches take longer to
                    %% process and have higher chances to conflict with each other.
                    operations =
                        [
                            message(<<"c1">>, <<"t/slot">>, integer_to_binary(I), _TS = 0)
                            | [
                                message(<<"c1">>, {"t/owner/~p/~p", [I, J]}, <<>>, I * 100 + J)
                             || J <- lists:seq(1, NMessages)
                            ]
                        ]
                }
             || I <- lists:seq(1, NBatches)
            ],

            %% Run those batches concurrently.
            ok = emqx_ds:store_batch(DB, [message(<<"c1">>, <<"t/slot">>, <<>>, 0)]),
            Results = emqx_utils:pmap(
                fun(B) -> emqx_ds:store_batch(DB, B) end,
                ConflictBatches,
                infinity
            ),

            %% Only one should have succeeded.
            ?assertEqual([ok], [Ok || Ok = ok <- Results]),

            %% While the others failed with an identical `precondition_failed`.
            Failures = lists:usort([PreconditionFailed || {error, _, PreconditionFailed} <- Results]),
            ?assertMatch(
                [{precondition_failed, #message{topic = <<"t/slot">>, payload = <<_/bytes>>}}],
                Failures
            ),

            %% Wait at least until the current epoch ends.
            ct:sleep(1000),
            %% Storage should contain only a single batch's messages.
            [{precondition_failed, #message{payload = IOwner}}] = Failures,
            WinnerBatch = lists:nth(binary_to_integer(IOwner), ConflictBatches),
            BatchMessages = lists:sort(WinnerBatch#dsbatch.operations),
            DBMessages = emqx_ds_test_helpers:consume(DB, emqx_topic:words(<<"t/#">>)),
            ?assertEqual(
                BatchMessages,
                DBMessages
            )
        end,
        []
    ).
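Callers racing for a slot like this see exactly one `ok`; everyone else gets `{error, unrecoverable, {precondition_failed, _}}`. Whether to retry is application-specific; a small illustrative wrapper (the batch is rebuilt from fresh state on every attempt):

    claim_slot(_DB, _BuildBatch, 0) ->
        {error, too_many_conflicts};
    claim_slot(DB, BuildBatch, Retries) ->
        case emqx_ds:store_batch(DB, BuildBatch()) of
            ok ->
                ok;
            {error, unrecoverable, {precondition_failed, _Existing}} ->
                %% Lost the race: rebuild the batch from current state and retry.
                claim_slot(DB, BuildBatch, Retries - 1);
            {error, recoverable, _} = Error ->
                Error
        end.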
t_smoke_delete_next(Config) ->
    DB = ?FUNCTION_NAME,
    ?check_trace(
@@ -534,12 +648,25 @@ message(ClientId, Topic, Payload, PublishedAt) ->
 
 message(Topic, Payload, PublishedAt) ->
     #message{
-        topic = Topic,
-        payload = Payload,
+        topic = try_format(Topic),
+        payload = try_format(Payload),
         timestamp = PublishedAt,
         id = emqx_guid:gen()
     }.
 
+matcher(ClientID, Topic, Payload, Timestamp) ->
+    #message_matcher{
+        from = ClientID,
+        topic = try_format(Topic),
+        timestamp = Timestamp,
+        payload = Payload
+    }.
+
+try_format({Fmt, Args}) ->
+    emqx_utils:format(Fmt, Args);
+try_format(String) ->
+    String.
+
 delete(DB, It, Selector, BatchSize) ->
     delete(DB, It, Selector, BatchSize, 0).

@@ -562,9 +689,18 @@ all() ->
 
 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
+    %% TODO: Remove once builtin-local supports preconditions + atomic batches.
+    BuiltinLocalTCs =
+        TCs --
+            [
+                t_09_atomic_store_batch,
+                t_11_batch_preconditions,
+                t_12_batch_precondition_conflicts
+            ],
+    BuiltinRaftTCs = TCs,
     [
-        {builtin_local, TCs},
-        {builtin_raft, TCs}
+        {builtin_local, BuiltinLocalTCs},
+        {builtin_raft, BuiltinRaftTCs}
     ].
 
 init_per_group(builtin_local, Config) ->

@ -43,7 +43,7 @@
|
|||
%% `emqx_ds_buffer':
|
||||
init_buffer/3,
|
||||
flush_buffer/4,
|
||||
shard_of_message/4
|
||||
shard_of_operation/4
|
||||
]).
|
||||
|
||||
%% Internal exports:
|
||||
|
@ -55,6 +55,7 @@
|
|||
-export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).
|
||||
|
||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||
-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
|
||||
|
||||
%%================================================================================
|
||||
%% Type declarations
|
||||
|
@ -230,9 +231,9 @@ flush_buffer(DB, Shard, Messages, S0 = #bs{options = Options}) ->
|
|||
make_batch(_ForceMonotonic = true, Latest, Messages) ->
|
||||
assign_monotonic_timestamps(Latest, Messages, []);
|
||||
make_batch(false, Latest, Messages) ->
|
||||
assign_message_timestamps(Latest, Messages, []).
|
||||
assign_operation_timestamps(Latest, Messages, []).
|
||||
|
||||
assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) ->
|
||||
assign_monotonic_timestamps(Latest0, [Message = #message{} | Rest], Acc0) ->
|
||||
case emqx_message:timestamp(Message, microsecond) of
|
||||
TimestampUs when TimestampUs > Latest0 ->
|
||||
Latest = TimestampUs;
|
||||
|
@ -241,28 +242,43 @@ assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) ->
|
|||
end,
|
||||
Acc = [assign_timestamp(Latest, Message) | Acc0],
|
||||
assign_monotonic_timestamps(Latest, Rest, Acc);
|
||||
assign_monotonic_timestamps(Latest, [Operation | Rest], Acc0) ->
|
||||
Acc = [Operation | Acc0],
|
||||
assign_monotonic_timestamps(Latest, Rest, Acc);
|
||||
assign_monotonic_timestamps(Latest, [], Acc) ->
|
||||
{Latest, lists:reverse(Acc)}.
|
||||
|
||||
assign_message_timestamps(Latest0, [Message | Rest], Acc0) ->
|
||||
TimestampUs = emqx_message:timestamp(Message, microsecond),
|
||||
assign_operation_timestamps(Latest0, [Message = #message{} | Rest], Acc0) ->
|
||||
TimestampUs = emqx_message:timestamp(Message),
|
||||
Latest = max(TimestampUs, Latest0),
|
||||
Acc = [assign_timestamp(TimestampUs, Message) | Acc0],
|
||||
assign_message_timestamps(Latest, Rest, Acc);
|
||||
assign_message_timestamps(Latest, [], Acc) ->
|
||||
assign_operation_timestamps(Latest, Rest, Acc);
|
||||
assign_operation_timestamps(Latest, [Operation | Rest], Acc0) ->
|
||||
Acc = [Operation | Acc0],
|
||||
assign_operation_timestamps(Latest, Rest, Acc);
|
||||
assign_operation_timestamps(Latest, [], Acc) ->
|
||||
{Latest, lists:reverse(Acc)}.
|
||||
|
||||
assign_timestamp(TimestampUs, Message) ->
|
||||
{TimestampUs, Message}.
|
||||
|
||||
-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) -> shard().
|
||||
shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
|
||||
-spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic, _Options) -> shard().
|
||||
shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
|
||||
case SerializeBy of
|
||||
clientid -> Key = From;
|
||||
topic -> Key = Topic
|
||||
end,
|
||||
shard_of_key(DB, Key);
|
||||
shard_of_operation(DB, {_, #message_matcher{from = From, topic = Topic}}, SerializeBy, _Options) ->
|
||||
case SerializeBy of
|
||||
clientid -> Key = From;
|
||||
topic -> Key = Topic
|
||||
end,
|
||||
shard_of_key(DB, Key).
|
||||
|
||||
shard_of_key(DB, Key) ->
|
||||
N = emqx_ds_builtin_local_meta:n_shards(DB),
|
||||
Hash =
|
||||
case SerializeBy of
|
||||
clientid -> erlang:phash2(From, N);
|
||||
topic -> erlang:phash2(Topic, N)
|
||||
end,
|
||||
Hash = erlang:phash2(Key, N),
|
||||
integer_to_binary(Hash).
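A property the unified `shard_of_key/2` preserves: a message and a deletion matcher carrying the same client id hash to the same shard, which is what lets a conditional delete reach the message it targets. A sketch, assuming this module is `emqx_ds_builtin_local` (as the `emqx_ds_builtin_local_meta` call suggests) and that `Msg#message.from` equals `Matcher#message_matcher.from`:

    %% Both calls hash the same key (the client id), so they return the same shard:
    Shard = emqx_ds_builtin_local:shard_of_operation(DB, Msg, clientid, #{}),
    Shard = emqx_ds_builtin_local:shard_of_operation(DB, {delete, Matcher}, clientid, #{})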
|
||||
|
||||
-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
||||
|
@ -288,7 +304,7 @@ get_streams(DB, TopicFilter, StartTime) ->
|
|||
-spec make_iterator(
|
||||
emqx_ds:db(), emqx_ds:ds_specific_stream(), emqx_ds:topic_filter(), emqx_ds:time()
|
||||
) ->
|
||||
emqx_ds:make_iterator_result(emqx_ds:ds_specific_iterator()).
|
||||
emqx_ds:make_iterator_result(iterator()).
|
||||
make_iterator(DB, ?stream(Shard, InnerStream), TopicFilter, StartTime) ->
|
||||
ShardId = {DB, Shard},
|
||||
case
|
||||
|
@ -302,7 +318,7 @@ make_iterator(DB, ?stream(Shard, InnerStream), TopicFilter, StartTime) ->
|
|||
Error
|
||||
end.
|
||||
|
||||
-spec update_iterator(emqx_ds:db(), emqx_ds:ds_specific_iterator(), emqx_ds:message_key()) ->
|
||||
-spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) ->
|
||||
emqx_ds:make_iterator_result(iterator()).
|
||||
update_iterator(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, Key) ->
|
||||
case emqx_ds_storage_layer:update_iterator({DB, Shard}, StorageIter0, Key) of
|
||||
|
@ -380,7 +396,7 @@ do_next(DB, Iter0 = #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0}, N) ->
|
|||
end.
|
||||
|
||||
-spec do_delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
|
||||
emqx_ds:delete_next_result(emqx_ds:delete_iterator()).
|
||||
emqx_ds:delete_next_result(delete_iterator()).
|
||||
do_delete_next(
|
||||
DB, Iter = #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0}, Selector, N
|
||||
) ->
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
|
||||
current_timestamp/2,
|
||||
|
||||
shard_of_message/4,
|
||||
shard_of_operation/4,
|
||||
flush_buffer/4,
|
||||
init_buffer/3
|
||||
]).
|
||||
|
@ -83,6 +83,7 @@
|
|||
ra_state/0
|
||||
]).
|
||||
|
||||
-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
|
||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||
-include_lib("snabbkaffe/include/trace.hrl").
|
||||
-include("emqx_ds_replication_layer.hrl").
|
||||
|
@ -100,7 +101,10 @@
|
|||
n_shards => pos_integer(),
|
||||
n_sites => pos_integer(),
|
||||
replication_factor => pos_integer(),
|
||||
replication_options => _TODO :: #{}
|
||||
replication_options => _TODO :: #{},
|
||||
%% Inherited from `emqx_ds:generic_db_opts()`.
|
||||
force_monotonic_timestamps => boolean(),
|
||||
atomic_batches => boolean()
|
||||
}.
|
||||
|
||||
%% This encapsulates the stream entity from the replication level.
|
||||
|
@@ -135,11 +139,12 @@
     ?enc := emqx_ds_storage_layer:delete_iterator()
 }.
 
-%% TODO: this type is obsolete and is kept only for compatibility with
-%% BPAPIs. Remove it when emqx_ds_proto_v4 is gone (EMQX 5.6)
+%% Write batch.
+%% Instances of this type currently form the majority of the Raft log.
 -type batch() :: #{
     ?tag := ?BATCH,
-    ?batch_messages := [emqx_types:message()]
+    ?batch_operations := [emqx_ds:operation()],
+    ?batch_preconditions => [emqx_ds:precondition()]
 }.
 
 -type generation_rank() :: {shard_id(), term()}.
@@ -240,16 +245,45 @@ drop_db(DB) ->
     _ = emqx_ds_proto_v4:drop_db(list_nodes(), DB),
     emqx_ds_replication_layer_meta:drop_db(DB).
 
--spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) ->
+-spec store_batch(emqx_ds:db(), emqx_ds:batch(), emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
-store_batch(DB, Messages, Opts) ->
+store_batch(DB, Batch = #dsbatch{preconditions = [_ | _]}, Opts) ->
+    %% NOTE: Atomic batch is implied, will not check with DB config.
+    store_batch_atomic(DB, Batch, Opts);
+store_batch(DB, Batch, Opts) ->
+    case emqx_ds_replication_layer_meta:db_config(DB) of
+        #{atomic_batches := true} ->
+            store_batch_atomic(DB, Batch, Opts);
+        #{} ->
+            store_batch_buffered(DB, Batch, Opts)
+    end.
+
+store_batch_buffered(DB, #dsbatch{operations = Operations}, Opts) ->
+    store_batch_buffered(DB, Operations, Opts);
+store_batch_buffered(DB, Batch, Opts) ->
     try
-        emqx_ds_buffer:store_batch(DB, Messages, Opts)
+        emqx_ds_buffer:store_batch(DB, Batch, Opts)
     catch
         error:{Reason, _Call} when Reason == timeout; Reason == noproc ->
            {error, recoverable, Reason}
     end.
+
+store_batch_atomic(DB, Batch, _Opts) ->
+    Shards = shards_of_batch(DB, Batch),
+    case Shards of
+        [Shard] ->
+            case ra_store_batch(DB, Shard, Batch) of
+                {timeout, ServerId} ->
+                    {error, recoverable, {timeout, ServerId}};
+                Result ->
+                    Result
+            end;
+        [] ->
+            ok;
+        [_ | _] ->
+            {error, unrecoverable, atomic_batch_spans_multiple_shards}
+    end.
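Because preconditions force the atomic path and an atomic batch must resolve to one shard, a precondition-bearing `#dsbatch{}` effectively has to stay under a single sharding key (the client id, with the default strategy). A hedged sketch of what a caller can expect, where `Matcher` and `Msg` are assumed to refer to the same client:

    Batch = #dsbatch{preconditions = [{unless_exists, Matcher}], operations = [Msg]},
    %% Returns ok, {error, unrecoverable, {precondition_failed, _}}, or, if the
    %% operations were to hash into different shards (e.g. several clients mixed
    %% into one batch), {error, unrecoverable, atomic_batch_spans_multiple_shards}.
    Result = emqx_ds:store_batch(DB, Batch)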
|
||||
|
||||
-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
|
||||
[{emqx_ds:stream_rank(), stream()}].
|
||||
get_streams(DB, TopicFilter, StartTime) ->
|
||||
|
@ -392,17 +426,49 @@ flush_buffer(DB, Shard, Messages, State) ->
|
|||
end,
|
||||
{State, Result}.
|
||||
|
||||
-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) ->
|
||||
-spec shard_of_operation(
|
||||
emqx_ds:db(),
|
||||
emqx_ds:operation() | emqx_ds:precondition(),
|
||||
clientid | topic,
|
||||
_Options
|
||||
) ->
|
||||
emqx_ds_replication_layer:shard_id().
|
||||
shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
|
||||
shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
|
||||
case SerializeBy of
|
||||
clientid -> Key = From;
|
||||
topic -> Key = Topic
|
||||
end,
|
||||
shard_of_key(DB, Key);
|
||||
shard_of_operation(DB, {_OpName, Matcher}, SerializeBy, _Options) ->
|
||||
#message_matcher{from = From, topic = Topic} = Matcher,
|
||||
case SerializeBy of
|
||||
clientid -> Key = From;
|
||||
topic -> Key = Topic
|
||||
end,
|
||||
shard_of_key(DB, Key).
|
||||
|
||||
shard_of_key(DB, Key) ->
|
||||
N = emqx_ds_replication_shard_allocator:n_shards(DB),
|
||||
Hash =
|
||||
case SerializeBy of
|
||||
clientid -> erlang:phash2(From, N);
|
||||
topic -> erlang:phash2(Topic, N)
|
||||
end,
|
||||
Hash = erlang:phash2(Key, N),
|
||||
integer_to_binary(Hash).
|
||||
|
||||
shards_of_batch(DB, #dsbatch{operations = Operations, preconditions = Preconditions}) ->
|
||||
shards_of_batch(DB, Preconditions, shards_of_batch(DB, Operations, []));
|
||||
shards_of_batch(DB, Operations) ->
|
||||
shards_of_batch(DB, Operations, []).
|
||||
|
||||
shards_of_batch(DB, [Operation | Rest], Acc) ->
|
||||
case shard_of_operation(DB, Operation, clientid, #{}) of
|
||||
Shard when Shard =:= hd(Acc) ->
|
||||
shards_of_batch(DB, Rest, Acc);
|
||||
Shard when Acc =:= [] ->
|
||||
shards_of_batch(DB, Rest, [Shard]);
|
||||
ShardAnother ->
|
||||
[ShardAnother | Acc]
|
||||
end;
|
||||
shards_of_batch(_DB, [], Acc) ->
|
||||
Acc.
|
||||
|
||||
%%================================================================================
|
||||
%% Internal exports (RPC targets)
|
||||
%%================================================================================
|
||||
|
@ -612,7 +678,7 @@ list_nodes() ->
|
|||
-define(SHARD_RPC(DB, SHARD, NODE, BODY),
|
||||
case
|
||||
emqx_ds_replication_layer_shard:servers(
|
||||
DB, SHARD, application:get_env(emqx_durable_storage, reads, leader_preferred)
|
||||
DB, SHARD, application:get_env(emqx_ds_builtin_raft, reads, leader_preferred)
|
||||
)
|
||||
of
|
||||
[{_, NODE} | _] ->
|
||||
|
@ -624,13 +690,22 @@ list_nodes() ->
|
|||
end
|
||||
).
|
||||
|
||||
-spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()]) ->
|
||||
ok | {timeout, _} | {error, recoverable | unrecoverable, _Err}.
|
||||
ra_store_batch(DB, Shard, Messages) ->
|
||||
Command = #{
|
||||
?tag => ?BATCH,
|
||||
?batch_messages => Messages
|
||||
},
|
||||
-spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:batch()) ->
|
||||
ok | {timeout, _} | emqx_ds:error(_).
|
||||
ra_store_batch(DB, Shard, Batch) ->
|
||||
case Batch of
|
||||
#dsbatch{operations = Operations, preconditions = Preconditions} ->
|
||||
Command = #{
|
||||
?tag => ?BATCH,
|
||||
?batch_operations => Operations,
|
||||
?batch_preconditions => Preconditions
|
||||
};
|
||||
Operations ->
|
||||
Command = #{
|
||||
?tag => ?BATCH,
|
||||
?batch_operations => Operations
|
||||
}
|
||||
end,
|
||||
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
|
||||
case emqx_ds_replication_layer_shard:process_command(Servers, Command, ?RA_TIMEOUT) of
|
||||
{ok, Result, _Leader} ->
|
||||
|
@ -767,6 +842,7 @@ ra_drop_shard(DB, Shard) ->
|
|||
|
||||
-define(pd_ra_idx_need_release, '$emqx_ds_raft_idx_need_release').
|
||||
-define(pd_ra_bytes_need_release, '$emqx_ds_raft_bytes_need_release').
|
||||
-define(pd_ra_force_monotonic, '$emqx_ds_raft_force_monotonic').
|
||||
|
||||
-spec init(_Args :: map()) -> ra_state().
|
||||
init(#{db := DB, shard := Shard}) ->
|
||||
|
@ -776,18 +852,30 @@ init(#{db := DB, shard := Shard}) ->
|
|||
{ra_state(), _Reply, _Effects}.
|
||||
apply(
|
||||
RaftMeta,
|
||||
#{
|
||||
Command = #{
|
||||
?tag := ?BATCH,
|
||||
?batch_messages := MessagesIn
|
||||
?batch_operations := OperationsIn
|
||||
},
|
||||
#{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0
|
||||
) ->
|
||||
?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}),
|
||||
{Stats, Latest, Messages} = assign_timestamps(Latest0, MessagesIn),
|
||||
Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{durable => false}),
|
||||
State = State0#{latest := Latest},
|
||||
set_ts(DBShard, Latest),
|
||||
Effects = try_release_log(Stats, RaftMeta, State),
|
||||
?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => OperationsIn, latest => Latest0}),
|
||||
Preconditions = maps:get(?batch_preconditions, Command, []),
|
||||
{Stats, Latest, Operations} = assign_timestamps(DB, Latest0, OperationsIn),
|
||||
%% FIXME
|
||||
case emqx_ds_precondition:verify(emqx_ds_storage_layer, DBShard, Preconditions) of
|
||||
ok ->
|
||||
Result = emqx_ds_storage_layer:store_batch(DBShard, Operations, #{durable => false}),
|
||||
State = State0#{latest := Latest},
|
||||
set_ts(DBShard, Latest),
|
||||
Effects = try_release_log(Stats, RaftMeta, State);
|
||||
PreconditionFailed = {precondition_failed, _} ->
|
||||
Result = {error, unrecoverable, PreconditionFailed},
|
||||
State = State0,
|
||||
Effects = [];
|
||||
Result ->
|
||||
State = State0,
|
||||
Effects = []
|
||||
end,
|
||||
Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
|
||||
{State, Result, Effects};
|
||||
apply(
|
||||
|
@ -862,6 +950,21 @@ apply(
|
|||
Effects = handle_custom_event(DBShard, Latest, CustomEvent),
|
||||
{State#{latest => Latest}, ok, Effects}.
|
||||
|
||||
assign_timestamps(DB, Latest, Messages) ->
|
||||
ForceMonotonic = force_monotonic_timestamps(DB),
|
||||
assign_timestamps(ForceMonotonic, Latest, Messages, [], 0, 0).
|
||||
|
||||
force_monotonic_timestamps(DB) ->
|
||||
case erlang:get(?pd_ra_force_monotonic) of
|
||||
undefined ->
|
||||
DBConfig = emqx_ds_replication_layer_meta:db_config(DB),
|
||||
Flag = maps:get(force_monotonic_timestamps, DBConfig),
|
||||
erlang:put(?pd_ra_force_monotonic, Flag);
|
||||
Flag ->
|
||||
ok
|
||||
end,
|
||||
Flag.
|
||||
|
||||
try_release_log({_N, BatchSize}, RaftMeta = #{index := CurrentIdx}, State) ->
|
||||
%% NOTE
|
||||
%% Because cursor release means storage flush (see
|
||||
|
@ -924,10 +1027,7 @@ tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
|
|||
?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, timestamp => Timestamp}),
|
||||
handle_custom_event(DBShard, Timestamp, tick).
|
||||
|
||||
assign_timestamps(Latest, Messages) ->
|
||||
assign_timestamps(Latest, Messages, [], 0, 0).
|
||||
|
||||
assign_timestamps(Latest0, [Message0 | Rest], Acc, N, Sz) ->
|
||||
assign_timestamps(true, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) ->
|
||||
case emqx_message:timestamp(Message0, microsecond) of
|
||||
TimestampUs when TimestampUs > Latest0 ->
|
||||
Latest = TimestampUs,
|
||||
|
@ -936,8 +1036,17 @@ assign_timestamps(Latest0, [Message0 | Rest], Acc, N, Sz) ->
|
|||
Latest = Latest0 + 1,
|
||||
Message = assign_timestamp(Latest, Message0)
|
||||
end,
|
||||
assign_timestamps(Latest, Rest, [Message | Acc], N + 1, Sz + approx_message_size(Message0));
|
||||
assign_timestamps(Latest, [], Acc, N, Size) ->
|
||||
MSize = approx_message_size(Message0),
|
||||
assign_timestamps(true, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
|
||||
assign_timestamps(false, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) ->
|
||||
TimestampUs = emqx_message:timestamp(Message0),
|
||||
Latest = max(Latest0, TimestampUs),
|
||||
Message = assign_timestamp(TimestampUs, Message0),
|
||||
MSize = approx_message_size(Message0),
|
||||
assign_timestamps(false, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
|
||||
assign_timestamps(ForceMonotonic, Latest, [Operation | Rest], Acc, N, Sz) ->
|
||||
assign_timestamps(ForceMonotonic, Latest, Rest, [Operation | Acc], N + 1, Sz);
|
||||
assign_timestamps(_ForceMonotonic, Latest, [], Acc, N, Size) ->
|
||||
{{N, Size}, Latest, lists:reverse(Acc)}.
|
||||
|
||||
assign_timestamp(TimestampUs, Message) ->
|
||||
|
|
|
@@ -19,7 +19,8 @@
 -define(enc, 3).
 
 %% ?BATCH
--define(batch_messages, 2).
+-define(batch_operations, 2).
+-define(batch_preconditions, 4).
 -define(timestamp, 3).
 
 %% add_generation / update_config
|
|
@ -56,6 +56,7 @@
|
|||
topic/0,
|
||||
batch/0,
|
||||
operation/0,
|
||||
deletion/0,
|
||||
precondition/0,
|
||||
stream/0,
|
||||
delete_stream/0,
|
||||
|
@@ -110,7 +111,9 @@
     message()
     %% Delete a message.
     %% Does nothing if the message does not exist.
-    | {delete, message_matcher('_')}.
+    | deletion().
+
+-type deletion() :: {delete, message_matcher('_')}.
 
 %% Precondition.
 %% Fails whole batch if the storage already has the matching message (`if_exists'),
|
|
@ -21,7 +21,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
%% API:
|
||||
-export([start_link/4, store_batch/3, shard_of_message/3]).
|
||||
-export([start_link/4, store_batch/3, shard_of_operation/3]).
|
||||
-export([ls/0]).
|
||||
|
||||
%% behavior callbacks:
|
||||
|
@ -46,19 +46,18 @@
|
|||
-define(cbm(DB), {?MODULE, DB}).
|
||||
|
||||
-record(enqueue_req, {
|
||||
messages :: [emqx_types:message()],
|
||||
operations :: [emqx_ds:operation()],
|
||||
sync :: boolean(),
|
||||
atomic :: boolean(),
|
||||
n_messages :: non_neg_integer(),
|
||||
n_operations :: non_neg_integer(),
|
||||
payload_bytes :: non_neg_integer()
|
||||
}).
|
||||
|
||||
-callback init_buffer(emqx_ds:db(), _Shard, _Options) -> {ok, _State}.
|
||||
|
||||
-callback flush_buffer(emqx_ds:db(), _Shard, [emqx_types:message()], State) ->
|
||||
-callback flush_buffer(emqx_ds:db(), _Shard, [emqx_ds:operation()], State) ->
|
||||
{State, ok | {error, recoverable | unrecoverable, _}}.
|
||||
|
||||
-callback shard_of_message(emqx_ds:db(), emqx_types:message(), topic | clientid, _Options) ->
|
||||
-callback shard_of_operation(emqx_ds:db(), emqx_ds:operation(), topic | clientid, _Options) ->
|
||||
_Shard.
|
||||
|
||||
%%================================================================================
|
||||
|
@ -77,39 +76,33 @@ start_link(CallbackModule, CallbackOptions, DB, Shard) ->
|
|||
?via(DB, Shard), ?MODULE, [CallbackModule, CallbackOptions, DB, Shard], []
|
||||
).
|
||||
|
||||
-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
|
||||
-spec store_batch(emqx_ds:db(), [emqx_ds:operation()], emqx_ds:message_store_opts()) ->
|
||||
emqx_ds:store_batch_result().
|
||||
store_batch(DB, Messages, Opts) ->
|
||||
store_batch(DB, Operations, Opts) ->
|
||||
Sync = maps:get(sync, Opts, true),
|
||||
Atomic = maps:get(atomic, Opts, false),
|
||||
%% Usually we expect all messages in the batch to go into the
|
||||
%% single shard, so this function is optimized for the happy case.
|
||||
case shards_of_batch(DB, Messages) of
|
||||
[{Shard, {NMsgs, NBytes}}] ->
|
||||
case shards_of_batch(DB, Operations) of
|
||||
[{Shard, {NOps, NBytes}}] ->
|
||||
%% Happy case:
|
||||
enqueue_call_or_cast(
|
||||
?via(DB, Shard),
|
||||
#enqueue_req{
|
||||
messages = Messages,
|
||||
operations = Operations,
|
||||
sync = Sync,
|
||||
atomic = Atomic,
|
||||
n_messages = NMsgs,
|
||||
n_operations = NOps,
|
||||
payload_bytes = NBytes
|
||||
}
|
||||
);
|
||||
[_, _ | _] when Atomic ->
|
||||
%% It's impossible to commit a batch to multiple shards
|
||||
%% atomically
|
||||
{error, unrecoverable, atomic_commit_to_multiple_shards};
|
||||
_Shards ->
|
||||
%% Use a slower implementation for the unlikely case:
|
||||
repackage_messages(DB, Messages, Sync)
|
||||
repackage_messages(DB, Operations, Sync)
|
||||
end.
|
||||
|
||||
-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) -> _Shard.
|
||||
shard_of_message(DB, Message, ShardBy) ->
|
||||
-spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic) -> _Shard.
|
||||
shard_of_operation(DB, Operation, ShardBy) ->
|
||||
{CBM, Options} = persistent_term:get(?cbm(DB)),
|
||||
CBM:shard_of_message(DB, Message, ShardBy, Options).
|
||||
CBM:shard_of_operation(DB, Operation, ShardBy, Options).
|
||||
|
||||
%%================================================================================
|
||||
%% behavior callbacks
|
||||
|
@ -129,7 +122,7 @@ shard_of_message(DB, Message, ShardBy) ->
|
|||
n = 0 :: non_neg_integer(),
|
||||
n_bytes = 0 :: non_neg_integer(),
|
||||
tref :: undefined | reference(),
|
||||
queue :: queue:queue(emqx_types:message()),
|
||||
queue :: queue:queue(emqx_ds:operation()),
|
||||
pending_replies = [] :: [gen_server:from()]
|
||||
}).
|
||||
|
||||
|
@ -168,31 +161,29 @@ format_status(Status) ->
|
|||
|
||||
handle_call(
|
||||
#enqueue_req{
|
||||
messages = Msgs,
|
||||
operations = Operations,
|
||||
sync = Sync,
|
||||
atomic = Atomic,
|
||||
n_messages = NMsgs,
|
||||
n_operations = NOps,
|
||||
payload_bytes = NBytes
|
||||
},
|
||||
From,
|
||||
S0 = #s{pending_replies = Replies0}
|
||||
) ->
|
||||
S = S0#s{pending_replies = [From | Replies0]},
|
||||
{noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)};
|
||||
{noreply, enqueue(Sync, Operations, NOps, NBytes, S)};
|
||||
handle_call(_Call, _From, S) ->
|
||||
{reply, {error, unknown_call}, S}.
|
||||
|
||||
handle_cast(
|
||||
#enqueue_req{
|
||||
messages = Msgs,
|
||||
operations = Operations,
|
||||
sync = Sync,
|
||||
atomic = Atomic,
|
||||
n_messages = NMsgs,
|
||||
n_operations = NOps,
|
||||
payload_bytes = NBytes
|
||||
},
|
||||
S
|
||||
) ->
|
||||
{noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)};
|
||||
{noreply, enqueue(Sync, Operations, NOps, NBytes, S)};
|
||||
handle_cast(_Cast, S) ->
|
||||
{noreply, S}.
|
||||
|
||||
|
@ -215,11 +206,10 @@ terminate(_Reason, #s{db = DB}) ->
|
|||
|
||||
enqueue(
|
||||
Sync,
|
||||
Atomic,
|
||||
Msgs,
|
||||
Ops,
|
||||
BatchSize,
|
||||
BatchBytes,
|
||||
S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0}
|
||||
S0 = #s{n = NOps0, n_bytes = NBytes0, queue = Q0}
|
||||
) ->
|
||||
%% At this point we don't split the batches, even when they aren't
|
||||
%% atomic. It wouldn't win us anything in terms of memory, and
|
||||
|
@ -227,18 +217,18 @@ enqueue(
|
|||
%% granularity should be fine enough.
|
||||
NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
|
||||
NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity),
|
||||
NMsgs = NMsgs0 + BatchSize,
|
||||
NMsgs = NOps0 + BatchSize,
|
||||
NBytes = NBytes0 + BatchBytes,
|
||||
case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NMsgs0 > 0) of
|
||||
case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NOps0 > 0) of
|
||||
true ->
|
||||
%% Adding this batch would cause buffer to overflow. Flush
|
||||
%% it now, and retry:
|
||||
S1 = flush(S0),
|
||||
enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1);
|
||||
enqueue(Sync, Ops, BatchSize, BatchBytes, S1);
|
||||
false ->
|
||||
%% The buffer is empty, we enqueue the atomic batch in its
|
||||
%% entirety:
|
||||
Q1 = lists:foldl(fun queue:in/2, Q0, Msgs),
|
||||
Q1 = lists:foldl(fun queue:in/2, Q0, Ops),
|
||||
S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1},
|
||||
case NMsgs >= NMax orelse NBytes >= NBytesMax of
|
||||
true ->
|
||||
|
@ -336,18 +326,18 @@ do_flush(
|
|||
}
|
||||
end.
|
||||
|
||||
-spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) ->
|
||||
-spec shards_of_batch(emqx_ds:db(), [emqx_ds:operation()]) ->
|
||||
[{_ShardId, {NMessages, NBytes}}]
|
||||
when
|
||||
NMessages :: non_neg_integer(),
|
||||
NBytes :: non_neg_integer().
|
||||
shards_of_batch(DB, Messages) ->
|
||||
shards_of_batch(DB, Batch) ->
|
||||
maps:to_list(
|
||||
lists:foldl(
|
||||
fun(Message, Acc) ->
|
||||
fun(Operation, Acc) ->
|
||||
%% TODO: sharding strategy must be part of the DS DB schema:
|
||||
Shard = shard_of_message(DB, Message, clientid),
|
||||
Size = payload_size(Message),
|
||||
Shard = shard_of_operation(DB, Operation, clientid),
|
||||
Size = payload_size(Operation),
|
||||
maps:update_with(
|
||||
Shard,
|
||||
fun({N, S}) ->
|
||||
|
@ -358,36 +348,35 @@ shards_of_batch(DB, Messages) ->
|
|||
)
|
||||
end,
|
||||
#{},
|
||||
Messages
|
||||
Batch
|
||||
)
|
||||
).
|
||||
|
||||
repackage_messages(DB, Messages, Sync) ->
|
||||
repackage_messages(DB, Batch, Sync) ->
|
||||
Batches = lists:foldl(
|
||||
fun(Message, Acc) ->
|
||||
Shard = shard_of_message(DB, Message, clientid),
|
||||
Size = payload_size(Message),
|
||||
fun(Operation, Acc) ->
|
||||
Shard = shard_of_operation(DB, Operation, clientid),
|
||||
Size = payload_size(Operation),
|
||||
maps:update_with(
|
||||
Shard,
|
||||
fun({N, S, Msgs}) ->
|
||||
{N + 1, S + Size, [Message | Msgs]}
|
||||
{N + 1, S + Size, [Operation | Msgs]}
|
||||
end,
|
||||
{1, Size, [Message]},
|
||||
{1, Size, [Operation]},
|
||||
Acc
|
||||
)
|
||||
end,
|
||||
#{},
|
||||
Messages
|
||||
Batch
|
||||
),
|
||||
maps:fold(
|
||||
fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) ->
|
||||
fun(Shard, {NOps, ByteSize, RevOperations}, ErrAcc) ->
|
||||
Err = enqueue_call_or_cast(
|
||||
?via(DB, Shard),
|
||||
#enqueue_req{
|
||||
messages = lists:reverse(RevMessages),
|
||||
operations = lists:reverse(RevOperations),
|
||||
sync = Sync,
|
||||
atomic = false,
|
||||
n_messages = NMsgs,
|
||||
n_operations = NOps,
|
||||
payload_bytes = ByteSize
|
||||
}
|
||||
),
|
||||
|
@ -427,4 +416,6 @@ cancel_timer(S = #s{tref = TRef}) ->
|
|||
%% @doc Return approximate size of the MQTT message (it doesn't take
|
||||
%% all things into account, for example headers and extras)
|
||||
payload_size(#message{payload = P, topic = T}) ->
|
||||
size(P) + size(T).
|
||||
size(P) + size(T);
|
||||
payload_size({_OpName, _}) ->
|
||||
0.
|
||||
|
|
|
@ -0,0 +1,184 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_ds_precondition).
|
||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||
-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
|
||||
|
||||
-export([verify/3]).
|
||||
-export([matches/2]).
|
||||
|
||||
-export_type([matcher/0, mismatch/0]).
|
||||
|
||||
-type matcher() :: #message_matcher{}.
|
||||
-type mismatch() :: emqx_types:message() | not_found.
|
||||
|
||||
-callback lookup_message(_Ctx, matcher()) ->
|
||||
emqx_types:message() | not_found | emqx_ds:error(_).
|
||||
|
||||
%%
|
||||
|
||||
-spec verify(module(), _Ctx, [emqx_ds:precondition()]) ->
|
||||
ok | {precondition_failed, mismatch()} | emqx_ds:error(_).
|
||||
verify(Mod, Ctx, [_Precondition = {Cond, Msg} | Rest]) ->
|
||||
case verify_precondition(Mod, Ctx, Cond, Msg) of
|
||||
ok ->
|
||||
verify(Mod, Ctx, Rest);
|
||||
Failed ->
|
||||
Failed
|
||||
end;
|
||||
verify(_Mod, _Ctx, []) ->
|
||||
ok.
|
||||
|
||||
verify_precondition(Mod, Ctx, if_exists, Matcher) ->
|
||||
case Mod:lookup_message(Ctx, Matcher) of
|
||||
Msg = #message{} ->
|
||||
verify_match(Msg, Matcher);
|
||||
not_found ->
|
||||
{precondition_failed, not_found};
|
||||
Error = {error, _, _} ->
|
||||
Error
|
||||
end;
|
||||
verify_precondition(Mod, Ctx, unless_exists, Matcher) ->
|
||||
case Mod:lookup_message(Ctx, Matcher) of
|
||||
Msg = #message{} ->
|
||||
verify_nomatch(Msg, Matcher);
|
||||
not_found ->
|
||||
ok;
|
||||
Error = {error, _, _} ->
|
||||
Error
|
||||
end.
|
||||
|
||||
verify_match(Msg, Matcher) ->
|
||||
case matches(Msg, Matcher) of
|
||||
true -> ok;
|
||||
false -> {precondition_failed, Msg}
|
||||
end.
|
||||
|
||||
verify_nomatch(Msg, Matcher) ->
|
||||
case matches(Msg, Matcher) of
|
||||
false -> ok;
|
||||
true -> {precondition_failed, Msg}
|
||||
end.
|
||||
|
||||
-spec matches(emqx_types:message(), matcher()) -> boolean().
|
||||
matches(
|
||||
Message,
|
||||
#message_matcher{from = From, topic = Topic, payload = Pat, headers = Headers}
|
||||
) ->
|
||||
case Message of
|
||||
#message{from = From, topic = Topic} when Pat =:= '_' ->
|
||||
matches_headers(Message, Headers);
|
||||
#message{from = From, topic = Topic, payload = Pat} ->
|
||||
matches_headers(Message, Headers);
|
||||
_ ->
|
||||
false
|
||||
end.
|
||||
|
||||
matches_headers(_Message, MatchHeaders) when map_size(MatchHeaders) =:= 0 ->
|
||||
true;
|
||||
matches_headers(#message{headers = Headers}, MatchHeaders) ->
|
||||
maps:intersect(MatchHeaders, Headers) =:= MatchHeaders.
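Two details of the matcher semantics are easy to miss here: a `'_'` payload acts as a wildcard, and headers are matched as a subset (every header in the matcher must appear in the message with the same value; extra message headers are ignored), while the matcher's timestamp is used by the storage lookup rather than by `matches/2` itself. An illustrative check:

    %% Matches: payload is ignored ('_'), and #{h1 => 1} is a subset of the headers.
    true = emqx_ds_precondition:matches(
        #message{from = <<"c">>, topic = <<"t">>, payload = <<"x">>, headers = #{h1 => 1, h2 => 2}},
        #message_matcher{from = <<"c">>, topic = <<"t">>, timestamp = 0, payload = '_', headers = #{h1 => 1}}
    )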
|
||||
|
||||
%% Basic tests
|
||||
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-compile(export_all).
|
||||
|
||||
conjunction_test() ->
|
||||
%% Contradictory preconditions, always false.
|
||||
Preconditions = [
|
||||
{if_exists, matcher(<<"c1">>, <<"t/1">>, 0, '_')},
|
||||
{unless_exists, matcher(<<"c1">>, <<"t/1">>, 0, '_')}
|
||||
],
|
||||
?assertEqual(
|
||||
{precondition_failed, not_found},
|
||||
verify(?MODULE, [], Preconditions)
|
||||
),
|
||||
%% Check that the order does not matter.
|
||||
?assertEqual(
|
||||
{precondition_failed, not_found},
|
||||
verify(?MODULE, [], lists:reverse(Preconditions))
|
||||
),
|
||||
?assertEqual(
|
||||
{precondition_failed, message(<<"c1">>, <<"t/1">>, 0, <<>>)},
|
||||
verify(
|
||||
?MODULE,
|
||||
[message(<<"c1">>, <<"t/1">>, 0, <<>>)],
|
||||
Preconditions
|
||||
)
|
||||
).
|
||||
|
||||
matches_test() ->
|
||||
?assert(
|
||||
matches(
|
||||
message(<<"mtest1">>, <<"t/same">>, 12345, <<?MODULE_STRING>>),
|
||||
matcher(<<"mtest1">>, <<"t/same">>, 12345, '_')
|
||||
)
|
||||
).
|
||||
|
||||
matches_headers_test() ->
|
||||
?assert(
|
||||
matches(
|
||||
message(<<"mtest2">>, <<"t/same">>, 23456, <<?MODULE_STRING>>, #{h1 => 42, h2 => <<>>}),
|
||||
matcher(<<"mtest2">>, <<"t/same">>, 23456, '_', #{h2 => <<>>})
|
||||
)
|
||||
).
|
||||
|
||||
mismatches_headers_test() ->
|
||||
?assertNot(
|
||||
matches(
|
||||
message(<<"mtest3">>, <<"t/same">>, 23456, <<?MODULE_STRING>>, #{h1 => 42, h2 => <<>>}),
|
||||
matcher(<<"mtest3">>, <<"t/same">>, 23456, '_', #{h2 => <<>>, h3 => <<"required">>})
|
||||
)
|
||||
).
|
||||
|
||||
matcher(ClientID, Topic, TS, Payload) ->
|
||||
matcher(ClientID, Topic, TS, Payload, #{}).
|
||||
|
||||
matcher(ClientID, Topic, TS, Payload, Headers) ->
|
||||
#message_matcher{
|
||||
from = ClientID,
|
||||
topic = Topic,
|
||||
timestamp = TS,
|
||||
payload = Payload,
|
||||
headers = Headers
|
||||
}.
|
||||
|
||||
message(ClientID, Topic, TS, Payload) ->
|
||||
message(ClientID, Topic, TS, Payload, #{}).
|
||||
|
||||
message(ClientID, Topic, TS, Payload, Headers) ->
|
||||
#message{
|
||||
id = <<>>,
|
||||
qos = 0,
|
||||
from = ClientID,
|
||||
topic = Topic,
|
||||
timestamp = TS,
|
||||
payload = Payload,
|
||||
headers = Headers
|
||||
}.
|
||||
|
||||
lookup_message(Messages, Matcher) ->
|
||||
case lists:search(fun(M) -> matches(M, Matcher) end, Messages) of
|
||||
{value, Message} ->
|
||||
Message;
|
||||
false ->
|
||||
not_found
|
||||
end.
|
||||
|
||||
-endif.
|
|
@ -37,6 +37,7 @@
|
|||
update_iterator/4,
|
||||
next/6,
|
||||
delete_next/7,
|
||||
lookup_message/3,
|
||||
|
||||
handle_event/4
|
||||
]).
|
||||
|
@ -46,6 +47,7 @@
|
|||
|
||||
-export_type([options/0]).
|
||||
|
||||
-include("emqx_ds.hrl").
|
||||
-include("emqx_ds_metrics.hrl").
|
||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||
-include_lib("snabbkaffe/include/trace.hrl").
|
||||
|
@ -68,10 +70,13 @@
|
|||
-define(start_time, 3).
|
||||
-define(storage_key, 4).
|
||||
-define(last_seen_key, 5).
|
||||
-define(cooked_payloads, 6).
|
||||
-define(cooked_msg_ops, 6).
|
||||
-define(cooked_lts_ops, 7).
|
||||
-define(cooked_ts, 8).
|
||||
|
||||
%% atoms:
|
||||
-define(delete, 100).
|
||||
|
||||
-type options() ::
|
||||
#{
|
||||
bits_per_wildcard_level => pos_integer(),
|
||||
|
@ -110,7 +115,7 @@
|
|||
|
||||
-type cooked_batch() ::
|
||||
#{
|
||||
?cooked_payloads := [{binary(), binary()}],
|
||||
?cooked_msg_ops := [{binary(), binary() | ?delete}],
|
||||
?cooked_lts_ops := [{binary(), binary()}],
|
||||
?cooked_ts := integer()
|
||||
}.
|
||||
|
@ -271,24 +276,28 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) ->
|
|||
-spec prepare_batch(
|
||||
emqx_ds_storage_layer:shard_id(),
|
||||
s(),
|
||||
[{emqx_ds:time(), emqx_types:message()}, ...],
|
||||
emqx_ds_storage_layer:batch(),
|
||||
emqx_ds_storage_layer:batch_store_opts()
|
||||
) ->
|
||||
{ok, cooked_batch()}.
|
||||
prepare_batch(_ShardId, S, Messages, _Options) ->
|
||||
prepare_batch(_ShardId, S, Batch, _Options) ->
|
||||
_ = erase(?lts_persist_ops),
|
||||
{Payloads, MaxTs} =
|
||||
{Operations, MaxTs} =
|
||||
lists:mapfoldl(
|
||||
fun({Timestamp, Msg}, Acc) ->
|
||||
{Key, _} = make_key(S, Timestamp, Msg),
|
||||
Payload = {Key, message_to_value_v1(Msg)},
|
||||
{Payload, max(Acc, Timestamp)}
|
||||
fun
|
||||
({Timestamp, Msg = #message{topic = Topic}}, Acc) ->
|
||||
{Key, _} = make_key(S, Timestamp, Topic),
|
||||
Op = {Key, message_to_value_v1(Msg)},
|
||||
{Op, max(Acc, Timestamp)};
|
||||
({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}, Acc) ->
|
||||
{Key, _} = make_key(S, Timestamp, Topic),
|
||||
{_Op = {Key, ?delete}, Acc}
|
||||
end,
|
||||
0,
|
||||
Messages
|
||||
Batch
|
||||
),
|
||||
{ok, #{
|
||||
?cooked_payloads => Payloads,
|
||||
?cooked_msg_ops => Operations,
|
||||
?cooked_lts_ops => pop_lts_persist_ops(),
|
||||
?cooked_ts => MaxTs
|
||||
}}.
|
||||
|
@ -302,7 +311,7 @@ prepare_batch(_ShardId, S, Messages, _Options) ->
|
|||
commit_batch(
|
||||
_ShardId,
|
||||
_Data,
|
||||
#{?cooked_payloads := [], ?cooked_lts_ops := LTS},
|
||||
#{?cooked_msg_ops := [], ?cooked_lts_ops := LTS},
|
||||
_Options
|
||||
) ->
|
||||
%% Assert:
|
||||
|
@ -311,7 +320,7 @@ commit_batch(
|
|||
commit_batch(
|
||||
_ShardId,
|
||||
#s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars},
|
||||
#{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs},
|
||||
#{?cooked_lts_ops := LtsOps, ?cooked_msg_ops := Operations, ?cooked_ts := MaxTs},
|
||||
Options
|
||||
) ->
|
||||
{ok, Batch} = rocksdb:batch(),
|
||||
|
@ -326,10 +335,13 @@ commit_batch(
|
|||
_ = emqx_ds_lts:trie_update(Trie, LtsOps),
|
||||
%% Commit payloads:
|
||||
lists:foreach(
|
||||
fun({Key, Val}) ->
|
||||
ok = rocksdb:batch_put(Batch, DataCF, Key, term_to_binary(Val))
|
||||
fun
|
||||
({Key, Val}) when is_tuple(Val) ->
|
||||
ok = rocksdb:batch_put(Batch, DataCF, Key, term_to_binary(Val));
|
||||
({Key, ?delete}) ->
|
||||
ok = rocksdb:batch_delete(Batch, DataCF, Key)
|
||||
end,
|
||||
Payloads
|
||||
Operations
|
||||
),
|
||||
Result = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)),
|
||||
rocksdb:release_batch(Batch),
|
||||
|
@ -556,6 +568,23 @@ delete_next_until(
|
|||
rocksdb:iterator_close(ITHandle)
|
||||
end.
|
||||
|
||||
-spec lookup_message(emqx_ds_storage_layer:shard_id(), s(), emqx_ds_precondition:matcher()) ->
|
||||
emqx_types:message() | not_found | emqx_ds:error(_).
|
||||
lookup_message(
|
||||
_ShardId,
|
||||
S = #s{db = DB, data = CF},
|
||||
#message_matcher{topic = Topic, timestamp = Timestamp}
|
||||
) ->
|
||||
{Key, _} = make_key(S, Timestamp, Topic),
|
||||
case rocksdb:get(DB, CF, Key, _ReadOpts = []) of
|
||||
{ok, Blob} ->
|
||||
deserialize(Blob);
|
||||
not_found ->
|
||||
not_found;
|
||||
Error ->
|
||||
{error, unrecoverable, {rocksdb, Error}}
|
||||
end.
|
||||
|
||||
handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
|
||||
%% If the last message was published more than one epoch ago, and
|
||||
%% the shard remains idle, we need to advance safety cutoff
|
||||
|
@ -811,9 +840,9 @@ format_key(KeyMapper, Key) ->
|
|||
Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)],
|
||||
lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])).
|
||||
|
||||
-spec make_key(s(), emqx_ds:time(), emqx_types:message()) -> {binary(), [binary()]}.
|
||||
make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, #message{topic = TopicBin}) ->
|
||||
Tokens = emqx_topic:words(TopicBin),
|
||||
-spec make_key(s(), emqx_ds:time(), emqx_types:topic()) -> {binary(), [binary()]}.
|
||||
make_key(#s{keymappers = KeyMappers, trie = Trie}, Timestamp, Topic) ->
|
||||
Tokens = emqx_topic:words(Topic),
|
||||
{TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
|
||||
VaryingHashes = [hash_topic_level(I) || I <- Varying],
|
||||
KeyMapper = array:get(length(Varying), KeyMappers),
|
||||
|
|
|
@ -37,6 +37,9 @@
|
|||
next/4,
|
||||
delete_next/5,
|
||||
|
||||
%% Preconditions
|
||||
lookup_message/2,
|
||||
|
||||
%% Generations
|
||||
update_config/3,
|
||||
add_generation/2,
|
||||
|
@ -61,6 +64,7 @@
|
|||
-export_type([
|
||||
gen_id/0,
|
||||
generation/0,
|
||||
batch/0,
|
||||
cf_refs/0,
|
||||
stream/0,
|
||||
delete_stream/0,
|
||||
|
@ -74,6 +78,7 @@
|
|||
batch_store_opts/0
|
||||
]).
|
||||
|
||||
-include("emqx_ds.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
|
||||
|
@ -115,6 +120,11 @@
|
|||
|
||||
-type gen_id() :: 0..16#ffff.
|
||||
|
||||
-type batch() :: [
|
||||
{emqx_ds:time(), emqx_types:message()}
|
||||
| emqx_ds:deletion()
|
||||
].
|
||||
|
||||
%% Options affecting how batches should be stored.
|
||||
%% See also: `emqx_ds:message_store_opts()'.
|
||||
-type batch_store_opts() ::
|
||||
|
@ -294,6 +304,10 @@
|
|||
| {ok, end_of_stream}
|
||||
| emqx_ds:error(_).
|
||||
|
||||
%% Lookup a single message, for preconditions to work.
|
||||
-callback lookup_message(shard_id(), generation_data(), emqx_ds_precondition:matcher()) ->
|
||||
emqx_types:message() | not_found | emqx_ds:error(_).
|
||||
|
||||
-callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) ->
|
||||
[CustomEvent].
|
||||
|
||||
|
@ -317,14 +331,10 @@ drop_shard(Shard) ->
|
|||
|
||||
%% @doc This is a convenience wrapper that combines `prepare' and
|
||||
%% `commit' operations.
|
||||
-spec store_batch(
|
||||
shard_id(),
|
||||
[{emqx_ds:time(), emqx_types:message()}],
|
||||
batch_store_opts()
|
||||
) ->
|
||||
-spec store_batch(shard_id(), batch(), batch_store_opts()) ->
|
||||
emqx_ds:store_batch_result().
|
||||
store_batch(Shard, Messages, Options) ->
|
||||
case prepare_batch(Shard, Messages, #{}) of
|
||||
store_batch(Shard, Batch, Options) ->
|
||||
case prepare_batch(Shard, Batch, #{}) of
|
||||
{ok, CookedBatch} ->
|
||||
commit_batch(Shard, CookedBatch, Options);
|
||||
ignore ->
|
||||
|
@ -342,23 +352,21 @@ store_batch(Shard, Messages, Options) ->
|
|||
%%
|
||||
%% The underlying storage layout MAY use timestamp as a unique message
|
||||
%% ID.
|
||||
-spec prepare_batch(
|
||||
shard_id(),
|
||||
[{emqx_ds:time(), emqx_types:message()}],
|
||||
batch_prepare_opts()
|
||||
) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_).
|
||||
prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
|
||||
-spec prepare_batch(shard_id(), batch(), batch_prepare_opts()) ->
|
||||
{ok, cooked_batch()} | ignore | emqx_ds:error(_).
|
||||
prepare_batch(Shard, Batch, Options) ->
|
||||
%% NOTE
|
||||
%% We assume that batches do not span generations. Callers should enforce this.
|
||||
?tp(emqx_ds_storage_layer_prepare_batch, #{
|
||||
shard => Shard, messages => Messages, options => Options
|
||||
shard => Shard, batch => Batch, options => Options
|
||||
}),
|
||||
%% FIXME: always store messages in the current generation
|
||||
case generation_at(Shard, Time) of
|
||||
Time = batch_starts_at(Batch),
|
||||
case is_integer(Time) andalso generation_at(Shard, Time) of
|
||||
{GenId, #{module := Mod, data := GenData}} ->
|
||||
T0 = erlang:monotonic_time(microsecond),
|
||||
Result =
|
||||
case Mod:prepare_batch(Shard, GenData, Messages, Options) of
|
||||
case Mod:prepare_batch(Shard, GenData, Batch, Options) of
|
||||
{ok, CookedBatch} ->
|
||||
{ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}};
|
||||
Error = {error, _, _} ->
|
||||
|
@ -368,11 +376,21 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
|
|||
%% TODO store->prepare
|
||||
emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
|
||||
Result;
|
||||
false ->
|
||||
%% No write operations in this batch.
|
||||
ignore;
|
||||
not_found ->
|
||||
%% Generation is likely already GCed.
|
||||
ignore
|
||||
end;
|
||||
prepare_batch(_Shard, [], _Options) ->
|
||||
ignore.
|
||||
end.
|
||||
|
||||
-spec batch_starts_at(batch()) -> emqx_ds:time() | undefined.
|
||||
batch_starts_at([{Time, _Message} | _]) when is_integer(Time) ->
|
||||
Time;
|
||||
batch_starts_at([{delete, #message_matcher{timestamp = Time}} | _]) ->
|
||||
Time;
|
||||
batch_starts_at([]) ->
|
||||
undefined.
|
||||
|
||||
%% @doc Commit cooked batch to the storage.
|
||||
%%
|
||||
|
@ -559,6 +577,16 @@ update_config(ShardId, Since, Options) ->
|
|||
add_generation(ShardId, Since) ->
|
||||
gen_server:call(?REF(ShardId), #call_add_generation{since = Since}, infinity).
|
||||
|
||||
-spec lookup_message(shard_id(), emqx_ds_precondition:matcher()) ->
|
||||
emqx_types:message() | not_found | emqx_ds:error(_).
|
||||
lookup_message(ShardId, Matcher = #message_matcher{timestamp = Time}) ->
|
||||
case generation_at(ShardId, Time) of
|
||||
{_GenId, #{module := Mod, data := GenData}} ->
|
||||
Mod:lookup_message(ShardId, GenData, Matcher);
|
||||
not_found ->
|
||||
not_found
|
||||
end.
|
||||
|
||||
-spec list_generations_with_lifetimes(shard_id()) ->
|
||||
#{
|
||||
gen_id() => #{
|
||||
|
|
|
@ -21,6 +21,8 @@
|
|||
%% used for testing.
|
||||
-module(emqx_ds_storage_reference).
|
||||
|
||||
-include("emqx_ds.hrl").
|
||||
|
||||
-behaviour(emqx_ds_storage_layer).
|
||||
|
||||
%% API:
|
||||
|
@ -39,7 +41,8 @@
|
|||
make_delete_iterator/5,
|
||||
update_iterator/4,
|
||||
next/6,
|
||||
delete_next/7
|
||||
delete_next/7,
|
||||
lookup_message/3
|
||||
]).
|
||||
|
||||
%% internal exports:
|
||||
|
@ -49,6 +52,8 @@
|
|||
|
||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||
|
||||
-define(DB_KEY(TIMESTAMP), <<TIMESTAMP:64>>).
|
||||
|
||||
%%================================================================================
|
||||
%% Type declarations
|
||||
%%================================================================================
|
||||
|
@ -102,23 +107,22 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
|
|||
ok = rocksdb:drop_column_family(DBHandle, CFHandle),
|
||||
ok.
|
||||
|
||||
prepare_batch(_ShardId, _Data, Messages, _Options) ->
|
||||
{ok, Messages}.
|
||||
prepare_batch(_ShardId, _Data, Batch, _Options) ->
|
||||
{ok, Batch}.
|
||||
|
||||
commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, Options) ->
|
||||
{ok, Batch} = rocksdb:batch(),
|
||||
lists:foreach(
|
||||
fun({TS, Msg}) ->
|
||||
Key = <<TS:64>>,
|
||||
Val = term_to_binary(Msg),
|
||||
rocksdb:batch_put(Batch, CF, Key, Val)
|
||||
end,
|
||||
Messages
|
||||
),
|
||||
Res = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)),
|
||||
rocksdb:release_batch(Batch),
|
||||
commit_batch(_ShardId, S = #s{db = DB}, Batch, Options) ->
|
||||
{ok, BatchHandle} = rocksdb:batch(),
|
||||
lists:foreach(fun(Op) -> process_batch_operation(S, Op, BatchHandle) end, Batch),
|
||||
Res = rocksdb:write_batch(DB, BatchHandle, write_batch_opts(Options)),
|
||||
rocksdb:release_batch(BatchHandle),
|
||||
Res.
|
||||
|
||||
process_batch_operation(S, {TS, Msg = #message{}}, BatchHandle) ->
|
||||
Val = encode_message(Msg),
|
||||
rocksdb:batch_put(BatchHandle, S#s.cf, ?DB_KEY(TS), Val);
|
||||
process_batch_operation(S, {delete, #message_matcher{timestamp = TS}}, BatchHandle) ->
|
||||
rocksdb:batch_delete(BatchHandle, S#s.cf, ?DB_KEY(TS)).
|
||||
|
||||
get_streams(_Shard, _Data, _TopicFilter, _StartTime) ->
|
||||
[#stream{}].
|
||||
|
||||
|
@ -205,6 +209,16 @@ delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now, IsCurr
|
|||
{ok, It, NumDeleted, NumIterated}
|
||||
end.
|
||||
|
||||
lookup_message(_ShardId, #s{db = DB, cf = CF}, #message_matcher{timestamp = TS}) ->
|
||||
case rocksdb:get(DB, CF, ?DB_KEY(TS), _ReadOpts = []) of
|
||||
{ok, Val} ->
|
||||
decode_message(Val);
|
||||
not_found ->
|
||||
not_found;
|
||||
{error, Reason} ->
|
||||
{error, unrecoverable, Reason}
|
||||
end.
|
||||
|
||||
%%================================================================================
|
||||
%% Internal functions
|
||||
%%================================================================================
|
||||
|
@ -214,7 +228,7 @@ do_next(_, _, _, _, 0, Key, Acc) ->
|
|||
do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) ->
|
||||
case rocksdb:iterator_move(IT, Action) of
|
||||
{ok, Key = <<TS:64>>, Blob} ->
|
||||
Msg = #message{topic = Topic} = binary_to_term(Blob),
|
||||
Msg = #message{topic = Topic} = decode_message(Blob),
|
||||
TopicWords = emqx_topic:words(Topic),
|
||||
case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of
|
||||
true ->
|
||||
|
@ -234,7 +248,7 @@ do_delete_next(
|
|||
) ->
|
||||
case rocksdb:iterator_move(IT, Action) of
|
||||
{ok, Key, Blob} ->
|
||||
Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob),
|
||||
Msg = #message{topic = Topic, timestamp = TS} = decode_message(Blob),
|
||||
TopicWords = emqx_topic:words(Topic),
|
||||
case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of
|
||||
true ->
|
||||
|
@ -285,6 +299,12 @@ do_delete_next(
|
|||
{Key0, {AccDel, AccIter}}
|
||||
end.
|
||||
|
||||
encode_message(Msg) ->
|
||||
term_to_binary(Msg).
|
||||
|
||||
decode_message(Val) ->
|
||||
binary_to_term(Val).
|
||||
|
||||
%% @doc Generate a column family ID for the MQTT messages
|
||||
-spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
|
||||
data_cf(GenId) ->
|
||||
|
|
|
@ -33,7 +33,8 @@
|
|||
make_delete_iterator/5,
|
||||
update_iterator/4,
|
||||
next/6,
|
||||
delete_next/7
|
||||
delete_next/7,
|
||||
lookup_message/3
|
||||
]).
|
||||
|
||||
%% internal exports:
|
||||
|
@ -43,6 +44,7 @@
|
|||
|
||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||
-include_lib("snabbkaffe/include/trace.hrl").
|
||||
-include("emqx_ds.hrl").
|
||||
-include("emqx_ds_metrics.hrl").
|
||||
|
||||
-ifdef(TEST).
|
||||
|
@ -56,11 +58,12 @@
|
|||
%%================================================================================
|
||||
|
||||
%% TLOG entry
|
||||
%% keys:
|
||||
-define(cooked_payloads, 6).
|
||||
%% Keys:
|
||||
-define(cooked_msg_ops, 6).
|
||||
-define(cooked_lts_ops, 7).
|
||||
%% Payload:
|
||||
-define(cooked_payload(TIMESTAMP, STATIC, VARYING, VALUE),
|
||||
-define(cooked_delete, 100).
|
||||
-define(cooked_msg_op(TIMESTAMP, STATIC, VARYING, VALUE),
|
||||
{TIMESTAMP, STATIC, VARYING, VALUE}
|
||||
).
|
||||
|
||||
|
@ -176,25 +179,39 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{data_cf = DataCF, trie_cf = TrieCF,
    ok = rocksdb:drop_column_family(DBHandle, TrieCF),
    ok.

prepare_batch(_ShardId, S = #s{trie = Trie}, Messages, _Options) ->
prepare_batch(
    _ShardId,
    S = #s{trie = Trie},
    Operations,
    _Options
) ->
    _ = erase(?lts_persist_ops),
    Payloads = [
        begin
            Tokens = words(Topic),
            {Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
            ?cooked_payload(Timestamp, Static, Varying, serialize(S, Varying, Msg))
        end
     || {Timestamp, Msg = #message{topic = Topic}} <- Messages
    ],
    OperationsCooked = emqx_utils:flattermap(
        fun
            ({Timestamp, Msg = #message{topic = Topic}}) ->
                Tokens = words(Topic),
                {Static, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens),
                ?cooked_msg_op(Timestamp, Static, Varying, serialize(S, Varying, Msg));
            ({delete, #message_matcher{topic = Topic, timestamp = Timestamp}}) ->
                case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of
                    {ok, {Static, Varying}} ->
                        ?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete);
                    undefined ->
                        %% Topic is unknown, nothing to delete.
                        []
                end
        end,
        Operations
    ),
    {ok, #{
        ?cooked_payloads => Payloads,
        ?cooked_msg_ops => OperationsCooked,
        ?cooked_lts_ops => pop_lts_persist_ops()
    }}.
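For orientation, a purely illustrative sketch (not part of the commit; every value below is made up) of the cooked batch that prepare_batch/4 hands to commit_batch/4 for one insert plus one delete. The literal keys 6 and 7 stand for ?cooked_msg_ops and ?cooked_lts_ops, and 100 is the ?cooked_delete marker:

%% Illustration only: hypothetical values, not produced by the commit's code.
example_cooked_batch() ->
    Static = <<0, 1>>,
    #{
        %% ?cooked_msg_ops: one serialized insert, one delete marker
        6 => [
            {1723000000000, Static, [<<"c1">>], <<"opaque-serialized-msg">>},
            {1723000000001, Static, [<<"c1">>], 100}
        ],
        %% ?cooked_lts_ops: LTS trie updates learned while cooking (none here)
        7 => []
    }.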

commit_batch(
    _ShardId,
    #s{db = DB, trie_cf = TrieCF, data_cf = DataCF, trie = Trie, hash_bytes = HashBytes},
    #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads},
    #{?cooked_lts_ops := LtsOps, ?cooked_msg_ops := Operations},
    Options
) ->
    {ok, Batch} = rocksdb:batch(),

@ -210,12 +227,17 @@ commit_batch(
    _ = emqx_ds_lts:trie_update(Trie, LtsOps),
    %% Commit payloads:
    lists:foreach(
        fun(?cooked_payload(Timestamp, Static, Varying, ValBlob)) ->
            MasterKey = mk_key(Static, 0, <<>>, Timestamp),
            ok = rocksdb:batch_put(Batch, DataCF, MasterKey, ValBlob),
            mk_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp)
        fun
            (?cooked_msg_op(Timestamp, Static, Varying, ValBlob = <<_/bytes>>)) ->
                MasterKey = mk_key(Static, 0, <<>>, Timestamp),
                ok = rocksdb:batch_put(Batch, DataCF, MasterKey, ValBlob),
                mk_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp);
            (?cooked_msg_op(Timestamp, Static, Varying, ?cooked_delete)) ->
                MasterKey = mk_key(Static, 0, <<>>, Timestamp),
                ok = rocksdb:batch_delete(Batch, DataCF, MasterKey),
                delete_index(Batch, DataCF, HashBytes, Static, Varying, Timestamp)
        end,
        Payloads
        Operations
    ),
    Result = rocksdb:write_batch(DB, Batch, [
        {disable_wal, not maps:get(durable, Options, true)}

@ -285,6 +307,28 @@ delete_next(Shard, S, It0, Selector, BatchSize, Now, IsCurrent) ->
            Ret
    end.

lookup_message(
    Shard,
    S = #s{db = DB, data_cf = CF, trie = Trie},
    #message_matcher{topic = Topic, timestamp = Timestamp}
) ->
    case emqx_ds_lts:lookup_topic_key(Trie, words(Topic)) of
        {ok, {StaticIdx, _Varying}} ->
            DSKey = mk_key(StaticIdx, 0, <<>>, Timestamp),
            case rocksdb:get(DB, CF, DSKey, _ReadOpts = []) of
                {ok, Val} ->
                    {ok, TopicStructure} = emqx_ds_lts:reverse_lookup(Trie, StaticIdx),
                    Msg = deserialize(S, Val),
                    enrich(Shard, S, TopicStructure, DSKey, Msg);
                not_found ->
                    not_found;
                {error, Reason} ->
                    {error, unrecoverable, Reason}
            end;
        undefined ->
            not_found
    end.

%%================================================================================
%% Internal exports
%%================================================================================

@ -330,12 +374,18 @@ serialize(#s{serialization_schema = SSchema, with_guid = WithGuid}, Varying, Msg
    },
    emqx_ds_msg_serializer:serialize(SSchema, Msg).

enrich(#ctx{shard = Shard, s = S, topic_structure = TopicStructure}, DSKey, Msg0) ->
    enrich(Shard, S, TopicStructure, DSKey, Msg0).

enrich(
    #ctx{shard = Shard, topic_structure = Structure, s = #s{with_guid = WithGuid}},
    Shard,
    #s{with_guid = WithGuid},
    TopicStructure,
    DSKey,
    Msg0
) ->
    Topic = emqx_topic:join(emqx_ds_lts:decompress_topic(Structure, words(Msg0#message.topic))),
    Tokens = words(Msg0#message.topic),
    Topic = emqx_topic:join(emqx_ds_lts:decompress_topic(TopicStructure, Tokens)),
    Msg0#message{
        topic = Topic,
        id =

@ -584,6 +634,16 @@ mk_index(Batch, CF, HashBytes, Static, Timestamp, N, [TopicLevel | Varying]) ->
mk_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) ->
    ok.

delete_index(Batch, CF, HashBytes, Static, Varying, Timestamp) ->
    delete_index(Batch, CF, HashBytes, Static, Timestamp, 1, Varying).

delete_index(Batch, CF, HashBytes, Static, Timestamp, N, [TopicLevel | Varying]) ->
    Key = mk_key(Static, N, hash(HashBytes, TopicLevel), Timestamp),
    ok = rocksdb:batch_delete(Batch, CF, Key),
    delete_index(Batch, CF, HashBytes, Static, Timestamp, N + 1, Varying);
delete_index(_Batch, _CF, _HashBytes, _Static, _Timestamp, _N, []) ->
    ok.

%%%%%%%% Keys %%%%%%%%%%

get_key_range(StaticIdx, WildcardIdx, Hash) ->

@ -18,11 +18,14 @@
-compile(export_all).
-compile(nowarn_export_all).

-include("emqx_ds.hrl").
-include("../../emqx/include/emqx.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("stdlib/include/assert.hrl").

-define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))).

-define(FUTURE, (1 bsl 64 - 1)).

-define(SHARD, shard(?FUNCTION_NAME)).

@ -66,6 +69,30 @@ t_store(_Config) ->
    },
    ?assertMatch(ok, emqx_ds:store_batch(?FUNCTION_NAME, [Msg])).

%% Smoke test of applying batch operations
t_operations(db_config, _Config) ->
    #{force_monotonic_timestamps => false}.

t_operations(_Config) ->
    Batch1 = [
        make_message(100, <<"t/1">>, <<"M1">>),
        make_message(200, <<"t/2">>, <<"M2">>),
        make_message(300, <<"t/3">>, <<"M3">>)
    ],
    Batch2 = [
        make_deletion(200, <<"t/2">>, <<"M2">>),
        make_deletion(300, <<"t/3">>, '_'),
        make_deletion(400, <<"t/4">>, '_')
    ],
    ?assertEqual(ok, emqx_ds:store_batch(?FUNCTION_NAME, Batch1)),
    ?assertEqual(ok, emqx_ds:store_batch(?FUNCTION_NAME, Batch2)),
    ?assertMatch(
        [
            #message{timestamp = 100, topic = <<"t/1">>, payload = <<"M1">>}
        ],
        dump_messages(?SHARD, <<"t/#">>, 0)
    ).

%% Smoke test for iteration through a concrete topic
t_iterate(_Config) ->
    %% Prepare data:

@ -124,8 +151,6 @@ t_delete(_Config) ->
    ?assertNot(is_map_key(TopicToDelete, MessagesByTopic), #{msgs => MessagesByTopic}),
    ?assertEqual(20, length(Messages)).

-define(assertSameSet(A, B), ?assertEqual(lists:sort(A), lists:sort(B))).

%% Smoke test that verifies that concrete topics are mapped to
%% individual streams, unless there's too many of them.
t_get_streams(Config) ->

@ -417,79 +442,26 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) ->
%% || Topic <- Topics, PublishedAt <- Timestamps
%% ].

%% t_iterate_multigen(_Config) ->
%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 1000, ?DEFAULT_CONFIG),
%% Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"],
%% Timestamps = lists:seq(1, 100),
%% _ = [
%% store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt}))
%% || Topic <- Topics, PublishedAt <- Timestamps
%% ],
%% ?assertEqual(
%% lists:sort([
%% {Topic, PublishedAt}
%% || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps
%% ]),
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)])
%% ),
%% ?assertEqual(
%% lists:sort([
%% {Topic, PublishedAt}
%% || Topic <- ["a", "a/bar"], PublishedAt <- lists:seq(60, 100)
%% ]),
%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 60)])
%% ).

%% t_iterate_multigen_preserve_restore(_Config) ->
%% ReplayID = atom_to_binary(?FUNCTION_NAME),
%% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 10, ?COMPACT_CONFIG),
%% {ok, 2} = emqx_ds_storage_layer:create_generation(?SHARD, 50, ?DEFAULT_CONFIG),
%% {ok, 3} = emqx_ds_storage_layer:create_generation(?SHARD, 100, ?DEFAULT_CONFIG),
%% Topics = ["foo/bar", "foo/bar/baz", "a/bar"],
%% Timestamps = lists:seq(1, 100),
%% TopicFilter = "foo/#",
%% TopicsMatching = ["foo/bar", "foo/bar/baz"],
%% _ = [
%% store(?SHARD, TS, Topic, term_to_binary({Topic, TS}))
%% || Topic <- Topics, TS <- Timestamps
%% ],
%% It0 = iterator(?SHARD, TopicFilter, 0),
%% {It1, Res10} = iterate(It0, 10),
%% % preserve mid-generation
%% ok = emqx_ds_storage_layer:preserve_iterator(It1, ReplayID),
%% {ok, It2} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
%% {It3, Res100} = iterate(It2, 88),
%% % preserve on the generation boundary
%% ok = emqx_ds_storage_layer:preserve_iterator(It3, ReplayID),
%% {ok, It4} = emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID),
%% {It5, Res200} = iterate(It4, 1000),
%% ?assertEqual({end_of_stream, []}, iterate(It5, 1)),
%% ?assertEqual(
%% lists:sort([{Topic, TS} || Topic <- TopicsMatching, TS <- Timestamps]),
%% lists:sort([binary_to_term(Payload) || Payload <- Res10 ++ Res100 ++ Res200])
%% ),
%% ?assertEqual(
%% ok,
%% emqx_ds_storage_layer:discard_iterator(?SHARD, ReplayID)
%% ),
%% ?assertEqual(
%% {error, not_found},
%% emqx_ds_storage_layer:restore_iterator(?SHARD, ReplayID)
%% ).

make_message(PublishedAt, Topic, Payload) when is_list(Topic) ->
    make_message(PublishedAt, list_to_binary(Topic), Payload);
make_message(PublishedAt, Topic, Payload) when is_binary(Topic) ->
    ID = emqx_guid:gen(),
    #message{
        id = ID,
        from = <<?MODULE_STRING>>,
        topic = Topic,
        timestamp = PublishedAt,
        payload = Payload
    }.

make_deletion(Timestamp, Topic, Payload) ->
    {delete, #message_matcher{
        from = <<?MODULE_STRING>>,
        topic = Topic,
        timestamp = Timestamp,
        payload = Payload
    }}.

make_topic(Tokens = [_ | _]) ->
    emqx_topic:join([bin(T) || T <- Tokens]).

@ -535,13 +507,23 @@ end_per_suite(Config) ->
    ok.

init_per_testcase(TC, Config) ->
    ok = emqx_ds:open_db(TC, ?DB_CONFIG(Config)),
    ok = emqx_ds:open_db(TC, db_config(TC, Config)),
    Config.

end_per_testcase(TC, _Config) ->
    emqx_ds:drop_db(TC),
    ok.

db_config(TC, Config) ->
    ConfigBase = ?DB_CONFIG(Config),
    SpecificConfig =
        try
            ?MODULE:TC(?FUNCTION_NAME, Config)
        catch
            error:undef -> #{}
        end,
    maps:merge(ConfigBase, SpecificConfig).

shard(TC) ->
    {TC, <<"0">>}.

@ -377,7 +377,7 @@ nodes_of_clientid(DB, ClientId, Nodes = [N0 | _]) ->
shard_of_clientid(DB, Node, ClientId) ->
    ?ON(
        Node,
        emqx_ds_buffer:shard_of_message(DB, #message{from = ClientId}, clientid)
        emqx_ds_buffer:shard_of_operation(DB, #message{from = ClientId}, clientid)
    ).

%% Consume eagerly: