Remove 'topic_alias_maximum' from session's state
This commit is contained in:
parent
b7a39f25f2
commit
dc06c0beab
|
@ -619,14 +619,12 @@ maybe_use_username_as_clientid(ClientId, undefined, _PState) ->
|
|||
ClientId;
|
||||
maybe_use_username_as_clientid(ClientId, Username, #pstate{zone = Zone}) ->
|
||||
case emqx_zone:get_env(Zone, use_username_as_clientid, false) of
|
||||
true ->
|
||||
Username;
|
||||
false ->
|
||||
ClientId
|
||||
true -> Username;
|
||||
false -> ClientId
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Assign a clientid
|
||||
%% Assign a clientId
|
||||
|
||||
maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps}) ->
|
||||
ClientId = emqx_guid:to_base62(emqx_guid:gen()),
|
||||
|
@ -650,41 +648,30 @@ try_open_session(PState = #pstate{zone = Zone,
|
|||
clean_start => CleanStart,
|
||||
will_msg => WillMsg
|
||||
},
|
||||
SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]),
|
||||
SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}]),
|
||||
case emqx_sm:open_session(SessAttrs1) of
|
||||
{ok, SPid} ->
|
||||
{ok, SPid, false};
|
||||
Other -> Other
|
||||
end.
|
||||
|
||||
set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
|
||||
maps:put(max_inflight, if
|
||||
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
||||
get_property('Receive-Maximum', ConnProps, 65535);
|
||||
true ->
|
||||
emqx_zone:get_env(Zone, max_inflight, 65535)
|
||||
end, SessAttrs);
|
||||
set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) ->
|
||||
maps:put(expiry_interval, if
|
||||
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
||||
get_property('Session-Expiry-Interval', ConnProps, 0);
|
||||
true ->
|
||||
case CleanStart of
|
||||
true -> 0;
|
||||
false ->
|
||||
emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
|
||||
end
|
||||
end, SessAttrs);
|
||||
set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
|
||||
maps:put(topic_alias_maximum, if
|
||||
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
||||
get_property('Topic-Alias-Maximum', ConnProps, 0);
|
||||
true ->
|
||||
emqx_zone:get_env(Zone, max_topic_alias, 0)
|
||||
end, SessAttrs);
|
||||
set_session_attrs({_, #pstate{}}, SessAttrs) ->
|
||||
SessAttrs.
|
||||
set_session_attrs({max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) ->
|
||||
maps:put(max_inflight, get_property('Receive-Maximum', ConnProps, 65535), SessAttrs);
|
||||
|
||||
set_session_attrs({max_inflight, #pstate{zone = Zone}}, SessAttrs) ->
|
||||
maps:put(max_inflight, emqx_zone:get_env(Zone, max_inflight, 65535), SessAttrs);
|
||||
|
||||
set_session_attrs({expiry_interval, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) ->
|
||||
maps:put(expiry_interval, get_property('Session-Expiry-Interval', ConnProps, 0), SessAttrs);
|
||||
|
||||
set_session_attrs({expiry_interval, #pstate{zone = Zone, clean_start = CleanStart}}, SessAttrs) ->
|
||||
maps:put(expiry_interval, case CleanStart of
|
||||
true -> 0;
|
||||
false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
|
||||
end, SessAttrs);
|
||||
|
||||
set_session_attrs(_, SessAttrs) ->
|
||||
SessAttrs.
|
||||
|
||||
authenticate(Credentials, Password) ->
|
||||
case emqx_access_control:authenticate(Credentials, Password) of
|
||||
|
|
|
@ -147,8 +147,6 @@
|
|||
%% Created at
|
||||
created_at :: erlang:timestamp(),
|
||||
|
||||
topic_alias_maximum :: pos_integer(),
|
||||
|
||||
will_msg :: emqx:message(),
|
||||
|
||||
will_delay_timer :: reference() | undefined
|
||||
|
@ -341,7 +339,6 @@ init([Parent, #{zone := Zone,
|
|||
clean_start := CleanStart,
|
||||
expiry_interval := ExpiryInterval,
|
||||
max_inflight := MaxInflight,
|
||||
topic_alias_maximum := TopicAliasMaximum,
|
||||
will_msg := WillMsg}]) ->
|
||||
emqx_logger:set_metadata_client_id(ClientId),
|
||||
process_flag(trap_exit, true),
|
||||
|
@ -367,7 +364,6 @@ init([Parent, #{zone := Zone,
|
|||
deliver_stats = 0,
|
||||
enqueue_stats = 0,
|
||||
created_at = os:timestamp(),
|
||||
topic_alias_maximum = TopicAliasMaximum,
|
||||
will_msg = WillMsg
|
||||
},
|
||||
emqx_sm:register_session(ClientId, attrs(State)),
|
||||
|
@ -518,22 +514,23 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
|
|||
end;
|
||||
|
||||
%% RESUME:
|
||||
handle_cast({resume, #{conn_pid := ConnPid,
|
||||
will_msg := WillMsg,
|
||||
expiry_interval := SessionExpiryInterval,
|
||||
max_inflight := MaxInflight,
|
||||
topic_alias_maximum := TopicAliasMaximum}}, State = #state{client_id = ClientId,
|
||||
conn_pid = OldConnPid,
|
||||
clean_start = CleanStart,
|
||||
retry_timer = RetryTimer,
|
||||
await_rel_timer = AwaitTimer,
|
||||
expiry_timer = ExpireTimer,
|
||||
will_delay_timer = WillDelayTimer}) ->
|
||||
handle_cast({resume, #{conn_pid := ConnPid,
|
||||
will_msg := WillMsg,
|
||||
expiry_interval := ExpiryInterval,
|
||||
max_inflight := MaxInflight}},
|
||||
State = #state{client_id = ClientId,
|
||||
conn_pid = OldConnPid,
|
||||
clean_start = CleanStart,
|
||||
retry_timer = RetryTimer,
|
||||
await_rel_timer = AwaitTimer,
|
||||
expiry_timer = ExpireTimer,
|
||||
will_delay_timer = WillDelayTimer}) ->
|
||||
|
||||
?LOG(info, "Resumed by connection ~p ", [ConnPid], State),
|
||||
|
||||
%% Cancel Timers
|
||||
lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]),
|
||||
lists:foreach(fun emqx_misc:cancel_timer/1,
|
||||
[RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]),
|
||||
|
||||
case kick(ClientId, OldConnPid, ConnPid) of
|
||||
ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid], State);
|
||||
|
@ -542,19 +539,18 @@ handle_cast({resume, #{conn_pid := ConnPid,
|
|||
|
||||
true = link(ConnPid),
|
||||
|
||||
State1 = State#state{conn_pid = ConnPid,
|
||||
binding = binding(ConnPid),
|
||||
old_conn_pid = OldConnPid,
|
||||
clean_start = false,
|
||||
retry_timer = undefined,
|
||||
awaiting_rel = #{},
|
||||
await_rel_timer = undefined,
|
||||
expiry_timer = undefined,
|
||||
expiry_interval = SessionExpiryInterval,
|
||||
inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight),
|
||||
topic_alias_maximum = TopicAliasMaximum,
|
||||
will_delay_timer = undefined,
|
||||
will_msg = WillMsg},
|
||||
State1 = State#state{conn_pid = ConnPid,
|
||||
binding = binding(ConnPid),
|
||||
old_conn_pid = OldConnPid,
|
||||
clean_start = false,
|
||||
retry_timer = undefined,
|
||||
awaiting_rel = #{},
|
||||
await_rel_timer = undefined,
|
||||
expiry_timer = undefined,
|
||||
expiry_interval = ExpiryInterval,
|
||||
inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight),
|
||||
will_delay_timer = undefined,
|
||||
will_msg = WillMsg},
|
||||
|
||||
%% Clean Session: true -> false???
|
||||
CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
|
||||
|
@ -573,9 +569,10 @@ handle_cast(Msg, State) ->
|
|||
|
||||
%% Batch dispatch
|
||||
handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) ->
|
||||
{noreply, lists:foldl(fun(Msg, NewState) ->
|
||||
element(2, handle_info({dispatch, Topic, Msg}, NewState))
|
||||
end, State, Msgs)};
|
||||
noreply(lists:foldl(
|
||||
fun(Msg, St) ->
|
||||
element(2, handle_info({dispatch, Topic, Msg}, St))
|
||||
end, State, Msgs));
|
||||
|
||||
%% Dispatch message
|
||||
handle_info({dispatch, Topic, Msg = #message{}}, State) ->
|
||||
|
@ -584,12 +581,11 @@ handle_info({dispatch, Topic, Msg = #message{}}, State) ->
|
|||
%% Require ack, but we do not have connection
|
||||
%% negative ack the message so it can try the next subscriber in the group
|
||||
ok = emqx_shared_sub:nack_no_connection(Msg),
|
||||
noreply(State);
|
||||
{noreply, State};
|
||||
false ->
|
||||
handle_dispatch(Topic, Msg, State)
|
||||
noreply(handle_dispatch(Topic, Msg, State))
|
||||
end;
|
||||
|
||||
|
||||
%% Do nothing if the client has been disconnected.
|
||||
handle_info({timeout, Timer, retry_delivery}, State = #state{conn_pid = undefined, retry_timer = Timer}) ->
|
||||
noreply(State#state{retry_timer = undefined});
|
||||
|
@ -663,25 +659,17 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
has_connection(#state{conn_pid = Pid}) -> is_pid(Pid) andalso is_process_alive(Pid).
|
||||
has_connection(#state{conn_pid = Pid}) ->
|
||||
is_pid(Pid) andalso is_process_alive(Pid).
|
||||
|
||||
handle_dispatch(Topic, Msg = #message{headers = Headers},
|
||||
State = #state{subscriptions = SubMap,
|
||||
topic_alias_maximum = TopicAliasMaximum
|
||||
}) ->
|
||||
TopicAlias = maps:get('Topic-Alias', Headers, undefined),
|
||||
if
|
||||
TopicAlias =:= undefined orelse TopicAlias =< TopicAliasMaximum ->
|
||||
noreply(case maps:find(Topic, SubMap) of
|
||||
{ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} ->
|
||||
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State);
|
||||
{ok, #{nl := Nl, qos := QoS, rap := Rap}} ->
|
||||
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State);
|
||||
error ->
|
||||
dispatch(emqx_message:unset_flag(dup, Msg), State)
|
||||
end);
|
||||
true ->
|
||||
noreply(State)
|
||||
handle_dispatch(Topic, Msg, State = #state{subscriptions = SubMap}) ->
|
||||
case maps:find(Topic, SubMap) of
|
||||
{ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} ->
|
||||
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State);
|
||||
{ok, #{nl := Nl, qos := QoS, rap := Rap}} ->
|
||||
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State);
|
||||
error ->
|
||||
dispatch(emqx_message:unset_flag(dup, Msg), State)
|
||||
end.
|
||||
|
||||
suback(_From, undefined, _ReasonCodes) ->
|
||||
|
@ -1011,3 +999,4 @@ noreply(State) ->
|
|||
|
||||
shutdown(Reason, State) ->
|
||||
{stop, {shutdown, Reason}, State}.
|
||||
|
||||
|
|
Loading…
Reference in New Issue