diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 851089acb..6de7d8695 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -485,7 +485,7 @@ schema("/bridges_probe") -> RequestMeta = #{module => ?MODULE, method => post, path => "/bridges_probe"}, case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of {ok, #{body := #{<<"type">> := ConnType} = Params}} -> - case do_probe(ConnType, maps:remove(<<"type">>, Params)) of + case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params)) of ok -> {204}; {error, Error} -> @@ -495,49 +495,6 @@ schema("/bridges_probe") -> BadRequest end. -do_probe(ConnType, Params) -> - case test_connection(host_and_port(ConnType, Params)) of - ok -> - emqx_bridge_resource:create_dry_run(ConnType, Params); - Error -> - Error - end. - -host_and_port(mqtt, #{<<"server">> := Server}) -> - case string:split(Server, ":") of - [Host, Port] -> {Host, list_to_integer(Port)}; - _Other -> error(invalid_server, Server) - end; -host_and_port(webhook, #{<<"url">> := Url}) -> - {BaseUrl, _Path} = parse_url(Url), - {ok, #{host := Host, port := Port}} = emqx_http_lib:uri_parse(BaseUrl), - {Host, Port}; -host_and_port(_Unknown, _) -> - undefined. - -%% [TODO] remove in EMQX-8588 when resource manager handles things more elegantly -test_connection(undefined) -> - %% be friendly, it might fail later on with a 'timeout' error. - ok; -test_connection({Host, Port}) -> - case gen_tcp:connect(Host, Port, [], 5000) of - {ok, TestSocket} -> gen_tcp:close(TestSocket); - Error -> Error - end. - -parse_url(Url) -> - case string:split(Url, "//", leading) of - [Scheme, UrlRem] -> - case string:split(UrlRem, "/", leading) of - [HostPort, Path] -> - {iolist_to_binary([Scheme, "//", HostPort]), Path}; - [HostPort] -> - {iolist_to_binary([Scheme, "//", HostPort]), <<>>} - end; - [Url] -> - error({invalid_url, Url}) - end. - lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> Nodes = mria_mnesia:running_nodes(), case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index a32019e41..a77da7544 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -33,13 +33,21 @@ ) ) ). --define(HTTP_BRIDGE(URL, TYPE, NAME), #{ +-define(BRIDGE(NAME, TYPE), #{ + <<"ssl">> => #{<<"enable">> => false}, <<"type">> => TYPE, - <<"name">> => NAME, + <<"name">> => NAME +}). +-define(MQTT_BRIDGE(SERVER), ?BRIDGE(<<"mqtt_egress_test_bridge">>, <<"mqtt">>)#{ + <<"server">> => SERVER, + <<"username">> => <<"user1">>, + <<"password">> => <<"">>, + <<"proto_ver">> => <<"v5">> +}). +-define(HTTP_BRIDGE(URL, TYPE, NAME), ?BRIDGE(NAME, TYPE)#{ <<"url">> => URL, <<"local_topic">> => <<"emqx_webhook/#">>, <<"method">> => <<"post">>, - <<"ssl">> => #{<<"enable">> => false}, <<"body">> => <<"${payload}">>, <<"headers">> => #{ <<"content-type">> => <<"application/json">> @@ -596,16 +604,6 @@ t_with_redact_update(_Config) -> ?assertEqual(Password, Value), ok. --define(MQTT_BRIDGE(Server), #{ - <<"server">> => Server, - <<"username">> => <<"user1">>, - <<"password">> => <<"">>, - <<"proto_ver">> => <<"v5">>, - <<"ssl">> => #{<<"enable">> => false}, - <<"type">> => <<"mqtt">>, - <<"name">> => <<"mqtt_egress_test_bridge">> -}). - t_bridges_probe(Config) -> Port = ?config(port, Config), URL = ?URL(Port, "some_path"), @@ -623,11 +621,18 @@ t_bridges_probe(Config) -> ?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME) ), - {ok, 400, _} = request( + {ok, 400, NxDomain} = request( post, uri(["bridges_probe"]), ?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>, ?BRIDGE_TYPE, ?BRIDGE_NAME) ), + ?assertMatch( + #{ + <<"code">> := <<"TEST_FAILED">>, + <<"message">> := _ + }, + jsx:decode(NxDomain) + ), {ok, 204, _} = request( post, @@ -635,11 +640,25 @@ t_bridges_probe(Config) -> ?MQTT_BRIDGE(<<"127.0.0.1:1883">>) ), - {ok, 400, _} = request( + {ok, 400, ConnRefused} = request( post, uri(["bridges_probe"]), ?MQTT_BRIDGE(<<"127.0.0.1:2883">>) ), + ?assertMatch( + #{ + <<"code">> := <<"TEST_FAILED">>, + <<"message">> := <<"#{reason => econnrefused", _/binary>> + }, + jsx:decode(ConnRefused) + ), + + {ok, 400, BadReq} = request( + post, + uri(["bridges_probe"]), + ?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>) + ), + ?assertMatch(#{<<"code">> := <<"BAD_REQUEST">>}, jsx:decode(BadReq)), ok. request(Method, Url, Body) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 8ad3fdd80..821bcbc5c 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -174,6 +174,9 @@ create_dry_run(ResourceType, Config) -> case wait_for_ready(ResId, 15000) of ok -> remove(ResId); + {error, Reason} -> + _ = remove(ResId), + {error, Reason}; timeout -> _ = remove(ResId), {error, timeout} @@ -632,16 +635,18 @@ data_record_to_external_map_with_metrics(Data) -> metrics => get_metrics(Data#data.id) }. --spec wait_for_ready(resource_id(), integer()) -> ok | timeout. +-spec wait_for_ready(resource_id(), integer()) -> ok | timeout | {error, term()}. wait_for_ready(ResId, WaitTime) -> do_wait_for_ready(ResId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY). do_wait_for_ready(_ResId, 0) -> timeout; do_wait_for_ready(ResId, Retry) -> - case ets_lookup(ResId) of - {ok, _Group, #{status := connected}} -> + case read_cache(ResId) of + {_Group, #data{status = connected}} -> ok; + {_Group, #data{status = disconnected, error = Reason}} -> + {error, Reason}; _ -> timer:sleep(?WAIT_FOR_RESOURCE_DELAY), do_wait_for_ready(ResId, Retry - 1)