diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index 22eb1a728..da692bbc2 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -40,7 +40,7 @@ -include("emqtt_frame.hrl"). %%Client State... --record(state, { +-record(conn_state, { socket, conn_name, await_recv, @@ -61,14 +61,14 @@ go(Pid, Sock) -> gen_server:call(Pid, {go, Sock}). init([Sock]) -> - {ok, #state{socket = Sock}, hibernate}. + {ok, #conn_state{socket = Sock}, hibernate}. -handle_call({go, Sock}, _From, State = #state{socket = Sock}) -> +handle_call({go, Sock}, _From, State = #conn_state{socket = Sock}) -> {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), lager:debug("conn from ~s", [ConnStr]), {reply, ok, control_throttle( - #state{ socket = Sock, + #conn_state{ socket = Sock, conn_name = ConnStr, await_recv = false, connection_state = running, @@ -76,7 +76,7 @@ handle_call({go, Sock}, _From, State = #state{socket = Sock}) -> parse_state = emqtt_frame:initial_state(), proto_state = emqtt_protocol:initial_state(Sock)})}; -handle_call(info, _From, State = #state{conn_name=ConnName, proto_state = ProtoState}) -> +handle_call(info, _From, State = #conn_state{conn_name=ConnName, proto_state = ProtoState}) -> {reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State}; handle_call(Req, _From, State) -> @@ -88,22 +88,22 @@ handle_cast(Msg, State) -> handle_info(timeout, State) -> stop({shutdown, timeout}, State); -handle_info({stop, duplicate_id}, State=#state{conn_name=ConnName}) -> +handle_info({stop, duplicate_id}, State=#conn_state{conn_name=ConnName}) -> %%TODO: %lager:error("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]), stop({shutdown, duplicate_id}, State); %%TODO: ok?? -handle_info({dispatch, Msg}, #state{proto_state = ProtoState} = State) -> +handle_info({dispatch, Msg}, #conn_state{proto_state = ProtoState} = State) -> {ok, ProtoState1} = emqtt_protocol:send_message(Msg, ProtoState), - {noreply, State#state{proto_state = ProtoState1}}; + {noreply, State#conn_state{proto_state = ProtoState1}}; handle_info({inet_reply, _Ref, ok}, State) -> {noreply, State, hibernate}; -handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ socket = Sock}=State) -> +handle_info({inet_async, Sock, _Ref, {ok, Data}}, #conn_state{ socket = Sock}=State) -> process_received_bytes( - Data, control_throttle(State #state{ await_recv = false })); + Data, control_throttle(State #conn_state{ await_recv = false })); handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> network_error(Reason, State); @@ -112,27 +112,27 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) -> {noreply, State}; -handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) -> +handle_info(keep_alive_timeout, #conn_state{keep_alive=KeepAlive}=State) -> case emqtt_keep_alive:state(KeepAlive) of idle -> - lager:info("keep_alive timeout: ~p", [State#state.conn_name]), + lager:info("keep_alive timeout: ~p", [State#conn_state.conn_name]), {stop, normal, State}; active -> KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), - {noreply, State#state{keep_alive=KeepAlive1}} + {noreply, State#conn_state{keep_alive=KeepAlive1}} end; handle_info(Info, State) -> lager:error("badinfo :~p",[Info]), {stop, {badinfo, Info}, State}. -terminate(_Reason, #state{proto_state = unefined}) -> +terminate(_Reason, #conn_state{proto_state = unefined}) -> %%TODO: fix keep_alive... %%emqtt_keep_alive:cancel(KeepAlive), %emqtt_protocol:client_terminated(ProtoState), ok; -terminate(_Reason, #state{proto_state = ProtoState}) -> +terminate(_Reason, #conn_state{proto_state = ProtoState}) -> %%TODO: fix keep_alive... %%emqtt_keep_alive:cancel(KeepAlive), emqtt_protocol:client_terminated(ProtoState), @@ -154,28 +154,28 @@ process_received_bytes(<<>>, State) -> {noreply, State, hibernate}; process_received_bytes(Bytes, - State = #state{ parse_state = ParseState, + State = #conn_state{ parse_state = ParseState, proto_state = ProtoState, conn_name = ConnStr }) -> case emqtt_frame:parse(Bytes, ParseState) of {more, ParseState1} -> {noreply, - control_throttle( State #state{ parse_state = ParseState1 }), + control_throttle( State #conn_state{ parse_state = ParseState1 }), hibernate}; {ok, Frame, Rest} -> case emqtt_protocol:handle_frame(Frame, ProtoState) of {ok, ProtoState1} -> process_received_bytes( Rest, - State#state{ parse_state = emqtt_frame:initial_state(), + State#conn_state{ parse_state = emqtt_frame:initial_state(), proto_state = ProtoState1 }); {error, Error} -> lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]), stop({shutdown, Error}, State); {error, Error, ProtoState1} -> - stop({shutdown, Error}, State#state{proto_state = ProtoState1}); + stop({shutdown, Error}, State#conn_state{proto_state = ProtoState1}); {stop, ProtoState1} -> - stop(normal, State#state{proto_state = ProtoState1}) + stop(normal, State#conn_state{proto_state = ProtoState1}) end; {error, Error} -> lager:error("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]), @@ -184,26 +184,26 @@ process_received_bytes(Bytes, %%---------------------------------------------------------------------------- network_error(Reason, - State = #state{ conn_name = ConnStr}) -> + State = #conn_state{ conn_name = ConnStr}) -> lager:error("MQTT detected network error '~p' for ~p", [Reason, ConnStr]), %%TODO: where to SEND WILL MSG?? %%send_will_msg(State), % todo: flush channel after publish stop({shutdown, conn_closed}, State). -run_socket(State = #state{ connection_state = blocked }) -> +run_socket(State = #conn_state{ connection_state = blocked }) -> State; -run_socket(State = #state{ await_recv = true }) -> +run_socket(State = #conn_state{ await_recv = true }) -> State; -run_socket(State = #state{ socket = Sock }) -> +run_socket(State = #conn_state{ socket = Sock }) -> async_recv(Sock, 0, infinity), - State#state{ await_recv = true }. + State#conn_state{ await_recv = true }. -control_throttle(State = #state{ connection_state = Flow, +control_throttle(State = #conn_state{ connection_state = Flow, conserve = Conserve }) -> case {Flow, Conserve} of - {running, true} -> State #state{ connection_state = blocked }; - {blocked, false} -> run_socket(State #state{ + {running, true} -> State #conn_state{ connection_state = blocked }; + {blocked, false} -> run_socket(State #conn_state{ connection_state = running }); {_, _} -> run_socket(State) end. diff --git a/apps/emqtt/src/emqtt_cm.erl b/apps/emqtt/src/emqtt_cm.erl index bd5562aba..0182f71e7 100644 --- a/apps/emqtt/src/emqtt_cm.erl +++ b/apps/emqtt/src/emqtt_cm.erl @@ -37,7 +37,7 @@ -export([start_link/0]). --export([lookup/1, create/2, destroy/2]). +-export([lookup/1, register/2, unregister/2]). %% ------------------------------------------------------------------ %% gen_server Function Exports @@ -56,56 +56,65 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +%% +%% @doc lookup client pid with clientId. +%% -spec lookup(ClientId :: binary()) -> pid() | undefined. -lookup(ClientId) -> +lookup(ClientId) when is_binary(ClientId) -> case ets:lookup(emqtt_client, ClientId) of [{_, Pid, _}] -> Pid; [] -> undefined end. --spec create(ClientId :: binary(), Pid :: pid()) -> ok. -create(ClientId, Pid) -> - gen_server:call(?SERVER, {create, ClientId, Pid}). +%% +%% @doc register clientId with pid. +%% +-spec register(ClientId :: binary(), Pid :: pid()) -> ok. +register(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) -> + gen_server:call(?SERVER, {register, ClientId, Pid}). --spec destroy(ClientId :: binary(), Pid :: pid()) -> ok. -destroy(ClientId, Pid) when is_binary(ClientId) -> - gen_server:cast(?SERVER, {destroy, ClientId, Pid}). +%% +%% @doc unregister clientId with pid. +%% +-spec unregister(ClientId :: binary(), Pid :: pid()) -> ok. +unregister(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) -> + gen_server:cast(?SERVER, {unregister, ClientId, Pid}). %% ------------------------------------------------------------------ %% gen_server Function Definitions %% ------------------------------------------------------------------ -init(Args) -> +init([]) -> %on one node ets:new(?TAB, [set, named_table, protected]), - {ok, Args}. + {ok, none}. -handle_call({create, ClientId, Pid}, _From, State) -> +handle_call({register, ClientId, Pid}, _From, State) -> case ets:lookup(?TAB, ClientId) of [{_, Pid, _}] -> - lager:error("client '~s' has been registered with ~p", [ClientId, Pid]), + lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), ignore; [{_, OldPid, MRef}] -> - OldPid ! {stop, duplicate_id}, + OldPid ! {stop, duplicate_id, Pid}, erlang:demonitor(MRef), - ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)}); + insert(ClientId, Pid); [] -> - ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)}) + insert(ClientId, Pid) end, {reply, ok, State}; handle_call(_Request, _From, State) -> {reply, ok, State}. -handle_cast({destroy, ClientId, Pid}, State) when is_binary(ClientId) -> +handle_cast({unregister, ClientId, Pid}, State) -> case ets:lookup(?TAB, ClientId) of [{_, Pid, MRef}] -> erlang:demonitor(MRef), ets:delete(?TAB, ClientId); - [_] -> + [_] -> ignore; [] -> - lager:error("cannot find client '~s' with ~p", [ClientId, Pid]) + lager:error("cannot find clientId '~s' with ~p", [ClientId, Pid]) end, {noreply, State}; @@ -125,4 +134,6 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +insert(ClientId, Pid) -> + ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)}). diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index 9528b8096..f2504ace5 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -106,7 +106,7 @@ handle_request(?CONNECT, lager:info("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]), %%TODO: %%KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout), - emqtt_cm:create(ClientId, self()), + emqtt_cm:register(ClientId, self()), {?CONNACK_ACCEPT, State #proto_state{ will_msg = make_will_msg(Var), client_id = ClientId }} @@ -265,7 +265,7 @@ send_frame(Sock, Frame) -> %%TODO: fix me later... client_terminated(#proto_state{client_id = ClientId} = State) -> ok. - %emqtt_cm:destroy(ClientId, self()). + %emqtt_cm:unregister(ClientId, self()). make_msg(#mqtt_frame{ fixed = #mqtt_frame_fixed{qos = Qos, @@ -336,4 +336,4 @@ maybe_clean_sess(false, _Conn, _ClientId) -> send_will_msg(#proto_state{will_msg = undefined}) -> ignore; send_will_msg(#proto_state{will_msg = WillMsg }) -> - emqtt_pubsub:publish(WillMsg). + emqtt_router:route(WillMsg).