%%-------------------------------------------------------------------- %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_audit_api_SUITE). -compile(export_all). -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). all() -> [ {group, audit, [sequence]} ]. groups() -> [ {audit, [sequence], common_tests()} ]. common_tests() -> emqx_common_test_helpers:all(?MODULE). -define(CONF_DEFAULT, #{ node => #{ name => "emqx1@127.0.0.1", cookie => "emqxsecretcookie", data_dir => "data" }, log => #{ audit => #{ enable => true, ignore_high_frequency_request => true, level => info, max_filter_size => 15, rotation_count => 2, rotation_size => "10MB", time_offset => "system" } } }). init_per_suite(Config) -> _ = application:load(emqx_conf), emqx_config:erase_all(), emqx_mgmt_api_test_util:init_suite([emqx_ctl, emqx_conf, emqx_audit]), ok = emqx_common_test_helpers:load_config(emqx_enterprise_schema, ?CONF_DEFAULT), emqx_config:save_schema_mod_and_names(emqx_enterprise_schema), ok = emqx_config_logger:refresh_config(), application:set_env(emqx, boot_modules, []), emqx_conf_cli:load(), Config. end_per_suite(_) -> emqx_mgmt_api_test_util:end_suite([emqx_audit, emqx_conf, emqx_ctl]). t_http_api(_) -> process_flag(trap_exit, true), AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), {ok, Zones} = emqx_mgmt_api_configs_SUITE:get_global_zone(), NewZones = emqx_utils_maps:deep_put([<<"mqtt">>, <<"max_qos_allowed">>], Zones, 1), {ok, #{<<"mqtt">> := Res}} = emqx_mgmt_api_configs_SUITE:update_global_zone(NewZones), ?assertMatch(#{<<"max_qos_allowed">> := 1}, Res), {ok, Res1} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader), ?assertMatch( #{ <<"data">> := [ #{ <<"from">> := <<"rest_api">>, <<"operation_id">> := <<"/configs/global_zone">>, <<"source_ip">> := <<"127.0.0.1">>, <<"source">> := _, <<"http_request">> := #{ <<"method">> := <<"put">>, <<"body">> := #{<<"mqtt">> := #{<<"max_qos_allowed">> := 1}}, <<"bindings">> := _, <<"headers">> := #{<<"authorization">> := <<"******">>} }, <<"http_status_code">> := 200, <<"operation_result">> := <<"success">>, <<"operation_type">> := <<"configs">> } ] }, emqx_utils_json:decode(Res1, [return_maps]) ), ok. t_disabled(_) -> Enable = [log, audit, enable], ?assertEqual(true, emqx:get_config(Enable)), AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), {ok, _} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader), Size1 = mnesia:table_info(emqx_audit, size), {ok, Logs} = emqx_mgmt_api_configs_SUITE:get_config("log"), Logs1 = emqx_utils_maps:deep_put([<<"audit">>, <<"max_filter_size">>], Logs, 199), NewLogs = emqx_utils_maps:deep_put([<<"audit">>, <<"enable">>], Logs1, false), {ok, _} = emqx_mgmt_api_configs_SUITE:update_config("log", NewLogs), {ok, GetLog1} = emqx_mgmt_api_configs_SUITE:get_config("log"), ?assertEqual(NewLogs, GetLog1), ?assertMatch( {error, _}, emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader) ), Size2 = mnesia:table_info(emqx_audit, size), %% Record the audit disable action, so the size + 1 ?assertEqual(Size1 + 1, Size2), {ok, Zones} = emqx_mgmt_api_configs_SUITE:get_global_zone(), NewZones = emqx_utils_maps:deep_put([<<"mqtt">>, <<"max_topic_levels">>], Zones, 111), {ok, #{<<"mqtt">> := Res}} = emqx_mgmt_api_configs_SUITE:update_global_zone(NewZones), ?assertMatch(#{<<"max_topic_levels">> := 111}, Res), Size3 = mnesia:table_info(emqx_audit, size), %% Don't record mqtt update request. ?assertEqual(Size2, Size3), %% enabled again {ok, _} = emqx_mgmt_api_configs_SUITE:update_config("log", Logs1), {ok, GetLog2} = emqx_mgmt_api_configs_SUITE:get_config("log"), ?assertEqual(Logs1, GetLog2), Size4 = mnesia:table_info(emqx_audit, size), ?assertEqual(Size3 + 1, Size4), ok. t_cli(_Config) -> Size = mnesia:table_info(emqx_audit, size), TimeInt = erlang:system_time(microsecond) - 1000, Time = integer_to_list(TimeInt), DateStr = calendar:system_time_to_rfc3339(TimeInt, [{unit, microsecond}]), Date = emqx_http_lib:uri_encode(DateStr), ok = emqx_ctl:run_command(["conf", "show", "log"]), AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), {ok, Res} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader), #{<<"data">> := Data} = emqx_utils_json:decode(Res, [return_maps]), ?assertMatch( [ #{ <<"from">> := <<"cli">>, <<"operation_id">> := <<"">>, <<"source_ip">> := <<"">>, <<"operation_type">> := <<"conf">>, <<"args">> := [<<"show">>, <<"log">>], <<"node">> := _, <<"source">> := <<"">>, <<"http_request">> := <<"">> } ], Data ), %% check create at is valid [#{<<"created_at">> := CreateAtRaw}] = Data, CreateAt = calendar:rfc3339_to_system_time(binary_to_list(CreateAtRaw), [{unit, microsecond}]), ?assert(CreateAt > TimeInt, CreateAtRaw), ?assert(CreateAt < TimeInt + 5000000, CreateAtRaw), %% check cli filter {ok, Res1} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "from=cli", AuthHeader), #{<<"data">> := Data1} = emqx_utils_json:decode(Res1, [return_maps]), ?assertEqual(Data, Data1), {ok, Res2} = emqx_mgmt_api_test_util:request_api( get, AuditPath, "from=erlang_console", AuthHeader ), ?assertMatch(#{<<"data">> := []}, emqx_utils_json:decode(Res2, [return_maps])), %% check created_at filter microsecond {ok, Res3} = emqx_mgmt_api_test_util:request_api( get, AuditPath, "gte_created_at=" ++ Time, AuthHeader ), #{<<"data">> := Data3} = emqx_utils_json:decode(Res3, [return_maps]), ?assertEqual(1, erlang:length(Data3)), %% check created_at filter rfc3339 {ok, Res31} = emqx_mgmt_api_test_util:request_api( get, AuditPath, "gte_created_at=" ++ Date, AuthHeader ), ?assertEqual(Res3, Res31), %% check created_at filter millisecond TimeMs = integer_to_list(TimeInt div 1000), {ok, Res32} = emqx_mgmt_api_test_util:request_api( get, AuditPath, "gte_created_at=" ++ TimeMs, AuthHeader ), ?assertEqual(Res3, Res32), %% check created_at filter microsecond {ok, Res4} = emqx_mgmt_api_test_util:request_api( get, AuditPath, "lte_created_at=" ++ Time, AuthHeader ), #{<<"data">> := Data4} = emqx_utils_json:decode(Res4, [return_maps]), ?assertEqual(Size, erlang:length(Data4)), %% check created_at filter rfc3339 {ok, Res41} = emqx_mgmt_api_test_util:request_api( get, AuditPath, "lte_created_at=" ++ Date, AuthHeader ), ?assertEqual(Res4, Res41), %% check created_at filter millisecond {ok, Res42} = emqx_mgmt_api_test_util:request_api( get, AuditPath, "lte_created_at=" ++ TimeMs, AuthHeader ), ?assertEqual(Res4, Res42), %% check duration_ms filter {ok, Res5} = emqx_mgmt_api_test_util:request_api( get, AuditPath, "gte_duration_ms=0", AuthHeader ), #{<<"data">> := Data5} = emqx_utils_json:decode(Res5, [return_maps]), ?assertEqual(Size + 1, erlang:length(Data5)), {ok, Res6} = emqx_mgmt_api_test_util:request_api( get, AuditPath, "lte_duration_ms=-1", AuthHeader ), ?assertMatch(#{<<"data">> := []}, emqx_utils_json:decode(Res6, [return_maps])), ok. t_max_size(_Config) -> {ok, _} = emqx:update_config([log, audit, max_filter_size], 999), %% Make sure this process is using latest max_filter_size. ?assertEqual(ignore, gen_server:call(emqx_audit, whatever)), SizeFun = fun() -> AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), Limit = "limit=1000", {ok, Res} = emqx_mgmt_api_test_util:request_api(get, AuditPath, Limit, AuthHeader), #{<<"data">> := Data} = emqx_utils_json:decode(Res, [return_maps]), erlang:length(Data) end, InitSize = SizeFun(), lists:foreach( fun(_) -> ok = emqx_ctl:run_command(["conf", "show", "log"]) end, lists:duplicate(100, 1) ), _ = mnesia:dump_log(), LogCount = wait_for_dirty_write_log_done(1500), Size1 = SizeFun(), ?assert(Size1 - InitSize >= 100, #{ api => Size1, init => InitSize, log_size => LogCount, config => emqx:get_config([log, audit, max_filter_size]) }), {ok, _} = emqx:update_config([log, audit, max_filter_size], 10), %% wait for clean_expired timer:sleep(250), ExpectSize = emqx:get_config([log, audit, max_filter_size]), Size2 = SizeFun(), ?assertEqual(ExpectSize, Size2, {sys:get_state(emqx_audit)}), ok. t_kickout_clients_without_log(_) -> process_flag(trap_exit, true), AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]), {ok, AuditLogs1} = emqx_mgmt_api_test_util:request_api(get, AuditPath), kickout_clients(), {ok, AuditLogs2} = emqx_mgmt_api_test_util:request_api(get, AuditPath), ?assertEqual(AuditLogs1, AuditLogs2), ok. kickout_clients() -> ClientId1 = <<"client1">>, ClientId2 = <<"client2">>, ClientId3 = <<"client3">>, {ok, C1} = emqtt:start_link(#{ clientid => ClientId1, proto_ver => v5, properties => #{'Session-Expiry-Interval' => 120} }), {ok, _} = emqtt:connect(C1), {ok, C2} = emqtt:start_link(#{clientid => ClientId2}), {ok, _} = emqtt:connect(C2), {ok, C3} = emqtt:start_link(#{clientid => ClientId3}), {ok, _} = emqtt:connect(C3), timer:sleep(300), %% get /clients ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]), {ok, Clients} = emqx_mgmt_api_test_util:request_api(get, ClientsPath), ClientsResponse = emqx_utils_json:decode(Clients, [return_maps]), ClientsMeta = maps:get(<<"meta">>, ClientsResponse), ClientsPage = maps:get(<<"page">>, ClientsMeta), ClientsLimit = maps:get(<<"limit">>, ClientsMeta), ClientsCount = maps:get(<<"count">>, ClientsMeta), ?assertEqual(ClientsPage, 1), ?assertEqual(ClientsLimit, emqx_mgmt:default_row_limit()), ?assertEqual(ClientsCount, 3), %% kickout clients KickoutPath = emqx_mgmt_api_test_util:api_path(["clients", "kickout", "bulk"]), KickoutBody = [ClientId1, ClientId2, ClientId3], {ok, 204, _} = emqx_mgmt_api_test_util:request_api_with_body(post, KickoutPath, KickoutBody), {ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath), ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]), ?assertMatch(#{<<"data">> := []}, ClientsResponse2). wait_for_dirty_write_log_done(MaxMs) -> Size = mnesia:table_info(emqx_audit, size), wait_for_dirty_write_log_done(Size, MaxMs). wait_for_dirty_write_log_done(Size, RemainMs) when RemainMs =< 0 -> Size; wait_for_dirty_write_log_done(Prev, RemainMs) -> SleepMs = 100, ct:sleep(SleepMs), case mnesia:table_info(emqx_audit, size) of Prev -> ct:sleep(SleepMs * 2), Prev; New -> wait_for_dirty_write_log_done(New, RemainMs - SleepMs) end.