merge
This commit is contained in:
commit
484cf8ed79
9
TODO
9
TODO
|
@ -17,3 +17,12 @@
|
||||||
temporary, 5000, worker, [emqtt_client]}]}}.
|
temporary, 5000, worker, [emqtt_client]}]}}.
|
||||||
|
|
||||||
fucking stupid..... esockd locked
|
fucking stupid..... esockd locked
|
||||||
|
|
||||||
|
0.2.1
|
||||||
|
=====
|
||||||
|
|
||||||
|
full MQTT 3.1.1 support...
|
||||||
|
|
||||||
|
node cluster....
|
||||||
|
|
||||||
|
one million connections test...
|
||||||
|
|
|
@ -18,13 +18,13 @@
|
||||||
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
|
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
|
||||||
%%
|
%%
|
||||||
|
|
||||||
-define(CLIENT_ID_MAXLEN, 1024).
|
|
||||||
|
|
||||||
-define(PROTOCOL_NAMES, [{3, <<"MQIsdp">>}, {4, <<"MQTT">>}]).
|
-define(PROTOCOL_NAMES, [{3, <<"MQIsdp">>}, {4, <<"MQTT">>}]).
|
||||||
|
|
||||||
-define(MQTT_PROTO_MAJOR, 3).
|
-define(MQTT_PROTO_MAJOR, 3).
|
||||||
-define(MQTT_PROTO_MINOR, 1).
|
-define(MQTT_PROTO_MINOR, 1).
|
||||||
|
|
||||||
|
-define(CLIENT_ID_MAXLEN, 1024).
|
||||||
|
|
||||||
%% frame types
|
%% frame types
|
||||||
|
|
||||||
-define(CONNECT, 1).
|
-define(CONNECT, 1).
|
||||||
|
@ -104,4 +104,3 @@
|
||||||
|
|
||||||
-record(mqtt_frame_other, {other}).
|
-record(mqtt_frame_other, {other}).
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -43,20 +43,21 @@
|
||||||
|
|
||||||
-include("emqtt_frame.hrl").
|
-include("emqtt_frame.hrl").
|
||||||
|
|
||||||
-record(state, {socket,
|
%%
|
||||||
conn_name,
|
%-record(state, {socket,
|
||||||
await_recv,
|
% conn_name,
|
||||||
connection_state,
|
% await_recv,
|
||||||
conserve,
|
% connection_state,
|
||||||
parse_state,
|
% conserve,
|
||||||
message_id,
|
% parse_state,
|
||||||
client_id,
|
% message_id,
|
||||||
clean_sess,
|
% client_id,
|
||||||
will_msg,
|
% clean_sess,
|
||||||
keep_alive,
|
% will_msg,
|
||||||
awaiting_ack,
|
% keep_alive,
|
||||||
subtopics,
|
% awaiting_ack,
|
||||||
awaiting_rel}).
|
% subtopics,
|
||||||
|
% awaiting_rel}).
|
||||||
|
|
||||||
|
|
||||||
-define(FRAME_TYPE(Frame, Type),
|
-define(FRAME_TYPE(Frame, Type),
|
||||||
|
@ -226,7 +227,7 @@ process_frame(Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}},
|
||||||
ok ->
|
ok ->
|
||||||
?INFO("frame from ~s: ~p", [ClientId, Frame]),
|
?INFO("frame from ~s: ~p", [ClientId, Frame]),
|
||||||
handle_retained(Type, Frame),
|
handle_retained(Type, Frame),
|
||||||
process_request(Type, Frame, State#state{keep_alive=KeepAlive1});
|
emqtt_protocol:process_request(Type, Frame, State#state{keep_alive=KeepAlive1});
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{err, Reason, State}
|
{err, Reason, State}
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -29,10 +29,9 @@
|
||||||
-include("emqtt_log.hrl").
|
-include("emqtt_log.hrl").
|
||||||
|
|
||||||
-export([status/1,
|
-export([status/1,
|
||||||
cluster_info/1,
|
|
||||||
cluster/1,
|
cluster/1,
|
||||||
add_user/1,
|
useradd/1,
|
||||||
delete_user/1]).
|
userdel/1]).
|
||||||
|
|
||||||
status([]) ->
|
status([]) ->
|
||||||
{InternalStatus, _ProvidedStatus} = init:get_status(),
|
{InternalStatus, _ProvidedStatus} = init:get_status(),
|
||||||
|
@ -44,26 +43,47 @@ status([]) ->
|
||||||
?PRINT_MSG("emqtt is running~n")
|
?PRINT_MSG("emqtt is running~n")
|
||||||
end.
|
end.
|
||||||
|
|
||||||
cluster_info([]) ->
|
cluster([]) ->
|
||||||
Nodes = [node()|nodes()],
|
Nodes = [node()|nodes()],
|
||||||
?PRINT("cluster nodes: ~p~n", [Nodes]).
|
?PRINT("cluster nodes: ~p~n", [Nodes]);
|
||||||
|
|
||||||
cluster([SNode]) ->
|
cluster([SNode]) ->
|
||||||
Node = list_to_atom(SNode),
|
Node = node_name(SNode),
|
||||||
case net_adm:ping(Node) of
|
case net_adm:ping(Node) of
|
||||||
pong ->
|
pong ->
|
||||||
application:stop(emqtt),
|
application:stop(emqtt),
|
||||||
|
application:stop(esockd),
|
||||||
mnesia:stop(),
|
mnesia:stop(),
|
||||||
mnesia:start(),
|
mnesia:start(),
|
||||||
mnesia:change_config(extra_db_nodes, [Node]),
|
mnesia:change_config(extra_db_nodes, [Node]),
|
||||||
|
application:start(esockd),
|
||||||
application:start(emqtt),
|
application:start(emqtt),
|
||||||
?PRINT("cluster with ~p successfully.~n", [Node]);
|
?PRINT("cluster with ~p successfully.~n", [Node]);
|
||||||
pang ->
|
pang ->
|
||||||
?PRINT("failed to connect to ~p~n", [Node])
|
?PRINT("failed to connect to ~p~n", [Node])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
add_user([Username, Password]) ->
|
|
||||||
|
useradd([Username, Password]) ->
|
||||||
?PRINT("~p", [emqtt_auth:add(list_to_binary(Username), list_to_binary(Password))]).
|
?PRINT("~p", [emqtt_auth:add(list_to_binary(Username), list_to_binary(Password))]).
|
||||||
|
|
||||||
delete_user([Username]) ->
|
userdel([Username]) ->
|
||||||
?PRINT("~p", [emqtt_auth:delete(list_to_binary(Username))]).
|
?PRINT("~p", [emqtt_auth:delete(list_to_binary(Username))]).
|
||||||
|
|
||||||
|
node_name(SNode) ->
|
||||||
|
SNode1 =
|
||||||
|
case string:tokens(SNode, "@") of
|
||||||
|
[_Node, _Server] ->
|
||||||
|
SNode;
|
||||||
|
_ ->
|
||||||
|
case net_kernel:longnames() of
|
||||||
|
true ->
|
||||||
|
SNode ++ "@" ++ inet_db:gethostname() ++
|
||||||
|
"." ++ inet_db:res_option(domain);
|
||||||
|
false ->
|
||||||
|
SNode ++ "@" ++ inet_db:gethostname();
|
||||||
|
_ ->
|
||||||
|
SNode
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
list_to_atom(SNode1).
|
||||||
|
|
|
@ -164,4 +164,39 @@ process_request(?DISCONNECT, #mqtt_frame{}, State=#state{client_id=ClientId}) ->
|
||||||
?INFO("~s disconnected", [ClientId]),
|
?INFO("~s disconnected", [ClientId]),
|
||||||
{stop, State}.
|
{stop, State}.
|
||||||
|
|
||||||
|
send_frame(Sock, Frame) ->
|
||||||
|
?INFO("send frame:~p", [Frame]),
|
||||||
|
erlang:port_command(Sock, emqtt_frame:serialise(Frame)).
|
||||||
|
|
||||||
|
valid_client_id(ClientId) ->
|
||||||
|
ClientIdLen = size(ClientId),
|
||||||
|
1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN.
|
||||||
|
|
||||||
|
validate_frame(_Type, _Frame) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
make_msg(#mqtt_frame{
|
||||||
|
fixed = #mqtt_frame_fixed{qos = Qos,
|
||||||
|
retain = Retain,
|
||||||
|
dup = Dup},
|
||||||
|
variable = #mqtt_frame_publish{topic_name = Topic,
|
||||||
|
message_id = MessageId},
|
||||||
|
payload = Payload}) ->
|
||||||
|
#mqtt_msg{retain = Retain,
|
||||||
|
qos = Qos,
|
||||||
|
topic = Topic,
|
||||||
|
dup = Dup,
|
||||||
|
msgid = MessageId,
|
||||||
|
payload = Payload}.
|
||||||
|
|
||||||
|
make_will_msg(#mqtt_frame_connect{ will_flag = false }) ->
|
||||||
|
undefined;
|
||||||
|
make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
|
||||||
|
will_qos = Qos,
|
||||||
|
will_topic = Topic,
|
||||||
|
will_msg = Msg }) ->
|
||||||
|
#mqtt_msg{retain = Retain,
|
||||||
|
qos = Qos,
|
||||||
|
topic = Topic,
|
||||||
|
dup = false,
|
||||||
|
payload = Msg }.
|
||||||
|
|
|
@ -25,4 +25,3 @@
|
||||||
%%Router Chain-->
|
%%Router Chain-->
|
||||||
%%--->In
|
%%--->In
|
||||||
%%Out<---
|
%%Out<---
|
||||||
|
|
||||||
|
|
|
@ -97,27 +97,9 @@ case "$1" in
|
||||||
$NODETOOL rpc emqtt_ctl status $@
|
$NODETOOL rpc emqtt_ctl status $@
|
||||||
;;
|
;;
|
||||||
|
|
||||||
cluster_info)
|
|
||||||
if [ $# -ne 1 ]; then
|
|
||||||
echo "Usage: $SCRIPT cluster_info"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
# Make sure the local node IS running
|
|
||||||
RES=`$NODETOOL ping`
|
|
||||||
if [ "$RES" != "pong" ]; then
|
|
||||||
echo "Node is not running!"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
shift
|
|
||||||
|
|
||||||
$NODETOOL rpc emqtt_ctl cluster_info $@
|
|
||||||
;;
|
|
||||||
|
|
||||||
|
|
||||||
cluster)
|
cluster)
|
||||||
if [ $# -ne 2 ]; then
|
if [ $# -gt 2 ]; then
|
||||||
echo "Usage: $SCRIPT cluster <Node>"
|
echo "Usage: $SCRIPT cluster [<Node>]"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
@ -132,9 +114,9 @@ case "$1" in
|
||||||
$NODETOOL rpc emqtt_ctl cluster $@
|
$NODETOOL rpc emqtt_ctl cluster $@
|
||||||
;;
|
;;
|
||||||
|
|
||||||
add_user)
|
useradd)
|
||||||
if [ $# -ne 3 ]; then
|
if [ $# -ne 3 ]; then
|
||||||
echo "Usage: $SCRIPT add_user <Username> <Password>"
|
echo "Usage: $SCRIPT useradd <Username> <Password>"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
@ -146,12 +128,12 @@ case "$1" in
|
||||||
fi
|
fi
|
||||||
shift
|
shift
|
||||||
|
|
||||||
$NODETOOL rpc emqtt_ctl add_user $@
|
$NODETOOL rpc emqtt_ctl useradd $@
|
||||||
;;
|
;;
|
||||||
|
|
||||||
delete_user)
|
userdel)
|
||||||
if [ $# -ne 2 ]; then
|
if [ $# -ne 2 ]; then
|
||||||
echo "Usage: $SCRIPT delete_user <Username>"
|
echo "Usage: $SCRIPT userdel <Username>"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
@ -163,16 +145,15 @@ case "$1" in
|
||||||
fi
|
fi
|
||||||
shift
|
shift
|
||||||
|
|
||||||
$NODETOOL rpc emqtt_ctl delete_user $@
|
$NODETOOL rpc emqtt_ctl userdel $@
|
||||||
;;
|
;;
|
||||||
|
|
||||||
*)
|
*)
|
||||||
echo "Usage: $SCRIPT"
|
echo "Usage: $SCRIPT"
|
||||||
echo " status #query emqtt status"
|
echo " status #query emqtt status"
|
||||||
echo " cluster_info #query cluster nodes"
|
echo " cluster [<Node>] #query or cluster nodes"
|
||||||
echo " cluster <Node> #cluster node"
|
echo " useradd <Username> <Password> #add user"
|
||||||
echo " add_user <Username> <Password> #add user"
|
echo " userdel <Username> #delete user"
|
||||||
echo " delete_user <Username> #delete user"
|
|
||||||
exit 1
|
exit 1
|
||||||
;;
|
;;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue