emqx/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl

836 lines
27 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_telemetry_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-import(proplists, [get_value/2]).
-define(MODULES_CONF, #{
<<"dealyed">> => <<"true">>,
<<"max_delayed_messages">> => <<"0">>
}).
all() -> emqx_common_test_helpers:all(?MODULE).
suite() ->
[
{timetrap, {minutes, 1}},
{repeat, 1}
].
apps() ->
[
emqx_conf,
emqx_connector,
emqx_retainer,
emqx_auth,
emqx_auth_redis,
emqx_auth_mnesia,
emqx_auth_postgresql,
emqx_modules,
emqx_telemetry,
emqx_bridge_http,
emqx_bridge,
emqx_rule_engine,
emqx_management
].
init_per_suite(Config) ->
WorkDir = ?config(priv_dir, Config),
Apps = emqx_cth_suite:start(apps(), #{work_dir => WorkDir}),
emqx_mgmt_api_test_util:init_suite(),
[{apps, Apps}, {work_dir, WorkDir} | Config].
end_per_suite(Config) ->
mnesia:clear_table(cluster_rpc_commit),
mnesia:clear_table(cluster_rpc_mfa),
Apps = ?config(apps, Config),
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(t_get_telemetry_without_memsup, Config) ->
ok = application:stop(os_mon),
init_per_testcase(t_get_telemetry, Config);
init_per_testcase(t_get_telemetry, Config) ->
DataDir = ?config(data_dir, Config),
mock_httpc(),
ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]),
ok = meck:expect(
emqx_telemetry,
read_raw_build_info,
fun() ->
Path = filename:join([DataDir, "BUILD_INFO"]),
{ok, Template} = file:read_file(Path),
Vars0 = [
{build_info_arch, "arch"},
{build_info_wordsize, "64"},
{build_info_os, "os"},
{build_info_erlang, "erlang"},
{build_info_elixir, "elixir"},
{build_info_relform, "relform"}
],
Vars = [
{atom_to_list(K), iolist_to_binary(V)}
|| {K, V} <- Vars0
],
Rendered = bbmustache:render(Template, Vars),
{ok, Rendered}
end
),
Lwm2mDataDir = emqx_common_test_helpers:deps_path(
emqx_gateway,
"test/emqx_gateway_SUITE_data"
),
ok = emqx_gateway_SUITE:setup_fake_usage_data(Lwm2mDataDir),
emqx_common_test_helpers:start_apps([emqx_gateway]),
Config;
init_per_testcase(t_advanced_mqtt_features, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
{atomic, ok} = mria:clear_table(emqx_delayed),
mock_advanced_mqtt_features(),
Config;
init_per_testcase(t_authn_authz_info, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
create_authn('mqtt:global', built_in_database),
create_authn('tcp:default', redis),
create_authn('ws:default', redis),
create_authz(postgresql),
Config;
init_per_testcase(t_enable, Config) ->
ok = meck:new(emqx_telemetry_config, [non_strict, passthrough, no_history, no_link]),
ok = meck:expect(emqx_telemetry_config, is_official_version, fun(_) -> true end),
mock_httpc(),
Config;
init_per_testcase(t_send_after_enable, Config) ->
ok = meck:new(emqx_telemetry_config, [non_strict, passthrough, no_history, no_link]),
ok = meck:expect(emqx_telemetry_config, is_official_version, fun(_) -> true end),
mock_httpc(),
Config;
init_per_testcase(t_rule_engine_and_data_bridge_info, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
ok = emqx_bridge_SUITE:setup_fake_telemetry_data(),
ok = setup_fake_rule_engine_data(),
Config;
init_per_testcase(t_exhook_info, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
ExhookConf =
#{
<<"exhook">> =>
#{
<<"servers">> =>
[
#{
<<"name">> => "myhook",
<<"url">> => "http://127.0.0.1:9000"
}
]
}
},
{ok, _} = emqx_exhook_demo_svr:start(),
{ok, Sock} = gen_tcp:connect("localhost", 9000, [], 3000),
_ = gen_tcp:close(Sock),
ok = emqx_common_test_helpers:load_config(emqx_exhook_schema, ExhookConf),
emqx_common_test_helpers:start_apps([emqx_exhook]),
Config;
init_per_testcase(t_cluster_uuid, Config) ->
Node = start_peer(n1),
[{n1, Node} | Config];
init_per_testcase(t_uuid_restored_from_file, Config) ->
Config;
init_per_testcase(t_uuid_saved_to_file, Config) ->
DataDir = emqx:data_dir(),
NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
file:delete(NodeUUIDFile),
file:delete(ClusterUUIDFile),
Config;
init_per_testcase(t_num_clients, Config) ->
ok = snabbkaffe:start_trace(),
Config;
init_per_testcase(_Testcase, Config) ->
mock_httpc(),
Config.
end_per_testcase(t_get_telemetry_without_memsup, Config) ->
application:start(os_mon),
end_per_testcase(t_get_telemetry, Config);
end_per_testcase(t_get_telemetry, _Config) ->
meck:unload([httpc, emqx_telemetry]),
application:stop(emqx_gateway),
ok;
end_per_testcase(t_advanced_mqtt_features, _Config) ->
process_flag(trap_exit, true),
ok = emqx_retainer:clean(),
{ok, _} = emqx_auto_subscribe:update([]),
ok = emqx_rewrite:update([]),
{atomic, ok} = mria:clear_table(emqx_delayed),
ok;
end_per_testcase(t_authn_authz_info, _Config) ->
emqx_authz:update({delete, postgresql}, #{}),
lists:foreach(
fun(ChainName) ->
catch emqx_authn_test_lib:delete_authenticators(
[authentication],
ChainName
)
end,
['mqtt:global', 'tcp:default', 'ws:default']
),
ok;
end_per_testcase(t_enable, _Config) ->
meck:unload([httpc, emqx_telemetry_config]);
end_per_testcase(t_send_after_enable, _Config) ->
meck:unload([httpc, emqx_telemetry_config]);
end_per_testcase(t_rule_engine_and_data_bridge_info, _Config) ->
ok;
end_per_testcase(t_exhook_info, _Config) ->
emqx_exhook_demo_svr:stop(),
application:stop(emqx_exhook),
ok;
end_per_testcase(t_cluster_uuid, Config) ->
Node = proplists:get_value(n1, Config),
ok = stop_peer(Node);
end_per_testcase(t_num_clients, Config) ->
ok = snabbkaffe:stop(),
Config;
end_per_testcase(_Testcase, _Config) ->
case catch meck:unload([httpc]) of
_ -> ok
end,
ok.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
t_node_uuid(_) ->
UUID = emqx_telemetry:generate_uuid(),
Parts = binary:split(UUID, <<"-">>, [global, trim]),
?assertEqual(5, length(Parts)),
{ok, NodeUUID2} = emqx_telemetry:get_node_uuid(),
emqx_telemetry:stop_reporting(),
emqx_telemetry:start_reporting(),
emqx_telemetry_config:set_telemetry_status(false),
emqx_telemetry_config:set_telemetry_status(true),
{ok, NodeUUID3} = emqx_telemetry:get_node_uuid(),
{ok, NodeUUID4} = emqx_telemetry_proto_v1:get_node_uuid(node()),
?assertEqual(NodeUUID2, NodeUUID3),
?assertEqual(NodeUUID3, NodeUUID4),
?assertMatch({badrpc, nodedown}, emqx_telemetry_proto_v1:get_node_uuid('fake@node')).
t_cluster_uuid(Config) ->
Node = proplists:get_value(n1, Config),
{ok, ClusterUUID0} = emqx_telemetry:get_cluster_uuid(),
{ok, ClusterUUID1} = emqx_telemetry_proto_v1:get_cluster_uuid(node()),
?assertEqual(ClusterUUID0, ClusterUUID1),
{ok, NodeUUID0} = emqx_telemetry:get_node_uuid(),
{ok, ClusterUUID2} = emqx_telemetry_proto_v1:get_cluster_uuid(Node),
?assertEqual(ClusterUUID0, ClusterUUID2),
{ok, NodeUUID1} = emqx_telemetry_proto_v1:get_node_uuid(Node),
?assertNotEqual(NodeUUID0, NodeUUID1),
ok.
%% should attempt read UUID from file in data dir to keep UUIDs
%% unique, in the event of a database purge.
t_uuid_restored_from_file(Config) ->
%% Stop the emqx_telemetry application first
{atomic, ok} = mria:clear_table(emqx_telemetry),
application:stop(emqx_telemetry),
%% Rewrite the the uuid files
NodeUUID = <<"AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE">>,
ClusterUUID = <<"FFFFFFFF-GGGG-HHHH-IIII-JJJJJJJJJJJJ">>,
DataDir = ?config(work_dir, Config),
NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
ok = file:write_file(NodeUUIDFile, NodeUUID),
ok = file:write_file(ClusterUUIDFile, ClusterUUID),
%% Start the emqx_telemetry application again
application:start(emqx_telemetry),
%% Check the UUIDs
?assertEqual(
{ok, NodeUUID},
emqx_telemetry:get_node_uuid()
),
?assertEqual(
{ok, ClusterUUID},
emqx_telemetry:get_cluster_uuid()
),
ok.
t_uuid_saved_to_file(_Config) ->
DataDir = emqx:data_dir(),
NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
%% preconditions
?assertEqual({error, enoent}, file:read_file(NodeUUIDFile)),
?assertEqual({error, enoent}, file:read_file(ClusterUUIDFile)),
%% clear the UUIDs in the DB
{atomic, ok} = mria:clear_table(emqx_telemetry),
application:stop(emqx_telemetry),
application:start(emqx_telemetry),
{ok, NodeUUID} = emqx_telemetry:get_node_uuid(),
{ok, ClusterUUID} = emqx_telemetry:get_cluster_uuid(),
?assertEqual(
{ok, NodeUUID},
file:read_file(NodeUUIDFile)
),
?assertEqual(
{ok, ClusterUUID},
file:read_file(ClusterUUIDFile)
),
ok.
t_is_official_version(_) ->
Is = fun(V) -> is_official_version(V) end,
true = lists:all(Is, [
"0.0.0",
"1.1.1",
"10.10.10",
"0.0-alpha.1",
"1.1-alpha.1",
"10.10-alpha.10",
"1.1-rc.1",
"1.1-beta.1",
"5.0.0",
"5.0.0-alpha.1",
"5.0.0-beta.4",
"5.0-rc.1",
"5.0.0-rc.1"
]),
false = lists:any(Is, [
"1.1-alpha.a",
"1.1-alpha.0",
"0.0.0.0",
"1.1.a",
"5.0.0-alpha.a",
"5.0.0-beta.a",
"5.0.0-rc.a",
"5.0.0-foo",
"5.0.0-rc.1-ccdf7920"
]).
t_get_telemetry(_Config) ->
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
OTPVersion = bin(erlang:system_info(otp_release)),
?assertEqual(OTPVersion, get_value(otp_version, TelemetryData)),
{ok, NodeUUID} = emqx_telemetry:get_node_uuid(),
{ok, ClusterUUID} = emqx_telemetry:get_cluster_uuid(),
?assertEqual(NodeUUID, get_value(uuid, TelemetryData)),
?assertEqual(ClusterUUID, get_value(cluster_uuid, TelemetryData)),
?assertNotEqual(NodeUUID, ClusterUUID),
?assertEqual(0, get_value(num_clients, TelemetryData)),
BuildInfo = get_value(build_info, TelemetryData),
?assertMatch(
#{
<<"arch">> := <<_/binary>>,
<<"elixir">> := <<_/binary>>,
<<"erlang">> := <<_/binary>>,
<<"os">> := <<_/binary>>,
<<"relform">> := <<_/binary>>,
<<"wordsize">> := Wordsize
} when is_integer(Wordsize),
BuildInfo
),
VMSpecs = get_value(vm_specs, TelemetryData),
?assert(is_integer(get_value(num_cpus, VMSpecs))),
?assert(0 =< get_value(num_cpus, VMSpecs)),
?assert(is_integer(get_value(total_memory, VMSpecs))),
?assert(0 =< get_value(total_memory, VMSpecs)),
MQTTRTInsights = get_value(mqtt_runtime_insights, TelemetryData),
?assert(is_number(maps:get(messages_sent_rate, MQTTRTInsights))),
?assert(is_number(maps:get(messages_received_rate, MQTTRTInsights))),
?assert(is_integer(maps:get(num_topics, MQTTRTInsights))),
?assert(is_map(get_value(authn_authz, TelemetryData))),
GatewayInfo = get_value(gateway, TelemetryData),
?assert(is_map(GatewayInfo)),
lists:foreach(
fun({GatewayType, GatewayData}) ->
?assertMatch(
#{
authn := GwAuthn,
num_clients := NClients,
listeners := Ls
} when
is_binary(GwAuthn) andalso
is_integer(NClients) andalso
is_list(Ls),
GatewayData,
#{gateway_type => GatewayType}
),
ListenersData = maps:get(listeners, GatewayData),
lists:foreach(
fun(L) -> assert_gateway_listener_shape(L, GatewayType) end,
ListenersData
)
end,
maps:to_list(GatewayInfo)
),
ok.
t_num_clients(_Config) ->
{ok, Client} = emqtt:start_link([
{client_id, <<"live_client">>},
{port, 1883},
{clean_start, false}
]),
{{ok, _}, {ok, _}} = ?wait_async_action(
{ok, _} = emqtt:connect(Client),
#{
?snk_kind := emqx_stats_setstat,
count_stat := 'live_connections.count',
value := 1
},
5_000
),
{ok, TelemetryData0} = emqx_telemetry:get_telemetry(),
?assertEqual(1, get_value(num_clients, TelemetryData0)),
{ok, {ok, _}} = ?wait_async_action(
ok = emqtt:disconnect(Client),
#{
?snk_kind := emqx_stats_setstat,
count_stat := 'live_connections.count',
value := 0
},
5_000
),
{ok, TelemetryData1} = emqx_telemetry:get_telemetry(),
?assertEqual(0, get_value(num_clients, TelemetryData1)),
ok.
t_advanced_mqtt_features(_) ->
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
AdvFeats = get_value(advanced_mqtt_features, TelemetryData),
?assertEqual(
#{
retained => 5,
topic_rewrite => 2,
auto_subscribe => 3,
delayed => 4
},
AdvFeats
),
ok.
t_authn_authz_info(_) ->
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
AuthnAuthzInfo = get_value(authn_authz, TelemetryData),
?assertEqual(
#{
authn =>
[
<<"password_based:built_in_database">>,
<<"password_based:redis">>
],
authn_listener => #{<<"password_based:redis">> => 2},
authz => [postgresql]
},
AuthnAuthzInfo
).
t_enable(_) ->
ok = emqx_telemetry:start_reporting(),
ok = emqx_telemetry:stop_reporting().
t_send_after_enable(_) ->
ok = emqx_telemetry:stop_reporting(),
ok = snabbkaffe:start_trace(),
try
ok = emqx_telemetry:start_reporting(),
Timeout = 12_000,
?assertMatch(
{ok, _},
?wait_async_action(
ok = emqx_telemetry:start_reporting(),
#{?snk_kind := telemetry_data_reported},
Timeout
)
),
receive
{request, post, _URL, _Headers, Body} ->
{ok, Decoded} = emqx_utils_json:safe_decode(Body, [return_maps]),
?assertMatch(
#{
<<"uuid">> := _,
<<"messages_received">> := _,
<<"messages_sent">> := _,
<<"build_info">> := #{},
<<"vm_specs">> :=
#{
<<"num_cpus">> := _,
<<"total_memory">> := _
},
<<"mqtt_runtime_insights">> :=
#{
<<"messages_received_rate">> := _,
<<"messages_sent_rate">> := _,
<<"num_topics">> := _
},
<<"advanced_mqtt_features">> :=
#{
<<"retained">> := _,
<<"topic_rewrite">> := _,
<<"auto_subscribe">> := _,
<<"delayed">> := _
}
},
Decoded
)
after 2100 ->
exit(telemetry_not_reported)
end
after
ok = snabbkaffe:stop()
end.
t_mqtt_runtime_insights(_) ->
State0 = emqx_telemetry:empty_state(),
{MQTTRTInsights1, State1} = emqx_telemetry:mqtt_runtime_insights(State0),
?assertEqual(
#{
messages_sent_rate => 0.0,
messages_received_rate => 0.0,
num_topics => 0
},
MQTTRTInsights1
),
%% add some fake stats
emqx_metrics:set('messages.sent', 10_000_000_000),
emqx_metrics:set('messages.received', 20_000_000_000),
emqx_stats:setstat('topics.count', 30_000),
{MQTTRTInsights2, _State2} = emqx_telemetry:mqtt_runtime_insights(State1),
assert_approximate(MQTTRTInsights2, messages_sent_rate, "16.53"),
assert_approximate(MQTTRTInsights2, messages_received_rate, "33.07"),
?assertEqual(30_000, maps:get(num_topics, MQTTRTInsights2)),
ok.
t_rule_engine_and_data_bridge_info(_Config) ->
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
ct:pal("telemetry data: ~p~n", [TelemetryData]),
RuleInfo = get_value(rule_engine, TelemetryData),
BridgeInfo = get_value(bridge, TelemetryData),
?assertEqual(
#{num_rules => 3},
RuleInfo
),
?assertEqual(
#{
data_bridge =>
#{
http => #{num => 1, num_linked_by_rules => 3},
mqtt => #{num => 2, num_linked_by_rules => 2}
},
num_data_bridges => 3
},
BridgeInfo
),
ok.
t_exhook_info(_Config) ->
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
ExhookInfo = get_value(exhook, TelemetryData),
?assertEqual(1, maps:get(num_servers, ExhookInfo)),
[Server] = maps:get(servers, ExhookInfo),
?assertEqual(grpc, maps:get(driver, Server)),
Hooks = maps:get(hooks, Server),
?assertEqual(
[
'client.authenticate',
'client.authorize',
'client.connack',
'client.connect',
'client.connected',
'client.disconnected',
'client.subscribe',
'client.unsubscribe',
'message.acked',
'message.delivered',
'message.dropped',
'message.publish',
'session.created',
'session.discarded',
'session.resumed',
'session.subscribed',
'session.takenover',
'session.terminated',
'session.unsubscribed'
],
lists:sort(Hooks)
),
ok.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
assert_approximate(Map, Key, Expected) ->
Value = maps:get(Key, Map),
?assertEqual(Expected, float_to_list(Value, [{decimals, 2}])).
bin(L) when is_list(L) ->
list_to_binary(L);
bin(B) when is_binary(B) ->
B.
mock_httpc() ->
TestPID = self(),
ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]),
ok = meck:expect(
httpc,
request,
fun
(Method, {URL, Headers, _ContentType, Body}, _HTTPOpts, _Opts) ->
TestPID ! {request, Method, URL, Headers, Body};
(Method, {URL, Headers}, _HTTPOpts, _Opts) ->
TestPID ! {request, Method, URL, Headers, undefined}
end
),
ok = meck:expect(
httpc,
request,
fun(Method, {URL, Headers}, _Opts) ->
TestPID ! {request, Method, URL, Headers, undefined}
end
).
mock_advanced_mqtt_features() ->
Context = emqx_retainer:context(),
lists:foreach(
fun(N) ->
Num = integer_to_binary(N),
Message = emqx_message:make(<<"retained/", Num/binary>>, <<"payload">>),
ok = emqx_retainer:store_retained(Context, Message)
end,
lists:seq(1, 5)
),
lists:foreach(
fun(N) ->
DelaySec = integer_to_binary(N + 10),
Message = emqx_message:make(
<<"$delayed/", DelaySec/binary, "/delayed">>, <<"payload">>
),
{stop, _} = emqx_delayed:on_message_publish(Message)
end,
lists:seq(1, 4)
),
AutoSubscribeTopics =
lists:map(
fun(N) ->
Num = integer_to_binary(N),
Topic = <<"auto/", Num/binary>>,
#{<<"topic">> => Topic}
end,
lists:seq(1, 3)
),
{ok, _} = emqx_auto_subscribe:update(AutoSubscribeTopics),
RewriteTopics =
lists:map(
fun(N) ->
Num = integer_to_binary(N),
DestTopic = <<"rewrite/dest/", Num/binary>>,
SourceTopic = <<"rewrite/source/", Num/binary>>,
#{
<<"source_topic">> => SourceTopic,
<<"dest_topic">> => DestTopic,
<<"action">> => all,
<<"re">> => DestTopic
}
end,
lists:seq(1, 2)
),
ok = emqx_rewrite:update(RewriteTopics),
ok.
create_authn(ChainName, built_in_database) ->
emqx_authn_chains:create_authenticator(
ChainName,
#{
mechanism => password_based,
backend => built_in_database,
enable => true,
user_id_type => username,
password_hash_algorithm => #{
name => plain,
salt_position => suffix
}
}
);
create_authn(ChainName, redis) ->
emqx_authn_chains:create_authenticator(
ChainName,
#{
mechanism => password_based,
backend => redis,
enable => true,
user_id_type => username,
cmd => <<"HMGET mqtt_user:${username} password_hash salt is_superuser">>,
password_hash_algorithm => #{
name => plain,
salt_position => suffix
}
}
).
create_authz(postgresql) ->
emqx_authz:update(
append,
#{
<<"type">> => <<"postgresql">>,
<<"enable">> => true,
<<"server">> => <<"127.0.0.1:27017">>,
<<"pool_size">> => 1,
<<"database">> => <<"mqtt">>,
<<"username">> => <<"xx">>,
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false},
<<"query">> => <<"abcb">>
}
).
assert_gateway_listener_shape(ListenerData, GatewayType) ->
?assertMatch(
#{type := LType, authn := LAuthn} when
is_atom(LType) andalso is_binary(LAuthn),
ListenerData,
#{gateway_type => GatewayType}
).
setup_fake_rule_engine_data() ->
{ok, _} =
emqx_rule_engine:create_rule(
#{
id => <<"rule:t_get_basic_usage_info:1">>,
sql => <<"select 1 from topic">>,
actions =>
[
#{function => <<"erlang:hibernate">>, args => #{}},
#{function => console},
<<"webhook:basic_usage_info_webhook">>,
<<"webhook:basic_usage_info_webhook_disabled">>
]
}
),
{ok, _} =
emqx_rule_engine:create_rule(
#{
id => <<"rule:t_get_basic_usage_info:2">>,
sql => <<"select 1 from topic">>,
actions =>
[
<<"mqtt:basic_usage_info_mqtt">>,
<<"webhook:basic_usage_info_webhook">>
]
}
),
{ok, _} =
emqx_rule_engine:create_rule(
#{
id => <<"rule:t_get_basic_usage_info:3">>,
sql => <<"select 1 from \"$bridges/mqtt:basic_usage_info_mqtt\"">>,
actions =>
[
#{function => console}
]
}
),
ok.
%% for some unknown reason, gen_rpc running locally or in CI might
%% start with different `port_discovery' modes, which means that'll
%% either be listening at the port in the config (`tcp_server_port',
%% 5369) if `manual', else it'll listen on 5370 if started as
%% `stateless'.
find_gen_rpc_port() ->
[EPort] = [
EPort
|| {links, Ls} <- process_info(whereis(gen_rpc_server_tcp)),
EPort <- Ls,
is_port(EPort)
],
{ok, {_, Port}} = inet:sockname(EPort),
Port.
start_peer(Name) ->
Port = find_gen_rpc_port(),
TestNode = node(),
Handler =
fun
(emqx) ->
application:set_env(emqx, boot_modules, []),
ekka:join(TestNode),
emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
ok;
(_App) ->
emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
ok
end,
Opts = #{
env => [
{gen_rpc, tcp_server_port, 9002},
{gen_rpc, port_discovery, manual},
{gen_rpc, client_config_per_node, {internal, #{TestNode => Port}}}
],
load_schema => false,
configure_gen_rpc => false,
env_handler => Handler,
load_apps => [gen_rpc, emqx],
listener_ports => [],
apps => [emqx, emqx_conf, emqx_retainer, emqx_modules, emqx_telemetry]
},
emqx_common_test_helpers:start_peer(Name, Opts).
stop_peer(Node) ->
rpc:call(Node, ?MODULE, leave_cluster, []),
ok = emqx_cth_peer:stop(Node),
?assertEqual([node()], mria:running_nodes()),
?assertEqual([], nodes()),
_ = application:stop(mria),
ok = application:start(mria).
leave_cluster() ->
try mnesia_hook:module_info() of
_ -> ekka:leave()
catch
_:_ ->
%% We have to set the db_backend to mnesia even for `ekka:leave/0`!!
application:set_env(mria, db_backend, mnesia),
ekka:leave()
end.
is_official_version(V) ->
emqx_telemetry_config:is_official_version(V).