diff --git a/apps/emqtt/include/emqtt_frame.hrl b/apps/emqtt/include/emqtt_frame.hrl index 9c233c6e5..5ea69a335 100644 --- a/apps/emqtt/include/emqtt_frame.hrl +++ b/apps/emqtt/include/emqtt_frame.hrl @@ -18,14 +18,12 @@ %% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% --define(CLIENT_ID_MAXLEN, 1024). - -define(PROTOCOL_NAMES, [{3, <<"MQIsdp">>}, {4, <<"MQTT">>}]). -define(MQTT_PROTO_MAJOR, 3). -define(MQTT_PROTO_MINOR, 1). --define(CLIENT_ID_MAXLEN, 23). +-define(CLIENT_ID_MAXLEN, 1024). %% frame types diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index ef7a01d2d..a5f3c0941 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -43,20 +43,21 @@ -include("emqtt_frame.hrl"). --record(state, {socket, - conn_name, - await_recv, - connection_state, - conserve, - parse_state, - message_id, - client_id, - clean_sess, - will_msg, - keep_alive, - awaiting_ack, - subtopics, - awaiting_rel}). +%% +%-record(state, {socket, +% conn_name, +% await_recv, +% connection_state, +% conserve, +% parse_state, +% message_id, +% client_id, +% clean_sess, +% will_msg, +% keep_alive, +% awaiting_ack, +% subtopics, +% awaiting_rel}). -define(FRAME_TYPE(Frame, Type), @@ -226,7 +227,7 @@ process_frame(Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}}, ok -> ?INFO("frame from ~s: ~p", [ClientId, Frame]), handle_retained(Type, Frame), - process_request(Type, Frame, State#state{keep_alive=KeepAlive1}); + emqtt_protocol:process_request(Type, Frame, State#state{keep_alive=KeepAlive1}); {error, Reason} -> {err, Reason, State} end. diff --git a/apps/emqtt/src/emqtt_ctl.erl b/apps/emqtt/src/emqtt_ctl.erl index 9962ed73d..e24587914 100644 --- a/apps/emqtt/src/emqtt_ctl.erl +++ b/apps/emqtt/src/emqtt_ctl.erl @@ -29,10 +29,9 @@ -include("emqtt_log.hrl"). -export([status/1, - cluster_info/1, cluster/1, - add_user/1, - delete_user/1]). + useradd/1, + userdel/1]). status([]) -> {InternalStatus, _ProvidedStatus} = init:get_status(), @@ -44,26 +43,47 @@ status([]) -> ?PRINT_MSG("emqtt is running~n") end. -cluster_info([]) -> +cluster([]) -> Nodes = [node()|nodes()], - ?PRINT("cluster nodes: ~p~n", [Nodes]). + ?PRINT("cluster nodes: ~p~n", [Nodes]); cluster([SNode]) -> - Node = list_to_atom(SNode), + Node = node_name(SNode), case net_adm:ping(Node) of pong -> application:stop(emqtt), + application:stop(esockd), mnesia:stop(), mnesia:start(), mnesia:change_config(extra_db_nodes, [Node]), + application:start(esockd), application:start(emqtt), ?PRINT("cluster with ~p successfully.~n", [Node]); pang -> ?PRINT("failed to connect to ~p~n", [Node]) end. -add_user([Username, Password]) -> + +useradd([Username, Password]) -> ?PRINT("~p", [emqtt_auth:add(list_to_binary(Username), list_to_binary(Password))]). -delete_user([Username]) -> +userdel([Username]) -> ?PRINT("~p", [emqtt_auth:delete(list_to_binary(Username))]). + +node_name(SNode) -> + SNode1 = + case string:tokens(SNode, "@") of + [_Node, _Server] -> + SNode; + _ -> + case net_kernel:longnames() of + true -> + SNode ++ "@" ++ inet_db:gethostname() ++ + "." ++ inet_db:res_option(domain); + false -> + SNode ++ "@" ++ inet_db:gethostname(); + _ -> + SNode + end + end, + list_to_atom(SNode1). diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index 66af58d4f..39eec70e5 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -164,4 +164,39 @@ process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) -> ?INFO("~s disconnected", [ClientId]), {stop, State}. +send_frame(Sock, Frame) -> + ?INFO("send frame:~p", [Frame]), + erlang:port_command(Sock, emqtt_frame:serialise(Frame)). +valid_client_id(ClientId) -> + ClientIdLen = size(ClientId), + 1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN. + +validate_frame(_Type, _Frame) -> + ok. + +make_msg(#mqtt_frame{ + fixed = #mqtt_frame_fixed{qos = Qos, + retain = Retain, + dup = Dup}, + variable = #mqtt_frame_publish{topic_name = Topic, + message_id = MessageId}, + payload = Payload}) -> + #mqtt_msg{retain = Retain, + qos = Qos, + topic = Topic, + dup = Dup, + msgid = MessageId, + payload = Payload}. + +make_will_msg(#mqtt_frame_connect{ will_flag = false }) -> + undefined; +make_will_msg(#mqtt_frame_connect{ will_retain = Retain, + will_qos = Qos, + will_topic = Topic, + will_msg = Msg }) -> + #mqtt_msg{retain = Retain, + qos = Qos, + topic = Topic, + dup = false, + payload = Msg }. diff --git a/apps/emqtt/src/emqtt_router.erl b/apps/emqtt/src/emqtt_router.erl new file mode 100644 index 000000000..d8b9492c0 --- /dev/null +++ b/apps/emqtt/src/emqtt_router.erl @@ -0,0 +1,23 @@ +%%----------------------------------------------------------------------------- +%% Copyright (c) 2014, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ + +-module(emqtt_router). diff --git a/rel/files/emqtt_ctl b/rel/files/emqtt_ctl index becdee431..444439801 100755 --- a/rel/files/emqtt_ctl +++ b/rel/files/emqtt_ctl @@ -97,27 +97,9 @@ case "$1" in $NODETOOL rpc emqtt_ctl status $@ ;; - cluster_info) - if [ $# -ne 1 ]; then - echo "Usage: $SCRIPT cluster_info" - exit 1 - fi - - # Make sure the local node IS running - RES=`$NODETOOL ping` - if [ "$RES" != "pong" ]; then - echo "Node is not running!" - exit 1 - fi - shift - - $NODETOOL rpc emqtt_ctl cluster_info $@ - ;; - - cluster) - if [ $# -ne 2 ]; then - echo "Usage: $SCRIPT cluster " + if [ $# -gt 2 ]; then + echo "Usage: $SCRIPT cluster []" exit 1 fi @@ -132,9 +114,9 @@ case "$1" in $NODETOOL rpc emqtt_ctl cluster $@ ;; - add_user) + useradd) if [ $# -ne 3 ]; then - echo "Usage: $SCRIPT add_user " + echo "Usage: $SCRIPT useradd " exit 1 fi @@ -146,12 +128,12 @@ case "$1" in fi shift - $NODETOOL rpc emqtt_ctl add_user $@ + $NODETOOL rpc emqtt_ctl useradd $@ ;; - delete_user) + userdel) if [ $# -ne 2 ]; then - echo "Usage: $SCRIPT delete_user " + echo "Usage: $SCRIPT userdel " exit 1 fi @@ -163,16 +145,15 @@ case "$1" in fi shift - $NODETOOL rpc emqtt_ctl delete_user $@ + $NODETOOL rpc emqtt_ctl userdel $@ ;; *) echo "Usage: $SCRIPT" - echo " status #query emqtt status" - echo " cluster_info #query cluster nodes" - echo " cluster #cluster node" - echo " add_user #add user" - echo " delete_user #delete user" + echo " status #query emqtt status" + echo " cluster [] #query or cluster nodes" + echo " useradd #add user" + echo " userdel #delete user" exit 1 ;;