diff --git a/rebar.config b/rebar.config index 22fbccaf9..89b9436af 100644 --- a/rebar.config +++ b/rebar.config @@ -2,9 +2,9 @@ [{jsx, "2.9.0"}, % hex {cowboy, "2.6.1"}, % hex {gproc, "0.8.0"}, % hex - {ekka, "0.5.6"}, % hex {replayq, "0.1.1"}, %hex {esockd, "5.5.0"}, %hex + {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.7"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}}, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}. diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl index e84665488..cf5f79ee5 100644 --- a/src/emqx_alarm_handler.erl +++ b/src/emqx_alarm_handler.erl @@ -113,7 +113,7 @@ handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) -> set_alarm_(AlarmId, AlarmDesc), {ok, State}; handle_event({clear_alarm, AlarmId}, State) -> - ?LOG(warning, "Clear Alarm: ~p", [AlarmId]), + ?LOG(info, "Clear Alarm: ~p", [AlarmId]), case encode_alarm({AlarmId, undefined}) of {ok, Json} -> emqx_broker:safe_publish(alarm_msg(topic(clear), Json)); diff --git a/src/emqx_app.erl b/src/emqx_app.erl index 8e8ce4c1b..f2c6b6dec 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -38,8 +38,6 @@ start(_Type, _Args) -> register(emqx, self()), emqx_alarm_handler:load(), - emqx_logger_handler:init(), - print_vsn(), {ok, Sup}. diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 2a16bb348..65d34d740 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -209,7 +209,7 @@ idle(enter, _, State) -> keep_state_and_data; idle(timeout, _Timeout, State) -> - {stop, idle_timeout, State}; + {stop, {shutdown, idle_timeout}, State}; idle(cast, {incoming, Packet}, State) -> handle_incoming(Packet, fun(NState) -> diff --git a/src/emqx_logger_handler.erl b/src/emqx_logger_handler.erl deleted file mode 100644 index e0f5d9af4..000000000 --- a/src/emqx_logger_handler.erl +++ /dev/null @@ -1,43 +0,0 @@ -%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_logger_handler). - --export([log/2]). - --export([init/0]). - -init() -> - logger:add_handler(emqx_logger_handler, - emqx_logger_handler, - #{level => error, - filters => [{easy_filter, {fun filter_by_level/2, []}}], - filters_default => stop}). - --spec log(LogEvent, Config) -> ok when LogEvent :: logger:log_event(), Config :: logger:handler_config(). -log(#{msg := {report, #{report := [{supervisor, SupName}, - {errorContext, Error}, - {reason, Reason}, - {offender, _}]}}}, _Config) -> - alarm_handler:set_alarm({supervisor_report, [{supervisor, SupName}, - {errorContext, Error}, - {reason, Reason}]}), - ok; -log(_LogEvent, _Config) -> - ok. - -filter_by_level(LogEvent = #{level := error}, _Extra) -> - LogEvent; -filter_by_level(_LogEvent, _Extra) -> - stop. diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 7d1ecc04e..b88c82df6 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -891,11 +891,11 @@ check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}}) check_sub_acl(TopicFilters, #pstate{zone = Zone, credentials = Credentials}) -> EnableAcl = emqx_zone:get_env(Zone, enable_acl, false), lists:foldr( - fun({Topic, SubOpts}, {Ok, Acc}) when EnableAcl -> - AllowTerm = {Ok, [{Topic, SubOpts}|Acc]}, + fun({Topic, SubOpts}, {ok, Acc}) when EnableAcl -> + AllowTerm = {ok, [{Topic, SubOpts}|Acc]}, DenyTerm = {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]}, do_acl_check(subscribe, Credentials, Topic, AllowTerm, DenyTerm); - (TopicFilter, Acc) -> + (TopicFilter, {ok, Acc}) -> {ok, [TopicFilter | Acc]} end, {ok, []}, TopicFilters). diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 17d07ca0a..392ce77f5 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -301,10 +301,10 @@ pubcomp(SPid, PacketId, ReasonCode) -> -spec(unsubscribe(spid(), emqx_types:topic_table()) -> ok). unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> - TopicFilters = lists:map(fun({RawTopic, Opts}) -> - emqx_topic:parse(RawTopic, Opts); - (RawTopic) when is_binary(RawTopic) -> - emqx_topic:parse(RawTopic) + TopicFilters = lists:map(fun({RawTopic, Opts}) -> + emqx_topic:parse(RawTopic, Opts); + (RawTopic) when is_binary(RawTopic) -> + emqx_topic:parse(RawTopic) end, RawTopicFilters), unsubscribe(SPid, undefined, #{}, TopicFilters). diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index 771883be2..94ed70633 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -312,9 +312,12 @@ terminate(SockError, _Req, #state{keepalive = Keepalive, case {ProtoState, Shutdown} of {undefined, _} -> ok; {_, {shutdown, Reason}} -> - emqx_protocol:terminate(Reason, ProtoState); + emqx_protocol:terminate(Reason, ProtoState), + exit(Reason); {_, Error} -> - emqx_protocol:terminate(Error, ProtoState) + ?LOG(error, "Unexpected terminated for ~p", [Error]), + emqx_protocol:terminate(Error, ProtoState), + exit(unknown) end. %%-------------------------------------------------------------------- @@ -334,8 +337,6 @@ handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) -> shutdown(Error, State#state{proto_state = NProtoState}) end. - - ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined, idle_timeout = IdleTimeout}) -> @@ -345,7 +346,7 @@ ensure_stats_timer(State) -> shutdown(Reason, State) -> %% Fix the issue#2591(https://github.com/emqx/emqx/issues/2591#issuecomment-500278696) - self() ! {stop, Reason}, + self() ! {stop, State#state{shutdown = {shutdown, Reason}}}, {ok, State}. wsock_stats() -> diff --git a/test/emqx_SUITE_data/slave.config b/test/emqx_SUITE_data/slave.config deleted file mode 100644 index 1cdf851b5..000000000 --- a/test/emqx_SUITE_data/slave.config +++ /dev/null @@ -1,45 +0,0 @@ -[{emqx, - [{plugins_loaded_file,"loaded_plugins"}, - {plugins_etc_dir,"plugins/"}, - {broker_sys_interval,60}, - {cache_acl,true}, - {allow_anonymous,true}, - {license_file,"../../etc/emqx.lic"}, - {protocol,[{max_clientid_len,1024},{max_packet_size,65536}]}, - {client, - [{max_publish_rate,5},{idle_timeout,30000},{enable_stats,60000}]}, - {session, - [{max_subscriptions,0}, - {upgrade_qos,false}, - {max_inflight,32}, - {retry_interval,20000}, - {max_awaiting_rel,100}, - {await_rel_timeout,20000}, - {enable_stats,60000}, - {expiry_interval,7200000}]}, - {mqueue, - [{priority,[]}, - {type,simple}, - {max_length,infinity}, - {low_watermark,0.2}, - {high_watermark,0.6}, - {store_qos0,true}]}, - {pubsub,[{pool_size,8},{by_clientid,true},{async,true}]}, - {bridge,[{max_queue_len,10000},{ping_down_interval,1}]}, - {listeners, []}, - {sysmon, - [{long_gc,false}, - {long_schedule,240}, - {large_heap,8388608}, - {busy_port,false}, - {busy_dist_port,true}]}]}, - {gen_rpc, - [{socket_keepalive_count,2}, - {socket_keepalive_interval,5}, - {socket_keepalive_idle,5}, - {call_receive_timeout,15000}, - {authentication_timeout,5000}, - {send_timeout,5000}, - {connect_timeout,5000}, - {tcp_client_port,5369}, - {tcp_server_port,7369}]}]. diff --git a/test/emqx_alarm_handler_SUITE.erl b/test/emqx_alarm_handler_SUITE.erl index c918ce4a7..f6aba8a1f 100644 --- a/test/emqx_alarm_handler_SUITE.erl +++ b/test/emqx_alarm_handler_SUITE.erl @@ -24,8 +24,7 @@ -include("emqx_mqtt.hrl"). -include("emqx.hrl"). -all() -> [t_alarm_handler, - t_logger_handler]. +all() -> [t_alarm_handler]. init_per_suite(Config) -> emqx_ct_helpers:start_apps([], fun set_special_configs/1), @@ -97,23 +96,6 @@ t_alarm_handler(_) -> end). -t_logger_handler(_) -> - %% Meck supervisor report - logger:log(error, #{label => {supervisor, start_error}, - report => [{supervisor, {local, tmp_sup}}, - {errorContext, shutdown}, - {reason, reached_max_restart_intensity}, - {offender, [{pid, meck}, - {id, meck}, - {mfargs, {meck, start_link, []}}, - {restart_type, permanent}, - {shutdown, 5000}, - {child_type, worker}]}]}, - #{logger_formatter => #{title => "SUPERVISOR REPORT"}, - report_cb => fun logger:format_otp_report/1}), - timer:sleep(20), - ?assertEqual(true, lists:keymember(supervisor_report, 1, emqx_alarm_handler:get_alarms())). - raw_send_serialize(Packet) -> emqx_frame:serialize(Packet). diff --git a/test/emqx_ws_channel_SUITE.erl b/test/emqx_ws_channel_SUITE.erl index ada11a95a..0ce31b464 100644 --- a/test/emqx_ws_channel_SUITE.erl +++ b/test/emqx_ws_channel_SUITE.erl @@ -28,10 +28,23 @@ username = <<"admin">>, password = <<"public">>})). +-define(WILL_TOPIC, <<"test/websocket/will">>). + +-define(WILL_CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{ + client_id = <<"mqtt_client">>, + username = <<"admin">>, + password = <<"public">>, + will_flag = true, + will_qos = ?QOS_1, + will_topic = ?WILL_TOPIC, + will_payload = <<"payload">> + })). + all() -> [ t_ws_connect_api , t_ws_auth_failure , t_ws_other_type_frame + , t_ws_will ]. init_per_suite(Config) -> @@ -41,6 +54,22 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). +t_ws_will(_Config) -> + {ok, ClientPid} = emqx_client:start_link(), + {ok, _} = emqx_client:connect(ClientPid), + {ok, _, [1]} = emqx_client:subscribe(ClientPid, ?WILL_TOPIC, qos1), + WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), + {ok, _} = rfc6455_client:open(WS), + Packet = raw_send_serialize(?WILL_CLIENT), + ok = rfc6455_client:send_binary(WS, Packet), + {binary, Bin} = rfc6455_client:recv(WS), + Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT), + {ok, Connack, <<>>, _} = raw_recv_pase(Bin), + exit(WS, abnomal), + ?assertEqual(1, length(emqx_client_SUITE:receive_messages(1))), + ok = emqx_client:disconnect(ClientPid), + ok. + t_ws_auth_failure(_Config) -> application:set_env(emqx, allow_anonymous, false), WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), diff --git a/test/rfc6455_client.erl b/test/rfc6455_client.erl index 987b72407..18b094a76 100644 --- a/test/rfc6455_client.erl +++ b/test/rfc6455_client.erl @@ -186,7 +186,6 @@ do_close(State = #state{socket = Socket}, {Code, Reason}) -> gen_tcp:send(Socket, encode_frame(1, 8, Payload)), State#state{phase = closing}. - loop(State = #state{socket = Socket, ppid = PPid, data = Data, phase = Phase}) -> receive