merge emqtt_processor to emqtt_client

This commit is contained in:
erylee 2012-12-22 14:28:04 +08:00
parent 0330d0ff01
commit 92ce6ef539
4 changed files with 190 additions and 261 deletions

View File

@ -98,3 +98,4 @@
message_id,
payload}).

BIN
src/.emqtt_frame.erl.swp Normal file

Binary file not shown.

View File

@ -2,7 +2,7 @@
-behaviour(gen_server2).
-export([start_link/0, go/2]).
-export([start_link/0, go/2, info/1]).
-export([init/1,
handle_call/3,
@ -22,22 +22,16 @@
connection_state,
conserve,
parse_state,
proc_state }).
-record(proc_state, { socket,
subscriptions,
consumer_tags,
unacked_pubs,
awaiting_ack,
awaiting_seqno,
message_id,
client_id,
clean_sess,
will_msg,
channels,
connection,
exchange }).
message_id,
client_id,
clean_sess,
will_msg,
awaiting_ack,
subscriptions
}).
-define(FRAME_TYPE(Frame, Type),
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
start_link() ->
gen_server2:start_link(?MODULE, [], []).
@ -45,9 +39,19 @@ start_link() ->
go(Pid, Sock) ->
gen_server2:call(Pid, {go, Sock}).
info(Pid) ->
gen_server2:call(Pid, info).
init([]) ->
{ok, undefined, hibernate, {backoff, 1000, 1000, 10000}}.
handle_call(info, _From, #state{conn_name=ConnName,
message_id=MsgId, client_id=ClientId} = State) ->
Info = [{conn_name, ConnName},
{message_id, MsgId},
{client_id, ClientId}],
{reply, Info, State};
handle_call({go, Sock}, _From, _State) ->
process_flag(trap_exit, true),
ok = throw_on_error(
@ -62,17 +66,19 @@ handle_call({go, Sock}, _From, _State) ->
connection_state = running,
conserve = false,
parse_state = emqtt_frame:initial_state(),
proc_state = emqtt_processor:initial_state(Sock) })}.
message_id = 1,
subscriptions = dict:new(),
awaiting_ack = gb_tree:empty()})}.
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info({route, Msg}, #state{proc_state=PState} = State) ->
handle_info({route, Msg}, #state{socket = Sock} = State) ->
#mqtt_msg{ retain = Retain,
qos = Qos,
topic = Topic,
dup = Dup,
message_id = MessageId,
%message_id = MessageId,
payload = Payload } = Msg,
Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{
@ -85,7 +91,7 @@ handle_info({route, Msg}, #state{proc_state=PState} = State) ->
message_id = 1},
payload = Payload },
emqtt_processor:send_client(Frame, PState),
send_frame(Sock, Frame),
{noreply, State};
handle_info({inet_reply, _Ref, ok}, State) ->
@ -128,7 +134,6 @@ process_received_bytes(<<>>, State) ->
{noreply, State};
process_received_bytes(Bytes,
State = #state{ parse_state = ParseState,
proc_state = ProcState,
conn_name = ConnStr }) ->
case
emqtt_frame:parse(Bytes, ParseState) of
@ -137,35 +142,181 @@ process_received_bytes(Bytes,
control_throttle( State #state{ parse_state = ParseState1 }),
hibernate};
{ok, Frame, Rest} ->
case emqtt_processor:process_frame(Frame, ProcState) of
{ok, ProcState1} ->
case process_frame(Frame, State) of
{ok, State1} ->
PS = emqtt_frame:initial_state(),
process_received_bytes(
Rest,
State #state{ parse_state = PS,
proc_state = ProcState1 });
{err, Reason, ProcState1} ->
error_logger:info_msg("MQTT protocol error ~p for connection ~p~n",
State1 #state{ parse_state = PS});
{err, Reason, State1} ->
?ERROR("MQTT protocol error ~p for connection ~p~n",
[Reason, ConnStr]),
stop({shutdown, Reason}, pstate(State, ProcState1));
{stop, ProcState1} ->
stop(normal, pstate(State, ProcState1))
stop({shutdown, Reason}, State1);
{stop, State1} ->
stop(normal, State1)
end;
{error, Error} ->
error_logger:erro_msg("MQTT detected framing error ~p for connection ~p~n",
?ERROR("MQTT detected framing error ~p for connection ~p~n",
[ConnStr, Error]),
stop({shutdown, Error}, State)
end.
pstate(State = #state {}, PState = #proc_state{}) ->
State #state{ proc_state = PState }.
process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
State ) ->
process_request(Type, Frame, State).
process_request(?CONNECT,
#mqtt_frame{ variable = #mqtt_frame_connect{
username = Username,
password = Password,
proto_ver = ProtoVersion,
clean_sess = CleanSess,
client_id = ClientId } = Var}, #state{socket = Sock} = State) ->
?INFO("connect frame: ~p~n", [Var]),
{ReturnCode, State1} =
case {ProtoVersion =:= ?MQTT_PROTO_MAJOR,
emqtt_util:valid_client_id(ClientId)} of
{false, _} ->
{?CONNACK_PROTO_VER, State};
{_, false} ->
{?CONNACK_INVALID_ID, State};
_ ->
case creds(Username, Password) of
nocreds ->
error_logger:error_msg("MQTT login failed - no credentials~n"),
{?CONNACK_CREDENTIALS, State};
{UserBin, PassBin} ->
{?CONNACK_ACCEPT,
State #state{ will_msg = make_will_msg(Var),
client_id = ClientId }}
end
end,
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK},
variable = #mqtt_frame_connack{
return_code = ReturnCode }}),
{ok, State1};
process_request(?PUBACK,
#mqtt_frame{
variable = #mqtt_frame_publish{ message_id = MessageId }},
#state{awaiting_ack = Awaiting } = State) ->
{ok, State #state{ awaiting_ack = gb_trees:delete( MessageId, Awaiting)}};
process_request(?PUBLISH,
#mqtt_frame{
fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }}, State) ->
{err, qos2_not_supported, State};
process_request(?PUBLISH,
#mqtt_frame{
fixed = #mqtt_frame_fixed{ qos = Qos,
retain = Retain,
dup = Dup },
variable = #mqtt_frame_publish{ topic_name = Topic,
message_id = MessageId },
payload = Payload }, #state{ socket=Sock, message_id = MsgId } = State) ->
Msg = #mqtt_msg{ retain = Retain,
qos = Qos,
topic = Topic,
dup = Dup,
message_id = MessageId,
payload = Payload },
emqtt_router:route(Msg),
send_frame(Sock,
#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK },
variable = #mqtt_frame_publish{ message_id = MsgId}}),
{ok, State};
process_request(?SUBSCRIBE,
#mqtt_frame{
variable = #mqtt_frame_subscribe{ message_id = MessageId,
topic_table = Topics },
payload = undefined },
#state{socket=Sock} = State0) ->
QosResponse =
lists:foldl(fun (#mqtt_topic{ name = TopicName,
qos = Qos }, QosList) ->
SupportedQos = supported_subs_qos(Qos),
[SupportedQos | QosList]
end, [], Topics),
[emqtt_topic:insert(Name) || #mqtt_topic{name=Name} <- Topics],
[emqtt_router:insert(#subscriber{topic=emqtt_util:binary(Name), pid=self()})
|| #mqtt_topic{name=Name} <- Topics],
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?SUBACK },
variable = #mqtt_frame_suback{
message_id = MessageId,
qos_table = QosResponse }}),
{ok, State0};
process_request(?UNSUBSCRIBE,
#mqtt_frame{
variable = #mqtt_frame_subscribe{ message_id = MessageId,
topic_table = Topics },
payload = undefined }, #state{ socket = Sock, client_id = ClientId,
subscriptions = Subs0} = State) ->
[emqtt_router:delete(#subscriber{topic=Name, pid=self()})
|| #mqtt_topic{name=Name} <- Topics],
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK },
variable = #mqtt_frame_suback{ message_id = MessageId }}),
{ok, State #state{ subscriptions = Subs0 }};
process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock}=State) ->
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}),
{ok, State};
process_request(?DISCONNECT, #mqtt_frame{}, State) ->
{stop, State}.
next_msg_id(State = #state{ message_id = 16#ffff }) ->
State #state{ message_id = 1 };
next_msg_id(State = #state{ message_id = MsgId }) ->
State #state{ message_id = MsgId + 1 }.
maybe_clean_sess(false, _Conn, _ClientId) ->
% todo: establish subscription to deliver old unacknowledged messages
ok.
%%----------------------------------------------------------------------------
make_will_msg(#mqtt_frame_connect{ will_flag = false }) ->
undefined;
make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
will_qos = Qos,
will_topic = Topic,
will_msg = Msg }) ->
#mqtt_msg{ retain = Retain,
qos = Qos,
topic = Topic,
dup = false,
payload = Msg }.
creds(User, Pass) ->
{User, Pass}.
supported_subs_qos(?QOS_0) -> ?QOS_0;
supported_subs_qos(?QOS_1) -> ?QOS_1;
supported_subs_qos(?QOS_2) -> ?QOS_1.
send_will(#state{ will_msg = WillMsg }) ->
?INFO("willmsg: ~p~n", [WillMsg]).
send_frame(Sock, Frame) ->
erlang:port_command(Sock, emqtt_frame:serialise(Frame)).
%%----------------------------------------------------------------------------
network_error(_Reason,
State = #state{ conn_name = ConnStr,
proc_state = PState }) ->
error_logger:info_msg("MQTT detected network error for ~p~n", [ConnStr]),
emqtt_processor:send_will(PState),
State = #state{ conn_name = ConnStr}) ->
?INFO("MQTT detected network error for ~p~n", [ConnStr]),
send_will(State),
% todo: flush channel after publish
stop({shutdown, conn_closed}, State).
@ -189,3 +340,4 @@ control_throttle(State = #state{ connection_state = Flow,
stop(Reason, State ) ->
{stop, Reason, State}.

View File

@ -1,224 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%%Don't send amqp message
-module(emqtt_processor).
-export([info/2, initial_state/1,
process_frame/2, send_client/2, send_will/1]).
-include("emqtt.hrl").
-include("emqtt_frame.hrl").
-record(proc_state, { socket,
subscriptions,
consumer_tags,
unacked_pubs,
awaiting_ack,
awaiting_seqno,
message_id,
client_id,
clean_sess,
will_msg,
channels,
connection,
exchange }).
-define(FRAME_TYPE(Frame, Type),
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
initial_state(Socket) ->
#proc_state{ unacked_pubs = gb_trees:empty(),
awaiting_ack = gb_trees:empty(),
message_id = 1,
subscriptions = dict:new(),
consumer_tags = {undefined, undefined},
channels = {undefined, undefined},
exchange = emqtt_util:env(exchange),
socket = Socket }.
info(client_id, #proc_state{ client_id = ClientId }) -> ClientId.
process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
PState ) ->
process_request(Type, Frame, PState).
process_request(?CONNECT,
#mqtt_frame{ variable = #mqtt_frame_connect{
username = Username,
password = Password,
proto_ver = ProtoVersion,
clean_sess = CleanSess,
client_id = ClientId } = Var}, PState) ->
error_logger:info_msg("connect frame: ~p~n", [Var]),
{ReturnCode, PState1} =
case {ProtoVersion =:= ?MQTT_PROTO_MAJOR,
emqtt_util:valid_client_id(ClientId)} of
{false, _} ->
{?CONNACK_PROTO_VER, PState};
{_, false} ->
{?CONNACK_INVALID_ID, PState};
_ ->
case creds(Username, Password) of
nocreds ->
error_logger:error_msg("MQTT login failed - no credentials~n"),
{?CONNACK_CREDENTIALS, PState};
{UserBin, PassBin} ->
{?CONNACK_ACCEPT,
PState #proc_state{ will_msg = make_will_msg(Var),
client_id = ClientId }}
end
end,
send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK},
variable = #mqtt_frame_connack{
return_code = ReturnCode }}, PState1),
{ok, PState1};
process_request(?PUBACK,
#mqtt_frame{
variable = #mqtt_frame_publish{ message_id = MessageId }},
#proc_state{awaiting_ack = Awaiting } = PState) ->
{ok, PState #proc_state{ awaiting_ack = gb_trees:delete( MessageId, Awaiting)}};
process_request(?PUBLISH,
#mqtt_frame{
fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }}, PState) ->
{err, qos2_not_supported, PState};
process_request(?PUBLISH,
#mqtt_frame{
fixed = #mqtt_frame_fixed{ qos = Qos,
retain = Retain,
dup = Dup },
variable = #mqtt_frame_publish{ topic_name = Topic,
message_id = MessageId },
payload = Payload }, #proc_state{ message_id = MsgId } = PState) ->
Msg = #mqtt_msg{ retain = Retain,
qos = Qos,
topic = Topic,
dup = Dup,
message_id = MessageId,
payload = Payload },
emqtt_router:route(Msg),
send_client(
#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK },
variable = #mqtt_frame_publish{ message_id = MsgId}},
PState),
{ok, PState};
process_request(?SUBSCRIBE,
#mqtt_frame{
variable = #mqtt_frame_subscribe{ message_id = MessageId,
topic_table = Topics },
payload = undefined },
#proc_state{} = PState0) ->
QosResponse =
lists:foldl(fun (#mqtt_topic{ name = TopicName,
qos = Qos }, QosList) ->
SupportedQos = supported_subs_qos(Qos),
[SupportedQos | QosList]
end, [], Topics),
[emqtt_topic:insert(Name) || #mqtt_topic{name=Name} <- Topics],
[emqtt_router:insert(#subscriber{topic=emqtt_util:binary(Name), pid=self()})
|| #mqtt_topic{name=Name} <- Topics],
send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?SUBACK },
variable = #mqtt_frame_suback{
message_id = MessageId,
qos_table = QosResponse }}, PState0),
{ok, PState0};
process_request(?UNSUBSCRIBE,
#mqtt_frame{
variable = #mqtt_frame_subscribe{ message_id = MessageId,
topic_table = Topics },
payload = undefined }, #proc_state{ client_id = ClientId,
subscriptions = Subs0} = PState) ->
[emqtt_router:delete(#subscriber{topic=Name, pid=self()})
|| #mqtt_topic{name=Name} <- Topics],
send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK },
variable = #mqtt_frame_suback{ message_id = MessageId }},
PState),
{ok, PState #proc_state{ subscriptions = Subs0 }};
process_request(?PINGREQ, #mqtt_frame{}, PState) ->
send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }},
PState),
{ok, PState};
process_request(?DISCONNECT, #mqtt_frame{}, PState) ->
{stop, PState}.
next_msg_id(PState = #proc_state{ message_id = 16#ffff }) ->
PState #proc_state{ message_id = 1 };
next_msg_id(PState = #proc_state{ message_id = MsgId }) ->
PState #proc_state{ message_id = MsgId + 1 }.
%% decide at which qos level to deliver based on subscription
%% and the message publish qos level. non-MQTT publishes are
%% assumed to be qos 1, regardless of delivery_mode.
delivery_qos(Tag, _Headers, #proc_state{ consumer_tags = {Tag, _} }) ->
{?QOS_0, ?QOS_0};
delivery_qos(Tag, Headers, #proc_state{ consumer_tags = {_, Tag} }) ->
case emqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of
{byte, Qos} -> {lists:min([Qos, ?QOS_1]), ?QOS_1};
undefined -> {?QOS_1, ?QOS_1}
end.
maybe_clean_sess(false, _Conn, _ClientId) ->
% todo: establish subscription to deliver old unacknowledged messages
ok.
%%----------------------------------------------------------------------------
make_will_msg(#mqtt_frame_connect{ will_flag = false }) ->
undefined;
make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
will_qos = Qos,
will_topic = Topic,
will_msg = Msg }) ->
#mqtt_msg{ retain = Retain,
qos = Qos,
topic = Topic,
dup = false,
payload = Msg }.
creds(User, Pass) ->
{User, Pass}.
supported_subs_qos(?QOS_0) -> ?QOS_0;
supported_subs_qos(?QOS_1) -> ?QOS_1;
supported_subs_qos(?QOS_2) -> ?QOS_1.
send_will(PState = #proc_state{ will_msg = WillMsg }) ->
error_logger:info_msg("willmsg: ~p~n", [WillMsg]).
send_client(Frame, #proc_state{ socket = Sock }) ->
erlang:port_command(Sock, emqtt_frame:serialise(Frame)).