diff --git a/apps/emqx_stomp/src/emqx_stomp_connection.erl b/apps/emqx_stomp/src/emqx_stomp_connection.erl index b2157c7ce..63e76faed 100644 --- a/apps/emqx_stomp/src/emqx_stomp_connection.erl +++ b/apps/emqx_stomp/src/emqx_stomp_connection.erl @@ -43,7 +43,7 @@ ]). %% for protocol --export([send/4, heartbeat/2]). +-export([send/4, heartbeat/2, statfun/3]). %% for mgmt -export([call/2, call/3]). @@ -157,6 +157,7 @@ init_state(Transport, Socket, ProtoEnv) -> {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), SendFun = {fun ?MODULE:send/4, [Transport, Socket, self()]}, + StatFun = {fun ?MODULE:statfun/3, [Transport, Socket]}, HrtBtFun = {fun ?MODULE:heartbeat/2, [Transport, Socket]}, Parser = emqx_stomp_frame:init_parer_state(ProtoEnv), @@ -168,6 +169,7 @@ init_state(Transport, Socket, ProtoEnv) -> peername => Peername, sockname => Sockname, peercert => Peercert, + statfun => StatFun, sendfun => SendFun, heartfun => HrtBtFun, conn_mod => ?MODULE @@ -218,8 +220,15 @@ send(Frame, Transport, Sock, ConnPid) -> end. heartbeat(Transport, Sock) -> + ?LOG(debug, "SEND heartbeat: \\n"), Transport:send(Sock, <<$\n>>). +statfun(Stat, Transport, Sock) -> + case Transport:getstat(Sock, [Stat]) of + {ok, [{Stat, Val}]} -> {ok, Val}; + {error, Error} -> {error, Error} + end. + handle_call(info, _From, State) -> {reply, info(State), State}; diff --git a/apps/emqx_stomp/src/emqx_stomp_heartbeat.erl b/apps/emqx_stomp/src/emqx_stomp_heartbeat.erl index 2a221ad68..4097756fe 100644 --- a/apps/emqx_stomp/src/emqx_stomp_heartbeat.erl +++ b/apps/emqx_stomp/src/emqx_stomp_heartbeat.erl @@ -23,9 +23,10 @@ , check/3 , info/1 , interval/2 + , reset/3 ]). --record(heartbeater, {interval, statval, repeat}). +-record(heartbeater, {interval, statval, repeat, repeat_max}). -type name() :: incoming | outgoing. @@ -42,19 +43,23 @@ init({0, 0}) -> #{}; init({Cx, Cy}) -> maps:filter(fun(_, V) -> V /= undefined end, - #{incoming => heartbeater(Cx), - outgoing => heartbeater(Cy) + #{incoming => heartbeater(incoming, Cx), + outgoing => heartbeater(outgoing, Cy) }). -heartbeater(0) -> +heartbeater(_, 0) -> undefined; -heartbeater(I) -> +heartbeater(N, I) -> #heartbeater{ interval = I, statval = 0, - repeat = 0 + repeat = 0, + repeat_max = repeat_max(N) }. +repeat_max(incoming) -> 1; +repeat_max(outgoing) -> 0. + -spec check(name(), pos_integer(), heartbeat()) -> {ok, heartbeat()} | {error, timeout}. @@ -67,11 +72,12 @@ check(Name, NewVal, HrtBt) -> end. check(NewVal, HrtBter = #heartbeater{statval = OldVal, - repeat = Repeat}) -> + repeat = Repeat, + repeat_max = Max}) -> if NewVal =/= OldVal -> {ok, HrtBter#heartbeater{statval = NewVal, repeat = 0}}; - Repeat < 1 -> + Repeat < Max -> {ok, HrtBter#heartbeater{repeat = Repeat + 1}}; true -> {error, timeout} end. @@ -89,3 +95,10 @@ interval(Type, HrtBt) -> undefined -> undefined; #heartbeater{interval = Intv} -> Intv end. + +reset(Type, StatVal, HrtBt) -> + case maps:get(Type, HrtBt, undefined) of + undefined -> HrtBt; + HrtBter -> + HrtBt#{Type => HrtBter#heartbeater{statval = StatVal, repeat = 0}} + end. diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index 06740430c..fc211be10 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -69,6 +69,8 @@ sendfun :: {function(), list()}, %% Heartbeat function heartfun :: {function(), list()}, + %% Get Socket stat function + statfun :: {function(), list()}, %% The confs for the connection %% TODO: put these configs into a public mem? allow_anonymous :: maybe(boolean()), @@ -77,6 +79,9 @@ -define(DEFAULT_SUB_ACK, <<"auto">>). +-define(INCOMING_TIMER_BACKOFF, 1.25). +-define(OUTCOMING_TIMER_BACKOFF, 0.75). + -define(TIMER_TABLE, #{ incoming_timer => incoming, outgoing_timer => outgoing, @@ -108,7 +113,8 @@ %% @doc Init protocol init(ConnInfo = #{peername := {PeerHost, _Port}, sockname := {_Host, SockPort}, - sendfun := SendFun, + statfun := StatFun, + sendfun := SendFun, heartfun := HeartFun}, Opts) -> NConnInfo = default_conninfo(ConnInfo), @@ -132,6 +138,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, clientinfo = ClientInfo, heartfun = HeartFun, sendfun = SendFun, + statfun = StatFun, timers = #{}, transaction = #{}, allow_anonymous = AllowAnonymous, @@ -231,6 +238,8 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers}, default_user(State) ) of true -> + Heartbeats = parse_heartbeats( + header(<<"heart-beat">>, Headers, <<"0,0">>)), ClientId = emqx_guid:to_base62(emqx_guid:gen()), emqx_logger:set_metadata_clientid(ClientId), ConnInfo = State#pstate.conninfo, @@ -238,6 +247,7 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers}, NConnInfo = ConnInfo#{ proto_ver => Version, clientid => ClientId, + keepalive => element(1, Heartbeats) div 1000, username => Login }, NClitInfo = ClitInfo#{ @@ -250,7 +260,6 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers}, emqx_cm:discard_session(ClientId), emqx_cm:register_channel(ClientId, ConnPid, NConnInfo) end), - Heartbeats = parse_heartbeats(header(<<"heart-beat">>, Headers, <<"0,0">>)), NState = start_heartbeart_timer( Heartbeats, State#pstate{ @@ -454,11 +463,18 @@ timeout(_TRef, {incoming, NewVal}, timeout(_TRef, {outgoing, NewVal}, State = #pstate{heart_beats = HrtBt, + statfun = {StatFun, StatArgs}, heartfun = {Fun, Args}}) -> case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of {error, timeout} -> _ = erlang:apply(Fun, Args), - {ok, State}; + case erlang:apply(StatFun, [send_oct] ++ StatArgs) of + {ok, NewVal2} -> + NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal2, HrtBt), + {ok, reset_timer(outgoing_timer, State#pstate{heart_beats = NHrtBt})}; + {error, Reason} -> + {shutdown, {error, {get_stats_error, Reason}}, State} + end; {ok, NHrtBt} -> {ok, reset_timer(outgoing_timer, State#pstate{heart_beats = NHrtBt})} end; @@ -633,7 +649,11 @@ reverse_heartbeats({Cx, Cy}) -> start_heartbeart_timer(Heartbeats, State) -> ensure_timer( [incoming_timer, outgoing_timer], - State#pstate{heart_beats = emqx_stomp_heartbeat:init(Heartbeats)}). + State#pstate{heart_beats = emqx_stomp_heartbeat:init(backoff(Heartbeats))}). + +backoff({Cx, Cy}) -> + {erlang:ceil(Cx * ?INCOMING_TIMER_BACKOFF), + erlang:ceil(Cy * ?OUTCOMING_TIMER_BACKOFF)}. %%-------------------------------------------------------------------- %% pub & sub helpers @@ -768,3 +788,4 @@ interval(outgoing_timer, #pstate{heart_beats = HrtBt}) -> emqx_stomp_heartbeat:interval(outgoing, HrtBt); interval(clean_trans_timer, _) -> ?TRANS_TIMEOUT. +