From 5356d678cc33b74f64f15a233e0c345436f693bc Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 30 Jul 2024 16:55:38 +0200 Subject: [PATCH] feat(dsraft): support atomic batches + preconditions --- .../src/emqx_ds_replication_layer.erl | 145 ++++++++++++++---- .../src/emqx_ds_replication_layer.hrl | 3 +- apps/emqx_durable_storage/src/emqx_ds.erl | 5 +- .../src/emqx_ds_storage_layer.erl | 65 +++++--- .../src/emqx_ds_storage_reference.erl | 54 +++++-- 5 files changed, 207 insertions(+), 65 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index f54981884..3430a3bda 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -83,6 +83,7 @@ ra_state/0 ]). +-include_lib("emqx_durable_storage/include/emqx_ds.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). -include("emqx_ds_replication_layer.hrl"). @@ -135,11 +136,12 @@ ?enc := emqx_ds_storage_layer:delete_iterator() }. -%% TODO: this type is obsolete and is kept only for compatibility with -%% BPAPIs. Remove it when emqx_ds_proto_v4 is gone (EMQX 5.6) +%% Write batch. +%% Instances of this type currently form the majority of the Raft log. -type batch() :: #{ ?tag := ?BATCH, - ?batch_messages := [emqx_types:message()] + ?batch_operations := [emqx_ds:operation()], + ?batch_preconditions => [emqx_ds:precondition()] }. -type generation_rank() :: {shard_id(), term()}. @@ -240,16 +242,45 @@ drop_db(DB) -> _ = emqx_ds_proto_v4:drop_db(list_nodes(), DB), emqx_ds_replication_layer_meta:drop_db(DB). --spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) -> +-spec store_batch(emqx_ds:db(), emqx_ds:batch(), emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). 
-store_batch(DB, Messages, Opts) -> +store_batch(DB, Batch = #dsbatch{preconditions = [_ | _]}, Opts) -> + %% NOTE: Atomic batch is implied, will not check with DB config. + store_batch_atomic(DB, Batch, Opts); +store_batch(DB, Batch, Opts) -> + case emqx_ds_replication_layer_meta:db_config(DB) of + #{atomic_batches := true} -> + store_batch_atomic(DB, Batch, Opts); + #{} -> + store_batch_buffered(DB, Batch, Opts) + end. + +store_batch_buffered(DB, #dsbatch{operations = Operations}, Opts) -> + store_batch_buffered(DB, Operations, Opts); +store_batch_buffered(DB, Batch, Opts) -> try - emqx_ds_buffer:store_batch(DB, Messages, Opts) + emqx_ds_buffer:store_batch(DB, Batch, Opts) catch error:{Reason, _Call} when Reason == timeout; Reason == noproc -> {error, recoverable, Reason} end. +store_batch_atomic(DB, Batch, _Opts) -> + Shards = shards_of_batch(DB, Batch), + case Shards of + [Shard] -> + case ra_store_batch(DB, Shard, Batch) of + {timeout, ServerId} -> + {error, recoverable, {timeout, ServerId}}; + Result -> + Result + end; + [] -> + ok; + [_ | _] -> + {error, unrecoverable, nonatomic_batch_spans_multiple_storages} + end. + -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), stream()}]. get_streams(DB, TopicFilter, StartTime) -> @@ -418,6 +449,23 @@ shard_of_key(DB, Key) -> Hash = erlang:phash2(Key, N), integer_to_binary(Hash). +shards_of_batch(DB, #dsbatch{operations = Operations, preconditions = Preconditions}) -> + shards_of_batch(DB, Preconditions, shards_of_batch(DB, Operations, [])); +shards_of_batch(DB, Operations) -> + shards_of_batch(DB, Operations, []). + +shards_of_batch(DB, [Operation | Rest], Acc) -> + case shard_of_operation(DB, Operation, clientid, #{}) of + Shard when Shard =:= hd(Acc) -> + shards_of_batch(DB, Rest, Acc); + Shard when Acc =:= [] -> + shards_of_batch(DB, Rest, [Shard]); + ShardAnother -> + [ShardAnother | Acc] + end; +shards_of_batch(_DB, [], Acc) -> + Acc. 
+ %%================================================================================ %% Internal exports (RPC targets) %%================================================================================ @@ -639,13 +687,22 @@ list_nodes() -> end ). --spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()]) -> - ok | {timeout, _} | {error, recoverable | unrecoverable, _Err}. -ra_store_batch(DB, Shard, Messages) -> - Command = #{ - ?tag => ?BATCH, - ?batch_messages => Messages - }, +-spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:batch()) -> + ok | {timeout, _} | emqx_ds:error(_). +ra_store_batch(DB, Shard, Batch) -> + case Batch of + #dsbatch{operations = Operations, preconditions = Preconditions} -> + Command = #{ + ?tag => ?BATCH, + ?batch_operations => Operations, + ?batch_preconditions => Preconditions + }; + Operations -> + Command = #{ + ?tag => ?BATCH, + ?batch_operations => Operations + } + end, Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), case emqx_ds_replication_layer_shard:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> @@ -782,6 +839,7 @@ ra_drop_shard(DB, Shard) -> -define(pd_ra_idx_need_release, '$emqx_ds_raft_idx_need_release'). -define(pd_ra_bytes_need_release, '$emqx_ds_raft_bytes_need_release'). +-define(pd_ra_force_monotonic, '$emqx_ds_raft_force_monotonic'). -spec init(_Args :: map()) -> ra_state(). init(#{db := DB, shard := Shard}) -> @@ -791,18 +849,30 @@ init(#{db := DB, shard := Shard}) -> {ra_state(), _Reply, _Effects}. 
apply( RaftMeta, - #{ + Command = #{ ?tag := ?BATCH, - ?batch_messages := MessagesIn + ?batch_operations := OperationsIn }, #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0 ) -> - ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}), - {Stats, Latest, Messages} = assign_timestamps(Latest0, MessagesIn), - Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{durable => false}), - State = State0#{latest := Latest}, - set_ts(DBShard, Latest), - Effects = try_release_log(Stats, RaftMeta, State), + ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => OperationsIn, latest => Latest0}), + Preconditions = maps:get(?batch_preconditions, Command, []), + {Stats, Latest, Operations} = assign_timestamps(DB, Latest0, OperationsIn), + %% FIXME + case emqx_ds_precondition:verify(emqx_ds_storage_layer, DBShard, Preconditions) of + ok -> + Result = emqx_ds_storage_layer:store_batch(DBShard, Operations, #{durable => false}), + State = State0#{latest := Latest}, + set_ts(DBShard, Latest), + Effects = try_release_log(Stats, RaftMeta, State); + PreconditionFailed = {precondition_failed, _} -> + Result = {error, unrecoverable, PreconditionFailed}, + State = State0, + Effects = []; + Result -> + State = State0, + Effects = [] + end, Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}), {State, Result, Effects}; apply( @@ -877,6 +947,21 @@ apply( Effects = handle_custom_event(DBShard, Latest, CustomEvent), {State#{latest => Latest}, ok, Effects}. +assign_timestamps(DB, Latest, Messages) -> + ForceMonotonic = force_monotonic_timestamps(DB), + assign_timestamps(ForceMonotonic, Latest, Messages, [], 0, 0). + +force_monotonic_timestamps(DB) -> + case erlang:get(?pd_ra_force_monotonic) of + undefined -> + DBConfig = emqx_ds_replication_layer_meta:db_config(DB), + Flag = maps:get(force_monotonic_timestamps, DBConfig), + erlang:put(?pd_ra_force_monotonic, Flag); + Flag -> + ok + end, + Flag. 
+ try_release_log({_N, BatchSize}, RaftMeta = #{index := CurrentIdx}, State) -> %% NOTE %% Because cursor release means storage flush (see @@ -939,10 +1024,7 @@ tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, timestamp => Timestamp}), handle_custom_event(DBShard, Timestamp, tick). -assign_timestamps(Latest, Messages) -> - assign_timestamps(Latest, Messages, [], 0, 0). - -assign_timestamps(Latest0, [Message0 | Rest], Acc, N, Sz) -> +assign_timestamps(true, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) -> case emqx_message:timestamp(Message0, microsecond) of TimestampUs when TimestampUs > Latest0 -> Latest = TimestampUs, @@ -951,8 +1033,17 @@ assign_timestamps(Latest0, [Message0 | Rest], Acc, N, Sz) -> Latest = Latest0 + 1, Message = assign_timestamp(Latest, Message0) end, - assign_timestamps(Latest, Rest, [Message | Acc], N + 1, Sz + approx_message_size(Message0)); -assign_timestamps(Latest, [], Acc, N, Size) -> + MSize = approx_message_size(Message0), + assign_timestamps(true, Latest, Rest, [Message | Acc], N + 1, Sz + MSize); +assign_timestamps(false, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) -> + Timestamp = emqx_message:timestamp(Message0), + Latest = max(Latest0, Timestamp), + Message = assign_timestamp(Timestamp, Message0), + MSize = approx_message_size(Message0), + assign_timestamps(false, Latest, Rest, [Message | Acc], N + 1, Sz + MSize); +assign_timestamps(ForceMonotonic, Latest, [Operation | Rest], Acc, N, Sz) -> + assign_timestamps(ForceMonotonic, Latest, Rest, [Operation | Acc], N + 1, Sz); +assign_timestamps(_ForceMonotonic, Latest, [], Acc, N, Size) -> {{N, Size}, Latest, lists:reverse(Acc)}. 
assign_timestamp(TimestampUs, Message) -> diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.hrl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.hrl index f33090c46..c87e9bcba 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.hrl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.hrl @@ -19,7 +19,8 @@ -define(enc, 3). %% ?BATCH --define(batch_messages, 2). +-define(batch_operations, 2). +-define(batch_preconditions, 4). -define(timestamp, 3). %% add_generation / update_config diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 69de92325..0e4336a26 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -56,6 +56,7 @@ topic/0, batch/0, operation/0, + deletion/0, precondition/0, stream/0, delete_stream/0, @@ -110,7 +111,9 @@ message() %% Delete a message. %% Does nothing if the message does not exist. - | {delete, message_matcher('_')}. + | deletion(). + +-type deletion() :: {delete, message_matcher('_')}. %% Precondition. %% Fails whole batch if the storage already has the matching message (`if_exists'), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index d6250254d..ac2edc254 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -37,6 +37,9 @@ next/4, delete_next/5, + %% Preconditions + lookup_message/2, + %% Generations update_config/3, add_generation/2, @@ -74,6 +77,7 @@ batch_store_opts/0 ]). +-include("emqx_ds.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}). @@ -115,6 +119,11 @@ -type gen_id() :: 0..16#ffff. +-type batch() :: [ + {emqx_ds:time(), emqx_types:message()} + | emqx_ds:deletion() +]. + %% Options affecting how batches should be stored. 
%% See also: `emqx_ds:message_store_opts()'. -type batch_store_opts() :: @@ -294,6 +303,10 @@ | {ok, end_of_stream} | emqx_ds:error(_). +%% Lookup a single message, for preconditions to work. +-callback lookup_message(shard_id(), generation_data(), emqx_ds_precondition:matcher()) -> + emqx_types:message() | not_found | emqx_ds:error(_). + -callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. @@ -317,14 +330,10 @@ drop_shard(Shard) -> %% @doc This is a convenicence wrapper that combines `prepare' and %% `commit' operations. --spec store_batch( - shard_id(), - [{emqx_ds:time(), emqx_types:message()}], - batch_store_opts() -) -> +-spec store_batch(shard_id(), batch(), batch_store_opts()) -> emqx_ds:store_batch_result(). -store_batch(Shard, Messages, Options) -> - case prepare_batch(Shard, Messages, #{}) of +store_batch(Shard, Batch, Options) -> + case prepare_batch(Shard, Batch, #{}) of {ok, CookedBatch} -> commit_batch(Shard, CookedBatch, Options); ignore -> @@ -342,23 +351,21 @@ store_batch(Shard, Messages, Options) -> %% %% The underlying storage layout MAY use timestamp as a unique message %% ID. --spec prepare_batch( - shard_id(), - [{emqx_ds:time(), emqx_types:message()}], - batch_prepare_opts() -) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_). -prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> +-spec prepare_batch(shard_id(), batch(), batch_prepare_opts()) -> + {ok, cooked_batch()} | ignore | emqx_ds:error(_). +prepare_batch(Shard, Batch, Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. 
?tp(emqx_ds_storage_layer_prepare_batch, #{ - shard => Shard, messages => Messages, options => Options + shard => Shard, batch => Batch, options => Options }), %% FIXME: always store messages in the current generation - case generation_at(Shard, Time) of + Time = batch_starts_at(Batch), + case is_integer(Time) andalso generation_at(Shard, Time) of {GenId, #{module := Mod, data := GenData}} -> T0 = erlang:monotonic_time(microsecond), Result = - case Mod:prepare_batch(Shard, GenData, Messages, Options) of + case Mod:prepare_batch(Shard, GenData, Batch, Options) of {ok, CookedBatch} -> {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; Error = {error, _, _} -> @@ -368,11 +375,21 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> %% TODO store->prepare emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), Result; + false -> + %% No write operations in this batch. + ignore; not_found -> + %% Generation is likely already GCed. ignore - end; -prepare_batch(_Shard, [], _Options) -> - ignore. + end. + +-spec batch_starts_at(batch()) -> emqx_ds:time() | undefined. +batch_starts_at([{Time, _Message} | _]) when is_integer(Time) -> + Time; +batch_starts_at([{delete, #message_matcher{timestamp = Time}} | _]) -> + Time; +batch_starts_at([]) -> + undefined. %% @doc Commit cooked batch to the storage. %% @@ -559,6 +576,16 @@ update_config(ShardId, Since, Options) -> add_generation(ShardId, Since) -> gen_server:call(?REF(ShardId), #call_add_generation{since = Since}, infinity). +-spec lookup_message(shard_id(), emqx_ds_precondition:matcher()) -> + emqx_types:message() | not_found | emqx_ds:error(_). +lookup_message(ShardId, Matcher = #message_matcher{timestamp = Time}) -> + case generation_at(ShardId, Time) of + {_GenId, #{module := Mod, data := GenData}} -> + Mod:lookup_message(ShardId, GenData, Matcher); + not_found -> + not_found + end. 
+ -spec list_generations_with_lifetimes(shard_id()) -> #{ gen_id() => #{ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index cfd6f30ac..869602fcd 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -21,6 +21,8 @@ %% used for testing. -module(emqx_ds_storage_reference). +-include("emqx_ds.hrl"). + -behaviour(emqx_ds_storage_layer). %% API: @@ -39,7 +41,8 @@ make_delete_iterator/5, update_iterator/4, next/6, - delete_next/7 + delete_next/7, + lookup_message/3 ]). %% internal exports: @@ -49,6 +52,8 @@ -include_lib("emqx_utils/include/emqx_message.hrl"). +-define(DB_KEY(TIMESTAMP), <<TIMESTAMP:64>>). + %%================================================================================ %% Type declarations %%================================================================================ @@ -102,23 +107,22 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) -> ok = rocksdb:drop_column_family(DBHandle, CFHandle), ok. -prepare_batch(_ShardId, _Data, Messages, _Options) -> - {ok, Messages}. +prepare_batch(_ShardId, _Data, Batch, _Options) -> + {ok, Batch}. -commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, Options) -> - {ok, Batch} = rocksdb:batch(), - lists:foreach( - fun({TS, Msg}) -> - Key = <<TS:64>>, - Val = term_to_binary(Msg), - rocksdb:batch_put(Batch, CF, Key, Val) - end, - Messages - ), - Res = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)), - rocksdb:release_batch(Batch), +commit_batch(_ShardId, S = #s{db = DB}, Batch, Options) -> + {ok, BatchHandle} = rocksdb:batch(), + lists:foreach(fun(Op) -> process_batch_operation(S, Op, BatchHandle) end, Batch), + Res = rocksdb:write_batch(DB, BatchHandle, write_batch_opts(Options)), + rocksdb:release_batch(BatchHandle), Res. 
+process_batch_operation(S, {TS, Msg = #message{}}, BatchHandle) -> + Val = encode_message(Msg), + rocksdb:batch_put(BatchHandle, S#s.cf, ?DB_KEY(TS), Val); +process_batch_operation(S, {delete, #message_matcher{timestamp = TS}}, BatchHandle) -> + rocksdb:batch_delete(BatchHandle, S#s.cf, ?DB_KEY(TS)). + get_streams(_Shard, _Data, _TopicFilter, _StartTime) -> [#stream{}]. @@ -205,6 +209,16 @@ delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now, IsCurr {ok, It, NumDeleted, NumIterated} end. +lookup_message(_ShardId, #s{db = DB, cf = CF}, #message_matcher{timestamp = TS}) -> + case rocksdb:get(DB, CF, ?DB_KEY(TS), _ReadOpts = []) of + {ok, Val} -> + decode_message(Val); + not_found -> + not_found; + {error, Reason} -> + {error, unrecoverable, Reason} + end. + %%================================================================================ %% Internal functions %%================================================================================ @@ -214,7 +228,7 @@ do_next(_, _, _, _, 0, Key, Acc) -> do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) -> case rocksdb:iterator_move(IT, Action) of {ok, Key = <<TS:64>>, Blob} -> - Msg = #message{topic = Topic} = binary_to_term(Blob), + Msg = #message{topic = Topic} = decode_message(Blob), TopicWords = emqx_topic:words(Topic), case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of true -> @@ -234,7 +248,7 @@ do_delete_next( ) -> case rocksdb:iterator_move(IT, Action) of {ok, Key, Blob} -> - Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob), + Msg = #message{topic = Topic, timestamp = TS} = decode_message(Blob), TopicWords = emqx_topic:words(Topic), case emqx_topic:match(TopicWords, TopicFilter) andalso TS >= StartTime of true -> @@ -285,6 +299,12 @@ do_delete_next( {Key0, {AccDel, AccIter}} end. +encode_message(Msg) -> + term_to_binary(Msg). + +decode_message(Val) -> + binary_to_term(Val). 
+ %% @doc Generate a column family ID for the MQTT messages -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. data_cf(GenId) ->