diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index b9bf6b55e..3021e913a 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -43,11 +43,7 @@ -define(QOS_1, 1). %% At least once -define(QOS_2, 2). %% Exactly once --define(QOS0, 0). %% At most once --define(QOS1, 1). %% At least once --define(QOS2, 2). %% Exactly once - --define(IS_QOS(I), (I >= ?QOS0 andalso I =< ?QOS2)). +-define(IS_QOS(I), (I >= ?QOS_0 andalso I =< ?QOS_2)). -define(QOS_I(Name), begin diff --git a/src/emqx_auth_mod.erl b/src/emqx_auth_mod.erl index 8f03eb4fa..00ecb659a 100644 --- a/src/emqx_auth_mod.erl +++ b/src/emqx_auth_mod.erl @@ -39,4 +39,3 @@ behaviour_info(_Other) -> undefined. -endif. - diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 461564bd2..a4dc840cc 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -30,11 +30,11 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {client_pid, options, reconnect_interval, +-record(state, {client_pid, options, reconnect_interval, mountpoint, queue, mqueue_type, max_pending_messages, forwards = [], subscriptions = []}). --record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false, +-record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false, packet_id, topic, props, payload}). start_link(Name, Options) -> diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 22c37d26f..3825583bc 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -95,7 +95,7 @@ | {force_ping, boolean()} | {properties, properties()}). --record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false, +-record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false, packet_id, topic, props, payload}). -type(mqtt_msg() :: #mqtt_msg{}). @@ -1217,7 +1217,7 @@ retry_send([{Type, Msg, Ts} | Msgs], Now, State = #state{retry_interval = Interv retry_send(publish, Msg = #mqtt_msg{qos = QoS, packet_id = PacketId}, Now, State = #state{inflight = Inflight}) -> - Msg1 = Msg#mqtt_msg{dup = (QoS =:= ?QOS1)}, + Msg1 = Msg#mqtt_msg{dup = (QoS =:= ?QOS_1)}, case send(Msg1, State) of {ok, NewState} -> Inflight1 = emqx_inflight:update(PacketId, {publish, Msg1, Now}, Inflight), diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 91e5d4d59..e6c34dc9d 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -34,7 +34,7 @@ make(Topic, Payload) -> -spec(make(atom() | emqx_types:client_id(), emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()). make(From, Topic, Payload) -> - make(From, ?QOS0, Topic, Payload). + make(From, ?QOS_0, Topic, Payload). -spec(make(atom() | emqx_types:client_id(), emqx_mqtt_types:qos(), emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()). @@ -47,7 +47,7 @@ make(From, QoS, Topic, Payload) -> payload = Payload, timestamp = os:timestamp()}. -msgid(?QOS0) -> undefined; +msgid(?QOS_0) -> undefined; msgid(_QoS) -> emqx_guid:gen(). set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) -> diff --git a/src/emqx_mqtt_types.erl b/src/emqx_mqtt_types.erl index 6beb17780..71451cca7 100644 --- a/src/emqx_mqtt_types.erl +++ b/src/emqx_mqtt_types.erl @@ -22,7 +22,7 @@ -export_type([topic_filters/0]). -export_type([packet_id/0, packet_type/0, packet/0]). --type(qos() :: ?QOS0 | ?QOS1 | ?QOS2). +-type(qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2). -type(version() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 | ?MQTT_PROTO_V5). -type(qos_name() :: qos0 | at_most_once | qos1 | at_least_once | @@ -40,4 +40,3 @@ }). -type(topic_filters() :: [{emqx_topic:topic(), subopts()}]). -type(packet() :: #mqtt_packet{}). - diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index 90fe59ba8..56390d412 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -89,13 +89,13 @@ -opaque(mqueue() :: #mqueue{}). -spec(init(options()) -> mqueue()). -init(Opts = #{max_len := MaxLen0, store_qos0 := QoS0}) -> +init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of true -> MaxLen0; false -> ?MAX_LEN_INFINITY end, #mqueue{max_len = MaxLen, - store_qos0 = QoS0, + store_qos0 = QoS_0, p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE), default_p = get_priority_opt(Opts) }. @@ -165,4 +165,3 @@ get_priority_opt(Opts) -> %% while the highest 'infinity' is a [{infinity, queue:queue()}] get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY; get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp). - diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 2040e595e..6f430c5f2 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -103,7 +103,7 @@ validate_properties(_, _) -> validate_subscription({Topic, #{qos := QoS}}) -> emqx_topic:validate(filter, Topic) andalso validate_qos(QoS). -validate_qos(QoS) when ?QOS0 =< QoS, QoS =< ?QOS2 -> +validate_qos(QoS) when ?QOS_0 =< QoS, QoS =< ?QOS_2 -> true; validate_qos(_) -> error(bad_qos). diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index 75118b563..bb0d8bb0b 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -138,5 +138,5 @@ compat(connack, 16#9C) -> ?CONNACK_SERVER; compat(connack, 16#9D) -> ?CONNACK_SERVER; compat(connack, 16#9F) -> ?CONNACK_SERVER; -compat(suback, Code) when Code =< ?QOS2 -> Code; +compat(suback, Code) when Code =< ?QOS_2 -> Code; compat(suback, Code) when Code >= 16#80 -> 16#80. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 85b753f3d..cd7e80c84 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -517,7 +517,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight handle_cast({resume, #{conn_pid := ConnPid, will_msg := WillMsg, expiry_interval := SessionExpiryInterval, - max_inflight := MaxInflight, + max_inflight := MaxInflight, topic_alias_maximum := TopicAliasMaximum}}, State = #state{client_id = ClientId, conn_pid = OldConnPid, clean_start = CleanStart, @@ -547,7 +547,7 @@ handle_cast({resume, #{conn_pid := ConnPid, await_rel_timer = undefined, expiry_timer = undefined, expiry_interval = SessionExpiryInterval, - inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight), + inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight), topic_alias_maximum = TopicAliasMaximum, will_delay_timer = undefined, will_msg = WillMsg}, @@ -574,10 +574,10 @@ handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> end, State, Msgs)}; %% Dispatch message -handle_info({dispatch, Topic, Msg = #message{headers = Headers}}, +handle_info({dispatch, Topic, Msg = #message{headers = Headers}}, State = #state{subscriptions = SubMap, topic_alias_maximum = TopicAliasMaximum}) when is_record(Msg, message) -> TopicAlias = maps:get('Topic-Alias', Headers, undefined), - if + if TopicAlias =:= undefined orelse TopicAlias =< TopicAliasMaximum -> noreply(case maps:find(Topic, SubMap) of {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> @@ -802,13 +802,13 @@ dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) -> end; %% Deliver qos0 message directly to client -dispatch(Msg = #message{qos = ?QOS0} = Msg, State) -> +dispatch(Msg = #message{qos = ?QOS_0} = Msg, State) -> deliver(undefined, Msg, State), inc_stats(deliver, Msg, State); dispatch(Msg = #message{qos = QoS} = Msg, State = #state{next_pkt_id = PacketId, inflight = Inflight}) - when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> + when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case emqx_inflight:is_full(Inflight) of true -> enqueue_msg(Msg, State); false -> @@ -825,7 +825,7 @@ enqueue_msg(Msg, State = #state{mqueue = Q}) -> %%------------------------------------------------------------------------------ redeliver({PacketId, Msg = #message{qos = QoS}}, State) -> - deliver(PacketId, if QoS =:= ?QOS2 -> Msg; + deliver(PacketId, if QoS =:= ?QOS_2 -> Msg; true -> emqx_message:set_flag(dup, Msg) end, State); @@ -974,4 +974,3 @@ noreply(State) -> shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. - diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index 1127f60d9..a4e64c7d0 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -100,7 +100,7 @@ serialize_parse_connect(_) -> ?assertEqual({ok, Packet1, <<>>}, parse_serialize(Packet1)), Packet2 = ?CONNECT_PACKET(#mqtt_packet_connect{ client_id = <<"clientId">>, - will_qos = ?QOS1, + will_qos = ?QOS_1, will_flag = true, will_retain = true, will_topic = <<"will">>, @@ -427,4 +427,3 @@ parse(Bin, Opts) when is_map(Opts) -> payload() -> iolist_to_binary(["payload." || _I <- lists:seq(1, 1000)]). - diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index 37207e40c..3bb06d14c 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -35,7 +35,7 @@ all() -> message_make(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), ?assertEqual(0, Msg#message.qos), - Msg1 = emqx_message:make(<<"clientid">>, ?QOS2, <<"topic">>, <<"payload">>), + Msg1 = emqx_message:make(<<"clientid">>, ?QOS_2, <<"topic">>, <<"payload">>), ?assert(is_binary(Msg1#message.id)), ?assertEqual(2, Msg1#message.qos). diff --git a/test/emqx_mountpoint_SUITE.erl b/test/emqx_mountpoint_SUITE.erl index 61d8d3652..a77baf751 100644 --- a/test/emqx_mountpoint_SUITE.erl +++ b/test/emqx_mountpoint_SUITE.erl @@ -28,8 +28,8 @@ t_mount_unmount(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), Msg2 = emqx_mountpoint:mount(<<"mount">>, Msg), ?assertEqual(<<"mounttopic">>, Msg2#message.topic), - TopicFilter = [{<<"mounttopic">>, #{qos => ?QOS2}}], - TopicFilter = emqx_mountpoint:mount(<<"mount">>, [{<<"topic">>, #{qos => ?QOS2}}]), + TopicFilter = [{<<"mounttopic">>, #{qos => ?QOS_2}}], + TopicFilter = emqx_mountpoint:mount(<<"mount">>, [{<<"topic">>, #{qos => ?QOS_2}}]), Msg = emqx_mountpoint:unmount(<<"mount">>, Msg2). t_replvar(_) -> diff --git a/test/emqx_packet_SUITE.erl b/test/emqx_packet_SUITE.erl index dda27c45b..42fcbc9d7 100644 --- a/test/emqx_packet_SUITE.erl +++ b/test/emqx_packet_SUITE.erl @@ -44,7 +44,7 @@ packet_type_name(_) -> ?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)). packet_validate(_) -> - ?assert(emqx_packet:validate(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => 1}, [{<<"topic">>, #{qos => ?QOS0}}]))), + ?assert(emqx_packet:validate(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => 1}, [{<<"topic">>, #{qos => ?QOS_0}}]))), ?assert(emqx_packet:validate(?UNSUBSCRIBE_PACKET(89, [<<"topic">>]))), ?assert(emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{}))), ?assert(emqx_packet:validate(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1}, <<"payload">>))), @@ -52,7 +52,7 @@ packet_validate(_) -> ?assertError(subscription_identifier_invalid, emqx_packet:validate( ?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => -1}, - [{<<"topic">>, #{qos => ?QOS0}}]))), + [{<<"topic">>, #{qos => ?QOS_0}}]))), ?assertError(topic_filters_invalid, emqx_packet:validate(?UNSUBSCRIBE_PACKET(1,[]))), ?assertError(topic_name_invalid, @@ -90,14 +90,14 @@ packet_validate(_) -> packet_message(_) -> Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - qos = ?QOS0, + qos = ?QOS_0, retain = false, dup = false}, variable = #mqtt_packet_publish{topic_name = <<"topic">>, packet_id = 10, properties = #{}}, payload = <<"payload">>}, - Msg = emqx_message:make(<<"clientid">>, ?QOS0, <<"topic">>, <<"payload">>), + Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>), Msg2 = emqx_message:set_flag(retain, false, Msg), Pkt = emqx_packet:from_message(10, Msg2), Msg3 = emqx_message:set_header(username, "test", Msg2), @@ -112,8 +112,8 @@ packet_format(_) -> io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]), io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]), io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]), - io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]), - io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]), + io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))]), + io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]), io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]), io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). @@ -122,7 +122,7 @@ packet_will_msg(_) -> client_id = <<"clientid">>, username = "test", will_retain = true, - will_qos = ?QOS2, + will_qos = ?QOS_2, will_topic = <<"topic">>, will_props = #{}, will_payload = <<"payload">>}, diff --git a/test/emqx_protocol_SUITE.erl.bk b/test/emqx_protocol_SUITE.erl.bk deleted file mode 100644 index f2d6d306a..000000000 --- a/test/emqx_protocol_SUITE.erl.bk +++ /dev/null @@ -1,147 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_protocol_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include("emqx.hrl"). - --include("emqx_mqtt.hrl"). - --include_lib("eunit/include/eunit.hrl"). - --import(emqx_serializer, [serialize/1]). - -all() -> - [{group, parser}, - {group, serializer}, - {group, packet}, - {group, message}]. - -groups() -> - [{parser, [], - [parse_connect, - parse_bridge, - parse_publish, - parse_puback, - parse_pubrec, - parse_pubrel, - parse_pubcomp, - parse_subscribe, - parse_unsubscribe, - parse_pingreq, - parse_disconnect]}, - {serializer, [], - [serialize_connect, - serialize_connack, - serialize_publish, - serialize_puback, - serialize_pubrel, - serialize_subscribe, - serialize_suback, - serialize_unsubscribe, - serialize_unsuback, - serialize_pingreq, - serialize_pingresp, - serialize_disconnect]}, - {packet, [], - [packet_proto_name, - packet_type_name, - packet_connack_name, - packet_format]}, - {message, [], - [message_make, - message_from_packet, - message_flag]}]. - - - -%%-------------------------------------------------------------------- -%% Packet Cases -%%-------------------------------------------------------------------- - -packet_proto_name(_) -> - ?assertEqual(<<"MQIsdp">>, emqx_packet:protocol_name(3)), - ?assertEqual(<<"MQTT">>, emqx_packet:protocol_name(4)). - -packet_type_name(_) -> - ?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)), - ?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)). - -packet_connack_name(_) -> - ?assertEqual('CONNACK_ACCEPT', emqx_packet:connack_name(?CONNACK_ACCEPT)), - ?assertEqual('CONNACK_PROTO_VER', emqx_packet:connack_name(?CONNACK_PROTO_VER)), - ?assertEqual('CONNACK_INVALID_ID', emqx_packet:connack_name(?CONNACK_INVALID_ID)), - ?assertEqual('CONNACK_SERVER', emqx_packet:connack_name(?CONNACK_SERVER)), - ?assertEqual('CONNACK_CREDENTIALS', emqx_packet:connack_name(?CONNACK_CREDENTIALS)), - ?assertEqual('CONNACK_AUTH', emqx_packet:connack_name(?CONNACK_AUTH)). - -packet_format(_) -> - io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]), - io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]), - io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]), - io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]), - io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]), - io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]), - io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]), - io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]), - io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]), - io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). - -%%-------------------------------------------------------------------- -%% Message Cases -%%-------------------------------------------------------------------- - -message_make(_) -> - Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), - ?assertEqual(0, Msg#mqtt_message.qos), - Msg1 = emqx_message:make(<<"clientid">>, qos2, <<"topic">>, <<"payload">>), - ?assert(is_binary(Msg1#mqtt_message.id)), - ?assertEqual(2, Msg1#mqtt_message.qos). - -message_from_packet(_) -> - Msg = emqx_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)), - ?assertEqual(1, Msg#mqtt_message.qos), - ?assertEqual(10, Msg#mqtt_message.pktid), - ?assertEqual(<<"topic">>, Msg#mqtt_message.topic), - WillMsg = emqx_message:from_packet(#mqtt_packet_connect{will_flag = true, - will_topic = <<"WillTopic">>, - will_msg = <<"WillMsg">>}), - ?assertEqual(<<"WillTopic">>, WillMsg#mqtt_message.topic), - ?assertEqual(<<"WillMsg">>, WillMsg#mqtt_message.payload), - - Msg2 = emqx_message:from_packet(<<"username">>, <<"clientid">>, - ?PUBLISH_PACKET(1, <<"topic">>, 20, <<"payload">>)), - ?assertEqual({<<"clientid">>, <<"username">>}, Msg2#mqtt_message.from), - io:format("~s", [emqx_message:format(Msg2)]). - -message_flag(_) -> - Pkt = ?PUBLISH_PACKET(1, <<"t">>, 2, <<"payload">>), - Msg2 = emqx_message:from_packet(<<"clientid">>, Pkt), - Msg3 = emqx_message:set_flag(retain, Msg2), - Msg4 = emqx_message:set_flag(dup, Msg3), - ?assert(Msg4#mqtt_message.dup), - ?assert(Msg4#mqtt_message.retain), - Msg5 = emqx_message:set_flag(Msg4), - Msg6 = emqx_message:unset_flag(dup, Msg5), - Msg7 = emqx_message:unset_flag(retain, Msg6), - ?assertNot(Msg7#mqtt_message.dup), - ?assertNot(Msg7#mqtt_message.retain), - emqx_message:unset_flag(Msg7), - emqx_message:to_packet(Msg7). -