protocol and router

This commit is contained in:
Ery Lee 2014-12-30 13:12:17 +08:00
parent 7ab3194747
commit 930e9f4f9f
6 changed files with 115 additions and 57 deletions

View File

@ -18,14 +18,12 @@
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%% %%
-define(CLIENT_ID_MAXLEN, 1024).
-define(PROTOCOL_NAMES, [{3, <<"MQIsdp">>}, {4, <<"MQTT">>}]). -define(PROTOCOL_NAMES, [{3, <<"MQIsdp">>}, {4, <<"MQTT">>}]).
-define(MQTT_PROTO_MAJOR, 3). -define(MQTT_PROTO_MAJOR, 3).
-define(MQTT_PROTO_MINOR, 1). -define(MQTT_PROTO_MINOR, 1).
-define(CLIENT_ID_MAXLEN, 23). -define(CLIENT_ID_MAXLEN, 1024).
%% frame types %% frame types

View File

@ -43,20 +43,21 @@
-include("emqtt_frame.hrl"). -include("emqtt_frame.hrl").
-record(state, {socket, %%
conn_name, %-record(state, {socket,
await_recv, % conn_name,
connection_state, % await_recv,
conserve, % connection_state,
parse_state, % conserve,
message_id, % parse_state,
client_id, % message_id,
clean_sess, % client_id,
will_msg, % clean_sess,
keep_alive, % will_msg,
awaiting_ack, % keep_alive,
subtopics, % awaiting_ack,
awaiting_rel}). % subtopics,
% awaiting_rel}).
-define(FRAME_TYPE(Frame, Type), -define(FRAME_TYPE(Frame, Type),
@ -226,7 +227,7 @@ process_frame(Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}},
ok -> ok ->
?INFO("frame from ~s: ~p", [ClientId, Frame]), ?INFO("frame from ~s: ~p", [ClientId, Frame]),
handle_retained(Type, Frame), handle_retained(Type, Frame),
process_request(Type, Frame, State#state{keep_alive=KeepAlive1}); emqtt_protocol:process_request(Type, Frame, State#state{keep_alive=KeepAlive1});
{error, Reason} -> {error, Reason} ->
{err, Reason, State} {err, Reason, State}
end. end.

View File

@ -29,10 +29,9 @@
-include("emqtt_log.hrl"). -include("emqtt_log.hrl").
-export([status/1, -export([status/1,
cluster_info/1,
cluster/1, cluster/1,
add_user/1, useradd/1,
delete_user/1]). userdel/1]).
status([]) -> status([]) ->
{InternalStatus, _ProvidedStatus} = init:get_status(), {InternalStatus, _ProvidedStatus} = init:get_status(),
@ -44,26 +43,47 @@ status([]) ->
?PRINT_MSG("emqtt is running~n") ?PRINT_MSG("emqtt is running~n")
end. end.
cluster_info([]) -> cluster([]) ->
Nodes = [node()|nodes()], Nodes = [node()|nodes()],
?PRINT("cluster nodes: ~p~n", [Nodes]). ?PRINT("cluster nodes: ~p~n", [Nodes]);
cluster([SNode]) -> cluster([SNode]) ->
Node = list_to_atom(SNode), Node = node_name(SNode),
case net_adm:ping(Node) of case net_adm:ping(Node) of
pong -> pong ->
application:stop(emqtt), application:stop(emqtt),
application:stop(esockd),
mnesia:stop(), mnesia:stop(),
mnesia:start(), mnesia:start(),
mnesia:change_config(extra_db_nodes, [Node]), mnesia:change_config(extra_db_nodes, [Node]),
application:start(esockd),
application:start(emqtt), application:start(emqtt),
?PRINT("cluster with ~p successfully.~n", [Node]); ?PRINT("cluster with ~p successfully.~n", [Node]);
pang -> pang ->
?PRINT("failed to connect to ~p~n", [Node]) ?PRINT("failed to connect to ~p~n", [Node])
end. end.
add_user([Username, Password]) ->
useradd([Username, Password]) ->
?PRINT("~p", [emqtt_auth:add(list_to_binary(Username), list_to_binary(Password))]). ?PRINT("~p", [emqtt_auth:add(list_to_binary(Username), list_to_binary(Password))]).
delete_user([Username]) -> userdel([Username]) ->
?PRINT("~p", [emqtt_auth:delete(list_to_binary(Username))]). ?PRINT("~p", [emqtt_auth:delete(list_to_binary(Username))]).
node_name(SNode) ->
SNode1 =
case string:tokens(SNode, "@") of
[_Node, _Server] ->
SNode;
_ ->
case net_kernel:longnames() of
true ->
SNode ++ "@" ++ inet_db:gethostname() ++
"." ++ inet_db:res_option(domain);
false ->
SNode ++ "@" ++ inet_db:gethostname();
_ ->
SNode
end
end,
list_to_atom(SNode1).

View File

@ -164,4 +164,39 @@ process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) ->
?INFO("~s disconnected", [ClientId]), ?INFO("~s disconnected", [ClientId]),
{stop, State}. {stop, State}.
send_frame(Sock, Frame) ->
?INFO("send frame:~p", [Frame]),
erlang:port_command(Sock, emqtt_frame:serialise(Frame)).
valid_client_id(ClientId) ->
ClientIdLen = size(ClientId),
1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN.
validate_frame(_Type, _Frame) ->
ok.
make_msg(#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = Qos,
retain = Retain,
dup = Dup},
variable = #mqtt_frame_publish{topic_name = Topic,
message_id = MessageId},
payload = Payload}) ->
#mqtt_msg{retain = Retain,
qos = Qos,
topic = Topic,
dup = Dup,
msgid = MessageId,
payload = Payload}.
make_will_msg(#mqtt_frame_connect{ will_flag = false }) ->
undefined;
make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
will_qos = Qos,
will_topic = Topic,
will_msg = Msg }) ->
#mqtt_msg{retain = Retain,
qos = Qos,
topic = Topic,
dup = false,
payload = Msg }.

View File

@ -0,0 +1,23 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2014, Feng Lee <feng@slimchat.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_router).

View File

@ -97,27 +97,9 @@ case "$1" in
$NODETOOL rpc emqtt_ctl status $@ $NODETOOL rpc emqtt_ctl status $@
;; ;;
cluster_info)
if [ $# -ne 1 ]; then
echo "Usage: $SCRIPT cluster_info"
exit 1
fi
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "Node is not running!"
exit 1
fi
shift
$NODETOOL rpc emqtt_ctl cluster_info $@
;;
cluster) cluster)
if [ $# -ne 2 ]; then if [ $# -gt 2 ]; then
echo "Usage: $SCRIPT cluster <Node>" echo "Usage: $SCRIPT cluster [<Node>]"
exit 1 exit 1
fi fi
@ -132,9 +114,9 @@ case "$1" in
$NODETOOL rpc emqtt_ctl cluster $@ $NODETOOL rpc emqtt_ctl cluster $@
;; ;;
add_user) useradd)
if [ $# -ne 3 ]; then if [ $# -ne 3 ]; then
echo "Usage: $SCRIPT add_user <Username> <Password>" echo "Usage: $SCRIPT useradd <Username> <Password>"
exit 1 exit 1
fi fi
@ -146,12 +128,12 @@ case "$1" in
fi fi
shift shift
$NODETOOL rpc emqtt_ctl add_user $@ $NODETOOL rpc emqtt_ctl useradd $@
;; ;;
delete_user) userdel)
if [ $# -ne 2 ]; then if [ $# -ne 2 ]; then
echo "Usage: $SCRIPT delete_user <Username>" echo "Usage: $SCRIPT userdel <Username>"
exit 1 exit 1
fi fi
@ -163,16 +145,15 @@ case "$1" in
fi fi
shift shift
$NODETOOL rpc emqtt_ctl delete_user $@ $NODETOOL rpc emqtt_ctl userdel $@
;; ;;
*) *)
echo "Usage: $SCRIPT" echo "Usage: $SCRIPT"
echo " status #query emqtt status" echo " status #query emqtt status"
echo " cluster_info #query cluster nodes" echo " cluster [<Node>] #query or cluster nodes"
echo " cluster <Node> #cluster node" echo " useradd <Username> <Password> #add user"
echo " add_user <Username> <Password> #add user" echo " userdel <Username> #delete user"
echo " delete_user <Username> #delete user"
exit 1 exit 1
;; ;;