chore(http): break the bridge confs to connector and action parts

This commit is contained in:
JianBo He 2023-11-15 18:24:28 +08:00
parent 95c96e2a3a
commit f9a1e747fd
7 changed files with 369 additions and 101 deletions

View File

@ -3,7 +3,7 @@
{vsn, "0.1.5"},
{registered, []},
{applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]},
{env, []},
{env, [{emqx_action_info_module, emqx_bridge_http_action_info}]},
{modules, []},
{links, []}
]}.

View File

@ -0,0 +1,34 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_http_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
bridge_v1_type_name() -> webhook.
action_type_name() -> webhook.
connector_type_name() -> webhook.
schema_module() -> emqx_bridge_http_schema.

View File

@ -18,18 +18,26 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]).
-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
-export([roots/0, fields/1, namespace/0, desc/1]).
-export([
bridge_v2_examples/1,
%%conn_bridge_examples/1,
connector_examples/1
]).
%%======================================================================================
%% Hocon Schema Definitions
namespace() -> "bridge_webhook".
roots() -> [].
fields("config") ->
basic_config() ++ request_config();
%%--------------------------------------------------------------------
%% v1 bridges http api
%% see: emqx_bridge_schema:get_response/0, put_request/0, post_request/0
fields("post") ->
[
type_field(),
@ -39,48 +47,119 @@ fields("put") ->
fields("config");
fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("post");
fields("creation_opts") ->
%%--- v1 bridges config file
%% see: emqx_bridge_schema:fields(bridges)
fields("config") ->
basic_config() ++ request_config();
%%--------------------------------------------------------------------
%% v2: configuration
fields(action) ->
%% XXX: Do we need to rename it to `http`?
{webhook,
mk(
hoconsc:map(name, ref(?MODULE, webhook_action)),
#{
desc => <<"HTTP Action Config">>,
required => false
}
)};
fields(webhook_action) ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{connector,
mk(binary(), #{
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
})},
{description, emqx_schema:description_schema()},
%% Note: there's an implicit convention in `emqx_bridge' that,
%% for egress bridges with this config, the published messages
%% will be forwarded to such bridges.
{local_topic,
mk(binary(), #{required => false, desc => ?DESC(emqx_bridge_kafka, mqtt_topic)})},
%% Since e5.3.2, we split the webhook_bridge to two parts: a) connector. b) actions.
%% some fields are moved to connector, some fields are moved to actions and composed into the
%% `parameters` field.
{parameters,
mk(ref(parameters_opts), #{
required => true,
desc => ?DESC(parameters_opts)
%% TODO:
%%validator => fun producer_strategy_key_validator/1
})}
] ++ webhook_resource_opts();
fields(parameters_opts) ->
[
{path,
mk(
binary(),
#{
desc => ?DESC("config_path"),
required => false
}
)},
method_field(),
headers_field(),
body_field()
];
%% v2: api schema
%% The parameter equls to
%% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1
%% `get_connector`, `post_connector`, `put_connector` from emqx_connector_schema:api_schema/1
fields("post_" ++ Type) ->
[type_field(), name_field() | fields("config_" ++ Type)];
fields("put_" ++ Type) ->
fields("config_" ++ Type);
fields("get_" ++ Type) ->
emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type);
fields("config_bridge_v2") ->
fields(webhook_action);
fields("config_connector") ->
[
{enable,
mk(
boolean(),
#{
desc => <<"Enable or disable this connector">>,
default => true
}
)}
] ++ connector_opts_1() ++ connector_opts_0();
%%--------------------------------------------------------------------
%% v1/v2
fields("resource_opts") ->
UnsupportedOpts = [enable_batch, batch_size, batch_time],
lists:filter(
fun({K, _V}) ->
not lists:member(K, unsupported_opts())
end,
fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
emqx_resource_schema:fields("creation_opts")
).
desc("config") ->
?DESC("desc_config");
desc("creation_opts") ->
desc("resource_opts") ->
?DESC(emqx_resource_schema, "creation_opts");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for WebHook using `", string:to_upper(Method), "` method."];
desc(_) ->
undefined.
%%--------------------------------------------------------------------
%% helpers for v1 only
basic_config() ->
[
{enable,
mk(
boolean(),
#{
desc => ?DESC("config_enable"),
desc => ?DESC("config_enable_bridge"),
default => true
}
)}
] ++ webhook_creation_opts() ++
proplists:delete(
max_retries, emqx_bridge_http_connector:fields(config)
).
] ++ webhook_resource_opts() ++ connector_opts_0().
request_config() ->
[
{url,
mk(
binary(),
#{
required => true,
desc => ?DESC("config_url")
}
)},
url_field(),
{direction,
mk(
egress,
@ -98,36 +177,9 @@ request_config() ->
required => false
}
)},
{method,
mk(
method(),
#{
default => post,
desc => ?DESC("config_method")
}
)},
{headers,
mk(
map(),
#{
default => #{
<<"accept">> => <<"application/json">>,
<<"cache-control">> => <<"no-cache">>,
<<"connection">> => <<"keep-alive">>,
<<"content-type">> => <<"application/json">>,
<<"keep-alive">> => <<"timeout=5">>
},
desc => ?DESC("config_headers")
}
)},
{body,
mk(
binary(),
#{
default => undefined,
desc => ?DESC("config_body")
}
)},
method_field(),
headers_field(),
body_field(),
{max_retries,
mk(
non_neg_integer(),
@ -147,27 +199,14 @@ request_config() ->
)}
].
webhook_creation_opts() ->
[
{resource_opts,
mk(
ref(?MODULE, "creation_opts"),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)}
].
%%--------------------------------------------------------------------
%% helpers for v2 only
unsupported_opts() ->
[
enable_batch,
batch_size,
batch_time
].
connector_opts_1() ->
[url_field(), headers_field()].
%%======================================================================================
%%--------------------------------------------------------------------
%% common funcs
type_field() ->
{type,
@ -189,5 +228,168 @@ name_field() ->
}
)}.
method() ->
enum([post, put, get, delete]).
url_field() ->
{url,
mk(
binary(),
#{
required => true,
desc => ?DESC("config_url")
}
)}.
headers_field() ->
{headers,
mk(
map(),
#{
default => #{
<<"accept">> => <<"application/json">>,
<<"cache-control">> => <<"no-cache">>,
<<"connection">> => <<"keep-alive">>,
<<"content-type">> => <<"application/json">>,
<<"keep-alive">> => <<"timeout=5">>
},
desc => ?DESC("config_headers")
}
)}.
method_field() ->
{method,
mk(
enum([post, put, get, delete]),
#{
default => post,
desc => ?DESC("config_method")
}
)}.
body_field() ->
{body,
mk(
binary(),
#{
default => undefined,
desc => ?DESC("config_body")
}
)}.
webhook_resource_opts() ->
[
{resource_opts,
mk(
ref(?MODULE, "resource_opts"),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)}
].
connector_opts_0() ->
mark_request_field_deperecated(
proplists:delete(max_retries, emqx_bridge_http_connector:fields(config))
).
mark_request_field_deperecated(Fields) ->
lists:map(
fun({K, V}) ->
case K of
request ->
{K, V#{
%% Note: if we want to deprecate a reference type, we have to change
%% it to a direct type first.
type => typerefl:map(),
deprecated => {since, "5.3.2"},
desc => <<"This field is never used, so we deprecated it since 5.3.2.">>
}};
_ ->
{K, V}
end
end,
Fields
).
%%--------------------------------------------------------------------
%% Examples
bridge_v2_examples(Method) ->
[
#{
<<"webhook">> => #{
summary => <<"Webhook Action">>,
value => values({Method, bridge_v2})
}
}
].
connector_examples(Method) ->
[
#{
<<"webhook">> => #{
summary => <<"Webhook Connector">>,
value => values({Method, connector})
}
}
].
values({get, Type}) ->
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
},
values({post, Type})
);
values({post, bridge_v2}) ->
maps:merge(
#{
name => <<"my_webhook_action">>,
type => <<"webhook">>
},
values({put, bridge_v2})
);
values({post, connector}) ->
maps:merge(
#{
name => <<"my_webhook_connector">>,
type => <<"webhook">>
},
values({put, connector})
);
values({put, bridge_v2}) ->
values(bridge_v2);
values({put, connector}) ->
values(connector);
values(bridge_v2) ->
#{
enable => true,
connector => <<"my_webhook_connector">>,
parameters => #{
path => <<"/room/${room_no}">>,
method => <<"post">>,
headers => #{},
body => <<"${.}">>
},
resource_opts => #{
worker_pool_size => 16,
health_check_interval => <<"15s">>,
query_mode => <<"async">>
}
};
values(connector) ->
#{
enable => true,
url => <<"http://localhost:8080/api/v1">>,
headers => #{<<"content-type">> => <<"application/json">>},
connect_timeout => <<"15s">>,
pool_type => <<"hash">>,
pool_size => 1,
enable_pipelining => 100
}.

View File

@ -158,17 +158,7 @@ connector_info_array_example(Method) ->
lists:map(fun(#{value := Config}) -> Config end, maps:values(connector_info_examples(Method))).
connector_info_examples(Method) ->
maps:merge(
#{},
emqx_enterprise_connector_examples(Method)
).
-if(?EMQX_RELEASE_EDITION == ee).
emqx_enterprise_connector_examples(Method) ->
emqx_connector_ee_schema:examples(Method).
-else.
emqx_enterprise_connector_examples(_Method) -> #{}.
-endif.
emqx_connector_schema:examples(Method).
schema("/connectors") ->
#{

View File

@ -15,7 +15,8 @@
-export([
api_schemas/1,
fields/1,
examples/1
%%examples/1
schema_modules/0
]).
resource_type(Type) when is_binary(Type) ->
@ -59,18 +60,6 @@ connector_structs() ->
)}
].
examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)
end,
Fun =
fun(Module, Examples) ->
ConnectorExamples = erlang:apply(Module, connector_examples, [Method]),
lists:foldl(MergeFun, Examples, ConnectorExamples)
end,
lists:foldl(Fun, #{}, schema_modules()).
schema_modules() ->
[
emqx_bridge_kafka,

View File

@ -33,8 +33,11 @@
-export([connector_type_to_bridge_types/1]).
-export([resource_opts_fields/0, resource_opts_fields/1]).
-export([examples/1]).
-if(?EMQX_RELEASE_EDITION == ee).
enterprise_api_schemas(Method) ->
%% We *must* do this to ensure the module is really loaded, especially when we use
@ -64,6 +67,37 @@ enterprise_fields_connectors() -> [].
-endif.
api_schemas(Method) ->
[
%% We need to map the `type' field of a request (binary) to a
%% connector schema module.
api_ref(emqx_bridge_http_schema, <<"webhook">>, Method ++ "_connector")
].
api_ref(Module, Type, Method) ->
{Type, ref(Module, Method)}.
examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)
end,
Fun =
fun(Module, Examples) ->
ConnectorExamples = erlang:apply(Module, connector_examples, [Method]),
lists:foldl(MergeFun, Examples, ConnectorExamples)
end,
lists:foldl(Fun, #{}, schema_modules()).
-if(?EMQX_RELEASE_EDITION == ee).
schema_modules() ->
[emqx_bridge_http_schema] ++ emqx_connector_ee_schema:schema_modules().
-else.
schema_modules() ->
[emqx_bridge_http_schema].
-endif.
connector_type_to_bridge_types(webhook) -> [webhook];
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer].
@ -298,8 +332,9 @@ post_request() ->
api_schema("post").
api_schema(Method) ->
CE = api_schemas(Method),
EE = enterprise_api_schemas(Method),
hoconsc:union(connector_api_union(EE)).
hoconsc:union(connector_api_union(CE ++ EE)).
connector_api_union(Refs) ->
Index = maps:from_list(Refs),
@ -344,7 +379,16 @@ roots() ->
end.
fields(connectors) ->
[] ++ enterprise_fields_connectors().
[
{webhook,
mk(
hoconsc:map(name, ref(emqx_bridge_http_schema, "config_connector")),
#{
desc => <<"HTTP Connector Config">>,
required => false
}
)}
] ++ enterprise_fields_connectors().
desc(connectors) ->
?DESC("desc_connectors");

View File

@ -18,10 +18,10 @@ config_direction.desc:
config_direction.label:
"""Bridge Direction"""
config_enable.desc:
config_enable_bridge.desc:
"""Enable or disable this bridge"""
config_enable.label:
config_enable_bridge.label:
"""Enable Or Disable Bridge"""
config_headers.desc:
@ -71,6 +71,15 @@ is not allowed."""
config_url.label:
"""HTTP Bridge"""
config_path.desc:
"""The URL path for this Action.<br/>
This path will be appended to the Connector's <code>url</code> configuration to form the full
URL address.
Template with variables is allowed in this option. For example, <code>/room/{$room_no}</code>"""
config_path.label:
"""URL Path"""
desc_config.desc:
"""Configuration for an HTTP bridge."""