Merge pull request #7239 from HJianBo/mqttsn-alseep

Support the client asleep mechanism for MQTT-SN gateway
This commit is contained in:
JianBo He 2022-03-10 09:35:45 +08:00 committed by GitHub
commit 0b6b2295a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 852 additions and 736 deletions

View File

@ -753,6 +753,7 @@ serialize_and_inc_stats_fun(#state{
send(IoData, State = #state{socket = Socket, send(IoData, State = #state{socket = Socket,
chann_mod = ChannMod, chann_mod = ChannMod,
channel = Channel}) -> channel = Channel}) ->
?SLOG(debug, #{msg => "SEND_data", data => IoData}),
Ctx = ChannMod:info(ctx, Channel), Ctx = ChannMod:info(ctx, Channel),
Oct = iolist_size(IoData), Oct = iolist_size(IoData),
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.sent', Oct), ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.sent', Oct),

View File

@ -103,6 +103,8 @@
-define(T_TAKEOVER, 15000). -define(T_TAKEOVER, 15000).
-define(DEFAULT_BATCH_SIZE, 10000). -define(DEFAULT_BATCH_SIZE, 10000).
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -171,8 +173,7 @@ get_chan_info(GwName, ClientId) ->
get_chan_info(GwName, ClientId, ChanPid) get_chan_info(GwName, ClientId, ChanPid)
end). end).
-spec do_lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> -spec do_lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> [pid()].
[pid()].
do_lookup_by_clientid(GwName, ClientId) -> do_lookup_by_clientid(GwName, ClientId) ->
ChanTab = emqx_gateway_cm:tabname(chan, GwName), ChanTab = emqx_gateway_cm:tabname(chan, GwName),
[Pid || {_, Pid} <- ets:lookup(ChanTab, ClientId)]. [Pid || {_, Pid} <- ets:lookup(ChanTab, ClientId)].
@ -191,13 +192,15 @@ do_get_chan_info(GwName, ClientId, ChanPid) ->
-spec get_chan_info(gateway_name(), emqx_types:clientid(), pid()) -spec get_chan_info(gateway_name(), emqx_types:clientid(), pid())
-> emqx_types:infos() | undefined. -> emqx_types:infos() | undefined.
get_chan_info(GwName, ClientId, ChanPid) -> get_chan_info(GwName, ClientId, ChanPid) ->
wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)). wrap_rpc(
emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)
).
-spec lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> -spec lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> [pid()].
[pid()].
lookup_by_clientid(GwName, ClientId) -> lookup_by_clientid(GwName, ClientId) ->
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
case emqx_gateway_cm_proto_v1:lookup_by_clientid(Nodes, GwName, ClientId) of case emqx_gateway_cm_proto_v1:lookup_by_clientid(
Nodes, GwName, ClientId) of
{Pids, []} -> {Pids, []} ->
lists:append(Pids); lists:append(Pids);
{_, _BadNodes} -> {_, _BadNodes} ->
@ -390,7 +393,7 @@ takeover_session(GwName, ClientId) ->
[ChanPid] -> [ChanPid] ->
do_takeover_session(GwName, ClientId, ChanPid); do_takeover_session(GwName, ClientId, ChanPid);
ChanPids -> ChanPids ->
[ChanPid|StalePids] = lists:reverse(ChanPids), [ChanPid | StalePids] = lists:reverse(ChanPids),
?SLOG(warning, #{ msg => "more_than_one_channel_found" ?SLOG(warning, #{ msg => "more_than_one_channel_found"
, chan_pids => ChanPids , chan_pids => ChanPids
}), }),
@ -565,41 +568,54 @@ do_get_chann_conn_mod(GwName, ClientId, ChanPid) ->
get_chann_conn_mod(GwName, ClientId, ChanPid) -> get_chann_conn_mod(GwName, ClientId, ChanPid) ->
wrap_rpc(emqx_gateway_cm_proto_v1:get_chann_conn_mod(GwName, ClientId, ChanPid)). wrap_rpc(emqx_gateway_cm_proto_v1:get_chann_conn_mod(GwName, ClientId, ChanPid)).
-spec call(gateway_name(), emqx_types:clientid(), term()) -> term(). -spec call(gateway_name(), emqx_types:clientid(), term())
-> undefined | term().
call(GwName, ClientId, Req) -> call(GwName, ClientId, Req) ->
with_channel(GwName, ClientId, fun(ChanPid) -> with_channel(
wrap_rpc(emqx_gateway_cm_proto_v1:call(GwName, ClientId, ChanPid, Req)) GwName, ClientId,
fun(ChanPid) ->
wrap_rpc(
emqx_gateway_cm_proto_v1:call(GwName, ClientId, ChanPid, Req)
)
end). end).
-spec call(gateway_name(), emqx_types:clientid(), term(), timeout()) -> term(). -spec call(gateway_name(), emqx_types:clientid(), term(), timeout())
-> undefined | term().
call(GwName, ClientId, Req, Timeout) -> call(GwName, ClientId, Req, Timeout) ->
with_channel(GwName, ClientId, fun(ChanPid) -> with_channel(
GwName, ClientId,
fun(ChanPid) ->
wrap_rpc( wrap_rpc(
emqx_gateway_cm_proto_v1:call( emqx_gateway_cm_proto_v1:call(
GwName, ClientId, ChanPid, Req, Timeout)) GwName, ClientId, ChanPid, Req, Timeout)
)
end). end).
do_call(GwName, ClientId, ChanPid, Req) -> do_call(GwName, ClientId, ChanPid, Req) ->
case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of
undefined -> error(noproc); undefined -> throw(noproc);
ConnMod -> ConnMod:call(ChanPid, Req) ConnMod -> ConnMod:call(ChanPid, Req)
end. end.
do_call(GwName, ClientId, ChanPid, Req, Timeout) -> do_call(GwName, ClientId, ChanPid, Req, Timeout) ->
case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of
undefined -> error(noproc); undefined -> throw(noproc);
ConnMod -> ConnMod:call(ChanPid, Req, Timeout) ConnMod -> ConnMod:call(ChanPid, Req, Timeout)
end. end.
-spec cast(gateway_name(), emqx_types:clientid(), term()) -> term(). -spec cast(gateway_name(), emqx_types:clientid(), term()) -> ok.
cast(GwName, ClientId, Req) -> cast(GwName, ClientId, Req) ->
with_channel(GwName, ClientId, fun(ChanPid) -> with_channel(
wrap_rpc(emqx_gateway_cm_proto_v1:cast(GwName, ClientId, ChanPid, Req)) GwName, ClientId,
end). fun(ChanPid) ->
wrap_rpc(
emqx_gateway_cm_proto_v1:cast(GwName, ClientId, ChanPid, Req))
end),
ok.
do_cast(GwName, ClientId, ChanPid, Req) -> do_cast(GwName, ClientId, ChanPid, Req) ->
case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of case do_get_chann_conn_mod(GwName, ClientId, ChanPid) of
undefined -> error(noproc); undefined -> throw(noproc);
ConnMod -> ConnMod:cast(ChanPid, Req) ConnMod -> ConnMod:cast(ChanPid, Req)
end. end.
@ -625,7 +641,7 @@ locker_unlock(Locker, ClientId) ->
%% @private %% @private
wrap_rpc(Ret) -> wrap_rpc(Ret) ->
case Ret of case Ret of
{badrpc, Reason} -> error(Reason); {badrpc, Reason} -> throw({badrpc, Reason});
Res -> Res Res -> Res
end. end.
@ -642,7 +658,7 @@ init(Options) ->
TabOpts = [public, {write_concurrency, true}], TabOpts = [public, {write_concurrency, true}],
{ChanTab, ConnTab, InfoTab} = cmtabs(GwName), {ChanTab, ConnTab, InfoTab} = cmtabs(GwName),
ok = emqx_tables:new(ChanTab, [bag, {read_concurrency, true}|TabOpts]), ok = emqx_tables:new(ChanTab, [bag, {read_concurrency, true} | TabOpts]),
ok = emqx_tables:new(ConnTab, [bag | TabOpts]), ok = emqx_tables:new(ConnTab, [bag | TabOpts]),
ok = emqx_tables:new(InfoTab, [set, compressed | TabOpts]), ok = emqx_tables:new(InfoTab, [set, compressed | TabOpts]),

View File

@ -74,8 +74,10 @@
, listeners => [] , listeners => []
}. }.
-elvis([{elvis_style, god_modules, disable}]). -elvis([ {elvis_style, god_modules, disable}
-elvis([{elvis_style, no_nested_try_catch, disable}]). , {elvis_style, no_nested_try_catch, disable}
, {elvis_style, invalid_dynamic_call, disable}
]).
-define(DEFAULT_CALL_TIMEOUT, 15000). -define(DEFAULT_CALL_TIMEOUT, 15000).
@ -255,48 +257,39 @@ kickout_client(GwName, ClientId) ->
-> {error, any()} -> {error, any()}
| {ok, list()}. | {ok, list()}.
list_client_subscriptions(GwName, ClientId) -> list_client_subscriptions(GwName, ClientId) ->
with_channel(GwName, ClientId, case client_call(GwName, ClientId, subscriptions) of
fun(Pid) -> {error, Reason} -> {error, Reason};
case emqx_gateway_conn:call(
Pid,
subscriptions, ?DEFAULT_CALL_TIMEOUT) of
{ok, Subs} -> {ok, Subs} ->
{ok, lists:map(fun({Topic, SubOpts}) -> {ok, lists:map(fun({Topic, SubOpts}) ->
SubOpts#{topic => Topic} SubOpts#{topic => Topic}
end, Subs)}; end, Subs)}
{error, Reason} -> end.
{error, Reason}
end
end).
-spec client_subscribe(gateway_name(), emqx_types:clientid(), -spec client_subscribe(gateway_name(), emqx_types:clientid(),
emqx_types:topic(), emqx_types:subopts()) emqx_types:topic(), emqx_types:subopts())
-> {error, any()} -> {error, any()}
| {ok, {emqx_types:topic(), emqx_types:subopts()}}. | {ok, {emqx_types:topic(), emqx_types:subopts()}}.
client_subscribe(GwName, ClientId, Topic, SubOpts) -> client_subscribe(GwName, ClientId, Topic, SubOpts) ->
with_channel(GwName, ClientId, client_call(GwName, ClientId, {subscribe, Topic, SubOpts}).
fun(Pid) ->
emqx_gateway_conn:call(
Pid, {subscribe, Topic, SubOpts},
?DEFAULT_CALL_TIMEOUT
)
end).
-spec client_unsubscribe(gateway_name(), -spec client_unsubscribe(gateway_name(),
emqx_types:clientid(), emqx_types:topic()) emqx_types:clientid(), emqx_types:topic())
-> {error, any()} -> {error, any()}
| ok. | ok.
client_unsubscribe(GwName, ClientId, Topic) -> client_unsubscribe(GwName, ClientId, Topic) ->
with_channel(GwName, ClientId, client_call(GwName, ClientId, {unsubscribe, Topic}).
fun(Pid) ->
emqx_gateway_conn:call(
Pid, {unsubscribe, Topic}, ?DEFAULT_CALL_TIMEOUT)
end).
with_channel(GwName, ClientId, Fun) -> client_call(GwName, ClientId, Req) ->
case emqx_gateway_cm:with_channel(GwName, ClientId, Fun) of try emqx_gateway_cm:call(
undefined -> {error, not_found}; GwName, ClientId,
Req, ?DEFAULT_CALL_TIMEOUT) of
undefined ->
{error, not_found};
Res -> Res Res -> Res
catch throw : noproc ->
{error, not_found};
throw : {badrpc, Reason} ->
{error, {badrpc, Reason}}
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -127,10 +127,14 @@ start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
services => #{ services => #{
'emqx.exproto.v1.ConnectionAdapter' => emqx_exproto_gsvr} 'emqx.exproto.v1.ConnectionAdapter' => emqx_exproto_gsvr}
}, },
SvrOptions = case maps:to_list(maps:get(ssl, Options, #{})) of SvrOptions = case emqx_map_lib:deep_get([ssl, enable], Options, false) of
[] -> []; false -> [];
SslOpts -> true ->
[{ssl_options, SslOpts}] [{ssl_options,
maps:to_list(
maps:without([enable], maps:get(ssl, Options, #{}))
)
}]
end, end,
case grpc:start_server(GwName, ListenOn, Services, SvrOptions) of case grpc:start_server(GwName, ListenOn, Services, SvrOptions) of
{ok, _SvrPid} -> {ok, _SvrPid} ->

View File

@ -71,6 +71,8 @@
register_inflight :: maybe(term()), register_inflight :: maybe(term()),
%% Topics list for awaiting to register to client %% Topics list for awaiting to register to client
register_awaiting_queue :: list(), register_awaiting_queue :: list(),
%% Duration for asleep
asleep_timer_duration :: integer() | undefined,
%% Timer %% Timer
timers :: #{atom() => disable | undefined | reference()}, timers :: #{atom() => disable | undefined | reference()},
%%% Takeover %%% Takeover
@ -81,16 +83,17 @@
pendings :: list() pendings :: list()
}). }).
-type(channel() :: #channel{}). -type channel() :: #channel{}.
-type(conn_state() :: idle | connecting | connected | asleep | disconnected). -type conn_state() :: idle | connecting | connected | asleep | awake
| disconnected.
-type(reply() :: {outgoing, mqtt_sn_message()} -type reply() :: {outgoing, mqtt_sn_message()}
| {outgoing, [mqtt_sn_message()]} | {outgoing, [mqtt_sn_message()]}
| {event, conn_state()|updated} | {event, conn_state()|updated}
| {close, Reason :: atom()}). | {close, Reason :: atom()}.
-type(replies() :: reply() | [reply()]). -type replies() :: reply() | [reply()].
-define(TIMER_TABLE, #{ -define(TIMER_TABLE, #{
alive_timer => keepalive, alive_timer => keepalive,
@ -471,8 +474,25 @@ handle_in(?SN_WILLMSG_MSG(Payload),
handle_out(connack, ReasonCode, Channel) handle_out(connack, ReasonCode, Channel)
end; end;
%% TODO: takeover ???
handle_in(?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, ClientId),
Channel = #channel{
clientinfo = #{clientid := ClientId},
conn_state = ConnState})
when ConnState == asleep;
ConnState == awake ->
%% From the asleep or awake state a client can return either to the
%% active state by sending a CONNECT message [6.14]
?SLOG(info, #{ msg => "goto_connected_state"
, previous_state => ConnState
, clientid => ClientId
}),
handle_out(connack, ?SN_RC_ACCEPTED,
Channel#channel{conn_state = connected});
%% new connection
handle_in(Packet = ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId), handle_in(Packet = ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId),
Channel) -> Channel = #channel{conn_state = idle}) ->
case emqx_misc:pipeline( case emqx_misc:pipeline(
[ fun enrich_conninfo/2 [ fun enrich_conninfo/2
, fun run_conn_hooks/2 , fun run_conn_hooks/2
@ -589,7 +609,10 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode),
case emqx_session:puback(ClientInfo, MsgId, Session) of case emqx_session:puback(ClientInfo, MsgId, Session) of
{ok, Msg, NSession} -> {ok, Msg, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Channel), ok = after_message_acked(ClientInfo, Msg, Channel),
{ok, Channel#channel{session = NSession}}; {Replies, NChannel} = goto_asleep_if_buffered_msgs_sent(
Channel#channel{session = NSession}
),
{ok, Replies, NChannel};
{ok, Msg, Publishes, NSession} -> {ok, Msg, Publishes, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Channel), ok = after_message_acked(ClientInfo, Msg, Channel),
handle_out(publish, handle_out(publish,
@ -672,7 +695,10 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId),
Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) -> Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}) ->
case emqx_session:pubcomp(ClientInfo, MsgId, Session) of case emqx_session:pubcomp(ClientInfo, MsgId, Session) of
{ok, NSession} -> {ok, NSession} ->
{ok, Channel#channel{session = NSession}}; {Replies, NChannel} = goto_asleep_if_buffered_msgs_sent(
Channel#channel{session = NSession}
),
{ok, Replies, NChannel};
{ok, Publishes, NSession} -> {ok, Publishes, NSession} ->
handle_out(publish, Publishes, handle_out(publish, Publishes,
Channel#channel{session = NSession}); Channel#channel{session = NSession});
@ -732,32 +758,47 @@ handle_in(UnsubPkt = ?SN_UNSUBSCRIBE_MSG(_, MsgId, TopicIdOrName),
{ok, {outgoing, UnsubAck}, NChannel} {ok, {outgoing, UnsubAck}, NChannel}
end; end;
handle_in(?SN_PINGREQ_MSG(_ClientId), handle_in(?SN_PINGREQ_MSG(ClientId), Channel)
Channel = #channel{conn_state = asleep}) -> when ClientId == undefined;
{ok, Outgoing, NChannel} = awake(Channel), ClientId == <<>> ->
NOutgoings = Outgoing ++ [{outgoing, ?SN_PINGRESP_MSG()}],
{ok, NOutgoings, NChannel};
handle_in(?SN_PINGREQ_MSG(_ClientId), Channel) ->
{ok, {outgoing, ?SN_PINGRESP_MSG()}, Channel}; {ok, {outgoing, ?SN_PINGRESP_MSG()}, Channel};
handle_in(?SN_PINGRESP_MSG(), Channel) -> handle_in(?SN_PINGREQ_MSG(ReqClientId),
Channel = #channel{clientinfo = #{clientid := ClientId}})
when ReqClientId =/= ClientId ->
?SLOG(warning, #{ msg => "awake_pingreq_clientid_not_match"
, clientid => ClientId
, request_clientid => ReqClientId
}),
%% FIXME: takeover_and_awake..
{ok, Channel}; {ok, Channel};
handle_in(?SN_DISCONNECT_MSG(Duration), Channel) -> handle_in(?SN_PINGREQ_MSG(ClientId),
case Duration of Channel = #channel{conn_state = ConnState})
undefined -> when ConnState == idle; ConnState == asleep; ConnState == awake ->
awake(ClientId, Channel);
handle_in(?SN_PINGREQ_MSG(ClientId),
Channel = #channel{conn_state = ConnState}) ->
?SLOG(error, #{ msg => "awake_pingreq_in_bad_conn_state"
, conn_state => ConnState
, clientid => ClientId
}),
handle_out(disconnect, protocol_error, Channel);
handle_in(?SN_DISCONNECT_MSG(_Duration = undefined), Channel) ->
handle_out(disconnect, normal, Channel); handle_out(disconnect, normal, Channel);
_ ->
handle_in(?SN_DISCONNECT_MSG(Duration),
Channel = #channel{conn_state = ConnState})
when ConnState == connected; ConnState == asleep ->
%% A DISCONNECT message with a Duration field is sent by a client %% A DISCONNECT message with a Duration field is sent by a client
%% when it wants to go to the asleep state. The receipt of this %% when it wants to go to the asleep state. The receipt of this
%% message is also acknowledged by the gateway by means of a %% message is also acknowledged by the gateway by means of a
%% DISCONNECT message (without a duration field) [5.4.21] %% DISCONNECT message (without a duration field) [5.4.21]
%% %%
%% TODO: asleep mechanism
AckPkt = ?SN_DISCONNECT_MSG(undefined), AckPkt = ?SN_DISCONNECT_MSG(undefined),
{ok, {outgoing, AckPkt}, asleep(Duration, Channel)} {ok, [{outgoing, AckPkt}, {event, asleep}], asleep(Duration, Channel)};
end;
handle_in(?SN_WILLTOPICUPD_MSG(Flags, Topic), handle_in(?SN_WILLTOPICUPD_MSG(Flags, Topic),
Channel = #channel{will_msg = WillMsg, Channel = #channel{will_msg = WillMsg,
@ -1100,7 +1141,24 @@ do_unsubscribe(TopicFilters,
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Awake & Asleep %% Awake & Asleep
awake(Channel = #channel{session = Session, clientinfo = ClientInfo}) -> awake(ClientId, Channel = #channel{conn_state = idle}) ->
?SLOG(warning, #{ msg => "awake_pingreq_in_idle_state"
, clientid => ClientId
}),
%% TODO: takeover and awake?
%% 1. Query emqx_cm_registry to get the session state?
%% 2. Takeover it and goto awake state
{ok, {outgoing, ?SN_PINGRESP_MSG()}, Channel};
awake(ClientId, Channel = #channel{
conn_state = ConnState,
session = Session,
clientinfo = ClientInfo = #{clientid := ClientId}})
when ConnState == asleep; ConnState == awake ->
?SLOG(info, #{ msg => "goto_awake_state"
, clientid => ClientId
, previous_state => ConnState
}),
{ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
{NPublishes, NSession} = case emqx_session:deliver(ClientInfo, [], Session1) of {NPublishes, NSession} = case emqx_session:deliver(ClientInfo, [], Session1) of
{ok, Session2} -> {ok, Session2} ->
@ -1108,24 +1166,57 @@ awake(Channel = #channel{session = Session, clientinfo = ClientInfo}) ->
{ok, More, Session2} -> {ok, More, Session2} ->
{lists:append(Publishes, More), Session2} {lists:append(Publishes, More), Session2}
end, end,
{Replies, NChannel} = outgoing_deliver_and_register( Channel1 = cancel_timer(asleep_timer, Channel),
do_deliver(NPublishes, {Replies0, NChannel0} = outgoing_deliver_and_register(
Channel#channel{session = NSession}) do_deliver(
NPublishes,
Channel1#channel{
conn_state = awake, session = NSession}
)
), ),
{ok, Replies, NChannel}. Replies1 = [{event, awake} | Replies0],
{Replies2, NChannel} = goto_asleep_if_buffered_msgs_sent(NChannel0),
{ok, Replies1 ++ Replies2, NChannel}.
goto_asleep_if_buffered_msgs_sent(
Channel = #channel{
conn_state = awake,
session = Session,
asleep_timer_duration = Duration}) ->
case emqx_mqueue:is_empty(emqx_session:info(mqueue, Session)) andalso
emqx_inflight:is_empty(emqx_session:info(inflight, Session)) of
true ->
?SLOG(info, #{ msg => "goto_asleep_state"
, reason => buffered_messages_sent
, duration => Duration
}),
Replies = [ {outgoing, ?SN_PINGRESP_MSG()}
, {event, asleep}
],
{Replies, ensure_asleep_timer(Channel#channel{conn_state = asleep})};
false ->
{[], Channel}
end;
goto_asleep_if_buffered_msgs_sent(Channel) ->
{[], Channel}.
asleep(Duration, Channel = #channel{conn_state = asleep}) -> asleep(Duration, Channel = #channel{conn_state = asleep}) ->
%% 6.14: The client can also modify its sleep duration %% 6.14: The client can also modify its sleep duration
%% by sending a DISCONNECT message with a new value of %% by sending a DISCONNECT message with a new value of
%% the sleep duration %% the sleep duration
ensure_timer(asleep_timer, Duration, %%
cancel_timer(asleep_timer, Channel) %% XXX: Do we need to limit the maximum of Duration?
); ?SLOG(debug, #{ msg => "update_asleep_timer"
, new_duration => Duration
}),
ensure_asleep_timer(Duration, cancel_timer(asleep_timer, Channel));
asleep(Duration, Channel = #channel{conn_state = connected}) -> asleep(Duration, Channel = #channel{conn_state = connected}) ->
ensure_timer(asleep_timer, Duration, ?SLOG(info, #{ msg => "goto_asleep_state"
Channel#channel{conn_state = asleep} , duration => Duration
). }),
ensure_asleep_timer(Duration, Channel#channel{conn_state = asleep}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle outgoing packet %% Handle outgoing packet
@ -1154,10 +1245,11 @@ handle_out(connack, ReasonCode,
shutdown(Reason, AckPacket, Channel); shutdown(Reason, AckPacket, Channel);
handle_out(publish, Publishes, Channel) -> handle_out(publish, Publishes, Channel) ->
{Replies, NChannel} = outgoing_deliver_and_register( {Replies1, NChannel} = outgoing_deliver_and_register(
do_deliver(Publishes, Channel) do_deliver(Publishes, Channel)
), ),
{ok, Replies, NChannel}; {Replies2, NChannel2} = goto_asleep_if_buffered_msgs_sent(NChannel),
{ok, Replies1 ++ Replies2, NChannel2};
handle_out(puback, {TopicId, MsgId, Rc}, Channel) -> handle_out(puback, {TopicId, MsgId, Rc}, Channel) ->
{ok, {outgoing, ?SN_PUBACK_MSG(TopicId, MsgId, Rc)}, Channel}; {ok, {outgoing, ?SN_PUBACK_MSG(TopicId, MsgId, Rc)}, Channel};
@ -1688,6 +1780,14 @@ update_will_msg(Will, Payload) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Timer %% Timer
ensure_asleep_timer(Channel = #channel{asleep_timer_duration = Duration})
when is_integer(Duration) ->
ensure_asleep_timer(Duration, Channel).
ensure_asleep_timer(Durtion, Channel) ->
ensure_timer(asleep_timer, timer:seconds(Durtion),
Channel#channel{asleep_timer_duration = Durtion}).
cancel_timer(Name, Channel = #channel{timers = Timers}) -> cancel_timer(Name, Channel = #channel{timers = Timers}) ->
case maps:get(Name, Timers, undefined) of case maps:get(Name, Timers, undefined) of
undefined -> undefined ->

View File

@ -362,9 +362,9 @@ format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) ->
format(?SN_PINGREQ_MSG(ClientId)) -> format(?SN_PINGREQ_MSG(ClientId)) ->
io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]); io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]);
format(?SN_PINGRESP_MSG()) -> format(?SN_PINGRESP_MSG()) ->
"SN_PINGREQ()"; "SN_PINGRESP()";
format(?SN_DISCONNECT_MSG(Duration)) -> format(?SN_DISCONNECT_MSG(Duration)) ->
io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]); io_lib:format("SN_DISCONNECT(Duration=~w)", [Duration]);
format(#mqtt_sn_message{type = Type, variable = Var}) -> format(#mqtt_sn_message{type = Type, variable = Var}) ->
io_lib:format("mqtt_sn_message(type=~s, Var=~w)", io_lib:format("mqtt_sn_message(type=~s, Var=~w)",

File diff suppressed because it is too large Load Diff