diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index fdc29fae8..b8b7a5b3a 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -43,7 +43,9 @@ {mqtt_wildcard_subscription, true}]). -define(PUBCAP_KEYS, [max_qos_allowed, - mqtt_retain_available]). + mqtt_retain_available, + mqtt_topic_alias + ]). -define(SUBCAP_KEYS, [max_qos_allowed, max_topic_levels, mqtt_shared_subscription, @@ -60,8 +62,15 @@ do_check_pub(Props = #{qos := QoS}, [{max_qos_allowed, MaxQoS}|Caps]) -> true -> {error, ?RC_QOS_NOT_SUPPORTED}; false -> do_check_pub(Props, Caps) end; +do_check_pub(Props = #{ topic_alias := TopicAlias}, [{max_topic_alias, MaxTopicAlias}| Caps]) -> + case TopicAlias =< MaxTopicAlias andalso TopicAlias > 0 of + false -> {error, ?RC_TOPIC_ALIAS_INVALID}; + true -> do_check_pub(Props, Caps) + end; do_check_pub(#{retain := true}, [{mqtt_retain_available, false}|_Caps]) -> {error, ?RC_RETAIN_NOT_SUPPORTED}; +do_check_pub(Props, [{max_topic_alias, _} | Caps]) -> + do_check_pub(Props, Caps); do_check_pub(Props, [{mqtt_retain_available, _}|Caps]) -> do_check_pub(Props, Caps). @@ -136,4 +145,3 @@ with_env(Zone, Key, InitFun) -> Caps; ZoneCaps -> ZoneCaps end. - diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 715526964..fc90cf492 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -72,9 +72,11 @@ validate_packet_id(_) -> validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I}) when I =< 0; I >= 16#FFFFFFF -> error(subscription_identifier_invalid); -validate_properties(?PUBLISH, # {'Topic-Alias':= I}) +validate_properties(?PUBLISH, #{'Topic-Alias':= I}) when I =:= 0 -> error(topic_alias_invalid); +validate_properties(?PUBLISH, #{'Subscription-Identifier' := _I}) -> + error(protocol_error); validate_properties(_, _) -> true. @@ -236,4 +238,3 @@ format_password(_Password) -> '******'. i(true) -> 1; i(false) -> 0; i(I) when is_integer(I) -> I. - diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 39929513e..7e95a4d90 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -33,36 +33,36 @@ -export([shutdown/2]). -record(pstate, { - zone, - sendfun, - peername, - peercert, - proto_ver, - proto_name, - ackprops, - client_id, - is_assigned, - conn_pid, - conn_props, - ack_props, - username, - session, - clean_start, - topic_aliases, - packet_size, - will_topic, - will_msg, - keepalive, - mountpoint, - is_super, - is_bridge, - enable_ban, - enable_acl, - recv_stats, - send_stats, - connected, - connected_at - }). + zone, + sendfun, + peername, + peercert, + proto_ver, + proto_name, + ackprops, + client_id, + is_assigned, + conn_pid, + conn_props, + ack_props, + username, + session, + clean_start, + topic_aliases, + packet_size, + will_topic, + will_msg, + keepalive, + mountpoint, + is_super, + is_bridge, + enable_ban, + enable_acl, + recv_stats, + send_stats, + connected, + connected_at + }). -type(state() :: #pstate{}). -export_type([state/0]). @@ -315,9 +315,12 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PSt case check_publish(Packet, PState) of {ok, PState1} -> do_publish(Packet, PState1); + {error, ?RC_TOPIC_ALIAS_INVALID} -> + ?LOG(error, "Protocol error - ~p", [?RC_TOPIC_ALIAS_INVALID], PState), + {error, ?RC_TOPIC_ALIAS_INVALID, PState}; {error, ReasonCode} -> ?LOG(warning, "Cannot publish qos0 message to ~s for ~s", [Topic, ReasonCode], PState), - {ok, PState} + {error, ReasonCode, PState} end; process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) -> @@ -644,9 +647,11 @@ check_publish(Packet, PState) -> run_check_steps([fun check_pub_caps/2, fun check_pub_acl/2], Packet, PState). -check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = R}}, +check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Retain}, + variable = #mqtt_packet_publish{ properties = Properties}}, #pstate{zone = Zone}) -> - emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => R}). + #{'Topic-Alias' := TopicAlias} = Properties, + emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic_alias => TopicAlias}). check_pub_acl(_Packet, #pstate{is_super = IsSuper, enable_acl = EnableAcl}) when IsSuper orelse (not EnableAcl) -> diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index 85f6fae1d..d5751f9bb 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -38,7 +38,7 @@ t_get_set_caps(_) -> mqtt_wildcard_subscription => true }, Caps2 = Caps#{max_packet_size => 1048576}, - case emqx_mqtt_caps:get_caps(zone) of + case emqx_mqtt_caps:get_caps(zone) of Caps -> ok; Caps2 -> ok end, @@ -64,20 +64,28 @@ t_check_pub(_) -> {ok, _} = emqx_zone:start_link(), PubCaps = #{ max_qos_allowed => ?QOS_1, - mqtt_retain_available => false + mqtt_retain_available => false, + max_topic_alias => 4 }, emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps), timer:sleep(100), + ct:log("~p", [emqx_mqtt_caps:get_caps(zone, publish)]), BadPubProps1 = #{ qos => ?QOS_2, retain => false - }, + }, {error, ?RC_QOS_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps1), BadPubProps2 = #{ qos => ?QOS_1, retain => true - }, + }, {error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps2), + BadPubProps3 = #{ + qos => ?QOS_1, + retain => false, + topic_alias => 5 + }, + {error, ?RC_TOPIC_ALIAS_INVALID} = emqx_mqtt_caps:check_pub(zone, BadPubProps3), PubProps = #{ qos => ?QOS_1, retain => false @@ -96,19 +104,19 @@ t_check_sub(_) -> mqtt_wildcard_subscription => true }, - ok = do_check_sub([{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts}]), + ok = do_check_sub([{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts}]), ok = do_check_sub(Caps#{max_qos_allowed => ?QOS_1}, [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{qos => ?QOS_1}}]), - ok = do_check_sub(Caps#{max_topic_levels => 1}, - [{<<"client/stat">>, Opts}], + ok = do_check_sub(Caps#{max_topic_levels => 1}, + [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{rc => ?RC_TOPIC_FILTER_INVALID}}]), - ok = do_check_sub(Caps#{mqtt_shared_subscription => false}, - [{<<"client/stat">>, Opts}], + ok = do_check_sub(Caps#{mqtt_shared_subscription => false}, + [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{rc => ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}}]), + ok = do_check_sub(Caps#{mqtt_wildcard_subscription => false}, [{<<"vlient/+/dsofi">>, Opts}], [{<<"vlient/+/dsofi">>, Opts#{rc => ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}}]), emqx_zone:stop(). -