diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 73eacea8b..f87fc1f23 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -639,8 +639,17 @@ run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> - InflightList = lists:map(fun({_, {Msg, _Ts}}) -> Msg end, - emqx_inflight:to_list(sort_fun(), Inflight)), + F = fun({_, {Msg, _Ts}}) -> + case Msg of + #message{} -> + {true, Msg}; + _ -> + %% QoS 2, after pubrec is received + %% the inflight record is updated to an atom + false + end + end, + InflightList = lists:filtermap(F, emqx_inflight:to_list(sort_fun(), Inflight)), MqList = mqueue_to_list(Q, []), emqx_shared_sub:redispatch(InflightList ++ MqList). diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index be897569a..8667ae6c4 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -236,7 +236,7 @@ get_group_ack(Msg) -> emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). %% @hidden Redispatch is neede only for the messages with redispatch_to header added. -is_redispatch_needed(Msg) -> +is_redispatch_needed(#message{} = Msg) -> case get_redispatch_to(Msg) of ?REDISPATCH_TO(_, _) -> true; @@ -272,7 +272,7 @@ redispatch(Messages0) -> ok end. -redispatch_shared_message(Msg) -> +redispatch_shared_message(#message{} = Msg) -> %% As long as it's still a #message{} record in inflight, %% we should try to re-dispatch ?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg),