401 lines
12 KiB
Erlang
401 lines
12 KiB
Erlang
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_authz_SUITE).
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-include("emqx_authz.hrl").
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("emqx/include/emqx_placeholder.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
%% Common Test callback: the list of test cases for this suite, as computed
%% by emqx_common_test_helpers:all/1 for this module.
all() ->
    Suite = ?MODULE,
    emqx_common_test_helpers:all(Suite).
|
|
|
|
%% Common Test callback: this suite declares no test-case groups.
groups() ->
    NoGroups = [],
    NoGroups.
|
|
|
|
%% Suite setup.
%%
%% Mocks emqx_resource so that create_local/4 and remove_local/1 succeed
%% without doing real work, installs an expectation for
%% emqx_authz:acl_conf_file/0 that resolves "etc/acl.conf" through
%% emqx_common_test_helpers:deps_path/2, then starts emqx_conf and emqx_authz
%% with set_special_configs/1 as the per-application config hook.
%% Returns Config unchanged.
init_per_suite(Config) ->
    CreateLocalStub = fun(_, _, _, _) -> {ok, meck_data} end,
    RemoveLocalStub = fun(_) -> ok end,
    AclConfFileStub = fun() ->
        emqx_common_test_helpers:deps_path(emqx_authz, "etc/acl.conf")
    end,

    MeckOpts = [non_strict, passthrough, no_history, no_link],
    meck:new(emqx_resource, MeckOpts),
    meck:expect(emqx_resource, create_local, CreateLocalStub),
    meck:expect(emqx_resource, remove_local, RemoveLocalStub),
    %% NOTE(review): no meck:new(emqx_authz, ...) is visible in this suite --
    %% confirm that emqx_authz is already mocked when this expectation is set.
    meck:expect(emqx_authz, acl_conf_file, AclConfFileStub),

    AppsToStart = [emqx_conf, emqx_authz],
    ok = emqx_common_test_helpers:start_apps(AppsToStart, fun set_special_configs/1),
    Config.
|
|
|
|
%% Suite teardown.
%%
%% Writes an authorization config with no_match = allow, the cache enabled and
%% an empty source list, stops emqx_authz and emqx_conf, and unloads the
%% emqx_resource mock.
end_per_suite(_Config) ->
    FinalAuthzConf = #{
        <<"sources">> => [],
        <<"no_match">> => <<"allow">>,
        <<"cache">> => #{<<"enable">> => <<"true">>}
    },
    {ok, _} = emqx:update_config([authorization], FinalAuthzConf),
    emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
    meck:unload(emqx_resource),
    ok.
|
|
|
|
%% Per-test setup.
%%
%% Every test case starts with an empty source list. The two
%% "deny => disconnect" last-will test cases additionally switch
%% authorization.deny_action to `disconnect'.
init_per_testcase(TestCase, Config) ->
    DenyDisconnectCases = [
        t_subscribe_deny_disconnect_publishes_last_will_testament,
        t_publish_deny_disconnect_publishes_last_will_testament
    ],
    {ok, _} = emqx_authz:update(?CMD_REPLACE, []),
    case lists:member(TestCase, DenyDisconnectCases) of
        true ->
            {ok, _} = emqx:update_config([authorization, deny_action], disconnect);
        false ->
            ok
    end,
    Config.
|
|
|
|
%% Per-test teardown.
%%
%% For the two "deny => disconnect" last-will test cases, sets
%% authorization.deny_action to `ignore'; other test cases need no cleanup.
end_per_testcase(TestCase, _Config) ->
    DenyDisconnectCases = [
        t_subscribe_deny_disconnect_publishes_last_will_testament,
        t_publish_deny_disconnect_publishes_last_will_testament
    ],
    case lists:member(TestCase, DenyDisconnectCases) of
        true ->
            {ok, _} = emqx:update_config([authorization, deny_action], ignore),
            ok;
        false ->
            ok
    end.
|
|
|
|
%% Per-application config hook handed to start_apps/2 in init_per_suite/1.
%%
%% For emqx_authz: disables the authorization cache, sets no_match to `deny'
%% and clears the source list (applied in that order). Any other application
%% is left alone. Always returns ok.
set_special_configs(emqx_authz) ->
    Overrides = [
        {[authorization, cache, enable], false},
        {[authorization, no_match], deny},
        {[authorization, sources], []}
    ],
    lists:foreach(
        fun({Path, Value}) ->
            {ok, _} = emqx:update_config(Path, Value)
        end,
        Overrides
    );
set_special_configs(_App) ->
    ok.
|
|
|
|
%% Raw config of an enabled `http' authorization source.
-define(SOURCE1, #{
    <<"enable">> => true,
    <<"type">> => <<"http">>,
    %% request
    <<"method">> => <<"get">>,
    <<"url">> => <<"https://example.com:443/a/b?c=d">>,
    <<"headers">> => #{},
    <<"request_timeout">> => 5000,
    %% transport
    <<"ssl">> => #{<<"enable">> => true}
}).
|
|
%% Raw config of an enabled `mongodb' authorization source.
-define(SOURCE2, #{
    <<"enable">> => true,
    <<"type">> => <<"mongodb">>,
    %% connection
    <<"mongo_type">> => <<"single">>,
    <<"server">> => <<"127.0.0.1:27017">>,
    <<"pool_size">> => 1,
    <<"w_mode">> => <<"unsafe">>,
    <<"ssl">> => #{<<"enable">> => false},
    %% lookup
    <<"database">> => <<"mqtt">>,
    <<"collection">> => <<"authz">>,
    <<"filter">> => #{<<"a">> => <<"b">>}
}).
|
|
%% Raw config of an enabled `mysql' authorization source.
-define(SOURCE3, #{
    <<"enable">> => true,
    <<"type">> => <<"mysql">>,
    %% connection
    <<"server">> => <<"127.0.0.1:27017">>,
    <<"pool_size">> => 1,
    <<"auto_reconnect">> => true,
    <<"ssl">> => #{<<"enable">> => false},
    %% credentials
    <<"username">> => <<"xx">>,
    <<"password">> => <<"ee">>,
    %% lookup
    <<"database">> => <<"mqtt">>,
    <<"query">> => <<"abcb">>
}).
|
|
%% Raw config of an enabled `postgresql' authorization source.
-define(SOURCE4, #{
    <<"enable">> => true,
    <<"type">> => <<"postgresql">>,
    %% connection
    <<"server">> => <<"127.0.0.1:27017">>,
    <<"pool_size">> => 1,
    <<"auto_reconnect">> => true,
    <<"ssl">> => #{<<"enable">> => false},
    %% credentials
    <<"username">> => <<"xx">>,
    <<"password">> => <<"ee">>,
    %% lookup
    <<"database">> => <<"mqtt">>,
    <<"query">> => <<"abcb">>
}).
|
|
%% Raw config of an enabled `redis' authorization source. The command is the
%% literal prefix "HGETALL mqtt_authz:" followed by the ?PH_USERNAME
%% placeholder binary.
-define(SOURCE5, #{
    <<"enable">> => true,
    <<"type">> => <<"redis">>,
    %% connection
    <<"redis_type">> => <<"single">>,
    <<"server">> => <<"127.0.0.1:27017">>,
    <<"pool_size">> => 1,
    <<"auto_reconnect">> => true,
    <<"ssl">> => #{<<"enable">> => false},
    %% credentials
    <<"password">> => <<"ee">>,
    %% lookup
    <<"database">> => 0,
    <<"cmd">> => <<"HGETALL mqtt_authz:", ?PH_USERNAME/binary>>
}).
|
|
%% Raw config of an enabled `file' authorization source carrying two inline
%% allow rules (adjacent string literals are concatenated by the compiler).
-define(SOURCE6, #{
    <<"enable">> => true,
    <<"type">> => <<"file">>,
    <<"rules">> =>
        <<
            "{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}."
            "\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}."
        >>
}).
|
|
%% Raw config of an enabled `file' authorization source used by the last-will
%% tests: one allow rule for user "some_client" publishing to
%% "some_client/lwt", followed by a deny-all rule.
-define(SOURCE7, #{
    <<"enable">> => true,
    <<"type">> => <<"file">>,
    <<"rules">> =>
        <<
            "{allow,{username,\"some_client\"},publish,[\"some_client/lwt\"]}.\n"
            "{deny, all}."
        >>
}).
|
|
|
|
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
|
|
|
|
%% Exercises replace / prepend / append of sources and per-type replacement.
%%
%% Builds the list http, mongodb, mysql, postgresql, redis, file by replacing
%% with mysql and then prepending/appending around it, checks the stored
%% config, then replaces every source by type first with enable = true and
%% afterwards with enable = false, and checks that all are reported disabled.
t_update_source(_) ->
    SourcesPath = [authorization, sources],

    %% replace all
    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE3]),
    lists:foreach(
        fun({Cmd, Source}) ->
            {ok, _} = emqx_authz:update(Cmd, Source)
        end,
        [
            {?CMD_PREPEND, ?SOURCE2},
            {?CMD_PREPEND, ?SOURCE1},
            {?CMD_APPEND, ?SOURCE4},
            {?CMD_APPEND, ?SOURCE5},
            {?CMD_APPEND, ?SOURCE6}
        ]
    ),

    ?assertMatch(
        [
            #{type := http, enable := true},
            #{type := mongodb, enable := true},
            #{type := mysql, enable := true},
            #{type := postgresql, enable := true},
            #{type := redis, enable := true},
            #{type := file, enable := true}
        ],
        emqx_conf:get(SourcesPath, [])
    ),

    SourcesByType = [
        {http, ?SOURCE1},
        {mongodb, ?SOURCE2},
        {mysql, ?SOURCE3},
        {postgresql, ?SOURCE4},
        {redis, ?SOURCE5},
        {file, ?SOURCE6}
    ],
    %% Replace each source in place (by type) with the given enable flag.
    ReplaceAllWith = fun(Enable) ->
        lists:foreach(
            fun({Type, Source}) ->
                {ok, _} = emqx_authz:update(
                    {?CMD_REPLACE, Type},
                    Source#{<<"enable">> := Enable}
                )
            end,
            SourcesByType
        )
    end,
    ReplaceAllWith(true),
    ReplaceAllWith(false),

    ?assertMatch(
        [
            #{type := http, enable := false},
            #{type := mongodb, enable := false},
            #{type := mysql, enable := false},
            #{type := postgresql, enable := false},
            #{type := redis, enable := false},
            #{type := file, enable := false}
        ],
        emqx_conf:get(SourcesPath, [])
    ),

    {ok, _} = emqx_authz:update(?CMD_REPLACE, []).
|
|
|
|
%% Installs a single http source, checks it is stored as enabled, deletes it
%% by type and checks that the stored source list is empty afterwards.
t_delete_source(_) ->
    SourcesPath = [authorization, sources],

    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE1]),
    ?assertMatch(
        [#{type := http, enable := true}],
        emqx_conf:get(SourcesPath, [])
    ),

    {ok, _} = emqx_authz:update({?CMD_DELETE, http}, #{}),
    ?assertMatch([], emqx_conf:get(SourcesPath, [])).
|
|
|
|
%% Exercises emqx_authz:move/2 with the front, rear, before and after
%% positions, checking the order reported by emqx_authz:lookup/0 after every
%% step.
t_move_source(_) ->
    %% Types of the currently installed sources, in lookup order. Crashes if
    %% an entry has no `type' key, so a malformed entry still fails the test.
    CurrentOrder = fun() ->
        lists:map(fun(#{type := Type}) -> Type end, emqx_authz:lookup())
    end,

    AllSources = [?SOURCE1, ?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6],
    {ok, _} = emqx_authz:update(?CMD_REPLACE, AllSources),
    ?assertEqual(
        [http, mongodb, mysql, postgresql, redis, file],
        CurrentOrder()
    ),

    {ok, _} = emqx_authz:move(postgresql, ?CMD_MOVE_FRONT),
    ?assertEqual(
        [postgresql, http, mongodb, mysql, redis, file],
        CurrentOrder()
    ),

    {ok, _} = emqx_authz:move(http, ?CMD_MOVE_REAR),
    ?assertEqual(
        [postgresql, mongodb, mysql, redis, file, http],
        CurrentOrder()
    ),

    {ok, _} = emqx_authz:move(mysql, ?CMD_MOVE_BEFORE(postgresql)),
    ?assertEqual(
        [mysql, postgresql, mongodb, redis, file, http],
        CurrentOrder()
    ),

    {ok, _} = emqx_authz:move(mongodb, ?CMD_MOVE_AFTER(http)),
    ?assertEqual(
        [mysql, postgresql, redis, file, http, mongodb],
        CurrentOrder()
    ),

    ok.
|
|
|
|
%% With the empty source list installed by init_per_testcase/2,
%% get_enabled_authzs/0 must report nothing.
t_get_enabled_authzs_none_enabled(_Config) ->
    Enabled = emqx_authz:get_enabled_authzs(),
    ?assertEqual([], Enabled).
|
|
|
|
%% After installing only the (enabled) postgresql source,
%% get_enabled_authzs/0 must report exactly [postgresql].
t_get_enabled_authzs_some_enabled(_Config) ->
    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE4]),
    Enabled = emqx_authz:get_enabled_authzs(),
    ?assertEqual([postgresql], Enabled).
|
|
|
|
%% With deny_action = disconnect (set in init_per_testcase/2) and the SOURCE7
%% rules installed, a subscribe to a topic the rules do not allow must make
%% the emqtt:subscribe/2 call exit with {shutdown, tcp_closed}, and the
%% client's last will must then be delivered to this process.
t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) ->
    WillTopic = <<"some_client/lwt">>,
    WillPayload = <<"should be published">>,

    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE7]),
    {ok, Client} = emqtt:start_link([
        {username, <<"some_client">>},
        {will_topic, WillTopic},
        {will_payload, WillPayload}
    ]),
    {ok, _} = emqtt:connect(Client),
    %% This process listens on the will topic itself.
    ok = emqx:subscribe(WillTopic),
    %% The client is linked to this process; trap exits so its termination
    %% does not take the test process down with it.
    process_flag(trap_exit, true),

    try emqtt:subscribe(Client, <<"unauthorized">>) of
        _ ->
            error(should_have_disconnected)
    catch
        exit:{{shutdown, tcp_closed}, _} ->
            ok
    end,

    receive
        {deliver, WillTopic, #message{payload = WillPayload}} ->
            ok
    after 2_000 ->
        error(lwt_not_published)
    end,

    ok.
|
|
|
|
%% With deny_action = disconnect (set in init_per_testcase/2) and the SOURCE7
%% rules installed, a publish to a topic the rules do not allow must terminate
%% the client process within one second, and the client's last will must then
%% be delivered to this process.
t_publish_deny_disconnect_publishes_last_will_testament(_Config) ->
    WillTopic = <<"some_client/lwt">>,
    WillPayload = <<"should be published">>,

    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE7]),
    {ok, Client} = emqtt:start_link([
        {username, <<"some_client">>},
        {will_topic, WillTopic},
        {will_payload, WillPayload}
    ]),
    {ok, _} = emqtt:connect(Client),
    %% This process listens on the will topic itself.
    ok = emqx:subscribe(WillTopic),
    %% The client is linked to this process; trap exits so its termination
    %% does not take the test process down with it.
    process_flag(trap_exit, true),

    %% disconnect is async, so watch the client process instead of relying on
    %% the return value of publish
    MonitorRef = monitor(process, Client),
    emqtt:publish(Client, <<"some/topic">>, <<"unauthorized">>),
    receive
        {'DOWN', MonitorRef, process, Client, _} ->
            ok
    after 1_000 ->
        error(client_should_have_been_disconnected)
    end,

    receive
        {deliver, WillTopic, #message{payload = WillPayload}} ->
            ok
    after 2_000 ->
        error(lwt_not_published)
    end,

    ok.
|
|
|
|
%% A client whose will topic is "$SYS/lwt" is killed while a snabbkaffe trace
%% is running. The test waits (up to one second) for the
%% `last_will_testament_publish_denied' trace event and then checks that no
%% will message reaches this process within a further second.
t_publish_last_will_testament_denied_topic(_Config) ->
    WillTopic = <<"$SYS/lwt">>,
    WillPayload = <<"should not be published">>,

    {ok, Client} = emqtt:start_link([
        {will_topic, WillTopic},
        {will_payload, WillPayload}
    ]),
    {ok, _} = emqtt:connect(Client),
    ok = emqx:subscribe(WillTopic),
    %% Drop the link before killing the client so the kill does not propagate
    %% to the test process.
    unlink(Client),

    ok = snabbkaffe:start_trace(),
    {true, {ok, _}} = ?wait_async_action(
        exit(Client, kill),
        #{?snk_kind := last_will_testament_publish_denied},
        1_000
    ),
    ok = snabbkaffe:stop(),

    receive
        {deliver, WillTopic, #message{payload = WillPayload}} ->
            error(lwt_should_not_be_published_to_forbidden_topic)
    after 1_000 ->
        ok
    end,

    ok.
|
|
|
|
%% Calls application:stop/1 on every application in Apps, in list order.
%% The individual results are discarded; always returns ok.
stop_apps(Apps) ->
    StopOne = fun(App) -> application:stop(App) end,
    lists:foreach(StopOne, Apps).
|