From bc31faac6b8f8f5623b8ca1be98e49e97ec87705 Mon Sep 17 00:00:00 2001 From: tigercl Date: Thu, 17 Jan 2019 13:57:42 +0800 Subject: [PATCH] Fix will msg (#2156) * Remove will_msg and will_topic from protocol state * Modify try_open_session/1 --- src/emqx_protocol.erl | 54 ++++++++++++++----------------- src/emqx_session.erl | 16 +++++++-- test/emqx_connection_SUITE.erl | 1 - test/emqx_protocol_SUITE.erl | 18 +++++------ test/emqx_ws_connection_SUITE.erl | 1 - 5 files changed, 47 insertions(+), 43 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index f82e8dcad..87bf50d08 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -50,8 +50,6 @@ clean_start, topic_aliases, packet_size, - will_topic, - will_msg, keepalive, mountpoint, is_super, @@ -130,13 +128,11 @@ info(PState = #pstate{conn_props = ConnProps, ack_props = AckProps, session = Session, topic_aliases = Aliases, - will_msg = WillMsg, enable_acl = EnableAcl}) -> attrs(PState) ++ [{conn_props, ConnProps}, {ack_props, AckProps}, {session, Session}, {topic_aliases, Aliases}, - {will_msg, WillMsg}, {enable_acl, EnableAcl}]. attrs(#pstate{zone = Zone, @@ -349,11 +345,11 @@ process_packet(?CONNECT_PACKET( case authenticate(credentials(PState2), Password) of {ok, IsSuper} -> %% Maybe assign a clientId - PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper, - will_msg = make_will_msg(ConnPkt)}), + PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}), emqx_logger:set_metadata_client_id(PState3#pstate.client_id), %% Open session - case try_open_session(PState3) of + SessAttrs = lists:foldl(fun set_session_attrs/2, #{will_msg => make_will_msg(ConnPkt)}, [{max_inflight, PState3}, {expiry_interval, PState3}, {misc, PState3}]), + case try_open_session(SessAttrs) of {ok, SPid, SP} -> PState4 = PState3#pstate{session = SPid, connected = true}, ok = emqx_cm:register_connection(client_id(PState4)), @@ -502,16 +498,15 @@ process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := In case Interval =/= 0 andalso OldInterval =:= 0 of true -> deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState), - {error, protocol_error, PState#pstate{will_msg = undefined}}; + {error, protocol_error, PState}; false -> emqx_session:update_expiry_interval(SPid, Interval), - %% Clean willmsg - {stop, normal, PState#pstate{will_msg = undefined}} + {stop, normal, PState} end; process_packet(?DISCONNECT_PACKET(?RC_SUCCESS), PState) -> - {stop, normal, PState#pstate{will_msg = undefined}}; + {stop, normal, PState}; process_packet(?DISCONNECT_PACKET(_), PState) -> - {stop, normal, PState}. + {stop, {shutdown, abnormal_disconnet}, PState}. %%------------------------------------------------------------------------------ %% ConnAck --> Client @@ -678,23 +673,13 @@ maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps}) maybe_assign_client_id(PState) -> PState. -try_open_session(PState = #pstate{zone = Zone, - client_id = ClientId, - conn_pid = ConnPid, - username = Username, - clean_start = CleanStart, - will_msg = WillMsg}) -> - - SessAttrs = #{ - zone => Zone, - client_id => ClientId, - conn_pid => ConnPid, - username => Username, - clean_start => CleanStart, - will_msg => WillMsg - }, - SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}]), - case emqx_sm:open_session(SessAttrs1) of +try_open_session(SessAttrs = #{zone := _, + client_id := _, + conn_pid := _, + username := _, + will_msg := _, + clean_start := _}) -> + case emqx_sm:open_session(SessAttrs) of {ok, SPid} -> {ok, SPid, false}; Other -> Other @@ -722,6 +707,17 @@ set_session_attrs({topic_alias_maximum, #pstate{proto_ver = ?MQTT_PROTO_V5, conn set_session_attrs({topic_alias_maximum, #pstate{zone = Zone}}, SessAttrs) -> maps:put(topic_alias_maximum, emqx_zone:get_env(Zone, max_topic_alias, 0), SessAttrs); +set_session_attrs({misc, #pstate{zone = Zone, + client_id = ClientId, + conn_pid = ConnPid, + username = Username, + clean_start = CleanStart}}, SessAttrs) -> + SessAttrs#{zone => Zone, + client_id => ClientId, + conn_pid => ConnPid, + username => Username, + clean_start => CleanStart}; + set_session_attrs(_, SessAttrs) -> SessAttrs. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 966ff738d..3ebcc6dfd 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -630,11 +630,21 @@ handle_info({'EXIT', ConnPid, Reason}, #state{conn_pid = ConnPid}) exit(Reason); handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) -> - send_willmsg(WillMsg), + case Reason of + normal -> + ignore; + _ -> + send_willmsg(WillMsg) + end, {stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}}; -handle_info({'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) -> - State1 = ensure_will_delay_timer(State), +handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) -> + State1 = case Reason of + normal -> + State#state{will_msg = undefined}; + _ -> + ensure_will_delay_timer(State) + end, {noreply, ensure_expire_timer(State1#state{conn_pid = undefined})}; handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) -> diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 1678eab44..769f8df1f 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -74,7 +74,6 @@ {socktype, _}, {topic_aliases, _}, {username, _}, - {will_msg, _}, {zone, _}]). all() -> diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 5a9eb3735..becdd60f2 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -50,8 +50,6 @@ clean_start, topic_aliases, packet_size, - will_topic, - will_msg, keepalive, mountpoint, is_super, @@ -352,7 +350,7 @@ connect_v5(_) -> will_props = #{'Will-Delay-Interval' => 5}, will_topic = <<"TopicA">>, will_payload = <<"will message">>, - properties = #{'Session-Expiry-Interval' => 3} + properties = #{'Session-Expiry-Interval' => 0} } ) ) @@ -377,11 +375,11 @@ connect_v5(_) -> {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock, raw_send_serialize( - ?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE) + ?DISCONNECT_PACKET(?RC_SUCCESS) ) ), - {error, timeout} = gen_tcp:recv(Sock2, 0, 1000), + {error, timeout} = gen_tcp:recv(Sock2, 0, 2000), % session resumed {ok, Sock3} = emqx_client_sock:connect({127, 0, 0, 1}, 1883, @@ -403,18 +401,20 @@ connect_v5(_) -> will_payload = <<"will message 2">>, properties = #{'Session-Expiry-Interval' => 3} } - ) + ), + #{version => ?MQTT_PROTO_V5} ) ), {ok, Data3} = gen_tcp:recv(Sock3, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), emqx_client_sock:send(Sock3, raw_send_serialize( - ?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE) + ?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE), + #{version => ?MQTT_PROTO_V5} ) ), - {ok, WillData} = gen_tcp:recv(Sock2, 0), + {ok, WillData} = gen_tcp:recv(Sock2, 0, 5000), {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5), emqx_client_sock:close(Sock2) diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index 848e518e1..dd7b17f3e 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -56,7 +56,6 @@ {ack_props, _}, {session, _}, {topic_aliases, _}, - {will_msg, _}, {enable_acl, _}]). -define(ATTRS, [{clean_start,true},