integrate with acl

This commit is contained in:
Ery Lee 2015-04-08 02:36:16 +08:00
parent fa24100514
commit 826ca7afca
5 changed files with 174 additions and 253 deletions

View File

@ -42,6 +42,8 @@
{deny, all} | {deny, all} |
{deny, who(), access(), list(topic())}. {deny, who(), access(), list(topic())}.
-export_type([rule/0]).
-export([compile/1, match/3]). -export([compile/1, match/3]).
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------

View File

@ -35,8 +35,8 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
%% API Function Exports %% API Function Exports
-export([start_link/1, check/3, reload/0, -export([start_link/1, check/1, reload/0,
register_mod/1, unregister_mod/1, all_modules/0, register_mod/2, unregister_mod/1, all_modules/0,
stop/0]). stop/0]).
%% gen_server callbacks %% gen_server callbacks
@ -53,12 +53,12 @@
-callback init(AclOpts :: list()) -> {ok, State :: any()}. -callback init(AclOpts :: list()) -> {ok, State :: any()}.
-callback check_acl(User, PubSub, Topic) -> allow | deny | ignore when -callback check_acl({User, PubSub, Topic}, State :: any()) -> allow | deny | ignore when
User :: mqtt_user(), User :: mqtt_user(),
PubSub :: pubsub(), PubSub :: pubsub(),
Topic :: binary(). Topic :: binary().
-callback reload_acl() -> ok | {error, any()}. -callback reload_acl(State :: any()) -> ok | {error, any()}.
-callback description() -> string(). -callback description() -> string().
@ -67,7 +67,7 @@
-export([behaviour_info/1]). -export([behaviour_info/1]).
behaviour_info(callbacks) -> behaviour_info(callbacks) ->
[{init, 1}, {check_acl, 3}, {reload_acl, 0}, {description, 0}]; [{init, 1}, {check_acl, 2}, {reload_acl, 1}, {description, 0}];
behaviour_info(_Other) -> behaviour_info(_Other) ->
undefined. undefined.
@ -77,107 +77,107 @@ behaviour_info(_Other) ->
%%% API %%% API
%%%============================================================================= %%%=============================================================================
%%------------------------------------------------------------------------------ %% @doc Start ACL Server.
%% @doc -spec start_link(AclMods :: list()) -> {ok, pid()} | ignore | {error, any()}.
%% Start ACL Server. start_link(AclMods) ->
%% gen_server:start_link({local, ?SERVER}, ?MODULE, [AclMods], []).
%% @end
%%------------------------------------------------------------------------------
-spec start_link(AclOpts) -> {ok, pid()} | ignore | {error, any()} when
AclOpts :: [{file, list()}].
start_link(AclOpts) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [AclOpts], []).
%%------------------------------------------------------------------------------ %% @doc Check ACL.
%% @doc -spec check({User, PubSub, Topic}) -> allow | deny when
%% Check ACL.
%%
%% @end
%%--------------------------------------------------------------------------
-spec check(User, PubSub, Topic) -> allow | deny | ignore when
User :: mqtt_user(), User :: mqtt_user(),
PubSub :: pubsub(), PubSub :: pubsub(),
Topic :: binary(). Topic :: binary().
check(User, PubSub, Topic) when PubSub =:= publish orelse PubSub =:= subscribe -> check({User, PubSub, Topic}) when PubSub =:= publish orelse PubSub =:= subscribe ->
case ets:lookup(?ACL_TABLE, acl_modules) of case ets:lookup(?ACL_TABLE, acl_modules) of
[] -> allow; [] -> allow;
[{_, Mods}] -> check(User, PubSub, Topic, Mods) [{_, AclMods}] -> check({User, PubSub, Topic}, AclMods)
end. end.
check(#mqtt_user{clientid = ClientId}, PubSub, Topic, []) -> check({#mqtt_user{clientid = ClientId}, PubSub, Topic}, []) ->
lager:error("ACL: nomatch when ~s ~s ~s", [ClientId, PubSub, Topic]), lager:error("ACL: nomatch when ~s ~s ~s", [ClientId, PubSub, Topic]),
allow; allow;
check(User, PubSub, Topic, [Mod|Mods]) -> check({User, PubSub, Topic}, [{M, State}|AclMods]) ->
case Mod:check_acl(User, PubSub, Topic) of case M:check_acl({User, PubSub, Topic}, State) of
allow -> allow; allow -> allow;
deny -> deny; deny -> deny;
ignore -> check(User, PubSub, Topic, Mods) ignore -> check({User, PubSub, Topic}, AclMods)
end. end.
%%------------------------------------------------------------------------------ %% @doc Reload ACL.
%% @doc -spec reload() -> list() | {error, any()}.
%% Reload ACL.
%%
%% @end
%%------------------------------------------------------------------------------
reload() -> reload() ->
case ets:lookup(?ACL_TABLE, acl_modules) of case ets:lookup(?ACL_TABLE, acl_modules) of
[] -> {error, "No ACL mod!"}; [] ->
[{_, Mods}] -> [M:reload_acl() || M <- Mods] {error, "No ACL modules!"};
[{_, AclMods}] ->
[M:reload_acl(State) || {M, State} <- AclMods]
end. end.
%%------------------------------------------------------------------------------ %% @doc Register ACL Module.
%% @doc -spec register_mod(AclMod :: atom(), Opts :: list()) -> ok | {error, any()}.
%% Register ACL Module. register_mod(AclMod, Opts) ->
%% gen_server:call(?SERVER, {register_mod, AclMod, Opts}).
%% @end
%%------------------------------------------------------------------------------
-spec register_mod(Mod :: atom()) -> ok | {error, any()}.
register_mod(Mod) ->
gen_server:call(?SERVER, {register_mod, Mod}).
%%------------------------------------------------------------------------------ %% @doc Unregister ACL Module.
%% @doc -spec unregister_mod(AclMod :: atom()) -> ok | {error, any()}.
%% Unregister ACL Module. unregister_mod(AclMod) ->
%% gen_server:call(?SERVER, {unregister_mod, AclMod}).
%% @end
%%------------------------------------------------------------------------------
-spec unregister_mod(Mod :: atom()) -> ok | {error, any()}.
unregister_mod(Mod) ->
gen_server:cast(?SERVER, {unregister_mod, Mod}).
%%------------------------------------------------------------------------------ %% @doc All ACL Modules.
%% @doc -spec all_modules() -> list().
%% All ACL Modules.
%%
%% @end
%%------------------------------------------------------------------------------
all_modules() -> all_modules() ->
case ets:lookup(?ACL_TABLE, acl_modules) of case ets:lookup(?ACL_TABLE, acl_modules) of
[] -> []; [] -> [];
[{_, Mods}] -> Mods [{_, AclMods}] -> AclMods
end. end.
%% @doc Stop ACL server.
-spec stop() -> ok.
stop() -> stop() ->
gen_server:call(?SERVER, stop). gen_server:call(?SERVER, stop).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks. %%% gen_server callbacks.
%%%============================================================================= %%%=============================================================================
init([_AclOpts]) -> init([AclMods]) ->
ets:new(?ACL_TABLE, [set, protected, named_table]), ets:new(?ACL_TABLE, [set, protected, named_table]),
AclMods1 = lists:map(
fun({M, Opts}) ->
AclMod = aclmod(M),
{ok, State} = AclMod:init(Opts),
{AclMod, State}
end, AclMods),
ets:insert(?ACL_TABLE, {acl_modules, AclMods1}),
{ok, state}. {ok, state}.
handle_call({register_mod, Mod}, _From, State) -> handle_call({register_mod, Mod, Opts}, _From, State) ->
Mods = all_modules(), AclMods = all_modules(),
case lists:member(Mod, Mods) of Reply =
true -> case lists:keyfind(Mod, 1, AclMods) of
{reply, {error, existed}, State};
false -> false ->
ets:insert(?ACL_TABLE, {acl_modules, [Mod | Mods]}), case catch Mod:init(Opts) of
{reply, ok, State} {ok, ModState} ->
ets:insert(?ACL_TABLE, {acl_modules, [{Mod, ModState}|AclMods]}),
ok;
{'EXIT', Error} ->
{error, Error}
end; end;
_ ->
{error, existed}
end,
{reply, Reply, State};
handle_call({unregister_mod, Mod}, _From, State) ->
AclMods = all_modules(),
Reply =
case lists:keyfind(Mod, 1, AclMods) of
false ->
{error, not_found};
_ ->
ets:insert(?ACL_TABLE, {acl_modules, lists:keydelete(Mod, 1, AclMods)}), ok
end,
{reply, Reply, State};
handle_call(stop, _From, State) -> handle_call(stop, _From, State) ->
{stop, normal, ok, State}; {stop, normal, ok, State};
@ -186,16 +186,6 @@ handle_call(Req, _From, State) ->
lager:error("Bad Request: ~p", [Req]), lager:error("Bad Request: ~p", [Req]),
{reply, {error, badreq}, State}. {reply, {error, badreq}, State}.
handle_cast({unregister_mod, Mod}, State) ->
Mods = all_modules(),
case lists:member(Mod, Mods) of
true ->
ets:insert(?ACL_TABLE, {acl_modules, lists:delete(Mod, Mods)});
false ->
lager:error("unknown acl module: ~s", [Mod])
end,
{noreply, State};
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
@ -212,3 +202,7 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================
aclmod(Name) when is_atom(Name) ->
list_to_atom(lists:concat(["emqttd_acl_", Name])).

View File

@ -30,62 +30,76 @@
-include("emqttd.hrl"). -include("emqttd.hrl").
-export([start_link/1, stop/0]). -export([all_rules/0]).
-behaviour(emqttd_acl). -behaviour(emqttd_acl).
%% ACL callbacks %% ACL callbacks
-export([check_acl/3, reload_acl/0, description/0]). -export([init/1, check_acl/2, reload_acl/1, description/0]).
-behaviour(gen_server).
-define(SERVER, ?MODULE).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(ACL_RULE_TABLE, mqtt_acl_rule). -define(ACL_RULE_TABLE, mqtt_acl_rule).
-record(state, {acl_file, raw_rules = []}). -record(state, {acl_file, nomatch = allow}).
%%%============================================================================= %%%=============================================================================
%%% API %%% API
%%%============================================================================= %%%=============================================================================
%%------------------------------------------------------------------------------ %% @doc Read all rules.
%% @doc -spec all_rules() -> list(emqttd_access_rule:rule()).
%% Start Internal ACL Server. all_rules() ->
%% case ets:lookup(?ACL_RULE_TABLE, all_rules) of
%% @end [] -> [];
%%------------------------------------------------------------------------------ [{_, Rules}] -> Rules
-spec start_link(AclOpts) -> {ok, pid()} | ignore | {error, any()} when end.
AclOpts :: [{file, list()}].
start_link(AclOpts) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [AclOpts], []).
stop() ->
gen_server:call(?SERVER, stop).
%%%============================================================================= %%%=============================================================================
%%% ACL callbacks %%% ACL callbacks
%%%============================================================================= %%%=============================================================================
%%------------------------------------------------------------------------------ %% @doc init internal ACL.
%% @doc -spec init(AclOpts :: list()) -> {ok, State :: any()}.
%% Check ACL. init(AclOpts) ->
%% ets:new(?ACL_RULE_TABLE, [set, public, named_table]),
%% @end AclFile = proplists:get_value(file, AclOpts),
%%------------------------------------------------------------------------------ Default = proplists:get_value(nomatch, AclOpts, allow),
-spec check_acl(User, PubSub, Topic) -> allow | deny | ignore when State = #state{acl_file = AclFile, nomatch = Default},
load_rules(State),
{ok, State}.
load_rules(#state{acl_file = AclFile}) ->
{ok, Terms} = file:consult(AclFile),
Rules = [emqttd_access_rule:compile(Term) || Term <- Terms],
lists:foreach(fun(PubSub) ->
ets:insert(?ACL_RULE_TABLE, {PubSub,
lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)})
end, [publish, subscribe]),
ets:insert(?ACL_RULE_TABLE, {all_rules, Terms}).
filter(_PubSub, {allow, all}) ->
true;
filter(_PubSub, {deny, all}) ->
true;
filter(publish, {_AllowDeny, _Who, publish, _Topics}) ->
true;
filter(_PubSub, {_AllowDeny, _Who, pubsub, _Topics}) ->
true;
filter(subscribe, {_AllowDeny, _Who, subscribe, _Topics}) ->
true;
filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
false.
%% @doc Check ACL.
-spec check_acl({User, PubSub, Topic}, State) -> allow | deny | ignore when
User :: mqtt_user(), User :: mqtt_user(),
PubSub :: pubsub(), PubSub :: pubsub(),
Topic :: binary(). Topic :: binary(),
check_acl(User, PubSub, Topic) -> State :: #state{}.
check_acl({User, PubSub, Topic}, #state{nomatch = Default}) ->
case match(User, Topic, lookup(PubSub)) of case match(User, Topic, lookup(PubSub)) of
{matched, allow} -> allow; {matched, allow} -> allow;
{matched, deny} -> deny; {matched, deny} -> deny;
nomatch -> ignore nomatch -> Default
end. end.
lookup(PubSub) -> lookup(PubSub) ->
@ -103,89 +117,16 @@ match(User, Topic, [Rule|Rules]) ->
{matched, AllowDeny} -> {matched, AllowDeny} {matched, AllowDeny} -> {matched, AllowDeny}
end. end.
%%------------------------------------------------------------------------------ %% @doc Reload ACL.
%% @doc -spec reload_acl(State :: #state{}) -> ok | {error, Reason :: any()}.
%% Reload ACL. reload_acl(State) ->
%% case catch load_rules(State) of
%% @end {'EXIT', Error} -> {error, Error};
%%------------------------------------------------------------------------------ _ -> ok
-spec reload_acl() -> ok. end.
reload_acl() ->
gen_server:call(?SERVER, reload).
%%------------------------------------------------------------------------------ %% @doc ACL Description.
%% @doc
%% ACL Description.
%%
%% @end
%%------------------------------------------------------------------------------
-spec description() -> string(). -spec description() -> string().
description() -> description() ->
"Internal ACL with etc/acl.config". "Internal ACL with etc/acl.config".
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([AclOpts]) ->
ets:new(?ACL_RULE_TABLE, [set, protected, named_table]),
AclFile = proplists:get_value(file, AclOpts),
{ok, State} = load_rules(#state{acl_file = AclFile}),
emqttd_acl:register_mod(?MODULE),
{ok, State}.
handle_call(reload, _From, State) ->
case catch load_rules(State) of
{ok, NewState} ->
{reply, ok, NewState};
{'EXIT', Error} ->
{reply, {error, Error}, State}
end;
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(Req, _From, State) ->
lager:error("BadReq: ~p", [Req]),
{reply, {error, badreq}, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
emqttd_acl:unregister_mod(?MODULE),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
load_rules(State = #state{acl_file = AclFile}) ->
{ok, Terms} = file:consult(AclFile),
Rules = [emqttd_access_rule:compile(Term) || Term <- Terms],
lists:foreach(fun(PubSub) ->
ets:insert(?ACL_RULE_TABLE, {PubSub,
lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)})
end, [publish, subscribe]),
{ok, State#state{raw_rules = Terms}}.
filter(_PubSub, {allow, all}) ->
true;
filter(_PubSub, {deny, all}) ->
true;
filter(publish, {_AllowDeny, _Who, publish, _Topics}) ->
true;
filter(_PubSub, {_AllowDeny, _Who, pubsub, _Topics}) ->
true;
filter(subscribe, {_AllowDeny, _Who, subscribe, _Topics}) ->
true;
filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
false.

View File

@ -136,8 +136,7 @@ service(auth) ->
service(acl) -> service(acl) ->
{ok, AclOpts} = application:get_env(acl), {ok, AclOpts} = application:get_env(acl),
[{"emqttd acl", emqttd_acl, AclOpts}, {"emqttd acl", emqttd_acl, AclOpts};
{"emqttd internal acl", emqttd_acl_internal, AclOpts}];
service(monitor) -> service(monitor) ->
{"emqttd monitor", emqttd_monitor}. {"emqttd monitor", emqttd_monitor}.

View File

@ -96,15 +96,7 @@ received(Packet = ?PACKET(_Type), State = #proto_state{peername = Peername,
lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]), lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]),
case validate_packet(Packet) of case validate_packet(Packet) of
ok -> ok ->
case access_control(Packet, State) of
{ok, allow} ->
handle(Packet, State); handle(Packet, State);
{ok, deny} ->
{error, acl_denied, State};
{error, AclError} ->
lager:error("Client ~s@~s: acl error - ~p", [ClientId, emqttd_net:format(Peername), AclError]),
{error, acl_error, State}
end;
{error, Reason} -> {error, Reason} ->
{error, Reason, State} {error, Reason, State}
end. end.
@ -149,20 +141,37 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
{ok, Session} = emqttd_session:start({CleanSess, ClientId, self()}), {ok, Session} = emqttd_session:start({CleanSess, ClientId, self()}),
{ok, State2#proto_state{session = Session}}; {ok, State2#proto_state{session = Session}};
handle(Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload), handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
State = #proto_state{session = Session}) -> State = #proto_state{client_id = ClientId, session = Session}) ->
emqttd_session:publish(Session, {?QOS_0, emqttd_message:from_packet(Packet)}), case emqttd_acl:check({mqtt_user(State), publish, Topic}) of
allow ->
emqttd_session:publish(Session, {?QOS_0, emqttd_message:from_packet(Packet)});
deny ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic])
end,
{ok, State}; {ok, State};
handle(Packet = ?PUBLISH_PACKET(?QOS_1, _Topic, PacketId, _Payload), handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload),
State = #proto_state{session = Session}) -> State = #proto_state{client_id = ClientId, session = Session}) ->
case emqttd_acl:check({mqtt_user(State), publish, Topic}) of
allow ->
emqttd_session:publish(Session, {?QOS_1, emqttd_message:from_packet(Packet)}), emqttd_session:publish(Session, {?QOS_1, emqttd_message:from_packet(Packet)}),
send(?PUBACK_PACKET(?PUBACK, PacketId), State); send(?PUBACK_PACKET(?PUBACK, PacketId), State);
deny ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]),
{ok, State}
end;
handle(Packet = ?PUBLISH_PACKET(?QOS_2, _Topic, PacketId, _Payload), handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload),
State = #proto_state{session = Session}) -> State = #proto_state{client_id = ClientId, session = Session}) ->
case emqttd_acl:check({mqtt_user(State), publish, Topic}) of
allow ->
NewSession = emqttd_session:publish(Session, {?QOS_2, emqttd_message:from_packet(Packet)}), NewSession = emqttd_session:publish(Session, {?QOS_2, emqttd_message:from_packet(Packet)}),
send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession}); send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession});
deny ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]),
{ok, State}
end;
handle(?PUBACK_PACKET(Type, PacketId), State = #proto_state{session = Session}) handle(?PUBACK_PACKET(Type, PacketId), State = #proto_state{session = Session})
when Type >= ?PUBACK andalso Type =< ?PUBCOMP -> when Type >= ?PUBACK andalso Type =< ?PUBCOMP ->
@ -179,8 +188,15 @@ handle(?PUBACK_PACKET(Type, PacketId), State = #proto_state{session = Session})
{ok, NewState}; {ok, NewState};
handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) -> handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) ->
AllowDenies = [emqttd_acl:check({mqtt_user(State), subscribe, Topic}) || {Topic, _Qos} <- TopicTable],
case lists:member(deny, AllowDenies) of
true ->
%%TODO: return 128 QoS when deny...
{ok, State};
false ->
{ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable), {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable),
send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession}); send(?SUBACK_PACKET(PacketId, GrantedQos), State#proto_state{session = NewSession})
end;
handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
{ok, NewSession} = emqttd_session:unsubscribe(Session, Topics), {ok, NewSession} = emqttd_session:unsubscribe(Session, Topics),
@ -218,9 +234,7 @@ send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername
Transport:send(Sock, Data), Transport:send(Sock, Data),
{ok, State}. {ok, State}.
%%
%% @doc redeliver PUBREL PacketId %% @doc redeliver PUBREL PacketId
%%
redeliver({?PUBREL, PacketId}, State) -> redeliver({?PUBREL, PacketId}, State) ->
send(?PUBREL_PACKET(PacketId), State). send(?PUBREL_PACKET(PacketId), State).
@ -238,7 +252,8 @@ clientid(<<>>, #proto_state{peername = Peername}) ->
clientid(ClientId, _State) -> ClientId. clientid(ClientId, _State) -> ClientId.
%%---------------------------------------------------------------------------- mqtt_user(#proto_state{peername = {Addr, _Port}, client_id = ClientId, username = Username}) ->
#mqtt_user{username = Username, clientid = ClientId, ipaddr = Addr}.
send_willmsg(undefined) -> ignore; send_willmsg(undefined) -> ignore;
%%TODO:should call session... %%TODO:should call session...
@ -316,36 +331,6 @@ validate_qos(undefined) -> true;
validate_qos(Qos) when Qos =< ?QOS_2 -> true; validate_qos(Qos) when Qos =< ?QOS_2 -> true;
validate_qos(_) -> false. validate_qos(_) -> false.
access_control(publish, Topic, State = #proto_state{client_id = ClientId}) ->
case emqttd_acl:check(mqtt_user(State), publish, Topic) of
{ok, allow} ->
allow;
{ok, deny} ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), deny;
{error, AclError} ->
lager:error("ACL Error: ~p when ~s publish to ~s", [AclError, ClientId, Topic]), deny
end.
access_control(?SUBSCRIBE_PACKET(_PacketId, TopicTable), State) ->
check_acl(mqtt_user(State), subscribe, [Topic || {Topic, _Qos} <- TopicTable]);
mqtt_user(#proto_state{peername = {Addr, _Port}, client_id = ClientId, username = Username}) ->
#mqtt_user{username = Username, clientid = ClientId, ipaddr = Addr}.
check_acl(_User, subscribe, []) ->
{ok, allow};
check_acl(User = #mqtt_user{clientid=ClientId}, subscribe, [Topic|Topics]) ->
case emqttd_acl:check(User, subscribe, Topic) of
{ok, allow} ->
check_acl(User, subscribe, Topics);
{ok, deny} ->
lager:warning("ACL Deny: ~s cannnot subscribe ~s", [ClientId, Topic]),
{ok, deny};
{error, Error} ->
{error, Error}
end.
try_unregister(undefined, _) -> ok; try_unregister(undefined, _) -> ok;
try_unregister(ClientId, _) -> emqttd_cm:unregister(ClientId, self()). try_unregister(ClientId, _) -> emqttd_cm:unregister(ClientId, self()).