From 9bb22507df4cc266b325b7f4c7a4554b6b9834c5 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 25 Oct 2023 16:09:11 +0800 Subject: [PATCH] fix: don't use transaction on audit log --- apps/emqx_audit/include/emqx_audit.hrl | 1 - apps/emqx_audit/src/emqx_audit.erl | 120 ++++++++---------- apps/emqx_audit/test/emqx_audit_api_SUITE.erl | 29 +++++ 3 files changed, 84 insertions(+), 66 deletions(-) diff --git a/apps/emqx_audit/include/emqx_audit.hrl b/apps/emqx_audit/include/emqx_audit.hrl index 1b4349387..8304a9060 100644 --- a/apps/emqx_audit/include/emqx_audit.hrl +++ b/apps/emqx_audit/include/emqx_audit.hrl @@ -5,7 +5,6 @@ -define(AUDIT, emqx_audit). -record(?AUDIT, { - seq, %% basic info created_at, node, diff --git a/apps/emqx_audit/src/emqx_audit.erl b/apps/emqx_audit/src/emqx_audit.erl index 4477bbd8b..d00aa493b 100644 --- a/apps/emqx_audit/src/emqx_audit.erl +++ b/apps/emqx_audit/src/emqx_audit.erl @@ -14,6 +14,8 @@ -export([start_link/0]). -export([log/1, log/2]). +-export([dirty_clean_expired/1]). + %% gen_server callbacks -export([ init/1, @@ -26,12 +28,15 @@ ]). -define(FILTER_REQ, [cert, host_info, has_sent_resp, pid, path_info, peer, ref, sock, streamid]). --define(CLEAN_EXPIRED_MS, 60 * 1000). + +-ifdef(TEST). +-define(INTERVAL, 100). +-else. +-define(INTERVAL, 2500). +-endif. 
to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) -> #?AUDIT{ - created_at = erlang:system_time(microsecond), - node = node(), operation_id = <<"">>, operation_type = atom_to_binary(Cmd), args = Args, @@ -62,8 +67,6 @@ to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api duration_ms := DurationMs } = Log, #?AUDIT{ - created_at = erlang:system_time(microsecond), - node = node(), from = From, source = Source, source_ip = SourceIp, @@ -81,8 +84,6 @@ to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api }; to_audit(#{from := event, event := Event}) -> #?AUDIT{ - created_at = erlang:system_time(microsecond), - node = node(), from = event, source = <<"">>, source_ip = <<"">>, @@ -100,8 +101,6 @@ to_audit(#{from := event, event := Event}) -> }; to_audit(#{from := erlang_console, function := F, args := Args}) -> #?AUDIT{ - created_at = erlang:system_time(microsecond), - node = node(), from = erlang_console, source = <<"">>, source_ip = <<"">>, @@ -127,7 +126,12 @@ log(Level, Meta1) -> emqx_audit:log(Meta2). log(Log) -> - gen_server:cast(?MODULE, {write, to_audit(Log)}). + Audit0 = to_audit(Log), + Audit = Audit0#?AUDIT{ + node = node(), + created_at = erlang:system_time(microsecond) + }, + mria:dirty_write(?AUDIT, Audit). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -140,27 +144,28 @@ init([]) -> {record_name, ?AUDIT}, {attributes, record_info(fields, ?AUDIT)} ]), - {ok, #{}, {continue, setup}}. + case mria_rlog:role() of + core -> {ok, #{}, {continue, setup}}; + _ -> {ok, #{}} + end. -handle_continue(setup, #{} = State) -> +handle_continue(setup, State) -> ok = mria:wait_for_tables([?AUDIT]), clean_expired(), - {noreply, State}. + Interval = clean_expired_interval(), + {noreply, State#{interval => Interval}, Interval}. -handle_call(_Request, _From, State = #{}) -> - {reply, ok, State}. 
+handle_call(_Request, _From, State = #{interval := Interval}) -> + {reply, ignore, State, Interval}. -handle_cast({write, Log}, State) -> - _ = write_log(Log), - {noreply, State#{}, ?CLEAN_EXPIRED_MS}; -handle_cast(_Request, State = #{}) -> - {noreply, State}. +handle_cast(_Request, State = #{interval := Interval}) -> + {noreply, State, Interval}. -handle_info(timeout, State = #{}) -> +handle_info(timeout, State = #{interval := Interval}) -> clean_expired(), - {noreply, State, hibernate}; -handle_info(_Info, State = #{}) -> - {noreply, State}. + {noreply, State, Interval}; +handle_info(_Info, State = #{interval := Interval}) -> + {noreply, State, Interval}. terminate(_Reason, _State = #{}) -> ok. @@ -172,50 +177,35 @@ code_change(_OldVsn, State = #{}, _Extra) -> %%% Internal functions %%%=================================================================== -write_log(Log) -> - case - mria:transaction( - ?COMMON_SHARD, - fun(L) -> - New = - case mnesia:last(?AUDIT) of - '$end_of_table' -> 1; - LastId -> LastId + 1 - end, - mnesia:write(L#?AUDIT{seq = New}) - end, - [Log] - ) - of - {atomic, ok} -> - ok; - Reason -> - ?SLOG(warning, #{ - msg => "write_audit_log_failed", - reason => Reason - }) - end. - clean_expired() -> MaxSize = max_size(), - LatestId = latest_id(), - Min = LatestId - MaxSize, - %% MS = ets:fun2ms(fun(#?AUDIT{seq = Seq}) when Seq =< Min -> true end), - MS = [{#?AUDIT{seq = '$1', _ = '_'}, [{'=<', '$1', Min}], [true]}], - NumDeleted = mnesia:ets(fun ets:select_delete/2, [?AUDIT, MS]), - ?SLOG(debug, #{ - msg => "clean_audit_log", - latest_id => LatestId, - min => Min, - deleted_number => NumDeleted - }), - ok. - -latest_id() -> - case mnesia:dirty_last(?AUDIT) of - '$end_of_table' -> 0; - Seq -> Seq + CurSize = mnesia:table_info(?AUDIT, size), + case CurSize - MaxSize of + DelCount when DelCount > 0 -> + mria:async_dirty( + ?COMMON_SHARD, + fun ?MODULE:dirty_clean_expired/1, + [DelCount] + ); + _ -> + ok end. 
+dirty_clean_expired(DelCount) -> + dirty_clean_expired(mnesia:dirty_first(?AUDIT), DelCount). + +dirty_clean_expired(_, DelCount) when DelCount =< 0 -> ok; +dirty_clean_expired('$end_of_table', _DelCount) -> + ok; +dirty_clean_expired(CurKey, DeleteCount) -> + mnesia:dirty_delete(?AUDIT, CurKey), + dirty_clean_expired(mnesia:dirty_next(?AUDIT, CurKey), DeleteCount - 1). + max_size() -> emqx_conf:get([log, audit, max_filter_size], 5000). + +%% Try to give each node a different cleanup interval, so nodes do not all clean at once. +%% 2 * Interval ~ 3 * Interval (5000~7500) +clean_expired_interval() -> + Interval = ?INTERVAL, + Interval * 2 + erlang:phash2(node(), Interval). diff --git a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl index 6fb860b6e..fe6ddfa96 100644 --- a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl +++ b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl @@ -121,6 +121,35 @@ t_cli(_Config) -> ?assertMatch(#{<<"data">> := []}, emqx_utils_json:decode(Res2, [return_maps])), ok. +t_max_size(_Config) -> + {ok, _} = emqx:update_config([log, audit, max_filter_size], 1000), + SizeFun = + fun() -> + AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Limit = "limit=1000", + {ok, Res} = emqx_mgmt_api_test_util:request_api(get, AuditPath, Limit, AuthHeader), + #{<<"data">> := Data} = emqx_utils_json:decode(Res, [return_maps]), + erlang:length(Data) + end, + InitSize = SizeFun(), + lists:foreach( + fun(_) -> + ok = emqx_ctl:run_command(["conf", "show", "log"]) + end, + lists:duplicate(100, 1) + ), + timer:sleep(110), + Size1 = SizeFun(), + ?assert(Size1 - InitSize >= 100, {Size1, InitSize}), + {ok, _} = emqx:update_config([log, audit, max_filter_size], 10), + %% wait for clean_expired + timer:sleep(500), + ExpectSize = emqx:get_config([log, audit, max_filter_size]), + Size2 = SizeFun(), + ?assertEqual(ExpectSize, Size2, {sys:get_state(emqx_audit)}), + ok. 
+ t_kickout_clients_without_log(_) -> process_flag(trap_exit, true), AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),