feat(emqx_bridge): add /bridges_probe API endpoint
This commit is contained in:
parent
903ae9a644
commit
96ca0d9f49
|
@ -134,4 +134,20 @@ NOTE:不允许在单节点上启用/禁用 Bridge"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
desc_api9 {
|
||||||
|
desc {
|
||||||
|
en: """
|
||||||
|
Test creating a new bridge by given ID </br>
|
||||||
|
The ID must be of format '{type}:{name}'
|
||||||
|
"""
|
||||||
|
zh: """
|
||||||
|
通过给定的 ID 测试创建一个新的桥接。 </br>
|
||||||
|
ID 的格式必须为 ’{type}:{name}”
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
label: {
|
||||||
|
en: "Test Bridge Creation"
|
||||||
|
zh: "测试桥接创建"
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,8 @@
|
||||||
'/bridges/:id'/2,
|
'/bridges/:id'/2,
|
||||||
'/bridges/:id/operation/:operation'/2,
|
'/bridges/:id/operation/:operation'/2,
|
||||||
'/nodes/:node/bridges/:id/operation/:operation'/2,
|
'/nodes/:node/bridges/:id/operation/:operation'/2,
|
||||||
'/bridges/:id/reset_metrics'/2
|
'/bridges/:id/reset_metrics'/2,
|
||||||
|
'/bridges_probe'/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([lookup_from_local_node/2]).
|
-export([lookup_from_local_node/2]).
|
||||||
|
@ -68,7 +69,8 @@ paths() ->
|
||||||
"/bridges/:id",
|
"/bridges/:id",
|
||||||
"/bridges/:id/operation/:operation",
|
"/bridges/:id/operation/:operation",
|
||||||
"/nodes/:node/bridges/:id/operation/:operation",
|
"/nodes/:node/bridges/:id/operation/:operation",
|
||||||
"/bridges/:id/reset_metrics"
|
"/bridges/:id/reset_metrics",
|
||||||
|
"/bridges_probe"
|
||||||
].
|
].
|
||||||
|
|
||||||
error_schema(Code, Message) when is_atom(Code) ->
|
error_schema(Code, Message) when is_atom(Code) ->
|
||||||
|
@ -384,6 +386,23 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
|
||||||
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
|
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
schema("/bridges_probe") ->
|
||||||
|
#{
|
||||||
|
'operationId' => '/bridges_probe',
|
||||||
|
post => #{
|
||||||
|
tags => [<<"bridges">>],
|
||||||
|
desc => ?DESC("desc_api9"),
|
||||||
|
summary => <<"Test creating bridge">>,
|
||||||
|
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||||
|
emqx_bridge_schema:post_request(),
|
||||||
|
bridge_info_examples(post)
|
||||||
|
),
|
||||||
|
responses => #{
|
||||||
|
204 => <<"Test bridge OK">>,
|
||||||
|
400 => error_schema(['TEST_FAILED'], "bridge test failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
'/bridges'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) ->
|
'/bridges'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) ->
|
||||||
|
@ -462,6 +481,59 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
|
'/bridges_probe'(post, Request) ->
|
||||||
|
RequestMeta = #{module => ?MODULE, method => post, path => "/bridges_probe"},
|
||||||
|
case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of
|
||||||
|
{ok, #{body := #{<<"type">> := ConnType} = Params}} ->
|
||||||
|
case do_probe(ConnType, maps:remove(<<"type">>, Params)) of
|
||||||
|
ok ->
|
||||||
|
{204};
|
||||||
|
{error, Error} ->
|
||||||
|
{400, error_msg('TEST_FAILED', Error)}
|
||||||
|
end;
|
||||||
|
BadRequest ->
|
||||||
|
BadRequest
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_probe(ConnType, Params) ->
|
||||||
|
case test_connection(host_and_port(ConnType, Params)) of
|
||||||
|
ok ->
|
||||||
|
emqx_bridge_resource:create_dry_run(ConnType, Params);
|
||||||
|
Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
host_and_port(mqtt, #{<<"server">> := Server}) ->
|
||||||
|
Server;
|
||||||
|
host_and_port(webhook, #{<<"url">> := Url}) ->
|
||||||
|
{BaseUrl, _Path} = parse_url(Url),
|
||||||
|
{ok, #{host := Host, port := Port}} = emqx_http_lib:uri_parse(BaseUrl),
|
||||||
|
{Host, Port};
|
||||||
|
host_and_port(_Unknown, _) ->
|
||||||
|
undefined.
|
||||||
|
|
||||||
|
test_connection(undefined) ->
|
||||||
|
%% be friendly, it might fail later on with a 'timeout' error.
|
||||||
|
ok;
|
||||||
|
test_connection({Host, Port}) ->
|
||||||
|
case gen_tcp:connect(Host, Port, []) of
|
||||||
|
{ok, TestSocket} -> gen_tcp:close(TestSocket);
|
||||||
|
Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
parse_url(Url) ->
|
||||||
|
case string:split(Url, "//", leading) of
|
||||||
|
[Scheme, UrlRem] ->
|
||||||
|
case string:split(UrlRem, "/", leading) of
|
||||||
|
[HostPort, Path] ->
|
||||||
|
{iolist_to_binary([Scheme, "//", HostPort]), Path};
|
||||||
|
[HostPort] ->
|
||||||
|
{iolist_to_binary([Scheme, "//", HostPort]), <<>>}
|
||||||
|
end;
|
||||||
|
[Url] ->
|
||||||
|
error({invalid_url, Url})
|
||||||
|
end.
|
||||||
|
|
||||||
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
|
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
|
||||||
Nodes = mria_mnesia:running_nodes(),
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
|
case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
|
||||||
|
|
|
@ -213,14 +213,16 @@ recreate(Type, Name, Conf, Opts) ->
|
||||||
Opts
|
Opts
|
||||||
).
|
).
|
||||||
|
|
||||||
create_dry_run(Type, Conf) ->
|
create_dry_run(Type, Conf0) ->
|
||||||
TmpPath = iolist_to_binary(["bridges-create-dry-run:", emqx_misc:gen_id(8)]),
|
TmpPath = iolist_to_binary(["bridges-create-dry-run:", emqx_misc:gen_id(8)]),
|
||||||
|
Conf = emqx_map_lib:safe_atom_key_map(Conf0),
|
||||||
case emqx_connector_ssl:convert_certs(TmpPath, Conf) of
|
case emqx_connector_ssl:convert_certs(TmpPath, Conf) of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
{ok, ConfNew} ->
|
{ok, ConfNew} ->
|
||||||
|
ParseConf = parse_confs(bin(Type), TmpPath, ConfNew),
|
||||||
Res = emqx_resource:create_dry_run_local(
|
Res = emqx_resource:create_dry_run_local(
|
||||||
bridge_to_resource_type(Type), ConfNew
|
bridge_to_resource_type(Type), ParseConf
|
||||||
),
|
),
|
||||||
_ = maybe_clear_certs(TmpPath, ConfNew),
|
_ = maybe_clear_certs(TmpPath, ConfNew),
|
||||||
Res
|
Res
|
||||||
|
|
|
@ -596,6 +596,52 @@ t_with_redact_update(_Config) ->
|
||||||
?assertEqual(Password, Value),
|
?assertEqual(Password, Value),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
-define(MQTT_BRIDGE(Server), #{
|
||||||
|
<<"server">> => Server,
|
||||||
|
<<"username">> => <<"user1">>,
|
||||||
|
<<"password">> => <<"">>,
|
||||||
|
<<"proto_ver">> => <<"v5">>,
|
||||||
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
|
<<"type">> => <<"mqtt">>,
|
||||||
|
<<"name">> => <<"mqtt_egress_test_bridge">>
|
||||||
|
}).
|
||||||
|
|
||||||
|
t_bridges_probe(Config) ->
|
||||||
|
Port = ?config(port, Config),
|
||||||
|
URL = ?URL(Port, "some_path"),
|
||||||
|
|
||||||
|
{ok, 204, <<>>} = request(
|
||||||
|
post,
|
||||||
|
uri(["bridges_probe"]),
|
||||||
|
?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME)
|
||||||
|
),
|
||||||
|
|
||||||
|
%% second time with same name is ok since no real bridge created
|
||||||
|
{ok, 204, <<>>} = request(
|
||||||
|
post,
|
||||||
|
uri(["bridges_probe"]),
|
||||||
|
?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME)
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, 400, _} = request(
|
||||||
|
post,
|
||||||
|
uri(["bridges_probe"]),
|
||||||
|
?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>, ?BRIDGE_TYPE, ?BRIDGE_NAME)
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, 204, _} = request(
|
||||||
|
post,
|
||||||
|
uri(["bridges_probe"]),
|
||||||
|
?MQTT_BRIDGE(<<"127.0.0.1:1883">>)
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, 400, _} = request(
|
||||||
|
post,
|
||||||
|
uri(["bridges_probe"]),
|
||||||
|
?MQTT_BRIDGE(<<"127.0.0.1:2883">>)
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
request(Method, Url, Body) ->
|
request(Method, Url, Body) ->
|
||||||
request(<<"bridge_admin">>, Method, Url, Body).
|
request(<<"bridge_admin">>, Method, Url, Body).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue