From cbf9bfb2e698e9a20cd776b30946eb3ffa4c0ad5 Mon Sep 17 00:00:00 2001 From: Georgy Sychev Date: Wed, 27 Apr 2022 20:13:31 +0400 Subject: [PATCH] feat(shared): redispatch to another shared sub, when no ACK received --- src/emqx_session.erl | 57 ++++++++++++++++----------- src/emqx_shared_sub.erl | 72 +++++++++++++++++++++------------- test/emqx_shared_sub_SUITE.erl | 46 ++++++++++++++++++++-- 3 files changed, 121 insertions(+), 54 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 45ad18bbe..13516b256 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -491,10 +491,12 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = end, {ok, Session1}; false -> + %% Note that we publish message without shared ack header + %% But add to inflight with ack headers Publish = {PacketId, maybe_ack(Msg)}, - Msg2 = mark_begin_deliver(Msg), - Session1 = await(PacketId, Msg2, Session), - {ok, [Publish], next_pkt_id(Session1)} + Msg1 = with_ts(mark_begin_deliver(Msg)), + Inflight1 = emqx_inflight:insert(PacketId, Msg1, Inflight), + {ok, [Publish], next_pkt_id(Session#session{inflight = Inflight1})} end. -spec(enqueue(emqx_types:clientinfo(), list(emqx_types:deliver()) | emqx_types:message(), @@ -532,14 +534,10 @@ enrich_delivers({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) enrich_subopts(get_subopts(Topic, Subs), Msg, Session). maybe_ack(Msg) -> - case emqx_shared_sub:is_ack_required(Msg) of - true -> emqx_shared_sub:maybe_ack(Msg); - false -> Msg - end. + emqx_shared_sub:maybe_ack(Msg). maybe_nack(Msg) -> - emqx_shared_sub:is_ack_required(Msg) - andalso (ok == emqx_shared_sub:maybe_nack_dropped(Msg)). + ok == emqx_shared_sub:maybe_nack_dropped(Msg). get_subopts(Topic, SubMap) -> case maps:find(Topic, SubMap) of @@ -572,14 +570,6 @@ enrich_subopts([{subid, SubId} | Opts], Msg, Session) -> Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg), enrich_subopts(Opts, Msg1, Session). -%%-------------------------------------------------------------------- -%% Awaiting ACK for QoS1/QoS2 Messages -%%-------------------------------------------------------------------- - -await(PacketId, Msg, Session = #session{inflight = Inflight}) -> - Inflight1 = emqx_inflight:insert(PacketId, with_ts(Msg), Inflight), - Session#session{inflight = Inflight1}. - %%-------------------------------------------------------------------- %% Retry Delivery %%-------------------------------------------------------------------- @@ -679,16 +669,39 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) -> end. -spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok). -terminate(ClientInfo, discarded, Session) -> - run_hook('session.discarded', [ClientInfo, info(Session)]); -terminate(ClientInfo, takeovered, Session) -> - run_hook('session.takeovered', [ClientInfo, info(Session)]); terminate(ClientInfo, Reason, Session) -> + run_terminate_hooks(ClientInfo, Reason, Session), + cleanup_self_from_shared_subs(), + redispatch_shared_messages(Session), + ok. + +run_terminate_hooks(ClientInfo, discarded, Session) -> + run_hook('session.discarded', [ClientInfo, info(Session)]); +run_terminate_hooks(ClientInfo, takeovered, Session) -> + run_hook('session.takeovered', [ClientInfo, info(Session)]); +run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). +redispatch_shared_messages(#session{inflight = Inflight}) -> + InflightList = emqx_inflight:to_list(Inflight), + lists:map(fun({_, {#message{topic = Topic} = Msg, _}}) -> + case emqx_shared_sub:get_group(Msg) of + {ok, Group} -> + Delivery = #delivery{sender = self(), message = Msg}, + emqx_shared_sub:dispatch(Group, Topic, Delivery); + + _ -> + false + end + end, InflightList). + +cleanup_self_from_shared_subs() -> + emqx_shared_sub:cleanup(self()). + -compile({inline, [run_hook/2]}). run_hook(Name, Args) -> - ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). + ok = emqx_metrics:inc(Name), + emqx_hooks:run(Name, Args). %%-------------------------------------------------------------------- %% Inc message/delivery expired counter diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 12e64b59a..f188352c7 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -44,6 +44,8 @@ , maybe_nack_dropped/1 , nack_no_connection/1 , is_ack_required/1 + , get_group/1 + , cleanup/1 ]). %% for testing @@ -117,6 +119,10 @@ subscribe(Group, Topic, SubPid) when is_pid(SubPid) -> unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}). +-spec(cleanup(pid()) -> ok). +cleanup(SubPid) -> + gen_server:call(?SERVER, {cleanup, SubPid}). + record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. @@ -131,7 +137,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> false -> {error, no_subscribers}; {Type, SubPid} -> - case do_dispatch(SubPid, Topic, Msg, Type) of + case do_dispatch(SubPid, Group, Topic, Msg, Type) of ok -> {ok, 1}; {error, _Reason} -> %% Failed to dispatch to this sub, try next. @@ -153,36 +159,33 @@ strategy(Group) -> ack_enabled() -> emqx:get_env(shared_dispatch_ack_enabled, false). -do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() -> +do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() -> %% Deadlock otherwise - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok; -do_dispatch(SubPid, Topic, Msg, Type) -> - dispatch_per_qos(SubPid, Topic, Msg, Type). - %% return either 'ok' (when everything is fine) or 'error' -dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> +do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> %% For QoS 0 message, send it as regular dispatch - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok; -dispatch_per_qos(SubPid, Topic, Msg, retry) -> +do_dispatch(SubPid, _Group, Topic, Msg, retry) -> %% Retry implies all subscribers nack:ed, send again without ack - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok; -dispatch_per_qos(SubPid, Topic, Msg, fresh) -> +do_dispatch(SubPid, Group, Topic, Msg, fresh) -> case ack_enabled() of true -> - dispatch_with_ack(SubPid, Topic, Msg); + dispatch_with_ack(SubPid, Group, Topic, Msg); false -> - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok end. -dispatch_with_ack(SubPid, Topic, Msg) -> +dispatch_with_ack(SubPid, Group, Topic, Msg) -> %% For QoS 1/2 message, expect an ack Ref = erlang:monitor(process, SubPid), Sender = self(), - _ = erlang:send(SubPid, {deliver, Topic, with_ack_ref(Msg, {Sender, Ref})}), + SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)}, Timeout = case Msg#message.qos of ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); ?QOS_2 -> infinity @@ -204,24 +207,32 @@ dispatch_with_ack(SubPid, Topic, Msg) -> _ = erlang:demonitor(Ref, [flush]) end. -with_ack_ref(Msg, SenderRef) -> - emqx_message:set_headers(#{shared_dispatch_ack => SenderRef}, Msg). +with_group_ack(Msg, Group, Sender, Ref) -> + emqx_message:set_headers(#{shared_dispatch_ack => {Group, Sender, Ref}}, Msg). -without_ack_ref(Msg) -> +-spec(without_group_ack(emqx_types:message()) -> emqx_types:message()). +without_group_ack(Msg) -> emqx_message:set_headers(#{shared_dispatch_ack => ?NO_ACK}, Msg). -get_ack_ref(Msg) -> +get_group_ack(Msg) -> emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). +-spec(get_group(emqx_types:message()) -> {ok, any()} | error). +get_group(Msg) -> + case get_group_ack(Msg) of + ?NO_ACK -> error; + {Group, _Sender, _Ref} -> {ok, Group} + end. + -spec(is_ack_required(emqx_types:message()) -> boolean()). -is_ack_required(Msg) -> ?NO_ACK =/= get_ack_ref(Msg). +is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). %% @doc Negative ack dropped message due to inflight window or message queue being full. -spec(maybe_nack_dropped(emqx_types:message()) -> ok). maybe_nack_dropped(Msg) -> - case get_ack_ref(Msg) of + case get_group_ack(Msg) of ?NO_ACK -> ok; - {Sender, Ref} -> nack(Sender, Ref, dropped) + {_Group, Sender, Ref} -> nack(Sender, Ref, dropped) end. %% @doc Negative ack message due to connection down. @@ -229,22 +240,23 @@ maybe_nack_dropped(Msg) -> %% i.e is_ack_required returned true. -spec(nack_no_connection(emqx_types:message()) -> ok). nack_no_connection(Msg) -> - {Sender, Ref} = get_ack_ref(Msg), + {_Group, Sender, Ref} = get_group_ack(Msg), nack(Sender, Ref, no_connection). -spec(nack(pid(), reference(), dropped | no_connection) -> ok). nack(Sender, Ref, Reason) -> - erlang:send(Sender, {Ref, ?NACK(Reason)}), + Sender ! {Ref, ?NACK(Reason)}, ok. -spec(maybe_ack(emqx_types:message()) -> emqx_types:message()). maybe_ack(Msg) -> - case get_ack_ref(Msg) of + case get_group_ack(Msg) of ?NO_ACK -> Msg; - {Sender, Ref} -> - erlang:send(Sender, {Ref, ?ACK}), - without_ack_ref(Msg) + {_Group, Sender, Ref} -> + Sender ! {Ref, ?ACK}, + without_group_ack(Msg) + %% without_group_ack(Msg) end. pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> @@ -327,6 +339,10 @@ init_monitors() -> emqx_pmon:monitor(SubPid, Mon) end, emqx_pmon:new(), ?TAB). +handle_call({cleanup, SubPid}, _From, State) -> + cleanup_down(SubPid), + {reply, ok, State}; + handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) -> mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)), case ets:member(?SHARED_SUBS, {Group, Topic}) of diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 0a6c47d60..69538f4fa 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -48,19 +48,19 @@ t_is_ack_required(_) -> t_maybe_nack_dropped(_) -> ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(Msg)), ?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end). t_nack_no_connection(_) -> - Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)), ?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok after 100 -> timeout end). t_maybe_ack(_) -> ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, ?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}}, emqx_shared_sub:maybe_ack(Msg)), ?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end). @@ -284,11 +284,14 @@ test_two_messages(Strategy, Group) -> ok. last_message(ExpectedPayload, Pids) -> + last_message(ExpectedPayload, Pids, 100). + +last_message(ExpectedPayload, Pids, Timeout) -> receive {publish, #{client_pid := Pid, payload := ExpectedPayload}} -> ct:pal("last_message: ~p ====== ~p, payload=~p", [Pids, Pid, ExpectedPayload]), {true, Pid} - after 100 -> + after Timeout -> ct:pal("not yet"), <<"not yet?">> end. @@ -410,6 +413,39 @@ t_local_fallback(_) -> ?assertEqual(UsedSubPid1, UsedSubPid2), ok. +%% This one tests that broker tries to select another shared subscriber +%% If the first one doesn't return an ACK +t_redispatch(_) -> + ok = ensure_config(sticky, true), + application:set_env(emqx, shared_dispatch_ack_enabled, true), + + Group = <<"group1">>, + + Topic = <<"foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 1}), + emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 1}), + + Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>), + + emqx:publish(Message), + + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]), + ok = emqtt:stop(UsedSubPid1), + + Res = last_message(<<"hello1">>, [ConnPid1, ConnPid2], 6000), + ?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res), + + {true, UsedSubPid2} = Res, + emqtt:stop(UsedSubPid2), + ok. + %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- @@ -481,6 +517,8 @@ setup_node(Node, Port) -> opts => [{zone,internal}], proto => tcp}]), application:set_env(gen_rpc, port_discovery, manual), + application:set_env(gen_rpc, tcp_server_port, Port * 2), + %% application:set_env(gen_rpc, ssl_server_port, Port * 2 + 1), ok; (_) -> ok