From d563121284d63fd0ff79873223ec6308a09ee607 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 27 Oct 2023 17:54:14 +0800 Subject: [PATCH] refactor: move ?REDISPATCH_TO macro to emqx_mqtt.hrl --- apps/emqx/include/emqx_mqtt.hrl | 2 ++ apps/emqx/src/emqx_session.erl | 2 +- apps/emqx/src/emqx_shared_sub.erl | 1 - apps/emqx/test/emqx_broker_SUITE.erl | 2 +- apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl | 3 ++- 5 files changed, 6 insertions(+), 4 deletions(-) diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index 93c70a6e1..53fed0f9d 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -675,6 +675,8 @@ end). -define(QUEUE, "$queue"). -define(SHARE(Group, Topic), emqx_topic:join([<>, Group, Topic])). +-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}). + -define(SHARE_EMPTY_FILTER, share_subscription_topic_cannot_be_empty). -define(SHARE_EMPTY_GROUP, share_subscription_group_name_cannot_be_empty). -define(SHARE_RECURSIVELY, '$share_cannot_be_used_as_real_topic_filter'). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index f5157aaf3..147b0b35c 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -412,7 +412,7 @@ enrich_delivers(ClientInfo, [D | Rest], UpgradeQoS, Session) -> enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) -> SubOpts = case Msg of - #message{headers = #{redispatch_to := {Group, T}}} -> + #message{headers = #{redispatch_to := ?REDISPATCH_TO(Group, T)}} -> ?IMPL(Session):get_subscription(emqx_topic:make_shared_record(Group, T), Session); _ -> ?IMPL(Session):get_subscription(Topic, Session) diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 691ab7497..89a785590 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -95,7 +95,6 @@ -define(ACK, shared_sub_ack). -define(NACK(Reason), {shared_sub_nack, Reason}). -define(NO_ACK, no_ack). --define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}). -define(SUBSCRIBER_DOWN, noproc). -type redispatch_to() :: ?REDISPATCH_TO(emqx_types:group(), emqx_types:topic()). diff --git a/apps/emqx/test/emqx_broker_SUITE.erl b/apps/emqx/test/emqx_broker_SUITE.erl index 18d9f9651..b416f1730 100644 --- a/apps/emqx/test/emqx_broker_SUITE.erl +++ b/apps/emqx/test/emqx_broker_SUITE.erl @@ -309,7 +309,7 @@ t_shared_subscribe(Config) when is_list(Config) -> ?assert( receive {deliver, <<"topic">>, #message{ - headers = #{redispatch_to := {<<"group">>, <<"topic">>}}, + headers = #{redispatch_to := ?REDISPATCH_TO(<<"group">>, <<"topic">>)}, payload = <<"hello">> }} -> true; diff --git a/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl index f8fe49ca8..fee112d9a 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). @@ -42,7 +43,7 @@ t_printable_maps(_) -> peerhost => {127, 0, 0, 1}, peername => {{127, 0, 0, 1}, 9980}, sockname => {{127, 0, 0, 1}, 1883}, - redispatch_to => {<<"group">>, <<"sub/topic/+">>}, + redispatch_to => ?REDISPATCH_TO(<<"group">>, <<"sub/topic/+">>), shared_dispatch_ack => {self(), ref} }, Converted = emqx_rule_events:printable_maps(Headers),