From 31d4c92a175ced11f4df78a15965653c41cfd224 Mon Sep 17 00:00:00 2001 From: Georgy Sychev Date: Wed, 18 May 2022 14:33:09 +0400 Subject: [PATCH 1/2] fix(shared): retry dispatch when inflights are full --- src/emqx_session.erl | 7 +- src/emqx_shared_sub.erl | 132 ++++++++++++++++++++++++--------- test/emqx_shared_sub_SUITE.erl | 57 ++++++++++++-- 3 files changed, 151 insertions(+), 45 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index d01f33e6a..20a61b4f6 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -449,8 +449,8 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = case emqx_inflight:is_full(Inflight) of true -> Session1 = case maybe_nack(Msg) of - true -> Session; - false -> enqueue(ClientInfo, Msg, Session) + drop -> Session; + store -> enqueue(ClientInfo, Msg, Session) end, {ok, Session1}; false -> @@ -650,7 +650,7 @@ redispatch_shared_messages(#session{inflight = Inflight}) -> %% Note that dispatch is called with self() in failed subs %% This is done to avoid dispatching back to caller Delivery = #delivery{sender = self(), message = Msg}, - emqx_shared_sub:dispatch(Group, Topic, Delivery, [self()]); + emqx_shared_sub:dispatch_to_non_self(Group, Topic, Delivery); _ -> false end; @@ -716,4 +716,3 @@ age(Now, Ts) -> Now - Ts. set_field(Name, Value, Session) -> Pos = emqx_misc:index_of(Name, record_info(fields, session)), setelement(Pos+1, Session, Value). - diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 121d777ca..c7fb1a3cb 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -39,8 +39,8 @@ ]). -export([ dispatch/3 - , dispatch/4 - ]). + , dispatch_to_non_self/3 + ]). -export([ maybe_ack/1 , maybe_nack_dropped/1 @@ -123,22 +123,28 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. +-spec(dispatch_to_non_self(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery()) + -> emqx_types:deliver_result()). +dispatch_to_non_self(Group, Topic, Delivery) -> + Strategy = strategy(Group), + dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{self() => sender}). + -spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). dispatch(Group, Topic, Delivery) -> - dispatch(Group, Topic, Delivery, _FailedSubs = []). + Strategy = strategy(Group), + dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}). -dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> +dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> #message{from = ClientId, topic = SourceTopic} = Msg, - case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of - false -> - {error, no_subscribers}; + case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of + false -> {error, no_subscribers}; {Type, SubPid} -> case do_dispatch(SubPid, Group, Topic, Msg, Type) of ok -> {ok, 1}; - {error, _Reason} -> + {error, Reason} -> %% Failed to dispatch to this sub, try next. - dispatch(Group, Topic, Delivery, [SubPid | FailedSubs]) + dispatch(Strategy, Group, Topic, Delivery, FailedSubs#{SubPid => Reason}) end end. @@ -165,33 +171,33 @@ do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> %% For QoS 0 message, send it as regular dispatch SubPid ! 
{deliver, Topic, Msg},
     ok;
-do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
-    %% Retry implies all subscribers nack:ed, send again without ack
-    SubPid ! {deliver, Topic, Msg},
-    ok;
-do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
+do_dispatch(SubPid, Group, Topic, Msg, Type) ->
     case ack_enabled() of
         true ->
-            dispatch_with_ack(SubPid, Group, Topic, Msg);
+            dispatch_with_ack(SubPid, Group, Topic, Msg, Type);
         false ->
             SubPid ! {deliver, Topic, Msg},
             ok
     end.
 
-dispatch_with_ack(SubPid, Group, Topic, Msg) ->
+dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
     %% For QoS 1/2 message, expect an ack
     Ref = erlang:monitor(process, SubPid),
     Sender = self(),
-    SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)},
+    SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)},
     Timeout = case Msg#message.qos of
                   ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
                   ?QOS_2 -> infinity
               end,
+
+    %% This OpaqueRef is a forward compatibility workaround. Pre 4.3.15 versions
+    %% pass Ref from the `{Sender, Ref}` ack header back as it is.
+    OpaqueRef = old_ref(Type, Group, Ref),
     try
         receive
-            {Ref, ?ACK} ->
+            {ReceivedRef, ?ACK} when ReceivedRef =:= Ref; ReceivedRef =:= OpaqueRef ->
                 ok;
-            {Ref, ?NACK(Reason)} ->
+            {ReceivedRef, ?NACK(Reason)} when ReceivedRef =:= Ref; ReceivedRef =:= OpaqueRef ->
                 %% the receive session may nack this message when its queue is full
                 {error, Reason};
             {'DOWN', Ref, process, SubPid, Reason} ->
@@ -204,8 +210,11 @@ dispatch_with_ack(SubPid, Group, Topic, Msg) ->
         _ = erlang:demonitor(Ref, [flush])
     end.
 
-with_group_ack(Msg, Group, Sender, Ref) ->
-    emqx_message:set_headers(#{shared_dispatch_ack => {Group, Sender, Ref}}, Msg).
+with_group_ack(Msg, Group, Type, Sender, Ref) ->
+    emqx_message:set_headers(#{shared_dispatch_ack => {Sender, old_ref(Type, Group, Ref)}}, Msg).
+
+old_ref(Type, Group, Ref) ->
+    {Type, Group, Ref}.
 
 -spec(without_group_ack(emqx_types:message()) -> emqx_types:message()).
 without_group_ack(Msg) ->
@@ -217,19 +226,32 @@ get_group_ack(Msg) ->
 -spec(get_group(emqx_types:message()) -> {ok, any()} | error).
 get_group(Msg) ->
     case get_group_ack(Msg) of
-        ?NO_ACK -> error;
-        {Group, _Sender, _Ref} -> {ok, Group}
+        {_Sender, {_Type, Group, _Ref}} -> {ok, Group};
+        _ -> error
     end.
 
 -spec(is_ack_required(emqx_types:message()) -> boolean()).
 is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
 
 %% @doc Negative ack dropped message due to inflight window or message queue being full.
--spec(maybe_nack_dropped(emqx_types:message()) -> boolean()).
+-spec(maybe_nack_dropped(emqx_types:message()) -> store | drop).
 maybe_nack_dropped(Msg) ->
     case get_group_ack(Msg) of
-        ?NO_ACK -> false;
-        {_Group, Sender, Ref} -> ok == nack(Sender, Ref, dropped)
+        %% No ack header is present, put it into mqueue
+        ?NO_ACK -> store;
+
+        %% For fresh Ref we send a nack and return drop, to note that the inflight is full
+        {Sender, {fresh, _Group, Ref}} -> nack(Sender, Ref, dropped), drop;
+
+        %% For retry Ref we can't reject a message if inflight is full, so we mark it as
+        %% acknowledged and put it into mqueue
+        {_Sender, {retry, _Group, _Ref}} -> maybe_ack(Msg), store;
+
+        %% This clause is for backward compatibility
+        Ack ->
+            {Sender, Ref} = fetch_sender_ref(Ack),
+            nack(Sender, Ref, dropped),
+            drop
     end.
 
 %% @doc Negative ack message due to connection down.
@@ -237,7 +259,7 @@ maybe_nack_dropped(Msg) ->
 %% Assuming this function is always called when ack is required
 %% i.e is_ack_required returned true.
 -spec(nack_no_connection(emqx_types:message()) -> ok).
 nack_no_connection(Msg) ->
-    {_Group, Sender, Ref} = get_group_ack(Msg),
+    {Sender, Ref} = fetch_sender_ref(get_group_ack(Msg)),
     nack(Sender, Ref, no_connection).
 
 -spec(nack(pid(), reference(), dropped | no_connection) -> ok).
@@ -250,11 +272,16 @@ maybe_ack(Msg) ->
     case get_group_ack(Msg) of
         ?NO_ACK ->
             Msg;
-        {_Group, Sender, Ref} ->
+        Ack ->
+            {Sender, Ref} = fetch_sender_ref(Ack),
             Sender ! {Ref, ?ACK},
             without_group_ack(Msg)
     end.
 
+fetch_sender_ref({Sender, {_Type, _Group, Ref}}) -> {Sender, Ref};
+%% This clause is for backward compatibility
+fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}.
+
 pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
     Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
     case is_active_sub(Sub0, FailedSubs) of
         true ->
             %% the old subscriber is still alive
             %% keep using it for sticky strategy
             {fresh, Sub0};
         false ->
             %% randomly pick one for the first message
-            {Type, Sub} = do_pick(random, ClientId, SourceTopic, Group, Topic, [Sub0 | FailedSubs]),
-            %% stick to whatever pick result
-            erlang:put({shared_sub_sticky, Group, Topic}, Sub),
-            {Type, Sub}
+            FailedSubs1 = maps_put_new(FailedSubs, Sub0, inactive),
+            case do_pick(random, ClientId, SourceTopic, Group, Topic, FailedSubs1) of
+                false -> false;
+                {Type, Sub} ->
+                    %% stick to whatever pick result
+                    erlang:put({shared_sub_sticky, Group, Topic}, Sub),
+                    {Type, Sub}
+            end
     end;
 pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
     do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs).
 
 do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
     All = subscribers(Group, Topic),
-    case All -- FailedSubs of
+    case lists:filter(fun(Sub) -> not maps:is_key(Sub, FailedSubs) end, All) of
         [] when All =:= [] ->
             %% Genuinely no subscriber
             false;
         [] ->
-            %% All offline? pick one anyway
-            {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)};
+            %% We try to redispatch to subs that dropped the message because their inflight was full.
+            Found = maps_find_by(FailedSubs, fun(SubPid, FailReason) ->
+                FailReason == dropped andalso is_alive_sub(SubPid)
+            end),
+            case Found of
+                {ok, Dropped, dropped} ->
+                    %% Found dropped client
+                    {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, [Dropped])};
+                error ->
+                    %% All offline? pick one anyway
+                    {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)}
+            end;
         Subs ->
             %% More than one available
             {fresh, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs)}
@@ -414,7 +455,7 @@ update_stats(State) ->
 
 %% Return 'true' if the subscriber process is alive AND not in the failed list
 is_active_sub(Pid, FailedSubs) ->
-    is_alive_sub(Pid) andalso not lists:member(Pid, FailedSubs).
+    not maps:is_key(Pid, FailedSubs) andalso is_alive_sub(Pid).
 
 %% erlang:is_process_alive/1 does not work with remote pid.
 is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->
@@ -427,3 +468,22 @@ delete_route_if_needed({Group, Topic}) ->
         true -> ok;
         false -> ok = emqx_router:do_delete_route(Topic, {Group, node()})
     end.
+
+maps_find_by(Map, Predicate) when is_map(Map) ->
+    maps_find_by(maps:iterator(Map), Predicate);
+
+maps_find_by(Iterator, Predicate) ->
+    case maps:next(Iterator) of
+        none -> error;
+        {Key, Value, NewIterator} ->
+            case Predicate(Key, Value) of
+                true -> {ok, Key, Value};
+                false -> maps_find_by(NewIterator, Predicate)
+            end
+    end.
+
+maps_put_new(Map, Key, Value) ->
+    case Map of
+        #{Key := _} -> Map;
+        _ -> Map#{Key => Value}
+    end.
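For reference, here is a minimal standalone sketch (an editor's illustration, not part of the patch) of the FailedSubs bookkeeping the patch introduces: failed subscribers are keyed by pid with the failure reason as value, maps_put_new/3 never overwrites a reason that is already recorded, and maps_find_by/2 is what do_pick/6 uses to find a subscriber that failed only because its inflight was full. The module name and the atoms standing in for subscriber pids are hypothetical; the two helper bodies are copied from the additions above.

-module(failed_subs_sketch).
-export([demo/0]).

%% Helper bodies copied from the additions to emqx_shared_sub.erl above.
maps_find_by(Map, Predicate) when is_map(Map) ->
    maps_find_by(maps:iterator(Map), Predicate);
maps_find_by(Iterator, Predicate) ->
    case maps:next(Iterator) of
        none -> error;
        {Key, Value, NewIterator} ->
            case Predicate(Key, Value) of
                true -> {ok, Key, Value};
                false -> maps_find_by(NewIterator, Predicate)
            end
    end.

maps_put_new(Map, Key, Value) ->
    case Map of
        #{Key := _} -> Map;
        _ -> Map#{Key => Value}
    end.

demo() ->
    %% One subscriber nacked the message because its inflight was full
    %% (reason 'dropped'), the other one went down.
    FailedSubs = #{sub_a => dropped, sub_b => no_connection},
    %% The sticky path records an inactive sticky subscriber without
    %% overwriting a more specific reason that is already present.
    FailedSubs = maps_put_new(FailedSubs, sub_a, inactive),
    %% On redispatch, do_pick/6 looks for an entry that failed with
    %% 'dropped' (and, in the real module, is still alive) and retries it.
    {ok, sub_a, dropped} =
        maps_find_by(FailedSubs, fun(_Sub, Reason) -> Reason =:= dropped end),
    ok.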
diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl
index 5c2de94d0..30bd96a66 100644
--- a/test/emqx_shared_sub_SUITE.erl
+++ b/test/emqx_shared_sub_SUITE.erl
@@ -47,20 +47,20 @@ t_is_ack_required(_) ->
     ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
 
 t_maybe_nack_dropped(_) ->
-    ?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
-    Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
-    ?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)),
+    ?assertEqual(store, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
+    Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
+    ?assertEqual(drop, emqx_shared_sub:maybe_nack_dropped(Msg)),
     ?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end).
 
 t_nack_no_connection(_) ->
-    Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
+    Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
     ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
     ?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok after 100 -> timeout end).
 
 t_maybe_ack(_) ->
     ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
-    Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
+    Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
     ?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}}, emqx_shared_sub:maybe_ack(Msg)),
     ?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end).
@@ -446,6 +446,53 @@ t_redispatch(_) ->
     emqtt:stop(UsedSubPid2),
     ok.
 
+t_dispatch_when_inflights_are_full(_) ->
+    ok = ensure_config(round_robin, true),
+    Topic = <<"foo/bar">>,
+    ClientId1 = <<"ClientId1">>,
+    ClientId2 = <<"ClientId2">>,
+
+    %% Note that max_inflight is 1
+    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {max_inflight, 1}]),
+    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {max_inflight, 1}]),
+    {ok, _} = emqtt:connect(ConnPid1),
+    {ok, _} = emqtt:connect(ConnPid2),
+
+    emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar">>, 2}),
+    emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar">>, 2}),
+
+    Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
+    Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
+    Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
+    Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
+    ct:sleep(100),
+
+    sys:suspend(ConnPid1),
+    sys:suspend(ConnPid2),
+
+    %% Fill in the inflight for the first client
+    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
+
+    %% Fill in the inflight for the second client
+    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
+
+    %% Now kill any client
+    erlang:exit(ConnPid1, normal),
+    ct:sleep(100),
+
+    %% And try to send the message
+    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
+    ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
+
+    %% And see that it gets dispatched to the client which is alive, even if its inflight is full
+    sys:resume(ConnPid2),
+    ct:sleep(100),
+    ?assertMatch({true, ConnPid2}, last_message(<<"hello3">>, [ConnPid1, ConnPid2])),
+    ?assertMatch({true, ConnPid2}, last_message(<<"hello4">>, [ConnPid1, ConnPid2])),
+
+    emqtt:stop(ConnPid2),
+    ok.
+
 %%--------------------------------------------------------------------
 %% help functions
 %%--------------------------------------------------------------------

From aabfb718043dd2d5a90ee0a13ad522f91b405c28 Mon Sep 17 00:00:00 2001
From: Georgy Sychev
Date: Thu, 26 May 2022 12:32:15 +0400
Subject: [PATCH 2/2] chore: changes.md

---
 CHANGES-4.3.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md
index 68a0b4b0c..5f904913f 100644
--- a/CHANGES-4.3.md
+++ b/CHANGES-4.3.md
@@ -30,6 +30,7 @@ File format:
 * Fix mqtt-sn client disconnected due to re-send a duplicated qos2 message
 * Rule-engine function hexstr2bin/1 support half byte [#7977]
 * Add rule-engine function float2str/2, user can specify the float output precision [#7991]
+* Fix shared subscription message delivery when all alive shared subscribers have a full inflight window
 * Improved resilience against autocluster partitioning during cluster startup. [#7876]
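A summary sketch of the resulting behaviour (an editor's note, not taken from the patch): a receiving session whose inflight window is full no longer unconditionally nacks a shared-subscription message; maybe_nack_dropped/1 from PATCH 1/2 now decides as follows.

%% shared_dispatch_ack header on the message         action taken        result
%% -----------------------------------------         -----------------   ------
%% no header (ack not requested)                      enqueue to mqueue   store
%% {Sender, {fresh, Group, Ref}}    first attempt     nack 'dropped'      drop
%% {Sender, {retry, Group, Ref}}    redispatch        ack, then enqueue   store
%% {Sender, Ref}                    pre-4.3.15 peer   nack 'dropped'      drop

On the dispatching side, a 'dropped' entry in FailedSubs marks a subscriber that is still alive but currently out of inflight room, so do_pick/6 prefers it for the retry instead of picking blindly from the whole subscriber list (which could select a dead client) -- the scenario t_dispatch_when_inflights_are_full exercises.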