feat(schema): schema support epoch_second, epoch_millisecond type.

This commit is contained in:
zhongwencool 2022-02-14 10:00:36 +08:00
parent 9d5a7ead0c
commit 498434826a
14 changed files with 151 additions and 175 deletions

View File

@ -0,0 +1,58 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_datetime).
-include_lib("typerefl/include/types.hrl").
%% API
-export([ to_epoch_millisecond/1
, to_epoch_second/1
]).
-export([ epoch_to_rfc3339/1
, epoch_to_rfc3339/2
]).
-reflect_type([ epoch_millisecond/0
, epoch_second/0
]).
-type epoch_second() :: integer().
-type epoch_millisecond() :: integer().
-typerefl_from_string({epoch_second/0, ?MODULE, to_epoch_second}).
-typerefl_from_string({epoch_millisecond/0, ?MODULE, to_epoch_millisecond}).
to_epoch_second(DateTime) ->
to_epoch(DateTime, second).
to_epoch_millisecond(DateTime) ->
to_epoch(DateTime, millisecond).
to_epoch(DateTime, Unit) ->
try
case string:to_integer(DateTime) of
{Epoch, []} when Epoch >= 0 -> {ok, Epoch};
{_Epoch, []} -> {error, bad_epoch};
_ -> {ok, calendar:rfc3339_to_system_time(DateTime, [{unit, Unit}])}
end
catch error: _ ->
{error, bad_rfc3339_timestamp}
end.
epoch_to_rfc3339(TimeStamp) ->
epoch_to_rfc3339(TimeStamp, millisecond).
epoch_to_rfc3339(TimeStamp, Unit) when is_integer(TimeStamp) ->
list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, Unit}])).

View File

@ -37,7 +37,6 @@
-type bar_separated_list() :: list(). -type bar_separated_list() :: list().
-type ip_port() :: tuple(). -type ip_port() :: tuple().
-type cipher() :: map(). -type cipher() :: map().
-type rfc3339_system_time() :: integer().
-type qos() :: integer(). -type qos() :: integer().
-typerefl_from_string({qos/0, emqx_schema, to_qos}). -typerefl_from_string({qos/0, emqx_schema, to_qos}).
@ -52,7 +51,6 @@
-typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}). -typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}).
-typerefl_from_string({cipher/0, emqx_schema, to_erl_cipher_suite}). -typerefl_from_string({cipher/0, emqx_schema, to_erl_cipher_suite}).
-typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}). -typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}).
-typerefl_from_string({rfc3339_system_time/0, emqx_schema, rfc3339_to_system_time}).
-export([ validate_heap_size/1 -export([ validate_heap_size/1
, parse_user_lookup_fun/1 , parse_user_lookup_fun/1
@ -65,8 +63,8 @@
to_percent/1, to_comma_separated_list/1, to_percent/1, to_comma_separated_list/1,
to_bar_separated_list/1, to_ip_port/1, to_bar_separated_list/1, to_ip_port/1,
to_erl_cipher_suite/1, to_qos/1, to_erl_cipher_suite/1, to_qos/1,
to_comma_separated_atoms/1, to_comma_separated_atoms/1
rfc3339_to_system_time/1]). ]).
-behaviour(hocon_schema). -behaviour(hocon_schema).
@ -74,8 +72,8 @@
bytesize/0, wordsize/0, percent/0, file/0, bytesize/0, wordsize/0, percent/0, file/0,
comma_separated_list/0, bar_separated_list/0, ip_port/0, comma_separated_list/0, bar_separated_list/0, ip_port/0,
cipher/0, qos/0, cipher/0, qos/0,
comma_separated_atoms/0, comma_separated_atoms/0
rfc3339_system_time/0]). ]).
-export([namespace/0, roots/0, roots/1, fields/1]). -export([namespace/0, roots/0, roots/1, fields/1]).
-export([conf_get/2, conf_get/3, keys/2, filter/1]). -export([conf_get/2, conf_get/3, keys/2, filter/1]).
@ -1521,13 +1519,6 @@ to_comma_separated_list(Str) ->
to_comma_separated_atoms(Str) -> to_comma_separated_atoms(Str) ->
{ok, lists:map(fun to_atom/1, string:tokens(Str, ", "))}. {ok, lists:map(fun to_atom/1, string:tokens(Str, ", "))}.
rfc3339_to_system_time(DateTime) ->
try
{ok, calendar:rfc3339_to_system_time(DateTime, [{unit, second}])}
catch error: _ ->
{error, bad_rfc3339_timestamp}
end.
to_bar_separated_list(Str) -> to_bar_separated_list(Str) ->
{ok, string:tokens(Str, "| ")}. {ok, string:tokens(Str, "| ")}.

View File

@ -463,19 +463,15 @@ to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) ->
end; end;
to_trace(#{type := Type}, _Rec) -> {error, io_lib:format("required ~s field", [Type])}; to_trace(#{type := Type}, _Rec) -> {error, io_lib:format("required ~s field", [Type])};
to_trace(#{start_at := StartAt} = Trace, Rec) -> to_trace(#{start_at := StartAt} = Trace, Rec) ->
case to_system_second(StartAt) of {ok, Sec} = to_system_second(StartAt),
{ok, Sec} -> to_trace(maps:remove(start_at, Trace), Rec#?TRACE{start_at = Sec}); to_trace(maps:remove(start_at, Trace), Rec#?TRACE{start_at = Sec});
{error, Reason} -> {error, Reason}
end;
to_trace(#{end_at := EndAt} = Trace, Rec) -> to_trace(#{end_at := EndAt} = Trace, Rec) ->
Now = erlang:system_time(second), Now = erlang:system_time(second),
case to_system_second(EndAt) of case to_system_second(EndAt) of
{ok, Sec} when Sec > Now -> {ok, Sec} when Sec > Now ->
to_trace(maps:remove(end_at, Trace), Rec#?TRACE{end_at = Sec}); to_trace(maps:remove(end_at, Trace), Rec#?TRACE{end_at = Sec});
{ok, _Sec} -> {ok, _Sec} ->
{error, "end_at time has already passed"}; {error, "end_at time has already passed"}
{error, Reason} ->
{error, Reason}
end; end;
to_trace(_, Rec) -> {ok, Rec}. to_trace(_, Rec) -> {ok, Rec}.
@ -492,14 +488,9 @@ validate_ip_address(IP) ->
{error, Reason} -> {error, lists:flatten(io_lib:format("ip address: ~p", [Reason]))} {error, Reason} -> {error, lists:flatten(io_lib:format("ip address: ~p", [Reason]))}
end. end.
to_system_second(At) -> to_system_second(Sec) ->
try Now = erlang:system_time(second),
Sec = calendar:rfc3339_to_system_time(binary_to_list(At), [{unit, second}]), {ok, erlang:max(Now, Sec)}.
Now = erlang:system_time(second),
{ok, erlang:max(Now, Sec)}
catch error: {badmatch, _} ->
{error, ["The rfc3339 specification not satisfied: ", At]}
end.
zip_dir() -> zip_dir() ->
trace_dir() ++ "zip/". trace_dir() ++ "zip/".

View File

@ -50,8 +50,8 @@ end_per_testcase(_) ->
t_base_create_delete(_Config) -> t_base_create_delete(_Config) ->
Now = erlang:system_time(second), Now = erlang:system_time(second),
Start = to_rfc3339(Now), Start = Now,
End = to_rfc3339(Now + 30 * 60), End = Now + 30 * 60,
Name = <<"name1">>, Name = <<"name1">>,
ClientId = <<"test-device">>, ClientId = <<"test-device">>,
Trace = #{ Trace = #{
@ -115,25 +115,13 @@ t_create_failed(_Config) ->
{error, Reason2} = emqx_trace:create(InvalidTopic), {error, Reason2} = emqx_trace:create(InvalidTopic),
?assertEqual(<<"topic: #/#// invalid by function_clause">>, iolist_to_binary(Reason2)), ?assertEqual(<<"topic: #/#// invalid by function_clause">>, iolist_to_binary(Reason2)),
InvalidStart = [Name, {<<"type">>, topic}, {<<"topic">>, <<"/sys/">>}, {error, Reason4} = emqx_trace:create([Name, {<<"type">>, clientid}]),
{<<"start_at">>, <<"2021-12-3:12">>}], ?assertEqual(<<"required clientid field">>, iolist_to_binary(Reason4)),
{error, Reason3} = emqx_trace:create(InvalidStart),
?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>,
iolist_to_binary(Reason3)),
InvalidEnd = [Name, {<<"type">>, topic}, {<<"topic">>, <<"/sys/">>},
{<<"end_at">>, <<"2021-12-3:12">>}],
{error, Reason4} = emqx_trace:create(InvalidEnd),
?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>,
iolist_to_binary(Reason4)),
{error, Reason7} = emqx_trace:create([Name, {<<"type">>, clientid}]),
?assertEqual(<<"required clientid field">>, iolist_to_binary(Reason7)),
InvalidPackets4 = [{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>}, InvalidPackets4 = [{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>},
{<<"type">>, clientid}], {<<"type">>, clientid}],
{error, Reason9} = emqx_trace:create(InvalidPackets4), {error, Reason5} = emqx_trace:create(InvalidPackets4),
?assertEqual(<<"Name should be ^[A-Za-z]+[A-Za-z0-9-_]*$">>, iolist_to_binary(Reason9)), ?assertEqual(<<"Name should be ^[A-Za-z]+[A-Za-z0-9-_]*$">>, iolist_to_binary(Reason5)),
?assertEqual({error, "type=[topic,clientid,ip_address] required"}, ?assertEqual({error, "type=[topic,clientid,ip_address] required"},
emqx_trace:create([{<<"name">>, <<"test-name">>}, {<<"clientid">>, <<"good">>}])), emqx_trace:create([{<<"name">>, <<"test-name">>}, {<<"clientid">>, <<"good">>}])),
@ -149,21 +137,21 @@ t_create_default(_Config) ->
{<<"type">>, clientid}, {<<"clientid">>, <<"good">>}]), {<<"type">>, clientid}, {<<"clientid">>, <<"good">>}]),
[#emqx_trace{name = <<"test-name">>}] = emqx_trace:list(), [#emqx_trace{name = <<"test-name">>}] = emqx_trace:list(),
ok = emqx_trace:clear(), ok = emqx_trace:clear(),
Now = erlang:system_time(second),
Trace = [ Trace = [
{<<"name">>, <<"test-name">>}, {<<"name">>, <<"test-name">>},
{<<"type">>, topic}, {<<"type">>, topic},
{<<"topic">>, <<"/x/y/z">>}, {<<"topic">>, <<"/x/y/z">>},
{<<"start_at">>, <<"2021-10-28T10:54:47+08:00">>}, {<<"start_at">>, Now},
{<<"end_at">>, <<"2021-10-27T10:54:47+08:00">>} {<<"end_at">>, Now - 1}
], ],
{error, "end_at time has already passed"} = emqx_trace:create(Trace), {error, "end_at time has already passed"} = emqx_trace:create(Trace),
Now = erlang:system_time(second),
Trace2 = [ Trace2 = [
{<<"name">>, <<"test-name">>}, {<<"name">>, <<"test-name">>},
{<<"type">>, topic}, {<<"type">>, topic},
{<<"topic">>, <<"/x/y/z">>}, {<<"topic">>, <<"/x/y/z">>},
{<<"start_at">>, to_rfc3339(Now + 10)}, {<<"start_at">>, Now + 10},
{<<"end_at">>, to_rfc3339(Now + 3)} {<<"end_at">>, Now + 3}
], ],
{error, "failed by start_at >= end_at"} = emqx_trace:create(Trace2), {error, "failed by start_at >= end_at"} = emqx_trace:create(Trace2),
{ok, _} = emqx_trace:create([{<<"name">>, <<"test-name">>}, {ok, _} = emqx_trace:create([{<<"name">>, <<"test-name">>},
@ -190,9 +178,8 @@ t_create_with_extra_fields(_Config) ->
t_update_enable(_Config) -> t_update_enable(_Config) ->
Name = <<"test-name">>, Name = <<"test-name">>,
Now = erlang:system_time(second), Now = erlang:system_time(second),
End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)),
{ok, _} = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, topic}, {ok, _} = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, topic},
{<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]), {<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, Now + 2}]),
[#emqx_trace{enable = Enable}] = emqx_trace:list(), [#emqx_trace{enable = Enable}] = emqx_trace:list(),
?assertEqual(Enable, true), ?assertEqual(Enable, true),
ok = emqx_trace:update(Name, false), ok = emqx_trace:update(Name, false),
@ -211,14 +198,14 @@ t_update_enable(_Config) ->
t_load_state(_Config) -> t_load_state(_Config) ->
Now = erlang:system_time(second), Now = erlang:system_time(second),
Running = #{name => <<"Running">>, type => topic, Running = #{name => <<"Running">>, type => topic,
topic => <<"/x/y/1">>, start_at => to_rfc3339(Now - 1), topic => <<"/x/y/1">>, start_at => Now - 1,
end_at => to_rfc3339(Now + 2)}, end_at => Now + 2},
Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, topic}, Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, topic},
{<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, to_rfc3339(Now + 3)}, {<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, Now + 3},
{<<"end_at">>, to_rfc3339(Now + 8)}], {<<"end_at">>, Now + 8}],
Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, topic}, Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, topic},
{<<"topic">>, <<"/x/y/3">>}, {<<"start_at">>, to_rfc3339(Now - 5)}, {<<"topic">>, <<"/x/y/3">>}, {<<"start_at">>, Now - 5},
{<<"end_at">>, to_rfc3339(Now)}], {<<"end_at">>, Now}],
{ok, _} = emqx_trace:create(Running), {ok, _} = emqx_trace:create(Running),
{ok, _} = emqx_trace:create(Waiting), {ok, _} = emqx_trace:create(Waiting),
{error, "end_at time has already passed"} = emqx_trace:create(Finished), {error, "end_at time has already passed"} = emqx_trace:create(Finished),
@ -239,10 +226,9 @@ t_client_event(_Config) ->
application:set_env(emqx, allow_anonymous, true), application:set_env(emqx, allow_anonymous, true),
ClientId = <<"client-test">>, ClientId = <<"client-test">>,
Now = erlang:system_time(second), Now = erlang:system_time(second),
Start = to_rfc3339(Now),
Name = <<"test_client_id_event">>, Name = <<"test_client_id_event">>,
{ok, _} = emqx_trace:create([{<<"name">>, Name}, {ok, _} = emqx_trace:create([{<<"name">>, Name},
{<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), {<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Now}]),
ok = emqx_trace_handler_SUITE:filesync(Name, clientid), ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),
@ -251,7 +237,7 @@ t_client_event(_Config) ->
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]), ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]),
ok = emqx_trace_handler_SUITE:filesync(Name, clientid), ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
{ok, _} = emqx_trace:create([{<<"name">>, <<"test_topic">>}, {ok, _} = emqx_trace:create([{<<"name">>, <<"test_topic">>},
{<<"type">>, topic}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]), {<<"type">>, topic}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Now}]),
ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic), ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic),
{ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)), {ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)),
ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]), ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]),
@ -269,15 +255,13 @@ t_client_event(_Config) ->
t_get_log_filename(_Config) -> t_get_log_filename(_Config) ->
Now = erlang:system_time(second), Now = erlang:system_time(second),
Start = calendar:system_time_to_rfc3339(Now),
End = calendar:system_time_to_rfc3339(Now + 2),
Name = <<"name1">>, Name = <<"name1">>,
Trace = [ Trace = [
{<<"name">>, Name}, {<<"name">>, Name},
{<<"type">>, ip_address}, {<<"type">>, ip_address},
{<<"ip_address">>, <<"127.0.0.1">>}, {<<"ip_address">>, <<"127.0.0.1">>},
{<<"start_at">>, list_to_binary(Start)}, {<<"start_at">>, Now},
{<<"end_at">>, list_to_binary(End)} {<<"end_at">>, Now +2}
], ],
{ok, _} = emqx_trace:create(Trace), {ok, _} = emqx_trace:create(Trace),
?assertEqual({error, not_found}, emqx_trace:get_trace_filename(<<"test">>)), ?assertEqual({error, not_found}, emqx_trace:get_trace_filename(<<"test">>)),
@ -322,9 +306,6 @@ t_find_closed_time(_Config) ->
?assertEqual(1000, emqx_trace:find_closest_time(Traces, Now)), ?assertEqual(1000, emqx_trace:find_closest_time(Traces, Now)),
ok. ok.
to_rfc3339(Second) ->
list_to_binary(calendar:system_time_to_rfc3339(Second)).
reload() -> reload() ->
catch ok = gen_server:stop(emqx_trace), catch ok = gen_server:stop(emqx_trace),
{ok, _Pid} = emqx_trace:start_link(). {ok, _Pid} = emqx_trace:start_link().

View File

@ -434,8 +434,16 @@ typename_to_spec("non_neg_integer()", _Mod) -> #{type => integer, minimum => 1,
typename_to_spec("number()", _Mod) -> #{type => number, example => 42}; typename_to_spec("number()", _Mod) -> #{type => number, example => 42};
typename_to_spec("string()", _Mod) -> #{type => string, example => <<"string-example">>}; typename_to_spec("string()", _Mod) -> #{type => string, example => <<"string-example">>};
typename_to_spec("atom()", _Mod) -> #{type => string, example => atom}; typename_to_spec("atom()", _Mod) -> #{type => string, example => atom};
typename_to_spec("rfc3339_system_time()", _Mod) -> #{type => string, typename_to_spec("epoch_second()", _Mod) ->
example => <<"2021-12-05T02:01:34.186Z">>, format => <<"date-time">>}; #{<<"oneOf">> => [
#{type => integer, example => 1640995200, desc => <<"epoch-second">>},
#{type => string, example => <<"2022-01-01T00:00:00.000Z">>, format => <<"date-time">>}]
};
typename_to_spec("epoch_millisecond()", _Mod) ->
#{<<"oneOf">> => [
#{type => integer, example => 1640995200000, desc => <<"epoch-millisecond">>},
#{type => string, example => <<"2022-01-01T00:00:00.000Z">>, format => <<"date-time">>}]
};
typename_to_spec("unicode_binary()", _Mod) -> #{type => string, example => <<"unicode-binary">>}; typename_to_spec("unicode_binary()", _Mod) -> #{type => string, example => <<"unicode-binary">>};
typename_to_spec("duration()", _Mod) -> #{type => string, example => <<"12m">>}; typename_to_spec("duration()", _Mod) -> #{type => string, example => <<"12m">>};
typename_to_spec("duration_s()", _Mod) -> #{type => string, example => <<"1h">>}; typename_to_spec("duration_s()", _Mod) -> #{type => string, example => <<"1h">>};

View File

@ -57,7 +57,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
api_spec() -> api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).
paths() -> paths() ->
[ "/gateway/:name/clients" [ "/gateway/:name/clients"
@ -90,9 +90,8 @@ paths() ->
-define(QUERY_FUN, {?MODULE, query}). -define(QUERY_FUN, {?MODULE, query}).
clients(get, #{ bindings := #{name := Name0} clients(get, #{ bindings := #{name := Name0}
, query_string := Params0 , query_string := Params
}) -> }) ->
Params = emqx_mgmt_api:ensure_timestamp_format(Params0, time_keys()),
with_gateway(Name0, fun(GwName, _) -> with_gateway(Name0, fun(GwName, _) ->
TabName = emqx_gateway_cm:tabname(info, GwName), TabName = emqx_gateway_cm:tabname(info, GwName),
case maps:get(<<"node">>, Params, undefined) of case maps:get(<<"node">>, Params, undefined) of
@ -210,16 +209,6 @@ extra_sub_props(Props) ->
#{subid => maps:get(<<"subid">>, Props, undefined)} #{subid => maps:get(<<"subid">>, Props, undefined)}
). ).
%%--------------------------------------------------------------------
%% QueryString data-fomrat convert
%% (try rfc3339 to timestamp or keep timestamp)
time_keys() ->
[ <<"gte_created_at">>
, <<"lte_created_at">>
, <<"gte_connected_at">>
, <<"lte_connected_at">>].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% query funcs %% query funcs
@ -508,19 +497,19 @@ params_client_searching_in_qs() ->
mk(binary(), mk(binary(),
M#{desc => <<"Use sub-string to match client's username">>})} M#{desc => <<"Use sub-string to match client's username">>})}
, {gte_created_at, , {gte_created_at,
mk(binary(), mk(emqx_datetime:epoch_millisecond(),
M#{desc => <<"Match the session created datetime greater than " M#{desc => <<"Match the session created datetime greater than "
"a certain value">>})} "a certain value">>})}
, {lte_created_at, , {lte_created_at,
mk(binary(), mk(emqx_datetime:epoch_millisecond(),
M#{desc => <<"Match the session created datetime less than " M#{desc => <<"Match the session created datetime less than "
"a certain value">>})} "a certain value">>})}
, {gte_connected_at, , {gte_connected_at,
mk(binary(), mk(emqx_datetime:epoch_millisecond(),
M#{desc => <<"Match the client socket connected datetime greater " M#{desc => <<"Match the client socket connected datetime greater "
"than a certain value">>})} "than a certain value">>})}
, {lte_connected_at, , {lte_connected_at,
mk(binary(), mk(emqx_datetime:epoch_millisecond(),
M#{desc => <<"Match the client socket connected datatime less than " M#{desc => <<"Match the client socket connected datatime less than "
"a certain value">>})} "a certain value">>})}
, {endpoint_name, , {endpoint_name,
@ -686,10 +675,10 @@ common_client_props() ->
#{ desc => <<"Indicates whether the client is connected via " #{ desc => <<"Indicates whether the client is connected via "
"bridge">>})} "bridge">>})}
, {connected_at, , {connected_at,
mk(binary(), mk(emqx_datetime:epoch_millisecond(),
#{ desc => <<"Client connection time">>})} #{ desc => <<"Client connection time">>})}
, {disconnected_at, , {disconnected_at,
mk(binary(), mk(emqx_datetime:epoch_millisecond(),
#{ desc => <<"Client offline time, This field is only valid and " #{ desc => <<"Client offline time, This field is only valid and "
"returned when connected is false">>})} "returned when connected is false">>})}
, {connected, , {connected,
@ -714,7 +703,7 @@ common_client_props() ->
#{ desc => <<"Session expiration interval, with the unit of " #{ desc => <<"Session expiration interval, with the unit of "
"second">>})} "second">>})}
, {created_at, , {created_at,
mk(binary(), mk(emqx_datetime:epoch_millisecond(),
#{ desc => <<"Session creation time">>})} #{ desc => <<"Session creation time">>})}
, {subscriptions_cnt, , {subscriptions_cnt,
mk(integer(), mk(integer(),

View File

@ -35,15 +35,6 @@
-export([do_query/6]). -export([do_query/6]).
-export([ ensure_timestamp_format/2
]).
-export([ unix_ts_to_rfc3339_bin/1
, unix_ts_to_rfc3339_bin/2
, time_string_to_unix_ts_int/1
, time_string_to_unix_ts_int/2
]).
paginate(Tables, Params, {Module, FormatFun}) -> paginate(Tables, Params, {Module, FormatFun}) ->
Qh = query_handle(Tables), Qh = query_handle(Tables),
Count = count(Tables), Count = count(Tables),
@ -261,7 +252,7 @@ select_table_with_count(_Tab, Ms, Continuation, _Limit, FmtFun) ->
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal funcs %% Internal Functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
params2qs(Params, QsSchema) when is_map(Params) -> params2qs(Params, QsSchema) when is_map(Params) ->
@ -418,41 +409,6 @@ to_ip_port(IPAddress) ->
Port = list_to_integer(Port0), Port = list_to_integer(Port0),
{IP, Port}. {IP, Port}.
%%--------------------------------------------------------------------
%% time format funcs
ensure_timestamp_format(Qs, TimeKeys)
when is_map(Qs);
is_list(TimeKeys) ->
Fun = fun (Key, NQs) ->
case NQs of
%% TimeString likes "2021-01-01T00:00:00.000+08:00" (in rfc3339)
%% or "1609430400000" (in millisecond)
#{Key := TimeString} ->
NQs#{Key => time_string_to_unix_ts_int(TimeString)};
#{} -> NQs
end
end,
lists:foldl(Fun, Qs, TimeKeys).
unix_ts_to_rfc3339_bin(TimeStamp) ->
unix_ts_to_rfc3339_bin(TimeStamp, millisecond).
unix_ts_to_rfc3339_bin(TimeStamp, Unit) when is_integer(TimeStamp) ->
list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, Unit}])).
time_string_to_unix_ts_int(DateTime) ->
time_string_to_unix_ts_int(DateTime, millisecond).
time_string_to_unix_ts_int(DateTime, Unit) when is_binary(DateTime) ->
try binary_to_integer(DateTime) of
TimeStamp when is_integer(TimeStamp) -> TimeStamp
catch
error:badarg ->
calendar:rfc3339_to_system_time(
binary_to_list(DateTime), [{unit, Unit}])
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% EUnits %% EUnits
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -91,13 +91,13 @@ fields(app) ->
"""They are useful for accessing public data anonymously,""" """They are useful for accessing public data anonymously,"""
"""and are used to associate API requests.""", """and are used to associate API requests.""",
example => <<"MzAyMjk3ODMwMDk0NjIzOTUxNjcwNzQ0NzQ3MTE2NDYyMDI">>})}, example => <<"MzAyMjk3ODMwMDk0NjIzOTUxNjcwNzQ0NzQ3MTE2NDYyMDI">>})},
{expired_at, hoconsc:mk(hoconsc:union([undefined, emqx_schema:rfc3339_system_time()]), {expired_at, hoconsc:mk(hoconsc:union([undefined, emqx_datetime:epoch_second()]),
#{desc => "No longer valid datetime", #{desc => "No longer valid datetime",
example => <<"2021-12-05T02:01:34.186Z">>, example => <<"2021-12-05T02:01:34.186Z">>,
nullable => true, nullable => true,
default => undefined default => undefined
})}, })},
{created_at, hoconsc:mk(emqx_schema:rfc3339_system_time(), {created_at, hoconsc:mk(emqx_datetime:epoch_second(),
#{desc => "ApiKey create datetime", #{desc => "ApiKey create datetime",
example => <<"2021-12-01T00:00:00.000Z">> example => <<"2021-12-01T00:00:00.000Z">>
})}, })},

View File

@ -113,11 +113,11 @@ fields(ban) ->
desc => <<"Banned reason">>, desc => <<"Banned reason">>,
nullable => true, nullable => true,
example => <<"Too many requests">>})}, example => <<"Too many requests">>})},
{at, hoconsc:mk(emqx_schema:rfc3339_system_time(), #{ {at, hoconsc:mk(emqx_datetime:epoch_second(), #{
desc => <<"Create banned time, rfc3339, now if not specified">>, desc => <<"Create banned time, rfc3339, now if not specified">>,
nullable => true, nullable => true,
example => <<"2021-10-25T21:48:47+08:00">>})}, example => <<"2021-10-25T21:48:47+08:00">>})},
{until, hoconsc:mk(emqx_schema:rfc3339_system_time(), #{ {until, hoconsc:mk(emqx_datetime:epoch_second(), #{
desc => <<"Cancel banned time, rfc3339, now + 5 minute if not specified">>, desc => <<"Cancel banned time, rfc3339, now + 5 minute if not specified">>,
nullable => true, nullable => true,
example => <<"2021-10-25T21:53:47+08:00">>}) example => <<"2021-10-25T21:53:47+08:00">>})

View File

@ -71,7 +71,7 @@
<<"{\"code\": \"RESOURCE_NOT_FOUND\", \"reason\": \"Client id not found\"}">>). <<"{\"code\": \"RESOURCE_NOT_FOUND\", \"reason\": \"Client id not found\"}">>).
api_spec() -> api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).
paths() -> paths() ->
[ "/clients" [ "/clients"
@ -134,22 +134,22 @@ schema("/clients") ->
in => query, in => query,
nullable => true, nullable => true,
desc => <<"Client user name, fuzzy search by substring">>})}, desc => <<"Client user name, fuzzy search by substring">>})},
{gte_created_at, hoconsc:mk(binary(), #{ {gte_created_at, hoconsc:mk(emqx_datetime:epoch_millisecond(), #{
in => query, in => query,
nullable => true, nullable => true,
desc => <<"Search client session creation time by greater", desc => <<"Search client session creation time by greater",
" than or equal method, rfc3339 or timestamp(millisecond)">>})}, " than or equal method, rfc3339 or timestamp(millisecond)">>})},
{lte_created_at, hoconsc:mk(binary(), #{ {lte_created_at, hoconsc:mk(emqx_datetime:epoch_millisecond(), #{
in => query, in => query,
nullable => true, nullable => true,
desc => <<"Search client session creation time by less", desc => <<"Search client session creation time by less",
" than or equal method, rfc3339 or timestamp(millisecond)">>})}, " than or equal method, rfc3339 or timestamp(millisecond)">>})},
{gte_connected_at, hoconsc:mk(binary(), #{ {gte_connected_at, hoconsc:mk(emqx_datetime:epoch_millisecond(), #{
in => query, in => query,
nullable => true, nullable => true,
desc => <<"Search client connection creation time by greater" desc => <<"Search client connection creation time by greater"
" than or equal method, rfc3339 or timestamp(millisecond)">>})}, " than or equal method, rfc3339 or timestamp(epoch millisecond)">>})},
{lte_connected_at, hoconsc:mk(binary(), #{ {lte_connected_at, hoconsc:mk(emqx_datetime:epoch_millisecond(), #{
in => query, in => query,
nullable => true, nullable => true,
desc => <<"Search client connection creation time by less" desc => <<"Search client connection creation time by less"
@ -281,11 +281,13 @@ fields(client) ->
<<"Indicate whether the client is using a brand new session">>})}, <<"Indicate whether the client is using a brand new session">>})},
{clientid, hoconsc:mk(binary(), #{desc => <<"Client identifier">>})}, {clientid, hoconsc:mk(binary(), #{desc => <<"Client identifier">>})},
{connected, hoconsc:mk(boolean(), #{desc => <<"Whether the client is connected">>})}, {connected, hoconsc:mk(boolean(), #{desc => <<"Whether the client is connected">>})},
{connected_at, hoconsc:mk(binary(), #{desc => <<"Client connection time, rfc3339">>})}, {connected_at, hoconsc:mk(emqx_datetime:epoch_millisecond(),
{created_at, hoconsc:mk(binary(), #{desc => <<"Session creation time, rfc3339">>})}, #{desc => <<"Client connection time, rfc3339 or timestamp(millisecond)">>})},
{disconnected_at, hoconsc:mk(binary(), #{desc => {created_at, hoconsc:mk(emqx_datetime:epoch_millisecond(),
#{desc => <<"Session creation time, rfc3339 or timestamp(millisecond)">>})},
{disconnected_at, hoconsc:mk(emqx_datetime:epoch_millisecond(), #{desc =>
<<"Client offline time." <<"Client offline time."
" It's Only valid and returned when connected is false, rfc3339">>})}, " It's Only valid and returned when connected is false, rfc3339 or timestamp(millisecond)">>})},
{expiry_interval, hoconsc:mk(integer(), #{desc => {expiry_interval, hoconsc:mk(integer(), #{desc =>
<<"Session expiration interval, with the unit of second">>})}, <<"Session expiration interval, with the unit of second">>})},
{heap_size, hoconsc:mk(integer(), #{desc => {heap_size, hoconsc:mk(integer(), #{desc =>
@ -386,7 +388,7 @@ fields(meta) ->
%%%============================================================================================== %%%==============================================================================================
%% parameters trans %% parameters trans
clients(get, #{query_string := Qs}) -> clients(get, #{query_string := Qs}) ->
list_clients(emqx_mgmt_api:ensure_timestamp_format(Qs, time_keys())). list_clients(Qs).
client(get, #{bindings := Bindings}) -> client(get, #{bindings := Bindings}) ->
lookup(Bindings); lookup(Bindings);
@ -562,16 +564,6 @@ do_unsubscribe(ClientID, Topic) ->
Res Res
end. end.
%%--------------------------------------------------------------------
%% QueryString data-fomrat convert
%% (try rfc3339 to timestamp or keep timestamp)
time_keys() ->
[ <<"gte_created_at">>
, <<"lte_created_at">>
, <<"gte_connected_at">>
, <<"lte_connected_at">>].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Query Functions %% Query Functions
@ -711,7 +703,7 @@ result_format_time_fun(Key, NClientInfoMap) ->
case NClientInfoMap of case NClientInfoMap of
#{Key := TimeStamp} -> #{Key := TimeStamp} ->
NClientInfoMap#{ NClientInfoMap#{
Key => emqx_mgmt_api:unix_ts_to_rfc3339_bin(TimeStamp)}; Key => emqx_datetime:epoch_to_rfc3339(TimeStamp)};
#{} -> #{} ->
NClientInfoMap NClientInfoMap
end. end.

View File

@ -169,13 +169,13 @@ fields(trace) ->
nullable => true, nullable => true,
example => running example => running
})}, })},
{start_at, hoconsc:mk(binary(), {start_at, hoconsc:mk(emqx_datetime:epoch_second(),
#{desc => "rfc3339 timestamp", #{desc => "rfc3339 timestamp or epoch second",
nullable => true, nullable => true,
example => <<"2021-11-04T18:17:38+08:00">> example => <<"2021-11-04T18:17:38+08:00">>
})}, })},
{end_at, hoconsc:mk(binary(), {end_at, hoconsc:mk(emqx_datetime:epoch_second(),
#{desc => "rfc3339 timestamp", #{desc => "rfc3339 timestamp or epoch second",
nullable => true, nullable => true,
example => <<"2021-11-05T18:17:38+08:00">> example => <<"2021-11-05T18:17:38+08:00">>
})}, })},

View File

@ -130,7 +130,7 @@ t_query_clients_with_time(_) ->
NowTimeStampInt = erlang:system_time(millisecond), NowTimeStampInt = erlang:system_time(millisecond),
%% Do not uri_encode `=` to `%3D` %% Do not uri_encode `=` to `%3D`
Rfc3339String = emqx_http_lib:uri_encode(binary:bin_to_list( Rfc3339String = emqx_http_lib:uri_encode(binary:bin_to_list(
emqx_mgmt_api:unix_ts_to_rfc3339_bin(NowTimeStampInt))), emqx_datetime:epoch_to_rfc3339(NowTimeStampInt))),
TimeStampString = emqx_http_lib:uri_encode(integer_to_list(NowTimeStampInt)), TimeStampString = emqx_http_lib:uri_encode(integer_to_list(NowTimeStampInt)),
LteKeys = ["lte_created_at=", "lte_connected_at="], LteKeys = ["lte_created_at=", "lte_connected_at="],
@ -148,10 +148,10 @@ t_query_clients_with_time(_) ->
|| {ok, Response} <- RequestResults], || {ok, Response} <- RequestResults],
{LteResponseDecodeds, GteResponseDecodeds} = lists:split(4, DecodedResults), {LteResponseDecodeds, GteResponseDecodeds} = lists:split(4, DecodedResults),
%% EachData :: list() %% EachData :: list()
[?assert( emqx_mgmt_api:time_string_to_unix_ts_int(CreatedAt) < NowTimeStampInt) [?assert(time_string_to_epoch_millisecond(CreatedAt) < NowTimeStampInt)
|| #{<<"data">> := EachData} <- LteResponseDecodeds, || #{<<"data">> := EachData} <- LteResponseDecodeds,
#{<<"created_at">> := CreatedAt} <- EachData], #{<<"created_at">> := CreatedAt} <- EachData],
[?assert(emqx_mgmt_api:time_string_to_unix_ts_int(ConnectedAt) < NowTimeStampInt) [?assert(time_string_to_epoch_millisecond(ConnectedAt) < NowTimeStampInt)
|| #{<<"data">> := EachData} <- LteResponseDecodeds, || #{<<"data">> := EachData} <- LteResponseDecodeds,
#{<<"connected_at">> := ConnectedAt} <- EachData], #{<<"connected_at">> := ConnectedAt} <- EachData],
[?assertEqual(EachData, []) [?assertEqual(EachData, [])
@ -180,3 +180,15 @@ t_keepalive(_Config) ->
?assertEqual(11, Keepalive), ?assertEqual(11, Keepalive),
emqtt:disconnect(C1), emqtt:disconnect(C1),
ok. ok.
time_string_to_epoch_millisecond(DateTime) ->
time_string_to_epoch(DateTime, millisecond).
time_string_to_epoch(DateTime, Unit) when is_binary(DateTime) ->
try binary_to_integer(DateTime) of
TimeStamp when is_integer(TimeStamp) -> TimeStamp
catch
error:badarg ->
calendar:rfc3339_to_system_time(
binary_to_list(DateTime), [{unit, Unit}])
end.

View File

@ -118,10 +118,9 @@ t_http_test(_Config) ->
t_download_log(_Config) -> t_download_log(_Config) ->
ClientId = <<"client-test-download">>, ClientId = <<"client-test-download">>,
Now = erlang:system_time(second), Now = erlang:system_time(second),
Start = to_rfc3339(Now),
Name = <<"test_client_id">>, Name = <<"test_client_id">>,
load(), load(),
create_trace(Name, ClientId, Start), create_trace(Name, ClientId, Now),
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),
[begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)], [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)],
@ -156,8 +155,7 @@ t_stream_log(_Config) ->
ClientId = <<"client-stream">>, ClientId = <<"client-stream">>,
Now = erlang:system_time(second), Now = erlang:system_time(second),
Name = <<"test_stream_log">>, Name = <<"test_stream_log">>,
Start = to_rfc3339(Now - 10), create_trace(Name, ClientId, Now - 10),
create_trace(Name, ClientId, Start),
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),
[begin _ = emqtt:ping(Client) end || _ <- lists:seq(1, 5)], [begin _ = emqtt:ping(Client) end || _ <- lists:seq(1, 5)],

View File

@ -129,12 +129,12 @@ fields(topic_metrics) ->
, example => <<"testtopic/1">> , example => <<"testtopic/1">>
, nullable => false})}, , nullable => false})},
{ create_time { create_time
, mk( emqx_schema:rfc3339_system_time() , mk( emqx_datetime:epoch_second()
, #{ desc => <<"Topic Metrics created date time, in rfc3339">> , #{ desc => <<"Topic Metrics created date time, in rfc3339">>
, nullable => false , nullable => false
, example => <<"2022-01-14T21:48:47+08:00">>})}, , example => <<"2022-01-14T21:48:47+08:00">>})},
{ reset_time { reset_time
, mk( emqx_schema:rfc3339_system_time() , mk( emqx_datetime:epoch_second()
, #{ desc => <<"Topic Metrics reset date time, in rfc3339. Nullable if never reset">> , #{ desc => <<"Topic Metrics reset date time, in rfc3339. Nullable if never reset">>
, nullable => true , nullable => true
, example => <<"2022-01-14T21:48:47+08:00">>})}, , example => <<"2022-01-14T21:48:47+08:00">>})},