seperate emqtt_protol from emqtt_client

This commit is contained in:
Feng Lee 2015-01-05 23:23:08 +08:00
parent 484cf8ed79
commit 3e6b17146a
5 changed files with 236 additions and 244 deletions

View File

@ -51,21 +51,6 @@
-define(CONNACK_CREDENTIALS, 4). %% bad user name or password -define(CONNACK_CREDENTIALS, 4). %% bad user name or password
-define(CONNACK_AUTH, 5). %% not authorized -define(CONNACK_AUTH, 5). %% not authorized
-record(state, {socket,
conn_name,
await_recv,
connection_state,
conserve,
parse_state,
message_id,
client_id,
clean_sess,
will_msg,
keep_alive,
awaiting_ack,
subtopics,
awaiting_rel}).
-record(mqtt_frame, {fixed, -record(mqtt_frame, {fixed,
variable, variable,

View File

@ -26,9 +26,7 @@
-behaviour(gen_server). -behaviour(gen_server).
-export([start_link/1, -export([start_link/1, info/1, go/2]).
info/1,
go/2]).
-export([init/1, -export([init/1,
handle_call/3, handle_call/3,
@ -43,25 +41,17 @@
-include("emqtt_frame.hrl"). -include("emqtt_frame.hrl").
%% %%Client State...
%-record(state, {socket, -record(state, {
% conn_name, socket,
% await_recv, conn_name,
% connection_state, await_recv,
% conserve, connection_state,
% parse_state, conserve,
% message_id, parse_state,
% client_id, proto_state,
% clean_sess, keep_alive
% will_msg, }).
% keep_alive,
% awaiting_ack,
% subtopics,
% awaiting_rel}).
-define(FRAME_TYPE(Frame, Type),
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
start_link(Sock) -> start_link(Sock) ->
gen_server:start_link(?MODULE, [Sock], []). gen_server:start_link(?MODULE, [Sock], []).
@ -73,7 +63,7 @@ go(Pid, Sock) ->
gen_server:call(Pid, {go, Sock}). gen_server:call(Pid, {go, Sock}).
init([Sock]) -> init([Sock]) ->
{ok, #state{socket = Sock}, 1000}. {ok, #state{socket = Sock}, hibernate}.
handle_call({go, Sock}, _From, State = #state{socket = Sock}) -> handle_call({go, Sock}, _From, State = #state{socket = Sock}) ->
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
@ -85,21 +75,13 @@ handle_call({go, Sock}, _From, State=#state{socket = Sock}) ->
connection_state = running, connection_state = running,
conserve = false, conserve = false,
parse_state = emqtt_frame:initial_state(), parse_state = emqtt_frame:initial_state(),
message_id = 1, proto_state = emqtt_protocol:initial_state()})};
subtopics = [],
awaiting_ack = gb_trees:empty(),
awaiting_rel = gb_trees:empty()})};
handle_call(info, _From, State = #state{conn_name=ConnName, proto_state = ProtoState}) ->
{reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State};
handle_call(info, _From, #state{conn_name=ConnName, handle_call(Req, _From, State) ->
message_id=MsgId, client_id=ClientId} = State) -> {stop, {badreq, Req}, State}.
Info = [{conn_name, ConnName},
{message_id, MsgId},
{client_id, ClientId}],
{reply, Info, State};
handle_call(_Req, _From, State) ->
{reply, ok, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}. {stop, {badmsg, Msg}, State}.
@ -107,45 +89,15 @@ handle_cast(Msg, State) ->
handle_info(timeout, State) -> handle_info(timeout, State) ->
stop({shutdown, timeout}, State); stop({shutdown, timeout}, State);
handle_info({stop, duplicate_id}, State=#state{conn_name=ConnName, client_id=ClientId}) -> handle_info({stop, duplicate_id}, State=#state{conn_name=ConnName}) ->
?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]), %%TODO:
%?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]),
stop({shutdown, duplicate_id}, State); stop({shutdown, duplicate_id}, State);
handle_info({dispatch, Msg}, #state{socket = Sock, message_id=MsgId} = State) -> %%TODO: ok??
handle_info({dispatch, Msg}, #state{proto_state = ProtoState} = State) ->
#mqtt_msg{retain = Retain, {ok, ProtoState1} = emqtt_protocol:send_message(Msg, ProtoState),
qos = Qos, {noreply, State#state{proto_state = ProtoState1}};
topic = Topic,
dup = Dup,
payload = Payload,
encoder = Encoder} = Msg,
Payload1 =
if
Encoder == undefined -> Payload;
true -> Encoder(Payload)
end,
Frame = #mqtt_frame{
fixed = #mqtt_frame_fixed{type = ?PUBLISH,
qos = Qos,
retain = Retain,
dup = Dup},
variable = #mqtt_frame_publish{topic_name = Topic,
message_id = if
Qos == ?QOS_0 -> undefined;
true -> MsgId
end},
payload = Payload1},
send_frame(Sock, Frame),
if
Qos == ?QOS_0 ->
{noreply, State};
true ->
{noreply, next_msg_id(State)}
end;
handle_info({inet_reply, _Ref, ok}, State) -> handle_info({inet_reply, _Ref, ok}, State) ->
{noreply, State, hibernate}; {noreply, State, hibernate};
@ -157,6 +109,7 @@ handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ socket = Sock}=State)
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
network_error(Reason, State); network_error(Reason, State);
%%TODO: HOW TO HANDLE THIS??
handle_info({inet_reply, _Sock, {error, Reason}}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
{noreply, State}; {noreply, State};
@ -174,9 +127,10 @@ handle_info(Info, State) ->
?ERROR("badinfo :~p",[Info]), ?ERROR("badinfo :~p",[Info]),
{stop, {badinfo, Info}, State}. {stop, {badinfo, Info}, State}.
terminate(_Reason, #state{client_id = ClientId, keep_alive=KeepAlive}) -> terminate(_Reason, #state{proto_state = ProtoState}) ->
emqtt_keep_alive:cancel(KeepAlive), %%TODO: fix keep_alive...
emqtt_cm:destroy(ClientId, self()), %%emqtt_keep_alive:cancel(KeepAlive),
emqtt_protocol:client_terminated(ProtoState),
ok. ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
@ -192,10 +146,11 @@ async_recv(Sock, Length, Timeout) when is_port(Sock) ->
% receive and parse tcp data % receive and parse tcp data
%------------------------------------------------------- %-------------------------------------------------------
process_received_bytes(<<>>, State) -> process_received_bytes(<<>>, State) ->
{noreply, State}; {noreply, State, hibernate};
process_received_bytes(Bytes, process_received_bytes(Bytes,
State = #state{ parse_state = ParseState, State = #state{ parse_state = ParseState,
proto_state = ProtoState,
conn_name = ConnStr }) -> conn_name = ConnStr }) ->
case emqtt_frame:parse(Bytes, ParseState) of case emqtt_frame:parse(Bytes, ParseState) of
{more, ParseState1} -> {more, ParseState1} ->
@ -203,72 +158,31 @@ process_received_bytes(Bytes,
control_throttle( State #state{ parse_state = ParseState1 }), control_throttle( State #state{ parse_state = ParseState1 }),
hibernate}; hibernate};
{ok, Frame, Rest} -> {ok, Frame, Rest} ->
case process_frame(Frame, State) of case emqtt_protol:handle_frame(Frame, ProtoState) of
{ok, State1} -> {ok, ProtoState1} ->
PS = emqtt_frame:initial_state(),
process_received_bytes( process_received_bytes(
Rest, Rest,
State1 #state{ parse_state = PS}); State#state{ parse_state = emqtt_frame:initial_state(),
{err, Reason, State1} -> proto_state = ProtoState1 });
?ERROR("MQTT protocol error ~p for connection ~p~n", [Reason, ConnStr]), {error, Error} ->
stop({shutdown, Reason}, State1); ?ERROR("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
{stop, State1} -> stop({shutdown, Error}, State);
stop(normal, State1) {error, Error, ProtoState1} ->
stop({shutdown, Error}, State#state{proto_state = ProtoState1});
{stop, ProtoState1} ->
stop(normal, State#state{proto_state = ProtoState1})
end; end;
{error, Error} -> {error, Error} ->
?ERROR("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]), ?ERROR("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]),
stop({shutdown, Error}, State) stop({shutdown, Error}, State)
end. end.
process_frame(Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}},
State=#state{client_id=ClientId, keep_alive=KeepAlive}) ->
KeepAlive1 = emqtt_keep_alive:activate(KeepAlive),
case validate_frame(Type, Frame) of
ok ->
?INFO("frame from ~s: ~p", [ClientId, Frame]),
handle_retained(Type, Frame),
emqtt_protocol:process_request(Type, Frame, State#state{keep_alive=KeepAlive1});
{error, Reason} ->
{err, Reason, State}
end.
next_msg_id(State = #state{ message_id = 16#ffff }) ->
State #state{ message_id = 1 };
next_msg_id(State = #state{ message_id = MsgId }) ->
State #state{ message_id = MsgId + 1 }.
maybe_clean_sess(false, _Conn, _ClientId) ->
% todo: establish subscription to deliver old unacknowledged messages
ok.
%%----------------------------------------------------------------------------
make_will_msg(#mqtt_frame_connect{ will_flag = false }) ->
undefined;
make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
will_qos = Qos,
will_topic = Topic,
will_msg = Msg }) ->
#mqtt_msg{retain = Retain,
qos = Qos,
topic = Topic,
dup = false,
payload = Msg }.
send_will_msg(#state{will_msg = undefined}) ->
ignore;
send_will_msg(#state{will_msg = WillMsg }) ->
emqtt_pubsub:publish(WillMsg).
send_frame(Sock, Frame) ->
?INFO("send frame:~p", [Frame]),
erlang:port_command(Sock, emqtt_frame:serialise(Frame)).
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
network_error(Reason, network_error(Reason,
State = #state{ conn_name = ConnStr}) -> State = #state{ conn_name = ConnStr}) ->
?ERROR("MQTT detected network error '~p' for ~p", [Reason, ConnStr]), ?ERROR("MQTT detected network error '~p' for ~p", [Reason, ConnStr]),
send_will_msg(State), %%TODO: where to SEND WILL MSG??
%%send_will_msg(State),
% todo: flush channel after publish % todo: flush channel after publish
stop({shutdown, conn_closed}, State). stop({shutdown, conn_closed}, State).
@ -292,63 +206,3 @@ control_throttle(State = #state{ connection_state = Flow,
stop(Reason, State ) -> stop(Reason, State ) ->
{stop, Reason, State}. {stop, Reason, State}.
valid_client_id(ClientId) ->
ClientIdLen = size(ClientId),
1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN.
handle_retained(?PUBLISH, #mqtt_frame{fixed = #mqtt_frame_fixed{retain = false}}) ->
ignore;
handle_retained(?PUBLISH, #mqtt_frame{
fixed = #mqtt_frame_fixed{retain = true},
variable = #mqtt_frame_publish{topic_name = Topic},
payload= <<>> }) ->
emqtt_retained:delete(Topic);
handle_retained(?PUBLISH, Frame=#mqtt_frame{
fixed = #mqtt_frame_fixed{retain = true},
variable = #mqtt_frame_publish{topic_name = Topic}}) ->
emqtt_retained:insert(Topic, make_msg(Frame));
handle_retained(_, _) ->
ignore.
validate_frame(?PUBLISH, #mqtt_frame{variable = #mqtt_frame_publish{topic_name = Topic}}) ->
case emqtt_topic:validate({publish, Topic}) of
true -> ok;
false -> {error, badtopic}
end;
validate_frame(?UNSUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) ->
ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics,
not emqtt_topic:validate({subscribe, Topic})],
case ErrTopics of
[] -> ok;
_ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic}
end;
validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) ->
ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics,
not (emqtt_topic:validate({subscribe, Topic}) and (Qos < 3))],
case ErrTopics of
[] -> ok;
_ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic}
end;
validate_frame(_Type, _Frame) ->
ok.
make_msg(#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = Qos,
retain = Retain,
dup = Dup},
variable = #mqtt_frame_publish{topic_name = Topic,
message_id = MessageId},
payload = Payload}) ->
#mqtt_msg{retain = Retain,
qos = Qos,
topic = Topic,
dup = Dup,
msgid = MessageId,
payload = Payload}.

View File

@ -25,6 +25,8 @@
-author('feng@slimchat.io'). -author('feng@slimchat.io').
-include("emqtt_log.hrl").
-behaviour(gen_server). -behaviour(gen_server).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
@ -69,7 +71,7 @@ create(ClientId, Pid) ->
-spec destroy(ClientId :: binary(), Pid :: pid()) -> ok. -spec destroy(ClientId :: binary(), Pid :: pid()) -> ok.
destroy(ClientId, Pid) when is_binary(ClientId) -> destroy(ClientId, Pid) when is_binary(ClientId) ->
gen_server:cast(?SERVER, {destroy, ClientId, Pid}); gen_server:cast(?SERVER, {destroy, ClientId, Pid}).
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% gen_server Function Definitions %% gen_server Function Definitions
@ -91,7 +93,7 @@ handle_call({create, ClientId, Pid}, _From, State) ->
ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)}); ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)});
[] -> [] ->
ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)}) ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)})
end. end,
{reply, ok, State}; {reply, ok, State};
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
@ -106,7 +108,7 @@ handle_cast({destroy, ClientId, Pid}, State) when is_binary(ClientId) ->
ignore; ignore;
[] -> [] ->
?ERROR("cannot find client '~s' with ~p", [ClientId, Pid]) ?ERROR("cannot find client '~s' with ~p", [ClientId, Pid])
end end,
{noreply, State}; {noreply, State};
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
@ -114,7 +116,7 @@ handle_cast(_Msg, State) ->
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
ets:match_delete(emqtt_client, {{'_', DownPid, MRef}}), ets:match_delete(emqtt_client, {{'_', DownPid, MRef}}),
{noreply, State}. {noreply, State};
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.

View File

@ -28,16 +28,69 @@
-include("emqtt_frame.hrl"). -include("emqtt_frame.hrl").
-export([process_request/3]). -record(proto_state, {
socket,
message_id,
client_id,
clean_sess,
will_msg,
awaiting_ack,
subtopics,
awaiting_rel
}).
process_request(?CONNECT, -type proto_state() :: #proto_state{}.
-export([initial_state/1, handle_frame/2, send_message/2, client_terminated/1]).
-export([info/1]).
-define(FRAME_TYPE(Frame, Type),
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
initial_state(Socket) ->
#proto_state{
socket = Socket,
message_id = 1,
awaiting_ack = gb_trees:empty(),
subtopics = [],
awaiting_rel = gb_trees:empty()
}.
info(#proto_state{ message_id = MsgId,
client_id = ClientId,
clean_sess = CleanSess,
will_msg = WillMsg,
subtopics = SubTopics}) ->
[ {message_id, MsgId},
{client_id, ClientId},
{clean_sess, CleanSess},
{will_msg, WillMsg},
{subtopics, SubTopics} ].
-spec handle_frame(Frame, State) -> {ok, NewState} | {error, any()} when
Frame :: #mqtt_frame{},
State :: proto_state(),
NewState :: proto_state().
handle_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
State = #proto_state{client_id = ClientId}) ->
?INFO("frame from ~s: ~p", [ClientId, Frame]),
case validate_frame(Type, Frame) of
ok ->
handle_request(Type, Frame, State);
{error, Reason} ->
{error, Reason, State}
end.
handle_request(?CONNECT,
#mqtt_frame{ variable = #mqtt_frame_connect{ #mqtt_frame{ variable = #mqtt_frame_connect{
username = Username, username = Username,
password = Password, password = Password,
proto_ver = ProtoVersion, proto_ver = ProtoVersion,
clean_sess = CleanSess, clean_sess = CleanSess,
keep_alive = AlivePeriod, keep_alive = AlivePeriod,
client_id = ClientId } = Var}, #state{socket = Sock} = State) -> client_id = ClientId } = Var}, State = #proto_state{socket = Sock}) ->
{ReturnCode, State1} = {ReturnCode, State1} =
case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)), case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
valid_client_id(ClientId)} of valid_client_id(ClientId)} of
@ -52,12 +105,12 @@ process_request(?CONNECT,
{?CONNACK_CREDENTIALS, State}; {?CONNACK_CREDENTIALS, State};
true -> true ->
?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]), ?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]),
%%TODO:
%%KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
emqtt_cm:create(ClientId, self()), emqtt_cm:create(ClientId, self()),
KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
{?CONNACK_ACCEPT, {?CONNACK_ACCEPT,
State #state{ will_msg = make_will_msg(Var), State #proto_state{ will_msg = make_will_msg(Var),
client_id = ClientId, client_id = ClientId }}
keep_alive = KeepAlive}}
end end
end, end,
?INFO("recv conn...:~p", [ReturnCode]), ?INFO("recv conn...:~p", [ReturnCode]),
@ -66,26 +119,26 @@ process_request(?CONNECT,
return_code = ReturnCode }}), return_code = ReturnCode }}),
{ok, State1}; {ok, State1};
process_request(?PUBLISH, Frame=#mqtt_frame{ handle_request(?PUBLISH, Frame=#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) -> fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) ->
emqtt_pubsub:publish(make_msg(Frame)), emqtt_router:route(make_msg(Frame)),
{ok, State}; {ok, State};
process_request(?PUBLISH, handle_request(?PUBLISH,
Frame=#mqtt_frame{ Frame=#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = ?QOS_1}, fixed = #mqtt_frame_fixed{qos = ?QOS_1},
variable = #mqtt_frame_publish{message_id = MsgId}}, variable = #mqtt_frame_publish{message_id = MsgId}},
State=#state{socket=Sock}) -> State=#proto_state{socket=Sock}) ->
emqtt_pubsub:publish(make_msg(Frame)), emqtt_pubsub:publish(make_msg(Frame)),
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK }, send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK },
variable = #mqtt_frame_publish{ message_id = MsgId}}), variable = #mqtt_frame_publish{ message_id = MsgId}}),
{ok, State}; {ok, State};
process_request(?PUBLISH, handle_request(?PUBLISH,
Frame=#mqtt_frame{ Frame=#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = ?QOS_2}, fixed = #mqtt_frame_fixed{qos = ?QOS_2},
variable = #mqtt_frame_publish{message_id = MsgId}}, variable = #mqtt_frame_publish{message_id = MsgId}},
State=#state{socket=Sock}) -> State=#proto_state{socket=Sock}) ->
emqtt_pubsub:publish(make_msg(Frame)), emqtt_pubsub:publish(make_msg(Frame)),
put({msg, MsgId}, pubrec), put({msg, MsgId}, pubrec),
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC}, send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC},
@ -93,40 +146,40 @@ process_request(?PUBLISH,
{ok, State}; {ok, State};
process_request(?PUBACK, #mqtt_frame{}, State) -> handle_request(?PUBACK, #mqtt_frame{}, State) ->
%TODO: fixme later %TODO: fixme later
{ok, State}; {ok, State};
process_request(?PUBREC, #mqtt_frame{ handle_request(?PUBREC, #mqtt_frame{
variable = #mqtt_frame_publish{message_id = MsgId}}, variable = #mqtt_frame_publish{message_id = MsgId}},
State=#state{socket=Sock}) -> State=#proto_state{socket=Sock}) ->
%TODO: fixme later %TODO: fixme later
send_frame(Sock, send_frame(Sock,
#mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBREL}, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBREL},
variable = #mqtt_frame_publish{ message_id = MsgId}}), variable = #mqtt_frame_publish{ message_id = MsgId}}),
{ok, State}; {ok, State};
process_request(?PUBREL, handle_request(?PUBREL,
#mqtt_frame{ #mqtt_frame{
variable = #mqtt_frame_publish{message_id = MsgId}}, variable = #mqtt_frame_publish{message_id = MsgId}},
State=#state{socket=Sock}) -> State=#proto_state{socket=Sock}) ->
erase({msg, MsgId}), erase({msg, MsgId}),
send_frame(Sock, send_frame(Sock,
#mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBCOMP}, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBCOMP},
variable = #mqtt_frame_publish{ message_id = MsgId}}), variable = #mqtt_frame_publish{ message_id = MsgId}}),
{ok, State}; {ok, State};
process_request(?PUBCOMP, #mqtt_frame{ handle_request(?PUBCOMP, #mqtt_frame{
variable = #mqtt_frame_publish{message_id = _MsgId}}, State) -> variable = #mqtt_frame_publish{message_id = _MsgId}}, State) ->
%TODO: fixme later %TODO: fixme later
{ok, State}; {ok, State};
process_request(?SUBSCRIBE, handle_request(?SUBSCRIBE,
#mqtt_frame{ #mqtt_frame{
variable = #mqtt_frame_subscribe{message_id = MessageId, variable = #mqtt_frame_subscribe{message_id = MessageId,
topic_table = Topics}, topic_table = Topics},
payload = undefined}, payload = undefined},
#state{socket=Sock} = State) -> #proto_state{socket=Sock} = State) ->
[emqtt_pubsub:subscribe({Name, Qos}, self()) || [emqtt_pubsub:subscribe({Name, Qos}, self()) ||
#mqtt_topic{name=Name, qos=Qos} <- Topics], #mqtt_topic{name=Name, qos=Qos} <- Topics],
@ -140,11 +193,11 @@ process_request(?SUBSCRIBE,
{ok, State}; {ok, State};
process_request(?UNSUBSCRIBE, handle_request(?UNSUBSCRIBE,
#mqtt_frame{ #mqtt_frame{
variable = #mqtt_frame_subscribe{message_id = MessageId, variable = #mqtt_frame_subscribe{message_id = MessageId,
topic_table = Topics }, topic_table = Topics },
payload = undefined}, #state{socket = Sock, client_id = ClientId} = State) -> payload = undefined}, #proto_state{socket = Sock, client_id = ClientId} = State) ->
[emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], [emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
@ -154,26 +207,64 @@ process_request(?UNSUBSCRIBE,
{ok, State}; {ok, State};
process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) -> %, keep_alive=KeepAlive
handle_request(?PINGREQ, #mqtt_frame{}, #proto_state{socket=Sock}=State) ->
%Keep alive timer %Keep alive timer
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), %%TODO:...
%%KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}),
{ok, State#state{keep_alive=KeepAlive1}}; {ok, State};
process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) -> handle_request(?DISCONNECT, #mqtt_frame{}, State=#proto_state{client_id=ClientId}) ->
?INFO("~s disconnected", [ClientId]), ?INFO("~s disconnected", [ClientId]),
{stop, State}. {stop, State}.
-spec send_message(Message, State) -> {ok, NewState} when
Message :: mqtt_msg(),
State :: proto_state(),
NewState :: proto_state().
send_message(Message, State = #proto_state{socket = Sock, message_id = MsgId}) ->
#mqtt_msg{retain = Retain,
qos = Qos,
topic = Topic,
dup = Dup,
payload = Payload,
encoder = Encoder} = Message,
Payload1 =
if
Encoder == undefined -> Payload;
true -> Encoder(Payload)
end,
Frame = #mqtt_frame{
fixed = #mqtt_frame_fixed{type = ?PUBLISH,
qos = Qos,
retain = Retain,
dup = Dup},
variable = #mqtt_frame_publish{topic_name = Topic,
message_id = if
Qos == ?QOS_0 -> undefined;
true -> MsgId
end},
payload = Payload1},
send_frame(Sock, Frame),
if
Qos == ?QOS_0 ->
{ok, State};
true ->
{ok, next_msg_id(State)}
end.
send_frame(Sock, Frame) -> send_frame(Sock, Frame) ->
?INFO("send frame:~p", [Frame]), ?INFO("send frame:~p", [Frame]),
erlang:port_command(Sock, emqtt_frame:serialise(Frame)). erlang:port_command(Sock, emqtt_frame:serialise(Frame)).
valid_client_id(ClientId) -> %%TODO: fix me later...
ClientIdLen = size(ClientId), client_terminated(#proto_state{client_id = ClientId} = State) ->
1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN. emqtt_cm:destroy(ClientId, self()).
validate_frame(_Type, _Frame) ->
ok.
make_msg(#mqtt_frame{ make_msg(#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = Qos, fixed = #mqtt_frame_fixed{qos = Qos,
@ -200,3 +291,48 @@ make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
topic = Topic, topic = Topic,
dup = false, dup = false,
payload = Msg }. payload = Msg }.
next_msg_id(State = #proto_state{ message_id = 16#ffff }) ->
State #proto_state{ message_id = 1 };
next_msg_id(State = #proto_state{ message_id = MsgId }) ->
State #proto_state{ message_id = MsgId + 1 }.
valid_client_id(ClientId) ->
ClientIdLen = size(ClientId),
1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN.
validate_frame(?PUBLISH, #mqtt_frame{variable = #mqtt_frame_publish{topic_name = Topic}}) ->
case emqtt_topic:validate({publish, Topic}) of
true -> ok;
false -> {error, badtopic}
end;
validate_frame(?UNSUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) ->
ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics,
not emqtt_topic:validate({subscribe, Topic})],
case ErrTopics of
[] -> ok;
_ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic}
end;
validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) ->
ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics,
not (emqtt_topic:validate({subscribe, Topic}) and (Qos < 3))],
case ErrTopics of
[] -> ok;
_ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic}
end;
validate_frame(_Type, _Frame) ->
ok.
maybe_clean_sess(false, _Conn, _ClientId) ->
% todo: establish subscription to deliver old unacknowledged messages
ok.
%%----------------------------------------------------------------------------
send_will_msg(#proto_state{will_msg = undefined}) ->
ignore;
send_will_msg(#proto_state{will_msg = WillMsg }) ->
emqtt_pubsub:publish(WillMsg).

View File

@ -20,8 +20,23 @@
%% SOFTWARE. %% SOFTWARE.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%%route chain... statistics
-module(emqtt_router). -module(emqtt_router).
-include("emqtt.hrl").
-include("emqtt_frame.hrl").
-export([route/1]).
%%Router Chain--> %%Router Chain-->
%%--->In %%--->In
%%Out<--- %%Out<---
-spec route(Msg :: mqtt_msg()) -> any().
route(Msg) ->
emqtt_pubsub:publish(retained(Msg)).
retained(Msg = #mqtt_msg{retain = true, topic = Topic}) ->
emqtt_retained:insert(Topic, Msg), Msg.