refactor: HTTP connector into emqx_bridge_http app

This commit is contained in:
Kjell Winblad 2023-07-06 17:14:05 +02:00 committed by Ivan Dyachkov
parent 3fcfacbaa1
commit f28510b3ad
38 changed files with 132 additions and 60 deletions

View File

@ -6,7 +6,8 @@
{emqx_connector, {path, "../emqx_connector"}}, {emqx_connector, {path, "../emqx_connector"}},
{emqx_mongodb, {path, "../emqx_mongodb"}}, {emqx_mongodb, {path, "../emqx_mongodb"}},
{emqx_redis, {path, "../emqx_redis"}}, {emqx_redis, {path, "../emqx_redis"}},
{emqx_mysql, {path, "../emqx_mysql"}} {emqx_mysql, {path, "../emqx_mysql"}},
{emqx_bridge_http, {path, "../emqx_bridge_http"}}
]}. ]}.
{edoc_opts, [{preprocess, true}]}. {edoc_opts, [{preprocess, true}]}.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_authn, [ {application, emqx_authn, [
{description, "EMQX Authentication"}, {description, "EMQX Authentication"},
{vsn, "0.1.22"}, {vsn, "0.1.23"},
{modules, []}, {modules, []},
{registered, [emqx_authn_sup, emqx_authn_registry]}, {registered, [emqx_authn_sup, emqx_authn_registry]},
{applications, [ {applications, [
@ -15,7 +15,8 @@
jose, jose,
emqx_mongodb, emqx_mongodb,
emqx_redis, emqx_redis,
emqx_mysql emqx_mysql,
emqx_bridge_http
]}, ]},
{mod, {emqx_authn_app, []}}, {mod, {emqx_authn_app, []}},
{env, []}, {env, []},

View File

@ -102,7 +102,7 @@ common_fields() ->
[ [
pool_type pool_type
], ],
maps:from_list(emqx_connector_http:fields(config)) maps:from_list(emqx_bridge_http_connector:fields(config))
) )
). ).
@ -185,14 +185,14 @@ create(Config0) ->
{Config, State} = parse_config(Config0), {Config, State} = parse_config(Config0),
{ok, _Data} = emqx_authn_utils:create_resource( {ok, _Data} = emqx_authn_utils:create_resource(
ResourceId, ResourceId,
emqx_connector_http, emqx_bridge_http_connector,
Config Config
), ),
{ok, State#{resource_id => ResourceId}}. {ok, State#{resource_id => ResourceId}}.
update(Config0, #{resource_id := ResourceId} = _State) -> update(Config0, #{resource_id := ResourceId} = _State) ->
{Config, NState} = parse_config(Config0), {Config, NState} = parse_config(Config0),
case emqx_authn_utils:update_resource(emqx_connector_http, Config, ResourceId) of case emqx_authn_utils:update_resource(emqx_bridge_http_connector, Config, ResourceId) of
{error, Reason} -> {error, Reason} ->
error({load_config_error, Reason}); error({load_config_error, Reason});
{ok, _} -> {ok, _} ->

View File

@ -7,7 +7,8 @@
{emqx_connector, {path, "../emqx_connector"}}, {emqx_connector, {path, "../emqx_connector"}},
{emqx_mongodb, {path, "../emqx_mongodb"}}, {emqx_mongodb, {path, "../emqx_mongodb"}},
{emqx_redis, {path, "../emqx_redis"}}, {emqx_redis, {path, "../emqx_redis"}},
{emqx_mysql, {path, "../emqx_mysql"}} {emqx_mysql, {path, "../emqx_mysql"}},
{emqx_bridge_http, {path, "../emqx_bridge_http"}}
]}. ]}.
{shell, [ {shell, [

View File

@ -12,7 +12,8 @@
emqx_connector, emqx_connector,
emqx_mongodb, emqx_mongodb,
emqx_redis, emqx_redis,
emqx_mysql emqx_mysql,
emqx_bridge_http
]}, ]},
{env, []}, {env, []},
{modules, []}, {modules, []},

View File

@ -118,7 +118,7 @@ authz_http_common_fields() ->
[ [
pool_type pool_type
], ],
maps:from_list(emqx_connector_http:fields(config)) maps:from_list(emqx_bridge_http_connector:fields(config))
) )
). ).

View File

@ -62,12 +62,12 @@ description() ->
create(Config) -> create(Config) ->
NConfig = parse_config(Config), NConfig = parse_config(Config),
ResourceId = emqx_authn_utils:make_resource_id(?MODULE), ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
{ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_connector_http, NConfig), {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_bridge_http_connector, NConfig),
NConfig#{annotations => #{id => ResourceId}}. NConfig#{annotations => #{id => ResourceId}}.
update(Config) -> update(Config) ->
NConfig = parse_config(Config), NConfig = parse_config(Config),
case emqx_authz_utils:update_resource(emqx_connector_http, NConfig) of case emqx_authz_utils:update_resource(emqx_bridge_http_connector, NConfig) of
{error, Reason} -> error({load_config_error, Reason}); {error, Reason} -> error({load_config_error, Reason});
{ok, Id} -> NConfig#{annotations => #{id => Id}} {ok, Id} -> NConfig#{annotations => #{id => Id}}
end. end.

View File

@ -391,6 +391,8 @@ connector_fields(DB) ->
connector_fields(DB, config). connector_fields(DB, config).
connector_fields(DB, Fields) when DB =:= redis; DB =:= mysql -> connector_fields(DB, Fields) when DB =:= redis; DB =:= mysql ->
connector_fields(DB, Fields, emqx); connector_fields(DB, Fields, emqx);
connector_fields(DB, Fields) when DB =:= http ->
connector_fields(bridge_http_connector, Fields, emqx);
connector_fields(DB, Fields) -> connector_fields(DB, Fields) ->
connector_fields(DB, Fields, emqx_connector). connector_fields(DB, Fields, emqx_connector).

View File

@ -62,14 +62,14 @@
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http; bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector;
bridge_to_resource_type(webhook) -> emqx_connector_http; bridge_to_resource_type(webhook) -> emqx_bridge_http_connector;
bridge_to_resource_type(BridgeType) -> emqx_bridge_enterprise:resource_type(BridgeType). bridge_to_resource_type(BridgeType) -> emqx_bridge_enterprise:resource_type(BridgeType).
-else. -else.
bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http; bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector;
bridge_to_resource_type(webhook) -> emqx_connector_http. bridge_to_resource_type(webhook) -> emqx_bridge_http_connector.
-endif. -endif.
resource_id(BridgeId) when is_binary(BridgeId) -> resource_id(BridgeId) when is_binary(BridgeId) ->

View File

@ -68,7 +68,7 @@ basic_config() ->
)} )}
] ++ webhook_creation_opts() ++ ] ++ webhook_creation_opts() ++
proplists:delete( proplists:delete(
max_retries, emqx_connector_http:fields(config) max_retries, emqx_bridge_http_connector:fields(config)
). ).
request_config() -> request_config() ->

View File

@ -64,18 +64,18 @@ init_per_testcase(t_send_async_connection_timeout, Config) ->
init_per_testcase(t_path_not_found, Config) -> init_per_testcase(t_path_not_found, Config) ->
HTTPPath = <<"/nonexisting/path">>, HTTPPath = <<"/nonexisting/path">>,
ServerSSLOpts = false, ServerSSLOpts = false,
{ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link( {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
_Port = random, HTTPPath, ServerSSLOpts _Port = random, HTTPPath, ServerSSLOpts
), ),
ok = emqx_connector_web_hook_server:set_handler(not_found_http_handler()), ok = emqx_bridge_http_connector_test_server:set_handler(not_found_http_handler()),
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
init_per_testcase(t_too_many_requests, Config) -> init_per_testcase(t_too_many_requests, Config) ->
HTTPPath = <<"/path">>, HTTPPath = <<"/path">>,
ServerSSLOpts = false, ServerSSLOpts = false,
{ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link( {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
_Port = random, HTTPPath, ServerSSLOpts _Port = random, HTTPPath, ServerSSLOpts
), ),
ok = emqx_connector_web_hook_server:set_handler(too_many_requests_http_handler()), ok = emqx_bridge_http_connector_test_server:set_handler(too_many_requests_http_handler()),
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
Server = start_http_server(#{response_delay_ms => 0}), Server = start_http_server(#{response_delay_ms => 0}),
@ -85,7 +85,7 @@ end_per_testcase(TestCase, _Config) when
TestCase =:= t_path_not_found; TestCase =:= t_path_not_found;
TestCase =:= t_too_many_requests TestCase =:= t_too_many_requests
-> ->
ok = emqx_connector_web_hook_server:stop(), ok = emqx_bridge_http_connector_test_server:stop(),
persistent_term:erase({?MODULE, times_called}), persistent_term:erase({?MODULE, times_called}),
emqx_bridge_testlib:delete_all_bridges(), emqx_bridge_testlib:delete_all_bridges(),
emqx_common_test_helpers:call_janitor(), emqx_common_test_helpers:call_janitor(),
@ -552,7 +552,7 @@ do_t_async_retries(TestContext, Error, Fn) ->
Attempts + 1 Attempts + 1
end, end,
emqx_common_test_helpers:with_mock( emqx_common_test_helpers:with_mock(
emqx_connector_http, emqx_bridge_http_connector,
reply_delegator, reply_delegator,
fun(Context, ReplyFunAndArgs, Result) -> fun(Context, ReplyFunAndArgs, Result) ->
Attempts = GetAndBump(), Attempts = GetAndBump(),

View File

@ -10,7 +10,8 @@
{deps, [ {deps, [
{emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}} {emqx_bridge, {path, "../../apps/emqx_bridge"}},
{emqx_bridge_http, {path, "../emqx_bridge_http"}}
]}. ]}.
{xref_checks, [ {xref_checks, [

View File

@ -6,7 +6,7 @@
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
emqx_bridge, emqx_bridge_http,
ehttpc ehttpc
]}, ]},
{env, []}, {env, []},

View File

@ -243,7 +243,7 @@ handle_result(
) -> ) ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "gcp_pubsub_error_response", msg => "gcp_pubsub_error_response",
request => emqx_connector_http:redact_request(Request), request => emqx_bridge_http_connector:redact_request(Request),
connector => ResourceId, connector => ResourceId,
status_code => StatusCode, status_code => StatusCode,
resp_body => RespBody resp_body => RespBody
@ -252,7 +252,7 @@ handle_result(
handle_result({error, #{status_code := StatusCode}} = Result, Request, _QueryMode, ResourceId) -> handle_result({error, #{status_code := StatusCode}} = Result, Request, _QueryMode, ResourceId) ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "gcp_pubsub_error_response", msg => "gcp_pubsub_error_response",
request => emqx_connector_http:redact_request(Request), request => emqx_bridge_http_connector:redact_request(Request),
connector => ResourceId, connector => ResourceId,
status_code => StatusCode status_code => StatusCode
}), }),

View File

@ -246,10 +246,10 @@ start_echo_http_server() ->
{versions, ['tlsv1.2', 'tlsv1.3']}, {versions, ['tlsv1.2', 'tlsv1.3']},
{ciphers, ["ECDHE-RSA-AES256-GCM-SHA384", "TLS_CHACHA20_POLY1305_SHA256"]} {ciphers, ["ECDHE-RSA-AES256-GCM-SHA384", "TLS_CHACHA20_POLY1305_SHA256"]}
] ++ certs(), ] ++ certs(),
{ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link( {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
random, HTTPPath, ServerSSLOpts random, HTTPPath, ServerSSLOpts
), ),
ok = emqx_connector_web_hook_server:set_handler(success_http_handler()), ok = emqx_bridge_http_connector_test_server:set_handler(success_http_handler()),
HTTPHost = "localhost", HTTPHost = "localhost",
HostPort = HTTPHost ++ ":" ++ integer_to_list(HTTPPort), HostPort = HTTPHost ++ ":" ++ integer_to_list(HTTPPort),
true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort), true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort),
@ -261,7 +261,7 @@ start_echo_http_server() ->
stop_echo_http_server() -> stop_echo_http_server() ->
os:unsetenv("PUBSUB_EMULATOR_HOST"), os:unsetenv("PUBSUB_EMULATOR_HOST"),
ok = emqx_connector_web_hook_server:stop(). ok = emqx_bridge_http_connector_test_server:stop().
certs() -> certs() ->
CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"), CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"),
@ -983,7 +983,7 @@ t_publish_econnrefused(Config) ->
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
assert_empty_metrics(ResourceId), assert_empty_metrics(ResourceId),
ok = emqx_connector_web_hook_server:stop(), ok = emqx_bridge_http_connector_test_server:stop(),
do_econnrefused_or_timeout_test(Config, econnrefused). do_econnrefused_or_timeout_test(Config, econnrefused).
t_publish_timeout(Config) -> t_publish_timeout(Config) ->
@ -1019,7 +1019,7 @@ t_publish_timeout(Config) ->
), ),
{ok, Rep, State} {ok, Rep, State}
end, end,
ok = emqx_connector_web_hook_server:set_handler(TimeoutHandler), ok = emqx_bridge_http_connector_test_server:set_handler(TimeoutHandler),
do_econnrefused_or_timeout_test(Config, timeout). do_econnrefused_or_timeout_test(Config, timeout).
do_econnrefused_or_timeout_test(Config, Error) -> do_econnrefused_or_timeout_test(Config, Error) ->
@ -1149,7 +1149,7 @@ t_success_no_body(Config) ->
), ),
{ok, Rep, State} {ok, Rep, State}
end, end,
ok = emqx_connector_web_hook_server:set_handler(SuccessNoBodyHandler), ok = emqx_bridge_http_connector_test_server:set_handler(SuccessNoBodyHandler),
Topic = <<"t/topic">>, Topic = <<"t/topic">>,
{ok, _} = create_bridge(Config), {ok, _} = create_bridge(Config),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
@ -1187,7 +1187,7 @@ t_failure_with_body(Config) ->
), ),
{ok, Rep, State} {ok, Rep, State}
end, end,
ok = emqx_connector_web_hook_server:set_handler(FailureWithBodyHandler), ok = emqx_bridge_http_connector_test_server:set_handler(FailureWithBodyHandler),
Topic = <<"t/topic">>, Topic = <<"t/topic">>,
{ok, _} = create_bridge(Config), {ok, _} = create_bridge(Config),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
@ -1225,7 +1225,7 @@ t_failure_no_body(Config) ->
), ),
{ok, Rep, State} {ok, Rep, State}
end, end,
ok = emqx_connector_web_hook_server:set_handler(FailureNoBodyHandler), ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler),
Topic = <<"t/topic">>, Topic = <<"t/topic">>,
{ok, _} = create_bridge(Config), {ok, _} = create_bridge(Config),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
@ -1271,7 +1271,7 @@ t_unrecoverable_error(Config) ->
), ),
{ok, Rep, State} {ok, Rep, State}
end, end,
ok = emqx_connector_web_hook_server:set_handler(FailureNoBodyHandler), ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler),
Topic = <<"t/topic">>, Topic = <<"t/topic">>,
{ok, _} = create_bridge(Config), {ok, _} = create_bridge(Config),
assert_empty_metrics(ResourceId), assert_empty_metrics(ResourceId),

View File

@ -0,0 +1,36 @@
# EMQX HTTP Broker Bridge
This application enables EMQX to connect to any HTTP API, conforming to the
HTTP standard. The connection is established via the [HTTP][1] bridge abstraction,
which facilitates the unidirectional flow of data from EMQX to the HTTP API
(egress).
Users can define a rule and efficiently transfer data to a remote HTTP API
utilizing [EMQX Rules][2].
# Documentation
- For instructions on how to use the EMQX dashboard to set up an egress bridge,
refer to [Bridge Data into HTTP API][3].
- To understand the EMQX rules engine, please refer to [EMQX Rules][2].
# HTTP APIs
We provide a range of APIs for bridge management. For more detailed
information, refer to [API Docs -Bridges][4].
# Contributing
For those interested in contributing, please consult our
[contributing guide](../../CONTRIBUTING.md).
# License
This software is under the Apache License 2.0. For more details, see
[LICENSE](../../APL.txt).
[1]: https://tools.ietf.org/html/rfc2616
[2]: https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html
[3]: https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-webhook.html
[4]: https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges

View File

@ -0,0 +1 @@
toxiproxy

View File

@ -0,0 +1,10 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}}
, {emqx_bridge, {path, "../../apps/emqx_bridge"}}
]}.
{shell, [
{apps, [emqx_bridge_http]}
]}.

View File

@ -0,0 +1,9 @@
{application, emqx_bridge_http, [
{description, "EMQX HTTP Bridge and Connector Application"},
{vsn, "0.1.1"},
{registered, []},
{applications, [kernel, stdlib, emqx_connector, emqx_resource, emqx_bridge]},
{env, []},
{modules, []},
{links, []}
]}.

View File

@ -14,7 +14,7 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_connector_http). -module(emqx_bridge_http_connector).
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").

View File

@ -14,7 +14,7 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_connector_web_hook_server). -module(emqx_bridge_http_connector_test_server).
-compile([nowarn_export_all, export_all]). -compile([nowarn_export_all, export_all]).

View File

@ -13,7 +13,7 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_connector_http_tests). -module(emqx_bridge_http_connector_tests).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -47,10 +47,10 @@ wrap_auth_headers_test_() ->
headers => auth_headers() headers => auth_headers()
} }
}, },
{ok, #{request := #{headers := Headers}} = State} = emqx_connector_http:on_start( {ok, #{request := #{headers := Headers}} = State} = emqx_bridge_http_connector:on_start(
<<"test">>, Config <<"test">>, Config
), ),
{ok, 200, Req} = emqx_connector_http:on_query(foo, {send_message, #{}}, State), {ok, 200, Req} = emqx_bridge_http_connector:on_query(foo, {send_message, #{}}, State),
Tests = Tests =
[ [
?_assert(is_wrapped(V)) ?_assert(is_wrapped(V))

View File

@ -8,7 +8,8 @@
{emqx, {path, "../../apps/emqx"}}, {emqx, {path, "../../apps/emqx"}},
{emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}} {emqx_bridge, {path, "../../apps/emqx_bridge"}},
{emqx_bridge_http, {path, "../emqx_bridge_http"}}
]}. ]}.
{plugins, [rebar3_path_deps]}. {plugins, [rebar3_path_deps]}.
{project_plugins, [erlfmt]}. {project_plugins, [erlfmt]}.

View File

@ -11,7 +11,7 @@
kernel, kernel,
stdlib, stdlib,
emqx_resource, emqx_resource,
emqx_bridge, emqx_bridge_http,
%% for module emqx_connector_http %% for module emqx_connector_http
emqx_connector emqx_connector
]}, ]},

View File

@ -118,7 +118,7 @@ basic_config() ->
] ++ resource_creation_opts() ++ ] ++ resource_creation_opts() ++
proplists_without( proplists_without(
[max_retries, base_url, request], [max_retries, base_url, request],
emqx_connector_http:fields(config) emqx_bridge_http_connector:fields(config)
). ).
proplists_without(Keys, List) -> proplists_without(Keys, List) ->

View File

@ -65,7 +65,7 @@ callback_mode() -> async_if_possible.
on_start(InstanceId, Config) -> on_start(InstanceId, Config) ->
%% [FIXME] The configuration passed in here is pre-processed and transformed %% [FIXME] The configuration passed in here is pre-processed and transformed
%% in emqx_bridge_resource:parse_confs/2. %% in emqx_bridge_resource:parse_confs/2.
case emqx_connector_http:on_start(InstanceId, Config) of case emqx_bridge_http_connector:on_start(InstanceId, Config) of
{ok, State} -> {ok, State} ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "iotdb_bridge_started", msg => "iotdb_bridge_started",
@ -90,14 +90,14 @@ on_stop(InstanceId, State) ->
msg => "stopping_iotdb_bridge", msg => "stopping_iotdb_bridge",
connector => InstanceId connector => InstanceId
}), }),
Res = emqx_connector_http:on_stop(InstanceId, State), Res = emqx_bridge_http_connector:on_stop(InstanceId, State),
?tp(iotdb_bridge_stopped, #{instance_id => InstanceId}), ?tp(iotdb_bridge_stopped, #{instance_id => InstanceId}),
Res. Res.
-spec on_get_status(manager_id(), state()) -> -spec on_get_status(manager_id(), state()) ->
{connected, state()} | {disconnected, state(), term()}. {connected, state()} | {disconnected, state(), term()}.
on_get_status(InstanceId, State) -> on_get_status(InstanceId, State) ->
emqx_connector_http:on_get_status(InstanceId, State). emqx_bridge_http_connector:on_get_status(InstanceId, State).
-spec on_query(manager_id(), {send_message, map()}, state()) -> -spec on_query(manager_id(), {send_message, map()}, state()) ->
{ok, pos_integer(), [term()], term()} {ok, pos_integer(), [term()], term()}
@ -114,7 +114,7 @@ on_query(InstanceId, {send_message, Message}, State) ->
case make_iotdb_insert_request(Message, State) of case make_iotdb_insert_request(Message, State) of
{ok, IoTDBPayload} -> {ok, IoTDBPayload} ->
handle_response( handle_response(
emqx_connector_http:on_query( emqx_bridge_http_connector:on_query(
InstanceId, {send_message, IoTDBPayload}, State InstanceId, {send_message, IoTDBPayload}, State
) )
); );
@ -142,7 +142,7 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
end, end,
[] []
}, },
emqx_connector_http:on_query_async( emqx_bridge_http_connector:on_query_async(
InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State
); );
Error -> Error ->

View File

@ -2,7 +2,8 @@
{deps, [ {deps, [
{emqx, {path, "../emqx"}}, {emqx, {path, "../emqx"}},
{emqx_utils, {path, "../emqx_utils"}} {emqx_utils, {path, "../emqx_utils"}},
{emqx_bridge_http, {path, "../emqx_bridge_http"}}
]}. ]}.
{edoc_opts, [{preprocess, true}]}. {edoc_opts, [{preprocess, true}]}.

View File

@ -5,7 +5,7 @@
{vsn, "5.0.25"}, {vsn, "5.0.25"},
{modules, []}, {modules, []},
{registered, [emqx_dashboard_sup]}, {registered, [emqx_dashboard_sup]},
{applications, [kernel, stdlib, mnesia, minirest, emqx, emqx_ctl]}, {applications, [kernel, stdlib, mnesia, minirest, emqx, emqx_ctl, emqx_bridge_http]},
{mod, {emqx_dashboard_app, []}}, {mod, {emqx_dashboard_app, []}},
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -647,8 +647,9 @@ schema("/ref/complex_type") ->
{no_neg_integer, hoconsc:mk(non_neg_integer(), #{})}, {no_neg_integer, hoconsc:mk(non_neg_integer(), #{})},
{url, hoconsc:mk(url(), #{})}, {url, hoconsc:mk(url(), #{})},
{server, hoconsc:mk(emqx_schema:ip_port(), #{})}, {server, hoconsc:mk(emqx_schema:ip_port(), #{})},
{connect_timeout, hoconsc:mk(emqx_connector_http:connect_timeout(), #{})}, {connect_timeout,
{pool_type, hoconsc:mk(emqx_connector_http:pool_type(), #{})}, hoconsc:mk(emqx_bridge_http_connector:connect_timeout(), #{})},
{pool_type, hoconsc:mk(emqx_bridge_http_connector:pool_type(), #{})},
{timeout, hoconsc:mk(timeout(), #{})}, {timeout, hoconsc:mk(timeout(), #{})},
{bytesize, hoconsc:mk(emqx_schema:bytesize(), #{})}, {bytesize, hoconsc:mk(emqx_schema:bytesize(), #{})},
{wordsize, hoconsc:mk(emqx_schema:wordsize(), #{})}, {wordsize, hoconsc:mk(emqx_schema:wordsize(), #{})},

View File

@ -2,7 +2,8 @@
{deps, [ {deps, [
{emqx, {path, "../emqx"}}, {emqx, {path, "../emqx"}},
{emqx_utils, {path, "../emqx_utils"}} {emqx_utils, {path, "../emqx_utils"}},
{emqx_bridge_http, {path, "../emqx_bridge_http"}}
]}. ]}.
{edoc_opts, [{preprocess, true}]}. {edoc_opts, [{preprocess, true}]}.

View File

@ -5,7 +5,7 @@
{vsn, "5.0.26"}, {vsn, "5.0.26"},
{modules, []}, {modules, []},
{registered, [emqx_management_sup]}, {registered, [emqx_management_sup]},
{applications, [kernel, stdlib, emqx_plugins, minirest, emqx, emqx_ctl]}, {applications, [kernel, stdlib, emqx_plugins, minirest, emqx, emqx_ctl, emqx_bridge_http]},
{mod, {emqx_mgmt_app, []}}, {mod, {emqx_mgmt_app, []}},
{env, []}, {env, []},
{licenses, ["Apache-2.0"]}, {licenses, ["Apache-2.0"]},

View File

@ -153,7 +153,7 @@ api_path_without_base_path(Parts) ->
join_http_path([]) -> join_http_path([]) ->
[]; [];
join_http_path([Part | Rest]) -> join_http_path([Part | Rest]) ->
lists:foldl(fun(P, Acc) -> emqx_connector_http:join_paths(Acc, P) end, Part, Rest). lists:foldl(fun(P, Acc) -> emqx_bridge_http_connector:join_paths(Acc, P) end, Part, Rest).
%% Usage: %% Usage:
%% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>, %% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>,

View File

@ -1,6 +1,7 @@
{deps, [ {deps, [
{emqx, {path, "../../apps/emqx"}}, {emqx, {path, "../../apps/emqx"}},
{erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}} {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}},
{emqx_bridge_http, {path, "../emqx_bridge_http"}}
]}. ]}.
{project_plugins, [erlfmt]}. {project_plugins, [erlfmt]}.

View File

@ -8,7 +8,8 @@
stdlib, stdlib,
gproc, gproc,
erlcloud, erlcloud,
ehttpc ehttpc,
emqx_bridge_http
]}, ]},
{mod, {emqx_s3_app, []}} {mod, {emqx_s3_app, []}}
]}. ]}.

View File

@ -136,10 +136,11 @@ fields(transport_options) ->
)} )}
] ++ ] ++
props_without( props_without(
[base_url, max_retries, retry_interval, request], emqx_connector_http:fields(config) [base_url, max_retries, retry_interval, request],
emqx_bridge_http_connector:fields(config)
) ++ ) ++
props_with( props_with(
[headers, max_retries, request_timeout], emqx_connector_http:fields("request") [headers, max_retries, request_timeout], emqx_bridge_http_connector:fields("request")
). ).
desc(s3) -> desc(s3) ->

View File

@ -370,6 +370,7 @@ defmodule EMQXUmbrella.MixProject do
emqx_exhook: :permanent, emqx_exhook: :permanent,
emqx_bridge: :permanent, emqx_bridge: :permanent,
emqx_bridge_mqtt: :permanent, emqx_bridge_mqtt: :permanent,
emqx_bridge_http: :permanent,
emqx_rule_engine: :permanent, emqx_rule_engine: :permanent,
emqx_modules: :permanent, emqx_modules: :permanent,
emqx_management: :permanent, emqx_management: :permanent,

View File

@ -431,6 +431,7 @@ relx_apps(ReleaseType, Edition) ->
emqx_exhook, emqx_exhook,
emqx_bridge, emqx_bridge,
emqx_bridge_mqtt, emqx_bridge_mqtt,
emqx_bridge_http,
emqx_rule_engine, emqx_rule_engine,
emqx_modules, emqx_modules,
emqx_management, emqx_management,

View File

@ -1,4 +1,4 @@
emqx_connector_http { emqx_bridge_http_connector {
body.desc: body.desc:
"""HTTP request body.""" """HTTP request body."""