Improve topic alias maximum in connect packet
This commit is contained in:
parent
064db65206
commit
2d354ca883
|
@ -44,7 +44,7 @@
|
||||||
|
|
||||||
-define(PUBCAP_KEYS, [max_qos_allowed,
|
-define(PUBCAP_KEYS, [max_qos_allowed,
|
||||||
mqtt_retain_available,
|
mqtt_retain_available,
|
||||||
mqtt_topic_alias
|
max_topic_alias
|
||||||
]).
|
]).
|
||||||
-define(SUBCAP_KEYS, [max_qos_allowed,
|
-define(SUBCAP_KEYS, [max_qos_allowed,
|
||||||
max_topic_levels,
|
max_topic_levels,
|
||||||
|
|
|
@ -574,13 +574,11 @@ maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps})
|
||||||
maybe_assign_client_id(PState) ->
|
maybe_assign_client_id(PState) ->
|
||||||
PState.
|
PState.
|
||||||
|
|
||||||
try_open_session(#pstate{zone = Zone,
|
try_open_session(PState = #pstate{zone = Zone,
|
||||||
proto_ver = ProtoVer,
|
client_id = ClientId,
|
||||||
client_id = ClientId,
|
conn_pid = ConnPid,
|
||||||
conn_pid = ConnPid,
|
username = Username,
|
||||||
conn_props = ConnProps,
|
clean_start = CleanStart}) ->
|
||||||
username = Username,
|
|
||||||
clean_start = CleanStart}) ->
|
|
||||||
|
|
||||||
SessAttrs = #{
|
SessAttrs = #{
|
||||||
zone => Zone,
|
zone => Zone,
|
||||||
|
@ -590,30 +588,42 @@ try_open_session(#pstate{zone = Zone,
|
||||||
clean_start => CleanStart
|
clean_start => CleanStart
|
||||||
},
|
},
|
||||||
|
|
||||||
MaxInflight = #{max_inflight => if
|
SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]),
|
||||||
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
case emqx_sm:open_session(SessAttrs1) of
|
||||||
maps:get('Receive-Maximum', ConnProps, 65535);
|
|
||||||
true ->
|
|
||||||
emqx_zone:get_env(Zone, max_inflight, 65535)
|
|
||||||
end},
|
|
||||||
SessionExpiryInterval = #{expiry_interval => if
|
|
||||||
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
|
||||||
maps:get('Session-Expiry-Interval', ConnProps, 0);
|
|
||||||
true ->
|
|
||||||
case CleanStart of
|
|
||||||
true ->
|
|
||||||
0;
|
|
||||||
false ->
|
|
||||||
emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
|
|
||||||
end
|
|
||||||
end},
|
|
||||||
|
|
||||||
case emqx_sm:open_session(maps:merge(SessAttrs, maps:merge(MaxInflight, SessionExpiryInterval))) of
|
|
||||||
{ok, SPid} ->
|
{ok, SPid} ->
|
||||||
{ok, SPid, false};
|
{ok, SPid, false};
|
||||||
Other -> Other
|
Other -> Other
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
|
||||||
|
maps:put(max_inflight, if
|
||||||
|
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
||||||
|
maps:get('Receive-Maximum', ConnProps, 65535);
|
||||||
|
true ->
|
||||||
|
emqx_zone:get_env(Zone, max_inflight, 65535)
|
||||||
|
end, SessAttrs);
|
||||||
|
set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) ->
|
||||||
|
maps:put(expiry_interval, if
|
||||||
|
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
||||||
|
maps:get('Session-Expiry-Interval', ConnProps, 0);
|
||||||
|
true ->
|
||||||
|
case CleanStart of
|
||||||
|
true -> 0;
|
||||||
|
false ->
|
||||||
|
emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
|
||||||
|
end
|
||||||
|
end, SessAttrs);
|
||||||
|
set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
|
||||||
|
maps:put(topic_alias_maximum, if
|
||||||
|
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
||||||
|
maps:get('Topic-Alias-Maximum', ConnProps, 0);
|
||||||
|
true ->
|
||||||
|
emqx_zone:get_env(Zone, max_topic_alias, 0)
|
||||||
|
end, SessAttrs);
|
||||||
|
set_session_attrs({_, #pstate{}}, SessAttrs) ->
|
||||||
|
SessAttrs.
|
||||||
|
|
||||||
|
|
||||||
authenticate(Credentials, Password) ->
|
authenticate(Credentials, Password) ->
|
||||||
case emqx_access_control:authenticate(Credentials, Password) of
|
case emqx_access_control:authenticate(Credentials, Password) of
|
||||||
ok -> {ok, false};
|
ok -> {ok, false};
|
||||||
|
|
|
@ -47,7 +47,7 @@
|
||||||
-export([info/1, attrs/1]).
|
-export([info/1, attrs/1]).
|
||||||
-export([stats/1]).
|
-export([stats/1]).
|
||||||
-export([resume/2, discard/2]).
|
-export([resume/2, discard/2]).
|
||||||
-export([update_expiry_interval/2, update_max_inflight/2]).
|
-export([update_expiry_interval/2, update_misc/2]).
|
||||||
-export([subscribe/2, subscribe/4]).
|
-export([subscribe/2, subscribe/4]).
|
||||||
-export([publish/3]).
|
-export([publish/3]).
|
||||||
-export([puback/2, puback/3]).
|
-export([puback/2, puback/3]).
|
||||||
|
@ -145,7 +145,9 @@
|
||||||
enqueue_stats = 0,
|
enqueue_stats = 0,
|
||||||
|
|
||||||
%% Created at
|
%% Created at
|
||||||
created_at :: erlang:timestamp()
|
created_at :: erlang:timestamp(),
|
||||||
|
|
||||||
|
topic_alias_maximum :: pos_integer()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type(spid() :: pid()).
|
-type(spid() :: pid()).
|
||||||
|
@ -318,8 +320,8 @@ discard(SPid, ByPid) ->
|
||||||
update_expiry_interval(SPid, Interval) ->
|
update_expiry_interval(SPid, Interval) ->
|
||||||
gen_server:cast(SPid, {expiry_interval, Interval * 1000}).
|
gen_server:cast(SPid, {expiry_interval, Interval * 1000}).
|
||||||
|
|
||||||
update_max_inflight(SPid, MaxInflight) ->
|
update_misc(SPid, Misc) ->
|
||||||
gen_server:cast(SPid, {max_inflight, MaxInflight}).
|
gen_server:cast(SPid, {update_misc, Misc}).
|
||||||
|
|
||||||
-spec(close(spid()) -> ok).
|
-spec(close(spid()) -> ok).
|
||||||
close(SPid) ->
|
close(SPid) ->
|
||||||
|
@ -329,36 +331,38 @@ close(SPid) ->
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
init([Parent, #{zone := Zone,
|
init([Parent, #{zone := Zone,
|
||||||
client_id := ClientId,
|
client_id := ClientId,
|
||||||
username := Username,
|
username := Username,
|
||||||
conn_pid := ConnPid,
|
conn_pid := ConnPid,
|
||||||
clean_start := CleanStart,
|
clean_start := CleanStart,
|
||||||
expiry_interval := ExpiryInterval,
|
expiry_interval := ExpiryInterval,
|
||||||
max_inflight := MaxInflight}]) ->
|
max_inflight := MaxInflight,
|
||||||
|
topic_alias_maximum := TopicAliasMaximum}]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
true = link(ConnPid),
|
true = link(ConnPid),
|
||||||
IdleTimout = get_env(Zone, idle_timeout, 30000),
|
IdleTimout = get_env(Zone, idle_timeout, 30000),
|
||||||
State = #state{idle_timeout = IdleTimout,
|
State = #state{idle_timeout = IdleTimout,
|
||||||
clean_start = CleanStart,
|
clean_start = CleanStart,
|
||||||
binding = binding(ConnPid),
|
binding = binding(ConnPid),
|
||||||
client_id = ClientId,
|
client_id = ClientId,
|
||||||
username = Username,
|
username = Username,
|
||||||
conn_pid = ConnPid,
|
conn_pid = ConnPid,
|
||||||
subscriptions = #{},
|
subscriptions = #{},
|
||||||
max_subscriptions = get_env(Zone, max_subscriptions, 0),
|
max_subscriptions = get_env(Zone, max_subscriptions, 0),
|
||||||
upgrade_qos = get_env(Zone, upgrade_qos, false),
|
upgrade_qos = get_env(Zone, upgrade_qos, false),
|
||||||
inflight = emqx_inflight:new(MaxInflight),
|
inflight = emqx_inflight:new(MaxInflight),
|
||||||
mqueue = init_mqueue(Zone),
|
mqueue = init_mqueue(Zone),
|
||||||
retry_interval = get_env(Zone, retry_interval, 0),
|
retry_interval = get_env(Zone, retry_interval, 0),
|
||||||
awaiting_rel = #{},
|
awaiting_rel = #{},
|
||||||
await_rel_timeout = get_env(Zone, await_rel_timeout),
|
await_rel_timeout = get_env(Zone, await_rel_timeout),
|
||||||
max_awaiting_rel = get_env(Zone, max_awaiting_rel),
|
max_awaiting_rel = get_env(Zone, max_awaiting_rel),
|
||||||
expiry_interval = ExpiryInterval,
|
expiry_interval = ExpiryInterval,
|
||||||
enable_stats = get_env(Zone, enable_stats, true),
|
enable_stats = get_env(Zone, enable_stats, true),
|
||||||
deliver_stats = 0,
|
deliver_stats = 0,
|
||||||
enqueue_stats = 0,
|
enqueue_stats = 0,
|
||||||
created_at = os:timestamp()
|
created_at = os:timestamp(),
|
||||||
|
topic_alias_maximum = TopicAliasMaximum
|
||||||
},
|
},
|
||||||
emqx_sm:register_session(ClientId, attrs(State)),
|
emqx_sm:register_session(ClientId, attrs(State)),
|
||||||
emqx_sm:set_session_stats(ClientId, stats(State)),
|
emqx_sm:set_session_stats(ClientId, stats(State)),
|
||||||
|
@ -546,8 +550,9 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
|
||||||
handle_cast({expiry_interval, Interval}, State) ->
|
handle_cast({expiry_interval, Interval}, State) ->
|
||||||
{noreply, State#state{expiry_interval = Interval}};
|
{noreply, State#state{expiry_interval = Interval}};
|
||||||
|
|
||||||
handle_cast({max_inflight, MaxInflight}, State) ->
|
handle_cast({update_misc, #{max_inflight := MaxInflight, topic_alias_maximum := TopicAliasMaximum}}, State) ->
|
||||||
{noreply, State#state{inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight)}};
|
{noreply, State#state{inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight),
|
||||||
|
topic_alias_maximum = TopicAliasMaximum}};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
|
emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
|
||||||
|
@ -560,15 +565,22 @@ handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) ->
|
||||||
end, State, Msgs)};
|
end, State, Msgs)};
|
||||||
|
|
||||||
%% Dispatch message
|
%% Dispatch message
|
||||||
handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) ->
|
handle_info({dispatch, Topic, Msg = #message{headers = Headers}},
|
||||||
noreply(case maps:find(Topic, SubMap) of
|
State = #state{subscriptions = SubMap, topic_alias_maximum = TopicAliasMaximum}) when is_record(Msg, message) ->
|
||||||
{ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} ->
|
TopicAlias = maps:get('Topic-Alias', Headers, undefined),
|
||||||
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State);
|
if
|
||||||
{ok, #{nl := Nl, qos := QoS, rap := Rap}} ->
|
TopicAlias =:= undefined orelse TopicAlias =< TopicAliasMaximum ->
|
||||||
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State);
|
noreply(case maps:find(Topic, SubMap) of
|
||||||
error ->
|
{ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} ->
|
||||||
dispatch(emqx_message:unset_flag(dup, Msg), State)
|
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State);
|
||||||
end);
|
{ok, #{nl := Nl, qos := QoS, rap := Rap}} ->
|
||||||
|
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State);
|
||||||
|
error ->
|
||||||
|
dispatch(emqx_message:unset_flag(dup, Msg), State)
|
||||||
|
end);
|
||||||
|
true ->
|
||||||
|
noreply(State)
|
||||||
|
end;
|
||||||
|
|
||||||
%% Do nothing if the client has been disconnected.
|
%% Do nothing if the client has been disconnected.
|
||||||
handle_info({timeout, Timer, retry_delivery}, State = #state{conn_pid = undefined, retry_timer = Timer}) ->
|
handle_info({timeout, Timer, retry_delivery}, State = #state{conn_pid = undefined, retry_timer = Timer}) ->
|
||||||
|
|
|
@ -56,11 +56,15 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid
|
||||||
end,
|
end,
|
||||||
emqx_sm_locker:trans(ClientId, CleanStart);
|
emqx_sm_locker:trans(ClientId, CleanStart);
|
||||||
|
|
||||||
open_session(SessAttrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid, max_inflight := MaxInflight}) ->
|
open_session(SessAttrs = #{clean_start := false,
|
||||||
|
client_id := ClientId,
|
||||||
|
conn_pid := ConnPid,
|
||||||
|
max_inflight := MaxInflight,
|
||||||
|
topic_alias_maximum := TopicAliasMaximum}) ->
|
||||||
ResumeStart = fun(_) ->
|
ResumeStart = fun(_) ->
|
||||||
case resume_session(ClientId, ConnPid) of
|
case resume_session(ClientId, ConnPid) of
|
||||||
{ok, SPid} ->
|
{ok, SPid} ->
|
||||||
emqx_session:update_max_inflight(SPid, MaxInflight),
|
emqx_session:update_misc(SPid, #{max_inflight => MaxInflight, topic_alias_maximum => TopicAliasMaximum}),
|
||||||
{ok, SPid, true};
|
{ok, SPid, true};
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
emqx_session_sup:start_session(SessAttrs)
|
emqx_session_sup:start_session(SessAttrs)
|
||||||
|
|
|
@ -52,7 +52,8 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
|
||||||
clean_start => true,
|
clean_start => true,
|
||||||
username => undefined,
|
username => undefined,
|
||||||
expiry_interval => 0,
|
expiry_interval => 0,
|
||||||
max_inflight => 0
|
max_inflight => 0,
|
||||||
|
topic_alias_maximum => 0
|
||||||
},
|
},
|
||||||
{ok, SessPid} = emqx_sm:open_session(Attrs),
|
{ok, SessPid} = emqx_sm:open_session(Attrs),
|
||||||
{reply, {ok, SessPid},
|
{reply, {ok, SessPid},
|
||||||
|
|
|
@ -44,7 +44,8 @@ t_get_set_caps(_) ->
|
||||||
end,
|
end,
|
||||||
PubCaps = #{
|
PubCaps = #{
|
||||||
max_qos_allowed => ?QOS_2,
|
max_qos_allowed => ?QOS_2,
|
||||||
mqtt_retain_available => true
|
mqtt_retain_available => true,
|
||||||
|
max_topic_alias => 0
|
||||||
},
|
},
|
||||||
PubCaps = emqx_mqtt_caps:get_caps(zone, publish),
|
PubCaps = emqx_mqtt_caps:get_caps(zone, publish),
|
||||||
NewPubCaps = PubCaps#{max_qos_allowed => ?QOS_1},
|
NewPubCaps = PubCaps#{max_qos_allowed => ?QOS_1},
|
||||||
|
|
|
@ -25,7 +25,7 @@ t_open_close_session(_) ->
|
||||||
emqx_ct_broker_helpers:run_setup_steps(),
|
emqx_ct_broker_helpers:run_setup_steps(),
|
||||||
{ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
|
{ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
|
||||||
Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid,
|
Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid,
|
||||||
zone => internal, username => <<"zhou">>, expiry_interval => 0, max_inflight => 0},
|
zone => internal, username => <<"zhou">>, expiry_interval => 0, max_inflight => 0, topic_alias_maximum => 0},
|
||||||
{ok, SPid} = emqx_sm:open_session(Attrs),
|
{ok, SPid} = emqx_sm:open_session(Attrs),
|
||||||
[{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
|
[{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
|
||||||
SPid = emqx_sm:lookup_session_pid(<<"client">>),
|
SPid = emqx_sm:lookup_session_pid(<<"client">>),
|
||||||
|
|
Loading…
Reference in New Issue