chore(authorization): moves authorization configuration items from zone to root
This commit is contained in:
parent
87881621bb
commit
5652917af6
|
@ -88,6 +88,48 @@ broker {
|
|||
perf.trie_compaction = true
|
||||
}
|
||||
|
||||
|
||||
authorization {
|
||||
## Behaviour after not matching a rule.
|
||||
##
|
||||
## @doc authorization.no_match
|
||||
## ValueType: allow | deny
|
||||
## Default: allow
|
||||
no_match: allow
|
||||
|
||||
## The action when authorization check reject current operation
|
||||
##
|
||||
## @doc authorization.deny_action
|
||||
## ValueType: ignore | disconnect
|
||||
## Default: ignore
|
||||
deny_action: ignore
|
||||
|
||||
## Whether to enable Authorization cache.
|
||||
##
|
||||
## If enabled, Authorization roles for each client will be cached in the memory
|
||||
##
|
||||
## @doc authorization.cache.enable
|
||||
## ValueType: Boolean
|
||||
## Default: true
|
||||
cache.enable: true
|
||||
|
||||
## The maximum count of Authorization entries can be cached for a client.
|
||||
##
|
||||
## @doc authorization.cache.max_size
|
||||
## ValueType: Integer
|
||||
## Range: [0, 1048576]
|
||||
## Default: 32
|
||||
cache.max_size: 32
|
||||
|
||||
## The time after which an Authorization cache entry will be deleted
|
||||
##
|
||||
## @doc authorization.cache.ttl
|
||||
## ValueType: Duration
|
||||
## Default: 1m
|
||||
cache.ttl: 1m
|
||||
}
|
||||
|
||||
|
||||
##==================================================================
|
||||
## Zones and Listeners
|
||||
##==================================================================
|
||||
|
@ -114,7 +156,6 @@ broker {
|
|||
## - `auth.*`
|
||||
## - `stats.*`
|
||||
## - `mqtt.*`
|
||||
## - `authorization.*`
|
||||
## - `flapping_detect.*`
|
||||
## - `force_shutdown.*`
|
||||
## - `conn_congestion.*`
|
||||
|
@ -396,47 +437,6 @@ zones.default {
|
|||
|
||||
}
|
||||
|
||||
authorization {
|
||||
|
||||
## Enable Authorization check.
|
||||
##
|
||||
## @doc zones.<name>.authorization.enable
|
||||
## ValueType: Boolean
|
||||
## Default: true
|
||||
enable = true
|
||||
|
||||
## The action when authorization check reject current operation
|
||||
##
|
||||
## @doc zones.<name>.authorization.deny_action
|
||||
## ValueType: ignore | disconnect
|
||||
## Default: ignore
|
||||
deny_action = ignore
|
||||
|
||||
## Whether to enable Authorization cache.
|
||||
##
|
||||
## If enabled, Authorization roles for each client will be cached in the memory
|
||||
##
|
||||
## @doc zones.<name>.authorization.cache.enable
|
||||
## ValueType: Boolean
|
||||
## Default: true
|
||||
cache.enable = true
|
||||
|
||||
## The maximum count of Authorization entries can be cached for a client.
|
||||
##
|
||||
## @doc zones.<name>.authorization.cache.max_size
|
||||
## ValueType: Integer
|
||||
## Range: [0, 1048576]
|
||||
## Default: 32
|
||||
cache.max_size = 32
|
||||
|
||||
## The time after which an Authorization cache entry will be deleted
|
||||
##
|
||||
## @doc zones.<name>.authorization.cache.ttl
|
||||
## ValueType: Duration
|
||||
## Default: 1m
|
||||
cache.ttl = 1m
|
||||
}
|
||||
|
||||
flapping_detect {
|
||||
## Enable Flapping Detection.
|
||||
##
|
||||
|
@ -1158,7 +1158,6 @@ zones.default {
|
|||
#This is an example zone which has less "strict" settings.
|
||||
#It's useful to clients connecting the broker from trusted networks.
|
||||
zones.internal {
|
||||
authorization.enable = true
|
||||
auth.enable = false
|
||||
listeners.mqtt_internal {
|
||||
type = tcp
|
||||
|
|
|
@ -39,23 +39,24 @@ authenticate(Credential) ->
|
|||
%% @doc Check Authorization
|
||||
-spec authorize(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic())
|
||||
-> allow | deny.
|
||||
authorize(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
|
||||
case emqx_authz_cache:is_enabled(Zone) of
|
||||
authorize(ClientInfo, PubSub, Topic) ->
|
||||
case emqx_authz_cache:is_enabled() of
|
||||
true -> check_authorization_cache(ClientInfo, PubSub, Topic);
|
||||
false -> do_authorize(ClientInfo, PubSub, Topic)
|
||||
end.
|
||||
|
||||
check_authorization_cache(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
|
||||
case emqx_authz_cache:get_authz_cache(Zone, PubSub, Topic) of
|
||||
check_authorization_cache(ClientInfo, PubSub, Topic) ->
|
||||
case emqx_authz_cache:get_authz_cache(PubSub, Topic) of
|
||||
not_found ->
|
||||
AuthzResult = do_authorize(ClientInfo, PubSub, Topic),
|
||||
emqx_authz_cache:put_authz_cache(Zone, PubSub, Topic, AuthzResult),
|
||||
emqx_authz_cache:put_authz_cache(PubSub, Topic, AuthzResult),
|
||||
AuthzResult;
|
||||
AuthzResult -> AuthzResult
|
||||
end.
|
||||
|
||||
do_authorize(ClientInfo, PubSub, Topic) ->
|
||||
case run_hooks('client.authorize', [ClientInfo, PubSub, Topic], allow) of
|
||||
NoMatch = emqx:get_config([authorization, no_match], allow),
|
||||
case run_hooks('client.authorize', [ClientInfo, PubSub, Topic], NoMatch) of
|
||||
allow -> allow;
|
||||
_Other -> deny
|
||||
end.
|
||||
|
|
|
@ -18,15 +18,15 @@
|
|||
|
||||
-include("emqx.hrl").
|
||||
|
||||
-export([ list_authz_cache/1
|
||||
, get_authz_cache/3
|
||||
, put_authz_cache/4
|
||||
, cleanup_authz_cache/1
|
||||
-export([ list_authz_cache/0
|
||||
, get_authz_cache/2
|
||||
, put_authz_cache/3
|
||||
, cleanup_authz_cache/0
|
||||
, empty_authz_cache/0
|
||||
, dump_authz_cache/0
|
||||
, get_cache_max_size/1
|
||||
, get_cache_ttl/1
|
||||
, is_enabled/1
|
||||
, get_cache_max_size/0
|
||||
, get_cache_ttl/0
|
||||
, is_enabled/0
|
||||
, drain_cache/0
|
||||
]).
|
||||
|
||||
|
@ -50,45 +50,45 @@ cache_k(PubSub, Topic)-> {PubSub, Topic}.
|
|||
cache_v(AuthzResult)-> {AuthzResult, time_now()}.
|
||||
drain_k() -> {?MODULE, drain_timestamp}.
|
||||
|
||||
-spec(is_enabled(atom()) -> boolean()).
|
||||
is_enabled(Zone) ->
|
||||
emqx_config:get_zone_conf(Zone, [authorization, cache, enable]).
|
||||
-spec(is_enabled() -> boolean()).
|
||||
is_enabled() ->
|
||||
emqx:get_config([authorization, cache, enable], false).
|
||||
|
||||
-spec(get_cache_max_size(atom()) -> integer()).
|
||||
get_cache_max_size(Zone) ->
|
||||
emqx_config:get_zone_conf(Zone, [authorization, cache, max_size]).
|
||||
-spec(get_cache_max_size() -> integer()).
|
||||
get_cache_max_size() ->
|
||||
emqx:get_config([authorization, cache, max_size]).
|
||||
|
||||
-spec(get_cache_ttl(atom()) -> integer()).
|
||||
get_cache_ttl(Zone) ->
|
||||
emqx_config:get_zone_conf(Zone, [authorization, cache, ttl]).
|
||||
-spec(get_cache_ttl() -> integer()).
|
||||
get_cache_ttl() ->
|
||||
emqx:get_config([authorization, cache, ttl]).
|
||||
|
||||
-spec(list_authz_cache(atom()) -> [authz_cache_entry()]).
|
||||
list_authz_cache(Zone) ->
|
||||
cleanup_authz_cache(Zone),
|
||||
-spec(list_authz_cache() -> [authz_cache_entry()]).
|
||||
list_authz_cache() ->
|
||||
cleanup_authz_cache(),
|
||||
map_authz_cache(fun(Cache) -> Cache end).
|
||||
|
||||
%% We'll cleanup the cache before replacing an expired authz.
|
||||
-spec get_authz_cache(atom(), emqx_types:pubsub(), emqx_topic:topic()) ->
|
||||
-spec get_authz_cache(emqx_types:pubsub(), emqx_topic:topic()) ->
|
||||
authz_result() | not_found.
|
||||
get_authz_cache(Zone, PubSub, Topic) ->
|
||||
get_authz_cache(PubSub, Topic) ->
|
||||
case erlang:get(cache_k(PubSub, Topic)) of
|
||||
undefined -> not_found;
|
||||
{AuthzResult, CachedAt} ->
|
||||
if_expired(get_cache_ttl(Zone), CachedAt,
|
||||
if_expired(get_cache_ttl(), CachedAt,
|
||||
fun(false) ->
|
||||
AuthzResult;
|
||||
(true) ->
|
||||
cleanup_authz_cache(Zone),
|
||||
cleanup_authz_cache(),
|
||||
not_found
|
||||
end)
|
||||
end.
|
||||
|
||||
%% If the cache get full, and also the latest one
|
||||
%% is expired, then delete all the cache entries
|
||||
-spec put_authz_cache(atom(), emqx_types:pubsub(), emqx_topic:topic(), authz_result())
|
||||
-spec put_authz_cache(emqx_types:pubsub(), emqx_topic:topic(), authz_result())
|
||||
-> ok.
|
||||
put_authz_cache(Zone, PubSub, Topic, AuthzResult) ->
|
||||
MaxSize = get_cache_max_size(Zone), true = (MaxSize =/= 0),
|
||||
put_authz_cache(PubSub, Topic, AuthzResult) ->
|
||||
MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
|
||||
Size = get_cache_size(),
|
||||
case Size < MaxSize of
|
||||
true ->
|
||||
|
@ -96,7 +96,7 @@ put_authz_cache(Zone, PubSub, Topic, AuthzResult) ->
|
|||
false ->
|
||||
NewestK = get_newest_key(),
|
||||
{_AuthzResult, CachedAt} = erlang:get(NewestK),
|
||||
if_expired(get_cache_ttl(Zone), CachedAt,
|
||||
if_expired(get_cache_ttl(), CachedAt,
|
||||
fun(true) ->
|
||||
% all cache expired, cleanup first
|
||||
empty_authz_cache(),
|
||||
|
@ -123,10 +123,10 @@ evict_authz_cache() ->
|
|||
decr_cache_size().
|
||||
|
||||
%% cleanup all the expired cache entries
|
||||
-spec(cleanup_authz_cache(atom()) -> ok).
|
||||
cleanup_authz_cache(Zone) ->
|
||||
-spec(cleanup_authz_cache() -> ok).
|
||||
cleanup_authz_cache() ->
|
||||
keys_queue_set(
|
||||
cleanup_authz(get_cache_ttl(Zone), keys_queue_get())).
|
||||
cleanup_authz(get_cache_ttl(), keys_queue_get())).
|
||||
|
||||
get_oldest_key() ->
|
||||
keys_queue_pick(queue_front()).
|
||||
|
@ -143,8 +143,8 @@ dump_authz_cache() ->
|
|||
map_authz_cache(fun(Cache) -> Cache end).
|
||||
|
||||
map_authz_cache(Fun) ->
|
||||
[Fun(R) || R = {{SubPub, _T}, _Authz} <- get(), SubPub =:= publish
|
||||
orelse SubPub =:= subscribe].
|
||||
[Fun(R) || R = {{SubPub, _T}, _Authz} <- erlang:get(),
|
||||
SubPub =:= publish orelse SubPub =:= subscribe].
|
||||
foreach_authz_cache(Fun) ->
|
||||
_ = map_authz_cache(Fun),
|
||||
ok.
|
||||
|
|
|
@ -425,7 +425,7 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S
|
|||
end;
|
||||
|
||||
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
||||
Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) ->
|
||||
Channel = #channel{clientinfo = ClientInfo}) ->
|
||||
case emqx_packet:check(Packet) of
|
||||
ok ->
|
||||
TopicFilters0 = parse_topic_filters(TopicFilters),
|
||||
|
@ -434,7 +434,7 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|||
HasAuthzDeny = lists:any(fun({_TopicFilter, ReasonCode}) ->
|
||||
ReasonCode =:= ?RC_NOT_AUTHORIZED
|
||||
end, TupleTopicFilters0),
|
||||
DenyAction = emqx_config:get_zone_conf(Zone, [authorization, deny_action]),
|
||||
DenyAction = emqx:get_config([authorization, deny_action], ignore),
|
||||
case DenyAction =:= disconnect andalso HasAuthzDeny of
|
||||
true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
|
||||
false ->
|
||||
|
@ -536,8 +536,7 @@ process_connect(AckProps, Channel = #channel{conninfo = ConnInfo,
|
|||
%% Process Publish
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
|
||||
Channel = #channel{clientinfo = #{zone := Zone}}) ->
|
||||
process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
|
||||
case pipeline([fun check_quota_exceeded/2,
|
||||
fun process_alias/2,
|
||||
fun check_pub_alias/2,
|
||||
|
@ -550,7 +549,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
|
|||
{error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
|
||||
?LOG(warning, "Cannot publish message to ~s due to ~s.",
|
||||
[Topic, emqx_reason_codes:text(Rc)]),
|
||||
case emqx_config:get_zone_conf(Zone, [authorization, deny_action]) of
|
||||
case emqx:get_config([authorization, deny_action], ignore) of
|
||||
ignore ->
|
||||
case QoS of
|
||||
?QOS_0 -> {ok, NChannel};
|
||||
|
@ -955,9 +954,8 @@ handle_call({takeover, 'end'}, Channel = #channel{session = Session,
|
|||
AllPendings = lists:append(Delivers, Pendings),
|
||||
disconnect_and_shutdown(takeovered, AllPendings, Channel);
|
||||
|
||||
handle_call(list_authz_cache, #channel{clientinfo = #{zone := Zone}}
|
||||
= Channel) ->
|
||||
{reply, emqx_authz_cache:list_authz_cache(Zone), Channel};
|
||||
handle_call(list_authz_cache, Channel) ->
|
||||
{reply, emqx_authz_cache:list_authz_cache(), Channel};
|
||||
|
||||
handle_call({quota, Policy}, Channel) ->
|
||||
Zone = info(zone, Channel),
|
||||
|
@ -1420,8 +1418,7 @@ check_pub_alias(_Packet, _Channel) -> ok.
|
|||
|
||||
check_pub_authz(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
|
||||
#channel{clientinfo = ClientInfo}) ->
|
||||
case is_authz_enabled(ClientInfo) andalso
|
||||
emqx_access_control:authorize(ClientInfo, publish, Topic) of
|
||||
case emqx_access_control:authorize(ClientInfo, publish, Topic) of
|
||||
false -> ok;
|
||||
allow -> ok;
|
||||
deny -> {error, ?RC_NOT_AUTHORIZED}
|
||||
|
@ -1454,8 +1451,7 @@ check_sub_authzs([], _Channel, Acc) ->
|
|||
lists:reverse(Acc).
|
||||
|
||||
check_sub_authz(TopicFilter, #channel{clientinfo = ClientInfo}) ->
|
||||
case is_authz_enabled(ClientInfo) andalso
|
||||
emqx_access_control:authorize(ClientInfo, subscribe, TopicFilter) of
|
||||
case emqx_access_control:authorize(ClientInfo, subscribe, TopicFilter) of
|
||||
false -> allow;
|
||||
Result -> Result
|
||||
end.
|
||||
|
@ -1621,11 +1617,6 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
|
|||
_ -> shutdown(Reason, Channel)
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Is Authorization enabled?
|
||||
is_authz_enabled(#{zone := Zone, is_superuser := IsSuperuser}) ->
|
||||
(not IsSuperuser) andalso emqx_config:get_zone_conf(Zone, [authorization, enable]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Parse Topic Filters
|
||||
|
||||
|
|
|
@ -70,7 +70,7 @@
|
|||
-export([conf_get/2, conf_get/3, keys/2, filter/1]).
|
||||
-export([ssl/1]).
|
||||
|
||||
structs() -> ["zones", "listeners", "broker", "plugins", "sysmon", "alarm"].
|
||||
structs() -> ["zones", "listeners", "broker", "plugins", "sysmon", "alarm", "authorization"].
|
||||
|
||||
fields("stats") ->
|
||||
[ {"enable", t(boolean(), undefined, true)}
|
||||
|
@ -80,10 +80,10 @@ fields("auth") ->
|
|||
[ {"enable", t(boolean(), undefined, false)}
|
||||
];
|
||||
|
||||
fields("authorization_settings") ->
|
||||
[ {"enable", t(boolean(), undefined, true)}
|
||||
, {"cache", ref("authorization_cache")}
|
||||
fields("authorization") ->
|
||||
[ {"no_match", t(union(allow, deny), undefined, allow)}
|
||||
, {"deny_action", t(union(ignore, disconnect), undefined, ignore)}
|
||||
, {"cache", ref("authorization_cache")}
|
||||
];
|
||||
|
||||
fields("authorization_cache") ->
|
||||
|
@ -129,7 +129,6 @@ fields("zones") ->
|
|||
|
||||
fields("zone_settings") ->
|
||||
[ {"mqtt", ref("mqtt")}
|
||||
, {"authorization", ref("authorization_settings")}
|
||||
, {"auth", ref("auth")}
|
||||
, {"stats", ref("stats")}
|
||||
, {"flapping_detect", ref("flapping_detect")}
|
||||
|
|
|
@ -26,7 +26,6 @@ all() -> emqx_ct:all(?MODULE).
|
|||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
toggle_authz(true),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
|
@ -78,6 +77,3 @@ t_drain_authz_cache(_) ->
|
|||
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||
?assert(length(gen_server:call(ClientPid, list_authz_cache)) > 0),
|
||||
emqtt:stop(Client).
|
||||
|
||||
toggle_authz(Bool) when is_boolean(Bool) ->
|
||||
emqx_config:put_zone_conf(default, [authorization, enable], Bool).
|
||||
|
|
Loading…
Reference in New Issue