fix(emqx_retainer): fix remaining quota check error
This commit is contained in:
parent
2476664385
commit
4427ec8155
|
@ -304,11 +304,11 @@ do_deliver([], _, _, _) ->
|
||||||
require_semaphore(Semaphore, Id) ->
|
require_semaphore(Semaphore, Id) ->
|
||||||
Remained = ets:update_counter(?SHARED_CONTEXT_TAB,
|
Remained = ets:update_counter(?SHARED_CONTEXT_TAB,
|
||||||
Semaphore,
|
Semaphore,
|
||||||
{#shared_context.value, -1, 0, 0}),
|
{#shared_context.value, -1, -1, -1}),
|
||||||
wait_semaphore(Remained, Id).
|
wait_semaphore(Remained, Id).
|
||||||
|
|
||||||
-spec wait_semaphore(non_neg_integer(), pos_integer()) -> boolean().
|
-spec wait_semaphore(non_neg_integer(), pos_integer()) -> boolean().
|
||||||
wait_semaphore(0, Id) ->
|
wait_semaphore(X, Id) when X < 0 ->
|
||||||
gen_server:call(?MODULE, {?FUNCTION_NAME, Id}, infinity);
|
gen_server:call(?MODULE, {?FUNCTION_NAME, Id}, infinity);
|
||||||
wait_semaphore(_, _) ->
|
wait_semaphore(_, _) ->
|
||||||
true.
|
true.
|
||||||
|
|
|
@ -40,12 +40,18 @@ end_per_suite(_Config) ->
|
||||||
|
|
||||||
init_per_testcase(TestCase, Config) ->
|
init_per_testcase(TestCase, Config) ->
|
||||||
emqx_retainer:clean(),
|
emqx_retainer:clean(),
|
||||||
Interval = case TestCase of
|
DefaultCfg = new_emqx_retainer_conf(),
|
||||||
t_message_expiry_2 -> 2000;
|
NewCfg = case TestCase of
|
||||||
_ -> 0
|
t_message_expiry_2 ->
|
||||||
|
DefaultCfg#{msg_expiry_interval := 2000};
|
||||||
|
t_flow_control ->
|
||||||
|
DefaultCfg#{flow_control := #{max_read_number => 1,
|
||||||
|
msg_deliver_quota => 1,
|
||||||
|
quota_release_interval => timer:seconds(1)}};
|
||||||
|
_ ->
|
||||||
|
DefaultCfg
|
||||||
end,
|
end,
|
||||||
OldCfg = emqx_config:get([?APP]),
|
emqx_retainer:update_config(NewCfg),
|
||||||
emqx_config:put([?APP], OldCfg#{msg_expiry_interval := Interval}),
|
|
||||||
application:ensure_all_started(emqx_retainer),
|
application:ensure_all_started(emqx_retainer),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
@ -55,7 +61,9 @@ set_special_configs(_) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_emqx_retainer_conf() ->
|
init_emqx_retainer_conf() ->
|
||||||
emqx_config:put([?APP],
|
emqx_config:put([?APP], new_emqx_retainer_conf()).
|
||||||
|
|
||||||
|
new_emqx_retainer_conf() ->
|
||||||
#{enable => true,
|
#{enable => true,
|
||||||
msg_expiry_interval => 0,
|
msg_expiry_interval => 0,
|
||||||
msg_clear_interval => 0,
|
msg_clear_interval => 0,
|
||||||
|
@ -66,7 +74,7 @@ init_emqx_retainer_conf() ->
|
||||||
flow_control => #{max_read_number => 0,
|
flow_control => #{max_read_number => 0,
|
||||||
msg_deliver_quota => 0,
|
msg_deliver_quota => 0,
|
||||||
quota_release_interval => 0},
|
quota_release_interval => 0},
|
||||||
max_payload_size => 1024 * 1024}).
|
max_payload_size => 1024 * 1024}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test Cases
|
%% Test Cases
|
||||||
|
@ -94,18 +102,35 @@ t_retain_handling(_) ->
|
||||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
|
||||||
|
%% rh = 0, no wildcard, and with empty retained message
|
||||||
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
||||||
|
?assertEqual(0, length(receive_messages(1))),
|
||||||
|
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
|
||||||
|
|
||||||
|
%% rh = 0, has wildcard, and with empty retained message
|
||||||
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
|
||||||
|
?assertEqual(0, length(receive_messages(1))),
|
||||||
|
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>),
|
||||||
|
|
||||||
emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]),
|
emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]),
|
||||||
|
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
||||||
?assertEqual(1, length(receive_messages(1))),
|
?assertEqual(1, length(receive_messages(1))),
|
||||||
|
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
||||||
?assertEqual(1, length(receive_messages(1))),
|
?assertEqual(1, length(receive_messages(1))),
|
||||||
|
|
||||||
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
|
||||||
|
?assertEqual(1, length(receive_messages(1))),
|
||||||
|
|
||||||
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
|
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
|
||||||
|
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 1}]),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 1}]),
|
||||||
?assertEqual(1, length(receive_messages(1))),
|
?assertEqual(1, length(receive_messages(1))),
|
||||||
|
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 1}]),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 1}]),
|
||||||
?assertEqual(0, length(receive_messages(1))),
|
?assertEqual(0, length(receive_messages(1))),
|
||||||
|
|
||||||
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
|
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
|
||||||
|
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 2}]),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 2}]),
|
||||||
|
@ -191,6 +216,24 @@ t_clean(_) ->
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1).
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
|
t_flow_control(_) ->
|
||||||
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]),
|
||||||
|
emqtt:publish(C1, <<"retained/1">>, <<"this is a retained message 1">>, [{qos, 0}, {retain, true}]),
|
||||||
|
emqtt:publish(C1, <<"retained/3">>, <<"this is a retained message 3">>, [{qos, 0}, {retain, true}]),
|
||||||
|
Begin = erlang:system_time(millisecond),
|
||||||
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
|
||||||
|
?assertEqual(3, length(receive_messages(3))),
|
||||||
|
End = erlang:system_time(millisecond),
|
||||||
|
Diff = End - Begin,
|
||||||
|
|
||||||
|
%% msg_deliver_quota = 1 and quota_release_interval = 1, and there has three message
|
||||||
|
%% so total wait time is between in 1 ~ 2s(may be timer will delay, so plus 0.5s to maximum)
|
||||||
|
?assert(Diff > timer:seconds(1) andalso Diff < timer:seconds(2.5)),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue