This commit is contained in:
Feng Lee 2015-04-07 13:33:05 +08:00
parent 925b45bb47
commit c864944051
6 changed files with 313 additions and 109 deletions

View File

@ -28,30 +28,33 @@
-author('feng@emqtt.io').
-define(PRINT_MSG(Msg), io:format(Msg)).
-define(PRINT(Format, Args), io:format(Format, Args)).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
-define(SERVICES, [config,
event,
retained,
client,
session,
pubsub,
router,
broker,
metrics,
bridge,
auth,
acl,
monitor]).
-define(PRINT_MSG(Msg), io:format(Msg)).
-define(PRINT(Format, Args), io:format(Format, Args)).
%%%=============================================================================
%%% Application callbacks
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @private
%% @doc
%% This function is called whenever an application is started using
%% application:start/[1,2], and should start the processes of the
%% application. If the application is structured according to the OTP
%% design principles as a supervision tree, this means starting the
%% top supervisor of the tree.
%%
%% @end
%%------------------------------------------------------------------------------
-spec start(StartType, StartArgs) -> {ok, pid()} | {ok, pid(), State} | {error, Reason} when
StartType :: normal | {takeover, node()} | {failover, node()},
StartArgs :: term(),
@ -60,7 +63,7 @@
start(_StartType, _StartArgs) ->
print_banner(),
{ok, Sup} = emqttd_sup:start_link(),
start_servers(Sup),
start_services(Sup),
ok = emqttd_mnesia:wait(),
{ok, Listeners} = application:get_env(listen),
emqttd:open(Listeners),
@ -76,12 +79,7 @@ print_vsn() ->
{ok, Desc} = application:get_key(description),
?PRINT("~s ~s is running now~n", [Desc, Vsn]).
start_servers(Sup) ->
{ok, SessOpts} = application:get_env(session),
{ok, RetainOpts} = application:get_env(retain),
{ok, BrokerOpts} = application:get_env(broker),
{ok, MetricOpts} = application:get_env(metrics),
{ok, AclOpts} = application:get_env(acl),
start_services(Sup) ->
lists:foreach(
fun({Name, F}) when is_function(F) ->
?PRINT("~s is starting...", [Name]),
@ -95,23 +93,54 @@ start_servers(Sup) ->
?PRINT("~s is starting...", [ Name]),
start_child(Sup, Server, Opts),
?PRINT_MSG("[done]~n")
end,
[{"emqttd config", emqttd_config},
{"emqttd event", emqttd_event},
{"emqttd server", emqttd_server, RetainOpts},
{"emqttd client manager", emqttd_cm},
{"emqttd session manager", emqttd_sm},
{"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts},
{"emqttd auth", emqttd_auth},
{"emqttd pubsub", emqttd_pubsub},
{"emqttd router", emqttd_router},
{"emqttd broker", emqttd_broker, BrokerOpts},
{"emqttd metrics", emqttd_metrics, MetricOpts},
{"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
{"emqttd acl", emqttd_acl, AclOpts},
{"emqttd internal acl", emqttd_acl_internal, AclOpts},
{"emqttd monitor", emqttd_monitor}
]).
end, lists:flatten([service(Srv) || Srv <- ?SERVICES])).
service(config) ->
{"emqttd config", emqttd_config};
service(event) ->
{"emqttd event", emqttd_event};
service(retained) ->
{ok, RetainOpts} = application:get_env(retain),
{"emqttd server", emqttd_server, RetainOpts};
service(client) ->
{"emqttd client manager", emqttd_cm};
service(session) ->
{ok, SessOpts} = application:get_env(session),
[{"emqttd session manager", emqttd_sm},
{"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts}];
service(pubsub) ->
{"emqttd pubsub", emqttd_pubsub};
service(router) ->
{"emqttd router", emqttd_router};
service(broker) ->
{ok, BrokerOpts} = application:get_env(broker),
{"emqttd broker", emqttd_broker, BrokerOpts};
service(metrics) ->
{ok, MetricOpts} = application:get_env(metrics),
{"emqttd metrics", emqttd_metrics, MetricOpts};
service(bridge) ->
{"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}};
service(auth) ->
{ok, AuthMods} = application:get_env(auth),
{"emqttd auth", emqttd_auth, AuthMods};
service(acl) ->
{ok, AclOpts} = application:get_env(acl),
[{"emqttd acl", emqttd_acl, AclOpts},
{"emqttd internal acl", emqttd_acl_internal, AclOpts}];
service(monitor) ->
{"emqttd monitor", emqttd_monitor}.
start_child(Sup, {supervisor, Name}) ->
supervisor:start_child(Sup, supervisor_spec(Name));
@ -143,15 +172,6 @@ worker_spec(Name, Opts) ->
{Name, start_link, [Opts]},
permanent, 5000, worker, [Name]}.
%%------------------------------------------------------------------------------
%% @private
%% @doc
%% This function is called whenever an application has stopped. It
%% is intended to be the opposite of Module:start/2 and should do
%% any necessary cleaning up. The return value is ignored.
%%
%% @end
%%------------------------------------------------------------------------------
-spec stop(State :: term()) -> term().
stop(_State) ->
ok.

View File

@ -22,6 +22,8 @@
%%% @doc
%%% emqttd authentication.
%%%
%%% TODO:
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_auth).
@ -30,7 +32,7 @@
-include("emqttd.hrl").
-export([start_link/1, check/2]).
-export([start_link/1, login/2, add_module/2, remove_module/1, all_modules/0, stop/0]).
-behavior(gen_server).
@ -38,31 +40,108 @@
-define(AUTH_TABLE, mqtt_auth).
start_link(AuthOpts) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [AuthOpts], []).
%%%=============================================================================
%%% Auth behavihour
%%%=============================================================================
-spec check(mqtt_user(), binary()) -> true | false.
check(User, Password) when is_record(User, mqtt_user) ->
[{_, }] = ets:lookup(?AUTH_TABLE, auth_modules),
-ifdef(use_specs).
init([AuthOpts]) ->
AuthMod = authmod(Name),
ok = AuthMod:init(Opts),
ets:new(?TAB, [named_table, protected]),
ets:insert(?TAB, {mod, AuthMod}),
{ok, undefined}.
-callback check(User, Password, State) -> ok | ignore | {error, string()} when
User :: mqtt_user(),
Password :: binary(),
State :: any().
authmod(Name) when is_atom(Name) ->
list_to_atom(lists:concat(["emqttd_auth_", Name])).
-callback description() -> string().
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
-else.
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
-export([behaviour_info/1]).
handle_info(Info, State) ->
{stop, {badinfo, Info}, State}.
behaviour_info(callbacks) ->
[{check, 3}, {description, 0}];
behaviour_info(_Other) ->
undefined.
-endif.
-spec start_link(list()) -> {ok, pid()} | ignore | {error, any()}.
start_link(AuthMods) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [AuthMods], []).
-spec login(mqtt_user(), undefined | binary()) -> ok | {error, string()}.
login(User, Password) when is_record(User, mqtt_user) ->
[{_, AuthMods}] = ets:lookup(?AUTH_TABLE, auth_modules),
check(User, Password, AuthMods).
check(_User, _Password, []) ->
{error, "No auth module to check!"};
check(User, Password, [{Mod, State} | Mods]) ->
case Mod:check(User, Password, State) of
ok -> ok;
{error, Reason} -> {error, Reason};
ignore -> check(User, Password, Mods)
end.
add_module(Mod, Opts) ->
gen_server:call(?MODULE, {add_module, Mod, Opts}).
remove_module(Mod) ->
gen_server:call(?MODULE, {remove_module, Mod}).
all_modules() ->
case ets:lookup(?AUTH_TABLE, auth_modules) of
[] -> [];
[{_, AuthMods}] -> AuthMods
end.
stop() ->
gen_server:call(?MODULE, stop).
init([AuthMods]) ->
ets:new(?AUTH_TABLE, [set, named_table, protected]),
Modules = [begin {ok, State} = Mod:init(Opts),
{authmod(Mod), State} end || {Mod, Opts} <- AuthMods],
ets:insert(?AUTH_TABLE, {auth_modules, Modules}),
{ok, state}.
handle_call({add_module, Mod, Opts}, _From, State) ->
AuthMods = all_modules(),
Reply =
case lists:keyfind(Mod, 1, AuthMods) of
false ->
case catch Mod:init(Opts) of
{ok, ModState} ->
ets:insert(?AUTH_TABLE, {auth_modules, [{Mod, ModState}|AuthMods]}),
ok;
{error, Reason} ->
{error, Reason};
{'EXIT', Error} ->
{error, Error}
end;
_ ->
{error, existed}
end,
{reply, Reply, State};
handle_call({remove_module, Mod}, _From, State) ->
AuthMods = all_modules(),
Reply =
case lists:keyfind(Mod, 1, AuthMods) of
false ->
{error, not_found};
_ ->
ets:insert(?AUTH_TABLE, {auth_modules, lists:keydelete(Mod, 1, AuthMods)}), ok
end,
{reply, Reply, State};
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
@ -70,3 +149,10 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
authmod(Name) when is_atom(Name) ->
list_to_atom(lists:concat(["emqttd_auth_", Name])).

View File

@ -28,16 +28,13 @@
-author('feng@emqtt.io').
-export([init/1, add_user/2, check_login/2, del_user/1]).
-behaviour(emqttd_auth).
init(_Opts) -> ok.
check_login(_, _) -> true.
add_user(_, _) -> ok.
del_user(_Username) -> ok.
-export([init/1, check/3, description/0]).
init(Opts) -> {ok, Opts}.
check(_User, _Password, _Opts) -> ok.
description() -> "Anonymous authentication module".

View File

@ -30,14 +30,65 @@
-include("emqttd.hrl").
-export([init/1]).
-export([add_clientid/1, add_clientid/2,
lookup_clientid/1, remove_clientid/1,
all_clientids/0]).
-behaviour(emqttd_auth).
%% emqttd_auth callbacks
-export([init/1, check/3, description/0]).
-define(AUTH_CLIENTID_TABLE, mqtt_auth_clientid).
-record(?AUTH_CLIENTID_TABLE, {clientid, password = undefined}).
add_clientid(ClientId) when is_binary(ClientId) ->
R = #mqtt_auth_clientid{clientid = ClientId},
mnesia:transaction(fun() -> mnesia:write(R) end).
add_clientid(ClientId, Password) ->
R = #mqtt_auth_clientid{clientid = ClientId, password = Password},
mnesia:transaction(fun() -> mnesia:write(R) end).
lookup_clientid(ClientId) ->
mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId).
all_clientids() ->
mnesia:dirty_all_keys(?AUTH_CLIENTID_TABLE).
remove_clientid(ClientId) ->
mnesia:transaction(fun() -> mnesia:delete({?AUTH_CLIENTID_TABLE, ClientId}) end).
init(Opts) ->
mnesia:create_table(?AUTH_CLIENTID_TABLE, [
{type, set},
{disc_copies, [node()]},
{attributes, record_info(fields, mqtt_user)}]),
mnesia:add_table_copy(?AUTH_CLIENTID_TABLE, node(), ram_copies),
{attributes, record_info(fields, ?AUTH_CLIENTID_TABLE)}]),
mnesia:add_table_copy(?AUTH_CLIENTID_TABLE, node(), disc_copies),
{ok, Opts}.
check(#mqtt_user{clientid = ClientId}, _Password, []) ->
check_clientid_only(ClientId);
check(#mqtt_user{clientid = ClientId}, _Password, [{password, no}|_]) ->
check_clientid_only(ClientId);
check(#mqtt_user{clientid = ClientId}, Password, [{password, yes}|_]) ->
case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of
[] -> {error, "ClientId Not Found"};
[#?AUTH_CLIENTID_TABLE{password = Password}] -> ok; %% TODO: plaintext??
_ -> {error, "Password Not Right"}
end.
description() -> "ClientId authentication module".
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
check_clientid_only(ClientId) ->
case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of
[] -> {error, "ClientId Not Found"};
_ -> ok
end.

View File

@ -30,38 +30,72 @@
-include("emqttd.hrl").
-export([init/1, add/2, check/2, delete/1]).
-export([add_user/2, remove_user/1,
lookup_user/1, all_users/0]).
-define(AUTH_USER_TABLE, mqtt_auth_username).
%% emqttd_auth callbacks
-export([init/1, check/3, description/0]).
-record(mqtt_auth_username, {username, password}).
-define(AUTH_USERNAME_TABLE, mqtt_auth_username).
init(_Opts) ->
mnesia:create_table(?AUTH_USER_TABLE, [
{ram_copies, [node()]},
{attributes, record_info(fields, mqtt_user)}]),
mnesia:add_table_copy(?AUTH_USER_TABLE, node(), ram_copies),
ok.
-record(?AUTH_USERNAME_TABLE, {username, password}).
check(undefined, _) -> false;
%%%=============================================================================
%%% API
%%%=============================================================================
check(_, undefined) -> false;
add_user(Username, Password) ->
R = #?AUTH_USERNAME_TABLE{username = Username, password = hash(Password)},
mnesia:transaction(fun() -> mnesia:write(R) end).
check(Username, Password) when is_binary(Username), is_binary(Password) ->
PasswdHash = crypto:hash(md5, Password),
case mnesia:dirty_read(?AUTH_USER_TABLE, Username) of
[#mqtt_user{}] -> true; %password=PasswdHash}
_ -> false
lookup_user(Username) ->
mnesia:dirty_read(?AUTH_USERNAME_TABLE, Username).
remove_user(Username) ->
mnesia:transaction(fun() -> mnesia:delete({?AUTH_USERNAME_TABLE, Username}) end).
all_users() ->
mnesia:dirty_all_keys(?AUTH_USERNAME_TABLE).
%%%=============================================================================
%%% emqttd_auth callbacks
%%%=============================================================================
init(Opts) ->
mnesia:create_table(?AUTH_USERNAME_TABLE, [
{type, set},
{disc_copies, [node()]},
{attributes, record_info(fields, ?AUTH_USERNAME_TABLE)}]),
mnesia:add_table_copy(?AUTH_USERNAME_TABLE, node(), disc_copies),
{ok, Opts}.
check(#mqtt_user{username = Username}, Password, _Opts) ->
case mnesia:dirty_read(?AUTH_USERNAME_TABLE, Username) of
[] ->
{error, "Username Not Found"};
[#?AUTH_USERNAME_TABLE{password = <<Salt:4/binary, Hash>>}] ->
case Hash =:= hash(Salt, Password) of
true -> ok;
false -> {error, "Password Not Right"}
end
end.
add(Username, Password) when is_binary(Username) and is_binary(Password) ->
mnesia:dirty_write(
#mqtt_user{
username = Username
%password = crypto:hash(md5, Password)
}
).
description() -> "Username password authentication module".
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
hash(Password) ->
hash(salt(), Password).
hash(SaltBin, Password) ->
Hash = erlang:md5(<<SaltBin/binary, Password/binary>>),
<<SaltBin/binary, Hash/binary>>.
salt() ->
{A1,A2,A3} = now(),
random:seed(A1, A2, A3),
Salt = random:uniform(16#ffffffff),
<<Salt:32>>.
delete(Username) when is_binary(Username) ->
mnesia:dirty_delete(?AUTH_USER_TABLE, Username).

View File

@ -38,32 +38,48 @@
{logger, {lager, info}}
]},
{emqttd, [
%Authetication. Internal, Anonymous Default
{auth, [{anonymous, []}]},
%ACL config
{acl, [{file, "etc/acl.config"}]},
%% Authetication. , Anonymous Default
{auth, [
%% authentication with username, password
%{username, []},
%% authentication with clientid
%{clientid, [{password, no}]},
%% allow all
{anonymous, []}
]},
%% ACL config
{acl, [
{file, "etc/acl.config"}
]},
%% Packet
{packet, [
{max_clientid_len, 1024},
{max_packet_size, 16#ffff}
]},
%% Session
{session, [
{expires, 1},
{max_queue, 1000},
{store_qos0, false}
]},
%% Retain messages
{retain, [
{store_limit, 100000}
]},
%% Broker
{broker, [
{sys_interval, 60}
]},
%% Metrics
{metrics, [
{pub_interval, 60}
]},
%% Bridge
{bridge, [
{max_queue_len, 1000}, %NO effect now
{ping_down_interval, 1} %seconds
]},
%% Listen Ports
{listen, [
{mqtt, 1883, [
{backlog, 512},
@ -89,7 +105,7 @@
% Plugins
{plugins, [
{emqttd_auth_ldap, [ldap_params]},
{emqttd_auth_mysql, [mysql_params]},
{emqttd_auth_mysql, [mysql_params]}
]}
]}
].