feat(shared): redispatch to another shared sub, when no ACK received

This commit is contained in:
Georgy Sychev 2022-04-27 20:13:31 +04:00
parent eacbce66f8
commit cbf9bfb2e6
3 changed files with 121 additions and 54 deletions

View File

@ -491,10 +491,12 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
end, end,
{ok, Session1}; {ok, Session1};
false -> false ->
%% Note that we publish message without shared ack header
%% But add to inflight with ack headers
Publish = {PacketId, maybe_ack(Msg)}, Publish = {PacketId, maybe_ack(Msg)},
Msg2 = mark_begin_deliver(Msg), Msg1 = with_ts(mark_begin_deliver(Msg)),
Session1 = await(PacketId, Msg2, Session), Inflight1 = emqx_inflight:insert(PacketId, Msg1, Inflight),
{ok, [Publish], next_pkt_id(Session1)} {ok, [Publish], next_pkt_id(Session#session{inflight = Inflight1})}
end. end.
-spec(enqueue(emqx_types:clientinfo(), list(emqx_types:deliver()) | emqx_types:message(), -spec(enqueue(emqx_types:clientinfo(), list(emqx_types:deliver()) | emqx_types:message(),
@ -532,14 +534,10 @@ enrich_delivers({deliver, Topic, Msg}, Session = #session{subscriptions = Subs})
enrich_subopts(get_subopts(Topic, Subs), Msg, Session). enrich_subopts(get_subopts(Topic, Subs), Msg, Session).
maybe_ack(Msg) -> maybe_ack(Msg) ->
case emqx_shared_sub:is_ack_required(Msg) of emqx_shared_sub:maybe_ack(Msg).
true -> emqx_shared_sub:maybe_ack(Msg);
false -> Msg
end.
maybe_nack(Msg) -> maybe_nack(Msg) ->
emqx_shared_sub:is_ack_required(Msg) ok == emqx_shared_sub:maybe_nack_dropped(Msg).
andalso (ok == emqx_shared_sub:maybe_nack_dropped(Msg)).
get_subopts(Topic, SubMap) -> get_subopts(Topic, SubMap) ->
case maps:find(Topic, SubMap) of case maps:find(Topic, SubMap) of
@ -572,14 +570,6 @@ enrich_subopts([{subid, SubId} | Opts], Msg, Session) ->
Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg), Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg),
enrich_subopts(Opts, Msg1, Session). enrich_subopts(Opts, Msg1, Session).
%%--------------------------------------------------------------------
%% Awaiting ACK for QoS1/QoS2 Messages
%%--------------------------------------------------------------------
await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
Inflight1 = emqx_inflight:insert(PacketId, with_ts(Msg), Inflight),
Session#session{inflight = Inflight1}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Retry Delivery %% Retry Delivery
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -679,16 +669,39 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) ->
end. end.
-spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok). -spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok).
terminate(ClientInfo, discarded, Session) ->
run_hook('session.discarded', [ClientInfo, info(Session)]);
terminate(ClientInfo, takeovered, Session) ->
run_hook('session.takeovered', [ClientInfo, info(Session)]);
terminate(ClientInfo, Reason, Session) -> terminate(ClientInfo, Reason, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session),
cleanup_self_from_shared_subs(),
redispatch_shared_messages(Session),
ok.
run_terminate_hooks(ClientInfo, discarded, Session) ->
run_hook('session.discarded', [ClientInfo, info(Session)]);
run_terminate_hooks(ClientInfo, takeovered, Session) ->
run_hook('session.takeovered', [ClientInfo, info(Session)]);
run_terminate_hooks(ClientInfo, Reason, Session) ->
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
redispatch_shared_messages(#session{inflight = Inflight}) ->
InflightList = emqx_inflight:to_list(Inflight),
lists:map(fun({_, {#message{topic = Topic} = Msg, _}}) ->
case emqx_shared_sub:get_group(Msg) of
{ok, Group} ->
Delivery = #delivery{sender = self(), message = Msg},
emqx_shared_sub:dispatch(Group, Topic, Delivery);
_ ->
false
end
end, InflightList).
cleanup_self_from_shared_subs() ->
emqx_shared_sub:cleanup(self()).
-compile({inline, [run_hook/2]}). -compile({inline, [run_hook/2]}).
run_hook(Name, Args) -> run_hook(Name, Args) ->
ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). ok = emqx_metrics:inc(Name),
emqx_hooks:run(Name, Args).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Inc message/delivery expired counter %% Inc message/delivery expired counter

View File

@ -44,6 +44,8 @@
, maybe_nack_dropped/1 , maybe_nack_dropped/1
, nack_no_connection/1 , nack_no_connection/1
, is_ack_required/1 , is_ack_required/1
, get_group/1
, cleanup/1
]). ]).
%% for testing %% for testing
@ -117,6 +119,10 @@ subscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}). gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}).
-spec(cleanup(pid()) -> ok).
cleanup(SubPid) ->
gen_server:call(?SERVER, {cleanup, SubPid}).
record(Group, Topic, SubPid) -> record(Group, Topic, SubPid) ->
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
@ -131,7 +137,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
false -> false ->
{error, no_subscribers}; {error, no_subscribers};
{Type, SubPid} -> {Type, SubPid} ->
case do_dispatch(SubPid, Topic, Msg, Type) of case do_dispatch(SubPid, Group, Topic, Msg, Type) of
ok -> {ok, 1}; ok -> {ok, 1};
{error, _Reason} -> {error, _Reason} ->
%% Failed to dispatch to this sub, try next. %% Failed to dispatch to this sub, try next.
@ -153,36 +159,33 @@ strategy(Group) ->
ack_enabled() -> ack_enabled() ->
emqx:get_env(shared_dispatch_ack_enabled, false). emqx:get_env(shared_dispatch_ack_enabled, false).
do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() -> do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
%% Deadlock otherwise %% Deadlock otherwise
_ = erlang:send(SubPid, {deliver, Topic, Msg}), SubPid ! {deliver, Topic, Msg},
ok; ok;
do_dispatch(SubPid, Topic, Msg, Type) ->
dispatch_per_qos(SubPid, Topic, Msg, Type).
%% return either 'ok' (when everything is fine) or 'error' %% return either 'ok' (when everything is fine) or 'error'
dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
%% For QoS 0 message, send it as regular dispatch %% For QoS 0 message, send it as regular dispatch
_ = erlang:send(SubPid, {deliver, Topic, Msg}), SubPid ! {deliver, Topic, Msg},
ok; ok;
dispatch_per_qos(SubPid, Topic, Msg, retry) -> do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
%% Retry implies all subscribers nack:ed, send again without ack %% Retry implies all subscribers nack:ed, send again without ack
_ = erlang:send(SubPid, {deliver, Topic, Msg}), SubPid ! {deliver, Topic, Msg},
ok; ok;
dispatch_per_qos(SubPid, Topic, Msg, fresh) -> do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
case ack_enabled() of case ack_enabled() of
true -> true ->
dispatch_with_ack(SubPid, Topic, Msg); dispatch_with_ack(SubPid, Group, Topic, Msg);
false -> false ->
_ = erlang:send(SubPid, {deliver, Topic, Msg}), SubPid ! {deliver, Topic, Msg},
ok ok
end. end.
dispatch_with_ack(SubPid, Topic, Msg) -> dispatch_with_ack(SubPid, Group, Topic, Msg) ->
%% For QoS 1/2 message, expect an ack %% For QoS 1/2 message, expect an ack
Ref = erlang:monitor(process, SubPid), Ref = erlang:monitor(process, SubPid),
Sender = self(), Sender = self(),
_ = erlang:send(SubPid, {deliver, Topic, with_ack_ref(Msg, {Sender, Ref})}), SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)},
Timeout = case Msg#message.qos of Timeout = case Msg#message.qos of
?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
?QOS_2 -> infinity ?QOS_2 -> infinity
@ -204,24 +207,32 @@ dispatch_with_ack(SubPid, Topic, Msg) ->
_ = erlang:demonitor(Ref, [flush]) _ = erlang:demonitor(Ref, [flush])
end. end.
with_ack_ref(Msg, SenderRef) -> with_group_ack(Msg, Group, Sender, Ref) ->
emqx_message:set_headers(#{shared_dispatch_ack => SenderRef}, Msg). emqx_message:set_headers(#{shared_dispatch_ack => {Group, Sender, Ref}}, Msg).
without_ack_ref(Msg) -> -spec(without_group_ack(emqx_types:message()) -> emqx_types:message()).
without_group_ack(Msg) ->
emqx_message:set_headers(#{shared_dispatch_ack => ?NO_ACK}, Msg). emqx_message:set_headers(#{shared_dispatch_ack => ?NO_ACK}, Msg).
get_ack_ref(Msg) -> get_group_ack(Msg) ->
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
-spec(get_group(emqx_types:message()) -> {ok, any()} | error).
get_group(Msg) ->
case get_group_ack(Msg) of
?NO_ACK -> error;
{Group, _Sender, _Ref} -> {ok, Group}
end.
-spec(is_ack_required(emqx_types:message()) -> boolean()). -spec(is_ack_required(emqx_types:message()) -> boolean()).
is_ack_required(Msg) -> ?NO_ACK =/= get_ack_ref(Msg). is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
%% @doc Negative ack dropped message due to inflight window or message queue being full. %% @doc Negative ack dropped message due to inflight window or message queue being full.
-spec(maybe_nack_dropped(emqx_types:message()) -> ok). -spec(maybe_nack_dropped(emqx_types:message()) -> ok).
maybe_nack_dropped(Msg) -> maybe_nack_dropped(Msg) ->
case get_ack_ref(Msg) of case get_group_ack(Msg) of
?NO_ACK -> ok; ?NO_ACK -> ok;
{Sender, Ref} -> nack(Sender, Ref, dropped) {_Group, Sender, Ref} -> nack(Sender, Ref, dropped)
end. end.
%% @doc Negative ack message due to connection down. %% @doc Negative ack message due to connection down.
@ -229,22 +240,23 @@ maybe_nack_dropped(Msg) ->
%% i.e is_ack_required returned true. %% i.e is_ack_required returned true.
-spec(nack_no_connection(emqx_types:message()) -> ok). -spec(nack_no_connection(emqx_types:message()) -> ok).
nack_no_connection(Msg) -> nack_no_connection(Msg) ->
{Sender, Ref} = get_ack_ref(Msg), {_Group, Sender, Ref} = get_group_ack(Msg),
nack(Sender, Ref, no_connection). nack(Sender, Ref, no_connection).
-spec(nack(pid(), reference(), dropped | no_connection) -> ok). -spec(nack(pid(), reference(), dropped | no_connection) -> ok).
nack(Sender, Ref, Reason) -> nack(Sender, Ref, Reason) ->
erlang:send(Sender, {Ref, ?NACK(Reason)}), Sender ! {Ref, ?NACK(Reason)},
ok. ok.
-spec(maybe_ack(emqx_types:message()) -> emqx_types:message()). -spec(maybe_ack(emqx_types:message()) -> emqx_types:message()).
maybe_ack(Msg) -> maybe_ack(Msg) ->
case get_ack_ref(Msg) of case get_group_ack(Msg) of
?NO_ACK -> ?NO_ACK ->
Msg; Msg;
{Sender, Ref} -> {_Group, Sender, Ref} ->
erlang:send(Sender, {Ref, ?ACK}), Sender ! {Ref, ?ACK},
without_ack_ref(Msg) without_group_ack(Msg)
%% without_group_ack(Msg)
end. end.
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
@ -327,6 +339,10 @@ init_monitors() ->
emqx_pmon:monitor(SubPid, Mon) emqx_pmon:monitor(SubPid, Mon)
end, emqx_pmon:new(), ?TAB). end, emqx_pmon:new(), ?TAB).
handle_call({cleanup, SubPid}, _From, State) ->
cleanup_down(SubPid),
{reply, ok, State};
handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) -> handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)), mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)),
case ets:member(?SHARED_SUBS, {Group, Topic}) of case ets:member(?SHARED_SUBS, {Group, Topic}) of

View File

@ -48,19 +48,19 @@ t_is_ack_required(_) ->
t_maybe_nack_dropped(_) -> t_maybe_nack_dropped(_) ->
?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(Msg)), ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(Msg)),
?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end). ?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end).
t_nack_no_connection(_) -> t_nack_no_connection(_) ->
Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)), ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok ?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok
after 100 -> timeout end). after 100 -> timeout end).
t_maybe_ack(_) -> t_maybe_ack(_) ->
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})), ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}}, ?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}},
emqx_shared_sub:maybe_ack(Msg)), emqx_shared_sub:maybe_ack(Msg)),
?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end). ?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end).
@ -284,11 +284,14 @@ test_two_messages(Strategy, Group) ->
ok. ok.
last_message(ExpectedPayload, Pids) -> last_message(ExpectedPayload, Pids) ->
last_message(ExpectedPayload, Pids, 100).
last_message(ExpectedPayload, Pids, Timeout) ->
receive receive
{publish, #{client_pid := Pid, payload := ExpectedPayload}} -> {publish, #{client_pid := Pid, payload := ExpectedPayload}} ->
ct:pal("last_message: ~p ====== ~p, payload=~p", [Pids, Pid, ExpectedPayload]), ct:pal("last_message: ~p ====== ~p, payload=~p", [Pids, Pid, ExpectedPayload]),
{true, Pid} {true, Pid}
after 100 -> after Timeout ->
ct:pal("not yet"), ct:pal("not yet"),
<<"not yet?">> <<"not yet?">>
end. end.
@ -410,6 +413,39 @@ t_local_fallback(_) ->
?assertEqual(UsedSubPid1, UsedSubPid2), ?assertEqual(UsedSubPid1, UsedSubPid2),
ok. ok.
%% This one tests that broker tries to select another shared subscriber
%% If the first one doesn't return an ACK
t_redispatch(_) ->
ok = ensure_config(sticky, true),
application:set_env(emqx, shared_dispatch_ack_enabled, true),
Group = <<"group1">>,
Topic = <<"foo/bar">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 1}),
emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 1}),
Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>),
emqx:publish(Message),
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
ok = emqtt:stop(UsedSubPid1),
Res = last_message(<<"hello1">>, [ConnPid1, ConnPid2], 6000),
?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res),
{true, UsedSubPid2} = Res,
emqtt:stop(UsedSubPid2),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% help functions %% help functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -481,6 +517,8 @@ setup_node(Node, Port) ->
opts => [{zone,internal}], opts => [{zone,internal}],
proto => tcp}]), proto => tcp}]),
application:set_env(gen_rpc, port_discovery, manual), application:set_env(gen_rpc, port_discovery, manual),
application:set_env(gen_rpc, tcp_server_port, Port * 2),
%% application:set_env(gen_rpc, ssl_server_port, Port * 2 + 1),
ok; ok;
(_) -> (_) ->
ok ok