emqx/src/emqx_channel.erl

1645 lines
68 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% MQTT Channel
-module(emqx_channel).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Channel]").
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-export([ info/1
, info/2
, stats/1
, caps/1
]).
-export([ init/2
, handle_in/2
, handle_deliver/2
, handle_out/3
, handle_timeout/3
, handle_call/2
, handle_info/2
, terminate/2
]).
%% Exports for CT
-export([set_field/3]).
-import(emqx_misc,
[ run_fold/3
, pipeline/3
, maybe_apply/2
]).
-export_type([channel/0]).
%% Per-connection channel state. A fresh record is built by init/2 and then
%% threaded through every handle_* callback of this module.
-record(channel, {
%% MQTT connection info map (peername, sockname, proto_ver, ...)
conninfo :: emqx_types:conninfo(),
%% MQTT client info map (zone, clientid, username, mountpoint, ...)
clientinfo :: emqx_types:clientinfo(),
%% MQTT session; left unset by init/2 and filled in by process_connect/2
session :: maybe(emqx_session:session()),
%% Keepalive state; `undefined' means keepalive timeouts are ignored
keepalive :: maybe(emqx_keepalive:keepalive()),
%% MQTT will message; cleared on a DISCONNECT with reason code success
will_msg :: maybe(emqx_types:message()),
%% MQTT topic aliases, a map with `inbound' and `outbound' sub-maps
topic_aliases :: emqx_types:topic_aliases(),
%% MQTT topic alias maximum, initialised when CONNECT is handled
alias_maximum :: maybe(map()),
%% Authentication data cache; starts as an empty map
auth_cache :: maybe(map()),
%% Publish quota limiter state (see ensure_quota/2)
quota :: maybe(emqx_limiter:limiter()),
%% Timers keyed by timer name; starts as an empty map
timers :: #{atom() => disabled | maybe(reference())},
%% Connection state; starts as `idle'
conn_state :: conn_state(),
%% Set to true once a {takeover, 'begin'} call has been handled
takeover :: boolean(),
%% True between opening an already present session and returning the CONNACK
resuming :: boolean(),
%% Delivers buffered while a takeover is in progress or a session is resumed
pendings :: list()
}).
%% The channel state; opaque so that other modules go through info/1,2.
-opaque(channel() :: #channel{}).
%% Lifecycle states of the connection as tracked by the channel.
-type(conn_state() :: idle | connecting | connected | disconnected).
%% A single instruction handed back to the connection process.
%% NOTE(review): return_connack/2 also emits `{connack, Packet}', which is
%% not listed here -- confirm whether the type should include it.
-type(reply() :: {outgoing, emqx_types:packet()}
| {outgoing, [emqx_types:packet()]}
| {event, conn_state()|updated}
| {close, Reason :: atom()}).
%% What a handler may return as its reply part: a bare packet, one reply or
%% a list of replies.
-type(replies() :: emqx_types:packet() | reply() | [reply()]).
%% Pattern that matches a channel whose negotiated protocol version is MQTT 5.
-define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}).
%% Timer name => timeout message.
%% NOTE(review): presumably consumed by the timer helpers (ensure_timer etc.)
%% to choose the message delivered to handle_timeout/3 -- confirm.
-define(TIMER_TABLE, #{
alive_timer => keepalive,
retry_timer => retry_delivery,
await_timer => expire_awaiting_rel,
expire_timer => expire_session,
will_timer => will_message,
quota_timer => expire_quota_limit
}).
%% Keys reported by info/1.
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
%% Silence Dialyzer `no_match' warnings for the listed functions.
-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
%%--------------------------------------------------------------------
%% Info, Attrs and Caps
%%--------------------------------------------------------------------
%% @doc Return the standard channel infos (the ?INFO_KEYS items) as a map.
-spec(info(channel()) -> emqx_types:infos()).
info(Channel) ->
    Pairs = info(?INFO_KEYS, Channel),
    maps:from_list(Pairs).
%% Look up one info item, or a list of items, of the channel.
%% A list of keys yields a `[{Key, Value}]' list in the same order; a single
%% key yields the bare value.
-spec(info(list(atom())|atom(), channel()) -> term()).
info(Keys, Channel) when is_list(Keys) ->
    lists:map(fun(Key) -> {Key, info(Key, Channel)} end, Keys);
%% Whole fields of the channel record.
info(conninfo, #channel{conninfo = Value}) ->
    Value;
info(clientinfo, #channel{clientinfo = Value}) ->
    Value;
info(conn_state, #channel{conn_state = Value}) ->
    Value;
info(topic_aliases, #channel{topic_aliases = Value}) ->
    Value;
info(alias_maximum, #channel{alias_maximum = Value}) ->
    Value;
info(timers, #channel{timers = Value}) ->
    Value;
%% Items read from the client info map.
info(zone, #channel{clientinfo = #{zone := Value}}) ->
    Value;
info(clientid, #channel{clientinfo = #{clientid := Value}}) ->
    Value;
info(username, #channel{clientinfo = #{username := Value}}) ->
    Value;
%% Items read from the connection info map.
info(socktype, #channel{conninfo = #{socktype := Value}}) ->
    Value;
info(peername, #channel{conninfo = #{peername := Value}}) ->
    Value;
info(sockname, #channel{conninfo = #{sockname := Value}}) ->
    Value;
info(proto_name, #channel{conninfo = #{proto_name := Value}}) ->
    Value;
info(proto_ver, #channel{conninfo = #{proto_ver := Value}}) ->
    Value;
info(connected_at, #channel{conninfo = #{connected_at := Value}}) ->
    Value;
%% Optional parts of the state, converted to their info representation.
info(session, #channel{session = Session}) ->
    maybe_apply(fun emqx_session:info/1, Session);
info(keepalive, #channel{keepalive = Keepalive}) ->
    maybe_apply(fun emqx_keepalive:info/1, Keepalive);
info(will_msg, #channel{will_msg = WillMsg}) ->
    case WillMsg of
        undefined -> undefined;
        _ -> emqx_message:to_map(WillMsg)
    end.
%% Statistics of the channel; currently only the session statistics.
%% TODO: Add more stats.
-spec(stats(channel()) -> emqx_types:stats()).
stats(Channel = #channel{}) ->
    Session = Channel#channel.session,
    emqx_session:stats(Session).
%% MQTT capabilities configured for the zone the client belongs to.
-spec(caps(channel()) -> emqx_types:caps()).
caps(Channel = #channel{clientinfo = #{zone := _Zone}}) ->
    emqx_mqtt_caps:get_caps(info(zone, Channel)).
%%--------------------------------------------------------------------
%% Init the channel
%%--------------------------------------------------------------------
%% Build the initial channel state for a freshly accepted connection.
%% `ConnInfo' must carry `peername' and `sockname'; `Options' is a proplist
%% from which `zone' (and, for TLS peers, `peer_cert_as_username') is read.
-spec(init(emqx_types:conninfo(), proplists:proplist()) -> channel()).
init(ConnInfo = #{peername := {PeerHost, _Port},
                  sockname := {_Host, SockPort}}, Options) ->
    Zone = proplists:get_value(zone, Options),
    Peercert = maps:get(peercert, ConnInfo, undefined),
    Protocol = maps:get(protocol, ConnInfo, mqtt),
    MountPoint = emqx_zone:mountpoint(Zone),
    QuotaPolicy = emqx_zone:quota_policy(Zone),
    %% Client info known before CONNECT arrives; clientid/username come later.
    BaseClientInfo = #{zone => Zone,
                       protocol => Protocol,
                       peerhost => PeerHost,
                       sockport => SockPort,
                       clientid => undefined,
                       username => undefined,
                       mountpoint => MountPoint,
                       is_bridge => false,
                       is_superuser => false
                      },
    CertClientInfo = setting_peercert_infos(Peercert, BaseClientInfo, Options),
    %% A websocket cookie, when present, moves from conninfo to clientinfo.
    {ClientInfo, ConnInfo1} = take_ws_cookie(CertClientInfo, ConnInfo),
    Quota = emqx_limiter:init(Zone, QuotaPolicy),
    #channel{conninfo = ConnInfo1,
             clientinfo = ClientInfo,
             topic_aliases = #{inbound => #{}, outbound => #{}},
             auth_cache = #{},
             quota = Quota,
             timers = #{},
             conn_state = idle,
             takeover = false,
             resuming = false,
             pendings = []
            }.
%% Enrich the client info with data taken from the peer certificate.
%% Without a certificate (`nossl' or `undefined') only `username' is reset to
%% `undefined'. Otherwise `dn' and `cn' are added and `username' is chosen by
%% the `peer_cert_as_username' option (cn | dn | crt, anything else gives
%% `undefined').
setting_peercert_infos(nossl, ClientInfo, _Options) ->
    ClientInfo#{username => undefined};
setting_peercert_infos(undefined, ClientInfo, _Options) ->
    ClientInfo#{username => undefined};
setting_peercert_infos(Peercert, ClientInfo, Options) ->
    Subject = esockd_peercert:subject(Peercert),
    CommonName = esockd_peercert:common_name(Peercert),
    Mode = proplists:get_value(peer_cert_as_username, Options),
    Username = case Mode of
                   cn -> CommonName;
                   dn -> Subject;
                   crt -> Peercert;
                   _ -> undefined
               end,
    ClientInfo#{username => Username, dn => Subject, cn => CommonName}.
%% Move the `ws_cookie' entry, if there is one, from the connection info to
%% the client info. Returns `{ClientInfo, ConnInfo}' in both cases.
take_ws_cookie(ClientInfo, ConnInfo) ->
    case maps:take(ws_cookie, ConnInfo) of
        error ->
            {ClientInfo, ConnInfo};
        {Cookie, RestConnInfo} ->
            {ClientInfo#{ws_cookie => Cookie}, RestConnInfo}
    end.
%%--------------------------------------------------------------------
%% Handle incoming packet
%%--------------------------------------------------------------------
%% Handle one incoming MQTT packet, or a `{frame_error, Reason}' term, for the
%% given channel. The clauses are tried top to bottom and their order matters:
%% the CONNECT and AUTH clauses come first, then a guard clause that rejects
%% every other packet while the channel is not `connected', then the
%% per-packet-type clauses, the frame-error clauses and finally a catch-all.
-spec(handle_in(emqx_types:packet(), channel())
-> {ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}
| {shutdown, Reason :: term(), replies(), channel()}).
%% A CONNECT on an already connected channel is answered with a protocol error.
handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connected}) ->
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
%% CONNECT: switch to `connecting', run the connect pipeline, then enhanced
%% authentication. Any failure ends in a CONNACK carrying the reason code.
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
case pipeline([fun enrich_conninfo/2,
fun run_conn_hooks/2,
fun check_connect/2,
fun enrich_client/2,
fun set_log_meta/2,
fun check_banned/2,
fun auth_connect/2
], ConnPkt, Channel#channel{conn_state = connecting}) of
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
%% Will message and alias maximum are taken from the enriched packet.
NChannel1 = NChannel#channel{
will_msg = emqx_packet:will_msg(NConnPkt),
alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
},
case enhanced_auth(?CONNECT_PACKET(NConnPkt), NChannel1) of
{ok, Properties, NChannel2} ->
process_connect(Properties, ensure_connected(NChannel2));
{continue, Properties, NChannel2} ->
handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, NChannel2);
{error, ReasonCode, NChannel2} ->
handle_out(connack, ReasonCode, NChannel2)
end;
{error, ReasonCode, NChannel} ->
handle_out(connack, ReasonCode, NChannel)
end;
%% AUTH with "continue authentication": what happens on success depends on the
%% state the channel was in when the packet arrived.
handle_in(Packet = ?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION, _Properties), Channel = #channel{conn_state = ConnState}) ->
case enhanced_auth(Packet, Channel) of
{ok, NProperties, NChannel} ->
case ConnState of
connecting ->
process_connect(NProperties, ensure_connected(NChannel));
connected ->
handle_out(auth, {?RC_SUCCESS, NProperties}, NChannel);
_ ->
%% Note: this branch passes the original Channel, not NChannel.
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel)
end;
{continue, NProperties, NChannel} ->
handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel);
{error, NReasonCode, NChannel} ->
handle_out(connack, NReasonCode, NChannel)
end;
%% AUTH with "re-authenticate" is only accepted on a connected channel; an
%% authentication error leads to a disconnect.
handle_in(Packet = ?AUTH_PACKET(?RC_RE_AUTHENTICATE, _Properties), Channel = #channel{conn_state = connected}) ->
case enhanced_auth(Packet, Channel) of
{ok, NProperties, NChannel} ->
handle_out(auth, {?RC_SUCCESS, NProperties}, NChannel);
{continue, NProperties, NChannel} ->
handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel);
{error, NReasonCode, NChannel} ->
handle_out(disconnect, NReasonCode, NChannel)
end;
%% Every other packet received while not connected is a protocol error.
handle_in(?PACKET(_), Channel = #channel{conn_state = ConnState}) when ConnState =/= connected ->
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
%% PUBLISH: validate the packet, then hand it to process_publish/2.
handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
case emqx_packet:check(Packet) of
ok -> process_publish(Packet, Channel);
{error, ReasonCode} ->
handle_out(disconnect, ReasonCode, Channel)
end;
%% PUBACK: acknowledge in the session; the session may return further
%% publishes to send. Unknown or in-use packet ids are logged and counted.
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
= #channel{clientinfo = ClientInfo, session = Session}) ->
case emqx_session:puback(PacketId, Session) of
{ok, Msg, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Properties),
{ok, Channel#channel{session = NSession}};
{ok, Msg, Publishes, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Properties),
handle_out(publish, Publishes, Channel#channel{session = NSession});
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
ok = emqx_metrics:inc('packets.puback.inuse'),
{ok, Channel};
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBACK PacketId ~w is not found.", [PacketId]),
ok = emqx_metrics:inc('packets.puback.missed'),
{ok, Channel}
end;
%% PUBREC: every branch replies with a PUBREL, carrying either success or the
%% error reason code returned by the session.
handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel
= #channel{clientinfo = ClientInfo, session = Session}) ->
case emqx_session:pubrec(PacketId, Session) of
{ok, Msg, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Properties),
NChannel = Channel#channel{session = NSession},
handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]),
ok = emqx_metrics:inc('packets.pubrec.inuse'),
handle_out(pubrel, {PacketId, RC}, Channel);
{error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBREC ~w is not found.", [PacketId]),
ok = emqx_metrics:inc('packets.pubrec.missed'),
handle_out(pubrel, {PacketId, RC}, Channel)
end;
%% PUBREL: always answered with a PUBCOMP (success or "not found").
handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
case emqx_session:pubrel(PacketId, Session) of
{ok, NSession} ->
NChannel = Channel#channel{session = NSession},
handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
{error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBREL PacketId ~w is not found.", [PacketId]),
ok = emqx_metrics:inc('packets.pubrel.missed'),
handle_out(pubcomp, {PacketId, RC}, Channel)
end;
%% PUBCOMP: completes a flow in the session; the session may return further
%% publishes to send. Errors are only counted (and logged for "not found").
handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
case emqx_session:pubcomp(PacketId, Session) of
{ok, NSession} ->
{ok, Channel#channel{session = NSession}};
{ok, Publishes, NSession} ->
handle_out(publish, Publishes, Channel#channel{session = NSession});
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
ok = emqx_metrics:inc('packets.pubcomp.inuse'),
{ok, Channel};
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]),
ok = emqx_metrics:inc('packets.pubcomp.missed'),
{ok, Channel}
end;
%% SUBSCRIBE: validate, parse the topic filters, attach the subscription id,
%% run the 'client.subscribe' hook and subscribe filter by filter.
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
case emqx_packet:check(Packet) of
ok -> TopicFilters1 = parse_topic_filters(TopicFilters),
TopicFilters2 = put_subid_in_subopts(Properties, TopicFilters1),
TopicFilters3 = run_hooks('client.subscribe',
[ClientInfo, Properties],
TopicFilters2
),
{ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Properties, Channel),
%% With acl_deny_action = disconnect a single "not authorized" result
%% disconnects the client instead of producing a SUBACK.
case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso
lists:any(fun(ReasonCode) ->
ReasonCode =:= ?RC_NOT_AUTHORIZED
end, ReasonCodes) of
true ->
handle_out(disconnect, ?RC_NOT_AUTHORIZED, NChannel);
false ->
handle_out(suback, {PacketId, ReasonCodes}, NChannel)
end;
{error, ReasonCode} ->
handle_out(disconnect, ReasonCode, Channel)
end;
%% UNSUBSCRIBE: validate, run the 'client.unsubscribe' hook over the parsed
%% filters, unsubscribe and reply with an UNSUBACK.
handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
Channel = #channel{clientinfo = ClientInfo}) ->
case emqx_packet:check(Packet) of
ok -> TopicFilters1 = run_hooks('client.unsubscribe',
[ClientInfo, Properties],
parse_topic_filters(TopicFilters)
),
{ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Properties, Channel),
handle_out(unsuback, {PacketId, ReasonCodes}, NChannel);
{error, ReasonCode} ->
handle_out(disconnect, ReasonCode, Channel)
end;
%% PINGREQ is answered with PINGRESP; the channel is unchanged.
handle_in(?PACKET(?PINGREQ), Channel) ->
{ok, ?PACKET(?PINGRESP), Channel};
%% DISCONNECT: remember the properties under `disconn_props' in conninfo,
%% drop the will message on a normal disconnect, then process the disconnect.
handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) ->
NChannel = maybe_clean_will_msg(ReasonCode, Channel#channel{conninfo = ConnInfo#{disconn_props => Properties}}),
process_disconnect(ReasonCode, Properties, NChannel);
%% Any AUTH packet not matched by the clauses above is rejected.
handle_in(?AUTH_PACKET(), Channel) ->
handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
%% Frame errors: the reaction depends on the connection state. In `idle' the
%% channel just shuts down, in `connecting' a CONNACK is sent with the
%% shutdown, in `connected' a disconnect is issued, and in `disconnected' the
%% error is only logged.
handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
shutdown(Reason, Channel);
handle_in({frame_error, frame_too_large}, Channel = #channel{conn_state = connecting}) ->
shutdown(frame_too_large, ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
shutdown(Reason, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
handle_in({frame_error, frame_too_large}, Channel = #channel{conn_state = connected}) ->
handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = connected}) ->
handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
?LOG(error, "Unexpected frame error: ~p", [Reason]),
{ok, Channel};
%% Anything else is logged and treated as a protocol error.
handle_in(Packet, Channel) ->
?LOG(error, "Unexpected incoming: ~p", [Packet]),
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
%%--------------------------------------------------------------------
%% Process Connect
%%--------------------------------------------------------------------
%% Open (or resume) the session for an authenticated client and answer with
%% the matching CONNACK. When an existing session is resumed, the pending
%% delivers handed over with it are merged with the delivers already waiting
%% in this process' mailbox and kept in the channel until the CONNACK goes out.
process_connect(AckProps,
                Channel = #channel{conninfo = ConnInfo = #{clean_start := CleanStart},
                                   clientinfo = ClientInfo}) ->
    case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
        {ok, #{session := Session, present := false}} ->
            Fresh = Channel#channel{session = Session},
            handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, Fresh);
        {ok, #{session := Session, present := true, pendings := Pendings}} ->
            AllPendings = lists:usort(Pendings ++ emqx_misc:drain_deliver()),
            Resumed = Channel#channel{session = Session,
                                      resuming = true,
                                      pendings = AllPendings},
            handle_out(connack, {?RC_SUCCESS, sp(true), AckProps}, Resumed);
        {error, client_id_unavailable} ->
            handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel);
        {error, Reason} ->
            ?LOG(error, "Failed to open session due to ~p", [Reason]),
            handle_out(connack, ?RC_UNSPECIFIED_ERROR, Channel)
    end.
%%--------------------------------------------------------------------
%% Process Publish
%%--------------------------------------------------------------------
%% Process a PUBLISH packet that already passed emqx_packet:check/1.
%%
%% The packet runs through the quota, alias, ACL and capability checks. On
%% success it is converted to a message and published. On failure:
%%   - "not authorized": with the zone's `acl_deny_action' set to `ignore'
%%     (the default) the message is dropped and acknowledged with the reason
%%     code, otherwise the client is disconnected;
%%   - "quota exceeded": the message is dropped and acknowledged with the
%%     reason code;
%%   - anything else: the client is disconnected.
%%
%% A rejected message is acknowledged with the packet type that belongs to
%% its QoS: nothing for QoS 0, PUBACK for QoS 1 and PUBREC for QoS 2.
process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
                Channel = #channel{clientinfo = #{zone := Zone}}) ->
    case pipeline([fun check_quota_exceeded/2,
                   fun process_alias/2,
                   fun check_pub_alias/2,
                   fun check_pub_acl/2,
                   fun check_pub_caps/2
                  ], Packet, Channel) of
        {ok, NPacket, NChannel} ->
            Msg = packet_to_message(NPacket, NChannel),
            do_publish(PacketId, Msg, NChannel);
        {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
            ?LOG(warning, "Cannot publish message to ~s due to ~s.",
                 [Topic, emqx_reason_codes:text(Rc)]),
            case emqx_zone:get_env(Zone, acl_deny_action, ignore) of
                ignore ->
                    case QoS of
                        ?QOS_0 ->
                            {ok, NChannel};
                        ?QOS_1 ->
                            handle_out(puback, {PacketId, Rc}, NChannel);
                        ?QOS_2 ->
                            %% A QoS 2 PUBLISH must be answered with PUBREC,
                            %% never PUBACK, also when it is rejected.
                            handle_out(pubrec, {PacketId, Rc}, NChannel)
                    end;
                disconnect ->
                    handle_out(disconnect, Rc, NChannel)
            end;
        {error, Rc = ?RC_QUOTA_EXCEEDED, NChannel} ->
            ?LOG(warning, "Cannot publish messages to ~s due to ~s.",
                 [Topic, emqx_reason_codes:text(Rc)]),
            case QoS of
                ?QOS_0 ->
                    ok = emqx_metrics:inc('packets.publish.dropped'),
                    {ok, NChannel};
                ?QOS_1 ->
                    handle_out(puback, {PacketId, Rc}, NChannel);
                ?QOS_2 ->
                    handle_out(pubrec, {PacketId, Rc}, NChannel)
            end;
        {error, Rc, NChannel} ->
            ?LOG(warning, "Cannot publish message to ~s due to ~s.",
                 [Topic, emqx_reason_codes:text(Rc)]),
            handle_out(disconnect, Rc, NChannel)
    end.
%% Convert a PUBLISH packet into a message published by this client. The
%% protocol version and the client's protocol, username and peer host are
%% passed along as headers, and the topic is mounted under the mountpoint.
packet_to_message(Packet,
                  #channel{conninfo = #{proto_ver := ProtoVer},
                           clientinfo = #{protocol := Protocol,
                                          clientid := ClientId,
                                          username := Username,
                                          peerhost := PeerHost,
                                          mountpoint := MountPoint}}) ->
    Headers = #{proto_ver => ProtoVer,
                protocol => Protocol,
                username => Username,
                peerhost => PeerHost},
    Msg = emqx_packet:to_message(Packet, ClientId, Headers),
    emqx_mountpoint:mount(MountPoint, Msg).
%% Publish a message and produce the acknowledgement for its QoS.
%% QoS 0 and QoS 1 messages go straight to the broker; QoS 2 messages go
%% through the session, which tracks the packet id until PUBREL arrives.
%% After a successful publish the quota is charged via ensure_quota/2.
do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) ->
    Routes = emqx_broker:publish(Msg),
    {ok, ensure_quota(Routes, Channel)};
do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
    Routes = emqx_broker:publish(Msg),
    ReasonCode = puback_reason_code(Routes),
    handle_out(puback, {PacketId, ReasonCode}, ensure_quota(Routes, Channel));
do_publish(PacketId, Msg = #message{qos = ?QOS_2},
           Channel = #channel{session = Session}) ->
    case emqx_session:publish(PacketId, Msg, Session) of
        {ok, Routes, NSession} ->
            ReasonCode = puback_reason_code(Routes),
            Awaiting = ensure_timer(await_timer, Channel#channel{session = NSession}),
            handle_out(pubrec, {PacketId, ReasonCode}, ensure_quota(Routes, Awaiting));
        {error, ReasonCode = ?RC_PACKET_IDENTIFIER_IN_USE} ->
            ok = emqx_metrics:inc('packets.publish.inuse'),
            handle_out(pubrec, {PacketId, ReasonCode}, Channel);
        {error, ReasonCode = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
            ?LOG(warning, "Dropped the qos2 packet ~w "
                 "due to awaiting_rel is full.", [PacketId]),
            ok = emqx_metrics:inc('packets.publish.dropped'),
            handle_out(pubrec, {PacketId, ReasonCode}, Channel)
    end.
%% Charge the publish result against the channel's quota limiter.
%% Without a limiter the channel is returned unchanged. Otherwise the count
%% starts at 1 and grows by 1 for every `ok' route and by I for every
%% `{ok, I}' route; when the limiter asks for a pause the quota timer is
%% started with the returned interval.
ensure_quota(_PubRes, Channel = #channel{quota = undefined}) ->
    Channel;
ensure_quota(PubRes, Channel = #channel{quota = Limiter}) ->
    CountRoute = fun({_, _, ok}, Sum) -> Sum + 1;
                    ({_, _, {ok, Num}}, Sum) -> Sum + Num;
                    (_Other, Sum) -> Sum
                 end,
    Cnt = lists:foldl(CountRoute, 1, PubRes),
    case emqx_limiter:check(#{cnt => Cnt, oct => 0}, Limiter) of
        {pause, Intv, NLimiter} ->
            ensure_timer(quota_timer, Intv, Channel#channel{quota = NLimiter});
        {ok, NLimiter} ->
            Channel#channel{quota = NLimiter}
    end.
-compile({inline, [puback_reason_code/1]}).
%% Reason code for acknowledging a publish: "no matching subscribers" when
%% the publish result is empty, success otherwise.
puback_reason_code(PubRes) when is_list(PubRes) ->
    case PubRes of
        [] -> ?RC_NO_MATCHING_SUBSCRIBERS;
        _ -> ?RC_SUCCESS
    end.
-compile({inline, [after_message_acked/3]}).
%% Count the acknowledged message and run the 'message.acked' hook with the
%% acknowledgement properties stored in the `puback_props' message header.
after_message_acked(ClientInfo, Msg, PubAckProps) ->
    ok = emqx_metrics:inc('messages.acked'),
    AckedMsg = emqx_message:set_header(puback_props, PubAckProps, Msg),
    emqx_hooks:run('message.acked', [ClientInfo, AckedMsg]).
%%--------------------------------------------------------------------
%% Process Subscribe
%%--------------------------------------------------------------------
-compile({inline, [process_subscribe/3]}).
%% Subscribe to each `{TopicFilter, SubOpts}' in turn. Returns the reason
%% codes in the order of the filters together with the updated channel.
process_subscribe(TopicFilters, SubProps, Channel) ->
    process_subscribe(TopicFilters, SubProps, Channel, []).

%% Worker: `Acc' holds the reason codes collected so far, newest first.
process_subscribe([], _SubProps, Channel, Acc) ->
    {lists:reverse(Acc), Channel};
process_subscribe([{TopicFilter, SubOpts} | Rest], SubProps, Channel, Acc) ->
    {ReasonCode, NextChannel} =
        case check_subscribe(TopicFilter, SubOpts, Channel) of
            {error, Denied} ->
                %% A rejected filter is reported and the channel is untouched.
                ?LOG(warning, "Cannot subscribe ~s due to ~s.", [TopicFilter, emqx_reason_codes:text(Denied)]),
                {Denied, Channel};
            ok ->
                do_subscribe(TopicFilter, SubOpts#{sub_props => SubProps}, Channel)
        end,
    process_subscribe(Rest, SubProps, NextChannel, [ReasonCode | Acc]).
%% Subscribe the session to one topic filter. The filter is mounted under the
%% client's mountpoint and the options are completed with the defaults before
%% the session is asked. Returns `{QoS, Channel}' on success and
%% `{ReasonCode, Channel}' (channel unchanged) on failure.
do_subscribe(TopicFilter, SubOpts = #{qos := QoS},
             Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
                                session = Session}) ->
    Mounted = emqx_mountpoint:mount(MountPoint, TopicFilter),
    FullOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
    case emqx_session:subscribe(ClientInfo, Mounted, FullOpts, Session) of
        {error, RC} ->
            ?LOG(warning, "Cannot subscribe ~s due to ~s.", [TopicFilter, emqx_reason_codes:text(RC)]),
            {RC, Channel};
        {ok, NSession} ->
            {QoS, Channel#channel{session = NSession}}
    end.
%%--------------------------------------------------------------------
%% Process Unsubscribe
%%--------------------------------------------------------------------
-compile({inline, [process_unsubscribe/3]}).
%% Unsubscribe from each `{TopicFilter, SubOpts}' in turn. Returns the reason
%% codes in the order of the filters together with the updated channel.
process_unsubscribe(TopicFilters, UnSubProps, Channel) ->
    process_unsubscribe(TopicFilters, UnSubProps, Channel, []).

%% Worker: `Acc' holds reason codes collected earlier, newest first; they are
%% returned (oldest first) in front of the codes produced here.
process_unsubscribe(TopicFilters, UnSubProps, Channel, Acc) ->
    Unsubscribe = fun({TopicFilter, SubOpts}, Chan) ->
                      do_unsubscribe(TopicFilter, SubOpts#{unsub_props => UnSubProps}, Chan)
                  end,
    {ReasonCodes, NChannel} = lists:mapfoldl(Unsubscribe, Channel, TopicFilters),
    {lists:reverse(Acc) ++ ReasonCodes, NChannel}.
%% Unsubscribe the session from one topic filter (mounted under the client's
%% mountpoint first). Returns `{ReasonCode, Channel}'; the channel only
%% changes when the session accepted the request.
do_unsubscribe(TopicFilter, SubOpts,
               Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
                                  session = Session}) ->
    Mounted = emqx_mountpoint:mount(MountPoint, TopicFilter),
    case emqx_session:unsubscribe(ClientInfo, Mounted, SubOpts, Session) of
        {error, RC} ->
            {RC, Channel};
        {ok, NSession} ->
            {?RC_SUCCESS, Channel#channel{session = NSession}}
    end.
%%--------------------------------------------------------------------
%% Process Disconnect
%%--------------------------------------------------------------------
%% MQTT-v5.0: 3.14.4 DISCONNECT Actions
%% A DISCONNECT with reason code "success" discards the will message; any
%% other reason code keeps it.
maybe_clean_will_msg(ReasonCode, Channel) ->
    case ReasonCode of
        ?RC_SUCCESS -> Channel#channel{will_msg = undefined};
        _ -> Channel
    end.
%% MQTT-v5.0: 3.14.2.2.2 Session Expiry Interval
%% A client that connected with an expiry interval of 0 must not ask for a
%% positive one in DISCONNECT; that is answered with a protocol error.
%% Otherwise the (possibly updated) interval is stored and the connection is
%% closed with the reason derived from the reason code.
process_disconnect(ReasonCode, Properties, Channel) ->
    case {Properties, Channel} of
        {#{'Session-Expiry-Interval' := Interval},
         #channel{conninfo = #{expiry_interval := 0}}} when Interval > 0 ->
            handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
        _ ->
            NChannel = maybe_update_expiry_interval(Properties, Channel),
            {ok, {close, disconnect_reason(ReasonCode)}, NChannel}
    end.
%% Store a 'Session-Expiry-Interval' property, when present, as
%% `expiry_interval' in the connection info; otherwise leave the channel as is.
maybe_update_expiry_interval(#{'Session-Expiry-Interval' := Secs},
                             Channel = #channel{conninfo = OldConnInfo}) ->
    NewConnInfo = maps:put(expiry_interval, Secs, OldConnInfo),
    Channel#channel{conninfo = NewConnInfo};
maybe_update_expiry_interval(_Properties, Channel) ->
    Channel.
%%--------------------------------------------------------------------
%% Handle Delivers from broker to client
%%--------------------------------------------------------------------
%% Handle a batch of delivers routed to this channel.
%%   - disconnected: the delivers are enqueued in the session;
%%   - takeover in progress: the delivers are appended to the pendings;
%%   - otherwise: the delivers go through the session and the resulting
%%     publishes are sent out, with the retry timer ensured.
%% In the first two cases delivers that were nacked are dropped first; in
%% all cases delivers excluded by ignore_local/3 are dropped.
-spec(handle_deliver(list(emqx_types:deliver()), channel())
      -> {ok, channel()} | {ok, replies(), channel()}).
handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
                                            session = Session,
                                            clientinfo = #{clientid := ClientId}}) ->
    Kept = ignore_local(maybe_nack(Delivers), ClientId, Session),
    {ok, Channel#channel{session = emqx_session:enqueue(Kept, Session)}};
handle_deliver(Delivers, Channel = #channel{takeover = true,
                                            pendings = Pendings,
                                            session = Session,
                                            clientinfo = #{clientid := ClientId}}) ->
    Kept = ignore_local(maybe_nack(Delivers), ClientId, Session),
    {ok, Channel#channel{pendings = Pendings ++ Kept}};
handle_deliver(Delivers, Channel = #channel{session = Session,
                                            clientinfo = #{clientid := ClientId}}) ->
    Kept = ignore_local(Delivers, ClientId, Session),
    case emqx_session:deliver(Kept, Session) of
        {ok, NSession} ->
            {ok, Channel#channel{session = NSession}};
        {ok, Publishes, NSession} ->
            Updated = Channel#channel{session = NSession},
            handle_out(publish, Publishes, ensure_timer(retry_timer, Updated))
    end.
%% Drop the delivers that must not be sent back to their own publisher.
%% A deliver is dropped when the session's subscription for its topic has the
%% No Local option set (`nl := 1') and the message was published by
%% `Subscriber' itself. Every dropped deliver is counted in the
%% 'delivery.dropped' and 'delivery.dropped.no_local' metrics.
%%
%% The whole list is filtered: a No Local message is dropped wherever it
%% appears in the batch, not only at the front.
ignore_local(Delivers, Subscriber, Session) ->
    Subs = emqx_session:info(subscriptions, Session),
    lists:filter(
      fun({deliver, Topic, #message{from = Publisher}}) ->
              case maps:find(Topic, Subs) of
                  {ok, #{nl := 1}} when Subscriber =:= Publisher ->
                      ok = emqx_metrics:inc('delivery.dropped'),
                      ok = emqx_metrics:inc('delivery.dropped.no_local'),
                      false;
                  _ ->
                      true
              end
      end, Delivers).
%% Nack delivers from shared subscription
%% Keeps only the delivers for which not_nacked/1 holds.
maybe_nack(Delivers) ->
    [Deliver || Deliver <- Delivers, not_nacked(Deliver)].

%% False only when the message requires an ack and nacking it with
%% "no connection" returned `ok'; true otherwise.
not_nacked({deliver, _Topic, Msg}) ->
    case emqx_shared_sub:is_ack_required(Msg) of
        false -> true;
        true -> ok /= emqx_shared_sub:nack_no_connection(Msg)
    end.
%%--------------------------------------------------------------------
%% Handle outgoing packet
%%--------------------------------------------------------------------
%% Build the outgoing reply for `Type' with payload `Data'. The clauses are
%% order dependent: the successful CONNACK clause must precede the generic
%% CONNACK clause, the MQTT 5 variants precede their fallbacks, and the last
%% clause logs and ignores anything unknown.
-spec(handle_out(atom(), term(), channel())
-> {ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}
| {shutdown, Reason :: term(), replies(), channel()}).
%% Successful CONNACK: enrich the properties, run the 'client.connack' hook,
%% set up keepalive and return the CONNACK (plus resumed session output).
handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = ConnInfo}) ->
AckProps = run_fold([fun enrich_connack_caps/2,
fun enrich_server_keepalive/2,
fun enrich_response_information/2,
fun enrich_assigned_clientid/2
], Props, Channel),
NAckProps = run_hooks('client.connack', [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], AckProps),
return_connack(?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps),
ensure_keepalive(NAckProps, Channel));
%% Failed CONNACK: run the hook, send the CONNACK and shut the channel down.
%% For protocol versions other than MQTT 5 the reason code is converted with
%% emqx_reason_codes:compat/2.
handle_out(connack, ReasonCode, Channel = #channel{conninfo = ConnInfo}) ->
Reason = emqx_reason_codes:name(ReasonCode),
AckProps = run_hooks('client.connack', [ConnInfo, Reason], emqx_mqtt_props:new()),
AckPacket = ?CONNACK_PACKET(case maps:get(proto_ver, ConnInfo) of
?MQTT_PROTO_V5 -> ReasonCode;
_ -> emqx_reason_codes:compat(connack, ReasonCode)
end, sp(false), AckProps),
shutdown(Reason, AckPacket, Channel);
%% Optimize?
%% PUBLISH: nothing to send for an empty list; otherwise turn the publishes
%% into packets.
handle_out(publish, [], Channel) ->
{ok, Channel};
handle_out(publish, Publishes, Channel) ->
{Packets, NChannel} = do_deliver(Publishes, Channel),
{ok, {outgoing, Packets}, NChannel};
%% The four publish acknowledgements are plain packets with a reason code.
handle_out(puback, {PacketId, ReasonCode}, Channel) ->
{ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel};
handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
{ok, ?PUBREC_PACKET(PacketId, ReasonCode), Channel};
handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
{ok, ?PUBREL_PACKET(PacketId, ReasonCode), Channel};
handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
{ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), Channel};
%% SUBACK: MQTT 5 gets the reason codes as they are, other versions get them
%% converted with emqx_reason_codes:compat/2.
handle_out(suback, {PacketId, ReasonCodes}, Channel = ?IS_MQTT_V5) ->
return_suback(?SUBACK_PACKET(PacketId, ReasonCodes), Channel);
handle_out(suback, {PacketId, ReasonCodes}, Channel) ->
ReasonCodes1 = [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes],
return_suback(?SUBACK_PACKET(PacketId, ReasonCodes1), Channel);
%% UNSUBACK: reason codes are only included for MQTT 5.
handle_out(unsuback, {PacketId, ReasonCodes}, Channel = ?IS_MQTT_V5) ->
return_unsuback(?UNSUBACK_PACKET(PacketId, ReasonCodes), Channel);
handle_out(unsuback, {PacketId, _ReasonCodes}, Channel) ->
return_unsuback(?UNSUBACK_PACKET(PacketId), Channel);
%% DISCONNECT: a bare reason code is first paired with its reason name. MQTT 5
%% clients get a DISCONNECT packet before the close; others are just closed.
handle_out(disconnect, ReasonCode, Channel) when is_integer(ReasonCode) ->
ReasonName = disconnect_reason(ReasonCode),
handle_out(disconnect, {ReasonCode, ReasonName}, Channel);
handle_out(disconnect, {ReasonCode, ReasonName}, Channel = ?IS_MQTT_V5) ->
Packet = ?DISCONNECT_PACKET(ReasonCode),
{ok, [{outgoing, Packet}, {close, ReasonName}], Channel};
handle_out(disconnect, {_ReasonCode, ReasonName}, Channel) ->
{ok, {close, ReasonName}, Channel};
%% AUTH: send an AUTH packet with the given reason code and properties.
handle_out(auth, {ReasonCode, Properties}, Channel) ->
{ok, ?AUTH_PACKET(ReasonCode, Properties), Channel};
%% Unknown outgoing types are logged and otherwise ignored.
handle_out(Type, Data, Channel) ->
?LOG(error, "Unexpected outgoing: ~s, ~p", [Type, Data]),
{ok, Channel}.
%%--------------------------------------------------------------------
%% Return ConnAck
%%--------------------------------------------------------------------
%% Reply to a successful CONNECT: a `connected' event followed by the CONNACK.
%% When a session is being resumed, its publishes are converted to packets
%% and appended as one `outgoing' reply (omitted when there are no packets),
%% and the channel leaves the resuming state with empty pendings.
return_connack(AckPacket, Channel) ->
    Replies = [{event, connected}, {connack, AckPacket}],
    case maybe_resume_session(Channel) of
        {ok, Publishes, NSession} ->
            Resumed = Channel#channel{session = NSession,
                                      resuming = false,
                                      pendings = []},
            {Packets, NChannel} = do_deliver(Publishes, Resumed),
            Outgoing = case Packets of
                           [] -> [];
                           [_ | _] -> [{outgoing, Packets}]
                       end,
            {ok, Replies ++ Outgoing, NChannel};
        ignore ->
            {ok, Replies, Channel}
    end.
%%--------------------------------------------------------------------
%% Deliver publish: broker -> client
%%--------------------------------------------------------------------
%% Turn one publish, or a list of publishes, into outgoing packets.
%% Returns `{Packets, Channel}' where Packets is a list(emqx_types:packet()).
%% A `{pubrel, PacketId}' becomes a PUBREL packet. A `{PacketId, Msg}' is
%% counted, passed through the 'message.delivered' hook, unmounted from the
%% client's mountpoint, converted to a packet and given to packing_alias/2.
do_deliver({pubrel, PacketId}, Channel) ->
    {[?PUBREL_PACKET(PacketId, ?RC_SUCCESS)], Channel};
do_deliver({PacketId, Msg},
           Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}}) ->
    ok = emqx_metrics:inc('messages.delivered'),
    Refreshed = emqx_message:update_expiry(Msg),
    Hooked = emqx_hooks:run_fold('message.delivered', [ClientInfo], Refreshed),
    Unmounted = emqx_mountpoint:unmount(MountPoint, Hooked),
    RawPacket = emqx_message:to_packet(PacketId, Unmounted),
    {AliasedPacket, NChannel} = packing_alias(RawPacket, Channel),
    {[AliasedPacket], NChannel};
do_deliver([Publish], Channel) ->
    do_deliver(Publish, Channel);
do_deliver(Publishes, Channel) when is_list(Publishes) ->
    %% Packets are collected newest first and reversed once at the end.
    Step = fun(Publish, {Collected, Chan}) ->
               {Pkts, NextChan} = do_deliver(Publish, Chan),
               {Pkts ++ Collected, NextChan}
           end,
    {Reversed, NChannel} = lists:foldl(Step, {[], Channel}, Publishes),
    {lists:reverse(Reversed), NChannel}.
%%--------------------------------------------------------------------
%% Handle out suback
%%--------------------------------------------------------------------
%% Reply with the SUBACK packet followed by an `updated' event.
return_suback(Packet, Channel) ->
    Replies = [{outgoing, Packet}, {event, updated}],
    {ok, Replies, Channel}.
%% Reply with the UNSUBACK packet followed by an `updated' event.
return_unsuback(Packet, Channel) ->
    Replies = [{outgoing, Packet}, {event, updated}],
    {ok, Replies, Channel}.
%%--------------------------------------------------------------------
%% Handle call
%%--------------------------------------------------------------------
%% Handle a synchronous request sent to the connection process.
%%   kick               - mark the channel disconnected and shut down
%%   discard            - shut down
%%   {takeover,'begin'} - reply with the session and start buffering delivers
%%   {takeover,'end'}   - hand over all pending delivers and shut down
%%   list_acl_cache     - reply with the ACL cache
%%   {quota, Policy}    - re-initialise the quota limiter
%% Any other request is logged and answered with `ignored'.
-spec(handle_call(Req :: term(), channel())
      -> {reply, Reply :: term(), channel()}
       | {shutdown, Reason :: term(), Reply :: term(), channel()}
       | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}).
handle_call(kick, Channel) ->
    Kicked = ensure_disconnected(kicked, Channel),
    disconnect_and_shutdown(kicked, ok, Kicked);
handle_call(discard, Channel) ->
    disconnect_and_shutdown(discarded, ok, Channel);
%% Session Takeover
handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
    reply(Session, Channel#channel{takeover = true});
handle_call({takeover, 'end'}, Channel = #channel{session = Session,
                                                  pendings = Pendings}) ->
    ok = emqx_session:takeover(Session),
    %% TODO: Should not drain deliver here (side effect)
    Drained = emqx_misc:drain_deliver(),
    disconnect_and_shutdown(takeovered, Drained ++ Pendings, Channel);
handle_call(list_acl_cache, Channel) ->
    {reply, emqx_acl_cache:list_acl_cache(), Channel};
handle_call({quota, Policy}, Channel) ->
    NewQuota = emqx_limiter:init(info(zone, Channel), Policy),
    reply(ok, Channel#channel{quota = NewQuota});
handle_call(Req, Channel) ->
    ?LOG(error, "Unexpected call: ~p", [Req]),
    reply(ignored, Channel).
%%--------------------------------------------------------------------
%% Handle Info
%%--------------------------------------------------------------------
%% Handle an asynchronous message sent to the connection process.
%%   {subscribe, TopicFilters}   - subscribe on behalf of the client
%%   {unsubscribe, TopicFilters} - unsubscribe on behalf of the client
%%   {sock_closed, Reason}       - the socket went away; the reaction depends
%%                                 on the connection state
%%   clean_acl_cache             - empty the ACL cache
%% Any other message is logged and ignored.
%%
%% The connected `sock_closed' clause returns `{ok, {event, disconnected},
%% Channel}' when the channel is kept, so the spec lists the three-element
%% `ok' shape as well.
-spec(handle_info(Info :: term(), channel())
      -> ok
       | {ok, channel()}
       | {ok, replies(), channel()}
       | {shutdown, Reason :: term(), channel()}).
handle_info({subscribe, TopicFilters}, Channel) ->
    %% Reason codes are not reported to anyone here; only the channel is kept.
    {_, NChannel} = lists:foldl(
                      fun({TopicFilter, SubOpts}, {_, ChannelAcc}) ->
                              do_subscribe(TopicFilter, SubOpts, ChannelAcc)
                      end, {[], Channel}, parse_topic_filters(TopicFilters)),
    {ok, NChannel};
handle_info({unsubscribe, TopicFilters}, Channel) ->
    {_RC, NChannel} = process_unsubscribe(TopicFilters, #{}, Channel),
    {ok, NChannel};
%% Socket closed before the client was connected: just shut down.
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
    shutdown(Reason, Channel);
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
    shutdown(Reason, Channel);
%% Socket closed on a connected client: run flapping detection when enabled
%% for the zone, publish the will message if applicable, mark the channel
%% disconnected and let maybe_shutdown/2 decide whether the channel lives on.
handle_info({sock_closed, Reason}, Channel =
            #channel{conn_state = connected,
                     clientinfo = ClientInfo = #{zone := Zone}}) ->
    emqx_zone:enable_flapping_detect(Zone)
        andalso emqx_flapping:detect(ClientInfo),
    Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)),
    case maybe_shutdown(Reason, Channel1) of
        {ok, Channel2} -> {ok, {event, disconnected}, Channel2};
        Shutdown -> Shutdown
    end;
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
    ?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
    {ok, Channel};
handle_info(clean_acl_cache, Channel) ->
    ok = emqx_acl_cache:empty_acl_cache(),
    {ok, Channel};
handle_info(Info, Channel) ->
    ?LOG(error, "Unexpected info: ~p", [Info]),
    {ok, Channel}.
%%--------------------------------------------------------------------
%% Handle timeout
%%--------------------------------------------------------------------
%% Dispatch a fired channel timer, identified by its message term.
-spec(handle_timeout(reference(), Msg :: term(), channel())
      -> {ok, channel()}
       | {ok, replies(), channel()}
       | {shutdown, Reason :: term(), channel()}).
%% Keepalive ticks are ignored when keepalive is unset or the client is gone.
handle_timeout(_TRef, {keepalive, _StatVal}, Channel = #channel{keepalive = undefined}) ->
    {ok, Channel};
handle_timeout(_TRef, {keepalive, _StatVal}, Channel = #channel{conn_state = disconnected}) ->
    {ok, Channel};
handle_timeout(_TRef, {keepalive, StatVal}, Channel = #channel{keepalive = Keepalive}) ->
    case emqx_keepalive:check(StatVal, Keepalive) of
        {error, timeout} ->
            handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel);
        {ok, Checked} ->
            {ok, reset_timer(alive_timer, Channel#channel{keepalive = Checked})}
    end;
%% Delivery retries are skipped while disconnected.
handle_timeout(_TRef, retry_delivery, Channel = #channel{conn_state = disconnected}) ->
    {ok, Channel};
handle_timeout(_TRef, retry_delivery, Channel = #channel{session = Session}) ->
    case emqx_session:retry(Session) of
        {ok, Publishes, Timeout, Session1} ->
            Rearmed = reset_timer(retry_timer, Timeout,
                                  Channel#channel{session = Session1}),
            handle_out(publish, Publishes, Rearmed);
        {ok, Session1} ->
            {ok, clean_timer(retry_timer, Channel#channel{session = Session1})}
    end;
%% Awaiting-rel expiry is skipped while disconnected.
handle_timeout(_TRef, expire_awaiting_rel, Channel = #channel{conn_state = disconnected}) ->
    {ok, Channel};
handle_timeout(_TRef, expire_awaiting_rel, Channel = #channel{session = Session}) ->
    case emqx_session:expire(awaiting_rel, Session) of
        {ok, Timeout, Session1} ->
            {ok, reset_timer(await_timer, Timeout, Channel#channel{session = Session1})};
        {ok, Session1} ->
            {ok, clean_timer(await_timer, Channel#channel{session = Session1})}
    end;
handle_timeout(_TRef, expire_session, Channel) ->
    shutdown(expired, Channel);
%% Delayed will: publish it (if still set) and forget both message and timer.
handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) ->
    case WillMsg of
        undefined -> false;
        _ -> publish_will_msg(WillMsg)
    end,
    Cleared = Channel#channel{will_msg = undefined},
    {ok, clean_timer(will_timer, Cleared)};
handle_timeout(_TRef, expire_quota_limit, Channel) ->
    {ok, clean_timer(quota_timer, Channel)};
handle_timeout(_TRef, Msg, Channel) ->
    ?LOG(error, "Unexpected timeout: ~p~n", [Msg]),
    {ok, Channel}.
%%--------------------------------------------------------------------
%% Ensure timers
%%--------------------------------------------------------------------
%% Start the named timer(s) unless already running.
%% Accepts a single timer name or a list of names; names are processed
%% left to right and an empty list leaves the channel untouched.
%% A timer is started only when no reference is stored for it and its
%% interval (see interval/2) is greater than zero.
ensure_timer(Names, Channel) when is_list(Names) ->
    lists:foldl(fun ensure_timer/2, Channel, Names);
ensure_timer(Name, Channel = #channel{timers = Timers}) ->
    TRef = maps:get(Name, Timers, undefined),
    Time = interval(Name, Channel),
    case TRef == undefined andalso Time > 0 of
        true  -> ensure_timer(Name, Time, Channel);
        false -> Channel %% Timer disabled or exists
    end.
%% Unconditionally start timer Name with the given Time and record its
%% reference. The timeout message is looked up in ?TIMER_TABLE.
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
    TimeoutMsg = maps:get(Name, ?TIMER_TABLE),
    NewRef = emqx_misc:start_timer(Time, TimeoutMsg),
    Channel#channel{timers = maps:put(Name, NewRef, Timers)}.
%% Forget the stored reference for Name, then start it again using the
%% interval derived from the channel.
reset_timer(Name, Channel) ->
    Forgotten = clean_timer(Name, Channel),
    ensure_timer(Name, Forgotten).
%% Forget the stored reference for Name, then start it again with an
%% explicit Time.
reset_timer(Name, Time, Channel) ->
    Forgotten = clean_timer(Name, Channel),
    ensure_timer(Name, Time, Forgotten).
%% Drop the entry for Name from the timers map.
%% Only the bookkeeping entry is removed; no timer is cancelled here.
clean_timer(Name, Channel = #channel{timers = Timers}) ->
    Remaining = maps:remove(Name, Timers),
    Channel#channel{timers = Remaining}.
%% Interval used when (re)starting a named timer via ensure_timer/2.
%% Every value except the keepalive interval is passed through
%% timer:seconds/1 before being returned.
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
    emqx_keepalive:info(interval, KeepAlive);
interval(retry_timer, #channel{session = Session}) ->
    RetrySecs = emqx_session:info(retry_interval, Session),
    timer:seconds(RetrySecs);
interval(await_timer, #channel{session = Session}) ->
    AwaitSecs = emqx_session:info(await_rel_timeout, Session),
    timer:seconds(AwaitSecs);
interval(expire_timer, #channel{conninfo = ConnInfo}) ->
    ExpirySecs = maps:get(expiry_interval, ConnInfo),
    timer:seconds(ExpirySecs);
interval(will_timer, #channel{will_msg = WillMsg}) ->
    DelaySecs = will_delay_interval(WillMsg),
    timer:seconds(DelaySecs).
%%--------------------------------------------------------------------
%% Terminate
%%--------------------------------------------------------------------
%% Channel termination.
%% Idle channels have nothing to clean up. For `normal`, `kicked`,
%% `discarded` and `takeovered` the will message is not published; for any
%% other reason a still-pending will message is published before the
%% session terminate hook runs.
-spec(terminate(any(), channel()) -> ok).
terminate(_Reason, #channel{conn_state = idle}) ->
    ok;
terminate(normal, Channel) ->
    run_terminate_hook(normal, Channel);
terminate({shutdown, Reason}, Channel)
  when Reason =:= kicked orelse Reason =:= discarded orelse Reason =:= takeovered ->
    run_terminate_hook(Reason, Channel);
terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
    case WillMsg of
        undefined -> false;
        _ -> publish_will_msg(WillMsg)
    end,
    run_terminate_hook(Reason, Channel).
%% Terminate the session, if the channel has one.
run_terminate_hook(_Reason, #channel{session = undefined}) ->
    ok;
run_terminate_hook(Reason, #channel{session = Session, clientinfo = ClientInfo}) ->
    emqx_session:terminate(ClientInfo, Reason, Session).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Enrich MQTT Connect Info
%% Copy the fields of the CONNECT packet into the channel's conninfo and add
%% the derived session expiry interval and receive maximum.
enrich_conninfo(ConnPkt = #mqtt_packet_connect{proto_name  = ProtoName,
                                               proto_ver   = ProtoVer,
                                               clean_start = CleanStart,
                                               keepalive   = Keepalive,
                                               properties  = ConnProps,
                                               clientid    = ClientId,
                                               username    = Username},
                Channel = #channel{conninfo = ConnInfo,
                                   clientinfo = #{zone := Zone}}) ->
    ExpiryInterval = expiry_interval(Zone, ConnPkt),
    ReceiveMaximum = receive_maximum(Zone, ConnProps),
    FromPacket = #{proto_name      => ProtoName,
                   proto_ver       => ProtoVer,
                   clean_start     => CleanStart,
                   keepalive       => Keepalive,
                   clientid        => ClientId,
                   username        => Username,
                   conn_props      => ConnProps,
                   expiry_interval => ExpiryInterval,
                   receive_maximum => ReceiveMaximum},
    %% Values taken from the packet override what conninfo already held.
    {ok, Channel#channel{conninfo = maps:merge(ConnInfo, FromPacket)}}.
%% If the Session Expiry Interval is absent the value 0 is used.
-compile({inline, [expiry_interval/2]}).
%% Session expiry interval for a CONNECT packet.
%% MQTT v5: the 'Session-Expiry-Interval' property, defaulting to 0.
%% Otherwise: 0 for a clean start, else the zone's configured interval.
expiry_interval(_Zone, #mqtt_packet_connect{proto_ver  = ?MQTT_PROTO_V5,
                                            properties = Props}) ->
    emqx_mqtt_props:get('Session-Expiry-Interval', Props, 0);
expiry_interval(_Zone, #mqtt_packet_connect{clean_start = true}) ->
    0;
expiry_interval(Zone, #mqtt_packet_connect{clean_start = false}) ->
    emqx_zone:session_expiry_interval(Zone).
-compile({inline, [receive_maximum/2]}).
%% 'Receive-Maximum' from the CONNECT properties, falling back to the zone's
%% max inflight setting.
receive_maximum(Zone, ConnProps) ->
    ZoneDefault = emqx_zone:max_inflight(Zone),
    emqx_mqtt_props:get('Receive-Maximum', ConnProps, ZoneDefault).
%%--------------------------------------------------------------------
%% Run Connect Hooks
%% Fold the CONNECT properties through the 'client.connect' hooks.
%% A hook may reject the connection by yielding {error, Reason}; otherwise
%% the folded properties are written back into the packet.
run_conn_hooks(ConnPkt, Channel = #channel{conninfo = ConnInfo}) ->
    Props0 = emqx_packet:info(properties, ConnPkt),
    case run_hooks('client.connect', [ConnInfo], Props0) of
        {error, _Reason} = Rejected ->
            Rejected;
        Props1 ->
            {ok, emqx_packet:set_props(Props1, ConnPkt), Channel}
    end.
%%--------------------------------------------------------------------
%% Check Connect Packet
%% Check the CONNECT packet against the capabilities of the client's zone.
check_connect(ConnPkt, #channel{clientinfo = #{zone := Zone}}) ->
    ZoneCaps = emqx_mqtt_caps:get_caps(Zone),
    emqx_packet:check(ConnPkt, ZoneCaps).
%%--------------------------------------------------------------------
%% Enrich Client Info
%% Run the client-info enrichment steps, in order, over the CONNECT packet
%% and store the resulting clientinfo in the channel.
enrich_client(ConnPkt, Channel = #channel{clientinfo = ClientInfo}) ->
    Steps = [fun set_username/2,
             fun set_bridge_mode/2,
             fun maybe_username_as_clientid/2,
             fun maybe_assign_clientid/2,
             fun fix_mountpoint/2],
    {ok, NConnPkt, NClientInfo} = pipeline(Steps, ConnPkt, ClientInfo),
    {ok, NConnPkt, Channel#channel{clientinfo = NClientInfo}}.
%% Take the username from the CONNECT packet, but only when clientinfo does
%% not already carry one.
set_username(#mqtt_packet_connect{username = Username},
             ClientInfo = #{username := undefined}) ->
    {ok, maps:put(username, Username, ClientInfo)};
set_username(_ConnPkt, ClientInfo) ->
    {ok, ClientInfo}.
%% Flag the client as a bridge when the CONNECT packet says so; otherwise
%% leave clientinfo unchanged (plain `ok`).
set_bridge_mode(#mqtt_packet_connect{is_bridge = true}, ClientInfo) ->
    {ok, maps:put(is_bridge, true, ClientInfo)};
set_bridge_mode(_ConnPkt, _ClientInfo) ->
    ok.
%% When the zone is configured to use the username as client id, and a
%% username is present, overwrite the client id with it.
maybe_username_as_clientid(_ConnPkt, ClientInfo = #{username := undefined}) ->
    {ok, ClientInfo};
maybe_username_as_clientid(_ConnPkt, ClientInfo = #{zone := Zone,
                                                    username := Username}) ->
    case emqx_zone:use_username_as_clientid(Zone) of
        false -> ok;
        true  -> {ok, maps:put(clientid, Username, ClientInfo)}
    end.
%% Settle the client id: keep one that is already set in clientinfo,
%% generate a random one when the packet's id is empty, otherwise use the
%% id from the packet.
maybe_assign_clientid(_ConnPkt, ClientInfo = #{clientid := Existing})
  when Existing /= undefined ->
    {ok, ClientInfo};
maybe_assign_clientid(#mqtt_packet_connect{clientid = <<>>}, ClientInfo) ->
    %% Empty id in the packet: assign a random, base62-encoded guid.
    Generated = emqx_guid:to_base62(emqx_guid:gen()),
    {ok, maps:put(clientid, Generated, ClientInfo)};
maybe_assign_clientid(#mqtt_packet_connect{clientid = FromPacket}, ClientInfo) ->
    {ok, maps:put(clientid, FromPacket, ClientInfo)}.
%% Substitute the variables in the configured mountpoint (if any) using the
%% client's info.
fix_mountpoint(_ConnPkt, #{mountpoint := undefined}) ->
    ok;
fix_mountpoint(_ConnPkt, ClientInfo = #{mountpoint := Template}) ->
    Resolved = emqx_mountpoint:replvar(Template, ClientInfo),
    {ok, maps:update(mountpoint, Resolved, ClientInfo)}.
%%--------------------------------------------------------------------
%% Set log metadata
%% Attach the client id to the logger metadata of this process.
set_log_meta(_ConnPkt, #channel{clientinfo = ClientInfo}) ->
    #{clientid := ClientId} = ClientInfo,
    emqx_logger:set_metadata_clientid(ClientId).
%%--------------------------------------------------------------------
%% Check banned
%% Reject the client with ?RC_BANNED when banning is enabled for the zone
%% and the client is on the ban list.
check_banned(_ConnPkt, #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
    IsBanned = emqx_zone:enable_ban(Zone) andalso emqx_banned:check(ClientInfo),
    case IsBanned of
        false -> ok;
        true  -> {error, ?RC_BANNED}
    end.
%%--------------------------------------------------------------------
%% Auth Connect
%% Authenticate the client with the password from the CONNECT packet.
%% On success the auth result is merged into clientinfo; on failure the
%% reason is logged and mapped to a CONNACK error code.
auth_connect(#mqtt_packet_connect{clientid = ClientId,
                                  username = Username,
                                  password = Password},
             #channel{clientinfo = ClientInfo} = Channel) ->
    Credentials = ClientInfo#{password => Password},
    case emqx_access_control:authenticate(Credentials) of
        {error, Reason} ->
            ?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p",
                 [ClientId, Username, Reason]),
            {error, emqx_reason_codes:connack_error(Reason)};
        {ok, AuthResult} ->
            %% Count anonymous logins.
            is_anonymous(AuthResult) andalso
                emqx_metrics:inc('client.auth.anonymous'),
            Merged = maps:merge(ClientInfo, AuthResult),
            {ok, Channel#channel{clientinfo = Merged}}
    end.
%% True only when the auth result explicitly carries `anonymous => true`.
is_anonymous(AuthResult) ->
    case AuthResult of
        #{anonymous := true} -> true;
        _ -> false
    end.
%%--------------------------------------------------------------------
%% Enhanced Authentication
%% Enhanced authentication entry point.
%% CONNECT: only MQTT v5 packets can carry an authentication method/data;
%%          other protocol versions pass straight through.
%% AUTH:    the method in the AUTH packet must be present and equal to the
%%          one given at CONNECT time (read from conn_props in conninfo).
enhanced_auth(?CONNECT_PACKET(#mqtt_packet_connect{proto_ver  = ?MQTT_PROTO_V5,
                                                   properties = Properties}),
              Channel) ->
    AuthMethod = emqx_mqtt_props:get('Authentication-Method', Properties, undefined),
    AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined),
    do_enhanced_auth(AuthMethod, AuthData, Channel);
enhanced_auth(?CONNECT_PACKET(#mqtt_packet_connect{}), Channel) ->
    {ok, #{}, Channel};
enhanced_auth(?AUTH_PACKET(_ReasonCode, Properties),
              Channel = #channel{conninfo = ConnInfo}) ->
    ConnProps = emqx_mqtt_props:get(conn_props, ConnInfo, #{}),
    AuthMethod = emqx_mqtt_props:get('Authentication-Method', ConnProps, undefined),
    NAuthMethod = emqx_mqtt_props:get('Authentication-Method', Properties, undefined),
    AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined),
    SameMethod = NAuthMethod =/= undefined andalso NAuthMethod =:= AuthMethod,
    case SameMethod of
        true ->
            do_enhanced_auth(AuthMethod, AuthData, Channel);
        false ->
            {error, emqx_reason_codes:connack_error(bad_authentication_method), Channel}
    end.
%% Run one round of enhanced authentication.
%% Neither method nor data: nothing to do. Only one of the two: not
%% authorized. Both: fold the auth cache through the
%% 'client.enhanced_authenticate' hooks, which answer `ok` or `continue`
%% together with the data to send back and the new cache.
do_enhanced_auth(undefined, undefined, Channel) ->
    {ok, #{}, Channel};
do_enhanced_auth(undefined, _AuthData, Channel) ->
    {error, emqx_reason_codes:connack_error(not_authorized), Channel};
do_enhanced_auth(_AuthMethod, undefined, Channel) ->
    {error, emqx_reason_codes:connack_error(not_authorized), Channel};
do_enhanced_auth(AuthMethod, AuthData, Channel = #channel{auth_cache = Cache}) ->
    case run_hooks('client.enhanced_authenticate', [AuthMethod, AuthData], Cache) of
        {Verdict, NAuthData, NCache} when Verdict =:= ok; Verdict =:= continue ->
            ReplyProps = #{'Authentication-Method' => AuthMethod,
                           'Authentication-Data' => NAuthData},
            {Verdict, ReplyProps, Channel#channel{auth_cache = NCache}};
        _ ->
            {error, emqx_reason_codes:connack_error(not_authorized), Channel}
    end.
%%--------------------------------------------------------------------
%% Process Topic Alias
%% Resolve or register an inbound topic alias (MQTT v5 only).
%%  * Empty topic + alias: look the alias up and restore the topic name;
%%    an unknown alias is a protocol error.
%%  * Topic + alias: remember the mapping for later packets.
%%  * Anything else: pass through.
process_alias(Packet = #mqtt_packet{
                          variable = #mqtt_packet_publish{topic_name = <<>>,
                                                          properties = #{'Topic-Alias' := AliasId}
                                                         } = Publish
                         },
              Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases}) ->
    case find_alias(inbound, AliasId, TopicAliases) of
        {ok, Topic} ->
            NPublish = Publish#mqtt_packet_publish{topic_name = Topic},
            {ok, Packet#mqtt_packet{variable = NPublish}, Channel};
        %% find_alias/3 delegates to maps:find/2, which reports a miss as
        %% `error`; it returns `false` only when there is no alias table.
        error -> {error, ?RC_PROTOCOL_ERROR};
        false -> {error, ?RC_PROTOCOL_ERROR}
    end;
process_alias(#mqtt_packet{
                 variable = #mqtt_packet_publish{topic_name = Topic,
                                                 properties = #{'Topic-Alias' := AliasId}
                                                }
                },
              Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases}) ->
    NTopicAliases = save_alias(inbound, AliasId, Topic, TopicAliases),
    {ok, Channel#channel{topic_aliases = NTopicAliases}};
process_alias(_Packet, Channel) -> {ok, Channel}.
%%--------------------------------------------------------------------
%% Packing Topic Alias
%% Apply outbound topic aliasing to a PUBLISH packet (MQTT v5 only).
%%  * Topic already aliased: send an empty topic name plus the alias.
%%  * Not aliased yet: allocate the next alias id (current table size + 1)
%%    if the outbound limit allows it, and send topic name plus alias so the
%%    peer can learn the mapping; otherwise send the packet unchanged.
%%  * No alias table at all: send the packet unchanged.
%% Returns {Packet, Channel}.
packing_alias(Packet = #mqtt_packet{
                          variable = #mqtt_packet_publish{
                                        topic_name = Topic,
                                        properties = Prop
                                       } = Publish
                         },
              Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases,
                                               alias_maximum = Limits}) ->
    case find_alias(outbound, Topic, TopicAliases) of
        {ok, AliasId} ->
            NPublish = Publish#mqtt_packet_publish{
                         topic_name = <<>>,
                         properties = maps:merge(Prop, #{'Topic-Alias' => AliasId})
                        },
            {Packet#mqtt_packet{variable = NPublish}, Channel};
        error ->
            #{outbound := Aliases} = TopicAliases,
            AliasId = maps:size(Aliases) + 1,
            case (Limits =:= undefined) orelse
                 (AliasId =< maps:get(outbound, Limits, 0)) of
                true ->
                    NTopicAliases = save_alias(outbound, AliasId, Topic, TopicAliases),
                    NChannel = Channel#channel{topic_aliases = NTopicAliases},
                    %% The topic name is kept as is; only the alias is added.
                    NPublish = Publish#mqtt_packet_publish{
                                 properties = maps:merge(Prop, #{'Topic-Alias' => AliasId})
                                },
                    {Packet#mqtt_packet{variable = NPublish}, NChannel};
                false -> {Packet, Channel}
            end;
        %% find_alias/3 answers `false` when topic_aliases is undefined.
        false ->
            {Packet, Channel}
    end;
packing_alias(Packet, Channel) -> {Packet, Channel}.
%%--------------------------------------------------------------------
%% Check quota state
%% The quota is considered exceeded for as long as a quota_timer entry is
%% present (and not `undefined`) in the channel's timers.
check_quota_exceeded(_Packet, #channel{timers = Timers}) ->
    QuotaTimer = maps:get(quota_timer, Timers, undefined),
    case QuotaTimer =:= undefined of
        true  -> ok;
        false -> {error, ?RC_QUOTA_EXCEEDED}
    end.
%%--------------------------------------------------------------------
%% Check Pub Alias
%% Validate the 'Topic-Alias' of an inbound PUBLISH against the inbound
%% alias limit. Packets without an alias, and channels without alias
%% limits, always pass.
check_pub_alias(#mqtt_packet{
                   variable = #mqtt_packet_publish{
                                 properties = #{'Topic-Alias' := _AliasId}
                                }
                  },
                #channel{alias_maximum = undefined}) ->
    ok;
check_pub_alias(#mqtt_packet{
                   variable = #mqtt_packet_publish{
                                 properties = #{'Topic-Alias' := AliasId}
                                }
                  },
                #channel{alias_maximum = Limits}) ->
    InboundMax = maps:get(inbound, Limits, ?MAX_TOPIC_AlIAS),
    case AliasId =< InboundMax of
        true  -> ok;
        false -> {error, ?RC_TOPIC_ALIAS_INVALID}
    end;
check_pub_alias(_Packet, _Channel) ->
    ok.
%%--------------------------------------------------------------------
%% Check Pub ACL
%% ACL check for publishing to the packet's topic. Passes when ACL is not
%% enabled for this client or the ACL allows it.
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
              #channel{clientinfo = ClientInfo}) ->
    Verdict = is_acl_enabled(ClientInfo) andalso
                  emqx_access_control:check_acl(ClientInfo, publish, Topic),
    case Verdict of
        deny  -> {error, ?RC_NOT_AUTHORIZED};
        allow -> ok;
        false -> ok
    end.
%%--------------------------------------------------------------------
%% Check Pub Caps
%% Check the PUBLISH packet's QoS, retain flag and topic against the zone's
%% publish capabilities.
check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS,
                                                         retain = Retain},
                            variable = #mqtt_packet_publish{topic_name = Topic}},
               #channel{clientinfo = #{zone := Zone}}) ->
    PubAttrs = #{qos => QoS, retain => Retain, topic => Topic},
    emqx_mqtt_caps:check_pub(Zone, PubAttrs).
%%--------------------------------------------------------------------
%% Check Subscribe
%% A subscription must pass the ACL first and the zone capabilities second.
check_subscribe(TopicFilter, SubOpts, Channel) ->
    case check_sub_acl(TopicFilter, Channel) of
        deny ->
            {error, ?RC_NOT_AUTHORIZED};
        allow ->
            check_sub_caps(TopicFilter, SubOpts, Channel)
    end.
%%--------------------------------------------------------------------
%% Check Sub ACL
%% ACL verdict for subscribing to TopicFilter; `allow` when ACL checking is
%% not enabled for this client.
check_sub_acl(TopicFilter, #channel{clientinfo = ClientInfo}) ->
    Verdict = is_acl_enabled(ClientInfo) andalso
                  emqx_access_control:check_acl(ClientInfo, subscribe, TopicFilter),
    case Verdict of
        false -> allow;
        _     -> Verdict
    end.
%%--------------------------------------------------------------------
%% Check Sub Caps
%% Check the subscription against the capabilities of the client's zone.
check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) ->
    #{zone := Zone} = ClientInfo,
    emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts).
%%--------------------------------------------------------------------
%% Enrich SubId
%% When the SUBSCRIBE properties carry a 'Subscription-Identifier', stamp it
%% as `subid` into the options of every topic filter; otherwise return the
%% filters untouched.
put_subid_in_subopts(#{'Subscription-Identifier' := SubId}, TopicFilters) ->
    [{Filter, maps:put(subid, SubId, Opts)} || {Filter, Opts} <- TopicFilters];
put_subid_in_subopts(_Properties, TopicFilters) ->
    TopicFilters.
%%--------------------------------------------------------------------
%% Enrich SubOpts
%% MQTT v5 subscriptions keep the options they came with. For other protocol
%% versions, `nl` is set from the zone's ignore_loop_deliver setting and
%% `rap` from the client's bridge flag.
enrich_subopts(SubOpts, _Channel = ?IS_MQTT_V5) ->
    SubOpts;
enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone,
                                                is_bridge := IsBridge}}) ->
    NoLocal = flag(emqx_zone:ignore_loop_deliver(Zone)),
    RetainAsPublished = flag(IsBridge),
    SubOpts#{rap => RetainAsPublished, nl => NoLocal}.
%%--------------------------------------------------------------------
%% Enrich ConnAck Caps
%% Advertise the zone's capabilities in the CONNACK properties (MQTT v5
%% only; other versions get the properties back unchanged).
enrich_connack_caps(AckProps, ?IS_MQTT_V5 = #channel{clientinfo = #{zone := Zone}}) ->
    #{max_packet_size       := MaxPktSize,
      max_qos_allowed       := MaxQoS,
      retain_available      := Retain,
      max_topic_alias       := MaxAlias,
      shared_subscription   := Shared,
      wildcard_subscription := Wildcard
     } = emqx_mqtt_caps:get_caps(Zone),
    WithCaps = AckProps#{'Retain-Available' => flag(Retain),
                         'Maximum-Packet-Size' => MaxPktSize,
                         'Topic-Alias-Maximum' => MaxAlias,
                         'Wildcard-Subscription-Available' => flag(Wildcard),
                         'Subscription-Identifier-Available' => 1,
                         'Shared-Subscription-Available' => flag(Shared)},
    %% MQTT 5.0 - 3.2.2.3.4:
    %% It is a Protocol Error to include Maximum QoS more than once,
    %% or to have a value other than 0 or 1. If the Maximum QoS is absent,
    %% the Client uses a Maximum QoS of 2.
    case MaxQoS of
        2 -> WithCaps;
        _ -> WithCaps#{'Maximum-QoS' => MaxQoS}
    end;
enrich_connack_caps(AckProps, _Channel) ->
    AckProps.
%%--------------------------------------------------------------------
%% Enrich server keepalive
%% Add 'Server-Keep-Alive' to the CONNACK properties when the zone
%% configures one.
enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone}}) ->
    ServerKeepalive = emqx_zone:server_keepalive(Zone),
    case ServerKeepalive of
        undefined -> AckProps;
        _ -> AckProps#{'Server-Keep-Alive' => ServerKeepalive}
    end.
%%--------------------------------------------------------------------
%% Enrich response information
%% Add 'Response-Information' to the CONNACK properties when the client set
%% 'Request-Response-Information' to 1 in its CONNECT properties.
enrich_response_information(AckProps, #channel{conninfo = #{conn_props := ConnProps},
                                               clientinfo = #{zone := Zone}}) ->
    Requested = emqx_mqtt_props:get('Request-Response-Information', ConnProps, 0),
    case Requested of
        1 ->
            RespInfo = emqx_zone:response_information(Zone),
            AckProps#{'Response-Information' => RespInfo};
        0 ->
            AckProps
    end.
%%--------------------------------------------------------------------
%% Enrich Assigned ClientId
%% When the client connected with an empty client id (as recorded in
%% conninfo), report the id held in clientinfo back to it via
%% 'Assigned-Client-Identifier'.
enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo,
                                            clientinfo = #{clientid := ClientId}}) ->
    OriginalId = maps:get(clientid, ConnInfo),
    case OriginalId of
        <<>> -> AckProps#{'Assigned-Client-Identifier' => ClientId};
        _    -> AckProps
    end.
%%--------------------------------------------------------------------
%% Ensure connected
%% Stamp the connection time (milliseconds), run the 'client.connected'
%% hooks and move the channel to the `connected` state.
ensure_connected(Channel = #channel{conninfo = ConnInfo,
                                    clientinfo = ClientInfo}) ->
    ConnectedAt = erlang:system_time(millisecond),
    NConnInfo = maps:put(connected_at, ConnectedAt, ConnInfo),
    ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
    Channel#channel{conninfo = NConnInfo, conn_state = connected}.
%%--------------------------------------------------------------------
%% Init Alias Maximum
%% Topic alias limits for an MQTT v5 connection: outbound from the client's
%% 'Topic-Alias-Maximum' property (default 0), inbound from the zone's
%% max_topic_alias capability. Other protocol versions have no limits map.
init_alias_maximum(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
                                        properties = Properties},
                   #{zone := Zone} = _ClientInfo) ->
    Outbound = emqx_mqtt_props:get('Topic-Alias-Maximum', Properties, 0),
    Inbound = emqx_mqtt_caps:get_caps(Zone, max_topic_alias, ?MAX_TOPIC_AlIAS),
    #{outbound => Outbound, inbound => Inbound};
init_alias_maximum(_ConnPkt, _ClientInfo) ->
    undefined.
%%--------------------------------------------------------------------
%% Enrich Keepalive
%% Start keepalive using the 'Server-Keep-Alive' from the CONNACK properties
%% when present, otherwise the keepalive recorded in conninfo.
ensure_keepalive(#{'Server-Keep-Alive' := ServerInterval}, Channel) ->
    ensure_keepalive_timer(ServerInterval, Channel);
ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) ->
    ClientInterval = maps:get(keepalive, ConnInfo),
    ensure_keepalive_timer(ClientInterval, Channel).
%% An interval of 0 disables keepalive. Otherwise initialise the keepalive
%% state with the interval scaled by the zone's backoff factor and start
%% the alive timer.
ensure_keepalive_timer(0, Channel) ->
    Channel;
ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) ->
    Backoff = emqx_zone:keepalive_backoff(Zone),
    Scaled = round(timer:seconds(Interval) * Backoff),
    Keepalive = emqx_keepalive:init(Scaled),
    ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
%%--------------------------------------------------------------------
%% Maybe Resume Session
%% When resuming a session: replay it, then deliver the pending messages,
%% and return everything that must be published together with the updated
%% session. Returns `ignore` when not resuming.
maybe_resume_session(#channel{resuming = false}) ->
    ignore;
maybe_resume_session(#channel{session = Session,
                              resuming = true,
                              pendings = Pendings}) ->
    {ok, Replayed, Session1} = emqx_session:replay(Session),
    case emqx_session:deliver(Pendings, Session1) of
        {ok, More, Session2} ->
            {ok, Replayed ++ More, Session2};
        {ok, Session2} ->
            {ok, Replayed, Session2}
    end.
%%--------------------------------------------------------------------
%% Maybe Shutdown the Channel
%% Decide what happens to the channel once the client is gone, based on the
%% session expiry interval in conninfo:
%%   ?UINT_MAX - keep the channel, no expiry timer
%%   > 0       - keep the channel and start the expire timer
%%   otherwise - shut down with Reason
maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
    ExpiryInterval = maps:get(expiry_interval, ConnInfo),
    case ExpiryInterval of
        ?UINT_MAX ->
            {ok, Channel};
        Secs when Secs > 0 ->
            {ok, ensure_timer(expire_timer, timer:seconds(Secs), Channel)};
        _ ->
            shutdown(Reason, Channel)
    end.
%%--------------------------------------------------------------------
%% Is ACL enabled?
-compile({inline, [is_acl_enabled/1]}).
%% ACL checks apply only to non-superusers, and only when the zone enables
%% ACL.
is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) ->
    NotSuperuser = not IsSuperuser,
    NotSuperuser andalso emqx_zone:enable_acl(Zone).
%%--------------------------------------------------------------------
%% Parse Topic Filters
-compile({inline, [parse_topic_filters/1]}).
%% Parse every topic filter with emqx_topic:parse/1, preserving order.
parse_topic_filters(TopicFilters) ->
    [emqx_topic:parse(TopicFilter) || TopicFilter <- TopicFilters].
%%--------------------------------------------------------------------
%% Ensure disconnected
%% Stamp the disconnection time (milliseconds), run the
%% 'client.disconnected' hooks and move the channel to the `disconnected`
%% state.
ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
                                               clientinfo = ClientInfo}) ->
    DisconnectedAt = erlang:system_time(millisecond),
    NConnInfo = maps:put(disconnected_at, DisconnectedAt, ConnInfo),
    ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
    Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
%%--------------------------------------------------------------------
%% Maybe Publish will msg
%% Handle the will message, if any: with no will delay, publish it now and
%% clear it from the channel; otherwise start the will timer and keep it.
mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
    Channel;
mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
    Delay = will_delay_interval(WillMsg),
    case Delay of
        0 ->
            publish_will_msg(WillMsg),
            Channel#channel{will_msg = undefined};
        _ ->
            ensure_timer(will_timer, timer:seconds(Delay), Channel)
    end.
%% 'Will-Delay-Interval' of the will message, defaulting to 0.
%% The `properties` header is not guaranteed to be a map (it can be absent
%% or undefined), in which case there is no delay rather than a `badmap`
%% crash.
will_delay_interval(WillMsg) ->
    case emqx_message:get_header(properties, WillMsg) of
        Props when is_map(Props) ->
            maps:get('Will-Delay-Interval', Props, 0);
        _NoProps ->
            0
    end.
%% Publish the will message through the broker.
publish_will_msg(WillMsg) ->
    emqx_broker:publish(WillMsg).
%%--------------------------------------------------------------------
%% Disconnect Reason
%% Map a DISCONNECT reason code to a reason term: `normal` for success,
%% otherwise the name of the reason code.
disconnect_reason(ReasonCode) ->
    case ReasonCode of
        ?RC_SUCCESS -> normal;
        _ -> emqx_reason_codes:name(ReasonCode)
    end.
%% Reason code for a server-initiated DISCONNECT: "session taken over" for
%% takeover/discard, "normal disconnection" for everything else.
reason_code(Reason) ->
    case Reason =:= takeovered orelse Reason =:= discarded of
        true  -> ?RC_SESSION_TAKEN_OVER;
        false -> ?RC_NORMAL_DISCONNECTION
    end.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
-compile({inline, [run_hooks/2, run_hooks/3]}).
%% Bump the metric named after the hook point, then run the hooks.
run_hooks(HookPoint, Args) ->
    ok = emqx_metrics:inc(HookPoint),
    emqx_hooks:run(HookPoint, Args).
%% Bump the metric named after the hook point, then fold Acc through the
%% hooks.
run_hooks(HookPoint, Args, Acc) ->
    ok = emqx_metrics:inc(HookPoint),
    emqx_hooks:run_fold(HookPoint, Args, Acc).
-compile({inline, [find_alias/3, save_alias/4]}).
%% Look up a topic alias.
%%   inbound  - keyed by alias id, yields the topic
%%   outbound - keyed by topic, yields the alias id
%% Returns {ok, Value} | error (as maps:find/2), or `false` when there is no
%% alias table at all.
find_alias(_Direction, _Key, undefined) ->
    false;
find_alias(inbound, AliasId, #{inbound := ById}) ->
    maps:find(AliasId, ById);
find_alias(outbound, Topic, #{outbound := ByTopic}) ->
    maps:find(Topic, ByTopic).
%% Record a topic alias and return the updated alias table.
%%   inbound  - stored as alias id => topic
%%   outbound - stored as topic => alias id
%% Returns `false` when there is no alias table.
save_alias(_Direction, _AliasId, _Topic, undefined) ->
    false;
save_alias(inbound, AliasId, Topic, TopicAliases = #{inbound := ById}) ->
    maps:put(inbound, ById#{AliasId => Topic}, TopicAliases);
save_alias(outbound, AliasId, Topic, TopicAliases = #{outbound := ByTopic}) ->
    maps:put(outbound, ByTopic#{Topic => AliasId}, TopicAliases).
-compile({inline, [reply/2, shutdown/2, shutdown/3, sp/1, flag/1]}).
%% Build the `reply` result tuple of handle_call/2.
reply(Result, ChannelState) ->
    {reply, Result, ChannelState}.
%% Build a shutdown result; the reason `success` is reported as `normal`.
shutdown(Reason, Channel) ->
    Effective = case Reason of
                    success -> normal;
                    _ -> Reason
                end,
    {shutdown, Effective, Channel}.
%% Build a shutdown result carrying a reply; the reason `success` is
%% reported as `normal`.
shutdown(Reason, Reply, Channel) ->
    Effective = case Reason of
                    success -> normal;
                    _ -> Reason
                end,
    {shutdown, Effective, Reply, Channel}.
%% Build a shutdown result carrying a reply and a final packet to send; the
%% reason `success` is reported as `normal`.
shutdown(Reason, Reply, Packet, Channel) ->
    Effective = case Reason of
                    success -> normal;
                    _ -> Reason
                end,
    {shutdown, Effective, Reply, Packet, Channel}.
%% Shut the channel down. A connected MQTT v5 client is additionally sent a
%% DISCONNECT packet whose reason code is derived from Reason; in every
%% other case no packet is attached.
disconnect_and_shutdown(Reason, Reply, Channel = ?IS_MQTT_V5
                                               = #channel{conn_state = connected}) ->
    Packet = ?DISCONNECT_PACKET(reason_code(Reason)),
    shutdown(Reason, Reply, Packet, Channel);
disconnect_and_shutdown(Reason, Reply, Channel) ->
    shutdown(Reason, Reply, Channel).
%% Boolean to 0/1.
sp(false) -> 0;
sp(true)  -> 1.
%% Boolean to the 0/1 form used for flag-like MQTT properties.
flag(false) -> 0;
flag(true)  -> 1.
%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------
%% Set a #channel{} field by name (test helper).
%% The field's position in the record's field list is shifted by one to
%% skip the record tag at tuple position 1.
set_field(Name, Value, Channel) ->
    Fields = record_info(fields, channel),
    FieldIndex = emqx_misc:index_of(Name, Fields),
    setelement(FieldIndex + 1, Channel, Value).