Merge branch 'dev' of github.com:emqtt/emqtt into dev

This commit is contained in:
Ery Lee 2015-04-17 01:47:37 +08:00
commit 189a16d55e
17 changed files with 395 additions and 426 deletions

View File

@ -0,0 +1,243 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd authentication and ACL server.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_access_control).
-author('feng@emqtt.io').
-include("emqttd.hrl").
-behaviour(gen_server).
-define(SERVER, ?MODULE).
%% API Function Exports
-export([start_link/1,
auth/2, % authentication
check_acl/3, % acl check
reload_acl/0, % reload acl
register_mod/3,
unregister_mod/2,
lookup_mods/1,
stop/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(ACCESS_CONTROL_TAB, mqtt_access_control).
%%%=============================================================================
%%% API
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc
%% Start access control server.
%%
%% @end
%%------------------------------------------------------------------------------
-spec start_link(AcOpts :: list()) -> {ok, pid()} | ignore | {error, any()}.
start_link(AcOpts) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [AcOpts], []).
%%------------------------------------------------------------------------------
%% @doc
%% Authenticate client.
%%
%% @end
%%------------------------------------------------------------------------------
-spec auth(mqtt_client(), undefined | binary()) -> ok | {error, string()}.
auth(Client, Password) when is_record(Client, mqtt_client) ->
auth(Client, Password, lookup_mods(auth)).
auth(_Client, _Password, []) ->
{error, "No auth module to check!"};
auth(Client, Password, [{Mod, State} | Mods]) ->
case Mod:check(Client, Password, State) of
ok -> ok;
{error, Reason} -> {error, Reason};
ignore -> auth(Client, Password, Mods)
end.
%%------------------------------------------------------------------------------
%% @doc
%% Check ACL.
%%
%% @end
%%------------------------------------------------------------------------------
-spec check_acl(Client, PubSub, Topic) -> allow | deny when
Client :: mqtt_client(),
PubSub :: pubsub(),
Topic :: binary().
check_acl(Client, PubSub, Topic) when PubSub =:= publish orelse PubSub =:= subscribe ->
case lookup_mods(acl) of
[] -> allow;
AclMods -> check_acl(Client, PubSub, Topic, AclMods)
end.
check_acl(#mqtt_client{clientid = ClientId}, PubSub, Topic, []) ->
lager:error("ACL: nomatch when ~s ~s ~s", [ClientId, PubSub, Topic]),
allow;
check_acl(Client, PubSub, Topic, [{M, State}|AclMods]) ->
case M:check_acl({Client, PubSub, Topic}, State) of
allow -> allow;
deny -> deny;
ignore -> check_acl(Client, PubSub, Topic, AclMods)
end.
%%------------------------------------------------------------------------------
%% @doc
%% Reload ACL.
%%
%% @end
%%------------------------------------------------------------------------------
-spec reload_acl() -> list() | {error, any()}.
reload_acl() ->
[M:reload_acl(State) || {M, State} <- lookup_mods(acl)].
%%------------------------------------------------------------------------------
%% @doc
%% Register auth or ACL module.
%%
%% @end
%%------------------------------------------------------------------------------
-spec register_mod(Type :: auth | acl, Mod :: atom(), Opts :: list()) -> ok | {error, any()}.
register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl->
gen_server:call(?SERVER, {register_mod, Type, Mod, Opts}).
%%------------------------------------------------------------------------------
%% @doc
%% Unregister auth or ACL module.
%%
%% @end
%%------------------------------------------------------------------------------
-spec unregister_mod(Type :: auth | acl, Mod :: atom()) -> ok | {error, any()}.
unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl ->
gen_server:call(?SERVER, {unregister_mod, Type, Mod}).
%%------------------------------------------------------------------------------
%% @doc
%% Lookup authentication or ACL modules.
%%
%% @end
%%------------------------------------------------------------------------------
-spec lookup_mods(auth | acl) -> list().
lookup_mods(Type) ->
case ets:lookup(?ACCESS_CONTROL_TAB, tab_key(Type)) of
[] -> [];
[{_, Mods}] -> Mods
end.
tab_key(auth) ->
auth_modules;
tab_key(acl) ->
acl_modules.
%%------------------------------------------------------------------------------
%% @doc
%% Stop access control server.
%%
%% @end
%%------------------------------------------------------------------------------
stop() ->
gen_server:call(?MODULE, stop).
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([AcOpts]) ->
ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]),
ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, AcOpts))}),
ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, AcOpts))}),
{ok, state}.
init_mods(auth, AuthMods) ->
[init_mod(fun authmod/1, Name, Opts) || {Name, Opts} <- AuthMods];
init_mods(acl, AclMods) ->
[init_mod(fun aclmod/1, Name, Opts) || {Name, Opts} <- AclMods].
init_mod(Fun, Name, Opts) ->
Module = Fun(Name),
{ok, State} = Module:init(Opts),
{Module, State}.
handle_call({register_mod, Type, Mod, Opts}, _From, State) ->
Mods = lookup_mods(Type),
Reply =
case lists:keyfind(Mod, 1, Mods) of
false ->
case catch Mod:init(Opts) of
{ok, ModState} ->
ets:insert(?ACCESS_CONTROL_TAB, {tab_key(Type), [{Mod, ModState}|Mods]}),
ok;
{'EXIT', Error} ->
{error, Error}
end;
_ ->
{error, existed}
end,
{reply, Reply, State};
handle_call({unregister_mod, Type, Mod}, _From, State) ->
Mods = lookup_mods(Type),
Reply =
case lists:keyfind(Mod, 1, Mods) of
false ->
{error, not_found};
_ ->
ets:insert(?ACCESS_CONTROL_TAB, {tab_key(Type), lists:keydelete(Mod, 1, Mods)}), ok
end,
{reply, Reply, State};
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(Req, _From, State) ->
lager:error("Bad Request: ~p", [Req]),
{reply, {error, badreq}, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
authmod(Name) when is_atom(Name) ->
list_to_atom(lists:concat(["emqttd_auth_", Name])).
aclmod(Name) when is_atom(Name) ->
list_to_atom(lists:concat(["emqttd_acl_", Name])).

View File

@ -26,6 +26,8 @@
%%%-----------------------------------------------------------------------------
-module(emqttd_access_rule).
-author('feng@emqtt.io').
-include("emqttd.hrl").
-type who() :: all | binary() |

View File

@ -1,208 +0,0 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd ACL.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_acl).
-author('feng@emqtt.io').
-include("emqttd.hrl").
-behaviour(gen_server).
-define(SERVER, ?MODULE).
%% API Function Exports
-export([start_link/1, check/1, reload/0,
register_mod/2, unregister_mod/1, all_modules/0,
stop/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(ACL_TABLE, mqtt_acl).
%%%=============================================================================
%%% ACL behavihour
%%%=============================================================================
-ifdef(use_specs).
-callback init(AclOpts :: list()) -> {ok, State :: any()}.
-callback check_acl({Client, PubSub, Topic}, State :: any()) -> allow | deny | ignore when
Client :: mqtt_client(),
PubSub :: pubsub(),
Topic :: binary().
-callback reload_acl(State :: any()) -> ok | {error, any()}.
-callback description() -> string().
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
[{init, 1}, {check_acl, 2}, {reload_acl, 1}, {description, 0}];
behaviour_info(_Other) ->
undefined.
-endif.
%%%=============================================================================
%%% API
%%%=============================================================================
%% @doc Start ACL Server.
-spec start_link(AclMods :: list()) -> {ok, pid()} | ignore | {error, any()}.
start_link(AclMods) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [AclMods], []).
%% @doc Check ACL.
-spec check({Client, PubSub, Topic}) -> allow | deny when
Client :: mqtt_client(),
PubSub :: pubsub(),
Topic :: binary().
check({Client, PubSub, Topic}) when PubSub =:= publish orelse PubSub =:= subscribe ->
case all_modules() of
[] -> allow;
[{_, AclMods}] -> check({Client, PubSub, Topic}, AclMods)
end.
check({#mqtt_client{clientid = ClientId}, PubSub, Topic}, []) ->
lager:error("ACL: nomatch when ~s ~s ~s", [ClientId, PubSub, Topic]),
allow;
check({Client, PubSub, Topic}, [{M, State}|AclMods]) ->
case M:check_acl({Client, PubSub, Topic}, State) of
allow -> allow;
deny -> deny;
ignore -> check({Client, PubSub, Topic}, AclMods)
end.
%% @doc Reload ACL.
-spec reload() -> list() | {error, any()}.
reload() ->
case ets:lookup(?ACL_TABLE, acl_modules) of
[] ->
{error, "No ACL modules!"};
[{_, AclMods}] ->
[M:reload_acl(State) || {M, State} <- AclMods]
end.
%% @doc Register ACL Module.
-spec register_mod(AclMod :: atom(), Opts :: list()) -> ok | {error, any()}.
register_mod(AclMod, Opts) ->
gen_server:call(?SERVER, {register_mod, AclMod, Opts}).
%% @doc Unregister ACL Module.
-spec unregister_mod(AclMod :: atom()) -> ok | {error, any()}.
unregister_mod(AclMod) ->
gen_server:call(?SERVER, {unregister_mod, AclMod}).
%% @doc All ACL Modules.
-spec all_modules() -> list().
all_modules() ->
case ets:lookup(?ACL_TABLE, acl_modules) of
[] -> [];
[{_, AclMods}] -> AclMods
end.
%% @doc Stop ACL server.
-spec stop() -> ok.
stop() ->
gen_server:call(?SERVER, stop).
%%%=============================================================================
%%% gen_server callbacks.
%%%=============================================================================
init([AclMods]) ->
ets:new(?ACL_TABLE, [set, protected, named_table, {read_concurrency, true}]),
AclMods1 = lists:map(
fun({M, Opts}) ->
AclMod = aclmod(M),
{ok, State} = AclMod:init(Opts),
{AclMod, State}
end, AclMods),
ets:insert(?ACL_TABLE, {acl_modules, AclMods1}),
{ok, state}.
handle_call({register_mod, Mod, Opts}, _From, State) ->
AclMods = all_modules(),
Reply =
case lists:keyfind(Mod, 1, AclMods) of
false ->
case catch Mod:init(Opts) of
{ok, ModState} ->
ets:insert(?ACL_TABLE, {acl_modules, [{Mod, ModState}|AclMods]}),
ok;
{'EXIT', Error} ->
{error, Error}
end;
_ ->
{error, existed}
end,
{reply, Reply, State};
handle_call({unregister_mod, Mod}, _From, State) ->
AclMods = all_modules(),
Reply =
case lists:keyfind(Mod, 1, AclMods) of
false ->
{error, not_found};
_ ->
ets:insert(?ACL_TABLE, {acl_modules, lists:keydelete(Mod, 1, AclMods)}), ok
end,
{reply, Reply, State};
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(Req, _From, State) ->
lager:error("Bad Request: ~p", [Req]),
{reply, {error, badreq}, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
aclmod(Name) when is_atom(Name) ->
list_to_atom(lists:concat(["emqttd_acl_", Name])).

View File

@ -32,7 +32,7 @@
-export([all_rules/0]).
-behaviour(emqttd_acl).
-behaviour(emqttd_acl_mod).
%% ACL callbacks
-export([init/1, check_acl/2, reload_acl/1, description/0]).

View File

@ -0,0 +1,60 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd ACL behaviour.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_acl_mod).
-author('feng@emqtt.io').
-include("emqttd.hrl").
%%%=============================================================================
%%% ACL behavihour
%%%=============================================================================
-ifdef(use_specs).
-callback init(AclOpts :: list()) -> {ok, State :: any()}.
-callback check_acl({Client, PubSub, Topic}, State :: any()) -> allow | deny | ignore when
Client :: mqtt_client(),
PubSub :: pubsub(),
Topic :: binary().
-callback reload_acl(State :: any()) -> ok | {error, any()}.
-callback description() -> string().
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
[{init, 1}, {check_acl, 2}, {reload_acl, 1}, {description, 0}];
behaviour_info(_Other) ->
undefined.
-endif.

View File

@ -43,8 +43,7 @@
broker,
metrics,
bridge,
auth,
acl,
access_control,
sysmon]).
-define(PRINT_MSG(Msg), io:format(Msg)).
@ -121,12 +120,9 @@ server(metrics) ->
{"emqttd metrics", emqttd_metrics, MetricOpts};
server(bridge) ->
{"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}};
server(auth) ->
{ok, AuthMods} = application:get_env(auth),
{"emqttd auth", emqttd_auth, AuthMods};
server(acl) ->
{ok, AclOpts} = application:get_env(acl),
{"emqttd acl", emqttd_acl, AclOpts};
server(access_control) ->
{ok, AcOpts} = application:get_env(access_control),
{"emqttd access control", emqttd_access_control, AcOpts};
server(sysmon) ->
{"emqttd system monitor", emqttd_sysmon}.

View File

@ -1,187 +0,0 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd authentication.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_auth).
-author('feng@emqtt.io').
-include("emqttd.hrl").
-export([start_link/1, check/2,
register_mod/2, unregister_mod/1, all_modules/0,
stop/0]).
-behavior(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(AUTH_TAB, mqtt_auth).
%%%=============================================================================
%%% Auth behavihour
%%%=============================================================================
-ifdef(use_specs).
-callback check(Client, Password, State) -> ok | ignore | {error, string()} when
Client :: mqtt_client(),
Password :: binary(),
State :: any().
-callback description() -> string().
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
[{check, 3}, {description, 0}];
behaviour_info(_Other) ->
undefined.
-endif.
%%------------------------------------------------------------------------------
%% @doc
%% Start authentication server.
%%
%% @end
%%------------------------------------------------------------------------------
-spec start_link(list()) -> {ok, pid()} | ignore | {error, any()}.
start_link(AuthMods) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [AuthMods], []).
%%------------------------------------------------------------------------------
%% @doc
%% Authenticate client.
%%
%% @end
%%------------------------------------------------------------------------------
-spec check(mqtt_client(), undefined | binary()) -> ok | {error, string()}.
check(Client, Password) when is_record(Client, mqtt_client) ->
check(Client, Password, all_modules()).
check(_Client, _Password, []) ->
{error, "No auth module to check!"};
check(Client, Password, [{Mod, State} | Mods]) ->
case Mod:check(Client, Password, State) of
ok -> ok;
{error, Reason} -> {error, Reason};
ignore -> check(Client, Password, Mods)
end.
%%------------------------------------------------------------------------------
%% @doc
%% Register authentication module.
%%
%% @end
%%------------------------------------------------------------------------------
-spec register_mod(Mod :: atom(), Opts :: any()) -> ok | {error, any()}.
register_mod(Mod, Opts) ->
gen_server:call(?MODULE, {register_mod, Mod, Opts}).
%%------------------------------------------------------------------------------
%% @doc
%% Unregister authentication module.
%%
%% @end
%%------------------------------------------------------------------------------
-spec unregister_mod(Mod :: atom()) -> ok | {error, any()}.
unregister_mod(Mod) ->
gen_server:call(?MODULE, {unregister_mod, Mod}).
all_modules() ->
case ets:lookup(?AUTH_TAB, auth_modules) of
[] -> [];
[{_, AuthMods}] -> AuthMods
end.
stop() ->
gen_server:call(?MODULE, stop).
init([AuthMods]) ->
ets:new(?AUTH_TAB, [set, named_table, protected, {read_concurrency, true}]),
Modules = lists:map(
fun({Mod, Opts}) ->
AuthMod = authmod(Mod),
{ok, State} = AuthMod:init(Opts),
{AuthMod, State}
end, AuthMods),
ets:insert(?AUTH_TAB, {auth_modules, Modules}),
{ok, state}.
handle_call({register_mod, Mod, Opts}, _From, State) ->
AuthMods = all_modules(),
Reply =
case lists:keyfind(Mod, 1, AuthMods) of
false ->
case catch Mod:init(Opts) of
{ok, ModState} ->
ets:insert(?AUTH_TAB, {auth_modules, [{Mod, ModState}|AuthMods]}),
ok;
{error, Reason} ->
{error, Reason};
{'EXIT', Error} ->
{error, Error}
end;
_ ->
{error, existed}
end,
{reply, Reply, State};
handle_call({unregister_mod, Mod}, _From, State) ->
AuthMods = all_modules(),
Reply =
case lists:keyfind(Mod, 1, AuthMods) of
false ->
{error, not_found};
_ ->
ets:insert(?AUTH_TAB, {auth_modules, lists:keydelete(Mod, 1, AuthMods)}), ok
end,
{reply, Reply, State};
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
authmod(Name) when is_atom(Name) ->
list_to_atom(lists:concat(["emqttd_auth_", Name])).

View File

@ -28,7 +28,7 @@
-author('feng@emqtt.io').
-behaviour(emqttd_auth).
-behaviour(emqttd_auth_mod).
-export([init/1, check/3, description/0]).

View File

@ -34,7 +34,7 @@
lookup_clientid/1, remove_clientid/1,
all_clientids/0]).
-behaviour(emqttd_auth).
-behaviour(emqttd_auth_mod).
%% emqttd_auth callbacks
-export([init/1, check/3, description/0]).

View File

@ -0,0 +1,56 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd authentication behaviour.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_auth_mod).
-author('feng@emqtt.io').
-include("emqttd.hrl").
%%%=============================================================================
%%% Auth behavihour
%%%=============================================================================
-ifdef(use_specs).
-callback check(Client, Password, State) -> ok | ignore | {error, string()} when
Client :: mqtt_client(),
Password :: binary(),
State :: any().
-callback description() -> string().
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
[{check, 3}, {description, 0}];
behaviour_info(_Other) ->
undefined.
-endif.

View File

@ -30,6 +30,8 @@
-include("emqttd.hrl").
-behaviour(emqttd_auth_mod).
-export([add_user/2, remove_user/1,
lookup_user/1, all_users/0]).

View File

@ -78,7 +78,7 @@ authorized(Req) ->
false;
"Basic " ++ BasicAuth ->
{Username, Password} = user_passwd(BasicAuth),
case emqttd_auth:login(#mqtt_client{username = Username}, Password) of
case emqttd_access_control:auth(#mqtt_client{username = Username}, Password) of
ok ->
true;
{error, Reason} ->

View File

@ -45,3 +45,4 @@ merge(Defaults, Options) ->
end, Defaults, Options).

View File

@ -123,7 +123,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
case validate_connect(Var, State) of
?CONNACK_ACCEPT ->
Client = #mqtt_client{clientid = ClientId, username = Username, ipaddr = Addr},
case emqttd_auth:login(Client, Password) of
case emqttd_access_control:auth(Client, Password) of
ok ->
ClientId1 = clientid(ClientId, State),
start_keepalive(KeepAlive),
@ -146,7 +146,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) ->
case emqttd_acl:check({client(State), publish, Topic}) of
case emqttd_access_control:check_acl(client(State), publish, Topic) of
allow ->
emqttd_session:publish(Session, ClientId, {?QOS_0, emqtt_message:from_packet(Packet)});
deny ->
@ -156,7 +156,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) ->
case emqttd_acl:check({client(State), publish, Topic}) of
case emqttd_access_control:check_acl(client(State), publish, Topic) of
allow ->
emqttd_session:publish(Session, ClientId, {?QOS_1, emqtt_message:from_packet(Packet)}),
send(?PUBACK_PACKET(?PUBACK, PacketId), State);
@ -167,7 +167,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload),
handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) ->
case emqttd_acl:check({client(State), publish, Topic}) of
case emqttd_access_control:check_acl(client(State), publish, Topic) of
allow ->
NewSession = emqttd_session:publish(Session, ClientId, {?QOS_2, emqtt_message:from_packet(Packet)}),
send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession});
@ -191,7 +191,7 @@ handle(?PUBACK_PACKET(Type, PacketId), State = #proto_state{session = Session})
{ok, NewState};
handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = ClientId, session = Session}) ->
AllowDenies = [emqttd_acl:check({client(State), subscribe, Topic}) || {Topic, _Qos} <- TopicTable],
AllowDenies = [emqttd_access_control:check_acl(client(State), subscribe, Topic) || {Topic, _Qos} <- TopicTable],
case lists:member(deny, AllowDenies) of
true ->
%%TODO: return 128 QoS when deny...

View File

@ -186,6 +186,7 @@ subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topi
end,
SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics),
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
lager:info("Client ~s subscribe ~p. Granted QoS: ~p", [ClientId, Topics, GrantedQos]),
%%TODO: should be gen_event and notification...
[emqttd_msg_store:redeliver(Name, self()) || {Name, _} <- Topics],
{ok, SessState#session_state{submap = SubMap1}, GrantedQos};
@ -209,6 +210,7 @@ unsubscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, To
end,
%%unsubscribe from topic tree
ok = emqttd_pubsub:unsubscribe(Topics),
lager:info("Client ~s unsubscribe ~p.", [ClientId, Topics]),
SubMap1 = lists:foldl(fun(Topic, Acc) -> maps:remove(Topic, Acc) end, SubMap, Topics),
{ok, SessState#session_state{submap = SubMap1}};

2
go
View File

@ -2,4 +2,4 @@
# -*- tab-width:4;indent-tabs-mode:nil -*-
# ex: ts=4 sw=4 et
make && make dist && cd rel/emqtt && ./bin/emqtt console
make && make dist && cd rel/emqttd && ./bin/emqttd console

View File

@ -40,6 +40,7 @@
{logger, {lager, info}}
]},
{emqttd, [
{access_control, [
%% Authetication. , Anonymous Default
{auth, [
%% authentication with username, password
@ -53,6 +54,7 @@
{acl, [
%% User internal ACL module
{internal, [{file, "etc/acl.config"}, {nomatch, allow}]}
]}
]},
%% Packet
{packet, [