refactor: move ?REDISPATCH_TO macro to emqx_mqtt.hrl
This commit is contained in:
parent
3b5cc912e7
commit
d563121284
|
@ -675,6 +675,8 @@ end).
|
||||||
-define(QUEUE, "$queue").
|
-define(QUEUE, "$queue").
|
||||||
-define(SHARE(Group, Topic), emqx_topic:join([<<?SHARE>>, Group, Topic])).
|
-define(SHARE(Group, Topic), emqx_topic:join([<<?SHARE>>, Group, Topic])).
|
||||||
|
|
||||||
|
-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
|
||||||
|
|
||||||
-define(SHARE_EMPTY_FILTER, share_subscription_topic_cannot_be_empty).
|
-define(SHARE_EMPTY_FILTER, share_subscription_topic_cannot_be_empty).
|
||||||
-define(SHARE_EMPTY_GROUP, share_subscription_group_name_cannot_be_empty).
|
-define(SHARE_EMPTY_GROUP, share_subscription_group_name_cannot_be_empty).
|
||||||
-define(SHARE_RECURSIVELY, '$share_cannot_be_used_as_real_topic_filter').
|
-define(SHARE_RECURSIVELY, '$share_cannot_be_used_as_real_topic_filter').
|
||||||
|
|
|
@ -412,7 +412,7 @@ enrich_delivers(ClientInfo, [D | Rest], UpgradeQoS, Session) ->
|
||||||
enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) ->
|
enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) ->
|
||||||
SubOpts =
|
SubOpts =
|
||||||
case Msg of
|
case Msg of
|
||||||
#message{headers = #{redispatch_to := {Group, T}}} ->
|
#message{headers = #{redispatch_to := ?REDISPATCH_TO(Group, T)}} ->
|
||||||
?IMPL(Session):get_subscription(emqx_topic:make_shared_record(Group, T), Session);
|
?IMPL(Session):get_subscription(emqx_topic:make_shared_record(Group, T), Session);
|
||||||
_ ->
|
_ ->
|
||||||
?IMPL(Session):get_subscription(Topic, Session)
|
?IMPL(Session):get_subscription(Topic, Session)
|
||||||
|
|
|
@ -95,7 +95,6 @@
|
||||||
-define(ACK, shared_sub_ack).
|
-define(ACK, shared_sub_ack).
|
||||||
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
||||||
-define(NO_ACK, no_ack).
|
-define(NO_ACK, no_ack).
|
||||||
-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
|
|
||||||
-define(SUBSCRIBER_DOWN, noproc).
|
-define(SUBSCRIBER_DOWN, noproc).
|
||||||
|
|
||||||
-type redispatch_to() :: ?REDISPATCH_TO(emqx_types:group(), emqx_types:topic()).
|
-type redispatch_to() :: ?REDISPATCH_TO(emqx_types:group(), emqx_types:topic()).
|
||||||
|
|
|
@ -309,7 +309,7 @@ t_shared_subscribe(Config) when is_list(Config) ->
|
||||||
?assert(
|
?assert(
|
||||||
receive
|
receive
|
||||||
{deliver, <<"topic">>, #message{
|
{deliver, <<"topic">>, #message{
|
||||||
headers = #{redispatch_to := {<<"group">>, <<"topic">>}},
|
headers = #{redispatch_to := ?REDISPATCH_TO(<<"group">>, <<"topic">>)},
|
||||||
payload = <<"hello">>
|
payload = <<"hello">>
|
||||||
}} ->
|
}} ->
|
||||||
true;
|
true;
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
|
|
||||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
@ -42,7 +43,7 @@ t_printable_maps(_) ->
|
||||||
peerhost => {127, 0, 0, 1},
|
peerhost => {127, 0, 0, 1},
|
||||||
peername => {{127, 0, 0, 1}, 9980},
|
peername => {{127, 0, 0, 1}, 9980},
|
||||||
sockname => {{127, 0, 0, 1}, 1883},
|
sockname => {{127, 0, 0, 1}, 1883},
|
||||||
redispatch_to => {<<"group">>, <<"sub/topic/+">>},
|
redispatch_to => ?REDISPATCH_TO(<<"group">>, <<"sub/topic/+">>),
|
||||||
shared_dispatch_ack => {self(), ref}
|
shared_dispatch_ack => {self(), ref}
|
||||||
},
|
},
|
||||||
Converted = emqx_rule_events:printable_maps(Headers),
|
Converted = emqx_rule_events:printable_maps(Headers),
|
||||||
|
|
Loading…
Reference in New Issue