fix(shared): drop pubrel from inflight collection before redispatch
This commit is contained in:
parent
ba1c276c75
commit
6769bd4edc
|
@ -639,8 +639,17 @@ run_terminate_hooks(ClientInfo, Reason, Session) ->
|
||||||
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
|
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
|
||||||
|
|
||||||
redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
|
redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
|
||||||
InflightList = lists:map(fun({_, {Msg, _Ts}}) -> Msg end,
|
F = fun({_, {Msg, _Ts}}) ->
|
||||||
emqx_inflight:to_list(sort_fun(), Inflight)),
|
case Msg of
|
||||||
|
#message{} ->
|
||||||
|
{true, Msg};
|
||||||
|
_ ->
|
||||||
|
%% QoS 2, after pubrec is received
|
||||||
|
%% the inflight record is updated to an atom
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
InflightList = lists:filtermap(F, emqx_inflight:to_list(sort_fun(), Inflight)),
|
||||||
MqList = mqueue_to_list(Q, []),
|
MqList = mqueue_to_list(Q, []),
|
||||||
emqx_shared_sub:redispatch(InflightList ++ MqList).
|
emqx_shared_sub:redispatch(InflightList ++ MqList).
|
||||||
|
|
||||||
|
|
|
@ -236,7 +236,7 @@ get_group_ack(Msg) ->
|
||||||
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
|
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
|
||||||
|
|
||||||
%% @hidden Redispatch is neede only for the messages with redispatch_to header added.
|
%% @hidden Redispatch is neede only for the messages with redispatch_to header added.
|
||||||
is_redispatch_needed(Msg) ->
|
is_redispatch_needed(#message{} = Msg) ->
|
||||||
case get_redispatch_to(Msg) of
|
case get_redispatch_to(Msg) of
|
||||||
?REDISPATCH_TO(_, _) ->
|
?REDISPATCH_TO(_, _) ->
|
||||||
true;
|
true;
|
||||||
|
@ -272,7 +272,7 @@ redispatch(Messages0) ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
redispatch_shared_message(Msg) ->
|
redispatch_shared_message(#message{} = Msg) ->
|
||||||
%% As long as it's still a #message{} record in inflight,
|
%% As long as it's still a #message{} record in inflight,
|
||||||
%% we should try to re-dispatch
|
%% we should try to re-dispatch
|
||||||
?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg),
|
?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg),
|
||||||
|
|
Loading…
Reference in New Issue