refactor(emqx_trace): Extract transactions to DL module

This commit is contained in:
ieQu1 2022-08-18 11:53:59 +02:00
parent 3906819dda
commit cdde392d6e
3 changed files with 147 additions and 74 deletions

View File

@ -51,10 +51,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(TRACE, ?MODULE). -include("emqx_trace.hrl").
-define(SHARD, ?COMMON_SHARD).
-define(MAX_SIZE, 30).
-define(OWN_KEYS, [level, filters, filter_default, handlers]).
-ifdef(TEST). -ifdef(TEST).
-export([ -export([
@ -66,15 +63,6 @@
-export_type([ip_address/0]). -export_type([ip_address/0]).
-type ip_address() :: string(). -type ip_address() :: string().
-record(?TRACE, {
name :: binary() | undefined | '_',
type :: clientid | topic | ip_address | undefined | '_',
filter :: emqx_types:topic() | emqx_types:clientid() | ip_address() | undefined | '_',
enable = true :: boolean() | '_',
start_at :: integer() | undefined | '_',
end_at :: integer() | undefined | '_'
}).
publish(#message{topic = <<"$SYS/", _/binary>>}) -> publish(#message{topic = <<"$SYS/", _/binary>>}) ->
ignore; ignore;
publish(#message{from = From, topic = Topic, payload = Payload}) when publish(#message{from = From, topic = Topic, payload = Payload}) when
@ -172,13 +160,7 @@ create(Trace) ->
-spec delete(Name :: binary()) -> ok | {error, not_found}. -spec delete(Name :: binary()) -> ok | {error, not_found}.
delete(Name) -> delete(Name) ->
Tran = fun() -> transaction(fun emqx_trace_dl:delete/1, [Name]).
case mnesia:read(?TRACE, Name) of
[_] -> mnesia:delete(?TRACE, Name, write);
[] -> mnesia:abort(not_found)
end
end,
transaction(Tran).
-spec clear() -> ok | {error, Reason :: term()}. -spec clear() -> ok | {error, Reason :: term()}.
clear() -> clear() ->
@ -190,20 +172,7 @@ clear() ->
-spec update(Name :: binary(), Enable :: boolean()) -> -spec update(Name :: binary(), Enable :: boolean()) ->
ok | {error, not_found | finished}. ok | {error, not_found | finished}.
update(Name, Enable) -> update(Name, Enable) ->
Tran = fun() -> transaction(fun emqx_trace_dl:update/2, [Name, Enable]).
case mnesia:read(?TRACE, Name) of
[] ->
mnesia:abort(not_found);
[#?TRACE{enable = Enable}] ->
ok;
[Rec] ->
case erlang:system_time(second) >= Rec#?TRACE.end_at of
false -> mnesia:write(?TRACE, Rec#?TRACE{enable = Enable}, write);
true -> mnesia:abort(finished)
end
end
end,
transaction(Tran).
check() -> check() ->
gen_server:call(?MODULE, check). gen_server:call(?MODULE, check).
@ -211,13 +180,7 @@ check() ->
-spec get_trace_filename(Name :: binary()) -> -spec get_trace_filename(Name :: binary()) ->
{ok, FileName :: string()} | {error, not_found}. {ok, FileName :: string()} | {error, not_found}.
get_trace_filename(Name) -> get_trace_filename(Name) ->
Tran = fun() -> transaction(fun emqx_trace_dl:get_trace_filename/1, [Name]).
case mnesia:read(?TRACE, Name, read) of
[] -> mnesia:abort(not_found);
[#?TRACE{start_at = Start}] -> {ok, filename(Name, Start)}
end
end,
transaction(Tran).
-spec trace_file(File :: file:filename_all()) -> -spec trace_file(File :: file:filename_all()) ->
{ok, Node :: list(), Binary :: binary()} {ok, Node :: list(), Binary :: binary()}
@ -309,23 +272,7 @@ code_change(_, State, _Extra) ->
{ok, State}. {ok, State}.
insert_new_trace(Trace) -> insert_new_trace(Trace) ->
Tran = fun() -> transaction(fun emqx_trace_dl:insert_new_trace/1, [Trace]).
case mnesia:read(?TRACE, Trace#?TRACE.name) of
[] ->
#?TRACE{start_at = StartAt, type = Type, filter = Filter} = Trace,
Match = #?TRACE{_ = '_', start_at = StartAt, type = Type, filter = Filter},
case mnesia:match_object(?TRACE, Match, read) of
[] ->
ok = mnesia:write(?TRACE, Trace, write),
{ok, Trace};
[#?TRACE{name = Name}] ->
mnesia:abort({duplicate_condition, Name})
end;
[#?TRACE{name = Name}] ->
mnesia:abort({already_existed, Name})
end
end,
transaction(Tran).
update_trace(Traces) -> update_trace(Traces) ->
Now = erlang:system_time(second), Now = erlang:system_time(second),
@ -347,9 +294,7 @@ stop_all_trace_handler() ->
get_enabled_trace() -> get_enabled_trace() ->
{atomic, Traces} = {atomic, Traces} =
mria:ro_transaction(?SHARD, fun() -> mria:ro_transaction(?SHARD, fun emqx_trace_dl:get_enabled_trace/0),
mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read)
end),
Traces. Traces.
find_closest_time(Traces, Now) -> find_closest_time(Traces, Now) ->
@ -372,17 +317,7 @@ closest(Time, Now, Closest) -> min(Time - Now, Closest).
disable_finished([]) -> disable_finished([]) ->
ok; ok;
disable_finished(Traces) -> disable_finished(Traces) ->
transaction(fun() -> transaction(fun emqx_trace_dl:delete_finished/1, [Traces]).
lists:map(
fun(#?TRACE{name = Name}) ->
case mnesia:read(?TRACE, Name, write) of
[] -> ok;
[Trace] -> mnesia:write(?TRACE, Trace#?TRACE{enable = false}, write)
end
end,
Traces
)
end).
start_trace(Traces, Started0) -> start_trace(Traces, Started0) ->
Started = lists:map(fun(#{name := Name}) -> Name end, Started0), Started = lists:map(fun(#{name := Name}) -> Name end, Started0),
@ -586,8 +521,8 @@ filename(Name, Start) ->
[Time, _] = string:split(calendar:system_time_to_rfc3339(Start), "T", leading), [Time, _] = string:split(calendar:system_time_to_rfc3339(Start), "T", leading),
lists:flatten(["trace_", binary_to_list(Name), "_", Time, ".log"]). lists:flatten(["trace_", binary_to_list(Name), "_", Time, ".log"]).
transaction(Tran) -> transaction(Fun, Args) ->
case mria:transaction(?COMMON_SHARD, Tran) of case mria:transaction(?COMMON_SHARD, Fun, Args) of
{atomic, Res} -> Res; {atomic, Res} -> Res;
{aborted, Reason} -> {error, Reason} {aborted, Reason} -> {error, Reason}
end. end.

View File

@ -0,0 +1,35 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQX_TRACE_HRL).
-define(EMQX_TRACE_HRL, true).
-define(TRACE, emqx_trace).
-record(?TRACE, {
name :: binary() | undefined | '_',
type :: clientid | topic | ip_address | undefined | '_',
filter ::
emqx_types:topic() | emqx_types:clientid() | emqx_trace:ip_address() | undefined | '_',
enable = true :: boolean() | '_',
start_at :: integer() | undefined | '_',
end_at :: integer() | undefined | '_'
}).
-define(SHARD, ?COMMON_SHARD).
-define(MAX_SIZE, 30).
-define(OWN_KEYS, [level, filters, filter_default, handlers]).
-endif.

View File

@ -0,0 +1,103 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Data layer for emqx_trace
-module(emqx_trace_dl).
%% API:
-export([
update/2,
insert_new_trace/1,
delete/1,
get_trace_filename/1,
delete_finished/1,
get_enabled_trace/0
]).
-include("emqx_trace.hrl").
%%================================================================================
%% API funcions
%%================================================================================
%% Introduced in 5.0
-spec update(Name :: binary(), Enable :: boolean()) ->
ok.
update(Name, Enable) ->
case mnesia:read(?TRACE, Name) of
[] ->
mnesia:abort(not_found);
[#?TRACE{enable = Enable}] ->
ok;
[Rec] ->
case erlang:system_time(second) >= Rec#?TRACE.end_at of
false -> mnesia:write(?TRACE, Rec#?TRACE{enable = Enable}, write);
true -> mnesia:abort(finished)
end
end.
%% Introduced in 5.0
insert_new_trace(Trace) ->
case mnesia:read(?TRACE, Trace#?TRACE.name) of
[] ->
#?TRACE{start_at = StartAt, type = Type, filter = Filter} = Trace,
Match = #?TRACE{_ = '_', start_at = StartAt, type = Type, filter = Filter},
case mnesia:match_object(?TRACE, Match, read) of
[] ->
ok = mnesia:write(?TRACE, Trace, write),
{ok, Trace};
[#?TRACE{name = Name}] ->
mnesia:abort({duplicate_condition, Name})
end;
[#?TRACE{name = Name}] ->
mnesia:abort({already_existed, Name})
end.
%% Introduced in 5.0
-spec delete(Name :: binary()) -> ok.
delete(Name) ->
case mnesia:read(?TRACE, Name) of
[_] -> mnesia:delete(?TRACE, Name, write);
[] -> mnesia:abort(not_found)
end.
%% Introduced in 5.0
-spec get_trace_filename(Name :: binary()) -> {ok, string()}.
get_trace_filename(Name) ->
case mnesia:read(?TRACE, Name, read) of
[] -> mnesia:abort(not_found);
[#?TRACE{start_at = Start}] -> {ok, emqx_trace:filename(Name, Start)}
end.
%% Introduced in 5.0
delete_finished(Traces) ->
lists:map(
fun(#?TRACE{name = Name}) ->
case mnesia:read(?TRACE, Name, write) of
[] -> ok;
[Trace] -> mnesia:write(?TRACE, Trace#?TRACE{enable = false}, write)
end
end,
Traces
).
%% Introduced in 5.0
get_enabled_trace() ->
mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read).
%%================================================================================
%% Internal functions
%%================================================================================