chore: fix shared subscription redispatch
This commit is contained in:
parent
8893801910
commit
9989f4df7e
|
@ -638,26 +638,21 @@ run_terminate_hooks(ClientInfo, takeovered, Session) ->
|
||||||
run_terminate_hooks(ClientInfo, Reason, Session) ->
|
run_terminate_hooks(ClientInfo, Reason, Session) ->
|
||||||
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
|
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
|
||||||
|
|
||||||
redispatch_shared_messages(#session{inflight = Inflight}) ->
|
redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
|
||||||
InflightList = emqx_inflight:to_list(Inflight),
|
InflightList = lists:map(fun({_, {Msg, _Ts}}) -> Msg end,
|
||||||
lists:foreach(fun
|
emqx_inflight:to_list(sort_fun(), Inflight)),
|
||||||
%% Only QoS1 messages get redispatched, because QoS2 messages
|
MqList = mqueue_to_list(Q, []),
|
||||||
%% must be sent to the same client, once they're in flight
|
emqx_shared_sub:redispatch(InflightList ++ MqList).
|
||||||
({_, {#message{qos = ?QOS_2} = Msg, _}}) ->
|
|
||||||
?LOG(warning, "Not redispatching qos2 msg: ~s", [emqx_message:format(Msg)]);
|
%% convert mqueue to a list
|
||||||
({_, {#message{topic = Topic, qos = ?QOS_1} = Msg, _}}) ->
|
%% the messages at the head of the list is to be dispatched before the tail
|
||||||
case emqx_shared_sub:get_group(Msg) of
|
mqueue_to_list(Q, Acc) ->
|
||||||
{ok, Group} ->
|
case emqx_mqueue:out(Q) of
|
||||||
%% Note that dispatch is called with self() in failed subs
|
{empty, _Q} ->
|
||||||
%% This is done to avoid dispatching back to caller
|
lists:reverse(Acc);
|
||||||
Delivery = #delivery{sender = self(), message = Msg},
|
{{value, Msg}, Q1} ->
|
||||||
emqx_shared_sub:dispatch_to_non_self(Group, Topic, Delivery);
|
mqueue_to_list(Q1, [Msg | Acc])
|
||||||
_ ->
|
end.
|
||||||
false
|
|
||||||
end;
|
|
||||||
(_) ->
|
|
||||||
ok
|
|
||||||
end, InflightList).
|
|
||||||
|
|
||||||
-compile({inline, [run_hook/2]}).
|
-compile({inline, [run_hook/2]}).
|
||||||
run_hook(Name, Args) ->
|
run_hook(Name, Args) ->
|
||||||
|
|
|
@ -39,7 +39,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ dispatch/3
|
-export([ dispatch/3
|
||||||
, dispatch_to_non_self/3
|
, redispatch/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ maybe_ack/1
|
-export([ maybe_ack/1
|
||||||
|
@ -47,7 +47,6 @@
|
||||||
, nack_no_connection/1
|
, nack_no_connection/1
|
||||||
, is_ack_required/1
|
, is_ack_required/1
|
||||||
, is_retry_dispatch/1
|
, is_retry_dispatch/1
|
||||||
, get_group/1
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% for testing
|
%% for testing
|
||||||
|
@ -84,6 +83,7 @@
|
||||||
-define(ACK, shared_sub_ack).
|
-define(ACK, shared_sub_ack).
|
||||||
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
||||||
-define(NO_ACK, no_ack).
|
-define(NO_ACK, no_ack).
|
||||||
|
-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
|
||||||
|
|
||||||
-record(state, {pmon}).
|
-record(state, {pmon}).
|
||||||
|
|
||||||
|
@ -134,11 +134,12 @@ dispatch(Group, Topic, Delivery) ->
|
||||||
Strategy = strategy(Group),
|
Strategy = strategy(Group),
|
||||||
dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}).
|
dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}).
|
||||||
|
|
||||||
dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
|
dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg0}, FailedSubs) ->
|
||||||
#message{from = ClientId, topic = SourceTopic} = Msg,
|
#message{from = ClientId, topic = SourceTopic} = Msg0,
|
||||||
case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of
|
case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of
|
||||||
false -> {error, no_subscribers};
|
false -> {error, no_subscribers};
|
||||||
{Type, SubPid} ->
|
{Type, SubPid} ->
|
||||||
|
Msg = with_redispatch_to(Msg0, Group, Topic),
|
||||||
case do_dispatch(SubPid, Group, Topic, Msg, Type) of
|
case do_dispatch(SubPid, Group, Topic, Msg, Type) of
|
||||||
ok -> {ok, 1};
|
ok -> {ok, 1};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -162,7 +163,7 @@ ack_enabled() ->
|
||||||
emqx:get_env(shared_dispatch_ack_enabled, false).
|
emqx:get_env(shared_dispatch_ack_enabled, false).
|
||||||
|
|
||||||
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
|
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
|
||||||
%% Deadlock otherwise
|
%% dispatch without ack, deadlock otherwise
|
||||||
send(SubPid, Topic, {deliver, Topic, Msg});
|
send(SubPid, Topic, {deliver, Topic, Msg});
|
||||||
%% return either 'ok' (when everything is fine) or 'error'
|
%% return either 'ok' (when everything is fine) or 'error'
|
||||||
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
|
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
|
||||||
|
@ -176,6 +177,10 @@ do_dispatch(SubPid, Group, Topic, Msg, Type) ->
|
||||||
send(SubPid, Topic, {deliver, Topic, Msg})
|
send(SubPid, Topic, {deliver, Topic, Msg})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) -> Msg;
|
||||||
|
with_redispatch_to(Msg, Group, Topic) ->
|
||||||
|
emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg).
|
||||||
|
|
||||||
dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
|
dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
|
||||||
%% For QoS 1/2 message, expect an ack
|
%% For QoS 1/2 message, expect an ack
|
||||||
Ref = erlang:monitor(process, SubPid),
|
Ref = erlang:monitor(process, SubPid),
|
||||||
|
@ -228,13 +233,22 @@ without_group_ack(Msg) ->
|
||||||
get_group_ack(Msg) ->
|
get_group_ack(Msg) ->
|
||||||
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
|
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
|
||||||
|
|
||||||
-spec(get_group(emqx_types:message()) -> {ok, any()} | error).
|
%% @hidden Redispatch is neede only for the messages with redispatch_to header added.
|
||||||
get_group(Msg) ->
|
is_redispatch_needed(Msg) ->
|
||||||
case get_group_ack(Msg) of
|
case get_redispatch_to(Msg) of
|
||||||
{_Sender, {_Type, Group, _Ref}} -> {ok, Group};
|
?REDISPATCH_TO(_, _) ->
|
||||||
_ -> error
|
true;
|
||||||
|
_ ->
|
||||||
|
false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @hidden Return the `redispatch_to` group-topic in the message header.
|
||||||
|
%% `false` is returned if the message is not a shared dispatch.
|
||||||
|
%% or when it's a QoS 0 message.
|
||||||
|
-spec(get_redispatch_to(emqx_types:message()) -> emqx_types:topic() | false).
|
||||||
|
get_redispatch_to(Msg) ->
|
||||||
|
emqx_message:get_header(redispatch_to, Msg, false).
|
||||||
|
|
||||||
-spec(is_ack_required(emqx_types:message()) -> boolean()).
|
-spec(is_ack_required(emqx_types:message()) -> boolean()).
|
||||||
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
|
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
|
||||||
|
|
||||||
|
@ -245,6 +259,26 @@ is_retry_dispatch(Msg) ->
|
||||||
_ -> false
|
_ -> false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @doc Redispatch shared deliveries to other members in the group.
|
||||||
|
redispatch(Messages0) ->
|
||||||
|
Messages = lists:filter(fun is_redispatch_needed/1, Messages0),
|
||||||
|
case length(Messages) of
|
||||||
|
L when L > 0 ->
|
||||||
|
?LOG(info, "Redispatching ~p shared subscription messages", [L]),
|
||||||
|
lists:foreach(fun redispatch_shared_message/1, Messages);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
redispatch_shared_message(Msg) ->
|
||||||
|
%% As long as it's still a #message{} record in inflight,
|
||||||
|
%% we should try to re-dispatch
|
||||||
|
?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg),
|
||||||
|
%% Note that dispatch is called with self() in failed subs
|
||||||
|
%% This is done to avoid dispatching back to caller
|
||||||
|
Delivery = #delivery{sender = self(), message = Msg},
|
||||||
|
dispatch_to_non_self(Group, Topic, Delivery).
|
||||||
|
|
||||||
%% @doc Negative ack dropped message due to inflight window or message queue being full.
|
%% @doc Negative ack dropped message due to inflight window or message queue being full.
|
||||||
-spec(maybe_nack_dropped(emqx_types:message()) -> store | drop).
|
-spec(maybe_nack_dropped(emqx_types:message()) -> store | drop).
|
||||||
maybe_nack_dropped(Msg) ->
|
maybe_nack_dropped(Msg) ->
|
||||||
|
|
Loading…
Reference in New Issue