From 5d87d400f450261a2c13f242b45a59ec7d398ccd Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 6 Mar 2024 15:24:14 -0300 Subject: [PATCH] feat(ds): add atomic store API Part of https://emqx.atlassian.net/browse/EMQX-11841 --- apps/emqx_durable_storage/src/emqx_ds.erl | 11 +++- .../src/emqx_ds_replication_layer_egress.erl | 51 +++++++++++--- .../src/emqx_ds_storage_bitfield_lts.erl | 13 ++++ .../src/emqx_ds_storage_reference.erl | 14 ++++ .../test/emqx_ds_SUITE.erl | 66 +++++++++++++++++++ .../emqx_ds_storage_bitfield_lts_SUITE.erl | 64 ++++++++++++++++++ 6 files changed, 208 insertions(+), 11 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 993194da8..c7fa3552b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -150,7 +150,16 @@ -type message_store_opts() :: #{ - sync => boolean() + %% Whether to wait until the message storage has been acknowledged to return from + %% `store_batch'. + %% Default: `true'. + sync => boolean(), + %% Whether the whole batch given to `store_batch' should be inserted atomically as + %% a unit. Note: the whole batch must be crafted so that it belongs to a single + %% shard (if applicable to the backend), as the batch will be split accordingly + %% even if this flag is `true'. + %% Default: `false'. + atomic => boolean() }. -type generic_db_opts() :: diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 6c1499620..25064ad60 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -51,6 +51,7 @@ -define(flush, flush). -record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}). +-record(enqueue_atomic_req, {batch :: [emqx_types:message()], sync :: boolean()}). %%================================================================================ %% API functions @@ -64,13 +65,34 @@ start_link(DB, Shard) -> ok. store_batch(DB, Messages, Opts) -> Sync = maps:get(sync, Opts, true), - lists:foreach( - fun(Message) -> - Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), - gen_server:call(?via(DB, Shard), #enqueue_req{message = Message, sync = Sync}) - end, - Messages - ). + case maps:get(atomic, Opts, false) of + false -> + lists:foreach( + fun(Message) -> + Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), + gen_server:call(?via(DB, Shard), #enqueue_req{ + message = Message, + sync = Sync + }) + end, + Messages + ); + true -> + maps:foreach( + fun(Shard, Batch) -> + gen_server:call(?via(DB, Shard), #enqueue_atomic_req{ + batch = Batch, + sync = Sync + }) + end, + maps:groups_from_list( + fun(Message) -> + emqx_ds_replication_layer:shard_of_message(DB, Message, clientid) + end, + Messages + ) + ) + end. %%================================================================================ %% behavior callbacks @@ -101,6 +123,9 @@ init([DB, Shard]) -> handle_call(#enqueue_req{message = Msg, sync = Sync}, From, S) -> do_enqueue(From, Sync, Msg, S); +handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync}, From, S) -> + Len = length(Batch), + do_enqueue(From, Sync, {atomic, Len, Batch}, S); handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. @@ -131,7 +156,7 @@ do_flush( Batch = #{?tag => ?BATCH, ?batch_messages => lists:reverse(Messages)}, ok = emqx_ds_proto_v2:store_batch(Leader, DB, Shard, Batch, #{}), [gen_server:reply(From, ok) || From <- lists:reverse(Replies)], - ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard}), + ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages}), erlang:garbage_collect(), S#s{ n = 0, @@ -140,9 +165,15 @@ do_flush( tref = start_timer() }. -do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> +do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), - S1 = S0#s{n = N + 1, batch = [Msg | Batch]}, + S1 = + case MsgOrBatch of + {atomic, NumMsgs, Msgs} -> + S0#s{n = N + NumMsgs, batch = Msgs ++ Batch}; + Msg -> + S0#s{n = N + 1, batch = [Msg | Batch]} + end, S2 = case N >= NMax of true -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 7ffdd1e2b..d265d8fec 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -230,6 +230,19 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{}) -> emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). +store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options = #{atomic := true}) -> + {ok, Batch} = rocksdb:batch(), + lists:foreach( + fun(Msg) -> + {Key, _} = make_key(S, Msg), + Val = serialize(Msg), + rocksdb:batch_put(Batch, Data, Key, Val) + end, + Messages + ), + Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []), + rocksdb:release_batch(Batch), + Res; store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> lists:foreach( fun(Msg) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 16b3f891f..92918bb13 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -90,6 +90,20 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) -> ok = rocksdb:drop_column_family(DBHandle, CFHandle), ok. +store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := true}) -> + {ok, Batch} = rocksdb:batch(), + lists:foreach( + fun(Msg) -> + Id = erlang:unique_integer([monotonic]), + Key = <>, + Val = term_to_binary(Msg), + rocksdb:batch_put(Batch, CF, Key, Val) + end, + Messages + ), + Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []), + rocksdb:release_batch(Batch), + Res; store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) -> lists:foreach( fun(Msg) -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 9dae8e699..a0dae0e6f 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -307,6 +307,71 @@ t_08_smoke_list_drop_generation(_Config) -> ), ok. +t_09_atomic_store_batch(_Config) -> + DB = ?FUNCTION_NAME, + ?check_trace( + begin + application:set_env(emqx_durable_storage, egress_batch_size, 1), + ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + Msgs = [ + message(<<"1">>, <<"1">>, 0), + message(<<"2">>, <<"2">>, 1), + message(<<"3">>, <<"3">>, 2) + ], + ?assertEqual( + ok, + emqx_ds:store_batch(DB, Msgs, #{ + atomic => true, + sync => true + }) + ), + + ok + end, + fun(Trace) -> + %% Must contain exactly one flush with all messages. + ?assertMatch( + [#{batch := [_, _, _]}], + ?of_kind(emqx_ds_replication_layer_egress_flush, Trace) + ), + ok + end + ), + ok. + +t_10_non_atomic_store_batch(_Config) -> + DB = ?FUNCTION_NAME, + ?check_trace( + begin + application:set_env(emqx_durable_storage, egress_batch_size, 1), + ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + Msgs = [ + message(<<"1">>, <<"1">>, 0), + message(<<"2">>, <<"2">>, 1), + message(<<"3">>, <<"3">>, 2) + ], + %% Non-atomic batches may be split. + ?assertEqual( + ok, + emqx_ds:store_batch(DB, Msgs, #{ + atomic => false, + sync => true + }) + ), + + ok + end, + fun(Trace) -> + %% Should contain one flush per message. + ?assertMatch( + [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}], + ?of_kind(emqx_ds_replication_layer_egress_flush, Trace) + ), + ok + end + ), + ok. + t_drop_generation_with_never_used_iterator(_Config) -> %% This test checks how the iterator behaves when: %% 1) it's created at generation 1 and not consumed from. @@ -549,6 +614,7 @@ iterate(DB, It0, BatchSize, Acc) -> all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), Apps = emqx_cth_suite:start( [mria, emqx_durable_storage], #{work_dir => ?config(priv_dir, Config)} diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 72bb70949..173669919 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -219,6 +219,69 @@ t_replay(_Config) -> ?assert(check(?SHARD, <<"#">>, 0, Messages)), ok. +t_atomic_store_batch(_Config) -> + DB = ?FUNCTION_NAME, + ?check_trace( + begin + application:set_env(emqx_durable_storage, egress_batch_size, 1), + Msgs = [ + make_message(0, <<"1">>, <<"1">>), + make_message(1, <<"2">>, <<"2">>), + make_message(2, <<"3">>, <<"3">>) + ], + ?assertEqual( + ok, + emqx_ds:store_batch(DB, Msgs, #{ + atomic => true, + sync => true + }) + ), + + ok + end, + fun(Trace) -> + %% Must contain exactly one flush with all messages. + ?assertMatch( + [#{batch := [_, _, _]}], + ?of_kind(emqx_ds_replication_layer_egress_flush, Trace) + ), + ok + end + ), + ok. + +t_non_atomic_store_batch(_Config) -> + DB = ?FUNCTION_NAME, + ?check_trace( + begin + application:set_env(emqx_durable_storage, egress_batch_size, 1), + Msgs = [ + make_message(0, <<"1">>, <<"1">>), + make_message(1, <<"2">>, <<"2">>), + make_message(2, <<"3">>, <<"3">>) + ], + %% Non-atomic batches may be split. + ?assertEqual( + ok, + emqx_ds:store_batch(DB, Msgs, #{ + atomic => false, + sync => true + }) + ), + + ok + end, + fun(Trace) -> + %% Should contain one flush per message. + ?assertMatch( + [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}], + ?of_kind(emqx_ds_replication_layer_egress_flush, Trace) + ), + ok + end + ), + ok. + check(Shard, TopicFilter, StartTime, ExpectedMessages) -> ExpectedFiltered = lists:filter( fun(#message{topic = Topic, timestamp = TS}) -> @@ -418,6 +481,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). suite() -> [{timetrap, {seconds, 20}}]. init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), Apps = emqx_cth_suite:start( [emqx_durable_storage], #{work_dir => emqx_cth_suite:work_dir(Config)}