diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index b8b7a5b3a..5f919d9c0 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -44,7 +44,7 @@ -define(PUBCAP_KEYS, [max_qos_allowed, mqtt_retain_available, - mqtt_topic_alias + max_topic_alias ]). -define(SUBCAP_KEYS, [max_qos_allowed, max_topic_levels, diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 753dc099b..88f07138d 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -574,13 +574,11 @@ maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps}) maybe_assign_client_id(PState) -> PState. -try_open_session(#pstate{zone = Zone, - proto_ver = ProtoVer, - client_id = ClientId, - conn_pid = ConnPid, - conn_props = ConnProps, - username = Username, - clean_start = CleanStart}) -> +try_open_session(PState = #pstate{zone = Zone, + client_id = ClientId, + conn_pid = ConnPid, + username = Username, + clean_start = CleanStart}) -> SessAttrs = #{ zone => Zone, @@ -590,30 +588,42 @@ try_open_session(#pstate{zone = Zone, clean_start => CleanStart }, - MaxInflight = #{max_inflight => if - ProtoVer =:= ?MQTT_PROTO_V5 -> - maps:get('Receive-Maximum', ConnProps, 65535); - true -> - emqx_zone:get_env(Zone, max_inflight, 65535) - end}, - SessionExpiryInterval = #{expiry_interval => if - ProtoVer =:= ?MQTT_PROTO_V5 -> - maps:get('Session-Expiry-Interval', ConnProps, 0); - true -> - case CleanStart of - true -> - 0; - false -> - emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) - end - end}, - - case emqx_sm:open_session(maps:merge(SessAttrs, maps:merge(MaxInflight, SessionExpiryInterval))) of + SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]), + case emqx_sm:open_session(SessAttrs1) of {ok, SPid} -> {ok, SPid, false}; Other -> Other end. +set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) -> + maps:put(max_inflight, if + ProtoVer =:= ?MQTT_PROTO_V5 -> + maps:get('Receive-Maximum', ConnProps, 65535); + true -> + emqx_zone:get_env(Zone, max_inflight, 65535) + end, SessAttrs); +set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) -> + maps:put(expiry_interval, if + ProtoVer =:= ?MQTT_PROTO_V5 -> + maps:get('Session-Expiry-Interval', ConnProps, 0); + true -> + case CleanStart of + true -> 0; + false -> + emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) + end + end, SessAttrs); +set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) -> + maps:put(topic_alias_maximum, if + ProtoVer =:= ?MQTT_PROTO_V5 -> + maps:get('Topic-Alias-Maximum', ConnProps, 0); + true -> + emqx_zone:get_env(Zone, max_topic_alias, 0) + end, SessAttrs); +set_session_attrs({_, #pstate{}}, SessAttrs) -> + SessAttrs. + + authenticate(Credentials, Password) -> case emqx_access_control:authenticate(Credentials, Password) of ok -> {ok, false}; diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 4d570bc08..4514debcb 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -47,7 +47,7 @@ -export([info/1, attrs/1]). -export([stats/1]). -export([resume/2, discard/2]). --export([update_expiry_interval/2, update_max_inflight/2]). +-export([update_expiry_interval/2, update_misc/2]). -export([subscribe/2, subscribe/4]). -export([publish/3]). -export([puback/2, puback/3]). @@ -145,7 +145,9 @@ enqueue_stats = 0, %% Created at - created_at :: erlang:timestamp() + created_at :: erlang:timestamp(), + + topic_alias_maximum :: pos_integer() }). -type(spid() :: pid()). @@ -318,8 +320,8 @@ discard(SPid, ByPid) -> update_expiry_interval(SPid, Interval) -> gen_server:cast(SPid, {expiry_interval, Interval * 1000}). -update_max_inflight(SPid, MaxInflight) -> - gen_server:cast(SPid, {max_inflight, MaxInflight}). +update_misc(SPid, Misc) -> + gen_server:cast(SPid, {update_misc, Misc}). -spec(close(spid()) -> ok). close(SPid) -> @@ -329,36 +331,38 @@ close(SPid) -> %% gen_server callbacks %%------------------------------------------------------------------------------ -init([Parent, #{zone := Zone, - client_id := ClientId, - username := Username, - conn_pid := ConnPid, - clean_start := CleanStart, - expiry_interval := ExpiryInterval, - max_inflight := MaxInflight}]) -> +init([Parent, #{zone := Zone, + client_id := ClientId, + username := Username, + conn_pid := ConnPid, + clean_start := CleanStart, + expiry_interval := ExpiryInterval, + max_inflight := MaxInflight, + topic_alias_maximum := TopicAliasMaximum}]) -> process_flag(trap_exit, true), true = link(ConnPid), IdleTimout = get_env(Zone, idle_timeout, 30000), - State = #state{idle_timeout = IdleTimout, - clean_start = CleanStart, - binding = binding(ConnPid), - client_id = ClientId, - username = Username, - conn_pid = ConnPid, - subscriptions = #{}, - max_subscriptions = get_env(Zone, max_subscriptions, 0), - upgrade_qos = get_env(Zone, upgrade_qos, false), - inflight = emqx_inflight:new(MaxInflight), - mqueue = init_mqueue(Zone), - retry_interval = get_env(Zone, retry_interval, 0), - awaiting_rel = #{}, - await_rel_timeout = get_env(Zone, await_rel_timeout), - max_awaiting_rel = get_env(Zone, max_awaiting_rel), - expiry_interval = ExpiryInterval, - enable_stats = get_env(Zone, enable_stats, true), - deliver_stats = 0, - enqueue_stats = 0, - created_at = os:timestamp() + State = #state{idle_timeout = IdleTimout, + clean_start = CleanStart, + binding = binding(ConnPid), + client_id = ClientId, + username = Username, + conn_pid = ConnPid, + subscriptions = #{}, + max_subscriptions = get_env(Zone, max_subscriptions, 0), + upgrade_qos = get_env(Zone, upgrade_qos, false), + inflight = emqx_inflight:new(MaxInflight), + mqueue = init_mqueue(Zone), + retry_interval = get_env(Zone, retry_interval, 0), + awaiting_rel = #{}, + await_rel_timeout = get_env(Zone, await_rel_timeout), + max_awaiting_rel = get_env(Zone, max_awaiting_rel), + expiry_interval = ExpiryInterval, + enable_stats = get_env(Zone, enable_stats, true), + deliver_stats = 0, + enqueue_stats = 0, + created_at = os:timestamp(), + topic_alias_maximum = TopicAliasMaximum }, emqx_sm:register_session(ClientId, attrs(State)), emqx_sm:set_session_stats(ClientId, stats(State)), @@ -546,8 +550,9 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, handle_cast({expiry_interval, Interval}, State) -> {noreply, State#state{expiry_interval = Interval}}; -handle_cast({max_inflight, MaxInflight}, State) -> - {noreply, State#state{inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight)}}; +handle_cast({update_misc, #{max_inflight := MaxInflight, topic_alias_maximum := TopicAliasMaximum}}, State) -> + {noreply, State#state{inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight), + topic_alias_maximum = TopicAliasMaximum}}; handle_cast(Msg, State) -> emqx_logger:error("[Session] unexpected cast: ~p", [Msg]), @@ -560,15 +565,22 @@ handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> end, State, Msgs)}; %% Dispatch message -handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) -> - noreply(case maps:find(Topic, SubMap) of - {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> - run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State); - {ok, #{nl := Nl, qos := QoS, rap := Rap}} -> - run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State); - error -> - dispatch(emqx_message:unset_flag(dup, Msg), State) - end); +handle_info({dispatch, Topic, Msg = #message{headers = Headers}}, + State = #state{subscriptions = SubMap, topic_alias_maximum = TopicAliasMaximum}) when is_record(Msg, message) -> + TopicAlias = maps:get('Topic-Alias', Headers, undefined), + if + TopicAlias =:= undefined orelse TopicAlias =< TopicAliasMaximum -> + noreply(case maps:find(Topic, SubMap) of + {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> + run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State); + {ok, #{nl := Nl, qos := QoS, rap := Rap}} -> + run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State); + error -> + dispatch(emqx_message:unset_flag(dup, Msg), State) + end); + true -> + noreply(State) + end; %% Do nothing if the client has been disconnected. handle_info({timeout, Timer, retry_delivery}, State = #state{conn_pid = undefined, retry_timer = Timer}) -> diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 98046823f..1fa0b488b 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -56,11 +56,15 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid end, emqx_sm_locker:trans(ClientId, CleanStart); -open_session(SessAttrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid, max_inflight := MaxInflight}) -> +open_session(SessAttrs = #{clean_start := false, + client_id := ClientId, + conn_pid := ConnPid, + max_inflight := MaxInflight, + topic_alias_maximum := TopicAliasMaximum}) -> ResumeStart = fun(_) -> case resume_session(ClientId, ConnPid) of {ok, SPid} -> - emqx_session:update_max_inflight(SPid, MaxInflight), + emqx_session:update_misc(SPid, #{max_inflight => MaxInflight, topic_alias_maximum => TopicAliasMaximum}), {ok, SPid, true}; {error, not_found} -> emqx_session_sup:start_session(SessAttrs) diff --git a/test/emqx_mock_client.erl b/test/emqx_mock_client.erl index 0aa01458e..4a49a1fc5 100644 --- a/test/emqx_mock_client.erl +++ b/test/emqx_mock_client.erl @@ -52,7 +52,8 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> clean_start => true, username => undefined, expiry_interval => 0, - max_inflight => 0 + max_inflight => 0, + topic_alias_maximum => 0 }, {ok, SessPid} = emqx_sm:open_session(Attrs), {reply, {ok, SessPid}, diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index d5751f9bb..919be5218 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -44,7 +44,8 @@ t_get_set_caps(_) -> end, PubCaps = #{ max_qos_allowed => ?QOS_2, - mqtt_retain_available => true + mqtt_retain_available => true, + max_topic_alias => 0 }, PubCaps = emqx_mqtt_caps:get_caps(zone, publish), NewPubCaps = PubCaps#{max_qos_allowed => ?QOS_1}, diff --git a/test/emqx_sm_SUITE.erl b/test/emqx_sm_SUITE.erl index 1f85d4724..110e13026 100644 --- a/test/emqx_sm_SUITE.erl +++ b/test/emqx_sm_SUITE.erl @@ -25,7 +25,7 @@ t_open_close_session(_) -> emqx_ct_broker_helpers:run_setup_steps(), {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>), Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid, - zone => internal, username => <<"zhou">>, expiry_interval => 0, max_inflight => 0}, + zone => internal, username => <<"zhou">>, expiry_interval => 0, max_inflight => 0, topic_alias_maximum => 0}, {ok, SPid} = emqx_sm:open_session(Attrs), [{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>), SPid = emqx_sm:lookup_session_pid(<<"client">>),