diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index c31efb0a6..f5157aaf3 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -412,8 +412,8 @@ enrich_delivers(ClientInfo, [D | Rest], UpgradeQoS, Session) -> enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) -> SubOpts = case Msg of - #message{headers = #{shared_record := SharedRecord}} -> - ?IMPL(Session):get_subscription(SharedRecord, Session); + #message{headers = #{redispatch_to := {Group, T}}} -> + ?IMPL(Session):get_subscription(emqx_topic:make_shared_record(Group, T), Session); _ -> ?IMPL(Session):get_subscription(Topic, Session) end, diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index c5ee9e7ab..691ab7497 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -141,15 +141,14 @@ record(Group, Topic, SubPid) -> dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery, _FailedSubs = #{}). -dispatch(Group, Topic, Delivery = #delivery{message = Msg0}, FailedSubs) -> - #message{from = ClientId, topic = SourceTopic} = Msg0, - Msg1 = with_shared_record(Msg0, Group, Topic), +dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> + #message{from = ClientId, topic = SourceTopic} = Msg, case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of false -> {error, no_subscribers}; {Type, SubPid} -> - Msg2 = with_redispatch_to(Msg1, Group, Topic), - case do_dispatch(SubPid, Group, Topic, Msg2, Type) of + Msg1 = with_redispatch_to(Msg, Group, Topic), + case do_dispatch(SubPid, Group, Topic, Msg1, Type) of ok -> {ok, 1}; {error, Reason} -> @@ -235,22 +234,16 @@ without_group_ack(Msg) -> get_group_ack(Msg) -> emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). -with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) -> - Msg; +%% always add `redispatch_to` header to the message +%% for QOS_0 msgs, redispatch_to is not needed and filtered out in is_redispatch_needed/1 with_redispatch_to(Msg, Group, Topic) -> emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg). -with_shared_record(Msg, Group, Topic) -> - emqx_message:set_headers(#{shared_record => emqx_topic:make_shared_record(Group, Topic)}, Msg). - -%% @hidden Redispatch is needed only for the messages with redispatch_to header added. -is_redispatch_needed(#message{} = Msg) -> - case get_redispatch_to(Msg) of - ?REDISPATCH_TO(_, _) -> - true; - _ -> - false - end. +%% @hidden Redispatch is needed only for the messages which not QOS_0 +is_redispatch_needed(#message{qos = ?QOS_0}) -> + false; +is_redispatch_needed(#message{headers = #{redispatch_to := ?REDISPATCH_TO(_, _)}}) -> + true. %% @doc Redispatch shared deliveries to other members in the group. redispatch(Messages0) -> diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index 2bc6bd8ea..18d9f9651 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -309,7 +309,7 @@ t_shared_subscribe(Config) when is_list(Config) -> ?assert( receive {deliver, <<"topic">>, #message{ - headers = #{shared_record := #share{group = <<"group">>, topic = <<"topic">>}}, + headers = #{redispatch_to := {<<"group">>, <<"topic">>}}, payload = <<"hello">> }} -> true;