From 1263a05bbcdc6f9f26b3a3d02622adc0f9ccede8 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 10 Dec 2020 09:38:46 +0800 Subject: [PATCH] refactor(stomp): improve code form naming --- apps/emqx_stomp/src/emqx_stomp_connection.erl | 165 +++++++++--------- apps/emqx_stomp/src/emqx_stomp_protocol.erl | 60 ++++--- 2 files changed, 114 insertions(+), 111 deletions(-) diff --git a/apps/emqx_stomp/src/emqx_stomp_connection.erl b/apps/emqx_stomp/src/emqx_stomp_connection.erl index dc1977944..3bb3a58fd 100644 --- a/apps/emqx_stomp/src/emqx_stomp_connection.erl +++ b/apps/emqx_stomp/src/emqx_stomp_connection.erl @@ -19,6 +19,9 @@ -behaviour(gen_server). -include("emqx_stomp.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-logger_header("[Stomp-Conn]"). -export([ start_link/3 , info/1 @@ -36,16 +39,13 @@ %% for protocol -export([send/4, heartbeat/2]). --record(stomp_client, {transport, socket, peername, conn_name, conn_state, - await_recv, rate_limit, parser, proto_state, - proto_env, heartbeat}). +-record(state, {transport, socket, peername, conn_name, conn_state, + await_recv, rate_limit, parser, pstate, + proto_env, heartbeat}). -define(INFO_KEYS, [peername, await_recv, conn_state]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). --define(LOG(Level, Format, Args, State), - emqx_logger:Level("Stomp(~s): " ++ Format, [State#stomp_client.conn_name | Args])). - start_link(Transport, Sock, ProtoEnv) -> {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, ProtoEnv]])}. @@ -61,20 +61,21 @@ init([Transport, Sock, ProtoEnv]) -> SendFun = {fun ?MODULE:send/4, [Transport, Sock, self()]}, HrtBtFun = {fun ?MODULE:heartbeat/2, [Transport, Sock]}, Parser = emqx_stomp_frame:init_parer_state(ProtoEnv), - ProtoState = emqx_stomp_protocol:init(#{peername => Peername, - sendfun => SendFun, - heartfun => HrtBtFun}, ProtoEnv), + PState = emqx_stomp_protocol:init(#{peername => Peername, + sendfun => SendFun, + heartfun => HrtBtFun}, ProtoEnv), RateLimit = init_rate_limit(proplists:get_value(rate_limit, ProtoEnv)), - State = run_socket(#stomp_client{transport = Transport, - socket = NewSock, - peername = Peername, - conn_name = ConnName, - conn_state = running, - await_recv = false, - rate_limit = RateLimit, - parser = Parser, - proto_env = ProtoEnv, - proto_state = ProtoState}), + State = run_socket(#state{transport = Transport, + socket = NewSock, + peername = Peername, + conn_name = ConnName, + conn_state = running, + await_recv = false, + rate_limit = RateLimit, + parser = Parser, + proto_env = ProtoEnv, + pstate = PState}), + emqx_logger:set_metadata_peername(esockd:format(Peername)), gen_server:enter_loop(?MODULE, [{hibernate_after, 5000}], State, 20000); {error, Reason} -> {stop, Reason} @@ -96,15 +97,15 @@ send(Data, Transport, Sock, ConnPid) -> heartbeat(Transport, Sock) -> Transport:send(Sock, <<$\n>>). -handle_call(info, _From, State = #stomp_client{transport = Transport, - socket = Sock, - peername = Peername, - await_recv = AwaitRecv, - conn_state = ConnState, - proto_state = ProtoState}) -> +handle_call(info, _From, State = #state{transport = Transport, + socket = Sock, + peername = Peername, + await_recv = AwaitRecv, + conn_state = ConnState, + pstate = PState}) -> ClientInfo = [{peername, Peername}, {await_recv, AwaitRecv}, {conn_state, ConnState}], - ProtoInfo = emqx_stomp_protocol:info(ProtoState), + ProtoInfo = emqx_stomp_protocol:info(PState), case Transport:getstat(Sock, ?SOCK_STATS) of {ok, SockStats} -> {reply, lists:append([ClientInfo, ProtoInfo, SockStats]), State}; @@ -113,11 +114,11 @@ handle_call(info, _From, State = #stomp_client{transport = Transport, end; handle_call(Req, _From, State) -> - ?LOG(error, "unexpected request: ~p", [Req], State), + ?LOG(error, "unexpected request: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "unexpected msg: ~p", [Msg], State), + ?LOG(error, "unexpected msg: ~p", [Msg]), noreply(State). handle_info(timeout, State) -> @@ -144,15 +145,15 @@ handle_info({timeout, TRef, TMsg}, State) when TMsg =:= incoming; shutdown({sock_error, Reason}, State) end; -handle_info({'EXIT', HbProc, Error}, State = #stomp_client{heartbeat = HbProc}) -> +handle_info({'EXIT', HbProc, Error}, State = #state{heartbeat = HbProc}) -> stop(Error, State); handle_info(activate_sock, State) -> - noreply(run_socket(State#stomp_client{conn_state = running})); + noreply(run_socket(State#state{conn_state = running})); handle_info({inet_async, _Sock, _Ref, {ok, Bytes}}, State) -> - ?LOG(debug, "RECV ~p", [Bytes], State), - received(Bytes, rate_limit(size(Bytes), State#stomp_client{await_recv = false})); + ?LOG(debug, "RECV ~p", [Bytes]), + received(Bytes, rate_limit(size(Bytes), State#state{await_recv = false})); handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> shutdown(Reason, State); @@ -163,29 +164,29 @@ handle_info({inet_reply, _Ref, ok}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); -handle_info({deliver, _Topic, Msg}, State = #stomp_client{proto_state = ProtoState}) -> - noreply(State#stomp_client{proto_state = case emqx_stomp_protocol:send(Msg, ProtoState) of - {ok, ProtoState1} -> - ProtoState1; - {error, dropped, ProtoState1} -> - ProtoState1 - end}); +handle_info({deliver, _Topic, Msg}, State = #state{pstate = PState}) -> + noreply(State#state{pstate = case emqx_stomp_protocol:send(Msg, PState) of + {ok, PState1} -> + PState1; + {error, dropped, PState1} -> + PState1 + end}); handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info], State), + ?LOG(error, "Unexpected info: ~p", [Info]), noreply(State). -terminate(Reason, State = #stomp_client{transport = Transport, - socket = Sock, - proto_state = ProtoState}) -> - ?LOG(info, "terminated for ~p", [Reason], State), +terminate(Reason, #state{transport = Transport, + socket = Sock, + pstate = PState}) -> + ?LOG(info, "terminated for ~p", [Reason]), Transport:fast_close(Sock), - case {ProtoState, Reason} of + case {PState, Reason} of {undefined, _} -> ok; {_, {shutdown, Error}} -> - emqx_stomp_protocol:shutdown(Error, ProtoState); + emqx_stomp_protocol:shutdown(Error, PState); {_, Reason} -> - emqx_stomp_protocol:shutdown(Reason, ProtoState) + emqx_stomp_protocol:shutdown(Reason, PState) end. code_change(_OldVsn, State, _Extra) -> @@ -195,69 +196,69 @@ code_change(_OldVsn, State, _Extra) -> %% Receive and Parse data %%-------------------------------------------------------------------- -with_proto(Fun, Args, State = #stomp_client{proto_state = ProtoState}) -> - case erlang:apply(emqx_stomp_protocol, Fun, Args ++ [ProtoState]) of - {ok, NProtoState} -> - noreply(State#stomp_client{proto_state = NProtoState}); - {F, Reason, NProtoState} when F == stop; - F == error; - F == shutdown -> - shutdown(Reason, State#stomp_client{proto_state = NProtoState}) +with_proto(Fun, Args, State = #state{pstate = PState}) -> + case erlang:apply(emqx_stomp_protocol, Fun, Args ++ [PState]) of + {ok, NPState} -> + noreply(State#state{pstate = NPState}); + {F, Reason, NPState} when F == stop; + F == error; + F == shutdown -> + shutdown(Reason, State#state{pstate = NPState}) end. received(<<>>, State) -> noreply(State); -received(Bytes, State = #stomp_client{parser = Parser, - proto_state = ProtoState}) -> +received(Bytes, State = #state{parser = Parser, + pstate = PState}) -> try emqx_stomp_frame:parse(Bytes, Parser) of {more, NewParser} -> - noreply(State#stomp_client{parser = NewParser}); + noreply(State#state{parser = NewParser}); {ok, Frame, Rest} -> - ?LOG(info, "RECV Frame: ~s", [emqx_stomp_frame:format(Frame)], State), - case emqx_stomp_protocol:received(Frame, ProtoState) of - {ok, ProtoState1} -> - received(Rest, reset_parser(State#stomp_client{proto_state = ProtoState1})); - {error, Error, ProtoState1} -> - shutdown(Error, State#stomp_client{proto_state = ProtoState1}); - {stop, Reason, ProtoState1} -> - stop(Reason, State#stomp_client{proto_state = ProtoState1}) + ?LOG(info, "RECV Frame: ~s", [emqx_stomp_frame:format(Frame)]), + case emqx_stomp_protocol:received(Frame, PState) of + {ok, PState1} -> + received(Rest, reset_parser(State#state{pstate = PState1})); + {error, Error, PState1} -> + shutdown(Error, State#state{pstate = PState1}); + {stop, Reason, PState1} -> + stop(Reason, State#state{pstate = PState1}) end; {error, Error} -> - ?LOG(error, "Framing error - ~s", [Error], State), - ?LOG(error, "Bytes: ~p", [Bytes], State), + ?LOG(error, "Framing error - ~s", [Error]), + ?LOG(error, "Bytes: ~p", [Bytes]), shutdown(frame_error, State) catch _Error:Reason -> - ?LOG(error, "Parser failed for ~p", [Reason], State), - ?LOG(error, "Error data: ~p", [Bytes], State), + ?LOG(error, "Parser failed for ~p", [Reason]), + ?LOG(error, "Error data: ~p", [Bytes]), shutdown(parse_error, State) end. -reset_parser(State = #stomp_client{proto_env = ProtoEnv}) -> - State#stomp_client{parser = emqx_stomp_frame:init_parer_state(ProtoEnv)}. +reset_parser(State = #state{proto_env = ProtoEnv}) -> + State#state{parser = emqx_stomp_frame:init_parer_state(ProtoEnv)}. -rate_limit(_Size, State = #stomp_client{rate_limit = undefined}) -> +rate_limit(_Size, State = #state{rate_limit = undefined}) -> run_socket(State); -rate_limit(Size, State = #stomp_client{rate_limit = Rl}) -> +rate_limit(Size, State = #state{rate_limit = Rl}) -> case esockd_rate_limit:check(Size, Rl) of {0, Rl1} -> - run_socket(State#stomp_client{conn_state = running, rate_limit = Rl1}); + run_socket(State#state{conn_state = running, rate_limit = Rl1}); {Pause, Rl1} -> - ?LOG(error, "Rate limiter pause for ~p", [Pause], State), + ?LOG(error, "Rate limiter pause for ~p", [Pause]), erlang:send_after(Pause, self(), activate_sock), - State#stomp_client{conn_state = blocked, rate_limit = Rl1} + State#state{conn_state = blocked, rate_limit = Rl1} end. -run_socket(State = #stomp_client{conn_state = blocked}) -> +run_socket(State = #state{conn_state = blocked}) -> State; -run_socket(State = #stomp_client{await_recv = true}) -> +run_socket(State = #state{await_recv = true}) -> State; -run_socket(State = #stomp_client{transport = Transport, socket = Sock}) -> +run_socket(State = #state{transport = Transport, socket = Sock}) -> Transport:async_recv(Sock, 0, infinity), - State#stomp_client{await_recv = true}. + State#state{await_recv = true}. -getstat(Stat, #stomp_client{transport = Transport, socket = Sock}) -> +getstat(Stat, #state{transport = Transport, socket = Sock}) -> case Transport:getstat(Sock, [Stat]) of {ok, [{Stat, Val}]} -> {ok, Val}; {error, Error} -> {error, Error} diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index 4834955a2..a366105b8 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -38,7 +38,7 @@ , timeout/3 ]). --record(stomp_proto, { +-record(pstate, { peername, heartfun, sendfun, @@ -58,7 +58,7 @@ outgoing_timer => outgoing }). --type(stomp_proto() :: #stomp_proto{}). +-type(pstate() :: #pstate{}). %% @doc Init protocol init(#{peername := Peername, @@ -66,14 +66,14 @@ init(#{peername := Peername, heartfun := HeartFun}, Env) -> AllowAnonymous = get_value(allow_anonymous, Env, false), DefaultUser = get_value(default_user, Env), - #stomp_proto{peername = Peername, + #pstate{peername = Peername, heartfun = HeartFun, sendfun = SendFun, timers = #{}, allow_anonymous = AllowAnonymous, default_user = DefaultUser}. -info(#stomp_proto{connected = Connected, +info(#pstate{connected = Connected, proto_ver = ProtoVer, proto_name = ProtoName, heart_beats = Heartbeats, @@ -86,23 +86,25 @@ info(#stomp_proto{connected = Connected, {login, Login}, {subscriptions, Subscriptions}]. --spec(received(stomp_frame(), stomp_proto()) - -> {ok, stomp_proto()} - | {error, any(), stomp_proto()} - | {stop, any(), stomp_proto()}). +-spec(received(stomp_frame(), pstate()) + -> {ok, pstate()} + | {error, any(), pstate()} + | {stop, any(), pstate()}). received(Frame = #stomp_frame{command = <<"STOMP">>}, State) -> received(Frame#stomp_frame{command = <<"CONNECT">>}, State); received(#stomp_frame{command = <<"CONNECT">>, headers = Headers}, - State = #stomp_proto{connected = false, allow_anonymous = AllowAnonymous, default_user = DefaultUser}) -> + State = #pstate{connected = false, allow_anonymous = AllowAnonymous, default_user = DefaultUser}) -> case negotiate_version(header(<<"accept-version">>, Headers)) of {ok, Version} -> Login = header(<<"login">>, Headers), Passc = header(<<"passcode">>, Headers), case check_login(Login, Passc, AllowAnonymous, DefaultUser) of true -> + emqx_logger:set_metadata_clientid(Login), + Heartbeats = parse_heartbeats(header(<<"heart-beat">>, Headers, <<"0,0">>)), - NState = start_heartbeart_timer(Heartbeats, State#stomp_proto{connected = true, + NState = start_heartbeart_timer(Heartbeats, State#pstate{connected = true, proto_ver = Version, login = Login}), send(connected_frame([{<<"version">>, Version}, {<<"heart-beat">>, reverse_heartbeats(Heartbeats)}]), NState); @@ -116,7 +118,7 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers}, {error, unsupported_version, State} end; -received(#stomp_frame{command = <<"CONNECT">>}, State = #stomp_proto{connected = true}) -> +received(#stomp_frame{command = <<"CONNECT">>}, State = #pstate{connected = true}) -> {error, unexpected_connect, State}; received(#stomp_frame{command = <<"SEND">>, headers = Headers, body = Body}, State) -> @@ -134,7 +136,7 @@ received(#stomp_frame{command = <<"SEND">>, headers = Headers, body = Body}, Sta end; received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers}, - State = #stomp_proto{subscriptions = Subscriptions}) -> + State = #pstate{subscriptions = Subscriptions}) -> Id = header(<<"id">>, Headers), Topic = header(<<"destination">>, Headers), Ack = header(<<"ack">>, Headers, <<"auto">>), @@ -143,18 +145,18 @@ received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers}, {ok, State}; false -> emqx_broker:subscribe(Topic), - {ok, State#stomp_proto{subscriptions = [{Id, Topic, Ack}|Subscriptions]}} + {ok, State#pstate{subscriptions = [{Id, Topic, Ack}|Subscriptions]}} end, maybe_send_receipt(receipt_id(Headers), State1); received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers}, - State = #stomp_proto{subscriptions = Subscriptions}) -> + State = #pstate{subscriptions = Subscriptions}) -> Id = header(<<"id">>, Headers), {ok, State1} = case lists:keyfind(Id, 1, Subscriptions) of {Id, Topic, _Ack} -> ok = emqx_broker:unsubscribe(Topic), - {ok, State#stomp_proto{subscriptions = lists:keydelete(Id, 1, Subscriptions)}}; + {ok, State#pstate{subscriptions = lists:keydelete(Id, 1, Subscriptions)}}; false -> {ok, State} end, @@ -238,7 +240,7 @@ received(#stomp_frame{command = <<"DISCONNECT">>, headers = Headers}, State) -> {stop, normal, State}. send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, - State = #stomp_proto{subscriptions = Subscriptions}) -> + State = #pstate{subscriptions = Subscriptions}) -> case lists:keyfind(Topic, 2, Subscriptions) of {Id, Topic, Ack} -> Headers0 = [{<<"subscription">>, Id}, @@ -260,7 +262,7 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload}, {error, dropped, State} end; -send(Frame, State = #stomp_proto{sendfun = {Fun, Args}}) -> +send(Frame, State = #pstate{sendfun = {Fun, Args}}) -> ?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)]), Data = emqx_stomp_frame:serialize(Frame), ?LOG(debug, "SEND ~p", [Data]), @@ -271,23 +273,23 @@ shutdown(_Reason, _State) -> ok. timeout(_TRef, {incoming, NewVal}, - State = #stomp_proto{heart_beats = HrtBt}) -> + State = #pstate{heart_beats = HrtBt}) -> case emqx_stomp_heartbeat:check(incoming, NewVal, HrtBt) of {error, timeout} -> {shutdown, heartbeat_timeout, State}; {ok, NHrtBt} -> - {ok, reset_timer(incoming_timer, State#stomp_proto{heart_beats = NHrtBt})} + {ok, reset_timer(incoming_timer, State#pstate{heart_beats = NHrtBt})} end; timeout(_TRef, {outgoing, NewVal}, - State = #stomp_proto{heart_beats = HrtBt, + State = #pstate{heart_beats = HrtBt, heartfun = {Fun, Args}}) -> case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of {error, timeout} -> _ = erlang:apply(Fun, Args), {ok, State}; {ok, NHrtBt} -> - {ok, reset_timer(outgoing_timer, State#stomp_proto{heart_beats = NHrtBt})} + {ok, reset_timer(outgoing_timer, State#pstate{heart_beats = NHrtBt})} end. negotiate_version(undefined) -> @@ -396,7 +398,7 @@ reverse_heartbeats({Cx, Cy}) -> start_heartbeart_timer(Heartbeats, State) -> ensure_timer( [incoming_timer, outgoing_timer], - State#stomp_proto{heart_beats = emqx_stomp_heartbeat:init(Heartbeats)}). + State#pstate{heart_beats = emqx_stomp_heartbeat:init(Heartbeats)}). %%-------------------------------------------------------------------- %% Timer @@ -406,7 +408,7 @@ ensure_timer([Name], State) -> ensure_timer([Name | Rest], State) -> ensure_timer(Rest, ensure_timer(Name, State)); -ensure_timer(Name, State = #stomp_proto{timers = Timers}) -> +ensure_timer(Name, State = #pstate{timers = Timers}) -> TRef = maps:get(Name, Timers, undefined), Time = interval(Name, State), case TRef == undefined andalso is_integer(Time) andalso Time > 0 of @@ -414,10 +416,10 @@ ensure_timer(Name, State = #stomp_proto{timers = Timers}) -> false -> State %% Timer disabled or exists end. -ensure_timer(Name, Time, State = #stomp_proto{timers = Timers}) -> +ensure_timer(Name, Time, State = #pstate{timers = Timers}) -> Msg = maps:get(Name, ?TIMER_TABLE), TRef = emqx_misc:start_timer(Time, Msg), - State#stomp_proto{timers = Timers#{Name => TRef}}. + State#pstate{timers = Timers#{Name => TRef}}. reset_timer(Name, State) -> ensure_timer(Name, clean_timer(Name, State)). @@ -425,10 +427,10 @@ reset_timer(Name, State) -> reset_timer(Name, Time, State) -> ensure_timer(Name, Time, clean_timer(Name, State)). -clean_timer(Name, State = #stomp_proto{timers = Timers}) -> - State#stomp_proto{timers = maps:remove(Name, Timers)}. +clean_timer(Name, State = #pstate{timers = Timers}) -> + State#pstate{timers = maps:remove(Name, Timers)}. -interval(incoming_timer, #stomp_proto{heart_beats = HrtBt}) -> +interval(incoming_timer, #pstate{heart_beats = HrtBt}) -> emqx_stomp_heartbeat:interval(incoming, HrtBt); -interval(outgoing_timer, #stomp_proto{heart_beats = HrtBt}) -> +interval(outgoing_timer, #pstate{heart_beats = HrtBt}) -> emqx_stomp_heartbeat:interval(outgoing, HrtBt).