diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index 58ebbbf1f..67f125e5f 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -69,20 +69,13 @@ end). _Level_ = _LevelFun_, case logger:compare_levels(_AllowLevel_, _Level_) of _R_ when _R_ == lt; _R_ == eq -> - ?LOG_AUDIT_EVENT(_Level_, _MetaFun_); + emqx_audit:log(_Level_, _MetaFun_); gt -> ok end end end). --define(LOG_AUDIT_EVENT(Level, M), begin - M1 = (M)#{time => logger:timestamp(), level => Level}, - Filter = [{emqx_audit, fun(L, _) -> L end, undefined, undefined}], - emqx_trace:log(Level, Filter, undefined, M1), - emqx_audit:log(M1) -end). - %% print to 'user' group leader -define(ULOG(Fmt, Args), io:format(user, Fmt, Args)). -define(ELOG(Fmt, Args), io:format(standard_error, Fmt, Args)). diff --git a/apps/emqx/src/config/emqx_config_logger.erl b/apps/emqx/src/config/emqx_config_logger.erl index 57502bba8..89e439a2a 100644 --- a/apps/emqx/src/config/emqx_config_logger.erl +++ b/apps/emqx/src/config/emqx_config_logger.erl @@ -118,8 +118,9 @@ update_log_handler({Action, {handler, Id, Mod, Conf}}) -> end, ok. +-dialyzer({nowarn_function, [audit/2]}). audit(Event, ?AUDIT_HANDLER) -> - ?LOG_AUDIT_EVENT(alert, #{event => Event, from => event}); + emqx_audit:log(alert, #{event => Event, from => event}); audit(_, _) -> ok. diff --git a/apps/emqx_audit/src/emqx_audit.erl b/apps/emqx_audit/src/emqx_audit.erl index 4b96f00b8..64e76ef9b 100644 --- a/apps/emqx_audit/src/emqx_audit.erl +++ b/apps/emqx_audit/src/emqx_audit.erl @@ -25,8 +25,8 @@ -include("emqx_audit.hrl"). %% API --export([start_link/1]). --export([log/1]). +-export([start_link/0]). +-export([log/1, log/2]). %% gen_server callbacks -export([ @@ -132,14 +132,21 @@ to_audit(#{from := erlang_console, function := F, args := Args}) -> args = iolist_to_binary(io_lib:format("~p: ~p~n", [F, Args])) }. +log(_Level, undefined) -> + ok; +log(Level, Meta1) -> + Meta2 = Meta1#{time => logger:timestamp(), level => Level}, + Filter = [{emqx_audit, fun(L, _) -> L end, undefined, undefined}], + emqx_trace:log(Level, Filter, undefined, Meta2), + emqx_audit:log(Meta2). + log(Log) -> gen_server:cast(?MODULE, {write, to_audit(Log)}). -start_link(Config) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Config], []). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -init([Config]) -> - erlang:process_flag(trap_exit, true), +init([]) -> ok = mria:create_table(?AUDIT, [ {type, ordered_set}, {rlog_shard, ?COMMON_SHARD}, @@ -147,28 +154,25 @@ init([Config]) -> {record_name, ?AUDIT}, {attributes, record_info(fields, ?AUDIT)} ]), - {ok, Config, {continue, setup}}. + {ok, #{}, {continue, setup}}. -handle_continue(setup, #{max_size := MaxSize} = State) -> +handle_continue(setup, #{} = State) -> ok = mria:wait_for_tables([?AUDIT]), - LatestId = latest_id(), - clean_expired(LatestId, MaxSize), - {noreply, State#{latest_id => LatestId}}. + clean_expired(), + {noreply, State}. handle_call(_Request, _From, State = #{}) -> {reply, ok, State}. -handle_cast({write, Log}, State = #{latest_id := LatestId}) -> - NewSeq = LatestId + 1, - Audit = Log#?AUDIT{seq = NewSeq}, - mnesia:dirty_write(?AUDIT, Audit), - {noreply, State#{latest_id => NewSeq}, ?CLEAN_EXPIRED_MS}; +handle_cast({write, Log}, State) -> + _ = write_log(Log), + {noreply, State#{}, ?CLEAN_EXPIRED_MS}; handle_cast(_Request, State = #{}) -> {noreply, State}. -handle_info(timeout, State = #{max_size := MaxSize, latest_id := LatestId}) -> - clean_expired(LatestId, MaxSize), - {noreply, State#{latest_id => latest_id()}, hibernate}; +handle_info(timeout, State = #{}) -> + clean_expired(), + {noreply, State, hibernate}; handle_info(_Info, State = #{}) -> {noreply, State}. @@ -182,7 +186,33 @@ code_change(_OldVsn, State = #{}, _Extra) -> %%% Internal functions %%%=================================================================== -clean_expired(LatestId, MaxSize) -> +write_log(Log) -> + case + mria:transaction( + ?COMMON_SHARD, + fun(L) -> + New = + case mnesia:last(?AUDIT) of + '$end_of_table' -> 1; + LastId -> LastId + 1 + end, + mnesia:write(L#?AUDIT{seq = New}) + end, + [Log] + ) + of + {atomic, ok} -> + ok; + Reason -> + ?SLOG(warning, #{ + msg => "write_audit_log_failed", + reason => Reason + }) + end. + +clean_expired() -> + MaxSize = max_size(), + LatestId = latest_id(), Min = LatestId - MaxSize, %% MS = ets:fun2ms(fun(#?AUDIT{seq = Seq}) when Seq =< Min -> true end), MS = [{#?AUDIT{seq = '$1', _ = '_'}, [{'=<', '$1', Min}], [true]}], @@ -200,3 +230,6 @@ latest_id() -> '$end_of_table' -> 0; Seq -> Seq end. + +max_size() -> + emqx_conf:get([log, audit, max_filter_size], 5000). diff --git a/apps/emqx_audit/src/emqx_audit_sup.erl b/apps/emqx_audit/src/emqx_audit_sup.erl index 460ba90d6..0671a9e0f 100644 --- a/apps/emqx_audit/src/emqx_audit_sup.erl +++ b/apps/emqx_audit/src/emqx_audit_sup.erl @@ -36,7 +36,7 @@ init([]) -> ChildSpecs = [ #{ id => emqx_audit, - start => {emqx_audit, start_link, [#{max_size => 5000}]}, + start => {emqx_audit, start_link, []}, type => worker, restart => transient, shutdown => 1000 diff --git a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl new file mode 100644 index 000000000..6fb860b6e --- /dev/null +++ b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl @@ -0,0 +1,170 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_audit_api_SUITE). +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-define(CONF_DEFAULT, #{ + node => + #{ + name => "emqx1@127.0.0.1", + cookie => "emqxsecretcookie", + data_dir => "data" + }, + log => #{ + audit => + #{ + enable => true, + ignore_high_frequency_request => true, + level => info, + max_filter_size => 15, + rotation_count => 2, + rotation_size => "10MB", + time_offset => "system" + } + } +}). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + _ = application:load(emqx_conf), + emqx_config:erase_all(), + emqx_mgmt_api_test_util:init_suite([emqx_ctl, emqx_conf, emqx_audit]), + ok = emqx_common_test_helpers:load_config(emqx_enterprise_schema, ?CONF_DEFAULT), + emqx_config:save_schema_mod_and_names(emqx_enterprise_schema), + application:set_env(emqx, boot_modules, []), + emqx_conf_cli:load(), + Config. + +end_per_suite(_) -> + emqx_mgmt_api_test_util:end_suite([emqx_audit, emqx_conf, emqx_ctl]). + +t_http_api(_) -> + process_flag(trap_exit, true), + AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + {ok, Zones} = emqx_mgmt_api_configs_SUITE:get_global_zone(), + NewZones = emqx_utils_maps:deep_put([<<"mqtt">>, <<"max_qos_allowed">>], Zones, 1), + {ok, #{<<"mqtt">> := Res}} = emqx_mgmt_api_configs_SUITE:update_global_zone(NewZones), + ?assertMatch(#{<<"max_qos_allowed">> := 1}, Res), + {ok, Res1} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader), + ?assertMatch( + #{ + <<"data">> := [ + #{ + <<"from">> := <<"rest_api">>, + <<"operation_id">> := <<"/configs/global_zone">>, + <<"source_ip">> := <<"127.0.0.1">>, + <<"source">> := _, + <<"http_request">> := #{ + <<"method">> := <<"put">>, + <<"body">> := #{<<"mqtt">> := #{<<"max_qos_allowed">> := 1}}, + <<"bindings">> := _, + <<"headers">> := #{<<"authorization">> := <<"******">>} + }, + <<"http_status_code">> := 200, + <<"operation_result">> := <<"success">>, + <<"operation_type">> := <<"configs">> + } + ] + }, + emqx_utils_json:decode(Res1, [return_maps]) + ), + ok. + +t_cli(_Config) -> + ok = emqx_ctl:run_command(["conf", "show", "log"]), + AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + {ok, Res} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader), + #{<<"data">> := Data} = emqx_utils_json:decode(Res, [return_maps]), + ?assertMatch( + [ + #{ + <<"from">> := <<"cli">>, + <<"operation_id">> := <<"">>, + <<"source_ip">> := <<"">>, + <<"operation_type">> := <<"conf">>, + <<"args">> := [<<"show">>, <<"log">>], + <<"node">> := _, + <<"source">> := <<"">>, + <<"http_request">> := <<"">> + } + ], + Data + ), + + %% check filter + {ok, Res1} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "from=cli", AuthHeader), + #{<<"data">> := Data1} = emqx_utils_json:decode(Res1, [return_maps]), + ?assertEqual(Data, Data1), + {ok, Res2} = emqx_mgmt_api_test_util:request_api( + get, AuditPath, "from=erlang_console", AuthHeader + ), + ?assertMatch(#{<<"data">> := []}, emqx_utils_json:decode(Res2, [return_maps])), + ok. + +t_kickout_clients_without_log(_) -> + process_flag(trap_exit, true), + AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]), + {ok, AuditLogs1} = emqx_mgmt_api_test_util:request_api(get, AuditPath), + kickout_clients(), + {ok, AuditLogs2} = emqx_mgmt_api_test_util:request_api(get, AuditPath), + ?assertEqual(AuditLogs1, AuditLogs2), + ok. + +kickout_clients() -> + ClientId1 = <<"client1">>, + ClientId2 = <<"client2">>, + ClientId3 = <<"client3">>, + + {ok, C1} = emqtt:start_link(#{ + clientid => ClientId1, + proto_ver => v5, + properties => #{'Session-Expiry-Interval' => 120} + }), + {ok, _} = emqtt:connect(C1), + {ok, C2} = emqtt:start_link(#{clientid => ClientId2}), + {ok, _} = emqtt:connect(C2), + {ok, C3} = emqtt:start_link(#{clientid => ClientId3}), + {ok, _} = emqtt:connect(C3), + + timer:sleep(300), + + %% get /clients + ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]), + {ok, Clients} = emqx_mgmt_api_test_util:request_api(get, ClientsPath), + ClientsResponse = emqx_utils_json:decode(Clients, [return_maps]), + ClientsMeta = maps:get(<<"meta">>, ClientsResponse), + ClientsPage = maps:get(<<"page">>, ClientsMeta), + ClientsLimit = maps:get(<<"limit">>, ClientsMeta), + ClientsCount = maps:get(<<"count">>, ClientsMeta), + ?assertEqual(ClientsPage, 1), + ?assertEqual(ClientsLimit, emqx_mgmt:default_row_limit()), + ?assertEqual(ClientsCount, 3), + + %% kickout clients + KickoutPath = emqx_mgmt_api_test_util:api_path(["clients", "kickout", "bulk"]), + KickoutBody = [ClientId1, ClientId2, ClientId3], + {ok, 204, _} = emqx_mgmt_api_test_util:request_api_with_body(post, KickoutPath, KickoutBody), + + {ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath), + ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]), + ?assertMatch(#{<<"meta">> := #{<<"count">> := 0}}, ClientsResponse2). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_audit.erl b/apps/emqx_dashboard/src/emqx_dashboard_audit.erl index 4d3f6209e..704e849bc 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_audit.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_audit.erl @@ -20,15 +20,15 @@ %% API -export([log/2]). -%% todo filter high frequency events --define(HIGH_FREQUENCY_EVENTS, [ - mqtt_subscribe, - mqtt_unsubscribe, - mqtt_subscribe_batch, - mqtt_unsubscribe_batch, - mqtt_publish, - mqtt_publish_batch, - kickout_client +%% filter high frequency events +-define(HIGH_FREQUENCY_REQUESTS, [ + <<"/clients/:clientid/publish">>, + <<"/clients/:clientid/subscribe">>, + <<"/clients/:clientid/unsubscribe">>, + <<"/clients/:clientid/publish/bulk">>, + <<"/clients/:clientid/unsubscribe/bulk">>, + <<"/clients/:clientid/subscribe/bulk">>, + <<"/clients/kickout/bulk">> ]). log(#{code := Code, method := Method} = Meta, Req) -> @@ -36,22 +36,31 @@ log(#{code := Code, method := Method} = Meta, Req) -> ?AUDIT(level(Method, Code), log_meta(Meta, Req)). log_meta(Meta, Req) -> - Meta1 = #{ - time => logger:timestamp(), - from => from(Meta), - source => source(Meta), - duration_ms => duration_ms(Meta), - source_ip => source_ip(Req), - operation_type => operation_type(Meta), - %% method for http filter api. - http_method => maps:get(method, Meta), - http_request => http_request(Meta), - http_status_code => maps:get(code, Meta), - operation_result => operation_result(Meta), - node => node() - }, - Meta2 = maps:without([req_start, req_end, method, headers, body, bindings, code], Meta), - emqx_utils:redact(maps:merge(Meta2, Meta1)). + #{operation_id := OperationId} = Meta, + case + lists:member(OperationId, ?HIGH_FREQUENCY_REQUESTS) andalso + ignore_high_frequency_request() + of + true -> + undefined; + false -> + Meta1 = #{ + time => logger:timestamp(), + from => from(Meta), + source => source(Meta), + duration_ms => duration_ms(Meta), + source_ip => source_ip(Req), + operation_type => operation_type(Meta), + %% method for http filter api. + http_method => maps:get(method, Meta), + http_request => http_request(Meta), + http_status_code => maps:get(code, Meta), + operation_result => operation_result(Meta), + node => node() + }, + Meta2 = maps:without([req_start, req_end, method, headers, body, bindings, code], Meta), + emqx_utils:redact(maps:merge(Meta2, Meta1)) + end. duration_ms(#{req_start := ReqStart, req_end := ReqEnd}) -> erlang:convert_time_unit(ReqEnd - ReqStart, native, millisecond). @@ -84,8 +93,10 @@ source_ip(Req) -> operation_type(Meta) -> case maps:find(operation_id, Meta) of - {ok, OperationId} -> lists:nth(2, binary:split(OperationId, <<"/">>)); - _ -> <<"unknown">> + {ok, OperationId} -> + lists:nth(2, binary:split(OperationId, <<"/">>, [global])); + _ -> + <<"unknown">> end. http_request(Meta) -> @@ -99,3 +110,6 @@ level(_, Code) when Code >= 200 andalso Code < 300 -> info; level(_, Code) when Code >= 300 andalso Code < 400 -> warning; level(_, Code) when Code >= 400 andalso Code < 500 -> error; level(_, _) -> critical. + +ignore_high_frequency_request() -> + emqx_conf:get([log, audit, ignore_high_frequency_request], true). diff --git a/apps/emqx_enterprise/src/emqx_enterprise.app.src b/apps/emqx_enterprise/src/emqx_enterprise.app.src index 37d31c5ec..1a5359db6 100644 --- a/apps/emqx_enterprise/src/emqx_enterprise.app.src +++ b/apps/emqx_enterprise/src/emqx_enterprise.app.src @@ -1,6 +1,6 @@ {application, emqx_enterprise, [ {description, "EMQX Enterprise Edition"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_enterprise/src/emqx_enterprise_schema.erl b/apps/emqx_enterprise/src/emqx_enterprise_schema.erl index 0cf5850c8..5537c3259 100644 --- a/apps/emqx_enterprise/src/emqx_enterprise_schema.erl +++ b/apps/emqx_enterprise/src/emqx_enterprise_schema.erl @@ -78,6 +78,24 @@ fields("log_audit_handler") -> desc => ?DESC(emqx_conf_schema, "log_file_handler_max_size"), importance => ?IMPORTANCE_MEDIUM } + )}, + {"max_filter_size", + hoconsc:mk( + range(10, 30000), + #{ + default => 5000, + desc => ?DESC(emqx_conf_schema, "audit_log_max_filter_limit"), + importance => ?IMPORTANCE_MEDIUM + } + )}, + {"ignore_high_frequency_request", + hoconsc:mk( + boolean(), + #{ + default => true, + desc => ?DESC(emqx_conf_schema, "audit_log_ignore_high_frequency_request"), + importance => ?IMPORTANCE_MEDIUM + } )} ] ++ CommonConfs1; fields(Name) -> diff --git a/mix.exs b/mix.exs index 3551951fd..a088d89f7 100644 --- a/mix.exs +++ b/mix.exs @@ -58,7 +58,7 @@ defmodule EMQXUmbrella.MixProject do {:ekka, github: "emqx/ekka", tag: "0.15.16", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "3.2.0", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true}, - {:minirest, github: "emqx/minirest", tag: "1.3.13", override: true}, + {:minirest, github: "emqx/minirest", tag: "1.3.14", override: true}, {:ecpool, github: "emqx/ecpool", tag: "0.5.4", override: true}, {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, diff --git a/rebar.config b/rebar.config index e2e1a7cf0..9f66553cc 100644 --- a/rebar.config +++ b/rebar.config @@ -65,7 +65,7 @@ , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.16"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.2.0"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}} - , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.13"}}} + , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.14"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}} , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} diff --git a/rel/i18n/emqx_conf_schema.hocon b/rel/i18n/emqx_conf_schema.hocon index fde0f7ff3..ff05eaf6a 100644 --- a/rel/i18n/emqx_conf_schema.hocon +++ b/rel/i18n/emqx_conf_schema.hocon @@ -725,6 +725,19 @@ audit_handler_level.desc: audit_handler_level.label: """Log Level""" +audit_log_max_filter_limit.desc: +"""Maximum size of the filter.""" + +audit_log_max_filter_limit.label: +"""Max Filter Limit""" + +audit_log_ignore_high_frequency_request.desc: +"""Ignore high frequency requests to avoid flooding the audit log. +such publish/subscribe kickout http api requests are ignored.""" + +audit_log_ignore_high_frequency_request.label: +"""Ignore High Frequency Request""" + desc_rpc.desc: """EMQX uses a library called gen_rpc for inter-broker communication.
Most of the time the default config should work,