%%-------------------------------------------------------------------- %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_authz_SUITE). -compile(nowarn_export_all). -compile(export_all). -include("emqx_authz.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx_placeholder.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). all() -> emqx_common_test_helpers:all(?MODULE). groups() -> []. init_per_suite(Config) -> Apps = emqx_cth_suite:start( [ emqx, {emqx_conf, "authorization { cache { enable = false }, no_match = deny, sources = [] }"}, emqx_auth ], #{ work_dir => filename:join(?config(priv_dir, Config), ?MODULE) } ), ok = emqx_authz_test_lib:register_fake_sources([http, redis, mongodb, mysql, postgresql]), [{suite_apps, Apps} | Config]. end_per_suite(Config) -> {ok, _} = emqx:update_config( [authorization], #{ <<"no_match">> => <<"allow">>, <<"cache">> => #{<<"enable">> => <<"true">>}, <<"sources">> => [] } ), ok = emqx_authz_test_lib:deregister_sources(), emqx_cth_suite:stop(?config(suite_apps, Config)), ok. init_per_testcase(TestCase, Config) when TestCase =:= t_subscribe_deny_disconnect_publishes_last_will_testament; TestCase =:= t_publish_last_will_testament_banned_client_connecting; TestCase =:= t_publish_deny_disconnect_publishes_last_will_testament -> {ok, _} = emqx_authz:update(?CMD_REPLACE, []), {ok, _} = emqx:update_config([authorization, deny_action], disconnect), Config; init_per_testcase(_TestCase, Config) -> _ = file:delete(emqx_authz_file:acl_conf_file()), {ok, _} = emqx_authz:update(?CMD_REPLACE, []), Config. end_per_testcase(TestCase, _Config) when TestCase =:= t_subscribe_deny_disconnect_publishes_last_will_testament; TestCase =:= t_publish_last_will_testament_banned_client_connecting; TestCase =:= t_publish_deny_disconnect_publishes_last_will_testament -> {ok, _} = emqx:update_config([authorization, deny_action], ignore), {ok, _} = emqx_authz:update(?CMD_REPLACE, []), emqx_common_test_helpers:call_janitor(), ok; end_per_testcase(_TestCase, _Config) -> emqx_common_test_helpers:call_janitor(), ok. -define(SOURCE_HTTP, #{ <<"type">> => <<"http">>, <<"enable">> => true, <<"url">> => <<"https://example.com:443/a/b?c=d">>, <<"headers">> => #{}, <<"ssl">> => #{<<"enable">> => true}, <<"method">> => <<"get">>, <<"request_timeout">> => <<"5s">> }). -define(SOURCE_MONGODB, #{ <<"type">> => <<"mongodb">>, <<"enable">> => true, <<"mongo_type">> => <<"single">>, <<"server">> => <<"127.0.0.1:27017">>, <<"w_mode">> => <<"unsafe">>, <<"pool_size">> => 1, <<"database">> => <<"mqtt">>, <<"ssl">> => #{<<"enable">> => false}, <<"collection">> => <<"authz">>, <<"filter">> => #{<<"a">> => <<"b">>} }). -define(SOURCE_MYSQL, #{ <<"type">> => <<"mysql">>, <<"enable">> => true, <<"server">> => <<"127.0.0.1:27017">>, <<"pool_size">> => 1, <<"database">> => <<"mqtt">>, <<"username">> => <<"xx">>, <<"password">> => <<"ee">>, <<"auto_reconnect">> => true, <<"ssl">> => #{<<"enable">> => false}, <<"query">> => <<"abcb">> }). -define(SOURCE_POSTGRESQL, #{ <<"type">> => <<"postgresql">>, <<"enable">> => true, <<"server">> => <<"127.0.0.1:27017">>, <<"pool_size">> => 1, <<"database">> => <<"mqtt">>, <<"username">> => <<"xx">>, <<"password">> => <<"ee">>, <<"auto_reconnect">> => true, <<"ssl">> => #{<<"enable">> => false}, <<"query">> => <<"abcb">> }). -define(SOURCE_REDIS, #{ <<"type">> => <<"redis">>, <<"redis_type">> => <<"single">>, <<"enable">> => true, <<"server">> => <<"127.0.0.1:27017">>, <<"pool_size">> => 1, <<"database">> => 0, <<"password">> => <<"ee">>, <<"auto_reconnect">> => true, <<"ssl">> => #{<<"enable">> => false}, <<"cmd">> => <<"HGETALL mqtt_authz:", ?PH_USERNAME/binary>> }). -define(SOURCE_FILE(Rules), #{ <<"type">> => <<"file">>, <<"enable">> => true, <<"rules">> => Rules }). -define(SOURCE_FILE1, ?SOURCE_FILE( << "{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}." "\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}." >> ) ). -define(SOURCE_FILE2, ?SOURCE_FILE( << "{allow,{username,\"some_client\"},publish,[\"some_client/lwt\"]}.\n" "{deny, all}." >> ) ). %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ -define(UPDATE_ERROR(Err), {error, {pre_config_update, emqx_authz, Err}}). t_bad_file_source(_) -> BadContent = ?SOURCE_FILE(<<"{allow,{username,\"bar\"}, publish, [\"test\"]}">>), BadContentErr = {bad_acl_file_content, {1, erl_parse, ["syntax error before: ", []]}}, BadRule = ?SOURCE_FILE(<<"{allow,{username,\"bar\"},publish}.">>), BadRuleErr = #{ reason => invalid_authorization_rule, value => {allow, {username, "bar"}, publish} }, BadPermission = ?SOURCE_FILE(<<"{not_allow,{username,\"bar\"},publish,[\"test\"]}.">>), BadPermissionErr = #{reason => invalid_authorization_permission, value => not_allow}, BadAction = ?SOURCE_FILE(<<"{allow,{username,\"bar\"},pubsub,[\"test\"]}.">>), BadActionErr = #{reason => invalid_authorization_action, value => pubsub}, lists:foreach( fun({Source, Error}) -> File = emqx_authz_file:acl_conf_file(), ?assertEqual({error, enoent}, file:read_file(File)), ?assertEqual(?UPDATE_ERROR(Error), emqx_authz:update(?CMD_REPLACE, [Source])), ?assertEqual(?UPDATE_ERROR(Error), emqx_authz:update(?CMD_PREPEND, Source)), ?assertEqual(?UPDATE_ERROR(Error), emqx_authz:update(?CMD_APPEND, Source)), %% Check file is not created if update failed; ?assertEqual({error, enoent}, file:read_file(File)) end, [ {BadContent, BadContentErr}, {BadRule, BadRuleErr}, {BadPermission, BadPermissionErr}, {BadAction, BadActionErr} ] ), ?assertMatch( [], emqx_conf:get([authorization, sources], []) ). t_good_file_source(_) -> RuleBin = <<"{allow,{username,\"bar\"}, publish, [\"test\"]}.">>, GoodFileSource = ?SOURCE_FILE(RuleBin), File = emqx_authz_file:acl_conf_file(), lists:foreach( fun({Command, Argument}) -> _ = file:delete(File), ?assertMatch({ok, _}, emqx_authz:update(Command, Argument)), ?assertEqual({ok, RuleBin}, file:read_file(File)), {ok, _} = emqx_authz:update(?CMD_REPLACE, []) end, [ {?CMD_REPLACE, [GoodFileSource]}, {?CMD_PREPEND, GoodFileSource}, {?CMD_APPEND, GoodFileSource} ] ). t_update_source(_) -> %% replace all {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE_MYSQL]), {ok, _} = emqx_authz:update(?CMD_PREPEND, ?SOURCE_MONGODB), {ok, _} = emqx_authz:update(?CMD_PREPEND, ?SOURCE_HTTP), {ok, _} = emqx_authz:update(?CMD_APPEND, ?SOURCE_POSTGRESQL), {ok, _} = emqx_authz:update(?CMD_APPEND, ?SOURCE_REDIS), {ok, _} = emqx_authz:update(?CMD_APPEND, ?SOURCE_FILE1), ?assertMatch( [ #{type := http, enable := true}, #{type := mongodb, enable := true}, #{type := mysql, enable := true}, #{type := postgresql, enable := true}, #{type := redis, enable := true}, #{type := file, enable := true} ], emqx_conf:get([authorization, sources], []) ), {ok, _} = emqx_authz:update({?CMD_REPLACE, http}, ?SOURCE_HTTP#{<<"enable">> := true}), {ok, _} = emqx_authz:update({?CMD_REPLACE, mongodb}, ?SOURCE_MONGODB#{<<"enable">> := true}), {ok, _} = emqx_authz:update({?CMD_REPLACE, mysql}, ?SOURCE_MYSQL#{<<"enable">> := true}), {ok, _} = emqx_authz:update({?CMD_REPLACE, postgresql}, ?SOURCE_POSTGRESQL#{ <<"enable">> := true }), {ok, _} = emqx_authz:update({?CMD_REPLACE, redis}, ?SOURCE_REDIS#{<<"enable">> := true}), {ok, _} = emqx_authz:update({?CMD_REPLACE, file}, ?SOURCE_FILE1#{<<"enable">> := true}), {ok, _} = emqx_authz:update({?CMD_REPLACE, http}, ?SOURCE_HTTP#{<<"enable">> := false}), {ok, _} = emqx_authz:update({?CMD_REPLACE, mongodb}, ?SOURCE_MONGODB#{<<"enable">> := false}), {ok, _} = emqx_authz:update({?CMD_REPLACE, mysql}, ?SOURCE_MYSQL#{<<"enable">> := false}), {ok, _} = emqx_authz:update({?CMD_REPLACE, postgresql}, ?SOURCE_POSTGRESQL#{ <<"enable">> := false }), {ok, _} = emqx_authz:update({?CMD_REPLACE, redis}, ?SOURCE_REDIS#{<<"enable">> := false}), {ok, _} = emqx_authz:update({?CMD_REPLACE, file}, ?SOURCE_FILE1#{<<"enable">> := false}), ?assertMatch( [ #{type := http, enable := false}, #{type := mongodb, enable := false}, #{type := mysql, enable := false}, #{type := postgresql, enable := false}, #{type := redis, enable := false}, #{type := file, enable := false} ], emqx_conf:get([authorization, sources], []) ), ?assertMatch( [ #{type := http, enable := false}, #{type := mongodb, enable := false}, #{type := mysql, enable := false}, #{type := postgresql, enable := false}, #{type := redis, enable := false}, #{type := file, enable := false} ], emqx_authz:lookup() ), {ok, _} = emqx_authz:update(?CMD_REPLACE, []). t_replace_all(_) -> RootKey = [<<"authorization">>], Conf = emqx:get_raw_config(RootKey), emqx_authz_utils:update_config(RootKey, Conf#{ <<"sources">> => [ ?SOURCE_FILE1, ?SOURCE_REDIS, ?SOURCE_POSTGRESQL, ?SOURCE_MYSQL, ?SOURCE_MONGODB, ?SOURCE_HTTP ] }), %% config ?assertMatch( [ #{type := file, enable := true}, #{type := redis, enable := true}, #{type := postgresql, enable := true}, #{type := mysql, enable := true}, #{type := mongodb, enable := true}, #{type := http, enable := true} ], emqx_conf:get([authorization, sources], []) ), %% hooks status ?assertMatch( [ #{type := file, enable := true}, #{type := redis, enable := true}, #{type := postgresql, enable := true}, #{type := mysql, enable := true}, #{type := mongodb, enable := true}, #{type := http, enable := true} ], emqx_authz:lookup() ), Ids = [http, mongodb, mysql, postgresql, redis, file], %% metrics lists:foreach( fun(Id) -> ?assert(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id) end, Ids ), ?assertMatch( {ok, _}, emqx_authz_utils:update_config( RootKey, Conf#{<<"sources">> => [?SOURCE_HTTP#{<<"enable">> => false}]} ) ), %% hooks status ?assertMatch([#{type := http, enable := false}], emqx_authz:lookup()), %% metrics ?assert(emqx_metrics_worker:has_metrics(authz_metrics, http)), lists:foreach( fun(Id) -> ?assertNot(emqx_metrics_worker:has_metrics(authz_metrics, Id), Id) end, Ids -- [http] ), ok. t_delete_source(_) -> {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE_HTTP]), ?assertMatch([#{type := http, enable := true}], emqx_conf:get([authorization, sources], [])), {ok, _} = emqx_authz:update({?CMD_DELETE, http}, #{}), ?assertMatch([], emqx_conf:get([authorization, sources], [])). t_move_source(_) -> {ok, _} = emqx_authz:update( ?CMD_REPLACE, [ ?SOURCE_HTTP, ?SOURCE_MONGODB, ?SOURCE_MYSQL, ?SOURCE_POSTGRESQL, ?SOURCE_REDIS, ?SOURCE_FILE1 ] ), ?assertMatch( [ #{type := http}, #{type := mongodb}, #{type := mysql}, #{type := postgresql}, #{type := redis}, #{type := file} ], emqx_authz:lookup() ), {ok, _} = emqx_authz:move(postgresql, ?CMD_MOVE_FRONT), ?assertMatch( [ #{type := postgresql}, #{type := http}, #{type := mongodb}, #{type := mysql}, #{type := redis}, #{type := file} ], emqx_authz:lookup() ), {ok, _} = emqx_authz:move(http, ?CMD_MOVE_REAR), ?assertMatch( [ #{type := postgresql}, #{type := mongodb}, #{type := mysql}, #{type := redis}, #{type := file}, #{type := http} ], emqx_authz:lookup() ), {ok, _} = emqx_authz:move(mysql, ?CMD_MOVE_BEFORE(postgresql)), ?assertMatch( [ #{type := mysql}, #{type := postgresql}, #{type := mongodb}, #{type := redis}, #{type := file}, #{type := http} ], emqx_authz:lookup() ), {ok, _} = emqx_authz:move(mongodb, ?CMD_MOVE_AFTER(http)), ?assertMatch( [ #{type := mysql}, #{type := postgresql}, #{type := redis}, #{type := file}, #{type := http}, #{type := mongodb} ], emqx_authz:lookup() ), ok. t_pre_config_update_crash(_) -> ok = meck:new(emqx_authz_fake_source, [non_strict, passthrough, no_history]), ok = meck:expect(emqx_authz_fake_source, write_files, fun(_) -> meck:exception(error, oops) end), ?assertEqual( {error, {pre_config_update, emqx_authz, oops}}, emqx_authz:update(?CMD_APPEND, ?SOURCE_HTTP) ), ok = meck:unload(emqx_authz_fake_source). t_get_enabled_authzs_none_enabled(_Config) -> ?assertEqual([], emqx_authz:get_enabled_authzs()). t_get_enabled_authzs_some_enabled(_Config) -> {ok, _} = emqx_authz:update(?CMD_REPLACE, [ ?SOURCE_POSTGRESQL, ?SOURCE_REDIS#{<<"enable">> := false} ]), ?assertEqual([postgresql], emqx_authz:get_enabled_authzs()). t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) -> {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE_FILE2]), {ok, C} = emqtt:start_link([ {username, <<"some_client">>}, {will_topic, <<"some_client/lwt">>}, {will_payload, <<"should be published">>} ]), {ok, _} = emqtt:connect(C), ok = emqx:subscribe(<<"some_client/lwt">>), process_flag(trap_exit, true), try emqtt:subscribe(C, <<"unauthorized">>), error(should_have_disconnected) catch exit:{{shutdown, tcp_closed}, _} -> ok end, receive {deliver, <<"some_client/lwt">>, #message{payload = <<"should be published">>}} -> ok after 2_000 -> error(lwt_not_published) end, ok. t_publish_deny_disconnect_publishes_last_will_testament(_Config) -> {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE_FILE2]), {ok, C} = emqtt:start_link([ {username, <<"some_client">>}, {will_topic, <<"some_client/lwt">>}, {will_payload, <<"should be published">>} ]), {ok, _} = emqtt:connect(C), ok = emqx:subscribe(<<"some_client/lwt">>), process_flag(trap_exit, true), %% disconnect is async Ref = monitor(process, C), emqtt:publish(C, <<"some/topic">>, <<"unauthorized">>), receive {'DOWN', Ref, process, C, _} -> ok after 1_000 -> error(client_should_have_been_disconnected) end, receive {deliver, <<"some_client/lwt">>, #message{payload = <<"should be published">>}} -> ok after 2_000 -> error(lwt_not_published) end, ok. t_publish_last_will_testament_denied_topic(_Config) -> {ok, C} = emqtt:start_link([ {will_topic, <<"$SYS/lwt">>}, {will_payload, <<"should not be published">>} ]), {ok, _} = emqtt:connect(C), ok = emqx:subscribe(<<"$SYS/lwt">>), unlink(C), ok = snabbkaffe:start_trace(), {true, {ok, _}} = ?wait_async_action( exit(C, kill), #{?snk_kind := last_will_testament_publish_denied}, 1_000 ), ok = snabbkaffe:stop(), receive {deliver, <<"$SYS/lwt">>, #message{payload = <<"should not be published">>}} -> error(lwt_should_not_be_published_to_forbidden_topic) after 1_000 -> ok end, ok. %% client is allowed by ACL to publish to its LWT topic, is connected, %% and then gets banned and kicked out while connected. Should not %% publish LWT. t_publish_last_will_testament_banned_client_connecting(_Config) -> {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE_FILE2]), Username = <<"some_client">>, ClientId = <<"some_clientid">>, LWTPayload = <<"should not be published">>, LWTTopic = <<"some_client/lwt">>, ok = emqx:subscribe(<<"some_client/lwt">>), {ok, C} = emqtt:start_link([ {clientid, ClientId}, {username, Username}, {will_topic, LWTTopic}, {will_payload, LWTPayload} ]), ?assertMatch({ok, _}, emqtt:connect(C)), %% Now we ban the client while it is connected. Now = erlang:system_time(second), Who = {username, Username}, emqx_banned:create(#{ who => Who, by => <<"test">>, reason => <<"test">>, at => Now, until => Now + 120 }), on_exit(fun() -> emqx_banned:delete(Who) end), %% Now kick it as we do in the ban API. process_flag(trap_exit, true), ?check_trace( begin ok = emqx_cm:kick_session(ClientId), receive {deliver, LWTTopic, #message{payload = LWTPayload}} -> error(lwt_should_not_be_published_to_forbidden_topic) after 2_000 -> ok end, ok end, fun(Trace) -> ?assertMatch( [ #{ client_banned := true, publishing_disallowed := false } ], ?of_kind(last_will_testament_publish_denied, Trace) ), ok end ), ok = snabbkaffe:stop(), ok. stop_apps(Apps) -> lists:foreach(fun application:stop/1, Apps).