From 5cc778ff303015567e0ac575fc18528010c14f2d Mon Sep 17 00:00:00 2001 From: Georgy Sychev Date: Wed, 18 May 2022 14:33:09 +0400 Subject: [PATCH] fix(shared): retry dispatch after all inflights are full --- src/emqx_session.erl | 3 +- src/emqx_shared_sub.erl | 92 ++++++++++++++++++++++------------ test/emqx_shared_sub_SUITE.erl | 55 ++++++++++++++++++-- 3 files changed, 113 insertions(+), 37 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index d01f33e6a..ee409b1c7 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -650,7 +650,7 @@ redispatch_shared_messages(#session{inflight = Inflight}) -> %% Note that dispatch is called with self() in failed subs %% This is done to avoid dispatching back to caller Delivery = #delivery{sender = self(), message = Msg}, - emqx_shared_sub:dispatch(Group, Topic, Delivery, [self()]); + emqx_shared_sub:dispatch_to_non_self(Group, Topic, Delivery); _ -> false end; @@ -716,4 +716,3 @@ age(Now, Ts) -> Now - Ts. set_field(Name, Value, Session) -> Pos = emqx_misc:index_of(Name, record_info(fields, session)), setelement(Pos+1, Session, Value). - diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 121d777ca..596a27a8e 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -39,8 +39,8 @@ ]). -export([ dispatch/3 - , dispatch/4 - ]). + , dispatch_to_non_self/3 + ]). -export([ maybe_ack/1 , maybe_nack_dropped/1 @@ -123,22 +123,28 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. +-spec(dispatch_to_non_self(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery()) + -> emqx_types:deliver_result()). +dispatch_to_non_self(Group, Topic, Delivery) -> + Strategy = strategy(Group), + dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{self() => sender}). + -spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). dispatch(Group, Topic, Delivery) -> - dispatch(Group, Topic, Delivery, _FailedSubs = []). + Strategy = strategy(Group), + dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}). -dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> +dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> #message{from = ClientId, topic = SourceTopic} = Msg, - case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of - false -> - {error, no_subscribers}; + case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of + false -> {error, no_subscribers}; {Type, SubPid} -> case do_dispatch(SubPid, Group, Topic, Msg, Type) of ok -> {ok, 1}; - {error, _Reason} -> + {error, Reason} -> %% Failed to dispatch to this sub, try next. - dispatch(Group, Topic, Delivery, [SubPid | FailedSubs]) + dispatch(Strategy, Group, Topic, Delivery, FailedSubs#{SubPid => Reason}) end end. @@ -165,24 +171,20 @@ do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> %% For QoS 0 message, send it as regular dispatch SubPid ! {deliver, Topic, Msg}, ok; -do_dispatch(SubPid, _Group, Topic, Msg, retry) -> - %% Retry implies all subscribers nack:ed, send again without ack - SubPid ! {deliver, Topic, Msg}, - ok; -do_dispatch(SubPid, Group, Topic, Msg, fresh) -> +do_dispatch(SubPid, Group, Topic, Msg, Type) -> case ack_enabled() of true -> - dispatch_with_ack(SubPid, Group, Topic, Msg); + dispatch_with_ack(SubPid, Group, Topic, Msg, Type); false -> SubPid ! {deliver, Topic, Msg}, ok end. -dispatch_with_ack(SubPid, Group, Topic, Msg) -> +dispatch_with_ack(SubPid, Group, Topic, Msg, Type) -> %% For QoS 1/2 message, expect an ack Ref = erlang:monitor(process, SubPid), Sender = self(), - SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)}, + SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)}, Timeout = case Msg#message.qos of ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); ?QOS_2 -> infinity @@ -204,8 +206,8 @@ dispatch_with_ack(SubPid, Group, Topic, Msg) -> _ = erlang:demonitor(Ref, [flush]) end. -with_group_ack(Msg, Group, Sender, Ref) -> - emqx_message:set_headers(#{shared_dispatch_ack => {Group, Sender, Ref}}, Msg). +with_group_ack(Msg, Group, Type, Sender, Ref) -> + emqx_message:set_headers(#{shared_dispatch_ack => {Type, Group, Sender, Ref}}, Msg). -spec(without_group_ack(emqx_types:message()) -> emqx_types:message()). without_group_ack(Msg) -> @@ -218,7 +220,7 @@ get_group_ack(Msg) -> get_group(Msg) -> case get_group_ack(Msg) of ?NO_ACK -> error; - {Group, _Sender, _Ref} -> {ok, Group} + {_Type, Group, _Sender, _Ref} -> {ok, Group} end. -spec(is_ack_required(emqx_types:message()) -> boolean()). @@ -228,8 +230,9 @@ is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). -spec(maybe_nack_dropped(emqx_types:message()) -> boolean()). maybe_nack_dropped(Msg) -> case get_group_ack(Msg) of - ?NO_ACK -> false; - {_Group, Sender, Ref} -> ok == nack(Sender, Ref, dropped) + ?NO_ACK -> false; + {fresh, _Group, Sender, Ref} -> ok == nack(Sender, Ref, dropped); + {retry, _Group, _Sender, _Ref} -> maybe_ack(Msg), false end. %% @doc Negative ack message due to connection down. @@ -237,7 +240,7 @@ maybe_nack_dropped(Msg) -> %% i.e is_ack_required returned true. -spec(nack_no_connection(emqx_types:message()) -> ok). nack_no_connection(Msg) -> - {_Group, Sender, Ref} = get_group_ack(Msg), + {_Type, _Group, Sender, Ref} = get_group_ack(Msg), nack(Sender, Ref, no_connection). -spec(nack(pid(), reference(), dropped | no_connection) -> ok). @@ -250,7 +253,7 @@ maybe_ack(Msg) -> case get_group_ack(Msg) of ?NO_ACK -> Msg; - {_Group, Sender, Ref} -> + {_Type, _Group, Sender, Ref} -> Sender ! {Ref, ?ACK}, without_group_ack(Msg) end. @@ -264,23 +267,31 @@ pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> {fresh, Sub0}; false -> %% randomly pick one for the first message - {Type, Sub} = do_pick(random, ClientId, SourceTopic, Group, Topic, [Sub0 | FailedSubs]), - %% stick to whatever pick result - erlang:put({shared_sub_sticky, Group, Topic}, Sub), - {Type, Sub} + FailedSubs1 = maps_put_new(FailedSubs, Sub0, inactive), + case do_pick(random, ClientId, SourceTopic, Group, Topic, FailedSubs1) of + false -> false; + {Type, Sub} -> + %% stick to whatever pick result + erlang:put({shared_sub_sticky, Group, Topic}, Sub), + {Type, Sub} + end end; pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs). do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> All = subscribers(Group, Topic), - case All -- FailedSubs of + case lists:filter(fun(Sub) -> not maps:is_key(Sub, FailedSubs) end, All) of [] when All =:= [] -> %% Genuinely no subscriber false; [] -> %% All offline? pick one anyway - {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)}; + %% We redispatch only to subs who dropped the message because inflight was full. + Dropped = maps_find_by(FailedSubs, fun({SubPid, FailReason}) -> + FailReason == dropped andalso is_alive_sub(SubPid) + end), + {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Dropped)}; Subs -> %% More than one available {fresh, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs)} @@ -414,7 +425,7 @@ update_stats(State) -> %% Return 'true' if the subscriber process is alive AND not in the failed list is_active_sub(Pid, FailedSubs) -> - is_alive_sub(Pid) andalso not lists:member(Pid, FailedSubs). + not maps:is_key(Pid, FailedSubs) andalso is_alive_sub(Pid). %% erlang:is_process_alive/1 does not work with remote pid. is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) -> @@ -427,3 +438,22 @@ delete_route_if_needed({Group, Topic}) -> true -> ok; false -> ok = emqx_router:do_delete_route(Topic, {Group, node()}) end. + +maps_find_by(Map, Predicate) when is_map(Map) -> + maps_find_by(maps:iterator(Map), Predicate); + +maps_find_by(Iterator, Predicate) -> + case maps:next(Iterator) of + none -> error; + {Key, Value, NewIterator} -> + case Predicate(Key, Value) of + true -> {ok, Key, Value}; + false -> maps_find_by(NewIterator, Predicate) + end + end. + +maps_put_new(Map, Key, Value) -> + case Map of + #{Key := _} -> Map; + _ -> Map#{Key => Value} + end. diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 5c2de94d0..3d3340858 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -1,4 +1,4 @@ -%%-------------------------------------------------------------------- +%fresh, fresh, fresh, %-------------------------------------------------------------------- %% Copyright (c) 2018-2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); @@ -48,19 +48,19 @@ t_is_ack_required(_) -> t_maybe_nack_dropped(_) -> ?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {fresh, <<"group">>, self(), for_test}}}, ?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)), ?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end). t_nack_no_connection(_) -> - Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {fresh, <<"group">>, self(), for_test}}}, ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)), ?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok after 100 -> timeout end). t_maybe_ack(_) -> ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {fresh, <<"group">>, self(), for_test}}}, ?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}}, emqx_shared_sub:maybe_ack(Msg)), ?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end). @@ -446,6 +446,53 @@ t_redispatch(_) -> emqtt:stop(UsedSubPid2), ok. +t_dispatch_when_inflights_are_full(_) -> + ok = ensure_config(round_robin, true), + Topic = <<"foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + + %% Note that max_inflight is 1 + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {max_inflight, 1}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {max_inflight, 1}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar">>, 2}), + emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar">>, 2}), + + Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>), + Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>), + Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>), + ct:sleep(100), + + sys:suspend(ConnPid1), + sys:suspend(ConnPid2), + + %% Fill in the inflight for first client + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)), + + %% Fill in the inflight for second client + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)), + + %% Now kill any client + erlang:exit(ConnPid1, normal), + ct:sleep(100), + + %% And try to send the message + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)), + + %% And see that it gets dispatched to the client which is alive, even if it's inflight is full + sys:resume(ConnPid2), + ct:sleep(100), + ?assertMatch({true, ConnPid2}, last_message(<<"hello3">>, [ConnPid1, ConnPid2])), + ?assertMatch({true, ConnPid2}, last_message(<<"hello4">>, [ConnPid1, ConnPid2])), + + emqtt:stop(ConnPid2), + ok. + %%-------------------------------------------------------------------- %% help functions %%--------------------------------------------------------------------