This commit is contained in:
Ery Lee 2015-01-13 11:50:10 +08:00
parent 35ff84a8b7
commit cf37428c91
5 changed files with 92 additions and 76 deletions

View File

@ -38,16 +38,16 @@
-include("emqtt.hrl"). -include("emqtt.hrl").
%%Client State... %%Client State...
-record(state, { -record(state, {
socket, socket,
peer_name, peer_name,
conn_name, conn_name,
await_recv, await_recv,
conn_state, conn_state,
conserve, conserve,
parse_state, parse_state,
proto_state, proto_state,
keepalive keepalive
}). }).
start_link(Sock) -> start_link(Sock) ->
@ -98,8 +98,8 @@ handle_info({stop, duplicate_id, NewPid}, State=#state{conn_name=ConnName}) ->
stop({shutdown, duplicate_id}, State); stop({shutdown, duplicate_id}, State);
%%TODO: ok?? %%TODO: ok??
handle_info({dispatch, Message}, #state{proto_state = ProtoState} = State) -> handle_info({dispatch, From, Message}, #state{proto_state = ProtoState} = State) ->
{ok, ProtoState1} = emqtt_protocol:send_message(Message, ProtoState), {ok, ProtoState1} = emqtt_protocol:send_message({From, Message}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}}; {noreply, State#state{proto_state = ProtoState1}};
handle_info({inet_reply, _Ref, ok}, State) -> handle_info({inet_reply, _Ref, ok}, State) ->
@ -185,8 +185,8 @@ process_received_bytes(Bytes,
stop({shutdown, Error}, State); stop({shutdown, Error}, State);
{error, Error, ProtoState1} -> {error, Error, ProtoState1} ->
stop({shutdown, Error}, State#state{proto_state = ProtoState1}); stop({shutdown, Error}, State#state{proto_state = ProtoState1});
{stop, ProtoState1} -> {stop, Reason, ProtoState1} ->
stop(normal, State#state{proto_state = ProtoState1}) stop(Reason, State#state{proto_state = ProtoState1})
end; end;
{error, Error} -> {error, Error} ->
lager:error("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]), lager:error("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]),

View File

@ -57,6 +57,8 @@
-define(PACKET_TYPE(Packet, Type), -define(PACKET_TYPE(Packet, Type),
Packet = #mqtt_packet { header = #mqtt_packet_header { type = Type }}). Packet = #mqtt_packet { header = #mqtt_packet_header { type = Type }}).
-define(PUBACK_PACKET(PacketId), #mqtt_packet_puback { packet_id = PacketId }).
initial_state(Socket, Peername) -> initial_state(Socket, Peername) ->
#proto_state{ #proto_state{
socket = Socket, socket = Socket,
@ -143,13 +145,14 @@ handle_packet(?CONNECT, Packet = #mqtt_packet {
{ok, State1#proto_state { session = Session }}; {ok, State1#proto_state { session = Session }};
handle_packet(?PUBLISH, Packet = #mqtt_packet { handle_packet(?PUBLISH, Packet = #mqtt_packet {
header = #mqtt_packet_header {qos = ?QOS_0}}, State) -> header = #mqtt_packet_header {qos = ?QOS_0}},
emqtt_session:publish(Session, ?QOS_0, make_message(Packet)), State = #proto_state{session = Session}) ->
emqtt_session:publish(Session, {?QOS_0, make_message(Packet)}),
{ok, State}; {ok, State};
handle_packet(?PUBLISH, Packet = #mqtt_packet { handle_packet(?PUBLISH, Packet = #mqtt_packet {
header = #mqtt_packet_header { qos = ?QOS_1 }, header = #mqtt_packet_header { qos = ?QOS_1 },
variable = #mqtt_packet_publish{packet_id = PacketId}}, variable = #mqtt_packet_publish{packet_id = PacketId }},
State = #proto_state { session = Session }) -> State = #proto_state { session = Session }) ->
emqtt_session:publish(Session, {?QOS_1, make_message(Packet)}), emqtt_session:publish(Session, {?QOS_1, make_message(Packet)}),
send_packet( make_packet(?PUBACK, PacketId), State); send_packet( make_packet(?PUBACK, PacketId), State);
@ -157,67 +160,57 @@ handle_packet(?PUBLISH, Packet = #mqtt_packet {
handle_packet(?PUBLISH, Packet = #mqtt_packet { handle_packet(?PUBLISH, Packet = #mqtt_packet {
header = #mqtt_packet_header { qos = ?QOS_2 }, header = #mqtt_packet_header { qos = ?QOS_2 },
variable = #mqtt_packet_publish { packet_id = PacketId } }, variable = #mqtt_packet_publish { packet_id = PacketId } },
State) -> State = #proto_state { session = Session }) ->
%%FIXME: this is not right...should store it first...
NewSession = emqtt_session:publish(Session, {?QOS_2, make_message(Packet)}), NewSession = emqtt_session:publish(Session, {?QOS_2, make_message(Packet)}),
send_packet( make_packet(?PUBREC, PacketId), State#proto_state {session = NewSession} ); send_packet( make_packet(?PUBREC, PacketId), State#proto_state {session = NewSession} );
handle_packet(?PUBACK, #mqtt_packet {}, State) -> handle_packet(Puback, #mqtt_packet{variable = ?PUBACK_PACKET(PacketId) },
%FIXME Later State = #proto_state { session = Session })
{ok, State}; when Puback >= ?PUBACK andalso Puback =< ?PUBCOMP ->
handle_packet(?PUBREC, #mqtt_packet { NewSession = emqtt_session:puback(Session, {Puback, PacketId}),
variable = #mqtt_packet_puback { packet_id = PacketId }}, NewState = State#proto_state {session = NewSession},
State) -> if
%FIXME Later: should release the message here Puback =:= ?PUBREC ->
send_packet( make_packet(?PUBREL, PacketId), State ); send_packet( make_packet(?PUBREL, PacketId), NewState);
Puback =:= ?PUBREL ->
handle_packet(?PUBREL, #mqtt_packet { variable = #mqtt_packet_puback { packet_id = PacketId}}, State) -> send_packet( make_packet(?PUBCOMP, PacketId), NewState);
%%FIXME: not right... true ->
erase({msg, PacketId}), ok
send_packet( make_packet(?PUBCOMP, PacketId), State ); end,
{ok, NewState};
handle_packet(?PUBCOMP, #mqtt_packet {
variable = #mqtt_packet_puback{packet_id = _PacketId}}, State) ->
%TODO: fixme later
{ok, State};
handle_packet(?SUBSCRIBE, #mqtt_packet { handle_packet(?SUBSCRIBE, #mqtt_packet {
variable = #mqtt_packet_subscribe{ variable = #mqtt_packet_subscribe{
packet_id = PacketId, packet_id = PacketId,
topic_table = Topics}, topic_table = TopicTable},
payload = undefined}, payload = undefined},
State) -> State = #proto_state { session = Session } ) ->
%%FIXME: this is not right...
[emqtt_pubsub:subscribe({Name, Qos}, self()) ||
#mqtt_topic{name=Name, qos=Qos} <- Topics],
GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics],
Topics = [{Name, Qos} || #mqtt_topic{name=Name, qos=Qos} <- TopicTable],
{ok, NewSession, GrantedQos} = emqtt_session:subscribe(Session, Topics),
send_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?SUBACK }, send_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?SUBACK },
variable = #mqtt_packet_suback{ variable = #mqtt_packet_suback{
packet_id = PacketId, packet_id = PacketId,
qos_table = GrantedQos }}, State); qos_table = GrantedQos }}, State);
handle_packet(?UNSUBSCRIBE, #mqtt_packet { handle_packet(?UNSUBSCRIBE, #mqtt_packet {
variable = #mqtt_packet_subscribe{ variable = #mqtt_packet_subscribe{
packet_id = PacketId, packet_id = PacketId,
topic_table = Topics }, topic_table = Topics },
payload = undefined}, payload = undefined},
State = #proto_state{client_id = ClientId}) -> State = #proto_state{session = Session}) ->
{ok, NewSession} = emqtt_session:unsubscribe(Session, [Name || #mqtt_topic{ name = Name } <- Topics]),
[emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
send_packet(#mqtt_packet { header = #mqtt_packet_header {type = ?UNSUBACK }, send_packet(#mqtt_packet { header = #mqtt_packet_header {type = ?UNSUBACK },
variable = #mqtt_packet_suback{packet_id = PacketId }}, State); variable = #mqtt_packet_suback{packet_id = PacketId }},
State#proto_state { session = NewSession } );
handle_packet(?PINGREQ, #mqtt_packet{}, State) -> handle_packet(?PINGREQ, #mqtt_packet{}, State) ->
send_packet(make_packet(?PINGRESP), State); send_packet(make_packet(?PINGRESP), State);
handle_packet(?DISCONNECT, #mqtt_packet{}, State=#proto_state{peer_name = PeerName, client_id = ClientId}) -> handle_packet(?DISCONNECT, #mqtt_packet{}, State) ->
{stop, State}. %%how to handle session?
{stop, normal, State}.
make_packet(Type) when Type >= ?CONNECT andalso Type =< ?DISCONNECT -> make_packet(Type) when Type >= ?CONNECT andalso Type =< ?DISCONNECT ->
#mqtt_packet{ header = #mqtt_packet_header { type = Type } }. #mqtt_packet{ header = #mqtt_packet_header { type = Type } }.
@ -232,16 +225,16 @@ puback_qos(?PUBREL) -> ?QOS_1;
puback_qos(?PUBCOMP) -> ?QOS_0. puback_qos(?PUBCOMP) -> ?QOS_0.
-spec send_message(Message, State) -> {ok, NewState} when -spec send_message(Message, State) -> {ok, NewState} when
Message :: mqtt_message(), Message :: {pid(), mqtt_message()},
State :: proto_state(), State :: proto_state(),
NewState :: proto_state(). NewState :: proto_state().
send_message(Message = #mqtt_message{ send_message({From, Message = #mqtt_message{
retain = Retain, retain = Retain,
qos = Qos, qos = Qos,
topic = Topic, topic = Topic,
dup = Dup, dup = Dup,
payload = Payload}, payload = Payload}},
State = #proto_state{packet_id = PacketId}) -> State = #proto_state{packet_id = PacketId}) ->
Packet = #mqtt_packet { Packet = #mqtt_packet {
@ -266,13 +259,13 @@ send_message(Message = #mqtt_message{
{ok, next_packet_id(State)} {ok, next_packet_id(State)}
end. end.
send_packet(Packet, #proto_state{socket = Sock, peer_name = PeerName, client_id = ClientId}) -> send_packet(Packet, State = #proto_state{socket = Sock, peer_name = PeerName, client_id = ClientId}) ->
lager:info("SENT to ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]), lager:info("SENT to ~s@~s: ~s", [ClientId, PeerName, emqtt_packet:dump(Packet)]),
Data = emqtt_packet:serialise(Packet), Data = emqtt_packet:serialise(Packet),
lager:debug("SENT to ~s: ~p", [PeerName, Data]), lager:debug("SENT to ~s: ~p", [PeerName, Data]),
%%FIXME Later... %%FIXME Later...
erlang:port_command(Sock, Data). erlang:port_command(Sock, Data),
{ok, State}; {ok, State}.
%%TODO: fix me later... %%TODO: fix me later...
connection_lost(#proto_state{client_id = ClientId} = State) -> connection_lost(#proto_state{client_id = ClientId} = State) ->

View File

@ -111,7 +111,7 @@ publish(Topic, Msg) when is_binary(Topic) ->
%dispatch locally, should only be called by publish %dispatch locally, should only be called by publish
dispatch(Topic, Msg) when is_binary(Topic) -> dispatch(Topic, Msg) when is_binary(Topic) ->
[SubPid ! {dispatch, Msg} || #topic_subscriber{subpid=SubPid} <- ets:lookup(topic_subscriber, Topic)]. [SubPid ! {dispatch, {self(), Msg}} || #topic_subscriber{subpid=SubPid} <- ets:lookup(topic_subscriber, Topic)].
-spec match(Topic :: binary()) -> [topic()]. -spec match(Topic :: binary()) -> [topic()].
match(Topic) when is_binary(Topic) -> match(Topic) when is_binary(Topic) ->

View File

@ -76,7 +76,7 @@ delete(Topic) ->
gen_server:cast(?MODULE, {delete, Topic}). gen_server:cast(?MODULE, {delete, Topic}).
send(Topic, Client) -> send(Topic, Client) ->
[Client ! {dispatch, Msg} ||{_, Msg} <- lookup(Topic)]. [Client ! {dispatch, {self(), Msg}} ||{_, Msg} <- lookup(Topic)].
init([]) -> init([]) ->
ets:new(retained_msg, [set, protected, named_table]), ets:new(retained_msg, [set, protected, named_table]),

View File

@ -22,6 +22,22 @@
-module(emqtt_session). -module(emqtt_session).
-include("emqtt.hrl").
-include("emqtt_packet.hrl").
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([start/1, resume/1, publish/2, puback/2]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(session_state, { -record(session_state, {
client_id, client_id,
client_pid, client_pid,
@ -31,18 +47,6 @@
awaiting_ack, awaiting_ack,
awaiting_rel }). awaiting_rel }).
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([start/1, resume/1, publish/2]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% API Function Definitions %% API Function Definitions
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
@ -63,7 +67,7 @@ publish(_, {?QOS_0, Message}) ->
%%TODO: %%TODO:
publish(_, {?QOS_1, Message}) -> publish(_, {?QOS_1, Message}) ->
emqtt_router:route(Message), emqtt_router:route(Message);
%%TODO: %%TODO:
publish(Session = #session_state{awaiting_rel = Awaiting}, {?QOS_2, Message}) -> publish(Session = #session_state{awaiting_rel = Awaiting}, {?QOS_2, Message}) ->
@ -72,8 +76,27 @@ publish(Session = #session_state{awaiting_rel = Awaiting}, {?QOS_2, Message}) ->
publish(_, {?QOS_2, Message}) -> publish(_, {?QOS_2, Message}) ->
%TODO: %TODO:
put({msg, PacketId}, pubrec), %put({msg, PacketId}, pubrec),
emqtt_router:route(Message), emqtt_router:route(Message).
puback(_, {?PUBACK, PacketId}) ->
'TODO';
puback(_, {?PUBREC, PacketId}) ->
'TODO';
puback(_, {?PUBREL, PacketId}) ->
%FIXME Later: should release the message here
erase({msg, PacketId}),
'TODO';
puback(_, {?PUBCOMP, PacketId}) ->
'TODO'.
subscribe(Session, Topics) ->
%%TODO.
{ok, Session, [Qos || {_Name, Qos} <- Topics]}.
unsubscribe(Session, Topics) ->
%%TODO.
{ok, Session}.
initial_state(ClientId) -> initial_state(ClientId) ->
#session_state { client_id = ClientId, #session_state { client_id = ClientId,