fix(retainer): fix qlc cursor cleanup

This commit is contained in:
Ilya Averyanov 2024-05-08 19:56:17 +03:00
parent ba66f2303a
commit 57287f0722
6 changed files with 226 additions and 97 deletions

View File

@ -2,7 +2,7 @@
{application, emqx_retainer, [ {application, emqx_retainer, [
{description, "EMQX Retainer"}, {description, "EMQX Retainer"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.22"}, {vsn, "5.0.23"},
{modules, []}, {modules, []},
{registered, [emqx_retainer_sup]}, {registered, [emqx_retainer_sup]},
{applications, [kernel, stdlib, emqx, emqx_ctl]}, {applications, [kernel, stdlib, emqx, emqx_ctl]},

View File

@ -47,6 +47,7 @@
retained_count/0, retained_count/0,
backend_module/0, backend_module/0,
backend_module/1, backend_module/1,
backend_state/1,
enabled/0 enabled/0
]). ]).
@ -103,6 +104,7 @@
-callback page_read(backend_state(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) -> -callback page_read(backend_state(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) ->
{ok, has_next(), list(message())}. {ok, has_next(), list(message())}.
-callback match_messages(backend_state(), topic(), cursor()) -> {ok, list(message()), cursor()}. -callback match_messages(backend_state(), topic(), cursor()) -> {ok, list(message()), cursor()}.
-callback delete_cursor(backend_state(), cursor()) -> ok.
-callback clear_expired(backend_state()) -> ok. -callback clear_expired(backend_state()) -> ok.
-callback clean(backend_state()) -> ok. -callback clean(backend_state()) -> ok.
-callback size(backend_state()) -> non_neg_integer(). -callback size(backend_state()) -> non_neg_integer().
@ -339,7 +341,7 @@ count(Context) ->
clear_expired(Context) -> clear_expired(Context) ->
Mod = backend_module(Context), Mod = backend_module(Context),
BackendState = backend_state(Context), BackendState = backend_state(Context),
Mod:clear_expired(BackendState). ok = Mod:clear_expired(BackendState).
-spec store_retained(context(), message()) -> ok. -spec store_retained(context(), message()) -> ok.
store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) -> store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) ->

View File

@ -46,15 +46,26 @@
-type limiter() :: emqx_htb_limiter:limiter(). -type limiter() :: emqx_htb_limiter:limiter().
-type context() :: emqx_retainer:context(). -type context() :: emqx_retainer:context().
-type topic() :: emqx_types:topic(). -type topic() :: emqx_types:topic().
-type cursor() :: emqx_retainer:cursor().
-define(POOL, ?MODULE). -define(POOL, ?MODULE).
%% For tests
-export([
dispatch/3
]).
%% This module is a companion of `emqx_retainer`
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
%%%=================================================================== %%%===================================================================
%%% API %%% API
%%%=================================================================== %%%===================================================================
dispatch(Context, Topic) -> dispatch(Context, Topic) ->
cast({?FUNCTION_NAME, Context, self(), Topic}). dispatch(Context, Topic, self()).
dispatch(Context, Topic, Pid) ->
cast({dispatch, Context, Pid, Topic}).
%% reset the client's limiter after updated the limiter's config %% reset the client's limiter after updated the limiter's config
refresh_limiter() -> refresh_limiter() ->
@ -156,7 +167,7 @@ handle_call(Req, _From, State) ->
| {noreply, NewState :: term(), hibernate} | {noreply, NewState :: term(), hibernate}
| {stop, Reason :: term(), NewState :: term()}. | {stop, Reason :: term(), NewState :: term()}.
handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
{ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter), {ok, Limiter2} = dispatch(Context, Pid, Topic, Limiter),
{noreply, State#{limiter := Limiter2}}; {noreply, State#{limiter := Limiter2}};
handle_cast({refresh_limiter, Conf}, State) -> handle_cast({refresh_limiter, Conf}, State) ->
BucketCfg = emqx_utils_maps:deep_get([flow_control, batch_deliver_limiter], Conf, undefined), BucketCfg = emqx_utils_maps:deep_get([flow_control, batch_deliver_limiter], Conf, undefined),
@ -234,86 +245,120 @@ format_status(_Opt, Status) ->
cast(Msg) -> cast(Msg) ->
gen_server:cast(worker(), Msg). gen_server:cast(worker(), Msg).
-spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}. -spec dispatch(context(), pid(), topic(), limiter()) -> {ok, limiter()}.
dispatch(Context, Pid, Topic, Cursor, Limiter) -> dispatch(Context, Pid, Topic, Limiter) ->
Mod = emqx_retainer:backend_module(Context), Mod = emqx_retainer:backend_module(Context),
case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of State = emqx_retainer:backend_state(Context),
false -> case emqx_topic:wildcard(Topic) of
{ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]),
deliver(Result, Context, Pid, Topic, undefined, Limiter);
true -> true ->
{ok, Result, NewCursor} = erlang:apply(Mod, match_messages, [Context, Topic, Cursor]), {ok, Messages, Cursor} = Mod:match_messages(State, Topic, undefined),
deliver(Result, Context, Pid, Topic, NewCursor, Limiter) dispatch_with_cursor(Context, Messages, Cursor, Pid, Topic, Limiter);
false ->
{ok, Messages} = Mod:read_message(State, Topic),
dispatch_at_once(Messages, Pid, Topic, Limiter)
end. end.
-spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) -> dispatch_at_once(Messages, Pid, Topic, Limiter0) ->
{ok, limiter()}. case deliver(Messages, Pid, Topic, Limiter0) of
deliver([], _Context, _Pid, _Topic, undefined, Limiter) -> {ok, Limiter1} ->
{ok, Limiter1};
{drop, Limiter1} ->
{ok, Limiter1};
no_receiver ->
?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}),
{ok, Limiter0}
end.
dispatch_with_cursor(Context, [], Cursor, _Pid, _Topic, Limiter) ->
ok = delete_cursor(Context, Cursor),
{ok, Limiter}; {ok, Limiter};
deliver([], Context, Pid, Topic, Cursor, Limiter) -> dispatch_with_cursor(Context, Messages0, Cursor0, Pid, Topic, Limiter0) ->
dispatch(Context, Pid, Topic, Cursor, Limiter); case deliver(Messages0, Pid, Topic, Limiter0) of
deliver(Result, Context, Pid, Topic, Cursor, Limiter) -> {ok, Limiter1} ->
{ok, Messages1, Cursor1} = match_next(Context, Topic, Cursor0),
dispatch_with_cursor(Context, Messages1, Cursor1, Pid, Topic, Limiter1);
{drop, Limiter1} ->
ok = delete_cursor(Context, Cursor0),
{ok, Limiter1};
no_receiver ->
?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}),
ok = delete_cursor(Context, Cursor0),
{ok, Limiter0}
end.
%% Ask the backend for the next batch of messages matching Topic,
%% continuing from Cursor.
%% When there is no cursor (`undefined') the backend is not called and an
%% empty batch is returned together with an `undefined' cursor.
match_next(_Context, _Topic, undefined) ->
    {ok, [], undefined};
match_next(Context, Topic, Cursor) ->
    Backend = emqx_retainer:backend_module(Context),
    BackendState = emqx_retainer:backend_state(Context),
    %% The backend's reply is handed back to the caller unchanged.
    Backend:match_messages(BackendState, Topic, Cursor).
%% Release a backend cursor.
%% An `undefined' cursor means there is nothing to release, so the backend
%% is not called; otherwise the call is delegated to the backend module
%% resolved from Context.
delete_cursor(_Context, undefined) ->
    ok;
delete_cursor(Context, Cursor) ->
    Backend = emqx_retainer:backend_module(Context),
    BackendState = emqx_retainer:backend_state(Context),
    Backend:delete_cursor(BackendState, Cursor).
-spec deliver([emqx_types:message()], pid(), topic(), limiter()) ->
{ok, limiter()} | {drop, limiter()} | no_receiver.
deliver(Messages, Pid, Topic, Limiter) ->
case erlang:is_process_alive(Pid) of case erlang:is_process_alive(Pid) of
false -> false ->
{ok, Limiter}; no_receiver;
_ -> _ ->
DeliverNum = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined), BatchSize = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined),
case DeliverNum of case BatchSize of
0 -> 0 ->
do_deliver(Result, Pid, Topic), deliver_to_client(Messages, Pid, Topic),
{ok, Limiter}; {ok, Limiter};
_ -> _ ->
case do_deliver(Result, DeliverNum, Pid, Topic, Limiter) of deliver_in_batches(Messages, BatchSize, Pid, Topic, Limiter)
{ok, Limiter2} ->
deliver([], Context, Pid, Topic, Cursor, Limiter2);
{drop, Limiter2} ->
{ok, Limiter2}
end
end end
end. end.
do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) -> deliver_in_batches([], _BatchSize, _Pid, _Topic, Limiter) ->
{ok, Limiter}; {ok, Limiter};
do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) -> deliver_in_batches(Msgs, BatchSize, Pid, Topic, Limiter0) ->
{Num, ToDelivers, Msgs2} = safe_split(DeliverNum, Msgs), {BatchActualSize, Batch, RestMsgs} = take(BatchSize, Msgs),
case emqx_htb_limiter:consume(Num, Limiter) of case emqx_htb_limiter:consume(BatchActualSize, Limiter0) of
{ok, Limiter2} -> {ok, Limiter1} ->
do_deliver(ToDelivers, Pid, Topic), ok = deliver_to_client(Batch, Pid, Topic),
do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2); deliver_in_batches(RestMsgs, BatchSize, Pid, Topic, Limiter1);
{drop, _} = Drop -> {drop, _Limiter1} = Drop ->
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "retained_message_dropped", msg => "retained_message_dropped",
reason => "reached_ratelimit", reason => "reached_ratelimit",
dropped_count => length(ToDelivers) dropped_count => BatchActualSize
}), }),
Drop Drop
end. end.
do_deliver([Msg | T], Pid, Topic) -> deliver_to_client([Msg | T], Pid, Topic) ->
case emqx_banned:look_up({clientid, Msg#message.from}) of _ =
[] -> case emqx_banned:look_up({clientid, Msg#message.from}) of
Pid ! {deliver, Topic, Msg}, [] ->
ok; Pid ! {deliver, Topic, Msg};
_ -> _ ->
?tp( ?tp(
notice, notice,
ignore_retained_message_deliver, ignore_retained_message_deliver,
#{ #{
reason => "client is banned", reason => "client is banned",
clientid => Msg#message.from clientid => Msg#message.from
} }
) )
end, end,
do_deliver(T, Pid, Topic); deliver_to_client(T, Pid, Topic);
do_deliver([], _, _) -> deliver_to_client([], _, _) ->
ok. ok.
safe_split(N, List) -> take(N, List) ->
safe_split(N, List, 0, []). take(N, List, 0, []).
safe_split(0, List, Count, Acc) -> take(0, List, Count, Acc) ->
{Count, lists:reverse(Acc), List}; {Count, lists:reverse(Acc), List};
safe_split(_N, [], Count, Acc) -> take(_N, [], Count, Acc) ->
{Count, lists:reverse(Acc), []}; {Count, lists:reverse(Acc), []};
safe_split(N, [H | T], Count, Acc) -> take(N, [H | T], Count, Acc) ->
safe_split(N - 1, T, Count + 1, [H | Acc]). take(N - 1, T, Count + 1, [H | Acc]).

View File

@ -35,6 +35,7 @@
read_message/2, read_message/2,
page_read/4, page_read/4,
match_messages/3, match_messages/3,
delete_cursor/2,
clear_expired/1, clear_expired/1,
clean/1, clean/1,
size/1 size/1
@ -205,7 +206,7 @@ delete_message(_State, Topic) ->
read_message(_State, Topic) -> read_message(_State, Topic) ->
{ok, read_messages(Topic)}. {ok, read_messages(Topic)}.
match_messages(_State, Topic, undefined) -> match_messages(State, Topic, undefined) ->
Tokens = topic_to_tokens(Topic), Tokens = topic_to_tokens(Topic),
Now = erlang:system_time(millisecond), Now = erlang:system_time(millisecond),
QH = msg_table(search_table(Tokens, Now)), QH = msg_table(search_table(Tokens, Now)),
@ -214,7 +215,7 @@ match_messages(_State, Topic, undefined) ->
{ok, qlc:eval(QH), undefined}; {ok, qlc:eval(QH), undefined};
BatchNum when is_integer(BatchNum) -> BatchNum when is_integer(BatchNum) ->
Cursor = qlc:cursor(QH), Cursor = qlc:cursor(QH),
match_messages(undefined, Topic, {Cursor, BatchNum}) match_messages(State, Topic, {Cursor, BatchNum})
end; end;
match_messages(_State, _Topic, {Cursor, BatchNum}) -> match_messages(_State, _Topic, {Cursor, BatchNum}) ->
case qlc_next_answers(Cursor, BatchNum) of case qlc_next_answers(Cursor, BatchNum) of
@ -224,6 +225,11 @@ match_messages(_State, _Topic, {Cursor, BatchNum}) ->
{ok, Rows, {Cursor, BatchNum}} {ok, Rows, {Cursor, BatchNum}}
end. end.
%% Dispose of a cursor previously returned by match_messages/3.
%% A cursor is either `undefined' (nothing to dispose of) or a
%% `{QlcCursor, BatchNum}' pair, in which case the underlying QLC cursor
%% is deleted. The backend state is not used.
delete_cursor(_State, undefined) ->
    ok;
delete_cursor(_State, {QlcCursor, _BatchNum}) ->
    qlc:delete_cursor(QlcCursor).
page_read(_State, Topic, Page, Limit) -> page_read(_State, Topic, Page, Limit) ->
Now = erlang:system_time(millisecond), Now = erlang:system_time(millisecond),
QH = QH =
@ -562,6 +568,7 @@ reindex(NewIndices, Force, StatusFun) when
%% Fill index records in batches. %% Fill index records in batches.
QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]), QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]),
ok = reindex_batch(qlc:cursor(QH), 0, StatusFun), ok = reindex_batch(qlc:cursor(QH), 0, StatusFun),
%% Enable read indices and unlock reindexing. %% Enable read indices and unlock reindexing.

View File

@ -21,6 +21,7 @@
-include("emqx_retainer.hrl"). -include("emqx_retainer.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -96,14 +97,19 @@ end_per_group(_Group, Config) ->
emqx_retainer_mnesia:populate_index_meta(), emqx_retainer_mnesia:populate_index_meta(),
Config. Config.
init_per_testcase(t_get_basic_usage_info, Config) -> init_per_testcase(_TestCase, Config) ->
mnesia:clear_table(?TAB_INDEX), mnesia:clear_table(?TAB_INDEX),
mnesia:clear_table(?TAB_MESSAGE), mnesia:clear_table(?TAB_MESSAGE),
emqx_retainer_mnesia:populate_index_meta(), emqx_retainer_mnesia:populate_index_meta(),
Config;
init_per_testcase(_TestCase, Config) ->
Config. Config.
%% Per-testcase teardown.
%% `t_flow_control' and `t_cursor_cleanup' call restore_delivery/0 when they
%% finish; every other testcase needs no teardown.
end_per_testcase(TestCase, _Config) when
    TestCase =:= t_flow_control; TestCase =:= t_cursor_cleanup
->
    restore_delivery();
end_per_testcase(_TestCase, _Config) ->
    ok.
app_spec() -> app_spec() ->
{emqx_retainer, ?BASE_CONF}. {emqx_retainer, ?BASE_CONF}.
@ -405,19 +411,7 @@ t_stop_publish_clear_msg(_) ->
ok = emqtt:disconnect(C1). ok = emqtt:disconnect(C1).
t_flow_control(_) -> t_flow_control(_) ->
Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"), setup_slow_delivery(),
LimiterCfg = make_limiter_cfg(Rate),
JsonCfg = make_limiter_json(<<"1/1s">>),
emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg),
emqx_retainer:update_config(#{
<<"delivery_rate">> => <<"1/1s">>,
<<"flow_control">> =>
#{
<<"batch_read_number">> => 1,
<<"batch_deliver_number">> => 1,
<<"batch_deliver_limiter">> => JsonCfg
}
}),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqtt:publish( emqtt:publish(
@ -442,23 +436,60 @@ t_flow_control(_) ->
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
?assertEqual(3, length(receive_messages(3))), ?assertEqual(3, length(receive_messages(3))),
End = erlang:system_time(millisecond), End = erlang:system_time(millisecond),
Diff = End - Begin, Diff = End - Begin,
?assert( ?assert(
Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9), Diff > timer:seconds(2.1) andalso Diff < timer:seconds(3.9),
lists:flatten(io_lib:format("Diff is :~p~n", [Diff])) lists:flatten(io_lib:format("Diff is :~p~n", [Diff]))
), ),
ok = emqtt:disconnect(C1), ok = emqtt:disconnect(C1),
ok.
t_cursor_cleanup(_) ->
setup_slow_delivery(),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
lists:foreach(
fun(I) ->
emqtt:publish(
C1,
<<"retained/", (integer_to_binary(I))/binary>>,
<<"this is a retained message">>,
[{qos, 0}, {retain, true}]
)
end,
lists:seq(1, 5)
),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
snabbkaffe:start_trace(),
?assertWaitEvent(
emqtt:disconnect(C1),
#{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/#">>},
2000
),
?assertEqual(0, qlc_process_count()),
{Pid, Ref} = spawn_monitor(fun() -> ok end),
receive
{'DOWN', Ref, _, _, _} -> ok
after 1000 -> ct:fail("should receive 'DOWN' message")
end,
?assertWaitEvent(
emqx_retainer_dispatcher:dispatch(emqx_retainer:context(), <<"retained/1">>, Pid),
#{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/1">>},
2000
),
?assertEqual(0, qlc_process_count()),
snabbkaffe:stop(),
emqx_limiter_server:del_bucket(emqx_retainer, internal),
emqx_retainer:update_config(#{
<<"flow_control">> =>
#{
<<"batch_read_number">> => 1,
<<"batch_deliver_number">> => 1
}
}),
ok. ok.
t_clear_expired(_) -> t_clear_expired(_) ->
@ -849,15 +880,21 @@ with_conf(ConfMod, Case) ->
end. end.
make_limiter_cfg(Rate) -> make_limiter_cfg(Rate) ->
Client = #{ make_limiter_cfg(Rate, #{}).
rate => Rate,
initial => 0, make_limiter_cfg(Rate, ClientOpts) ->
burst => 0, Client = maps:merge(
low_watermark => 1, #{
divisible => false, rate => Rate,
max_retry_time => timer:seconds(5), initial => 0,
failure_strategy => force burst => 0,
}, low_watermark => 1,
divisible => false,
max_retry_time => timer:seconds(5),
failure_strategy => force
},
ClientOpts
),
#{client => Client, rate => Rate, initial => 0, burst => 0}. #{client => Client, rate => Rate, initial => 0, burst => 0}.
make_limiter_json(Rate) -> make_limiter_json(Rate) ->
@ -909,3 +946,40 @@ do_publish(Client, Topic, Payload, Opts, {sleep, Time}) ->
Res = emqtt:publish(Client, Topic, Payload, Opts), Res = emqtt:publish(Client, Topic, Payload, Opts),
ct:sleep(Time), ct:sleep(Time),
Res. Res.
%% Test helper: registers an `internal' limiter bucket for `emqx_retainer'
%% and updates the retainer config to use the "1/1s" rate together with
%% read/deliver batch sizes of 1.
setup_slow_delivery() ->
    RateBin = <<"1/1s">>,
    BucketCfg = make_limiter_cfg(emqx_ratelimiter_SUITE:to_rate("1/1s")),
    LimiterJson = make_limiter_json(RateBin),
    %% The bucket is added before the retainer config is updated.
    emqx_limiter_server:add_bucket(emqx_retainer, internal, BucketCfg),
    FlowControl = #{
        <<"batch_read_number">> => 1,
        <<"batch_deliver_number">> => 1,
        <<"batch_deliver_limiter">> => LimiterJson
    },
    emqx_retainer:update_config(#{
        <<"delivery_rate">> => RateBin,
        <<"flow_control">> => FlowControl
    }).
%% Test helper: removes the `internal' limiter bucket of `emqx_retainer'
%% and updates the retainer config with a flow-control section that sets
%% only the batch sizes (no batch deliver limiter).
restore_delivery() ->
    emqx_limiter_server:del_bucket(emqx_retainer, internal),
    FlowControl = #{
        <<"batch_read_number">> => 1,
        <<"batch_deliver_number">> => 1
    },
    emqx_retainer:update_config(#{<<"flow_control">> => FlowControl}).
%% Return the pids of all local processes whose current function is
%% qlc:wait_for_request/3.
%% Processes that have already exited make process_info/2 return
%% `undefined', which never equals the expected tuple, so they are skipped.
qlc_processes() ->
    Waiting = {current_function, {qlc, wait_for_request, 3}},
    [
        Pid
     || Pid <- erlang:processes(),
        Waiting =:= erlang:process_info(Pid, current_function)
    ].
%% Number of processes currently reported by qlc_processes/0.
qlc_process_count() ->
    Pids = qlc_processes(),
    length(Pids).

View File

@ -0,0 +1 @@
Fix process leak in `emqx_retainer` application. Previously, client disconnection while receiving retained messages could cause a process leak.