diff --git a/etc/emqx.conf b/etc/emqx.conf index 9754cbfbf..50f733de9 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2186,34 +2186,3 @@ alarm.size_limit = 1000 ## ## Default: 24h alarm.validity_period = 24h - -##-------------------------------------------------------------------- -## Telemetry -##-------------------------------------------------------------------- - -## Enable telemetry -## -## Value: true | false -## -## Default: false -telemetry.enabled = false - -## The destination URL for the telemetry data report -## -## Value: String -## -## Default: https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry -telemetry.url = https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry - -## Interval for reporting telemetry data -## -## Value: Duration -## -d: day -## -h: hour -## -m: minute -## -s: second -## -## Default: 7d -telemetry.report_interval = 7d - -{{ additional_configs }} diff --git a/priv/emqx.schema b/priv/emqx.schema index 8b2d2a5af..38aa55998 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2165,28 +2165,3 @@ end}. {size_limit, cuttlefish:conf_get("alarm.size_limit", Conf)}, {validity_period, cuttlefish:conf_get("alarm.validity_period", Conf)}] end}. - -%%-------------------------------------------------------------------- -%% Telemetry -%%-------------------------------------------------------------------- -{mapping, "telemetry.enabled", "emqx.telemetry", [ - {default, false}, - {datatype, {enum, [true, false]}} -]}. - -{mapping, "telemetry.url", "emqx.telemetry", [ - {default, "https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry"}, - {datatype, string} -]}. - -{mapping, "telemetry.report_interval", "emqx.telemetry", [ - {default, "7d"}, - {datatype, {duration, s}} -]}. - -{translation, "emqx.telemetry", fun(Conf) -> - [ {enabled, cuttlefish:conf_get("telemetry.enabled", Conf)} - , {url, cuttlefish:conf_get("telemetry.url", Conf)} - , {report_interval, cuttlefish:conf_get("telemetry.report_interval", Conf)} - ] -end}. \ No newline at end of file diff --git a/src/emqx_kernel_sup.erl b/src/emqx_kernel_sup.erl index 3864ad4ac..82f196913 100644 --- a/src/emqx_kernel_sup.erl +++ b/src/emqx_kernel_sup.erl @@ -32,7 +32,6 @@ init([]) -> child_spec(emqx_hooks, worker), child_spec(emqx_stats, worker), child_spec(emqx_metrics, worker), - child_spec(emqx_telemetry, worker, [config(telemetry)]), child_spec(emqx_ctl, worker), child_spec(emqx_zone, worker)]}}. @@ -56,5 +55,3 @@ child_spec(M, supervisor, Args) -> type => supervisor, modules => [M] }. - -config(Name) -> emqx:get_env(Name, []). \ No newline at end of file diff --git a/src/emqx_telemetry.erl b/src/emqx_telemetry.erl deleted file mode 100644 index 33fb80f64..000000000 --- a/src/emqx_telemetry.erl +++ /dev/null @@ -1,401 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_telemetry). - --behaviour(gen_server). - --include("emqx.hrl"). --include("logger.hrl"). - - -include_lib("kernel/include/file.hrl"). - --logger_header("[Telemetry]"). - -%% Mnesia bootstrap --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). - --export([ start_link/1 - , stop/0 - ]). - -%% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). - --export([ enable/0 - , disable/0 - , is_enabled/0 - , get_uuid/0 - , get_telemetry/0 - ]). - --ifdef(TEST). --compile(export_all). --compile(nowarn_export_all). --endif. - --import(proplists, [ get_value/2 - , get_value/3 - ]). - --record(telemetry, { - id :: non_neg_integer(), - - uuid :: binary(), - - enabled :: boolean() - }). - --record(state, { - uuid :: undefined | binary(), - - enabled :: undefined | boolean(), - - url :: string(), - - report_interval :: undefined | non_neg_integer(), - - timer = undefined :: undefined | reference() - }). - -%% The count of 100-nanosecond intervals between the UUID epoch -%% 1582-10-15 00:00:00 and the UNIX epoch 1970-01-01 00:00:00. --define(GREGORIAN_EPOCH_OFFSET, 16#01b21dd213814000). - --define(UNIQUE_ID, 9527). - --define(TELEMETRY, emqx_telemetry). - -%%-------------------------------------------------------------------- -%% Mnesia bootstrap -%%-------------------------------------------------------------------- - -mnesia(boot) -> - ok = ekka_mnesia:create_table(?TELEMETRY, - [{type, set}, - {disc_copies, [node()]}, - {local_content, true}, - {record_name, telemetry}, - {attributes, record_info(fields, telemetry)}]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TELEMETRY, disc_copies). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - -start_link(Opts) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). - -stop() -> - gen_server:stop(?MODULE). - -enable() -> - gen_server:call(?MODULE, enable). - -disable() -> - gen_server:call(?MODULE, disable). - -is_enabled() -> - gen_server:call(?MODULE, is_enabled). - -get_uuid() -> - gen_server:call(?MODULE, get_uuid). - -get_telemetry() -> - gen_server:call(?MODULE, get_telemetry). - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -init([Opts]) -> - State = #state{url = get_value(url, Opts), - report_interval = timer:seconds(get_value(report_interval, Opts))}, - NState = case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of - [] -> - Enabled = get_value(enabled, Opts), - UUID = generate_uuid(), - mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, - uuid = UUID, - enabled = Enabled}), - State#state{enabled = Enabled, uuid = UUID}; - [#telemetry{uuid = UUID, enabled = Enabled} | _] -> - State#state{enabled = Enabled, uuid = UUID} - end, - {ok, ensure_first_report_timer(timer:seconds(1), NState)}. - -handle_call(enable, _From, State = #state{uuid = UUID}) -> - mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, - uuid = UUID, - enabled = true}), - {reply, ok, ensure_report_timer(State#state{enabled = true})}; - -handle_call(disable, _From, State = #state{uuid = UUID}) -> - mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, - uuid = UUID, - enabled = false}), - {reply, ok, State#state{enabled = false}}; - -handle_call(is_enabled, _From, State = #state{enabled = Enabled}) -> - {reply, Enabled, State}; - -handle_call(get_uuid, _From, State = #state{uuid = UUID}) -> - {reply, {ok, UUID}, State}; - -handle_call(get_telemetry, _From, State) -> - {reply, {ok, get_telemetry(State)}, State}; - -handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - ?LOG(error, "Unexpected msg: ~p", [Msg]), - {noreply, State}. - -handle_info({timeout, TRef, time_to_report_telemetry_data}, State = #state{timer = TRef, - enabled = false}) -> - {noreply, State}; -handle_info({timeout, TRef, time_to_report_telemetry_data}, State = #state{timer = TRef}) -> - report_telemetry(State), - {noreply, ensure_report_timer(State)}; - -handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%------------------------------------------------------------------------------ -%% Internal functions -%%------------------------------------------------------------------------------ - -ensure_first_report_timer(FirstReportInterval, State) -> - State#state{timer = emqx_misc:start_timer(FirstReportInterval, time_to_report_telemetry_data)}. - -ensure_report_timer(State = #state{report_interval = ReportInterval}) -> - State#state{timer = emqx_misc:start_timer(ReportInterval, time_to_report_telemetry_data)}. - -emqx_version() -> - {ok, Version} = application:get_key(emqx, vsn), - Version. - -license() -> - case application:get_key(emqx, description) of - {ok, "EMQ X Broker"} -> - [{edition, <<"community">>}]; - {ok, "EMQ X Enterprise"} -> - case search_license_callback() of - {error, not_found} -> - [{edition, <<"enterprise">>}]; - {M, F} -> - case erlang:function_exported(M, F, 0) of - true -> - erlang:apply(M, F, []); - false -> - [{edition, <<"enterprise">>}] - end - end - end. - -os_info() -> - case erlang:system_info(os_type) of - {unix,darwin} -> - [Name | _] = string:tokens(os:cmd("sw_vers -productName"), "\n"), - [Version | _] = string:tokens(os:cmd("sw_vers -productVersion"), "\n"), - [{os_name, Name}, - {os_version, Version}]; - {unix, _} -> - case file:read_file_info("/etc/os-release") of - {error, _} -> - [{os_name, "Unknown"}, - {os_version, "Unknown"}]; - {ok, FileInfo} -> - case FileInfo#file_info.access of - Access when Access =:= read orelse Access =:= read_write -> - OSInfo = lists:foldl(fun(Line, Acc) -> - [Var, Value] = string:tokens(Line, "="), - NValue = case Value of - _ when is_list(Value) -> - lists:nth(1, string:tokens(Value, "\"")); - _ -> - Value - end, - [{Var, NValue} | Acc] - end, [], string:tokens(os:cmd("cat /etc/os-release"), "\n")), - [{os_name, get_value("NAME", OSInfo)}, - {os_version, get_value("VERSION", OSInfo, get_value("VERSION_ID", OSInfo))}]; - _ -> - [{os_name, "Unknown"}, - {os_version, "Unknown"}] - end - end; - {win32, nt} -> - Ver = os:cmd("ver"), - case re:run(Ver, "[a-zA-Z ]+ \\[Version ([0-9]+[\.])+[0-9]+\\]", [{capture, none}]) of - match -> - [NVer | _] = string:tokens(Ver, "\r\n"), - {match, [Version]} = re:run(NVer, "([0-9]+[\.])+[0-9]+", [{capture, first, list}]), - [Name | _] = string:split(NVer, " [Version "), - [{os_name, Name}, - {os_version, Version}]; - nomatch -> - [{os_name, "Unknown"}, - {os_version, "Unknown"}] - end - end. - -otp_version() -> - erlang:system_info(otp_release). - -uptime() -> - element(1, erlang:statistics(wall_clock)). - -nodes_uuid() -> - Nodes = lists:delete(node(), ekka_mnesia:running_nodes()), - lists:foldl(fun(Node, Acc) -> - case rpc:call(Node, ?MODULE, get_uuid, []) of - {badrpc, _Reason} -> - Acc; - UUID -> - [UUID | Acc] - end - end, [], Nodes). - -active_plugins() -> - lists:foldl(fun(#plugin{name = Name, active = Active}, Acc) -> - case Active of - true -> [Name | Acc]; - false -> Acc - end - end, [], emqx_plugins:list()). - -active_modules() -> - lists:foldl(fun({Name, Persistent}, Acc) -> - case Persistent of - true -> [Name | Acc]; - false -> Acc - end - end, [], emqx_modules:list()). - -num_clients() -> - emqx_stats:getstat('connections.count'). - -messages_sent() -> - emqx_metrics:val('messages.sent'). - -messages_received() -> - emqx_metrics:val('messages.received'). - -generate_uuid() -> - MicroSeconds = erlang:system_time(microsecond), - Timestamp = MicroSeconds * 10 + ?GREGORIAN_EPOCH_OFFSET, - <> = <>, - <> = crypto:strong_rand_bytes(4), - <> = crypto:strong_rand_bytes(6), - <> = <<16#01:4, TimeHigh:12>>, - <> = <<1:1, 0:1, ClockSeq:14>>, - <> = <>, - list_to_binary(io_lib:format("~.16B-~.16B-~.16B-~.16B-~.16B", [TimeLow, TimeMid, NTimeHigh, NClockSeq, Node])). - -get_telemetry(#state{uuid = UUID}) -> - OSInfo = os_info(), - [{emqx_version, bin(emqx_version())}, - {license, license()}, - {os_name, bin(get_value(os_name, OSInfo))}, - {os_version, bin(get_value(os_version, OSInfo))}, - {otp_version, bin(otp_version())}, - {up_time, uptime()}, - {uuid, UUID}, - {nodes_uuid, nodes_uuid()}, - {active_plugins, active_plugins()}, - {active_modules, active_modules()}, - {num_clients, num_clients()}, - {messages_received, messages_received()}, - {messages_sent, messages_sent()}]. - -report_telemetry(State = #state{url = URL}) -> - Data = get_telemetry(State), - case emqx_json:safe_encode(Data) of - {ok, Bin} -> - case httpc_request(post, URL, [], Bin) of - {ok, {{_, StatusCode, _}, _, _}} - when StatusCode =:= 200 orelse StatusCode =:= 204 -> - ?LOG(debug, "Report ~p successfully", [Bin]); - {ok, {{_, StatusCode, ReasonPhrase}, _, Body}} -> - ?LOG(error, "Report ~p failed due to ~p ~s(~s)", [Bin, StatusCode, ReasonPhrase, Body]); - {error, Reason} -> - ?LOG(error, "Report ~p failed due to ~p", [Bin, Reason]) - end; - {error, Reason} -> - ?LOG(error, "Encode ~p failed due to ~p", [Data, Reason]) - end. - -httpc_request(Method, URL, Headers, Body) -> - httpc:request(Method, {URL, Headers, "application/json", Body}, [], []). - -ignore_lib_apps(Apps) -> - LibApps = [kernel, stdlib, sasl, appmon, eldap, erts, - syntax_tools, ssl, crypto, mnesia, os_mon, - inets, goldrush, gproc, runtime_tools, - snmp, otp_mibs, public_key, asn1, ssh, hipe, - common_test, observer, webtool, xmerl, tools, - test_server, compiler, debugger, eunit, et, - wx], - [AppName || {AppName, _, _} <- Apps, not lists:member(AppName, LibApps)]. - -search_license_callback() -> - search_license_callback(ignore_lib_apps(application:loaded_applications()), []). - -search_license_callback([], []) -> - {error, not_found}; -search_license_callback([], [Callback | _]) -> - Callback; -search_license_callback([App | More], Acc) -> - {ok, Modules} = application:get_key(App, modules), - Callbacks = lists:foldl(fun(Module, AccIn) -> - case proplists:get_value(license_callback, module_attributes(Module), undefined) of - undefined -> AccIn; - [Callback | _] -> [{Module, Callback} | AccIn] - end - end, [], Modules), - search_license_callback(More, Acc ++ Callbacks). - -module_attributes(Module) -> - try Module:module_info(attributes) - catch - error:undef -> []; - error:Reason -> error(Reason) - end. - -bin(L) when is_list(L) -> - list_to_binary(L); -bin(B) when is_binary(B) -> - B. \ No newline at end of file diff --git a/test/emqx_telemetry_SUITE.erl b/test/emqx_telemetry_SUITE.erl deleted file mode 100644 index 089abae48..000000000 --- a/test/emqx_telemetry_SUITE.erl +++ /dev/null @@ -1,65 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_telemetry_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). - --import(proplists, [get_value/2]). - -all() -> emqx_ct:all(?MODULE). - -init_per_testcase(_, Config) -> - emqx_ct_helpers:boot_modules(all), - emqx_ct_helpers:start_apps([]), - Config. - -end_per_testcase(_, _Config) -> - emqx_ct_helpers:stop_apps([]). - -t_uuid(_) -> - UUID = emqx_telemetry:generate_uuid(), - Parts = binary:split(UUID, <<"-">>, [global, trim]), - ?assertEqual(5, length(Parts)), - {ok, UUID2} = emqx_telemetry:get_uuid(), - emqx_telemetry:stop(), - emqx_telemetry:start_link([{enabled, true}, - {url, "https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry"}, - {report_interval, 7 * 24 * 60 * 60}]), - {ok, UUID3} = emqx_telemetry:get_uuid(), - ?assertEqual(UUID2, UUID3). - -t_get_telemetry(_) -> - {ok, TelemetryData} = emqx_telemetry:get_telemetry(), - OTPVersion = bin(erlang:system_info(otp_release)), - ?assertEqual(OTPVersion, get_value(otp_version, TelemetryData)), - {ok, UUID} = emqx_telemetry:get_uuid(), - ?assertEqual(UUID, get_value(uuid, TelemetryData)), - ?assertEqual(0, get_value(num_clients, TelemetryData)). - -t_enable(_) -> - ok = emqx_telemetry:enable(), - ?assertEqual(true, emqx_telemetry:is_enabled()), - ok = emqx_telemetry:disable(), - ?assertEqual(false, emqx_telemetry:is_enabled()). - -bin(L) when is_list(L) -> - list_to_binary(L); -bin(B) when is_binary(B) -> - B. \ No newline at end of file