diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 674eceb81..c1de3b177 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -309,6 +309,7 @@ remove(Type, Name, _Conf, _Opts) -> emqx_resource:remove_local(resource_id(Type, Name)). %% convert bridge configs to what the connector modules want +%% TODO: remove it, if the http_bridge already ported to v2 parse_confs( <<"webhook">>, _Name, diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 5a5e790e5..3080a7a9e 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -31,9 +31,14 @@ on_query/3, on_query_async/4, on_get_status/2, - reply_delegator/3 + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). +-export([reply_delegator/3]). + -export([ roots/0, fields/1, @@ -251,6 +256,21 @@ start_pool(PoolName, PoolOpts) -> Error end. +on_add_channel( + _InstId, + OldState, + ActionId, + ActionConfig +) -> + InstalledActions = maps:get(installed_actions, OldState, #{}), + {ok, ActionState} = do_create_http_action(ActionConfig), + NewInstalledActions = maps:put(ActionId, ActionState, InstalledActions), + NewState = maps:put(installed_actions, NewInstalledActions, OldState), + {ok, NewState}. + +do_create_http_action(_ActionConfig = #{parameters := Params}) -> + {ok, preprocess_request(Params)}. + on_stop(InstId, _State) -> ?SLOG(info, #{ msg => "stopping_http_connector", @@ -260,6 +280,16 @@ on_stop(InstId, _State) -> ?tp(emqx_connector_http_stopped, #{instance_id => InstId}), Res. +on_remove_channel( + InstId, + OldState = #{installed_actions := InstalledActions}, + ActionId +) -> + NewInstalledActions = maps:remove(ActionId, InstalledActions), + NewState = maps:put(installed_actions, NewInstalledActions, OldState), + {ok, NewState}. + +%% BridgeV1 entrypoint on_query(InstId, {send_message, Msg}, State) -> case maps:get(request, State, undefined) of undefined -> @@ -282,6 +312,36 @@ on_query(InstId, {send_message, Msg}, State) -> State ) end; +%% BridgeV2 entrypoint +on_query( + InstId, + {ActionId, Msg}, + State = #{installed_actions := InstalledActions} +) when is_binary(ActionId) -> + case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of + {undefined, _} -> + ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}), + {error, arg_request_not_found}; + {_, undefined} -> + ?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}), + {error, action_not_found}; + {Request, ActionState} -> + #{ + method := Method, + path := Path, + body := Body, + headers := Headers, + request_timeout := Timeout + } = process_request_and_action(Request, ActionState, Msg), + %% bridge buffer worker has retry, do not let ehttpc retry + Retry = 2, + ClientId = maps:get(clientid, Msg, undefined), + on_query( + InstId, + {ClientId, Method, {Path, Headers, Body}, Timeout, Retry}, + State + ) + end; on_query(InstId, {Method, Request}, State) -> %% TODO: Get retry from State on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State); @@ -343,6 +403,7 @@ on_query( Result end. +%% BridgeV1 entrypoint on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> case maps:get(request, State, undefined) of undefined -> @@ -364,6 +425,36 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> State ) end; +%% BridgeV2 entrypoint +on_query_async( + InstId, + {ActionId, Msg}, + ReplyFunAndArgs, + State = #{installed_actions := InstalledActions} +) when is_binary(ActionId) -> + case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of + {undefined, _} -> + ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}), + {error, arg_request_not_found}; + {_, undefined} -> + ?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}), + {error, action_not_found}; + {Request, ActionState} -> + #{ + method := Method, + path := Path, + body := Body, + headers := Headers, + request_timeout := Timeout + } = process_request_and_action(Request, ActionState, Msg), + ClientId = maps:get(clientid, Msg, undefined), + on_query_async( + InstId, + {ClientId, Method, {Path, Headers, Body}, Timeout}, + ReplyFunAndArgs, + State + ) + end; on_query_async( InstId, {KeyOrNum, Method, Request, Timeout}, @@ -411,6 +502,9 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) -> ehttpc_pool:pick_worker(PoolName, Key) end. +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). + on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> case do_get_status(PoolName, Timeout) of ok -> @@ -456,6 +550,14 @@ do_get_status(PoolName, Timeout) -> {error, timeout} end. +on_get_channel_status( + InstId, + _ChannelId, + State +) -> + %% XXX: Reuse the connector status + on_get_status(InstId, State). + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -529,6 +631,48 @@ maybe_parse_template(Key, Conf) -> parse_template(String) -> emqx_template:parse(String). +process_request_and_action(Request, ActionState, Msg) -> + MethodTemplate = maps:get(method, ActionState), + Method = make_method(render_template_string(MethodTemplate, Msg)), + BodyTemplate = maps:get(body, ActionState), + Body = render_request_body(BodyTemplate, Msg), + + PathTemplate1 = maps:get(path, Request), + PathTemplate2 = maps:get(path, ActionState), + + Path = join_paths( + unicode:characters_to_list(render_template(PathTemplate1, Msg)), + unicode:characters_to_list(render_template(PathTemplate2, Msg)) + ), + + HeadersTemplate1 = maps:get(headers, Request), + HeadersTemplate2= maps:get(headers, ActionState), + Headers = merge_proplist( + render_headers(HeadersTemplate1, Msg), + render_headers(HeadersTemplate2, Msg) + ), + #{ + method => Method, + path => Path, + body => Body, + headers => Headers, + request_timeout => maps:get(request_timeout, ActionState) + }. + +merge_proplist(Proplist1, Proplist2) -> + lists:foldl( + fun({K, V}, Acc) -> + case lists:keyfind(K, 1, Acc) of + false -> + [{K, V} | Acc]; + {K, _} = {K, V1} -> + [{K, V1} | Acc] + end + end, + Proplist2, + Proplist1 + ). + process_request( #{ method := MethodTemplate, diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index bc648a102..6568af666 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -77,8 +77,10 @@ connector_impl_module(_ConnectorType) -> -endif. -connector_to_resource_type_ce(_ConnectorType) -> - no_bridge_v2_for_c2_so_far. +connector_to_resource_type_ce(webhook) -> + emqx_bridge_http_connector; +connector_to_resource_type_ce(ConnectorType) -> + error({no_bridge_v2, ConnectorType}). resource_id(ConnectorId) when is_binary(ConnectorId) -> <<"connector:", ConnectorId/binary>>. @@ -275,9 +277,7 @@ parse_confs( _Name, #{ url := Url, - method := Method, - headers := Headers, - max_retries := Retry + headers := Headers } = Conf ) -> Url1 = bin(Url), @@ -290,20 +290,14 @@ parse_confs( Reason1 = emqx_utils:readable_error_msg(Reason), invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>) end, - RequestTTL = emqx_utils_maps:deep_get( - [resource_opts, request_ttl], - Conf - ), Conf#{ base_url => BaseUrl1, request => #{ path => Path, - method => Method, - body => maps:get(body, Conf, undefined), headers => Headers, - request_ttl => RequestTTL, - max_retries => Retry + body => undefined, + method => undefined } }; parse_confs(<<"iotdb">>, Name, Conf) ->