diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index ca75dd7f0..a60c19100 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -38,16 +38,16 @@ -include("emqtt.hrl"). %%Client State... --record(state, { - socket, - peer_name, - conn_name, - await_recv, - conn_state, - conserve, - parse_state, - proto_state, - keepalive +-record(state, { + socket, + peer_name, + conn_name, + await_recv, + conn_state, + conserve, + parse_state, + proto_state, + keepalive }). start_link(Sock) -> @@ -98,8 +98,8 @@ handle_info({stop, duplicate_id, NewPid}, State=#state{conn_name=ConnName}) -> stop({shutdown, duplicate_id}, State); %%TODO: ok?? -handle_info({dispatch, Message}, #state{proto_state = ProtoState} = State) -> - {ok, ProtoState1} = emqtt_protocol:send_message(Message, ProtoState), +handle_info({dispatch, From, Message}, #state{proto_state = ProtoState} = State) -> + {ok, ProtoState1} = emqtt_protocol:send_message({From, Message}, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; handle_info({inet_reply, _Ref, ok}, State) -> @@ -185,8 +185,8 @@ process_received_bytes(Bytes, stop({shutdown, Error}, State); {error, Error, ProtoState1} -> stop({shutdown, Error}, State#state{proto_state = ProtoState1}); - {stop, ProtoState1} -> - stop(normal, State#state{proto_state = ProtoState1}) + {stop, Reason, ProtoState1} -> + stop(Reason, State#state{proto_state = ProtoState1}) end; {error, Error} -> lager:error("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]), diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index bd58c8f7e..6900924bb 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -57,6 +57,8 @@ -define(PACKET_TYPE(Packet, Type), Packet = #mqtt_packet { header = #mqtt_packet_header { type = Type }}). +-define(PUBACK_PACKET(PacketId), #mqtt_packet_puback { packet_id = PacketId }). + initial_state(Socket, Peername) -> #proto_state{ socket = Socket, @@ -143,13 +145,14 @@ handle_packet(?CONNECT, Packet = #mqtt_packet { {ok, State1#proto_state { session = Session }}; handle_packet(?PUBLISH, Packet = #mqtt_packet { - header = #mqtt_packet_header {qos = ?QOS_0}}, State) -> - emqtt_session:publish(Session, ?QOS_0, make_message(Packet)), + header = #mqtt_packet_header {qos = ?QOS_0}}, + State = #proto_state{session = Session}) -> + emqtt_session:publish(Session, {?QOS_0, make_message(Packet)}), {ok, State}; handle_packet(?PUBLISH, Packet = #mqtt_packet { header = #mqtt_packet_header { qos = ?QOS_1 }, - variable = #mqtt_packet_publish{packet_id = PacketId}}, + variable = #mqtt_packet_publish{packet_id = PacketId }}, State = #proto_state { session = Session }) -> emqtt_session:publish(Session, {?QOS_1, make_message(Packet)}), send_packet( make_packet(?PUBACK, PacketId), State); @@ -157,67 +160,57 @@ handle_packet(?PUBLISH, Packet = #mqtt_packet { handle_packet(?PUBLISH, Packet = #mqtt_packet { header = #mqtt_packet_header { qos = ?QOS_2 }, variable = #mqtt_packet_publish { packet_id = PacketId } }, - State) -> - %%FIXME: this is not right...should store it first... + State = #proto_state { session = Session }) -> NewSession = emqtt_session:publish(Session, {?QOS_2, make_message(Packet)}), send_packet( make_packet(?PUBREC, PacketId), State#proto_state {session = NewSession} ); -handle_packet(?PUBACK, #mqtt_packet {}, State) -> - %FIXME Later - {ok, State}; +handle_packet(Puback, #mqtt_packet{variable = ?PUBACK_PACKET(PacketId) }, + State = #proto_state { session = Session }) + when Puback >= ?PUBACK andalso Puback =< ?PUBCOMP -> -handle_packet(?PUBREC, #mqtt_packet { - variable = #mqtt_packet_puback { packet_id = PacketId }}, - State) -> - %FIXME Later: should release the message here - send_packet( make_packet(?PUBREL, PacketId), State ); - -handle_packet(?PUBREL, #mqtt_packet { variable = #mqtt_packet_puback { packet_id = PacketId}}, State) -> - %%FIXME: not right... - erase({msg, PacketId}), - send_packet( make_packet(?PUBCOMP, PacketId), State ); - -handle_packet(?PUBCOMP, #mqtt_packet { - variable = #mqtt_packet_puback{packet_id = _PacketId}}, State) -> - %TODO: fixme later - {ok, State}; + NewSession = emqtt_session:puback(Session, {Puback, PacketId}), + NewState = State#proto_state {session = NewSession}, + if + Puback =:= ?PUBREC -> + send_packet( make_packet(?PUBREL, PacketId), NewState); + Puback =:= ?PUBREL -> + send_packet( make_packet(?PUBCOMP, PacketId), NewState); + true -> + ok + end, + {ok, NewState}; handle_packet(?SUBSCRIBE, #mqtt_packet { variable = #mqtt_packet_subscribe{ packet_id = PacketId, - topic_table = Topics}, + topic_table = TopicTable}, payload = undefined}, - State) -> - - %%FIXME: this is not right... - [emqtt_pubsub:subscribe({Name, Qos}, self()) || - #mqtt_topic{name=Name, qos=Qos} <- Topics], - - GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics], + State = #proto_state { session = Session } ) -> + Topics = [{Name, Qos} || #mqtt_topic{name=Name, qos=Qos} <- TopicTable], + {ok, NewSession, GrantedQos} = emqtt_session:subscribe(Session, Topics), send_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?SUBACK }, variable = #mqtt_packet_suback{ packet_id = PacketId, qos_table = GrantedQos }}, State); - handle_packet(?UNSUBSCRIBE, #mqtt_packet { variable = #mqtt_packet_subscribe{ packet_id = PacketId, topic_table = Topics }, payload = undefined}, - State = #proto_state{client_id = ClientId}) -> - - [emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], - + State = #proto_state{session = Session}) -> + {ok, NewSession} = emqtt_session:unsubscribe(Session, [Name || #mqtt_topic{ name = Name } <- Topics]), send_packet(#mqtt_packet { header = #mqtt_packet_header {type = ?UNSUBACK }, - variable = #mqtt_packet_suback{packet_id = PacketId }}, State); + variable = #mqtt_packet_suback{packet_id = PacketId }}, + State#proto_state { session = NewSession } ); handle_packet(?PINGREQ, #mqtt_packet{}, State) -> send_packet(make_packet(?PINGRESP), State); -handle_packet(?DISCONNECT, #mqtt_packet{}, State=#proto_state{peer_name = PeerName, client_id = ClientId}) -> - {stop, State}. +handle_packet(?DISCONNECT, #mqtt_packet{}, State) -> + %%how to handle session? + {stop, normal, State}. make_packet(Type) when Type >= ?CONNECT andalso Type =< ?DISCONNECT -> #mqtt_packet{ header = #mqtt_packet_header { type = Type } }. @@ -232,16 +225,16 @@ puback_qos(?PUBREL) -> ?QOS_1; puback_qos(?PUBCOMP) -> ?QOS_0. -spec send_message(Message, State) -> {ok, NewState} when - Message :: mqtt_message(), + Message :: {pid(), mqtt_message()}, State :: proto_state(), NewState :: proto_state(). -send_message(Message = #mqtt_message{ +send_message({From, Message = #mqtt_message{ retain = Retain, qos = Qos, topic = Topic, dup = Dup, - payload = Payload}, + payload = Payload}}, State = #proto_state{packet_id = PacketId}) -> Packet = #mqtt_packet { @@ -266,13 +259,13 @@ send_message(Message = #mqtt_message{ {ok, next_packet_id(State)} end. -send_packet(Packet, #proto_state{socket = Sock, peer_name = PeerName, client_id = ClientId}) -> +send_packet(Packet, State = #proto_state{socket = Sock, peer_name = PeerName, client_id = ClientId}) -> lager:info("SENT to ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]), Data = emqtt_packet:serialise(Packet), lager:debug("SENT to ~s: ~p", [PeerName, Data]), %%FIXME Later... - erlang:port_command(Sock, Data). - {ok, State}; + erlang:port_command(Sock, Data), + {ok, State}. %%TODO: fix me later... connection_lost(#proto_state{client_id = ClientId} = State) -> diff --git a/apps/emqtt/src/emqtt_pubsub.erl b/apps/emqtt/src/emqtt_pubsub.erl index f687b7527..84f162fdf 100644 --- a/apps/emqtt/src/emqtt_pubsub.erl +++ b/apps/emqtt/src/emqtt_pubsub.erl @@ -111,7 +111,7 @@ publish(Topic, Msg) when is_binary(Topic) -> %dispatch locally, should only be called by publish dispatch(Topic, Msg) when is_binary(Topic) -> - [SubPid ! {dispatch, Msg} || #topic_subscriber{subpid=SubPid} <- ets:lookup(topic_subscriber, Topic)]. + [SubPid ! {dispatch, {self(), Msg}} || #topic_subscriber{subpid=SubPid} <- ets:lookup(topic_subscriber, Topic)]. -spec match(Topic :: binary()) -> [topic()]. match(Topic) when is_binary(Topic) -> diff --git a/apps/emqtt/src/emqtt_retained.erl b/apps/emqtt/src/emqtt_retained.erl index 2db923316..96c605537 100644 --- a/apps/emqtt/src/emqtt_retained.erl +++ b/apps/emqtt/src/emqtt_retained.erl @@ -76,7 +76,7 @@ delete(Topic) -> gen_server:cast(?MODULE, {delete, Topic}). send(Topic, Client) -> - [Client ! {dispatch, Msg} ||{_, Msg} <- lookup(Topic)]. + [Client ! {dispatch, {self(), Msg}} ||{_, Msg} <- lookup(Topic)]. init([]) -> ets:new(retained_msg, [set, protected, named_table]), diff --git a/apps/emqtt/src/emqtt_session.erl b/apps/emqtt/src/emqtt_session.erl index da52036aa..ae6271098 100644 --- a/apps/emqtt/src/emqtt_session.erl +++ b/apps/emqtt/src/emqtt_session.erl @@ -22,6 +22,22 @@ -module(emqtt_session). +-include("emqtt.hrl"). + +-include("emqtt_packet.hrl"). + +%% ------------------------------------------------------------------ +%% API Function Exports +%% ------------------------------------------------------------------ +-export([start/1, resume/1, publish/2, puback/2]). + +%% ------------------------------------------------------------------ +%% gen_server Function Exports +%% ------------------------------------------------------------------ + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + -record(session_state, { client_id, client_pid, @@ -31,18 +47,6 @@ awaiting_ack, awaiting_rel }). -%% ------------------------------------------------------------------ -%% API Function Exports -%% ------------------------------------------------------------------ --export([start/1, resume/1, publish/2]). - -%% ------------------------------------------------------------------ -%% gen_server Function Exports -%% ------------------------------------------------------------------ - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - %% ------------------------------------------------------------------ %% API Function Definitions %% ------------------------------------------------------------------ @@ -63,7 +67,7 @@ publish(_, {?QOS_0, Message}) -> %%TODO: publish(_, {?QOS_1, Message}) -> - emqtt_router:route(Message), + emqtt_router:route(Message); %%TODO: publish(Session = #session_state{awaiting_rel = Awaiting}, {?QOS_2, Message}) -> @@ -72,8 +76,27 @@ publish(Session = #session_state{awaiting_rel = Awaiting}, {?QOS_2, Message}) -> publish(_, {?QOS_2, Message}) -> %TODO: - put({msg, PacketId}, pubrec), - emqtt_router:route(Message), + %put({msg, PacketId}, pubrec), + emqtt_router:route(Message). + +puback(_, {?PUBACK, PacketId}) -> + 'TODO'; +puback(_, {?PUBREC, PacketId}) -> + 'TODO'; +puback(_, {?PUBREL, PacketId}) -> + %FIXME Later: should release the message here + erase({msg, PacketId}), + 'TODO'; +puback(_, {?PUBCOMP, PacketId}) -> + 'TODO'. + +subscribe(Session, Topics) -> + %%TODO. + {ok, Session, [Qos || {_Name, Qos} <- Topics]}. + +unsubscribe(Session, Topics) -> + %%TODO. + {ok, Session}. initial_state(ClientId) -> #session_state { client_id = ClientId,