diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 310d21c92..78a518d9b 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -561,10 +561,12 @@ deliver_msg( end, {ok, Session1}; false -> + %% Note that we publish message without shared ack header + %% But add to inflight with ack headers Publish = {PacketId, maybe_ack(Msg)}, - Msg2 = mark_begin_deliver(Msg), - Session1 = await(PacketId, Msg2, Session), - {ok, [Publish], next_pkt_id(Session1)} + Msg1 = with_ts(mark_begin_deliver(Msg)), + Inflight1 = emqx_inflight:insert(PacketId, Msg1, Inflight), + {ok, [Publish], next_pkt_id(Session#session{inflight = Inflight1})} end. -spec enqueue( @@ -625,14 +627,10 @@ enrich_deliver({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) enrich_subopts(get_subopts(Topic, Subs), Msg, Session). maybe_ack(Msg) -> - case emqx_shared_sub:is_ack_required(Msg) of - true -> emqx_shared_sub:maybe_ack(Msg); - false -> Msg - end. + emqx_shared_sub:maybe_ack(Msg). maybe_nack(Msg) -> - emqx_shared_sub:is_ack_required(Msg) andalso - (ok == emqx_shared_sub:maybe_nack_dropped(Msg)). + ok == emqx_shared_sub:maybe_nack_dropped(Msg). get_subopts(Topic, SubMap) -> case maps:find(Topic, SubMap) of @@ -673,14 +671,6 @@ enrich_subopts([{subid, SubId} | Opts], Msg, Session) -> Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg), enrich_subopts(Opts, Msg1, Session). -%%-------------------------------------------------------------------- -%% Awaiting ACK for QoS1/QoS2 Messages -%%-------------------------------------------------------------------- - -await(PacketId, Msg, Session = #session{inflight = Inflight}) -> - Inflight1 = emqx_inflight:insert(PacketId, with_ts(Msg), Inflight), - Session#session{inflight = Inflight1}. - %%-------------------------------------------------------------------- %% Retry Delivery %%-------------------------------------------------------------------- @@ -808,13 +798,36 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) -> end. -spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok. -terminate(ClientInfo, discarded, Session) -> - run_hook('session.discarded', [ClientInfo, info(Session)]); -terminate(ClientInfo, takenover, Session) -> - run_hook('session.takenover', [ClientInfo, info(Session)]); terminate(ClientInfo, Reason, Session) -> + run_terminate_hooks(ClientInfo, Reason, Session), + cleanup_self_from_shared_subs(), + redispatch_shared_messages(Session), + ok. + +run_terminate_hooks(ClientInfo, discarded, Session) -> + run_hook('session.discarded', [ClientInfo, info(Session)]); +run_terminate_hooks(ClientInfo, takeovered, Session) -> + run_hook('session.takeovered', [ClientInfo, info(Session)]); +run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). +redispatch_shared_messages(#session{inflight = Inflight}) -> + InflightList = emqx_inflight:to_list(Inflight), + lists:map(fun({_, {#message{topic = Topic} = Msg, _}}) -> + case emqx_shared_sub:get_group(Msg) of + {ok, Group} -> + Delivery = #delivery{sender = self(), message = Msg}, + emqx_shared_sub:dispatch(Group, Topic, Delivery); + + _ -> + false + end + end, InflightList). + +cleanup_self_from_shared_subs() -> + emqx_shared_sub:cleanup(self()). + +-compile({inline, [run_hook/2]}). run_hook(Name, Args) -> ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index e8ae58f79..710a97b06 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -30,24 +30,18 @@ %% APIs -export([start_link/0]). - --export([ - subscribe/3, - unsubscribe/3 -]). - +export([subscribe/3, unsubscribe/3]). -export([dispatch/3]). - -export([ maybe_ack/1, maybe_nack_dropped/1, nack_no_connection/1, - is_ack_required/1 + is_ack_required/1, + get_group/1, + cleanup/1 ]). - %% for testing -export([subscribers/2]). - %% gen_server callbacks -export([ init/1, @@ -60,27 +54,21 @@ -export_type([strategy/0]). --type strategy() :: - random - | round_robin - | sticky - %% same as hash_clientid, backward compatible - | hash - | hash_clientid - | hash_topic. +-type strategy() :: random | round_robin | sticky | hash | hash_clientid | hash_topic. + +%% same as hash_clientid, backward compatible -define(SERVER, ?MODULE). -define(TAB, emqx_shared_subscription). -define(SHARED_SUBS, emqx_shared_subscriber). -define(ALIVE_SUBS, emqx_alive_shared_subscribers). -define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5). --define(IS_LOCAL_PID(Pid), (is_pid(Pid) andalso node(Pid) =:= node())). +-define(IS_LOCAL_PID(Pid), is_pid(Pid) andalso node(Pid) =:= node()). -define(ACK, shared_sub_ack). -define(NACK(Reason), {shared_sub_nack, Reason}). -define(NO_ACK, no_ack). -record(state, {pmon}). - -record(emqx_shared_subscription, {group, topic, subpid}). %%-------------------------------------------------------------------- @@ -88,13 +76,17 @@ %%-------------------------------------------------------------------- mnesia(boot) -> - ok = mria:create_table(?TAB, [ - {type, bag}, - {rlog_shard, ?SHARED_SUB_SHARD}, - {storage, ram_copies}, - {record_name, emqx_shared_subscription}, - {attributes, record_info(fields, emqx_shared_subscription)} - ]). + ok = + mria:create_table( + ?TAB, + [ + {type, bag}, + {rlog_shard, ?SHARED_SUB_SHARD}, + {storage, ram_copies}, + {record_name, emqx_shared_subscription}, + {attributes, record_info(fields, emqx_shared_subscription)} + ] + ). %%-------------------------------------------------------------------- %% API @@ -112,8 +104,16 @@ subscribe(Group, Topic, SubPid) when is_pid(SubPid) -> unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}). +-spec cleanup(pid()) -> ok. +cleanup(SubPid) -> + gen_server:call(?SERVER, {cleanup, SubPid}). + record(Group, Topic, SubPid) -> - #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. + #emqx_shared_subscription{ + group = Group, + topic = Topic, + subpid = SubPid + }. -spec dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result(). @@ -126,7 +126,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> false -> {error, no_subscribers}; {Type, SubPid} -> - case do_dispatch(SubPid, Topic, Msg, Type) of + case do_dispatch(SubPid, Group, Topic, Msg, Type) of ok -> {ok, 1}; {error, _Reason} -> @@ -143,40 +143,39 @@ strategy() -> ack_enabled() -> emqx:get_config([broker, shared_dispatch_ack_enabled]). -do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() -> +do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() -> %% Deadlock otherwise - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok; -do_dispatch(SubPid, Topic, Msg, Type) -> - dispatch_per_qos(SubPid, Topic, Msg, Type). - %% return either 'ok' (when everything is fine) or 'error' -dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> +do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> %% For QoS 0 message, send it as regular dispatch - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok; -dispatch_per_qos(SubPid, Topic, Msg, retry) -> +do_dispatch(SubPid, _Group, Topic, Msg, retry) -> %% Retry implies all subscribers nack:ed, send again without ack - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok; -dispatch_per_qos(SubPid, Topic, Msg, fresh) -> +do_dispatch(SubPid, Group, Topic, Msg, fresh) -> case ack_enabled() of true -> - dispatch_with_ack(SubPid, Topic, Msg); + dispatch_with_ack(SubPid, Group, Topic, Msg); false -> - _ = erlang:send(SubPid, {deliver, Topic, Msg}), + SubPid ! {deliver, Topic, Msg}, ok end. -dispatch_with_ack(SubPid, Topic, Msg) -> +dispatch_with_ack(SubPid, Group, Topic, Msg) -> %% For QoS 1/2 message, expect an ack Ref = erlang:monitor(process, SubPid), Sender = self(), - _ = erlang:send(SubPid, {deliver, Topic, with_ack_ref(Msg, {Sender, Ref})}), + SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)}, Timeout = case Msg#message.qos of - ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); - ?QOS_2 -> infinity + ?QOS_1 -> + timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); + ?QOS_2 -> + infinity end, try receive @@ -194,24 +193,37 @@ dispatch_with_ack(SubPid, Topic, Msg) -> ok = emqx_pmon:demonitor(Ref) end. -with_ack_ref(Msg, SenderRef) -> - emqx_message:set_headers(#{shared_dispatch_ack => SenderRef}, Msg). +with_group_ack(Msg, Group, Sender, Ref) -> + emqx_message:set_headers(#{shared_dispatch_ack => {Group, Sender, Ref}}, Msg). -without_ack_ref(Msg) -> +-spec without_group_ack(emqx_types:message()) -> emqx_types:message(). +without_group_ack(Msg) -> emqx_message:set_headers(#{shared_dispatch_ack => ?NO_ACK}, Msg). -get_ack_ref(Msg) -> +get_group_ack(Msg) -> emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). +-spec get_group(emqx_types:message()) -> {ok, any()} | error. +get_group(Msg) -> + case get_group_ack(Msg) of + ?NO_ACK -> + error; + {Group, _Sender, _Ref} -> + {ok, Group} + end. + -spec is_ack_required(emqx_types:message()) -> boolean(). -is_ack_required(Msg) -> ?NO_ACK =/= get_ack_ref(Msg). +is_ack_required(Msg) -> + ?NO_ACK =/= get_group_ack(Msg). %% @doc Negative ack dropped message due to inflight window or message queue being full. -spec maybe_nack_dropped(emqx_types:message()) -> ok. maybe_nack_dropped(Msg) -> - case get_ack_ref(Msg) of - ?NO_ACK -> ok; - {Sender, Ref} -> nack(Sender, Ref, dropped) + case get_group_ack(Msg) of + ?NO_ACK -> + ok; + {_Group, Sender, Ref} -> + nack(Sender, Ref, dropped) end. %% @doc Negative ack message due to connection down. @@ -219,22 +231,23 @@ maybe_nack_dropped(Msg) -> %% i.e is_ack_required returned true. -spec nack_no_connection(emqx_types:message()) -> ok. nack_no_connection(Msg) -> - {Sender, Ref} = get_ack_ref(Msg), + {_Group, Sender, Ref} = get_group_ack(Msg), nack(Sender, Ref, no_connection). -spec nack(pid(), reference(), dropped | no_connection) -> ok. nack(Sender, Ref, Reason) -> - erlang:send(Sender, {Ref, ?NACK(Reason)}), + Sender ! {Ref, ?NACK(Reason)}, ok. -spec maybe_ack(emqx_types:message()) -> emqx_types:message(). maybe_ack(Msg) -> - case get_ack_ref(Msg) of + case get_group_ack(Msg) of ?NO_ACK -> Msg; - {Sender, Ref} -> - erlang:send(Sender, {Ref, ?ACK}), - without_ack_ref(Msg) + {_Group, Sender, Ref} -> + Sender ! {Ref, ?ACK}, + %% without_group_ack(Msg) + without_group_ack(Msg) end. pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> @@ -286,8 +299,10 @@ do_pick_subscriber(_Group, _Topic, hash_topic, _ClientId, SourceTopic, Count) -> do_pick_subscriber(Group, Topic, round_robin, _ClientId, _SourceTopic, Count) -> Rem = case erlang:get({shared_sub_round_robin, Group, Topic}) of - undefined -> rand:uniform(Count) - 1; - N -> (N + 1) rem Count + undefined -> + rand:uniform(Count) - 1; + N -> + (N + 1) rem Count end, _ = erlang:put({shared_sub_round_robin, Group, Topic}, Rem), Rem + 1. @@ -316,11 +331,16 @@ init_monitors() -> ?TAB ). +handle_call({cleanup, SubPid}, _From, State) -> + cleanup_down(SubPid), + {reply, ok, State}; handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) -> mria:dirty_write(?TAB, record(Group, Topic, SubPid)), case ets:member(?SHARED_SUBS, {Group, Topic}) of - true -> ok; - false -> ok = emqx_router:do_add_route(Topic, {Group, node()}) + true -> + ok; + false -> + ok = emqx_router:do_add_route(Topic, {Group, node()}) end, ok = maybe_insert_alive_tab(SubPid), true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}), @@ -348,11 +368,17 @@ handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = P % handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) -> % #emqx_shared_subscription{subpid = SubPid} = OldRecord, % {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})}; - handle_info({mnesia_table_event, _Event}, State) -> {noreply, State}; handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #state{pmon = PMon}) -> - ?SLOG(info, #{msg => "shared_subscriber_down", sub_pid => SubPid, reason => Reason}), + ?SLOG( + info, + #{ + msg => "shared_subscriber_down", + sub_pid => SubPid, + reason => Reason + } + ), cleanup_down(SubPid), {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})}; handle_info(_Info, State) -> @@ -369,7 +395,8 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %% keep track of alive remote pids -maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> ok; +maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> + ok; maybe_insert_alive_tab(Pid) when is_pid(Pid) -> ets:insert(?ALIVE_SUBS, {Pid}), ok. @@ -405,6 +432,8 @@ is_alive_sub(Pid) -> delete_route_if_needed({Group, Topic}) -> case ets:member(?SHARED_SUBS, {Group, Topic}) of - true -> ok; - false -> ok = emqx_router:do_delete_route(Topic, {Group, node()}) + true -> + ok; + false -> + ok = emqx_router:do_delete_route(Topic, {Group, node()}) end. diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 6b171206a..82e3f76d6 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -49,7 +49,7 @@ t_is_ack_required(_) -> t_maybe_nack_dropped(_) -> ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(Msg)), ?assertEqual( ok, @@ -60,7 +60,7 @@ t_maybe_nack_dropped(_) -> ). t_nack_no_connection(_) -> - Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)), ?assertEqual( ok, @@ -72,7 +72,7 @@ t_nack_no_connection(_) -> t_maybe_ack(_) -> ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, ?assertEqual( #message{headers = #{shared_dispatch_ack => ?no_ack}}, emqx_shared_sub:maybe_ack(Msg) @@ -321,11 +321,15 @@ test_two_messages(Strategy, WithAck) -> ok. last_message(ExpectedPayload, Pids) -> + last_message(ExpectedPayload, Pids, 100). + +last_message(ExpectedPayload, Pids, Timeout) -> receive {publish, #{client_pid := Pid, payload := ExpectedPayload}} -> ct:pal("~p ====== ~p", [Pids, Pid]), {true, Pid} - after 100 -> + after Timeout -> + ct:pal("not yet"), <<"not yet?">> end.