support acl mods
This commit is contained in:
parent
e4e33c2d5d
commit
e742d73f70
|
@ -59,13 +59,6 @@
|
||||||
|
|
||||||
-type mqtt_session() :: #mqtt_session{}.
|
-type mqtt_session() :: #mqtt_session{}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% MQTT Retained Message
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-record(mqtt_retained, {topic, qos, payload}).
|
|
||||||
|
|
||||||
-type mqtt_retained() :: #mqtt_retained{}.
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% MQTT User Management
|
%% MQTT User Management
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -93,3 +86,10 @@
|
||||||
|
|
||||||
-record(mqtt_plugin, {name, version, attrs, description}).
|
-record(mqtt_plugin, {name, version, attrs, description}).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% MQTT Retained Message
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-record(mqtt_retained, {topic, qos, payload}).
|
||||||
|
|
||||||
|
-type mqtt_retained() :: #mqtt_retained{}.
|
||||||
|
|
||||||
|
|
|
@ -35,23 +35,6 @@
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
|
||||||
|
|
||||||
%% API Function Exports
|
|
||||||
-export([start_link/1, check/3, reload/0]).
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
|
||||||
|
|
||||||
-export([compile/1, match/3]).
|
|
||||||
|
|
||||||
-endif.
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
||||||
terminate/2, code_change/3]).
|
|
||||||
|
|
||||||
-type pubsub() :: subscribe | publish | pubsub.
|
-type pubsub() :: subscribe | publish | pubsub.
|
||||||
|
|
||||||
-type who() :: all | binary() |
|
-type who() :: all | binary() |
|
||||||
|
@ -64,12 +47,38 @@
|
||||||
{deny, all} |
|
{deny, all} |
|
||||||
{deny, who(), pubsub(), list(binary())}.
|
{deny, who(), pubsub(), list(binary())}.
|
||||||
|
|
||||||
-record(mqtt_acl, {pubsub :: pubsub(),
|
-export_type([pubsub/0]).
|
||||||
rules :: list(rule())}).
|
|
||||||
|
-callback check_acl(PubSub, User, Topic) -> {ok, allow | deny} | ignore | {error, any()} when
|
||||||
|
PubSub :: pubsub(),
|
||||||
|
User :: mqtt_user(),
|
||||||
|
Topic :: binary().
|
||||||
|
|
||||||
|
-callback reload_acl() -> ok | {error, any()}.
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
|
%% API Function Exports
|
||||||
|
-export([start_link/1, check/3, reload/0, register_mod/1, unregister_mod/1]).
|
||||||
|
|
||||||
|
%% ACL Callback
|
||||||
|
-export([check_acl/3, reload_acl/0]).
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
|
||||||
|
-export([compile/1, match/3]).
|
||||||
|
|
||||||
|
-endif.
|
||||||
|
|
||||||
|
%% gen_server callbacks
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
-define(ACL_TAB, mqtt_acl).
|
-define(ACL_TAB, mqtt_acl).
|
||||||
|
|
||||||
-record(state, {raw_rules = []}).
|
-record(state, {acl_file, raw_rules = []}).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -91,16 +100,50 @@ start_link(AclOpts) ->
|
||||||
User :: mqtt_user(),
|
User :: mqtt_user(),
|
||||||
Topic :: binary().
|
Topic :: binary().
|
||||||
check(PubSub, User, Topic) ->
|
check(PubSub, User, Topic) ->
|
||||||
case match(User, Topic, lookup(PubSub)) of
|
case ets:lookup(?ACL_TAB, acl_mods) of
|
||||||
nomatch -> allow;
|
[] -> {error, "No ACL mod!"};
|
||||||
{matched, allow} -> allow;
|
[{_, Mods}] -> check(PubSub, User, Topic, Mods)
|
||||||
{matched, deny} -> deny
|
|
||||||
end.
|
end.
|
||||||
|
check(_PubSub, _User, _Topic, []) ->
|
||||||
|
{error, "All ACL mods ignored!"};
|
||||||
|
check(PubSub, User, Topic, [Mod|Mods]) ->
|
||||||
|
case Mod:check_acl(PubSub, User, Topic) of
|
||||||
|
{ok, AllowDeny} -> AllowDeny;
|
||||||
|
ignore -> check(PubSub, User, Topic, Mods)
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%TODO:
|
||||||
|
reload() ->
|
||||||
|
case ets:lookup(?ACL_TAB, acl_mods) of
|
||||||
|
[] -> {error, "No ACL mod!"};
|
||||||
|
[{_, Mods}] -> [M:reload() || M <- Mods]
|
||||||
|
end.
|
||||||
|
|
||||||
|
register_mod(Mod) ->
|
||||||
|
gen_server:call(?MODULE, {register_mod, Mod}).
|
||||||
|
|
||||||
|
unregister_mod(Mod) ->
|
||||||
|
gen_server:call(?MODULE, {unregister_mod, Mod}).
|
||||||
|
|
||||||
|
-spec check_acl(PubSub, User, Topic) -> {ok, allow} | {ok, deny} | ignore | {error, any()} when
|
||||||
|
PubSub :: pubsub(),
|
||||||
|
User :: mqtt_user(),
|
||||||
|
Topic :: binary().
|
||||||
|
check_acl(PubSub, User, Topic) ->
|
||||||
|
case match(User, Topic, lookup(PubSub)) of
|
||||||
|
{matched, allow} -> {ok, allow};
|
||||||
|
{matched, deny} -> {ok, deny};
|
||||||
|
nomatch -> {error, nomatch}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec reload_acl() -> ok.
|
||||||
|
reload_acl() ->
|
||||||
|
gen_server:call(?SERVER, reload).
|
||||||
|
|
||||||
lookup(PubSub) ->
|
lookup(PubSub) ->
|
||||||
case ets:lookup(?ACL_TAB, PubSub) of
|
case ets:lookup(?ACL_TAB, PubSub) of
|
||||||
[] -> [];
|
[] -> [];
|
||||||
[#mqtt_acl{pubsub = PubSub, rules = Rules}] -> Rules
|
[{PubSub, Rules}] -> Rules
|
||||||
end.
|
end.
|
||||||
|
|
||||||
match(_User, _Topic, []) ->
|
match(_User, _Topic, []) ->
|
||||||
|
@ -112,23 +155,23 @@ match(User, Topic, [Rule|Rules]) ->
|
||||||
{matched, AllowDeny} -> {matched, AllowDeny}
|
{matched, AllowDeny} -> {matched, AllowDeny}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec reload() -> ok.
|
|
||||||
reload() ->
|
|
||||||
gen_server:call(?SERVER, reload).
|
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
%% gen_server Function Definitions
|
%% gen_server Function Definitions
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
init([AclOpts]) ->
|
init([AclOpts]) ->
|
||||||
|
ets:new(?ACL_TAB, [set, protected, named_table]),
|
||||||
|
ets:insert(?ACL_TAB, {acl_mods, [?MODULE]}),
|
||||||
AclFile = proplists:get_value(file, AclOpts),
|
AclFile = proplists:get_value(file, AclOpts),
|
||||||
|
load_rules(#state{acl_file = AclFile}).
|
||||||
|
|
||||||
|
load_rules(State = #state{acl_file = AclFile}) ->
|
||||||
{ok, Terms} = file:consult(AclFile),
|
{ok, Terms} = file:consult(AclFile),
|
||||||
Rules = [compile(Term) || Term <- Terms],
|
Rules = [compile(Term) || Term <- Terms],
|
||||||
ets:new(?ACL_TAB, [set, protected, named_table, {keypos, 2}]),
|
|
||||||
lists:foreach(fun(PubSub) ->
|
lists:foreach(fun(PubSub) ->
|
||||||
ets:insert(?ACL_TAB, #mqtt_acl{pubsub = PubSub, rules =
|
ets:insert(?ACL_TAB, {PubSub,
|
||||||
lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)})
|
lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)})
|
||||||
end, [publish, subscribe]),
|
end, [publish, subscribe]),
|
||||||
{ok, #state{raw_rules = Terms}}.
|
{ok, State#state{raw_rules = Terms}}.
|
||||||
|
|
||||||
filter(_PubSub, {allow, all}) ->
|
filter(_PubSub, {allow, all}) ->
|
||||||
true;
|
true;
|
||||||
|
@ -144,7 +187,36 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
|
||||||
false.
|
false.
|
||||||
|
|
||||||
handle_call(reload, _From, State) ->
|
handle_call(reload, _From, State) ->
|
||||||
{reply, {error, unsupported}, State}.
|
case catch load_rules(State) of
|
||||||
|
{ok, NewState} ->
|
||||||
|
{reply, ok, NewState};
|
||||||
|
{'EXIT', Error} ->
|
||||||
|
{reply, {error, Error}, State}
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_call({register_mod, Mod}, _From, State) ->
|
||||||
|
[{_, Mods}] = ets:lookup(?ACL_TAB, acl_mods),
|
||||||
|
case lists:member(Mod, Mods) of
|
||||||
|
true ->
|
||||||
|
{reply, {error, registered}, State};
|
||||||
|
false ->
|
||||||
|
ets:insert(?ACL_TAB, {acl_mods, [Mod|Mods]}),
|
||||||
|
{reply, ok, State}
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_call({unregister_mod, Mod}, _From, State) ->
|
||||||
|
[{_, Mods}] = ets:lookup(?ACL_TAB, acl_mods),
|
||||||
|
case lists:member(Mod, Mods) of
|
||||||
|
true ->
|
||||||
|
ets:insert(?ACL_TAB, lists:delete(Mod, Mods)),
|
||||||
|
{reply, ok, State};
|
||||||
|
false ->
|
||||||
|
{reply, {error, not_found}, State}
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_call(Req, _From, State) ->
|
||||||
|
lager:error("Bad Request: ~p", [Req]),
|
||||||
|
{reply, {error, badreq}, State}.
|
||||||
|
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
Loading…
Reference in New Issue