state -> conn_state

This commit is contained in:
Ery Lee 2015-01-06 12:17:25 +08:00
parent e7bb275923
commit f1c7185f52
3 changed files with 60 additions and 49 deletions

View File

@ -40,7 +40,7 @@
-include("emqtt_frame.hrl"). -include("emqtt_frame.hrl").
%%Client State... %%Client State...
-record(state, { -record(conn_state, {
socket, socket,
conn_name, conn_name,
await_recv, await_recv,
@ -61,14 +61,14 @@ go(Pid, Sock) ->
gen_server:call(Pid, {go, Sock}). gen_server:call(Pid, {go, Sock}).
init([Sock]) -> init([Sock]) ->
{ok, #state{socket = Sock}, hibernate}. {ok, #conn_state{socket = Sock}, hibernate}.
handle_call({go, Sock}, _From, State = #state{socket = Sock}) -> handle_call({go, Sock}, _From, State = #conn_state{socket = Sock}) ->
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
lager:debug("conn from ~s", [ConnStr]), lager:debug("conn from ~s", [ConnStr]),
{reply, ok, {reply, ok,
control_throttle( control_throttle(
#state{ socket = Sock, #conn_state{ socket = Sock,
conn_name = ConnStr, conn_name = ConnStr,
await_recv = false, await_recv = false,
connection_state = running, connection_state = running,
@ -76,7 +76,7 @@ handle_call({go, Sock}, _From, State = #state{socket = Sock}) ->
parse_state = emqtt_frame:initial_state(), parse_state = emqtt_frame:initial_state(),
proto_state = emqtt_protocol:initial_state(Sock)})}; proto_state = emqtt_protocol:initial_state(Sock)})};
handle_call(info, _From, State = #state{conn_name=ConnName, proto_state = ProtoState}) -> handle_call(info, _From, State = #conn_state{conn_name=ConnName, proto_state = ProtoState}) ->
{reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State}; {reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
@ -88,22 +88,22 @@ handle_cast(Msg, State) ->
handle_info(timeout, State) -> handle_info(timeout, State) ->
stop({shutdown, timeout}, State); stop({shutdown, timeout}, State);
handle_info({stop, duplicate_id}, State=#state{conn_name=ConnName}) -> handle_info({stop, duplicate_id}, State=#conn_state{conn_name=ConnName}) ->
%%TODO: %%TODO:
%lager:error("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]), %lager:error("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]),
stop({shutdown, duplicate_id}, State); stop({shutdown, duplicate_id}, State);
%%TODO: ok?? %%TODO: ok??
handle_info({dispatch, Msg}, #state{proto_state = ProtoState} = State) -> handle_info({dispatch, Msg}, #conn_state{proto_state = ProtoState} = State) ->
{ok, ProtoState1} = emqtt_protocol:send_message(Msg, ProtoState), {ok, ProtoState1} = emqtt_protocol:send_message(Msg, ProtoState),
{noreply, State#state{proto_state = ProtoState1}}; {noreply, State#conn_state{proto_state = ProtoState1}};
handle_info({inet_reply, _Ref, ok}, State) -> handle_info({inet_reply, _Ref, ok}, State) ->
{noreply, State, hibernate}; {noreply, State, hibernate};
handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ socket = Sock}=State) -> handle_info({inet_async, Sock, _Ref, {ok, Data}}, #conn_state{ socket = Sock}=State) ->
process_received_bytes( process_received_bytes(
Data, control_throttle(State #state{ await_recv = false })); Data, control_throttle(State #conn_state{ await_recv = false }));
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
network_error(Reason, State); network_error(Reason, State);
@ -112,27 +112,27 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
handle_info({inet_reply, _Sock, {error, Reason}}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
{noreply, State}; {noreply, State};
handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) -> handle_info(keep_alive_timeout, #conn_state{keep_alive=KeepAlive}=State) ->
case emqtt_keep_alive:state(KeepAlive) of case emqtt_keep_alive:state(KeepAlive) of
idle -> idle ->
lager:info("keep_alive timeout: ~p", [State#state.conn_name]), lager:info("keep_alive timeout: ~p", [State#conn_state.conn_name]),
{stop, normal, State}; {stop, normal, State};
active -> active ->
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
{noreply, State#state{keep_alive=KeepAlive1}} {noreply, State#conn_state{keep_alive=KeepAlive1}}
end; end;
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("badinfo :~p",[Info]), lager:error("badinfo :~p",[Info]),
{stop, {badinfo, Info}, State}. {stop, {badinfo, Info}, State}.
terminate(_Reason, #state{proto_state = unefined}) -> terminate(_Reason, #conn_state{proto_state = unefined}) ->
%%TODO: fix keep_alive... %%TODO: fix keep_alive...
%%emqtt_keep_alive:cancel(KeepAlive), %%emqtt_keep_alive:cancel(KeepAlive),
%emqtt_protocol:client_terminated(ProtoState), %emqtt_protocol:client_terminated(ProtoState),
ok; ok;
terminate(_Reason, #state{proto_state = ProtoState}) -> terminate(_Reason, #conn_state{proto_state = ProtoState}) ->
%%TODO: fix keep_alive... %%TODO: fix keep_alive...
%%emqtt_keep_alive:cancel(KeepAlive), %%emqtt_keep_alive:cancel(KeepAlive),
emqtt_protocol:client_terminated(ProtoState), emqtt_protocol:client_terminated(ProtoState),
@ -154,28 +154,28 @@ process_received_bytes(<<>>, State) ->
{noreply, State, hibernate}; {noreply, State, hibernate};
process_received_bytes(Bytes, process_received_bytes(Bytes,
State = #state{ parse_state = ParseState, State = #conn_state{ parse_state = ParseState,
proto_state = ProtoState, proto_state = ProtoState,
conn_name = ConnStr }) -> conn_name = ConnStr }) ->
case emqtt_frame:parse(Bytes, ParseState) of case emqtt_frame:parse(Bytes, ParseState) of
{more, ParseState1} -> {more, ParseState1} ->
{noreply, {noreply,
control_throttle( State #state{ parse_state = ParseState1 }), control_throttle( State #conn_state{ parse_state = ParseState1 }),
hibernate}; hibernate};
{ok, Frame, Rest} -> {ok, Frame, Rest} ->
case emqtt_protocol:handle_frame(Frame, ProtoState) of case emqtt_protocol:handle_frame(Frame, ProtoState) of
{ok, ProtoState1} -> {ok, ProtoState1} ->
process_received_bytes( process_received_bytes(
Rest, Rest,
State#state{ parse_state = emqtt_frame:initial_state(), State#conn_state{ parse_state = emqtt_frame:initial_state(),
proto_state = ProtoState1 }); proto_state = ProtoState1 });
{error, Error} -> {error, Error} ->
lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]), lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
stop({shutdown, Error}, State); stop({shutdown, Error}, State);
{error, Error, ProtoState1} -> {error, Error, ProtoState1} ->
stop({shutdown, Error}, State#state{proto_state = ProtoState1}); stop({shutdown, Error}, State#conn_state{proto_state = ProtoState1});
{stop, ProtoState1} -> {stop, ProtoState1} ->
stop(normal, State#state{proto_state = ProtoState1}) stop(normal, State#conn_state{proto_state = ProtoState1})
end; end;
{error, Error} -> {error, Error} ->
lager:error("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]), lager:error("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]),
@ -184,26 +184,26 @@ process_received_bytes(Bytes,
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
network_error(Reason, network_error(Reason,
State = #state{ conn_name = ConnStr}) -> State = #conn_state{ conn_name = ConnStr}) ->
lager:error("MQTT detected network error '~p' for ~p", [Reason, ConnStr]), lager:error("MQTT detected network error '~p' for ~p", [Reason, ConnStr]),
%%TODO: where to SEND WILL MSG?? %%TODO: where to SEND WILL MSG??
%%send_will_msg(State), %%send_will_msg(State),
% todo: flush channel after publish % todo: flush channel after publish
stop({shutdown, conn_closed}, State). stop({shutdown, conn_closed}, State).
run_socket(State = #state{ connection_state = blocked }) -> run_socket(State = #conn_state{ connection_state = blocked }) ->
State; State;
run_socket(State = #state{ await_recv = true }) -> run_socket(State = #conn_state{ await_recv = true }) ->
State; State;
run_socket(State = #state{ socket = Sock }) -> run_socket(State = #conn_state{ socket = Sock }) ->
async_recv(Sock, 0, infinity), async_recv(Sock, 0, infinity),
State#state{ await_recv = true }. State#conn_state{ await_recv = true }.
control_throttle(State = #state{ connection_state = Flow, control_throttle(State = #conn_state{ connection_state = Flow,
conserve = Conserve }) -> conserve = Conserve }) ->
case {Flow, Conserve} of case {Flow, Conserve} of
{running, true} -> State #state{ connection_state = blocked }; {running, true} -> State #conn_state{ connection_state = blocked };
{blocked, false} -> run_socket(State #state{ {blocked, false} -> run_socket(State #conn_state{
connection_state = running }); connection_state = running });
{_, _} -> run_socket(State) {_, _} -> run_socket(State)
end. end.

View File

@ -37,7 +37,7 @@
-export([start_link/0]). -export([start_link/0]).
-export([lookup/1, create/2, destroy/2]). -export([lookup/1, register/2, unregister/2]).
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% gen_server Function Exports %% gen_server Function Exports
@ -56,56 +56,65 @@
start_link() -> start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%
%% @doc lookup client pid with clientId.
%%
-spec lookup(ClientId :: binary()) -> pid() | undefined. -spec lookup(ClientId :: binary()) -> pid() | undefined.
lookup(ClientId) -> lookup(ClientId) when is_binary(ClientId) ->
case ets:lookup(emqtt_client, ClientId) of case ets:lookup(emqtt_client, ClientId) of
[{_, Pid, _}] -> Pid; [{_, Pid, _}] -> Pid;
[] -> undefined [] -> undefined
end. end.
-spec create(ClientId :: binary(), Pid :: pid()) -> ok. %%
create(ClientId, Pid) -> %% @doc register clientId with pid.
gen_server:call(?SERVER, {create, ClientId, Pid}). %%
-spec register(ClientId :: binary(), Pid :: pid()) -> ok.
register(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) ->
gen_server:call(?SERVER, {register, ClientId, Pid}).
-spec destroy(ClientId :: binary(), Pid :: pid()) -> ok. %%
destroy(ClientId, Pid) when is_binary(ClientId) -> %% @doc unregister clientId with pid.
gen_server:cast(?SERVER, {destroy, ClientId, Pid}). %%
-spec unregister(ClientId :: binary(), Pid :: pid()) -> ok.
unregister(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) ->
gen_server:cast(?SERVER, {unregister, ClientId, Pid}).
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% gen_server Function Definitions %% gen_server Function Definitions
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
init(Args) -> init([]) ->
%on one node %on one node
ets:new(?TAB, [set, named_table, protected]), ets:new(?TAB, [set, named_table, protected]),
{ok, Args}. {ok, none}.
handle_call({create, ClientId, Pid}, _From, State) -> handle_call({register, ClientId, Pid}, _From, State) ->
case ets:lookup(?TAB, ClientId) of case ets:lookup(?TAB, ClientId) of
[{_, Pid, _}] -> [{_, Pid, _}] ->
lager:error("client '~s' has been registered with ~p", [ClientId, Pid]), lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
ignore; ignore;
[{_, OldPid, MRef}] -> [{_, OldPid, MRef}] ->
OldPid ! {stop, duplicate_id}, OldPid ! {stop, duplicate_id, Pid},
erlang:demonitor(MRef), erlang:demonitor(MRef),
ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)}); insert(ClientId, Pid);
[] -> [] ->
ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)}) insert(ClientId, Pid)
end, end,
{reply, ok, State}; {reply, ok, State};
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
handle_cast({destroy, ClientId, Pid}, State) when is_binary(ClientId) -> handle_cast({unregister, ClientId, Pid}, State) ->
case ets:lookup(?TAB, ClientId) of case ets:lookup(?TAB, ClientId) of
[{_, Pid, MRef}] -> [{_, Pid, MRef}] ->
erlang:demonitor(MRef), erlang:demonitor(MRef),
ets:delete(?TAB, ClientId); ets:delete(?TAB, ClientId);
[_] -> [_] ->
ignore; ignore;
[] -> [] ->
lager:error("cannot find client '~s' with ~p", [ClientId, Pid]) lager:error("cannot find clientId '~s' with ~p", [ClientId, Pid])
end, end,
{noreply, State}; {noreply, State};
@ -125,4 +134,6 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
insert(ClientId, Pid) ->
ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)}).

View File

@ -106,7 +106,7 @@ handle_request(?CONNECT,
lager:info("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]), lager:info("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]),
%%TODO: %%TODO:
%%KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout), %%KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
emqtt_cm:create(ClientId, self()), emqtt_cm:register(ClientId, self()),
{?CONNACK_ACCEPT, {?CONNACK_ACCEPT,
State #proto_state{ will_msg = make_will_msg(Var), State #proto_state{ will_msg = make_will_msg(Var),
client_id = ClientId }} client_id = ClientId }}
@ -265,7 +265,7 @@ send_frame(Sock, Frame) ->
%%TODO: fix me later... %%TODO: fix me later...
client_terminated(#proto_state{client_id = ClientId} = State) -> client_terminated(#proto_state{client_id = ClientId} = State) ->
ok. ok.
%emqtt_cm:destroy(ClientId, self()). %emqtt_cm:unregister(ClientId, self()).
make_msg(#mqtt_frame{ make_msg(#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = Qos, fixed = #mqtt_frame_fixed{qos = Qos,
@ -336,4 +336,4 @@ maybe_clean_sess(false, _Conn, _ClientId) ->
send_will_msg(#proto_state{will_msg = undefined}) -> send_will_msg(#proto_state{will_msg = undefined}) ->
ignore; ignore;
send_will_msg(#proto_state{will_msg = WillMsg }) -> send_will_msg(#proto_state{will_msg = WillMsg }) ->
emqtt_pubsub:publish(WillMsg). emqtt_router:route(WillMsg).