feat(ds): adopt buffer interface to `emqx_ds:operation()`

This commit is contained in:
Andrew Mayorov 2024-07-30 16:53:38 +02:00 committed by Thales Macedo Garitezi
parent 0aa4cdbaf3
commit 11951f8f6c
4 changed files with 89 additions and 73 deletions

View File

@ -43,7 +43,7 @@
%% `emqx_ds_buffer': %% `emqx_ds_buffer':
init_buffer/3, init_buffer/3,
flush_buffer/4, flush_buffer/4,
shard_of_message/4 shard_of_operation/4
]). ]).
%% Internal exports: %% Internal exports:
@ -55,6 +55,7 @@
-export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]). -export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).
-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
%%================================================================================ %%================================================================================
%% Type declarations %% Type declarations
@ -255,14 +256,23 @@ assign_message_timestamps(Latest, [], Acc) ->
assign_timestamp(TimestampUs, Message) -> assign_timestamp(TimestampUs, Message) ->
{TimestampUs, Message}. {TimestampUs, Message}.
-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) -> shard(). -spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic, _Options) -> shard().
shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) -> shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
case SerializeBy of
clientid -> Key = From;
topic -> Key = Topic
end,
shard_of_key(DB, Key);
shard_of_operation(DB, {_, #message_matcher{from = From, topic = Topic}}, SerializeBy, _Options) ->
case SerializeBy of
clientid -> Key = From;
topic -> Key = Topic
end,
shard_of_key(DB, Key).
shard_of_key(DB, Key) ->
N = emqx_ds_builtin_local_meta:n_shards(DB), N = emqx_ds_builtin_local_meta:n_shards(DB),
Hash = Hash = erlang:phash2(Key, N),
case SerializeBy of
clientid -> erlang:phash2(From, N);
topic -> erlang:phash2(Topic, N)
end,
integer_to_binary(Hash). integer_to_binary(Hash).
-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->

View File

@ -29,7 +29,7 @@
current_timestamp/2, current_timestamp/2,
shard_of_message/4, shard_of_operation/4,
flush_buffer/4, flush_buffer/4,
init_buffer/3 init_buffer/3
]). ]).
@ -392,15 +392,30 @@ flush_buffer(DB, Shard, Messages, State) ->
end, end,
{State, Result}. {State, Result}.
-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) -> -spec shard_of_operation(
emqx_ds:db(),
emqx_ds:operation() | emqx_ds:precondition(),
clientid | topic,
_Options
) ->
emqx_ds_replication_layer:shard_id(). emqx_ds_replication_layer:shard_id().
shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) -> shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
case SerializeBy of
clientid -> Key = From;
topic -> Key = Topic
end,
shard_of_key(DB, Key);
shard_of_operation(DB, {_OpName, Matcher}, SerializeBy, _Options) ->
#message_matcher{from = From, topic = Topic} = Matcher,
case SerializeBy of
clientid -> Key = From;
topic -> Key = Topic
end,
shard_of_key(DB, Key).
shard_of_key(DB, Key) ->
N = emqx_ds_replication_shard_allocator:n_shards(DB), N = emqx_ds_replication_shard_allocator:n_shards(DB),
Hash = Hash = erlang:phash2(Key, N),
case SerializeBy of
clientid -> erlang:phash2(From, N);
topic -> erlang:phash2(Topic, N)
end,
integer_to_binary(Hash). integer_to_binary(Hash).
%%================================================================================ %%================================================================================

View File

@ -21,7 +21,7 @@
-behaviour(gen_server). -behaviour(gen_server).
%% API: %% API:
-export([start_link/4, store_batch/3, shard_of_message/3]). -export([start_link/4, store_batch/3, shard_of_operation/3]).
-export([ls/0]). -export([ls/0]).
%% behavior callbacks: %% behavior callbacks:
@ -46,19 +46,18 @@
-define(cbm(DB), {?MODULE, DB}). -define(cbm(DB), {?MODULE, DB}).
-record(enqueue_req, { -record(enqueue_req, {
messages :: [emqx_types:message()], operations :: [emqx_ds:operation()],
sync :: boolean(), sync :: boolean(),
atomic :: boolean(), n_operations :: non_neg_integer(),
n_messages :: non_neg_integer(),
payload_bytes :: non_neg_integer() payload_bytes :: non_neg_integer()
}). }).
-callback init_buffer(emqx_ds:db(), _Shard, _Options) -> {ok, _State}. -callback init_buffer(emqx_ds:db(), _Shard, _Options) -> {ok, _State}.
-callback flush_buffer(emqx_ds:db(), _Shard, [emqx_types:message()], State) -> -callback flush_buffer(emqx_ds:db(), _Shard, [emqx_ds:operation()], State) ->
{State, ok | {error, recoverable | unrecoverable, _}}. {State, ok | {error, recoverable | unrecoverable, _}}.
-callback shard_of_message(emqx_ds:db(), emqx_types:message(), topic | clientid, _Options) -> -callback shard_of_operation(emqx_ds:db(), emqx_ds:operation(), topic | clientid, _Options) ->
_Shard. _Shard.
%%================================================================================ %%================================================================================
@ -77,39 +76,33 @@ start_link(CallbackModule, CallbackOptions, DB, Shard) ->
?via(DB, Shard), ?MODULE, [CallbackModule, CallbackOptions, DB, Shard], [] ?via(DB, Shard), ?MODULE, [CallbackModule, CallbackOptions, DB, Shard], []
). ).
-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> -spec store_batch(emqx_ds:db(), [emqx_ds:operation()], emqx_ds:message_store_opts()) ->
emqx_ds:store_batch_result(). emqx_ds:store_batch_result().
store_batch(DB, Messages, Opts) -> store_batch(DB, Operations, Opts) ->
Sync = maps:get(sync, Opts, true), Sync = maps:get(sync, Opts, true),
Atomic = maps:get(atomic, Opts, false),
%% Usually we expect all messages in the batch to go into the %% Usually we expect all messages in the batch to go into the
%% single shard, so this function is optimized for the happy case. %% single shard, so this function is optimized for the happy case.
case shards_of_batch(DB, Messages) of case shards_of_batch(DB, Operations) of
[{Shard, {NMsgs, NBytes}}] -> [{Shard, {NOps, NBytes}}] ->
%% Happy case: %% Happy case:
enqueue_call_or_cast( enqueue_call_or_cast(
?via(DB, Shard), ?via(DB, Shard),
#enqueue_req{ #enqueue_req{
messages = Messages, operations = Operations,
sync = Sync, sync = Sync,
atomic = Atomic, n_operations = NOps,
n_messages = NMsgs,
payload_bytes = NBytes payload_bytes = NBytes
} }
); );
[_, _ | _] when Atomic ->
%% It's impossible to commit a batch to multiple shards
%% atomically
{error, unrecoverable, atomic_commit_to_multiple_shards};
_Shards -> _Shards ->
%% Use a slower implementation for the unlikely case: %% Use a slower implementation for the unlikely case:
repackage_messages(DB, Messages, Sync) repackage_messages(DB, Operations, Sync)
end. end.
-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) -> _Shard. -spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic) -> _Shard.
shard_of_message(DB, Message, ShardBy) -> shard_of_operation(DB, Operation, ShardBy) ->
{CBM, Options} = persistent_term:get(?cbm(DB)), {CBM, Options} = persistent_term:get(?cbm(DB)),
CBM:shard_of_message(DB, Message, ShardBy, Options). CBM:shard_of_operation(DB, Operation, ShardBy, Options).
%%================================================================================ %%================================================================================
%% behavior callbacks %% behavior callbacks
@ -129,7 +122,7 @@ shard_of_message(DB, Message, ShardBy) ->
n = 0 :: non_neg_integer(), n = 0 :: non_neg_integer(),
n_bytes = 0 :: non_neg_integer(), n_bytes = 0 :: non_neg_integer(),
tref :: undefined | reference(), tref :: undefined | reference(),
queue :: queue:queue(emqx_types:message()), queue :: queue:queue(emqx_ds:operation()),
pending_replies = [] :: [gen_server:from()] pending_replies = [] :: [gen_server:from()]
}). }).
@ -168,31 +161,29 @@ format_status(Status) ->
handle_call( handle_call(
#enqueue_req{ #enqueue_req{
messages = Msgs, operations = Operations,
sync = Sync, sync = Sync,
atomic = Atomic, n_operations = NOps,
n_messages = NMsgs,
payload_bytes = NBytes payload_bytes = NBytes
}, },
From, From,
S0 = #s{pending_replies = Replies0} S0 = #s{pending_replies = Replies0}
) -> ) ->
S = S0#s{pending_replies = [From | Replies0]}, S = S0#s{pending_replies = [From | Replies0]},
{noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; {noreply, enqueue(Sync, Operations, NOps, NBytes, S)};
handle_call(_Call, _From, S) -> handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}. {reply, {error, unknown_call}, S}.
handle_cast( handle_cast(
#enqueue_req{ #enqueue_req{
messages = Msgs, operations = Operations,
sync = Sync, sync = Sync,
atomic = Atomic, n_operations = NOps,
n_messages = NMsgs,
payload_bytes = NBytes payload_bytes = NBytes
}, },
S S
) -> ) ->
{noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; {noreply, enqueue(Sync, Operations, NOps, NBytes, S)};
handle_cast(_Cast, S) -> handle_cast(_Cast, S) ->
{noreply, S}. {noreply, S}.
@ -215,11 +206,10 @@ terminate(_Reason, #s{db = DB}) ->
enqueue( enqueue(
Sync, Sync,
Atomic, Ops,
Msgs,
BatchSize, BatchSize,
BatchBytes, BatchBytes,
S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0} S0 = #s{n = NOps0, n_bytes = NBytes0, queue = Q0}
) -> ) ->
%% At this point we don't split the batches, even when they aren't %% At this point we don't split the batches, even when they aren't
%% atomic. It wouldn't win us anything in terms of memory, and %% atomic. It wouldn't win us anything in terms of memory, and
@ -227,18 +217,18 @@ enqueue(
%% granularity should be fine enough. %% granularity should be fine enough.
NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity), NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity),
NMsgs = NMsgs0 + BatchSize, NMsgs = NOps0 + BatchSize,
NBytes = NBytes0 + BatchBytes, NBytes = NBytes0 + BatchBytes,
case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NMsgs0 > 0) of case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NOps0 > 0) of
true -> true ->
%% Adding this batch would cause buffer to overflow. Flush %% Adding this batch would cause buffer to overflow. Flush
%% it now, and retry: %% it now, and retry:
S1 = flush(S0), S1 = flush(S0),
enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1); enqueue(Sync, Ops, BatchSize, BatchBytes, S1);
false -> false ->
%% The buffer is empty, we enqueue the atomic batch in its %% The buffer is empty, we enqueue the atomic batch in its
%% entirety: %% entirety:
Q1 = lists:foldl(fun queue:in/2, Q0, Msgs), Q1 = lists:foldl(fun queue:in/2, Q0, Ops),
S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1}, S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1},
case NMsgs >= NMax orelse NBytes >= NBytesMax of case NMsgs >= NMax orelse NBytes >= NBytesMax of
true -> true ->
@ -336,18 +326,18 @@ do_flush(
} }
end. end.
-spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) -> -spec shards_of_batch(emqx_ds:db(), [emqx_ds:operation()]) ->
[{_ShardId, {NMessages, NBytes}}] [{_ShardId, {NMessages, NBytes}}]
when when
NMessages :: non_neg_integer(), NMessages :: non_neg_integer(),
NBytes :: non_neg_integer(). NBytes :: non_neg_integer().
shards_of_batch(DB, Messages) -> shards_of_batch(DB, Batch) ->
maps:to_list( maps:to_list(
lists:foldl( lists:foldl(
fun(Message, Acc) -> fun(Operation, Acc) ->
%% TODO: sharding strategy must be part of the DS DB schema: %% TODO: sharding strategy must be part of the DS DB schema:
Shard = shard_of_message(DB, Message, clientid), Shard = shard_of_operation(DB, Operation, clientid),
Size = payload_size(Message), Size = payload_size(Operation),
maps:update_with( maps:update_with(
Shard, Shard,
fun({N, S}) -> fun({N, S}) ->
@ -358,36 +348,35 @@ shards_of_batch(DB, Messages) ->
) )
end, end,
#{}, #{},
Messages Batch
) )
). ).
repackage_messages(DB, Messages, Sync) -> repackage_messages(DB, Batch, Sync) ->
Batches = lists:foldl( Batches = lists:foldl(
fun(Message, Acc) -> fun(Operation, Acc) ->
Shard = shard_of_message(DB, Message, clientid), Shard = shard_of_operation(DB, Operation, clientid),
Size = payload_size(Message), Size = payload_size(Operation),
maps:update_with( maps:update_with(
Shard, Shard,
fun({N, S, Msgs}) -> fun({N, S, Msgs}) ->
{N + 1, S + Size, [Message | Msgs]} {N + 1, S + Size, [Operation | Msgs]}
end, end,
{1, Size, [Message]}, {1, Size, [Operation]},
Acc Acc
) )
end, end,
#{}, #{},
Messages Batch
), ),
maps:fold( maps:fold(
fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) -> fun(Shard, {NOps, ByteSize, RevOperations}, ErrAcc) ->
Err = enqueue_call_or_cast( Err = enqueue_call_or_cast(
?via(DB, Shard), ?via(DB, Shard),
#enqueue_req{ #enqueue_req{
messages = lists:reverse(RevMessages), operations = lists:reverse(RevOperations),
sync = Sync, sync = Sync,
atomic = false, n_operations = NOps,
n_messages = NMsgs,
payload_bytes = ByteSize payload_bytes = ByteSize
} }
), ),
@ -427,4 +416,6 @@ cancel_timer(S = #s{tref = TRef}) ->
%% @doc Return approximate size of the MQTT message (it doesn't take %% @doc Return approximate size of the MQTT message (it doesn't take
%% all things into account, for example headers and extras) %% all things into account, for example headers and extras)
payload_size(#message{payload = P, topic = T}) -> payload_size(#message{payload = P, topic = T}) ->
size(P) + size(T). size(P) + size(T);
payload_size({_OpName, _}) ->
0.

View File

@ -377,7 +377,7 @@ nodes_of_clientid(DB, ClientId, Nodes = [N0 | _]) ->
shard_of_clientid(DB, Node, ClientId) -> shard_of_clientid(DB, Node, ClientId) ->
?ON( ?ON(
Node, Node,
emqx_ds_buffer:shard_of_message(DB, #message{from = ClientId}, clientid) emqx_ds_buffer:shard_of_operation(DB, #message{from = ClientId}, clientid)
). ).
%% Consume eagerly: %% Consume eagerly: