feat: http connector support async sending

This commit is contained in:
Shawn 2022-08-11 18:13:27 +08:00
parent 03b004e0ce
commit 88388b0c54
6 changed files with 58 additions and 12 deletions

View File

@ -27,15 +27,15 @@
%%======================================================================================
%% For HTTP APIs
get_response() ->
http_schema("get").
api_schema("get").
put_request() ->
http_schema("put").
api_schema("put").
post_request() ->
http_schema("post").
api_schema("post").
http_schema(Method) ->
api_schema(Method) ->
Broker =
lists:flatmap(
fun(Type) ->
@ -46,17 +46,17 @@ http_schema(Method) ->
end,
?CONN_TYPES
) ++ [ref(Module, Method) || Module <- [emqx_bridge_webhook_schema]],
EE = ee_schemas(Method),
EE = ee_api_schemas(Method),
hoconsc:union(Broker ++ EE).
-if(?EMQX_RELEASE_EDITION == ee).
ee_schemas(Method) ->
ee_api_schemas(Method) ->
emqx_ee_bridge:api_schemas(Method).
ee_fields_bridges() ->
emqx_ee_bridge:fields(bridges).
-else.
ee_schemas(_) ->
ee_api_schemas(_) ->
[].
ee_fields_bridges() ->

View File

@ -30,6 +30,7 @@
on_start/2,
on_stop/2,
on_query/3,
on_query_async/4,
on_get_status/2
]).
@ -165,7 +166,7 @@ ref(Field) -> hoconsc:ref(?MODULE, Field).
%% ===================================================================
callback_mode() -> always_sync.
callback_mode() -> async_if_possible.
on_start(
InstId,
@ -231,7 +232,8 @@ on_stop(InstId, #{pool_name := PoolName}) ->
on_query(InstId, {send_message, Msg}, State) ->
case maps:get(request, State, undefined) of
undefined ->
?SLOG(error, #{msg => "request_not_found", connector => InstId});
?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
{error, arg_request_not_found};
Request ->
#{
method := Method,
@ -302,6 +304,51 @@ on_query(
end,
Result.
on_query_async(InstId, {send_message, Msg}, ReplyFun, State) ->
case maps:get(request, State, undefined) of
undefined ->
?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
{error, arg_request_not_found};
Request ->
#{
method := Method,
path := Path,
body := Body,
headers := Headers,
request_timeout := Timeout
} = process_request(Request, Msg),
on_query_async(
InstId,
{undefined, Method, {Path, Headers, Body}, Timeout},
ReplyFun,
State
)
end;
on_query_async(
InstId,
{KeyOrNum, Method, Request, Timeout},
ReplyFun,
#{pool_name := PoolName, base_path := BasePath} = State
) ->
?TRACE(
"QUERY_ASYNC",
"http_connector_received",
#{request => Request, connector => InstId, state => State}
),
NRequest = formalize_request(Method, BasePath, Request),
Worker =
case KeyOrNum of
undefined -> ehttpc_pool:pick_worker(PoolName);
_ -> ehttpc_pool:pick_worker(PoolName, KeyOrNum)
end,
ok = ehttpc:request_async(
Worker,
Method,
NRequest,
Timeout,
ReplyFun
).
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(PoolName, Timeout) of
true ->
@ -343,7 +390,6 @@ do_get_status(PoolName, Timeout) ->
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
preprocess_request(undefined) ->
undefined;
preprocess_request(Req) when map_size(Req) == 0 ->

View File

@ -47,7 +47,7 @@ defmodule EMQXUmbrella.MixProject do
{:lc, github: "emqx/lc", tag: "0.3.1"},
{:redbug, "2.0.7"},
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
{:ehttpc, github: "emqx/ehttpc", tag: "0.3.0", override: true},
{:ehttpc, github: "emqx/ehttpc", tag: "0.4.0", override: true},
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},

View File

@ -49,7 +49,7 @@
, {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.3.0"}}}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.0"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}