add auth support

This commit is contained in:
erylee 2012-12-22 16:42:32 +08:00
parent 87db1acf2b
commit 425bc2157e
11 changed files with 244 additions and 150 deletions

View File

@ -111,11 +111,47 @@ case "$1" in
$NODETOOL rpc emqtt_ctl cluster $@
;;
add_user)
if [ $# -ne 3 ]; then
echo "Usage: $SCRIPT add_user <Username> <Password>"
exit 1
fi
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqtt is not running!"
exit 1
fi
shift
$NODETOOL rpc emqtt_ctl add_user $@
;;
delete_user)
if [ $# -ne 2 ]; then
echo "Usage: $SCRIPT delete_user <Username>"
exit 1
fi
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqtt is not running!"
exit 1
fi
shift
$NODETOOL rpc emqtt_ctl delete_user $@
;;
*)
echo "Usage: $SCRIPT"
echo " status #query emqtt status"
echo " cluster_info #query cluster nodes"
echo " cluster <Node> #cluster node"
echo " add_user <Username> <Password> #add user"
echo " delete_user <Username> #delete user"
exit 1
;;

View File

@ -22,6 +22,7 @@
]}
]},
{emqtt, [
{auth, {internal, []}}, %internal, anonymous
{listeners, [
{1883, [
binary,

Binary file not shown.

67
src/emqtt_auth.erl Normal file
View File

@ -0,0 +1,67 @@
-module(emqtt_auth).
-include("emqtt.hrl").
-export([start_link/0,
add/2,
check/2,
delete/1]).
-behavior(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state, {authmod, authopts}).
start_link() ->
gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
check(Username, Password) when is_binary(Username) ->
gen_server2:call(?MODULE, {check, Username, Password}).
add(Username, Password) when is_binary(Username) ->
gen_server2:call(?MODULE, {add, Username, Password}).
delete(Username) when is_binary(Username) ->
gen_server2:cast(?MODULE, {delete, Username}).
init([]) ->
{ok, {Name, Opts}} = application:get_env(auth),
AuthMod = authmod(Name),
ok = AuthMod:init(Opts),
?INFO("authmod is ~p", [AuthMod]),
?INFO("~p is started", [?MODULE]),
{ok, #state{authmod=AuthMod, authopts=Opts}}.
authmod(Name) when is_atom(Name) ->
list_to_atom(lists:concat(["emqtt_auth_", Name])).
handle_call({check, Username, Password}, _From, #state{authmod=AuthMod} = State) ->
{reply, AuthMod:check(Username, Password), State};
handle_call({add, Username, Password}, _From, #state{authmod=AuthMod} = State) ->
{reply, AuthMod:add(Username, Password), State};
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast({delete, Username}, #state{authmod=AuthMod} = State) ->
AuthMod:delete(Username),
{noreply, State};
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info(Info, State) ->
{stop, {badinfo, Info}, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -0,0 +1,18 @@
-module(emqtt_auth_anonymous).
-export([init/1,
add/2,
check/2,
delete/1]).
init(_Opts) ->
ok.
check(Username, _) when is_binary(Username) ->
true.
add(Username, _Password) when is_binary(Username) ->
ok.
delete(Username) when is_binary(Username) ->
ok.

View File

@ -0,0 +1,29 @@
-module(emqtt_auth_internal).
-include("emqtt.hrl").
-export([init/1,
add/2,
check/2,
delete/1]).
init(_Opts) ->
mnesia:create_table(internal_user, [
{ram_copies, [node()]},
{attributes, record_info(fields, internal_user)}]),
mnesia:add_table_copy(internal_user, node(), ram_copies),
ok.
check(Username, Password) when is_binary(Username) ->
PasswdHash = crypto:md5(Password),
case mnesia:dirty_read(internal_user, Username) of
[#internal_user{passwdhash=PasswdHash}] -> true;
_ -> false
end.
add(Username, Password) ->
mnesia:dirty_write(#internal_user{username=Username, passwdhash=crypto:md5(Password)}).
delete(Username) ->
mnesia:dirty_delete(internal_user, Username).

View File

@ -68,7 +68,7 @@ handle_call({go, Sock}, _From, _State) ->
parse_state = emqtt_frame:initial_state(),
message_id = 1,
subscriptions = dict:new(),
awaiting_ack = gb_tree:empty()})}.
awaiting_ack = gb_trees:empty()})}.
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
@ -241,11 +241,8 @@ process_request(?SUBSCRIBE,
[SupportedQos | QosList]
end, [], Topics),
[emqtt_topic:insert(Name) || #mqtt_topic{name=Name} <- Topics],
[emqtt_router:subscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
[emqtt_router:insert(#subscriber{topic=emqtt_util:binary(Name), pid=self()})
|| #mqtt_topic{name=Name} <- Topics],
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?SUBACK },
variable = #mqtt_frame_suback{
message_id = MessageId,
@ -261,8 +258,7 @@ process_request(?UNSUBSCRIBE,
subscriptions = Subs0} = State) ->
[emqtt_router:delete(#subscriber{topic=Name, pid=self()})
|| #mqtt_topic{name=Name} <- Topics],
[emqtt_router:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK },
variable = #mqtt_frame_suback{ message_id = MessageId }}),

View File

@ -26,3 +26,8 @@ cluster(Node) ->
?PRINT("failed to cluster with ~p~n", [Node])
end.
add_user(Username, Password) ->
?PRINT("~p", [emqtt_auth:add(list_to_binary(Username), list_to_binary(Password))]).
delete_user(Username) ->
?PRINT("~p", [emqtt_auth:delete(list_to_binary(Username))]).

View File

@ -6,10 +6,9 @@
-export([start_link/0]).
-export([route/1,
route/2,
insert/1,
delete/1]).
-export([subscribe/2,
unsubscribe/2,
publish/2]).
-behaviour(gen_server).
@ -23,41 +22,72 @@
-record(state, {}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
binary(S) when is_list(S) ->
list_to_binary(S);
subscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) ->
gen_server2:call(?MODULE, {subscribe, Topic, Client}).
binary(B) when is_binary(B) ->
B.
unsubscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) ->
gen_server2:cast(?MODULE, {unsubscribe, Topic, Client}).
route(#mqtt_msg{topic=Topic}=Msg) when is_record(Msg, mqtt_msg) ->
error_logger:info_msg("route msg: ~p~n", [Msg]),
[ Pid ! {route, Msg} || #subscriber{pid=Pid} <- ets:lookup(subscriber, binary(Topic)) ].
publish(Topic, Msg) when is_binary(Topic) and is_record(Msg, mqtt_msg) ->
[
[Client ! {route, Msg} || #subscriber{client=Client} <- ets:lookup(subscriber, Path)]
|| #topic{path=Path} <- match(Topic)].
route(Topic, Msg) ->
[ Pid ! {route, Msg} || #subscriber{pid=Pid} <- ets:lookup(subscriber, Topic) ].
insert(Sub) when is_record(Sub, subscriber) ->
gen_server:call(?MODULE, {insert, Sub}).
delete(Sub) when is_record(Sub, subscriber) ->
gen_server:cast(?MODULE, {delete, Sub}).
match(Topic) when is_binary(Topic) ->
Words = topic_split(Topic),
DirectMatches = mnesia:dirty_read(direct_topic, Words),
WildcardMatches = lists:append([
mnesia:dirty_read(wildcard_topic, Key) ||
Key <- mnesia:dirty_all_keys(wildcard_topic),
topic_match(Words, Key)
]),
DirectMatches ++ WildcardMatches.
init([]) ->
mnesia:create_table(
direct_topic, [
{record_name, topic},
{ram_copies, [node()]},
{attributes, record_info(fields, topic)}]),
mnesia:add_table_copy(direct_topic, node(), ram_copies),
mnesia:create_table(
wildcard_topic, [
{record_name, topic},
{ram_copies, [node()]},
{attributes, record_info(fields, topic)}]),
mnesia:add_table_copy(wildcard_topic, node(), ram_copies),
ets:new(subscriber, [bag, named_table, {keypos, 2}]),
?INFO_MSG("emqtt_router is started."),
{ok, #state{}}.
handle_call({insert, Sub}, _From, State) ->
ets:insert(subscriber, Sub),
handle_call({subscribe, Topic, Client}, _From, State) ->
Words = topic_split(Topic),
case topic_type(Words) of
direct ->
ok = mnesia:dirty_write(direct_topic, #topic{words=Words, path=Topic});
wildcard ->
ok = mnesia:dirty_write(wildcard_topic, #topic{words=Words, path=Topic})
end,
ets:insert(subscriber, #subscriber{topic=Topic, client=Client}),
{reply, ok, State};
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast({delete, Sub}, State) ->
ets:delete_object(subscriber, Sub),
handle_cast({unsubscribe, Topic, Client}, State) ->
ets:delete_object(subscriber, #subscriber{topic=Topic, client=Client}),
%TODO: how to remove topic
%
%Words = topic_split(Topic),
%case topic_type(Words) of
%direct ->
% mnesia:dirty_delete(direct_topic, #topic{words=Words, path=Topic});
%wildcard ->
% mnesia:direct_delete(wildcard_topic, #topic{words=Words, path=Topic})
%end,
{noreply, State};
handle_cast(Msg, State) ->
@ -72,4 +102,35 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, _State, _Extra) ->
ok.
%--------------------------------------
% internal functions
%--------------------------------------
topic_type([]) ->
direct;
topic_type([<<"#">>]) ->
wildcard;
topic_type([<<"+">>|_T]) ->
wildcard;
topic_type([_|T]) ->
topic_type(T).
topic_match([], []) ->
true;
topic_match([H|T1], [H|T2]) ->
topic_match(T1, T2);
topic_match([_H|T1], [<<"+">>|T2]) ->
topic_match(T1, T2);
topic_match(_, [<<"#">>]) ->
true;
topic_match([], [_H|_T2]) ->
false.
topic_split(S) ->
binary:split(S, [<<"/">>], [global]).

View File

@ -26,7 +26,7 @@ start_link(Listeners) ->
init([Listeners]) ->
{ok, { {one_for_all, 5, 10}, [
?CHILD(emqtt_topic, worker),
?CHILD(emqtt_auth, worker),
?CHILD(emqtt_router, worker),
?CHILD(emqtt_client_sup, supervisor)
| listener_children(Listeners) ]}

View File

@ -1,119 +0,0 @@
-module(emqtt_topic).
-include("emqtt.hrl").
-export([start_link/0,
match/1,
insert/1,
delete/1]).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state, {}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
match(Topic0) ->
Topic = emqtt_util:binary(Topic0),
Words = topic_split(Topic),
DirectMatches = mnesia:dirty_read(direct_topic, Words),
WildcardMatches = lists:append([
mnesia:dirty_read(wildcard_topic, Key) ||
Key <- mnesia:dirty_all_keys(wildcard_topic),
topic_match(Words, Key)
]),
DirectMatches ++ WildcardMatches.
insert(Topic) ->
gen_server:call(?MODULE, {insert, emqtt_util:binary(Topic)}).
delete(Topic) ->
gen_server:cast(?MODULE, {delete, emqtt_util:binary(Topic)}).
init([]) ->
{atomic, ok} = mnesia:create_table(
direct_topic, [
{record_name, topic},
{ram_copies, [node()]},
{attributes, record_info(fields, topic)}]),
{atomic, ok} = mnesia:create_table(
wildcard_topic, [
{record_name, topic},
{ram_copies, [node()]},
{attributes, record_info(fields, topic)}]),
?INFO_MSG("emqtt_topic is started."),
{ok, #state{}}.
handle_call({insert, Topic}, _From, State) ->
Words = topic_split(Topic),
Reply =
case topic_type(Words) of
direct ->
mnesia:dirty_write(direct_topic, #topic{words=Words, path=Topic});
wildcard ->
mnesia:dirty_write(wildcard_topic, #topic{words=Words, path=Topic})
end,
{reply, Reply, State};
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast({delete, Topic}, State) ->
Words = topic_split(Topic),
case topic_type(Words) of
direct ->
mnesia:dirty_delete(direct_topic, #topic{words=Words, path=Topic});
wildcard ->
mnesia:direct_delete(wildcard_topic, #topic{words=Words, path=Topic})
end,
{noreply, State};
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info(Info, State) ->
{stop, {badinfo, Info}, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, _State, _Extra) ->
ok.
topic_type([]) ->
direct;
topic_type([<<"#">>]) ->
wildcard;
topic_type([<<"+">>|_T]) ->
wildcard;
topic_type([_|T]) ->
topic_type(T).
topic_match([], []) ->
true;
topic_match([H|T1], [H|T2]) ->
topic_match(T1, T2);
topic_match([_H|T1], [<<"+">>|T2]) ->
topic_match(T1, T2);
topic_match(_, [<<"#">>]) ->
true;
topic_match([], [_H|_T2]) ->
false.
topic_split(S) ->
binary:split(S, [<<"/">>], [global]).