Merge pull request #9708 from sstrigler/EMQX-8588-refactor-emqx-resource-manager

EMQX 8588 refactor emqx resource manager
This commit is contained in:
Stefan Strigler 2023-01-11 16:13:55 +01:00 committed by GitHub
commit 417d95d76f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 62 deletions

View File

@ -485,7 +485,7 @@ schema("/bridges_probe") ->
RequestMeta = #{module => ?MODULE, method => post, path => "/bridges_probe"},
case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of
{ok, #{body := #{<<"type">> := ConnType} = Params}} ->
case do_probe(ConnType, maps:remove(<<"type">>, Params)) of
case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params)) of
ok ->
{204};
{error, Error} ->
@ -495,49 +495,6 @@ schema("/bridges_probe") ->
BadRequest
end.
do_probe(ConnType, Params) ->
case test_connection(host_and_port(ConnType, Params)) of
ok ->
emqx_bridge_resource:create_dry_run(ConnType, Params);
Error ->
Error
end.
host_and_port(mqtt, #{<<"server">> := Server}) ->
case string:split(Server, ":") of
[Host, Port] -> {Host, list_to_integer(Port)};
_Other -> error(invalid_server, Server)
end;
host_and_port(webhook, #{<<"url">> := Url}) ->
{BaseUrl, _Path} = parse_url(Url),
{ok, #{host := Host, port := Port}} = emqx_http_lib:uri_parse(BaseUrl),
{Host, Port};
host_and_port(_Unknown, _) ->
undefined.
%% [TODO] remove in EMQX-8588 when resource manager handles things more elegantly
test_connection(undefined) ->
%% be friendly, it might fail later on with a 'timeout' error.
ok;
test_connection({Host, Port}) ->
case gen_tcp:connect(Host, Port, [], 5000) of
{ok, TestSocket} -> gen_tcp:close(TestSocket);
Error -> Error
end.
parse_url(Url) ->
case string:split(Url, "//", leading) of
[Scheme, UrlRem] ->
case string:split(UrlRem, "/", leading) of
[HostPort, Path] ->
{iolist_to_binary([Scheme, "//", HostPort]), Path};
[HostPort] ->
{iolist_to_binary([Scheme, "//", HostPort]), <<>>}
end;
[Url] ->
error({invalid_url, Url})
end.
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
Nodes = mria_mnesia:running_nodes(),
case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of

View File

@ -33,13 +33,21 @@
)
)
).
-define(HTTP_BRIDGE(URL, TYPE, NAME), #{
-define(BRIDGE(NAME, TYPE), #{
<<"ssl">> => #{<<"enable">> => false},
<<"type">> => TYPE,
<<"name">> => NAME,
<<"name">> => NAME
}).
-define(MQTT_BRIDGE(SERVER), ?BRIDGE(<<"mqtt_egress_test_bridge">>, <<"mqtt">>)#{
<<"server">> => SERVER,
<<"username">> => <<"user1">>,
<<"password">> => <<"">>,
<<"proto_ver">> => <<"v5">>
}).
-define(HTTP_BRIDGE(URL, TYPE, NAME), ?BRIDGE(NAME, TYPE)#{
<<"url">> => URL,
<<"local_topic">> => <<"emqx_webhook/#">>,
<<"method">> => <<"post">>,
<<"ssl">> => #{<<"enable">> => false},
<<"body">> => <<"${payload}">>,
<<"headers">> => #{
<<"content-type">> => <<"application/json">>
@ -596,16 +604,6 @@ t_with_redact_update(_Config) ->
?assertEqual(Password, Value),
ok.
-define(MQTT_BRIDGE(Server), #{
<<"server">> => Server,
<<"username">> => <<"user1">>,
<<"password">> => <<"">>,
<<"proto_ver">> => <<"v5">>,
<<"ssl">> => #{<<"enable">> => false},
<<"type">> => <<"mqtt">>,
<<"name">> => <<"mqtt_egress_test_bridge">>
}).
t_bridges_probe(Config) ->
Port = ?config(port, Config),
URL = ?URL(Port, "some_path"),
@ -623,11 +621,18 @@ t_bridges_probe(Config) ->
?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME)
),
{ok, 400, _} = request(
{ok, 400, NxDomain} = request(
post,
uri(["bridges_probe"]),
?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>, ?BRIDGE_TYPE, ?BRIDGE_NAME)
),
?assertMatch(
#{
<<"code">> := <<"TEST_FAILED">>,
<<"message">> := _
},
jsx:decode(NxDomain)
),
{ok, 204, _} = request(
post,
@ -635,11 +640,25 @@ t_bridges_probe(Config) ->
?MQTT_BRIDGE(<<"127.0.0.1:1883">>)
),
{ok, 400, _} = request(
{ok, 400, ConnRefused} = request(
post,
uri(["bridges_probe"]),
?MQTT_BRIDGE(<<"127.0.0.1:2883">>)
),
?assertMatch(
#{
<<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"#{reason => econnrefused", _/binary>>
},
jsx:decode(ConnRefused)
),
{ok, 400, BadReq} = request(
post,
uri(["bridges_probe"]),
?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>)
),
?assertMatch(#{<<"code">> := <<"BAD_REQUEST">>}, jsx:decode(BadReq)),
ok.
request(Method, Url, Body) ->

View File

@ -174,6 +174,9 @@ create_dry_run(ResourceType, Config) ->
case wait_for_ready(ResId, 15000) of
ok ->
remove(ResId);
{error, Reason} ->
_ = remove(ResId),
{error, Reason};
timeout ->
_ = remove(ResId),
{error, timeout}
@ -632,16 +635,18 @@ data_record_to_external_map_with_metrics(Data) ->
metrics => get_metrics(Data#data.id)
}.
-spec wait_for_ready(resource_id(), integer()) -> ok | timeout.
-spec wait_for_ready(resource_id(), integer()) -> ok | timeout | {error, term()}.
wait_for_ready(ResId, WaitTime) ->
do_wait_for_ready(ResId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY).
do_wait_for_ready(_ResId, 0) ->
timeout;
do_wait_for_ready(ResId, Retry) ->
case ets_lookup(ResId) of
{ok, _Group, #{status := connected}} ->
case read_cache(ResId) of
{_Group, #data{status = connected}} ->
ok;
{_Group, #data{status = disconnected, error = Reason}} ->
{error, Reason};
_ ->
timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
do_wait_for_ready(ResId, Retry - 1)