Merge pull request #6391 from zhongwencool/trace-bug-fix

fix: trace handler start time not correct
This commit is contained in:
zhongwencool 2021-12-07 22:58:55 +08:00 committed by GitHub
commit 9965288947
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 28 deletions

View File

@ -53,6 +53,7 @@
-ifdef(TEST). -ifdef(TEST).
-export([ log_file/2 -export([ log_file/2
, create_table/0 , create_table/0
, find_closest_time/2
]). ]).
-endif. -endif.
@ -288,15 +289,15 @@ get_enable_trace() ->
find_closest_time(Traces, Now) -> find_closest_time(Traces, Now) ->
Sec = Sec =
lists:foldl( lists:foldl(
fun(#?TRACE{start_at = Start, end_at = End}, Closest) fun(#?TRACE{start_at = Start, end_at = End, enable = true}, Closest) ->
when Start >= Now andalso Now < End -> %% running min(closest(End, Now, Closest), closest(Start, Now, Closest));
min(End - Now, Closest); (_, Closest) -> Closest
(#?TRACE{start_at = Start}, Closest) when Start < Now -> %% waiting
min(Now - Start, Closest);
(_, Closest) -> Closest %% finished
end, 60 * 15, Traces), end, 60 * 15, Traces),
timer:seconds(Sec). timer:seconds(Sec).
closest(Time, Now, Closest) when Now >= Time -> Closest;
closest(Time, Now, Closest) -> min(Time - Now, Closest).
disable_finished([]) -> ok; disable_finished([]) -> ok;
disable_finished(Traces) -> disable_finished(Traces) ->
transaction(fun() -> transaction(fun() ->

View File

@ -135,7 +135,7 @@ stream_log_file(#{name := Name}, Params) ->
{ok, Node} -> {ok, Node} ->
Position = binary_to_integer(Position0), Position = binary_to_integer(Position0),
Bytes = binary_to_integer(Bytes0), Bytes = binary_to_integer(Bytes0),
case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of
{ok, Bin} -> {ok, Bin} ->
Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes}, Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
{ok, #{meta => Meta, items => Bin}}; {ok, #{meta => Meta, items => Bin}};
@ -143,7 +143,7 @@ stream_log_file(#{name := Name}, Params) ->
Meta = #{<<"position">> => Size, <<"bytes">> => Bytes}, Meta = #{<<"position">> => Size, <<"bytes">> => Bytes},
{ok, #{meta => Meta, items => <<"">>}}; {ok, #{meta => Meta, items => <<"">>}};
{error, Reason} -> {error, Reason} ->
logger:log(error, "read_file_failed by ~p", [{Node, Name, Reason, Position, Bytes}]), logger:log(error, "read_file_failed ~p", [{Node, Name, Reason, Position, Bytes}]),
{error, Reason}; {error, Reason};
{badrpc, nodedown} -> {badrpc, nodedown} ->
{error, "BadRpc node down"} {error, "BadRpc node down"}
@ -174,21 +174,24 @@ read_trace_file(Name, Position, Limit) ->
end. end.
read_file(Path, Offset, Bytes) -> read_file(Path, Offset, Bytes) ->
{ok, IoDevice} = file:open(Path, [read, raw, binary]), case file:open(Path, [read, raw, binary]) of
try {ok, IoDevice} ->
_ = case Offset of try
0 -> ok; _ = case Offset of
_ -> file:position(IoDevice, {bof, Offset}) 0 -> ok;
end, _ -> file:position(IoDevice, {bof, Offset})
case file:read(IoDevice, Bytes) of end,
{ok, Bin} -> {ok, Bin}; case file:read(IoDevice, Bytes) of
{error, Reason} -> {error, Reason}; {ok, Bin} -> {ok, Bin};
eof -> {error, Reason} -> {error, Reason};
{ok, #file_info{size = Size}} = file:read_file_info(IoDevice), eof ->
{eof, Size} {ok, #file_info{size = Size}} = file:read_file_info(IoDevice),
end {eof, Size}
after end
file:close(IoDevice) after
file:close(IoDevice)
end;
{error, Reason} -> {error, Reason}
end. end.
to_node(Node) -> to_node(Node) ->
@ -204,7 +207,6 @@ collect_file_size(Nodes, FileName, AllFiles) ->
end, #{}, Nodes). end, #{}, Nodes).
status(false, _Start, _End, _Now) -> <<"stopped">>; status(false, _Start, _End, _Now) -> <<"stopped">>;
%% asynchronously create trace, we should wait 1 seconds status(true, Start, _End, Now) when Now < Start -> <<"waiting">>;
status(true, Start, _End, Now) when Now < Start + 2 -> <<"waiting">>;
status(true, _Start, End, Now) when Now >= End -> <<"stopped">>; status(true, _Start, End, Now) when Now >= End -> <<"stopped">>;
status(true, _Start, _End, _Now) -> <<"running">>. status(true, _Start, _End, _Now) -> <<"running">>.

View File

@ -196,9 +196,9 @@ t_update_enable(_Config) ->
t_load_state(_Config) -> t_load_state(_Config) ->
Now = erlang:system_time(second), Now = erlang:system_time(second),
Running = [{<<"name">>, <<"Running">>}, {<<"type">>, <<"topic">>}, Running = #{name => <<"Running">>, type => <<"topic">>,
{<<"topic">>, <<"/x/y/1">>}, {<<"start_at">>, to_rfc3339(Now - 1)}, topic => <<"/x/y/1">>, start_at => to_rfc3339(Now - 1),
{<<"end_at">>, to_rfc3339(Now + 2)}], end_at => to_rfc3339(Now + 2)},
Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, <<"topic">>}, Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, <<"topic">>},
{<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, to_rfc3339(Now + 3)}, {<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, to_rfc3339(Now + 3)},
{<<"end_at">>, to_rfc3339(Now + 8)}], {<<"end_at">>, to_rfc3339(Now + 8)}],
@ -300,6 +300,30 @@ t_download_log(_Config) ->
ok = emqtt:disconnect(Client), ok = emqtt:disconnect(Client),
ok. ok.
t_find_closed_time(_Config) ->
DefaultMs = 60 * 15000,
Now = erlang:system_time(second),
Traces2 = [],
?assertEqual(DefaultMs, emqx_trace:find_closest_time(Traces2, Now)),
Traces3 = [#emqx_trace{name = <<"disable">>, start_at = Now + 1,
end_at = Now + 2, enable = false}],
?assertEqual(DefaultMs, emqx_trace:find_closest_time(Traces3, Now)),
Traces4 = [#emqx_trace{name = <<"running">>, start_at = Now, end_at = Now + 10, enable = true}],
?assertEqual(10000, emqx_trace:find_closest_time(Traces4, Now)),
Traces5 = [#emqx_trace{name = <<"waiting">>, start_at = Now + 2,
end_at = Now + 10, enable = true}],
?assertEqual(2000, emqx_trace:find_closest_time(Traces5, Now)),
Traces = [
#emqx_trace{name = <<"waiting">>, start_at = Now + 1, end_at = Now + 2, enable = true},
#emqx_trace{name = <<"running0">>, start_at = Now, end_at = Now + 5, enable = true},
#emqx_trace{name = <<"running1">>, start_at = Now - 1, end_at = Now + 1, enable = true},
#emqx_trace{name = <<"finished">>, start_at = Now - 2, end_at = Now - 1, enable = true},
#emqx_trace{name = <<"waiting">>, start_at = Now + 1, end_at = Now + 1, enable = true},
#emqx_trace{name = <<"stopped">>, start_at = Now, end_at = Now + 10, enable = false}
],
?assertEqual(1000, emqx_trace:find_closest_time(Traces, Now)),
ok.
to_rfc3339(Second) -> to_rfc3339(Second) ->
list_to_binary(calendar:system_time_to_rfc3339(Second)). list_to_binary(calendar:system_time_to_rfc3339(Second)).