Merge branch 'improve_connect' into emqx30

This commit is contained in:
tigercl 2018-10-09 18:03:22 +08:00 committed by GitHub
commit 40251a034c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 11 deletions

View File

@ -260,6 +260,7 @@ process_packet(?CONNECT_PACKET(
clean_start = CleanStart, clean_start = CleanStart,
keepalive = Keepalive, keepalive = Keepalive,
properties = ConnProps, properties = ConnProps,
will_props = WillProps,
will_topic = WillTopic, will_topic = WillTopic,
client_id = ClientId, client_id = ClientId,
username = Username, username = Username,
@ -267,7 +268,16 @@ process_packet(?CONNECT_PACKET(
%% TODO: Mountpoint... %% TODO: Mountpoint...
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg) %% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
WillMsg = emqx_packet:will_msg(Connect), Connect1 = if
ProtoVer =:= ?MQTT_PROTO_V5 ->
WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0),
SessionExpiryInterval = get_property('Session-Expiry-Interval', ConnProps, 0),
WillProps1 = set_property('Will-Delay-Interval', erlang:min(SessionExpiryInterval, WillDelayInterval), WillProps),
Connect#mqtt_packet_connect{will_props = WillProps1};
true ->
Connect
end,
WillMsg = emqx_packet:will_msg(Connect1),
PState1 = set_username(Username, PState1 = set_username(Username,
PState#pstate{client_id = ClientId, PState#pstate{client_id = ClientId,
@ -600,14 +610,14 @@ try_open_session(PState = #pstate{zone = Zone,
set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) -> set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
maps:put(max_inflight, if maps:put(max_inflight, if
ProtoVer =:= ?MQTT_PROTO_V5 -> ProtoVer =:= ?MQTT_PROTO_V5 ->
maps:get('Receive-Maximum', ConnProps, 65535); get_property('Receive-Maximum', ConnProps, 65535);
true -> true ->
emqx_zone:get_env(Zone, max_inflight, 65535) emqx_zone:get_env(Zone, max_inflight, 65535)
end, SessAttrs); end, SessAttrs);
set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) -> set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) ->
maps:put(expiry_interval, if maps:put(expiry_interval, if
ProtoVer =:= ?MQTT_PROTO_V5 -> ProtoVer =:= ?MQTT_PROTO_V5 ->
maps:get('Session-Expiry-Interval', ConnProps, 0); get_property('Session-Expiry-Interval', ConnProps, 0);
true -> true ->
case CleanStart of case CleanStart of
true -> 0; true -> 0;
@ -618,7 +628,7 @@ set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, c
set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) -> set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
maps:put(topic_alias_maximum, if maps:put(topic_alias_maximum, if
ProtoVer =:= ?MQTT_PROTO_V5 -> ProtoVer =:= ?MQTT_PROTO_V5 ->
maps:get('Topic-Alias-Maximum', ConnProps, 0); get_property('Topic-Alias-Maximum', ConnProps, 0);
true -> true ->
emqx_zone:get_env(Zone, max_topic_alias, 0) emqx_zone:get_env(Zone, max_topic_alias, 0)
end, SessAttrs); end, SessAttrs);
@ -642,6 +652,11 @@ set_property(Name, Value, undefined) ->
set_property(Name, Value, Props) -> set_property(Name, Value, Props) ->
Props#{Name => Value}. Props#{Name => Value}.
get_property(_Name, undefined, Default) ->
Default;
get_property(Name, Props, Default) ->
maps:get(Name, Props, Default).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Check Packet %% Check Packet
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -777,20 +792,22 @@ shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict;
emqx_cm:unregister_connection(ClientId); emqx_cm:unregister_connection(ClientId);
shutdown(Reason, PState = #pstate{connected = true, shutdown(Reason, PState = #pstate{connected = true,
client_id = ClientId, client_id = ClientId,
will_msg = WillMsg}) -> will_msg = WillMsg,
session = Session}) ->
?LOG(info, "Shutdown for ~p", [Reason], PState), ?LOG(info, "Shutdown for ~p", [Reason], PState),
_ = send_willmsg(WillMsg), _ = send_willmsg(WillMsg, Session),
emqx_hooks:run('client.disconnected', [credentials(PState), Reason]), emqx_hooks:run('client.disconnected', [credentials(PState), Reason]),
emqx_cm:unregister_connection(ClientId). emqx_cm:unregister_connection(ClientId).
send_willmsg(undefined) -> send_willmsg(undefined, _Session) ->
ignore; ignore;
send_willmsg(WillMsg = #message{topic = Topic, send_willmsg(WillMsg = #message{topic = Topic,
headers = #{'Will-Delay-Interval' := Interval}}) headers = #{'Will-Delay-Interval' := Interval}}, Session)
when is_integer(Interval), Interval > 0 -> when is_integer(Interval), Interval > 0 ->
SendAfter = integer_to_binary(Interval), SendAfter = integer_to_binary(Interval),
emqx_broker:publish(WillMsg#message{topic = <<"$delayed/", SendAfter/binary, "/", Topic/binary>>}); Session1 = list_to_binary(pid_to_list(Session)),
send_willmsg(WillMsg) -> emqx_broker:publish(WillMsg#message{topic = <<"$will/", Session1/binary, "/", SendAfter/binary, "/", Topic/binary>>});
send_willmsg(WillMsg, _Session) ->
emqx_broker:publish(WillMsg). emqx_broker:publish(WillMsg).
start_keepalive(0, _PState) -> start_keepalive(0, _PState) ->

View File

@ -542,7 +542,7 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
%% Clean Session: true -> false??? %% Clean Session: true -> false???
CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)), CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]), emqx_hooks:run('session.resumed', [#{client_id => ClientId, session => self()}, attrs(State)]),
%% Replay delivery and Dequeue pending messages %% Replay delivery and Dequeue pending messages
noreply(dequeue(retry_delivery(true, State1))); noreply(dequeue(retry_delivery(true, State1)));