diff --git a/CHANGES-5.0.md b/CHANGES-5.0.md index af8aaecf1..1a08e41e8 100644 --- a/CHANGES-5.0.md +++ b/CHANGES-5.0.md @@ -10,7 +10,7 @@ * Fix GET /listeners API crash When some nodes still in initial configuration. [#9002](https://github.com/emqx/emqx/pull/9002) * Fix empty variable interpolation in authentication and authorization. Placeholders for undefined variables are rendered now as empty strings and do not cause errors anymore. [#8963](https://github.com/emqx/emqx/pull/8963) * Fix the latency statistics error of the slow subscription module when `stats_type` is `internal` or `response`. [#8986](https://github.com/emqx/emqx/pull/8986) -* Redispatch shared subscription messages. +* Redispatch shared subscription messages. [#9104](https://github.com/emqx/emqx/pull/9104) # 5.0.8 diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index d1a111dc5..742868694 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -997,8 +997,13 @@ maybe_nack(Delivers) -> lists:filter(fun not_nacked/1, Delivers). not_nacked({deliver, _Topic, Msg}) -> - not (emqx_shared_sub:is_ack_required(Msg) andalso - (ok == emqx_shared_sub:nack_no_connection(Msg))). + case emqx_shared_sub:is_ack_required(Msg) of + true -> + ok = emqx_shared_sub:nack_no_connection(Msg), + false; + false -> + true + end. maybe_mark_as_delivered(Session, Delivers) -> case emqx_session:info(is_persistent, Session) of @@ -1222,6 +1227,8 @@ handle_call( ChanInfo1 = info(NChannel), emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}), reply(ok, reset_timer(alive_timer, NChannel)); +handle_call(get_mqueue, Channel) -> + reply({ok, get_mqueue(Channel)}, Channel); handle_call(Req, Channel) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), reply(ignored, Channel). @@ -2224,3 +2231,6 @@ get_mqtt_conf(Zone, Key, Default) -> set_field(Name, Value, Channel) -> Pos = emqx_misc:index_of(Name, record_info(fields, channel)), setelement(Pos + 1, Channel, Value). + +get_mqueue(#channel{session = Session}) -> + emqx_session:get_mqueue(Session). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 2e79bcfb1..b285d0a88 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -60,7 +60,8 @@ info/2, is_session/1, stats/1, - obtain_next_pkt_id/1 + obtain_next_pkt_id/1, + get_mqueue/1 ]). -export([ @@ -917,3 +918,6 @@ age(Now, Ts) -> Now - Ts. set_field(Name, Value, Session) -> Pos = emqx_misc:index_of(Name, record_info(fields, session)), setelement(Pos + 1, Session, Value). + +get_mqueue(#session{mqueue = Q}) -> + emqx_mqueue:to_list(Q). diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 0f7e082eb..975b403b9 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -47,8 +47,7 @@ maybe_ack/1, maybe_nack_dropped/1, nack_no_connection/1, - is_ack_required/1, - get_group/1 + is_ack_required/1 ]). %% for testing @@ -275,13 +274,6 @@ get_redispatch_to(Msg) -> -spec is_ack_required(emqx_types:message()) -> boolean(). is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). --spec get_group(emqx_types:message()) -> {ok, any()} | error. -get_group(Msg) -> - case get_group_ack(Msg) of - ?NO_ACK -> error; - {Group, _Sender, _Ref} -> {ok, Group} - end. - %% @doc Negative ack dropped message due to inflight window or message queue being full. -spec maybe_nack_dropped(emqx_types:message()) -> boolean(). maybe_nack_dropped(Msg) -> diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 5089a3a24..291286aa2 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -678,6 +678,40 @@ test_redispatch_qos1(_Config, AckEnabled) -> emqtt:stop(UsedSubPid2), ok. +t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) -> + ok = ensure_config(sticky, true), + Group = <<"group1">>, + Topic = <<"foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + SubOpts = [{clean_start, false}], + {ok, ConnPub} = emqtt:start_link([{clientid, <<"pub">>}]), + {ok, _} = emqtt:connect(ConnPub), + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1} | SubOpts]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2} | SubOpts]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 1}), + emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 1}), + + ok = emqtt:stop(ConnPid1), + ok = emqtt:stop(ConnPid2), + + [Pid1, Pid2] = emqx_shared_sub:subscribers(Group, Topic), + ?assert(is_process_alive(Pid1)), + ?assert(is_process_alive(Pid2)), + + {ok, _} = emqtt:publish(ConnPub, Topic, <<"hello11">>, 1), + ct:sleep(100), + {ok, Msgs1} = gen_server:call(Pid1, get_mqueue), + {ok, Msgs2} = gen_server:call(Pid2, get_mqueue), + %% assert the message is in mqueue (because socket is closed) + ?assertMatch([#message{payload = <<"hello11">>}], Msgs1 ++ Msgs2), + emqtt:stop(ConnPub), + ok. + %% No ack, QoS 2 subscriptions, %% client1 receives one message, send pubrec, then suspend %% client2 acts normal (auto_ack=true)