fix(shared): retry dispatch when inflights are full

This commit is contained in:
Georgy Sychev 2022-05-18 14:33:09 +04:00
parent 192c65047e
commit 31d4c92a17
3 changed files with 151 additions and 45 deletions

View File

@ -449,8 +449,8 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
case emqx_inflight:is_full(Inflight) of case emqx_inflight:is_full(Inflight) of
true -> true ->
Session1 = case maybe_nack(Msg) of Session1 = case maybe_nack(Msg) of
true -> Session; drop -> Session;
false -> enqueue(ClientInfo, Msg, Session) store -> enqueue(ClientInfo, Msg, Session)
end, end,
{ok, Session1}; {ok, Session1};
false -> false ->
@ -650,7 +650,7 @@ redispatch_shared_messages(#session{inflight = Inflight}) ->
%% Note that dispatch is called with self() in failed subs %% Note that dispatch is called with self() in failed subs
%% This is done to avoid dispatching back to caller %% This is done to avoid dispatching back to caller
Delivery = #delivery{sender = self(), message = Msg}, Delivery = #delivery{sender = self(), message = Msg},
emqx_shared_sub:dispatch(Group, Topic, Delivery, [self()]); emqx_shared_sub:dispatch_to_non_self(Group, Topic, Delivery);
_ -> _ ->
false false
end; end;
@ -716,4 +716,3 @@ age(Now, Ts) -> Now - Ts.
set_field(Name, Value, Session) -> set_field(Name, Value, Session) ->
Pos = emqx_misc:index_of(Name, record_info(fields, session)), Pos = emqx_misc:index_of(Name, record_info(fields, session)),
setelement(Pos+1, Session, Value). setelement(Pos+1, Session, Value).

View File

@ -39,7 +39,7 @@
]). ]).
-export([ dispatch/3 -export([ dispatch/3
, dispatch/4 , dispatch_to_non_self/3
]). ]).
-export([ maybe_ack/1 -export([ maybe_ack/1
@ -123,22 +123,28 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
record(Group, Topic, SubPid) -> record(Group, Topic, SubPid) ->
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
%% @doc Dispatch a delivery to the shared group with the calling process
%% already recorded in the failed-subscribers map (reason `sender'), so the
%% caller is not among the first-choice candidates when a subscriber is picked.
-spec(dispatch_to_non_self(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery())
      -> emqx_types:deliver_result()).
dispatch_to_non_self(Group, Topic, Delivery) ->
    ExcludeSelf = #{self() => sender},
    dispatch(strategy(Group), Group, Topic, Delivery, ExcludeSelf).
-spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery()) -spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery())
-> emqx_types:deliver_result()). -> emqx_types:deliver_result()).
dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery) ->
dispatch(Group, Topic, Delivery, _FailedSubs = []). Strategy = strategy(Group),
dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}).
dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
#message{from = ClientId, topic = SourceTopic} = Msg, #message{from = ClientId, topic = SourceTopic} = Msg,
case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of
false -> false -> {error, no_subscribers};
{error, no_subscribers};
{Type, SubPid} -> {Type, SubPid} ->
case do_dispatch(SubPid, Group, Topic, Msg, Type) of case do_dispatch(SubPid, Group, Topic, Msg, Type) of
ok -> {ok, 1}; ok -> {ok, 1};
{error, _Reason} -> {error, Reason} ->
%% Failed to dispatch to this sub, try next. %% Failed to dispatch to this sub, try next.
dispatch(Group, Topic, Delivery, [SubPid | FailedSubs]) dispatch(Strategy, Group, Topic, Delivery, FailedSubs#{SubPid => Reason})
end end
end. end.
@ -165,33 +171,33 @@ do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
%% For QoS 0 message, send it as regular dispatch %% For QoS 0 message, send it as regular dispatch
SubPid ! {deliver, Topic, Msg}, SubPid ! {deliver, Topic, Msg},
ok; ok;
do_dispatch(SubPid, _Group, Topic, Msg, retry) -> do_dispatch(SubPid, Group, Topic, Msg, Type) ->
%% Retry implies all subscribers nack:ed, send again without ack
SubPid ! {deliver, Topic, Msg},
ok;
do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
case ack_enabled() of case ack_enabled() of
true -> true ->
dispatch_with_ack(SubPid, Group, Topic, Msg); dispatch_with_ack(SubPid, Group, Topic, Msg, Type);
false -> false ->
SubPid ! {deliver, Topic, Msg}, SubPid ! {deliver, Topic, Msg},
ok ok
end. end.
dispatch_with_ack(SubPid, Group, Topic, Msg) -> dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
%% For QoS 1/2 message, expect an ack %% For QoS 1/2 message, expect an ack
Ref = erlang:monitor(process, SubPid), Ref = erlang:monitor(process, SubPid),
Sender = self(), Sender = self(),
SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)}, SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)},
Timeout = case Msg#message.qos of Timeout = case Msg#message.qos of
?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
?QOS_2 -> infinity ?QOS_2 -> infinity
end, end,
%% This OpaqueRef is a forward compatibility workaround. Pre 4.3.15 versions
%% pass Ref from `{Sender, Ref}` ack header back as it is.
OpaqueRef = old_ref(Type, Group, Ref),
try try
receive receive
{Ref, ?ACK} -> {ReceivedRef, ?ACK} when ReceivedRef =:= Ref; ReceivedRef =:= OpaqueRef ->
ok; ok;
{Ref, ?NACK(Reason)} -> {ReceivedRef, ?NACK(Reason)} when ReceivedRef =:= Ref; ReceivedRef =:= OpaqueRef ->
%% the receive session may nack this message when its queue is full %% the receive session may nack this message when its queue is full
{error, Reason}; {error, Reason};
{'DOWN', Ref, process, SubPid, Reason} -> {'DOWN', Ref, process, SubPid, Reason} ->
@ -204,8 +210,11 @@ dispatch_with_ack(SubPid, Group, Topic, Msg) ->
_ = erlang:demonitor(Ref, [flush]) _ = erlang:demonitor(Ref, [flush])
end. end.
with_group_ack(Msg, Group, Sender, Ref) -> with_group_ack(Msg, Group, Type, Sender, Ref) ->
emqx_message:set_headers(#{shared_dispatch_ack => {Group, Sender, Ref}}, Msg). emqx_message:set_headers(#{shared_dispatch_ack => {Sender, old_ref(Type, Group, Ref)}}, Msg).
%% Build the opaque reference term placed next to the sender pid in the
%% `shared_dispatch_ack' header: the dispatch type, the group and the
%% reference bundled into a single 3-tuple.
old_ref(DispatchType, GroupName, AckRef) -> {DispatchType, GroupName, AckRef}.
-spec(without_group_ack(emqx_types:message()) -> emqx_types:message()). -spec(without_group_ack(emqx_types:message()) -> emqx_types:message()).
without_group_ack(Msg) -> without_group_ack(Msg) ->
@ -217,19 +226,32 @@ get_group_ack(Msg) ->
-spec(get_group(emqx_types:message()) -> {ok, any()} | error). -spec(get_group(emqx_types:message()) -> {ok, any()} | error).
get_group(Msg) -> get_group(Msg) ->
case get_group_ack(Msg) of case get_group_ack(Msg) of
?NO_ACK -> error; {_Sender, {_Type, Group, _Ref}} -> {ok, Group};
{Group, _Sender, _Ref} -> {ok, Group} _ -> error
end. end.
-spec(is_ack_required(emqx_types:message()) -> boolean()). -spec(is_ack_required(emqx_types:message()) -> boolean()).
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
%% @doc Negative ack dropped message due to inflight window or message queue being full. %% @doc Negative ack dropped message due to inflight window or message queue being full.
-spec(maybe_nack_dropped(emqx_types:message()) -> boolean()). -spec(maybe_nack_dropped(emqx_types:message()) -> store | drop).
maybe_nack_dropped(Msg) -> maybe_nack_dropped(Msg) ->
case get_group_ack(Msg) of case get_group_ack(Msg) of
?NO_ACK -> false; %% No ack header is present, put it into mqueue
{_Group, Sender, Ref} -> ok == nack(Sender, Ref, dropped) ?NO_ACK -> store;
%% For fresh Ref we send a nack and return `drop`, to note that the inflight is full
{Sender, {fresh, _Group, Ref}} -> nack(Sender, Ref, dropped), drop;
%% For retry Ref we can't reject a message if inflight is full, so we mark it as
%% acknowledged and put it into mqueue
{_Sender, {retry, _Group, _Ref}} -> maybe_ack(Msg), store;
%% This clause is for backward compatibility
Ack ->
{Sender, Ref} = fetch_sender_ref(Ack),
nack(Sender, Ref, dropped),
drop
end. end.
%% @doc Negative ack message due to connection down. %% @doc Negative ack message due to connection down.
@ -237,7 +259,7 @@ maybe_nack_dropped(Msg) ->
%% i.e is_ack_required returned true. %% i.e is_ack_required returned true.
-spec(nack_no_connection(emqx_types:message()) -> ok). -spec(nack_no_connection(emqx_types:message()) -> ok).
nack_no_connection(Msg) -> nack_no_connection(Msg) ->
{_Group, Sender, Ref} = get_group_ack(Msg), {Sender, Ref} = fetch_sender_ref(get_group_ack(Msg)),
nack(Sender, Ref, no_connection). nack(Sender, Ref, no_connection).
-spec(nack(pid(), reference(), dropped | no_connection) -> ok). -spec(nack(pid(), reference(), dropped | no_connection) -> ok).
@ -250,11 +272,16 @@ maybe_ack(Msg) ->
case get_group_ack(Msg) of case get_group_ack(Msg) of
?NO_ACK -> ?NO_ACK ->
Msg; Msg;
{_Group, Sender, Ref} -> Ack ->
{Sender, Ref} = fetch_sender_ref(Ack),
Sender ! {Ref, ?ACK}, Sender ! {Ref, ?ACK},
without_group_ack(Msg) without_group_ack(Msg)
end. end.
%% Extract `{Sender, Ref}' from a `shared_dispatch_ack' header value.
%% The current header shape is `{Sender, {Type, Group, Ref}}', from which the
%% inner reference is unwrapped. Any other 2-tuple is treated as the older
%% `{Sender, Ref}' shape and returned as it is (backward compatibility).
fetch_sender_ref({Sender, AckRef}) ->
    case AckRef of
        {_Type, _Group, InnerRef} -> {Sender, InnerRef};
        LegacyRef -> {Sender, LegacyRef}
    end.
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
Sub0 = erlang:get({shared_sub_sticky, Group, Topic}), Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
case is_active_sub(Sub0, FailedSubs) of case is_active_sub(Sub0, FailedSubs) of
@ -264,23 +291,37 @@ pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
{fresh, Sub0}; {fresh, Sub0};
false -> false ->
%% randomly pick one for the first message %% randomly pick one for the first message
{Type, Sub} = do_pick(random, ClientId, SourceTopic, Group, Topic, [Sub0 | FailedSubs]), FailedSubs1 = maps_put_new(FailedSubs, Sub0, inactive),
case do_pick(random, ClientId, SourceTopic, Group, Topic, FailedSubs1) of
false -> false;
{Type, Sub} ->
%% stick to whatever pick result %% stick to whatever pick result
erlang:put({shared_sub_sticky, Group, Topic}, Sub), erlang:put({shared_sub_sticky, Group, Topic}, Sub),
{Type, Sub} {Type, Sub}
end
end; end;
pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs). do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs).
do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
All = subscribers(Group, Topic), All = subscribers(Group, Topic),
case All -- FailedSubs of case lists:filter(fun(Sub) -> not maps:is_key(Sub, FailedSubs) end, All) of
[] when All =:= [] -> [] when All =:= [] ->
%% Genuinely no subscriber %% Genuinely no subscriber
false; false;
[] -> [] ->
%% We try redispatch to subs who dropped the message because inflight was full.
Found = maps_find_by(FailedSubs, fun({SubPid, FailReason}) ->
FailReason == dropped andalso is_alive_sub(SubPid)
end),
case Found of
{ok, Dropped, dropped} ->
%% Found dropped client
{retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, [Dropped])};
error ->
%% All offline? pick one anyway %% All offline? pick one anyway
{retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)}; {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)}
end;
Subs -> Subs ->
%% More than one available %% More than one available
{fresh, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs)} {fresh, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs)}
@ -414,7 +455,7 @@ update_stats(State) ->
%% Return 'true' if the subscriber process is alive AND not in the failed list %% Return 'true' if the subscriber process is alive AND not in the failed list
is_active_sub(Pid, FailedSubs) -> is_active_sub(Pid, FailedSubs) ->
is_alive_sub(Pid) andalso not lists:member(Pid, FailedSubs). not maps:is_key(Pid, FailedSubs) andalso is_alive_sub(Pid).
%% erlang:is_process_alive/1 does not work with remote pid. %% erlang:is_process_alive/1 does not work with remote pid.
is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) -> is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->
@ -427,3 +468,22 @@ delete_route_if_needed({Group, Topic}) ->
true -> ok; true -> ok;
false -> ok = emqx_router:do_delete_route(Topic, {Group, node()}) false -> ok = emqx_router:do_delete_route(Topic, {Group, node()})
end. end.
%% Find the first map association accepted by Predicate.
%%
%% The first argument is either a map or a map iterator (as produced by
%% maps:iterator/1); the iterator form is what the recursion uses.
%% Returns `{ok, Key, Value}' for the first accepted association in iteration
%% order, or `error' when none is accepted.
%%
%% Predicate may be a fun of arity 2, called as `Predicate(Key, Value)', or a
%% fun of arity 1, called as `Predicate({Key, Value})'. The arity-1 form is
%% accepted because the caller in do_pick/6 passes a fun that matches on a
%% `{SubPid, FailReason}' tuple; applying such a fun with two arguments would
%% raise `badarity'.
maps_find_by(Map, Predicate) when is_map(Map) ->
    maps_find_by(maps:iterator(Map), Predicate);
maps_find_by(Iterator, Predicate) ->
    case maps:next(Iterator) of
        none ->
            error;
        {Key, Value, NextIterator} ->
            case apply_map_predicate(Predicate, Key, Value) of
                true -> {ok, Key, Value};
                false -> maps_find_by(NextIterator, Predicate)
            end
    end.

%% Apply a map predicate using the calling convention that matches its arity.
apply_map_predicate(Predicate, Key, Value) when is_function(Predicate, 2) ->
    Predicate(Key, Value);
apply_map_predicate(Predicate, Key, Value) when is_function(Predicate, 1) ->
    Predicate({Key, Value}).
%% Insert Key => Value only when Key is not yet present in Map.
%% An existing association is left untouched and the map is returned unchanged.
maps_put_new(Map, Key, Value) ->
    case maps:is_key(Key, Map) of
        true -> Map;
        false -> Map#{Key => Value}
    end.

View File

@ -47,20 +47,20 @@ t_is_ack_required(_) ->
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})). ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
t_maybe_nack_dropped(_) -> t_maybe_nack_dropped(_) ->
?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), ?assertEqual(store, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)), ?assertEqual(drop, emqx_shared_sub:maybe_nack_dropped(Msg)),
?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end). ?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end).
t_nack_no_connection(_) -> t_nack_no_connection(_) ->
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)), ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok ?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok
after 100 -> timeout end). after 100 -> timeout end).
t_maybe_ack(_) -> t_maybe_ack(_) ->
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})), ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}}, ?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}},
emqx_shared_sub:maybe_ack(Msg)), emqx_shared_sub:maybe_ack(Msg)),
?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end). ?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end).
@ -446,6 +446,53 @@ t_redispatch(_) ->
emqtt:stop(UsedSubPid2), emqtt:stop(UsedSubPid2),
ok. ok.
%% Checks that QoS 2 messages published to a shared group are still reported
%% as dispatched (`{ok, 1}') when both group members already hold one
%% unacknowledged message (max_inflight is 1) and one of the members has been
%% terminated, and that the surviving client is the one that ends up with the
%% later messages.
t_dispatch_when_inflights_are_full(_) ->
%% presumably: round_robin strategy with shared-dispatch ack enabled — confirm
%% against ensure_config/2
ok = ensure_config(round_robin, true),
Topic = <<"foo/bar">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
%% Note that max_inflight is 1
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {max_inflight, 1}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {max_inflight, 1}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
%% Both clients join the same shared group at QoS 2
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar">>, 2}),
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar">>, 2}),
%% Four QoS 2 messages on the shared topic
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
ct:sleep(100),
%% Suspend both client processes so they stop handling incoming messages;
%% presumably this keeps the delivered messages unacknowledged in the
%% broker-side inflight windows.
sys:suspend(ConnPid1),
sys:suspend(ConnPid2),
%% Fill in the inflight for first client
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
%% Fill in the inflight for second client
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
%% Now kill one of the clients
%% NOTE(review): exit/2 with reason `normal' does not terminate a process that
%% is not trapping exits. Presumably this works because the client was
%% start_link'ed from this process and reacts to an exit signal from its
%% parent — confirm.
erlang:exit(ConnPid1, normal),
ct:sleep(100),
%% And try to send the message
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
%% And see that it gets dispatched to the client which is alive, even if its inflight is full
sys:resume(ConnPid2),
ct:sleep(100),
?assertMatch({true, ConnPid2}, last_message(<<"hello3">>, [ConnPid1, ConnPid2])),
?assertMatch({true, ConnPid2}, last_message(<<"hello4">>, [ConnPid1, ConnPid2])),
%% Only the second client is stopped here; the first was terminated above
emqtt:stop(ConnPid2),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% help functions %% help functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------