commit
af1cc669c8
|
@ -185,13 +185,15 @@
|
||||||
{max_clients, 512},
|
{max_clients, 512},
|
||||||
%% Socket Access Control
|
%% Socket Access Control
|
||||||
{access, [{allow, all}]},
|
{access, [{allow, all}]},
|
||||||
|
%% Rate Limit. Format is 'burst, rate', Unit is KB/Sec
|
||||||
|
%% {rate_limit, "100,10"}, %% 100K burst, 10K rate
|
||||||
%% Socket Options
|
%% Socket Options
|
||||||
{sockopts, [
|
{sockopts, [
|
||||||
{backlog, 512}
|
|
||||||
%Set buffer if hight thoughtput
|
%Set buffer if hight thoughtput
|
||||||
%{recbuf, 4096},
|
%{recbuf, 4096},
|
||||||
%{sndbuf, 4096}
|
%{sndbuf, 4096},
|
||||||
%{buffer, 4096},
|
%{buffer, 4096},
|
||||||
|
{backlog, 512}
|
||||||
]}
|
]}
|
||||||
]},
|
]},
|
||||||
{mqtts, 8883, [
|
{mqtts, 8883, [
|
||||||
|
|
|
@ -175,15 +175,17 @@
|
||||||
{acceptors, 16},
|
{acceptors, 16},
|
||||||
%% Maximum number of concurrent clients
|
%% Maximum number of concurrent clients
|
||||||
{max_clients, 8192},
|
{max_clients, 8192},
|
||||||
|
%% Rate Limit. Format is 'burst, rate', Unit is KB/Sec.
|
||||||
|
%% {rate_limit, "10,1"}, %% 10K burst, 1K rate
|
||||||
%% Socket Access Control
|
%% Socket Access Control
|
||||||
{access, [{allow, all}]},
|
{access, [{allow, all}]},
|
||||||
%% Socket Options
|
%% Socket Options
|
||||||
{sockopts, [
|
{sockopts, [
|
||||||
{backlog, 512}
|
|
||||||
%Set buffer if hight thoughtput
|
%Set buffer if hight thoughtput
|
||||||
%{recbuf, 4096},
|
%{recbuf, 4096},
|
||||||
%{sndbuf, 4096}
|
%{sndbuf, 4096},
|
||||||
%{buffer, 4096},
|
%{buffer, 4096},
|
||||||
|
{backlog, 512}
|
||||||
]}
|
]}
|
||||||
]},
|
]},
|
||||||
{mqtts, 8883, [
|
{mqtts, 8883, [
|
||||||
|
|
|
@ -28,6 +28,10 @@
|
||||||
## max atom number
|
## max atom number
|
||||||
## +t
|
## +t
|
||||||
|
|
||||||
|
## Set the distribution buffer busy limit (dist_buf_busy_limit) in kilobytes.
|
||||||
|
## Valid range is 1-2097151. Default is 1024.
|
||||||
|
## +zdbbl 8192
|
||||||
|
|
||||||
##-------------------------------------------------------------------------
|
##-------------------------------------------------------------------------
|
||||||
## Env
|
## Env
|
||||||
##-------------------------------------------------------------------------
|
##-------------------------------------------------------------------------
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
{application, emqttd,
|
{application, emqttd,
|
||||||
[
|
[
|
||||||
{id, "emqttd"},
|
{id, "emqttd"},
|
||||||
{vsn, "0.12.3"},
|
{vsn, "0.12.4"},
|
||||||
{description, "Erlang MQTT Broker"},
|
{description, "Erlang MQTT Broker"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
|
|
|
@ -91,7 +91,8 @@ open_listener({https, Port, Options}) ->
|
||||||
mochiweb:start_http(Port, Options, MFArgs).
|
mochiweb:start_http(Port, Options, MFArgs).
|
||||||
|
|
||||||
open_listener(Protocol, Port, Options) ->
|
open_listener(Protocol, Port, Options) ->
|
||||||
MFArgs = {emqttd_client, start_link, [env(mqtt)]},
|
Rl = rate_limiter(emqttd_opts:g(rate_limit, Options)),
|
||||||
|
MFArgs = {emqttd_client, start_link, [[{rate_limiter, Rl} | env(mqtt)]]},
|
||||||
esockd:open(Protocol, Port, merge_sockopts(Options) , MFArgs).
|
esockd:open(Protocol, Port, merge_sockopts(Options) , MFArgs).
|
||||||
|
|
||||||
merge_sockopts(Options) ->
|
merge_sockopts(Options) ->
|
||||||
|
@ -99,6 +100,14 @@ merge_sockopts(Options) ->
|
||||||
proplists:get_value(sockopts, Options, [])),
|
proplists:get_value(sockopts, Options, [])),
|
||||||
emqttd_opts:merge(Options, [{sockopts, SockOpts}]).
|
emqttd_opts:merge(Options, [{sockopts, SockOpts}]).
|
||||||
|
|
||||||
|
%% TODO: will refactor in 0.14.0 release.
|
||||||
|
rate_limiter(undefined) ->
|
||||||
|
undefined;
|
||||||
|
rate_limiter(Config) ->
|
||||||
|
Bps = fun(S) -> list_to_integer(string:strip(S)) * 1024 end,
|
||||||
|
[Burst, Rate] = [Bps(S) || S <- string:tokens(Config, ",")],
|
||||||
|
esockd_rate_limiter:new(Burst, Rate).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Close Listeners
|
%% @doc Close Listeners
|
||||||
%% @end
|
%% @end
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
%%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% MQTT Client
|
%%% MQTT Client Connection.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
|
@ -52,7 +52,7 @@
|
||||||
conn_name,
|
conn_name,
|
||||||
await_recv,
|
await_recv,
|
||||||
conn_state,
|
conn_state,
|
||||||
conserve,
|
rate_limiter,
|
||||||
parser,
|
parser,
|
||||||
proto_state,
|
proto_state,
|
||||||
packet_opts,
|
packet_opts,
|
||||||
|
@ -85,22 +85,26 @@ unsubscribe(CPid, Topics) ->
|
||||||
|
|
||||||
init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) ->
|
init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) ->
|
||||||
% Transform if ssl.
|
% Transform if ssl.
|
||||||
{ok, NewSock} = esockd_connection:accept(SockArgs),
|
{ok, NewSock} = esockd_connection:accept(SockArgs),
|
||||||
|
%%TODO:...
|
||||||
|
{ok, BufSizes} = inet:getopts(Sock, [sndbuf, recbuf, buffer]),
|
||||||
|
io:format("~p~n", [BufSizes]),
|
||||||
{ok, Peername} = emqttd_net:peername(Sock),
|
{ok, Peername} = emqttd_net:peername(Sock),
|
||||||
{ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
|
{ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
|
||||||
SendFun = fun(Data) -> Transport:send(NewSock, Data) end,
|
SendFun = send_fun(Transport, NewSock),
|
||||||
PktOpts = proplists:get_value(packet, MqttEnv),
|
PktOpts = proplists:get_value(packet, MqttEnv),
|
||||||
ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts),
|
ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts),
|
||||||
State = control_throttle(#state{transport = Transport,
|
Limiter = proplists:get_value(rate_limiter, MqttEnv),
|
||||||
socket = NewSock,
|
State = run_socket(#state{transport = Transport,
|
||||||
peername = Peername,
|
socket = NewSock,
|
||||||
conn_name = ConnStr,
|
peername = Peername,
|
||||||
await_recv = false,
|
conn_name = ConnStr,
|
||||||
conn_state = running,
|
await_recv = false,
|
||||||
conserve = false,
|
conn_state = running,
|
||||||
packet_opts = PktOpts,
|
rate_limiter = Limiter,
|
||||||
parser = emqttd_parser:new(PktOpts),
|
packet_opts = PktOpts,
|
||||||
proto_state = ProtoState}),
|
parser = emqttd_parser:new(PktOpts),
|
||||||
|
proto_state = ProtoState}),
|
||||||
ClientOpts = proplists:get_value(client, MqttEnv),
|
ClientOpts = proplists:get_value(client, MqttEnv),
|
||||||
IdleTimout = proplists:get_value(idle_timeout, ClientOpts, 10),
|
IdleTimout = proplists:get_value(idle_timeout, ClientOpts, 10),
|
||||||
gen_server:enter_loop(?MODULE, [], State, timer:seconds(IdleTimout)).
|
gen_server:enter_loop(?MODULE, [], State, timer:seconds(IdleTimout)).
|
||||||
|
@ -131,12 +135,11 @@ handle_cast(Msg, State) ->
|
||||||
|
|
||||||
handle_info(timeout, State) ->
|
handle_info(timeout, State) ->
|
||||||
stop({shutdown, timeout}, State);
|
stop({shutdown, timeout}, State);
|
||||||
|
|
||||||
handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState,
|
%% Asynchronous SUBACK
|
||||||
conn_name = ConnName}) ->
|
handle_info({suback, PacketId, GrantedQos}, State = #state{proto_state = ProtoState}) ->
|
||||||
lager:warning("Shutdown for duplicate clientid: ~s, conn:~s",
|
{ok, ProtoState1} = emqttd_protocol:send(?SUBACK_PACKET(PacketId, GrantedQos), ProtoState),
|
||||||
[emqttd_protocol:clientid(ProtoState), ConnName]),
|
noreply(State#state{proto_state = ProtoState1});
|
||||||
stop({shutdown, duplicate_id}, State);
|
|
||||||
|
|
||||||
handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) ->
|
handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) ->
|
||||||
{ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
|
{ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
|
||||||
|
@ -146,20 +149,32 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State = #state{proto_state = Prot
|
||||||
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
|
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
|
||||||
noreply(State#state{proto_state = ProtoState1});
|
noreply(State#state{proto_state = ProtoState1});
|
||||||
|
|
||||||
handle_info({inet_reply, _Ref, ok}, State) ->
|
handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState,
|
||||||
noreply(State);
|
conn_name = ConnName}) ->
|
||||||
|
lager:warning("Shutdown for duplicate clientid: ~s, conn:~s",
|
||||||
|
[emqttd_protocol:clientid(ProtoState), ConnName]),
|
||||||
|
stop({shutdown, duplicate_id}, State);
|
||||||
|
|
||||||
|
handle_info(activate_sock, State) ->
|
||||||
|
noreply(run_socket(State#state{conn_state = running}));
|
||||||
|
|
||||||
handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) ->
|
handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) ->
|
||||||
|
Size = size(Data),
|
||||||
lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]),
|
lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]),
|
||||||
emqttd_metrics:inc('bytes/received', size(Data)),
|
emqttd_metrics:inc('bytes/received', Size),
|
||||||
received(Data, control_throttle(State #state{await_recv = false}));
|
received(Data, rate_limit(Size, State#state{await_recv = false}));
|
||||||
|
|
||||||
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
|
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
|
||||||
|
%%TODO: ...
|
||||||
network_error(Reason, State);
|
network_error(Reason, State);
|
||||||
|
|
||||||
|
handle_info({inet_reply, _Ref, ok}, State) ->
|
||||||
|
%%TODO: ok...
|
||||||
|
io:format("inet_reply ok~n"),
|
||||||
|
noreply(State);
|
||||||
|
|
||||||
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
|
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
|
||||||
?ERROR("Unexpected inet_reply - ~p", [Reason], State),
|
network_error(Reason, State);
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) ->
|
handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) ->
|
||||||
?DEBUG("Start KeepAlive with ~p seconds", [TimeoutSec], State),
|
?DEBUG("Start KeepAlive with ~p seconds", [TimeoutSec], State),
|
||||||
|
@ -174,14 +189,14 @@ handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport
|
||||||
|
|
||||||
handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
|
handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
|
||||||
case emqttd_keepalive:check(KeepAlive) of
|
case emqttd_keepalive:check(KeepAlive) of
|
||||||
{ok, KeepAlive1} ->
|
{ok, KeepAlive1} ->
|
||||||
noreply(State#state{keepalive = KeepAlive1});
|
noreply(State#state{keepalive = KeepAlive1});
|
||||||
{error, timeout} ->
|
{error, timeout} ->
|
||||||
?DEBUG("Keepalive Timeout!", [], State),
|
?DEBUG("Keepalive Timeout!", [], State),
|
||||||
stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
|
stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
?DEBUG("Keepalive Error - ~p", [Error], State),
|
?DEBUG("Keepalive Error - ~p", [Error], State),
|
||||||
stop({shutdown, keepalive_error}, State#state{keepalive = undefined})
|
stop({shutdown, keepalive_error}, State#state{keepalive = undefined})
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
@ -223,27 +238,27 @@ with_session(Fun, State = #state{proto_state = ProtoState}) ->
|
||||||
|
|
||||||
%% receive and parse tcp data
|
%% receive and parse tcp data
|
||||||
received(<<>>, State) ->
|
received(<<>>, State) ->
|
||||||
{noreply, State, hibernate};
|
noreply(State);
|
||||||
|
|
||||||
received(Bytes, State = #state{packet_opts = PacketOpts,
|
received(Bytes, State = #state{parser = Parser,
|
||||||
parser = Parser,
|
packet_opts = PacketOpts,
|
||||||
proto_state = ProtoState}) ->
|
proto_state = ProtoState}) ->
|
||||||
case catch Parser(Bytes) of
|
case catch Parser(Bytes) of
|
||||||
{more, NewParser} ->
|
{more, NewParser} ->
|
||||||
noreply(control_throttle(State#state{parser = NewParser}));
|
noreply(run_socket(State#state{parser = NewParser}));
|
||||||
{ok, Packet, Rest} ->
|
{ok, Packet, Rest} ->
|
||||||
emqttd_metrics:received(Packet),
|
emqttd_metrics:received(Packet),
|
||||||
case emqttd_protocol:received(Packet, ProtoState) of
|
case emqttd_protocol:received(Packet, ProtoState) of
|
||||||
{ok, ProtoState1} ->
|
{ok, ProtoState1} ->
|
||||||
received(Rest, State#state{parser = emqttd_parser:new(PacketOpts),
|
received(Rest, State#state{parser = emqttd_parser:new(PacketOpts),
|
||||||
proto_state = ProtoState1});
|
proto_state = ProtoState1});
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
?ERROR("Protocol error - ~p", [Error], State),
|
?ERROR("Protocol error - ~p", [Error], State),
|
||||||
stop({shutdown, Error}, State);
|
stop({shutdown, Error}, State);
|
||||||
{error, Error, ProtoState1} ->
|
{error, Error, ProtoState1} ->
|
||||||
stop({shutdown, Error}, State#state{proto_state = ProtoState1});
|
stop({shutdown, Error}, State#state{proto_state = ProtoState1});
|
||||||
{stop, Reason, ProtoState1} ->
|
{stop, Reason, ProtoState1} ->
|
||||||
stop(Reason, State#state{proto_state = ProtoState1})
|
stop(Reason, State#state{proto_state = ProtoState1})
|
||||||
end;
|
end;
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
?ERROR("Framing error - ~p", [Error], State),
|
?ERROR("Framing error - ~p", [Error], State),
|
||||||
|
@ -258,6 +273,20 @@ network_error(Reason, State = #state{peername = Peername}) ->
|
||||||
[emqttd_net:format(Peername), Reason]),
|
[emqttd_net:format(Peername), Reason]),
|
||||||
stop({shutdown, conn_closed}, State).
|
stop({shutdown, conn_closed}, State).
|
||||||
|
|
||||||
|
rate_limit(_Size, State = #state{rate_limiter = undefined}) ->
|
||||||
|
run_socket(State);
|
||||||
|
rate_limit(Size, State = #state{socket = Sock, rate_limiter = Limiter}) ->
|
||||||
|
{ok, BufSizes} = inet:getopts(Sock, [sndbuf, recbuf, buffer]),
|
||||||
|
io:format("~p~n", [BufSizes]),
|
||||||
|
case esockd_rate_limiter:check(Limiter, Size) of
|
||||||
|
{0, Limiter1} ->
|
||||||
|
run_socket(State#state{conn_state = running, rate_limiter = Limiter1});
|
||||||
|
{Pause, Limiter1} ->
|
||||||
|
?ERROR("~p Received, Rate Limiter Pause for ~w", [Size, Pause], State),
|
||||||
|
erlang:send_after(Pause, self(), activate_sock),
|
||||||
|
State#state{conn_state = blocked, rate_limiter = Limiter1}
|
||||||
|
end.
|
||||||
|
|
||||||
run_socket(State = #state{conn_state = blocked}) ->
|
run_socket(State = #state{conn_state = blocked}) ->
|
||||||
State;
|
State;
|
||||||
run_socket(State = #state{await_recv = true}) ->
|
run_socket(State = #state{await_recv = true}) ->
|
||||||
|
@ -266,11 +295,11 @@ run_socket(State = #state{transport = Transport, socket = Sock}) ->
|
||||||
Transport:async_recv(Sock, 0, infinity),
|
Transport:async_recv(Sock, 0, infinity),
|
||||||
State#state{await_recv = true}.
|
State#state{await_recv = true}.
|
||||||
|
|
||||||
control_throttle(State = #state{conn_state = Flow,
|
send_fun(Transport, Sock) ->
|
||||||
conserve = Conserve}) ->
|
fun(Data) ->
|
||||||
case {Flow, Conserve} of
|
try Transport:port_command(Sock, Data) of
|
||||||
{running, true} -> State #state{conn_state = blocked};
|
true -> ok
|
||||||
{blocked, false} -> run_socket(State #state{conn_state = running});
|
catch
|
||||||
{_, _} -> run_socket(State)
|
error:Error -> exit({socket_error, Error})
|
||||||
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -240,10 +240,7 @@ process(?SUBSCRIBE_PACKET(PacketId, TopicTable),
|
||||||
lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
|
lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
|
||||||
send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
|
send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
|
||||||
false ->
|
false ->
|
||||||
AckFun = fun(GrantedQos) ->
|
emqttd_session:subscribe(Session, PacketId, TopicTable), {ok, State}
|
||||||
send(?SUBACK_PACKET(PacketId, GrantedQos), State)
|
|
||||||
end,
|
|
||||||
emqttd_session:subscribe(Session, TopicTable, AckFun), {ok, State}
|
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% protect from empty topic list
|
%% protect from empty topic list
|
||||||
|
|
|
@ -81,9 +81,6 @@
|
||||||
%% Client Pid bind with session
|
%% Client Pid bind with session
|
||||||
client_pid :: pid(),
|
client_pid :: pid(),
|
||||||
|
|
||||||
%% Client Monitor
|
|
||||||
client_mon :: reference(),
|
|
||||||
|
|
||||||
%% Last packet id of the session
|
%% Last packet id of the session
|
||||||
packet_id = 1,
|
packet_id = 1,
|
||||||
|
|
||||||
|
@ -170,8 +167,12 @@ destroy(SessPid, ClientId) ->
|
||||||
subscribe(SessPid, TopicTable) ->
|
subscribe(SessPid, TopicTable) ->
|
||||||
subscribe(SessPid, TopicTable, fun(_) -> ok end).
|
subscribe(SessPid, TopicTable, fun(_) -> ok end).
|
||||||
|
|
||||||
-spec subscribe(pid(), [{binary(), mqtt_qos()}], AckFun :: fun()) -> ok.
|
-spec subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok.
|
||||||
subscribe(SessPid, TopicTable, AckFun) ->
|
subscribe(SessPid, PacketId, TopicTable) ->
|
||||||
|
From = self(),
|
||||||
|
AckFun = fun(GrantedQos) ->
|
||||||
|
From ! {suback, PacketId, GrantedQos}
|
||||||
|
end,
|
||||||
gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}).
|
gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -224,7 +225,8 @@ unsubscribe(SessPid, Topics) ->
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
init([CleanSess, ClientId, ClientPid]) ->
|
init([CleanSess, ClientId, ClientPid]) ->
|
||||||
%% process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
|
true = link(ClientPid),
|
||||||
QEnv = emqttd:env(mqtt, queue),
|
QEnv = emqttd:env(mqtt, queue),
|
||||||
SessEnv = emqttd:env(mqtt, session),
|
SessEnv = emqttd:env(mqtt, session),
|
||||||
Session = #session{
|
Session = #session{
|
||||||
|
@ -245,10 +247,8 @@ init([CleanSess, ClientId, ClientPid]) ->
|
||||||
collect_interval = emqttd_opts:g(collect_interval, SessEnv, 0),
|
collect_interval = emqttd_opts:g(collect_interval, SessEnv, 0),
|
||||||
timestamp = os:timestamp()},
|
timestamp = os:timestamp()},
|
||||||
emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
|
emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
|
||||||
%% monitor client
|
|
||||||
MRef = erlang:monitor(process, ClientPid),
|
|
||||||
%% start statistics
|
%% start statistics
|
||||||
{ok, start_collector(Session#session{client_mon = MRef}), hibernate}.
|
{ok, start_collector(Session), hibernate}.
|
||||||
|
|
||||||
prioritise_call(Msg, _From, _Len, _State) ->
|
prioritise_call(Msg, _From, _Len, _State) ->
|
||||||
case Msg of _ -> 0 end.
|
case Msg of _ -> 0 end.
|
||||||
|
@ -268,7 +268,6 @@ prioritise_cast(Msg, _Len, _State) ->
|
||||||
|
|
||||||
prioritise_info(Msg, _Len, _State) ->
|
prioritise_info(Msg, _Len, _State) ->
|
||||||
case Msg of
|
case Msg of
|
||||||
{'DOWN', _, _, _, _} -> 10;
|
|
||||||
{'EXIT', _, _} -> 10;
|
{'EXIT', _, _} -> 10;
|
||||||
session_expired -> 10;
|
session_expired -> 10;
|
||||||
{timeout, _, _} -> 5;
|
{timeout, _, _} -> 5;
|
||||||
|
@ -303,13 +302,13 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{
|
||||||
|
|
||||||
case TopicTable -- Subscriptions of
|
case TopicTable -- Subscriptions of
|
||||||
[] ->
|
[] ->
|
||||||
catch AckFun([Qos || {_, Qos} <- TopicTable]),
|
AckFun([Qos || {_, Qos} <- TopicTable]),
|
||||||
noreply(Session);
|
noreply(Session);
|
||||||
_ ->
|
_ ->
|
||||||
%% subscribe first and don't care if the subscriptions have been existed
|
%% subscribe first and don't care if the subscriptions have been existed
|
||||||
{ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
|
{ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
|
||||||
|
|
||||||
catch AckFun(GrantedQos),
|
AckFun(GrantedQos),
|
||||||
|
|
||||||
emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
|
emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
|
||||||
|
|
||||||
|
@ -368,7 +367,6 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
|
||||||
|
|
||||||
#session{client_id = ClientId,
|
#session{client_id = ClientId,
|
||||||
client_pid = OldClientPid,
|
client_pid = OldClientPid,
|
||||||
client_mon = MRef,
|
|
||||||
inflight_queue = InflightQ,
|
inflight_queue = InflightQ,
|
||||||
awaiting_ack = AwaitingAck,
|
awaiting_ack = AwaitingAck,
|
||||||
awaiting_comp = AwaitingComp,
|
awaiting_comp = AwaitingComp,
|
||||||
|
@ -388,10 +386,12 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
|
||||||
true ->
|
true ->
|
||||||
lager:error([{client, ClientId}], "Session(~s): ~p kickout ~p",
|
lager:error([{client, ClientId}], "Session(~s): ~p kickout ~p",
|
||||||
[ClientId, ClientPid, OldClientPid]),
|
[ClientId, ClientPid, OldClientPid]),
|
||||||
OldClientPid ! {stop, duplicate_id, ClientPid},
|
unlink(OldClientPid),
|
||||||
erlang:demonitor(MRef, [flush])
|
OldClientPid ! {stop, duplicate_id, ClientPid}
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
true = link(ClientPid),
|
||||||
|
|
||||||
%% Redeliver PUBREL
|
%% Redeliver PUBREL
|
||||||
[ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)],
|
[ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)],
|
||||||
|
|
||||||
|
@ -402,7 +402,6 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
|
||||||
[cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)],
|
[cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)],
|
||||||
|
|
||||||
Session1 = Session#session{client_pid = ClientPid,
|
Session1 = Session#session{client_pid = ClientPid,
|
||||||
client_mon = erlang:monitor(process, ClientPid),
|
|
||||||
awaiting_ack = #{},
|
awaiting_ack = #{},
|
||||||
awaiting_comp = #{},
|
awaiting_comp = #{},
|
||||||
expired_timer = undefined},
|
expired_timer = undefined},
|
||||||
|
@ -548,21 +547,24 @@ handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id =
|
||||||
emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
|
emqttd_sm:register_session(CleanSess, ClientId, info(Session)),
|
||||||
{noreply, start_collector(Session), hibernate};
|
{noreply, start_collector(Session), hibernate};
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, ClientPid, _}, Session = #session{clean_sess = true,
|
handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
|
||||||
client_pid = ClientPid}) ->
|
client_pid = ClientPid}) ->
|
||||||
{stop, normal, Session};
|
{stop, normal, Session};
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, ClientPid, _}, Session = #session{clean_sess = false,
|
handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false,
|
||||||
client_pid = ClientPid,
|
client_id = ClientId,
|
||||||
expired_after = Expires}) ->
|
client_pid = ClientPid,
|
||||||
|
expired_after = Expires}) ->
|
||||||
|
lager:info("Session(~s): unlink with client ~p: reason=~p",
|
||||||
|
[ClientId, ClientPid, Reason]),
|
||||||
TRef = timer(Expires, session_expired),
|
TRef = timer(Expires, session_expired),
|
||||||
noreply(Session#session{client_pid = undefined, client_mon = undefined, expired_timer = TRef});
|
noreply(Session#session{client_pid = undefined, expired_timer = TRef});
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, Pid, Reason}, Session = #session{client_id = ClientId,
|
handle_info({'EXIT', Pid, Reason}, Session = #session{client_id = ClientId,
|
||||||
client_pid = ClientPid}) ->
|
client_pid = ClientPid}) ->
|
||||||
lager:error([{client, ClientId}], "Session(~s): unexpected DOWN: "
|
|
||||||
"client_pid=~p, down_pid=~p, reason=~p",
|
lager:error("Session(~s): Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p",
|
||||||
[ClientId, ClientPid, Pid, Reason]),
|
[ClientId, ClientPid, Pid, Reason]),
|
||||||
noreply(Session);
|
noreply(Session);
|
||||||
|
|
||||||
handle_info(session_expired, Session = #session{client_id = ClientId}) ->
|
handle_info(session_expired, Session = #session{client_id = ClientId}) ->
|
||||||
|
|
|
@ -1,32 +0,0 @@
|
||||||
%%%-----------------------------------------------------------------------------
|
|
||||||
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
|
|
||||||
%%%
|
|
||||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
||||||
%%% of this software and associated documentation files (the "Software"), to deal
|
|
||||||
%%% in the Software without restriction, including without limitation the rights
|
|
||||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
||||||
%%% copies of the Software, and to permit persons to whom the Software is
|
|
||||||
%%% furnished to do so, subject to the following conditions:
|
|
||||||
%%%
|
|
||||||
%%% The above copyright notice and this permission notice shall be included in all
|
|
||||||
%%% copies or substantial portions of the Software.
|
|
||||||
%%%
|
|
||||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
||||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
||||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
||||||
%%% SOFTWARE.
|
|
||||||
%%%-----------------------------------------------------------------------------
|
|
||||||
%%% @doc
|
|
||||||
%%% emqttd client throttle.
|
|
||||||
%%%
|
|
||||||
%%% @end
|
|
||||||
%%%-----------------------------------------------------------------------------
|
|
||||||
-module(emqttd_throttle).
|
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
|
||||||
|
|
||||||
%% TODO:... 0.11.0...
|
|
||||||
|
|
|
@ -115,7 +115,7 @@ reset_parser(State = #wsocket_state{packet_opts = PktOpts}) ->
|
||||||
State#wsocket_state{parser = emqttd_parser:new(PktOpts)}.
|
State#wsocket_state{parser = emqttd_parser:new(PktOpts)}.
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% gen_fsm callbacks
|
%%% gen_server callbacks
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
init([WsPid, Req, ReplyChannel, PktOpts]) ->
|
init([WsPid, Req, ReplyChannel, PktOpts]) ->
|
||||||
|
@ -164,6 +164,11 @@ handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState})
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
%% Asynchronous SUBACK
|
||||||
|
handle_info({suback, PacketId, GrantedQos}, State = #client_state{proto_state = ProtoState}) ->
|
||||||
|
{ok, ProtoState1} = emqttd_protocol:send(?SUBACK_PACKET(PacketId, GrantedQos), ProtoState),
|
||||||
|
noreply(State#client_state{proto_state = ProtoState1});
|
||||||
|
|
||||||
handle_info({deliver, Message}, State = #client_state{proto_state = ProtoState}) ->
|
handle_info({deliver, Message}, State = #client_state{proto_state = ProtoState}) ->
|
||||||
{ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
|
{ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
|
||||||
noreply(State#client_state{proto_state = ProtoState1});
|
noreply(State#client_state{proto_state = ProtoState1});
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
-module(emqttd_retained_tests).
|
||||||
|
|
||||||
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
retain_test() ->
|
||||||
|
mnesia:start(),
|
||||||
|
emqttd_retained:mnesia(boot),
|
||||||
|
mnesia:stop().
|
||||||
|
|
||||||
|
-endif.
|
Loading…
Reference in New Issue