From 57287f07228cf015442950eb2ea91353522bb69a Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 8 May 2024 19:56:17 +0300 Subject: [PATCH] fix(retainer): fix qlc cursor cleanup --- apps/emqx_retainer/src/emqx_retainer.app.src | 2 +- apps/emqx_retainer/src/emqx_retainer.erl | 4 +- .../src/emqx_retainer_dispatcher.erl | 163 +++++++++++------- .../src/emqx_retainer_mnesia.erl | 11 +- .../test/emqx_retainer_SUITE.erl | 142 +++++++++++---- changes/ce/fix-12996.en.md | 1 + 6 files changed, 226 insertions(+), 97 deletions(-) create mode 100644 changes/ce/fix-12996.en.md diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 4a8b3cdc3..7bcde8d50 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -2,7 +2,7 @@ {application, emqx_retainer, [ {description, "EMQX Retainer"}, % strict semver, bump manually! - {vsn, "5.0.22"}, + {vsn, "5.0.23"}, {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel, stdlib, emqx, emqx_ctl]}, diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index b375f30ad..743046c80 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -47,6 +47,7 @@ retained_count/0, backend_module/0, backend_module/1, + backend_state/1, enabled/0 ]). @@ -103,6 +104,7 @@ -callback page_read(backend_state(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) -> {ok, has_next(), list(message())}. -callback match_messages(backend_state(), topic(), cursor()) -> {ok, list(message()), cursor()}. +-callback delete_cursor(backend_state(), cursor()) -> ok. -callback clear_expired(backend_state()) -> ok. -callback clean(backend_state()) -> ok. -callback size(backend_state()) -> non_neg_integer(). @@ -339,7 +341,7 @@ count(Context) -> clear_expired(Context) -> Mod = backend_module(Context), BackendState = backend_state(Context), - Mod:clear_expired(BackendState). + ok = Mod:clear_expired(BackendState). -spec store_retained(context(), message()) -> ok. store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) -> diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index e918d8d52..19ae7bbe9 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -46,15 +46,26 @@ -type limiter() :: emqx_htb_limiter:limiter(). -type context() :: emqx_retainer:context(). -type topic() :: emqx_types:topic(). --type cursor() :: emqx_retainer:cursor(). -define(POOL, ?MODULE). +%% For tests +-export([ + dispatch/3 +]). + +%% This module is `emqx_retainer` companion +-elvis([{elvis_style, invalid_dynamic_call, disable}]). + %%%=================================================================== %%% API %%%=================================================================== + dispatch(Context, Topic) -> - cast({?FUNCTION_NAME, Context, self(), Topic}). + dispatch(Context, Topic, self()). + +dispatch(Context, Topic, Pid) -> + cast({dispatch, Context, Pid, Topic}). %% reset the client's limiter after updated the limiter's config refresh_limiter() -> @@ -156,7 +167,7 @@ handle_call(Req, _From, State) -> | {noreply, NewState :: term(), hibernate} | {stop, Reason :: term(), NewState :: term()}. handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> - {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter), + {ok, Limiter2} = dispatch(Context, Pid, Topic, Limiter), {noreply, State#{limiter := Limiter2}}; handle_cast({refresh_limiter, Conf}, State) -> BucketCfg = emqx_utils_maps:deep_get([flow_control, batch_deliver_limiter], Conf, undefined), @@ -234,86 +245,120 @@ format_status(_Opt, Status) -> cast(Msg) -> gen_server:cast(worker(), Msg). --spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}. -dispatch(Context, Pid, Topic, Cursor, Limiter) -> +-spec dispatch(context(), pid(), topic(), limiter()) -> {ok, limiter()}. +dispatch(Context, Pid, Topic, Limiter) -> Mod = emqx_retainer:backend_module(Context), - case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of - false -> - {ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]), - deliver(Result, Context, Pid, Topic, undefined, Limiter); + State = emqx_retainer:backend_state(Context), + case emqx_topic:wildcard(Topic) of true -> - {ok, Result, NewCursor} = erlang:apply(Mod, match_messages, [Context, Topic, Cursor]), - deliver(Result, Context, Pid, Topic, NewCursor, Limiter) + {ok, Messages, Cursor} = Mod:match_messages(State, Topic, undefined), + dispatch_with_cursor(Context, Messages, Cursor, Pid, Topic, Limiter); + false -> + {ok, Messages} = Mod:read_message(State, Topic), + dispatch_at_once(Messages, Pid, Topic, Limiter) end. --spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) -> - {ok, limiter()}. -deliver([], _Context, _Pid, _Topic, undefined, Limiter) -> +dispatch_at_once(Messages, Pid, Topic, Limiter0) -> + case deliver(Messages, Pid, Topic, Limiter0) of + {ok, Limiter1} -> + {ok, Limiter1}; + {drop, Limiter1} -> + {ok, Limiter1}; + no_receiver -> + ?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}), + {ok, Limiter0} + end. + +dispatch_with_cursor(Context, [], Cursor, _Pid, _Topic, Limiter) -> + ok = delete_cursor(Context, Cursor), {ok, Limiter}; -deliver([], Context, Pid, Topic, Cursor, Limiter) -> - dispatch(Context, Pid, Topic, Cursor, Limiter); -deliver(Result, Context, Pid, Topic, Cursor, Limiter) -> +dispatch_with_cursor(Context, Messages0, Cursor0, Pid, Topic, Limiter0) -> + case deliver(Messages0, Pid, Topic, Limiter0) of + {ok, Limiter1} -> + {ok, Messages1, Cursor1} = match_next(Context, Topic, Cursor0), + dispatch_with_cursor(Context, Messages1, Cursor1, Pid, Topic, Limiter1); + {drop, Limiter1} -> + ok = delete_cursor(Context, Cursor0), + {ok, Limiter1}; + no_receiver -> + ?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}), + ok = delete_cursor(Context, Cursor0), + {ok, Limiter0} + end. + +match_next(_Context, _Topic, undefined) -> + {ok, [], undefined}; +match_next(Context, Topic, Cursor) -> + Mod = emqx_retainer:backend_module(Context), + State = emqx_retainer:backend_state(Context), + Mod:match_messages(State, Topic, Cursor). + +delete_cursor(_Context, undefined) -> + ok; +delete_cursor(Context, Cursor) -> + Mod = emqx_retainer:backend_module(Context), + State = emqx_retainer:backend_state(Context), + Mod:delete_cursor(State, Cursor). + +-spec deliver([emqx_types:message()], pid(), topic(), limiter()) -> + {ok, limiter()} | {drop, limiter()} | no_receiver. +deliver(Messages, Pid, Topic, Limiter) -> case erlang:is_process_alive(Pid) of false -> - {ok, Limiter}; + no_receiver; _ -> - DeliverNum = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined), - case DeliverNum of + BatchSize = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined), + case BatchSize of 0 -> - do_deliver(Result, Pid, Topic), + deliver_to_client(Messages, Pid, Topic), {ok, Limiter}; _ -> - case do_deliver(Result, DeliverNum, Pid, Topic, Limiter) of - {ok, Limiter2} -> - deliver([], Context, Pid, Topic, Cursor, Limiter2); - {drop, Limiter2} -> - {ok, Limiter2} - end + deliver_in_batches(Messages, BatchSize, Pid, Topic, Limiter) end end. -do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) -> +deliver_in_batches([], _BatchSize, _Pid, _Topic, Limiter) -> {ok, Limiter}; -do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) -> - {Num, ToDelivers, Msgs2} = safe_split(DeliverNum, Msgs), - case emqx_htb_limiter:consume(Num, Limiter) of - {ok, Limiter2} -> - do_deliver(ToDelivers, Pid, Topic), - do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2); - {drop, _} = Drop -> +deliver_in_batches(Msgs, BatchSize, Pid, Topic, Limiter0) -> + {BatchActualSize, Batch, RestMsgs} = take(BatchSize, Msgs), + case emqx_htb_limiter:consume(BatchActualSize, Limiter0) of + {ok, Limiter1} -> + ok = deliver_to_client(Batch, Pid, Topic), + deliver_in_batches(RestMsgs, BatchSize, Pid, Topic, Limiter1); + {drop, _Limiter1} = Drop -> ?SLOG(debug, #{ msg => "retained_message_dropped", reason => "reached_ratelimit", - dropped_count => length(ToDelivers) + dropped_count => BatchActualSize }), Drop end. -do_deliver([Msg | T], Pid, Topic) -> - case emqx_banned:look_up({clientid, Msg#message.from}) of - [] -> - Pid ! {deliver, Topic, Msg}, - ok; - _ -> - ?tp( - notice, - ignore_retained_message_deliver, - #{ - reason => "client is banned", - clientid => Msg#message.from - } - ) - end, - do_deliver(T, Pid, Topic); -do_deliver([], _, _) -> +deliver_to_client([Msg | T], Pid, Topic) -> + _ = + case emqx_banned:look_up({clientid, Msg#message.from}) of + [] -> + Pid ! {deliver, Topic, Msg}; + _ -> + ?tp( + notice, + ignore_retained_message_deliver, + #{ + reason => "client is banned", + clientid => Msg#message.from + } + ) + end, + deliver_to_client(T, Pid, Topic); +deliver_to_client([], _, _) -> ok. -safe_split(N, List) -> - safe_split(N, List, 0, []). +take(N, List) -> + take(N, List, 0, []). -safe_split(0, List, Count, Acc) -> +take(0, List, Count, Acc) -> {Count, lists:reverse(Acc), List}; -safe_split(_N, [], Count, Acc) -> +take(_N, [], Count, Acc) -> {Count, lists:reverse(Acc), []}; -safe_split(N, [H | T], Count, Acc) -> - safe_split(N - 1, T, Count + 1, [H | Acc]). +take(N, [H | T], Count, Acc) -> + take(N - 1, T, Count + 1, [H | Acc]). diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 7e2a73a09..daaa776b7 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -35,6 +35,7 @@ read_message/2, page_read/4, match_messages/3, + delete_cursor/2, clear_expired/1, clean/1, size/1 @@ -205,7 +206,7 @@ delete_message(_State, Topic) -> read_message(_State, Topic) -> {ok, read_messages(Topic)}. -match_messages(_State, Topic, undefined) -> +match_messages(State, Topic, undefined) -> Tokens = topic_to_tokens(Topic), Now = erlang:system_time(millisecond), QH = msg_table(search_table(Tokens, Now)), @@ -214,7 +215,7 @@ match_messages(_State, Topic, undefined) -> {ok, qlc:eval(QH), undefined}; BatchNum when is_integer(BatchNum) -> Cursor = qlc:cursor(QH), - match_messages(undefined, Topic, {Cursor, BatchNum}) + match_messages(State, Topic, {Cursor, BatchNum}) end; match_messages(_State, _Topic, {Cursor, BatchNum}) -> case qlc_next_answers(Cursor, BatchNum) of @@ -224,6 +225,11 @@ match_messages(_State, _Topic, {Cursor, BatchNum}) -> {ok, Rows, {Cursor, BatchNum}} end. +delete_cursor(_State, {Cursor, _}) -> + qlc:delete_cursor(Cursor); +delete_cursor(_State, undefined) -> + ok. + page_read(_State, Topic, Page, Limit) -> Now = erlang:system_time(millisecond), QH = @@ -562,6 +568,7 @@ reindex(NewIndices, Force, StatusFun) when %% Fill index records in batches. QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]), + ok = reindex_batch(qlc:cursor(QH), 0, StatusFun), %% Enable read indices and unlock reindexing. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index d4ad43907..b29974068 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -21,6 +21,7 @@ -include("emqx_retainer.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -96,14 +97,19 @@ end_per_group(_Group, Config) -> emqx_retainer_mnesia:populate_index_meta(), Config. -init_per_testcase(t_get_basic_usage_info, Config) -> +init_per_testcase(_TestCase, Config) -> mnesia:clear_table(?TAB_INDEX), mnesia:clear_table(?TAB_MESSAGE), emqx_retainer_mnesia:populate_index_meta(), - Config; -init_per_testcase(_TestCase, Config) -> Config. +end_per_testcase(t_flow_control, _Config) -> + restore_delivery(); +end_per_testcase(t_cursor_cleanup, _Config) -> + restore_delivery(); +end_per_testcase(_TestCase, _Config) -> + ok. + app_spec() -> {emqx_retainer, ?BASE_CONF}. @@ -405,19 +411,7 @@ t_stop_publish_clear_msg(_) -> ok = emqtt:disconnect(C1). t_flow_control(_) -> - Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"), - LimiterCfg = make_limiter_cfg(Rate), - JsonCfg = make_limiter_json(<<"1/1s">>), - emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg), - emqx_retainer:update_config(#{ - <<"delivery_rate">> => <<"1/1s">>, - <<"flow_control">> => - #{ - <<"batch_read_number">> => 1, - <<"batch_deliver_number">> => 1, - <<"batch_deliver_limiter">> => JsonCfg - } - }), + setup_slow_delivery(), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), emqtt:publish( @@ -442,23 +436,60 @@ t_flow_control(_) -> {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), ?assertEqual(3, length(receive_messages(3))), End = erlang:system_time(millisecond), + Diff = End - Begin, ?assert( - Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9), + Diff > timer:seconds(2.1) andalso Diff < timer:seconds(3.9), lists:flatten(io_lib:format("Diff is :~p~n", [Diff])) ), ok = emqtt:disconnect(C1), + ok. + +t_cursor_cleanup(_) -> + setup_slow_delivery(), + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + lists:foreach( + fun(I) -> + emqtt:publish( + C1, + <<"retained/", (integer_to_binary(I))/binary>>, + <<"this is a retained message">>, + [{qos, 0}, {retain, true}] + ) + end, + lists:seq(1, 5) + ), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), + + snabbkaffe:start_trace(), + + ?assertWaitEvent( + emqtt:disconnect(C1), + #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/#">>}, + 2000 + ), + + ?assertEqual(0, qlc_process_count()), + + {Pid, Ref} = spawn_monitor(fun() -> ok end), + receive + {'DOWN', Ref, _, _, _} -> ok + after 1000 -> ct:fail("should receive 'DOWN' message") + end, + + ?assertWaitEvent( + emqx_retainer_dispatcher:dispatch(emqx_retainer:context(), <<"retained/1">>, Pid), + #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/1">>}, + 2000 + ), + + ?assertEqual(0, qlc_process_count()), + + snabbkaffe:stop(), - emqx_limiter_server:del_bucket(emqx_retainer, internal), - emqx_retainer:update_config(#{ - <<"flow_control">> => - #{ - <<"batch_read_number">> => 1, - <<"batch_deliver_number">> => 1 - } - }), ok. t_clear_expired(_) -> @@ -849,15 +880,21 @@ with_conf(ConfMod, Case) -> end. make_limiter_cfg(Rate) -> - Client = #{ - rate => Rate, - initial => 0, - burst => 0, - low_watermark => 1, - divisible => false, - max_retry_time => timer:seconds(5), - failure_strategy => force - }, + make_limiter_cfg(Rate, #{}). + +make_limiter_cfg(Rate, ClientOpts) -> + Client = maps:merge( + #{ + rate => Rate, + initial => 0, + burst => 0, + low_watermark => 1, + divisible => false, + max_retry_time => timer:seconds(5), + failure_strategy => force + }, + ClientOpts + ), #{client => Client, rate => Rate, initial => 0, burst => 0}. make_limiter_json(Rate) -> @@ -909,3 +946,40 @@ do_publish(Client, Topic, Payload, Opts, {sleep, Time}) -> Res = emqtt:publish(Client, Topic, Payload, Opts), ct:sleep(Time), Res. + +setup_slow_delivery() -> + Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"), + LimiterCfg = make_limiter_cfg(Rate), + JsonCfg = make_limiter_json(<<"1/1s">>), + emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg), + emqx_retainer:update_config(#{ + <<"delivery_rate">> => <<"1/1s">>, + <<"flow_control">> => + #{ + <<"batch_read_number">> => 1, + <<"batch_deliver_number">> => 1, + <<"batch_deliver_limiter">> => JsonCfg + } + }). + +restore_delivery() -> + emqx_limiter_server:del_bucket(emqx_retainer, internal), + emqx_retainer:update_config(#{ + <<"flow_control">> => + #{ + <<"batch_read_number">> => 1, + <<"batch_deliver_number">> => 1 + } + }). + +qlc_processes() -> + lists:filter( + fun(Pid) -> + {current_function, {qlc, wait_for_request, 3}} =:= + erlang:process_info(Pid, current_function) + end, + erlang:processes() + ). + +qlc_process_count() -> + length(qlc_processes()). diff --git a/changes/ce/fix-12996.en.md b/changes/ce/fix-12996.en.md new file mode 100644 index 000000000..0c3cc872f --- /dev/null +++ b/changes/ce/fix-12996.en.md @@ -0,0 +1 @@ +Fix process leak in `emqx_retainer` application. Previously, client disconnection while receiving retained messages could cause a process leak.