diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 4a784252b..2e4ff7d77 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -144,7 +144,6 @@ attrs(#pstate{zone = Zone, proto_ver = ProtoVer, proto_name = ProtoName, keepalive = Keepalive, - will_topic = WillTopic, mountpoint = Mountpoint, is_super = IsSuper, is_bridge = IsBridge, @@ -158,7 +157,6 @@ attrs(#pstate{zone = Zone, {proto_name, ProtoName}, {clean_start, CleanStart}, {keepalive, Keepalive}, - {will_topic, WillTopic}, {mountpoint, Mountpoint}, {is_super, IsSuper}, {is_bridge, IsBridge}, @@ -286,14 +284,13 @@ process_packet(?CONNECT_PACKET( clean_start = CleanStart, keepalive = Keepalive, properties = ConnProps, - will_topic = WillTopic, client_id = ClientId, username = Username, password = Password} = Connect), PState) -> %% TODO: Mountpoint... %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) - WillMsg = emqx_packet:will_msg(Connect), + WillMsg = make_will_msg(Connect), PState1 = set_username(Username, PState#pstate{client_id = ClientId, @@ -302,7 +299,6 @@ process_packet(?CONNECT_PACKET( clean_start = CleanStart, keepalive = Keepalive, conn_props = ConnProps, - will_topic = WillTopic, will_msg = WillMsg, is_bridge = IsBridge, connected_at = os:timestamp()}), @@ -527,7 +523,6 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, 'Wildcard-Subscription-Available' => flag(Wildcard), 'Subscription-Identifier-Available' => 1, 'Response-Information' => ResponseInformation, - 'Shared-Subscription-Available' => flag(Shared)}, Props1 = if @@ -616,14 +611,16 @@ try_open_session(PState = #pstate{zone = Zone, client_id = ClientId, conn_pid = ConnPid, username = Username, - clean_start = CleanStart}) -> + clean_start = CleanStart, + will_msg = WillMsg}) -> SessAttrs = #{ zone => Zone, client_id => ClientId, conn_pid => ConnPid, username => Username, - clean_start => CleanStart + clean_start => CleanStart, + will_msg => WillMsg }, SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]), @@ -636,14 +633,14 @@ try_open_session(PState = #pstate{zone = Zone, set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) -> maps:put(max_inflight, if ProtoVer =:= ?MQTT_PROTO_V5 -> - maps:get('Receive-Maximum', ConnProps, 65535); + get_property('Receive-Maximum', ConnProps, 65535); true -> emqx_zone:get_env(Zone, max_inflight, 65535) end, SessAttrs); set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) -> maps:put(expiry_interval, if ProtoVer =:= ?MQTT_PROTO_V5 -> - maps:get('Session-Expiry-Interval', ConnProps, 0); + get_property('Session-Expiry-Interval', ConnProps, 0); true -> case CleanStart of true -> 0; @@ -654,7 +651,7 @@ set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, c set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) -> maps:put(topic_alias_maximum, if ProtoVer =:= ?MQTT_PROTO_V5 -> - maps:get('Topic-Alias-Maximum', ConnProps, 0); + get_property('Topic-Alias-Maximum', ConnProps, 0); true -> emqx_zone:get_env(Zone, max_topic_alias, 0) end, SessAttrs); @@ -678,6 +675,21 @@ set_property(Name, Value, ?NO_PROPS) -> set_property(Name, Value, Props) -> Props#{Name => Value}. +get_property(_Name, undefined, Default) -> + Default; +get_property(Name, Props, Default) -> + maps:get(Name, Props, Default). + +make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer, + will_props = WillProps} = Connect) -> + emqx_packet:will_msg(if + ProtoVer =:= ?MQTT_PROTO_V5 -> + WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0), + Connect#mqtt_packet_connect{will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)}; + true -> + Connect + end). + %%------------------------------------------------------------------------------ %% Check Packet %%------------------------------------------------------------------------------ @@ -816,23 +828,11 @@ shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict; Reason =:= discard -> emqx_cm:unregister_connection(ClientId); shutdown(Reason, PState = #pstate{connected = true, - client_id = ClientId, - will_msg = WillMsg}) -> + client_id = ClientId}) -> ?LOG(info, "Shutdown for ~p", [Reason], PState), - _ = send_willmsg(WillMsg), emqx_hooks:run('client.disconnected', [credentials(PState), Reason]), emqx_cm:unregister_connection(ClientId). -send_willmsg(undefined) -> - ignore; -send_willmsg(WillMsg = #message{topic = Topic, - headers = #{'Will-Delay-Interval' := Interval}}) - when is_integer(Interval), Interval > 0 -> - SendAfter = integer_to_binary(Interval), - emqx_broker:publish(WillMsg#message{topic = <<"$delayed/", SendAfter/binary, "/", Topic/binary>>}); -send_willmsg(WillMsg) -> - emqx_broker:publish(WillMsg). - start_keepalive(0, _PState) -> ignore; start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index d327723f5..286066a59 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -47,7 +47,7 @@ -export([info/1, attrs/1]). -export([stats/1]). -export([resume/2, discard/2]). --export([update_expiry_interval/2, update_misc/2]). +-export([update_expiry_interval/2]). -export([subscribe/2, subscribe/4]). -export([publish/3]). -export([puback/2, puback/3]). @@ -147,7 +147,11 @@ %% Created at created_at :: erlang:timestamp(), - topic_alias_maximum :: pos_integer() + topic_alias_maximum :: pos_integer(), + + will_msg :: emqx:message(), + + will_delay_timer :: reference() | undefined }). -type(spid() :: pid()). @@ -307,9 +311,9 @@ unsubscribe(SPid, PacketId, Properties, TopicFilters) -> UnsubReq = {PacketId, Properties, TopicFilters}, gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}). --spec(resume(spid(), pid()) -> ok). -resume(SPid, ConnPid) -> - gen_server:cast(SPid, {resume, ConnPid}). +-spec(resume(spid(), map()) -> ok). +resume(SPid, SessAttrs) -> + gen_server:cast(SPid, {resume, SessAttrs}). %% @doc Discard the session -spec(discard(spid(), ByPid :: pid()) -> ok). @@ -320,9 +324,6 @@ discard(SPid, ByPid) -> update_expiry_interval(SPid, Interval) -> gen_server:cast(SPid, {expiry_interval, Interval}). -update_misc(SPid, Misc) -> - gen_server:cast(SPid, {update_misc, Misc}). - -spec(close(spid()) -> ok). close(SPid) -> gen_server:call(SPid, close, infinity). @@ -338,7 +339,8 @@ init([Parent, #{zone := Zone, clean_start := CleanStart, expiry_interval := ExpiryInterval, max_inflight := MaxInflight, - topic_alias_maximum := TopicAliasMaximum}]) -> + topic_alias_maximum := TopicAliasMaximum, + will_msg := WillMsg}]) -> process_flag(trap_exit, true), true = link(ConnPid), IdleTimout = get_env(Zone, idle_timeout, 30000), @@ -362,7 +364,8 @@ init([Parent, #{zone := Zone, deliver_stats = 0, enqueue_stats = 0, created_at = os:timestamp(), - topic_alias_maximum = TopicAliasMaximum + topic_alias_maximum = TopicAliasMaximum, + will_msg = WillMsg }, emqx_sm:register_session(ClientId, attrs(State)), emqx_sm:set_session_stats(ClientId, stats(State)), @@ -511,17 +514,22 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight end; %% RESUME: -handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, - conn_pid = OldConnPid, - clean_start = CleanStart, - retry_timer = RetryTimer, - await_rel_timer = AwaitTimer, - expiry_timer = ExpireTimer}) -> +handle_cast({resume, #{conn_pid := ConnPid, + will_msg := WillMsg, + expiry_interval := SessionExpiryInterval, + max_inflight := MaxInflight, + topic_alias_maximum := TopicAliasMaximum}}, State = #state{client_id = ClientId, + conn_pid = OldConnPid, + clean_start = CleanStart, + retry_timer = RetryTimer, + await_rel_timer = AwaitTimer, + expiry_timer = ExpireTimer, + will_delay_timer = WillDelayTimer}) -> ?LOG(info, "Resumed by connection ~p ", [ConnPid], State), %% Cancel Timers - lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer]), + lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]), case kick(ClientId, OldConnPid, ConnPid) of ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid], State); @@ -530,14 +538,19 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, true = link(ConnPid), - State1 = State#state{conn_pid = ConnPid, - binding = binding(ConnPid), - old_conn_pid = OldConnPid, - clean_start = false, - retry_timer = undefined, - awaiting_rel = #{}, - await_rel_timer = undefined, - expiry_timer = undefined}, + State1 = State#state{conn_pid = ConnPid, + binding = binding(ConnPid), + old_conn_pid = OldConnPid, + clean_start = false, + retry_timer = undefined, + awaiting_rel = #{}, + await_rel_timer = undefined, + expiry_timer = undefined, + expiry_interval = SessionExpiryInterval, + inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight), + topic_alias_maximum = TopicAliasMaximum, + will_delay_timer = undefined, + will_msg = WillMsg}, %% Clean Session: true -> false??? CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)), @@ -550,10 +563,6 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, handle_cast({expiry_interval, Interval}, State) -> {noreply, State#state{expiry_interval = Interval}}; -handle_cast({update_misc, #{max_inflight := MaxInflight, topic_alias_maximum := TopicAliasMaximum}}, State) -> - {noreply, State#state{inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight), - topic_alias_maximum = TopicAliasMaximum}}; - handle_cast(Msg, State) -> emqx_logger:error("[Session] unexpected cast: ~p", [Msg]), {noreply, State}. @@ -612,11 +621,17 @@ handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> ?LOG(info, "expired, shutdown now:(", [], State), shutdown(expired, State); -handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) -> - {stop, Reason, State#state{conn_pid = undefined}}; +handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, will_delay_timer = Timer}) -> + send_willmsg(WillMsg), + {noreply, State#state{will_msg = undefined}}; + +handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) -> + send_willmsg(WillMsg), + {stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}}; handle_info({'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) -> - {noreply, ensure_expire_timer(State#state{conn_pid = undefined})}; + State1 = ensure_will_delay_timer(State), + {noreply, ensure_expire_timer(State1#state{conn_pid = undefined})}; handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) -> %% ignore @@ -631,8 +646,9 @@ handle_info(Info, State) -> emqx_logger:error("[Session] unexpected info: ~p", [Info]), {noreply, State}. -terminate(Reason, #state{client_id = ClientId, conn_pid = ConnPid}) -> +terminate(Reason, #state{will_msg = WillMsg, client_id = ClientId, conn_pid = ConnPid}) -> emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]), + send_willmsg(WillMsg), %% Ensure to shutdown the connection if ConnPid =/= undefined -> @@ -714,6 +730,14 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, ensure_retry_timer(Interval - max(0, Age), State) end. +%%------------------------------------------------------------------------------ +%% Send Will Message +%%------------------------------------------------------------------------------ +send_willmsg(undefined) -> + ignore; +send_willmsg(WillMsg) -> + emqx_broker:publish(WillMsg). + %%------------------------------------------------------------------------------ %% Expire Awaiting Rel %%------------------------------------------------------------------------------ @@ -899,6 +923,11 @@ ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > ensure_expire_timer(State) -> State. +ensure_will_delay_timer(State = #state{will_msg = #message{headers = #{'Will-Delay-Interval' := WillDelayInterval}}}) -> + State#state{will_delay_timer = emqx_misc:start_timer(WillDelayInterval * 1000, will_delay)}; +ensure_will_delay_timer(State) -> + State. + ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined, idle_timeout = IdleTimeout}) -> State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index d45548a78..bc3f6ff68 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -22,7 +22,7 @@ -export([open_session/1, close_session/1]). -export([lookup_session/1, lookup_session_pid/1]). --export([resume_session/1, resume_session/2]). +-export([resume_session/2]). -export([discard_session/1, discard_session/2]). -export([register_session/2, unregister_session/1]). -export([get_session_attrs/1, set_session_attrs/2]). @@ -59,15 +59,13 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid end, emqx_sm_locker:trans(ClientId, CleanStart); -open_session(SessAttrs = #{clean_start := false, - client_id := ClientId, - conn_pid := ConnPid, - max_inflight := MaxInflight, - topic_alias_maximum := TopicAliasMaximum}) -> +open_session(SessAttrs = #{clean_start := false, + client_id := ClientId, + max_inflight := MaxInflight, + topic_alias_maximum := TopicAliasMaximum}) -> ResumeStart = fun(_) -> - case resume_session(ClientId, ConnPid) of + case resume_session(ClientId, SessAttrs) of {ok, SPid} -> - emqx_session:update_misc(SPid, #{max_inflight => MaxInflight, topic_alias_maximum => TopicAliasMaximum}), {ok, SPid, true}; {error, not_found} -> emqx_session_sup:start_session(SessAttrs) @@ -90,15 +88,12 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) -> end, lookup_session(ClientId)). %% @doc Try to resume a session. --spec(resume_session(emqx_types:client_id()) -> {ok, pid()} | {error, term()}). -resume_session(ClientId) -> - resume_session(ClientId, self()). - -resume_session(ClientId, ConnPid) -> +-spec(resume_session(emqx_types:client_id(), map()) -> {ok, pid()} | {error, term()}). +resume_session(ClientId, SessAttrs = #{conn_pid := ConnPid}) -> case lookup_session(ClientId) of [] -> {error, not_found}; [{_ClientId, SPid}] -> - ok = emqx_session:resume(SPid, ConnPid), + ok = emqx_session:resume(SPid, SessAttrs), {ok, SPid}; Sessions -> [{_, SPid}|StaleSessions] = lists:reverse(Sessions), @@ -106,7 +101,7 @@ resume_session(ClientId, ConnPid) -> lists:foreach(fun({_, StalePid}) -> catch emqx_session:discard(StalePid, ConnPid) end, StaleSessions), - ok = emqx_session:resume(SPid, ConnPid), + ok = emqx_session:resume(SPid, SessAttrs), {ok, SPid} end. diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index e82f70395..c6bea732a 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -38,6 +38,13 @@ clean_start = false, password = <<"public">>})). +-define(CLIENT3, ?CONNECT_PACKET(#mqtt_packet_connect{ + username = <<"admin">>, + proto_ver = ?MQTT_PROTO_V5, + clean_start = false, + password = <<"public">>, + will_props = #{'Will-Delay-Interval' => 2}})). + -define(SUBCODE, [0]). -define(PACKETID, 1). @@ -66,6 +73,7 @@ groups() -> [ mqtt_connect, mqtt_connect_with_tcp, + mqtt_connect_with_will_props, mqtt_connect_with_ssl_oneway, mqtt_connect_with_ssl_twoway, mqtt_connect_with_ws @@ -109,6 +117,14 @@ mqtt_connect_with_tcp(_) -> {ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), _} = raw_recv_pase(Data), emqx_client_sock:close(Sock). +mqtt_connect_with_will_props(_) -> + %% Issue #599 + %% Empty clientId and clean_session = false + {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), + Packet = raw_send_serialize(?CLIENT3), + emqx_client_sock:send(Sock, Packet), + emqx_client_sock:close(Sock). + mqtt_connect_with_ssl_oneway(_) -> emqx:shutdown(), emqx_ct_broker_helpers:change_opts(ssl_oneway), diff --git a/test/emqx_mock_client.erl b/test/emqx_mock_client.erl index 4a49a1fc5..24de66d0a 100644 --- a/test/emqx_mock_client.erl +++ b/test/emqx_mock_client.erl @@ -46,14 +46,15 @@ init([ClientId]) -> }. handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> - Attrs = #{ zone => Zone, - client_id => ClientId, - conn_pid => ClientPid, - clean_start => true, - username => undefined, - expiry_interval => 0, - max_inflight => 0, - topic_alias_maximum => 0 + Attrs = #{ zone => Zone, + client_id => ClientId, + conn_pid => ClientPid, + clean_start => true, + username => undefined, + expiry_interval => 0, + max_inflight => 0, + topic_alias_maximum => 0, + will_msg => undefined }, {ok, SessPid} = emqx_sm:open_session(Attrs), {reply, {ok, SessPid}, diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index f97f475f8..060557c6c 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -131,6 +131,125 @@ connect_v5(_) -> #{'Response-Information' := _RespInfo}), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) end), + + % test clean start + with_connection(fun(Sock) -> + emqx_client_sock:send(Sock, + raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + clean_start = true, + client_id = <<"myclient">>, + properties = + #{'Session-Expiry-Interval' => 10}}) + )), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), + emqx_client_sock:send(Sock, raw_send_serialize( + ?DISCONNECT_PACKET(?RC_SUCCESS) + )) + end), + + timer:sleep(1000), + + with_connection(fun(Sock) -> + emqx_client_sock:send(Sock, + raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + clean_start = false, + client_id = <<"myclient">>, + properties = + #{'Session-Expiry-Interval' => 10}}) + )), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) + end), + + % test will message publish and cancel + with_connection(fun(Sock) -> + emqx_client_sock:send(Sock, + raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + clean_start = true, + client_id = <<"myclient">>, + will_flag = true, + will_qos = ?QOS_1, + will_retain = false, + will_props = #{'Will-Delay-Interval' => 5}, + will_topic = <<"TopicA">>, + will_payload = <<"will message">>, + properties = #{'Session-Expiry-Interval' => 3} + } + ) + ) + ), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5), + + {ok, Sock2} = emqx_client_sock:connect({127, 0, 0, 1}, 1883, + [binary, {packet, raw}, + {active, false}], 3000), + + do_connect(Sock2, ?MQTT_PROTO_V5), + + emqx_client_sock:send(Sock2, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1, + qos => ?QOS_2, + rap => 0, + nl => 0, + rc => 0}}]), + #{version => ?MQTT_PROTO_V5})), + + {ok, SubData} = gen_tcp:recv(Sock2, 0), + {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5), + + emqx_client_sock:send(Sock, raw_send_serialize( + ?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE) + ) + ), + + {error, timeout} = gen_tcp:recv(Sock2, 0, 1000), + + % session resumed + {ok, Sock3} = emqx_client_sock:connect({127, 0, 0, 1}, 1883, + [binary, {packet, raw}, + {active, false}], 3000), + + emqx_client_sock:send(Sock3, + raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + clean_start = false, + client_id = <<"myclient">>, + will_flag = true, + will_qos = ?QOS_1, + will_retain = false, + will_props = #{'Will-Delay-Interval' => 5}, + will_topic = <<"TopicA">>, + will_payload = <<"will message 2">>, + properties = #{'Session-Expiry-Interval' => 3} + } + ) + ) + ), + {ok, Data3} = gen_tcp:recv(Sock3, 0), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), + + emqx_client_sock:send(Sock3, raw_send_serialize( + ?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE) + ) + ), + + {ok, WillData} = gen_tcp:recv(Sock2, 0), + {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5), + + emqx_client_sock:close(Sock2) + end), ok. do_connect(Sock, ProtoVer) -> diff --git a/test/emqx_sm_SUITE.erl b/test/emqx_sm_SUITE.erl index 110e13026..2b83b6afb 100644 --- a/test/emqx_sm_SUITE.erl +++ b/test/emqx_sm_SUITE.erl @@ -24,8 +24,15 @@ all() -> [t_open_close_session]. t_open_close_session(_) -> emqx_ct_broker_helpers:run_setup_steps(), {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>), - Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid, - zone => internal, username => <<"zhou">>, expiry_interval => 0, max_inflight => 0, topic_alias_maximum => 0}, + Attrs = #{clean_start => true, + client_id => <<"client">>, + conn_pid => ClientPid, + zone => internal, + username => <<"zhou">>, + expiry_interval => 0, + max_inflight => 0, + topic_alias_maximum => 0, + will_msg => undefined}, {ok, SPid} = emqx_sm:open_session(Attrs), [{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>), SPid = emqx_sm:lookup_session_pid(<<"client">>),