From 9bb22507df4cc266b325b7f4c7a4554b6b9834c5 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 25 Oct 2023 16:09:11 +0800 Subject: [PATCH 1/4] fix: dont use transation on audit log --- apps/emqx_audit/include/emqx_audit.hrl | 1 - apps/emqx_audit/src/emqx_audit.erl | 120 ++++++++---------- apps/emqx_audit/test/emqx_audit_api_SUITE.erl | 29 +++++ 3 files changed, 84 insertions(+), 66 deletions(-) diff --git a/apps/emqx_audit/include/emqx_audit.hrl b/apps/emqx_audit/include/emqx_audit.hrl index 1b4349387..8304a9060 100644 --- a/apps/emqx_audit/include/emqx_audit.hrl +++ b/apps/emqx_audit/include/emqx_audit.hrl @@ -5,7 +5,6 @@ -define(AUDIT, emqx_audit). -record(?AUDIT, { - seq, %% basic info created_at, node, diff --git a/apps/emqx_audit/src/emqx_audit.erl b/apps/emqx_audit/src/emqx_audit.erl index 4477bbd8b..d00aa493b 100644 --- a/apps/emqx_audit/src/emqx_audit.erl +++ b/apps/emqx_audit/src/emqx_audit.erl @@ -14,6 +14,8 @@ -export([start_link/0]). -export([log/1, log/2]). +-export([dirty_clean_expired/1]). + %% gen_server callbacks -export([ init/1, @@ -26,12 +28,15 @@ ]). -define(FILTER_REQ, [cert, host_info, has_sent_resp, pid, path_info, peer, ref, sock, streamid]). --define(CLEAN_EXPIRED_MS, 60 * 1000). + +-ifdef(TEST). +-define(INTERVAL, 100). +-else. +-define(INTERVAL, 2500). +-endif. to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) -> #?AUDIT{ - created_at = erlang:system_time(microsecond), - node = node(), operation_id = <<"">>, operation_type = atom_to_binary(Cmd), args = Args, @@ -62,8 +67,6 @@ to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api duration_ms := DurationMs } = Log, #?AUDIT{ - created_at = erlang:system_time(microsecond), - node = node(), from = From, source = Source, source_ip = SourceIp, @@ -81,8 +84,6 @@ to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api }; to_audit(#{from := event, event := Event}) -> #?AUDIT{ - created_at = erlang:system_time(microsecond), - node = node(), from = event, source = <<"">>, source_ip = <<"">>, @@ -100,8 +101,6 @@ to_audit(#{from := event, event := Event}) -> }; to_audit(#{from := erlang_console, function := F, args := Args}) -> #?AUDIT{ - created_at = erlang:system_time(microsecond), - node = node(), from = erlang_console, source = <<"">>, source_ip = <<"">>, @@ -127,7 +126,12 @@ log(Level, Meta1) -> emqx_audit:log(Meta2). log(Log) -> - gen_server:cast(?MODULE, {write, to_audit(Log)}). + Audit0 = to_audit(Log), + Audit = Audit0#?AUDIT{ + node = node(), + created_at = erlang:system_time(microsecond) + }, + mria:dirty_write(?AUDIT, Audit). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -140,27 +144,28 @@ init([]) -> {record_name, ?AUDIT}, {attributes, record_info(fields, ?AUDIT)} ]), - {ok, #{}, {continue, setup}}. + case mria_rlog:role() of + core -> {ok, #{}, {continue, setup}}; + _ -> {ok, #{}} + end. -handle_continue(setup, #{} = State) -> +handle_continue(setup, State) -> ok = mria:wait_for_tables([?AUDIT]), clean_expired(), - {noreply, State}. + Interval = clean_expired_interval(), + {noreply, State#{interval => Interval}, Interval}. -handle_call(_Request, _From, State = #{}) -> - {reply, ok, State}. +handle_call(_Request, _From, State = #{interval := Interval}) -> + {reply, ignore, State, Interval}. -handle_cast({write, Log}, State) -> - _ = write_log(Log), - {noreply, State#{}, ?CLEAN_EXPIRED_MS}; -handle_cast(_Request, State = #{}) -> - {noreply, State}. +handle_cast(_Request, State = #{interval := Interval}) -> + {noreply, State, Interval}. -handle_info(timeout, State = #{}) -> +handle_info(timeout, State = #{interval := Interval}) -> clean_expired(), - {noreply, State, hibernate}; -handle_info(_Info, State = #{}) -> - {noreply, State}. + {noreply, State, Interval}; +handle_info(_Info, State = #{interval := Interval}) -> + {noreply, State, Interval}. terminate(_Reason, _State = #{}) -> ok. @@ -172,50 +177,35 @@ code_change(_OldVsn, State = #{}, _Extra) -> %%% Internal functions %%%=================================================================== -write_log(Log) -> - case - mria:transaction( - ?COMMON_SHARD, - fun(L) -> - New = - case mnesia:last(?AUDIT) of - '$end_of_table' -> 1; - LastId -> LastId + 1 - end, - mnesia:write(L#?AUDIT{seq = New}) - end, - [Log] - ) - of - {atomic, ok} -> - ok; - Reason -> - ?SLOG(warning, #{ - msg => "write_audit_log_failed", - reason => Reason - }) - end. - clean_expired() -> MaxSize = max_size(), - LatestId = latest_id(), - Min = LatestId - MaxSize, - %% MS = ets:fun2ms(fun(#?AUDIT{seq = Seq}) when Seq =< Min -> true end), - MS = [{#?AUDIT{seq = '$1', _ = '_'}, [{'=<', '$1', Min}], [true]}], - NumDeleted = mnesia:ets(fun ets:select_delete/2, [?AUDIT, MS]), - ?SLOG(debug, #{ - msg => "clean_audit_log", - latest_id => LatestId, - min => Min, - deleted_number => NumDeleted - }), - ok. - -latest_id() -> - case mnesia:dirty_last(?AUDIT) of - '$end_of_table' -> 0; - Seq -> Seq + CurSize = mnesia:table_info(?AUDIT, size), + case CurSize - MaxSize of + DelCount when DelCount > 0 -> + mria:async_dirty( + ?COMMON_SHARD, + fun ?MODULE:dirty_clean_expired/1, + [DelCount] + ); + _ -> + ok end. +dirty_clean_expired(DelCount) -> + dirty_clean_expired(mnesia:dirty_first(?AUDIT), DelCount). + +dirty_clean_expired(_, DelCount) when DelCount =< 0 -> ok; +dirty_clean_expired('$end_of_table', _DelCount) -> + ok; +dirty_clean_expired(CurKey, DeleteCount) -> + mnesia:dirty_delete(?AUDIT, CurKey), + dirty_clean_expired(mnesia:dirty_next(?AUDIT, CurKey), DeleteCount - 1). + max_size() -> emqx_conf:get([log, audit, max_filter_size], 5000). + +%% Try to make the time interval of each node is different. +%% 2 * Interval ~ 3 * Interval (5000~7500) +clean_expired_interval() -> + Interval = ?INTERVAL, + Interval * 2 + erlang:phash2(node(), Interval). diff --git a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl index 6fb860b6e..fe6ddfa96 100644 --- a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl +++ b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl @@ -121,6 +121,35 @@ t_cli(_Config) -> ?assertMatch(#{<<"data">> := []}, emqx_utils_json:decode(Res2, [return_maps])), ok. +t_max_size(_Config) -> + {ok, _} = emqx:update_config([log, audit, max_filter_size], 1000), + SizeFun = + fun() -> + AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Limit = "limit=1000", + {ok, Res} = emqx_mgmt_api_test_util:request_api(get, AuditPath, Limit, AuthHeader), + #{<<"data">> := Data} = emqx_utils_json:decode(Res, [return_maps]), + erlang:length(Data) + end, + InitSize = SizeFun(), + lists:foreach( + fun(_) -> + ok = emqx_ctl:run_command(["conf", "show", "log"]) + end, + lists:duplicate(100, 1) + ), + timer:sleep(110), + Size1 = SizeFun(), + ?assert(Size1 - InitSize >= 100, {Size1, InitSize}), + {ok, _} = emqx:update_config([log, audit, max_filter_size], 10), + %% wait for clean_expired + timer:sleep(500), + ExpectSize = emqx:get_config([log, audit, max_filter_size]), + Size2 = SizeFun(), + ?assertEqual(ExpectSize, Size2, {sys:get_state(emqx_audit)}), + ok. + t_kickout_clients_without_log(_) -> process_flag(trap_exit, true), AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]), From 995948f0e84608dce7e4e1f6eb807b9e40772fb2 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 27 Oct 2023 11:49:04 +0800 Subject: [PATCH 2/4] refactor: remove seq from audit record --- apps/emqx/include/emqx_trace.hrl | 1 - apps/emqx/include/logger.hrl | 12 +- apps/emqx/src/config/emqx_config_logger.erl | 24 +-- apps/emqx/src/emqx_trace/emqx_trace.erl | 2 +- apps/emqx_audit/src/emqx_audit.erl | 164 +++++++++++------- apps/emqx_audit/src/emqx_audit_api.erl | 4 +- apps/emqx_audit/test/emqx_audit_api_SUITE.erl | 3 +- apps/emqx_conf/src/emqx_conf_cli.erl | 5 +- apps/emqx_dashboard/src/emqx_dashboard.erl | 10 +- .../src/emqx_dashboard_audit.erl | 9 +- apps/emqx_machine/src/emqx_machine_boot.erl | 4 - .../src/emqx_machine_terminator.erl | 7 +- rel/i18n/emqx_audit_api.hocon | 3 +- rel/i18n/emqx_conf_schema.hocon | 4 +- 14 files changed, 141 insertions(+), 111 deletions(-) diff --git a/apps/emqx/include/emqx_trace.hrl b/apps/emqx/include/emqx_trace.hrl index 62028bcc0..3f9316727 100644 --- a/apps/emqx/include/emqx_trace.hrl +++ b/apps/emqx/include/emqx_trace.hrl @@ -32,6 +32,5 @@ -define(SHARD, ?COMMON_SHARD). -define(MAX_SIZE, 30). --define(OWN_KEYS, [level, filters, filter_default, handlers]). -endif. diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index 67f125e5f..904fae0d6 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -40,7 +40,9 @@ end ). +-define(AUDIT_HANDLER, emqx_audit). -define(TRACE_FILTER, emqx_trace_filter). +-define(OWN_KEYS, [level, filters, filter_default, handlers]). -define(TRACE(Tag, Msg, Meta), ?TRACE(debug, Tag, Msg, Meta)). @@ -62,15 +64,15 @@ end). -define(AUDIT(_LevelFun_, _MetaFun_), begin - case emqx_config:get([log, audit], #{enable => false}) of - #{enable := false} -> + case logger_config:get(logger, ?AUDIT_HANDLER) of + {error, {not_found, _}} -> ok; - #{enable := true, level := _AllowLevel_} -> + {ok, Handler = #{level := _AllowLevel_}} -> _Level_ = _LevelFun_, case logger:compare_levels(_AllowLevel_, _Level_) of _R_ when _R_ == lt; _R_ == eq -> - emqx_audit:log(_Level_, _MetaFun_); - gt -> + emqx_audit:log(_Level_, _MetaFun_, Handler); + _ -> ok end end diff --git a/apps/emqx/src/config/emqx_config_logger.erl b/apps/emqx/src/config/emqx_config_logger.erl index 87baef627..ce74db8f0 100644 --- a/apps/emqx/src/config/emqx_config_logger.erl +++ b/apps/emqx/src/config/emqx_config_logger.erl @@ -26,7 +26,6 @@ -include("logger.hrl"). -define(LOG, [log]). --define(AUDIT_HANDLER, emqx_audit). add_handler() -> ok = emqx_config_handler:add_handler(?LOG, ?MODULE), @@ -97,8 +96,11 @@ update_log_handlers(NewHandlers) -> ok = application:set_env(kernel, logger, NewHandlers), ok. +%% Don't remove audit log handler here, we need record this removed action into audit log file. +%% we will remove audit log handler after audit log is record in emqx_audit:log/3. +update_log_handler({removed, ?AUDIT_HANDLER}) -> + ok; update_log_handler({removed, Id}) -> - audit("audit_disabled", Id), log_to_console("Config override: ~s is removed~n", [id_for_log(Id)]), logger:remove_handler(Id); update_log_handler({Action, {handler, Id, Mod, Conf}}) -> @@ -107,7 +109,6 @@ update_log_handler({Action, {handler, Id, Mod, Conf}}) -> _ = logger:remove_handler(Id), case logger:add_handler(Id, Mod, Conf) of ok -> - audit("audit_enabled", Id), ok; %% Don't crash here, otherwise the cluster rpc will retry the wrong handler forever. {error, Reason} -> @@ -118,23 +119,6 @@ update_log_handler({Action, {handler, Id, Mod, Conf}}) -> end, ok. --ifdef(EMQX_RELEASE_EDITION). - --if(?EMQX_RELEASE_EDITION == ee). -audit(Event, ?AUDIT_HANDLER) -> - emqx_audit:log(alert, #{event => Event, from => event}); -audit(_, _) -> - ok. --else. -audit(_, _) -> - ok. --endif. - --else. -audit(_, _) -> - ok. --endif. - id_for_log(console) -> "log.console"; id_for_log(Other) -> "log.file." ++ atom_to_list(Other). diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 99bbcc5f9..6588c99dc 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -105,7 +105,7 @@ log_filter([{Id, FilterFun, Filter, Name} | Rest], Log0) -> ignore -> ignore; Log -> - case logger_config:get(ets:whereis(logger), Id) of + case logger_config:get(logger, Id) of {ok, #{module := Module} = HandlerConfig0} -> HandlerConfig = maps:without(?OWN_KEYS, HandlerConfig0), try diff --git a/apps/emqx_audit/src/emqx_audit.erl b/apps/emqx_audit/src/emqx_audit.erl index d00aa493b..ceaf22507 100644 --- a/apps/emqx_audit/src/emqx_audit.erl +++ b/apps/emqx_audit/src/emqx_audit.erl @@ -12,9 +12,9 @@ %% API -export([start_link/0]). --export([log/1, log/2]). +-export([log/3]). --export([dirty_clean_expired/1]). +-export([trans_clean_expired/2]). %% gen_server callbacks -export([ @@ -32,7 +32,7 @@ -ifdef(TEST). -define(INTERVAL, 100). -else. --define(INTERVAL, 2500). +-define(INTERVAL, 10000). -endif. to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) -> @@ -50,8 +50,6 @@ to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) -> http_method = <<"">>, http_request = <<"">> }; -to_audit(#{http_method := get}) -> - ok; to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api -> #{ source := Source, @@ -82,23 +80,6 @@ to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api duration_ms = DurationMs, args = <<"">> }; -to_audit(#{from := event, event := Event}) -> - #?AUDIT{ - from = event, - source = <<"">>, - source_ip = <<"">>, - %% operation info - operation_id = iolist_to_binary(Event), - operation_type = <<"">>, - operation_result = <<"">>, - failure = <<"">>, - %% request detail - http_status_code = <<"">>, - http_method = <<"">>, - http_request = <<"">>, - duration_ms = 0, - args = <<"">> - }; to_audit(#{from := erlang_console, function := F, args := Args}) -> #?AUDIT{ from = erlang_console, @@ -117,15 +98,22 @@ to_audit(#{from := erlang_console, function := F, args := Args}) -> args = iolist_to_binary(io_lib:format("~p: ~p~n", [F, Args])) }. -log(_Level, undefined) -> +log(_Level, undefined, _Handler) -> ok; -log(Level, Meta1) -> +log(Level, Meta1, Handler) -> Meta2 = Meta1#{time => logger:timestamp(), level => Level}, - Filter = [{emqx_audit, fun(L, _) -> L end, undefined, undefined}], - emqx_trace:log(Level, Filter, undefined, Meta2), - emqx_audit:log(Meta2). + log_to_file(Level, Meta2, Handler), + log_to_db(Meta2), + remove_handler_when_disabled(). -log(Log) -> +remove_handler_when_disabled() -> + case emqx_config:get([log, audit, enable], false) of + true -> ok; + false -> _ = logger:remove_handler(?AUDIT_HANDLER) + end, + ok. + +log_to_db(Log) -> Audit0 = to_audit(Log), Audit = Audit0#?AUDIT{ node = node(), @@ -144,68 +132,112 @@ init([]) -> {record_name, ?AUDIT}, {attributes, record_info(fields, ?AUDIT)} ]), - case mria_rlog:role() of - core -> {ok, #{}, {continue, setup}}; - _ -> {ok, #{}} - end. + {ok, #{}, {continue, setup}}. handle_continue(setup, State) -> ok = mria:wait_for_tables([?AUDIT]), - clean_expired(), - Interval = clean_expired_interval(), - {noreply, State#{interval => Interval}, Interval}. + NewState = State#{role => mria_rlog:role()}, + ?AUDIT(alert, #{ + cmd => emqx, + args => ["start"], + version => emqx_release:version(), + from => cli, + duration_ms => 0 + }), + {noreply, NewState, interval(NewState)}. -handle_call(_Request, _From, State = #{interval := Interval}) -> - {reply, ignore, State, Interval}. +handle_call(_Request, _From, State) -> + {reply, ignore, State, interval(State)}. -handle_cast(_Request, State = #{interval := Interval}) -> - {noreply, State, Interval}. +handle_cast(_Request, State) -> + {noreply, State, interval(State)}. -handle_info(timeout, State = #{interval := Interval}) -> - clean_expired(), - {noreply, State, Interval}; -handle_info(_Info, State = #{interval := Interval}) -> - {noreply, State, Interval}. +handle_info(timeout, State) -> + ExtraWait = clean_expired_logs(), + {noreply, State, interval(State) + ExtraWait}; +handle_info(_Info, State) -> + {noreply, State, interval(State)}. -terminate(_Reason, _State = #{}) -> +terminate(_Reason, _State) -> ok. -code_change(_OldVsn, State = #{}, _Extra) -> +code_change(_OldVsn, State, _Extra) -> {ok, State}. %%%=================================================================== %%% Internal functions %%%=================================================================== -clean_expired() -> +%% if clean_expired transaction aborted, it will be scheduled with extra 60 seconds. +clean_expired_logs() -> MaxSize = max_size(), + Oldest = mnesia:dirty_first(?AUDIT), CurSize = mnesia:table_info(?AUDIT, size), case CurSize - MaxSize of - DelCount when DelCount > 0 -> - mria:async_dirty( - ?COMMON_SHARD, - fun ?MODULE:dirty_clean_expired/1, - [DelCount] - ); + DelSize when DelSize > 0 -> + case + mria:transaction( + ?COMMON_SHARD, + fun ?MODULE:trans_clean_expired/2, + [Oldest, DelSize] + ) + of + {atomic, ok} -> + 0; + {aborted, Reason} -> + ?SLOG(error, #{ + msg => "clean_expired_audit_aborted", + reason => Reason, + delete_size => DelSize, + current_size => CurSize, + max_count => MaxSize + }), + 60000 + end; _ -> - ok + 0 end. -dirty_clean_expired(DelCount) -> - dirty_clean_expired(mnesia:dirty_first(?AUDIT), DelCount). +trans_clean_expired(Oldest, DelCount) -> + First = mnesia:first(?AUDIT), + %% Other node already clean from the oldest record. + %% ensure not delete twice, otherwise records that should not be deleted will be deleted. + case First =:= Oldest of + true -> do_clean_expired(First, DelCount); + false -> ok + end. -dirty_clean_expired(_, DelCount) when DelCount =< 0 -> ok; -dirty_clean_expired('$end_of_table', _DelCount) -> +do_clean_expired(_, DelSize) when DelSize =< 0 -> ok; +do_clean_expired('$end_of_table', _DelSize) -> ok; -dirty_clean_expired(CurKey, DeleteCount) -> - mnesia:dirty_delete(?AUDIT, CurKey), - dirty_clean_expired(mnesia:dirty_next(?AUDIT, CurKey), DeleteCount - 1). +do_clean_expired(CurKey, DeleteSize) -> + mnesia:delete(?AUDIT, CurKey, sticky_write), + do_clean_expired(mnesia:next(?AUDIT, CurKey), DeleteSize - 1). max_size() -> emqx_conf:get([log, audit, max_filter_size], 5000). -%% Try to make the time interval of each node is different. -%% 2 * Interval ~ 3 * Interval (5000~7500) -clean_expired_interval() -> - Interval = ?INTERVAL, - Interval * 2 + erlang:phash2(node(), Interval). +interval(#{role := replicant}) -> hibernate; +interval(#{role := core}) -> ?INTERVAL + rand:uniform(?INTERVAL). + +log_to_file(Level, Meta, #{module := Module} = Handler) -> + Log = #{level => Level, meta => Meta, msg => undefined}, + Handler1 = maps:without(?OWN_KEYS, Handler), + try + erlang:apply(Module, log, [Log, Handler1]) + catch + C:R:S -> + case logger:remove_handler(?AUDIT_HANDLER) of + ok -> + logger:internal_log( + error, {removed_failing_handler, ?AUDIT_HANDLER, C, R, S} + ); + {error, {not_found, _}} -> + ok; + {error, Reason} -> + logger:internal_log( + error, + {removed_handler_failed, ?AUDIT_HANDLER, Reason, C, R, S} + ) + end + end. diff --git a/apps/emqx_audit/src/emqx_audit_api.erl b/apps/emqx_audit/src/emqx_audit_api.erl index aaa364464..81553dca0 100644 --- a/apps/emqx_audit/src/emqx_audit_api.erl +++ b/apps/emqx_audit/src/emqx_audit_api.erl @@ -59,7 +59,7 @@ schema("/audit") -> desc => ?DESC(filter_node) })}, {from, - ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console, event]), #{ + ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console]), #{ in => query, required => false, example => <<"dashboard">>, @@ -175,7 +175,7 @@ fields(audit) -> desc => "The node name to which the log is created" })}, {from, - ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console, event]), #{ + ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console]), #{ desc => "The source type of the log" })}, {source, diff --git a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl index fe6ddfa96..eebb8f770 100644 --- a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl +++ b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl @@ -49,6 +49,7 @@ init_per_suite(Config) -> emqx_mgmt_api_test_util:init_suite([emqx_ctl, emqx_conf, emqx_audit]), ok = emqx_common_test_helpers:load_config(emqx_enterprise_schema, ?CONF_DEFAULT), emqx_config:save_schema_mod_and_names(emqx_enterprise_schema), + ok = emqx_config_logger:refresh_config(), application:set_env(emqx, boot_modules, []), emqx_conf_cli:load(), Config. @@ -144,7 +145,7 @@ t_max_size(_Config) -> ?assert(Size1 - InitSize >= 100, {Size1, InitSize}), {ok, _} = emqx:update_config([log, audit, max_filter_size], 10), %% wait for clean_expired - timer:sleep(500), + timer:sleep(250), ExpectSize = emqx:get_config([log, audit, max_filter_size]), Size2 = SizeFun(), ?assertEqual(ExpectSize, Size2, {sys:get_state(emqx_audit)}), diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index b47d1f961..a5082a419 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -40,7 +40,10 @@ load() -> emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, [hidden]), emqx_ctl:register_command(?CONF, {?MODULE, conf}, []), - emqx_ctl:register_command(?AUDIT_MOD, {?MODULE, audit}, [hidden]), + case emqx_release:edition() of + ee -> emqx_ctl:register_command(?AUDIT_MOD, {?MODULE, audit}, [hidden]); + ce -> ok + end, ok. unload() -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard.erl b/apps/emqx_dashboard/src/emqx_dashboard.erl index cf4330e34..ab6204235 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard.erl @@ -72,7 +72,7 @@ start_listeners(Listeners) -> base_path => emqx_dashboard_swagger:base_path(), modules => minirest_api:find_api_modules(apps()), authorization => Authorization, - log => fun emqx_dashboard_audit:log/2, + log => audit_log_fun(), security => [#{'basicAuth' => []}, #{'bearerAuth' => []}], swagger_global_spec => GlobalSpec, dispatch => dispatch(), @@ -210,9 +210,17 @@ filter_false(K, V, S) -> [{K, V} | S]. listener_name(Protocol) -> list_to_atom(atom_to_list(Protocol) ++ ":dashboard"). +audit_log_fun() -> + case emqx_release:edition() of + ee -> fun emqx_dashboard_audit:log/2; + ce -> undefined + end. + -if(?EMQX_RELEASE_EDITION =/= ee). + %% dialyzer complains about the `unauthorized_role' clause... -dialyzer({no_match, [authorize/1, api_key_authorize/3]}). + -endif. authorize(Req) -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_audit.erl b/apps/emqx_dashboard/src/emqx_dashboard_audit.erl index c2ef1a99f..4b51b2cb0 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_audit.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_audit.erl @@ -37,10 +37,11 @@ log(#{code := Code, method := Method} = Meta, Req) -> ?AUDIT(level(Method, Code), log_meta(Meta, Req)). log_meta(Meta, Req) -> - #{operation_id := OperationId} = Meta, + #{operation_id := OperationId, method := Method} = Meta, case - lists:member(OperationId, ?HIGH_FREQUENCY_REQUESTS) andalso - ignore_high_frequency_request() + Method =:= get orelse + (lists:member(OperationId, ?HIGH_FREQUENCY_REQUESTS) andalso + ignore_high_frequency_request()) of true -> undefined; @@ -53,7 +54,7 @@ log_meta(Meta, Req) -> source_ip => source_ip(Req), operation_type => operation_type(Meta), %% method for http filter api. - http_method => maps:get(method, Meta), + http_method => Method, http_request => http_request(Meta), http_status_code => maps:get(code, Meta), operation_result => operation_result(Meta), diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl index aa6180e23..026a82cbf 100644 --- a/apps/emqx_machine/src/emqx_machine_boot.erl +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -47,10 +47,6 @@ post_boot() -> ok = ensure_apps_started(), ok = print_vsn(), ok = start_autocluster(), - ?AUDIT(alert, #{ - event => "emqx_start", - from => event - }), ignore. -ifdef(TEST). diff --git a/apps/emqx_machine/src/emqx_machine_terminator.erl b/apps/emqx_machine/src/emqx_machine_terminator.erl index d43d5fea9..28603b9f6 100644 --- a/apps/emqx_machine/src/emqx_machine_terminator.erl +++ b/apps/emqx_machine/src/emqx_machine_terminator.erl @@ -68,8 +68,11 @@ graceful() -> %% @doc Shutdown the Erlang VM and wait indefinitely. graceful_wait() -> ?AUDIT(alert, #{ - event => "emqx_gracefully_stop", - from => event + cmd => emqx, + args => ["stop"], + version => emqx_release:version(), + from => cli, + duration_ms => element(1, erlang:statistics(wall_clock)) }), ok = graceful(), exit_loop(). diff --git a/rel/i18n/emqx_audit_api.hocon b/rel/i18n/emqx_audit_api.hocon index 37080838b..040c0009f 100644 --- a/rel/i18n/emqx_audit_api.hocon +++ b/rel/i18n/emqx_audit_api.hocon @@ -17,13 +17,12 @@ filter_from.desc: `rest_api`: API KEY request logs. `cli`: The emqx command line logs. `erlang_console`: The emqx remote_console run function logs. -`event`: Logs related to events such as emqx_start, emqx_gracefully_stop, audit_enabled, and audit_disabled.""" filter_source.desc: """"Filter logs based on source, Possible values are: The login username when logs are generated from the dashboard. The API Keys when logs are generated from the REST API. -empty string when logs are generated from CLI, Erlang console, or an event.""" +empty string when logs are generated from CLI, Erlang console.""" filter_source_ip.desc: "Filter logs based on source ip when logs are generated from dashboard and REST API." diff --git a/rel/i18n/emqx_conf_schema.hocon b/rel/i18n/emqx_conf_schema.hocon index ff2c3109a..2497244a5 100644 --- a/rel/i18n/emqx_conf_schema.hocon +++ b/rel/i18n/emqx_conf_schema.hocon @@ -726,7 +726,9 @@ audit_handler_level.label: """Log Level""" audit_log_max_filter_limit.desc: -"""Store the latest N log entries in a database for allow `/audit` HTTP API to filter and retrieval of log data.""" +"""Store the latest N log entries in a database for allow `/audit` HTTP API to filter and retrieval of log data. +The interval for purging redundant log records is maintained within a range of 10~20 seconds. +""" audit_log_max_filter_limit.label: """Max Filter Limit""" From 22223dc536586b721d1a967424957ca2fc22c218 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 27 Oct 2023 16:11:30 +0800 Subject: [PATCH 3/4] fix: return 400 when audit log is disabled --- apps/emqx_audit/src/emqx_audit_api.erl | 48 ++++++++++------- apps/emqx_audit/test/emqx_audit_api_SUITE.erl | 52 +++++++++++++++++-- rel/i18n/emqx_audit_api.hocon | 2 +- 3 files changed, 80 insertions(+), 22 deletions(-) diff --git a/apps/emqx_audit/src/emqx_audit_api.erl b/apps/emqx_audit/src/emqx_audit_api.erl index 81553dca0..a7fd8f4ad 100644 --- a/apps/emqx_audit/src/emqx_audit_api.erl +++ b/apps/emqx_audit/src/emqx_audit_api.erl @@ -35,6 +35,7 @@ {<<"gte_duration_ms">>, timestamp}, {<<"lte_duration_ms">>, timestamp} ]). +-define(DISABLE_MSG, <<"Audit is disabled">>). namespace() -> "audit". @@ -151,7 +152,11 @@ schema("/audit") -> emqx_dashboard_swagger:schema_with_example( array(?REF(audit_list)), audit_log_list_example() - ) + ), + 400 => emqx_dashboard_swagger:error_codes( + ['BAD_REQUEST'], + ?DISABLE_MSG + ) } } }. @@ -232,23 +237,30 @@ fields(http_request) -> ]. audit(get, #{query_string := QueryString}) -> - case - emqx_mgmt_api:node_query( - node(), - ?AUDIT, - QueryString, - ?AUDIT_QS_SCHEMA, - fun ?MODULE:qs2ms/2, - fun ?MODULE:format/1 - ) - of - {error, page_limit_invalid} -> - {400, #{code => 'BAD_REQUEST', message => <<"page_limit_invalid">>}}; - {error, Node, Error} -> - Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])), - {500, #{code => <<"NODE_DOWN">>, message => Message}}; - Result -> - {200, Result} + case emqx_config:get([log, audit, enable], false) of + false -> + {400, #{code => 'BAD_REQUEST', message => ?DISABLE_MSG}}; + true -> + case + emqx_mgmt_api:node_query( + node(), + ?AUDIT, + QueryString, + ?AUDIT_QS_SCHEMA, + fun ?MODULE:qs2ms/2, + fun ?MODULE:format/1 + ) + of + {error, page_limit_invalid} -> + {400, #{code => 'BAD_REQUEST', message => <<"page_limit_invalid">>}}; + {error, Node, Error} -> + Message = list_to_binary( + io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error]) + ), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Result -> + {200, Result} + end end. qs2ms(_Tab, {Qs, _}) -> diff --git a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl index eebb8f770..a402efe31 100644 --- a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl +++ b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl @@ -19,6 +19,21 @@ -include_lib("eunit/include/eunit.hrl"). +all() -> + [ + {group, enabled}, + {group, disabled} + ]. + +groups() -> + [ + {enabled, [sequence], common_tests() -- [t_disabled]}, + {disabled, [sequence], [t_disabled]} + ]. + +common_tests() -> + emqx_common_test_helpers:all(?MODULE). + -define(CONF_DEFAULT, #{ node => #{ @@ -40,9 +55,6 @@ } }). -all() -> - emqx_common_test_helpers:all(?MODULE). - init_per_suite(Config) -> _ = application:load(emqx_conf), emqx_config:erase_all(), @@ -90,6 +102,40 @@ t_http_api(_) -> ), ok. +t_disabled(_) -> + Enable = [log, audit, enable], + ?assertEqual(true, emqx:get_config(Enable)), + AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + {ok, _} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader), + Size1 = mnesia:table_info(emqx_audit, size), + + {ok, Logs} = emqx_mgmt_api_configs_SUITE:get_config("log"), + Logs1 = emqx_utils_maps:deep_put([<<"audit">>, <<"max_filter_size">>], Logs, 100), + NewLogs = emqx_utils_maps:deep_put([<<"audit">>, <<"enable">>], Logs1, false), + {ok, _} = emqx_mgmt_api_configs_SUITE:update_config("log", NewLogs), + ?assertMatch( + {error, _}, + emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader) + ), + + Size2 = mnesia:table_info(emqx_audit, size), + %% Record the audit disable action, so the size + 1 + ?assertEqual(Size1 + 1, Size2), + + {ok, Zones} = emqx_mgmt_api_configs_SUITE:get_global_zone(), + NewZones = emqx_utils_maps:deep_put([<<"mqtt">>, <<"max_topic_levels">>], Zones, 111), + {ok, #{<<"mqtt">> := Res}} = emqx_mgmt_api_configs_SUITE:update_global_zone(NewZones), + ?assertMatch(#{<<"max_topic_levels">> := 111}, Res), + Size3 = mnesia:table_info(emqx_audit, size), + %% Don't record mqtt update request. + ?assertEqual(Size2, Size3), + %% enabled again + {ok, _} = emqx_mgmt_api_configs_SUITE:update_config("log", Logs1), + Size4 = mnesia:table_info(emqx_audit, size), + ?assertEqual(Size3 + 1, Size4), + ok. + t_cli(_Config) -> ok = emqx_ctl:run_command(["conf", "show", "log"]), AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]), diff --git a/rel/i18n/emqx_audit_api.hocon b/rel/i18n/emqx_audit_api.hocon index 040c0009f..40741310e 100644 --- a/rel/i18n/emqx_audit_api.hocon +++ b/rel/i18n/emqx_audit_api.hocon @@ -16,7 +16,7 @@ filter_from.desc: `dashboard`: Dashboard request logs, requiring the use of a jwt_token. `rest_api`: API KEY request logs. `cli`: The emqx command line logs. -`erlang_console`: The emqx remote_console run function logs. +`erlang_console`: The emqx remote_console run function logs.""" filter_source.desc: """"Filter logs based on source, Possible values are: From 0634ff61c0570564f7113a042b194c5ca258256b Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 27 Oct 2023 17:27:38 +0800 Subject: [PATCH 4/4] fix: dialyzer warning --- apps/emqx/include/logger.hrl | 14 ++++++++++++++ apps/emqx_audit/src/emqx_audit.erl | 10 ++++++---- apps/emqx_audit/test/emqx_audit_api_SUITE.erl | 12 +++++++----- apps/emqx_conf/src/emqx_conf_cli.erl | 2 ++ apps/emqx_dashboard/src/emqx_dashboard.erl | 2 ++ rel/i18n/emqx_conf_schema.hocon | 3 +-- 6 files changed, 32 insertions(+), 11 deletions(-) diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index 904fae0d6..a40f9dc9c 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -63,6 +63,10 @@ ) end). +-ifdef(EMQX_RELEASE_EDITION). + +-if(?EMQX_RELEASE_EDITION == ee). + -define(AUDIT(_LevelFun_, _MetaFun_), begin case logger_config:get(logger, ?AUDIT_HANDLER) of {error, {not_found, _}} -> @@ -78,6 +82,16 @@ end). end end). +-else. +%% Only for compile pass, ce edition will not call it +-define(AUDIT(_L_, _M_), _ = {_L_, _M_}). +-endif. + +-else. +%% Only for compile pass, ce edition will not call it +-define(AUDIT(_L_, _M_), _ = {_L_, _M_}). +-endif. + %% print to 'user' group leader -define(ULOG(Fmt, Args), io:format(user, Fmt, Args)). -define(ELOG(Fmt, Args), io:format(standard_error, Fmt, Args)). diff --git a/apps/emqx_audit/src/emqx_audit.erl b/apps/emqx_audit/src/emqx_audit.erl index ceaf22507..98f4a70e8 100644 --- a/apps/emqx_audit/src/emqx_audit.erl +++ b/apps/emqx_audit/src/emqx_audit.erl @@ -108,10 +108,12 @@ log(Level, Meta1, Handler) -> remove_handler_when_disabled() -> case emqx_config:get([log, audit, enable], false) of - true -> ok; - false -> _ = logger:remove_handler(?AUDIT_HANDLER) - end, - ok. + true -> + ok; + false -> + _ = logger:remove_handler(?AUDIT_HANDLER), + ok + end. log_to_db(Log) -> Audit0 = to_audit(Log), diff --git a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl index a402efe31..50b39d240 100644 --- a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl +++ b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl @@ -21,14 +21,12 @@ all() -> [ - {group, enabled}, - {group, disabled} + {group, audit, [sequence]} ]. groups() -> [ - {enabled, [sequence], common_tests() -- [t_disabled]}, - {disabled, [sequence], [t_disabled]} + {audit, [sequence], common_tests()} ]. common_tests() -> @@ -114,6 +112,8 @@ t_disabled(_) -> Logs1 = emqx_utils_maps:deep_put([<<"audit">>, <<"max_filter_size">>], Logs, 100), NewLogs = emqx_utils_maps:deep_put([<<"audit">>, <<"enable">>], Logs1, false), {ok, _} = emqx_mgmt_api_configs_SUITE:update_config("log", NewLogs), + {ok, GetLog1} = emqx_mgmt_api_configs_SUITE:get_config("log"), + ?assertEqual(NewLogs, GetLog1), ?assertMatch( {error, _}, emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader) @@ -132,6 +132,8 @@ t_disabled(_) -> ?assertEqual(Size2, Size3), %% enabled again {ok, _} = emqx_mgmt_api_configs_SUITE:update_config("log", Logs1), + {ok, GetLog2} = emqx_mgmt_api_configs_SUITE:get_config("log"), + ?assertEqual(Logs1, GetLog2), Size4 = mnesia:table_info(emqx_audit, size), ?assertEqual(Size3 + 1, Size4), ok. @@ -243,4 +245,4 @@ kickout_clients() -> {ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath), ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]), - ?assertMatch(#{<<"meta">> := #{<<"count">> := 0}}, ClientsResponse2). + ?assertMatch(#{<<"data">> := []}, ClientsResponse2). diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index a5082a419..fc00c7dc9 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -37,6 +37,8 @@ -define(AUDIT_MOD, audit). -define(UPDATE_READONLY_KEYS_PROHIBITED, "update_readonly_keys_prohibited"). +-dialyzer({no_match, [load/0]}). + load() -> emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, [hidden]), emqx_ctl:register_command(?CONF, {?MODULE, conf}, []), diff --git a/apps/emqx_dashboard/src/emqx_dashboard.erl b/apps/emqx_dashboard/src/emqx_dashboard.erl index ab6204235..96f81ca84 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard.erl @@ -210,6 +210,8 @@ filter_false(K, V, S) -> [{K, V} | S]. listener_name(Protocol) -> list_to_atom(atom_to_list(Protocol) ++ ":dashboard"). +-dialyzer({no_match, [audit_log_fun/0]}). + audit_log_fun() -> case emqx_release:edition() of ee -> fun emqx_dashboard_audit:log/2; diff --git a/rel/i18n/emqx_conf_schema.hocon b/rel/i18n/emqx_conf_schema.hocon index 2497244a5..64b96541e 100644 --- a/rel/i18n/emqx_conf_schema.hocon +++ b/rel/i18n/emqx_conf_schema.hocon @@ -727,8 +727,7 @@ audit_handler_level.label: audit_log_max_filter_limit.desc: """Store the latest N log entries in a database for allow `/audit` HTTP API to filter and retrieval of log data. -The interval for purging redundant log records is maintained within a range of 10~20 seconds. -""" +The interval for purging redundant log records is maintained within a range of 10~20 seconds.""" audit_log_max_filter_limit.label: """Max Filter Limit"""