%%-------------------------------------------------------------------- %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_authz_api_sources_SUITE). -compile(nowarn_export_all). -compile(export_all). -import(emqx_dashboard_api_test_helpers, [request/3, uri/1]). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx_placeholder.hrl"). -define(MONGO_SINGLE_HOST, "mongo"). -define(MYSQL_HOST, "mysql:3306"). -define(PGSQL_HOST, "pgsql"). -define(REDIS_SINGLE_HOST, "redis"). -define(SOURCE1, #{ <<"type">> => <<"http">>, <<"enable">> => true, <<"url">> => <<"https://fake.com:443/acl?username=", ?PH_USERNAME/binary>>, <<"ssl">> => #{<<"enable">> => true}, <<"headers">> => #{}, <<"method">> => <<"get">>, <<"request_timeout">> => <<"5s">> }). -define(SOURCE2, #{ <<"type">> => <<"mongodb">>, <<"enable">> => true, <<"mongo_type">> => <<"single">>, <<"server">> => <>, <<"w_mode">> => <<"unsafe">>, <<"pool_size">> => 1, <<"database">> => <<"mqtt">>, <<"ssl">> => #{<<"enable">> => false}, <<"collection">> => <<"fake">>, <<"filter">> => #{<<"a">> => <<"b">>} }). -define(SOURCE3, #{ <<"type">> => <<"mysql">>, <<"enable">> => true, <<"server">> => <>, <<"pool_size">> => 1, <<"database">> => <<"mqtt">>, <<"username">> => <<"xx">>, <<"password">> => <<"ee">>, <<"auto_reconnect">> => true, <<"ssl">> => #{<<"enable">> => false}, <<"query">> => <<"abcb">> }). -define(SOURCE4, #{ <<"type">> => <<"postgresql">>, <<"enable">> => true, <<"server">> => <>, <<"pool_size">> => 1, <<"database">> => <<"mqtt">>, <<"username">> => <<"xx">>, <<"password">> => <<"ee">>, <<"auto_reconnect">> => true, <<"ssl">> => #{<<"enable">> => false}, <<"query">> => <<"abcb">> }). -define(SOURCE5, #{ <<"type">> => <<"redis">>, <<"enable">> => true, <<"servers">> => <>, <<"redis_type">> => <<"cluster">>, <<"pool_size">> => 1, <<"password">> => <<"ee">>, <<"auto_reconnect">> => true, <<"ssl">> => #{<<"enable">> => false}, <<"cmd">> => <<"HGETALL mqtt_authz:", ?PH_USERNAME/binary>> }). -define(SOURCE6, #{ <<"type">> => <<"file">>, <<"enable">> => true, <<"rules">> => << "{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}." "\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}." >> }). all() -> emqx_common_test_helpers:all(?MODULE). groups() -> []. init_per_suite(Config) -> ok = stop_apps([emqx_resource]), meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx_resource, create_local, fun(_, _, _, _) -> {ok, meck_data} end), meck:expect(emqx_resource, health_check, fun(St) -> {ok, St} end), meck:expect(emqx_resource, remove_local, fun(_) -> ok end), meck:expect( emqx_authz, acl_conf_file, fun() -> emqx_common_test_helpers:deps_path(emqx_authz, "etc/acl.conf") end ), ok = emqx_common_test_helpers:start_apps( [emqx_conf, emqx_authz, emqx_dashboard], fun set_special_configs/1 ), ok = start_apps([emqx_resource]), Config. end_per_suite(_Config) -> {ok, _} = emqx:update_config( [authorization], #{ <<"no_match">> => <<"allow">>, <<"cache">> => #{<<"enable">> => <<"true">>}, <<"sources">> => [] } ), %% resource and connector should be stop first, %% or authz_[mysql|pgsql|redis..]_SUITE would be failed ok = stop_apps([emqx_resource]), emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]), meck:unload(emqx_resource), ok. set_special_configs(emqx_dashboard) -> emqx_dashboard_api_test_helpers:set_default_config(); set_special_configs(emqx_authz) -> {ok, _} = emqx:update_config([authorization, cache, enable], false), {ok, _} = emqx:update_config([authorization, no_match], deny), {ok, _} = emqx:update_config([authorization, sources], []), ok; set_special_configs(_App) -> ok. init_per_testcase(t_api, Config) -> meck:new(emqx_misc, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx_misc, gen_id, fun() -> "fake" end), meck:new(emqx, [non_strict, passthrough, no_history, no_link]), meck:expect( emqx, data_dir, fun() -> {data_dir, Data} = lists:keyfind(data_dir, 1, Config), Data end ), Config; init_per_testcase(_, Config) -> Config. end_per_testcase(t_api, _Config) -> meck:unload(emqx_misc), meck:unload(emqx), ok; end_per_testcase(_, _Config) -> ok. %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ t_api(_) -> {ok, 200, Result1} = request(get, uri(["authorization", "sources"]), []), ?assertEqual([], get_sources(Result1)), {ok, 404, ErrResult} = request(get, uri(["authorization", "sources", "http"]), []), ?assertMatch( #{<<"code">> := <<"NOT_FOUND">>, <<"message">> := <<"Not found: http">>}, jsx:decode(ErrResult) ), [ begin {ok, 204, _} = request(post, uri(["authorization", "sources"]), Source) end || Source <- lists:reverse([?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6]) ], {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE1), {ok, 200, Result2} = request(get, uri(["authorization", "sources"]), []), Sources = get_sources(Result2), ?assertMatch( [ #{<<"type">> := <<"http">>}, #{<<"type">> := <<"mongodb">>}, #{<<"type">> := <<"mysql">>}, #{<<"type">> := <<"postgresql">>}, #{<<"type">> := <<"redis">>}, #{<<"type">> := <<"file">>} ], Sources ), ?assert(filelib:is_file(emqx_authz:acl_conf_file())), {ok, 204, _} = request( put, uri(["authorization", "sources", "http"]), ?SOURCE1#{<<"enable">> := false} ), {ok, 200, Result3} = request(get, uri(["authorization", "sources", "http"]), []), ?assertMatch(#{<<"type">> := <<"http">>, <<"enable">> := false}, jsx:decode(Result3)), Keyfile = emqx_common_test_helpers:app_path( emqx, filename:join(["etc", "certs", "key.pem"]) ), Certfile = emqx_common_test_helpers:app_path( emqx, filename:join(["etc", "certs", "cert.pem"]) ), Cacertfile = emqx_common_test_helpers:app_path( emqx, filename:join(["etc", "certs", "cacert.pem"]) ), {ok, 204, _} = request( put, uri(["authorization", "sources", "mongodb"]), ?SOURCE2#{ <<"ssl">> => #{ <<"enable">> => <<"true">>, <<"cacertfile">> => Cacertfile, <<"certfile">> => Certfile, <<"keyfile">> => Keyfile, <<"verify">> => <<"verify_none">> } } ), {ok, 200, Result4} = request(get, uri(["authorization", "sources", "mongodb"]), []), {ok, 200, Status4} = request(get, uri(["authorization", "sources", "mongodb", "status"]), []), #{ <<"metrics">> := #{ <<"allow">> := 0, <<"deny">> := 0, <<"total">> := 0, <<"nomatch">> := 0 } } = jiffy:decode(Status4, [return_maps]), ?assertMatch( #{ <<"type">> := <<"mongodb">>, <<"ssl">> := #{ <<"enable">> := <<"true">>, <<"cacertfile">> := _, <<"certfile">> := _, <<"keyfile">> := _, <<"verify">> := <<"verify_none">> } }, jsx:decode(Result4) ), {ok, Cacert} = file:read_file(Cacertfile), {ok, Cert} = file:read_file(Certfile), {ok, Key} = file:read_file(Keyfile), {ok, 204, _} = request( put, uri(["authorization", "sources", "mongodb"]), ?SOURCE2#{ <<"ssl">> => #{ <<"enable">> => <<"true">>, <<"cacertfile">> => Cacert, <<"certfile">> => Cert, <<"keyfile">> => Key, <<"verify">> => <<"verify_none">> } } ), {ok, 200, Result5} = request(get, uri(["authorization", "sources", "mongodb"]), []), ?assertMatch( #{ <<"type">> := <<"mongodb">>, <<"ssl">> := #{ <<"enable">> := <<"true">>, <<"cacertfile">> := _, <<"certfile">> := _, <<"keyfile">> := _, <<"verify">> := <<"verify_none">> } }, jsx:decode(Result5) ), {ok, 200, Status5_1} = request(get, uri(["authorization", "sources", "mongodb", "status"]), []), #{ <<"metrics">> := #{ <<"allow">> := 0, <<"deny">> := 0, <<"total">> := 0, <<"nomatch">> := 0 } } = jiffy:decode(Status5_1, [return_maps]), #{ ssl := #{ cacertfile := SavedCacertfile, certfile := SavedCertfile, keyfile := SavedKeyfile } } = emqx_authz:lookup(mongodb), ?assert(filelib:is_file(SavedCacertfile)), ?assert(filelib:is_file(SavedCertfile)), ?assert(filelib:is_file(SavedKeyfile)), {ok, 204, _} = request( put, uri(["authorization", "sources", "mysql"]), ?SOURCE3#{<<"server">> := <<"192.168.1.100:3306">>} ), {ok, 204, _} = request( put, uri(["authorization", "sources", "postgresql"]), ?SOURCE4#{<<"server">> := <<"fake">>} ), {ok, 204, _} = request( put, uri(["authorization", "sources", "redis"]), ?SOURCE5#{ <<"servers">> := [ <<"192.168.1.100:6379">>, <<"192.168.1.100:6380">> ] } ), lists:foreach( fun(#{<<"type">> := Type}) -> {ok, 204, _} = request( delete, uri(["authorization", "sources", binary_to_list(Type)]), [] ) end, Sources ), {ok, 200, Result6} = request(get, uri(["authorization", "sources"]), []), ?assertEqual([], get_sources(Result6)), ?assertEqual([], emqx:get_config([authorization, sources])), {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE6), {ok, Client} = emqtt:start_link( [ {username, <<"u_event3">>}, {clientid, <<"c_event3">>}, {proto_ver, v5}, {properties, #{'Session-Expiry-Interval' => 60}} ] ), emqtt:connect(Client), timer:sleep(50), emqtt:publish( Client, <<"t1">>, #{'Message-Expiry-Interval' => 60}, <<"{\"id\": 1, \"name\": \"ha\"}">>, [{qos, 1}] ), {ok, 200, Status5} = request(get, uri(["authorization", "sources", "file", "status"]), []), #{ <<"metrics">> := #{ <<"allow">> := 1, <<"deny">> := 0, <<"total">> := 1, <<"nomatch">> := 0 } } = jiffy:decode(Status5, [return_maps]), timer:sleep(50), emqtt:publish( Client, <<"t2">>, #{'Message-Expiry-Interval' => 60}, <<"{\"id\": 1, \"name\": \"ha\"}">>, [{qos, 1}] ), {ok, 200, Status6} = request(get, uri(["authorization", "sources", "file", "status"]), []), #{ <<"metrics">> := #{ <<"allow">> := 2, <<"deny">> := 0, <<"total">> := 2, <<"nomatch">> := 0 } } = jiffy:decode(Status6, [return_maps]), timer:sleep(50), emqtt:publish( Client, <<"t3">>, #{'Message-Expiry-Interval' => 60}, <<"{\"id\": 1, \"name\": \"ha\"}">>, [{qos, 1}] ), timer:sleep(50), {ok, 200, Status7} = request(get, uri(["authorization", "sources", "file", "status"]), []), #{ <<"metrics">> := #{ <<"allow">> := 3, <<"deny">> := 0, <<"total">> := 3, <<"nomatch">> := 0 } } = jiffy:decode(Status7, [return_maps]), ok. t_move_source(_) -> {ok, _} = emqx_authz:update(replace, [?SOURCE1, ?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5]), ?assertMatch( [ #{type := http}, #{type := mongodb}, #{type := mysql}, #{type := postgresql}, #{type := redis} ], emqx_authz:lookup() ), {ok, 204, _} = request( post, uri(["authorization", "sources", "postgresql", "move"]), #{<<"position">> => <<"front">>} ), ?assertMatch( [ #{type := postgresql}, #{type := http}, #{type := mongodb}, #{type := mysql}, #{type := redis} ], emqx_authz:lookup() ), {ok, 204, _} = request( post, uri(["authorization", "sources", "http", "move"]), #{<<"position">> => <<"rear">>} ), ?assertMatch( [ #{type := postgresql}, #{type := mongodb}, #{type := mysql}, #{type := redis}, #{type := http} ], emqx_authz:lookup() ), {ok, 204, _} = request( post, uri(["authorization", "sources", "mysql", "move"]), #{<<"position">> => <<"before:postgresql">>} ), ?assertMatch( [ #{type := mysql}, #{type := postgresql}, #{type := mongodb}, #{type := redis}, #{type := http} ], emqx_authz:lookup() ), {ok, 204, _} = request( post, uri(["authorization", "sources", "mongodb", "move"]), #{<<"position">> => <<"after:http">>} ), ?assertMatch( [ #{type := mysql}, #{type := postgresql}, #{type := redis}, #{type := http}, #{type := mongodb} ], emqx_authz:lookup() ), ok. t_aggregate_metrics(_) -> Metrics = #{ 'emqx@node1.emqx.io' => #{ metrics => #{ failed => 0, total => 1, rate => 0.0, rate_last5m => 0.0, rate_max => 0.1, success => 1 } }, 'emqx@node2.emqx.io' => #{ metrics => #{ failed => 0, total => 1, rate => 0.0, rate_last5m => 0.0, rate_max => 0.1, success => 1 } } }, Res = emqx_authn_api:aggregate_metrics(maps:values(Metrics)), ?assertEqual( #{ metrics => #{ failed => 0, total => 2, rate => 0.0, rate_last5m => 0.0, rate_max => 0.2, success => 2 } }, Res ). get_sources(Result) -> maps:get(<<"sources">>, jsx:decode(Result), []). data_dir() -> emqx:data_dir(). start_apps(Apps) -> lists:foreach(fun application:ensure_all_started/1, Apps). stop_apps(Apps) -> lists:foreach(fun application:stop/1, Apps).