diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 94483bb87..73eacea8b 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -638,26 +638,21 @@ run_terminate_hooks(ClientInfo, takeovered, Session) -> run_terminate_hooks(ClientInfo, Reason, Session) -> run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). -redispatch_shared_messages(#session{inflight = Inflight}) -> - InflightList = emqx_inflight:to_list(Inflight), - lists:foreach(fun - %% Only QoS1 messages get redispatched, because QoS2 messages - %% must be sent to the same client, once they're in flight - ({_, {#message{qos = ?QOS_2} = Msg, _}}) -> - ?LOG(warning, "Not redispatching qos2 msg: ~s", [emqx_message:format(Msg)]); - ({_, {#message{topic = Topic, qos = ?QOS_1} = Msg, _}}) -> - case emqx_shared_sub:get_group(Msg) of - {ok, Group} -> - %% Note that dispatch is called with self() in failed subs - %% This is done to avoid dispatching back to caller - Delivery = #delivery{sender = self(), message = Msg}, - emqx_shared_sub:dispatch_to_non_self(Group, Topic, Delivery); - _ -> - false - end; - (_) -> - ok - end, InflightList). +redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) -> + InflightList = lists:map(fun({_, {Msg, _Ts}}) -> Msg end, + emqx_inflight:to_list(sort_fun(), Inflight)), + MqList = mqueue_to_list(Q, []), + emqx_shared_sub:redispatch(InflightList ++ MqList). + +%% convert mqueue to a list +%% the messages at the head of the list is to be dispatched before the tail +mqueue_to_list(Q, Acc) -> + case emqx_mqueue:out(Q) of + {empty, _Q} -> + lists:reverse(Acc); + {{value, Msg}, Q1} -> + mqueue_to_list(Q1, [Msg | Acc]) + end. -compile({inline, [run_hook/2]}). run_hook(Name, Args) -> diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index cc57e001f..4987248cf 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -39,7 +39,7 @@ ]). -export([ dispatch/3 - , dispatch_to_non_self/3 + , redispatch/1 ]). -export([ maybe_ack/1 @@ -47,7 +47,6 @@ , nack_no_connection/1 , is_ack_required/1 , is_retry_dispatch/1 - , get_group/1 ]). %% for testing @@ -84,6 +83,7 @@ -define(ACK, shared_sub_ack). -define(NACK(Reason), {shared_sub_nack, Reason}). -define(NO_ACK, no_ack). +-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}). -record(state, {pmon}). @@ -134,11 +134,12 @@ dispatch(Group, Topic, Delivery) -> Strategy = strategy(Group), dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}). -dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> - #message{from = ClientId, topic = SourceTopic} = Msg, +dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg0}, FailedSubs) -> + #message{from = ClientId, topic = SourceTopic} = Msg0, case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of false -> {error, no_subscribers}; {Type, SubPid} -> + Msg = with_redispatch_to(Msg0, Group, Topic), case do_dispatch(SubPid, Group, Topic, Msg, Type) of ok -> {ok, 1}; {error, Reason} -> @@ -162,7 +163,7 @@ ack_enabled() -> emqx:get_env(shared_dispatch_ack_enabled, false). do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() -> - %% Deadlock otherwise + %% dispatch without ack, deadlock otherwise send(SubPid, Topic, {deliver, Topic, Msg}); %% return either 'ok' (when everything is fine) or 'error' do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> @@ -176,6 +177,10 @@ do_dispatch(SubPid, Group, Topic, Msg, Type) -> send(SubPid, Topic, {deliver, Topic, Msg}) end. +with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) -> Msg; +with_redispatch_to(Msg, Group, Topic) -> + emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg). + dispatch_with_ack(SubPid, Group, Topic, Msg, Type) -> %% For QoS 1/2 message, expect an ack Ref = erlang:monitor(process, SubPid), @@ -228,13 +233,22 @@ without_group_ack(Msg) -> get_group_ack(Msg) -> emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). --spec(get_group(emqx_types:message()) -> {ok, any()} | error). -get_group(Msg) -> - case get_group_ack(Msg) of - {_Sender, {_Type, Group, _Ref}} -> {ok, Group}; - _ -> error +%% @hidden Redispatch is neede only for the messages with redispatch_to header added. +is_redispatch_needed(Msg) -> + case get_redispatch_to(Msg) of + ?REDISPATCH_TO(_, _) -> + true; + _ -> + false end. +%% @hidden Return the `redispatch_to` group-topic in the message header. +%% `false` is returned if the message is not a shared dispatch. +%% or when it's a QoS 0 message. +-spec(get_redispatch_to(emqx_types:message()) -> emqx_types:topic() | false). +get_redispatch_to(Msg) -> + emqx_message:get_header(redispatch_to, Msg, false). + -spec(is_ack_required(emqx_types:message()) -> boolean()). is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). @@ -245,6 +259,26 @@ is_retry_dispatch(Msg) -> _ -> false end. +%% @doc Redispatch shared deliveries to other members in the group. +redispatch(Messages0) -> + Messages = lists:filter(fun is_redispatch_needed/1, Messages0), + case length(Messages) of + L when L > 0 -> + ?LOG(info, "Redispatching ~p shared subscription messages", [L]), + lists:foreach(fun redispatch_shared_message/1, Messages); + _ -> + ok + end. + +redispatch_shared_message(Msg) -> + %% As long as it's still a #message{} record in inflight, + %% we should try to re-dispatch + ?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg), + %% Note that dispatch is called with self() in failed subs + %% This is done to avoid dispatching back to caller + Delivery = #delivery{sender = self(), message = Msg}, + dispatch_to_non_self(Group, Topic, Delivery). + %% @doc Negative ack dropped message due to inflight window or message queue being full. -spec(maybe_nack_dropped(emqx_types:message()) -> store | drop). maybe_nack_dropped(Msg) ->