From 234037aee1346facd33776e9291ec067c036b236 Mon Sep 17 00:00:00 2001 From: GilbertWong Date: Tue, 9 Jul 2019 16:20:09 +0800 Subject: [PATCH] Fix will message mechanism for websocket channel --- src/emqx_session.erl | 8 ++++---- src/emqx_ws_channel.erl | 11 ++++++----- test/emqx_ws_channel_SUITE.erl | 29 +++++++++++++++++++++++++++++ test/rfc6455_client.erl | 1 - 4 files changed, 39 insertions(+), 10 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 17d07ca0a..392ce77f5 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -301,10 +301,10 @@ pubcomp(SPid, PacketId, ReasonCode) -> -spec(unsubscribe(spid(), emqx_types:topic_table()) -> ok). unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> - TopicFilters = lists:map(fun({RawTopic, Opts}) -> - emqx_topic:parse(RawTopic, Opts); - (RawTopic) when is_binary(RawTopic) -> - emqx_topic:parse(RawTopic) + TopicFilters = lists:map(fun({RawTopic, Opts}) -> + emqx_topic:parse(RawTopic, Opts); + (RawTopic) when is_binary(RawTopic) -> + emqx_topic:parse(RawTopic) end, RawTopicFilters), unsubscribe(SPid, undefined, #{}, TopicFilters). diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index 771883be2..6950b0d6d 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -312,9 +312,12 @@ terminate(SockError, _Req, #state{keepalive = Keepalive, case {ProtoState, Shutdown} of {undefined, _} -> ok; {_, {shutdown, Reason}} -> - emqx_protocol:terminate(Reason, ProtoState); + emqx_protocol:terminate(Reason, ProtoState), + exit(Reason); {_, Error} -> - emqx_protocol:terminate(Error, ProtoState) + ?LOG(error, "Un expected terminated for ~p", [Error]), + emqx_protocol:terminate(Error, ProtoState), + exit(unknown) end. %%-------------------------------------------------------------------- @@ -334,8 +337,6 @@ handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) -> shutdown(Error, State#state{proto_state = NProtoState}) end. - - ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined, idle_timeout = IdleTimeout}) -> @@ -345,7 +346,7 @@ ensure_stats_timer(State) -> shutdown(Reason, State) -> %% Fix the issue#2591(https://github.com/emqx/emqx/issues/2591#issuecomment-500278696) - self() ! {stop, Reason}, + self() ! {stop, State#state{shutdown = {shutdown, Reason}}}, {ok, State}. wsock_stats() -> diff --git a/test/emqx_ws_channel_SUITE.erl b/test/emqx_ws_channel_SUITE.erl index ada11a95a..0ce31b464 100644 --- a/test/emqx_ws_channel_SUITE.erl +++ b/test/emqx_ws_channel_SUITE.erl @@ -28,10 +28,23 @@ username = <<"admin">>, password = <<"public">>})). +-define(WILL_TOPIC, <<"test/websocket/will">>). + +-define(WILL_CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{ + client_id = <<"mqtt_client">>, + username = <<"admin">>, + password = <<"public">>, + will_flag = true, + will_qos = ?QOS_1, + will_topic = ?WILL_TOPIC, + will_payload = <<"payload">> + })). + all() -> [ t_ws_connect_api , t_ws_auth_failure , t_ws_other_type_frame + , t_ws_will ]. init_per_suite(Config) -> @@ -41,6 +54,22 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). +t_ws_will(_Config) -> + {ok, ClientPid} = emqx_client:start_link(), + {ok, _} = emqx_client:connect(ClientPid), + {ok, _, [1]} = emqx_client:subscribe(ClientPid, ?WILL_TOPIC, qos1), + WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), + {ok, _} = rfc6455_client:open(WS), + Packet = raw_send_serialize(?WILL_CLIENT), + ok = rfc6455_client:send_binary(WS, Packet), + {binary, Bin} = rfc6455_client:recv(WS), + Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT), + {ok, Connack, <<>>, _} = raw_recv_pase(Bin), + exit(WS, abnomal), + ?assertEqual(1, length(emqx_client_SUITE:receive_messages(1))), + ok = emqx_client:disconnect(ClientPid), + ok. + t_ws_auth_failure(_Config) -> application:set_env(emqx, allow_anonymous, false), WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), diff --git a/test/rfc6455_client.erl b/test/rfc6455_client.erl index 987b72407..18b094a76 100644 --- a/test/rfc6455_client.erl +++ b/test/rfc6455_client.erl @@ -186,7 +186,6 @@ do_close(State = #state{socket = Socket}, {Code, Reason}) -> gen_tcp:send(Socket, encode_frame(1, 8, Payload)), State#state{phase = closing}. - loop(State = #state{socket = Socket, ppid = PPid, data = Data, phase = Phase}) -> receive