%% Listing metadata: 329 lines, 13 KiB, Erlang
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
|
|
-module(emqx_audit_api_SUITE).
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
%% Common Test entry point: the suite consists of the single `audit' group,
%% requested with the `sequence' group property.
all() ->
    GroupProps = [sequence],
    [{group, audit, GroupProps}].
|
|
|
|
%% Common Test group definitions: the `audit' group holds every test case
%% returned by common_tests/0 and carries the `sequence' property.
groups() ->
    AuditCases = common_tests(),
    [{audit, [sequence], AuditCases}].
|
|
|
|
%% Test cases of this suite, as reported by emqx_common_test_helpers:all/1.
%% NOTE(review): presumably that helper collects the exported test-case
%% functions of the given module -- confirm against the helper.
common_tests() ->
    Suite = ?MODULE,
    emqx_common_test_helpers:all(Suite).
|
|
|
|
%% Baseline configuration loaded by init_per_suite/1 through
%% emqx_common_test_helpers:load_config/2. It contains a `log' section with
%% the audit log switched on and a `node' section with name, cookie and
%% data directory.
-define(CONF_DEFAULT, #{
    log => #{
        audit => #{
            enable => true,
            level => info,
            ignore_high_frequency_request => true,
            max_filter_size => 15,
            rotation_size => "10MB",
            rotation_count => 2,
            time_offset => "system"
        }
    },
    node => #{
        name => "emqx1@127.0.0.1",
        cookie => "emqxsecretcookie",
        data_dir => "data"
    }
}).
|
|
|
|
%% Suite setup. Starts the applications under test, loads ?CONF_DEFAULT with
%% the enterprise schema, refreshes the logger configuration and loads the
%% `conf' CLI. The steps have side effects and are kept in this exact order.
%% Returns Config unchanged.
init_per_suite(Config) ->
    SchemaMod = emqx_enterprise_schema,
    SuiteApps = [emqx_ctl, emqx_conf, emqx_audit],
    %% The result is deliberately ignored (the application may already be loaded).
    _ = application:load(emqx_conf),
    emqx_config:erase_all(),
    emqx_mgmt_api_test_util:init_suite(SuiteApps),
    ok = emqx_common_test_helpers:load_config(SchemaMod, ?CONF_DEFAULT),
    emqx_config:save_schema_mod_and_names(SchemaMod),
    ok = emqx_config_logger:refresh_config(),
    application:set_env(emqx, boot_modules, []),
    emqx_conf_cli:load(),
    Config.
|
|
|
|
%% Suite teardown: hands the applications started in init_per_suite/1 to
%% emqx_mgmt_api_test_util:end_suite/1, listed in reverse start order.
end_per_suite(_Config) ->
    SuiteApps = [emqx_audit, emqx_conf, emqx_ctl],
    emqx_mgmt_api_test_util:end_suite(SuiteApps).
|
|
|
|
%% Sets `mqtt.max_qos_allowed' to 1 in the global zone through the config
%% suite helper, then fetches one audit entry (`limit=1') and checks that it
%% describes that REST request: a successful PUT on /configs/global_zone from
%% 127.0.0.1 with the authorization header masked.
t_http_api(_) ->
    process_flag(trap_exit, true),
    Path = emqx_mgmt_api_test_util:api_path(["audit"]),
    Auth = emqx_mgmt_api_test_util:auth_header_(),
    {ok, Zone0} = emqx_mgmt_api_configs_SUITE:get_global_zone(),
    Zone1 = emqx_utils_maps:deep_put([<<"mqtt">>, <<"max_qos_allowed">>], Zone0, 1),
    {ok, #{<<"mqtt">> := Mqtt}} = emqx_mgmt_api_configs_SUITE:update_global_zone(Zone1),
    ?assertMatch(#{<<"max_qos_allowed">> := 1}, Mqtt),
    {ok, Body} = emqx_mgmt_api_test_util:request_api(get, Path, "limit=1", Auth),
    ?assertMatch(
        #{
            <<"data">> := [
                #{
                    <<"from">> := <<"rest_api">>,
                    <<"source">> := _,
                    <<"source_ip">> := <<"127.0.0.1">>,
                    <<"operation_id">> := <<"/configs/global_zone">>,
                    <<"operation_type">> := <<"configs">>,
                    <<"operation_result">> := <<"success">>,
                    <<"http_status_code">> := 200,
                    <<"http_request">> := #{
                        <<"method">> := <<"put">>,
                        <<"bindings">> := _,
                        <<"body">> := #{<<"mqtt">> := #{<<"max_qos_allowed">> := 1}},
                        %% The credential must be masked in the audit entry.
                        <<"headers">> := #{<<"authorization">> := <<"******">>}
                    }
                }
            ]
        },
        emqx_utils_json:decode(Body, [return_maps])
    ),
    ok.
|
|
|
|
%% Switches `log.audit.enable' off and on again through the config API and
%% follows the size of the `emqx_audit' Mnesia table to check which requests
%% add an audit entry:
%%   * the update that disables auditing is recorded (+1),
%%   * a config update made while disabled is not recorded (+0),
%%   * the update that re-enables auditing is recorded (+1).
t_disabled(_) ->
    AuditSize = fun() -> mnesia:table_info(emqx_audit, size) end,
    EnableKey = [log, audit, enable],
    ?assertEqual(true, emqx:get_config(EnableKey)),
    Path = emqx_mgmt_api_test_util:api_path(["audit"]),
    Auth = emqx_mgmt_api_test_util:auth_header_(),
    {ok, _} = emqx_mgmt_api_test_util:request_api(get, Path, "limit=1", Auth),
    SizeBefore = AuditSize(),

    %% Disable auditing; max_filter_size is changed in the same update.
    {ok, LogConf} = emqx_mgmt_api_configs_SUITE:get_config("log"),
    TunedConf = emqx_utils_maps:deep_put([<<"audit">>, <<"max_filter_size">>], LogConf, 199),
    DisabledConf = emqx_utils_maps:deep_put([<<"audit">>, <<"enable">>], TunedConf, false),
    {ok, _} = emqx_mgmt_api_configs_SUITE:update_config("log", DisabledConf),
    {ok, ConfAfterDisable} = emqx_mgmt_api_configs_SUITE:get_config("log"),
    ?assertEqual(DisabledConf, ConfAfterDisable),
    %% While disabled, querying the audit endpoint must fail.
    ?assertMatch(
        {error, _},
        emqx_mgmt_api_test_util:request_api(get, Path, "limit=1", Auth)
    ),

    SizeDisabled = AuditSize(),
    %% Record the audit disable action, so the size + 1
    ?assertEqual(SizeBefore + 1, SizeDisabled),

    {ok, Zone0} = emqx_mgmt_api_configs_SUITE:get_global_zone(),
    Zone1 = emqx_utils_maps:deep_put([<<"mqtt">>, <<"max_topic_levels">>], Zone0, 111),
    {ok, #{<<"mqtt">> := Mqtt}} = emqx_mgmt_api_configs_SUITE:update_global_zone(Zone1),
    ?assertMatch(#{<<"max_topic_levels">> := 111}, Mqtt),
    SizeAfterZoneUpdate = AuditSize(),
    %% Don't record mqtt update request.
    ?assertEqual(SizeDisabled, SizeAfterZoneUpdate),

    %% Enable auditing again by pushing the configuration without `enable => false'.
    {ok, _} = emqx_mgmt_api_configs_SUITE:update_config("log", TunedConf),
    {ok, ConfAfterEnable} = emqx_mgmt_api_configs_SUITE:get_config("log"),
    ?assertEqual(TunedConf, ConfAfterEnable),
    SizeReenabled = AuditSize(),
    ?assertEqual(SizeAfterZoneUpdate + 1, SizeReenabled),
    ok.
|
|
|
|
%% Runs the CLI command `conf show log' and checks the resulting audit entry
%% through the REST API: its content, its `created_at' timestamp, and the
%% `from', `gte_created_at' / `lte_created_at' (microsecond, RFC 3339 and
%% millisecond forms) and `gte_duration_ms' / `lte_duration_ms' filters.
t_cli(_Config) ->
    %% Number of audit entries present before the command is run.
    Size = mnesia:table_info(emqx_audit, size),
    %% Reference instant, 1000 microseconds before "now", in the three
    %% encodings used for the created_at filters below.
    SinceUs = erlang:system_time(microsecond) - 1000,
    SinceUsStr = integer_to_list(SinceUs),
    SinceMsStr = integer_to_list(SinceUs div 1000),
    SinceRfc3339 = emqx_http_lib:uri_encode(
        calendar:system_time_to_rfc3339(SinceUs, [{unit, microsecond}])
    ),
    ok = emqx_ctl:run_command(["conf", "show", "log"]),
    AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
    %% GET the audit endpoint with a query string; returns the raw body.
    Fetch = fun(Query) ->
        {ok, Body} = emqx_mgmt_api_test_util:request_api(get, AuditPath, Query, AuthHeader),
        Body
    end,
    Decode = fun(Body) -> emqx_utils_json:decode(Body, [return_maps]) end,

    #{<<"data">> := Data} = Decode(Fetch("limit=1")),
    ?assertMatch(
        [
            #{
                <<"from">> := <<"cli">>,
                <<"node">> := _,
                <<"operation_type">> := <<"conf">>,
                <<"args">> := [<<"show">>, <<"log">>],
                <<"operation_id">> := <<"">>,
                <<"source">> := <<"">>,
                <<"source_ip">> := <<"">>,
                <<"http_request">> := <<"">>
            }
        ],
        Data
    ),
    %% created_at must lie between the reference instant and 5 seconds after it.
    [#{<<"created_at">> := CreatedAtRaw}] = Data,
    CreatedAt = calendar:rfc3339_to_system_time(
        binary_to_list(CreatedAtRaw), [{unit, microsecond}]
    ),
    ?assert(CreatedAt > SinceUs, CreatedAtRaw),
    ?assert(CreatedAt < SinceUs + 5000000, CreatedAtRaw),

    %% `from' filter: the CLI entry is found under `cli', nothing under `erlang_console'.
    #{<<"data">> := CliData} = Decode(Fetch("from=cli")),
    ?assertEqual(Data, CliData),
    ?assertMatch(#{<<"data">> := []}, Decode(Fetch("from=erlang_console"))),

    %% gte_created_at: only the new entry matches; all three encodings agree.
    GteBody = Fetch("gte_created_at=" ++ SinceUsStr),
    #{<<"data">> := GteData} = Decode(GteBody),
    ?assertEqual(1, erlang:length(GteData)),
    ?assertEqual(GteBody, Fetch("gte_created_at=" ++ SinceRfc3339)),
    ?assertEqual(GteBody, Fetch("gte_created_at=" ++ SinceMsStr)),

    %% lte_created_at: every pre-existing entry matches; all three encodings agree.
    LteBody = Fetch("lte_created_at=" ++ SinceUsStr),
    #{<<"data">> := LteData} = Decode(LteBody),
    ?assertEqual(Size, erlang:length(LteData)),
    ?assertEqual(LteBody, Fetch("lte_created_at=" ++ SinceRfc3339)),
    ?assertEqual(LteBody, Fetch("lte_created_at=" ++ SinceMsStr)),

    %% duration_ms filter: `>= 0' matches old and new entries, `=< -1' matches none.
    #{<<"data">> := DurationData} = Decode(Fetch("gte_duration_ms=0")),
    ?assertEqual(Size + 1, erlang:length(DurationData)),
    ?assertMatch(#{<<"data">> := []}, Decode(Fetch("lte_duration_ms=-1"))),
    ok.
|
|
|
|
%% Checks that the audit table follows `log.audit.max_filter_size': with the
%% limit raised to 999, one hundred CLI commands add at least one hundred
%% entries; after lowering the limit to 10 the API reports exactly the
%% configured number of entries.
t_max_size(_Config) ->
    FilterSizeKey = [log, audit, max_filter_size],
    {ok, _} = emqx:update_config(FilterSizeKey, 999),
    %% Make sure the emqx_audit process is using the latest max_filter_size.
    ?assertEqual(ignore, gen_server:call(emqx_audit, whatever)),
    %% Number of audit entries visible through the REST API (up to 1000).
    CountViaApi = fun() ->
        Path = emqx_mgmt_api_test_util:api_path(["audit"]),
        Auth = emqx_mgmt_api_test_util:auth_header_(),
        {ok, Body} = emqx_mgmt_api_test_util:request_api(get, Path, "limit=1000", Auth),
        #{<<"data">> := Entries} = emqx_utils_json:decode(Body, [return_maps]),
        erlang:length(Entries)
    end,
    InitSize = CountViaApi(),
    %% Produce 100 audit entries via the CLI.
    _ = [ok = emqx_ctl:run_command(["conf", "show", "log"]) || _ <- lists:seq(1, 100)],
    _ = mnesia:dump_log(),
    LogCount = wait_for_dirty_write_log_done(1500),
    Size1 = CountViaApi(),
    %% The map is only diagnostic output attached to a failure.
    ?assert(Size1 - InitSize >= 100, #{
        api => Size1,
        init => InitSize,
        log_size => LogCount,
        config => emqx:get_config(FilterSizeKey)
    }),
    {ok, _} = emqx:update_config(FilterSizeKey, 10),
    %% wait for clean_expired
    timer:sleep(250),
    ExpectSize = emqx:get_config(FilterSizeKey),
    Size2 = CountViaApi(),
    ?assertEqual(ExpectSize, Size2, {sys:get_state(emqx_audit)}),
    ok.
|
|
|
|
%% Snapshots the audit API response, runs kickout_clients/0 and asserts that
%% the response is unchanged afterwards, i.e. the requests issued by
%% kickout_clients/0 leave no audit entry.
t_kickout_clients_without_log(_) ->
    %% kickout_clients/0 links emqtt clients to this process.
    %% NOTE(review): exits are presumably trapped so that their termination on
    %% kick-out does not take the test process down -- confirm.
    process_flag(trap_exit, true),
    Path = emqx_mgmt_api_test_util:api_path(["audit"]),
    {ok, LogsBefore} = emqx_mgmt_api_test_util:request_api(get, Path),
    kickout_clients(),
    {ok, LogsAfter} = emqx_mgmt_api_test_util:request_api(get, Path),
    ?assertEqual(LogsBefore, LogsAfter),
    ok.
|
|
|
|
%% Connects three MQTT clients, verifies through `GET /clients' that all three
%% are listed, kicks them out with one `POST /clients/kickout/bulk' request
%% and asserts that the client list is empty afterwards.
%%
%% The clients are started with emqtt:start_link/1, so they are linked to the
%% calling process; callers that must survive the kick-out should trap exits.
kickout_clients() ->
    ClientId1 = <<"client1">>,
    ClientId2 = <<"client2">>,
    ClientId3 = <<"client3">>,

    {ok, C1} = emqtt:start_link(#{
        clientid => ClientId1,
        proto_ver => v5,
        properties => #{'Session-Expiry-Interval' => 120}
    }),
    {ok, _} = emqtt:connect(C1),
    {ok, C2} = emqtt:start_link(#{clientid => ClientId2}),
    {ok, _} = emqtt:connect(C2),
    {ok, C3} = emqtt:start_link(#{clientid => ClientId3}),
    {ok, _} = emqtt:connect(C3),

    %% NOTE(review): presumably gives the broker time to register the
    %% connections before they are listed -- confirm.
    timer:sleep(300),

    %% get /clients
    ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]),
    {ok, Clients} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
    ClientsResponse = emqx_utils_json:decode(Clients, [return_maps]),
    ClientsMeta = maps:get(<<"meta">>, ClientsResponse),
    ClientsPage = maps:get(<<"page">>, ClientsMeta),
    ClientsLimit = maps:get(<<"limit">>, ClientsMeta),
    ClientsCount = maps:get(<<"count">>, ClientsMeta),
    %% ?assertEqual takes (Expected, Actual); keeping that order makes the
    %% `expected' / `value' fields of a failure report point the right way.
    ?assertEqual(1, ClientsPage),
    ?assertEqual(emqx_mgmt:default_row_limit(), ClientsLimit),
    ?assertEqual(3, ClientsCount),

    %% kickout clients
    KickoutPath = emqx_mgmt_api_test_util:api_path(["clients", "kickout", "bulk"]),
    KickoutBody = [ClientId1, ClientId2, ClientId3],
    {ok, 204, _} = emqx_mgmt_api_test_util:request_api_with_body(post, KickoutPath, KickoutBody),

    {ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
    ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]),
    ?assertMatch(#{<<"data">> := []}, ClientsResponse2).
|
|
|
|
%% Waits until the size of the `emqx_audit' table stops changing, spending a
%% budget of MaxMs milliseconds on polling, and returns the table size that
%% wait_for_dirty_write_log_done/2 settles on. The current size is the
%% starting sample.
wait_for_dirty_write_log_done(MaxMs) ->
    InitialSize = mnesia:table_info(emqx_audit, size),
    wait_for_dirty_write_log_done(InitialSize, MaxMs).
|
|
|
|
%% Polling loop behind wait_for_dirty_write_log_done/1.
%%
%% Prev is the table size seen in the previous sample and RemainMs the polling
%% budget left. Each round sleeps 100 ms and re-reads the size:
%%   * unchanged -> sleep another 200 ms and return Prev,
%%   * changed   -> continue with the new size and 100 ms less budget.
%% With no budget left the last sampled size is returned immediately.
%%
%% NOTE(review): in the "unchanged" case the returned value is the size read
%% before the final 200 ms sleep; the table is not read again.
wait_for_dirty_write_log_done(Prev, RemainMs) when RemainMs > 0 ->
    SleepMs = 100,
    ct:sleep(SleepMs),
    Current = mnesia:table_info(emqx_audit, size),
    if
        Current =:= Prev ->
            ct:sleep(SleepMs * 2),
            Prev;
        true ->
            wait_for_dirty_write_log_done(Current, RemainMs - SleepMs)
    end;
wait_for_dirty_write_log_done(Size, _RemainMs) ->
    Size.
|