Merge pull request #4183 from zmstone/merge-e4.2.4-to-dev-4.3.0
Merge e4.2.4 to dev 4.3.0
This commit is contained in:
commit
3dfa9f45c4
|
@ -5,4 +5,4 @@
|
|||
{platform_etc_dir, "etc"}.
|
||||
{platform_lib_dir, "lib"}.
|
||||
{platform_log_dir, "log"}.
|
||||
{platform_plugins_dir, "plugins"}.
|
||||
{platform_plugins_dir, "plugins"}.
|
||||
|
|
|
@ -1596,6 +1596,10 @@ end}.
|
|||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ws.$name.peer_cert_as_username", "emqx.listeners", [
|
||||
{datatype, {enum, [cn, dn, crt]}}
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ws.$name.check_origin_enable", "emqx.listeners", [
|
||||
{datatype, {enum, [true, false]}},
|
||||
{default, false},
|
||||
|
@ -1743,6 +1747,10 @@ end}.
|
|||
{datatype, string}
|
||||
]}.
|
||||
|
||||
{mapping, "listener.wss.$name.dhfile", "emqx.listeners", [
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
||||
{mapping, "listener.wss.$name.depth", "emqx.listeners", [
|
||||
{default, 10},
|
||||
{datatype, integer}
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
|
||||
-module(emqx_access_rule).
|
||||
|
||||
-include("emqx.hrl").
|
||||
|
||||
%% APIs
|
||||
-export([ match/3
|
||||
, compile/1
|
||||
|
|
|
@ -37,6 +37,7 @@
|
|||
-export([ activate/1
|
||||
, activate/2
|
||||
, deactivate/1
|
||||
, deactivate/2
|
||||
, delete_all_deactivated_alarms/0
|
||||
, get_alarms/0
|
||||
, get_alarms/1
|
||||
|
@ -132,7 +133,10 @@ activate(Name, Details) ->
|
|||
gen_server:call(?MODULE, {activate_alarm, Name, Details}).
|
||||
|
||||
deactivate(Name) ->
|
||||
gen_server:call(?MODULE, {deactivate_alarm, Name}).
|
||||
gen_server:call(?MODULE, {deactivate_alarm, Name, no_details}).
|
||||
|
||||
deactivate(Name, Details) ->
|
||||
gen_server:call(?MODULE, {deactivate_alarm, Name, Details}).
|
||||
|
||||
delete_all_deactivated_alarms() ->
|
||||
gen_server:call(?MODULE, delete_all_deactivated_alarms).
|
||||
|
@ -183,34 +187,13 @@ handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Act
|
|||
{reply, ok, State}
|
||||
end;
|
||||
|
||||
handle_call({deactivate_alarm, Name}, _From, State = #state{actions = Actions,
|
||||
size_limit = SizeLimit}) ->
|
||||
handle_call({deactivate_alarm, Name, Details}, _From, State = #state{
|
||||
actions = Actions, size_limit = SizeLimit}) ->
|
||||
case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of
|
||||
[] ->
|
||||
{reply, {error, not_found}, State};
|
||||
[#activated_alarm{name = Name,
|
||||
details = Details,
|
||||
message = Message,
|
||||
activate_at = ActivateAt}] ->
|
||||
case SizeLimit > 0 andalso (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of
|
||||
true ->
|
||||
case mnesia:dirty_first(?DEACTIVATED_ALARM) of
|
||||
'$end_of_table' ->
|
||||
ok;
|
||||
ActivateAt2 ->
|
||||
mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2)
|
||||
end;
|
||||
false ->
|
||||
ok
|
||||
end,
|
||||
Alarm = #deactivated_alarm{activate_at = ActivateAt,
|
||||
name = Name,
|
||||
details = Details,
|
||||
message = Message,
|
||||
deactivate_at = erlang:system_time(microsecond)},
|
||||
mnesia:dirty_delete(?ACTIVATED_ALARM, Name),
|
||||
mnesia:dirty_write(?DEACTIVATED_ALARM, Alarm),
|
||||
do_actions(deactivate, Alarm, Actions),
|
||||
[Alarm] ->
|
||||
deactivate_alarm(Details, SizeLimit, Actions, Alarm),
|
||||
{reply, ok, State}
|
||||
end;
|
||||
|
||||
|
@ -260,23 +243,50 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{
|
||||
activate_at = ActivateAt, name = Name, details = Details0,
|
||||
message = Msg0}) ->
|
||||
case SizeLimit > 0 andalso
|
||||
(mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of
|
||||
true ->
|
||||
case mnesia:dirty_first(?DEACTIVATED_ALARM) of
|
||||
'$end_of_table' -> ok;
|
||||
ActivateAt2 ->
|
||||
mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2)
|
||||
end;
|
||||
false -> ok
|
||||
end,
|
||||
HistoryAlarm = make_deactivated_alarm(ActivateAt, Name, Details0, Msg0,
|
||||
erlang:system_time(microsecond)),
|
||||
DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details,
|
||||
normalize_message(Name, Details),
|
||||
erlang:system_time(microsecond)),
|
||||
mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm),
|
||||
mnesia:dirty_delete(?ACTIVATED_ALARM, Name),
|
||||
do_actions(deactivate, DeActAlarm, Actions).
|
||||
|
||||
make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) ->
|
||||
#deactivated_alarm{
|
||||
activate_at = ActivateAt,
|
||||
name = Name,
|
||||
details = Details,
|
||||
message = Message,
|
||||
deactivate_at = DeActivateAt}.
|
||||
|
||||
deactivate_all_alarms() ->
|
||||
lists:foreach(
|
||||
fun(#activated_alarm{
|
||||
name = Name,
|
||||
details = Details,
|
||||
message = Message,
|
||||
activate_at = ActivateAt
|
||||
}) ->
|
||||
mnesia:dirty_write(?DEACTIVATED_ALARM,
|
||||
#deactivated_alarm{
|
||||
activate_at = ActivateAt,
|
||||
name = Name,
|
||||
details = Details,
|
||||
message = Message,
|
||||
deactivate_at = erlang:system_time(microsecond)
|
||||
})
|
||||
end, ets:tab2list(?ACTIVATED_ALARM)),
|
||||
fun(#activated_alarm{name = Name,
|
||||
details = Details,
|
||||
message = Message,
|
||||
activate_at = ActivateAt}) ->
|
||||
mnesia:dirty_write(?DEACTIVATED_ALARM,
|
||||
#deactivated_alarm{
|
||||
activate_at = ActivateAt,
|
||||
name = Name,
|
||||
details = Details,
|
||||
message = Message,
|
||||
deactivate_at = erlang:system_time(microsecond)})
|
||||
end, ets:tab2list(?ACTIVATED_ALARM)),
|
||||
clear_table(?ACTIVATED_ALARM).
|
||||
|
||||
%% Delete all records from the given table, ignore result.
|
||||
|
@ -355,6 +365,8 @@ normalize(#deactivated_alarm{activate_at = ActivateAt,
|
|||
deactivate_at => DeactivateAt,
|
||||
activated => false}.
|
||||
|
||||
normalize_message(Name, no_details) ->
|
||||
list_to_binary(io_lib:format("~p", [Name]));
|
||||
normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) ->
|
||||
list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark]));
|
||||
normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) ->
|
||||
|
@ -367,8 +379,7 @@ normalize_message(partition, #{occurred := Node}) ->
|
|||
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
|
||||
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
|
||||
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
|
||||
normalize_message(<<"mqtt_conn/congested/", ClientId/binary>>, _) ->
|
||||
list_to_binary(io_lib:format("MQTT connection for clientid '~s' is congested", [ClientId]));
|
||||
normalize_message(<<"mqtt_conn/congested/", Info/binary>>, _) ->
|
||||
list_to_binary(io_lib:format("MQTT connection congested: ~s", [Info]));
|
||||
normalize_message(_Name, _UnknownDetails) ->
|
||||
<<"Unknown alarm">>.
|
||||
|
||||
|
|
|
@ -32,11 +32,11 @@ start(_Type, _Args) ->
|
|||
print_banner(),
|
||||
ekka:start(),
|
||||
{ok, Sup} = emqx_sup:start_link(),
|
||||
ok = start_autocluster(),
|
||||
ok = emqx_plugins:init(),
|
||||
_ = emqx_plugins:load(),
|
||||
emqx_boot:is_enabled(listeners)
|
||||
andalso (ok = emqx_listeners:start()),
|
||||
start_autocluster(),
|
||||
register(emqx, self()),
|
||||
ok = emqx_alarm_handler:load(),
|
||||
print_vsn(),
|
||||
|
@ -63,9 +63,8 @@ print_vsn() ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Autocluster
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
start_autocluster() ->
|
||||
ekka:callback(prepare, fun emqx:shutdown/1),
|
||||
ekka:callback(reboot, fun emqx:reboot/0),
|
||||
ekka:autocluster(?APP).
|
||||
|
||||
_ = ekka:autocluster(?APP), %% returns 'ok' or a pid or 'any()' as in spec
|
||||
ok.
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
|
||||
-export([ info/1
|
||||
, info/2
|
||||
, set_conn_state/2
|
||||
, stats/1
|
||||
, caps/1
|
||||
]).
|
||||
|
@ -87,7 +88,7 @@
|
|||
pendings :: list()
|
||||
}).
|
||||
|
||||
-opaque(channel() :: #channel{}).
|
||||
-type(channel() :: #channel{}).
|
||||
|
||||
-type(conn_state() :: idle | connecting | connected | disconnected).
|
||||
|
||||
|
@ -127,26 +128,26 @@ info(Keys, Channel) when is_list(Keys) ->
|
|||
[{Key, info(Key, Channel)} || Key <- Keys];
|
||||
info(conninfo, #channel{conninfo = ConnInfo}) ->
|
||||
ConnInfo;
|
||||
info(zone, #channel{clientinfo = #{zone := Zone}}) ->
|
||||
Zone;
|
||||
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
|
||||
ClientId;
|
||||
info(username, #channel{clientinfo = #{username := Username}}) ->
|
||||
Username;
|
||||
info(socktype, #channel{conninfo = #{socktype := SockType}}) ->
|
||||
SockType;
|
||||
info(peername, #channel{conninfo = #{peername := Peername}}) ->
|
||||
Peername;
|
||||
info(sockname, #channel{conninfo = #{sockname := Sockname}}) ->
|
||||
Sockname;
|
||||
info(proto_name, #channel{conninfo = #{proto_name := ProtoName}}) ->
|
||||
ProtoName;
|
||||
info(proto_ver, #channel{conninfo = #{proto_ver := ProtoVer}}) ->
|
||||
ProtoVer;
|
||||
info(connected_at, #channel{conninfo = #{connected_at := ConnectedAt}}) ->
|
||||
ConnectedAt;
|
||||
info(socktype, #channel{conninfo = ConnInfo}) ->
|
||||
maps:get(socktype, ConnInfo, undefined);
|
||||
info(peername, #channel{conninfo = ConnInfo}) ->
|
||||
maps:get(peername, ConnInfo, undefined);
|
||||
info(sockname, #channel{conninfo = ConnInfo}) ->
|
||||
maps:get(sockname, ConnInfo, undefined);
|
||||
info(proto_name, #channel{conninfo = ConnInfo}) ->
|
||||
maps:get(proto_name, ConnInfo, undefined);
|
||||
info(proto_ver, #channel{conninfo = ConnInfo}) ->
|
||||
maps:get(proto_ver, ConnInfo, undefined);
|
||||
info(connected_at, #channel{conninfo = ConnInfo}) ->
|
||||
maps:get(connected_at, ConnInfo, undefined);
|
||||
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
|
||||
ClientInfo;
|
||||
info(zone, #channel{clientinfo = ClientInfo}) ->
|
||||
maps:get(zone, ClientInfo, undefined);
|
||||
info(clientid, #channel{clientinfo = ClientInfo}) ->
|
||||
maps:get(clientid, ClientInfo, undefined);
|
||||
info(username, #channel{clientinfo = ClientInfo}) ->
|
||||
maps:get(username, ClientInfo, undefined);
|
||||
info(session, #channel{session = Session}) ->
|
||||
maybe_apply(fun emqx_session:info/1, Session);
|
||||
info(conn_state, #channel{conn_state = ConnState}) ->
|
||||
|
@ -163,6 +164,9 @@ info(alias_maximum, #channel{alias_maximum = Limits}) ->
|
|||
Limits;
|
||||
info(timers, #channel{timers = Timers}) -> Timers.
|
||||
|
||||
set_conn_state(ConnState, Channel) ->
|
||||
Channel#channel{conn_state = ConnState}.
|
||||
|
||||
%% TODO: Add more stats.
|
||||
-spec(stats(channel()) -> emqx_types:stats()).
|
||||
stats(#channel{session = Session})->
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_congestion).
|
||||
|
||||
-export([ maybe_alarm_port_busy/3
|
||||
, maybe_alarm_port_busy/4
|
||||
, maybe_alarm_too_many_publish/5
|
||||
, maybe_alarm_too_many_publish/6
|
||||
, cancel_alarms/3
|
||||
]).
|
||||
|
||||
-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_congestion]}}]).
|
||||
|
||||
-define(ALARM_CONN_CONGEST(Channel, Reason),
|
||||
list_to_binary(
|
||||
io_lib:format("mqtt_conn/congested/~s/~s/~s",
|
||||
[emqx_channel:info(clientid, Channel),
|
||||
maps:get(username, emqx_channel:info(clientinfo, Channel),
|
||||
<<"undefined">>),
|
||||
Reason]))).
|
||||
|
||||
-define(ALARM_CONN_INFO_KEYS, [socktype, sockname, peername, clientid, username,
|
||||
proto_name, proto_ver, connected_at, conn_state]).
|
||||
-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
|
||||
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
|
||||
-define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]).
|
||||
-define(ALARM_SENT(REASON), {alarm_sent, REASON}).
|
||||
-define(ALL_ALARM_REASONS, [port_busy, too_many_publish]).
|
||||
-define(CONFIRM_CLEAR(REASON), {alarm_confirm_clear, REASON}).
|
||||
-define(CONFIRM_CLEAR_INTERVAL, 10000).
|
||||
|
||||
maybe_alarm_port_busy(Socket, Transport, Channel) ->
|
||||
maybe_alarm_port_busy(Socket, Transport, Channel, false).
|
||||
|
||||
maybe_alarm_port_busy(Socket, Transport, Channel, ForceClear) ->
|
||||
case is_tcp_congested(Socket, Transport) of
|
||||
true -> alarm_congestion(Socket, Transport, Channel, port_busy);
|
||||
false -> cancel_alarm_congestion(Socket, Transport, Channel, port_busy,
|
||||
ForceClear)
|
||||
end.
|
||||
|
||||
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
|
||||
MaxBatchSize) ->
|
||||
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
|
||||
MaxBatchSize, false).
|
||||
|
||||
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
|
||||
PubMsgCount = _MaxBatchSize, _ForceClear) ->
|
||||
%% we only alarm it when the process is "too busy"
|
||||
alarm_congestion(Socket, Transport, Channel, too_many_publish);
|
||||
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
|
||||
_MaxBatchSize, ForceClear) when PubMsgCount == 0 ->
|
||||
%% but we clear the alarm until it is really "idle", to avoid sending
|
||||
%% alarms and clears too frequently
|
||||
cancel_alarm_congestion(Socket, Transport, Channel, too_many_publish,
|
||||
ForceClear);
|
||||
maybe_alarm_too_many_publish(_Socket, _Transport, _Channel, _PubMsgCount,
|
||||
_MaxBatchSize, _ForceClear) ->
|
||||
ok.
|
||||
|
||||
cancel_alarms(Socket, Transport, Channel) ->
|
||||
lists:foreach(fun(Reason) ->
|
||||
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason)
|
||||
end, ?ALL_ALARM_REASONS).
|
||||
|
||||
alarm_congestion(Socket, Transport, Channel, Reason) ->
|
||||
case has_alarm_sent(Reason) of
|
||||
false -> do_alarm_congestion(Socket, Transport, Channel, Reason);
|
||||
true ->
|
||||
%% pretend we have sent an alarm again
|
||||
update_alarm_sent_at(Reason)
|
||||
end.
|
||||
|
||||
cancel_alarm_congestion(Socket, Transport, Channel, Reason, ForceClear) ->
|
||||
case is_alarm_allowed_clear(Reason, ForceClear) of
|
||||
true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason);
|
||||
false -> ok
|
||||
end.
|
||||
|
||||
do_alarm_congestion(Socket, Transport, Channel, Reason) ->
|
||||
ok = update_alarm_sent_at(Reason),
|
||||
AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel),
|
||||
emqx_alarm:activate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails),
|
||||
ok.
|
||||
|
||||
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
|
||||
ok = remove_alarm_sent_at(Reason),
|
||||
AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel),
|
||||
emqx_alarm:deactivate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails),
|
||||
ok.
|
||||
|
||||
is_tcp_congested(Socket, Transport) ->
|
||||
case Transport:getstat(Socket, [send_pend]) of
|
||||
{ok, [{send_pend, N}]} when N > 0 -> true;
|
||||
_ -> false
|
||||
end.
|
||||
|
||||
has_alarm_sent(Reason) ->
|
||||
case get_alarm_sent_at(Reason) of
|
||||
0 -> false;
|
||||
_ -> true
|
||||
end.
|
||||
update_alarm_sent_at(Reason) ->
|
||||
erlang:put(?ALARM_SENT(Reason), timenow()),
|
||||
ok.
|
||||
remove_alarm_sent_at(Reason) ->
|
||||
erlang:erase(?ALARM_SENT(Reason)),
|
||||
ok.
|
||||
get_alarm_sent_at(Reason) ->
|
||||
case erlang:get(?ALARM_SENT(Reason)) of
|
||||
undefined -> 0;
|
||||
LastSentAt -> LastSentAt
|
||||
end.
|
||||
|
||||
is_alarm_allowed_clear(Reason, _ForceClear = true) ->
|
||||
has_alarm_sent(Reason);
|
||||
is_alarm_allowed_clear(Reason, _ForceClear = false) ->
|
||||
%% only sent clears when the alarm was not triggered in the last
|
||||
%% ?CONFIRM_CLEAR_INTERVAL time
|
||||
case timenow() - get_alarm_sent_at(Reason) of
|
||||
Elapse when Elapse >= ?CONFIRM_CLEAR_INTERVAL -> true;
|
||||
_ -> false
|
||||
end.
|
||||
|
||||
timenow() ->
|
||||
erlang:system_time(millisecond).
|
||||
|
||||
%%==============================================================================
|
||||
%% Alarm message
|
||||
%%==============================================================================
|
||||
tcp_congestion_alarm_details(Socket, Transport, Channel) ->
|
||||
ProcInfo = process_info(self(), ?PROC_INFO_KEYS),
|
||||
BasicInfo = [{pid, list_to_binary(pid_to_list(self()))} | ProcInfo],
|
||||
Stat = case Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS) of
|
||||
{ok, Stat0} -> Stat0;
|
||||
{error, _} -> []
|
||||
end,
|
||||
Opts = case Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS) of
|
||||
{ok, Opts0} -> Opts0;
|
||||
{error, _} -> []
|
||||
end,
|
||||
SockInfo = Stat ++ Opts,
|
||||
ConnInfo = [conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS],
|
||||
maps:from_list(BasicInfo ++ ConnInfo ++ SockInfo).
|
||||
|
||||
conn_info(Key, Channel) when Key =:= sockname; Key =:= peername ->
|
||||
{IPStr, Port} = emqx_channel:info(Key, Channel),
|
||||
{Key, iolist_to_binary([inet:ntoa(IPStr), ":", integer_to_list(Port)])};
|
||||
conn_info(Key, Channel) ->
|
||||
{Key, emqx_channel:info(Key, Channel)}.
|
|
@ -29,7 +29,7 @@
|
|||
-compile(nowarn_export_all).
|
||||
-endif.
|
||||
|
||||
-elvis([{elvis_style, invalid_dynamic_call, #{ ignore => [emqx_connection]}}]).
|
||||
-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_connection]}}]).
|
||||
|
||||
%% API
|
||||
-export([ start_link/3
|
||||
|
@ -276,7 +276,7 @@ recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) ->
|
|||
Msg ->
|
||||
process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State))
|
||||
after
|
||||
IdleTimeout ->
|
||||
IdleTimeout + 100 ->
|
||||
hibernate(Parent, cancel_stats_timer(State))
|
||||
end.
|
||||
|
||||
|
@ -389,8 +389,12 @@ handle_msg({Passive, _Sock}, State)
|
|||
handle_info(activate_socket, NState1);
|
||||
|
||||
handle_msg(Deliver = {deliver, _Topic, _Msg},
|
||||
State = #state{active_n = ActiveN}) ->
|
||||
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
|
||||
#state{active_n = MaxBatchSize, transport = Transport,
|
||||
socket = Socket, channel = Channel} = State) ->
|
||||
Delivers0 = emqx_misc:drain_deliver(MaxBatchSize),
|
||||
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
|
||||
length(Delivers0), MaxBatchSize),
|
||||
Delivers = [Deliver|Delivers0],
|
||||
with_channel(handle_deliver, [Delivers], State);
|
||||
|
||||
%% Something sent
|
||||
|
@ -443,10 +447,12 @@ handle_msg(Msg, State) ->
|
|||
%% Terminate
|
||||
|
||||
-spec terminate(any(), state()) -> no_return().
|
||||
terminate(Reason, State = #state{channel = Channel}) ->
|
||||
terminate(Reason, State = #state{channel = Channel, transport = Transport,
|
||||
socket = Socket}) ->
|
||||
?LOG(debug, "Terminated due to ~p", [Reason]),
|
||||
emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)),
|
||||
emqx_channel:terminate(Reason, Channel),
|
||||
Channel1 = emqx_channel:set_conn_state(disconnected, Channel),
|
||||
emqx_congestion:cancel_alarms(Socket, Transport, Channel1),
|
||||
emqx_channel:terminate(Reason, Channel1),
|
||||
_ = close_socket(State),
|
||||
exit(Reason).
|
||||
|
||||
|
@ -502,8 +508,12 @@ handle_timeout(_TRef, limit_timeout, State) ->
|
|||
},
|
||||
handle_info(activate_socket, NState);
|
||||
|
||||
handle_timeout(_TRef, emit_stats, State =
|
||||
#state{channel = Channel}) ->
|
||||
handle_timeout(_TRef, emit_stats, State = #state{active_n = MaxBatchSize,
|
||||
channel = Channel, transport = Transport, socket = Socket}) ->
|
||||
{_, MsgQLen} = erlang:process_info(self(), message_queue_len),
|
||||
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel, true),
|
||||
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
|
||||
MsgQLen, MaxBatchSize, true),
|
||||
ClientId = emqx_channel:info(clientid, Channel),
|
||||
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||
{ok, State#state{stats_timer = undefined}};
|
||||
|
@ -616,7 +626,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
|
|||
Oct = iolist_size(IoData),
|
||||
ok = emqx_metrics:inc('bytes.sent', Oct),
|
||||
inc_counter(outgoing_bytes, Oct),
|
||||
maybe_warn_congestion(Socket, Transport, Channel),
|
||||
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel),
|
||||
case Transport:async_send(Socket, IoData, [nosuspend]) of
|
||||
ok -> ok;
|
||||
Error = {error, _Reason} ->
|
||||
|
@ -625,48 +635,6 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
|
|||
ok
|
||||
end.
|
||||
|
||||
maybe_warn_congestion(Socket, Transport, Channel) ->
|
||||
IsCongestAlarmSet = is_congestion_alarm_set(),
|
||||
case is_congested(Socket, Transport) of
|
||||
true when not IsCongestAlarmSet ->
|
||||
ok = set_congestion_alarm(),
|
||||
emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel),
|
||||
tcp_congestion_alarm_details(Socket, Transport, Channel));
|
||||
false when IsCongestAlarmSet ->
|
||||
ok = clear_congestion_alarm(),
|
||||
emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel));
|
||||
_ -> ok
|
||||
end.
|
||||
|
||||
is_congested(Socket, Transport) ->
|
||||
case Transport:getstat(Socket, [send_pend]) of
|
||||
{ok, [{send_pend, N}]} when N > 0 -> true;
|
||||
_ -> false
|
||||
end.
|
||||
|
||||
is_congestion_alarm_set() ->
|
||||
case erlang:get(conn_congested) of
|
||||
true -> true;
|
||||
_ -> false
|
||||
end.
|
||||
set_congestion_alarm() ->
|
||||
erlang:put(conn_congested, true), ok.
|
||||
clear_congestion_alarm() ->
|
||||
erlang:put(conn_congested, false), ok.
|
||||
|
||||
tcp_congestion_alarm_details(Socket, Transport, Channel) ->
|
||||
{ok, Stat} = Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS),
|
||||
{ok, Opts} = Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS),
|
||||
SockInfo = maps:from_list(Stat ++ Opts),
|
||||
ConnInfo = maps:from_list([conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS]),
|
||||
maps:merge(ConnInfo, SockInfo).
|
||||
|
||||
conn_info(Key, Channel) when Key =:= sockname; Key =:= peername ->
|
||||
{IPStr, Port} = emqx_channel:info(Key, Channel),
|
||||
{Key, iolist_to_binary([inet:ntoa(IPStr), ":", integer_to_list(Port)])};
|
||||
conn_info(Key, Channel) ->
|
||||
{Key, emqx_channel:info(Key, Channel)}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Handle Info
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
-include("logger.hrl").
|
||||
-include("types.hrl").
|
||||
-include("emqx_mqtt.hrl").
|
||||
-include("emqx.hrl").
|
||||
|
||||
-logger_header("[Metrics]").
|
||||
|
||||
|
|
|
@ -145,12 +145,12 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer,
|
|||
case emqx_vm:cpu_util() of %% TODO: should be improved?
|
||||
0 ->
|
||||
State#{timer := undefined};
|
||||
Busy when Busy / 100 >= CPUHighWatermark ->
|
||||
Busy when Busy >= CPUHighWatermark ->
|
||||
emqx_alarm:activate(high_cpu_usage, #{usage => Busy,
|
||||
high_watermark => CPUHighWatermark,
|
||||
low_watermark => CPULowWatermark}),
|
||||
ensure_check_timer(State);
|
||||
Busy when Busy / 100 =< CPULowWatermark ->
|
||||
Busy when Busy =< CPULowWatermark ->
|
||||
emqx_alarm:deactivate(high_cpu_usage),
|
||||
ensure_check_timer(State);
|
||||
_Busy ->
|
||||
|
|
|
@ -56,7 +56,7 @@ t_clean_acl_cache(_) ->
|
|||
emqtt:stop(Client).
|
||||
|
||||
% optimize??
|
||||
t_reload_aclfile_and_cleanall(Config) ->
|
||||
t_reload_aclfile_and_cleanall(_Config) ->
|
||||
|
||||
RasieMsg = fun() -> Self = self(), #{puback => fun(Msg) -> Self ! {puback, Msg} end,
|
||||
disconnected => fun(_) -> ok end,
|
||||
|
@ -79,27 +79,6 @@ t_reload_aclfile_and_cleanall(Config) ->
|
|||
%% Check acl cache list
|
||||
[ClientPid] = emqx_cm:lookup_channels(<<"emqx_c">>),
|
||||
?assert(length(gen_server:call(ClientPid, list_acl_cache)) > 0),
|
||||
|
||||
%% Update acl file and reload mod_acl_internal
|
||||
Path = filename:join([testdir(proplists:get_value(data_dir, Config)), "acl2.conf"]),
|
||||
ok = file:write_file(Path, <<"{deny, all}.">>),
|
||||
OldPath = emqx:get_env(acl_file),
|
||||
% application:set_env(emqx, acl_file, Path),
|
||||
emqx_mod_acl_internal:reload([{acl_file, Path}]),
|
||||
|
||||
?assert(length(gen_server:call(ClientPid, list_acl_cache)) == 0),
|
||||
{ok, PktId2} = emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, qos1),
|
||||
|
||||
receive
|
||||
{puback, #{packet_id := PktId2, reason_code := Rc2}} ->
|
||||
%% Not authorized
|
||||
?assertEqual(16#87, Rc2);
|
||||
_ ->
|
||||
?assert(false)
|
||||
end,
|
||||
application:set_env(emqx, acl_file, OldPath),
|
||||
file:delete(Path),
|
||||
emqx_mod_acl_internal:reload([{acl_file, OldPath}]),
|
||||
emqtt:stop(Client).
|
||||
|
||||
%% @private
|
||||
|
|
|
@ -54,6 +54,7 @@ init_per_suite(Config) ->
|
|||
|
||||
ok = meck:expect(emqx_alarm, activate, fun(_, _) -> ok end),
|
||||
ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end),
|
||||
ok = meck:expect(emqx_alarm, deactivate, fun(_, _) -> ok end),
|
||||
|
||||
Config.
|
||||
|
||||
|
@ -77,6 +78,9 @@ init_per_testcase(_TestCase, Config) ->
|
|||
(peercert, [sock]) -> undefined
|
||||
end),
|
||||
ok = meck:expect(emqx_transport, setopts, fun(_Sock, _Opts) -> ok end),
|
||||
ok = meck:expect(emqx_transport, getopts, fun(_Sock, Options) ->
|
||||
{ok, [{K, 0} || K <- Options]}
|
||||
end),
|
||||
ok = meck:expect(emqx_transport, getstat, fun(_Sock, Options) ->
|
||||
{ok, [{K, 0} || K <- Options]}
|
||||
end),
|
||||
|
|
|
@ -183,11 +183,7 @@ t_batch_subscribe(_) ->
|
|||
{ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"batch_test">>}]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
application:set_env(emqx, enable_acl_cache, false),
|
||||
TempAcl = emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_temp.conf"),
|
||||
file:write_file(TempAcl, "{deny, {client, \"batch_test\"}, subscribe,
|
||||
[\"t1\", \"t2\", \"t3\"]}.\n"),
|
||||
timer:sleep(10),
|
||||
emqx_mod_acl_internal:reload([{acl_file, TempAcl}]),
|
||||
application:set_env(emqx, acl_nomatch, deny),
|
||||
{ok, _, [?RC_NOT_AUTHORIZED,
|
||||
?RC_NOT_AUTHORIZED,
|
||||
?RC_NOT_AUTHORIZED]} = emqtt:subscribe(Client, [{<<"t1">>, qos1},
|
||||
|
@ -198,7 +194,7 @@ t_batch_subscribe(_) ->
|
|||
?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(Client, [<<"t1">>,
|
||||
<<"t2">>,
|
||||
<<"t3">>]),
|
||||
file:delete(TempAcl),
|
||||
application:set_env(emqx, acl_nomatch, allow),
|
||||
emqtt:disconnect(Client).
|
||||
|
||||
t_connect_will_retain(_) ->
|
||||
|
@ -336,64 +332,63 @@ t_connect_session_expiry_interval(_) ->
|
|||
ok = emqtt:disconnect(Client3).
|
||||
|
||||
%% [MQTT-3.1.3-9]
|
||||
t_connect_will_delay_interval(_) ->
|
||||
process_flag(trap_exit, true),
|
||||
Topic = nth(1, ?TOPICS),
|
||||
Payload = "will message",
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
|
||||
|
||||
{ok, Client2} = emqtt:start_link([
|
||||
{clientid, <<"t_connect_will_delay_interval">>},
|
||||
{proto_ver, v5},
|
||||
{clean_start, true},
|
||||
{will_flag, true},
|
||||
{will_qos, 2},
|
||||
{will_topic, Topic},
|
||||
{will_payload, Payload},
|
||||
{will_props, #{'Will-Delay-Interval' => 3}},
|
||||
{properties, #{'Session-Expiry-Interval' => 7200}}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
%% terminate the client without sending the DISCONNECT
|
||||
emqtt:stop(Client2),
|
||||
%% should not get the will msg in 2.5s
|
||||
timer:sleep(1500),
|
||||
?assertEqual(0, length(receive_messages(1))),
|
||||
%% should get the will msg in 4.5s
|
||||
timer:sleep(1000),
|
||||
?assertEqual(1, length(receive_messages(1))),
|
||||
|
||||
%% try again, but let the session expire quickly
|
||||
{ok, Client3} = emqtt:start_link([
|
||||
{clientid, <<"t_connect_will_delay_interval">>},
|
||||
{proto_ver, v5},
|
||||
{clean_start, true},
|
||||
{will_flag, true},
|
||||
{will_qos, 2},
|
||||
{will_topic, Topic},
|
||||
{will_payload, Payload},
|
||||
{will_props, #{'Will-Delay-Interval' => 7200}},
|
||||
{properties, #{'Session-Expiry-Interval' => 3}}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(Client3),
|
||||
%% terminate the client without sending the DISCONNECT
|
||||
emqtt:stop(Client3),
|
||||
%% should not get the will msg in 2.5s
|
||||
timer:sleep(1500),
|
||||
?assertEqual(0, length(receive_messages(1))),
|
||||
%% should get the will msg in 4.5s
|
||||
timer:sleep(1000),
|
||||
?assertEqual(1, length(receive_messages(1))),
|
||||
|
||||
ok = emqtt:disconnect(Client1),
|
||||
|
||||
receive {'EXIT', _, _} -> ok
|
||||
after 100 -> ok
|
||||
end,
|
||||
process_flag(trap_exit, false).
|
||||
%% !!!REFACTOR NEED:
|
||||
%t_connect_will_delay_interval(_) ->
|
||||
% process_flag(trap_exit, true),
|
||||
% Topic = nth(1, ?TOPICS),
|
||||
% Payload = "will message",
|
||||
%
|
||||
% {ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
% {ok, _} = emqtt:connect(Client1),
|
||||
% {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
|
||||
%
|
||||
% {ok, Client2} = emqtt:start_link([
|
||||
% {clientid, <<"t_connect_will_delay_interval">>},
|
||||
% {proto_ver, v5},
|
||||
% {clean_start, true},
|
||||
% {will_flag, true},
|
||||
% {will_qos, 2},
|
||||
% {will_topic, Topic},
|
||||
% {will_payload, Payload},
|
||||
% {will_props, #{'Will-Delay-Interval' => 3}},
|
||||
% {properties, #{'Session-Expiry-Interval' => 7200}},
|
||||
% {keepalive, 2}
|
||||
% ]),
|
||||
% {ok, _} = emqtt:connect(Client2),
|
||||
% timer:sleep(50),
|
||||
% erlang:exit(Client2, kill),
|
||||
% timer:sleep(2000),
|
||||
% ?assertEqual(0, length(receive_messages(1))),
|
||||
% timer:sleep(5000),
|
||||
% ?assertEqual(1, length(receive_messages(1))),
|
||||
%
|
||||
% {ok, Client3} = emqtt:start_link([
|
||||
% {clientid, <<"t_connect_will_delay_interval">>},
|
||||
% {proto_ver, v5},
|
||||
% {clean_start, true},
|
||||
% {will_flag, true},
|
||||
% {will_qos, 2},
|
||||
% {will_topic, Topic},
|
||||
% {will_payload, Payload},
|
||||
% {will_props, #{'Will-Delay-Interval' => 7200}},
|
||||
% {properties, #{'Session-Expiry-Interval' => 3}},
|
||||
% {keepalive, 2}
|
||||
% ]),
|
||||
% {ok, _} = emqtt:connect(Client3),
|
||||
% timer:sleep(50),
|
||||
% erlang:exit(Client3, kill),
|
||||
%
|
||||
% timer:sleep(2000),
|
||||
% ?assertEqual(0, length(receive_messages(1))),
|
||||
% timer:sleep(5000),
|
||||
% ?assertEqual(1, length(receive_messages(1))),
|
||||
%
|
||||
% ok = emqtt:disconnect(Client1),
|
||||
%
|
||||
% receive {'EXIT', _, _} -> ok
|
||||
% after 100 -> ok
|
||||
% end,
|
||||
% process_flag(trap_exit, false).
|
||||
|
||||
%% [MQTT-3.1.4-3]
|
||||
t_connect_duplicate_clientid(_) ->
|
||||
|
|
Loading…
Reference in New Issue