From 1895b250e93c58374af2e4860d9208b564dce9ff Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 20 Feb 2024 20:33:57 +0100 Subject: [PATCH] wip: provide persist feedback to broker + alternative egress impl --- apps/emqx/src/emqx_broker.erl | 6 +- apps/emqx/src/emqx_channel.erl | 3 + apps/emqx/src/emqx_persistent_message.erl | 5 +- .../test/emqx_persistent_messages_SUITE.erl | 4 +- .../src/emqx_ds_replication_layer_egress.erl | 139 ++++++++---------- 5 files changed, 74 insertions(+), 83 deletions(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 40969ed02..eab342646 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -250,9 +250,11 @@ persist_publish(Msg) -> case emqx_persistent_message:persist(Msg) of ok -> [persisted]; - {_SkipOrError, _Reason} -> + {skipped, _Reason} -> + []; + {error, _Reason} = Error -> % TODO: log errors? - [] + [Error] end. %% Called internally diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index bb9c84e8c..7f3674be5 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -745,6 +745,9 @@ ensure_quota(PubRes, Channel = #channel{quota = Limiter}) -> pubrec_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; pubrec_reason_code([_ | _]) -> ?RC_SUCCESS. +puback_reason_code(_PacketId, _Msg, [{error, _Reason}]) -> + %% FIXME + undefined; puback_reason_code(PacketId, Msg, [] = PubRes) -> emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_NO_MATCHING_SUBSCRIBERS); puback_reason_code(PacketId, Msg, [_ | _] = PubRes) -> diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 7f1e60723..9ae961a8d 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -99,7 +99,10 @@ needs_persistence(Msg) -> -spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result(). store_message(Msg) -> - emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg], #{sync => false}). + case emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg], #{}) of + [ok] -> ok; + [error] -> {error, timeout} + end. has_subscribers(#message{topic = Topic}) -> emqx_persistent_session_ds_router:has_any_route(Topic). diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 5e52fd78e..a3a0ac6b8 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -386,7 +386,7 @@ t_message_gc(Config) -> message(<<"foo/bar">>, <<"1">>, 0), message(<<"foo/baz">>, <<"2">>, 1) ], - ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0), + [ok, ok] = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0), ?tp(inserted_batch, #{}), {ok, _} = ?block_until(#{?snk_kind := ps_message_gc_added_gen}), @@ -395,7 +395,7 @@ t_message_gc(Config) -> message(<<"foo/bar">>, <<"3">>, Now + 100), message(<<"foo/baz">>, <<"4">>, Now + 101) ], - ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1), + [ok, ok] = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1), {ok, _} = snabbkaffe:block_until( ?match_n_events(NShards, #{?snk_kind := message_gc_generation_dropped}), diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index e2fc17089..c222afdb2 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -27,6 +27,8 @@ %% servers, if needed. -module(emqx_ds_replication_layer_egress). +-include_lib("emqx_utils/include/emqx_message.hrl"). + -behaviour(gen_server). %% API: @@ -46,36 +48,43 @@ %% Type declarations %%================================================================================ --define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}). --define(flush, flush). +-define(DRAIN_TIMEOUT, 1). +-define(COOLDOWN_TIMEOUT_MIN, 1000). +-define(COOLDOWN_TIMEOUT_MAX, 5000). --record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}). +-define(name(DB, Shard), {n, l, {?MODULE, DB, Shard}}). +-define(via(DB, Shard), {via, gproc, ?name(DB, Shard)}). %%================================================================================ %% API functions %%================================================================================ --define(STORE_TIMEOUT, 60 * 1000). - -spec start_link(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, pid()}. start_link(DB, Shard) -> gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []). -spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> - ok. -store_batch(DB, Messages, Opts) -> - Sync = maps:get(sync, Opts, true), - lists:foreach( + [ok | error]. +store_batch(DB, Messages, _Opts) -> + Pid = self(), + Refs = lists:map( fun(MessageIn) -> + Ref = erlang:make_ref(), Message = emqx_message:set_timestamp(emqx_ds:timestamp_us(), MessageIn), Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), - ok = gen_server:call( - ?via(DB, Shard), - #enqueue_req{message = Message, sync = Sync}, - ?STORE_TIMEOUT - ) + _ = gproc:send(?name(DB, Shard), {Pid, Ref, Message}), + Ref end, Messages + ), + %% FIXME + lists:map( + fun(Ref) -> + receive + {Ref, Result} -> Result + end + end, + Refs ). %%================================================================================ @@ -84,11 +93,7 @@ store_batch(DB, Messages, Opts) -> -record(s, { db :: emqx_ds:db(), - shard :: emqx_ds_replication_layer:shard_id(), - n = 0 :: non_neg_integer(), - tref :: reference(), - batch = [] :: [emqx_types:message()], - pending_replies = [] :: [gen_server:from()] + shard :: emqx_ds_replication_layer:shard_id() }). init([DB, Shard]) -> @@ -96,21 +101,22 @@ init([DB, Shard]) -> process_flag(message_queue_data, off_heap), S = #s{ db = DB, - shard = Shard, - tref = start_timer() + shard = Shard }, {ok, S}. -handle_call(#enqueue_req{message = Msg, sync = Sync}, From, S) -> - do_enqueue(From, Sync, Msg, S); handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. handle_cast(_Cast, S) -> {noreply, S}. -handle_info(?flush, S) -> - {noreply, do_flush(S)}; +handle_info(Req = {_Pid, _Ref, #message{}}, S) -> + ok = timer:sleep(?DRAIN_TIMEOUT), + Batch = [Req | drain_requests(1, max_batch_size())], + _ = flush(Batch, S), + true = erlang:garbage_collect(), + {noreply, S}; handle_info(_Info, S) -> {noreply, S}. @@ -125,70 +131,47 @@ terminate(_Reason, _S) -> %% Internal functions %%================================================================================ --define(COOLDOWN_MIN, 1000). --define(COOLDOWN_MAX, 5000). +drain_requests(M, M) -> + [M]; +drain_requests(N, M) -> + receive + Req = {_Pid, _Ref, #message{}} -> + [Req | drain_requests(N + 1, M)] + after 0 -> + [N] + end. -do_flush(S = #s{batch = []}) -> - S#s{tref = start_timer()}; -do_flush( - S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard} -) -> +flush(Batch, #s{db = DB, shard = Shard}) -> %% FIXME - case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of + Messages = [Message || {_, _, Message} <- Batch], + case emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages) of ok -> - lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), + Size = reply(ok, Batch), ?tp( emqx_ds_replication_layer_egress_flush, - #{db => DB, shard => Shard} + #{db => DB, shard => Shard, size => Size} ), - true = erlang:garbage_collect(), - S#s{ - n = 0, - batch = [], - pending_replies = [], - tref = start_timer() - }; + ok; {error, Reason} -> + Size = reply(error, Batch), ?tp( warning, emqx_ds_replication_layer_egress_flush_failed, - #{db => DB, shard => Shard, reason => Reason} + #{db => DB, shard => Shard, size => Size, reason => Reason} ), - Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), - ok = timer:sleep(Cooldown), - S#s{tref = start_timer()} + ok = cooldown(), + {error, Reason} end. -do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> - NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), - S1 = S0#s{n = N + 1, batch = [Msg | Batch]}, - S2 = - case N >= NMax of - true -> - _ = erlang:cancel_timer(S0#s.tref), - do_flush(S1); - false -> - S1 - end, - %% TODO: later we may want to delay the reply until the message is - %% replicated, but it requies changes to the PUBACK/PUBREC flow to - %% allow for async replies. For now, we ack when the message is - %% _buffered_ rather than stored. - %% - %% Otherwise, the client would freeze for at least flush interval, - %% or until the buffer is filled. - S = - case Sync of - true -> - S2#s{pending_replies = [From | Replies]}; - false -> - gen_server:reply(From, ok), - S2 - end, - %% TODO: add a backpressure mechanism for the server to avoid - %% building a long message queue. - {noreply, S}. +reply(Result, [{Pid, Ref, _Message} | Rest]) -> + erlang:send(Pid, {Ref, Result}), + reply(Result, Rest); +reply(_Result, [Size]) when is_integer(Size) -> + Size. -start_timer() -> - Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), - erlang:send_after(Interval, self(), ?flush). +cooldown() -> + Timeout = ?COOLDOWN_TIMEOUT_MIN + rand:uniform(?COOLDOWN_TIMEOUT_MAX - ?COOLDOWN_TIMEOUT_MIN), + timer:sleep(Timeout). + +max_batch_size() -> + max(1, application:get_env(emqx_durable_storage, egress_batch_size, 1000)).