refactor(emqx_connection): make the mqtt tcp connection work with new config

This commit is contained in:
Shawn 2021-07-06 18:26:54 +08:00
parent 969e72c82b
commit 5efd5c8d3b
7 changed files with 137 additions and 104 deletions

View File

@ -870,7 +870,7 @@ zones.default {
## received. ## received.
## ##
## @doc zones.<name>.mqtt.idle_timeout ## @doc zones.<name>.mqtt.idle_timeout
## ValueType: Duration | infinity ## ValueType: Duration
## Default: 15s ## Default: 15s
idle_timeout: 15s idle_timeout: 15s

View File

@ -32,14 +32,9 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}). -spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}).
authenticate(ClientInfo = #{zone := Zone}) -> authenticate(ClientInfo = #{zone := Zone, listener := Listener}) ->
AuthResult = default_auth_result(Zone), AuthResult = default_auth_result(Zone, Listener),
case emqx_zone:get_env(Zone, bypass_auth_plugins, false) of return_auth_result(run_hooks('client.authenticate', [ClientInfo], AuthResult)).
true ->
return_auth_result(AuthResult);
false ->
return_auth_result(run_hooks('client.authenticate', [ClientInfo], AuthResult))
end.
%% @doc Check ACL %% @doc Check ACL
-spec(check_acl(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic()) -spec(check_acl(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic())
@ -59,17 +54,16 @@ check_acl_cache(ClientInfo, PubSub, Topic) ->
AclResult -> AclResult AclResult -> AclResult
end. end.
do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) -> do_check_acl(ClientInfo, PubSub, Topic) ->
Default = emqx_zone:get_env(Zone, acl_nomatch, deny), case run_hooks('client.check_acl', [ClientInfo, PubSub, Topic], allow) of
case run_hooks('client.check_acl', [ClientInfo, PubSub, Topic], Default) of
allow -> allow; allow -> allow;
_Other -> deny _Other -> deny
end. end.
default_auth_result(Zone) -> default_auth_result(Zone, Listener) ->
case emqx_zone:get_env(Zone, allow_anonymous, false) of case emqx_config:get_listener_conf(Zone, Listener, [auth, enable]) of
true -> #{auth_result => success, anonymous => true}; false -> #{auth_result => success, anonymous => true};
false -> #{auth_result => not_authorized, anonymous => false} true -> #{auth_result => not_authorized, anonymous => false}
end. end.
-compile({inline, [run_hooks/3]}). -compile({inline, [run_hooks/3]}).

View File

@ -31,6 +31,8 @@
-export([ info/1 -export([ info/1
, info/2 , info/2
, get_mqtt_conf/3
, get_mqtt_conf/4
, set_conn_state/2 , set_conn_state/2
, get_session/1 , get_session/1
, set_session/2 , set_session/2
@ -63,7 +65,7 @@
, maybe_apply/2 , maybe_apply/2
]). ]).
-export_type([channel/0]). -export_type([channel/0, opts/0]).
-record(channel, { -record(channel, {
%% MQTT ConnInfo %% MQTT ConnInfo
@ -98,6 +100,8 @@
-type(channel() :: #channel{}). -type(channel() :: #channel{}).
-type(opts() :: #{zone := atom(), listener := atom(), atom() => term()}).
-type(conn_state() :: idle | connecting | connected | disconnected). -type(conn_state() :: idle | connecting | connected | disconnected).
-type(reply() :: {outgoing, emqx_types:packet()} -type(reply() :: {outgoing, emqx_types:packet()}
@ -151,7 +155,9 @@ info(connected_at, #channel{conninfo = ConnInfo}) ->
info(clientinfo, #channel{clientinfo = ClientInfo}) -> info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo; ClientInfo;
info(zone, #channel{clientinfo = ClientInfo}) -> info(zone, #channel{clientinfo = ClientInfo}) ->
maps:get(zone, ClientInfo, undefined); maps:get(zone, ClientInfo);
info(listener, #channel{clientinfo = ClientInfo}) ->
maps:get(listener, ClientInfo);
info(clientid, #channel{clientinfo = ClientInfo}) -> info(clientid, #channel{clientinfo = ClientInfo}) ->
maps:get(clientid, ClientInfo, undefined); maps:get(clientid, ClientInfo, undefined);
info(username, #channel{clientinfo = ClientInfo}) -> info(username, #channel{clientinfo = ClientInfo}) ->
@ -195,17 +201,20 @@ caps(#channel{clientinfo = #{zone := Zone}}) ->
%% Init the channel %% Init the channel
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(init(emqx_types:conninfo(), proplists:proplist()) -> channel()). -spec(init(emqx_types:conninfo(), opts()) -> channel()).
init(ConnInfo = #{peername := {PeerHost, _Port}, init(ConnInfo = #{peername := {PeerHost, _Port},
sockname := {_Host, SockPort}}, Options) -> sockname := {_Host, SockPort}}, #{zone := Zone, listener := Listener}) ->
Zone = proplists:get_value(zone, Options),
Peercert = maps:get(peercert, ConnInfo, undefined), Peercert = maps:get(peercert, ConnInfo, undefined),
Protocol = maps:get(protocol, ConnInfo, mqtt), Protocol = maps:get(protocol, ConnInfo, mqtt),
MountPoint = emqx_zone:mountpoint(Zone), MountPoint = case get_mqtt_conf(Zone, Listener, mountpoint) of
QuotaPolicy = emqx_zone:quota_policy(Zone), "" -> undefined;
ClientInfo = setting_peercert_infos( MP -> MP
end,
QuotaPolicy = emqx_config:get_listener_conf(Zone, Listener, [rate_limit, quota]),
ClientInfo = set_peercert_infos(
Peercert, Peercert,
#{zone => Zone, #{zone => Zone,
listener => Listener,
protocol => Protocol, protocol => Protocol,
peerhost => PeerHost, peerhost => PeerHost,
sockport => SockPort, sockport => SockPort,
@ -214,7 +223,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
mountpoint => MountPoint, mountpoint => MountPoint,
is_bridge => false, is_bridge => false,
is_superuser => false is_superuser => false
}, Options), }, Zone, Listener),
{NClientInfo, NConnInfo} = take_ws_cookie(ClientInfo, ConnInfo), {NClientInfo, NConnInfo} = take_ws_cookie(ClientInfo, ConnInfo),
#channel{conninfo = NConnInfo, #channel{conninfo = NConnInfo,
clientinfo = NClientInfo, clientinfo = NClientInfo,
@ -222,7 +231,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
outbound => #{} outbound => #{}
}, },
auth_cache = #{}, auth_cache = #{},
quota = emqx_limiter:init(Zone, QuotaPolicy), quota = emqx_limiter:init(Zone, quota_policy(QuotaPolicy)),
timers = #{}, timers = #{},
conn_state = idle, conn_state = idle,
takeover = false, takeover = false,
@ -230,30 +239,34 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
pendings = [] pendings = []
}. }.
setting_peercert_infos(NoSSL, ClientInfo, _Options) quota_policy(RawPolicy) ->
[{Name, {list_to_integer(StrCount),
erlang:trunc(hocon_postprocess:duration(StrWind) / 1000)}}
|| {Name, [StrCount, StrWind]} <- maps:to_list(RawPolicy)].
set_peercert_infos(NoSSL, ClientInfo, _, _)
when NoSSL =:= nossl; when NoSSL =:= nossl;
NoSSL =:= undefined -> NoSSL =:= undefined ->
ClientInfo#{username => undefined}; ClientInfo#{username => undefined};
setting_peercert_infos(Peercert, ClientInfo, Options) -> set_peercert_infos(Peercert, ClientInfo, Zone, Listener) ->
{DN, CN} = {esockd_peercert:subject(Peercert), {DN, CN} = {esockd_peercert:subject(Peercert),
esockd_peercert:common_name(Peercert)}, esockd_peercert:common_name(Peercert)},
Username = peer_cert_as(peer_cert_as_username, Options, Peercert, DN, CN), PeercetAs = fun(Key) ->
ClientId = peer_cert_as(peer_cert_as_clientid, Options, Peercert, DN, CN), % esockd_peercert:peercert is opaque
ClientInfo#{username => Username, clientid => ClientId, dn => DN, cn => CN}. % https://github.com/emqx/esockd/blob/master/src/esockd_peercert.erl
case get_mqtt_conf(Zone, Listener, Key) of
-dialyzer([{nowarn_function, [peer_cert_as/5]}]).
% esockd_peercert:peercert is opaque
% https://github.com/emqx/esockd/blob/master/src/esockd_peercert.erl
peer_cert_as(Key, Options, Peercert, DN, CN) ->
case proplists:get_value(Key, Options) of
cn -> CN; cn -> CN;
dn -> DN; dn -> DN;
crt -> Peercert; crt -> Peercert;
pem -> base64:encode(Peercert); pem -> base64:encode(Peercert);
md5 -> emqx_passwd:hash(md5, Peercert); md5 -> emqx_passwd:hash(md5, Peercert);
_ -> undefined _ -> undefined
end. end
end,
Username = PeercetAs(peer_cert_as_username),
ClientId = PeercetAs(peer_cert_as_clientid),
ClientInfo#{username => Username, clientid => ClientId, dn => DN, cn => CN}.
take_ws_cookie(ClientInfo, ConnInfo) -> take_ws_cookie(ClientInfo, ConnInfo) ->
case maps:take(ws_cookie, ConnInfo) of case maps:take(ws_cookie, ConnInfo) of
@ -403,16 +416,17 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S
end; end;
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> Channel = #channel{clientinfo = ClientInfo = #{zone := Zone, listener := Listener}}) ->
case emqx_packet:check(Packet) of case emqx_packet:check(Packet) of
ok -> ok ->
TopicFilters0 = parse_topic_filters(TopicFilters), TopicFilters0 = parse_topic_filters(TopicFilters),
TopicFilters1 = put_subid_in_subopts(Properties, TopicFilters0), TopicFilters1 = put_subid_in_subopts(Properties, TopicFilters0),
TupleTopicFilters0 = check_sub_acls(TopicFilters1, Channel), TupleTopicFilters0 = check_sub_acls(TopicFilters1, Channel),
case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso HasAclDeny = lists:any(fun({_TopicFilter, ReasonCode}) ->
lists:any(fun({_TopicFilter, ReasonCode}) -> ReasonCode =:= ?RC_NOT_AUTHORIZED
ReasonCode =:= ?RC_NOT_AUTHORIZED end, TupleTopicFilters0),
end, TupleTopicFilters0) of DenyAction = emqx_config:get_listener_conf(Zone, Listener, [acl, deny_action]),
case DenyAction =:= disconnect andalso HasAclDeny of
true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel); true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
false -> false ->
Replace = fun Replace = fun
@ -512,7 +526,7 @@ process_connect(AckProps, Channel = #channel{conninfo = ConnInfo,
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
Channel = #channel{clientinfo = #{zone := Zone}}) -> Channel = #channel{clientinfo = #{zone := Zone, listener := Listener}}) ->
case pipeline([fun check_quota_exceeded/2, case pipeline([fun check_quota_exceeded/2,
fun process_alias/2, fun process_alias/2,
fun check_pub_alias/2, fun check_pub_alias/2,
@ -525,7 +539,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
{error, Rc = ?RC_NOT_AUTHORIZED, NChannel} -> {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
?LOG(warning, "Cannot publish message to ~s due to ~s.", ?LOG(warning, "Cannot publish message to ~s due to ~s.",
[Topic, emqx_reason_codes:text(Rc)]), [Topic, emqx_reason_codes:text(Rc)]),
case emqx_zone:get_env(Zone, acl_deny_action, ignore) of case emqx_config:get_listener_conf(Zone, Listener, [acl_deny_action]) of
ignore -> ignore ->
case QoS of case QoS of
?QOS_0 -> {ok, NChannel}; ?QOS_0 -> {ok, NChannel};
@ -968,8 +982,8 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting})
handle_info({sock_closed, Reason}, Channel = handle_info({sock_closed, Reason}, Channel =
#channel{conn_state = connected, #channel{conn_state = connected,
clientinfo = ClientInfo = #{zone := Zone}}) -> clientinfo = ClientInfo = #{zone := Zone, listener := Listener}}) ->
emqx_zone:enable_flapping_detect(Zone) emqx_config:get_listener_conf(Zone, Listener, [flapping_detect, enable])
andalso emqx_flapping:detect(ClientInfo), andalso emqx_flapping:detect(ClientInfo),
Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)), Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)),
case maybe_shutdown(Reason, Channel1) of case maybe_shutdown(Reason, Channel1) of
@ -1130,9 +1144,9 @@ enrich_conninfo(ConnPkt = #mqtt_packet_connect{
username = Username username = Username
}, },
Channel = #channel{conninfo = ConnInfo, Channel = #channel{conninfo = ConnInfo,
clientinfo = #{zone := Zone} clientinfo = #{zone := Zone, listener := Listener}
}) -> }) ->
ExpiryInterval = expiry_interval(Zone, ConnPkt), ExpiryInterval = expiry_interval(Zone, Listener, ConnPkt),
NConnInfo = ConnInfo#{proto_name => ProtoName, NConnInfo = ConnInfo#{proto_name => ProtoName,
proto_ver => ProtoVer, proto_ver => ProtoVer,
clean_start => CleanStart, clean_start => CleanStart,
@ -1141,22 +1155,21 @@ enrich_conninfo(ConnPkt = #mqtt_packet_connect{
username => Username, username => Username,
conn_props => ConnProps, conn_props => ConnProps,
expiry_interval => ExpiryInterval, expiry_interval => ExpiryInterval,
receive_maximum => receive_maximum(Zone, ConnProps) receive_maximum => receive_maximum(Zone, Listener, ConnProps)
}, },
{ok, Channel#channel{conninfo = NConnInfo}}. {ok, Channel#channel{conninfo = NConnInfo}}.
%% If the Session Expiry Interval is absent the value 0 is used. %% If the Session Expiry Interval is absent the value 0 is used.
-compile({inline, [expiry_interval/2]}). expiry_interval(_, _, #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
expiry_interval(_Zone, #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
properties = ConnProps}) -> properties = ConnProps}) ->
emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0); emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0);
expiry_interval(Zone, #mqtt_packet_connect{clean_start = false}) -> expiry_interval(Zone, Listener, #mqtt_packet_connect{clean_start = false}) ->
emqx_zone:session_expiry_interval(Zone); get_mqtt_conf(Zone, Listener, session_expiry_interval);
expiry_interval(_Zone, #mqtt_packet_connect{clean_start = true}) -> expiry_interval(_, _, #mqtt_packet_connect{clean_start = true}) ->
0. 0.
receive_maximum(Zone, ConnProps) -> receive_maximum(Zone, Listener, ConnProps) ->
MaxInflightConfig = case emqx_zone:max_inflight(Zone) of MaxInflightConfig = case get_mqtt_conf(Zone, Listener, max_inflight) of
0 -> ?RECEIVE_MAXIMUM_LIMIT; 0 -> ?RECEIVE_MAXIMUM_LIMIT;
N -> N N -> N
end, end,
@ -1205,8 +1218,9 @@ set_bridge_mode(_ConnPkt, _ClientInfo) -> ok.
maybe_username_as_clientid(_ConnPkt, ClientInfo = #{username := undefined}) -> maybe_username_as_clientid(_ConnPkt, ClientInfo = #{username := undefined}) ->
{ok, ClientInfo}; {ok, ClientInfo};
maybe_username_as_clientid(_ConnPkt, ClientInfo = #{zone := Zone, username := Username}) -> maybe_username_as_clientid(_ConnPkt, ClientInfo = #{zone := Zone, listener := Listener,
case emqx_zone:use_username_as_clientid(Zone) of username := Username}) ->
case get_mqtt_conf(Zone, Listener, use_username_as_clientid) of
true -> {ok, ClientInfo#{clientid => Username}}; true -> {ok, ClientInfo#{clientid => Username}};
false -> ok false -> ok
end. end.
@ -1234,8 +1248,8 @@ set_log_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Check banned %% Check banned
check_banned(_ConnPkt, #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> check_banned(_ConnPkt, #channel{clientinfo = ClientInfo}) ->
case emqx_zone:enable_ban(Zone) andalso emqx_banned:check(ClientInfo) of case emqx_banned:check(ClientInfo) of
true -> {error, ?RC_BANNED}; true -> {error, ?RC_BANNED};
false -> ok false -> ok
end. end.
@ -1463,8 +1477,9 @@ put_subid_in_subopts(_Properties, TopicFilters) -> TopicFilters.
enrich_subopts(SubOpts, _Channel = ?IS_MQTT_V5) -> enrich_subopts(SubOpts, _Channel = ?IS_MQTT_V5) ->
SubOpts; SubOpts;
enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge}}) -> enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge,
NL = flag(emqx_zone:ignore_loop_deliver(Zone)), listener := Listener}}) ->
NL = flag(get_mqtt_conf(Zone, Listener, ignore_loop_deliver)),
SubOpts#{rap => flag(IsBridge), nl => NL}. SubOpts#{rap => flag(IsBridge), nl => NL}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -1499,8 +1514,8 @@ enrich_connack_caps(AckProps, _Channel) -> AckProps.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Enrich server keepalive %% Enrich server keepalive
enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone}}) -> enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone, listener := Listener}}) ->
case emqx_zone:server_keepalive(Zone) of case get_mqtt_conf(Zone, Listener, server_keepalive) of
undefined -> AckProps; undefined -> AckProps;
Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive} Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive}
end. end.
@ -1509,10 +1524,14 @@ enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone}}) ->
%% Enrich response information %% Enrich response information
enrich_response_information(AckProps, #channel{conninfo = #{conn_props := ConnProps}, enrich_response_information(AckProps, #channel{conninfo = #{conn_props := ConnProps},
clientinfo = #{zone := Zone}}) -> clientinfo = #{zone := Zone, listener := Listener}}) ->
case emqx_mqtt_props:get('Request-Response-Information', ConnProps, 0) of case emqx_mqtt_props:get('Request-Response-Information', ConnProps, 0) of
0 -> AckProps; 0 -> AckProps;
1 -> AckProps#{'Response-Information' => emqx_zone:response_information(Zone)} 1 -> AckProps#{'Response-Information' =>
case get_mqtt_conf(Zone, Listener, response_information, "") of
"" -> undefined;
RspInfo -> RspInfo
end}
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -1559,9 +1578,10 @@ ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel = #channel{conninfo
ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) -> ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) ->
ensure_keepalive_timer(maps:get(keepalive, ConnInfo), Channel). ensure_keepalive_timer(maps:get(keepalive, ConnInfo), Channel).
ensure_keepalive_timer(0, Channel) -> Channel; ensure_keepalive_timer(disabled, Channel) -> Channel;
ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) -> ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone,
Backoff = emqx_zone:keepalive_backoff(Zone), listener := Listener}}) ->
Backoff = get_mqtt_conf(Zone, Listener, keepalive_backoff),
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
@ -1604,8 +1624,8 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
%% Is ACL enabled? %% Is ACL enabled?
-compile({inline, [is_acl_enabled/1]}). -compile({inline, [is_acl_enabled/1]}).
is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) -> is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser}) ->
(not IsSuperuser) andalso emqx_zone:enable_acl(Zone). (not IsSuperuser) andalso emqx_config:get_listener_conf(Zone, Listener, [acl, enable]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Parse Topic Filters %% Parse Topic Filters
@ -1715,6 +1735,12 @@ sp(false) -> 0.
flag(true) -> 1; flag(true) -> 1;
flag(false) -> 0. flag(false) -> 0.
get_mqtt_conf(Zone, Listener, Key) ->
emqx_config:get_listener_conf(Zone, Listener, [mqtt, Key]).
get_mqtt_conf(Zone, Listener, Key, Default) ->
emqx_config:get_listener_conf(Zone, Listener, [mqtt, Key], Default).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% For CT tests %% For CT tests
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -1722,4 +1748,3 @@ flag(false) -> 0.
set_field(Name, Value, Channel) -> set_field(Name, Value, Channel) ->
Pos = emqx_misc:index_of(Name, record_info(fields, channel)), Pos = emqx_misc:index_of(Name, record_info(fields, channel)),
setelement(Pos+1, Channel, Value). setelement(Pos+1, Channel, Value).

View File

@ -108,7 +108,6 @@
}). }).
-type(state() :: #state{}). -type(state() :: #state{}).
-type(opts() :: #{zone := atom(), listener := atom(), atom() => term()}).
-define(ACTIVE_N, 100). -define(ACTIVE_N, 100).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate]). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]).
@ -137,7 +136,7 @@
, system_code_change/4 , system_code_change/4
]}). ]}).
-spec(start_link(esockd:transport(), esockd:socket(), opts()) -spec(start_link(esockd:transport(), esockd:socket(), emqx_channel:opts())
-> {ok, pid()}). -> {ok, pid()}).
start_link(Transport, Socket, Options) -> start_link(Transport, Socket, Options) ->
Args = [self(), Transport, Socket, Options], Args = [self(), Transport, Socket, Options],
@ -256,18 +255,23 @@ init_state(Transport, Socket, Options) ->
}, },
Zone = maps:get(zone, Options), Zone = maps:get(zone, Options),
Listener = maps:get(listener, Options), Listener = maps:get(listener, Options),
Limiter = emqx_limiter:init(Zone, undefined, undefined, []),
PubLimit = emqx_zone:publish_limit(Zone), FrameOpts = #{
BytesIn = proplists:get_value(rate_limit, Options), strict_mode => emqx_config:get_listener_conf(Zone, Listener, [mqtt, strict_mode]),
RateLimit = emqx_zone:ratelimit(Zone), max_size => emqx_config:get_listener_conf(Zone, Listener, [mqtt, max_packet_size])
Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit), },
FrameOpts = emqx_zone:mqtt_frame_options(Zone),
ParseState = emqx_frame:initial_parse_state(FrameOpts), ParseState = emqx_frame:initial_parse_state(FrameOpts),
Serialize = emqx_frame:serialize_opts(), Serialize = emqx_frame:serialize_opts(),
Channel = emqx_channel:init(ConnInfo, Options), Channel = emqx_channel:init(ConnInfo, Options),
GcState = emqx_zone:init_gc_state(Zone), GcState = case emqx_config:get_listener_conf(Zone, Listener, [force_gc]) of
StatsTimer = emqx_zone:stats_timer(Zone), #{enable := false} -> undefined;
IdleTimeout = emqx_zone:idle_timeout(Zone), GcPolicy -> emqx_gc:init(GcPolicy)
end,
StatsTimer = case emqx_config:get_listener_conf(Zone, Listener, [stats, enable]) of
true -> undefined;
false -> disabled
end,
IdleTimeout = emqx_channel:get_mqtt_conf(Zone, Listener, idle_timeout),
IdleTimer = start_timer(IdleTimeout, idle_timeout), IdleTimer = start_timer(IdleTimeout, idle_timeout),
#state{transport = Transport, #state{transport = Transport,
socket = Socket, socket = Socket,
@ -291,8 +295,11 @@ run_loop(Parent, State = #state{transport = Transport,
peername = Peername, peername = Peername,
channel = Channel}) -> channel = Channel}) ->
emqx_logger:set_metadata_peername(esockd:format(Peername)), emqx_logger:set_metadata_peername(esockd:format(Peername)),
emqx_misc:tune_heap_size(emqx_zone:oom_policy( case emqx_config:get_listener_conf(emqx_channel:info(zone, Channel),
emqx_channel:info(zone, Channel))), emqx_channel:info(listener, Channel), [force_shutdown]) of
#{enable := false} -> ok;
ShutdownPolicy -> emqx_misc:tune_heap_size(ShutdownPolicy)
end,
case activate_socket(State) of case activate_socket(State) of
{ok, NState} -> hibernate(Parent, NState); {ok, NState} -> hibernate(Parent, NState);
{error, Reason} -> {error, Reason} ->
@ -783,15 +790,18 @@ run_gc(Stats, State = #state{gc_state = GcSt}) ->
end. end.
check_oom(State = #state{channel = Channel}) -> check_oom(State = #state{channel = Channel}) ->
Zone = emqx_channel:info(zone, Channel), ShutdownPolicy = emqx_config:get_listener_conf(emqx_channel:info(zone, Channel),
OomPolicy = emqx_zone:oom_policy(Zone), emqx_channel:info(listener, Channel), [force_shutdown]),
?tp(debug, check_oom, #{policy => OomPolicy}), ?tp(debug, check_oom, #{policy => ShutdownPolicy}),
case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of case ShutdownPolicy of
{shutdown, Reason} -> #{enable := false} -> ok;
%% triggers terminate/2 callback immediately ShutdownPolicy ->
erlang:exit({shutdown, Reason}); case emqx_misc:check_oom(ShutdownPolicy) of
_Other -> {shutdown, Reason} ->
ok %% triggers terminate/2 callback immediately
erlang:exit({shutdown, Reason});
_ -> ok
end
end, end,
State. State.

View File

@ -34,6 +34,9 @@
, serialize/2 , serialize/2
]). ]).
-export([ set_opts/2
]).
-export_type([ options/0 -export_type([ options/0
, parse_state/0 , parse_state/0
, parse_result/0 , parse_result/0
@ -81,11 +84,11 @@ initial_parse_state() ->
-spec(initial_parse_state(options()) -> {none, options()}). -spec(initial_parse_state(options()) -> {none, options()}).
initial_parse_state(Options) when is_map(Options) -> initial_parse_state(Options) when is_map(Options) ->
?none(merge_opts(Options)). ?none(maps:merge(?DEFAULT_OPTIONS, Options)).
%% @pivate -spec set_opts(parse_state(), options()) -> parse_state().
merge_opts(Options) -> set_opts({_, OldOpts}, Opts) ->
maps:merge(?DEFAULT_OPTIONS, Options). maps:merge(OldOpts, Opts).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Parse MQTT Frame %% Parse MQTT Frame

View File

@ -68,7 +68,8 @@ console_print(_Fmt, _Args) -> ok.
-> {ok, pid()} | {error, term()}). -> {ok, pid()} | {error, term()}).
do_start_listener(ZoneName, ListenerName, #{type := tcp, bind := ListenOn} = Opts) -> do_start_listener(ZoneName, ListenerName, #{type := tcp, bind := ListenOn} = Opts) ->
esockd:open(listener_id(ZoneName, ListenerName), ListenOn, merge_default(esockd_opts(Opts)), esockd:open(listener_id(ZoneName, ListenerName), ListenOn, merge_default(esockd_opts(Opts)),
{emqx_connection, start_link, [{ZoneName, ListenerName}]}); {emqx_connection, start_link,
[#{zone => ZoneName, listener => ListenerName}]});
%% Start MQTT/WS listener %% Start MQTT/WS listener
do_start_listener(ZoneName, ListenerName, #{type := ws, bind := ListenOn} = Opts) -> do_start_listener(ZoneName, ListenerName, #{type := ws, bind := ListenOn} = Opts) ->

View File

@ -59,7 +59,7 @@
-export([includes/0]). -export([includes/0]).
structs() -> ["cluster", "node", "rpc", "log", "lager", structs() -> ["cluster", "node", "rpc", "log", "lager",
"acl", "mqtt", "zones", "listeners", "module", "broker", "zones", "listeners", "module", "broker",
"plugins", "sysmon", "alarm", "telemetry"] "plugins", "sysmon", "alarm", "telemetry"]
++ includes(). ++ includes().
@ -244,7 +244,7 @@ fields("acl_cache") ->
]; ];
fields("mqtt") -> fields("mqtt") ->
[ {"mountpoint", t(binary(), undefined, <<"">>)} [ {"mountpoint", t(binary(), undefined, <<>>)}
, {"idle_timeout", maybe_infinity(duration(), "15s")} , {"idle_timeout", maybe_infinity(duration(), "15s")}
, {"max_packet_size", maybe_infinity(bytesize(), "1MB")} , {"max_packet_size", maybe_infinity(bytesize(), "1MB")}
, {"max_clientid_len", t(integer(), undefined, 65535)} , {"max_clientid_len", t(integer(), undefined, 65535)}
@ -256,7 +256,7 @@ fields("mqtt") ->
, {"shared_subscription", t(boolean(), undefined, true)} , {"shared_subscription", t(boolean(), undefined, true)}
, {"ignore_loop_deliver", t(boolean())} , {"ignore_loop_deliver", t(boolean())}
, {"strict_mode", t(boolean(), undefined, false)} , {"strict_mode", t(boolean(), undefined, false)}
, {"response_information", t(string(), undefined, undefined)} , {"response_information", t(string(), undefined, "")}
, {"server_keepalive", maybe_disabled(integer())} , {"server_keepalive", maybe_disabled(integer())}
, {"keepalive_backoff", t(float(), undefined, 0.75)} , {"keepalive_backoff", t(float(), undefined, 0.75)}
, {"max_subscriptions", maybe_infinity(integer())} , {"max_subscriptions", maybe_infinity(integer())}
@ -365,7 +365,7 @@ fields("ws_opts") ->
[ {"mqtt_path", t(string(), undefined, "/mqtt")} [ {"mqtt_path", t(string(), undefined, "/mqtt")}
, {"mqtt_piggyback", t(union(single, multiple), undefined, multiple)} , {"mqtt_piggyback", t(union(single, multiple), undefined, multiple)}
, {"compress", t(boolean())} , {"compress", t(boolean())}
, {"idle_timeout", maybe_infinity(duration())} , {"idle_timeout", t(duration(), undefined, "15s")}
, {"max_frame_size", maybe_infinity(integer())} , {"max_frame_size", maybe_infinity(integer())}
, {"fail_if_no_subprotocol", t(boolean(), undefined, true)} , {"fail_if_no_subprotocol", t(boolean(), undefined, true)}
, {"supported_subprotocols", t(string(), undefined, , {"supported_subprotocols", t(string(), undefined,