diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 085565403..a4bbb378c 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -23,6 +23,7 @@ -export([set_headers/2]). -export([get_header/2, get_header/3, set_header/3]). -export([is_expired/1, update_expiry/1]). +-export([remove_topic_alias/1]). -export([format/1]). -type(flag() :: atom()). @@ -109,6 +110,9 @@ update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, update_expiry(Msg) -> Msg. +remove_topic_alias(Msg = #message{headers = Headers}) -> + Msg#message{headers = maps:remove('Topic-Alias', Headers)}. + %% MilliSeconds elapsed(Since) -> max(0, timer:now_diff(os:timestamp(), Since) div 1000). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 5495e4ff7..7c182c755 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -617,12 +617,12 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); -deliver({publish, PacketId, Msg = #message{headers = Headers}}, PState = #pstate{mountpoint = MountPoint}) -> +deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) -> _ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg), Msg1 = emqx_message:update_expiry(Msg), Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1), - send(emqx_packet:from_message(PacketId, Msg2#message{headers = maps:remove('Topic-Alias', Headers)}), PState); - + send(emqx_packet:from_message(PacketId, emqx_message:remove_topic_alias(Msg2)), PState); + deliver({puback, PacketId, ReasonCode}, PState) -> send(?PUBACK_PACKET(PacketId, ReasonCode), PState);