Auto-pull-request-on-2021-01-29 (#4114)
* fix(share sub): fix a crash caused by the number of subscriptions dropping to 0 while a subscriber is being picked * Connection Busy Alarms (#3992) feat(emqx_connection): improve port_busy alarm * fix(emqx_connection): tune the congestion alarm params * chore(deps): upgrade esockd to 5.7.5 * fix(appup): correct the appup file
This commit is contained in:
parent
2e7ec25ae2
commit
6c1129dc6a
|
@ -6,7 +6,7 @@
|
||||||
[{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}},
|
[{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}},
|
||||||
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
|
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
|
||||||
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
|
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
|
||||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.4"}}},
|
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}},
|
||||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.5"}}},
|
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.5"}}},
|
||||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
|
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
|
||||||
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[
|
||||||
{"4.2.0", [
|
{"4.2.0", [
|
||||||
|
{add_module, emqx_congestion},
|
||||||
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
||||||
|
@ -11,37 +12,68 @@
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{update, emqx_connection, {advanced, []}},
|
{update, emqx_connection, {advanced, []}},
|
||||||
{update, emqx_ws_connection, {advanced, []}},
|
{update, emqx_ws_connection, {advanced, []}},
|
||||||
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||||
{resume, [esockd_acceptor,emqx_connection, emqx_ws_connection]}
|
{resume, [esockd_acceptor,emqx_connection, emqx_ws_connection]}
|
||||||
]},
|
]},
|
||||||
{"4.2.1", [
|
{"4.2.1", [
|
||||||
|
{add_module, emqx_congestion},
|
||||||
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{update, emqx_connection, {advanced, []}},
|
{update, emqx_connection, {advanced, []}},
|
||||||
{update, emqx_ws_connection, {advanced, []}},
|
{update, emqx_ws_connection, {advanced, []}},
|
||||||
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||||
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]}
|
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]}
|
||||||
]},
|
]},
|
||||||
|
{<<"4.2.[23]">>, [
|
||||||
|
{add_module, emqx_congestion},
|
||||||
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
{"4.2.0", [
|
{"4.2.0", [
|
||||||
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{update, emqx_connection, {advanced, []}},
|
{update, emqx_connection, {advanced, []}},
|
||||||
{update, emqx_ws_connection, {advanced, []}},
|
{update, emqx_ws_connection, {advanced, []}},
|
||||||
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]}
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
||||||
|
{delete_module, emqx_congestion}
|
||||||
]},
|
]},
|
||||||
{"4.2.1", [
|
{"4.2.1", [
|
||||||
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{update, emqx_connection, {advanced, []}},
|
{update, emqx_connection, {advanced, []}},
|
||||||
{update, emqx_ws_connection, {advanced, []}},
|
{update, emqx_ws_connection, {advanced, []}},
|
||||||
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]}
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
||||||
|
{delete_module, emqx_congestion}
|
||||||
|
]},
|
||||||
|
{<<"4.2.[23]">>, [
|
||||||
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{delete_module, emqx_congestion}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
]
|
]
|
||||||
|
|
|
@ -37,6 +37,7 @@
|
||||||
-export([ activate/1
|
-export([ activate/1
|
||||||
, activate/2
|
, activate/2
|
||||||
, deactivate/1
|
, deactivate/1
|
||||||
|
, deactivate/2
|
||||||
, delete_all_deactivated_alarms/0
|
, delete_all_deactivated_alarms/0
|
||||||
, get_alarms/0
|
, get_alarms/0
|
||||||
, get_alarms/1
|
, get_alarms/1
|
||||||
|
@ -132,7 +133,10 @@ activate(Name, Details) ->
|
||||||
gen_server:call(?MODULE, {activate_alarm, Name, Details}).
|
gen_server:call(?MODULE, {activate_alarm, Name, Details}).
|
||||||
|
|
||||||
deactivate(Name) ->
|
deactivate(Name) ->
|
||||||
gen_server:call(?MODULE, {deactivate_alarm, Name}).
|
gen_server:call(?MODULE, {deactivate_alarm, Name, no_details}).
|
||||||
|
|
||||||
|
deactivate(Name, Details) ->
|
||||||
|
gen_server:call(?MODULE, {deactivate_alarm, Name, Details}).
|
||||||
|
|
||||||
delete_all_deactivated_alarms() ->
|
delete_all_deactivated_alarms() ->
|
||||||
gen_server:call(?MODULE, delete_all_deactivated_alarms).
|
gen_server:call(?MODULE, delete_all_deactivated_alarms).
|
||||||
|
@ -179,34 +183,13 @@ handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Act
|
||||||
{reply, ok, State}
|
{reply, ok, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_call({deactivate_alarm, Name}, _From, State = #state{actions = Actions,
|
handle_call({deactivate_alarm, Name, Details}, _From, State = #state{
|
||||||
size_limit = SizeLimit}) ->
|
actions = Actions, size_limit = SizeLimit}) ->
|
||||||
case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of
|
case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of
|
||||||
[] ->
|
[] ->
|
||||||
{reply, {error, not_found}, State};
|
{reply, {error, not_found}, State};
|
||||||
[#activated_alarm{name = Name,
|
[Alarm] ->
|
||||||
details = Details,
|
deactivate_alarm(Details, SizeLimit, Actions, Alarm),
|
||||||
message = Message,
|
|
||||||
activate_at = ActivateAt}] ->
|
|
||||||
case SizeLimit > 0 andalso (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of
|
|
||||||
true ->
|
|
||||||
case mnesia:dirty_first(?DEACTIVATED_ALARM) of
|
|
||||||
'$end_of_table' ->
|
|
||||||
ok;
|
|
||||||
ActivateAt2 ->
|
|
||||||
mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2)
|
|
||||||
end;
|
|
||||||
false ->
|
|
||||||
ok
|
|
||||||
end,
|
|
||||||
Alarm = #deactivated_alarm{activate_at = ActivateAt,
|
|
||||||
name = Name,
|
|
||||||
details = Details,
|
|
||||||
message = Message,
|
|
||||||
deactivate_at = erlang:system_time(microsecond)},
|
|
||||||
mnesia:dirty_delete(?ACTIVATED_ALARM, Name),
|
|
||||||
mnesia:dirty_write(?DEACTIVATED_ALARM, Alarm),
|
|
||||||
do_actions(deactivate, Alarm, Actions),
|
|
||||||
{reply, ok, State}
|
{reply, ok, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -254,18 +237,50 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{
|
||||||
|
activate_at = ActivateAt, name = Name, details = Details0,
|
||||||
|
message = Msg0}) ->
|
||||||
|
case SizeLimit > 0 andalso
|
||||||
|
(mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of
|
||||||
|
true ->
|
||||||
|
case mnesia:dirty_first(?DEACTIVATED_ALARM) of
|
||||||
|
'$end_of_table' -> ok;
|
||||||
|
ActivateAt2 ->
|
||||||
|
mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2)
|
||||||
|
end;
|
||||||
|
false -> ok
|
||||||
|
end,
|
||||||
|
HistoryAlarm = make_deactivated_alarm(ActivateAt, Name, Details0, Msg0,
|
||||||
|
erlang:system_time(microsecond)),
|
||||||
|
DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details,
|
||||||
|
normalize_message(Name, Details),
|
||||||
|
erlang:system_time(microsecond)),
|
||||||
|
mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm),
|
||||||
|
mnesia:dirty_delete(?ACTIVATED_ALARM, Name),
|
||||||
|
do_actions(deactivate, DeActAlarm, Actions).
|
||||||
|
|
||||||
|
make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) ->
|
||||||
|
#deactivated_alarm{
|
||||||
|
activate_at = ActivateAt,
|
||||||
|
name = Name,
|
||||||
|
details = Details,
|
||||||
|
message = Message,
|
||||||
|
deactivate_at = DeActivateAt}.
|
||||||
|
|
||||||
deactivate_all_alarms() ->
|
deactivate_all_alarms() ->
|
||||||
lists:foreach(fun(#activated_alarm{name = Name,
|
lists:foreach(
|
||||||
details = Details,
|
fun(#activated_alarm{name = Name,
|
||||||
message = Message,
|
details = Details,
|
||||||
activate_at = ActivateAt}) ->
|
message = Message,
|
||||||
mnesia:dirty_write(?DEACTIVATED_ALARM,
|
activate_at = ActivateAt}) ->
|
||||||
#deactivated_alarm{activate_at = ActivateAt,
|
mnesia:dirty_write(?DEACTIVATED_ALARM,
|
||||||
name = Name,
|
#deactivated_alarm{
|
||||||
details = Details,
|
activate_at = ActivateAt,
|
||||||
message = Message,
|
name = Name,
|
||||||
deactivate_at = erlang:system_time(microsecond)})
|
details = Details,
|
||||||
end, ets:tab2list(?ACTIVATED_ALARM)),
|
message = Message,
|
||||||
|
deactivate_at = erlang:system_time(microsecond)})
|
||||||
|
end, ets:tab2list(?ACTIVATED_ALARM)),
|
||||||
mnesia:clear_table(?ACTIVATED_ALARM).
|
mnesia:clear_table(?ACTIVATED_ALARM).
|
||||||
|
|
||||||
ensure_delete_timer(State = #state{validity_period = ValidityPeriod}) ->
|
ensure_delete_timer(State = #state{validity_period = ValidityPeriod}) ->
|
||||||
|
@ -332,6 +347,8 @@ normalize(#deactivated_alarm{activate_at = ActivateAt,
|
||||||
deactivate_at => DeactivateAt,
|
deactivate_at => DeactivateAt,
|
||||||
activated => false}.
|
activated => false}.
|
||||||
|
|
||||||
|
normalize_message(Name, no_details) ->
|
||||||
|
list_to_binary(io_lib:format("~p", [Name]));
|
||||||
normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) ->
|
normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) ->
|
||||||
list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark]));
|
list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark]));
|
||||||
normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) ->
|
normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) ->
|
||||||
|
@ -344,8 +361,7 @@ normalize_message(partition, #{occurred := Node}) ->
|
||||||
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
|
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
|
||||||
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
|
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
|
||||||
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
|
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
|
||||||
normalize_message(<<"mqtt_conn/congested/", ClientId/binary>>, _) ->
|
normalize_message(<<"mqtt_conn/congested/", Info/binary>>, _) ->
|
||||||
list_to_binary(io_lib:format("MQTT connection for clientid '~s' is congested", [ClientId]));
|
list_to_binary(io_lib:format("MQTT connection congested: ~s", [Info]));
|
||||||
normalize_message(_Name, _UnknownDetails) ->
|
normalize_message(_Name, _UnknownDetails) ->
|
||||||
<<"Unknown alarm">>.
|
<<"Unknown alarm">>.
|
||||||
|
|
||||||
|
|
|
@ -31,6 +31,7 @@
|
||||||
|
|
||||||
-export([ info/1
|
-export([ info/1
|
||||||
, info/2
|
, info/2
|
||||||
|
, set_conn_state/2
|
||||||
, stats/1
|
, stats/1
|
||||||
, caps/1
|
, caps/1
|
||||||
]).
|
]).
|
||||||
|
@ -87,7 +88,7 @@
|
||||||
pendings :: list()
|
pendings :: list()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-opaque(channel() :: #channel{}).
|
-type(channel() :: #channel{}).
|
||||||
|
|
||||||
-type(conn_state() :: idle | connecting | connected | disconnected).
|
-type(conn_state() :: idle | connecting | connected | disconnected).
|
||||||
|
|
||||||
|
@ -127,26 +128,26 @@ info(Keys, Channel) when is_list(Keys) ->
|
||||||
[{Key, info(Key, Channel)} || Key <- Keys];
|
[{Key, info(Key, Channel)} || Key <- Keys];
|
||||||
info(conninfo, #channel{conninfo = ConnInfo}) ->
|
info(conninfo, #channel{conninfo = ConnInfo}) ->
|
||||||
ConnInfo;
|
ConnInfo;
|
||||||
info(zone, #channel{clientinfo = #{zone := Zone}}) ->
|
info(socktype, #channel{conninfo = ConnInfo}) ->
|
||||||
Zone;
|
maps:get(socktype, ConnInfo, undefined);
|
||||||
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
|
info(peername, #channel{conninfo = ConnInfo}) ->
|
||||||
ClientId;
|
maps:get(peername, ConnInfo, undefined);
|
||||||
info(username, #channel{clientinfo = #{username := Username}}) ->
|
info(sockname, #channel{conninfo = ConnInfo}) ->
|
||||||
Username;
|
maps:get(sockname, ConnInfo, undefined);
|
||||||
info(socktype, #channel{conninfo = #{socktype := SockType}}) ->
|
info(proto_name, #channel{conninfo = ConnInfo}) ->
|
||||||
SockType;
|
maps:get(proto_name, ConnInfo, undefined);
|
||||||
info(peername, #channel{conninfo = #{peername := Peername}}) ->
|
info(proto_ver, #channel{conninfo = ConnInfo}) ->
|
||||||
Peername;
|
maps:get(proto_ver, ConnInfo, undefined);
|
||||||
info(sockname, #channel{conninfo = #{sockname := Sockname}}) ->
|
info(connected_at, #channel{conninfo = ConnInfo}) ->
|
||||||
Sockname;
|
maps:get(connected_at, ConnInfo, undefined);
|
||||||
info(proto_name, #channel{conninfo = #{proto_name := ProtoName}}) ->
|
|
||||||
ProtoName;
|
|
||||||
info(proto_ver, #channel{conninfo = #{proto_ver := ProtoVer}}) ->
|
|
||||||
ProtoVer;
|
|
||||||
info(connected_at, #channel{conninfo = #{connected_at := ConnectedAt}}) ->
|
|
||||||
ConnectedAt;
|
|
||||||
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
|
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
|
||||||
ClientInfo;
|
ClientInfo;
|
||||||
|
info(zone, #channel{clientinfo = ClientInfo}) ->
|
||||||
|
maps:get(zone, ClientInfo, undefined);
|
||||||
|
info(clientid, #channel{clientinfo = ClientInfo}) ->
|
||||||
|
maps:get(clientid, ClientInfo, undefined);
|
||||||
|
info(username, #channel{clientinfo = ClientInfo}) ->
|
||||||
|
maps:get(username, ClientInfo, undefined);
|
||||||
info(session, #channel{session = Session}) ->
|
info(session, #channel{session = Session}) ->
|
||||||
maybe_apply(fun emqx_session:info/1, Session);
|
maybe_apply(fun emqx_session:info/1, Session);
|
||||||
info(conn_state, #channel{conn_state = ConnState}) ->
|
info(conn_state, #channel{conn_state = ConnState}) ->
|
||||||
|
@ -163,6 +164,9 @@ info(alias_maximum, #channel{alias_maximum = Limits}) ->
|
||||||
Limits;
|
Limits;
|
||||||
info(timers, #channel{timers = Timers}) -> Timers.
|
info(timers, #channel{timers = Timers}) -> Timers.
|
||||||
|
|
||||||
|
set_conn_state(ConnState, Channel) ->
|
||||||
|
Channel#channel{conn_state = ConnState}.
|
||||||
|
|
||||||
%% TODO: Add more stats.
|
%% TODO: Add more stats.
|
||||||
-spec(stats(channel()) -> emqx_types:stats()).
|
-spec(stats(channel()) -> emqx_types:stats()).
|
||||||
stats(#channel{session = Session})->
|
stats(#channel{session = Session})->
|
||||||
|
@ -1290,7 +1294,7 @@ packing_alias(Packet = #mqtt_packet{
|
||||||
},
|
},
|
||||||
Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases, alias_maximum = Limits}) ->
|
Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases, alias_maximum = Limits}) ->
|
||||||
case find_alias(outbound, Topic, TopicAliases) of
|
case find_alias(outbound, Topic, TopicAliases) of
|
||||||
{ok, AliasId} ->
|
{ok, AliasId} ->
|
||||||
NPublish = Publish#mqtt_packet_publish{
|
NPublish = Publish#mqtt_packet_publish{
|
||||||
topic_name = <<>>,
|
topic_name = <<>>,
|
||||||
properties = maps:merge(Prop, #{'Topic-Alias' => AliasId})
|
properties = maps:merge(Prop, #{'Topic-Alias' => AliasId})
|
||||||
|
|
|
@ -0,0 +1,161 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_congestion).
|
||||||
|
|
||||||
|
-export([ maybe_alarm_port_busy/3
|
||||||
|
, maybe_alarm_port_busy/4
|
||||||
|
, maybe_alarm_too_many_publish/5
|
||||||
|
, maybe_alarm_too_many_publish/6
|
||||||
|
, cancel_alarms/3
|
||||||
|
]).
|
||||||
|
|
||||||
|
-define(ALARM_CONN_CONGEST(Channel, Reason),
|
||||||
|
list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s/~s", [emqx_channel:info(clientid, Channel),
|
||||||
|
maps:get(username, emqx_channel:info(clientinfo, Channel), <<"undefined">>),
|
||||||
|
Reason]))).
|
||||||
|
|
||||||
|
-define(ALARM_CONN_INFO_KEYS, [
|
||||||
|
socktype, sockname, peername, clientid, username, proto_name, proto_ver,
|
||||||
|
connected_at, conn_state
|
||||||
|
]).
|
||||||
|
-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
|
||||||
|
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
|
||||||
|
-define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]).
|
||||||
|
-define(ALARM_SENT(REASON), {alarm_sent, REASON}).
|
||||||
|
-define(ALL_ALARM_REASONS, [port_busy, too_many_publish]).
|
||||||
|
-define(CONFIRM_CLEAR(REASON), {alarm_confirm_clear, REASON}).
|
||||||
|
-define(CONFIRM_CLEAR_INTERVAL, 10000).
|
||||||
|
|
||||||
|
maybe_alarm_port_busy(Socket, Transport, Channel) ->
|
||||||
|
maybe_alarm_port_busy(Socket, Transport, Channel, false).
|
||||||
|
|
||||||
|
maybe_alarm_port_busy(Socket, Transport, Channel, ForceClear) ->
|
||||||
|
case is_tcp_congested(Socket, Transport) of
|
||||||
|
true -> alarm_congestion(Socket, Transport, Channel, port_busy);
|
||||||
|
false -> cancel_alarm_congestion(Socket, Transport, Channel, port_busy,
|
||||||
|
ForceClear)
|
||||||
|
end.
|
||||||
|
|
||||||
|
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
|
||||||
|
MaxBatchSize) ->
|
||||||
|
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
|
||||||
|
MaxBatchSize, false).
|
||||||
|
|
||||||
|
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
|
||||||
|
PubMsgCount = _MaxBatchSize, _ForceClear) ->
|
||||||
|
%% we only alarm it when the process is "too busy"
|
||||||
|
alarm_congestion(Socket, Transport, Channel, too_many_publish);
|
||||||
|
maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount,
|
||||||
|
_MaxBatchSize, ForceClear) when PubMsgCount == 0 ->
|
||||||
|
%% but we do not clear the alarm until it is really "idle", to avoid sending
|
||||||
|
%% alarms and clears too frequently
|
||||||
|
cancel_alarm_congestion(Socket, Transport, Channel, too_many_publish,
|
||||||
|
ForceClear);
|
||||||
|
maybe_alarm_too_many_publish(_Socket, _Transport, _Channel, _PubMsgCount,
|
||||||
|
_MaxBatchSize, _ForceClear) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
cancel_alarms(Socket, Transport, Channel) ->
|
||||||
|
lists:foreach(fun(Reason) ->
|
||||||
|
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason)
|
||||||
|
end, ?ALL_ALARM_REASONS).
|
||||||
|
|
||||||
|
alarm_congestion(Socket, Transport, Channel, Reason) ->
|
||||||
|
case has_alarm_sent(Reason) of
|
||||||
|
false -> do_alarm_congestion(Socket, Transport, Channel, Reason);
|
||||||
|
true ->
|
||||||
|
%% pretend we have sent an alarm again
|
||||||
|
update_alarm_sent_at(Reason)
|
||||||
|
end.
|
||||||
|
|
||||||
|
cancel_alarm_congestion(Socket, Transport, Channel, Reason, ForceClear) ->
|
||||||
|
case is_alarm_allowed_clear(Reason, ForceClear) of
|
||||||
|
true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason);
|
||||||
|
false -> ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_alarm_congestion(Socket, Transport, Channel, Reason) ->
|
||||||
|
ok = update_alarm_sent_at(Reason),
|
||||||
|
AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel),
|
||||||
|
emqx_alarm:activate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
|
||||||
|
ok = remove_alarm_sent_at(Reason),
|
||||||
|
AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel),
|
||||||
|
emqx_alarm:deactivate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
is_tcp_congested(Socket, Transport) ->
|
||||||
|
case Transport:getstat(Socket, [send_pend]) of
|
||||||
|
{ok, [{send_pend, N}]} when N > 0 -> true;
|
||||||
|
_ -> false
|
||||||
|
end.
|
||||||
|
|
||||||
|
has_alarm_sent(Reason) ->
|
||||||
|
case get_alarm_sent_at(Reason) of
|
||||||
|
0 -> false;
|
||||||
|
_ -> true
|
||||||
|
end.
|
||||||
|
update_alarm_sent_at(Reason) ->
|
||||||
|
erlang:put(?ALARM_SENT(Reason), timenow()),
|
||||||
|
ok.
|
||||||
|
remove_alarm_sent_at(Reason) ->
|
||||||
|
erlang:erase(?ALARM_SENT(Reason)),
|
||||||
|
ok.
|
||||||
|
get_alarm_sent_at(Reason) ->
|
||||||
|
case erlang:get(?ALARM_SENT(Reason)) of
|
||||||
|
undefined -> 0;
|
||||||
|
LastSentAt -> LastSentAt
|
||||||
|
end.
|
||||||
|
|
||||||
|
is_alarm_allowed_clear(Reason, _ForceClear = true) ->
|
||||||
|
has_alarm_sent(Reason);
|
||||||
|
is_alarm_allowed_clear(Reason, _ForceClear = false) ->
|
||||||
|
%% only send clears when the alarm was not triggered in the last
|
||||||
|
%% ?CONFIRM_CLEAR_INTERVAL time
|
||||||
|
case timenow() - get_alarm_sent_at(Reason) of
|
||||||
|
Elapse when Elapse >= ?CONFIRM_CLEAR_INTERVAL -> true;
|
||||||
|
_ -> false
|
||||||
|
end.
|
||||||
|
|
||||||
|
timenow() ->
|
||||||
|
erlang:system_time(millisecond).
|
||||||
|
|
||||||
|
%%==============================================================================
|
||||||
|
%% Alarm message
|
||||||
|
%%==============================================================================
|
||||||
|
tcp_congestion_alarm_details(Socket, Transport, Channel) ->
|
||||||
|
ProcInfo = process_info(self(), ?PROC_INFO_KEYS),
|
||||||
|
BasicInfo = [{pid, list_to_binary(pid_to_list(self()))} | ProcInfo],
|
||||||
|
Stat = case Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS) of
|
||||||
|
{ok, Stat0} -> Stat0;
|
||||||
|
{error, _} -> []
|
||||||
|
end,
|
||||||
|
Opts = case Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS) of
|
||||||
|
{ok, Opts0} -> Opts0;
|
||||||
|
{error, _} -> []
|
||||||
|
end,
|
||||||
|
SockInfo = Stat ++ Opts,
|
||||||
|
ConnInfo = [conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS],
|
||||||
|
maps:from_list(BasicInfo ++ ConnInfo ++ SockInfo).
|
||||||
|
|
||||||
|
conn_info(Key, Channel) when Key =:= sockname; Key =:= peername ->
|
||||||
|
{IPStr, Port} = emqx_channel:info(Key, Channel),
|
||||||
|
{Key, iolist_to_binary([inet:ntoa(IPStr),":",integer_to_list(Port)])};
|
||||||
|
conn_info(Key, Channel) ->
|
||||||
|
{Key, emqx_channel:info(Key, Channel)}.
|
|
@ -103,17 +103,6 @@
|
||||||
|
|
||||||
-define(ENABLED(X), (X =/= undefined)).
|
-define(ENABLED(X), (X =/= undefined)).
|
||||||
|
|
||||||
-define(ALARM_TCP_CONGEST(Channel),
|
|
||||||
list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s", [emqx_channel:info(clientid, Channel), maps:get(username, emqx_channel:info(clientinfo, Channel), <<"undefined">>)]))).
|
|
||||||
|
|
||||||
|
|
||||||
-define(ALARM_CONN_INFO_KEYS, [
|
|
||||||
socktype, sockname, peername,
|
|
||||||
clientid, username, proto_name, proto_ver, connected_at
|
|
||||||
]).
|
|
||||||
-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
|
|
||||||
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
|
|
||||||
|
|
||||||
-dialyzer({no_match, [info/2]}).
|
-dialyzer({no_match, [info/2]}).
|
||||||
-dialyzer({nowarn_function, [ init/4
|
-dialyzer({nowarn_function, [ init/4
|
||||||
, init_state/3
|
, init_state/3
|
||||||
|
@ -272,7 +261,7 @@ recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) ->
|
||||||
Msg ->
|
Msg ->
|
||||||
process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State))
|
process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State))
|
||||||
after
|
after
|
||||||
IdleTimeout ->
|
IdleTimeout + 100 ->
|
||||||
hibernate(Parent, cancel_stats_timer(State))
|
hibernate(Parent, cancel_stats_timer(State))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -385,8 +374,12 @@ handle_msg({Passive, _Sock}, State)
|
||||||
handle_info(activate_socket, NState1);
|
handle_info(activate_socket, NState1);
|
||||||
|
|
||||||
handle_msg(Deliver = {deliver, _Topic, _Msg},
|
handle_msg(Deliver = {deliver, _Topic, _Msg},
|
||||||
State = #state{active_n = ActiveN}) ->
|
#state{active_n = MaxBatchSize, transport = Transport,
|
||||||
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
|
socket = Socket, channel = Channel} = State) ->
|
||||||
|
Delivers0 = emqx_misc:drain_deliver(MaxBatchSize),
|
||||||
|
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
|
||||||
|
length(Delivers0), MaxBatchSize),
|
||||||
|
Delivers = [Deliver|Delivers0],
|
||||||
with_channel(handle_deliver, [Delivers], State);
|
with_channel(handle_deliver, [Delivers], State);
|
||||||
|
|
||||||
%% Something sent
|
%% Something sent
|
||||||
|
@ -438,10 +431,12 @@ handle_msg(Msg, State) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Terminate
|
%% Terminate
|
||||||
|
|
||||||
terminate(Reason, State = #state{channel = Channel}) ->
|
terminate(Reason, State = #state{channel = Channel, transport = Transport,
|
||||||
|
socket = Socket}) ->
|
||||||
?LOG(debug, "Terminated due to ~p", [Reason]),
|
?LOG(debug, "Terminated due to ~p", [Reason]),
|
||||||
emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)),
|
Channel1 = emqx_channel:set_conn_state(disconnected, Channel),
|
||||||
emqx_channel:terminate(Reason, Channel),
|
emqx_congestion:cancel_alarms(Socket, Transport, Channel1),
|
||||||
|
emqx_channel:terminate(Reason, Channel1),
|
||||||
close_socket(State),
|
close_socket(State),
|
||||||
exit(Reason).
|
exit(Reason).
|
||||||
|
|
||||||
|
@ -553,8 +548,12 @@ handle_timeout(_TRef, limit_timeout, State) ->
|
||||||
},
|
},
|
||||||
handle_info(activate_socket, NState);
|
handle_info(activate_socket, NState);
|
||||||
|
|
||||||
handle_timeout(_TRef, emit_stats, State =
|
handle_timeout(_TRef, emit_stats, State = #state{active_n = MaxBatchSize,
|
||||||
#state{channel = Channel}) ->
|
channel = Channel, transport = Transport, socket = Socket}) ->
|
||||||
|
{_, MsgQLen} = erlang:process_info(self(), message_queue_len),
|
||||||
|
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel, true),
|
||||||
|
emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel,
|
||||||
|
MsgQLen, MaxBatchSize, true),
|
||||||
ClientId = emqx_channel:info(clientid, Channel),
|
ClientId = emqx_channel:info(clientid, Channel),
|
||||||
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||||
{ok, State#state{stats_timer = undefined}};
|
{ok, State#state{stats_timer = undefined}};
|
||||||
|
@ -667,7 +666,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
|
||||||
Oct = iolist_size(IoData),
|
Oct = iolist_size(IoData),
|
||||||
ok = emqx_metrics:inc('bytes.sent', Oct),
|
ok = emqx_metrics:inc('bytes.sent', Oct),
|
||||||
emqx_pd:inc_counter(outgoing_bytes, Oct),
|
emqx_pd:inc_counter(outgoing_bytes, Oct),
|
||||||
maybe_warn_congestion(Socket, Transport, Channel),
|
emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel),
|
||||||
case Transport:async_send(Socket, IoData, [nosuspend]) of
|
case Transport:async_send(Socket, IoData, [nosuspend]) of
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
Error = {error, _Reason} ->
|
Error = {error, _Reason} ->
|
||||||
|
@ -676,48 +675,6 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
maybe_warn_congestion(Socket, Transport, Channel) ->
|
|
||||||
IsCongestAlarmSet = is_congestion_alarm_set(),
|
|
||||||
case is_congested(Socket, Transport) of
|
|
||||||
true when not IsCongestAlarmSet ->
|
|
||||||
ok = set_congestion_alarm(),
|
|
||||||
emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel),
|
|
||||||
tcp_congestion_alarm_details(Socket, Transport, Channel));
|
|
||||||
false when IsCongestAlarmSet ->
|
|
||||||
ok = clear_congestion_alarm(),
|
|
||||||
emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel));
|
|
||||||
_ -> ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
is_congested(Socket, Transport) ->
|
|
||||||
case Transport:getstat(Socket, [send_pend]) of
|
|
||||||
{ok, [{send_pend, N}]} when N > 0 -> true;
|
|
||||||
_ -> false
|
|
||||||
end.
|
|
||||||
|
|
||||||
is_congestion_alarm_set() ->
|
|
||||||
case erlang:get(conn_congested) of
|
|
||||||
true -> true;
|
|
||||||
_ -> false
|
|
||||||
end.
|
|
||||||
set_congestion_alarm() ->
|
|
||||||
erlang:put(conn_congested, true), ok.
|
|
||||||
clear_congestion_alarm() ->
|
|
||||||
erlang:put(conn_congested, false), ok.
|
|
||||||
|
|
||||||
tcp_congestion_alarm_details(Socket, Transport, Channel) ->
|
|
||||||
{ok, Stat} = Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS),
|
|
||||||
{ok, Opts} = Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS),
|
|
||||||
SockInfo = maps:from_list(Stat ++ Opts),
|
|
||||||
ConnInfo = maps:from_list([conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS]),
|
|
||||||
maps:merge(ConnInfo, SockInfo).
|
|
||||||
|
|
||||||
conn_info(Key, Channel) when Key =:= sockname; Key =:= peername ->
|
|
||||||
{IPStr, Port} = emqx_channel:info(Key, Channel),
|
|
||||||
{Key, iolist_to_binary([inet:ntoa(IPStr),":",integer_to_list(Port)])};
|
|
||||||
conn_info(Key, Channel) ->
|
|
||||||
{Key, emqx_channel:info(Key, Channel)}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handle Info
|
%% Handle Info
|
||||||
|
|
||||||
|
|
|
@ -145,12 +145,12 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer,
|
||||||
case emqx_vm:cpu_util() of %% TODO: should be improved?
|
case emqx_vm:cpu_util() of %% TODO: should be improved?
|
||||||
0 ->
|
0 ->
|
||||||
State#{timer := undefined};
|
State#{timer := undefined};
|
||||||
Busy when Busy / 100 >= CPUHighWatermark ->
|
Busy when Busy >= CPUHighWatermark ->
|
||||||
emqx_alarm:activate(high_cpu_usage, #{usage => Busy,
|
emqx_alarm:activate(high_cpu_usage, #{usage => Busy,
|
||||||
high_watermark => CPUHighWatermark,
|
high_watermark => CPUHighWatermark,
|
||||||
low_watermark => CPULowWatermark}),
|
low_watermark => CPULowWatermark}),
|
||||||
ensure_check_timer(State);
|
ensure_check_timer(State);
|
||||||
Busy when Busy / 100 =< CPULowWatermark ->
|
Busy when Busy =< CPULowWatermark ->
|
||||||
emqx_alarm:deactivate(high_cpu_usage),
|
emqx_alarm:deactivate(high_cpu_usage),
|
||||||
ensure_check_timer(State);
|
ensure_check_timer(State);
|
||||||
_Busy ->
|
_Busy ->
|
||||||
|
|
|
@ -246,7 +246,7 @@ pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
|
||||||
do_pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
|
do_pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
|
||||||
All = subscribers(Group, Topic),
|
All = subscribers(Group, Topic),
|
||||||
case All -- FailedSubs of
|
case All -- FailedSubs of
|
||||||
[] when FailedSubs =:= [] ->
|
[] when All =:= [] ->
|
||||||
%% Genuinely no subscriber
|
%% Genuinely no subscriber
|
||||||
false;
|
false;
|
||||||
[] ->
|
[] ->
|
||||||
|
|
|
@ -54,6 +54,7 @@ init_per_suite(Config) ->
|
||||||
|
|
||||||
ok = meck:expect(emqx_alarm, activate, fun(_, _) -> ok end),
|
ok = meck:expect(emqx_alarm, activate, fun(_, _) -> ok end),
|
||||||
ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end),
|
ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end),
|
||||||
|
ok = meck:expect(emqx_alarm, deactivate, fun(_, _) -> ok end),
|
||||||
|
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
@ -77,6 +78,9 @@ init_per_testcase(_TestCase, Config) ->
|
||||||
(peercert, [sock]) -> undefined
|
(peercert, [sock]) -> undefined
|
||||||
end),
|
end),
|
||||||
ok = meck:expect(emqx_transport, setopts, fun(_Sock, _Opts) -> ok end),
|
ok = meck:expect(emqx_transport, setopts, fun(_Sock, _Opts) -> ok end),
|
||||||
|
ok = meck:expect(emqx_transport, getopts, fun(_Sock, Options) ->
|
||||||
|
{ok, [{K, 0} || K <- Options]}
|
||||||
|
end),
|
||||||
ok = meck:expect(emqx_transport, getstat, fun(_Sock, Options) ->
|
ok = meck:expect(emqx_transport, getstat, fun(_Sock, Options) ->
|
||||||
{ok, [{K, 0} || K <- Options]}
|
{ok, [{K, 0} || K <- Options]}
|
||||||
end),
|
end),
|
||||||
|
|
Loading…
Reference in New Issue