fix(sessmem): avoid defining unsafe defaults in `#session`

This commit is contained in:
Andrew Mayorov 2023-12-01 11:06:00 +03:00
parent b5ad0f9815
commit 6255ee0833
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 14 additions and 10 deletions

View File

@ -26,7 +26,7 @@
%% Clients Subscriptions. %% Clients Subscriptions.
subscriptions :: map(), subscriptions :: map(),
%% Max subscriptions allowed %% Max subscriptions allowed
max_subscriptions = infinity :: non_neg_integer() | infinity, max_subscriptions :: non_neg_integer() | infinity,
%% Upgrade QoS? %% Upgrade QoS?
upgrade_qos = false :: boolean(), upgrade_qos = false :: boolean(),
%% Client <- Broker: QoS1/2 messages sent to the client but %% Client <- Broker: QoS1/2 messages sent to the client but
@ -40,14 +40,14 @@
%% Next packet id of the session %% Next packet id of the session
next_pkt_id = 1 :: emqx_types:packet_id(), next_pkt_id = 1 :: emqx_types:packet_id(),
%% Retry interval for redelivering QoS1/2 messages (Unit: millisecond) %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond)
retry_interval = 0 :: timeout(), retry_interval :: timeout(),
%% Client -> Broker: QoS2 messages received from the client, but %% Client -> Broker: QoS2 messages received from the client, but
%% have not been completely acknowledged %% have not been completely acknowledged
awaiting_rel :: map(), awaiting_rel :: map(),
%% Maximum number of awaiting QoS2 messages allowed %% Maximum number of awaiting QoS2 messages allowed
max_awaiting_rel = infinity :: non_neg_integer() | infinity, max_awaiting_rel :: non_neg_integer() | infinity,
%% Awaiting PUBREL Timeout (Unit: millisecond) %% Awaiting PUBREL Timeout (Unit: millisecond)
await_rel_timeout = 0 :: timeout(), await_rel_timeout :: timeout(),
%% Created at %% Created at
created_at :: pos_integer() created_at :: pos_integer()
}). }).

View File

@ -158,7 +158,7 @@ create(
Conf Conf
) -> ) ->
QueueOpts = get_mqueue_conf(Zone), QueueOpts = get_mqueue_conf(Zone),
Session = #session{ #session{
id = emqx_guid:gen(), id = emqx_guid:gen(),
clientid = ClientId, clientid = ClientId,
created_at = erlang:system_time(millisecond), created_at = erlang:system_time(millisecond),
@ -167,9 +167,13 @@ create(
inflight = emqx_inflight:new(ReceiveMax), inflight = emqx_inflight:new(ReceiveMax),
mqueue = emqx_mqueue:init(QueueOpts), mqueue = emqx_mqueue:init(QueueOpts),
next_pkt_id = 1, next_pkt_id = 1,
awaiting_rel = #{} awaiting_rel = #{},
}, max_subscriptions = maps:get(max_subscriptions, Conf),
preserve_conf(Conf, Session). max_awaiting_rel = maps:get(max_awaiting_rel, Conf),
upgrade_qos = maps:get(upgrade_qos, Conf),
retry_interval = maps:get(retry_interval, Conf),
await_rel_timeout = maps:get(await_rel_timeout, Conf)
}.
get_mqueue_conf(Zone) -> get_mqueue_conf(Zone) ->
#{ #{
@ -204,7 +208,7 @@ open(ClientInfo = #{clientid := ClientId}, ConnInfo, Conf) ->
case emqx_cm:takeover_session_end(TakeoverState) of case emqx_cm:takeover_session_end(TakeoverState) of
{ok, Pendings} -> {ok, Pendings} ->
Session1 = resize_inflight(ConnInfo, Session0), Session1 = resize_inflight(ConnInfo, Session0),
Session = preserve_conf(Conf, Session1), Session = apply_conf(Conf, Session1),
clean_session(ClientInfo, Session, Pendings); clean_session(ClientInfo, Session, Pendings);
{error, _} -> {error, _} ->
% TODO log error? % TODO log error?
@ -219,7 +223,7 @@ resize_inflight(#{receive_maximum := ReceiveMax}, Session = #session{inflight =
inflight = emqx_inflight:resize(ReceiveMax, Inflight) inflight = emqx_inflight:resize(ReceiveMax, Inflight)
}. }.
preserve_conf(Conf, Session = #session{}) -> apply_conf(Conf, Session = #session{}) ->
Session#session{ Session#session{
max_subscriptions = maps:get(max_subscriptions, Conf), max_subscriptions = maps:get(max_subscriptions, Conf),
max_awaiting_rel = maps:get(max_awaiting_rel, Conf), max_awaiting_rel = maps:get(max_awaiting_rel, Conf),