fix(shared): nack fix
This commit is contained in:
parent
4d19420100
commit
a661a26218
|
@ -630,7 +630,7 @@ maybe_ack(Msg) ->
|
||||||
emqx_shared_sub:maybe_ack(Msg).
|
emqx_shared_sub:maybe_ack(Msg).
|
||||||
|
|
||||||
maybe_nack(Msg) ->
|
maybe_nack(Msg) ->
|
||||||
ok == emqx_shared_sub:maybe_nack_dropped(Msg).
|
emqx_shared_sub:maybe_nack_dropped(Msg).
|
||||||
|
|
||||||
get_subopts(Topic, SubMap) ->
|
get_subopts(Topic, SubMap) ->
|
||||||
case maps:find(Topic, SubMap) of
|
case maps:find(Topic, SubMap) of
|
||||||
|
@ -813,16 +813,18 @@ run_terminate_hooks(ClientInfo, Reason, Session) ->
|
||||||
|
|
||||||
redispatch_shared_messages(#session{inflight = Inflight}) ->
|
redispatch_shared_messages(#session{inflight = Inflight}) ->
|
||||||
InflightList = emqx_inflight:to_list(Inflight),
|
InflightList = emqx_inflight:to_list(Inflight),
|
||||||
lists:map(fun({_, {#message{topic = Topic} = Msg, _}}) ->
|
lists:map(
|
||||||
case emqx_shared_sub:get_group(Msg) of
|
fun({_, {#message{topic = Topic} = Msg, _}}) ->
|
||||||
{ok, Group} ->
|
case emqx_shared_sub:get_group(Msg) of
|
||||||
Delivery = #delivery{sender = self(), message = Msg},
|
{ok, Group} ->
|
||||||
emqx_shared_sub:dispatch(Group, Topic, Delivery);
|
Delivery = #delivery{sender = self(), message = Msg},
|
||||||
|
emqx_shared_sub:dispatch(Group, Topic, Delivery);
|
||||||
_ ->
|
_ ->
|
||||||
false
|
false
|
||||||
end
|
end
|
||||||
end, InflightList).
|
end,
|
||||||
|
InflightList
|
||||||
|
).
|
||||||
|
|
||||||
cleanup_self_from_shared_subs() ->
|
cleanup_self_from_shared_subs() ->
|
||||||
emqx_shared_sub:cleanup(self()).
|
emqx_shared_sub:cleanup(self()).
|
||||||
|
|
|
@ -48,9 +48,9 @@ t_is_ack_required(_) ->
|
||||||
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
|
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
|
||||||
|
|
||||||
t_maybe_nack_dropped(_) ->
|
t_maybe_nack_dropped(_) ->
|
||||||
?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
|
?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
|
||||||
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
|
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
|
||||||
?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(Msg)),
|
?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
ok,
|
ok,
|
||||||
receive
|
receive
|
||||||
|
|
Loading…
Reference in New Issue