diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index e79c30f4a..c31efb0a6 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -409,18 +409,14 @@ enrich_delivers(ClientInfo, [D | Rest], UpgradeQoS, Session) -> [Msg | enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session)] end. -enrich_deliver( - ClientInfo, - {deliver, Topic, Msg = #message{headers = #{redispatch_to := {Group, Topic}}}}, - UpgradeQoS, - Session -) -> - %% Only QoS_1 and QoS_2 messages added `redispatch_to` header - %% For QoS 0 message, send it as regular dispatch - Deliver = {deliver, emqx_topic:make_shared_record(Group, Topic), Msg}, - enrich_deliver(ClientInfo, Deliver, UpgradeQoS, Session); enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) -> - SubOpts = ?IMPL(Session):get_subscription(Topic, Session), + SubOpts = + case Msg of + #message{headers = #{shared_record := SharedRecord}} -> + ?IMPL(Session):get_subscription(SharedRecord, Session); + _ -> + ?IMPL(Session):get_subscription(Topic, Session) + end, enrich_message(ClientInfo, Msg, SubOpts, UpgradeQoS). enrich_message( diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 84921be6b..c5ee9e7ab 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -141,14 +141,15 @@ record(Group, Topic, SubPid) -> dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery, _FailedSubs = #{}). -dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> - #message{from = ClientId, topic = SourceTopic} = Msg, +dispatch(Group, Topic, Delivery = #delivery{message = Msg0}, FailedSubs) -> + #message{from = ClientId, topic = SourceTopic} = Msg0, + Msg1 = with_shared_record(Msg0, Group, Topic), case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of false -> {error, no_subscribers}; {Type, SubPid} -> - Msg1 = with_redispatch_to(Msg, Group, Topic), - case do_dispatch(SubPid, Group, Topic, Msg1, Type) of + Msg2 = with_redispatch_to(Msg1, Group, Topic), + case do_dispatch(SubPid, Group, Topic, Msg2, Type) of ok -> {ok, 1}; {error, Reason} -> @@ -239,6 +240,9 @@ with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) -> with_redispatch_to(Msg, Group, Topic) -> emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg). +with_shared_record(Msg, Group, Topic) -> + emqx_message:set_headers(#{shared_record => emqx_topic:make_shared_record(Group, Topic)}, Msg). + %% @hidden Redispatch is needed only for the messages with redispatch_to header added. is_redispatch_needed(#message{} = Msg) -> case get_redispatch_to(Msg) of diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index da108ceef..2bc6bd8ea 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -308,7 +308,10 @@ t_shared_subscribe(Config) when is_list(Config) -> emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)), ?assert( receive - {deliver, <<"topic">>, #message{payload = <<"hello">>}} -> + {deliver, <<"topic">>, #message{ + headers = #{shared_record := #share{group = <<"group">>, topic = <<"topic">>}}, + payload = <<"hello">> + }} -> true; Msg -> ct:pal("Msg: ~p", [Msg]),