Fix will msg (#2156)

* Remove will_msg and will_topic from protocol state

* Modify try_open_session/1
This commit is contained in:
tigercl 2019-01-17 13:57:42 +08:00 committed by Shawn
parent 3748cd434d
commit bc31faac6b
5 changed files with 47 additions and 43 deletions

View File

@ -50,8 +50,6 @@
clean_start, clean_start,
topic_aliases, topic_aliases,
packet_size, packet_size,
will_topic,
will_msg,
keepalive, keepalive,
mountpoint, mountpoint,
is_super, is_super,
@ -130,13 +128,11 @@ info(PState = #pstate{conn_props = ConnProps,
ack_props = AckProps, ack_props = AckProps,
session = Session, session = Session,
topic_aliases = Aliases, topic_aliases = Aliases,
will_msg = WillMsg,
enable_acl = EnableAcl}) -> enable_acl = EnableAcl}) ->
attrs(PState) ++ [{conn_props, ConnProps}, attrs(PState) ++ [{conn_props, ConnProps},
{ack_props, AckProps}, {ack_props, AckProps},
{session, Session}, {session, Session},
{topic_aliases, Aliases}, {topic_aliases, Aliases},
{will_msg, WillMsg},
{enable_acl, EnableAcl}]. {enable_acl, EnableAcl}].
attrs(#pstate{zone = Zone, attrs(#pstate{zone = Zone,
@ -349,11 +345,11 @@ process_packet(?CONNECT_PACKET(
case authenticate(credentials(PState2), Password) of case authenticate(credentials(PState2), Password) of
{ok, IsSuper} -> {ok, IsSuper} ->
%% Maybe assign a clientId %% Maybe assign a clientId
PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper, PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}),
will_msg = make_will_msg(ConnPkt)}),
emqx_logger:set_metadata_client_id(PState3#pstate.client_id), emqx_logger:set_metadata_client_id(PState3#pstate.client_id),
%% Open session %% Open session
case try_open_session(PState3) of SessAttrs = lists:foldl(fun set_session_attrs/2, #{will_msg => make_will_msg(ConnPkt)}, [{max_inflight, PState3}, {expiry_interval, PState3}, {misc, PState3}]),
case try_open_session(SessAttrs) of
{ok, SPid, SP} -> {ok, SPid, SP} ->
PState4 = PState3#pstate{session = SPid, connected = true}, PState4 = PState3#pstate{session = SPid, connected = true},
ok = emqx_cm:register_connection(client_id(PState4)), ok = emqx_cm:register_connection(client_id(PState4)),
@ -502,16 +498,15 @@ process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := In
case Interval =/= 0 andalso OldInterval =:= 0 of case Interval =/= 0 andalso OldInterval =:= 0 of
true -> true ->
deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState), deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState),
{error, protocol_error, PState#pstate{will_msg = undefined}}; {error, protocol_error, PState};
false -> false ->
emqx_session:update_expiry_interval(SPid, Interval), emqx_session:update_expiry_interval(SPid, Interval),
%% Clean willmsg {stop, normal, PState}
{stop, normal, PState#pstate{will_msg = undefined}}
end; end;
process_packet(?DISCONNECT_PACKET(?RC_SUCCESS), PState) -> process_packet(?DISCONNECT_PACKET(?RC_SUCCESS), PState) ->
{stop, normal, PState#pstate{will_msg = undefined}}; {stop, normal, PState};
process_packet(?DISCONNECT_PACKET(_), PState) -> process_packet(?DISCONNECT_PACKET(_), PState) ->
{stop, normal, PState}. {stop, {shutdown, abnormal_disconnet}, PState}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% ConnAck --> Client %% ConnAck --> Client
@ -678,23 +673,13 @@ maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps})
maybe_assign_client_id(PState) -> maybe_assign_client_id(PState) ->
PState. PState.
try_open_session(PState = #pstate{zone = Zone, try_open_session(SessAttrs = #{zone := _,
client_id = ClientId, client_id := _,
conn_pid = ConnPid, conn_pid := _,
username = Username, username := _,
clean_start = CleanStart, will_msg := _,
will_msg = WillMsg}) -> clean_start := _}) ->
case emqx_sm:open_session(SessAttrs) of
SessAttrs = #{
zone => Zone,
client_id => ClientId,
conn_pid => ConnPid,
username => Username,
clean_start => CleanStart,
will_msg => WillMsg
},
SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}]),
case emqx_sm:open_session(SessAttrs1) of
{ok, SPid} -> {ok, SPid} ->
{ok, SPid, false}; {ok, SPid, false};
Other -> Other Other -> Other
@ -722,6 +707,17 @@ set_session_attrs({topic_alias_maximum, #pstate{proto_ver = ?MQTT_PROTO_V5, conn
set_session_attrs({topic_alias_maximum, #pstate{zone = Zone}}, SessAttrs) -> set_session_attrs({topic_alias_maximum, #pstate{zone = Zone}}, SessAttrs) ->
maps:put(topic_alias_maximum, emqx_zone:get_env(Zone, max_topic_alias, 0), SessAttrs); maps:put(topic_alias_maximum, emqx_zone:get_env(Zone, max_topic_alias, 0), SessAttrs);
set_session_attrs({misc, #pstate{zone = Zone,
client_id = ClientId,
conn_pid = ConnPid,
username = Username,
clean_start = CleanStart}}, SessAttrs) ->
SessAttrs#{zone => Zone,
client_id => ClientId,
conn_pid => ConnPid,
username => Username,
clean_start => CleanStart};
set_session_attrs(_, SessAttrs) -> set_session_attrs(_, SessAttrs) ->
SessAttrs. SessAttrs.

View File

@ -630,11 +630,21 @@ handle_info({'EXIT', ConnPid, Reason}, #state{conn_pid = ConnPid})
exit(Reason); exit(Reason);
handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) -> handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) ->
send_willmsg(WillMsg), case Reason of
normal ->
ignore;
_ ->
send_willmsg(WillMsg)
end,
{stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}}; {stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}};
handle_info({'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) -> handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) ->
State1 = ensure_will_delay_timer(State), State1 = case Reason of
normal ->
State#state{will_msg = undefined};
_ ->
ensure_will_delay_timer(State)
end,
{noreply, ensure_expire_timer(State1#state{conn_pid = undefined})}; {noreply, ensure_expire_timer(State1#state{conn_pid = undefined})};
handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) -> handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->

View File

@ -74,7 +74,6 @@
{socktype, _}, {socktype, _},
{topic_aliases, _}, {topic_aliases, _},
{username, _}, {username, _},
{will_msg, _},
{zone, _}]). {zone, _}]).
all() -> all() ->

View File

@ -50,8 +50,6 @@
clean_start, clean_start,
topic_aliases, topic_aliases,
packet_size, packet_size,
will_topic,
will_msg,
keepalive, keepalive,
mountpoint, mountpoint,
is_super, is_super,
@ -352,7 +350,7 @@ connect_v5(_) ->
will_props = #{'Will-Delay-Interval' => 5}, will_props = #{'Will-Delay-Interval' => 5},
will_topic = <<"TopicA">>, will_topic = <<"TopicA">>,
will_payload = <<"will message">>, will_payload = <<"will message">>,
properties = #{'Session-Expiry-Interval' => 3} properties = #{'Session-Expiry-Interval' => 0}
} }
) )
) )
@ -377,11 +375,11 @@ connect_v5(_) ->
{ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5), {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock, raw_send_serialize( emqx_client_sock:send(Sock, raw_send_serialize(
?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE) ?DISCONNECT_PACKET(?RC_SUCCESS)
) )
), ),
{error, timeout} = gen_tcp:recv(Sock2, 0, 1000), {error, timeout} = gen_tcp:recv(Sock2, 0, 2000),
% session resumed % session resumed
{ok, Sock3} = emqx_client_sock:connect({127, 0, 0, 1}, 1883, {ok, Sock3} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
@ -403,18 +401,20 @@ connect_v5(_) ->
will_payload = <<"will message 2">>, will_payload = <<"will message 2">>,
properties = #{'Session-Expiry-Interval' => 3} properties = #{'Session-Expiry-Interval' => 3}
} }
) ),
#{version => ?MQTT_PROTO_V5}
) )
), ),
{ok, Data3} = gen_tcp:recv(Sock3, 0), {ok, Data3} = gen_tcp:recv(Sock3, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5), {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock3, raw_send_serialize( emqx_client_sock:send(Sock3, raw_send_serialize(
?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE) ?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE),
#{version => ?MQTT_PROTO_V5}
) )
), ),
{ok, WillData} = gen_tcp:recv(Sock2, 0), {ok, WillData} = gen_tcp:recv(Sock2, 0, 5000),
{ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5), {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5),
emqx_client_sock:close(Sock2) emqx_client_sock:close(Sock2)

View File

@ -56,7 +56,6 @@
{ack_props, _}, {ack_props, _},
{session, _}, {session, _},
{topic_aliases, _}, {topic_aliases, _},
{will_msg, _},
{enable_acl, _}]). {enable_acl, _}]).
-define(ATTRS, [{clean_start,true}, -define(ATTRS, [{clean_start,true},