Add emqx_types module and 'credentials' type

This commit is contained in:
Feng Lee 2018-08-26 16:24:51 +08:00
parent 03d2d24949
commit 0e3728c940
8 changed files with 201 additions and 164 deletions

View File

@ -70,18 +70,26 @@
-type(topic_table() :: [{topic(), subopts()}]).
%%--------------------------------------------------------------------
%% Client and Session
%% Zone, Credentials, Client and Session
%%--------------------------------------------------------------------
-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()).
-type(peername() :: {inet:ip_address(), inet:port_number()}).
-type(zone() :: atom()).
-type(client_id() :: binary() | atom()).
-type(username() :: binary() | atom()).
-type(username() :: binary() | undefined).
-type(zone() :: atom()).
-type(password() :: binary() | undefined).
-type(peername() :: {inet:ip_address(), inet:port_number()}).
-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()).
-type(credentials() :: #{client_id := binary(),
username := binary(),
peername := peername(),
zone => zone(),
atom() => term()}).
-record(client, {
id :: client_id(),

View File

@ -20,9 +20,9 @@
-export([start_link/0]).
-export([authenticate/2]).
-export([check_acl/3, reload_acl/0, lookup_mods/1]).
-export([clean_acl_cache/1, clean_acl_cache/2]).
-export([check_acl/3, reload_acl/0]).
-export([register_mod/3, register_mod/4, unregister_mod/2]).
-export([lookup_mods/1]).
-export([stop/0]).
%% gen_server callbacks
@ -32,8 +32,6 @@
-define(TAB, ?MODULE).
-define(SERVER, ?MODULE).
-type(password() :: undefined | binary()).
-record(state, {}).
%%------------------------------------------------------------------------------
@ -41,81 +39,88 @@
%%------------------------------------------------------------------------------
%% @doc Start access control server.
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
-spec(start_link() -> {ok, pid()} | {error, term()}).
start_link() ->
start_with(fun register_default_acl/0).
start_with(Fun) ->
case gen_server:start_link({local, ?SERVER}, ?MODULE, [], []) of
{ok, Pid} ->
ok = register_default_mod(),
{ok, Pid};
Fun(), {ok, Pid};
{error, Reason} ->
{error, Reason}
end.
register_default_mod() ->
register_default_acl() ->
case emqx_config:get_env(acl_file) of
undefined -> ok;
File ->
emqx_access_control:register_mod(acl, emqx_acl_internal, [File])
File -> register_mod(acl, emqx_acl_internal, [File])
end.
%% @doc Authenticate Client.
-spec(authenticate(Client :: client(), Password :: password())
-> ok | {ok, boolean()} | {error, term()}).
authenticate(Client, Password) when is_record(Client, client) ->
authenticate(Client, Password, lookup_mods(auth)).
-spec(authenticate(credentials(), password())
-> ok | {ok, map()} | {continue, map()} | {error, term()}).
authenticate(Credentials, Password) ->
authenticate(Credentials, Password, lookup_mods(auth)).
authenticate(#client{zone = Zone}, _Password, []) ->
authenticate(Credentials, _Password, []) ->
Zone = maps:get(zone, Credentials, undefined),
case emqx_zone:get_env(Zone, allow_anonymous, false) of
true -> ok;
false -> {error, "No auth module to check!"}
false -> {error, auth_modules_not_found}
end;
authenticate(Client, Password, [{Mod, State, _Seq} | Mods]) ->
case catch Mod:check(Client, Password, State) of
authenticate(Credentials, Password, [{Mod, State, _Seq} | Mods]) ->
case catch Mod:check(Credentials, Password, State) of
ok -> ok;
{ok, IsSuper} -> {ok, IsSuper};
ignore -> authenticate(Client, Password, Mods);
{error, Reason} -> {error, Reason};
{'EXIT', Error} -> {error, Error}
{ok, IsSuper} when is_boolean(IsSuper) ->
{ok, #{is_superuser => IsSuper}};
{ok, Result} when is_map(Result) ->
{ok, Result};
{continue, Result} when is_map(Result) ->
{continue, Result};
ignore ->
authenticate(Credentials, Password, Mods);
{error, Reason} ->
{error, Reason};
{'EXIT', Error} ->
{error, Error}
end.
%% @doc Check ACL
-spec(check_acl(client(), pubsub(), topic()) -> allow | deny).
check_acl(Client, PubSub, Topic) when ?PS(PubSub) ->
check_acl(Client, PubSub, Topic, lookup_mods(acl)).
-spec(check_acl(credentials(), pubsub(), topic()) -> allow | deny).
check_acl(Credentials, PubSub, Topic) when ?PS(PubSub) ->
check_acl(Credentials, PubSub, Topic, lookup_mods(acl)).
check_acl(#client{zone = Zone}, _PubSub, _Topic, []) ->
check_acl(Credentials, _PubSub, _Topic, []) ->
Zone = maps:get(zone, Credentials, undefined),
emqx_zone:get_env(Zone, acl_nomatch, deny);
check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) ->
case Mod:check_acl({Client, PubSub, Topic}, State) of
check_acl(Credentials, PubSub, Topic, [{Mod, State, _Seq}|Mods]) ->
case Mod:check_acl({Credentials, PubSub, Topic}, State) of
ignore -> check_acl(Credentials, PubSub, Topic, Mods);
allow -> allow;
deny -> deny;
ignore -> check_acl(Client, PubSub, Topic, AclMods)
deny -> deny
end.
%% @doc Reload ACL Rules.
-spec(reload_acl() -> list(ok | {error, already_existed})).
-spec(reload_acl() -> list(ok | {error, term()})).
reload_acl() ->
[Mod:reload_acl(State) || {Mod, State, _Seq} <- lookup_mods(acl)].
%% @doc Register Authentication or ACL module.
-spec(register_mod(auth | acl, atom(), list()) -> ok | {error, term()}).
%% @doc Register an Auth/ACL module.
-spec(register_mod(auth | acl, module(), list()) -> ok | {error, term()}).
register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl ->
register_mod(Type, Mod, Opts, 0).
-spec(register_mod(auth | acl, atom(), list(), non_neg_integer())
-spec(register_mod(auth | acl, module(), list(), non_neg_integer())
-> ok | {error, term()}).
register_mod(Type, Mod, Opts, Seq) when Type =:= auth; Type =:= acl->
gen_server:call(?SERVER, {register_mod, Type, Mod, Opts, Seq}).
%% @doc Unregister authentication or ACL module
-spec(unregister_mod(Type :: auth | acl, Mod :: atom())
-> ok | {error, not_found | term()}).
%% @doc Unregister an Auth/ACL module.
-spec(unregister_mod(auth | acl, module()) -> ok | {error, not_found | term()}).
unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl ->
gen_server:call(?SERVER, {unregister_mod, Type, Mod}).
%% @doc Lookup authentication or ACL modules.
%% @doc Lookup all Auth/ACL modules.
-spec(lookup_mods(auth | acl) -> list()).
lookup_mods(Type) ->
case ets:lookup(?TAB, tab_key(Type)) of
@ -126,19 +131,12 @@ lookup_mods(Type) ->
tab_key(auth) -> auth_modules;
tab_key(acl) -> acl_modules.
%% @doc Stop access control server.
stop() ->
gen_server:stop(?MODULE, normal, infinity).
gen_server:stop(?SERVER, normal, infinity).
%%TODO: Support ACL cache...
clean_acl_cache(_ClientId) ->
ok.
clean_acl_cache(_ClientId, _Topic) ->
ok.
%%--------------------------------------------------------------------
%%-----------------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
%%-----------------------------------------------------------------------------
init([]) ->
_ = emqx_tables:new(?TAB, [set, protected, {read_concurrency, true}]),
@ -146,31 +144,31 @@ init([]) ->
handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
Mods = lookup_mods(Type),
Existed = lists:keyfind(Mod, 1, Mods),
{reply, if_existed(Existed, fun() ->
reply(case lists:keyfind(Mod, 1, Mods) of
true ->
{error, already_existed};
false ->
case catch Mod:init(Opts) of
{ok, ModState} ->
NewMods = lists:sort(fun({_, _, Seq1}, {_, _, Seq2}) ->
Seq1 >= Seq2
end, [{Mod, ModState, Seq} | Mods]),
ets:insert(?TAB, {tab_key(Type), NewMods}),
ok;
ets:insert(?TAB, {tab_key(Type), NewMods}), ok;
{error, Error} ->
{error, Error};
{'EXIT', Reason} ->
{error, Reason}
end
end), State};
end, State);
handle_call({unregister_mod, Type, Mod}, _From, State) ->
Mods = lookup_mods(Type),
case lists:keyfind(Mod, 1, Mods) of
reply(case lists:keyfind(Mod, 1, Mods) of
false ->
{reply, {error, not_found}, State};
_ ->
_ = ets:insert(?TAB, {tab_key(Type), lists:keydelete(Mod, 1, Mods)}),
{reply, ok, State}
end;
{error, not_found};
true ->
ets:insert(?TAB, {tab_key(Type), lists:keydelete(Mod, 1, Mods)}), ok
end, State);
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
@ -197,8 +195,6 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------
if_existed(false, Fun) ->
Fun();
if_existed(_Mod, _Fun) ->
{error, already_existed}.
reply(Reply, State) ->
{reply, Reply, State}.

View File

@ -17,9 +17,9 @@
-include("emqx.hrl").
-type(who() :: all | binary() |
{ipaddr, esockd_cidr:cidr_string()} |
{client, binary()} |
{user, binary()}).
{user, binary()} |
{ipaddr, esockd_cidr:cidr_string()}).
-type(access() :: subscribe | publish | pubsub).
@ -30,7 +30,8 @@
-export_type([rule/0]).
-export([compile/1, match/3]).
-export([compile/1]).
-export([match/3]).
-define(ALLOW_DENY(A), ((A =:= allow) orelse (A =:= deny))).
@ -71,7 +72,8 @@ compile(topic, Topic) ->
end.
'pattern?'(Words) ->
lists:member(<<"%u">>, Words) orelse lists:member(<<"%c">>, Words).
lists:member(<<"%u">>, Words)
orelse lists:member(<<"%c">>, Words).
bin(L) when is_list(L) ->
list_to_binary(L);
@ -79,69 +81,70 @@ bin(B) when is_binary(B) ->
B.
%% @doc Match access rule
-spec(match(client(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch).
match(_Client, _Topic, {AllowDeny, all}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) ->
-spec(match(credentials(), topic(), rule())
-> {matched, allow} | {matched, deny} | nomatch).
match(_Credentials, _Topic, {AllowDeny, all}) when ?ALLOW_DENY(AllowDeny) ->
{matched, AllowDeny};
match(Client, Topic, {AllowDeny, Who, _PubSub, TopicFilters})
when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) ->
case match_who(Client, Who)
andalso match_topics(Client, Topic, TopicFilters) of
match(Credentials, Topic, {AllowDeny, Who, _PubSub, TopicFilters})
when ?ALLOW_DENY(AllowDeny) ->
case match_who(Credentials, Who)
andalso match_topics(Credentials, Topic, TopicFilters) of
true -> {matched, AllowDeny};
false -> nomatch
end.
match_who(_Client, all) ->
match_who(_Credentials, all) ->
true;
match_who(_Client, {user, all}) ->
match_who(_Credentials, {user, all}) ->
true;
match_who(_Client, {client, all}) ->
match_who(_Credentials, {client, all}) ->
true;
match_who(#client{id = ClientId}, {client, ClientId}) ->
match_who(#{client_id := ClientId}, {client, ClientId}) ->
true;
match_who(#client{username = Username}, {user, Username}) ->
match_who(#{username := Username}, {user, Username}) ->
true;
match_who(#client{peername = undefined}, {ipaddr, _Tup}) ->
match_who(#{peername := undefined}, {ipaddr, _Tup}) ->
false;
match_who(#client{peername = {IP, _}}, {ipaddr, CIDR}) ->
match_who(#{peername := {IP, _}}, {ipaddr, CIDR}) ->
esockd_cidr:match(IP, CIDR);
match_who(Client, {'and', Conds}) when is_list(Conds) ->
match_who(Credentials, {'and', Conds}) when is_list(Conds) ->
lists:foldl(fun(Who, Allow) ->
match_who(Client, Who) andalso Allow
match_who(Credentials, Who) andalso Allow
end, true, Conds);
match_who(Client, {'or', Conds}) when is_list(Conds) ->
match_who(Credentials, {'or', Conds}) when is_list(Conds) ->
lists:foldl(fun(Who, Allow) ->
match_who(Client, Who) orelse Allow
match_who(Credentials, Who) orelse Allow
end, false, Conds);
match_who(_Client, _Who) ->
match_who(_Credentials, _Who) ->
false.
match_topics(_Client, _Topic, []) ->
match_topics(_Credentials, _Topic, []) ->
false;
match_topics(Client, Topic, [{pattern, PatternFilter}|Filters]) ->
TopicFilter = feed_var(Client, PatternFilter),
match_topics(Credentials, Topic, [{pattern, PatternFilter}|Filters]) ->
TopicFilter = feed_var(Credentials, PatternFilter),
match_topic(emqx_topic:words(Topic), TopicFilter)
orelse match_topics(Client, Topic, Filters);
match_topics(Client, Topic, [TopicFilter|Filters]) ->
orelse match_topics(Credentials, Topic, Filters);
match_topics(Credentials, Topic, [TopicFilter|Filters]) ->
match_topic(emqx_topic:words(Topic), TopicFilter)
orelse match_topics(Client, Topic, Filters).
orelse match_topics(Credentials, Topic, Filters).
match_topic(Topic, {eq, TopicFilter}) ->
Topic =:= TopicFilter;
Topic == TopicFilter;
match_topic(Topic, TopicFilter) ->
emqx_topic:match(Topic, TopicFilter).
feed_var(Client, Pattern) ->
feed_var(Client, Pattern, []).
feed_var(_Client, [], Acc) ->
feed_var(Credentials, Pattern) ->
feed_var(Credentials, Pattern, []).
feed_var(_Credentials, [], Acc) ->
lists:reverse(Acc);
feed_var(Client = #client{id = undefined}, [<<"%c">>|Words], Acc) ->
feed_var(Client, Words, [<<"%c">>|Acc]);
feed_var(Client = #client{id = ClientId}, [<<"%c">>|Words], Acc) ->
feed_var(Client, Words, [ClientId |Acc]);
feed_var(Client = #client{username = undefined}, [<<"%u">>|Words], Acc) ->
feed_var(Client, Words, [<<"%u">>|Acc]);
feed_var(Client = #client{username = Username}, [<<"%u">>|Words], Acc) ->
feed_var(Client, Words, [Username|Acc]);
feed_var(Client, [W|Words], Acc) ->
feed_var(Client, Words, [W|Acc]).
feed_var(Credentials = #{client_id := undefined}, [<<"%c">>|Words], Acc) ->
feed_var(Credentials, Words, [<<"%c">>|Acc]);
feed_var(Credentials = #{client_id := ClientId}, [<<"%c">>|Words], Acc) ->
feed_var(Credentials, Words, [ClientId |Acc]);
feed_var(Credentials = #{username := undefined}, [<<"%u">>|Words], Acc) ->
feed_var(Credentials, Words, [<<"%u">>|Acc]);
feed_var(Credentials = #{username := Username}, [<<"%u">>|Words], Acc) ->
feed_var(Credentials, Words, [Username|Acc]);
feed_var(Credentials, [W|Words], Acc) ->
feed_var(Credentials, Words, [W|Acc]).

View File

@ -25,11 +25,11 @@
-define(ACL_RULE_TAB, emqx_acl_rule).
-record(state, {config}).
-record(state, {acl_file}).
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% @doc Read all rules
-spec(all_rules() -> list(emqx_access_rule:rule())).
@ -39,17 +39,17 @@ all_rules() ->
[{_, Rules}] -> Rules
end.
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% ACL callbacks
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% @doc Init internal ACL
-spec(init([File :: string()]) -> {ok, State :: any()}).
-spec(init([File :: string()]) -> {ok, State :: term()}).
init([File]) ->
_ = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]),
{ok, load_rules_from_file(#state{config = File})}.
{ok, load_rules_from_file(#state{acl_file = File})}.
load_rules_from_file(State = #state{config = AclFile}) ->
load_rules_from_file(State = #state{acl_file = AclFile}) ->
{ok, Terms} = file:consult(AclFile),
Rules = [emqx_access_rule:compile(Term) || Term <- Terms],
lists:foreach(fun(PubSub) ->
@ -73,15 +73,12 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
false.
%% @doc Check ACL
-spec(check_acl({Client, PubSub, Topic}, State) -> allow | deny | ignore when
Client :: client(),
PubSub :: pubsub(),
Topic :: topic(),
State :: #state{}).
check_acl(_Who, #state{config = undefined}) ->
-spec(check_acl({credentials(), pubsub(), topic()}, #state{})
-> allow | deny | ignore).
check_acl(_Who, #state{acl_file = undefined}) ->
allow;
check_acl({Client, PubSub, Topic}, #state{}) ->
case match(Client, Topic, lookup(PubSub)) of
check_acl({Credentials, PubSub, Topic}, #state{}) ->
case match(Credentials, Topic, lookup(PubSub)) of
{matched, allow} -> allow;
{matched, deny} -> deny;
nomatch -> ignore
@ -93,26 +90,24 @@ lookup(PubSub) ->
[{PubSub, Rules}] -> Rules
end.
match(_Client, _Topic, []) ->
match(_Credentials, _Topic, []) ->
nomatch;
match(Client, Topic, [Rule|Rules]) ->
case emqx_access_rule:match(Client, Topic, Rule) of
nomatch -> match(Client, Topic, Rules);
match(Credentials, Topic, [Rule|Rules]) ->
case emqx_access_rule:match(Credentials, Topic, Rule) of
nomatch -> match(Credentials, Topic, Rules);
{matched, AllowDeny} -> {matched, AllowDeny}
end.
%% @doc Reload ACL
-spec(reload_acl(State :: #state{}) -> ok | {error, Reason :: any()}).
reload_acl(#state{config = undefined}) ->
-spec(reload_acl(#state{}) -> ok | {error, term()}).
reload_acl(#state{acl_file = undefined}) ->
ok;
reload_acl(State) ->
case catch load_rules_from_file(State) of
{'EXIT', Error} -> {error, Error};
true -> io:format("~s~n", ["reload acl_internal successfully"]), ok
{'EXIT', Error} ->
{error, Error};
true -> ok
end.
%% @doc ACL Module Description
-spec(description() -> string()).
description() ->
"Internal ACL with etc/acl.conf".

View File

@ -22,14 +22,12 @@
-ifdef(use_specs).
-callback(init(AclOpts :: list()) -> {ok, State :: any()}).
-callback(init(AclOpts :: list()) -> {ok, State :: term()}).
-callback(check_acl({Client :: client(),
PubSub :: pubsub(),
Topic :: topic()}, State :: any())
-callback(check_acl({credentials(), pubsub(), topic()}, State :: term())
-> allow | deny | ignore).
-callback(reload_acl(State :: any()) -> ok | {error, term()}).
-callback(reload_acl(State :: term()) -> ok | {error, term()}).
-callback(description() -> string()).

View File

@ -22,13 +22,11 @@
-ifdef(use_specs).
-callback(init(AuthOpts :: list()) -> {ok, State :: any()}).
-callback(check(Client :: client(),
Password :: binary(),
State :: any())
-> ok | {ok, boolean()} | ignore | {error, string()}).
-callback(init(AuthOpts :: list()) -> {ok, State :: term()}).
-callback(check(credentials(), password(), State :: term())
-> ok | {ok, boolean()} | {ok, map()} |
{continue, map()} | ignore | {error, term()}).
-callback(description() -> string()).
-else.

38
src/emqx_types.erl Normal file
View File

@ -0,0 +1,38 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_types).
%%-include("emqx.hrl").
-export_type([zone/0, client_id/0, username/0, password/0, peername/0,
protocol/0, credentials/0]).
%%-export_type([payload/0, message/0, delivery/0]).
-type(zone() :: atom()).
-type(client_id() :: binary() | atom()).
-type(username() :: binary() | undefined).
-type(password() :: binary() | undefined).
-type(peername() :: {inet:ip_address(), inet:port_number()}).
-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()).
-type(credentials() :: #{client_id := client_id(),
username := username(),
peername := peername(),
zone => zone(),
atom() => term()}).
-type(payload() :: binary() | iodata()).
%-type(message() :: #message{}).
%-type(delivery() :: #delivery{}).

View File

@ -247,6 +247,7 @@ terminate(SockError, _Req, #state{keepalive = Keepalive,
case Reason of
undefined ->
ok;
%%TODO:
%%emqx_protocol:shutdown(SockError, ProtoState);
_ ->
ok%%emqx_protocol:shutdown(Reason, ProtoState)