refactor(pluglib): move connectivity checks to `emqx_connector_lib`

This commit is contained in:
Andrew Mayorov 2023-06-09 00:22:53 +03:00
parent 8919a6ef93
commit e6fb0203b4
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
4 changed files with 88 additions and 19 deletions

View File

@ -47,6 +47,8 @@ fields(config) ->
%% `emqx_resource' API
%%========================================================================================
-define(HTTP_CONNECT_TIMEOUT, 1000).
callback_mode() -> always_sync.
is_buffer_supported() -> false.
@ -171,7 +173,7 @@ opentsdb_connectivity(Server) ->
<<"https://", _/binary>> -> Server;
_ -> "http://" ++ Server
end,
emqx_plugin_libs_rule:http_connectivity(SvrUrl).
emqx_connector_lib:http_connectivity(SvrUrl, ?HTTP_CONNECT_TIMEOUT).
format_opentsdb_msg(Msg) ->
maps:with(

View File

@ -15,8 +15,39 @@
%%--------------------------------------------------------------------
-module(emqx_connector_lib).
%% connectivity check
-export([
http_connectivity/2,
tcp_connectivity/3
]).
-export([resolve_dns/2]).
-spec http_connectivity(uri_string:uri_string(), timeout()) ->
ok | {error, Reason :: term()}.
http_connectivity(Url, Timeout) ->
case emqx_http_lib:uri_parse(Url) of
{ok, #{host := Host, port := Port}} ->
tcp_connectivity(Host, Port, Timeout);
{error, Reason} ->
{error, Reason}
end.
-spec tcp_connectivity(
Host :: inet:socket_address() | inet:hostname(),
Port :: inet:port_number(),
timeout()
) ->
ok | {error, Reason :: term()}.
tcp_connectivity(Host, Port, Timeout) ->
case gen_tcp:connect(Host, Port, emqx_utils:ipv6_probe([]), Timeout) of
{ok, Sock} ->
gen_tcp:close(Sock),
ok;
{error, Reason} ->
{error, Reason}
end.
%% @doc Mostly for meck.
resolve_dns(DNS, Type) ->
inet_res:lookup(DNS, in, Type).

View File

@ -0,0 +1,54 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_lib_tests).
-include_lib("eunit/include/eunit.hrl").
http_connectivity_ok_test() ->
{ok, Socket} = gen_tcp:listen(0, []),
{ok, Port} = inet:port(Socket),
?assertEqual(
ok,
emqx_connector_lib:http_connectivity("http://127.0.0.1:" ++ integer_to_list(Port), 1000)
),
gen_tcp:close(Socket).
http_connectivity_error_test() ->
{ok, Socket} = gen_tcp:listen(0, []),
{ok, Port} = inet:port(Socket),
ok = gen_tcp:close(Socket),
?assertEqual(
{error, econnrefused},
emqx_connector_lib:http_connectivity("http://127.0.0.1:" ++ integer_to_list(Port), 1000)
).
tcp_connectivity_ok_test() ->
{ok, Socket} = gen_tcp:listen(0, []),
{ok, Port} = inet:port(Socket),
?assertEqual(
ok,
emqx_connector_lib:tcp_connectivity("127.0.0.1", Port, 1000)
),
ok = gen_tcp:close(Socket).
tcp_connectivity_error_test() ->
{ok, Socket} = gen_tcp:listen(0, []),
{ok, Port} = inet:port(Socket),
ok = gen_tcp:close(Socket),
?assertEqual(
{error, econnrefused},
emqx_connector_lib:tcp_connectivity("127.0.0.1", Port, 1000)
).

View File

@ -21,26 +21,8 @@
-include_lib("eunit/include/eunit.hrl").
-define(PORT, 9876).
all() -> emqx_common_test_helpers:all(?MODULE).
t_http_connectivity(_) ->
{ok, Socket} = gen_tcp:listen(?PORT, []),
ok = emqx_plugin_libs_rule:http_connectivity(
"http://127.0.0.1:" ++ integer_to_list(?PORT), 1000
),
gen_tcp:close(Socket),
{error, _} = emqx_plugin_libs_rule:http_connectivity(
"http://127.0.0.1:" ++ integer_to_list(?PORT), 1000
).
t_tcp_connectivity(_) ->
{ok, Socket} = gen_tcp:listen(?PORT, []),
ok = emqx_plugin_libs_rule:tcp_connectivity("127.0.0.1", ?PORT, 1000),
gen_tcp:close(Socket),
{error, _} = emqx_plugin_libs_rule:tcp_connectivity("127.0.0.1", ?PORT, 1000).
t_atom_key(_) ->
_ = erlang,
_ = port,