fix: add `redispatch_to` header to all msgs when deliver shared-sub

- to find correct SubOpts for shared-sub dispatch
- use previous key `redispatch_to` to ensure rolling upgrade compatibility
This commit is contained in:
JimMoen 2023-10-27 16:15:23 +08:00
parent 53383991d9
commit 3b5cc912e7
No known key found for this signature in database
GPG Key ID: 87A520B4F76BA86D
3 changed files with 14 additions and 21 deletions

View File

@ -412,8 +412,8 @@ enrich_delivers(ClientInfo, [D | Rest], UpgradeQoS, Session) ->
enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) -> enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) ->
SubOpts = SubOpts =
case Msg of case Msg of
#message{headers = #{shared_record := SharedRecord}} -> #message{headers = #{redispatch_to := {Group, T}}} ->
?IMPL(Session):get_subscription(SharedRecord, Session); ?IMPL(Session):get_subscription(emqx_topic:make_shared_record(Group, T), Session);
_ -> _ ->
?IMPL(Session):get_subscription(Topic, Session) ?IMPL(Session):get_subscription(Topic, Session)
end, end,

View File

@ -141,15 +141,14 @@ record(Group, Topic, SubPid) ->
dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery) ->
dispatch(Group, Topic, Delivery, _FailedSubs = #{}). dispatch(Group, Topic, Delivery, _FailedSubs = #{}).
dispatch(Group, Topic, Delivery = #delivery{message = Msg0}, FailedSubs) -> dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
#message{from = ClientId, topic = SourceTopic} = Msg0, #message{from = ClientId, topic = SourceTopic} = Msg,
Msg1 = with_shared_record(Msg0, Group, Topic),
case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of
false -> false ->
{error, no_subscribers}; {error, no_subscribers};
{Type, SubPid} -> {Type, SubPid} ->
Msg2 = with_redispatch_to(Msg1, Group, Topic), Msg1 = with_redispatch_to(Msg, Group, Topic),
case do_dispatch(SubPid, Group, Topic, Msg2, Type) of case do_dispatch(SubPid, Group, Topic, Msg1, Type) of
ok -> ok ->
{ok, 1}; {ok, 1};
{error, Reason} -> {error, Reason} ->
@ -235,22 +234,16 @@ without_group_ack(Msg) ->
get_group_ack(Msg) -> get_group_ack(Msg) ->
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) -> %% always add `redispatch_to` header to the message
Msg; %% for QOS_0 msgs, redispatch_to is not needed and filtered out in is_redispatch_needed/1
with_redispatch_to(Msg, Group, Topic) -> with_redispatch_to(Msg, Group, Topic) ->
emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg). emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg).
with_shared_record(Msg, Group, Topic) -> %% @hidden Redispatch is needed only for the messages which not QOS_0
emqx_message:set_headers(#{shared_record => emqx_topic:make_shared_record(Group, Topic)}, Msg). is_redispatch_needed(#message{qos = ?QOS_0}) ->
false;
%% @hidden Redispatch is needed only for the messages with redispatch_to header added. is_redispatch_needed(#message{headers = #{redispatch_to := ?REDISPATCH_TO(_, _)}}) ->
is_redispatch_needed(#message{} = Msg) -> true.
case get_redispatch_to(Msg) of
?REDISPATCH_TO(_, _) ->
true;
_ ->
false
end.
%% @doc Redispatch shared deliveries to other members in the group. %% @doc Redispatch shared deliveries to other members in the group.
redispatch(Messages0) -> redispatch(Messages0) ->

View File

@ -309,7 +309,7 @@ t_shared_subscribe(Config) when is_list(Config) ->
?assert( ?assert(
receive receive
{deliver, <<"topic">>, #message{ {deliver, <<"topic">>, #message{
headers = #{shared_record := #share{group = <<"group">>, topic = <<"topic">>}}, headers = #{redispatch_to := {<<"group">>, <<"topic">>}},
payload = <<"hello">> payload = <<"hello">>
}} -> }} ->
true; true;