feat: impl the http bridge v2

This commit is contained in:
JianBo He 2023-11-20 10:38:06 +08:00
parent f9a1e747fd
commit 96af7a74e8
3 changed files with 153 additions and 14 deletions

View File

@ -309,6 +309,7 @@ remove(Type, Name, _Conf, _Opts) ->
emqx_resource:remove_local(resource_id(Type, Name)).
%% convert bridge configs to what the connector modules want
%% TODO: remove it, if the http_bridge already ported to v2
parse_confs(
<<"webhook">>,
_Name,

View File

@ -31,9 +31,14 @@
on_query/3,
on_query_async/4,
on_get_status/2,
reply_delegator/3
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_get_channel_status/3
]).
-export([reply_delegator/3]).
-export([
roots/0,
fields/1,
@ -251,6 +256,21 @@ start_pool(PoolName, PoolOpts) ->
Error
end.
on_add_channel(
_InstId,
OldState,
ActionId,
ActionConfig
) ->
InstalledActions = maps:get(installed_actions, OldState, #{}),
{ok, ActionState} = do_create_http_action(ActionConfig),
NewInstalledActions = maps:put(ActionId, ActionState, InstalledActions),
NewState = maps:put(installed_actions, NewInstalledActions, OldState),
{ok, NewState}.
do_create_http_action(_ActionConfig = #{parameters := Params}) ->
{ok, preprocess_request(Params)}.
on_stop(InstId, _State) ->
?SLOG(info, #{
msg => "stopping_http_connector",
@ -260,6 +280,16 @@ on_stop(InstId, _State) ->
?tp(emqx_connector_http_stopped, #{instance_id => InstId}),
Res.
on_remove_channel(
InstId,
OldState = #{installed_actions := InstalledActions},
ActionId
) ->
NewInstalledActions = maps:remove(ActionId, InstalledActions),
NewState = maps:put(installed_actions, NewInstalledActions, OldState),
{ok, NewState}.
%% BridgeV1 entrypoint
on_query(InstId, {send_message, Msg}, State) ->
case maps:get(request, State, undefined) of
undefined ->
@ -282,6 +312,36 @@ on_query(InstId, {send_message, Msg}, State) ->
State
)
end;
%% BridgeV2 entrypoint
on_query(
InstId,
{ActionId, Msg},
State = #{installed_actions := InstalledActions}
) when is_binary(ActionId) ->
case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of
{undefined, _} ->
?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
{error, arg_request_not_found};
{_, undefined} ->
?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}),
{error, action_not_found};
{Request, ActionState} ->
#{
method := Method,
path := Path,
body := Body,
headers := Headers,
request_timeout := Timeout
} = process_request_and_action(Request, ActionState, Msg),
%% bridge buffer worker has retry, do not let ehttpc retry
Retry = 2,
ClientId = maps:get(clientid, Msg, undefined),
on_query(
InstId,
{ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
State
)
end;
on_query(InstId, {Method, Request}, State) ->
%% TODO: Get retry from State
on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State);
@ -343,6 +403,7 @@ on_query(
Result
end.
%% BridgeV1 entrypoint
on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
case maps:get(request, State, undefined) of
undefined ->
@ -364,6 +425,36 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
State
)
end;
%% BridgeV2 entrypoint
on_query_async(
InstId,
{ActionId, Msg},
ReplyFunAndArgs,
State = #{installed_actions := InstalledActions}
) when is_binary(ActionId) ->
case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of
{undefined, _} ->
?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
{error, arg_request_not_found};
{_, undefined} ->
?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}),
{error, action_not_found};
{Request, ActionState} ->
#{
method := Method,
path := Path,
body := Body,
headers := Headers,
request_timeout := Timeout
} = process_request_and_action(Request, ActionState, Msg),
ClientId = maps:get(clientid, Msg, undefined),
on_query_async(
InstId,
{ClientId, Method, {Path, Headers, Body}, Timeout},
ReplyFunAndArgs,
State
)
end;
on_query_async(
InstId,
{KeyOrNum, Method, Request, Timeout},
@ -411,6 +502,9 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->
ehttpc_pool:pick_worker(PoolName, Key)
end.
on_get_channels(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId).
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(PoolName, Timeout) of
ok ->
@ -456,6 +550,14 @@ do_get_status(PoolName, Timeout) ->
{error, timeout}
end.
on_get_channel_status(
InstId,
_ChannelId,
State
) ->
%% XXX: Reuse the connector status
on_get_status(InstId, State).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
@ -529,6 +631,48 @@ maybe_parse_template(Key, Conf) ->
parse_template(String) ->
emqx_template:parse(String).
process_request_and_action(Request, ActionState, Msg) ->
MethodTemplate = maps:get(method, ActionState),
Method = make_method(render_template_string(MethodTemplate, Msg)),
BodyTemplate = maps:get(body, ActionState),
Body = render_request_body(BodyTemplate, Msg),
PathTemplate1 = maps:get(path, Request),
PathTemplate2 = maps:get(path, ActionState),
Path = join_paths(
unicode:characters_to_list(render_template(PathTemplate1, Msg)),
unicode:characters_to_list(render_template(PathTemplate2, Msg))
),
HeadersTemplate1 = maps:get(headers, Request),
HeadersTemplate2= maps:get(headers, ActionState),
Headers = merge_proplist(
render_headers(HeadersTemplate1, Msg),
render_headers(HeadersTemplate2, Msg)
),
#{
method => Method,
path => Path,
body => Body,
headers => Headers,
request_timeout => maps:get(request_timeout, ActionState)
}.
merge_proplist(Proplist1, Proplist2) ->
lists:foldl(
fun({K, V}, Acc) ->
case lists:keyfind(K, 1, Acc) of
false ->
[{K, V} | Acc];
{K, _} = {K, V1} ->
[{K, V1} | Acc]
end
end,
Proplist2,
Proplist1
).
process_request(
#{
method := MethodTemplate,

View File

@ -77,8 +77,10 @@ connector_impl_module(_ConnectorType) ->
-endif.
connector_to_resource_type_ce(_ConnectorType) ->
no_bridge_v2_for_c2_so_far.
connector_to_resource_type_ce(webhook) ->
emqx_bridge_http_connector;
connector_to_resource_type_ce(ConnectorType) ->
error({no_bridge_v2, ConnectorType}).
resource_id(ConnectorId) when is_binary(ConnectorId) ->
<<"connector:", ConnectorId/binary>>.
@ -275,9 +277,7 @@ parse_confs(
_Name,
#{
url := Url,
method := Method,
headers := Headers,
max_retries := Retry
headers := Headers
} = Conf
) ->
Url1 = bin(Url),
@ -290,20 +290,14 @@ parse_confs(
Reason1 = emqx_utils:readable_error_msg(Reason),
invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
end,
RequestTTL = emqx_utils_maps:deep_get(
[resource_opts, request_ttl],
Conf
),
Conf#{
base_url => BaseUrl1,
request =>
#{
path => Path,
method => Method,
body => maps:get(body, Conf, undefined),
headers => Headers,
request_ttl => RequestTTL,
max_retries => Retry
body => undefined,
method => undefined
}
};
parse_confs(<<"iotdb">>, Name, Conf) ->