Merge pull request #12661 from thalesmg/ds-atomic-store-m-20240305

feat(ds): add atomic store API
Thales Macedo Garitezi 2024-03-06 17:48:28 -03:00 committed by GitHub
commit c62dd56a4a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 208 additions and 11 deletions

View File

@@ -150,7 +150,16 @@
 -type message_store_opts() ::
     #{
-        sync => boolean()
+        %% Whether to wait until the message storage has been acknowledged to return from
+        %% `store_batch'.
+        %% Default: `true'.
+        sync => boolean(),
+        %% Whether the whole batch given to `store_batch' should be inserted atomically as
+        %% a unit. Note: the whole batch must be crafted so that it belongs to a single
+        %% shard (if applicable to the backend), as the batch will be split accordingly
+        %% even if this flag is `true'.
+        %% Default: `false'.
+        atomic => boolean()
     }.
 
 -type generic_db_opts() ::

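For orientation, here is a minimal caller-side sketch of the new option. It is not part of this commit: my_db and Msgs are placeholders, and it assumes the DB was already opened with emqx_ds:open_db/2, as the test cases added below do.

%% Hypothetical usage sketch, not code from this commit.
%% Craft `Msgs' so that every message maps to the same shard; otherwise the
%% batch is split per shard even when `atomic' is set (see the comment above).
ok = emqx_ds:store_batch(my_db, Msgs, #{
    %% wait until the storage acknowledges the batch (default: true)
    sync => true,
    %% insert the whole batch as a single unit (default: false)
    atomic => true
}).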
View File

@@ -51,6 +51,7 @@
 -define(flush, flush).
 
 -record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}).
+-record(enqueue_atomic_req, {batch :: [emqx_types:message()], sync :: boolean()}).
 
 %%================================================================================
 %% API functions
@@ -64,13 +65,34 @@ start_link(DB, Shard) ->
     ok.
 
 store_batch(DB, Messages, Opts) ->
     Sync = maps:get(sync, Opts, true),
-    lists:foreach(
-        fun(Message) ->
-            Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
-            gen_server:call(?via(DB, Shard), #enqueue_req{message = Message, sync = Sync})
-        end,
-        Messages
-    ).
+    case maps:get(atomic, Opts, false) of
+        false ->
+            lists:foreach(
+                fun(Message) ->
+                    Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
+                    gen_server:call(?via(DB, Shard), #enqueue_req{
+                        message = Message,
+                        sync = Sync
+                    })
+                end,
+                Messages
+            );
+        true ->
+            maps:foreach(
+                fun(Shard, Batch) ->
+                    gen_server:call(?via(DB, Shard), #enqueue_atomic_req{
+                        batch = Batch,
+                        sync = Sync
+                    })
+                end,
+                maps:groups_from_list(
+                    fun(Message) ->
+                        emqx_ds_replication_layer:shard_of_message(DB, Message, clientid)
+                    end,
+                    Messages
+                )
+            )
+    end.
 
 %%================================================================================
 %% behavior callbacks
@@ -101,6 +123,9 @@ init([DB, Shard]) ->
 
 handle_call(#enqueue_req{message = Msg, sync = Sync}, From, S) ->
     do_enqueue(From, Sync, Msg, S);
+handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync}, From, S) ->
+    Len = length(Batch),
+    do_enqueue(From, Sync, {atomic, Len, Batch}, S);
 handle_call(_Call, _From, S) ->
     {reply, {error, unknown_call}, S}.
 
@@ -131,7 +156,7 @@ do_flush(
     Batch = #{?tag => ?BATCH, ?batch_messages => lists:reverse(Messages)},
     ok = emqx_ds_proto_v2:store_batch(Leader, DB, Shard, Batch, #{}),
     [gen_server:reply(From, ok) || From <- lists:reverse(Replies)],
-    ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard}),
+    ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages}),
     erlang:garbage_collect(),
     S#s{
         n = 0,
@@ -140,9 +165,15 @@ do_flush(
         tref = start_timer()
     }.
 
-do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) ->
+do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) ->
     NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
-    S1 = S0#s{n = N + 1, batch = [Msg | Batch]},
+    S1 =
+        case MsgOrBatch of
+            {atomic, NumMsgs, Msgs} ->
+                S0#s{n = N + NumMsgs, batch = Msgs ++ Batch};
+            Msg ->
+                S0#s{n = N + 1, batch = [Msg | Batch]}
+        end,
     S2 =
         case N >= NMax of
             true ->
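A note on the atomic path in store_batch/3 above: the batch is first partitioned per shard with maps:groups_from_list/2 (available since OTP 25), and each per-shard group is then sent to its egress server as one enqueue_atomic_req, so atomicity only holds within a single shard. A small, hypothetical illustration of the grouping step; the shard0/shard1 atoms and the tuples are made-up placeholders, while the real code keys on emqx_ds_replication_layer:shard_of_message(DB, Message, clientid):

%% Illustrative only; maps:groups_from_list/2 returns Key => [Elements],
%% preserving the original list order inside each group.
Msgs = [{m1, shard0}, {m2, shard1}, {m3, shard0}],
#{shard0 := [{m1, shard0}, {m3, shard0}], shard1 := [{m2, shard1}]} =
    maps:groups_from_list(fun({_Msg, Shard}) -> Shard end, Msgs).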

View File

@@ -230,6 +230,19 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{}) ->
     emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
 ) ->
     emqx_ds:store_batch_result().
+store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options = #{atomic := true}) ->
+    {ok, Batch} = rocksdb:batch(),
+    lists:foreach(
+        fun(Msg) ->
+            {Key, _} = make_key(S, Msg),
+            Val = serialize(Msg),
+            rocksdb:batch_put(Batch, Data, Key, Val)
+        end,
+        Messages
+    ),
+    Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []),
+    rocksdb:release_batch(Batch),
+    Res;
 store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
     lists:foreach(
         fun(Msg) ->

View File

@@ -90,6 +90,20 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
     ok = rocksdb:drop_column_family(DBHandle, CFHandle),
     ok.
 
+store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := true}) ->
+    {ok, Batch} = rocksdb:batch(),
+    lists:foreach(
+        fun(Msg) ->
+            Id = erlang:unique_integer([monotonic]),
+            Key = <<Id:64>>,
+            Val = term_to_binary(Msg),
+            rocksdb:batch_put(Batch, CF, Key, Val)
+        end,
+        Messages
+    ),
+    Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []),
+    rocksdb:release_batch(Batch),
+    Res;
 store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
     lists:foreach(
         fun(Msg) ->

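Both storage layouts above use the same RocksDB write-batch pattern to make the insert all-or-nothing at the storage level: open a batch, stage every key/value with rocksdb:batch_put/4, commit everything with a single rocksdb:write_batch/3 call, and release the batch handle. A generalized sketch of that pattern follows; the function name, the try/after cleanup and the {Key, Value} list are assumptions for illustration, not code from this commit.

%% Hypothetical helper; DB is a rocksdb DB handle, CF a column family
%% handle, KVs a list of {Key, Value} binaries.
atomic_put_all(DB, CF, KVs) ->
    {ok, Batch} = rocksdb:batch(),
    try
        lists:foreach(
            fun({Key, Val}) -> rocksdb:batch_put(Batch, CF, Key, Val) end,
            KVs
        ),
        %% Either all staged puts become visible, or none do.
        rocksdb:write_batch(DB, Batch, _WriteOptions = [])
    after
        %% Free the native batch object even if the write fails.
        rocksdb:release_batch(Batch)
    end.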
View File

@@ -307,6 +307,71 @@ t_08_smoke_list_drop_generation(_Config) ->
     ),
     ok.
 
+t_09_atomic_store_batch(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            application:set_env(emqx_durable_storage, egress_batch_size, 1),
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            Msgs = [
+                message(<<"1">>, <<"1">>, 0),
+                message(<<"2">>, <<"2">>, 1),
+                message(<<"3">>, <<"3">>, 2)
+            ],
+            ?assertEqual(
+                ok,
+                emqx_ds:store_batch(DB, Msgs, #{
+                    atomic => true,
+                    sync => true
+                })
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            %% Must contain exactly one flush with all messages.
+            ?assertMatch(
+                [#{batch := [_, _, _]}],
+                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+t_10_non_atomic_store_batch(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            application:set_env(emqx_durable_storage, egress_batch_size, 1),
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            Msgs = [
+                message(<<"1">>, <<"1">>, 0),
+                message(<<"2">>, <<"2">>, 1),
+                message(<<"3">>, <<"3">>, 2)
+            ],
+            %% Non-atomic batches may be split.
+            ?assertEqual(
+                ok,
+                emqx_ds:store_batch(DB, Msgs, #{
+                    atomic => false,
+                    sync => true
+                })
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            %% Should contain one flush per message.
+            ?assertMatch(
+                [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}],
+                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
 t_drop_generation_with_never_used_iterator(_Config) ->
     %% This test checks how the iterator behaves when:
     %% 1) it's created at generation 1 and not consumed from.
@@ -549,6 +614,7 @@ iterate(DB, It0, BatchSize, Acc) ->
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
+    emqx_common_test_helpers:clear_screen(),
     Apps = emqx_cth_suite:start(
         [mria, emqx_durable_storage],
         #{work_dir => ?config(priv_dir, Config)}

View File

@@ -219,6 +219,69 @@ t_replay(_Config) ->
     ?assert(check(?SHARD, <<"#">>, 0, Messages)),
     ok.
 
+t_atomic_store_batch(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            application:set_env(emqx_durable_storage, egress_batch_size, 1),
+            Msgs = [
+                make_message(0, <<"1">>, <<"1">>),
+                make_message(1, <<"2">>, <<"2">>),
+                make_message(2, <<"3">>, <<"3">>)
+            ],
+            ?assertEqual(
+                ok,
+                emqx_ds:store_batch(DB, Msgs, #{
+                    atomic => true,
+                    sync => true
+                })
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            %% Must contain exactly one flush with all messages.
+            ?assertMatch(
+                [#{batch := [_, _, _]}],
+                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+t_non_atomic_store_batch(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            application:set_env(emqx_durable_storage, egress_batch_size, 1),
+            Msgs = [
+                make_message(0, <<"1">>, <<"1">>),
+                make_message(1, <<"2">>, <<"2">>),
+                make_message(2, <<"3">>, <<"3">>)
+            ],
+            %% Non-atomic batches may be split.
+            ?assertEqual(
+                ok,
+                emqx_ds:store_batch(DB, Msgs, #{
+                    atomic => false,
+                    sync => true
+                })
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            %% Should contain one flush per message.
+            ?assertMatch(
+                [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}],
+                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
 check(Shard, TopicFilter, StartTime, ExpectedMessages) ->
     ExpectedFiltered = lists:filter(
         fun(#message{topic = Topic, timestamp = TS}) ->
@@ -418,6 +481,7 @@
 all() -> emqx_common_test_helpers:all(?MODULE).
 suite() -> [{timetrap, {seconds, 20}}].
 init_per_suite(Config) ->
+    emqx_common_test_helpers:clear_screen(),
     Apps = emqx_cth_suite:start(
         [emqx_durable_storage],
         #{work_dir => emqx_cth_suite:work_dir(Config)}