diff --git a/apps/emqx/include/http_api.hrl b/apps/emqx/include/http_api.hrl
index ba1438374..0f6372584 100644
--- a/apps/emqx/include/http_api.hrl
+++ b/apps/emqx/include/http_api.hrl
@@ -17,6 +17,7 @@
%% HTTP API Auth
-define(BAD_USERNAME_OR_PWD, 'BAD_USERNAME_OR_PWD').
-define(BAD_API_KEY_OR_SECRET, 'BAD_API_KEY_OR_SECRET').
+-define(API_KEY_NOT_ALLOW_MSG, <<"This API Key don't have permission to access this resource">>).
%% Bad Request
-define(BAD_REQUEST, 'BAD_REQUEST').
diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl
index d803f67be..67f125e5f 100644
--- a/apps/emqx/include/logger.hrl
+++ b/apps/emqx/include/logger.hrl
@@ -61,19 +61,15 @@
)
end).
--define(AUDIT(_Level_, _From_, _Meta_), begin
+-define(AUDIT(_LevelFun_, _MetaFun_), begin
case emqx_config:get([log, audit], #{enable => false}) of
#{enable := false} ->
ok;
#{enable := true, level := _AllowLevel_} ->
+ _Level_ = _LevelFun_,
case logger:compare_levels(_AllowLevel_, _Level_) of
_R_ when _R_ == lt; _R_ == eq ->
- emqx_trace:log(
- _Level_,
- [{emqx_audit, fun(L, _) -> L end, undefined, undefined}],
- _Msg = undefined,
- _Meta_#{from => _From_}
- );
+ emqx_audit:log(_Level_, _MetaFun_);
gt ->
ok
end
diff --git a/apps/emqx/src/config/emqx_config_logger.erl b/apps/emqx/src/config/emqx_config_logger.erl
index c675edb52..87baef627 100644
--- a/apps/emqx/src/config/emqx_config_logger.erl
+++ b/apps/emqx/src/config/emqx_config_logger.erl
@@ -23,6 +23,8 @@
-export([post_config_update/5]).
-export([filter_audit/2]).
+-include("logger.hrl").
+
-define(LOG, [log]).
-define(AUDIT_HANDLER, emqx_audit).
@@ -96,6 +98,7 @@ update_log_handlers(NewHandlers) ->
ok.
update_log_handler({removed, Id}) ->
+ audit("audit_disabled", Id),
log_to_console("Config override: ~s is removed~n", [id_for_log(Id)]),
logger:remove_handler(Id);
update_log_handler({Action, {handler, Id, Mod, Conf}}) ->
@@ -104,6 +107,7 @@ update_log_handler({Action, {handler, Id, Mod, Conf}}) ->
_ = logger:remove_handler(Id),
case logger:add_handler(Id, Mod, Conf) of
ok ->
+ audit("audit_enabled", Id),
ok;
%% Don't crash here, otherwise the cluster rpc will retry the wrong handler forever.
{error, Reason} ->
@@ -114,6 +118,23 @@ update_log_handler({Action, {handler, Id, Mod, Conf}}) ->
end,
ok.
+-ifdef(EMQX_RELEASE_EDITION).
+
+-if(?EMQX_RELEASE_EDITION == ee).
+audit(Event, ?AUDIT_HANDLER) ->
+ emqx_audit:log(alert, #{event => Event, from => event});
+audit(_, _) ->
+ ok.
+-else.
+audit(_, _) ->
+ ok.
+-endif.
+
+-else.
+audit(_, _) ->
+ ok.
+-endif.
+
id_for_log(console) -> "log.console";
id_for_log(Other) -> "log.file." ++ atom_to_list(Other).
diff --git a/apps/emqx_audit/BSL.txt b/apps/emqx_audit/BSL.txt
new file mode 100644
index 000000000..0acc0e696
--- /dev/null
+++ b/apps/emqx_audit/BSL.txt
@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor: Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work: EMQX Enterprise Edition
+ The Licensed Work is (c) 2023
+ Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+ modify, and create derivative work for research
+ or education.
+Change Date: 2027-02-01
+Change License: Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+ or a license that is compatible with GPL Version 2.0 or a later version,
+ where “compatible” means that software provided under the Change License can
+ be included in a program with software provided under GPL Version 2.0 or a
+ later version. Licensor may specify additional Change Licenses without
+ limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+ impose any additional restriction on the right granted in this License, as
+ the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.
diff --git a/apps/emqx_audit/README.md b/apps/emqx_audit/README.md
new file mode 100644
index 000000000..48c625ed5
--- /dev/null
+++ b/apps/emqx_audit/README.md
@@ -0,0 +1,5 @@
+emqx_audit
+=====
+
+Audit log for EMQX, empowers users to efficiently access the desired audit trail data
+and facilitates auditing, compliance, troubleshooting, and security analysis.
diff --git a/apps/emqx_audit/include/emqx_audit.hrl b/apps/emqx_audit/include/emqx_audit.hrl
new file mode 100644
index 000000000..1b4349387
--- /dev/null
+++ b/apps/emqx_audit/include/emqx_audit.hrl
@@ -0,0 +1,27 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-define(AUDIT, emqx_audit).
+
+-record(?AUDIT, {
+ seq,
+ %% basic info
+ created_at,
+ node,
+ from,
+ source,
+ source_ip,
+ %% operation info
+ operation_id,
+ operation_type,
+ args,
+ operation_result,
+ failure,
+ %% request detail
+ http_method,
+ http_request,
+ http_status_code,
+ duration_ms,
+ extra
+}).
diff --git a/apps/emqx_audit/rebar.config b/apps/emqx_audit/rebar.config
new file mode 100644
index 000000000..fac0f9b07
--- /dev/null
+++ b/apps/emqx_audit/rebar.config
@@ -0,0 +1,5 @@
+{erl_opts, [debug_info]}.
+{deps, [
+ {emqx, {path, "../emqx"}},
+ {emqx_utils, {path, "../emqx_utils"}}
+]}.
diff --git a/apps/emqx_audit/src/emqx_audit.app.src b/apps/emqx_audit/src/emqx_audit.app.src
new file mode 100644
index 000000000..96cdd11ce
--- /dev/null
+++ b/apps/emqx_audit/src/emqx_audit.app.src
@@ -0,0 +1,10 @@
+{application, emqx_audit, [
+ {description, "Audit log for EMQX"},
+ {vsn, "0.1.0"},
+ {registered, []},
+ {mod, {emqx_audit_app, []}},
+ {applications, [kernel, stdlib, emqx]},
+ {env, []},
+ {modules, []},
+ {links, []}
+]}.
diff --git a/apps/emqx_audit/src/emqx_audit.erl b/apps/emqx_audit/src/emqx_audit.erl
new file mode 100644
index 000000000..4477bbd8b
--- /dev/null
+++ b/apps/emqx_audit/src/emqx_audit.erl
@@ -0,0 +1,221 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_audit).
+
+-behaviour(gen_server).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include("emqx_audit.hrl").
+
+%% API
+-export([start_link/0]).
+-export([log/1, log/2]).
+
+%% gen_server callbacks
+-export([
+ init/1,
+ handle_continue/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
+]).
+
+-define(FILTER_REQ, [cert, host_info, has_sent_resp, pid, path_info, peer, ref, sock, streamid]).
+-define(CLEAN_EXPIRED_MS, 60 * 1000).
+
+to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) ->
+ #?AUDIT{
+ created_at = erlang:system_time(microsecond),
+ node = node(),
+ operation_id = <<"">>,
+ operation_type = atom_to_binary(Cmd),
+ args = Args,
+ operation_result = <<"">>,
+ failure = <<"">>,
+ duration_ms = DurationMs,
+ from = cli,
+ source = <<"">>,
+ source_ip = <<"">>,
+ http_status_code = <<"">>,
+ http_method = <<"">>,
+ http_request = <<"">>
+ };
+to_audit(#{http_method := get}) ->
+ ok;
+to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api ->
+ #{
+ source := Source,
+ source_ip := SourceIp,
+ %% operation info
+ operation_id := OperationId,
+ operation_type := OperationType,
+ operation_result := OperationResult,
+ %% request detail
+ http_status_code := StatusCode,
+ http_method := Method,
+ http_request := Request,
+ duration_ms := DurationMs
+ } = Log,
+ #?AUDIT{
+ created_at = erlang:system_time(microsecond),
+ node = node(),
+ from = From,
+ source = Source,
+ source_ip = SourceIp,
+ %% operation info
+ operation_id = OperationId,
+ operation_type = OperationType,
+ operation_result = OperationResult,
+ failure = maps:get(failure, Log, <<"">>),
+ %% request detail
+ http_status_code = StatusCode,
+ http_method = Method,
+ http_request = Request,
+ duration_ms = DurationMs,
+ args = <<"">>
+ };
+to_audit(#{from := event, event := Event}) ->
+ #?AUDIT{
+ created_at = erlang:system_time(microsecond),
+ node = node(),
+ from = event,
+ source = <<"">>,
+ source_ip = <<"">>,
+ %% operation info
+ operation_id = iolist_to_binary(Event),
+ operation_type = <<"">>,
+ operation_result = <<"">>,
+ failure = <<"">>,
+ %% request detail
+ http_status_code = <<"">>,
+ http_method = <<"">>,
+ http_request = <<"">>,
+ duration_ms = 0,
+ args = <<"">>
+ };
+to_audit(#{from := erlang_console, function := F, args := Args}) ->
+ #?AUDIT{
+ created_at = erlang:system_time(microsecond),
+ node = node(),
+ from = erlang_console,
+ source = <<"">>,
+ source_ip = <<"">>,
+ %% operation info
+ operation_id = <<"">>,
+ operation_type = <<"">>,
+ operation_result = <<"">>,
+ failure = <<"">>,
+ %% request detail
+ http_status_code = <<"">>,
+ http_method = <<"">>,
+ http_request = <<"">>,
+ duration_ms = 0,
+ args = iolist_to_binary(io_lib:format("~p: ~p~n", [F, Args]))
+ }.
+
+log(_Level, undefined) ->
+ ok;
+log(Level, Meta1) ->
+ Meta2 = Meta1#{time => logger:timestamp(), level => Level},
+ Filter = [{emqx_audit, fun(L, _) -> L end, undefined, undefined}],
+ emqx_trace:log(Level, Filter, undefined, Meta2),
+ emqx_audit:log(Meta2).
+
+log(Log) ->
+ gen_server:cast(?MODULE, {write, to_audit(Log)}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init([]) ->
+ ok = mria:create_table(?AUDIT, [
+ {type, ordered_set},
+ {rlog_shard, ?COMMON_SHARD},
+ {storage, disc_copies},
+ {record_name, ?AUDIT},
+ {attributes, record_info(fields, ?AUDIT)}
+ ]),
+ {ok, #{}, {continue, setup}}.
+
+handle_continue(setup, #{} = State) ->
+ ok = mria:wait_for_tables([?AUDIT]),
+ clean_expired(),
+ {noreply, State}.
+
+handle_call(_Request, _From, State = #{}) ->
+ {reply, ok, State}.
+
+handle_cast({write, Log}, State) ->
+ _ = write_log(Log),
+ {noreply, State#{}, ?CLEAN_EXPIRED_MS};
+handle_cast(_Request, State = #{}) ->
+ {noreply, State}.
+
+handle_info(timeout, State = #{}) ->
+ clean_expired(),
+ {noreply, State, hibernate};
+handle_info(_Info, State = #{}) ->
+ {noreply, State}.
+
+terminate(_Reason, _State = #{}) ->
+ ok.
+
+code_change(_OldVsn, State = #{}, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+write_log(Log) ->
+ case
+ mria:transaction(
+ ?COMMON_SHARD,
+ fun(L) ->
+ New =
+ case mnesia:last(?AUDIT) of
+ '$end_of_table' -> 1;
+ LastId -> LastId + 1
+ end,
+ mnesia:write(L#?AUDIT{seq = New})
+ end,
+ [Log]
+ )
+ of
+ {atomic, ok} ->
+ ok;
+ Reason ->
+ ?SLOG(warning, #{
+ msg => "write_audit_log_failed",
+ reason => Reason
+ })
+ end.
+
+clean_expired() ->
+ MaxSize = max_size(),
+ LatestId = latest_id(),
+ Min = LatestId - MaxSize,
+ %% MS = ets:fun2ms(fun(#?AUDIT{seq = Seq}) when Seq =< Min -> true end),
+ MS = [{#?AUDIT{seq = '$1', _ = '_'}, [{'=<', '$1', Min}], [true]}],
+ NumDeleted = mnesia:ets(fun ets:select_delete/2, [?AUDIT, MS]),
+ ?SLOG(debug, #{
+ msg => "clean_audit_log",
+ latest_id => LatestId,
+ min => Min,
+ deleted_number => NumDeleted
+ }),
+ ok.
+
+latest_id() ->
+ case mnesia:dirty_last(?AUDIT) of
+ '$end_of_table' -> 0;
+ Seq -> Seq
+ end.
+
+max_size() ->
+ emqx_conf:get([log, audit, max_filter_size], 5000).
diff --git a/apps/emqx_audit/src/emqx_audit_api.erl b/apps/emqx_audit/src/emqx_audit_api.erl
new file mode 100644
index 000000000..aaa364464
--- /dev/null
+++ b/apps/emqx_audit/src/emqx_audit_api.erl
@@ -0,0 +1,386 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_audit_api).
+
+-behaviour(minirest_api).
+
+%% API
+-export([api_spec/0, paths/0, schema/1, namespace/0, fields/1]).
+-export([audit/2]).
+-export([qs2ms/2, format/1]).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include("emqx_audit.hrl").
+
+-import(hoconsc, [mk/2, ref/2, array/1]).
+
+-define(TAGS, ["Audit"]).
+
+-define(AUDIT_QS_SCHEMA, [
+ {<<"node">>, atom},
+ {<<"from">>, atom},
+ {<<"source">>, binary},
+ {<<"source_ip">>, binary},
+ {<<"operation_id">>, binary},
+ {<<"operation_type">>, binary},
+ {<<"operation_result">>, atom},
+ {<<"http_status_code">>, integer},
+ {<<"http_method">>, atom},
+ {<<"gte_created_at">>, timestamp},
+ {<<"lte_created_at">>, timestamp},
+ {<<"gte_duration_ms">>, timestamp},
+ {<<"lte_duration_ms">>, timestamp}
+]).
+
+namespace() -> "audit".
+
+api_spec() ->
+ emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+ ["/audit"].
+
+schema("/audit") ->
+ #{
+ 'operationId' => audit,
+ get => #{
+ tags => ?TAGS,
+ description => ?DESC(audit_get),
+ parameters => [
+ {node,
+ ?HOCON(binary(), #{
+ in => query,
+ required => false,
+ example => <<"emqx@127.0.0.1">>,
+ desc => ?DESC(filter_node)
+ })},
+ {from,
+ ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console, event]), #{
+ in => query,
+ required => false,
+ example => <<"dashboard">>,
+ desc => ?DESC(filter_from)
+ })},
+ {source,
+ ?HOCON(binary(), #{
+ in => query,
+ required => false,
+ example => <<"admin">>,
+ desc => ?DESC(filter_source)
+ })},
+ {source_ip,
+ ?HOCON(binary(), #{
+ in => query,
+ required => false,
+ example => <<"127.0.0.1">>,
+ desc => ?DESC(filter_source_ip)
+ })},
+ {operation_id,
+ ?HOCON(binary(), #{
+ in => query,
+ required => false,
+ example => <<"/rules/{id}">>,
+ desc => ?DESC(filter_operation_id)
+ })},
+ {operation_type,
+ ?HOCON(binary(), #{
+ in => query,
+ example => <<"rules">>,
+ required => false,
+ desc => ?DESC(filter_operation_type)
+ })},
+ {operation_result,
+ ?HOCON(?ENUM([success, failure]), #{
+ in => query,
+ example => failure,
+ required => false,
+ desc => ?DESC(filter_operation_result)
+ })},
+ {http_status_code,
+ ?HOCON(integer(), #{
+ in => query,
+ example => 200,
+ required => false,
+ desc => ?DESC(filter_http_status_code)
+ })},
+ {http_method,
+ ?HOCON(?ENUM([post, put, delete]), #{
+ in => query,
+ example => post,
+ required => false,
+ desc => ?DESC(filter_http_method)
+ })},
+ {gte_duration_ms,
+ ?HOCON(integer(), #{
+ in => query,
+ example => 0,
+ required => false,
+ desc => ?DESC(filter_gte_duration_ms)
+ })},
+ {lte_duration_ms,
+ ?HOCON(integer(), #{
+ in => query,
+ example => 1000,
+ required => false,
+ desc => ?DESC(filter_lte_duration_ms)
+ })},
+ {gte_created_at,
+ ?HOCON(emqx_utils_calendar:epoch_millisecond(), #{
+ in => query,
+ required => false,
+ example => <<"2023-10-15T00:00:00.820384+08:00">>,
+ desc => ?DESC(filter_gte_created_at)
+ })},
+ {lte_created_at,
+ ?HOCON(emqx_utils_calendar:epoch_millisecond(), #{
+ in => query,
+ example => <<"2023-10-16T00:00:00.820384+08:00">>,
+ required => false,
+ desc => ?DESC(filter_lte_created_at)
+ })},
+ ref(emqx_dashboard_swagger, page),
+ ref(emqx_dashboard_swagger, limit)
+ ],
+ summary => <<"List audit logs">>,
+ responses => #{
+ 200 =>
+ emqx_dashboard_swagger:schema_with_example(
+ array(?REF(audit_list)),
+ audit_log_list_example()
+ )
+ }
+ }
+ }.
+
+fields(audit_list) ->
+ [
+ {data, mk(array(?REF(audit)), #{desc => ?DESC("audit_resp")})},
+ {meta, mk(ref(emqx_dashboard_swagger, meta), #{})}
+ ];
+fields(audit) ->
+ [
+ {created_at,
+ ?HOCON(
+ emqx_utils_calendar:epoch_millisecond(),
+ #{
+ desc => "The time when the log is created"
+ }
+ )},
+ {node,
+ ?HOCON(binary(), #{
+ desc => "The node name to which the log is created"
+ })},
+ {from,
+ ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console, event]), #{
+ desc => "The source type of the log"
+ })},
+ {source,
+ ?HOCON(binary(), #{
+ desc => "The source of the log"
+ })},
+ {source_ip,
+ ?HOCON(binary(), #{
+ desc => "The source ip of the log"
+ })},
+ {operation_id,
+ ?HOCON(binary(), #{
+ desc => "The operation id of the log"
+ })},
+ {operation_type,
+ ?HOCON(binary(), #{
+ desc => "The operation type of the log"
+ })},
+ {operation_result,
+ ?HOCON(?ENUM([success, failure]), #{
+ desc => "The operation result of the log"
+ })},
+ {http_status_code,
+ ?HOCON(integer(), #{
+ desc => "The http status code of the log"
+ })},
+ {http_method,
+ ?HOCON(?ENUM([post, put, delete]), #{
+ desc => "The http method of the log"
+ })},
+ {duration_ms,
+ ?HOCON(integer(), #{
+ desc => "The duration of the log"
+ })},
+ {args,
+ ?HOCON(?ARRAY(binary()), #{
+ desc => "The args of the log"
+ })},
+ {failure,
+ ?HOCON(?ARRAY(binary()), #{
+ desc => "The failure of the log"
+ })},
+ {http_request,
+ ?HOCON(?REF(http_request), #{
+ desc => "The http request of the log"
+ })}
+ ];
+fields(http_request) ->
+ [
+ {bindings, ?HOCON(map(), #{})},
+ {body, ?HOCON(map(), #{})},
+ {headers, ?HOCON(map(), #{})},
+ {method, ?HOCON(?ENUM([post, put, delete]), #{})}
+ ].
+
+audit(get, #{query_string := QueryString}) ->
+ case
+ emqx_mgmt_api:node_query(
+ node(),
+ ?AUDIT,
+ QueryString,
+ ?AUDIT_QS_SCHEMA,
+ fun ?MODULE:qs2ms/2,
+ fun ?MODULE:format/1
+ )
+ of
+ {error, page_limit_invalid} ->
+ {400, #{code => 'BAD_REQUEST', message => <<"page_limit_invalid">>}};
+ {error, Node, Error} ->
+ Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
+ {500, #{code => <<"NODE_DOWN">>, message => Message}};
+ Result ->
+ {200, Result}
+ end.
+
+qs2ms(_Tab, {Qs, _}) ->
+ #{
+ match_spec => gen_match_spec(Qs, #?AUDIT{_ = '_'}, []),
+ fuzzy_fun => undefined
+ }.
+
+gen_match_spec([], Audit, Conn) ->
+ [{Audit, Conn, ['$_']}];
+gen_match_spec([{node, '=:=', T} | Qs], Audit, Conn) ->
+ gen_match_spec(Qs, Audit#?AUDIT{node = T}, Conn);
+gen_match_spec([{from, '=:=', T} | Qs], Audit, Conn) ->
+ gen_match_spec(Qs, Audit#?AUDIT{from = T}, Conn);
+gen_match_spec([{source, '=:=', T} | Qs], Audit, Conn) ->
+ gen_match_spec(Qs, Audit#?AUDIT{source = T}, Conn);
+gen_match_spec([{source_ip, '=:=', T} | Qs], Audit, Conn) ->
+ gen_match_spec(Qs, Audit#?AUDIT{source_ip = T}, Conn);
+gen_match_spec([{operation_id, '=:=', T} | Qs], Audit, Conn) ->
+ gen_match_spec(Qs, Audit#?AUDIT{operation_id = T}, Conn);
+gen_match_spec([{operation_type, '=:=', T} | Qs], Audit, Conn) ->
+ gen_match_spec(Qs, Audit#?AUDIT{operation_type = T}, Conn);
+gen_match_spec([{operation_result, '=:=', T} | Qs], Audit, Conn) ->
+ gen_match_spec(Qs, Audit#?AUDIT{operation_result = T}, Conn);
+gen_match_spec([{http_status_code, '=:=', T} | Qs], Audit, Conn) ->
+ gen_match_spec(Qs, Audit#?AUDIT{http_status_code = T}, Conn);
+gen_match_spec([{http_method, '=:=', T} | Qs], Audit, Conn) ->
+ gen_match_spec(Qs, Audit#?AUDIT{http_method = T}, Conn);
+gen_match_spec([{created_at, Hold, T} | Qs], Audit, Conn) ->
+ gen_match_spec(Qs, Audit#?AUDIT{created_at = '$1'}, [{'$1', Hold, T} | Conn]);
+gen_match_spec([{created_at, Hold1, T1, Hold2, T2} | Qs], Audit, Conn) ->
+ gen_match_spec(Qs, Audit#?AUDIT{created_at = '$1'}, [
+ {'$1', Hold1, T1}, {'$1', Hold2, T2} | Conn
+ ]);
+gen_match_spec([{duration_ms, Hold, T} | Qs], Audit, Conn) ->
+ gen_match_spec(Qs, Audit#?AUDIT{duration_ms = '$2'}, [{'$2', Hold, T} | Conn]);
+gen_match_spec([{duration_ms, Hold1, T1, Hold2, T2} | Qs], Audit, Conn) ->
+ gen_match_spec(Qs, Audit#?AUDIT{duration_ms = '$2'}, [
+ {'$2', Hold1, T1}, {'$2', Hold2, T2} | Conn
+ ]).
+
+format(Audit) ->
+ #?AUDIT{
+ created_at = CreatedAt,
+ node = Node,
+ from = From,
+ source = Source,
+ source_ip = SourceIp,
+ operation_id = OperationId,
+ operation_type = OperationType,
+ operation_result = OperationResult,
+ http_status_code = HttpStatusCode,
+ http_method = HttpMethod,
+ duration_ms = DurationMs,
+ args = Args,
+ failure = Failure,
+ http_request = HttpRequest
+ } = Audit,
+ #{
+ created_at => emqx_utils_calendar:epoch_to_rfc3339(CreatedAt, microsecond),
+ node => Node,
+ from => From,
+ source => Source,
+ source_ip => SourceIp,
+ operation_id => OperationId,
+ operation_type => OperationType,
+ operation_result => OperationResult,
+ http_status_code => HttpStatusCode,
+ http_method => HttpMethod,
+ duration_ms => DurationMs,
+ args => Args,
+ failure => Failure,
+ http_request => HttpRequest
+ }.
+
+audit_log_list_example() ->
+ #{
+ data => [api_example(), cli_example()],
+ meta => #{
+ <<"count">> => 2,
+ <<"hasnext">> => false,
+ <<"limit">> => 50,
+ <<"page">> => 1
+ }
+ }.
+
+api_example() ->
+ #{
+ <<"args">> => "",
+ <<"created_at">> => "2023-10-17T10:41:20.383993+08:00",
+ <<"duration_ms">> => 0,
+ <<"failure">> => "",
+ <<"from">> => "dashboard",
+ <<"http_method">> => "post",
+ <<"http_request">> => #{
+ <<"bindings">> => #{},
+ <<"body">> => #{
+ <<"password">> => "******",
+ <<"username">> => "admin"
+ },
+ <<"headers">> => #{
+ <<"accept">> => "*/*",
+ <<"authorization">> => "******",
+ <<"connection">> => "keep-alive",
+ <<"content-length">> => "45",
+ <<"content-type">> => "application/json"
+ },
+ <<"method">> => "post"
+ },
+ <<"http_status_code">> => 200,
+ <<"node">> => "emqx@127.0.0.1",
+ <<"operation_id">> => "/login",
+ <<"operation_result">> => "success",
+ <<"operation_type">> => "login",
+ <<"source">> => "admin",
+ <<"source_ip">> => "127.0.0.1"
+ }.
+
+cli_example() ->
+ #{
+ <<"args">> => [<<"show">>, <<"log">>],
+ <<"created_at">> => "2023-10-17T10:45:13.100426+08:00",
+ <<"duration_ms">> => 7,
+ <<"failure">> => "",
+ <<"from">> => "cli",
+ <<"http_method">> => "",
+ <<"http_request">> => "",
+ <<"http_status_code">> => "",
+ <<"node">> => "emqx@127.0.0.1",
+ <<"operation_id">> => "",
+ <<"operation_result">> => "",
+ <<"operation_type">> => "conf",
+ <<"source">> => "",
+ <<"source_ip">> => ""
+ }.
diff --git a/apps/emqx_audit/src/emqx_audit_app.erl b/apps/emqx_audit/src/emqx_audit_app.erl
new file mode 100644
index 000000000..aa8fa1a39
--- /dev/null
+++ b/apps/emqx_audit/src/emqx_audit_app.erl
@@ -0,0 +1,15 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_audit_app).
+
+-behaviour(application).
+
+-export([start/2, stop/1]).
+
+start(_StartType, _StartArgs) ->
+ emqx_audit_sup:start_link().
+
+stop(_State) ->
+ ok.
diff --git a/apps/emqx_audit/src/emqx_audit_sup.erl b/apps/emqx_audit/src/emqx_audit_sup.erl
new file mode 100644
index 000000000..b3a5ca985
--- /dev/null
+++ b/apps/emqx_audit/src/emqx_audit_sup.erl
@@ -0,0 +1,33 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_audit_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+start_link() ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+init([]) ->
+ SupFlags = #{
+ strategy => one_for_all,
+ intensity => 10,
+ period => 10
+ },
+ ChildSpecs = [
+ #{
+ id => emqx_audit,
+ start => {emqx_audit, start_link, []},
+ type => worker,
+ restart => transient,
+ shutdown => 1000
+ }
+ ],
+ {ok, {SupFlags, ChildSpecs}}.
diff --git a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl
new file mode 100644
index 000000000..6fb860b6e
--- /dev/null
+++ b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl
@@ -0,0 +1,170 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_audit_api_SUITE).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(CONF_DEFAULT, #{
+ node =>
+ #{
+ name => "emqx1@127.0.0.1",
+ cookie => "emqxsecretcookie",
+ data_dir => "data"
+ },
+ log => #{
+ audit =>
+ #{
+ enable => true,
+ ignore_high_frequency_request => true,
+ level => info,
+ max_filter_size => 15,
+ rotation_count => 2,
+ rotation_size => "10MB",
+ time_offset => "system"
+ }
+ }
+}).
+
+all() ->
+ emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+ _ = application:load(emqx_conf),
+ emqx_config:erase_all(),
+ emqx_mgmt_api_test_util:init_suite([emqx_ctl, emqx_conf, emqx_audit]),
+ ok = emqx_common_test_helpers:load_config(emqx_enterprise_schema, ?CONF_DEFAULT),
+ emqx_config:save_schema_mod_and_names(emqx_enterprise_schema),
+ application:set_env(emqx, boot_modules, []),
+ emqx_conf_cli:load(),
+ Config.
+
+end_per_suite(_) ->
+ emqx_mgmt_api_test_util:end_suite([emqx_audit, emqx_conf, emqx_ctl]).
+
+t_http_api(_) ->
+ process_flag(trap_exit, true),
+ AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
+ AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+ {ok, Zones} = emqx_mgmt_api_configs_SUITE:get_global_zone(),
+ NewZones = emqx_utils_maps:deep_put([<<"mqtt">>, <<"max_qos_allowed">>], Zones, 1),
+ {ok, #{<<"mqtt">> := Res}} = emqx_mgmt_api_configs_SUITE:update_global_zone(NewZones),
+ ?assertMatch(#{<<"max_qos_allowed">> := 1}, Res),
+ {ok, Res1} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader),
+ ?assertMatch(
+ #{
+ <<"data">> := [
+ #{
+ <<"from">> := <<"rest_api">>,
+ <<"operation_id">> := <<"/configs/global_zone">>,
+ <<"source_ip">> := <<"127.0.0.1">>,
+ <<"source">> := _,
+ <<"http_request">> := #{
+ <<"method">> := <<"put">>,
+ <<"body">> := #{<<"mqtt">> := #{<<"max_qos_allowed">> := 1}},
+ <<"bindings">> := _,
+ <<"headers">> := #{<<"authorization">> := <<"******">>}
+ },
+ <<"http_status_code">> := 200,
+ <<"operation_result">> := <<"success">>,
+ <<"operation_type">> := <<"configs">>
+ }
+ ]
+ },
+ emqx_utils_json:decode(Res1, [return_maps])
+ ),
+ ok.
+
+t_cli(_Config) ->
+ ok = emqx_ctl:run_command(["conf", "show", "log"]),
+ AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
+ AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+ {ok, Res} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader),
+ #{<<"data">> := Data} = emqx_utils_json:decode(Res, [return_maps]),
+ ?assertMatch(
+ [
+ #{
+ <<"from">> := <<"cli">>,
+ <<"operation_id">> := <<"">>,
+ <<"source_ip">> := <<"">>,
+ <<"operation_type">> := <<"conf">>,
+ <<"args">> := [<<"show">>, <<"log">>],
+ <<"node">> := _,
+ <<"source">> := <<"">>,
+ <<"http_request">> := <<"">>
+ }
+ ],
+ Data
+ ),
+
+ %% check filter
+ {ok, Res1} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "from=cli", AuthHeader),
+ #{<<"data">> := Data1} = emqx_utils_json:decode(Res1, [return_maps]),
+ ?assertEqual(Data, Data1),
+ {ok, Res2} = emqx_mgmt_api_test_util:request_api(
+ get, AuditPath, "from=erlang_console", AuthHeader
+ ),
+ ?assertMatch(#{<<"data">> := []}, emqx_utils_json:decode(Res2, [return_maps])),
+ ok.
+
+t_kickout_clients_without_log(_) ->
+ process_flag(trap_exit, true),
+ AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
+ {ok, AuditLogs1} = emqx_mgmt_api_test_util:request_api(get, AuditPath),
+ kickout_clients(),
+ {ok, AuditLogs2} = emqx_mgmt_api_test_util:request_api(get, AuditPath),
+ ?assertEqual(AuditLogs1, AuditLogs2),
+ ok.
+
+kickout_clients() ->
+ ClientId1 = <<"client1">>,
+ ClientId2 = <<"client2">>,
+ ClientId3 = <<"client3">>,
+
+ {ok, C1} = emqtt:start_link(#{
+ clientid => ClientId1,
+ proto_ver => v5,
+ properties => #{'Session-Expiry-Interval' => 120}
+ }),
+ {ok, _} = emqtt:connect(C1),
+ {ok, C2} = emqtt:start_link(#{clientid => ClientId2}),
+ {ok, _} = emqtt:connect(C2),
+ {ok, C3} = emqtt:start_link(#{clientid => ClientId3}),
+ {ok, _} = emqtt:connect(C3),
+
+ timer:sleep(300),
+
+ %% get /clients
+ ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]),
+ {ok, Clients} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
+ ClientsResponse = emqx_utils_json:decode(Clients, [return_maps]),
+ ClientsMeta = maps:get(<<"meta">>, ClientsResponse),
+ ClientsPage = maps:get(<<"page">>, ClientsMeta),
+ ClientsLimit = maps:get(<<"limit">>, ClientsMeta),
+ ClientsCount = maps:get(<<"count">>, ClientsMeta),
+ ?assertEqual(ClientsPage, 1),
+ ?assertEqual(ClientsLimit, emqx_mgmt:default_row_limit()),
+ ?assertEqual(ClientsCount, 3),
+
+ %% kickout clients
+ KickoutPath = emqx_mgmt_api_test_util:api_path(["clients", "kickout", "bulk"]),
+ KickoutBody = [ClientId1, ClientId2, ClientId3],
+ {ok, 204, _} = emqx_mgmt_api_test_util:request_api_with_body(post, KickoutPath, KickoutBody),
+
+ {ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
+ ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]),
+ ?assertMatch(#{<<"meta">> := #{<<"count">> := 0}}, ClientsResponse2).
diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl
index ddabdae95..b47d1f961 100644
--- a/apps/emqx_conf/src/emqx_conf_cli.erl
+++ b/apps/emqx_conf/src/emqx_conf_cli.erl
@@ -108,15 +108,14 @@ admins(_) ->
emqx_ctl:usage(usage_sync()).
audit(Level, From, Log) ->
- Log1 = redact(Log#{time => logger:timestamp()}),
- ?AUDIT(Level, From, Log1).
+ ?AUDIT(Level, redact(Log#{from => From})).
-redact(Logs = #{cmd := admins, args := ["add", Username, _Password | Rest]}) ->
- Logs#{args => ["add", Username, "******" | Rest]};
-redact(Logs = #{cmd := admins, args := ["passwd", Username, _Password]}) ->
- Logs#{args => ["passwd", Username, "******"]};
-redact(Logs = #{cmd := license, args := ["update", _License]}) ->
- Logs#{args => ["update", "******"]};
+redact(Logs = #{cmd := admins, args := [<<"add">>, Username, _Password | Rest]}) ->
+ Logs#{args => [<<"add">>, Username, <<"******">> | Rest]};
+redact(Logs = #{cmd := admins, args := [<<"passwd">>, Username, _Password]}) ->
+ Logs#{args => [<<"passwd">>, Username, <<"******">>]};
+redact(Logs = #{cmd := license, args := [<<"update">>, _License]}) ->
+ Logs#{args => [<<"update">>, "******"]};
redact(Logs) ->
Logs.
diff --git a/apps/emqx_dashboard/src/emqx_dashboard.erl b/apps/emqx_dashboard/src/emqx_dashboard.erl
index 96ff3e167..cf4330e34 100644
--- a/apps/emqx_dashboard/src/emqx_dashboard.erl
+++ b/apps/emqx_dashboard/src/emqx_dashboard.erl
@@ -72,7 +72,7 @@ start_listeners(Listeners) ->
base_path => emqx_dashboard_swagger:base_path(),
modules => minirest_api:find_api_modules(apps()),
authorization => Authorization,
- log => fun emqx_dashboard_audit:log/1,
+ log => fun emqx_dashboard_audit:log/2,
security => [#{'basicAuth' => []}, #{'bearerAuth' => []}],
swagger_global_spec => GlobalSpec,
dispatch => dispatch(),
@@ -222,7 +222,7 @@ authorize(Req) ->
{bearer, Token} ->
case emqx_dashboard_admin:verify_token(Req, Token) of
{ok, Username} ->
- {ok, #{auth_type => jwt_token, username => Username}};
+ {ok, #{auth_type => jwt_token, source => Username}};
{error, token_timeout} ->
{401, 'TOKEN_TIME_OUT', <<"Token expired, get new token by POST /login">>};
{error, not_found} ->
@@ -253,15 +253,14 @@ api_key_authorize(Req, Key, Secret) ->
Path = cowboy_req:path(Req),
case emqx_mgmt_auth:authorize(Path, Req, Key, Secret) of
ok ->
- {ok, #{auth_type => api_key, api_key => Key}};
+ {ok, #{auth_type => api_key, source => Key}};
{error, <<"not_allowed">>} ->
return_unauthorized(
?BAD_API_KEY_OR_SECRET,
<<"Not allowed, Check api_key/api_secret">>
);
{error, unauthorized_role} ->
- {403, 'UNAUTHORIZED_ROLE',
- <<"This API Key don't have permission to access this resource">>};
+ {403, 'UNAUTHORIZED_ROLE', ?API_KEY_NOT_ALLOW_MSG};
{error, _} ->
return_unauthorized(
?BAD_API_KEY_OR_SECRET,
diff --git a/apps/emqx_dashboard/src/emqx_dashboard_audit.erl b/apps/emqx_dashboard/src/emqx_dashboard_audit.erl
index cb5c0f42b..c2ef1a99f 100644
--- a/apps/emqx_dashboard/src/emqx_dashboard_audit.erl
+++ b/apps/emqx_dashboard/src/emqx_dashboard_audit.erl
@@ -17,30 +17,101 @@
-module(emqx_dashboard_audit).
-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/http_api.hrl").
%% API
--export([log/1]).
+-export([log/2]).
-log(Meta0) ->
- #{req_start := ReqStart, req_end := ReqEnd, code := Code, method := Method} = Meta0,
- Duration = erlang:convert_time_unit(ReqEnd - ReqStart, native, millisecond),
- Level = level(Method, Code, Duration),
- Username = maps:get(username, Meta0, <<"">>),
- From = from(maps:get(auth_type, Meta0, "")),
- Meta1 = maps:without([req_start, req_end], Meta0),
- Meta2 = Meta1#{time => logger:timestamp(), duration_ms => Duration},
- Meta = emqx_utils:redact(Meta2),
- ?AUDIT(
- Level,
- From,
- Meta#{username => binary_to_list(Username), node => node()}
- ),
- ok.
+%% filter high frequency events
+-define(HIGH_FREQUENCY_REQUESTS, [
+ <<"/publish">>,
+ <<"/clients/:clientid/subscribe">>,
+ <<"/clients/:clientid/unsubscribe">>,
+ <<"/publish/bulk">>,
+ <<"/clients/:clientid/unsubscribe/bulk">>,
+ <<"/clients/:clientid/subscribe/bulk">>,
+ <<"/clients/kickout/bulk">>
+]).
-from(jwt_token) -> "dashboard";
-from(_) -> "rest_api".
+log(#{code := Code, method := Method} = Meta, Req) ->
+ %% Keep level/2 and log_meta/1 inside of this ?AUDIT macro
+ ?AUDIT(level(Method, Code), log_meta(Meta, Req)).
-level(get, _Code, _) -> debug;
-level(_, Code, _) when Code >= 200 andalso Code < 300 -> info;
-level(_, Code, _) when Code >= 300 andalso Code < 400 -> warning;
-level(_, Code, _) when Code >= 400 andalso Code < 500 -> error;
-level(_, _, _) -> critical.
+log_meta(Meta, Req) ->
+ #{operation_id := OperationId} = Meta,
+ case
+ lists:member(OperationId, ?HIGH_FREQUENCY_REQUESTS) andalso
+ ignore_high_frequency_request()
+ of
+ true ->
+ undefined;
+ false ->
+ Meta1 = #{
+ time => logger:timestamp(),
+ from => from(Meta),
+ source => source(Meta),
+ duration_ms => duration_ms(Meta),
+ source_ip => source_ip(Req),
+ operation_type => operation_type(Meta),
+ %% method for http filter api.
+ http_method => maps:get(method, Meta),
+ http_request => http_request(Meta),
+ http_status_code => maps:get(code, Meta),
+ operation_result => operation_result(Meta),
+ node => node()
+ },
+ Meta2 = maps:without([req_start, req_end, method, headers, body, bindings, code], Meta),
+ emqx_utils:redact(maps:merge(Meta2, Meta1))
+ end.
+
+duration_ms(#{req_start := ReqStart, req_end := ReqEnd}) ->
+ erlang:convert_time_unit(ReqEnd - ReqStart, native, millisecond).
+
+from(#{auth_type := jwt_token}) ->
+ dashboard;
+from(#{auth_type := api_key}) ->
+ rest_api;
+from(#{operation_id := <<"/login">>}) ->
+ dashboard;
+from(#{code := Code} = Meta) when Code =:= 401 orelse Code =:= 403 ->
+ case maps:find(failure, Meta) of
+ {ok, #{code := 'BAD_API_KEY_OR_SECRET'}} -> rest_api;
+ {ok, #{code := 'UNAUTHORIZED_ROLE', message := ?API_KEY_NOT_ALLOW_MSG}} -> rest_api;
+ %% 'TOKEN_TIME_OUT' 'BAD_TOKEN' is dashboard code.
+ _ -> dashboard
+ end.
+
+source(#{source := Source}) -> Source;
+source(#{operation_id := <<"/login">>, body := #{<<"username">> := Username}}) -> Username;
+source(_Meta) -> <<"">>.
+
+source_ip(Req) ->
+ case cowboy_req:header(<<"x-forwarded-for">>, Req, undefined) of
+ undefined ->
+ {RemoteIP, _} = cowboy_req:peer(Req),
+ iolist_to_binary(inet:ntoa(RemoteIP));
+ Addresses ->
+ hd(binary:split(Addresses, <<",">>))
+ end.
+
+operation_type(Meta) ->
+ case maps:find(operation_id, Meta) of
+ {ok, OperationId} ->
+ lists:nth(2, binary:split(OperationId, <<"/">>, [global]));
+ _ ->
+ <<"unknown">>
+ end.
+
+http_request(Meta) ->
+ maps:with([method, headers, bindings, body], Meta).
+
+operation_result(#{failure := _}) -> failure;
+operation_result(_) -> success.
+
+level(get, _Code) -> debug;
+level(_, Code) when Code >= 200 andalso Code < 300 -> info;
+level(_, Code) when Code >= 300 andalso Code < 400 -> warning;
+level(_, Code) when Code >= 400 andalso Code < 500 -> error;
+level(_, _) -> critical.
+
+ignore_high_frequency_request() ->
+ emqx_conf:get([log, audit, ignore_high_frequency_request], true).
diff --git a/apps/emqx_enterprise/src/emqx_enterprise.app.src b/apps/emqx_enterprise/src/emqx_enterprise.app.src
index 37d31c5ec..1a5359db6 100644
--- a/apps/emqx_enterprise/src/emqx_enterprise.app.src
+++ b/apps/emqx_enterprise/src/emqx_enterprise.app.src
@@ -1,6 +1,6 @@
{application, emqx_enterprise, [
{description, "EMQX Enterprise Edition"},
- {vsn, "0.1.3"},
+ {vsn, "0.1.4"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_enterprise/src/emqx_enterprise_schema.erl b/apps/emqx_enterprise/src/emqx_enterprise_schema.erl
index 0cf5850c8..5537c3259 100644
--- a/apps/emqx_enterprise/src/emqx_enterprise_schema.erl
+++ b/apps/emqx_enterprise/src/emqx_enterprise_schema.erl
@@ -78,6 +78,24 @@ fields("log_audit_handler") ->
desc => ?DESC(emqx_conf_schema, "log_file_handler_max_size"),
importance => ?IMPORTANCE_MEDIUM
}
+ )},
+ {"max_filter_size",
+ hoconsc:mk(
+ range(10, 30000),
+ #{
+ default => 5000,
+ desc => ?DESC(emqx_conf_schema, "audit_log_max_filter_limit"),
+ importance => ?IMPORTANCE_MEDIUM
+ }
+ )},
+ {"ignore_high_frequency_request",
+ hoconsc:mk(
+ boolean(),
+ #{
+ default => true,
+ desc => ?DESC(emqx_conf_schema, "audit_log_ignore_high_frequency_request"),
+ importance => ?IMPORTANCE_MEDIUM
+ }
)}
] ++ CommonConfs1;
fields(Name) ->
diff --git a/apps/emqx_enterprise/test/emqx_enterprise_schema_SUITE.erl b/apps/emqx_enterprise/test/emqx_enterprise_schema_SUITE.erl
index e2aece927..bf1f358ea 100644
--- a/apps/emqx_enterprise/test/emqx_enterprise_schema_SUITE.erl
+++ b/apps/emqx_enterprise/test/emqx_enterprise_schema_SUITE.erl
@@ -95,6 +95,8 @@ t_audit_log_conf(_Config) ->
<<"enable">> => false,
<<"level">> => <<"info">>,
<<"path">> => <<"log/audit.log">>,
+ <<"ignore_high_frequency_request">> => true,
+ <<"max_filter_size">> => 5000,
<<"rotation_count">> => 10,
<<"rotation_size">> => <<"50MB">>,
<<"time_offset">> => <<"system">>
diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl
index b698446b9..380ccfa6d 100644
--- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl
+++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl
@@ -33,7 +33,7 @@
]
).
-%% minirest/dashbaord_swagger behaviour callbacks
+%% minirest/dashboard_swagger behaviour callbacks
-export([
api_spec/0,
paths/0,
diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm
index 16f901d27..5ea6eee70 100644
--- a/apps/emqx_machine/priv/reboot_lists.eterm
+++ b/apps/emqx_machine/priv/reboot_lists.eterm
@@ -124,7 +124,8 @@
emqx_ft,
emqx_gcp_device,
emqx_dashboard_rbac,
- emqx_dashboard_sso
+ emqx_dashboard_sso,
+ emqx_audit
],
%% must always be of type `load'
ce_business_apps =>
diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl
index 0d847376e..aa6180e23 100644
--- a/apps/emqx_machine/src/emqx_machine_boot.erl
+++ b/apps/emqx_machine/src/emqx_machine_boot.erl
@@ -47,7 +47,10 @@ post_boot() ->
ok = ensure_apps_started(),
ok = print_vsn(),
ok = start_autocluster(),
- ?AUDIT(alert, cli, #{time => logger:timestamp(), event => "emqx_start"}),
+ ?AUDIT(alert, #{
+ event => "emqx_start",
+ from => event
+ }),
ignore.
-ifdef(TEST).
diff --git a/apps/emqx_machine/src/emqx_machine_terminator.erl b/apps/emqx_machine/src/emqx_machine_terminator.erl
index 4757507b5..d43d5fea9 100644
--- a/apps/emqx_machine/src/emqx_machine_terminator.erl
+++ b/apps/emqx_machine/src/emqx_machine_terminator.erl
@@ -67,9 +67,9 @@ graceful() ->
%% @doc Shutdown the Erlang VM and wait indefinitely.
graceful_wait() ->
- ?AUDIT(alert, cli, #{
- time => logger:timestamp(),
- event => emqx_gracefully_stop
+ ?AUDIT(alert, #{
+ event => "emqx_gracefully_stop",
+ from => event
}),
ok = graceful(),
exit_loop().
diff --git a/apps/emqx_machine/src/emqx_restricted_shell.erl b/apps/emqx_machine/src/emqx_restricted_shell.erl
index a582a3cb8..4ddc913c0 100644
--- a/apps/emqx_machine/src/emqx_restricted_shell.erl
+++ b/apps/emqx_machine/src/emqx_restricted_shell.erl
@@ -112,11 +112,11 @@ max_heap_size_warning(MF, Args) ->
log(_, {?MODULE, prompt_func}, [[{history, _}]]) ->
ok;
log(IsAllow, MF, Args) ->
- ?AUDIT(warning, erlang_console, #{
- time => logger:timestamp(),
+ ?AUDIT(warning, #{
function => MF,
args => pp_args(Args),
- permission => IsAllow
+ permission => IsAllow,
+ from => erlang_console
}),
to_console(IsAllow, MF, Args).
diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl
index 0acffbe4a..dffa39ae5 100644
--- a/apps/emqx_management/src/emqx_mgmt_api.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api.erl
@@ -341,11 +341,11 @@ do_select(
try
case maps:get(continuation, QueryState, undefined) of
undefined ->
- ets:select(Tab, Ms, Limit);
+ ets:select_reverse(Tab, Ms, Limit);
Continuation ->
%% XXX: Repair is necessary because we pass Continuation back
%% and forth through the nodes in the `do_cluster_query`
- ets:select(ets:repair_continuation(Continuation, Ms))
+ ets:select_reverse(ets:repair_continuation(Continuation, Ms))
end
catch
exit:_ = Exit ->
diff --git a/changes/ee/feat-11773.en.md b/changes/ee/feat-11773.en.md
new file mode 100644
index 000000000..fcc96b1e6
--- /dev/null
+++ b/changes/ee/feat-11773.en.md
@@ -0,0 +1 @@
+Support audit log filter via dashboard (http api).
diff --git a/mix.exs b/mix.exs
index 3817b5121..ed869c414 100644
--- a/mix.exs
+++ b/mix.exs
@@ -58,7 +58,7 @@ defmodule EMQXUmbrella.MixProject do
{:ekka, github: "emqx/ekka", tag: "0.15.16", override: true},
{:gen_rpc, github: "emqx/gen_rpc", tag: "3.2.0", override: true},
{:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true},
- {:minirest, github: "emqx/minirest", tag: "1.3.13", override: true},
+ {:minirest, github: "emqx/minirest", tag: "1.3.14", override: true},
{:ecpool, github: "emqx/ecpool", tag: "0.5.4", override: true},
{:replayq, github: "emqx/replayq", tag: "0.3.7", override: true},
{:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
@@ -214,7 +214,8 @@ defmodule EMQXUmbrella.MixProject do
:emqx_bridge_azure_event_hub,
:emqx_gcp_device,
:emqx_dashboard_rbac,
- :emqx_dashboard_sso
+ :emqx_dashboard_sso,
+ :emqx_audit
])
end
@@ -329,6 +330,8 @@ defmodule EMQXUmbrella.MixProject do
:emqx_gateway_lwm2m,
:emqx_gateway_exproto,
:emqx_dashboard,
+ :emqx_dashboard_sso,
+ :emqx_audit,
:emqx_resource,
:emqx_connector,
:emqx_exhook,
diff --git a/rebar.config b/rebar.config
index e2e1a7cf0..75dc98556 100644
--- a/rebar.config
+++ b/rebar.config
@@ -65,7 +65,7 @@
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.16"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.2.0"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}}
- , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.13"}}}
+ , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.14"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}}
, {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
@@ -102,6 +102,7 @@
{emqx_schema_parser,decode,3},
{emqx_schema_parser,encode,3},
{emqx_schema_registry,add_schema,1},
+ {emqx_audit, log, 2},
emqx_exhook_pb, % generated code for protobuf
emqx_exproto_pb % generated code for protobuf
]}.
diff --git a/rebar.config.erl b/rebar.config.erl
index dd9bd1b04..d9277d7e8 100644
--- a/rebar.config.erl
+++ b/rebar.config.erl
@@ -110,6 +110,7 @@ is_community_umbrella_app("apps/emqx_bridge_azure_event_hub") -> false;
is_community_umbrella_app("apps/emqx_gcp_device") -> false;
is_community_umbrella_app("apps/emqx_dashboard_rbac") -> false;
is_community_umbrella_app("apps/emqx_dashboard_sso") -> false;
+is_community_umbrella_app("apps/emqx_audit") -> false;
is_community_umbrella_app(_) -> true.
is_jq_supported() ->
diff --git a/rel/i18n/emqx_audit_api.hocon b/rel/i18n/emqx_audit_api.hocon
new file mode 100644
index 000000000..37080838b
--- /dev/null
+++ b/rel/i18n/emqx_audit_api.hocon
@@ -0,0 +1,58 @@
+emqx_audit_api {
+
+audit_get.desc:
+"""Get audit logs based on filter API, empowers users to efficiently
+access the desired audit trail data and facilitates auditing, compliance,
+troubleshooting, and security analysis"""
+
+audit_get.label:
+"List audit logs"
+
+filter_node.desc:
+"Filter logs based on the node name to which the logs are created."
+
+filter_from.desc:
+""""Filter logs based on source type, valid values include:
+`dashboard`: Dashboard request logs, requiring the use of a jwt_token.
+`rest_api`: API KEY request logs.
+`cli`: The emqx command line logs.
+`erlang_console`: The emqx remote_console run function logs.
+`event`: Logs related to events such as emqx_start, emqx_gracefully_stop, audit_enabled, and audit_disabled."""
+
+filter_source.desc:
+""""Filter logs based on source, Possible values are:
+The login username when logs are generated from the dashboard.
+The API Keys when logs are generated from the REST API.
+empty string when logs are generated from CLI, Erlang console, or an event."""
+
+filter_source_ip.desc:
+"Filter logs based on source ip when logs are generated from dashboard and REST API."
+
+filter_operation_id.desc:
+"Filter log with swagger's operation_id when logs are generated from dashboard and REST API."
+
+filter_operation_type.desc:
+"Filter logs with operation type."
+
+filter_operation_result.desc:
+"Filter logs with operation result."
+
+filter_http_status_code.desc:
+"Filter The HTTP API with response code when logs are generated from dashboard and REST API."
+
+filter_http_method.desc:
+"Filter The HTTP API Request with method when logs are generated from dashboard and REST API."
+
+filter_gte_duration_ms.desc:
+"Filter logs with a duration greater than or equal to given microseconds."
+
+filter_lte_duration_ms.desc:
+"Filter logs with a duration less than or equal to given microseconds."
+
+filter_gte_created_at.desc:
+"Filter logs with a creation time greater than or equal to the given timestamp, rfc3339 or timestamp(millisecond)"
+
+filter_lte_created_at.desc:
+"Filter logs with a creation time less than or equal to the given timestamp, rfc3339 or timestamp(millisecond)"
+
+}
diff --git a/rel/i18n/emqx_conf_schema.hocon b/rel/i18n/emqx_conf_schema.hocon
index fde0f7ff3..ff2c3109a 100644
--- a/rel/i18n/emqx_conf_schema.hocon
+++ b/rel/i18n/emqx_conf_schema.hocon
@@ -725,6 +725,19 @@ audit_handler_level.desc:
audit_handler_level.label:
"""Log Level"""
+audit_log_max_filter_limit.desc:
+"""Store the latest N log entries in a database for allow `/audit` HTTP API to filter and retrieval of log data."""
+
+audit_log_max_filter_limit.label:
+"""Max Filter Limit"""
+
+audit_log_ignore_high_frequency_request.desc:
+"""Ignore high frequency requests to avoid flooding the audit log,
+such as publish/subscribe kick out http api requests are ignored."""
+
+audit_log_ignore_high_frequency_request.label:
+"""Ignore High Frequency Request"""
+
desc_rpc.desc:
"""EMQX uses a library called gen_rpc
for inter-broker communication.
Most of the time the default config should work,