Merge branch 'issue#1983' into emqx30
This commit is contained in:
commit
34370ef622
|
@ -34,16 +34,21 @@
|
||||||
-export([match/3]).
|
-export([match/3]).
|
||||||
|
|
||||||
-define(ALLOW_DENY(A), ((A =:= allow) orelse (A =:= deny))).
|
-define(ALLOW_DENY(A), ((A =:= allow) orelse (A =:= deny))).
|
||||||
|
-define(PUBSUB(A), ((A =:= subscribe) orelse (A =:= publish) orelse (A =:= pubsub))).
|
||||||
|
|
||||||
%% @doc Compile Access Rule.
|
%% @doc Compile Access Rule.
|
||||||
compile({A, all}) when ?ALLOW_DENY(A) ->
|
compile({A, all}) when ?ALLOW_DENY(A) ->
|
||||||
{A, all};
|
{A, all};
|
||||||
|
|
||||||
compile({A, Who, Access, Topic}) when ?ALLOW_DENY(A), is_binary(Topic) ->
|
compile({A, Who, Access, Topic}) when ?ALLOW_DENY(A), ?PUBSUB(Access), is_binary(Topic) ->
|
||||||
{A, compile(who, Who), Access, [compile(topic, Topic)]};
|
{A, compile(who, Who), Access, [compile(topic, Topic)]};
|
||||||
|
|
||||||
compile({A, Who, Access, TopicFilters}) when ?ALLOW_DENY(A) ->
|
compile({A, Who, Access, TopicFilters}) when ?ALLOW_DENY(A), ?PUBSUB(Access) ->
|
||||||
{A, compile(who, Who), Access, [compile(topic, Topic) || Topic <- TopicFilters]}.
|
{A, compile(who, Who), Access, [compile(topic, Topic) || Topic <- TopicFilters]};
|
||||||
|
|
||||||
|
compile(Rule) ->
|
||||||
|
emqx_logger:error("[ACCESS_RULE] Malformed rule: ~p", [Rule]),
|
||||||
|
{error, bad_rule}.
|
||||||
|
|
||||||
compile(who, all) ->
|
compile(who, all) ->
|
||||||
all;
|
all;
|
||||||
|
|
|
@ -46,18 +46,26 @@ all_rules() ->
|
||||||
-spec(init([File :: string()]) -> {ok, #{}}).
|
-spec(init([File :: string()]) -> {ok, #{}}).
|
||||||
init([File]) ->
|
init([File]) ->
|
||||||
_ = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]),
|
_ = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]),
|
||||||
true = load_rules_from_file(File),
|
ok = load_rules_from_file(File),
|
||||||
{ok, #{acl_file => File}}.
|
{ok, #{acl_file => File}}.
|
||||||
|
|
||||||
load_rules_from_file(AclFile) ->
|
load_rules_from_file(AclFile) ->
|
||||||
{ok, Terms} = file:consult(AclFile),
|
case file:consult(AclFile) of
|
||||||
|
{ok, Terms} ->
|
||||||
Rules = [emqx_access_rule:compile(Term) || Term <- Terms],
|
Rules = [emqx_access_rule:compile(Term) || Term <- Terms],
|
||||||
lists:foreach(fun(PubSub) ->
|
lists:foreach(fun(PubSub) ->
|
||||||
ets:insert(?ACL_RULE_TAB, {PubSub,
|
ets:insert(?ACL_RULE_TAB, {PubSub,
|
||||||
lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)})
|
lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)})
|
||||||
end, [publish, subscribe]),
|
end, [publish, subscribe]),
|
||||||
ets:insert(?ACL_RULE_TAB, {all_rules, Terms}).
|
ets:insert(?ACL_RULE_TAB, {all_rules, Terms}),
|
||||||
|
ok;
|
||||||
|
{error, Reason} ->
|
||||||
|
emqx_logger:error("[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]),
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
filter(_PubSub, {error, _}) ->
|
||||||
|
false;
|
||||||
filter(_PubSub, {allow, all}) ->
|
filter(_PubSub, {allow, all}) ->
|
||||||
true;
|
true;
|
||||||
filter(_PubSub, {deny, all}) ->
|
filter(_PubSub, {deny, all}) ->
|
||||||
|
@ -100,9 +108,11 @@ match(Credentials, Topic, [Rule|Rules]) ->
|
||||||
-spec(reload_acl(state()) -> ok | {error, term()}).
|
-spec(reload_acl(state()) -> ok | {error, term()}).
|
||||||
reload_acl(#{acl_file := AclFile}) ->
|
reload_acl(#{acl_file := AclFile}) ->
|
||||||
case catch load_rules_from_file(AclFile) of
|
case catch load_rules_from_file(AclFile) of
|
||||||
true ->
|
ok ->
|
||||||
emqx_logger:info("Reload acl_file ~s successfully", [AclFile]),
|
emqx_logger:info("Reload acl_file ~s successfully", [AclFile]),
|
||||||
ok;
|
ok;
|
||||||
|
{error, Error} ->
|
||||||
|
{error, Error};
|
||||||
{'EXIT', Error} ->
|
{'EXIT', Error} ->
|
||||||
{error, Error}
|
{error, Error}
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -105,7 +105,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
ensure_expiry_timer(State) ->
|
ensure_expiry_timer(State) ->
|
||||||
State#{expiry_timer := emqx_misc:start_timer(timer:seconds(2), expire)}.
|
State#{expiry_timer := emqx_misc:start_timer(timer:seconds(1), expire)}.
|
||||||
-else.
|
-else.
|
||||||
ensure_expiry_timer(State) ->
|
ensure_expiry_timer(State) ->
|
||||||
State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}.
|
State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}.
|
||||||
|
|
|
@ -355,7 +355,8 @@ compile_rule(_) ->
|
||||||
{deny, all, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
|
{deny, all, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
|
||||||
compile({deny, all, subscribe, ["$SYS/#", "#"]}),
|
compile({deny, all, subscribe, ["$SYS/#", "#"]}),
|
||||||
{allow, all} = compile({allow, all}),
|
{allow, all} = compile({allow, all}),
|
||||||
{deny, all} = compile({deny, all}).
|
{deny, all} = compile({deny, all}),
|
||||||
|
{error, bad_rule} = compile({test, malformed}).
|
||||||
|
|
||||||
match_rule(_) ->
|
match_rule(_) ->
|
||||||
User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}},
|
User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}},
|
||||||
|
|
Loading…
Reference in New Issue