diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 3b533d9d3..cc57e001f 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -52,10 +52,8 @@ %% for testing -ifdef(TEST). --export([ subscribers/2 - , ack_enabled/0 - , strategy/1 - ]). +-compile(export_all). +-compile(nowarn_export_all). -endif. %% gen_server callbacks diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 1422f07b6..00edde5b1 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -481,6 +481,45 @@ t_handle_deliver_nl(_) -> NMsg = emqx_message:set_flag(nl, Msg), {ok, Channel} = emqx_channel:handle_deliver([{deliver, <<"t1">>, NMsg}], Channel). +t_handle_deliver_shared_in_no_connection(_) -> + Grp = <<"g">>, + Sender = self(), + Ref1 = make_ref(), + Ref2 = make_ref(), + Chann = emqx_channel:set_field(conn_state, disconnected, channel()), + + Msg0 = emqx_shared_sub:with_group_ack( + emqx_message:make(test, ?QOS_1, <<"t">>, <<"qos1">>), + Grp, + fresh, + Sender, + Ref1 + ), + Msg1 = emqx_shared_sub:with_group_ack( + emqx_message:make(test, ?QOS_2, <<"t">>, <<"qos2">>), + Grp, + retry, + Sender, + Ref2 + ), + Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}], + + %% all shared msgs should be queued if shared_dispatch_ack_enabled=false + meck:new(emqx_shared_sub, [passthrough, no_history]), + meck:expect(emqx_shared_sub, is_ack_required, fun(_) -> false end), + {ok, Chann1} = emqx_channel:handle_deliver(Delivers, Chann), + ?assertEqual(2, proplists:get_value(mqueue_len, emqx_channel:stats(Chann1))), + meck:unload(emqx_shared_sub), + + %% only fresh shared msgs should be queued if shared_dispatch_ack_enabled=true + meck:new(emqx_shared_sub, [passthrough, no_history]), + meck:expect(emqx_shared_sub, is_ack_required, fun(_) -> true end), + {ok, Chann2} = emqx_channel:handle_deliver(Delivers, Chann), + ?assertEqual(1, proplists:get_value(mqueue_len, emqx_channel:stats(Chann2))), + receive {Ref1, {shared_sub_nack, no_connection}} -> ok after 0 -> ?assert(false) end, + receive {Ref2, shared_sub_ack} -> ok after 0 -> ?assert(false) end, + meck:unload(emqx_shared_sub). + %%-------------------------------------------------------------------- %% Test cases for handle_out %%--------------------------------------------------------------------