fix format
This commit is contained in:
parent
fb6e3dc066
commit
e8133366e1
|
@ -74,8 +74,9 @@ init(SockArgs = {Transport, Sock, _SockFun}) ->
|
||||||
proto_state = emqtt_protocol:init(Transport, NewSock, Peername)}),
|
proto_state = emqtt_protocol:init(Transport, NewSock, Peername)}),
|
||||||
gen_server:enter_loop(?MODULE, [], State, 10000).
|
gen_server:enter_loop(?MODULE, [], State, 10000).
|
||||||
|
|
||||||
handle_call(info, _From, State = #state{
|
%%TODO: Not enough...
|
||||||
conn_name=ConnName, proto_state = ProtoState}) ->
|
handle_call(info, _From, State = #state{conn_name=ConnName,
|
||||||
|
proto_state = ProtoState}) ->
|
||||||
{reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State};
|
{reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
@ -87,7 +88,8 @@ handle_cast(Msg, State) ->
|
||||||
handle_info(timeout, State) ->
|
handle_info(timeout, State) ->
|
||||||
stop({shutdown, timeout}, State);
|
stop({shutdown, timeout}, State);
|
||||||
|
|
||||||
handle_info({stop, duplicate_id, _NewPid}, State=#state{ proto_state = ProtoState, conn_name=ConnName}) ->
|
handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState,
|
||||||
|
conn_name=ConnName}) ->
|
||||||
%% TODO: to...
|
%% TODO: to...
|
||||||
%% need transfer data???
|
%% need transfer data???
|
||||||
%% emqtt_client:transfer(NewPid, Data),
|
%% emqtt_client:transfer(NewPid, Data),
|
||||||
|
@ -107,7 +109,7 @@ handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} =
|
||||||
handle_info({inet_reply, _Ref, ok}, State) ->
|
handle_info({inet_reply, _Ref, ok}, State) ->
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{ peer_name = PeerName, socket = Sock }) ->
|
handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peer_name = PeerName, socket = Sock}) ->
|
||||||
lager:debug("RECV from ~s: ~p", [PeerName, Data]),
|
lager:debug("RECV from ~s: ~p", [PeerName, Data]),
|
||||||
process_received_bytes(
|
process_received_bytes(
|
||||||
Data, control_throttle(State #state{ await_recv = false }));
|
Data, control_throttle(State #state{ await_recv = false }));
|
||||||
|
@ -124,28 +126,29 @@ handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport
|
||||||
KeepAlive = emqtt_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}),
|
KeepAlive = emqtt_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}),
|
||||||
{noreply, State#state{ keepalive = KeepAlive }};
|
{noreply, State#state{ keepalive = KeepAlive }};
|
||||||
|
|
||||||
handle_info({keepalive, timeout}, State = #state { keepalive = KeepAlive }) ->
|
handle_info({keepalive, timeout}, State = #state{keepalive = KeepAlive}) ->
|
||||||
case emqtt_keepalive:resume(KeepAlive) of
|
case emqtt_keepalive:resume(KeepAlive) of
|
||||||
timeout ->
|
timeout ->
|
||||||
lager:info("Client ~s: Keepalive Timeout!", [State#state.peer_name]),
|
lager:info("Client ~s: Keepalive Timeout!", [State#state.peer_name]),
|
||||||
stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
|
stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
|
||||||
{resumed, KeepAlive1} ->
|
{resumed, KeepAlive1} ->
|
||||||
lager:info("Client ~s: Keepalive Resumed", [State#state.peer_name]),
|
lager:info("Client ~s: Keepalive Resumed", [State#state.peer_name]),
|
||||||
{noreply, State#state{ keepalive = KeepAlive1 }}
|
{noreply, State#state{keepalive = KeepAlive1}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info(Info, State = #state{peer_name = PeerName}) ->
|
handle_info(Info, State = #state{peer_name = PeerName}) ->
|
||||||
lager:critical("Client ~s: unexpected info ~p",[PeerName, Info]),
|
lager:critical("Client ~s: unexpected info ~p",[PeerName, Info]),
|
||||||
{stop, {badinfo, Info}, State}.
|
{stop, {badinfo, Info}, State}.
|
||||||
|
|
||||||
terminate(Reason, #state{ peer_name = PeerName, keepalive = KeepAlive, proto_state = ProtoState }) ->
|
terminate(Reason, #state{peer_name = PeerName, keepalive = KeepAlive, proto_state = ProtoState}) ->
|
||||||
lager:info("Client ~s: ~p terminated, reason: ~p~n", [PeerName, self(), Reason]),
|
lager:info("Client ~s: ~p terminated, reason: ~p~n", [PeerName, self(), Reason]),
|
||||||
emqtt_keepalive:cancel(KeepAlive),
|
emqtt_keepalive:cancel(KeepAlive),
|
||||||
case {ProtoState, Reason} of
|
case {ProtoState, Reason} of
|
||||||
{undefined, _} -> ok;
|
{undefined, _} -> ok;
|
||||||
{_, {shutdown, Error}} ->
|
{_, {shutdown, Error}} ->
|
||||||
emqtt_protocol:shutdown(Error, ProtoState);
|
emqtt_protocol:shutdown(Error, ProtoState);
|
||||||
{_, _} -> ok %TODO:
|
{_, _} ->
|
||||||
|
ok
|
||||||
end,
|
end,
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -158,22 +161,19 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
process_received_bytes(<<>>, State) ->
|
process_received_bytes(<<>>, State) ->
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
process_received_bytes(Bytes,
|
process_received_bytes(Bytes, State = #state{parse_state = ParseState,
|
||||||
State = #state{ parse_state = ParseState,
|
|
||||||
proto_state = ProtoState,
|
proto_state = ProtoState,
|
||||||
conn_name = ConnStr }) ->
|
conn_name = ConnStr}) ->
|
||||||
case emqtt_parser:parse(Bytes, ParseState) of
|
case emqtt_parser:parse(Bytes, ParseState) of
|
||||||
{more, ParseState1} ->
|
{more, ParseState1} ->
|
||||||
{noreply,
|
{noreply,
|
||||||
control_throttle( State #state{ parse_state = ParseState1 }),
|
control_throttle( State #state{parse_state = ParseState1}),
|
||||||
hibernate};
|
hibernate};
|
||||||
{ok, Packet, Rest} ->
|
{ok, Packet, Rest} ->
|
||||||
case emqtt_protocol:handle_packet(Packet, ProtoState) of
|
case emqtt_protocol:handle_packet(Packet, ProtoState) of
|
||||||
{ok, ProtoState1} ->
|
{ok, ProtoState1} ->
|
||||||
process_received_bytes(
|
process_received_bytes(Rest, State#state{parse_state = emqtt_parser:init(),
|
||||||
Rest,
|
proto_state = ProtoState1});
|
||||||
State#state{ parse_state = emqtt_parser:init(),
|
|
||||||
proto_state = ProtoState1 });
|
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
|
lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
|
||||||
stop({shutdown, Error}, State);
|
stop({shutdown, Error}, State);
|
||||||
|
@ -188,7 +188,7 @@ process_received_bytes(Bytes,
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
network_error(Reason, State = #state{ peer_name = PeerName }) ->
|
network_error(Reason, State = #state{peer_name = PeerName}) ->
|
||||||
lager:error("Client ~s: MQTT detected network error '~p'", [PeerName, Reason]),
|
lager:error("Client ~s: MQTT detected network error '~p'", [PeerName, Reason]),
|
||||||
stop({shutdown, conn_closed}, State).
|
stop({shutdown, conn_closed}, State).
|
||||||
|
|
||||||
|
@ -200,12 +200,11 @@ run_socket(State = #state{transport = Transport, socket = Sock}) ->
|
||||||
Transport:async_recv(Sock, 0, infinity),
|
Transport:async_recv(Sock, 0, infinity),
|
||||||
State#state{ await_recv = true }.
|
State#state{ await_recv = true }.
|
||||||
|
|
||||||
control_throttle(State = #state{ conn_state = Flow,
|
control_throttle(State = #state{conn_state = Flow,
|
||||||
conserve = Conserve }) ->
|
conserve = Conserve}) ->
|
||||||
case {Flow, Conserve} of
|
case {Flow, Conserve} of
|
||||||
{running, true} -> State #state{ conn_state = blocked };
|
{running, true} -> State #state{conn_state = blocked};
|
||||||
{blocked, false} -> run_socket(State #state{
|
{blocked, false} -> run_socket(State #state{conn_state = running});
|
||||||
conn_state = running });
|
|
||||||
{_, _} -> run_socket(State)
|
{_, _} -> run_socket(State)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
%% SOFTWARE.
|
%% SOFTWARE.
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqtt_protocol).
|
-module(emqtt_protocol).
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqtt.hrl").
|
||||||
|
@ -29,14 +28,12 @@
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
|
|
||||||
-export([init/3, client_id/1]).
|
-export([init/3, client_id/1]).
|
||||||
|
|
||||||
-export([handle_packet/2, send_message/2, send_packet/2, redeliver/2, shutdown/2]).
|
-export([handle_packet/2, send_message/2, send_packet/2, redeliver/2, shutdown/2]).
|
||||||
|
|
||||||
-export([info/1]).
|
-export([info/1]).
|
||||||
|
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
%% Protocol State
|
%% Protocol State
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
|
@ -54,22 +51,14 @@
|
||||||
will_msg
|
will_msg
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
-ifdef(use_specs).
|
|
||||||
|
|
||||||
-type(proto_state() :: #proto_state{}).
|
-type(proto_state() :: #proto_state{}).
|
||||||
|
|
||||||
-spec(send_message({pid() | tuple(), mqtt_message()}, proto_state()) -> {ok, proto_state()}).
|
-spec(send_message({pid() | tuple(), mqtt_message()}, proto_state()) -> {ok, proto_state()}).
|
||||||
|
|
||||||
-spec(handle_packet(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}).
|
-spec(handle_packet(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}).
|
||||||
|
|
||||||
-endif.
|
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
-define(PACKET_TYPE(Packet, Type),
|
-define(PACKET_TYPE(Packet, Type),
|
||||||
Packet = #mqtt_packet { header = #mqtt_packet_header { type = Type }}).
|
Packet = #mqtt_packet{header = #mqtt_packet_header { type = Type }}).
|
||||||
|
|
||||||
-define(PUBACK_PACKET(PacketId), #mqtt_packet_puback { packet_id = PacketId }).
|
-define(PUBACK_PACKET(PacketId), #mqtt_packet_puback { packet_id = PacketId }).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue