From 668b39768c4e258a3eb9ecefdd0ccc0bfb17c962 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 6 Jan 2015 03:39:31 +0000 Subject: [PATCH] pubsut test --- apps/emqtt/src/emqtt_client.erl | 11 +++++++++-- apps/emqtt/src/emqtt_protocol.erl | 6 ++++-- apps/emqtt/src/emqtt_router.erl | 4 +++- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index 8ed84a55e..22eb1a728 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -65,6 +65,7 @@ init([Sock]) -> handle_call({go, Sock}, _From, State = #state{socket = Sock}) -> {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), + lager:debug("conn from ~s", [ConnStr]), {reply, ok, control_throttle( #state{ socket = Sock, @@ -73,7 +74,7 @@ handle_call({go, Sock}, _From, State = #state{socket = Sock}) -> connection_state = running, conserve = false, parse_state = emqtt_frame:initial_state(), - proto_state = emqtt_protocol:initial_state()})}; + proto_state = emqtt_protocol:initial_state(Sock)})}; handle_call(info, _From, State = #state{conn_name=ConnName, proto_state = ProtoState}) -> {reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State}; @@ -125,6 +126,12 @@ handle_info(Info, State) -> lager:error("badinfo :~p",[Info]), {stop, {badinfo, Info}, State}. +terminate(_Reason, #state{proto_state = unefined}) -> + %%TODO: fix keep_alive... + %%emqtt_keep_alive:cancel(KeepAlive), + %emqtt_protocol:client_terminated(ProtoState), + ok; + terminate(_Reason, #state{proto_state = ProtoState}) -> %%TODO: fix keep_alive... %%emqtt_keep_alive:cancel(KeepAlive), @@ -156,7 +163,7 @@ process_received_bytes(Bytes, control_throttle( State #state{ parse_state = ParseState1 }), hibernate}; {ok, Frame, Rest} -> - case emqtt_protol:handle_frame(Frame, ProtoState) of + case emqtt_protocol:handle_frame(Frame, ProtoState) of {ok, ProtoState1} -> process_received_bytes( Rest, diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index 34da96b7c..9e004c8bd 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -88,7 +88,8 @@ handle_request(?CONNECT, proto_ver = ProtoVersion, clean_sess = CleanSess, keep_alive = AlivePeriod, - client_id = ClientId } = Var}, State = #proto_state{socket = Sock}) -> + client_id = ClientId } = Var}, State0 = #proto_state{socket = Sock}) -> + State = State0#proto_state{client_id = ClientId}, {ReturnCode, State1} = case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)), valid_client_id(ClientId)} of @@ -262,7 +263,8 @@ send_frame(Sock, Frame) -> %%TODO: fix me later... client_terminated(#proto_state{client_id = ClientId} = State) -> - emqtt_cm:destroy(ClientId, self()). + ok. + %emqtt_cm:destroy(ClientId, self()). make_msg(#mqtt_frame{ fixed = #mqtt_frame_fixed{qos = Qos, diff --git a/apps/emqtt/src/emqtt_router.erl b/apps/emqtt/src/emqtt_router.erl index 215e10522..6b5c48c9b 100644 --- a/apps/emqtt/src/emqtt_router.erl +++ b/apps/emqtt/src/emqtt_router.erl @@ -38,5 +38,7 @@ route(Msg) -> emqtt_pubsub:publish(retained(Msg)). retained(Msg = #mqtt_msg{retain = true, topic = Topic}) -> - emqtt_retained:insert(Topic, Msg), Msg. + emqtt_retained:insert(Topic, Msg), Msg; + +retained(Msg) -> Msg.