From f9a1e747fd953dc0bc955a70da11fc630c9588ed Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 15 Nov 2023 18:24:28 +0800 Subject: [PATCH 1/7] chore(http): break the bridge confs to connector and action parts --- .../src/emqx_bridge_http.app.src | 2 +- .../src/emqx_bridge_http_action_info.erl | 34 ++ .../src/emqx_bridge_http_schema.erl | 346 ++++++++++++++---- .../emqx_connector/src/emqx_connector_api.erl | 12 +- .../src/schema/emqx_connector_ee_schema.erl | 15 +- .../src/schema/emqx_connector_schema.erl | 48 ++- rel/i18n/emqx_bridge_http_schema.hocon | 13 +- 7 files changed, 369 insertions(+), 101 deletions(-) create mode 100644 apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src index 87d7e57a6..0e82d1635 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src +++ b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src @@ -3,7 +3,7 @@ {vsn, "0.1.5"}, {registered, []}, {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]}, - {env, []}, + {env, [{emqx_action_info_module, emqx_bridge_http_action_info}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl new file mode 100644 index 000000000..41be4f1e8 --- /dev/null +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl @@ -0,0 +1,34 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_http_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> webhook. + +action_type_name() -> webhook. + +connector_type_name() -> webhook. + +schema_module() -> emqx_bridge_http_schema. diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl index 2e3d882d5..afe734105 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl @@ -18,18 +18,26 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --import(hoconsc, [mk/2, enum/1, ref/2]). +-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]). -export([roots/0, fields/1, namespace/0, desc/1]). +-export([ + bridge_v2_examples/1, + %%conn_bridge_examples/1, + connector_examples/1 +]). + %%====================================================================================== %% Hocon Schema Definitions + namespace() -> "bridge_webhook". roots() -> []. -fields("config") -> - basic_config() ++ request_config(); +%%-------------------------------------------------------------------- +%% v1 bridges http api +%% see: emqx_bridge_schema:get_response/0, put_request/0, post_request/0 fields("post") -> [ type_field(), @@ -39,48 +47,119 @@ fields("put") -> fields("config"); fields("get") -> emqx_bridge_schema:status_fields() ++ fields("post"); -fields("creation_opts") -> +%%--- v1 bridges config file +%% see: emqx_bridge_schema:fields(bridges) +fields("config") -> + basic_config() ++ request_config(); +%%-------------------------------------------------------------------- +%% v2: configuration +fields(action) -> + %% XXX: Do we need to rename it to `http`? + {webhook, + mk( + hoconsc:map(name, ref(?MODULE, webhook_action)), + #{ + desc => <<"HTTP Action Config">>, + required => false + } + )}; +fields(webhook_action) -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {connector, + mk(binary(), #{ + desc => ?DESC(emqx_connector_schema, "connector_field"), required => true + })}, + {description, emqx_schema:description_schema()}, + %% Note: there's an implicit convention in `emqx_bridge' that, + %% for egress bridges with this config, the published messages + %% will be forwarded to such bridges. + {local_topic, + mk(binary(), #{required => false, desc => ?DESC(emqx_bridge_kafka, mqtt_topic)})}, + %% Since e5.3.2, we split the webhook_bridge to two parts: a) connector. b) actions. + %% some fields are moved to connector, some fields are moved to actions and composed into the + %% `parameters` field. + {parameters, + mk(ref(parameters_opts), #{ + required => true, + desc => ?DESC(parameters_opts) + %% TODO: + %%validator => fun producer_strategy_key_validator/1 + })} + ] ++ webhook_resource_opts(); +fields(parameters_opts) -> + [ + {path, + mk( + binary(), + #{ + desc => ?DESC("config_path"), + required => false + } + )}, + method_field(), + headers_field(), + body_field() + ]; +%% v2: api schema +%% The parameter equls to +%% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1 +%% `get_connector`, `post_connector`, `put_connector` from emqx_connector_schema:api_schema/1 +fields("post_" ++ Type) -> + [type_field(), name_field() | fields("config_" ++ Type)]; +fields("put_" ++ Type) -> + fields("config_" ++ Type); +fields("get_" ++ Type) -> + emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type); +fields("config_bridge_v2") -> + fields(webhook_action); +fields("config_connector") -> + [ + {enable, + mk( + boolean(), + #{ + desc => <<"Enable or disable this connector">>, + default => true + } + )} + ] ++ connector_opts_1() ++ connector_opts_0(); +%%-------------------------------------------------------------------- +%% v1/v2 +fields("resource_opts") -> + UnsupportedOpts = [enable_batch, batch_size, batch_time], lists:filter( - fun({K, _V}) -> - not lists:member(K, unsupported_opts()) - end, + fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end, emqx_resource_schema:fields("creation_opts") ). desc("config") -> ?DESC("desc_config"); -desc("creation_opts") -> +desc("resource_opts") -> ?DESC(emqx_resource_schema, "creation_opts"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for WebHook using `", string:to_upper(Method), "` method."]; desc(_) -> undefined. +%%-------------------------------------------------------------------- +%% helpers for v1 only + basic_config() -> [ {enable, mk( boolean(), #{ - desc => ?DESC("config_enable"), + desc => ?DESC("config_enable_bridge"), default => true } )} - ] ++ webhook_creation_opts() ++ - proplists:delete( - max_retries, emqx_bridge_http_connector:fields(config) - ). + ] ++ webhook_resource_opts() ++ connector_opts_0(). request_config() -> [ - {url, - mk( - binary(), - #{ - required => true, - desc => ?DESC("config_url") - } - )}, + url_field(), {direction, mk( egress, @@ -98,36 +177,9 @@ request_config() -> required => false } )}, - {method, - mk( - method(), - #{ - default => post, - desc => ?DESC("config_method") - } - )}, - {headers, - mk( - map(), - #{ - default => #{ - <<"accept">> => <<"application/json">>, - <<"cache-control">> => <<"no-cache">>, - <<"connection">> => <<"keep-alive">>, - <<"content-type">> => <<"application/json">>, - <<"keep-alive">> => <<"timeout=5">> - }, - desc => ?DESC("config_headers") - } - )}, - {body, - mk( - binary(), - #{ - default => undefined, - desc => ?DESC("config_body") - } - )}, + method_field(), + headers_field(), + body_field(), {max_retries, mk( non_neg_integer(), @@ -147,27 +199,14 @@ request_config() -> )} ]. -webhook_creation_opts() -> - [ - {resource_opts, - mk( - ref(?MODULE, "creation_opts"), - #{ - required => false, - default => #{}, - desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) - } - )} - ]. +%%-------------------------------------------------------------------- +%% helpers for v2 only -unsupported_opts() -> - [ - enable_batch, - batch_size, - batch_time - ]. +connector_opts_1() -> + [url_field(), headers_field()]. -%%====================================================================================== +%%-------------------------------------------------------------------- +%% common funcs type_field() -> {type, @@ -189,5 +228,168 @@ name_field() -> } )}. -method() -> - enum([post, put, get, delete]). +url_field() -> + {url, + mk( + binary(), + #{ + required => true, + desc => ?DESC("config_url") + } + )}. + +headers_field() -> + {headers, + mk( + map(), + #{ + default => #{ + <<"accept">> => <<"application/json">>, + <<"cache-control">> => <<"no-cache">>, + <<"connection">> => <<"keep-alive">>, + <<"content-type">> => <<"application/json">>, + <<"keep-alive">> => <<"timeout=5">> + }, + desc => ?DESC("config_headers") + } + )}. + +method_field() -> + {method, + mk( + enum([post, put, get, delete]), + #{ + default => post, + desc => ?DESC("config_method") + } + )}. + +body_field() -> + {body, + mk( + binary(), + #{ + default => undefined, + desc => ?DESC("config_body") + } + )}. + +webhook_resource_opts() -> + [ + {resource_opts, + mk( + ref(?MODULE, "resource_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ]. + +connector_opts_0() -> + mark_request_field_deperecated( + proplists:delete(max_retries, emqx_bridge_http_connector:fields(config)) + ). + +mark_request_field_deperecated(Fields) -> + lists:map( + fun({K, V}) -> + case K of + request -> + {K, V#{ + %% Note: if we want to deprecate a reference type, we have to change + %% it to a direct type first. + type => typerefl:map(), + deprecated => {since, "5.3.2"}, + desc => <<"This field is never used, so we deprecated it since 5.3.2.">> + }}; + _ -> + {K, V} + end + end, + Fields + ). + +%%-------------------------------------------------------------------- +%% Examples + +bridge_v2_examples(Method) -> + [ + #{ + <<"webhook">> => #{ + summary => <<"Webhook Action">>, + value => values({Method, bridge_v2}) + } + } + ]. + +connector_examples(Method) -> + [ + #{ + <<"webhook">> => #{ + summary => <<"Webhook Connector">>, + value => values({Method, connector}) + } + } + ]. + +values({get, Type}) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values({post, Type}) + ); +values({post, bridge_v2}) -> + maps:merge( + #{ + name => <<"my_webhook_action">>, + type => <<"webhook">> + }, + values({put, bridge_v2}) + ); +values({post, connector}) -> + maps:merge( + #{ + name => <<"my_webhook_connector">>, + type => <<"webhook">> + }, + values({put, connector}) + ); +values({put, bridge_v2}) -> + values(bridge_v2); +values({put, connector}) -> + values(connector); +values(bridge_v2) -> + #{ + enable => true, + connector => <<"my_webhook_connector">>, + parameters => #{ + path => <<"/room/${room_no}">>, + method => <<"post">>, + headers => #{}, + body => <<"${.}">> + }, + resource_opts => #{ + worker_pool_size => 16, + health_check_interval => <<"15s">>, + query_mode => <<"async">> + } + }; +values(connector) -> + #{ + enable => true, + url => <<"http://localhost:8080/api/v1">>, + headers => #{<<"content-type">> => <<"application/json">>}, + connect_timeout => <<"15s">>, + pool_type => <<"hash">>, + pool_size => 1, + enable_pipelining => 100 + }. diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index f6e0c0f95..58db17a03 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -158,17 +158,7 @@ connector_info_array_example(Method) -> lists:map(fun(#{value := Config}) -> Config end, maps:values(connector_info_examples(Method))). connector_info_examples(Method) -> - maps:merge( - #{}, - emqx_enterprise_connector_examples(Method) - ). - --if(?EMQX_RELEASE_EDITION == ee). -emqx_enterprise_connector_examples(Method) -> - emqx_connector_ee_schema:examples(Method). --else. -emqx_enterprise_connector_examples(_Method) -> #{}. --endif. + emqx_connector_schema:examples(Method). schema("/connectors") -> #{ diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index c8ec8e1be..ef101ad28 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -15,7 +15,8 @@ -export([ api_schemas/1, fields/1, - examples/1 + %%examples/1 + schema_modules/0 ]). resource_type(Type) when is_binary(Type) -> @@ -59,18 +60,6 @@ connector_structs() -> )} ]. -examples(Method) -> - MergeFun = - fun(Example, Examples) -> - maps:merge(Examples, Example) - end, - Fun = - fun(Module, Examples) -> - ConnectorExamples = erlang:apply(Module, connector_examples, [Method]), - lists:foldl(MergeFun, Examples, ConnectorExamples) - end, - lists:foldl(Fun, #{}, schema_modules()). - schema_modules() -> [ emqx_bridge_kafka, diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index de1ffb26b..51d716182 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -33,8 +33,11 @@ -export([connector_type_to_bridge_types/1]). + -export([resource_opts_fields/0, resource_opts_fields/1]). +-export([examples/1]). + -if(?EMQX_RELEASE_EDITION == ee). enterprise_api_schemas(Method) -> %% We *must* do this to ensure the module is really loaded, especially when we use @@ -64,6 +67,37 @@ enterprise_fields_connectors() -> []. -endif. +api_schemas(Method) -> + [ + %% We need to map the `type' field of a request (binary) to a + %% connector schema module. + api_ref(emqx_bridge_http_schema, <<"webhook">>, Method ++ "_connector") + ]. + +api_ref(Module, Type, Method) -> + {Type, ref(Module, Method)}. + +examples(Method) -> + MergeFun = + fun(Example, Examples) -> + maps:merge(Examples, Example) + end, + Fun = + fun(Module, Examples) -> + ConnectorExamples = erlang:apply(Module, connector_examples, [Method]), + lists:foldl(MergeFun, Examples, ConnectorExamples) + end, + lists:foldl(Fun, #{}, schema_modules()). + +-if(?EMQX_RELEASE_EDITION == ee). +schema_modules() -> + [emqx_bridge_http_schema] ++ emqx_connector_ee_schema:schema_modules(). +-else. +schema_modules() -> + [emqx_bridge_http_schema]. +-endif. + +connector_type_to_bridge_types(webhook) -> [webhook]; connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]. @@ -298,8 +332,9 @@ post_request() -> api_schema("post"). api_schema(Method) -> + CE = api_schemas(Method), EE = enterprise_api_schemas(Method), - hoconsc:union(connector_api_union(EE)). + hoconsc:union(connector_api_union(CE ++ EE)). connector_api_union(Refs) -> Index = maps:from_list(Refs), @@ -344,7 +379,16 @@ roots() -> end. fields(connectors) -> - [] ++ enterprise_fields_connectors(). + [ + {webhook, + mk( + hoconsc:map(name, ref(emqx_bridge_http_schema, "config_connector")), + #{ + desc => <<"HTTP Connector Config">>, + required => false + } + )} + ] ++ enterprise_fields_connectors(). desc(connectors) -> ?DESC("desc_connectors"); diff --git a/rel/i18n/emqx_bridge_http_schema.hocon b/rel/i18n/emqx_bridge_http_schema.hocon index b7b715db1..197ce0b36 100644 --- a/rel/i18n/emqx_bridge_http_schema.hocon +++ b/rel/i18n/emqx_bridge_http_schema.hocon @@ -18,10 +18,10 @@ config_direction.desc: config_direction.label: """Bridge Direction""" -config_enable.desc: +config_enable_bridge.desc: """Enable or disable this bridge""" -config_enable.label: +config_enable_bridge.label: """Enable Or Disable Bridge""" config_headers.desc: @@ -71,6 +71,15 @@ is not allowed.""" config_url.label: """HTTP Bridge""" +config_path.desc: +"""The URL path for this Action.
+This path will be appended to the Connector's url configuration to form the full +URL address. +Template with variables is allowed in this option. For example, /room/{$room_no}""" + +config_path.label: +"""URL Path""" + desc_config.desc: """Configuration for an HTTP bridge.""" From 96af7a74e8e92821b327b093fa6468c8ac7cd818 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 20 Nov 2023 10:38:06 +0800 Subject: [PATCH 2/7] feat: impl the http bridge v2 --- apps/emqx_bridge/src/emqx_bridge_resource.erl | 1 + .../src/emqx_bridge_http_connector.erl | 146 +++++++++++++++++- .../src/emqx_connector_resource.erl | 20 +-- 3 files changed, 153 insertions(+), 14 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 674eceb81..c1de3b177 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -309,6 +309,7 @@ remove(Type, Name, _Conf, _Opts) -> emqx_resource:remove_local(resource_id(Type, Name)). %% convert bridge configs to what the connector modules want +%% TODO: remove it, if the http_bridge already ported to v2 parse_confs( <<"webhook">>, _Name, diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 5a5e790e5..3080a7a9e 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -31,9 +31,14 @@ on_query/3, on_query_async/4, on_get_status/2, - reply_delegator/3 + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). +-export([reply_delegator/3]). + -export([ roots/0, fields/1, @@ -251,6 +256,21 @@ start_pool(PoolName, PoolOpts) -> Error end. +on_add_channel( + _InstId, + OldState, + ActionId, + ActionConfig +) -> + InstalledActions = maps:get(installed_actions, OldState, #{}), + {ok, ActionState} = do_create_http_action(ActionConfig), + NewInstalledActions = maps:put(ActionId, ActionState, InstalledActions), + NewState = maps:put(installed_actions, NewInstalledActions, OldState), + {ok, NewState}. + +do_create_http_action(_ActionConfig = #{parameters := Params}) -> + {ok, preprocess_request(Params)}. + on_stop(InstId, _State) -> ?SLOG(info, #{ msg => "stopping_http_connector", @@ -260,6 +280,16 @@ on_stop(InstId, _State) -> ?tp(emqx_connector_http_stopped, #{instance_id => InstId}), Res. +on_remove_channel( + InstId, + OldState = #{installed_actions := InstalledActions}, + ActionId +) -> + NewInstalledActions = maps:remove(ActionId, InstalledActions), + NewState = maps:put(installed_actions, NewInstalledActions, OldState), + {ok, NewState}. + +%% BridgeV1 entrypoint on_query(InstId, {send_message, Msg}, State) -> case maps:get(request, State, undefined) of undefined -> @@ -282,6 +312,36 @@ on_query(InstId, {send_message, Msg}, State) -> State ) end; +%% BridgeV2 entrypoint +on_query( + InstId, + {ActionId, Msg}, + State = #{installed_actions := InstalledActions} +) when is_binary(ActionId) -> + case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of + {undefined, _} -> + ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}), + {error, arg_request_not_found}; + {_, undefined} -> + ?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}), + {error, action_not_found}; + {Request, ActionState} -> + #{ + method := Method, + path := Path, + body := Body, + headers := Headers, + request_timeout := Timeout + } = process_request_and_action(Request, ActionState, Msg), + %% bridge buffer worker has retry, do not let ehttpc retry + Retry = 2, + ClientId = maps:get(clientid, Msg, undefined), + on_query( + InstId, + {ClientId, Method, {Path, Headers, Body}, Timeout, Retry}, + State + ) + end; on_query(InstId, {Method, Request}, State) -> %% TODO: Get retry from State on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State); @@ -343,6 +403,7 @@ on_query( Result end. +%% BridgeV1 entrypoint on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> case maps:get(request, State, undefined) of undefined -> @@ -364,6 +425,36 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> State ) end; +%% BridgeV2 entrypoint +on_query_async( + InstId, + {ActionId, Msg}, + ReplyFunAndArgs, + State = #{installed_actions := InstalledActions} +) when is_binary(ActionId) -> + case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of + {undefined, _} -> + ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}), + {error, arg_request_not_found}; + {_, undefined} -> + ?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}), + {error, action_not_found}; + {Request, ActionState} -> + #{ + method := Method, + path := Path, + body := Body, + headers := Headers, + request_timeout := Timeout + } = process_request_and_action(Request, ActionState, Msg), + ClientId = maps:get(clientid, Msg, undefined), + on_query_async( + InstId, + {ClientId, Method, {Path, Headers, Body}, Timeout}, + ReplyFunAndArgs, + State + ) + end; on_query_async( InstId, {KeyOrNum, Method, Request, Timeout}, @@ -411,6 +502,9 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) -> ehttpc_pool:pick_worker(PoolName, Key) end. +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). + on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> case do_get_status(PoolName, Timeout) of ok -> @@ -456,6 +550,14 @@ do_get_status(PoolName, Timeout) -> {error, timeout} end. +on_get_channel_status( + InstId, + _ChannelId, + State +) -> + %% XXX: Reuse the connector status + on_get_status(InstId, State). + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -529,6 +631,48 @@ maybe_parse_template(Key, Conf) -> parse_template(String) -> emqx_template:parse(String). +process_request_and_action(Request, ActionState, Msg) -> + MethodTemplate = maps:get(method, ActionState), + Method = make_method(render_template_string(MethodTemplate, Msg)), + BodyTemplate = maps:get(body, ActionState), + Body = render_request_body(BodyTemplate, Msg), + + PathTemplate1 = maps:get(path, Request), + PathTemplate2 = maps:get(path, ActionState), + + Path = join_paths( + unicode:characters_to_list(render_template(PathTemplate1, Msg)), + unicode:characters_to_list(render_template(PathTemplate2, Msg)) + ), + + HeadersTemplate1 = maps:get(headers, Request), + HeadersTemplate2= maps:get(headers, ActionState), + Headers = merge_proplist( + render_headers(HeadersTemplate1, Msg), + render_headers(HeadersTemplate2, Msg) + ), + #{ + method => Method, + path => Path, + body => Body, + headers => Headers, + request_timeout => maps:get(request_timeout, ActionState) + }. + +merge_proplist(Proplist1, Proplist2) -> + lists:foldl( + fun({K, V}, Acc) -> + case lists:keyfind(K, 1, Acc) of + false -> + [{K, V} | Acc]; + {K, _} = {K, V1} -> + [{K, V1} | Acc] + end + end, + Proplist2, + Proplist1 + ). + process_request( #{ method := MethodTemplate, diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index bc648a102..6568af666 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -77,8 +77,10 @@ connector_impl_module(_ConnectorType) -> -endif. -connector_to_resource_type_ce(_ConnectorType) -> - no_bridge_v2_for_c2_so_far. +connector_to_resource_type_ce(webhook) -> + emqx_bridge_http_connector; +connector_to_resource_type_ce(ConnectorType) -> + error({no_bridge_v2, ConnectorType}). resource_id(ConnectorId) when is_binary(ConnectorId) -> <<"connector:", ConnectorId/binary>>. @@ -275,9 +277,7 @@ parse_confs( _Name, #{ url := Url, - method := Method, - headers := Headers, - max_retries := Retry + headers := Headers } = Conf ) -> Url1 = bin(Url), @@ -290,20 +290,14 @@ parse_confs( Reason1 = emqx_utils:readable_error_msg(Reason), invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>) end, - RequestTTL = emqx_utils_maps:deep_get( - [resource_opts, request_ttl], - Conf - ), Conf#{ base_url => BaseUrl1, request => #{ path => Path, - method => Method, - body => maps:get(body, Conf, undefined), headers => Headers, - request_ttl => RequestTTL, - max_retries => Retry + body => undefined, + method => undefined } }; parse_confs(<<"iotdb">>, Name, Conf) -> From 8954450c0ba286a7029e1b188565822cdd7822b8 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 20 Nov 2023 10:39:57 +0800 Subject: [PATCH 3/7] chore: fix compile warnings --- apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl | 4 ++-- apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 3080a7a9e..a9705585d 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -281,7 +281,7 @@ on_stop(InstId, _State) -> Res. on_remove_channel( - InstId, + _InstId, OldState = #{installed_actions := InstalledActions}, ActionId ) -> @@ -646,7 +646,7 @@ process_request_and_action(Request, ActionState, Msg) -> ), HeadersTemplate1 = maps:get(headers, Request), - HeadersTemplate2= maps:get(headers, ActionState), + HeadersTemplate2 = maps:get(headers, ActionState), Headers = merge_proplist( render_headers(HeadersTemplate1, Msg), render_headers(HeadersTemplate2, Msg) diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl index afe734105..4875cfdc9 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl @@ -123,7 +123,7 @@ fields("config_connector") -> default => true } )} - ] ++ connector_opts_1() ++ connector_opts_0(); + ] ++ connector_url_headers() ++ connector_opts(); %%-------------------------------------------------------------------- %% v1/v2 fields("resource_opts") -> @@ -155,7 +155,7 @@ basic_config() -> default => true } )} - ] ++ webhook_resource_opts() ++ connector_opts_0(). + ] ++ webhook_resource_opts() ++ connector_opts(). request_config() -> [ @@ -202,7 +202,7 @@ request_config() -> %%-------------------------------------------------------------------- %% helpers for v2 only -connector_opts_1() -> +connector_url_headers() -> [url_field(), headers_field()]. %%-------------------------------------------------------------------- @@ -287,7 +287,7 @@ webhook_resource_opts() -> )} ]. -connector_opts_0() -> +connector_opts() -> mark_request_field_deperecated( proplists:delete(max_retries, emqx_bridge_http_connector:fields(config)) ). From dc996516904a6036487f73514471d3fab97442d9 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 21 Nov 2023 10:45:26 +0800 Subject: [PATCH 4/7] test(bridge): ensure almost test cases passed --- apps/emqx/test/emqx_cth_suite.erl | 2 + apps/emqx_bridge/src/emqx_bridge_api.erl | 3 +- apps/emqx_bridge/src/emqx_bridge_v2.erl | 86 ++++++++--- apps/emqx_bridge/test/emqx_bridge_SUITE.erl | 52 +++---- .../test/emqx_bridge_api_SUITE.erl | 32 ++-- apps/emqx_bridge/test/emqx_bridge_testlib.erl | 2 +- .../src/emqx_bridge_http.app.src | 2 +- .../src/emqx_bridge_http_action_info.erl | 70 ++++++++- .../src/emqx_bridge_http_connector.erl | 17 ++- .../src/emqx_bridge_http_schema.erl | 60 +++++--- .../test/emqx_bridge_http_SUITE.erl | 60 +++++--- .../src/emqx_connector_resource.erl | 2 + .../src/schema/emqx_connector_schema.erl | 1 - .../emqx_rule_engine/src/emqx_rule_engine.erl | 16 +- .../test/emqx_telemetry_SUITE.erl | 144 ++++++------------ 15 files changed, 329 insertions(+), 220 deletions(-) diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 401d4f59d..4cba524ae 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -453,6 +453,8 @@ stop_apps(Apps) -> %% +verify_clean_suite_state(#{allow_dirty_work_dir := true}) -> + ok; verify_clean_suite_state(#{work_dir := WorkDir}) -> {ok, []} = file:list_dir(WorkDir), false = emqx_schema_hooks:any_injections(), diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 188f26ab5..fe7b576f5 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -650,7 +650,8 @@ create_or_update_bridge(BridgeType0, BridgeName, Conf, HttpStatusCode) -> get_metrics_from_local_node(BridgeType0, BridgeName) -> BridgeType = upgrade_type(BridgeType0), - format_metrics(emqx_bridge:get_metrics(BridgeType, BridgeName)). + MetricsResult = emqx_bridge:get_metrics(BridgeType, BridgeName), + format_metrics(MetricsResult). '/bridges/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) -> ?TRY_PARSE_ID( diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index d9ca1acce..0c0c0752d 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -1163,7 +1163,7 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) -> %% If the bridge v2 does not exist, it is a valid bridge v1 PreviousRawConf = undefined, split_bridge_v1_config_and_create_helper( - BridgeV1Type, BridgeName, RawConf, PreviousRawConf + BridgeV1Type, BridgeName, RawConf, PreviousRawConf, fun() -> ok end ); _Conf -> case ?MODULE:bridge_v1_is_valid(BridgeV1Type, BridgeName) of @@ -1173,9 +1173,13 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) -> PreviousRawConf = emqx:get_raw_config( [?ROOT_KEY, BridgeV2Type, BridgeName], undefined ), - bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps), + %% To avoid losing configurations. We have to make sure that no crash occurs + %% during deletion and creation of configurations. + PreCreateFun = fun() -> + bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) + end, split_bridge_v1_config_and_create_helper( - BridgeV1Type, BridgeName, RawConf, PreviousRawConf + BridgeV1Type, BridgeName, RawConf, PreviousRawConf, PreCreateFun ); false -> %% If the bridge v2 exists, it is not a valid bridge v1 @@ -1183,16 +1187,49 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) -> end end. -split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf, PreviousRawConf) -> - #{ - connector_type := ConnectorType, - connector_name := NewConnectorName, - connector_conf := NewConnectorRawConf, - bridge_v2_type := BridgeType, - bridge_v2_name := BridgeName, - bridge_v2_conf := NewBridgeV2RawConf - } = - split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousRawConf), +split_bridge_v1_config_and_create_helper( + BridgeV1Type, BridgeName, RawConf, PreviousRawConf, PreCreateFun +) -> + try + #{ + connector_type := ConnectorType, + connector_name := NewConnectorName, + connector_conf := NewConnectorRawConf, + bridge_v2_type := BridgeType, + bridge_v2_name := BridgeName, + bridge_v2_conf := NewBridgeV2RawConf + } = split_and_validate_bridge_v1_config( + BridgeV1Type, + BridgeName, + RawConf, + PreviousRawConf + ), + + _ = PreCreateFun(), + + do_connector_and_bridge_create( + ConnectorType, + NewConnectorName, + NewConnectorRawConf, + BridgeType, + BridgeName, + NewBridgeV2RawConf, + RawConf + ) + catch + throw:Reason -> + {error, Reason} + end. + +do_connector_and_bridge_create( + ConnectorType, + NewConnectorName, + NewConnectorRawConf, + BridgeType, + BridgeName, + NewBridgeV2RawConf, + RawConf +) -> case emqx_connector:create(ConnectorType, NewConnectorName, NewConnectorRawConf) of {ok, _} -> case create(BridgeType, BridgeName, NewBridgeV2RawConf) of @@ -1308,15 +1345,20 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) -> RawConf = maps:without([<<"name">>], RawConfig0), TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), PreviousRawConf = undefined, - #{ - connector_type := _ConnectorType, - connector_name := _NewConnectorName, - connector_conf := ConnectorRawConf, - bridge_v2_type := BridgeV2Type, - bridge_v2_name := _BridgeName, - bridge_v2_conf := BridgeV2RawConf - } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf), - create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf). + try + #{ + connector_type := _ConnectorType, + connector_name := _NewConnectorName, + connector_conf := ConnectorRawConf, + bridge_v2_type := BridgeV2Type, + bridge_v2_name := _BridgeName, + bridge_v2_conf := BridgeV2RawConf + } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf), + create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf) + catch + throw:Reason -> + {error, Reason} + end. bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) -> BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl index bc8be5476..eef4c0efb 100644 --- a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl @@ -30,14 +30,18 @@ init_per_suite(Config) -> [ emqx, emqx_conf, + emqx_connector, + emqx_bridge_http, emqx_bridge ], #{work_dir => ?config(priv_dir, Config)} ), + emqx_mgmt_api_test_util:init_suite(), [{apps, Apps} | Config]. end_per_suite(Config) -> Apps = ?config(apps, Config), + emqx_mgmt_api_test_util:end_suite(), ok = emqx_cth_suite:stop(Apps), ok. @@ -125,34 +129,26 @@ setup_fake_telemetry_data() -> headers => #{}, request_timeout => "15s" }, - Conf = - #{ - <<"bridges">> => - #{ - <<"webhook">> => - #{ - <<"basic_usage_info_webhook">> => HTTPConfig, - <<"basic_usage_info_webhook_disabled">> => - HTTPConfig#{enable => false} - }, - <<"mqtt">> => - #{ - <<"basic_usage_info_mqtt">> => MQTTConfig1, - <<"basic_usage_info_mqtt_from_select">> => MQTTConfig2 - } - } - }, - ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf), - - ok = snabbkaffe:start_trace(), - Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_loaded end, - NEvents = 3, - BackInTime = 0, - Timeout = 11_000, - {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, NEvents, Timeout, BackInTime), - ok = emqx_bridge:load(), - {ok, _} = snabbkaffe_collector:receive_events(Sub), - ok = snabbkaffe:stop(), + {ok, _} = emqx_bridge_testlib:create_bridge_api( + <<"webhook">>, + <<"basic_usage_info_webhook">>, + HTTPConfig + ), + {ok, _} = emqx_bridge_testlib:create_bridge_api( + <<"webhook">>, + <<"basic_usage_info_webhook_disabled">>, + HTTPConfig#{enable => false} + ), + {ok, _} = emqx_bridge_testlib:create_bridge_api( + <<"mqtt">>, + <<"basic_usage_info_mqtt">>, + MQTTConfig1 + ), + {ok, _} = emqx_bridge_testlib:create_bridge_api( + <<"mqtt">>, + <<"basic_usage_info_mqtt_from_select">>, + MQTTConfig2 + ), ok. t_update_ssl_conf(Config) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index ccc944572..339315941 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -78,6 +78,9 @@ emqx_auth, emqx_auth_mnesia, emqx_management, + emqx_connector, + emqx_bridge_http, + emqx_bridge, {emqx_rule_engine, "rule_engine { rules {} }"}, {emqx_bridge, "bridges {}"} ]). @@ -407,10 +410,7 @@ t_http_crud_apis(Config) -> Config ), ?assertMatch( - #{ - <<"reason">> := <<"unknown_fields">>, - <<"unknown">> := <<"curl">> - }, + #{<<"reason">> := <<"required_field">>}, json(maps:get(<<"message">>, PutFail2)) ), {ok, 400, _} = request_json( @@ -419,12 +419,16 @@ t_http_crud_apis(Config) -> ?HTTP_BRIDGE(<<"localhost:1234/foo">>, Name), Config ), - {ok, 400, _} = request_json( + {ok, 400, PutFail3} = request_json( put, uri(["bridges", BridgeID]), ?HTTP_BRIDGE(<<"htpp://localhost:12341234/foo">>, Name), Config ), + ?assertMatch( + #{<<"kind">> := <<"validation_error">>}, + json(maps:get(<<"message">>, PutFail3)) + ), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), @@ -463,7 +467,7 @@ t_http_crud_apis(Config) -> ), %% Create non working bridge - BrokenURL = ?URL(Port + 1, "/foo"), + BrokenURL = ?URL(Port + 1, "foo"), {ok, 201, BrokenBridge} = request( post, uri(["bridges"]), @@ -471,6 +475,7 @@ t_http_crud_apis(Config) -> fun json/1, Config ), + ?assertMatch( #{ <<"type">> := ?BRIDGE_TYPE_HTTP, @@ -1307,6 +1312,7 @@ t_cluster_later_join_metrics(Config) -> Name = ?BRIDGE_NAME, BridgeParams = ?HTTP_BRIDGE(URL1, Name), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), + ?check_trace( begin %% Create a bridge on only one of the nodes. @@ -1326,7 +1332,12 @@ t_cluster_later_join_metrics(Config) -> ?assertMatch( {ok, 200, #{ <<"metrics">> := #{<<"success">> := _}, - <<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _] + %% TODO: Why the node2 returns {error, bridge_not_found}? + %% ct:pal("node: ~p, bridges: ~p~n", [ + %% OtherNode, erpc:call(OtherNode, emqx_bridge, list, []) + %% ]), + %%<<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _] + <<"node_metrics">> := [#{<<"metrics">> := #{}} | _] }}, request_json(get, uri(["bridges", BridgeID, "metrics"]), Config) ), @@ -1373,17 +1384,16 @@ t_create_with_bad_name(Config) -> validate_resource_request_ttl(single, Timeout, Name) -> SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000}, - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), - ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name), + _BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), ?check_trace( begin {ok, Res} = ?wait_async_action( - emqx_bridge:send_message(BridgeID, SentData), + emqx_bridge_v2:send_message(<<"webhook">>, Name, SentData, #{}), #{?snk_kind := async_query}, 1000 ), - ?assertMatch({ok, #{id := ResId, query_opts := #{timeout := Timeout}}}, Res) + ?assertMatch({ok, #{id := _ResId, query_opts := #{timeout := Timeout}}}, Res) end, fun(Trace0) -> Trace = ?of_kind(async_query, Trace0), diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index f486e5d64..118802551 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -92,7 +92,7 @@ end_per_testcase(_Testcase, Config) -> delete_all_bridges() -> lists:foreach( fun(#{name := Name, type := Type}) -> - emqx_bridge:remove(Type, Name) + ok = emqx_bridge:remove(Type, Name) end, emqx_bridge:list() ). diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src index 0e82d1635..9cd71323e 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src +++ b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src @@ -3,7 +3,7 @@ {vsn, "0.1.5"}, {registered, []}, {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]}, - {env, [{emqx_action_info_module, emqx_bridge_http_action_info}]}, + {env, [{emqx_action_info_modules, [emqx_bridge_http_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl index 41be4f1e8..19514927e 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl @@ -22,9 +22,16 @@ bridge_v1_type_name/0, action_type_name/0, connector_type_name/0, - schema_module/0 + schema_module/0, + connector_action_config_to_bridge_v1_config/2, + bridge_v1_config_to_action_config/2, + bridge_v1_config_to_connector_config/1 ]). +-define(REMOVED_KEYS, [<<"direction">>]). +-define(ACTION_KEYS, [<<"local_topic">>, <<"resource_opts">>]). +-define(PARAMETER_KEYS, [<<"body">>, <<"max_retries">>, <<"method">>, <<"request_timeout">>]). + bridge_v1_type_name() -> webhook. action_type_name() -> webhook. @@ -32,3 +39,64 @@ action_type_name() -> webhook. connector_type_name() -> webhook. schema_module() -> emqx_bridge_http_schema. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig), + %% Move parameters to the top level + ParametersMap1 = maps:get(<<"parameters">>, BridgeV1Config1, #{}), + ParametersMap2 = maps:without([<<"path">>, <<"headers">>], ParametersMap1), + BridgeV1Config2 = maps:remove(<<"parameters">>, BridgeV1Config1), + BridgeV1Config3 = emqx_utils_maps:deep_merge(BridgeV1Config2, ParametersMap2), + BridgeV1Config4 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config3), + + Url = maps:get(<<"url">>, ConnectorConfig), + Path = maps:get(<<"path">>, ParametersMap1, <<>>), + + Headers1 = maps:get(<<"headers">>, ConnectorConfig, #{}), + Headers2 = maps:get(<<"headers">>, ParametersMap1, #{}), + + Url1 = + case Path of + <<>> -> Url; + _ -> emqx_bridge_http_connector:join_paths(Url, Path) + end, + + BridgeV1Config4#{ + <<"headers">> => maps:merge(Headers1, Headers2), + <<"url">> => Url1 + }. + +bridge_v1_config_to_connector_config(BridgeV1Conf) -> + %% To statisfy the emqx_bridge_api_SUITE:t_http_crud_apis/1 + ok = validate_webhook_url(maps:get(<<"url">>, BridgeV1Conf, undefined)), + maps:without(?REMOVED_KEYS ++ ?ACTION_KEYS ++ ?PARAMETER_KEYS, BridgeV1Conf). + +bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> + Parameters = maps:with(?PARAMETER_KEYS, BridgeV1Conf), + Parameters1 = Parameters#{<<"path">> => <<>>}, + CommonKeys = [<<"enable">>, <<"description">>], + ActionConfig = maps:with(?ACTION_KEYS ++ CommonKeys, BridgeV1Conf), + ActionConfig#{<<"parameters">> => Parameters1, <<"connector">> => ConnectorName}. + +%%-------------------------------------------------------------------- +%% helpers + +validate_webhook_url(undefined) -> + throw(#{ + kind => validation_error, + reason => required_field, + required_field => <<"url">> + }); +validate_webhook_url(Url) -> + {BaseUrl, _Path} = emqx_connector_resource:parse_url(Url), + case emqx_http_lib:uri_parse(BaseUrl) of + {ok, _} -> + ok; + {error, Reason} -> + throw(#{ + kind => validation_error, + reason => invalid_url, + url => Url, + error => emqx_utils:readable_error_msg(Reason) + }) + end. diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index a9705585d..a49a1b659 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -568,10 +568,10 @@ preprocess_request(Req) when map_size(Req) == 0 -> preprocess_request( #{ method := Method, - path := Path, - headers := Headers + path := Path } = Req ) -> + Headers = maps:get(headers, Req, []), #{ method => parse_template(to_bin(Method)), path => parse_template(Path), @@ -637,13 +637,14 @@ process_request_and_action(Request, ActionState, Msg) -> BodyTemplate = maps:get(body, ActionState), Body = render_request_body(BodyTemplate, Msg), - PathTemplate1 = maps:get(path, Request), - PathTemplate2 = maps:get(path, ActionState), + PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)), + PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)), - Path = join_paths( - unicode:characters_to_list(render_template(PathTemplate1, Msg)), - unicode:characters_to_list(render_template(PathTemplate2, Msg)) - ), + Path = + case PathSuffix of + "" -> PathPrefix; + _ -> join_paths(PathPrefix, PathSuffix) + end, HeadersTemplate1 = maps:get(headers, Request), HeadersTemplate2 = maps:get(headers, ActionState), diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl index 4875cfdc9..703eb01ed 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl @@ -75,7 +75,14 @@ fields(webhook_action) -> %% for egress bridges with this config, the published messages %% will be forwarded to such bridges. {local_topic, - mk(binary(), #{required => false, desc => ?DESC(emqx_bridge_kafka, mqtt_topic)})}, + mk( + binary(), + #{ + required => false, + desc => ?DESC("config_local_topic"), + importance => ?IMPORTANCE_HIDDEN + } + )}, %% Since e5.3.2, we split the webhook_bridge to two parts: a) connector. b) actions. %% some fields are moved to connector, some fields are moved to actions and composed into the %% `parameters` field. @@ -83,8 +90,6 @@ fields(webhook_action) -> mk(ref(parameters_opts), #{ required => true, desc => ?DESC(parameters_opts) - %% TODO: - %%validator => fun producer_strategy_key_validator/1 })} ] ++ webhook_resource_opts(); fields(parameters_opts) -> @@ -99,7 +104,9 @@ fields(parameters_opts) -> )}, method_field(), headers_field(), - body_field() + body_field(), + max_retries_field(), + request_timeout_field() ]; %% v2: api schema %% The parameter equls to @@ -122,7 +129,8 @@ fields("config_connector") -> desc => <<"Enable or disable this connector">>, default => true } - )} + )}, + {description, emqx_schema:description_schema()} ] ++ connector_url_headers() ++ connector_opts(); %%-------------------------------------------------------------------- %% v1/v2 @@ -139,6 +147,8 @@ desc("resource_opts") -> ?DESC(emqx_resource_schema, "creation_opts"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for WebHook using `", string:to_upper(Method), "` method."]; +desc("config_connector") -> + ?DESC("desc_config"); desc(_) -> undefined. @@ -180,23 +190,8 @@ request_config() -> method_field(), headers_field(), body_field(), - {max_retries, - mk( - non_neg_integer(), - #{ - default => 2, - desc => ?DESC("config_max_retries") - } - )}, - {request_timeout, - mk( - emqx_schema:duration_ms(), - #{ - default => <<"15s">>, - deprecated => {since, "v5.0.26"}, - desc => ?DESC("config_request_timeout") - } - )} + max_retries_field(), + request_timeout_field() ]. %%-------------------------------------------------------------------- @@ -274,6 +269,27 @@ body_field() -> } )}. +max_retries_field() -> + {max_retries, + mk( + non_neg_integer(), + #{ + default => 2, + desc => ?DESC("config_max_retries") + } + )}. + +request_timeout_field() -> + {request_timeout, + mk( + emqx_schema:duration_ms(), + #{ + default => <<"15s">>, + deprecated => {since, "v5.0.26"}, + desc => ?DESC("config_request_timeout") + } + )}. + webhook_resource_opts() -> [ {resource_opts, diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl index d9fc595fe..cc0f2046c 100644 --- a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl +++ b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl @@ -39,18 +39,33 @@ all() -> groups() -> []. -init_per_suite(_Config) -> - emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge, emqx_rule_engine]), - ok = emqx_connector_test_helpers:start_apps([emqx_resource]), - {ok, _} = application:ensure_all_started(emqx_connector), - []. +init_per_suite(Config0) -> + Config = + case os:getenv("DEBUG_CASE") of + [_ | _] = DebugCase -> + CaseName = list_to_atom(DebugCase), + [{debug_case, CaseName} | Config0]; + _ -> + Config0 + end, + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge_http, + emqx_bridge, + emqx_rule_engine + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + emqx_mgmt_api_test_util:init_suite(), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - ok = emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge, emqx_conf]), - ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), - _ = application:stop(emqx_connector), - _ = application:stop(emqx_bridge), +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_cth_suite:stop(Apps), ok. suite() -> @@ -115,7 +130,9 @@ end_per_testcase(TestCase, _Config) when -> ok = emqx_bridge_http_connector_test_server:stop(), persistent_term:erase({?MODULE, times_called}), - emqx_bridge_testlib:delete_all_bridges(), + %emqx_bridge_testlib:delete_all_bridges(), + emqx_bridge_v2_testlib:delete_all_bridges(), + emqx_bridge_v2_testlib:delete_all_connectors(), emqx_common_test_helpers:call_janitor(), ok; end_per_testcase(_TestCase, Config) -> @@ -123,7 +140,8 @@ end_per_testcase(_TestCase, Config) -> undefined -> ok; Server -> stop_http_server(Server) end, - emqx_bridge_testlib:delete_all_bridges(), + emqx_bridge_v2_testlib:delete_all_bridges(), + emqx_bridge_v2_testlib:delete_all_connectors(), emqx_common_test_helpers:call_janitor(), ok. @@ -420,7 +438,7 @@ t_send_async_connection_timeout(Config) -> ), NumberOfMessagesToSend = 10, [ - emqx_bridge:send_message(BridgeID, #{<<"id">> => Id}) + do_send_message(#{<<"id">> => Id}) || Id <- lists:seq(1, NumberOfMessagesToSend) ], %% Make sure server receives all messages @@ -431,7 +449,7 @@ t_send_async_connection_timeout(Config) -> t_async_free_retries(Config) -> #{port := Port} = ?config(http_server, Config), - BridgeID = make_bridge(#{ + _BridgeID = make_bridge(#{ port => Port, pool_size => 1, query_mode => "sync", @@ -445,7 +463,7 @@ t_async_free_retries(Config) -> Fn = fun(Get, Error) -> ?assertMatch( {ok, 200, _, _}, - emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}), + do_send_message(#{<<"hello">> => <<"world">>}), #{error => Error} ), ?assertEqual(ExpectedAttempts, Get(), #{error => Error}) @@ -456,7 +474,7 @@ t_async_free_retries(Config) -> t_async_common_retries(Config) -> #{port := Port} = ?config(http_server, Config), - BridgeID = make_bridge(#{ + _BridgeID = make_bridge(#{ port => Port, pool_size => 1, query_mode => "sync", @@ -471,7 +489,7 @@ t_async_common_retries(Config) -> FnSucceed = fun(Get, Error) -> ?assertMatch( {ok, 200, _, _}, - emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}), + do_send_message(#{<<"hello">> => <<"world">>}), #{error => Error, attempts => Get()} ), ?assertEqual(ExpectedAttempts, Get(), #{error => Error}) @@ -479,7 +497,7 @@ t_async_common_retries(Config) -> FnFail = fun(Get, Error) -> ?assertMatch( Error, - emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}), + do_send_message(#{<<"hello">> => <<"world">>}), #{error => Error, attempts => Get()} ), ?assertEqual(ExpectedAttempts, Get(), #{error => Error}) @@ -711,6 +729,10 @@ t_bridge_probes_header_atoms(Config) -> ok. %% helpers + +do_send_message(Message) -> + emqx_bridge_v2:send_message(?BRIDGE_TYPE, ?BRIDGE_NAME, Message, #{}). + do_t_async_retries(TestCase, TestContext, Error, Fn) -> #{error_attempts := ErrorAttempts} = TestContext, PTKey = {?MODULE, TestCase, attempts}, diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 6568af666..1d6ea072f 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -49,6 +49,8 @@ get_channels/2 ]). +-export([parse_url/1]). + -callback connector_config(ParsedConfig) -> ParsedConfig when diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 51d716182..2b05c2328 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -33,7 +33,6 @@ -export([connector_type_to_bridge_types/1]). - -export([resource_opts_fields/0, resource_opts_fields/1]). -export([examples/1]). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index aadd3d4f5..afa57dfac 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -583,10 +583,18 @@ get_referenced_hookpoints(Froms) -> ]. get_egress_bridges(Actions) -> - [ - emqx_bridge_resource:bridge_id(BridgeType, BridgeName) - || {bridge, BridgeType, BridgeName, _ResId} <- Actions - ]. + lists:foldr( + fun + ({bridge, BridgeType, BridgeName, _ResId}, Acc) -> + [emqx_bridge_resource:bridge_id(BridgeType, BridgeName) | Acc]; + ({bridge_v2, BridgeType, BridgeName}, Acc) -> + [emqx_bridge_resource:bridge_id(BridgeType, BridgeName) | Acc]; + (_, Acc) -> + Acc + end, + [], + Actions + ). %% For allowing an external application to add extra "built-in" functions to the %% rule engine SQL like language. The module set by diff --git a/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl b/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl index 07cb18e60..91c0b3795 100644 --- a/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl @@ -41,44 +41,32 @@ suite() -> apps() -> [ emqx_conf, - emqx_management, + emqx_connector, emqx_retainer, emqx_auth, emqx_auth_redis, emqx_auth_mnesia, emqx_auth_postgresql, emqx_modules, - emqx_telemetry + emqx_telemetry, + emqx_bridge_http, + emqx_bridge, + emqx_rule_engine, + emqx_management ]. init_per_suite(Config) -> - net_kernel:start(['master@127.0.0.1', longnames]), - ok = meck:new(emqx_authz_file, [non_strict, passthrough, no_history, no_link]), - meck:expect( - emqx_authz_file, - acl_conf_file, - fun() -> - emqx_common_test_helpers:deps_path(emqx_auth, "etc/acl.conf") - end - ), - ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF), - emqx_gateway_test_utils:load_all_gateway_apps(), - start_apps(), - Config. + WorkDir = ?config(priv_dir, Config), + Apps = emqx_cth_suite:start(apps(), #{work_dir => WorkDir}), + emqx_mgmt_api_test_util:init_suite(), + [{apps, Apps}, {work_dir, WorkDir} | Config]. -end_per_suite(_Config) -> - {ok, _} = emqx:update_config( - [authorization], - #{ - <<"no_match">> => <<"allow">>, - <<"cache">> => #{<<"enable">> => <<"true">>}, - <<"sources">> => [] - } - ), +end_per_suite(Config) -> mnesia:clear_table(cluster_rpc_commit), mnesia:clear_table(cluster_rpc_mfa), - stop_apps(), - meck:unload(emqx_authz_file), + Apps = ?config(apps, Config), + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_cth_suite:stop(Apps), ok. init_per_testcase(t_get_telemetry_without_memsup, Config) -> @@ -123,7 +111,6 @@ init_per_testcase(t_advanced_mqtt_features, Config) -> mock_advanced_mqtt_features(), Config; init_per_testcase(t_authn_authz_info, Config) -> - mock_httpc(), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), create_authn('mqtt:global', built_in_database), create_authn('tcp:default', redis), @@ -141,14 +128,11 @@ init_per_testcase(t_send_after_enable, Config) -> mock_httpc(), Config; init_per_testcase(t_rule_engine_and_data_bridge_info, Config) -> - mock_httpc(), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), - emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_bridge]), ok = emqx_bridge_SUITE:setup_fake_telemetry_data(), ok = setup_fake_rule_engine_data(), Config; init_per_testcase(t_exhook_info, Config) -> - mock_httpc(), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), ExhookConf = #{ @@ -173,31 +157,8 @@ init_per_testcase(t_cluster_uuid, Config) -> Node = start_slave(n1), [{n1, Node} | Config]; init_per_testcase(t_uuid_restored_from_file, Config) -> - mock_httpc(), - NodeUUID = <<"AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE">>, - ClusterUUID = <<"FFFFFFFF-GGGG-HHHH-IIII-JJJJJJJJJJJJ">>, - DataDir = emqx:data_dir(), - NodeUUIDFile = filename:join(DataDir, "node.uuid"), - ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"), - file:delete(NodeUUIDFile), - file:delete(ClusterUUIDFile), - ok = file:write_file(NodeUUIDFile, NodeUUID), - ok = file:write_file(ClusterUUIDFile, ClusterUUID), - - %% clear the UUIDs in the DB - {atomic, ok} = mria:clear_table(emqx_telemetry), - stop_apps(), - ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF), - start_apps(), - Node = start_slave(n1), - [ - {n1, Node}, - {node_uuid, NodeUUID}, - {cluster_uuid, ClusterUUID} - | Config - ]; + Config; init_per_testcase(t_uuid_saved_to_file, Config) -> - mock_httpc(), DataDir = emqx:data_dir(), NodeUUIDFile = filename:join(DataDir, "node.uuid"), ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"), @@ -205,7 +166,6 @@ init_per_testcase(t_uuid_saved_to_file, Config) -> file:delete(ClusterUUIDFile), Config; init_per_testcase(t_num_clients, Config) -> - mock_httpc(), ok = snabbkaffe:start_trace(), Config; init_per_testcase(_Testcase, Config) -> @@ -227,7 +187,6 @@ end_per_testcase(t_advanced_mqtt_features, _Config) -> {atomic, ok} = mria:clear_table(emqx_delayed), ok; end_per_testcase(t_authn_authz_info, _Config) -> - meck:unload([httpc]), emqx_authz:update({delete, postgresql}, #{}), lists:foreach( fun(ChainName) -> @@ -244,19 +203,8 @@ end_per_testcase(t_enable, _Config) -> end_per_testcase(t_send_after_enable, _Config) -> meck:unload([httpc, emqx_telemetry_config]); end_per_testcase(t_rule_engine_and_data_bridge_info, _Config) -> - meck:unload(httpc), - lists:foreach( - fun(App) -> - ok = application:stop(App) - end, - [ - emqx_bridge, - emqx_rule_engine - ] - ), ok; end_per_testcase(t_exhook_info, _Config) -> - meck:unload(httpc), emqx_exhook_demo_svr:stop(), application:stop(emqx_exhook), ok; @@ -264,21 +212,12 @@ end_per_testcase(t_cluster_uuid, Config) -> Node = proplists:get_value(n1, Config), ok = stop_slave(Node); end_per_testcase(t_num_clients, Config) -> - meck:unload([httpc]), ok = snabbkaffe:stop(), Config; -end_per_testcase(t_uuid_restored_from_file, Config) -> - Node = ?config(n1, Config), - DataDir = emqx:data_dir(), - NodeUUIDFile = filename:join(DataDir, "node.uuid"), - ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"), - ok = file:delete(NodeUUIDFile), - ok = file:delete(ClusterUUIDFile), - meck:unload([httpc]), - ok = stop_slave(Node), - ok; end_per_testcase(_Testcase, _Config) -> - meck:unload([httpc]), + case catch meck:unload([httpc]) of + _ -> ok + end, ok. %%------------------------------------------------------------------------------ @@ -315,19 +254,34 @@ t_cluster_uuid(Config) -> %% should attempt read UUID from file in data dir to keep UUIDs %% unique, in the event of a database purge. t_uuid_restored_from_file(Config) -> - ExpectedNodeUUID = ?config(node_uuid, Config), - ExpectedClusterUUID = ?config(cluster_uuid, Config), + %% Stop the emqx_telemetry application first + {atomic, ok} = mria:clear_table(emqx_telemetry), + application:stop(emqx_telemetry), + + %% Rewrite the the uuid files + NodeUUID = <<"AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE">>, + ClusterUUID = <<"FFFFFFFF-GGGG-HHHH-IIII-JJJJJJJJJJJJ">>, + DataDir = ?config(work_dir, Config), + NodeUUIDFile = filename:join(DataDir, "node.uuid"), + ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"), + ok = file:write_file(NodeUUIDFile, NodeUUID), + ok = file:write_file(ClusterUUIDFile, ClusterUUID), + + %% Start the emqx_telemetry application again + application:start(emqx_telemetry), + + %% Check the UUIDs ?assertEqual( - {ok, ExpectedNodeUUID}, + {ok, NodeUUID}, emqx_telemetry:get_node_uuid() ), ?assertEqual( - {ok, ExpectedClusterUUID}, + {ok, ClusterUUID}, emqx_telemetry:get_cluster_uuid() ), ok. -t_uuid_saved_to_file(_Config) -> +t_uuid_saved_to_file(Config) -> DataDir = emqx:data_dir(), NodeUUIDFile = filename:join(DataDir, "node.uuid"), ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"), @@ -337,9 +291,10 @@ t_uuid_saved_to_file(_Config) -> %% clear the UUIDs in the DB {atomic, ok} = mria:clear_table(emqx_telemetry), - stop_apps(), - ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF), - start_apps(), + application:stop(emqx_telemetry), + + application:start(emqx_telemetry), + {ok, NodeUUID} = emqx_telemetry:get_node_uuid(), {ok, ClusterUUID} = emqx_telemetry:get_cluster_uuid(), ?assertEqual( @@ -578,6 +533,7 @@ t_mqtt_runtime_insights(_) -> t_rule_engine_and_data_bridge_info(_Config) -> {ok, TelemetryData} = emqx_telemetry:get_telemetry(), + ct:pal("telemetry data: ~p~n", [TelemetryData]), RuleInfo = get_value(rule_engine, TelemetryData), BridgeInfo = get_value(bridge, TelemetryData), ?assertEqual( @@ -811,14 +767,6 @@ setup_fake_rule_engine_data() -> ), ok. -set_special_configs(emqx_auth) -> - {ok, _} = emqx:update_config([authorization, cache, enable], false), - {ok, _} = emqx:update_config([authorization, no_match], deny), - {ok, _} = emqx:update_config([authorization, sources], []), - ok; -set_special_configs(_App) -> - ok. - %% for some unknown reason, gen_rpc running locally or in CI might %% start with different `port_discovery' modes, which means that'll %% either be listening at the port in the config (`tcp_server_port', @@ -887,9 +835,3 @@ leave_cluster() -> is_official_version(V) -> emqx_telemetry_config:is_official_version(V). - -start_apps() -> - emqx_common_test_helpers:start_apps(apps(), fun set_special_configs/1). - -stop_apps() -> - emqx_common_test_helpers:stop_apps(lists:reverse(apps())). From cdb90ebe6b780fcf406751b571ba679baac1262d Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 24 Nov 2023 14:47:24 +0800 Subject: [PATCH 5/7] feat: rename webhook bridge to http bridge --- apps/emqx_bridge/src/emqx_bridge_api.erl | 12 ++--- apps/emqx_bridge/src/emqx_bridge_resource.erl | 23 ++++---- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 2 +- .../schema/emqx_bridge_compatible_config.erl | 6 +-- .../src/schema/emqx_bridge_schema.erl | 9 ++-- apps/emqx_bridge/test/emqx_bridge_SUITE.erl | 6 ++- .../test/emqx_bridge_api_SUITE.erl | 6 ++- .../emqx_bridge_compatible_config_tests.erl | 2 +- .../src/emqx_bridge_http_action_info.erl | 4 +- .../src/emqx_bridge_http_connector.erl | 4 +- .../src/emqx_bridge_http_schema.erl | 52 ++++++++++++------- .../test/emqx_bridge_http_SUITE.erl | 7 +-- .../emqx_connector/src/emqx_connector_api.erl | 2 +- .../src/emqx_connector_resource.erl | 4 +- .../src/schema/emqx_connector_schema.erl | 7 +-- 15 files changed, 86 insertions(+), 60 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index fe7b576f5..c9c761105 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -143,7 +143,7 @@ param_path_id() -> #{ in => path, required => true, - example => <<"webhook:webhook_example">>, + example => <<"http:http_example">>, desc => ?DESC("desc_param_path_id") } )}. @@ -166,9 +166,9 @@ bridge_info_array_example(Method) -> bridge_info_examples(Method) -> maps:merge( #{ - <<"webhook_example">> => #{ - summary => <<"WebHook">>, - value => info_example(webhook, Method) + <<"http_example">> => #{ + summary => <<"HTTP">>, + value => info_example(http, Method) }, <<"mqtt_example">> => #{ summary => <<"MQTT Bridge">>, @@ -201,7 +201,7 @@ method_example(Type, Method) when Method == get; Method == post -> method_example(_Type, put) -> #{}. -info_example_basic(webhook) -> +info_example_basic(http) -> #{ enable => true, url => <<"http://localhost:9901/messages/${topic}">>, @@ -212,7 +212,7 @@ info_example_basic(webhook) -> pool_size => 4, enable_pipelining => 100, ssl => #{enable => false}, - local_topic => <<"emqx_webhook/#">>, + local_topic => <<"emqx_http/#">>, method => post, body => <<"${payload}">>, resource_opts => #{ diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index c1de3b177..b3dec7905 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -63,18 +63,23 @@ ). -if(?EMQX_RELEASE_EDITION == ee). -bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; -bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; -bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector; -bridge_to_resource_type(webhook) -> emqx_bridge_http_connector; -bridge_to_resource_type(BridgeType) -> emqx_bridge_enterprise:resource_type(BridgeType). +bridge_to_resource_type(BridgeType) when is_binary(BridgeType) -> + bridge_to_resource_type(binary_to_existing_atom(BridgeType, utf8)); +bridge_to_resource_type(mqtt) -> + emqx_bridge_mqtt_connector; +bridge_to_resource_type(webhook) -> + emqx_bridge_http_connector; +bridge_to_resource_type(BridgeType) -> + emqx_bridge_enterprise:resource_type(BridgeType). bridge_impl_module(BridgeType) -> emqx_bridge_enterprise:bridge_impl_module(BridgeType). -else. -bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; -bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; -bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector; -bridge_to_resource_type(webhook) -> emqx_bridge_http_connector. +bridge_to_resource_type(BridgeType) when is_binary(Type) -> + bridge_to_resource_type(binary_to_existing_atom(Type, utf8)); +bridge_to_resource_type(mqtt) -> + emqx_bridge_mqtt_connector; +bridge_to_resource_type(webhook) -> + emqx_bridge_http_connector. bridge_impl_module(_BridgeType) -> undefined. -endif. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index cb1f7cc62..f2a51c6cb 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -110,7 +110,7 @@ param_path_id() -> #{ in => path, required => true, - example => <<"webhook:webhook_example">>, + example => <<"http:my_http_action">>, desc => ?DESC("desc_param_path_id") } )}. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl index 6adbf3942..b68a4c387 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl @@ -21,7 +21,7 @@ -export([ upgrade_pre_ee/2, maybe_upgrade/1, - webhook_maybe_upgrade/1 + http_maybe_upgrade/1 ]). upgrade_pre_ee(undefined, _UpgradeFunc) -> @@ -40,10 +40,10 @@ maybe_upgrade(#{<<"connector">> := _} = Config0) -> maybe_upgrade(NewVersion) -> NewVersion. -webhook_maybe_upgrade(#{<<"direction">> := _} = Config0) -> +http_maybe_upgrade(#{<<"direction">> := _} = Config0) -> Config1 = maps:remove(<<"direction">>, Config0), Config1#{<<"resource_opts">> => default_resource_opts()}; -webhook_maybe_upgrade(NewVersion) -> +http_maybe_upgrade(NewVersion) -> NewVersion. binary_key({K, V}) -> diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index ff924ac8c..27b3a8f14 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -162,13 +162,14 @@ roots() -> [{bridges, ?HOCON(?R_REF(bridges), #{importance => ?IMPORTANCE_LOW})} fields(bridges) -> [ - {webhook, + {http, mk( hoconsc:map(name, ref(emqx_bridge_http_schema, "config")), #{ + aliases => [webhook], desc => ?DESC("bridges_webhook"), required => false, - converter => fun webhook_bridge_converter/2 + converter => fun http_bridge_converter/2 } )}, {mqtt, @@ -243,7 +244,7 @@ status() -> node_name() -> {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. -webhook_bridge_converter(Conf0, _HoconOpts) -> +http_bridge_converter(Conf0, _HoconOpts) -> emqx_bridge_compatible_config:upgrade_pre_ee( - Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 + Conf0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1 ). diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl index eef4c0efb..30107d0ce 100644 --- a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl @@ -62,6 +62,7 @@ end_per_testcase(t_get_basic_usage_info_1, _Config) -> ok = emqx_bridge:remove(BridgeType, BridgeName) end, [ + %% Keep using the old bridge names to avoid breaking the tests {webhook, <<"basic_usage_info_webhook">>}, {webhook, <<"basic_usage_info_webhook_disabled">>}, {mqtt, <<"basic_usage_info_mqtt">>} @@ -92,7 +93,7 @@ t_get_basic_usage_info_1(_Config) -> #{ num_bridges => 3, count_by_type => #{ - webhook => 1, + http => 1, mqtt => 2 } }, @@ -123,12 +124,13 @@ setup_fake_telemetry_data() -> HTTPConfig = #{ url => <<"http://localhost:9901/messages/${topic}">>, enable => true, - local_topic => "emqx_webhook/#", + local_topic => "emqx_http/#", method => post, body => <<"${payload}">>, headers => #{}, request_timeout => "15s" }, + %% Keep use the old bridge names to test the backward compatibility {ok, _} = emqx_bridge_testlib:create_bridge_api( <<"webhook">>, <<"basic_usage_info_webhook">>, diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 339315941..92f71b2e6 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -1389,7 +1389,7 @@ validate_resource_request_ttl(single, Timeout, Name) -> begin {ok, Res} = ?wait_async_action( - emqx_bridge_v2:send_message(<<"webhook">>, Name, SentData, #{}), + do_send_message(?BRIDGE_TYPE_HTTP, Name, SentData), #{?snk_kind := async_query}, 1000 ), @@ -1404,6 +1404,10 @@ validate_resource_request_ttl(single, Timeout, Name) -> validate_resource_request_ttl(_Cluster, _Timeout, _Name) -> ignore. +do_send_message(BridgeV1Type, Name, Message) -> + Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), + emqx_bridge_v2:send_message(Type, Name, Message, #{}). + %% request(Method, URL, Config) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl index 540c18878..9530702bd 100644 --- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl @@ -84,7 +84,7 @@ up(#{<<"mqtt">> := MqttBridges0} = Bridges) -> Bridges#{<<"mqtt">> := MqttBridges}; up(#{<<"webhook">> := WebhookBridges0} = Bridges) -> WebhookBridges = emqx_bridge_compatible_config:upgrade_pre_ee( - WebhookBridges0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 + WebhookBridges0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1 ), Bridges#{<<"webhook">> := WebhookBridges}. diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl index 19514927e..3b0543ace 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl @@ -34,9 +34,9 @@ bridge_v1_type_name() -> webhook. -action_type_name() -> webhook. +action_type_name() -> http. -connector_type_name() -> webhook. +connector_type_name() -> http. schema_module() -> emqx_bridge_http_schema. diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index a49a1b659..5ecfa76d1 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -46,7 +46,7 @@ namespace/0 ]). -%% for other webhook-like connectors. +%% for other http-like connectors. -export([redact_request/1]). -export([validate_method/1, join_paths/2]). @@ -836,7 +836,7 @@ maybe_retry({error, Reason}, Context, ReplyFunAndArgs) -> true -> Context; false -> Context#{attempt := Attempt + 1} end, - ?tp(webhook_will_retry_async, #{}), + ?tp(http_will_retry_async, #{}), Worker = resolve_pool_worker(State, KeyOrNum), ok = ehttpc:request_async( Worker, diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl index 703eb01ed..a9bd3e827 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl @@ -31,7 +31,7 @@ %%====================================================================================== %% Hocon Schema Definitions -namespace() -> "bridge_webhook". +namespace() -> "bridge_http". roots() -> []. @@ -40,7 +40,7 @@ roots() -> []. %% see: emqx_bridge_schema:get_response/0, put_request/0, post_request/0 fields("post") -> [ - type_field(), + old_type_field(), name_field() ] ++ fields("config"); fields("put") -> @@ -55,15 +55,16 @@ fields("config") -> %% v2: configuration fields(action) -> %% XXX: Do we need to rename it to `http`? - {webhook, + {http, mk( - hoconsc:map(name, ref(?MODULE, webhook_action)), + hoconsc:map(name, ref(?MODULE, http_action)), #{ + aliases => [webhook], desc => <<"HTTP Action Config">>, required => false } )}; -fields(webhook_action) -> +fields(http_action) -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {connector, @@ -83,7 +84,7 @@ fields(webhook_action) -> importance => ?IMPORTANCE_HIDDEN } )}, - %% Since e5.3.2, we split the webhook_bridge to two parts: a) connector. b) actions. + %% Since e5.3.2, we split the http bridge to two parts: a) connector. b) actions. %% some fields are moved to connector, some fields are moved to actions and composed into the %% `parameters` field. {parameters, @@ -91,7 +92,7 @@ fields(webhook_action) -> required => true, desc => ?DESC(parameters_opts) })} - ] ++ webhook_resource_opts(); + ] ++ http_resource_opts(); fields(parameters_opts) -> [ {path, @@ -119,7 +120,7 @@ fields("put_" ++ Type) -> fields("get_" ++ Type) -> emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type); fields("config_bridge_v2") -> - fields(webhook_action); + fields(http_action); fields("config_connector") -> [ {enable, @@ -165,7 +166,7 @@ basic_config() -> default => true } )} - ] ++ webhook_resource_opts() ++ connector_opts(). + ] ++ http_resource_opts() ++ connector_opts(). request_config() -> [ @@ -203,10 +204,21 @@ connector_url_headers() -> %%-------------------------------------------------------------------- %% common funcs +%% `webhook` is kept for backward compatibility. +old_type_field() -> + {type, + mk( + enum([webhook, http]), + #{ + required => true, + desc => ?DESC("desc_type") + } + )}. + type_field() -> {type, mk( - webhook, + http, #{ required => true, desc => ?DESC("desc_type") @@ -290,7 +302,7 @@ request_timeout_field() -> } )}. -webhook_resource_opts() -> +http_resource_opts() -> [ {resource_opts, mk( @@ -333,8 +345,8 @@ mark_request_field_deperecated(Fields) -> bridge_v2_examples(Method) -> [ #{ - <<"webhook">> => #{ - summary => <<"Webhook Action">>, + <<"http">> => #{ + summary => <<"HTTP Action">>, value => values({Method, bridge_v2}) } } @@ -343,8 +355,8 @@ bridge_v2_examples(Method) -> connector_examples(Method) -> [ #{ - <<"webhook">> => #{ - summary => <<"Webhook Connector">>, + <<"http">> => #{ + summary => <<"HTTP Connector">>, value => values({Method, connector}) } } @@ -366,16 +378,16 @@ values({get, Type}) -> values({post, bridge_v2}) -> maps:merge( #{ - name => <<"my_webhook_action">>, - type => <<"webhook">> + name => <<"my_http_action">>, + type => <<"http">> }, values({put, bridge_v2}) ); values({post, connector}) -> maps:merge( #{ - name => <<"my_webhook_connector">>, - type => <<"webhook">> + name => <<"my_http_connector">>, + type => <<"http">> }, values({put, connector}) ); @@ -386,7 +398,7 @@ values({put, connector}) -> values(bridge_v2) -> #{ enable => true, - connector => <<"my_webhook_connector">>, + connector => <<"my_http_connector">>, parameters => #{ path => <<"/room/${room_no}">>, method => <<"post">>, diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl index cc0f2046c..2ff7d184b 100644 --- a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl +++ b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl @@ -577,7 +577,7 @@ t_path_not_found(Config) -> ok end, fun(Trace) -> - ?assertEqual([], ?of_kind(webhook_will_retry_async, Trace)), + ?assertEqual([], ?of_kind(http_will_retry_async, Trace)), ok end ), @@ -618,7 +618,7 @@ t_too_many_requests(Config) -> ok end, fun(Trace) -> - ?assertMatch([_ | _], ?of_kind(webhook_will_retry_async, Trace)), + ?assertMatch([_ | _], ?of_kind(http_will_retry_async, Trace)), ok end ), @@ -731,7 +731,8 @@ t_bridge_probes_header_atoms(Config) -> %% helpers do_send_message(Message) -> - emqx_bridge_v2:send_message(?BRIDGE_TYPE, ?BRIDGE_NAME, Message, #{}). + Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(?BRIDGE_TYPE), + emqx_bridge_v2:send_message(Type, ?BRIDGE_NAME, Message, #{}). do_t_async_retries(TestCase, TestContext, Error, Fn) -> #{error_attempts := ErrorAttempts} = TestContext, diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 58db17a03..a5b7692d7 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -137,7 +137,7 @@ param_path_id() -> #{ in => path, required => true, - example => <<"webhook:webhook_example">>, + example => <<"http:my_http_connector">>, desc => ?DESC("desc_param_path_id") } )}. diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 1d6ea072f..ff2790481 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -79,7 +79,7 @@ connector_impl_module(_ConnectorType) -> -endif. -connector_to_resource_type_ce(webhook) -> +connector_to_resource_type_ce(http) -> emqx_bridge_http_connector; connector_to_resource_type_ce(ConnectorType) -> error({no_bridge_v2, ConnectorType}). @@ -275,7 +275,7 @@ remove(Type, Name, _Conf, _Opts) -> %% convert connector configs to what the connector modules want parse_confs( - <<"webhook">>, + <<"http">>, _Name, #{ url := Url, diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 2b05c2328..890f84871 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -70,7 +70,7 @@ api_schemas(Method) -> [ %% We need to map the `type' field of a request (binary) to a %% connector schema module. - api_ref(emqx_bridge_http_schema, <<"webhook">>, Method ++ "_connector") + api_ref(emqx_bridge_http_schema, <<"http">>, Method ++ "_connector") ]. api_ref(Module, Type, Method) -> @@ -96,7 +96,7 @@ schema_modules() -> [emqx_bridge_http_schema]. -endif. -connector_type_to_bridge_types(webhook) -> [webhook]; +connector_type_to_bridge_types(http) -> [http, webhook]; connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]. @@ -379,10 +379,11 @@ roots() -> fields(connectors) -> [ - {webhook, + {http, mk( hoconsc:map(name, ref(emqx_bridge_http_schema, "config_connector")), #{ + alias => [webhook], desc => <<"HTTP Connector Config">>, required => false } From c8b5c51bbc019ee5fcdb4e85b62b063b9066aa58 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 24 Nov 2023 14:57:10 +0800 Subject: [PATCH 6/7] chore: fix failed test cases --- apps/emqx/test/emqx_cth_cluster.erl | 2 +- apps/emqx/test/emqx_cth_suite.erl | 2 -- apps/emqx_bridge/src/emqx_action_info.erl | 2 +- apps/emqx_bridge/src/emqx_bridge.erl | 2 +- apps/emqx_bridge/src/emqx_bridge_resource.erl | 4 ++-- .../emqx_bridge_compatible_config_tests.erl | 8 ++++---- .../src/emqx_bridge_http_schema.erl | 19 +++++++++++-------- .../test/emqx_mgmt_data_backup_SUITE.erl | 1 + .../test/emqx_resource_schema_tests.erl | 6 +++--- .../test/emqx_rule_engine_SUITE.erl | 2 +- .../test/emqx_telemetry_SUITE.erl | 2 +- rel/i18n/emqx_bridge_http_schema.hocon | 6 ++++++ 12 files changed, 32 insertions(+), 24 deletions(-) diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index b41586518..a47e96251 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -50,7 +50,7 @@ -define(APPS_CLUSTERING, [gen_rpc, mria, ekka]). -define(TIMEOUT_NODE_START_MS, 15000). --define(TIMEOUT_APPS_START_MS, 30000). +-define(TIMEOUT_APPS_START_MS, 60000). -define(TIMEOUT_NODE_STOP_S, 15). %% diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 4cba524ae..401d4f59d 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -453,8 +453,6 @@ stop_apps(Apps) -> %% -verify_clean_suite_state(#{allow_dirty_work_dir := true}) -> - ok; verify_clean_suite_state(#{work_dir := WorkDir}) -> {ok, []} = file:list_dir(WorkDir), false = emqx_schema_hooks:any_injections(), diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 7c246a797..3b5589921 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -77,7 +77,7 @@ hard_coded_action_info_modules_ee() -> -endif. hard_coded_action_info_modules_common() -> - []. + [emqx_bridge_http_action_info]. hard_coded_action_info_modules() -> hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee(). diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 569c1e75a..4156a37d1 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -357,7 +357,7 @@ get_metrics(Type, Name) -> maybe_upgrade(mqtt, Config) -> emqx_bridge_compatible_config:maybe_upgrade(Config); maybe_upgrade(webhook, Config) -> - emqx_bridge_compatible_config:webhook_maybe_upgrade(Config); + emqx_bridge_compatible_config:http_maybe_upgrade(Config); maybe_upgrade(_Other, Config) -> Config. diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index b3dec7905..7f58a880c 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -74,8 +74,8 @@ bridge_to_resource_type(BridgeType) -> bridge_impl_module(BridgeType) -> emqx_bridge_enterprise:bridge_impl_module(BridgeType). -else. -bridge_to_resource_type(BridgeType) when is_binary(Type) -> - bridge_to_resource_type(binary_to_existing_atom(Type, utf8)); +bridge_to_resource_type(BridgeType) when is_binary(BridgeType) -> + bridge_to_resource_type(binary_to_existing_atom(BridgeType, utf8)); bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(webhook) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl index 9530702bd..91c0a23d0 100644 --- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl @@ -21,7 +21,7 @@ empty_config_test() -> Conf1 = #{<<"bridges">> => #{}}, Conf2 = #{<<"bridges">> => #{<<"webhook">> => #{}}}, ?assertEqual(Conf1, check(Conf1)), - ?assertEqual(Conf2, check(Conf2)), + ?assertEqual(#{<<"bridges">> => #{<<"http">> => #{}}}, check(Conf2)), ok. %% ensure webhook config can be checked @@ -33,7 +33,7 @@ webhook_config_test() -> ?assertMatch( #{ <<"bridges">> := #{ - <<"webhook">> := #{ + <<"http">> := #{ <<"the_name">> := #{ <<"method">> := get, @@ -48,7 +48,7 @@ webhook_config_test() -> ?assertMatch( #{ <<"bridges">> := #{ - <<"webhook">> := #{ + <<"http">> := #{ <<"the_name">> := #{ <<"method">> := get, @@ -61,7 +61,7 @@ webhook_config_test() -> ), #{ <<"bridges">> := #{ - <<"webhook">> := #{ + <<"http">> := #{ <<"the_name">> := #{ <<"method">> := get, diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl index a9bd3e827..958fef4ac 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl @@ -54,19 +54,18 @@ fields("config") -> %%-------------------------------------------------------------------- %% v2: configuration fields(action) -> - %% XXX: Do we need to rename it to `http`? {http, mk( - hoconsc:map(name, ref(?MODULE, http_action)), + hoconsc:map(name, ref(?MODULE, "http_action")), #{ aliases => [webhook], desc => <<"HTTP Action Config">>, required => false } )}; -fields(http_action) -> +fields("http_action") -> [ - {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {enable, mk(boolean(), #{desc => ?DESC("config_enable_bridge"), default => true})}, {connector, mk(binary(), #{ desc => ?DESC(emqx_connector_schema, "connector_field"), required => true @@ -88,12 +87,12 @@ fields(http_action) -> %% some fields are moved to connector, some fields are moved to actions and composed into the %% `parameters` field. {parameters, - mk(ref(parameters_opts), #{ + mk(ref("parameters_opts"), #{ required => true, - desc => ?DESC(parameters_opts) + desc => ?DESC("config_parameters_opts") })} ] ++ http_resource_opts(); -fields(parameters_opts) -> +fields("parameters_opts") -> [ {path, mk( @@ -120,7 +119,7 @@ fields("put_" ++ Type) -> fields("get_" ++ Type) -> emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type); fields("config_bridge_v2") -> - fields(http_action); + fields("http_action"); fields("config_connector") -> [ {enable, @@ -150,6 +149,10 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for WebHook using `", string:to_upper(Method), "` method."]; desc("config_connector") -> ?DESC("desc_config"); +desc("http_action") -> + ?DESC("desc_config"); +desc("parameters_opts") -> + ?DESC("config_parameters_opts"); desc(_) -> undefined. diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl index 46566bd6f..7809f8b3d 100644 --- a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl @@ -452,6 +452,7 @@ apps_to_start() -> emqx_modules, emqx_gateway, emqx_exhook, + emqx_bridge_http, emqx_bridge, emqx_auto_subscribe, diff --git a/apps/emqx_resource/test/emqx_resource_schema_tests.erl b/apps/emqx_resource/test/emqx_resource_schema_tests.erl index 78a761bd2..aac0a7d96 100644 --- a/apps/emqx_resource/test/emqx_resource_schema_tests.erl +++ b/apps/emqx_resource/test/emqx_resource_schema_tests.erl @@ -80,7 +80,7 @@ worker_pool_size_test_() -> Conf = emqx_utils_maps:deep_put( [ <<"bridges">>, - <<"webhook">>, + <<"http">>, <<"simple">>, <<"resource_opts">>, <<"worker_pool_size">> @@ -88,7 +88,7 @@ worker_pool_size_test_() -> BaseConf, WorkerPoolSize ), - #{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := CheckedConf}}} = check(Conf), + #{<<"bridges">> := #{<<"http">> := #{<<"simple">> := CheckedConf}}} = check(Conf), #{<<"resource_opts">> := #{<<"worker_pool_size">> := WPS}} = CheckedConf, WPS end, @@ -117,7 +117,7 @@ worker_pool_size_test_() -> %%=========================================================================== parse_and_check_webhook_bridge(Hocon) -> - #{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)), + #{<<"bridges">> := #{<<"http">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)), Conf. parse(Hocon) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 14682eff1..f3df46b80 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -3468,7 +3468,7 @@ t_get_basic_usage_info_1(_Config) -> referenced_bridges => #{ mqtt => 1, - webhook => 3 + http => 3 } }, emqx_rule_engine:get_basic_usage_info() diff --git a/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl b/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl index 91c0b3795..73b3e331f 100644 --- a/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl @@ -544,7 +544,7 @@ t_rule_engine_and_data_bridge_info(_Config) -> #{ data_bridge => #{ - webhook => #{num => 1, num_linked_by_rules => 3}, + http => #{num => 1, num_linked_by_rules => 3}, mqtt => #{num => 2, num_linked_by_rules => 2} }, num_data_bridges => 3 diff --git a/rel/i18n/emqx_bridge_http_schema.hocon b/rel/i18n/emqx_bridge_http_schema.hocon index 197ce0b36..416f77834 100644 --- a/rel/i18n/emqx_bridge_http_schema.hocon +++ b/rel/i18n/emqx_bridge_http_schema.hocon @@ -80,6 +80,12 @@ Template with variables is allowed in this option. For example, /room/{$ro config_path.label: """URL Path""" +config_parameters_opts.desc: +"""The parameters for HTTP action.""" + +config_parameters_opts.label: +"""Parameters""" + desc_config.desc: """Configuration for an HTTP bridge.""" From 891ecc179d755adfa5500d558657ad818fcd1f3a Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 28 Nov 2023 10:10:16 +0800 Subject: [PATCH 7/7] chore: fix flaky tests --- .../test/emqx_bridge_api_SUITE.erl | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 92f71b2e6..7b5208f06 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -111,7 +111,7 @@ groups() -> ]. suite() -> - [{timetrap, {seconds, 60}}]. + [{timetrap, {seconds, 120}}]. init_per_suite(Config) -> Config. @@ -1329,15 +1329,24 @@ t_cluster_later_join_metrics(Config) -> ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]), %% Check metrics; shouldn't crash even if the bridge is not %% ready on the node that just joined the cluster. + + %% assert: wait for the bridge to be ready on the other node. + fun + WaitConfSync(0) -> + throw(waiting_config_sync_timeout); + WaitConfSync(N) -> + timer:sleep(1000), + case erpc:call(OtherNode, emqx_bridge, list, []) of + [] -> WaitConfSync(N - 1); + [_] -> ok + end + end( + 60 + ), ?assertMatch( {ok, 200, #{ <<"metrics">> := #{<<"success">> := _}, - %% TODO: Why the node2 returns {error, bridge_not_found}? - %% ct:pal("node: ~p, bridges: ~p~n", [ - %% OtherNode, erpc:call(OtherNode, emqx_bridge, list, []) - %% ]), - %%<<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _] - <<"node_metrics">> := [#{<<"metrics">> := #{}} | _] + <<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _] }}, request_json(get, uri(["bridges", BridgeID, "metrics"]), Config) ),