From 5cfe151f7cb0fff275657be7c83e26dbe18e82c9 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Sat, 28 Oct 2023 15:18:36 +0200 Subject: [PATCH] fix: type conversion for rule with converted bridge V1 reference * Make sure that a rule that refer to a bridge that has been converted to a bridge V2 bridge gets its type converted if needed. * Add test case for sending message to a Bridge V2 through a rule --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 3 + .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 57 ++++++++++++++++++- .../src/emqx_rule_actions.erl | 11 +++- 3 files changed, 67 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index d343f5ec8..2d48ddb0a 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -888,6 +888,9 @@ bridge_v1_type_to_bridge_v2_type(kafka) -> bridge_v1_type_to_bridge_v2_type(azure_event_hub) -> azure_event_hub. +%% This function should return true for all inputs that are bridge V1 types for +%% bridges that have been refactored to bridge V2s, and for all all bridge V2 +%% types. For everything else the function should return false. is_bridge_v2_type(Atom) when is_atom(Atom) -> is_bridge_v2_type(atom_to_binary(Atom, utf8)); is_bridge_v2_type(<<"kafka">>) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 0d6ee86f0..61d555724 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -98,7 +98,14 @@ registered_process_name() -> all() -> emqx_common_test_helpers:all(?MODULE). -start_apps() -> [emqx, emqx_conf, emqx_connector, emqx_bridge]. +start_apps() -> + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge, + emqx_rule_engine + ]. setup_mocks() -> MeckOpts = [passthrough, no_link, no_history, non_strict], @@ -115,7 +122,14 @@ setup_mocks() -> catch meck:new(emqx_bridge_v2, MeckOpts), meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()), meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()), - + IsBridgeV2TypeFun = fun(Type) -> + BridgeV2Type = bridge_type(), + case Type of + BridgeV2Type -> true; + _ -> false + end + end, + meck:expect(emqx_bridge_v2, is_bridge_v2_type, 1, IsBridgeV2TypeFun), ok. init_per_suite(Config) -> @@ -135,7 +149,8 @@ app_specs() -> emqx, emqx_conf, emqx_connector, - emqx_bridge + emqx_bridge, + emqx_rule_engine ]. init_per_testcase(_TestCase, Config) -> @@ -314,6 +329,42 @@ t_send_message(_) -> {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok. +t_send_message_through_rule(_) -> + BridgeName = my_test_bridge, + {ok, _} = emqx_bridge_v2:create(bridge_type(), BridgeName, bridge_config()), + %% Create a rule to send message to the bridge + {ok, _} = emqx_rule_engine:create_rule( + #{ + sql => <<"select * from \"t/a\"">>, + id => atom_to_binary(?FUNCTION_NAME), + actions => [ + << + (atom_to_binary(bridge_type()))/binary, + ":", + (atom_to_binary(BridgeName))/binary + >> + ], + description => <<"bridge_v2 test rule">> + } + ), + %% Register name for this process + register(registered_process_name(), self()), + %% Send message to the topic + ClientId = atom_to_binary(?FUNCTION_NAME), + Payload = <<"hello">>, + Msg = emqx_message:make(ClientId, 0, <<"t/a">>, Payload), + emqx:publish(Msg), + receive + #{payload := Payload} -> + ok + after 10000 -> + ct:fail("Failed to receive message") + end, + unregister(registered_process_name()), + ok = emqx_rule_engine:delete_rule(atom_to_binary(?FUNCTION_NAME)), + {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), + ok. + t_send_message_unhealthy_channel(_) -> OnGetStatusResponseETS = ets:new(on_get_status_response_ets, [public]), ets:insert(OnGetStatusResponseETS, {status_value, {error, my_error}}), diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 79da878d0..f136cd5df 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -47,7 +47,16 @@ parse_action(BridgeId) when is_binary(BridgeId) -> {Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId), case emqx_bridge_v2:is_bridge_v2_type(Type) of true -> - {bridge_v2, Type, Name}; + %% Could be an old bridge V1 type that should be converted to a V2 type + try emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type) of + BridgeV2Type -> + {bridge_v2, BridgeV2Type, Name} + catch + _:_ -> + %% We got a bridge v2 type that is not also a bridge v1 + %% type + {bridge_v2, Type, Name} + end; false -> {bridge, Type, Name, emqx_bridge_resource:resource_id(Type, Name)} end;