diff --git a/etc/emqx.conf b/etc/emqx.conf index 59ae3be38..9754cbfbf 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2187,4 +2187,33 @@ alarm.size_limit = 1000 ## Default: 24h alarm.validity_period = 24h +##-------------------------------------------------------------------- +## Telemetry +##-------------------------------------------------------------------- + +## Enable telemetry +## +## Value: true | false +## +## Default: false +telemetry.enabled = false + +## The destination URL for the telemetry data report +## +## Value: String +## +## Default: https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry +telemetry.url = https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry + +## Interval for reporting telemetry data +## +## Value: Duration +## -d: day +## -h: hour +## -m: minute +## -s: second +## +## Default: 7d +telemetry.report_interval = 7d + {{ additional_configs }} diff --git a/priv/emqx.schema b/priv/emqx.schema index 38aa55998..8b2d2a5af 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2165,3 +2165,28 @@ end}. {size_limit, cuttlefish:conf_get("alarm.size_limit", Conf)}, {validity_period, cuttlefish:conf_get("alarm.validity_period", Conf)}] end}. + +%%-------------------------------------------------------------------- +%% Telemetry +%%-------------------------------------------------------------------- +{mapping, "telemetry.enabled", "emqx.telemetry", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "telemetry.url", "emqx.telemetry", [ + {default, "https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry"}, + {datatype, string} +]}. + +{mapping, "telemetry.report_interval", "emqx.telemetry", [ + {default, "7d"}, + {datatype, {duration, s}} +]}. + +{translation, "emqx.telemetry", fun(Conf) -> + [ {enabled, cuttlefish:conf_get("telemetry.enabled", Conf)} + , {url, cuttlefish:conf_get("telemetry.url", Conf)} + , {report_interval, cuttlefish:conf_get("telemetry.report_interval", Conf)} + ] +end}. \ No newline at end of file diff --git a/src/emqx_kernel_sup.erl b/src/emqx_kernel_sup.erl index a1ba9cfe4..3864ad4ac 100644 --- a/src/emqx_kernel_sup.erl +++ b/src/emqx_kernel_sup.erl @@ -32,24 +32,29 @@ init([]) -> child_spec(emqx_hooks, worker), child_spec(emqx_stats, worker), child_spec(emqx_metrics, worker), + child_spec(emqx_telemetry, worker, [config(telemetry)]), child_spec(emqx_ctl, worker), child_spec(emqx_zone, worker)]}}. -child_spec(M, worker) -> +child_spec(M, Type) -> + child_spec(M, Type, []). + +child_spec(M, worker, Args) -> #{id => M, - start => {M, start_link, []}, + start => {M, start_link, Args}, restart => permanent, shutdown => 5000, type => worker, modules => [M] }; -child_spec(M, supervisor) -> +child_spec(M, supervisor, Args) -> #{id => M, - start => {M, start_link, []}, + start => {M, start_link, Args}, restart => permanent, shutdown => infinity, type => supervisor, modules => [M] }. +config(Name) -> emqx:get_env(Name, []). \ No newline at end of file diff --git a/src/emqx_telemetry.erl b/src/emqx_telemetry.erl new file mode 100644 index 000000000..33fb80f64 --- /dev/null +++ b/src/emqx_telemetry.erl @@ -0,0 +1,401 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_telemetry). + +-behaviour(gen_server). + +-include("emqx.hrl"). +-include("logger.hrl"). + + -include_lib("kernel/include/file.hrl"). + +-logger_header("[Telemetry]"). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-export([ start_link/1 + , stop/0 + ]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-export([ enable/0 + , disable/0 + , is_enabled/0 + , get_uuid/0 + , get_telemetry/0 + ]). + +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. + +-import(proplists, [ get_value/2 + , get_value/3 + ]). + +-record(telemetry, { + id :: non_neg_integer(), + + uuid :: binary(), + + enabled :: boolean() + }). + +-record(state, { + uuid :: undefined | binary(), + + enabled :: undefined | boolean(), + + url :: string(), + + report_interval :: undefined | non_neg_integer(), + + timer = undefined :: undefined | reference() + }). + +%% The count of 100-nanosecond intervals between the UUID epoch +%% 1582-10-15 00:00:00 and the UNIX epoch 1970-01-01 00:00:00. +-define(GREGORIAN_EPOCH_OFFSET, 16#01b21dd213814000). + +-define(UNIQUE_ID, 9527). + +-define(TELEMETRY, emqx_telemetry). + +%%-------------------------------------------------------------------- +%% Mnesia bootstrap +%%-------------------------------------------------------------------- + +mnesia(boot) -> + ok = ekka_mnesia:create_table(?TELEMETRY, + [{type, set}, + {disc_copies, [node()]}, + {local_content, true}, + {record_name, telemetry}, + {attributes, record_info(fields, telemetry)}]); +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?TELEMETRY, disc_copies). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +start_link(Opts) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). + +stop() -> + gen_server:stop(?MODULE). + +enable() -> + gen_server:call(?MODULE, enable). + +disable() -> + gen_server:call(?MODULE, disable). + +is_enabled() -> + gen_server:call(?MODULE, is_enabled). + +get_uuid() -> + gen_server:call(?MODULE, get_uuid). + +get_telemetry() -> + gen_server:call(?MODULE, get_telemetry). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([Opts]) -> + State = #state{url = get_value(url, Opts), + report_interval = timer:seconds(get_value(report_interval, Opts))}, + NState = case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of + [] -> + Enabled = get_value(enabled, Opts), + UUID = generate_uuid(), + mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, + uuid = UUID, + enabled = Enabled}), + State#state{enabled = Enabled, uuid = UUID}; + [#telemetry{uuid = UUID, enabled = Enabled} | _] -> + State#state{enabled = Enabled, uuid = UUID} + end, + {ok, ensure_first_report_timer(timer:seconds(1), NState)}. + +handle_call(enable, _From, State = #state{uuid = UUID}) -> + mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, + uuid = UUID, + enabled = true}), + {reply, ok, ensure_report_timer(State#state{enabled = true})}; + +handle_call(disable, _From, State = #state{uuid = UUID}) -> + mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, + uuid = UUID, + enabled = false}), + {reply, ok, State#state{enabled = false}}; + +handle_call(is_enabled, _From, State = #state{enabled = Enabled}) -> + {reply, Enabled, State}; + +handle_call(get_uuid, _From, State = #state{uuid = UUID}) -> + {reply, {ok, UUID}, State}; + +handle_call(get_telemetry, _From, State) -> + {reply, {ok, get_telemetry(State)}, State}; + +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + ?LOG(error, "Unexpected msg: ~p", [Msg]), + {noreply, State}. + +handle_info({timeout, TRef, time_to_report_telemetry_data}, State = #state{timer = TRef, + enabled = false}) -> + {noreply, State}; +handle_info({timeout, TRef, time_to_report_telemetry_data}, State = #state{timer = TRef}) -> + report_telemetry(State), + {noreply, ensure_report_timer(State)}; + +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + +ensure_first_report_timer(FirstReportInterval, State) -> + State#state{timer = emqx_misc:start_timer(FirstReportInterval, time_to_report_telemetry_data)}. + +ensure_report_timer(State = #state{report_interval = ReportInterval}) -> + State#state{timer = emqx_misc:start_timer(ReportInterval, time_to_report_telemetry_data)}. + +emqx_version() -> + {ok, Version} = application:get_key(emqx, vsn), + Version. + +license() -> + case application:get_key(emqx, description) of + {ok, "EMQ X Broker"} -> + [{edition, <<"community">>}]; + {ok, "EMQ X Enterprise"} -> + case search_license_callback() of + {error, not_found} -> + [{edition, <<"enterprise">>}]; + {M, F} -> + case erlang:function_exported(M, F, 0) of + true -> + erlang:apply(M, F, []); + false -> + [{edition, <<"enterprise">>}] + end + end + end. + +os_info() -> + case erlang:system_info(os_type) of + {unix,darwin} -> + [Name | _] = string:tokens(os:cmd("sw_vers -productName"), "\n"), + [Version | _] = string:tokens(os:cmd("sw_vers -productVersion"), "\n"), + [{os_name, Name}, + {os_version, Version}]; + {unix, _} -> + case file:read_file_info("/etc/os-release") of + {error, _} -> + [{os_name, "Unknown"}, + {os_version, "Unknown"}]; + {ok, FileInfo} -> + case FileInfo#file_info.access of + Access when Access =:= read orelse Access =:= read_write -> + OSInfo = lists:foldl(fun(Line, Acc) -> + [Var, Value] = string:tokens(Line, "="), + NValue = case Value of + _ when is_list(Value) -> + lists:nth(1, string:tokens(Value, "\"")); + _ -> + Value + end, + [{Var, NValue} | Acc] + end, [], string:tokens(os:cmd("cat /etc/os-release"), "\n")), + [{os_name, get_value("NAME", OSInfo)}, + {os_version, get_value("VERSION", OSInfo, get_value("VERSION_ID", OSInfo))}]; + _ -> + [{os_name, "Unknown"}, + {os_version, "Unknown"}] + end + end; + {win32, nt} -> + Ver = os:cmd("ver"), + case re:run(Ver, "[a-zA-Z ]+ \\[Version ([0-9]+[\.])+[0-9]+\\]", [{capture, none}]) of + match -> + [NVer | _] = string:tokens(Ver, "\r\n"), + {match, [Version]} = re:run(NVer, "([0-9]+[\.])+[0-9]+", [{capture, first, list}]), + [Name | _] = string:split(NVer, " [Version "), + [{os_name, Name}, + {os_version, Version}]; + nomatch -> + [{os_name, "Unknown"}, + {os_version, "Unknown"}] + end + end. + +otp_version() -> + erlang:system_info(otp_release). + +uptime() -> + element(1, erlang:statistics(wall_clock)). + +nodes_uuid() -> + Nodes = lists:delete(node(), ekka_mnesia:running_nodes()), + lists:foldl(fun(Node, Acc) -> + case rpc:call(Node, ?MODULE, get_uuid, []) of + {badrpc, _Reason} -> + Acc; + UUID -> + [UUID | Acc] + end + end, [], Nodes). + +active_plugins() -> + lists:foldl(fun(#plugin{name = Name, active = Active}, Acc) -> + case Active of + true -> [Name | Acc]; + false -> Acc + end + end, [], emqx_plugins:list()). + +active_modules() -> + lists:foldl(fun({Name, Persistent}, Acc) -> + case Persistent of + true -> [Name | Acc]; + false -> Acc + end + end, [], emqx_modules:list()). + +num_clients() -> + emqx_stats:getstat('connections.count'). + +messages_sent() -> + emqx_metrics:val('messages.sent'). + +messages_received() -> + emqx_metrics:val('messages.received'). + +generate_uuid() -> + MicroSeconds = erlang:system_time(microsecond), + Timestamp = MicroSeconds * 10 + ?GREGORIAN_EPOCH_OFFSET, + <> = <>, + <> = crypto:strong_rand_bytes(4), + <> = crypto:strong_rand_bytes(6), + <> = <<16#01:4, TimeHigh:12>>, + <> = <<1:1, 0:1, ClockSeq:14>>, + <> = <>, + list_to_binary(io_lib:format("~.16B-~.16B-~.16B-~.16B-~.16B", [TimeLow, TimeMid, NTimeHigh, NClockSeq, Node])). + +get_telemetry(#state{uuid = UUID}) -> + OSInfo = os_info(), + [{emqx_version, bin(emqx_version())}, + {license, license()}, + {os_name, bin(get_value(os_name, OSInfo))}, + {os_version, bin(get_value(os_version, OSInfo))}, + {otp_version, bin(otp_version())}, + {up_time, uptime()}, + {uuid, UUID}, + {nodes_uuid, nodes_uuid()}, + {active_plugins, active_plugins()}, + {active_modules, active_modules()}, + {num_clients, num_clients()}, + {messages_received, messages_received()}, + {messages_sent, messages_sent()}]. + +report_telemetry(State = #state{url = URL}) -> + Data = get_telemetry(State), + case emqx_json:safe_encode(Data) of + {ok, Bin} -> + case httpc_request(post, URL, [], Bin) of + {ok, {{_, StatusCode, _}, _, _}} + when StatusCode =:= 200 orelse StatusCode =:= 204 -> + ?LOG(debug, "Report ~p successfully", [Bin]); + {ok, {{_, StatusCode, ReasonPhrase}, _, Body}} -> + ?LOG(error, "Report ~p failed due to ~p ~s(~s)", [Bin, StatusCode, ReasonPhrase, Body]); + {error, Reason} -> + ?LOG(error, "Report ~p failed due to ~p", [Bin, Reason]) + end; + {error, Reason} -> + ?LOG(error, "Encode ~p failed due to ~p", [Data, Reason]) + end. + +httpc_request(Method, URL, Headers, Body) -> + httpc:request(Method, {URL, Headers, "application/json", Body}, [], []). + +ignore_lib_apps(Apps) -> + LibApps = [kernel, stdlib, sasl, appmon, eldap, erts, + syntax_tools, ssl, crypto, mnesia, os_mon, + inets, goldrush, gproc, runtime_tools, + snmp, otp_mibs, public_key, asn1, ssh, hipe, + common_test, observer, webtool, xmerl, tools, + test_server, compiler, debugger, eunit, et, + wx], + [AppName || {AppName, _, _} <- Apps, not lists:member(AppName, LibApps)]. + +search_license_callback() -> + search_license_callback(ignore_lib_apps(application:loaded_applications()), []). + +search_license_callback([], []) -> + {error, not_found}; +search_license_callback([], [Callback | _]) -> + Callback; +search_license_callback([App | More], Acc) -> + {ok, Modules} = application:get_key(App, modules), + Callbacks = lists:foldl(fun(Module, AccIn) -> + case proplists:get_value(license_callback, module_attributes(Module), undefined) of + undefined -> AccIn; + [Callback | _] -> [{Module, Callback} | AccIn] + end + end, [], Modules), + search_license_callback(More, Acc ++ Callbacks). + +module_attributes(Module) -> + try Module:module_info(attributes) + catch + error:undef -> []; + error:Reason -> error(Reason) + end. + +bin(L) when is_list(L) -> + list_to_binary(L); +bin(B) when is_binary(B) -> + B. \ No newline at end of file diff --git a/test/emqx_telemetry_SUITE.erl b/test/emqx_telemetry_SUITE.erl new file mode 100644 index 000000000..089abae48 --- /dev/null +++ b/test/emqx_telemetry_SUITE.erl @@ -0,0 +1,65 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_telemetry_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-import(proplists, [get_value/2]). + +all() -> emqx_ct:all(?MODULE). + +init_per_testcase(_, Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([]), + Config. + +end_per_testcase(_, _Config) -> + emqx_ct_helpers:stop_apps([]). + +t_uuid(_) -> + UUID = emqx_telemetry:generate_uuid(), + Parts = binary:split(UUID, <<"-">>, [global, trim]), + ?assertEqual(5, length(Parts)), + {ok, UUID2} = emqx_telemetry:get_uuid(), + emqx_telemetry:stop(), + emqx_telemetry:start_link([{enabled, true}, + {url, "https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry"}, + {report_interval, 7 * 24 * 60 * 60}]), + {ok, UUID3} = emqx_telemetry:get_uuid(), + ?assertEqual(UUID2, UUID3). + +t_get_telemetry(_) -> + {ok, TelemetryData} = emqx_telemetry:get_telemetry(), + OTPVersion = bin(erlang:system_info(otp_release)), + ?assertEqual(OTPVersion, get_value(otp_version, TelemetryData)), + {ok, UUID} = emqx_telemetry:get_uuid(), + ?assertEqual(UUID, get_value(uuid, TelemetryData)), + ?assertEqual(0, get_value(num_clients, TelemetryData)). + +t_enable(_) -> + ok = emqx_telemetry:enable(), + ?assertEqual(true, emqx_telemetry:is_enabled()), + ok = emqx_telemetry:disable(), + ?assertEqual(false, emqx_telemetry:is_enabled()). + +bin(L) when is_list(L) -> + list_to_binary(L); +bin(B) when is_binary(B) -> + B. \ No newline at end of file