From e742d73f70bfaeca318617d7f876ee388d8151da Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Sun, 5 Apr 2015 23:17:46 +0800 Subject: [PATCH] support acl mods --- apps/emqttd/include/emqttd.hrl | 14 ++-- apps/emqttd/src/emqttd_acl.erl | 138 +++++++++++++++++++++++++-------- 2 files changed, 112 insertions(+), 40 deletions(-) diff --git a/apps/emqttd/include/emqttd.hrl b/apps/emqttd/include/emqttd.hrl index a5ee399e2..ed74dff13 100644 --- a/apps/emqttd/include/emqttd.hrl +++ b/apps/emqttd/include/emqttd.hrl @@ -59,13 +59,6 @@ -type mqtt_session() :: #mqtt_session{}. -%%------------------------------------------------------------------------------ -%% MQTT Retained Message -%%------------------------------------------------------------------------------ --record(mqtt_retained, {topic, qos, payload}). - --type mqtt_retained() :: #mqtt_retained{}. - %%------------------------------------------------------------------------------ %% MQTT User Management %%------------------------------------------------------------------------------ @@ -93,3 +86,10 @@ -record(mqtt_plugin, {name, version, attrs, description}). +%%------------------------------------------------------------------------------ +%% MQTT Retained Message +%%------------------------------------------------------------------------------ +-record(mqtt_retained, {topic, qos, payload}). + +-type mqtt_retained() :: #mqtt_retained{}. + diff --git a/apps/emqttd/src/emqttd_acl.erl b/apps/emqttd/src/emqttd_acl.erl index e0d1d5af4..11e96ec42 100644 --- a/apps/emqttd/src/emqttd_acl.erl +++ b/apps/emqttd/src/emqttd_acl.erl @@ -35,23 +35,6 @@ -include("emqttd.hrl"). --behaviour(gen_server). - --define(SERVER, ?MODULE). - -%% API Function Exports --export([start_link/1, check/3, reload/0]). - --ifdef(TEST). - --export([compile/1, match/3]). - --endif. - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - -type pubsub() :: subscribe | publish | pubsub. -type who() :: all | binary() | @@ -64,12 +47,38 @@ {deny, all} | {deny, who(), pubsub(), list(binary())}. --record(mqtt_acl, {pubsub :: pubsub(), - rules :: list(rule())}). +-export_type([pubsub/0]). + +-callback check_acl(PubSub, User, Topic) -> {ok, allow | deny} | ignore | {error, any()} when + PubSub :: pubsub(), + User :: mqtt_user(), + Topic :: binary(). + +-callback reload_acl() -> ok | {error, any()}. + +-behaviour(gen_server). + +-define(SERVER, ?MODULE). + +%% API Function Exports +-export([start_link/1, check/3, reload/0, register_mod/1, unregister_mod/1]). + +%% ACL Callback +-export([check_acl/3, reload_acl/0]). + +-ifdef(TEST). + +-export([compile/1, match/3]). + +-endif. + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). -define(ACL_TAB, mqtt_acl). --record(state, {raw_rules = []}). +-record(state, {acl_file, raw_rules = []}). %%%============================================================================= %%% API @@ -91,16 +100,50 @@ start_link(AclOpts) -> User :: mqtt_user(), Topic :: binary(). check(PubSub, User, Topic) -> - case match(User, Topic, lookup(PubSub)) of - nomatch -> allow; - {matched, allow} -> allow; - {matched, deny} -> deny + case ets:lookup(?ACL_TAB, acl_mods) of + [] -> {error, "No ACL mod!"}; + [{_, Mods}] -> check(PubSub, User, Topic, Mods) end. +check(_PubSub, _User, _Topic, []) -> + {error, "All ACL mods ignored!"}; +check(PubSub, User, Topic, [Mod|Mods]) -> + case Mod:check_acl(PubSub, User, Topic) of + {ok, AllowDeny} -> AllowDeny; + ignore -> check(PubSub, User, Topic, Mods) + end. + +%%TODO: +reload() -> + case ets:lookup(?ACL_TAB, acl_mods) of + [] -> {error, "No ACL mod!"}; + [{_, Mods}] -> [M:reload() || M <- Mods] + end. + +register_mod(Mod) -> + gen_server:call(?MODULE, {register_mod, Mod}). + +unregister_mod(Mod) -> + gen_server:call(?MODULE, {unregister_mod, Mod}). + +-spec check_acl(PubSub, User, Topic) -> {ok, allow} | {ok, deny} | ignore | {error, any()} when + PubSub :: pubsub(), + User :: mqtt_user(), + Topic :: binary(). +check_acl(PubSub, User, Topic) -> + case match(User, Topic, lookup(PubSub)) of + {matched, allow} -> {ok, allow}; + {matched, deny} -> {ok, deny}; + nomatch -> {error, nomatch} + end. + +-spec reload_acl() -> ok. +reload_acl() -> + gen_server:call(?SERVER, reload). lookup(PubSub) -> case ets:lookup(?ACL_TAB, PubSub) of [] -> []; - [#mqtt_acl{pubsub = PubSub, rules = Rules}] -> Rules + [{PubSub, Rules}] -> Rules end. match(_User, _Topic, []) -> @@ -112,23 +155,23 @@ match(User, Topic, [Rule|Rules]) -> {matched, AllowDeny} -> {matched, AllowDeny} end. --spec reload() -> ok. -reload() -> - gen_server:call(?SERVER, reload). - %% ------------------------------------------------------------------ %% gen_server Function Definitions %% ------------------------------------------------------------------ init([AclOpts]) -> + ets:new(?ACL_TAB, [set, protected, named_table]), + ets:insert(?ACL_TAB, {acl_mods, [?MODULE]}), AclFile = proplists:get_value(file, AclOpts), + load_rules(#state{acl_file = AclFile}). + +load_rules(State = #state{acl_file = AclFile}) -> {ok, Terms} = file:consult(AclFile), Rules = [compile(Term) || Term <- Terms], - ets:new(?ACL_TAB, [set, protected, named_table, {keypos, 2}]), lists:foreach(fun(PubSub) -> - ets:insert(?ACL_TAB, #mqtt_acl{pubsub = PubSub, rules = + ets:insert(?ACL_TAB, {PubSub, lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)}) end, [publish, subscribe]), - {ok, #state{raw_rules = Terms}}. + {ok, State#state{raw_rules = Terms}}. filter(_PubSub, {allow, all}) -> true; @@ -144,7 +187,36 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) -> false. handle_call(reload, _From, State) -> - {reply, {error, unsupported}, State}. + case catch load_rules(State) of + {ok, NewState} -> + {reply, ok, NewState}; + {'EXIT', Error} -> + {reply, {error, Error}, State} + end; + +handle_call({register_mod, Mod}, _From, State) -> + [{_, Mods}] = ets:lookup(?ACL_TAB, acl_mods), + case lists:member(Mod, Mods) of + true -> + {reply, {error, registered}, State}; + false -> + ets:insert(?ACL_TAB, {acl_mods, [Mod|Mods]}), + {reply, ok, State} + end; + +handle_call({unregister_mod, Mod}, _From, State) -> + [{_, Mods}] = ets:lookup(?ACL_TAB, acl_mods), + case lists:member(Mod, Mods) of + true -> + ets:insert(?ACL_TAB, lists:delete(Mod, Mods)), + {reply, ok, State}; + false -> + {reply, {error, not_found}, State} + end; + +handle_call(Req, _From, State) -> + lager:error("Bad Request: ~p", [Req]), + {reply, {error, badreq}, State}. handle_cast(_Msg, State) -> {noreply, State}.