Implement will message delay publish in session, add test case for clean start and will message in connect packet

This commit is contained in:
周子博 2018-10-25 14:26:31 +08:00
parent 540484e603
commit 6675e3d496
4 changed files with 189 additions and 59 deletions

View File

@ -50,7 +50,6 @@
clean_start,
topic_aliases,
packet_size,
will_topic,
will_msg,
keepalive,
mountpoint,
@ -142,7 +141,6 @@ attrs(#pstate{zone = Zone,
proto_ver = ProtoVer,
proto_name = ProtoName,
keepalive = Keepalive,
will_topic = WillTopic,
mountpoint = Mountpoint,
is_super = IsSuper,
is_bridge = IsBridge,
@ -156,7 +154,6 @@ attrs(#pstate{zone = Zone,
{proto_name, ProtoName},
{clean_start, CleanStart},
{keepalive, Keepalive},
{will_topic, WillTopic},
{mountpoint, Mountpoint},
{is_super, IsSuper},
{is_bridge, IsBridge},
@ -285,7 +282,6 @@ process_packet(?CONNECT_PACKET(
keepalive = Keepalive,
properties = ConnProps,
will_props = WillProps,
will_topic = WillTopic,
client_id = ClientId,
username = Username,
password = Password} = Connect), PState) ->
@ -310,7 +306,6 @@ process_packet(?CONNECT_PACKET(
clean_start = CleanStart,
keepalive = Keepalive,
conn_props = ConnProps,
will_topic = WillTopic,
will_msg = WillMsg,
is_bridge = IsBridge,
connected_at = os:timestamp()}),
@ -535,7 +530,6 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
'Wildcard-Subscription-Available' => flag(Wildcard),
'Subscription-Identifier-Available' => 1,
'Response-Information' => ResponseInformation,
'Shared-Subscription-Available' => flag(Shared)},
Props1 = if
@ -624,23 +618,22 @@ try_open_session(PState = #pstate{zone = Zone,
client_id = ClientId,
conn_pid = ConnPid,
username = Username,
clean_start = CleanStart}) ->
clean_start = CleanStart,
will_msg = WillMsg}) ->
SessAttrs = #{
zone => Zone,
client_id => ClientId,
conn_pid => ConnPid,
username => Username,
clean_start => CleanStart
clean_start => CleanStart,
will_msg => WillMsg
},
SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]),
case emqx_sm:open_session(SessAttrs1) of
{ok, SPid} ->
{ok, SPid, false};
{ok, SPid, true} ->
emqx_delayed_publish:cancel_publish(ClientId),
{ok, SPid, true};
Other -> Other
end.
@ -832,21 +825,11 @@ shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict;
Reason =:= discard ->
emqx_cm:unregister_connection(ClientId);
shutdown(Reason, PState = #pstate{connected = true,
client_id = ClientId,
will_msg = WillMsg}) ->
client_id = ClientId}) ->
?LOG(info, "Shutdown for ~p", [Reason], PState),
_ = send_willmsg(WillMsg, ClientId),
emqx_hooks:run('client.disconnected', [credentials(PState), Reason]),
emqx_cm:unregister_connection(ClientId).
send_willmsg(undefined, _ClientId) ->
ignore;
send_willmsg(WillMsg = #message{headers = #{'Will-Delay-Interval' := Interval}}, ClientId)
when is_integer(Interval), Interval > 0 ->
emqx_delayed_publish:delay_publish(WillMsg, ClientId);
send_willmsg(WillMsg, _ClientId) ->
emqx_broker:publish(WillMsg).
start_keepalive(0, _PState) ->
ignore;
start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 ->

View File

@ -46,7 +46,7 @@
-export([start_link/1]).
-export([info/1, attrs/1]).
-export([stats/1]).
-export([resume/2, discard/2]).
-export([resume/3, discard/2]).
-export([update_expiry_interval/2, update_misc/2]).
-export([subscribe/2, subscribe/4]).
-export([publish/3]).
@ -147,7 +147,11 @@
%% Created at
created_at :: erlang:timestamp(),
topic_alias_maximum :: pos_integer()
topic_alias_maximum :: pos_integer(),
will_msg :: emqx:message(),
will_delay_timer :: reference() | undefined
}).
-type(spid() :: pid()).
@ -307,9 +311,9 @@ unsubscribe(SPid, PacketId, Properties, TopicFilters) ->
UnsubReq = {PacketId, Properties, TopicFilters},
gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}).
-spec(resume(spid(), pid()) -> ok).
resume(SPid, ConnPid) ->
gen_server:cast(SPid, {resume, ConnPid}).
-spec(resume(spid(), pid(), emqx:message()) -> ok).
resume(SPid, ConnPid, WillMsg) ->
gen_server:cast(SPid, {resume, ConnPid, WillMsg}).
%% @doc Discard the session
-spec(discard(spid(), ByPid :: pid()) -> ok).
@ -338,7 +342,8 @@ init([Parent, #{zone := Zone,
clean_start := CleanStart,
expiry_interval := ExpiryInterval,
max_inflight := MaxInflight,
topic_alias_maximum := TopicAliasMaximum}]) ->
topic_alias_maximum := TopicAliasMaximum,
will_msg := WillMsg}]) ->
process_flag(trap_exit, true),
true = link(ConnPid),
IdleTimout = get_env(Zone, idle_timeout, 30000),
@ -362,7 +367,8 @@ init([Parent, #{zone := Zone,
deliver_stats = 0,
enqueue_stats = 0,
created_at = os:timestamp(),
topic_alias_maximum = TopicAliasMaximum
topic_alias_maximum = TopicAliasMaximum,
will_msg = WillMsg
},
emqx_sm:register_session(ClientId, attrs(State)),
emqx_sm:set_session_stats(ClientId, stats(State)),
@ -511,17 +517,18 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
end;
%% RESUME:
handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
conn_pid = OldConnPid,
clean_start = CleanStart,
retry_timer = RetryTimer,
await_rel_timer = AwaitTimer,
expiry_timer = ExpireTimer}) ->
handle_cast({resume, ConnPid, WillMsg}, State = #state{client_id = ClientId,
conn_pid = OldConnPid,
clean_start = CleanStart,
retry_timer = RetryTimer,
await_rel_timer = AwaitTimer,
expiry_timer = ExpireTimer,
will_delay_timer = WillDelayTimer}) ->
?LOG(info, "Resumed by connection ~p ", [ConnPid], State),
%% Cancel Timers
lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer]),
lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]),
case kick(ClientId, OldConnPid, ConnPid) of
ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid], State);
@ -530,14 +537,16 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
true = link(ConnPid),
State1 = State#state{conn_pid = ConnPid,
binding = binding(ConnPid),
old_conn_pid = OldConnPid,
clean_start = false,
retry_timer = undefined,
awaiting_rel = #{},
await_rel_timer = undefined,
expiry_timer = undefined},
State1 = State#state{conn_pid = ConnPid,
binding = binding(ConnPid),
old_conn_pid = OldConnPid,
clean_start = false,
retry_timer = undefined,
awaiting_rel = #{},
await_rel_timer = undefined,
expiry_timer = undefined,
will_delay_timer = undefined,
will_msg = WillMsg},
%% Clean Session: true -> false???
CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
@ -610,13 +619,19 @@ handle_info({timeout, Timer, emit_stats},
end;
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
?LOG(info, "expired, shutdown now:(", [], State),
shutdown(expired, State);
shutdown(expired, State#state{will_msg = undefined});
handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) ->
{stop, Reason, State#state{conn_pid = undefined}};
handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, will_delay_timer = Timer}) ->
send_willmsg(WillMsg),
{noreply, State#state{will_msg = undefined}};
handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) ->
send_willmsg(WillMsg),
{stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}};
handle_info({'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
{noreply, ensure_expire_timer(State#state{conn_pid = undefined})};
State1 = ensure_will_delay_timer(State),
{noreply, ensure_expire_timer(State1#state{conn_pid = undefined})};
handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
%% ignore
@ -631,8 +646,9 @@ handle_info(Info, State) ->
emqx_logger:error("[Session] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(Reason, #state{client_id = ClientId, conn_pid = ConnPid}) ->
terminate(Reason, #state{will_msg = WillMsg, client_id = ClientId, conn_pid = ConnPid}) ->
emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
send_willmsg(WillMsg),
%% Ensure to shutdown the connection
if
ConnPid =/= undefined ->
@ -714,6 +730,14 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
ensure_retry_timer(Interval - max(0, Age), State)
end.
%%------------------------------------------------------------------------------
%% Send Will Message
%%------------------------------------------------------------------------------
send_willmsg(undefined) ->
ignore;
send_willmsg(WillMsg) ->
emqx_broker:publish(WillMsg).
%%------------------------------------------------------------------------------
%% Expire Awaiting Rel
%%------------------------------------------------------------------------------
@ -899,6 +923,11 @@ ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval >
ensure_expire_timer(State) ->
State.
ensure_will_delay_timer(State = #state{will_msg = #message{headers = #{'Will-Delay-Interval' := WillDelayInterval}}}) ->
State#state{will_delay_timer = emqx_misc:start_timer(WillDelayInterval * 1000, will_delay)};
ensure_will_delay_timer(State) ->
State.
ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined,
idle_timeout = IdleTimeout}) ->
State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};

View File

@ -59,13 +59,12 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid
end,
emqx_sm_locker:trans(ClientId, CleanStart);
open_session(SessAttrs = #{clean_start := false,
client_id := ClientId,
conn_pid := ConnPid,
max_inflight := MaxInflight,
topic_alias_maximum := TopicAliasMaximum}) ->
open_session(SessAttrs = #{clean_start := false,
client_id := ClientId,
max_inflight := MaxInflight,
topic_alias_maximum := TopicAliasMaximum}) ->
ResumeStart = fun(_) ->
case resume_session(ClientId, ConnPid) of
case resume_session(ClientId, SessAttrs) of
{ok, SPid} ->
emqx_session:update_misc(SPid, #{max_inflight => MaxInflight, topic_alias_maximum => TopicAliasMaximum}),
{ok, SPid, true};
@ -92,13 +91,13 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
%% @doc Try to resume a session.
-spec(resume_session(emqx_types:client_id()) -> {ok, pid()} | {error, term()}).
resume_session(ClientId) ->
resume_session(ClientId, self()).
resume_session(ClientId, #{conn_pid => self(), will_msg => undefined}).
resume_session(ClientId, ConnPid) ->
resume_session(ClientId, #{conn_pid := ConnPid, will_msg := WillMsg}) ->
case lookup_session(ClientId) of
[] -> {error, not_found};
[{_ClientId, SPid}] ->
ok = emqx_session:resume(SPid, ConnPid),
ok = emqx_session:resume(SPid, ConnPid, WillMsg),
{ok, SPid};
Sessions ->
[{_, SPid}|StaleSessions] = lists:reverse(Sessions),
@ -106,7 +105,7 @@ resume_session(ClientId, ConnPid) ->
lists:foreach(fun({_, StalePid}) ->
catch emqx_session:discard(StalePid, ConnPid)
end, StaleSessions),
ok = emqx_session:resume(SPid, ConnPid),
ok = emqx_session:resume(SPid, ConnPid, WillMsg),
{ok, SPid}
end.

View File

@ -131,6 +131,125 @@ connect_v5(_) ->
#{'Response-Information' := _RespInfo}), _} =
raw_recv_parse(Data, ?MQTT_PROTO_V5)
end),
% test clean start
with_connection(fun(Sock) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
clean_start = true,
client_id = <<"myclient">>,
properties =
#{'Session-Expiry-Interval' => 10}})
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock, raw_send_serialize(
?DISCONNECT_PACKET(?RC_SUCCESS)
))
end),
timer:sleep(1000),
with_connection(fun(Sock) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
clean_start = false,
client_id = <<"myclient">>,
properties =
#{'Session-Expiry-Interval' => 10}})
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
end),
% test will message publish and cancel
with_connection(fun(Sock) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
clean_start = true,
client_id = <<"myclient">>,
will_flag = true,
will_qos = ?QOS_1,
will_retain = false,
will_props = #{'Will-Delay-Interval' => 5},
will_topic = <<"TopicA">>,
will_payload = <<"will message">>,
properties = #{'Session-Expiry-Interval' => 3}
}
)
)
),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
{ok, Sock2} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
[binary, {packet, raw},
{active, false}], 3000),
do_connect(Sock2, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock2, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1,
qos => ?QOS_2,
rap => 0,
nl => 0,
rc => 0}}]),
#{version => ?MQTT_PROTO_V5})),
{ok, SubData} = gen_tcp:recv(Sock2, 0),
{ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock, raw_send_serialize(
?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE)
)
),
{error, timeout} = gen_tcp:recv(Sock2, 0, 1000),
% session resumed
{ok, Sock3} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
[binary, {packet, raw},
{active, false}], 3000),
emqx_client_sock:send(Sock3,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
clean_start = false,
client_id = <<"myclient">>,
will_flag = true,
will_qos = ?QOS_1,
will_retain = false,
will_props = #{'Will-Delay-Interval' => 5},
will_topic = <<"TopicA">>,
will_payload = <<"will message 2">>,
properties = #{'Session-Expiry-Interval' => 3}
}
)
)
),
{ok, Data3} = gen_tcp:recv(Sock3, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock3, raw_send_serialize(
?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE)
)
),
{ok, WillData} = gen_tcp:recv(Sock2, 0),
{ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5),
emqx_client_sock:close(Sock2)
end),
ok.
do_connect(Sock, ProtoVer) ->