diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 786b99c44..f27603bf9 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -35,6 +35,7 @@ ]). -export([ load/0 + , lookup/1 , lookup/2 , lookup/3 , list/0 @@ -191,6 +192,10 @@ list_bridges_by_connector(ConnectorId) -> [B || B = #{raw_config := #{<<"connector">> := Id}} <- list(), ConnectorId =:= Id]. +lookup(Id) -> + {Type, Name} = parse_bridge_id(Id), + lookup(Type, Name). + lookup(Type, Name) -> RawConf = emqx:get_raw_config([bridges, Type, Name], #{}), lookup(Type, Name, RawConf). diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 807ad32f6..16da7395f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -160,6 +160,7 @@ t_http_crud_apis(_) -> } = jsx:decode(Bridge), %% send an message to emqx and the message should be forwarded to the HTTP server + wait_for_resource_ready(BridgeID, 5), Body = <<"my msg">>, emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), ?assert( @@ -212,6 +213,7 @@ t_http_crud_apis(_) -> }, jsx:decode(Bridge3Str)), %% send an message to emqx again, check the path has been changed + wait_for_resource_ready(BridgeID, 5), emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), ?assert( receive @@ -320,3 +322,14 @@ auth_header_() -> operation_path(Oper, BridgeID) -> uri(["bridges", BridgeID, "operation", Oper]). + +wait_for_resource_ready(InstId, 0) -> + ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]), + ct:fail(wait_resource_timeout); +wait_for_resource_ready(InstId, Retry) -> + case emqx_bridge:lookup(InstId) of + {ok, #{resource_data := #{status := started}}} -> ok; + _ -> + timer:sleep(100), + wait_for_resource_ready(InstId, Retry-1) + end. diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 4caf700a9..12a3a8e23 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -241,6 +241,7 @@ t_mqtt_conn_bridge_ingress(_) -> emqx:subscribe(LocalTopic), %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDIngress, 5), emqx:publish(emqx_message:make(RemoteTopic, Payload)), %% we should receive a message on the local broker, with specified topic ?assert( @@ -309,6 +310,7 @@ t_mqtt_conn_bridge_egress(_) -> emqx:subscribe(RemoteTopic), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(LocalTopic, Payload)), %% we should receive a message on the "remote" broker, with specified topic @@ -370,6 +372,7 @@ t_mqtt_conn_update(_) -> , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + wait_for_resource_ready(BridgeIDEgress, 2), %% then we try to update 'server' of the connector, to an unavailable IP address %% the update should fail because of 'unreachable' or 'connrefused' @@ -412,6 +415,11 @@ t_mqtt_conn_update2(_) -> , <<"status">> := <<"disconnected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + %% We try to fix the 'server' parameter, to another unavailable server.. + %% The update should success: we don't check the connectivity of the new config + %% if the resource is now disconnected. + {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), + ?MQTT_CONNECOTR2(<<"127.0.0.1:2604">>)), %% we fix the 'server' parameter to a normal one, it should work {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), @@ -444,9 +452,9 @@ t_mqtt_conn_update3(_) -> <<"name">> => ?BRIDGE_NAME_EGRESS }), #{ <<"id">> := BridgeIDEgress - , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + wait_for_resource_ready(BridgeIDEgress, 2), %% delete the connector should fail because it is in use by a bridge {ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []), @@ -499,6 +507,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> emqx:subscribe(LocalTopic), %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDIngress, 5), emqx:publish(emqx_message:make(RemoteTopic, Payload)), %% we should receive a message on the local broker, with specified topic ?assert( @@ -563,6 +572,7 @@ t_egress_mqtt_bridge_with_rules(_) -> emqx:subscribe(RemoteTopic), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(LocalTopic, Payload)), %% we should receive a message on the "remote" broker, with specified topic ?assert( @@ -583,6 +593,7 @@ t_egress_mqtt_bridge_with_rules(_) -> RuleTopic = <<"t/1">>, RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>, emqx:subscribe(RemoteTopic2), + wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(RuleTopic, Payload2)), {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), #{ <<"id">> := RuleId @@ -646,3 +657,13 @@ auth_header_() -> {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), {"Authorization", "Bearer " ++ binary_to_list(Token)}. +wait_for_resource_ready(InstId, 0) -> + ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]), + ct:fail(wait_resource_timeout); +wait_for_resource_ready(InstId, Retry) -> + case emqx_bridge:lookup(InstId) of + {ok, #{resource_data := #{status := started}}} -> ok; + _ -> + timer:sleep(100), + wait_for_resource_ready(InstId, Retry-1) + end.