Improve the 'try_open_session' function
This commit is contained in:
parent
bb9c41c9f0
commit
d5b17c516e
|
@ -21,6 +21,7 @@
|
||||||
-export([init/2]).
|
-export([init/2]).
|
||||||
-export([info/1]).
|
-export([info/1]).
|
||||||
-export([attrs/1]).
|
-export([attrs/1]).
|
||||||
|
-export([attr/2]).
|
||||||
-export([caps/1]).
|
-export([caps/1]).
|
||||||
-export([stats/1]).
|
-export([stats/1]).
|
||||||
-export([client_id/1]).
|
-export([client_id/1]).
|
||||||
|
@ -162,6 +163,28 @@ attrs(#pstate{zone = Zone,
|
||||||
{is_bridge, IsBridge},
|
{is_bridge, IsBridge},
|
||||||
{connected_at, ConnectedAt}].
|
{connected_at, ConnectedAt}].
|
||||||
|
|
||||||
|
attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
|
||||||
|
get_property('Receive-Maximum', ConnProps, 65535);
|
||||||
|
attr(max_inflight, #pstate{zone = Zone}) ->
|
||||||
|
emqx_zone:get_env(Zone, max_inflight, 65535);
|
||||||
|
attr(expiry_interval, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
|
||||||
|
get_property('Session-Expiry-Interval', ConnProps, 0);
|
||||||
|
attr(expiry_interval, #pstate{zone = Zone, clean_start = CleanStart}) ->
|
||||||
|
case CleanStart of
|
||||||
|
true -> 0;
|
||||||
|
false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
|
||||||
|
end;
|
||||||
|
attr(topic_alias_maximum, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
|
||||||
|
get_property('Topic-Alias-Maximum', ConnProps, 0);
|
||||||
|
attr(topic_alias_maximum, #pstate{zone = Zone}) ->
|
||||||
|
emqx_zone:get_env(Zone, max_topic_alias, 0);
|
||||||
|
attr(Name, PState) ->
|
||||||
|
Attrs = lists:zip(record_info(fields, pstate), tl(tuple_to_list(PState))),
|
||||||
|
case lists:keyfind(Name, 1, Attrs) of
|
||||||
|
{_, Value} -> Value;
|
||||||
|
false -> undefined
|
||||||
|
end.
|
||||||
|
|
||||||
caps(#pstate{zone = Zone}) ->
|
caps(#pstate{zone = Zone}) ->
|
||||||
emqx_mqtt_caps:get_caps(Zone).
|
emqx_mqtt_caps:get_caps(Zone).
|
||||||
|
|
||||||
|
@ -348,8 +371,8 @@ process_packet(?CONNECT_PACKET(
|
||||||
PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}),
|
PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}),
|
||||||
emqx_logger:set_metadata_client_id(PState3#pstate.client_id),
|
emqx_logger:set_metadata_client_id(PState3#pstate.client_id),
|
||||||
%% Open session
|
%% Open session
|
||||||
SessAttrs = lists:foldl(fun set_session_attrs/2, #{will_msg => make_will_msg(ConnPkt)}, [{max_inflight, PState3}, {expiry_interval, PState3}, {misc, PState3}]),
|
SessAttrs = #{will_msg => make_will_msg(ConnPkt)},
|
||||||
case try_open_session(SessAttrs) of
|
case try_open_session(SessAttrs, PState3) of
|
||||||
{ok, SPid, SP} ->
|
{ok, SPid, SP} ->
|
||||||
PState4 = PState3#pstate{session = SPid, connected = true},
|
PState4 = PState3#pstate{session = SPid, connected = true},
|
||||||
ok = emqx_cm:register_connection(client_id(PState4)),
|
ok = emqx_cm:register_connection(client_id(PState4)),
|
||||||
|
@ -673,54 +696,26 @@ maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps})
|
||||||
maybe_assign_client_id(PState) ->
|
maybe_assign_client_id(PState) ->
|
||||||
PState.
|
PState.
|
||||||
|
|
||||||
try_open_session(SessAttrs = #{zone := _,
|
try_open_session(SessAttrs, PState = #pstate{zone = Zone,
|
||||||
client_id := _,
|
client_id = ClientId,
|
||||||
conn_pid := _,
|
conn_pid = ConnPid,
|
||||||
username := _,
|
username = Username,
|
||||||
will_msg := _,
|
clean_start = CleanStart}) ->
|
||||||
clean_start := _}) ->
|
case emqx_sm:open_session(
|
||||||
case emqx_sm:open_session(SessAttrs) of
|
maps:merge(#{zone => Zone,
|
||||||
|
client_id => ClientId,
|
||||||
|
conn_pid => ConnPid,
|
||||||
|
username => Username,
|
||||||
|
clean_start => CleanStart,
|
||||||
|
max_inflight => attr(max_inflight, PState),
|
||||||
|
expiry_interval => attr(expiry_interval, PState),
|
||||||
|
topic_alias_maximum => attr(topic_alias_maximum, PState)},
|
||||||
|
SessAttrs)) of
|
||||||
{ok, SPid} ->
|
{ok, SPid} ->
|
||||||
{ok, SPid, false};
|
{ok, SPid, false};
|
||||||
Other -> Other
|
Other -> Other
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
set_session_attrs({max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) ->
|
|
||||||
maps:put(max_inflight, get_property('Receive-Maximum', ConnProps, 65535), SessAttrs);
|
|
||||||
|
|
||||||
set_session_attrs({max_inflight, #pstate{zone = Zone}}, SessAttrs) ->
|
|
||||||
maps:put(max_inflight, emqx_zone:get_env(Zone, max_inflight, 65535), SessAttrs);
|
|
||||||
|
|
||||||
set_session_attrs({expiry_interval, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) ->
|
|
||||||
maps:put(expiry_interval, get_property('Session-Expiry-Interval', ConnProps, 0), SessAttrs);
|
|
||||||
|
|
||||||
set_session_attrs({expiry_interval, #pstate{zone = Zone, clean_start = CleanStart}}, SessAttrs) ->
|
|
||||||
maps:put(expiry_interval, case CleanStart of
|
|
||||||
true -> 0;
|
|
||||||
false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
|
|
||||||
end, SessAttrs);
|
|
||||||
|
|
||||||
set_session_attrs({topic_alias_maximum, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) ->
|
|
||||||
maps:put(topic_alias_maximum, get_property('Topic-Alias-Maximum', ConnProps, 0), SessAttrs);
|
|
||||||
|
|
||||||
set_session_attrs({topic_alias_maximum, #pstate{zone = Zone}}, SessAttrs) ->
|
|
||||||
maps:put(topic_alias_maximum, emqx_zone:get_env(Zone, max_topic_alias, 0), SessAttrs);
|
|
||||||
|
|
||||||
set_session_attrs({misc, #pstate{zone = Zone,
|
|
||||||
client_id = ClientId,
|
|
||||||
conn_pid = ConnPid,
|
|
||||||
username = Username,
|
|
||||||
clean_start = CleanStart}}, SessAttrs) ->
|
|
||||||
SessAttrs#{zone => Zone,
|
|
||||||
client_id => ClientId,
|
|
||||||
conn_pid => ConnPid,
|
|
||||||
username => Username,
|
|
||||||
clean_start => CleanStart};
|
|
||||||
|
|
||||||
set_session_attrs(_, SessAttrs) ->
|
|
||||||
SessAttrs.
|
|
||||||
|
|
||||||
authenticate(Credentials, Password) ->
|
authenticate(Credentials, Password) ->
|
||||||
case emqx_access_control:authenticate(Credentials, Password) of
|
case emqx_access_control:authenticate(Credentials, Password) of
|
||||||
ok -> {ok, false};
|
ok -> {ok, false};
|
||||||
|
@ -978,3 +973,4 @@ reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) ->
|
||||||
undefined;
|
undefined;
|
||||||
reason_codes_compat(PktType, ReasonCodes, _ProtoVer) ->
|
reason_codes_compat(PktType, ReasonCodes, _ProtoVer) ->
|
||||||
[emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes].
|
[emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes].
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue