From d2b6a95484fab41ca9162fa6bf3721b57e03ffbb Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sun, 31 Oct 2021 12:41:07 +0800 Subject: [PATCH 01/29] fix(stomp): fix anonymous not working --- apps/emqx_stomp/src/emqx_stomp_protocol.erl | 34 +++++++++++---------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index cc5c28ce9..0bd80d628 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -77,19 +77,19 @@ init(#{peername := Peername, AllowAnonymous = get_value(allow_anonymous, Env, false), DefaultUser = get_value(default_user, Env), #pstate{peername = Peername, - heartfun = HeartFun, - sendfun = SendFun, - timers = #{}, - transaction = #{}, - allow_anonymous = AllowAnonymous, - default_user = DefaultUser}. + heartfun = HeartFun, + sendfun = SendFun, + timers = #{}, + transaction = #{}, + allow_anonymous = AllowAnonymous, + default_user = DefaultUser}. info(#pstate{connected = Connected, - proto_ver = ProtoVer, - proto_name = ProtoName, - heart_beats = Heartbeats, - login = Login, - subscriptions = Subscriptions}) -> + proto_ver = ProtoVer, + proto_name = ProtoName, + heart_beats = Heartbeats, + login = Login, + subscriptions = Subscriptions}) -> [{connected, Connected}, {proto_ver, ProtoVer}, {proto_name, ProtoName}, @@ -139,7 +139,7 @@ received(Frame = #stomp_frame{command = <<"SEND">>, headers = Headers}, State) - end; received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers}, - State = #pstate{subscriptions = Subscriptions}) -> + State = #pstate{subscriptions = Subscriptions, login = Login}) -> Id = header(<<"id">>, Headers), Topic = header(<<"destination">>, Headers), Ack = header(<<"ack">>, Headers, <<"auto">>), @@ -147,7 +147,7 @@ received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers}, {Id, Topic, Ack} -> {ok, State}; false -> - emqx_broker:subscribe(Topic), + emqx_broker:subscribe(Topic, Login), {ok, State#pstate{subscriptions = [{Id, Topic, Ack}|Subscriptions]}} end, maybe_send_receipt(receipt_id(Headers), State1); @@ -312,13 +312,15 @@ negotiate_version(Ver, [AcceptVer|_]) when Ver >= AcceptVer -> negotiate_version(Ver, [_|T]) -> negotiate_version(Ver, T). -check_login(undefined, _, AllowAnonymous, _) -> +check_login(Login, _, AllowAnonymous, _) + when Login == <<>>; + Login == undefined -> AllowAnonymous; check_login(_, _, _, undefined) -> false; check_login(Login, Passcode, _, DefaultUser) -> - case {list_to_binary(get_value(login, DefaultUser)), - list_to_binary(get_value(passcode, DefaultUser))} of + case {iolist_to_binary(get_value(login, DefaultUser)), + iolist_to_binary(get_value(passcode, DefaultUser))} of {Login, Passcode} -> true; {_, _ } -> false end. From ed505ee120cdd611364183731a0b7a814da875ed Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 1 Nov 2021 09:25:49 +0800 Subject: [PATCH 02/29] refactor(stomp): compatible hooks system --- apps/emqx_stomp/src/emqx_stomp_connection.erl | 354 +++++++++++++---- apps/emqx_stomp/src/emqx_stomp_heartbeat.erl | 1 - apps/emqx_stomp/src/emqx_stomp_protocol.erl | 360 ++++++++++++++---- 3 files changed, 582 insertions(+), 133 deletions(-) diff --git a/apps/emqx_stomp/src/emqx_stomp_connection.erl b/apps/emqx_stomp/src/emqx_stomp_connection.erl index d4e7f6475..d926e8eef 100644 --- a/apps/emqx_stomp/src/emqx_stomp_connection.erl +++ b/apps/emqx_stomp/src/emqx_stomp_connection.erl @@ -20,9 +20,16 @@ -include("emqx_stomp.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -logger_header("[Stomp-Conn]"). +-import(emqx_misc, + [ maybe_apply/2 + , start_timer/2 + ]). + -export([ start_link/3 , info/1 ]). @@ -39,52 +46,160 @@ %% for protocol -export([send/4, heartbeat/2]). --record(state, {transport, socket, peername, conn_name, conn_state, - await_recv, rate_limit, parser, pstate, - proto_env, heartbeat}). +%% for mgmt +-export([call/2, call/3]). --define(INFO_KEYS, [peername, await_recv, conn_state]). --define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). +-record(state, { + %% TCP/TLS Transport + transport :: esockd:transport(), + %% TCP/TLS Socket + socket :: esockd:socket(), + %% Peername of the connection + peername :: emqx_types:peername(), + %% Sockname of the connection + sockname :: emqx_types:peername(), + %% Sock State + sockstate :: emqx_types:sockstate(), + %% The {active, N} option + active_n :: pos_integer(), + %% Limiter + limiter :: maybe(emqx_limiter:limiter()), + %% Limit Timer + limit_timer :: maybe(reference()), + %% GC State + gc_state :: maybe(emqx_gc:gc_state()), + %% Stats Timer + stats_timer :: disabled | maybe(reference()), + + await_recv, parser, pstate, + proto_env, heartbeat}). + +-type(state() :: #state{}). + +-define(DEFAULT_GC_POLICY, #{bytes => 16777216, count => 16000}). +-define(DEFAULT_OOM_POLICY, #{ max_heap_size => 8388608, + message_queue_len => 10000}). + +-define(ACTIVE_N, 100). +-define(IDLE_TIMEOUT, 30000). +-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). +-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). +-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). + +-define(ENABLED(X), (X =/= undefined)). start_link(Transport, Sock, ProtoEnv) -> {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, ProtoEnv]])}. -info(CPid) -> - gen_server:call(CPid, info, infinity). +-spec info(pid()|state()) -> emqx_types:infos(). +info(CPid) when is_pid(CPid) -> + call(CPid, info); +info(State = #state{pstate = PState}) -> + ChanInfo = emqx_stomp_protocol:info(PState), + SockInfo = maps:from_list( + info(?INFO_KEYS, State)), + ChanInfo#{sockinfo => SockInfo}. -init([Transport, Sock, ProtoEnv]) -> - process_flag(trap_exit, true), - case Transport:wait(Sock) of - {ok, NewSock} -> - {ok, Peername} = Transport:ensure_ok_or_exit(peername, [NewSock]), - ConnName = esockd:format(Peername), - SendFun = {fun ?MODULE:send/4, [Transport, Sock, self()]}, - HrtBtFun = {fun ?MODULE:heartbeat/2, [Transport, Sock]}, - Parser = emqx_stomp_frame:init_parer_state(ProtoEnv), - PState = emqx_stomp_protocol:init(#{peername => Peername, - sendfun => SendFun, - heartfun => HrtBtFun}, ProtoEnv), - RateLimit = init_rate_limit(proplists:get_value(rate_limit, ProtoEnv)), - State = run_socket(#state{transport = Transport, - socket = NewSock, - peername = Peername, - conn_name = ConnName, - conn_state = running, - await_recv = false, - rate_limit = RateLimit, - parser = Parser, - proto_env = ProtoEnv, - pstate = PState}), - emqx_logger:set_metadata_peername(esockd:format(Peername)), - gen_server:enter_loop(?MODULE, [{hibernate_after, 5000}], State, 20000); +info(Keys, State) when is_list(Keys) -> + [{Key, info(Key, State)} || Key <- Keys]; +info(socktype, #state{transport = Transport, socket = Socket}) -> + Transport:type(Socket); +info(peername, #state{peername = Peername}) -> + Peername; +info(sockname, #state{sockname = Sockname}) -> + Sockname; +info(sockstate, #state{sockstate = SockSt}) -> + SockSt; +info(active_n, #state{active_n = ActiveN}) -> + ActiveN; +info(stats_timer, #state{stats_timer = StatsTimer}) -> + StatsTimer; +info(limit_timer, #state{limit_timer = LimitTimer}) -> + LimitTimer; +info(limiter, #state{limiter = Limiter}) -> + maybe_apply(fun emqx_limiter:info/1, Limiter). + +-spec stats(pid()|state()) -> emqx_types:stats(). +stats(CPid) when is_pid(CPid) -> + call(CPid, stats); +stats(#state{transport = Transport, + socket = Socket, + pstate = PState}) -> + SockStats = case Transport:getstat(Socket, ?SOCK_STATS) of + {ok, Ss} -> Ss; + {error, _} -> [] + end, + ConnStats = emqx_pd:get_counters(?CONN_STATS), + ChanStats = emqx_stomp_protocol:stats(PState), + ProcStats = emqx_misc:proc_stats(), + lists:append([SockStats, ConnStats, ChanStats, ProcStats]). + +call(Pid, Req) -> + call(Pid, Req, infinity). +call(Pid, Req, Timeout) -> + gen_server:call(Pid, Req, Timeout). + +init([Transport, RawSocket, ProtoEnv]) -> + case Transport:wait(RawSocket) of + {ok, Socket} -> + init_state(Transport, Socket, ProtoEnv); {error, Reason} -> - {stop, Reason} + ok = Transport:fast_close(RawSocket), + exit_on_sock_error(Reason) end. -init_rate_limit(undefined) -> - undefined; -init_rate_limit({Rate, Burst}) -> - esockd_rate_limit:new(Rate, Burst). +init_state(Transport, Socket, ProtoEnv) -> + {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), + {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), + + SendFun = {fun ?MODULE:send/4, [Transport, Socket, self()]}, + HrtBtFun = {fun ?MODULE:heartbeat/2, [Transport, Socket]}, + Parser = emqx_stomp_frame:init_parer_state(ProtoEnv), + + ActiveN = proplists:get_value(active_n, ProtoEnv, ?ACTIVE_N), + GcState = emqx_gc:init(?DEFAULT_GC_POLICY), + + Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]), + ConnInfo = #{socktype => Transport:type(Socket), + peername => Peername, + sockname => Sockname, + peercert => Peercert, + sendfun => SendFun, + heartfun => HrtBtFun, + conn_mod => ?MODULE + }, + PState = emqx_stomp_protocol:init(ConnInfo, ProtoEnv), + State = #state{transport = Transport, + socket = Socket, + peername = Peername, + sockname = Sockname, + sockstate = idle, + active_n = ActiveN, + limiter = undefined, + parser = Parser, + proto_env = ProtoEnv, + gc_state = GcState, + pstate = PState}, + case activate_socket(State) of + {ok, NState} -> + emqx_logger:set_metadata_peername( + esockd:format(Peername)), + gen_server:enter_loop( + ?MODULE, [{hibernate_after, 5000}], NState, 20000); + {error, Reason} -> + ok = Transport:fast_close(Socket), + exit_on_sock_error(Reason) + end. + +-spec exit_on_sock_error(any()) -> no_return(). +exit_on_sock_error(Reason) when Reason =:= einval; + Reason =:= enotconn; + Reason =:= closed -> + erlang:exit(normal); +exit_on_sock_error(timeout) -> + erlang:exit({shutdown, ssl_upgrade_timeout}); +exit_on_sock_error(Reason) -> + erlang:exit({shutdown, Reason}). send(Data, Transport, Sock, ConnPid) -> try Transport:async_send(Sock, Data) of @@ -101,10 +216,10 @@ handle_call(info, _From, State = #state{transport = Transport, socket = Sock, peername = Peername, await_recv = AwaitRecv, - conn_state = ConnState, + sockstate = ConnState, pstate = PState}) -> ClientInfo = [{peername, Peername}, {await_recv, AwaitRecv}, - {conn_state, ConnState}], + {sockstate, ConnState}], ProtoInfo = emqx_stomp_protocol:info(PState), case Transport:getstat(Sock, ?SOCK_STATS) of {ok, SockStats} -> @@ -113,6 +228,12 @@ handle_call(info, _From, State = #state{transport = Transport, {stop, Reason, lists:append([ClientInfo, ProtoInfo]), State} end; +handle_call(discard, _From, State) -> + shutdown(discared, State); + +handle_call(kick, _From, State) -> + shutdown(kicked, State); + handle_call(Req, _From, State) -> ?LOG(error, "unexpected request: ~p", [Req]), {reply, ignored, State}. @@ -121,6 +242,11 @@ handle_cast(Msg, State) -> ?LOG(error, "unexpected msg: ~p", [Msg]), noreply(State). +handle_info({event, connected}, State = #state{pstate = PState}) -> + ClientId = emqx_stomp_protocol:info(clientid, PState), + emqx_cm:insert_channel_info(ClientId, info(State), stats(State)), + noreply(State); + handle_info(timeout, State) -> shutdown(idle_timeout, State); @@ -141,26 +267,76 @@ handle_info({timeout, TRef, TMsg}, State) when TMsg =:= incoming; shutdown({sock_error, Reason}, State) end; +handle_info({timeout, _TRef, limit_timeout}, State) -> + NState = State#state{sockstate = idle, + limit_timer = undefined + }, + handle_info(activate_socket, NState); + +handle_info({timeout, _TRef, emit_stats}, + State = #state{pstate = PState}) -> + ClientId = emqx_stomp_protocol:info(clientid, PState), + emqx_cm:set_chan_stats(ClientId, stats(State)), + {ok, State#state{stats_timer = undefined}}; + handle_info({timeout, TRef, TMsg}, State) -> with_proto(timeout, [TRef, TMsg], State); handle_info({'EXIT', HbProc, Error}, State = #state{heartbeat = HbProc}) -> stop(Error, State); -handle_info(activate_sock, State) -> - noreply(run_socket(State#state{conn_state = running})); - -handle_info({inet_async, _Sock, _Ref, {ok, Bytes}}, State) -> - ?LOG(debug, "RECV ~p", [Bytes]), - received(Bytes, rate_limit(size(Bytes), State#state{await_recv = false})); - -handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> - shutdown(Reason, State); +handle_info(activate_socket, State = #state{sockstate = OldSst}) -> + case activate_socket(State) of + {ok, NState = #state{sockstate = NewSst}} -> + case OldSst =/= NewSst of + true -> {ok, {event, NewSst}, NState}; + false -> {ok, NState} + end; + {error, Reason} -> + handle_info({sock_error, Reason}, State) + end; handle_info({inet_reply, _Ref, ok}, State) -> noreply(State); +handle_info({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> + ?LOG(debug, "RECV ~0p", [Data]), + Oct = iolist_size(Data), + inc_counter(incoming_bytes, Oct), + ok = emqx_metrics:inc('bytes.received', Oct), + ensure_stats_timer(?IDLE_TIMEOUT, received(Data, State)); + +handle_info({Passive, _Sock}, State) + when Passive == tcp_passive; Passive == ssl_passive -> + %% In Stats + Pubs = emqx_pd:reset_counter(incoming_pubs), + Bytes = emqx_pd:reset_counter(incoming_bytes), + InStats = #{cnt => Pubs, oct => Bytes}, + %% Ensure Rate Limit + NState = ensure_rate_limit(InStats, State), + %% Run GC and Check OOM + NState1 = check_oom(run_gc(InStats, NState)), + handle_info(activate_socket, NState1); + +handle_info({Error, _Sock, Reason}, State) + when Error == tcp_error; Error == ssl_error -> + handle_info({sock_error, Reason}, State); + +handle_info({Closed, _Sock}, State) + when Closed == tcp_closed; Closed == ssl_closed -> + handle_info({sock_closed, Closed}, close_socket(State)); + handle_info({inet_reply, _Sock, {error, Reason}}, State) -> + handle_info({sock_error, Reason}, State); + +handle_info({sock_error, Reason}, State) -> + case Reason =/= closed andalso Reason =/= einval of + true -> ?LOG(warning, "socket_error: ~p", [Reason]); + false -> ok + end, + handle_info({sock_closed, Reason}, close_socket(State)); + +handle_info({sock_closed, Reason}, State) -> shutdown(Reason, State); handle_info({deliver, _Topic, Msg}, State = #state{pstate = PState}) -> @@ -208,7 +384,7 @@ with_proto(Fun, Args, State = #state{pstate = PState}) -> received(<<>>, State) -> noreply(State); -received(Bytes, State = #state{parser = Parser, +received(Bytes, State = #state{parser = Parser, pstate = PState}) -> try emqx_stomp_frame:parse(Bytes, Parser) of {more, NewParser} -> @@ -237,25 +413,68 @@ received(Bytes, State = #state{parser = Parser, reset_parser(State = #state{proto_env = ProtoEnv}) -> State#state{parser = emqx_stomp_frame:init_parer_state(ProtoEnv)}. -rate_limit(_Size, State = #state{rate_limit = undefined}) -> - run_socket(State); -rate_limit(Size, State = #state{rate_limit = Rl}) -> - case esockd_rate_limit:check(Size, Rl) of - {0, Rl1} -> - run_socket(State#state{conn_state = running, rate_limit = Rl1}); - {Pause, Rl1} -> - ?LOG(error, "Rate limiter pause for ~p", [Pause]), - erlang:send_after(Pause, self(), activate_sock), - State#state{conn_state = blocked, rate_limit = Rl1} +activate_socket(State = #state{sockstate = closed}) -> + {ok, State}; +activate_socket(State = #state{sockstate = blocked}) -> + {ok, State}; +activate_socket(State = #state{transport = Transport, + socket = Socket, + active_n = N}) -> + case Transport:setopts(Socket, [{active, N}]) of + ok -> {ok, State#state{sockstate = running}}; + Error -> Error end. -run_socket(State = #state{conn_state = blocked}) -> - State; -run_socket(State = #state{await_recv = true}) -> - State; -run_socket(State = #state{transport = Transport, socket = Sock}) -> - Transport:async_recv(Sock, 0, infinity), - State#state{await_recv = true}. +close_socket(State = #state{sockstate = closed}) -> State; +close_socket(State = #state{transport = Transport, socket = Socket}) -> + ok = Transport:fast_close(Socket), + State#state{sockstate = closed}. + +%%-------------------------------------------------------------------- +%% Ensure rate limit + +ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> + case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of + false -> State; + {ok, Limiter1} -> + State#state{limiter = Limiter1}; + {pause, Time, Limiter1} -> + ?LOG(warning, "Pause ~pms due to rate limit", [Time]), + TRef = start_timer(Time, limit_timeout), + State#state{sockstate = blocked, + limiter = Limiter1, + limit_timer = TRef + } + end. + +%%-------------------------------------------------------------------- +%% Run GC and Check OOM + +run_gc(Stats, State = #state{gc_state = GcSt}) -> + case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of + false -> State; + {_IsGC, GcSt1} -> + State#state{gc_state = GcSt1} + end. + +check_oom(State) -> + OomPolicy = ?DEFAULT_OOM_POLICY, + ?tp(debug, check_oom, #{policy => OomPolicy}), + case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of + {shutdown, Reason} -> + %% triggers terminate/2 callback immediately + erlang:exit({shutdown, Reason}); + _Other -> + ok + end, + State. + +%%-------------------------------------------------------------------- +%% Ensure/cancel stats timer + +ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) -> + State#state{stats_timer = start_timer(Timeout, emit_stats)}; +ensure_stats_timer(_Timeout, State) -> State. getstat(Stat, #state{transport = Transport, socket = Sock}) -> case Transport:getstat(Sock, [Stat]) of @@ -272,3 +491,6 @@ stop(Reason, State) -> shutdown(Reason, State) -> stop({shutdown, Reason}, State). +inc_counter(Key, Inc) -> + _ = emqx_pd:inc_counter(Key, Inc), + ok. diff --git a/apps/emqx_stomp/src/emqx_stomp_heartbeat.erl b/apps/emqx_stomp/src/emqx_stomp_heartbeat.erl index 145359e53..2a221ad68 100644 --- a/apps/emqx_stomp/src/emqx_stomp_heartbeat.erl +++ b/apps/emqx_stomp/src/emqx_stomp_heartbeat.erl @@ -33,7 +33,6 @@ outgoing => #heartbeater{} }. - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index 0bd80d628..dc02ac232 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -30,6 +30,8 @@ %% API -export([ init/2 , info/1 + , info/2 + , stats/1 ]). -export([ received/2 @@ -45,19 +47,28 @@ ]). -record(pstate, { - peername, - heartfun, - sendfun, + %% Stomp ConnInfo + conninfo :: emqx_types:conninfo(), + %% Stomp ClientInfo + clientinfo :: emqx_types:clientinfo(), + %% Stomp Heartbeats + heart_beats :: emqx_stomp_hearbeat:heartbeat(), + %% Stomp Connection State connected = false, - proto_ver, - proto_name, - heart_beats, - login, - allow_anonymous, - default_user, - subscriptions = [], + %% Timers timers :: #{atom() => disable | undefined | reference()}, - transaction :: #{binary() => list()} + %% Transaction + transaction :: #{binary() => list()}, + %% Subscriptions + subscriptions = [], + %% Send function + sendfun :: function(), + %% Heartbeat function + heartfun :: function(), + %% The confs for the connection + %% TODO: put these configs into a public mem? + allow_anonymous, + default_user }). -define(TIMER_TABLE, #{ @@ -68,34 +79,132 @@ -define(TRANS_TIMEOUT, 60000). +-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). + +-define(STATS_KEYS, [subscriptions_cnt, + subscriptions_max, + inflight_cnt, + inflight_max, + mqueue_len, + mqueue_max, + mqueue_dropped, + next_pkt_id, + awaiting_rel_cnt, + awaiting_rel_max + ]). + -type(pstate() :: #pstate{}). %% @doc Init protocol -init(#{peername := Peername, - sendfun := SendFun, - heartfun := HeartFun}, Env) -> - AllowAnonymous = get_value(allow_anonymous, Env, false), - DefaultUser = get_value(default_user, Env), - #pstate{peername = Peername, - heartfun = HeartFun, - sendfun = SendFun, - timers = #{}, - transaction = #{}, - allow_anonymous = AllowAnonymous, - default_user = DefaultUser}. +init(ConnInfo = #{peername := {PeerHost, _Port}, + sockname := {_Host, SockPort}, + sendfun := SendFun, + heartfun := HeartFun}, Opts) -> -info(#pstate{connected = Connected, - proto_ver = ProtoVer, - proto_name = ProtoName, - heart_beats = Heartbeats, - login = Login, - subscriptions = Subscriptions}) -> - [{connected, Connected}, - {proto_ver, ProtoVer}, - {proto_name, ProtoName}, - {heart_beats, Heartbeats}, - {login, Login}, - {subscriptions, Subscriptions}]. + NConnInfo = default_conninfo(ConnInfo), + + ClientInfo = #{zone => undefined, + protocol => stomp, + peerhost => PeerHost, + sockport => SockPort, + clientid => undefined, + username => undefined, + mountpoint => undefined, + is_bridge => false, + is_superuser => false + }, + + AllowAnonymous = get_value(allow_anonymous, Opts, false), + DefaultUser = get_value(default_user, Opts), + + #pstate{ + conninfo = NConnInfo, + clientinfo = ClientInfo, + heartfun = HeartFun, + sendfun = SendFun, + timers = #{}, + transaction = #{}, + allow_anonymous = AllowAnonymous, + default_user = DefaultUser + }. + +default_conninfo(ConnInfo) -> + NConnInfo = maps:without([sendfun, heartfun], ConnInfo), + NConnInfo#{ + proto_name => <<"STOMP">>, + proto_ver => <<"1.2">>, + clean_start => true, + clientid => undefined, + username => undefined, + conn_props => [], + connected => false, + connected_at => undefined, + keepalive => undefined, + receive_maximum => 0, + expiry_interval => 0 + }. + +-spec info(pstate()) -> emqx_types:infos(). +info(State) -> + maps:from_list(info(?INFO_KEYS, State)). + +-spec info(list(atom())|atom(), pstate()) -> term(). +info(Keys, State) when is_list(Keys) -> + [{Key, info(Key, State)} || Key <- Keys]; +info(conninfo, #pstate{conninfo = ConnInfo}) -> + ConnInfo; +info(socktype, #pstate{conninfo = ConnInfo}) -> + maps:get(socktype, ConnInfo, undefined); +info(peername, #pstate{conninfo = ConnInfo}) -> + maps:get(peername, ConnInfo, undefined); +info(sockname, #pstate{conninfo = ConnInfo}) -> + maps:get(sockname, ConnInfo, undefined); +info(proto_name, #pstate{conninfo = ConnInfo}) -> + maps:get(proto_name, ConnInfo, undefined); +info(proto_ver, #pstate{conninfo = ConnInfo}) -> + maps:get(proto_ver, ConnInfo, undefined); +info(connected_at, #pstate{conninfo = ConnInfo}) -> + maps:get(connected_at, ConnInfo, undefined); +info(clientinfo, #pstate{clientinfo = ClientInfo}) -> + ClientInfo; +info(zone, _) -> + undefined; +info(clientid, #pstate{clientinfo = ClientInfo}) -> + maps:get(clientid, ClientInfo, undefined); +info(username, #pstate{clientinfo = ClientInfo}) -> + maps:get(username, ClientInfo, undefined); +info(session, State) -> + session_info(State); +info(conn_state, #pstate{connected = true}) -> + connected; +info(conn_state, _) -> + disconnected; +info(will_msg, _) -> + undefined. + +session_info(#pstate{conninfo = ConnInfo, subscriptions = Subs}) -> + NSubs = lists:foldl(fun({_Id, Topic, _Ack}, Acc) -> + Acc#{Topic => ?DEFAULT_SUBOPTS} + end, #{}, Subs), + #{subscriptions => NSubs, + upgrade_qos => false, + retry_interval => 0, + await_rel_timeout => 0, + created_at => maps:get(connected_at, ConnInfo, 0) + }. + +-spec stats(pstate()) -> emqx_types:stats(). +stats(#pstate{subscriptions = Subs}) -> + [{subscriptions_cnt, length(Subs)}, + {subscriptions_max, 0}, + {inflight_cnt, 0}, + {inflight_max, 0}, + {mqueue_len, 0}, + {mqueue_max, 0}, + {mqueue_dropped, 0}, + {next_pkt_id, 0}, + {awaiting_rel_cnt, 0}, + {awaiting_rel_max, 0}]. -spec(received(stomp_frame(), pstate()) -> {ok, pstate()} @@ -105,20 +214,50 @@ received(Frame = #stomp_frame{command = <<"STOMP">>}, State) -> received(Frame#stomp_frame{command = <<"CONNECT">>}, State); received(#stomp_frame{command = <<"CONNECT">>, headers = Headers}, - State = #pstate{connected = false, allow_anonymous = AllowAnonymous, default_user = DefaultUser}) -> + State = #pstate{connected = false}) -> case negotiate_version(header(<<"accept-version">>, Headers)) of {ok, Version} -> Login = header(<<"login">>, Headers), Passc = header(<<"passcode">>, Headers), - case check_login(Login, Passc, AllowAnonymous, DefaultUser) of + case check_login(Login, Passc, + allow_anonymous(State), + default_user(State) + ) of true -> - emqx_logger:set_metadata_clientid(Login), + NLogin = case Login == undefined orelse Login == <<>> of + false -> Login; + true -> emqx_guid:to_base62(emqx_guid:gen()) + end, + emqx_logger:set_metadata_clientid(NLogin), + ConnInfo = State#pstate.conninfo, + ClitInfo = State#pstate.clientinfo, + NConnInfo = ConnInfo#{ + proto_ver => Version, + clientid => NLogin, + username => NLogin + }, + NClitInfo = ClitInfo#{ + clientid => NLogin, + username => NLogin + }, + ConnPid = self(), + _ = emqx_cm_locker:trans(NLogin, fun(_) -> + emqx_cm:discard_session(NLogin), + emqx_cm:register_channel(NLogin, ConnPid, NConnInfo) + end), Heartbeats = parse_heartbeats(header(<<"heart-beat">>, Headers, <<"0,0">>)), - NState = start_heartbeart_timer(Heartbeats, State#pstate{connected = true, - proto_ver = Version, login = Login}), - send(connected_frame([{<<"version">>, Version}, - {<<"heart-beat">>, reverse_heartbeats(Heartbeats)}]), NState); + NState = start_heartbeart_timer( + Heartbeats, + State#pstate{ + conninfo = NConnInfo, + clientinfo = NClitInfo} + ), + ConnectedFrame = connected_frame( + [{<<"version">>, Version}, + {<<"heart-beat">>, reverse_heartbeats(Heartbeats)} + ]), + send(ConnectedFrame, ensure_connected(NState)); false -> _ = send(error_frame(undefined, <<"Login or passcode error!">>), State), {error, login_or_passcode_error, State} @@ -130,6 +269,7 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers}, end; received(#stomp_frame{command = <<"CONNECT">>}, State = #pstate{connected = true}) -> + ?LOG(error, "Received CONNECT frame on connected=true state"), {error, unexpected_connect, State}; received(Frame = #stomp_frame{command = <<"SEND">>, headers = Headers}, State) -> @@ -139,30 +279,56 @@ received(Frame = #stomp_frame{command = <<"SEND">>, headers = Headers}, State) - end; received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers}, - State = #pstate{subscriptions = Subscriptions, login = Login}) -> + State = #pstate{subscriptions = Subs}) -> Id = header(<<"id">>, Headers), Topic = header(<<"destination">>, Headers), Ack = header(<<"ack">>, Headers, <<"auto">>), - {ok, State1} = case lists:keyfind(Id, 1, Subscriptions) of - {Id, Topic, Ack} -> - {ok, State}; - false -> - emqx_broker:subscribe(Topic, Login), - {ok, State#pstate{subscriptions = [{Id, Topic, Ack}|Subscriptions]}} - end, - maybe_send_receipt(receipt_id(Headers), State1); + + case lists:keyfind(Id, 1, Subs) of + {Id, Topic, Ack} -> + ?LOG(info, "Subscription has established: ~s", [Topic]), + maybe_send_receipt(receipt_id(Headers), State); + false -> + case check_acl(subscribe, Topic, State) of + allow -> + ClientInfo = State#pstate.clientinfo, + ClientId = maps:get(clientid, ClientInfo), + %% XXX: We don't parse the request topic name or filter + %% which meaning stomp does not support shared + %% subscription + _ = run_hooks('client.subscribe', + [ClientInfo, _SubProps = #{}], + [{Topic, _TopicOpts = #{}}]), + + emqx_broker:subscribe(Topic, ClientId), + + SubOpts = ?DEFAULT_SUBOPTS#{is_new => true}, + _ = run_hooks('session.subscribed', + [ClientInfo, Topic, SubOpts]), + + NState = put_subs({Id, Topic, Ack}, State), + maybe_send_receipt(receipt_id(Headers), NState) + end + end; received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers}, - State = #pstate{subscriptions = Subscriptions}) -> + State = #pstate{subscriptions = Subs, clientinfo = ClientInfo}) -> Id = header(<<"id">>, Headers), + {ok, State1} = case lists:keyfind(Id, 1, Subs) of + {Id, Topic, _Ack} -> + _ = run_hooks('client.unsubscribe', + [ClientInfo, #{}], + [{Topic, #{}}]), - {ok, State1} = case lists:keyfind(Id, 1, Subscriptions) of - {Id, Topic, _Ack} -> - ok = emqx_broker:unsubscribe(Topic), - {ok, State#pstate{subscriptions = lists:keydelete(Id, 1, Subscriptions)}}; - false -> - {ok, State} - end, + ok = emqx_broker:unsubscribe(Topic), + + _ = run_hooks('session.unsubscribe', + [ClientInfo, Topic, ?DEFAULT_SUBOPTS]), + + {ok, remove_subs(Id, State)}; + false -> + {ok, State} + end, maybe_send_receipt(receipt_id(Headers), State1); %% ACK @@ -240,8 +406,8 @@ received(#stomp_frame{command = <<"DISCONNECT">>, headers = Headers}, State) -> {stop, normal, State}. send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, - State = #pstate{subscriptions = Subscriptions}) -> - case lists:keyfind(Topic, 2, Subscriptions) of + State = #pstate{subscriptions = Subs}) -> + case lists:keyfind(Topic, 2, Subs) of {Id, Topic, Ack} -> Headers0 = [{<<"subscription">>, Id}, {<<"message-id">>, next_msgid()}, @@ -269,6 +435,9 @@ send(Frame, State = #pstate{sendfun = {Fun, Args}}) -> erlang:apply(Fun, [Data] ++ Args), {ok, State}. +shutdown(Reason, State = #pstate{connected = true}) -> + _ = ensure_disconnected(Reason, State), + ok; shutdown(_Reason, _State) -> ok. @@ -398,11 +567,18 @@ receipt_id(Headers) -> handle_recv_send_frame(#stomp_frame{command = <<"SEND">>, headers = Headers, body = Body}, State) -> Topic = header(<<"destination">>, Headers), - _ = maybe_send_receipt(receipt_id(Headers), State), - _ = emqx_broker:publish( - make_mqtt_message(Topic, Headers, iolist_to_binary(Body)) - ), - State. + case check_acl(publish, Topic, State) of + allow -> + _ = maybe_send_receipt(receipt_id(Headers), State), + _ = emqx_broker:publish( + make_mqtt_message(Topic, Headers, iolist_to_binary(Body)) + ), + State; + deny -> + ErrFrame = error_frame(receipt_id(Headers), <<"Not Authorized">>), + {ok, NState} = send(ErrFrame, State), + NState + end. handle_recv_ack_frame(#stomp_frame{command = <<"ACK">>, headers = Headers}, State) -> Id = header(<<"id">>, Headers), @@ -435,6 +611,58 @@ start_heartbeart_timer(Heartbeats, State) -> [incoming_timer, outgoing_timer], State#pstate{heart_beats = emqx_stomp_heartbeat:init(Heartbeats)}). +%%-------------------------------------------------------------------- +%% ... + +check_acl(PubSub, Topic, State = #pstate{clientinfo = ClientInfo}) -> + case is_acl_enabled(State) andalso + emqx_access_control:check_acl(ClientInfo, PubSub, Topic) of + false -> allow; + Res -> Res + end. + +put_subs({Id, Topic, Ack}, State = #pstate{subscriptions = Subs}) -> + State#pstate{subscriptions = lists:keystore(Id, 1, Subs, {Id, Topic, Ack})}. + +remove_subs(Id, State = #pstate{subscriptions = Subs}) -> + State#pstate{subscriptions = lists:keydelete(Id, 1, Subs)}. + +%%-------------------------------------------------------------------- +%% ... + +is_acl_enabled(_) -> + %% TODO: configs from somewhere + true. + +default_user(#pstate{default_user = DefaultUser}) -> + DefaultUser. +allow_anonymous(#pstate{allow_anonymous = AllowAnonymous}) -> + AllowAnonymous. + +ensure_connected(State = #pstate{conninfo = ConnInfo, + clientinfo = ClientInfo}) -> + NConnInfo = ConnInfo#{ + connected => true, + connected_at => erlang:system_time(millisecond) + }, + %% send connected event + self() ! {event, connected}, + ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), + State#pstate{conninfo = NConnInfo, + connected = true + }. + +ensure_disconnected(Reason, State = #pstate{conninfo = ConnInfo, clientinfo = ClientInfo}) -> + NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, + ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]), + State#pstate{conninfo = NConnInfo, connected = false}. + +run_hooks(Name, Args) -> + emqx_hooks:run(Name, Args). + +run_hooks(Name, Args, Acc) -> + emqx_hooks:run_fold(Name, Args, Acc). + %%-------------------------------------------------------------------- %% Timer From d2924e82ab9e9c95d14a0611d07104ba3f445ac4 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 2 Nov 2021 12:27:15 +0800 Subject: [PATCH 03/29] fix(stomp): fix kick/discard crash errors --- apps/emqx_stomp/src/emqx_stomp_connection.erl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/emqx_stomp/src/emqx_stomp_connection.erl b/apps/emqx_stomp/src/emqx_stomp_connection.erl index d926e8eef..ba5413dfc 100644 --- a/apps/emqx_stomp/src/emqx_stomp_connection.erl +++ b/apps/emqx_stomp/src/emqx_stomp_connection.erl @@ -229,10 +229,11 @@ handle_call(info, _From, State = #state{transport = Transport, end; handle_call(discard, _From, State) -> - shutdown(discared, State); + %% TODO: send the DISCONNECT packet? + shutdown_and_reply(discared, ok, State); handle_call(kick, _From, State) -> - shutdown(kicked, State); + shutdown_and_reply(kicked, ok, State); handle_call(Req, _From, State) -> ?LOG(error, "unexpected request: ~p", [Req]), @@ -491,6 +492,9 @@ stop(Reason, State) -> shutdown(Reason, State) -> stop({shutdown, Reason}, State). +shutdown_and_reply(Reason, Reply, State) -> + {stop, {shutdown, Reason}, Reply, State}. + inc_counter(Key, Inc) -> _ = emqx_pd:inc_counter(Key, Inc), ok. From 7734d6969c241fe7ee184385930c97b568f74114 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 2 Nov 2021 13:24:21 +0800 Subject: [PATCH 04/29] fix(stomp): support pub/sub operations --- apps/emqx_stomp/src/emqx_stomp_connection.erl | 5 +- apps/emqx_stomp/src/emqx_stomp_protocol.erl | 129 ++++++++++++------ 2 files changed, 88 insertions(+), 46 deletions(-) diff --git a/apps/emqx_stomp/src/emqx_stomp_connection.erl b/apps/emqx_stomp/src/emqx_stomp_connection.erl index ba5413dfc..a75c90c4d 100644 --- a/apps/emqx_stomp/src/emqx_stomp_connection.erl +++ b/apps/emqx_stomp/src/emqx_stomp_connection.erl @@ -349,8 +349,7 @@ handle_info({deliver, _Topic, Msg}, State = #state{pstate = PState}) -> end}); handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), - noreply(State). + with_proto(handle_info, [Info], State). terminate(Reason, #state{transport = Transport, socket = Sock, @@ -374,6 +373,8 @@ code_change(_OldVsn, State, _Extra) -> with_proto(Fun, Args, State = #state{pstate = PState}) -> case erlang:apply(emqx_stomp_protocol, Fun, Args ++ [PState]) of + ok -> + noreply(State); {ok, NPState} -> noreply(State#state{pstate = NPState}); {F, Reason, NPState} when F == stop; diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index dc02ac232..c2851895b 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -40,6 +40,9 @@ , timeout/3 ]). +-export([ handle_info/2 + ]). + %% for trans callback -export([ handle_recv_send_frame/2 , handle_recv_ack_frame/2 @@ -60,7 +63,7 @@ %% Transaction transaction :: #{binary() => list()}, %% Subscriptions - subscriptions = [], + subscriptions = #{}, %% Send function sendfun :: function(), %% Heartbeat function @@ -109,7 +112,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, sockport => SockPort, clientid => undefined, username => undefined, - mountpoint => undefined, + mountpoint => undefined, %% XXX: not supported now is_bridge => false, is_superuser => false }, @@ -183,10 +186,7 @@ info(will_msg, _) -> undefined. session_info(#pstate{conninfo = ConnInfo, subscriptions = Subs}) -> - NSubs = lists:foldl(fun({_Id, Topic, _Ack}, Acc) -> - Acc#{Topic => ?DEFAULT_SUBOPTS} - end, #{}, Subs), - #{subscriptions => NSubs, + #{subscriptions => Subs, upgrade_qos => false, retry_interval => 0, await_rel_timeout => 0, @@ -195,7 +195,7 @@ session_info(#pstate{conninfo = ConnInfo, subscriptions = Subs}) -> -spec stats(pstate()) -> emqx_types:stats(). stats(#pstate{subscriptions = Subs}) -> - [{subscriptions_cnt, length(Subs)}, + [{subscriptions_cnt, maps:size(Subs)}, {subscriptions_max, 0}, {inflight_cnt, 0}, {inflight_max, 0}, @@ -284,29 +284,23 @@ received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers}, Topic = header(<<"destination">>, Headers), Ack = header(<<"ack">>, Headers, <<"auto">>), - case lists:keyfind(Id, 1, Subs) of - {Id, Topic, Ack} -> + case find_sub_by_id(Id, Subs) of + {Topic, #{sub_props := #{id := Id, ack := Ack}}} -> ?LOG(info, "Subscription has established: ~s", [Topic]), maybe_send_receipt(receipt_id(Headers), State); - false -> + undefined -> case check_acl(subscribe, Topic, State) of allow -> ClientInfo = State#pstate.clientinfo, - ClientId = maps:get(clientid, ClientInfo), - %% XXX: We don't parse the request topic name or filter - %% which meaning stomp does not support shared - %% subscription + + [{TopicFilter, SubOpts}] = parse_topic_filters( + [{Topic, ?DEFAULT_SUBOPTS} + ]), + NSubOpts = SubOpts#{sub_props => #{id => Id, ack => Ack}}, _ = run_hooks('client.subscribe', [ClientInfo, _SubProps = #{}], - [{Topic, _TopicOpts = #{}}]), - - emqx_broker:subscribe(Topic, ClientId), - - SubOpts = ?DEFAULT_SUBOPTS#{is_new => true}, - _ = run_hooks('session.subscribed', - [ClientInfo, Topic, SubOpts]), - - NState = put_subs({Id, Topic, Ack}, State), + [{TopicFilter, NSubOpts}]), + NState = do_subscribe(TopicFilter, NSubOpts, State), maybe_send_receipt(receipt_id(Headers), NState) end end; @@ -314,22 +308,17 @@ received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers}, received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers}, State = #pstate{subscriptions = Subs, clientinfo = ClientInfo}) -> Id = header(<<"id">>, Headers), - {ok, State1} = case lists:keyfind(Id, 1, Subs) of - {Id, Topic, _Ack} -> + {ok, NState} = case find_sub_by_id(Id, Subs) of + {Topic, #{sub_props := #{id := Id}}} -> _ = run_hooks('client.unsubscribe', [ClientInfo, #{}], [{Topic, #{}}]), - - ok = emqx_broker:unsubscribe(Topic), - - _ = run_hooks('session.unsubscribe', - [ClientInfo, Topic, ?DEFAULT_SUBOPTS]), - - {ok, remove_subs(Id, State)}; - false -> + State1 = do_unsubscribe(Topic, ?DEFAULT_SUBOPTS, State), + {ok, State1}; + undefined -> {ok, State} end, - maybe_send_receipt(receipt_id(Headers), State1); + maybe_send_receipt(receipt_id(Headers), NState); %% ACK %% id:12345 @@ -407,8 +396,8 @@ received(#stomp_frame{command = <<"DISCONNECT">>, headers = Headers}, State) -> send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, State = #pstate{subscriptions = Subs}) -> - case lists:keyfind(Topic, 2, Subs) of - {Id, Topic, Ack} -> + case find_sub_by_topic(Topic, Subs) of + {Topic, #{sub_props := #{id := Id, ack := Ack}}} -> Headers0 = [{<<"subscription">>, Id}, {<<"message-id">>, next_msgid()}, {<<"destination">>, Topic}, @@ -423,7 +412,7 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, headers = Headers1 ++ maps:get(stomp_headers, Headers, []), body = Payload}, send(Frame, State); - false -> + undefined -> ?LOG(error, "Stomp dropped: ~p", [Msg]), {error, dropped, State} end; @@ -466,6 +455,27 @@ timeout(_TRef, clean_trans, State = #pstate{transaction = Trans}) -> NTrans = maps:filter(fun(_, {Ts, _}) -> Ts + ?TRANS_TIMEOUT < Now end, Trans), {ok, ensure_clean_trans_timer(State#pstate{transaction = NTrans})}. + +-spec(handle_info(Info :: term(), pstate()) + -> ok | {ok, pstate()} | {shutdown, Reason :: term(), pstate()}). + +handle_info({subscribe, TopicFilters}, State) -> + NState = lists:foldl( + fun({TopicFilter, SubOpts}, StateAcc) -> + do_subscribe(TopicFilter, SubOpts#{is_new => true}, StateAcc) + end, State, parse_topic_filters(TopicFilters)), + {ok, NState}; + +handle_info({unsubscribe, TopicFilters}, State) -> + NState = lists:foldl(fun({TopicFilter, SubOpts}, StateAcc) -> + do_unsubscribe(TopicFilter, SubOpts, StateAcc) + end, State, parse_topic_filters(TopicFilters)), + {ok, NState}; + +handle_info(Info, State) -> + ?LOG(warning, "Unexpected info ~p", [Info]), + {ok, State}. + negotiate_version(undefined) -> {ok, <<"1.0">>}; negotiate_version(Accepts) -> @@ -612,7 +622,10 @@ start_heartbeart_timer(Heartbeats, State) -> State#pstate{heart_beats = emqx_stomp_heartbeat:init(Heartbeats)}). %%-------------------------------------------------------------------- -%% ... +%% pub & sub helpers + +parse_topic_filters(TopicFilters) -> + lists:map(fun emqx_topic:parse/1, TopicFilters). check_acl(PubSub, Topic, State = #pstate{clientinfo = ClientInfo}) -> case is_acl_enabled(State) andalso @@ -621,19 +634,47 @@ check_acl(PubSub, Topic, State = #pstate{clientinfo = ClientInfo}) -> Res -> Res end. -put_subs({Id, Topic, Ack}, State = #pstate{subscriptions = Subs}) -> - State#pstate{subscriptions = lists:keystore(Id, 1, Subs, {Id, Topic, Ack})}. +do_subscribe(TopicFilter, SubOpts, + State = #pstate{clientinfo = ClientInfo, subscriptions = Subs}) -> + ClientId = maps:get(clientid, ClientInfo), + _ = emqx_broker:subscribe(TopicFilter, ClientId), + NSubOpts = SubOpts#{is_new => true}, + _ = run_hooks('session.subscribed', + [ClientInfo, TopicFilter, NSubOpts]), -remove_subs(Id, State = #pstate{subscriptions = Subs}) -> - State#pstate{subscriptions = lists:keydelete(Id, 1, Subs)}. + State#pstate{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}. -%%-------------------------------------------------------------------- -%% ... +do_unsubscribe(TopicFilter, SubOpts, + State = #pstate{clientinfo = ClientInfo, subscriptions = Subs}) -> + ok = emqx_broker:unsubscribe(TopicFilter), + _ = run_hooks('session.unsubscribe', + [ClientInfo, TopicFilter, SubOpts]), + + State#pstate{subscriptions = maps:remove(TopicFilter, Subs)}. + +find_sub_by_topic(Topic, Subs) -> + case maps:get(Topic, Subs, undefined) of + undefined -> undefined; + SubOpts -> {Topic, SubOpts} + end. + +find_sub_by_id(Id, Subs) -> + Found = maps:filter(fun(_, SubOpts) -> + %% FIXME: datatype?? + maps:get(id, maps:get(sub_props, SubOpts, #{}), -1) == Id + end, Subs), + case maps:to_list(Found) of + [] -> undefined; + [Sub|_] -> Sub + end. is_acl_enabled(_) -> %% TODO: configs from somewhere true. +%%-------------------------------------------------------------------- +%% helpers + default_user(#pstate{default_user = DefaultUser}) -> DefaultUser. allow_anonymous(#pstate{allow_anonymous = AllowAnonymous}) -> From fa2e97b1c5efb72dc4a07cfcf9242b5a7ddf312b Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 2 Nov 2021 13:41:57 +0800 Subject: [PATCH 05/29] chore(stomp): update appup.src --- apps/emqx_stomp/src/emqx_stomp.app.src | 2 +- apps/emqx_stomp/src/emqx_stomp.appup.src | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) create mode 100644 apps/emqx_stomp/src/emqx_stomp.appup.src diff --git a/apps/emqx_stomp/src/emqx_stomp.app.src b/apps/emqx_stomp/src/emqx_stomp.app.src index b03abdac1..87a0fd089 100644 --- a/apps/emqx_stomp/src/emqx_stomp.app.src +++ b/apps/emqx_stomp/src/emqx_stomp.app.src @@ -1,6 +1,6 @@ {application, emqx_stomp, [{description, "EMQ X Stomp Protocol Plugin"}, - {vsn, "4.3.0"}, % strict semver, bump manually! + {vsn, "4.3.1"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_stomp_sup]}, {applications, [kernel,stdlib]}, diff --git a/apps/emqx_stomp/src/emqx_stomp.appup.src b/apps/emqx_stomp/src/emqx_stomp.appup.src new file mode 100644 index 000000000..30cf9f908 --- /dev/null +++ b/apps/emqx_stomp/src/emqx_stomp.appup.src @@ -0,0 +1,8 @@ +%% -*- mode: erlang -*- +{"4.3.1", + [{"4.3.0", + [{restart_application,emqx_auth_http}]}, + {<<".*">>,[]}], + [{"4.3.0", + [{restart_application,emqx_auth_http}]}, + {<<".*">>,[]}]}. From cc6ea6e4dd8052746468aae2a3f7431204b0d6b8 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 2 Nov 2021 13:55:57 +0800 Subject: [PATCH 06/29] chore(stomp): remove needless properties --- apps/emqx_stomp/src/emqx_stomp.appup.src | 4 +-- apps/emqx_stomp/src/emqx_stomp_connection.erl | 29 +++++-------------- 2 files changed, 9 insertions(+), 24 deletions(-) diff --git a/apps/emqx_stomp/src/emqx_stomp.appup.src b/apps/emqx_stomp/src/emqx_stomp.appup.src index 30cf9f908..8a3e7e720 100644 --- a/apps/emqx_stomp/src/emqx_stomp.appup.src +++ b/apps/emqx_stomp/src/emqx_stomp.appup.src @@ -1,8 +1,8 @@ %% -*- mode: erlang -*- {"4.3.1", [{"4.3.0", - [{restart_application,emqx_auth_http}]}, + [{restart_application,emqx_stomp}]}, {<<".*">>,[]}], [{"4.3.0", - [{restart_application,emqx_auth_http}]}, + [{restart_application,emqx_stomp}]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_stomp/src/emqx_stomp_connection.erl b/apps/emqx_stomp/src/emqx_stomp_connection.erl index a75c90c4d..6cf1371e6 100644 --- a/apps/emqx_stomp/src/emqx_stomp_connection.erl +++ b/apps/emqx_stomp/src/emqx_stomp_connection.erl @@ -70,9 +70,13 @@ gc_state :: maybe(emqx_gc:gc_state()), %% Stats Timer stats_timer :: disabled | maybe(reference()), - - await_recv, parser, pstate, - proto_env, heartbeat}). + %% Parser State + parser :: emqx_stomp_frame:parser(), + %% Protocol State + pstate :: emqx_stomp_protocol:pstate(), + %% XXX: some common confs + proto_env :: list() + }). -type(state() :: #state{}). @@ -212,22 +216,6 @@ send(Data, Transport, Sock, ConnPid) -> heartbeat(Transport, Sock) -> Transport:send(Sock, <<$\n>>). -handle_call(info, _From, State = #state{transport = Transport, - socket = Sock, - peername = Peername, - await_recv = AwaitRecv, - sockstate = ConnState, - pstate = PState}) -> - ClientInfo = [{peername, Peername}, {await_recv, AwaitRecv}, - {sockstate, ConnState}], - ProtoInfo = emqx_stomp_protocol:info(PState), - case Transport:getstat(Sock, ?SOCK_STATS) of - {ok, SockStats} -> - {reply, lists:append([ClientInfo, ProtoInfo, SockStats]), State}; - {error, Reason} -> - {stop, Reason, lists:append([ClientInfo, ProtoInfo]), State} - end; - handle_call(discard, _From, State) -> %% TODO: send the DISCONNECT packet? shutdown_and_reply(discared, ok, State); @@ -283,9 +271,6 @@ handle_info({timeout, _TRef, emit_stats}, handle_info({timeout, TRef, TMsg}, State) -> with_proto(timeout, [TRef, TMsg], State); -handle_info({'EXIT', HbProc, Error}, State = #state{heartbeat = HbProc}) -> - stop(Error, State); - handle_info(activate_socket, State = #state{sockstate = OldSst}) -> case activate_socket(State) of {ok, NState = #state{sockstate = NewSst}} -> From 2c4d3d1d2498bf8d248b7940479880a12f292c66 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 2 Nov 2021 14:30:00 +0800 Subject: [PATCH 07/29] chore(stomp): fix dialyzer warnings --- apps/emqx_stomp/src/emqx_stomp_connection.erl | 24 ++++++++++++------- apps/emqx_stomp/src/emqx_stomp_protocol.erl | 19 +++++++++------ src/emqx_types.erl | 3 ++- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/apps/emqx_stomp/src/emqx_stomp_connection.erl b/apps/emqx_stomp/src/emqx_stomp_connection.erl index 6cf1371e6..5a8b7a92c 100644 --- a/apps/emqx_stomp/src/emqx_stomp_connection.erl +++ b/apps/emqx_stomp/src/emqx_stomp_connection.erl @@ -26,8 +26,7 @@ -logger_header("[Stomp-Conn]"). -import(emqx_misc, - [ maybe_apply/2 - , start_timer/2 + [ start_timer/2 ]). -export([ start_link/3 @@ -92,6 +91,13 @@ -define(ENABLED(X), (X =/= undefined)). +-dialyzer({nowarn_function, [ ensure_stats_timer/2 + ]}). + +-dialyzer({no_return, [ init/1 + , init_state/3 + ]}). + start_link(Transport, Sock, ProtoEnv) -> {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, ProtoEnv]])}. @@ -115,13 +121,7 @@ info(sockname, #state{sockname = Sockname}) -> info(sockstate, #state{sockstate = SockSt}) -> SockSt; info(active_n, #state{active_n = ActiveN}) -> - ActiveN; -info(stats_timer, #state{stats_timer = StatsTimer}) -> - StatsTimer; -info(limit_timer, #state{limit_timer = LimitTimer}) -> - LimitTimer; -info(limiter, #state{limiter = Limiter}) -> - maybe_apply(fun emqx_limiter:info/1, Limiter). + ActiveN. -spec stats(pid()|state()) -> emqx_types:stats(). stats(CPid) when is_pid(CPid) -> @@ -216,6 +216,12 @@ send(Data, Transport, Sock, ConnPid) -> heartbeat(Transport, Sock) -> Transport:send(Sock, <<$\n>>). +handle_call(info, _From, State) -> + {reply, info(State), State}; + +handle_call(stats, _From, State) -> + {reply, stats(State), State}; + handle_call(discard, _From, State) -> %% TODO: send the DISCONNECT packet? shutdown_and_reply(discared, ok, State); diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index c2851895b..eb628ce47 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -20,6 +20,7 @@ -include("emqx_stomp.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). @@ -55,7 +56,7 @@ %% Stomp ClientInfo clientinfo :: emqx_types:clientinfo(), %% Stomp Heartbeats - heart_beats :: emqx_stomp_hearbeat:heartbeat(), + heart_beats :: maybe(emqx_stomp_hearbeat:heartbeat()), %% Stomp Connection State connected = false, %% Timers @@ -65,13 +66,13 @@ %% Subscriptions subscriptions = #{}, %% Send function - sendfun :: function(), + sendfun :: {function(), list()}, %% Heartbeat function - heartfun :: function(), + heartfun :: {function(), list()}, %% The confs for the connection %% TODO: put these configs into a public mem? - allow_anonymous, - default_user + allow_anonymous :: maybe(boolean()), + default_user :: maybe(list()) }). -define(TIMER_TABLE, #{ @@ -96,6 +97,10 @@ awaiting_rel_max ]). +-dialyzer({nowarn_function, [ check_acl/3 + , init/2 + ]}). + -type(pstate() :: #pstate{}). %% @doc Init protocol @@ -417,7 +422,7 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, {error, dropped, State} end; -send(Frame, State = #pstate{sendfun = {Fun, Args}}) -> +send(Frame, State = #pstate{sendfun = {Fun, Args}}) when is_record(Frame, stomp_frame) -> ?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)]), Data = emqx_stomp_frame:serialize(Frame), ?LOG(debug, "SEND ~p", [Data]), @@ -681,7 +686,7 @@ allow_anonymous(#pstate{allow_anonymous = AllowAnonymous}) -> AllowAnonymous. ensure_connected(State = #pstate{conninfo = ConnInfo, - clientinfo = ClientInfo}) -> + clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{ connected => true, connected_at => erlang:system_time(millisecond) diff --git a/src/emqx_types.erl b/src/emqx_types.erl index fbe62e4b2..0c15bd288 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -94,7 +94,8 @@ -type(ver() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 | ?MQTT_PROTO_V5 - | non_neg_integer()). + | non_neg_integer() + | binary()). -type(qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2). -type(qos_name() :: qos0 | at_most_once | From 14515e680e546e617b72b788b306143138233586 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 2 Nov 2021 21:35:34 +0800 Subject: [PATCH 08/29] fix(stomp): fix stats_timer not working --- apps/emqx_stomp/src/emqx_stomp_connection.erl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/apps/emqx_stomp/src/emqx_stomp_connection.erl b/apps/emqx_stomp/src/emqx_stomp_connection.erl index 5a8b7a92c..7ec58a6f6 100644 --- a/apps/emqx_stomp/src/emqx_stomp_connection.erl +++ b/apps/emqx_stomp/src/emqx_stomp_connection.erl @@ -237,7 +237,9 @@ handle_cast(Msg, State) -> ?LOG(error, "unexpected msg: ~p", [Msg]), noreply(State). -handle_info({event, connected}, State = #state{pstate = PState}) -> +handle_info({event, Name}, State = #state{pstate = PState}) + when Name == connected; + Name == updated -> ClientId = emqx_stomp_protocol:info(clientid, PState), emqx_cm:insert_channel_info(ClientId, info(State), stats(State)), noreply(State); @@ -272,7 +274,7 @@ handle_info({timeout, _TRef, emit_stats}, State = #state{pstate = PState}) -> ClientId = emqx_stomp_protocol:info(clientid, PState), emqx_cm:set_chan_stats(ClientId, stats(State)), - {ok, State#state{stats_timer = undefined}}; + noreply(State#state{stats_timer = undefined}); handle_info({timeout, TRef, TMsg}, State) -> with_proto(timeout, [TRef, TMsg], State); @@ -296,7 +298,7 @@ handle_info({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> Oct = iolist_size(Data), inc_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), - ensure_stats_timer(?IDLE_TIMEOUT, received(Data, State)); + received(Data, ensure_stats_timer(?IDLE_TIMEOUT, State)); handle_info({Passive, _Sock}, State) when Passive == tcp_passive; Passive == ssl_passive -> From 0a7f04caa36f97da81a473b2a0a9356befd02ecc Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 2 Nov 2021 21:36:03 +0800 Subject: [PATCH 09/29] fix(stomp): enrich sub-opts if sub-id/ack absent --- apps/emqx_stomp/src/emqx_stomp_protocol.erl | 41 +++++++++++++++++---- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index eb628ce47..0dbfbb13b 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -75,6 +75,8 @@ default_user :: maybe(list()) }). +-define(DEFAULT_SUB_ACK, <<"auto">>). + -define(TIMER_TABLE, #{ incoming_timer => incoming, outgoing_timer => outgoing, @@ -287,12 +289,17 @@ received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers}, State = #pstate{subscriptions = Subs}) -> Id = header(<<"id">>, Headers), Topic = header(<<"destination">>, Headers), - Ack = header(<<"ack">>, Headers, <<"auto">>), + Ack = header(<<"ack">>, Headers, ?DEFAULT_SUB_ACK), case find_sub_by_id(Id, Subs) of - {Topic, #{sub_props := #{id := Id, ack := Ack}}} -> + {Topic, #{sub_props := #{id := Id}}} -> ?LOG(info, "Subscription has established: ~s", [Topic]), maybe_send_receipt(receipt_id(Headers), State); + {InuseTopic, #{sub_props := #{id := InuseId}}} -> + ?LOG(info, "Subscription id ~p inused by topic: ~s, " + "request topic: ~s", [InuseId, InuseTopic, Topic]), + send(error_frame(receipt_id(Headers), + ["Request sub-id ", Id, " inused "]), State); undefined -> case check_acl(subscribe, Topic, State) of allow -> @@ -466,8 +473,9 @@ timeout(_TRef, clean_trans, State = #pstate{transaction = Trans}) -> handle_info({subscribe, TopicFilters}, State) -> NState = lists:foldl( - fun({TopicFilter, SubOpts}, StateAcc) -> - do_subscribe(TopicFilter, SubOpts#{is_new => true}, StateAcc) + fun({TopicFilter, SubOpts}, StateAcc = #pstate{subscriptions = Subs}) -> + NSubOpts = enrich_sub_opts(SubOpts, Subs), + do_subscribe(TopicFilter, NSubOpts, StateAcc) end, State, parse_topic_filters(TopicFilters)), {ok, NState}; @@ -646,7 +654,7 @@ do_subscribe(TopicFilter, SubOpts, NSubOpts = SubOpts#{is_new => true}, _ = run_hooks('session.subscribed', [ClientInfo, TopicFilter, NSubOpts]), - + send_event_to_self(updated), State#pstate{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}. do_unsubscribe(TopicFilter, SubOpts, @@ -654,7 +662,7 @@ do_unsubscribe(TopicFilter, SubOpts, ok = emqx_broker:unsubscribe(TopicFilter), _ = run_hooks('session.unsubscribe', [ClientInfo, TopicFilter, SubOpts]), - + send_event_to_self(updated), State#pstate{subscriptions = maps:remove(TopicFilter, Subs)}. find_sub_by_topic(Topic, Subs) -> @@ -677,6 +685,21 @@ is_acl_enabled(_) -> %% TODO: configs from somewhere true. +%% automaticly fill the next sub-id and ack if sub-id is absent +enrich_sub_opts(SubOpts0, Subs) -> + SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0), + SubProps = maps:get(sub_props, SubOpts, #{}), + SubOpts#{sub_props => + maps:merge(#{id => next_sub_id(Subs), + ack => ?DEFAULT_SUB_ACK}, SubProps)}. + +next_sub_id(Subs) -> + Ids = maps:fold(fun(_, SubOpts, Acc) -> + [binary_to_integer( + maps:get(id, maps:get(sub_props, SubOpts, #{}), <<"0">>)) | Acc] + end, [], Subs), + integer_to_binary(lists:max(Ids) + 1). + %%-------------------------------------------------------------------- %% helpers @@ -691,8 +714,7 @@ ensure_connected(State = #pstate{conninfo = ConnInfo, connected => true, connected_at => erlang:system_time(millisecond) }, - %% send connected event - self() ! {event, connected}, + send_event_to_self(connected), ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), State#pstate{conninfo = NConnInfo, connected = true @@ -703,6 +725,9 @@ ensure_disconnected(Reason, State = #pstate{conninfo = ConnInfo, clientinfo = Cl ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]), State#pstate{conninfo = NConnInfo, connected = false}. +send_event_to_self(Name) -> + self() ! {event, Name}, ok. + run_hooks(Name, Args) -> emqx_hooks:run(Name, Args). From af7b5704ab4ab26c94921c263d412d1d0a0d78eb Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 3 Nov 2021 13:29:03 +0800 Subject: [PATCH 10/29] fix(stomp): counting packets and messages --- apps/emqx_stomp/src/emqx_stomp_connection.erl | 36 ++++++++++++++++- apps/emqx_stomp/src/emqx_stomp_protocol.erl | 39 ++++++++++--------- 2 files changed, 55 insertions(+), 20 deletions(-) diff --git a/apps/emqx_stomp/src/emqx_stomp_connection.erl b/apps/emqx_stomp/src/emqx_stomp_connection.erl index 7ec58a6f6..b2157c7ce 100644 --- a/apps/emqx_stomp/src/emqx_stomp_connection.erl +++ b/apps/emqx_stomp/src/emqx_stomp_connection.erl @@ -205,7 +205,11 @@ exit_on_sock_error(timeout) -> exit_on_sock_error(Reason) -> erlang:exit({shutdown, Reason}). -send(Data, Transport, Sock, ConnPid) -> +send(Frame, Transport, Sock, ConnPid) -> + ?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)]), + ok = inc_outgoing_stats(Frame), + Data = emqx_stomp_frame:serialize(Frame), + ?LOG(debug, "SEND ~p", [Data]), try Transport:async_send(Sock, Data) of ok -> ok; {error, Reason} -> ConnPid ! {shutdown, Reason} @@ -386,6 +390,7 @@ received(Bytes, State = #state{parser = Parser, noreply(State#state{parser = NewParser}); {ok, Frame, Rest} -> ?LOG(info, "RECV Frame: ~s", [emqx_stomp_frame:format(Frame)]), + ok = inc_incoming_stats(Frame), case emqx_stomp_protocol:received(Frame, PState) of {ok, PState1} -> received(Rest, reset_parser(State#state{pstate = PState1})); @@ -425,6 +430,35 @@ close_socket(State = #state{transport = Transport, socket = Socket}) -> ok = Transport:fast_close(Socket), State#state{sockstate = closed}. +%%-------------------------------------------------------------------- +%% Inc incoming/outgoing stats + +inc_incoming_stats(#stomp_frame{command = Cmd}) -> + inc_counter(recv_pkt, 1), + case Cmd of + <<"SEND">> -> + inc_counter(recv_msg, 1), + inc_counter(incoming_pubs, 1), + emqx_metrics:inc('messages.received'), + emqx_metrics:inc('messages.qos1.received'); + _ -> + ok + end, + emqx_metrics:inc('packets.received'). + +inc_outgoing_stats(#stomp_frame{command = Cmd}) -> + inc_counter(send_pkt, 1), + case Cmd of + <<"MESSAGE">> -> + inc_counter(send_msg, 1), + inc_counter(outgoing_pubs, 1), + emqx_metrics:inc('messages.sent'), + emqx_metrics:inc('messages.qos1.sent'); + _ -> + ok + end, + emqx_metrics:inc('packets.sent'). + %%-------------------------------------------------------------------- %% Ensure rate limit diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index 0dbfbb13b..06740430c 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -231,27 +231,24 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers}, default_user(State) ) of true -> - NLogin = case Login == undefined orelse Login == <<>> of - false -> Login; - true -> emqx_guid:to_base62(emqx_guid:gen()) - end, - emqx_logger:set_metadata_clientid(NLogin), + ClientId = emqx_guid:to_base62(emqx_guid:gen()), + emqx_logger:set_metadata_clientid(ClientId), ConnInfo = State#pstate.conninfo, ClitInfo = State#pstate.clientinfo, NConnInfo = ConnInfo#{ proto_ver => Version, - clientid => NLogin, - username => NLogin + clientid => ClientId, + username => Login }, NClitInfo = ClitInfo#{ - clientid => NLogin, - username => NLogin + clientid => ClientId, + username => Login }, ConnPid = self(), - _ = emqx_cm_locker:trans(NLogin, fun(_) -> - emqx_cm:discard_session(NLogin), - emqx_cm:register_channel(NLogin, ConnPid, NConnInfo) + _ = emqx_cm_locker:trans(ClientId, fun(_) -> + emqx_cm:discard_session(ClientId), + emqx_cm:register_channel(ClientId, ConnPid, NConnInfo) end), Heartbeats = parse_heartbeats(header(<<"heart-beat">>, Headers, <<"0,0">>)), NState = start_heartbeart_timer( @@ -406,8 +403,13 @@ received(#stomp_frame{command = <<"DISCONNECT">>, headers = Headers}, State) -> _ = maybe_send_receipt(receipt_id(Headers), State), {stop, normal, State}. -send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, - State = #pstate{subscriptions = Subs}) -> +send(Msg0 = #message{}, + State = #pstate{clientinfo = ClientInfo, subscriptions = Subs}) -> + ok = emqx_metrics:inc('messages.delivered'), + Msg = emqx_hooks:run_fold('message.delivered', [ClientInfo], Msg0), + #message{topic = Topic, + headers = Headers, + payload = Payload} = Msg, case find_sub_by_topic(Topic, Subs) of {Topic, #{sub_props := #{id := Id, ack := Ack}}} -> Headers0 = [{<<"subscription">>, Id}, @@ -423,6 +425,8 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, Frame = #stomp_frame{command = <<"MESSAGE">>, headers = Headers1 ++ maps:get(stomp_headers, Headers, []), body = Payload}, + + send(Frame, State); undefined -> ?LOG(error, "Stomp dropped: ~p", [Msg]), @@ -430,10 +434,7 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, end; send(Frame, State = #pstate{sendfun = {Fun, Args}}) when is_record(Frame, stomp_frame) -> - ?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)]), - Data = emqx_stomp_frame:serialize(Frame), - ?LOG(debug, "SEND ~p", [Data]), - erlang:apply(Fun, [Data] ++ Args), + erlang:apply(Fun, [Frame] ++ Args), {ok, State}. shutdown(Reason, State = #pstate{connected = true}) -> @@ -453,7 +454,7 @@ timeout(_TRef, {incoming, NewVal}, timeout(_TRef, {outgoing, NewVal}, State = #pstate{heart_beats = HrtBt, - heartfun = {Fun, Args}}) -> + heartfun = {Fun, Args}}) -> case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of {error, timeout} -> _ = erlang:apply(Fun, Args), From f7760232e4950b13a1f5c35dc5e179933240d81f Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 3 Nov 2021 13:57:21 +0800 Subject: [PATCH 11/29] fix(stomp): parse heartbeat EOL frame --- apps/emqx_stomp/src/emqx_stomp_frame.erl | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/apps/emqx_stomp/src/emqx_stomp_frame.erl b/apps/emqx_stomp/src/emqx_stomp_frame.erl index fa9cb63a8..c4d19ae3a 100644 --- a/apps/emqx_stomp/src/emqx_stomp_frame.erl +++ b/apps/emqx_stomp/src/emqx_stomp_frame.erl @@ -126,6 +126,13 @@ parse(Bytes, #{phase := body, len := Len, state := State}) -> parse(Bytes, Parser = #{pre := Pre}) -> parse(<
>, maps:without([pre], Parser));
+parse(<>, Parser = #{phase := none}) ->
+    parse(Rest, Parser);
+parse(<>, Parser = #{phase := none}) ->
+    case byte_size(Rest) of
+        0 -> {more, Parser};
+        _ -> parse(Rest, Parser)
+    end;
 parse(<>, #{phase := Phase, state := State}) ->
     parse(Phase, <>, State);
 parse(<>, Parser) ->

From e5f30a4d289a6bf90806a2c58ce89d7737da67ec Mon Sep 17 00:00:00 2001
From: zhouzb 
Date: Wed, 3 Nov 2021 15:26:04 +0800
Subject: [PATCH 12/29] chore(version): skip the version occupied by enterprise

---
 apps/emqx_web_hook/src/emqx_web_hook.app.src | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apps/emqx_web_hook/src/emqx_web_hook.app.src b/apps/emqx_web_hook/src/emqx_web_hook.app.src
index 2744bece7..e0632625f 100644
--- a/apps/emqx_web_hook/src/emqx_web_hook.app.src
+++ b/apps/emqx_web_hook/src/emqx_web_hook.app.src
@@ -1,6 +1,6 @@
 {application, emqx_web_hook,
  [{description, "EMQ X WebHook Plugin"},
-  {vsn, "4.3.6"}, % strict semver, bump manually!
+  {vsn, "4.3.7"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_web_hook_sup]},
   {applications, [kernel,stdlib,ehttpc]},

From 9faab7cc9b4b8c17d4c6f51760b5154cc4af21dd Mon Sep 17 00:00:00 2001
From: xiangfangyang-tech 
Date: Wed, 3 Nov 2021 16:00:02 +0800
Subject: [PATCH 13/29] chore(autotest): improve git_action script with
 emqx-fvt tag

---
 .github/workflows/run_automate_tests.yaml | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/run_automate_tests.yaml b/.github/workflows/run_automate_tests.yaml
index e654e87c2..fb213c713 100644
--- a/.github/workflows/run_automate_tests.yaml
+++ b/.github/workflows/run_automate_tests.yaml
@@ -24,6 +24,7 @@ jobs:
           git config --global credential.helper store
           echo "${{ secrets.CI_GIT_TOKEN }}" >> scripts/git-token
           make deps-emqx-ee
+          make clean
         fi
         make docker
         echo "::set-output name=version::$(./pkg-vsn.sh)"
@@ -95,7 +96,7 @@ jobs:
     - uses: actions/checkout@v2
       with:
         repository: emqx/emqx-fvt
-        ref: integration_test_suites
+        ref: v1.4.0
         path: scripts
     - uses: actions/setup-java@v1
       with:
@@ -188,7 +189,7 @@ jobs:
     - uses: actions/checkout@v2
       with:
         repository: emqx/emqx-fvt
-        ref: integration_test_suites
+        ref: v1.4.0
         path: scripts
     - uses: actions/setup-java@v1
       with:
@@ -292,7 +293,7 @@ jobs:
     - uses: actions/checkout@v2
       with:
         repository: emqx/emqx-fvt
-        ref: integration_test_suites
+        ref: v1.4.0
         path: scripts
     - uses: actions/setup-java@v1
       with:
@@ -390,7 +391,7 @@ jobs:
     - uses: actions/checkout@v2
       with:
         repository: emqx/emqx-fvt
-        ref: integration_test_suites
+        ref: v1.4.0
         path: scripts
     - uses: actions/setup-java@v1
       with:

From e4e8590a77330a08ca06e9689cbfd360a59fa5a0 Mon Sep 17 00:00:00 2001
From: JianBo He 
Date: Wed, 3 Nov 2021 20:31:10 +0800
Subject: [PATCH 14/29] fix(stomp): backoff outgoung hear-beat timer interval

---
 apps/emqx_stomp/src/emqx_stomp_connection.erl | 11 ++++++-
 apps/emqx_stomp/src/emqx_stomp_heartbeat.erl  | 29 ++++++++++++++-----
 apps/emqx_stomp/src/emqx_stomp_protocol.erl   | 29 ++++++++++++++++---
 3 files changed, 56 insertions(+), 13 deletions(-)

diff --git a/apps/emqx_stomp/src/emqx_stomp_connection.erl b/apps/emqx_stomp/src/emqx_stomp_connection.erl
index b2157c7ce..63e76faed 100644
--- a/apps/emqx_stomp/src/emqx_stomp_connection.erl
+++ b/apps/emqx_stomp/src/emqx_stomp_connection.erl
@@ -43,7 +43,7 @@
         ]).
 
 %% for protocol
--export([send/4, heartbeat/2]).
+-export([send/4, heartbeat/2, statfun/3]).
 
 %% for mgmt
 -export([call/2, call/3]).
@@ -157,6 +157,7 @@ init_state(Transport, Socket, ProtoEnv) ->
     {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
 
     SendFun = {fun ?MODULE:send/4, [Transport, Socket, self()]},
+    StatFun = {fun ?MODULE:statfun/3, [Transport, Socket]},
     HrtBtFun = {fun ?MODULE:heartbeat/2, [Transport, Socket]},
     Parser = emqx_stomp_frame:init_parer_state(ProtoEnv),
 
@@ -168,6 +169,7 @@ init_state(Transport, Socket, ProtoEnv) ->
                  peername => Peername,
                  sockname => Sockname,
                  peercert => Peercert,
+                 statfun  => StatFun,
                  sendfun  => SendFun,
                  heartfun => HrtBtFun,
                  conn_mod => ?MODULE
@@ -218,8 +220,15 @@ send(Frame, Transport, Sock, ConnPid) ->
     end.
 
 heartbeat(Transport, Sock) ->
+    ?LOG(debug, "SEND heartbeat: \\n"),
     Transport:send(Sock, <<$\n>>).
 
+statfun(Stat, Transport, Sock) ->
+    case Transport:getstat(Sock, [Stat]) of
+        {ok, [{Stat, Val}]} -> {ok, Val};
+        {error, Error}      -> {error, Error}
+    end.
+
 handle_call(info, _From, State) ->
     {reply, info(State), State};
 
diff --git a/apps/emqx_stomp/src/emqx_stomp_heartbeat.erl b/apps/emqx_stomp/src/emqx_stomp_heartbeat.erl
index 2a221ad68..4097756fe 100644
--- a/apps/emqx_stomp/src/emqx_stomp_heartbeat.erl
+++ b/apps/emqx_stomp/src/emqx_stomp_heartbeat.erl
@@ -23,9 +23,10 @@
         , check/3
         , info/1
         , interval/2
+        , reset/3
         ]).
 
--record(heartbeater, {interval, statval, repeat}).
+-record(heartbeater, {interval, statval, repeat, repeat_max}).
 
 -type name() :: incoming | outgoing.
 
@@ -42,19 +43,23 @@ init({0, 0}) ->
     #{};
 init({Cx, Cy}) ->
     maps:filter(fun(_, V) -> V /= undefined end,
-      #{incoming => heartbeater(Cx),
-        outgoing => heartbeater(Cy)
+      #{incoming => heartbeater(incoming, Cx),
+        outgoing => heartbeater(outgoing, Cy)
        }).
 
-heartbeater(0) ->
+heartbeater(_, 0) ->
     undefined;
-heartbeater(I) ->
+heartbeater(N, I) ->
     #heartbeater{
        interval = I,
        statval = 0,
-       repeat = 0
+       repeat = 0,
+       repeat_max = repeat_max(N)
       }.
 
+repeat_max(incoming) -> 1;
+repeat_max(outgoing) -> 0.
+
 -spec check(name(), pos_integer(), heartbeat())
     -> {ok, heartbeat()}
      | {error, timeout}.
@@ -67,11 +72,12 @@ check(Name, NewVal, HrtBt) ->
     end.
 
 check(NewVal, HrtBter = #heartbeater{statval = OldVal,
-                                     repeat = Repeat}) ->
+                                     repeat = Repeat,
+                                     repeat_max = Max}) ->
     if
         NewVal =/= OldVal ->
             {ok, HrtBter#heartbeater{statval = NewVal, repeat = 0}};
-        Repeat < 1 ->
+        Repeat < Max ->
             {ok, HrtBter#heartbeater{repeat = Repeat + 1}};
         true -> {error, timeout}
     end.
@@ -89,3 +95,10 @@ interval(Type, HrtBt) ->
         undefined -> undefined;
         #heartbeater{interval = Intv} -> Intv
     end.
+
+reset(Type, StatVal, HrtBt) ->
+    case maps:get(Type, HrtBt, undefined) of
+        undefined -> HrtBt;
+        HrtBter ->
+            HrtBt#{Type => HrtBter#heartbeater{statval = StatVal, repeat = 0}}
+    end.
diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl
index 06740430c..fc211be10 100644
--- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl
+++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl
@@ -69,6 +69,8 @@
           sendfun :: {function(), list()},
           %% Heartbeat function
           heartfun :: {function(), list()},
+          %% Get Socket stat function
+          statfun :: {function(), list()},
           %% The confs for the connection
           %% TODO: put these configs into a public mem?
           allow_anonymous :: maybe(boolean()),
@@ -77,6 +79,9 @@
 
 -define(DEFAULT_SUB_ACK, <<"auto">>).
 
+-define(INCOMING_TIMER_BACKOFF, 1.25).
+-define(OUTCOMING_TIMER_BACKOFF, 0.75).
+
 -define(TIMER_TABLE, #{
           incoming_timer => incoming,
           outgoing_timer => outgoing,
@@ -108,7 +113,8 @@
 %% @doc Init protocol
 init(ConnInfo = #{peername := {PeerHost, _Port},
                   sockname := {_Host, SockPort},
-                  sendfun := SendFun,
+                  statfun  := StatFun,
+                  sendfun  := SendFun,
                   heartfun := HeartFun}, Opts) ->
 
     NConnInfo = default_conninfo(ConnInfo),
@@ -132,6 +138,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
        clientinfo = ClientInfo,
        heartfun = HeartFun,
        sendfun = SendFun,
+       statfun = StatFun,
        timers = #{},
        transaction = #{},
        allow_anonymous = AllowAnonymous,
@@ -231,6 +238,8 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
                              default_user(State)
                             ) of
                 true ->
+                    Heartbeats = parse_heartbeats(
+                                   header(<<"heart-beat">>, Headers, <<"0,0">>)),
                     ClientId = emqx_guid:to_base62(emqx_guid:gen()),
                     emqx_logger:set_metadata_clientid(ClientId),
                     ConnInfo = State#pstate.conninfo,
@@ -238,6 +247,7 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
                     NConnInfo = ConnInfo#{
                                   proto_ver => Version,
                                   clientid => ClientId,
+                                  keepalive => element(1, Heartbeats) div 1000,
                                   username => Login
                                  },
                     NClitInfo = ClitInfo#{
@@ -250,7 +260,6 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
                         emqx_cm:discard_session(ClientId),
                         emqx_cm:register_channel(ClientId, ConnPid, NConnInfo)
                     end),
-                    Heartbeats = parse_heartbeats(header(<<"heart-beat">>, Headers, <<"0,0">>)),
                     NState = start_heartbeart_timer(
                                Heartbeats,
                                State#pstate{
@@ -454,11 +463,18 @@ timeout(_TRef, {incoming, NewVal},
 
 timeout(_TRef, {outgoing, NewVal},
         State = #pstate{heart_beats = HrtBt,
+                        statfun = {StatFun, StatArgs},
                         heartfun = {Fun, Args}}) ->
     case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of
         {error, timeout} ->
             _ = erlang:apply(Fun, Args),
-            {ok, State};
+            case erlang:apply(StatFun, [send_oct] ++ StatArgs) of
+                {ok, NewVal2} ->
+                    NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal2, HrtBt),
+                    {ok, reset_timer(outgoing_timer, State#pstate{heart_beats = NHrtBt})};
+                {error, Reason} ->
+                    {shutdown, {error, {get_stats_error, Reason}}, State}
+            end;
         {ok, NHrtBt} ->
             {ok, reset_timer(outgoing_timer, State#pstate{heart_beats = NHrtBt})}
     end;
@@ -633,7 +649,11 @@ reverse_heartbeats({Cx, Cy}) ->
 start_heartbeart_timer(Heartbeats, State) ->
     ensure_timer(
       [incoming_timer, outgoing_timer],
-      State#pstate{heart_beats = emqx_stomp_heartbeat:init(Heartbeats)}).
+      State#pstate{heart_beats = emqx_stomp_heartbeat:init(backoff(Heartbeats))}).
+
+backoff({Cx, Cy}) ->
+    {erlang:ceil(Cx * ?INCOMING_TIMER_BACKOFF),
+     erlang:ceil(Cy * ?OUTCOMING_TIMER_BACKOFF)}.
 
 %%--------------------------------------------------------------------
 %% pub & sub helpers
@@ -768,3 +788,4 @@ interval(outgoing_timer, #pstate{heart_beats = HrtBt}) ->
     emqx_stomp_heartbeat:interval(outgoing, HrtBt);
 interval(clean_trans_timer, _) ->
     ?TRANS_TIMEOUT.
+

From 981f74d4588d6633682eac66505c0cb4e7190fd8 Mon Sep 17 00:00:00 2001
From: JianBo He 
Date: Wed, 3 Nov 2021 21:12:20 +0800
Subject: [PATCH 15/29] test(stomp): refine stomp test cases

---
 apps/emqx_stomp/test/emqx_stomp_SUITE.erl           | 2 +-
 apps/emqx_stomp/test/emqx_stomp_heartbeat_SUITE.erl | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/apps/emqx_stomp/test/emqx_stomp_SUITE.erl b/apps/emqx_stomp/test/emqx_stomp_SUITE.erl
index 9a5d9698e..c8ab88311 100644
--- a/apps/emqx_stomp/test/emqx_stomp_SUITE.erl
+++ b/apps/emqx_stomp/test/emqx_stomp_SUITE.erl
@@ -100,7 +100,7 @@ t_heartbeat(_) ->
                                                      {<<"host">>, <<"127.0.0.1:61613">>},
                                                      {<<"login">>, <<"guest">>},
                                                      {<<"passcode">>, <<"guest">>},
-                                                     {<<"heart-beat">>, <<"1000,800">>}])),
+                                                     {<<"heart-beat">>, <<"1000,2000">>}])),
                         {ok, Data} = gen_tcp:recv(Sock, 0),
                         {ok, #stomp_frame{command = <<"CONNECTED">>,
                                           headers = _,
diff --git a/apps/emqx_stomp/test/emqx_stomp_heartbeat_SUITE.erl b/apps/emqx_stomp/test/emqx_stomp_heartbeat_SUITE.erl
index cbced5f43..b0ce378fd 100644
--- a/apps/emqx_stomp/test/emqx_stomp_heartbeat_SUITE.erl
+++ b/apps/emqx_stomp/test/emqx_stomp_heartbeat_SUITE.erl
@@ -35,8 +35,7 @@ t_check_1(_) ->
     {ok, HrtBt1} = emqx_stomp_heartbeat:check(incoming, 0, HrtBt),
     {error, timeout} = emqx_stomp_heartbeat:check(incoming, 0, HrtBt1),
 
-    {ok, HrtBt2} = emqx_stomp_heartbeat:check(outgoing, 0, HrtBt1),
-    {error, timeout} = emqx_stomp_heartbeat:check(outgoing, 0, HrtBt2),
+    {error, timeout} = emqx_stomp_heartbeat:check(outgoing, 0, HrtBt1),
     ok.
 
 t_check_2(_) ->

From f36abc281ad667ca6cfb05d20d86bdf57d082da4 Mon Sep 17 00:00:00 2001
From: JianBo He 
Date: Thu, 4 Nov 2021 09:11:02 +0800
Subject: [PATCH 16/29] chore(types): add comment for ver type

---
 src/emqx_types.erl | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/emqx_types.erl b/src/emqx_types.erl
index 0c15bd288..3e53eafd1 100644
--- a/src/emqx_types.erl
+++ b/src/emqx_types.erl
@@ -95,7 +95,9 @@
              | ?MQTT_PROTO_V4
              | ?MQTT_PROTO_V5
              | non_neg_integer()
-             | binary()).
+             %% Some non-MQTT versions of protocol may be a binary type
+             | binary()
+             ).
 
 -type(qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2).
 -type(qos_name() :: qos0 | at_most_once |

From d7aec58370f67a0f8a0f6984a03ed75e6b3e1b2f Mon Sep 17 00:00:00 2001
From: zhanghongtong 
Date: Thu, 4 Nov 2021 11:27:36 +0800
Subject: [PATCH 17/29] ci(relup): fix old vsn error

Signed-off-by: zhanghongtong 
---
 .github/workflows/run_fvt_tests.yaml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/run_fvt_tests.yaml b/.github/workflows/run_fvt_tests.yaml
index 2215b9188..80fced0e4 100644
--- a/.github/workflows/run_fvt_tests.yaml
+++ b/.github/workflows/run_fvt_tests.yaml
@@ -223,7 +223,7 @@ jobs:
         - name: Generate matrix
           id: generate-matrix
           run: |
-            matrix=$(echo -n "$OLD_VSNS" | jq -R -s -c 'split(" ")')
+            matrix=$(echo -n "$OLD_VSNS" | sed 's/ $//g' | jq -R -s -c 'split(" ")')
             echo "::set-output name=matrix::$matrix"
 
     relup_test_build:
@@ -275,6 +275,7 @@ jobs:
         runs-on: ubuntu-20.04
         container: emqx/relup-test-env:erl23.2.7.2-emqx-2-ubuntu20.04
         strategy:
+          fail-fast: false
           matrix:
             old_vsn: ${{ fromJson(needs.relup_test_plan.outputs.matrix) }}
         env:

From a406c4f4708def050ae5b10f310a97872862ac68 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Thu, 4 Nov 2021 16:30:46 +0800
Subject: [PATCH 18/29] fix(ekka): add timeout to rpc:multicall/4

---
 rebar.config | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rebar.config b/rebar.config
index c49733535..4994d54ae 100644
--- a/rebar.config
+++ b/rebar.config
@@ -43,7 +43,7 @@
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
-    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.3"}}}
+    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.4"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
     , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}}

From ef9fe12825eee72106f30b1c5935caae31979462 Mon Sep 17 00:00:00 2001
From: JianBo He 
Date: Thu, 4 Nov 2021 21:11:12 +0800
Subject: [PATCH 19/29] fix(stomp): fix bad_return_value

---
 apps/emqx_stomp/src/emqx_stomp_connection.erl | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/apps/emqx_stomp/src/emqx_stomp_connection.erl b/apps/emqx_stomp/src/emqx_stomp_connection.erl
index 63e76faed..d2bf3d8b0 100644
--- a/apps/emqx_stomp/src/emqx_stomp_connection.erl
+++ b/apps/emqx_stomp/src/emqx_stomp_connection.erl
@@ -292,13 +292,10 @@ handle_info({timeout, _TRef, emit_stats},
 handle_info({timeout, TRef, TMsg}, State) ->
     with_proto(timeout, [TRef, TMsg], State);
 
-handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
+handle_info(activate_socket, State) ->
     case activate_socket(State) of
-        {ok, NState = #state{sockstate = NewSst}} ->
-            case OldSst =/= NewSst of
-                true -> {ok, {event, NewSst}, NState};
-                false -> {ok, NState}
-            end;
+        {ok, NState} ->
+            noreply(NState);
         {error, Reason} ->
             handle_info({sock_error, Reason}, State)
     end;

From 27afecb3acaf4ebced6e1d2c7a006ba5fd12716b Mon Sep 17 00:00:00 2001
From: k32 <10274441+k32@users.noreply.github.com>
Date: Thu, 4 Nov 2021 13:45:29 +0100
Subject: [PATCH 20/29] fix(emqx_connection): Introduce backpressure while
 sending data

Fixes #5494
---
 src/emqx_connection.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl
index ab91c02b4..b8d1c485d 100644
--- a/src/emqx_connection.erl
+++ b/src/emqx_connection.erl
@@ -702,7 +702,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
     ok = emqx_metrics:inc('bytes.sent', Oct),
     inc_counter(outgoing_bytes, Oct),
     emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
-    case Transport:async_send(Socket, IoData, [nosuspend]) of
+    case Transport:async_send(Socket, IoData, []) of
         ok -> ok;
         Error = {error, _Reason} ->
             %% Send an inet_reply to postpone handling the error

From 37edb03866f32f759cb09e1a66ea8c8828a8fca7 Mon Sep 17 00:00:00 2001
From: Zaiming Shi 
Date: Thu, 4 Nov 2021 18:05:30 +0100
Subject: [PATCH 21/29] build: fix elvis check and ensure newline at EOF

---
 ...{elvis_lint.yaml => code_style_check.yaml} | 14 +++++--
 scripts/elvis-check.sh                        | 40 +++++++++++++++----
 2 files changed, 43 insertions(+), 11 deletions(-)
 rename .github/workflows/{elvis_lint.yaml => code_style_check.yaml} (52%)

diff --git a/.github/workflows/elvis_lint.yaml b/.github/workflows/code_style_check.yaml
similarity index 52%
rename from .github/workflows/elvis_lint.yaml
rename to .github/workflows/code_style_check.yaml
index 1fdbeba87..959512c20 100644
--- a/.github/workflows/elvis_lint.yaml
+++ b/.github/workflows/code_style_check.yaml
@@ -1,4 +1,4 @@
-name: Elvis Linter
+name: Code style check
 
 on: [pull_request]
 
@@ -7,10 +7,18 @@ jobs:
     runs-on: ubuntu-20.04
     steps:
       - uses: actions/checkout@v2
+        with:
+          fetch-depth: 1000
       - name: Set git token
         if: endsWith(github.repository, 'enterprise')
         run: |
           echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
           git config --global credential.helper store
-      - run: |
-          ./scripts/elvis-check.sh $GITHUB_BASE_REF
+      - name: Run elvis check
+        run: |
+          set -e
+          if [ -f EMQX_ENTERPRISE ]; then
+            ./scripts/elvis-check.sh $GITHUB_BASE_REF emqx-enterprise
+          else
+            ./scripts/elvis-check.sh $GITHUB_BASE_REF emqx
+          fi
diff --git a/scripts/elvis-check.sh b/scripts/elvis-check.sh
index 5fe482865..264200d6b 100755
--- a/scripts/elvis-check.sh
+++ b/scripts/elvis-check.sh
@@ -5,16 +5,16 @@
 
 set -euo pipefail
 
-ELVIS_VERSION='1.0.0-emqx-2'
+elvis_version='1.0.0-emqx-2'
 
 base="${1:-}"
+repo="${2:-emqx/emqx}"
+REPO="${GITHUB_REPOSITORY:-${repo}}"
 if [ "${base}" = "" ]; then
     echo "Usage $0 "
     exit 1
 fi
 
-elvis_version="${2:-$ELVIS_VERSION}"
-
 echo "elvis -v: $elvis_version"
 echo "git diff base: $base"
 
@@ -27,11 +27,7 @@ if [[ "$base" =~ [0-9a-f]{8,40} ]]; then
     # base is a commit sha1
     compare_base="$base"
 else
-    if [[ $CI == true ]];then
-        remote="$(git remote -v | grep -E "github\.com(.|/)$GITHUB_REPOSITORY" | grep fetch | awk '{print $1}')"
-    else
-        remote="$(git remote -v | grep -E 'github\.com(.|/)emqx' | grep fetch | awk '{print $1}')"
-    fi
+    remote="$(git remote -v | grep -E "github\.com(:|/)$REPO((\.git)|(\s))" | grep fetch | awk '{print $1}')"
     git fetch "$remote" "$base"
     compare_base="$remote/$base"
 fi
@@ -58,3 +54,31 @@ if [ $bad_file_count -gt 0 ]; then
     echo "elvis: $bad_file_count errors"
     exit 1
 fi
+
+### now check new-line at EOF for changed files
+
+nl_at_eof() {
+    local file="$1"
+    if ! [ -f "$file" ]; then
+        return
+    fi
+    case "$file" in
+        *.png|*rebar3)
+            return
+            ;;
+    esac
+    local lastbyte
+    lastbyte="$(tail -c 1 "$file" 2>&1)"
+    if [ "$lastbyte" != '' ]; then
+        echo "$file"
+        return 1
+    fi
+}
+
+for file in $(git_diff); do
+    if ! nl_at_eof "$file"; then
+        bad_file_count=$(( bad_file_count  + 1 ))
+    fi
+done
+
+exit $bad_file_count

From f5a2421fdb7b974447c383a8c7609d31beb3309f Mon Sep 17 00:00:00 2001
From: Zaiming Shi 
Date: Mon, 8 Nov 2021 13:25:28 +0100
Subject: [PATCH 22/29] fix(bin/emqx): ensure NAME is set

---
 bin/emqx | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bin/emqx b/bin/emqx
index 0662d4a46..e68c1f8c2 100755
--- a/bin/emqx
+++ b/bin/emqx
@@ -27,7 +27,6 @@ export EMU="beam"
 export PROGNAME="erl"
 DYNLIBS_DIR="$RUNNER_ROOT_DIR/dynlibs"
 ERTS_LIB_DIR="$ERTS_DIR/../lib"
-MNESIA_DATA_DIR="$RUNNER_DATA_DIR/mnesia/$NAME"
 
 # Echo to stderr on errors
 echoerr() { echo "$*" 1>&2; }
@@ -409,6 +408,7 @@ case $NAME in
         NAME=$NAME@$(relx_get_nodename)
         ;;
 esac
+MNESIA_DATA_DIR="$RUNNER_DATA_DIR/mnesia/$NAME"
 
 # Check the first argument for instructions
 case "$1" in

From 7d07e8d9484a842eff585c3385f66c22f41dbf53 Mon Sep 17 00:00:00 2001
From: k32 <10274441+k32@users.noreply.github.com>
Date: Mon, 8 Nov 2021 14:24:37 +0100
Subject: [PATCH 23/29] chore(emqx): Update version and appup file

---
 scripts/update_appup.escript | 23 ++++++-----------
 src/emqx.app.src             |  2 +-
 src/emqx.appup.src           | 48 ++++++++++++++++++++++++------------
 3 files changed, 40 insertions(+), 33 deletions(-)

diff --git a/scripts/update_appup.escript b/scripts/update_appup.escript
index c8061f9de..8eb1c7967 100755
--- a/scripts/update_appup.escript
+++ b/scripts/update_appup.escript
@@ -21,12 +21,11 @@ support other repos too.
 
 Usage:
 
-   update_appup.escript [--check] [--repo URL] [--remote NAME] [--skip-build] [--make-commad SCRIPT] [--release-dir DIR] 
+   update_appup.escript [--check] [--repo URL] [--remote NAME] [--skip-build] [--make-commad SCRIPT] [--release-dir DIR] 
 
 Options:
 
   --check           Don't update the appfile, just check that they are complete
-  --prev-tag        Specify the previous release tag. Otherwise the previous patch version is used
   --repo            Upsteam git repo URL
   --remote          Get upstream repo URL from the specified git remote
   --skip-build      Don't rebuild the releases. May produce wrong results
@@ -34,7 +33,7 @@ Options:
   --release-dir     Release directory
   --src-dirs        Directories where source code is found. Defaults to '{src,apps,lib-*}/**/'
   --binary-rel-url  Binary release URL pattern. %TAG% variable is substituted with the release tag.
-                    E.g. \"https://github.com/emqx/emqx/releases/download/v4.3.8/emqx-centos7-%TAG%-amd64.zip\"
+                    E.g. \"https://github.com/emqx/emqx/releases/download/v%TAG%/emqx-centos7-%TAG%-amd64.zip\"
 ".
 
 -record(app,
@@ -60,18 +59,12 @@ ignored_apps() ->
     [emqx_dashboard, emqx_management] ++ otp_standard_apps().
 
 main(Args) ->
-    #{current_release := CurrentRelease} = Options = parse_args(Args, default_options()),
+    #{prev_tag := Baseline} = Options = parse_args(Args, default_options()),
     init_globals(Options),
-    case find_prev_tag(CurrentRelease) of
-        {ok, Baseline} ->
-            main(Options, Baseline);
-        undefined ->
-            log("No appup update is needed for this release, nothing to be done~n", []),
-            ok
-    end.
+    main(Options, Baseline).
 
-parse_args([CurrentRelease = [A|_]], State) when A =/= $- ->
-    State#{current_release => CurrentRelease};
+parse_args([PrevTag = [A|_]], State) when A =/= $- ->
+    State#{prev_tag => PrevTag};
 parse_args(["--check"|Rest], State) ->
     parse_args(Rest, State#{check => true});
 parse_args(["--skip-build"|Rest], State) ->
@@ -86,8 +79,6 @@ parse_args(["--release-dir", Dir|Rest], State) ->
     parse_args(Rest, State#{beams_dir => Dir});
 parse_args(["--src-dirs", Pattern|Rest], State) ->
     parse_args(Rest, State#{src_dirs => Pattern});
-parse_args(["--prev-tag", Tag|Rest], State) ->
-    parse_args(Rest, State#{prev_tag => Tag});
 parse_args(["--binary-rel-url", URL|Rest], State) ->
     parse_args(Rest, State#{binary_rel_url => {ok, URL}});
 parse_args(_, _) ->
@@ -163,7 +154,7 @@ download_prev_release(Tag, #{binary_rel_url := {ok, URL0}, clone_url := Repo}) -
     Dir = filename:basename(Repo, ".git") ++ [$-|Tag],
     Filename = filename:join(BaseDir, Dir),
     Script = "mkdir -p ${OUTFILE} &&
-              { [ -f ${OUTFILE}.zip ] || wget -O ${OUTFILE}.zip ${URL}; } &&
+              wget -O ${OUTFILE}.zip ${URL} &&
               unzip -n -d ${OUTFILE} ${OUTFILE}.zip",
     Env = [{"TAG", Tag}, {"OUTFILE", Filename}, {"URL", URL}],
     bash(Script, Env),
diff --git a/src/emqx.app.src b/src/emqx.app.src
index 5fbb99c9a..a107c7ed4 100644
--- a/src/emqx.app.src
+++ b/src/emqx.app.src
@@ -1,7 +1,7 @@
 {application, emqx,
  [{id, "emqx"},
   {description, "EMQ X"},
-  {vsn, "4.3.10"}, % strict semver, bump manually!
+  {vsn, "4.3.11"}, % strict semver, bump manually!
   {modules, []},
   {registered, []},
   {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]},
diff --git a/src/emqx.appup.src b/src/emqx.appup.src
index 326c0aaf0..b826775d6 100644
--- a/src/emqx.appup.src
+++ b/src/emqx.appup.src
@@ -1,7 +1,9 @@
 %% -*- mode: erlang -*-
 {VSN,
-  [{"4.3.9",
-    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+  [{"4.3.10",[{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
+   {"4.3.9",
+    [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
@@ -10,7 +12,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.8",
-    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
@@ -19,7 +22,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.7",
-    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
@@ -30,7 +34,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.6",
-    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
@@ -42,7 +47,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.5",
-    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
@@ -55,7 +61,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.4",
-    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
@@ -69,7 +76,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.3",
-    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
      {load_module,emqx_misc,brutal_purge,soft_purge,[]},
      {load_module,emqx_packet,brutal_purge,soft_purge,[]},
@@ -147,8 +155,10 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {<<".*">>,[]}],
-  [{"4.3.9",
-    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+  [{"4.3.10",[{load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
+   {"4.3.9",
+    [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
@@ -157,7 +167,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.8",
-    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
@@ -166,7 +177,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.7",
-    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
@@ -177,7 +189,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.6",
-    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_cm,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
@@ -189,7 +202,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.5",
-    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
@@ -202,7 +216,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.4",
-    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
@@ -216,7 +231,8 @@
      {load_module,emqx_rpc,brutal_purge,soft_purge,[]},
      {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
    {"4.3.3",
-    [{load_module,emqx_channel,brutal_purge,soft_purge,[]},
+    [{load_module,emqx_connection,brutal_purge,soft_purge,[]},
+     {load_module,emqx_channel,brutal_purge,soft_purge,[]},
      {load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
      {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},

From 325c5e5a97d907a05bd6154a58aa6df744462971 Mon Sep 17 00:00:00 2001
From: Zaiming Shi 
Date: Fri, 5 Nov 2021 13:05:34 +0100
Subject: [PATCH 24/29] chore: sync ce code added only to ee back to ce

---
 apps/emqx_auth_pgsql/rebar.config             |  2 +-
 apps/emqx_exproto/src/emqx_exproto.app.src    |  2 +-
 apps/emqx_exproto/src/emqx_exproto.appup.src  | 14 ++++-----
 .../emqx_exproto/src/emqx_exproto_channel.erl | 30 ++++++++++++++-----
 apps/emqx_exproto/src/emqx_exproto_conn.erl   |  6 +++-
 apps/emqx_stomp/src/emqx_stomp_connection.erl |  6 ++--
 6 files changed, 39 insertions(+), 21 deletions(-)

diff --git a/apps/emqx_auth_pgsql/rebar.config b/apps/emqx_auth_pgsql/rebar.config
index 3155bbef3..e1a1c752c 100644
--- a/apps/emqx_auth_pgsql/rebar.config
+++ b/apps/emqx_auth_pgsql/rebar.config
@@ -1,5 +1,5 @@
 {deps,
- [{epgsql, {git, "https://github.com/epgsql/epgsql", {tag, "4.4.0"}}}
+ [{epgsql, {git, "https://github.com/epgsql/epgsql.git", {tag, "4.4.0"}}}
  ]}.
 
 {erl_opts, [warn_unused_vars,
diff --git a/apps/emqx_exproto/src/emqx_exproto.app.src b/apps/emqx_exproto/src/emqx_exproto.app.src
index 3157fb482..e2c674779 100644
--- a/apps/emqx_exproto/src/emqx_exproto.app.src
+++ b/apps/emqx_exproto/src/emqx_exproto.app.src
@@ -1,6 +1,6 @@
 {application, emqx_exproto,
  [{description, "EMQ X Extension for Protocol"},
-  {vsn, "4.3.2"}, %% strict semver
+  {vsn, "4.3.3"}, %% strict semver
   {modules, []},
   {registered, []},
   {mod, {emqx_exproto_app, []}},
diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src
index abfebbd6f..b5cd29869 100644
--- a/apps/emqx_exproto/src/emqx_exproto.appup.src
+++ b/apps/emqx_exproto/src/emqx_exproto.appup.src
@@ -1,12 +1,11 @@
 %% -*-: erlang -*-
 {VSN,
  [
-    {"4.3.1", [
-      {load_module, emqx_exproto_gsvr, brutal_purge, soft_purge, []},
-      {load_module, emqx_exproto_gcli, brutal_purge, soft_purge, []},
+    {"4.3.2", [
+      {load_module, emqx_exproto_conn, brutal_purge, soft_purge, []},
       {load_module, emqx_exproto_channel, brutal_purge, soft_purge, []}
     ]},
-    {"4.3.0", [
+    {<<"4.3.[0-1]">>, [
       {load_module, emqx_exproto_gsvr, brutal_purge, soft_purge, []},
       {load_module, emqx_exproto_gcli, brutal_purge, soft_purge, []},
       {load_module, emqx_exproto_conn, brutal_purge, soft_purge, []},
@@ -15,12 +14,11 @@
     {<<".*">>, []}
  ],
  [
-    {"4.3.1", [
-      {load_module, emqx_exproto_gsvr, brutal_purge, soft_purge, []},
-      {load_module, emqx_exproto_gcli, brutal_purge, soft_purge, []},
+    {"4.3.2", [
+      {load_module, emqx_exproto_conn, brutal_purge, soft_purge, []},
       {load_module, emqx_exproto_channel, brutal_purge, soft_purge, []}
     ]},
-    {"4.3.0", [
+    {<<"4.3.[0-1]">>, [
       {load_module, emqx_exproto_gsvr, brutal_purge, soft_purge, []},
       {load_module, emqx_exproto_gcli, brutal_purge, soft_purge, []},
       {load_module, emqx_exproto_conn, brutal_purge, soft_purge, []},
diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl
index ba6205c9a..4c9dca5d5 100644
--- a/apps/emqx_exproto/src/emqx_exproto_channel.erl
+++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl
@@ -94,6 +94,9 @@
          awaiting_rel_max
         ]).
 
+-define(CHANMOCK(P), {exproto_anonymous_client, P}).
+-define(CHAN_CONN_TAB, emqx_channel_conn).
+
 %%--------------------------------------------------------------------
 %% Info, Attrs and Caps
 %%--------------------------------------------------------------------
@@ -155,13 +158,20 @@ init(ConnInfo = #{socktype := Socktype,
                        conn_state = connecting,
                        timers = #{}
                       },
-
-    Req = #{conninfo =>
-            peercert(Peercert,
-                     #{socktype => socktype(Socktype),
-                       peername => address(Peername),
-                       sockname => address(Sockname)})},
-    try_dispatch(on_socket_created, wrap(Req), Channel).
+    %% Check license limitation
+    case emqx_hooks:run_fold('client.connect', [NConnInfo], #{}) of
+        {error, _Reason} ->
+            throw(nopermission);
+        _ ->
+            ConnMod = maps:get(conn_mod, NConnInfo),
+            true = ets:insert(?CHAN_CONN_TAB, {?CHANMOCK(self()), ConnMod}),
+            Req = #{conninfo =>
+                    peercert(Peercert,
+                             #{socktype => socktype(Socktype),
+                               peername => address(Peername),
+                               sockname => address(Sockname)})},
+            try_dispatch(on_socket_created, wrap(Req), Channel)
+    end.
 
 %% @private
 peercert(NoSsl, ConnInfo) when NoSsl == nossl;
@@ -283,6 +293,7 @@ handle_call({auth, ClientInfo0, Password},
                 emqx_metrics:inc('client.auth.anonymous'),
             NClientInfo = maps:merge(ClientInfo1, AuthResult),
             NChannel = Channel1#channel{clientinfo = NClientInfo},
+            clean_anonymous_clients(),
             case emqx_cm:open_session(true, NClientInfo, NConnInfo) of
                 {ok, _Session} ->
                     ?LOG(debug, "Client ~s (Username: '~s') authorized successfully!",
@@ -399,12 +410,16 @@ handle_info(Info, Channel) ->
 
 -spec(terminate(any(), channel()) -> channel()).
 terminate(Reason, Channel) ->
+    clean_anonymous_clients(),
     Req = #{reason => stringfy(Reason)},
     try_dispatch(on_socket_closed, wrap(Req), Channel).
 
 is_anonymous(#{anonymous := true}) -> true;
 is_anonymous(_AuthResult)          -> false.
 
+clean_anonymous_clients() ->
+    ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())).
+
 %%--------------------------------------------------------------------
 %% Sub/UnSub
 %%--------------------------------------------------------------------
@@ -577,7 +592,6 @@ default_conninfo(ConnInfo) ->
     ConnInfo#{clean_start => true,
               clientid => undefined,
               username => undefined,
-              conn_mod => undefined,
               conn_props => #{},
               connected => true,
               connected_at => erlang:system_time(millisecond),
diff --git a/apps/emqx_exproto/src/emqx_exproto_conn.erl b/apps/emqx_exproto/src/emqx_exproto_conn.erl
index c7f73e104..685a05687 100644
--- a/apps/emqx_exproto/src/emqx_exproto_conn.erl
+++ b/apps/emqx_exproto/src/emqx_exproto_conn.erl
@@ -233,7 +233,11 @@ init(Parent, WrappedSock, Peername0, Options) ->
     case esockd_wait(WrappedSock) of
         {ok, NWrappedSock} ->
             Peername = esockd_peername(NWrappedSock, Peername0),
-            run_loop(Parent, init_state(NWrappedSock, Peername, Options));
+            try
+                run_loop(Parent, init_state(NWrappedSock, Peername, Options))
+            catch
+                throw : nopermission -> erlang:exit(normal)
+            end;
         {error, Reason} ->
             ok = esockd_close(WrappedSock),
             exit_on_sock_error(Reason)
diff --git a/apps/emqx_stomp/src/emqx_stomp_connection.erl b/apps/emqx_stomp/src/emqx_stomp_connection.erl
index d2bf3d8b0..19ae6bc26 100644
--- a/apps/emqx_stomp/src/emqx_stomp_connection.erl
+++ b/apps/emqx_stomp/src/emqx_stomp_connection.erl
@@ -91,6 +91,8 @@
 
 -define(ENABLED(X), (X =/= undefined)).
 
+-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_stomp_connection]}}]).
+
 -dialyzer({nowarn_function, [ ensure_stats_timer/2
                             ]}).
 
@@ -101,7 +103,7 @@
 start_link(Transport, Sock, ProtoEnv) ->
     {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, ProtoEnv]])}.
 
--spec info(pid()|state()) -> emqx_types:infos().
+-spec info(pid() | state()) -> emqx_types:infos().
 info(CPid) when is_pid(CPid) ->
     call(CPid, info);
 info(State = #state{pstate = PState}) ->
@@ -123,7 +125,7 @@ info(sockstate, #state{sockstate = SockSt}) ->
 info(active_n, #state{active_n = ActiveN}) ->
     ActiveN.
 
--spec stats(pid()|state()) -> emqx_types:stats().
+-spec stats(pid() | state()) -> emqx_types:stats().
 stats(CPid) when is_pid(CPid) ->
     call(CPid, stats);
 stats(#state{transport = Transport,

From 412a68ac750302214683742cfb44f65d44de6c3a Mon Sep 17 00:00:00 2001
From: Zaiming Shi 
Date: Fri, 5 Nov 2021 13:13:04 +0100
Subject: [PATCH 25/29] chore: bump version for emqx_proto

---
 apps/emqx_exproto/src/emqx_exproto.app.src     | 2 +-
 apps/emqx_exproto/src/emqx_exproto.appup.src   | 2 ++
 apps/emqx_exproto/src/emqx_exproto_channel.erl | 1 -
 3 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/apps/emqx_exproto/src/emqx_exproto.app.src b/apps/emqx_exproto/src/emqx_exproto.app.src
index e2c674779..f7cab4c2e 100644
--- a/apps/emqx_exproto/src/emqx_exproto.app.src
+++ b/apps/emqx_exproto/src/emqx_exproto.app.src
@@ -1,6 +1,6 @@
 {application, emqx_exproto,
  [{description, "EMQ X Extension for Protocol"},
-  {vsn, "4.3.3"}, %% strict semver
+  {vsn, "4.3.4"}, %% 4.3.3 is used by ee
   {modules, []},
   {registered, []},
   {mod, {emqx_exproto_app, []}},
diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src
index b5cd29869..6b8dde713 100644
--- a/apps/emqx_exproto/src/emqx_exproto.appup.src
+++ b/apps/emqx_exproto/src/emqx_exproto.appup.src
@@ -1,6 +1,7 @@
 %% -*-: erlang -*-
 {VSN,
  [
+    {"4.3.3", []}, %% 4.3.3 is used by ee
     {"4.3.2", [
       {load_module, emqx_exproto_conn, brutal_purge, soft_purge, []},
       {load_module, emqx_exproto_channel, brutal_purge, soft_purge, []}
@@ -14,6 +15,7 @@
     {<<".*">>, []}
  ],
  [
+    {"4.3.3", []}, %% 4.3.3 is used by ee
     {"4.3.2", [
       {load_module, emqx_exproto_conn, brutal_purge, soft_purge, []},
       {load_module, emqx_exproto_channel, brutal_purge, soft_purge, []}
diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl
index 4c9dca5d5..be609860e 100644
--- a/apps/emqx_exproto/src/emqx_exproto_channel.erl
+++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl
@@ -158,7 +158,6 @@ init(ConnInfo = #{socktype := Socktype,
                        conn_state = connecting,
                        timers = #{}
                       },
-    %% Check license limitation
     case emqx_hooks:run_fold('client.connect', [NConnInfo], #{}) of
         {error, _Reason} ->
             throw(nopermission);

From f1f2e51c99b5b6f7be2d4a98b0b8e47e2f8de10e Mon Sep 17 00:00:00 2001
From: Zaiming Shi 
Date: Fri, 5 Nov 2021 16:33:39 +0100
Subject: [PATCH 26/29] fix: update appup

---
 apps/emqx_exproto/src/emqx_exproto.appup.src  |  55 +++++----
 .../src/emqx_rule_engine.app.src              |   2 +-
 .../src/emqx_rule_engine.appup.src            | 106 ++++++++----------
 apps/emqx_stomp/src/emqx_stomp.app.src        |   2 +-
 apps/emqx_stomp/src/emqx_stomp.appup.src      |  14 ++-
 5 files changed, 83 insertions(+), 96 deletions(-)

diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src
index 6b8dde713..e0a021af5 100644
--- a/apps/emqx_exproto/src/emqx_exproto.appup.src
+++ b/apps/emqx_exproto/src/emqx_exproto.appup.src
@@ -1,31 +1,26 @@
-%% -*-: erlang -*-
+%% -*- mode: erlang -*-
 {VSN,
- [
-    {"4.3.3", []}, %% 4.3.3 is used by ee
-    {"4.3.2", [
-      {load_module, emqx_exproto_conn, brutal_purge, soft_purge, []},
-      {load_module, emqx_exproto_channel, brutal_purge, soft_purge, []}
-    ]},
-    {<<"4.3.[0-1]">>, [
-      {load_module, emqx_exproto_gsvr, brutal_purge, soft_purge, []},
-      {load_module, emqx_exproto_gcli, brutal_purge, soft_purge, []},
-      {load_module, emqx_exproto_conn, brutal_purge, soft_purge, []},
-      {load_module, emqx_exproto_channel, brutal_purge, soft_purge, []}
-    ]},
-    {<<".*">>, []}
- ],
- [
-    {"4.3.3", []}, %% 4.3.3 is used by ee
-    {"4.3.2", [
-      {load_module, emqx_exproto_conn, brutal_purge, soft_purge, []},
-      {load_module, emqx_exproto_channel, brutal_purge, soft_purge, []}
-    ]},
-    {<<"4.3.[0-1]">>, [
-      {load_module, emqx_exproto_gsvr, brutal_purge, soft_purge, []},
-      {load_module, emqx_exproto_gcli, brutal_purge, soft_purge, []},
-      {load_module, emqx_exproto_conn, brutal_purge, soft_purge, []},
-      {load_module, emqx_exproto_channel, brutal_purge, soft_purge, []}
-    ]},
-    {<<".*">>, []}
- ]
-}.
+  [{"4.3.3",
+    [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
+     {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
+   {"4.3.2",
+    [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
+     {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
+   {<<"4.3.[0-1]">>,
+    [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
+     {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
+     {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
+     {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
+   {<<".*">>,[]}],
+  [{"4.3.3",
+    [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
+     {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
+   {"4.3.2",
+    [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
+     {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
+   {<<"4.3.[0-1]">>,
+    [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
+     {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
+     {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
+     {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
+   {<<".*">>,[]}]}.
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src
index 89147b76c..66af0da21 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src
@@ -1,6 +1,6 @@
 {application, emqx_rule_engine,
  [{description, "EMQ X Rule Engine"},
-  {vsn, "4.3.5"}, % strict semver, bump manually!
+  {vsn, "4.3.6"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_rule_engine_sup, emqx_rule_registry]},
   {applications, [kernel,stdlib,rulesql,getopt]},
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src
index 80b64f027..c94dc994d 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src
@@ -1,64 +1,52 @@
-%% -*-: erlang -*-
-{"4.3.5",
- [ {"4.3.0",
-    [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
-    , {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
-    , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
-    , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
-    , {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
-    ]},
-   {"4.3.1",
-    [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
-    , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
-    , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
-    , {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
-    ]},
-   {"4.3.2",
-    [ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
-    , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
-    , {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
-    , {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
-    ]},
-   {"4.3.3",
-    [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
-    , {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
-    , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
-    ]},
-   {"4.3.4",
-    [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
-    , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
-    ]},
-   {<<".*">>, []}
- ],
- [
+%% -*- mode: erlang -*-
+{VSN,
+  [{"4.3.5",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
    {"4.3.0",
-    [ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
-    , {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
-    , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
-    , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
-    , {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
-    ]},
+    [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
+     {apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
+     {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
    {"4.3.1",
-    [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
-    , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
-    , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
-    , {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
-    ]},
+    [{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
+     {apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
+     {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
    {"4.3.2",
-    [ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
-    , {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
-    , {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
-    , {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
-    ]},
+    [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
+     {apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
+     {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
    {"4.3.3",
-    [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
-    , {load_module, emqx_rule_actions, brutal_purge, soft_purge, []}
-    , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
-    ]},
+    [{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
    {"4.3.4",
-    [ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
-    , {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
-    ]},
-   {<<".*">>, []}
- ]
-}.
+    [{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
+   {<<".*">>,[]}],
+  [{"4.3.5",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
+   {"4.3.0",
+    [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
+     {apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
+     {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
+   {"4.3.1",
+    [{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
+     {apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
+     {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
+   {"4.3.2",
+    [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
+     {apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
+     {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
+   {"4.3.3",
+    [{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
+   {"4.3.4",
+    [{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
+     {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
+   {<<".*">>,[]}]}.
diff --git a/apps/emqx_stomp/src/emqx_stomp.app.src b/apps/emqx_stomp/src/emqx_stomp.app.src
index 87a0fd089..d2ecae53b 100644
--- a/apps/emqx_stomp/src/emqx_stomp.app.src
+++ b/apps/emqx_stomp/src/emqx_stomp.app.src
@@ -1,6 +1,6 @@
 {application, emqx_stomp,
  [{description, "EMQ X Stomp Protocol Plugin"},
-  {vsn, "4.3.1"}, % strict semver, bump manually!
+  {vsn, "4.3.2"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_stomp_sup]},
   {applications, [kernel,stdlib]},
diff --git a/apps/emqx_stomp/src/emqx_stomp.appup.src b/apps/emqx_stomp/src/emqx_stomp.appup.src
index 8a3e7e720..e82b2fbde 100644
--- a/apps/emqx_stomp/src/emqx_stomp.appup.src
+++ b/apps/emqx_stomp/src/emqx_stomp.appup.src
@@ -1,8 +1,12 @@
 %% -*- mode: erlang -*-
-{"4.3.1",
-  [{"4.3.0",
-    [{restart_application,emqx_stomp}]},
+{VSN,
+  [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
+   {"4.3.0",
+    [{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]},
+     {restart_application,emqx_stomp}]},
    {<<".*">>,[]}],
-  [{"4.3.0",
-     [{restart_application,emqx_stomp}]},
+  [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
+   {"4.3.0",
+    [{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]},
+     {restart_application,emqx_stomp}]},
    {<<".*">>,[]}]}.

From 4f3790a6f58a51a63489622a5cbf458cc94ce7c7 Mon Sep 17 00:00:00 2001
From: Zaiming Shi 
Date: Fri, 5 Nov 2021 16:44:55 +0100
Subject: [PATCH 27/29] style: fix code style for emqx_exproto

---
 apps/emqx_exproto/src/emqx_exproto_channel.erl |  2 +-
 apps/emqx_exproto/src/emqx_exproto_conn.erl    | 14 +++++++-------
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl
index be609860e..67f85f932 100644
--- a/apps/emqx_exproto/src/emqx_exproto_channel.erl
+++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl
@@ -106,7 +106,7 @@
 info(Channel) ->
     maps:from_list(info(?INFO_KEYS, Channel)).
 
--spec(info(list(atom())|atom(), channel()) -> term()).
+-spec(info(list(atom()) | atom(), channel()) -> term()).
 info(Keys, Channel) when is_list(Keys) ->
     [{Key, info(Key, Channel)} || Key <- Keys];
 info(conninfo, #channel{conninfo = ConnInfo}) ->
diff --git a/apps/emqx_exproto/src/emqx_exproto_conn.erl b/apps/emqx_exproto/src/emqx_exproto_conn.erl
index 685a05687..02c0b31d6 100644
--- a/apps/emqx_exproto/src/emqx_exproto_conn.erl
+++ b/apps/emqx_exproto/src/emqx_exproto_conn.erl
@@ -115,7 +115,7 @@ start_link(esockd_transport, Sock, Options) ->
 %%--------------------------------------------------------------------
 
 %% @doc Get infos of the connection/channel.
--spec(info(pid()|state()) -> emqx_types:infos()).
+-spec(info(pid() | state()) -> emqx_types:infos()).
 info(CPid) when is_pid(CPid) ->
     call(CPid, info);
 info(State = #state{channel = Channel}) ->
@@ -137,7 +137,7 @@ info(sockstate, #state{sockstate = SockSt}) ->
 info(active_n, #state{active_n = ActiveN}) ->
     ActiveN.
 
--spec(stats(pid()|state()) -> emqx_types:stats()).
+-spec(stats(pid() | state()) -> emqx_types:stats()).
 stats(CPid) when is_pid(CPid) ->
     call(CPid, stats);
 stats(#state{socket  = Socket,
@@ -341,7 +341,7 @@ cancel_stats_timer(State) -> State.
 
 process_msg([], Parent, State) -> recvloop(Parent, State);
 
-process_msg([Msg|More], Parent, State) ->
+process_msg([Msg | More], Parent, State) ->
     case catch handle_msg(Msg, State) of
         ok ->
             process_msg(More, Parent, State);
@@ -417,7 +417,7 @@ handle_msg({Passive, _Sock}, State)
 
 handle_msg(Deliver = {deliver, _Topic, _Msg},
            State = #state{active_n = ActiveN}) ->
-    Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
+    Delivers = [Deliver | emqx_misc:drain_deliver(ActiveN)],
     with_channel(handle_deliver, [Delivers], State);
 
 %% Something sent
@@ -605,9 +605,9 @@ handle_outgoing(IoData, State = #state{socket = Socket}) ->
 handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
     case activate_socket(State) of
         {ok, NState = #state{sockstate = NewSst}} ->
-            if OldSst =/= NewSst ->
-                   {ok, {event, NewSst}, NState};
-               true -> {ok, NState}
+            case OldSst =/= NewSst of
+                true -> {ok, {event, NewSst}, NState};
+                false -> {ok, NState}
             end;
         {error, Reason} ->
             handle_info({sock_error, Reason}, State)

From ed171b8e609f29980f42248135cea69cd53f2725 Mon Sep 17 00:00:00 2001
From: Zaiming Shi 
Date: Fri, 5 Nov 2021 16:46:56 +0100
Subject: [PATCH 28/29] chore: ensure version bump for dashboard app

---
 lib-ce/emqx_dashboard/src/emqx_dashboard.app.src | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src b/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src
index 2c79b10b8..724a76237 100644
--- a/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src
+++ b/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src
@@ -1,6 +1,6 @@
 {application, emqx_dashboard,
  [{description, "EMQ X Web Dashboard"},
-  {vsn, "4.3.5"}, % strict semver, bump manually!
+  {vsn, "4.3.6"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_dashboard_sup]},
   {applications, [kernel,stdlib,mnesia,minirest]},

From 83ecdb242fff270d4588dda1230b50ee8a0c6be9 Mon Sep 17 00:00:00 2001
From: Zaiming Shi 
Date: Mon, 8 Nov 2021 09:29:49 +0100
Subject: [PATCH 29/29] fix(appup): delete module load from app restart
 instruction group

---
 apps/emqx_stomp/src/emqx_stomp.appup.src | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/apps/emqx_stomp/src/emqx_stomp.appup.src b/apps/emqx_stomp/src/emqx_stomp.appup.src
index e82b2fbde..bf4603e52 100644
--- a/apps/emqx_stomp/src/emqx_stomp.appup.src
+++ b/apps/emqx_stomp/src/emqx_stomp.appup.src
@@ -2,11 +2,9 @@
 {VSN,
   [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
    {"4.3.0",
-    [{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]},
-     {restart_application,emqx_stomp}]},
+    [{restart_application,emqx_stomp}]},
    {<<".*">>,[]}],
   [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
    {"4.3.0",
-    [{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]},
-     {restart_application,emqx_stomp}]},
+    [{restart_application,emqx_stomp}]},
    {<<".*">>,[]}]}.