diff --git a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl index 252da8c0f..ad7112707 100644 --- a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl @@ -53,6 +53,7 @@ -ifdef(TEST). -export([ log_file/2 , create_table/0 + , find_closest_time/2 ]). -endif. @@ -288,15 +289,15 @@ get_enable_trace() -> find_closest_time(Traces, Now) -> Sec = lists:foldl( - fun(#?TRACE{start_at = Start, end_at = End}, Closest) - when Start >= Now andalso Now < End -> %% running - min(End - Now, Closest); - (#?TRACE{start_at = Start}, Closest) when Start < Now -> %% waiting - min(Now - Start, Closest); - (_, Closest) -> Closest %% finished + fun(#?TRACE{start_at = Start, end_at = End, enable = true}, Closest) -> + min(closest(End, Now, Closest), closest(Start, Now, Closest)); + (_, Closest) -> Closest end, 60 * 15, Traces), timer:seconds(Sec). +closest(Time, Now, Closest) when Now >= Time -> Closest; +closest(Time, Now, Closest) -> min(Time - Now, Closest). + disable_finished([]) -> ok; disable_finished(Traces) -> transaction(fun() -> diff --git a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl index d2bca542b..0e298698e 100644 --- a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl +++ b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl @@ -135,7 +135,7 @@ stream_log_file(#{name := Name}, Params) -> {ok, Node} -> Position = binary_to_integer(Position0), Bytes = binary_to_integer(Bytes0), - case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of + case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of {ok, Bin} -> Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes}, {ok, #{meta => Meta, items => Bin}}; @@ -143,7 +143,7 @@ stream_log_file(#{name := Name}, Params) -> Meta = #{<<"position">> => Size, <<"bytes">> => Bytes}, {ok, #{meta => Meta, items => <<"">>}}; {error, Reason} -> - logger:log(error, "read_file_failed by ~p", [{Node, Name, Reason, Position, Bytes}]), + logger:log(error, "read_file_failed ~p", [{Node, Name, Reason, Position, Bytes}]), {error, Reason}; {badrpc, nodedown} -> {error, "BadRpc node down"} @@ -174,21 +174,24 @@ read_trace_file(Name, Position, Limit) -> end. read_file(Path, Offset, Bytes) -> - {ok, IoDevice} = file:open(Path, [read, raw, binary]), - try - _ = case Offset of - 0 -> ok; - _ -> file:position(IoDevice, {bof, Offset}) - end, - case file:read(IoDevice, Bytes) of - {ok, Bin} -> {ok, Bin}; - {error, Reason} -> {error, Reason}; - eof -> - {ok, #file_info{size = Size}} = file:read_file_info(IoDevice), - {eof, Size} - end - after - file:close(IoDevice) + case file:open(Path, [read, raw, binary]) of + {ok, IoDevice} -> + try + _ = case Offset of + 0 -> ok; + _ -> file:position(IoDevice, {bof, Offset}) + end, + case file:read(IoDevice, Bytes) of + {ok, Bin} -> {ok, Bin}; + {error, Reason} -> {error, Reason}; + eof -> + {ok, #file_info{size = Size}} = file:read_file_info(IoDevice), + {eof, Size} + end + after + file:close(IoDevice) + end; + {error, Reason} -> {error, Reason} end. to_node(Node) -> @@ -204,7 +207,6 @@ collect_file_size(Nodes, FileName, AllFiles) -> end, #{}, Nodes). status(false, _Start, _End, _Now) -> <<"stopped">>; -%% asynchronously create trace, we should wait 1 seconds -status(true, Start, _End, Now) when Now < Start + 2 -> <<"waiting">>; +status(true, Start, _End, Now) when Now < Start -> <<"waiting">>; status(true, _Start, End, Now) when Now >= End -> <<"stopped">>; status(true, _Start, _End, _Now) -> <<"running">>. diff --git a/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl index 4f33e5b7f..e20fb6957 100644 --- a/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl @@ -196,9 +196,9 @@ t_update_enable(_Config) -> t_load_state(_Config) -> Now = erlang:system_time(second), - Running = [{<<"name">>, <<"Running">>}, {<<"type">>, <<"topic">>}, - {<<"topic">>, <<"/x/y/1">>}, {<<"start_at">>, to_rfc3339(Now - 1)}, - {<<"end_at">>, to_rfc3339(Now + 2)}], + Running = #{name => <<"Running">>, type => <<"topic">>, + topic => <<"/x/y/1">>, start_at => to_rfc3339(Now - 1), + end_at => to_rfc3339(Now + 2)}, Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, to_rfc3339(Now + 3)}, {<<"end_at">>, to_rfc3339(Now + 8)}], @@ -300,6 +300,30 @@ t_download_log(_Config) -> ok = emqtt:disconnect(Client), ok. +t_find_closed_time(_Config) -> + DefaultMs = 60 * 15000, + Now = erlang:system_time(second), + Traces2 = [], + ?assertEqual(DefaultMs, emqx_trace:find_closest_time(Traces2, Now)), + Traces3 = [#emqx_trace{name = <<"disable">>, start_at = Now + 1, + end_at = Now + 2, enable = false}], + ?assertEqual(DefaultMs, emqx_trace:find_closest_time(Traces3, Now)), + Traces4 = [#emqx_trace{name = <<"running">>, start_at = Now, end_at = Now + 10, enable = true}], + ?assertEqual(10000, emqx_trace:find_closest_time(Traces4, Now)), + Traces5 = [#emqx_trace{name = <<"waiting">>, start_at = Now + 2, + end_at = Now + 10, enable = true}], + ?assertEqual(2000, emqx_trace:find_closest_time(Traces5, Now)), + Traces = [ + #emqx_trace{name = <<"waiting">>, start_at = Now + 1, end_at = Now + 2, enable = true}, + #emqx_trace{name = <<"running0">>, start_at = Now, end_at = Now + 5, enable = true}, + #emqx_trace{name = <<"running1">>, start_at = Now - 1, end_at = Now + 1, enable = true}, + #emqx_trace{name = <<"finished">>, start_at = Now - 2, end_at = Now - 1, enable = true}, + #emqx_trace{name = <<"waiting">>, start_at = Now + 1, end_at = Now + 1, enable = true}, + #emqx_trace{name = <<"stopped">>, start_at = Now, end_at = Now + 10, enable = false} + ], + ?assertEqual(1000, emqx_trace:find_closest_time(Traces, Now)), + ok. + to_rfc3339(Second) -> list_to_binary(calendar:system_time_to_rfc3339(Second)).