Merge pull request #12058 from id/1129-sync-r53

sync release-53 to master
This commit is contained in:
Ivan Dyachkov 2023-11-30 20:42:43 +01:00 committed by GitHub
commit 17985b6016
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
80 changed files with 1330 additions and 469 deletions

View File

@ -20,7 +20,14 @@ jobs:
upload: upload:
runs-on: ubuntu-22.04 runs-on: ubuntu-22.04
permissions: permissions:
contents: write
checks: write
packages: write packages: write
actions: read
issues: read
pull-requests: read
repository-projects: read
statuses: read
strategy: strategy:
fail-fast: false fail-fast: false
steps: steps:
@ -45,11 +52,13 @@ jobs:
v*) v*)
echo "profile=emqx" >> $GITHUB_OUTPUT echo "profile=emqx" >> $GITHUB_OUTPUT
echo "version=$(./pkg-vsn.sh emqx)" >> $GITHUB_OUTPUT echo "version=$(./pkg-vsn.sh emqx)" >> $GITHUB_OUTPUT
echo "ref_name=v$(./pkg-vsn.sh emqx)" >> "$GITHUB_ENV"
echo "s3dir=emqx-ce" >> $GITHUB_OUTPUT echo "s3dir=emqx-ce" >> $GITHUB_OUTPUT
;; ;;
e*) e*)
echo "profile=emqx-enterprise" >> $GITHUB_OUTPUT echo "profile=emqx-enterprise" >> $GITHUB_OUTPUT
echo "version=$(./pkg-vsn.sh emqx-enterprise)" >> $GITHUB_OUTPUT echo "version=$(./pkg-vsn.sh emqx-enterprise)" >> $GITHUB_OUTPUT
echo "ref_name=e$(./pkg-vsn.sh emqx-enterprise)" >> "$GITHUB_ENV"
echo "s3dir=emqx-ee" >> $GITHUB_OUTPUT echo "s3dir=emqx-ee" >> $GITHUB_OUTPUT
;; ;;
esac esac
@ -57,14 +66,15 @@ jobs:
run: | run: |
BUCKET=${{ secrets.AWS_S3_BUCKET }} BUCKET=${{ secrets.AWS_S3_BUCKET }}
OUTPUT_DIR=${{ steps.profile.outputs.s3dir }} OUTPUT_DIR=${{ steps.profile.outputs.s3dir }}
aws s3 cp --recursive s3://$BUCKET/$OUTPUT_DIR/${{ github.ref_name }} packages aws s3 cp --recursive s3://$BUCKET/$OUTPUT_DIR/${{ env.ref_name }} packages
- uses: alexellis/upload-assets@0.4.0 - uses: emqx/upload-assets@8d2083b4dbe3151b0b735572eaa153b6acb647fe # 0.5.0
env: env:
GITHUB_TOKEN: ${{ github.token }} GITHUB_TOKEN: ${{ github.token }}
with: with:
asset_paths: '["packages/*"]' asset_paths: '["packages/*"]'
tag_name: "${{ env.ref_name }}"
- name: update to emqx.io - name: update to emqx.io
if: startsWith(github.ref_name, 'v') && ((github.event_name == 'release' && !github.event.prerelease) || inputs.publish_release_artefacts) if: startsWith(env.ref_name, 'v') && ((github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artefacts)
run: | run: |
set -eux set -eux
curl -w %{http_code} \ curl -w %{http_code} \
@ -72,10 +82,10 @@ jobs:
-H "Content-Type: application/json" \ -H "Content-Type: application/json" \
-H "token: ${{ secrets.EMQX_IO_TOKEN }}" \ -H "token: ${{ secrets.EMQX_IO_TOKEN }}" \
-X POST \ -X POST \
-d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ github.ref_name }}\" }" \ -d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ env.ref_name }}\" }" \
${{ secrets.EMQX_IO_RELEASE_API }} ${{ secrets.EMQX_IO_RELEASE_API }}
- name: Push to packagecloud.io - name: Push to packagecloud.io
if: (github.event_name == 'release' && !github.event.prerelease) || inputs.publish_release_artefacts if: (github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artefacts
env: env:
PROFILE: ${{ steps.profile.outputs.profile }} PROFILE: ${{ steps.profile.outputs.profile }}
VERSION: ${{ steps.profile.outputs.version }} VERSION: ${{ steps.profile.outputs.version }}

View File

@ -43,7 +43,7 @@ jobs:
;; ;;
esac esac
- uses: emqx/push-helm-action@v1.1 - uses: emqx/push-helm-action@v1.1
if: github.event_name == 'release' && !github.event.prerelease if: github.event_name == 'release' && !github.event.release.prerelease
with: with:
charts_dir: "${{ github.workspace }}/deploy/charts/${{ steps.profile.outputs.profile }}" charts_dir: "${{ github.workspace }}/deploy/charts/${{ steps.profile.outputs.profile }}"
version: ${{ steps.profile.outputs.version }} version: ${{ steps.profile.outputs.version }}

View File

@ -20,8 +20,8 @@ endif
# Dashboard version # Dashboard version
# from https://github.com/emqx/emqx-dashboard5 # from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.5.1 export EMQX_DASHBOARD_VERSION ?= v1.5.2
export EMQX_EE_DASHBOARD_VERSION ?= e1.3.2-beta.1 export EMQX_EE_DASHBOARD_VERSION ?= e1.3.2
PROFILE ?= emqx PROFILE ?= emqx
REL_PROFILES := emqx emqx-enterprise REL_PROFILES := emqx emqx-enterprise

View File

@ -35,7 +35,7 @@
-define(EMQX_RELEASE_CE, "5.3.2"). -define(EMQX_RELEASE_CE, "5.3.2").
%% Enterprise edition %% Enterprise edition
-define(EMQX_RELEASE_EE, "5.3.2-alpha.1"). -define(EMQX_RELEASE_EE, "5.3.2").
%% The HTTP API version %% The HTTP API version
-define(EMQX_API_VERSION, "5.0"). -define(EMQX_API_VERSION, "5.0").

View File

@ -2,7 +2,7 @@
{application, emqx, [ {application, emqx, [
{id, "emqx"}, {id, "emqx"},
{description, "EMQX Core"}, {description, "EMQX Core"},
{vsn, "5.1.14"}, {vsn, "5.1.15"},
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [ {applications, [

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_auth_ldap, [ {application, emqx_auth_ldap, [
{description, "EMQX LDAP Authentication and Authorization"}, {description, "EMQX LDAP Authentication and Authorization"},
{vsn, "0.1.1"}, {vsn, "0.1.2"},
{registered, []}, {registered, []},
{mod, {emqx_auth_ldap_app, []}}, {mod, {emqx_auth_ldap_app, []}},
{applications, [ {applications, [

View File

@ -89,7 +89,7 @@ hard_coded_action_info_modules_ee() ->
-endif. -endif.
hard_coded_action_info_modules_common() -> hard_coded_action_info_modules_common() ->
[]. [emqx_bridge_http_action_info].
hard_coded_action_info_modules() -> hard_coded_action_info_modules() ->
hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee(). hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee().

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge, [ {application, emqx_bridge, [
{description, "EMQX bridges"}, {description, "EMQX bridges"},
{vsn, "0.1.30"}, {vsn, "0.1.31"},
{registered, [emqx_bridge_sup]}, {registered, [emqx_bridge_sup]},
{mod, {emqx_bridge_app, []}}, {mod, {emqx_bridge_app, []}},
{applications, [ {applications, [

View File

@ -364,7 +364,7 @@ get_metrics(Type, Name) ->
maybe_upgrade(mqtt, Config) -> maybe_upgrade(mqtt, Config) ->
emqx_bridge_compatible_config:maybe_upgrade(Config); emqx_bridge_compatible_config:maybe_upgrade(Config);
maybe_upgrade(webhook, Config) -> maybe_upgrade(webhook, Config) ->
emqx_bridge_compatible_config:webhook_maybe_upgrade(Config); emqx_bridge_compatible_config:http_maybe_upgrade(Config);
maybe_upgrade(_Other, Config) -> maybe_upgrade(_Other, Config) ->
Config. Config.

View File

@ -143,7 +143,7 @@ param_path_id() ->
#{ #{
in => path, in => path,
required => true, required => true,
example => <<"webhook:webhook_example">>, example => <<"http:http_example">>,
desc => ?DESC("desc_param_path_id") desc => ?DESC("desc_param_path_id")
} }
)}. )}.
@ -166,9 +166,9 @@ bridge_info_array_example(Method) ->
bridge_info_examples(Method) -> bridge_info_examples(Method) ->
maps:merge( maps:merge(
#{ #{
<<"webhook_example">> => #{ <<"http_example">> => #{
summary => <<"WebHook">>, summary => <<"HTTP">>,
value => info_example(webhook, Method) value => info_example(http, Method)
}, },
<<"mqtt_example">> => #{ <<"mqtt_example">> => #{
summary => <<"MQTT Bridge">>, summary => <<"MQTT Bridge">>,
@ -201,7 +201,7 @@ method_example(Type, Method) when Method == get; Method == post ->
method_example(_Type, put) -> method_example(_Type, put) ->
#{}. #{}.
info_example_basic(webhook) -> info_example_basic(http) ->
#{ #{
enable => true, enable => true,
url => <<"http://localhost:9901/messages/${topic}">>, url => <<"http://localhost:9901/messages/${topic}">>,
@ -212,7 +212,7 @@ info_example_basic(webhook) ->
pool_size => 4, pool_size => 4,
enable_pipelining => 100, enable_pipelining => 100,
ssl => #{enable => false}, ssl => #{enable => false},
local_topic => <<"emqx_webhook/#">>, local_topic => <<"emqx_http/#">>,
method => post, method => post,
body => <<"${payload}">>, body => <<"${payload}">>,
resource_opts => #{ resource_opts => #{
@ -650,7 +650,8 @@ create_or_update_bridge(BridgeType0, BridgeName, Conf, HttpStatusCode) ->
get_metrics_from_local_node(BridgeType0, BridgeName) -> get_metrics_from_local_node(BridgeType0, BridgeName) ->
BridgeType = upgrade_type(BridgeType0), BridgeType = upgrade_type(BridgeType0),
format_metrics(emqx_bridge:get_metrics(BridgeType, BridgeName)). MetricsResult = emqx_bridge:get_metrics(BridgeType, BridgeName),
format_metrics(MetricsResult).
'/bridges/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) -> '/bridges/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
?TRY_PARSE_ID( ?TRY_PARSE_ID(

View File

@ -63,18 +63,23 @@
). ).
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(BridgeType) when is_binary(BridgeType) ->
bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(binary_to_existing_atom(BridgeType, utf8));
bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector; bridge_to_resource_type(mqtt) ->
bridge_to_resource_type(webhook) -> emqx_bridge_http_connector; emqx_bridge_mqtt_connector;
bridge_to_resource_type(BridgeType) -> emqx_bridge_enterprise:resource_type(BridgeType). bridge_to_resource_type(webhook) ->
emqx_bridge_http_connector;
bridge_to_resource_type(BridgeType) ->
emqx_bridge_enterprise:resource_type(BridgeType).
bridge_impl_module(BridgeType) -> emqx_bridge_enterprise:bridge_impl_module(BridgeType). bridge_impl_module(BridgeType) -> emqx_bridge_enterprise:bridge_impl_module(BridgeType).
-else. -else.
bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(BridgeType) when is_binary(BridgeType) ->
bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(binary_to_existing_atom(BridgeType, utf8));
bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector; bridge_to_resource_type(mqtt) ->
bridge_to_resource_type(webhook) -> emqx_bridge_http_connector. emqx_bridge_mqtt_connector;
bridge_to_resource_type(webhook) ->
emqx_bridge_http_connector.
bridge_impl_module(_BridgeType) -> undefined. bridge_impl_module(_BridgeType) -> undefined.
-endif. -endif.
@ -309,6 +314,7 @@ remove(Type, Name, _Conf, _Opts) ->
emqx_resource:remove_local(resource_id(Type, Name)). emqx_resource:remove_local(resource_id(Type, Name)).
%% convert bridge configs to what the connector modules want %% convert bridge configs to what the connector modules want
%% TODO: remove it, if the http_bridge already ported to v2
parse_confs( parse_confs(
<<"webhook">>, <<"webhook">>,
_Name, _Name,

View File

@ -1188,7 +1188,7 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
%% If the bridge v2 does not exist, it is a valid bridge v1 %% If the bridge v2 does not exist, it is a valid bridge v1
PreviousRawConf = undefined, PreviousRawConf = undefined,
split_bridge_v1_config_and_create_helper( split_bridge_v1_config_and_create_helper(
BridgeV1Type, BridgeName, RawConf, PreviousRawConf BridgeV1Type, BridgeName, RawConf, PreviousRawConf, fun() -> ok end
); );
_Conf -> _Conf ->
case ?MODULE:bridge_v1_is_valid(BridgeV1Type, BridgeName) of case ?MODULE:bridge_v1_is_valid(BridgeV1Type, BridgeName) of
@ -1198,9 +1198,13 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
PreviousRawConf = emqx:get_raw_config( PreviousRawConf = emqx:get_raw_config(
[?ROOT_KEY, BridgeV2Type, BridgeName], undefined [?ROOT_KEY, BridgeV2Type, BridgeName], undefined
), ),
bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps), %% To avoid losing configurations. We have to make sure that no crash occurs
%% during deletion and creation of configurations.
PreCreateFun = fun() ->
bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps)
end,
split_bridge_v1_config_and_create_helper( split_bridge_v1_config_and_create_helper(
BridgeV1Type, BridgeName, RawConf, PreviousRawConf BridgeV1Type, BridgeName, RawConf, PreviousRawConf, PreCreateFun
); );
false -> false ->
%% If the bridge v2 exists, it is not a valid bridge v1 %% If the bridge v2 exists, it is not a valid bridge v1
@ -1208,7 +1212,10 @@ bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
end end
end. end.
split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf, PreviousRawConf) -> split_bridge_v1_config_and_create_helper(
BridgeV1Type, BridgeName, RawConf, PreviousRawConf, PreCreateFun
) ->
try
#{ #{
connector_type := ConnectorType, connector_type := ConnectorType,
connector_name := NewConnectorName, connector_name := NewConnectorName,
@ -1216,8 +1223,38 @@ split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf, Prev
bridge_v2_type := BridgeType, bridge_v2_type := BridgeType,
bridge_v2_name := BridgeName, bridge_v2_name := BridgeName,
bridge_v2_conf := NewBridgeV2RawConf bridge_v2_conf := NewBridgeV2RawConf
} = } = split_and_validate_bridge_v1_config(
split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousRawConf), BridgeV1Type,
BridgeName,
RawConf,
PreviousRawConf
),
_ = PreCreateFun(),
do_connector_and_bridge_create(
ConnectorType,
NewConnectorName,
NewConnectorRawConf,
BridgeType,
BridgeName,
NewBridgeV2RawConf,
RawConf
)
catch
throw:Reason ->
{error, Reason}
end.
do_connector_and_bridge_create(
ConnectorType,
NewConnectorName,
NewConnectorRawConf,
BridgeType,
BridgeName,
NewBridgeV2RawConf,
RawConf
) ->
case emqx_connector:create(ConnectorType, NewConnectorName, NewConnectorRawConf) of case emqx_connector:create(ConnectorType, NewConnectorName, NewConnectorRawConf) of
{ok, _} -> {ok, _} ->
case create(BridgeType, BridgeName, NewBridgeV2RawConf) of case create(BridgeType, BridgeName, NewBridgeV2RawConf) of
@ -1335,6 +1372,7 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
RawConf = maps:without([<<"name">>], RawConfig0), RawConf = maps:without([<<"name">>], RawConfig0),
TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
PreviousRawConf = undefined, PreviousRawConf = undefined,
try
#{ #{
connector_type := _ConnectorType, connector_type := _ConnectorType,
connector_name := _NewConnectorName, connector_name := _NewConnectorName,
@ -1343,7 +1381,11 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
bridge_v2_name := _BridgeName, bridge_v2_name := _BridgeName,
bridge_v2_conf := BridgeV2RawConf bridge_v2_conf := BridgeV2RawConf
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf), } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf). create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf)
catch
throw:Reason ->
{error, Reason}
end.
%% Only called by test cases (may create broken references) %% Only called by test cases (may create broken references)
bridge_v1_remove(BridgeV1Type, BridgeName) -> bridge_v1_remove(BridgeV1Type, BridgeName) ->

View File

@ -117,7 +117,7 @@ param_path_id() ->
#{ #{
in => path, in => path,
required => true, required => true,
example => <<"webhook:webhook_example">>, example => <<"http:my_http_action">>,
desc => ?DESC("desc_param_path_id") desc => ?DESC("desc_param_path_id")
} }
)}. )}.

View File

@ -21,7 +21,7 @@
-export([ -export([
upgrade_pre_ee/2, upgrade_pre_ee/2,
maybe_upgrade/1, maybe_upgrade/1,
webhook_maybe_upgrade/1 http_maybe_upgrade/1
]). ]).
upgrade_pre_ee(undefined, _UpgradeFunc) -> upgrade_pre_ee(undefined, _UpgradeFunc) ->
@ -40,10 +40,10 @@ maybe_upgrade(#{<<"connector">> := _} = Config0) ->
maybe_upgrade(NewVersion) -> maybe_upgrade(NewVersion) ->
NewVersion. NewVersion.
webhook_maybe_upgrade(#{<<"direction">> := _} = Config0) -> http_maybe_upgrade(#{<<"direction">> := _} = Config0) ->
Config1 = maps:remove(<<"direction">>, Config0), Config1 = maps:remove(<<"direction">>, Config0),
Config1#{<<"resource_opts">> => default_resource_opts()}; Config1#{<<"resource_opts">> => default_resource_opts()};
webhook_maybe_upgrade(NewVersion) -> http_maybe_upgrade(NewVersion) ->
NewVersion. NewVersion.
binary_key({K, V}) -> binary_key({K, V}) ->

View File

@ -162,13 +162,14 @@ roots() -> [{bridges, ?HOCON(?R_REF(bridges), #{importance => ?IMPORTANCE_LOW})}
fields(bridges) -> fields(bridges) ->
[ [
{webhook, {http,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_http_schema, "config")), hoconsc:map(name, ref(emqx_bridge_http_schema, "config")),
#{ #{
aliases => [webhook],
desc => ?DESC("bridges_webhook"), desc => ?DESC("bridges_webhook"),
required => false, required => false,
converter => fun webhook_bridge_converter/2 converter => fun http_bridge_converter/2
} }
)}, )},
{mqtt, {mqtt,
@ -243,7 +244,7 @@ status() ->
node_name() -> node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
webhook_bridge_converter(Conf0, _HoconOpts) -> http_bridge_converter(Conf0, _HoconOpts) ->
emqx_bridge_compatible_config:upgrade_pre_ee( emqx_bridge_compatible_config:upgrade_pre_ee(
Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 Conf0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1
). ).

View File

@ -30,14 +30,18 @@ init_per_suite(Config) ->
[ [
emqx, emqx,
emqx_conf, emqx_conf,
emqx_connector,
emqx_bridge_http,
emqx_bridge emqx_bridge
], ],
#{work_dir => ?config(priv_dir, Config)} #{work_dir => ?config(priv_dir, Config)}
), ),
emqx_mgmt_api_test_util:init_suite(),
[{apps, Apps} | Config]. [{apps, Apps} | Config].
end_per_suite(Config) -> end_per_suite(Config) ->
Apps = ?config(apps, Config), Apps = ?config(apps, Config),
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_cth_suite:stop(Apps), ok = emqx_cth_suite:stop(Apps),
ok. ok.
@ -58,6 +62,7 @@ end_per_testcase(t_get_basic_usage_info_1, _Config) ->
ok = emqx_bridge:remove(BridgeType, BridgeName) ok = emqx_bridge:remove(BridgeType, BridgeName)
end, end,
[ [
%% Keep using the old bridge names to avoid breaking the tests
{webhook, <<"basic_usage_info_webhook">>}, {webhook, <<"basic_usage_info_webhook">>},
{webhook, <<"basic_usage_info_webhook_disabled">>}, {webhook, <<"basic_usage_info_webhook_disabled">>},
{mqtt, <<"basic_usage_info_mqtt">>} {mqtt, <<"basic_usage_info_mqtt">>}
@ -88,7 +93,7 @@ t_get_basic_usage_info_1(_Config) ->
#{ #{
num_bridges => 3, num_bridges => 3,
count_by_type => #{ count_by_type => #{
webhook => 1, http => 1,
mqtt => 2 mqtt => 2
} }
}, },
@ -119,40 +124,33 @@ setup_fake_telemetry_data() ->
HTTPConfig = #{ HTTPConfig = #{
url => <<"http://localhost:9901/messages/${topic}">>, url => <<"http://localhost:9901/messages/${topic}">>,
enable => true, enable => true,
local_topic => "emqx_webhook/#", local_topic => "emqx_http/#",
method => post, method => post,
body => <<"${payload}">>, body => <<"${payload}">>,
headers => #{}, headers => #{},
request_timeout => "15s" request_timeout => "15s"
}, },
Conf = %% Keep use the old bridge names to test the backward compatibility
#{ {ok, _} = emqx_bridge_testlib:create_bridge_api(
<<"bridges">> => <<"webhook">>,
#{ <<"basic_usage_info_webhook">>,
<<"webhook">> => HTTPConfig
#{ ),
<<"basic_usage_info_webhook">> => HTTPConfig, {ok, _} = emqx_bridge_testlib:create_bridge_api(
<<"basic_usage_info_webhook_disabled">> => <<"webhook">>,
<<"basic_usage_info_webhook_disabled">>,
HTTPConfig#{enable => false} HTTPConfig#{enable => false}
}, ),
<<"mqtt">> => {ok, _} = emqx_bridge_testlib:create_bridge_api(
#{ <<"mqtt">>,
<<"basic_usage_info_mqtt">> => MQTTConfig1, <<"basic_usage_info_mqtt">>,
<<"basic_usage_info_mqtt_from_select">> => MQTTConfig2 MQTTConfig1
} ),
} {ok, _} = emqx_bridge_testlib:create_bridge_api(
}, <<"mqtt">>,
ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf), <<"basic_usage_info_mqtt_from_select">>,
MQTTConfig2
ok = snabbkaffe:start_trace(), ),
Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_loaded end,
NEvents = 3,
BackInTime = 0,
Timeout = 11_000,
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, NEvents, Timeout, BackInTime),
ok = emqx_bridge:load(),
{ok, _} = snabbkaffe_collector:receive_events(Sub),
ok = snabbkaffe:stop(),
ok. ok.
t_update_ssl_conf(Config) -> t_update_ssl_conf(Config) ->

View File

@ -73,13 +73,15 @@
-define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)). -define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)).
-define(APPSPECS, [ -define(APPSPECS, [
emqx_conf,
emqx, emqx,
emqx_conf,
emqx_auth, emqx_auth,
emqx_auth_mnesia, emqx_auth_mnesia,
emqx_management, emqx_management,
{emqx_rule_engine, "rule_engine { rules {} }"}, emqx_connector,
{emqx_bridge, "bridges {}"} emqx_bridge_http,
{emqx_bridge, "actions {}\n bridges {}"},
{emqx_rule_engine, "rule_engine { rules {} }"}
]). ]).
-define(APPSPEC_DASHBOARD, -define(APPSPEC_DASHBOARD,
@ -108,7 +110,7 @@ groups() ->
]. ].
suite() -> suite() ->
[{timetrap, {seconds, 60}}]. [{timetrap, {seconds, 120}}].
init_per_suite(Config) -> init_per_suite(Config) ->
Config. Config.
@ -117,10 +119,10 @@ end_per_suite(_Config) ->
ok. ok.
init_per_group(cluster = Name, Config) -> init_per_group(cluster = Name, Config) ->
Nodes = [NodePrimary | _] = mk_cluster(Config), Nodes = [NodePrimary | _] = mk_cluster(Name, Config),
init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
init_per_group(cluster_later_join = Name, Config) -> init_per_group(cluster_later_join = Name, Config) ->
Nodes = [NodePrimary | _] = mk_cluster(Config, #{join_to => undefined}), Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}),
init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
init_per_group(_Name, Config) -> init_per_group(_Name, Config) ->
WorkDir = emqx_cth_suite:work_dir(Config), WorkDir = emqx_cth_suite:work_dir(Config),
@ -132,10 +134,10 @@ init_api(Config) ->
{ok, App} = erpc:call(APINode, emqx_common_test_http, create_default_app, []), {ok, App} = erpc:call(APINode, emqx_common_test_http, create_default_app, []),
[{api, App} | Config]. [{api, App} | Config].
mk_cluster(Config) -> mk_cluster(Name, Config) ->
mk_cluster(Config, #{}). mk_cluster(Name, Config, #{}).
mk_cluster(Config, Opts) -> mk_cluster(Name, Config, Opts) ->
Node1Apps = ?APPSPECS ++ [?APPSPEC_DASHBOARD], Node1Apps = ?APPSPECS ++ [?APPSPEC_DASHBOARD],
Node2Apps = ?APPSPECS, Node2Apps = ?APPSPECS,
emqx_cth_cluster:start( emqx_cth_cluster:start(
@ -143,7 +145,7 @@ mk_cluster(Config, Opts) ->
{emqx_bridge_api_SUITE1, Opts#{role => core, apps => Node1Apps}}, {emqx_bridge_api_SUITE1, Opts#{role => core, apps => Node1Apps}},
{emqx_bridge_api_SUITE2, Opts#{role => core, apps => Node2Apps}} {emqx_bridge_api_SUITE2, Opts#{role => core, apps => Node2Apps}}
], ],
#{work_dir => emqx_cth_suite:work_dir(Config)} #{work_dir => emqx_cth_suite:work_dir(Name, Config)}
). ).
end_per_group(Group, Config) when end_per_group(Group, Config) when
@ -159,7 +161,7 @@ init_per_testcase(t_broken_bpapi_vsn, Config) ->
meck:new(emqx_bpapi, [passthrough]), meck:new(emqx_bpapi, [passthrough]),
meck:expect(emqx_bpapi, supported_version, 1, -1), meck:expect(emqx_bpapi, supported_version, 1, -1),
meck:expect(emqx_bpapi, supported_version, 2, -1), meck:expect(emqx_bpapi, supported_version, 2, -1),
init_per_testcase(commong, Config); init_per_testcase(common, Config);
init_per_testcase(t_old_bpapi_vsn, Config) -> init_per_testcase(t_old_bpapi_vsn, Config) ->
meck:new(emqx_bpapi, [passthrough]), meck:new(emqx_bpapi, [passthrough]),
meck:expect(emqx_bpapi, supported_version, 1, 1), meck:expect(emqx_bpapi, supported_version, 1, 1),
@ -185,6 +187,18 @@ end_per_testcase(_, Config) ->
ok. ok.
clear_resources() -> clear_resources() ->
lists:foreach(
fun(#{type := Type, name := Name}) ->
ok = emqx_bridge_v2:remove(Type, Name)
end,
emqx_bridge_v2:list()
),
lists:foreach(
fun(#{type := Type, name := Name}) ->
ok = emqx_connector:remove(Type, Name)
end,
emqx_connector:list()
),
lists:foreach( lists:foreach(
fun(#{type := Type, name := Name}) -> fun(#{type := Type, name := Name}) ->
ok = emqx_bridge:remove(Type, Name) ok = emqx_bridge:remove(Type, Name)
@ -407,10 +421,7 @@ t_http_crud_apis(Config) ->
Config Config
), ),
?assertMatch( ?assertMatch(
#{ #{<<"reason">> := <<"required_field">>},
<<"reason">> := <<"unknown_fields">>,
<<"unknown">> := <<"curl">>
},
json(maps:get(<<"message">>, PutFail2)) json(maps:get(<<"message">>, PutFail2))
), ),
{ok, 400, _} = request_json( {ok, 400, _} = request_json(
@ -419,12 +430,16 @@ t_http_crud_apis(Config) ->
?HTTP_BRIDGE(<<"localhost:1234/foo">>, Name), ?HTTP_BRIDGE(<<"localhost:1234/foo">>, Name),
Config Config
), ),
{ok, 400, _} = request_json( {ok, 400, PutFail3} = request_json(
put, put,
uri(["bridges", BridgeID]), uri(["bridges", BridgeID]),
?HTTP_BRIDGE(<<"htpp://localhost:12341234/foo">>, Name), ?HTTP_BRIDGE(<<"htpp://localhost:12341234/foo">>, Name),
Config Config
), ),
?assertMatch(
#{<<"kind">> := <<"validation_error">>},
json(maps:get(<<"message">>, PutFail3))
),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
@ -463,7 +478,7 @@ t_http_crud_apis(Config) ->
), ),
%% Create non working bridge %% Create non working bridge
BrokenURL = ?URL(Port + 1, "/foo"), BrokenURL = ?URL(Port + 1, "foo"),
{ok, 201, BrokenBridge} = request( {ok, 201, BrokenBridge} = request(
post, post,
uri(["bridges"]), uri(["bridges"]),
@ -471,6 +486,7 @@ t_http_crud_apis(Config) ->
fun json/1, fun json/1,
Config Config
), ),
?assertMatch( ?assertMatch(
#{ #{
<<"type">> := ?BRIDGE_TYPE_HTTP, <<"type">> := ?BRIDGE_TYPE_HTTP,
@ -1307,7 +1323,9 @@ t_cluster_later_join_metrics(Config) ->
Name = ?BRIDGE_NAME, Name = ?BRIDGE_NAME,
BridgeParams = ?HTTP_BRIDGE(URL1, Name), BridgeParams = ?HTTP_BRIDGE(URL1, Name),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
?check_trace( ?check_trace(
#{timetrap => 15_000},
begin begin
%% Create a bridge on only one of the nodes. %% Create a bridge on only one of the nodes.
?assertMatch({ok, 201, _}, request_json(post, uri(["bridges"]), BridgeParams, Config)), ?assertMatch({ok, 201, _}, request_json(post, uri(["bridges"]), BridgeParams, Config)),
@ -1319,8 +1337,26 @@ t_cluster_later_join_metrics(Config) ->
}}, }},
request_json(get, uri(["bridges", BridgeID, "metrics"]), Config) request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
), ),
ct:print("node joining cluster"),
%% Now join the other node join with the api node. %% Now join the other node join with the api node.
ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]), ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]),
%% Hack / workaround for the fact that `emqx_machine_boot' doesn't restart the
%% applications, in particular `emqx_conf' doesn't restart and synchronize the
%% transaction id. It's also unclear at the moment why the equivalent test in
%% `emqx_bridge_v2_api_SUITE' doesn't need this hack.
ok = erpc:call(OtherNode, application, stop, [emqx_conf]),
ok = erpc:call(OtherNode, application, start, [emqx_conf]),
ct:print("node joined cluster"),
%% assert: wait for the bridge to be ready on the other node.
{_, {ok, _}} =
?wait_async_action(
{emqx_cluster_rpc, OtherNode} ! wake_up,
#{?snk_kind := cluster_rpc_caught_up, ?snk_meta := #{node := OtherNode}},
10_000
),
%% Check metrics; shouldn't crash even if the bridge is not %% Check metrics; shouldn't crash even if the bridge is not
%% ready on the node that just joined the cluster. %% ready on the node that just joined the cluster.
?assertMatch( ?assertMatch(
@ -1373,17 +1409,16 @@ t_create_with_bad_name(Config) ->
validate_resource_request_ttl(single, Timeout, Name) -> validate_resource_request_ttl(single, Timeout, Name) ->
SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000}, SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), _BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name),
?check_trace( ?check_trace(
begin begin
{ok, Res} = {ok, Res} =
?wait_async_action( ?wait_async_action(
emqx_bridge:send_message(BridgeID, SentData), do_send_message(?BRIDGE_TYPE_HTTP, Name, SentData),
#{?snk_kind := async_query}, #{?snk_kind := async_query},
1000 1000
), ),
?assertMatch({ok, #{id := ResId, query_opts := #{timeout := Timeout}}}, Res) ?assertMatch({ok, #{id := _ResId, query_opts := #{timeout := Timeout}}}, Res)
end, end,
fun(Trace0) -> fun(Trace0) ->
Trace = ?of_kind(async_query, Trace0), Trace = ?of_kind(async_query, Trace0),
@ -1394,6 +1429,10 @@ validate_resource_request_ttl(single, Timeout, Name) ->
validate_resource_request_ttl(_Cluster, _Timeout, _Name) -> validate_resource_request_ttl(_Cluster, _Timeout, _Name) ->
ignore. ignore.
do_send_message(BridgeV1Type, Name, Message) ->
Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
emqx_bridge_v2:send_message(Type, Name, Message, #{}).
%% %%
request(Method, URL, Config) -> request(Method, URL, Config) ->

View File

@ -21,7 +21,7 @@ empty_config_test() ->
Conf1 = #{<<"bridges">> => #{}}, Conf1 = #{<<"bridges">> => #{}},
Conf2 = #{<<"bridges">> => #{<<"webhook">> => #{}}}, Conf2 = #{<<"bridges">> => #{<<"webhook">> => #{}}},
?assertEqual(Conf1, check(Conf1)), ?assertEqual(Conf1, check(Conf1)),
?assertEqual(Conf2, check(Conf2)), ?assertEqual(#{<<"bridges">> => #{<<"http">> => #{}}}, check(Conf2)),
ok. ok.
%% ensure webhook config can be checked %% ensure webhook config can be checked
@ -33,7 +33,7 @@ webhook_config_test() ->
?assertMatch( ?assertMatch(
#{ #{
<<"bridges">> := #{ <<"bridges">> := #{
<<"webhook">> := #{ <<"http">> := #{
<<"the_name">> := <<"the_name">> :=
#{ #{
<<"method">> := get, <<"method">> := get,
@ -48,7 +48,7 @@ webhook_config_test() ->
?assertMatch( ?assertMatch(
#{ #{
<<"bridges">> := #{ <<"bridges">> := #{
<<"webhook">> := #{ <<"http">> := #{
<<"the_name">> := <<"the_name">> :=
#{ #{
<<"method">> := get, <<"method">> := get,
@ -61,7 +61,7 @@ webhook_config_test() ->
), ),
#{ #{
<<"bridges">> := #{ <<"bridges">> := #{
<<"webhook">> := #{ <<"http">> := #{
<<"the_name">> := <<"the_name">> :=
#{ #{
<<"method">> := get, <<"method">> := get,
@ -84,7 +84,7 @@ up(#{<<"mqtt">> := MqttBridges0} = Bridges) ->
Bridges#{<<"mqtt">> := MqttBridges}; Bridges#{<<"mqtt">> := MqttBridges};
up(#{<<"webhook">> := WebhookBridges0} = Bridges) -> up(#{<<"webhook">> := WebhookBridges0} = Bridges) ->
WebhookBridges = emqx_bridge_compatible_config:upgrade_pre_ee( WebhookBridges = emqx_bridge_compatible_config:upgrade_pre_ee(
WebhookBridges0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 WebhookBridges0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1
), ),
Bridges#{<<"webhook">> := WebhookBridges}. Bridges#{<<"webhook">> := WebhookBridges}.

View File

@ -92,7 +92,7 @@ end_per_testcase(_Testcase, Config) ->
delete_all_bridges() -> delete_all_bridges() ->
lists:foreach( lists:foreach(
fun(#{name := Name, type := Type}) -> fun(#{name := Name, type := Type}) ->
emqx_bridge:remove(Type, Name) ok = emqx_bridge:remove(Type, Name)
end, end,
emqx_bridge:list() emqx_bridge:list()
). ).

View File

@ -185,7 +185,7 @@ mk_cluster(Name, Config, Opts) ->
{emqx_bridge_v2_api_SUITE_1, Opts#{role => core, apps => Node1Apps}}, {emqx_bridge_v2_api_SUITE_1, Opts#{role => core, apps => Node1Apps}},
{emqx_bridge_v2_api_SUITE_2, Opts#{role => core, apps => Node2Apps}} {emqx_bridge_v2_api_SUITE_2, Opts#{role => core, apps => Node2Apps}}
], ],
#{work_dir => filename:join(?config(priv_dir, Config), Name)} #{work_dir => emqx_cth_suite:work_dir(Name, Config)}
). ).
end_per_group(Group, Config) when end_per_group(Group, Config) when

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_azure_event_hub, [ {application, emqx_bridge_azure_event_hub, [
{description, "EMQX Enterprise Azure Event Hub Bridge"}, {description, "EMQX Enterprise Azure Event Hub Bridge"},
{vsn, "0.1.4"}, {vsn, "0.1.5"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [ {application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"}, {description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.1.10"}, {vsn, "0.1.11"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -237,7 +237,10 @@ handle_continue(?patch_subscription, State0) ->
), ),
{noreply, State0}; {noreply, State0};
error -> error ->
%% retry %% retry; add a random delay for the case where multiple workers step on each
%% other's toes before retrying.
RandomMS = rand:uniform(500),
timer:sleep(RandomMS),
{noreply, State0, {continue, ?patch_subscription}} {noreply, State0, {continue, ?patch_subscription}}
end. end.
@ -478,7 +481,6 @@ do_pull_async(State0) ->
Body = body(State0, pull), Body = body(State0, pull),
PreparedRequest = {prepared_request, {Method, Path, Body}}, PreparedRequest = {prepared_request, {Method, Path, Body}},
ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, InstanceId]}, ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, InstanceId]},
%% `ehttpc_pool'/`gproc_pool' might return `false' if there are no workers...
Res = emqx_bridge_gcp_pubsub_client:query_async( Res = emqx_bridge_gcp_pubsub_client:query_async(
PreparedRequest, PreparedRequest,
ReplyFunAndArgs, ReplyFunAndArgs,

View File

@ -196,7 +196,7 @@ consumer_config(TestCase, Config) ->
" connect_timeout = \"5s\"\n" " connect_timeout = \"5s\"\n"
" service_account_json = ~s\n" " service_account_json = ~s\n"
" consumer {\n" " consumer {\n"
" ack_deadline = \"60s\"\n" " ack_deadline = \"10s\"\n"
" ack_retry_interval = \"1s\"\n" " ack_retry_interval = \"1s\"\n"
" pull_max_messages = 10\n" " pull_max_messages = 10\n"
" consumer_workers_per_topic = 1\n" " consumer_workers_per_topic = 1\n"
@ -208,7 +208,7 @@ consumer_config(TestCase, Config) ->
" resource_opts {\n" " resource_opts {\n"
" health_check_interval = \"1s\"\n" " health_check_interval = \"1s\"\n"
%% to fail and retry pulling faster %% to fail and retry pulling faster
" request_ttl = \"5s\"\n" " request_ttl = \"1s\"\n"
" }\n" " }\n"
"}\n", "}\n",
[ [
@ -285,7 +285,7 @@ start_control_client() ->
connect_timeout => 5_000, connect_timeout => 5_000,
max_retries => 0, max_retries => 0,
pool_size => 1, pool_size => 1,
resource_opts => #{request_ttl => 5_000}, resource_opts => #{request_ttl => 1_000},
service_account_json => RawServiceAccount service_account_json => RawServiceAccount
}, },
PoolName = <<"control_connector">>, PoolName = <<"control_connector">>,
@ -512,10 +512,23 @@ wait_acked(Opts) ->
%% no need to check return value; we check the property in %% no need to check return value; we check the property in
%% the check phase. this is just to give it a chance to do %% the check phase. this is just to give it a chance to do
%% so and avoid flakiness. should be fast. %% so and avoid flakiness. should be fast.
snabbkaffe:block_until( Res = snabbkaffe:block_until(
?match_n_events(N, #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}), ?match_n_events(N, #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}),
Timeout Timeout
), ),
case Res of
{ok, _} ->
ok;
{timeout, Evts} ->
%% Fixme: apparently, snabbkaffe may timeout but still return the expected
%% events here.
case length(Evts) >= N of
true ->
ok;
false ->
ct:pal("timed out waiting for acks;\n expected: ~b\n received:\n ~p", [N, Evts])
end
end,
ok. ok.
wait_forgotten() -> wait_forgotten() ->
@ -652,25 +665,24 @@ setup_and_start_listeners(Node, NodeOpts) ->
end end
). ).
dedup([]) ->
[];
dedup([X]) ->
[X];
dedup([X | Rest]) ->
[X | dedup(X, Rest)].
dedup(X, [X | Rest]) ->
dedup(X, Rest);
dedup(_X, [Y | Rest]) ->
[Y | dedup(Y, Rest)];
dedup(_X, []) ->
[].
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Trace properties %% Trace properties
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
prop_pulled_only_once() ->
{"all pulled message ids are unique", fun ?MODULE:prop_pulled_only_once/1}.
prop_pulled_only_once(Trace) ->
PulledIds =
[
MsgId
|| #{messages := Msgs} <- ?of_kind(gcp_pubsub_consumer_worker_decoded_messages, Trace),
#{<<"message">> := #{<<"messageId">> := MsgId}} <- Msgs
],
NumPulled = length(PulledIds),
UniquePulledIds = sets:from_list(PulledIds, [{version, 2}]),
UniqueNumPulled = sets:size(UniquePulledIds),
?assertEqual(UniqueNumPulled, NumPulled, #{pulled_ids => PulledIds}),
ok.
prop_handled_only_once() -> prop_handled_only_once() ->
{"all pulled message are processed only once", fun ?MODULE:prop_handled_only_once/1}. {"all pulled message are processed only once", fun ?MODULE:prop_handled_only_once/1}.
prop_handled_only_once(Trace) -> prop_handled_only_once(Trace) ->
@ -1046,7 +1058,6 @@ t_consume_ok(Config) ->
end, end,
[ [
prop_all_pulled_are_acked(), prop_all_pulled_are_acked(),
prop_pulled_only_once(),
prop_handled_only_once(), prop_handled_only_once(),
prop_acked_ids_eventually_forgotten() prop_acked_ids_eventually_forgotten()
] ]
@ -1119,7 +1130,6 @@ t_bridge_rule_action_source(Config) ->
#{payload => Payload0} #{payload => Payload0}
end, end,
[ [
prop_pulled_only_once(),
prop_handled_only_once() prop_handled_only_once()
] ]
), ),
@ -1237,7 +1247,6 @@ t_multiple_topic_mappings(Config) ->
end, end,
[ [
prop_all_pulled_are_acked(), prop_all_pulled_are_acked(),
prop_pulled_only_once(),
prop_handled_only_once() prop_handled_only_once()
] ]
), ),
@ -1265,11 +1274,12 @@ t_multiple_pull_workers(Config) ->
<<"consumer">> => #{ <<"consumer">> => #{
%% reduce flakiness %% reduce flakiness
<<"ack_deadline">> => <<"10m">>, <<"ack_deadline">> => <<"10m">>,
<<"ack_retry_interval">> => <<"1s">>,
<<"consumer_workers_per_topic">> => NConsumers <<"consumer_workers_per_topic">> => NConsumers
}, },
<<"resource_opts">> => #{ <<"resource_opts">> => #{
%% reduce flakiness %% reduce flakiness
<<"request_ttl">> => <<"15s">> <<"request_ttl">> => <<"20s">>
} }
} }
), ),
@ -1297,7 +1307,6 @@ t_multiple_pull_workers(Config) ->
end, end,
[ [
prop_all_pulled_are_acked(), prop_all_pulled_are_acked(),
prop_pulled_only_once(),
prop_handled_only_once(), prop_handled_only_once(),
{"message is processed only once", fun(Trace) -> {"message is processed only once", fun(Trace) ->
?assertMatch({timeout, _}, receive_published(#{timeout => 5_000})), ?assertMatch({timeout, _}, receive_published(#{timeout => 5_000})),
@ -1531,11 +1540,12 @@ t_async_worker_death_mid_pull(Config) ->
ct:pal("published message"), ct:pal("published message"),
AsyncWorkerPids = get_async_worker_pids(Config), AsyncWorkerPids = get_async_worker_pids(Config),
Timeout = 20_000,
emqx_utils:pmap( emqx_utils:pmap(
fun(AsyncWorkerPid) -> fun(AsyncWorkerPid) ->
Ref = monitor(process, AsyncWorkerPid), Ref = monitor(process, AsyncWorkerPid),
ct:pal("killing pid ~p", [AsyncWorkerPid]), ct:pal("killing pid ~p", [AsyncWorkerPid]),
sys:terminate(AsyncWorkerPid, die, 20_000), exit(AsyncWorkerPid, kill),
receive receive
{'DOWN', Ref, process, AsyncWorkerPid, _} -> {'DOWN', Ref, process, AsyncWorkerPid, _} ->
ct:pal("killed pid ~p", [AsyncWorkerPid]), ct:pal("killed pid ~p", [AsyncWorkerPid]),
@ -1544,7 +1554,8 @@ t_async_worker_death_mid_pull(Config) ->
end, end,
ok ok
end, end,
AsyncWorkerPids AsyncWorkerPids,
Timeout + 2_000
), ),
ok ok
@ -1558,7 +1569,13 @@ t_async_worker_death_mid_pull(Config) ->
?wait_async_action( ?wait_async_action(
create_bridge( create_bridge(
Config, Config,
#{<<"pool_size">> => 1} #{
<<"pool_size">> => 1,
<<"consumer">> => #{
<<"ack_deadline">> => <<"10s">>,
<<"ack_retry_interval">> => <<"1s">>
}
}
), ),
#{?snk_kind := gcp_pubsub_consumer_worker_init}, #{?snk_kind := gcp_pubsub_consumer_worker_init},
10_000 10_000
@ -1590,18 +1607,19 @@ t_async_worker_death_mid_pull(Config) ->
], ],
Trace Trace
), ),
SubTraceEvts = ?projection(?snk_kind, SubTrace),
?assertMatch( ?assertMatch(
[ [
#{?snk_kind := gcp_pubsub_consumer_worker_handled_async_worker_down}, gcp_pubsub_consumer_worker_handled_async_worker_down,
#{?snk_kind := gcp_pubsub_consumer_worker_reply_delegator} gcp_pubsub_consumer_worker_reply_delegator
| _ | _
], ],
SubTrace, dedup(SubTraceEvts),
#{sub_trace => projection_optional_span(SubTrace)} #{sub_trace => projection_optional_span(SubTrace)}
), ),
?assertMatch( ?assertMatch(
#{?snk_kind := gcp_pubsub_consumer_worker_pull_response_received}, gcp_pubsub_consumer_worker_pull_response_received,
lists:last(SubTrace) lists:last(SubTraceEvts)
), ),
ok ok
end end
@ -1888,7 +1906,10 @@ t_connection_down_during_ack(Config) ->
{{ok, _}, {ok, _}} = {{ok, _}, {ok, _}} =
?wait_async_action( ?wait_async_action(
create_bridge(Config), create_bridge(
Config,
#{<<"consumer">> => #{<<"ack_retry_interval">> => <<"1s">>}}
),
#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
10_000 10_000
), ),
@ -1930,7 +1951,6 @@ t_connection_down_during_ack(Config) ->
end, end,
[ [
prop_all_pulled_are_acked(), prop_all_pulled_are_acked(),
prop_pulled_only_once(),
prop_handled_only_once(), prop_handled_only_once(),
{"message is processed only once", fun(Trace) -> {"message is processed only once", fun(Trace) ->
?assertMatch({timeout, _}, receive_published(#{timeout => 5_000})), ?assertMatch({timeout, _}, receive_published(#{timeout => 5_000})),
@ -1955,7 +1975,15 @@ t_connection_down_during_ack_redeliver(Config) ->
?wait_async_action( ?wait_async_action(
create_bridge( create_bridge(
Config, Config,
#{<<"consumer">> => #{<<"ack_deadline">> => <<"10s">>}} #{
<<"consumer">> => #{
<<"ack_deadline">> => <<"12s">>,
<<"ack_retry_interval">> => <<"1s">>
},
<<"resource_opts">> => #{
<<"request_ttl">> => <<"11s">>
}
}
), ),
#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
10_000 10_000
@ -2026,7 +2054,13 @@ t_connection_down_during_pull(Config) ->
{{ok, _}, {ok, _}} = {{ok, _}, {ok, _}} =
?wait_async_action( ?wait_async_action(
create_bridge(Config), create_bridge(
Config,
#{
<<"consumer">> => #{<<"ack_retry_interval">> => <<"1s">>},
<<"resource_opts">> => #{<<"request_ttl">> => <<"11s">>}
}
),
#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"},
10_000 10_000
), ),

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_greptimedb, [ {application, emqx_bridge_greptimedb, [
{description, "EMQX GreptimeDB Bridge"}, {description, "EMQX GreptimeDB Bridge"},
{vsn, "0.1.4"}, {vsn, "0.1.5"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -3,7 +3,7 @@
{vsn, "0.1.5"}, {vsn, "0.1.5"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]}, {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]},
{env, []}, {env, [{emqx_action_info_modules, [emqx_bridge_http_action_info]}]},
{modules, []}, {modules, []},
{links, []} {links, []}
]}. ]}.

View File

@ -0,0 +1,102 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_http_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0,
connector_action_config_to_bridge_v1_config/2,
bridge_v1_config_to_action_config/2,
bridge_v1_config_to_connector_config/1
]).
-define(REMOVED_KEYS, [<<"direction">>]).
-define(ACTION_KEYS, [<<"local_topic">>, <<"resource_opts">>]).
-define(PARAMETER_KEYS, [<<"body">>, <<"max_retries">>, <<"method">>, <<"request_timeout">>]).
bridge_v1_type_name() -> webhook.
action_type_name() -> http.
connector_type_name() -> http.
schema_module() -> emqx_bridge_http_schema.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig),
%% Move parameters to the top level
ParametersMap1 = maps:get(<<"parameters">>, BridgeV1Config1, #{}),
ParametersMap2 = maps:without([<<"path">>, <<"headers">>], ParametersMap1),
BridgeV1Config2 = maps:remove(<<"parameters">>, BridgeV1Config1),
BridgeV1Config3 = emqx_utils_maps:deep_merge(BridgeV1Config2, ParametersMap2),
BridgeV1Config4 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config3),
Url = maps:get(<<"url">>, ConnectorConfig),
Path = maps:get(<<"path">>, ParametersMap1, <<>>),
Headers1 = maps:get(<<"headers">>, ConnectorConfig, #{}),
Headers2 = maps:get(<<"headers">>, ParametersMap1, #{}),
Url1 =
case Path of
<<>> -> Url;
_ -> iolist_to_binary(emqx_bridge_http_connector:join_paths(Url, Path))
end,
BridgeV1Config4#{
<<"headers">> => maps:merge(Headers1, Headers2),
<<"url">> => Url1
}.
bridge_v1_config_to_connector_config(BridgeV1Conf) ->
%% To statisfy the emqx_bridge_api_SUITE:t_http_crud_apis/1
ok = validate_webhook_url(maps:get(<<"url">>, BridgeV1Conf, undefined)),
maps:without(?REMOVED_KEYS ++ ?ACTION_KEYS ++ ?PARAMETER_KEYS, BridgeV1Conf).
bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
Parameters = maps:with(?PARAMETER_KEYS, BridgeV1Conf),
Parameters1 = Parameters#{<<"path">> => <<>>, <<"headers">> => #{}},
CommonKeys = [<<"enable">>, <<"description">>],
ActionConfig = maps:with(?ACTION_KEYS ++ CommonKeys, BridgeV1Conf),
ActionConfig#{<<"parameters">> => Parameters1, <<"connector">> => ConnectorName}.
%%--------------------------------------------------------------------
%% helpers
validate_webhook_url(undefined) ->
throw(#{
kind => validation_error,
reason => required_field,
required_field => <<"url">>
});
validate_webhook_url(Url) ->
{BaseUrl, _Path} = emqx_connector_resource:parse_url(Url),
case emqx_http_lib:uri_parse(BaseUrl) of
{ok, _} ->
ok;
{error, Reason} ->
throw(#{
kind => validation_error,
reason => invalid_url,
url => Url,
error => emqx_utils:readable_error_msg(Reason)
})
end.

View File

@ -31,9 +31,14 @@
on_query/3, on_query/3,
on_query_async/4, on_query_async/4,
on_get_status/2, on_get_status/2,
reply_delegator/3 on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_get_channel_status/3
]). ]).
-export([reply_delegator/3]).
-export([ -export([
roots/0, roots/0,
fields/1, fields/1,
@ -41,7 +46,7 @@
namespace/0 namespace/0
]). ]).
%% for other webhook-like connectors. %% for other http-like connectors.
-export([redact_request/1]). -export([redact_request/1]).
-export([validate_method/1, join_paths/2]). -export([validate_method/1, join_paths/2]).
@ -251,6 +256,21 @@ start_pool(PoolName, PoolOpts) ->
Error Error
end. end.
on_add_channel(
_InstId,
OldState,
ActionId,
ActionConfig
) ->
InstalledActions = maps:get(installed_actions, OldState, #{}),
{ok, ActionState} = do_create_http_action(ActionConfig),
NewInstalledActions = maps:put(ActionId, ActionState, InstalledActions),
NewState = maps:put(installed_actions, NewInstalledActions, OldState),
{ok, NewState}.
do_create_http_action(_ActionConfig = #{parameters := Params}) ->
{ok, preprocess_request(Params)}.
on_stop(InstId, _State) -> on_stop(InstId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_http_connector", msg => "stopping_http_connector",
@ -260,6 +280,16 @@ on_stop(InstId, _State) ->
?tp(emqx_connector_http_stopped, #{instance_id => InstId}), ?tp(emqx_connector_http_stopped, #{instance_id => InstId}),
Res. Res.
on_remove_channel(
_InstId,
OldState = #{installed_actions := InstalledActions},
ActionId
) ->
NewInstalledActions = maps:remove(ActionId, InstalledActions),
NewState = maps:put(installed_actions, NewInstalledActions, OldState),
{ok, NewState}.
%% BridgeV1 entrypoint
on_query(InstId, {send_message, Msg}, State) -> on_query(InstId, {send_message, Msg}, State) ->
case maps:get(request, State, undefined) of case maps:get(request, State, undefined) of
undefined -> undefined ->
@ -282,6 +312,36 @@ on_query(InstId, {send_message, Msg}, State) ->
State State
) )
end; end;
%% BridgeV2 entrypoint
on_query(
InstId,
{ActionId, Msg},
State = #{installed_actions := InstalledActions}
) when is_binary(ActionId) ->
case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of
{undefined, _} ->
?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
{error, arg_request_not_found};
{_, undefined} ->
?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}),
{error, action_not_found};
{Request, ActionState} ->
#{
method := Method,
path := Path,
body := Body,
headers := Headers,
request_timeout := Timeout
} = process_request_and_action(Request, ActionState, Msg),
%% bridge buffer worker has retry, do not let ehttpc retry
Retry = 2,
ClientId = maps:get(clientid, Msg, undefined),
on_query(
InstId,
{ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
State
)
end;
on_query(InstId, {Method, Request}, State) -> on_query(InstId, {Method, Request}, State) ->
%% TODO: Get retry from State %% TODO: Get retry from State
on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State); on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State);
@ -343,6 +403,7 @@ on_query(
Result Result
end. end.
%% BridgeV1 entrypoint
on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) -> on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
case maps:get(request, State, undefined) of case maps:get(request, State, undefined) of
undefined -> undefined ->
@ -364,6 +425,36 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
State State
) )
end; end;
%% BridgeV2 entrypoint
on_query_async(
InstId,
{ActionId, Msg},
ReplyFunAndArgs,
State = #{installed_actions := InstalledActions}
) when is_binary(ActionId) ->
case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of
{undefined, _} ->
?SLOG(error, #{msg => "arg_request_not_found", connector => InstId}),
{error, arg_request_not_found};
{_, undefined} ->
?SLOG(error, #{msg => "action_not_found", connector => InstId, action_id => ActionId}),
{error, action_not_found};
{Request, ActionState} ->
#{
method := Method,
path := Path,
body := Body,
headers := Headers,
request_timeout := Timeout
} = process_request_and_action(Request, ActionState, Msg),
ClientId = maps:get(clientid, Msg, undefined),
on_query_async(
InstId,
{ClientId, Method, {Path, Headers, Body}, Timeout},
ReplyFunAndArgs,
State
)
end;
on_query_async( on_query_async(
InstId, InstId,
{KeyOrNum, Method, Request, Timeout}, {KeyOrNum, Method, Request, Timeout},
@ -411,6 +502,9 @@ resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->
ehttpc_pool:pick_worker(PoolName, Key) ehttpc_pool:pick_worker(PoolName, Key)
end. end.
on_get_channels(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId).
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(PoolName, Timeout) of case do_get_status(PoolName, Timeout) of
ok -> ok ->
@ -456,6 +550,14 @@ do_get_status(PoolName, Timeout) ->
{error, timeout} {error, timeout}
end. end.
on_get_channel_status(
InstId,
_ChannelId,
State
) ->
%% XXX: Reuse the connector status
on_get_status(InstId, State).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -466,10 +568,10 @@ preprocess_request(Req) when map_size(Req) == 0 ->
preprocess_request( preprocess_request(
#{ #{
method := Method, method := Method,
path := Path, path := Path
headers := Headers
} = Req } = Req
) -> ) ->
Headers = maps:get(headers, Req, []),
#{ #{
method => parse_template(to_bin(Method)), method => parse_template(to_bin(Method)),
path => parse_template(Path), path => parse_template(Path),
@ -529,6 +631,49 @@ maybe_parse_template(Key, Conf) ->
parse_template(String) -> parse_template(String) ->
emqx_template:parse(String). emqx_template:parse(String).
process_request_and_action(Request, ActionState, Msg) ->
MethodTemplate = maps:get(method, ActionState),
Method = make_method(render_template_string(MethodTemplate, Msg)),
BodyTemplate = maps:get(body, ActionState),
Body = render_request_body(BodyTemplate, Msg),
PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)),
PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)),
Path =
case PathSuffix of
"" -> PathPrefix;
_ -> join_paths(PathPrefix, PathSuffix)
end,
HeadersTemplate1 = maps:get(headers, Request),
HeadersTemplate2 = maps:get(headers, ActionState),
Headers = merge_proplist(
render_headers(HeadersTemplate1, Msg),
render_headers(HeadersTemplate2, Msg)
),
#{
method => Method,
path => Path,
body => Body,
headers => Headers,
request_timeout => maps:get(request_timeout, ActionState)
}.
merge_proplist(Proplist1, Proplist2) ->
lists:foldl(
fun({K, V}, Acc) ->
case lists:keyfind(K, 1, Acc) of
false ->
[{K, V} | Acc];
{K, _} = {K, V1} ->
[{K, V1} | Acc]
end
end,
Proplist2,
Proplist1
).
process_request( process_request(
#{ #{
method := MethodTemplate, method := MethodTemplate,
@ -691,7 +836,7 @@ maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
true -> Context; true -> Context;
false -> Context#{attempt := Attempt + 1} false -> Context#{attempt := Attempt + 1}
end, end,
?tp(webhook_will_retry_async, #{}), ?tp(http_will_retry_async, #{}),
Worker = resolve_pool_worker(State, KeyOrNum), Worker = resolve_pool_worker(State, KeyOrNum),
ok = ehttpc:request_async( ok = ehttpc:request_async(
Worker, Worker,

View File

@ -18,69 +18,162 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/1, ref/2]).
-export([roots/0, fields/1, namespace/0, desc/1]). -export([roots/0, fields/1, namespace/0, desc/1]).
-export([
bridge_v2_examples/1,
%%conn_bridge_examples/1,
connector_examples/1
]).
%%====================================================================================== %%======================================================================================
%% Hocon Schema Definitions %% Hocon Schema Definitions
namespace() -> "bridge_http". namespace() -> "bridge_http".
roots() -> []. roots() -> [].
fields("config") -> %%--------------------------------------------------------------------
basic_config() ++ request_config(); %% v1 bridges http api
%% see: emqx_bridge_schema:get_response/0, put_request/0, post_request/0
fields("post") -> fields("post") ->
[ [
type_field(), old_type_field(),
name_field() name_field()
] ++ fields("config"); ] ++ fields("config");
fields("put") -> fields("put") ->
fields("config"); fields("config");
fields("get") -> fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("post"); emqx_bridge_schema:status_fields() ++ fields("post");
fields("creation_opts") -> %%--- v1 bridges config file
%% see: emqx_bridge_schema:fields(bridges)
fields("config") ->
basic_config() ++ request_config();
%%--------------------------------------------------------------------
%% v2: configuration
fields(action) ->
{http,
mk(
hoconsc:map(name, ref(?MODULE, "http_action")),
#{
aliases => [webhook],
desc => <<"HTTP Action Config">>,
required => false
}
)};
fields("http_action") ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable_bridge"), default => true})},
{connector,
mk(binary(), #{
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
})},
{description, emqx_schema:description_schema()},
%% Note: there's an implicit convention in `emqx_bridge' that,
%% for egress bridges with this config, the published messages
%% will be forwarded to such bridges.
{local_topic,
mk(
binary(),
#{
required => false,
desc => ?DESC("config_local_topic"),
importance => ?IMPORTANCE_HIDDEN
}
)},
%% Since e5.3.2, we split the http bridge to two parts: a) connector. b) actions.
%% some fields are moved to connector, some fields are moved to actions and composed into the
%% `parameters` field.
{parameters,
mk(ref("parameters_opts"), #{
required => true,
desc => ?DESC("config_parameters_opts")
})}
] ++ http_resource_opts();
fields("parameters_opts") ->
[
{path,
mk(
binary(),
#{
desc => ?DESC("config_path"),
required => false
}
)},
method_field(),
headers_field(),
body_field(),
max_retries_field(),
request_timeout_field()
];
%% v2: api schema
%% The parameter equls to
%% `get_bridge_v2`, `post_bridge_v2`, `put_bridge_v2` from emqx_bridge_v2_schema:api_schema/1
%% `get_connector`, `post_connector`, `put_connector` from emqx_connector_schema:api_schema/1
fields("post_" ++ Type) ->
[type_field(), name_field() | fields("config_" ++ Type)];
fields("put_" ++ Type) ->
fields("config_" ++ Type);
fields("get_" ++ Type) ->
emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type);
fields("config_bridge_v2") ->
fields("http_action");
fields("config_connector") ->
[
{enable,
mk(
boolean(),
#{
desc => <<"Enable or disable this connector">>,
default => true
}
)},
{description, emqx_schema:description_schema()}
] ++ connector_url_headers() ++ connector_opts();
%%--------------------------------------------------------------------
%% v1/v2
fields("resource_opts") ->
UnsupportedOpts = [enable_batch, batch_size, batch_time],
lists:filter( lists:filter(
fun({K, _V}) -> fun({K, _V}) -> not lists:member(K, UnsupportedOpts) end,
not lists:member(K, unsupported_opts())
end,
emqx_resource_schema:fields("creation_opts") emqx_resource_schema:fields("creation_opts")
). ).
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc("creation_opts") -> desc("resource_opts") ->
?DESC(emqx_resource_schema, "creation_opts"); ?DESC(emqx_resource_schema, "creation_opts");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for WebHook using `", string:to_upper(Method), "` method."]; ["Configuration for WebHook using `", string:to_upper(Method), "` method."];
desc("config_connector") ->
?DESC("desc_config");
desc("http_action") ->
?DESC("desc_config");
desc("parameters_opts") ->
?DESC("config_parameters_opts");
desc(_) -> desc(_) ->
undefined. undefined.
%%--------------------------------------------------------------------
%% helpers for v1 only
basic_config() -> basic_config() ->
[ [
{enable, {enable,
mk( mk(
boolean(), boolean(),
#{ #{
desc => ?DESC("config_enable"), desc => ?DESC("config_enable_bridge"),
default => true default => true
} }
)} )},
] ++ webhook_creation_opts() ++ {description, emqx_schema:description_schema()}
proplists:delete( ] ++ http_resource_opts() ++ connector_opts().
max_retries, emqx_bridge_http_connector:fields(config)
).
request_config() -> request_config() ->
[ [
{url, url_field(),
mk(
binary(),
#{
required => true,
desc => ?DESC("config_url")
}
)},
{direction, {direction,
mk( mk(
egress, egress,
@ -98,81 +191,37 @@ request_config() ->
required => false required => false
} }
)}, )},
{method, method_field(),
mk( headers_field(),
method(), body_field(),
#{ max_retries_field(),
default => post, request_timeout_field()
desc => ?DESC("config_method")
}
)},
{headers,
mk(
map(),
#{
default => #{
<<"accept">> => <<"application/json">>,
<<"cache-control">> => <<"no-cache">>,
<<"connection">> => <<"keep-alive">>,
<<"content-type">> => <<"application/json">>,
<<"keep-alive">> => <<"timeout=5">>
},
desc => ?DESC("config_headers")
}
)},
{body,
mk(
binary(),
#{
default => undefined,
desc => ?DESC("config_body")
}
)},
{max_retries,
mk(
non_neg_integer(),
#{
default => 2,
desc => ?DESC("config_max_retries")
}
)},
{request_timeout,
mk(
emqx_schema:duration_ms(),
#{
default => <<"15s">>,
deprecated => {since, "v5.0.26"},
desc => ?DESC("config_request_timeout")
}
)}
]. ].
webhook_creation_opts() -> %%--------------------------------------------------------------------
[ %% helpers for v2 only
{resource_opts,
connector_url_headers() ->
[url_field(), headers_field()].
%%--------------------------------------------------------------------
%% common funcs
%% `webhook` is kept for backward compatibility.
old_type_field() ->
{type,
mk( mk(
ref(?MODULE, "creation_opts"), enum([webhook, http]),
#{ #{
required => false, required => true,
default => #{}, desc => ?DESC("desc_type")
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
} }
)} )}.
].
unsupported_opts() ->
[
enable_batch,
batch_size,
batch_time
].
%%======================================================================================
type_field() -> type_field() ->
{type, {type,
mk( mk(
webhook, http,
#{ #{
required => true, required => true,
desc => ?DESC("desc_type") desc => ?DESC("desc_type")
@ -189,5 +238,189 @@ name_field() ->
} }
)}. )}.
method() -> url_field() ->
enum([post, put, get, delete]). {url,
mk(
binary(),
#{
required => true,
desc => ?DESC("config_url")
}
)}.
headers_field() ->
{headers,
mk(
map(),
#{
default => #{
<<"accept">> => <<"application/json">>,
<<"cache-control">> => <<"no-cache">>,
<<"connection">> => <<"keep-alive">>,
<<"content-type">> => <<"application/json">>,
<<"keep-alive">> => <<"timeout=5">>
},
desc => ?DESC("config_headers")
}
)}.
method_field() ->
{method,
mk(
enum([post, put, get, delete]),
#{
default => post,
desc => ?DESC("config_method")
}
)}.
body_field() ->
{body,
mk(
binary(),
#{
default => undefined,
desc => ?DESC("config_body")
}
)}.
max_retries_field() ->
{max_retries,
mk(
non_neg_integer(),
#{
default => 2,
desc => ?DESC("config_max_retries")
}
)}.
request_timeout_field() ->
{request_timeout,
mk(
emqx_schema:duration_ms(),
#{
default => <<"15s">>,
deprecated => {since, "v5.0.26"},
desc => ?DESC("config_request_timeout")
}
)}.
http_resource_opts() ->
[
{resource_opts,
mk(
ref(?MODULE, "resource_opts"),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)}
].
connector_opts() ->
mark_request_field_deperecated(
proplists:delete(max_retries, emqx_bridge_http_connector:fields(config))
).
mark_request_field_deperecated(Fields) ->
lists:map(
fun({K, V}) ->
case K of
request ->
{K, V#{
%% Note: if we want to deprecate a reference type, we have to change
%% it to a direct type first.
type => typerefl:map(),
deprecated => {since, "5.3.2"},
desc => <<"This field is never used, so we deprecated it since 5.3.2.">>
}};
_ ->
{K, V}
end
end,
Fields
).
%%--------------------------------------------------------------------
%% Examples
bridge_v2_examples(Method) ->
[
#{
<<"http">> => #{
summary => <<"HTTP Action">>,
value => values({Method, bridge_v2})
}
}
].
connector_examples(Method) ->
[
#{
<<"http">> => #{
summary => <<"HTTP Connector">>,
value => values({Method, connector})
}
}
].
values({get, Type}) ->
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
},
values({post, Type})
);
values({post, bridge_v2}) ->
maps:merge(
#{
name => <<"my_http_action">>,
type => <<"http">>
},
values({put, bridge_v2})
);
values({post, connector}) ->
maps:merge(
#{
name => <<"my_http_connector">>,
type => <<"http">>
},
values({put, connector})
);
values({put, bridge_v2}) ->
values(bridge_v2);
values({put, connector}) ->
values(connector);
values(bridge_v2) ->
#{
enable => true,
connector => <<"my_http_connector">>,
parameters => #{
path => <<"/room/${room_no}">>,
method => <<"post">>,
headers => #{},
body => <<"${.}">>
},
resource_opts => #{
worker_pool_size => 16,
health_check_interval => <<"15s">>,
query_mode => <<"async">>
}
};
values(connector) ->
#{
enable => true,
url => <<"http://localhost:8080/api/v1">>,
headers => #{<<"content-type">> => <<"application/json">>},
connect_timeout => <<"15s">>,
pool_type => <<"hash">>,
pool_size => 1,
enable_pipelining => 100
}.

View File

@ -39,18 +39,33 @@ all() ->
groups() -> groups() ->
[]. [].
init_per_suite(_Config) -> init_per_suite(Config0) ->
emqx_common_test_helpers:render_and_load_app_config(emqx_conf), Config =
ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge, emqx_rule_engine]), case os:getenv("DEBUG_CASE") of
ok = emqx_connector_test_helpers:start_apps([emqx_resource]), [_ | _] = DebugCase ->
{ok, _} = application:ensure_all_started(emqx_connector), CaseName = list_to_atom(DebugCase),
[]. [{debug_case, CaseName} | Config0];
_ ->
Config0
end,
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_connector,
emqx_bridge_http,
emqx_bridge,
emqx_rule_engine
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
emqx_mgmt_api_test_util:init_suite(),
[{apps, Apps} | Config].
end_per_suite(_Config) -> end_per_suite(Config) ->
ok = emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge, emqx_conf]), Apps = ?config(apps, Config),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), emqx_mgmt_api_test_util:end_suite(),
_ = application:stop(emqx_connector), ok = emqx_cth_suite:stop(Apps),
_ = application:stop(emqx_bridge),
ok. ok.
suite() -> suite() ->
@ -115,7 +130,8 @@ end_per_testcase(TestCase, _Config) when
-> ->
ok = emqx_bridge_http_connector_test_server:stop(), ok = emqx_bridge_http_connector_test_server:stop(),
persistent_term:erase({?MODULE, times_called}), persistent_term:erase({?MODULE, times_called}),
emqx_bridge_testlib:delete_all_bridges(), emqx_bridge_v2_testlib:delete_all_bridges(),
emqx_bridge_v2_testlib:delete_all_connectors(),
emqx_common_test_helpers:call_janitor(), emqx_common_test_helpers:call_janitor(),
ok; ok;
end_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, Config) ->
@ -123,7 +139,8 @@ end_per_testcase(_TestCase, Config) ->
undefined -> ok; undefined -> ok;
Server -> stop_http_server(Server) Server -> stop_http_server(Server)
end, end,
emqx_bridge_testlib:delete_all_bridges(), emqx_bridge_v2_testlib:delete_all_bridges(),
emqx_bridge_v2_testlib:delete_all_connectors(),
emqx_common_test_helpers:call_janitor(), emqx_common_test_helpers:call_janitor(),
ok. ok.
@ -420,7 +437,7 @@ t_send_async_connection_timeout(Config) ->
), ),
NumberOfMessagesToSend = 10, NumberOfMessagesToSend = 10,
[ [
emqx_bridge:send_message(BridgeID, #{<<"id">> => Id}) do_send_message(#{<<"id">> => Id})
|| Id <- lists:seq(1, NumberOfMessagesToSend) || Id <- lists:seq(1, NumberOfMessagesToSend)
], ],
%% Make sure server receives all messages %% Make sure server receives all messages
@ -431,7 +448,7 @@ t_send_async_connection_timeout(Config) ->
t_async_free_retries(Config) -> t_async_free_retries(Config) ->
#{port := Port} = ?config(http_server, Config), #{port := Port} = ?config(http_server, Config),
BridgeID = make_bridge(#{ _BridgeID = make_bridge(#{
port => Port, port => Port,
pool_size => 1, pool_size => 1,
query_mode => "sync", query_mode => "sync",
@ -445,7 +462,7 @@ t_async_free_retries(Config) ->
Fn = fun(Get, Error) -> Fn = fun(Get, Error) ->
?assertMatch( ?assertMatch(
{ok, 200, _, _}, {ok, 200, _, _},
emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}), do_send_message(#{<<"hello">> => <<"world">>}),
#{error => Error} #{error => Error}
), ),
?assertEqual(ExpectedAttempts, Get(), #{error => Error}) ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
@ -456,7 +473,7 @@ t_async_free_retries(Config) ->
t_async_common_retries(Config) -> t_async_common_retries(Config) ->
#{port := Port} = ?config(http_server, Config), #{port := Port} = ?config(http_server, Config),
BridgeID = make_bridge(#{ _BridgeID = make_bridge(#{
port => Port, port => Port,
pool_size => 1, pool_size => 1,
query_mode => "sync", query_mode => "sync",
@ -471,7 +488,7 @@ t_async_common_retries(Config) ->
FnSucceed = fun(Get, Error) -> FnSucceed = fun(Get, Error) ->
?assertMatch( ?assertMatch(
{ok, 200, _, _}, {ok, 200, _, _},
emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}), do_send_message(#{<<"hello">> => <<"world">>}),
#{error => Error, attempts => Get()} #{error => Error, attempts => Get()}
), ),
?assertEqual(ExpectedAttempts, Get(), #{error => Error}) ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
@ -479,7 +496,7 @@ t_async_common_retries(Config) ->
FnFail = fun(Get, Error) -> FnFail = fun(Get, Error) ->
?assertMatch( ?assertMatch(
Error, Error,
emqx_bridge:send_message(BridgeID, #{<<"hello">> => <<"world">>}), do_send_message(#{<<"hello">> => <<"world">>}),
#{error => Error, attempts => Get()} #{error => Error, attempts => Get()}
), ),
?assertEqual(ExpectedAttempts, Get(), #{error => Error}) ?assertEqual(ExpectedAttempts, Get(), #{error => Error})
@ -559,7 +576,7 @@ t_path_not_found(Config) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
?assertEqual([], ?of_kind(webhook_will_retry_async, Trace)), ?assertEqual([], ?of_kind(http_will_retry_async, Trace)),
ok ok
end end
), ),
@ -600,7 +617,7 @@ t_too_many_requests(Config) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
?assertMatch([_ | _], ?of_kind(webhook_will_retry_async, Trace)), ?assertMatch([_ | _], ?of_kind(http_will_retry_async, Trace)),
ok ok
end end
), ),
@ -711,6 +728,11 @@ t_bridge_probes_header_atoms(Config) ->
ok. ok.
%% helpers %% helpers
do_send_message(Message) ->
Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(?BRIDGE_TYPE),
emqx_bridge_v2:send_message(Type, ?BRIDGE_NAME, Message, #{}).
do_t_async_retries(TestCase, TestContext, Error, Fn) -> do_t_async_retries(TestCase, TestContext, Error, Fn) ->
#{error_attempts := ErrorAttempts} = TestContext, #{error_attempts := ErrorAttempts} = TestContext,
PTKey = {?MODULE, TestCase, attempts}, PTKey = {?MODULE, TestCase, attempts},

View File

@ -0,0 +1,140 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_http_v2_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_mgmt_api_test_util, [request/3]).
-import(emqx_common_test_helpers, [on_exit/1]).
-import(emqx_bridge_http_SUITE, [start_http_server/1, stop_http_server/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl").
-define(BRIDGE_TYPE, <<"http">>).
-define(BRIDGE_NAME, atom_to_binary(?MODULE)).
-define(CONNECTOR_NAME, atom_to_binary(?MODULE)).
all() ->
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
init_per_suite(Config0) ->
Config =
case os:getenv("DEBUG_CASE") of
[_ | _] = DebugCase ->
CaseName = list_to_atom(DebugCase),
[{debug_case, CaseName} | Config0];
_ ->
Config0
end,
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_connector,
emqx_bridge_http,
emqx_bridge,
emqx_rule_engine
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
emqx_mgmt_api_test_util:init_suite(),
[{apps, Apps} | Config].
end_per_suite(Config) ->
Apps = ?config(apps, Config),
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_cth_suite:stop(Apps),
ok.
suite() ->
[{timetrap, {seconds, 60}}].
init_per_testcase(_TestCase, Config) ->
Server = start_http_server(#{response_delay_ms => 0}),
[{http_server, Server} | Config].
end_per_testcase(_TestCase, Config) ->
case ?config(http_server, Config) of
undefined -> ok;
Server -> stop_http_server(Server)
end,
emqx_bridge_v2_testlib:delete_all_bridges(),
emqx_bridge_v2_testlib:delete_all_connectors(),
emqx_common_test_helpers:call_janitor(),
ok.
%%--------------------------------------------------------------------
%% tests
%%--------------------------------------------------------------------
t_compose_connector_url_and_action_path(Config) ->
Path = <<"/foo/bar">>,
ConnectorCfg = make_connector_config(Config),
ActionCfg = make_action_config([{path, Path} | Config]),
CreateConfig = [
{bridge_type, ?BRIDGE_TYPE},
{bridge_name, ?BRIDGE_NAME},
{bridge_config, ActionCfg},
{connector_type, ?BRIDGE_TYPE},
{connector_name, ?CONNECTOR_NAME},
{connector_config, ConnectorCfg}
],
{ok, _} = emqx_bridge_v2_testlib:create_bridge(CreateConfig),
%% assert: the url returned v1 api is composed by the url of the connector and the
%% path of the action
#{port := Port} = ?config(http_server, Config),
ExpectedUrl = iolist_to_binary(io_lib:format("http://localhost:~p/foo/bar", [Port])),
{ok, {_, _, [Bridge]}} = emqx_bridge_testlib:list_bridges_api(),
?assertMatch(
#{<<"url">> := ExpectedUrl},
Bridge
),
ok.
%%--------------------------------------------------------------------
%% helpers
%%--------------------------------------------------------------------
make_connector_config(Config) ->
#{port := Port} = ?config(http_server, Config),
#{
<<"enable">> => true,
<<"url">> => iolist_to_binary(io_lib:format("http://localhost:~p", [Port])),
<<"headers">> => #{},
<<"pool_type">> => <<"hash">>,
<<"pool_size">> => 1
}.
make_action_config(Config) ->
Path = ?config(path, Config),
#{
<<"enable">> => true,
<<"connector">> => ?CONNECTOR_NAME,
<<"parameters">> => #{
<<"path">> => Path,
<<"method">> => <<"post">>,
<<"headers">> => #{},
<<"body">> => <<"${.}">>
}
}.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_influxdb, [ {application, emqx_bridge_influxdb, [
{description, "EMQX Enterprise InfluxDB Bridge"}, {description, "EMQX Enterprise InfluxDB Bridge"},
{vsn, "0.1.6"}, {vsn, "0.1.7"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_kafka, [ {application, emqx_bridge_kafka, [
{description, "EMQX Enterprise Kafka Bridge"}, {description, "EMQX Enterprise Kafka Bridge"},
{vsn, "0.1.12"}, {vsn, "0.1.13"},
{registered, [emqx_bridge_kafka_consumer_sup]}, {registered, [emqx_bridge_kafka_consumer_sup]},
{applications, [ {applications, [
kernel, kernel,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_mqtt, [ {application, emqx_bridge_mqtt, [
{description, "EMQX MQTT Broker Bridge"}, {description, "EMQX MQTT Broker Bridge"},
{vsn, "0.1.5"}, {vsn, "0.1.6"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_mysql, [ {application, emqx_bridge_mysql, [
{description, "EMQX Enterprise MySQL Bridge"}, {description, "EMQX Enterprise MySQL Bridge"},
{vsn, "0.1.2"}, {vsn, "0.1.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pgsql, [ {application, emqx_bridge_pgsql, [
{description, "EMQX Enterprise PostgreSQL Bridge"}, {description, "EMQX Enterprise PostgreSQL Bridge"},
{vsn, "0.1.4"}, {vsn, "0.1.5"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rabbitmq, [ {application, emqx_bridge_rabbitmq, [
{description, "EMQX Enterprise RabbitMQ Bridge"}, {description, "EMQX Enterprise RabbitMQ Bridge"},
{vsn, "0.1.6"}, {vsn, "0.1.7"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_redis, [ {application, emqx_bridge_redis, [
{description, "EMQX Enterprise Redis Bridge"}, {description, "EMQX Enterprise Redis Bridge"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -66,6 +66,7 @@
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("emqx_conf.hrl"). -include("emqx_conf.hrl").
-ifdef(TEST). -ifdef(TEST).
@ -384,6 +385,7 @@ catch_up(State) -> catch_up(State, false).
catch_up(#{node := Node, retry_interval := RetryMs, is_leaving := false} = State, SkipResult) -> catch_up(#{node := Node, retry_interval := RetryMs, is_leaving := false} = State, SkipResult) ->
case transaction(fun ?MODULE:read_next_mfa/1, [Node]) of case transaction(fun ?MODULE:read_next_mfa/1, [Node]) of
{atomic, caught_up} -> {atomic, caught_up} ->
?tp(cluster_rpc_caught_up, #{}),
?TIMEOUT; ?TIMEOUT;
{atomic, {still_lagging, NextId, MFA}} -> {atomic, {still_lagging, NextId, MFA}} ->
{Succeed, _} = apply_mfa(NextId, MFA, ?APPLY_KIND_REPLICATE), {Succeed, _} = apply_mfa(NextId, MFA, ?APPLY_KIND_REPLICATE),

View File

@ -1,6 +1,6 @@
{application, emqx_conf, [ {application, emqx_conf, [
{description, "EMQX configuration management"}, {description, "EMQX configuration management"},
{vsn, "0.1.31"}, {vsn, "0.1.32"},
{registered, []}, {registered, []},
{mod, {emqx_conf_app, []}}, {mod, {emqx_conf_app, []}},
{applications, [kernel, stdlib, emqx_ctl]}, {applications, [kernel, stdlib, emqx_ctl]},

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_connector, [ {application, emqx_connector, [
{description, "EMQX Data Integration Connectors"}, {description, "EMQX Data Integration Connectors"},
{vsn, "0.1.34"}, {vsn, "0.1.35"},
{registered, []}, {registered, []},
{mod, {emqx_connector_app, []}}, {mod, {emqx_connector_app, []}},
{applications, [ {applications, [

View File

@ -137,7 +137,7 @@ param_path_id() ->
#{ #{
in => path, in => path,
required => true, required => true,
example => <<"webhook:webhook_example">>, example => <<"http:my_http_connector">>,
desc => ?DESC("desc_param_path_id") desc => ?DESC("desc_param_path_id")
} }
)}. )}.
@ -158,17 +158,7 @@ connector_info_array_example(Method) ->
lists:map(fun(#{value := Config}) -> Config end, maps:values(connector_info_examples(Method))). lists:map(fun(#{value := Config}) -> Config end, maps:values(connector_info_examples(Method))).
connector_info_examples(Method) -> connector_info_examples(Method) ->
maps:merge( emqx_connector_schema:examples(Method).
#{},
emqx_enterprise_connector_examples(Method)
).
-if(?EMQX_RELEASE_EDITION == ee).
emqx_enterprise_connector_examples(Method) ->
emqx_connector_ee_schema:examples(Method).
-else.
emqx_enterprise_connector_examples(_Method) -> #{}.
-endif.
schema("/connectors") -> schema("/connectors") ->
#{ #{

View File

@ -49,6 +49,8 @@
get_channels/2 get_channels/2
]). ]).
-export([parse_url/1]).
-callback connector_config(ParsedConfig) -> -callback connector_config(ParsedConfig) ->
ParsedConfig ParsedConfig
when when
@ -77,8 +79,10 @@ connector_impl_module(_ConnectorType) ->
-endif. -endif.
connector_to_resource_type_ce(_ConnectorType) -> connector_to_resource_type_ce(http) ->
no_bridge_v2_for_c2_so_far. emqx_bridge_http_connector;
connector_to_resource_type_ce(ConnectorType) ->
error({no_bridge_v2, ConnectorType}).
resource_id(ConnectorId) when is_binary(ConnectorId) -> resource_id(ConnectorId) when is_binary(ConnectorId) ->
<<"connector:", ConnectorId/binary>>. <<"connector:", ConnectorId/binary>>.
@ -271,13 +275,11 @@ remove(Type, Name, _Conf, _Opts) ->
%% convert connector configs to what the connector modules want %% convert connector configs to what the connector modules want
parse_confs( parse_confs(
<<"webhook">>, <<"http">>,
_Name, _Name,
#{ #{
url := Url, url := Url,
method := Method, headers := Headers
headers := Headers,
max_retries := Retry
} = Conf } = Conf
) -> ) ->
Url1 = bin(Url), Url1 = bin(Url),
@ -290,20 +292,14 @@ parse_confs(
Reason1 = emqx_utils:readable_error_msg(Reason), Reason1 = emqx_utils:readable_error_msg(Reason),
invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>) invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
end, end,
RequestTTL = emqx_utils_maps:deep_get(
[resource_opts, request_ttl],
Conf
),
Conf#{ Conf#{
base_url => BaseUrl1, base_url => BaseUrl1,
request => request =>
#{ #{
path => Path, path => Path,
method => Method,
body => maps:get(body, Conf, undefined),
headers => Headers, headers => Headers,
request_ttl => RequestTTL, body => undefined,
max_retries => Retry method => undefined
} }
}; };
parse_confs(<<"iotdb">>, Name, Conf) -> parse_confs(<<"iotdb">>, Name, Conf) ->

View File

@ -15,7 +15,8 @@
-export([ -export([
api_schemas/1, api_schemas/1,
fields/1, fields/1,
examples/1 %%examples/1
schema_modules/0
]). ]).
resource_type(Type) when is_binary(Type) -> resource_type(Type) when is_binary(Type) ->
@ -141,18 +142,6 @@ connector_structs() ->
)} )}
]. ].
examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)
end,
Fun =
fun(Module, Examples) ->
ConnectorExamples = erlang:apply(Module, connector_examples, [Method]),
lists:foldl(MergeFun, Examples, ConnectorExamples)
end,
lists:foldl(Fun, #{}, schema_modules()).
schema_modules() -> schema_modules() ->
[ [
emqx_bridge_azure_event_hub, emqx_bridge_azure_event_hub,

View File

@ -42,6 +42,8 @@
-export([resource_opts_fields/0, resource_opts_fields/1]). -export([resource_opts_fields/0, resource_opts_fields/1]).
-export([examples/1]).
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
enterprise_api_schemas(Method) -> enterprise_api_schemas(Method) ->
%% We *must* do this to ensure the module is really loaded, especially when we use %% We *must* do this to ensure the module is really loaded, especially when we use
@ -71,9 +73,40 @@ enterprise_fields_connectors() -> [].
-endif. -endif.
api_schemas(Method) ->
[
%% We need to map the `type' field of a request (binary) to a
%% connector schema module.
api_ref(emqx_bridge_http_schema, <<"http">>, Method ++ "_connector")
].
api_ref(Module, Type, Method) ->
{Type, ref(Module, Method)}.
examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)
end,
Fun =
fun(Module, Examples) ->
ConnectorExamples = erlang:apply(Module, connector_examples, [Method]),
lists:foldl(MergeFun, Examples, ConnectorExamples)
end,
lists:foldl(Fun, #{}, schema_modules()).
-if(?EMQX_RELEASE_EDITION == ee).
schema_modules() ->
[emqx_bridge_http_schema] ++ emqx_connector_ee_schema:schema_modules().
-else.
schema_modules() ->
[emqx_bridge_http_schema].
-endif.
connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]; connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer];
connector_type_to_bridge_types(confluent_producer) -> [confluent_producer]; connector_type_to_bridge_types(confluent_producer) -> [confluent_producer];
connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer]; connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer];
connector_type_to_bridge_types(http) -> [http, webhook];
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
connector_type_to_bridge_types(matrix) -> [matrix]; connector_type_to_bridge_types(matrix) -> [matrix];
connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
@ -311,8 +344,9 @@ post_request() ->
api_schema("post"). api_schema("post").
api_schema(Method) -> api_schema(Method) ->
CE = api_schemas(Method),
EE = enterprise_api_schemas(Method), EE = enterprise_api_schemas(Method),
hoconsc:union(connector_api_union(EE)). hoconsc:union(connector_api_union(CE ++ EE)).
connector_api_union(Refs) -> connector_api_union(Refs) ->
Index = maps:from_list(Refs), Index = maps:from_list(Refs),
@ -357,7 +391,17 @@ roots() ->
end. end.
fields(connectors) -> fields(connectors) ->
[] ++ enterprise_fields_connectors(); [
{http,
mk(
hoconsc:map(name, ref(emqx_bridge_http_schema, "config_connector")),
#{
alias => [webhook],
desc => <<"HTTP Connector Config">>,
required => false
}
)}
] ++ enterprise_fields_connectors();
fields("node_status") -> fields("node_status") ->
[ [
node_name(), node_name(),

View File

@ -2,7 +2,7 @@
{application, emqx_dashboard, [ {application, emqx_dashboard, [
{description, "EMQX Web Dashboard"}, {description, "EMQX Web Dashboard"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.30"}, {vsn, "5.0.31"},
{modules, []}, {modules, []},
{registered, [emqx_dashboard_sup]}, {registered, [emqx_dashboard_sup]},
{applications, [ {applications, [

View File

@ -1,6 +1,6 @@
{application, emqx_dashboard_rbac, [ {application, emqx_dashboard_rbac, [
{description, "EMQX Dashboard RBAC"}, {description, "EMQX Dashboard RBAC"},
{vsn, "0.1.1"}, {vsn, "0.1.2"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_dashboard_sso, [ {application, emqx_dashboard_sso, [
{description, "EMQX Dashboard Single Sign-On"}, {description, "EMQX Dashboard Single Sign-On"},
{vsn, "0.1.2"}, {vsn, "0.1.3"},
{registered, [emqx_dashboard_sso_sup]}, {registered, [emqx_dashboard_sso_sup]},
{applications, [ {applications, [
kernel, kernel,

View File

@ -2,7 +2,7 @@
{application, emqx_durable_storage, [ {application, emqx_durable_storage, [
{description, "Message persistence and subscription replays for EMQX"}, {description, "Message persistence and subscription replays for EMQX"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "0.1.7"}, {vsn, "0.1.8"},
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, rocksdb, gproc, mria, emqx_utils]}, {applications, [kernel, stdlib, rocksdb, gproc, mria, emqx_utils]},

View File

@ -1,6 +1,6 @@
{application, emqx_enterprise, [ {application, emqx_enterprise, [
{description, "EMQX Enterprise Edition"}, {description, "EMQX Enterprise Edition"},
{vsn, "0.1.5"}, {vsn, "0.1.6"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_gateway, [ {application, emqx_gateway, [
{description, "The Gateway management application"}, {description, "The Gateway management application"},
{vsn, "0.1.27"}, {vsn, "0.1.28"},
{registered, []}, {registered, []},
{mod, {emqx_gateway_app, []}}, {mod, {emqx_gateway_app, []}},
{applications, [kernel, stdlib, emqx, emqx_auth, emqx_ctl]}, {applications, [kernel, stdlib, emqx, emqx_auth, emqx_ctl]},

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_gateway_lwm2m, [ {application, emqx_gateway_lwm2m, [
{description, "LwM2M Gateway"}, {description, "LwM2M Gateway"},
{vsn, "0.1.4"}, {vsn, "0.1.5"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap]}, {applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap]},
{env, []}, {env, []},

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_gateway_stomp, [ {application, emqx_gateway_stomp, [
{description, "Stomp Gateway"}, {description, "Stomp Gateway"},
{vsn, "0.1.4"}, {vsn, "0.1.5"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]}, {applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []}, {env, []},

View File

@ -1,6 +1,6 @@
{application, emqx_ldap, [ {application, emqx_ldap, [
{description, "EMQX LDAP Connector"}, {description, "EMQX LDAP Connector"},
{vsn, "0.1.5"}, {vsn, "0.1.6"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_license, [ {application, emqx_license, [
{description, "EMQX License"}, {description, "EMQX License"},
{vsn, "5.0.13"}, {vsn, "5.0.14"},
{modules, []}, {modules, []},
{registered, [emqx_license_sup]}, {registered, [emqx_license_sup]},
{applications, [kernel, stdlib, emqx_ctl]}, {applications, [kernel, stdlib, emqx_ctl]},

View File

@ -72,11 +72,17 @@ check_license_watermark(Conf) ->
undefined -> undefined ->
true; true;
Low -> Low ->
High = hocon_maps:get("license.connection_high_watermark", Conf), case hocon_maps:get("license.connection_high_watermark", Conf) of
case High =/= undefined andalso High > Low of undefined ->
{bad_license_watermark, #{high => undefined, low => Low}};
High ->
{ok, HighFloat} = emqx_schema:to_percent(High),
{ok, LowFloat} = emqx_schema:to_percent(Low),
case HighFloat > LowFloat of
true -> true; true -> true;
false -> {bad_license_watermark, #{high => High, low => Low}} false -> {bad_license_watermark, #{high => High, low => Low}}
end end
end
end. end.
%% @doc The default license key. %% @doc The default license key.

View File

@ -204,6 +204,17 @@ t_license_setting(_Config) ->
?assertEqual(0.5, emqx_config:get([license, connection_low_watermark])), ?assertEqual(0.5, emqx_config:get([license, connection_low_watermark])),
?assertEqual(0.55, emqx_config:get([license, connection_high_watermark])), ?assertEqual(0.55, emqx_config:get([license, connection_high_watermark])),
%% update
Low1 = <<"50%">>,
High1 = <<"100%">>,
UpdateRes1 = request(put, uri(["license", "setting"]), #{
<<"connection_low_watermark">> => Low1,
<<"connection_high_watermark">> => High1
}),
validate_setting(UpdateRes1, Low1, High1),
?assertEqual(0.5, emqx_config:get([license, connection_low_watermark])),
?assertEqual(1.0, emqx_config:get([license, connection_high_watermark])),
%% update bad setting low >= high %% update bad setting low >= high
?assertMatch( ?assertMatch(
{ok, 400, _}, {ok, 400, _},

View File

@ -3,7 +3,7 @@
{id, "emqx_machine"}, {id, "emqx_machine"},
{description, "The EMQX Machine"}, {description, "The EMQX Machine"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "0.2.16"}, {vsn, "0.2.17"},
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_ctl]}, {applications, [kernel, stdlib, emqx_ctl]},

View File

@ -2,7 +2,7 @@
{application, emqx_management, [ {application, emqx_management, [
{description, "EMQX Management API and CLI"}, {description, "EMQX Management API and CLI"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.33"}, {vsn, "5.0.34"},
{modules, []}, {modules, []},
{registered, [emqx_management_sup]}, {registered, [emqx_management_sup]},
{applications, [kernel, stdlib, emqx_plugins, minirest, emqx, emqx_ctl, emqx_bridge_http]}, {applications, [kernel, stdlib, emqx_plugins, minirest, emqx, emqx_ctl, emqx_bridge_http]},

View File

@ -464,6 +464,7 @@ apps_to_start() ->
emqx_modules, emqx_modules,
emqx_gateway, emqx_gateway,
emqx_exhook, emqx_exhook,
emqx_bridge_http,
emqx_bridge, emqx_bridge,
emqx_auto_subscribe, emqx_auto_subscribe,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_modules, [ {application, emqx_modules, [
{description, "EMQX Modules"}, {description, "EMQX Modules"},
{vsn, "5.0.23"}, {vsn, "5.0.24"},
{modules, []}, {modules, []},
{applications, [kernel, stdlib, emqx, emqx_ctl]}, {applications, [kernel, stdlib, emqx, emqx_ctl]},
{mod, {emqx_modules_app, []}}, {mod, {emqx_modules_app, []}},

View File

@ -1,6 +1,6 @@
{application, emqx_mongodb, [ {application, emqx_mongodb, [
{description, "EMQX MongoDB Connector"}, {description, "EMQX MongoDB Connector"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_mysql, [ {application, emqx_mysql, [
{description, "EMQX MySQL Database Connector"}, {description, "EMQX MySQL Database Connector"},
{vsn, "0.1.4"}, {vsn, "0.1.5"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_plugins, [ {application, emqx_plugins, [
{description, "EMQX Plugin Management"}, {description, "EMQX Plugin Management"},
{vsn, "0.1.7"}, {vsn, "0.1.8"},
{modules, []}, {modules, []},
{mod, {emqx_plugins_app, []}}, {mod, {emqx_plugins_app, []}},
{applications, [kernel, stdlib, emqx]}, {applications, [kernel, stdlib, emqx]},

View File

@ -1,6 +1,6 @@
{application, emqx_postgresql, [ {application, emqx_postgresql, [
{description, "EMQX PostgreSQL Database Connector"}, {description, "EMQX PostgreSQL Database Connector"},
{vsn, "0.1.0"}, {vsn, "0.1.1"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -2,7 +2,7 @@
{application, emqx_prometheus, [ {application, emqx_prometheus, [
{description, "Prometheus for EMQX"}, {description, "Prometheus for EMQX"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.17"}, {vsn, "5.0.18"},
{modules, []}, {modules, []},
{registered, [emqx_prometheus_sup]}, {registered, [emqx_prometheus_sup]},
{applications, [kernel, stdlib, prometheus, emqx, emqx_management]}, {applications, [kernel, stdlib, prometheus, emqx, emqx_management]},

View File

@ -1,10 +1,11 @@
{application, emqx_redis, [ {application, emqx_redis, [
{description, "EMQX Redis Database Connector"}, {description, "EMQX Redis Database Connector"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,
stdlib, stdlib,
eredis,
eredis_cluster, eredis_cluster,
emqx_connector, emqx_connector,
emqx_resource emqx_resource

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_resource, [ {application, emqx_resource, [
{description, "Manager for all external resources"}, {description, "Manager for all external resources"},
{vsn, "0.1.25"}, {vsn, "0.1.26"},
{registered, []}, {registered, []},
{mod, {emqx_resource_app, []}}, {mod, {emqx_resource_app, []}},
{applications, [ {applications, [

View File

@ -80,7 +80,7 @@ worker_pool_size_test_() ->
Conf = emqx_utils_maps:deep_put( Conf = emqx_utils_maps:deep_put(
[ [
<<"bridges">>, <<"bridges">>,
<<"webhook">>, <<"http">>,
<<"simple">>, <<"simple">>,
<<"resource_opts">>, <<"resource_opts">>,
<<"worker_pool_size">> <<"worker_pool_size">>
@ -88,7 +88,7 @@ worker_pool_size_test_() ->
BaseConf, BaseConf,
WorkerPoolSize WorkerPoolSize
), ),
#{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := CheckedConf}}} = check(Conf), #{<<"bridges">> := #{<<"http">> := #{<<"simple">> := CheckedConf}}} = check(Conf),
#{<<"resource_opts">> := #{<<"worker_pool_size">> := WPS}} = CheckedConf, #{<<"resource_opts">> := #{<<"worker_pool_size">> := WPS}} = CheckedConf,
WPS WPS
end, end,
@ -117,7 +117,7 @@ worker_pool_size_test_() ->
%%=========================================================================== %%===========================================================================
parse_and_check_webhook_bridge(Hocon) -> parse_and_check_webhook_bridge(Hocon) ->
#{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)), #{<<"bridges">> := #{<<"http">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)),
Conf. Conf.
parse(Hocon) -> parse(Hocon) ->

View File

@ -2,7 +2,7 @@
{application, emqx_retainer, [ {application, emqx_retainer, [
{description, "EMQX Retainer"}, {description, "EMQX Retainer"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.18"}, {vsn, "5.0.19"},
{modules, []}, {modules, []},
{registered, [emqx_retainer_sup]}, {registered, [emqx_retainer_sup]},
{applications, [kernel, stdlib, emqx, emqx_ctl]}, {applications, [kernel, stdlib, emqx, emqx_ctl]},

View File

@ -2,7 +2,7 @@
{application, emqx_rule_engine, [ {application, emqx_rule_engine, [
{description, "EMQX Rule Engine"}, {description, "EMQX Rule Engine"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.29"}, {vsn, "5.0.30"},
{modules, []}, {modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [ {applications, [

View File

@ -583,10 +583,18 @@ get_referenced_hookpoints(Froms) ->
]. ].
get_egress_bridges(Actions) -> get_egress_bridges(Actions) ->
[ lists:foldr(
emqx_bridge_resource:bridge_id(BridgeType, BridgeName) fun
|| {bridge, BridgeType, BridgeName, _ResId} <- Actions ({bridge, BridgeType, BridgeName, _ResId}, Acc) ->
]. [emqx_bridge_resource:bridge_id(BridgeType, BridgeName) | Acc];
({bridge_v2, BridgeType, BridgeName}, Acc) ->
[emqx_bridge_resource:bridge_id(BridgeType, BridgeName) | Acc];
(_, Acc) ->
Acc
end,
[],
Actions
).
%% For allowing an external application to add extra "built-in" functions to the %% For allowing an external application to add extra "built-in" functions to the
%% rule engine SQL like language. The module set by %% rule engine SQL like language. The module set by

View File

@ -3468,7 +3468,7 @@ t_get_basic_usage_info_1(_Config) ->
referenced_bridges => referenced_bridges =>
#{ #{
mqtt => 1, mqtt => 1,
webhook => 3 http => 3
} }
}, },
emqx_rule_engine:get_basic_usage_info() emqx_rule_engine:get_basic_usage_info()

View File

@ -41,44 +41,32 @@ suite() ->
apps() -> apps() ->
[ [
emqx_conf, emqx_conf,
emqx_management, emqx_connector,
emqx_retainer, emqx_retainer,
emqx_auth, emqx_auth,
emqx_auth_redis, emqx_auth_redis,
emqx_auth_mnesia, emqx_auth_mnesia,
emqx_auth_postgresql, emqx_auth_postgresql,
emqx_modules, emqx_modules,
emqx_telemetry emqx_telemetry,
emqx_bridge_http,
emqx_bridge,
emqx_rule_engine,
emqx_management
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
net_kernel:start(['master@127.0.0.1', longnames]), WorkDir = ?config(priv_dir, Config),
ok = meck:new(emqx_authz_file, [non_strict, passthrough, no_history, no_link]), Apps = emqx_cth_suite:start(apps(), #{work_dir => WorkDir}),
meck:expect( emqx_mgmt_api_test_util:init_suite(),
emqx_authz_file, [{apps, Apps}, {work_dir, WorkDir} | Config].
acl_conf_file,
fun() ->
emqx_common_test_helpers:deps_path(emqx_auth, "etc/acl.conf")
end
),
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
emqx_gateway_test_utils:load_all_gateway_apps(),
start_apps(),
Config.
end_per_suite(_Config) -> end_per_suite(Config) ->
{ok, _} = emqx:update_config(
[authorization],
#{
<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []
}
),
mnesia:clear_table(cluster_rpc_commit), mnesia:clear_table(cluster_rpc_commit),
mnesia:clear_table(cluster_rpc_mfa), mnesia:clear_table(cluster_rpc_mfa),
stop_apps(), Apps = ?config(apps, Config),
meck:unload(emqx_authz_file), emqx_mgmt_api_test_util:end_suite(),
ok = emqx_cth_suite:stop(Apps),
ok. ok.
init_per_testcase(t_get_telemetry_without_memsup, Config) -> init_per_testcase(t_get_telemetry_without_memsup, Config) ->
@ -123,7 +111,6 @@ init_per_testcase(t_advanced_mqtt_features, Config) ->
mock_advanced_mqtt_features(), mock_advanced_mqtt_features(),
Config; Config;
init_per_testcase(t_authn_authz_info, Config) -> init_per_testcase(t_authn_authz_info, Config) ->
mock_httpc(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
create_authn('mqtt:global', built_in_database), create_authn('mqtt:global', built_in_database),
create_authn('tcp:default', redis), create_authn('tcp:default', redis),
@ -141,14 +128,11 @@ init_per_testcase(t_send_after_enable, Config) ->
mock_httpc(), mock_httpc(),
Config; Config;
init_per_testcase(t_rule_engine_and_data_bridge_info, Config) -> init_per_testcase(t_rule_engine_and_data_bridge_info, Config) ->
mock_httpc(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_bridge]),
ok = emqx_bridge_SUITE:setup_fake_telemetry_data(), ok = emqx_bridge_SUITE:setup_fake_telemetry_data(),
ok = setup_fake_rule_engine_data(), ok = setup_fake_rule_engine_data(),
Config; Config;
init_per_testcase(t_exhook_info, Config) -> init_per_testcase(t_exhook_info, Config) ->
mock_httpc(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
ExhookConf = ExhookConf =
#{ #{
@ -173,31 +157,8 @@ init_per_testcase(t_cluster_uuid, Config) ->
Node = start_slave(n1), Node = start_slave(n1),
[{n1, Node} | Config]; [{n1, Node} | Config];
init_per_testcase(t_uuid_restored_from_file, Config) -> init_per_testcase(t_uuid_restored_from_file, Config) ->
mock_httpc(), Config;
NodeUUID = <<"AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE">>,
ClusterUUID = <<"FFFFFFFF-GGGG-HHHH-IIII-JJJJJJJJJJJJ">>,
DataDir = emqx:data_dir(),
NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
file:delete(NodeUUIDFile),
file:delete(ClusterUUIDFile),
ok = file:write_file(NodeUUIDFile, NodeUUID),
ok = file:write_file(ClusterUUIDFile, ClusterUUID),
%% clear the UUIDs in the DB
{atomic, ok} = mria:clear_table(emqx_telemetry),
stop_apps(),
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
start_apps(),
Node = start_slave(n1),
[
{n1, Node},
{node_uuid, NodeUUID},
{cluster_uuid, ClusterUUID}
| Config
];
init_per_testcase(t_uuid_saved_to_file, Config) -> init_per_testcase(t_uuid_saved_to_file, Config) ->
mock_httpc(),
DataDir = emqx:data_dir(), DataDir = emqx:data_dir(),
NodeUUIDFile = filename:join(DataDir, "node.uuid"), NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"), ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
@ -205,7 +166,6 @@ init_per_testcase(t_uuid_saved_to_file, Config) ->
file:delete(ClusterUUIDFile), file:delete(ClusterUUIDFile),
Config; Config;
init_per_testcase(t_num_clients, Config) -> init_per_testcase(t_num_clients, Config) ->
mock_httpc(),
ok = snabbkaffe:start_trace(), ok = snabbkaffe:start_trace(),
Config; Config;
init_per_testcase(_Testcase, Config) -> init_per_testcase(_Testcase, Config) ->
@ -227,7 +187,6 @@ end_per_testcase(t_advanced_mqtt_features, _Config) ->
{atomic, ok} = mria:clear_table(emqx_delayed), {atomic, ok} = mria:clear_table(emqx_delayed),
ok; ok;
end_per_testcase(t_authn_authz_info, _Config) -> end_per_testcase(t_authn_authz_info, _Config) ->
meck:unload([httpc]),
emqx_authz:update({delete, postgresql}, #{}), emqx_authz:update({delete, postgresql}, #{}),
lists:foreach( lists:foreach(
fun(ChainName) -> fun(ChainName) ->
@ -244,19 +203,8 @@ end_per_testcase(t_enable, _Config) ->
end_per_testcase(t_send_after_enable, _Config) -> end_per_testcase(t_send_after_enable, _Config) ->
meck:unload([httpc, emqx_telemetry_config]); meck:unload([httpc, emqx_telemetry_config]);
end_per_testcase(t_rule_engine_and_data_bridge_info, _Config) -> end_per_testcase(t_rule_engine_and_data_bridge_info, _Config) ->
meck:unload(httpc),
lists:foreach(
fun(App) ->
ok = application:stop(App)
end,
[
emqx_bridge,
emqx_rule_engine
]
),
ok; ok;
end_per_testcase(t_exhook_info, _Config) -> end_per_testcase(t_exhook_info, _Config) ->
meck:unload(httpc),
emqx_exhook_demo_svr:stop(), emqx_exhook_demo_svr:stop(),
application:stop(emqx_exhook), application:stop(emqx_exhook),
ok; ok;
@ -264,21 +212,12 @@ end_per_testcase(t_cluster_uuid, Config) ->
Node = proplists:get_value(n1, Config), Node = proplists:get_value(n1, Config),
ok = stop_slave(Node); ok = stop_slave(Node);
end_per_testcase(t_num_clients, Config) -> end_per_testcase(t_num_clients, Config) ->
meck:unload([httpc]),
ok = snabbkaffe:stop(), ok = snabbkaffe:stop(),
Config; Config;
end_per_testcase(t_uuid_restored_from_file, Config) ->
Node = ?config(n1, Config),
DataDir = emqx:data_dir(),
NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
ok = file:delete(NodeUUIDFile),
ok = file:delete(ClusterUUIDFile),
meck:unload([httpc]),
ok = stop_slave(Node),
ok;
end_per_testcase(_Testcase, _Config) -> end_per_testcase(_Testcase, _Config) ->
meck:unload([httpc]), case catch meck:unload([httpc]) of
_ -> ok
end,
ok. ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -315,19 +254,34 @@ t_cluster_uuid(Config) ->
%% should attempt read UUID from file in data dir to keep UUIDs %% should attempt read UUID from file in data dir to keep UUIDs
%% unique, in the event of a database purge. %% unique, in the event of a database purge.
t_uuid_restored_from_file(Config) -> t_uuid_restored_from_file(Config) ->
ExpectedNodeUUID = ?config(node_uuid, Config), %% Stop the emqx_telemetry application first
ExpectedClusterUUID = ?config(cluster_uuid, Config), {atomic, ok} = mria:clear_table(emqx_telemetry),
application:stop(emqx_telemetry),
%% Rewrite the the uuid files
NodeUUID = <<"AAAAAAAA-BBBB-CCCC-DDDD-EEEEEEEEEEEE">>,
ClusterUUID = <<"FFFFFFFF-GGGG-HHHH-IIII-JJJJJJJJJJJJ">>,
DataDir = ?config(work_dir, Config),
NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
ok = file:write_file(NodeUUIDFile, NodeUUID),
ok = file:write_file(ClusterUUIDFile, ClusterUUID),
%% Start the emqx_telemetry application again
application:start(emqx_telemetry),
%% Check the UUIDs
?assertEqual( ?assertEqual(
{ok, ExpectedNodeUUID}, {ok, NodeUUID},
emqx_telemetry:get_node_uuid() emqx_telemetry:get_node_uuid()
), ),
?assertEqual( ?assertEqual(
{ok, ExpectedClusterUUID}, {ok, ClusterUUID},
emqx_telemetry:get_cluster_uuid() emqx_telemetry:get_cluster_uuid()
), ),
ok. ok.
t_uuid_saved_to_file(_Config) -> t_uuid_saved_to_file(Config) ->
DataDir = emqx:data_dir(), DataDir = emqx:data_dir(),
NodeUUIDFile = filename:join(DataDir, "node.uuid"), NodeUUIDFile = filename:join(DataDir, "node.uuid"),
ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"), ClusterUUIDFile = filename:join(DataDir, "cluster.uuid"),
@ -337,9 +291,10 @@ t_uuid_saved_to_file(_Config) ->
%% clear the UUIDs in the DB %% clear the UUIDs in the DB
{atomic, ok} = mria:clear_table(emqx_telemetry), {atomic, ok} = mria:clear_table(emqx_telemetry),
stop_apps(), application:stop(emqx_telemetry),
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
start_apps(), application:start(emqx_telemetry),
{ok, NodeUUID} = emqx_telemetry:get_node_uuid(), {ok, NodeUUID} = emqx_telemetry:get_node_uuid(),
{ok, ClusterUUID} = emqx_telemetry:get_cluster_uuid(), {ok, ClusterUUID} = emqx_telemetry:get_cluster_uuid(),
?assertEqual( ?assertEqual(
@ -578,6 +533,7 @@ t_mqtt_runtime_insights(_) ->
t_rule_engine_and_data_bridge_info(_Config) -> t_rule_engine_and_data_bridge_info(_Config) ->
{ok, TelemetryData} = emqx_telemetry:get_telemetry(), {ok, TelemetryData} = emqx_telemetry:get_telemetry(),
ct:pal("telemetry data: ~p~n", [TelemetryData]),
RuleInfo = get_value(rule_engine, TelemetryData), RuleInfo = get_value(rule_engine, TelemetryData),
BridgeInfo = get_value(bridge, TelemetryData), BridgeInfo = get_value(bridge, TelemetryData),
?assertEqual( ?assertEqual(
@ -588,7 +544,7 @@ t_rule_engine_and_data_bridge_info(_Config) ->
#{ #{
data_bridge => data_bridge =>
#{ #{
webhook => #{num => 1, num_linked_by_rules => 3}, http => #{num => 1, num_linked_by_rules => 3},
mqtt => #{num => 2, num_linked_by_rules => 2} mqtt => #{num => 2, num_linked_by_rules => 2}
}, },
num_data_bridges => 3 num_data_bridges => 3
@ -811,14 +767,6 @@ setup_fake_rule_engine_data() ->
), ),
ok. ok.
set_special_configs(emqx_auth) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
{ok, _} = emqx:update_config([authorization, sources], []),
ok;
set_special_configs(_App) ->
ok.
%% for some unknown reason, gen_rpc running locally or in CI might %% for some unknown reason, gen_rpc running locally or in CI might
%% start with different `port_discovery' modes, which means that'll %% start with different `port_discovery' modes, which means that'll
%% either be listening at the port in the config (`tcp_server_port', %% either be listening at the port in the config (`tcp_server_port',
@ -887,9 +835,3 @@ leave_cluster() ->
is_official_version(V) -> is_official_version(V) ->
emqx_telemetry_config:is_official_version(V). emqx_telemetry_config:is_official_version(V).
start_apps() ->
emqx_common_test_helpers:start_apps(apps(), fun set_special_configs/1).
stop_apps() ->
emqx_common_test_helpers:stop_apps(lists:reverse(apps())).

View File

@ -2,7 +2,7 @@
{application, emqx_utils, [ {application, emqx_utils, [
{description, "Miscellaneous utilities for EMQX apps"}, {description, "Miscellaneous utilities for EMQX apps"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.11"}, {vsn, "5.0.12"},
{modules, [ {modules, [
emqx_utils, emqx_utils,
emqx_utils_api, emqx_utils_api,

View File

@ -0,0 +1 @@
Fix Redis authorization, authentication, and bridges. Previously connections to Redis servers could not be established because driver was not properly loaded.

42
changes/e5.3.2.en.md Normal file
View File

@ -0,0 +1,42 @@
# e5.3.2
## Enhancements
- [#11752](https://github.com/emqx/emqx/pull/11752) Changed default RPC driver from `gen_rpc` to `rpc` for core-replica database synchronization.
This improves core-replica data replication latency.
- [#11785](https://github.com/emqx/emqx/pull/11785) Allowed users with the "Viewer" role to change their own passwords. However, those with the "Viewer" role do not have permission to change the passwords of other users.
- [#11787](https://github.com/emqx/emqx/pull/11787) Improved the performance of the `emqx` command.
- [#11790](https://github.com/emqx/emqx/pull/11790) Added validation to Redis commands in Redis authorization source.
Additionally, this improvement refines the parsing of Redis commands during authentication and authorization processes. The parsing now aligns with `redis-cli` compatibility standards and supports quoted arguments.
- [#11541](https://github.com/emqx/emqx/pull/11541) Enhanced file transfer capabilities. Now, clients can use an asynchronous method for file transfer by sending commands to the `$file-async/...` topic and subscribing to command execution results on the `$file-response/{clientId}` topic. This improvement simplifies the use of the file transfer feature, particularly suitable for clients using MQTT v3.1/v3.1.1 or those employing MQTT bridging. For more details, please refer to [EIP-0021](https://github.com/emqx/eip).
## Bug Fixes
- [#11757](https://github.com/emqx/emqx/pull/11757) Fixed the error response code when downloading non-existent trace files. Now the response returns `404` instead of `500`.
- [#11762](https://github.com/emqx/emqx/pull/11762) Fixed an issue in EMQX's `built_in_database` authorization source. With this update, all Access Control List (ACL) records are completely removed when an authorization source is deleted. This resolves the issue of residual records remaining in the database when re-creating authorization sources.
- [#11771](https://github.com/emqx/emqx/pull/11771) Fixed validation of Bcrypt salt rounds in authentication management through the API/Dashboard.
- [#11780](https://github.com/emqx/emqx/pull/11780) Fixed validation of the `iterations` field of the `pbkdf2` password hashing algorithm. Now, `iterations` must be strictly positive. Previously, it could be set to 0, which led to a nonfunctional authenticator.
- [#11791](https://github.com/emqx/emqx/pull/11791) Fixed an issue in the EMQX CoAP Gateway where heartbeats were not effectively maintaining the connection's active status. This fix ensures that the heartbeat mechanism properly sustains the liveliness of CoAP Gateway connections.
- [#11797](https://github.com/emqx/emqx/pull/11797) Modified HTTP API behavior for APIs managing the `built_in_database` authorization source. They will now return a `404` status code if `built_in_database` is not set as the authorization source, replacing the former `20X` response.
- [#11965](https://github.com/emqx/emqx/pull/11965) Improved the termination of EMQX services to ensure a graceful stop even in the presence of an unavailable MongoDB resource.
- [#11975](https://github.com/emqx/emqx/pull/11975) This fix addresses an issue where redundant error logs were generated due to a race condition during simultaneous socket closure by a peer and the server. Previously, concurrent socket close events triggered by the operating system and EMQX resulted in unnecessary error logging. The implemented fix improves event handling to eliminate unnecessary error messages.
- [#11987](https://github.com/emqx/emqx/pull/11987) Fixed a bug where attempting to set the `active_n` option on a TCP/SSL socket could lead to a connection crash.
The problem occurred if the socket had already been closed by the time the connection process attempted to apply the `active_n` setting, resulting in a `case_clause` crash.
- [#11731](https://github.com/emqx/emqx/pull/11731) Added hot configuration support for the file transfer feature.
- [#11754](https://github.com/emqx/emqx/pull/11754) Improved the log formatting specifically for the Postgres bridge in EMQX. It addresses issues related to Unicode characters in error messages returned by the driver.

38
changes/v5.3.2.en.md Normal file
View File

@ -0,0 +1,38 @@
# v5.3.2
## Enhancements
- [#11725](https://github.com/emqx/emqx/pull/11725) Introduced the LDAP as a new authentication and authorization backend.
- [#11752](https://github.com/emqx/emqx/pull/11752) Changed default RPC driver from `gen_rpc` to `rpc` for core-replica database synchronization.
This improves core-replica data replication latency.
- [#11785](https://github.com/emqx/emqx/pull/11785) Allowed users with the "Viewer" role to change their own passwords. However, those with the "Viewer" role do not have permission to change the passwords of other users.
- [#11787](https://github.com/emqx/emqx/pull/11787) Improved the performance of the `emqx` command.
- [#11790](https://github.com/emqx/emqx/pull/11790) Added validation to Redis commands in Redis authorization source.
Additionally, this improvement refines the parsing of Redis commands during authentication and authorization processes. The parsing now aligns with `redis-cli` compatibility standards and supports quoted arguments.
## Bug Fixes
- [#11757](https://github.com/emqx/emqx/pull/11757) Fixed the error response code when downloading non-existent trace files. Now the response returns `404` instead of `500`.
- [#11762](https://github.com/emqx/emqx/pull/11762) Fixed an issue in EMQX's `built_in_database` authorization source. With this update, all Access Control List (ACL) records are completely removed when an authorization source is deleted. This resolves the issue of residual records remaining in the database when re-creating authorization sources.
- [#11771](https://github.com/emqx/emqx/pull/11771) Fixed validation of Bcrypt salt rounds in authentication management through the API/Dashboard.
- [#11780](https://github.com/emqx/emqx/pull/11780) Fixed validation of the `iterations` field of the `pbkdf2` password hashing algorithm. Now, `iterations` must be strictly positive. Previously, it could be set to 0, which led to a nonfunctional authenticator.
- [#11791](https://github.com/emqx/emqx/pull/11791) Fixed an issue in the EMQX CoAP Gateway where heartbeats were not effectively maintaining the connection's active status. This fix ensures that the heartbeat mechanism properly sustains the liveliness of CoAP Gateway connections.
- [#11797](https://github.com/emqx/emqx/pull/11797) Modified HTTP API behavior for APIs managing the `built_in_database` authorization source. They will now return a `404` status code if `built_in_database` is not set as the authorization source, replacing the former `20X` response.
- [#11965](https://github.com/emqx/emqx/pull/11965) Improved the termination of EMQX services to ensure a graceful stop even in the presence of an unavailable MongoDB resource.
- [#11975](https://github.com/emqx/emqx/pull/11975) This fix addresses an issue where redundant error logs were generated due to a race condition during simultaneous socket closure by a peer and the server. Previously, concurrent socket close events triggered by the operating system and EMQX resulted in unnecessary error logging. The implemented fix improves event handling to eliminate unnecessary error messages.
- [#11987](https://github.com/emqx/emqx/pull/11987) Fixed a bug where attempting to set the `active_n` option on a TCP/SSL socket could lead to a connection crash.
The problem occurred if the socket had already been closed by the time the connection process attempted to apply the `active_n` setting, resulting in a `case_clause` crash.

View File

@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes # This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version. # to the chart and its templates, including the app version.
version: 5.3.2-alpha.1 version: 5.3.2
# This is the version number of the application being deployed. This version number should be # This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. # incremented each time you make changes to the application.
appVersion: 5.3.2-alpha.1 appVersion: 5.3.2

View File

@ -18,10 +18,10 @@ config_direction.desc:
config_direction.label: config_direction.label:
"""Bridge Direction""" """Bridge Direction"""
config_enable.desc: config_enable_bridge.desc:
"""Enable or disable this bridge""" """Enable or disable this bridge"""
config_enable.label: config_enable_bridge.label:
"""Enable Or Disable Bridge""" """Enable Or Disable Bridge"""
config_headers.desc: config_headers.desc:
@ -71,6 +71,21 @@ is not allowed."""
config_url.label: config_url.label:
"""HTTP Bridge""" """HTTP Bridge"""
config_path.desc:
"""The URL path for this Action.<br/>
This path will be appended to the Connector's <code>url</code> configuration to form the full
URL address.
Template with variables is allowed in this option. For example, <code>/room/{$room_no}</code>"""
config_path.label:
"""URL Path"""
config_parameters_opts.desc:
"""The parameters for HTTP action."""
config_parameters_opts.label:
"""Parameters"""
desc_config.desc: desc_config.desc:
"""Configuration for an HTTP bridge.""" """Configuration for an HTTP bridge."""