diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index b23f91128..4a0d0403f 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -20,7 +20,14 @@ jobs:
upload:
runs-on: ubuntu-22.04
permissions:
+ contents: write
+ checks: write
packages: write
+ actions: read
+ issues: read
+ pull-requests: read
+ repository-projects: read
+ statuses: read
strategy:
fail-fast: false
steps:
@@ -45,11 +52,13 @@ jobs:
v*)
echo "profile=emqx" >> $GITHUB_OUTPUT
echo "version=$(./pkg-vsn.sh emqx)" >> $GITHUB_OUTPUT
+ echo "ref_name=v$(./pkg-vsn.sh emqx)" >> "$GITHUB_ENV"
echo "s3dir=emqx-ce" >> $GITHUB_OUTPUT
;;
e*)
echo "profile=emqx-enterprise" >> $GITHUB_OUTPUT
echo "version=$(./pkg-vsn.sh emqx-enterprise)" >> $GITHUB_OUTPUT
+ echo "ref_name=e$(./pkg-vsn.sh emqx-enterprise)" >> "$GITHUB_ENV"
echo "s3dir=emqx-ee" >> $GITHUB_OUTPUT
;;
esac
@@ -57,14 +66,15 @@ jobs:
run: |
BUCKET=${{ secrets.AWS_S3_BUCKET }}
OUTPUT_DIR=${{ steps.profile.outputs.s3dir }}
- aws s3 cp --recursive s3://$BUCKET/$OUTPUT_DIR/${{ github.ref_name }} packages
- - uses: alexellis/upload-assets@0.4.0
+ aws s3 cp --recursive s3://$BUCKET/$OUTPUT_DIR/${{ env.ref_name }} packages
+ - uses: emqx/upload-assets@8d2083b4dbe3151b0b735572eaa153b6acb647fe # 0.5.0
env:
GITHUB_TOKEN: ${{ github.token }}
with:
asset_paths: '["packages/*"]'
+ tag_name: "${{ env.ref_name }}"
- name: update to emqx.io
- if: startsWith(github.ref_name, 'v') && ((github.event_name == 'release' && !github.event.prerelease) || inputs.publish_release_artefacts)
+ if: startsWith(env.ref_name, 'v') && ((github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artefacts)
run: |
set -eux
curl -w %{http_code} \
@@ -72,10 +82,10 @@ jobs:
-H "Content-Type: application/json" \
-H "token: ${{ secrets.EMQX_IO_TOKEN }}" \
-X POST \
- -d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ github.ref_name }}\" }" \
+ -d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ env.ref_name }}\" }" \
${{ secrets.EMQX_IO_RELEASE_API }}
- name: Push to packagecloud.io
- if: (github.event_name == 'release' && !github.event.prerelease) || inputs.publish_release_artefacts
+ if: (github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artefacts
env:
PROFILE: ${{ steps.profile.outputs.profile }}
VERSION: ${{ steps.profile.outputs.version }}
diff --git a/.github/workflows/upload-helm-charts.yaml b/.github/workflows/upload-helm-charts.yaml
index 593a78a7c..44261d137 100644
--- a/.github/workflows/upload-helm-charts.yaml
+++ b/.github/workflows/upload-helm-charts.yaml
@@ -43,7 +43,7 @@ jobs:
;;
esac
- uses: emqx/push-helm-action@v1.1
- if: github.event_name == 'release' && !github.event.prerelease
+ if: github.event_name == 'release' && !github.event.release.prerelease
with:
charts_dir: "${{ github.workspace }}/deploy/charts/${{ steps.profile.outputs.profile }}"
version: ${{ steps.profile.outputs.version }}
diff --git a/Makefile b/Makefile
index 7c9638dd5..41812f6d9 100644
--- a/Makefile
+++ b/Makefile
@@ -20,8 +20,8 @@ endif
# Dashboard version
# from https://github.com/emqx/emqx-dashboard5
-export EMQX_DASHBOARD_VERSION ?= v1.5.1
-export EMQX_EE_DASHBOARD_VERSION ?= e1.3.2-beta.1
+export EMQX_DASHBOARD_VERSION ?= v1.5.2
+export EMQX_EE_DASHBOARD_VERSION ?= e1.3.2
PROFILE ?= emqx
REL_PROFILES := emqx emqx-enterprise
diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl
index 2f9254d70..299486ad1 100644
--- a/apps/emqx/include/emqx_release.hrl
+++ b/apps/emqx/include/emqx_release.hrl
@@ -35,7 +35,7 @@
-define(EMQX_RELEASE_CE, "5.3.2").
%% Enterprise edition
--define(EMQX_RELEASE_EE, "5.3.2-alpha.1").
+-define(EMQX_RELEASE_EE, "5.3.2").
%% The HTTP API version
-define(EMQX_API_VERSION, "5.0").
diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src
index 0545f36a5..915a66f17 100644
--- a/apps/emqx/src/emqx.app.src
+++ b/apps/emqx/src/emqx.app.src
@@ -2,7 +2,7 @@
{application, emqx, [
{id, "emqx"},
{description, "EMQX Core"},
- {vsn, "5.1.14"},
+ {vsn, "5.1.15"},
{modules, []},
{registered, []},
{applications, [
diff --git a/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src b/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src
index 3d4d5f467..d84d6ff81 100644
--- a/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src
+++ b/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_ldap, [
{description, "EMQX LDAP Authentication and Authorization"},
- {vsn, "0.1.1"},
+ {vsn, "0.1.2"},
{registered, []},
{mod, {emqx_auth_ldap_app, []}},
{applications, [
diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl
index 4f195b417..088a73396 100644
--- a/apps/emqx_bridge/src/emqx_action_info.erl
+++ b/apps/emqx_bridge/src/emqx_action_info.erl
@@ -89,7 +89,7 @@ hard_coded_action_info_modules_ee() ->
-endif.
hard_coded_action_info_modules_common() ->
- [].
+ [emqx_bridge_http_action_info].
hard_coded_action_info_modules() ->
hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee().
diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src
index f829b12df..2aa610f24 100644
--- a/apps/emqx_bridge/src/emqx_bridge.app.src
+++ b/apps/emqx_bridge/src/emqx_bridge.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge, [
{description, "EMQX bridges"},
- {vsn, "0.1.30"},
+ {vsn, "0.1.31"},
{registered, [emqx_bridge_sup]},
{mod, {emqx_bridge_app, []}},
{applications, [
diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl
index 9e14c0c9a..d26a44a1d 100644
--- a/apps/emqx_bridge/src/emqx_bridge.erl
+++ b/apps/emqx_bridge/src/emqx_bridge.erl
@@ -364,7 +364,7 @@ get_metrics(Type, Name) ->
maybe_upgrade(mqtt, Config) ->
emqx_bridge_compatible_config:maybe_upgrade(Config);
maybe_upgrade(webhook, Config) ->
- emqx_bridge_compatible_config:webhook_maybe_upgrade(Config);
+ emqx_bridge_compatible_config:http_maybe_upgrade(Config);
maybe_upgrade(_Other, Config) ->
Config.
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index a3c058abb..b725eb740 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -143,7 +143,7 @@ param_path_id() ->
#{
in => path,
required => true,
- example => <<"webhook:webhook_example">>,
+ example => <<"http:http_example">>,
desc => ?DESC("desc_param_path_id")
}
)}.
@@ -166,9 +166,9 @@ bridge_info_array_example(Method) ->
bridge_info_examples(Method) ->
maps:merge(
#{
- <<"webhook_example">> => #{
- summary => <<"WebHook">>,
- value => info_example(webhook, Method)
+ <<"http_example">> => #{
+ summary => <<"HTTP">>,
+ value => info_example(http, Method)
},
<<"mqtt_example">> => #{
summary => <<"MQTT Bridge">>,
@@ -201,7 +201,7 @@ method_example(Type, Method) when Method == get; Method == post ->
method_example(_Type, put) ->
#{}.
-info_example_basic(webhook) ->
+info_example_basic(http) ->
#{
enable => true,
url => <<"http://localhost:9901/messages/${topic}">>,
@@ -212,7 +212,7 @@ info_example_basic(webhook) ->
pool_size => 4,
enable_pipelining => 100,
ssl => #{enable => false},
- local_topic => <<"emqx_webhook/#">>,
+ local_topic => <<"emqx_http/#">>,
method => post,
body => <<"${payload}">>,
resource_opts => #{
@@ -650,7 +650,8 @@ create_or_update_bridge(BridgeType0, BridgeName, Conf, HttpStatusCode) ->
get_metrics_from_local_node(BridgeType0, BridgeName) ->
BridgeType = upgrade_type(BridgeType0),
- format_metrics(emqx_bridge:get_metrics(BridgeType, BridgeName)).
+ MetricsResult = emqx_bridge:get_metrics(BridgeType, BridgeName),
+ format_metrics(MetricsResult).
'/bridges/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
?TRY_PARSE_ID(
diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl
index 231548f30..0a870abb8 100644
--- a/apps/emqx_bridge/src/emqx_bridge_resource.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl
@@ -63,18 +63,23 @@
).
-if(?EMQX_RELEASE_EDITION == ee).
-bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
-bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
-bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector;
-bridge_to_resource_type(webhook) -> emqx_bridge_http_connector;
-bridge_to_resource_type(BridgeType) -> emqx_bridge_enterprise:resource_type(BridgeType).
+bridge_to_resource_type(BridgeType) when is_binary(BridgeType) ->
+ bridge_to_resource_type(binary_to_existing_atom(BridgeType, utf8));
+bridge_to_resource_type(mqtt) ->
+ emqx_bridge_mqtt_connector;
+bridge_to_resource_type(webhook) ->
+ emqx_bridge_http_connector;
+bridge_to_resource_type(BridgeType) ->
+ emqx_bridge_enterprise:resource_type(BridgeType).
bridge_impl_module(BridgeType) -> emqx_bridge_enterprise:bridge_impl_module(BridgeType).
-else.
-bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
-bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
-bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector;
-bridge_to_resource_type(webhook) -> emqx_bridge_http_connector.
+bridge_to_resource_type(BridgeType) when is_binary(BridgeType) ->
+ bridge_to_resource_type(binary_to_existing_atom(BridgeType, utf8));
+bridge_to_resource_type(mqtt) ->
+ emqx_bridge_mqtt_connector;
+bridge_to_resource_type(webhook) ->
+ emqx_bridge_http_connector.
bridge_impl_module(_BridgeType) -> undefined.
-endif.
@@ -309,6 +314,7 @@ remove(Type, Name, _Conf, _Opts) ->
emqx_resource:remove_local(resource_id(Type, Name)).
%% convert bridge configs to what the connector modules want
+%% TODO: remove it, if the http_bridge already ported to v2
parse_confs(
<<"webhook">>,
_Name,
diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl
index 1863ed84b..97d0afb43 100644
--- a/apps/emqx_bridge/src/emqx_bridge_v2.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl
@@ -1188,7 +1188,7 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
%% If the bridge v2 does not exist, it is a valid bridge v1
PreviousRawConf = undefined,
split_bridge_v1_config_and_create_helper(
- BridgeV1Type, BridgeName, RawConf, PreviousRawConf
+ BridgeV1Type, BridgeName, RawConf, PreviousRawConf, fun() -> ok end
);
_Conf ->
case ?MODULE:bridge_v1_is_valid(BridgeV1Type, BridgeName) of
@@ -1198,9 +1198,13 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
PreviousRawConf = emqx:get_raw_config(
[?ROOT_KEY, BridgeV2Type, BridgeName], undefined
),
- bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps),
+ %% To avoid losing configurations. We have to make sure that no crash occurs
+ %% during deletion and creation of configurations.
+ PreCreateFun = fun() ->
+ bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps)
+ end,
split_bridge_v1_config_and_create_helper(
- BridgeV1Type, BridgeName, RawConf, PreviousRawConf
+ BridgeV1Type, BridgeName, RawConf, PreviousRawConf, PreCreateFun
);
false ->
%% If the bridge v2 exists, it is not a valid bridge v1
@@ -1208,16 +1212,49 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
end
end.
-split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf, PreviousRawConf) ->
- #{
- connector_type := ConnectorType,
- connector_name := NewConnectorName,
- connector_conf := NewConnectorRawConf,
- bridge_v2_type := BridgeType,
- bridge_v2_name := BridgeName,
- bridge_v2_conf := NewBridgeV2RawConf
- } =
- split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousRawConf),
+split_bridge_v1_config_and_create_helper(
+ BridgeV1Type, BridgeName, RawConf, PreviousRawConf, PreCreateFun
+) ->
+ try
+ #{
+ connector_type := ConnectorType,
+ connector_name := NewConnectorName,
+ connector_conf := NewConnectorRawConf,
+ bridge_v2_type := BridgeType,
+ bridge_v2_name := BridgeName,
+ bridge_v2_conf := NewBridgeV2RawConf
+ } = split_and_validate_bridge_v1_config(
+ BridgeV1Type,
+ BridgeName,
+ RawConf,
+ PreviousRawConf
+ ),
+
+ _ = PreCreateFun(),
+
+ do_connector_and_bridge_create(
+ ConnectorType,
+ NewConnectorName,
+ NewConnectorRawConf,
+ BridgeType,
+ BridgeName,
+ NewBridgeV2RawConf,
+ RawConf
+ )
+ catch
+ throw:Reason ->
+ {error, Reason}
+ end.
+
+do_connector_and_bridge_create(
+ ConnectorType,
+ NewConnectorName,
+ NewConnectorRawConf,
+ BridgeType,
+ BridgeName,
+ NewBridgeV2RawConf,
+ RawConf
+) ->
case emqx_connector:create(ConnectorType, NewConnectorName, NewConnectorRawConf) of
{ok, _} ->
case create(BridgeType, BridgeName, NewBridgeV2RawConf) of
@@ -1335,15 +1372,20 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
RawConf = maps:without([<<"name">>], RawConfig0),
TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
PreviousRawConf = undefined,
- #{
- connector_type := _ConnectorType,
- connector_name := _NewConnectorName,
- connector_conf := ConnectorRawConf,
- bridge_v2_type := BridgeV2Type,
- bridge_v2_name := _BridgeName,
- bridge_v2_conf := BridgeV2RawConf
- } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
- create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf).
+ try
+ #{
+ connector_type := _ConnectorType,
+ connector_name := _NewConnectorName,
+ connector_conf := ConnectorRawConf,
+ bridge_v2_type := BridgeV2Type,
+ bridge_v2_name := _BridgeName,
+ bridge_v2_conf := BridgeV2RawConf
+ } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
+ create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf)
+ catch
+ throw:Reason ->
+ {error, Reason}
+ end.
%% Only called by test cases (may create broken references)
bridge_v1_remove(BridgeV1Type, BridgeName) ->
diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl
index 13e84f84e..e6fcca50a 100644
--- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl
@@ -117,7 +117,7 @@ param_path_id() ->
#{
in => path,
required => true,
- example => <<"webhook:webhook_example">>,
+ example => <<"http:my_http_action">>,
desc => ?DESC("desc_param_path_id")
}
)}.
diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl
index 6adbf3942..b68a4c387 100644
--- a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl
+++ b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl
@@ -21,7 +21,7 @@
-export([
upgrade_pre_ee/2,
maybe_upgrade/1,
- webhook_maybe_upgrade/1
+ http_maybe_upgrade/1
]).
upgrade_pre_ee(undefined, _UpgradeFunc) ->
@@ -40,10 +40,10 @@ maybe_upgrade(#{<<"connector">> := _} = Config0) ->
maybe_upgrade(NewVersion) ->
NewVersion.
-webhook_maybe_upgrade(#{<<"direction">> := _} = Config0) ->
+http_maybe_upgrade(#{<<"direction">> := _} = Config0) ->
Config1 = maps:remove(<<"direction">>, Config0),
Config1#{<<"resource_opts">> => default_resource_opts()};
-webhook_maybe_upgrade(NewVersion) ->
+http_maybe_upgrade(NewVersion) ->
NewVersion.
binary_key({K, V}) ->
diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
index ff924ac8c..27b3a8f14 100644
--- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
+++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
@@ -162,13 +162,14 @@ roots() -> [{bridges, ?HOCON(?R_REF(bridges), #{importance => ?IMPORTANCE_LOW})}
fields(bridges) ->
[
- {webhook,
+ {http,
mk(
hoconsc:map(name, ref(emqx_bridge_http_schema, "config")),
#{
+ aliases => [webhook],
desc => ?DESC("bridges_webhook"),
required => false,
- converter => fun webhook_bridge_converter/2
+ converter => fun http_bridge_converter/2
}
)},
{mqtt,
@@ -243,7 +244,7 @@ status() ->
node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
-webhook_bridge_converter(Conf0, _HoconOpts) ->
+http_bridge_converter(Conf0, _HoconOpts) ->
emqx_bridge_compatible_config:upgrade_pre_ee(
- Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
+ Conf0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1
).
diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl
index bc8be5476..30107d0ce 100644
--- a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl
@@ -30,14 +30,18 @@ init_per_suite(Config) ->
[
emqx,
emqx_conf,
+ emqx_connector,
+ emqx_bridge_http,
emqx_bridge
],
#{work_dir => ?config(priv_dir, Config)}
),
+ emqx_mgmt_api_test_util:init_suite(),
[{apps, Apps} | Config].
end_per_suite(Config) ->
Apps = ?config(apps, Config),
+ emqx_mgmt_api_test_util:end_suite(),
ok = emqx_cth_suite:stop(Apps),
ok.
@@ -58,6 +62,7 @@ end_per_testcase(t_get_basic_usage_info_1, _Config) ->
ok = emqx_bridge:remove(BridgeType, BridgeName)
end,
[
+ %% Keep using the old bridge names to avoid breaking the tests
{webhook, <<"basic_usage_info_webhook">>},
{webhook, <<"basic_usage_info_webhook_disabled">>},
{mqtt, <<"basic_usage_info_mqtt">>}
@@ -88,7 +93,7 @@ t_get_basic_usage_info_1(_Config) ->
#{
num_bridges => 3,
count_by_type => #{
- webhook => 1,
+ http => 1,
mqtt => 2
}
},
@@ -119,40 +124,33 @@ setup_fake_telemetry_data() ->
HTTPConfig = #{
url => <<"http://localhost:9901/messages/${topic}">>,
enable => true,
- local_topic => "emqx_webhook/#",
+ local_topic => "emqx_http/#",
method => post,
body => <<"${payload}">>,
headers => #{},
request_timeout => "15s"
},
- Conf =
- #{
- <<"bridges">> =>
- #{
- <<"webhook">> =>
- #{
- <<"basic_usage_info_webhook">> => HTTPConfig,
- <<"basic_usage_info_webhook_disabled">> =>
- HTTPConfig#{enable => false}
- },
- <<"mqtt">> =>
- #{
- <<"basic_usage_info_mqtt">> => MQTTConfig1,
- <<"basic_usage_info_mqtt_from_select">> => MQTTConfig2
- }
- }
- },
- ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf),
-
- ok = snabbkaffe:start_trace(),
- Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_loaded end,
- NEvents = 3,
- BackInTime = 0,
- Timeout = 11_000,
- {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, NEvents, Timeout, BackInTime),
- ok = emqx_bridge:load(),
- {ok, _} = snabbkaffe_collector:receive_events(Sub),
- ok = snabbkaffe:stop(),
+ %% Keep use the old bridge names to test the backward compatibility
+ {ok, _} = emqx_bridge_testlib:create_bridge_api(
+ <<"webhook">>,
+ <<"basic_usage_info_webhook">>,
+ HTTPConfig
+ ),
+ {ok, _} = emqx_bridge_testlib:create_bridge_api(
+ <<"webhook">>,
+ <<"basic_usage_info_webhook_disabled">>,
+ HTTPConfig#{enable => false}
+ ),
+ {ok, _} = emqx_bridge_testlib:create_bridge_api(
+ <<"mqtt">>,
+ <<"basic_usage_info_mqtt">>,
+ MQTTConfig1
+ ),
+ {ok, _} = emqx_bridge_testlib:create_bridge_api(
+ <<"mqtt">>,
+ <<"basic_usage_info_mqtt_from_select">>,
+ MQTTConfig2
+ ),
ok.
t_update_ssl_conf(Config) ->
diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
index ccc944572..e88206ccd 100644
--- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
@@ -73,13 +73,15 @@
-define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)).
-define(APPSPECS, [
- emqx_conf,
emqx,
+ emqx_conf,
emqx_auth,
emqx_auth_mnesia,
emqx_management,
- {emqx_rule_engine, "rule_engine { rules {} }"},
- {emqx_bridge, "bridges {}"}
+ emqx_connector,
+ emqx_bridge_http,
+ {emqx_bridge, "actions {}\n bridges {}"},
+ {emqx_rule_engine, "rule_engine { rules {} }"}
]).
-define(APPSPEC_DASHBOARD,
@@ -108,7 +110,7 @@ groups() ->
].
suite() ->
- [{timetrap, {seconds, 60}}].
+ [{timetrap, {seconds, 120}}].
init_per_suite(Config) ->
Config.
@@ -117,10 +119,10 @@ end_per_suite(_Config) ->
ok.
init_per_group(cluster = Name, Config) ->
- Nodes = [NodePrimary | _] = mk_cluster(Config),
+ Nodes = [NodePrimary | _] = mk_cluster(Name, Config),
init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
init_per_group(cluster_later_join = Name, Config) ->
- Nodes = [NodePrimary | _] = mk_cluster(Config, #{join_to => undefined}),
+ Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}),
init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
init_per_group(_Name, Config) ->
WorkDir = emqx_cth_suite:work_dir(Config),
@@ -132,10 +134,10 @@ init_api(Config) ->
{ok, App} = erpc:call(APINode, emqx_common_test_http, create_default_app, []),
[{api, App} | Config].
-mk_cluster(Config) ->
- mk_cluster(Config, #{}).
+mk_cluster(Name, Config) ->
+ mk_cluster(Name, Config, #{}).
-mk_cluster(Config, Opts) ->
+mk_cluster(Name, Config, Opts) ->
Node1Apps = ?APPSPECS ++ [?APPSPEC_DASHBOARD],
Node2Apps = ?APPSPECS,
emqx_cth_cluster:start(
@@ -143,7 +145,7 @@ mk_cluster(Config, Opts) ->
{emqx_bridge_api_SUITE1, Opts#{role => core, apps => Node1Apps}},
{emqx_bridge_api_SUITE2, Opts#{role => core, apps => Node2Apps}}
],
- #{work_dir => emqx_cth_suite:work_dir(Config)}
+ #{work_dir => emqx_cth_suite:work_dir(Name, Config)}
).
end_per_group(Group, Config) when
@@ -159,7 +161,7 @@ init_per_testcase(t_broken_bpapi_vsn, Config) ->
meck:new(emqx_bpapi, [passthrough]),
meck:expect(emqx_bpapi, supported_version, 1, -1),
meck:expect(emqx_bpapi, supported_version, 2, -1),
- init_per_testcase(commong, Config);
+ init_per_testcase(common, Config);
init_per_testcase(t_old_bpapi_vsn, Config) ->
meck:new(emqx_bpapi, [passthrough]),
meck:expect(emqx_bpapi, supported_version, 1, 1),
@@ -185,6 +187,18 @@ end_per_testcase(_, Config) ->
ok.
clear_resources() ->
+ lists:foreach(
+ fun(#{type := Type, name := Name}) ->
+ ok = emqx_bridge_v2:remove(Type, Name)
+ end,
+ emqx_bridge_v2:list()
+ ),
+ lists:foreach(
+ fun(#{type := Type, name := Name}) ->
+ ok = emqx_connector:remove(Type, Name)
+ end,
+ emqx_connector:list()
+ ),
lists:foreach(
fun(#{type := Type, name := Name}) ->
ok = emqx_bridge:remove(Type, Name)
@@ -407,10 +421,7 @@ t_http_crud_apis(Config) ->
Config
),
?assertMatch(
- #{
- <<"reason">> := <<"unknown_fields">>,
- <<"unknown">> := <<"curl">>
- },
+ #{<<"reason">> := <<"required_field">>},
json(maps:get(<<"message">>, PutFail2))
),
{ok, 400, _} = request_json(
@@ -419,12 +430,16 @@ t_http_crud_apis(Config) ->
?HTTP_BRIDGE(<<"localhost:1234/foo">>, Name),
Config
),
- {ok, 400, _} = request_json(
+ {ok, 400, PutFail3} = request_json(
put,
uri(["bridges", BridgeID]),
?HTTP_BRIDGE(<<"htpp://localhost:12341234/foo">>, Name),
Config
),
+ ?assertMatch(
+ #{<<"kind">> := <<"validation_error">>},
+ json(maps:get(<<"message">>, PutFail3))
+ ),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
@@ -463,7 +478,7 @@ t_http_crud_apis(Config) ->
),
%% Create non working bridge
- BrokenURL = ?URL(Port + 1, "/foo"),
+ BrokenURL = ?URL(Port + 1, "foo"),
{ok, 201, BrokenBridge} = request(
post,
uri(["bridges"]),
@@ -471,6 +486,7 @@ t_http_crud_apis(Config) ->
fun json/1,
Config
),
+
?assertMatch(
#{
<<"type">> := ?BRIDGE_TYPE_HTTP,
@@ -1307,7 +1323,9 @@ t_cluster_later_join_metrics(Config) ->
Name = ?BRIDGE_NAME,
BridgeParams = ?HTTP_BRIDGE(URL1, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
+
?check_trace(
+ #{timetrap => 15_000},
begin
%% Create a bridge on only one of the nodes.
?assertMatch({ok, 201, _}, request_json(post, uri(["bridges"]), BridgeParams, Config)),
@@ -1319,8 +1337,26 @@ t_cluster_later_join_metrics(Config) ->
}},
request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
),
+
+ ct:print("node joining cluster"),
%% Now join the other node join with the api node.
ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]),
+ %% Hack / workaround for the fact that `emqx_machine_boot' doesn't restart the
+ %% applications, in particular `emqx_conf' doesn't restart and synchronize the
+ %% transaction id. It's also unclear at the moment why the equivalent test in
+ %% `emqx_bridge_v2_api_SUITE' doesn't need this hack.
+ ok = erpc:call(OtherNode, application, stop, [emqx_conf]),
+ ok = erpc:call(OtherNode, application, start, [emqx_conf]),
+ ct:print("node joined cluster"),
+
+ %% assert: wait for the bridge to be ready on the other node.
+ {_, {ok, _}} =
+ ?wait_async_action(
+ {emqx_cluster_rpc, OtherNode} ! wake_up,
+ #{?snk_kind := cluster_rpc_caught_up, ?snk_meta := #{node := OtherNode}},
+ 10_000
+ ),
+
%% Check metrics; shouldn't crash even if the bridge is not
%% ready on the node that just joined the cluster.
?assertMatch(
@@ -1373,17 +1409,16 @@ t_create_with_bad_name(Config) ->
validate_resource_request_ttl(single, Timeout, Name) ->
SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
- BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
- ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name),
+ _BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
?check_trace(
begin
{ok, Res} =
?wait_async_action(
- emqx_bridge:send_message(BridgeID, SentData),
+ do_send_message(?BRIDGE_TYPE_HTTP, Name, SentData),
#{?snk_kind := async_query},
1000
),
- ?assertMatch({ok, #{id := ResId, query_opts := #{timeout := Timeout}}}, Res)
+ ?assertMatch({ok, #{id := _ResId, query_opts := #{timeout := Timeout}}}, Res)
end,
fun(Trace0) ->
Trace = ?of_kind(async_query, Trace0),
@@ -1394,6 +1429,10 @@ validate_resource_request_ttl(single, Timeout, Name) ->
validate_resource_request_ttl(_Cluster, _Timeout, _Name) ->
ignore.
+do_send_message(BridgeV1Type, Name, Message) ->
+ Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
+ emqx_bridge_v2:send_message(Type, Name, Message, #{}).
+
%%
request(Method, URL, Config) ->
diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl
index 86cc1f5c6..b267e9bf7 100644
--- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl
@@ -21,7 +21,7 @@ empty_config_test() ->
Conf1 = #{<<"bridges">> => #{}},
Conf2 = #{<<"bridges">> => #{<<"webhook">> => #{}}},
?assertEqual(Conf1, check(Conf1)),
- ?assertEqual(Conf2, check(Conf2)),
+ ?assertEqual(#{<<"bridges">> => #{<<"http">> => #{}}}, check(Conf2)),
ok.
%% ensure webhook config can be checked
@@ -33,7 +33,7 @@ webhook_config_test() ->
?assertMatch(
#{
<<"bridges">> := #{
- <<"webhook">> := #{
+ <<"http">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
@@ -48,7 +48,7 @@ webhook_config_test() ->
?assertMatch(
#{
<<"bridges">> := #{
- <<"webhook">> := #{
+ <<"http">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
@@ -61,7 +61,7 @@ webhook_config_test() ->
),
#{
<<"bridges">> := #{
- <<"webhook">> := #{
+ <<"http">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
@@ -84,7 +84,7 @@ up(#{<<"mqtt">> := MqttBridges0} = Bridges) ->
Bridges#{<<"mqtt">> := MqttBridges};
up(#{<<"webhook">> := WebhookBridges0} = Bridges) ->
WebhookBridges = emqx_bridge_compatible_config:upgrade_pre_ee(
- WebhookBridges0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
+ WebhookBridges0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1
),
Bridges#{<<"webhook">> := WebhookBridges}.
diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl
index f486e5d64..118802551 100644
--- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl
@@ -92,7 +92,7 @@ end_per_testcase(_Testcase, Config) ->
delete_all_bridges() ->
lists:foreach(
fun(#{name := Name, type := Type}) ->
- emqx_bridge:remove(Type, Name)
+ ok = emqx_bridge:remove(Type, Name)
end,
emqx_bridge:list()
).
diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl
index 8758c325d..83a857b47 100644
--- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl
@@ -185,7 +185,7 @@ mk_cluster(Name, Config, Opts) ->
{emqx_bridge_v2_api_SUITE_1, Opts#{role => core, apps => Node1Apps}},
{emqx_bridge_v2_api_SUITE_2, Opts#{role => core, apps => Node2Apps}}
],
- #{work_dir => filename:join(?config(priv_dir, Config), Name)}
+ #{work_dir => emqx_cth_suite:work_dir(Name, Config)}
).
end_per_group(Group, Config) when
diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src
index f1c097d29..12d0890c3 100644
--- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src
+++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src
@@ -1,6 +1,6 @@
{application, emqx_bridge_azure_event_hub, [
{description, "EMQX Enterprise Azure Event Hub Bridge"},
- {vsn, "0.1.4"},
+ {vsn, "0.1.5"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src
index 6e2c93d20..59a02c190 100644
--- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src
+++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src
@@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"},
- {vsn, "0.1.10"},
+ {vsn, "0.1.11"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl
index 84a4e6d13..44b2d022a 100644
--- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl
+++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl
@@ -237,7 +237,10 @@ handle_continue(?patch_subscription, State0) ->
),
{noreply, State0};
error ->
- %% retry
+ %% retry; add a random delay for the case where multiple workers step on each
+ %% other's toes before retrying.
+ RandomMS = rand:uniform(500),
+ timer:sleep(RandomMS),
{noreply, State0, {continue, ?patch_subscription}}
end.
@@ -478,7 +481,6 @@ do_pull_async(State0) ->
Body = body(State0, pull),
PreparedRequest = {prepared_request, {Method, Path, Body}},
ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, InstanceId]},
- %% `ehttpc_pool'/`gproc_pool' might return `false' if there are no workers...
Res = emqx_bridge_gcp_pubsub_client:query_async(
PreparedRequest,
ReplyFunAndArgs,
diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl
index d82a61fee..86f81277c 100644
--- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl
+++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl
@@ -196,7 +196,7 @@ consumer_config(TestCase, Config) ->
" connect_timeout = \"5s\"\n"
" service_account_json = ~s\n"
" consumer {\n"
- " ack_deadline = \"60s\"\n"
+ " ack_deadline = \"10s\"\n"
" ack_retry_interval = \"1s\"\n"
" pull_max_messages = 10\n"
" consumer_workers_per_topic = 1\n"
@@ -208,7 +208,7 @@ consumer_config(TestCase, Config) ->
" resource_opts {\n"
" health_check_interval = \"1s\"\n"
%% to fail and retry pulling faster
- " request_ttl = \"5s\"\n"
+ " request_ttl = \"1s\"\n"
" }\n"
"}\n",
[
@@ -285,7 +285,7 @@ start_control_client() ->
connect_timeout => 5_000,
max_retries => 0,
pool_size => 1,
- resource_opts => #{request_ttl => 5_000},
+ resource_opts => #{request_ttl => 1_000},
service_account_json => RawServiceAccount
},
PoolName = <<"control_connector">>,
@@ -512,10 +512,23 @@ wait_acked(Opts) ->
%% no need to check return value; we check the property in
%% the check phase. this is just to give it a chance to do
%% so and avoid flakiness. should be fast.
- snabbkaffe:block_until(
+ Res = snabbkaffe:block_until(
?match_n_events(N, #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}),
Timeout
),
+ case Res of
+ {ok, _} ->
+ ok;
+ {timeout, Evts} ->
+ %% Fixme: apparently, snabbkaffe may timeout but still return the expected
+ %% events here.
+ case length(Evts) >= N of
+ true ->
+ ok;
+ false ->
+ ct:pal("timed out waiting for acks;\n expected: ~b\n received:\n ~p", [N, Evts])
+ end
+ end,
ok.
wait_forgotten() ->
@@ -652,25 +665,24 @@ setup_and_start_listeners(Node, NodeOpts) ->
end
).
+dedup([]) ->
+ [];
+dedup([X]) ->
+ [X];
+dedup([X | Rest]) ->
+ [X | dedup(X, Rest)].
+
+dedup(X, [X | Rest]) ->
+ dedup(X, Rest);
+dedup(_X, [Y | Rest]) ->
+ [Y | dedup(Y, Rest)];
+dedup(_X, []) ->
+ [].
+
%%------------------------------------------------------------------------------
%% Trace properties
%%------------------------------------------------------------------------------
-prop_pulled_only_once() ->
- {"all pulled message ids are unique", fun ?MODULE:prop_pulled_only_once/1}.
-prop_pulled_only_once(Trace) ->
- PulledIds =
- [
- MsgId
- || #{messages := Msgs} <- ?of_kind(gcp_pubsub_consumer_worker_decoded_messages, Trace),
- #{<<"message">> := #{<<"messageId">> := MsgId}} <- Msgs
- ],
- NumPulled = length(PulledIds),
- UniquePulledIds = sets:from_list(PulledIds, [{version, 2}]),
- UniqueNumPulled = sets:size(UniquePulledIds),
- ?assertEqual(UniqueNumPulled, NumPulled, #{pulled_ids => PulledIds}),
- ok.
-
prop_handled_only_once() ->
{"all pulled message are processed only once", fun ?MODULE:prop_handled_only_once/1}.
prop_handled_only_once(Trace) ->
@@ -1046,7 +1058,6 @@ t_consume_ok(Config) ->
end,
[
prop_all_pulled_are_acked(),
- prop_pulled_only_once(),
prop_handled_only_once(),
prop_acked_ids_eventually_forgotten()
]
@@ -1119,7 +1130,6 @@ t_bridge_rule_action_source(Config) ->
#{payload => Payload0}
end,
[
- prop_pulled_only_once(),
prop_handled_only_once()
]
),
@@ -1237,7 +1247,6 @@ t_multiple_topic_mappings(Config) ->
end,
[
prop_all_pulled_are_acked(),
- prop_pulled_only_once(),
prop_handled_only_once()
]
),
@@ -1265,11 +1274,12 @@ t_multiple_pull_workers(Config) ->
<<"consumer">> => #{
%% reduce flakiness
<<"ack_deadline">> => <<"10m">>,
+ <<"ack_retry_interval">> => <<"1s">>,
<<"consumer_workers_per_topic">> => NConsumers
},
<<"resource_opts">> => #{
%% reduce flakiness
- <<"request_ttl">> => <<"15s">>
+ <<"request_ttl">> => <<"20s">>
}
}
),
@@ -1297,7 +1307,6 @@ t_multiple_pull_workers(Config) ->
end,
[
prop_all_pulled_are_acked(),
- prop_pulled_only_once(),
prop_handled_only_once(),
{"message is processed only once", fun(Trace) ->
?assertMatch({timeout, _}, receive_published(#{timeout => 5_000})),
@@ -1531,11 +1540,12 @@ t_async_worker_death_mid_pull(Config) ->
ct:pal("published message"),
AsyncWorkerPids = get_async_worker_pids(Config),
+ Timeout = 20_000,
emqx_utils:pmap(
fun(AsyncWorkerPid) ->
Ref = monitor(process, AsyncWorkerPid),
ct:pal("killing pid ~p", [AsyncWorkerPid]),
- sys:terminate(AsyncWorkerPid, die, 20_000),
+ exit(AsyncWorkerPid, kill),
receive
{'DOWN', Ref, process, AsyncWorkerPid, _} ->
ct:pal("killed pid ~p", [AsyncWorkerPid]),
@@ -1544,7 +1554,8 @@ t_async_worker_death_mid_pull(Config) ->
end,
ok
end,
- AsyncWorkerPids
+ AsyncWorkerPids,
+ Timeout + 2_000
),
ok
@@ -1558,7 +1569,13 @@ t_async_worker_death_mid_pull(Config) ->
?wait_async_action(
create_bridge(
Config,
- #{<<"pool_size">> => 1}
+ #{
+ <<"pool_size">> => 1,
+ <<"consumer">> => #{
+ <<"ack_deadline">> => <<"10s">>,
+ <<"ack_retry_interval">> => <<"1s">>
+ }
+ }
),
#{?snk_kind := gcp_pubsub_consumer_worker_init},
10_000
@@ -1590,18 +1607,19 @@ t_async_worker_death_mid_pull(Config) ->
],
Trace
),
+ SubTraceEvts = ?projection(?snk_kind, SubTrace),
?assertMatch(
[
- #{?snk_kind := gcp_pubsub_consumer_worker_handled_async_worker_down},
- #{?snk_kind := gcp_pubsub_consumer_worker_reply_delegator}
+ gcp_pubsub_consumer_worker_handled_async_worker_down,
+ gcp_pubsub_consumer_worker_reply_delegator
| _
],
- SubTrace,
+ dedup(SubTraceEvts),
#{sub_trace => projection_optional_span(SubTrace)}
),
?assertMatch(
- #{?snk_kind := gcp_pubsub_consumer_worker_pull_response_received},
- lists:last(SubTrace)
+ gcp_pubsub_consumer_worker_pull_response_received,
+ lists:last(SubTraceEvts)
),
ok
end
@@ -1888,7 +1906,10 @@ t_connection_down_during_ack(Config) ->
{{ok, _}, {ok, _}} =
?wait_async_action(
- create_bridge(Config),
+ create_bridge(
+ Config,
+ #{<<"consumer">> => #{<<"ack_retry_interval">> => <<"1s">>}}
+ ),
#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
10_000
),
@@ -1930,7 +1951,6 @@ t_connection_down_during_ack(Config) ->
end,
[
prop_all_pulled_are_acked(),
- prop_pulled_only_once(),
prop_handled_only_once(),
{"message is processed only once", fun(Trace) ->
?assertMatch({timeout, _}, receive_published(#{timeout => 5_000})),
@@ -1955,7 +1975,15 @@ t_connection_down_during_ack_redeliver(Config) ->
?wait_async_action(
create_bridge(
Config,
- #{<<"consumer">> => #{<<"ack_deadline">> => <<"10s">>}}
+ #{
+ <<"consumer">> => #{
+ <<"ack_deadline">> => <<"12s">>,
+ <<"ack_retry_interval">> => <<"1s">>
+ },
+ <<"resource_opts">> => #{
+ <<"request_ttl">> => <<"11s">>
+ }
+ }
),
#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
10_000
@@ -2026,7 +2054,13 @@ t_connection_down_during_pull(Config) ->
{{ok, _}, {ok, _}} =
?wait_async_action(
- create_bridge(Config),
+ create_bridge(
+ Config,
+ #{
+ <<"consumer">> => #{<<"ack_retry_interval">> => <<"1s">>},
+ <<"resource_opts">> => #{<<"request_ttl">> => <<"11s">>}
+ }
+ ),
#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
10_000
),
diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src
index a8a938a0b..c28c3ed92 100644
--- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src
+++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src
@@ -1,6 +1,6 @@
{application, emqx_bridge_greptimedb, [
{description, "EMQX GreptimeDB Bridge"},
- {vsn, "0.1.4"},
+ {vsn, "0.1.5"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src
index 87d7e57a6..9cd71323e 100644
--- a/apps/emqx_bridge_http/src/emqx_bridge_http.app.src
+++ b/apps/emqx_bridge_http/src/emqx_bridge_http.app.src
@@ -3,7 +3,7 @@
{vsn, "0.1.5"},
{registered, []},
{applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]},
- {env, []},
+ {env, [{emqx_action_info_modules, [emqx_bridge_http_action_info]}]},
{modules, []},
{links, []}
]}.
diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl
new file mode 100644
index 000000000..457d8ff4b
--- /dev/null
+++ b/apps/emqx_bridge_http/src/emqx_bridge_http_action_info.erl
@@ -0,0 +1,102 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_http_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+ bridge_v1_type_name/0,
+ action_type_name/0,
+ connector_type_name/0,
+ schema_module/0,
+ connector_action_config_to_bridge_v1_config/2,
+ bridge_v1_config_to_action_config/2,
+ bridge_v1_config_to_connector_config/1
+]).
+
+-define(REMOVED_KEYS, [<<"direction">>]).
+-define(ACTION_KEYS, [<<"local_topic">>, <<"resource_opts">>]).
+-define(PARAMETER_KEYS, [<<"body">>, <<"max_retries">>, <<"method">>, <<"request_timeout">>]).
+
+bridge_v1_type_name() -> webhook.
+
+action_type_name() -> http.
+
+connector_type_name() -> http.
+
+schema_module() -> emqx_bridge_http_schema.
+
+connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
+ BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig),
+ %% Move parameters to the top level
+ ParametersMap1 = maps:get(<<"parameters">>, BridgeV1Config1, #{}),
+ ParametersMap2 = maps:without([<<"path">>, <<"headers">>], ParametersMap1),
+ BridgeV1Config2 = maps:remove(<<"parameters">>, BridgeV1Config1),
+ BridgeV1Config3 = emqx_utils_maps:deep_merge(BridgeV1Config2, ParametersMap2),
+ BridgeV1Config4 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config3),
+
+ Url = maps:get(<<"url">>, ConnectorConfig),
+ Path = maps:get(<<"path">>, ParametersMap1, <<>>),
+
+ Headers1 = maps:get(<<"headers">>, ConnectorConfig, #{}),
+ Headers2 = maps:get(<<"headers">>, ParametersMap1, #{}),
+
+ Url1 =
+ case Path of
+ <<>> -> Url;
+ _ -> iolist_to_binary(emqx_bridge_http_connector:join_paths(Url, Path))
+ end,
+
+ BridgeV1Config4#{
+ <<"headers">> => maps:merge(Headers1, Headers2),
+ <<"url">> => Url1
+ }.
+
+bridge_v1_config_to_connector_config(BridgeV1Conf) ->
+ %% To statisfy the emqx_bridge_api_SUITE:t_http_crud_apis/1
+ ok = validate_webhook_url(maps:get(<<"url">>, BridgeV1Conf, undefined)),
+ maps:without(?REMOVED_KEYS ++ ?ACTION_KEYS ++ ?PARAMETER_KEYS, BridgeV1Conf).
+
+bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
+ Parameters = maps:with(?PARAMETER_KEYS, BridgeV1Conf),
+ Parameters1 = Parameters#{<<"path">> => <<>>, <<"headers">> => #{}},
+ CommonKeys = [<<"enable">>, <<"description">>],
+ ActionConfig = maps:with(?ACTION_KEYS ++ CommonKeys, BridgeV1Conf),
+ ActionConfig#{<<"parameters">> => Parameters1, <<"connector">> => ConnectorName}.
+
+%%--------------------------------------------------------------------
+%% helpers
+
+validate_webhook_url(undefined) ->
+ throw(#{
+ kind => validation_error,
+ reason => required_field,
+ required_field => <<"url">>
+ });
+validate_webhook_url(Url) ->
+ {BaseUrl, _Path} = emqx_connector_resource:parse_url(Url),
+ case emqx_http_lib:uri_parse(BaseUrl) of
+ {ok, _} ->
+ ok;
+ {error, Reason} ->
+ throw(#{
+ kind => validation_error,
+ reason => invalid_url,
+ url => Url,
+ error => emqx_utils:readable_error_msg(Reason)
+ })
+ end.
diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
index 5a5e790e5..5ecfa76d1 100644
--- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
+++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
@@ -31,9 +31,14 @@
on_query/3,
on_query_async/4,
on_get_status/2,
- reply_delegator/3
+ on_add_channel/4,
+ on_remove_channel/3,
+ on_get_channels/1,
+ on_get_channel_status/3
]).
+-export([reply_delegator/3]).
+
-export([
roots/0,
fields/1,
@@ -41,7 +46,7 @@
namespace/0
]).
-%% for other webhook-like connectors.
+%% for other http-like connectors.
-export([redact_request/1]).
-export([validate_method/1, join_paths/2]).
@@ -251,6 +256,21 @@ start_pool(PoolName, PoolOpts) ->
Error
end.
+on_add_channel(
+ _InstId,
+ OldState,
+ ActionId,
+ ActionConfig
+) ->
+ InstalledActions = maps:get(installed_actions, OldState, #{}),
+ {ok, ActionState} = do_create_http_action(ActionConfig),
+ NewInstalledActions = maps:put(ActionId, ActionState, InstalledActions),
+ NewState = maps:put(installed_actions, NewInstalledActions, OldState),
+ {ok, NewState}.
+
+do_create_http_action(_ActionConfig = #{parameters := Params}) ->
+ {ok, preprocess_request(Params)}.
+
on_stop(InstId, _State) ->
?SLOG(info, #{
msg => "stopping_http_connector",
@@ -260,6 +280,16 @@ on_stop(InstId, _State) ->
?tp(emqx_connector_http_stopped, #{instance_id => InstId}),
Res.
+on_remove_channel(
+ _InstId,
+ OldState = #{installed_actions := InstalledActions},
+ ActionId
+) ->
+ NewInstalledActions = maps:remove(ActionId, InstalledActions),
+ NewState = maps:put(installed_actions, NewInstalledActions, OldState),
+ {ok, NewState}.
+
+%% BridgeV1 entrypoint
on_query(InstId, {send_message, Msg}, State) ->
case maps:get(request, State, undefined) of
undefined ->
@@ -282,6 +312,36 @@ on_query(InstId, {send_message, Msg}, State) ->
State
)
end;
+%% BridgeV2 entrypoint
+on_query(
+ InstId,
+ {ActionId, Msg},
+ State = #{installed_actions := InstalledActions}
+) when is_binary(ActionId) ->
+ case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of
+ {undefined, _} ->
+ ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
+ {error, arg_request_not_found};
+ {_, undefined} ->
+ ?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}),
+ {error, action_not_found};
+ {Request, ActionState} ->
+ #{
+ method := Method,
+ path := Path,
+ body := Body,
+ headers := Headers,
+ request_timeout := Timeout
+ } = process_request_and_action(Request, ActionState, Msg),
+ %% bridge buffer worker has retry, do not let ehttpc retry
+ Retry = 2,
+ ClientId = maps:get(clientid, Msg, undefined),
+ on_query(
+ InstId,
+ {ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
+ State
+ )
+ end;
on_query(InstId, {Method, Request}, State) ->
%% TODO: Get retry from State
on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State);
@@ -343,6 +403,7 @@ on_query(
Result
end.
+%% BridgeV1 entrypoint
on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
case maps:get(request, State, undefined) of
undefined ->
@@ -364,6 +425,36 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
State
)
end;
+%% BridgeV2 entrypoint
+on_query_async(
+ InstId,
+ {ActionId, Msg},
+ ReplyFunAndArgs,
+ State = #{installed_actions := InstalledActions}
+) when is_binary(ActionId) ->
+ case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of
+ {undefined, _} ->
+ ?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
+ {error, arg_request_not_found};
+ {_, undefined} ->
+ ?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}),
+ {error, action_not_found};
+ {Request, ActionState} ->
+ #{
+ method := Method,
+ path := Path,
+ body := Body,
+ headers := Headers,
+ request_timeout := Timeout
+ } = process_request_and_action(Request, ActionState, Msg),
+ ClientId = maps:get(clientid, Msg, undefined),
+ on_query_async(
+ InstId,
+ {ClientId, Method, {Path, Headers, Body}, Timeout},
+ ReplyFunAndArgs,
+ State
+ )
+ end;
on_query_async(
InstId,
{KeyOrNum, Method, Request, Timeout},
@@ -411,6 +502,9 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->
ehttpc_pool:pick_worker(PoolName, Key)
end.
+on_get_channels(ResId) ->
+ emqx_bridge_v2:get_channels_for_connector(ResId).
+
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(PoolName, Timeout) of
ok ->
@@ -456,6 +550,14 @@ do_get_status(PoolName, Timeout) ->
{error, timeout}
end.
+on_get_channel_status(
+ InstId,
+ _ChannelId,
+ State
+) ->
+ %% XXX: Reuse the connector status
+ on_get_status(InstId, State).
+
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
@@ -466,10 +568,10 @@ preprocess_request(Req) when map_size(Req) == 0 ->
preprocess_request(
#{
method := Method,
- path := Path,
- headers := Headers
+ path := Path
} = Req
) ->
+ Headers = maps:get(headers, Req, []),
#{
method => parse_template(to_bin(Method)),
path => parse_template(Path),
@@ -529,6 +631,49 @@ maybe_parse_template(Key, Conf) ->
parse_template(String) ->
emqx_template:parse(String).
+process_request_and_action(Request, ActionState, Msg) ->
+ MethodTemplate = maps:get(method, ActionState),
+ Method = make_method(render_template_string(MethodTemplate, Msg)),
+ BodyTemplate = maps:get(body, ActionState),
+ Body = render_request_body(BodyTemplate, Msg),
+
+ PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)),
+ PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)),
+
+ Path =
+ case PathSuffix of
+ "" -> PathPrefix;
+ _ -> join_paths(PathPrefix, PathSuffix)
+ end,
+
+ HeadersTemplate1 = maps:get(headers, Request),
+ HeadersTemplate2 = maps:get(headers, ActionState),
+ Headers = merge_proplist(
+ render_headers(HeadersTemplate1, Msg),
+ render_headers(HeadersTemplate2, Msg)
+ ),
+ #{
+ method => Method,
+ path => Path,
+ body => Body,
+ headers => Headers,
+ request_timeout => maps:get(request_timeout, ActionState)
+ }.
+
+merge_proplist(Proplist1, Proplist2) ->
+ lists:foldl(
+ fun({K, V}, Acc) ->
+ case lists:keyfind(K, 1, Acc) of
+ false ->
+ [{K, V} | Acc];
+ {K, _} = {K, V1} ->
+ [{K, V1} | Acc]
+ end
+ end,
+ Proplist2,
+ Proplist1
+ ).
+
process_request(
#{
method := MethodTemplate,
@@ -691,7 +836,7 @@ maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
true -> Context;
false -> Context#{attempt := Attempt + 1}
end,
- ?tp(webhook_will_retry_async, #{}),
+ ?tp(http_will_retry_async, #{}),
Worker = resolve_pool_worker(State, KeyOrNum),
ok = ehttpc:request_async(
Worker,
diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl
index a10646bac..6a9219c11 100644
--- a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl
+++ b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl
@@ -18,69 +18,162 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
--import(hoconsc, [mk/2, enum/1, ref/2]).
+-import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
-export([roots/0, fields/1, namespace/0, desc/1]).
+-export([
+ bridge_v2_examples/1,
+ %%conn_bridge_examples/1,
+ connector_examples/1
+]).
+
%%======================================================================================
%% Hocon Schema Definitions
namespace() -> "bridge_http".
roots() -> [].
-fields("config") ->
- basic_config() ++ request_config();
+%%--------------------------------------------------------------------
+%% v1 bridges http api
+%% see: emqx_bridge_schema:get_response/0, put_request/0, post_request/0
fields("post") ->
[
- type_field(),
+ old_type_field(),
name_field()
] ++ fields("config");
fields("put") ->
fields("config");
fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("post");
-fields("creation_opts") ->
+%%--- v1 bridges config file
+%% see: emqx_bridge_schema:fields(bridges)
+fields("config") ->
+ basic_config() ++ request_config();
+%%--------------------------------------------------------------------
+%% v2: configuration
+fields(action) ->
+ {http,
+ mk(
+ hoconsc:map(name, ref(?MODULE, "http_action")),
+ #{
+ aliases => [webhook],
+ desc => <<"HTTP Action Config">>,
+ required => false
+ }
+ )};
+fields("http_action") ->
+ [
+ {enable, mk(boolean(), #{desc => ?DESC("config_enable_bridge"), default => true})},
+ {connector,
+ mk(binary(), #{
+ desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
+ })},
+ {description, emqx_schema:description_schema()},
+ %% Note: there's an implicit convention in `emqx_bridge' that,
+ %% for egress bridges with this config, the published messages
+ %% will be forwarded to such bridges.
+ {local_topic,
+ mk(
+ binary(),
+ #{
+ required => false,
+ desc => ?DESC("config_local_topic"),
+ importance => ?IMPORTANCE_HIDDEN
+ }
+ )},
+ %% Since e5.3.2, we split the http bridge to two parts: a) connector. b) actions.
+ %% some fields are moved to connector, some fields are moved to actions and composed into the
+ %% `parameters` field.
+ {parameters,
+ mk(ref("parameters_opts"), #{
+ required => true,
+ desc => ?DESC("config_parameters_opts")
+ })}
+ ] ++ http_resource_opts();
+fields("parameters_opts") ->
+ [
+ {path,
+ mk(
+ binary(),
+ #{
+ desc => ?DESC("config_path"),
+ required => false
+ }
+ )},
+ method_field(),
+ headers_field(),
+ body_field(),
+ max_retries_field(),
+ request_timeout_field()
+ ];
+%% v2: api schema
+%% The parameter equls to
+%% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1
+%% `get_connector`, `post_connector`, `put_connector` from emqx_connector_schema:api_schema/1
+fields("post_" ++ Type) ->
+ [type_field(), name_field() | fields("config_" ++ Type)];
+fields("put_" ++ Type) ->
+ fields("config_" ++ Type);
+fields("get_" ++ Type) ->
+ emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type);
+fields("config_bridge_v2") ->
+ fields("http_action");
+fields("config_connector") ->
+ [
+ {enable,
+ mk(
+ boolean(),
+ #{
+ desc => <<"Enable or disable this connector">>,
+ default => true
+ }
+ )},
+ {description, emqx_schema:description_schema()}
+ ] ++ connector_url_headers() ++ connector_opts();
+%%--------------------------------------------------------------------
+%% v1/v2
+fields("resource_opts") ->
+ UnsupportedOpts = [enable_batch, batch_size, batch_time],
lists:filter(
- fun({K, _V}) ->
- not lists:member(K, unsupported_opts())
- end,
+ fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
emqx_resource_schema:fields("creation_opts")
).
desc("config") ->
?DESC("desc_config");
-desc("creation_opts") ->
+desc("resource_opts") ->
?DESC(emqx_resource_schema, "creation_opts");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for WebHook using `", string:to_upper(Method), "` method."];
+desc("config_connector") ->
+ ?DESC("desc_config");
+desc("http_action") ->
+ ?DESC("desc_config");
+desc("parameters_opts") ->
+ ?DESC("config_parameters_opts");
desc(_) ->
undefined.
+%%--------------------------------------------------------------------
+%% helpers for v1 only
+
basic_config() ->
[
{enable,
mk(
boolean(),
#{
- desc => ?DESC("config_enable"),
+ desc => ?DESC("config_enable_bridge"),
default => true
}
- )}
- ] ++ webhook_creation_opts() ++
- proplists:delete(
- max_retries, emqx_bridge_http_connector:fields(config)
- ).
+ )},
+ {description, emqx_schema:description_schema()}
+ ] ++ http_resource_opts() ++ connector_opts().
request_config() ->
[
- {url,
- mk(
- binary(),
- #{
- required => true,
- desc => ?DESC("config_url")
- }
- )},
+ url_field(),
{direction,
mk(
egress,
@@ -98,81 +191,37 @@ request_config() ->
required => false
}
)},
- {method,
- mk(
- method(),
- #{
- default => post,
- desc => ?DESC("config_method")
- }
- )},
- {headers,
- mk(
- map(),
- #{
- default => #{
- <<"accept">> => <<"application/json">>,
- <<"cache-control">> => <<"no-cache">>,
- <<"connection">> => <<"keep-alive">>,
- <<"content-type">> => <<"application/json">>,
- <<"keep-alive">> => <<"timeout=5">>
- },
- desc => ?DESC("config_headers")
- }
- )},
- {body,
- mk(
- binary(),
- #{
- default => undefined,
- desc => ?DESC("config_body")
- }
- )},
- {max_retries,
- mk(
- non_neg_integer(),
- #{
- default => 2,
- desc => ?DESC("config_max_retries")
- }
- )},
- {request_timeout,
- mk(
- emqx_schema:duration_ms(),
- #{
- default => <<"15s">>,
- deprecated => {since, "v5.0.26"},
- desc => ?DESC("config_request_timeout")
- }
- )}
+ method_field(),
+ headers_field(),
+ body_field(),
+ max_retries_field(),
+ request_timeout_field()
].
-webhook_creation_opts() ->
- [
- {resource_opts,
- mk(
- ref(?MODULE, "creation_opts"),
- #{
- required => false,
- default => #{},
- desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
- }
- )}
- ].
+%%--------------------------------------------------------------------
+%% helpers for v2 only
-unsupported_opts() ->
- [
- enable_batch,
- batch_size,
- batch_time
- ].
+connector_url_headers() ->
+ [url_field(), headers_field()].
-%%======================================================================================
+%%--------------------------------------------------------------------
+%% common funcs
+
+%% `webhook` is kept for backward compatibility.
+old_type_field() ->
+ {type,
+ mk(
+ enum([webhook, http]),
+ #{
+ required => true,
+ desc => ?DESC("desc_type")
+ }
+ )}.
type_field() ->
{type,
mk(
- webhook,
+ http,
#{
required => true,
desc => ?DESC("desc_type")
@@ -189,5 +238,189 @@ name_field() ->
}
)}.
-method() ->
- enum([post, put, get, delete]).
+url_field() ->
+ {url,
+ mk(
+ binary(),
+ #{
+ required => true,
+ desc => ?DESC("config_url")
+ }
+ )}.
+
+headers_field() ->
+ {headers,
+ mk(
+ map(),
+ #{
+ default => #{
+ <<"accept">> => <<"application/json">>,
+ <<"cache-control">> => <<"no-cache">>,
+ <<"connection">> => <<"keep-alive">>,
+ <<"content-type">> => <<"application/json">>,
+ <<"keep-alive">> => <<"timeout=5">>
+ },
+ desc => ?DESC("config_headers")
+ }
+ )}.
+
+method_field() ->
+ {method,
+ mk(
+ enum([post, put, get, delete]),
+ #{
+ default => post,
+ desc => ?DESC("config_method")
+ }
+ )}.
+
+body_field() ->
+ {body,
+ mk(
+ binary(),
+ #{
+ default => undefined,
+ desc => ?DESC("config_body")
+ }
+ )}.
+
+max_retries_field() ->
+ {max_retries,
+ mk(
+ non_neg_integer(),
+ #{
+ default => 2,
+ desc => ?DESC("config_max_retries")
+ }
+ )}.
+
+request_timeout_field() ->
+ {request_timeout,
+ mk(
+ emqx_schema:duration_ms(),
+ #{
+ default => <<"15s">>,
+ deprecated => {since, "v5.0.26"},
+ desc => ?DESC("config_request_timeout")
+ }
+ )}.
+
+http_resource_opts() ->
+ [
+ {resource_opts,
+ mk(
+ ref(?MODULE, "resource_opts"),
+ #{
+ required => false,
+ default => #{},
+ desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
+ }
+ )}
+ ].
+
+connector_opts() ->
+ mark_request_field_deperecated(
+ proplists:delete(max_retries, emqx_bridge_http_connector:fields(config))
+ ).
+
+mark_request_field_deperecated(Fields) ->
+ lists:map(
+ fun({K, V}) ->
+ case K of
+ request ->
+ {K, V#{
+ %% Note: if we want to deprecate a reference type, we have to change
+ %% it to a direct type first.
+ type => typerefl:map(),
+ deprecated => {since, "5.3.2"},
+ desc => <<"This field is never used, so we deprecated it since 5.3.2.">>
+ }};
+ _ ->
+ {K, V}
+ end
+ end,
+ Fields
+ ).
+
+%%--------------------------------------------------------------------
+%% Examples
+
+bridge_v2_examples(Method) ->
+ [
+ #{
+ <<"http">> => #{
+ summary => <<"HTTP Action">>,
+ value => values({Method, bridge_v2})
+ }
+ }
+ ].
+
+connector_examples(Method) ->
+ [
+ #{
+ <<"http">> => #{
+ summary => <<"HTTP Connector">>,
+ value => values({Method, connector})
+ }
+ }
+ ].
+
+values({get, Type}) ->
+ maps:merge(
+ #{
+ status => <<"connected">>,
+ node_status => [
+ #{
+ node => <<"emqx@localhost">>,
+ status => <<"connected">>
+ }
+ ]
+ },
+ values({post, Type})
+ );
+values({post, bridge_v2}) ->
+ maps:merge(
+ #{
+ name => <<"my_http_action">>,
+ type => <<"http">>
+ },
+ values({put, bridge_v2})
+ );
+values({post, connector}) ->
+ maps:merge(
+ #{
+ name => <<"my_http_connector">>,
+ type => <<"http">>
+ },
+ values({put, connector})
+ );
+values({put, bridge_v2}) ->
+ values(bridge_v2);
+values({put, connector}) ->
+ values(connector);
+values(bridge_v2) ->
+ #{
+ enable => true,
+ connector => <<"my_http_connector">>,
+ parameters => #{
+ path => <<"/room/${room_no}">>,
+ method => <<"post">>,
+ headers => #{},
+ body => <<"${.}">>
+ },
+ resource_opts => #{
+ worker_pool_size => 16,
+ health_check_interval => <<"15s">>,
+ query_mode => <<"async">>
+ }
+ };
+values(connector) ->
+ #{
+ enable => true,
+ url => <<"http://localhost:8080/api/v1">>,
+ headers => #{<<"content-type">> => <<"application/json">>},
+ connect_timeout => <<"15s">>,
+ pool_type => <<"hash">>,
+ pool_size => 1,
+ enable_pipelining => 100
+ }.
diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl
index d9fc595fe..3b7303300 100644
--- a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl
+++ b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl
@@ -39,18 +39,33 @@ all() ->
groups() ->
[].
-init_per_suite(_Config) ->
- emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
- ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge, emqx_rule_engine]),
- ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
- {ok, _} = application:ensure_all_started(emqx_connector),
- [].
+init_per_suite(Config0) ->
+ Config =
+ case os:getenv("DEBUG_CASE") of
+ [_ | _] = DebugCase ->
+ CaseName = list_to_atom(DebugCase),
+ [{debug_case, CaseName} | Config0];
+ _ ->
+ Config0
+ end,
+ Apps = emqx_cth_suite:start(
+ [
+ emqx,
+ emqx_conf,
+ emqx_connector,
+ emqx_bridge_http,
+ emqx_bridge,
+ emqx_rule_engine
+ ],
+ #{work_dir => emqx_cth_suite:work_dir(Config)}
+ ),
+ emqx_mgmt_api_test_util:init_suite(),
+ [{apps, Apps} | Config].
-end_per_suite(_Config) ->
- ok = emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge, emqx_conf]),
- ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
- _ = application:stop(emqx_connector),
- _ = application:stop(emqx_bridge),
+end_per_suite(Config) ->
+ Apps = ?config(apps, Config),
+ emqx_mgmt_api_test_util:end_suite(),
+ ok = emqx_cth_suite:stop(Apps),
ok.
suite() ->
@@ -115,7 +130,8 @@ end_per_testcase(TestCase, _Config) when
->
ok = emqx_bridge_http_connector_test_server:stop(),
persistent_term:erase({?MODULE, times_called}),
- emqx_bridge_testlib:delete_all_bridges(),
+ emqx_bridge_v2_testlib:delete_all_bridges(),
+ emqx_bridge_v2_testlib:delete_all_connectors(),
emqx_common_test_helpers:call_janitor(),
ok;
end_per_testcase(_TestCase, Config) ->
@@ -123,7 +139,8 @@ end_per_testcase(_TestCase, Config) ->
undefined -> ok;
Server -> stop_http_server(Server)
end,
- emqx_bridge_testlib:delete_all_bridges(),
+ emqx_bridge_v2_testlib:delete_all_bridges(),
+ emqx_bridge_v2_testlib:delete_all_connectors(),
emqx_common_test_helpers:call_janitor(),
ok.
@@ -420,7 +437,7 @@ t_send_async_connection_timeout(Config) ->
),
NumberOfMessagesToSend = 10,
[
- emqx_bridge:send_message(BridgeID, #{<<"id">> => Id})
+ do_send_message(#{<<"id">> => Id})
|| Id <- lists:seq(1, NumberOfMessagesToSend)
],
%% Make sure server receives all messages
@@ -431,7 +448,7 @@ t_send_async_connection_timeout(Config) ->
t_async_free_retries(Config) ->
#{port := Port} = ?config(http_server, Config),
- BridgeID = make_bridge(#{
+ _BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
query_mode => "sync",
@@ -445,7 +462,7 @@ t_async_free_retries(Config) ->
Fn = fun(Get, Error) ->
?assertMatch(
{ok, 200, _, _},
- emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
+ do_send_message(#{<<"hello">> => <<"world">>}),
#{error => Error}
),
?assertEqual(ExpectedAttempts, Get(), #{error => Error})
@@ -456,7 +473,7 @@ t_async_free_retries(Config) ->
t_async_common_retries(Config) ->
#{port := Port} = ?config(http_server, Config),
- BridgeID = make_bridge(#{
+ _BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
query_mode => "sync",
@@ -471,7 +488,7 @@ t_async_common_retries(Config) ->
FnSucceed = fun(Get, Error) ->
?assertMatch(
{ok, 200, _, _},
- emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
+ do_send_message(#{<<"hello">> => <<"world">>}),
#{error => Error, attempts => Get()}
),
?assertEqual(ExpectedAttempts, Get(), #{error => Error})
@@ -479,7 +496,7 @@ t_async_common_retries(Config) ->
FnFail = fun(Get, Error) ->
?assertMatch(
Error,
- emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}),
+ do_send_message(#{<<"hello">> => <<"world">>}),
#{error => Error, attempts => Get()}
),
?assertEqual(ExpectedAttempts, Get(), #{error => Error})
@@ -559,7 +576,7 @@ t_path_not_found(Config) ->
ok
end,
fun(Trace) ->
- ?assertEqual([], ?of_kind(webhook_will_retry_async, Trace)),
+ ?assertEqual([], ?of_kind(http_will_retry_async, Trace)),
ok
end
),
@@ -600,7 +617,7 @@ t_too_many_requests(Config) ->
ok
end,
fun(Trace) ->
- ?assertMatch([_ | _], ?of_kind(webhook_will_retry_async, Trace)),
+ ?assertMatch([_ | _], ?of_kind(http_will_retry_async, Trace)),
ok
end
),
@@ -711,6 +728,11 @@ t_bridge_probes_header_atoms(Config) ->
ok.
%% helpers
+
+do_send_message(Message) ->
+ Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(?BRIDGE_TYPE),
+ emqx_bridge_v2:send_message(Type, ?BRIDGE_NAME, Message, #{}).
+
do_t_async_retries(TestCase, TestContext, Error, Fn) ->
#{error_attempts := ErrorAttempts} = TestContext,
PTKey = {?MODULE, TestCase, attempts},
diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_v2_SUITE.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_v2_SUITE.erl
new file mode 100644
index 000000000..38d1d5a68
--- /dev/null
+++ b/apps/emqx_bridge_http/test/emqx_bridge_http_v2_SUITE.erl
@@ -0,0 +1,140 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_http_v2_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-import(emqx_mgmt_api_test_util, [request/3]).
+-import(emqx_common_test_helpers, [on_exit/1]).
+-import(emqx_bridge_http_SUITE, [start_http_server/1, stop_http_server/1]).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/asserts.hrl").
+
+-define(BRIDGE_TYPE, <<"http">>).
+-define(BRIDGE_NAME, atom_to_binary(?MODULE)).
+-define(CONNECTOR_NAME, atom_to_binary(?MODULE)).
+
+all() ->
+ emqx_common_test_helpers:all(?MODULE).
+
+groups() ->
+ [].
+
+init_per_suite(Config0) ->
+ Config =
+ case os:getenv("DEBUG_CASE") of
+ [_ | _] = DebugCase ->
+ CaseName = list_to_atom(DebugCase),
+ [{debug_case, CaseName} | Config0];
+ _ ->
+ Config0
+ end,
+ Apps = emqx_cth_suite:start(
+ [
+ emqx,
+ emqx_conf,
+ emqx_connector,
+ emqx_bridge_http,
+ emqx_bridge,
+ emqx_rule_engine
+ ],
+ #{work_dir => emqx_cth_suite:work_dir(Config)}
+ ),
+ emqx_mgmt_api_test_util:init_suite(),
+ [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+ Apps = ?config(apps, Config),
+ emqx_mgmt_api_test_util:end_suite(),
+ ok = emqx_cth_suite:stop(Apps),
+ ok.
+
+suite() ->
+ [{timetrap, {seconds, 60}}].
+
+init_per_testcase(_TestCase, Config) ->
+ Server = start_http_server(#{response_delay_ms => 0}),
+ [{http_server, Server} | Config].
+
+end_per_testcase(_TestCase, Config) ->
+ case ?config(http_server, Config) of
+ undefined -> ok;
+ Server -> stop_http_server(Server)
+ end,
+ emqx_bridge_v2_testlib:delete_all_bridges(),
+ emqx_bridge_v2_testlib:delete_all_connectors(),
+ emqx_common_test_helpers:call_janitor(),
+ ok.
+
+%%--------------------------------------------------------------------
+%% tests
+%%--------------------------------------------------------------------
+
+t_compose_connector_url_and_action_path(Config) ->
+ Path = <<"/foo/bar">>,
+ ConnectorCfg = make_connector_config(Config),
+ ActionCfg = make_action_config([{path, Path} | Config]),
+ CreateConfig = [
+ {bridge_type, ?BRIDGE_TYPE},
+ {bridge_name, ?BRIDGE_NAME},
+ {bridge_config, ActionCfg},
+ {connector_type, ?BRIDGE_TYPE},
+ {connector_name, ?CONNECTOR_NAME},
+ {connector_config, ConnectorCfg}
+ ],
+ {ok, _} = emqx_bridge_v2_testlib:create_bridge(CreateConfig),
+
+ %% assert: the url returned v1 api is composed by the url of the connector and the
+ %% path of the action
+ #{port := Port} = ?config(http_server, Config),
+ ExpectedUrl = iolist_to_binary(io_lib:format("http://localhost:~p/foo/bar", [Port])),
+ {ok, {_, _, [Bridge]}} = emqx_bridge_testlib:list_bridges_api(),
+ ?assertMatch(
+ #{<<"url">> := ExpectedUrl},
+ Bridge
+ ),
+ ok.
+
+%%--------------------------------------------------------------------
+%% helpers
+%%--------------------------------------------------------------------
+
+make_connector_config(Config) ->
+ #{port := Port} = ?config(http_server, Config),
+ #{
+ <<"enable">> => true,
+ <<"url">> => iolist_to_binary(io_lib:format("http://localhost:~p", [Port])),
+ <<"headers">> => #{},
+ <<"pool_type">> => <<"hash">>,
+ <<"pool_size">> => 1
+ }.
+
+make_action_config(Config) ->
+ Path = ?config(path, Config),
+ #{
+ <<"enable">> => true,
+ <<"connector">> => ?CONNECTOR_NAME,
+ <<"parameters">> => #{
+ <<"path">> => Path,
+ <<"method">> => <<"post">>,
+ <<"headers">> => #{},
+ <<"body">> => <<"${.}">>
+ }
+ }.
diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src
index c6236d97c..ef288368d 100644
--- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src
+++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src
@@ -1,6 +1,6 @@
{application, emqx_bridge_influxdb, [
{description, "EMQX Enterprise InfluxDB Bridge"},
- {vsn, "0.1.6"},
+ {vsn, "0.1.7"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src
index da8df2ddc..1d9d5c807 100644
--- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src
+++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_kafka, [
{description, "EMQX Enterprise Kafka Bridge"},
- {vsn, "0.1.12"},
+ {vsn, "0.1.13"},
{registered, [emqx_bridge_kafka_consumer_sup]},
{applications, [
kernel,
diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src
index cbef0dda8..716626bdf 100644
--- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src
+++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_mqtt, [
{description, "EMQX MQTT Broker Bridge"},
- {vsn, "0.1.5"},
+ {vsn, "0.1.6"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src
index 252b8ff00..b1d110d36 100644
--- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src
+++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src
@@ -1,6 +1,6 @@
{application, emqx_bridge_mysql, [
{description, "EMQX Enterprise MySQL Bridge"},
- {vsn, "0.1.2"},
+ {vsn, "0.1.3"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src
index 614747254..fafd49f05 100644
--- a/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src
+++ b/apps/emqx_bridge_pgsql/src/emqx_bridge_pgsql.app.src
@@ -1,6 +1,6 @@
{application, emqx_bridge_pgsql, [
{description, "EMQX Enterprise PostgreSQL Bridge"},
- {vsn, "0.1.4"},
+ {vsn, "0.1.5"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src
index 7e32b5a89..2e1ec3444 100644
--- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src
+++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src
@@ -1,6 +1,6 @@
{application, emqx_bridge_rabbitmq, [
{description, "EMQX Enterprise RabbitMQ Bridge"},
- {vsn, "0.1.6"},
+ {vsn, "0.1.7"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src
index 5b6163969..5b3bb2a2e 100644
--- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src
+++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src
@@ -1,6 +1,6 @@
{application, emqx_bridge_redis, [
{description, "EMQX Enterprise Redis Bridge"},
- {vsn, "0.1.3"},
+ {vsn, "0.1.4"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl
index 5bc330afa..756a5ec30 100644
--- a/apps/emqx_conf/src/emqx_cluster_rpc.erl
+++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl
@@ -66,6 +66,7 @@
-boot_mnesia({mnesia, [boot]}).
-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("emqx_conf.hrl").
-ifdef(TEST).
@@ -384,6 +385,7 @@ catch_up(State) -> catch_up(State, false).
catch_up(#{node := Node, retry_interval := RetryMs, is_leaving := false} = State, SkipResult) ->
case transaction(fun ?MODULE:read_next_mfa/1, [Node]) of
{atomic, caught_up} ->
+ ?tp(cluster_rpc_caught_up, #{}),
?TIMEOUT;
{atomic, {still_lagging, NextId, MFA}} ->
{Succeed, _} = apply_mfa(NextId, MFA, ?APPLY_KIND_REPLICATE),
diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src
index 3856a882c..7f495a3cd 100644
--- a/apps/emqx_conf/src/emqx_conf.app.src
+++ b/apps/emqx_conf/src/emqx_conf.app.src
@@ -1,6 +1,6 @@
{application, emqx_conf, [
{description, "EMQX configuration management"},
- {vsn, "0.1.31"},
+ {vsn, "0.1.32"},
{registered, []},
{mod, {emqx_conf_app, []}},
{applications, [kernel, stdlib, emqx_ctl]},
diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src
index cc78829e7..4dac420d9 100644
--- a/apps/emqx_connector/src/emqx_connector.app.src
+++ b/apps/emqx_connector/src/emqx_connector.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "EMQX Data Integration Connectors"},
- {vsn, "0.1.34"},
+ {vsn, "0.1.35"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index b31fb4f74..f25fe9b7e 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -137,7 +137,7 @@ param_path_id() ->
#{
in => path,
required => true,
- example => <<"webhook:webhook_example">>,
+ example => <<"http:my_http_connector">>,
desc => ?DESC("desc_param_path_id")
}
)}.
@@ -158,17 +158,7 @@ connector_info_array_example(Method) ->
lists:map(fun(#{value := Config}) -> Config end, maps:values(connector_info_examples(Method))).
connector_info_examples(Method) ->
- maps:merge(
- #{},
- emqx_enterprise_connector_examples(Method)
- ).
-
--if(?EMQX_RELEASE_EDITION == ee).
-emqx_enterprise_connector_examples(Method) ->
- emqx_connector_ee_schema:examples(Method).
--else.
-emqx_enterprise_connector_examples(_Method) -> #{}.
--endif.
+ emqx_connector_schema:examples(Method).
schema("/connectors") ->
#{
diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl
index bc648a102..ff2790481 100644
--- a/apps/emqx_connector/src/emqx_connector_resource.erl
+++ b/apps/emqx_connector/src/emqx_connector_resource.erl
@@ -49,6 +49,8 @@
get_channels/2
]).
+-export([parse_url/1]).
+
-callback connector_config(ParsedConfig) ->
ParsedConfig
when
@@ -77,8 +79,10 @@ connector_impl_module(_ConnectorType) ->
-endif.
-connector_to_resource_type_ce(_ConnectorType) ->
- no_bridge_v2_for_c2_so_far.
+connector_to_resource_type_ce(http) ->
+ emqx_bridge_http_connector;
+connector_to_resource_type_ce(ConnectorType) ->
+ error({no_bridge_v2, ConnectorType}).
resource_id(ConnectorId) when is_binary(ConnectorId) ->
<<"connector:", ConnectorId/binary>>.
@@ -271,13 +275,11 @@ remove(Type, Name, _Conf, _Opts) ->
%% convert connector configs to what the connector modules want
parse_confs(
- <<"webhook">>,
+ <<"http">>,
_Name,
#{
url := Url,
- method := Method,
- headers := Headers,
- max_retries := Retry
+ headers := Headers
} = Conf
) ->
Url1 = bin(Url),
@@ -290,20 +292,14 @@ parse_confs(
Reason1 = emqx_utils:readable_error_msg(Reason),
invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
end,
- RequestTTL = emqx_utils_maps:deep_get(
- [resource_opts, request_ttl],
- Conf
- ),
Conf#{
base_url => BaseUrl1,
request =>
#{
path => Path,
- method => Method,
- body => maps:get(body, Conf, undefined),
headers => Headers,
- request_ttl => RequestTTL,
- max_retries => Retry
+ body => undefined,
+ method => undefined
}
};
parse_confs(<<"iotdb">>, Name, Conf) ->
diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
index 389623b0a..ba3392ee3 100644
--- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
+++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl
@@ -15,7 +15,8 @@
-export([
api_schemas/1,
fields/1,
- examples/1
+ %%examples/1
+ schema_modules/0
]).
resource_type(Type) when is_binary(Type) ->
@@ -141,18 +142,6 @@ connector_structs() ->
)}
].
-examples(Method) ->
- MergeFun =
- fun(Example, Examples) ->
- maps:merge(Examples, Example)
- end,
- Fun =
- fun(Module, Examples) ->
- ConnectorExamples = erlang:apply(Module, connector_examples, [Method]),
- lists:foldl(MergeFun, Examples, ConnectorExamples)
- end,
- lists:foldl(Fun, #{}, schema_modules()).
-
schema_modules() ->
[
emqx_bridge_azure_event_hub,
diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl
index d6f8608ae..6075fe265 100644
--- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl
+++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl
@@ -42,6 +42,8 @@
-export([resource_opts_fields/0, resource_opts_fields/1]).
+-export([examples/1]).
+
-if(?EMQX_RELEASE_EDITION == ee).
enterprise_api_schemas(Method) ->
%% We *must* do this to ensure the module is really loaded, especially when we use
@@ -71,9 +73,40 @@ enterprise_fields_connectors() -> [].
-endif.
+api_schemas(Method) ->
+ [
+ %% We need to map the `type' field of a request (binary) to a
+ %% connector schema module.
+ api_ref(emqx_bridge_http_schema, <<"http">>, Method ++ "_connector")
+ ].
+
+api_ref(Module, Type, Method) ->
+ {Type, ref(Module, Method)}.
+
+examples(Method) ->
+ MergeFun =
+ fun(Example, Examples) ->
+ maps:merge(Examples, Example)
+ end,
+ Fun =
+ fun(Module, Examples) ->
+ ConnectorExamples = erlang:apply(Module, connector_examples, [Method]),
+ lists:foldl(MergeFun, Examples, ConnectorExamples)
+ end,
+ lists:foldl(Fun, #{}, schema_modules()).
+
+-if(?EMQX_RELEASE_EDITION == ee).
+schema_modules() ->
+ [emqx_bridge_http_schema] ++ emqx_connector_ee_schema:schema_modules().
+-else.
+schema_modules() ->
+ [emqx_bridge_http_schema].
+-endif.
+
connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer];
connector_type_to_bridge_types(confluent_producer) -> [confluent_producer];
connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer];
+connector_type_to_bridge_types(http) -> [http, webhook];
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
connector_type_to_bridge_types(matrix) -> [matrix];
connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
@@ -311,8 +344,9 @@ post_request() ->
api_schema("post").
api_schema(Method) ->
+ CE = api_schemas(Method),
EE = enterprise_api_schemas(Method),
- hoconsc:union(connector_api_union(EE)).
+ hoconsc:union(connector_api_union(CE ++ EE)).
connector_api_union(Refs) ->
Index = maps:from_list(Refs),
@@ -357,7 +391,17 @@ roots() ->
end.
fields(connectors) ->
- [] ++ enterprise_fields_connectors();
+ [
+ {http,
+ mk(
+ hoconsc:map(name, ref(emqx_bridge_http_schema, "config_connector")),
+ #{
+ alias => [webhook],
+ desc => <<"HTTP Connector Config">>,
+ required => false
+ }
+ )}
+ ] ++ enterprise_fields_connectors();
fields("node_status") ->
[
node_name(),
diff --git a/apps/emqx_dashboard/src/emqx_dashboard.app.src b/apps/emqx_dashboard/src/emqx_dashboard.app.src
index 97691c6cd..9474d868f 100644
--- a/apps/emqx_dashboard/src/emqx_dashboard.app.src
+++ b/apps/emqx_dashboard/src/emqx_dashboard.app.src
@@ -2,7 +2,7 @@
{application, emqx_dashboard, [
{description, "EMQX Web Dashboard"},
% strict semver, bump manually!
- {vsn, "5.0.30"},
+ {vsn, "5.0.31"},
{modules, []},
{registered, [emqx_dashboard_sup]},
{applications, [
diff --git a/apps/emqx_dashboard_rbac/src/emqx_dashboard_rbac.app.src b/apps/emqx_dashboard_rbac/src/emqx_dashboard_rbac.app.src
index ec8e6cd3f..acc5e6cbd 100644
--- a/apps/emqx_dashboard_rbac/src/emqx_dashboard_rbac.app.src
+++ b/apps/emqx_dashboard_rbac/src/emqx_dashboard_rbac.app.src
@@ -1,6 +1,6 @@
{application, emqx_dashboard_rbac, [
{description, "EMQX Dashboard RBAC"},
- {vsn, "0.1.1"},
+ {vsn, "0.1.2"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src
index 71788947b..19f3bf552 100644
--- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src
+++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso.app.src
@@ -1,6 +1,6 @@
{application, emqx_dashboard_sso, [
{description, "EMQX Dashboard Single Sign-On"},
- {vsn, "0.1.2"},
+ {vsn, "0.1.3"},
{registered, [emqx_dashboard_sso_sup]},
{applications, [
kernel,
diff --git a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src
index 2bce4ff8e..8d868bc75 100644
--- a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src
+++ b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src
@@ -2,7 +2,7 @@
{application, emqx_durable_storage, [
{description, "Message persistence and subscription replays for EMQX"},
% strict semver, bump manually!
- {vsn, "0.1.7"},
+ {vsn, "0.1.8"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, rocksdb, gproc, mria, emqx_utils]},
diff --git a/apps/emqx_enterprise/src/emqx_enterprise.app.src b/apps/emqx_enterprise/src/emqx_enterprise.app.src
index 06bc500f4..d7bcb1fd5 100644
--- a/apps/emqx_enterprise/src/emqx_enterprise.app.src
+++ b/apps/emqx_enterprise/src/emqx_enterprise.app.src
@@ -1,6 +1,6 @@
{application, emqx_enterprise, [
{description, "EMQX Enterprise Edition"},
- {vsn, "0.1.5"},
+ {vsn, "0.1.6"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_gateway/src/emqx_gateway.app.src b/apps/emqx_gateway/src/emqx_gateway.app.src
index df681b00f..81a2e65ed 100644
--- a/apps/emqx_gateway/src/emqx_gateway.app.src
+++ b/apps/emqx_gateway/src/emqx_gateway.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway, [
{description, "The Gateway management application"},
- {vsn, "0.1.27"},
+ {vsn, "0.1.28"},
{registered, []},
{mod, {emqx_gateway_app, []}},
{applications, [kernel, stdlib, emqx, emqx_auth, emqx_ctl]},
diff --git a/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src b/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src
index 97a6e04a1..f4ab5bd24 100644
--- a/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src
+++ b/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_lwm2m, [
{description, "LwM2M Gateway"},
- {vsn, "0.1.4"},
+ {vsn, "0.1.5"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap]},
{env, []},
diff --git a/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src b/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src
index 6913b2c5f..08214aee2 100644
--- a/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src
+++ b/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway_stomp, [
{description, "Stomp Gateway"},
- {vsn, "0.1.4"},
+ {vsn, "0.1.5"},
{registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []},
diff --git a/apps/emqx_ldap/src/emqx_ldap.app.src b/apps/emqx_ldap/src/emqx_ldap.app.src
index 774f11bd4..546c9975c 100644
--- a/apps/emqx_ldap/src/emqx_ldap.app.src
+++ b/apps/emqx_ldap/src/emqx_ldap.app.src
@@ -1,6 +1,6 @@
{application, emqx_ldap, [
{description, "EMQX LDAP Connector"},
- {vsn, "0.1.5"},
+ {vsn, "0.1.6"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_license/src/emqx_license.app.src b/apps/emqx_license/src/emqx_license.app.src
index eb639d164..8d11c6522 100644
--- a/apps/emqx_license/src/emqx_license.app.src
+++ b/apps/emqx_license/src/emqx_license.app.src
@@ -1,6 +1,6 @@
{application, emqx_license, [
{description, "EMQX License"},
- {vsn, "5.0.13"},
+ {vsn, "5.0.14"},
{modules, []},
{registered, [emqx_license_sup]},
{applications, [kernel, stdlib, emqx_ctl]},
diff --git a/apps/emqx_license/src/emqx_license_schema.erl b/apps/emqx_license/src/emqx_license_schema.erl
index 4d62f9be4..aeced0ab0 100644
--- a/apps/emqx_license/src/emqx_license_schema.erl
+++ b/apps/emqx_license/src/emqx_license_schema.erl
@@ -72,10 +72,16 @@ check_license_watermark(Conf) ->
undefined ->
true;
Low ->
- High = hocon_maps:get("license.connection_high_watermark", Conf),
- case High =/= undefined andalso High > Low of
- true -> true;
- false -> {bad_license_watermark, #{high => High, low => Low}}
+ case hocon_maps:get("license.connection_high_watermark", Conf) of
+ undefined ->
+ {bad_license_watermark, #{high => undefined, low => Low}};
+ High ->
+ {ok, HighFloat} = emqx_schema:to_percent(High),
+ {ok, LowFloat} = emqx_schema:to_percent(Low),
+ case HighFloat > LowFloat of
+ true -> true;
+ false -> {bad_license_watermark, #{high => High, low => Low}}
+ end
end
end.
diff --git a/apps/emqx_license/test/emqx_license_http_api_SUITE.erl b/apps/emqx_license/test/emqx_license_http_api_SUITE.erl
index 3aa54feef..ad16d75c9 100644
--- a/apps/emqx_license/test/emqx_license_http_api_SUITE.erl
+++ b/apps/emqx_license/test/emqx_license_http_api_SUITE.erl
@@ -204,6 +204,17 @@ t_license_setting(_Config) ->
?assertEqual(0.5, emqx_config:get([license, connection_low_watermark])),
?assertEqual(0.55, emqx_config:get([license, connection_high_watermark])),
+ %% update
+ Low1 = <<"50%">>,
+ High1 = <<"100%">>,
+ UpdateRes1 = request(put, uri(["license", "setting"]), #{
+ <<"connection_low_watermark">> => Low1,
+ <<"connection_high_watermark">> => High1
+ }),
+ validate_setting(UpdateRes1, Low1, High1),
+ ?assertEqual(0.5, emqx_config:get([license, connection_low_watermark])),
+ ?assertEqual(1.0, emqx_config:get([license, connection_high_watermark])),
+
%% update bad setting low >= high
?assertMatch(
{ok, 400, _},
diff --git a/apps/emqx_machine/src/emqx_machine.app.src b/apps/emqx_machine/src/emqx_machine.app.src
index 496afcd64..6d7012313 100644
--- a/apps/emqx_machine/src/emqx_machine.app.src
+++ b/apps/emqx_machine/src/emqx_machine.app.src
@@ -3,7 +3,7 @@
{id, "emqx_machine"},
{description, "The EMQX Machine"},
% strict semver, bump manually!
- {vsn, "0.2.16"},
+ {vsn, "0.2.17"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, emqx_ctl]},
diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src
index efa05ad37..f9deaf819 100644
--- a/apps/emqx_management/src/emqx_management.app.src
+++ b/apps/emqx_management/src/emqx_management.app.src
@@ -2,7 +2,7 @@
{application, emqx_management, [
{description, "EMQX Management API and CLI"},
% strict semver, bump manually!
- {vsn, "5.0.33"},
+ {vsn, "5.0.34"},
{modules, []},
{registered, [emqx_management_sup]},
{applications, [kernel, stdlib, emqx_plugins, minirest, emqx, emqx_ctl, emqx_bridge_http]},
diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl
index c384b55e8..53d1fe589 100644
--- a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl
+++ b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl
@@ -464,6 +464,7 @@ apps_to_start() ->
emqx_modules,
emqx_gateway,
emqx_exhook,
+ emqx_bridge_http,
emqx_bridge,
emqx_auto_subscribe,
diff --git a/apps/emqx_modules/src/emqx_modules.app.src b/apps/emqx_modules/src/emqx_modules.app.src
index e986a3fe1..377644cdf 100644
--- a/apps/emqx_modules/src/emqx_modules.app.src
+++ b/apps/emqx_modules/src/emqx_modules.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_modules, [
{description, "EMQX Modules"},
- {vsn, "5.0.23"},
+ {vsn, "5.0.24"},
{modules, []},
{applications, [kernel, stdlib, emqx, emqx_ctl]},
{mod, {emqx_modules_app, []}},
diff --git a/apps/emqx_mongodb/src/emqx_mongodb.app.src b/apps/emqx_mongodb/src/emqx_mongodb.app.src
index 2212ac7d4..8279da934 100644
--- a/apps/emqx_mongodb/src/emqx_mongodb.app.src
+++ b/apps/emqx_mongodb/src/emqx_mongodb.app.src
@@ -1,6 +1,6 @@
{application, emqx_mongodb, [
{description, "EMQX MongoDB Connector"},
- {vsn, "0.1.3"},
+ {vsn, "0.1.4"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_mysql/src/emqx_mysql.app.src b/apps/emqx_mysql/src/emqx_mysql.app.src
index 135f6878e..e9f7f6f98 100644
--- a/apps/emqx_mysql/src/emqx_mysql.app.src
+++ b/apps/emqx_mysql/src/emqx_mysql.app.src
@@ -1,6 +1,6 @@
{application, emqx_mysql, [
{description, "EMQX MySQL Database Connector"},
- {vsn, "0.1.4"},
+ {vsn, "0.1.5"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_plugins/src/emqx_plugins.app.src b/apps/emqx_plugins/src/emqx_plugins.app.src
index 963d1ec39..b26836475 100644
--- a/apps/emqx_plugins/src/emqx_plugins.app.src
+++ b/apps/emqx_plugins/src/emqx_plugins.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_plugins, [
{description, "EMQX Plugin Management"},
- {vsn, "0.1.7"},
+ {vsn, "0.1.8"},
{modules, []},
{mod, {emqx_plugins_app, []}},
{applications, [kernel, stdlib, emqx]},
diff --git a/apps/emqx_postgresql/src/emqx_postgresql.app.src b/apps/emqx_postgresql/src/emqx_postgresql.app.src
index efe422cd0..9c31b49c6 100644
--- a/apps/emqx_postgresql/src/emqx_postgresql.app.src
+++ b/apps/emqx_postgresql/src/emqx_postgresql.app.src
@@ -1,6 +1,6 @@
{application, emqx_postgresql, [
{description, "EMQX PostgreSQL Database Connector"},
- {vsn, "0.1.0"},
+ {vsn, "0.1.1"},
{registered, []},
{applications, [
kernel,
diff --git a/apps/emqx_prometheus/src/emqx_prometheus.app.src b/apps/emqx_prometheus/src/emqx_prometheus.app.src
index 4631fec8b..599e20fb7 100644
--- a/apps/emqx_prometheus/src/emqx_prometheus.app.src
+++ b/apps/emqx_prometheus/src/emqx_prometheus.app.src
@@ -2,7 +2,7 @@
{application, emqx_prometheus, [
{description, "Prometheus for EMQX"},
% strict semver, bump manually!
- {vsn, "5.0.17"},
+ {vsn, "5.0.18"},
{modules, []},
{registered, [emqx_prometheus_sup]},
{applications, [kernel, stdlib, prometheus, emqx, emqx_management]},
diff --git a/apps/emqx_redis/src/emqx_redis.app.src b/apps/emqx_redis/src/emqx_redis.app.src
index c9513bcf9..1f8c5fbc3 100644
--- a/apps/emqx_redis/src/emqx_redis.app.src
+++ b/apps/emqx_redis/src/emqx_redis.app.src
@@ -1,10 +1,11 @@
{application, emqx_redis, [
{description, "EMQX Redis Database Connector"},
- {vsn, "0.1.3"},
+ {vsn, "0.1.4"},
{registered, []},
{applications, [
kernel,
stdlib,
+ eredis,
eredis_cluster,
emqx_connector,
emqx_resource
diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src
index 9edd03078..6649d0ef2 100644
--- a/apps/emqx_resource/src/emqx_resource.app.src
+++ b/apps/emqx_resource/src/emqx_resource.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_resource, [
{description, "Manager for all external resources"},
- {vsn, "0.1.25"},
+ {vsn, "0.1.26"},
{registered, []},
{mod, {emqx_resource_app, []}},
{applications, [
diff --git a/apps/emqx_resource/test/emqx_resource_schema_tests.erl b/apps/emqx_resource/test/emqx_resource_schema_tests.erl
index 51575cfe7..b6cda8e97 100644
--- a/apps/emqx_resource/test/emqx_resource_schema_tests.erl
+++ b/apps/emqx_resource/test/emqx_resource_schema_tests.erl
@@ -80,7 +80,7 @@ worker_pool_size_test_() ->
Conf = emqx_utils_maps:deep_put(
[
<<"bridges">>,
- <<"webhook">>,
+ <<"http">>,
<<"simple">>,
<<"resource_opts">>,
<<"worker_pool_size">>
@@ -88,7 +88,7 @@ worker_pool_size_test_() ->
BaseConf,
WorkerPoolSize
),
- #{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := CheckedConf}}} = check(Conf),
+ #{<<"bridges">> := #{<<"http">> := #{<<"simple">> := CheckedConf}}} = check(Conf),
#{<<"resource_opts">> := #{<<"worker_pool_size">> := WPS}} = CheckedConf,
WPS
end,
@@ -117,7 +117,7 @@ worker_pool_size_test_() ->
%%===========================================================================
parse_and_check_webhook_bridge(Hocon) ->
- #{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)),
+ #{<<"bridges">> := #{<<"http">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)),
Conf.
parse(Hocon) ->
diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src
index cab070826..6c0def7ae 100644
--- a/apps/emqx_retainer/src/emqx_retainer.app.src
+++ b/apps/emqx_retainer/src/emqx_retainer.app.src
@@ -2,7 +2,7 @@
{application, emqx_retainer, [
{description, "EMQX Retainer"},
% strict semver, bump manually!
- {vsn, "5.0.18"},
+ {vsn, "5.0.19"},
{modules, []},
{registered, [emqx_retainer_sup]},
{applications, [kernel, stdlib, emqx, emqx_ctl]},
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src
index 7feacee77..b2e3b6c02 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src
@@ -2,7 +2,7 @@
{application, emqx_rule_engine, [
{description, "EMQX Rule Engine"},
% strict semver, bump manually!
- {vsn, "5.0.29"},
+ {vsn, "5.0.30"},
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl
index aadd3d4f5..afa57dfac 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl
@@ -583,10 +583,18 @@ get_referenced_hookpoints(Froms) ->
].
get_egress_bridges(Actions) ->
- [
- emqx_bridge_resource:bridge_id(BridgeType, BridgeName)
- || {bridge, BridgeType, BridgeName, _ResId} <- Actions
- ].
+ lists:foldr(
+ fun
+ ({bridge, BridgeType, BridgeName, _ResId}, Acc) ->
+ [emqx_bridge_resource:bridge_id(BridgeType, BridgeName) | Acc];
+ ({bridge_v2, BridgeType, BridgeName}, Acc) ->
+ [emqx_bridge_resource:bridge_id(BridgeType, BridgeName) | Acc];
+ (_, Acc) ->
+ Acc
+ end,
+ [],
+ Actions
+ ).
%% For allowing an external application to add extra "built-in" functions to the
%% rule engine SQL like language. The module set by
diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
index 14682eff1..f3df46b80 100644
--- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
+++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
@@ -3468,7 +3468,7 @@ t_get_basic_usage_info_1(_Config) ->
referenced_bridges =>
#{
mqtt => 1,
- webhook => 3
+ http => 3
}
},
emqx_rule_engine:get_basic_usage_info()
diff --git a/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl b/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl
index f1da349eb..8f47e8290 100644
--- a/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl
+++ b/apps/emqx_telemetry/test/emqx_telemetry_SUITE.erl
@@ -41,44 +41,32 @@ suite() ->
apps() ->
[
emqx_conf,
- emqx_management,
+ emqx_connector,
emqx_retainer,
emqx_auth,
emqx_auth_redis,
emqx_auth_mnesia,
emqx_auth_postgresql,
emqx_modules,
- emqx_telemetry
+ emqx_telemetry,
+ emqx_bridge_http,
+ emqx_bridge,
+ emqx_rule_engine,
+ emqx_management
].
init_per_suite(Config) ->
- net_kernel:start(['master@127.0.0.1', longnames]),
- ok = meck:new(emqx_authz_file, [non_strict, passthrough, no_history, no_link]),
- meck:expect(
- emqx_authz_file,
- acl_conf_file,
- fun() ->
- emqx_common_test_helpers:deps_path(emqx_auth, "etc/acl.conf")
- end
- ),
- ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
- emqx_gateway_test_utils:load_all_gateway_apps(),
- start_apps(),
- Config.
+ WorkDir = ?config(priv_dir, Config),
+ Apps = emqx_cth_suite:start(apps(), #{work_dir => WorkDir}),
+ emqx_mgmt_api_test_util:init_suite(),
+ [{apps, Apps}, {work_dir, WorkDir} | Config].
-end_per_suite(_Config) ->
- {ok, _} = emqx:update_config(
- [authorization],
- #{
- <<"no_match">> => <<"allow">>,
- <<"cache">> => #{<<"enable">> => <<"true">>},
- <<"sources">> => []
- }
- ),
+end_per_suite(Config) ->
mnesia:clear_table(cluster_rpc_commit),
mnesia:clear_table(cluster_rpc_mfa),
- stop_apps(),
- meck:unload(emqx_authz_file),
+ Apps = ?config(apps, Config),
+ emqx_mgmt_api_test_util:end_suite(),
+ ok = emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(t_get_telemetry_without_memsup, Config) ->
@@ -123,7 +111,6 @@ init_per_testcase(t_advanced_mqtt_features, Config) ->
mock_advanced_mqtt_features(),
Config;
init_per_testcase(t_authn_authz_info, Config) ->
- mock_httpc(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
create_authn('mqtt:global', built_in_database),
create_authn('tcp:default', redis),
@@ -141,14 +128,11 @@ init_per_testcase(t_send_after_enable, Config) ->
mock_httpc(),
Config;
init_per_testcase(t_rule_engine_and_data_bridge_info, Config) ->
- mock_httpc(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
- emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_bridge]),
ok = emqx_bridge_SUITE:setup_fake_telemetry_data(),
ok = setup_fake_rule_engine_data(),
Config;
init_per_testcase(t_exhook_info, Config) ->
- mock_httpc(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
ExhookConf =
#{
@@ -173,31 +157,8 @@ init_per_testcase(t_cluster_uuid, Config) ->
Node = start_slave(n1),
[{n1, Node} | Config];
init_per_testcase(t_uuid_restored_from_file, Config) ->
- mock_httpc(),
- NodeUUID = <<"AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE">>,
- ClusterUUID = <<"FFFFFFFF-GGGG-HHHH-IIII-JJJJJJJJJJJJ">>,
- DataDir = emqx:data_dir(),
- NodeUUIDFile = filename:join(DataDir, "node.uuid"),
- ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
- file:delete(NodeUUIDFile),
- file:delete(ClusterUUIDFile),
- ok = file:write_file(NodeUUIDFile, NodeUUID),
- ok = file:write_file(ClusterUUIDFile, ClusterUUID),
-
- %% clear the UUIDs in the DB
- {atomic, ok} = mria:clear_table(emqx_telemetry),
- stop_apps(),
- ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
- start_apps(),
- Node = start_slave(n1),
- [
- {n1, Node},
- {node_uuid, NodeUUID},
- {cluster_uuid, ClusterUUID}
- | Config
- ];
+ Config;
init_per_testcase(t_uuid_saved_to_file, Config) ->
- mock_httpc(),
DataDir = emqx:data_dir(),
NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
@@ -205,7 +166,6 @@ init_per_testcase(t_uuid_saved_to_file, Config) ->
file:delete(ClusterUUIDFile),
Config;
init_per_testcase(t_num_clients, Config) ->
- mock_httpc(),
ok = snabbkaffe:start_trace(),
Config;
init_per_testcase(_Testcase, Config) ->
@@ -227,7 +187,6 @@ end_per_testcase(t_advanced_mqtt_features, _Config) ->
{atomic, ok} = mria:clear_table(emqx_delayed),
ok;
end_per_testcase(t_authn_authz_info, _Config) ->
- meck:unload([httpc]),
emqx_authz:update({delete, postgresql}, #{}),
lists:foreach(
fun(ChainName) ->
@@ -244,19 +203,8 @@ end_per_testcase(t_enable, _Config) ->
end_per_testcase(t_send_after_enable, _Config) ->
meck:unload([httpc, emqx_telemetry_config]);
end_per_testcase(t_rule_engine_and_data_bridge_info, _Config) ->
- meck:unload(httpc),
- lists:foreach(
- fun(App) ->
- ok = application:stop(App)
- end,
- [
- emqx_bridge,
- emqx_rule_engine
- ]
- ),
ok;
end_per_testcase(t_exhook_info, _Config) ->
- meck:unload(httpc),
emqx_exhook_demo_svr:stop(),
application:stop(emqx_exhook),
ok;
@@ -264,21 +212,12 @@ end_per_testcase(t_cluster_uuid, Config) ->
Node = proplists:get_value(n1, Config),
ok = stop_slave(Node);
end_per_testcase(t_num_clients, Config) ->
- meck:unload([httpc]),
ok = snabbkaffe:stop(),
Config;
-end_per_testcase(t_uuid_restored_from_file, Config) ->
- Node = ?config(n1, Config),
- DataDir = emqx:data_dir(),
- NodeUUIDFile = filename:join(DataDir, "node.uuid"),
- ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
- ok = file:delete(NodeUUIDFile),
- ok = file:delete(ClusterUUIDFile),
- meck:unload([httpc]),
- ok = stop_slave(Node),
- ok;
end_per_testcase(_Testcase, _Config) ->
- meck:unload([httpc]),
+ case catch meck:unload([httpc]) of
+ _ -> ok
+ end,
ok.
%%------------------------------------------------------------------------------
@@ -315,19 +254,34 @@ t_cluster_uuid(Config) ->
%% should attempt read UUID from file in data dir to keep UUIDs
%% unique, in the event of a database purge.
t_uuid_restored_from_file(Config) ->
- ExpectedNodeUUID = ?config(node_uuid, Config),
- ExpectedClusterUUID = ?config(cluster_uuid, Config),
+ %% Stop the emqx_telemetry application first
+ {atomic, ok} = mria:clear_table(emqx_telemetry),
+ application:stop(emqx_telemetry),
+
+ %% Rewrite the the uuid files
+ NodeUUID = <<"AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE">>,
+ ClusterUUID = <<"FFFFFFFF-GGGG-HHHH-IIII-JJJJJJJJJJJJ">>,
+ DataDir = ?config(work_dir, Config),
+ NodeUUIDFile = filename:join(DataDir, "node.uuid"),
+ ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
+ ok = file:write_file(NodeUUIDFile, NodeUUID),
+ ok = file:write_file(ClusterUUIDFile, ClusterUUID),
+
+ %% Start the emqx_telemetry application again
+ application:start(emqx_telemetry),
+
+ %% Check the UUIDs
?assertEqual(
- {ok, ExpectedNodeUUID},
+ {ok, NodeUUID},
emqx_telemetry:get_node_uuid()
),
?assertEqual(
- {ok, ExpectedClusterUUID},
+ {ok, ClusterUUID},
emqx_telemetry:get_cluster_uuid()
),
ok.
-t_uuid_saved_to_file(_Config) ->
+t_uuid_saved_to_file(Config) ->
DataDir = emqx:data_dir(),
NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
@@ -337,9 +291,10 @@ t_uuid_saved_to_file(_Config) ->
%% clear the UUIDs in the DB
{atomic, ok} = mria:clear_table(emqx_telemetry),
- stop_apps(),
- ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
- start_apps(),
+ application:stop(emqx_telemetry),
+
+ application:start(emqx_telemetry),
+
{ok, NodeUUID} = emqx_telemetry:get_node_uuid(),
{ok, ClusterUUID} = emqx_telemetry:get_cluster_uuid(),
?assertEqual(
@@ -578,6 +533,7 @@ t_mqtt_runtime_insights(_) ->
t_rule_engine_and_data_bridge_info(_Config) ->
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
+ ct:pal("telemetry data: ~p~n", [TelemetryData]),
RuleInfo = get_value(rule_engine, TelemetryData),
BridgeInfo = get_value(bridge, TelemetryData),
?assertEqual(
@@ -588,7 +544,7 @@ t_rule_engine_and_data_bridge_info(_Config) ->
#{
data_bridge =>
#{
- webhook => #{num => 1, num_linked_by_rules => 3},
+ http => #{num => 1, num_linked_by_rules => 3},
mqtt => #{num => 2, num_linked_by_rules => 2}
},
num_data_bridges => 3
@@ -811,14 +767,6 @@ setup_fake_rule_engine_data() ->
),
ok.
-set_special_configs(emqx_auth) ->
- {ok, _} = emqx:update_config([authorization, cache, enable], false),
- {ok, _} = emqx:update_config([authorization, no_match], deny),
- {ok, _} = emqx:update_config([authorization, sources], []),
- ok;
-set_special_configs(_App) ->
- ok.
-
%% for some unknown reason, gen_rpc running locally or in CI might
%% start with different `port_discovery' modes, which means that'll
%% either be listening at the port in the config (`tcp_server_port',
@@ -887,9 +835,3 @@ leave_cluster() ->
is_official_version(V) ->
emqx_telemetry_config:is_official_version(V).
-
-start_apps() ->
- emqx_common_test_helpers:start_apps(apps(), fun set_special_configs/1).
-
-stop_apps() ->
- emqx_common_test_helpers:stop_apps(lists:reverse(apps())).
diff --git a/apps/emqx_utils/src/emqx_utils.app.src b/apps/emqx_utils/src/emqx_utils.app.src
index a86a8d841..c666d2069 100644
--- a/apps/emqx_utils/src/emqx_utils.app.src
+++ b/apps/emqx_utils/src/emqx_utils.app.src
@@ -2,7 +2,7 @@
{application, emqx_utils, [
{description, "Miscellaneous utilities for EMQX apps"},
% strict semver, bump manually!
- {vsn, "5.0.11"},
+ {vsn, "5.0.12"},
{modules, [
emqx_utils,
emqx_utils_api,
diff --git a/changes/ce/fix-12044.en.md b/changes/ce/fix-12044.en.md
new file mode 100644
index 000000000..89f114215
--- /dev/null
+++ b/changes/ce/fix-12044.en.md
@@ -0,0 +1 @@
+Fix Redis authorization, authentication, and bridges. Previously connections to Redis servers could not be established because driver was not properly loaded.
diff --git a/changes/e5.3.2.en.md b/changes/e5.3.2.en.md
new file mode 100644
index 000000000..07a924b38
--- /dev/null
+++ b/changes/e5.3.2.en.md
@@ -0,0 +1,42 @@
+# e5.3.2
+
+## Enhancements
+
+- [#11752](https://github.com/emqx/emqx/pull/11752) Changed default RPC driver from `gen_rpc` to `rpc` for core-replica database synchronization.
+
+ This improves core-replica data replication latency.
+
+- [#11785](https://github.com/emqx/emqx/pull/11785) Allowed users with the "Viewer" role to change their own passwords. However, those with the "Viewer" role do not have permission to change the passwords of other users.
+
+- [#11787](https://github.com/emqx/emqx/pull/11787) Improved the performance of the `emqx` command.
+
+- [#11790](https://github.com/emqx/emqx/pull/11790) Added validation to Redis commands in Redis authorization source.
+ Additionally, this improvement refines the parsing of Redis commands during authentication and authorization processes. The parsing now aligns with `redis-cli` compatibility standards and supports quoted arguments.
+
+- [#11541](https://github.com/emqx/emqx/pull/11541) Enhanced file transfer capabilities. Now, clients can use an asynchronous method for file transfer by sending commands to the `$file-async/...` topic and subscribing to command execution results on the `$file-response/{clientId}` topic. This improvement simplifies the use of the file transfer feature, particularly suitable for clients using MQTT v3.1/v3.1.1 or those employing MQTT bridging. For more details, please refer to [EIP-0021](https://github.com/emqx/eip).
+
+## Bug Fixes
+
+- [#11757](https://github.com/emqx/emqx/pull/11757) Fixed the error response code when downloading non-existent trace files. Now the response returns `404` instead of `500`.
+
+- [#11762](https://github.com/emqx/emqx/pull/11762) Fixed an issue in EMQX's `built_in_database` authorization source. With this update, all Access Control List (ACL) records are completely removed when an authorization source is deleted. This resolves the issue of residual records remaining in the database when re-creating authorization sources.
+
+- [#11771](https://github.com/emqx/emqx/pull/11771) Fixed validation of Bcrypt salt rounds in authentication management through the API/Dashboard.
+
+- [#11780](https://github.com/emqx/emqx/pull/11780) Fixed validation of the `iterations` field of the `pbkdf2` password hashing algorithm. Now, `iterations` must be strictly positive. Previously, it could be set to 0, which led to a nonfunctional authenticator.
+
+- [#11791](https://github.com/emqx/emqx/pull/11791) Fixed an issue in the EMQX CoAP Gateway where heartbeats were not effectively maintaining the connection's active status. This fix ensures that the heartbeat mechanism properly sustains the liveliness of CoAP Gateway connections.
+
+- [#11797](https://github.com/emqx/emqx/pull/11797) Modified HTTP API behavior for APIs managing the `built_in_database` authorization source. They will now return a `404` status code if `built_in_database` is not set as the authorization source, replacing the former `20X` response.
+
+- [#11965](https://github.com/emqx/emqx/pull/11965) Improved the termination of EMQX services to ensure a graceful stop even in the presence of an unavailable MongoDB resource.
+
+- [#11975](https://github.com/emqx/emqx/pull/11975) This fix addresses an issue where redundant error logs were generated due to a race condition during simultaneous socket closure by a peer and the server. Previously, concurrent socket close events triggered by the operating system and EMQX resulted in unnecessary error logging. The implemented fix improves event handling to eliminate unnecessary error messages.
+
+- [#11987](https://github.com/emqx/emqx/pull/11987) Fixed a bug where attempting to set the `active_n` option on a TCP/SSL socket could lead to a connection crash.
+
+ The problem occurred if the socket had already been closed by the time the connection process attempted to apply the `active_n` setting, resulting in a `case_clause` crash.
+
+- [#11731](https://github.com/emqx/emqx/pull/11731) Added hot configuration support for the file transfer feature.
+
+- [#11754](https://github.com/emqx/emqx/pull/11754) Improved the log formatting specifically for the Postgres bridge in EMQX. It addresses issues related to Unicode characters in error messages returned by the driver.
diff --git a/changes/v5.3.2.en.md b/changes/v5.3.2.en.md
new file mode 100644
index 000000000..dc8f7bdcc
--- /dev/null
+++ b/changes/v5.3.2.en.md
@@ -0,0 +1,38 @@
+# v5.3.2
+
+## Enhancements
+
+- [#11725](https://github.com/emqx/emqx/pull/11725) Introduced the LDAP as a new authentication and authorization backend.
+
+- [#11752](https://github.com/emqx/emqx/pull/11752) Changed default RPC driver from `gen_rpc` to `rpc` for core-replica database synchronization.
+
+ This improves core-replica data replication latency.
+
+- [#11785](https://github.com/emqx/emqx/pull/11785) Allowed users with the "Viewer" role to change their own passwords. However, those with the "Viewer" role do not have permission to change the passwords of other users.
+
+- [#11787](https://github.com/emqx/emqx/pull/11787) Improved the performance of the `emqx` command.
+
+- [#11790](https://github.com/emqx/emqx/pull/11790) Added validation to Redis commands in Redis authorization source.
+ Additionally, this improvement refines the parsing of Redis commands during authentication and authorization processes. The parsing now aligns with `redis-cli` compatibility standards and supports quoted arguments.
+
+## Bug Fixes
+
+- [#11757](https://github.com/emqx/emqx/pull/11757) Fixed the error response code when downloading non-existent trace files. Now the response returns `404` instead of `500`.
+
+- [#11762](https://github.com/emqx/emqx/pull/11762) Fixed an issue in EMQX's `built_in_database` authorization source. With this update, all Access Control List (ACL) records are completely removed when an authorization source is deleted. This resolves the issue of residual records remaining in the database when re-creating authorization sources.
+
+- [#11771](https://github.com/emqx/emqx/pull/11771) Fixed validation of Bcrypt salt rounds in authentication management through the API/Dashboard.
+
+- [#11780](https://github.com/emqx/emqx/pull/11780) Fixed validation of the `iterations` field of the `pbkdf2` password hashing algorithm. Now, `iterations` must be strictly positive. Previously, it could be set to 0, which led to a nonfunctional authenticator.
+
+- [#11791](https://github.com/emqx/emqx/pull/11791) Fixed an issue in the EMQX CoAP Gateway where heartbeats were not effectively maintaining the connection's active status. This fix ensures that the heartbeat mechanism properly sustains the liveliness of CoAP Gateway connections.
+
+- [#11797](https://github.com/emqx/emqx/pull/11797) Modified HTTP API behavior for APIs managing the `built_in_database` authorization source. They will now return a `404` status code if `built_in_database` is not set as the authorization source, replacing the former `20X` response.
+
+- [#11965](https://github.com/emqx/emqx/pull/11965) Improved the termination of EMQX services to ensure a graceful stop even in the presence of an unavailable MongoDB resource.
+
+- [#11975](https://github.com/emqx/emqx/pull/11975) This fix addresses an issue where redundant error logs were generated due to a race condition during simultaneous socket closure by a peer and the server. Previously, concurrent socket close events triggered by the operating system and EMQX resulted in unnecessary error logging. The implemented fix improves event handling to eliminate unnecessary error messages.
+
+- [#11987](https://github.com/emqx/emqx/pull/11987) Fixed a bug where attempting to set the `active_n` option on a TCP/SSL socket could lead to a connection crash.
+
+ The problem occurred if the socket had already been closed by the time the connection process attempted to apply the `active_n` setting, resulting in a `case_clause` crash.
diff --git a/deploy/charts/emqx-enterprise/Chart.yaml b/deploy/charts/emqx-enterprise/Chart.yaml
index aed38cd63..652b2bcf5 100644
--- a/deploy/charts/emqx-enterprise/Chart.yaml
+++ b/deploy/charts/emqx-enterprise/Chart.yaml
@@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
-version: 5.3.2-alpha.1
+version: 5.3.2
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
-appVersion: 5.3.2-alpha.1
+appVersion: 5.3.2
diff --git a/rel/i18n/emqx_bridge_http_schema.hocon b/rel/i18n/emqx_bridge_http_schema.hocon
index b7b715db1..416f77834 100644
--- a/rel/i18n/emqx_bridge_http_schema.hocon
+++ b/rel/i18n/emqx_bridge_http_schema.hocon
@@ -18,10 +18,10 @@ config_direction.desc:
config_direction.label:
"""Bridge Direction"""
-config_enable.desc:
+config_enable_bridge.desc:
"""Enable or disable this bridge"""
-config_enable.label:
+config_enable_bridge.label:
"""Enable Or Disable Bridge"""
config_headers.desc:
@@ -71,6 +71,21 @@ is not allowed."""
config_url.label:
"""HTTP Bridge"""
+config_path.desc:
+"""The URL path for this Action.
+This path will be appended to the Connector's url
configuration to form the full
+URL address.
+Template with variables is allowed in this option. For example, /room/{$room_no}
"""
+
+config_path.label:
+"""URL Path"""
+
+config_parameters_opts.desc:
+"""The parameters for HTTP action."""
+
+config_parameters_opts.label:
+"""Parameters"""
+
desc_config.desc:
"""Configuration for an HTTP bridge."""