From 749ef823abaebb4b9e969b9bb4c60efc88d7860a Mon Sep 17 00:00:00 2001 From: tigercl Date: Mon, 19 Aug 2019 10:50:52 +0800 Subject: [PATCH] Refix code about mqtt spec (#2806) Refix code about mqtt spec --- src/emqx_channel.erl | 19 ++++++------------- src/emqx_cm.erl | 4 ++-- src/emqx_connection.erl | 3 ++- 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index b9b18cd8d..57ff91a16 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -280,7 +280,7 @@ handle_in(?CONNECT_PACKET( handle_out(connack, ReasonCode, NChannel) end; -handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel = #channel{proto_ver = Ver}) -> +handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), Channel = #channel{proto_ver = Ver}) -> case pipeline([fun validate_in/2, fun process_alias/2, fun check_publish/2], Packet, Channel) of @@ -289,11 +289,7 @@ handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel = #channel{pro {error, ReasonCode, NChannel} -> ?LOG(warning, "Cannot publish message to ~s due to ~s", [Topic, emqx_reason_codes:text(ReasonCode, Ver)]), - case QoS of - ?QOS_0 -> handle_out(puberr, ReasonCode, NChannel); - ?QOS_1 -> handle_out(puback, {PacketId, ReasonCode}, NChannel); - ?QOS_2 -> handle_out(pubrec, {PacketId, ReasonCode}, NChannel) - end + handle_out(disconnect, ReasonCode, NChannel) end; %%TODO: How to handle the ReasonCode? @@ -845,13 +841,10 @@ check_will_retain(#mqtt_packet_connect{will_retain = true}, %%-------------------------------------------------------------------- enrich_client(ConnPkt, Channel) -> - case pipeline([fun set_username/2, - fun maybe_use_username_as_clientid/2, - fun maybe_assign_clientid/2, - fun set_rest_client_fields/2], ConnPkt, Channel) of - {ok, NConnPkt, NChannel} -> {ok, NConnPkt, NChannel}; - Error -> Error - end. + pipeline([fun set_username/2, + fun maybe_use_username_as_clientid/2, + fun maybe_assign_clientid/2, + fun set_rest_client_fields/2], ConnPkt, Channel). maybe_use_username_as_clientid(_ConnPkt, Channel = #channel{client = #{username := undefined}}) -> {ok, Channel}; diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index dbfb38d0d..c393d6ab0 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -189,7 +189,7 @@ takeover_session(ClientId) -> takeover_session(ClientId, ChanPid); ChanPids -> [ChanPid|StalePids] = lists:reverse(ChanPids), - ?LOG(error, "[SM] More than one channel found: ~p", [ChanPids]), + ?LOG(error, "More than one channel found: ~p", [ChanPids]), lists:foreach(fun(StalePid) -> catch discard_session(ClientId, StalePid) end, StalePids), @@ -220,7 +220,7 @@ discard_session(ClientId) when is_binary(ClientId) -> discard_session(ClientId, ChanPid) catch _:Error:_Stk -> - ?LOG(error, "[SM] Failed to discard ~p: ~p", [ChanPid, Error]) + ?LOG(error, "Failed to discard ~p: ~p", [ChanPid, Error]) end end, ChanPids) end. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index cfb743a25..5ff534a2b 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -257,7 +257,8 @@ connected(enter, _PrevSt, State = #state{chan_state = ChanState}) -> connected(cast, {incoming, Packet = ?PACKET(?CONNECT)}, State) -> ?LOG(warning, "Unexpected connect: ~p", [Packet]), - shutdown(unexpected_incoming_connect, State); + Shutdown = fun(NewSt) -> shutdown(?RC_PROTOCOL_ERROR, NewSt) end, + handle_outgoing(?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), Shutdown, State); connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> handle_incoming(Packet, fun keep_state/1, State);