diff --git a/src/emqx_inflight.erl b/src/emqx_inflight.erl index 3353983d8..876052974 100644 --- a/src/emqx_inflight.erl +++ b/src/emqx_inflight.erl @@ -14,7 +14,7 @@ -module(emqx_inflight). --export([new/1, contain/2, lookup/2, insert/3, update/3, delete/2, values/1, +-export([new/1, contain/2, lookup/2, insert/3, update/3, update_size/2, delete/2, values/1, to_list/1, size/1, max_size/1, is_full/1, is_empty/1, window/1]). -type(max_size() :: pos_integer()). @@ -46,6 +46,10 @@ delete(Key, {?MODULE, MaxSize, Tree}) -> update(Key, Val, {?MODULE, MaxSize, Tree}) -> {?MODULE, MaxSize, gb_trees:update(Key, Val, Tree)}. +-spec(update_size(integer(), inflight()) -> inflight()). +update_size(MaxSize, {?MODULE, _OldMaxSize, Tree}) -> + {?MODULE, MaxSize, Tree}. + -spec(is_full(inflight()) -> boolean()). is_full({?MODULE, 0, _Tree}) -> false; diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index fc90cf492..c1bbbb6fd 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -61,6 +61,11 @@ validate(?PUBLISH_PACKET(_QoS, Topic, _, Properties, _)) -> ((not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid)) andalso validate_properties(?PUBLISH, Properties); +validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' := 0}})) -> + error(protocol_error); +validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' := _}})) -> + true; + validate(_Packet) -> true. diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 15a287116..753dc099b 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -208,11 +208,8 @@ received(Packet = ?PACKET(Type), PState) -> true -> {Packet1, PState1} = preprocess_properties(Packet, PState), process_packet(Packet1, inc_stats(recv, Type, PState1)); - {'EXIT', {topic_filters_invalid, _Stacktrace}} -> - deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState), - {error, topic_filters_invalid, PState}; {'EXIT', {Reason, _Stacktrace}} -> - deliver({disconnect, ?RC_MALFORMED_PACKET}, PState), + deliver({disconnect, rc(Reason)}, PState), {error, Reason, PState} end. @@ -593,17 +590,25 @@ try_open_session(#pstate{zone = Zone, clean_start => CleanStart }, - case emqx_sm:open_session(maps:put(expiry_interval, if - ProtoVer =:= ?MQTT_PROTO_V5 -> - maps:get('Session-Expiry-Interval', ConnProps, 0); - true -> - case CleanStart of - true -> - 0; - false -> - emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) - end - end, SessAttrs)) of + MaxInflight = #{max_inflight => if + ProtoVer =:= ?MQTT_PROTO_V5 -> + maps:get('Receive-Maximum', ConnProps, 65535); + true -> + emqx_zone:get_env(Zone, max_inflight, 65535) + end}, + SessionExpiryInterval = #{expiry_interval => if + ProtoVer =:= ?MQTT_PROTO_V5 -> + maps:get('Session-Expiry-Interval', ConnProps, 0); + true -> + case CleanStart of + true -> + 0; + false -> + emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) + end + end}, + + case emqx_sm:open_session(maps:merge(SessAttrs, maps:merge(MaxInflight, SessionExpiryInterval))) of {ok, SPid} -> {ok, SPid, false}; Other -> Other @@ -782,6 +787,14 @@ start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 -> Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75), self() ! {keepalive, start, round(Secs * Backoff)}. +rc(Reason) -> + case Reason of + protocol_error -> ?RC_PROTOCOL_ERROR; + topic_filters_invalid -> ?RC_TOPIC_FILTER_INVALID; + topic_name_invalid -> ?RC_TOPIC_NAME_INVALID; + _ -> ?RC_MALFORMED_PACKET + end. + %%----------------------------------------------------------------------------- %% Parse topic filters %%----------------------------------------------------------------------------- diff --git a/src/emqx_session.erl b/src/emqx_session.erl index ad3e66cc5..4d570bc08 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -47,7 +47,7 @@ -export([info/1, attrs/1]). -export([stats/1]). -export([resume/2, discard/2]). --export([update_expiry_interval/2]). +-export([update_expiry_interval/2, update_max_inflight/2]). -export([subscribe/2, subscribe/4]). -export([publish/3]). -export([puback/2, puback/3]). @@ -318,6 +318,9 @@ discard(SPid, ByPid) -> update_expiry_interval(SPid, Interval) -> gen_server:cast(SPid, {expiry_interval, Interval * 1000}). +update_max_inflight(SPid, MaxInflight) -> + gen_server:cast(SPid, {max_inflight, MaxInflight}). + -spec(close(spid()) -> ok). close(SPid) -> gen_server:call(SPid, close, infinity). @@ -331,10 +334,10 @@ init([Parent, #{zone := Zone, username := Username, conn_pid := ConnPid, clean_start := CleanStart, - expiry_interval := ExpiryInterval}]) -> + expiry_interval := ExpiryInterval, + max_inflight := MaxInflight}]) -> process_flag(trap_exit, true), true = link(ConnPid), - MaxInflight = get_env(Zone, max_inflight), IdleTimout = get_env(Zone, idle_timeout, 30000), State = #state{idle_timeout = IdleTimout, clean_start = CleanStart, @@ -543,6 +546,9 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, handle_cast({expiry_interval, Interval}, State) -> {noreply, State#state{expiry_interval = Interval}}; +handle_cast({max_inflight, MaxInflight}, State) -> + {noreply, State#state{inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight)}}; + handle_cast(Msg, State) -> emqx_logger:error("[Session] unexpected cast: ~p", [Msg]), {noreply, State}. diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 36d416f3b..98046823f 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -56,10 +56,11 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid end, emqx_sm_locker:trans(ClientId, CleanStart); -open_session(SessAttrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) -> +open_session(SessAttrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid, max_inflight := MaxInflight}) -> ResumeStart = fun(_) -> case resume_session(ClientId, ConnPid) of {ok, SPid} -> + emqx_session:update_max_inflight(SPid, MaxInflight), {ok, SPid, true}; {error, not_found} -> emqx_session_sup:start_session(SessAttrs) diff --git a/test/emqx_mock_client.erl b/test/emqx_mock_client.erl index 4528239e6..0aa01458e 100644 --- a/test/emqx_mock_client.erl +++ b/test/emqx_mock_client.erl @@ -51,7 +51,8 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) -> conn_pid => ClientPid, clean_start => true, username => undefined, - expiry_interval => 0 + expiry_interval => 0, + max_inflight => 0 }, {ok, SessPid} = emqx_sm:open_session(Attrs), {reply, {ok, SessPid}, diff --git a/test/emqx_sm_SUITE.erl b/test/emqx_sm_SUITE.erl index 6f9b92399..1f85d4724 100644 --- a/test/emqx_sm_SUITE.erl +++ b/test/emqx_sm_SUITE.erl @@ -25,7 +25,7 @@ t_open_close_session(_) -> emqx_ct_broker_helpers:run_setup_steps(), {ok, ClientPid} = emqx_mock_client:start_link(<<"client">>), Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid, - zone => internal, username => <<"zhou">>, expiry_interval => 0}, + zone => internal, username => <<"zhou">>, expiry_interval => 0, max_inflight => 0}, {ok, SPid} = emqx_sm:open_session(Attrs), [{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>), SPid = emqx_sm:lookup_session_pid(<<"client">>),