diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index e918d8d52..884ff27eb 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -255,6 +255,8 @@ deliver([], Context, Pid, Topic, Cursor, Limiter) -> deliver(Result, Context, Pid, Topic, Cursor, Limiter) -> case erlang:is_process_alive(Pid) of false -> + ok = close_cursor(Cursor), + ?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}), {ok, Limiter}; _ -> DeliverNum = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined), @@ -272,6 +274,11 @@ deliver(Result, Context, Pid, Topic, Cursor, Limiter) -> end end. +close_cursor({{qlc_cursor, _} = Cursor, _}) -> + qlc:delete_cursor(Cursor); +close_cursor(_Cursor) -> + ok. + do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) -> {ok, Limiter}; do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) -> diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index d4ad43907..993684ded 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -21,6 +21,7 @@ -include("emqx_retainer.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -461,6 +462,66 @@ t_flow_control(_) -> }), ok. +t_cursor_cleanup(_) -> + Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"), + LimiterCfg = make_limiter_cfg(Rate), + JsonCfg = make_limiter_json(<<"1/1s">>), + emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg), + emqx_retainer:update_config(#{ + <<"delivery_rate">> => <<"1/1s">>, + <<"flow_control">> => + #{ + <<"batch_read_number">> => 1, + <<"batch_deliver_number">> => 1, + <<"batch_deliver_limiter">> => JsonCfg + } + }), + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + lists:foreach( + fun(I) -> + emqtt:publish( + C1, + <<"retained/", (integer_to_binary(I))/binary>>, + <<"this is a retained message">>, + [{qos, 0}, {retain, true}] + ) + end, + lists:seq(1, 5) + ), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), + + snabbkaffe:start_trace(), + + ?assertWaitEvent( + emqtt:disconnect(C1), + #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/#">>}, + 2000 + ), + + QLCProcesses = lists:filter( + fun(Pid) -> + {current_function, {qlc, wait_for_request, 3}} =:= + erlang:process_info(Pid, current_function) + end, + erlang:processes() + ), + + ?assertEqual(0, length(QLCProcesses)), + + snabbkaffe:stop(), + + emqx_limiter_server:del_bucket(emqx_retainer, internal), + emqx_retainer:update_config(#{ + <<"flow_control">> => + #{ + <<"batch_read_number">> => 1, + <<"batch_deliver_number">> => 1 + } + }), + + ok. + t_clear_expired(_) -> ConfMod = fun(Conf) -> Conf#{