move 'process_request' to emqtt_protocol
This commit is contained in:
parent
2576463523
commit
29a8a0f283
|
@ -23,6 +23,8 @@
|
||||||
-define(MQTT_PROTO_MAJOR, 3).
|
-define(MQTT_PROTO_MAJOR, 3).
|
||||||
-define(MQTT_PROTO_MINOR, 1).
|
-define(MQTT_PROTO_MINOR, 1).
|
||||||
|
|
||||||
|
-define(CLIENT_ID_MAXLEN, 23).
|
||||||
|
|
||||||
%% frame types
|
%% frame types
|
||||||
|
|
||||||
-define(CONNECT, 1).
|
-define(CONNECT, 1).
|
||||||
|
@ -49,6 +51,21 @@
|
||||||
-define(CONNACK_CREDENTIALS, 4). %% bad user name or password
|
-define(CONNACK_CREDENTIALS, 4). %% bad user name or password
|
||||||
-define(CONNACK_AUTH, 5). %% not authorized
|
-define(CONNACK_AUTH, 5). %% not authorized
|
||||||
|
|
||||||
|
-record(state, {socket,
|
||||||
|
conn_name,
|
||||||
|
await_recv,
|
||||||
|
connection_state,
|
||||||
|
conserve,
|
||||||
|
parse_state,
|
||||||
|
message_id,
|
||||||
|
client_id,
|
||||||
|
clean_sess,
|
||||||
|
will_msg,
|
||||||
|
keep_alive,
|
||||||
|
awaiting_ack,
|
||||||
|
subtopics,
|
||||||
|
awaiting_rel}).
|
||||||
|
|
||||||
|
|
||||||
-record(mqtt_frame, {fixed,
|
-record(mqtt_frame, {fixed,
|
||||||
variable,
|
variable,
|
||||||
|
|
|
@ -43,24 +43,6 @@
|
||||||
|
|
||||||
-include("emqtt_frame.hrl").
|
-include("emqtt_frame.hrl").
|
||||||
|
|
||||||
-define(CLIENT_ID_MAXLEN, 23).
|
|
||||||
|
|
||||||
-record(state, {socket,
|
|
||||||
conn_name,
|
|
||||||
await_recv,
|
|
||||||
connection_state,
|
|
||||||
conserve,
|
|
||||||
parse_state,
|
|
||||||
message_id,
|
|
||||||
client_id,
|
|
||||||
clean_sess,
|
|
||||||
will_msg,
|
|
||||||
keep_alive,
|
|
||||||
awaiting_ack,
|
|
||||||
subtopics,
|
|
||||||
awaiting_rel}).
|
|
||||||
|
|
||||||
|
|
||||||
-define(FRAME_TYPE(Frame, Type),
|
-define(FRAME_TYPE(Frame, Type),
|
||||||
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
|
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
|
||||||
|
|
||||||
|
@ -233,140 +215,6 @@ process_frame(Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}},
|
||||||
{err, Reason, State}
|
{err, Reason, State}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
process_request(?CONNECT,
|
|
||||||
#mqtt_frame{ variable = #mqtt_frame_connect{
|
|
||||||
username = Username,
|
|
||||||
password = Password,
|
|
||||||
proto_ver = ProtoVersion,
|
|
||||||
clean_sess = CleanSess,
|
|
||||||
keep_alive = AlivePeriod,
|
|
||||||
client_id = ClientId } = Var}, #state{socket = Sock} = State) ->
|
|
||||||
{ReturnCode, State1} =
|
|
||||||
case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
|
|
||||||
valid_client_id(ClientId)} of
|
|
||||||
{false, _} ->
|
|
||||||
{?CONNACK_PROTO_VER, State};
|
|
||||||
{_, false} ->
|
|
||||||
{?CONNACK_INVALID_ID, State};
|
|
||||||
_ ->
|
|
||||||
case emqtt_auth:check(Username, Password) of
|
|
||||||
false ->
|
|
||||||
?ERROR_MSG("MQTT login failed - no credentials"),
|
|
||||||
{?CONNACK_CREDENTIALS, State};
|
|
||||||
true ->
|
|
||||||
?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]),
|
|
||||||
emqtt_cm:create(ClientId, self()),
|
|
||||||
KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
|
|
||||||
{?CONNACK_ACCEPT,
|
|
||||||
State #state{ will_msg = make_will_msg(Var),
|
|
||||||
client_id = ClientId,
|
|
||||||
keep_alive = KeepAlive}}
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
?INFO("recv conn...:~p", [ReturnCode]),
|
|
||||||
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK},
|
|
||||||
variable = #mqtt_frame_connack{
|
|
||||||
return_code = ReturnCode }}),
|
|
||||||
{ok, State1};
|
|
||||||
|
|
||||||
process_request(?PUBLISH, Frame=#mqtt_frame{
|
|
||||||
fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) ->
|
|
||||||
emqtt_pubsub:publish(make_msg(Frame)),
|
|
||||||
{ok, State};
|
|
||||||
|
|
||||||
process_request(?PUBLISH,
|
|
||||||
Frame=#mqtt_frame{
|
|
||||||
fixed = #mqtt_frame_fixed{qos = ?QOS_1},
|
|
||||||
variable = #mqtt_frame_publish{message_id = MsgId}},
|
|
||||||
State=#state{socket=Sock}) ->
|
|
||||||
emqtt_pubsub:publish(make_msg(Frame)),
|
|
||||||
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK },
|
|
||||||
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
|
||||||
{ok, State};
|
|
||||||
|
|
||||||
process_request(?PUBLISH,
|
|
||||||
Frame=#mqtt_frame{
|
|
||||||
fixed = #mqtt_frame_fixed{qos = ?QOS_2},
|
|
||||||
variable = #mqtt_frame_publish{message_id = MsgId}},
|
|
||||||
State=#state{socket=Sock}) ->
|
|
||||||
emqtt_pubsub:publish(make_msg(Frame)),
|
|
||||||
put({msg, MsgId}, pubrec),
|
|
||||||
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC},
|
|
||||||
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
|
||||||
|
|
||||||
{ok, State};
|
|
||||||
|
|
||||||
process_request(?PUBACK, #mqtt_frame{}, State) ->
|
|
||||||
%TODO: fixme later
|
|
||||||
{ok, State};
|
|
||||||
|
|
||||||
process_request(?PUBREC, #mqtt_frame{
|
|
||||||
variable = #mqtt_frame_publish{message_id = MsgId}},
|
|
||||||
State=#state{socket=Sock}) ->
|
|
||||||
%TODO: fixme later
|
|
||||||
send_frame(Sock,
|
|
||||||
#mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBREL},
|
|
||||||
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
|
||||||
{ok, State};
|
|
||||||
|
|
||||||
process_request(?PUBREL,
|
|
||||||
#mqtt_frame{
|
|
||||||
variable = #mqtt_frame_publish{message_id = MsgId}},
|
|
||||||
State=#state{socket=Sock}) ->
|
|
||||||
erase({msg, MsgId}),
|
|
||||||
send_frame(Sock,
|
|
||||||
#mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBCOMP},
|
|
||||||
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
|
||||||
{ok, State};
|
|
||||||
|
|
||||||
process_request(?PUBCOMP, #mqtt_frame{
|
|
||||||
variable = #mqtt_frame_publish{message_id = _MsgId}}, State) ->
|
|
||||||
%TODO: fixme later
|
|
||||||
{ok, State};
|
|
||||||
|
|
||||||
process_request(?SUBSCRIBE,
|
|
||||||
#mqtt_frame{
|
|
||||||
variable = #mqtt_frame_subscribe{message_id = MessageId,
|
|
||||||
topic_table = Topics},
|
|
||||||
payload = undefined},
|
|
||||||
#state{socket=Sock} = State) ->
|
|
||||||
|
|
||||||
[emqtt_pubsub:subscribe({Name, Qos}, self()) ||
|
|
||||||
#mqtt_topic{name=Name, qos=Qos} <- Topics],
|
|
||||||
|
|
||||||
GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics],
|
|
||||||
|
|
||||||
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
|
|
||||||
variable = #mqtt_frame_suback{
|
|
||||||
message_id = MessageId,
|
|
||||||
qos_table = GrantedQos}}),
|
|
||||||
|
|
||||||
{ok, State};
|
|
||||||
|
|
||||||
process_request(?UNSUBSCRIBE,
|
|
||||||
#mqtt_frame{
|
|
||||||
variable = #mqtt_frame_subscribe{message_id = MessageId,
|
|
||||||
topic_table = Topics },
|
|
||||||
payload = undefined}, #state{socket = Sock, client_id = ClientId} = State) ->
|
|
||||||
|
|
||||||
|
|
||||||
[emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
|
|
||||||
|
|
||||||
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK },
|
|
||||||
variable = #mqtt_frame_suback{message_id = MessageId }}),
|
|
||||||
|
|
||||||
{ok, State};
|
|
||||||
|
|
||||||
process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) ->
|
|
||||||
%Keep alive timer
|
|
||||||
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
|
|
||||||
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}),
|
|
||||||
{ok, State#state{keep_alive=KeepAlive1}};
|
|
||||||
|
|
||||||
process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) ->
|
|
||||||
?INFO("~s disconnected", [ClientId]),
|
|
||||||
{stop, State}.
|
|
||||||
|
|
||||||
next_msg_id(State = #state{ message_id = 16#ffff }) ->
|
next_msg_id(State = #state{ message_id = 16#ffff }) ->
|
||||||
State #state{ message_id = 1 };
|
State #state{ message_id = 1 };
|
||||||
next_msg_id(State = #state{ message_id = MsgId }) ->
|
next_msg_id(State = #state{ message_id = MsgId }) ->
|
||||||
|
|
|
@ -1,5 +1,167 @@
|
||||||
|
%%-----------------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2014, Feng Lee <feng@slimchat.io>
|
||||||
|
%%
|
||||||
|
%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
%% of this software and associated documentation files (the "Software"), to deal
|
||||||
|
%% in the Software without restriction, including without limitation the rights
|
||||||
|
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
%% copies of the Software, and to permit persons to whom the Software is
|
||||||
|
%% furnished to do so, subject to the following conditions:
|
||||||
|
%%
|
||||||
|
%% The above copyright notice and this permission notice shall be included in all
|
||||||
|
%% copies or substantial portions of the Software.
|
||||||
|
%%
|
||||||
|
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
%% SOFTWARE.
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqtt_protocol).
|
-module(emqtt_protocol).
|
||||||
|
|
||||||
|
-include("emqtt.hrl").
|
||||||
|
|
||||||
|
-include("emqtt_log.hrl").
|
||||||
|
|
||||||
-include("emqtt_frame.hrl").
|
-include("emqtt_frame.hrl").
|
||||||
|
|
||||||
|
-export([process_request/3]).
|
||||||
|
|
||||||
|
process_request(?CONNECT,
|
||||||
|
#mqtt_frame{ variable = #mqtt_frame_connect{
|
||||||
|
username = Username,
|
||||||
|
password = Password,
|
||||||
|
proto_ver = ProtoVersion,
|
||||||
|
clean_sess = CleanSess,
|
||||||
|
keep_alive = AlivePeriod,
|
||||||
|
client_id = ClientId } = Var}, #state{socket = Sock} = State) ->
|
||||||
|
{ReturnCode, State1} =
|
||||||
|
case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
|
||||||
|
valid_client_id(ClientId)} of
|
||||||
|
{false, _} ->
|
||||||
|
{?CONNACK_PROTO_VER, State};
|
||||||
|
{_, false} ->
|
||||||
|
{?CONNACK_INVALID_ID, State};
|
||||||
|
_ ->
|
||||||
|
case emqtt_auth:check(Username, Password) of
|
||||||
|
false ->
|
||||||
|
?ERROR_MSG("MQTT login failed - no credentials"),
|
||||||
|
{?CONNACK_CREDENTIALS, State};
|
||||||
|
true ->
|
||||||
|
?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]),
|
||||||
|
emqtt_cm:create(ClientId, self()),
|
||||||
|
KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
|
||||||
|
{?CONNACK_ACCEPT,
|
||||||
|
State #state{ will_msg = make_will_msg(Var),
|
||||||
|
client_id = ClientId,
|
||||||
|
keep_alive = KeepAlive}}
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
?INFO("recv conn...:~p", [ReturnCode]),
|
||||||
|
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK},
|
||||||
|
variable = #mqtt_frame_connack{
|
||||||
|
return_code = ReturnCode }}),
|
||||||
|
{ok, State1};
|
||||||
|
|
||||||
|
process_request(?PUBLISH, Frame=#mqtt_frame{
|
||||||
|
fixed = #mqtt_frame_fixed{qos = ?QOS_0}}, State) ->
|
||||||
|
emqtt_pubsub:publish(make_msg(Frame)),
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
|
process_request(?PUBLISH,
|
||||||
|
Frame=#mqtt_frame{
|
||||||
|
fixed = #mqtt_frame_fixed{qos = ?QOS_1},
|
||||||
|
variable = #mqtt_frame_publish{message_id = MsgId}},
|
||||||
|
State=#state{socket=Sock}) ->
|
||||||
|
emqtt_pubsub:publish(make_msg(Frame)),
|
||||||
|
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBACK },
|
||||||
|
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
|
process_request(?PUBLISH,
|
||||||
|
Frame=#mqtt_frame{
|
||||||
|
fixed = #mqtt_frame_fixed{qos = ?QOS_2},
|
||||||
|
variable = #mqtt_frame_publish{message_id = MsgId}},
|
||||||
|
State=#state{socket=Sock}) ->
|
||||||
|
emqtt_pubsub:publish(make_msg(Frame)),
|
||||||
|
put({msg, MsgId}, pubrec),
|
||||||
|
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBREC},
|
||||||
|
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
||||||
|
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
|
process_request(?PUBACK, #mqtt_frame{}, State) ->
|
||||||
|
%TODO: fixme later
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
|
process_request(?PUBREC, #mqtt_frame{
|
||||||
|
variable = #mqtt_frame_publish{message_id = MsgId}},
|
||||||
|
State=#state{socket=Sock}) ->
|
||||||
|
%TODO: fixme later
|
||||||
|
send_frame(Sock,
|
||||||
|
#mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBREL},
|
||||||
|
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
|
process_request(?PUBREL,
|
||||||
|
#mqtt_frame{
|
||||||
|
variable = #mqtt_frame_publish{message_id = MsgId}},
|
||||||
|
State=#state{socket=Sock}) ->
|
||||||
|
erase({msg, MsgId}),
|
||||||
|
send_frame(Sock,
|
||||||
|
#mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBCOMP},
|
||||||
|
variable = #mqtt_frame_publish{ message_id = MsgId}}),
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
|
process_request(?PUBCOMP, #mqtt_frame{
|
||||||
|
variable = #mqtt_frame_publish{message_id = _MsgId}}, State) ->
|
||||||
|
%TODO: fixme later
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
|
process_request(?SUBSCRIBE,
|
||||||
|
#mqtt_frame{
|
||||||
|
variable = #mqtt_frame_subscribe{message_id = MessageId,
|
||||||
|
topic_table = Topics},
|
||||||
|
payload = undefined},
|
||||||
|
#state{socket=Sock} = State) ->
|
||||||
|
|
||||||
|
[emqtt_pubsub:subscribe({Name, Qos}, self()) ||
|
||||||
|
#mqtt_topic{name=Name, qos=Qos} <- Topics],
|
||||||
|
|
||||||
|
GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics],
|
||||||
|
|
||||||
|
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
|
||||||
|
variable = #mqtt_frame_suback{
|
||||||
|
message_id = MessageId,
|
||||||
|
qos_table = GrantedQos}}),
|
||||||
|
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
|
process_request(?UNSUBSCRIBE,
|
||||||
|
#mqtt_frame{
|
||||||
|
variable = #mqtt_frame_subscribe{message_id = MessageId,
|
||||||
|
topic_table = Topics },
|
||||||
|
payload = undefined}, #state{socket = Sock, client_id = ClientId} = State) ->
|
||||||
|
|
||||||
|
|
||||||
|
[emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
|
||||||
|
|
||||||
|
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?UNSUBACK },
|
||||||
|
variable = #mqtt_frame_suback{message_id = MessageId }}),
|
||||||
|
|
||||||
|
{ok, State};
|
||||||
|
|
||||||
|
process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) ->
|
||||||
|
%Keep alive timer
|
||||||
|
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
|
||||||
|
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}),
|
||||||
|
{ok, State#state{keep_alive=KeepAlive1}};
|
||||||
|
|
||||||
|
process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) ->
|
||||||
|
?INFO("~s disconnected", [ClientId]),
|
||||||
|
{stop, State}.
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue