From b10f49b52cdca554f48d3c5f71f98492dfa85725 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 25 Aug 2018 11:49:19 +0800 Subject: [PATCH 1/8] Add CONNACK macros for MQTT V3.1.1 --- include/emqx_mqtt.hrl | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index e429aa4a3..1d69ac365 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -140,7 +140,20 @@ -type(mqtt_packet_type() :: ?RESERVED..?AUTH). %%-------------------------------------------------------------------- -%% MQTT Reason Codes +%% MQTT V3.1.1 Connect Return Codes +%%-------------------------------------------------------------------- + +-define(CONNACK_ACCEPT, 0). %% Connection accepted +-define(CONNACK_PROTO_VER, 1). %% Unacceptable protocol version +-define(CONNACK_INVALID_ID, 2). %% Client Identifier is correct UTF-8 but not allowed by the Server +-define(CONNACK_SERVER, 3). %% Server unavailable +-define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed +-define(CONNACK_AUTH, 5). %% Client is not authorized to connect + +-type(mqtt_connack() :: ?CONNACK_ACCEPT..?CONNACK_AUTH). + +%%-------------------------------------------------------------------- +%% MQTT V5.0 Reason Codes %%-------------------------------------------------------------------- -define(RC_SUCCESS, 16#00). From 5f42f8840193640dae371a8af63f66431d63ef2d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 25 Aug 2018 11:51:36 +0800 Subject: [PATCH 2/8] Pass paho zero_length_clientid test case --- src/emqx_protocol.erl | 31 ++++++++++++++++++++----------- src/emqx_reason_codes.erl | 21 +++++++++++++++++++++ 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 96925279f..650da4d1c 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -73,6 +73,7 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) peercert = Peercert, proto_ver = ?MQTT_PROTO_V4, proto_name = <<"MQTT">>, + client_id = <<>>, client_pid = self(), username = init_username(Peercert, Options), is_super = false, @@ -201,6 +202,8 @@ process(?CONNECT_PACKET( username = Username, password = Password} = Connect), PState) -> + io:format("~p~n", [Connect]), + PState1 = set_username(Username, PState#pstate{client_id = ClientId, proto_ver = ProtoVer, @@ -334,8 +337,12 @@ process(?PACKET(?DISCONNECT), PState) -> connack({?RC_SUCCESS, SP, PState}) -> deliver({connack, ?RC_SUCCESS, sp(SP)}, PState); -connack({ReasonCode, PState}) -> - deliver({connack, ReasonCode, 0}, PState), +connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> + _ = deliver({connack, if ProtoVer =:= ?MQTT_PROTO_V5 -> + ReasonCode; + true -> + emqx_reason_codes:compat(connack, ReasonCode) + end}, PState), {error, emqx_reason_codes:name(ReasonCode), PState}. %%------------------------------------------------------------------------------ @@ -415,7 +422,7 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, %% Assign a clientid maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps}) -> - ClientId = iolist_to_binary(["emqx_", emqx_guid:gen()]), + ClientId = emqx_guid:to_base62(emqx_guid:gen()), AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps), PState#pstate{client_id = ClientId, ackprops = AckProps1}; maybe_assign_client_id(PState) -> @@ -464,18 +471,20 @@ check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, false -> {error, ?RC_PROTOCOL_ERROR} end. -%% Issue#599: Null clientId and clean_start = false -check_client_id(#mqtt_packet_connect{client_id = ClientId, - clean_start = false}, _PState) - when ClientId == undefined; ClientId == <<>> -> - {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}; - %% MQTT3.1 does not allow null clientId check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3, - client_id = ClientId}, _PState) - when ClientId == undefined; ClientId == <<>> -> + client_id = <<>>}, _PState) -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}; +%% Issue#599: Null clientId and clean_start = false +check_client_id(#mqtt_packet_connect{client_id = <<>>, + clean_start = false}, _PState) -> + {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}; + +check_client_id(#mqtt_packet_connect{client_id = <<>>, + clean_start = true}, _PState) -> + ok; + check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone}) -> Len = byte_size(ClientId), MaxLen = emqx_zone:get_env(Zone, max_clientid_len), diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index 335be3bd4..b777b2627 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -15,7 +15,10 @@ %% @doc MQTT5 reason codes -module(emqx_reason_codes). +-include("emqx_mqtt.hrl"). + -export([name/1, text/1]). +-export([compat/2]). name(16#00) -> success; name(16#01) -> granted_qos1; @@ -107,3 +110,21 @@ text(16#A1) -> <<"Subscription Identifiers not supported">>; text(16#A2) -> <<"Wildcard Subscriptions not supported">>; text(Code) -> iolist_to_binary(["Unkown Reason Code:", integer_to_list(Code)]). +compat(connack, 16#80) -> ?CONNACK_PROTO_VER; +compat(connack, 16#81) -> ?CONNACK_PROTO_VER; +compat(connack, 16#82) -> ?CONNACK_PROTO_VER; +compat(connack, 16#83) -> ?CONNACK_PROTO_VER; +compat(connack, 16#84) -> ?CONNACK_PROTO_VER; +compat(connack, 16#85) -> ?CONNACK_INVALID_ID; +compat(connack, 16#86) -> ?CONNACK_CREDENTIALS; +compat(connack, 16#87) -> ?CONNACK_AUTH; +compat(connack, 16#88) -> ?CONNACK_SERVER; +compat(connack, 16#89) -> ?CONNACK_SERVER; +compat(connack, 16#8A) -> ?CONNACK_AUTH; +compat(connack, 16#8B) -> ?CONNACK_SERVER; +compat(connack, 16#8C) -> ?CONNACK_AUTH; +compat(connack, 16#97) -> ?CONNACK_SERVER; +compat(connack, 16#9C) -> ?CONNACK_SERVER; +compat(connack, 16#9D) -> ?CONNACK_SERVER; +compat(connack, 16#9F) -> ?CONNACK_SERVER. + From fc0f57073d75a4ba11c44a3aec1c6c287ea1024e Mon Sep 17 00:00:00 2001 From: turtled Date: Sat, 25 Aug 2018 14:32:32 +0800 Subject: [PATCH 3/8] Fix share sub dispatch fail --- src/emqx_shared_sub.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 5194de9d4..e8a9fac10 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -81,7 +81,7 @@ record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. %% TODO: dispatch strategy, ensure the delivery... -dispatch(Group, Topic, Delivery = #delivery{message = Msg, flows = Flows}) -> +dispatch({Group, _Node}, Topic, Delivery = #delivery{message = Msg, flows = Flows}) -> case pick(subscribers(Group, Topic)) of false -> Delivery; SubPid -> SubPid ! {dispatch, Topic, Msg}, @@ -98,7 +98,6 @@ pick(SubPids) -> subscribers(Group, Topic) -> ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). - %%----------------------------------------------------------------------------- %% gen_server callbacks %%----------------------------------------------------------------------------- From 4af606598484ba8cdc2d443a00a76a9e62016887 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 25 Aug 2018 16:02:44 +0800 Subject: [PATCH 4/8] For paho interoperability tests --- etc/acl.conf.paho | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 etc/acl.conf.paho diff --git a/etc/acl.conf.paho b/etc/acl.conf.paho new file mode 100644 index 000000000..5beec4347 --- /dev/null +++ b/etc/acl.conf.paho @@ -0,0 +1,14 @@ +%%-------------------------------------------------------------------- +%% For paho interoperability test cases +%%-------------------------------------------------------------------- + +{deny, {client, "myclientid"}, subscribe, ["test/nosubscribe"]}. + +{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}. + +{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}. + +{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}. + +{allow, all}. + From 1aee05ce16fa3a1b38eafc2b0513a577991bab80 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 25 Aug 2018 16:03:28 +0800 Subject: [PATCH 5/8] Fix unsubscribe bug --- src/emqx_session.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 44c8f183f..db7e80a9b 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -433,7 +433,7 @@ handle_cast({subscribe, From, {PacketId, Properties, TopicFilters}}, handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> {ReasonCodes, Subscriptions1} = - lists:foldr(fun(Topic, {RcAcc, SubMap}) -> + lists:foldr(fun({Topic, _Opts}, {RcAcc, SubMap}) -> case maps:find(Topic, SubMap) of {ok, SubOpts} -> emqx_broker:unsubscribe(Topic, ClientId), @@ -649,7 +649,6 @@ retry_delivery(Force, State = #state{inflight = Inflight}) -> State; false -> Msgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)), - io:format("!!! Retry Delivery: ~p~n", [Msgs]), retry_delivery(Force, Msgs, os:timestamp(), State) end. From 612c88e71e333adf9ac261e9e0baf243e3fc8209 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 25 Aug 2018 16:04:21 +0800 Subject: [PATCH 6/8] Add 'rc' and 'subid' fields --- src/emqx_frame.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 82db0acf5..59a195e33 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -331,7 +331,7 @@ parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) -> {Value + Len * Multiplier, Rest}. parse_topic_filters(subscribe, Bin) -> - [{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS}} + [{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS, rc => 0, subid => 0}} || <> <= Bin]; parse_topic_filters(unsubscribe, Bin) -> From c2c1320083db5f65b7d8f96eaa11df0e37356412 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 25 Aug 2018 16:05:58 +0800 Subject: [PATCH 7/8] Update compat/2 for suback reason codes --- src/emqx_reason_codes.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index b777b2627..f300d675d 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -126,5 +126,8 @@ compat(connack, 16#8C) -> ?CONNACK_AUTH; compat(connack, 16#97) -> ?CONNACK_SERVER; compat(connack, 16#9C) -> ?CONNACK_SERVER; compat(connack, 16#9D) -> ?CONNACK_SERVER; -compat(connack, 16#9F) -> ?CONNACK_SERVER. +compat(connack, 16#9F) -> ?CONNACK_SERVER; + +compat(suback, Code) when Code =< ?QOS2 -> Code; +compat(suback, Code) when Code > 16#80 -> 16#80. From b7c2821326112ef7cf62ad39816ae20cbfc5153f Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 25 Aug 2018 16:07:57 +0800 Subject: [PATCH 8/8] Make reason codes of SUBACK be compatible with MQTT V3.1.1 --- src/emqx_protocol.erl | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 650da4d1c..3faa7781a 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -391,8 +391,13 @@ deliver({pubrel, PacketId}, PState) -> deliver({pubrec, PacketId, ReasonCode}, PState) -> send(?PUBREC_PACKET(PacketId, ReasonCode), PState); -deliver({suback, PacketId, ReasonCodes}, PState) -> - send(?SUBACK_PACKET(PacketId, ReasonCodes), PState); +deliver({suback, PacketId, ReasonCodes}, PState = #pstate{proto_ver = ProtoVer}) -> + send(?SUBACK_PACKET(PacketId, + if ProtoVer =:= ?MQTT_PROTO_V5 -> + ReasonCodes; + true -> + [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes] + end), PState); deliver({unsuback, PacketId, ReasonCodes}, PState) -> send(?UNSUBACK_PACKET(PacketId, ReasonCodes), PState); @@ -408,12 +413,12 @@ deliver({disconnect, _ReasonCode}, PState) -> %% Send Packet to Client -spec(send(mqtt_packet(), state()) -> {ok, state()} | {error, term()}). -send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, - sendfun = SendFun}) -> +send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) -> + trace(send, Packet, PState), case SendFun(emqx_frame:serialize(Packet, #{version => Ver})) of - ok -> emqx_metrics:sent(Packet), - trace(send, Packet, PState), - {ok, inc_stats(send, Type, PState)}; + ok -> + emqx_metrics:sent(Packet), + {ok, inc_stats(send, Type, PState)}; {error, Reason} -> {error, Reason} end.