Merge pull request #11976 from HJianBo/impl-http-bridge

Implement the Bridge V2 for HTTP Bridge
This commit is contained in:
JianBo He 2023-11-28 15:26:39 +08:00 committed by GitHub
commit 0aa3572d6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 927 additions and 368 deletions

View File

@ -50,7 +50,7 @@
-define(APPS_CLUSTERING, [gen_rpc, mria, ekka]). -define(APPS_CLUSTERING, [gen_rpc, mria, ekka]).
-define(TIMEOUT_NODE_START_MS, 15000). -define(TIMEOUT_NODE_START_MS, 15000).
-define(TIMEOUT_APPS_START_MS, 30000). -define(TIMEOUT_APPS_START_MS, 60000).
-define(TIMEOUT_NODE_STOP_S, 15). -define(TIMEOUT_NODE_STOP_S, 15).
%% %%

View File

@ -77,7 +77,7 @@ hard_coded_action_info_modules_ee() ->
-endif. -endif.
hard_coded_action_info_modules_common() -> hard_coded_action_info_modules_common() ->
[]. [emqx_bridge_http_action_info].
hard_coded_action_info_modules() -> hard_coded_action_info_modules() ->
hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee(). hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee().

View File

@ -357,7 +357,7 @@ get_metrics(Type, Name) ->
maybe_upgrade(mqtt, Config) -> maybe_upgrade(mqtt, Config) ->
emqx_bridge_compatible_config:maybe_upgrade(Config); emqx_bridge_compatible_config:maybe_upgrade(Config);
maybe_upgrade(webhook, Config) -> maybe_upgrade(webhook, Config) ->
emqx_bridge_compatible_config:webhook_maybe_upgrade(Config); emqx_bridge_compatible_config:http_maybe_upgrade(Config);
maybe_upgrade(_Other, Config) -> maybe_upgrade(_Other, Config) ->
Config. Config.

View File

@ -143,7 +143,7 @@ param_path_id() ->
#{ #{
in => path, in => path,
required => true, required => true,
example => <<"webhook:webhook_example">>, example => <<"http:http_example">>,
desc => ?DESC("desc_param_path_id") desc => ?DESC("desc_param_path_id")
} }
)}. )}.
@ -166,9 +166,9 @@ bridge_info_array_example(Method) ->
bridge_info_examples(Method) -> bridge_info_examples(Method) ->
maps:merge( maps:merge(
#{ #{
<<"webhook_example">> => #{ <<"http_example">> => #{
summary => <<"WebHook">>, summary => <<"HTTP">>,
value => info_example(webhook, Method) value => info_example(http, Method)
}, },
<<"mqtt_example">> => #{ <<"mqtt_example">> => #{
summary => <<"MQTT Bridge">>, summary => <<"MQTT Bridge">>,
@ -201,7 +201,7 @@ method_example(Type, Method) when Method == get; Method == post ->
method_example(_Type, put) -> method_example(_Type, put) ->
#{}. #{}.
info_example_basic(webhook) -> info_example_basic(http) ->
#{ #{
enable => true, enable => true,
url => <<"http://localhost:9901/messages/${topic}">>, url => <<"http://localhost:9901/messages/${topic}">>,
@ -212,7 +212,7 @@ info_example_basic(webhook) ->
pool_size => 4, pool_size => 4,
enable_pipelining => 100, enable_pipelining => 100,
ssl => #{enable => false}, ssl => #{enable => false},
local_topic => <<"emqx_webhook/#">>, local_topic => <<"emqx_http/#">>,
method => post, method => post,
body => <<"${payload}">>, body => <<"${payload}">>,
resource_opts => #{ resource_opts => #{
@ -650,7 +650,8 @@ create_or_update_bridge(BridgeType0, BridgeName, Conf, HttpStatusCode) ->
get_metrics_from_local_node(BridgeType0, BridgeName) -> get_metrics_from_local_node(BridgeType0, BridgeName) ->
BridgeType = upgrade_type(BridgeType0), BridgeType = upgrade_type(BridgeType0),
format_metrics(emqx_bridge:get_metrics(BridgeType, BridgeName)). MetricsResult = emqx_bridge:get_metrics(BridgeType, BridgeName),
format_metrics(MetricsResult).
'/bridges/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) -> '/bridges/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
?TRY_PARSE_ID( ?TRY_PARSE_ID(

View File

@ -63,18 +63,23 @@
). ).
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(BridgeType) when is_binary(BridgeType) ->
bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(binary_to_existing_atom(BridgeType, utf8));
bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector; bridge_to_resource_type(mqtt) ->
bridge_to_resource_type(webhook) -> emqx_bridge_http_connector; emqx_bridge_mqtt_connector;
bridge_to_resource_type(BridgeType) -> emqx_bridge_enterprise:resource_type(BridgeType). bridge_to_resource_type(webhook) ->
emqx_bridge_http_connector;
bridge_to_resource_type(BridgeType) ->
emqx_bridge_enterprise:resource_type(BridgeType).
bridge_impl_module(BridgeType) -> emqx_bridge_enterprise:bridge_impl_module(BridgeType). bridge_impl_module(BridgeType) -> emqx_bridge_enterprise:bridge_impl_module(BridgeType).
-else. -else.
bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(BridgeType) when is_binary(BridgeType) ->
bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(binary_to_existing_atom(BridgeType, utf8));
bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector; bridge_to_resource_type(mqtt) ->
bridge_to_resource_type(webhook) -> emqx_bridge_http_connector. emqx_bridge_mqtt_connector;
bridge_to_resource_type(webhook) ->
emqx_bridge_http_connector.
bridge_impl_module(_BridgeType) -> undefined. bridge_impl_module(_BridgeType) -> undefined.
-endif. -endif.
@ -309,6 +314,7 @@ remove(Type, Name, _Conf, _Opts) ->
emqx_resource:remove_local(resource_id(Type, Name)). emqx_resource:remove_local(resource_id(Type, Name)).
%% convert bridge configs to what the connector modules want %% convert bridge configs to what the connector modules want
%% TODO: remove it, if the http_bridge already ported to v2
parse_confs( parse_confs(
<<"webhook">>, <<"webhook">>,
_Name, _Name,

View File

@ -1163,7 +1163,7 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
%% If the bridge v2 does not exist, it is a valid bridge v1 %% If the bridge v2 does not exist, it is a valid bridge v1
PreviousRawConf = undefined, PreviousRawConf = undefined,
split_bridge_v1_config_and_create_helper( split_bridge_v1_config_and_create_helper(
BridgeV1Type, BridgeName, RawConf, PreviousRawConf BridgeV1Type, BridgeName, RawConf, PreviousRawConf, fun() -> ok end
); );
_Conf -> _Conf ->
case ?MODULE:bridge_v1_is_valid(BridgeV1Type, BridgeName) of case ?MODULE:bridge_v1_is_valid(BridgeV1Type, BridgeName) of
@ -1173,9 +1173,13 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
PreviousRawConf = emqx:get_raw_config( PreviousRawConf = emqx:get_raw_config(
[?ROOT_KEY, BridgeV2Type, BridgeName], undefined [?ROOT_KEY, BridgeV2Type, BridgeName], undefined
), ),
bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps), %% To avoid losing configurations. We have to make sure that no crash occurs
%% during deletion and creation of configurations.
PreCreateFun = fun() ->
bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps)
end,
split_bridge_v1_config_and_create_helper( split_bridge_v1_config_and_create_helper(
BridgeV1Type, BridgeName, RawConf, PreviousRawConf BridgeV1Type, BridgeName, RawConf, PreviousRawConf, PreCreateFun
); );
false -> false ->
%% If the bridge v2 exists, it is not a valid bridge v1 %% If the bridge v2 exists, it is not a valid bridge v1
@ -1183,16 +1187,49 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
end end
end. end.
split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf, PreviousRawConf) -> split_bridge_v1_config_and_create_helper(
#{ BridgeV1Type, BridgeName, RawConf, PreviousRawConf, PreCreateFun
connector_type := ConnectorType, ) ->
connector_name := NewConnectorName, try
connector_conf := NewConnectorRawConf, #{
bridge_v2_type := BridgeType, connector_type := ConnectorType,
bridge_v2_name := BridgeName, connector_name := NewConnectorName,
bridge_v2_conf := NewBridgeV2RawConf connector_conf := NewConnectorRawConf,
} = bridge_v2_type := BridgeType,
split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousRawConf), bridge_v2_name := BridgeName,
bridge_v2_conf := NewBridgeV2RawConf
} = split_and_validate_bridge_v1_config(
BridgeV1Type,
BridgeName,
RawConf,
PreviousRawConf
),
_ = PreCreateFun(),
do_connector_and_bridge_create(
ConnectorType,
NewConnectorName,
NewConnectorRawConf,
BridgeType,
BridgeName,
NewBridgeV2RawConf,
RawConf
)
catch
throw:Reason ->
{error, Reason}
end.
do_connector_and_bridge_create(
ConnectorType,
NewConnectorName,
NewConnectorRawConf,
BridgeType,
BridgeName,
NewBridgeV2RawConf,
RawConf
) ->
case emqx_connector:create(ConnectorType, NewConnectorName, NewConnectorRawConf) of case emqx_connector:create(ConnectorType, NewConnectorName, NewConnectorRawConf) of
{ok, _} -> {ok, _} ->
case create(BridgeType, BridgeName, NewBridgeV2RawConf) of case create(BridgeType, BridgeName, NewBridgeV2RawConf) of
@ -1308,15 +1345,20 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
RawConf = maps:without([<<"name">>], RawConfig0), RawConf = maps:without([<<"name">>], RawConfig0),
TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
PreviousRawConf = undefined, PreviousRawConf = undefined,
#{ try
connector_type := _ConnectorType, #{
connector_name := _NewConnectorName, connector_type := _ConnectorType,
connector_conf := ConnectorRawConf, connector_name := _NewConnectorName,
bridge_v2_type := BridgeV2Type, connector_conf := ConnectorRawConf,
bridge_v2_name := _BridgeName, bridge_v2_type := BridgeV2Type,
bridge_v2_conf := BridgeV2RawConf bridge_v2_name := _BridgeName,
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf), bridge_v2_conf := BridgeV2RawConf
create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf). } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf)
catch
throw:Reason ->
{error, Reason}
end.
bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) -> bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),

View File

@ -110,7 +110,7 @@ param_path_id() ->
#{ #{
in => path, in => path,
required => true, required => true,
example => <<"webhook:webhook_example">>, example => <<"http:my_http_action">>,
desc => ?DESC("desc_param_path_id") desc => ?DESC("desc_param_path_id")
} }
)}. )}.

View File

@ -21,7 +21,7 @@
-export([ -export([
upgrade_pre_ee/2, upgrade_pre_ee/2,
maybe_upgrade/1, maybe_upgrade/1,
webhook_maybe_upgrade/1 http_maybe_upgrade/1
]). ]).
upgrade_pre_ee(undefined, _UpgradeFunc) -> upgrade_pre_ee(undefined, _UpgradeFunc) ->
@ -40,10 +40,10 @@ maybe_upgrade(#{<<"connector">> := _} = Config0) ->
maybe_upgrade(NewVersion) -> maybe_upgrade(NewVersion) ->
NewVersion. NewVersion.
webhook_maybe_upgrade(#{<<"direction">> := _} = Config0) -> http_maybe_upgrade(#{<<"direction">> := _} = Config0) ->
Config1 = maps:remove(<<"direction">>, Config0), Config1 = maps:remove(<<"direction">>, Config0),
Config1#{<<"resource_opts">> => default_resource_opts()}; Config1#{<<"resource_opts">> => default_resource_opts()};
webhook_maybe_upgrade(NewVersion) -> http_maybe_upgrade(NewVersion) ->
NewVersion. NewVersion.
binary_key({K, V}) -> binary_key({K, V}) ->

View File

@ -162,13 +162,14 @@ roots() -> [{bridges, ?HOCON(?R_REF(bridges), #{importance => ?IMPORTANCE_LOW})}
fields(bridges) -> fields(bridges) ->
[ [
{webhook, {http,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_http_schema, "config")), hoconsc:map(name, ref(emqx_bridge_http_schema, "config")),
#{ #{
aliases => [webhook],
desc => ?DESC("bridges_webhook"), desc => ?DESC("bridges_webhook"),
required => false, required => false,
converter => fun webhook_bridge_converter/2 converter => fun http_bridge_converter/2
} }
)}, )},
{mqtt, {mqtt,
@ -243,7 +244,7 @@ status() ->
node_name() -> node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
webhook_bridge_converter(Conf0, _HoconOpts) -> http_bridge_converter(Conf0, _HoconOpts) ->
emqx_bridge_compatible_config:upgrade_pre_ee( emqx_bridge_compatible_config:upgrade_pre_ee(
Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 Conf0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1
). ).

View File

@ -30,14 +30,18 @@ init_per_suite(Config) ->
[ [
emqx, emqx,
emqx_conf, emqx_conf,
emqx_connector,
emqx_bridge_http,
emqx_bridge emqx_bridge
], ],
#{work_dir => ?config(priv_dir, Config)} #{work_dir => ?config(priv_dir, Config)}
), ),
emqx_mgmt_api_test_util:init_suite(),
[{apps, Apps} | Config]. [{apps, Apps} | Config].
end_per_suite(Config) -> end_per_suite(Config) ->
Apps = ?config(apps, Config), Apps = ?config(apps, Config),
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_cth_suite:stop(Apps), ok = emqx_cth_suite:stop(Apps),
ok. ok.
@ -58,6 +62,7 @@ end_per_testcase(t_get_basic_usage_info_1, _Config) ->
ok = emqx_bridge:remove(BridgeType, BridgeName) ok = emqx_bridge:remove(BridgeType, BridgeName)
end, end,
[ [
%% Keep using the old bridge names to avoid breaking the tests
{webhook, <<"basic_usage_info_webhook">>}, {webhook, <<"basic_usage_info_webhook">>},
{webhook, <<"basic_usage_info_webhook_disabled">>}, {webhook, <<"basic_usage_info_webhook_disabled">>},
{mqtt, <<"basic_usage_info_mqtt">>} {mqtt, <<"basic_usage_info_mqtt">>}
@ -88,7 +93,7 @@ t_get_basic_usage_info_1(_Config) ->
#{ #{
num_bridges => 3, num_bridges => 3,
count_by_type => #{ count_by_type => #{
webhook => 1, http => 1,
mqtt => 2 mqtt => 2
} }
}, },
@ -119,40 +124,33 @@ setup_fake_telemetry_data() ->
HTTPConfig = #{ HTTPConfig = #{
url => <<"http://localhost:9901/messages/${topic}">>, url => <<"http://localhost:9901/messages/${topic}">>,
enable => true, enable => true,
local_topic => "emqx_webhook/#", local_topic => "emqx_http/#",
method => post, method => post,
body => <<"${payload}">>, body => <<"${payload}">>,
headers => #{}, headers => #{},
request_timeout => "15s" request_timeout => "15s"
}, },
Conf = %% Keep use the old bridge names to test the backward compatibility
#{ {ok, _} = emqx_bridge_testlib:create_bridge_api(
<<"bridges">> => <<"webhook">>,
#{ <<"basic_usage_info_webhook">>,
<<"webhook">> => HTTPConfig
#{ ),
<<"basic_usage_info_webhook">> => HTTPConfig, {ok, _} = emqx_bridge_testlib:create_bridge_api(
<<"basic_usage_info_webhook_disabled">> => <<"webhook">>,
HTTPConfig#{enable => false} <<"basic_usage_info_webhook_disabled">>,
}, HTTPConfig#{enable => false}
<<"mqtt">> => ),
#{ {ok, _} = emqx_bridge_testlib:create_bridge_api(
<<"basic_usage_info_mqtt">> => MQTTConfig1, <<"mqtt">>,
<<"basic_usage_info_mqtt_from_select">> => MQTTConfig2 <<"basic_usage_info_mqtt">>,
} MQTTConfig1
} ),
}, {ok, _} = emqx_bridge_testlib:create_bridge_api(
ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf), <<"mqtt">>,
<<"basic_usage_info_mqtt_from_select">>,
ok = snabbkaffe:start_trace(), MQTTConfig2
Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_loaded end, ),
NEvents = 3,
BackInTime = 0,
Timeout = 11_000,
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, NEvents, Timeout, BackInTime),
ok = emqx_bridge:load(),
{ok, _} = snabbkaffe_collector:receive_events(Sub),
ok = snabbkaffe:stop(),
ok. ok.
t_update_ssl_conf(Config) -> t_update_ssl_conf(Config) ->

View File

@ -78,6 +78,9 @@
emqx_auth, emqx_auth,
emqx_auth_mnesia, emqx_auth_mnesia,
emqx_management, emqx_management,
emqx_connector,
emqx_bridge_http,
emqx_bridge,
{emqx_rule_engine, "rule_engine { rules {} }"}, {emqx_rule_engine, "rule_engine { rules {} }"},
{emqx_bridge, "bridges {}"} {emqx_bridge, "bridges {}"}
]). ]).
@ -108,7 +111,7 @@ groups() ->
]. ].
suite() -> suite() ->
[{timetrap, {seconds, 60}}]. [{timetrap, {seconds, 120}}].
init_per_suite(Config) -> init_per_suite(Config) ->
Config. Config.
@ -407,10 +410,7 @@ t_http_crud_apis(Config) ->
Config Config
), ),
?assertMatch( ?assertMatch(
#{ #{<<"reason">> := <<"required_field">>},
<<"reason">> := <<"unknown_fields">>,
<<"unknown">> := <<"curl">>
},
json(maps:get(<<"message">>, PutFail2)) json(maps:get(<<"message">>, PutFail2))
), ),
{ok, 400, _} = request_json( {ok, 400, _} = request_json(
@ -419,12 +419,16 @@ t_http_crud_apis(Config) ->
?HTTP_BRIDGE(<<"localhost:1234/foo">>, Name), ?HTTP_BRIDGE(<<"localhost:1234/foo">>, Name),
Config Config
), ),
{ok, 400, _} = request_json( {ok, 400, PutFail3} = request_json(
put, put,
uri(["bridges", BridgeID]), uri(["bridges", BridgeID]),
?HTTP_BRIDGE(<<"htpp://localhost:12341234/foo">>, Name), ?HTTP_BRIDGE(<<"htpp://localhost:12341234/foo">>, Name),
Config Config
), ),
?assertMatch(
#{<<"kind">> := <<"validation_error">>},
json(maps:get(<<"message">>, PutFail3))
),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
@ -463,7 +467,7 @@ t_http_crud_apis(Config) ->
), ),
%% Create non working bridge %% Create non working bridge
BrokenURL = ?URL(Port + 1, "/foo"), BrokenURL = ?URL(Port + 1, "foo"),
{ok, 201, BrokenBridge} = request( {ok, 201, BrokenBridge} = request(
post, post,
uri(["bridges"]), uri(["bridges"]),
@ -471,6 +475,7 @@ t_http_crud_apis(Config) ->
fun json/1, fun json/1,
Config Config
), ),
?assertMatch( ?assertMatch(
#{ #{
<<"type">> := ?BRIDGE_TYPE_HTTP, <<"type">> := ?BRIDGE_TYPE_HTTP,
@ -1307,6 +1312,7 @@ t_cluster_later_join_metrics(Config) ->
Name = ?BRIDGE_NAME, Name = ?BRIDGE_NAME,
BridgeParams = ?HTTP_BRIDGE(URL1, Name), BridgeParams = ?HTTP_BRIDGE(URL1, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
?check_trace( ?check_trace(
begin begin
%% Create a bridge on only one of the nodes. %% Create a bridge on only one of the nodes.
@ -1323,6 +1329,20 @@ t_cluster_later_join_metrics(Config) ->
ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]), ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]),
%% Check metrics; shouldn't crash even if the bridge is not %% Check metrics; shouldn't crash even if the bridge is not
%% ready on the node that just joined the cluster. %% ready on the node that just joined the cluster.
%% assert: wait for the bridge to be ready on the other node.
fun
WaitConfSync(0) ->
throw(waiting_config_sync_timeout);
WaitConfSync(N) ->
timer:sleep(1000),
case erpc:call(OtherNode, emqx_bridge, list, []) of
[] -> WaitConfSync(N - 1);
[_] -> ok
end
end(
60
),
?assertMatch( ?assertMatch(
{ok, 200, #{ {ok, 200, #{
<<"metrics">> := #{<<"success">> := _}, <<"metrics">> := #{<<"success">> := _},
@ -1373,17 +1393,16 @@ t_create_with_bad_name(Config) ->
validate_resource_request_ttl(single, Timeout, Name) -> validate_resource_request_ttl(single, Timeout, Name) ->
SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000}, SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), _BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name),
?check_trace( ?check_trace(
begin begin
{ok, Res} = {ok, Res} =
?wait_async_action( ?wait_async_action(
emqx_bridge:send_message(BridgeID, SentData), do_send_message(?BRIDGE_TYPE_HTTP, Name, SentData),
#{?snk_kind := async_query}, #{?snk_kind := async_query},
1000 1000
), ),
?assertMatch({ok, #{id := ResId, query_opts := #{timeout := Timeout}}}, Res) ?assertMatch({ok, #{id := _ResId, query_opts := #{timeout := Timeout}}}, Res)
end, end,
fun(Trace0) -> fun(Trace0) ->
Trace = ?of_kind(async_query, Trace0), Trace = ?of_kind(async_query, Trace0),
@ -1394,6 +1413,10 @@ validate_resource_request_ttl(single, Timeout, Name) ->
validate_resource_request_ttl(_Cluster, _Timeout, _Name) -> validate_resource_request_ttl(_Cluster, _Timeout, _Name) ->
ignore. ignore.
do_send_message(BridgeV1Type, Name, Message) ->
Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
emqx_bridge_v2:send_message(Type, Name, Message, #{}).
%% %%
request(Method, URL, Config) -> request(Method, URL, Config) ->

View File

@ -21,7 +21,7 @@ empty_config_test() ->
Conf1 = #{<<"bridges">> => #{}}, Conf1 = #{<<"bridges">> => #{}},
Conf2 = #{<<"bridges">> => #{<<"webhook">> => #{}}}, Conf2 = #{<<"bridges">> => #{<<"webhook">> => #{}}},
?assertEqual(Conf1, check(Conf1)), ?assertEqual(Conf1, check(Conf1)),
?assertEqual(Conf2, check(Conf2)), ?assertEqual(#{<<"bridges">> => #{<<"http">> => #{}}}, check(Conf2)),
ok. ok.
%% ensure webhook config can be checked %% ensure webhook config can be checked
@ -33,7 +33,7 @@ webhook_config_test() ->
?assertMatch( ?assertMatch(
#{ #{
<<"bridges">> := #{ <<"bridges">> := #{
<<"webhook">> := #{ <<"http">> := #{
<<"the_name">> := <<"the_name">> :=
#{ #{
<<"method">> := get, <<"method">> := get,
@ -48,7 +48,7 @@ webhook_config_test() ->
?assertMatch( ?assertMatch(
#{ #{
<<"bridges">> := #{ <<"bridges">> := #{
<<"webhook">> := #{ <<"http">> := #{
<<"the_name">> := <<"the_name">> :=
#{ #{
<<"method">> := get, <<"method">> := get,
@ -61,7 +61,7 @@ webhook_config_test() ->
), ),
#{ #{
<<"bridges">> := #{ <<"bridges">> := #{
<<"webhook">> := #{ <<"http">> := #{
<<"the_name">> := <<"the_name">> :=
#{ #{
<<"method">> := get, <<"method">> := get,
@ -84,7 +84,7 @@ up(#{<<"mqtt">> := MqttBridges0} = Bridges) ->
Bridges#{<<"mqtt">> := MqttBridges}; Bridges#{<<"mqtt">> := MqttBridges};
up(#{<<"webhook">> := WebhookBridges0} = Bridges) -> up(#{<<"webhook">> := WebhookBridges0} = Bridges) ->
WebhookBridges = emqx_bridge_compatible_config:upgrade_pre_ee( WebhookBridges = emqx_bridge_compatible_config:upgrade_pre_ee(
WebhookBridges0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 WebhookBridges0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1
), ),
Bridges#{<<"webhook">> := WebhookBridges}. Bridges#{<<"webhook">> := WebhookBridges}.

View File

@ -92,7 +92,7 @@ end_per_testcase(_Testcase, Config) ->
delete_all_bridges() -> delete_all_bridges() ->
lists:foreach( lists:foreach(
fun(#{name := Name, type := Type}) -> fun(#{name := Name, type := Type}) ->
emqx_bridge:remove(Type, Name) ok = emqx_bridge:remove(Type, Name)
end, end,
emqx_bridge:list() emqx_bridge:list()
). ).

View File

@ -3,7 +3,7 @@
{vsn, "0.1.5"}, {vsn, "0.1.5"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]}, {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]},
{env, []}, {env, [{emqx_action_info_modules, [emqx_bridge_http_action_info]}]},
{modules, []}, {modules, []},
{links, []} {links, []}
]}. ]}.

View File

@ -0,0 +1,102 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_http_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0,
connector_action_config_to_bridge_v1_config/2,
bridge_v1_config_to_action_config/2,
bridge_v1_config_to_connector_config/1
]).
-define(REMOVED_KEYS, [<<"direction">>]).
-define(ACTION_KEYS, [<<"local_topic">>, <<"resource_opts">>]).
-define(PARAMETER_KEYS, [<<"body">>, <<"max_retries">>, <<"method">>, <<"request_timeout">>]).
bridge_v1_type_name() -> webhook.
action_type_name() -> http.
connector_type_name() -> http.
schema_module() -> emqx_bridge_http_schema.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig),
%% Move parameters to the top level
ParametersMap1 = maps:get(<<"parameters">>, BridgeV1Config1, #{}),
ParametersMap2 = maps:without([<<"path">>, <<"headers">>], ParametersMap1),
BridgeV1Config2 = maps:remove(<<"parameters">>, BridgeV1Config1),
BridgeV1Config3 = emqx_utils_maps:deep_merge(BridgeV1Config2, ParametersMap2),
BridgeV1Config4 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config3),
Url = maps:get(<<"url">>, ConnectorConfig),
Path = maps:get(<<"path">>, ParametersMap1, <<>>),
Headers1 = maps:get(<<"headers">>, ConnectorConfig, #{}),
Headers2 = maps:get(<<"headers">>, ParametersMap1, #{}),
Url1 =
case Path of
<<>> -> Url;
_ -> emqx_bridge_http_connector:join_paths(Url, Path)
end,
BridgeV1Config4#{
<<"headers">> => maps:merge(Headers1, Headers2),
<<"url">> => Url1
}.
bridge_v1_config_to_connector_config(BridgeV1Conf) ->
%% To statisfy the emqx_bridge_api_SUITE:t_http_crud_apis/1
ok = validate_webhook_url(maps:get(<<"url">>, BridgeV1Conf, undefined)),
maps:without(?REMOVED_KEYS ++ ?ACTION_KEYS ++ ?PARAMETER_KEYS, BridgeV1Conf).
bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
Parameters = maps:with(?PARAMETER_KEYS, BridgeV1Conf),
Parameters1 = Parameters#{<<"path">> => <<>>},
CommonKeys = [<<"enable">>, <<"description">>],
ActionConfig = maps:with(?ACTION_KEYS ++ CommonKeys, BridgeV1Conf),
ActionConfig#{<<"parameters">> => Parameters1, <<"connector">> => ConnectorName}.
%%--------------------------------------------------------------------
%% helpers
validate_webhook_url(undefined) ->
throw(#{
kind => validation_error,
reason => required_field,
required_field => <<"url">>
});
validate_webhook_url(Url) ->
{BaseUrl, _Path} = emqx_connector_resource:parse_url(Url),
case emqx_http_lib:uri_parse(BaseUrl) of
{ok, _} ->
ok;
{error, Reason} ->
throw(#{
kind => validation_error,
reason => invalid_url,
url => Url,
error => emqx_utils:readable_error_msg(Reason)
})
end.

View File

@ -31,9 +31,14 @@
on_query/3, on_query/3,
on_query_async/4, on_query_async/4,
on_get_status/2, on_get_status/2,
reply_delegator/3 on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_get_channel_status/3
]). ]).
-export([reply_delegator/3]).
-export([ -export([
roots/0, roots/0,
fields/1, fields/1,
@ -41,7 +46,7 @@
namespace/0 namespace/0
]). ]).
%% for other webhook-like connectors. %% for other http-like connectors.
-export([redact_request/1]). -export([redact_request/1]).
-export([validate_method/1, join_paths/2]). -export([validate_method/1, join_paths/2]).
@ -251,6 +256,21 @@ start_pool(PoolName, PoolOpts) ->
Error Error
end. end.
on_add_channel(
_InstId,
OldState,
ActionId,
ActionConfig
) ->
InstalledActions = maps:get(installed_actions, OldState, #{}),
{ok, ActionState} = do_create_http_action(ActionConfig),
NewInstalledActions = maps:put(ActionId, ActionState, InstalledActions),
NewState = maps:put(installed_actions, NewInstalledActions, OldState),
{ok, NewState}.
do_create_http_action(_ActionConfig = #{parameters := Params}) ->
{ok, preprocess_request(Params)}.
on_stop(InstId, _State) -> on_stop(InstId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_http_connector", msg => "stopping_http_connector",
@ -260,6 +280,16 @@ on_stop(InstId, _State) ->
?tp(emqx_connector_http_stopped, #{instance_id => InstId}), ?tp(emqx_connector_http_stopped, #{instance_id => InstId}),
Res. Res.
on_remove_channel(
_InstId,
OldState = #{installed_actions := InstalledActions},
ActionId
) ->
NewInstalledActions = maps:remove(ActionId, InstalledActions),
NewState = maps:put(installed_actions, NewInstalledActions, OldState),
{ok, NewState}.
%% BridgeV1 entrypoint
on_query(InstId, {send_message, Msg}, State) -> on_query(InstId, {send_message, Msg}, State) ->
case maps:get(request, State, undefined) of case maps:get(request, State, undefined) of
undefined -> undefined ->
@ -282,6 +312,36 @@ on_query(InstId, {send_message, Msg}, State) ->
State State
) )
end; end;
%% BridgeV2 entrypoint
on_query(
InstId,
{ActionId, Msg},
State = #{installed_actions := InstalledActions}
) when is_binary(ActionId) ->
case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of
{undefined, _} ->
?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
{error, arg_request_not_found};
{_, undefined} ->
?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}),
{error, action_not_found};
{Request, ActionState} ->
#{
method := Method,
path := Path,
body := Body,
headers := Headers,
request_timeout := Timeout
} = process_request_and_action(Request, ActionState, Msg),
%% bridge buffer worker has retry, do not let ehttpc retry
Retry = 2,
ClientId = maps:get(clientid, Msg, undefined),
on_query(
InstId,
{ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
State
)
end;
on_query(InstId, {Method, Request}, State) -> on_query(InstId, {Method, Request}, State) ->
%% TODO: Get retry from State %% TODO: Get retry from State
on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State); on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State);
@ -343,6 +403,7 @@ on_query(
Result Result
end. end.
%% BridgeV1 entrypoint
on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
case maps:get(request, State, undefined) of case maps:get(request, State, undefined) of
undefined -> undefined ->
@ -364,6 +425,36 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
State State
) )
end; end;
%% BridgeV2 entrypoint
on_query_async(
InstId,
{ActionId, Msg},
ReplyFunAndArgs,
State = #{installed_actions := InstalledActions}
) when is_binary(ActionId) ->
case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of
{undefined, _} ->
?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
{error, arg_request_not_found};
{_, undefined} ->
?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}),
{error, action_not_found};
{Request, ActionState} ->
#{
method := Method,
path := Path,
body := Body,
headers := Headers,
request_timeout := Timeout
} = process_request_and_action(Request, ActionState, Msg),
ClientId = maps:get(clientid, Msg, undefined),
on_query_async(
InstId,
{ClientId, Method, {Path, Headers, Body}, Timeout},
ReplyFunAndArgs,
State
)
end;
on_query_async( on_query_async(
InstId, InstId,
{KeyOrNum, Method, Request, Timeout}, {KeyOrNum, Method, Request, Timeout},
@ -411,6 +502,9 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->
ehttpc_pool:pick_worker(PoolName, Key) ehttpc_pool:pick_worker(PoolName, Key)
end. end.
on_get_channels(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId).
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(PoolName, Timeout) of case do_get_status(PoolName, Timeout) of
ok -> ok ->
@ -456,6 +550,14 @@ do_get_status(PoolName, Timeout) ->
{error, timeout} {error, timeout}
end. end.
on_get_channel_status(
InstId,
_ChannelId,
State
) ->
%% XXX: Reuse the connector status
on_get_status(InstId, State).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -466,10 +568,10 @@ preprocess_request(Req) when map_size(Req) == 0 ->
preprocess_request( preprocess_request(
#{ #{
method := Method, method := Method,
path := Path, path := Path
headers := Headers
} = Req } = Req
) -> ) ->
Headers = maps:get(headers, Req, []),
#{ #{
method => parse_template(to_bin(Method)), method => parse_template(to_bin(Method)),
path => parse_template(Path), path => parse_template(Path),
@ -529,6 +631,49 @@ maybe_parse_template(Key, Conf) ->
parse_template(String) -> parse_template(String) ->
emqx_template:parse(String). emqx_template:parse(String).
process_request_and_action(Request, ActionState, Msg) ->
MethodTemplate = maps:get(method, ActionState),
Method = make_method(render_template_string(MethodTemplate, Msg)),
BodyTemplate = maps:get(body, ActionState),
Body = render_request_body(BodyTemplate, Msg),
PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)),
PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)),
Path =
case PathSuffix of
"" -> PathPrefix;
_ -> join_paths(PathPrefix, PathSuffix)
end,
HeadersTemplate1 = maps:get(headers, Request),
HeadersTemplate2 = maps:get(headers, ActionState),
Headers = merge_proplist(
render_headers(HeadersTemplate1, Msg),
render_headers(HeadersTemplate2, Msg)
),
#{
method => Method,
path => Path,
body => Body,
headers => Headers,
request_timeout => maps:get(request_timeout, ActionState)
}.
merge_proplist(Proplist1, Proplist2) ->
lists:foldl(
fun({K, V}, Acc) ->
case lists:keyfind(K, 1, Acc) of
false ->
[{K, V} | Acc];
{K, _} = {K, V1} ->
[{K, V1} | Acc]
end
end,
Proplist2,
Proplist1
).
process_request( process_request(
#{ #{
method := MethodTemplate, method := MethodTemplate,
@ -691,7 +836,7 @@ maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
true -> Context; true -> Context;
false -> Context#{attempt := Attempt + 1} false -> Context#{attempt := Attempt + 1}
end, end,
?tp(webhook_will_retry_async, #{}), ?tp(http_will_retry_async, #{}),
Worker = resolve_pool_worker(State, KeyOrNum), Worker = resolve_pool_worker(State, KeyOrNum),
ok = ehttpc:request_async( ok = ehttpc:request_async(
Worker, Worker,

View File

@ -18,69 +18,162 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
-export([roots/0, fields/1, namespace/0, desc/1]). -export([roots/0, fields/1, namespace/0, desc/1]).
-export([
bridge_v2_examples/1,
%%conn_bridge_examples/1,
connector_examples/1
]).
%%====================================================================================== %%======================================================================================
%% Hocon Schema Definitions %% Hocon Schema Definitions
namespace() -> "bridge_webhook".
namespace() -> "bridge_http".
roots() -> []. roots() -> [].
fields("config") -> %%--------------------------------------------------------------------
basic_config() ++ request_config(); %% v1 bridges http api
%% see: emqx_bridge_schema:get_response/0, put_request/0, post_request/0
fields("post") -> fields("post") ->
[ [
type_field(), old_type_field(),
name_field() name_field()
] ++ fields("config"); ] ++ fields("config");
fields("put") -> fields("put") ->
fields("config"); fields("config");
fields("get") -> fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("post"); emqx_bridge_schema:status_fields() ++ fields("post");
fields("creation_opts") -> %%--- v1 bridges config file
%% see: emqx_bridge_schema:fields(bridges)
fields("config") ->
basic_config() ++ request_config();
%%--------------------------------------------------------------------
%% v2: configuration
fields(action) ->
{http,
mk(
hoconsc:map(name, ref(?MODULE, "http_action")),
#{
aliases => [webhook],
desc => <<"HTTP Action Config">>,
required => false
}
)};
fields("http_action") ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable_bridge"), default => true})},
{connector,
mk(binary(), #{
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
})},
{description, emqx_schema:description_schema()},
%% Note: there's an implicit convention in `emqx_bridge' that,
%% for egress bridges with this config, the published messages
%% will be forwarded to such bridges.
{local_topic,
mk(
binary(),
#{
required => false,
desc => ?DESC("config_local_topic"),
importance => ?IMPORTANCE_HIDDEN
}
)},
%% Since e5.3.2, we split the http bridge to two parts: a) connector. b) actions.
%% some fields are moved to connector, some fields are moved to actions and composed into the
%% `parameters` field.
{parameters,
mk(ref("parameters_opts"), #{
required => true,
desc => ?DESC("config_parameters_opts")
})}
] ++ http_resource_opts();
fields("parameters_opts") ->
[
{path,
mk(
binary(),
#{
desc => ?DESC("config_path"),
required => false
}
)},
method_field(),
headers_field(),
body_field(),
max_retries_field(),
request_timeout_field()
];
%% v2: api schema
%% The parameter equls to
%% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1
%% `get_connector`, `post_connector`, `put_connector` from emqx_connector_schema:api_schema/1
fields("post_" ++ Type) ->
[type_field(), name_field() | fields("config_" ++ Type)];
fields("put_" ++ Type) ->
fields("config_" ++ Type);
fields("get_" ++ Type) ->
emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type);
fields("config_bridge_v2") ->
fields("http_action");
fields("config_connector") ->
[
{enable,
mk(
boolean(),
#{
desc => <<"Enable or disable this connector">>,
default => true
}
)},
{description, emqx_schema:description_schema()}
] ++ connector_url_headers() ++ connector_opts();
%%--------------------------------------------------------------------
%% v1/v2
fields("resource_opts") ->
UnsupportedOpts = [enable_batch, batch_size, batch_time],
lists:filter( lists:filter(
fun({K, _V}) -> fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
not lists:member(K, unsupported_opts())
end,
emqx_resource_schema:fields("creation_opts") emqx_resource_schema:fields("creation_opts")
). ).
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc("creation_opts") -> desc("resource_opts") ->
?DESC(emqx_resource_schema, "creation_opts"); ?DESC(emqx_resource_schema, "creation_opts");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for WebHook using `", string:to_upper(Method), "` method."]; ["Configuration for WebHook using `", string:to_upper(Method), "` method."];
desc("config_connector") ->
?DESC("desc_config");
desc("http_action") ->
?DESC("desc_config");
desc("parameters_opts") ->
?DESC("config_parameters_opts");
desc(_) -> desc(_) ->
undefined. undefined.
%%--------------------------------------------------------------------
%% helpers for v1 only
basic_config() -> basic_config() ->
[ [
{enable, {enable,
mk( mk(
boolean(), boolean(),
#{ #{
desc => ?DESC("config_enable"), desc => ?DESC("config_enable_bridge"),
default => true default => true
} }
)} )}
] ++ webhook_creation_opts() ++ ] ++ http_resource_opts() ++ connector_opts().
proplists:delete(
max_retries, emqx_bridge_http_connector:fields(config)
).
request_config() -> request_config() ->
[ [
{url, url_field(),
mk(
binary(),
#{
required => true,
desc => ?DESC("config_url")
}
)},
{direction, {direction,
mk( mk(
egress, egress,
@ -98,81 +191,37 @@ request_config() ->
required => false required => false
} }
)}, )},
{method, method_field(),
mk( headers_field(),
method(), body_field(),
#{ max_retries_field(),
default => post, request_timeout_field()
desc => ?DESC("config_method")
}
)},
{headers,
mk(
map(),
#{
default => #{
<<"accept">> => <<"application/json">>,
<<"cache-control">> => <<"no-cache">>,
<<"connection">> => <<"keep-alive">>,
<<"content-type">> => <<"application/json">>,
<<"keep-alive">> => <<"timeout=5">>
},
desc => ?DESC("config_headers")
}
)},
{body,
mk(
binary(),
#{
default => undefined,
desc => ?DESC("config_body")
}
)},
{max_retries,
mk(
non_neg_integer(),
#{
default => 2,
desc => ?DESC("config_max_retries")
}
)},
{request_timeout,
mk(
emqx_schema:duration_ms(),
#{
default => <<"15s">>,
deprecated => {since, "v5.0.26"},
desc => ?DESC("config_request_timeout")
}
)}
]. ].
webhook_creation_opts() -> %%--------------------------------------------------------------------
[ %% helpers for v2 only
{resource_opts,
mk(
ref(?MODULE, "creation_opts"),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)}
].
unsupported_opts() -> connector_url_headers() ->
[ [url_field(), headers_field()].
enable_batch,
batch_size,
batch_time
].
%%====================================================================================== %%--------------------------------------------------------------------
%% common funcs
%% `webhook` is kept for backward compatibility.
old_type_field() ->
{type,
mk(
enum([webhook, http]),
#{
required => true,
desc => ?DESC("desc_type")
}
)}.
type_field() -> type_field() ->
{type, {type,
mk( mk(
webhook, http,
#{ #{
required => true, required => true,
desc => ?DESC("desc_type") desc => ?DESC("desc_type")
@ -189,5 +238,189 @@ name_field() ->
} }
)}. )}.
method() -> url_field() ->
enum([post, put, get, delete]). {url,
mk(
binary(),
#{
required => true,
desc => ?DESC("config_url")
}
)}.
headers_field() ->
{headers,
mk(
map(),
#{
default => #{
<<"accept">> => <<"application/json">>,
<<"cache-control">> => <<"no-cache">>,
<<"connection">> => <<"keep-alive">>,
<<"content-type">> => <<"application/json">>,
<<"keep-alive">> => <<"timeout=5">>
},
desc => ?DESC("config_headers")
}
)}.
method_field() ->
{method,
mk(
enum([post, put, get, delete]),
#{
default => post,
desc => ?DESC("config_method")
}
)}.
body_field() ->
{body,
mk(
binary(),
#{
default => undefined,
desc => ?DESC("config_body")
}
)}.
max_retries_field() ->
{max_retries,
mk(
non_neg_integer(),
#{
default => 2,
desc => ?DESC("config_max_retries")
}
)}.
request_timeout_field() ->
{request_timeout,
mk(
emqx_schema:duration_ms(),
#{
default => <<"15s">>,
deprecated => {since, "v5.0.26"},
desc => ?DESC("config_request_timeout")
}
)}.
http_resource_opts() ->
[
{resource_opts,
mk(
ref(?MODULE, "resource_opts"),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)}
].
connector_opts() ->
mark_request_field_deperecated(
proplists:delete(max_retries, emqx_bridge_http_connector:fields(config))
).
mark_request_field_deperecated(Fields) ->
lists:map(
fun({K, V}) ->
case K of
request ->
{K, V#{
%% Note: if we want to deprecate a reference type, we have to change
%% it to a direct type first.
type => typerefl:map(),
deprecated => {since, "5.3.2"},
desc => <<"This field is never used, so we deprecated it since 5.3.2.">>
}};
_ ->
{K, V}
end
end,
Fields
).
%%--------------------------------------------------------------------
%% Examples
bridge_v2_examples(Method) ->
[
#{
<<"http">> => #{
summary => <<"HTTP Action">>,
value => values({Method, bridge_v2})
}
}
].
connector_examples(Method) ->
[
#{
<<"http">> => #{
summary => <<"HTTP Connector">>,
value => values({Method, connector})
}
}
].
values({get, Type}) ->
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
},
values({post, Type})
);
values({post, bridge_v2}) ->
maps:merge(
#{
name => <<"my_http_action">>,
type => <<"http">>
},
values({put, bridge_v2})
);
values({post, connector}) ->
maps:merge(
#{
name => <<"my_http_connector">>,
type => <<"http">>
},
values({put, connector})
);
values({put, bridge_v2}) ->
values(bridge_v2);
values({put, connector}) ->
values(connector);
values(bridge_v2) ->
#{
enable => true,
connector => <<"my_http_connector">>,
parameters => #{
path => <<"/room/${room_no}">>,
method => <<"post">>,
headers => #{},
body => <<"${.}">>
},
resource_opts => #{
worker_pool_size => 16,
health_check_interval => <<"15s">>,
query_mode => <<"async">>
}
};
values(connector) ->
#{
enable => true,
url => <<"http://localhost:8080/api/v1">>,
headers => #{<<"content-type">> => <<"application/json">>},
connect_timeout => <<"15s">>,
pool_type => <<"hash">>,
pool_size => 1,
enable_pipelining => 100
}.

View File

@ -39,18 +39,33 @@ all() ->
groups() -> groups() ->
[]. [].
init_per_suite(_Config) -> init_per_suite(Config0) ->
emqx_common_test_helpers:render_and_load_app_config(emqx_conf), Config =
ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge, emqx_rule_engine]), case os:getenv("DEBUG_CASE") of
ok = emqx_connector_test_helpers:start_apps([emqx_resource]), [_ | _] = DebugCase ->
{ok, _} = application:ensure_all_started(emqx_connector), CaseName = list_to_atom(DebugCase),
[]. [{debug_case, CaseName} | Config0];
_ ->
Config0
end,
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_connector,
emqx_bridge_http,
emqx_bridge,
emqx_rule_engine
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
emqx_mgmt_api_test_util:init_suite(),
[{apps, Apps} | Config].
end_per_suite(_Config) -> end_per_suite(Config) ->
ok = emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge, emqx_conf]), Apps = ?config(apps, Config),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), emqx_mgmt_api_test_util:end_suite(),
_ = application:stop(emqx_connector), ok = emqx_cth_suite:stop(Apps),
_ = application:stop(emqx_bridge),
ok. ok.
suite() -> suite() ->
@ -115,7 +130,9 @@ end_per_testcase(TestCase, _Config) when
-> ->
ok = emqx_bridge_http_connector_test_server:stop(), ok = emqx_bridge_http_connector_test_server:stop(),
persistent_term:erase({?MODULE, times_called}), persistent_term:erase({?MODULE, times_called}),
emqx_bridge_testlib:delete_all_bridges(), %emqx_bridge_testlib:delete_all_bridges(),
emqx_bridge_v2_testlib:delete_all_bridges(),
emqx_bridge_v2_testlib:delete_all_connectors(),
emqx_common_test_helpers:call_janitor(), emqx_common_test_helpers:call_janitor(),
ok; ok;
end_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, Config) ->
@ -123,7 +140,8 @@ end_per_testcase(_TestCase, Config) ->
undefined -> ok; undefined -> ok;
Server -> stop_http_server(Server) Server -> stop_http_server(Server)
end, end,
emqx_bridge_testlib:delete_all_bridges(), emqx_bridge_v2_testlib:delete_all_bridges(),
emqx_bridge_v2_testlib:delete_all_connectors(),
emqx_common_test_helpers:call_janitor(), emqx_common_test_helpers:call_janitor(),
ok. ok.
@ -420,7 +438,7 @@ t_send_async_connection_timeout(Config) ->
), ),
NumberOfMessagesToSend = 10, NumberOfMessagesToSend = 10,
[ [
emqx_bridge:send_message(BridgeID, #{<<"id">> => Id}) do_send_message(#{<<"id">> => Id})
|| Id <- lists:seq(1, NumberOfMessagesToSend) || Id <- lists:seq(1, NumberOfMessagesToSend)
], ],
%% Make sure server receives all messages %% Make sure server receives all messages
@ -431,7 +449,7 @@ t_send_async_connection_timeout(Config) ->
t_async_free_retries(Config) -> t_async_free_retries(Config) ->
#{port := Port} = ?config(http_server, Config), #{port := Port} = ?config(http_server, Config),
BridgeID = make_bridge(#{ _BridgeID = make_bridge(#{
port => Port, port => Port,
pool_size => 1, pool_size => 1,
query_mode => "sync", query_mode => "sync",
@ -445,7 +463,7 @@ t_async_free_retries(Config) ->
Fn = fun(Get, Error) -> Fn = fun(Get, Error) ->
?assertMatch( ?assertMatch(
{ok, 200, _, _}, {ok, 200, _, _},
emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}), do_send_message(#{<<"hello">> => <<"world">>}),
#{error => Error} #{error => Error}
), ),
?assertEqual(ExpectedAttempts, Get(), #{error => Error}) ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
@ -456,7 +474,7 @@ t_async_free_retries(Config) ->
t_async_common_retries(Config) -> t_async_common_retries(Config) ->
#{port := Port} = ?config(http_server, Config), #{port := Port} = ?config(http_server, Config),
BridgeID = make_bridge(#{ _BridgeID = make_bridge(#{
port => Port, port => Port,
pool_size => 1, pool_size => 1,
query_mode => "sync", query_mode => "sync",
@ -471,7 +489,7 @@ t_async_common_retries(Config) ->
FnSucceed = fun(Get, Error) -> FnSucceed = fun(Get, Error) ->
?assertMatch( ?assertMatch(
{ok, 200, _, _}, {ok, 200, _, _},
emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}), do_send_message(#{<<"hello">> => <<"world">>}),
#{error => Error, attempts => Get()} #{error => Error, attempts => Get()}
), ),
?assertEqual(ExpectedAttempts, Get(), #{error => Error}) ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
@ -479,7 +497,7 @@ t_async_common_retries(Config) ->
FnFail = fun(Get, Error) -> FnFail = fun(Get, Error) ->
?assertMatch( ?assertMatch(
Error, Error,
emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}), do_send_message(#{<<"hello">> => <<"world">>}),
#{error => Error, attempts => Get()} #{error => Error, attempts => Get()}
), ),
?assertEqual(ExpectedAttempts, Get(), #{error => Error}) ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
@ -559,7 +577,7 @@ t_path_not_found(Config) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
?assertEqual([], ?of_kind(webhook_will_retry_async, Trace)), ?assertEqual([], ?of_kind(http_will_retry_async, Trace)),
ok ok
end end
), ),
@ -600,7 +618,7 @@ t_too_many_requests(Config) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
?assertMatch([_ | _], ?of_kind(webhook_will_retry_async, Trace)), ?assertMatch([_ | _], ?of_kind(http_will_retry_async, Trace)),
ok ok
end end
), ),
@ -711,6 +729,11 @@ t_bridge_probes_header_atoms(Config) ->
ok. ok.
%% helpers %% helpers
do_send_message(Message) ->
Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(?BRIDGE_TYPE),
emqx_bridge_v2:send_message(Type, ?BRIDGE_NAME, Message, #{}).
do_t_async_retries(TestCase, TestContext, Error, Fn) -> do_t_async_retries(TestCase, TestContext, Error, Fn) ->
#{error_attempts := ErrorAttempts} = TestContext, #{error_attempts := ErrorAttempts} = TestContext,
PTKey = {?MODULE, TestCase, attempts}, PTKey = {?MODULE, TestCase, attempts},

View File

@ -137,7 +137,7 @@ param_path_id() ->
#{ #{
in => path, in => path,
required => true, required => true,
example => <<"webhook:webhook_example">>, example => <<"http:my_http_connector">>,
desc => ?DESC("desc_param_path_id") desc => ?DESC("desc_param_path_id")
} }
)}. )}.
@ -158,17 +158,7 @@ connector_info_array_example(Method) ->
lists:map(fun(#{value := Config}) -> Config end, maps:values(connector_info_examples(Method))). lists:map(fun(#{value := Config}) -> Config end, maps:values(connector_info_examples(Method))).
connector_info_examples(Method) -> connector_info_examples(Method) ->
maps:merge( emqx_connector_schema:examples(Method).
#{},
emqx_enterprise_connector_examples(Method)
).
-if(?EMQX_RELEASE_EDITION == ee).
emqx_enterprise_connector_examples(Method) ->
emqx_connector_ee_schema:examples(Method).
-else.
emqx_enterprise_connector_examples(_Method) -> #{}.
-endif.
schema("/connectors") -> schema("/connectors") ->
#{ #{

View File

@ -49,6 +49,8 @@
get_channels/2 get_channels/2
]). ]).
-export([parse_url/1]).
-callback connector_config(ParsedConfig) -> -callback connector_config(ParsedConfig) ->
ParsedConfig ParsedConfig
when when
@ -77,8 +79,10 @@ connector_impl_module(_ConnectorType) ->
-endif. -endif.
connector_to_resource_type_ce(_ConnectorType) -> connector_to_resource_type_ce(http) ->
no_bridge_v2_for_c2_so_far. emqx_bridge_http_connector;
connector_to_resource_type_ce(ConnectorType) ->
error({no_bridge_v2, ConnectorType}).
resource_id(ConnectorId) when is_binary(ConnectorId) -> resource_id(ConnectorId) when is_binary(ConnectorId) ->
<<"connector:", ConnectorId/binary>>. <<"connector:", ConnectorId/binary>>.
@ -271,13 +275,11 @@ remove(Type, Name, _Conf, _Opts) ->
%% convert connector configs to what the connector modules want %% convert connector configs to what the connector modules want
parse_confs( parse_confs(
<<"webhook">>, <<"http">>,
_Name, _Name,
#{ #{
url := Url, url := Url,
method := Method, headers := Headers
headers := Headers,
max_retries := Retry
} = Conf } = Conf
) -> ) ->
Url1 = bin(Url), Url1 = bin(Url),
@ -290,20 +292,14 @@ parse_confs(
Reason1 = emqx_utils:readable_error_msg(Reason), Reason1 = emqx_utils:readable_error_msg(Reason),
invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>) invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
end, end,
RequestTTL = emqx_utils_maps:deep_get(
[resource_opts, request_ttl],
Conf
),
Conf#{ Conf#{
base_url => BaseUrl1, base_url => BaseUrl1,
request => request =>
#{ #{
path => Path, path => Path,
method => Method,
body => maps:get(body, Conf, undefined),
headers => Headers, headers => Headers,
request_ttl => RequestTTL, body => undefined,
max_retries => Retry method => undefined
} }
}; };
parse_confs(<<"iotdb">>, Name, Conf) -> parse_confs(<<"iotdb">>, Name, Conf) ->

View File

@ -15,7 +15,8 @@
-export([ -export([
api_schemas/1, api_schemas/1,
fields/1, fields/1,
examples/1 %%examples/1
schema_modules/0
]). ]).
resource_type(Type) when is_binary(Type) -> resource_type(Type) when is_binary(Type) ->
@ -59,18 +60,6 @@ connector_structs() ->
)} )}
]. ].
examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)
end,
Fun =
fun(Module, Examples) ->
ConnectorExamples = erlang:apply(Module, connector_examples, [Method]),
lists:foldl(MergeFun, Examples, ConnectorExamples)
end,
lists:foldl(Fun, #{}, schema_modules()).
schema_modules() -> schema_modules() ->
[ [
emqx_bridge_kafka, emqx_bridge_kafka,

View File

@ -35,6 +35,8 @@
-export([resource_opts_fields/0, resource_opts_fields/1]). -export([resource_opts_fields/0, resource_opts_fields/1]).
-export([examples/1]).
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
enterprise_api_schemas(Method) -> enterprise_api_schemas(Method) ->
%% We *must* do this to ensure the module is really loaded, especially when we use %% We *must* do this to ensure the module is really loaded, especially when we use
@ -64,6 +66,37 @@ enterprise_fields_connectors() -> [].
-endif. -endif.
api_schemas(Method) ->
[
%% We need to map the `type' field of a request (binary) to a
%% connector schema module.
api_ref(emqx_bridge_http_schema, <<"http">>, Method ++ "_connector")
].
api_ref(Module, Type, Method) ->
{Type, ref(Module, Method)}.
examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)
end,
Fun =
fun(Module, Examples) ->
ConnectorExamples = erlang:apply(Module, connector_examples, [Method]),
lists:foldl(MergeFun, Examples, ConnectorExamples)
end,
lists:foldl(Fun, #{}, schema_modules()).
-if(?EMQX_RELEASE_EDITION == ee).
schema_modules() ->
[emqx_bridge_http_schema] ++ emqx_connector_ee_schema:schema_modules().
-else.
schema_modules() ->
[emqx_bridge_http_schema].
-endif.
connector_type_to_bridge_types(http) -> [http, webhook];
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]. connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer].
@ -298,8 +331,9 @@ post_request() ->
api_schema("post"). api_schema("post").
api_schema(Method) -> api_schema(Method) ->
CE = api_schemas(Method),
EE = enterprise_api_schemas(Method), EE = enterprise_api_schemas(Method),
hoconsc:union(connector_api_union(EE)). hoconsc:union(connector_api_union(CE ++ EE)).
connector_api_union(Refs) -> connector_api_union(Refs) ->
Index = maps:from_list(Refs), Index = maps:from_list(Refs),
@ -344,7 +378,17 @@ roots() ->
end. end.
fields(connectors) -> fields(connectors) ->
[] ++ enterprise_fields_connectors(). [
{http,
mk(
hoconsc:map(name, ref(emqx_bridge_http_schema, "config_connector")),
#{
alias => [webhook],
desc => <<"HTTP Connector Config">>,
required => false
}
)}
] ++ enterprise_fields_connectors().
desc(connectors) -> desc(connectors) ->
?DESC("desc_connectors"); ?DESC("desc_connectors");

View File

@ -452,6 +452,7 @@ apps_to_start() ->
emqx_modules, emqx_modules,
emqx_gateway, emqx_gateway,
emqx_exhook, emqx_exhook,
emqx_bridge_http,
emqx_bridge, emqx_bridge,
emqx_auto_subscribe, emqx_auto_subscribe,

View File

@ -80,7 +80,7 @@ worker_pool_size_test_() ->
Conf = emqx_utils_maps:deep_put( Conf = emqx_utils_maps:deep_put(
[ [
<<"bridges">>, <<"bridges">>,
<<"webhook">>, <<"http">>,
<<"simple">>, <<"simple">>,
<<"resource_opts">>, <<"resource_opts">>,
<<"worker_pool_size">> <<"worker_pool_size">>
@ -88,7 +88,7 @@ worker_pool_size_test_() ->
BaseConf, BaseConf,
WorkerPoolSize WorkerPoolSize
), ),
#{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := CheckedConf}}} = check(Conf), #{<<"bridges">> := #{<<"http">> := #{<<"simple">> := CheckedConf}}} = check(Conf),
#{<<"resource_opts">> := #{<<"worker_pool_size">> := WPS}} = CheckedConf, #{<<"resource_opts">> := #{<<"worker_pool_size">> := WPS}} = CheckedConf,
WPS WPS
end, end,
@ -117,7 +117,7 @@ worker_pool_size_test_() ->
%%=========================================================================== %%===========================================================================
parse_and_check_webhook_bridge(Hocon) -> parse_and_check_webhook_bridge(Hocon) ->
#{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)), #{<<"bridges">> := #{<<"http">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)),
Conf. Conf.
parse(Hocon) -> parse(Hocon) ->

View File

@ -583,10 +583,18 @@ get_referenced_hookpoints(Froms) ->
]. ].
get_egress_bridges(Actions) -> get_egress_bridges(Actions) ->
[ lists:foldr(
emqx_bridge_resource:bridge_id(BridgeType, BridgeName) fun
|| {bridge, BridgeType, BridgeName, _ResId} <- Actions ({bridge, BridgeType, BridgeName, _ResId}, Acc) ->
]. [emqx_bridge_resource:bridge_id(BridgeType, BridgeName) | Acc];
({bridge_v2, BridgeType, BridgeName}, Acc) ->
[emqx_bridge_resource:bridge_id(BridgeType, BridgeName) | Acc];
(_, Acc) ->
Acc
end,
[],
Actions
).
%% For allowing an external application to add extra "built-in" functions to the %% For allowing an external application to add extra "built-in" functions to the
%% rule engine SQL like language. The module set by %% rule engine SQL like language. The module set by

View File

@ -3468,7 +3468,7 @@ t_get_basic_usage_info_1(_Config) ->
referenced_bridges => referenced_bridges =>
#{ #{
mqtt => 1, mqtt => 1,
webhook => 3 http => 3
} }
}, },
emqx_rule_engine:get_basic_usage_info() emqx_rule_engine:get_basic_usage_info()

View File

@ -41,44 +41,32 @@ suite() ->
apps() -> apps() ->
[ [
emqx_conf, emqx_conf,
emqx_management, emqx_connector,
emqx_retainer, emqx_retainer,
emqx_auth, emqx_auth,
emqx_auth_redis, emqx_auth_redis,
emqx_auth_mnesia, emqx_auth_mnesia,
emqx_auth_postgresql, emqx_auth_postgresql,
emqx_modules, emqx_modules,
emqx_telemetry emqx_telemetry,
emqx_bridge_http,
emqx_bridge,
emqx_rule_engine,
emqx_management
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
net_kernel:start(['master@127.0.0.1', longnames]), WorkDir = ?config(priv_dir, Config),
ok = meck:new(emqx_authz_file, [non_strict, passthrough, no_history, no_link]), Apps = emqx_cth_suite:start(apps(), #{work_dir => WorkDir}),
meck:expect( emqx_mgmt_api_test_util:init_suite(),
emqx_authz_file, [{apps, Apps}, {work_dir, WorkDir} | Config].
acl_conf_file,
fun() ->
emqx_common_test_helpers:deps_path(emqx_auth, "etc/acl.conf")
end
),
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
emqx_gateway_test_utils:load_all_gateway_apps(),
start_apps(),
Config.
end_per_suite(_Config) -> end_per_suite(Config) ->
{ok, _} = emqx:update_config(
[authorization],
#{
<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []
}
),
mnesia:clear_table(cluster_rpc_commit), mnesia:clear_table(cluster_rpc_commit),
mnesia:clear_table(cluster_rpc_mfa), mnesia:clear_table(cluster_rpc_mfa),
stop_apps(), Apps = ?config(apps, Config),
meck:unload(emqx_authz_file), emqx_mgmt_api_test_util:end_suite(),
ok = emqx_cth_suite:stop(Apps),
ok. ok.
init_per_testcase(t_get_telemetry_without_memsup, Config) -> init_per_testcase(t_get_telemetry_without_memsup, Config) ->
@ -123,7 +111,6 @@ init_per_testcase(t_advanced_mqtt_features, Config) ->
mock_advanced_mqtt_features(), mock_advanced_mqtt_features(),
Config; Config;
init_per_testcase(t_authn_authz_info, Config) -> init_per_testcase(t_authn_authz_info, Config) ->
mock_httpc(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
create_authn('mqtt:global', built_in_database), create_authn('mqtt:global', built_in_database),
create_authn('tcp:default', redis), create_authn('tcp:default', redis),
@ -141,14 +128,11 @@ init_per_testcase(t_send_after_enable, Config) ->
mock_httpc(), mock_httpc(),
Config; Config;
init_per_testcase(t_rule_engine_and_data_bridge_info, Config) -> init_per_testcase(t_rule_engine_and_data_bridge_info, Config) ->
mock_httpc(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_bridge]),
ok = emqx_bridge_SUITE:setup_fake_telemetry_data(), ok = emqx_bridge_SUITE:setup_fake_telemetry_data(),
ok = setup_fake_rule_engine_data(), ok = setup_fake_rule_engine_data(),
Config; Config;
init_per_testcase(t_exhook_info, Config) -> init_per_testcase(t_exhook_info, Config) ->
mock_httpc(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
ExhookConf = ExhookConf =
#{ #{
@ -173,31 +157,8 @@ init_per_testcase(t_cluster_uuid, Config) ->
Node = start_slave(n1), Node = start_slave(n1),
[{n1, Node} | Config]; [{n1, Node} | Config];
init_per_testcase(t_uuid_restored_from_file, Config) -> init_per_testcase(t_uuid_restored_from_file, Config) ->
mock_httpc(), Config;
NodeUUID = <<"AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE">>,
ClusterUUID = <<"FFFFFFFF-GGGG-HHHH-IIII-JJJJJJJJJJJJ">>,
DataDir = emqx:data_dir(),
NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
file:delete(NodeUUIDFile),
file:delete(ClusterUUIDFile),
ok = file:write_file(NodeUUIDFile, NodeUUID),
ok = file:write_file(ClusterUUIDFile, ClusterUUID),
%% clear the UUIDs in the DB
{atomic, ok} = mria:clear_table(emqx_telemetry),
stop_apps(),
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
start_apps(),
Node = start_slave(n1),
[
{n1, Node},
{node_uuid, NodeUUID},
{cluster_uuid, ClusterUUID}
| Config
];
init_per_testcase(t_uuid_saved_to_file, Config) -> init_per_testcase(t_uuid_saved_to_file, Config) ->
mock_httpc(),
DataDir = emqx:data_dir(), DataDir = emqx:data_dir(),
NodeUUIDFile = filename:join(DataDir, "node.uuid"), NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"), ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
@ -205,7 +166,6 @@ init_per_testcase(t_uuid_saved_to_file, Config) ->
file:delete(ClusterUUIDFile), file:delete(ClusterUUIDFile),
Config; Config;
init_per_testcase(t_num_clients, Config) -> init_per_testcase(t_num_clients, Config) ->
mock_httpc(),
ok = snabbkaffe:start_trace(), ok = snabbkaffe:start_trace(),
Config; Config;
init_per_testcase(_Testcase, Config) -> init_per_testcase(_Testcase, Config) ->
@ -227,7 +187,6 @@ end_per_testcase(t_advanced_mqtt_features, _Config) ->
{atomic, ok} = mria:clear_table(emqx_delayed), {atomic, ok} = mria:clear_table(emqx_delayed),
ok; ok;
end_per_testcase(t_authn_authz_info, _Config) -> end_per_testcase(t_authn_authz_info, _Config) ->
meck:unload([httpc]),
emqx_authz:update({delete, postgresql}, #{}), emqx_authz:update({delete, postgresql}, #{}),
lists:foreach( lists:foreach(
fun(ChainName) -> fun(ChainName) ->
@ -244,19 +203,8 @@ end_per_testcase(t_enable, _Config) ->
end_per_testcase(t_send_after_enable, _Config) -> end_per_testcase(t_send_after_enable, _Config) ->
meck:unload([httpc, emqx_telemetry_config]); meck:unload([httpc, emqx_telemetry_config]);
end_per_testcase(t_rule_engine_and_data_bridge_info, _Config) -> end_per_testcase(t_rule_engine_and_data_bridge_info, _Config) ->
meck:unload(httpc),
lists:foreach(
fun(App) ->
ok = application:stop(App)
end,
[
emqx_bridge,
emqx_rule_engine
]
),
ok; ok;
end_per_testcase(t_exhook_info, _Config) -> end_per_testcase(t_exhook_info, _Config) ->
meck:unload(httpc),
emqx_exhook_demo_svr:stop(), emqx_exhook_demo_svr:stop(),
application:stop(emqx_exhook), application:stop(emqx_exhook),
ok; ok;
@ -264,21 +212,12 @@ end_per_testcase(t_cluster_uuid, Config) ->
Node = proplists:get_value(n1, Config), Node = proplists:get_value(n1, Config),
ok = stop_slave(Node); ok = stop_slave(Node);
end_per_testcase(t_num_clients, Config) -> end_per_testcase(t_num_clients, Config) ->
meck:unload([httpc]),
ok = snabbkaffe:stop(), ok = snabbkaffe:stop(),
Config; Config;
end_per_testcase(t_uuid_restored_from_file, Config) ->
Node = ?config(n1, Config),
DataDir = emqx:data_dir(),
NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
ok = file:delete(NodeUUIDFile),
ok = file:delete(ClusterUUIDFile),
meck:unload([httpc]),
ok = stop_slave(Node),
ok;
end_per_testcase(_Testcase, _Config) -> end_per_testcase(_Testcase, _Config) ->
meck:unload([httpc]), case catch meck:unload([httpc]) of
_ -> ok
end,
ok. ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -315,19 +254,34 @@ t_cluster_uuid(Config) ->
%% should attempt read UUID from file in data dir to keep UUIDs %% should attempt read UUID from file in data dir to keep UUIDs
%% unique, in the event of a database purge. %% unique, in the event of a database purge.
t_uuid_restored_from_file(Config) -> t_uuid_restored_from_file(Config) ->
ExpectedNodeUUID = ?config(node_uuid, Config), %% Stop the emqx_telemetry application first
ExpectedClusterUUID = ?config(cluster_uuid, Config), {atomic, ok} = mria:clear_table(emqx_telemetry),
application:stop(emqx_telemetry),
%% Rewrite the the uuid files
NodeUUID = <<"AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE">>,
ClusterUUID = <<"FFFFFFFF-GGGG-HHHH-IIII-JJJJJJJJJJJJ">>,
DataDir = ?config(work_dir, Config),
NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
ok = file:write_file(NodeUUIDFile, NodeUUID),
ok = file:write_file(ClusterUUIDFile, ClusterUUID),
%% Start the emqx_telemetry application again
application:start(emqx_telemetry),
%% Check the UUIDs
?assertEqual( ?assertEqual(
{ok, ExpectedNodeUUID}, {ok, NodeUUID},
emqx_telemetry:get_node_uuid() emqx_telemetry:get_node_uuid()
), ),
?assertEqual( ?assertEqual(
{ok, ExpectedClusterUUID}, {ok, ClusterUUID},
emqx_telemetry:get_cluster_uuid() emqx_telemetry:get_cluster_uuid()
), ),
ok. ok.
t_uuid_saved_to_file(_Config) -> t_uuid_saved_to_file(Config) ->
DataDir = emqx:data_dir(), DataDir = emqx:data_dir(),
NodeUUIDFile = filename:join(DataDir, "node.uuid"), NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"), ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
@ -337,9 +291,10 @@ t_uuid_saved_to_file(_Config) ->
%% clear the UUIDs in the DB %% clear the UUIDs in the DB
{atomic, ok} = mria:clear_table(emqx_telemetry), {atomic, ok} = mria:clear_table(emqx_telemetry),
stop_apps(), application:stop(emqx_telemetry),
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
start_apps(), application:start(emqx_telemetry),
{ok, NodeUUID} = emqx_telemetry:get_node_uuid(), {ok, NodeUUID} = emqx_telemetry:get_node_uuid(),
{ok, ClusterUUID} = emqx_telemetry:get_cluster_uuid(), {ok, ClusterUUID} = emqx_telemetry:get_cluster_uuid(),
?assertEqual( ?assertEqual(
@ -578,6 +533,7 @@ t_mqtt_runtime_insights(_) ->
t_rule_engine_and_data_bridge_info(_Config) -> t_rule_engine_and_data_bridge_info(_Config) ->
{ok, TelemetryData} = emqx_telemetry:get_telemetry(), {ok, TelemetryData} = emqx_telemetry:get_telemetry(),
ct:pal("telemetry data: ~p~n", [TelemetryData]),
RuleInfo = get_value(rule_engine, TelemetryData), RuleInfo = get_value(rule_engine, TelemetryData),
BridgeInfo = get_value(bridge, TelemetryData), BridgeInfo = get_value(bridge, TelemetryData),
?assertEqual( ?assertEqual(
@ -588,7 +544,7 @@ t_rule_engine_and_data_bridge_info(_Config) ->
#{ #{
data_bridge => data_bridge =>
#{ #{
webhook => #{num => 1, num_linked_by_rules => 3}, http => #{num => 1, num_linked_by_rules => 3},
mqtt => #{num => 2, num_linked_by_rules => 2} mqtt => #{num => 2, num_linked_by_rules => 2}
}, },
num_data_bridges => 3 num_data_bridges => 3
@ -811,14 +767,6 @@ setup_fake_rule_engine_data() ->
), ),
ok. ok.
set_special_configs(emqx_auth) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
{ok, _} = emqx:update_config([authorization, sources], []),
ok;
set_special_configs(_App) ->
ok.
%% for some unknown reason, gen_rpc running locally or in CI might %% for some unknown reason, gen_rpc running locally or in CI might
%% start with different `port_discovery' modes, which means that'll %% start with different `port_discovery' modes, which means that'll
%% either be listening at the port in the config (`tcp_server_port', %% either be listening at the port in the config (`tcp_server_port',
@ -887,9 +835,3 @@ leave_cluster() ->
is_official_version(V) -> is_official_version(V) ->
emqx_telemetry_config:is_official_version(V). emqx_telemetry_config:is_official_version(V).
start_apps() ->
emqx_common_test_helpers:start_apps(apps(), fun set_special_configs/1).
stop_apps() ->
emqx_common_test_helpers:stop_apps(lists:reverse(apps())).

View File

@ -18,10 +18,10 @@ config_direction.desc:
config_direction.label: config_direction.label:
"""Bridge Direction""" """Bridge Direction"""
config_enable.desc: config_enable_bridge.desc:
"""Enable or disable this bridge""" """Enable or disable this bridge"""
config_enable.label: config_enable_bridge.label:
"""Enable Or Disable Bridge""" """Enable Or Disable Bridge"""
config_headers.desc: config_headers.desc:
@ -71,6 +71,21 @@ is not allowed."""
config_url.label: config_url.label:
"""HTTP Bridge""" """HTTP Bridge"""
config_path.desc:
"""The URL path for this Action.<br/>
This path will be appended to the Connector's <code>url</code> configuration to form the full
URL address.
Template with variables is allowed in this option. For example, <code>/room/{$room_no}</code>"""
config_path.label:
"""URL Path"""
config_parameters_opts.desc:
"""The parameters for HTTP action."""
config_parameters_opts.label:
"""Parameters"""
desc_config.desc: desc_config.desc:
"""Configuration for an HTTP bridge.""" """Configuration for an HTTP bridge."""