From d23dfcca39efa778f056c4a2094cd6e900c62fcf Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 5 Oct 2022 16:03:00 +0200 Subject: [PATCH] fix(shared): only re-dispatch QoS1 inflights --- CHANGES-4.3.md | 10 ++++++++++ src/emqx_session.erl | 20 ++++++++++++-------- test/emqx_shared_sub_SUITE.erl | 4 ++-- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 713f89f37..ef96ce745 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -32,6 +32,11 @@ File format: - Added a test to prevent a last will testament message to be published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894) +- QoS1 and QoS2 messages in session's buffer are re-dispatched to other members in the group + when the session terminates [#9094](https://github.com/emqx/emqx/pull/9094). + Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true + to prevent sessions from buffering messages, however this acknowledgement comes with a cost. + ### Bug fixes - Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908) @@ -48,6 +53,11 @@ File format: Same `format_status` callback is added here too for `gen_server`s which hold password in their state. +- Fix shared subscription message re-dispatches [#9094](https://github.com/emqx/emqx/pull/9094). + - When discarding QoS 2 inflight messages, there were excessive logs + - For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic, + but not the subscrbing topic), caused messages to be lost when dispatching. + ## v4.3.20 ### Bug fixes diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 40c8eacc0..d2379a3fd 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -641,14 +641,18 @@ run_terminate_hooks(ClientInfo, Reason, Session) -> redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> AllInflights = emqx_inflight:to_list(sort_fun(), Inflight), F = fun({_, {Msg, _Ts}}) -> - case Msg of - #message{} -> - {true, Msg}; - _ -> - %% QoS 2, after pubrec is received - %% the inflight record is updated to an atom - false - end + case Msg of + #message{qos = ?QOS_1} -> + %% For QoS 2, here is what the spec says: + %% If the Client's Session terminates before the Client reconnects, + %% the Server MUST NOT send the Application Message to any other + %% subscribed Client [MQTT-4.8.2-5]. + {true, Msg}; + _ -> + %% QoS 2, after pubrec is received + %% the inflight record is updated to an atom + false + end end, InflightList = lists:filtermap(F, AllInflights), MqList = mqueue_to_list(Q, []), diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index ba216abba..a16f948ff 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -550,7 +550,7 @@ t_dispatch_when_inflights_are_full(Config) when is_list(Config) -> %% No ack, QoS 2 subscriptions, %% client1 receives one message, send pubrec, then suspend -%% client2 acts normal (aot_ack=true) +%% client2 acts normal (auto_ack=true) %% Expected behaviour: %% the messages sent to client1's inflight and mq are re-dispatched after client1 is down t_dispatch_qos2({init, Config}) when is_list(Config) -> @@ -593,7 +593,7 @@ t_dispatch_qos2(Config) when is_list(Config) -> ?assert(MsgRec2 > MsgRec1), sys:resume(ConnPid1), - %% emqtt automatically send PUBREC, but since auto_ack is set to false + %% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false %% so it will never send PUBCOMP, hence EMQX should not attempt to send %% the 4th message yet since max_inflight is 1. MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),