refactor(stomp): improve code form naming
This commit is contained in:
parent
9e47d31f79
commit
1263a05bbc
|
@ -19,6 +19,9 @@
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-include("emqx_stomp.hrl").
|
-include("emqx_stomp.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
-logger_header("[Stomp-Conn]").
|
||||||
|
|
||||||
-export([ start_link/3
|
-export([ start_link/3
|
||||||
, info/1
|
, info/1
|
||||||
|
@ -36,16 +39,13 @@
|
||||||
%% for protocol
|
%% for protocol
|
||||||
-export([send/4, heartbeat/2]).
|
-export([send/4, heartbeat/2]).
|
||||||
|
|
||||||
-record(stomp_client, {transport, socket, peername, conn_name, conn_state,
|
-record(state, {transport, socket, peername, conn_name, conn_state,
|
||||||
await_recv, rate_limit, parser, proto_state,
|
await_recv, rate_limit, parser, pstate,
|
||||||
proto_env, heartbeat}).
|
proto_env, heartbeat}).
|
||||||
|
|
||||||
-define(INFO_KEYS, [peername, await_recv, conn_state]).
|
-define(INFO_KEYS, [peername, await_recv, conn_state]).
|
||||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
|
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
|
||||||
|
|
||||||
-define(LOG(Level, Format, Args, State),
|
|
||||||
emqx_logger:Level("Stomp(~s): " ++ Format, [State#stomp_client.conn_name | Args])).
|
|
||||||
|
|
||||||
start_link(Transport, Sock, ProtoEnv) ->
|
start_link(Transport, Sock, ProtoEnv) ->
|
||||||
{ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, ProtoEnv]])}.
|
{ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, ProtoEnv]])}.
|
||||||
|
|
||||||
|
@ -61,11 +61,11 @@ init([Transport, Sock, ProtoEnv]) ->
|
||||||
SendFun = {fun ?MODULE:send/4, [Transport, Sock, self()]},
|
SendFun = {fun ?MODULE:send/4, [Transport, Sock, self()]},
|
||||||
HrtBtFun = {fun ?MODULE:heartbeat/2, [Transport, Sock]},
|
HrtBtFun = {fun ?MODULE:heartbeat/2, [Transport, Sock]},
|
||||||
Parser = emqx_stomp_frame:init_parer_state(ProtoEnv),
|
Parser = emqx_stomp_frame:init_parer_state(ProtoEnv),
|
||||||
ProtoState = emqx_stomp_protocol:init(#{peername => Peername,
|
PState = emqx_stomp_protocol:init(#{peername => Peername,
|
||||||
sendfun => SendFun,
|
sendfun => SendFun,
|
||||||
heartfun => HrtBtFun}, ProtoEnv),
|
heartfun => HrtBtFun}, ProtoEnv),
|
||||||
RateLimit = init_rate_limit(proplists:get_value(rate_limit, ProtoEnv)),
|
RateLimit = init_rate_limit(proplists:get_value(rate_limit, ProtoEnv)),
|
||||||
State = run_socket(#stomp_client{transport = Transport,
|
State = run_socket(#state{transport = Transport,
|
||||||
socket = NewSock,
|
socket = NewSock,
|
||||||
peername = Peername,
|
peername = Peername,
|
||||||
conn_name = ConnName,
|
conn_name = ConnName,
|
||||||
|
@ -74,7 +74,8 @@ init([Transport, Sock, ProtoEnv]) ->
|
||||||
rate_limit = RateLimit,
|
rate_limit = RateLimit,
|
||||||
parser = Parser,
|
parser = Parser,
|
||||||
proto_env = ProtoEnv,
|
proto_env = ProtoEnv,
|
||||||
proto_state = ProtoState}),
|
pstate = PState}),
|
||||||
|
emqx_logger:set_metadata_peername(esockd:format(Peername)),
|
||||||
gen_server:enter_loop(?MODULE, [{hibernate_after, 5000}], State, 20000);
|
gen_server:enter_loop(?MODULE, [{hibernate_after, 5000}], State, 20000);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{stop, Reason}
|
{stop, Reason}
|
||||||
|
@ -96,15 +97,15 @@ send(Data, Transport, Sock, ConnPid) ->
|
||||||
heartbeat(Transport, Sock) ->
|
heartbeat(Transport, Sock) ->
|
||||||
Transport:send(Sock, <<$\n>>).
|
Transport:send(Sock, <<$\n>>).
|
||||||
|
|
||||||
handle_call(info, _From, State = #stomp_client{transport = Transport,
|
handle_call(info, _From, State = #state{transport = Transport,
|
||||||
socket = Sock,
|
socket = Sock,
|
||||||
peername = Peername,
|
peername = Peername,
|
||||||
await_recv = AwaitRecv,
|
await_recv = AwaitRecv,
|
||||||
conn_state = ConnState,
|
conn_state = ConnState,
|
||||||
proto_state = ProtoState}) ->
|
pstate = PState}) ->
|
||||||
ClientInfo = [{peername, Peername}, {await_recv, AwaitRecv},
|
ClientInfo = [{peername, Peername}, {await_recv, AwaitRecv},
|
||||||
{conn_state, ConnState}],
|
{conn_state, ConnState}],
|
||||||
ProtoInfo = emqx_stomp_protocol:info(ProtoState),
|
ProtoInfo = emqx_stomp_protocol:info(PState),
|
||||||
case Transport:getstat(Sock, ?SOCK_STATS) of
|
case Transport:getstat(Sock, ?SOCK_STATS) of
|
||||||
{ok, SockStats} ->
|
{ok, SockStats} ->
|
||||||
{reply, lists:append([ClientInfo, ProtoInfo, SockStats]), State};
|
{reply, lists:append([ClientInfo, ProtoInfo, SockStats]), State};
|
||||||
|
@ -113,11 +114,11 @@ handle_call(info, _From, State = #stomp_client{transport = Transport,
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?LOG(error, "unexpected request: ~p", [Req], State),
|
?LOG(error, "unexpected request: ~p", [Req]),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?LOG(error, "unexpected msg: ~p", [Msg], State),
|
?LOG(error, "unexpected msg: ~p", [Msg]),
|
||||||
noreply(State).
|
noreply(State).
|
||||||
|
|
||||||
handle_info(timeout, State) ->
|
handle_info(timeout, State) ->
|
||||||
|
@ -144,15 +145,15 @@ handle_info({timeout, TRef, TMsg}, State) when TMsg =:= incoming;
|
||||||
shutdown({sock_error, Reason}, State)
|
shutdown({sock_error, Reason}, State)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({'EXIT', HbProc, Error}, State = #stomp_client{heartbeat = HbProc}) ->
|
handle_info({'EXIT', HbProc, Error}, State = #state{heartbeat = HbProc}) ->
|
||||||
stop(Error, State);
|
stop(Error, State);
|
||||||
|
|
||||||
handle_info(activate_sock, State) ->
|
handle_info(activate_sock, State) ->
|
||||||
noreply(run_socket(State#stomp_client{conn_state = running}));
|
noreply(run_socket(State#state{conn_state = running}));
|
||||||
|
|
||||||
handle_info({inet_async, _Sock, _Ref, {ok, Bytes}}, State) ->
|
handle_info({inet_async, _Sock, _Ref, {ok, Bytes}}, State) ->
|
||||||
?LOG(debug, "RECV ~p", [Bytes], State),
|
?LOG(debug, "RECV ~p", [Bytes]),
|
||||||
received(Bytes, rate_limit(size(Bytes), State#stomp_client{await_recv = false}));
|
received(Bytes, rate_limit(size(Bytes), State#state{await_recv = false}));
|
||||||
|
|
||||||
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
|
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
|
||||||
shutdown(Reason, State);
|
shutdown(Reason, State);
|
||||||
|
@ -163,29 +164,29 @@ handle_info({inet_reply, _Ref, ok}, State) ->
|
||||||
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
|
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
|
||||||
shutdown(Reason, State);
|
shutdown(Reason, State);
|
||||||
|
|
||||||
handle_info({deliver, _Topic, Msg}, State = #stomp_client{proto_state = ProtoState}) ->
|
handle_info({deliver, _Topic, Msg}, State = #state{pstate = PState}) ->
|
||||||
noreply(State#stomp_client{proto_state = case emqx_stomp_protocol:send(Msg, ProtoState) of
|
noreply(State#state{pstate = case emqx_stomp_protocol:send(Msg, PState) of
|
||||||
{ok, ProtoState1} ->
|
{ok, PState1} ->
|
||||||
ProtoState1;
|
PState1;
|
||||||
{error, dropped, ProtoState1} ->
|
{error, dropped, PState1} ->
|
||||||
ProtoState1
|
PState1
|
||||||
end});
|
end});
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?LOG(error, "Unexpected info: ~p", [Info], State),
|
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||||
noreply(State).
|
noreply(State).
|
||||||
|
|
||||||
terminate(Reason, State = #stomp_client{transport = Transport,
|
terminate(Reason, #state{transport = Transport,
|
||||||
socket = Sock,
|
socket = Sock,
|
||||||
proto_state = ProtoState}) ->
|
pstate = PState}) ->
|
||||||
?LOG(info, "terminated for ~p", [Reason], State),
|
?LOG(info, "terminated for ~p", [Reason]),
|
||||||
Transport:fast_close(Sock),
|
Transport:fast_close(Sock),
|
||||||
case {ProtoState, Reason} of
|
case {PState, Reason} of
|
||||||
{undefined, _} -> ok;
|
{undefined, _} -> ok;
|
||||||
{_, {shutdown, Error}} ->
|
{_, {shutdown, Error}} ->
|
||||||
emqx_stomp_protocol:shutdown(Error, ProtoState);
|
emqx_stomp_protocol:shutdown(Error, PState);
|
||||||
{_, Reason} ->
|
{_, Reason} ->
|
||||||
emqx_stomp_protocol:shutdown(Reason, ProtoState)
|
emqx_stomp_protocol:shutdown(Reason, PState)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
@ -195,69 +196,69 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% Receive and Parse data
|
%% Receive and Parse data
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
with_proto(Fun, Args, State = #stomp_client{proto_state = ProtoState}) ->
|
with_proto(Fun, Args, State = #state{pstate = PState}) ->
|
||||||
case erlang:apply(emqx_stomp_protocol, Fun, Args ++ [ProtoState]) of
|
case erlang:apply(emqx_stomp_protocol, Fun, Args ++ [PState]) of
|
||||||
{ok, NProtoState} ->
|
{ok, NPState} ->
|
||||||
noreply(State#stomp_client{proto_state = NProtoState});
|
noreply(State#state{pstate = NPState});
|
||||||
{F, Reason, NProtoState} when F == stop;
|
{F, Reason, NPState} when F == stop;
|
||||||
F == error;
|
F == error;
|
||||||
F == shutdown ->
|
F == shutdown ->
|
||||||
shutdown(Reason, State#stomp_client{proto_state = NProtoState})
|
shutdown(Reason, State#state{pstate = NPState})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
received(<<>>, State) ->
|
received(<<>>, State) ->
|
||||||
noreply(State);
|
noreply(State);
|
||||||
|
|
||||||
received(Bytes, State = #stomp_client{parser = Parser,
|
received(Bytes, State = #state{parser = Parser,
|
||||||
proto_state = ProtoState}) ->
|
pstate = PState}) ->
|
||||||
try emqx_stomp_frame:parse(Bytes, Parser) of
|
try emqx_stomp_frame:parse(Bytes, Parser) of
|
||||||
{more, NewParser} ->
|
{more, NewParser} ->
|
||||||
noreply(State#stomp_client{parser = NewParser});
|
noreply(State#state{parser = NewParser});
|
||||||
{ok, Frame, Rest} ->
|
{ok, Frame, Rest} ->
|
||||||
?LOG(info, "RECV Frame: ~s", [emqx_stomp_frame:format(Frame)], State),
|
?LOG(info, "RECV Frame: ~s", [emqx_stomp_frame:format(Frame)]),
|
||||||
case emqx_stomp_protocol:received(Frame, ProtoState) of
|
case emqx_stomp_protocol:received(Frame, PState) of
|
||||||
{ok, ProtoState1} ->
|
{ok, PState1} ->
|
||||||
received(Rest, reset_parser(State#stomp_client{proto_state = ProtoState1}));
|
received(Rest, reset_parser(State#state{pstate = PState1}));
|
||||||
{error, Error, ProtoState1} ->
|
{error, Error, PState1} ->
|
||||||
shutdown(Error, State#stomp_client{proto_state = ProtoState1});
|
shutdown(Error, State#state{pstate = PState1});
|
||||||
{stop, Reason, ProtoState1} ->
|
{stop, Reason, PState1} ->
|
||||||
stop(Reason, State#stomp_client{proto_state = ProtoState1})
|
stop(Reason, State#state{pstate = PState1})
|
||||||
end;
|
end;
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
?LOG(error, "Framing error - ~s", [Error], State),
|
?LOG(error, "Framing error - ~s", [Error]),
|
||||||
?LOG(error, "Bytes: ~p", [Bytes], State),
|
?LOG(error, "Bytes: ~p", [Bytes]),
|
||||||
shutdown(frame_error, State)
|
shutdown(frame_error, State)
|
||||||
catch
|
catch
|
||||||
_Error:Reason ->
|
_Error:Reason ->
|
||||||
?LOG(error, "Parser failed for ~p", [Reason], State),
|
?LOG(error, "Parser failed for ~p", [Reason]),
|
||||||
?LOG(error, "Error data: ~p", [Bytes], State),
|
?LOG(error, "Error data: ~p", [Bytes]),
|
||||||
shutdown(parse_error, State)
|
shutdown(parse_error, State)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
reset_parser(State = #stomp_client{proto_env = ProtoEnv}) ->
|
reset_parser(State = #state{proto_env = ProtoEnv}) ->
|
||||||
State#stomp_client{parser = emqx_stomp_frame:init_parer_state(ProtoEnv)}.
|
State#state{parser = emqx_stomp_frame:init_parer_state(ProtoEnv)}.
|
||||||
|
|
||||||
rate_limit(_Size, State = #stomp_client{rate_limit = undefined}) ->
|
rate_limit(_Size, State = #state{rate_limit = undefined}) ->
|
||||||
run_socket(State);
|
run_socket(State);
|
||||||
rate_limit(Size, State = #stomp_client{rate_limit = Rl}) ->
|
rate_limit(Size, State = #state{rate_limit = Rl}) ->
|
||||||
case esockd_rate_limit:check(Size, Rl) of
|
case esockd_rate_limit:check(Size, Rl) of
|
||||||
{0, Rl1} ->
|
{0, Rl1} ->
|
||||||
run_socket(State#stomp_client{conn_state = running, rate_limit = Rl1});
|
run_socket(State#state{conn_state = running, rate_limit = Rl1});
|
||||||
{Pause, Rl1} ->
|
{Pause, Rl1} ->
|
||||||
?LOG(error, "Rate limiter pause for ~p", [Pause], State),
|
?LOG(error, "Rate limiter pause for ~p", [Pause]),
|
||||||
erlang:send_after(Pause, self(), activate_sock),
|
erlang:send_after(Pause, self(), activate_sock),
|
||||||
State#stomp_client{conn_state = blocked, rate_limit = Rl1}
|
State#state{conn_state = blocked, rate_limit = Rl1}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
run_socket(State = #stomp_client{conn_state = blocked}) ->
|
run_socket(State = #state{conn_state = blocked}) ->
|
||||||
State;
|
State;
|
||||||
run_socket(State = #stomp_client{await_recv = true}) ->
|
run_socket(State = #state{await_recv = true}) ->
|
||||||
State;
|
State;
|
||||||
run_socket(State = #stomp_client{transport = Transport, socket = Sock}) ->
|
run_socket(State = #state{transport = Transport, socket = Sock}) ->
|
||||||
Transport:async_recv(Sock, 0, infinity),
|
Transport:async_recv(Sock, 0, infinity),
|
||||||
State#stomp_client{await_recv = true}.
|
State#state{await_recv = true}.
|
||||||
|
|
||||||
getstat(Stat, #stomp_client{transport = Transport, socket = Sock}) ->
|
getstat(Stat, #state{transport = Transport, socket = Sock}) ->
|
||||||
case Transport:getstat(Sock, [Stat]) of
|
case Transport:getstat(Sock, [Stat]) of
|
||||||
{ok, [{Stat, Val}]} -> {ok, Val};
|
{ok, [{Stat, Val}]} -> {ok, Val};
|
||||||
{error, Error} -> {error, Error}
|
{error, Error} -> {error, Error}
|
||||||
|
|
|
@ -38,7 +38,7 @@
|
||||||
, timeout/3
|
, timeout/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-record(stomp_proto, {
|
-record(pstate, {
|
||||||
peername,
|
peername,
|
||||||
heartfun,
|
heartfun,
|
||||||
sendfun,
|
sendfun,
|
||||||
|
@ -58,7 +58,7 @@
|
||||||
outgoing_timer => outgoing
|
outgoing_timer => outgoing
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type(stomp_proto() :: #stomp_proto{}).
|
-type(pstate() :: #pstate{}).
|
||||||
|
|
||||||
%% @doc Init protocol
|
%% @doc Init protocol
|
||||||
init(#{peername := Peername,
|
init(#{peername := Peername,
|
||||||
|
@ -66,14 +66,14 @@ init(#{peername := Peername,
|
||||||
heartfun := HeartFun}, Env) ->
|
heartfun := HeartFun}, Env) ->
|
||||||
AllowAnonymous = get_value(allow_anonymous, Env, false),
|
AllowAnonymous = get_value(allow_anonymous, Env, false),
|
||||||
DefaultUser = get_value(default_user, Env),
|
DefaultUser = get_value(default_user, Env),
|
||||||
#stomp_proto{peername = Peername,
|
#pstate{peername = Peername,
|
||||||
heartfun = HeartFun,
|
heartfun = HeartFun,
|
||||||
sendfun = SendFun,
|
sendfun = SendFun,
|
||||||
timers = #{},
|
timers = #{},
|
||||||
allow_anonymous = AllowAnonymous,
|
allow_anonymous = AllowAnonymous,
|
||||||
default_user = DefaultUser}.
|
default_user = DefaultUser}.
|
||||||
|
|
||||||
info(#stomp_proto{connected = Connected,
|
info(#pstate{connected = Connected,
|
||||||
proto_ver = ProtoVer,
|
proto_ver = ProtoVer,
|
||||||
proto_name = ProtoName,
|
proto_name = ProtoName,
|
||||||
heart_beats = Heartbeats,
|
heart_beats = Heartbeats,
|
||||||
|
@ -86,23 +86,25 @@ info(#stomp_proto{connected = Connected,
|
||||||
{login, Login},
|
{login, Login},
|
||||||
{subscriptions, Subscriptions}].
|
{subscriptions, Subscriptions}].
|
||||||
|
|
||||||
-spec(received(stomp_frame(), stomp_proto())
|
-spec(received(stomp_frame(), pstate())
|
||||||
-> {ok, stomp_proto()}
|
-> {ok, pstate()}
|
||||||
| {error, any(), stomp_proto()}
|
| {error, any(), pstate()}
|
||||||
| {stop, any(), stomp_proto()}).
|
| {stop, any(), pstate()}).
|
||||||
received(Frame = #stomp_frame{command = <<"STOMP">>}, State) ->
|
received(Frame = #stomp_frame{command = <<"STOMP">>}, State) ->
|
||||||
received(Frame#stomp_frame{command = <<"CONNECT">>}, State);
|
received(Frame#stomp_frame{command = <<"CONNECT">>}, State);
|
||||||
|
|
||||||
received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
|
received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
|
||||||
State = #stomp_proto{connected = false, allow_anonymous = AllowAnonymous, default_user = DefaultUser}) ->
|
State = #pstate{connected = false, allow_anonymous = AllowAnonymous, default_user = DefaultUser}) ->
|
||||||
case negotiate_version(header(<<"accept-version">>, Headers)) of
|
case negotiate_version(header(<<"accept-version">>, Headers)) of
|
||||||
{ok, Version} ->
|
{ok, Version} ->
|
||||||
Login = header(<<"login">>, Headers),
|
Login = header(<<"login">>, Headers),
|
||||||
Passc = header(<<"passcode">>, Headers),
|
Passc = header(<<"passcode">>, Headers),
|
||||||
case check_login(Login, Passc, AllowAnonymous, DefaultUser) of
|
case check_login(Login, Passc, AllowAnonymous, DefaultUser) of
|
||||||
true ->
|
true ->
|
||||||
|
emqx_logger:set_metadata_clientid(Login),
|
||||||
|
|
||||||
Heartbeats = parse_heartbeats(header(<<"heart-beat">>, Headers, <<"0,0">>)),
|
Heartbeats = parse_heartbeats(header(<<"heart-beat">>, Headers, <<"0,0">>)),
|
||||||
NState = start_heartbeart_timer(Heartbeats, State#stomp_proto{connected = true,
|
NState = start_heartbeart_timer(Heartbeats, State#pstate{connected = true,
|
||||||
proto_ver = Version, login = Login}),
|
proto_ver = Version, login = Login}),
|
||||||
send(connected_frame([{<<"version">>, Version},
|
send(connected_frame([{<<"version">>, Version},
|
||||||
{<<"heart-beat">>, reverse_heartbeats(Heartbeats)}]), NState);
|
{<<"heart-beat">>, reverse_heartbeats(Heartbeats)}]), NState);
|
||||||
|
@ -116,7 +118,7 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
|
||||||
{error, unsupported_version, State}
|
{error, unsupported_version, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
received(#stomp_frame{command = <<"CONNECT">>}, State = #stomp_proto{connected = true}) ->
|
received(#stomp_frame{command = <<"CONNECT">>}, State = #pstate{connected = true}) ->
|
||||||
{error, unexpected_connect, State};
|
{error, unexpected_connect, State};
|
||||||
|
|
||||||
received(#stomp_frame{command = <<"SEND">>, headers = Headers, body = Body}, State) ->
|
received(#stomp_frame{command = <<"SEND">>, headers = Headers, body = Body}, State) ->
|
||||||
|
@ -134,7 +136,7 @@ received(#stomp_frame{command = <<"SEND">>, headers = Headers, body = Body}, Sta
|
||||||
end;
|
end;
|
||||||
|
|
||||||
received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
|
received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
|
||||||
State = #stomp_proto{subscriptions = Subscriptions}) ->
|
State = #pstate{subscriptions = Subscriptions}) ->
|
||||||
Id = header(<<"id">>, Headers),
|
Id = header(<<"id">>, Headers),
|
||||||
Topic = header(<<"destination">>, Headers),
|
Topic = header(<<"destination">>, Headers),
|
||||||
Ack = header(<<"ack">>, Headers, <<"auto">>),
|
Ack = header(<<"ack">>, Headers, <<"auto">>),
|
||||||
|
@ -143,18 +145,18 @@ received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
|
||||||
{ok, State};
|
{ok, State};
|
||||||
false ->
|
false ->
|
||||||
emqx_broker:subscribe(Topic),
|
emqx_broker:subscribe(Topic),
|
||||||
{ok, State#stomp_proto{subscriptions = [{Id, Topic, Ack}|Subscriptions]}}
|
{ok, State#pstate{subscriptions = [{Id, Topic, Ack}|Subscriptions]}}
|
||||||
end,
|
end,
|
||||||
maybe_send_receipt(receipt_id(Headers), State1);
|
maybe_send_receipt(receipt_id(Headers), State1);
|
||||||
|
|
||||||
received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers},
|
received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers},
|
||||||
State = #stomp_proto{subscriptions = Subscriptions}) ->
|
State = #pstate{subscriptions = Subscriptions}) ->
|
||||||
Id = header(<<"id">>, Headers),
|
Id = header(<<"id">>, Headers),
|
||||||
|
|
||||||
{ok, State1} = case lists:keyfind(Id, 1, Subscriptions) of
|
{ok, State1} = case lists:keyfind(Id, 1, Subscriptions) of
|
||||||
{Id, Topic, _Ack} ->
|
{Id, Topic, _Ack} ->
|
||||||
ok = emqx_broker:unsubscribe(Topic),
|
ok = emqx_broker:unsubscribe(Topic),
|
||||||
{ok, State#stomp_proto{subscriptions = lists:keydelete(Id, 1, Subscriptions)}};
|
{ok, State#pstate{subscriptions = lists:keydelete(Id, 1, Subscriptions)}};
|
||||||
false ->
|
false ->
|
||||||
{ok, State}
|
{ok, State}
|
||||||
end,
|
end,
|
||||||
|
@ -238,7 +240,7 @@ received(#stomp_frame{command = <<"DISCONNECT">>, headers = Headers}, State) ->
|
||||||
{stop, normal, State}.
|
{stop, normal, State}.
|
||||||
|
|
||||||
send(Msg = #message{topic = Topic, headers = Headers, payload = Payload},
|
send(Msg = #message{topic = Topic, headers = Headers, payload = Payload},
|
||||||
State = #stomp_proto{subscriptions = Subscriptions}) ->
|
State = #pstate{subscriptions = Subscriptions}) ->
|
||||||
case lists:keyfind(Topic, 2, Subscriptions) of
|
case lists:keyfind(Topic, 2, Subscriptions) of
|
||||||
{Id, Topic, Ack} ->
|
{Id, Topic, Ack} ->
|
||||||
Headers0 = [{<<"subscription">>, Id},
|
Headers0 = [{<<"subscription">>, Id},
|
||||||
|
@ -260,7 +262,7 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload},
|
||||||
{error, dropped, State}
|
{error, dropped, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
send(Frame, State = #stomp_proto{sendfun = {Fun, Args}}) ->
|
send(Frame, State = #pstate{sendfun = {Fun, Args}}) ->
|
||||||
?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)]),
|
?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)]),
|
||||||
Data = emqx_stomp_frame:serialize(Frame),
|
Data = emqx_stomp_frame:serialize(Frame),
|
||||||
?LOG(debug, "SEND ~p", [Data]),
|
?LOG(debug, "SEND ~p", [Data]),
|
||||||
|
@ -271,23 +273,23 @@ shutdown(_Reason, _State) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
timeout(_TRef, {incoming, NewVal},
|
timeout(_TRef, {incoming, NewVal},
|
||||||
State = #stomp_proto{heart_beats = HrtBt}) ->
|
State = #pstate{heart_beats = HrtBt}) ->
|
||||||
case emqx_stomp_heartbeat:check(incoming, NewVal, HrtBt) of
|
case emqx_stomp_heartbeat:check(incoming, NewVal, HrtBt) of
|
||||||
{error, timeout} ->
|
{error, timeout} ->
|
||||||
{shutdown, heartbeat_timeout, State};
|
{shutdown, heartbeat_timeout, State};
|
||||||
{ok, NHrtBt} ->
|
{ok, NHrtBt} ->
|
||||||
{ok, reset_timer(incoming_timer, State#stomp_proto{heart_beats = NHrtBt})}
|
{ok, reset_timer(incoming_timer, State#pstate{heart_beats = NHrtBt})}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
timeout(_TRef, {outgoing, NewVal},
|
timeout(_TRef, {outgoing, NewVal},
|
||||||
State = #stomp_proto{heart_beats = HrtBt,
|
State = #pstate{heart_beats = HrtBt,
|
||||||
heartfun = {Fun, Args}}) ->
|
heartfun = {Fun, Args}}) ->
|
||||||
case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of
|
case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of
|
||||||
{error, timeout} ->
|
{error, timeout} ->
|
||||||
_ = erlang:apply(Fun, Args),
|
_ = erlang:apply(Fun, Args),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
{ok, NHrtBt} ->
|
{ok, NHrtBt} ->
|
||||||
{ok, reset_timer(outgoing_timer, State#stomp_proto{heart_beats = NHrtBt})}
|
{ok, reset_timer(outgoing_timer, State#pstate{heart_beats = NHrtBt})}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
negotiate_version(undefined) ->
|
negotiate_version(undefined) ->
|
||||||
|
@ -396,7 +398,7 @@ reverse_heartbeats({Cx, Cy}) ->
|
||||||
start_heartbeart_timer(Heartbeats, State) ->
|
start_heartbeart_timer(Heartbeats, State) ->
|
||||||
ensure_timer(
|
ensure_timer(
|
||||||
[incoming_timer, outgoing_timer],
|
[incoming_timer, outgoing_timer],
|
||||||
State#stomp_proto{heart_beats = emqx_stomp_heartbeat:init(Heartbeats)}).
|
State#pstate{heart_beats = emqx_stomp_heartbeat:init(Heartbeats)}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Timer
|
%% Timer
|
||||||
|
@ -406,7 +408,7 @@ ensure_timer([Name], State) ->
|
||||||
ensure_timer([Name | Rest], State) ->
|
ensure_timer([Name | Rest], State) ->
|
||||||
ensure_timer(Rest, ensure_timer(Name, State));
|
ensure_timer(Rest, ensure_timer(Name, State));
|
||||||
|
|
||||||
ensure_timer(Name, State = #stomp_proto{timers = Timers}) ->
|
ensure_timer(Name, State = #pstate{timers = Timers}) ->
|
||||||
TRef = maps:get(Name, Timers, undefined),
|
TRef = maps:get(Name, Timers, undefined),
|
||||||
Time = interval(Name, State),
|
Time = interval(Name, State),
|
||||||
case TRef == undefined andalso is_integer(Time) andalso Time > 0 of
|
case TRef == undefined andalso is_integer(Time) andalso Time > 0 of
|
||||||
|
@ -414,10 +416,10 @@ ensure_timer(Name, State = #stomp_proto{timers = Timers}) ->
|
||||||
false -> State %% Timer disabled or exists
|
false -> State %% Timer disabled or exists
|
||||||
end.
|
end.
|
||||||
|
|
||||||
ensure_timer(Name, Time, State = #stomp_proto{timers = Timers}) ->
|
ensure_timer(Name, Time, State = #pstate{timers = Timers}) ->
|
||||||
Msg = maps:get(Name, ?TIMER_TABLE),
|
Msg = maps:get(Name, ?TIMER_TABLE),
|
||||||
TRef = emqx_misc:start_timer(Time, Msg),
|
TRef = emqx_misc:start_timer(Time, Msg),
|
||||||
State#stomp_proto{timers = Timers#{Name => TRef}}.
|
State#pstate{timers = Timers#{Name => TRef}}.
|
||||||
|
|
||||||
reset_timer(Name, State) ->
|
reset_timer(Name, State) ->
|
||||||
ensure_timer(Name, clean_timer(Name, State)).
|
ensure_timer(Name, clean_timer(Name, State)).
|
||||||
|
@ -425,10 +427,10 @@ reset_timer(Name, State) ->
|
||||||
reset_timer(Name, Time, State) ->
|
reset_timer(Name, Time, State) ->
|
||||||
ensure_timer(Name, Time, clean_timer(Name, State)).
|
ensure_timer(Name, Time, clean_timer(Name, State)).
|
||||||
|
|
||||||
clean_timer(Name, State = #stomp_proto{timers = Timers}) ->
|
clean_timer(Name, State = #pstate{timers = Timers}) ->
|
||||||
State#stomp_proto{timers = maps:remove(Name, Timers)}.
|
State#pstate{timers = maps:remove(Name, Timers)}.
|
||||||
|
|
||||||
interval(incoming_timer, #stomp_proto{heart_beats = HrtBt}) ->
|
interval(incoming_timer, #pstate{heart_beats = HrtBt}) ->
|
||||||
emqx_stomp_heartbeat:interval(incoming, HrtBt);
|
emqx_stomp_heartbeat:interval(incoming, HrtBt);
|
||||||
interval(outgoing_timer, #stomp_proto{heart_beats = HrtBt}) ->
|
interval(outgoing_timer, #pstate{heart_beats = HrtBt}) ->
|
||||||
emqx_stomp_heartbeat:interval(outgoing, HrtBt).
|
emqx_stomp_heartbeat:interval(outgoing, HrtBt).
|
||||||
|
|
Loading…
Reference in New Issue