diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 38966b854..45ca1a7b0 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -47,7 +47,8 @@ ]). -export([ - get_uuid/0, + get_node_uuid/0, + get_cluster_uuid/0, get_telemetry/0 ]). @@ -67,12 +68,13 @@ ]). -record(telemetry, { - id :: non_neg_integer(), + id :: atom(), uuid :: binary() }). -record(state, { - uuid :: undefined | binary(), + node_uuid :: undefined | binary(), + cluster_uuid :: undefined | binary(), url :: string(), report_interval :: non_neg_integer(), timer = undefined :: undefined | reference(), @@ -85,10 +87,12 @@ %% 1582-10-15 00:00:00 and the UNIX epoch 1970-01-01 00:00:00. -define(GREGORIAN_EPOCH_OFFSET, 16#01b21dd213814000). --define(UNIQUE_ID, 9527). +-define(CLUSTER_UUID_KEY, cluster_uuid). -define(TELEMETRY, emqx_telemetry). +-define(TELEMETRY_SHARD, emqx_telemetry_shard). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -99,7 +103,7 @@ start_link() -> [ {type, set}, {storage, disc_copies}, - {local_content, true}, + {rlog_shard, ?TELEMETRY_SHARD}, {record_name, telemetry}, {attributes, record_info(fields, telemetry)} ] @@ -117,8 +121,11 @@ enable() -> disable() -> gen_server:call(?MODULE, disable). -get_uuid() -> - gen_server:call(?MODULE, get_uuid). +get_node_uuid() -> + gen_server:call(?MODULE, get_node_uuid). + +get_cluster_uuid() -> + gen_server:call(?MODULE, get_cluster_uuid). get_telemetry() -> gen_server:call(?MODULE, get_telemetry). @@ -135,19 +142,8 @@ get_telemetry() -> -dialyzer([{nowarn_function, [init/1]}]). init(_Opts) -> State0 = empty_state(), - UUID1 = - case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of - [] -> - UUID = generate_uuid(), - mria:dirty_write(?TELEMETRY, #telemetry{ - id = ?UNIQUE_ID, - uuid = UUID - }), - UUID; - [#telemetry{uuid = UUID} | _] -> - UUID - end, - {ok, State0#state{uuid = UUID1}}. + {NodeUUID, ClusterUUID} = ensure_uuids(), + {ok, State0#state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID}}. handle_call(enable, _From, State0) -> case ?MODULE:official_version(emqx_app:get_release()) of @@ -165,7 +161,9 @@ handle_call(disable, _From, State = #state{timer = Timer}) -> false -> {reply, {error, not_official_version}, State} end; -handle_call(get_uuid, _From, State = #state{uuid = UUID}) -> +handle_call(get_node_uuid, _From, State = #state{node_uuid = UUID}) -> + {reply, {ok, UUID}, State}; +handle_call(get_cluster_uuid, _From, State = #state{cluster_uuid = UUID}) -> {reply, {ok, UUID}, State}; handle_call(get_telemetry, _From, State) -> {_State, Telemetry} = get_telemetry(State), @@ -270,7 +268,7 @@ nodes_uuid() -> Nodes = lists:delete(node(), mria_mnesia:running_nodes()), lists:foldl( fun(Node, Acc) -> - case emqx_telemetry_proto_v1:get_uuid(Node) of + case emqx_telemetry_proto_v1:get_node_uuid(Node) of {badrpc, _Reason} -> Acc; {ok, UUID} -> @@ -322,7 +320,7 @@ generate_uuid() -> ). -spec get_telemetry(state()) -> {state(), proplists:proplist()}. -get_telemetry(State0 = #state{uuid = UUID}) -> +get_telemetry(State0 = #state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID}) -> OSInfo = os_info(), {MQTTRTInsights, State} = mqtt_runtime_insights(State0), #{ @@ -336,7 +334,8 @@ get_telemetry(State0 = #state{uuid = UUID}) -> {os_version, bin(get_value(os_version, OSInfo))}, {otp_version, bin(otp_version())}, {up_time, uptime()}, - {uuid, UUID}, + {uuid, NodeUUID}, + {cluster_uuid, ClusterUUID}, {nodes_uuid, nodes_uuid()}, {active_plugins, active_plugins()}, {num_clients, num_clients()}, @@ -519,6 +518,45 @@ bin(B) when is_binary(B) -> bool2int(true) -> 1; bool2int(false) -> 0. +ensure_uuids() -> + Txn = fun() -> + NodeUUID = + case mnesia:wread({?TELEMETRY, node()}) of + [] -> + NodeUUID0 = generate_uuid(), + mnesia:write( + ?TELEMETRY, + #telemetry{ + id = node(), + uuid = NodeUUID0 + }, + write + ), + NodeUUID0; + [#telemetry{uuid = NodeUUID0}] -> + NodeUUID0 + end, + ClusterUUID = + case mnesia:wread({?TELEMETRY, ?CLUSTER_UUID_KEY}) of + [] -> + ClusterUUID0 = generate_uuid(), + mnesia:write( + ?TELEMETRY, + #telemetry{ + id = ?CLUSTER_UUID_KEY, + uuid = ClusterUUID0 + }, + write + ), + ClusterUUID0; + [#telemetry{uuid = ClusterUUID0}] -> + ClusterUUID0 + end, + {NodeUUID, ClusterUUID} + end, + {atomic, UUIDs} = mria:transaction(?TELEMETRY_SHARD, Txn), + UUIDs. + empty_state() -> #state{ url = ?TELEMETRY_URL, diff --git a/apps/emqx_modules/src/proto/emqx_telemetry_proto_v1.erl b/apps/emqx_modules/src/proto/emqx_telemetry_proto_v1.erl index 91af2bdfd..ebd23e534 100644 --- a/apps/emqx_modules/src/proto/emqx_telemetry_proto_v1.erl +++ b/apps/emqx_modules/src/proto/emqx_telemetry_proto_v1.erl @@ -20,7 +20,8 @@ -export([ introduced_in/0, - get_uuid/1 + get_node_uuid/1, + get_cluster_uuid/1 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -28,6 +29,10 @@ introduced_in() -> "5.0.0". --spec get_uuid(node()) -> {ok, binary()} | emqx_rpc:badrpc(). -get_uuid(Node) -> - rpc:call(Node, emqx_telemetry, get_uuid, []). +-spec get_node_uuid(node()) -> {ok, binary()} | emqx_rpc:badrpc(). +get_node_uuid(Node) -> + rpc:call(Node, emqx_telemetry, get_node_uuid, []). + +-spec get_cluster_uuid(node()) -> {ok, binary()} | emqx_rpc:badrpc(). +get_cluster_uuid(Node) -> + rpc:call(Node, emqx_telemetry, get_cluster_uuid, []). diff --git a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl index fd04d5224..0c3740524 100644 --- a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -144,13 +144,7 @@ init_per_testcase(t_exhook_info, Config) -> {ok, _} = application:ensure_all_started(emqx_exhook), Config; init_per_testcase(_Testcase, Config) -> - TestPID = self(), - ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]), - ok = meck:expect(httpc, request, fun( - Method, {URL, Headers, _ContentType, Body}, _HTTPOpts, _Opts - ) -> - TestPID ! {request, Method, URL, Headers, Body} - end), + mock_httpc(), Config. end_per_testcase(t_get_telemetry, _Config) -> @@ -198,20 +192,43 @@ end_per_testcase(_Testcase, _Config) -> meck:unload([httpc]), ok. -t_uuid(_) -> +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + +t_node_uuid(_) -> UUID = emqx_telemetry:generate_uuid(), Parts = binary:split(UUID, <<"-">>, [global, trim]), ?assertEqual(5, length(Parts)), - {ok, UUID2} = emqx_telemetry:get_uuid(), + {ok, NodeUUID2} = emqx_telemetry:get_node_uuid(), emqx_telemetry:disable(), emqx_telemetry:enable(), emqx_modules_conf:set_telemetry_status(false), emqx_modules_conf:set_telemetry_status(true), - {ok, UUID3} = emqx_telemetry:get_uuid(), - {ok, UUID4} = emqx_telemetry_proto_v1:get_uuid(node()), - ?assertEqual(UUID2, UUID3), - ?assertEqual(UUID3, UUID4), - ?assertMatch({badrpc, nodedown}, emqx_telemetry_proto_v1:get_uuid('fake@node')). + {ok, NodeUUID3} = emqx_telemetry:get_node_uuid(), + {ok, NodeUUID4} = emqx_telemetry_proto_v1:get_node_uuid(node()), + ?assertEqual(NodeUUID2, NodeUUID3), + ?assertEqual(NodeUUID3, NodeUUID4), + ?assertMatch({badrpc, nodedown}, emqx_telemetry_proto_v1:get_node_uuid('fake@node')). + +t_cluster_uuid(_Config) -> + {ok, ClusterUUID0} = emqx_telemetry:get_cluster_uuid(), + {ok, ClusterUUID1} = emqx_telemetry_proto_v1:get_cluster_uuid(node()), + ?assertEqual(ClusterUUID0, ClusterUUID1), + {ok, NodeUUID0} = emqx_telemetry:get_node_uuid(), + + Node = start_slave(n1), + try + ok = setup_slave(Node), + {ok, ClusterUUID2} = emqx_telemetry_proto_v1:get_cluster_uuid(Node), + ?assertEqual(ClusterUUID0, ClusterUUID2), + {ok, NodeUUID1} = emqx_telemetry_proto_v1:get_node_uuid(Node), + ?assertNotEqual(NodeUUID0, NodeUUID1), + ok + after + ok = stop_slave(Node) + end, + ok. t_official_version(_) -> true = emqx_telemetry:official_version("0.0.0"), @@ -231,8 +248,11 @@ t_get_telemetry(_Config) -> {ok, TelemetryData} = emqx_telemetry:get_telemetry(), OTPVersion = bin(erlang:system_info(otp_release)), ?assertEqual(OTPVersion, get_value(otp_version, TelemetryData)), - {ok, UUID} = emqx_telemetry:get_uuid(), - ?assertEqual(UUID, get_value(uuid, TelemetryData)), + {ok, NodeUUID} = emqx_telemetry:get_node_uuid(), + {ok, ClusterUUID} = emqx_telemetry:get_cluster_uuid(), + ?assertEqual(NodeUUID, get_value(uuid, TelemetryData)), + ?assertEqual(ClusterUUID, get_value(cluster_uuid, TelemetryData)), + ?assertNotEqual(NodeUUID, ClusterUUID), ?assertEqual(0, get_value(num_clients, TelemetryData)), BuildInfo = get_value(build_info, TelemetryData), ?assertMatch( @@ -449,6 +469,10 @@ t_exhook_info(_Config) -> ), ok. +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + assert_approximate(Map, Key, Expected) -> Value = maps:get(Key, Map), ?assertEqual(Expected, float_to_list(Value, [{decimals, 2}])). @@ -562,3 +586,83 @@ set_special_configs(emqx_authz) -> ok; set_special_configs(_App) -> ok. + +start_slave(Name) -> + % We want VMs to only occupy a single core + CommonBeamOpts = "+S 1:1 ", + {ok, Node} = slave:start_link(host(), Name, CommonBeamOpts ++ ebin_path()), + Node. + +%% for some unknown reason, gen_rpc running locally or in CI might +%% start with different `port_discovery' modes, which means that'll +%% either be listening at the port in the config (`tcp_server_port', +%% 5369) if `manual', else it'll listen on 5370 if started as +%% `stateless'. +find_gen_rpc_port() -> + [EPort] = [ + EPort + || {links, Ls} <- process_info(whereis(gen_rpc_server_tcp)), + EPort <- Ls, + is_port(EPort) + ], + {ok, {_, Port}} = inet:sockname(EPort), + Port. + +setup_slave(Node) -> + TestNode = node(), + Port = find_gen_rpc_port(), + [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]], + ok = rpc:call( + Node, + application, + set_env, + [gen_rpc, tcp_server_port, 9002] + ), + ok = rpc:call( + Node, + application, + set_env, + [gen_rpc, client_config_per_node, {internal, #{TestNode => Port}}] + ), + ok = rpc:call( + Node, + application, + set_env, + [gen_rpc, port_discovery, manual] + ), + Handler = + fun + (emqx) -> + application:set_env( + emqx, + boot_modules, + [] + ), + ekka:join(TestNode), + ok; + (_) -> + ok + end, + ok = rpc:call( + Node, + emqx_common_test_helpers, + start_apps, + [ + [emqx_conf, emqx_modules], + Handler + ] + ), + ok. + +stop_slave(Node) -> + slave:stop(Node). + +host() -> + [_, Host] = string:tokens(atom_to_list(node()), "@"), + Host. + +ebin_path() -> + string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " "). + +is_lib(Path) -> + string:prefix(Path, code:lib_dir()) =:= nomatch. diff --git a/apps/emqx_modules/test/emqx_telemetry_api_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_api_SUITE.erl index d178b32d8..6ad1da219 100644 --- a/apps/emqx_modules/test/emqx_telemetry_api_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_api_SUITE.erl @@ -59,6 +59,7 @@ init_per_testcase(t_status_fail, Config) -> init_per_testcase(t_status, Config) -> meck:new(emqx_telemetry, [non_strict, passthrough]), meck:expect(emqx_telemetry, official_version, 1, true), + meck:expect(emqx_telemetry, enable, fun() -> ok end), Config; init_per_testcase(_TestCase, Config) -> Config.