fix: dont use transation on audit log

This commit is contained in:
zhongwencool 2023-10-25 16:09:11 +08:00
parent efd0cfbda9
commit 9bb22507df
3 changed files with 84 additions and 66 deletions

View File

@ -5,7 +5,6 @@
-define(AUDIT, emqx_audit). -define(AUDIT, emqx_audit).
-record(?AUDIT, { -record(?AUDIT, {
seq,
%% basic info %% basic info
created_at, created_at,
node, node,

View File

@ -14,6 +14,8 @@
-export([start_link/0]). -export([start_link/0]).
-export([log/1, log/2]). -export([log/1, log/2]).
-export([dirty_clean_expired/1]).
%% gen_server callbacks %% gen_server callbacks
-export([ -export([
init/1, init/1,
@ -26,12 +28,15 @@
]). ]).
-define(FILTER_REQ, [cert, host_info, has_sent_resp, pid, path_info, peer, ref, sock, streamid]). -define(FILTER_REQ, [cert, host_info, has_sent_resp, pid, path_info, peer, ref, sock, streamid]).
-define(CLEAN_EXPIRED_MS, 60 * 1000).
-ifdef(TEST).
-define(INTERVAL, 100).
-else.
-define(INTERVAL, 2500).
-endif.
to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) -> to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) ->
#?AUDIT{ #?AUDIT{
created_at = erlang:system_time(microsecond),
node = node(),
operation_id = <<"">>, operation_id = <<"">>,
operation_type = atom_to_binary(Cmd), operation_type = atom_to_binary(Cmd),
args = Args, args = Args,
@ -62,8 +67,6 @@ to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api
duration_ms := DurationMs duration_ms := DurationMs
} = Log, } = Log,
#?AUDIT{ #?AUDIT{
created_at = erlang:system_time(microsecond),
node = node(),
from = From, from = From,
source = Source, source = Source,
source_ip = SourceIp, source_ip = SourceIp,
@ -81,8 +84,6 @@ to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api
}; };
to_audit(#{from := event, event := Event}) -> to_audit(#{from := event, event := Event}) ->
#?AUDIT{ #?AUDIT{
created_at = erlang:system_time(microsecond),
node = node(),
from = event, from = event,
source = <<"">>, source = <<"">>,
source_ip = <<"">>, source_ip = <<"">>,
@ -100,8 +101,6 @@ to_audit(#{from := event, event := Event}) ->
}; };
to_audit(#{from := erlang_console, function := F, args := Args}) -> to_audit(#{from := erlang_console, function := F, args := Args}) ->
#?AUDIT{ #?AUDIT{
created_at = erlang:system_time(microsecond),
node = node(),
from = erlang_console, from = erlang_console,
source = <<"">>, source = <<"">>,
source_ip = <<"">>, source_ip = <<"">>,
@ -127,7 +126,12 @@ log(Level, Meta1) ->
emqx_audit:log(Meta2). emqx_audit:log(Meta2).
log(Log) -> log(Log) ->
gen_server:cast(?MODULE, {write, to_audit(Log)}). Audit0 = to_audit(Log),
Audit = Audit0#?AUDIT{
node = node(),
created_at = erlang:system_time(microsecond)
},
mria:dirty_write(?AUDIT, Audit).
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@ -140,27 +144,28 @@ init([]) ->
{record_name, ?AUDIT}, {record_name, ?AUDIT},
{attributes, record_info(fields, ?AUDIT)} {attributes, record_info(fields, ?AUDIT)}
]), ]),
{ok, #{}, {continue, setup}}. case mria_rlog:role() of
core -> {ok, #{}, {continue, setup}};
_ -> {ok, #{}}
end.
handle_continue(setup, #{} = State) -> handle_continue(setup, State) ->
ok = mria:wait_for_tables([?AUDIT]), ok = mria:wait_for_tables([?AUDIT]),
clean_expired(), clean_expired(),
{noreply, State}. Interval = clean_expired_interval(),
{noreply, State#{interval => Interval}, Interval}.
handle_call(_Request, _From, State = #{}) -> handle_call(_Request, _From, State = #{interval := Interval}) ->
{reply, ok, State}. {reply, ignore, State, Interval}.
handle_cast({write, Log}, State) -> handle_cast(_Request, State = #{interval := Interval}) ->
_ = write_log(Log), {noreply, State, Interval}.
{noreply, State#{}, ?CLEAN_EXPIRED_MS};
handle_cast(_Request, State = #{}) ->
{noreply, State}.
handle_info(timeout, State = #{}) -> handle_info(timeout, State = #{interval := Interval}) ->
clean_expired(), clean_expired(),
{noreply, State, hibernate}; {noreply, State, Interval};
handle_info(_Info, State = #{}) -> handle_info(_Info, State = #{interval := Interval}) ->
{noreply, State}. {noreply, State, Interval}.
terminate(_Reason, _State = #{}) -> terminate(_Reason, _State = #{}) ->
ok. ok.
@ -172,50 +177,35 @@ code_change(_OldVsn, State = #{}, _Extra) ->
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
write_log(Log) ->
case
mria:transaction(
?COMMON_SHARD,
fun(L) ->
New =
case mnesia:last(?AUDIT) of
'$end_of_table' -> 1;
LastId -> LastId + 1
end,
mnesia:write(L#?AUDIT{seq = New})
end,
[Log]
)
of
{atomic, ok} ->
ok;
Reason ->
?SLOG(warning, #{
msg => "write_audit_log_failed",
reason => Reason
})
end.
clean_expired() -> clean_expired() ->
MaxSize = max_size(), MaxSize = max_size(),
LatestId = latest_id(), CurSize = mnesia:table_info(?AUDIT, size),
Min = LatestId - MaxSize, case CurSize - MaxSize of
%% MS = ets:fun2ms(fun(#?AUDIT{seq = Seq}) when Seq =< Min -> true end), DelCount when DelCount > 0 ->
MS = [{#?AUDIT{seq = '$1', _ = '_'}, [{'=<', '$1', Min}], [true]}], mria:async_dirty(
NumDeleted = mnesia:ets(fun ets:select_delete/2, [?AUDIT, MS]), ?COMMON_SHARD,
?SLOG(debug, #{ fun ?MODULE:dirty_clean_expired/1,
msg => "clean_audit_log", [DelCount]
latest_id => LatestId, );
min => Min, _ ->
deleted_number => NumDeleted ok
}),
ok.
latest_id() ->
case mnesia:dirty_last(?AUDIT) of
'$end_of_table' -> 0;
Seq -> Seq
end. end.
dirty_clean_expired(DelCount) ->
dirty_clean_expired(mnesia:dirty_first(?AUDIT), DelCount).
dirty_clean_expired(_, DelCount) when DelCount =< 0 -> ok;
dirty_clean_expired('$end_of_table', _DelCount) ->
ok;
dirty_clean_expired(CurKey, DeleteCount) ->
mnesia:dirty_delete(?AUDIT, CurKey),
dirty_clean_expired(mnesia:dirty_next(?AUDIT, CurKey), DeleteCount - 1).
max_size() -> max_size() ->
emqx_conf:get([log, audit, max_filter_size], 5000). emqx_conf:get([log, audit, max_filter_size], 5000).
%% Try to make the time interval of each node is different.
%% 2 * Interval ~ 3 * Interval (5000~7500)
clean_expired_interval() ->
Interval = ?INTERVAL,
Interval * 2 + erlang:phash2(node(), Interval).

View File

@ -121,6 +121,35 @@ t_cli(_Config) ->
?assertMatch(#{<<"data">> := []}, emqx_utils_json:decode(Res2, [return_maps])), ?assertMatch(#{<<"data">> := []}, emqx_utils_json:decode(Res2, [return_maps])),
ok. ok.
t_max_size(_Config) ->
{ok, _} = emqx:update_config([log, audit, max_filter_size], 1000),
SizeFun =
fun() ->
AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Limit = "limit=1000",
{ok, Res} = emqx_mgmt_api_test_util:request_api(get, AuditPath, Limit, AuthHeader),
#{<<"data">> := Data} = emqx_utils_json:decode(Res, [return_maps]),
erlang:length(Data)
end,
InitSize = SizeFun(),
lists:foreach(
fun(_) ->
ok = emqx_ctl:run_command(["conf", "show", "log"])
end,
lists:duplicate(100, 1)
),
timer:sleep(110),
Size1 = SizeFun(),
?assert(Size1 - InitSize >= 100, {Size1, InitSize}),
{ok, _} = emqx:update_config([log, audit, max_filter_size], 10),
%% wait for clean_expired
timer:sleep(500),
ExpectSize = emqx:get_config([log, audit, max_filter_size]),
Size2 = SizeFun(),
?assertEqual(ExpectSize, Size2, {sys:get_state(emqx_audit)}),
ok.
t_kickout_clients_without_log(_) -> t_kickout_clients_without_log(_) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]), AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),