diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl
index b41586518..a47e96251 100644
--- a/apps/emqx/test/emqx_cth_cluster.erl
+++ b/apps/emqx/test/emqx_cth_cluster.erl
@@ -50,7 +50,7 @@
-define(APPS_CLUSTERING, [gen_rpc, mria, ekka]).
-define(TIMEOUT_NODE_START_MS, 15000).
--define(TIMEOUT_APPS_START_MS, 30000).
+-define(TIMEOUT_APPS_START_MS, 60000).
-define(TIMEOUT_NODE_STOP_S, 15).
%%
diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl
index 7c246a797..3b5589921 100644
--- a/apps/emqx_bridge/src/emqx_action_info.erl
+++ b/apps/emqx_bridge/src/emqx_action_info.erl
@@ -77,7 +77,7 @@ hard_coded_action_info_modules_ee() ->
-endif.
hard_coded_action_info_modules_common() ->
- [].
+ [emqx_bridge_http_action_info].
hard_coded_action_info_modules() ->
hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee().
diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl
index 569c1e75a..4156a37d1 100644
--- a/apps/emqx_bridge/src/emqx_bridge.erl
+++ b/apps/emqx_bridge/src/emqx_bridge.erl
@@ -357,7 +357,7 @@ get_metrics(Type, Name) ->
maybe_upgrade(mqtt, Config) ->
emqx_bridge_compatible_config:maybe_upgrade(Config);
maybe_upgrade(webhook, Config) ->
- emqx_bridge_compatible_config:webhook_maybe_upgrade(Config);
+ emqx_bridge_compatible_config:http_maybe_upgrade(Config);
maybe_upgrade(_Other, Config) ->
Config.
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index 188f26ab5..c9c761105 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -143,7 +143,7 @@ param_path_id() ->
#{
in => path,
required => true,
- example => <<"webhook:webhook_example">>,
+ example => <<"http:http_example">>,
desc => ?DESC("desc_param_path_id")
}
)}.
@@ -166,9 +166,9 @@ bridge_info_array_example(Method) ->
bridge_info_examples(Method) ->
maps:merge(
#{
- <<"webhook_example">> => #{
- summary => <<"WebHook">>,
- value => info_example(webhook, Method)
+ <<"http_example">> => #{
+ summary => <<"HTTP">>,
+ value => info_example(http, Method)
},
<<"mqtt_example">> => #{
summary => <<"MQTT Bridge">>,
@@ -201,7 +201,7 @@ method_example(Type, Method) when Method == get; Method == post ->
method_example(_Type, put) ->
#{}.
-info_example_basic(webhook) ->
+info_example_basic(http) ->
#{
enable => true,
url => <<"http://localhost:9901/messages/${topic}">>,
@@ -212,7 +212,7 @@ info_example_basic(webhook) ->
pool_size => 4,
enable_pipelining => 100,
ssl => #{enable => false},
- local_topic => <<"emqx_webhook/#">>,
+ local_topic => <<"emqx_http/#">>,
method => post,
body => <<"${payload}">>,
resource_opts => #{
@@ -650,7 +650,8 @@ create_or_update_bridge(BridgeType0, BridgeName, Conf, HttpStatusCode) ->
get_metrics_from_local_node(BridgeType0, BridgeName) ->
BridgeType = upgrade_type(BridgeType0),
- format_metrics(emqx_bridge:get_metrics(BridgeType, BridgeName)).
+ MetricsResult = emqx_bridge:get_metrics(BridgeType, BridgeName),
+ format_metrics(MetricsResult).
'/bridges/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
?TRY_PARSE_ID(
diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl
index 674eceb81..7f58a880c 100644
--- a/apps/emqx_bridge/src/emqx_bridge_resource.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl
@@ -63,18 +63,23 @@
).
-if(?EMQX_RELEASE_EDITION == ee).
-bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
-bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
-bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector;
-bridge_to_resource_type(webhook) -> emqx_bridge_http_connector;
-bridge_to_resource_type(BridgeType) -> emqx_bridge_enterprise:resource_type(BridgeType).
+bridge_to_resource_type(BridgeType) when is_binary(BridgeType) ->
+ bridge_to_resource_type(binary_to_existing_atom(BridgeType, utf8));
+bridge_to_resource_type(mqtt) ->
+ emqx_bridge_mqtt_connector;
+bridge_to_resource_type(webhook) ->
+ emqx_bridge_http_connector;
+bridge_to_resource_type(BridgeType) ->
+ emqx_bridge_enterprise:resource_type(BridgeType).
bridge_impl_module(BridgeType) -> emqx_bridge_enterprise:bridge_impl_module(BridgeType).
-else.
-bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
-bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
-bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector;
-bridge_to_resource_type(webhook) -> emqx_bridge_http_connector.
+bridge_to_resource_type(BridgeType) when is_binary(BridgeType) ->
+ bridge_to_resource_type(binary_to_existing_atom(BridgeType, utf8));
+bridge_to_resource_type(mqtt) ->
+ emqx_bridge_mqtt_connector;
+bridge_to_resource_type(webhook) ->
+ emqx_bridge_http_connector.
bridge_impl_module(_BridgeType) -> undefined.
-endif.
@@ -309,6 +314,7 @@ remove(Type, Name, _Conf, _Opts) ->
emqx_resource:remove_local(resource_id(Type, Name)).
%% convert bridge configs to what the connector modules want
+%% TODO: remove it, if the http_bridge already ported to v2
parse_confs(
<<"webhook">>,
_Name,
diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl
index d9ca1acce..0c0c0752d 100644
--- a/apps/emqx_bridge/src/emqx_bridge_v2.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl
@@ -1163,7 +1163,7 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
%% If the bridge v2 does not exist, it is a valid bridge v1
PreviousRawConf = undefined,
split_bridge_v1_config_and_create_helper(
- BridgeV1Type, BridgeName, RawConf, PreviousRawConf
+ BridgeV1Type, BridgeName, RawConf, PreviousRawConf, fun() -> ok end
);
_Conf ->
case ?MODULE:bridge_v1_is_valid(BridgeV1Type, BridgeName) of
@@ -1173,9 +1173,13 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
PreviousRawConf = emqx:get_raw_config(
[?ROOT_KEY, BridgeV2Type, BridgeName], undefined
),
- bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps),
+ %% To avoid losing configurations. We have to make sure that no crash occurs
+ %% during deletion and creation of configurations.
+ PreCreateFun = fun() ->
+ bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps)
+ end,
split_bridge_v1_config_and_create_helper(
- BridgeV1Type, BridgeName, RawConf, PreviousRawConf
+ BridgeV1Type, BridgeName, RawConf, PreviousRawConf, PreCreateFun
);
false ->
%% If the bridge v2 exists, it is not a valid bridge v1
@@ -1183,16 +1187,49 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
end
end.
-split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf, PreviousRawConf) ->
- #{
- connector_type := ConnectorType,
- connector_name := NewConnectorName,
- connector_conf := NewConnectorRawConf,
- bridge_v2_type := BridgeType,
- bridge_v2_name := BridgeName,
- bridge_v2_conf := NewBridgeV2RawConf
- } =
- split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousRawConf),
+split_bridge_v1_config_and_create_helper(
+ BridgeV1Type, BridgeName, RawConf, PreviousRawConf, PreCreateFun
+) ->
+ try
+ #{
+ connector_type := ConnectorType,
+ connector_name := NewConnectorName,
+ connector_conf := NewConnectorRawConf,
+ bridge_v2_type := BridgeType,
+ bridge_v2_name := BridgeName,
+ bridge_v2_conf := NewBridgeV2RawConf
+ } = split_and_validate_bridge_v1_config(
+ BridgeV1Type,
+ BridgeName,
+ RawConf,
+ PreviousRawConf
+ ),
+
+ _ = PreCreateFun(),
+
+ do_connector_and_bridge_create(
+ ConnectorType,
+ NewConnectorName,
+ NewConnectorRawConf,
+ BridgeType,
+ BridgeName,
+ NewBridgeV2RawConf,
+ RawConf
+ )
+ catch
+ throw:Reason ->
+ {error, Reason}
+ end.
+
+do_connector_and_bridge_create(
+ ConnectorType,
+ NewConnectorName,
+ NewConnectorRawConf,
+ BridgeType,
+ BridgeName,
+ NewBridgeV2RawConf,
+ RawConf
+) ->
case emqx_connector:create(ConnectorType, NewConnectorName, NewConnectorRawConf) of
{ok, _} ->
case create(BridgeType, BridgeName, NewBridgeV2RawConf) of
@@ -1308,15 +1345,20 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
RawConf = maps:without([<<"name">>], RawConfig0),
TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
PreviousRawConf = undefined,
- #{
- connector_type := _ConnectorType,
- connector_name := _NewConnectorName,
- connector_conf := ConnectorRawConf,
- bridge_v2_type := BridgeV2Type,
- bridge_v2_name := _BridgeName,
- bridge_v2_conf := BridgeV2RawConf
- } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
- create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf).
+ try
+ #{
+ connector_type := _ConnectorType,
+ connector_name := _NewConnectorName,
+ connector_conf := ConnectorRawConf,
+ bridge_v2_type := BridgeV2Type,
+ bridge_v2_name := _BridgeName,
+ bridge_v2_conf := BridgeV2RawConf
+ } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
+ create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf)
+ catch
+ throw:Reason ->
+ {error, Reason}
+ end.
bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) ->
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl
index cb1f7cc62..f2a51c6cb 100644
--- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl
@@ -110,7 +110,7 @@ param_path_id() ->
#{
in => path,
required => true,
- example => <<"webhook:webhook_example">>,
+ example => <<"http:my_http_action">>,
desc => ?DESC("desc_param_path_id")
}
)}.
diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl
index 6adbf3942..b68a4c387 100644
--- a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl
+++ b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl
@@ -21,7 +21,7 @@
-export([
upgrade_pre_ee/2,
maybe_upgrade/1,
- webhook_maybe_upgrade/1
+ http_maybe_upgrade/1
]).
upgrade_pre_ee(undefined, _UpgradeFunc) ->
@@ -40,10 +40,10 @@ maybe_upgrade(#{<<"connector">> := _} = Config0) ->
maybe_upgrade(NewVersion) ->
NewVersion.
-webhook_maybe_upgrade(#{<<"direction">> := _} = Config0) ->
+http_maybe_upgrade(#{<<"direction">> := _} = Config0) ->
Config1 = maps:remove(<<"direction">>, Config0),
Config1#{<<"resource_opts">> => default_resource_opts()};
-webhook_maybe_upgrade(NewVersion) ->
+http_maybe_upgrade(NewVersion) ->
NewVersion.
binary_key({K, V}) ->
diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
index ff924ac8c..27b3a8f14 100644
--- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
+++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
@@ -162,13 +162,14 @@ roots() -> [{bridges, ?HOCON(?R_REF(bridges), #{importance => ?IMPORTANCE_LOW})}
fields(bridges) ->
[
- {webhook,
+ {http,
mk(
hoconsc:map(name, ref(emqx_bridge_http_schema, "config")),
#{
+ aliases => [webhook],
desc => ?DESC("bridges_webhook"),
required => false,
- converter => fun webhook_bridge_converter/2
+ converter => fun http_bridge_converter/2
}
)},
{mqtt,
@@ -243,7 +244,7 @@ status() ->
node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
-webhook_bridge_converter(Conf0, _HoconOpts) ->
+http_bridge_converter(Conf0, _HoconOpts) ->
emqx_bridge_compatible_config:upgrade_pre_ee(
- Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
+ Conf0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1
).
diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl
index bc8be5476..30107d0ce 100644
--- a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl
@@ -30,14 +30,18 @@ init_per_suite(Config) ->
[
emqx,
emqx_conf,
+ emqx_connector,
+ emqx_bridge_http,
emqx_bridge
],
#{work_dir => ?config(priv_dir, Config)}
),
+ emqx_mgmt_api_test_util:init_suite(),
[{apps, Apps} | Config].
end_per_suite(Config) ->
Apps = ?config(apps, Config),
+ emqx_mgmt_api_test_util:end_suite(),
ok = emqx_cth_suite:stop(Apps),
ok.
@@ -58,6 +62,7 @@ end_per_testcase(t_get_basic_usage_info_1, _Config) ->
ok = emqx_bridge:remove(BridgeType, BridgeName)
end,
[
+ %% Keep using the old bridge names to avoid breaking the tests
{webhook, <<"basic_usage_info_webhook">>},
{webhook, <<"basic_usage_info_webhook_disabled">>},
{mqtt, <<"basic_usage_info_mqtt">>}
@@ -88,7 +93,7 @@ t_get_basic_usage_info_1(_Config) ->
#{
num_bridges => 3,
count_by_type => #{
- webhook => 1,
+ http => 1,
mqtt => 2
}
},
@@ -119,40 +124,33 @@ setup_fake_telemetry_data() ->
HTTPConfig = #{
url => <<"http://localhost:9901/messages/${topic}">>,
enable => true,
- local_topic => "emqx_webhook/#",
+ local_topic => "emqx_http/#",
method => post,
body => <<"${payload}">>,
headers => #{},
request_timeout => "15s"
},
- Conf =
- #{
- <<"bridges">> =>
- #{
- <<"webhook">> =>
- #{
- <<"basic_usage_info_webhook">> => HTTPConfig,
- <<"basic_usage_info_webhook_disabled">> =>
- HTTPConfig#{enable => false}
- },
- <<"mqtt">> =>
- #{
- <<"basic_usage_info_mqtt">> => MQTTConfig1,
- <<"basic_usage_info_mqtt_from_select">> => MQTTConfig2
- }
- }
- },
- ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf),
-
- ok = snabbkaffe:start_trace(),
- Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_loaded end,
- NEvents = 3,
- BackInTime = 0,
- Timeout = 11_000,
- {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, NEvents, Timeout, BackInTime),
- ok = emqx_bridge:load(),
- {ok, _} = snabbkaffe_collector:receive_events(Sub),
- ok = snabbkaffe:stop(),
+ %% Keep use the old bridge names to test the backward compatibility
+ {ok, _} = emqx_bridge_testlib:create_bridge_api(
+ <<"webhook">>,
+ <<"basic_usage_info_webhook">>,
+ HTTPConfig
+ ),
+ {ok, _} = emqx_bridge_testlib:create_bridge_api(
+ <<"webhook">>,
+ <<"basic_usage_info_webhook_disabled">>,
+ HTTPConfig#{enable => false}
+ ),
+ {ok, _} = emqx_bridge_testlib:create_bridge_api(
+ <<"mqtt">>,
+ <<"basic_usage_info_mqtt">>,
+ MQTTConfig1
+ ),
+ {ok, _} = emqx_bridge_testlib:create_bridge_api(
+ <<"mqtt">>,
+ <<"basic_usage_info_mqtt_from_select">>,
+ MQTTConfig2
+ ),
ok.
t_update_ssl_conf(Config) ->
diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
index ccc944572..7b5208f06 100644
--- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
@@ -78,6 +78,9 @@
emqx_auth,
emqx_auth_mnesia,
emqx_management,
+ emqx_connector,
+ emqx_bridge_http,
+ emqx_bridge,
{emqx_rule_engine, "rule_engine { rules {} }"},
{emqx_bridge, "bridges {}"}
]).
@@ -108,7 +111,7 @@ groups() ->
].
suite() ->
- [{timetrap, {seconds, 60}}].
+ [{timetrap, {seconds, 120}}].
init_per_suite(Config) ->
Config.
@@ -407,10 +410,7 @@ t_http_crud_apis(Config) ->
Config
),
?assertMatch(
- #{
- <<"reason">> := <<"unknown_fields">>,
- <<"unknown">> := <<"curl">>
- },
+ #{<<"reason">> := <<"required_field">>},
json(maps:get(<<"message">>, PutFail2))
),
{ok, 400, _} = request_json(
@@ -419,12 +419,16 @@ t_http_crud_apis(Config) ->
?HTTP_BRIDGE(<<"localhost:1234/foo">>, Name),
Config
),
- {ok, 400, _} = request_json(
+ {ok, 400, PutFail3} = request_json(
put,
uri(["bridges", BridgeID]),
?HTTP_BRIDGE(<<"htpp://localhost:12341234/foo">>, Name),
Config
),
+ ?assertMatch(
+ #{<<"kind">> := <<"validation_error">>},
+ json(maps:get(<<"message">>, PutFail3))
+ ),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
@@ -463,7 +467,7 @@ t_http_crud_apis(Config) ->
),
%% Create non working bridge
- BrokenURL = ?URL(Port + 1, "/foo"),
+ BrokenURL = ?URL(Port + 1, "foo"),
{ok, 201, BrokenBridge} = request(
post,
uri(["bridges"]),
@@ -471,6 +475,7 @@ t_http_crud_apis(Config) ->
fun json/1,
Config
),
+
?assertMatch(
#{
<<"type">> := ?BRIDGE_TYPE_HTTP,
@@ -1307,6 +1312,7 @@ t_cluster_later_join_metrics(Config) ->
Name = ?BRIDGE_NAME,
BridgeParams = ?HTTP_BRIDGE(URL1, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
+
?check_trace(
begin
%% Create a bridge on only one of the nodes.
@@ -1323,6 +1329,20 @@ t_cluster_later_join_metrics(Config) ->
ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]),
%% Check metrics; shouldn't crash even if the bridge is not
%% ready on the node that just joined the cluster.
+
+ %% assert: wait for the bridge to be ready on the other node.
+ fun
+ WaitConfSync(0) ->
+ throw(waiting_config_sync_timeout);
+ WaitConfSync(N) ->
+ timer:sleep(1000),
+ case erpc:call(OtherNode, emqx_bridge, list, []) of
+ [] -> WaitConfSync(N - 1);
+ [_] -> ok
+ end
+ end(
+ 60
+ ),
?assertMatch(
{ok, 200, #{
<<"metrics">> := #{<<"success">> := _},
@@ -1373,17 +1393,16 @@ t_create_with_bad_name(Config) ->
validate_resource_request_ttl(single, Timeout, Name) ->
SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
- BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
- ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name),
+ _BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
?check_trace(
begin
{ok, Res} =
?wait_async_action(
- emqx_bridge:send_message(BridgeID, SentData),
+ do_send_message(?BRIDGE_TYPE_HTTP, Name, SentData),
#{?snk_kind := async_query},
1000
),
- ?assertMatch({ok, #{id := ResId, query_opts := #{timeout := Timeout}}}, Res)
+ ?assertMatch({ok, #{id := _ResId, query_opts := #{timeout := Timeout}}}, Res)
end,
fun(Trace0) ->
Trace = ?of_kind(async_query, Trace0),
@@ -1394,6 +1413,10 @@ validate_resource_request_ttl(single, Timeout, Name) ->
validate_resource_request_ttl(_Cluster, _Timeout, _Name) ->
ignore.
+do_send_message(BridgeV1Type, Name, Message) ->
+ Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
+ emqx_bridge_v2:send_message(Type, Name, Message, #{}).
+
%%
request(Method, URL, Config) ->
diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl
index 540c18878..91c0a23d0 100644
--- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl
@@ -21,7 +21,7 @@ empty_config_test() ->
Conf1 = #{<<"bridges">> => #{}},
Conf2 = #{<<"bridges">> => #{<<"webhook">> => #{}}},
?assertEqual(Conf1, check(Conf1)),
- ?assertEqual(Conf2, check(Conf2)),
+ ?assertEqual(#{<<"bridges">> => #{<<"http">> => #{}}}, check(Conf2)),
ok.
%% ensure webhook config can be checked
@@ -33,7 +33,7 @@ webhook_config_test() ->
?assertMatch(
#{
<<"bridges">> := #{
- <<"webhook">> := #{
+ <<"http">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
@@ -48,7 +48,7 @@ webhook_config_test() ->
?assertMatch(
#{
<<"bridges">> := #{
- <<"webhook">> := #{
+ <<"http">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
@@ -61,7 +61,7 @@ webhook_config_test() ->
),
#{
<<"bridges">> := #{
- <<"webhook">> := #{
+ <<"http">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
@@ -84,7 +84,7 @@ up(#{<<"mqtt">> := MqttBridges0} = Bridges) ->
Bridges#{<<"mqtt">> := MqttBridges};
up(#{<<"webhook">> := WebhookBridges0} = Bridges) ->
WebhookBridges = emqx_bridge_compatible_config:upgrade_pre_ee(
- WebhookBridges0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
+ WebhookBridges0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1
),
Bridges#{<<"webhook">> := WebhookBridges}.
diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl
index f486e5d64..118802551 100644
--- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl
@@ -92,7 +92,7 @@ end_per_testcase(_Testcase, Config) ->
delete_all_bridges() ->
lists:foreach(
fun(#{name := Name, type := Type}) ->
- emqx_bridge:remove(Type, Name)
+ ok = emqx_bridge:remove(Type, Name)
end,
emqx_bridge:list()
).
diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src
index 87d7e57a6..9cd71323e 100644
--- a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src
+++ b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src
@@ -3,7 +3,7 @@
{vsn, "0.1.5"},
{registered, []},
{applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]},
- {env, []},
+ {env, [{emqx_action_info_modules, [emqx_bridge_http_action_info]}]},
{modules, []},
{links, []}
]}.
diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl
new file mode 100644
index 000000000..3b0543ace
--- /dev/null
+++ b/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl
@@ -0,0 +1,102 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_http_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+ bridge_v1_type_name/0,
+ action_type_name/0,
+ connector_type_name/0,
+ schema_module/0,
+ connector_action_config_to_bridge_v1_config/2,
+ bridge_v1_config_to_action_config/2,
+ bridge_v1_config_to_connector_config/1
+]).
+
+-define(REMOVED_KEYS, [<<"direction">>]).
+-define(ACTION_KEYS, [<<"local_topic">>, <<"resource_opts">>]).
+-define(PARAMETER_KEYS, [<<"body">>, <<"max_retries">>, <<"method">>, <<"request_timeout">>]).
+
+bridge_v1_type_name() -> webhook.
+
+action_type_name() -> http.
+
+connector_type_name() -> http.
+
+schema_module() -> emqx_bridge_http_schema.
+
+connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
+ BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig),
+ %% Move parameters to the top level
+ ParametersMap1 = maps:get(<<"parameters">>, BridgeV1Config1, #{}),
+ ParametersMap2 = maps:without([<<"path">>, <<"headers">>], ParametersMap1),
+ BridgeV1Config2 = maps:remove(<<"parameters">>, BridgeV1Config1),
+ BridgeV1Config3 = emqx_utils_maps:deep_merge(BridgeV1Config2, ParametersMap2),
+ BridgeV1Config4 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config3),
+
+ Url = maps:get(<<"url">>, ConnectorConfig),
+ Path = maps:get(<<"path">>, ParametersMap1, <<>>),
+
+ Headers1 = maps:get(<<"headers">>, ConnectorConfig, #{}),
+ Headers2 = maps:get(<<"headers">>, ParametersMap1, #{}),
+
+ Url1 =
+ case Path of
+ <<>> -> Url;
+ _ -> emqx_bridge_http_connector:join_paths(Url, Path)
+ end,
+
+ BridgeV1Config4#{
+ <<"headers">> => maps:merge(Headers1, Headers2),
+ <<"url">> => Url1
+ }.
+
+bridge_v1_config_to_connector_config(BridgeV1Conf) ->
+ %% To statisfy the emqx_bridge_api_SUITE:t_http_crud_apis/1
+ ok = validate_webhook_url(maps:get(<<"url">>, BridgeV1Conf, undefined)),
+ maps:without(?REMOVED_KEYS ++ ?ACTION_KEYS ++ ?PARAMETER_KEYS, BridgeV1Conf).
+
+bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
+ Parameters = maps:with(?PARAMETER_KEYS, BridgeV1Conf),
+ Parameters1 = Parameters#{<<"path">> => <<>>},
+ CommonKeys = [<<"enable">>, <<"description">>],
+ ActionConfig = maps:with(?ACTION_KEYS ++ CommonKeys, BridgeV1Conf),
+ ActionConfig#{<<"parameters">> => Parameters1, <<"connector">> => ConnectorName}.
+
+%%--------------------------------------------------------------------
+%% helpers
+
+validate_webhook_url(undefined) ->
+ throw(#{
+ kind => validation_error,
+ reason => required_field,
+ required_field => <<"url">>
+ });
+validate_webhook_url(Url) ->
+ {BaseUrl, _Path} = emqx_connector_resource:parse_url(Url),
+ case emqx_http_lib:uri_parse(BaseUrl) of
+ {ok, _} ->
+ ok;
+ {error, Reason} ->
+ throw(#{
+ kind => validation_error,
+ reason => invalid_url,
+ url => Url,
+ error => emqx_utils:readable_error_msg(Reason)
+ })
+ end.
diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
index 5a5e790e5..5ecfa76d1 100644
--- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
+++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
@@ -31,9 +31,14 @@
on_query/3,
on_query_async/4,
on_get_status/2,
- reply_delegator/3
+ on_add_channel/4,
+ on_remove_channel/3,
+ on_get_channels/1,
+ on_get_channel_status/3
]).
+-export([reply_delegator/3]).
+
-export([
roots/0,
fields/1,
@@ -41,7 +46,7 @@
namespace/0
]).
-%% for other webhook-like connectors.
+%% for other http-like connectors.
-export([redact_request/1]).
-export([validate_method/1, join_paths/2]).
@@ -251,6 +256,21 @@ start_pool(PoolName, PoolOpts) ->
Error
end.
+on_add_channel(
+ _InstId,
+ OldState,
+ ActionId,
+ ActionConfig
+) ->
+ InstalledActions = maps:get(installed_actions, OldState, #{}),
+ {ok, ActionState} = do_create_http_action(ActionConfig),
+ NewInstalledActions = maps:put(ActionId, ActionState, InstalledActions),
+ NewState = maps:put(installed_actions, NewInstalledActions, OldState),
+ {ok, NewState}.
+
+do_create_http_action(_ActionConfig = #{parameters := Params}) ->
+ {ok, preprocess_request(Params)}.
+
on_stop(InstId, _State) ->
?SLOG(info, #{
msg => "stopping_http_connector",
@@ -260,6 +280,16 @@ on_stop(InstId, _State) ->
?tp(emqx_connector_http_stopped, #{instance_id => InstId}),
Res.
+on_remove_channel(
+ _InstId,
+ OldState = #{installed_actions := InstalledActions},
+ ActionId
+) ->
+ NewInstalledActions = maps:remove(ActionId, InstalledActions),
+ NewState = maps:put(installed_actions, NewInstalledActions, OldState),
+ {ok, NewState}.
+
+%% BridgeV1 entrypoint
on_query(InstId, {send_message, Msg}, State) ->
case maps:get(request, State, undefined) of
undefined ->
@@ -282,6 +312,36 @@ on_query(InstId, {send_message, Msg}, State) ->
State
)
end;
+%% BridgeV2 entrypoint
+on_query(
+ InstId,
+ {ActionId, Msg},
+ State = #{installed_actions := InstalledActions}
+) when is_binary(ActionId) ->
+ case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of
+ {undefined, _} ->
+ ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
+ {error, arg_request_not_found};
+ {_, undefined} ->
+ ?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}),
+ {error, action_not_found};
+ {Request, ActionState} ->
+ #{
+ method := Method,
+ path := Path,
+ body := Body,
+ headers := Headers,
+ request_timeout := Timeout
+ } = process_request_and_action(Request, ActionState, Msg),
+ %% bridge buffer worker has retry, do not let ehttpc retry
+ Retry = 2,
+ ClientId = maps:get(clientid, Msg, undefined),
+ on_query(
+ InstId,
+ {ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
+ State
+ )
+ end;
on_query(InstId, {Method, Request}, State) ->
%% TODO: Get retry from State
on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State);
@@ -343,6 +403,7 @@ on_query(
Result
end.
+%% BridgeV1 entrypoint
on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
case maps:get(request, State, undefined) of
undefined ->
@@ -364,6 +425,36 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
State
)
end;
+%% BridgeV2 entrypoint
+on_query_async(
+ InstId,
+ {ActionId, Msg},
+ ReplyFunAndArgs,
+ State = #{installed_actions := InstalledActions}
+) when is_binary(ActionId) ->
+ case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of
+ {undefined, _} ->
+ ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
+ {error, arg_request_not_found};
+ {_, undefined} ->
+ ?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}),
+ {error, action_not_found};
+ {Request, ActionState} ->
+ #{
+ method := Method,
+ path := Path,
+ body := Body,
+ headers := Headers,
+ request_timeout := Timeout
+ } = process_request_and_action(Request, ActionState, Msg),
+ ClientId = maps:get(clientid, Msg, undefined),
+ on_query_async(
+ InstId,
+ {ClientId, Method, {Path, Headers, Body}, Timeout},
+ ReplyFunAndArgs,
+ State
+ )
+ end;
on_query_async(
InstId,
{KeyOrNum, Method, Request, Timeout},
@@ -411,6 +502,9 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->
ehttpc_pool:pick_worker(PoolName, Key)
end.
+on_get_channels(ResId) ->
+ emqx_bridge_v2:get_channels_for_connector(ResId).
+
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(PoolName, Timeout) of
ok ->
@@ -456,6 +550,14 @@ do_get_status(PoolName, Timeout) ->
{error, timeout}
end.
+on_get_channel_status(
+ InstId,
+ _ChannelId,
+ State
+) ->
+ %% XXX: Reuse the connector status
+ on_get_status(InstId, State).
+
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
@@ -466,10 +568,10 @@ preprocess_request(Req) when map_size(Req) == 0 ->
preprocess_request(
#{
method := Method,
- path := Path,
- headers := Headers
+ path := Path
} = Req
) ->
+ Headers = maps:get(headers, Req, []),
#{
method => parse_template(to_bin(Method)),
path => parse_template(Path),
@@ -529,6 +631,49 @@ maybe_parse_template(Key, Conf) ->
parse_template(String) ->
emqx_template:parse(String).
+process_request_and_action(Request, ActionState, Msg) ->
+ MethodTemplate = maps:get(method, ActionState),
+ Method = make_method(render_template_string(MethodTemplate, Msg)),
+ BodyTemplate = maps:get(body, ActionState),
+ Body = render_request_body(BodyTemplate, Msg),
+
+ PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)),
+ PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)),
+
+ Path =
+ case PathSuffix of
+ "" -> PathPrefix;
+ _ -> join_paths(PathPrefix, PathSuffix)
+ end,
+
+ HeadersTemplate1 = maps:get(headers, Request),
+ HeadersTemplate2 = maps:get(headers, ActionState),
+ Headers = merge_proplist(
+ render_headers(HeadersTemplate1, Msg),
+ render_headers(HeadersTemplate2, Msg)
+ ),
+ #{
+ method => Method,
+ path => Path,
+ body => Body,
+ headers => Headers,
+ request_timeout => maps:get(request_timeout, ActionState)
+ }.
+
+merge_proplist(Proplist1, Proplist2) ->
+ lists:foldl(
+ fun({K, V}, Acc) ->
+ case lists:keyfind(K, 1, Acc) of
+ false ->
+ [{K, V} | Acc];
+ {K, _} = {K, V1} ->
+ [{K, V1} | Acc]
+ end
+ end,
+ Proplist2,
+ Proplist1
+ ).
+
process_request(
#{
method := MethodTemplate,
@@ -691,7 +836,7 @@ maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
true -> Context;
false -> Context#{attempt := Attempt + 1}
end,
- ?tp(webhook_will_retry_async, #{}),
+ ?tp(http_will_retry_async, #{}),
Worker = resolve_pool_worker(State, KeyOrNum),
ok = ehttpc:request_async(
Worker,
diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl
index 2e3d882d5..958fef4ac 100644
--- a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl
+++ b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl
@@ -18,69 +18,162 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
--import(hoconsc, [mk/2, enum/1, ref/2]).
+-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
-export([roots/0, fields/1, namespace/0, desc/1]).
+-export([
+ bridge_v2_examples/1,
+ %%conn_bridge_examples/1,
+ connector_examples/1
+]).
+
%%======================================================================================
%% Hocon Schema Definitions
-namespace() -> "bridge_webhook".
+
+namespace() -> "bridge_http".
roots() -> [].
-fields("config") ->
- basic_config() ++ request_config();
+%%--------------------------------------------------------------------
+%% v1 bridges http api
+%% see: emqx_bridge_schema:get_response/0, put_request/0, post_request/0
fields("post") ->
[
- type_field(),
+ old_type_field(),
name_field()
] ++ fields("config");
fields("put") ->
fields("config");
fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("post");
-fields("creation_opts") ->
+%%--- v1 bridges config file
+%% see: emqx_bridge_schema:fields(bridges)
+fields("config") ->
+ basic_config() ++ request_config();
+%%--------------------------------------------------------------------
+%% v2: configuration
+fields(action) ->
+ {http,
+ mk(
+ hoconsc:map(name, ref(?MODULE, "http_action")),
+ #{
+ aliases => [webhook],
+ desc => <<"HTTP Action Config">>,
+ required => false
+ }
+ )};
+fields("http_action") ->
+ [
+ {enable, mk(boolean(), #{desc => ?DESC("config_enable_bridge"), default => true})},
+ {connector,
+ mk(binary(), #{
+ desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
+ })},
+ {description, emqx_schema:description_schema()},
+ %% Note: there's an implicit convention in `emqx_bridge' that,
+ %% for egress bridges with this config, the published messages
+ %% will be forwarded to such bridges.
+ {local_topic,
+ mk(
+ binary(),
+ #{
+ required => false,
+ desc => ?DESC("config_local_topic"),
+ importance => ?IMPORTANCE_HIDDEN
+ }
+ )},
+ %% Since e5.3.2, we split the http bridge to two parts: a) connector. b) actions.
+ %% some fields are moved to connector, some fields are moved to actions and composed into the
+ %% `parameters` field.
+ {parameters,
+ mk(ref("parameters_opts"), #{
+ required => true,
+ desc => ?DESC("config_parameters_opts")
+ })}
+ ] ++ http_resource_opts();
+fields("parameters_opts") ->
+ [
+ {path,
+ mk(
+ binary(),
+ #{
+ desc => ?DESC("config_path"),
+ required => false
+ }
+ )},
+ method_field(),
+ headers_field(),
+ body_field(),
+ max_retries_field(),
+ request_timeout_field()
+ ];
+%% v2: api schema
+%% The parameter equls to
+%% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1
+%% `get_connector`, `post_connector`, `put_connector` from emqx_connector_schema:api_schema/1
+fields("post_" ++ Type) ->
+ [type_field(), name_field() | fields("config_" ++ Type)];
+fields("put_" ++ Type) ->
+ fields("config_" ++ Type);
+fields("get_" ++ Type) ->
+ emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type);
+fields("config_bridge_v2") ->
+ fields("http_action");
+fields("config_connector") ->
+ [
+ {enable,
+ mk(
+ boolean(),
+ #{
+ desc => <<"Enable or disable this connector">>,
+ default => true
+ }
+ )},
+ {description, emqx_schema:description_schema()}
+ ] ++ connector_url_headers() ++ connector_opts();
+%%--------------------------------------------------------------------
+%% v1/v2
+fields("resource_opts") ->
+ UnsupportedOpts = [enable_batch, batch_size, batch_time],
lists:filter(
- fun({K, _V}) ->
- not lists:member(K, unsupported_opts())
- end,
+ fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
emqx_resource_schema:fields("creation_opts")
).
desc("config") ->
?DESC("desc_config");
-desc("creation_opts") ->
+desc("resource_opts") ->
?DESC(emqx_resource_schema, "creation_opts");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for WebHook using `", string:to_upper(Method), "` method."];
+desc("config_connector") ->
+ ?DESC("desc_config");
+desc("http_action") ->
+ ?DESC("desc_config");
+desc("parameters_opts") ->
+ ?DESC("config_parameters_opts");
desc(_) ->
undefined.
+%%--------------------------------------------------------------------
+%% helpers for v1 only
+
basic_config() ->
[
{enable,
mk(
boolean(),
#{
- desc => ?DESC("config_enable"),
+ desc => ?DESC("config_enable_bridge"),
default => true
}
)}
- ] ++ webhook_creation_opts() ++
- proplists:delete(
- max_retries, emqx_bridge_http_connector:fields(config)
- ).
+ ] ++ http_resource_opts() ++ connector_opts().
request_config() ->
[
- {url,
- mk(
- binary(),
- #{
- required => true,
- desc => ?DESC("config_url")
- }
- )},
+ url_field(),
{direction,
mk(
egress,
@@ -98,81 +191,37 @@ request_config() ->
required => false
}
)},
- {method,
- mk(
- method(),
- #{
- default => post,
- desc => ?DESC("config_method")
- }
- )},
- {headers,
- mk(
- map(),
- #{
- default => #{
- <<"accept">> => <<"application/json">>,
- <<"cache-control">> => <<"no-cache">>,
- <<"connection">> => <<"keep-alive">>,
- <<"content-type">> => <<"application/json">>,
- <<"keep-alive">> => <<"timeout=5">>
- },
- desc => ?DESC("config_headers")
- }
- )},
- {body,
- mk(
- binary(),
- #{
- default => undefined,
- desc => ?DESC("config_body")
- }
- )},
- {max_retries,
- mk(
- non_neg_integer(),
- #{
- default => 2,
- desc => ?DESC("config_max_retries")
- }
- )},
- {request_timeout,
- mk(
- emqx_schema:duration_ms(),
- #{
- default => <<"15s">>,
- deprecated => {since, "v5.0.26"},
- desc => ?DESC("config_request_timeout")
- }
- )}
+ method_field(),
+ headers_field(),
+ body_field(),
+ max_retries_field(),
+ request_timeout_field()
].
-webhook_creation_opts() ->
- [
- {resource_opts,
- mk(
- ref(?MODULE, "creation_opts"),
- #{
- required => false,
- default => #{},
- desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
- }
- )}
- ].
+%%--------------------------------------------------------------------
+%% helpers for v2 only
-unsupported_opts() ->
- [
- enable_batch,
- batch_size,
- batch_time
- ].
+connector_url_headers() ->
+ [url_field(), headers_field()].
-%%======================================================================================
+%%--------------------------------------------------------------------
+%% common funcs
+
+%% `webhook` is kept for backward compatibility.
+old_type_field() ->
+ {type,
+ mk(
+ enum([webhook, http]),
+ #{
+ required => true,
+ desc => ?DESC("desc_type")
+ }
+ )}.
type_field() ->
{type,
mk(
- webhook,
+ http,
#{
required => true,
desc => ?DESC("desc_type")
@@ -189,5 +238,189 @@ name_field() ->
}
)}.
-method() ->
- enum([post, put, get, delete]).
+url_field() ->
+ {url,
+ mk(
+ binary(),
+ #{
+ required => true,
+ desc => ?DESC("config_url")
+ }
+ )}.
+
+headers_field() ->
+ {headers,
+ mk(
+ map(),
+ #{
+ default => #{
+ <<"accept">> => <<"application/json">>,
+ <<"cache-control">> => <<"no-cache">>,
+ <<"connection">> => <<"keep-alive">>,
+ <<"content-type">> => <<"application/json">>,
+ <<"keep-alive">> => <<"timeout=5">>
+ },
+ desc => ?DESC("config_headers")
+ }
+ )}.
+
+method_field() ->
+ {method,
+ mk(
+ enum([post, put, get, delete]),
+ #{
+ default => post,
+ desc => ?DESC("config_method")
+ }
+ )}.
+
+body_field() ->
+ {body,
+ mk(
+ binary(),
+ #{
+ default => undefined,
+ desc => ?DESC("config_body")
+ }
+ )}.
+
+max_retries_field() ->
+ {max_retries,
+ mk(
+ non_neg_integer(),
+ #{
+ default => 2,
+ desc => ?DESC("config_max_retries")
+ }
+ )}.
+
+request_timeout_field() ->
+ {request_timeout,
+ mk(
+ emqx_schema:duration_ms(),
+ #{
+ default => <<"15s">>,
+ deprecated => {since, "v5.0.26"},
+ desc => ?DESC("config_request_timeout")
+ }
+ )}.
+
+http_resource_opts() ->
+ [
+ {resource_opts,
+ mk(
+ ref(?MODULE, "resource_opts"),
+ #{
+ required => false,
+ default => #{},
+ desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
+ }
+ )}
+ ].
+
+connector_opts() ->
+ mark_request_field_deperecated(
+ proplists:delete(max_retries, emqx_bridge_http_connector:fields(config))
+ ).
+
+mark_request_field_deperecated(Fields) ->
+ lists:map(
+ fun({K, V}) ->
+ case K of
+ request ->
+ {K, V#{
+ %% Note: if we want to deprecate a reference type, we have to change
+ %% it to a direct type first.
+ type => typerefl:map(),
+ deprecated => {since, "5.3.2"},
+ desc => <<"This field is never used, so we deprecated it since 5.3.2.">>
+ }};
+ _ ->
+ {K, V}
+ end
+ end,
+ Fields
+ ).
+
+%%--------------------------------------------------------------------
+%% Examples
+
+bridge_v2_examples(Method) ->
+ [
+ #{
+ <<"http">> => #{
+ summary => <<"HTTP Action">>,
+ value => values({Method, bridge_v2})
+ }
+ }
+ ].
+
+connector_examples(Method) ->
+ [
+ #{
+ <<"http">> => #{
+ summary => <<"HTTP Connector">>,
+ value => values({Method, connector})
+ }
+ }
+ ].
+
+values({get, Type}) ->
+ maps:merge(
+ #{
+ status => <<"connected">>,
+ node_status => [
+ #{
+ node => <<"emqx@localhost">>,
+ status => <<"connected">>
+ }
+ ]
+ },
+ values({post, Type})
+ );
+values({post, bridge_v2}) ->
+ maps:merge(
+ #{
+ name => <<"my_http_action">>,
+ type => <<"http">>
+ },
+ values({put, bridge_v2})
+ );
+values({post, connector}) ->
+ maps:merge(
+ #{
+ name => <<"my_http_connector">>,
+ type => <<"http">>
+ },
+ values({put, connector})
+ );
+values({put, bridge_v2}) ->
+ values(bridge_v2);
+values({put, connector}) ->
+ values(connector);
+values(bridge_v2) ->
+ #{
+ enable => true,
+ connector => <<"my_http_connector">>,
+ parameters => #{
+ path => <<"/room/${room_no}">>,
+ method => <<"post">>,
+ headers => #{},
+ body => <<"${.}">>
+ },
+ resource_opts => #{
+ worker_pool_size => 16,
+ health_check_interval => <<"15s">>,
+ query_mode => <<"async">>
+ }
+ };
+values(connector) ->
+ #{
+ enable => true,
+ url => <<"http://localhost:8080/api/v1">>,
+ headers => #{<<"content-type">> => <<"application/json">>},
+ connect_timeout => <<"15s">>,
+ pool_type => <<"hash">>,
+ pool_size => 1,
+ enable_pipelining => 100
+ }.
diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl
index d9fc595fe..2ff7d184b 100644
--- a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl
+++ b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl
@@ -39,18 +39,33 @@ all() ->
groups() ->
[].
-init_per_suite(_Config) ->
- emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
- ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge, emqx_rule_engine]),
- ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
- {ok, _} = application:ensure_all_started(emqx_connector),
- [].
+init_per_suite(Config0) ->
+ Config =
+ case os:getenv("DEBUG_CASE") of
+ [_ | _] = DebugCase ->
+ CaseName = list_to_atom(DebugCase),
+ [{debug_case, CaseName} | Config0];
+ _ ->
+ Config0
+ end,
+ Apps = emqx_cth_suite:start(
+ [
+ emqx,
+ emqx_conf,
+ emqx_connector,
+ emqx_bridge_http,
+ emqx_bridge,
+ emqx_rule_engine
+ ],
+ #{work_dir => emqx_cth_suite:work_dir(Config)}
+ ),
+ emqx_mgmt_api_test_util:init_suite(),
+ [{apps, Apps} | Config].
-end_per_suite(_Config) ->
- ok = emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge, emqx_conf]),
- ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
- _ = application:stop(emqx_connector),
- _ = application:stop(emqx_bridge),
+end_per_suite(Config) ->
+ Apps = ?config(apps, Config),
+ emqx_mgmt_api_test_util:end_suite(),
+ ok = emqx_cth_suite:stop(Apps),
ok.
suite() ->
@@ -115,7 +130,9 @@ end_per_testcase(TestCase, _Config) when
->
ok = emqx_bridge_http_connector_test_server:stop(),
persistent_term:erase({?MODULE, times_called}),
- emqx_bridge_testlib:delete_all_bridges(),
+ %emqx_bridge_testlib:delete_all_bridges(),
+ emqx_bridge_v2_testlib:delete_all_bridges(),
+ emqx_bridge_v2_testlib:delete_all_connectors(),
emqx_common_test_helpers:call_janitor(),
ok;
end_per_testcase(_TestCase, Config) ->
@@ -123,7 +140,8 @@ end_per_testcase(_TestCase, Config) ->
undefined -> ok;
Server -> stop_http_server(Server)
end,
- emqx_bridge_testlib:delete_all_bridges(),
+ emqx_bridge_v2_testlib:delete_all_bridges(),
+ emqx_bridge_v2_testlib:delete_all_connectors(),
emqx_common_test_helpers:call_janitor(),
ok.
@@ -420,7 +438,7 @@ t_send_async_connection_timeout(Config) ->
),
NumberOfMessagesToSend = 10,
[
- emqx_bridge:send_message(BridgeID, #{<<"id">> => Id})
+ do_send_message(#{<<"id">> => Id})
|| Id <- lists:seq(1, NumberOfMessagesToSend)
],
%% Make sure server receives all messages
@@ -431,7 +449,7 @@ t_send_async_connection_timeout(Config) ->
t_async_free_retries(Config) ->
#{port := Port} = ?config(http_server, Config),
- BridgeID = make_bridge(#{
+ _BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
query_mode => "sync",
@@ -445,7 +463,7 @@ t_async_free_retries(Config) ->
Fn = fun(Get, Error) ->
?assertMatch(
{ok, 200, _, _},
- emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
+ do_send_message(#{<<"hello">> => <<"world">>}),
#{error => Error}
),
?assertEqual(ExpectedAttempts, Get(), #{error => Error})
@@ -456,7 +474,7 @@ t_async_free_retries(Config) ->
t_async_common_retries(Config) ->
#{port := Port} = ?config(http_server, Config),
- BridgeID = make_bridge(#{
+ _BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
query_mode => "sync",
@@ -471,7 +489,7 @@ t_async_common_retries(Config) ->
FnSucceed = fun(Get, Error) ->
?assertMatch(
{ok, 200, _, _},
- emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
+ do_send_message(#{<<"hello">> => <<"world">>}),
#{error => Error, attempts => Get()}
),
?assertEqual(ExpectedAttempts, Get(), #{error => Error})
@@ -479,7 +497,7 @@ t_async_common_retries(Config) ->
FnFail = fun(Get, Error) ->
?assertMatch(
Error,
- emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
+ do_send_message(#{<<"hello">> => <<"world">>}),
#{error => Error, attempts => Get()}
),
?assertEqual(ExpectedAttempts, Get(), #{error => Error})
@@ -559,7 +577,7 @@ t_path_not_found(Config) ->
ok
end,
fun(Trace) ->
- ?assertEqual([], ?of_kind(webhook_will_retry_async, Trace)),
+ ?assertEqual([], ?of_kind(http_will_retry_async, Trace)),
ok
end
),
@@ -600,7 +618,7 @@ t_too_many_requests(Config) ->
ok
end,
fun(Trace) ->
- ?assertMatch([_ | _], ?of_kind(webhook_will_retry_async, Trace)),
+ ?assertMatch([_ | _], ?of_kind(http_will_retry_async, Trace)),
ok
end
),
@@ -711,6 +729,11 @@ t_bridge_probes_header_atoms(Config) ->
ok.
%% helpers
+
+do_send_message(Message) ->
+ Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(?BRIDGE_TYPE),
+ emqx_bridge_v2:send_message(Type, ?BRIDGE_NAME, Message, #{}).
+
do_t_async_retries(TestCase, TestContext, Error, Fn) ->
#{error_attempts := ErrorAttempts} = TestContext,
PTKey = {?MODULE, TestCase, attempts},
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index f6e0c0f95..a5b7692d7 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -137,7 +137,7 @@ param_path_id() ->
#{
in => path,
required => true,
- example => <<"webhook:webhook_example">>,
+ example => <<"http:my_http_connector">>,
desc => ?DESC("desc_param_path_id")
}
)}.
@@ -158,17 +158,7 @@ connector_info_array_example(Method) ->
lists:map(fun(#{value := Config}) -> Config end, maps:values(connector_info_examples(Method))).
connector_info_examples(Method) ->
- maps:merge(
- #{},
- emqx_enterprise_connector_examples(Method)
- ).
-
--if(?EMQX_RELEASE_EDITION == ee).
-emqx_enterprise_connector_examples(Method) ->
- emqx_connector_ee_schema:examples(Method).
--else.
-emqx_enterprise_connector_examples(_Method) -> #{}.
--endif.
+ emqx_connector_schema:examples(Method).
schema("/connectors") ->
#{
diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl
index bc648a102..ff2790481 100644
--- a/apps/emqx_connector/src/emqx_connector_resource.erl
+++ b/apps/emqx_connector/src/emqx_connector_resource.erl
@@ -49,6 +49,8 @@
get_channels/2
]).
+-export([parse_url/1]).
+
-callback connector_config(ParsedConfig) ->
ParsedConfig
when
@@ -77,8 +79,10 @@ connector_impl_module(_ConnectorType) ->
-endif.
-connector_to_resource_type_ce(_ConnectorType) ->
- no_bridge_v2_for_c2_so_far.
+connector_to_resource_type_ce(http) ->
+ emqx_bridge_http_connector;
+connector_to_resource_type_ce(ConnectorType) ->
+ error({no_bridge_v2, ConnectorType}).
resource_id(ConnectorId) when is_binary(ConnectorId) ->
<<"connector:", ConnectorId/binary>>.
@@ -271,13 +275,11 @@ remove(Type, Name, _Conf, _Opts) ->
%% convert connector configs to what the connector modules want
parse_confs(
- <<"webhook">>,
+ <<"http">>,
_Name,
#{
url := Url,
- method := Method,
- headers := Headers,
- max_retries := Retry
+ headers := Headers
} = Conf
) ->
Url1 = bin(Url),
@@ -290,20 +292,14 @@ parse_confs(
Reason1 = emqx_utils:readable_error_msg(Reason),
invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
end,
- RequestTTL = emqx_utils_maps:deep_get(
- [resource_opts, request_ttl],
- Conf
- ),
Conf#{
base_url => BaseUrl1,
request =>
#{
path => Path,
- method => Method,
- body => maps:get(body, Conf, undefined),
headers => Headers,
- request_ttl => RequestTTL,
- max_retries => Retry
+ body => undefined,
+ method => undefined
}
};
parse_confs(<<"iotdb">>, Name, Conf) ->
diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
index c8ec8e1be..ef101ad28 100644
--- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
+++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
@@ -15,7 +15,8 @@
-export([
api_schemas/1,
fields/1,
- examples/1
+ %%examples/1
+ schema_modules/0
]).
resource_type(Type) when is_binary(Type) ->
@@ -59,18 +60,6 @@ connector_structs() ->
)}
].
-examples(Method) ->
- MergeFun =
- fun(Example, Examples) ->
- maps:merge(Examples, Example)
- end,
- Fun =
- fun(Module, Examples) ->
- ConnectorExamples = erlang:apply(Module, connector_examples, [Method]),
- lists:foldl(MergeFun, Examples, ConnectorExamples)
- end,
- lists:foldl(Fun, #{}, schema_modules()).
-
schema_modules() ->
[
emqx_bridge_kafka,
diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl
index de1ffb26b..890f84871 100644
--- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl
+++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl
@@ -35,6 +35,8 @@
-export([resource_opts_fields/0, resource_opts_fields/1]).
+-export([examples/1]).
+
-if(?EMQX_RELEASE_EDITION == ee).
enterprise_api_schemas(Method) ->
%% We *must* do this to ensure the module is really loaded, especially when we use
@@ -64,6 +66,37 @@ enterprise_fields_connectors() -> [].
-endif.
+api_schemas(Method) ->
+ [
+ %% We need to map the `type' field of a request (binary) to a
+ %% connector schema module.
+ api_ref(emqx_bridge_http_schema, <<"http">>, Method ++ "_connector")
+ ].
+
+api_ref(Module, Type, Method) ->
+ {Type, ref(Module, Method)}.
+
+examples(Method) ->
+ MergeFun =
+ fun(Example, Examples) ->
+ maps:merge(Examples, Example)
+ end,
+ Fun =
+ fun(Module, Examples) ->
+ ConnectorExamples = erlang:apply(Module, connector_examples, [Method]),
+ lists:foldl(MergeFun, Examples, ConnectorExamples)
+ end,
+ lists:foldl(Fun, #{}, schema_modules()).
+
+-if(?EMQX_RELEASE_EDITION == ee).
+schema_modules() ->
+ [emqx_bridge_http_schema] ++ emqx_connector_ee_schema:schema_modules().
+-else.
+schema_modules() ->
+ [emqx_bridge_http_schema].
+-endif.
+
+connector_type_to_bridge_types(http) -> [http, webhook];
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer].
@@ -298,8 +331,9 @@ post_request() ->
api_schema("post").
api_schema(Method) ->
+ CE = api_schemas(Method),
EE = enterprise_api_schemas(Method),
- hoconsc:union(connector_api_union(EE)).
+ hoconsc:union(connector_api_union(CE ++ EE)).
connector_api_union(Refs) ->
Index = maps:from_list(Refs),
@@ -344,7 +378,17 @@ roots() ->
end.
fields(connectors) ->
- [] ++ enterprise_fields_connectors().
+ [
+ {http,
+ mk(
+ hoconsc:map(name, ref(emqx_bridge_http_schema, "config_connector")),
+ #{
+ alias => [webhook],
+ desc => <<"HTTP Connector Config">>,
+ required => false
+ }
+ )}
+ ] ++ enterprise_fields_connectors().
desc(connectors) ->
?DESC("desc_connectors");
diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl
index 46566bd6f..7809f8b3d 100644
--- a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl
+++ b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl
@@ -452,6 +452,7 @@ apps_to_start() ->
emqx_modules,
emqx_gateway,
emqx_exhook,
+ emqx_bridge_http,
emqx_bridge,
emqx_auto_subscribe,
diff --git a/apps/emqx_resource/test/emqx_resource_schema_tests.erl b/apps/emqx_resource/test/emqx_resource_schema_tests.erl
index 78a761bd2..aac0a7d96 100644
--- a/apps/emqx_resource/test/emqx_resource_schema_tests.erl
+++ b/apps/emqx_resource/test/emqx_resource_schema_tests.erl
@@ -80,7 +80,7 @@ worker_pool_size_test_() ->
Conf = emqx_utils_maps:deep_put(
[
<<"bridges">>,
- <<"webhook">>,
+ <<"http">>,
<<"simple">>,
<<"resource_opts">>,
<<"worker_pool_size">>
@@ -88,7 +88,7 @@ worker_pool_size_test_() ->
BaseConf,
WorkerPoolSize
),
- #{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := CheckedConf}}} = check(Conf),
+ #{<<"bridges">> := #{<<"http">> := #{<<"simple">> := CheckedConf}}} = check(Conf),
#{<<"resource_opts">> := #{<<"worker_pool_size">> := WPS}} = CheckedConf,
WPS
end,
@@ -117,7 +117,7 @@ worker_pool_size_test_() ->
%%===========================================================================
parse_and_check_webhook_bridge(Hocon) ->
- #{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)),
+ #{<<"bridges">> := #{<<"http">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)),
Conf.
parse(Hocon) ->
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl
index aadd3d4f5..afa57dfac 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl
@@ -583,10 +583,18 @@ get_referenced_hookpoints(Froms) ->
].
get_egress_bridges(Actions) ->
- [
- emqx_bridge_resource:bridge_id(BridgeType, BridgeName)
- || {bridge, BridgeType, BridgeName, _ResId} <- Actions
- ].
+ lists:foldr(
+ fun
+ ({bridge, BridgeType, BridgeName, _ResId}, Acc) ->
+ [emqx_bridge_resource:bridge_id(BridgeType, BridgeName) | Acc];
+ ({bridge_v2, BridgeType, BridgeName}, Acc) ->
+ [emqx_bridge_resource:bridge_id(BridgeType, BridgeName) | Acc];
+ (_, Acc) ->
+ Acc
+ end,
+ [],
+ Actions
+ ).
%% For allowing an external application to add extra "built-in" functions to the
%% rule engine SQL like language. The module set by
diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
index 14682eff1..f3df46b80 100644
--- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
+++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
@@ -3468,7 +3468,7 @@ t_get_basic_usage_info_1(_Config) ->
referenced_bridges =>
#{
mqtt => 1,
- webhook => 3
+ http => 3
}
},
emqx_rule_engine:get_basic_usage_info()
diff --git a/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl b/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl
index 07cb18e60..73b3e331f 100644
--- a/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl
+++ b/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl
@@ -41,44 +41,32 @@ suite() ->
apps() ->
[
emqx_conf,
- emqx_management,
+ emqx_connector,
emqx_retainer,
emqx_auth,
emqx_auth_redis,
emqx_auth_mnesia,
emqx_auth_postgresql,
emqx_modules,
- emqx_telemetry
+ emqx_telemetry,
+ emqx_bridge_http,
+ emqx_bridge,
+ emqx_rule_engine,
+ emqx_management
].
init_per_suite(Config) ->
- net_kernel:start(['master@127.0.0.1', longnames]),
- ok = meck:new(emqx_authz_file, [non_strict, passthrough, no_history, no_link]),
- meck:expect(
- emqx_authz_file,
- acl_conf_file,
- fun() ->
- emqx_common_test_helpers:deps_path(emqx_auth, "etc/acl.conf")
- end
- ),
- ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
- emqx_gateway_test_utils:load_all_gateway_apps(),
- start_apps(),
- Config.
+ WorkDir = ?config(priv_dir, Config),
+ Apps = emqx_cth_suite:start(apps(), #{work_dir => WorkDir}),
+ emqx_mgmt_api_test_util:init_suite(),
+ [{apps, Apps}, {work_dir, WorkDir} | Config].
-end_per_suite(_Config) ->
- {ok, _} = emqx:update_config(
- [authorization],
- #{
- <<"no_match">> => <<"allow">>,
- <<"cache">> => #{<<"enable">> => <<"true">>},
- <<"sources">> => []
- }
- ),
+end_per_suite(Config) ->
mnesia:clear_table(cluster_rpc_commit),
mnesia:clear_table(cluster_rpc_mfa),
- stop_apps(),
- meck:unload(emqx_authz_file),
+ Apps = ?config(apps, Config),
+ emqx_mgmt_api_test_util:end_suite(),
+ ok = emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(t_get_telemetry_without_memsup, Config) ->
@@ -123,7 +111,6 @@ init_per_testcase(t_advanced_mqtt_features, Config) ->
mock_advanced_mqtt_features(),
Config;
init_per_testcase(t_authn_authz_info, Config) ->
- mock_httpc(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
create_authn('mqtt:global', built_in_database),
create_authn('tcp:default', redis),
@@ -141,14 +128,11 @@ init_per_testcase(t_send_after_enable, Config) ->
mock_httpc(),
Config;
init_per_testcase(t_rule_engine_and_data_bridge_info, Config) ->
- mock_httpc(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
- emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_bridge]),
ok = emqx_bridge_SUITE:setup_fake_telemetry_data(),
ok = setup_fake_rule_engine_data(),
Config;
init_per_testcase(t_exhook_info, Config) ->
- mock_httpc(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
ExhookConf =
#{
@@ -173,31 +157,8 @@ init_per_testcase(t_cluster_uuid, Config) ->
Node = start_slave(n1),
[{n1, Node} | Config];
init_per_testcase(t_uuid_restored_from_file, Config) ->
- mock_httpc(),
- NodeUUID = <<"AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE">>,
- ClusterUUID = <<"FFFFFFFF-GGGG-HHHH-IIII-JJJJJJJJJJJJ">>,
- DataDir = emqx:data_dir(),
- NodeUUIDFile = filename:join(DataDir, "node.uuid"),
- ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
- file:delete(NodeUUIDFile),
- file:delete(ClusterUUIDFile),
- ok = file:write_file(NodeUUIDFile, NodeUUID),
- ok = file:write_file(ClusterUUIDFile, ClusterUUID),
-
- %% clear the UUIDs in the DB
- {atomic, ok} = mria:clear_table(emqx_telemetry),
- stop_apps(),
- ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
- start_apps(),
- Node = start_slave(n1),
- [
- {n1, Node},
- {node_uuid, NodeUUID},
- {cluster_uuid, ClusterUUID}
- | Config
- ];
+ Config;
init_per_testcase(t_uuid_saved_to_file, Config) ->
- mock_httpc(),
DataDir = emqx:data_dir(),
NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
@@ -205,7 +166,6 @@ init_per_testcase(t_uuid_saved_to_file, Config) ->
file:delete(ClusterUUIDFile),
Config;
init_per_testcase(t_num_clients, Config) ->
- mock_httpc(),
ok = snabbkaffe:start_trace(),
Config;
init_per_testcase(_Testcase, Config) ->
@@ -227,7 +187,6 @@ end_per_testcase(t_advanced_mqtt_features, _Config) ->
{atomic, ok} = mria:clear_table(emqx_delayed),
ok;
end_per_testcase(t_authn_authz_info, _Config) ->
- meck:unload([httpc]),
emqx_authz:update({delete, postgresql}, #{}),
lists:foreach(
fun(ChainName) ->
@@ -244,19 +203,8 @@ end_per_testcase(t_enable, _Config) ->
end_per_testcase(t_send_after_enable, _Config) ->
meck:unload([httpc, emqx_telemetry_config]);
end_per_testcase(t_rule_engine_and_data_bridge_info, _Config) ->
- meck:unload(httpc),
- lists:foreach(
- fun(App) ->
- ok = application:stop(App)
- end,
- [
- emqx_bridge,
- emqx_rule_engine
- ]
- ),
ok;
end_per_testcase(t_exhook_info, _Config) ->
- meck:unload(httpc),
emqx_exhook_demo_svr:stop(),
application:stop(emqx_exhook),
ok;
@@ -264,21 +212,12 @@ end_per_testcase(t_cluster_uuid, Config) ->
Node = proplists:get_value(n1, Config),
ok = stop_slave(Node);
end_per_testcase(t_num_clients, Config) ->
- meck:unload([httpc]),
ok = snabbkaffe:stop(),
Config;
-end_per_testcase(t_uuid_restored_from_file, Config) ->
- Node = ?config(n1, Config),
- DataDir = emqx:data_dir(),
- NodeUUIDFile = filename:join(DataDir, "node.uuid"),
- ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
- ok = file:delete(NodeUUIDFile),
- ok = file:delete(ClusterUUIDFile),
- meck:unload([httpc]),
- ok = stop_slave(Node),
- ok;
end_per_testcase(_Testcase, _Config) ->
- meck:unload([httpc]),
+ case catch meck:unload([httpc]) of
+ _ -> ok
+ end,
ok.
%%------------------------------------------------------------------------------
@@ -315,19 +254,34 @@ t_cluster_uuid(Config) ->
%% should attempt read UUID from file in data dir to keep UUIDs
%% unique, in the event of a database purge.
t_uuid_restored_from_file(Config) ->
- ExpectedNodeUUID = ?config(node_uuid, Config),
- ExpectedClusterUUID = ?config(cluster_uuid, Config),
+ %% Stop the emqx_telemetry application first
+ {atomic, ok} = mria:clear_table(emqx_telemetry),
+ application:stop(emqx_telemetry),
+
+ %% Rewrite the the uuid files
+ NodeUUID = <<"AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE">>,
+ ClusterUUID = <<"FFFFFFFF-GGGG-HHHH-IIII-JJJJJJJJJJJJ">>,
+ DataDir = ?config(work_dir, Config),
+ NodeUUIDFile = filename:join(DataDir, "node.uuid"),
+ ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
+ ok = file:write_file(NodeUUIDFile, NodeUUID),
+ ok = file:write_file(ClusterUUIDFile, ClusterUUID),
+
+ %% Start the emqx_telemetry application again
+ application:start(emqx_telemetry),
+
+ %% Check the UUIDs
?assertEqual(
- {ok, ExpectedNodeUUID},
+ {ok, NodeUUID},
emqx_telemetry:get_node_uuid()
),
?assertEqual(
- {ok, ExpectedClusterUUID},
+ {ok, ClusterUUID},
emqx_telemetry:get_cluster_uuid()
),
ok.
-t_uuid_saved_to_file(_Config) ->
+t_uuid_saved_to_file(Config) ->
DataDir = emqx:data_dir(),
NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
@@ -337,9 +291,10 @@ t_uuid_saved_to_file(_Config) ->
%% clear the UUIDs in the DB
{atomic, ok} = mria:clear_table(emqx_telemetry),
- stop_apps(),
- ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
- start_apps(),
+ application:stop(emqx_telemetry),
+
+ application:start(emqx_telemetry),
+
{ok, NodeUUID} = emqx_telemetry:get_node_uuid(),
{ok, ClusterUUID} = emqx_telemetry:get_cluster_uuid(),
?assertEqual(
@@ -578,6 +533,7 @@ t_mqtt_runtime_insights(_) ->
t_rule_engine_and_data_bridge_info(_Config) ->
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
+ ct:pal("telemetry data: ~p~n", [TelemetryData]),
RuleInfo = get_value(rule_engine, TelemetryData),
BridgeInfo = get_value(bridge, TelemetryData),
?assertEqual(
@@ -588,7 +544,7 @@ t_rule_engine_and_data_bridge_info(_Config) ->
#{
data_bridge =>
#{
- webhook => #{num => 1, num_linked_by_rules => 3},
+ http => #{num => 1, num_linked_by_rules => 3},
mqtt => #{num => 2, num_linked_by_rules => 2}
},
num_data_bridges => 3
@@ -811,14 +767,6 @@ setup_fake_rule_engine_data() ->
),
ok.
-set_special_configs(emqx_auth) ->
- {ok, _} = emqx:update_config([authorization, cache, enable], false),
- {ok, _} = emqx:update_config([authorization, no_match], deny),
- {ok, _} = emqx:update_config([authorization, sources], []),
- ok;
-set_special_configs(_App) ->
- ok.
-
%% for some unknown reason, gen_rpc running locally or in CI might
%% start with different `port_discovery' modes, which means that'll
%% either be listening at the port in the config (`tcp_server_port',
@@ -887,9 +835,3 @@ leave_cluster() ->
is_official_version(V) ->
emqx_telemetry_config:is_official_version(V).
-
-start_apps() ->
- emqx_common_test_helpers:start_apps(apps(), fun set_special_configs/1).
-
-stop_apps() ->
- emqx_common_test_helpers:stop_apps(lists:reverse(apps())).
diff --git a/rel/i18n/emqx_bridge_http_schema.hocon b/rel/i18n/emqx_bridge_http_schema.hocon
index b7b715db1..416f77834 100644
--- a/rel/i18n/emqx_bridge_http_schema.hocon
+++ b/rel/i18n/emqx_bridge_http_schema.hocon
@@ -18,10 +18,10 @@ config_direction.desc:
config_direction.label:
"""Bridge Direction"""
-config_enable.desc:
+config_enable_bridge.desc:
"""Enable or disable this bridge"""
-config_enable.label:
+config_enable_bridge.label:
"""Enable Or Disable Bridge"""
config_headers.desc:
@@ -71,6 +71,21 @@ is not allowed."""
config_url.label:
"""HTTP Bridge"""
+config_path.desc:
+"""The URL path for this Action.
+This path will be appended to the Connector's url
configuration to form the full
+URL address.
+Template with variables is allowed in this option. For example, /room/{$room_no}
"""
+
+config_path.label:
+"""URL Path"""
+
+config_parameters_opts.desc:
+"""The parameters for HTTP action."""
+
+config_parameters_opts.label:
+"""Parameters"""
+
desc_config.desc:
"""Configuration for an HTTP bridge."""