test: ensure shared messages queued by session
This commit is contained in:
parent
95bc9cd8e0
commit
2440733a6f
|
@ -52,10 +52,8 @@
|
||||||
|
|
||||||
%% for testing
|
%% for testing
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-export([ subscribers/2
|
-compile(export_all).
|
||||||
, ack_enabled/0
|
-compile(nowarn_export_all).
|
||||||
, strategy/1
|
|
||||||
]).
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
|
|
|
@ -481,6 +481,45 @@ t_handle_deliver_nl(_) ->
|
||||||
NMsg = emqx_message:set_flag(nl, Msg),
|
NMsg = emqx_message:set_flag(nl, Msg),
|
||||||
{ok, Channel} = emqx_channel:handle_deliver([{deliver, <<"t1">>, NMsg}], Channel).
|
{ok, Channel} = emqx_channel:handle_deliver([{deliver, <<"t1">>, NMsg}], Channel).
|
||||||
|
|
||||||
|
t_handle_deliver_shared_in_no_connection(_) ->
|
||||||
|
Grp = <<"g">>,
|
||||||
|
Sender = self(),
|
||||||
|
Ref1 = make_ref(),
|
||||||
|
Ref2 = make_ref(),
|
||||||
|
Chann = emqx_channel:set_field(conn_state, disconnected, channel()),
|
||||||
|
|
||||||
|
Msg0 = emqx_shared_sub:with_group_ack(
|
||||||
|
emqx_message:make(test, ?QOS_1, <<"t">>, <<"qos1">>),
|
||||||
|
Grp,
|
||||||
|
fresh,
|
||||||
|
Sender,
|
||||||
|
Ref1
|
||||||
|
),
|
||||||
|
Msg1 = emqx_shared_sub:with_group_ack(
|
||||||
|
emqx_message:make(test, ?QOS_2, <<"t">>, <<"qos2">>),
|
||||||
|
Grp,
|
||||||
|
retry,
|
||||||
|
Sender,
|
||||||
|
Ref2
|
||||||
|
),
|
||||||
|
Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
|
||||||
|
|
||||||
|
%% all shared msgs should be queued if shared_dispatch_ack_enabled=false
|
||||||
|
meck:new(emqx_shared_sub, [passthrough, no_history]),
|
||||||
|
meck:expect(emqx_shared_sub, is_ack_required, fun(_) -> false end),
|
||||||
|
{ok, Chann1} = emqx_channel:handle_deliver(Delivers, Chann),
|
||||||
|
?assertEqual(2, proplists:get_value(mqueue_len, emqx_channel:stats(Chann1))),
|
||||||
|
meck:unload(emqx_shared_sub),
|
||||||
|
|
||||||
|
%% only fresh shared msgs should be queued if shared_dispatch_ack_enabled=true
|
||||||
|
meck:new(emqx_shared_sub, [passthrough, no_history]),
|
||||||
|
meck:expect(emqx_shared_sub, is_ack_required, fun(_) -> true end),
|
||||||
|
{ok, Chann2} = emqx_channel:handle_deliver(Delivers, Chann),
|
||||||
|
?assertEqual(1, proplists:get_value(mqueue_len, emqx_channel:stats(Chann2))),
|
||||||
|
receive {Ref1, {shared_sub_nack, no_connection}} -> ok after 0 -> ?assert(false) end,
|
||||||
|
receive {Ref2, shared_sub_ack} -> ok after 0 -> ?assert(false) end,
|
||||||
|
meck:unload(emqx_shared_sub).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases for handle_out
|
%% Test cases for handle_out
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue