diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 9da2a4373..524078153 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -304,11 +304,11 @@ do_deliver([], _, _, _) -> require_semaphore(Semaphore, Id) -> Remained = ets:update_counter(?SHARED_CONTEXT_TAB, Semaphore, - {#shared_context.value, -1, 0, 0}), + {#shared_context.value, -1, -1, -1}), wait_semaphore(Remained, Id). -spec wait_semaphore(non_neg_integer(), pos_integer()) -> boolean(). -wait_semaphore(0, Id) -> +wait_semaphore(X, Id) when X < 0 -> gen_server:call(?MODULE, {?FUNCTION_NAME, Id}, infinity); wait_semaphore(_, _) -> true. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 3fa6b8abd..de2481580 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -40,12 +40,18 @@ end_per_suite(_Config) -> init_per_testcase(TestCase, Config) -> emqx_retainer:clean(), - Interval = case TestCase of - t_message_expiry_2 -> 2000; - _ -> 0 - end, - OldCfg = emqx_config:get([?APP]), - emqx_config:put([?APP], OldCfg#{msg_expiry_interval := Interval}), + DefaultCfg = new_emqx_retainer_conf(), + NewCfg = case TestCase of + t_message_expiry_2 -> + DefaultCfg#{msg_expiry_interval := 2000}; + t_flow_control -> + DefaultCfg#{flow_control := #{max_read_number => 1, + msg_deliver_quota => 1, + quota_release_interval => timer:seconds(1)}}; + _ -> + DefaultCfg + end, + emqx_retainer:update_config(NewCfg), application:ensure_all_started(emqx_retainer), Config. @@ -55,18 +61,20 @@ set_special_configs(_) -> ok. init_emqx_retainer_conf() -> - emqx_config:put([?APP], - #{enable => true, - msg_expiry_interval => 0, - msg_clear_interval => 0, - connector => [#{type => mnesia, - config => - #{max_retained_messages => 0, - storage_type => ram}}], - flow_control => #{max_read_number => 0, - msg_deliver_quota => 0, - quota_release_interval => 0}, - max_payload_size => 1024 * 1024}). + emqx_config:put([?APP], new_emqx_retainer_conf()). + +new_emqx_retainer_conf() -> + #{enable => true, + msg_expiry_interval => 0, + msg_clear_interval => 0, + connector => [#{type => mnesia, + config => + #{max_retained_messages => 0, + storage_type => ram}}], + flow_control => #{max_read_number => 0, + msg_deliver_quota => 0, + quota_release_interval => 0}, + max_payload_size => 1024 * 1024}. %%-------------------------------------------------------------------- %% Test Cases @@ -94,18 +102,35 @@ t_retain_handling(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), + %% rh = 0, no wildcard, and with empty retained message + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), + ?assertEqual(0, length(receive_messages(1))), + {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>), + + %% rh = 0, has wildcard, and with empty retained message + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), + ?assertEqual(0, length(receive_messages(1))), + {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>), + emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), ?assertEqual(1, length(receive_messages(1))), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), ?assertEqual(1, length(receive_messages(1))), + + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), + ?assertEqual(1, length(receive_messages(1))), + {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 1}]), ?assertEqual(1, length(receive_messages(1))), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 1}]), ?assertEqual(0, length(receive_messages(1))), + {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 2}]), @@ -191,6 +216,24 @@ t_clean(_) -> ok = emqtt:disconnect(C1). +t_flow_control(_) -> + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), + emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"retained/1">>, <<"this is a retained message 1">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"retained/3">>, <<"this is a retained message 3">>, [{qos, 0}, {retain, true}]), + Begin = erlang:system_time(millisecond), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), + ?assertEqual(3, length(receive_messages(3))), + End = erlang:system_time(millisecond), + Diff = End - Begin, + + %% msg_deliver_quota = 1 and quota_release_interval = 1, and there has three message + %% so total wait time is between in 1 ~ 2s(may be timer will delay, so plus 0.5s to maximum) + ?assert(Diff > timer:seconds(1) andalso Diff < timer:seconds(2.5)), + + ok = emqtt:disconnect(C1). + %%-------------------------------------------------------------------- %% Helper functions %%--------------------------------------------------------------------