From 3b5d98c1d9b456b1e8488d33a6a6b3aa0feb6305 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 30 Jul 2024 16:53:38 +0200 Subject: [PATCH] feat(ds): adapt buffer interface to `emqx_ds:operation()` --- .../src/emqx_ds_builtin_local.erl | 26 +++-- .../src/emqx_ds_replication_layer.erl | 31 ++++-- .../src/emqx_ds_buffer.erl | 103 ++++++++---------- .../test/emqx_ds_test_helpers.erl | 2 +- 4 files changed, 89 insertions(+), 73 deletions(-) diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl index a7cc795b6..29fc98083 100644 --- a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl +++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl @@ -43,7 +43,7 @@ %% `emqx_ds_buffer': init_buffer/3, flush_buffer/4, - shard_of_message/4 + shard_of_operation/4 ]). %% Internal exports: @@ -55,6 +55,7 @@ -export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]). -include_lib("emqx_utils/include/emqx_message.hrl"). +-include_lib("emqx_durable_storage/include/emqx_ds.hrl"). %%================================================================================ %% Type declarations @@ -255,14 +256,23 @@ assign_message_timestamps(Latest, [], Acc) -> assign_timestamp(TimestampUs, Message) -> {TimestampUs, Message}. --spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) -> shard(). -shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) -> +-spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic, _Options) -> shard(). +shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) -> + case SerializeBy of + clientid -> Key = From; + topic -> Key = Topic + end, + shard_of_key(DB, Key); +shard_of_operation(DB, {_, #message_matcher{from = From, topic = Topic}}, SerializeBy, _Options) -> + case SerializeBy of + clientid -> Key = From; + topic -> Key = Topic + end, + shard_of_key(DB, Key).
+ +shard_of_key(DB, Key) -> N = emqx_ds_builtin_local_meta:n_shards(DB), - Hash = - case SerializeBy of - clientid -> erlang:phash2(From, N); - topic -> erlang:phash2(Topic, N) - end, + Hash = erlang:phash2(Key, N), integer_to_binary(Hash). -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index 669abdbf1..f54981884 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -29,7 +29,7 @@ current_timestamp/2, - shard_of_message/4, + shard_of_operation/4, flush_buffer/4, init_buffer/3 ]). @@ -392,15 +392,30 @@ flush_buffer(DB, Shard, Messages, State) -> end, {State, Result}. --spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) -> +-spec shard_of_operation( + emqx_ds:db(), + emqx_ds:operation() | emqx_ds:precondition(), + clientid | topic, + _Options +) -> emqx_ds_replication_layer:shard_id(). -shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) -> +shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) -> + case SerializeBy of + clientid -> Key = From; + topic -> Key = Topic + end, + shard_of_key(DB, Key); +shard_of_operation(DB, {_OpName, Matcher}, SerializeBy, _Options) -> + #message_matcher{from = From, topic = Topic} = Matcher, + case SerializeBy of + clientid -> Key = From; + topic -> Key = Topic + end, + shard_of_key(DB, Key). + +shard_of_key(DB, Key) -> N = emqx_ds_replication_shard_allocator:n_shards(DB), - Hash = - case SerializeBy of - clientid -> erlang:phash2(From, N); - topic -> erlang:phash2(Topic, N) - end, + Hash = erlang:phash2(Key, N), integer_to_binary(Hash). 
%%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl index dec9eea80..cf83c8f2e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl @@ -21,7 +21,7 @@ -behaviour(gen_server). %% API: --export([start_link/4, store_batch/3, shard_of_message/3]). +-export([start_link/4, store_batch/3, shard_of_operation/3]). -export([ls/0]). %% behavior callbacks: @@ -46,19 +46,18 @@ -define(cbm(DB), {?MODULE, DB}). -record(enqueue_req, { - messages :: [emqx_types:message()], + operations :: [emqx_ds:operation()], sync :: boolean(), - atomic :: boolean(), - n_messages :: non_neg_integer(), + n_operations :: non_neg_integer(), payload_bytes :: non_neg_integer() }). -callback init_buffer(emqx_ds:db(), _Shard, _Options) -> {ok, _State}. --callback flush_buffer(emqx_ds:db(), _Shard, [emqx_types:message()], State) -> +-callback flush_buffer(emqx_ds:db(), _Shard, [emqx_ds:operation()], State) -> {State, ok | {error, recoverable | unrecoverable, _}}. --callback shard_of_message(emqx_ds:db(), emqx_types:message(), topic | clientid, _Options) -> +-callback shard_of_operation(emqx_ds:db(), emqx_ds:operation(), topic | clientid, _Options) -> _Shard. %%================================================================================ @@ -77,39 +76,33 @@ start_link(CallbackModule, CallbackOptions, DB, Shard) -> ?via(DB, Shard), ?MODULE, [CallbackModule, CallbackOptions, DB, Shard], [] ). --spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> +-spec store_batch(emqx_ds:db(), [emqx_ds:operation()], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). 
-store_batch(DB, Messages, Opts) -> +store_batch(DB, Operations, Opts) -> Sync = maps:get(sync, Opts, true), - Atomic = maps:get(atomic, Opts, false), %% Usually we expect all messages in the batch to go into the %% single shard, so this function is optimized for the happy case. - case shards_of_batch(DB, Messages) of - [{Shard, {NMsgs, NBytes}}] -> + case shards_of_batch(DB, Operations) of + [{Shard, {NOps, NBytes}}] -> %% Happy case: enqueue_call_or_cast( ?via(DB, Shard), #enqueue_req{ - messages = Messages, + operations = Operations, sync = Sync, - atomic = Atomic, - n_messages = NMsgs, + n_operations = NOps, payload_bytes = NBytes } ); - [_, _ | _] when Atomic -> - %% It's impossible to commit a batch to multiple shards - %% atomically - {error, unrecoverable, atomic_commit_to_multiple_shards}; _Shards -> %% Use a slower implementation for the unlikely case: - repackage_messages(DB, Messages, Sync) + repackage_messages(DB, Operations, Sync) end. --spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) -> _Shard. -shard_of_message(DB, Message, ShardBy) -> +-spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic) -> _Shard. +shard_of_operation(DB, Operation, ShardBy) -> {CBM, Options} = persistent_term:get(?cbm(DB)), - CBM:shard_of_message(DB, Message, ShardBy, Options). + CBM:shard_of_operation(DB, Operation, ShardBy, Options). %%================================================================================ %% behavior callbacks @@ -129,7 +122,7 @@ shard_of_message(DB, Message, ShardBy) -> n = 0 :: non_neg_integer(), n_bytes = 0 :: non_neg_integer(), tref :: undefined | reference(), - queue :: queue:queue(emqx_types:message()), + queue :: queue:queue(emqx_ds:operation()), pending_replies = [] :: [gen_server:from()] }). 
@@ -168,31 +161,29 @@ format_status(Status) -> handle_call( #enqueue_req{ - messages = Msgs, + operations = Operations, sync = Sync, - atomic = Atomic, - n_messages = NMsgs, + n_operations = NOps, payload_bytes = NBytes }, From, S0 = #s{pending_replies = Replies0} ) -> S = S0#s{pending_replies = [From | Replies0]}, - {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; + {noreply, enqueue(Sync, Operations, NOps, NBytes, S)}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. handle_cast( #enqueue_req{ - messages = Msgs, + operations = Operations, sync = Sync, - atomic = Atomic, - n_messages = NMsgs, + n_operations = NOps, payload_bytes = NBytes }, S ) -> - {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; + {noreply, enqueue(Sync, Operations, NOps, NBytes, S)}; handle_cast(_Cast, S) -> {noreply, S}. @@ -215,11 +206,10 @@ terminate(_Reason, #s{db = DB}) -> enqueue( Sync, - Atomic, - Msgs, + Ops, BatchSize, BatchBytes, - S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0} + S0 = #s{n = NOps0, n_bytes = NBytes0, queue = Q0} ) -> %% At this point we don't split the batches, even when they aren't %% atomic. It wouldn't win us anything in terms of memory, and @@ -227,18 +217,18 @@ enqueue( %% granularity should be fine enough. NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity), - NMsgs = NMsgs0 + BatchSize, + NMsgs = NOps0 + BatchSize, NBytes = NBytes0 + BatchBytes, - case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NMsgs0 > 0) of + case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NOps0 > 0) of true -> %% Adding this batch would cause buffer to overflow. 
Flush %% it now, and retry: S1 = flush(S0), - enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1); + enqueue(Sync, Ops, BatchSize, BatchBytes, S1); false -> %% The buffer is empty, we enqueue the atomic batch in its %% entirety: - Q1 = lists:foldl(fun queue:in/2, Q0, Msgs), + Q1 = lists:foldl(fun queue:in/2, Q0, Ops), S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1}, case NMsgs >= NMax orelse NBytes >= NBytesMax of true -> @@ -336,18 +326,18 @@ do_flush( } end. --spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) -> +-spec shards_of_batch(emqx_ds:db(), [emqx_ds:operation()]) -> [{_ShardId, {NMessages, NBytes}}] when NMessages :: non_neg_integer(), NBytes :: non_neg_integer(). -shards_of_batch(DB, Messages) -> +shards_of_batch(DB, Batch) -> maps:to_list( lists:foldl( - fun(Message, Acc) -> + fun(Operation, Acc) -> %% TODO: sharding strategy must be part of the DS DB schema: - Shard = shard_of_message(DB, Message, clientid), - Size = payload_size(Message), + Shard = shard_of_operation(DB, Operation, clientid), + Size = payload_size(Operation), maps:update_with( Shard, fun({N, S}) -> @@ -358,36 +348,35 @@ shards_of_batch(DB, Messages) -> ) end, #{}, - Messages + Batch ) ). 
-repackage_messages(DB, Messages, Sync) -> +repackage_messages(DB, Batch, Sync) -> Batches = lists:foldl( - fun(Message, Acc) -> - Shard = shard_of_message(DB, Message, clientid), - Size = payload_size(Message), + fun(Operation, Acc) -> + Shard = shard_of_operation(DB, Operation, clientid), + Size = payload_size(Operation), maps:update_with( Shard, fun({N, S, Msgs}) -> - {N + 1, S + Size, [Message | Msgs]} + {N + 1, S + Size, [Operation | Msgs]} end, - {1, Size, [Message]}, + {1, Size, [Operation]}, Acc ) end, #{}, - Messages + Batch ), maps:fold( - fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) -> + fun(Shard, {NOps, ByteSize, RevOperations}, ErrAcc) -> Err = enqueue_call_or_cast( ?via(DB, Shard), #enqueue_req{ - messages = lists:reverse(RevMessages), + operations = lists:reverse(RevOperations), sync = Sync, - atomic = false, - n_messages = NMsgs, + n_operations = NOps, payload_bytes = ByteSize } ), @@ -427,4 +416,6 @@ cancel_timer(S = #s{tref = TRef}) -> %% @doc Return approximate size of the MQTT message (it doesn't take %% all things into account, for example headers and extras) payload_size(#message{payload = P, topic = T}) -> - size(P) + size(T). + size(P) + size(T); +payload_size({_OpName, _}) -> + 0. diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index 08c08e0c5..cbf38bb62 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -377,7 +377,7 @@ nodes_of_clientid(DB, ClientId, Nodes = [N0 | _]) -> shard_of_clientid(DB, Node, ClientId) -> ?ON( Node, - emqx_ds_buffer:shard_of_message(DB, #message{from = ClientId}, clientid) + emqx_ds_buffer:shard_of_operation(DB, #message{from = ClientId}, clientid) ). %% Consume eagerly: