Merge branch 'dev' of github.com:emqtt/emqtt into dev

This commit is contained in:
Ery Lee 2015-01-06 11:51:18 +08:00
commit e7bb275923
3 changed files with 16 additions and 5 deletions

View File

@ -65,6 +65,7 @@ init([Sock]) ->
handle_call({go, Sock}, _From, State = #state{socket = Sock}) ->
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
lager:debug("conn from ~s", [ConnStr]),
{reply, ok,
control_throttle(
#state{ socket = Sock,
@ -73,7 +74,7 @@ handle_call({go, Sock}, _From, State = #state{socket = Sock}) ->
connection_state = running,
conserve = false,
parse_state = emqtt_frame:initial_state(),
proto_state = emqtt_protocol:initial_state()})};
proto_state = emqtt_protocol:initial_state(Sock)})};
handle_call(info, _From, State = #state{conn_name=ConnName, proto_state = ProtoState}) ->
{reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State};
@ -125,6 +126,12 @@ handle_info(Info, State) ->
lager:error("badinfo :~p",[Info]),
{stop, {badinfo, Info}, State}.
terminate(_Reason, #state{proto_state = unefined}) ->
%%TODO: fix keep_alive...
%%emqtt_keep_alive:cancel(KeepAlive),
%emqtt_protocol:client_terminated(ProtoState),
ok;
terminate(_Reason, #state{proto_state = ProtoState}) ->
%%TODO: fix keep_alive...
%%emqtt_keep_alive:cancel(KeepAlive),
@ -156,7 +163,7 @@ process_received_bytes(Bytes,
control_throttle( State #state{ parse_state = ParseState1 }),
hibernate};
{ok, Frame, Rest} ->
case emqtt_protol:handle_frame(Frame, ProtoState) of
case emqtt_protocol:handle_frame(Frame, ProtoState) of
{ok, ProtoState1} ->
process_received_bytes(
Rest,

View File

@ -88,7 +88,8 @@ handle_request(?CONNECT,
proto_ver = ProtoVersion,
clean_sess = CleanSess,
keep_alive = AlivePeriod,
client_id = ClientId } = Var}, State = #proto_state{socket = Sock}) ->
client_id = ClientId } = Var}, State0 = #proto_state{socket = Sock}) ->
State = State0#proto_state{client_id = ClientId},
{ReturnCode, State1} =
case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
valid_client_id(ClientId)} of
@ -263,7 +264,8 @@ send_frame(Sock, Frame) ->
%%TODO: fix me later...
client_terminated(#proto_state{client_id = ClientId} = State) ->
emqtt_cm:destroy(ClientId, self()).
ok.
%emqtt_cm:destroy(ClientId, self()).
make_msg(#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = Qos,

View File

@ -38,5 +38,7 @@ route(Msg) ->
emqtt_pubsub:publish(retained(Msg)).
retained(Msg = #mqtt_msg{retain = true, topic = Topic}) ->
emqtt_retained:insert(Topic, Msg), Msg.
emqtt_retained:insert(Topic, Msg), Msg;
retained(Msg) -> Msg.