fix: trace handler start time not correct
This commit is contained in:
parent
26fa06b071
commit
be6160f5bd
|
@ -53,6 +53,7 @@
|
|||
-ifdef(TEST).
|
||||
-export([ log_file/2
|
||||
, create_table/0
|
||||
, find_closest_time/2
|
||||
]).
|
||||
-endif.
|
||||
|
||||
|
@ -288,15 +289,15 @@ get_enable_trace() ->
|
|||
find_closest_time(Traces, Now) ->
|
||||
Sec =
|
||||
lists:foldl(
|
||||
fun(#?TRACE{start_at = Start, end_at = End}, Closest)
|
||||
when Start >= Now andalso Now < End -> %% running
|
||||
min(End - Now, Closest);
|
||||
(#?TRACE{start_at = Start}, Closest) when Start < Now -> %% waiting
|
||||
min(Now - Start, Closest);
|
||||
(_, Closest) -> Closest %% finished
|
||||
fun(#?TRACE{start_at = Start, end_at = End, enable = true}, Closest) ->
|
||||
min(closest(End, Now, Closest), closest(Start, Now, Closest));
|
||||
(_, Closest) -> Closest
|
||||
end, 60 * 15, Traces),
|
||||
timer:seconds(Sec).
|
||||
|
||||
closest(Time, Now, Closest) when Now >= Time -> Closest;
|
||||
closest(Time, Now, Closest) -> min(Time - Now, Closest).
|
||||
|
||||
disable_finished([]) -> ok;
|
||||
disable_finished(Traces) ->
|
||||
transaction(fun() ->
|
||||
|
|
|
@ -135,7 +135,7 @@ stream_log_file(#{name := Name}, Params) ->
|
|||
{ok, Node} ->
|
||||
Position = binary_to_integer(Position0),
|
||||
Bytes = binary_to_integer(Bytes0),
|
||||
case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of
|
||||
case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of
|
||||
{ok, Bin} ->
|
||||
Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
|
||||
{ok, #{meta => Meta, items => Bin}};
|
||||
|
@ -143,7 +143,7 @@ stream_log_file(#{name := Name}, Params) ->
|
|||
Meta = #{<<"position">> => Size, <<"bytes">> => Bytes},
|
||||
{ok, #{meta => Meta, items => <<"">>}};
|
||||
{error, Reason} ->
|
||||
logger:log(error, "read_file_failed by ~p", [{Node, Name, Reason, Position, Bytes}]),
|
||||
logger:log(error, "read_file_failed ~p", [{Node, Name, Reason, Position, Bytes}]),
|
||||
{error, Reason};
|
||||
{badrpc, nodedown} ->
|
||||
{error, "BadRpc node down"}
|
||||
|
@ -174,21 +174,24 @@ read_trace_file(Name, Position, Limit) ->
|
|||
end.
|
||||
|
||||
read_file(Path, Offset, Bytes) ->
|
||||
{ok, IoDevice} = file:open(Path, [read, raw, binary]),
|
||||
try
|
||||
_ = case Offset of
|
||||
0 -> ok;
|
||||
_ -> file:position(IoDevice, {bof, Offset})
|
||||
end,
|
||||
case file:read(IoDevice, Bytes) of
|
||||
{ok, Bin} -> {ok, Bin};
|
||||
{error, Reason} -> {error, Reason};
|
||||
eof ->
|
||||
{ok, #file_info{size = Size}} = file:read_file_info(IoDevice),
|
||||
{eof, Size}
|
||||
end
|
||||
after
|
||||
file:close(IoDevice)
|
||||
case file:open(Path, [read, raw, binary]) of
|
||||
{ok, IoDevice} ->
|
||||
try
|
||||
_ = case Offset of
|
||||
0 -> ok;
|
||||
_ -> file:position(IoDevice, {bof, Offset})
|
||||
end,
|
||||
case file:read(IoDevice, Bytes) of
|
||||
{ok, Bin} -> {ok, Bin};
|
||||
{error, Reason} -> {error, Reason};
|
||||
eof ->
|
||||
{ok, #file_info{size = Size}} = file:read_file_info(IoDevice),
|
||||
{eof, Size}
|
||||
end
|
||||
after
|
||||
file:close(IoDevice)
|
||||
end;
|
||||
{error, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
to_node(Node) ->
|
||||
|
@ -204,7 +207,6 @@ collect_file_size(Nodes, FileName, AllFiles) ->
|
|||
end, #{}, Nodes).
|
||||
|
||||
status(false, _Start, _End, _Now) -> <<"stopped">>;
|
||||
%% asynchronously create trace, we should wait 1 seconds
|
||||
status(true, Start, _End, Now) when Now < Start + 2 -> <<"waiting">>;
|
||||
status(true, Start, _End, Now) when Now < Start -> <<"waiting">>;
|
||||
status(true, _Start, End, Now) when Now >= End -> <<"stopped">>;
|
||||
status(true, _Start, _End, _Now) -> <<"running">>.
|
||||
|
|
|
@ -196,9 +196,9 @@ t_update_enable(_Config) ->
|
|||
|
||||
t_load_state(_Config) ->
|
||||
Now = erlang:system_time(second),
|
||||
Running = [{<<"name">>, <<"Running">>}, {<<"type">>, <<"topic">>},
|
||||
{<<"topic">>, <<"/x/y/1">>}, {<<"start_at">>, to_rfc3339(Now - 1)},
|
||||
{<<"end_at">>, to_rfc3339(Now + 2)}],
|
||||
Running = #{name => <<"Running">>, type => <<"topic">>,
|
||||
topic => <<"/x/y/1">>, start_at => to_rfc3339(Now - 1),
|
||||
end_at => to_rfc3339(Now + 2)},
|
||||
Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, <<"topic">>},
|
||||
{<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, to_rfc3339(Now + 3)},
|
||||
{<<"end_at">>, to_rfc3339(Now + 8)}],
|
||||
|
@ -300,6 +300,30 @@ t_download_log(_Config) ->
|
|||
ok = emqtt:disconnect(Client),
|
||||
ok.
|
||||
|
||||
t_find_closed_time(_Config) ->
|
||||
DefaultMs = 60 * 15000,
|
||||
Now = erlang:system_time(second),
|
||||
Traces2 = [],
|
||||
?assertEqual(DefaultMs, emqx_trace:find_closest_time(Traces2, Now)),
|
||||
Traces3 = [#emqx_trace{name = <<"disable">>, start_at = Now + 1,
|
||||
end_at = Now + 2, enable = false}],
|
||||
?assertEqual(DefaultMs, emqx_trace:find_closest_time(Traces3, Now)),
|
||||
Traces4 = [#emqx_trace{name = <<"running">>, start_at = Now, end_at = Now + 10, enable = true}],
|
||||
?assertEqual(10000, emqx_trace:find_closest_time(Traces4, Now)),
|
||||
Traces5 = [#emqx_trace{name = <<"waiting">>, start_at = Now + 2,
|
||||
end_at = Now + 10, enable = true}],
|
||||
?assertEqual(2000, emqx_trace:find_closest_time(Traces5, Now)),
|
||||
Traces = [
|
||||
#emqx_trace{name = <<"waiting">>, start_at = Now + 1, end_at = Now + 2, enable = true},
|
||||
#emqx_trace{name = <<"running0">>, start_at = Now, end_at = Now + 5, enable = true},
|
||||
#emqx_trace{name = <<"running1">>, start_at = Now - 1, end_at = Now + 1, enable = true},
|
||||
#emqx_trace{name = <<"finished">>, start_at = Now - 2, end_at = Now - 1, enable = true},
|
||||
#emqx_trace{name = <<"waiting">>, start_at = Now + 1, end_at = Now + 1, enable = true},
|
||||
#emqx_trace{name = <<"stopped">>, start_at = Now, end_at = Now + 10, enable = false}
|
||||
],
|
||||
?assertEqual(1000, emqx_trace:find_closest_time(Traces, Now)),
|
||||
ok.
|
||||
|
||||
to_rfc3339(Second) ->
|
||||
list_to_binary(calendar:system_time_to_rfc3339(Second)).
|
||||
|
||||
|
|
Loading…
Reference in New Issue