1645 lines
68 KiB
Erlang
1645 lines
68 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% MQTT Channel
|
|
-module(emqx_channel).
|
|
|
|
-include("emqx.hrl").
|
|
-include("emqx_mqtt.hrl").
|
|
-include("logger.hrl").
|
|
-include("types.hrl").
|
|
|
|
-logger_header("[Channel]").
|
|
|
|
-ifdef(TEST).
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
-endif.
|
|
|
|
-export([ info/1
|
|
, info/2
|
|
, stats/1
|
|
, caps/1
|
|
]).
|
|
|
|
-export([ init/2
|
|
, handle_in/2
|
|
, handle_deliver/2
|
|
, handle_out/3
|
|
, handle_timeout/3
|
|
, handle_call/2
|
|
, handle_info/2
|
|
, terminate/2
|
|
]).
|
|
|
|
%% Exports for CT
|
|
-export([set_field/3]).
|
|
|
|
-import(emqx_misc,
|
|
[ run_fold/3
|
|
, pipeline/3
|
|
, maybe_apply/2
|
|
]).
|
|
|
|
-export_type([channel/0]).
|
|
|
|
%% State kept for a single MQTT channel.
-record(channel, {
          %% Connection info map (peername, sockname, proto_ver, ...)
          conninfo :: emqx_types:conninfo(),
          %% Client info map (zone, clientid, username, mountpoint, ...)
          clientinfo :: emqx_types:clientinfo(),
          %% Session; left unset by init/2 and filled in by process_connect/2
          session :: maybe(emqx_session:session()),
          %% Keepalive state
          keepalive :: maybe(emqx_keepalive:keepalive()),
          %% Will message taken from the CONNECT packet, if any
          will_msg :: maybe(emqx_types:message()),
          %% Topic aliases, kept separately for inbound and outbound
          topic_aliases :: emqx_types:topic_aliases(),
          %% Topic alias maximums
          alias_maximum :: maybe(map()),
          %% Authentication data cache
          auth_cache :: maybe(map()),
          %% Quota limiter
          quota :: maybe(emqx_limiter:limiter()),
          %% Timers, keyed by timer name
          timers :: #{atom() => disabled | maybe(reference())},
          %% Connection state
          conn_state :: conn_state(),
          %% True while the channel is being taken over
          takeover :: boolean(),
          %% True while an existing session is being resumed
          resuming :: boolean(),
          %% Delivers held back during a takeover or a resume
          pendings :: list()
         }).
|
|
|
|
-opaque(channel() :: #channel{}).

%% Connection life cycle of a channel.
-type(conn_state() :: idle | connecting | connected | disconnected).

%% One instruction returned to the owner of the channel.
%% `{connack, Packet}' is listed because return_connack/2 emits it
%% alongside `{event, connected}'.
-type(reply() :: {outgoing, emqx_types:packet()}
               | {outgoing, [emqx_types:packet()]}
               | {connack, emqx_types:packet()}
               | {event, conn_state()|updated}
               | {close, Reason :: atom()}).

%% What the handle_* callbacks may return as their reply part: a bare
%% packet, a single reply or a list of replies.
-type(replies() :: emqx_types:packet() | reply() | [reply()]).
|
|
|
|
%% Pattern matching a channel whose conninfo carries MQTT v5 as proto_ver.
-define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}).

%% Timer name => associated atom.
%% NOTE(review): presumably the timeout message sent when the timer
%% fires - confirm against ensure_timer.
-define(TIMER_TABLE, #{alive_timer  => keepalive,
                       retry_timer  => retry_delivery,
                       await_timer  => expire_awaiting_rel,
                       expire_timer => expire_session,
                       will_timer   => will_message,
                       quota_timer  => expire_quota_limit
                      }).

%% Keys reported by info/1.
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).

-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Info, Attrs and Caps
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Return the channel infos listed in ?INFO_KEYS as a map.
-spec(info(channel()) -> emqx_types:infos()).
info(Channel) ->
    Pairs = info(?INFO_KEYS, Channel),
    maps:from_list(Pairs).
|
|
|
|
%% Look up one info item by key. Given a list of keys, return a list of
%% `{Key, Value}' pairs in the same order.
-spec(info(list(atom())|atom(), channel()) -> term()).
info(Keys, Channel) when is_list(Keys) ->
    lists:map(fun(Key) -> {Key, info(Key, Channel)} end, Keys);
%% Items read from the connection info.
info(conninfo, #channel{conninfo = Info}) ->
    Info;
info(socktype, #channel{conninfo = #{socktype := Type}}) ->
    Type;
info(peername, #channel{conninfo = #{peername := Peer}}) ->
    Peer;
info(sockname, #channel{conninfo = #{sockname := Sock}}) ->
    Sock;
info(proto_name, #channel{conninfo = #{proto_name := Name}}) ->
    Name;
info(proto_ver, #channel{conninfo = #{proto_ver := Ver}}) ->
    Ver;
info(connected_at, #channel{conninfo = #{connected_at := At}}) ->
    At;
%% Items read from the client info.
info(clientinfo, #channel{clientinfo = Info}) ->
    Info;
info(zone, #channel{clientinfo = #{zone := Z}}) ->
    Z;
info(clientid, #channel{clientinfo = #{clientid := Id}}) ->
    Id;
info(username, #channel{clientinfo = #{username := User}}) ->
    User;
%% Items read from the remaining channel fields.
info(session, #channel{session = Sess}) ->
    maybe_apply(fun emqx_session:info/1, Sess);
info(conn_state, #channel{conn_state = State}) ->
    State;
info(keepalive, #channel{keepalive = KA}) ->
    maybe_apply(fun emqx_keepalive:info/1, KA);
info(will_msg, #channel{will_msg = Will}) ->
    case Will of
        undefined -> undefined;
        _ -> emqx_message:to_map(Will)
    end;
info(topic_aliases, #channel{topic_aliases = TopicAliases}) ->
    TopicAliases;
info(alias_maximum, #channel{alias_maximum = Maximums}) ->
    Maximums;
info(timers, #channel{timers = TimerMap}) ->
    TimerMap.
|
|
|
|
%% Channel statistics; currently only the session statistics.
%% TODO: Add more stats.
-spec(stats(channel()) -> emqx_types:stats()).
stats(#channel{session = Sess}) ->
    SessionStats = emqx_session:stats(Sess),
    SessionStats.
|
|
|
|
%% Capabilities of the zone the client belongs to.
-spec(caps(channel()) -> emqx_types:caps()).
caps(#channel{clientinfo = #{zone := ClientZone}}) ->
    ZoneCaps = emqx_mqtt_caps:get_caps(ClientZone),
    ZoneCaps.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Init the channel
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Build the initial (idle) channel from the connection info and the
%% listener options. The zone comes from Options; mountpoint and quota
%% policy are looked up for that zone.
-spec(init(emqx_types:conninfo(), proplists:proplist()) -> channel()).
init(ConnInfo = #{peername := {PeerHost, _Port},
                  sockname := {_Host, SockPort}}, Options) ->
    Zone = proplists:get_value(zone, Options),
    Peercert = maps:get(peercert, ConnInfo, undefined),
    Protocol = maps:get(protocol, ConnInfo, mqtt),
    MountPoint = emqx_zone:mountpoint(Zone),
    QuotaPolicy = emqx_zone:quota_policy(Zone),
    BaseInfo = #{zone => Zone,
                 protocol => Protocol,
                 peerhost => PeerHost,
                 sockport => SockPort,
                 clientid => undefined,
                 username => undefined,
                 mountpoint => MountPoint,
                 is_bridge => false,
                 is_superuser => false
                },
    CertInfo = setting_peercert_infos(Peercert, BaseInfo, Options),
    %% A websocket cookie, if present, moves from conninfo to clientinfo.
    {ClientInfo, ConnInfo1} = take_ws_cookie(CertInfo, ConnInfo),
    Quota = emqx_limiter:init(Zone, QuotaPolicy),
    #channel{conninfo = ConnInfo1,
             clientinfo = ClientInfo,
             topic_aliases = #{inbound => #{}, outbound => #{}},
             auth_cache = #{},
             quota = Quota,
             timers = #{},
             conn_state = idle,
             takeover = false,
             resuming = false,
             pendings = []
            }.
|
|
|
|
%% Fill in the certificate-derived client info.
%% Without a peer certificate (`nossl' or `undefined') only the username
%% is reset to `undefined'. With a certificate, `dn' and `cn' are added
%% and the username is chosen by the `peer_cert_as_username' option.
setting_peercert_infos(NoSSL, ClientInfo, _Options)
  when NoSSL =:= nossl orelse NoSSL =:= undefined ->
    ClientInfo#{username => undefined};

setting_peercert_infos(Peercert, ClientInfo, Options) ->
    DN = esockd_peercert:subject(Peercert),
    CN = esockd_peercert:common_name(Peercert),
    Mode = proplists:get_value(peer_cert_as_username, Options),
    Username = case Mode of
                   cn  -> CN;
                   dn  -> DN;
                   crt -> Peercert;
                   _   -> undefined
               end,
    ClientInfo#{username => Username, dn => DN, cn => CN}.
|
|
|
|
%% Move the `ws_cookie' entry, when present, from the connection info
%% to the client info. Returns `{ClientInfo, ConnInfo}'.
take_ws_cookie(ClientInfo, ConnInfo) ->
    case maps:take(ws_cookie, ConnInfo) of
        error ->
            {ClientInfo, ConnInfo};
        {Cookie, Remaining} ->
            {ClientInfo#{ws_cookie => Cookie}, Remaining}
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle incoming packet
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec(handle_in(emqx_types:packet(), channel())
      -> {ok, channel()}
       | {ok, replies(), channel()}
       | {shutdown, Reason :: term(), channel()}
       | {shutdown, Reason :: term(), replies(), channel()}).
%% A second CONNECT on a connected channel is a protocol error.
handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connected}) ->
    handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);

%% CONNECT: run the connect pipeline, then enhanced authentication, and
%% open the session once authentication has succeeded.
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
    Steps = [fun enrich_conninfo/2,
             fun run_conn_hooks/2,
             fun check_connect/2,
             fun enrich_client/2,
             fun set_log_meta/2,
             fun check_banned/2,
             fun auth_connect/2
            ],
    case pipeline(Steps, ConnPkt, Channel#channel{conn_state = connecting}) of
        {error, ReasonCode, FailedChannel} ->
            handle_out(connack, ReasonCode, FailedChannel);
        {ok, CheckedPkt, Checked = #channel{clientinfo = ClientInfo}} ->
            Prepared = Checked#channel{
                         will_msg = emqx_packet:will_msg(CheckedPkt),
                         alias_maximum = init_alias_maximum(CheckedPkt, ClientInfo)
                        },
            case enhanced_auth(?CONNECT_PACKET(CheckedPkt), Prepared) of
                {ok, Properties, AuthChannel} ->
                    process_connect(Properties, ensure_connected(AuthChannel));
                {continue, Properties, AuthChannel} ->
                    handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, AuthChannel);
                {error, ReasonCode, AuthChannel} ->
                    handle_out(connack, ReasonCode, AuthChannel)
            end
    end;
|
|
|
|
%% AUTH(Continue authentication): the next step of an enhanced
%% authentication exchange. It can arrive while connecting (initial
%% authentication) or while connected (re-authentication in progress).
handle_in(Packet = ?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION, _Properties),
          Channel = #channel{conn_state = ConnState}) ->
    case enhanced_auth(Packet, Channel) of
        {ok, NProperties, NChannel} ->
            case ConnState of
                connecting ->
                    process_connect(NProperties, ensure_connected(NChannel));
                connected ->
                    handle_out(auth, {?RC_SUCCESS, NProperties}, NChannel);
                _ ->
                    handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel)
            end;
        {continue, NProperties, NChannel} ->
            handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel);
        {error, NReasonCode, NChannel} when ConnState =:= connected ->
            %% The CONNACK for this connection has already been sent, so a
            %% failed re-authentication is reported with DISCONNECT, the
            %% same way the AUTH(Re-authenticate) clause below does.
            handle_out(disconnect, NReasonCode, NChannel);
        {error, NReasonCode, NChannel} ->
            handle_out(connack, NReasonCode, NChannel)
    end;

%% AUTH(Re-authenticate): only accepted on a connected channel.
handle_in(Packet = ?AUTH_PACKET(?RC_RE_AUTHENTICATE, _Properties),
          Channel = #channel{conn_state = connected}) ->
    case enhanced_auth(Packet, Channel) of
        {ok, NProperties, NChannel} ->
            handle_out(auth, {?RC_SUCCESS, NProperties}, NChannel);
        {continue, NProperties, NChannel} ->
            handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel);
        {error, NReasonCode, NChannel} ->
            handle_out(disconnect, NReasonCode, NChannel)
    end;
|
|
|
|
%% Any packet not handled by the clauses above is a protocol error while
%% the channel is not connected.
handle_in(?PACKET(_), Channel = #channel{conn_state = ConnState})
  when ConnState =/= connected ->
    handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);

%% PUBLISH: validate the packet, then hand it to process_publish/2.
handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
    case emqx_packet:check(Packet) of
        {error, ReasonCode} ->
            handle_out(disconnect, ReasonCode, Channel);
        ok ->
            process_publish(Packet, Channel)
    end;
|
|
|
|
%% PUBACK: acknowledge a QoS 1 delivery in the session. Unknown or
%% in-use packet ids are logged and counted, and the channel is kept.
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties),
          Channel = #channel{clientinfo = ClientInfo, session = Session}) ->
    case emqx_session:puback(PacketId, Session) of
        {ok, AckedMsg, Session1} ->
            ok = after_message_acked(ClientInfo, AckedMsg, Properties),
            {ok, Channel#channel{session = Session1}};
        {ok, AckedMsg, Publishes, Session1} ->
            ok = after_message_acked(ClientInfo, AckedMsg, Properties),
            %% The session released further messages to send.
            handle_out(publish, Publishes, Channel#channel{session = Session1});
        {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
            ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
            ok = emqx_metrics:inc('packets.puback.inuse'),
            {ok, Channel};
        {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
            ?LOG(warning, "The PUBACK PacketId ~w is not found.", [PacketId]),
            ok = emqx_metrics:inc('packets.puback.missed'),
            {ok, Channel}
    end;

%% PUBREC: first acknowledgement of a QoS 2 delivery. It is always
%% answered with PUBREL, carrying the error code when the id is unknown
%% or in use.
handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties),
          Channel = #channel{clientinfo = ClientInfo, session = Session}) ->
    case emqx_session:pubrec(PacketId, Session) of
        {ok, AckedMsg, Session1} ->
            ok = after_message_acked(ClientInfo, AckedMsg, Properties),
            handle_out(pubrel, {PacketId, ?RC_SUCCESS},
                       Channel#channel{session = Session1});
        {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
            ?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]),
            ok = emqx_metrics:inc('packets.pubrec.inuse'),
            handle_out(pubrel, {PacketId, RC}, Channel);
        {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
            ?LOG(warning, "The PUBREC ~w is not found.", [PacketId]),
            ok = emqx_metrics:inc('packets.pubrec.missed'),
            handle_out(pubrel, {PacketId, RC}, Channel)
    end;
|
|
|
|
%% PUBREL: release an incoming QoS 2 message. It is always answered with
%% PUBCOMP, carrying the error code when the id is unknown.
handle_in(?PUBREL_PACKET(PacketId, _ReasonCode),
          Channel = #channel{session = Session}) ->
    case emqx_session:pubrel(PacketId, Session) of
        {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
            ?LOG(warning, "The PUBREL PacketId ~w is not found.", [PacketId]),
            ok = emqx_metrics:inc('packets.pubrel.missed'),
            handle_out(pubcomp, {PacketId, RC}, Channel);
        {ok, Session1} ->
            handle_out(pubcomp, {PacketId, ?RC_SUCCESS},
                       Channel#channel{session = Session1})
    end;

%% PUBCOMP: final acknowledgement of a QoS 2 delivery.
handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode),
          Channel = #channel{session = Session}) ->
    case emqx_session:pubcomp(PacketId, Session) of
        {ok, Session1} ->
            {ok, Channel#channel{session = Session1}};
        {ok, Publishes, Session1} ->
            %% The session released further messages to send.
            handle_out(publish, Publishes, Channel#channel{session = Session1});
        {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
            ok = emqx_metrics:inc('packets.pubcomp.inuse'),
            {ok, Channel};
        {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
            ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]),
            ok = emqx_metrics:inc('packets.pubcomp.missed'),
            {ok, Channel}
    end;
|
|
|
|
%% SUBSCRIBE: parse the topic filters, run the 'client.subscribe' hooks
%% and subscribe filter by filter. If the zone's acl_deny_action is
%% `disconnect' and any filter was refused as not authorized, the client
%% is disconnected instead of receiving a SUBACK.
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
          Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
    case emqx_packet:check(Packet) of
        {error, ReasonCode} ->
            handle_out(disconnect, ReasonCode, Channel);
        ok ->
            Parsed = parse_topic_filters(TopicFilters),
            WithSubId = put_subid_in_subopts(Properties, Parsed),
            Hooked = run_hooks('client.subscribe', [ClientInfo, Properties], WithSubId),
            {ReasonCodes, NChannel} = process_subscribe(Hooked, Properties, Channel),
            DisconnectOnDeny =
                emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect
                andalso lists:member(?RC_NOT_AUTHORIZED, ReasonCodes),
            case DisconnectOnDeny of
                true ->
                    handle_out(disconnect, ?RC_NOT_AUTHORIZED, NChannel);
                false ->
                    handle_out(suback, {PacketId, ReasonCodes}, NChannel)
            end
    end;

%% UNSUBSCRIBE: parse the topic filters, run the 'client.unsubscribe'
%% hooks and unsubscribe filter by filter.
handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
          Channel = #channel{clientinfo = ClientInfo}) ->
    case emqx_packet:check(Packet) of
        {error, ReasonCode} ->
            handle_out(disconnect, ReasonCode, Channel);
        ok ->
            Parsed = parse_topic_filters(TopicFilters),
            Hooked = run_hooks('client.unsubscribe', [ClientInfo, Properties], Parsed),
            {ReasonCodes, NChannel} = process_unsubscribe(Hooked, Properties, Channel),
            handle_out(unsuback, {PacketId, ReasonCodes}, NChannel)
    end;
|
|
|
|
%% PINGREQ is answered with PINGRESP.
handle_in(?PACKET(?PINGREQ), Channel) ->
    {ok, ?PACKET(?PINGRESP), Channel};

%% DISCONNECT: keep the properties in conninfo as `disconn_props',
%% drop the will message on a normal disconnect, then close.
handle_in(?DISCONNECT_PACKET(ReasonCode, Properties),
          Channel = #channel{conninfo = ConnInfo}) ->
    ConnInfo1 = ConnInfo#{disconn_props => Properties},
    Channel1 = maybe_clean_will_msg(ReasonCode, Channel#channel{conninfo = ConnInfo1}),
    process_disconnect(ReasonCode, Properties, Channel1);

%% Any AUTH packet not matched by an earlier clause is rejected.
handle_in(?AUTH_PACKET(), Channel) ->
    handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
|
|
|
|
%% Frame errors. How they are reported depends on the connection state:
%% idle -> shut down; connecting -> CONNACK with an error code and shut
%% down; connected -> DISCONNECT; disconnected -> only logged.
handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
    shutdown(Reason, Channel);

handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
    RC = case Reason of
             frame_too_large -> ?RC_PACKET_TOO_LARGE;
             _ -> ?RC_MALFORMED_PACKET
         end,
    shutdown(Reason, ?CONNACK_PACKET(RC), Channel);

handle_in({frame_error, Reason}, Channel = #channel{conn_state = connected}) ->
    RC = case Reason of
             frame_too_large -> ?RC_PACKET_TOO_LARGE;
             _ -> ?RC_MALFORMED_PACKET
         end,
    handle_out(disconnect, {RC, Reason}, Channel);

handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
    ?LOG(error, "Unexpected frame error: ~p", [Reason]),
    {ok, Channel};

%% Anything else is unexpected and treated as a protocol error.
handle_in(Packet, Channel) ->
    ?LOG(error, "Unexpected incoming: ~p", [Packet]),
    handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Process Connect
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Open (or resume) the session for an authenticated client and reply
%% with CONNACK. When an existing session is resumed, its pending
%% delivers are merged with the delivers already in the mailbox and kept
%% in `pendings' with `resuming' set.
process_connect(AckProps, Channel = #channel{conninfo = ConnInfo = #{clean_start := CleanStart},
                                             clientinfo = ClientInfo}) ->
    case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
        {ok, #{session := Session, present := false}} ->
            Fresh = Channel#channel{session = Session},
            handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, Fresh);
        {ok, #{session := Session, present := true, pendings := Pendings}} ->
            AllPendings = lists:usort(Pendings ++ emqx_misc:drain_deliver()),
            Resumed = Channel#channel{session = Session,
                                      resuming = true,
                                      pendings = AllPendings
                                     },
            handle_out(connack, {?RC_SUCCESS, sp(true), AckProps}, Resumed);
        {error, client_id_unavailable} ->
            handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel);
        {error, Reason} ->
            ?LOG(error, "Failed to open session due to ~p", [Reason]),
            handle_out(connack, ?RC_UNSPECIFIED_ERROR, Channel)
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Process Publish
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Run an incoming PUBLISH through the quota, alias, ACL and capability
%% checks and publish it when they all pass.
%%
%% Rejections:
%%  - not authorized: follows the zone's `acl_deny_action'. With `ignore'
%%    a QoS 0 message is dropped silently, while QoS 1 / QoS 2 are
%%    answered with PUBACK / PUBREC carrying the reason code. With
%%    `disconnect' the client is disconnected.
%%  - quota exceeded: QoS 0 is dropped (and counted), QoS 1 / QoS 2 are
%%    answered with PUBACK / PUBREC carrying the reason code.
%%  - anything else: the client is disconnected.
process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
                Channel = #channel{clientinfo = #{zone := Zone}}) ->
    case pipeline([fun check_quota_exceeded/2,
                   fun process_alias/2,
                   fun check_pub_alias/2,
                   fun check_pub_acl/2,
                   fun check_pub_caps/2
                  ], Packet, Channel) of
        {ok, NPacket, NChannel} ->
            Msg = packet_to_message(NPacket, NChannel),
            do_publish(PacketId, Msg, NChannel);
        {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
            ?LOG(warning, "Cannot publish message to ~s due to ~s.",
                 [Topic, emqx_reason_codes:text(Rc)]),
            case emqx_zone:get_env(Zone, acl_deny_action, ignore) of
                ignore ->
                    case QoS of
                        ?QOS_0 ->
                            {ok, NChannel};
                        ?QOS_1 ->
                            handle_out(puback, {PacketId, Rc}, NChannel);
                        ?QOS_2 ->
                            %% The acknowledgement of a QoS 2 PUBLISH is
                            %% PUBREC, not PUBACK.
                            handle_out(pubrec, {PacketId, Rc}, NChannel)
                    end;
                disconnect ->
                    handle_out(disconnect, Rc, NChannel)
            end;
        {error, Rc = ?RC_QUOTA_EXCEEDED, NChannel} ->
            ?LOG(warning, "Cannot publish messages to ~s due to ~s.",
                 [Topic, emqx_reason_codes:text(Rc)]),
            case QoS of
                ?QOS_0 ->
                    ok = emqx_metrics:inc('packets.publish.dropped'),
                    {ok, NChannel};
                ?QOS_1 ->
                    handle_out(puback, {PacketId, Rc}, NChannel);
                ?QOS_2 ->
                    handle_out(pubrec, {PacketId, Rc}, NChannel)
            end;
        {error, Rc, NChannel} ->
            ?LOG(warning, "Cannot publish message to ~s due to ~s.",
                 [Topic, emqx_reason_codes:text(Rc)]),
            handle_out(disconnect, Rc, NChannel)
    end.
|
|
|
|
%% Turn a PUBLISH packet into a message published by this client. The
%% client's protocol, username and peer host go into the headers and the
%% mountpoint is applied to the result.
packet_to_message(Packet, #channel{conninfo = #{proto_ver := ProtoVer},
                                   clientinfo = #{protocol := Protocol,
                                                  clientid := ClientId,
                                                  username := Username,
                                                  peerhost := PeerHost,
                                                  mountpoint := MountPoint}}) ->
    Headers = #{proto_ver => ProtoVer,
                protocol => Protocol,
                username => Username,
                peerhost => PeerHost},
    Msg = emqx_packet:to_message(Packet, ClientId, Headers),
    emqx_mountpoint:mount(MountPoint, Msg).
|
|
|
|
%% Publish a message according to its QoS.
%%  QoS 0: publish through the broker, no acknowledgement.
%%  QoS 1: publish through the broker, answer with PUBACK.
%%  QoS 2: publish through the session, answer with PUBREC.
%% The publish result is accounted against the quota in every case where
%% the message was published.
do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) ->
    Routes = emqx_broker:publish(Msg),
    {ok, ensure_quota(Routes, Channel)};

do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
    Routes = emqx_broker:publish(Msg),
    ReasonCode = puback_reason_code(Routes),
    Charged = ensure_quota(Routes, Channel),
    handle_out(puback, {PacketId, ReasonCode}, Charged);

do_publish(PacketId, Msg = #message{qos = ?QOS_2},
           Channel = #channel{session = Session}) ->
    case emqx_session:publish(PacketId, Msg, Session) of
        {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
            ok = emqx_metrics:inc('packets.publish.inuse'),
            handle_out(pubrec, {PacketId, RC}, Channel);
        {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
            ?LOG(warning, "Dropped the qos2 packet ~w "
                 "due to awaiting_rel is full.", [PacketId]),
            ok = emqx_metrics:inc('packets.publish.dropped'),
            handle_out(pubrec, {PacketId, RC}, Channel);
        {ok, Routes, Session1} ->
            ReasonCode = puback_reason_code(Routes),
            Timed = ensure_timer(await_timer, Channel#channel{session = Session1}),
            Charged = ensure_quota(Routes, Timed),
            handle_out(pubrec, {PacketId, ReasonCode}, Charged)
    end.
|
|
|
|
%% Account a publish result against the quota limiter, if there is one.
%% The count starts at 1 and grows by 1 for every `{_, _, ok}' entry and
%% by I for every `{_, _, {ok, I}}' entry of the result. When the limiter
%% asks for a pause, the quota timer is started with that interval.
ensure_quota(_, Channel = #channel{quota = undefined}) ->
    Channel;
ensure_quota(PubRes, Channel = #channel{quota = Limiter}) ->
    Count = fun({_, _, ok}, Sum) -> Sum + 1;
               ({_, _, {ok, I}}, Sum) -> Sum + I;
               (_, Sum) -> Sum
            end,
    Cnt = lists:foldl(Count, 1, PubRes),
    case emqx_limiter:check(#{cnt => Cnt, oct => 0}, Limiter) of
        {pause, Intv, Limiter1} ->
            ensure_timer(quota_timer, Intv, Channel#channel{quota = Limiter1});
        {ok, Limiter1} ->
            Channel#channel{quota = Limiter1}
    end.
|
|
|
|
-compile({inline, [puback_reason_code/1]}).
%% Reason code for a publish acknowledgement: success when the publish
%% result is non-empty, "no matching subscribers" when it is empty.
puback_reason_code([_ | _]) ->
    ?RC_SUCCESS;
puback_reason_code([]) ->
    ?RC_NO_MATCHING_SUBSCRIBERS.
|
|
|
|
-compile({inline, [after_message_acked/3]}).
%% Count the acknowledgement and run the 'message.acked' hooks with the
%% acknowledgement properties stored in the `puback_props' header.
after_message_acked(ClientInfo, Msg, PubAckProps) ->
    ok = emqx_metrics:inc('messages.acked'),
    AckedMsg = emqx_message:set_header(puback_props, PubAckProps, Msg),
    emqx_hooks:run('message.acked', [ClientInfo, AckedMsg]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Process Subscribe
|
|
%%--------------------------------------------------------------------
|
|
|
|
-compile({inline, [process_subscribe/3]}).
%% Subscribe to each topic filter in turn. Returns the reason codes in
%% the order of the filters, together with the updated channel.
process_subscribe(TopicFilters, SubProps, Channel) ->
    process_subscribe(TopicFilters, SubProps, Channel, []).

%% Acc collects the reason codes in reverse order.
process_subscribe([{TopicFilter, SubOpts} | Rest], SubProps, Channel, Acc) ->
    case check_subscribe(TopicFilter, SubOpts, Channel) of
        {error, RC} ->
            ?LOG(warning, "Cannot subscribe ~s due to ~s.", [TopicFilter, emqx_reason_codes:text(RC)]),
            process_subscribe(Rest, SubProps, Channel, [RC | Acc]);
        ok ->
            Opts = SubOpts#{sub_props => SubProps},
            {RC, NChannel} = do_subscribe(TopicFilter, Opts, Channel),
            process_subscribe(Rest, SubProps, NChannel, [RC | Acc])
    end;

process_subscribe([], _SubProps, Channel, Acc) ->
    {lists:reverse(Acc), Channel}.
|
|
|
|
%% Subscribe the session to one topic filter. The filter is mounted and
%% the options are completed with the defaults first. Returns the granted
%% QoS on success or the error reason code, plus the channel.
do_subscribe(TopicFilter, SubOpts = #{qos := QoS},
             Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
                                session = Session}) ->
    Mounted = emqx_mountpoint:mount(MountPoint, TopicFilter),
    FullOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
    case emqx_session:subscribe(ClientInfo, Mounted, FullOpts, Session) of
        {error, RC} ->
            ?LOG(warning, "Cannot subscribe ~s due to ~s.", [TopicFilter, emqx_reason_codes:text(RC)]),
            {RC, Channel};
        {ok, Session1} ->
            {QoS, Channel#channel{session = Session1}}
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Process Unsubscribe
|
|
%%--------------------------------------------------------------------
|
|
|
|
-compile({inline, [process_unsubscribe/3]}).
%% Unsubscribe from each topic filter in turn. Returns the reason codes
%% in the order of the filters, together with the updated channel.
process_unsubscribe(TopicFilters, UnSubProps, Channel) ->
    process_unsubscribe(TopicFilters, UnSubProps, Channel, []).

%% Acc collects the reason codes in reverse order.
process_unsubscribe([{TopicFilter, SubOpts} | Rest], UnSubProps, Channel, Acc) ->
    Opts = SubOpts#{unsub_props => UnSubProps},
    {RC, NChannel} = do_unsubscribe(TopicFilter, Opts, Channel),
    process_unsubscribe(Rest, UnSubProps, NChannel, [RC | Acc]);

process_unsubscribe([], _UnSubProps, Channel, Acc) ->
    {lists:reverse(Acc), Channel}.
|
|
|
|
%% Unsubscribe the session from one (mounted) topic filter. Returns the
%% reason code and the channel.
do_unsubscribe(TopicFilter, SubOpts,
               Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
                                  session = Session}) ->
    Mounted = emqx_mountpoint:mount(MountPoint, TopicFilter),
    case emqx_session:unsubscribe(ClientInfo, Mounted, SubOpts, Session) of
        {error, RC} ->
            {RC, Channel};
        {ok, Session1} ->
            {?RC_SUCCESS, Channel#channel{session = Session1}}
    end.
|
|
%%--------------------------------------------------------------------
|
|
%% Process Disconnect
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% MQTT-v5.0: 3.14.4 DISCONNECT Actions
%% A DISCONNECT with reason code "success" discards the will message;
%% any other reason code keeps it.
maybe_clean_will_msg(ReasonCode, Channel) ->
    case ReasonCode of
        ?RC_SUCCESS -> Channel#channel{will_msg = undefined};
        _ -> Channel
    end.
|
|
|
|
%% MQTT-v5.0: 3.14.2.2.2 Session Expiry Interval
%% A client that connected with an expiry interval of 0 must not set a
%% positive one on DISCONNECT; that is a protocol error. Otherwise the
%% expiry interval is updated if given and the connection is closed.
process_disconnect(_ReasonCode, #{'Session-Expiry-Interval' := Interval},
                   Channel = #channel{conninfo = #{expiry_interval := 0}})
  when Interval > 0 ->
    handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);

process_disconnect(ReasonCode, Properties, Channel) ->
    CloseReason = disconnect_reason(ReasonCode),
    {ok, {close, CloseReason}, maybe_update_expiry_interval(Properties, Channel)}.
|
|
|
|
%% Store the Session-Expiry-Interval property, when present, as the
%% `expiry_interval' of the connection info.
maybe_update_expiry_interval(#{'Session-Expiry-Interval' := Interval},
                             Channel = #channel{conninfo = ConnInfo}) ->
    ConnInfo1 = maps:put(expiry_interval, Interval, ConnInfo),
    Channel#channel{conninfo = ConnInfo1};
maybe_update_expiry_interval(_Properties, Channel) ->
    Channel.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle Delivers from broker to client
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec(handle_deliver(list(emqx_types:deliver()), channel())
      -> {ok, channel()} | {ok, replies(), channel()}).
%% Disconnected: nack what has to be nacked and enqueue the rest in the
%% session.
handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
                                            session = Session,
                                            clientinfo = #{clientid := ClientId}}) ->
    Kept = ignore_local(maybe_nack(Delivers), ClientId, Session),
    {ok, Channel#channel{session = emqx_session:enqueue(Kept, Session)}};

%% Takeover in progress: hold the delivers back in `pendings'.
handle_deliver(Delivers, Channel = #channel{takeover = true,
                                            pendings = Pendings,
                                            session = Session,
                                            clientinfo = #{clientid := ClientId}}) ->
    Kept = ignore_local(maybe_nack(Delivers), ClientId, Session),
    {ok, Channel#channel{pendings = Pendings ++ Kept}};

%% Otherwise: deliver through the session and send what it hands back.
handle_deliver(Delivers, Channel = #channel{session = Session,
                                            clientinfo = #{clientid := ClientId}}) ->
    Kept = ignore_local(Delivers, ClientId, Session),
    case emqx_session:deliver(Kept, Session) of
        {ok, Session1} ->
            {ok, Channel#channel{session = Session1}};
        {ok, Publishes, Session1} ->
            Updated = Channel#channel{session = Session1},
            handle_out(publish, Publishes, ensure_timer(retry_timer, Updated))
    end.
|
|
|
|
%% Remove the delivers that must not reach the subscriber because of the
%% "No Local" subscription option: messages published by the subscriber
%% itself on a topic it subscribed to with `nl = 1'.
%% Every such deliver is removed wherever it sits in the list (not only
%% a leading run), and each removal is counted in the 'delivery.dropped'
%% and 'delivery.dropped.no_local' metrics.
ignore_local(Delivers, Subscriber, Session) ->
    Subs = emqx_session:info(subscriptions, Session),
    lists:filter(fun({deliver, Topic, #message{from = Publisher}}) ->
                         case maps:find(Topic, Subs) of
                             {ok, #{nl := 1}} when Subscriber =:= Publisher ->
                                 ok = emqx_metrics:inc('delivery.dropped'),
                                 ok = emqx_metrics:inc('delivery.dropped.no_local'),
                                 false;
                             _ ->
                                 true
                         end
                 end, Delivers).
|
|
|
|
%% Nack delivers from shared subscription
%% Keeps only the delivers for which not_nacked/1 returns true.
maybe_nack(Delivers) ->
    [Deliver || Deliver <- Delivers, not_nacked(Deliver)].
|
|
|
|
%% True when the deliver has to be kept: either the message needs no
%% acknowledgement, or sending the "no connection" nack did not return ok.
not_nacked({deliver, _Topic, Msg}) ->
    (not emqx_shared_sub:is_ack_required(Msg))
        orelse (ok /= emqx_shared_sub:nack_no_connection(Msg)).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle outgoing packet
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec(handle_out(atom(), term(), channel())
      -> {ok, channel()}
       | {ok, replies(), channel()}
       | {shutdown, Reason :: term(), channel()}
       | {shutdown, Reason :: term(), replies(), channel()}).
%% Successful CONNACK: enrich the properties, run the 'client.connack'
%% hooks, make sure keepalive is set up and return the packet.
handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = ConnInfo}) ->
    Enrichers = [fun enrich_connack_caps/2,
                 fun enrich_server_keepalive/2,
                 fun enrich_response_information/2,
                 fun enrich_assigned_clientid/2
                ],
    AckProps = run_fold(Enrichers, Props, Channel),
    HookArgs = [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)],
    NAckProps = run_hooks('client.connack', HookArgs, AckProps),
    NChannel = ensure_keepalive(NAckProps, Channel),
    return_connack(?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps), NChannel);

%% Failed CONNACK: run the hooks, send the reason code (translated for
%% clients older than MQTT v5) and shut the channel down.
handle_out(connack, ReasonCode, Channel = #channel{conninfo = ConnInfo}) ->
    Reason = emqx_reason_codes:name(ReasonCode),
    AckProps = run_hooks('client.connack', [ConnInfo, Reason], emqx_mqtt_props:new()),
    SentCode = case maps:get(proto_ver, ConnInfo) of
                   ?MQTT_PROTO_V5 -> ReasonCode;
                   _ -> emqx_reason_codes:compat(connack, ReasonCode)
               end,
    shutdown(Reason, ?CONNACK_PACKET(SentCode, sp(false), AckProps), Channel);
|
|
|
|
%% Optimize?
|
|
%% Nothing to publish.
handle_out(publish, [], Channel) ->
    {ok, Channel};

%% Turn the publishes into packets and send them as one batch.
handle_out(publish, Publishes, Channel) ->
    {Batch, Delivered} = do_deliver(Publishes, Channel),
    {ok, {outgoing, Batch}, Delivered};

%% Publish acknowledgements: one packet each, channel unchanged.
handle_out(puback, {PacketId, ReasonCode}, Channel) ->
    Ack = ?PUBACK_PACKET(PacketId, ReasonCode),
    {ok, Ack, Channel};

handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
    Ack = ?PUBREC_PACKET(PacketId, ReasonCode),
    {ok, Ack, Channel};

handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
    Ack = ?PUBREL_PACKET(PacketId, ReasonCode),
    {ok, Ack, Channel};

handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
    Ack = ?PUBCOMP_PACKET(PacketId, ReasonCode),
    {ok, Ack, Channel};
|
|
|
|
%% SUBACK: MQTT v5 clients get the reason codes as they are, older
%% clients get the compatible ones.
handle_out(suback, {PacketId, ReasonCodes}, Channel = ?IS_MQTT_V5) ->
    return_suback(?SUBACK_PACKET(PacketId, ReasonCodes), Channel);

handle_out(suback, {PacketId, ReasonCodes}, Channel) ->
    Compat = lists:map(fun(RC) -> emqx_reason_codes:compat(suback, RC) end,
                       ReasonCodes),
    return_suback(?SUBACK_PACKET(PacketId, Compat), Channel);

%% UNSUBACK: only MQTT v5 clients get reason codes.
handle_out(unsuback, {PacketId, ReasonCodes}, Channel = ?IS_MQTT_V5) ->
    return_unsuback(?UNSUBACK_PACKET(PacketId, ReasonCodes), Channel);

handle_out(unsuback, {PacketId, _ReasonCodes}, Channel) ->
    return_unsuback(?UNSUBACK_PACKET(PacketId), Channel);
|
|
|
|
%% DISCONNECT given only a reason code: derive the close reason from it.
handle_out(disconnect, ReasonCode, Channel) when is_integer(ReasonCode) ->
    handle_out(disconnect, {ReasonCode, disconnect_reason(ReasonCode)}, Channel);

%% MQTT v5 clients are sent a DISCONNECT packet before the close.
handle_out(disconnect, {ReasonCode, ReasonName}, Channel = ?IS_MQTT_V5) ->
    Replies = [{outgoing, ?DISCONNECT_PACKET(ReasonCode)}, {close, ReasonName}],
    {ok, Replies, Channel};

%% Other clients are just closed.
handle_out(disconnect, {_ReasonCode, ReasonName}, Channel) ->
    {ok, {close, ReasonName}, Channel};

handle_out(auth, {ReasonCode, Properties}, Channel) ->
    AuthPacket = ?AUTH_PACKET(ReasonCode, Properties),
    {ok, AuthPacket, Channel};

%% Unknown outgoing types are logged and ignored.
handle_out(Type, Data, Channel) ->
    ?LOG(error, "Unexpected outgoing: ~s, ~p", [Type, Data]),
    {ok, Channel}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Return ConnAck
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Reply to a successful CONNECT: the `connected' event and the CONNACK,
%% followed - when a session is being resumed - by the packets of the
%% publishes the resume produced. Resuming clears `resuming' and
%% `pendings'.
return_connack(AckPacket, Channel) ->
    Replies = [{event, connected}, {connack, AckPacket}],
    case maybe_resume_session(Channel) of
        {ok, Publishes, Session1} ->
            Resumed = Channel#channel{session = Session1,
                                      resuming = false,
                                      pendings = []
                                     },
            {Packets, Delivered} = do_deliver(Publishes, Resumed),
            Outgoing = case Packets of
                           [] -> [];
                           _ -> [{outgoing, Packets}]
                       end,
            {ok, Replies ++ Outgoing, Delivered};
        ignore ->
            {ok, Replies, Channel}
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Deliver publish: broker -> client
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Convert publishes (broker -> client) into outgoing packets.
%% Accepts a single `{pubrel, PacketId}', a single `{PacketId, Msg}', or a
%% list of those. Returns `{Packets, Channel}' where Packets is a list.

%% A pending PUBREL becomes a PUBREL packet with a success reason code.
do_deliver({pubrel, PacketId}, Channel) ->
    PubRel = ?PUBREL_PACKET(PacketId, ?RC_SUCCESS),
    {[PubRel], Channel};

%% A message: count it, run the 'message.delivered' hook over the message
%% with refreshed expiry, strip the mountpoint, then apply topic aliasing.
do_deliver({PacketId, Msg},
           Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}}) ->
    ok = emqx_metrics:inc('messages.delivered'),
    Refreshed = emqx_message:update_expiry(Msg),
    Hooked = emqx_hooks:run_fold('message.delivered', [ClientInfo], Refreshed),
    Unmounted = emqx_mountpoint:unmount(MountPoint, Hooked),
    Packet0 = emqx_message:to_packet(PacketId, Unmounted),
    {Packet1, Channel1} = packing_alias(Packet0, Channel),
    {[Packet1], Channel1};

%% Single-element list: no fold needed.
do_deliver([Publish], Channel) ->
    do_deliver(Publish, Channel);

%% General list: thread the channel through every publish. Packets are
%% accumulated in front of the accumulator and reversed once at the end.
do_deliver(Publishes, Channel) when is_list(Publishes) ->
    Collect = fun(Publish, {PktsAcc, ChanAcc}) ->
                  {Pkts, ChanAcc1} = do_deliver(Publish, ChanAcc),
                  {Pkts ++ PktsAcc, ChanAcc1}
              end,
    {RevPackets, NChannel} = lists:foldl(Collect, {[], Channel}, Publishes),
    {lists:reverse(RevPackets), NChannel}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle out suback/unsuback
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Reply with the SUBACK packet followed by an `updated' event.
return_suback(Packet, Channel) ->
    Replies = [{outgoing, Packet}, {event, updated}],
    {ok, Replies, Channel}.
|
|
|
|
%% Reply with the UNSUBACK packet followed by an `updated' event.
return_unsuback(Packet, Channel) ->
    Replies = [{outgoing, Packet}, {event, updated}],
    {ok, Replies, Channel}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle call
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec(handle_call(Req :: term(), channel())
      -> {reply, Reply :: term(), channel()}
       | {shutdown, Reason :: term(), Reply :: term(), channel()}
       | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}).
%% Kick: mark the channel disconnected, then shut it down.
%% NOTE(review): ensure_disconnected/2 sets conn_state to `disconnected', so
%% the `connected' clause of disconnect_and_shutdown/3 cannot match here and
%% no DISCONNECT packet is attached for a kicked client -- confirm intended.
handle_call(kick, Channel) ->
    Disconnected = ensure_disconnected(kicked, Channel),
    disconnect_and_shutdown(kicked, ok, Disconnected);

%% Discard: shut down without running the disconnected hook first.
handle_call(discard, Channel) ->
    disconnect_and_shutdown(discarded, ok, Channel);

%% Session Takeover, phase 1: hand out the session and flag the channel.
handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
    {reply, Session, Channel#channel{takeover = true}};

%% Session Takeover, phase 2: reply with the drained deliveries followed by
%% the already pending ones, then shut down.
handle_call({takeover, 'end'}, Channel = #channel{session  = Session,
                                                  pendings = Pendings}) ->
    ok = emqx_session:takeover(Session),
    %% TODO: Should not drain deliver here (side effect)
    Drained = emqx_misc:drain_deliver(),
    disconnect_and_shutdown(takeovered, Drained ++ Pendings, Channel);

handle_call(list_acl_cache, Channel) ->
    AclCache = emqx_acl_cache:list_acl_cache(),
    {reply, AclCache, Channel};

%% Install a quota limiter built from the given policy for this zone.
handle_call({quota, Policy}, Channel) ->
    Quota = emqx_limiter:init(info(zone, Channel), Policy),
    {reply, ok, Channel#channel{quota = Quota}};

handle_call(Req, Channel) ->
    ?LOG(error, "Unexpected call: ~p", [Req]),
    {reply, ignored, Channel}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle Info
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% The connected `sock_closed' clause returns `{ok, {event, disconnected}, _}',
%% so the spec lists the three-element `ok' shape as well.
-spec(handle_info(Info :: term(), channel())
      -> ok
       | {ok, channel()}
       | {ok, Reply :: term(), channel()}
       | {shutdown, Reason :: term(), channel()}).

%% Subscribe requested by a message rather than a packet: parse the filters
%% and subscribe to each in turn; the per-topic results are discarded.
handle_info({subscribe, TopicFilters}, Channel) ->
    Subscribe = fun({TopicFilter, SubOpts}, {_RC, ChannelAcc}) ->
                    do_subscribe(TopicFilter, SubOpts, ChannelAcc)
                end,
    {_, NChannel} = lists:foldl(Subscribe, {[], Channel},
                                parse_topic_filters(TopicFilters)),
    {ok, NChannel};

%% Unsubscribe requested by a message; reason codes are discarded.
handle_info({unsubscribe, TopicFilters}, Channel) ->
    {_RC, NChannel} = process_unsubscribe(TopicFilters, #{}, Channel),
    {ok, NChannel};

%% Socket closed before the client finished connecting: just shut down.
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
    shutdown(Reason, Channel);

handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
    shutdown(Reason, Channel);

%% Socket closed while connected: run flapping detection when the zone
%% enables it, deal with the will message, mark the channel disconnected
%% and then either keep it (session not yet expired) or shut down.
handle_info({sock_closed, Reason}, Channel =
            #channel{conn_state = connected,
                     clientinfo = ClientInfo = #{zone := Zone}}) ->
    emqx_zone:enable_flapping_detect(Zone)
        andalso emqx_flapping:detect(ClientInfo),
    Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)),
    case maybe_shutdown(Reason, Channel1) of
        {ok, Channel2} -> {ok, {event, disconnected}, Channel2};
        Shutdown       -> Shutdown
    end;

%% Already disconnected: a second close notification is unexpected.
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
    ?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
    {ok, Channel};

handle_info(clean_acl_cache, Channel) ->
    ok = emqx_acl_cache:empty_acl_cache(),
    {ok, Channel};

handle_info(Info, Channel) ->
    ?LOG(error, "Unexpected info: ~p", [Info]),
    {ok, Channel}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Handle timeout
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec(handle_timeout(reference(), Msg :: term(), channel())
      -> {ok, channel()}
       | {ok, replies(), channel()}
       | {shutdown, Reason :: term(), channel()}).
%% Keepalive tick with no keepalive configured, or after disconnect: no-op.
handle_timeout(_TRef, {keepalive, _StatVal},
               Channel = #channel{keepalive = undefined}) ->
    {ok, Channel};
handle_timeout(_TRef, {keepalive, _StatVal},
               Channel = #channel{conn_state = disconnected}) ->
    {ok, Channel};
%% Keepalive tick: check the statistic; on timeout send a disconnect,
%% otherwise store the new keepalive state and re-arm the timer.
handle_timeout(_TRef, {keepalive, StatVal},
               Channel = #channel{keepalive = Keepalive}) ->
    case emqx_keepalive:check(StatVal, Keepalive) of
        {error, timeout} ->
            handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel);
        {ok, NKeepalive} ->
            Updated = Channel#channel{keepalive = NKeepalive},
            {ok, reset_timer(alive_timer, Updated)}
    end;

%% Retry of unacknowledged deliveries; skipped while disconnected.
handle_timeout(_TRef, retry_delivery,
               Channel = #channel{conn_state = disconnected}) ->
    {ok, Channel};
handle_timeout(_TRef, retry_delivery,
               Channel = #channel{session = Session}) ->
    case emqx_session:retry(Session) of
        {ok, NSession} ->
            %% Nothing to retry: drop the timer entry.
            {ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
        {ok, Publishes, Timeout, NSession} ->
            %% Re-arm with the timeout supplied by the session, then resend.
            Rearmed = reset_timer(retry_timer, Timeout,
                                  Channel#channel{session = NSession}),
            handle_out(publish, Publishes, Rearmed)
    end;

%% Expiry of awaiting-rel entries; skipped while disconnected.
handle_timeout(_TRef, expire_awaiting_rel,
               Channel = #channel{conn_state = disconnected}) ->
    {ok, Channel};
handle_timeout(_TRef, expire_awaiting_rel,
               Channel = #channel{session = Session}) ->
    case emqx_session:expire(awaiting_rel, Session) of
        {ok, NSession} ->
            {ok, clean_timer(await_timer, Channel#channel{session = NSession})};
        {ok, Timeout, NSession} ->
            Updated = Channel#channel{session = NSession},
            {ok, reset_timer(await_timer, Timeout, Updated)}
    end;

handle_timeout(_TRef, expire_session, Channel) ->
    shutdown(expired, Channel);

%% Delayed will message: publish it if still present, then clear it.
handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) ->
    case WillMsg of
        undefined -> ok;
        _         -> publish_will_msg(WillMsg)
    end,
    {ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};

%% Removing the quota timer entry is what lifts the quota-exceeded state
%% (check_quota_exceeded/2 looks for that entry).
handle_timeout(_TRef, expire_quota_limit, Channel) ->
    {ok, clean_timer(quota_timer, Channel)};

handle_timeout(_TRef, Msg, Channel) ->
    ?LOG(error, "Unexpected timeout: ~p~n", [Msg]),
    {ok, Channel}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Ensure timers
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Start the named timer(s) unless already running or disabled.
%% Accepts one timer name or a list of names. An empty list is a no-op;
%% it used to fall into the single-name clause and crash in interval/2.
ensure_timer([], Channel) ->
    Channel;
ensure_timer([Name | Rest], Channel) ->
    ensure_timer(Rest, ensure_timer(Name, Channel));

%% Single name: start the timer only when no reference is stored for it
%% and its configured interval is positive.
ensure_timer(Name, Channel = #channel{timers = Timers}) ->
    TRef = maps:get(Name, Timers, undefined),
    Time = interval(Name, Channel),
    case TRef =:= undefined andalso Time > 0 of
        true  -> ensure_timer(Name, Time, Channel);
        false -> Channel %% Timer disabled or exists
    end.
|
|
|
|
%% Start the named timer with an explicit timeout and remember its
%% reference. The timeout message is looked up in ?TIMER_TABLE.
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
    TimeoutMsg = maps:get(Name, ?TIMER_TABLE),
    NTimers = Timers#{Name => emqx_misc:start_timer(Time, TimeoutMsg)},
    Channel#channel{timers = NTimers}.
|
|
|
|
%% Forget the stored reference, then start the timer again using its
%% configured interval.
reset_timer(Name, Channel) ->
    Cleaned = clean_timer(Name, Channel),
    ensure_timer(Name, Cleaned).
|
|
|
|
%% Forget the stored reference, then start the timer again with the
%% given timeout.
reset_timer(Name, Time, Channel) ->
    Cleaned = clean_timer(Name, Channel),
    ensure_timer(Name, Time, Cleaned).
|
|
|
|
%% Drop the timer entry from the channel. Only the bookkeeping entry is
%% removed; the timer itself is not cancelled here.
clean_timer(Name, Channel = #channel{timers = Timers}) ->
    Remaining = maps:remove(Name, Timers),
    Channel#channel{timers = Remaining}.
|
|
|
|
%% Configured interval of a named timer. Every clause except alive_timer
%% converts a value in seconds with timer:seconds/1.
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
    emqx_keepalive:info(interval, KeepAlive);
interval(retry_timer, #channel{session = Session}) ->
    RetrySecs = emqx_session:info(retry_interval, Session),
    timer:seconds(RetrySecs);
interval(await_timer, #channel{session = Session}) ->
    AwaitSecs = emqx_session:info(await_rel_timeout, Session),
    timer:seconds(AwaitSecs);
interval(expire_timer, #channel{conninfo = ConnInfo}) ->
    ExpirySecs = maps:get(expiry_interval, ConnInfo),
    timer:seconds(ExpirySecs);
interval(will_timer, #channel{will_msg = WillMsg}) ->
    DelaySecs = will_delay_interval(WillMsg),
    timer:seconds(DelaySecs).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Terminate
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec(terminate(any(), channel()) -> ok).
%% An idle channel has nothing to clean up.
terminate(_, #channel{conn_state = idle}) ->
    ok;
%% Normal termination and kicked/discarded/takeovered shutdowns run the
%% terminate hook without publishing the will message.
terminate(normal, Channel) ->
    run_terminate_hook(normal, Channel);
terminate({shutdown, Reason}, Channel)
  when Reason =:= kicked orelse Reason =:= discarded orelse Reason =:= takeovered ->
    run_terminate_hook(Reason, Channel);
%% Any other reason: publish a still-pending will message first.
terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
    case WillMsg of
        undefined -> ok;
        _         -> publish_will_msg(WillMsg)
    end,
    run_terminate_hook(Reason, Channel).
|
|
|
|
%% Terminate the session, if the channel has one.
run_terminate_hook(_Reason, #channel{session = undefined}) ->
    ok;
run_terminate_hook(Reason, #channel{clientinfo = ClientInfo,
                                    session    = Session}) ->
    emqx_session:terminate(ClientInfo, Reason, Session).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Enrich MQTT Connect Info
|
|
|
|
%% Copy the CONNECT packet fields into the channel's conninfo and add the
%% derived session expiry interval and receive maximum.
enrich_conninfo(ConnPkt = #mqtt_packet_connect{proto_name  = ProtoName,
                                               proto_ver   = ProtoVer,
                                               clean_start = CleanStart,
                                               keepalive   = Keepalive,
                                               properties  = ConnProps,
                                               clientid    = ClientId,
                                               username    = Username},
                Channel = #channel{conninfo   = ConnInfo,
                                   clientinfo = #{zone := Zone}}) ->
    ExpiryInterval = expiry_interval(Zone, ConnPkt),
    ReceiveMaximum = receive_maximum(Zone, ConnProps),
    Enriched = ConnInfo#{proto_name      => ProtoName,
                         proto_ver       => ProtoVer,
                         clean_start     => CleanStart,
                         keepalive       => Keepalive,
                         clientid        => ClientId,
                         username        => Username,
                         conn_props      => ConnProps,
                         expiry_interval => ExpiryInterval,
                         receive_maximum => ReceiveMaximum},
    {ok, Channel#channel{conninfo = Enriched}}.
|
|
|
|
%% Session expiry interval for a CONNECT packet.
%% MQTT v5: the 'Session-Expiry-Interval' property, 0 when absent.
%% Other versions: the zone's configured interval when clean_start is
%% false, 0 when clean_start is true.
-compile({inline, [expiry_interval/2]}).
expiry_interval(_Zone, #mqtt_packet_connect{proto_ver  = ?MQTT_PROTO_V5,
                                            properties = ConnProps}) ->
    emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0);
expiry_interval(_Zone, #mqtt_packet_connect{clean_start = true}) ->
    0;
expiry_interval(Zone, #mqtt_packet_connect{clean_start = false}) ->
    emqx_zone:session_expiry_interval(Zone).
|
|
|
|
%% 'Receive-Maximum' from the connect properties, falling back to the
%% zone's max_inflight setting.
-compile({inline, [receive_maximum/2]}).
receive_maximum(Zone, ConnProps) ->
    Default = emqx_zone:max_inflight(Zone),
    emqx_mqtt_props:get('Receive-Maximum', ConnProps, Default).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Run Connect Hooks
|
|
|
|
%% Fold the 'client.connect' hooks over the connect properties. A hook may
%% abort with `{error, Reason}'; otherwise the resulting properties are
%% written back into the packet.
run_conn_hooks(ConnPkt, Channel = #channel{conninfo = ConnInfo}) ->
    Props0 = emqx_packet:info(properties, ConnPkt),
    case run_hooks('client.connect', [ConnInfo], Props0) of
        {error, _Reason} = Error ->
            Error;
        Props1 ->
            NConnPkt = emqx_packet:set_props(Props1, ConnPkt),
            {ok, NConnPkt, Channel}
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Check Connect Packet
|
|
|
|
%% Validate the CONNECT packet against the capabilities of the zone.
check_connect(ConnPkt, #channel{clientinfo = #{zone := Zone}}) ->
    Caps = emqx_mqtt_caps:get_caps(Zone),
    emqx_packet:check(ConnPkt, Caps).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Enrich Client Info
|
|
|
|
%% Run the client-info enrichment steps, in order, over the CONNECT packet
%% and store the resulting client info in the channel.
enrich_client(ConnPkt, Channel = #channel{clientinfo = ClientInfo}) ->
    Steps = [ fun set_username/2
            , fun set_bridge_mode/2
            , fun maybe_username_as_clientid/2
            , fun maybe_assign_clientid/2
            , fun fix_mountpoint/2
            ],
    {ok, NConnPkt, NClientInfo} = pipeline(Steps, ConnPkt, ClientInfo),
    {ok, NConnPkt, Channel#channel{clientinfo = NClientInfo}}.
|
|
|
|
%% Take the username from the CONNECT packet only when the client info
%% does not have one yet.
set_username(#mqtt_packet_connect{username = Username},
             ClientInfo = #{username := undefined}) ->
    {ok, ClientInfo#{username => Username}};
set_username(_ConnPkt, ClientInfo) ->
    {ok, ClientInfo}.
|
|
|
|
%% Flag the client as a bridge when the CONNECT packet says so; otherwise
%% return `ok' and leave the client info as it is.
set_bridge_mode(#mqtt_packet_connect{is_bridge = true}, ClientInfo) ->
    {ok, ClientInfo#{is_bridge => true}};
set_bridge_mode(_ConnPkt, _ClientInfo) ->
    ok.
|
|
|
|
%% Use the username as client id when the zone asks for it and a username
%% is present.
maybe_username_as_clientid(_ConnPkt, ClientInfo = #{username := undefined}) ->
    {ok, ClientInfo};
maybe_username_as_clientid(_ConnPkt, ClientInfo = #{zone     := Zone,
                                                    username := Username}) ->
    case emqx_zone:use_username_as_clientid(Zone) of
        false -> ok;
        true  -> {ok, ClientInfo#{clientid => Username}}
    end.
|
|
|
|
%% Settle the client id: keep one that is already set, generate a random
%% one when the packet carries an empty id, otherwise use the packet's id.
maybe_assign_clientid(_ConnPkt, ClientInfo = #{clientid := Existing})
  when Existing =/= undefined ->
    {ok, ClientInfo};
maybe_assign_clientid(#mqtt_packet_connect{clientid = <<>>}, ClientInfo) ->
    %% Generate a random client id
    Generated = emqx_guid:to_base62(emqx_guid:gen()),
    {ok, ClientInfo#{clientid => Generated}};
maybe_assign_clientid(#mqtt_packet_connect{clientid = FromPacket}, ClientInfo) ->
    {ok, ClientInfo#{clientid => FromPacket}}.
|
|
|
|
%% Substitute the variables of a configured mountpoint using the client
%% info; nothing to do when no mountpoint is set.
fix_mountpoint(_ConnPkt, #{mountpoint := undefined}) ->
    ok;
fix_mountpoint(_ConnPkt, ClientInfo = #{mountpoint := Template}) ->
    Resolved = emqx_mountpoint:replvar(Template, ClientInfo),
    {ok, ClientInfo#{mountpoint := Resolved}}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Set log metadata
|
|
|
|
%% Attach the client id to the logger metadata of this process.
set_log_meta(_ConnPkt, #channel{clientinfo = ClientInfo}) ->
    #{clientid := ClientId} = ClientInfo,
    emqx_logger:set_metadata_clientid(ClientId).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Check banned
|
|
|
|
%% Reject the client with ?RC_BANNED when banning is enabled for the zone
%% and the client is on the ban list.
check_banned(_ConnPkt, #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
    IsBanned = emqx_zone:enable_ban(Zone) andalso emqx_banned:check(ClientInfo),
    case IsBanned of
        false -> ok;
        true  -> {error, ?RC_BANNED}
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Auth Connect
|
|
|
|
%% Authenticate the client with the password from the CONNECT packet.
%% On success the authentication result is merged into the client info
%% (anonymous logins are counted); on failure the reason is logged and
%% mapped to a CONNACK error code.
auth_connect(#mqtt_packet_connect{clientid = ClientId,
                                  username = Username,
                                  password = Password},
             Channel = #channel{clientinfo = ClientInfo}) ->
    Credentials = ClientInfo#{password => Password},
    case emqx_access_control:authenticate(Credentials) of
        {error, Reason} ->
            ?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p",
                 [ClientId, Username, Reason]),
            {error, emqx_reason_codes:connack_error(Reason)};
        {ok, AuthResult} ->
            is_anonymous(AuthResult)
                andalso emqx_metrics:inc('client.auth.anonymous'),
            Merged = maps:merge(ClientInfo, AuthResult),
            {ok, Channel#channel{clientinfo = Merged}}
    end.
|
|
|
|
%% True only when the authentication result carries `anonymous => true'.
is_anonymous(AuthResult) ->
    case AuthResult of
        #{anonymous := true} -> true;
        _                    -> false
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Enhanced Authentication
|
|
|
|
%% Enhanced authentication entry point.
%% CONNECT, MQTT v5: authenticate with the method/data carried in the
%% connect properties.
enhanced_auth(?CONNECT_PACKET(#mqtt_packet_connect{
                                 proto_ver  = ?MQTT_PROTO_V5,
                                 properties = Properties
                                }), Channel) ->
    Method = emqx_mqtt_props:get('Authentication-Method', Properties, undefined),
    Data   = emqx_mqtt_props:get('Authentication-Data', Properties, undefined),
    do_enhanced_auth(Method, Data, Channel);

%% CONNECT, any other protocol version: nothing to do.
enhanced_auth(?CONNECT_PACKET(#mqtt_packet_connect{}), Channel) ->
    {ok, #{}, Channel};

%% AUTH packet: its method must be present and equal to the method given
%% in the original connect properties.
enhanced_auth(?AUTH_PACKET(_ReasonCode, Properties),
              Channel = #channel{conninfo = ConnInfo}) ->
    ConnProps  = emqx_mqtt_props:get(conn_props, ConnInfo, #{}),
    ConnMethod = emqx_mqtt_props:get('Authentication-Method', ConnProps, undefined),
    PktMethod  = emqx_mqtt_props:get('Authentication-Method', Properties, undefined),
    AuthData   = emqx_mqtt_props:get('Authentication-Data', Properties, undefined),
    case PktMethod =/= undefined andalso PktMethod =:= ConnMethod of
        true ->
            do_enhanced_auth(ConnMethod, AuthData, Channel);
        false ->
            {error, emqx_reason_codes:connack_error(bad_authentication_method), Channel}
    end.
|
|
|
|
%% Run the 'client.enhanced_authenticate' hooks.
%% No method and no data means enhanced auth is not in use. Only one of
%% the two being present is rejected as not authorized.
do_enhanced_auth(undefined, undefined, Channel) ->
    {ok, #{}, Channel};
do_enhanced_auth(undefined, _AuthData, Channel) ->
    {error, emqx_reason_codes:connack_error(not_authorized), Channel};
do_enhanced_auth(_AuthMethod, undefined, Channel) ->
    {error, emqx_reason_codes:connack_error(not_authorized), Channel};
do_enhanced_auth(AuthMethod, AuthData, Channel = #channel{auth_cache = Cache}) ->
    case run_hooks('client.enhanced_authenticate', [AuthMethod, AuthData], Cache) of
        %% `ok' finishes the exchange, `continue' asks for another round;
        %% both carry new auth data and an updated cache.
        {Outcome, NAuthData, NCache} when Outcome =:= ok; Outcome =:= continue ->
            NProperties = #{'Authentication-Method' => AuthMethod,
                            'Authentication-Data'   => NAuthData},
            {Outcome, NProperties, Channel#channel{auth_cache = NCache}};
        _ ->
            {error, emqx_reason_codes:connack_error(not_authorized), Channel}
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Process Topic Alias
|
|
|
|
%% Resolve or record an inbound topic alias of a PUBLISH (MQTT v5 only).
%%
%% Empty topic with an alias: look the alias up and put the stored topic
%% into the packet. Returns `{ok, Packet, Channel}' or
%% `{error, ?RC_PROTOCOL_ERROR}' when the alias cannot be resolved.
process_alias(Packet = #mqtt_packet{
                          variable = #mqtt_packet_publish{topic_name = <<>>,
                                                          properties = #{'Topic-Alias' := AliasId}
                                                         } = Publish
                         },
              Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases}) ->
    case find_alias(inbound, AliasId, TopicAliases) of
        {ok, Topic} ->
            NPublish = Publish#mqtt_packet_publish{topic_name = Topic},
            {ok, Packet#mqtt_packet{variable = NPublish}, Channel};
        %% find_alias/3 returns `error' (from maps:find/2) for an alias that
        %% was never registered and `false' when there is no alias table.
        %% Only `false' used to be handled, so an unknown alias raised
        %% case_clause instead of yielding a protocol error.
        error ->
            {error, ?RC_PROTOCOL_ERROR};
        false ->
            {error, ?RC_PROTOCOL_ERROR}
    end;

%% Non-empty topic with an alias: remember the alias -> topic mapping.
process_alias(#mqtt_packet{
                 variable = #mqtt_packet_publish{topic_name = Topic,
                                                 properties = #{'Topic-Alias' := AliasId}
                                                }
                },
              Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases}) ->
    NTopicAliases = save_alias(inbound, AliasId, Topic, TopicAliases),
    {ok, Channel#channel{topic_aliases = NTopicAliases}};

%% No alias property, or not MQTT v5: nothing to do.
process_alias(_Packet, Channel) -> {ok, Channel}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Packing Topic Alias
|
|
|
|
%% Apply outbound topic aliasing to a PUBLISH packet (MQTT v5 only).
%% Returns `{Packet, Channel}'.
%%  - Topic already aliased: send an empty topic plus the alias.
%%  - Topic not aliased yet: allocate the next alias id if the limit
%%    allows, send topic and alias together, and remember the mapping.
%%  - Otherwise the packet is returned unchanged.
packing_alias(Packet = #mqtt_packet{
                          variable = #mqtt_packet_publish{
                                        topic_name = Topic,
                                        properties = Prop
                                       } = Publish
                         },
              Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases,
                                               alias_maximum = Limits}) ->
    case find_alias(outbound, Topic, TopicAliases) of
        {ok, AliasId} ->
            NPublish = Publish#mqtt_packet_publish{
                         topic_name = <<>>,
                         properties = maps:merge(Prop, #{'Topic-Alias' => AliasId})
                        },
            {Packet#mqtt_packet{variable = NPublish}, Channel};
        error ->
            #{outbound := Aliases} = TopicAliases,
            %% Alias ids are handed out sequentially, starting at 1.
            AliasId = maps:size(Aliases) + 1,
            case (Limits =:= undefined) orelse
                 (AliasId =< maps:get(outbound, Limits, 0)) of
                true ->
                    NTopicAliases = save_alias(outbound, AliasId, Topic, TopicAliases),
                    NChannel = Channel#channel{topic_aliases = NTopicAliases},
                    NPublish = Publish#mqtt_packet_publish{
                                 topic_name = Topic,
                                 properties = maps:merge(Prop, #{'Topic-Alias' => AliasId})
                                },
                    {Packet#mqtt_packet{variable = NPublish}, NChannel};
                false ->
                    {Packet, Channel}
            end;
        %% find_alias/3 returns `false' when the alias table is undefined.
        %% That result used to raise case_clause; with no table there is
        %% nothing to alias, so the packet goes out unchanged.
        false ->
            {Packet, Channel}
    end;
packing_alias(Packet, Channel) -> {Packet, Channel}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Check quota state
|
|
|
|
%% The quota counts as exceeded for as long as a quota timer is recorded
%% in the channel's timers.
check_quota_exceeded(_, #channel{timers = Timers}) ->
    case maps:find(quota_timer, Timers) of
        {ok, TRef} when TRef =/= undefined ->
            {error, ?RC_QUOTA_EXCEEDED};
        _ ->
            ok
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Check Pub Alias
|
|
|
|
%% Check the 'Topic-Alias' of a PUBLISH against the inbound alias limit.
%% Packets without an alias, and channels without limits, always pass.
check_pub_alias(#mqtt_packet{
                   variable = #mqtt_packet_publish{
                                 properties = #{'Topic-Alias' := AliasId}
                                }
                  },
                #channel{alias_maximum = Limits}) ->
    WithinLimit = (Limits =:= undefined) orelse
                  (AliasId =< maps:get(inbound, Limits, ?MAX_TOPIC_AlIAS)),
    case WithinLimit of
        false -> {error, ?RC_TOPIC_ALIAS_INVALID};
        true  -> ok
    end;
check_pub_alias(_Packet, _Channel) ->
    ok.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Check Pub ACL
|
|
|
|
%% ACL check for publishing to the packet's topic. Passes without a lookup
%% when ACL checking is not enabled for this client.
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
              #channel{clientinfo = ClientInfo}) ->
    Verdict = is_acl_enabled(ClientInfo) andalso
              emqx_access_control:check_acl(ClientInfo, publish, Topic),
    case Verdict of
        deny  -> {error, ?RC_NOT_AUTHORIZED};
        allow -> ok;
        false -> ok
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Check Pub Caps
|
|
|
|
%% Check the QoS, retain flag and topic of a PUBLISH against the zone's
%% publish capabilities.
check_pub_caps(#mqtt_packet{header   = #mqtt_packet_header{qos    = QoS,
                                                           retain = Retain},
                            variable = #mqtt_packet_publish{topic_name = Topic}},
               #channel{clientinfo = #{zone := Zone}}) ->
    PubFlags = #{qos => QoS, retain => Retain, topic => Topic},
    emqx_mqtt_caps:check_pub(Zone, PubFlags).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Check Subscribe
|
|
|
|
%% A subscription must pass the ACL check first and the capability check
%% second.
check_subscribe(TopicFilter, SubOpts, Channel) ->
    case check_sub_acl(TopicFilter, Channel) of
        deny  -> {error, ?RC_NOT_AUTHORIZED};
        allow -> check_sub_caps(TopicFilter, SubOpts, Channel)
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Check Sub ACL
|
|
|
|
%% ACL check for subscribing to a topic filter. Yields `allow' when ACL
%% checking is not enabled, otherwise the result of the ACL lookup.
check_sub_acl(TopicFilter, #channel{clientinfo = ClientInfo}) ->
    Verdict = is_acl_enabled(ClientInfo) andalso
              emqx_access_control:check_acl(ClientInfo, subscribe, TopicFilter),
    case Verdict of
        false -> allow;
        _     -> Verdict
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Check Sub Caps
|
|
|
|
%% Check a subscription against the zone's subscribe capabilities.
check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) ->
    #{zone := Zone} = ClientInfo,
    emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Enrich SubId
|
|
|
|
%% Copy the 'Subscription-Identifier' property, when present, into the
%% options of every topic filter as `subid'.
put_subid_in_subopts(Properties, TopicFilters) ->
    case Properties of
        #{'Subscription-Identifier' := SubId} ->
            [{Filter, Opts#{subid => SubId}} || {Filter, Opts} <- TopicFilters];
        _ ->
            TopicFilters
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Enrich SubOpts
|
|
|
|
%% MQTT v5 subscriptions keep the client's options untouched. For other
%% versions `nl' comes from the zone's ignore_loop_deliver setting and
%% `rap' from whether the client is a bridge.
enrich_subopts(SubOpts, _Channel = ?IS_MQTT_V5) ->
    SubOpts;
enrich_subopts(SubOpts, #channel{clientinfo = #{zone      := Zone,
                                                is_bridge := IsBridge}}) ->
    NoLocal = flag(emqx_zone:ignore_loop_deliver(Zone)),
    RetainAsPublished = flag(IsBridge),
    SubOpts#{rap => RetainAsPublished, nl => NoLocal}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Enrich ConnAck Caps
|
|
|
|
%% Add the zone's capabilities to the CONNACK properties (MQTT v5 only).
enrich_connack_caps(AckProps, ?IS_MQTT_V5 = #channel{clientinfo = #{zone := Zone}}) ->
    #{max_packet_size       := MaxPktSize,
      max_qos_allowed       := MaxQoS,
      retain_available      := Retain,
      max_topic_alias       := MaxAlias,
      shared_subscription   := Shared,
      wildcard_subscription := Wildcard
     } = emqx_mqtt_caps:get_caps(Zone),
    WithCaps = AckProps#{'Retain-Available'                  => flag(Retain),
                         'Maximum-Packet-Size'               => MaxPktSize,
                         'Topic-Alias-Maximum'               => MaxAlias,
                         'Wildcard-Subscription-Available'   => flag(Wildcard),
                         'Subscription-Identifier-Available' => 1,
                         'Shared-Subscription-Available'     => flag(Shared)
                        },
    %% MQTT 5.0 - 3.2.2.3.4:
    %% It is a Protocol Error to include Maximum QoS more than once,
    %% or to have a value other than 0 or 1. If the Maximum QoS is absent,
    %% the Client uses a Maximum QoS of 2.
    case MaxQoS of
        2 -> WithCaps;
        _ -> WithCaps#{'Maximum-QoS' => MaxQoS}
    end;

%% Other protocol versions carry no CONNACK properties to enrich.
enrich_connack_caps(AckProps, _Channel) ->
    AckProps.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Enrich server keepalive
|
|
|
|
%% Add 'Server-Keep-Alive' to the CONNACK properties when the zone
%% configures one.
enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone}}) ->
    case emqx_zone:server_keepalive(Zone) of
        undefined       -> AckProps;
        ServerKeepalive -> AckProps#{'Server-Keep-Alive' => ServerKeepalive}
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Enrich response information
|
|
|
|
%% Add 'Response-Information' to the CONNACK properties when the client
%% requested it (request flag 1); the flag defaults to 0 when absent.
enrich_response_information(AckProps,
                            #channel{conninfo   = #{conn_props := ConnProps},
                                     clientinfo = #{zone := Zone}}) ->
    Requested = emqx_mqtt_props:get('Request-Response-Information', ConnProps, 0),
    case Requested of
        1 -> AckProps#{'Response-Information' => emqx_zone:response_information(Zone)};
        0 -> AckProps
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Enrich Assigned ClientId
|
|
|
|
%% Report the client id in use as 'Assigned-Client-Identifier' when the
%% client connected with an empty one.
enrich_assigned_clientid(AckProps, #channel{conninfo   = ConnInfo,
                                            clientinfo = #{clientid := ClientId}}) ->
    OriginalId = maps:get(clientid, ConnInfo),
    case OriginalId of
        <<>> -> %% The client sent an empty client id.
            AckProps#{'Assigned-Client-Identifier' => ClientId};
        _ ->
            AckProps
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Ensure connected
|
|
|
|
%% Stamp the connection time (milliseconds), run the 'client.connected'
%% hooks and switch the channel to the `connected' state.
ensure_connected(Channel = #channel{conninfo   = ConnInfo,
                                    clientinfo = ClientInfo}) ->
    Now = erlang:system_time(millisecond),
    Stamped = ConnInfo#{connected_at => Now},
    ok = run_hooks('client.connected', [ClientInfo, Stamped]),
    Channel#channel{conninfo = Stamped, conn_state = connected}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Init Alias Maximum
|
|
|
|
%% Topic alias limits for an MQTT v5 connection: `outbound' is the client's
%% 'Topic-Alias-Maximum' (0 when absent), `inbound' is the zone's
%% max_topic_alias capability. Other protocol versions get `undefined'.
init_alias_maximum(#mqtt_packet_connect{proto_ver  = ?MQTT_PROTO_V5,
                                        properties = Properties},
                   #{zone := Zone} = _ClientInfo) ->
    Outbound = emqx_mqtt_props:get('Topic-Alias-Maximum', Properties, 0),
    Inbound  = emqx_mqtt_caps:get_caps(Zone, max_topic_alias, ?MAX_TOPIC_AlIAS),
    #{outbound => Outbound, inbound => Inbound};
init_alias_maximum(_ConnPkt, _ClientInfo) ->
    undefined.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Ensure Keepalive
|
|
|
|
%% Pick the keepalive interval: a 'Server-Keep-Alive' in the CONNACK
%% properties wins over the value the client sent in CONNECT.
ensure_keepalive(#{'Server-Keep-Alive' := ServerInterval}, Channel) ->
    ensure_keepalive_timer(ServerInterval, Channel);
ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) ->
    ClientInterval = maps:get(keepalive, ConnInfo),
    ensure_keepalive_timer(ClientInterval, Channel).
|
|
|
|
%% Set up keepalive checking. An interval of 0 disables it. Otherwise the
%% interval (seconds) is converted with timer:seconds/1, scaled by the
%% zone's backoff factor, and the alive timer is started.
ensure_keepalive_timer(0, Channel) ->
    Channel;
ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) ->
    Backoff = emqx_zone:keepalive_backoff(Zone),
    CheckInterval = round(timer:seconds(Interval) * Backoff),
    Keepalive = emqx_keepalive:init(CheckInterval),
    ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Maybe Resume Session
|
|
|
|
%% When the channel is resuming a session, replay the session and then
%% deliver the pending messages. Returns `ignore' when not resuming,
%% otherwise `{ok, Publishes, Session}' with the replayed publishes
%% followed by any produced by the pending deliveries.
maybe_resume_session(#channel{resuming = false}) ->
    ignore;
maybe_resume_session(#channel{session  = Session,
                              resuming = true,
                              pendings = Pendings}) ->
    {ok, Replayed, Session1} = emqx_session:replay(Session),
    case emqx_session:deliver(Pendings, Session1) of
        {ok, More, Session2} ->
            {ok, Replayed ++ More, Session2};
        {ok, Session2} ->
            {ok, Replayed, Session2}
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Maybe Shutdown the Channel
|
|
|
|
%% Decide what happens to a channel after its socket has closed, based on
%% the session expiry interval:
%%   ?UINT_MAX -> keep the channel, no expiry timer
%%   > 0       -> keep the channel and start the expiry timer
%%   otherwise -> shut down
maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
    case maps:get(expiry_interval, ConnInfo) of
        ?UINT_MAX ->
            {ok, Channel};
        Secs when Secs > 0 ->
            Timeout = timer:seconds(Secs),
            {ok, ensure_timer(expire_timer, Timeout, Channel)};
        _ ->
            shutdown(Reason, Channel)
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Is ACL enabled?
|
|
|
|
%% ACL checks apply only to non-superusers, and only when the zone enables
%% them.
-compile({inline, [is_acl_enabled/1]}).
is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) ->
    IsRegularUser = not IsSuperuser,
    IsRegularUser andalso emqx_zone:enable_acl(Zone).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Parse Topic Filters
|
|
|
|
%% Parse every topic filter with emqx_topic:parse/1.
-compile({inline, [parse_topic_filters/1]}).
parse_topic_filters(TopicFilters) ->
    [emqx_topic:parse(TopicFilter) || TopicFilter <- TopicFilters].
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Ensure disconnected
|
|
|
|
%% Stamp the disconnection time (milliseconds), run the
%% 'client.disconnected' hooks and switch the channel to `disconnected'.
ensure_disconnected(Reason, Channel = #channel{conninfo   = ConnInfo,
                                               clientinfo = ClientInfo}) ->
    Now = erlang:system_time(millisecond),
    Stamped = ConnInfo#{disconnected_at => Now},
    ok = run_hooks('client.disconnected', [ClientInfo, Reason, Stamped]),
    Channel#channel{conninfo = Stamped, conn_state = disconnected}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Maybe Publish will msg
|
|
|
|
%% Handle the will message after the socket has closed: with a delay of 0
%% it is published right away and cleared; otherwise a will timer is
%% started and the message stays in the channel.
mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
    Channel;
mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
    case will_delay_interval(WillMsg) of
        0 ->
            publish_will_msg(WillMsg),
            Channel#channel{will_msg = undefined};
        DelaySecs ->
            ensure_timer(will_timer, timer:seconds(DelaySecs), Channel)
    end.
|
|
|
|
%% 'Will-Delay-Interval' of a will message, 0 when not set.
%% The `properties' header is only consulted when it is a map. Calling
%% maps:get/3 on anything else raises badmap, which is what the previous
%% code did for a will message without a properties map.
%% NOTE(review): presumably emqx_message:get_header/2 yields `undefined'
%% for a missing header -- confirm against emqx_message.
will_delay_interval(WillMsg) ->
    case emqx_message:get_header(properties, WillMsg) of
        Props when is_map(Props) ->
            maps:get('Will-Delay-Interval', Props, 0);
        _NoProps ->
            0
    end.
|
|
|
|
%% Publish the will message through the broker.
publish_will_msg(WillMsg) ->
    emqx_broker:publish(WillMsg).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Disconnect Reason
|
|
|
|
%% Map a DISCONNECT reason code to a reason name: success is `normal',
%% any other code is looked up in emqx_reason_codes.
disconnect_reason(ReasonCode) ->
    case ReasonCode of
        ?RC_SUCCESS -> normal;
        _           -> emqx_reason_codes:name(ReasonCode)
    end.
|
|
|
|
%% Map a shutdown reason to the DISCONNECT reason code sent to the client.
reason_code(Reason) ->
    case Reason of
        takeovered -> ?RC_SESSION_TAKEN_OVER;
        discarded  -> ?RC_SESSION_TAKEN_OVER;
        _          -> ?RC_NORMAL_DISCONNECTION
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Helper functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
-compile({inline, [run_hooks/2, run_hooks/3]}).
%% Count the hook point in the metrics, then run its hooks.
run_hooks(Name, Args) ->
    ok = emqx_metrics:inc(Name),
    emqx_hooks:run(Name, Args).
|
|
|
|
%% Count the hook point in the metrics, then fold its hooks over Acc.
run_hooks(Name, Args, Acc) ->
    ok = emqx_metrics:inc(Name),
    emqx_hooks:run_fold(Name, Args, Acc).
|
|
|
|
-compile({inline, [find_alias/3, save_alias/4]}).

%% Look up a topic alias.
%% inbound:  alias id -> topic;  outbound: topic -> alias id.
%% Returns `{ok, Value}' or `error' (as maps:find/2 does), and `false'
%% when there is no alias table at all -- callers must handle both misses.
find_alias(_Direction, _Key, undefined) ->
    false;
find_alias(inbound, AliasId, #{inbound := InboundAliases}) ->
    maps:find(AliasId, InboundAliases);
find_alias(outbound, Topic, #{outbound := OutboundAliases}) ->
    maps:find(Topic, OutboundAliases).
|
|
|
|
%% Store a topic alias and return the updated alias table.
%% inbound entries are keyed by alias id, outbound entries by topic.
%% Returns `false' when there is no alias table.
save_alias(_Direction, _AliasId, _Topic, undefined) ->
    false;
save_alias(inbound, AliasId, Topic, TopicAliases = #{inbound := InboundAliases}) ->
    TopicAliases#{inbound => InboundAliases#{AliasId => Topic}};
save_alias(outbound, AliasId, Topic, TopicAliases = #{outbound := OutboundAliases}) ->
    TopicAliases#{outbound => OutboundAliases#{Topic => AliasId}}.
|
|
|
|
-compile({inline, [reply/2, shutdown/2, shutdown/3, sp/1, flag/1]}).

%% Build the `{reply, Reply, Channel}' result of a call.
reply(Reply, Channel) ->
    {reply, Reply, Channel}.
|
|
|
|
%% Build a shutdown result; the reason `success' is reported as `normal'.
shutdown(Reason0, Channel) ->
    Reason = case Reason0 of
                 success -> normal;
                 _       -> Reason0
             end,
    {shutdown, Reason, Channel}.
|
|
|
|
%% Build a shutdown result that carries a reply; the reason `success' is
%% reported as `normal'.
shutdown(Reason0, Reply, Channel) ->
    Reason = case Reason0 of
                 success -> normal;
                 _       -> Reason0
             end,
    {shutdown, Reason, Reply, Channel}.
|
|
|
|
%% Build a shutdown result that carries a reply and a final packet; the
%% reason `success' is reported as `normal'.
shutdown(Reason0, Reply, Packet, Channel) ->
    Reason = case Reason0 of
                 success -> normal;
                 _       -> Reason0
             end,
    {shutdown, Reason, Reply, Packet, Channel}.
|
|
|
|
%% Shut the channel down. A connected MQTT v5 client additionally gets a
%% DISCONNECT packet whose reason code is derived from the shutdown reason.
disconnect_and_shutdown(Reason, Reply, Channel = ?IS_MQTT_V5
                                               = #channel{conn_state = connected}) ->
    ReasonCode = reason_code(Reason),
    shutdown(Reason, Reply, ?DISCONNECT_PACKET(ReasonCode), Channel);

%% Every other channel shuts down without a final packet.
disconnect_and_shutdown(Reason, Reply, Channel) ->
    shutdown(Reason, Reply, Channel).
|
|
|
|
%% Boolean to 1/0.
sp(Bool) ->
    case Bool of
        true  -> 1;
        false -> 0
    end.
|
|
|
|
%% Boolean to the 1/0 flag value used in MQTT properties and options.
flag(Bool) ->
    case Bool of
        true  -> 1;
        false -> 0
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% For CT tests
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Set a field of the channel record by name (for Common Test).
set_field(Name, Value, Channel) ->
    Fields = record_info(fields, channel),
    Index = emqx_misc:index_of(Name, Fields),
    %% Element 1 of the record tuple is the record tag, hence the + 1.
    setelement(Index + 1, Channel, Value).
|
|
|