fix: type conversion for rule with converted bridge V1 reference
* Make sure that a rule that refer to a bridge that has been converted to a bridge V2 bridge gets its type converted if needed. * Add test case for sending message to a Bridge V2 through a rule
This commit is contained in:
parent
e1009998c9
commit
5cfe151f7c
|
@ -888,6 +888,9 @@ bridge_v1_type_to_bridge_v2_type(kafka) ->
|
||||||
bridge_v1_type_to_bridge_v2_type(azure_event_hub) ->
|
bridge_v1_type_to_bridge_v2_type(azure_event_hub) ->
|
||||||
azure_event_hub.
|
azure_event_hub.
|
||||||
|
|
||||||
|
%% This function should return true for all inputs that are bridge V1 types for
|
||||||
|
%% bridges that have been refactored to bridge V2s, and for all all bridge V2
|
||||||
|
%% types. For everything else the function should return false.
|
||||||
is_bridge_v2_type(Atom) when is_atom(Atom) ->
|
is_bridge_v2_type(Atom) when is_atom(Atom) ->
|
||||||
is_bridge_v2_type(atom_to_binary(Atom, utf8));
|
is_bridge_v2_type(atom_to_binary(Atom, utf8));
|
||||||
is_bridge_v2_type(<<"kafka">>) ->
|
is_bridge_v2_type(<<"kafka">>) ->
|
||||||
|
|
|
@ -98,7 +98,14 @@ registered_process_name() ->
|
||||||
all() ->
|
all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
start_apps() -> [emqx, emqx_conf, emqx_connector, emqx_bridge].
|
start_apps() ->
|
||||||
|
[
|
||||||
|
emqx,
|
||||||
|
emqx_conf,
|
||||||
|
emqx_connector,
|
||||||
|
emqx_bridge,
|
||||||
|
emqx_rule_engine
|
||||||
|
].
|
||||||
|
|
||||||
setup_mocks() ->
|
setup_mocks() ->
|
||||||
MeckOpts = [passthrough, no_link, no_history, non_strict],
|
MeckOpts = [passthrough, no_link, no_history, non_strict],
|
||||||
|
@ -115,7 +122,14 @@ setup_mocks() ->
|
||||||
catch meck:new(emqx_bridge_v2, MeckOpts),
|
catch meck:new(emqx_bridge_v2, MeckOpts),
|
||||||
meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()),
|
meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()),
|
||||||
meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()),
|
meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()),
|
||||||
|
IsBridgeV2TypeFun = fun(Type) ->
|
||||||
|
BridgeV2Type = bridge_type(),
|
||||||
|
case Type of
|
||||||
|
BridgeV2Type -> true;
|
||||||
|
_ -> false
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
meck:expect(emqx_bridge_v2, is_bridge_v2_type, 1, IsBridgeV2TypeFun),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
|
@ -135,7 +149,8 @@ app_specs() ->
|
||||||
emqx,
|
emqx,
|
||||||
emqx_conf,
|
emqx_conf,
|
||||||
emqx_connector,
|
emqx_connector,
|
||||||
emqx_bridge
|
emqx_bridge,
|
||||||
|
emqx_rule_engine
|
||||||
].
|
].
|
||||||
|
|
||||||
init_per_testcase(_TestCase, Config) ->
|
init_per_testcase(_TestCase, Config) ->
|
||||||
|
@ -314,6 +329,42 @@ t_send_message(_) ->
|
||||||
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_send_message_through_rule(_) ->
|
||||||
|
BridgeName = my_test_bridge,
|
||||||
|
{ok, _} = emqx_bridge_v2:create(bridge_type(), BridgeName, bridge_config()),
|
||||||
|
%% Create a rule to send message to the bridge
|
||||||
|
{ok, _} = emqx_rule_engine:create_rule(
|
||||||
|
#{
|
||||||
|
sql => <<"select * from \"t/a\"">>,
|
||||||
|
id => atom_to_binary(?FUNCTION_NAME),
|
||||||
|
actions => [
|
||||||
|
<<
|
||||||
|
(atom_to_binary(bridge_type()))/binary,
|
||||||
|
":",
|
||||||
|
(atom_to_binary(BridgeName))/binary
|
||||||
|
>>
|
||||||
|
],
|
||||||
|
description => <<"bridge_v2 test rule">>
|
||||||
|
}
|
||||||
|
),
|
||||||
|
%% Register name for this process
|
||||||
|
register(registered_process_name(), self()),
|
||||||
|
%% Send message to the topic
|
||||||
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
Msg = emqx_message:make(ClientId, 0, <<"t/a">>, Payload),
|
||||||
|
emqx:publish(Msg),
|
||||||
|
receive
|
||||||
|
#{payload := Payload} ->
|
||||||
|
ok
|
||||||
|
after 10000 ->
|
||||||
|
ct:fail("Failed to receive message")
|
||||||
|
end,
|
||||||
|
unregister(registered_process_name()),
|
||||||
|
ok = emqx_rule_engine:delete_rule(atom_to_binary(?FUNCTION_NAME)),
|
||||||
|
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_send_message_unhealthy_channel(_) ->
|
t_send_message_unhealthy_channel(_) ->
|
||||||
OnGetStatusResponseETS = ets:new(on_get_status_response_ets, [public]),
|
OnGetStatusResponseETS = ets:new(on_get_status_response_ets, [public]),
|
||||||
ets:insert(OnGetStatusResponseETS, {status_value, {error, my_error}}),
|
ets:insert(OnGetStatusResponseETS, {status_value, {error, my_error}}),
|
||||||
|
|
|
@ -47,7 +47,16 @@ parse_action(BridgeId) when is_binary(BridgeId) ->
|
||||||
{Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId),
|
{Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId),
|
||||||
case emqx_bridge_v2:is_bridge_v2_type(Type) of
|
case emqx_bridge_v2:is_bridge_v2_type(Type) of
|
||||||
true ->
|
true ->
|
||||||
{bridge_v2, Type, Name};
|
%% Could be an old bridge V1 type that should be converted to a V2 type
|
||||||
|
try emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(Type) of
|
||||||
|
BridgeV2Type ->
|
||||||
|
{bridge_v2, BridgeV2Type, Name}
|
||||||
|
catch
|
||||||
|
_:_ ->
|
||||||
|
%% We got a bridge v2 type that is not also a bridge v1
|
||||||
|
%% type
|
||||||
|
{bridge_v2, Type, Name}
|
||||||
|
end;
|
||||||
false ->
|
false ->
|
||||||
{bridge, Type, Name, emqx_bridge_resource:resource_id(Type, Name)}
|
{bridge, Type, Name, emqx_bridge_resource:resource_id(Type, Name)}
|
||||||
end;
|
end;
|
||||||
|
|
Loading…
Reference in New Issue