diff --git a/bin/emqtt_ctl b/bin/emqtt_ctl index edd24c2dc..b056316c1 100755 --- a/bin/emqtt_ctl +++ b/bin/emqtt_ctl @@ -111,11 +111,47 @@ case "$1" in $NODETOOL rpc emqtt_ctl cluster $@ ;; + add_user) + if [ $# -ne 3 ]; then + echo "Usage: $SCRIPT add_user " + exit 1 + fi + + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqtt is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqtt_ctl add_user $@ + ;; + + delete_user) + if [ $# -ne 2 ]; then + echo "Usage: $SCRIPT delete_user " + exit 1 + fi + + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqtt is not running!" + exit 1 + fi + shift + + $NODETOOL rpc emqtt_ctl delete_user $@ + ;; + *) echo "Usage: $SCRIPT" echo " status #query emqtt status" echo " cluster_info #query cluster nodes" echo " cluster #cluster node" + echo " add_user #add user" + echo " delete_user #delete user" exit 1 ;; diff --git a/etc/emqtt.config b/etc/emqtt.config index 4512e5848..c995ebc38 100644 --- a/etc/emqtt.config +++ b/etc/emqtt.config @@ -22,6 +22,7 @@ ]} ]}, {emqtt, [ + {auth, {internal, []}}, %internal, anonymous {listeners, [ {1883, [ binary, diff --git a/src/.emqtt_frame.erl.swp b/src/.emqtt_frame.erl.swp deleted file mode 100644 index 6c2212e6f..000000000 Binary files a/src/.emqtt_frame.erl.swp and /dev/null differ diff --git a/src/emqtt_auth.erl b/src/emqtt_auth.erl new file mode 100644 index 000000000..e89252d90 --- /dev/null +++ b/src/emqtt_auth.erl @@ -0,0 +1,67 @@ +-module(emqtt_auth). + +-include("emqtt.hrl"). + +-export([start_link/0, + add/2, + check/2, + delete/1]). + +-behavior(gen_server). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, {authmod, authopts}). + +start_link() -> + gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []). + +check(Username, Password) when is_binary(Username) -> + gen_server2:call(?MODULE, {check, Username, Password}). + +add(Username, Password) when is_binary(Username) -> + gen_server2:call(?MODULE, {add, Username, Password}). + +delete(Username) when is_binary(Username) -> + gen_server2:cast(?MODULE, {delete, Username}). + +init([]) -> + {ok, {Name, Opts}} = application:get_env(auth), + AuthMod = authmod(Name), + ok = AuthMod:init(Opts), + ?INFO("authmod is ~p", [AuthMod]), + ?INFO("~p is started", [?MODULE]), + {ok, #state{authmod=AuthMod, authopts=Opts}}. + +authmod(Name) when is_atom(Name) -> + list_to_atom(lists:concat(["emqtt_auth_", Name])). + +handle_call({check, Username, Password}, _From, #state{authmod=AuthMod} = State) -> + {reply, AuthMod:check(Username, Password), State}; + +handle_call({add, Username, Password}, _From, #state{authmod=AuthMod} = State) -> + {reply, AuthMod:add(Username, Password), State}; + +handle_call(Req, _From, State) -> + {stop, {badreq, Req}, State}. + +handle_cast({delete, Username}, #state{authmod=AuthMod} = State) -> + AuthMod:delete(Username), + {noreply, State}; + +handle_cast(Msg, State) -> + {stop, {badmsg, Msg}, State}. + +handle_info(Info, State) -> + {stop, {badinfo, Info}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/emqtt_auth_anonymous.erl b/src/emqtt_auth_anonymous.erl new file mode 100644 index 000000000..f17dec334 --- /dev/null +++ b/src/emqtt_auth_anonymous.erl @@ -0,0 +1,18 @@ +-module(emqtt_auth_anonymous). + +-export([init/1, + add/2, + check/2, + delete/1]). + +init(_Opts) -> + ok. + +check(Username, _) when is_binary(Username) -> + true. + +add(Username, _Password) when is_binary(Username) -> + ok. + +delete(Username) when is_binary(Username) -> + ok. diff --git a/src/emqtt_auth_internal.erl b/src/emqtt_auth_internal.erl new file mode 100644 index 000000000..66b3a8e3e --- /dev/null +++ b/src/emqtt_auth_internal.erl @@ -0,0 +1,29 @@ +-module(emqtt_auth_internal). + +-include("emqtt.hrl"). + +-export([init/1, + add/2, + check/2, + delete/1]). + +init(_Opts) -> + mnesia:create_table(internal_user, [ + {ram_copies, [node()]}, + {attributes, record_info(fields, internal_user)}]), + mnesia:add_table_copy(internal_user, node(), ram_copies), + ok. + +check(Username, Password) when is_binary(Username) -> + PasswdHash = crypto:md5(Password), + case mnesia:dirty_read(internal_user, Username) of + [#internal_user{passwdhash=PasswdHash}] -> true; + _ -> false + end. + +add(Username, Password) -> + mnesia:dirty_write(#internal_user{username=Username, passwdhash=crypto:md5(Password)}). + +delete(Username) -> + mnesia:dirty_delete(internal_user, Username). + diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index 8615d6679..4cc9eca20 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -68,7 +68,7 @@ handle_call({go, Sock}, _From, _State) -> parse_state = emqtt_frame:initial_state(), message_id = 1, subscriptions = dict:new(), - awaiting_ack = gb_tree:empty()})}. + awaiting_ack = gb_trees:empty()})}. handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. @@ -241,11 +241,8 @@ process_request(?SUBSCRIBE, [SupportedQos | QosList] end, [], Topics), - [emqtt_topic:insert(Name) || #mqtt_topic{name=Name} <- Topics], + [emqtt_router:subscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], - [emqtt_router:insert(#subscriber{topic=emqtt_util:binary(Name), pid=self()}) - || #mqtt_topic{name=Name} <- Topics], - send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?SUBACK }, variable = #mqtt_frame_suback{ message_id = MessageId, @@ -261,8 +258,7 @@ process_request(?UNSUBSCRIBE, subscriptions = Subs0} = State) -> - [emqtt_router:delete(#subscriber{topic=Name, pid=self()}) - || #mqtt_topic{name=Name} <- Topics], + [emqtt_router:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics], send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK }, variable = #mqtt_frame_suback{ message_id = MessageId }}), diff --git a/src/emqtt_ctl.erl b/src/emqtt_ctl.erl index e70ff41c0..c6673aefa 100644 --- a/src/emqtt_ctl.erl +++ b/src/emqtt_ctl.erl @@ -26,3 +26,8 @@ cluster(Node) -> ?PRINT("failed to cluster with ~p~n", [Node]) end. +add_user(Username, Password) -> + ?PRINT("~p", [emqtt_auth:add(list_to_binary(Username), list_to_binary(Password))]). + +delete_user(Username) -> + ?PRINT("~p", [emqtt_auth:delete(list_to_binary(Username))]). diff --git a/src/emqtt_router.erl b/src/emqtt_router.erl index 461356621..00c413920 100644 --- a/src/emqtt_router.erl +++ b/src/emqtt_router.erl @@ -6,10 +6,9 @@ -export([start_link/0]). --export([route/1, - route/2, - insert/1, - delete/1]). +-export([subscribe/2, + unsubscribe/2, + publish/2]). -behaviour(gen_server). @@ -23,41 +22,72 @@ -record(state, {}). start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []). -binary(S) when is_list(S) -> - list_to_binary(S); +subscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) -> + gen_server2:call(?MODULE, {subscribe, Topic, Client}). -binary(B) when is_binary(B) -> - B. +unsubscribe(Topic, Client) when is_binary(Topic) and is_pid(Client) -> + gen_server2:cast(?MODULE, {unsubscribe, Topic, Client}). -route(#mqtt_msg{topic=Topic}=Msg) when is_record(Msg, mqtt_msg) -> - error_logger:info_msg("route msg: ~p~n", [Msg]), - [ Pid ! {route, Msg} || #subscriber{pid=Pid} <- ets:lookup(subscriber, binary(Topic)) ]. +publish(Topic, Msg) when is_binary(Topic) and is_record(Msg, mqtt_msg) -> + [ + [Client ! {route, Msg} || #subscriber{client=Client} <- ets:lookup(subscriber, Path)] + || #topic{path=Path} <- match(Topic)]. -route(Topic, Msg) -> - [ Pid ! {route, Msg} || #subscriber{pid=Pid} <- ets:lookup(subscriber, Topic) ]. -insert(Sub) when is_record(Sub, subscriber) -> - gen_server:call(?MODULE, {insert, Sub}). - -delete(Sub) when is_record(Sub, subscriber) -> - gen_server:cast(?MODULE, {delete, Sub}). +match(Topic) when is_binary(Topic) -> + Words = topic_split(Topic), + DirectMatches = mnesia:dirty_read(direct_topic, Words), + WildcardMatches = lists:append([ + mnesia:dirty_read(wildcard_topic, Key) || + Key <- mnesia:dirty_all_keys(wildcard_topic), + topic_match(Words, Key) + ]), + DirectMatches ++ WildcardMatches. init([]) -> + mnesia:create_table( + direct_topic, [ + {record_name, topic}, + {ram_copies, [node()]}, + {attributes, record_info(fields, topic)}]), + mnesia:add_table_copy(direct_topic, node(), ram_copies), + mnesia:create_table( + wildcard_topic, [ + {record_name, topic}, + {ram_copies, [node()]}, + {attributes, record_info(fields, topic)}]), + mnesia:add_table_copy(wildcard_topic, node(), ram_copies), ets:new(subscriber, [bag, named_table, {keypos, 2}]), ?INFO_MSG("emqtt_router is started."), {ok, #state{}}. -handle_call({insert, Sub}, _From, State) -> - ets:insert(subscriber, Sub), +handle_call({subscribe, Topic, Client}, _From, State) -> + Words = topic_split(Topic), + case topic_type(Words) of + direct -> + ok = mnesia:dirty_write(direct_topic, #topic{words=Words, path=Topic}); + wildcard -> + ok = mnesia:dirty_write(wildcard_topic, #topic{words=Words, path=Topic}) + end, + ets:insert(subscriber, #subscriber{topic=Topic, client=Client}), {reply, ok, State}; handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. -handle_cast({delete, Sub}, State) -> - ets:delete_object(subscriber, Sub), +handle_cast({unsubscribe, Topic, Client}, State) -> + ets:delete_object(subscriber, #subscriber{topic=Topic, client=Client}), + %TODO: how to remove topic + % + %Words = topic_split(Topic), + %case topic_type(Words) of + %direct -> + % mnesia:dirty_delete(direct_topic, #topic{words=Words, path=Topic}); + %wildcard -> + % mnesia:direct_delete(wildcard_topic, #topic{words=Words, path=Topic}) + %end, {noreply, State}; handle_cast(Msg, State) -> @@ -72,4 +102,35 @@ terminate(_Reason, _State) -> code_change(_OldVsn, _State, _Extra) -> ok. +%-------------------------------------- +% internal functions +%-------------------------------------- + +topic_type([]) -> + direct; +topic_type([<<"#">>]) -> + wildcard; +topic_type([<<"+">>|_T]) -> + wildcard; +topic_type([_|T]) -> + topic_type(T). + +topic_match([], []) -> + true; + +topic_match([H|T1], [H|T2]) -> + topic_match(T1, T2); + +topic_match([_H|T1], [<<"+">>|T2]) -> + topic_match(T1, T2); + +topic_match(_, [<<"#">>]) -> + true; + +topic_match([], [_H|_T2]) -> + false. + +topic_split(S) -> + binary:split(S, [<<"/">>], [global]). + diff --git a/src/emqtt_sup.erl b/src/emqtt_sup.erl index 4d12fdf35..ed9f44148 100644 --- a/src/emqtt_sup.erl +++ b/src/emqtt_sup.erl @@ -26,7 +26,7 @@ start_link(Listeners) -> init([Listeners]) -> {ok, { {one_for_all, 5, 10}, [ - ?CHILD(emqtt_topic, worker), + ?CHILD(emqtt_auth, worker), ?CHILD(emqtt_router, worker), ?CHILD(emqtt_client_sup, supervisor) | listener_children(Listeners) ]} diff --git a/src/emqtt_topic.erl b/src/emqtt_topic.erl deleted file mode 100644 index f287e4d84..000000000 --- a/src/emqtt_topic.erl +++ /dev/null @@ -1,119 +0,0 @@ - --module(emqtt_topic). - --include("emqtt.hrl"). - --export([start_link/0, - match/1, - insert/1, - delete/1]). - --behaviour(gen_server). - --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). - --record(state, {}). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -match(Topic0) -> - Topic = emqtt_util:binary(Topic0), - Words = topic_split(Topic), - DirectMatches = mnesia:dirty_read(direct_topic, Words), - WildcardMatches = lists:append([ - mnesia:dirty_read(wildcard_topic, Key) || - Key <- mnesia:dirty_all_keys(wildcard_topic), - topic_match(Words, Key) - ]), - DirectMatches ++ WildcardMatches. - -insert(Topic) -> - gen_server:call(?MODULE, {insert, emqtt_util:binary(Topic)}). - -delete(Topic) -> - gen_server:cast(?MODULE, {delete, emqtt_util:binary(Topic)}). - -init([]) -> - {atomic, ok} = mnesia:create_table( - direct_topic, [ - {record_name, topic}, - {ram_copies, [node()]}, - {attributes, record_info(fields, topic)}]), - {atomic, ok} = mnesia:create_table( - wildcard_topic, [ - {record_name, topic}, - {ram_copies, [node()]}, - {attributes, record_info(fields, topic)}]), - ?INFO_MSG("emqtt_topic is started."), - {ok, #state{}}. - -handle_call({insert, Topic}, _From, State) -> - Words = topic_split(Topic), - Reply = - case topic_type(Words) of - direct -> - mnesia:dirty_write(direct_topic, #topic{words=Words, path=Topic}); - wildcard -> - mnesia:dirty_write(wildcard_topic, #topic{words=Words, path=Topic}) - end, - {reply, Reply, State}; - -handle_call(Req, _From, State) -> - {stop, {badreq, Req}, State}. - -handle_cast({delete, Topic}, State) -> - Words = topic_split(Topic), - case topic_type(Words) of - direct -> - mnesia:dirty_delete(direct_topic, #topic{words=Words, path=Topic}); - wildcard -> - mnesia:direct_delete(wildcard_topic, #topic{words=Words, path=Topic}) - end, - {noreply, State}; - -handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. - -handle_info(Info, State) -> - {stop, {badinfo, Info}, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, _State, _Extra) -> - ok. - -topic_type([]) -> - direct; -topic_type([<<"#">>]) -> - wildcard; -topic_type([<<"+">>|_T]) -> - wildcard; -topic_type([_|T]) -> - topic_type(T). - -topic_match([], []) -> - true; - -topic_match([H|T1], [H|T2]) -> - topic_match(T1, T2); - -topic_match([_H|T1], [<<"+">>|T2]) -> - topic_match(T1, T2); - -topic_match(_, [<<"#">>]) -> - true; - -topic_match([], [_H|_T2]) -> - false. - -topic_split(S) -> - binary:split(S, [<<"/">>], [global]). - -