From 92ce6ef539d1350cef8a21d1dd38b2e7904c377d Mon Sep 17 00:00:00 2001 From: erylee Date: Sat, 22 Dec 2012 14:28:04 +0800 Subject: [PATCH] merge emqtt_processor to emqtt_client --- include/emqtt_frame.hrl | 1 + src/.emqtt_frame.erl.swp | Bin 0 -> 20480 bytes src/emqtt_client.erl | 226 ++++++++++++++++++++++++++++++++------- src/emqtt_processor.erl | 224 -------------------------------------- 4 files changed, 190 insertions(+), 261 deletions(-) create mode 100644 src/.emqtt_frame.erl.swp delete mode 100644 src/emqtt_processor.erl diff --git a/include/emqtt_frame.hrl b/include/emqtt_frame.hrl index fb6eebe29..701351a27 100644 --- a/include/emqtt_frame.hrl +++ b/include/emqtt_frame.hrl @@ -98,3 +98,4 @@ message_id, payload}). + diff --git a/src/.emqtt_frame.erl.swp b/src/.emqtt_frame.erl.swp new file mode 100644 index 0000000000000000000000000000000000000000..6c2212e6f44145ebc2606d4ccdb4d3037c977ad9 GIT binary patch literal 20480 zcmeHNTZ|-C87@V+C?Ho86B7@!!R>~gp6*$eU1xh{GBZ27+u7}z-Rap|$kNtySNGIz zPgPS@J;TyF7-I|xA_)OC7(sjxqKOhBCO&{*4DbLe;w6d?1{5#rgF$@~ynX*Ur>ahM zSNF_W;o*WDRFfF9QZh ziV_Ya9Ej&Y+p7=nyiD6Rn$J;%EkoC_tFJm7UpM)ha3JA8!hwVX2?r7mBpgUMkZ>U3 z!2cx&yw(NUr!X^w`}{d7f8W#f`#$-8T>c*GD!)L!e?tB~(N!*Alb?hG2?r7mBpgUM zkZ>U3K*E890|^Hb4kR2%IFNAQf4~8~rfEAc?&B2jmYz@xx2a09RvxDvPm_}itL_E+F6@Lk|Z;9;NxoB$?)>i`B^ z1Y8LG<%62`L*NXs9eDEtpa-Y}dx1T`5b(Q8H0=k#7k~$V6TlSkQQ(b>HSK<27Wnf; zn)X$o0n~x*z?&Cp+Ao0zfeNq{xCr>e`!($a;4$DXpaJX!hJd$_2zddx8z=#n0xu%@ z@kQWH;8q|9{2d9E?*sP$W#GfWZ@^>X^SAxv%K6Ki^?#~O)3CgXSwod^HpAbFwT#xk z1v_OJt%}|>R}E^iA8%9iGO$gxY3NqPHC&h4EZKH5HAA1u_+r{tdSK%Ty13ZE6`-C`?SO6Wdz4;~EuQ1V}9yuD2t@ zsL~qh#PGj!*fpFv+{Vc;#%r(lbZ~QqQ6&qN;yNI1m)r*0^A2tfJ6LfWoPzBcF3En4 z7wwi=&G5$C!^S@OH}!8E=)M^FrryTedK7@>xNT&~v~=h6!~|#_=L3&+OFS?V644mT z-;6em?n)K(W;m+`nIcu{E>{gEuH3f?YTmCL`p=6|-a}-l=<)4Ux%c*8D5zqMD_2bd zJL#+($uU;2jG0!W(lo48p^zKP zXQ=iVY?yiqrXF;u^pat`e@=KvMzqTaIUxa{W^!L~@4o3GxZjt`U45v`Z%Ac0Qa6#E z72AQ->TQb{Otee(Md{|um-10N^JQgNwe^1FO-uybifbxY;f~=rwv%C|wTg|hR#8@Y z$O4@+wzdqpi4oZZn=DNos~nh}3-u&Qa)r}ekwjNwhE}yD+BPC4s6eidhA8y{v7a|b z(`-}R;{yl{5VkO5TOJn}{lTS|1(VM6iEi9-(`M^w;`ezz+?r`UKV+A?7vfEHRcp7X zmUMM?w_9@S5Y`+gpMHx0by9+tq&Ez)D)25~&dSFzY-<>UEC=L{jgozn3qv2eEZwp# zV?*f*k%+2_I58-xf6#@7)x|UQX2tb%&q#6cQDIiohHFOMw2ah&`J-3{${9AgC6^Zu zw9hXMR*BIq`J8x^QfU$yY?oW=;!#SYo=k=fl@1l%T5CwDvM^hoT{tqkTd2G?Y~0?m z9WQkYc3Dtuy2eQU1XcIHbIy){dy#pK3G>`(umsdid}0r zjg(S`&(~j%e{9BXop#K|vd2=@G#kz3b_|c^@}ulXX-%i)s%TZSj7>G0jMs9RBmDsC zfGsW?sKNQnb{nfk({91VwCl`6!8}^P`DkK{n!!vr(x)7rI_RmlpfBP6b?J~#W7AwQ zAw)QQ+58?hHFuotFU~GM9r!i7GcwmIas@gQTZdDDo2EtSZWZ7}M%~tf&%wFQ5f{xlPya}mDwoxZC0$}-U zo~4L|L$Yuvy^EceomjTV+HlvsWe{FBn+8}=HCi6@kos6eGsFfS;xsUYhVZKqC*1bZ zNkk3IvpIz&F%x#Mz0`zY6jJ1$W1>sH3;c$}f70Pf+kqM2D&R`s6~z7b0Q-T9f!`y(e;RlgI0NhlwgImrhJOwC z60iu20+#?kMJ)dl;A_BVfa5?O$N~RCJbxDW3Qz+6i5UJY@HlWUPzJ6At^wXgO#dA4 zP2lrD1E>R&z%cL;;K!KL%fMs6T>!=Ydx1Rw&38ADJP8L94kR2%IFNAQU2wo|d8uP* z5ek}a#l#-pFdX`X$n{uOqjE~_rF$X{l;?Nx^19x1dn=$kC(6k?B3HZ?=>+AIzI(h& zgpP#*54}@!@IP0A2tG~Of>hu!^HjndysOo)Mmk6&WVNn}L8KwG4|8KXCMI_E*5bkN z92+mNNf|LvAgs}}3k6D9$cz~)!|Sse2!@q0sTNOxjDmvu+4ddS^YkkDP?$vxQ+t-={nPEoYy)6|F#Mj?Gj9wjIcYb-S> zJtaw|1nTLgt=Ga&6B8oXEfgrkqi8NgqNlJiO)(sQLt+VUh)(#M3~s4d3x~xlLi(D> z`hXfa(=HDA35Q-5DHNxXUgB&EDdzRLxDJ;GR(qiJn}uBccE?gt+S__mGo7lz^QW@K#fW}>B9kwE}A#`RHq(+K0=!9aM17^+8g-2>B9y$~`JUunD zU&-1Ic@-SBd%BI+z3PmWcFoXZ@<#& zsh~Se|1LD=K(V}Uuz@QJD&^-~XpYQCiWg8*DBxePA&=pV?9kXK%>K+7c8C>*`JsYu zMM`(PWgzK@wecmo++Lcho?_!;Q1Y@+awN#%FEF*_@SH4*UyEbbA0Gd2h2!&>JpcFa z_rHW#|I0uFxDmJ-_%-7Fv%tN;F(3_`L#+Q0a1xjX{(%^u&iC&J4g)FRRmAob@81d( zfy;qkA-=y4m|Z8NT2PTo#OgLwzJ>>0r|z@15>b=!82~-8)t2RkVa#nVJqGiY4K^ z;tO_@Y1KXVD+fM<8XoYihjtQ;coL4V>iZJXb@5r8QR?^|OTMM!of_&_)+C*im2p(l z$F-sQ{^+P53>+@ps(;f#78S>-)W|=G)hO-h&x9wsHI3gS$#dS2cbpX;a{L1it0V(D z{6Yd9q^$dAt8^J58kwLDoi7JYe%HB6C74+!V7WM7_D>qAOYUHC0tD+jLVy&9!1?hg z+(?Tfoq!VBw4;=uQh!P~85(g$97R!c+K~K-$@dJ~nBbtQO0az@a;}X9LWn}jfjoD~ zNaaQg{}eK)w~si{Z>r-!QT_`N8@4w(2AVu0B~QjghSC4oHTN0QVHOtLm4SPSe)Ct zFndT{%R91yv=_}MXtTdujM7Pw?mzC?NV|$oLYyj1_tjh98LQ7e9Y z2j&fGH+u&vB9?x9+(aZUR@ifs$0@jUL zB=q(jMXiI<1F>{>O3<}UuUQrH2_cjqx4NLJsL*dMcAVaiJo%tHHoQ5%=M+}%gl6VD ze6-?srzZRc9lnl>Jx+8NSR#2IzTPPI#}$Xi gen_server2:start_link(?MODULE, [], []). @@ -45,9 +39,19 @@ start_link() -> go(Pid, Sock) -> gen_server2:call(Pid, {go, Sock}). +info(Pid) -> + gen_server2:call(Pid, info). + init([]) -> {ok, undefined, hibernate, {backoff, 1000, 1000, 10000}}. +handle_call(info, _From, #state{conn_name=ConnName, + message_id=MsgId, client_id=ClientId} = State) -> + Info = [{conn_name, ConnName}, + {message_id, MsgId}, + {client_id, ClientId}], + {reply, Info, State}; + handle_call({go, Sock}, _From, _State) -> process_flag(trap_exit, true), ok = throw_on_error( @@ -62,17 +66,19 @@ handle_call({go, Sock}, _From, _State) -> connection_state = running, conserve = false, parse_state = emqtt_frame:initial_state(), - proc_state = emqtt_processor:initial_state(Sock) })}. + message_id = 1, + subscriptions = dict:new(), + awaiting_ack = gb_tree:empty()})}. handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. -handle_info({route, Msg}, #state{proc_state=PState} = State) -> +handle_info({route, Msg}, #state{socket = Sock} = State) -> #mqtt_msg{ retain = Retain, qos = Qos, topic = Topic, dup = Dup, - message_id = MessageId, + %message_id = MessageId, payload = Payload } = Msg, Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{ @@ -85,7 +91,7 @@ handle_info({route, Msg}, #state{proc_state=PState} = State) -> message_id = 1}, payload = Payload }, - emqtt_processor:send_client(Frame, PState), + send_frame(Sock, Frame), {noreply, State}; handle_info({inet_reply, _Ref, ok}, State) -> @@ -128,7 +134,6 @@ process_received_bytes(<<>>, State) -> {noreply, State}; process_received_bytes(Bytes, State = #state{ parse_state = ParseState, - proc_state = ProcState, conn_name = ConnStr }) -> case emqtt_frame:parse(Bytes, ParseState) of @@ -137,35 +142,181 @@ process_received_bytes(Bytes, control_throttle( State #state{ parse_state = ParseState1 }), hibernate}; {ok, Frame, Rest} -> - case emqtt_processor:process_frame(Frame, ProcState) of - {ok, ProcState1} -> + case process_frame(Frame, State) of + {ok, State1} -> PS = emqtt_frame:initial_state(), process_received_bytes( Rest, - State #state{ parse_state = PS, - proc_state = ProcState1 }); - {err, Reason, ProcState1} -> - error_logger:info_msg("MQTT protocol error ~p for connection ~p~n", + State1 #state{ parse_state = PS}); + {err, Reason, State1} -> + ?ERROR("MQTT protocol error ~p for connection ~p~n", [Reason, ConnStr]), - stop({shutdown, Reason}, pstate(State, ProcState1)); - {stop, ProcState1} -> - stop(normal, pstate(State, ProcState1)) + stop({shutdown, Reason}, State1); + {stop, State1} -> + stop(normal, State1) end; {error, Error} -> - error_logger:erro_msg("MQTT detected framing error ~p for connection ~p~n", + ?ERROR("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]), stop({shutdown, Error}, State) end. -pstate(State = #state {}, PState = #proc_state{}) -> - State #state{ proc_state = PState }. +process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, + State ) -> + process_request(Type, Frame, State). + +process_request(?CONNECT, + #mqtt_frame{ variable = #mqtt_frame_connect{ + username = Username, + password = Password, + proto_ver = ProtoVersion, + clean_sess = CleanSess, + client_id = ClientId } = Var}, #state{socket = Sock} = State) -> + ?INFO("connect frame: ~p~n", [Var]), + {ReturnCode, State1} = + case {ProtoVersion =:= ?MQTT_PROTO_MAJOR, + emqtt_util:valid_client_id(ClientId)} of + {false, _} -> + {?CONNACK_PROTO_VER, State}; + {_, false} -> + {?CONNACK_INVALID_ID, State}; + _ -> + case creds(Username, Password) of + nocreds -> + error_logger:error_msg("MQTT login failed - no credentials~n"), + {?CONNACK_CREDENTIALS, State}; + {UserBin, PassBin} -> + {?CONNACK_ACCEPT, + State #state{ will_msg = make_will_msg(Var), + client_id = ClientId }} + end + end, + send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK}, + variable = #mqtt_frame_connack{ + return_code = ReturnCode }}), + {ok, State1}; + +process_request(?PUBACK, + #mqtt_frame{ + variable = #mqtt_frame_publish{ message_id = MessageId }}, + #state{awaiting_ack = Awaiting } = State) -> + {ok, State #state{ awaiting_ack = gb_trees:delete( MessageId, Awaiting)}}; + +process_request(?PUBLISH, + #mqtt_frame{ + fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }}, State) -> + {err, qos2_not_supported, State}; + +process_request(?PUBLISH, + #mqtt_frame{ + fixed = #mqtt_frame_fixed{ qos = Qos, + retain = Retain, + dup = Dup }, + variable = #mqtt_frame_publish{ topic_name = Topic, + message_id = MessageId }, + payload = Payload }, #state{ socket=Sock, message_id = MsgId } = State) -> + Msg = #mqtt_msg{ retain = Retain, + qos = Qos, + topic = Topic, + dup = Dup, + message_id = MessageId, + payload = Payload }, + emqtt_router:route(Msg), + + send_frame(Sock, + #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK }, + variable = #mqtt_frame_publish{ message_id = MsgId}}), + {ok, State}; + +process_request(?SUBSCRIBE, + #mqtt_frame{ + variable = #mqtt_frame_subscribe{ message_id = MessageId, + topic_table = Topics }, + payload = undefined }, + #state{socket=Sock} = State0) -> + QosResponse = + lists:foldl(fun (#mqtt_topic{ name = TopicName, + qos = Qos }, QosList) -> + SupportedQos = supported_subs_qos(Qos), + [SupportedQos | QosList] + end, [], Topics), + + [emqtt_topic:insert(Name) || #mqtt_topic{name=Name} <- Topics], + + [emqtt_router:insert(#subscriber{topic=emqtt_util:binary(Name), pid=self()}) + || #mqtt_topic{name=Name} <- Topics], + + send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?SUBACK }, + variable = #mqtt_frame_suback{ + message_id = MessageId, + qos_table = QosResponse }}), + + {ok, State0}; + +process_request(?UNSUBSCRIBE, + #mqtt_frame{ + variable = #mqtt_frame_subscribe{ message_id = MessageId, + topic_table = Topics }, + payload = undefined }, #state{ socket = Sock, client_id = ClientId, + subscriptions = Subs0} = State) -> + + + [emqtt_router:delete(#subscriber{topic=Name, pid=self()}) + || #mqtt_topic{name=Name} <- Topics], + + send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK }, + variable = #mqtt_frame_suback{ message_id = MessageId }}), + + {ok, State #state{ subscriptions = Subs0 }}; + +process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock}=State) -> + send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), + {ok, State}; + +process_request(?DISCONNECT, #mqtt_frame{}, State) -> + {stop, State}. + +next_msg_id(State = #state{ message_id = 16#ffff }) -> + State #state{ message_id = 1 }; +next_msg_id(State = #state{ message_id = MsgId }) -> + State #state{ message_id = MsgId + 1 }. + +maybe_clean_sess(false, _Conn, _ClientId) -> + % todo: establish subscription to deliver old unacknowledged messages + ok. + +%%---------------------------------------------------------------------------- + +make_will_msg(#mqtt_frame_connect{ will_flag = false }) -> + undefined; +make_will_msg(#mqtt_frame_connect{ will_retain = Retain, + will_qos = Qos, + will_topic = Topic, + will_msg = Msg }) -> + #mqtt_msg{ retain = Retain, + qos = Qos, + topic = Topic, + dup = false, + payload = Msg }. + +creds(User, Pass) -> + {User, Pass}. + +supported_subs_qos(?QOS_0) -> ?QOS_0; +supported_subs_qos(?QOS_1) -> ?QOS_1; +supported_subs_qos(?QOS_2) -> ?QOS_1. + +send_will(#state{ will_msg = WillMsg }) -> + ?INFO("willmsg: ~p~n", [WillMsg]). + +send_frame(Sock, Frame) -> + erlang:port_command(Sock, emqtt_frame:serialise(Frame)). %%---------------------------------------------------------------------------- network_error(_Reason, - State = #state{ conn_name = ConnStr, - proc_state = PState }) -> - error_logger:info_msg("MQTT detected network error for ~p~n", [ConnStr]), - emqtt_processor:send_will(PState), + State = #state{ conn_name = ConnStr}) -> + ?INFO("MQTT detected network error for ~p~n", [ConnStr]), + send_will(State), % todo: flush channel after publish stop({shutdown, conn_closed}, State). @@ -189,3 +340,4 @@ control_throttle(State = #state{ connection_state = Flow, stop(Reason, State ) -> {stop, Reason, State}. + diff --git a/src/emqtt_processor.erl b/src/emqtt_processor.erl deleted file mode 100644 index cc277bc04..000000000 --- a/src/emqtt_processor.erl +++ /dev/null @@ -1,224 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. -%% - -%%Don't send amqp message - --module(emqtt_processor). - --export([info/2, initial_state/1, - process_frame/2, send_client/2, send_will/1]). - --include("emqtt.hrl"). --include("emqtt_frame.hrl"). - --record(proc_state, { socket, - subscriptions, - consumer_tags, - unacked_pubs, - awaiting_ack, - awaiting_seqno, - message_id, - client_id, - clean_sess, - will_msg, - channels, - connection, - exchange }). - --define(FRAME_TYPE(Frame, Type), - Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}). - -initial_state(Socket) -> - #proc_state{ unacked_pubs = gb_trees:empty(), - awaiting_ack = gb_trees:empty(), - message_id = 1, - subscriptions = dict:new(), - consumer_tags = {undefined, undefined}, - channels = {undefined, undefined}, - exchange = emqtt_util:env(exchange), - socket = Socket }. - -info(client_id, #proc_state{ client_id = ClientId }) -> ClientId. - -process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, - PState ) -> - process_request(Type, Frame, PState). - -process_request(?CONNECT, - #mqtt_frame{ variable = #mqtt_frame_connect{ - username = Username, - password = Password, - proto_ver = ProtoVersion, - clean_sess = CleanSess, - client_id = ClientId } = Var}, PState) -> - error_logger:info_msg("connect frame: ~p~n", [Var]), - {ReturnCode, PState1} = - case {ProtoVersion =:= ?MQTT_PROTO_MAJOR, - emqtt_util:valid_client_id(ClientId)} of - {false, _} -> - {?CONNACK_PROTO_VER, PState}; - {_, false} -> - {?CONNACK_INVALID_ID, PState}; - _ -> - case creds(Username, Password) of - nocreds -> - error_logger:error_msg("MQTT login failed - no credentials~n"), - {?CONNACK_CREDENTIALS, PState}; - {UserBin, PassBin} -> - {?CONNACK_ACCEPT, - PState #proc_state{ will_msg = make_will_msg(Var), - client_id = ClientId }} - end - end, - send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK}, - variable = #mqtt_frame_connack{ - return_code = ReturnCode }}, PState1), - {ok, PState1}; - -process_request(?PUBACK, - #mqtt_frame{ - variable = #mqtt_frame_publish{ message_id = MessageId }}, - #proc_state{awaiting_ack = Awaiting } = PState) -> - {ok, PState #proc_state{ awaiting_ack = gb_trees:delete( MessageId, Awaiting)}}; - -process_request(?PUBLISH, - #mqtt_frame{ - fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }}, PState) -> - {err, qos2_not_supported, PState}; - -process_request(?PUBLISH, - #mqtt_frame{ - fixed = #mqtt_frame_fixed{ qos = Qos, - retain = Retain, - dup = Dup }, - variable = #mqtt_frame_publish{ topic_name = Topic, - message_id = MessageId }, - payload = Payload }, #proc_state{ message_id = MsgId } = PState) -> - Msg = #mqtt_msg{ retain = Retain, - qos = Qos, - topic = Topic, - dup = Dup, - message_id = MessageId, - payload = Payload }, - emqtt_router:route(Msg), - - send_client( - #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK }, - variable = #mqtt_frame_publish{ message_id = MsgId}}, - PState), - {ok, PState}; - - -process_request(?SUBSCRIBE, - #mqtt_frame{ - variable = #mqtt_frame_subscribe{ message_id = MessageId, - topic_table = Topics }, - payload = undefined }, - #proc_state{} = PState0) -> - QosResponse = - lists:foldl(fun (#mqtt_topic{ name = TopicName, - qos = Qos }, QosList) -> - SupportedQos = supported_subs_qos(Qos), - [SupportedQos | QosList] - end, [], Topics), - - [emqtt_topic:insert(Name) || #mqtt_topic{name=Name} <- Topics], - - [emqtt_router:insert(#subscriber{topic=emqtt_util:binary(Name), pid=self()}) - || #mqtt_topic{name=Name} <- Topics], - - send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?SUBACK }, - variable = #mqtt_frame_suback{ - message_id = MessageId, - qos_table = QosResponse }}, PState0), - - {ok, PState0}; - -process_request(?UNSUBSCRIBE, - #mqtt_frame{ - variable = #mqtt_frame_subscribe{ message_id = MessageId, - topic_table = Topics }, - payload = undefined }, #proc_state{ client_id = ClientId, - subscriptions = Subs0} = PState) -> - - - [emqtt_router:delete(#subscriber{topic=Name, pid=self()}) - || #mqtt_topic{name=Name} <- Topics], - - send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK }, - variable = #mqtt_frame_suback{ message_id = MessageId }}, - PState), - - {ok, PState #proc_state{ subscriptions = Subs0 }}; - -process_request(?PINGREQ, #mqtt_frame{}, PState) -> - send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}, - PState), - {ok, PState}; - -process_request(?DISCONNECT, #mqtt_frame{}, PState) -> - {stop, PState}. - - - -next_msg_id(PState = #proc_state{ message_id = 16#ffff }) -> - PState #proc_state{ message_id = 1 }; -next_msg_id(PState = #proc_state{ message_id = MsgId }) -> - PState #proc_state{ message_id = MsgId + 1 }. - -%% decide at which qos level to deliver based on subscription -%% and the message publish qos level. non-MQTT publishes are -%% assumed to be qos 1, regardless of delivery_mode. -delivery_qos(Tag, _Headers, #proc_state{ consumer_tags = {Tag, _} }) -> - {?QOS_0, ?QOS_0}; -delivery_qos(Tag, Headers, #proc_state{ consumer_tags = {_, Tag} }) -> - case emqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of - {byte, Qos} -> {lists:min([Qos, ?QOS_1]), ?QOS_1}; - undefined -> {?QOS_1, ?QOS_1} - end. - -maybe_clean_sess(false, _Conn, _ClientId) -> - % todo: establish subscription to deliver old unacknowledged messages - ok. - -%%---------------------------------------------------------------------------- - -make_will_msg(#mqtt_frame_connect{ will_flag = false }) -> - undefined; -make_will_msg(#mqtt_frame_connect{ will_retain = Retain, - will_qos = Qos, - will_topic = Topic, - will_msg = Msg }) -> - #mqtt_msg{ retain = Retain, - qos = Qos, - topic = Topic, - dup = false, - payload = Msg }. - -creds(User, Pass) -> - {User, Pass}. - -supported_subs_qos(?QOS_0) -> ?QOS_0; -supported_subs_qos(?QOS_1) -> ?QOS_1; -supported_subs_qos(?QOS_2) -> ?QOS_1. - -send_will(PState = #proc_state{ will_msg = WillMsg }) -> - error_logger:info_msg("willmsg: ~p~n", [WillMsg]). - - -send_client(Frame, #proc_state{ socket = Sock }) -> - erlang:port_command(Sock, emqtt_frame:serialise(Frame)). -