feat(ds): adopt buffer interface to `emqx_ds:operation()`

This commit is contained in:
Andrew Mayorov 2024-07-30 16:53:38 +02:00
parent 451b03ff99
commit 3b5d98c1d9
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
4 changed files with 89 additions and 73 deletions

View File

@ -43,7 +43,7 @@
%% `emqx_ds_buffer':
init_buffer/3,
flush_buffer/4,
shard_of_message/4
shard_of_operation/4
]).
%% Internal exports:
@ -55,6 +55,7 @@
-export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).
-include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("emqx_durable_storage/include/emqx_ds.hrl").
%%================================================================================
%% Type declarations
@ -255,14 +256,23 @@ assign_message_timestamps(Latest, [], Acc) ->
assign_timestamp(TimestampUs, Message) ->
{TimestampUs, Message}.
-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) -> shard().
shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
N = emqx_ds_builtin_local_meta:n_shards(DB),
Hash =
-spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic, _Options) -> shard().
shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
case SerializeBy of
clientid -> erlang:phash2(From, N);
topic -> erlang:phash2(Topic, N)
clientid -> Key = From;
topic -> Key = Topic
end,
shard_of_key(DB, Key);
shard_of_operation(DB, {_, #message_matcher{from = From, topic = Topic}}, SerializeBy, _Options) ->
case SerializeBy of
clientid -> Key = From;
topic -> Key = Topic
end,
shard_of_key(DB, Key).
shard_of_key(DB, Key) ->
N = emqx_ds_builtin_local_meta:n_shards(DB),
Hash = erlang:phash2(Key, N),
integer_to_binary(Hash).
-spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) ->

View File

@ -29,7 +29,7 @@
current_timestamp/2,
shard_of_message/4,
shard_of_operation/4,
flush_buffer/4,
init_buffer/3
]).
@ -392,15 +392,30 @@ flush_buffer(DB, Shard, Messages, State) ->
end,
{State, Result}.
-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) ->
-spec shard_of_operation(
emqx_ds:db(),
emqx_ds:operation() | emqx_ds:precondition(),
clientid | topic,
_Options
) ->
emqx_ds_replication_layer:shard_id().
shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
N = emqx_ds_replication_shard_allocator:n_shards(DB),
Hash =
shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
case SerializeBy of
clientid -> erlang:phash2(From, N);
topic -> erlang:phash2(Topic, N)
clientid -> Key = From;
topic -> Key = Topic
end,
shard_of_key(DB, Key);
shard_of_operation(DB, {_OpName, Matcher}, SerializeBy, _Options) ->
#message_matcher{from = From, topic = Topic} = Matcher,
case SerializeBy of
clientid -> Key = From;
topic -> Key = Topic
end,
shard_of_key(DB, Key).
shard_of_key(DB, Key) ->
N = emqx_ds_replication_shard_allocator:n_shards(DB),
Hash = erlang:phash2(Key, N),
integer_to_binary(Hash).
%%================================================================================

View File

@ -21,7 +21,7 @@
-behaviour(gen_server).
%% API:
-export([start_link/4, store_batch/3, shard_of_message/3]).
-export([start_link/4, store_batch/3, shard_of_operation/3]).
-export([ls/0]).
%% behavior callbacks:
@ -46,19 +46,18 @@
-define(cbm(DB), {?MODULE, DB}).
-record(enqueue_req, {
messages :: [emqx_types:message()],
operations :: [emqx_ds:operation()],
sync :: boolean(),
atomic :: boolean(),
n_messages :: non_neg_integer(),
n_operations :: non_neg_integer(),
payload_bytes :: non_neg_integer()
}).
-callback init_buffer(emqx_ds:db(), _Shard, _Options) -> {ok, _State}.
-callback flush_buffer(emqx_ds:db(), _Shard, [emqx_types:message()], State) ->
-callback flush_buffer(emqx_ds:db(), _Shard, [emqx_ds:operation()], State) ->
{State, ok | {error, recoverable | unrecoverable, _}}.
-callback shard_of_message(emqx_ds:db(), emqx_types:message(), topic | clientid, _Options) ->
-callback shard_of_operation(emqx_ds:db(), emqx_ds:operation(), topic | clientid, _Options) ->
_Shard.
%%================================================================================
@ -77,39 +76,33 @@ start_link(CallbackModule, CallbackOptions, DB, Shard) ->
?via(DB, Shard), ?MODULE, [CallbackModule, CallbackOptions, DB, Shard], []
).
-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
-spec store_batch(emqx_ds:db(), [emqx_ds:operation()], emqx_ds:message_store_opts()) ->
emqx_ds:store_batch_result().
store_batch(DB, Messages, Opts) ->
store_batch(DB, Operations, Opts) ->
Sync = maps:get(sync, Opts, true),
Atomic = maps:get(atomic, Opts, false),
%% Usually we expect all messages in the batch to go into the
%% single shard, so this function is optimized for the happy case.
case shards_of_batch(DB, Messages) of
[{Shard, {NMsgs, NBytes}}] ->
case shards_of_batch(DB, Operations) of
[{Shard, {NOps, NBytes}}] ->
%% Happy case:
enqueue_call_or_cast(
?via(DB, Shard),
#enqueue_req{
messages = Messages,
operations = Operations,
sync = Sync,
atomic = Atomic,
n_messages = NMsgs,
n_operations = NOps,
payload_bytes = NBytes
}
);
[_, _ | _] when Atomic ->
%% It's impossible to commit a batch to multiple shards
%% atomically
{error, unrecoverable, atomic_commit_to_multiple_shards};
_Shards ->
%% Use a slower implementation for the unlikely case:
repackage_messages(DB, Messages, Sync)
repackage_messages(DB, Operations, Sync)
end.
-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) -> _Shard.
shard_of_message(DB, Message, ShardBy) ->
-spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic) -> _Shard.
shard_of_operation(DB, Operation, ShardBy) ->
{CBM, Options} = persistent_term:get(?cbm(DB)),
CBM:shard_of_message(DB, Message, ShardBy, Options).
CBM:shard_of_operation(DB, Operation, ShardBy, Options).
%%================================================================================
%% behavior callbacks
@ -129,7 +122,7 @@ shard_of_message(DB, Message, ShardBy) ->
n = 0 :: non_neg_integer(),
n_bytes = 0 :: non_neg_integer(),
tref :: undefined | reference(),
queue :: queue:queue(emqx_types:message()),
queue :: queue:queue(emqx_ds:operation()),
pending_replies = [] :: [gen_server:from()]
}).
@ -168,31 +161,29 @@ format_status(Status) ->
handle_call(
#enqueue_req{
messages = Msgs,
operations = Operations,
sync = Sync,
atomic = Atomic,
n_messages = NMsgs,
n_operations = NOps,
payload_bytes = NBytes
},
From,
S0 = #s{pending_replies = Replies0}
) ->
S = S0#s{pending_replies = [From | Replies0]},
{noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)};
{noreply, enqueue(Sync, Operations, NOps, NBytes, S)};
handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}.
handle_cast(
#enqueue_req{
messages = Msgs,
operations = Operations,
sync = Sync,
atomic = Atomic,
n_messages = NMsgs,
n_operations = NOps,
payload_bytes = NBytes
},
S
) ->
{noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)};
{noreply, enqueue(Sync, Operations, NOps, NBytes, S)};
handle_cast(_Cast, S) ->
{noreply, S}.
@ -215,11 +206,10 @@ terminate(_Reason, #s{db = DB}) ->
enqueue(
Sync,
Atomic,
Msgs,
Ops,
BatchSize,
BatchBytes,
S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0}
S0 = #s{n = NOps0, n_bytes = NBytes0, queue = Q0}
) ->
%% At this point we don't split the batches, even when they aren't
%% atomic. It wouldn't win us anything in terms of memory, and
@ -227,18 +217,18 @@ enqueue(
%% granularity should be fine enough.
NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity),
NMsgs = NMsgs0 + BatchSize,
NMsgs = NOps0 + BatchSize,
NBytes = NBytes0 + BatchBytes,
case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NMsgs0 > 0) of
case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NOps0 > 0) of
true ->
%% Adding this batch would cause buffer to overflow. Flush
%% it now, and retry:
S1 = flush(S0),
enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1);
enqueue(Sync, Ops, BatchSize, BatchBytes, S1);
false ->
%% The buffer is empty, we enqueue the atomic batch in its
%% entirety:
Q1 = lists:foldl(fun queue:in/2, Q0, Msgs),
Q1 = lists:foldl(fun queue:in/2, Q0, Ops),
S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1},
case NMsgs >= NMax orelse NBytes >= NBytesMax of
true ->
@ -336,18 +326,18 @@ do_flush(
}
end.
-spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) ->
-spec shards_of_batch(emqx_ds:db(), [emqx_ds:operation()]) ->
[{_ShardId, {NMessages, NBytes}}]
when
NMessages :: non_neg_integer(),
NBytes :: non_neg_integer().
shards_of_batch(DB, Messages) ->
shards_of_batch(DB, Batch) ->
maps:to_list(
lists:foldl(
fun(Message, Acc) ->
fun(Operation, Acc) ->
%% TODO: sharding strategy must be part of the DS DB schema:
Shard = shard_of_message(DB, Message, clientid),
Size = payload_size(Message),
Shard = shard_of_operation(DB, Operation, clientid),
Size = payload_size(Operation),
maps:update_with(
Shard,
fun({N, S}) ->
@ -358,36 +348,35 @@ shards_of_batch(DB, Messages) ->
)
end,
#{},
Messages
Batch
)
).
repackage_messages(DB, Messages, Sync) ->
repackage_messages(DB, Batch, Sync) ->
Batches = lists:foldl(
fun(Message, Acc) ->
Shard = shard_of_message(DB, Message, clientid),
Size = payload_size(Message),
fun(Operation, Acc) ->
Shard = shard_of_operation(DB, Operation, clientid),
Size = payload_size(Operation),
maps:update_with(
Shard,
fun({N, S, Msgs}) ->
{N + 1, S + Size, [Message | Msgs]}
{N + 1, S + Size, [Operation | Msgs]}
end,
{1, Size, [Message]},
{1, Size, [Operation]},
Acc
)
end,
#{},
Messages
Batch
),
maps:fold(
fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) ->
fun(Shard, {NOps, ByteSize, RevOperations}, ErrAcc) ->
Err = enqueue_call_or_cast(
?via(DB, Shard),
#enqueue_req{
messages = lists:reverse(RevMessages),
operations = lists:reverse(RevOperations),
sync = Sync,
atomic = false,
n_messages = NMsgs,
n_operations = NOps,
payload_bytes = ByteSize
}
),
@ -427,4 +416,6 @@ cancel_timer(S = #s{tref = TRef}) ->
%% @doc Return approximate size of the MQTT message (it doesn't take
%% all things into account, for example headers and extras)
payload_size(#message{payload = P, topic = T}) ->
size(P) + size(T).
size(P) + size(T);
payload_size({_OpName, _}) ->
0.

View File

@ -377,7 +377,7 @@ nodes_of_clientid(DB, ClientId, Nodes = [N0 | _]) ->
shard_of_clientid(DB, Node, ClientId) ->
?ON(
Node,
emqx_ds_buffer:shard_of_message(DB, #message{from = ClientId}, clientid)
emqx_ds_buffer:shard_of_operation(DB, #message{from = ClientId}, clientid)
).
%% Consume eagerly: