diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 3f9188312..1b2ac30dd 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -130,7 +130,7 @@ store_batch(DB, Messages, Opts) -> n = 0 :: non_neg_integer(), n_bytes = 0 :: non_neg_integer(), tref :: reference(), - batch = [] :: [emqx_types:message()], + queue :: queue:queue(emqx_types:message()), pending_replies = [] :: [gen_server:from()] }). @@ -143,7 +143,8 @@ init([DB, Shard]) -> db = DB, shard = Shard, metrics_id = MetricsId, - tref = start_timer() + tref = start_timer(), + queue = queue:new() }, {ok, S}. @@ -151,7 +152,7 @@ handle_call(#enqueue_req{message = Msg, sync = Sync, payload_bytes = NBytes}, Fr do_enqueue(From, Sync, Msg, NBytes, S); handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync, payload_bytes = NBytes}, From, S) -> Len = length(Batch), - do_enqueue(From, Sync, {atomic, Len, NBytes, Batch}, NBytes, S); + do_enqueue(From, Sync, {atomic, Len, Batch}, NBytes, S); handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. @@ -177,63 +178,59 @@ terminate(_Reason, _S) -> -define(COOLDOWN_MIN, 1000). -define(COOLDOWN_MAX, 5000). -do_flush(S = #s{batch = []}) -> - S#s{tref = start_timer()}; do_flush( - S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard} + S = #s{queue = Q, pending_replies = Replies, db = DB, shard = Shard} ) -> - case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of + Messages = queue:to_list(Q), + case emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages) of ok -> emqx_ds_builtin_metrics:inc_egress_batches(S#s.metrics_id), emqx_ds_builtin_metrics:inc_egress_messages(S#s.metrics_id, S#s.n), emqx_ds_builtin_metrics:inc_egress_bytes(S#s.metrics_id, S#s.n_bytes), lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), - true = erlang:garbage_collect(), ?tp( emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages} ), lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), - true = erlang:garbage_collect(), - ok; + erlang:garbage_collect(), + S#s{ + n = 0, + n_bytes = 0, + queue = queue:new(), + pending_replies = [], + tref = start_timer() + }; Error -> emqx_ds_builtin_metrics:inc_egress_batches_retry(S#s.metrics_id), - true = erlang:garbage_collect(), + erlang:garbage_collect(), ?tp( warning, emqx_ds_replication_layer_egress_flush_failed, #{db => DB, shard => Shard, reason => Error} ), Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), - ok = timer:sleep(Cooldown), - %% Since we drop the entire batch here, we at least reply callers with an - %% error so they don't hang indefinitely in the `gen_server' call with - %% `infinity' timeout. - lists:foreach(fun(From) -> gen_server:reply(From, {error, Error}) end, Replies) - end, - S#s{ - n = 0, - n_bytes = 0, - batch = [], - pending_replies = [], - tref = start_timer() - }. + S#s{ + tref = start_timer(Cooldown) + } + end. do_enqueue( From, Sync, MsgOrBatch, BatchBytes, - S0 = #s{n = N, n_bytes = NBytes0, batch = Batch, pending_replies = Replies} + S0 = #s{n = N, n_bytes = NBytes0, queue = Q0, pending_replies = Replies} ) -> NBytes = NBytes0 + BatchBytes, NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), S1 = case MsgOrBatch of {atomic, NumMsgs, Msgs} -> - S0#s{n = N + NumMsgs, n_bytes = NBytes, batch = Msgs ++ Batch}; + Q = lists:foldl(fun queue:in/2, Q0, Msgs), + S0#s{n = N + NumMsgs, n_bytes = NBytes, queue = Q}; Msg -> - S0#s{n = N + 1, n_bytes = NBytes, batch = [Msg | Batch]} + S0#s{n = N + 1, n_bytes = NBytes, queue = queue:in(Msg, Q0)} end, %% TODO: later we may want to delay the reply until the message is %% replicated, but it requies changes to the PUBACK/PUBREC flow to @@ -264,6 +261,9 @@ do_enqueue( start_timer() -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), + start_timer(Interval). + +start_timer(Interval) -> erlang:send_after(Interval, self(), ?flush). %% @doc Return approximate size of the MQTT message (it doesn't take