Merge remote-tracking branch 'origin/release-v43' into release-v44

This commit is contained in:
Zaiming (Stone) Shi 2022-11-04 13:28:12 +01:00
commit 2062911e3b
18 changed files with 454 additions and 204 deletions

View File

@ -98,7 +98,7 @@ jobs:
- uses: actions/upload-artifact@v3 - uses: actions/upload-artifact@v3
with: with:
name: ${{ matrix.profile }}-windows name: ${{ matrix.profile }}-windows
path: source/_packages/${{ matrix.profile }}/. path: source/_packages/${{ matrix.profile }}/
mac: mac:
needs: prepare needs: prepare
@ -133,7 +133,7 @@ jobs:
- uses: actions/upload-artifact@v3 - uses: actions/upload-artifact@v3
with: with:
name: ${EMQX_NAME}-${{ matrix.otp }} name: ${EMQX_NAME}-${{ matrix.otp }}
path: _packages/${EMQX_NAME}/. path: _packages/${EMQX_NAME}/
linux: linux:
runs-on: ubuntu-20.04 runs-on: ubuntu-20.04
@ -209,7 +209,7 @@ jobs:
- uses: actions/upload-artifact@v1 - uses: actions/upload-artifact@v1
with: with:
name: ${{ matrix.profile }}-${{ matrix.otp }} name: ${{ matrix.profile }}-${{ matrix.otp }}
path: source/_packages/${{ matrix.profile }}/. path: source/_packages/${{ matrix.profile }}/
docker: docker:
runs-on: ubuntu-20.04 runs-on: ubuntu-20.04

View File

@ -10,26 +10,103 @@ on:
pull_request: pull_request:
jobs: jobs:
run_proper_test: prepare:
runs-on: ubuntu-20.04 runs-on: ubuntu-20.04
container: ghcr.io/emqx/emqx-builder/4.4-20:24.3.4.2-1-ubuntu20.04 container: ghcr.io/emqx/emqx-builder/4.4-20:24.3.4.2-1-ubuntu20.04
outputs:
ct_apps: ${{ steps.run_find_apps.outputs.ct_apps }}
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v3
- name: set git credentials with:
path: source
fetch-depth: 0
- name: git credentials
run: | run: |
if make emqx-ee --dry-run > /dev/null 2>&1; then if make emqx-ee --dry-run > /dev/null 2>&1; then
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
git config --global credential.helper store git config --global credential.helper store
fi fi
- name: proper - name: find_ct_apps
run: make proper working-directory: source
id: run_find_apps
# emqx_plugin_libs doesn't have a test suite -> excluded from app list
# emqx ct is run independently -> exclude it from the app list
run: |
ct_apps="$(./scripts/find-apps.sh --json | jq -c 'del (.[] | select (. == "apps/emqx_plugin_libs" or . == "emqx"))')"
echo "ct-apps: $ct_apps"
echo "ct_apps=$ct_apps" >> $GITHUB_OUTPUT
- name: get_all_deps
working-directory: source
run: |
make deps-all
./rebar3 as test compile
cd ..
zip -ryq source.zip source/* source/.[^.]*
- uses: actions/upload-artifact@v3
with:
name: source
path: source.zip
run_common_test: eunit_and_proper:
runs-on: ${{ matrix.runs-on }} needs: prepare
runs-on: ubuntu-20.04
container: ghcr.io/emqx/emqx-builder/4.4-20:24.3.4.2-1-ubuntu20.04
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
task:
- eunit
- proper
steps:
- uses: AutoModality/action-clean@v1
- uses: actions/download-artifact@v3
with:
name: source
path: .
- name: unzip source code
run: unzip -o -q source.zip
# produces eunit.coverdata and proper.coverdata
- name: eunit and proper
working-directory: source
run: make ${{ matrix.task }}
- uses: actions/upload-artifact@v3
with:
name: cover
path: source/_build/test/cover
if-no-files-found: warn
emqx_ct:
needs: prepare
runs-on: ubuntu-20.04
container: ghcr.io/emqx/emqx-builder/4.4-20:24.3.4.2-1-ubuntu20.04
strategy:
fail-fast: false
steps:
- uses: AutoModality/action-clean@v1
- uses: actions/download-artifact@v3
with:
name: source
path: .
- name: unzip source code
run: unzip -o -q source.zip
# produces emqx-emqx.coverdata
- name: emqx-ct-pipeline
working-directory: source
run: make emqx-ct-pipeline
- uses: actions/upload-artifact@v3
with:
name: cover
path: source/_build/test/cover
if-no-files-found: warn
ct:
needs: prepare
runs-on: ${{ matrix.runs-on }}
strategy:
max-parallel: 12
fail-fast: false
matrix:
app_name: ${{ fromJson(needs.prepare.outputs.ct_apps) }}
runs-on: runs-on:
- aws-amd64 - aws-amd64
- ubuntu-20.04 - ubuntu-20.04
@ -41,13 +118,20 @@ jobs:
- runs-on: aws-amd64 - runs-on: aws-amd64
use-self-hosted: false use-self-hosted: false
steps: steps:
- uses: actions/checkout@v2 - uses: AutoModality/action-clean@v1
- uses: actions/download-artifact@v3
with:
name: source
path: .
- name: unzip source code
run: unzip -q source.zip
# to avoid dirty self-hosted runners # to avoid dirty self-hosted runners
- name: stop containers - name: stop containers
run: | run: |
docker rm -f $(docker ps -qa) || true docker rm -f $(docker ps -qa) || true
docker network rm $(docker network ls -q) || true docker network rm $(docker network ls -q) || true
- name: docker compose up - name: docker compose up
working-directory: source
if: endsWith(github.repository, 'emqx') if: endsWith(github.repository, 'emqx')
env: env:
MYSQL_TAG: 8 MYSQL_TAG: 8
@ -66,7 +150,9 @@ jobs:
-f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \
up -d --build up -d --build
docker exec -i erlang bash -c "git config --global --add safe.directory /emqx"
- name: docker compose up - name: docker compose up
working-directory: source
if: endsWith(github.repository, 'emqx-enterprise') if: endsWith(github.repository, 'emqx-enterprise')
env: env:
MYSQL_TAG: 8 MYSQL_TAG: 8
@ -111,33 +197,62 @@ jobs:
!= $(docker ps -a --filter name=client | wc -l) ]; do != $(docker ps -a --filter name=client | wc -l) ]; do
sleep 5 sleep 5
done done
- name: run eunit
run: |
docker exec -i erlang bash -c "make eunit"
- name: run common test - name: run common test
run: | run: docker exec -i erlang bash -c "make ${{ matrix.app_name }}-ct-pipeline"
docker exec -i erlang bash -c "make ct"
- name: run cover
run: |
printenv > .env
docker exec -i erlang bash -c "git config --global --add safe.directory /emqx"
docker exec -i erlang bash -c "make cover"
docker exec --env-file .env -i erlang bash -c "make coveralls"
- name: cat rebar.crashdump - name: cat rebar.crashdump
if: failure() if: failure()
working-directory: source
run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi run: if [ -f 'rebar3.crashdump' ];then cat 'rebar3.crashdump'; fi
- uses: actions/upload-artifact@v1 - name: set log file name
if: failure()
run: echo "LOGFILENAME=logs-$(echo ${{ matrix.app_name }} | tr '/' '_')" >> $GITHUB_ENV
- uses: actions/upload-artifact@v3
if: failure() if: failure()
with: with:
name: logs name: ${{ env.LOGFILENAME }}
path: _build/test/logs path: source/_build/test/logs
- uses: actions/upload-artifact@v1 if-no-files-found: warn
- uses: actions/upload-artifact@v3
with: with:
name: cover name: cover
path: _build/test/cover path: source/_build/test/cover
if-no-files-found: warn
make_cover:
needs:
- eunit_and_proper
- emqx_ct
- ct
runs-on: ubuntu-20.04
container: ghcr.io/emqx/emqx-builder/4.4-20:24.3.4.2-1-ubuntu20.04
steps:
- uses: AutoModality/action-clean@v1
- uses: actions/download-artifact@v3
with:
name: source
path: .
- name: unzip source code
run: unzip -q source.zip
- uses: actions/download-artifact@v3
name: download cover data
with:
name: cover
path: source/_build/test/cover
- name: make cover
working-directory: source
run: make cover
- name: send to coveralls
working-directory: source
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: make coveralls
- name: get coveralls logs
working-directory: source
if: failure()
run: cat rebar3.crashdump
finish: finish:
needs: run_common_test needs: make_cover
runs-on: ubuntu-20.04 runs-on: ubuntu-20.04
steps: steps:
- name: Coveralls Finished - name: Coveralls Finished

View File

@ -62,6 +62,14 @@ $1-ct: $(REBAR)
endef endef
$(foreach app,$(APPS),$(eval $(call gen-app-ct-target,$(app)))) $(foreach app,$(APPS),$(eval $(call gen-app-ct-target,$(app))))
## app/name-ct-pipeline targets are used in pipeline -> make cover data for each app
.PHONY: $(APPS:%=%-ct-pipeline)
define gen-app-ct-target-pipeline
$1-ct-pipeline: $(REBAR)
$(REBAR) ct --name 'test@127.0.0.1' -c -v --cover_export_name $(PROFILE)-$(subst /,-,$1) --suite $(shell $(CURDIR)/scripts/find-suites.sh $1)
endef
$(foreach app,$(APPS),$(eval $(call gen-app-ct-target-pipeline,$(app))))
## apps/name-prop targets ## apps/name-prop targets
.PHONY: $(APPS:%=%-prop) .PHONY: $(APPS:%=%-prop)
define gen-app-prop-target define gen-app-prop-target

View File

@ -40,6 +40,7 @@
-define(RESOURCE_TYPE_MQTT, 'bridge_mqtt'). -define(RESOURCE_TYPE_MQTT, 'bridge_mqtt').
-define(RESOURCE_TYPE_RPC, 'bridge_rpc'). -define(RESOURCE_TYPE_RPC, 'bridge_rpc').
-define(BAD_TOPIC_WITH_WILDCARD, wildcard_topic_not_allowed_for_publish).
-define(RESOURCE_CONFIG_SPEC_MQTT, #{ -define(RESOURCE_CONFIG_SPEC_MQTT, #{
address => #{ address => #{
@ -494,7 +495,7 @@ on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName,
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl), PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
TopicTks = case ForwardTopic == <<"">> of TopicTks = case ForwardTopic == <<"">> of
true -> undefined; true -> undefined;
false -> emqx_rule_utils:preproc_tmpl(ForwardTopic) false -> emqx_rule_utils:preproc_tmpl(assert_topic_valid(ForwardTopic))
end, end,
Opts. Opts.
@ -515,7 +516,7 @@ on_action_data_to_mqtt_broker(Msg, _Env =
qos = QoS, qos = QoS,
from = From, from = From,
flags = Flags, flags = Flags,
topic = Topic1, topic = assert_topic_valid(Topic1),
payload = format_data(PayloadTks, Msg), payload = format_data(PayloadTks, Msg),
timestamp = TimeStamp}, timestamp = TimeStamp},
ecpool:with_client(PoolName, ecpool:with_client(PoolName,
@ -583,7 +584,7 @@ options(Options, PoolName, ResId) ->
Get = fun(Key) -> GetD(Key, undefined) end, Get = fun(Key) -> GetD(Key, undefined) end,
Address = Get(<<"address">>), Address = Get(<<"address">>),
[{max_inflight_batches, 32}, [{max_inflight_batches, 32},
{forward_mountpoint, str(Get(<<"mountpoint">>))}, {forward_mountpoint, str(assert_topic_valid(Get(<<"mountpoint">>)))},
{disk_cache, cuttlefish_flag:parse(str(GetD(<<"disk_cache">>, "off")))}, {disk_cache, cuttlefish_flag:parse(str(GetD(<<"disk_cache">>, "off")))},
{start_type, auto}, {start_type, auto},
{reconnect_delay_ms, cuttlefish_duration:parse(str(Get(<<"reconnect_interval">>)), ms)}, {reconnect_delay_ms, cuttlefish_duration:parse(str(Get(<<"reconnect_interval">>)), ms)},
@ -610,6 +611,12 @@ options(Options, PoolName, ResId) ->
| maybe_ssl(Options, Get(<<"ssl">>), ResId)] | maybe_ssl(Options, Get(<<"ssl">>), ResId)]
end. end.
assert_topic_valid(T) ->
case emqx_topic:wildcard(T) of
true -> throw({?BAD_TOPIC_WITH_WILDCARD, T});
false -> T
end.
maybe_ssl(_Options, false, _ResId) -> maybe_ssl(_Options, false, _ResId) ->
[]; [];
maybe_ssl(Options, true, ResId) -> maybe_ssl(Options, true, ResId) ->

View File

@ -22,7 +22,7 @@ management.default_application.secret = public
## Initialize apps file ## Initialize apps file
## Is used to add administrative app/secrets when EMQX is launched for the first time. ## Is used to add administrative app/secrets when EMQX is launched for the first time.
## This config will not take any effect once EMQX database is populated with the provided apps. ## This config will not take any effect once EMQX database has one or more apps.
## The file content format is as below: ## The file content format is as below:
## ``` ## ```
##819e5db182cf:l9C5suZClIF3FvdzWqmINrVU61WNfIjcglxw9CVM7y1VI ##819e5db182cf:l9C5suZClIF3FvdzWqmINrVU61WNfIjcglxw9CVM7y1VI

View File

@ -32,8 +32,11 @@ init_per_suite(Cfg) ->
ok = emqx_dashboard_admin:mnesia(boot), ok = emqx_dashboard_admin:mnesia(boot),
application:load(emqx_modules), application:load(emqx_modules),
application:load(emqx_bridge_mqtt), application:load(emqx_bridge_mqtt),
ekka_mnesia:start(),
emqx_dashboard_admin:mnesia(boot),
emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]), emqx_ct_helpers:start_apps([emqx_rule_engine, emqx_management]),
application:ensure_all_started(emqx_dashboard), application:ensure_all_started(emqx_dashboard),
ok = emqx_rule_engine:load_providers(),
Cfg. Cfg.
end_per_suite(Cfg) -> end_per_suite(Cfg) ->

View File

@ -41,6 +41,8 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_) -> end_per_suite(_) ->
ok = application:unset_env(emqx_management, bootstrap_apps_file),
_ = mnesia:clear_table(mqtt_app),
emqx_ct_helpers:stop_apps([]), emqx_ct_helpers:stop_apps([]),
ok. ok.

View File

@ -22,6 +22,8 @@
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-define(BAD_TOPIC_WITH_WILDCARD, wildcard_topic_not_allowed_for_publish).
-define(REPUBLISH_PARAMS_SPEC, #{ -define(REPUBLISH_PARAMS_SPEC, #{
target_topic => #{ target_topic => #{
order => 1, order => 1,
@ -163,7 +165,7 @@ on_action_create_republish(Id, Params = #{
}) -> }) ->
TargetRetain = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)), TargetRetain = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)),
TargetQoS = to_qos(TargetQoS0), TargetQoS = to_qos(TargetQoS0),
TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic), TopicTks = emqx_rule_utils:preproc_tmpl(assert_topic_valid(TargetTopic)),
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl), PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
Params. Params.
@ -201,7 +203,7 @@ on_action_republish(Selected, _Envs = #{
from = ActId, from = ActId,
flags = Flags#{retain => get_retain(TargetRetain, Selected)}, flags = Flags#{retain => get_retain(TargetRetain, Selected)},
headers = #{republish_by => ActId}, headers = #{republish_by => ActId},
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), topic = assert_topic_valid(emqx_rule_utils:proc_tmpl(TopicTks, Selected)),
payload = format_msg(PayloadTks, Selected), payload = format_msg(PayloadTks, Selected),
timestamp = Timestamp timestamp = Timestamp
}, },
@ -226,7 +228,7 @@ on_action_republish(Selected, _Envs = #{
from = ActId, from = ActId,
flags = #{dup => false, retain => get_retain(TargetRetain, Selected)}, flags = #{dup => false, retain => get_retain(TargetRetain, Selected)},
headers = #{republish_by => ActId}, headers = #{republish_by => ActId},
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), topic = assert_topic_valid(emqx_rule_utils:proc_tmpl(TopicTks, Selected)),
payload = format_msg(PayloadTks, Selected), payload = format_msg(PayloadTks, Selected),
timestamp = erlang:system_time(millisecond) timestamp = erlang:system_time(millisecond)
}, },
@ -270,6 +272,12 @@ get_qos(-1, _Data, Default) -> Default;
get_qos(TargetQoS, Data, _Default) -> get_qos(TargetQoS, Data, _Default) ->
qos(emqx_rule_utils:replace_var(TargetQoS, Data)). qos(emqx_rule_utils:replace_var(TargetQoS, Data)).
assert_topic_valid(T) ->
case emqx_topic:wildcard(T) of
true -> throw({?BAD_TOPIC_WITH_WILDCARD, T});
false -> T
end.
qos(<<"0">>) -> 0; qos(<<"0">>) -> 0;
qos(<<"1">>) -> 1; qos(<<"1">>) -> 1;
qos(<<"2">>) -> 2; qos(<<"2">>) -> 2;

View File

@ -69,3 +69,5 @@
- Make sure Rule-Engine API supports Percent-encoding `rule_id` and `resource_id` in HTTP request path [#9190](https://github.com/emqx/emqx/pull/9190). - Make sure Rule-Engine API supports Percent-encoding `rule_id` and `resource_id` in HTTP request path [#9190](https://github.com/emqx/emqx/pull/9190).
Note that the `id` in `POST /api/v4/rules` should be literals (not encoded) when creating a `rule` or `resource`. Note that the `id` in `POST /api/v4/rules` should be literals (not encoded) when creating a `rule` or `resource`.
See docs [Create Rule](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) [Create Resource](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources). See docs [Create Rule](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) [Create Resource](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources).
- When republishing messages or bridge messages to other brokers, check the validity of the topic and make sure it does not have topic wildcards [#9291](https://github.com/emqx/emqx/pull/9291).

View File

@ -63,3 +63,5 @@
- 使规则引擎 API 在 HTTP 请求路径中支持百分号编码的 `rule_id``resource_id` [#9190](https://github.com/emqx/emqx/pull/9190)。 - 使规则引擎 API 在 HTTP 请求路径中支持百分号编码的 `rule_id``resource_id` [#9190](https://github.com/emqx/emqx/pull/9190)。
注意在创建规则或资源时HTTP body 中的 `id` 字段仍为字面值,而不是编码之后的值。 注意在创建规则或资源时HTTP body 中的 `id` 字段仍为字面值,而不是编码之后的值。
详情请参考 [创建规则](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) 和 [创建资源](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources)。 详情请参考 [创建规则](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) 和 [创建资源](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources)。
- 在进行消息重发布或桥接消息到其他 mqtt broker 时,检查 topic 合法性,确定其不带有主题通配符 [#9291](https://github.com/emqx/emqx/pull/9291)。

View File

@ -23,6 +23,18 @@
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
%% do not start the application
%% only testing the root supervisor in this suite
application:stop(emqx_modules),
{ok, Pid} = emqx_mod_sup:start_link(),
unlink(Pid),
Config.
end_per_suite(_Config) ->
exit(whereis(emqx_mod_sup), kill),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test cases %% Test cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -5,6 +5,29 @@ set -euo pipefail
# ensure dir # ensure dir
cd -P -- "$(dirname -- "$0")/.." cd -P -- "$(dirname -- "$0")/.."
help() {
echo
echo "-h|--help: To display this usage info"
echo "--json: Print apps in json"
}
WANT_JSON='no'
while [ "$#" -gt 0 ]; do
case $1 in
-h|--help)
help
exit 0
;;
--json)
WANT_JSON='yes'
shift 1
;;
*)
echo "unknown option $1"
exit 1
;;
esac
done
if [ "$(./scripts/get-distro.sh)" = 'windows' ]; then if [ "$(./scripts/get-distro.sh)" = 'windows' ]; then
# Otherwise windows may resolve to find.exe # Otherwise windows may resolve to find.exe
FIND="/usr/bin/find" FIND="/usr/bin/find"
@ -17,17 +40,26 @@ find_app() {
"$FIND" "${appdir}" -mindepth 1 -maxdepth 1 -type d "$FIND" "${appdir}" -mindepth 1 -maxdepth 1 -type d
} }
# append emqx application first EM="emqx"
echo 'emqx' CE="$(find_app 'apps')"
find_app 'apps'
if [ -f 'EMQX_ENTERPRISE' ]; then if [ -f 'EMQX_ENTERPRISE' ]; then
find_app 'lib-ee' LIB="$(find_app 'lib-ee')"
else else
find_app 'lib-ce' LIB="$(find_app 'lib-ce')"
fi fi
## find directories in lib-extra ## find directories in lib-extra
find_app 'lib-extra' LIBE="$(find_app 'lib-extra')"
## find symlinks in lib-extra ## find symlinks in lib-extra
"$FIND" 'lib-extra' -mindepth 1 -maxdepth 1 -type l -exec test -e {} \; -print LIBES="$("$FIND" 'lib-extra' -mindepth 1 -maxdepth 1 -type l -exec test -e {} \; -print)"
APPS_ALL="$(echo -e "${EM}\n${CE}\n${LIB}\n${LIBE}\n${LIBES}")"
if [ "$WANT_JSON" = 'yes' ]; then
echo "${APPS_ALL}" | xargs | tr -d '\n' | jq -R -s -c 'split(" ")'
else
echo "${APPS_ALL}"
fi

View File

@ -825,6 +825,7 @@ t_enrich_connack_caps(_) ->
wildcard_subscription => true wildcard_subscription => true
} }
end), end),
try
AckProps = emqx_channel:enrich_connack_caps(#{}, channel()), AckProps = emqx_channel:enrich_connack_caps(#{}, channel()),
?assertMatch(#{'Retain-Available' := 1, ?assertMatch(#{'Retain-Available' := 1,
'Maximum-Packet-Size' := 1024, 'Maximum-Packet-Size' := 1024,
@ -832,8 +833,10 @@ t_enrich_connack_caps(_) ->
'Wildcard-Subscription-Available' := 1, 'Wildcard-Subscription-Available' := 1,
'Subscription-Identifier-Available' := 1, 'Subscription-Identifier-Available' := 1,
'Shared-Subscription-Available' := 1 'Shared-Subscription-Available' := 1
}, AckProps), }, AckProps)
ok = meck:unload(emqx_mqtt_caps). after
ok = meck:unload(emqx_mqtt_caps)
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test cases for terminate %% Test cases for terminate

View File

@ -52,7 +52,7 @@ t_check_sub(_) ->
wildcard_subscription => false wildcard_subscription => false
}, },
emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps), emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps),
timer:sleep(50), try
ClientInfo = #{zone => zone}, ClientInfo = #{zone => zone},
ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts), ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts),
?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
@ -60,5 +60,7 @@ t_check_sub(_) ->
?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}, ?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts)), emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts)),
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true})), emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true}))
emqx_zone:unset_env(zone, '$mqtt_pub_caps'). after
emqx_zone:unset_env(zone, '$mqtt_pub_caps')
end.

View File

@ -468,8 +468,8 @@ t_connack_max_qos_allowed(_) ->
%% max_qos_allowed = 0 %% max_qos_allowed = 0
emqx_zone:set_env(external, max_qos_allowed, 0), emqx_zone:set_env(external, max_qos_allowed, 0),
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), emqx_zone:unset_env(external, '$mqtt_caps'),
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), emqx_zone:unset_env(external, '$mqtt_pub_caps'),
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
{ok, Connack1} = emqtt:connect(Client1), {ok, Connack1} = emqtt:connect(Client1),
@ -496,8 +496,8 @@ t_connack_max_qos_allowed(_) ->
%% max_qos_allowed = 1 %% max_qos_allowed = 1
emqx_zone:set_env(external, max_qos_allowed, 1), emqx_zone:set_env(external, max_qos_allowed, 1),
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), emqx_zone:unset_env(external, '$mqtt_caps'),
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), emqx_zone:unset_env(external, '$mqtt_pub_caps'),
{ok, Client3} = emqtt:start_link([{proto_ver, v5}]), {ok, Client3} = emqtt:start_link([{proto_ver, v5}]),
{ok, Connack3} = emqtt:connect(Client3), {ok, Connack3} = emqtt:connect(Client3),
@ -524,8 +524,8 @@ t_connack_max_qos_allowed(_) ->
%% max_qos_allowed = 2 %% max_qos_allowed = 2
emqx_zone:set_env(external, max_qos_allowed, 2), emqx_zone:set_env(external, max_qos_allowed, 2),
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), emqx_zone:unset_env(external, '$mqtt_caps'),
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), emqx_zone:unset_env(external, '$mqtt_pub_caps'),
{ok, Client5} = emqtt:start_link([{proto_ver, v5}]), {ok, Client5} = emqtt:start_link([{proto_ver, v5}]),
{ok, Connack5} = emqtt:connect(Client5), {ok, Connack5} = emqtt:connect(Client5),

View File

@ -32,6 +32,7 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:stop_apps([]). emqx_ct_helpers:stop_apps([]).
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->

View File

@ -51,7 +51,7 @@ init_per_suite(Config) ->
PortDiscovery = application:get_env(gen_rpc, port_discovery), PortDiscovery = application:get_env(gen_rpc, port_discovery),
application:set_env(gen_rpc, port_discovery, stateless), application:set_env(gen_rpc, port_discovery, stateless),
application:ensure_all_started(gen_rpc), application:ensure_all_started(gen_rpc),
%% ensure emqx_moduels' app modules are loaded %% ensure emqx_modules app modules are loaded
%% so the mnesia tables are created %% so the mnesia tables are created
ok = load_app(emqx_modules), ok = load_app(emqx_modules),
emqx_ct_helpers:start_apps([]), emqx_ct_helpers:start_apps([]),

View File

@ -32,25 +32,44 @@
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]), emqx_ct_helpers:start_apps([]),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]), emqx_ct_helpers:stop_apps([]),
ok. ok.
init_per_testcase(Case, Config) ->
?MODULE:Case({'init', Config}).
end_per_testcase(Case, Config) ->
?MODULE:Case({'end', Config}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Testcases %% Testcases
t_takeover(_) -> t_takeover({init, Config}) when is_list(Config) ->
process_flag(trap_exit, true), Config;
t_takeover({'end', Config}) when is_list(Config) ->
ok;
t_takeover(Config) when is_list(Config) ->
AllMsgs = messages(?CNT), AllMsgs = messages(?CNT),
Pos = rand:uniform(?CNT), Pos = rand:uniform(?CNT),
ClientId = random_clientid(),
ClientId = <<"clientid">>, ClientOpts = [{clientid, ClientId},
{ok, C1} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]), {clean_start, false},
{ok, _} = emqtt:connect(C1), {host, "127.0.0.1"},
{port, 1883}
],
C1 =
with_retry(
fun() ->
{ok, C} = emqtt:start_link(ClientOpts),
{ok, _} = emqtt:connect(C),
C
end, 5),
emqtt:subscribe(C1, <<"t">>, 1), emqtt:subscribe(C1, <<"t">>, 1),
spawn(fun() -> spawn(fun() ->
[begin [begin
emqx:publish(lists:nth(I, AllMsgs)), emqx:publish(lists:nth(I, AllMsgs)),
@ -59,31 +78,65 @@ t_takeover(_) ->
end), end),
emqtt:pause(C1), emqtt:pause(C1),
timer:sleep(?CNT*10), timer:sleep(?CNT*10),
load_meck(ClientId), load_meck(ClientId),
try
spawn(fun() -> spawn(fun() ->
[begin [begin
emqx:publish(lists:nth(I, AllMsgs)), emqx:publish(lists:nth(I, AllMsgs)),
timer:sleep(rand:uniform(10)) timer:sleep(rand:uniform(10))
end || I <- lists:seq(Pos+1, ?CNT)] end || I <- lists:seq(Pos+1, ?CNT)]
end), end),
{ok, C2} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]), {ok, C2} = emqtt:start_link(ClientOpts),
%% C1 is going down, unlink it so the test can continue to run
_ = monitor(process, C1),
?assert(erlang:is_process_alive(C1)),
unlink(C1),
{ok, _} = emqtt:connect(C2), {ok, _} = emqtt:connect(C2),
receive
{'DOWN', _, process, C1, _} ->
ok
after 1000 ->
ct:fail("timedout_waiting_for_old_connection_shutdown")
end,
Received = all_received_publishs(), Received = all_received_publishs(),
ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]), ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]),
assert_messages_missed(AllMsgs, Received), assert_messages_missed(AllMsgs, Received),
assert_messages_order(AllMsgs, Received), assert_messages_order(AllMsgs, Received),
kill_process(C2, fun emqtt:stop/1)
emqtt:disconnect(C2), after
unload_meck(ClientId). unload_meck(ClientId)
end.
t_takover_in_cluster(_) ->
todo.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helpers %% Helpers
random_clientid() ->
iolist_to_binary(["clientid", "-", integer_to_list(erlang:system_time())]).
kill_process(Pid, WithFun) ->
_ = unlink(Pid),
_ = monitor(process, Pid),
try WithFun(Pid)
catch _:_ -> ok
end,
receive
{'DOWN', _, process, Pid, _} ->
ok
after 10_000 ->
exit(Pid, kill),
error(timeout)
end.
with_retry(Fun, 1) -> Fun();
with_retry(Fun, N) when N > 1 ->
try
Fun()
catch
_ : _ ->
ct:sleep(1000),
with_retry(Fun, N - 1)
end.
load_meck(ClientId) -> load_meck(ClientId) ->
meck:new(fake_conn_mod, [non_strict]), meck:new(fake_conn_mod, [non_strict]),
HookTakeover = fun(Pid, Msg = {takeover, 'begin'}) -> HookTakeover = fun(Pid, Msg = {takeover, 'begin'}) ->