diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index aa250b7ad..8ed7a470a 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -16,6 +16,7 @@ File format: - Fix rule-engine update behaviour which may initialize actions for disabled rules. [#8849](https://github.com/emqx/emqx/pull/8849) - Fix JWT plugin don't support non-integer timestamp claims. [#8862](https://github.com/emqx/emqx/pull/8862) +- Fix a possible dead loop caused by shared subscriptions with `shared_dispatch_ack_enabled=true`. [#8918](https://github.com/emqx/emqx/pull/8918) - Fix dashboard binding IP address not working. [#8916](https://github.com/emqx/emqx/pull/8916) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 682da8bf6..de5cf7fe0 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,18 +2,22 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.3.20", - [{load_module,emqx_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}]}, {"4.3.19", - [{load_module,emqx_message,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.3.18", - [{load_module,emqx_message,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, @@ -23,11 +27,11 @@ [{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_exclusive_subscription,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, - {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {update,emqx_broker_sup,supervisor}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, @@ -94,12 +98,12 @@ {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, - {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {update,emqx_broker_sup,supervisor}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, @@ -506,6 +510,7 @@ {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, @@ -513,7 +518,6 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, - {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {update,emqx_broker_sup,supervisor}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, @@ -549,13 +553,13 @@ {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, - {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {update,emqx_broker_sup,supervisor}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -712,18 +716,22 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.20", - [{load_module,emqx_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}]}, {"4.3.19", - [{load_module,emqx_message,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.3.18", - [{load_module,emqx_message,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, @@ -733,11 +741,11 @@ [{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_exclusive_subscription,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, - {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {update,emqx_broker_sup,supervisor}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, @@ -807,12 +815,12 @@ {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, - {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {update,emqx_broker_sup,supervisor}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, @@ -1193,6 +1201,7 @@ {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, @@ -1204,7 +1213,6 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, - {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {update,emqx_broker_sup,supervisor}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, @@ -1234,6 +1242,7 @@ {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, @@ -1244,7 +1253,6 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, - {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {update,emqx_broker_sup,supervisor}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 939be4270..c49299c58 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -735,8 +735,8 @@ handle_deliver(Delivers, Channel = #channel{ %% NOTE: Order is important here. While the takeover is in %% progress, the session cannot enqueue messages, since it already %% passed on the queue to the new connection in the session state. - NPendings = lists:append(Pendings, - ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session)), + NDelivers = ignore_local(ClientInfo, maybe_discard_shared_delivers(Delivers), ClientId, Session), + NPendings = lists:append(Pendings, NDelivers), {ok, Channel#channel{pendings = NPendings}}; handle_deliver(Delivers, Channel = #channel{ @@ -744,8 +744,8 @@ handle_deliver(Delivers, Channel = #channel{ takeover = false, session = Session, clientinfo = #{clientid := ClientId} = ClientInfo}) -> - NSession = emqx_session:enqueue(ClientInfo, - ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session), Session), + NDelivers = ignore_local(ClientInfo, maybe_discard_shared_delivers(Delivers), ClientId, Session), + NSession = emqx_session:enqueue(ClientInfo, NDelivers, Session), {ok, Channel#channel{session = NSession}}; handle_deliver(Delivers, Channel = #channel{ @@ -776,12 +776,23 @@ ignore_local(ClientInfo, Delivers, Subscriber, Session) -> end, Delivers). %% Nack delivers from shared subscription -maybe_nack(Delivers) -> - lists:filter(fun not_nacked/1, Delivers). - -not_nacked({deliver, _Topic, Msg}) -> - not (emqx_shared_sub:is_ack_required(Msg) - andalso (ok == emqx_shared_sub:nack_no_connection(Msg))). +maybe_discard_shared_delivers(Delivers) -> + lists:filtermap( + fun({deliver, Topic, Msg}) -> + case emqx_shared_sub:is_ack_required(Msg) of + false -> + true; + true -> + case emqx_shared_sub:is_retry_dispatch(Msg) of + true -> + %% force enqueue the retried shared deliver + {true, {deliver, Topic, emqx_shared_sub:maybe_ack(Msg)}}; + false -> + ok = emqx_shared_sub:nack_no_connection(Msg), + false + end + end + end, Delivers). %%-------------------------------------------------------------------- %% Handle outgoing packet diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 337d63d15..cc57e001f 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -46,15 +46,14 @@ , maybe_nack_dropped/1 , nack_no_connection/1 , is_ack_required/1 + , is_retry_dispatch/1 , get_group/1 ]). %% for testing -ifdef(TEST). --export([ subscribers/2 - , ack_enabled/0 - , strategy/1 - ]). +-compile(export_all). +-compile(nowarn_export_all). -endif. %% gen_server callbacks @@ -239,6 +238,13 @@ get_group(Msg) -> -spec(is_ack_required(emqx_types:message()) -> boolean()). is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). +-spec(is_retry_dispatch(emqx_types:message()) -> boolean()). +is_retry_dispatch(Msg) -> + case get_group_ack(Msg) of + {_Sender, {retry, _Group, _Ref}} -> true; + _ -> false + end. + %% @doc Negative ack dropped message due to inflight window or message queue being full. -spec(maybe_nack_dropped(emqx_types:message()) -> store | drop). maybe_nack_dropped(Msg) -> @@ -280,10 +286,15 @@ maybe_ack(Msg) -> Msg; Ack -> {Sender, Ref} = fetch_sender_ref(Ack), - Sender ! {Ref, ?ACK}, + ack(Sender, Ref), without_group_ack(Msg) end. +-spec(ack(pid(), reference()) -> ok). +ack(Sender, Ref) -> + Sender ! {Ref, ?ACK}, + ok. + fetch_sender_ref({Sender, {_Type, _Group, Ref}}) -> {Sender, Ref}; %% These clauses are for backward compatibility fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}. diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 1422f07b6..00edde5b1 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -481,6 +481,45 @@ t_handle_deliver_nl(_) -> NMsg = emqx_message:set_flag(nl, Msg), {ok, Channel} = emqx_channel:handle_deliver([{deliver, <<"t1">>, NMsg}], Channel). +t_handle_deliver_shared_in_no_connection(_) -> + Grp = <<"g">>, + Sender = self(), + Ref1 = make_ref(), + Ref2 = make_ref(), + Chann = emqx_channel:set_field(conn_state, disconnected, channel()), + + Msg0 = emqx_shared_sub:with_group_ack( + emqx_message:make(test, ?QOS_1, <<"t">>, <<"qos1">>), + Grp, + fresh, + Sender, + Ref1 + ), + Msg1 = emqx_shared_sub:with_group_ack( + emqx_message:make(test, ?QOS_2, <<"t">>, <<"qos2">>), + Grp, + retry, + Sender, + Ref2 + ), + Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}], + + %% all shared msgs should be queued if shared_dispatch_ack_enabled=false + meck:new(emqx_shared_sub, [passthrough, no_history]), + meck:expect(emqx_shared_sub, is_ack_required, fun(_) -> false end), + {ok, Chann1} = emqx_channel:handle_deliver(Delivers, Chann), + ?assertEqual(2, proplists:get_value(mqueue_len, emqx_channel:stats(Chann1))), + meck:unload(emqx_shared_sub), + + %% only fresh shared msgs should be queued if shared_dispatch_ack_enabled=true + meck:new(emqx_shared_sub, [passthrough, no_history]), + meck:expect(emqx_shared_sub, is_ack_required, fun(_) -> true end), + {ok, Chann2} = emqx_channel:handle_deliver(Delivers, Chann), + ?assertEqual(1, proplists:get_value(mqueue_len, emqx_channel:stats(Chann2))), + receive {Ref1, {shared_sub_nack, no_connection}} -> ok after 0 -> ?assert(false) end, + receive {Ref2, shared_sub_ack} -> ok after 0 -> ?assert(false) end, + meck:unload(emqx_shared_sub). + %%-------------------------------------------------------------------- %% Test cases for handle_out %%--------------------------------------------------------------------