Merge pull request #8692 from emqx/http_async
feat: http connector support async sending
This commit is contained in:
commit
58f56bae31
|
@ -27,15 +27,15 @@
|
||||||
%%======================================================================================
|
%%======================================================================================
|
||||||
%% For HTTP APIs
|
%% For HTTP APIs
|
||||||
get_response() ->
|
get_response() ->
|
||||||
http_schema("get").
|
api_schema("get").
|
||||||
|
|
||||||
put_request() ->
|
put_request() ->
|
||||||
http_schema("put").
|
api_schema("put").
|
||||||
|
|
||||||
post_request() ->
|
post_request() ->
|
||||||
http_schema("post").
|
api_schema("post").
|
||||||
|
|
||||||
http_schema(Method) ->
|
api_schema(Method) ->
|
||||||
Broker =
|
Broker =
|
||||||
lists:flatmap(
|
lists:flatmap(
|
||||||
fun(Type) ->
|
fun(Type) ->
|
||||||
|
@ -46,17 +46,17 @@ http_schema(Method) ->
|
||||||
end,
|
end,
|
||||||
?CONN_TYPES
|
?CONN_TYPES
|
||||||
) ++ [ref(Module, Method) || Module <- [emqx_bridge_webhook_schema]],
|
) ++ [ref(Module, Method) || Module <- [emqx_bridge_webhook_schema]],
|
||||||
EE = ee_schemas(Method),
|
EE = ee_api_schemas(Method),
|
||||||
hoconsc:union(Broker ++ EE).
|
hoconsc:union(Broker ++ EE).
|
||||||
|
|
||||||
-if(?EMQX_RELEASE_EDITION == ee).
|
-if(?EMQX_RELEASE_EDITION == ee).
|
||||||
ee_schemas(Method) ->
|
ee_api_schemas(Method) ->
|
||||||
emqx_ee_bridge:api_schemas(Method).
|
emqx_ee_bridge:api_schemas(Method).
|
||||||
|
|
||||||
ee_fields_bridges() ->
|
ee_fields_bridges() ->
|
||||||
emqx_ee_bridge:fields(bridges).
|
emqx_ee_bridge:fields(bridges).
|
||||||
-else.
|
-else.
|
||||||
ee_schemas(_) ->
|
ee_api_schemas(_) ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
ee_fields_bridges() ->
|
ee_fields_bridges() ->
|
|
@ -30,6 +30,7 @@
|
||||||
on_start/2,
|
on_start/2,
|
||||||
on_stop/2,
|
on_stop/2,
|
||||||
on_query/3,
|
on_query/3,
|
||||||
|
on_query_async/4,
|
||||||
on_get_status/2
|
on_get_status/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -165,7 +166,7 @@ ref(Field) -> hoconsc:ref(?MODULE, Field).
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
|
|
||||||
callback_mode() -> always_sync.
|
callback_mode() -> async_if_possible.
|
||||||
|
|
||||||
on_start(
|
on_start(
|
||||||
InstId,
|
InstId,
|
||||||
|
@ -231,7 +232,8 @@ on_stop(InstId, #{pool_name := PoolName}) ->
|
||||||
on_query(InstId, {send_message, Msg}, State) ->
|
on_query(InstId, {send_message, Msg}, State) ->
|
||||||
case maps:get(request, State, undefined) of
|
case maps:get(request, State, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
?SLOG(error, #{msg => "request_not_found", connector => InstId});
|
?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
|
||||||
|
{error, arg_request_not_found};
|
||||||
Request ->
|
Request ->
|
||||||
#{
|
#{
|
||||||
method := Method,
|
method := Method,
|
||||||
|
@ -302,6 +304,51 @@ on_query(
|
||||||
end,
|
end,
|
||||||
Result.
|
Result.
|
||||||
|
|
||||||
|
on_query_async(InstId, {send_message, Msg}, ReplyFun, State) ->
|
||||||
|
case maps:get(request, State, undefined) of
|
||||||
|
undefined ->
|
||||||
|
?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
|
||||||
|
{error, arg_request_not_found};
|
||||||
|
Request ->
|
||||||
|
#{
|
||||||
|
method := Method,
|
||||||
|
path := Path,
|
||||||
|
body := Body,
|
||||||
|
headers := Headers,
|
||||||
|
request_timeout := Timeout
|
||||||
|
} = process_request(Request, Msg),
|
||||||
|
on_query_async(
|
||||||
|
InstId,
|
||||||
|
{undefined, Method, {Path, Headers, Body}, Timeout},
|
||||||
|
ReplyFun,
|
||||||
|
State
|
||||||
|
)
|
||||||
|
end;
|
||||||
|
on_query_async(
|
||||||
|
InstId,
|
||||||
|
{KeyOrNum, Method, Request, Timeout},
|
||||||
|
ReplyFun,
|
||||||
|
#{pool_name := PoolName, base_path := BasePath} = State
|
||||||
|
) ->
|
||||||
|
?TRACE(
|
||||||
|
"QUERY_ASYNC",
|
||||||
|
"http_connector_received",
|
||||||
|
#{request => Request, connector => InstId, state => State}
|
||||||
|
),
|
||||||
|
NRequest = formalize_request(Method, BasePath, Request),
|
||||||
|
Worker =
|
||||||
|
case KeyOrNum of
|
||||||
|
undefined -> ehttpc_pool:pick_worker(PoolName);
|
||||||
|
_ -> ehttpc_pool:pick_worker(PoolName, KeyOrNum)
|
||||||
|
end,
|
||||||
|
ok = ehttpc:request_async(
|
||||||
|
Worker,
|
||||||
|
Method,
|
||||||
|
NRequest,
|
||||||
|
Timeout,
|
||||||
|
ReplyFun
|
||||||
|
).
|
||||||
|
|
||||||
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
|
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
|
||||||
case do_get_status(PoolName, Timeout) of
|
case do_get_status(PoolName, Timeout) of
|
||||||
true ->
|
true ->
|
||||||
|
@ -343,7 +390,6 @@ do_get_status(PoolName, Timeout) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
preprocess_request(undefined) ->
|
preprocess_request(undefined) ->
|
||||||
undefined;
|
undefined;
|
||||||
preprocess_request(Req) when map_size(Req) == 0 ->
|
preprocess_request(Req) when map_size(Req) == 0 ->
|
||||||
|
|
2
mix.exs
2
mix.exs
|
@ -47,7 +47,7 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
{:lc, github: "emqx/lc", tag: "0.3.1"},
|
{:lc, github: "emqx/lc", tag: "0.3.1"},
|
||||||
{:redbug, "2.0.7"},
|
{:redbug, "2.0.7"},
|
||||||
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
|
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
|
||||||
{:ehttpc, github: "emqx/ehttpc", tag: "0.3.0", override: true},
|
{:ehttpc, github: "emqx/ehttpc", tag: "0.4.0", override: true},
|
||||||
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
|
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
|
||||||
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
|
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
|
||||||
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
|
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
|
||||||
|
|
|
@ -49,7 +49,7 @@
|
||||||
, {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
, {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
||||||
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
|
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
|
||||||
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
|
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
|
||||||
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.3.0"}}}
|
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.0"}}}
|
||||||
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
||||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
|
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
|
||||||
|
|
Loading…
Reference in New Issue