From 995948f0e84608dce7e4e1f6eb807b9e40772fb2 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 27 Oct 2023 11:49:04 +0800 Subject: [PATCH] refactor: remove seq from audit record --- apps/emqx/include/emqx_trace.hrl | 1 - apps/emqx/include/logger.hrl | 12 +- apps/emqx/src/config/emqx_config_logger.erl | 24 +-- apps/emqx/src/emqx_trace/emqx_trace.erl | 2 +- apps/emqx_audit/src/emqx_audit.erl | 164 +++++++++++------- apps/emqx_audit/src/emqx_audit_api.erl | 4 +- apps/emqx_audit/test/emqx_audit_api_SUITE.erl | 3 +- apps/emqx_conf/src/emqx_conf_cli.erl | 5 +- apps/emqx_dashboard/src/emqx_dashboard.erl | 10 +- .../src/emqx_dashboard_audit.erl | 9 +- apps/emqx_machine/src/emqx_machine_boot.erl | 4 - .../src/emqx_machine_terminator.erl | 7 +- rel/i18n/emqx_audit_api.hocon | 3 +- rel/i18n/emqx_conf_schema.hocon | 4 +- 14 files changed, 141 insertions(+), 111 deletions(-) diff --git a/apps/emqx/include/emqx_trace.hrl b/apps/emqx/include/emqx_trace.hrl index 62028bcc0..3f9316727 100644 --- a/apps/emqx/include/emqx_trace.hrl +++ b/apps/emqx/include/emqx_trace.hrl @@ -32,6 +32,5 @@ -define(SHARD, ?COMMON_SHARD). -define(MAX_SIZE, 30). --define(OWN_KEYS, [level, filters, filter_default, handlers]). -endif. diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index 67f125e5f..904fae0d6 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -40,7 +40,9 @@ end ). +-define(AUDIT_HANDLER, emqx_audit). -define(TRACE_FILTER, emqx_trace_filter). +-define(OWN_KEYS, [level, filters, filter_default, handlers]). -define(TRACE(Tag, Msg, Meta), ?TRACE(debug, Tag, Msg, Meta)). @@ -62,15 +64,15 @@ end). -define(AUDIT(_LevelFun_, _MetaFun_), begin - case emqx_config:get([log, audit], #{enable => false}) of - #{enable := false} -> + case logger_config:get(logger, ?AUDIT_HANDLER) of + {error, {not_found, _}} -> ok; - #{enable := true, level := _AllowLevel_} -> + {ok, Handler = #{level := _AllowLevel_}} -> _Level_ = _LevelFun_, case logger:compare_levels(_AllowLevel_, _Level_) of _R_ when _R_ == lt; _R_ == eq -> - emqx_audit:log(_Level_, _MetaFun_); - gt -> + emqx_audit:log(_Level_, _MetaFun_, Handler); + _ -> ok end end diff --git a/apps/emqx/src/config/emqx_config_logger.erl b/apps/emqx/src/config/emqx_config_logger.erl index 87baef627..ce74db8f0 100644 --- a/apps/emqx/src/config/emqx_config_logger.erl +++ b/apps/emqx/src/config/emqx_config_logger.erl @@ -26,7 +26,6 @@ -include("logger.hrl"). -define(LOG, [log]). --define(AUDIT_HANDLER, emqx_audit). add_handler() -> ok = emqx_config_handler:add_handler(?LOG, ?MODULE), @@ -97,8 +96,11 @@ update_log_handlers(NewHandlers) -> ok = application:set_env(kernel, logger, NewHandlers), ok. +%% Don't remove audit log handler here, we need record this removed action into audit log file. +%% we will remove audit log handler after audit log is record in emqx_audit:log/3. +update_log_handler({removed, ?AUDIT_HANDLER}) -> + ok; update_log_handler({removed, Id}) -> - audit("audit_disabled", Id), log_to_console("Config override: ~s is removed~n", [id_for_log(Id)]), logger:remove_handler(Id); update_log_handler({Action, {handler, Id, Mod, Conf}}) -> @@ -107,7 +109,6 @@ update_log_handler({Action, {handler, Id, Mod, Conf}}) -> _ = logger:remove_handler(Id), case logger:add_handler(Id, Mod, Conf) of ok -> - audit("audit_enabled", Id), ok; %% Don't crash here, otherwise the cluster rpc will retry the wrong handler forever. {error, Reason} -> @@ -118,23 +119,6 @@ update_log_handler({Action, {handler, Id, Mod, Conf}}) -> end, ok. --ifdef(EMQX_RELEASE_EDITION). - --if(?EMQX_RELEASE_EDITION == ee). -audit(Event, ?AUDIT_HANDLER) -> - emqx_audit:log(alert, #{event => Event, from => event}); -audit(_, _) -> - ok. --else. -audit(_, _) -> - ok. --endif. - --else. -audit(_, _) -> - ok. --endif. - id_for_log(console) -> "log.console"; id_for_log(Other) -> "log.file." ++ atom_to_list(Other). diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 99bbcc5f9..6588c99dc 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -105,7 +105,7 @@ log_filter([{Id, FilterFun, Filter, Name} | Rest], Log0) -> ignore -> ignore; Log -> - case logger_config:get(ets:whereis(logger), Id) of + case logger_config:get(logger, Id) of {ok, #{module := Module} = HandlerConfig0} -> HandlerConfig = maps:without(?OWN_KEYS, HandlerConfig0), try diff --git a/apps/emqx_audit/src/emqx_audit.erl b/apps/emqx_audit/src/emqx_audit.erl index d00aa493b..ceaf22507 100644 --- a/apps/emqx_audit/src/emqx_audit.erl +++ b/apps/emqx_audit/src/emqx_audit.erl @@ -12,9 +12,9 @@ %% API -export([start_link/0]). --export([log/1, log/2]). +-export([log/3]). --export([dirty_clean_expired/1]). +-export([trans_clean_expired/2]). %% gen_server callbacks -export([ @@ -32,7 +32,7 @@ -ifdef(TEST). -define(INTERVAL, 100). -else. --define(INTERVAL, 2500). +-define(INTERVAL, 10000). -endif. to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) -> @@ -50,8 +50,6 @@ to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) -> http_method = <<"">>, http_request = <<"">> }; -to_audit(#{http_method := get}) -> - ok; to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api -> #{ source := Source, @@ -82,23 +80,6 @@ to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api duration_ms = DurationMs, args = <<"">> }; -to_audit(#{from := event, event := Event}) -> - #?AUDIT{ - from = event, - source = <<"">>, - source_ip = <<"">>, - %% operation info - operation_id = iolist_to_binary(Event), - operation_type = <<"">>, - operation_result = <<"">>, - failure = <<"">>, - %% request detail - http_status_code = <<"">>, - http_method = <<"">>, - http_request = <<"">>, - duration_ms = 0, - args = <<"">> - }; to_audit(#{from := erlang_console, function := F, args := Args}) -> #?AUDIT{ from = erlang_console, @@ -117,15 +98,22 @@ to_audit(#{from := erlang_console, function := F, args := Args}) -> args = iolist_to_binary(io_lib:format("~p: ~p~n", [F, Args])) }. -log(_Level, undefined) -> +log(_Level, undefined, _Handler) -> ok; -log(Level, Meta1) -> +log(Level, Meta1, Handler) -> Meta2 = Meta1#{time => logger:timestamp(), level => Level}, - Filter = [{emqx_audit, fun(L, _) -> L end, undefined, undefined}], - emqx_trace:log(Level, Filter, undefined, Meta2), - emqx_audit:log(Meta2). + log_to_file(Level, Meta2, Handler), + log_to_db(Meta2), + remove_handler_when_disabled(). -log(Log) -> +remove_handler_when_disabled() -> + case emqx_config:get([log, audit, enable], false) of + true -> ok; + false -> _ = logger:remove_handler(?AUDIT_HANDLER) + end, + ok. + +log_to_db(Log) -> Audit0 = to_audit(Log), Audit = Audit0#?AUDIT{ node = node(), @@ -144,68 +132,112 @@ init([]) -> {record_name, ?AUDIT}, {attributes, record_info(fields, ?AUDIT)} ]), - case mria_rlog:role() of - core -> {ok, #{}, {continue, setup}}; - _ -> {ok, #{}} - end. + {ok, #{}, {continue, setup}}. handle_continue(setup, State) -> ok = mria:wait_for_tables([?AUDIT]), - clean_expired(), - Interval = clean_expired_interval(), - {noreply, State#{interval => Interval}, Interval}. + NewState = State#{role => mria_rlog:role()}, + ?AUDIT(alert, #{ + cmd => emqx, + args => ["start"], + version => emqx_release:version(), + from => cli, + duration_ms => 0 + }), + {noreply, NewState, interval(NewState)}. -handle_call(_Request, _From, State = #{interval := Interval}) -> - {reply, ignore, State, Interval}. +handle_call(_Request, _From, State) -> + {reply, ignore, State, interval(State)}. -handle_cast(_Request, State = #{interval := Interval}) -> - {noreply, State, Interval}. +handle_cast(_Request, State) -> + {noreply, State, interval(State)}. -handle_info(timeout, State = #{interval := Interval}) -> - clean_expired(), - {noreply, State, Interval}; -handle_info(_Info, State = #{interval := Interval}) -> - {noreply, State, Interval}. +handle_info(timeout, State) -> + ExtraWait = clean_expired_logs(), + {noreply, State, interval(State) + ExtraWait}; +handle_info(_Info, State) -> + {noreply, State, interval(State)}. -terminate(_Reason, _State = #{}) -> +terminate(_Reason, _State) -> ok. -code_change(_OldVsn, State = #{}, _Extra) -> +code_change(_OldVsn, State, _Extra) -> {ok, State}. %%%=================================================================== %%% Internal functions %%%=================================================================== -clean_expired() -> +%% if clean_expired transaction aborted, it will be scheduled with extra 60 seconds. +clean_expired_logs() -> MaxSize = max_size(), + Oldest = mnesia:dirty_first(?AUDIT), CurSize = mnesia:table_info(?AUDIT, size), case CurSize - MaxSize of - DelCount when DelCount > 0 -> - mria:async_dirty( - ?COMMON_SHARD, - fun ?MODULE:dirty_clean_expired/1, - [DelCount] - ); + DelSize when DelSize > 0 -> + case + mria:transaction( + ?COMMON_SHARD, + fun ?MODULE:trans_clean_expired/2, + [Oldest, DelSize] + ) + of + {atomic, ok} -> + 0; + {aborted, Reason} -> + ?SLOG(error, #{ + msg => "clean_expired_audit_aborted", + reason => Reason, + delete_size => DelSize, + current_size => CurSize, + max_count => MaxSize + }), + 60000 + end; _ -> - ok + 0 end. -dirty_clean_expired(DelCount) -> - dirty_clean_expired(mnesia:dirty_first(?AUDIT), DelCount). +trans_clean_expired(Oldest, DelCount) -> + First = mnesia:first(?AUDIT), + %% Other node already clean from the oldest record. + %% ensure not delete twice, otherwise records that should not be deleted will be deleted. + case First =:= Oldest of + true -> do_clean_expired(First, DelCount); + false -> ok + end. -dirty_clean_expired(_, DelCount) when DelCount =< 0 -> ok; -dirty_clean_expired('$end_of_table', _DelCount) -> +do_clean_expired(_, DelSize) when DelSize =< 0 -> ok; +do_clean_expired('$end_of_table', _DelSize) -> ok; -dirty_clean_expired(CurKey, DeleteCount) -> - mnesia:dirty_delete(?AUDIT, CurKey), - dirty_clean_expired(mnesia:dirty_next(?AUDIT, CurKey), DeleteCount - 1). +do_clean_expired(CurKey, DeleteSize) -> + mnesia:delete(?AUDIT, CurKey, sticky_write), + do_clean_expired(mnesia:next(?AUDIT, CurKey), DeleteSize - 1). max_size() -> emqx_conf:get([log, audit, max_filter_size], 5000). -%% Try to make the time interval of each node is different. -%% 2 * Interval ~ 3 * Interval (5000~7500) -clean_expired_interval() -> - Interval = ?INTERVAL, - Interval * 2 + erlang:phash2(node(), Interval). +interval(#{role := replicant}) -> hibernate; +interval(#{role := core}) -> ?INTERVAL + rand:uniform(?INTERVAL). + +log_to_file(Level, Meta, #{module := Module} = Handler) -> + Log = #{level => Level, meta => Meta, msg => undefined}, + Handler1 = maps:without(?OWN_KEYS, Handler), + try + erlang:apply(Module, log, [Log, Handler1]) + catch + C:R:S -> + case logger:remove_handler(?AUDIT_HANDLER) of + ok -> + logger:internal_log( + error, {removed_failing_handler, ?AUDIT_HANDLER, C, R, S} + ); + {error, {not_found, _}} -> + ok; + {error, Reason} -> + logger:internal_log( + error, + {removed_handler_failed, ?AUDIT_HANDLER, Reason, C, R, S} + ) + end + end. diff --git a/apps/emqx_audit/src/emqx_audit_api.erl b/apps/emqx_audit/src/emqx_audit_api.erl index aaa364464..81553dca0 100644 --- a/apps/emqx_audit/src/emqx_audit_api.erl +++ b/apps/emqx_audit/src/emqx_audit_api.erl @@ -59,7 +59,7 @@ schema("/audit") -> desc => ?DESC(filter_node) })}, {from, - ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console, event]), #{ + ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console]), #{ in => query, required => false, example => <<"dashboard">>, @@ -175,7 +175,7 @@ fields(audit) -> desc => "The node name to which the log is created" })}, {from, - ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console, event]), #{ + ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console]), #{ desc => "The source type of the log" })}, {source, diff --git a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl index fe6ddfa96..eebb8f770 100644 --- a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl +++ b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl @@ -49,6 +49,7 @@ init_per_suite(Config) -> emqx_mgmt_api_test_util:init_suite([emqx_ctl, emqx_conf, emqx_audit]), ok = emqx_common_test_helpers:load_config(emqx_enterprise_schema, ?CONF_DEFAULT), emqx_config:save_schema_mod_and_names(emqx_enterprise_schema), + ok = emqx_config_logger:refresh_config(), application:set_env(emqx, boot_modules, []), emqx_conf_cli:load(), Config. @@ -144,7 +145,7 @@ t_max_size(_Config) -> ?assert(Size1 - InitSize >= 100, {Size1, InitSize}), {ok, _} = emqx:update_config([log, audit, max_filter_size], 10), %% wait for clean_expired - timer:sleep(500), + timer:sleep(250), ExpectSize = emqx:get_config([log, audit, max_filter_size]), Size2 = SizeFun(), ?assertEqual(ExpectSize, Size2, {sys:get_state(emqx_audit)}), diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index b47d1f961..a5082a419 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -40,7 +40,10 @@ load() -> emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, [hidden]), emqx_ctl:register_command(?CONF, {?MODULE, conf}, []), - emqx_ctl:register_command(?AUDIT_MOD, {?MODULE, audit}, [hidden]), + case emqx_release:edition() of + ee -> emqx_ctl:register_command(?AUDIT_MOD, {?MODULE, audit}, [hidden]); + ce -> ok + end, ok. unload() -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard.erl b/apps/emqx_dashboard/src/emqx_dashboard.erl index cf4330e34..ab6204235 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard.erl @@ -72,7 +72,7 @@ start_listeners(Listeners) -> base_path => emqx_dashboard_swagger:base_path(), modules => minirest_api:find_api_modules(apps()), authorization => Authorization, - log => fun emqx_dashboard_audit:log/2, + log => audit_log_fun(), security => [#{'basicAuth' => []}, #{'bearerAuth' => []}], swagger_global_spec => GlobalSpec, dispatch => dispatch(), @@ -210,9 +210,17 @@ filter_false(K, V, S) -> [{K, V} | S]. listener_name(Protocol) -> list_to_atom(atom_to_list(Protocol) ++ ":dashboard"). +audit_log_fun() -> + case emqx_release:edition() of + ee -> fun emqx_dashboard_audit:log/2; + ce -> undefined + end. + -if(?EMQX_RELEASE_EDITION =/= ee). + %% dialyzer complains about the `unauthorized_role' clause... -dialyzer({no_match, [authorize/1, api_key_authorize/3]}). + -endif. authorize(Req) -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_audit.erl b/apps/emqx_dashboard/src/emqx_dashboard_audit.erl index c2ef1a99f..4b51b2cb0 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_audit.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_audit.erl @@ -37,10 +37,11 @@ log(#{code := Code, method := Method} = Meta, Req) -> ?AUDIT(level(Method, Code), log_meta(Meta, Req)). log_meta(Meta, Req) -> - #{operation_id := OperationId} = Meta, + #{operation_id := OperationId, method := Method} = Meta, case - lists:member(OperationId, ?HIGH_FREQUENCY_REQUESTS) andalso - ignore_high_frequency_request() + Method =:= get orelse + (lists:member(OperationId, ?HIGH_FREQUENCY_REQUESTS) andalso + ignore_high_frequency_request()) of true -> undefined; @@ -53,7 +54,7 @@ log_meta(Meta, Req) -> source_ip => source_ip(Req), operation_type => operation_type(Meta), %% method for http filter api. - http_method => maps:get(method, Meta), + http_method => Method, http_request => http_request(Meta), http_status_code => maps:get(code, Meta), operation_result => operation_result(Meta), diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl index aa6180e23..026a82cbf 100644 --- a/apps/emqx_machine/src/emqx_machine_boot.erl +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -47,10 +47,6 @@ post_boot() -> ok = ensure_apps_started(), ok = print_vsn(), ok = start_autocluster(), - ?AUDIT(alert, #{ - event => "emqx_start", - from => event - }), ignore. -ifdef(TEST). diff --git a/apps/emqx_machine/src/emqx_machine_terminator.erl b/apps/emqx_machine/src/emqx_machine_terminator.erl index d43d5fea9..28603b9f6 100644 --- a/apps/emqx_machine/src/emqx_machine_terminator.erl +++ b/apps/emqx_machine/src/emqx_machine_terminator.erl @@ -68,8 +68,11 @@ graceful() -> %% @doc Shutdown the Erlang VM and wait indefinitely. graceful_wait() -> ?AUDIT(alert, #{ - event => "emqx_gracefully_stop", - from => event + cmd => emqx, + args => ["stop"], + version => emqx_release:version(), + from => cli, + duration_ms => element(1, erlang:statistics(wall_clock)) }), ok = graceful(), exit_loop(). diff --git a/rel/i18n/emqx_audit_api.hocon b/rel/i18n/emqx_audit_api.hocon index 37080838b..040c0009f 100644 --- a/rel/i18n/emqx_audit_api.hocon +++ b/rel/i18n/emqx_audit_api.hocon @@ -17,13 +17,12 @@ filter_from.desc: `rest_api`: API KEY request logs. `cli`: The emqx command line logs. `erlang_console`: The emqx remote_console run function logs. -`event`: Logs related to events such as emqx_start, emqx_gracefully_stop, audit_enabled, and audit_disabled.""" filter_source.desc: """"Filter logs based on source, Possible values are: The login username when logs are generated from the dashboard. The API Keys when logs are generated from the REST API. -empty string when logs are generated from CLI, Erlang console, or an event.""" +empty string when logs are generated from CLI, Erlang console.""" filter_source_ip.desc: "Filter logs based on source ip when logs are generated from dashboard and REST API." diff --git a/rel/i18n/emqx_conf_schema.hocon b/rel/i18n/emqx_conf_schema.hocon index ff2c3109a..2497244a5 100644 --- a/rel/i18n/emqx_conf_schema.hocon +++ b/rel/i18n/emqx_conf_schema.hocon @@ -726,7 +726,9 @@ audit_handler_level.label: """Log Level""" audit_log_max_filter_limit.desc: -"""Store the latest N log entries in a database for allow `/audit` HTTP API to filter and retrieval of log data.""" +"""Store the latest N log entries in a database for allow `/audit` HTTP API to filter and retrieval of log data. +The interval for purging redundant log records is maintained within a range of 10~20 seconds. +""" audit_log_max_filter_limit.label: """Max Filter Limit"""