diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 87d0554c0..84921be6b 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -171,6 +171,10 @@ strategy(Group) -> get_default_shared_subscription_strategy() end. +-spec ack_enabled() -> boolean(). +ack_enabled() -> + emqx:get_config([broker, shared_dispatch_ack_enabled], false). + do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() -> %% Deadlock otherwise SubPid ! {deliver, Topic, Msg}, @@ -182,8 +186,14 @@ do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> do_dispatch(SubPid, _Group, Topic, Msg, retry) -> %% Retry implies all subscribers nack:ed, send again without ack send(SubPid, Topic, {deliver, Topic, Msg}); -do_dispatch(SubPid, _Group, Topic, Msg, fresh) -> - send(SubPid, Topic, {deliver, Topic, Msg}). +do_dispatch(SubPid, Group, Topic, Msg, fresh) -> + case ack_enabled() of + true -> + %% TODO: delete this case after 5.1.0 + do_dispatch_with_ack(SubPid, Group, Topic, Msg); + false -> + send(SubPid, Topic, {deliver, Topic, Msg}) + end. -spec do_dispatch_with_ack(pid(), emqx_types:group(), emqx_types:topic(), emqx_types:message()) -> ok | {error, _}. diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index 5e9284a41..3de081df8 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -440,6 +440,7 @@ zone_global_defaults() -> server_keepalive => disabled, session_expiry_interval => 7200000, shared_subscription => true, + shared_subscription_strategy => round_robin, strict_mode => false, upgrade_qos => false, use_username_as_clientid => false, diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 7c6c144eb..d4bb9bbea 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -769,12 +769,12 @@ t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) -> %% Expected behaviour: %% the messages sent to client1's inflight and mq are re-dispatched after client1 is down t_dispatch_qos2({init, Config}) when is_list(Config) -> + ok = ensure_config(round_robin, _AckEnabled = false), emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1), Config; t_dispatch_qos2({'end', Config}) when is_list(Config) -> emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0); t_dispatch_qos2(Config) when is_list(Config) -> - ok = ensure_config(round_robin, _AckEnabled = false), Topic = <<"foo/bar/1">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, @@ -923,12 +923,12 @@ t_session_takeover(Config) when is_list(Config) -> ok. t_session_kicked({init, Config}) when is_list(Config) -> + ok = ensure_config(round_robin, _AckEnabled = false), emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1), Config; t_session_kicked({'end', Config}) when is_list(Config) -> emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0); t_session_kicked(Config) when is_list(Config) -> - ok = ensure_config(round_robin, _AckEnabled = false), Topic = <<"foo/bar/1">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, @@ -1019,12 +1019,12 @@ ensure_config(Strategy) -> ensure_config(Strategy, _AckEnabled = true). ensure_config(Strategy, AckEnabled) -> - emqx_config:put([broker, shared_subscription_strategy], Strategy), + emqx_config:put([mqtt, shared_subscription_strategy], Strategy), emqx_config:put([broker, shared_dispatch_ack_enabled], AckEnabled), ok. ensure_node_config(Node, Strategy) -> - rpc:call(Node, emqx_config, force_put, [[broker, shared_subscription_strategy], Strategy]). + rpc:call(Node, emqx_config, force_put, [[mqtt, shared_subscription_strategy], Strategy]). ensure_group_config(Group2Strategy) -> lists:foreach(