pubsut test

This commit is contained in:
root 2015-01-06 03:39:31 +00:00
parent fdc69f6ea6
commit 668b39768c
3 changed files with 16 additions and 5 deletions

View File

@ -65,6 +65,7 @@ init([Sock]) ->
handle_call({go, Sock}, _From, State = #state{socket = Sock}) -> handle_call({go, Sock}, _From, State = #state{socket = Sock}) ->
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
lager:debug("conn from ~s", [ConnStr]),
{reply, ok, {reply, ok,
control_throttle( control_throttle(
#state{ socket = Sock, #state{ socket = Sock,
@ -73,7 +74,7 @@ handle_call({go, Sock}, _From, State = #state{socket = Sock}) ->
connection_state = running, connection_state = running,
conserve = false, conserve = false,
parse_state = emqtt_frame:initial_state(), parse_state = emqtt_frame:initial_state(),
proto_state = emqtt_protocol:initial_state()})}; proto_state = emqtt_protocol:initial_state(Sock)})};
handle_call(info, _From, State = #state{conn_name=ConnName, proto_state = ProtoState}) -> handle_call(info, _From, State = #state{conn_name=ConnName, proto_state = ProtoState}) ->
{reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State}; {reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State};
@ -125,6 +126,12 @@ handle_info(Info, State) ->
lager:error("badinfo :~p",[Info]), lager:error("badinfo :~p",[Info]),
{stop, {badinfo, Info}, State}. {stop, {badinfo, Info}, State}.
terminate(_Reason, #state{proto_state = unefined}) ->
%%TODO: fix keep_alive...
%%emqtt_keep_alive:cancel(KeepAlive),
%emqtt_protocol:client_terminated(ProtoState),
ok;
terminate(_Reason, #state{proto_state = ProtoState}) -> terminate(_Reason, #state{proto_state = ProtoState}) ->
%%TODO: fix keep_alive... %%TODO: fix keep_alive...
%%emqtt_keep_alive:cancel(KeepAlive), %%emqtt_keep_alive:cancel(KeepAlive),
@ -156,7 +163,7 @@ process_received_bytes(Bytes,
control_throttle( State #state{ parse_state = ParseState1 }), control_throttle( State #state{ parse_state = ParseState1 }),
hibernate}; hibernate};
{ok, Frame, Rest} -> {ok, Frame, Rest} ->
case emqtt_protol:handle_frame(Frame, ProtoState) of case emqtt_protocol:handle_frame(Frame, ProtoState) of
{ok, ProtoState1} -> {ok, ProtoState1} ->
process_received_bytes( process_received_bytes(
Rest, Rest,

View File

@ -88,7 +88,8 @@ handle_request(?CONNECT,
proto_ver = ProtoVersion, proto_ver = ProtoVersion,
clean_sess = CleanSess, clean_sess = CleanSess,
keep_alive = AlivePeriod, keep_alive = AlivePeriod,
client_id = ClientId } = Var}, State = #proto_state{socket = Sock}) -> client_id = ClientId } = Var}, State0 = #proto_state{socket = Sock}) ->
State = State0#proto_state{client_id = ClientId},
{ReturnCode, State1} = {ReturnCode, State1} =
case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)), case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
valid_client_id(ClientId)} of valid_client_id(ClientId)} of
@ -262,7 +263,8 @@ send_frame(Sock, Frame) ->
%%TODO: fix me later... %%TODO: fix me later...
client_terminated(#proto_state{client_id = ClientId} = State) -> client_terminated(#proto_state{client_id = ClientId} = State) ->
emqtt_cm:destroy(ClientId, self()). ok.
%emqtt_cm:destroy(ClientId, self()).
make_msg(#mqtt_frame{ make_msg(#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = Qos, fixed = #mqtt_frame_fixed{qos = Qos,

View File

@ -38,5 +38,7 @@ route(Msg) ->
emqtt_pubsub:publish(retained(Msg)). emqtt_pubsub:publish(retained(Msg)).
retained(Msg = #mqtt_msg{retain = true, topic = Topic}) -> retained(Msg = #mqtt_msg{retain = true, topic = Topic}) ->
emqtt_retained:insert(Topic, Msg), Msg. emqtt_retained:insert(Topic, Msg), Msg;
retained(Msg) -> Msg.