fix: broken test cases

This commit is contained in:
Kjell Winblad 2023-10-26 10:55:08 +02:00 committed by Zaiming (Stone) Shi
parent 7ee21cab20
commit bba5b42c99
4 changed files with 88 additions and 57 deletions

View File

@ -362,8 +362,8 @@ send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config);
#{enable := false} ->
{error, bridge_stopped};
Error ->
Error
_Error ->
{error, bridge_not_found}
end.
do_send_msg_with_enabled_config(

View File

@ -83,6 +83,9 @@ bridge_config() ->
}
}.
fun_table_name() ->
emqx_bridge_v2_SUITE_fun_table.
registered_process_name() ->
my_registered_process.
@ -124,15 +127,29 @@ end_per_suite(Config) ->
emqx_common_test_helpers:stop_apps(start_apps()).
init_per_testcase(_TestCase, Config) ->
ets:new(fun_table_name(), [named_table, public]),
%% Create a fake connector
{ok, _} = emqx_connector:create(con_type(), con_name(), con_config()),
Config.
end_per_testcase(_TestCase, Config) ->
ets:delete(fun_table_name()),
%% Remove the fake connector
{ok, _} = emqx_connector:remove(con_type(), con_name()),
Config.
%% Hocon does not support placing a fun in a config map so we replace it with a string
wrap_fun(Fun) ->
UniqRef = make_ref(),
UniqRefBin = term_to_binary(UniqRef),
UniqRefStr = iolist_to_binary(base64:encode(UniqRefBin)),
ets:insert(fun_table_name(), {UniqRefStr, Fun}),
UniqRefStr.
unwrap_fun(UniqRefStr) ->
ets:lookup_element(fun_table_name(), UniqRefStr, 2).
t_create_remove(_) ->
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()),
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
@ -155,28 +172,28 @@ t_create_dry_run(_) ->
t_create_dry_run_fail_add_channel(_) ->
Msg = <<"Failed to add channel">>,
OnAddChannel1 = fun() ->
OnAddChannel1 = wrap_fun(fun() ->
{error, Msg}
end,
end),
Conf1 = (bridge_config())#{on_add_channel_fun => OnAddChannel1},
{error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1),
OnAddChannel2 = fun() ->
OnAddChannel2 = wrap_fun(fun() ->
throw(Msg)
end,
end),
Conf2 = (bridge_config())#{on_add_channel_fun => OnAddChannel2},
{error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2),
ok.
t_create_dry_run_fail_get_channel_status(_) ->
Msg = <<"Failed to add channel">>,
Fun1 = fun() ->
Fun1 = wrap_fun(fun() ->
{error, Msg}
end,
end),
Conf1 = (bridge_config())#{on_get_channel_status_fun => Fun1},
{error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1),
Fun2 = fun() ->
Fun2 = wrap_fun(fun() ->
throw(Msg)
end,
end),
Conf2 = (bridge_config())#{on_get_channel_status_fun => Fun2},
{error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2),
ok.
@ -205,7 +222,9 @@ t_manual_health_check(_) ->
ok.
t_manual_health_check_exception(_) ->
Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> throw(my_error) end},
Conf = (bridge_config())#{
<<"on_get_channel_status_fun">> => wrap_fun(fun() -> throw(my_error) end)
},
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
%% Run a health check for the bridge
{error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge),
@ -213,7 +232,9 @@ t_manual_health_check_exception(_) ->
ok.
t_manual_health_check_exception_error(_) ->
Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> error(my_error) end},
Conf = (bridge_config())#{
<<"on_get_channel_status_fun">> => wrap_fun(fun() -> error(my_error) end)
},
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
%% Run a health check for the bridge
{error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge),
@ -221,7 +242,9 @@ t_manual_health_check_exception_error(_) ->
ok.
t_manual_health_check_error(_) ->
Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> {error, my_error} end},
Conf = (bridge_config())#{
<<"on_get_channel_status_fun">> => wrap_fun(fun() -> {error, my_error} end)
},
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
%% Run a health check for the bridge
{error, my_error} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge),
@ -246,7 +269,9 @@ t_send_message(_) ->
t_send_message_unhealthy_channel(_) ->
OnGetStatusResponseETS = ets:new(on_get_status_response_ets, [public]),
ets:insert(OnGetStatusResponseETS, {status_value, {error, my_error}}),
OnGetStatusFun = fun() -> ets:lookup_element(OnGetStatusResponseETS, status_value, 2) end,
OnGetStatusFun = wrap_fun(fun() ->
ets:lookup_element(OnGetStatusResponseETS, status_value, 2)
end),
Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => OnGetStatusFun},
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
%% Register name for this process
@ -265,7 +290,7 @@ t_send_message_unhealthy_channel(_) ->
bridge_type(),
my_test_bridge,
<<"my_msg">>,
#{resume_interval => 100}
#{}
),
receive
<<"my_msg">> ->
@ -278,7 +303,10 @@ t_send_message_unhealthy_channel(_) ->
ok.
t_unhealthy_channel_alarm(_) ->
Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> {error, my_error} end},
Conf = (bridge_config())#{
<<"on_get_channel_status_fun">> =>
wrap_fun(fun() -> {error, my_error} end)
},
0 = get_bridge_v2_alarm_cnt(),
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
1 = get_bridge_v2_alarm_cnt(),

View File

@ -44,8 +44,9 @@ on_add_channel(
_InstId,
_State,
_ChannelId,
#{on_add_channel_fun := Fun}
#{on_add_channel_fun := FunRef}
) ->
Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef),
Fun();
on_add_channel(
_InstId,
@ -80,8 +81,12 @@ on_query(
%% Lookup the channel
ChannelState = maps:get(ChannelId, Channels, not_found),
case ChannelState of
not_found -> throw(<<"Channel not active">>);
_ -> ok
not_found ->
error(
{recoverable_error, <<"Unexpected type for batch message (Expected send_message)">>}
);
_ ->
ok
end,
SendTo = maps:get(send_to, ChannelState),
SendTo ! Message,
@ -112,7 +117,8 @@ on_get_channel_status(
Channels = maps:get(channels, State),
ChannelState = maps:get(ChannelId, Channels),
case ChannelState of
#{on_get_channel_status_fun := Fun} ->
#{on_get_channel_status_fun := FunRef} ->
Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef),
Fun();
_ ->
connected

View File

@ -358,43 +358,40 @@ query(ResId, Request, Opts) ->
case get_query_mode_error(ResId, Opts) of
{error, _} = ErrorTuple ->
ErrorTuple;
{QM, Error} ->
case {QM, Error} of
{_, unhealthy_target} ->
emqx_resource_metrics:matched_inc(ResId),
emqx_resource_metrics:dropped_resource_stopped_inc(ResId),
?RESOURCE_ERROR(unhealthy_target, "unhealthy target");
{_, {unhealthy_target, _Message}} ->
emqx_resource_metrics:matched_inc(ResId),
emqx_resource_metrics:dropped_resource_stopped_inc(ResId),
?RESOURCE_ERROR(unhealthy_target, "unhealthy target");
{simple_async, _} ->
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
{simple_sync, _} ->
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts);
{simple_async_internal_buffer, _} ->
%% This is for bridges/connectors that have internal buffering, such
%% as Kafka and Pulsar producers.
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
{simple_sync_internal_buffer, _} ->
%% This is for bridges/connectors that have internal buffering, such
%% as Kafka and Pulsar producers.
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_sync_internal_buffer_query(
ResId, Request, Opts
);
{sync, _} ->
emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
{async, _} ->
emqx_resource_buffer_worker:async_query(ResId, Request, Opts)
end
{_, unhealthy_target} ->
emqx_resource_metrics:matched_inc(ResId),
emqx_resource_metrics:dropped_resource_stopped_inc(ResId),
?RESOURCE_ERROR(unhealthy_target, "unhealthy target");
{_, {unhealthy_target, _Message}} ->
emqx_resource_metrics:matched_inc(ResId),
emqx_resource_metrics:dropped_resource_stopped_inc(ResId),
?RESOURCE_ERROR(unhealthy_target, "unhealthy target");
{simple_async, _} ->
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
{simple_sync, _} ->
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts);
{simple_async_internal_buffer, _} ->
%% This is for bridges/connectors that have internal buffering, such
%% as Kafka and Pulsar producers.
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
{simple_sync_internal_buffer, _} ->
%% This is for bridges/connectors that have internal buffering, such
%% as Kafka and Pulsar producers.
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_sync_internal_buffer_query(
ResId, Request, Opts
);
{sync, _} ->
emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
{async, _} ->
emqx_resource_buffer_worker:async_query(ResId, Request, Opts)
end.
get_query_mode_error(ResId, Opts) ->