diff --git a/apps/emqx/include/logger.hrl b/apps/emqx/include/logger.hrl index d803f67be..58ebbbf1f 100644 --- a/apps/emqx/include/logger.hrl +++ b/apps/emqx/include/logger.hrl @@ -61,25 +61,28 @@ ) end). --define(AUDIT(_Level_, _From_, _Meta_), begin +-define(AUDIT(_LevelFun_, _MetaFun_), begin case emqx_config:get([log, audit], #{enable => false}) of #{enable := false} -> ok; #{enable := true, level := _AllowLevel_} -> + _Level_ = _LevelFun_, case logger:compare_levels(_AllowLevel_, _Level_) of _R_ when _R_ == lt; _R_ == eq -> - emqx_trace:log( - _Level_, - [{emqx_audit, fun(L, _) -> L end, undefined, undefined}], - _Msg = undefined, - _Meta_#{from => _From_} - ); + ?LOG_AUDIT_EVENT(_Level_, _MetaFun_); gt -> ok end end end). +-define(LOG_AUDIT_EVENT(Level, M), begin + M1 = (M)#{time => logger:timestamp(), level => Level}, + Filter = [{emqx_audit, fun(L, _) -> L end, undefined, undefined}], + emqx_trace:log(Level, Filter, undefined, M1), + emqx_audit:log(M1) +end). + %% print to 'user' group leader -define(ULOG(Fmt, Args), io:format(user, Fmt, Args)). -define(ELOG(Fmt, Args), io:format(standard_error, Fmt, Args)). diff --git a/apps/emqx/src/config/emqx_config_logger.erl b/apps/emqx/src/config/emqx_config_logger.erl index c675edb52..57502bba8 100644 --- a/apps/emqx/src/config/emqx_config_logger.erl +++ b/apps/emqx/src/config/emqx_config_logger.erl @@ -23,6 +23,8 @@ -export([post_config_update/5]). -export([filter_audit/2]). +-include("logger.hrl"). + -define(LOG, [log]). -define(AUDIT_HANDLER, emqx_audit). @@ -96,6 +98,7 @@ update_log_handlers(NewHandlers) -> ok. update_log_handler({removed, Id}) -> + audit("audit_disabled", Id), log_to_console("Config override: ~s is removed~n", [id_for_log(Id)]), logger:remove_handler(Id); update_log_handler({Action, {handler, Id, Mod, Conf}}) -> @@ -104,6 +107,7 @@ update_log_handler({Action, {handler, Id, Mod, Conf}}) -> _ = logger:remove_handler(Id), case logger:add_handler(Id, Mod, Conf) of ok -> + audit("audit_enabled", Id), ok; %% Don't crash here, otherwise the cluster rpc will retry the wrong handler forever. {error, Reason} -> @@ -114,6 +118,11 @@ update_log_handler({Action, {handler, Id, Mod, Conf}}) -> end, ok. +audit(Event, ?AUDIT_HANDLER) -> + ?LOG_AUDIT_EVENT(alert, #{event => Event, from => event}); +audit(_, _) -> + ok. + id_for_log(console) -> "log.console"; id_for_log(Other) -> "log.file." ++ atom_to_list(Other). diff --git a/apps/emqx_audit/README.md b/apps/emqx_audit/README.md new file mode 100644 index 000000000..48c625ed5 --- /dev/null +++ b/apps/emqx_audit/README.md @@ -0,0 +1,5 @@ +emqx_audit +===== + +Audit log for EMQX, empowers users to efficiently access the desired audit trail data +and facilitates auditing, compliance, troubleshooting, and security analysis. diff --git a/apps/emqx_audit/include/emqx_audit.hrl b/apps/emqx_audit/include/emqx_audit.hrl new file mode 100644 index 000000000..59536def9 --- /dev/null +++ b/apps/emqx_audit/include/emqx_audit.hrl @@ -0,0 +1,39 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-define(AUDIT, emqx_audit). + +-record(?AUDIT, { + seq, + %% basic info + created_at, + node, + from, + source, + source_ip, + %% operation info + operation_id, + operation_type, + args, + operation_result, + failure, + %% request detail + http_method, + http_request, + http_status_code, + duration_ms, + extra +}). diff --git a/apps/emqx_audit/rebar.config b/apps/emqx_audit/rebar.config new file mode 100644 index 000000000..2656fd554 --- /dev/null +++ b/apps/emqx_audit/rebar.config @@ -0,0 +1,2 @@ +{erl_opts, [debug_info]}. +{deps, []}. diff --git a/apps/emqx_audit/src/emqx_audit.app.src b/apps/emqx_audit/src/emqx_audit.app.src new file mode 100644 index 000000000..96cdd11ce --- /dev/null +++ b/apps/emqx_audit/src/emqx_audit.app.src @@ -0,0 +1,10 @@ +{application, emqx_audit, [ + {description, "Audit log for EMQX"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {emqx_audit_app, []}}, + {applications, [kernel, stdlib, emqx]}, + {env, []}, + {modules, []}, + {links, []} +]}. diff --git a/apps/emqx_audit/src/emqx_audit.erl b/apps/emqx_audit/src/emqx_audit.erl new file mode 100644 index 000000000..4b96f00b8 --- /dev/null +++ b/apps/emqx_audit/src/emqx_audit.erl @@ -0,0 +1,202 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_audit). + +%% API +-export([]). + +-behaviour(gen_server). +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include("emqx_audit.hrl"). + +%% API +-export([start_link/1]). +-export([log/1]). + +%% gen_server callbacks +-export([ + init/1, + handle_continue/2, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +-define(FILTER_REQ, [cert, host_info, has_sent_resp, pid, path_info, peer, ref, sock, streamid]). +-define(CLEAN_EXPIRED_MS, 60 * 1000). + +to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) -> + #?AUDIT{ + created_at = erlang:system_time(microsecond), + node = node(), + operation_id = <<"">>, + operation_type = atom_to_binary(Cmd), + args = Args, + operation_result = <<"">>, + failure = <<"">>, + duration_ms = DurationMs, + from = cli, + source = <<"">>, + source_ip = <<"">>, + http_status_code = <<"">>, + http_method = <<"">>, + http_request = <<"">> + }; +to_audit(#{http_method := get}) -> + ok; +to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api -> + #{ + source := Source, + source_ip := SourceIp, + %% operation info + operation_id := OperationId, + operation_type := OperationType, + operation_result := OperationResult, + %% request detail + http_status_code := StatusCode, + http_method := Method, + http_request := Request, + duration_ms := DurationMs + } = Log, + #?AUDIT{ + created_at = erlang:system_time(microsecond), + node = node(), + from = From, + source = Source, + source_ip = SourceIp, + %% operation info + operation_id = OperationId, + operation_type = OperationType, + operation_result = OperationResult, + failure = maps:get(failure, Log, <<"">>), + %% request detail + http_status_code = StatusCode, + http_method = Method, + http_request = Request, + duration_ms = DurationMs, + args = <<"">> + }; +to_audit(#{from := event, event := Event}) -> + #?AUDIT{ + created_at = erlang:system_time(microsecond), + node = node(), + from = event, + source = <<"">>, + source_ip = <<"">>, + %% operation info + operation_id = iolist_to_binary(Event), + operation_type = <<"">>, + operation_result = <<"">>, + failure = <<"">>, + %% request detail + http_status_code = <<"">>, + http_method = <<"">>, + http_request = <<"">>, + duration_ms = 0, + args = <<"">> + }; +to_audit(#{from := erlang_console, function := F, args := Args}) -> + #?AUDIT{ + created_at = erlang:system_time(microsecond), + node = node(), + from = erlang_console, + source = <<"">>, + source_ip = <<"">>, + %% operation info + operation_id = <<"">>, + operation_type = <<"">>, + operation_result = <<"">>, + failure = <<"">>, + %% request detail + http_status_code = <<"">>, + http_method = <<"">>, + http_request = <<"">>, + duration_ms = 0, + args = iolist_to_binary(io_lib:format("~p: ~p~n", [F, Args])) + }. + +log(Log) -> + gen_server:cast(?MODULE, {write, to_audit(Log)}). + +start_link(Config) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Config], []). + +init([Config]) -> + erlang:process_flag(trap_exit, true), + ok = mria:create_table(?AUDIT, [ + {type, ordered_set}, + {rlog_shard, ?COMMON_SHARD}, + {storage, disc_copies}, + {record_name, ?AUDIT}, + {attributes, record_info(fields, ?AUDIT)} + ]), + {ok, Config, {continue, setup}}. + +handle_continue(setup, #{max_size := MaxSize} = State) -> + ok = mria:wait_for_tables([?AUDIT]), + LatestId = latest_id(), + clean_expired(LatestId, MaxSize), + {noreply, State#{latest_id => LatestId}}. + +handle_call(_Request, _From, State = #{}) -> + {reply, ok, State}. + +handle_cast({write, Log}, State = #{latest_id := LatestId}) -> + NewSeq = LatestId + 1, + Audit = Log#?AUDIT{seq = NewSeq}, + mnesia:dirty_write(?AUDIT, Audit), + {noreply, State#{latest_id => NewSeq}, ?CLEAN_EXPIRED_MS}; +handle_cast(_Request, State = #{}) -> + {noreply, State}. + +handle_info(timeout, State = #{max_size := MaxSize, latest_id := LatestId}) -> + clean_expired(LatestId, MaxSize), + {noreply, State#{latest_id => latest_id()}, hibernate}; +handle_info(_Info, State = #{}) -> + {noreply, State}. + +terminate(_Reason, _State = #{}) -> + ok. + +code_change(_OldVsn, State = #{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +clean_expired(LatestId, MaxSize) -> + Min = LatestId - MaxSize, + %% MS = ets:fun2ms(fun(#?AUDIT{seq = Seq}) when Seq =< Min -> true end), + MS = [{#?AUDIT{seq = '$1', _ = '_'}, [{'=<', '$1', Min}], [true]}], + NumDeleted = mnesia:ets(fun ets:select_delete/2, [?AUDIT, MS]), + ?SLOG(debug, #{ + msg => "clean_audit_log", + latest_id => LatestId, + min => Min, + deleted_number => NumDeleted + }), + ok. + +latest_id() -> + case mnesia:dirty_last(?AUDIT) of + '$end_of_table' -> 0; + Seq -> Seq + end. diff --git a/apps/emqx_audit/src/emqx_audit_api.erl b/apps/emqx_audit/src/emqx_audit_api.erl new file mode 100644 index 000000000..f69d5d909 --- /dev/null +++ b/apps/emqx_audit/src/emqx_audit_api.erl @@ -0,0 +1,397 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_audit_api). + +-behaviour(minirest_api). + +%% API +-export([api_spec/0, paths/0, schema/1, namespace/0, fields/1]). +-export([audit/2]). +-export([qs2ms/2, format/1]). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include("emqx_audit.hrl"). + +-import(hoconsc, [mk/2, ref/2, array/1]). + +-define(TAGS, ["Audit"]). + +-define(AUDIT_QS_SCHEMA, [ + {<<"node">>, atom}, + {<<"from">>, atom}, + {<<"source">>, binary}, + {<<"source_ip">>, binary}, + {<<"operation_id">>, binary}, + {<<"operation_type">>, binary}, + {<<"operation_result">>, atom}, + {<<"http_status_code">>, integer}, + {<<"http_method">>, atom}, + {<<"gte_created_at">>, timestamp}, + {<<"lte_created_at">>, timestamp}, + {<<"gte_duration_ms">>, timestamp}, + {<<"lte_duration_ms">>, timestamp} +]). + +namespace() -> "audit". + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). + +paths() -> + ["/audit"]. + +schema("/audit") -> + #{ + 'operationId' => audit, + get => #{ + tags => ?TAGS, + description => ?DESC(audit_get), + parameters => [ + {node, + ?HOCON(binary(), #{ + in => query, + required => false, + example => <<"emqx@127.0.0.1">>, + desc => ?DESC(filter_node) + })}, + {from, + ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console, event]), #{ + in => query, + required => false, + example => <<"dashboard">>, + desc => ?DESC(filter_from) + })}, + {source, + ?HOCON(binary(), #{ + in => query, + required => false, + example => <<"admin">>, + desc => ?DESC(filter_source) + })}, + {source_ip, + ?HOCON(binary(), #{ + in => query, + required => false, + example => <<"127.0.0.1">>, + desc => ?DESC(filter_source_ip) + })}, + {operation_id, + ?HOCON(binary(), #{ + in => query, + required => false, + example => <<"/rules/{id}">>, + desc => ?DESC(filter_operation_id) + })}, + {operation_type, + ?HOCON(binary(), #{ + in => query, + example => <<"rules">>, + required => false, + desc => ?DESC(filter_operation_type) + })}, + {operation_result, + ?HOCON(?ENUM([success, failure]), #{ + in => query, + example => failure, + required => false, + desc => ?DESC(filter_operation_result) + })}, + {http_status_code, + ?HOCON(integer(), #{ + in => query, + example => 200, + required => false, + desc => ?DESC(filter_http_status_code) + })}, + {http_method, + ?HOCON(?ENUM([post, put, delete]), #{ + in => query, + example => post, + required => false, + desc => ?DESC(filter_http_method) + })}, + {gte_duration_ms, + ?HOCON(integer(), #{ + in => query, + example => 0, + required => false, + desc => ?DESC(filter_gte_duration_ms) + })}, + {lte_duration_ms, + ?HOCON(integer(), #{ + in => query, + example => 1000, + required => false, + desc => ?DESC(filter_lte_duration_ms) + })}, + {gte_created_at, + ?HOCON(emqx_utils_calendar:epoch_millisecond(), #{ + in => query, + required => false, + example => <<"2023-10-15T00:00:00.820384+08:00">>, + desc => ?DESC(filter_gte_created_at) + })}, + {lte_created_at, + ?HOCON(emqx_utils_calendar:epoch_millisecond(), #{ + in => query, + example => <<"2023-10-16T00:00:00.820384+08:00">>, + required => false, + desc => ?DESC(filter_lte_created_at) + })}, + ref(emqx_dashboard_swagger, page), + ref(emqx_dashboard_swagger, limit) + ], + summary => <<"List audit logs">>, + responses => #{ + 200 => + emqx_dashboard_swagger:schema_with_example( + array(?REF(audit_list)), + audit_log_list_example() + ) + } + } + }. + +fields(audit_list) -> + [ + {data, mk(array(?REF(audit)), #{desc => ?DESC("audit_resp")})}, + {meta, mk(ref(emqx_dashboard_swagger, meta), #{})} + ]; +fields(audit) -> + [ + {created_at, + ?HOCON( + emqx_utils_calendar:epoch_millisecond(), + #{ + desc => "The time when the log is created" + } + )}, + {node, + ?HOCON(binary(), #{ + desc => "The node name to which the log is created" + })}, + {from, + ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console, event]), #{ + desc => "The source type of the log" + })}, + {source, + ?HOCON(binary(), #{ + desc => "The source of the log" + })}, + {source_ip, + ?HOCON(binary(), #{ + desc => "The source ip of the log" + })}, + {operation_id, + ?HOCON(binary(), #{ + desc => "The operation id of the log" + })}, + {operation_type, + ?HOCON(binary(), #{ + desc => "The operation type of the log" + })}, + {operation_result, + ?HOCON(?ENUM([success, failure]), #{ + desc => "The operation result of the log" + })}, + {http_status_code, + ?HOCON(integer(), #{ + desc => "The http status code of the log" + })}, + {http_method, + ?HOCON(?ENUM([post, put, delete]), #{ + desc => "The http method of the log" + })}, + {duration_ms, + ?HOCON(integer(), #{ + desc => "The duration of the log" + })}, + {args, + ?HOCON(?ARRAY(binary()), #{ + desc => "The args of the log" + })}, + {failure, + ?HOCON(?ARRAY(binary()), #{ + desc => "The failure of the log" + })}, + {http_request, + ?HOCON(?REF(http_request), #{ + desc => "The http request of the log" + })} + ]; +fields(http_request) -> + [ + {bindings, ?HOCON(map(), #{})}, + {body, ?HOCON(map(), #{})}, + {headers, ?HOCON(map(), #{})}, + {method, ?HOCON(?ENUM([post, put, delete]), #{})} + ]. + +audit(get, #{query_string := QueryString}) -> + case + emqx_mgmt_api:node_query( + node(), + ?AUDIT, + QueryString, + ?AUDIT_QS_SCHEMA, + fun ?MODULE:qs2ms/2, + fun ?MODULE:format/1 + ) + of + {error, page_limit_invalid} -> + {400, #{code => 'BAD_REQUEST', message => <<"page_limit_invalid">>}}; + {error, Node, Error} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; + Result -> + {200, Result} + end. + +qs2ms(_Tab, {Qs, _}) -> + #{ + match_spec => gen_match_spec(Qs, #?AUDIT{_ = '_'}, []), + fuzzy_fun => undefined + }. + +gen_match_spec([], Audit, Conn) -> + [{Audit, Conn, ['$_']}]; +gen_match_spec([{node, '=:=', T} | Qs], Audit, Conn) -> + gen_match_spec(Qs, Audit#?AUDIT{node = T}, Conn); +gen_match_spec([{from, '=:=', T} | Qs], Audit, Conn) -> + gen_match_spec(Qs, Audit#?AUDIT{from = T}, Conn); +gen_match_spec([{source, '=:=', T} | Qs], Audit, Conn) -> + gen_match_spec(Qs, Audit#?AUDIT{source = T}, Conn); +gen_match_spec([{source_ip, '=:=', T} | Qs], Audit, Conn) -> + gen_match_spec(Qs, Audit#?AUDIT{source_ip = T}, Conn); +gen_match_spec([{operation_id, '=:=', T} | Qs], Audit, Conn) -> + gen_match_spec(Qs, Audit#?AUDIT{operation_id = T}, Conn); +gen_match_spec([{operation_type, '=:=', T} | Qs], Audit, Conn) -> + gen_match_spec(Qs, Audit#?AUDIT{operation_type = T}, Conn); +gen_match_spec([{operation_result, '=:=', T} | Qs], Audit, Conn) -> + gen_match_spec(Qs, Audit#?AUDIT{operation_result = T}, Conn); +gen_match_spec([{http_status_code, '=:=', T} | Qs], Audit, Conn) -> + gen_match_spec(Qs, Audit#?AUDIT{http_status_code = T}, Conn); +gen_match_spec([{http_method, '=:=', T} | Qs], Audit, Conn) -> + gen_match_spec(Qs, Audit#?AUDIT{http_method = T}, Conn); +gen_match_spec([{created_at, Hold, T} | Qs], Audit, Conn) -> + gen_match_spec(Qs, Audit#?AUDIT{created_at = '$1'}, [{'$1', Hold, T} | Conn]); +gen_match_spec([{created_at, Hold1, T1, Hold2, T2} | Qs], Audit, Conn) -> + gen_match_spec(Qs, Audit#?AUDIT{created_at = '$1'}, [ + {'$1', Hold1, T1}, {'$1', Hold2, T2} | Conn + ]); +gen_match_spec([{duration_ms, Hold, T} | Qs], Audit, Conn) -> + gen_match_spec(Qs, Audit#?AUDIT{duration_ms = '$2'}, [{'$2', Hold, T} | Conn]); +gen_match_spec([{duration_ms, Hold1, T1, Hold2, T2} | Qs], Audit, Conn) -> + gen_match_spec(Qs, Audit#?AUDIT{duration_ms = '$2'}, [ + {'$2', Hold1, T1}, {'$2', Hold2, T2} | Conn + ]). + +format(Audit) -> + #?AUDIT{ + created_at = CreatedAt, + node = Node, + from = From, + source = Source, + source_ip = SourceIp, + operation_id = OperationId, + operation_type = OperationType, + operation_result = OperationResult, + http_status_code = HttpStatusCode, + http_method = HttpMethod, + duration_ms = DurationMs, + args = Args, + failure = Failure, + http_request = HttpRequest + } = Audit, + #{ + created_at => emqx_utils_calendar:epoch_to_rfc3339(CreatedAt, microsecond), + node => Node, + from => From, + source => Source, + source_ip => SourceIp, + operation_id => OperationId, + operation_type => OperationType, + operation_result => OperationResult, + http_status_code => HttpStatusCode, + http_method => HttpMethod, + duration_ms => DurationMs, + args => Args, + failure => Failure, + http_request => HttpRequest + }. + +audit_log_list_example() -> + #{ + data => [api_example(), cli_example()], + meta => #{ + <<"count">> => 2, + <<"hasnext">> => false, + <<"limit">> => 50, + <<"page">> => 1 + } + }. + +api_example() -> + #{ + <<"args">> => "", + <<"created_at">> => "2023-10-17T10:41:20.383993+08:00", + <<"duration_ms">> => 0, + <<"failure">> => "", + <<"from">> => "dashboard", + <<"http_method">> => "post", + <<"http_request">> => #{ + <<"bindings">> => #{}, + <<"body">> => #{ + <<"password">> => "******", + <<"username">> => "admin" + }, + <<"headers">> => #{ + <<"accept">> => "*/*", + <<"authorization">> => "******", + <<"connection">> => "keep-alive", + <<"content-length">> => "45", + <<"content-type">> => "application/json" + }, + <<"method">> => "post" + }, + <<"http_status_code">> => 200, + <<"node">> => "emqx@127.0.0.1", + <<"operation_id">> => "/login", + <<"operation_result">> => "success", + <<"operation_type">> => "login", + <<"source">> => "admin", + <<"source_ip">> => "127.0.0.1" + }. + +cli_example() -> + #{ + <<"args">> => [<<"show">>, <<"log">>], + <<"created_at">> => "2023-10-17T10:45:13.100426+08:00", + <<"duration_ms">> => 7, + <<"failure">> => "", + <<"from">> => "cli", + <<"http_method">> => "", + <<"http_request">> => "", + <<"http_status_code">> => "", + <<"node">> => "emqx@127.0.0.1", + <<"operation_id">> => "", + <<"operation_result">> => "", + <<"operation_type">> => "conf", + <<"source">> => "", + <<"source_ip">> => "" + }. diff --git a/apps/emqx_audit/src/emqx_audit_app.erl b/apps/emqx_audit/src/emqx_audit_app.erl new file mode 100644 index 000000000..2c7b086d5 --- /dev/null +++ b/apps/emqx_audit/src/emqx_audit_app.erl @@ -0,0 +1,27 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_audit_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + emqx_audit_sup:start_link(). + +stop(_State) -> + ok. diff --git a/apps/emqx_audit/src/emqx_audit_sup.erl b/apps/emqx_audit/src/emqx_audit_sup.erl new file mode 100644 index 000000000..460ba90d6 --- /dev/null +++ b/apps/emqx_audit/src/emqx_audit_sup.erl @@ -0,0 +1,45 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_audit_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +init([]) -> + SupFlags = #{ + strategy => one_for_all, + intensity => 10, + period => 10 + }, + ChildSpecs = [ + #{ + id => emqx_audit, + start => {emqx_audit, start_link, [#{max_size => 5000}]}, + type => worker, + restart => transient, + shutdown => 1000 + } + ], + {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index ddabdae95..b47d1f961 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -108,15 +108,14 @@ admins(_) -> emqx_ctl:usage(usage_sync()). audit(Level, From, Log) -> - Log1 = redact(Log#{time => logger:timestamp()}), - ?AUDIT(Level, From, Log1). + ?AUDIT(Level, redact(Log#{from => From})). -redact(Logs = #{cmd := admins, args := ["add", Username, _Password | Rest]}) -> - Logs#{args => ["add", Username, "******" | Rest]}; -redact(Logs = #{cmd := admins, args := ["passwd", Username, _Password]}) -> - Logs#{args => ["passwd", Username, "******"]}; -redact(Logs = #{cmd := license, args := ["update", _License]}) -> - Logs#{args => ["update", "******"]}; +redact(Logs = #{cmd := admins, args := [<<"add">>, Username, _Password | Rest]}) -> + Logs#{args => [<<"add">>, Username, <<"******">> | Rest]}; +redact(Logs = #{cmd := admins, args := [<<"passwd">>, Username, _Password]}) -> + Logs#{args => [<<"passwd">>, Username, <<"******">>]}; +redact(Logs = #{cmd := license, args := [<<"update">>, _License]}) -> + Logs#{args => [<<"update">>, "******"]}; redact(Logs) -> Logs. diff --git a/apps/emqx_dashboard/src/emqx_dashboard.erl b/apps/emqx_dashboard/src/emqx_dashboard.erl index 96ff3e167..6d6d3d596 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard.erl @@ -72,7 +72,7 @@ start_listeners(Listeners) -> base_path => emqx_dashboard_swagger:base_path(), modules => minirest_api:find_api_modules(apps()), authorization => Authorization, - log => fun emqx_dashboard_audit:log/1, + log => fun emqx_dashboard_audit:log/2, security => [#{'basicAuth' => []}, #{'bearerAuth' => []}], swagger_global_spec => GlobalSpec, dispatch => dispatch(), @@ -222,7 +222,7 @@ authorize(Req) -> {bearer, Token} -> case emqx_dashboard_admin:verify_token(Req, Token) of {ok, Username} -> - {ok, #{auth_type => jwt_token, username => Username}}; + {ok, #{auth_type => jwt_token, source => Username}}; {error, token_timeout} -> {401, 'TOKEN_TIME_OUT', <<"Token expired, get new token by POST /login">>}; {error, not_found} -> @@ -253,7 +253,7 @@ api_key_authorize(Req, Key, Secret) -> Path = cowboy_req:path(Req), case emqx_mgmt_auth:authorize(Path, Req, Key, Secret) of ok -> - {ok, #{auth_type => api_key, api_key => Key}}; + {ok, #{auth_type => api_key, source => Key}}; {error, <<"not_allowed">>} -> return_unauthorized( ?BAD_API_KEY_OR_SECRET, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_audit.erl b/apps/emqx_dashboard/src/emqx_dashboard_audit.erl index cb5c0f42b..4d3f6209e 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_audit.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_audit.erl @@ -18,29 +18,84 @@ -include_lib("emqx/include/logger.hrl"). %% API --export([log/1]). +-export([log/2]). -log(Meta0) -> - #{req_start := ReqStart, req_end := ReqEnd, code := Code, method := Method} = Meta0, - Duration = erlang:convert_time_unit(ReqEnd - ReqStart, native, millisecond), - Level = level(Method, Code, Duration), - Username = maps:get(username, Meta0, <<"">>), - From = from(maps:get(auth_type, Meta0, "")), - Meta1 = maps:without([req_start, req_end], Meta0), - Meta2 = Meta1#{time => logger:timestamp(), duration_ms => Duration}, - Meta = emqx_utils:redact(Meta2), - ?AUDIT( - Level, - From, - Meta#{username => binary_to_list(Username), node => node()} - ), - ok. +%% todo filter high frequency events +-define(HIGH_FREQUENCY_EVENTS, [ + mqtt_subscribe, + mqtt_unsubscribe, + mqtt_subscribe_batch, + mqtt_unsubscribe_batch, + mqtt_publish, + mqtt_publish_batch, + kickout_client +]). -from(jwt_token) -> "dashboard"; -from(_) -> "rest_api". +log(#{code := Code, method := Method} = Meta, Req) -> + %% Keep level/2 and log_meta/1 inside of this ?AUDIT macro + ?AUDIT(level(Method, Code), log_meta(Meta, Req)). -level(get, _Code, _) -> debug; -level(_, Code, _) when Code >= 200 andalso Code < 300 -> info; -level(_, Code, _) when Code >= 300 andalso Code < 400 -> warning; -level(_, Code, _) when Code >= 400 andalso Code < 500 -> error; -level(_, _, _) -> critical. +log_meta(Meta, Req) -> + Meta1 = #{ + time => logger:timestamp(), + from => from(Meta), + source => source(Meta), + duration_ms => duration_ms(Meta), + source_ip => source_ip(Req), + operation_type => operation_type(Meta), + %% method for http filter api. + http_method => maps:get(method, Meta), + http_request => http_request(Meta), + http_status_code => maps:get(code, Meta), + operation_result => operation_result(Meta), + node => node() + }, + Meta2 = maps:without([req_start, req_end, method, headers, body, bindings, code], Meta), + emqx_utils:redact(maps:merge(Meta2, Meta1)). + +duration_ms(#{req_start := ReqStart, req_end := ReqEnd}) -> + erlang:convert_time_unit(ReqEnd - ReqStart, native, millisecond). + +from(Meta) -> + case maps:find(auth_type, Meta) of + {ok, jwt_token} -> + dashboard; + {ok, api_key} -> + rest_api; + error -> + case maps:find(operation_id, Meta) of + %% login api create jwt_token, so we don have authorization in it's headers + {ok, <<"/login">>} -> dashboard; + _ -> unknown + end + end. +source(#{source := Source}) -> Source; +source(#{operation_id := <<"/login">>, body := #{<<"username">> := Username}}) -> Username; +source(_Meta) -> <<"">>. + +source_ip(Req) -> + case cowboy_req:header(<<"x-forwarded-for">>, Req, undefined) of + undefined -> + {RemoteIP, _} = cowboy_req:peer(Req), + iolist_to_binary(inet:ntoa(RemoteIP)); + Addresses -> + hd(binary:split(Addresses, <<",">>)) + end. + +operation_type(Meta) -> + case maps:find(operation_id, Meta) of + {ok, OperationId} -> lists:nth(2, binary:split(OperationId, <<"/">>)); + _ -> <<"unknown">> + end. + +http_request(Meta) -> + maps:with([method, headers, bindings, body], Meta). + +operation_result(#{failure := _}) -> failure; +operation_result(_) -> success. + +level(get, _Code) -> debug; +level(_, Code) when Code >= 200 andalso Code < 300 -> info; +level(_, Code) when Code >= 300 andalso Code < 400 -> warning; +level(_, Code) when Code >= 400 andalso Code < 500 -> error; +level(_, _) -> critical. diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index b698446b9..380ccfa6d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -33,7 +33,7 @@ ] ). -%% minirest/dashbaord_swagger behaviour callbacks +%% minirest/dashboard_swagger behaviour callbacks -export([ api_spec/0, paths/0, diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index 16f901d27..5ea6eee70 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -124,7 +124,8 @@ emqx_ft, emqx_gcp_device, emqx_dashboard_rbac, - emqx_dashboard_sso + emqx_dashboard_sso, + emqx_audit ], %% must always be of type `load' ce_business_apps => diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl index 0d847376e..aa6180e23 100644 --- a/apps/emqx_machine/src/emqx_machine_boot.erl +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -47,7 +47,10 @@ post_boot() -> ok = ensure_apps_started(), ok = print_vsn(), ok = start_autocluster(), - ?AUDIT(alert, cli, #{time => logger:timestamp(), event => "emqx_start"}), + ?AUDIT(alert, #{ + event => "emqx_start", + from => event + }), ignore. -ifdef(TEST). diff --git a/apps/emqx_machine/src/emqx_machine_terminator.erl b/apps/emqx_machine/src/emqx_machine_terminator.erl index 4757507b5..d43d5fea9 100644 --- a/apps/emqx_machine/src/emqx_machine_terminator.erl +++ b/apps/emqx_machine/src/emqx_machine_terminator.erl @@ -67,9 +67,9 @@ graceful() -> %% @doc Shutdown the Erlang VM and wait indefinitely. graceful_wait() -> - ?AUDIT(alert, cli, #{ - time => logger:timestamp(), - event => emqx_gracefully_stop + ?AUDIT(alert, #{ + event => "emqx_gracefully_stop", + from => event }), ok = graceful(), exit_loop(). diff --git a/apps/emqx_machine/src/emqx_restricted_shell.erl b/apps/emqx_machine/src/emqx_restricted_shell.erl index a582a3cb8..4ddc913c0 100644 --- a/apps/emqx_machine/src/emqx_restricted_shell.erl +++ b/apps/emqx_machine/src/emqx_restricted_shell.erl @@ -112,11 +112,11 @@ max_heap_size_warning(MF, Args) -> log(_, {?MODULE, prompt_func}, [[{history, _}]]) -> ok; log(IsAllow, MF, Args) -> - ?AUDIT(warning, erlang_console, #{ - time => logger:timestamp(), + ?AUDIT(warning, #{ function => MF, args => pp_args(Args), - permission => IsAllow + permission => IsAllow, + from => erlang_console }), to_console(IsAllow, MF, Args). diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 0acffbe4a..dffa39ae5 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -341,11 +341,11 @@ do_select( try case maps:get(continuation, QueryState, undefined) of undefined -> - ets:select(Tab, Ms, Limit); + ets:select_reverse(Tab, Ms, Limit); Continuation -> %% XXX: Repair is necessary because we pass Continuation back %% and forth through the nodes in the `do_cluster_query` - ets:select(ets:repair_continuation(Continuation, Ms)) + ets:select_reverse(ets:repair_continuation(Continuation, Ms)) end catch exit:_ = Exit -> diff --git a/mix.exs b/mix.exs index 3817b5121..3551951fd 100644 --- a/mix.exs +++ b/mix.exs @@ -214,7 +214,8 @@ defmodule EMQXUmbrella.MixProject do :emqx_bridge_azure_event_hub, :emqx_gcp_device, :emqx_dashboard_rbac, - :emqx_dashboard_sso + :eqmx_dashboard_sso, + :emqx_audit ]) end diff --git a/rel/i18n/emqx_audit_api.hocon b/rel/i18n/emqx_audit_api.hocon new file mode 100644 index 000000000..89c335f12 --- /dev/null +++ b/rel/i18n/emqx_audit_api.hocon @@ -0,0 +1,58 @@ +emqx_audit_api { + +audit_get.desc: +"""Get audit logs based on filter API, empowers users to efficiently +access the desired audit trail data and facilitates auditing, compliance, +troubleshooting, and security analysis""" + +audit_get.label: +"List audit logs" + +filter_node.desc: +"Filter logs based on the node name to which the logs are created." + +filter_from.desc: +""""Filter logs based on source type, valid values include: +`dashboard`: Dashboard request logs, requiring the use of a jwt_token. +`rest_api`: API KEY request logs. +`cli`: The emqx command line logs. +`erlang_console`: The emqx remote_console run function logs. +`event`: Logs related to events such as emqx_start, emqx_stop, audit_enabled, and audit_disabled.""" + +filter_source.desc: +""""Filter logs based on source, Possible values are: +The login username when logs are generated from the dashboard. +The API Keys when logs are generated from the REST API. +empty string when logs are generated from CLI, Erlang console, or an event.""" + +filter_source_ip.desc: +"Filter logs based on source ip when logs are generated from dashboard and REST API." + +filter_operation_id.desc: +"Filter log with swagger's operation_id when logs are generated from dashboard and REST API." + +filter_operation_type.desc: +"Filter logs with operation type." + +filter_operation_result.desc: +"Filter logs with operation result." + +filter_http_status_code.desc: +"Filter The HTTP API with response code when logs are generated from dashboard and REST API." + +filter_http_method.desc: +"Filter The HTTP API Request with method when logs are generated from dashboard and REST API." + +filter_gte_duration_ms.desc: +"Filter logs with a duration greater than or equal to given microseconds." + +filter_lte_duration_ms.desc: +"Filter logs with a duration less than or equal to given microseconds." + +filter_gte_created_at.desc: +"Filter logs with a creation time greater than or equal to the given timestamp, rfc3339 or timestamp(millisecond)" + +filter_lte_created_at.desc: +"Filter logs with a creation time less than or equal to the given timestamp, rfc3339 or timestamp(millisecond)" + +}