feat(telemetry): add cluster uuid
This commit is contained in:
parent
d41389b34f
commit
1c71237abe
|
@ -47,7 +47,8 @@
|
|||
]).
|
||||
|
||||
-export([
|
||||
get_uuid/0,
|
||||
get_node_uuid/0,
|
||||
get_cluster_uuid/0,
|
||||
get_telemetry/0
|
||||
]).
|
||||
|
||||
|
@ -67,12 +68,13 @@
|
|||
]).
|
||||
|
||||
-record(telemetry, {
|
||||
id :: non_neg_integer(),
|
||||
id :: atom(),
|
||||
uuid :: binary()
|
||||
}).
|
||||
|
||||
-record(state, {
|
||||
uuid :: undefined | binary(),
|
||||
node_uuid :: undefined | binary(),
|
||||
cluster_uuid :: undefined | binary(),
|
||||
url :: string(),
|
||||
report_interval :: non_neg_integer(),
|
||||
timer = undefined :: undefined | reference(),
|
||||
|
@ -85,10 +87,12 @@
|
|||
%% 1582-10-15 00:00:00 and the UNIX epoch 1970-01-01 00:00:00.
|
||||
-define(GREGORIAN_EPOCH_OFFSET, 16#01b21dd213814000).
|
||||
|
||||
-define(UNIQUE_ID, 9527).
|
||||
-define(CLUSTER_UUID_KEY, cluster_uuid).
|
||||
|
||||
-define(TELEMETRY, emqx_telemetry).
|
||||
|
||||
-define(TELEMETRY_SHARD, emqx_telemetry_shard).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% API
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -99,7 +103,7 @@ start_link() ->
|
|||
[
|
||||
{type, set},
|
||||
{storage, disc_copies},
|
||||
{local_content, true},
|
||||
{rlog_shard, ?TELEMETRY_SHARD},
|
||||
{record_name, telemetry},
|
||||
{attributes, record_info(fields, telemetry)}
|
||||
]
|
||||
|
@ -117,8 +121,11 @@ enable() ->
|
|||
disable() ->
|
||||
gen_server:call(?MODULE, disable).
|
||||
|
||||
get_uuid() ->
|
||||
gen_server:call(?MODULE, get_uuid).
|
||||
%% @doc Synchronously ask the telemetry server (registered as ?MODULE) for
%% this node's UUID. The matching handle_call/3 clause replies `{ok, UUID}'
%% with the `node_uuid' field of the server state.
get_node_uuid() ->
|
||||
gen_server:call(?MODULE, get_node_uuid).
|
||||
|
||||
%% @doc Synchronously ask the telemetry server (registered as ?MODULE) for
%% the cluster UUID. The matching handle_call/3 clause replies `{ok, UUID}'
%% with the `cluster_uuid' field of the server state.
get_cluster_uuid() ->
|
||||
gen_server:call(?MODULE, get_cluster_uuid).
|
||||
|
||||
%% @doc Synchronously ask the telemetry server (registered as ?MODULE) for
%% the current telemetry report.
%% NOTE(review): the test suite matches the reply as `{ok, TelemetryData}';
%% the reply-building code is outside this view, so confirm there.
get_telemetry() ->
|
||||
gen_server:call(?MODULE, get_telemetry).
|
||||
|
@ -135,19 +142,8 @@ get_telemetry() ->
|
|||
-dialyzer([{nowarn_function, [init/1]}]).
|
||||
init(_Opts) ->
|
||||
State0 = empty_state(),
|
||||
UUID1 =
|
||||
case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of
|
||||
[] ->
|
||||
UUID = generate_uuid(),
|
||||
mria:dirty_write(?TELEMETRY, #telemetry{
|
||||
id = ?UNIQUE_ID,
|
||||
uuid = UUID
|
||||
}),
|
||||
UUID;
|
||||
[#telemetry{uuid = UUID} | _] ->
|
||||
UUID
|
||||
end,
|
||||
{ok, State0#state{uuid = UUID1}}.
|
||||
{NodeUUID, ClusterUUID} = ensure_uuids(),
|
||||
{ok, State0#state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID}}.
|
||||
|
||||
handle_call(enable, _From, State0) ->
|
||||
case ?MODULE:official_version(emqx_app:get_release()) of
|
||||
|
@ -165,7 +161,9 @@ handle_call(disable, _From, State = #state{timer = Timer}) ->
|
|||
false ->
|
||||
{reply, {error, not_official_version}, State}
|
||||
end;
|
||||
handle_call(get_uuid, _From, State = #state{uuid = UUID}) ->
|
||||
handle_call(get_node_uuid, _From, State = #state{node_uuid = UUID}) ->
|
||||
{reply, {ok, UUID}, State};
|
||||
handle_call(get_cluster_uuid, _From, State = #state{cluster_uuid = UUID}) ->
|
||||
{reply, {ok, UUID}, State};
|
||||
handle_call(get_telemetry, _From, State) ->
|
||||
{_State, Telemetry} = get_telemetry(State),
|
||||
|
@ -270,7 +268,7 @@ nodes_uuid() ->
|
|||
Nodes = lists:delete(node(), mria_mnesia:running_nodes()),
|
||||
lists:foldl(
|
||||
fun(Node, Acc) ->
|
||||
case emqx_telemetry_proto_v1:get_uuid(Node) of
|
||||
case emqx_telemetry_proto_v1:get_node_uuid(Node) of
|
||||
{badrpc, _Reason} ->
|
||||
Acc;
|
||||
{ok, UUID} ->
|
||||
|
@ -322,7 +320,7 @@ generate_uuid() ->
|
|||
).
|
||||
|
||||
-spec get_telemetry(state()) -> {state(), proplists:proplist()}.
|
||||
get_telemetry(State0 = #state{uuid = UUID}) ->
|
||||
get_telemetry(State0 = #state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID}) ->
|
||||
OSInfo = os_info(),
|
||||
{MQTTRTInsights, State} = mqtt_runtime_insights(State0),
|
||||
#{
|
||||
|
@ -336,7 +334,8 @@ get_telemetry(State0 = #state{uuid = UUID}) ->
|
|||
{os_version, bin(get_value(os_version, OSInfo))},
|
||||
{otp_version, bin(otp_version())},
|
||||
{up_time, uptime()},
|
||||
{uuid, UUID},
|
||||
{uuid, NodeUUID},
|
||||
{cluster_uuid, ClusterUUID},
|
||||
{nodes_uuid, nodes_uuid()},
|
||||
{active_plugins, active_plugins()},
|
||||
{num_clients, num_clients()},
|
||||
|
@ -519,6 +518,45 @@ bin(B) when is_binary(B) ->
|
|||
bool2int(true) -> 1;
|
||||
bool2int(false) -> 0.
|
||||
|
||||
%% @doc Make sure both the per-node UUID (keyed by `node()') and the cluster
%% UUID (keyed by ?CLUSTER_UUID_KEY) exist in the ?TELEMETRY table, creating
%% whichever one is missing, and return `{NodeUUID, ClusterUUID}'.
%%
%% Both lookups and any writes run inside a single mria transaction on
%% ?TELEMETRY_SHARD. Anything other than `{atomic, _}' crashes the caller
%% with a badmatch.
ensure_uuids() ->
    Txn = fun() ->
        %% Bound one after the other so the node record is always handled
        %% before the cluster record.
        NodeUUID = read_or_create_uuid(node()),
        ClusterUUID = read_or_create_uuid(?CLUSTER_UUID_KEY),
        {NodeUUID, ClusterUUID}
    end,
    {atomic, UUIDs} = mria:transaction(?TELEMETRY_SHARD, Txn),
    UUIDs.

%% Return the UUID stored under `Key' in ?TELEMETRY, generating and
%% persisting a fresh one when no record exists yet. Must be called from
%% inside a transaction: the record is read with a write lock
%% (mnesia:wread/1) so that the read-then-write sequence is protected.
read_or_create_uuid(Key) ->
    case mnesia:wread({?TELEMETRY, Key}) of
        [] ->
            UUID = generate_uuid(),
            mnesia:write(?TELEMETRY, #telemetry{id = Key, uuid = UUID}, write),
            UUID;
        [#telemetry{uuid = UUID}] ->
            UUID
    end.
|
||||
|
||||
empty_state() ->
|
||||
#state{
|
||||
url = ?TELEMETRY_URL,
|
||||
|
|
|
@ -20,7 +20,8 @@
|
|||
|
||||
-export([
|
||||
introduced_in/0,
|
||||
get_uuid/1
|
||||
get_node_uuid/1,
|
||||
get_cluster_uuid/1
|
||||
]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
@ -28,6 +29,10 @@
|
|||
%% Returns the version string "5.0.0".
%% NOTE(review): presumably the EMQX release in which this backplane API
%% module was introduced (the module includes bpapi.hrl) -- confirm against
%% the bpapi conventions.
introduced_in() ->
|
||||
"5.0.0".
|
||||
|
||||
-spec get_uuid(node()) -> {ok, binary()} | emqx_rpc:badrpc().
|
||||
get_uuid(Node) ->
|
||||
rpc:call(Node, emqx_telemetry, get_uuid, []).
|
||||
%% Call emqx_telemetry:get_node_uuid/0 on `Node' through rpc:call/4.
%% Yields whatever the remote call returns (`{ok, UUID}' per the spec) or a
%% `{badrpc, Reason}' tuple when the call cannot be carried out.
-spec get_node_uuid(node()) -> {ok, binary()} | emqx_rpc:badrpc().
|
||||
get_node_uuid(Node) ->
|
||||
rpc:call(Node, emqx_telemetry, get_node_uuid, []).
|
||||
|
||||
%% Call emqx_telemetry:get_cluster_uuid/0 on `Node' through rpc:call/4.
%% Yields whatever the remote call returns (`{ok, UUID}' per the spec) or a
%% `{badrpc, Reason}' tuple when the call cannot be carried out.
-spec get_cluster_uuid(node()) -> {ok, binary()} | emqx_rpc:badrpc().
|
||||
get_cluster_uuid(Node) ->
|
||||
rpc:call(Node, emqx_telemetry, get_cluster_uuid, []).
|
||||
|
|
|
@ -144,13 +144,7 @@ init_per_testcase(t_exhook_info, Config) ->
|
|||
{ok, _} = application:ensure_all_started(emqx_exhook),
|
||||
Config;
|
||||
init_per_testcase(_Testcase, Config) ->
|
||||
TestPID = self(),
|
||||
ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]),
|
||||
ok = meck:expect(httpc, request, fun(
|
||||
Method, {URL, Headers, _ContentType, Body}, _HTTPOpts, _Opts
|
||||
) ->
|
||||
TestPID ! {request, Method, URL, Headers, Body}
|
||||
end),
|
||||
mock_httpc(),
|
||||
Config.
|
||||
|
||||
end_per_testcase(t_get_telemetry, _Config) ->
|
||||
|
@ -198,20 +192,43 @@ end_per_testcase(_Testcase, _Config) ->
|
|||
meck:unload([httpc]),
|
||||
ok.
|
||||
|
||||
t_uuid(_) ->
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
t_node_uuid(_) ->
|
||||
UUID = emqx_telemetry:generate_uuid(),
|
||||
Parts = binary:split(UUID, <<"-">>, [global, trim]),
|
||||
?assertEqual(5, length(Parts)),
|
||||
{ok, UUID2} = emqx_telemetry:get_uuid(),
|
||||
{ok, NodeUUID2} = emqx_telemetry:get_node_uuid(),
|
||||
emqx_telemetry:disable(),
|
||||
emqx_telemetry:enable(),
|
||||
emqx_modules_conf:set_telemetry_status(false),
|
||||
emqx_modules_conf:set_telemetry_status(true),
|
||||
{ok, UUID3} = emqx_telemetry:get_uuid(),
|
||||
{ok, UUID4} = emqx_telemetry_proto_v1:get_uuid(node()),
|
||||
?assertEqual(UUID2, UUID3),
|
||||
?assertEqual(UUID3, UUID4),
|
||||
?assertMatch({badrpc, nodedown}, emqx_telemetry_proto_v1:get_uuid('fake@node')).
|
||||
{ok, NodeUUID3} = emqx_telemetry:get_node_uuid(),
|
||||
{ok, NodeUUID4} = emqx_telemetry_proto_v1:get_node_uuid(node()),
|
||||
?assertEqual(NodeUUID2, NodeUUID3),
|
||||
?assertEqual(NodeUUID3, NodeUUID4),
|
||||
?assertMatch({badrpc, nodedown}, emqx_telemetry_proto_v1:get_node_uuid('fake@node')).
|
||||
|
||||
%% Checks that the cluster UUID is the same whether read locally, through the
%% proto module on this node, or from a freshly joined slave node, while the
%% slave's node UUID differs from this node's.
t_cluster_uuid(_Config) ->
    %% Local API and proto call on this node must agree.
    {ok, LocalCluster} = emqx_telemetry:get_cluster_uuid(),
    {ok, ViaProto} = emqx_telemetry_proto_v1:get_cluster_uuid(node()),
    ?assertEqual(LocalCluster, ViaProto),
    {ok, LocalNode} = emqx_telemetry:get_node_uuid(),

    Slave = start_slave(n1),
    try
        ok = setup_slave(Slave),
        %% The slave must report the same cluster UUID ...
        {ok, SlaveCluster} = emqx_telemetry_proto_v1:get_cluster_uuid(Slave),
        ?assertEqual(LocalCluster, SlaveCluster),
        %% ... but a node UUID of its own.
        {ok, SlaveNode} = emqx_telemetry_proto_v1:get_node_uuid(Slave),
        ?assertNotEqual(LocalNode, SlaveNode),
        ok
    after
        %% Always tear the slave down, even when an assertion failed.
        ok = stop_slave(Slave)
    end,
    ok.
|
||||
|
||||
t_official_version(_) ->
|
||||
true = emqx_telemetry:official_version("0.0.0"),
|
||||
|
@ -231,8 +248,11 @@ t_get_telemetry(_Config) ->
|
|||
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
|
||||
OTPVersion = bin(erlang:system_info(otp_release)),
|
||||
?assertEqual(OTPVersion, get_value(otp_version, TelemetryData)),
|
||||
{ok, UUID} = emqx_telemetry:get_uuid(),
|
||||
?assertEqual(UUID, get_value(uuid, TelemetryData)),
|
||||
{ok, NodeUUID} = emqx_telemetry:get_node_uuid(),
|
||||
{ok, ClusterUUID} = emqx_telemetry:get_cluster_uuid(),
|
||||
?assertEqual(NodeUUID, get_value(uuid, TelemetryData)),
|
||||
?assertEqual(ClusterUUID, get_value(cluster_uuid, TelemetryData)),
|
||||
?assertNotEqual(NodeUUID, ClusterUUID),
|
||||
?assertEqual(0, get_value(num_clients, TelemetryData)),
|
||||
BuildInfo = get_value(build_info, TelemetryData),
|
||||
?assertMatch(
|
||||
|
@ -449,6 +469,10 @@ t_exhook_info(_Config) ->
|
|||
),
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
%% Assert that the float stored under `Key' in `Map', rendered with exactly
%% two decimals, equals the string `Expected' (e.g. "1.23").
%% Raises `{badkey, Key}' when the key is absent (maps:get/2).
assert_approximate(Map, Key, Expected) ->
    Rendered = float_to_list(maps:get(Key, Map), [{decimals, 2}]),
    ?assertEqual(Expected, Rendered).
|
||||
|
@ -562,3 +586,83 @@ set_special_configs(emqx_authz) ->
|
|||
ok;
|
||||
set_special_configs(_App) ->
|
||||
ok.
|
||||
|
||||
%% Start a linked slave node called `Name' on this node's host and return
%% its node name. Crashes with a badmatch if slave:start_link/3 does not
%% return `{ok, Node}'.
start_slave(Name) ->
    %% "+S 1:1": we want VMs to only occupy a single core.
    BeamArgs = "+S 1:1 " ++ ebin_path(),
    {ok, Node} = slave:start_link(host(), Name, BeamArgs),
    Node.
|
||||
|
||||
%% For some unknown reason, gen_rpc running locally or in CI might start
%% with different `port_discovery' modes: in `manual' mode it listens on the
%% port from the config (`tcp_server_port', 5369), while in `stateless' mode
%% it listens on 5370. Instead of guessing, ask the running server.
%%
%% Returns the TCP port number of the single port linked to the registered
%% `gen_rpc_server_tcp' process. Crashes if that process is not registered,
%% or if it is not linked to exactly one port.
find_gen_rpc_port() ->
    %% Request only the `links' item rather than the full process_info/1
    %% dump, which is documented as intended for debugging only.
    {links, Links} = process_info(whereis(gen_rpc_server_tcp), links),
    [EPort] = [Link || Link <- Links, is_port(Link)],
    {ok, {_, Port}} = inet:sockname(EPort),
    Port.
|
||||
|
||||
%% Prepare the slave `Node' so it can talk to this test node and join it:
%%   1. load the gen_rpc and emqx applications on the slave,
%%   2. point the slave's gen_rpc at this node's actual listening port,
%%   3. start emqx_conf and emqx_modules on the slave through
%%      emqx_common_test_helpers:start_apps/2, joining this node from the
%%      handler invoked for `emqx'.
%% Every step is asserted to return `ok'; returns `ok'.
setup_slave(Node) ->
    TestNode = node(),
    Port = find_gen_rpc_port(),
    lists:foreach(
        fun(App) -> ok = rpc:call(Node, application, load, [App]) end,
        [gen_rpc, emqx]
    ),
    %% gen_rpc settings for the slave, applied in this order.
    GenRpcEnv = [
        {tcp_server_port, 9002},
        {client_config_per_node, {internal, #{TestNode => Port}}},
        {port_discovery, manual}
    ],
    lists:foreach(
        fun({Key, Value}) ->
            ok = rpc:call(Node, application, set_env, [gen_rpc, Key, Value])
        end,
        GenRpcEnv
    ),
    %% Per-application handler passed to start_apps/2; only `emqx' needs
    %% special treatment.
    Handler = fun(App) ->
        case App of
            emqx ->
                application:set_env(emqx, boot_modules, []),
                ekka:join(TestNode),
                ok;
            _ ->
                ok
        end
    end,
    ok = rpc:call(
        Node,
        emqx_common_test_helpers,
        start_apps,
        [[emqx_conf, emqx_modules], Handler]
    ),
    ok.
|
||||
|
||||
%% Stop the slave node `Node' via slave:stop/1 and return its result
%% (callers in this suite match it against `ok').
stop_slave(Node) ->
|
||||
slave:stop(Node).
|
||||
|
||||
%% Return the host part of this node's name, i.e. what follows the "@" in
%% `node()', as a string. Crashes with a badmatch unless the node name
%% splits into exactly two non-empty parts around "@".
host() ->
    %% string:lexemes/2 replaces the obsolete string:tokens/2.
    [_Name, Host] = string:lexemes(atom_to_list(node()), "@"),
    Host.
|
||||
|
||||
%% Build a "-pa Dir1 Dir2 ..." command-line fragment from every entry of the
%% current code path that is_lib/1 accepts, joined with single spaces.
ebin_path() ->
    Dirs = [Dir || Dir <- code:get_path(), is_lib(Dir)],
    string:join(["-pa" | Dirs], " ").
|
||||
|
||||
%% Return `true' when `Path' does NOT start with the OTP library directory
%% (code:lib_dir/0), and `false' when it does.
is_lib(Path) ->
    case string:prefix(Path, code:lib_dir()) of
        nomatch -> true;
        _Rest -> false
    end.
|
||||
|
|
|
@ -59,6 +59,7 @@ init_per_testcase(t_status_fail, Config) ->
|
|||
init_per_testcase(t_status, Config) ->
|
||||
meck:new(emqx_telemetry, [non_strict, passthrough]),
|
||||
meck:expect(emqx_telemetry, official_version, 1, true),
|
||||
meck:expect(emqx_telemetry, enable, fun() -> ok end),
|
||||
Config;
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
Config.
|
||||
|
|
Loading…
Reference in New Issue