Merge pull request #8918 from HJianBo/fix-shared-subs-dead-msg-loop

fix(shared-sub): fix dead loop if all subscribers are disconected
This commit is contained in:
Zaiming (Stone) Shi 2022-09-08 13:25:10 +02:00 committed by GitHub
commit 2ef6008599
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 99 additions and 29 deletions

View File

@ -16,6 +16,7 @@ File format:
- Fix rule-engine update behaviour which may initialize actions for disabled rules. [#8849](https://github.com/emqx/emqx/pull/8849)
- Fix JWT plugin don't support non-integer timestamp claims. [#8862](https://github.com/emqx/emqx/pull/8862)
- Fix a possible dead loop caused by shared subscriptions with `shared_dispatch_ack_enabled=true`. [#8918](https://github.com/emqx/emqx/pull/8918)
- Fix dashboard binding IP address not working. [#8916](https://github.com/emqx/emqx/pull/8916)

View File

@ -2,18 +2,22 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.20",
[{load_module,emqx_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}]},
{"4.3.19",
[{load_module,emqx_message,brutal_purge,soft_purge,[]},
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
{"4.3.18",
[{load_module,emqx_message,brutal_purge,soft_purge,[]},
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
@ -23,11 +27,11 @@
[{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_exclusive_subscription,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
@ -94,12 +98,12 @@
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_sys,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
@ -506,6 +510,7 @@
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
@ -513,7 +518,6 @@
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
@ -549,13 +553,13 @@
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
@ -712,18 +716,22 @@
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.20",
[{load_module,emqx_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}]},
{"4.3.19",
[{load_module,emqx_message,brutal_purge,soft_purge,[]},
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
{"4.3.18",
[{load_module,emqx_message,brutal_purge,soft_purge,[]},
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
@ -733,11 +741,11 @@
[{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_exclusive_subscription,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
@ -807,12 +815,12 @@
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_sys,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
@ -1193,6 +1201,7 @@
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
@ -1204,7 +1213,6 @@
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
@ -1234,6 +1242,7 @@
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
@ -1244,7 +1253,6 @@
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},

View File

@ -735,8 +735,8 @@ handle_deliver(Delivers, Channel = #channel{
%% NOTE: Order is important here. While the takeover is in
%% progress, the session cannot enqueue messages, since it already
%% passed on the queue to the new connection in the session state.
NPendings = lists:append(Pendings,
ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session)),
NDelivers = ignore_local(ClientInfo, maybe_discard_shared_delivers(Delivers), ClientId, Session),
NPendings = lists:append(Pendings, NDelivers),
{ok, Channel#channel{pendings = NPendings}};
handle_deliver(Delivers, Channel = #channel{
@ -744,8 +744,8 @@ handle_deliver(Delivers, Channel = #channel{
takeover = false,
session = Session,
clientinfo = #{clientid := ClientId} = ClientInfo}) ->
NSession = emqx_session:enqueue(ClientInfo,
ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session), Session),
NDelivers = ignore_local(ClientInfo, maybe_discard_shared_delivers(Delivers), ClientId, Session),
NSession = emqx_session:enqueue(ClientInfo, NDelivers, Session),
{ok, Channel#channel{session = NSession}};
handle_deliver(Delivers, Channel = #channel{
@ -776,12 +776,23 @@ ignore_local(ClientInfo, Delivers, Subscriber, Session) ->
end, Delivers).
%% Nack delivers from shared subscription
maybe_nack(Delivers) ->
lists:filter(fun not_nacked/1, Delivers).
not_nacked({deliver, _Topic, Msg}) ->
not (emqx_shared_sub:is_ack_required(Msg)
andalso (ok == emqx_shared_sub:nack_no_connection(Msg))).
maybe_discard_shared_delivers(Delivers) ->
lists:filtermap(
fun({deliver, Topic, Msg}) ->
case emqx_shared_sub:is_ack_required(Msg) of
false ->
true;
true ->
case emqx_shared_sub:is_retry_dispatch(Msg) of
true ->
%% force enqueue the retried shared deliver
{true, {deliver, Topic, emqx_shared_sub:maybe_ack(Msg)}};
false ->
ok = emqx_shared_sub:nack_no_connection(Msg),
false
end
end
end, Delivers).
%%--------------------------------------------------------------------
%% Handle outgoing packet

View File

@ -46,15 +46,14 @@
, maybe_nack_dropped/1
, nack_no_connection/1
, is_ack_required/1
, is_retry_dispatch/1
, get_group/1
]).
%% for testing
-ifdef(TEST).
-export([ subscribers/2
, ack_enabled/0
, strategy/1
]).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
%% gen_server callbacks
@ -239,6 +238,13 @@ get_group(Msg) ->
-spec(is_ack_required(emqx_types:message()) -> boolean()).
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
-spec(is_retry_dispatch(emqx_types:message()) -> boolean()).
is_retry_dispatch(Msg) ->
case get_group_ack(Msg) of
{_Sender, {retry, _Group, _Ref}} -> true;
_ -> false
end.
%% @doc Negative ack dropped message due to inflight window or message queue being full.
-spec(maybe_nack_dropped(emqx_types:message()) -> store | drop).
maybe_nack_dropped(Msg) ->
@ -280,10 +286,15 @@ maybe_ack(Msg) ->
Msg;
Ack ->
{Sender, Ref} = fetch_sender_ref(Ack),
Sender ! {Ref, ?ACK},
ack(Sender, Ref),
without_group_ack(Msg)
end.
-spec(ack(pid(), reference()) -> ok).
ack(Sender, Ref) ->
Sender ! {Ref, ?ACK},
ok.
fetch_sender_ref({Sender, {_Type, _Group, Ref}}) -> {Sender, Ref};
%% These clauses are for backward compatibility
fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}.

View File

@ -481,6 +481,45 @@ t_handle_deliver_nl(_) ->
NMsg = emqx_message:set_flag(nl, Msg),
{ok, Channel} = emqx_channel:handle_deliver([{deliver, <<"t1">>, NMsg}], Channel).
t_handle_deliver_shared_in_no_connection(_) ->
Grp = <<"g">>,
Sender = self(),
Ref1 = make_ref(),
Ref2 = make_ref(),
Chann = emqx_channel:set_field(conn_state, disconnected, channel()),
Msg0 = emqx_shared_sub:with_group_ack(
emqx_message:make(test, ?QOS_1, <<"t">>, <<"qos1">>),
Grp,
fresh,
Sender,
Ref1
),
Msg1 = emqx_shared_sub:with_group_ack(
emqx_message:make(test, ?QOS_2, <<"t">>, <<"qos2">>),
Grp,
retry,
Sender,
Ref2
),
Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
%% all shared msgs should be queued if shared_dispatch_ack_enabled=false
meck:new(emqx_shared_sub, [passthrough, no_history]),
meck:expect(emqx_shared_sub, is_ack_required, fun(_) -> false end),
{ok, Chann1} = emqx_channel:handle_deliver(Delivers, Chann),
?assertEqual(2, proplists:get_value(mqueue_len, emqx_channel:stats(Chann1))),
meck:unload(emqx_shared_sub),
%% only fresh shared msgs should be queued if shared_dispatch_ack_enabled=true
meck:new(emqx_shared_sub, [passthrough, no_history]),
meck:expect(emqx_shared_sub, is_ack_required, fun(_) -> true end),
{ok, Chann2} = emqx_channel:handle_deliver(Delivers, Chann),
?assertEqual(1, proplists:get_value(mqueue_len, emqx_channel:stats(Chann2))),
receive {Ref1, {shared_sub_nack, no_connection}} -> ok after 0 -> ?assert(false) end,
receive {Ref2, shared_sub_ack} -> ok after 0 -> ?assert(false) end,
meck:unload(emqx_shared_sub).
%%--------------------------------------------------------------------
%% Test cases for handle_out
%%--------------------------------------------------------------------