Compare commits

...

2 Commits

Author SHA1 Message Date
zmstone a2a598469e
Merge pull request #13206 from emqx/0607-patch-qlc-leak-for-5.6
fix(retainer): fix qlc cursor cleanup
2024-06-07 13:44:29 +02:00
Ilya Averyanov 986e7dc1a6 fix(retainer): fix qlc cursor cleanup 2024-06-07 13:38:50 +02:00
2 changed files with 68 additions and 0 deletions

View File

@ -255,6 +255,8 @@ deliver([], Context, Pid, Topic, Cursor, Limiter) ->
deliver(Result, Context, Pid, Topic, Cursor, Limiter) -> deliver(Result, Context, Pid, Topic, Cursor, Limiter) ->
case erlang:is_process_alive(Pid) of case erlang:is_process_alive(Pid) of
false -> false ->
ok = close_cursor(Cursor),
?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}),
{ok, Limiter}; {ok, Limiter};
_ -> _ ->
DeliverNum = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined), DeliverNum = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined),
@ -272,6 +274,11 @@ deliver(Result, Context, Pid, Topic, Cursor, Limiter) ->
end end
end. end.
close_cursor({{qlc_cursor, _} = Cursor, _}) ->
qlc:delete_cursor(Cursor);
close_cursor(_Cursor) ->
ok.
do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) -> do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) ->
{ok, Limiter}; {ok, Limiter};
do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) -> do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->

View File

@ -21,6 +21,7 @@
-include("emqx_retainer.hrl"). -include("emqx_retainer.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -461,6 +462,66 @@ t_flow_control(_) ->
}), }),
ok. ok.
t_cursor_cleanup(_) ->
Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"),
LimiterCfg = make_limiter_cfg(Rate),
JsonCfg = make_limiter_json(<<"1/1s">>),
emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg),
emqx_retainer:update_config(#{
<<"delivery_rate">> => <<"1/1s">>,
<<"flow_control">> =>
#{
<<"batch_read_number">> => 1,
<<"batch_deliver_number">> => 1,
<<"batch_deliver_limiter">> => JsonCfg
}
}),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
lists:foreach(
fun(I) ->
emqtt:publish(
C1,
<<"retained/", (integer_to_binary(I))/binary>>,
<<"this is a retained message">>,
[{qos, 0}, {retain, true}]
)
end,
lists:seq(1, 5)
),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
snabbkaffe:start_trace(),
?assertWaitEvent(
emqtt:disconnect(C1),
#{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/#">>},
2000
),
QLCProcesses = lists:filter(
fun(Pid) ->
{current_function, {qlc, wait_for_request, 3}} =:=
erlang:process_info(Pid, current_function)
end,
erlang:processes()
),
?assertEqual(0, length(QLCProcesses)),
snabbkaffe:stop(),
emqx_limiter_server:del_bucket(emqx_retainer, internal),
emqx_retainer:update_config(#{
<<"flow_control">> =>
#{
<<"batch_read_number">> => 1,
<<"batch_deliver_number">> => 1
}
}),
ok.
t_clear_expired(_) -> t_clear_expired(_) ->
ConfMod = fun(Conf) -> ConfMod = fun(Conf) ->
Conf#{ Conf#{