Set topic alias on v5 only

This commit is contained in:
JianBo He 2020-03-27 16:22:13 +08:00 committed by turtleDeng
parent 0ff1e9eadb
commit edb42b1b0f
1 changed files with 12 additions and 14 deletions

View File

@ -94,6 +94,8 @@
-type(replies() :: emqx_types:packet() | reply() | [reply()]). -type(replies() :: emqx_types:packet() | reply() | [reply()]).
-define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}).
-define(TIMER_TABLE, #{ -define(TIMER_TABLE, #{
alive_timer => keepalive, alive_timer => keepalive,
retry_timer => retry_delivery, retry_timer => retry_delivery,
@ -617,16 +619,14 @@ handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
handle_out(pubcomp, {PacketId, ReasonCode}, Channel) -> handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
{ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), Channel}; {ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), Channel};
handle_out(suback, {PacketId, ReasonCodes}, handle_out(suback, {PacketId, ReasonCodes}, Channel = ?IS_MQTT_V5) ->
Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
return_suback(?SUBACK_PACKET(PacketId, ReasonCodes), Channel); return_suback(?SUBACK_PACKET(PacketId, ReasonCodes), Channel);
handle_out(suback, {PacketId, ReasonCodes}, Channel) -> handle_out(suback, {PacketId, ReasonCodes}, Channel) ->
ReasonCodes1 = [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes], ReasonCodes1 = [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes],
return_suback(?SUBACK_PACKET(PacketId, ReasonCodes1), Channel); return_suback(?SUBACK_PACKET(PacketId, ReasonCodes1), Channel);
handle_out(unsuback, {PacketId, ReasonCodes}, handle_out(unsuback, {PacketId, ReasonCodes}, Channel = ?IS_MQTT_V5) ->
Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
return_unsuback(?UNSUBACK_PACKET(PacketId, ReasonCodes), Channel); return_unsuback(?UNSUBACK_PACKET(PacketId, ReasonCodes), Channel);
handle_out(unsuback, {PacketId, _ReasonCodes}, Channel) -> handle_out(unsuback, {PacketId, _ReasonCodes}, Channel) ->
@ -636,8 +636,7 @@ handle_out(disconnect, ReasonCode, Channel) when is_integer(ReasonCode) ->
ReasonName = disconnect_reason(ReasonCode), ReasonName = disconnect_reason(ReasonCode),
handle_out(disconnect, {ReasonCode, ReasonName}, Channel); handle_out(disconnect, {ReasonCode, ReasonName}, Channel);
handle_out(disconnect, {ReasonCode, ReasonName}, Channel = handle_out(disconnect, {ReasonCode, ReasonName}, Channel = ?IS_MQTT_V5) ->
#channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
Packet = ?DISCONNECT_PACKET(ReasonCode), Packet = ?DISCONNECT_PACKET(ReasonCode),
{ok, [{outgoing, Packet}, {close, ReasonName}], Channel}; {ok, [{outgoing, Packet}, {close, ReasonName}], Channel};
@ -1078,7 +1077,7 @@ process_alias(Packet = #mqtt_packet{
properties = #{'Topic-Alias' := AliasId} properties = #{'Topic-Alias' := AliasId}
} = Publish } = Publish
}, },
Channel = #channel{topic_aliases = TopicAliases}) -> Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases}) ->
case find_alias(inbound, AliasId, TopicAliases) of case find_alias(inbound, AliasId, TopicAliases) of
{ok, Topic} -> {ok, Topic} ->
NPublish = Publish#mqtt_packet_publish{topic_name = Topic}, NPublish = Publish#mqtt_packet_publish{topic_name = Topic},
@ -1091,7 +1090,7 @@ process_alias(#mqtt_packet{
properties = #{'Topic-Alias' := AliasId} properties = #{'Topic-Alias' := AliasId}
} }
}, },
Channel = #channel{topic_aliases = TopicAliases}) -> Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases}) ->
NTopicAliases = save_alias(inbound, AliasId, Topic, TopicAliases), NTopicAliases = save_alias(inbound, AliasId, Topic, TopicAliases),
{ok, Channel#channel{topic_aliases = NTopicAliases}}; {ok, Channel#channel{topic_aliases = NTopicAliases}};
@ -1103,7 +1102,7 @@ process_alias(_Packet, Channel) -> {ok, Channel}.
packing_alias(Packet = #mqtt_packet{ packing_alias(Packet = #mqtt_packet{
variable = #mqtt_packet_publish{topic_name = Topic} = Publish variable = #mqtt_packet_publish{topic_name = Topic} = Publish
}, },
Channel = #channel{topic_aliases = TopicAliases, alias_maximum = Limits}) -> Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases, alias_maximum = Limits}) ->
case find_alias(outbound, Topic, TopicAliases) of case find_alias(outbound, Topic, TopicAliases) of
{ok, AliasId} -> {ok, AliasId} ->
NPublish = Publish#mqtt_packet_publish{ NPublish = Publish#mqtt_packet_publish{
@ -1202,7 +1201,7 @@ enrich_subid(_Properties, TopicFilters) -> TopicFilters.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Enrich SubOpts %% Enrich SubOpts
enrich_subopts(SubOpts, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) -> enrich_subopts(SubOpts, _Channel = ?IS_MQTT_V5) ->
SubOpts; SubOpts;
enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge}}) -> enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge}}) ->
NL = flag(emqx_zone:ignore_loop_deliver(Zone)), NL = flag(emqx_zone:ignore_loop_deliver(Zone)),
@ -1211,8 +1210,7 @@ enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBri
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Enrich ConnAck Caps %% Enrich ConnAck Caps
enrich_connack_caps(AckProps, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}, enrich_connack_caps(AckProps, ?IS_MQTT_V5 = #channel{clientinfo = #{zone := Zone}}) ->
clientinfo = #{zone := Zone}}) ->
#{max_packet_size := MaxPktSize, #{max_packet_size := MaxPktSize,
max_qos_allowed := MaxQoS, max_qos_allowed := MaxQoS,
retain_available := Retain, retain_available := Retain,
@ -1420,8 +1418,8 @@ shutdown(success, Reply, Packet, Channel) ->
shutdown(Reason, Reply, Packet, Channel) -> shutdown(Reason, Reply, Packet, Channel) ->
{shutdown, Reason, Reply, Packet, Channel}. {shutdown, Reason, Reply, Packet, Channel}.
disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = connected, disconnect_and_shutdown(Reason, Reply, Channel = ?IS_MQTT_V5
conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) -> = #channel{conn_state = connected}) ->
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), Channel); shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), Channel);
disconnect_and_shutdown(Reason, Reply, Channel) -> disconnect_and_shutdown(Reason, Reply, Channel) ->