1118 lines
39 KiB
Erlang
1118 lines
39 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_bridge_v2_SUITE).
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
-import(emqx_common_test_helpers, [on_exit/1]).
|
|
|
|
con_mod() ->
|
|
emqx_bridge_v2_test_connector.
|
|
|
|
con_type() ->
|
|
bridge_type().
|
|
|
|
con_name() ->
|
|
my_connector.
|
|
|
|
connector_resource_id() ->
|
|
emqx_connector_resource:resource_id(con_type(), con_name()).
|
|
|
|
bridge_type() ->
|
|
test_bridge_type.
|
|
|
|
con_schema() ->
|
|
[
|
|
{
|
|
con_type(),
|
|
hoconsc:mk(
|
|
hoconsc:map(name, hoconsc:ref(?MODULE, connector_config)),
|
|
#{
|
|
desc => <<"Test Connector Config">>,
|
|
required => false
|
|
}
|
|
)
|
|
}
|
|
].
|
|
|
|
fields(connector_config) ->
|
|
[
|
|
{enable, hoconsc:mk(typerefl:boolean(), #{})},
|
|
{resource_opts, hoconsc:mk(typerefl:map(), #{})},
|
|
{on_start_fun, hoconsc:mk(typerefl:binary(), #{})},
|
|
{on_get_status_fun, hoconsc:mk(typerefl:binary(), #{})},
|
|
{on_add_channel_fun, hoconsc:mk(typerefl:binary(), #{})}
|
|
].
|
|
|
|
con_config() ->
|
|
#{
|
|
<<"enable">> => true,
|
|
<<"resource_opts">> => #{
|
|
%% Set this to a low value to make the test run faster
|
|
<<"health_check_interval">> => 100
|
|
}
|
|
}.
|
|
|
|
bridge_schema() ->
|
|
bridge_schema(_Opts = #{}).
|
|
|
|
bridge_schema(Opts) ->
|
|
Type = maps:get(bridge_type, Opts, bridge_type()),
|
|
[
|
|
{
|
|
Type,
|
|
hoconsc:mk(
|
|
hoconsc:map(name, typerefl:map()),
|
|
#{
|
|
desc => <<"Test Bridge Config">>,
|
|
required => false
|
|
}
|
|
)
|
|
}
|
|
].
|
|
|
|
bridge_config() ->
|
|
#{
|
|
<<"connector">> => atom_to_binary(con_name()),
|
|
<<"enable">> => true,
|
|
<<"send_to">> => registered_process_name(),
|
|
<<"resource_opts">> => #{
|
|
<<"resume_interval">> => 100
|
|
}
|
|
}.
|
|
|
|
fun_table_name() ->
|
|
emqx_bridge_v2_SUITE_fun_table.
|
|
|
|
registered_process_name() ->
|
|
my_registered_process.
|
|
|
|
all() ->
|
|
emqx_common_test_helpers:all(?MODULE).
|
|
|
|
start_apps() ->
|
|
[
|
|
emqx,
|
|
emqx_conf,
|
|
emqx_connector,
|
|
emqx_bridge,
|
|
emqx_rule_engine
|
|
].
|
|
|
|
setup_mocks() ->
|
|
MeckOpts = [passthrough, no_link, no_history, non_strict],
|
|
|
|
catch meck:new(emqx_connector_schema, MeckOpts),
|
|
meck:expect(emqx_connector_schema, fields, 1, con_schema()),
|
|
meck:expect(emqx_connector_schema, connector_type_to_bridge_types, 1, [con_type()]),
|
|
|
|
catch meck:new(emqx_connector_resource, MeckOpts),
|
|
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()),
|
|
|
|
catch meck:new(emqx_bridge_v2_schema, MeckOpts),
|
|
meck:expect(emqx_bridge_v2_schema, fields, 1, bridge_schema()),
|
|
|
|
catch meck:new(emqx_bridge_v2, MeckOpts),
|
|
BridgeType = bridge_type(),
|
|
BridgeTypeBin = atom_to_binary(BridgeType),
|
|
meck:expect(
|
|
emqx_bridge_v2,
|
|
bridge_v2_type_to_connector_type,
|
|
fun(Type) when Type =:= BridgeType; Type =:= BridgeTypeBin -> con_type() end
|
|
),
|
|
meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()),
|
|
|
|
meck:expect(emqx_bridge_v2, is_bridge_v2_type, fun(Type) -> Type =:= BridgeType end),
|
|
ok.
|
|
|
|
init_per_suite(Config) ->
|
|
Apps = emqx_cth_suite:start(
|
|
app_specs(),
|
|
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
|
),
|
|
[{apps, Apps} | Config].
|
|
|
|
end_per_suite(Config) ->
|
|
Apps = ?config(apps, Config),
|
|
emqx_cth_suite:stop(Apps),
|
|
ok.
|
|
|
|
app_specs() ->
|
|
[
|
|
emqx,
|
|
emqx_conf,
|
|
emqx_connector,
|
|
emqx_bridge,
|
|
emqx_rule_engine
|
|
].
|
|
|
|
init_per_testcase(_TestCase, Config) ->
|
|
%% Setting up mocks for fake connector and bridge V2
|
|
setup_mocks(),
|
|
ets:new(fun_table_name(), [named_table, public]),
|
|
%% Create a fake connector
|
|
{ok, _} = emqx_connector:create(con_type(), con_name(), con_config()),
|
|
Config.
|
|
|
|
end_per_testcase(_TestCase, _Config) ->
|
|
ets:delete(fun_table_name()),
|
|
delete_all_bridges_and_connectors(),
|
|
meck:unload(),
|
|
emqx_common_test_helpers:call_janitor(),
|
|
ok.
|
|
|
|
delete_all_bridges_and_connectors() ->
|
|
lists:foreach(
|
|
fun(#{name := Name, type := Type}) ->
|
|
ct:pal("removing bridge ~p", [{Type, Name}]),
|
|
emqx_bridge_v2:remove(Type, Name)
|
|
end,
|
|
emqx_bridge_v2:list()
|
|
),
|
|
lists:foreach(
|
|
fun(#{name := Name, type := Type}) ->
|
|
ct:pal("removing connector ~p", [{Type, Name}]),
|
|
emqx_connector:remove(Type, Name)
|
|
end,
|
|
emqx_connector:list()
|
|
),
|
|
update_root_config(#{}),
|
|
ok.
|
|
|
|
%% Hocon does not support placing a fun in a config map so we replace it with a string
|
|
|
|
wrap_fun(Fun) ->
|
|
UniqRef = make_ref(),
|
|
UniqRefBin = term_to_binary(UniqRef),
|
|
UniqRefStr = iolist_to_binary(base64:encode(UniqRefBin)),
|
|
ets:insert(fun_table_name(), {UniqRefStr, Fun}),
|
|
UniqRefStr.
|
|
|
|
unwrap_fun(UniqRefStr) ->
|
|
ets:lookup_element(fun_table_name(), UniqRefStr, 2).
|
|
|
|
update_root_config(RootConf) ->
|
|
emqx_conf:update([actions], RootConf, #{override_to => cluster}).
|
|
|
|
update_root_connectors_config(RootConf) ->
|
|
emqx_conf:update([connectors], RootConf, #{override_to => cluster}).
|
|
|
|
t_create_remove(_) ->
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
|
ok.
|
|
|
|
t_create_disabled_bridge(_) ->
|
|
Config = #{<<"connector">> := Connector} = bridge_config(),
|
|
Disable = Config#{<<"enable">> => false},
|
|
BridgeType = bridge_type(),
|
|
{ok, _} = emqx_bridge_v2:create(BridgeType, my_enable_bridge, Config),
|
|
{ok, _} = emqx_bridge_v2:create(BridgeType, my_disable_bridge, Disable),
|
|
ConnectorId = emqx_connector_resource:resource_id(con_type(), Connector),
|
|
?assertMatch(
|
|
[
|
|
{_, #{
|
|
enable := true,
|
|
connector := Connector,
|
|
bridge_type := _
|
|
}},
|
|
{_, #{
|
|
enable := false,
|
|
connector := Connector,
|
|
bridge_type := _
|
|
}}
|
|
],
|
|
emqx_bridge_v2:get_channels_for_connector(ConnectorId)
|
|
),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_enable_bridge),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_disable_bridge),
|
|
ok.
|
|
|
|
t_list(_) ->
|
|
[] = emqx_bridge_v2:list(),
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()),
|
|
1 = length(emqx_bridge_v2:list()),
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge2, bridge_config()),
|
|
2 = length(emqx_bridge_v2:list()),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
|
1 = length(emqx_bridge_v2:list()),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge2),
|
|
0 = length(emqx_bridge_v2:list()),
|
|
ok.
|
|
|
|
t_create_dry_run(_) ->
|
|
ok = emqx_bridge_v2:create_dry_run(bridge_type(), bridge_config()).
|
|
|
|
t_create_dry_run_fail_add_channel(_) ->
|
|
Msg = <<"Failed to add channel">>,
|
|
OnAddChannel1 = wrap_fun(fun() ->
|
|
{error, Msg}
|
|
end),
|
|
Conf1 = (bridge_config())#{on_add_channel_fun => OnAddChannel1},
|
|
{error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1),
|
|
OnAddChannel2 = wrap_fun(fun() ->
|
|
throw(Msg)
|
|
end),
|
|
Conf2 = (bridge_config())#{on_add_channel_fun => OnAddChannel2},
|
|
{error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2),
|
|
ok.
|
|
|
|
t_create_dry_run_fail_get_channel_status(_) ->
|
|
Msg = <<"Failed to add channel">>,
|
|
Fun1 = wrap_fun(fun() ->
|
|
{error, Msg}
|
|
end),
|
|
Conf1 = (bridge_config())#{on_get_channel_status_fun => Fun1},
|
|
{error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1),
|
|
Fun2 = wrap_fun(fun() ->
|
|
throw(Msg)
|
|
end),
|
|
Conf2 = (bridge_config())#{on_get_channel_status_fun => Fun2},
|
|
{error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2),
|
|
ok.
|
|
|
|
t_create_dry_run_connector_does_not_exist(_) ->
|
|
BridgeConf = (bridge_config())#{<<"connector">> => <<"connector_does_not_exist">>},
|
|
{error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), BridgeConf).
|
|
|
|
t_bridge_v1_is_valid(_) ->
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()),
|
|
true = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge),
|
|
%% Add another channel/bridge to the connector
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge_2, bridge_config()),
|
|
false = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
|
true = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge_2),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge_2),
|
|
%% Non existing bridge is a valid Bridge V1
|
|
true = emqx_bridge_v2:bridge_v1_is_valid(bridge_v1_type, my_test_bridge),
|
|
ok.
|
|
|
|
t_manual_health_check(_) ->
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()),
|
|
%% Run a health check for the bridge
|
|
#{error := undefined, status := connected} = emqx_bridge_v2:health_check(
|
|
bridge_type(), my_test_bridge
|
|
),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
|
ok.
|
|
|
|
t_manual_health_check_exception(_) ->
|
|
Conf = (bridge_config())#{
|
|
<<"on_get_channel_status_fun">> => wrap_fun(fun() -> throw(my_error) end)
|
|
},
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
|
|
%% Run a health check for the bridge
|
|
#{error := my_error, status := disconnected} = emqx_bridge_v2:health_check(
|
|
bridge_type(), my_test_bridge
|
|
),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
|
ok.
|
|
|
|
t_manual_health_check_exception_error(_) ->
|
|
Conf = (bridge_config())#{
|
|
<<"on_get_channel_status_fun">> => wrap_fun(fun() -> error(my_error) end)
|
|
},
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
|
|
%% Run a health check for the bridge
|
|
#{error := _, status := disconnected} = emqx_bridge_v2:health_check(
|
|
bridge_type(), my_test_bridge
|
|
),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
|
ok.
|
|
|
|
t_manual_health_check_error(_) ->
|
|
Conf = (bridge_config())#{
|
|
<<"on_get_channel_status_fun">> => wrap_fun(fun() -> {error, my_error} end)
|
|
},
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
|
|
%% Run a health check for the bridge
|
|
#{error := my_error, status := disconnected} = emqx_bridge_v2:health_check(
|
|
bridge_type(), my_test_bridge
|
|
),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
|
ok.
|
|
|
|
t_send_message(_) ->
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()),
|
|
%% Register name for this process
|
|
register(registered_process_name(), self()),
|
|
_ = emqx_bridge_v2:send_message(bridge_type(), my_test_bridge, <<"my_msg">>, #{}),
|
|
receive
|
|
<<"my_msg">> ->
|
|
ok
|
|
after 10000 ->
|
|
ct:fail("Failed to receive message")
|
|
end,
|
|
unregister(registered_process_name()),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge).
|
|
|
|
t_send_message_through_rule(_) ->
|
|
BridgeName = my_test_bridge,
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), BridgeName, bridge_config()),
|
|
%% Create a rule to send message to the bridge
|
|
{ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
|
|
#{
|
|
sql => <<"select * from \"t/a\"">>,
|
|
id => atom_to_binary(?FUNCTION_NAME),
|
|
actions => [
|
|
<<
|
|
(atom_to_binary(bridge_type()))/binary,
|
|
":",
|
|
(atom_to_binary(BridgeName))/binary
|
|
>>
|
|
],
|
|
description => <<"bridge_v2 test rule">>
|
|
}
|
|
),
|
|
on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
|
|
%% Register name for this process
|
|
register(registered_process_name(), self()),
|
|
%% Send message to the topic
|
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
Payload = <<"hello">>,
|
|
Msg = emqx_message:make(ClientId, 0, <<"t/a">>, Payload),
|
|
emqx:publish(Msg),
|
|
receive
|
|
#{payload := Payload} ->
|
|
ok
|
|
after 10000 ->
|
|
ct:fail("Failed to receive message")
|
|
end,
|
|
unregister(registered_process_name()),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), BridgeName),
|
|
ok.
|
|
|
|
t_send_message_through_local_topic(_) ->
|
|
%% Bridge configuration with local topic
|
|
BridgeName = my_test_bridge,
|
|
TopicName = <<"t/b">>,
|
|
BridgeConfig = (bridge_config())#{
|
|
<<"local_topic">> => TopicName
|
|
},
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), BridgeName, BridgeConfig),
|
|
%% Register name for this process
|
|
register(registered_process_name(), self()),
|
|
%% Send message to the topic
|
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
|
Payload = <<"hej">>,
|
|
Msg = emqx_message:make(ClientId, 0, TopicName, Payload),
|
|
emqx:publish(Msg),
|
|
receive
|
|
#{payload := Payload} ->
|
|
ok
|
|
after 10000 ->
|
|
ct:fail("Failed to receive message")
|
|
end,
|
|
unregister(registered_process_name()),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), BridgeName),
|
|
ok.
|
|
|
|
t_send_message_unhealthy_channel(_) ->
|
|
OnGetStatusResponseETS = ets:new(on_get_status_response_ets, [public]),
|
|
ets:insert(OnGetStatusResponseETS, {status_value, {error, my_error}}),
|
|
OnGetStatusFun = wrap_fun(fun() ->
|
|
ets:lookup_element(OnGetStatusResponseETS, status_value, 2)
|
|
end),
|
|
Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => OnGetStatusFun},
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
|
|
%% Register name for this process
|
|
register(registered_process_name(), self()),
|
|
_ = emqx_bridge_v2:send_message(bridge_type(), my_test_bridge, <<"my_msg">>, #{timeout => 1}),
|
|
receive
|
|
Any ->
|
|
ct:pal("Received message: ~p", [Any]),
|
|
ct:fail("Should not get message here")
|
|
after 1 ->
|
|
ok
|
|
end,
|
|
%% Sending should work again after the channel is healthy
|
|
ets:insert(OnGetStatusResponseETS, {status_value, connected}),
|
|
_ = emqx_bridge_v2:send_message(
|
|
bridge_type(),
|
|
my_test_bridge,
|
|
<<"my_msg">>,
|
|
#{}
|
|
),
|
|
receive
|
|
<<"my_msg">> ->
|
|
ok
|
|
after 10000 ->
|
|
ct:fail("Failed to receive message")
|
|
end,
|
|
unregister(registered_process_name()),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge).
|
|
|
|
t_send_message_unhealthy_connector(_) ->
|
|
ResponseETS = ets:new(response_ets, [public]),
|
|
ets:insert(ResponseETS, {on_start_value, conf}),
|
|
ets:insert(ResponseETS, {on_get_status_value, connecting}),
|
|
OnStartFun = wrap_fun(fun(Conf) ->
|
|
case ets:lookup_element(ResponseETS, on_start_value, 2) of
|
|
conf ->
|
|
{ok, Conf};
|
|
V ->
|
|
V
|
|
end
|
|
end),
|
|
OnGetStatusFun = wrap_fun(fun() ->
|
|
ets:lookup_element(ResponseETS, on_get_status_value, 2)
|
|
end),
|
|
ConConfig = emqx_utils_maps:deep_merge(con_config(), #{
|
|
<<"on_start_fun">> => OnStartFun,
|
|
<<"on_get_status_fun">> => OnGetStatusFun,
|
|
<<"resource_opts">> => #{<<"start_timeout">> => 100}
|
|
}),
|
|
ConName = ?FUNCTION_NAME,
|
|
{ok, _} = emqx_connector:create(con_type(), ConName, ConConfig),
|
|
BridgeConf = (bridge_config())#{
|
|
<<"connector">> => atom_to_binary(ConName)
|
|
},
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, BridgeConf),
|
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
%% Test that sending does not work when the connector is unhealthy (connecting)
|
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
register(registered_process_name(), self()),
|
|
_ = emqx_bridge_v2:send_message(bridge_type(), my_test_bridge, <<"my_msg">>, #{timeout => 100}),
|
|
receive
|
|
Any ->
|
|
ct:pal("Received message: ~p", [Any]),
|
|
ct:fail("Should not get message here")
|
|
after 10 ->
|
|
ok
|
|
end,
|
|
%% We should have one alarm
|
|
1 = get_bridge_v2_alarm_cnt(),
|
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
%% Test that sending works again when the connector is healthy (connected)
|
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
ets:insert(ResponseETS, {on_get_status_value, connected}),
|
|
|
|
_ = emqx_bridge_v2:send_message(bridge_type(), my_test_bridge, <<"my_msg">>, #{timeout => 1000}),
|
|
receive
|
|
<<"my_msg">> ->
|
|
ok
|
|
after 1000 ->
|
|
ct:fail("Failed to receive message")
|
|
end,
|
|
%% The alarm should be gone at this point
|
|
0 = get_bridge_v2_alarm_cnt(),
|
|
unregister(registered_process_name()),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
|
ok = emqx_connector:remove(con_type(), ConName),
|
|
ets:delete(ResponseETS),
|
|
ok.
|
|
|
|
t_connector_connected_to_connecting_to_connected_no_channel_restart(_) ->
|
|
ResponseETS = ets:new(response_ets, [public]),
|
|
ets:insert(ResponseETS, {on_start_value, conf}),
|
|
ets:insert(ResponseETS, {on_get_status_value, connected}),
|
|
OnStartFun = wrap_fun(fun(Conf) ->
|
|
case ets:lookup_element(ResponseETS, on_start_value, 2) of
|
|
conf ->
|
|
{ok, Conf};
|
|
V ->
|
|
V
|
|
end
|
|
end),
|
|
OnGetStatusFun = wrap_fun(fun() ->
|
|
ets:lookup_element(ResponseETS, on_get_status_value, 2)
|
|
end),
|
|
OnAddChannelCntr = counters:new(1, []),
|
|
OnAddChannelFun = wrap_fun(fun(_InstId, ConnectorState, _ChannelId, _ChannelConfig) ->
|
|
counters:add(OnAddChannelCntr, 1, 1),
|
|
{ok, ConnectorState}
|
|
end),
|
|
ConConfig = emqx_utils_maps:deep_merge(con_config(), #{
|
|
<<"on_start_fun">> => OnStartFun,
|
|
<<"on_get_status_fun">> => OnGetStatusFun,
|
|
<<"on_add_channel_fun">> => OnAddChannelFun,
|
|
<<"resource_opts">> => #{<<"start_timeout">> => 100}
|
|
}),
|
|
ConName = ?FUNCTION_NAME,
|
|
{ok, _} = emqx_connector:create(con_type(), ConName, ConConfig),
|
|
BridgeConf = (bridge_config())#{
|
|
<<"connector">> => atom_to_binary(ConName)
|
|
},
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, BridgeConf),
|
|
%% Wait until on_add_channel_fun is called at least once
|
|
wait_until(fun() ->
|
|
counters:get(OnAddChannelCntr, 1) =:= 1
|
|
end),
|
|
1 = counters:get(OnAddChannelCntr, 1),
|
|
%% We change the status of the connector
|
|
ets:insert(ResponseETS, {on_get_status_value, connecting}),
|
|
%% Wait until the status is changed
|
|
wait_until(fun() ->
|
|
{ok, BridgeData} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge),
|
|
maps:get(status, BridgeData) =:= connecting
|
|
end),
|
|
{ok, BridgeData1} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge),
|
|
ct:pal("Bridge V2 status changed to: ~p", [maps:get(status, BridgeData1)]),
|
|
%% We change the status again back to connected
|
|
ets:insert(ResponseETS, {on_get_status_value, connected}),
|
|
%% Wait until the status is connected again
|
|
wait_until(fun() ->
|
|
{ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge),
|
|
maps:get(status, BridgeData2) =:= connected
|
|
end),
|
|
%% On add channel should not have been called again
|
|
1 = counters:get(OnAddChannelCntr, 1),
|
|
%% We change the status to an error
|
|
ets:insert(ResponseETS, {on_get_status_value, {error, my_error}}),
|
|
%% Wait until the status is changed
|
|
wait_until(fun() ->
|
|
{ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge),
|
|
maps:get(status, BridgeData2) =:= disconnected
|
|
end),
|
|
%% Now we go back to connected
|
|
ets:insert(ResponseETS, {on_get_status_value, connected}),
|
|
wait_until(fun() ->
|
|
{ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge),
|
|
maps:get(status, BridgeData2) =:= connected
|
|
end),
|
|
%% Now the channel should have been removed and added again
|
|
wait_until(fun() ->
|
|
counters:get(OnAddChannelCntr, 1) =:= 2
|
|
end),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
|
ok = emqx_connector:remove(con_type(), ConName),
|
|
ets:delete(ResponseETS),
|
|
ok.
|
|
|
|
t_unhealthy_channel_alarm(_) ->
|
|
Conf = (bridge_config())#{
|
|
<<"on_get_channel_status_fun">> =>
|
|
wrap_fun(fun() -> {error, my_error} end)
|
|
},
|
|
0 = get_bridge_v2_alarm_cnt(),
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
|
|
1 = get_bridge_v2_alarm_cnt(),
|
|
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
|
0 = get_bridge_v2_alarm_cnt(),
|
|
ok.
|
|
|
|
get_bridge_v2_alarm_cnt() ->
|
|
Alarms = emqx_alarm:get_alarms(activated),
|
|
FilterFun = fun
|
|
(#{name := S}) when is_binary(S) -> string:find(S, "action") =/= nomatch;
|
|
(_) -> false
|
|
end,
|
|
length(lists:filter(FilterFun, Alarms)).
|
|
|
|
t_load_no_matching_connector(_Config) ->
|
|
Conf = bridge_config(),
|
|
BridgeTypeBin = atom_to_binary(bridge_type()),
|
|
BridgeNameBin0 = <<"my_test_bridge_update">>,
|
|
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeNameBin0, Conf)),
|
|
|
|
%% updating to invalid reference
|
|
RootConf0 = #{
|
|
BridgeTypeBin =>
|
|
#{BridgeNameBin0 => Conf#{<<"connector">> := <<"unknown">>}}
|
|
},
|
|
?assertMatch(
|
|
{error,
|
|
{post_config_update, _HandlerMod, [
|
|
#{
|
|
errors := [
|
|
{
|
|
{_, my_test_bridge_update},
|
|
{error, #{
|
|
bridge_name := my_test_bridge_update,
|
|
connector_name := <<"unknown">>,
|
|
bridge_type := _,
|
|
reason := <<"connector_not_found_or_wrong_type">>
|
|
}}
|
|
}
|
|
],
|
|
action := update
|
|
}
|
|
]}},
|
|
update_root_config(RootConf0)
|
|
),
|
|
|
|
%% creating new with invalid reference
|
|
BridgeNameBin1 = <<"my_test_bridge_new">>,
|
|
RootConf1 = #{
|
|
BridgeTypeBin =>
|
|
#{BridgeNameBin1 => Conf#{<<"connector">> := <<"unknown">>}}
|
|
},
|
|
?assertMatch(
|
|
{error,
|
|
{post_config_update, _HandlerMod, [
|
|
#{
|
|
errors := [
|
|
{
|
|
{_, my_test_bridge_new},
|
|
{error, #{
|
|
bridge_name := my_test_bridge_new,
|
|
connector_name := <<"unknown">>,
|
|
bridge_type := _,
|
|
reason := <<"connector_not_found_or_wrong_type">>
|
|
}}
|
|
}
|
|
],
|
|
action := create
|
|
}
|
|
]}},
|
|
update_root_config(RootConf1)
|
|
),
|
|
|
|
ok.
|
|
|
|
%% tests root config handler post config update hook
|
|
t_load_config_success(_Config) ->
|
|
Conf = bridge_config(),
|
|
BridgeType = bridge_type(),
|
|
BridgeTypeBin = atom_to_binary(BridgeType),
|
|
BridgeName = my_test_bridge_root,
|
|
BridgeNameBin = atom_to_binary(BridgeName),
|
|
|
|
%% pre-condition
|
|
?assertEqual(#{}, emqx_config:get([actions])),
|
|
|
|
%% create
|
|
RootConf0 = #{BridgeTypeBin => #{BridgeNameBin => Conf}},
|
|
?assertMatch(
|
|
{ok, _},
|
|
update_root_config(RootConf0)
|
|
),
|
|
BridgeTypeBin = bin(BridgeType),
|
|
BridgeNameBin = bin(BridgeName),
|
|
?assertMatch(
|
|
{ok, #{
|
|
type := BridgeTypeBin,
|
|
name := BridgeNameBin,
|
|
raw_config := #{},
|
|
resource_data := #{}
|
|
}},
|
|
emqx_bridge_v2:lookup(BridgeType, BridgeName)
|
|
),
|
|
|
|
%% update
|
|
RootConf1 = #{BridgeTypeBin => #{BridgeNameBin => Conf#{<<"some_key">> => <<"new_value">>}}},
|
|
?assertMatch(
|
|
{ok, _},
|
|
update_root_config(RootConf1)
|
|
),
|
|
?assertMatch(
|
|
{ok, #{
|
|
type := BridgeTypeBin,
|
|
name := BridgeNameBin,
|
|
raw_config := #{<<"some_key">> := <<"new_value">>},
|
|
resource_data := #{}
|
|
}},
|
|
emqx_bridge_v2:lookup(BridgeType, BridgeName)
|
|
),
|
|
|
|
%% delete
|
|
RootConf2 = #{},
|
|
?assertMatch(
|
|
{ok, _},
|
|
update_root_config(RootConf2)
|
|
),
|
|
?assertMatch(
|
|
{error, not_found},
|
|
emqx_bridge_v2:lookup(BridgeType, BridgeName)
|
|
),
|
|
|
|
ok.
|
|
|
|
t_create_no_matching_connector(_Config) ->
|
|
Conf = (bridge_config())#{<<"connector">> => <<"wrong_connector_name">>},
|
|
?assertMatch(
|
|
{error,
|
|
{pre_config_update, _HandlerMod, #{
|
|
bridge_name := _,
|
|
connector_name := _,
|
|
bridge_type := _,
|
|
reason := <<"connector_not_found_or_wrong_type">>
|
|
}}},
|
|
emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)
|
|
),
|
|
ok.
|
|
|
|
t_create_wrong_connector_type(_Config) ->
|
|
meck:expect(
|
|
emqx_bridge_v2_schema,
|
|
fields,
|
|
1,
|
|
bridge_schema(#{bridge_type => wrong_type})
|
|
),
|
|
Conf = bridge_config(),
|
|
?assertMatch(
|
|
{error,
|
|
{pre_config_update, _HandlerMod, #{
|
|
bridge_name := _,
|
|
connector_name := _,
|
|
bridge_type := wrong_type,
|
|
reason := <<"connector_not_found_or_wrong_type">>
|
|
}}},
|
|
emqx_bridge_v2:create(wrong_type, my_test_bridge, Conf)
|
|
),
|
|
ok.
|
|
|
|
t_update_connector_not_found(_Config) ->
|
|
Conf = bridge_config(),
|
|
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)),
|
|
BadConf = Conf#{<<"connector">> => <<"wrong_connector_name">>},
|
|
?assertMatch(
|
|
{error,
|
|
{pre_config_update, _HandlerMod, #{
|
|
bridge_name := _,
|
|
connector_name := _,
|
|
bridge_type := _,
|
|
reason := <<"connector_not_found_or_wrong_type">>
|
|
}}},
|
|
emqx_bridge_v2:create(bridge_type(), my_test_bridge, BadConf)
|
|
),
|
|
ok.
|
|
|
|
t_remove_single_connector_being_referenced_with_active_channels(_Config) ->
|
|
%% we test the connector post config update here because we also need bridges.
|
|
Conf = bridge_config(),
|
|
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)),
|
|
?assertMatch(
|
|
{error, {post_config_update, _HandlerMod, {active_channels, [_ | _]}}},
|
|
emqx_connector:remove(con_type(), con_name())
|
|
),
|
|
ok.
|
|
|
|
t_remove_single_connector_being_referenced_without_active_channels(_Config) ->
|
|
%% we test the connector post config update here because we also need bridges.
|
|
Conf = bridge_config(),
|
|
BridgeName = my_test_bridge,
|
|
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, Conf)),
|
|
emqx_common_test_helpers:with_mock(
|
|
emqx_bridge_v2_test_connector,
|
|
on_get_channels,
|
|
fun(_ResId) -> [] end,
|
|
fun() ->
|
|
?assertMatch(ok, emqx_connector:remove(con_type(), con_name())),
|
|
%% we no longer have connector data if this happens...
|
|
?assertMatch(
|
|
{ok, #{resource_data := #{}}},
|
|
emqx_bridge_v2:lookup(bridge_type(), BridgeName)
|
|
),
|
|
ok
|
|
end
|
|
),
|
|
ok.
|
|
|
|
t_remove_multiple_connectors_being_referenced_with_channels(_Config) ->
|
|
Conf = bridge_config(),
|
|
BridgeName = my_test_bridge,
|
|
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, Conf)),
|
|
?assertMatch(
|
|
{error,
|
|
{post_config_update, _HandlerMod, #{
|
|
reason := "connector_has_active_channels",
|
|
type := _,
|
|
connector_name := _,
|
|
active_channels := [_ | _]
|
|
}}},
|
|
update_root_connectors_config(#{})
|
|
),
|
|
ok.
|
|
|
|
t_remove_multiple_connectors_being_referenced_without_channels(_Config) ->
|
|
Conf = bridge_config(),
|
|
BridgeName = my_test_bridge,
|
|
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, Conf)),
|
|
emqx_common_test_helpers:with_mock(
|
|
emqx_bridge_v2_test_connector,
|
|
on_get_channels,
|
|
fun(_ResId) -> [] end,
|
|
fun() ->
|
|
?assertMatch(
|
|
{ok, _},
|
|
update_root_connectors_config(#{})
|
|
),
|
|
%% we no longer have connector data if this happens...
|
|
?assertMatch(
|
|
{ok, #{resource_data := #{}}},
|
|
emqx_bridge_v2:lookup(bridge_type(), BridgeName)
|
|
),
|
|
ok
|
|
end
|
|
),
|
|
ok.
|
|
|
|
t_start_operation_when_on_add_channel_gives_error(_Config) ->
|
|
Conf = bridge_config(),
|
|
BridgeName = my_test_bridge,
|
|
emqx_common_test_helpers:with_mock(
|
|
emqx_bridge_v2_test_connector,
|
|
on_add_channel,
|
|
fun(_, _, _ResId, _Channel) -> {error, <<"some_error">>} end,
|
|
fun() ->
|
|
%% We can crete the bridge event though on_add_channel returns error
|
|
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, Conf)),
|
|
?assertMatch(
|
|
#{
|
|
status := disconnected,
|
|
error := <<"some_error">>
|
|
},
|
|
emqx_bridge_v2:health_check(bridge_type(), BridgeName)
|
|
),
|
|
?assertMatch(
|
|
{ok, #{
|
|
status := disconnected,
|
|
error := <<"some_error">>
|
|
}},
|
|
emqx_bridge_v2:lookup(bridge_type(), BridgeName)
|
|
),
|
|
%% emqx_bridge_v2:start/2 should return ok if bridge if connected after
|
|
%% start and otherwise and error
|
|
?assertMatch({error, _}, emqx_bridge_v2:start(bridge_type(), BridgeName)),
|
|
%% Let us change on_add_channel to be successful and try again
|
|
ok = meck:expect(
|
|
emqx_bridge_v2_test_connector,
|
|
on_add_channel,
|
|
fun(_, _, _ResId, _Channel) -> {ok, #{}} end
|
|
),
|
|
?assertMatch(ok, emqx_bridge_v2:start(bridge_type(), BridgeName))
|
|
end
|
|
),
|
|
ok.
|
|
|
|
t_lookup_status_when_connecting(_Config) ->
|
|
ResponseETS = ets:new(response_ets, [public]),
|
|
ets:insert(ResponseETS, {on_get_status_value, ?status_connecting}),
|
|
OnGetStatusFun = wrap_fun(fun() ->
|
|
ets:lookup_element(ResponseETS, on_get_status_value, 2)
|
|
end),
|
|
|
|
ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{
|
|
<<"on_get_status_fun">> => OnGetStatusFun,
|
|
<<"resource_opts">> => #{<<"start_timeout">> => 100}
|
|
}),
|
|
ConnectorName = ?FUNCTION_NAME,
|
|
ct:pal("connector config:\n ~p", [ConnectorConfig]),
|
|
{ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig),
|
|
|
|
ActionName = my_test_action,
|
|
ChanStatusFun = wrap_fun(fun() -> ?status_disconnected end),
|
|
ActionConfig = (bridge_config())#{
|
|
<<"on_get_channel_status_fun">> => ChanStatusFun,
|
|
<<"connector">> => atom_to_binary(ConnectorName)
|
|
},
|
|
ct:pal("action config:\n ~p", [ActionConfig]),
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig),
|
|
|
|
%% Top-level status is connecting if the connector status is connecting, but the
|
|
%% channel is not yet installed. `resource_data.added_channels.$channel_id.status'
|
|
%% contains true internal status.
|
|
{ok, Res} = emqx_bridge_v2:lookup(bridge_type(), ActionName),
|
|
?assertMatch(
|
|
#{
|
|
%% This is the action's public status
|
|
status := ?status_connecting,
|
|
resource_data :=
|
|
#{
|
|
%% This is the connector's status
|
|
status := ?status_connecting
|
|
}
|
|
},
|
|
Res
|
|
),
|
|
#{resource_data := #{added_channels := Channels}} = Res,
|
|
[{_Id, ChannelData}] = maps:to_list(Channels),
|
|
?assertMatch(#{status := ?status_disconnected}, ChannelData),
|
|
ok.
|
|
|
|
t_rule_pointing_to_non_operational_channel(_Config) ->
|
|
%% Check that, if a rule sends a message to an action that is not yet installed and
|
|
%% uses `simple_async_internal_buffer', then it eventually increments the rule's
|
|
%% failed counter.
|
|
ResponseETS = ets:new(response_ets, [public]),
|
|
ets:insert(ResponseETS, {on_get_status_value, ?status_connecting}),
|
|
OnGetStatusFun = wrap_fun(fun() ->
|
|
ets:lookup_element(ResponseETS, on_get_status_value, 2)
|
|
end),
|
|
|
|
ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{
|
|
<<"on_get_status_fun">> => OnGetStatusFun,
|
|
<<"resource_opts">> => #{<<"start_timeout">> => 100}
|
|
}),
|
|
ConnectorName = ?FUNCTION_NAME,
|
|
ct:pal("connector config:\n ~p", [ConnectorConfig]),
|
|
?check_trace(
|
|
begin
|
|
{ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig),
|
|
|
|
ActionName = my_test_action,
|
|
ChanStatusFun = wrap_fun(fun() -> ?status_disconnected end),
|
|
ActionConfig = (bridge_config())#{
|
|
<<"on_get_channel_status_fun">> => ChanStatusFun,
|
|
<<"connector">> => atom_to_binary(ConnectorName)
|
|
},
|
|
ct:pal("action config:\n ~p", [ActionConfig]),
|
|
|
|
meck:new(con_mod(), [passthrough, no_history, non_strict]),
|
|
on_exit(fun() -> catch meck:unload([con_mod()]) end),
|
|
meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer),
|
|
meck:expect(con_mod(), callback_mode, 0, async_if_possible),
|
|
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig),
|
|
|
|
?assertMatch(
|
|
{ok, #{
|
|
error := <<"Not installed">>,
|
|
status := ?status_connecting,
|
|
resource_data := #{status := ?status_connecting}
|
|
}},
|
|
emqx_bridge_v2:lookup(bridge_type(), ActionName)
|
|
),
|
|
|
|
{ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
|
|
#{
|
|
sql => <<"select * from \"t/a\"">>,
|
|
id => atom_to_binary(?FUNCTION_NAME),
|
|
actions => [
|
|
<<
|
|
(atom_to_binary(bridge_type()))/binary,
|
|
":",
|
|
(atom_to_binary(ActionName))/binary
|
|
>>
|
|
]
|
|
}
|
|
),
|
|
on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
|
Msg = emqx_message:make(<<"t/a">>, <<"payload">>),
|
|
emqx:publish(Msg),
|
|
|
|
ActionId = emqx_bridge_v2:id(bridge_type(), ActionName, ConnectorName),
|
|
?assertEqual(1, emqx_resource_metrics:matched_get(ActionId)),
|
|
?assertEqual(1, emqx_resource_metrics:failed_get(ActionId)),
|
|
?retry(
|
|
_Sleep0 = 100,
|
|
_Attempts = 20,
|
|
?assertMatch(
|
|
#{
|
|
counters :=
|
|
#{
|
|
matched := 1,
|
|
'actions.failed' := 1,
|
|
'actions.failed.unknown' := 1
|
|
}
|
|
},
|
|
emqx_metrics_worker:get_metrics(rule_metrics, RuleId)
|
|
)
|
|
),
|
|
|
|
ok
|
|
end,
|
|
[]
|
|
),
|
|
|
|
ok.
|
|
|
|
t_query_uses_action_query_mode(_Config) ->
|
|
%% Check that we compute the query mode from the action and not from the connector
|
|
%% when querying the resource.
|
|
|
|
%% Set one query mode for the connector...
|
|
meck:new(con_mod(), [passthrough, no_history, non_strict]),
|
|
on_exit(fun() -> catch meck:unload([con_mod()]) end),
|
|
meck:expect(con_mod(), query_mode, 1, sync),
|
|
meck:expect(con_mod(), callback_mode, 0, always_sync),
|
|
|
|
ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{
|
|
<<"resource_opts">> => #{<<"start_timeout">> => 100}
|
|
}),
|
|
ConnectorName = ?FUNCTION_NAME,
|
|
ct:pal("connector config:\n ~p", [ConnectorConfig]),
|
|
?check_trace(
|
|
begin
|
|
{ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig),
|
|
|
|
ActionName = my_test_action,
|
|
ActionConfig = (bridge_config())#{
|
|
<<"connector">> => atom_to_binary(ConnectorName)
|
|
},
|
|
ct:pal("action config:\n ~p", [ActionConfig]),
|
|
|
|
%% ... now we use a quite different query mode for the action
|
|
meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer),
|
|
meck:expect(con_mod(), callback_mode, 0, async_if_possible),
|
|
|
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig),
|
|
|
|
{ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
|
|
#{
|
|
sql => <<"select * from \"t/a\"">>,
|
|
id => atom_to_binary(?FUNCTION_NAME),
|
|
actions => [
|
|
<<
|
|
(atom_to_binary(bridge_type()))/binary,
|
|
":",
|
|
(atom_to_binary(ActionName))/binary
|
|
>>
|
|
]
|
|
}
|
|
),
|
|
on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
|
|
|
|
Msg = emqx_message:make(<<"t/a">>, <<"payload">>),
|
|
{_, {ok, _}} =
|
|
?wait_async_action(
|
|
emqx:publish(Msg),
|
|
#{?snk_kind := call_query},
|
|
2_000
|
|
),
|
|
|
|
ok
|
|
end,
|
|
fun(Trace) ->
|
|
?assertMatch(
|
|
[#{query_mode := simple_async_internal_buffer}],
|
|
?of_kind(simple_query_override, Trace)
|
|
),
|
|
ok
|
|
end
|
|
),
|
|
ok.
|
|
|
|
%% Helper Functions
|
|
|
|
wait_until(Fun) ->
|
|
wait_until(Fun, 5000).
|
|
|
|
wait_until(Fun, Timeout) when Timeout >= 0 ->
|
|
case Fun() of
|
|
true ->
|
|
ok;
|
|
false ->
|
|
IdleTime = 100,
|
|
timer:sleep(IdleTime),
|
|
wait_until(Fun, Timeout - IdleTime)
|
|
end;
|
|
wait_until(_, _) ->
|
|
ct:fail("Wait until event did not happen").
|
|
|
|
bin(Bin) when is_binary(Bin) -> Bin;
|
|
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
|
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|