From 3e6b17146a770108db231af6779f817329a78155 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 5 Jan 2015 23:23:08 +0800 Subject: [PATCH] seperate emqtt_protol from emqtt_client --- apps/emqtt/include/emqtt_frame.hrl | 15 -- apps/emqtt/src/emqtt_client.erl | 238 ++++++----------------------- apps/emqtt/src/emqtt_cm.erl | 10 +- apps/emqtt/src/emqtt_protocol.erl | 202 ++++++++++++++++++++---- apps/emqtt/src/emqtt_router.erl | 15 ++ 5 files changed, 236 insertions(+), 244 deletions(-) diff --git a/apps/emqtt/include/emqtt_frame.hrl b/apps/emqtt/include/emqtt_frame.hrl index 5ea69a335..e878681cd 100644 --- a/apps/emqtt/include/emqtt_frame.hrl +++ b/apps/emqtt/include/emqtt_frame.hrl @@ -51,21 +51,6 @@ -define(CONNACK_CREDENTIALS, 4). %% bad user name or password -define(CONNACK_AUTH, 5). %% not authorized --record(state, {socket, - conn_name, - await_recv, - connection_state, - conserve, - parse_state, - message_id, - client_id, - clean_sess, - will_msg, - keep_alive, - awaiting_ack, - subtopics, - awaiting_rel}). - -record(mqtt_frame, {fixed, variable, diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index b0a47d74d..cd3132774 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -26,9 +26,7 @@ -behaviour(gen_server). --export([start_link/1, - info/1, - go/2]). +-export([start_link/1, info/1, go/2]). -export([init/1, handle_call/3, @@ -43,25 +41,17 @@ -include("emqtt_frame.hrl"). -%% -%-record(state, {socket, -% conn_name, -% await_recv, -% connection_state, -% conserve, -% parse_state, -% message_id, -% client_id, -% clean_sess, -% will_msg, -% keep_alive, -% awaiting_ack, -% subtopics, -% awaiting_rel}). - - --define(FRAME_TYPE(Frame, Type), - Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}). +%%Client State... +-record(state, { + socket, + conn_name, + await_recv, + connection_state, + conserve, + parse_state, + proto_state, + keep_alive +}). start_link(Sock) -> gen_server:start_link(?MODULE, [Sock], []). @@ -73,9 +63,9 @@ go(Pid, Sock) -> gen_server:call(Pid, {go, Sock}). init([Sock]) -> - {ok, #state{socket = Sock}, 1000}. + {ok, #state{socket = Sock}, hibernate}. -handle_call({go, Sock}, _From, State=#state{socket = Sock}) -> +handle_call({go, Sock}, _From, State = #state{socket = Sock}) -> {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), {reply, ok, control_throttle( @@ -85,21 +75,13 @@ handle_call({go, Sock}, _From, State=#state{socket = Sock}) -> connection_state = running, conserve = false, parse_state = emqtt_frame:initial_state(), - message_id = 1, - subtopics = [], - awaiting_ack = gb_trees:empty(), - awaiting_rel = gb_trees:empty()})}; + proto_state = emqtt_protocol:initial_state()})}; +handle_call(info, _From, State = #state{conn_name=ConnName, proto_state = ProtoState}) -> + {reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State}; -handle_call(info, _From, #state{conn_name=ConnName, - message_id=MsgId, client_id=ClientId} = State) -> - Info = [{conn_name, ConnName}, - {message_id, MsgId}, - {client_id, ClientId}], - {reply, Info, State}; - -handle_call(_Req, _From, State) -> - {reply, ok, State}. +handle_call(Req, _From, State) -> + {stop, {badreq, Req}, State}. handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. @@ -107,45 +89,15 @@ handle_cast(Msg, State) -> handle_info(timeout, State) -> stop({shutdown, timeout}, State); -handle_info({stop, duplicate_id}, State=#state{conn_name=ConnName, client_id=ClientId}) -> - ?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]), +handle_info({stop, duplicate_id}, State=#state{conn_name=ConnName}) -> + %%TODO: + %?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]), stop({shutdown, duplicate_id}, State); -handle_info({dispatch, Msg}, #state{socket = Sock, message_id=MsgId} = State) -> - - #mqtt_msg{retain = Retain, - qos = Qos, - topic = Topic, - dup = Dup, - payload = Payload, - encoder = Encoder} = Msg, - - Payload1 = - if - Encoder == undefined -> Payload; - true -> Encoder(Payload) - end, - - Frame = #mqtt_frame{ - fixed = #mqtt_frame_fixed{type = ?PUBLISH, - qos = Qos, - retain = Retain, - dup = Dup}, - variable = #mqtt_frame_publish{topic_name = Topic, - message_id = if - Qos == ?QOS_0 -> undefined; - true -> MsgId - end}, - payload = Payload1}, - - send_frame(Sock, Frame), - - if - Qos == ?QOS_0 -> - {noreply, State}; - true -> - {noreply, next_msg_id(State)} - end; +%%TODO: ok?? +handle_info({dispatch, Msg}, #state{proto_state = ProtoState} = State) -> + {ok, ProtoState1} = emqtt_protocol:send_message(Msg, ProtoState), + {noreply, State#state{proto_state = ProtoState1}}; handle_info({inet_reply, _Ref, ok}, State) -> {noreply, State, hibernate}; @@ -157,6 +109,7 @@ handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ socket = Sock}=State) handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> network_error(Reason, State); +%%TODO: HOW TO HANDLE THIS?? handle_info({inet_reply, _Sock, {error, Reason}}, State) -> {noreply, State}; @@ -174,9 +127,10 @@ handle_info(Info, State) -> ?ERROR("badinfo :~p",[Info]), {stop, {badinfo, Info}, State}. -terminate(_Reason, #state{client_id = ClientId, keep_alive=KeepAlive}) -> - emqtt_keep_alive:cancel(KeepAlive), - emqtt_cm:destroy(ClientId, self()), +terminate(_Reason, #state{proto_state = ProtoState}) -> + %%TODO: fix keep_alive... + %%emqtt_keep_alive:cancel(KeepAlive), + emqtt_protocol:client_terminated(ProtoState), ok. code_change(_OldVsn, State, _Extra) -> @@ -192,10 +146,11 @@ async_recv(Sock, Length, Timeout) when is_port(Sock) -> % receive and parse tcp data %------------------------------------------------------- process_received_bytes(<<>>, State) -> - {noreply, State}; + {noreply, State, hibernate}; process_received_bytes(Bytes, State = #state{ parse_state = ParseState, + proto_state = ProtoState, conn_name = ConnStr }) -> case emqtt_frame:parse(Bytes, ParseState) of {more, ParseState1} -> @@ -203,72 +158,31 @@ process_received_bytes(Bytes, control_throttle( State #state{ parse_state = ParseState1 }), hibernate}; {ok, Frame, Rest} -> - case process_frame(Frame, State) of - {ok, State1} -> - PS = emqtt_frame:initial_state(), + case emqtt_protol:handle_frame(Frame, ProtoState) of + {ok, ProtoState1} -> process_received_bytes( Rest, - State1 #state{ parse_state = PS}); - {err, Reason, State1} -> - ?ERROR("MQTT protocol error ~p for connection ~p~n", [Reason, ConnStr]), - stop({shutdown, Reason}, State1); - {stop, State1} -> - stop(normal, State1) + State#state{ parse_state = emqtt_frame:initial_state(), + proto_state = ProtoState1 }); + {error, Error} -> + ?ERROR("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]), + stop({shutdown, Error}, State); + {error, Error, ProtoState1} -> + stop({shutdown, Error}, State#state{proto_state = ProtoState1}); + {stop, ProtoState1} -> + stop(normal, State#state{proto_state = ProtoState1}) end; {error, Error} -> ?ERROR("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]), stop({shutdown, Error}, State) end. -process_frame(Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}}, - State=#state{client_id=ClientId, keep_alive=KeepAlive}) -> - KeepAlive1 = emqtt_keep_alive:activate(KeepAlive), - case validate_frame(Type, Frame) of - ok -> - ?INFO("frame from ~s: ~p", [ClientId, Frame]), - handle_retained(Type, Frame), - emqtt_protocol:process_request(Type, Frame, State#state{keep_alive=KeepAlive1}); - {error, Reason} -> - {err, Reason, State} - end. - -next_msg_id(State = #state{ message_id = 16#ffff }) -> - State #state{ message_id = 1 }; -next_msg_id(State = #state{ message_id = MsgId }) -> - State #state{ message_id = MsgId + 1 }. - -maybe_clean_sess(false, _Conn, _ClientId) -> - % todo: establish subscription to deliver old unacknowledged messages - ok. - -%%---------------------------------------------------------------------------- - -make_will_msg(#mqtt_frame_connect{ will_flag = false }) -> - undefined; -make_will_msg(#mqtt_frame_connect{ will_retain = Retain, - will_qos = Qos, - will_topic = Topic, - will_msg = Msg }) -> - #mqtt_msg{retain = Retain, - qos = Qos, - topic = Topic, - dup = false, - payload = Msg }. - -send_will_msg(#state{will_msg = undefined}) -> - ignore; -send_will_msg(#state{will_msg = WillMsg }) -> - emqtt_pubsub:publish(WillMsg). - -send_frame(Sock, Frame) -> - ?INFO("send frame:~p", [Frame]), - erlang:port_command(Sock, emqtt_frame:serialise(Frame)). - %%---------------------------------------------------------------------------- network_error(Reason, State = #state{ conn_name = ConnStr}) -> ?ERROR("MQTT detected network error '~p' for ~p", [Reason, ConnStr]), - send_will_msg(State), + %%TODO: where to SEND WILL MSG?? + %%send_will_msg(State), % todo: flush channel after publish stop({shutdown, conn_closed}, State). @@ -292,63 +206,3 @@ control_throttle(State = #state{ connection_state = Flow, stop(Reason, State ) -> {stop, Reason, State}. -valid_client_id(ClientId) -> - ClientIdLen = size(ClientId), - 1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN. - -handle_retained(?PUBLISH, #mqtt_frame{fixed = #mqtt_frame_fixed{retain = false}}) -> - ignore; - -handle_retained(?PUBLISH, #mqtt_frame{ - fixed = #mqtt_frame_fixed{retain = true}, - variable = #mqtt_frame_publish{topic_name = Topic}, - payload= <<>> }) -> - emqtt_retained:delete(Topic); - -handle_retained(?PUBLISH, Frame=#mqtt_frame{ - fixed = #mqtt_frame_fixed{retain = true}, - variable = #mqtt_frame_publish{topic_name = Topic}}) -> - emqtt_retained:insert(Topic, make_msg(Frame)); - -handle_retained(_, _) -> - ignore. - -validate_frame(?PUBLISH, #mqtt_frame{variable = #mqtt_frame_publish{topic_name = Topic}}) -> - case emqtt_topic:validate({publish, Topic}) of - true -> ok; - false -> {error, badtopic} - end; - -validate_frame(?UNSUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) -> - ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics, - not emqtt_topic:validate({subscribe, Topic})], - case ErrTopics of - [] -> ok; - _ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic} - end; - -validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) -> - ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics, - not (emqtt_topic:validate({subscribe, Topic}) and (Qos < 3))], - case ErrTopics of - [] -> ok; - _ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic} - end; - -validate_frame(_Type, _Frame) -> - ok. - -make_msg(#mqtt_frame{ - fixed = #mqtt_frame_fixed{qos = Qos, - retain = Retain, - dup = Dup}, - variable = #mqtt_frame_publish{topic_name = Topic, - message_id = MessageId}, - payload = Payload}) -> - #mqtt_msg{retain = Retain, - qos = Qos, - topic = Topic, - dup = Dup, - msgid = MessageId, - payload = Payload}. - diff --git a/apps/emqtt/src/emqtt_cm.erl b/apps/emqtt/src/emqtt_cm.erl index d6809f92e..94e5424e0 100644 --- a/apps/emqtt/src/emqtt_cm.erl +++ b/apps/emqtt/src/emqtt_cm.erl @@ -25,6 +25,8 @@ -author('feng@slimchat.io'). +-include("emqtt_log.hrl"). + -behaviour(gen_server). -define(SERVER, ?MODULE). @@ -69,7 +71,7 @@ create(ClientId, Pid) -> -spec destroy(ClientId :: binary(), Pid :: pid()) -> ok. destroy(ClientId, Pid) when is_binary(ClientId) -> - gen_server:cast(?SERVER, {destroy, ClientId, Pid}); + gen_server:cast(?SERVER, {destroy, ClientId, Pid}). %% ------------------------------------------------------------------ %% gen_server Function Definitions @@ -91,7 +93,7 @@ handle_call({create, ClientId, Pid}, _From, State) -> ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)}); [] -> ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)}) - end. + end, {reply, ok, State}; handle_call(_Request, _From, State) -> @@ -106,7 +108,7 @@ handle_cast({destroy, ClientId, Pid}, State) when is_binary(ClientId) -> ignore; [] -> ?ERROR("cannot find client '~s' with ~p", [ClientId, Pid]) - end + end, {noreply, State}; handle_cast(_Msg, State) -> @@ -114,7 +116,7 @@ handle_cast(_Msg, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> ets:match_delete(emqtt_client, {{'_', DownPid, MRef}}), - {noreply, State}. + {noreply, State}; handle_info(_Info, State) -> {noreply, State}. diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index 39eec70e5..5438f4e64 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -28,16 +28,69 @@ -include("emqtt_frame.hrl"). --export([process_request/3]). +-record(proto_state, { + socket, + message_id, + client_id, + clean_sess, + will_msg, + awaiting_ack, + subtopics, + awaiting_rel +}). -process_request(?CONNECT, +-type proto_state() :: #proto_state{}. + +-export([initial_state/1, handle_frame/2, send_message/2, client_terminated/1]). + +-export([info/1]). + +-define(FRAME_TYPE(Frame, Type), + Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}). + +initial_state(Socket) -> + #proto_state{ + socket = Socket, + message_id = 1, + awaiting_ack = gb_trees:empty(), + subtopics = [], + awaiting_rel = gb_trees:empty() + }. + +info(#proto_state{ message_id = MsgId, + client_id = ClientId, + clean_sess = CleanSess, + will_msg = WillMsg, + subtopics = SubTopics}) -> + [ {message_id, MsgId}, + {client_id, ClientId}, + {clean_sess, CleanSess}, + {will_msg, WillMsg}, + {subtopics, SubTopics} ]. + +-spec handle_frame(Frame, State) -> {ok, NewState} | {error, any()} when + Frame :: #mqtt_frame{}, + State :: proto_state(), + NewState :: proto_state(). + +handle_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, + State = #proto_state{client_id = ClientId}) -> + ?INFO("frame from ~s: ~p", [ClientId, Frame]), + case validate_frame(Type, Frame) of + ok -> + handle_request(Type, Frame, State); + {error, Reason} -> + {error, Reason, State} + end. + +handle_request(?CONNECT, #mqtt_frame{ variable = #mqtt_frame_connect{ username = Username, password = Password, proto_ver = ProtoVersion, clean_sess = CleanSess, keep_alive = AlivePeriod, - client_id = ClientId } = Var}, #state{socket = Sock} = State) -> + client_id = ClientId } = Var}, State = #proto_state{socket = Sock}) -> {ReturnCode, State1} = case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)), valid_client_id(ClientId)} of @@ -52,12 +105,12 @@ process_request(?CONNECT, {?CONNACK_CREDENTIALS, State}; true -> ?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]), + %%TODO: + %%KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout), emqtt_cm:create(ClientId, self()), - KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout), {?CONNACK_ACCEPT, - State #state{ will_msg = make_will_msg(Var), - client_id = ClientId, - keep_alive = KeepAlive}} + State #proto_state{ will_msg = make_will_msg(Var), + client_id = ClientId }} end end, ?INFO("recv conn...:~p", [ReturnCode]), @@ -66,26 +119,26 @@ process_request(?CONNECT, return_code = ReturnCode }}), {ok, State1}; -process_request(?PUBLISH, Frame=#mqtt_frame{ +handle_request(?PUBLISH, Frame=#mqtt_frame{ fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) -> - emqtt_pubsub:publish(make_msg(Frame)), + emqtt_router:route(make_msg(Frame)), {ok, State}; -process_request(?PUBLISH, +handle_request(?PUBLISH, Frame=#mqtt_frame{ fixed = #mqtt_frame_fixed{qos = ?QOS_1}, variable = #mqtt_frame_publish{message_id = MsgId}}, - State=#state{socket=Sock}) -> + State=#proto_state{socket=Sock}) -> emqtt_pubsub:publish(make_msg(Frame)), send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK }, variable = #mqtt_frame_publish{ message_id = MsgId}}), {ok, State}; -process_request(?PUBLISH, +handle_request(?PUBLISH, Frame=#mqtt_frame{ fixed = #mqtt_frame_fixed{qos = ?QOS_2}, variable = #mqtt_frame_publish{message_id = MsgId}}, - State=#state{socket=Sock}) -> + State=#proto_state{socket=Sock}) -> emqtt_pubsub:publish(make_msg(Frame)), put({msg, MsgId}, pubrec), send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC}, @@ -93,40 +146,40 @@ process_request(?PUBLISH, {ok, State}; -process_request(?PUBACK, #mqtt_frame{}, State) -> +handle_request(?PUBACK, #mqtt_frame{}, State) -> %TODO: fixme later {ok, State}; -process_request(?PUBREC, #mqtt_frame{ +handle_request(?PUBREC, #mqtt_frame{ variable = #mqtt_frame_publish{message_id = MsgId}}, - State=#state{socket=Sock}) -> + State=#proto_state{socket=Sock}) -> %TODO: fixme later send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBREL}, variable = #mqtt_frame_publish{ message_id = MsgId}}), {ok, State}; -process_request(?PUBREL, +handle_request(?PUBREL, #mqtt_frame{ variable = #mqtt_frame_publish{message_id = MsgId}}, - State=#state{socket=Sock}) -> + State=#proto_state{socket=Sock}) -> erase({msg, MsgId}), send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBCOMP}, variable = #mqtt_frame_publish{ message_id = MsgId}}), {ok, State}; -process_request(?PUBCOMP, #mqtt_frame{ +handle_request(?PUBCOMP, #mqtt_frame{ variable = #mqtt_frame_publish{message_id = _MsgId}}, State) -> %TODO: fixme later {ok, State}; -process_request(?SUBSCRIBE, +handle_request(?SUBSCRIBE, #mqtt_frame{ variable = #mqtt_frame_subscribe{message_id = MessageId, topic_table = Topics}, payload = undefined}, - #state{socket=Sock} = State) -> + #proto_state{socket=Sock} = State) -> [emqtt_pubsub:subscribe({Name, Qos}, self()) || #mqtt_topic{name=Name, qos=Qos} <- Topics], @@ -140,11 +193,11 @@ process_request(?SUBSCRIBE, {ok, State}; -process_request(?UNSUBSCRIBE, +handle_request(?UNSUBSCRIBE, #mqtt_frame{ variable = #mqtt_frame_subscribe{message_id = MessageId, topic_table = Topics }, - payload = undefined}, #state{socket = Sock, client_id = ClientId} = State) -> + payload = undefined}, #proto_state{socket = Sock, client_id = ClientId} = State) -> [emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], @@ -154,26 +207,64 @@ process_request(?UNSUBSCRIBE, {ok, State}; -process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) -> +%, keep_alive=KeepAlive +handle_request(?PINGREQ, #mqtt_frame{}, #proto_state{socket=Sock}=State) -> %Keep alive timer - KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), + %%TODO:... + %%KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), - {ok, State#state{keep_alive=KeepAlive1}}; + {ok, State}; -process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) -> +handle_request(?DISCONNECT, #mqtt_frame{}, State=#proto_state{client_id=ClientId}) -> ?INFO("~s disconnected", [ClientId]), {stop, State}. +-spec send_message(Message, State) -> {ok, NewState} when + Message :: mqtt_msg(), + State :: proto_state(), + NewState :: proto_state(). + +send_message(Message, State = #proto_state{socket = Sock, message_id = MsgId}) -> + + #mqtt_msg{retain = Retain, + qos = Qos, + topic = Topic, + dup = Dup, + payload = Payload, + encoder = Encoder} = Message, + + Payload1 = + if + Encoder == undefined -> Payload; + true -> Encoder(Payload) + end, + Frame = #mqtt_frame{ + fixed = #mqtt_frame_fixed{type = ?PUBLISH, + qos = Qos, + retain = Retain, + dup = Dup}, + variable = #mqtt_frame_publish{topic_name = Topic, + message_id = if + Qos == ?QOS_0 -> undefined; + true -> MsgId + end}, + payload = Payload1}, + + send_frame(Sock, Frame), + if + Qos == ?QOS_0 -> + {ok, State}; + true -> + {ok, next_msg_id(State)} + end. + send_frame(Sock, Frame) -> ?INFO("send frame:~p", [Frame]), erlang:port_command(Sock, emqtt_frame:serialise(Frame)). -valid_client_id(ClientId) -> - ClientIdLen = size(ClientId), - 1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN. - -validate_frame(_Type, _Frame) -> - ok. +%%TODO: fix me later... +client_terminated(#proto_state{client_id = ClientId} = State) -> + emqtt_cm:destroy(ClientId, self()). make_msg(#mqtt_frame{ fixed = #mqtt_frame_fixed{qos = Qos, @@ -200,3 +291,48 @@ make_will_msg(#mqtt_frame_connect{ will_retain = Retain, topic = Topic, dup = false, payload = Msg }. + +next_msg_id(State = #proto_state{ message_id = 16#ffff }) -> + State #proto_state{ message_id = 1 }; +next_msg_id(State = #proto_state{ message_id = MsgId }) -> + State #proto_state{ message_id = MsgId + 1 }. + +valid_client_id(ClientId) -> + ClientIdLen = size(ClientId), + 1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN. + +validate_frame(?PUBLISH, #mqtt_frame{variable = #mqtt_frame_publish{topic_name = Topic}}) -> + case emqtt_topic:validate({publish, Topic}) of + true -> ok; + false -> {error, badtopic} + end; + +validate_frame(?UNSUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) -> + ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics, + not emqtt_topic:validate({subscribe, Topic})], + case ErrTopics of + [] -> ok; + _ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic} + end; + +validate_frame(?SUBSCRIBE, #mqtt_frame{variable = #mqtt_frame_subscribe{topic_table = Topics}}) -> + ErrTopics = [Topic || #mqtt_topic{name=Topic, qos=Qos} <- Topics, + not (emqtt_topic:validate({subscribe, Topic}) and (Qos < 3))], + case ErrTopics of + [] -> ok; + _ -> ?ERROR("error topics: ~p", [ErrTopics]), {error, badtopic} + end; + +validate_frame(_Type, _Frame) -> + ok. + +maybe_clean_sess(false, _Conn, _ClientId) -> + % todo: establish subscription to deliver old unacknowledged messages + ok. + +%%---------------------------------------------------------------------------- + +send_will_msg(#proto_state{will_msg = undefined}) -> + ignore; +send_will_msg(#proto_state{will_msg = WillMsg }) -> + emqtt_pubsub:publish(WillMsg). diff --git a/apps/emqtt/src/emqtt_router.erl b/apps/emqtt/src/emqtt_router.erl index a33a5446e..215e10522 100644 --- a/apps/emqtt/src/emqtt_router.erl +++ b/apps/emqtt/src/emqtt_router.erl @@ -20,8 +20,23 @@ %% SOFTWARE. %%------------------------------------------------------------------------------ +%%route chain... statistics -module(emqtt_router). +-include("emqtt.hrl"). + +-include("emqtt_frame.hrl"). + +-export([route/1]). + %%Router Chain--> %%--->In %%Out<--- + +-spec route(Msg :: mqtt_msg()) -> any(). +route(Msg) -> + emqtt_pubsub:publish(retained(Msg)). + +retained(Msg = #mqtt_msg{retain = true, topic = Topic}) -> + emqtt_retained:insert(Topic, Msg), Msg. +