fix(shared): only re-dispatch QoS1 inflights

This commit is contained in:
Zaiming (Stone) Shi 2022-10-05 16:03:00 +02:00
parent 8d42589bf5
commit d23dfcca39
3 changed files with 24 additions and 10 deletions

View File

@ -32,6 +32,11 @@ File format:
- Added a test to prevent a last will testament message to be
published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894)
- QoS1 and QoS2 messages in session's buffer are re-dispatched to other members in the group
when the session terminates [#9094](https://github.com/emqx/emqx/pull/9094).
Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true
to prevent sessions from buffering messages, however this acknowledgement comes with a cost.
### Bug fixes
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
@ -48,6 +53,11 @@ File format:
Same `format_status` callback is added here too for `gen_server`s which hold password in
their state.
- Fix shared subscription message re-dispatches [#9094](https://github.com/emqx/emqx/pull/9094).
- When discarding QoS 2 inflight messages, there were excessive logs
- For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic,
but not the subscrbing topic), caused messages to be lost when dispatching.
## v4.3.20
### Bug fixes

View File

@ -641,14 +641,18 @@ run_terminate_hooks(ClientInfo, Reason, Session) ->
redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
AllInflights = emqx_inflight:to_list(sort_fun(), Inflight),
F = fun({_, {Msg, _Ts}}) ->
case Msg of
#message{} ->
{true, Msg};
_ ->
%% QoS 2, after pubrec is received
%% the inflight record is updated to an atom
false
end
case Msg of
#message{qos = ?QOS_1} ->
%% For QoS 2, here is what the spec says:
%% If the Client's Session terminates before the Client reconnects,
%% the Server MUST NOT send the Application Message to any other
%% subscribed Client [MQTT-4.8.2-5].
{true, Msg};
_ ->
%% QoS 2, after pubrec is received
%% the inflight record is updated to an atom
false
end
end,
InflightList = lists:filtermap(F, AllInflights),
MqList = mqueue_to_list(Q, []),

View File

@ -550,7 +550,7 @@ t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
%% No ack, QoS 2 subscriptions,
%% client1 receives one message, send pubrec, then suspend
%% client2 acts normal (aot_ack=true)
%% client2 acts normal (auto_ack=true)
%% Expected behaviour:
%% the messages sent to client1's inflight and mq are re-dispatched after client1 is down
t_dispatch_qos2({init, Config}) when is_list(Config) ->
@ -593,7 +593,7 @@ t_dispatch_qos2(Config) when is_list(Config) ->
?assert(MsgRec2 > MsgRec1),
sys:resume(ConnPid1),
%% emqtt automatically send PUBREC, but since auto_ack is set to false
%% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
%% so it will never send PUBCOMP, hence EMQX should not attempt to send
%% the 4th message yet since max_inflight is 1.
MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),