feat(dsraft): support atomic batches + preconditions

This commit is contained in:
Andrew Mayorov 2024-07-30 16:55:38 +02:00 committed by Thales Macedo Garitezi
parent 11951f8f6c
commit 5356d678cc
5 changed files with 207 additions and 65 deletions

View File

@ -83,6 +83,7 @@
ra_state/0
]).
-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
-include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-include("emqx_ds_replication_layer.hrl").
@ -135,11 +136,12 @@
?enc := emqx_ds_storage_layer:delete_iterator()
}.
%% TODO: this type is obsolete and is kept only for compatibility with
%% BPAPIs. Remove it when emqx_ds_proto_v4 is gone (EMQX 5.6)
%% Write batch.
%% Instances of this type currently form the mojority of the Raft log.
-type batch() :: #{
?tag := ?BATCH,
?batch_messages := [emqx_types:message()]
?batch_operations := [emqx_ds:operation()],
?batch_preconditions => [emqx_ds:precondition()]
}.
-type generation_rank() :: {shard_id(), term()}.
@ -240,16 +242,45 @@ drop_db(DB) ->
_ = emqx_ds_proto_v4:drop_db(list_nodes(), DB),
emqx_ds_replication_layer_meta:drop_db(DB).
-spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) ->
-spec store_batch(emqx_ds:db(), emqx_ds:batch(), emqx_ds:message_store_opts()) ->
emqx_ds:store_batch_result().
store_batch(DB, Messages, Opts) ->
store_batch(DB, Batch = #dsbatch{preconditions = [_ | _]}, Opts) ->
%% NOTE: Atomic batch is implied, will not check with DB config.
store_batch_atomic(DB, Batch, Opts);
store_batch(DB, Batch, Opts) ->
case emqx_ds_replication_layer_meta:db_config(DB) of
#{atomic_batches := true} ->
store_batch_atomic(DB, Batch, Opts);
#{} ->
store_batch_buffered(DB, Batch, Opts)
end.
store_batch_buffered(DB, #dsbatch{operations = Operations}, Opts) ->
store_batch_buffered(DB, Operations, Opts);
store_batch_buffered(DB, Batch, Opts) ->
try
emqx_ds_buffer:store_batch(DB, Messages, Opts)
emqx_ds_buffer:store_batch(DB, Batch, Opts)
catch
error:{Reason, _Call} when Reason == timeout; Reason == noproc ->
{error, recoverable, Reason}
end.
store_batch_atomic(DB, Batch, _Opts) ->
Shards = shards_of_batch(DB, Batch),
case Shards of
[Shard] ->
case ra_store_batch(DB, Shard, Batch) of
{timeout, ServerId} ->
{error, recoverable, {timeout, ServerId}};
Result ->
Result
end;
[] ->
ok;
[_ | _] ->
{error, unrecoverable, nonatomic_batch_spans_multiple_storages}
end.
-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
[{emqx_ds:stream_rank(), stream()}].
get_streams(DB, TopicFilter, StartTime) ->
@ -418,6 +449,23 @@ shard_of_key(DB, Key) ->
Hash = erlang:phash2(Key, N),
integer_to_binary(Hash).
shards_of_batch(DB, #dsbatch{operations = Operations, preconditions = Preconditions}) ->
shards_of_batch(DB, Preconditions, shards_of_batch(DB, Operations, []));
shards_of_batch(DB, Operations) ->
shards_of_batch(DB, Operations, []).
shards_of_batch(DB, [Operation | Rest], Acc) ->
case shard_of_operation(DB, Operation, clientid, #{}) of
Shard when Shard =:= hd(Acc) ->
shards_of_batch(DB, Rest, Acc);
Shard when Acc =:= [] ->
shards_of_batch(DB, Rest, [Shard]);
ShardAnother ->
[ShardAnother | Acc]
end;
shards_of_batch(_DB, [], Acc) ->
Acc.
%%================================================================================
%% Internal exports (RPC targets)
%%================================================================================
@ -639,13 +687,22 @@ list_nodes() ->
end
).
-spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()]) ->
ok | {timeout, _} | {error, recoverable | unrecoverable, _Err}.
ra_store_batch(DB, Shard, Messages) ->
-spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:batch()) ->
ok | {timeout, _} | emqx_ds:error(_).
ra_store_batch(DB, Shard, Batch) ->
case Batch of
#dsbatch{operations = Operations, preconditions = Preconditions} ->
Command = #{
?tag => ?BATCH,
?batch_messages => Messages
},
?batch_operations => Operations,
?batch_preconditions => Preconditions
};
Operations ->
Command = #{
?tag => ?BATCH,
?batch_operations => Operations
}
end,
Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case emqx_ds_replication_layer_shard:process_command(Servers, Command, ?RA_TIMEOUT) of
{ok, Result, _Leader} ->
@ -782,6 +839,7 @@ ra_drop_shard(DB, Shard) ->
-define(pd_ra_idx_need_release, '$emqx_ds_raft_idx_need_release').
-define(pd_ra_bytes_need_release, '$emqx_ds_raft_bytes_need_release').
-define(pd_ra_force_monotonic, '$emqx_ds_raft_force_monotonic').
-spec init(_Args :: map()) -> ra_state().
init(#{db := DB, shard := Shard}) ->
@ -791,18 +849,30 @@ init(#{db := DB, shard := Shard}) ->
{ra_state(), _Reply, _Effects}.
apply(
RaftMeta,
#{
Command = #{
?tag := ?BATCH,
?batch_messages := MessagesIn
?batch_operations := OperationsIn
},
#{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0
) ->
?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}),
{Stats, Latest, Messages} = assign_timestamps(Latest0, MessagesIn),
Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{durable => false}),
?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => OperationsIn, latest => Latest0}),
Preconditions = maps:get(?batch_preconditions, Command, []),
{Stats, Latest, Operations} = assign_timestamps(DB, Latest0, OperationsIn),
%% FIXME
case emqx_ds_precondition:verify(emqx_ds_storage_layer, DBShard, Preconditions) of
ok ->
Result = emqx_ds_storage_layer:store_batch(DBShard, Operations, #{durable => false}),
State = State0#{latest := Latest},
set_ts(DBShard, Latest),
Effects = try_release_log(Stats, RaftMeta, State),
Effects = try_release_log(Stats, RaftMeta, State);
PreconditionFailed = {precondition_failed, _} ->
Result = {error, unrecoverable, PreconditionFailed},
State = State0,
Effects = [];
Result ->
State = State0,
Effects = []
end,
Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}),
{State, Result, Effects};
apply(
@ -877,6 +947,21 @@ apply(
Effects = handle_custom_event(DBShard, Latest, CustomEvent),
{State#{latest => Latest}, ok, Effects}.
assign_timestamps(DB, Latest, Messages) ->
ForceMonotonic = force_monotonic_timestamps(DB),
assign_timestamps(ForceMonotonic, Latest, Messages, [], 0, 0).
force_monotonic_timestamps(DB) ->
case erlang:get(?pd_ra_force_monotonic) of
undefined ->
DBConfig = emqx_ds_replication_layer_meta:db_config(DB),
Flag = maps:get(force_monotonic_timestamps, DBConfig),
erlang:put(?pd_ra_force_monotonic, Flag);
Flag ->
ok
end,
Flag.
try_release_log({_N, BatchSize}, RaftMeta = #{index := CurrentIdx}, State) ->
%% NOTE
%% Because cursor release means storage flush (see
@ -939,10 +1024,7 @@ tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, timestamp => Timestamp}),
handle_custom_event(DBShard, Timestamp, tick).
assign_timestamps(Latest, Messages) ->
assign_timestamps(Latest, Messages, [], 0, 0).
assign_timestamps(Latest0, [Message0 | Rest], Acc, N, Sz) ->
assign_timestamps(true, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) ->
case emqx_message:timestamp(Message0, microsecond) of
TimestampUs when TimestampUs > Latest0 ->
Latest = TimestampUs,
@ -951,8 +1033,17 @@ assign_timestamps(Latest0, [Message0 | Rest], Acc, N, Sz) ->
Latest = Latest0 + 1,
Message = assign_timestamp(Latest, Message0)
end,
assign_timestamps(Latest, Rest, [Message | Acc], N + 1, Sz + approx_message_size(Message0));
assign_timestamps(Latest, [], Acc, N, Size) ->
MSize = approx_message_size(Message0),
assign_timestamps(true, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
assign_timestamps(false, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) ->
Timestamp = emqx_message:timestamp(Message0),
Latest = max(Latest0, Timestamp),
Message = assign_timestamp(Timestamp, Message0),
MSize = approx_message_size(Message0),
assign_timestamps(false, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
assign_timestamps(ForceMonotonic, Latest, [Operation | Rest], Acc, N, Sz) ->
assign_timestamps(ForceMonotonic, Latest, Rest, [Operation | Acc], N + 1, Sz);
assign_timestamps(_ForceMonotonic, Latest, [], Acc, N, Size) ->
{{N, Size}, Latest, lists:reverse(Acc)}.
assign_timestamp(TimestampUs, Message) ->

View File

@ -19,7 +19,8 @@
-define(enc, 3).
%% ?BATCH
-define(batch_messages, 2).
-define(batch_operations, 2).
-define(batch_preconditions, 4).
-define(timestamp, 3).
%% add_generation / update_config

View File

@ -56,6 +56,7 @@
topic/0,
batch/0,
operation/0,
deletion/0,
precondition/0,
stream/0,
delete_stream/0,
@ -110,7 +111,9 @@
message()
%% Delete a message.
%% Does nothing if the message does not exist.
| {delete, message_matcher('_')}.
| deletion().
-type deletion() :: {delete, message_matcher('_')}.
%% Precondition.
%% Fails whole batch if the storage already has the matching message (`if_exists'),

View File

@ -37,6 +37,9 @@
next/4,
delete_next/5,
%% Preconditions
lookup_message/2,
%% Generations
update_config/3,
add_generation/2,
@ -74,6 +77,7 @@
batch_store_opts/0
]).
-include("emqx_ds.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
@ -115,6 +119,11 @@
-type gen_id() :: 0..16#ffff.
-type batch() :: [
{emqx_ds:time(), emqx_types:message()}
| emqx_ds:deletion()
].
%% Options affecting how batches should be stored.
%% See also: `emqx_ds:message_store_opts()'.
-type batch_store_opts() ::
@ -294,6 +303,10 @@
| {ok, end_of_stream}
| emqx_ds:error(_).
%% Lookup a single message, for preconditions to work.
-callback lookup_message(shard_id(), generation_data(), emqx_ds_precondition:matcher()) ->
emqx_types:message() | not_found | emqx_ds:error(_).
-callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) ->
[CustomEvent].
@ -317,14 +330,10 @@ drop_shard(Shard) ->
%% @doc This is a convenicence wrapper that combines `prepare' and
%% `commit' operations.
-spec store_batch(
shard_id(),
[{emqx_ds:time(), emqx_types:message()}],
batch_store_opts()
) ->
-spec store_batch(shard_id(), batch(), batch_store_opts()) ->
emqx_ds:store_batch_result().
store_batch(Shard, Messages, Options) ->
case prepare_batch(Shard, Messages, #{}) of
store_batch(Shard, Batch, Options) ->
case prepare_batch(Shard, Batch, #{}) of
{ok, CookedBatch} ->
commit_batch(Shard, CookedBatch, Options);
ignore ->
@ -342,23 +351,21 @@ store_batch(Shard, Messages, Options) ->
%%
%% The underlying storage layout MAY use timestamp as a unique message
%% ID.
-spec prepare_batch(
shard_id(),
[{emqx_ds:time(), emqx_types:message()}],
batch_prepare_opts()
) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_).
prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
-spec prepare_batch(shard_id(), batch(), batch_prepare_opts()) ->
{ok, cooked_batch()} | ignore | emqx_ds:error(_).
prepare_batch(Shard, Batch, Options) ->
%% NOTE
%% We assume that batches do not span generations. Callers should enforce this.
?tp(emqx_ds_storage_layer_prepare_batch, #{
shard => Shard, messages => Messages, options => Options
shard => Shard, batch => Batch, options => Options
}),
%% FIXME: always store messages in the current generation
case generation_at(Shard, Time) of
Time = batch_starts_at(Batch),
case is_integer(Time) andalso generation_at(Shard, Time) of
{GenId, #{module := Mod, data := GenData}} ->
T0 = erlang:monotonic_time(microsecond),
Result =
case Mod:prepare_batch(Shard, GenData, Messages, Options) of
case Mod:prepare_batch(Shard, GenData, Batch, Options) of
{ok, CookedBatch} ->
{ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}};
Error = {error, _, _} ->
@ -368,11 +375,21 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
%% TODO store->prepare
emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
Result;
false ->
%% No write operations in this batch.
ignore;
not_found ->
%% Generation is likely already GCed.
ignore
end;
prepare_batch(_Shard, [], _Options) ->
ignore.
end.
-spec batch_starts_at(batch()) -> emqx_ds:time() | undefined.
batch_starts_at([{Time, _Message} | _]) when is_integer(Time) ->
Time;
batch_starts_at([{delete, #message_matcher{timestamp = Time}} | _]) ->
Time;
batch_starts_at([]) ->
undefined.
%% @doc Commit cooked batch to the storage.
%%
@ -559,6 +576,16 @@ update_config(ShardId, Since, Options) ->
add_generation(ShardId, Since) ->
gen_server:call(?REF(ShardId), #call_add_generation{since = Since}, infinity).
-spec lookup_message(shard_id(), emqx_ds_precondition:matcher()) ->
emqx_types:message() | not_found | emqx_ds:error(_).
lookup_message(ShardId, Matcher = #message_matcher{timestamp = Time}) ->
case generation_at(ShardId, Time) of
{_GenId, #{module := Mod, data := GenData}} ->
Mod:lookup_message(ShardId, GenData, Matcher);
not_found ->
not_found
end.
-spec list_generations_with_lifetimes(shard_id()) ->
#{
gen_id() => #{

View File

@ -21,6 +21,8 @@
%% used for testing.
-module(emqx_ds_storage_reference).
-include("emqx_ds.hrl").
-behaviour(emqx_ds_storage_layer).
%% API:
@ -39,7 +41,8 @@
make_delete_iterator/5,
update_iterator/4,
next/6,
delete_next/7
delete_next/7,
lookup_message/3
]).
%% internal exports:
@ -49,6 +52,8 @@
-include_lib("emqx_utils/include/emqx_message.hrl").
-define(DB_KEY(TIMESTAMP), <<TIMESTAMP:64>>).
%%================================================================================
%% Type declarations
%%================================================================================
@ -102,23 +107,22 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
ok = rocksdb:drop_column_family(DBHandle, CFHandle),
ok.
prepare_batch(_ShardId, _Data, Messages, _Options) ->
{ok, Messages}.
prepare_batch(_ShardId, _Data, Batch, _Options) ->
{ok, Batch}.
commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, Options) ->
{ok, Batch} = rocksdb:batch(),
lists:foreach(
fun({TS, Msg}) ->
Key = <<TS:64>>,
Val = term_to_binary(Msg),
rocksdb:batch_put(Batch, CF, Key, Val)
end,
Messages
),
Res = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)),
rocksdb:release_batch(Batch),
commit_batch(_ShardId, S = #s{db = DB}, Batch, Options) ->
{ok, BatchHandle} = rocksdb:batch(),
lists:foreach(fun(Op) -> process_batch_operation(S, Op, BatchHandle) end, Batch),
Res = rocksdb:write_batch(DB, BatchHandle, write_batch_opts(Options)),
rocksdb:release_batch(BatchHandle),
Res.
process_batch_operation(S, {TS, Msg = #message{}}, BatchHandle) ->
Val = encode_message(Msg),
rocksdb:batch_put(BatchHandle, S#s.cf, ?DB_KEY(TS), Val);
process_batch_operation(S, {delete, #message_matcher{timestamp = TS}}, BatchHandle) ->
rocksdb:batch_delete(BatchHandle, S#s.cf, ?DB_KEY(TS)).
get_streams(_Shard, _Data, _TopicFilter, _StartTime) ->
[#stream{}].
@ -205,6 +209,16 @@ delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now, IsCurr
{ok, It, NumDeleted, NumIterated}
end.
lookup_message(_ShardId, #s{db = DB, cf = CF}, #message_matcher{timestamp = TS}) ->
case rocksdb:get(DB, CF, ?DB_KEY(TS), _ReadOpts = []) of
{ok, Val} ->
decode_message(Val);
not_found ->
not_found;
{error, Reason} ->
{error, unrecoverable, Reason}
end.
%%================================================================================
%% Internal functions
%%================================================================================
@ -214,7 +228,7 @@ do_next(_, _, _, _, 0, Key, Acc) ->
do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) ->
case rocksdb:iterator_move(IT, Action) of
{ok, Key = <<TS:64>>, Blob} ->
Msg = #message{topic = Topic} = binary_to_term(Blob),
Msg = #message{topic = Topic} = decode_message(Blob),
TopicWords = emqx_topic:words(Topic),
case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of
true ->
@ -234,7 +248,7 @@ do_delete_next(
) ->
case rocksdb:iterator_move(IT, Action) of
{ok, Key, Blob} ->
Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob),
Msg = #message{topic = Topic, timestamp = TS} = decode_message(Blob),
TopicWords = emqx_topic:words(Topic),
case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of
true ->
@ -285,6 +299,12 @@ do_delete_next(
{Key0, {AccDel, AccIter}}
end.
encode_message(Msg) ->
term_to_binary(Msg).
decode_message(Val) ->
binary_to_term(Val).
%% @doc Generate a column family ID for the MQTT messages
-spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
data_cf(GenId) ->