diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 78a518d9b..b871747ba 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -630,7 +630,7 @@ maybe_ack(Msg) -> emqx_shared_sub:maybe_ack(Msg). maybe_nack(Msg) -> - ok == emqx_shared_sub:maybe_nack_dropped(Msg). + emqx_shared_sub:maybe_nack_dropped(Msg). get_subopts(Topic, SubMap) -> case maps:find(Topic, SubMap) of @@ -813,16 +813,18 @@ run_terminate_hooks(ClientInfo, Reason, Session) -> redispatch_shared_messages(#session{inflight = Inflight}) -> InflightList = emqx_inflight:to_list(Inflight), - lists:map(fun({_, {#message{topic = Topic} = Msg, _}}) -> - case emqx_shared_sub:get_group(Msg) of - {ok, Group} -> - Delivery = #delivery{sender = self(), message = Msg}, - emqx_shared_sub:dispatch(Group, Topic, Delivery); - - _ -> - false - end - end, InflightList). + lists:map( + fun({_, {#message{topic = Topic} = Msg, _}}) -> + case emqx_shared_sub:get_group(Msg) of + {ok, Group} -> + Delivery = #delivery{sender = self(), message = Msg}, + emqx_shared_sub:dispatch(Group, Topic, Delivery); + _ -> + false + end + end, + InflightList + ). cleanup_self_from_shared_subs() -> emqx_shared_sub:cleanup(self()). diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 82e3f76d6..39f55ceb8 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -48,9 +48,9 @@ t_is_ack_required(_) -> ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})). t_maybe_nack_dropped(_) -> - ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), + ?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, - ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(Msg)), + ?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)), ?assertEqual( ok, receive