618 lines
18 KiB
Erlang
618 lines
18 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_telemetry).
|
|
|
|
-behaviour(gen_server).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
-include("emqx_modules.hrl").
|
|
|
|
-export([
|
|
start_link/0,
|
|
stop/0
|
|
]).
|
|
|
|
%% gen_server callbacks
|
|
-export([
|
|
init/1,
|
|
handle_call/3,
|
|
handle_cast/2,
|
|
handle_continue/2,
|
|
handle_info/2,
|
|
terminate/2,
|
|
code_change/3
|
|
]).
|
|
|
|
-export([
|
|
enable/0,
|
|
disable/0
|
|
]).
|
|
|
|
-export([
|
|
get_node_uuid/0,
|
|
get_cluster_uuid/0,
|
|
get_telemetry/0
|
|
]).
|
|
|
|
-export([official_version/1]).
|
|
|
|
%% Internal exports (RPC)
|
|
-export([
|
|
do_ensure_uuids/0
|
|
]).
|
|
|
|
%% internal export
|
|
-export([read_raw_build_info/0]).
|
|
|
|
-ifdef(TEST).
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
-endif.
|
|
|
|
-import(proplists, [
|
|
get_value/2,
|
|
get_value/3
|
|
]).
|
|
|
|
%% Row of the ?TELEMETRY table. `id' is either a node name (node UUID row)
%% or ?CLUSTER_UUID_KEY (cluster UUID row); `uuid' is the stored UUID.
-record(telemetry, {
    id :: atom(),
    uuid :: binary()
}).

%% gen_server state.
-record(state, {
    node_uuid :: undefined | binary(),
    cluster_uuid :: undefined | binary(),
    %% endpoint the report is POSTed to
    url :: string(),
    %% delay between reports; empty_state/0 fills it via timer:seconds/1 (ms)
    report_interval :: non_neg_integer(),
    %% pending report timer; `undefined' while reporting is disabled
    timer = undefined :: undefined | reference(),
    %% counter values seen at the previous collection, used to derive rates
    previous_metrics = #{} :: map()
}).

-type state() :: #state{}.

%% Number of 100-nanosecond intervals between the UUID epoch
%% (1582-10-15 00:00:00) and the UNIX epoch (1970-01-01 00:00:00).
-define(GREGORIAN_EPOCH_OFFSET, 16#01b21dd213814000).

%% Primary key of the cluster-wide row in the ?TELEMETRY table.
-define(CLUSTER_UUID_KEY, cluster_uuid).

%% Mnesia table name and its mria shard.
-define(TELEMETRY, emqx_telemetry).
-define(TELEMETRY_SHARD, emqx_telemetry_shard).

%% File names (joined onto the EMQX data dir) used to persist the UUIDs.
-define(NODE_UUID_FILENAME, "node.uuid").
-define(CLUSTER_UUID_FILENAME, "cluster.uuid").
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% API
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Start the telemetry server, registered locally under the module name.
%% The `telemetry' config (an empty map when unset) is handed to init/1.
start_link() ->
    TelemetryConf = emqx:get_config([telemetry], #{}),
    gen_server:start_link({local, ?MODULE}, ?MODULE, [TelemetryConf], []).

%% @doc Stop the locally registered telemetry server.
stop() -> gen_server:stop(?MODULE).
|
|
|
|
%% @doc Turn periodic reporting on. Uses a 15 s call timeout instead of the
%% gen_server default.
enable() ->
    gen_server:call(?MODULE, enable, timer:seconds(15)).

%% @doc Turn periodic reporting off (default call timeout).
disable() ->
    gen_server:call(?MODULE, disable).
|
|
|
|
%% @doc `{ok, UUID}' of this node, as held by the telemetry server.
get_node_uuid() -> gen_server:call(?MODULE, get_node_uuid).

%% @doc `{ok, UUID}' of the cluster, as held by the telemetry server.
get_cluster_uuid() -> gen_server:call(?MODULE, get_cluster_uuid).

%% @doc `{ok, Telemetry}': the report that would currently be sent.
get_telemetry() -> gen_server:call(?MODULE, get_telemetry).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% gen_server callbacks
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% The options are not used. Table creation and UUID setup are deferred to
%% handle_continue/2 so that start_link/0 returns without waiting for them.
init(_Opts) ->
    Continue = {continue, init},
    {ok, undefined, Continue}.
|
|
|
|
%% Deferred initialisation: create/await the telemetry table, then load or
%% create the node and cluster UUIDs and build the initial state.
handle_continue(init, _UndefinedState) ->
    TableOpts = [
        {type, set},
        {storage, disc_copies},
        {rlog_shard, ?TELEMETRY_SHARD},
        {record_name, telemetry},
        {attributes, record_info(fields, telemetry)}
    ],
    ok = mria:create_table(?TELEMETRY, TableOpts),
    ok = mria:wait_for_tables([?TELEMETRY]),
    {NodeUUID, ClusterUUID} = ensure_uuids(),
    Initial = empty_state(),
    {noreply, Initial#state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID}};
handle_continue(Continue, State) ->
    ?SLOG(error, #{msg => "unexpected_continue", continue => Continue}),
    {noreply, State}.
|
|
|
|
%% Synchronous requests:
%%   enable           -> (re)arm the report timer; first report after 10 s
%%   disable          -> cancel the pending report timer
%%   get_node_uuid    -> {ok, NodeUUID}
%%   get_cluster_uuid -> {ok, ClusterUUID}
%%   get_telemetry    -> {ok, Telemetry}
%% Anything else is logged and answered with `ignored'.
handle_call(enable, _From, State = #state{timer = OldTimer}) ->
    %% Cancel any timer armed by an earlier `enable'. Without this, repeated
    %% calls leave orphan timers whose expiry no longer matches
    %% `#state.timer' and ends up logged as an unexpected message.
    emqx_misc:cancel_timer(OldTimer),
    %% Wait a few moments before reporting the first telemetry, as the
    %% apps might still be starting up. Also, this avoids hanging
    %% `emqx_modules_app' initialization in case the POST request
    %% takes a lot of time.
    FirstReportTimeoutMS = timer:seconds(10),
    {reply, ok, ensure_report_timer(FirstReportTimeoutMS, State#state{timer = undefined})};
handle_call(disable, _From, State = #state{timer = Timer}) ->
    emqx_misc:cancel_timer(Timer),
    {reply, ok, State#state{timer = undefined}};
handle_call(get_node_uuid, _From, State = #state{node_uuid = UUID}) ->
    {reply, {ok, UUID}, State};
handle_call(get_cluster_uuid, _From, State = #state{cluster_uuid = UUID}) ->
    {reply, {ok, UUID}, State};
handle_call(get_telemetry, _From, State) ->
    %% The state updated while collecting (previous_metrics) is discarded,
    %% presumably so ad-hoc queries do not move the rate baseline used by the
    %% periodic report.
    {_State, Telemetry} = get_telemetry(State),
    {reply, {ok, Telemetry}, State};
handle_call(Req, _From, State) ->
    ?SLOG(error, #{msg => "unexpected_call", call => Req}),
    {reply, ignored, State}.
|
|
|
|
%% No casts are part of the protocol: log whatever arrives and carry on.
handle_cast(UnexpectedMsg, State) ->
    ?SLOG(error, #{msg => "unexpected_cast", cast => UnexpectedMsg}),
    {noreply, State}.
|
|
|
|
%% Report-timer expiry for the currently armed timer: send a report and
%% re-arm the timer with the regular report interval.
handle_info({timeout, TRef, time_to_report_telemetry_data}, State0 = #state{timer = TRef}) ->
    State = report_telemetry(State0),
    {noreply, ensure_report_timer(State)};
%% Report-timer expiry that does not match the current timer: reporting was
%% disabled (timer = undefined) or the timer has since been replaced. Either
%% way this is an expected stale message, not an error, so drop it quietly.
handle_info({timeout, _StaleTRef, time_to_report_telemetry_data}, State) ->
    {noreply, State};
handle_info(Info, State) ->
    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
    {noreply, State}.
|
|
|
|
%% Nothing to clean up on shutdown.
terminate(_Reason, _State) -> ok.

%% No state migration between versions; keep the state as-is.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% @doc `true' when Version (string or binary) looks like an official release:
%% MAJOR.MINOR[.PATCH], optionally followed by -alpha.N, -beta.N or -rc.N
%% where N is a positive integer without leading zero; `false' otherwise.
official_version(Version) ->
    Pattern = "^\\d+\\.\\d+(?:\\.\\d+)?(?:(-(?:alpha|beta|rc)\\.[1-9][0-9]*))?$",
    case re:run(Version, Pattern, [{capture, none}]) of
        match -> true;
        nomatch -> false
    end.
|
|
|
|
%% Arm the report timer using the state's configured report interval.
ensure_report_timer(#state{report_interval = Interval} = State) ->
    ensure_report_timer(Interval, State).

%% Arm a timer for `ReportInterval' ms carrying the
%% `time_to_report_telemetry_data' tag (handled in handle_info/2), and keep
%% its reference in the state.
ensure_report_timer(ReportInterval, State) ->
    TRef = emqx_misc:start_timer(ReportInterval, time_to_report_telemetry_data),
    State#state{timer = TRef}.
|
|
|
|
%% @doc Describe the host OS as `[{os_name, Name}, {os_version, Version}]'.
%%
%% Sources: `sw_vers' on macOS, /etc/os-release on other Unix systems and
%% the `ver' command on Windows NT. When the information cannot be obtained
%% (unknown OS type, missing file or command output), both values are
%% "Unknown". Previously, an unhandled OS type or empty `sw_vers' output
%% crashed the whole telemetry report. On Unix, a key absent from
%% /etc/os-release yields `undefined'.
os_info() ->
    case erlang:system_info(os_type) of
        {unix, darwin} -> os_info_darwin();
        {unix, _} -> os_info_unix();
        {win32, nt} -> os_info_win32();
        _ -> unknown_os_info()
    end.

%% macOS: first line of `sw_vers -productName' / `-productVersion'.
os_info_darwin() ->
    NameLines = string:tokens(os:cmd("sw_vers -productName"), "\n"),
    VersionLines = string:tokens(os:cmd("sw_vers -productVersion"), "\n"),
    case {NameLines, VersionLines} of
        {[Name | _], [Version | _]} ->
            [{os_name, Name}, {os_version, Version}];
        _ ->
            %% `sw_vers' unavailable or printed nothing
            unknown_os_info()
    end.

%% Other Unix: NAME from /etc/os-release; version from VERSION, falling back
%% to VERSION_ID and then PRETTY_NAME.
os_info_unix() ->
    case file:read_file("/etc/os-release") of
        {ok, FileContent} ->
            OSInfo = parse_os_release(FileContent),
            [
                {os_name, get_value("NAME", OSInfo)},
                {os_version,
                    get_value(
                        "VERSION",
                        OSInfo,
                        get_value("VERSION_ID", OSInfo, get_value("PRETTY_NAME", OSInfo))
                    )}
            ];
        {error, _} ->
            unknown_os_info()
    end.

%% Windows NT: split the `ver' banner, of the form "<Name> [Version N.N...]",
%% into the name and the dotted version number.
os_info_win32() ->
    Ver = os:cmd("ver"),
    case re:run(Ver, "[a-zA-Z ]+ \\[Version ([0-9]+[\.])+[0-9]+\\]", [{capture, none}]) of
        match ->
            [NVer | _] = string:tokens(Ver, "\r\n"),
            {match, [Version]} =
                re:run(NVer, "([0-9]+[\.])+[0-9]+", [{capture, first, list}]),
            [Name | _] = string:split(NVer, " [Version "),
            [{os_name, Name}, {os_version, Version}];
        nomatch ->
            unknown_os_info()
    end.

%% Placeholder used whenever the OS cannot be identified.
unknown_os_info() ->
    [{os_name, "Unknown"}, {os_version, "Unknown"}].
|
|
|
|
%% OTP release string of the running emulator.
otp_version() -> erlang:system_info(otp_release).

%% Total wall-clock milliseconds since the VM started.
uptime() ->
    {TotalMS, _SinceLastCall} = erlang:statistics(wall_clock),
    TotalMS.
|
|
|
|
%% UUIDs of the other running cluster nodes, fetched over RPC. Nodes that
%% answer with `{badrpc, _}' are skipped. The result lists the last queried
%% peer first.
nodes_uuid() ->
    Peers = lists:delete(node(), mria:running_nodes()),
    Collected = lists:filtermap(
        fun(Peer) ->
            case emqx_telemetry_proto_v1:get_node_uuid(Peer) of
                {ok, UUID} -> {true, UUID};
                {badrpc, _Reason} -> false
            end
        end,
        Peers
    ),
    lists:reverse(Collected).
|
|
|
|
%% `<<"Name-Vsn">>' for every plugin whose `running_status' is `running'.
%% The result is in reverse order of emqx_plugins:list/0.
active_plugins() ->
    Running = [Plugin || Plugin = #{running_status := running} <- emqx_plugins:list()],
    Labels = lists:map(
        fun(Plugin) ->
            #{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin,
            iolist_to_binary([Name, "-", Vsn])
        end,
        Running
    ),
    lists:reverse(Labels).
|
|
|
|
%% Current value of the `live_connections.count' stat.
num_clients() -> emqx_stats:getstat('live_connections.count').

%% Value of the `messages.sent' metric.
messages_sent() -> emqx_metrics:val('messages.sent').

%% Value of the `messages.received' metric.
messages_received() -> emqx_metrics:val('messages.received').

%% Current value of the `topics.count' stat.
topic_count() -> emqx_stats:getstat('topics.count').
|
|
|
|
%% @doc Generate a time-based (version 1 layout) UUID with a random clock
%% sequence and random node id, returned as an uppercase hex binary in the
%% canonical 8-4-4-4-12 form.
generate_uuid() ->
    MicroSeconds = erlang:system_time(microsecond),
    %% UUID timestamps count 100 ns intervals since the Gregorian epoch.
    Timestamp = MicroSeconds * 10 + ?GREGORIAN_EPOCH_OFFSET,
    <<TimeHigh:12, TimeMid:16, TimeLow:32>> = <<Timestamp:60>>,
    <<ClockSeq:32>> = crypto:strong_rand_bytes(4),
    <<First:7, _:1, Last:40>> = crypto:strong_rand_bytes(6),
    %% Version 1 in the top four bits of time_hi_and_version.
    <<NTimeHigh:16>> = <<16#01:4, TimeHigh:12>>,
    %% Variant bits 10, then the low 14 bits of the random clock sequence.
    <<NClockSeq:16>> = <<1:1, 0:1, ClockSeq:14>>,
    %% Random node id with the multicast bit forced on, as done for node ids
    %% that are not real MAC addresses.
    <<Node:48>> = <<First:7, 1:1, Last:40>>,
    %% Every field is zero-padded to its canonical width. Without padding,
    %% small random values yielded short fields (e.g. a 7-digit time_low),
    %% i.e. non-canonical UUIDs that get_uuid_from_file/1 would not accept.
    list_to_binary(
        io_lib:format(
            "~8.16.0B-~4.16.0B-~4.16.0B-~4.16.0B-~12.16.0B",
            [TimeLow, TimeMid, NTimeHigh, NClockSeq, Node]
        )
    ).
|
|
|
|
%% Assemble the full telemetry report. Also returns the state with the
%% rate baseline (previous_metrics) refreshed by mqtt_runtime_insights/1.
-spec get_telemetry(state()) -> {state(), proplists:proplist()}.
get_telemetry(#state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID} = State0) ->
    OSInfo = os_info(),
    {MQTTRTInsights, State} = mqtt_runtime_insights(State0),
    #{rule_engine := RuleEngineInfo, bridge := BridgeInfo} =
        get_rule_engine_and_bridge_info(),
    %% release, licence and host platform
    Platform = [
        {emqx_version, bin(emqx_app:get_release())},
        {license, [{edition, <<"opensource">>}]},
        {os_name, bin(get_value(os_name, OSInfo))},
        {os_version, bin(get_value(os_version, OSInfo))},
        {otp_version, bin(otp_version())},
        {up_time, uptime()}
    ],
    %% identity of this node and its cluster
    Cluster = [
        {uuid, NodeUUID},
        {cluster_uuid, ClusterUUID},
        {nodes_uuid, nodes_uuid()},
        {active_plugins, active_plugins()}
    ],
    %% usage and feature statistics
    Usage = [
        {num_clients, num_clients()},
        {messages_received, messages_received()},
        {messages_sent, messages_sent()},
        {build_info, build_info()},
        {vm_specs, vm_specs()},
        {mqtt_runtime_insights, MQTTRTInsights},
        {advanced_mqtt_features, advanced_mqtt_features()},
        {authn_authz, get_authn_authz_info()},
        {gateway, get_gateway_info()},
        {rule_engine, RuleEngineInfo},
        {bridge, BridgeInfo},
        {exhook, get_exhook_info()}
    ],
    {State, Platform ++ Cluster ++ Usage}.
|
|
|
|
%% Collect the report, JSON-encode it and POST it to the configured URL.
%% Returns the state produced while collecting.
report_telemetry(#state{url = URL} = State0) ->
    {State, Data} = get_telemetry(State0),
    case emqx_json:safe_encode(Data) of
        {error, Reason} ->
            %% NOTE(review): encoding failures are only traced at debug level
            %% and are easy to miss; consider a higher level.
            ?tp(debug, telemetry_data_encode_error, #{data => Data, reason => Reason});
        {ok, Bin} ->
            %% The HTTP result is not inspected; delivery is best-effort.
            httpc_request(post, URL, [], Bin),
            ?tp(debug, telemetry_data_reported, #{})
    end,
    State.
|
|
|
|
%% Synchronous HTTP request with an `application/json' body and a 10 s timeout.
%% NOTE(review): TLS peer verification is disabled (`verify_none'), so the
%% endpoint's certificate is never checked; consider verifying it against a
%% CA bundle.
httpc_request(Method, URL, Headers, Body) ->
    Request = {URL, Headers, "application/json", Body},
    HTTPOptions = [{timeout, timer:seconds(10)}, {ssl, [{verify, verify_none}]}],
    httpc:request(Method, Request, HTTPOptions, []).
|
|
|
|
%% Parse the contents of an os-release file into a `[{Key, Value}]' list of
%% strings. Later lines end up first, so proplists lookups see the last
%% occurrence of a duplicated key.
%%
%% - Lines are split on LF or CR, so CRLF files work too.
%% - Comment lines (starting with `#') and lines without `=' are skipped
%%   instead of crashing.
%% - Only the first `=' separates key from value, so values may contain `='.
%% - Surrounding double or single quotes are stripped from values.
%% - Keys whose value is empty are skipped, so callers' fallbacks apply.
parse_os_release(FileContent) ->
    Lines = string:tokens(binary:bin_to_list(FileContent), "\r\n"),
    lists:foldl(fun parse_os_release_line/2, [], Lines).

%% Fold step for parse_os_release/1: add one `KEY=value' line to Acc.
parse_os_release_line([$# | _Comment], Acc) ->
    Acc;
parse_os_release_line(Line, Acc) ->
    case string:split(Line, "=") of
        [Var, RawValue] ->
            case string:trim(RawValue, both, "\"'") of
                "" -> Acc;
                Value -> [{Var, Value} | Acc]
            end;
        _NoSeparator ->
            Acc
    end.
|
|
|
|
%% Parsed contents of the release BUILD_INFO file (running from an EMQX
%% release), or an empty map when the file cannot be read.
build_info() ->
    decode_build_info(?MODULE:read_raw_build_info()).

%% Decode a read_raw_build_info/0 result; crashes if the HOCON is invalid.
decode_build_info({ok, RawBuildInfo}) ->
    {ok, Fields} = hocon:binary(RawBuildInfo),
    Fields;
decode_build_info(_ReadFailure) ->
    #{}.
|
|
|
|
%% Read `<root>/releases/<release>/BUILD_INFO'; returns the
%% file:read_file/1 result unchanged.
read_raw_build_info() ->
    ReleaseDir = filename:join([code:root_dir(), "releases", emqx_app:get_release()]),
    file:read_file(filename:join(ReleaseDir, "BUILD_INFO")).
|
|
|
|
%% Logical CPU count and total memory as reported by memsup.
vm_specs() ->
    MemData = memsup:get_system_memory_data(),
    NumCPUs = erlang:system_info(logical_processors),
    [{num_cpus, NumCPUs}, {total_memory, get_value(total_memory, MemData)}].
|
|
|
|
%% Message rates since the previous collection plus the current topic count.
-spec mqtt_runtime_insights(state()) -> {map(), state()}.
mqtt_runtime_insights(State0) ->
    {Rates, State} = update_mqtt_rates(State0),
    {maps:put(num_topics, topic_count(), Rates), State}.
|
|
|
|
%% Compute `messages_sent_rate' / `messages_received_rate' as the counter
%% delta since the previous collection divided by the report interval, and
%% store the new counter values as the next baseline. With no positive
%% integer interval, no rates are produced and the state is unchanged.
%% NOTE(review): empty_state/0 sets the interval in milliseconds, so these
%% rates are per millisecond; confirm this is the unit the receiver expects.
-spec update_mqtt_rates(state()) -> {map(), state()}.
update_mqtt_rates(
    #state{previous_metrics = Baseline0, report_interval = Interval} = State
) when is_integer(Interval), Interval > 0 ->
    Counters = [
        {messages_sent_rate, messages_sent, fun messages_sent/0},
        {messages_received_rate, messages_received, fun messages_received/0}
    ],
    Step = fun({RateKey, CountKey, Read}, {RatesAcc, BaselineAcc}) ->
        Current = Read(),
        Delta = Current - maps:get(CountKey, BaselineAcc, 0),
        {RatesAcc#{RateKey => Delta / Interval}, BaselineAcc#{CountKey => Current}}
    end,
    {Rates, Baseline} = lists:foldl(Step, {#{}, Baseline0}, Counters),
    {Rates, State#state{previous_metrics = Baseline}};
update_mqtt_rates(State) ->
    {#{}, State}.
|
|
|
|
%% Usage counters of the retainer, topic rewrite, delayed publish and
%% auto-subscribe features.
advanced_mqtt_features() ->
    #{retained_messages := Retained} = emqx_retainer:get_basic_usage_info(),
    #{topic_rewrite_rule_count := Rewrites} = emqx_rewrite:get_basic_usage_info(),
    #{delayed_message_count := Delayed} = emqx_delayed:get_basic_usage_info(),
    #{auto_subscribe_count := AutoSubs} = emqx_auto_subscribe:get_basic_usage_info(),
    #{
        retained => Retained,
        topic_rewrite => Rewrites,
        delayed => Delayed,
        auto_subscribe => AutoSubs
    }.
|
|
|
|
%% Enabled authentication/authorization sources. Deliberately best-effort:
%% any failure while querying them yields empty lists instead of crashing
%% the report.
get_authn_authz_info() ->
    try
        #{authenticators := AuthnTypes, overridden_listeners := OverriddenListeners} =
            emqx_authn:get_enabled_authns(),
        #{
            authn => AuthnTypes,
            authn_listener => OverriddenListeners,
            authz => emqx_authz:get_enabled_authzs()
        }
    catch
        _:_ ->
            #{authn => [], authn_listener => [], authz => []}
    end.
|
|
|
|
%% Gateway usage info, or an empty map if it cannot be obtained (for
%% instance when the gateway is not available).
get_gateway_info() ->
    try emqx_gateway:get_basic_usage_info() of
        Info -> Info
    catch
        _:_ -> #{}
    end.
|
|
|
|
%% Rule count plus, per bridge type, the number of bridges and the number
%% of rules referencing that type (0 when none do).
get_rule_engine_and_bridge_info() ->
    #{num_rules := NumRules, referenced_bridges := ReferencedBridges} =
        emqx_rule_engine:get_basic_usage_info(),
    #{num_bridges := NumDataBridges, count_by_type := BridgeTypeCount} =
        emqx_bridge:get_basic_usage_info(),
    PerType = maps:map(
        fun(BridgeType, BridgeCount) ->
            #{
                num => BridgeCount,
                num_linked_by_rules => maps:get(BridgeType, ReferencedBridges, 0)
            }
        end,
        BridgeTypeCount
    ),
    #{
        rule_engine => #{num_rules => NumRules},
        bridge => #{num_data_bridges => NumDataBridges, data_bridge => PerType}
    }.
|
|
|
|
%% ExHook usage info, passed through unchanged.
get_exhook_info() -> emqx_exhook:get_basic_usage_info().
|
|
|
|
%% Coerce a binary, atom or byte list into a binary (function_clause on
%% anything else).
bin(Bin) when is_binary(Bin) -> Bin;
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
bin(List) when is_list(List) -> list_to_binary(List).
|
|
|
|
%% Load or create the node and cluster UUIDs in a mria transaction, mirror
%% them to their files under the data dir and return `{NodeUUID, ClusterUUID}'.
ensure_uuids() ->
    Txn = fun ?MODULE:do_ensure_uuids/0,
    {atomic, {NodeUUID, ClusterUUID} = UUIDs} = mria:transaction(?TELEMETRY_SHARD, Txn),
    save_uuid_to_file(NodeUUID, node),
    save_uuid_to_file(ClusterUUID, cluster),
    UUIDs.
|
|
|
|
%% Transaction body (exported so it can be passed as
%% `fun ?MODULE:do_ensure_uuids/0'): ensure the node row and the cluster row
%% exist in ?TELEMETRY and return `{NodeUUID, ClusterUUID}'.
%% NOTE(review): `node()' is evaluated wherever mria executes the
%% transaction; if that can be another (core) node when started from a
%% replicant, the node row/file used here would be that node's. Confirm.
do_ensure_uuids() ->
    NodeUUID = ensure_uuid_row(node(), node),
    ClusterUUID = ensure_uuid_row(?CLUSTER_UUID_KEY, cluster),
    {NodeUUID, ClusterUUID}.

%% Return the UUID stored under `Key', write-locking the row. If the row is
%% missing, create it with the UUID saved in the `Type' UUID file, or with a
%% freshly generated UUID when that file is absent or malformed.
ensure_uuid_row(Key, Type) ->
    case mnesia:wread({?TELEMETRY, Key}) of
        [#telemetry{uuid = UUID}] ->
            UUID;
        [] ->
            UUID =
                case get_uuid_from_file(Type) of
                    {ok, FromFile} -> FromFile;
                    undefined -> generate_uuid()
                end,
            mnesia:write(?TELEMETRY, #telemetry{id = Key, uuid = UUID}, write),
            UUID
    end.
|
|
|
|
%% `{ok, UUID}' if the `Type' UUID file holds exactly a 36-byte 8-4-4-4-12
%% UUID (no trailing newline), otherwise `undefined'.
get_uuid_from_file(Type) ->
    case file:read_file(uuid_file_path(Type)) of
        {ok, <<_:8/binary, "-", _:4/binary, "-", _:4/binary, "-", _:4/binary, "-", _:12/binary>> = UUID} ->
            {ok, UUID};
        _MissingOrMalformed ->
            undefined
    end.
|
|
|
|
%% Persist a binary UUID to the `Type' UUID file, creating parent dirs first.
%% Crashes (badmatch) if either step fails.
save_uuid_to_file(UUID, Type) when is_binary(UUID) ->
    Target = uuid_file_path(Type),
    ok = filelib:ensure_dir(Target),
    ok = file:write_file(Target, UUID).
|
|
|
|
%% Path of the `node' or `cluster' UUID file inside the EMQX data dir.
uuid_file_path(Type) ->
    Name =
        case Type of
            node -> ?NODE_UUID_FILENAME;
            cluster -> ?CLUSTER_UUID_FILENAME
        end,
    filename:join(emqx:data_dir(), Name).
|
|
|
|
%% Initial state: report URL and interval from emqx_modules.hrl. The
%% ?REPORT_INTERVAL seconds are converted to milliseconds.
empty_state() ->
    IntervalMS = timer:seconds(?REPORT_INTERVAL),
    #state{url = ?TELEMETRY_URL, report_interval = IntervalMS}.
|