diff --git a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl similarity index 100% rename from apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl rename to apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl similarity index 96% rename from apps/emqx_bridge/src/emqx_bridge_schema.erl rename to apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index 46039390d..4343dc223 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -27,15 +27,15 @@ %%====================================================================================== %% For HTTP APIs get_response() -> - http_schema("get"). + api_schema("get"). put_request() -> - http_schema("put"). + api_schema("put"). post_request() -> - http_schema("post"). + api_schema("post"). -http_schema(Method) -> +api_schema(Method) -> Broker = lists:flatmap( fun(Type) -> @@ -46,17 +46,17 @@ http_schema(Method) -> end, ?CONN_TYPES ) ++ [ref(Module, Method) || Module <- [emqx_bridge_webhook_schema]], - EE = ee_schemas(Method), + EE = ee_api_schemas(Method), hoconsc:union(Broker ++ EE). -if(?EMQX_RELEASE_EDITION == ee). -ee_schemas(Method) -> +ee_api_schemas(Method) -> emqx_ee_bridge:api_schemas(Method). ee_fields_bridges() -> emqx_ee_bridge:fields(bridges). -else. -ee_schemas(_) -> +ee_api_schemas(_) -> []. ee_fields_bridges() -> diff --git a/apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl similarity index 100% rename from apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl rename to apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index c5a1b89db..11030fef6 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -30,6 +30,7 @@ on_start/2, on_stop/2, on_query/3, + on_query_async/4, on_get_status/2 ]). @@ -165,7 +166,7 @@ ref(Field) -> hoconsc:ref(?MODULE, Field). %% =================================================================== -callback_mode() -> always_sync. +callback_mode() -> async_if_possible. on_start( InstId, @@ -231,7 +232,8 @@ on_stop(InstId, #{pool_name := PoolName}) -> on_query(InstId, {send_message, Msg}, State) -> case maps:get(request, State, undefined) of undefined -> - ?SLOG(error, #{msg => "request_not_found", connector => InstId}); + ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}), + {error, arg_request_not_found}; Request -> #{ method := Method, @@ -302,6 +304,51 @@ on_query( end, Result. +on_query_async(InstId, {send_message, Msg}, ReplyFun, State) -> + case maps:get(request, State, undefined) of + undefined -> + ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}), + {error, arg_request_not_found}; + Request -> + #{ + method := Method, + path := Path, + body := Body, + headers := Headers, + request_timeout := Timeout + } = process_request(Request, Msg), + on_query_async( + InstId, + {undefined, Method, {Path, Headers, Body}, Timeout}, + ReplyFun, + State + ) + end; +on_query_async( + InstId, + {KeyOrNum, Method, Request, Timeout}, + ReplyFun, + #{pool_name := PoolName, base_path := BasePath} = State +) -> + ?TRACE( + "QUERY_ASYNC", + "http_connector_received", + #{request => Request, connector => InstId, state => State} + ), + NRequest = formalize_request(Method, BasePath, Request), + Worker = + case KeyOrNum of + undefined -> ehttpc_pool:pick_worker(PoolName); + _ -> ehttpc_pool:pick_worker(PoolName, KeyOrNum) + end, + ok = ehttpc:request_async( + Worker, + Method, + NRequest, + Timeout, + ReplyFun + ). + on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> case do_get_status(PoolName, Timeout) of true -> @@ -343,7 +390,6 @@ do_get_status(PoolName, Timeout) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- - preprocess_request(undefined) -> undefined; preprocess_request(Req) when map_size(Req) == 0 -> diff --git a/mix.exs b/mix.exs index 5209de843..2e4765ce1 100644 --- a/mix.exs +++ b/mix.exs @@ -47,7 +47,7 @@ defmodule EMQXUmbrella.MixProject do {:lc, github: "emqx/lc", tag: "0.3.1"}, {:redbug, "2.0.7"}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, - {:ehttpc, github: "emqx/ehttpc", tag: "0.3.0", override: true}, + {:ehttpc, github: "emqx/ehttpc", tag: "0.4.0", override: true}, {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, diff --git a/rebar.config b/rebar.config index 3d3d5968f..bc5c8d396 100644 --- a/rebar.config +++ b/rebar.config @@ -49,7 +49,7 @@ , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}} - , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.3.0"}}} + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.0"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}