diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 19a1dd838..9b9ef7f7d 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -21,6 +21,7 @@ -export([init/2]). -export([info/1]). -export([attrs/1]). +-export([attr/2]). -export([caps/1]). -export([stats/1]). -export([client_id/1]). @@ -162,6 +163,28 @@ attrs(#pstate{zone = Zone, {is_bridge, IsBridge}, {connected_at, ConnectedAt}]. +attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> + get_property('Receive-Maximum', ConnProps, 65535); +attr(max_inflight, #pstate{zone = Zone}) -> + emqx_zone:get_env(Zone, max_inflight, 65535); +attr(expiry_interval, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> + get_property('Session-Expiry-Interval', ConnProps, 0); +attr(expiry_interval, #pstate{zone = Zone, clean_start = CleanStart}) -> + case CleanStart of + true -> 0; + false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) + end; +attr(topic_alias_maximum, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) -> + get_property('Topic-Alias-Maximum', ConnProps, 0); +attr(topic_alias_maximum, #pstate{zone = Zone}) -> + emqx_zone:get_env(Zone, max_topic_alias, 0); +attr(Name, PState) -> + Attrs = lists:zip(record_info(fields, pstate), tl(tuple_to_list(PState))), + case lists:keyfind(Name, 1, Attrs) of + {_, Value} -> Value; + false -> undefined + end. + caps(#pstate{zone = Zone}) -> emqx_mqtt_caps:get_caps(Zone). @@ -348,8 +371,8 @@ process_packet(?CONNECT_PACKET( PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}), emqx_logger:set_metadata_client_id(PState3#pstate.client_id), %% Open session - SessAttrs = lists:foldl(fun set_session_attrs/2, #{will_msg => make_will_msg(ConnPkt)}, [{max_inflight, PState3}, {expiry_interval, PState3}, {misc, PState3}]), - case try_open_session(SessAttrs) of + SessAttrs = #{will_msg => make_will_msg(ConnPkt)}, + case try_open_session(SessAttrs, PState3) of {ok, SPid, SP} -> PState4 = PState3#pstate{session = SPid, connected = true}, ok = emqx_cm:register_connection(client_id(PState4)), @@ -673,54 +696,26 @@ maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps}) maybe_assign_client_id(PState) -> PState. -try_open_session(SessAttrs = #{zone := _, - client_id := _, - conn_pid := _, - username := _, - will_msg := _, - clean_start := _}) -> - case emqx_sm:open_session(SessAttrs) of +try_open_session(SessAttrs, PState = #pstate{zone = Zone, + client_id = ClientId, + conn_pid = ConnPid, + username = Username, + clean_start = CleanStart}) -> + case emqx_sm:open_session( + maps:merge(#{zone => Zone, + client_id => ClientId, + conn_pid => ConnPid, + username => Username, + clean_start => CleanStart, + max_inflight => attr(max_inflight, PState), + expiry_interval => attr(expiry_interval, PState), + topic_alias_maximum => attr(topic_alias_maximum, PState)}, + SessAttrs)) of {ok, SPid} -> {ok, SPid, false}; Other -> Other end. - -set_session_attrs({max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) -> - maps:put(max_inflight, get_property('Receive-Maximum', ConnProps, 65535), SessAttrs); - -set_session_attrs({max_inflight, #pstate{zone = Zone}}, SessAttrs) -> - maps:put(max_inflight, emqx_zone:get_env(Zone, max_inflight, 65535), SessAttrs); - -set_session_attrs({expiry_interval, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) -> - maps:put(expiry_interval, get_property('Session-Expiry-Interval', ConnProps, 0), SessAttrs); - -set_session_attrs({expiry_interval, #pstate{zone = Zone, clean_start = CleanStart}}, SessAttrs) -> - maps:put(expiry_interval, case CleanStart of - true -> 0; - false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) - end, SessAttrs); - -set_session_attrs({topic_alias_maximum, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) -> - maps:put(topic_alias_maximum, get_property('Topic-Alias-Maximum', ConnProps, 0), SessAttrs); - -set_session_attrs({topic_alias_maximum, #pstate{zone = Zone}}, SessAttrs) -> - maps:put(topic_alias_maximum, emqx_zone:get_env(Zone, max_topic_alias, 0), SessAttrs); - -set_session_attrs({misc, #pstate{zone = Zone, - client_id = ClientId, - conn_pid = ConnPid, - username = Username, - clean_start = CleanStart}}, SessAttrs) -> - SessAttrs#{zone => Zone, - client_id => ClientId, - conn_pid => ConnPid, - username => Username, - clean_start => CleanStart}; - -set_session_attrs(_, SessAttrs) -> - SessAttrs. - authenticate(Credentials, Password) -> case emqx_access_control:authenticate(Credentials, Password) of ok -> {ok, false}; @@ -978,3 +973,4 @@ reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) -> undefined; reason_codes_compat(PktType, ReasonCodes, _ProtoVer) -> [emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes]. +