feat: rename webhook bridge to http bridge

This commit is contained in:
JianBo He 2023-11-24 14:47:24 +08:00
parent dc99651690
commit cdb90ebe6b
15 changed files with 86 additions and 60 deletions

View File

@ -143,7 +143,7 @@ param_path_id() ->
#{ #{
in => path, in => path,
required => true, required => true,
example => <<"webhook:webhook_example">>, example => <<"http:http_example">>,
desc => ?DESC("desc_param_path_id") desc => ?DESC("desc_param_path_id")
} }
)}. )}.
@ -166,9 +166,9 @@ bridge_info_array_example(Method) ->
bridge_info_examples(Method) -> bridge_info_examples(Method) ->
maps:merge( maps:merge(
#{ #{
<<"webhook_example">> => #{ <<"http_example">> => #{
summary => <<"WebHook">>, summary => <<"HTTP">>,
value => info_example(webhook, Method) value => info_example(http, Method)
}, },
<<"mqtt_example">> => #{ <<"mqtt_example">> => #{
summary => <<"MQTT Bridge">>, summary => <<"MQTT Bridge">>,
@ -201,7 +201,7 @@ method_example(Type, Method) when Method == get; Method == post ->
method_example(_Type, put) -> method_example(_Type, put) ->
#{}. #{}.
info_example_basic(webhook) -> info_example_basic(http) ->
#{ #{
enable => true, enable => true,
url => <<"http://localhost:9901/messages/${topic}">>, url => <<"http://localhost:9901/messages/${topic}">>,
@ -212,7 +212,7 @@ info_example_basic(webhook) ->
pool_size => 4, pool_size => 4,
enable_pipelining => 100, enable_pipelining => 100,
ssl => #{enable => false}, ssl => #{enable => false},
local_topic => <<"emqx_webhook/#">>, local_topic => <<"emqx_http/#">>,
method => post, method => post,
body => <<"${payload}">>, body => <<"${payload}">>,
resource_opts => #{ resource_opts => #{

View File

@ -63,18 +63,23 @@
). ).
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(BridgeType) when is_binary(BridgeType) ->
bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(binary_to_existing_atom(BridgeType, utf8));
bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector; bridge_to_resource_type(mqtt) ->
bridge_to_resource_type(webhook) -> emqx_bridge_http_connector; emqx_bridge_mqtt_connector;
bridge_to_resource_type(BridgeType) -> emqx_bridge_enterprise:resource_type(BridgeType). bridge_to_resource_type(webhook) ->
emqx_bridge_http_connector;
bridge_to_resource_type(BridgeType) ->
emqx_bridge_enterprise:resource_type(BridgeType).
bridge_impl_module(BridgeType) -> emqx_bridge_enterprise:bridge_impl_module(BridgeType). bridge_impl_module(BridgeType) -> emqx_bridge_enterprise:bridge_impl_module(BridgeType).
-else. -else.
bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(BridgeType) when is_binary(Type) ->
bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(binary_to_existing_atom(Type, utf8));
bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector; bridge_to_resource_type(mqtt) ->
bridge_to_resource_type(webhook) -> emqx_bridge_http_connector. emqx_bridge_mqtt_connector;
bridge_to_resource_type(webhook) ->
emqx_bridge_http_connector.
bridge_impl_module(_BridgeType) -> undefined. bridge_impl_module(_BridgeType) -> undefined.
-endif. -endif.

View File

@ -110,7 +110,7 @@ param_path_id() ->
#{ #{
in => path, in => path,
required => true, required => true,
example => <<"webhook:webhook_example">>, example => <<"http:my_http_action">>,
desc => ?DESC("desc_param_path_id") desc => ?DESC("desc_param_path_id")
} }
)}. )}.

View File

@ -21,7 +21,7 @@
-export([ -export([
upgrade_pre_ee/2, upgrade_pre_ee/2,
maybe_upgrade/1, maybe_upgrade/1,
webhook_maybe_upgrade/1 http_maybe_upgrade/1
]). ]).
upgrade_pre_ee(undefined, _UpgradeFunc) -> upgrade_pre_ee(undefined, _UpgradeFunc) ->
@ -40,10 +40,10 @@ maybe_upgrade(#{<<"connector">> := _} = Config0) ->
maybe_upgrade(NewVersion) -> maybe_upgrade(NewVersion) ->
NewVersion. NewVersion.
webhook_maybe_upgrade(#{<<"direction">> := _} = Config0) -> http_maybe_upgrade(#{<<"direction">> := _} = Config0) ->
Config1 = maps:remove(<<"direction">>, Config0), Config1 = maps:remove(<<"direction">>, Config0),
Config1#{<<"resource_opts">> => default_resource_opts()}; Config1#{<<"resource_opts">> => default_resource_opts()};
webhook_maybe_upgrade(NewVersion) -> http_maybe_upgrade(NewVersion) ->
NewVersion. NewVersion.
binary_key({K, V}) -> binary_key({K, V}) ->

View File

@ -162,13 +162,14 @@ roots() -> [{bridges, ?HOCON(?R_REF(bridges), #{importance => ?IMPORTANCE_LOW})}
fields(bridges) -> fields(bridges) ->
[ [
{webhook, {http,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_http_schema, "config")), hoconsc:map(name, ref(emqx_bridge_http_schema, "config")),
#{ #{
aliases => [webhook],
desc => ?DESC("bridges_webhook"), desc => ?DESC("bridges_webhook"),
required => false, required => false,
converter => fun webhook_bridge_converter/2 converter => fun http_bridge_converter/2
} }
)}, )},
{mqtt, {mqtt,
@ -243,7 +244,7 @@ status() ->
node_name() -> node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
webhook_bridge_converter(Conf0, _HoconOpts) -> http_bridge_converter(Conf0, _HoconOpts) ->
emqx_bridge_compatible_config:upgrade_pre_ee( emqx_bridge_compatible_config:upgrade_pre_ee(
Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 Conf0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1
). ).

View File

@ -62,6 +62,7 @@ end_per_testcase(t_get_basic_usage_info_1, _Config) ->
ok = emqx_bridge:remove(BridgeType, BridgeName) ok = emqx_bridge:remove(BridgeType, BridgeName)
end, end,
[ [
%% Keep using the old bridge names to avoid breaking the tests
{webhook, <<"basic_usage_info_webhook">>}, {webhook, <<"basic_usage_info_webhook">>},
{webhook, <<"basic_usage_info_webhook_disabled">>}, {webhook, <<"basic_usage_info_webhook_disabled">>},
{mqtt, <<"basic_usage_info_mqtt">>} {mqtt, <<"basic_usage_info_mqtt">>}
@ -92,7 +93,7 @@ t_get_basic_usage_info_1(_Config) ->
#{ #{
num_bridges => 3, num_bridges => 3,
count_by_type => #{ count_by_type => #{
webhook => 1, http => 1,
mqtt => 2 mqtt => 2
} }
}, },
@ -123,12 +124,13 @@ setup_fake_telemetry_data() ->
HTTPConfig = #{ HTTPConfig = #{
url => <<"http://localhost:9901/messages/${topic}">>, url => <<"http://localhost:9901/messages/${topic}">>,
enable => true, enable => true,
local_topic => "emqx_webhook/#", local_topic => "emqx_http/#",
method => post, method => post,
body => <<"${payload}">>, body => <<"${payload}">>,
headers => #{}, headers => #{},
request_timeout => "15s" request_timeout => "15s"
}, },
%% Keep use the old bridge names to test the backward compatibility
{ok, _} = emqx_bridge_testlib:create_bridge_api( {ok, _} = emqx_bridge_testlib:create_bridge_api(
<<"webhook">>, <<"webhook">>,
<<"basic_usage_info_webhook">>, <<"basic_usage_info_webhook">>,

View File

@ -1389,7 +1389,7 @@ validate_resource_request_ttl(single, Timeout, Name) ->
begin begin
{ok, Res} = {ok, Res} =
?wait_async_action( ?wait_async_action(
emqx_bridge_v2:send_message(<<"webhook">>, Name, SentData, #{}), do_send_message(?BRIDGE_TYPE_HTTP, Name, SentData),
#{?snk_kind := async_query}, #{?snk_kind := async_query},
1000 1000
), ),
@ -1404,6 +1404,10 @@ validate_resource_request_ttl(single, Timeout, Name) ->
validate_resource_request_ttl(_Cluster, _Timeout, _Name) -> validate_resource_request_ttl(_Cluster, _Timeout, _Name) ->
ignore. ignore.
do_send_message(BridgeV1Type, Name, Message) ->
Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
emqx_bridge_v2:send_message(Type, Name, Message, #{}).
%% %%
request(Method, URL, Config) -> request(Method, URL, Config) ->

View File

@ -84,7 +84,7 @@ up(#{<<"mqtt">> := MqttBridges0} = Bridges) ->
Bridges#{<<"mqtt">> := MqttBridges}; Bridges#{<<"mqtt">> := MqttBridges};
up(#{<<"webhook">> := WebhookBridges0} = Bridges) -> up(#{<<"webhook">> := WebhookBridges0} = Bridges) ->
WebhookBridges = emqx_bridge_compatible_config:upgrade_pre_ee( WebhookBridges = emqx_bridge_compatible_config:upgrade_pre_ee(
WebhookBridges0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 WebhookBridges0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1
), ),
Bridges#{<<"webhook">> := WebhookBridges}. Bridges#{<<"webhook">> := WebhookBridges}.

View File

@ -34,9 +34,9 @@
bridge_v1_type_name() -> webhook. bridge_v1_type_name() -> webhook.
action_type_name() -> webhook. action_type_name() -> http.
connector_type_name() -> webhook. connector_type_name() -> http.
schema_module() -> emqx_bridge_http_schema. schema_module() -> emqx_bridge_http_schema.

View File

@ -46,7 +46,7 @@
namespace/0 namespace/0
]). ]).
%% for other webhook-like connectors. %% for other http-like connectors.
-export([redact_request/1]). -export([redact_request/1]).
-export([validate_method/1, join_paths/2]). -export([validate_method/1, join_paths/2]).
@ -836,7 +836,7 @@ maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
true -> Context; true -> Context;
false -> Context#{attempt := Attempt + 1} false -> Context#{attempt := Attempt + 1}
end, end,
?tp(webhook_will_retry_async, #{}), ?tp(http_will_retry_async, #{}),
Worker = resolve_pool_worker(State, KeyOrNum), Worker = resolve_pool_worker(State, KeyOrNum),
ok = ehttpc:request_async( ok = ehttpc:request_async(
Worker, Worker,

View File

@ -31,7 +31,7 @@
%%====================================================================================== %%======================================================================================
%% Hocon Schema Definitions %% Hocon Schema Definitions
namespace() -> "bridge_webhook". namespace() -> "bridge_http".
roots() -> []. roots() -> [].
@ -40,7 +40,7 @@ roots() -> [].
%% see: emqx_bridge_schema:get_response/0, put_request/0, post_request/0 %% see: emqx_bridge_schema:get_response/0, put_request/0, post_request/0
fields("post") -> fields("post") ->
[ [
type_field(), old_type_field(),
name_field() name_field()
] ++ fields("config"); ] ++ fields("config");
fields("put") -> fields("put") ->
@ -55,15 +55,16 @@ fields("config") ->
%% v2: configuration %% v2: configuration
fields(action) -> fields(action) ->
%% XXX: Do we need to rename it to `http`? %% XXX: Do we need to rename it to `http`?
{webhook, {http,
mk( mk(
hoconsc:map(name, ref(?MODULE, webhook_action)), hoconsc:map(name, ref(?MODULE, http_action)),
#{ #{
aliases => [webhook],
desc => <<"HTTP Action Config">>, desc => <<"HTTP Action Config">>,
required => false required => false
} }
)}; )};
fields(webhook_action) -> fields(http_action) ->
[ [
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{connector, {connector,
@ -83,7 +84,7 @@ fields(webhook_action) ->
importance => ?IMPORTANCE_HIDDEN importance => ?IMPORTANCE_HIDDEN
} }
)}, )},
%% Since e5.3.2, we split the webhook_bridge to two parts: a) connector. b) actions. %% Since e5.3.2, we split the http bridge to two parts: a) connector. b) actions.
%% some fields are moved to connector, some fields are moved to actions and composed into the %% some fields are moved to connector, some fields are moved to actions and composed into the
%% `parameters` field. %% `parameters` field.
{parameters, {parameters,
@ -91,7 +92,7 @@ fields(webhook_action) ->
required => true, required => true,
desc => ?DESC(parameters_opts) desc => ?DESC(parameters_opts)
})} })}
] ++ webhook_resource_opts(); ] ++ http_resource_opts();
fields(parameters_opts) -> fields(parameters_opts) ->
[ [
{path, {path,
@ -119,7 +120,7 @@ fields("put_" ++ Type) ->
fields("get_" ++ Type) -> fields("get_" ++ Type) ->
emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type); emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type);
fields("config_bridge_v2") -> fields("config_bridge_v2") ->
fields(webhook_action); fields(http_action);
fields("config_connector") -> fields("config_connector") ->
[ [
{enable, {enable,
@ -165,7 +166,7 @@ basic_config() ->
default => true default => true
} }
)} )}
] ++ webhook_resource_opts() ++ connector_opts(). ] ++ http_resource_opts() ++ connector_opts().
request_config() -> request_config() ->
[ [
@ -203,10 +204,21 @@ connector_url_headers() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% common funcs %% common funcs
%% `webhook` is kept for backward compatibility.
old_type_field() ->
{type,
mk(
enum([webhook, http]),
#{
required => true,
desc => ?DESC("desc_type")
}
)}.
type_field() -> type_field() ->
{type, {type,
mk( mk(
webhook, http,
#{ #{
required => true, required => true,
desc => ?DESC("desc_type") desc => ?DESC("desc_type")
@ -290,7 +302,7 @@ request_timeout_field() ->
} }
)}. )}.
webhook_resource_opts() -> http_resource_opts() ->
[ [
{resource_opts, {resource_opts,
mk( mk(
@ -333,8 +345,8 @@ mark_request_field_deperecated(Fields) ->
bridge_v2_examples(Method) -> bridge_v2_examples(Method) ->
[ [
#{ #{
<<"webhook">> => #{ <<"http">> => #{
summary => <<"Webhook Action">>, summary => <<"HTTP Action">>,
value => values({Method, bridge_v2}) value => values({Method, bridge_v2})
} }
} }
@ -343,8 +355,8 @@ bridge_v2_examples(Method) ->
connector_examples(Method) -> connector_examples(Method) ->
[ [
#{ #{
<<"webhook">> => #{ <<"http">> => #{
summary => <<"Webhook Connector">>, summary => <<"HTTP Connector">>,
value => values({Method, connector}) value => values({Method, connector})
} }
} }
@ -366,16 +378,16 @@ values({get, Type}) ->
values({post, bridge_v2}) -> values({post, bridge_v2}) ->
maps:merge( maps:merge(
#{ #{
name => <<"my_webhook_action">>, name => <<"my_http_action">>,
type => <<"webhook">> type => <<"http">>
}, },
values({put, bridge_v2}) values({put, bridge_v2})
); );
values({post, connector}) -> values({post, connector}) ->
maps:merge( maps:merge(
#{ #{
name => <<"my_webhook_connector">>, name => <<"my_http_connector">>,
type => <<"webhook">> type => <<"http">>
}, },
values({put, connector}) values({put, connector})
); );
@ -386,7 +398,7 @@ values({put, connector}) ->
values(bridge_v2) -> values(bridge_v2) ->
#{ #{
enable => true, enable => true,
connector => <<"my_webhook_connector">>, connector => <<"my_http_connector">>,
parameters => #{ parameters => #{
path => <<"/room/${room_no}">>, path => <<"/room/${room_no}">>,
method => <<"post">>, method => <<"post">>,

View File

@ -577,7 +577,7 @@ t_path_not_found(Config) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
?assertEqual([], ?of_kind(webhook_will_retry_async, Trace)), ?assertEqual([], ?of_kind(http_will_retry_async, Trace)),
ok ok
end end
), ),
@ -618,7 +618,7 @@ t_too_many_requests(Config) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
?assertMatch([_ | _], ?of_kind(webhook_will_retry_async, Trace)), ?assertMatch([_ | _], ?of_kind(http_will_retry_async, Trace)),
ok ok
end end
), ),
@ -731,7 +731,8 @@ t_bridge_probes_header_atoms(Config) ->
%% helpers %% helpers
do_send_message(Message) -> do_send_message(Message) ->
emqx_bridge_v2:send_message(?BRIDGE_TYPE, ?BRIDGE_NAME, Message, #{}). Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(?BRIDGE_TYPE),
emqx_bridge_v2:send_message(Type, ?BRIDGE_NAME, Message, #{}).
do_t_async_retries(TestCase, TestContext, Error, Fn) -> do_t_async_retries(TestCase, TestContext, Error, Fn) ->
#{error_attempts := ErrorAttempts} = TestContext, #{error_attempts := ErrorAttempts} = TestContext,

View File

@ -137,7 +137,7 @@ param_path_id() ->
#{ #{
in => path, in => path,
required => true, required => true,
example => <<"webhook:webhook_example">>, example => <<"http:my_http_connector">>,
desc => ?DESC("desc_param_path_id") desc => ?DESC("desc_param_path_id")
} }
)}. )}.

View File

@ -79,7 +79,7 @@ connector_impl_module(_ConnectorType) ->
-endif. -endif.
connector_to_resource_type_ce(webhook) -> connector_to_resource_type_ce(http) ->
emqx_bridge_http_connector; emqx_bridge_http_connector;
connector_to_resource_type_ce(ConnectorType) -> connector_to_resource_type_ce(ConnectorType) ->
error({no_bridge_v2, ConnectorType}). error({no_bridge_v2, ConnectorType}).
@ -275,7 +275,7 @@ remove(Type, Name, _Conf, _Opts) ->
%% convert connector configs to what the connector modules want %% convert connector configs to what the connector modules want
parse_confs( parse_confs(
<<"webhook">>, <<"http">>,
_Name, _Name,
#{ #{
url := Url, url := Url,

View File

@ -70,7 +70,7 @@ api_schemas(Method) ->
[ [
%% We need to map the `type' field of a request (binary) to a %% We need to map the `type' field of a request (binary) to a
%% connector schema module. %% connector schema module.
api_ref(emqx_bridge_http_schema, <<"webhook">>, Method ++ "_connector") api_ref(emqx_bridge_http_schema, <<"http">>, Method ++ "_connector")
]. ].
api_ref(Module, Type, Method) -> api_ref(Module, Type, Method) ->
@ -96,7 +96,7 @@ schema_modules() ->
[emqx_bridge_http_schema]. [emqx_bridge_http_schema].
-endif. -endif.
connector_type_to_bridge_types(webhook) -> [webhook]; connector_type_to_bridge_types(http) -> [http, webhook];
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]. connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer].
@ -379,10 +379,11 @@ roots() ->
fields(connectors) -> fields(connectors) ->
[ [
{webhook, {http,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_http_schema, "config_connector")), hoconsc:map(name, ref(emqx_bridge_http_schema, "config_connector")),
#{ #{
alias => [webhook],
desc => <<"HTTP Connector Config">>, desc => <<"HTTP Connector Config">>,
required => false required => false
} }