Merge pull request #11824 from zhongwencool/audit-log-reviewed
refactor: audit log review refactor
Commit: e896957803
@@ -32,6 +32,5 @@
 
 -define(SHARD, ?COMMON_SHARD).
 -define(MAX_SIZE, 30).
--define(OWN_KEYS, [level, filters, filter_default, handlers]).
 
 -endif.
@@ -40,7 +40,9 @@
     end
 ).
 
+-define(AUDIT_HANDLER, emqx_audit).
 -define(TRACE_FILTER, emqx_trace_filter).
+-define(OWN_KEYS, [level, filters, filter_default, handlers]).
 
 -define(TRACE(Tag, Msg, Meta), ?TRACE(debug, Tag, Msg, Meta)).
 
@@ -61,21 +63,35 @@
         )
     end).
 
+-ifdef(EMQX_RELEASE_EDITION).
+
+-if(?EMQX_RELEASE_EDITION == ee).
+
 -define(AUDIT(_LevelFun_, _MetaFun_), begin
-    case emqx_config:get([log, audit], #{enable => false}) of
-        #{enable := false} ->
+    case logger_config:get(logger, ?AUDIT_HANDLER) of
+        {error, {not_found, _}} ->
             ok;
-        #{enable := true, level := _AllowLevel_} ->
+        {ok, Handler = #{level := _AllowLevel_}} ->
             _Level_ = _LevelFun_,
             case logger:compare_levels(_AllowLevel_, _Level_) of
                 _R_ when _R_ == lt; _R_ == eq ->
-                    emqx_audit:log(_Level_, _MetaFun_);
-                gt ->
+                    emqx_audit:log(_Level_, _MetaFun_, Handler);
+                _ ->
                     ok
             end
     end
 end).
 
+-else.
+%% Only for compile pass, ce edition will not call it
+-define(AUDIT(_L_, _M_), _ = {_L_, _M_}).
+-endif.
+
+-else.
+%% Only for compile pass, ce edition will not call it
+-define(AUDIT(_L_, _M_), _ = {_L_, _M_}).
+-endif.
+
 %% print to 'user' group leader
 -define(ULOG(Fmt, Args), io:format(user, Fmt, Args)).
 -define(ELOG(Fmt, Args), io:format(standard_error, Fmt, Args)).
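After this change, ?AUDIT resolves the audit handler through logger_config:get(logger, ?AUDIT_HANDLER) rather than reading the emqx_config tree, and hands the handler map to emqx_audit:log/3. A minimal sketch of a call site, mirroring the cmd-style metadata used elsewhere in this PR (the exact field values here are illustrative):

    %% Emit an alert-level audit entry; skipped entirely when the handler is not installed.
    ?AUDIT(alert, #{
        cmd => emqx,
        args => ["start"],
        version => emqx_release:version(),
        from => cli,
        duration_ms => 0
    }).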
@@ -26,7 +26,6 @@
 -include("logger.hrl").
 
 -define(LOG, [log]).
--define(AUDIT_HANDLER, emqx_audit).
 
 add_handler() ->
     ok = emqx_config_handler:add_handler(?LOG, ?MODULE),

@@ -97,8 +96,11 @@ update_log_handlers(NewHandlers) ->
     ok = application:set_env(kernel, logger, NewHandlers),
     ok.
 
+%% Don't remove audit log handler here, we need record this removed action into audit log file.
+%% we will remove audit log handler after audit log is record in emqx_audit:log/3.
+update_log_handler({removed, ?AUDIT_HANDLER}) ->
+    ok;
 update_log_handler({removed, Id}) ->
-    audit("audit_disabled", Id),
     log_to_console("Config override: ~s is removed~n", [id_for_log(Id)]),
     logger:remove_handler(Id);
 update_log_handler({Action, {handler, Id, Mod, Conf}}) ->

@@ -107,7 +109,6 @@ update_log_handler({Action, {handler, Id, Mod, Conf}}) ->
     _ = logger:remove_handler(Id),
     case logger:add_handler(Id, Mod, Conf) of
         ok ->
-            audit("audit_enabled", Id),
             ok;
         %% Don't crash here, otherwise the cluster rpc will retry the wrong handler forever.
         {error, Reason} ->

@@ -118,23 +119,6 @@ update_log_handler({Action, {handler, Id, Mod, Conf}}) ->
     end,
     ok.
 
--ifdef(EMQX_RELEASE_EDITION).
-
--if(?EMQX_RELEASE_EDITION == ee).
-audit(Event, ?AUDIT_HANDLER) ->
-    emqx_audit:log(alert, #{event => Event, from => event});
-audit(_, _) ->
-    ok.
--else.
-audit(_, _) ->
-    ok.
--endif.
-
--else.
-audit(_, _) ->
-    ok.
--endif.
-
 id_for_log(console) -> "log.console";
 id_for_log(Other) -> "log.file." ++ atom_to_list(Other).
 
@@ -105,7 +105,7 @@ log_filter([{Id, FilterFun, Filter, Name} | Rest], Log0) ->
         ignore ->
             ignore;
         Log ->
-            case logger_config:get(ets:whereis(logger), Id) of
+            case logger_config:get(logger, Id) of
                 {ok, #{module := Module} = HandlerConfig0} ->
                     HandlerConfig = maps:without(?OWN_KEYS, HandlerConfig0),
                     try
@@ -5,7 +5,6 @@
 -define(AUDIT, emqx_audit).
 
 -record(?AUDIT, {
-    seq,
     %% basic info
     created_at,
     node,
@@ -12,7 +12,9 @@
 
 %% API
 -export([start_link/0]).
--export([log/1, log/2]).
+-export([log/3]).
+
+-export([trans_clean_expired/2]).
 
 %% gen_server callbacks
 -export([

@@ -26,12 +28,15 @@
 ]).
 
 -define(FILTER_REQ, [cert, host_info, has_sent_resp, pid, path_info, peer, ref, sock, streamid]).
--define(CLEAN_EXPIRED_MS, 60 * 1000).
+
+-ifdef(TEST).
+-define(INTERVAL, 100).
+-else.
+-define(INTERVAL, 10000).
+-endif.
 
 to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) ->
     #?AUDIT{
-        created_at = erlang:system_time(microsecond),
-        node = node(),
         operation_id = <<"">>,
         operation_type = atom_to_binary(Cmd),
         args = Args,

@@ -45,8 +50,6 @@ to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) ->
         http_method = <<"">>,
         http_request = <<"">>
     };
-to_audit(#{http_method := get}) ->
-    ok;
 to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api ->
     #{
         source := Source,

@@ -62,8 +65,6 @@ to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api
         duration_ms := DurationMs
     } = Log,
     #?AUDIT{
-        created_at = erlang:system_time(microsecond),
-        node = node(),
         from = From,
         source = Source,
         source_ip = SourceIp,

@@ -79,29 +80,8 @@ to_audit(#{from := From} = Log) when From =:= dashboard orelse From =:= rest_api
         duration_ms = DurationMs,
         args = <<"">>
     };
-to_audit(#{from := event, event := Event}) ->
-    #?AUDIT{
-        created_at = erlang:system_time(microsecond),
-        node = node(),
-        from = event,
-        source = <<"">>,
-        source_ip = <<"">>,
-        %% operation info
-        operation_id = iolist_to_binary(Event),
-        operation_type = <<"">>,
-        operation_result = <<"">>,
-        failure = <<"">>,
-        %% request detail
-        http_status_code = <<"">>,
-        http_method = <<"">>,
-        http_request = <<"">>,
-        duration_ms = 0,
-        args = <<"">>
-    };
 to_audit(#{from := erlang_console, function := F, args := Args}) ->
     #?AUDIT{
-        created_at = erlang:system_time(microsecond),
-        node = node(),
         from = erlang_console,
         source = <<"">>,
         source_ip = <<"">>,

@@ -118,16 +98,30 @@ to_audit(#{from := erlang_console, function := F, args := Args}) ->
         args = iolist_to_binary(io_lib:format("~p: ~p~n", [F, Args]))
     }.
 
-log(_Level, undefined) ->
+log(_Level, undefined, _Handler) ->
     ok;
-log(Level, Meta1) ->
+log(Level, Meta1, Handler) ->
     Meta2 = Meta1#{time => logger:timestamp(), level => Level},
-    Filter = [{emqx_audit, fun(L, _) -> L end, undefined, undefined}],
-    emqx_trace:log(Level, Filter, undefined, Meta2),
-    emqx_audit:log(Meta2).
+    log_to_file(Level, Meta2, Handler),
+    log_to_db(Meta2),
+    remove_handler_when_disabled().
 
-log(Log) ->
-    gen_server:cast(?MODULE, {write, to_audit(Log)}).
+remove_handler_when_disabled() ->
+    case emqx_config:get([log, audit, enable], false) of
+        true ->
+            ok;
+        false ->
+            _ = logger:remove_handler(?AUDIT_HANDLER),
+            ok
+    end.
+
+log_to_db(Log) ->
+    Audit0 = to_audit(Log),
+    Audit = Audit0#?AUDIT{
+        node = node(),
+        created_at = erlang:system_time(microsecond)
+    },
+    mria:dirty_write(?AUDIT, Audit).
 
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -142,80 +136,110 @@ init([]) ->
     ]),
     {ok, #{}, {continue, setup}}.
 
-handle_continue(setup, #{} = State) ->
+handle_continue(setup, State) ->
     ok = mria:wait_for_tables([?AUDIT]),
-    clean_expired(),
-    {noreply, State}.
+    NewState = State#{role => mria_rlog:role()},
+    ?AUDIT(alert, #{
+        cmd => emqx,
+        args => ["start"],
+        version => emqx_release:version(),
+        from => cli,
+        duration_ms => 0
+    }),
+    {noreply, NewState, interval(NewState)}.
 
-handle_call(_Request, _From, State = #{}) ->
-    {reply, ok, State}.
+handle_call(_Request, _From, State) ->
+    {reply, ignore, State, interval(State)}.
 
-handle_cast({write, Log}, State) ->
-    _ = write_log(Log),
-    {noreply, State#{}, ?CLEAN_EXPIRED_MS};
-handle_cast(_Request, State = #{}) ->
-    {noreply, State}.
+handle_cast(_Request, State) ->
+    {noreply, State, interval(State)}.
 
-handle_info(timeout, State = #{}) ->
-    clean_expired(),
-    {noreply, State, hibernate};
-handle_info(_Info, State = #{}) ->
-    {noreply, State}.
+handle_info(timeout, State) ->
+    ExtraWait = clean_expired_logs(),
+    {noreply, State, interval(State) + ExtraWait};
+handle_info(_Info, State) ->
+    {noreply, State, interval(State)}.
 
-terminate(_Reason, _State = #{}) ->
+terminate(_Reason, _State) ->
     ok.
 
-code_change(_OldVsn, State = #{}, _Extra) ->
+code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 %%%===================================================================
 %%% Internal functions
 %%%===================================================================
 
-write_log(Log) ->
-    case
-        mria:transaction(
-            ?COMMON_SHARD,
-            fun(L) ->
-                New =
-                    case mnesia:last(?AUDIT) of
-                        '$end_of_table' -> 1;
-                        LastId -> LastId + 1
-                    end,
-                mnesia:write(L#?AUDIT{seq = New})
-            end,
-            [Log]
-        )
-    of
-        {atomic, ok} ->
-            ok;
-        Reason ->
-            ?SLOG(warning, #{
-                msg => "write_audit_log_failed",
-                reason => Reason
-            })
-    end.
-
-clean_expired() ->
+%% if clean_expired transaction aborted, it will be scheduled with extra 60 seconds.
+clean_expired_logs() ->
     MaxSize = max_size(),
-    LatestId = latest_id(),
-    Min = LatestId - MaxSize,
-    %% MS = ets:fun2ms(fun(#?AUDIT{seq = Seq}) when Seq =< Min -> true end),
-    MS = [{#?AUDIT{seq = '$1', _ = '_'}, [{'=<', '$1', Min}], [true]}],
-    NumDeleted = mnesia:ets(fun ets:select_delete/2, [?AUDIT, MS]),
-    ?SLOG(debug, #{
-        msg => "clean_audit_log",
-        latest_id => LatestId,
-        min => Min,
-        deleted_number => NumDeleted
-    }),
-    ok.
-
-latest_id() ->
-    case mnesia:dirty_last(?AUDIT) of
-        '$end_of_table' -> 0;
-        Seq -> Seq
+    Oldest = mnesia:dirty_first(?AUDIT),
+    CurSize = mnesia:table_info(?AUDIT, size),
+    case CurSize - MaxSize of
+        DelSize when DelSize > 0 ->
+            case
+                mria:transaction(
+                    ?COMMON_SHARD,
+                    fun ?MODULE:trans_clean_expired/2,
+                    [Oldest, DelSize]
+                )
+            of
+                {atomic, ok} ->
+                    0;
+                {aborted, Reason} ->
+                    ?SLOG(error, #{
+                        msg => "clean_expired_audit_aborted",
+                        reason => Reason,
+                        delete_size => DelSize,
+                        current_size => CurSize,
+                        max_count => MaxSize
+                    }),
+                    60000
+            end;
+        _ ->
+            0
     end.
 
+trans_clean_expired(Oldest, DelCount) ->
+    First = mnesia:first(?AUDIT),
+    %% Other node already clean from the oldest record.
+    %% ensure not delete twice, otherwise records that should not be deleted will be deleted.
+    case First =:= Oldest of
+        true -> do_clean_expired(First, DelCount);
+        false -> ok
+    end.
+
+do_clean_expired(_, DelSize) when DelSize =< 0 -> ok;
+do_clean_expired('$end_of_table', _DelSize) ->
+    ok;
+do_clean_expired(CurKey, DeleteSize) ->
+    mnesia:delete(?AUDIT, CurKey, sticky_write),
+    do_clean_expired(mnesia:next(?AUDIT, CurKey), DeleteSize - 1).
+
 max_size() ->
     emqx_conf:get([log, audit, max_filter_size], 5000).
+
+interval(#{role := replicant}) -> hibernate;
+interval(#{role := core}) -> ?INTERVAL + rand:uniform(?INTERVAL).
+
+log_to_file(Level, Meta, #{module := Module} = Handler) ->
+    Log = #{level => Level, meta => Meta, msg => undefined},
+    Handler1 = maps:without(?OWN_KEYS, Handler),
+    try
+        erlang:apply(Module, log, [Log, Handler1])
+    catch
+        C:R:S ->
+            case logger:remove_handler(?AUDIT_HANDLER) of
+                ok ->
+                    logger:internal_log(
+                        error, {removed_failing_handler, ?AUDIT_HANDLER, C, R, S}
+                    );
+                {error, {not_found, _}} ->
+                    ok;
+                {error, Reason} ->
+                    logger:internal_log(
+                        error,
+                        {removed_handler_failed, ?AUDIT_HANDLER, Reason, C, R, S}
+                    )
+            end
+    end.
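A worked example of the new cleanup path, using values from this diff (the record count is illustrative): with the default max_filter_size of 5000 and, say, 5120 records in the table, clean_expired_logs/0 computes DelSize = 120 and runs trans_clean_expired/2 in a ?COMMON_SHARD transaction, deleting the 120 oldest keys starting from the table's first record; if the transaction aborts, it returns 60000 so the next timeout is pushed out by an extra minute. On a core node interval/1 yields 10-20 seconds (?INTERVAL plus random jitter of up to ?INTERVAL), while a replicant returns hibernate and never schedules the cleanup.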
@@ -35,6 +35,7 @@
     {<<"gte_duration_ms">>, timestamp},
     {<<"lte_duration_ms">>, timestamp}
 ]).
+-define(DISABLE_MSG, <<"Audit is disabled">>).
 
 namespace() -> "audit".

@@ -59,7 +60,7 @@ schema("/audit") ->
                 desc => ?DESC(filter_node)
             })},
         {from,
-            ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console, event]), #{
+            ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console]), #{
                 in => query,
                 required => false,
                 example => <<"dashboard">>,

@@ -151,7 +152,11 @@ schema("/audit") ->
                 emqx_dashboard_swagger:schema_with_example(
                     array(?REF(audit_list)),
                     audit_log_list_example()
-                )
+                ),
+                400 => emqx_dashboard_swagger:error_codes(
+                    ['BAD_REQUEST'],
+                    ?DISABLE_MSG
+                )
             }
         }
     }.

@@ -175,7 +180,7 @@ fields(audit) ->
             desc => "The node name to which the log is created"
         })},
         {from,
-            ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console, event]), #{
+            ?HOCON(?ENUM([dashboard, rest_api, cli, erlang_console]), #{
                 desc => "The source type of the log"
            })},
        {source,

@@ -232,23 +237,30 @@ fields(http_request) ->
     ].
 
 audit(get, #{query_string := QueryString}) ->
-    case
-        emqx_mgmt_api:node_query(
-            node(),
-            ?AUDIT,
-            QueryString,
-            ?AUDIT_QS_SCHEMA,
-            fun ?MODULE:qs2ms/2,
-            fun ?MODULE:format/1
-        )
-    of
-        {error, page_limit_invalid} ->
-            {400, #{code => 'BAD_REQUEST', message => <<"page_limit_invalid">>}};
-        {error, Node, Error} ->
-            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
-            {500, #{code => <<"NODE_DOWN">>, message => Message}};
-        Result ->
-            {200, Result}
+    case emqx_config:get([log, audit, enable], false) of
+        false ->
+            {400, #{code => 'BAD_REQUEST', message => ?DISABLE_MSG}};
+        true ->
+            case
+                emqx_mgmt_api:node_query(
+                    node(),
+                    ?AUDIT,
+                    QueryString,
+                    ?AUDIT_QS_SCHEMA,
+                    fun ?MODULE:qs2ms/2,
+                    fun ?MODULE:format/1
+                )
+            of
+                {error, page_limit_invalid} ->
+                    {400, #{code => 'BAD_REQUEST', message => <<"page_limit_invalid">>}};
+                {error, Node, Error} ->
+                    Message = list_to_binary(
+                        io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])
+                    ),
+                    {500, #{code => <<"NODE_DOWN">>, message => Message}};
+                Result ->
+                    {200, Result}
+            end
     end.
 
 qs2ms(_Tab, {Qs, _}) ->
@@ -19,6 +19,19 @@
 -include_lib("eunit/include/eunit.hrl").
 
+all() ->
+    [
+        {group, audit, [sequence]}
+    ].
+
+groups() ->
+    [
+        {audit, [sequence], common_tests()}
+    ].
+
+common_tests() ->
+    emqx_common_test_helpers:all(?MODULE).
+
 -define(CONF_DEFAULT, #{
     node =>
         #{

@@ -40,15 +53,13 @@
         }
 }).
 
-all() ->
-    emqx_common_test_helpers:all(?MODULE).
-
 init_per_suite(Config) ->
     _ = application:load(emqx_conf),
     emqx_config:erase_all(),
     emqx_mgmt_api_test_util:init_suite([emqx_ctl, emqx_conf, emqx_audit]),
     ok = emqx_common_test_helpers:load_config(emqx_enterprise_schema, ?CONF_DEFAULT),
     emqx_config:save_schema_mod_and_names(emqx_enterprise_schema),
+    ok = emqx_config_logger:refresh_config(),
     application:set_env(emqx, boot_modules, []),
     emqx_conf_cli:load(),
     Config.

@@ -89,6 +100,44 @@ t_http_api(_) ->
     ),
     ok.
 
+t_disabled(_) ->
+    Enable = [log, audit, enable],
+    ?assertEqual(true, emqx:get_config(Enable)),
+    AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    {ok, _} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader),
+    Size1 = mnesia:table_info(emqx_audit, size),
+
+    {ok, Logs} = emqx_mgmt_api_configs_SUITE:get_config("log"),
+    Logs1 = emqx_utils_maps:deep_put([<<"audit">>, <<"max_filter_size">>], Logs, 100),
+    NewLogs = emqx_utils_maps:deep_put([<<"audit">>, <<"enable">>], Logs1, false),
+    {ok, _} = emqx_mgmt_api_configs_SUITE:update_config("log", NewLogs),
+    {ok, GetLog1} = emqx_mgmt_api_configs_SUITE:get_config("log"),
+    ?assertEqual(NewLogs, GetLog1),
+    ?assertMatch(
+        {error, _},
+        emqx_mgmt_api_test_util:request_api(get, AuditPath, "limit=1", AuthHeader)
+    ),
+
+    Size2 = mnesia:table_info(emqx_audit, size),
+    %% Record the audit disable action, so the size + 1
+    ?assertEqual(Size1 + 1, Size2),
+
+    {ok, Zones} = emqx_mgmt_api_configs_SUITE:get_global_zone(),
+    NewZones = emqx_utils_maps:deep_put([<<"mqtt">>, <<"max_topic_levels">>], Zones, 111),
+    {ok, #{<<"mqtt">> := Res}} = emqx_mgmt_api_configs_SUITE:update_global_zone(NewZones),
+    ?assertMatch(#{<<"max_topic_levels">> := 111}, Res),
+    Size3 = mnesia:table_info(emqx_audit, size),
+    %% Don't record mqtt update request.
+    ?assertEqual(Size2, Size3),
+    %% enabled again
+    {ok, _} = emqx_mgmt_api_configs_SUITE:update_config("log", Logs1),
+    {ok, GetLog2} = emqx_mgmt_api_configs_SUITE:get_config("log"),
+    ?assertEqual(Logs1, GetLog2),
+    Size4 = mnesia:table_info(emqx_audit, size),
+    ?assertEqual(Size3 + 1, Size4),
+    ok.
+
 t_cli(_Config) ->
     ok = emqx_ctl:run_command(["conf", "show", "log"]),
     AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),

@@ -121,6 +170,35 @@ t_cli(_Config) ->
     ?assertMatch(#{<<"data">> := []}, emqx_utils_json:decode(Res2, [return_maps])),
     ok.
 
+t_max_size(_Config) ->
+    {ok, _} = emqx:update_config([log, audit, max_filter_size], 1000),
+    SizeFun =
+        fun() ->
+            AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),
+            AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+            Limit = "limit=1000",
+            {ok, Res} = emqx_mgmt_api_test_util:request_api(get, AuditPath, Limit, AuthHeader),
+            #{<<"data">> := Data} = emqx_utils_json:decode(Res, [return_maps]),
+            erlang:length(Data)
+        end,
+    InitSize = SizeFun(),
+    lists:foreach(
+        fun(_) ->
+            ok = emqx_ctl:run_command(["conf", "show", "log"])
+        end,
+        lists:duplicate(100, 1)
+    ),
+    timer:sleep(110),
+    Size1 = SizeFun(),
+    ?assert(Size1 - InitSize >= 100, {Size1, InitSize}),
+    {ok, _} = emqx:update_config([log, audit, max_filter_size], 10),
+    %% wait for clean_expired
+    timer:sleep(250),
+    ExpectSize = emqx:get_config([log, audit, max_filter_size]),
+    Size2 = SizeFun(),
+    ?assertEqual(ExpectSize, Size2, {sys:get_state(emqx_audit)}),
+    ok.
+
 t_kickout_clients_without_log(_) ->
     process_flag(trap_exit, true),
     AuditPath = emqx_mgmt_api_test_util:api_path(["audit"]),

@@ -167,4 +245,4 @@ kickout_clients() ->
 
     {ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
     ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]),
-    ?assertMatch(#{<<"meta">> := #{<<"count">> := 0}}, ClientsResponse2).
+    ?assertMatch(#{<<"data">> := []}, ClientsResponse2).
@@ -37,10 +37,15 @@
 -define(AUDIT_MOD, audit).
 -define(UPDATE_READONLY_KEYS_PROHIBITED, "update_readonly_keys_prohibited").
 
+-dialyzer({no_match, [load/0]}).
+
 load() ->
     emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, [hidden]),
     emqx_ctl:register_command(?CONF, {?MODULE, conf}, []),
-    emqx_ctl:register_command(?AUDIT_MOD, {?MODULE, audit}, [hidden]),
+    case emqx_release:edition() of
+        ee -> emqx_ctl:register_command(?AUDIT_MOD, {?MODULE, audit}, [hidden]);
+        ce -> ok
+    end,
     ok.
 
 unload() ->
@@ -72,7 +72,7 @@ start_listeners(Listeners) ->
         base_path => emqx_dashboard_swagger:base_path(),
         modules => minirest_api:find_api_modules(apps()),
         authorization => Authorization,
-        log => fun emqx_dashboard_audit:log/2,
+        log => audit_log_fun(),
         security => [#{'basicAuth' => []}, #{'bearerAuth' => []}],
         swagger_global_spec => GlobalSpec,
         dispatch => dispatch(),

@@ -210,9 +210,19 @@ filter_false(K, V, S) -> [{K, V} | S].
 listener_name(Protocol) ->
     list_to_atom(atom_to_list(Protocol) ++ ":dashboard").
 
+-dialyzer({no_match, [audit_log_fun/0]}).
+
+audit_log_fun() ->
+    case emqx_release:edition() of
+        ee -> fun emqx_dashboard_audit:log/2;
+        ce -> undefined
+    end.
+
 -if(?EMQX_RELEASE_EDITION =/= ee).
 
 %% dialyzer complains about the `unauthorized_role' clause...
 -dialyzer({no_match, [authorize/1, api_key_authorize/3]}).
 
 -endif.
 
 authorize(Req) ->
@@ -37,10 +37,11 @@ log(#{code := Code, method := Method} = Meta, Req) ->
     ?AUDIT(level(Method, Code), log_meta(Meta, Req)).
 
 log_meta(Meta, Req) ->
-    #{operation_id := OperationId} = Meta,
+    #{operation_id := OperationId, method := Method} = Meta,
     case
-        lists:member(OperationId, ?HIGH_FREQUENCY_REQUESTS) andalso
-            ignore_high_frequency_request()
+        Method =:= get orelse
+            (lists:member(OperationId, ?HIGH_FREQUENCY_REQUESTS) andalso
+                ignore_high_frequency_request())
     of
         true ->
             undefined;

@@ -53,7 +54,7 @@ log_meta(Meta, Req) ->
         source_ip => source_ip(Req),
         operation_type => operation_type(Meta),
         %% method for http filter api.
-        http_method => maps:get(method, Meta),
+        http_method => Method,
         http_request => http_request(Meta),
         http_status_code => maps:get(code, Meta),
         operation_result => operation_result(Meta),
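With the reworked guard, log_meta/2 returns undefined for every GET request, as well as for operations in ?HIGH_FREQUENCY_REQUESTS when ignore_high_frequency_request() is set, so emqx_audit:log/3 short-circuits on the undefined metadata and nothing is written. For example, the GET /audit query in the t_disabled test adds no record, while disabling the audit log through a log-config update is still recorded (the suite asserts the table grows by exactly one).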
@@ -47,10 +47,6 @@ post_boot() ->
     ok = ensure_apps_started(),
     ok = print_vsn(),
     ok = start_autocluster(),
-    ?AUDIT(alert, #{
-        event => "emqx_start",
-        from => event
-    }),
     ignore.
 
 -ifdef(TEST).
@@ -68,8 +68,11 @@ graceful() ->
 %% @doc Shutdown the Erlang VM and wait indefinitely.
 graceful_wait() ->
     ?AUDIT(alert, #{
-        event => "emqx_gracefully_stop",
-        from => event
+        cmd => emqx,
+        args => ["stop"],
+        version => emqx_release:version(),
+        from => cli,
+        duration_ms => element(1, erlang:statistics(wall_clock))
     }),
     ok = graceful(),
     exit_loop().
@@ -16,14 +16,13 @@ filter_from.desc:
 `dashboard`: Dashboard request logs, requiring the use of a jwt_token.
 `rest_api`: API KEY request logs.
 `cli`: The emqx command line logs.
-`erlang_console`: The emqx remote_console run function logs.
-`event`: Logs related to events such as emqx_start, emqx_gracefully_stop, audit_enabled, and audit_disabled."""
+`erlang_console`: The emqx remote_console run function logs."""
 
 filter_source.desc:
 """"Filter logs based on source, Possible values are:
 The login username when logs are generated from the dashboard.
 The API Keys when logs are generated from the REST API.
-empty string when logs are generated from CLI, Erlang console, or an event."""
+empty string when logs are generated from CLI, Erlang console."""
 
 filter_source_ip.desc:
 "Filter logs based on source ip when logs are generated from dashboard and REST API."
@@ -726,7 +726,8 @@ audit_handler_level.label:
 """Log Level"""
 
 audit_log_max_filter_limit.desc:
-"""Store the latest N log entries in a database for allow `/audit` HTTP API to filter and retrieval of log data."""
+"""Store the latest N log entries in a database for allow `/audit` HTTP API to filter and retrieval of log data.
+The interval for purging redundant log records is maintained within a range of 10~20 seconds."""
 
 audit_log_max_filter_limit.label:
 """Max Filter Limit"""