fix: find SubOpts by shared_record, not deliver topic
This commit is contained in:
parent
9732e31395
commit
802a36c670
|
@ -409,18 +409,14 @@ enrich_delivers(ClientInfo, [D | Rest], UpgradeQoS, Session) ->
|
||||||
[Msg | enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session)]
|
[Msg | enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session)]
|
||||||
end.
|
end.
|
||||||
|
|
||||||
enrich_deliver(
|
|
||||||
ClientInfo,
|
|
||||||
{deliver, Topic, Msg = #message{headers = #{redispatch_to := {Group, Topic}}}},
|
|
||||||
UpgradeQoS,
|
|
||||||
Session
|
|
||||||
) ->
|
|
||||||
%% Only QoS_1 and QoS_2 messages added `redispatch_to` header
|
|
||||||
%% For QoS 0 message, send it as regular dispatch
|
|
||||||
Deliver = {deliver, emqx_topic:make_shared_record(Group, Topic), Msg},
|
|
||||||
enrich_deliver(ClientInfo, Deliver, UpgradeQoS, Session);
|
|
||||||
enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) ->
|
enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) ->
|
||||||
SubOpts = ?IMPL(Session):get_subscription(Topic, Session),
|
SubOpts =
|
||||||
|
case Msg of
|
||||||
|
#message{headers = #{shared_record := SharedRecord}} ->
|
||||||
|
?IMPL(Session):get_subscription(SharedRecord, Session);
|
||||||
|
_ ->
|
||||||
|
?IMPL(Session):get_subscription(Topic, Session)
|
||||||
|
end,
|
||||||
enrich_message(ClientInfo, Msg, SubOpts, UpgradeQoS).
|
enrich_message(ClientInfo, Msg, SubOpts, UpgradeQoS).
|
||||||
|
|
||||||
enrich_message(
|
enrich_message(
|
||||||
|
|
|
@ -141,14 +141,15 @@ record(Group, Topic, SubPid) ->
|
||||||
dispatch(Group, Topic, Delivery) ->
|
dispatch(Group, Topic, Delivery) ->
|
||||||
dispatch(Group, Topic, Delivery, _FailedSubs = #{}).
|
dispatch(Group, Topic, Delivery, _FailedSubs = #{}).
|
||||||
|
|
||||||
dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
|
dispatch(Group, Topic, Delivery = #delivery{message = Msg0}, FailedSubs) ->
|
||||||
#message{from = ClientId, topic = SourceTopic} = Msg,
|
#message{from = ClientId, topic = SourceTopic} = Msg0,
|
||||||
|
Msg1 = with_shared_record(Msg0, Group, Topic),
|
||||||
case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of
|
case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of
|
||||||
false ->
|
false ->
|
||||||
{error, no_subscribers};
|
{error, no_subscribers};
|
||||||
{Type, SubPid} ->
|
{Type, SubPid} ->
|
||||||
Msg1 = with_redispatch_to(Msg, Group, Topic),
|
Msg2 = with_redispatch_to(Msg1, Group, Topic),
|
||||||
case do_dispatch(SubPid, Group, Topic, Msg1, Type) of
|
case do_dispatch(SubPid, Group, Topic, Msg2, Type) of
|
||||||
ok ->
|
ok ->
|
||||||
{ok, 1};
|
{ok, 1};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -239,6 +240,9 @@ with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) ->
|
||||||
with_redispatch_to(Msg, Group, Topic) ->
|
with_redispatch_to(Msg, Group, Topic) ->
|
||||||
emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg).
|
emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg).
|
||||||
|
|
||||||
|
with_shared_record(Msg, Group, Topic) ->
|
||||||
|
emqx_message:set_headers(#{shared_record => emqx_topic:make_shared_record(Group, Topic)}, Msg).
|
||||||
|
|
||||||
%% @hidden Redispatch is needed only for the messages with redispatch_to header added.
|
%% @hidden Redispatch is needed only for the messages with redispatch_to header added.
|
||||||
is_redispatch_needed(#message{} = Msg) ->
|
is_redispatch_needed(#message{} = Msg) ->
|
||||||
case get_redispatch_to(Msg) of
|
case get_redispatch_to(Msg) of
|
||||||
|
|
|
@ -308,7 +308,10 @@ t_shared_subscribe(Config) when is_list(Config) ->
|
||||||
emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
|
emqx_broker:safe_publish(emqx_message:make(ct, <<"topic">>, <<"hello">>)),
|
||||||
?assert(
|
?assert(
|
||||||
receive
|
receive
|
||||||
{deliver, <<"topic">>, #message{payload = <<"hello">>}} ->
|
{deliver, <<"topic">>, #message{
|
||||||
|
headers = #{shared_record := #share{group = <<"group">>, topic = <<"topic">>}},
|
||||||
|
payload = <<"hello">>
|
||||||
|
}} ->
|
||||||
true;
|
true;
|
||||||
Msg ->
|
Msg ->
|
||||||
ct:pal("Msg: ~p", [Msg]),
|
ct:pal("Msg: ~p", [Msg]),
|
||||||
|
|
Loading…
Reference in New Issue