diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 8c28f9775..dcd0b181e 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -28,30 +28,33 @@ -author('feng@emqtt.io'). --define(PRINT_MSG(Msg), io:format(Msg)). - --define(PRINT(Format, Args), io:format(Format, Args)). - -behaviour(application). %% Application callbacks -export([start/2, stop/1]). +-define(SERVICES, [config, + event, + retained, + client, + session, + pubsub, + router, + broker, + metrics, + bridge, + auth, + acl, + monitor]). + +-define(PRINT_MSG(Msg), io:format(Msg)). + +-define(PRINT(Format, Args), io:format(Format, Args)). + %%%============================================================================= %%% Application callbacks %%%============================================================================= -%%------------------------------------------------------------------------------ -%% @private -%% @doc -%% This function is called whenever an application is started using -%% application:start/[1,2], and should start the processes of the -%% application. If the application is structured according to the OTP -%% design principles as a supervision tree, this means starting the -%% top supervisor of the tree. -%% -%% @end -%%------------------------------------------------------------------------------ -spec start(StartType, StartArgs) -> {ok, pid()} | {ok, pid(), State} | {error, Reason} when StartType :: normal | {takeover, node()} | {failover, node()}, StartArgs :: term(), @@ -60,7 +63,7 @@ start(_StartType, _StartArgs) -> print_banner(), {ok, Sup} = emqttd_sup:start_link(), - start_servers(Sup), + start_services(Sup), ok = emqttd_mnesia:wait(), {ok, Listeners} = application:get_env(listen), emqttd:open(Listeners), @@ -76,12 +79,7 @@ print_vsn() -> {ok, Desc} = application:get_key(description), ?PRINT("~s ~s is running now~n", [Desc, Vsn]). -start_servers(Sup) -> - {ok, SessOpts} = application:get_env(session), - {ok, RetainOpts} = application:get_env(retain), - {ok, BrokerOpts} = application:get_env(broker), - {ok, MetricOpts} = application:get_env(metrics), - {ok, AclOpts} = application:get_env(acl), +start_services(Sup) -> lists:foreach( fun({Name, F}) when is_function(F) -> ?PRINT("~s is starting...", [Name]), @@ -95,23 +93,54 @@ start_servers(Sup) -> ?PRINT("~s is starting...", [ Name]), start_child(Sup, Server, Opts), ?PRINT_MSG("[done]~n") - end, - [{"emqttd config", emqttd_config}, - {"emqttd event", emqttd_event}, - {"emqttd server", emqttd_server, RetainOpts}, - {"emqttd client manager", emqttd_cm}, - {"emqttd session manager", emqttd_sm}, - {"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts}, - {"emqttd auth", emqttd_auth}, - {"emqttd pubsub", emqttd_pubsub}, - {"emqttd router", emqttd_router}, - {"emqttd broker", emqttd_broker, BrokerOpts}, - {"emqttd metrics", emqttd_metrics, MetricOpts}, - {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, - {"emqttd acl", emqttd_acl, AclOpts}, - {"emqttd internal acl", emqttd_acl_internal, AclOpts}, - {"emqttd monitor", emqttd_monitor} - ]). + end, lists:flatten([service(Srv) || Srv <- ?SERVICES])). + +service(config) -> + {"emqttd config", emqttd_config}; + +service(event) -> + {"emqttd event", emqttd_event}; + +service(retained) -> + {ok, RetainOpts} = application:get_env(retain), + {"emqttd server", emqttd_server, RetainOpts}; + +service(client) -> + {"emqttd client manager", emqttd_cm}; + +service(session) -> + {ok, SessOpts} = application:get_env(session), + [{"emqttd session manager", emqttd_sm}, + {"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts}]; + +service(pubsub) -> + {"emqttd pubsub", emqttd_pubsub}; + +service(router) -> + {"emqttd router", emqttd_router}; + +service(broker) -> + {ok, BrokerOpts} = application:get_env(broker), + {"emqttd broker", emqttd_broker, BrokerOpts}; + +service(metrics) -> + {ok, MetricOpts} = application:get_env(metrics), + {"emqttd metrics", emqttd_metrics, MetricOpts}; + +service(bridge) -> + {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}; + +service(auth) -> + {ok, AuthMods} = application:get_env(auth), + {"emqttd auth", emqttd_auth, AuthMods}; + +service(acl) -> + {ok, AclOpts} = application:get_env(acl), + [{"emqttd acl", emqttd_acl, AclOpts}, + {"emqttd internal acl", emqttd_acl_internal, AclOpts}]; + +service(monitor) -> + {"emqttd monitor", emqttd_monitor}. start_child(Sup, {supervisor, Name}) -> supervisor:start_child(Sup, supervisor_spec(Name)); @@ -143,15 +172,6 @@ worker_spec(Name, Opts) -> {Name, start_link, [Opts]}, permanent, 5000, worker, [Name]}. -%%------------------------------------------------------------------------------ -%% @private -%% @doc -%% This function is called whenever an application has stopped. It -%% is intended to be the opposite of Module:start/2 and should do -%% any necessary cleaning up. The return value is ignored. -%% -%% @end -%%------------------------------------------------------------------------------ -spec stop(State :: term()) -> term(). stop(_State) -> ok. diff --git a/apps/emqttd/src/emqttd_auth.erl b/apps/emqttd/src/emqttd_auth.erl index 139236547..a4920c38a 100644 --- a/apps/emqttd/src/emqttd_auth.erl +++ b/apps/emqttd/src/emqttd_auth.erl @@ -22,6 +22,8 @@ %%% @doc %%% emqttd authentication. %%% +%%% TODO: +%%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_auth). @@ -30,7 +32,7 @@ -include("emqttd.hrl"). --export([start_link/1, check/2]). +-export([start_link/1, login/2, add_module/2, remove_module/1, all_modules/0, stop/0]). -behavior(gen_server). @@ -38,31 +40,108 @@ -define(AUTH_TABLE, mqtt_auth). -start_link(AuthOpts) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [AuthOpts], []). +%%%============================================================================= +%%% Auth behavihour +%%%============================================================================= --spec check(mqtt_user(), binary()) -> true | false. -check(User, Password) when is_record(User, mqtt_user) -> - [{_, }] = ets:lookup(?AUTH_TABLE, auth_modules), +-ifdef(use_specs). -init([AuthOpts]) -> - AuthMod = authmod(Name), - ok = AuthMod:init(Opts), - ets:new(?TAB, [named_table, protected]), - ets:insert(?TAB, {mod, AuthMod}), - {ok, undefined}. +-callback check(User, Password, State) -> ok | ignore | {error, string()} when + User :: mqtt_user(), + Password :: binary(), + State :: any(). -authmod(Name) when is_atom(Name) -> - list_to_atom(lists:concat(["emqttd_auth_", Name])). +-callback description() -> string(). -handle_call(Req, _From, State) -> - {stop, {badreq, Req}, State}. +-else. -handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. +-export([behaviour_info/1]). -handle_info(Info, State) -> - {stop, {badinfo, Info}, State}. +behaviour_info(callbacks) -> + [{check, 3}, {description, 0}]; +behaviour_info(_Other) -> + undefined. + +-endif. + +-spec start_link(list()) -> {ok, pid()} | ignore | {error, any()}. +start_link(AuthMods) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [AuthMods], []). + +-spec login(mqtt_user(), undefined | binary()) -> ok | {error, string()}. +login(User, Password) when is_record(User, mqtt_user) -> + [{_, AuthMods}] = ets:lookup(?AUTH_TABLE, auth_modules), + check(User, Password, AuthMods). + +check(_User, _Password, []) -> + {error, "No auth module to check!"}; +check(User, Password, [{Mod, State} | Mods]) -> + case Mod:check(User, Password, State) of + ok -> ok; + {error, Reason} -> {error, Reason}; + ignore -> check(User, Password, Mods) + end. + +add_module(Mod, Opts) -> + gen_server:call(?MODULE, {add_module, Mod, Opts}). + +remove_module(Mod) -> + gen_server:call(?MODULE, {remove_module, Mod}). + +all_modules() -> + case ets:lookup(?AUTH_TABLE, auth_modules) of + [] -> []; + [{_, AuthMods}] -> AuthMods + end. + +stop() -> + gen_server:call(?MODULE, stop). + +init([AuthMods]) -> + ets:new(?AUTH_TABLE, [set, named_table, protected]), + Modules = [begin {ok, State} = Mod:init(Opts), + {authmod(Mod), State} end || {Mod, Opts} <- AuthMods], + ets:insert(?AUTH_TABLE, {auth_modules, Modules}), + {ok, state}. + +handle_call({add_module, Mod, Opts}, _From, State) -> + AuthMods = all_modules(), + Reply = + case lists:keyfind(Mod, 1, AuthMods) of + false -> + case catch Mod:init(Opts) of + {ok, ModState} -> + ets:insert(?AUTH_TABLE, {auth_modules, [{Mod, ModState}|AuthMods]}), + ok; + {error, Reason} -> + {error, Reason}; + {'EXIT', Error} -> + {error, Error} + end; + _ -> + {error, existed} + end, + {reply, Reply, State}; + +handle_call({remove_module, Mod}, _From, State) -> + AuthMods = all_modules(), + Reply = + case lists:keyfind(Mod, 1, AuthMods) of + false -> + {error, not_found}; + _ -> + ets:insert(?AUTH_TABLE, {auth_modules, lists:keydelete(Mod, 1, AuthMods)}), ok + end, + {reply, Reply, State}; + +handle_call(stop, _From, State) -> + {stop, normal, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. terminate(_Reason, _State) -> ok. @@ -70,3 +149,10 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +authmod(Name) when is_atom(Name) -> + list_to_atom(lists:concat(["emqttd_auth_", Name])). + diff --git a/apps/emqttd/src/emqttd_auth_anonymous.erl b/apps/emqttd/src/emqttd_auth_anonymous.erl index 257e5c539..ef206a1ae 100644 --- a/apps/emqttd/src/emqttd_auth_anonymous.erl +++ b/apps/emqttd/src/emqttd_auth_anonymous.erl @@ -28,16 +28,13 @@ -author('feng@emqtt.io'). --export([init/1, add_user/2, check_login/2, del_user/1]). +-behaviour(emqttd_auth). -init(_Opts) -> ok. - -check_login(_, _) -> true. - -add_user(_, _) -> ok. - -del_user(_Username) -> ok. +-export([init/1, check/3, description/0]). +init(Opts) -> {ok, Opts}. +check(_User, _Password, _Opts) -> ok. +description() -> "Anonymous authentication module". diff --git a/apps/emqttd/src/emqttd_auth_clientid.erl b/apps/emqttd/src/emqttd_auth_clientid.erl index 45403ae9d..bc9c5c132 100644 --- a/apps/emqttd/src/emqttd_auth_clientid.erl +++ b/apps/emqttd/src/emqttd_auth_clientid.erl @@ -30,14 +30,65 @@ -include("emqttd.hrl"). --export([init/1]). +-export([add_clientid/1, add_clientid/2, + lookup_clientid/1, remove_clientid/1, + all_clientids/0]). + +-behaviour(emqttd_auth). + +%% emqttd_auth callbacks +-export([init/1, check/3, description/0]). -define(AUTH_CLIENTID_TABLE, mqtt_auth_clientid). +-record(?AUTH_CLIENTID_TABLE, {clientid, password = undefined}). + +add_clientid(ClientId) when is_binary(ClientId) -> + R = #mqtt_auth_clientid{clientid = ClientId}, + mnesia:transaction(fun() -> mnesia:write(R) end). + +add_clientid(ClientId, Password) -> + R = #mqtt_auth_clientid{clientid = ClientId, password = Password}, + mnesia:transaction(fun() -> mnesia:write(R) end). + +lookup_clientid(ClientId) -> + mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId). + +all_clientids() -> + mnesia:dirty_all_keys(?AUTH_CLIENTID_TABLE). + +remove_clientid(ClientId) -> + mnesia:transaction(fun() -> mnesia:delete({?AUTH_CLIENTID_TABLE, ClientId}) end). + init(Opts) -> mnesia:create_table(?AUTH_CLIENTID_TABLE, [ + {type, set}, {disc_copies, [node()]}, - {attributes, record_info(fields, mqtt_user)}]), - mnesia:add_table_copy(?AUTH_CLIENTID_TABLE, node(), ram_copies), + {attributes, record_info(fields, ?AUTH_CLIENTID_TABLE)}]), + mnesia:add_table_copy(?AUTH_CLIENTID_TABLE, node(), disc_copies), {ok, Opts}. +check(#mqtt_user{clientid = ClientId}, _Password, []) -> + check_clientid_only(ClientId); +check(#mqtt_user{clientid = ClientId}, _Password, [{password, no}|_]) -> + check_clientid_only(ClientId); +check(#mqtt_user{clientid = ClientId}, Password, [{password, yes}|_]) -> + case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of + [] -> {error, "ClientId Not Found"}; + [#?AUTH_CLIENTID_TABLE{password = Password}] -> ok; %% TODO: plaintext?? + _ -> {error, "Password Not Right"} + end. + +description() -> "ClientId authentication module". + +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +check_clientid_only(ClientId) -> + case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of + [] -> {error, "ClientId Not Found"}; + _ -> ok + end. + + diff --git a/apps/emqttd/src/emqttd_auth_username.erl b/apps/emqttd/src/emqttd_auth_username.erl index dc7137f8f..c6d1c5b38 100644 --- a/apps/emqttd/src/emqttd_auth_username.erl +++ b/apps/emqttd/src/emqttd_auth_username.erl @@ -30,38 +30,72 @@ -include("emqttd.hrl"). --export([init/1, add/2, check/2, delete/1]). +-export([add_user/2, remove_user/1, + lookup_user/1, all_users/0]). --define(AUTH_USER_TABLE, mqtt_auth_username). +%% emqttd_auth callbacks +-export([init/1, check/3, description/0]). --record(mqtt_auth_username, {username, password}). +-define(AUTH_USERNAME_TABLE, mqtt_auth_username). -init(_Opts) -> - mnesia:create_table(?AUTH_USER_TABLE, [ - {ram_copies, [node()]}, - {attributes, record_info(fields, mqtt_user)}]), - mnesia:add_table_copy(?AUTH_USER_TABLE, node(), ram_copies), - ok. +-record(?AUTH_USERNAME_TABLE, {username, password}). -check(undefined, _) -> false; +%%%============================================================================= +%%% API +%%%============================================================================= -check(_, undefined) -> false; +add_user(Username, Password) -> + R = #?AUTH_USERNAME_TABLE{username = Username, password = hash(Password)}, + mnesia:transaction(fun() -> mnesia:write(R) end). -check(Username, Password) when is_binary(Username), is_binary(Password) -> - PasswdHash = crypto:hash(md5, Password), - case mnesia:dirty_read(?AUTH_USER_TABLE, Username) of - [#mqtt_user{}] -> true; %password=PasswdHash} - _ -> false +lookup_user(Username) -> + mnesia:dirty_read(?AUTH_USERNAME_TABLE, Username). + +remove_user(Username) -> + mnesia:transaction(fun() -> mnesia:delete({?AUTH_USERNAME_TABLE, Username}) end). + +all_users() -> + mnesia:dirty_all_keys(?AUTH_USERNAME_TABLE). + +%%%============================================================================= +%%% emqttd_auth callbacks +%%%============================================================================= +init(Opts) -> + mnesia:create_table(?AUTH_USERNAME_TABLE, [ + {type, set}, + {disc_copies, [node()]}, + {attributes, record_info(fields, ?AUTH_USERNAME_TABLE)}]), + mnesia:add_table_copy(?AUTH_USERNAME_TABLE, node(), disc_copies), + {ok, Opts}. + +check(#mqtt_user{username = Username}, Password, _Opts) -> + case mnesia:dirty_read(?AUTH_USERNAME_TABLE, Username) of + [] -> + {error, "Username Not Found"}; + [#?AUTH_USERNAME_TABLE{password = <>}] -> + case Hash =:= hash(Salt, Password) of + true -> ok; + false -> {error, "Password Not Right"} + end end. -add(Username, Password) when is_binary(Username) and is_binary(Password) -> - mnesia:dirty_write( - #mqtt_user{ - username = Username - %password = crypto:hash(md5, Password) - } - ). +description() -> "Username password authentication module". + +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +hash(Password) -> + hash(salt(), Password). + +hash(SaltBin, Password) -> + Hash = erlang:md5(<>), + <>. + +salt() -> + {A1,A2,A3} = now(), + random:seed(A1, A2, A3), + Salt = random:uniform(16#ffffffff), + <>. -delete(Username) when is_binary(Username) -> - mnesia:dirty_delete(?AUTH_USER_TABLE, Username). diff --git a/rel/files/app.config b/rel/files/app.config index 3fb6493e5..13649098c 100644 --- a/rel/files/app.config +++ b/rel/files/app.config @@ -38,32 +38,48 @@ {logger, {lager, info}} ]}, {emqttd, [ - %Authetication. Internal, Anonymous Default - {auth, [{anonymous, []}]}, - %ACL config - {acl, [{file, "etc/acl.config"}]}, + %% Authetication. , Anonymous Default + {auth, [ + %% authentication with username, password + %{username, []}, + %% authentication with clientid + %{clientid, [{password, no}]}, + %% allow all + {anonymous, []} + ]}, + %% ACL config + {acl, [ + {file, "etc/acl.config"} + ]}, + %% Packet {packet, [ {max_clientid_len, 1024}, {max_packet_size, 16#ffff} ]}, + %% Session {session, [ {expires, 1}, {max_queue, 1000}, {store_qos0, false} ]}, + %% Retain messages {retain, [ {store_limit, 100000} ]}, + %% Broker {broker, [ {sys_interval, 60} ]}, + %% Metrics {metrics, [ {pub_interval, 60} ]}, + %% Bridge {bridge, [ {max_queue_len, 1000}, %NO effect now {ping_down_interval, 1} %seconds ]}, + %% Listen Ports {listen, [ {mqtt, 1883, [ {backlog, 512}, @@ -89,7 +105,7 @@ % Plugins {plugins, [ {emqttd_auth_ldap, [ldap_params]}, - {emqttd_auth_mysql, [mysql_params]}, + {emqttd_auth_mysql, [mysql_params]} ]} ]} ].