From bba5b42c992184b3ba203f754a9fb5f30b9a00bc Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 26 Oct 2023 10:55:08 +0200 Subject: [PATCH] fix: broken test cases --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 4 +- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 56 +++++++++++---- .../test/emqx_bridge_v2_test_connector.erl | 14 ++-- apps/emqx_resource/src/emqx_resource.erl | 71 +++++++++---------- 4 files changed, 88 insertions(+), 57 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 4560f9a3d..1ca2f8b18 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -362,8 +362,8 @@ send_message(BridgeType, BridgeName, Message, QueryOpts0) -> do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config); #{enable := false} -> {error, bridge_stopped}; - Error -> - Error + _Error -> + {error, bridge_not_found} end. do_send_msg_with_enabled_config( diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 95385c585..df8bfdf1a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -83,6 +83,9 @@ bridge_config() -> } }. +fun_table_name() -> + emqx_bridge_v2_SUITE_fun_table. + registered_process_name() -> my_registered_process. @@ -124,15 +127,29 @@ end_per_suite(Config) -> emqx_common_test_helpers:stop_apps(start_apps()). init_per_testcase(_TestCase, Config) -> + ets:new(fun_table_name(), [named_table, public]), %% Create a fake connector {ok, _} = emqx_connector:create(con_type(), con_name(), con_config()), Config. end_per_testcase(_TestCase, Config) -> + ets:delete(fun_table_name()), %% Remove the fake connector {ok, _} = emqx_connector:remove(con_type(), con_name()), Config. +%% Hocon does not support placing a fun in a config map so we replace it with a string + +wrap_fun(Fun) -> + UniqRef = make_ref(), + UniqRefBin = term_to_binary(UniqRef), + UniqRefStr = iolist_to_binary(base64:encode(UniqRefBin)), + ets:insert(fun_table_name(), {UniqRefStr, Fun}), + UniqRefStr. + +unwrap_fun(UniqRefStr) -> + ets:lookup_element(fun_table_name(), UniqRefStr, 2). + t_create_remove(_) -> {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()), {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), @@ -155,28 +172,28 @@ t_create_dry_run(_) -> t_create_dry_run_fail_add_channel(_) -> Msg = <<"Failed to add channel">>, - OnAddChannel1 = fun() -> + OnAddChannel1 = wrap_fun(fun() -> {error, Msg} - end, + end), Conf1 = (bridge_config())#{on_add_channel_fun => OnAddChannel1}, {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), - OnAddChannel2 = fun() -> + OnAddChannel2 = wrap_fun(fun() -> throw(Msg) - end, + end), Conf2 = (bridge_config())#{on_add_channel_fun => OnAddChannel2}, {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2), ok. t_create_dry_run_fail_get_channel_status(_) -> Msg = <<"Failed to add channel">>, - Fun1 = fun() -> + Fun1 = wrap_fun(fun() -> {error, Msg} - end, + end), Conf1 = (bridge_config())#{on_get_channel_status_fun => Fun1}, {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), - Fun2 = fun() -> + Fun2 = wrap_fun(fun() -> throw(Msg) - end, + end), Conf2 = (bridge_config())#{on_get_channel_status_fun => Fun2}, {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2), ok. @@ -205,7 +222,9 @@ t_manual_health_check(_) -> ok. t_manual_health_check_exception(_) -> - Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> throw(my_error) end}, + Conf = (bridge_config())#{ + <<"on_get_channel_status_fun">> => wrap_fun(fun() -> throw(my_error) end) + }, {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), %% Run a health check for the bridge {error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), @@ -213,7 +232,9 @@ t_manual_health_check_exception(_) -> ok. t_manual_health_check_exception_error(_) -> - Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> error(my_error) end}, + Conf = (bridge_config())#{ + <<"on_get_channel_status_fun">> => wrap_fun(fun() -> error(my_error) end) + }, {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), %% Run a health check for the bridge {error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), @@ -221,7 +242,9 @@ t_manual_health_check_exception_error(_) -> ok. t_manual_health_check_error(_) -> - Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> {error, my_error} end}, + Conf = (bridge_config())#{ + <<"on_get_channel_status_fun">> => wrap_fun(fun() -> {error, my_error} end) + }, {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), %% Run a health check for the bridge {error, my_error} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), @@ -246,7 +269,9 @@ t_send_message(_) -> t_send_message_unhealthy_channel(_) -> OnGetStatusResponseETS = ets:new(on_get_status_response_ets, [public]), ets:insert(OnGetStatusResponseETS, {status_value, {error, my_error}}), - OnGetStatusFun = fun() -> ets:lookup_element(OnGetStatusResponseETS, status_value, 2) end, + OnGetStatusFun = wrap_fun(fun() -> + ets:lookup_element(OnGetStatusResponseETS, status_value, 2) + end), Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => OnGetStatusFun}, {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), %% Register name for this process @@ -265,7 +290,7 @@ t_send_message_unhealthy_channel(_) -> bridge_type(), my_test_bridge, <<"my_msg">>, - #{resume_interval => 100} + #{} ), receive <<"my_msg">> -> @@ -278,7 +303,10 @@ t_send_message_unhealthy_channel(_) -> ok. t_unhealthy_channel_alarm(_) -> - Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> {error, my_error} end}, + Conf = (bridge_config())#{ + <<"on_get_channel_status_fun">> => + wrap_fun(fun() -> {error, my_error} end) + }, 0 = get_bridge_v2_alarm_cnt(), {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), 1 = get_bridge_v2_alarm_cnt(), diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl index 2774643a9..fd6e5698b 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl @@ -44,8 +44,9 @@ on_add_channel( _InstId, _State, _ChannelId, - #{on_add_channel_fun := Fun} + #{on_add_channel_fun := FunRef} ) -> + Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), Fun(); on_add_channel( _InstId, @@ -80,8 +81,12 @@ on_query( %% Lookup the channel ChannelState = maps:get(ChannelId, Channels, not_found), case ChannelState of - not_found -> throw(<<"Channel not active">>); - _ -> ok + not_found -> + error( + {recoverable_error, <<"Unexpected type for batch message (Expected send_message)">>} + ); + _ -> + ok end, SendTo = maps:get(send_to, ChannelState), SendTo ! Message, @@ -112,7 +117,8 @@ on_get_channel_status( Channels = maps:get(channels, State), ChannelState = maps:get(ChannelId, Channels), case ChannelState of - #{on_get_channel_status_fun := Fun} -> + #{on_get_channel_status_fun := FunRef} -> + Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), Fun(); _ -> connected diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index f7f878d82..603285f66 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -358,43 +358,40 @@ query(ResId, Request, Opts) -> case get_query_mode_error(ResId, Opts) of {error, _} = ErrorTuple -> ErrorTuple; - {QM, Error} -> - case {QM, Error} of - {_, unhealthy_target} -> - emqx_resource_metrics:matched_inc(ResId), - emqx_resource_metrics:dropped_resource_stopped_inc(ResId), - ?RESOURCE_ERROR(unhealthy_target, "unhealthy target"); - {_, {unhealthy_target, _Message}} -> - emqx_resource_metrics:matched_inc(ResId), - emqx_resource_metrics:dropped_resource_stopped_inc(ResId), - ?RESOURCE_ERROR(unhealthy_target, "unhealthy target"); - {simple_async, _} -> - %% TODO(5.1.1): pass Resource instead of ResId to simple APIs - %% so the buffer worker does not need to lookup the cache again - emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts); - {simple_sync, _} -> - %% TODO(5.1.1): pass Resource instead of ResId to simple APIs - %% so the buffer worker does not need to lookup the cache again - emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts); - {simple_async_internal_buffer, _} -> - %% This is for bridges/connectors that have internal buffering, such - %% as Kafka and Pulsar producers. - %% TODO(5.1.1): pass Resource instead of ResId to simple APIs - %% so the buffer worker does not need to lookup the cache again - emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts); - {simple_sync_internal_buffer, _} -> - %% This is for bridges/connectors that have internal buffering, such - %% as Kafka and Pulsar producers. - %% TODO(5.1.1): pass Resource instead of ResId to simple APIs - %% so the buffer worker does not need to lookup the cache again - emqx_resource_buffer_worker:simple_sync_internal_buffer_query( - ResId, Request, Opts - ); - {sync, _} -> - emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); - {async, _} -> - emqx_resource_buffer_worker:async_query(ResId, Request, Opts) - end + {_, unhealthy_target} -> + emqx_resource_metrics:matched_inc(ResId), + emqx_resource_metrics:dropped_resource_stopped_inc(ResId), + ?RESOURCE_ERROR(unhealthy_target, "unhealthy target"); + {_, {unhealthy_target, _Message}} -> + emqx_resource_metrics:matched_inc(ResId), + emqx_resource_metrics:dropped_resource_stopped_inc(ResId), + ?RESOURCE_ERROR(unhealthy_target, "unhealthy target"); + {simple_async, _} -> + %% TODO(5.1.1): pass Resource instead of ResId to simple APIs + %% so the buffer worker does not need to lookup the cache again + emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts); + {simple_sync, _} -> + %% TODO(5.1.1): pass Resource instead of ResId to simple APIs + %% so the buffer worker does not need to lookup the cache again + emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts); + {simple_async_internal_buffer, _} -> + %% This is for bridges/connectors that have internal buffering, such + %% as Kafka and Pulsar producers. + %% TODO(5.1.1): pass Resource instead of ResId to simple APIs + %% so the buffer worker does not need to lookup the cache again + emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts); + {simple_sync_internal_buffer, _} -> + %% This is for bridges/connectors that have internal buffering, such + %% as Kafka and Pulsar producers. + %% TODO(5.1.1): pass Resource instead of ResId to simple APIs + %% so the buffer worker does not need to lookup the cache again + emqx_resource_buffer_worker:simple_sync_internal_buffer_query( + ResId, Request, Opts + ); + {sync, _} -> + emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); + {async, _} -> + emqx_resource_buffer_worker:async_query(ResId, Request, Opts) end. get_query_mode_error(ResId, Opts) ->