diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 2a16bb348..65d34d740 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -209,7 +209,7 @@ idle(enter, _, State) -> keep_state_and_data; idle(timeout, _Timeout, State) -> - {stop, idle_timeout, State}; + {stop, {shutdown, idle_timeout}, State}; idle(cast, {incoming, Packet}, State) -> handle_incoming(Packet, fun(NState) -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 7d1ecc04e..b88c82df6 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -891,11 +891,11 @@ check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}}) check_sub_acl(TopicFilters, #pstate{zone = Zone, credentials = Credentials}) -> EnableAcl = emqx_zone:get_env(Zone, enable_acl, false), lists:foldr( - fun({Topic, SubOpts}, {Ok, Acc}) when EnableAcl -> - AllowTerm = {Ok, [{Topic, SubOpts}|Acc]}, + fun({Topic, SubOpts}, {ok, Acc}) when EnableAcl -> + AllowTerm = {ok, [{Topic, SubOpts}|Acc]}, DenyTerm = {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]}, do_acl_check(subscribe, Credentials, Topic, AllowTerm, DenyTerm); - (TopicFilter, Acc) -> + (TopicFilter, {ok, Acc}) -> {ok, [TopicFilter | Acc]} end, {ok, []}, TopicFilters). diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 17d07ca0a..392ce77f5 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -301,10 +301,10 @@ pubcomp(SPid, PacketId, ReasonCode) -> -spec(unsubscribe(spid(), emqx_types:topic_table()) -> ok). unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> - TopicFilters = lists:map(fun({RawTopic, Opts}) -> - emqx_topic:parse(RawTopic, Opts); - (RawTopic) when is_binary(RawTopic) -> - emqx_topic:parse(RawTopic) + TopicFilters = lists:map(fun({RawTopic, Opts}) -> + emqx_topic:parse(RawTopic, Opts); + (RawTopic) when is_binary(RawTopic) -> + emqx_topic:parse(RawTopic) end, RawTopicFilters), unsubscribe(SPid, undefined, #{}, TopicFilters). diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index 771883be2..94ed70633 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -312,9 +312,12 @@ terminate(SockError, _Req, #state{keepalive = Keepalive, case {ProtoState, Shutdown} of {undefined, _} -> ok; {_, {shutdown, Reason}} -> - emqx_protocol:terminate(Reason, ProtoState); + emqx_protocol:terminate(Reason, ProtoState), + exit(Reason); {_, Error} -> - emqx_protocol:terminate(Error, ProtoState) + ?LOG(error, "Unexpected terminated for ~p", [Error]), + emqx_protocol:terminate(Error, ProtoState), + exit(unknown) end. %%-------------------------------------------------------------------- @@ -334,8 +337,6 @@ handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) -> shutdown(Error, State#state{proto_state = NProtoState}) end. - - ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined, idle_timeout = IdleTimeout}) -> @@ -345,7 +346,7 @@ ensure_stats_timer(State) -> shutdown(Reason, State) -> %% Fix the issue#2591(https://github.com/emqx/emqx/issues/2591#issuecomment-500278696) - self() ! {stop, Reason}, + self() ! {stop, State#state{shutdown = {shutdown, Reason}}}, {ok, State}. wsock_stats() -> diff --git a/test/emqx_SUITE_data/slave.config b/test/emqx_SUITE_data/slave.config deleted file mode 100644 index 1cdf851b5..000000000 --- a/test/emqx_SUITE_data/slave.config +++ /dev/null @@ -1,45 +0,0 @@ -[{emqx, - [{plugins_loaded_file,"loaded_plugins"}, - {plugins_etc_dir,"plugins/"}, - {broker_sys_interval,60}, - {cache_acl,true}, - {allow_anonymous,true}, - {license_file,"../../etc/emqx.lic"}, - {protocol,[{max_clientid_len,1024},{max_packet_size,65536}]}, - {client, - [{max_publish_rate,5},{idle_timeout,30000},{enable_stats,60000}]}, - {session, - [{max_subscriptions,0}, - {upgrade_qos,false}, - {max_inflight,32}, - {retry_interval,20000}, - {max_awaiting_rel,100}, - {await_rel_timeout,20000}, - {enable_stats,60000}, - {expiry_interval,7200000}]}, - {mqueue, - [{priority,[]}, - {type,simple}, - {max_length,infinity}, - {low_watermark,0.2}, - {high_watermark,0.6}, - {store_qos0,true}]}, - {pubsub,[{pool_size,8},{by_clientid,true},{async,true}]}, - {bridge,[{max_queue_len,10000},{ping_down_interval,1}]}, - {listeners, []}, - {sysmon, - [{long_gc,false}, - {long_schedule,240}, - {large_heap,8388608}, - {busy_port,false}, - {busy_dist_port,true}]}]}, - {gen_rpc, - [{socket_keepalive_count,2}, - {socket_keepalive_interval,5}, - {socket_keepalive_idle,5}, - {call_receive_timeout,15000}, - {authentication_timeout,5000}, - {send_timeout,5000}, - {connect_timeout,5000}, - {tcp_client_port,5369}, - {tcp_server_port,7369}]}]. diff --git a/test/emqx_ws_channel_SUITE.erl b/test/emqx_ws_channel_SUITE.erl index ada11a95a..0ce31b464 100644 --- a/test/emqx_ws_channel_SUITE.erl +++ b/test/emqx_ws_channel_SUITE.erl @@ -28,10 +28,23 @@ username = <<"admin">>, password = <<"public">>})). +-define(WILL_TOPIC, <<"test/websocket/will">>). + +-define(WILL_CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{ + client_id = <<"mqtt_client">>, + username = <<"admin">>, + password = <<"public">>, + will_flag = true, + will_qos = ?QOS_1, + will_topic = ?WILL_TOPIC, + will_payload = <<"payload">> + })). + all() -> [ t_ws_connect_api , t_ws_auth_failure , t_ws_other_type_frame + , t_ws_will ]. init_per_suite(Config) -> @@ -41,6 +54,22 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). +t_ws_will(_Config) -> + {ok, ClientPid} = emqx_client:start_link(), + {ok, _} = emqx_client:connect(ClientPid), + {ok, _, [1]} = emqx_client:subscribe(ClientPid, ?WILL_TOPIC, qos1), + WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), + {ok, _} = rfc6455_client:open(WS), + Packet = raw_send_serialize(?WILL_CLIENT), + ok = rfc6455_client:send_binary(WS, Packet), + {binary, Bin} = rfc6455_client:recv(WS), + Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT), + {ok, Connack, <<>>, _} = raw_recv_pase(Bin), + exit(WS, abnomal), + ?assertEqual(1, length(emqx_client_SUITE:receive_messages(1))), + ok = emqx_client:disconnect(ClientPid), + ok. + t_ws_auth_failure(_Config) -> application:set_env(emqx, allow_anonymous, false), WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), diff --git a/test/rfc6455_client.erl b/test/rfc6455_client.erl index 987b72407..18b094a76 100644 --- a/test/rfc6455_client.erl +++ b/test/rfc6455_client.erl @@ -186,7 +186,6 @@ do_close(State = #state{socket = Socket}, {Code, Reason}) -> gen_tcp:send(Socket, encode_frame(1, 8, Payload)), State#state{phase = closing}. - loop(State = #state{socket = Socket, ppid = PPid, data = Data, phase = Phase}) -> receive