wip: provide persist feedback to broker + alternative egress impl

This commit is contained in:
Andrew Mayorov 2024-02-20 20:33:57 +01:00
parent 83dd6a2896
commit 1895b250e9
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
5 changed files with 74 additions and 83 deletions

View File

@ -250,9 +250,11 @@ persist_publish(Msg) ->
case emqx_persistent_message:persist(Msg) of case emqx_persistent_message:persist(Msg) of
ok -> ok ->
[persisted]; [persisted];
{_SkipOrError, _Reason} -> {skipped, _Reason} ->
[];
{error, _Reason} = Error ->
% TODO: log errors? % TODO: log errors?
[] [Error]
end. end.
%% Called internally %% Called internally

View File

@ -745,6 +745,9 @@ ensure_quota(PubRes, Channel = #channel{quota = Limiter}) ->
pubrec_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; pubrec_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
pubrec_reason_code([_ | _]) -> ?RC_SUCCESS. pubrec_reason_code([_ | _]) -> ?RC_SUCCESS.
puback_reason_code(_PacketId, _Msg, [{error, _Reason}]) ->
%% FIXME
undefined;
puback_reason_code(PacketId, Msg, [] = PubRes) -> puback_reason_code(PacketId, Msg, [] = PubRes) ->
emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_NO_MATCHING_SUBSCRIBERS); emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_NO_MATCHING_SUBSCRIBERS);
puback_reason_code(PacketId, Msg, [_ | _] = PubRes) -> puback_reason_code(PacketId, Msg, [_ | _] = PubRes) ->

View File

@ -99,7 +99,10 @@ needs_persistence(Msg) ->
-spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result(). -spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result().
store_message(Msg) -> store_message(Msg) ->
emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg], #{sync => false}). case emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg], #{}) of
[ok] -> ok;
[error] -> {error, timeout}
end.
has_subscribers(#message{topic = Topic}) -> has_subscribers(#message{topic = Topic}) ->
emqx_persistent_session_ds_router:has_any_route(Topic). emqx_persistent_session_ds_router:has_any_route(Topic).

View File

@ -386,7 +386,7 @@ t_message_gc(Config) ->
message(<<"foo/bar">>, <<"1">>, 0), message(<<"foo/bar">>, <<"1">>, 0),
message(<<"foo/baz">>, <<"2">>, 1) message(<<"foo/baz">>, <<"2">>, 1)
], ],
ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0), [ok, ok] = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0),
?tp(inserted_batch, #{}), ?tp(inserted_batch, #{}),
{ok, _} = ?block_until(#{?snk_kind := ps_message_gc_added_gen}), {ok, _} = ?block_until(#{?snk_kind := ps_message_gc_added_gen}),
@ -395,7 +395,7 @@ t_message_gc(Config) ->
message(<<"foo/bar">>, <<"3">>, Now + 100), message(<<"foo/bar">>, <<"3">>, Now + 100),
message(<<"foo/baz">>, <<"4">>, Now + 101) message(<<"foo/baz">>, <<"4">>, Now + 101)
], ],
ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1), [ok, ok] = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1),
{ok, _} = snabbkaffe:block_until( {ok, _} = snabbkaffe:block_until(
?match_n_events(NShards, #{?snk_kind := message_gc_generation_dropped}), ?match_n_events(NShards, #{?snk_kind := message_gc_generation_dropped}),

View File

@ -27,6 +27,8 @@
%% servers, if needed. %% servers, if needed.
-module(emqx_ds_replication_layer_egress). -module(emqx_ds_replication_layer_egress).
-include_lib("emqx_utils/include/emqx_message.hrl").
-behaviour(gen_server). -behaviour(gen_server).
%% API: %% API:
@ -46,36 +48,43 @@
%% Type declarations %% Type declarations
%%================================================================================ %%================================================================================
-define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}). -define(DRAIN_TIMEOUT, 1).
-define(flush, flush). -define(COOLDOWN_TIMEOUT_MIN, 1000).
-define(COOLDOWN_TIMEOUT_MAX, 5000).
-record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}). -define(name(DB, Shard), {n, l, {?MODULE, DB, Shard}}).
-define(via(DB, Shard), {via, gproc, ?name(DB, Shard)}).
%%================================================================================ %%================================================================================
%% API functions %% API functions
%%================================================================================ %%================================================================================
-define(STORE_TIMEOUT, 60 * 1000).
-spec start_link(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, pid()}. -spec start_link(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, pid()}.
start_link(DB, Shard) -> start_link(DB, Shard) ->
gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []). gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []).
-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> -spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
ok. [ok | error].
store_batch(DB, Messages, Opts) -> store_batch(DB, Messages, _Opts) ->
Sync = maps:get(sync, Opts, true), Pid = self(),
lists:foreach( Refs = lists:map(
fun(MessageIn) -> fun(MessageIn) ->
Ref = erlang:make_ref(),
Message = emqx_message:set_timestamp(emqx_ds:timestamp_us(), MessageIn), Message = emqx_message:set_timestamp(emqx_ds:timestamp_us(), MessageIn),
Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
ok = gen_server:call( _ = gproc:send(?name(DB, Shard), {Pid, Ref, Message}),
?via(DB, Shard), Ref
#enqueue_req{message = Message, sync = Sync},
?STORE_TIMEOUT
)
end, end,
Messages Messages
),
%% FIXME
lists:map(
fun(Ref) ->
receive
{Ref, Result} -> Result
end
end,
Refs
). ).
%%================================================================================ %%================================================================================
@ -84,11 +93,7 @@ store_batch(DB, Messages, Opts) ->
-record(s, { -record(s, {
db :: emqx_ds:db(), db :: emqx_ds:db(),
shard :: emqx_ds_replication_layer:shard_id(), shard :: emqx_ds_replication_layer:shard_id()
n = 0 :: non_neg_integer(),
tref :: reference(),
batch = [] :: [emqx_types:message()],
pending_replies = [] :: [gen_server:from()]
}). }).
init([DB, Shard]) -> init([DB, Shard]) ->
@ -96,21 +101,22 @@ init([DB, Shard]) ->
process_flag(message_queue_data, off_heap), process_flag(message_queue_data, off_heap),
S = #s{ S = #s{
db = DB, db = DB,
shard = Shard, shard = Shard
tref = start_timer()
}, },
{ok, S}. {ok, S}.
handle_call(#enqueue_req{message = Msg, sync = Sync}, From, S) ->
do_enqueue(From, Sync, Msg, S);
handle_call(_Call, _From, S) -> handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}. {reply, {error, unknown_call}, S}.
handle_cast(_Cast, S) -> handle_cast(_Cast, S) ->
{noreply, S}. {noreply, S}.
handle_info(?flush, S) -> handle_info(Req = {_Pid, _Ref, #message{}}, S) ->
{noreply, do_flush(S)}; ok = timer:sleep(?DRAIN_TIMEOUT),
Batch = [Req | drain_requests(1, max_batch_size())],
_ = flush(Batch, S),
true = erlang:garbage_collect(),
{noreply, S};
handle_info(_Info, S) -> handle_info(_Info, S) ->
{noreply, S}. {noreply, S}.
@ -125,70 +131,47 @@ terminate(_Reason, _S) ->
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================
-define(COOLDOWN_MIN, 1000). drain_requests(M, M) ->
-define(COOLDOWN_MAX, 5000). [M];
drain_requests(N, M) ->
receive
Req = {_Pid, _Ref, #message{}} ->
[Req | drain_requests(N + 1, M)]
after 0 ->
[N]
end.
do_flush(S = #s{batch = []}) -> flush(Batch, #s{db = DB, shard = Shard}) ->
S#s{tref = start_timer()};
do_flush(
S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard}
) ->
%% FIXME %% FIXME
case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of Messages = [Message || {_, _, Message} <- Batch],
case emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages) of
ok -> ok ->
lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), Size = reply(ok, Batch),
?tp( ?tp(
emqx_ds_replication_layer_egress_flush, emqx_ds_replication_layer_egress_flush,
#{db => DB, shard => Shard} #{db => DB, shard => Shard, size => Size}
), ),
true = erlang:garbage_collect(), ok;
S#s{
n = 0,
batch = [],
pending_replies = [],
tref = start_timer()
};
{error, Reason} -> {error, Reason} ->
Size = reply(error, Batch),
?tp( ?tp(
warning, warning,
emqx_ds_replication_layer_egress_flush_failed, emqx_ds_replication_layer_egress_flush_failed,
#{db => DB, shard => Shard, reason => Reason} #{db => DB, shard => Shard, size => Size, reason => Reason}
), ),
Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), ok = cooldown(),
ok = timer:sleep(Cooldown), {error, Reason}
S#s{tref = start_timer()}
end. end.
do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> reply(Result, [{Pid, Ref, _Message} | Rest]) ->
NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), erlang:send(Pid, {Ref, Result}),
S1 = S0#s{n = N + 1, batch = [Msg | Batch]}, reply(Result, Rest);
S2 = reply(_Result, [Size]) when is_integer(Size) ->
case N >= NMax of Size.
true ->
_ = erlang:cancel_timer(S0#s.tref),
do_flush(S1);
false ->
S1
end,
%% TODO: later we may want to delay the reply until the message is
%% replicated, but it requires changes to the PUBACK/PUBREC flow to
%% allow for async replies. For now, we ack when the message is
%% _buffered_ rather than stored.
%%
%% Otherwise, the client would freeze for at least flush interval,
%% or until the buffer is filled.
S =
case Sync of
true ->
S2#s{pending_replies = [From | Replies]};
false ->
gen_server:reply(From, ok),
S2
end,
%% TODO: add a backpressure mechanism for the server to avoid
%% building a long message queue.
{noreply, S}.
start_timer() -> cooldown() ->
Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), Timeout = ?COOLDOWN_TIMEOUT_MIN + rand:uniform(?COOLDOWN_TIMEOUT_MAX - ?COOLDOWN_TIMEOUT_MIN),
erlang:send_after(Interval, self(), ?flush). timer:sleep(Timeout).
%% Returns the maximum number of requests to include in one batch.
%% The value is read from the `egress_batch_size' setting of the
%% `emqx_durable_storage' application (1000 when unset) and is clamped
%% from below to 1, so the returned limit is never smaller than one.
max_batch_size() ->
    Configured = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
    erlang:max(1, Configured).