emqx/apps/emqx_management/src/emqx_mgmt_api_clients.erl

732 lines
27 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_clients).
-behaviour(minirest_api).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include("emqx_mgmt.hrl").
%% API
-export([api_spec/0]).
-export([ clients/2
, client/2
, subscriptions/2
, authz_cache/2
, subscribe/2
, unsubscribe/2
, subscribe_batch/2
]).
-export([ query/4
, format_channel_info/1
]).
%% for batch operation
-export([do_subscribe/3]).
%% for test suite
-export([ unix_ts_to_rfc3339_bin/1
, unix_ts_to_rfc3339_bin/2
, time_string_to_unix_ts_int/1
, time_string_to_unix_ts_int/2
]).
%% Query-string schema for the client listing: a `{Table, [{ParamName, Type}]}`
%% tuple. list/1 splits it and passes both parts to the emqx_mgmt_api query
%% helpers.
-define(CLIENT_QS_SCHEMA, {emqx_channel_info,
[ {<<"node">>, atom}
, {<<"username">>, binary}
, {<<"zone">>, atom}
, {<<"ip_address">>, ip}
, {<<"conn_state">>, atom}
, {<<"clean_start">>, atom}
, {<<"proto_name">>, binary}
, {<<"proto_ver">>, integer}
, {<<"like_clientid">>, binary}
, {<<"like_username">>, binary}
, {<<"gte_created_at">>, timestamp}
, {<<"lte_created_at">>, timestamp}
, {<<"gte_connected_at">>, timestamp}
, {<<"lte_connected_at">>, timestamp}]}).
%% {Module, Function} pair naming query/4; list/1 passes it to
%% emqx_mgmt_api:cluster_query/4 and emqx_mgmt_api:node_query/5.
-define(query_fun, {?MODULE, query}).
%% {Module, Function} pair naming format_channel_info/1; lookup/1 passes it to
%% emqx_mgmt:lookup_client/2.
-define(format_fun, {?MODULE, format_channel_info}).
%% Body returned with the 404 responses in this module. It is a binary that
%% already contains JSON text, not a map.
-define(CLIENT_ID_NOT_FOUND,
<<"{\"code\": \"RESOURCE_NOT_FOUND\", \"reason\": \"Client id not found\"}">>).
%% Returns the pair {Apis, Schemas} describing every route of this module
%% together with the object schemas those routes refer to.
api_spec() ->
    Apis = apis(),
    Schemas = schemas(),
    {Apis, Schemas}.
%% Collects the route definitions of this module, in publication order.
%% Each builder returns a {Path, Metadata, HandlerFunction} tuple.
apis() ->
    Builders =
        [ fun clients_api/0
        , fun client_api/0
        , fun clients_authz_cache_api/0
        , fun clients_subscriptions_api/0
        , fun subscribe_api/0
        , fun unsubscribe_api/0
        ],
    lists:map(fun(Build) -> Build() end, Builders).
%% Builds the named object schemas (`client` and `authz_cache`). Each schema
%% is a single-key map whose properties come from properties/1, converted by
%% emqx_mgmt_util:properties/1.
schemas() ->
    [#{Name => #{ type => object
                , properties => emqx_mgmt_util:properties(properties(Name))
                }}
     || Name <- [client, authz_cache]].
%% Field catalogue for the published object schemas.
%% Returns a list of {FieldName, JsonType, Description} tuples for the given
%% schema name (`client` or `authz_cache`). The descriptions end up in the
%% generated API documentation, so they are user-facing text.
properties(client) ->
    [ {awaiting_rel_cnt, integer, <<"v4 api name [awaiting_rel] Number of awaiting PUBREC packet">>}
    , {awaiting_rel_max, integer, <<"v4 api name [max_awaiting_rel]. Maximum allowed number of awaiting PUBREC packet">>}
    , {clean_start, boolean, <<"Indicate whether the client is using a brand new session">>}
    , {clientid, string, <<"Client identifier">>}
    , {connected, boolean, <<"Whether the client is connected">>}
    , {connected_at, string, <<"Client connection time, rfc3339">>}
    , {created_at, string, <<"Session creation time, rfc3339">>}
    , {disconnected_at, string, <<"Client offline time, This field is only valid and returned when connected is false, rfc3339">>}
    , {expiry_interval, integer, <<"Session expiration interval, with the unit of second">>}
    , {heap_size, integer, <<"Process heap size with the unit of byte">>}
    , {inflight_cnt, integer, <<"Current length of inflight">>}
    , {inflight_max, integer, <<"v4 api name [max_inflight]. Maximum length of inflight">>}
    , {ip_address, string, <<"Client's IP address">>}
    , {port, integer, <<"Client's port">>}
      %% Fixed typo: the description used to read "connectedvia bridge".
    , {is_bridge, boolean, <<"Indicates whether the client is connected via bridge">>}
    , {keepalive, integer, <<"keepalive time, with the unit of second">>}
    , {mailbox_len, integer, <<"Process mailbox size">>}
    , {mqueue_dropped, integer, <<"Number of messages dropped by the message queue due to exceeding the length">>}
    , {mqueue_len, integer, <<"Current length of message queue">>}
    , {mqueue_max, integer, <<"v4 api name [max_mqueue]. Maximum length of message queue">>}
    , {node, string, <<"Name of the node to which the client is connected">>}
    , {proto_name, string, <<"Client protocol name">>}
    , {proto_ver, integer, <<"Protocol version used by the client">>}
    , {recv_cnt, integer, <<"Number of TCP packets received">>}
    , {recv_msg, integer, <<"Number of PUBLISH packets received">>}
    , {recv_oct, integer, <<"Number of bytes received by EMQ X Broker (the same below)">>}
    , {recv_pkt, integer, <<"Number of MQTT packets received">>}
    , {reductions, integer, <<"Erlang reduction">>}
    , {send_cnt, integer, <<"Number of TCP packets sent">>}
    , {send_msg, integer, <<"Number of PUBLISH packets sent">>}
    , {send_oct, integer, <<"Number of bytes sent">>}
    , {send_pkt, integer, <<"Number of MQTT packets sent">>}
    , {subscriptions_cnt, integer, <<"Number of subscriptions established by this client.">>}
    , {subscriptions_max, integer, <<"v4 api name [max_subscriptions] Maximum number of subscriptions allowed by this client">>}
    , {username, string, <<"User name of client when connecting">>}
    , {will_msg, string, <<"Client will message">>}
    , {zone, string, <<"Indicate the configuration group used by the client">>}
    ];
properties(authz_cache) ->
    [ {access, string, <<"Access type">>}
    , {result, string, <<"Allow or deny">>}
    , {topic, string, <<"Topic name">>}
    , {updated_time, integer, <<"Update time">>}
    ].
%% Route definition for `GET /clients`.
%% Publishes the paging parameters and every optional filter as query
%% parameters; requests are handled by clients/2.
clients_api() ->
    Str = #{type => string},
    Int = #{type => integer},
    %% Every filter is an optional query-string parameter.
    Param = fun(Name, Description, Schema) ->
                #{ name => Name
                 , in => query
                 , required => false
                 , description => Description
                 , schema => Schema
                 }
            end,
    Parameters =
        [ Param(page, <<"Page">>, Int)
        , Param(limit, <<"Page limit">>, Int)
        , Param(node, <<"Node name">>, Str)
        , Param(username, <<"User name">>, Str)
          %% zone is the only parameter published without a description
        , #{name => zone, in => query, required => false, schema => Str}
        , Param(ip_address, <<"Client's IP address">>, Str)
        , Param(conn_state,
                <<"The current connection status of the client, the possible values are connected,idle,disconnected">>,
                #{type => string, enum => [connected, idle, disconnected]})
        , Param(clean_start, <<"Whether the client uses a new session">>, #{type => boolean})
        , Param(proto_name,
                <<"Client protocol name, the possible values are MQTT,CoAP,LwM2M,MQTT-SN">>,
                #{type => string, enum => ['MQTT', 'CoAP', 'LwM2M', 'MQTT-SN']})
        , Param(proto_ver, <<"Client protocol version">>, Str)
        , Param(like_clientid, <<"Fuzzy search of client identifier by substring method">>, Str)
        , Param(like_username, <<"Client user name, fuzzy search by substring">>, Str)
        , Param(gte_created_at,
                <<"Search client session creation time by greater than or equal method, rfc3339 or timestamp(millisecond)">>,
                Str)
        , Param(lte_created_at,
                <<"Search client session creation time by less than or equal method, rfc3339 or timestamp(millisecond)">>,
                Str)
        , Param(gte_connected_at,
                <<"Search client connection creation time by greater than or equal method, rfc3339 or timestamp(millisecond)">>,
                Str)
        , Param(lte_connected_at,
                <<"Search client connection creation time by less than or equal method, rfc3339 or timestamp(millisecond) ">>,
                Str)
        ],
    Responses =
        #{ <<"200">> => emqx_mgmt_util:array_schema(client, <<"List clients 200 OK">>)
         , <<"400">> => emqx_mgmt_util:error_schema(<<"Invalid parameters">>, ['INVALID_PARAMETER'])
         },
    Get = #{ description => <<"List clients">>
           , parameters => Parameters
           , responses => Responses
           },
    {"/clients", #{get => Get}, clients}.
%% Route definition for `/clients/:clientid`.
%% GET looks a client up and DELETE kicks it out; both are handled by client/2.
client_api() ->
    ClientIdParam = #{ name => clientid
                     , in => path
                     , schema => #{type => string}
                     , required => true
                     },
    NotFound = emqx_mgmt_util:error_schema(<<"Client id not found">>),
    Metadata = #{
        get => #{
            description => <<"Get clients info by client ID">>,
            parameters => [ClientIdParam],
            responses => #{
                <<"404">> => NotFound,
                <<"200">> => emqx_mgmt_util:schema(client, <<"List clients 200 OK">>)}},
        delete => #{
            description => <<"Kick out client by client ID">>,
            parameters => [ClientIdParam],
            responses => #{
                <<"404">> => NotFound,
                %% kickout/1 answers `{200}` without a body, so the response is
                %% documented as description-only rather than as a `client`
                %% object labelled "List clients".
                <<"200">> => emqx_mgmt_util:schema(<<"Kick out client 200 OK">>)}}},
    {"/clients/:clientid", Metadata, client}.
%% Route definition for `/clients/:clientid/authz_cache`.
%% GET lists the cached authorization results of a client and DELETE clears
%% them; both are handled by authz_cache/2.
clients_authz_cache_api() ->
    ClientIdParam = #{ name => clientid
                     , in => path
                     , schema => #{type => string}
                     , required => true
                     },
    NotFound = emqx_mgmt_util:error_schema(<<"Client id not found">>),
    Metadata = #{
        get => #{
            description => <<"Get client authz cache">>,
            parameters => [ClientIdParam],
            responses => #{
                <<"404">> => NotFound,
                %% get_authz_cache/1 replies with a list of cache entries, so
                %% the body is documented as an array of `authz_cache` objects
                %% instead of a single object.
                <<"200">> => emqx_mgmt_util:array_schema(authz_cache, <<"Get client authz cache">>)}},
        delete => #{
            description => <<"Clean client authz cache">>,
            parameters => [ClientIdParam],
            responses => #{
                <<"404">> => NotFound,
                <<"200">> => emqx_mgmt_util:schema(<<"Delete clients 200 OK">>)}}},
    {"/clients/:clientid/authz_cache", Metadata, authz_cache}.
%% Route definition for `GET /clients/:clientid/subscriptions`,
%% handled by subscriptions/2.
clients_subscriptions_api() ->
    ClientIdParam = #{ name => clientid
                     , in => path
                     , schema => #{type => string}
                     , required => true
                     },
    Ok = emqx_mgmt_util:array_schema(subscription, <<"Get client subscriptions">>),
    Get = #{ description => <<"Get client subscriptions">>
           , parameters => [ClientIdParam]
           , responses => #{<<"200">> => Ok}
           },
    {"/clients/:clientid/subscriptions", #{get => Get}, subscriptions}.
%% Route definition for `POST /clients/:clientid/unsubscribe`,
%% handled by unsubscribe/2. The request body carries the topic to drop.
unsubscribe_api() ->
    ClientIdParam = #{ name => clientid
                     , in => path
                     , schema => #{type => string}
                     , required => true
                     },
    BodyProperties = #{topic => #{type => string, description => <<"Topic">>}},
    RequestBody = emqx_mgmt_util:schema(#{type => object, properties => BodyProperties}),
    Responses = #{ <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>)
                 , <<"200">> => emqx_mgmt_util:schema(<<"Unsubscribe ok">>)
                 },
    Post = #{ description => <<"Unsubscribe">>
            , parameters => [ClientIdParam]
            , 'requestBody' => RequestBody
            , responses => Responses
            },
    {"/clients/:clientid/unsubscribe", #{post => Post}, unsubscribe}.
%% Route definition for `POST /clients/:clientid/subscribe`,
%% handled by subscribe/2. The request body carries a topic and a QoS level.
subscribe_api() ->
    ClientIdParam = #{ name => clientid
                     , in => path
                     , schema => #{type => string}
                     , required => true
                     },
    TopicProperty = #{type => string, description => <<"Topic">>},
    QosProperty = #{ type => integer
                   , enum => [0, 1, 2]
                   , example => 0
                   , description => <<"QoS">>
                   },
    RequestBody = emqx_mgmt_util:schema(#{ type => object
                                         , properties => #{ topic => TopicProperty
                                                          , qos => QosProperty
                                                          }
                                         }),
    Responses = #{ <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>)
                 , <<"200">> => emqx_mgmt_util:schema(<<"Subscribe ok">>)
                 },
    Post = #{ description => <<"Subscribe">>
            , parameters => [ClientIdParam]
            , 'requestBody' => RequestBody
            , responses => Responses
            },
    {"/clients/:clientid/subscribe", #{post => Post}, subscribe}.
%%%==============================================================================================
%% Request handlers: translate HTTP request parameters into internal calls
%% Handler for `GET /clients`: normalises the time filters of the query
%% string and runs the listing query.
clients(get, #{query_string := QueryString}) ->
    NormalizedQs = generate_qs(QueryString),
    list(NormalizedQs).
%% Handler for `/clients/:clientid`.
%%   get    -> look the client up (lookup/1)
%%   delete -> kick the client out (kickout/1)
%% The path bindings map (containing `clientid`) is forwarded unchanged.
client(get, #{bindings := PathBindings}) ->
    lookup(PathBindings);
client(delete, #{bindings := PathBindings}) ->
    kickout(PathBindings).
%% Handler for `/clients/:clientid/authz_cache`.
%%   get    -> list the cached authorization results (get_authz_cache/1)
%%   delete -> drop the cached authorization results (clean_authz_cache/1)
authz_cache(get, #{bindings := PathBindings}) ->
    get_authz_cache(PathBindings);
authz_cache(delete, #{bindings := PathBindings}) ->
    clean_authz_cache(PathBindings).
%% Handler for `POST /clients/:clientid/subscribe`.
%% Reads the mandatory <<"topic">> and the optional <<"qos">> (default 0)
%% from the request body and delegates to subscribe/1.
subscribe(post, #{bindings := #{clientid := ClientID}, body := Body}) ->
    Request = #{ clientid => ClientID
               , topic => maps:get(<<"topic">>, Body)
               , qos => maps:get(<<"qos">>, Body, 0)
               },
    subscribe(Request).
%% Handler for `POST /clients/:clientid/unsubscribe`.
%% Reads the mandatory <<"topic">> from the request body and delegates to
%% unsubscribe/1.
unsubscribe(post, #{bindings := #{clientid := ClientID}, body := Body}) ->
    unsubscribe(#{clientid => ClientID, topic => maps:get(<<"topic">>, Body)}).
%% TODO: batch
%% Batch variant of subscribe/2: the request body is a list of objects, each
%% with a mandatory <<"topic">> and an optional <<"qos">> (default 0).
%% Delegates to subscribe_batch/1.
subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) ->
    Topics = [#{ topic => maps:get(<<"topic">>, Info)
               , qos => maps:get(<<"qos">>, Info, 0)
               }
              || Info <- TopicInfos],
    subscribe_batch(#{clientid => ClientID, topics => Topics}).
%% Handler for `GET /clients/:clientid/subscriptions`.
%% Replies `{200, Subs}` where each element is a map with the keys
%% `node`, `clientid`, `topic` and `qos`.
subscriptions(get, #{bindings := #{clientid := ClientID}}) ->
    case emqx_mgmt:list_client_subscriptions(ClientID) of
        {Node, Subs0} ->
            Subs = lists:map(
                     fun({Topic, SubOpts}) ->
                             #{ node => Node
                              , clientid => ClientID
                              , topic => Topic
                              , qos => maps:get(qos, SubOpts)
                              }
                     end, Subs0),
            {200, Subs};
        %% NOTE(review): emqx_mgmt:list_client_subscriptions/1 is not visible
        %% from this module; it appears to return [] when no node reports
        %% subscriptions for the client -- confirm. The previous strict
        %% `{Node, Subs0} = ...` match would crash with badmatch in that case
        %% instead of answering with an empty list.
        [] ->
            {200, []}
    end.
%%%==============================================================================================
%% API implementation: perform the requested operation and build the HTTP response
%% Runs the client listing query.
%% Without a <<"node">> parameter the query is sent through
%% emqx_mgmt_api:cluster_query/4; with one it goes through
%% emqx_mgmt_api:node_query/5 for that node only. The query result is turned
%% into an HTTP response by emqx_mgmt_util:generate_response/1.
%% A <<"node">> value that does not name an existing atom yields a 400.
list(Params) ->
    {Tab, QuerySchema} = ?CLIENT_QS_SCHEMA,
    case maps:get(<<"node">>, Params, undefined) of
        undefined ->
            Response = emqx_mgmt_api:cluster_query(Params, Tab,
                                                   QuerySchema, ?query_fun),
            emqx_mgmt_util:generate_response(Response);
        NodeBin ->
            %% The node name comes straight from the query string. Atoms are
            %% never garbage collected, so creating one per request would let
            %% a caller exhaust the atom table; only accept names that
            %% already exist as atoms. The `of` section is not covered by the
            %% `catch`, so errors raised by the query itself still propagate.
            try binary_to_existing_atom(NodeBin, utf8) of
                Node ->
                    ParamsWithoutNode = maps:without([<<"node">>], Params),
                    Response = emqx_mgmt_api:node_query(Node, ParamsWithoutNode,
                                                        Tab, QuerySchema, ?query_fun),
                    emqx_mgmt_util:generate_response(Response)
            catch
                error:badarg ->
                    {400, #{code => <<"INVALID_PARAMETER">>, message => <<"Unknown node">>}}
            end
    end.
%% Looks a client up by id.
%% Replies 404 when emqx_mgmt:lookup_client/2 returns an empty list,
%% otherwise 200 with the first entry of the result.
lookup(#{clientid := ClientID}) ->
    Found = emqx_mgmt:lookup_client({clientid, ClientID}, ?format_fun),
    case Found of
        [] -> {404, ?CLIENT_ID_NOT_FOUND};
        _ -> {200, hd(Found)}
    end.
%% Kicks a client out by id.
%% Replies 404 when the client is unknown and `{200}` otherwise. The route
%% definition advertises the 404, but the result of the kick-out used to be
%% discarded so it could never be produced.
kickout(#{clientid := ClientID}) ->
    case emqx_mgmt:kickout_client(ClientID) of
        %% NOTE(review): assumes kickout_client/1 reports a missing client as
        %% {error, not_found}, like the other emqx_mgmt calls in this module --
        %% confirm.
        {error, not_found} ->
            {404, ?CLIENT_ID_NOT_FOUND};
        _ ->
            {200}
    end.
%% Lists the cached authorization results of a client.
%%   {error, not_found} -> 404
%%   {error, Reason}    -> 500 with the printed reason as message
%%   anything else      -> 200 with every entry run through format_authz_cache/1
get_authz_cache(#{clientid := ClientID}) ->
    case emqx_mgmt:list_authz_cache(ClientID) of
        {error, not_found} ->
            {404, ?CLIENT_ID_NOT_FOUND};
        {error, Reason} ->
            Printed = io_lib:format("~p", [Reason]),
            {500, #{ code => <<"UNKNOW_ERROR">>
                   , message => iolist_to_binary(Printed)
                   }};
        Entries ->
            {200, [format_authz_cache(Entry) || Entry <- Entries]}
    end.
%% Drops the cached authorization results of a client.
%%   ok                 -> 200
%%   {error, not_found} -> 404
%%   {error, Reason}    -> 500 with the printed reason as message
clean_authz_cache(#{clientid := ClientID}) ->
    Outcome = emqx_mgmt:clean_authz_cache(ClientID),
    case Outcome of
        ok ->
            {200};
        {error, not_found} ->
            {404, ?CLIENT_ID_NOT_FOUND};
        {error, Reason} ->
            Printed = io_lib:format("~p", [Reason]),
            {500, #{ code => <<"UNKNOW_ERROR">>
                   , message => iolist_to_binary(Printed)
                   }}
    end.
%% Subscribes a client to one topic and maps the outcome of do_subscribe/3
%% to an HTTP reply.
%%   ok                         -> 200
%%   {error, channel_not_found} -> 404
%%   {error, Reason}            -> 500 with the printed reason as message
subscribe(#{clientid := ClientID, topic := Topic, qos := Qos}) ->
    Outcome = do_subscribe(ClientID, Topic, Qos),
    case Outcome of
        ok ->
            {200};
        {error, channel_not_found} ->
            {404, ?CLIENT_ID_NOT_FOUND};
        {error, Reason} ->
            Printed = io_lib:format("~p", [Reason]),
            {500, #{ code => <<"UNKNOW_ERROR">>
                   , message => iolist_to_binary(Printed)
                   }}
    end.
%% Unsubscribes a client from one topic and maps the outcome of
%% do_unsubscribe/2 to an HTTP reply.
%%   {unsubscribe, [{Topic, Map}]} -> 200
%%   {error, channel_not_found}    -> 404
%%   {error, Reason}               -> 500 with the printed reason as message
unsubscribe(#{clientid := ClientID, topic := Topic}) ->
    Outcome = do_unsubscribe(ClientID, Topic),
    case Outcome of
        %% Topic is already bound, so this clause only matches when the
        %% reported topic equals the requested one; `#{}` matches any map.
        {unsubscribe, [{Topic, #{}}]} ->
            {200};
        {error, channel_not_found} ->
            {404, ?CLIENT_ID_NOT_FOUND};
        {error, Reason} ->
            Printed = io_lib:format("~p", [Reason]),
            {500, #{ code => <<"UNKNOW_ERROR">>
                   , message => iolist_to_binary(Printed)
                   }}
    end.
%% Subscribes a client to several topics.
%% Builds one [ClientID, Topic, Qos] argument list per topic and hands them to
%% emqx_mgmt_util:batch_operation/3, which is given do_subscribe as the
%% function to apply.
subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
    CallArgs = [[ClientID, Topic, Qos]
                || #{topic := Topic, qos := Qos} <- Topics],
    emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, CallArgs).
%%--------------------------------------------------------------------
%% internal function
%% Subscribes ClientID to a single topic with the given QoS.
%% The raw topic is split by emqx_topic:parse/1 into the topic and its
%% options; the QoS is added to those options before subscribing.
%% Returns `ok` when the parsed topic is among the reported subscriptions,
%% `{error, unknow_error}` when it is not, and any `{error, Reason}` from
%% emqx_mgmt:subscribe/2 unchanged.
do_subscribe(ClientID, Topic0, Qos) ->
    {Topic, Opts} = emqx_topic:parse(Topic0),
    SubOpts = Opts#{qos => Qos},
    case emqx_mgmt:subscribe(ClientID, [{Topic, SubOpts}]) of
        {error, _Reason} = Error ->
            Error;
        {subscribe, Subscriptions} ->
            case proplists:is_defined(Topic, Subscriptions) of
                false -> {error, unknow_error};
                true -> ok
            end
    end.
%% Unsubscribes ClientID from Topic.
%% The result of emqx_mgmt:unsubscribe/2 is returned unchanged, whether it is
%% an `{error, Reason}` tuple or anything else.
do_unsubscribe(ClientID, Topic) ->
    emqx_mgmt:unsubscribe(ClientID, Topic).
%%--------------------------------------------------------------------
%% QueryString Generation (try rfc3339 to timestamp or keep timestamp)
%% Names of the query-string parameters that carry a point in time:
%% a `gte_` and an `lte_` bound for each of `created_at` and `connected_at`.
time_keys() ->
    [<<Bound/binary, "_", Field/binary>>
     || Field <- [<<"created_at">>, <<"connected_at">>],
        Bound <- [<<"gte">>, <<"lte">>]].
%% Normalises the time filters of a query-string map.
%% Every key listed by time_keys/0 that is present in Qs has its value
%% replaced by time_string_to_unix_ts_int/1 of that value; all other entries
%% are left alone.
generate_qs(Qs) ->
    ConvertIfPresent =
        fun(TimeKey, Acc) ->
                case Acc of
                    %% The value is either an rfc3339 string such as
                    %% "2021-01-01T00:00:00.000+08:00" or a millisecond
                    %% timestamp such as "1609430400000".
                    #{TimeKey := Raw} ->
                        Acc#{TimeKey := time_string_to_unix_ts_int(Raw)};
                    #{} ->
                        Acc
                end
        end,
    lists:foldl(ConvertIfPresent, Qs, time_keys()).
%%--------------------------------------------------------------------
%% Query Functions
%% Query callback referenced by the ?query_fun macro.
%% Converts the exact-match and range conditions in Qs to a match
%% specification. When fuzzy conditions are present, the match specification
%% is paired with a post-filter built by fuzzy_filter_fun/1. Rows are
%% formatted by format_channel_info/1.
query(Tab, {Qs, Fuzzy}, Continuation, Limit) ->
    Ms = qs2ms(Qs),
    Selector = case Fuzzy of
                   [] -> Ms;
                   _ -> {Ms, fuzzy_filter_fun(Fuzzy)}
               end,
    emqx_mgmt_api:select_table_with_count(Tab, Selector, Continuation, Limit,
                                          fun format_channel_info/1).
%%--------------------------------------------------------------------
%% QueryString to Match Spec
%% Converts a list of query conditions into a match specification for rows
%% of the form {Key, InfoMap, Stats}. The whole row (`'$_'`) is returned for
%% every match.
-spec qs2ms(list()) -> ets:match_spec().
qs2ms(Qs) ->
    %% '$1' is taken by the first tuple element, so generated placeholders
    %% start at '$2'.
    {Head, Guards} = qs2ms(Qs, 2, {#{}, []}),
    [{{'$1', Head, '_'}, Guards, ['$_']}].

%% Worker: walks the conditions, accumulating the match head and the guards.
%% Equality conditions are written into the match head directly; every other
%% condition gets a fresh '$N' placeholder in the head plus guard(s) on it.
qs2ms([], _N, {Head, Guards}) ->
    {Head, lists:reverse(Guards)};
qs2ms([{Key, '=:=', Value} | Rest], N, {Head, Guards}) ->
    Head1 = emqx_mgmt_util:merge_maps(Head, ms(Key, Value)),
    qs2ms(Rest, N, {Head1, Guards});
qs2ms([Cond | Rest], N, {Head, Guards}) ->
    Holder = list_to_atom([$$ | integer_to_list(N)]),
    Head1 = emqx_mgmt_util:merge_maps(Head, ms(element(1, Cond), Holder)),
    Guards1 = put_conds(Cond, Holder, Guards),
    qs2ms(Rest, N + 1, {Head1, Guards1}).
%% Prepends the guard(s) for one condition to Conds.
%% A 3-tuple {Key, Op, Value} contributes one guard on Holder. A 5-tuple
%% {Key, Op1, V1, Op2, V2} contributes two, with the Op2 guard placed in
%% front of the Op1 guard.
put_conds({_Key, Op, Value}, Holder, Conds) ->
    Guard = {Op, Holder, Value},
    [Guard | Conds];
put_conds({_Key, Op1, V1, Op2, V2}, Holder, Conds) ->
    First = {Op1, Holder, V1},
    Second = {Op2, Holder, V2},
    [Second, First | Conds].
%% Builds the nested map fragment that places X at the position of the given
%% field inside a channel info map.
%%   clientid, username, zone                          -> under `clientinfo`
%%   clean_start, proto_name, proto_ver, connected_at  -> under `conninfo`
%%   ip_address  -> `conninfo.peername` as {X, '_'} (the port is a wildcard)
%%   conn_state  -> top level
%%   created_at  -> under `session`
ms(conn_state, X) ->
    #{conn_state => X};
ms(ip_address, X) ->
    #{conninfo => #{peername => {X, '_'}}};
ms(created_at, X) ->
    #{session => #{created_at => X}};
ms(Field, X) when Field =:= clientid;
                  Field =:= username;
                  Field =:= zone ->
    #{clientinfo => #{Field => X}};
ms(Field, X) when Field =:= clean_start;
                  Field =:= proto_name;
                  Field =:= proto_ver;
                  Field =:= connected_at ->
    #{conninfo => #{Field => X}}.
%%--------------------------------------------------------------------
%% Match funcs
%% Builds the post-filter for fuzzy (`like`) conditions.
%% Each {Key, like, SubStr} condition is compiled once into a regular
%% expression (after escape/1). The returned fun takes a list of rows and
%% keeps those accepted by run_fuzzy_filter/2.
fuzzy_filter_fun(Fuzzy) ->
    CompileOne = fun({Key, like, SubStr}) ->
                         {ok, Pattern} = re:compile(escape(SubStr)),
                         {Key, like, Pattern}
                 end,
    Patterns = lists:map(CompileOne, Fuzzy),
    fun(Rows) when is_list(Rows) ->
            [Row || Row <- Rows, run_fuzzy_filter(Row, Patterns)]
    end.
%% Escapes a binary so that, used as a regular expression, it matches itself
%% literally.
%%
%% The `like_*` filters are documented as substring searches, but previously
%% only backslashes were escaped. Any other metacharacter in the search text
%% was therefore interpreted as regex syntax, and input such as `(` made
%% re:compile/1 return an error, which crashed the caller's `{ok, RE}` match.
%% Every regex metacharacter is now prefixed with a backslash.
escape(B) when is_binary(B) ->
    %% Pattern: a character class holding \ ^ $ . | ? * + ( ) [ ] { }.
    %% Replacement: a literal backslash (`\\`) followed by the matched
    %% character (`&`).
    re:replace(B, <<"[\\\\^$.|?*+()\\[\\]{}]">>, <<"\\\\&">>,
               [{return, binary}, global]).
%% Checks one row against every compiled fuzzy condition.
%% Returns `true` only when each condition's regular expression matches the
%% corresponding `clientinfo` field of the row. A field that is missing or
%% `undefined` is treated as the empty string.
run_fuzzy_filter(_Row, []) ->
    true;
run_fuzzy_filter({_, #{clientinfo := ClientInfo}, _} = Row, [{Key, _, RE} | Rest]) ->
    Subject = case maps:get(Key, ClientInfo, "") of
                  undefined -> "";
                  Present -> Present
              end,
    case re:run(Subject, RE, [{capture, none}]) of
        match -> run_fuzzy_filter(Row, Rest);
        _ -> false
    end.
%%--------------------------------------------------------------------
%% format funcs
%% Formats one row {Key, ClientInfo, ClientStats} into the flat map published
%% by the API.
%%   1. ClientStats (a key/value list) becomes a map, minus three
%%      process-level counters.
%%   2. ClientInfo is flattened one level by take_maps_from_inner/3.
%%   3. The two are merged; flattened info wins on key clashes.
%%   4. `node`, `ip_address`, `port` and `connected` are added.
%%   5. Internal keys are removed and the time fields are rendered as rfc3339.
format_channel_info({_, ClientInfo, ClientStats}) ->
    Stats = maps:without([memory, next_pkt_id, total_heap_size],
                         maps:from_list(ClientStats)),
    FlatInfo = maps:fold(fun take_maps_from_inner/3, #{}, ClientInfo),
    {IpAddress, Port} = peername_dispart(maps:get(peername, FlatInfo)),
    ConnState = maps:get(conn_state, FlatInfo),
    Merged = maps:merge(Stats, FlatInfo),
    Enriched = Merged#{ node => node()
                      , ip_address => IpAddress
                      , port => Port
                      , connected => ConnState =:= connected
                      },
    %% Keys that are not part of the published client object.
    Hidden =
        [ auth_result, peername, sockname, peerhost, conn_state, send_pend
        , conn_props, peercert, sockstate, subscriptions, receive_maximum
        , protocol, is_superuser, sockport, anonymous, mountpoint, socktype
        , active_n, await_rel_timeout, conn_mod, retry_interval, upgrade_qos
        ],
    Published = maps:without(Hidden, Enriched),
    %% Render the timestamps that are present as rfc3339 binaries.
    lists:foldl(fun result_format_time_fun/2, Published,
                [created_at, connected_at, disconnected_at]).
%% format func helpers
%% Fold callback that flattens a map by one level.
%% A map value has its entries merged into the accumulator (its own key is
%% dropped); any other value is stored under its key.
take_maps_from_inner(Key, Value, Current) ->
    case is_map(Value) of
        true -> maps:merge(Current, Value);
        false -> Current#{Key => Value}
    end.
%% Fold callback: when Key is present in the map, its timestamp is replaced
%% by the rfc3339 binary produced by unix_ts_to_rfc3339_bin/1; otherwise the
%% map is returned unchanged.
result_format_time_fun(Key, NClientInfoMap) ->
    case NClientInfoMap of
        #{Key := TimeStamp} ->
            Formatted = unix_ts_to_rfc3339_bin(TimeStamp),
            NClientInfoMap#{Key := Formatted};
        #{} ->
            NClientInfoMap
    end.
%% Splits a peer name into a printable address and the port.
%% The address is rendered by inet:ntoa/1 and returned as a binary; the port
%% is passed through unchanged.
-spec(peername_dispart(emqx_types:peername()) -> {binary(), inet:port_number()}).
peername_dispart({Addr, Port}) ->
    Printable = inet:ntoa(Addr),
    {iolist_to_binary(Printable), Port}.
%% Formats one cache entry {{PubSub, Topic}, {AuthzResult, Timestamp}} as the
%% map published by the API, with the keys `access`, `topic`, `result` and
%% `updated_time`.
format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) ->
    maps:from_list([ {access, PubSub}
                   , {topic, Topic}
                   , {result, AuthzResult}
                   , {updated_time, Timestamp}
                   ]).
%%--------------------------------------------------------------------
%% time format funcs
%% Renders a system-time integer as an rfc3339 binary.
%% The one-argument form interprets the timestamp as milliseconds.
unix_ts_to_rfc3339_bin(TimeStamp) ->
    unix_ts_to_rfc3339_bin(TimeStamp, millisecond).

%% Renders TimeStamp, expressed in Unit, via
%% calendar:system_time_to_rfc3339/2. Only integers are accepted.
unix_ts_to_rfc3339_bin(TimeStamp, Unit) when is_integer(TimeStamp) ->
    Rfc3339 = calendar:system_time_to_rfc3339(TimeStamp, [{unit, Unit}]),
    list_to_binary(Rfc3339).
%% Parses a time parameter into an integer timestamp.
%% The one-argument form uses milliseconds as the unit.
time_string_to_unix_ts_int(DateTime) ->
    time_string_to_unix_ts_int(DateTime, millisecond).

%% A binary that is a plain integer is returned as that integer, regardless
%% of Unit. Anything else is parsed as rfc3339 and converted to Unit by
%% calendar:rfc3339_to_system_time/2.
time_string_to_unix_ts_int(DateTime, Unit) when is_binary(DateTime) ->
    try
        binary_to_integer(DateTime)
    catch
        error:badarg ->
            Text = binary_to_list(DateTime),
            calendar:rfc3339_to_system_time(Text, [{unit, Unit}])
    end.