Merge pull request #2687 from emqx/master
Auto-pull-request-by-2019-07-12
This commit is contained in:
commit
1c17d514aa
|
@ -2,9 +2,9 @@
|
||||||
[{jsx, "2.9.0"}, % hex
|
[{jsx, "2.9.0"}, % hex
|
||||||
{cowboy, "2.6.1"}, % hex
|
{cowboy, "2.6.1"}, % hex
|
||||||
{gproc, "0.8.0"}, % hex
|
{gproc, "0.8.0"}, % hex
|
||||||
{ekka, "0.5.6"}, % hex
|
|
||||||
{replayq, "0.1.1"}, %hex
|
{replayq, "0.1.1"}, %hex
|
||||||
{esockd, "5.5.0"}, %hex
|
{esockd, "5.5.0"}, %hex
|
||||||
|
{ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.7"}}},
|
||||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}},
|
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}},
|
||||||
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
||||||
]}.
|
]}.
|
||||||
|
|
|
@ -113,7 +113,7 @@ handle_event({set_alarm, Alarm = {AlarmId, AlarmDesc}}, State) ->
|
||||||
set_alarm_(AlarmId, AlarmDesc),
|
set_alarm_(AlarmId, AlarmDesc),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
handle_event({clear_alarm, AlarmId}, State) ->
|
handle_event({clear_alarm, AlarmId}, State) ->
|
||||||
?LOG(warning, "Clear Alarm: ~p", [AlarmId]),
|
?LOG(info, "Clear Alarm: ~p", [AlarmId]),
|
||||||
case encode_alarm({AlarmId, undefined}) of
|
case encode_alarm({AlarmId, undefined}) of
|
||||||
{ok, Json} ->
|
{ok, Json} ->
|
||||||
emqx_broker:safe_publish(alarm_msg(topic(clear), Json));
|
emqx_broker:safe_publish(alarm_msg(topic(clear), Json));
|
||||||
|
|
|
@ -38,8 +38,6 @@ start(_Type, _Args) ->
|
||||||
register(emqx, self()),
|
register(emqx, self()),
|
||||||
|
|
||||||
emqx_alarm_handler:load(),
|
emqx_alarm_handler:load(),
|
||||||
emqx_logger_handler:init(),
|
|
||||||
|
|
||||||
print_vsn(),
|
print_vsn(),
|
||||||
{ok, Sup}.
|
{ok, Sup}.
|
||||||
|
|
||||||
|
|
|
@ -209,7 +209,7 @@ idle(enter, _, State) ->
|
||||||
keep_state_and_data;
|
keep_state_and_data;
|
||||||
|
|
||||||
idle(timeout, _Timeout, State) ->
|
idle(timeout, _Timeout, State) ->
|
||||||
{stop, idle_timeout, State};
|
{stop, {shutdown, idle_timeout}, State};
|
||||||
|
|
||||||
idle(cast, {incoming, Packet}, State) ->
|
idle(cast, {incoming, Packet}, State) ->
|
||||||
handle_incoming(Packet, fun(NState) ->
|
handle_incoming(Packet, fun(NState) ->
|
||||||
|
|
|
@ -1,43 +0,0 @@
|
||||||
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
|
|
||||||
-module(emqx_logger_handler).
|
|
||||||
|
|
||||||
-export([log/2]).
|
|
||||||
|
|
||||||
-export([init/0]).
|
|
||||||
|
|
||||||
init() ->
|
|
||||||
logger:add_handler(emqx_logger_handler,
|
|
||||||
emqx_logger_handler,
|
|
||||||
#{level => error,
|
|
||||||
filters => [{easy_filter, {fun filter_by_level/2, []}}],
|
|
||||||
filters_default => stop}).
|
|
||||||
|
|
||||||
-spec log(LogEvent, Config) -> ok when LogEvent :: logger:log_event(), Config :: logger:handler_config().
|
|
||||||
log(#{msg := {report, #{report := [{supervisor, SupName},
|
|
||||||
{errorContext, Error},
|
|
||||||
{reason, Reason},
|
|
||||||
{offender, _}]}}}, _Config) ->
|
|
||||||
alarm_handler:set_alarm({supervisor_report, [{supervisor, SupName},
|
|
||||||
{errorContext, Error},
|
|
||||||
{reason, Reason}]}),
|
|
||||||
ok;
|
|
||||||
log(_LogEvent, _Config) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
filter_by_level(LogEvent = #{level := error}, _Extra) ->
|
|
||||||
LogEvent;
|
|
||||||
filter_by_level(_LogEvent, _Extra) ->
|
|
||||||
stop.
|
|
|
@ -891,11 +891,11 @@ check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}})
|
||||||
check_sub_acl(TopicFilters, #pstate{zone = Zone, credentials = Credentials}) ->
|
check_sub_acl(TopicFilters, #pstate{zone = Zone, credentials = Credentials}) ->
|
||||||
EnableAcl = emqx_zone:get_env(Zone, enable_acl, false),
|
EnableAcl = emqx_zone:get_env(Zone, enable_acl, false),
|
||||||
lists:foldr(
|
lists:foldr(
|
||||||
fun({Topic, SubOpts}, {Ok, Acc}) when EnableAcl ->
|
fun({Topic, SubOpts}, {ok, Acc}) when EnableAcl ->
|
||||||
AllowTerm = {Ok, [{Topic, SubOpts}|Acc]},
|
AllowTerm = {ok, [{Topic, SubOpts}|Acc]},
|
||||||
DenyTerm = {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]},
|
DenyTerm = {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]},
|
||||||
do_acl_check(subscribe, Credentials, Topic, AllowTerm, DenyTerm);
|
do_acl_check(subscribe, Credentials, Topic, AllowTerm, DenyTerm);
|
||||||
(TopicFilter, Acc) ->
|
(TopicFilter, {ok, Acc}) ->
|
||||||
{ok, [TopicFilter | Acc]}
|
{ok, [TopicFilter | Acc]}
|
||||||
end, {ok, []}, TopicFilters).
|
end, {ok, []}, TopicFilters).
|
||||||
|
|
||||||
|
|
|
@ -301,10 +301,10 @@ pubcomp(SPid, PacketId, ReasonCode) ->
|
||||||
|
|
||||||
-spec(unsubscribe(spid(), emqx_types:topic_table()) -> ok).
|
-spec(unsubscribe(spid(), emqx_types:topic_table()) -> ok).
|
||||||
unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
|
unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
|
||||||
TopicFilters = lists:map(fun({RawTopic, Opts}) ->
|
TopicFilters = lists:map(fun({RawTopic, Opts}) ->
|
||||||
emqx_topic:parse(RawTopic, Opts);
|
emqx_topic:parse(RawTopic, Opts);
|
||||||
(RawTopic) when is_binary(RawTopic) ->
|
(RawTopic) when is_binary(RawTopic) ->
|
||||||
emqx_topic:parse(RawTopic)
|
emqx_topic:parse(RawTopic)
|
||||||
end, RawTopicFilters),
|
end, RawTopicFilters),
|
||||||
unsubscribe(SPid, undefined, #{}, TopicFilters).
|
unsubscribe(SPid, undefined, #{}, TopicFilters).
|
||||||
|
|
||||||
|
|
|
@ -312,9 +312,12 @@ terminate(SockError, _Req, #state{keepalive = Keepalive,
|
||||||
case {ProtoState, Shutdown} of
|
case {ProtoState, Shutdown} of
|
||||||
{undefined, _} -> ok;
|
{undefined, _} -> ok;
|
||||||
{_, {shutdown, Reason}} ->
|
{_, {shutdown, Reason}} ->
|
||||||
emqx_protocol:terminate(Reason, ProtoState);
|
emqx_protocol:terminate(Reason, ProtoState),
|
||||||
|
exit(Reason);
|
||||||
{_, Error} ->
|
{_, Error} ->
|
||||||
emqx_protocol:terminate(Error, ProtoState)
|
?LOG(error, "Unexpected terminated for ~p", [Error]),
|
||||||
|
emqx_protocol:terminate(Error, ProtoState),
|
||||||
|
exit(unknown)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -334,8 +337,6 @@ handle_incoming(Packet, SuccFun, State = #state{proto_state = ProtoState}) ->
|
||||||
shutdown(Error, State#state{proto_state = NProtoState})
|
shutdown(Error, State#state{proto_state = NProtoState})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
ensure_stats_timer(State = #state{enable_stats = true,
|
ensure_stats_timer(State = #state{enable_stats = true,
|
||||||
stats_timer = undefined,
|
stats_timer = undefined,
|
||||||
idle_timeout = IdleTimeout}) ->
|
idle_timeout = IdleTimeout}) ->
|
||||||
|
@ -345,7 +346,7 @@ ensure_stats_timer(State) ->
|
||||||
|
|
||||||
shutdown(Reason, State) ->
|
shutdown(Reason, State) ->
|
||||||
%% Fix the issue#2591(https://github.com/emqx/emqx/issues/2591#issuecomment-500278696)
|
%% Fix the issue#2591(https://github.com/emqx/emqx/issues/2591#issuecomment-500278696)
|
||||||
self() ! {stop, Reason},
|
self() ! {stop, State#state{shutdown = {shutdown, Reason}}},
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
wsock_stats() ->
|
wsock_stats() ->
|
||||||
|
|
|
@ -1,45 +0,0 @@
|
||||||
[{emqx,
|
|
||||||
[{plugins_loaded_file,"loaded_plugins"},
|
|
||||||
{plugins_etc_dir,"plugins/"},
|
|
||||||
{broker_sys_interval,60},
|
|
||||||
{cache_acl,true},
|
|
||||||
{allow_anonymous,true},
|
|
||||||
{license_file,"../../etc/emqx.lic"},
|
|
||||||
{protocol,[{max_clientid_len,1024},{max_packet_size,65536}]},
|
|
||||||
{client,
|
|
||||||
[{max_publish_rate,5},{idle_timeout,30000},{enable_stats,60000}]},
|
|
||||||
{session,
|
|
||||||
[{max_subscriptions,0},
|
|
||||||
{upgrade_qos,false},
|
|
||||||
{max_inflight,32},
|
|
||||||
{retry_interval,20000},
|
|
||||||
{max_awaiting_rel,100},
|
|
||||||
{await_rel_timeout,20000},
|
|
||||||
{enable_stats,60000},
|
|
||||||
{expiry_interval,7200000}]},
|
|
||||||
{mqueue,
|
|
||||||
[{priority,[]},
|
|
||||||
{type,simple},
|
|
||||||
{max_length,infinity},
|
|
||||||
{low_watermark,0.2},
|
|
||||||
{high_watermark,0.6},
|
|
||||||
{store_qos0,true}]},
|
|
||||||
{pubsub,[{pool_size,8},{by_clientid,true},{async,true}]},
|
|
||||||
{bridge,[{max_queue_len,10000},{ping_down_interval,1}]},
|
|
||||||
{listeners, []},
|
|
||||||
{sysmon,
|
|
||||||
[{long_gc,false},
|
|
||||||
{long_schedule,240},
|
|
||||||
{large_heap,8388608},
|
|
||||||
{busy_port,false},
|
|
||||||
{busy_dist_port,true}]}]},
|
|
||||||
{gen_rpc,
|
|
||||||
[{socket_keepalive_count,2},
|
|
||||||
{socket_keepalive_interval,5},
|
|
||||||
{socket_keepalive_idle,5},
|
|
||||||
{call_receive_timeout,15000},
|
|
||||||
{authentication_timeout,5000},
|
|
||||||
{send_timeout,5000},
|
|
||||||
{connect_timeout,5000},
|
|
||||||
{tcp_client_port,5369},
|
|
||||||
{tcp_server_port,7369}]}].
|
|
|
@ -24,8 +24,7 @@
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
|
||||||
all() -> [t_alarm_handler,
|
all() -> [t_alarm_handler].
|
||||||
t_logger_handler].
|
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_ct_helpers:start_apps([], fun set_special_configs/1),
|
emqx_ct_helpers:start_apps([], fun set_special_configs/1),
|
||||||
|
@ -97,23 +96,6 @@ t_alarm_handler(_) ->
|
||||||
|
|
||||||
end).
|
end).
|
||||||
|
|
||||||
t_logger_handler(_) ->
|
|
||||||
%% Meck supervisor report
|
|
||||||
logger:log(error, #{label => {supervisor, start_error},
|
|
||||||
report => [{supervisor, {local, tmp_sup}},
|
|
||||||
{errorContext, shutdown},
|
|
||||||
{reason, reached_max_restart_intensity},
|
|
||||||
{offender, [{pid, meck},
|
|
||||||
{id, meck},
|
|
||||||
{mfargs, {meck, start_link, []}},
|
|
||||||
{restart_type, permanent},
|
|
||||||
{shutdown, 5000},
|
|
||||||
{child_type, worker}]}]},
|
|
||||||
#{logger_formatter => #{title => "SUPERVISOR REPORT"},
|
|
||||||
report_cb => fun logger:format_otp_report/1}),
|
|
||||||
timer:sleep(20),
|
|
||||||
?assertEqual(true, lists:keymember(supervisor_report, 1, emqx_alarm_handler:get_alarms())).
|
|
||||||
|
|
||||||
raw_send_serialize(Packet) ->
|
raw_send_serialize(Packet) ->
|
||||||
emqx_frame:serialize(Packet).
|
emqx_frame:serialize(Packet).
|
||||||
|
|
||||||
|
|
|
@ -28,10 +28,23 @@
|
||||||
username = <<"admin">>,
|
username = <<"admin">>,
|
||||||
password = <<"public">>})).
|
password = <<"public">>})).
|
||||||
|
|
||||||
|
-define(WILL_TOPIC, <<"test/websocket/will">>).
|
||||||
|
|
||||||
|
-define(WILL_CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{
|
||||||
|
client_id = <<"mqtt_client">>,
|
||||||
|
username = <<"admin">>,
|
||||||
|
password = <<"public">>,
|
||||||
|
will_flag = true,
|
||||||
|
will_qos = ?QOS_1,
|
||||||
|
will_topic = ?WILL_TOPIC,
|
||||||
|
will_payload = <<"payload">>
|
||||||
|
})).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
[ t_ws_connect_api
|
[ t_ws_connect_api
|
||||||
, t_ws_auth_failure
|
, t_ws_auth_failure
|
||||||
, t_ws_other_type_frame
|
, t_ws_other_type_frame
|
||||||
|
, t_ws_will
|
||||||
].
|
].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
|
@ -41,6 +54,22 @@ init_per_suite(Config) ->
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
emqx_ct_helpers:stop_apps([]).
|
emqx_ct_helpers:stop_apps([]).
|
||||||
|
|
||||||
|
t_ws_will(_Config) ->
|
||||||
|
{ok, ClientPid} = emqx_client:start_link(),
|
||||||
|
{ok, _} = emqx_client:connect(ClientPid),
|
||||||
|
{ok, _, [1]} = emqx_client:subscribe(ClientPid, ?WILL_TOPIC, qos1),
|
||||||
|
WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
|
||||||
|
{ok, _} = rfc6455_client:open(WS),
|
||||||
|
Packet = raw_send_serialize(?WILL_CLIENT),
|
||||||
|
ok = rfc6455_client:send_binary(WS, Packet),
|
||||||
|
{binary, Bin} = rfc6455_client:recv(WS),
|
||||||
|
Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT),
|
||||||
|
{ok, Connack, <<>>, _} = raw_recv_pase(Bin),
|
||||||
|
exit(WS, abnomal),
|
||||||
|
?assertEqual(1, length(emqx_client_SUITE:receive_messages(1))),
|
||||||
|
ok = emqx_client:disconnect(ClientPid),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_ws_auth_failure(_Config) ->
|
t_ws_auth_failure(_Config) ->
|
||||||
application:set_env(emqx, allow_anonymous, false),
|
application:set_env(emqx, allow_anonymous, false),
|
||||||
WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
|
WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
|
||||||
|
|
|
@ -186,7 +186,6 @@ do_close(State = #state{socket = Socket}, {Code, Reason}) ->
|
||||||
gen_tcp:send(Socket, encode_frame(1, 8, Payload)),
|
gen_tcp:send(Socket, encode_frame(1, 8, Payload)),
|
||||||
State#state{phase = closing}.
|
State#state{phase = closing}.
|
||||||
|
|
||||||
|
|
||||||
loop(State = #state{socket = Socket, ppid = PPid, data = Data,
|
loop(State = #state{socket = Socket, ppid = PPid, data = Data,
|
||||||
phase = Phase}) ->
|
phase = Phase}) ->
|
||||||
receive
|
receive
|
||||||
|
|
Loading…
Reference in New Issue