emqx/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl

2286 lines
72 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mqttsn_channel).
-behaviour(emqx_gateway_channel).
-include("emqx_mqttsn.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/types.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_access_control.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% API
-export([
info/1,
info/2,
stats/1
]).
-export([
init/2,
handle_in/2,
handle_out/3,
handle_deliver/2,
handle_timeout/3,
terminate/2,
set_conn_state/2
]).
-export([
handle_call/3,
handle_cast/2,
handle_info/2
]).
-record(channel, {
%% Context
ctx :: emqx_gateway_ctx:context(),
%% Gateway Id
gateway_id :: integer(),
%% Enable negative_qos
enable_negative_qos :: boolean(),
%% MQTT-SN Connection Info
conninfo :: emqx_types:conninfo(),
%% MQTT-SN Client Info
clientinfo :: emqx_types:clientinfo(),
%% Session
session :: emqx_mqttsn_session:session() | undefined,
%% Keepalive
keepalive :: emqx_keepalive:keepalive() | undefined,
%% Will Msg
will_msg :: emqx_types:message() | undefined,
%% ClientInfo override specs
clientinfo_override :: map(),
%% Connection State
conn_state :: conn_state(),
%% Inflight register message queue
register_inflight :: option(term()),
%% Topics list for awaiting to register to client
register_awaiting_queue :: list(),
%% Duration for asleep
asleep_timer_duration :: integer() | undefined,
%% Timer
timers :: #{atom() => disable | undefined | reference()},
%%% Takeover
takeover :: boolean(),
%% Resume
resuming :: boolean(),
%% Pending delivers when takeovering
pendings :: list()
}).
-type channel() :: #channel{}.
-type conn_state() ::
idle
| connecting
| connected
| asleep
| awake
| disconnected.
-type reply() ::
{outgoing, mqtt_sn_message()}
| {outgoing, [mqtt_sn_message()]}
| {event, conn_state() | updated}
| {close, Reason :: atom()}.
-type replies() :: reply() | [reply()].
-define(DEFAULT_OVERRIDE, #{
clientid => <<"${ConnInfo.clientid}">>
%, username => <<"${ConnInfo.clientid}">>
%, password => <<"${Packet.headers.passcode}">>
}).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
-define(NEG_QOS_CLIENT_ID, <<"NegQoS-Client">>).
-define(REGISTER_INFLIGHT(TopicId, TopicName), #channel{register_inflight = {TopicId, _, TopicName}}).
-define(MAX_RETRY_TIMES, 3).
% 5s
-define(REGISTER_TIMEOUT, 5000).
%% 2h
-define(DEFAULT_SESSION_EXPIRY, 7200000).
%%--------------------------------------------------------------------
%% Init the channel
%%--------------------------------------------------------------------
%% @doc Init protocol
init(
ConnInfo = #{
peername := {PeerHost, _},
sockname := {_, SockPort}
},
Option
) ->
Peercert = maps:get(peercert, ConnInfo, undefined),
Mountpoint = maps:get(mountpoint, Option, undefined),
GwId = maps:get(gateway_id, Option),
EnableNegQoS = maps:get(enable_qos3, Option, true),
ListenerId =
case maps:get(listener, Option, undefined) of
undefined -> undefined;
{GwName, Type, LisName} -> emqx_gateway_utils:listener_id(GwName, Type, LisName)
end,
EnableAuthn = maps:get(enable_authn, Option, true),
ClientInfo = set_peercert_infos(
Peercert,
#{
zone => default,
listener => ListenerId,
protocol => 'mqtt-sn',
peerhost => PeerHost,
sockport => SockPort,
clientid => undefined,
username => undefined,
is_bridge => false,
is_superuser => false,
enable_authn => EnableAuthn,
mountpoint => Mountpoint
}
),
Ctx = maps:get(ctx, Option),
Override = maps:merge(
?DEFAULT_OVERRIDE,
maps:get(clientinfo_override, Option, #{})
),
#channel{
ctx = Ctx,
gateway_id = GwId,
enable_negative_qos = EnableNegQoS,
conninfo = ConnInfo,
clientinfo = ClientInfo,
clientinfo_override = Override,
conn_state = idle,
register_awaiting_queue = [],
timers = #{},
takeover = false,
resuming = false,
pendings = []
}.
set_peercert_infos(NoSSL, ClientInfo) when
NoSSL =:= nossl;
NoSSL =:= undefined
->
ClientInfo;
set_peercert_infos(Peercert, ClientInfo) ->
{DN, CN} = {esockd_peercert:subject(Peercert), esockd_peercert:common_name(Peercert)},
ClientInfo#{dn => DN, cn => CN}.
-spec info(channel()) -> emqx_types:infos().
info(Channel) ->
maps:from_list(info(?INFO_KEYS, Channel)).
-spec info(list(atom()) | atom(), channel()) -> term().
info(Keys, Channel) when is_list(Keys) ->
[{Key, info(Key, Channel)} || Key <- Keys];
info(conninfo, #channel{conninfo = ConnInfo}) ->
ConnInfo;
info(conn_state, #channel{conn_state = ConnState}) ->
ConnState;
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo;
info(session, #channel{session = Session}) ->
emqx_utils:maybe_apply(fun emqx_mqttsn_session:info/1, Session);
info(will_msg, #channel{will_msg = WillMsg}) ->
WillMsg;
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
ClientId;
info(ctx, #channel{ctx = Ctx}) ->
Ctx.
-spec stats(channel()) -> emqx_types:stats().
stats(#channel{session = undefined}) ->
[];
stats(#channel{session = Session}) ->
emqx_mqttsn_session:stats(Session).
set_conn_state(ConnState, Channel) ->
Channel#channel{conn_state = ConnState}.
enrich_conninfo(
?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId),
Channel = #channel{conninfo = ConnInfo}
) ->
CleanStart = Flags#mqtt_sn_flags.clean_start,
NConnInfo = ConnInfo#{
clientid => ClientId,
proto_name => <<"MQTT-SN">>,
proto_ver => <<"1.2">>,
clean_start => CleanStart,
keepalive => Duration,
expiry_interval => expiry_interval(Flags)
},
{ok, Channel#channel{conninfo = NConnInfo}}.
expiry_interval(#mqtt_sn_flags{clean_start = false}) ->
%% TODO: make it configurable
?DEFAULT_SESSION_EXPIRY;
expiry_interval(#mqtt_sn_flags{clean_start = true}) ->
0.
run_conn_hooks(
Packet,
Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo
}
) ->
%% XXX: Assign headers of Packet to ConnProps
ConnProps = #{},
case run_hooks(Ctx, 'client.connect', [ConnInfo], ConnProps) of
Error = {error, _Reason} -> Error;
_NConnProps -> {ok, Packet, Channel}
end.
enrich_clientinfo(
Packet,
Channel = #channel{
conninfo = ConnInfo,
clientinfo = ClientInfo0,
clientinfo_override = Override
}
) ->
ClientInfo = write_clientinfo(
feedvar(Override, Packet, ConnInfo, ClientInfo0),
ClientInfo0
),
{ok, NPacket, NClientInfo} = emqx_utils:pipeline(
[
fun maybe_assign_clientid/2,
%% FIXME: CALL After authentication successfully
fun fix_mountpoint/2
],
Packet,
ClientInfo
),
{ok, NPacket, Channel#channel{clientinfo = NClientInfo}}.
feedvar(Override, Packet, ConnInfo, ClientInfo) ->
Envs = #{
'ConnInfo' => ConnInfo,
'ClientInfo' => ClientInfo,
'Packet' => connect_packet_to_map(Packet)
},
maps:map(
fun(_K, V) ->
Tokens = emqx_placeholder:preproc_tmpl(V),
emqx_placeholder:proc_tmpl(Tokens, Envs)
end,
Override
).
connect_packet_to_map(#mqtt_sn_message{}) ->
%% XXX: Empty now
#{}.
write_clientinfo(Override, ClientInfo) ->
Override1 = maps:with([username, password, clientid], Override),
maps:merge(ClientInfo, Override1).
maybe_assign_clientid(_Packet, ClientInfo = #{clientid := ClientId}) when
ClientId == undefined;
ClientId == <<>>
->
{ok, ClientInfo#{clientid => emqx_guid:to_base62(emqx_guid:gen())}};
maybe_assign_clientid(_Packet, ClientInfo) ->
{ok, ClientInfo}.
fix_mountpoint(_Packet, #{mountpoint := undefined}) ->
ok;
fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) ->
%% TODO: Enrich the variable replacement????
%% i.e: ${ClientInfo.auth_result.productKey}
Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
{ok, ClientInfo#{mountpoint := Mountpoint1}}.
set_log_meta(_Packet, #channel{clientinfo = #{clientid := ClientId}}) ->
emqx_logger:set_metadata_clientid(ClientId),
ok.
maybe_require_will_msg(?SN_CONNECT_MSG(Flags, _, _, _), Channel) ->
#mqtt_sn_flags{will = Will} = Flags,
case Will of
true ->
{error, need_will_msg, Channel};
_ ->
ok
end.
auth_connect(
_Packet,
Channel = #channel{
ctx = Ctx,
clientinfo = ClientInfo
}
) ->
#{
clientid := ClientId,
username := Username
} = ClientInfo,
case emqx_gateway_ctx:authenticate(Ctx, ClientInfo) of
{ok, NClientInfo} ->
{ok, Channel#channel{clientinfo = NClientInfo}};
{error, Reason} ->
?SLOG(warning, #{
msg => "client_login_failed",
clientid => ClientId,
username => Username,
reason => Reason
}),
{error, name_to_returncode(Reason)}
end.
ensure_connected(
Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo
}
) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
schedule_connection_expire(Channel#channel{
conninfo = NConnInfo,
conn_state = connected
}).
schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
undefined ->
Channel;
Interval ->
ensure_timer(connection_expire, Interval, Channel)
end.
process_connect(
Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo = #{clean_start := CleanStart},
clientinfo = ClientInfo,
will_msg = MaybeWillMsg
}
) ->
SessFun = fun(ClientInfoT, _) -> emqx_mqttsn_session:init(ClientInfoT, MaybeWillMsg) end,
case
emqx_gateway_ctx:open_session(
Ctx,
CleanStart,
ClientInfo,
ConnInfo,
SessFun,
_SessMod = emqx_mqttsn_session
)
of
{ok, #{
session := Session,
present := false
}} ->
handle_out(
connack,
?SN_RC_ACCEPTED,
Channel#channel{session = Session}
);
{ok, #{session := Session, present := true, pendings := Pendings}} ->
Pendings1 = lists:usort(lists:append(Pendings, emqx_utils:drain_deliver())),
NChannel = Channel#channel{
session = Session,
resuming = true,
pendings = Pendings1
},
handle_out(connack, ?SN_RC_ACCEPTED, NChannel);
{error, Reason} ->
?SLOG(error, #{
msg => "failed_to_open_session",
reason => Reason
}),
handle_out(connack, ?SN_RC2_FAILED_SESSION, Channel)
end.
%%--------------------------------------------------------------------
%% Enrich Keepalive
ensure_keepalive(Channel = #channel{conninfo = ConnInfo}) ->
ensure_keepalive_timer(maps:get(keepalive, ConnInfo), Channel).
ensure_keepalive_timer(0, Channel) ->
Channel;
ensure_keepalive_timer(Interval, Channel) ->
Keepalive = emqx_keepalive:init(Interval),
ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}).
%%--------------------------------------------------------------------
%% Handle incoming packet
%%--------------------------------------------------------------------
-spec handle_in(emqx_types:packet() | {frame_error, any()}, channel()) ->
{ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}
| {shutdown, Reason :: term(), replies(), channel()}.
%% SEARCHGW, GWINFO
handle_in(
?SN_SEARCHGW_MSG(_Radius),
Channel = #channel{gateway_id = GwId}
) ->
{ok, {outgoing, ?SN_GWINFO_MSG(GwId, <<>>)}, Channel};
handle_in(?SN_ADVERTISE_MSG(_GwId, _Radius), Channel) ->
% ignore
shutdown(normal, Channel);
%% Ack DISCONNECT even if it is not connected
handle_in(
?SN_DISCONNECT_MSG(_Duration),
Channel = #channel{conn_state = idle}
) ->
handle_out(disconnect, normal, Channel);
handle_in(
Publish =
?SN_PUBLISH_MSG(
#mqtt_sn_flags{
qos = ?QOS_NEG1,
topic_id_type = TopicIdType
},
TopicId,
MsgId,
Data
),
Channel = #channel{conn_state = idle}
) ->
case check_negative_qos_enable(Publish, Channel) of
ok ->
TopicName =
case TopicIdType of
?SN_SHORT_TOPIC ->
TopicId;
?SN_PREDEFINED_TOPIC ->
Registry = emqx_mqttsn_registry:init(),
emqx_mqttsn_registry:lookup_topic(TopicId, Registry);
_ ->
undefined
end,
case TopicName =/= undefined of
true ->
Msg = emqx_message:make(
?NEG_QOS_CLIENT_ID,
?QOS_0,
TopicName,
Data
),
?SLOG(debug, #{
msg => "receive_qo3_message_in_idle_mode",
topic => TopicName,
data => Data
}),
_ = emqx_broker:publish(Msg),
ok;
false ->
ok
end,
shutdown(normal, Channel);
{error, Rc} ->
?tp(info, ignore_negative_qos, #{
topic_id => TopicId,
msg_id => MsgId,
return_code => Rc
}),
PubAck = ?SN_PUBACK_MSG(TopicId, MsgId, Rc),
shutdown(normal, PubAck, Channel)
end;
handle_in(
Pkt = #mqtt_sn_message{type = Type},
Channel = #channel{conn_state = idle}
) when
Type /= ?SN_CONNECT
->
?SLOG(warning, #{
msg => "receive_unknown_packet_in_idle_state",
packet => Pkt
}),
shutdown(normal, Channel);
handle_in(
?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId),
Channel = #channel{conn_state = connecting}
) ->
?SLOG(warning, #{msg => "receive_connect_packet_in_connecting_state"}),
{ok, Channel};
handle_in(
?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId),
Channel = #channel{conn_state = connected}
) ->
{error, unexpected_connect, Channel};
handle_in(
?SN_WILLTOPIC_EMPTY_MSG,
Channel = #channel{conn_state = connecting}
) ->
%% 6.3:
%% Note that if a client wants to delete only its Will data at
%% connection setup, it could send a CONNECT message with
%% 'CleanSession=false' and 'Will=true',
%% and sends an empty WILLTOPIC message to the GW when prompted to do so
case auth_connect(fake_packet, Channel#channel{will_msg = undefined}) of
{ok, NChannel} ->
process_connect(ensure_connected(NChannel));
{error, ReasonCode} ->
handle_out(connack, ReasonCode, Channel)
end;
handle_in(
?SN_WILLTOPIC_MSG(Flags, Topic),
Channel = #channel{
conn_state = connecting,
clientinfo = #{clientid := ClientId}
}
) ->
#mqtt_sn_flags{qos = QoS, retain = Retain} = Flags,
WillMsg0 = emqx_message:make(ClientId, QoS, Topic, <<>>),
WillMsg = emqx_message:set_flag(retain, Retain, WillMsg0),
NChannel = Channel#channel{will_msg = WillMsg},
{ok, {outgoing, ?SN_WILLMSGREQ_MSG()}, NChannel};
handle_in(
?SN_WILLMSG_MSG(Payload),
Channel = #channel{
conn_state = connecting,
will_msg = WillMsg
}
) ->
NWillMsg = WillMsg#message{payload = Payload},
case auth_connect(fake_packet, Channel#channel{will_msg = NWillMsg}) of
{ok, NChannel} ->
process_connect(ensure_connected(NChannel));
{error, ReasonCode} ->
handle_out(connack, ReasonCode, Channel)
end;
%% TODO: takeover ???
handle_in(
?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, ClientId),
Channel = #channel{
clientinfo = #{clientid := ClientId},
conn_state = ConnState
}
) when
ConnState == asleep;
ConnState == awake
->
%% From the asleep or awake state a client can return either to the
%% active state by sending a CONNECT message [6.14]
?SLOG(info, #{
msg => "goto_connected_state",
previous_state => ConnState,
clientid => ClientId
}),
handle_out(
connack,
?SN_RC_ACCEPTED,
Channel#channel{conn_state = connected}
);
%% new connection
handle_in(
Packet = ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId),
Channel = #channel{conn_state = idle}
) ->
case
emqx_utils:pipeline(
[
fun enrich_conninfo/2,
fun run_conn_hooks/2,
fun enrich_clientinfo/2,
fun set_log_meta/2,
%% TODO: How to implement the banned in the gateway instance?
%, fun check_banned/2
fun maybe_require_will_msg/2,
fun auth_connect/2
],
Packet,
Channel#channel{conn_state = connecting}
)
of
{ok, _NPacket, NChannel} ->
process_connect(ensure_connected(NChannel));
{error, need_will_msg, NChannel} ->
{ok, {outgoing, ?SN_WILLTOPICREQ_MSG()}, NChannel};
{error, ReasonCode, NChannel} ->
handle_out(connack, ReasonCode, NChannel)
end;
handle_in(
?SN_REGISTER_MSG(_TopicId, MsgId, TopicName),
Channel = #channel{session = Session}
) ->
Registry = emqx_mqttsn_session:registry(Session),
case emqx_mqttsn_registry:reg(TopicName, Registry) of
{ok, TopicId, NRegistry} ->
?SLOG(debug, #{
msg => "registered_topic_name",
topic_name => TopicName,
topic_id => TopicId
}),
AckPacket = ?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED),
NSession = emqx_mqttsn_session:set_registry(NRegistry, Session),
{ok, {outgoing, AckPacket}, Channel#channel{session = NSession}};
{error, too_large} ->
?SLOG(error, #{
msg => "register_topic_failed",
topic_name => TopicName,
reason => topic_id_fulled
}),
AckPacket = ?SN_REGACK_MSG(
?SN_INVALID_TOPIC_ID,
MsgId,
?SN_RC_NOT_SUPPORTED
),
{ok, {outgoing, AckPacket}, Channel};
{error, wildcard_topic} ->
?SLOG(error, #{
msg => "register_topic_failed",
topic_name => TopicName,
reason => not_support_wildcard_topic
}),
AckPacket = ?SN_REGACK_MSG(
?SN_INVALID_TOPIC_ID,
MsgId,
?SN_RC_NOT_SUPPORTED
),
{ok, {outgoing, AckPacket}, Channel}
end;
handle_in(
?SN_REGACK_MSG(TopicId, _MsgId, ?SN_RC_ACCEPTED),
Channel = ?REGISTER_INFLIGHT(TopicId, TopicName)
) ->
?SLOG(debug, #{
msg => "register_topic_name_to_client_succesfully",
topic_id => TopicId,
topic_name => TopicName
}),
NChannel = cancel_timer(
retry_register,
Channel#channel{register_inflight = undefined}
),
send_next_register_or_replay_publish(TopicName, NChannel);
handle_in(
?SN_REGACK_MSG(TopicId, _MsgId, Reason),
Channel = ?REGISTER_INFLIGHT(TopicId, TopicName)
) ->
case Reason of
?SN_RC_CONGESTION ->
%% TODO: a or b?
%% a. waiting for next register timer
%% b. re-new the re-transmit timer
{ok, Channel};
_ ->
%% skip this topic-name register, if the reason is
%% ?SN_RC_NOT_SUPPORTED, ?SN_RC_INVALID_TOPIC_ID, etc.
?SLOG(warning, #{
msg => "skipp_register_topic_name_to_client",
topic_id => TopicId,
topic_name => TopicName
}),
NChannel = cancel_timer(
retry_register,
Channel#channel{register_inflight = undefined}
),
send_next_register_or_replay_publish(TopicName, NChannel)
end;
handle_in(
?SN_REGACK_MSG(TopicId, MsgId, Reason),
Channel = #channel{register_inflight = Inflight}
) ->
?SLOG(error, #{
msg => "unexpected_regack_msg",
acked_msg_id => MsgId,
acked_topic_id => TopicId,
acked_reason => Reason,
current_inflight => Inflight
}),
{ok, Channel};
handle_in(PubPkt = ?SN_PUBLISH_MSG(_Flags, TopicId0, MsgId, _Data), Channel) ->
TopicId =
case is_integer(TopicId0) of
true ->
TopicId0;
_ ->
<<Id:16>> = TopicId0,
Id
end,
case
emqx_utils:pipeline(
[
fun check_negative_qos_enable/2,
fun preproc_pub_pkt/2,
fun convert_topic_id_to_name/2,
fun check_pub_authz/2,
fun convert_pub_to_msg/2
],
PubPkt,
Channel
)
of
{ok, Msg, NChannel} ->
do_publish(TopicId, MsgId, Msg, NChannel);
{error, ReturnCode, NChannel} ->
?tp(info, publish_msg_rejected, #{
topic_id => TopicId,
msg_id => MsgId,
return_code => ReturnCode
}),
handle_out(puback, {TopicId, MsgId, ReturnCode}, NChannel)
end;
handle_in(
?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode),
Channel = #channel{
ctx = Ctx,
session = Session,
clientinfo = ClientInfo
}
) ->
Registry = emqx_mqttsn_session:registry(Session),
case ReturnCode of
?SN_RC_ACCEPTED ->
case emqx_mqttsn_session:puback(ClientInfo, MsgId, Session) of
{ok, Msg, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Channel),
{Replies, NChannel} = goto_asleep_if_buffered_msgs_sent(
Channel#channel{session = NSession}
),
{ok, Replies, NChannel};
{ok, Msg, Publishes, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Channel),
handle_out(
publish,
Publishes,
Channel#channel{session = NSession}
);
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
?SLOG(warning, #{
msg => "commit_puback_failed",
msg_id => MsgId,
reason => msg_id_inused
}),
ok = metrics_inc(Ctx, 'packets.puback.inuse'),
{ok, Channel};
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?SLOG(warning, #{
msg => "commit_puback_failed",
msg_id => MsgId,
reason => not_found
}),
ok = metrics_inc(Ctx, 'packets.puback.missed'),
{ok, Channel}
end;
?SN_RC_INVALID_TOPIC_ID ->
case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of
undefined ->
{ok, Channel};
TopicName ->
%% notice that this TopicName maybe normal or predefined,
%% involving the predefined topic name in register to
%% enhance the gateway's robustness even inconsistent
%% with MQTT-SN channels
handle_out(register, {TopicId, TopicName}, Channel)
end;
_ ->
?SLOG(error, #{
msg => "cannt_handle_PUBACK",
return_code => ReturnCode
}),
{ok, Channel}
end;
handle_in(
?SN_PUBREC_MSG(?SN_PUBREC, MsgId),
Channel = #channel{
ctx = Ctx,
session = Session,
clientinfo = ClientInfo
}
) ->
case emqx_mqttsn_session:pubrec(ClientInfo, MsgId, Session) of
{ok, Msg, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Channel),
NChannel = Channel#channel{session = NSession},
handle_out(pubrel, MsgId, NChannel);
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
?SLOG(warning, #{
msg => "commit_PUBREC_failed",
msg_id => MsgId,
reason => msg_id_inused
}),
ok = metrics_inc(Ctx, 'packets.pubrec.inuse'),
handle_out(pubrel, MsgId, Channel);
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?SLOG(warning, #{
msg => "commit_PUBREC_failed",
msg_id => MsgId,
reason => not_found
}),
ok = metrics_inc(Ctx, 'packets.pubrec.missed'),
handle_out(pubrel, MsgId, Channel)
end;
handle_in(
?SN_PUBREC_MSG(?SN_PUBREL, MsgId),
Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}
) ->
case emqx_mqttsn_session:pubrel(ClientInfo, MsgId, Session) of
{ok, NSession} ->
NChannel = Channel#channel{session = NSession},
handle_out(pubcomp, MsgId, NChannel);
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?SLOG(warning, #{
msg => "commit_PUBREL_failed",
msg_id => MsgId,
reason => not_found
}),
ok = metrics_inc(Ctx, 'packets.pubrel.missed'),
handle_out(pubcomp, MsgId, Channel)
end;
handle_in(
?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId),
Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}
) ->
case emqx_mqttsn_session:pubcomp(ClientInfo, MsgId, Session) of
{ok, NSession} ->
{Replies, NChannel} = goto_asleep_if_buffered_msgs_sent(
Channel#channel{session = NSession}
),
{ok, Replies, NChannel};
{ok, Publishes, NSession} ->
handle_out(
publish,
Publishes,
Channel#channel{session = NSession}
);
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
?SLOG(warning, #{
msg => "commit_PUBCOMP_failed",
msg_id => MsgId,
reason => msg_id_inused
}),
ok = metrics_inc(Ctx, 'packets.pubcomp.inuse'),
{ok, Channel};
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?SLOG(warning, #{
msg => "commit_PUBCOMP_failed",
msg_id => MsgId,
reason => not_found
}),
ok = metrics_inc(Ctx, 'packets.pubcomp.missed'),
{ok, Channel}
end;
handle_in(SubPkt = ?SN_SUBSCRIBE_MSG(_, MsgId, _), Channel) ->
case
emqx_utils:pipeline(
[
fun preproc_subs_type/2,
fun check_subscribe_authz/2,
fun run_client_subs_hook/2,
fun do_subscribe/2
],
SubPkt,
Channel
)
of
{ok, {TopicId, _TopicName, SubOpts}, NChannel} ->
GrantedQoS = maps:get(qos, SubOpts),
SubAck = ?SN_SUBACK_MSG(
#mqtt_sn_flags{qos = GrantedQoS},
TopicId,
MsgId,
?SN_RC_ACCEPTED
),
{ok, outgoing_and_update(SubAck), NChannel};
{error, ReturnCode, NChannel} ->
SubAck = ?SN_SUBACK_MSG(
#mqtt_sn_flags{},
?SN_INVALID_TOPIC_ID,
MsgId,
ReturnCode
),
{ok, {outgoing, SubAck}, NChannel}
end;
handle_in(
UnsubPkt = ?SN_UNSUBSCRIBE_MSG(_, MsgId, TopicIdOrName),
Channel
) ->
case
emqx_utils:pipeline(
[
fun preproc_unsub_type/2,
fun run_client_unsub_hook/2,
fun do_unsubscribe/2
],
UnsubPkt,
Channel
)
of
{ok, _TopicName, NChannel} ->
UnsubAck = ?SN_UNSUBACK_MSG(MsgId),
{ok, outgoing_and_update(UnsubAck), NChannel};
{error, Reason, NChannel} ->
?SLOG(warning, #{
msg => "unsubscribe_failed",
topic => TopicIdOrName,
reason => Reason
}),
%% XXX: Even if it fails, the reply is successful.
UnsubAck = ?SN_UNSUBACK_MSG(MsgId),
{ok, {outgoing, UnsubAck}, NChannel}
end;
handle_in(?SN_PINGREQ_MSG(ClientId), Channel) when
ClientId == undefined;
ClientId == <<>>
->
{ok, {outgoing, ?SN_PINGRESP_MSG()}, Channel};
handle_in(
?SN_PINGREQ_MSG(ReqClientId),
Channel = #channel{clientinfo = #{clientid := ClientId}}
) when
ReqClientId =/= ClientId
->
?SLOG(warning, #{
msg => "awake_pingreq_clientid_not_match",
clientid => ClientId,
request_clientid => ReqClientId
}),
%% FIXME: takeover_and_awake..
{ok, Channel};
handle_in(
?SN_PINGREQ_MSG(ClientId),
Channel = #channel{conn_state = ConnState}
) when
ConnState == idle; ConnState == asleep; ConnState == awake
->
awake(ClientId, Channel);
handle_in(
?SN_PINGREQ_MSG(ClientId),
Channel = #channel{
conn_state = connected,
clientinfo = #{clientid := ClientId}
}
) ->
{ok, {outgoing, ?SN_PINGRESP_MSG()}, Channel};
handle_in(?SN_DISCONNECT_MSG(_Duration = undefined), Channel) ->
handle_out(disconnect, normal, Channel);
handle_in(
?SN_DISCONNECT_MSG(Duration),
Channel = #channel{conn_state = ConnState}
) when
ConnState == connected; ConnState == asleep
->
%% A DISCONNECT message with a Duration field is sent by a client
%% when it wants to go to the “asleep” state. The receipt of this
%% message is also acknowledged by the gateway by means of a
%% DISCONNECT message (without a duration field) [5.4.21]
%%
AckPkt = ?SN_DISCONNECT_MSG(undefined),
{ok, [{outgoing, AckPkt}, {event, asleep}], asleep(Duration, Channel)};
handle_in(
?SN_WILLTOPICUPD_MSG(Flags, Topic),
Channel = #channel{
will_msg = WillMsg,
clientinfo = #{clientid := ClientId}
}
) ->
NWillMsg =
case Topic of
undefined -> undefined;
_ -> update_will_topic(WillMsg, Flags, Topic, ClientId)
end,
AckPkt = ?SN_WILLTOPICRESP_MSG(?SN_RC_ACCEPTED),
{ok, {outgoing, AckPkt}, Channel#channel{will_msg = NWillMsg}};
handle_in(
?SN_WILLMSGUPD_MSG(Payload),
Channel = #channel{will_msg = WillMsg}
) ->
AckPkt = ?SN_WILLMSGRESP_MSG(?SN_RC_ACCEPTED),
NWillMsg = update_will_msg(WillMsg, Payload),
{ok, {outgoing, AckPkt}, Channel#channel{will_msg = NWillMsg}};
handle_in(
{frame_error, Reason},
Channel = #channel{conn_state = _ConnState}
) ->
?SLOG(error, #{
msg => "unexpected_frame_error",
reason => Reason
}),
shutdown(Reason, Channel).
after_message_acked(ClientInfo, Msg, #channel{ctx = Ctx}) ->
ok = metrics_inc(Ctx, 'messages.acked'),
run_hooks_without_metrics(
Ctx,
'message.acked',
[ClientInfo, emqx_message:set_header(puback_props, #{}, Msg)]
).
outgoing_and_update(Pkt) ->
[{outgoing, Pkt}, {event, update}].
send_next_register_or_replay_publish(
_TopicName,
Channel = #channel{register_awaiting_queue = []}
) ->
{Outgoing, NChannel} = resume_or_replay_messages(Channel),
{ok, Outgoing, NChannel};
send_next_register_or_replay_publish(
_TopicName,
Channel = #channel{register_awaiting_queue = RAQueue}
) ->
[RegisterReq | NRAQueue] = RAQueue,
handle_out(
register,
RegisterReq,
Channel#channel{register_awaiting_queue = NRAQueue}
).
%%--------------------------------------------------------------------
%% Handle Publish
check_negative_qos_enable(
?SN_PUBLISH_MSG(Flags, _TopicId, _MsgId, _Data),
#channel{enable_negative_qos = EnableNegQoS}
) ->
#mqtt_sn_flags{qos = QoS} = Flags,
case EnableNegQoS =:= false andalso QoS =:= ?QOS_NEG1 of
true ->
{error, ?SN_RC_NOT_SUPPORTED};
false ->
ok
end.
preproc_pub_pkt(
?SN_PUBLISH_MSG(Flags, Topic0, _MsgId, Data),
Channel
) ->
#mqtt_sn_flags{topic_id_type = TopicIdType} = Flags,
case TopicIdType of
?SN_NORMAL_TOPIC ->
<<TopicId:16>> = Topic0,
TopicIndicator = {id, TopicId},
{ok, {TopicIndicator, Flags, Data}, Channel};
?SN_PREDEFINED_TOPIC ->
TopicIndicator = {id, Topic0},
{ok, {TopicIndicator, Flags, Data}, Channel};
?SN_SHORT_TOPIC ->
case emqx_topic:wildcard(Topic0) of
true ->
{error, ?SN_RC_NOT_SUPPORTED};
false ->
TopicIndicator = {name, Topic0},
{ok, {TopicIndicator, Flags, Data}, Channel}
end
end.
convert_topic_id_to_name({{name, TopicName}, Flags, Data}, Channel) ->
{ok, {TopicName, Flags, Data}, Channel};
convert_topic_id_to_name(
{{id, TopicId}, Flags, Data},
Channel = #channel{session = Session}
) ->
Registry = emqx_mqttsn_session:registry(Session),
case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of
undefined ->
{error, ?SN_RC_INVALID_TOPIC_ID};
TopicName ->
{ok, {TopicName, Flags, Data}, Channel}
end.
check_pub_authz(
{TopicName, #mqtt_sn_flags{qos = QoS, retain = Retain}, _Data},
#channel{ctx = Ctx, clientinfo = ClientInfo}
) ->
Action = ?AUTHZ_PUBLISH(QoS, Retain),
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, Action, TopicName) of
allow -> ok;
deny -> {error, ?SN_RC2_NOT_AUTHORIZE}
end.
convert_pub_to_msg(
{TopicName, Flags, Data},
Channel = #channel{clientinfo = #{clientid := ClientId, mountpoint := Mountpoint}}
) ->
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
NewQoS = get_corrected_qos(QoS),
NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName),
Message = put_message_headers(
emqx_message:make(
ClientId,
NewQoS,
NTopicName,
Data,
#{dup => Dup, retain => Retain},
#{}
),
Channel
),
{ok, Message, Channel}.
put_message_headers(Msg, #channel{
conninfo = #{proto_ver := ProtoVer},
clientinfo = #{
protocol := Protocol,
username := Username,
peerhost := PeerHost
}
}) ->
emqx_message:set_headers(
#{
proto_ver => ProtoVer,
protocol => Protocol,
username => Username,
peerhost => PeerHost
},
Msg
).
get_corrected_qos(?QOS_NEG1) -> ?QOS_0;
get_corrected_qos(QoS) -> QoS.
do_publish(_TopicId, _MsgId, Msg = #message{qos = ?QOS_0}, Channel) ->
_ = emqx_broker:publish(Msg),
{ok, Channel};
do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_1}, Channel) ->
_ = emqx_broker:publish(Msg),
handle_out(puback, {TopicId, MsgId, ?SN_RC_ACCEPTED}, Channel);
do_publish(
TopicId,
MsgId,
Msg = #message{qos = ?QOS_2},
Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}
) ->
case emqx_mqttsn_session:publish(ClientInfo, MsgId, Msg, Session) of
{ok, _PubRes, NSession} ->
NChannel1 = ensure_timer(
expire_awaiting_rel,
Channel#channel{session = NSession}
),
handle_out(pubrec, MsgId, NChannel1);
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
ok = metrics_inc(Ctx, 'packets.publish.inuse'),
%% XXX: Use PUBACK to reply a PUBLISH Error Code
handle_out(
puback,
{TopicId, MsgId, ?SN_RC_NOT_SUPPORTED},
Channel
);
{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
ok = metrics_inc(Ctx, 'packets.publish.dropped'),
handle_out(puback, {TopicId, MsgId, ?SN_RC_CONGESTION}, Channel)
end.
%%--------------------------------------------------------------------
%% Handle Susbscribe
preproc_subs_type(
?SN_SUBSCRIBE_MSG_TYPE(
?SN_NORMAL_TOPIC,
TopicName,
QoS
),
Channel = #channel{session = Session}
) ->
Registry = emqx_mqttsn_session:registry(Session),
%% If the gateway is able accept the subscription,
%% it assigns a topic id to the received topic name
%% and returns it within a SUBACK message
case emqx_mqttsn_registry:reg(TopicName, Registry) of
{error, too_large} ->
{error, ?SN_RC2_EXCEED_LIMITATION};
{error, wildcard_topic} ->
%% If the client subscribes to a topic name which contains a
%% wildcard character, the returning SUBACK message will contain
%% the topic id value 0x0000. The GW will the use the registration
%% procedure to inform the client about the to-be-used topic id
%% value when it has the first PUBLISH message with a matching
%% topic name to be sent to the client, see also Section 6.10.
{ok, {?SN_INVALID_TOPIC_ID, TopicName, QoS}, Channel};
{ok, TopicId, NRegistry} ->
NSession = emqx_mqttsn_session:set_registry(NRegistry, Session),
{ok, {TopicId, TopicName, QoS}, Channel#channel{session = NSession}}
end;
preproc_subs_type(
?SN_SUBSCRIBE_MSG_TYPE(
?SN_PREDEFINED_TOPIC,
TopicId,
QoS
),
Channel = #channel{session = Session}
) ->
Registry = emqx_mqttsn_session:registry(Session),
case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of
undefined ->
{error, ?SN_RC_INVALID_TOPIC_ID};
TopicName ->
{ok, {TopicId, TopicName, QoS}, Channel}
end;
preproc_subs_type(
?SN_SUBSCRIBE_MSG_TYPE(
?SN_SHORT_TOPIC,
TopicId,
QoS
),
Channel
) ->
TopicName =
case is_binary(TopicId) of
true -> TopicId;
false -> <<TopicId:16>>
end,
%% XXX: ?SN_INVALID_TOPIC_ID ???
{ok, {?SN_INVALID_TOPIC_ID, TopicName, QoS}, Channel};
preproc_subs_type(
?SN_SUBSCRIBE_MSG_TYPE(_Reserved, _TopicId, _QoS),
_Channel
) ->
{error, ?SN_RC_NOT_SUPPORTED}.
check_subscribe_authz(
{_TopicId, TopicName, QoS},
Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}
) ->
Action = ?AUTHZ_SUBSCRIBE(QoS),
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, Action, TopicName) of
allow ->
{ok, Channel};
_ ->
{error, ?SN_RC2_NOT_AUTHORIZE}
end.
run_client_subs_hook(
{TopicId, TopicName, QoS},
Channel = #channel{
ctx = Ctx,
clientinfo = ClientInfo
}
) ->
{TopicName1, SubOpts0} = emqx_topic:parse(TopicName),
TopicFilters = [{TopicName1, SubOpts0#{qos => QoS}}],
case
run_hooks(
Ctx,
'client.subscribe',
[ClientInfo, #{}],
TopicFilters
)
of
[] ->
?SLOG(warning, #{
msg => "skip_to_subscribe",
topic_name => TopicName,
reason => "'client.subscribe' filtered it"
}),
{error, ?SN_RC2_EXCEED_LIMITATION};
[{NTopicName, NSubOpts} | _] ->
{ok, {TopicId, NTopicName, NSubOpts}, Channel}
end.
do_subscribe(
{TopicId, TopicName, SubOpts},
Channel = #channel{
session = Session,
clientinfo =
ClientInfo =
#{mountpoint := Mountpoint}
}
) ->
NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName),
NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts),
case emqx_mqttsn_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of
{ok, NSession} ->
{ok, {TopicId, NTopicName, NSubOpts}, Channel#channel{session = NSession}};
{error, ?RC_QUOTA_EXCEEDED} ->
?SLOG(warning, #{
msg => "cannt_subscribe_due_to_quota_exceeded",
topic_name => TopicName,
reason => emqx_reason_codes:text(?RC_QUOTA_EXCEEDED)
}),
{error, ?SN_RC2_EXCEED_LIMITATION}
end.
%%--------------------------------------------------------------------
%% Handle Unsubscribe
preproc_unsub_type(
?SN_UNSUBSCRIBE_MSG_TYPE(
?SN_NORMAL_TOPIC,
TopicName
),
Channel
) ->
{ok, TopicName, Channel};
preproc_unsub_type(
?SN_UNSUBSCRIBE_MSG_TYPE(
?SN_PREDEFINED_TOPIC,
TopicId
),
Channel = #channel{session = Session}
) ->
Registry = emqx_mqttsn_session:registry(Session),
case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of
undefined ->
{error, not_found};
TopicName ->
{ok, TopicName, Channel}
end;
preproc_unsub_type(
?SN_UNSUBSCRIBE_MSG_TYPE(
?SN_SHORT_TOPIC,
TopicId
),
Channel
) ->
TopicName =
case is_binary(TopicId) of
true -> TopicId;
false -> <<TopicId:16>>
end,
{ok, TopicName, Channel}.
run_client_unsub_hook(
TopicName,
Channel = #channel{
ctx = Ctx,
clientinfo = ClientInfo
}
) ->
TopicFilters = [emqx_topic:parse(TopicName)],
case
run_hooks(
Ctx,
'client.unsubscribe',
[ClientInfo, #{}],
TopicFilters
)
of
[] ->
{ok, [], Channel};
NTopicFilters ->
{ok, NTopicFilters, Channel}
end.
do_unsubscribe(
TopicFilters,
Channel = #channel{
session = Session,
clientinfo =
ClientInfo =
#{mountpoint := Mountpoint}
}
) ->
NChannel =
lists:foldl(
fun({TopicName, SubOpts}, ChannAcc) ->
NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName),
NSubOpts = maps:merge(
emqx_gateway_utils:default_subopts(),
SubOpts
),
case
emqx_mqttsn_session:unsubscribe(
ClientInfo,
NTopicName,
NSubOpts,
Session
)
of
{ok, NSession} ->
ChannAcc#channel{session = NSession};
{error, ?RC_NO_SUBSCRIPTION_EXISTED} ->
ChannAcc
end
end,
Channel,
TopicFilters
),
{ok, TopicFilters, NChannel}.
%%--------------------------------------------------------------------
%% Awake & Asleep
awake(ClientId, Channel = #channel{conn_state = idle}) ->
?SLOG(warning, #{
msg => "awake_pingreq_in_idle_state",
clientid => ClientId
}),
%% TODO: takeover and awake?
%% 1. Query emqx_cm_registry to get the session state?
%% 2. Takeover it and goto awake state
{ok, {outgoing, ?SN_PINGRESP_MSG()}, Channel};
awake(
ClientId,
Channel = #channel{
conn_state = ConnState,
session = Session,
clientinfo = ClientInfo = #{clientid := ClientId}
}
) when
ConnState == asleep; ConnState == awake
->
?SLOG(info, #{
msg => "goto_awake_state",
clientid => ClientId,
previous_state => ConnState
}),
{ok, Publishes, NSession} = emqx_mqttsn_session:replay(ClientInfo, Session),
Channel1 = cancel_timer(expire_asleep, Channel),
{Replies0, NChannel0} = outgoing_deliver_and_register(
do_deliver(
Publishes,
Channel1#channel{
conn_state = awake, session = NSession
}
)
),
Replies1 = [{event, awake} | Replies0],
{Replies2, NChannel} = goto_asleep_if_buffered_msgs_sent(NChannel0),
{ok, Replies1 ++ Replies2, NChannel}.
goto_asleep_if_buffered_msgs_sent(
Channel = #channel{
conn_state = awake,
session = Session,
asleep_timer_duration = Duration
}
) ->
case
emqx_mqueue:is_empty(emqx_mqttsn_session:info(mqueue, Session)) andalso
emqx_inflight:is_empty(emqx_mqttsn_session:info(inflight, Session))
of
true ->
?SLOG(info, #{
msg => "goto_asleep_state",
reason => buffered_messages_sent,
duration => Duration
}),
Replies = [
{outgoing, ?SN_PINGRESP_MSG()},
{event, asleep}
],
{Replies, ensure_asleep_timer(Channel#channel{conn_state = asleep})};
false ->
{[], Channel}
end;
goto_asleep_if_buffered_msgs_sent(Channel) ->
{[], Channel}.
asleep(Duration, Channel = #channel{conn_state = asleep}) ->
%% 6.14: The client can also modify its sleep duration
%% by sending a DISCONNECT message with a new value of
%% the sleep duration
%%
%% XXX: Do we need to limit the maximum of Duration?
?SLOG(debug, #{
msg => "update_asleep_timer",
new_duration => Duration
}),
ensure_asleep_timer(Duration, cancel_timer(expire_asleep, Channel));
asleep(Duration, Channel = #channel{conn_state = connected}) ->
?SLOG(info, #{
msg => "goto_asleep_state",
duration => Duration
}),
ensure_asleep_timer(Duration, Channel#channel{conn_state = asleep}).
%%--------------------------------------------------------------------
%% Handle outgoing packet
%%--------------------------------------------------------------------
-spec handle_out(atom(), term(), channel()) ->
{ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}
| {shutdown, Reason :: term(), replies(), channel()}.
handle_out(
connack,
?SN_RC_ACCEPTED,
Channel = #channel{ctx = Ctx, conninfo = ConnInfo}
) ->
_ = run_hooks(
Ctx,
'client.connack',
[ConnInfo, returncode_name(?SN_RC_ACCEPTED)],
#{}
),
return_connack(
?SN_CONNACK_MSG(?SN_RC_ACCEPTED),
ensure_keepalive(Channel)
);
handle_out(
connack,
ReasonCode,
Channel = #channel{ctx = Ctx, conninfo = ConnInfo}
) ->
Reason = returncode_name(ReasonCode),
_ = run_hooks(Ctx, 'client.connack', [ConnInfo, Reason], #{}),
AckPacket = ?SN_CONNACK_MSG(ReasonCode),
shutdown(Reason, AckPacket, Channel);
handle_out(publish, Publishes, Channel) ->
{Replies1, NChannel} = outgoing_deliver_and_register(
do_deliver(Publishes, Channel)
),
{Replies2, NChannel2} = goto_asleep_if_buffered_msgs_sent(NChannel),
{ok, Replies1 ++ Replies2, NChannel2};
handle_out(puback, {TopicId, MsgId, Rc}, Channel) ->
{ok, {outgoing, ?SN_PUBACK_MSG(TopicId, MsgId, Rc)}, Channel};
handle_out(pubrec, MsgId, Channel) ->
{ok, {outgoing, ?SN_PUBREC_MSG(?SN_PUBREC, MsgId)}, Channel};
handle_out(pubrel, MsgId, Channel) ->
{ok, {outgoing, ?SN_PUBREC_MSG(?SN_PUBREL, MsgId)}, Channel};
handle_out(pubcomp, MsgId, Channel) ->
{ok, {outgoing, ?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId)}, Channel};
handle_out(
register,
{TopicId, TopicName},
Channel = #channel{
session = Session,
register_inflight = undefined
}
) ->
{MsgId, NSession} = emqx_mqttsn_session:obtain_next_pkt_id(Session),
Outgoing = {outgoing, ?SN_REGISTER_MSG(TopicId, MsgId, TopicName)},
NChannel = Channel#channel{
session = NSession,
register_inflight = {TopicId, MsgId, TopicName}
},
{ok, Outgoing, ensure_register_timer(NChannel)};
handle_out(
register,
{TopicId, TopicName},
Channel = #channel{
register_inflight = Inflight,
register_awaiting_queue = RAQueue
}
) ->
case enqueue_register_request({TopicId, TopicName}, Inflight, RAQueue) of
ignore ->
?SLOG(debug, #{
msg => "ingore_register_request_to_client",
register_request =>
#{
topic_id => TopicId,
topic_name => TopicName
}
}),
{ok, Channel};
NRAQueue ->
?SLOG(debug, #{
msg => "put_register_msg_into_awaiting_queue",
register_request =>
#{
topic_id => TopicId,
topic_name => TopicName
},
register_awaiting_queue_size => length(NRAQueue)
}),
{ok, Channel#channel{register_awaiting_queue = NRAQueue}}
end;
handle_out(disconnect, RC, Channel) ->
DisPkt = ?SN_DISCONNECT_MSG(undefined),
Reason =
case is_atom(RC) of
true -> RC;
false -> returncode_name(RC)
end,
{ok, [{outgoing, DisPkt}, {close, Reason}], Channel}.
enqueue_register_request({_, TopicName}, {_, _, TopicName}, _RAQueue) ->
ignore;
enqueue_register_request({TopicId, TopicName}, _, RAQueue) ->
HasQueued = lists:any(fun({_, T}) -> T == TopicName end, RAQueue),
case HasQueued of
true -> ignore;
false -> RAQueue ++ [{TopicId, TopicName}]
end.
%%--------------------------------------------------------------------
%% Return ConnAck
%%--------------------------------------------------------------------
return_connack(AckPacket, Channel) ->
Replies1 = [{event, connected}, {outgoing, AckPacket}],
{Replies2, NChannel} = maybe_resume_session(Channel),
{ok, Replies1 ++ Replies2, NChannel}.
%%--------------------------------------------------------------------
%% Maybe Resume Session
maybe_resume_session(Channel = #channel{resuming = false}) ->
{[], Channel};
maybe_resume_session(
Channel = #channel{
session = Session,
resuming = true
}
) ->
Subs = emqx_mqttsn_session:info(subscriptions, Session),
case subs_resume() andalso map_size(Subs) =/= 0 of
true ->
TopicNames = lists:filter(fun(T) -> not emqx_topic:wildcard(T) end, maps:keys(Subs)),
Registers = lists:map(fun(T) -> {register, T} end, TopicNames),
{Registers, Channel};
false ->
resume_or_replay_messages(Channel)
end.
resume_or_replay_messages(
Channel = #channel{
resuming = Resuming,
pendings = Pendings,
session = Session,
clientinfo = ClientInfo
}
) ->
{NPendings, NChannel} =
case Resuming of
true ->
{Pendings, Channel#channel{resuming = false, pendings = []}};
false ->
{[], Channel}
end,
{ok, Publishes, Session1} = emqx_mqttsn_session:replay(ClientInfo, Session),
{NPublishes, NSession} =
case emqx_mqttsn_session:deliver(ClientInfo, NPendings, Session1) of
{ok, Session2} ->
{Publishes, Session2};
{ok, More, Session2} ->
{lists:append(Publishes, More), Session2}
end,
outgoing_deliver_and_register(
do_deliver(NPublishes, NChannel#channel{session = NSession})
).
subs_resume() ->
emqx:get_config([gateway, mqttsn, subs_resume]).
%%--------------------------------------------------------------------
%% Deliver publish: broker -> client
%%--------------------------------------------------------------------
do_deliver({pubrel, MsgId}, Channel) ->
{[?SN_PUBREC_MSG(?SN_PUBREL, MsgId)], Channel};
do_deliver(
{MsgId, Msg},
Channel = #channel{
ctx = Ctx,
clientinfo =
ClientInfo =
#{mountpoint := Mountpoint}
}
) ->
metrics_inc(Ctx, 'messages.delivered'),
Msg1 = run_hooks_without_metrics(
Ctx,
'message.delivered',
[ClientInfo],
emqx_message:update_expiry(Msg)
),
Msg2 = emqx_mountpoint:unmount(Mountpoint, Msg1),
Packet = message_to_packet(MsgId, Msg2, Channel),
{[Packet], Channel};
do_deliver([Publish], Channel) ->
do_deliver(Publish, Channel);
do_deliver(Publishes, Channel) when is_list(Publishes) ->
{Packets, NChannel} =
lists:foldl(
fun(Publish, {Acc, Chann}) ->
{Packets, NChann} = do_deliver(Publish, Chann),
{Packets ++ Acc, NChann}
end,
{[], Channel},
Publishes
),
{lists:reverse(Packets), NChannel}.
outgoing_deliver_and_register({Packets, Channel}) ->
{NPackets, NRegisters} =
lists:foldl(
fun(P, {Acc0, Acc1}) ->
case P of
{register, _} ->
{Acc0, [P | Acc1]};
_ ->
{[P | Acc0], Acc1}
end
end,
{[], []},
Packets
),
{[{outgoing, lists:reverse(NPackets)}] ++ lists:reverse(NRegisters), Channel}.
message_to_packet(
MsgId,
Message,
#channel{session = Session}
) ->
QoS = emqx_message:qos(Message),
Topic = emqx_message:topic(Message),
Payload = emqx_message:payload(Message),
NMsgId =
case QoS of
?QOS_0 -> 0;
_ -> MsgId
end,
Registry = emqx_mqttsn_session:registry(Session),
case emqx_mqttsn_registry:lookup_topic_id(Topic, Registry) of
{predef, PredefTopicId} ->
Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_PREDEFINED_TOPIC},
?SN_PUBLISH_MSG(Flags, PredefTopicId, NMsgId, Payload);
TopicId when is_integer(TopicId) ->
Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_NORMAL_TOPIC},
?SN_PUBLISH_MSG(Flags, TopicId, NMsgId, Payload);
undefined when byte_size(Topic) =:= 2 ->
Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_SHORT_TOPIC},
?SN_PUBLISH_MSG(Flags, Topic, NMsgId, Payload);
undefined ->
{register, Topic}
end.
%%--------------------------------------------------------------------
%% Handle call
%%--------------------------------------------------------------------
-spec handle_call(Req :: term(), From :: term(), channel()) ->
{reply, Reply :: term(), channel()}
| {reply, Reply :: term(), replies(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}.
handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
case do_subscribe({?SN_INVALID_TOPIC_ID, Topic, SubOpts}, Channel) of
{ok, {_, NTopicName, NSubOpts}, NChannel} ->
reply_and_update({ok, {NTopicName, NSubOpts}}, NChannel);
{error, ?SN_RC2_EXCEED_LIMITATION} ->
reply({error, exceed_limitation}, Channel)
end;
handle_call({unsubscribe, Topic}, _From, Channel) ->
TopicFilters = [emqx_topic:parse(Topic)],
{ok, _, NChannel} = do_unsubscribe(TopicFilters, Channel),
reply_and_update(ok, NChannel);
handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
reply({ok, maps:to_list(emqx_mqttsn_session:info(subscriptions, Session))}, Channel);
handle_call(kick, _From, Channel) ->
NChannel = ensure_disconnected(kicked, Channel),
shutdown_and_reply(kicked, ok, NChannel);
handle_call(discard, _From, Channel) ->
shutdown_and_reply(discarded, ok, Channel);
handle_call({takeover, 'begin'}, _From, Channel = #channel{session = Session}) ->
%% In MQTT-SN the meaning of a “clean session” is extended to the Will
%% feature, i.e. not only the subscriptions are persistent, but also the
%% Will topic and the Will message. [6.3]
%%
%% FIXME: We need to reply WillMsg and Session
reply(Session, Channel#channel{takeover = true});
handle_call(
{takeover, 'end'},
_From,
Channel = #channel{
session = Session,
pendings = Pendings
}
) ->
ok = emqx_mqttsn_session:takeover(Session),
%% TODO: Should not drain deliver here (side effect)
Delivers = emqx_utils:drain_deliver(),
AllPendings = lists:append(Delivers, Pendings),
shutdown_and_reply(takenover, AllPendings, Channel);
%handle_call(list_authz_cache, _From, Channel) ->
% {reply, emqx_authz_cache:list_authz_cache(), Channel};
%% XXX: No Quota Now
% handle_call({quota, Policy}, _From, Channel) ->
% Zone = info(zone, Channel),
% Quota = emqx_limiter:init(Zone, Policy),
% reply(ok, Channel#channel{quota = Quota});
handle_call(Req, _From, Channel) ->
?SLOG(error, #{
msg => "unexpected_call",
call => Req
}),
reply(ignored, Channel).
%%--------------------------------------------------------------------
%% Handle Cast
%%--------------------------------------------------------------------
-spec handle_cast(Req :: term(), channel()) ->
ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
handle_cast(_Req, Channel) ->
{ok, Channel}.
%%--------------------------------------------------------------------
%% Handle Info
%%--------------------------------------------------------------------
-spec handle_info(Info :: term(), channel()) ->
ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
handle_info(
{sock_closed, Reason},
Channel = #channel{conn_state = idle}
) ->
shutdown(Reason, Channel);
handle_info(
{sock_closed, Reason},
Channel = #channel{conn_state = connecting}
) ->
shutdown(Reason, Channel);
handle_info(
{sock_closed, Reason},
Channel = #channel{
conn_state = connected,
clientinfo = _ClientInfo
}
) ->
%% XXX: Flapping detect ???
%% How to get the flapping detect policy ???
%emqx_zone:enable_flapping_detect(Zone)
% andalso emqx_flapping:detect(ClientInfo),
NChannel = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)),
case maybe_shutdown(Reason, NChannel) of
{ok, NChannel1} -> {ok, {event, disconnected}, NChannel1};
Shutdown -> Shutdown
end;
handle_info(
{sock_closed, Reason},
Channel = #channel{conn_state = disconnected}
) ->
?SLOG(error, #{
msg => "unexpected_sock_closed",
reason => Reason
}),
{ok, Channel};
handle_info(clean_authz_cache, Channel) ->
ok = emqx_authz_cache:empty_authz_cache(),
{ok, Channel};
handle_info({subscribe, _}, Channel) ->
{ok, Channel};
handle_info({register, TopicName}, Channel = #channel{session = Session}) ->
Registry = emqx_mqttsn_session:registry(Session),
case emqx_mqttsn_registry:reg(TopicName, Registry) of
{error, Reason} ->
?SLOG(error, #{
msg => "register_topic_failed",
topic_name => TopicName,
reason => Reason
}),
{ok, Channel};
{ok, TopicId, NRegistry} ->
NSession = emqx_mqttsn_session:set_registry(NRegistry, Session),
handle_out(register, {TopicId, TopicName}, Channel#channel{session = NSession})
end;
handle_info(Info, Channel) ->
?SLOG(error, #{
msg => "unexpected_info",
info => Info
}),
{ok, Channel}.
maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
case maps:get(expiry_interval, ConnInfo) of
?UINT_MAX ->
{ok, Channel};
I when I > 0 ->
{ok, ensure_timer(expire_session, I, Channel)};
_ ->
shutdown(Reason, Channel)
end.
%%--------------------------------------------------------------------
%% Ensure disconnected
ensure_disconnected(
Reason,
Channel = #channel{
ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo
}
) ->
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)},
ok = run_hooks(
Ctx,
'client.disconnected',
[ClientInfo, Reason, NConnInfo]
),
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
Channel;
mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
ok = publish_will_msg(put_message_headers(WillMsg, Channel)),
Channel#channel{will_msg = undefined}.
publish_will_msg(Msg) ->
_ = emqx_broker:publish(Msg),
ok.
%%--------------------------------------------------------------------
%% Handle Delivers from broker to client
%%--------------------------------------------------------------------
-spec handle_deliver(list(emqx_types:deliver()), channel()) ->
{ok, channel()}
| {ok, replies(), channel()}.
handle_deliver(
Delivers,
Channel = #channel{
ctx = Ctx,
conn_state = ConnState,
session = Session,
clientinfo = ClientInfo = #{clientid := ClientId}
}
) when
ConnState =:= disconnected;
ConnState =:= asleep
->
NSession = emqx_mqttsn_session:enqueue(
ClientInfo,
ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx),
Session
),
{ok, Channel#channel{session = NSession}};
%% There are two scenarios need to cache delivering messages:
%% 1. it is being takeover by other channel
%% 2. it is being resume registered topic-names
handle_deliver(
Delivers,
Channel = #channel{
ctx = Ctx,
takeover = Takeover,
pendings = Pendings,
session = Session,
resuming = Resuming,
clientinfo = #{clientid := ClientId}
}
) when
Takeover == true; Resuming == true
->
NPendings = lists:append(
Pendings,
ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx)
),
{ok, Channel#channel{pendings = NPendings}};
handle_deliver(
Delivers,
Channel = #channel{
ctx = Ctx,
session = Session,
clientinfo = ClientInfo = #{clientid := ClientId}
}
) ->
case
emqx_mqttsn_session:deliver(
ClientInfo,
ignore_local(Delivers, ClientId, Session, Ctx),
Session
)
of
{ok, Publishes, NSession} ->
NChannel = Channel#channel{session = NSession},
handle_out(
publish,
Publishes,
ensure_timer(retry_delivery, NChannel)
);
{ok, NSession} ->
{ok, Channel#channel{session = NSession}}
end.
ignore_local(Delivers, Subscriber, Session, Ctx) ->
Subs = emqx_mqttsn_session:info(subscriptions, Session),
lists:filter(
fun({deliver, Topic, #message{from = Publisher}}) ->
case maps:find(Topic, Subs) of
{ok, #{nl := 1}} when Subscriber =:= Publisher ->
ok = metrics_inc(Ctx, 'delivery.dropped'),
ok = metrics_inc(Ctx, 'delivery.dropped.no_local'),
false;
_ ->
true
end
end,
Delivers
).
%% Nack delivers from shared subscription
maybe_nack(Delivers) ->
lists:filter(fun not_nacked/1, Delivers).
not_nacked({deliver, _Topic, Msg}) ->
not (emqx_shared_sub:is_ack_required(Msg) andalso
(ok == emqx_shared_sub:nack_no_connection(Msg))).
%%--------------------------------------------------------------------
%% Handle timeout
%%--------------------------------------------------------------------
-spec handle_timeout(reference(), Msg :: term(), channel()) ->
{ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}.
handle_timeout(
_TRef,
{keepalive, _StatVal},
Channel = #channel{keepalive = undefined}
) ->
{ok, Channel};
handle_timeout(
_TRef,
{keepalive, _StatVal},
Channel = #channel{conn_state = ConnState}
) when
ConnState =:= disconnected;
ConnState =:= asleep
->
{ok, Channel};
handle_timeout(
_TRef,
{keepalive, StatVal},
Channel = #channel{keepalive = Keepalive}
) ->
case emqx_keepalive:check(StatVal, Keepalive) of
{ok, NKeepalive} ->
NChannel = Channel#channel{keepalive = NKeepalive},
{ok, reset_timer(keepalive, NChannel)};
{error, timeout} ->
handle_out(disconnect, ?SN_RC2_KEEPALIVE_TIMEOUT, Channel)
end;
handle_timeout(
_TRef,
retry_delivery,
Channel = #channel{conn_state = disconnected}
) ->
{ok, Channel};
handle_timeout(
_TRef,
retry_delivery = TimerName,
Channel = #channel{conn_state = asleep}
) ->
{ok, reset_timer(TimerName, Channel)};
handle_timeout(
_TRef,
expire_awaiting_rel,
Channel = #channel{conn_state = disconnected}
) ->
{ok, Channel};
handle_timeout(
_TRef,
expire_awaiting_rel = TimerName,
Channel = #channel{conn_state = asleep}
) ->
{ok, reset_timer(TimerName, Channel)};
handle_timeout(
_TRef,
TimerName,
Channel = #channel{session = Session, clientinfo = ClientInfo}
) when TimerName == retry_delivery; TimerName == expire_awaiting_rel ->
case emqx_mqttsn_session:handle_timeout(ClientInfo, TimerName, Session) of
{ok, Publishes, NSession} ->
NChannel = Channel#channel{session = NSession},
handle_out(publish, Publishes, clean_timer(TimerName, NChannel));
{ok, Publishes, Timeout, NSession} ->
NChannel = Channel#channel{session = NSession},
%% XXX: These replay messages should awaiting register acked?
handle_out(publish, Publishes, reset_timer(TimerName, Timeout, NChannel))
end;
handle_timeout(
_TRef,
{retry_register, RetryTimes},
Channel = #channel{register_inflight = {TopicId, MsgId, TopicName}}
) ->
case RetryTimes < ?MAX_RETRY_TIMES of
true ->
Outgoing = {outgoing, ?SN_REGISTER_MSG(TopicId, MsgId, TopicName)},
{ok, Outgoing, ensure_register_timer(RetryTimes + 1, Channel)};
false ->
?SLOG(error, #{
msg => "register_topic_reached_max_retry_times",
register_request =>
#{
topic_id => TopicId,
msg_id => MsgId,
topic_name => TopicName
}
}),
handle_out(disconnect, ?SN_RC2_REACHED_MAX_RETRY, Channel)
end;
handle_timeout(_TRef, expire_session, Channel) ->
shutdown(expired, Channel);
handle_timeout(_TRef, expire_asleep, Channel) ->
shutdown(asleep_timeout, Channel);
handle_timeout(_TRef, connection_expire, Channel) ->
NChannel = clean_timer(connection_expire, Channel),
handle_out(disconnect, expired, NChannel);
handle_timeout(_TRef, Msg, Channel) ->
%% NOTE
%% We do not expect `emqx_mqttsn_session` to set up any custom timers (i.e with
%% `emqx_session:ensure_timer/3`), because `emqx_session_mem` doesn't use any.
?SLOG(error, #{
msg => "unexpected_timeout",
timeout_msg => Msg
}),
{ok, Channel}.
%%--------------------------------------------------------------------
%% Terminate
%%--------------------------------------------------------------------
terminate(_Reason, _Channel) ->
ok.
reply(Reply, Channel) ->
{reply, Reply, Channel}.
reply_and_update(Reply, Channel) ->
{reply, Reply, [{event, updated}], Channel}.
shutdown(Reason, Channel) ->
{shutdown, Reason, Channel}.
shutdown(Reason, AckFrame, Channel) ->
{shutdown, Reason, AckFrame, Channel}.
shutdown_and_reply(Reason, Reply, Channel) ->
{shutdown, Reason, Reply, Channel}.
%%--------------------------------------------------------------------
%% Will
update_will_topic(
undefined,
#mqtt_sn_flags{qos = QoS, retain = Retain},
Topic,
ClientId
) ->
WillMsg0 = emqx_message:make(ClientId, QoS, Topic, <<>>),
emqx_message:set_flag(retain, Retain, WillMsg0);
update_will_topic(
Will,
#mqtt_sn_flags{qos = QoS, retain = Retain},
Topic,
_ClientId
) ->
emqx_message:set_flag(
retain,
Retain,
Will#message{qos = QoS, topic = Topic}
).
update_will_msg(Will, Payload) ->
Will#message{payload = Payload}.
%%--------------------------------------------------------------------
%% Timer
ensure_asleep_timer(Channel = #channel{asleep_timer_duration = Duration}) when
is_integer(Duration)
->
ensure_asleep_timer(Duration, Channel).
ensure_asleep_timer(Durtion, Channel) ->
ensure_timer(
expire_asleep,
timer:seconds(Durtion),
Channel#channel{asleep_timer_duration = Durtion}
).
ensure_register_timer(Channel) ->
ensure_register_timer(0, Channel).
ensure_register_timer(RetryTimes, Channel = #channel{timers = Timers}) ->
TRef = emqx_utils:start_timer(?REGISTER_TIMEOUT, {retry_register, RetryTimes}),
Channel#channel{timers = Timers#{retry_register => TRef}}.
cancel_timer(Name, Channel = #channel{timers = Timers}) ->
case maps:get(Name, Timers, undefined) of
undefined ->
Channel;
TRef ->
emqx_utils:cancel_timer(TRef),
Channel#channel{timers = maps:without([Name], Timers)}
end.
ensure_timer(Name, Channel = #channel{timers = Timers}) ->
TRef = maps:get(Name, Timers, undefined),
Time = interval(Name, Channel),
case TRef == undefined andalso is_integer(Time) andalso Time > 0 of
true -> ensure_timer(Name, Time, Channel);
%% Timer disabled or exists
false -> Channel
end.
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
TRef = emqx_utils:start_timer(Time, Name),
Channel#channel{timers = Timers#{Name => TRef}}.
reset_timer(Name, Channel) ->
ensure_timer(Name, clean_timer(Name, Channel)).
reset_timer(Name, Time, Channel) ->
ensure_timer(Name, Time, clean_timer(Name, Channel)).
clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
interval(keepalive, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(check_interval, KeepAlive);
interval(retry_delivery, #channel{session = Session}) ->
emqx_mqttsn_session:info(retry_interval, Session);
interval(expire_awaiting_rel, #channel{session = Session}) ->
emqx_mqttsn_session:info(await_rel_timeout, Session).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
run_hooks(Ctx, Name, Args) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name),
emqx_hooks:run(Name, Args).
run_hooks(Ctx, Name, Args, Acc) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name),
emqx_hooks:run_fold(Name, Args, Acc).
run_hooks_without_metrics(_Ctx, Name, Args) ->
emqx_hooks:run(Name, Args).
run_hooks_without_metrics(_Ctx, Name, Args, Acc) ->
emqx_hooks:run_fold(Name, Args, Acc).
metrics_inc(Ctx, Name) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name).
returncode_name(?SN_RC_ACCEPTED) -> accepted;
returncode_name(?SN_RC_CONGESTION) -> rejected_congestion;
returncode_name(?SN_RC_INVALID_TOPIC_ID) -> rejected_invaild_topic_id;
returncode_name(?SN_RC_NOT_SUPPORTED) -> rejected_not_supported;
returncode_name(?SN_RC2_NOT_AUTHORIZE) -> rejected_not_authorize;
returncode_name(?SN_RC2_FAILED_SESSION) -> rejected_failed_open_session;
returncode_name(?SN_RC2_KEEPALIVE_TIMEOUT) -> rejected_keepalive_timeout;
returncode_name(?SN_RC2_EXCEED_LIMITATION) -> rejected_exceed_limitation;
returncode_name(?SN_RC2_REACHED_MAX_RETRY) -> reached_max_retry_times;
returncode_name(_) -> accepted.
name_to_returncode(not_authorized) -> ?SN_RC2_NOT_AUTHORIZE.