chore: rename acl to authz
Signed-off-by: zhanghongtong <rory-z@outlook.com>
This commit is contained in:
parent
eac9420170
commit
4c5b75f281
|
@ -1097,7 +1097,7 @@ zones.default {
|
||||||
|
|
||||||
authorization {
|
authorization {
|
||||||
|
|
||||||
## Enable ACL check.
|
## Enable Authorization check.
|
||||||
##
|
##
|
||||||
## @doc zones.<name>.authorization.enable
|
## @doc zones.<name>.authorization.enable
|
||||||
## ValueType: Boolean
|
## ValueType: Boolean
|
||||||
|
@ -1111,16 +1111,16 @@ zones.default {
|
||||||
## Default: ignore
|
## Default: ignore
|
||||||
deny_action: ignore
|
deny_action: ignore
|
||||||
|
|
||||||
## Whether to enable ACL cache.
|
## Whether to enable Authorization cache.
|
||||||
##
|
##
|
||||||
## If enabled, ACLs roles for each client will be cached in the memory
|
## If enabled, Authorization roles for each client will be cached in the memory
|
||||||
##
|
##
|
||||||
## @doc zones.<name>.authorization.cache.enable
|
## @doc zones.<name>.authorization.cache.enable
|
||||||
## ValueType: Boolean
|
## ValueType: Boolean
|
||||||
## Default: true
|
## Default: true
|
||||||
cache.enable: true
|
cache.enable: true
|
||||||
|
|
||||||
## The maximum count of ACL entries can be cached for a client.
|
## The maximum count of Authorization entries can be cached for a client.
|
||||||
##
|
##
|
||||||
## @doc zones.<name>.authorization.cache.max_size
|
## @doc zones.<name>.authorization.cache.max_size
|
||||||
## ValueType: Integer
|
## ValueType: Integer
|
||||||
|
@ -1128,7 +1128,7 @@ zones.default {
|
||||||
## Default: 32
|
## Default: 32
|
||||||
cache.max_size: 32
|
cache.max_size: 32
|
||||||
|
|
||||||
## The time after which an ACL cache entry will be deleted
|
## The time after which an Authorization cache entry will be deleted
|
||||||
##
|
##
|
||||||
## @doc zones.<name>.authorization.cache.ttl
|
## @doc zones.<name>.authorization.cache.ttl
|
||||||
## ValueType: Duration
|
## ValueType: Duration
|
||||||
|
|
|
@ -31,22 +31,22 @@
|
||||||
authenticate(Credential) ->
|
authenticate(Credential) ->
|
||||||
run_hooks('client.authenticate', [Credential], ok).
|
run_hooks('client.authenticate', [Credential], ok).
|
||||||
|
|
||||||
%% @doc Check ACL
|
%% @doc Check Authorization
|
||||||
-spec authorize(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic())
|
-spec authorize(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic())
|
||||||
-> allow | deny.
|
-> allow | deny.
|
||||||
authorize(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
|
authorize(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
|
||||||
case emqx_acl_cache:is_enabled(Zone) of
|
case emqx_authz_cache:is_enabled(Zone) of
|
||||||
true -> check_authorization_cache(ClientInfo, PubSub, Topic);
|
true -> check_authorization_cache(ClientInfo, PubSub, Topic);
|
||||||
false -> do_authorize(ClientInfo, PubSub, Topic)
|
false -> do_authorize(ClientInfo, PubSub, Topic)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_authorization_cache(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
|
check_authorization_cache(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
|
||||||
case emqx_acl_cache:get_acl_cache(Zone, PubSub, Topic) of
|
case emqx_authz_cache:get_authz_cache(Zone, PubSub, Topic) of
|
||||||
not_found ->
|
not_found ->
|
||||||
AclResult = do_authorize(ClientInfo, PubSub, Topic),
|
AuthzResult = do_authorize(ClientInfo, PubSub, Topic),
|
||||||
emqx_acl_cache:put_acl_cache(Zone, PubSub, Topic, AclResult),
|
emqx_authz_cache:put_authz_cache(Zone, PubSub, Topic, AuthzResult),
|
||||||
AclResult;
|
AuthzResult;
|
||||||
AclResult -> AclResult
|
AuthzResult -> AuthzResult
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_authorize(ClientInfo, PubSub, Topic) ->
|
do_authorize(ClientInfo, PubSub, Topic) ->
|
||||||
|
|
|
@ -14,16 +14,16 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_acl_cache).
|
-module(emqx_authz_cache).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
|
||||||
-export([ list_acl_cache/1
|
-export([ list_authz_cache/1
|
||||||
, get_acl_cache/3
|
, get_authz_cache/3
|
||||||
, put_acl_cache/4
|
, put_authz_cache/4
|
||||||
, cleanup_acl_cache/1
|
, cleanup_authz_cache/1
|
||||||
, empty_acl_cache/0
|
, empty_authz_cache/0
|
||||||
, dump_acl_cache/0
|
, dump_authz_cache/0
|
||||||
, get_cache_max_size/1
|
, get_cache_max_size/1
|
||||||
, get_cache_ttl/1
|
, get_cache_ttl/1
|
||||||
, is_enabled/1
|
, is_enabled/1
|
||||||
|
@ -38,16 +38,16 @@
|
||||||
, get_oldest_key/0
|
, get_oldest_key/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-type(acl_result() :: allow | deny).
|
-type(authz_result() :: allow | deny).
|
||||||
-type(system_time() :: integer()).
|
-type(system_time() :: integer()).
|
||||||
-type(cache_key() :: {emqx_types:pubsub(), emqx_types:topic()}).
|
-type(cache_key() :: {emqx_types:pubsub(), emqx_types:topic()}).
|
||||||
-type(cache_val() :: {acl_result(), system_time()}).
|
-type(cache_val() :: {authz_result(), system_time()}).
|
||||||
|
|
||||||
-type(acl_cache_entry() :: {cache_key(), cache_val()}).
|
-type(authz_cache_entry() :: {cache_key(), cache_val()}).
|
||||||
|
|
||||||
%% Wrappers for key and value
|
%% Wrappers for key and value
|
||||||
cache_k(PubSub, Topic)-> {PubSub, Topic}.
|
cache_k(PubSub, Topic)-> {PubSub, Topic}.
|
||||||
cache_v(AclResult)-> {AclResult, time_now()}.
|
cache_v(AuthzResult)-> {AuthzResult, time_now()}.
|
||||||
drain_k() -> {?MODULE, drain_timestamp}.
|
drain_k() -> {?MODULE, drain_timestamp}.
|
||||||
|
|
||||||
-spec(is_enabled(atom()) -> boolean()).
|
-spec(is_enabled(atom()) -> boolean()).
|
||||||
|
@ -62,71 +62,71 @@ get_cache_max_size(Zone) ->
|
||||||
get_cache_ttl(Zone) ->
|
get_cache_ttl(Zone) ->
|
||||||
emqx_config:get_zone_conf(Zone, [authorization, cache, ttl]).
|
emqx_config:get_zone_conf(Zone, [authorization, cache, ttl]).
|
||||||
|
|
||||||
-spec(list_acl_cache(atom()) -> [acl_cache_entry()]).
|
-spec(list_authz_cache(atom()) -> [authz_cache_entry()]).
|
||||||
list_acl_cache(Zone) ->
|
list_authz_cache(Zone) ->
|
||||||
cleanup_acl_cache(Zone),
|
cleanup_authz_cache(Zone),
|
||||||
map_acl_cache(fun(Cache) -> Cache end).
|
map_authz_cache(fun(Cache) -> Cache end).
|
||||||
|
|
||||||
%% We'll cleanup the cache before replacing an expired acl.
|
%% We'll cleanup the cache before replacing an expired authz.
|
||||||
-spec get_acl_cache(atom(), emqx_types:pubsub(), emqx_topic:topic()) ->
|
-spec get_authz_cache(atom(), emqx_types:pubsub(), emqx_topic:topic()) ->
|
||||||
acl_result() | not_found.
|
authz_result() | not_found.
|
||||||
get_acl_cache(Zone, PubSub, Topic) ->
|
get_authz_cache(Zone, PubSub, Topic) ->
|
||||||
case erlang:get(cache_k(PubSub, Topic)) of
|
case erlang:get(cache_k(PubSub, Topic)) of
|
||||||
undefined -> not_found;
|
undefined -> not_found;
|
||||||
{AclResult, CachedAt} ->
|
{AuthzResult, CachedAt} ->
|
||||||
if_expired(get_cache_ttl(Zone), CachedAt,
|
if_expired(get_cache_ttl(Zone), CachedAt,
|
||||||
fun(false) ->
|
fun(false) ->
|
||||||
AclResult;
|
AuthzResult;
|
||||||
(true) ->
|
(true) ->
|
||||||
cleanup_acl_cache(Zone),
|
cleanup_authz_cache(Zone),
|
||||||
not_found
|
not_found
|
||||||
end)
|
end)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% If the cache get full, and also the latest one
|
%% If the cache get full, and also the latest one
|
||||||
%% is expired, then delete all the cache entries
|
%% is expired, then delete all the cache entries
|
||||||
-spec put_acl_cache(atom(), emqx_types:pubsub(), emqx_topic:topic(), acl_result())
|
-spec put_authz_cache(atom(), emqx_types:pubsub(), emqx_topic:topic(), authz_result())
|
||||||
-> ok.
|
-> ok.
|
||||||
put_acl_cache(Zone, PubSub, Topic, AclResult) ->
|
put_authz_cache(Zone, PubSub, Topic, AuthzResult) ->
|
||||||
MaxSize = get_cache_max_size(Zone), true = (MaxSize =/= 0),
|
MaxSize = get_cache_max_size(Zone), true = (MaxSize =/= 0),
|
||||||
Size = get_cache_size(),
|
Size = get_cache_size(),
|
||||||
case Size < MaxSize of
|
case Size < MaxSize of
|
||||||
true ->
|
true ->
|
||||||
add_acl(PubSub, Topic, AclResult);
|
add_authz(PubSub, Topic, AuthzResult);
|
||||||
false ->
|
false ->
|
||||||
NewestK = get_newest_key(),
|
NewestK = get_newest_key(),
|
||||||
{_AclResult, CachedAt} = erlang:get(NewestK),
|
{_AuthzResult, CachedAt} = erlang:get(NewestK),
|
||||||
if_expired(get_cache_ttl(Zone), CachedAt,
|
if_expired(get_cache_ttl(Zone), CachedAt,
|
||||||
fun(true) ->
|
fun(true) ->
|
||||||
% all cache expired, cleanup first
|
% all cache expired, cleanup first
|
||||||
empty_acl_cache(),
|
empty_authz_cache(),
|
||||||
add_acl(PubSub, Topic, AclResult);
|
add_authz(PubSub, Topic, AuthzResult);
|
||||||
(false) ->
|
(false) ->
|
||||||
% cache full, perform cache replacement
|
% cache full, perform cache replacement
|
||||||
evict_acl_cache(),
|
evict_authz_cache(),
|
||||||
add_acl(PubSub, Topic, AclResult)
|
add_authz(PubSub, Topic, AuthzResult)
|
||||||
end)
|
end)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% delete all the acl entries
|
%% delete all the authz entries
|
||||||
-spec(empty_acl_cache() -> ok).
|
-spec(empty_authz_cache() -> ok).
|
||||||
empty_acl_cache() ->
|
empty_authz_cache() ->
|
||||||
foreach_acl_cache(fun({CacheK, _CacheV}) -> erlang:erase(CacheK) end),
|
foreach_authz_cache(fun({CacheK, _CacheV}) -> erlang:erase(CacheK) end),
|
||||||
set_cache_size(0),
|
set_cache_size(0),
|
||||||
keys_queue_set(queue:new()).
|
keys_queue_set(queue:new()).
|
||||||
|
|
||||||
%% delete the oldest acl entry
|
%% delete the oldest authz entry
|
||||||
-spec(evict_acl_cache() -> ok).
|
-spec(evict_authz_cache() -> ok).
|
||||||
evict_acl_cache() ->
|
evict_authz_cache() ->
|
||||||
OldestK = keys_queue_out(),
|
OldestK = keys_queue_out(),
|
||||||
erlang:erase(OldestK),
|
erlang:erase(OldestK),
|
||||||
decr_cache_size().
|
decr_cache_size().
|
||||||
|
|
||||||
%% cleanup all the expired cache entries
|
%% cleanup all the expired cache entries
|
||||||
-spec(cleanup_acl_cache(atom()) -> ok).
|
-spec(cleanup_authz_cache(atom()) -> ok).
|
||||||
cleanup_acl_cache(Zone) ->
|
cleanup_authz_cache(Zone) ->
|
||||||
keys_queue_set(
|
keys_queue_set(
|
||||||
cleanup_acl(get_cache_ttl(Zone), keys_queue_get())).
|
cleanup_authz(get_cache_ttl(Zone), keys_queue_get())).
|
||||||
|
|
||||||
get_oldest_key() ->
|
get_oldest_key() ->
|
||||||
keys_queue_pick(queue_front()).
|
keys_queue_pick(queue_front()).
|
||||||
|
@ -134,22 +134,22 @@ get_newest_key() ->
|
||||||
keys_queue_pick(queue_rear()).
|
keys_queue_pick(queue_rear()).
|
||||||
|
|
||||||
get_cache_size() ->
|
get_cache_size() ->
|
||||||
case erlang:get(acl_cache_size) of
|
case erlang:get(authz_cache_size) of
|
||||||
undefined -> 0;
|
undefined -> 0;
|
||||||
Size -> Size
|
Size -> Size
|
||||||
end.
|
end.
|
||||||
|
|
||||||
dump_acl_cache() ->
|
dump_authz_cache() ->
|
||||||
map_acl_cache(fun(Cache) -> Cache end).
|
map_authz_cache(fun(Cache) -> Cache end).
|
||||||
|
|
||||||
map_acl_cache(Fun) ->
|
map_authz_cache(Fun) ->
|
||||||
[Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish
|
[Fun(R) || R = {{SubPub, _T}, _Authz} <- get(), SubPub =:= publish
|
||||||
orelse SubPub =:= subscribe].
|
orelse SubPub =:= subscribe].
|
||||||
foreach_acl_cache(Fun) ->
|
foreach_authz_cache(Fun) ->
|
||||||
_ = map_acl_cache(Fun),
|
_ = map_authz_cache(Fun),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% All acl cache entries added before `drain_cache()` invocation will become expired
|
%% All authz cache entries added before `drain_cache()` invocation will become expired
|
||||||
drain_cache() ->
|
drain_cache() ->
|
||||||
_ = persistent_term:put(drain_k(), time_now()),
|
_ = persistent_term:put(drain_k(), time_now()),
|
||||||
ok.
|
ok.
|
||||||
|
@ -158,52 +158,52 @@ drain_cache() ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
add_acl(PubSub, Topic, AclResult) ->
|
add_authz(PubSub, Topic, AuthzResult) ->
|
||||||
K = cache_k(PubSub, Topic),
|
K = cache_k(PubSub, Topic),
|
||||||
V = cache_v(AclResult),
|
V = cache_v(AuthzResult),
|
||||||
case erlang:get(K) of
|
case erlang:get(K) of
|
||||||
undefined -> add_new_acl(K, V);
|
undefined -> add_new_authz(K, V);
|
||||||
{_AclResult, _CachedAt} ->
|
{_AuthzResult, _CachedAt} ->
|
||||||
update_acl(K, V)
|
update_authz(K, V)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
add_new_acl(K, V) ->
|
add_new_authz(K, V) ->
|
||||||
erlang:put(K, V),
|
erlang:put(K, V),
|
||||||
keys_queue_in(K),
|
keys_queue_in(K),
|
||||||
incr_cache_size().
|
incr_cache_size().
|
||||||
|
|
||||||
update_acl(K, V) ->
|
update_authz(K, V) ->
|
||||||
erlang:put(K, V),
|
erlang:put(K, V),
|
||||||
keys_queue_update(K).
|
keys_queue_update(K).
|
||||||
|
|
||||||
cleanup_acl(TTL, KeysQ) ->
|
cleanup_authz(TTL, KeysQ) ->
|
||||||
case queue:out(KeysQ) of
|
case queue:out(KeysQ) of
|
||||||
{{value, OldestK}, KeysQ2} ->
|
{{value, OldestK}, KeysQ2} ->
|
||||||
{_AclResult, CachedAt} = erlang:get(OldestK),
|
{_AuthzResult, CachedAt} = erlang:get(OldestK),
|
||||||
if_expired(TTL, CachedAt,
|
if_expired(TTL, CachedAt,
|
||||||
fun(false) -> KeysQ;
|
fun(false) -> KeysQ;
|
||||||
(true) ->
|
(true) ->
|
||||||
erlang:erase(OldestK),
|
erlang:erase(OldestK),
|
||||||
decr_cache_size(),
|
decr_cache_size(),
|
||||||
cleanup_acl(TTL, KeysQ2)
|
cleanup_authz(TTL, KeysQ2)
|
||||||
end);
|
end);
|
||||||
{empty, KeysQ} -> KeysQ
|
{empty, KeysQ} -> KeysQ
|
||||||
end.
|
end.
|
||||||
|
|
||||||
incr_cache_size() ->
|
incr_cache_size() ->
|
||||||
erlang:put(acl_cache_size, get_cache_size() + 1), ok.
|
erlang:put(authz_cache_size, get_cache_size() + 1), ok.
|
||||||
decr_cache_size() ->
|
decr_cache_size() ->
|
||||||
Size = get_cache_size(),
|
Size = get_cache_size(),
|
||||||
case Size > 1 of
|
case Size > 1 of
|
||||||
true ->
|
true ->
|
||||||
erlang:put(acl_cache_size, Size-1);
|
erlang:put(authz_cache_size, Size-1);
|
||||||
false ->
|
false ->
|
||||||
erlang:put(acl_cache_size, 0)
|
erlang:put(authz_cache_size, 0)
|
||||||
end,
|
end,
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
set_cache_size(N) ->
|
set_cache_size(N) ->
|
||||||
erlang:put(acl_cache_size, N), ok.
|
erlang:put(authz_cache_size, N), ok.
|
||||||
|
|
||||||
%%% Ordered Keys Q %%%
|
%%% Ordered Keys Q %%%
|
||||||
keys_queue_in(Key) ->
|
keys_queue_in(Key) ->
|
||||||
|
@ -236,9 +236,9 @@ keys_queue_remove(Key, KeysQ) ->
|
||||||
end, KeysQ).
|
end, KeysQ).
|
||||||
|
|
||||||
keys_queue_set(KeysQ) ->
|
keys_queue_set(KeysQ) ->
|
||||||
erlang:put(acl_keys_q, KeysQ), ok.
|
erlang:put(authz_keys_q, KeysQ), ok.
|
||||||
keys_queue_get() ->
|
keys_queue_get() ->
|
||||||
case erlang:get(acl_keys_q) of
|
case erlang:get(authz_keys_q) of
|
||||||
undefined -> queue:new();
|
undefined -> queue:new();
|
||||||
KeysQ -> KeysQ
|
KeysQ -> KeysQ
|
||||||
end.
|
end.
|
|
@ -431,12 +431,12 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
||||||
ok ->
|
ok ->
|
||||||
TopicFilters0 = parse_topic_filters(TopicFilters),
|
TopicFilters0 = parse_topic_filters(TopicFilters),
|
||||||
TopicFilters1 = put_subid_in_subopts(Properties, TopicFilters0),
|
TopicFilters1 = put_subid_in_subopts(Properties, TopicFilters0),
|
||||||
TupleTopicFilters0 = check_sub_acls(TopicFilters1, Channel),
|
TupleTopicFilters0 = check_sub_authzs(TopicFilters1, Channel),
|
||||||
HasAclDeny = lists:any(fun({_TopicFilter, ReasonCode}) ->
|
HasAuthzDeny = lists:any(fun({_TopicFilter, ReasonCode}) ->
|
||||||
ReasonCode =:= ?RC_NOT_AUTHORIZED
|
ReasonCode =:= ?RC_NOT_AUTHORIZED
|
||||||
end, TupleTopicFilters0),
|
end, TupleTopicFilters0),
|
||||||
DenyAction = emqx_config:get_zone_conf(Zone, [authorization, deny_action]),
|
DenyAction = emqx_config:get_zone_conf(Zone, [authorization, deny_action]),
|
||||||
case DenyAction =:= disconnect andalso HasAclDeny of
|
case DenyAction =:= disconnect andalso HasAuthzDeny of
|
||||||
true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
|
true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
|
||||||
false ->
|
false ->
|
||||||
Replace = fun
|
Replace = fun
|
||||||
|
@ -542,7 +542,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
|
||||||
case pipeline([fun check_quota_exceeded/2,
|
case pipeline([fun check_quota_exceeded/2,
|
||||||
fun process_alias/2,
|
fun process_alias/2,
|
||||||
fun check_pub_alias/2,
|
fun check_pub_alias/2,
|
||||||
fun check_pub_acl/2,
|
fun check_pub_authz/2,
|
||||||
fun check_pub_caps/2
|
fun check_pub_caps/2
|
||||||
], Packet, Channel) of
|
], Packet, Channel) of
|
||||||
{ok, NPacket, NChannel} ->
|
{ok, NPacket, NChannel} ->
|
||||||
|
@ -956,9 +956,9 @@ handle_call({takeover, 'end'}, Channel = #channel{session = Session,
|
||||||
AllPendings = lists:append(Delivers, Pendings),
|
AllPendings = lists:append(Delivers, Pendings),
|
||||||
disconnect_and_shutdown(takeovered, AllPendings, Channel);
|
disconnect_and_shutdown(takeovered, AllPendings, Channel);
|
||||||
|
|
||||||
handle_call(list_acl_cache, #channel{clientinfo = #{zone := Zone}}
|
handle_call(list_authz_cache, #channel{clientinfo = #{zone := Zone}}
|
||||||
= Channel) ->
|
= Channel) ->
|
||||||
{reply, emqx_acl_cache:list_acl_cache(Zone), Channel};
|
{reply, emqx_authz_cache:list_authz_cache(Zone), Channel};
|
||||||
|
|
||||||
handle_call({quota, Policy}, Channel) ->
|
handle_call({quota, Policy}, Channel) ->
|
||||||
Zone = info(zone, Channel),
|
Zone = info(zone, Channel),
|
||||||
|
@ -1009,8 +1009,8 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}
|
||||||
?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
|
?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_info(clean_acl_cache, Channel) ->
|
handle_info(clean_authz_cache, Channel) ->
|
||||||
ok = emqx_acl_cache:empty_acl_cache(),
|
ok = emqx_authz_cache:empty_authz_cache(),
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_info(Info, Channel) ->
|
handle_info(Info, Channel) ->
|
||||||
|
@ -1414,11 +1414,11 @@ check_pub_alias(#mqtt_packet{
|
||||||
check_pub_alias(_Packet, _Channel) -> ok.
|
check_pub_alias(_Packet, _Channel) -> ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Check Pub ACL
|
%% Check Pub Authorization
|
||||||
|
|
||||||
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
|
check_pub_authz(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
|
||||||
#channel{clientinfo = ClientInfo}) ->
|
#channel{clientinfo = ClientInfo}) ->
|
||||||
case is_acl_enabled(ClientInfo) andalso
|
case is_authz_enabled(ClientInfo) andalso
|
||||||
emqx_access_control:authorize(ClientInfo, publish, Topic) of
|
emqx_access_control:authorize(ClientInfo, publish, Topic) of
|
||||||
false -> ok;
|
false -> ok;
|
||||||
allow -> ok;
|
allow -> ok;
|
||||||
|
@ -1436,23 +1436,23 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS,
|
||||||
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic => Topic}).
|
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic => Topic}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Check Sub ACL
|
%% Check Sub Authorization
|
||||||
|
|
||||||
check_sub_acls(TopicFilters, Channel) ->
|
check_sub_authzs(TopicFilters, Channel) ->
|
||||||
check_sub_acls(TopicFilters, Channel, []).
|
check_sub_authzs(TopicFilters, Channel, []).
|
||||||
|
|
||||||
check_sub_acls([ TopicFilter = {Topic, _} | More] , Channel, Acc) ->
|
check_sub_authzs([ TopicFilter = {Topic, _} | More] , Channel, Acc) ->
|
||||||
case check_sub_acl(Topic, Channel) of
|
case check_sub_authz(Topic, Channel) of
|
||||||
allow ->
|
allow ->
|
||||||
check_sub_acls(More, Channel, [ {TopicFilter, 0} | Acc]);
|
check_sub_authzs(More, Channel, [ {TopicFilter, 0} | Acc]);
|
||||||
deny ->
|
deny ->
|
||||||
check_sub_acls(More, Channel, [ {TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
|
check_sub_authzs(More, Channel, [ {TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
|
||||||
end;
|
end;
|
||||||
check_sub_acls([], _Channel, Acc) ->
|
check_sub_authzs([], _Channel, Acc) ->
|
||||||
lists:reverse(Acc).
|
lists:reverse(Acc).
|
||||||
|
|
||||||
check_sub_acl(TopicFilter, #channel{clientinfo = ClientInfo}) ->
|
check_sub_authz(TopicFilter, #channel{clientinfo = ClientInfo}) ->
|
||||||
case is_acl_enabled(ClientInfo) andalso
|
case is_authz_enabled(ClientInfo) andalso
|
||||||
emqx_access_control:authorize(ClientInfo, subscribe, TopicFilter) of
|
emqx_access_control:authorize(ClientInfo, subscribe, TopicFilter) of
|
||||||
false -> allow;
|
false -> allow;
|
||||||
Result -> Result
|
Result -> Result
|
||||||
|
@ -1620,8 +1620,8 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Is ACL enabled?
|
%% Is Authorization enabled?
|
||||||
is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) ->
|
is_authz_enabled(#{zone := Zone, is_superuser := IsSuperuser}) ->
|
||||||
(not IsSuperuser) andalso emqx_config:get_zone_conf(Zone, [authorization, enable]).
|
(not IsSuperuser) andalso emqx_config:get_zone_conf(Zone, [authorization, enable]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%%
|
%%
|
||||||
%% [ACL](https://github.com/emqtt/emqttd/wiki/ACL)
|
%% [Authorization](https://github.com/emqtt/emqttd/wiki/Authorization)
|
||||||
%%
|
%%
|
||||||
%% -type who() :: all | binary() |
|
%% -type who() :: all | binary() |
|
||||||
%% {ipaddr, esockd_access:cidr()} |
|
%% {ipaddr, esockd_access:cidr()} |
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_acl_cache_SUITE).
|
-module(emqx_authz_cache_SUITE).
|
||||||
|
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
@ -26,7 +26,7 @@ all() -> emqx_ct:all(?MODULE).
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_ct_helpers:boot_modules(all),
|
emqx_ct_helpers:boot_modules(all),
|
||||||
emqx_ct_helpers:start_apps([]),
|
emqx_ct_helpers:start_apps([]),
|
||||||
toggle_acl(true),
|
toggle_authz(true),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
|
@ -36,7 +36,7 @@ end_per_suite(_Config) ->
|
||||||
%% Test cases
|
%% Test cases
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_clean_acl_cache(_) ->
|
t_clean_authz_cache(_) ->
|
||||||
{ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]),
|
{ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||||
|
@ -49,14 +49,14 @@ t_clean_acl_cache(_) ->
|
||||||
lists:last(Pids);
|
lists:last(Pids);
|
||||||
_ -> {error, not_found}
|
_ -> {error, not_found}
|
||||||
end,
|
end,
|
||||||
Caches = gen_server:call(ClientPid, list_acl_cache),
|
Caches = gen_server:call(ClientPid, list_authz_cache),
|
||||||
ct:log("acl caches: ~p", [Caches]),
|
ct:log("authz caches: ~p", [Caches]),
|
||||||
?assert(length(Caches) > 0),
|
?assert(length(Caches) > 0),
|
||||||
erlang:send(ClientPid, clean_acl_cache),
|
erlang:send(ClientPid, clean_authz_cache),
|
||||||
?assertEqual(0, length(gen_server:call(ClientPid, list_acl_cache))),
|
?assertEqual(0, length(gen_server:call(ClientPid, list_authz_cache))),
|
||||||
emqtt:stop(Client).
|
emqtt:stop(Client).
|
||||||
|
|
||||||
t_drain_acl_cache(_) ->
|
t_drain_authz_cache(_) ->
|
||||||
{ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]),
|
{ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||||
|
@ -69,15 +69,15 @@ t_drain_acl_cache(_) ->
|
||||||
lists:last(Pids);
|
lists:last(Pids);
|
||||||
_ -> {error, not_found}
|
_ -> {error, not_found}
|
||||||
end,
|
end,
|
||||||
Caches = gen_server:call(ClientPid, list_acl_cache),
|
Caches = gen_server:call(ClientPid, list_authz_cache),
|
||||||
ct:log("acl caches: ~p", [Caches]),
|
ct:log("authz caches: ~p", [Caches]),
|
||||||
?assert(length(Caches) > 0),
|
?assert(length(Caches) > 0),
|
||||||
emqx_acl_cache:drain_cache(),
|
emqx_authz_cache:drain_cache(),
|
||||||
?assertEqual(0, length(gen_server:call(ClientPid, list_acl_cache))),
|
?assertEqual(0, length(gen_server:call(ClientPid, list_authz_cache))),
|
||||||
ct:sleep(100),
|
ct:sleep(100),
|
||||||
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||||
?assert(length(gen_server:call(ClientPid, list_acl_cache)) > 0),
|
?assert(length(gen_server:call(ClientPid, list_authz_cache)) > 0),
|
||||||
emqtt:stop(Client).
|
emqtt:stop(Client).
|
||||||
|
|
||||||
toggle_acl(Bool) when is_boolean(Bool) ->
|
toggle_authz(Bool) when is_boolean(Bool) ->
|
||||||
emqx_config:put_zone_conf(default, [authorization, enable], Bool).
|
emqx_config:put_zone_conf(default, [authorization, enable], Bool).
|
|
@ -14,20 +14,20 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_acl_test_mod).
|
-module(emqx_authz_test_mod).
|
||||||
|
|
||||||
%% ACL callbacks
|
%% Authorization callbacks
|
||||||
-export([ init/1
|
-export([ init/1
|
||||||
, authorize/2
|
, authorize/2
|
||||||
, description/0
|
, description/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
init(AclOpts) ->
|
init(AuthzOpts) ->
|
||||||
{ok, AclOpts}.
|
{ok, AuthzOpts}.
|
||||||
|
|
||||||
authorize({_User, _PubSub, _Topic}, _State) ->
|
authorize({_User, _PubSub, _Topic}, _State) ->
|
||||||
allow.
|
allow.
|
||||||
|
|
||||||
description() ->
|
description() ->
|
||||||
"Test ACL Mod".
|
"Test Authorization Mod".
|
||||||
|
|
|
@ -862,20 +862,20 @@ t_packing_alias(_) ->
|
||||||
#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"z">>}},
|
#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"z">>}},
|
||||||
channel())).
|
channel())).
|
||||||
|
|
||||||
t_check_pub_acl(_) ->
|
t_check_pub_authz(_) ->
|
||||||
emqx_config:put_zone_conf(default, [authorization, enable], true),
|
emqx_config:put_zone_conf(default, [authorization, enable], true),
|
||||||
Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
|
Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
|
||||||
ok = emqx_channel:check_pub_acl(Publish, channel()).
|
ok = emqx_channel:check_pub_authz(Publish, channel()).
|
||||||
|
|
||||||
t_check_pub_alias(_) ->
|
t_check_pub_alias(_) ->
|
||||||
Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}},
|
Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}},
|
||||||
Channel = emqx_channel:set_field(alias_maximum, #{inbound => 10}, channel()),
|
Channel = emqx_channel:set_field(alias_maximum, #{inbound => 10}, channel()),
|
||||||
ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, Channel).
|
ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, Channel).
|
||||||
|
|
||||||
t_check_sub_acls(_) ->
|
t_check_sub_authzs(_) ->
|
||||||
emqx_config:put_zone_conf(default, [authorization, enable], true),
|
emqx_config:put_zone_conf(default, [authorization, enable], true),
|
||||||
TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS},
|
TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS},
|
||||||
[{TopicFilter, 0}] = emqx_channel:check_sub_acls([TopicFilter], channel()).
|
[{TopicFilter, 0}] = emqx_channel:check_sub_authzs([TopicFilter], channel()).
|
||||||
|
|
||||||
t_enrich_connack_caps(_) ->
|
t_enrich_connack_caps(_) ->
|
||||||
ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]),
|
ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]),
|
||||||
|
|
|
@ -71,8 +71,8 @@ generate_config() ->
|
||||||
hocon_schema:generate(emqx_schema, Conf).
|
hocon_schema:generate(emqx_schema, Conf).
|
||||||
|
|
||||||
set_app_env({App, Lists}) ->
|
set_app_env({App, Lists}) ->
|
||||||
lists:foreach(fun({acl_file, _Var}) ->
|
lists:foreach(fun({authz_file, _Var}) ->
|
||||||
application:set_env(App, acl_file, local_path(["etc", "acl.conf"]));
|
application:set_env(App, authz_file, local_path(["etc", "authz.conf"]));
|
||||||
({plugins_loaded_file, _Var}) ->
|
({plugins_loaded_file, _Var}) ->
|
||||||
application:set_env(App,
|
application:set_env(App,
|
||||||
plugins_loaded_file,
|
plugins_loaded_file,
|
||||||
|
|
|
@ -23,7 +23,7 @@ authz:{
|
||||||
keyfile: "etc/certs/client-key.pem"
|
keyfile: "etc/certs/client-key.pem"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or clientid = '%c'"
|
sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or clientid = '%c'"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
type: pgsql
|
type: pgsql
|
||||||
|
@ -36,7 +36,7 @@ authz:{
|
||||||
auto_reconnect: true
|
auto_reconnect: true
|
||||||
ssl: {enable: false}
|
ssl: {enable: false}
|
||||||
}
|
}
|
||||||
sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
|
sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
type: redis
|
type: redis
|
||||||
|
@ -48,7 +48,7 @@ authz:{
|
||||||
auto_reconnect: true
|
auto_reconnect: true
|
||||||
ssl: {enable: false}
|
ssl: {enable: false}
|
||||||
}
|
}
|
||||||
cmd: "HGETALL mqtt_acl:%u"
|
cmd: "HGETALL mqtt_authz:%u"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
principal: {username: "^admin?"}
|
principal: {username: "^admin?"}
|
||||||
|
@ -77,7 +77,7 @@ authz:{
|
||||||
Create Example Table
|
Create Example Table
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
CREATE TABLE `mqtt_acl` (
|
CREATE TABLE `mqtt_authz` (
|
||||||
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
|
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
|
||||||
`ipaddress` VARCHAR(60) NOT NULL DEFAULT '',
|
`ipaddress` VARCHAR(60) NOT NULL DEFAULT '',
|
||||||
`username` VARCHAR(100) NOT NULL DEFAULT '',
|
`username` VARCHAR(100) NOT NULL DEFAULT '',
|
||||||
|
@ -93,7 +93,7 @@ Sample data in the default configuration:
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
-- Only 127.0.0.1 users can subscribe to system topics
|
-- Only 127.0.0.1 users can subscribe to system topics
|
||||||
INSERT INTO mqtt_acl (ipaddress, username, clientid, action, permission, topic) VALUES ('127.0.0.1', '', '', 'subscribe', 'allow', '$SYS/#');
|
INSERT INTO mqtt_authz (ipaddress, username, clientid, action, permission, topic) VALUES ('127.0.0.1', '', '', 'subscribe', 'allow', '$SYS/#');
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Pgsql
|
#### Pgsql
|
||||||
|
@ -104,7 +104,7 @@ Create Example Table
|
||||||
CREATE TYPE ACTION AS ENUM('publish','subscribe','all');
|
CREATE TYPE ACTION AS ENUM('publish','subscribe','all');
|
||||||
CREATE TYPE PERMISSION AS ENUM('allow','deny');
|
CREATE TYPE PERMISSION AS ENUM('allow','deny');
|
||||||
|
|
||||||
CREATE TABLE mqtt_acl (
|
CREATE TABLE mqtt_authz (
|
||||||
id SERIAL PRIMARY KEY,
|
id SERIAL PRIMARY KEY,
|
||||||
ipaddress CHARACTER VARYING(60) NOT NULL DEFAULT '',
|
ipaddress CHARACTER VARYING(60) NOT NULL DEFAULT '',
|
||||||
username CHARACTER VARYING(100) NOT NULL DEFAULT '',
|
username CHARACTER VARYING(100) NOT NULL DEFAULT '',
|
||||||
|
@ -120,7 +120,7 @@ Sample data in the default configuration:
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
-- Only 127.0.0.1 users can subscribe to system topics
|
-- Only 127.0.0.1 users can subscribe to system topics
|
||||||
INSERT INTO mqtt_acl (ipaddress, username, clientid, action, permission, topic) VALUES ('127.0.0.1', '', '', 'subscribe', 'allow', '$SYS/#');
|
INSERT INTO mqtt_authz (ipaddress, username, clientid, action, permission, topic) VALUES ('127.0.0.1', '', '', 'subscribe', 'allow', '$SYS/#');
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Redis
|
#### Redis
|
||||||
|
@ -128,7 +128,7 @@ INSERT INTO mqtt_acl (ipaddress, username, clientid, action, permission, topic)
|
||||||
Sample data in the default configuration:
|
Sample data in the default configuration:
|
||||||
|
|
||||||
```
|
```
|
||||||
HSET mqtt_acl:emqx '$SYS/#' subscribe
|
HSET mqtt_authz:emqx '$SYS/#' subscribe
|
||||||
```
|
```
|
||||||
|
|
||||||
A rule of Redis AuthZ defines `publish`, `subscribe`, or `all `information. All lists in the rule are **allow** lists.
|
A rule of Redis AuthZ defines `publish`, `subscribe`, or `all `information. All lists in the rule are **allow** lists.
|
||||||
|
|
|
@ -26,7 +26,7 @@ emqx_authz:{
|
||||||
# keyfile: "{{ platform_etc_dir }}/certs/client-key.pem"
|
# keyfile: "{{ platform_etc_dir }}/certs/client-key.pem"
|
||||||
# }
|
# }
|
||||||
# }
|
# }
|
||||||
# sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or clientid = '%c'"
|
# sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or clientid = '%c'"
|
||||||
# },
|
# },
|
||||||
# {
|
# {
|
||||||
# type: pgsql
|
# type: pgsql
|
||||||
|
@ -39,7 +39,7 @@ emqx_authz:{
|
||||||
# auto_reconnect: true
|
# auto_reconnect: true
|
||||||
# ssl: {enable: false}
|
# ssl: {enable: false}
|
||||||
# }
|
# }
|
||||||
# sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
|
# sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
|
||||||
# },
|
# },
|
||||||
# {
|
# {
|
||||||
# type: redis
|
# type: redis
|
||||||
|
@ -51,7 +51,7 @@ emqx_authz:{
|
||||||
# auto_reconnect: true
|
# auto_reconnect: true
|
||||||
# ssl: {enable: false}
|
# ssl: {enable: false}
|
||||||
# }
|
# }
|
||||||
# cmd: "HGETALL mqtt_acl:%u"
|
# cmd: "HGETALL mqtt_authz:%u"
|
||||||
# },
|
# },
|
||||||
# {
|
# {
|
||||||
# type: mongo
|
# type: mongo
|
||||||
|
@ -62,7 +62,7 @@ emqx_authz:{
|
||||||
# database: mqtt
|
# database: mqtt
|
||||||
# ssl: {enable: false}
|
# ssl: {enable: false}
|
||||||
# }
|
# }
|
||||||
# collection: mqtt_acl
|
# collection: mqtt_authz
|
||||||
# find: { "$or": [ { "username": "%u" }, { "clientid": "%c" } ] }
|
# find: { "$or": [ { "username": "%u" }, { "clientid": "%c" } ] }
|
||||||
# },
|
# },
|
||||||
{
|
{
|
||||||
|
|
|
@ -72,7 +72,7 @@ post_config_update(NewRules, _OldConf) ->
|
||||||
Action = find_action_in_hooks(),
|
Action = find_action_in_hooks(),
|
||||||
ok = emqx_hooks:del('client.authorize', Action),
|
ok = emqx_hooks:del('client.authorize', Action),
|
||||||
ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [InitedRules]}, -1),
|
ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [InitedRules]}, -1),
|
||||||
ok = emqx_acl_cache:drain_cache().
|
ok = emqx_authz_cache:drain_cache().
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
|
@ -178,7 +178,7 @@ b2l(B) when is_binary(B) -> binary_to_list(B).
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Check AuthZ
|
%% @doc Check AuthZ
|
||||||
-spec(authorize(emqx_types:clientinfo(), emqx_types:all(), emqx_topic:topic(), emqx_permission_rule:acl_result(), rules())
|
-spec(authorize(emqx_types:clientinfo(), emqx_types:all(), emqx_topic:topic(), allow | deny, rules())
|
||||||
-> {stop, allow} | {ok, deny}).
|
-> {stop, allow} | {ok, deny}).
|
||||||
authorize(#{username := Username,
|
authorize(#{username := Username,
|
||||||
peerhost := IpAddress
|
peerhost := IpAddress
|
||||||
|
|
|
@ -58,7 +58,7 @@ end_per_suite(_Config) ->
|
||||||
|
|
||||||
% set_special_configs(emqx) ->
|
% set_special_configs(emqx) ->
|
||||||
% application:set_env(emqx, allow_anonymous, true),
|
% application:set_env(emqx, allow_anonymous, true),
|
||||||
% application:set_env(emqx, enable_acl_cache, false),
|
% application:set_env(emqx, enable_authz_cache, false),
|
||||||
% ok;
|
% ok;
|
||||||
% set_special_configs(emqx_authz) ->
|
% set_special_configs(emqx_authz) ->
|
||||||
% emqx_config:put([emqx_authz], #{rules => []}),
|
% emqx_config:put([emqx_authz], #{rules => []}),
|
||||||
|
|
|
@ -46,7 +46,7 @@ init_per_suite(Config) ->
|
||||||
<<"auto_reconnect">> => true,
|
<<"auto_reconnect">> => true,
|
||||||
<<"ssl">> => #{<<"enable">> => false}
|
<<"ssl">> => #{<<"enable">> => false}
|
||||||
},
|
},
|
||||||
<<"cmd">> => <<"HGETALL mqtt_acl:%u">>,
|
<<"cmd">> => <<"HGETALL mqtt_authz:%u">>,
|
||||||
<<"type">> => <<"redis">> }],
|
<<"type">> => <<"redis">> }],
|
||||||
emqx_authz:update(replace, Rules),
|
emqx_authz:update(replace, Rules),
|
||||||
Config.
|
Config.
|
||||||
|
|
|
@ -26,7 +26,7 @@ GET | /nodes/:node/sessions/ | A list of sessions on a node
|
||||||
GET | /subscriptions/:clientid | A list of subscriptions of a client
|
GET | /subscriptions/:clientid | A list of subscriptions of a client
|
||||||
GET | /nodes/:node/subscriptions/:clientid | A list of subscriptions of a client on the node
|
GET | /nodes/:node/subscriptions/:clientid | A list of subscriptions of a client on the node
|
||||||
GET | /nodes/:node/subscriptions/ | A list of subscriptions on a node
|
GET | /nodes/:node/subscriptions/ | A list of subscriptions on a node
|
||||||
PUT | /clients/:clientid/clean_acl_cache | Clean ACL cache of a client
|
PUT | /clients/:clientid/clean_authz_cache | Clean Authorization cache of a client
|
||||||
GET | /configs/ | Get all configs
|
GET | /configs/ | Get all configs
|
||||||
GET | /nodes/:node/configs/ | Get all configs of a node
|
GET | /nodes/:node/configs/ | Get all configs of a node
|
||||||
GET | /nodes/:node/plugin_configs/:plugin | Get configurations of a plugin on the node
|
GET | /nodes/:node/plugin_configs/:plugin | Get configurations of a plugin on the node
|
||||||
|
|
|
@ -60,7 +60,7 @@ exproto.listener.protoname.idle_timeout = 30s
|
||||||
##
|
##
|
||||||
## See: https://github.com/emqtt/esockd#allowdeny
|
## See: https://github.com/emqtt/esockd#allowdeny
|
||||||
##
|
##
|
||||||
## Value: ACL Rule
|
## Value: Authorization Rule
|
||||||
##
|
##
|
||||||
## Example: "allow 192.168.0.0/24"
|
## Example: "allow 192.168.0.0/24"
|
||||||
exproto.listener.protoname.access.1 = "allow all"
|
exproto.listener.protoname.access.1 = "allow all"
|
||||||
|
|
|
@ -153,7 +153,7 @@ To subscribe any topic, issue following command:
|
||||||
- {username} and {password} are optional.
|
- {username} and {password} are optional.
|
||||||
- if {username} or {password} is incorrect, the error code `unauthorized` will be returned.
|
- if {username} or {password} is incorrect, the error code `unauthorized` will be returned.
|
||||||
- topic is subscribed with qos1.
|
- topic is subscribed with qos1.
|
||||||
- if the subscription failed due to ACL deny, the error code `forbidden` will be returned.
|
- if the subscription failed due to Authorization deny, the error code `forbidden` will be returned.
|
||||||
|
|
||||||
CoAP Client Unobserve Operation (unsubscribe topic)
|
CoAP Client Unobserve Operation (unsubscribe topic)
|
||||||
---------------------------------------------------
|
---------------------------------------------------
|
||||||
|
@ -196,7 +196,7 @@ Issue a coap put command to publish messages. For example:
|
||||||
- payload could be any binary data.
|
- payload could be any binary data.
|
||||||
- payload data type is "application/octet-stream".
|
- payload data type is "application/octet-stream".
|
||||||
- publish message will be sent with qos0.
|
- publish message will be sent with qos0.
|
||||||
- if the publishing failed due to ACL deny, the error code `forbidden` will be returned.
|
- if the publishing failed due to Authorization deny, the error code `forbidden` will be returned.
|
||||||
|
|
||||||
CoAP Client Keep Alive
|
CoAP Client Keep Alive
|
||||||
----------------------
|
----------------------
|
||||||
|
@ -230,7 +230,7 @@ ClientId, Username, Password and Topic
|
||||||
--------------------------------------
|
--------------------------------------
|
||||||
ClientId/username/password/topic in the coap URI are the concepts in mqtt. That is to say, emqx-coap is trying to fit coap message into mqtt system, by borrowing the client/username/password/topic from mqtt.
|
ClientId/username/password/topic in the coap URI are the concepts in mqtt. That is to say, emqx-coap is trying to fit coap message into mqtt system, by borrowing the client/username/password/topic from mqtt.
|
||||||
|
|
||||||
The Auth/ACL/Hook features in mqtt also applies on coap stuff. For example:
|
The Auth/Authorization/Hook features in mqtt also applies on coap stuff. For example:
|
||||||
- If username/password is not authorized, coap client will get an unauthorized error.
|
- If username/password is not authorized, coap client will get an unauthorized error.
|
||||||
- If username or clientid is not allowed to published specific topic, coap message will be dropped in fact, although coap client will get an acknoledgement from emqx-coap.
|
- If username or clientid is not allowed to published specific topic, coap message will be dropped in fact, although coap client will get an acknoledgement from emqx-coap.
|
||||||
- If a coap message is published, a 'message.publish' hook is able to capture this message as well.
|
- If a coap message is published, a 'message.publish' hook is able to capture this message as well.
|
||||||
|
|
|
@ -228,7 +228,7 @@ chann_subscribe(Topic, State = #state{clientid = ClientId}) ->
|
||||||
emqx_hooks:run('session.subscribed', [clientinfo(State), Topic, ?SUBOPTS]),
|
emqx_hooks:run('session.subscribed', [clientinfo(State), Topic, ?SUBOPTS]),
|
||||||
ok;
|
ok;
|
||||||
deny ->
|
deny ->
|
||||||
?LOG(warning, "subscribe to ~p by clientid ~p failed due to acl check.",
|
?LOG(warning, "subscribe to ~p by clientid ~p failed due to authz check.",
|
||||||
[Topic, ClientId]),
|
[Topic, ClientId]),
|
||||||
{error, forbidden}
|
{error, forbidden}
|
||||||
end.
|
end.
|
||||||
|
@ -248,7 +248,7 @@ chann_publish(Topic, Payload, State = #state{clientid = ClientId}) ->
|
||||||
emqx_message:make(ClientId, ?QOS_0, Topic, Payload))),
|
emqx_message:make(ClientId, ?QOS_0, Topic, Payload))),
|
||||||
ok;
|
ok;
|
||||||
deny ->
|
deny ->
|
||||||
?LOG(warning, "publish to ~p by clientid ~p failed due to acl check.",
|
?LOG(warning, "publish to ~p by clientid ~p failed due to authz check.",
|
||||||
[Topic, ClientId]),
|
[Topic, ClientId]),
|
||||||
{error, forbidden}
|
{error, forbidden}
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -68,7 +68,7 @@
|
||||||
% ?assert(false)
|
% ?assert(false)
|
||||||
% end.
|
% end.
|
||||||
|
|
||||||
% t_publish_acl_deny(_Config) ->
|
% t_publish_authz_deny(_Config) ->
|
||||||
% Topic = <<"abc">>, Payload = <<"123">>,
|
% Topic = <<"abc">>, Payload = <<"123">>,
|
||||||
% TopicStr = binary_to_list(Topic),
|
% TopicStr = binary_to_list(Topic),
|
||||||
% URI = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret",
|
% URI = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret",
|
||||||
|
@ -110,7 +110,7 @@
|
||||||
|
|
||||||
% [] = emqx:subscribers(Topic).
|
% [] = emqx:subscribers(Topic).
|
||||||
|
|
||||||
% t_observe_acl_deny(_Config) ->
|
% t_observe_authz_deny(_Config) ->
|
||||||
% Topic = <<"abc">>, TopicStr = binary_to_list(Topic),
|
% Topic = <<"abc">>, TopicStr = binary_to_list(Topic),
|
||||||
% Uri = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret",
|
% Uri = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret",
|
||||||
% ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history]),
|
% ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history]),
|
||||||
|
@ -265,7 +265,7 @@
|
||||||
% end.
|
% end.
|
||||||
|
|
||||||
% % mqtt connection kicked by coap with same client id
|
% % mqtt connection kicked by coap with same client id
|
||||||
% t_acl(_Config) ->
|
% t_authz(_Config) ->
|
||||||
% OldPath = emqx:get_env(plugins_etc_dir),
|
% OldPath = emqx:get_env(plugins_etc_dir),
|
||||||
% application:set_env(emqx, plugins_etc_dir,
|
% application:set_env(emqx, plugins_etc_dir,
|
||||||
% emqx_ct_helpers:deps_path(emqx_authz, "test")),
|
% emqx_ct_helpers:deps_path(emqx_authz, "test")),
|
||||||
|
|
|
@ -129,7 +129,7 @@
|
||||||
|
|
||||||
% {'on_client_authorize', Resp} = emqx_exhook_demo_svr:take(),
|
% {'on_client_authorize', Resp} = emqx_exhook_demo_svr:take(),
|
||||||
% Expected =
|
% Expected =
|
||||||
% #{result => aclresult_to_bool(Result),
|
% #{result => authzresult_to_bool(Result),
|
||||||
% type => pubsub_to_enum(PubSub),
|
% type => pubsub_to_enum(PubSub),
|
||||||
% topic => Topic,
|
% topic => Topic,
|
||||||
% clientinfo => from_clientinfo(ClientInfo)
|
% clientinfo => from_clientinfo(ClientInfo)
|
||||||
|
@ -425,7 +425,7 @@
|
||||||
% authresult_to_bool(AuthResult) ->
|
% authresult_to_bool(AuthResult) ->
|
||||||
% maps:get(auth_result, AuthResult, undefined) == success.
|
% maps:get(auth_result, AuthResult, undefined) == success.
|
||||||
|
|
||||||
% aclresult_to_bool(Result) ->
|
% authzresult_to_bool(Result) ->
|
||||||
% Result == allow.
|
% Result == allow.
|
||||||
|
|
||||||
% pubsub_to_enum(publish) -> 'PUBLISH';
|
% pubsub_to_enum(publish) -> 'PUBLISH';
|
||||||
|
@ -490,7 +490,7 @@
|
||||||
|
|
||||||
% set_special_cfgs(emqx) ->
|
% set_special_cfgs(emqx) ->
|
||||||
% application:set_env(emqx, allow_anonymous, false),
|
% application:set_env(emqx, allow_anonymous, false),
|
||||||
% application:set_env(emqx, enable_acl_cache, false),
|
% application:set_env(emqx, enable_authz_cache, false),
|
||||||
% application:set_env(emqx, modules_loaded_file, undefined),
|
% application:set_env(emqx, modules_loaded_file, undefined),
|
||||||
% application:set_env(emqx, plugins_loaded_file,
|
% application:set_env(emqx, plugins_loaded_file,
|
||||||
% emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"));
|
% emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"));
|
||||||
|
|
|
@ -40,7 +40,7 @@
|
||||||
|
|
||||||
% set_special_cfgs(emqx) ->
|
% set_special_cfgs(emqx) ->
|
||||||
% application:set_env(emqx, allow_anonymous, false),
|
% application:set_env(emqx, allow_anonymous, false),
|
||||||
% application:set_env(emqx, enable_acl_cache, false),
|
% application:set_env(emqx, enable_authz_cache, false),
|
||||||
% application:set_env(emqx, plugins_loaded_file, undefined),
|
% application:set_env(emqx, plugins_loaded_file, undefined),
|
||||||
% application:set_env(emqx, modules_loaded_file, undefined);
|
% application:set_env(emqx, modules_loaded_file, undefined);
|
||||||
% set_special_cfgs(emqx_exhook) ->
|
% set_special_cfgs(emqx_exhook) ->
|
||||||
|
|
|
@ -318,7 +318,7 @@ handle_call({subscribe, TopicFilter, Qos},
|
||||||
clientinfo = ClientInfo}) ->
|
clientinfo = ClientInfo}) ->
|
||||||
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, TopicFilter) of
|
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, TopicFilter) of
|
||||||
deny ->
|
deny ->
|
||||||
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
|
{reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel};
|
||||||
_ ->
|
_ ->
|
||||||
{ok, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
|
{ok, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
|
||||||
{reply, ok, NChannel}
|
{reply, ok, NChannel}
|
||||||
|
@ -338,7 +338,7 @@ handle_call({publish, Topic, Qos, Payload},
|
||||||
mountpoint := Mountpoint}}) ->
|
mountpoint := Mountpoint}}) ->
|
||||||
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of
|
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of
|
||||||
deny ->
|
deny ->
|
||||||
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
|
{reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel};
|
||||||
_ ->
|
_ ->
|
||||||
Msg = emqx_message:make(From, Qos, Topic, Payload),
|
Msg = emqx_message:make(From, Qos, Topic, Payload),
|
||||||
NMsg = emqx_mountpoint:mount(Mountpoint, Msg),
|
NMsg = emqx_mountpoint:mount(Mountpoint, Msg),
|
||||||
|
|
|
@ -80,7 +80,7 @@ LEGAL DISCLAIMER
|
||||||
<Type>Integer</Type>
|
<Type>Integer</Type>
|
||||||
<RangeEnumeration>1-65534</RangeEnumeration>
|
<RangeEnumeration>1-65534</RangeEnumeration>
|
||||||
<Units></Units>
|
<Units></Units>
|
||||||
<Description><![CDATA[Resources 0 and 1 point to the Object Instance for which the Instances of the ACL Resource of that Access Control Object Instance are applicable.]]></Description>
|
<Description><![CDATA[Resources 0 and 1 point to the Object Instance for which the Instances of the Authorization Resource of that Access Control Object Instance are applicable.]]></Description>
|
||||||
</Item>
|
</Item>
|
||||||
<Item ID="1">
|
<Item ID="1">
|
||||||
<Name>Object Instance ID</Name>
|
<Name>Object Instance ID</Name>
|
||||||
|
@ -93,7 +93,7 @@ LEGAL DISCLAIMER
|
||||||
<Description><![CDATA[See above]]></Description>
|
<Description><![CDATA[See above]]></Description>
|
||||||
</Item>
|
</Item>
|
||||||
<Item ID="2">
|
<Item ID="2">
|
||||||
<Name>ACL</Name>
|
<Name>Authorization</Name>
|
||||||
<Operations>RW</Operations>
|
<Operations>RW</Operations>
|
||||||
<MultipleInstances>Multiple</MultipleInstances>
|
<MultipleInstances>Multiple</MultipleInstances>
|
||||||
<Mandatory>Optional</Mandatory>
|
<Mandatory>Optional</Mandatory>
|
||||||
|
@ -101,7 +101,7 @@ LEGAL DISCLAIMER
|
||||||
<RangeEnumeration>16-bit</RangeEnumeration>
|
<RangeEnumeration>16-bit</RangeEnumeration>
|
||||||
<Units></Units>
|
<Units></Units>
|
||||||
<Description><![CDATA[The Resource Instance ID MUST be the Short Server ID of a certain LwM2M Server for which associated access rights are contained in the Resource Instance value.
|
<Description><![CDATA[The Resource Instance ID MUST be the Short Server ID of a certain LwM2M Server for which associated access rights are contained in the Resource Instance value.
|
||||||
The Resource Instance ID 0 is a specific ID, determining the ACL Instance which contains the default access rights.
|
The Resource Instance ID 0 is a specific ID, determining the Authorization Instance which contains the default access rights.
|
||||||
Each bit set in the Resource Instance value, grants an access right to the LwM2M Server to the corresponding operation.
|
Each bit set in the Resource Instance value, grants an access right to the LwM2M Server to the corresponding operation.
|
||||||
The bit order is specified as below.
|
The bit order is specified as below.
|
||||||
1st LSB: R(Read, Observe, Write-Attributes)
|
1st LSB: R(Read, Observe, Write-Attributes)
|
||||||
|
|
|
@ -488,7 +488,7 @@ handle_in(PubPkt = ?SN_PUBLISH_MSG(_Flags, TopicId0, MsgId, _Data), Channel) ->
|
||||||
[ fun check_qos3_enable/2
|
[ fun check_qos3_enable/2
|
||||||
, fun preproc_pub_pkt/2
|
, fun preproc_pub_pkt/2
|
||||||
, fun convert_topic_id_to_name/2
|
, fun convert_topic_id_to_name/2
|
||||||
, fun check_pub_acl/2
|
, fun check_pub_authz/2
|
||||||
, fun convert_pub_to_msg/2
|
, fun convert_pub_to_msg/2
|
||||||
], PubPkt, Channel) of
|
], PubPkt, Channel) of
|
||||||
{ok, Msg, NChannel} ->
|
{ok, Msg, NChannel} ->
|
||||||
|
@ -593,7 +593,7 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId),
|
||||||
handle_in(SubPkt = ?SN_SUBSCRIBE_MSG(_, MsgId, _), Channel) ->
|
handle_in(SubPkt = ?SN_SUBSCRIBE_MSG(_, MsgId, _), Channel) ->
|
||||||
case emqx_misc:pipeline(
|
case emqx_misc:pipeline(
|
||||||
[ fun preproc_subs_type/2
|
[ fun preproc_subs_type/2
|
||||||
, fun check_subscribe_acl/2
|
, fun check_subscribe_authz/2
|
||||||
, fun do_subscribe/2
|
, fun do_subscribe/2
|
||||||
], SubPkt, Channel) of
|
], SubPkt, Channel) of
|
||||||
{ok, {TopicId, GrantedQoS}, NChannel} ->
|
{ok, {TopicId, GrantedQoS}, NChannel} ->
|
||||||
|
@ -729,7 +729,7 @@ convert_topic_id_to_name({{id, TopicId}, Flags, Data},
|
||||||
{ok, {TopicName, Flags, Data}, Channel}
|
{ok, {TopicName, Flags, Data}, Channel}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_pub_acl({TopicName, _Flags, _Data},
|
check_pub_authz({TopicName, _Flags, _Data},
|
||||||
#channel{ctx = Ctx, clientinfo = ClientInfo}) ->
|
#channel{ctx = Ctx, clientinfo = ClientInfo}) ->
|
||||||
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, TopicName) of
|
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, TopicName) of
|
||||||
allow -> ok;
|
allow -> ok;
|
||||||
|
@ -834,7 +834,7 @@ preproc_subs_type(?SN_SUBSCRIBE_MSG_TYPE(_Reserved, _TopicId, _QoS),
|
||||||
_Channel) ->
|
_Channel) ->
|
||||||
{error, ?SN_RC_NOT_SUPPORTED}.
|
{error, ?SN_RC_NOT_SUPPORTED}.
|
||||||
|
|
||||||
check_subscribe_acl({_TopicId, TopicName, _QoS},
|
check_subscribe_authz({_TopicId, TopicName, _QoS},
|
||||||
Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
|
Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
|
||||||
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, TopicName) of
|
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, TopicName) of
|
||||||
allow ->
|
allow ->
|
||||||
|
@ -1097,8 +1097,8 @@ handle_call(discard, Channel) ->
|
||||||
% AllPendings = lists:append(Delivers, Pendings),
|
% AllPendings = lists:append(Delivers, Pendings),
|
||||||
% shutdown_and_reply(takeovered, AllPendings, Channel);
|
% shutdown_and_reply(takeovered, AllPendings, Channel);
|
||||||
|
|
||||||
%handle_call(list_acl_cache, Channel) ->
|
%handle_call(list_authz_cache, Channel) ->
|
||||||
% {reply, emqx_acl_cache:list_acl_cache(), Channel};
|
% {reply, emqx_authz_cache:list_authz_cache(), Channel};
|
||||||
|
|
||||||
%% XXX: No Quota Now
|
%% XXX: No Quota Now
|
||||||
% handle_call({quota, Policy}, Channel) ->
|
% handle_call({quota, Policy}, Channel) ->
|
||||||
|
@ -1162,8 +1162,8 @@ handle_info({sock_closed, Reason},
|
||||||
?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
|
?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_info(clean_acl_cache, Channel) ->
|
handle_info(clean_authz_cache, Channel) ->
|
||||||
ok = emqx_acl_cache:empty_acl_cache(),
|
ok = emqx_authz_cache:empty_authz_cache(),
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_info(Info, Channel) ->
|
handle_info(Info, Channel) ->
|
||||||
|
|
|
@ -359,7 +359,7 @@ handle_in(Frame = ?PACKET(?CMD_SEND, Headers),
|
||||||
Topic = header(<<"destination">>, Headers),
|
Topic = header(<<"destination">>, Headers),
|
||||||
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of
|
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of
|
||||||
deny ->
|
deny ->
|
||||||
handle_out(error, {receipt_id(Headers), "ACL Deny"}, Channel);
|
handle_out(error, {receipt_id(Headers), "Authorization Deny"}, Channel);
|
||||||
allow ->
|
allow ->
|
||||||
case header(<<"transaction">>, Headers) of
|
case header(<<"transaction">>, Headers) of
|
||||||
undefined ->
|
undefined ->
|
||||||
|
@ -393,7 +393,7 @@ handle_in(?PACKET(?CMD_SUBSCRIBE, Headers),
|
||||||
false ->
|
false ->
|
||||||
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of
|
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of
|
||||||
deny ->
|
deny ->
|
||||||
handle_out(error, {receipt_id(Headers), "ACL Deny"}, Channel);
|
handle_out(error, {receipt_id(Headers), "Authorization Deny"}, Channel);
|
||||||
allow ->
|
allow ->
|
||||||
_ = emqx_broker:subscribe(MountedTopic),
|
_ = emqx_broker:subscribe(MountedTopic),
|
||||||
maybe_outgoing_receipt(
|
maybe_outgoing_receipt(
|
||||||
|
@ -585,9 +585,9 @@ handle_call(discard, Channel) ->
|
||||||
% AllPendings = lists:append(Delivers, Pendings),
|
% AllPendings = lists:append(Delivers, Pendings),
|
||||||
% shutdown_and_reply(takeovered, AllPendings, Channel);
|
% shutdown_and_reply(takeovered, AllPendings, Channel);
|
||||||
|
|
||||||
handle_call(list_acl_cache, Channel) ->
|
handle_call(list_authz_cache, Channel) ->
|
||||||
%% This won't work
|
%% This won't work
|
||||||
{reply, emqx_acl_cache:list_acl_cache(default), Channel};
|
{reply, emqx_authz_cache:list_authz_cache(default), Channel};
|
||||||
|
|
||||||
%% XXX: No Quota Now
|
%% XXX: No Quota Now
|
||||||
% handle_call({quota, Policy}, Channel) ->
|
% handle_call({quota, Policy}, Channel) ->
|
||||||
|
@ -652,8 +652,8 @@ handle_info({sock_closed, Reason},
|
||||||
?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
|
?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_info(clean_acl_cache, Channel) ->
|
handle_info(clean_authz_cache, Channel) ->
|
||||||
ok = emqx_acl_cache:empty_acl_cache(),
|
ok = emqx_authz_cache:empty_authz_cache(),
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_info(Info, Channel) ->
|
handle_info(Info, Channel) ->
|
||||||
|
|
|
@ -350,7 +350,7 @@ t_ack(_) ->
|
||||||
body = _}, _, _} = parse(Data4)
|
body = _}, _, _} = parse(Data4)
|
||||||
end).
|
end).
|
||||||
|
|
||||||
%% TODO: Mountpoint, AuthChain, ACL + Mountpoint, ClientInfoOverride,
|
%% TODO: Mountpoint, AuthChain, Authorization + Mountpoint, ClientInfoOverride,
|
||||||
%% Listeners, Metrics, Stats, ClientInfo
|
%% Listeners, Metrics, Stats, ClientInfo
|
||||||
%%
|
%%
|
||||||
%% TODO: Start/Stop, List Instace
|
%% TODO: Start/Stop, List Instace
|
||||||
|
|
|
@ -42,11 +42,11 @@
|
||||||
-export([ lookup_client/2
|
-export([ lookup_client/2
|
||||||
, lookup_client/3
|
, lookup_client/3
|
||||||
, kickout_client/1
|
, kickout_client/1
|
||||||
, list_acl_cache/1
|
, list_authz_cache/1
|
||||||
, clean_acl_cache/1
|
, clean_authz_cache/1
|
||||||
, clean_acl_cache/2
|
, clean_authz_cache/2
|
||||||
, clean_acl_cache_all/0
|
, clean_authz_cache_all/0
|
||||||
, clean_acl_cache_all/1
|
, clean_authz_cache_all/1
|
||||||
, set_ratelimit_policy/2
|
, set_ratelimit_policy/2
|
||||||
, set_quota_policy/2
|
, set_quota_policy/2
|
||||||
]).
|
]).
|
||||||
|
@ -266,39 +266,39 @@ kickout_client(Node, ClientId) when Node =:= node() ->
|
||||||
kickout_client(Node, ClientId) ->
|
kickout_client(Node, ClientId) ->
|
||||||
rpc_call(Node, kickout_client, [Node, ClientId]).
|
rpc_call(Node, kickout_client, [Node, ClientId]).
|
||||||
|
|
||||||
list_acl_cache(ClientId) ->
|
list_authz_cache(ClientId) ->
|
||||||
call_client(ClientId, list_acl_cache).
|
call_client(ClientId, list_authz_cache).
|
||||||
|
|
||||||
clean_acl_cache(ClientId) ->
|
clean_authz_cache(ClientId) ->
|
||||||
Results = [clean_acl_cache(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
|
Results = [clean_authz_cache(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
|
||||||
case lists:any(fun(Item) -> Item =:= ok end, Results) of
|
case lists:any(fun(Item) -> Item =:= ok end, Results) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> lists:last(Results)
|
false -> lists:last(Results)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
clean_acl_cache(Node, ClientId) when Node =:= node() ->
|
clean_authz_cache(Node, ClientId) when Node =:= node() ->
|
||||||
case emqx_cm:lookup_channels(ClientId) of
|
case emqx_cm:lookup_channels(ClientId) of
|
||||||
[] ->
|
[] ->
|
||||||
{error, not_found};
|
{error, not_found};
|
||||||
Pids when is_list(Pids) ->
|
Pids when is_list(Pids) ->
|
||||||
erlang:send(lists:last(Pids), clean_acl_cache),
|
erlang:send(lists:last(Pids), clean_authz_cache),
|
||||||
ok
|
ok
|
||||||
end;
|
end;
|
||||||
clean_acl_cache(Node, ClientId) ->
|
clean_authz_cache(Node, ClientId) ->
|
||||||
rpc_call(Node, clean_acl_cache, [Node, ClientId]).
|
rpc_call(Node, clean_authz_cache, [Node, ClientId]).
|
||||||
|
|
||||||
clean_acl_cache_all() ->
|
clean_authz_cache_all() ->
|
||||||
Results = [{Node, clean_acl_cache_all(Node)} || Node <- ekka_mnesia:running_nodes()],
|
Results = [{Node, clean_authz_cache_all(Node)} || Node <- ekka_mnesia:running_nodes()],
|
||||||
case lists:filter(fun({_Node, Item}) -> Item =/= ok end, Results) of
|
case lists:filter(fun({_Node, Item}) -> Item =/= ok end, Results) of
|
||||||
[] -> ok;
|
[] -> ok;
|
||||||
BadNodes -> {error, BadNodes}
|
BadNodes -> {error, BadNodes}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
clean_acl_cache_all(Node) when Node =:= node() ->
|
clean_authz_cache_all(Node) when Node =:= node() ->
|
||||||
emqx_acl_cache:drain_cache();
|
emqx_authz_cache:drain_cache();
|
||||||
|
|
||||||
clean_acl_cache_all(Node) ->
|
clean_authz_cache_all(Node) ->
|
||||||
rpc_call(Node, clean_acl_cache_all, [Node]).
|
rpc_call(Node, clean_authz_cache_all, [Node]).
|
||||||
|
|
||||||
set_ratelimit_policy(ClientId, Policy) ->
|
set_ratelimit_policy(ClientId, Policy) ->
|
||||||
call_client(ClientId, {ratelimit, Policy}).
|
call_client(ClientId, {ratelimit, Policy}).
|
||||||
|
|
|
@ -14,34 +14,34 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_mgmt_api_acl).
|
-module(emqx_mgmt_api_authz).
|
||||||
|
|
||||||
-include("emqx_mgmt.hrl").
|
-include("emqx_mgmt.hrl").
|
||||||
|
|
||||||
-rest_api(#{name => clean_acl_cache_all,
|
-rest_api(#{name => clean_authz_cache_all,
|
||||||
method => 'DELETE',
|
method => 'DELETE',
|
||||||
path => "/acl-cache",
|
path => "/authz-cache",
|
||||||
func => clean_all,
|
func => clean_all,
|
||||||
descr => "Clean acl cache on all nodes"}).
|
descr => "Clean authz cache on all nodes"}).
|
||||||
|
|
||||||
-rest_api(#{name => clean_acl_cache_node,
|
-rest_api(#{name => clean_authz_cache_node,
|
||||||
method => 'DELETE',
|
method => 'DELETE',
|
||||||
path => "nodes/:atom:node/acl-cache",
|
path => "nodes/:atom:node/authz-cache",
|
||||||
func => clean_node,
|
func => clean_node,
|
||||||
descr => "Clean acl cache on specific node"}).
|
descr => "Clean authz cache on specific node"}).
|
||||||
|
|
||||||
-export([ clean_all/2
|
-export([ clean_all/2
|
||||||
, clean_node/2
|
, clean_node/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
clean_all(_Bindings, _Params) ->
|
clean_all(_Bindings, _Params) ->
|
||||||
case emqx_mgmt:clean_acl_cache_all() of
|
case emqx_mgmt:clean_authz_cache_all() of
|
||||||
ok -> emqx_mgmt:return();
|
ok -> emqx_mgmt:return();
|
||||||
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
|
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
clean_node(#{node := Node}, _Params) ->
|
clean_node(#{node := Node}, _Params) ->
|
||||||
case emqx_mgmt:clean_acl_cache_all(Node) of
|
case emqx_mgmt:clean_authz_cache_all(Node) of
|
||||||
ok -> emqx_mgmt:return();
|
ok -> emqx_mgmt:return();
|
||||||
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
|
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
|
||||||
end.
|
end.
|
|
@ -29,7 +29,7 @@
|
||||||
|
|
||||||
-export([ clients/2
|
-export([ clients/2
|
||||||
, client/2
|
, client/2
|
||||||
, acl_cache/2
|
, authz_cache/2
|
||||||
, subscribe/2
|
, subscribe/2
|
||||||
, subscribe_batch/2]).
|
, subscribe_batch/2]).
|
||||||
|
|
||||||
|
@ -67,7 +67,7 @@ api_spec() ->
|
||||||
apis() ->
|
apis() ->
|
||||||
[ clients_api()
|
[ clients_api()
|
||||||
, client_api()
|
, client_api()
|
||||||
, clients_acl_cache_api()
|
, clients_authz_cache_api()
|
||||||
, subscribe_api()].
|
, subscribe_api()].
|
||||||
|
|
||||||
schemas() ->
|
schemas() ->
|
||||||
|
@ -187,8 +187,8 @@ schemas() ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
AclCache = #{
|
AuthzCache = #{
|
||||||
acl_cache => #{
|
authz_cache => #{
|
||||||
type => object,
|
type => object,
|
||||||
properties => #{
|
properties => #{
|
||||||
topic => #{
|
topic => #{
|
||||||
|
@ -209,7 +209,7 @@ schemas() ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
[Client, AclCache].
|
[Client, AuthzCache].
|
||||||
|
|
||||||
clients_api() ->
|
clients_api() ->
|
||||||
Metadata = #{
|
Metadata = #{
|
||||||
|
@ -245,10 +245,10 @@ client_api() ->
|
||||||
<<"200">> => emqx_mgmt_util:response_schema(<<"List clients 200 OK">>, client)}}},
|
<<"200">> => emqx_mgmt_util:response_schema(<<"List clients 200 OK">>, client)}}},
|
||||||
{"/clients/:clientid", Metadata, client}.
|
{"/clients/:clientid", Metadata, client}.
|
||||||
|
|
||||||
clients_acl_cache_api() ->
|
clients_authz_cache_api() ->
|
||||||
Metadata = #{
|
Metadata = #{
|
||||||
get => #{
|
get => #{
|
||||||
description => <<"Get client acl cache">>,
|
description => <<"Get client authz cache">>,
|
||||||
parameters => [#{
|
parameters => [#{
|
||||||
name => clientid,
|
name => clientid,
|
||||||
in => path,
|
in => path,
|
||||||
|
@ -257,9 +257,9 @@ clients_acl_cache_api() ->
|
||||||
}],
|
}],
|
||||||
responses => #{
|
responses => #{
|
||||||
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
|
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
|
||||||
<<"200">> => emqx_mgmt_util:response_schema(<<"Get client acl cache">>, acl_cache)}},
|
<<"200">> => emqx_mgmt_util:response_schema(<<"Get client authz cache">>, <<"authz_cache">>)}},
|
||||||
delete => #{
|
delete => #{
|
||||||
description => <<"Clean client acl cache">>,
|
description => <<"Clean client authz cache">>,
|
||||||
parameters => [#{
|
parameters => [#{
|
||||||
name => clientid,
|
name => clientid,
|
||||||
in => path,
|
in => path,
|
||||||
|
@ -268,8 +268,8 @@ clients_acl_cache_api() ->
|
||||||
}],
|
}],
|
||||||
responses => #{
|
responses => #{
|
||||||
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
|
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
|
||||||
<<"200">> => emqx_mgmt_util:response_schema(<<"Delete clients acl cache OK">>)}}},
|
<<"200">> => emqx_mgmt_util:response_schema(<<"Delete clients 200 OK">>)}}},
|
||||||
{"/clients/:clientid/acl_cache", Metadata, acl_cache}.
|
{"/clients/:clientid/authz_cache", Metadata, authz_cache}.
|
||||||
|
|
||||||
subscribe_api() ->
|
subscribe_api() ->
|
||||||
Metadata = #{
|
Metadata = #{
|
||||||
|
@ -329,13 +329,13 @@ client(delete, Request) ->
|
||||||
ClientID = cowboy_req:binding(clientid, Request),
|
ClientID = cowboy_req:binding(clientid, Request),
|
||||||
kickout(#{clientid => ClientID}).
|
kickout(#{clientid => ClientID}).
|
||||||
|
|
||||||
acl_cache(get, Request) ->
|
authz_cache(get, Request) ->
|
||||||
ClientID = cowboy_req:binding(clientid, Request),
|
ClientID = cowboy_req:binding(clientid, Request),
|
||||||
get_acl_cache(#{clientid => ClientID});
|
get_authz_cache(#{clientid => ClientID});
|
||||||
|
|
||||||
acl_cache(delete, Request) ->
|
authz_cache(delete, Request) ->
|
||||||
ClientID = cowboy_req:binding(clientid, Request),
|
ClientID = cowboy_req:binding(clientid, Request),
|
||||||
clean_acl_cache(#{clientid => ClientID}).
|
clean_authz_cache(#{clientid => ClientID}).
|
||||||
|
|
||||||
subscribe(post, Request) ->
|
subscribe(post, Request) ->
|
||||||
ClientID = cowboy_req:binding(clientid, Request),
|
ClientID = cowboy_req:binding(clientid, Request),
|
||||||
|
@ -382,20 +382,20 @@ kickout(#{clientid := ClientID}) ->
|
||||||
emqx_mgmt:kickout_client(ClientID),
|
emqx_mgmt:kickout_client(ClientID),
|
||||||
{200}.
|
{200}.
|
||||||
|
|
||||||
get_acl_cache(#{clientid := ClientID})->
|
get_authz_cache(#{clientid := ClientID})->
|
||||||
case emqx_mgmt:list_acl_cache(ClientID) of
|
case emqx_mgmt:list_authz_cache(ClientID) of
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
{404, ?CLIENT_ID_NOT_FOUND};
|
{404, ?CLIENT_ID_NOT_FOUND};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
Message = list_to_binary(io_lib:format("~p", [Reason])),
|
Message = list_to_binary(io_lib:format("~p", [Reason])),
|
||||||
{500, #{code => <<"UNKNOW_ERROR">>, message => Message}};
|
{500, #{code => <<"UNKNOW_ERROR">>, message => Message}};
|
||||||
Caches ->
|
Caches ->
|
||||||
Response = [format_acl_cache(Cache) || Cache <- Caches],
|
Response = [format_authz_cache(Cache) || Cache <- Caches],
|
||||||
{200, Response}
|
{200, Response}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
clean_acl_cache(#{clientid := ClientID}) ->
|
clean_authz_cache(#{clientid := ClientID}) ->
|
||||||
case emqx_mgmt:clean_acl_cache(ClientID) of
|
case emqx_mgmt:clean_authz_cache(ClientID) of
|
||||||
ok ->
|
ok ->
|
||||||
{200};
|
{200};
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
|
@ -483,11 +483,11 @@ peer_to_binary({Addr, Port}) ->
|
||||||
peer_to_binary(Addr) ->
|
peer_to_binary(Addr) ->
|
||||||
list_to_binary(inet:ntoa(Addr)).
|
list_to_binary(inet:ntoa(Addr)).
|
||||||
|
|
||||||
format_acl_cache({{PubSub, Topic}, {AclResult, Timestamp}}) ->
|
format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) ->
|
||||||
#{
|
#{
|
||||||
access => PubSub,
|
access => PubSub,
|
||||||
topic => Topic,
|
topic => Topic,
|
||||||
result => AclResult,
|
result => AuthzResult,
|
||||||
updated_time => Timestamp
|
updated_time => Timestamp
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -60,9 +60,9 @@ metrics_schema() ->
|
||||||
'client.disconnected' => #{
|
'client.disconnected' => #{
|
||||||
type => integer,
|
type => integer,
|
||||||
description => <<"Number of client disconnects">>},
|
description => <<"Number of client disconnects">>},
|
||||||
'client.check_acl' => #{
|
'client.check_authz' => #{
|
||||||
type => integer,
|
type => integer,
|
||||||
description => <<"Number of ACL rule checks">>},
|
description => <<"Number of Authorization rule checks">>},
|
||||||
'client.subscribe' => #{
|
'client.subscribe' => #{
|
||||||
type => integer,
|
type => integer,
|
||||||
description => <<"Number of client subscriptions">>},
|
description => <<"Number of client subscriptions">>},
|
||||||
|
@ -167,7 +167,7 @@ metrics_schema() ->
|
||||||
description => <<"Number of received PUBLISH packet with occupied identifiers">>},
|
description => <<"Number of received PUBLISH packet with occupied identifiers">>},
|
||||||
'packets.publish.auth_error' => #{
|
'packets.publish.auth_error' => #{
|
||||||
type => integer,
|
type => integer,
|
||||||
description => <<"Number of received PUBLISH packets with failed the ACL check">>},
|
description => <<"Number of received PUBLISH packets with failed the Authorization check">>},
|
||||||
'packets.publish.error' => #{
|
'packets.publish.error' => #{
|
||||||
type => integer,
|
type => integer,
|
||||||
description => <<"Number of received PUBLISH packet that cannot be published">>},
|
description => <<"Number of received PUBLISH packet that cannot be published">>},
|
||||||
|
@ -227,7 +227,7 @@ metrics_schema() ->
|
||||||
description => <<"Number of received SUBSCRIBE packet with failed subscriptions">>},
|
description => <<"Number of received SUBSCRIBE packet with failed subscriptions">>},
|
||||||
'packets.subscribe.auth_error' => #{
|
'packets.subscribe.auth_error' => #{
|
||||||
type => integer,
|
type => integer,
|
||||||
description => <<"Number of received SUBACK packet with failed ACL check">>},
|
description => <<"Number of received SUBACK packet with failed Authorization check">>},
|
||||||
'packets.suback.sent' => #{
|
'packets.suback.sent' => #{
|
||||||
type => integer,
|
type => integer,
|
||||||
description => <<"Number of sent SUBACK packet">>},
|
description => <<"Number of sent SUBACK packet">>},
|
||||||
|
|
|
@ -35,7 +35,7 @@
|
||||||
, list_apps/0
|
, list_apps/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% APP Auth/ACL API
|
%% APP Auth/Authorization API
|
||||||
-export([is_authorized/2]).
|
-export([is_authorized/2]).
|
||||||
|
|
||||||
-define(APP, emqx_management).
|
-define(APP, emqx_management).
|
||||||
|
|
|
@ -38,7 +38,7 @@
|
||||||
, trace/1
|
, trace/1
|
||||||
, log/1
|
, log/1
|
||||||
, mgmt/1
|
, mgmt/1
|
||||||
, acl/1
|
, authz/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(PROC_INFOKEYS, [status,
|
-define(PROC_INFOKEYS, [status,
|
||||||
|
@ -515,31 +515,31 @@ listeners(_) ->
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @doc acl Command
|
%% @doc authz Command
|
||||||
|
|
||||||
acl(["cache-clean", "node", Node]) ->
|
authz(["cache-clean", "node", Node]) ->
|
||||||
case emqx_mgmt:clean_acl_cache_all(erlang:list_to_existing_atom(Node)) of
|
case emqx_mgmt:clean_authz_cache_all(erlang:list_to_existing_atom(Node)) of
|
||||||
ok ->
|
ok ->
|
||||||
emqx_ctl:print("ACL cache drain started on node ~s.~n", [Node]);
|
emqx_ctl:print("Authorization cache drain started on node ~s.~n", [Node]);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
emqx_ctl:print("ACL drain failed on node ~s: ~0p.~n", [Node, Reason])
|
emqx_ctl:print("Authorization drain failed on node ~s: ~0p.~n", [Node, Reason])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
acl(["cache-clean", "all"]) ->
|
authz(["cache-clean", "all"]) ->
|
||||||
case emqx_mgmt:clean_acl_cache_all() of
|
case emqx_mgmt:clean_authz_cache_all() of
|
||||||
ok ->
|
ok ->
|
||||||
emqx_ctl:print("Started ACL cache drain in all nodes~n");
|
emqx_ctl:print("Started Authorization cache drain in all nodes~n");
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
emqx_ctl:print("ACL cache-clean failed: ~p.~n", [Reason])
|
emqx_ctl:print("Authorization cache-clean failed: ~p.~n", [Reason])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
acl(["cache-clean", ClientId]) ->
|
authz(["cache-clean", ClientId]) ->
|
||||||
emqx_mgmt:clean_acl_cache(ClientId);
|
emqx_mgmt:clean_authz_cache(ClientId);
|
||||||
|
|
||||||
acl(_) ->
|
authz(_) ->
|
||||||
emqx_ctl:usage([{"acl cache-clean all", "Clears acl cache on all nodes"},
|
emqx_ctl:usage([{"authz cache-clean all", "Clears authorization cache on all nodes"},
|
||||||
{"acl cache-clean node <Node>", "Clears acl cache on given node"},
|
{"authz cache-clean node <Node>", "Clears authorization cache on given node"},
|
||||||
{"acl cache-clean <ClientId>", "Clears acl cache for given client"}
|
{"authz cache-clean <ClientId>", "Clears authorization cache for given client"}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -87,10 +87,10 @@ t_clients(_) ->
|
||||||
AfterKickoutResponse = emqx_mgmt_api_test_util:request_api(get, Client2Path),
|
AfterKickoutResponse = emqx_mgmt_api_test_util:request_api(get, Client2Path),
|
||||||
?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse),
|
?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse),
|
||||||
|
|
||||||
%% get /clients/:clientid/acl_cache should has no acl cache
|
%% get /clients/:clientid/authz_cache should has no authz cache
|
||||||
Client1AclCachePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "acl_cache"]),
|
Client1AuthzCachePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "authz_cache"]),
|
||||||
{ok, Client1AclCache} = emqx_mgmt_api_test_util:request_api(get, Client1AclCachePath),
|
{ok, Client1AuthzCache} = emqx_mgmt_api_test_util:request_api(get, Client1AuthzCachePath),
|
||||||
?assertEqual("[]", Client1AclCache),
|
?assertEqual("[]", Client1AuthzCache),
|
||||||
|
|
||||||
%% post /clients/:clientid/subscribe
|
%% post /clients/:clientid/subscribe
|
||||||
SubscribeBody = #{topic => Topic, qos => Qos},
|
SubscribeBody = #{topic => Topic, qos => Qos},
|
||||||
|
|
Loading…
Reference in New Issue