diff --git a/apps/emqttd/src/emqttd_access_rule.erl b/apps/emqttd/src/emqttd_access_rule.erl index 096c10b8d..7d367f68f 100644 --- a/apps/emqttd/src/emqttd_access_rule.erl +++ b/apps/emqttd/src/emqttd_access_rule.erl @@ -42,6 +42,8 @@ {deny, all} | {deny, who(), access(), list(topic())}. +-export_type([rule/0]). + -export([compile/1, match/3]). %%%----------------------------------------------------------------------------- diff --git a/apps/emqttd/src/emqttd_acl.erl b/apps/emqttd/src/emqttd_acl.erl index 46abb5373..02350e7d4 100644 --- a/apps/emqttd/src/emqttd_acl.erl +++ b/apps/emqttd/src/emqttd_acl.erl @@ -35,8 +35,8 @@ -define(SERVER, ?MODULE). %% API Function Exports --export([start_link/1, check/3, reload/0, - register_mod/1, unregister_mod/1, all_modules/0, +-export([start_link/1, check/1, reload/0, + register_mod/2, unregister_mod/1, all_modules/0, stop/0]). %% gen_server callbacks @@ -53,12 +53,12 @@ -callback init(AclOpts :: list()) -> {ok, State :: any()}. --callback check_acl(User, PubSub, Topic) -> allow | deny | ignore when +-callback check_acl({User, PubSub, Topic}, State :: any()) -> allow | deny | ignore when User :: mqtt_user(), PubSub :: pubsub(), Topic :: binary(). --callback reload_acl() -> ok | {error, any()}. +-callback reload_acl(State :: any()) -> ok | {error, any()}. -callback description() -> string(). @@ -67,7 +67,7 @@ -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{init, 1}, {check_acl, 3}, {reload_acl, 0}, {description, 0}]; + [{init, 1}, {check_acl, 2}, {reload_acl, 1}, {description, 0}]; behaviour_info(_Other) -> undefined. @@ -77,107 +77,107 @@ behaviour_info(_Other) -> %%% API %%%============================================================================= -%%------------------------------------------------------------------------------ -%% @doc -%% Start ACL Server. -%% -%% @end -%%------------------------------------------------------------------------------ --spec start_link(AclOpts) -> {ok, pid()} | ignore | {error, any()} when - AclOpts :: [{file, list()}]. -start_link(AclOpts) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [AclOpts], []). +%% @doc Start ACL Server. +-spec start_link(AclMods :: list()) -> {ok, pid()} | ignore | {error, any()}. +start_link(AclMods) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [AclMods], []). -%%------------------------------------------------------------------------------ -%% @doc -%% Check ACL. -%% -%% @end -%%-------------------------------------------------------------------------- --spec check(User, PubSub, Topic) -> allow | deny | ignore when +%% @doc Check ACL. +-spec check({User, PubSub, Topic}) -> allow | deny when User :: mqtt_user(), PubSub :: pubsub(), Topic :: binary(). -check(User, PubSub, Topic) when PubSub =:= publish orelse PubSub =:= subscribe -> +check({User, PubSub, Topic}) when PubSub =:= publish orelse PubSub =:= subscribe -> case ets:lookup(?ACL_TABLE, acl_modules) of [] -> allow; - [{_, Mods}] -> check(User, PubSub, Topic, Mods) + [{_, AclMods}] -> check({User, PubSub, Topic}, AclMods) end. -check(#mqtt_user{clientid = ClientId}, PubSub, Topic, []) -> +check({#mqtt_user{clientid = ClientId}, PubSub, Topic}, []) -> lager:error("ACL: nomatch when ~s ~s ~s", [ClientId, PubSub, Topic]), allow; -check(User, PubSub, Topic, [Mod|Mods]) -> - case Mod:check_acl(User, PubSub, Topic) of +check({User, PubSub, Topic}, [{M, State}|AclMods]) -> + case M:check_acl({User, PubSub, Topic}, State) of allow -> allow; deny -> deny; - ignore -> check(User, PubSub, Topic, Mods) + ignore -> check({User, PubSub, Topic}, AclMods) end. -%%------------------------------------------------------------------------------ -%% @doc -%% Reload ACL. -%% -%% @end -%%------------------------------------------------------------------------------ +%% @doc Reload ACL. +-spec reload() -> list() | {error, any()}. reload() -> case ets:lookup(?ACL_TABLE, acl_modules) of - [] -> {error, "No ACL mod!"}; - [{_, Mods}] -> [M:reload_acl() || M <- Mods] + [] -> + {error, "No ACL modules!"}; + [{_, AclMods}] -> + [M:reload_acl(State) || {M, State} <- AclMods] end. -%%------------------------------------------------------------------------------ -%% @doc -%% Register ACL Module. -%% -%% @end -%%------------------------------------------------------------------------------ --spec register_mod(Mod :: atom()) -> ok | {error, any()}. -register_mod(Mod) -> - gen_server:call(?SERVER, {register_mod, Mod}). +%% @doc Register ACL Module. +-spec register_mod(AclMod :: atom(), Opts :: list()) -> ok | {error, any()}. +register_mod(AclMod, Opts) -> + gen_server:call(?SERVER, {register_mod, AclMod, Opts}). -%%------------------------------------------------------------------------------ -%% @doc -%% Unregister ACL Module. -%% -%% @end -%%------------------------------------------------------------------------------ --spec unregister_mod(Mod :: atom()) -> ok | {error, any()}. -unregister_mod(Mod) -> - gen_server:cast(?SERVER, {unregister_mod, Mod}). +%% @doc Unregister ACL Module. +-spec unregister_mod(AclMod :: atom()) -> ok | {error, any()}. +unregister_mod(AclMod) -> + gen_server:call(?SERVER, {unregister_mod, AclMod}). -%%------------------------------------------------------------------------------ -%% @doc -%% All ACL Modules. -%% -%% @end -%%------------------------------------------------------------------------------ +%% @doc All ACL Modules. +-spec all_modules() -> list(). all_modules() -> case ets:lookup(?ACL_TABLE, acl_modules) of [] -> []; - [{_, Mods}] -> Mods + [{_, AclMods}] -> AclMods end. +%% @doc Stop ACL server. +-spec stop() -> ok. stop() -> gen_server:call(?SERVER, stop). %%%============================================================================= %%% gen_server callbacks. %%%============================================================================= -init([_AclOpts]) -> +init([AclMods]) -> ets:new(?ACL_TABLE, [set, protected, named_table]), + AclMods1 = lists:map( + fun({M, Opts}) -> + AclMod = aclmod(M), + {ok, State} = AclMod:init(Opts), + {AclMod, State} + end, AclMods), + ets:insert(?ACL_TABLE, {acl_modules, AclMods1}), {ok, state}. -handle_call({register_mod, Mod}, _From, State) -> - Mods = all_modules(), - case lists:member(Mod, Mods) of - true -> - {reply, {error, existed}, State}; - false -> - ets:insert(?ACL_TABLE, {acl_modules, [Mod | Mods]}), - {reply, ok, State} - end; +handle_call({register_mod, Mod, Opts}, _From, State) -> + AclMods = all_modules(), + Reply = + case lists:keyfind(Mod, 1, AclMods) of + false -> + case catch Mod:init(Opts) of + {ok, ModState} -> + ets:insert(?ACL_TABLE, {acl_modules, [{Mod, ModState}|AclMods]}), + ok; + {'EXIT', Error} -> + {error, Error} + end; + _ -> + {error, existed} + end, + {reply, Reply, State}; + +handle_call({unregister_mod, Mod}, _From, State) -> + AclMods = all_modules(), + Reply = + case lists:keyfind(Mod, 1, AclMods) of + false -> + {error, not_found}; + _ -> + ets:insert(?ACL_TABLE, {acl_modules, lists:keydelete(Mod, 1, AclMods)}), ok + end, + {reply, Reply, State}; handle_call(stop, _From, State) -> {stop, normal, ok, State}; @@ -186,16 +186,6 @@ handle_call(Req, _From, State) -> lager:error("Bad Request: ~p", [Req]), {reply, {error, badreq}, State}. -handle_cast({unregister_mod, Mod}, State) -> - Mods = all_modules(), - case lists:member(Mod, Mods) of - true -> - ets:insert(?ACL_TABLE, {acl_modules, lists:delete(Mod, Mods)}); - false -> - lager:error("unknown acl module: ~s", [Mod]) - end, - {noreply, State}; - handle_cast(_Msg, State) -> {noreply, State}. @@ -212,3 +202,7 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= +aclmod(Name) when is_atom(Name) -> + list_to_atom(lists:concat(["emqttd_acl_", Name])). + + diff --git a/apps/emqttd/src/emqttd_acl_internal.erl b/apps/emqttd/src/emqttd_acl_internal.erl index 3de356874..593c8368d 100644 --- a/apps/emqttd/src/emqttd_acl_internal.erl +++ b/apps/emqttd/src/emqttd_acl_internal.erl @@ -30,62 +30,76 @@ -include("emqttd.hrl"). --export([start_link/1, stop/0]). +-export([all_rules/0]). -behaviour(emqttd_acl). %% ACL callbacks --export([check_acl/3, reload_acl/0, description/0]). - --behaviour(gen_server). - --define(SERVER, ?MODULE). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +-export([init/1, check_acl/2, reload_acl/1, description/0]). -define(ACL_RULE_TABLE, mqtt_acl_rule). --record(state, {acl_file, raw_rules = []}). +-record(state, {acl_file, nomatch = allow}). %%%============================================================================= %%% API %%%============================================================================= -%%------------------------------------------------------------------------------ -%% @doc -%% Start Internal ACL Server. -%% -%% @end -%%------------------------------------------------------------------------------ --spec start_link(AclOpts) -> {ok, pid()} | ignore | {error, any()} when - AclOpts :: [{file, list()}]. -start_link(AclOpts) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [AclOpts], []). - -stop() -> - gen_server:call(?SERVER, stop). +%% @doc Read all rules. +-spec all_rules() -> list(emqttd_access_rule:rule()). +all_rules() -> + case ets:lookup(?ACL_RULE_TABLE, all_rules) of + [] -> []; + [{_, Rules}] -> Rules + end. %%%============================================================================= %%% ACL callbacks %%%============================================================================= -%%------------------------------------------------------------------------------ -%% @doc -%% Check ACL. -%% -%% @end -%%------------------------------------------------------------------------------ --spec check_acl(User, PubSub, Topic) -> allow | deny | ignore when +%% @doc init internal ACL. +-spec init(AclOpts :: list()) -> {ok, State :: any()}. +init(AclOpts) -> + ets:new(?ACL_RULE_TABLE, [set, public, named_table]), + AclFile = proplists:get_value(file, AclOpts), + Default = proplists:get_value(nomatch, AclOpts, allow), + State = #state{acl_file = AclFile, nomatch = Default}, + load_rules(State), + {ok, State}. + +load_rules(#state{acl_file = AclFile}) -> + {ok, Terms} = file:consult(AclFile), + Rules = [emqttd_access_rule:compile(Term) || Term <- Terms], + lists:foreach(fun(PubSub) -> + ets:insert(?ACL_RULE_TABLE, {PubSub, + lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)}) + end, [publish, subscribe]), + ets:insert(?ACL_RULE_TABLE, {all_rules, Terms}). + +filter(_PubSub, {allow, all}) -> + true; +filter(_PubSub, {deny, all}) -> + true; +filter(publish, {_AllowDeny, _Who, publish, _Topics}) -> + true; +filter(_PubSub, {_AllowDeny, _Who, pubsub, _Topics}) -> + true; +filter(subscribe, {_AllowDeny, _Who, subscribe, _Topics}) -> + true; +filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) -> + false. + +%% @doc Check ACL. +-spec check_acl({User, PubSub, Topic}, State) -> allow | deny | ignore when User :: mqtt_user(), PubSub :: pubsub(), - Topic :: binary(). -check_acl(User, PubSub, Topic) -> + Topic :: binary(), + State :: #state{}. +check_acl({User, PubSub, Topic}, #state{nomatch = Default}) -> case match(User, Topic, lookup(PubSub)) of {matched, allow} -> allow; {matched, deny} -> deny; - nomatch -> ignore + nomatch -> Default end. lookup(PubSub) -> @@ -103,89 +117,16 @@ match(User, Topic, [Rule|Rules]) -> {matched, AllowDeny} -> {matched, AllowDeny} end. -%%------------------------------------------------------------------------------ -%% @doc -%% Reload ACL. -%% -%% @end -%%------------------------------------------------------------------------------ --spec reload_acl() -> ok. -reload_acl() -> - gen_server:call(?SERVER, reload). +%% @doc Reload ACL. +-spec reload_acl(State :: #state{}) -> ok | {error, Reason :: any()}. +reload_acl(State) -> + case catch load_rules(State) of + {'EXIT', Error} -> {error, Error}; + _ -> ok + end. -%%------------------------------------------------------------------------------ -%% @doc -%% ACL Description. -%% -%% @end -%%------------------------------------------------------------------------------ +%% @doc ACL Description. -spec description() -> string(). description() -> "Internal ACL with etc/acl.config". -%%%============================================================================= -%%% gen_server callbacks -%%%============================================================================= - -init([AclOpts]) -> - ets:new(?ACL_RULE_TABLE, [set, protected, named_table]), - AclFile = proplists:get_value(file, AclOpts), - {ok, State} = load_rules(#state{acl_file = AclFile}), - emqttd_acl:register_mod(?MODULE), - {ok, State}. - -handle_call(reload, _From, State) -> - case catch load_rules(State) of - {ok, NewState} -> - {reply, ok, NewState}; - {'EXIT', Error} -> - {reply, {error, Error}, State} - end; - -handle_call(stop, _From, State) -> - {stop, normal, ok, State}; - -handle_call(Req, _From, State) -> - lager:error("BadReq: ~p", [Req]), - {reply, {error, badreq}, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - emqttd_acl:unregister_mod(?MODULE), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%%============================================================================= -%%% Internal functions -%%%============================================================================= - -load_rules(State = #state{acl_file = AclFile}) -> - {ok, Terms} = file:consult(AclFile), - Rules = [emqttd_access_rule:compile(Term) || Term <- Terms], - lists:foreach(fun(PubSub) -> - ets:insert(?ACL_RULE_TABLE, {PubSub, - lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)}) - end, [publish, subscribe]), - {ok, State#state{raw_rules = Terms}}. - -filter(_PubSub, {allow, all}) -> - true; -filter(_PubSub, {deny, all}) -> - true; -filter(publish, {_AllowDeny, _Who, publish, _Topics}) -> - true; -filter(_PubSub, {_AllowDeny, _Who, pubsub, _Topics}) -> - true; -filter(subscribe, {_AllowDeny, _Who, subscribe, _Topics}) -> - true; -filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) -> - false. - - diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index dcd0b181e..6b084c071 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -136,8 +136,7 @@ service(auth) -> service(acl) -> {ok, AclOpts} = application:get_env(acl), - [{"emqttd acl", emqttd_acl, AclOpts}, - {"emqttd internal acl", emqttd_acl_internal, AclOpts}]; + {"emqttd acl", emqttd_acl, AclOpts}; service(monitor) -> {"emqttd monitor", emqttd_monitor}. diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index e64504f21..e86f3733b 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -96,15 +96,7 @@ received(Packet = ?PACKET(_Type), State = #proto_state{peername = Peername, lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]), case validate_packet(Packet) of ok -> - case access_control(Packet, State) of - {ok, allow} -> - handle(Packet, State); - {ok, deny} -> - {error, acl_denied, State}; - {error, AclError} -> - lager:error("Client ~s@~s: acl error - ~p", [ClientId, emqttd_net:format(Peername), AclError]), - {error, acl_error, State} - end; + handle(Packet, State); {error, Reason} -> {error, Reason, State} end. @@ -149,20 +141,37 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = {ok, Session} = emqttd_session:start({CleanSess, ClientId, self()}), {ok, State2#proto_state{session = Session}}; -handle(Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload), - State = #proto_state{session = Session}) -> - emqttd_session:publish(Session, {?QOS_0, emqttd_message:from_packet(Packet)}), +handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), + State = #proto_state{client_id = ClientId, session = Session}) -> + case emqttd_acl:check({mqtt_user(State), publish, Topic}) of + allow -> + emqttd_session:publish(Session, {?QOS_0, emqttd_message:from_packet(Packet)}); + deny -> + lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]) + end, {ok, State}; -handle(Packet = ?PUBLISH_PACKET(?QOS_1, _Topic, PacketId, _Payload), - State = #proto_state{session = Session}) -> - emqttd_session:publish(Session, {?QOS_1, emqttd_message:from_packet(Packet)}), - send(?PUBACK_PACKET(?PUBACK, PacketId), State); +handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), + State = #proto_state{client_id = ClientId, session = Session}) -> + case emqttd_acl:check({mqtt_user(State), publish, Topic}) of + allow -> + emqttd_session:publish(Session, {?QOS_1, emqttd_message:from_packet(Packet)}), + send(?PUBACK_PACKET(?PUBACK, PacketId), State); + deny -> + lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), + {ok, State} + end; -handle(Packet = ?PUBLISH_PACKET(?QOS_2, _Topic, PacketId, _Payload), - State = #proto_state{session = Session}) -> - NewSession = emqttd_session:publish(Session, {?QOS_2, emqttd_message:from_packet(Packet)}), - send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession}); +handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), + State = #proto_state{client_id = ClientId, session = Session}) -> + case emqttd_acl:check({mqtt_user(State), publish, Topic}) of + allow -> + NewSession = emqttd_session:publish(Session, {?QOS_2, emqttd_message:from_packet(Packet)}), + send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession}); + deny -> + lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), + {ok, State} + end; handle(?PUBACK_PACKET(Type, PacketId), State = #proto_state{session = Session}) when Type >= ?PUBACK andalso Type =< ?PUBCOMP -> @@ -179,8 +188,15 @@ handle(?PUBACK_PACKET(Type, PacketId), State = #proto_state{session = Session}) {ok, NewState}; handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) -> - {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable), - send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession}); + AllowDenies = [emqttd_acl:check({mqtt_user(State), subscribe, Topic}) || {Topic, _Qos} <- TopicTable], + case lists:member(deny, AllowDenies) of + true -> + %%TODO: return 128 QoS when deny... + {ok, State}; + false -> + {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable), + send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession}) + end; handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> {ok, NewSession} = emqttd_session:unsubscribe(Session, Topics), @@ -218,9 +234,7 @@ send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername Transport:send(Sock, Data), {ok, State}. -%% %% @doc redeliver PUBREL PacketId -%% redeliver({?PUBREL, PacketId}, State) -> send(?PUBREL_PACKET(PacketId), State). @@ -238,7 +252,8 @@ clientid(<<>>, #proto_state{peername = Peername}) -> clientid(ClientId, _State) -> ClientId. -%%---------------------------------------------------------------------------- +mqtt_user(#proto_state{peername = {Addr, _Port}, client_id = ClientId, username = Username}) -> + #mqtt_user{username = Username, clientid = ClientId, ipaddr = Addr}. send_willmsg(undefined) -> ignore; %%TODO:should call session... @@ -316,36 +331,6 @@ validate_qos(undefined) -> true; validate_qos(Qos) when Qos =< ?QOS_2 -> true; validate_qos(_) -> false. -access_control(publish, Topic, State = #proto_state{client_id = ClientId}) -> - case emqttd_acl:check(mqtt_user(State), publish, Topic) of - {ok, allow} -> - allow; - {ok, deny} -> - lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), deny; - {error, AclError} -> - lager:error("ACL Error: ~p when ~s publish to ~s", [AclError, ClientId, Topic]), deny - end. - -access_control(?SUBSCRIBE_PACKET(_PacketId, TopicTable), State) -> - check_acl(mqtt_user(State), subscribe, [Topic || {Topic, _Qos} <- TopicTable]); - -mqtt_user(#proto_state{peername = {Addr, _Port}, client_id = ClientId, username = Username}) -> - #mqtt_user{username = Username, clientid = ClientId, ipaddr = Addr}. - -check_acl(_User, subscribe, []) -> - {ok, allow}; -check_acl(User = #mqtt_user{clientid=ClientId}, subscribe, [Topic|Topics]) -> - case emqttd_acl:check(User, subscribe, Topic) of - {ok, allow} -> - check_acl(User, subscribe, Topics); - {ok, deny} -> - lager:warning("ACL Deny: ~s cannnot subscribe ~s", [ClientId, Topic]), - {ok, deny}; - {error, Error} -> - {error, Error} - end. - - try_unregister(undefined, _) -> ok; try_unregister(ClientId, _) -> emqttd_cm:unregister(ClientId, self()).