Merge pull request #13037 from ieQu1/dev/merge-release57-240513

Sync release-57 to the master
This commit is contained in:
ieQu1 2024-05-14 10:50:30 +02:00 committed by GitHub
commit f663373c57
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
45 changed files with 1520 additions and 527 deletions

View File

@ -14,9 +14,6 @@ on:
env: env:
IS_CI: "yes" IS_CI: "yes"
permissions:
contents: read
jobs: jobs:
sanity-checks: sanity-checks:
runs-on: ubuntu-22.04 runs-on: ubuntu-22.04
@ -32,6 +29,9 @@ jobs:
otp_vsn: "26.2.1-2" otp_vsn: "26.2.1-2"
elixir_vsn: "1.15.7" elixir_vsn: "1.15.7"
permissions:
contents: read
steps: steps:
- uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 - uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2
with: with:
@ -127,6 +127,9 @@ jobs:
- emqx - emqx
- emqx-enterprise - emqx-enterprise
permissions:
contents: read
steps: steps:
- uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 - uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2
with: with:

View File

@ -13,9 +13,10 @@ on:
- 'master' - 'master'
- 'release-5[0-9]' - 'release-5[0-9]'
- 'ci/**' - 'ci/**'
workflow_dispatch:
permissions: inputs:
contents: read ref:
required: false
env: env:
IS_CI: 'yes' IS_CI: 'yes'
@ -36,6 +37,9 @@ jobs:
otp_vsn: '26.2.1-2' otp_vsn: '26.2.1-2'
elixir_vsn: '1.15.7' elixir_vsn: '1.15.7'
permissions:
contents: read
steps: steps:
- uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 - uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2
with: with:
@ -132,6 +136,9 @@ jobs:
- emqx - emqx
- emqx-enterprise - emqx-enterprise
permissions:
contents: read
steps: steps:
- uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 - uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2
with: with:

View File

@ -142,14 +142,14 @@ jobs:
- name: Login to hub.docker.com - name: Login to hub.docker.com
uses: docker/login-action@e92390c5fb421da1463c202d546fed0ec5c39f20 # v3.1.0 uses: docker/login-action@e92390c5fb421da1463c202d546fed0ec5c39f20 # v3.1.0
if: inputs.publish || github.repository_owner != 'emqx' if: inputs.publish && contains(matrix.profile[1], 'docker.io')
with: with:
username: ${{ secrets.DOCKER_HUB_USER }} username: ${{ secrets.DOCKER_HUB_USER }}
password: ${{ secrets.DOCKER_HUB_TOKEN }} password: ${{ secrets.DOCKER_HUB_TOKEN }}
- name: Login to AWS ECR - name: Login to AWS ECR
uses: docker/login-action@e92390c5fb421da1463c202d546fed0ec5c39f20 # v3.1.0 uses: docker/login-action@e92390c5fb421da1463c202d546fed0ec5c39f20 # v3.1.0
if: inputs.publish || github.repository_owner != 'emqx' if: inputs.publish && contains(matrix.profile[1], 'public.ecr.aws')
with: with:
registry: public.ecr.aws registry: public.ecr.aws
username: ${{ secrets.AWS_ACCESS_KEY_ID }} username: ${{ secrets.AWS_ACCESS_KEY_ID }}

View File

@ -10,6 +10,7 @@ permissions:
jobs: jobs:
analyze: analyze:
if: github.repository == 'emqx/emqx'
name: Analyze name: Analyze
runs-on: ubuntu-22.04 runs-on: ubuntu-22.04
timeout-minutes: 360 timeout-minutes: 360

View File

@ -30,9 +30,10 @@ jobs:
shell: bash shell: bash
env: env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITHUB_REPO: ${{ github.repository }}
run: | run: |
gh api --method GET -f head_sha=$(git rev-parse HEAD) -f status=completed -f exclude_pull_requests=true /repos/emqx/emqx/actions/runs > runs.json gh api --method GET -f head_sha=$(git rev-parse HEAD) -f status=completed -f exclude_pull_requests=true /repos/${GITHUB_REPO}/actions/runs > runs.json
for id in $(jq -r '.workflow_runs[] | select((."conclusion" == "failure") and (."name" != "Keep master green") and .run_attempt < 3) | .id' runs.json); do for id in $(jq -r '.workflow_runs[] | select((."conclusion" == "failure") and (."name" != "Keep master green") and .run_attempt < 3) | .id' runs.json); do
echo "rerun https://github.com/emqx/emqx/actions/runs/$id" echo "rerun https://github.com/${GITHUB_REPO}/actions/runs/$id"
gh api --method POST /repos/emqx/emqx/actions/runs/$id/rerun-failed-jobs || true gh api --method POST /repos/${GITHUB_REPO}/actions/runs/$id/rerun-failed-jobs || true
done done

View File

@ -20,9 +20,6 @@ on:
required: true required: true
type: string type: string
permissions:
contents: read
env: env:
IS_CI: "yes" IS_CI: "yes"
@ -40,35 +37,39 @@ jobs:
shell: bash shell: bash
container: "ghcr.io/emqx/emqx-builder/${{ matrix.builder }}:${{ matrix.elixir }}-${{ matrix.otp }}-ubuntu22.04" container: "ghcr.io/emqx/emqx-builder/${{ matrix.builder }}:${{ matrix.elixir }}-${{ matrix.otp }}-ubuntu22.04"
env:
PROFILE: ${{ matrix.profile }}
ENABLE_COVER_COMPILE: 1
CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }}
permissions:
contents: read
steps: steps:
- uses: actions/download-artifact@c850b930e6ba138125429b7e5c93fc707a7f8427 # v4.1.4 - uses: actions/download-artifact@c850b930e6ba138125429b7e5c93fc707a7f8427 # v4.1.4
with: with:
name: ${{ matrix.profile }} name: ${{ matrix.profile }}
- name: extract artifact - name: extract artifact
run: | run: |
unzip -o -q ${{ matrix.profile }}.zip unzip -o -q ${{ matrix.profile }}.zip
git config --global --add safe.directory "$GITHUB_WORKSPACE" git config --global --add safe.directory "$GITHUB_WORKSPACE"
# produces eunit.coverdata # produces eunit.coverdata
- name: eunit - run: make eunit
env:
PROFILE: ${{ matrix.profile }}
ENABLE_COVER_COMPILE: 1
CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }}
run: make eunit
# produces proper.coverdata # produces proper.coverdata
- name: proper - run: make proper
env:
PROFILE: ${{ matrix.profile }}
ENABLE_COVER_COMPILE: 1
CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }}
run: make proper
- uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 - run: make cover
with:
name: coverdata-${{ matrix.profile }}-${{ matrix.otp }} - name: send to coveralls
path: _build/test/cover if: github.repository == 'emqx/emqx'
retention-days: 7 env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: make coveralls
- run: cat rebar3.crashdump
if: failure()
ct_docker: ct_docker:
runs-on: ${{ github.repository_owner == 'emqx' && fromJSON('["self-hosted","ephemeral","linux","x64"]') || 'ubuntu-22.04' }} runs-on: ${{ github.repository_owner == 'emqx' && fromJSON('["self-hosted","ephemeral","linux","x64"]') || 'ubuntu-22.04' }}
@ -82,6 +83,12 @@ jobs:
run: run:
shell: bash shell: bash
env:
PROFILE: ${{ matrix.profile }}
permissions:
contents: read
steps: steps:
- uses: actions/download-artifact@c850b930e6ba138125429b7e5c93fc707a7f8427 # v4.1.4 - uses: actions/download-artifact@c850b930e6ba138125429b7e5c93fc707a7f8427 # v4.1.4
with: with:
@ -89,7 +96,6 @@ jobs:
- name: extract artifact - name: extract artifact
run: | run: |
unzip -o -q ${{ matrix.profile }}.zip unzip -o -q ${{ matrix.profile }}.zip
git config --global --add safe.directory "$GITHUB_WORKSPACE"
# produces $PROFILE-<app-name>-<otp-vsn>-sg<suitegroup>.coverdata # produces $PROFILE-<app-name>-<otp-vsn>-sg<suitegroup>.coverdata
- name: run common tests - name: run common tests
@ -103,19 +109,30 @@ jobs:
TDENGINE_TAG: "3.0.2.4" TDENGINE_TAG: "3.0.2.4"
OPENTS_TAG: "9aa7f88" OPENTS_TAG: "9aa7f88"
MINIO_TAG: "RELEASE.2023-03-20T20-16-18Z" MINIO_TAG: "RELEASE.2023-03-20T20-16-18Z"
PROFILE: ${{ matrix.profile }}
SUITEGROUP: ${{ matrix.suitegroup }} SUITEGROUP: ${{ matrix.suitegroup }}
ENABLE_COVER_COMPILE: 1 ENABLE_COVER_COMPILE: 1
CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }}-sg${{ matrix.suitegroup }} CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }}-sg${{ matrix.suitegroup }}
run: ./scripts/ct/run.sh --ci --app ${{ matrix.app }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 run: ./scripts/ct/run.sh --ci --app ${{ matrix.app }} --keep-up
with:
name: coverdata-${{ matrix.profile }}-${{ matrix.prefix }}-${{ matrix.otp }}-sg${{ matrix.suitegroup }} - name: make cover
path: _build/test/cover run: |
retention-days: 7 docker exec -e PROFILE="$PROFILE" -t erlang make cover
- name: send to coveralls
if: github.repository == 'emqx/emqx'
run: |
ls _build/test/cover/*.coverdata || exit 0
docker exec -e PROFILE="$PROFILE" -t erlang make coveralls
- name: rebar3.crashdump
if: failure()
run: cat rebar3.crashdump
- name: compress logs - name: compress logs
if: failure() if: failure()
run: tar -czf logs.tar.gz _build/test/logs run: tar -czf logs.tar.gz _build/test/logs
- uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 - uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1
if: failure() if: failure()
with: with:
@ -137,6 +154,15 @@ jobs:
run: run:
shell: bash shell: bash
permissions:
contents: read
env:
PROFILE: ${{ matrix.profile }}
SUITEGROUP: ${{ matrix.suitegroup }}
ENABLE_COVER_COMPILE: 1
CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }}-sg${{ matrix.suitegroup }}
steps: steps:
- uses: actions/download-artifact@c850b930e6ba138125429b7e5c93fc707a7f8427 # v4.1.4 - uses: actions/download-artifact@c850b930e6ba138125429b7e5c93fc707a7f8427 # v4.1.4
with: with:
@ -148,22 +174,25 @@ jobs:
# produces $PROFILE-<app-name>-<otp-vsn>-sg<suitegroup>.coverdata # produces $PROFILE-<app-name>-<otp-vsn>-sg<suitegroup>.coverdata
- name: run common tests - name: run common tests
run: make "${{ matrix.app }}-ct"
- run: make cover
- name: send to coveralls
if: github.repository == 'emqx/emqx'
env: env:
PROFILE: ${{ matrix.profile }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SUITEGROUP: ${{ matrix.suitegroup }}
ENABLE_COVER_COMPILE: 1
CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }}-sg${{ matrix.suitegroup }}
run: | run: |
make "${{ matrix.app }}-ct" ls _build/test/cover/*.coverdata || exit 0
- uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 make coveralls
with:
name: coverdata-${{ matrix.profile }}-${{ matrix.prefix }}-${{ matrix.otp }}-sg${{ matrix.suitegroup }} - run: cat rebar3.crashdump
path: _build/test/cover if: failure()
if-no-files-found: warn # do not fail if no coverdata found
retention-days: 7
- name: compress logs - name: compress logs
if: failure() if: failure()
run: tar -czf logs.tar.gz _build/test/logs run: tar -czf logs.tar.gz _build/test/logs
- uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 - uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1
if: failure() if: failure()
with: with:
@ -180,61 +209,18 @@ jobs:
runs-on: ubuntu-22.04 runs-on: ubuntu-22.04
strategy: strategy:
fail-fast: false fail-fast: false
permissions:
pull-requests: write
steps: steps:
- name: Coveralls finished
if: github.repository == 'emqx/emqx'
uses: coverallsapp/github-action@3dfc5567390f6fa9267c0ee9c251e4c8c3f18949 # v2.2.3
with:
parallel-finished: true
git-branch: ${{ github.ref }}
git-commit: ${{ github.sha }}
- run: echo "All tests passed" - run: echo "All tests passed"
make_cover:
needs:
- eunit_and_proper
- ct
- ct_docker
runs-on: ${{ endsWith(github.repository, '/emqx') && 'ubuntu-22.04' || fromJSON('["self-hosted","ephemeral","linux","x64"]') }}
container: ${{ inputs.builder }}
strategy:
fail-fast: false
matrix:
profile:
- emqx-enterprise
steps:
- uses: actions/download-artifact@c850b930e6ba138125429b7e5c93fc707a7f8427 # v4.1.4
with:
name: ${{ matrix.profile }}
- name: extract artifact
run: |
unzip -o -q ${{ matrix.profile }}.zip
git config --global --add safe.directory "$GITHUB_WORKSPACE"
- uses: actions/download-artifact@c850b930e6ba138125429b7e5c93fc707a7f8427 # v4.1.4
name: download coverdata
with:
pattern: coverdata-${{ matrix.profile }}-*
path: _build/test/cover
merge-multiple: true
- name: make cover
env:
PROFILE: emqx-enterprise
run: make cover
- name: send to coveralls
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
PROFILE: emqx-enterprise
run: make coveralls
- name: get coveralls logs
if: failure()
run: cat rebar3.crashdump
# do this in a separate job
upload_coverdata:
needs: make_cover
runs-on: ubuntu-22.04
steps:
- name: Coveralls Finished
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
curl -v -k https://coveralls.io/webhook \
--header "Content-Type: application/json" \
--data "{\"repo_name\":\"$GITHUB_REPOSITORY\",\"repo_token\":\"$GITHUB_TOKEN\",\"payload\":{\"build_num\":$GITHUB_RUN_ID,\"status\":\"done\"}}" || true

View File

@ -16,8 +16,9 @@ permissions: read-all
jobs: jobs:
analysis: analysis:
if: github.repository == 'emqx/emqx'
name: Scorecard analysis name: Scorecard analysis
runs-on: ubuntu-latest runs-on: ubuntu-22.04
permissions: permissions:
security-events: write security-events: write
id-token: write id-token: write

View File

@ -13,8 +13,8 @@ permissions:
jobs: jobs:
stale: stale:
if: github.repository_owner == 'emqx' if: github.repository == 'emqx/emqx'
runs-on: ${{ endsWith(github.repository, '/emqx') && 'ubuntu-22.04' || fromJSON('["self-hosted","ephemeral","linux","x64"]') }} runs-on: ubuntu-22.04
permissions: permissions:
issues: write issues: write
pull-requests: none pull-requests: none

View File

@ -21,7 +21,7 @@ endif
# Dashboard version # Dashboard version
# from https://github.com/emqx/emqx-dashboard5 # from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.9.0-beta.1 export EMQX_DASHBOARD_VERSION ?= v1.9.0-beta.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.7.0-beta.1 export EMQX_EE_DASHBOARD_VERSION ?= e1.7.0-beta.4
PROFILE ?= emqx PROFILE ?= emqx
REL_PROFILES := emqx emqx-enterprise REL_PROFILES := emqx emqx-enterprise

View File

@ -34,7 +34,7 @@
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}},
{ra, "2.7.3"} {ra, "2.7.3"}
]}. ]}.

View File

@ -1789,7 +1789,9 @@ mqtt_listener(Bind) ->
hoconsc:array(string()), hoconsc:array(string()),
#{ #{
desc => ?DESC(mqtt_listener_access_rules), desc => ?DESC(mqtt_listener_access_rules),
default => [<<"allow all">>] default => [<<"allow all">>],
converter => fun access_rules_converter/1,
validator => fun access_rules_validator/1
} }
)}, )},
{"proxy_protocol", {"proxy_protocol",
@ -1810,6 +1812,50 @@ mqtt_listener(Bind) ->
)} )}
] ++ emqx_schema_hooks:injection_point('mqtt.listener'). ] ++ emqx_schema_hooks:injection_point('mqtt.listener').
access_rules_converter(AccessRules) ->
DeepRules =
lists:foldr(
fun(Rule, Acc) ->
Rules0 = re:split(Rule, <<"\\s*,\\s*">>, [{return, binary}]),
Rules1 = [string:trim(R) || R <- Rules0],
[Rules1 | Acc]
end,
[],
AccessRules
),
[unicode:characters_to_list(RuleBin) || RuleBin <- lists:flatten(DeepRules)].
access_rules_validator(AccessRules) ->
InvalidRules = [Rule || Rule <- AccessRules, is_invalid_rule(Rule)],
case InvalidRules of
[] ->
ok;
_ ->
MsgStr = io_lib:format("invalid_rule(s): ~ts", [string:join(InvalidRules, ", ")]),
MsgBin = unicode:characters_to_binary(MsgStr),
{error, MsgBin}
end.
is_invalid_rule(S) ->
try
[Action, CIDR] = string:tokens(S, " "),
case Action of
"allow" -> ok;
"deny" -> ok
end,
case CIDR of
"all" ->
ok;
_ ->
%% should not crash
_ = esockd_cidr:parse(CIDR, true),
ok
end,
false
catch
_:_ -> true
end.
base_listener(Bind) -> base_listener(Bind) ->
[ [
{"enable", {"enable",

View File

@ -115,6 +115,68 @@ t_update_conf(_Conf) ->
?assert(is_running('wss:default')), ?assert(is_running('wss:default')),
ok. ok.
t_update_conf_validate_access_rules(_Conf) ->
Raw = emqx:get_raw_config(?LISTENERS),
RawCorrectConf1 = emqx_utils_maps:deep_put(
[<<"tcp">>, <<"default">>, <<"access_rules">>], Raw, ["allow all"]
),
?assertMatch({ok, _}, emqx:update_config(?LISTENERS, RawCorrectConf1)),
RawCorrectConf2 = emqx_utils_maps:deep_put(
[<<"tcp">>, <<"default">>, <<"access_rules">>], Raw, ["deny all"]
),
?assertMatch({ok, _}, emqx:update_config(?LISTENERS, RawCorrectConf2)),
RawCorrectConf3 = emqx_utils_maps:deep_put(
[<<"tcp">>, <<"default">>, <<"access_rules">>], Raw, ["allow 10.0.1.0/24"]
),
?assertMatch({ok, _}, emqx:update_config(?LISTENERS, RawCorrectConf3)),
RawIncorrectConf1 = emqx_utils_maps:deep_put(
[<<"tcp">>, <<"default">>, <<"access_rules">>], Raw, ["xxx all"]
),
?assertMatch(
{error, #{
reason := <<"invalid_rule(s): xxx all">>,
value := ["xxx all"],
path := "listeners.tcp.default.access_rules",
kind := validation_error,
matched_type := "emqx:mqtt_tcp_listener"
}},
emqx:update_config(?LISTENERS, RawIncorrectConf1)
),
RawIncorrectConf2 = emqx_utils_maps:deep_put(
[<<"tcp">>, <<"default">>, <<"access_rules">>], Raw, ["allow xxx"]
),
?assertMatch(
{error, #{
reason := <<"invalid_rule(s): allow xxx">>,
value := ["allow xxx"],
path := "listeners.tcp.default.access_rules",
kind := validation_error,
matched_type := "emqx:mqtt_tcp_listener"
}},
emqx:update_config(?LISTENERS, RawIncorrectConf2)
),
ok.
t_update_conf_access_rules_split(_Conf) ->
Raw = emqx:get_raw_config(?LISTENERS),
Raw1 = emqx_utils_maps:deep_put(
[<<"tcp">>, <<"default">>, <<"access_rules">>],
Raw,
[" allow all , deny all , allow 10.0.1.0/24 "]
),
?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw1)),
?assertMatch(
#{
tcp := #{
default := #{
access_rules := ["allow all", "deny all", "allow 10.0.1.0/24"]
}
}
},
emqx:get_config(?LISTENERS)
),
ok.
t_update_tcp_keepalive_conf(_Conf) -> t_update_tcp_keepalive_conf(_Conf) ->
Keepalive = <<"240,30,5">>, Keepalive = <<"240,30,5">>,
KeepaliveStr = binary_to_list(Keepalive), KeepaliveStr = binary_to_list(Keepalive),

View File

@ -166,7 +166,7 @@ fields(producer) ->
)}, )},
{partition_key, {partition_key,
sc( sc(
binary(), emqx_schema:template(),
#{ #{
required => true, required => true,
desc => ?DESC("partition_key") desc => ?DESC("partition_key")

View File

@ -118,6 +118,7 @@ which_dbs() ->
init({#?db_sup{db = DB}, DefaultOpts}) -> init({#?db_sup{db = DB}, DefaultOpts}) ->
%% Spec for the top-level supervisor for the database: %% Spec for the top-level supervisor for the database:
logger:notice("Starting DS DB ~p", [DB]), logger:notice("Starting DS DB ~p", [DB]),
emqx_ds_builtin_sup:clean_gvars(DB),
emqx_ds_builtin_metrics:init_for_db(DB), emqx_ds_builtin_metrics:init_for_db(DB),
Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
ok = start_ra_system(DB, Opts), ok = start_ra_system(DB, Opts),

View File

@ -190,7 +190,7 @@ prometheus_per_db(NodeOrAggr) ->
%% ... %% ...
%% ''' %% '''
%% %%
%% If `NodeOrAggr' = `node' then node name is appended to the list of %% If `NodeOrAggr' = `aggr' then node name is appended to the list of
%% labels. %% labels.
prometheus_per_db(NodeOrAggr, DB, Acc0) -> prometheus_per_db(NodeOrAggr, DB, Acc0) ->
Labels = [ Labels = [

View File

@ -23,6 +23,7 @@
%% API: %% API:
-export([start_db/2, stop_db/1]). -export([start_db/2, stop_db/1]).
-export([set_gvar/3, get_gvar/3, clean_gvars/1]).
%% behavior callbacks: %% behavior callbacks:
-export([init/1]). -export([init/1]).
@ -39,6 +40,13 @@
-define(top, ?MODULE). -define(top, ?MODULE).
-define(databases, emqx_ds_builtin_databases_sup). -define(databases, emqx_ds_builtin_databases_sup).
-define(gvar_tab, emqx_ds_builtin_gvar).
-record(gvar, {
k :: {emqx_ds:db(), _Key},
v :: _Value
}).
%%================================================================================ %%================================================================================
%% API functions %% API functions
%%================================================================================ %%================================================================================
@ -61,11 +69,31 @@ stop_db(DB) ->
Pid when is_pid(Pid) -> Pid when is_pid(Pid) ->
_ = supervisor:terminate_child(?databases, DB), _ = supervisor:terminate_child(?databases, DB),
_ = supervisor:delete_child(?databases, DB), _ = supervisor:delete_child(?databases, DB),
ok; clean_gvars(DB);
undefined -> undefined ->
ok ok
end. end.
%% @doc Set a DB-global variable. Please don't abuse this API.
-spec set_gvar(emqx_ds:db(), _Key, _Val) -> ok.
set_gvar(DB, Key, Val) ->
ets:insert(?gvar_tab, #gvar{k = {DB, Key}, v = Val}),
ok.
-spec get_gvar(emqx_ds:db(), _Key, Val) -> Val.
get_gvar(DB, Key, Default) ->
case ets:lookup(?gvar_tab, {DB, Key}) of
[#gvar{v = Val}] ->
Val;
[] ->
Default
end.
-spec clean_gvars(emqx_ds:db()) -> ok.
clean_gvars(DB) ->
ets:match_delete(?gvar_tab, #gvar{k = {DB, '_'}, _ = '_'}),
ok.
%%================================================================================ %%================================================================================
%% behavior callbacks %% behavior callbacks
%%================================================================================ %%================================================================================
@ -96,6 +124,7 @@ init(?top) ->
type => supervisor, type => supervisor,
shutdown => infinity shutdown => infinity
}, },
_ = ets:new(?gvar_tab, [named_table, set, public, {keypos, #gvar.k}, {read_concurrency, true}]),
%% %%
SupFlags = #{ SupFlags = #{
strategy => one_for_all, strategy => one_for_all,

View File

@ -19,7 +19,9 @@
%% API: %% API:
-export([ -export([
trie_create/1, trie_create/0, trie_create/1, trie_create/0,
destroy/1,
trie_restore/2, trie_restore/2,
trie_update/2,
trie_copy_learned_paths/2, trie_copy_learned_paths/2,
topic_key/3, topic_key/3,
match_topics/2, match_topics/2,
@ -116,10 +118,20 @@ trie_create(UserOpts) ->
trie_create() -> trie_create() ->
trie_create(#{}). trie_create(#{}).
-spec destroy(trie()) -> ok.
destroy(#trie{trie = Trie, stats = Stats}) ->
catch ets:delete(Trie),
catch ets:delete(Stats),
ok.
%% @doc Restore trie from a dump %% @doc Restore trie from a dump
-spec trie_restore(options(), [{_Key, _Val}]) -> trie(). -spec trie_restore(options(), [{_Key, _Val}]) -> trie().
trie_restore(Options, Dump) -> trie_restore(Options, Dump) ->
Trie = trie_create(Options), trie_update(trie_create(Options), Dump).
%% @doc Update a trie with a dump of operations (used for replication)
-spec trie_update(trie(), [{_Key, _Val}]) -> trie().
trie_update(Trie, Dump) ->
lists:foreach( lists:foreach(
fun({{StateFrom, Token}, StateTo}) -> fun({{StateFrom, Token}, StateTo}) ->
trie_insert(Trie, StateFrom, Token, StateTo) trie_insert(Trie, StateFrom, Token, StateTo)

View File

@ -36,7 +36,8 @@
update_iterator/3, update_iterator/3,
next/3, next/3,
delete_next/4, delete_next/4,
shard_of_message/3 shard_of_message/3,
current_timestamp/2
]). ]).
%% internal exports: %% internal exports:
@ -65,6 +66,7 @@
-export([ -export([
init/1, init/1,
apply/3, apply/3,
tick/2,
snapshot_module/0 snapshot_module/0
]). ]).
@ -86,6 +88,7 @@
]). ]).
-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-include("emqx_ds_replication_layer.hrl"). -include("emqx_ds_replication_layer.hrl").
%%================================================================================ %%================================================================================
@ -155,12 +158,14 @@
%% Command. Each command is an entry in the replication log. %% Command. Each command is an entry in the replication log.
-type ra_command() :: #{ -type ra_command() :: #{
?tag := ?BATCH | add_generation | update_config | drop_generation, ?tag := ?BATCH | add_generation | update_config | drop_generation | storage_event,
_ => _ _ => _
}. }.
-type timestamp_us() :: non_neg_integer(). -type timestamp_us() :: non_neg_integer().
-define(gv_timestamp(SHARD), {gv_timestamp, SHARD}).
%%================================================================================ %%================================================================================
%% API functions %% API functions
%%================================================================================ %%================================================================================
@ -363,9 +368,16 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
end, end,
integer_to_binary(Hash). integer_to_binary(Hash).
-spec foreach_shard(emqx_ds:db(), fun((shard_id()) -> _)) -> ok.
foreach_shard(DB, Fun) -> foreach_shard(DB, Fun) ->
lists:foreach(Fun, list_shards(DB)). lists:foreach(Fun, list_shards(DB)).
%% @doc Messages have been replicated up to this timestamp on the
%% local server
-spec current_timestamp(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> emqx_ds:time().
current_timestamp(DB, Shard) ->
emqx_ds_builtin_sup:get_gvar(DB, ?gv_timestamp(Shard), 0).
%%================================================================================ %%================================================================================
%% behavior callbacks %% behavior callbacks
%%================================================================================ %%================================================================================
@ -490,7 +502,9 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
ShardId = {DB, Shard}, ShardId = {DB, Shard},
?IF_STORAGE_RUNNING( ?IF_STORAGE_RUNNING(
ShardId, ShardId,
emqx_ds_storage_layer:next(ShardId, Iter, BatchSize) emqx_ds_storage_layer:next(
ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard)
)
). ).
-spec do_delete_next_v4( -spec do_delete_next_v4(
@ -502,7 +516,13 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
) -> ) ->
emqx_ds:delete_next_result(emqx_ds_storage_layer:delete_iterator()). emqx_ds:delete_next_result(emqx_ds_storage_layer:delete_iterator()).
do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) -> do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) ->
emqx_ds_storage_layer:delete_next({DB, Shard}, Iter, Selector, BatchSize). emqx_ds_storage_layer:delete_next(
{DB, Shard},
Iter,
Selector,
BatchSize,
emqx_ds_replication_layer:current_timestamp(DB, Shard)
).
-spec do_add_generation_v2(emqx_ds:db()) -> no_return(). -spec do_add_generation_v2(emqx_ds:db()) -> no_return().
do_add_generation_v2(_DB) -> do_add_generation_v2(_DB) ->
@ -672,50 +692,69 @@ apply(
?tag := ?BATCH, ?tag := ?BATCH,
?batch_messages := MessagesIn ?batch_messages := MessagesIn
}, },
#{db_shard := DBShard, latest := Latest} = State #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0
) -> ) ->
%% NOTE %% NOTE
%% Unique timestamp tracking real time closely. %% Unique timestamp tracking real time closely.
%% With microsecond granularity it should be nearly impossible for it to run %% With microsecond granularity it should be nearly impossible for it to run
%% too far ahead than the real time clock. %% too far ahead than the real time clock.
{NLatest, Messages} = assign_timestamps(Latest, MessagesIn), ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, ts => Latest0}),
%% TODO {Latest, Messages} = assign_timestamps(Latest0, MessagesIn),
%% Batch is now reversed, but it should not make a lot of difference.
%% Even if it would be in order, it's still possible to write messages far away
%% in the past, i.e. when replica catches up with the leader. Storage layer
%% currently relies on wall clock time to decide if it's safe to iterate over
%% next epoch, this is likely wrong. Ideally it should rely on consensus clock
%% time instead.
Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}),
NState = State#{latest := NLatest}, State = State0#{latest := Latest},
set_ts(DBShard, Latest),
%% TODO: Need to measure effects of changing frequency of `release_cursor`. %% TODO: Need to measure effects of changing frequency of `release_cursor`.
Effect = {release_cursor, RaftIdx, NState}, Effect = {release_cursor, RaftIdx, State},
{NState, Result, Effect}; {State, Result, Effect};
apply( apply(
_RaftMeta, _RaftMeta,
#{?tag := add_generation, ?since := Since}, #{?tag := add_generation, ?since := Since},
#{db_shard := DBShard, latest := Latest} = State #{db_shard := DBShard, latest := Latest0} = State0
) -> ) ->
{Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest), {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp), Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp),
NState = State#{latest := NLatest}, State = State0#{latest := Latest},
{NState, Result}; set_ts(DBShard, Latest),
{State, Result};
apply( apply(
_RaftMeta, _RaftMeta,
#{?tag := update_config, ?since := Since, ?config := Opts}, #{?tag := update_config, ?since := Since, ?config := Opts},
#{db_shard := DBShard, latest := Latest} = State #{db_shard := DBShard, latest := Latest0} = State0
) -> ) ->
{Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest), {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0),
Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts), Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts),
NState = State#{latest := NLatest}, State = State0#{latest := Latest},
{NState, Result}; {State, Result};
apply( apply(
_RaftMeta, _RaftMeta,
#{?tag := drop_generation, ?generation := GenId}, #{?tag := drop_generation, ?generation := GenId},
#{db_shard := DBShard} = State #{db_shard := DBShard} = State
) -> ) ->
Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId), Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId),
{State, Result}. {State, Result};
apply(
_RaftMeta,
#{?tag := storage_event, ?payload := CustomEvent, ?now := Now},
#{db_shard := DBShard, latest := Latest0} = State
) ->
Latest = max(Latest0, Now),
set_ts(DBShard, Latest),
?tp(
debug,
emqx_ds_replication_layer_storage_event,
#{
shard => DBShard, payload => CustomEvent, latest => Latest
}
),
Effects = handle_custom_event(DBShard, Latest, CustomEvent),
{State#{latest => Latest}, ok, Effects}.
-spec tick(integer(), ra_state()) -> ra_machine:effects().
tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
%% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard),
{Timestamp, _} = ensure_monotonic_timestamp(timestamp_to_timeus(TimeMs), Latest),
?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}),
handle_custom_event(DBShard, Timestamp, tick).
assign_timestamps(Latest, Messages) -> assign_timestamps(Latest, Messages) ->
assign_timestamps(Latest, Messages, []). assign_timestamps(Latest, Messages, []).
@ -730,7 +769,7 @@ assign_timestamps(Latest, [MessageIn | Rest], Acc) ->
assign_timestamps(Latest + 1, Rest, [Message | Acc]) assign_timestamps(Latest + 1, Rest, [Message | Acc])
end; end;
assign_timestamps(Latest, [], Acc) -> assign_timestamps(Latest, [], Acc) ->
{Latest, Acc}. {Latest, lists:reverse(Acc)}.
assign_timestamp(TimestampUs, Message) -> assign_timestamp(TimestampUs, Message) ->
{TimestampUs, Message}. {TimestampUs, Message}.
@ -748,3 +787,18 @@ timeus_to_timestamp(TimestampUs) ->
snapshot_module() -> snapshot_module() ->
emqx_ds_replication_snapshot. emqx_ds_replication_snapshot.
handle_custom_event(DBShard, Latest, Event) ->
try
Events = emqx_ds_storage_layer:handle_event(DBShard, Latest, Event),
[{append, #{?tag => storage_event, ?payload => I, ?now => Latest}} || I <- Events]
catch
EC:Err:Stacktrace ->
?tp(error, ds_storage_custom_event_fail, #{
EC => Err, stacktrace => Stacktrace, event => Event
}),
[]
end.
set_ts({DB, Shard}, TS) ->
emqx_ds_builtin_sup:set_gvar(DB, ?gv_timestamp(Shard), TS).

View File

@ -41,4 +41,8 @@
%% drop_generation %% drop_generation
-define(generation, 2). -define(generation, 2).
%% custom events
-define(payload, 2).
-define(now, 3).
-endif. -endif.

View File

@ -16,6 +16,7 @@
-module(emqx_ds_replication_layer_shard). -module(emqx_ds_replication_layer_shard).
%% API:
-export([start_link/3]). -export([start_link/3]).
%% Static server configuration %% Static server configuration
@ -325,7 +326,8 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
ClusterName = cluster_name(DB, Shard), ClusterName = cluster_name(DB, Shard),
LocalServer = local_server(DB, Shard), LocalServer = local_server(DB, Shard),
Servers = shard_servers(DB, Shard), Servers = shard_servers(DB, Shard),
case ra:restart_server(DB, LocalServer) of MutableConfig = #{tick_timeout => 100},
case ra:restart_server(DB, LocalServer, MutableConfig) of
{error, name_not_registered} -> {error, name_not_registered} ->
Bootstrap = true, Bootstrap = true,
Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}}, Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
@ -336,7 +338,7 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) ->
], ],
ReplicationOpts ReplicationOpts
), ),
ok = ra:start_server(DB, #{ ok = ra:start_server(DB, MutableConfig#{
id => LocalServer, id => LocalServer,
uid => server_uid(DB, Shard), uid => server_uid(DB, Shard),
cluster_name => ClusterName, cluster_name => ClusterName,

View File

@ -28,15 +28,18 @@
create/4, create/4,
open/5, open/5,
drop/5, drop/5,
store_batch/4, prepare_batch/4,
commit_batch/3,
get_streams/4, get_streams/4,
get_delete_streams/4, get_delete_streams/4,
make_iterator/5, make_iterator/5,
make_delete_iterator/5, make_delete_iterator/5,
update_iterator/4, update_iterator/4,
next/4, next/5,
delete_next/5, delete_next/6,
post_creation_actions/1 post_creation_actions/1,
handle_event/4
]). ]).
%% internal exports: %% internal exports:
@ -66,6 +69,9 @@
-define(start_time, 3). -define(start_time, 3).
-define(storage_key, 4). -define(storage_key, 4).
-define(last_seen_key, 5). -define(last_seen_key, 5).
-define(cooked_payloads, 6).
-define(cooked_lts_ops, 7).
-define(cooked_ts, 8).
-type options() :: -type options() ::
#{ #{
@ -88,16 +94,28 @@
db :: rocksdb:db_handle(), db :: rocksdb:db_handle(),
data :: rocksdb:cf_handle(), data :: rocksdb:cf_handle(),
trie :: emqx_ds_lts:trie(), trie :: emqx_ds_lts:trie(),
trie_cf :: rocksdb:cf_handle(),
keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()), keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()),
ts_offset :: non_neg_integer() ts_bits :: non_neg_integer(),
ts_offset :: non_neg_integer(),
gvars :: ets:table()
}). }).
-define(lts_persist_ops, emqx_ds_storage_bitfield_lts_ops).
-type s() :: #s{}. -type s() :: #s{}.
-type stream() :: emqx_ds_lts:msg_storage_key(). -type stream() :: emqx_ds_lts:msg_storage_key().
-type delete_stream() :: emqx_ds_lts:msg_storage_key(). -type delete_stream() :: emqx_ds_lts:msg_storage_key().
-type cooked_batch() ::
#{
?cooked_payloads := [{binary(), binary()}],
?cooked_lts_ops := [{binary(), binary()}],
?cooked_ts := integer()
}.
-type iterator() :: -type iterator() ::
#{ #{
?tag := ?IT, ?tag := ?IT,
@ -141,6 +159,10 @@
-define(DS_LTS_COUNTERS, [?DS_LTS_SEEK_COUNTER, ?DS_LTS_NEXT_COUNTER, ?DS_LTS_COLLISION_COUNTER]). -define(DS_LTS_COUNTERS, [?DS_LTS_SEEK_COUNTER, ?DS_LTS_NEXT_COUNTER, ?DS_LTS_COLLISION_COUNTER]).
%% GVar used for idle detection:
-define(IDLE_DETECT, idle_detect).
-define(EPOCH(S, TS), (TS bsl S#s.ts_bits)).
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-endif. -endif.
@ -212,8 +234,11 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
db = DBHandle, db = DBHandle,
data = DataCF, data = DataCF,
trie = Trie, trie = Trie,
trie_cf = TrieCF,
keymappers = KeymapperCache, keymappers = KeymapperCache,
ts_offset = TSOffsetBits ts_offset = TSOffsetBits,
ts_bits = TSBits,
gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}])
}. }.
-spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) -> -spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) ->
@ -238,32 +263,78 @@ post_creation_actions(
s() s()
) -> ) ->
ok. ok.
drop(_Shard, DBHandle, GenId, CFRefs, #s{}) -> drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) ->
emqx_ds_lts:destroy(Trie),
catch ets:delete(GVars),
{_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs), {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
{_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs), {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs),
ok = rocksdb:drop_column_family(DBHandle, DataCF), ok = rocksdb:drop_column_family(DBHandle, DataCF),
ok = rocksdb:drop_column_family(DBHandle, TrieCF), ok = rocksdb:drop_column_family(DBHandle, TrieCF),
ok. ok.
-spec store_batch( -spec prepare_batch(
emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:shard_id(),
s(), s(),
[{emqx_ds:time(), emqx_types:message()}], [{emqx_ds:time(), emqx_types:message()}, ...],
emqx_ds:message_store_opts() emqx_ds:message_store_opts()
) -> ) ->
emqx_ds:store_batch_result(). {ok, cooked_batch()}.
store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> prepare_batch(_ShardId, S, Messages, _Options) ->
_ = erase(?lts_persist_ops),
{Payloads, MaxTs} =
lists:mapfoldl(
fun({Timestamp, Msg}, Acc) ->
{Key, _} = make_key(S, Timestamp, Msg),
Payload = {Key, message_to_value_v1(Msg)},
{Payload, max(Acc, Timestamp)}
end,
0,
Messages
),
{ok, #{
?cooked_payloads => Payloads,
?cooked_lts_ops => pop_lts_persist_ops(),
?cooked_ts => MaxTs
}}.
-spec commit_batch(
emqx_ds_storage_layer:shard_id(),
s(),
cooked_batch()
) -> ok | emqx_ds:error(_).
commit_batch(
_ShardId,
_Data,
#{?cooked_payloads := [], ?cooked_lts_ops := LTS}
) ->
%% Assert:
[] = LTS,
ok;
commit_batch(
_ShardId,
#s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars},
#{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs}
) ->
{ok, Batch} = rocksdb:batch(), {ok, Batch} = rocksdb:batch(),
%% Commit LTS trie to the storage:
lists:foreach( lists:foreach(
fun({Timestamp, Msg}) -> fun({Key, Val}) ->
{Key, _} = make_key(S, Timestamp, Msg), ok = rocksdb:batch_put(Batch, TrieCF, term_to_binary(Key), term_to_binary(Val))
Val = serialize(Msg),
rocksdb:put(DB, Data, Key, Val, [])
end, end,
Messages LtsOps
),
%% Apply LTS ops to the memory cache:
_ = emqx_ds_lts:trie_update(Trie, LtsOps),
%% Commit payloads:
lists:foreach(
fun({Key, Val}) ->
ok = rocksdb:batch_put(Batch, DataCF, Key, term_to_binary(Val))
end,
Payloads
), ),
Result = rocksdb:write_batch(DB, Batch, []), Result = rocksdb:write_batch(DB, Batch, []),
rocksdb:release_batch(Batch), rocksdb:release_batch(Batch),
ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}),
%% NOTE %% NOTE
%% Strictly speaking, `{error, incomplete}` is a valid result but should be impossible to %% Strictly speaking, `{error, incomplete}` is a valid result but should be impossible to
%% observe until there's `{no_slowdown, true}` in write options. %% observe until there's `{no_slowdown, true}` in write options.
@ -348,13 +419,39 @@ update_iterator(
) -> ) ->
{ok, OldIter#{?last_seen_key => DSKey}}. {ok, OldIter#{?last_seen_key => DSKey}}.
next(Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> next(
%% Compute safe cutoff time. Shard,
%% It's the point in time where the last complete epoch ends, so we need to know Schema = #s{ts_offset = TSOffset, ts_bits = TSBits},
%% the current time to compute it. It = #{?storage_key := Stream},
BatchSize,
Now
) ->
init_counters(), init_counters(),
Now = emqx_ds:timestamp_us(), %% Compute safe cutoff time. It's the point in time where the last
SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, %% complete epoch ends, so we need to know the current time to
%% compute it. This is needed because new keys can be added before
%% the iterator.
IsWildcard =
case Stream of
{_StaticKey, []} -> false;
_ -> true
end,
SafeCutoffTime =
case IsWildcard of
true ->
(Now bsr TSOffset) bsl TSOffset;
false ->
%% Iterators scanning streams without varying topic
%% levels can operate on incomplete epochs, since new
%% matching keys for the single topic are added in
%% lexicographic order.
%%
%% Note: this DOES NOT apply to non-wildcard topic
%% filters operating on streams with varying parts:
%% iterator can jump to the next topic and then it
%% won't backtrack.
1 bsl TSBits - 1
end,
try try
next_until(Schema, It, SafeCutoffTime, BatchSize) next_until(Schema, It, SafeCutoffTime, BatchSize)
after after
@ -386,12 +483,11 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime,
rocksdb:iterator_close(ITHandle) rocksdb:iterator_close(ITHandle)
end. end.
delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) -> delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize, Now) ->
%% Compute safe cutoff time. %% Compute safe cutoff time.
%% It's the point in time where the last complete epoch ends, so we need to know %% It's the point in time where the last complete epoch ends, so we need to know
%% the current time to compute it. %% the current time to compute it.
init_counters(), init_counters(),
Now = emqx_message:timestamp_now(),
SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
try try
delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize) delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize)
@ -441,6 +537,24 @@ delete_next_until(
rocksdb:iterator_close(ITHandle) rocksdb:iterator_close(ITHandle)
end. end.
handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
case ets:lookup(Gvars, ?IDLE_DETECT) of
[{?IDLE_DETECT, Latch, LastWrittenTs}] ->
ok;
[] ->
Latch = false,
LastWrittenTs = 0
end,
case Latch of
false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) ->
ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}),
[dummy_event];
_ ->
[]
end;
handle_event(_ShardId, _Data, _Time, _Event) ->
[].
%%================================================================================ %%================================================================================
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================
@ -722,9 +836,6 @@ value_v1_to_message({Id, Qos, From, Flags, Headers, Topic, Payload, Timestamp, E
extra = Extra extra = Extra
}. }.
serialize(Msg) ->
term_to_binary(message_to_value_v1(Msg)).
deserialize(Blob) -> deserialize(Blob) ->
value_v1_to_message(binary_to_term(Blob)). value_v1_to_message(binary_to_term(Blob)).
@ -752,7 +863,8 @@ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
-spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie(). -spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie().
restore_trie(TopicIndexBytes, DB, CF) -> restore_trie(TopicIndexBytes, DB, CF) ->
PersistCallback = fun(Key, Val) -> PersistCallback = fun(Key, Val) ->
rocksdb:put(DB, CF, term_to_binary(Key), term_to_binary(Val), []) push_lts_persist_op(Key, Val),
ok
end, end,
{ok, IT} = rocksdb:iterator(DB, CF, []), {ok, IT} = rocksdb:iterator(DB, CF, []),
try try
@ -800,8 +912,29 @@ data_cf(GenId) ->
trie_cf(GenId) -> trie_cf(GenId) ->
"emqx_ds_storage_bitfield_lts_trie" ++ integer_to_list(GenId). "emqx_ds_storage_bitfield_lts_trie" ++ integer_to_list(GenId).
-spec push_lts_persist_op(_Key, _Val) -> ok.
push_lts_persist_op(Key, Val) ->
case erlang:get(?lts_persist_ops) of
undefined ->
erlang:put(?lts_persist_ops, [{Key, Val}]);
L when is_list(L) ->
erlang:put(?lts_persist_ops, [{Key, Val} | L])
end.
-spec pop_lts_persist_ops() -> [{_Key, _Val}].
pop_lts_persist_ops() ->
case erlang:erase(?lts_persist_ops) of
undefined ->
[];
L when is_list(L) ->
L
end.
-ifdef(TEST). -ifdef(TEST).
serialize(Msg) ->
term_to_binary(message_to_value_v1(Msg)).
serialize_deserialize_test() -> serialize_deserialize_test() ->
Msg = #message{ Msg = #message{
id = <<"message_id_val">>, id = <<"message_id_val">>,

View File

@ -26,13 +26,16 @@
%% Data %% Data
store_batch/3, store_batch/3,
prepare_batch/3,
commit_batch/2,
get_streams/3, get_streams/3,
get_delete_streams/3, get_delete_streams/3,
make_iterator/4, make_iterator/4,
make_delete_iterator/4, make_delete_iterator/4,
update_iterator/3, update_iterator/3,
next/3, next/4,
delete_next/4, delete_next/5,
%% Generations %% Generations
update_config/3, update_config/3,
@ -42,7 +45,10 @@
%% Snapshotting %% Snapshotting
take_snapshot/1, take_snapshot/1,
accept_snapshot/1 accept_snapshot/1,
%% Custom events
handle_event/3
]). ]).
%% gen_server %% gen_server
@ -63,7 +69,8 @@
shard_id/0, shard_id/0,
options/0, options/0,
prototype/0, prototype/0,
post_creation_context/0 post_creation_context/0,
cooked_batch/0
]). ]).
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -79,11 +86,11 @@
%% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending %% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending
%% records over the wire. %% records over the wire.
%% tags: %% tags:
-define(STREAM, 1). -define(STREAM, 1).
-define(IT, 2). -define(IT, 2).
-define(DELETE_IT, 3). -define(DELETE_IT, 3).
-define(COOKED_BATCH, 4).
%% keys: %% keys:
-define(tag, 1). -define(tag, 1).
@ -130,6 +137,13 @@
?enc := term() ?enc := term()
}. }.
-opaque cooked_batch() ::
#{
?tag := ?COOKED_BATCH,
?generation := gen_id(),
?enc := term()
}.
%%%% Generation: %%%% Generation:
-define(GEN_KEY(GEN_ID), {generation, GEN_ID}). -define(GEN_KEY(GEN_ID), {generation, GEN_ID}).
@ -201,16 +215,23 @@
-callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
_Data. _Data.
%% Delete the schema and data
-callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) -> -callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) ->
ok | {error, _Reason}. ok | {error, _Reason}.
-callback store_batch( -callback prepare_batch(
shard_id(), shard_id(),
_Data, _Data,
[{emqx_ds:time(), emqx_types:message()}], [{emqx_ds:time(), emqx_types:message()}, ...],
emqx_ds:message_store_opts() emqx_ds:message_store_opts()
) -> ) ->
emqx_ds:store_batch_result(). {ok, term()} | emqx_ds:error(_).
-callback commit_batch(
shard_id(),
_Data,
_CookedBatch
) -> ok | emqx_ds:error(_).
-callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> -callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) ->
[_Stream]. [_Stream].
@ -223,12 +244,19 @@
) -> ) ->
emqx_ds:make_delete_iterator_result(_Iterator). emqx_ds:make_delete_iterator_result(_Iterator).
-callback next(shard_id(), _Data, Iter, pos_integer()) -> -callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time()) ->
{ok, Iter, [emqx_types:message()]} | {error, _}. {ok, Iter, [emqx_types:message()]} | {error, _}.
-callback delete_next(
shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time()
) ->
{ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}.
-callback handle_event(shard_id(), _Data, emqx_ds:time(), CustomEvent | tick) -> [CustomEvent].
-callback post_creation_actions(post_creation_context()) -> _Data. -callback post_creation_actions(post_creation_context()) -> _Data.
-optional_callbacks([post_creation_actions/1]). -optional_callbacks([post_creation_actions/1, handle_event/4]).
%%================================================================================ %%================================================================================
%% API for the replication layer %% API for the replication layer
@ -251,20 +279,54 @@ drop_shard(Shard) ->
emqx_ds:message_store_opts() emqx_ds:message_store_opts()
) -> ) ->
emqx_ds:store_batch_result(). emqx_ds:store_batch_result().
store_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> store_batch(Shard, Messages, Options) ->
%% NOTE
%% We assume that batches do not span generations. Callers should enforce this.
?tp(emqx_ds_storage_layer_store_batch, #{ ?tp(emqx_ds_storage_layer_store_batch, #{
shard => Shard, messages => Messages, options => Options shard => Shard, messages => Messages, options => Options
}), }),
#{module := Mod, data := GenData} = generation_at(Shard, Time), case prepare_batch(Shard, Messages, Options) of
{ok, CookedBatch} ->
commit_batch(Shard, CookedBatch);
ignore ->
ok;
Error = {error, _, _} ->
Error
end.
-spec prepare_batch(
shard_id(),
[{emqx_ds:time(), emqx_types:message()}],
emqx_ds:message_store_opts()
) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_).
prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) ->
%% NOTE
%% We assume that batches do not span generations. Callers should enforce this.
?tp(emqx_ds_storage_layer_prepare_batch, #{
shard => Shard, messages => Messages, options => Options
}),
{GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time),
T0 = erlang:monotonic_time(microsecond), T0 = erlang:monotonic_time(microsecond),
Result = Mod:store_batch(Shard, GenData, Messages, Options), Result =
case Mod:prepare_batch(Shard, GenData, Messages, Options) of
{ok, CookedBatch} ->
{ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}};
Error = {error, _, _} ->
Error
end,
T1 = erlang:monotonic_time(microsecond), T1 = erlang:monotonic_time(microsecond),
%% TODO store->prepare
emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
Result; Result;
store_batch(_Shard, [], _Options) -> prepare_batch(_Shard, [], _Options) ->
ok. ignore.
-spec commit_batch(shard_id(), cooked_batch()) -> emqx_ds:store_batch_result().
commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}) ->
#{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard),
T0 = erlang:monotonic_time(microsecond),
Result = Mod:commit_batch(Shard, GenData, CookedBatch),
T1 = erlang:monotonic_time(microsecond),
emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
Result.
-spec get_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> -spec get_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) ->
[{integer(), stream()}]. [{integer(), stream()}].
@ -277,6 +339,13 @@ get_streams(Shard, TopicFilter, StartTime) ->
case generation_get(Shard, GenId) of case generation_get(Shard, GenId) of
#{module := Mod, data := GenData} -> #{module := Mod, data := GenData} ->
Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
?tp(get_streams_get_gen_topic, #{
gen_id => GenId,
topic => TopicFilter,
start_time => StartTime,
streams => Streams,
gen_data => GenData
}),
[ [
{GenId, ?stream_v2(GenId, InnerStream)} {GenId, ?stream_v2(GenId, InnerStream)}
|| InnerStream <- Streams || InnerStream <- Streams
@ -377,13 +446,13 @@ update_iterator(
{error, unrecoverable, generation_not_found} {error, unrecoverable, generation_not_found}
end. end.
-spec next(shard_id(), iterator(), pos_integer()) -> -spec next(shard_id(), iterator(), pos_integer(), emqx_ds:time()) ->
emqx_ds:next_result(iterator()). emqx_ds:next_result(iterator()).
next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize, Now) ->
case generation_get(Shard, GenId) of case generation_get(Shard, GenId) of
#{module := Mod, data := GenData} -> #{module := Mod, data := GenData} ->
Current = generation_current(Shard), Current = generation_current(Shard),
case Mod:next(Shard, GenData, GenIter0, BatchSize) of case Mod:next(Shard, GenData, GenIter0, BatchSize, Now) of
{ok, _GenIter, []} when GenId < Current -> {ok, _GenIter, []} when GenId < Current ->
%% This is a past generation. Storage layer won't write %% This is a past generation. Storage layer won't write
%% any more messages here. The iterator reached the end: %% any more messages here. The iterator reached the end:
@ -399,18 +468,21 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch
{error, unrecoverable, generation_not_found} {error, unrecoverable, generation_not_found}
end. end.
-spec delete_next(shard_id(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) -> -spec delete_next(
shard_id(), delete_iterator(), emqx_ds:delete_selector(), pos_integer(), emqx_ds:time()
) ->
emqx_ds:delete_next_result(delete_iterator()). emqx_ds:delete_next_result(delete_iterator()).
delete_next( delete_next(
Shard, Shard,
Iter = #{?tag := ?DELETE_IT, ?generation := GenId, ?enc := GenIter0}, Iter = #{?tag := ?DELETE_IT, ?generation := GenId, ?enc := GenIter0},
Selector, Selector,
BatchSize BatchSize,
Now
) -> ) ->
case generation_get(Shard, GenId) of case generation_get(Shard, GenId) of
#{module := Mod, data := GenData} -> #{module := Mod, data := GenData} ->
Current = generation_current(Shard), Current = generation_current(Shard),
case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize) of case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize, Now) of
{ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current -> {ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current ->
%% This is a past generation. Storage layer won't write %% This is a past generation. Storage layer won't write
%% any more messages here. The iterator reached the end: %% any more messages here. The iterator reached the end:
@ -849,6 +921,24 @@ handle_accept_snapshot(ShardId) ->
Dir = db_dir(ShardId), Dir = db_dir(ShardId),
emqx_ds_storage_snapshot:new_writer(Dir). emqx_ds_storage_snapshot:new_writer(Dir).
%% FIXME: currently this interface is a hack to handle safe cutoff
%% timestamp in LTS. It has many shortcomings (can lead to infinite
%% loops if the CBM is not careful; events from one generation may be
%% sent to the next one, etc.) and the API is not well thought out in
%% general.
%%
%% The mechanism of storage layer events should be refined later.
-spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent].
handle_event(Shard, Time, Event) ->
{_GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time),
?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}),
case erlang:function_exported(Mod, handle_event, 4) of
true ->
Mod:handle_event(Shard, GenData, Time, Event);
false ->
[]
end.
%%-------------------------------------------------------------------------------- %%--------------------------------------------------------------------------------
%% Schema access %% Schema access
%%-------------------------------------------------------------------------------- %%--------------------------------------------------------------------------------
@ -881,7 +971,7 @@ generations_since(Shard, Since) ->
Schema Schema
). ).
-spec generation_at(shard_id(), emqx_ds:time()) -> generation(). -spec generation_at(shard_id(), emqx_ds:time()) -> {gen_id(), generation()}.
generation_at(Shard, Time) -> generation_at(Shard, Time) ->
Schema = #{current_generation := Current} = get_schema_runtime(Shard), Schema = #{current_generation := Current} = get_schema_runtime(Shard),
generation_at(Time, Current, Schema). generation_at(Time, Current, Schema).
@ -892,7 +982,7 @@ generation_at(Time, GenId, Schema) ->
#{since := Since} when Time < Since andalso GenId > 0 -> #{since := Since} when Time < Since andalso GenId > 0 ->
generation_at(Time, prev_generation_id(GenId), Schema); generation_at(Time, prev_generation_id(GenId), Schema);
_ -> _ ->
Gen {GenId, Gen}
end. end.
-define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}). -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}).

View File

@ -31,14 +31,15 @@
create/4, create/4,
open/5, open/5,
drop/5, drop/5,
store_batch/4, prepare_batch/4,
commit_batch/3,
get_streams/4, get_streams/4,
get_delete_streams/4, get_delete_streams/4,
make_iterator/5, make_iterator/5,
make_delete_iterator/5, make_delete_iterator/5,
update_iterator/4, update_iterator/4,
next/4, next/5,
delete_next/5 delete_next/6
]). ]).
%% internal exports: %% internal exports:
@ -101,12 +102,14 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
ok = rocksdb:drop_column_family(DBHandle, CFHandle), ok = rocksdb:drop_column_family(DBHandle, CFHandle),
ok. ok.
store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := true}) -> prepare_batch(_ShardId, _Data, Messages, _Options) ->
{ok, Messages}.
commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) ->
{ok, Batch} = rocksdb:batch(), {ok, Batch} = rocksdb:batch(),
lists:foreach( lists:foreach(
fun(Msg) -> fun({TS, Msg}) ->
Id = erlang:unique_integer([monotonic]), Key = <<TS:64>>,
Key = <<Id:64>>,
Val = term_to_binary(Msg), Val = term_to_binary(Msg),
rocksdb:batch_put(Batch, CF, Key, Val) rocksdb:batch_put(Batch, CF, Key, Val)
end, end,
@ -114,16 +117,7 @@ store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := tru
), ),
Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []), Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []),
rocksdb:release_batch(Batch), rocksdb:release_batch(Batch),
Res; Res.
store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
lists:foreach(
fun({Timestamp, Msg}) ->
Key = <<Timestamp:64>>,
Val = term_to_binary(Msg),
rocksdb:put(DB, CF, Key, Val, [])
end,
Messages
).
get_streams(_Shard, _Data, _TopicFilter, _StartTime) -> get_streams(_Shard, _Data, _TopicFilter, _StartTime) ->
[#stream{}]. [#stream{}].
@ -154,7 +148,7 @@ update_iterator(_Shard, _Data, OldIter, DSKey) ->
last_seen_message_key = DSKey last_seen_message_key = DSKey
}}. }}.
next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) ->
#it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0, #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0,
{ok, ITHandle} = rocksdb:iterator(DB, CF, []), {ok, ITHandle} = rocksdb:iterator(DB, CF, []),
Action = Action =
@ -170,7 +164,7 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) ->
It = It0#it{last_seen_message_key = Key}, It = It0#it{last_seen_message_key = Key},
{ok, It, lists:reverse(Messages)}. {ok, It, lists:reverse(Messages)}.
delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize) -> delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) ->
#delete_it{ #delete_it{
topic_filter = TopicFilter, topic_filter = TopicFilter,
start_time = StartTime, start_time = StartTime,

View File

@ -21,10 +21,14 @@
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl"). -include_lib("stdlib/include/assert.hrl").
-include_lib("snabbkaffe/include/test_macros.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(DB, testdb). -define(DB, testdb).
-define(ON(NODE, BODY),
erpc:call(NODE, erlang, apply, [fun() -> BODY end, []])
).
opts() -> opts() ->
opts(#{}). opts(#{}).
@ -32,12 +36,13 @@ opts(Overrides) ->
maps:merge( maps:merge(
#{ #{
backend => builtin, backend => builtin,
storage => {emqx_ds_storage_bitfield_lts, #{}}, %% storage => {emqx_ds_storage_reference, #{}},
storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 10}},
n_shards => 16, n_shards => 16,
n_sites => 1, n_sites => 1,
replication_factor => 3, replication_factor => 3,
replication_options => #{ replication_options => #{
wal_max_size_bytes => 64 * 1024, wal_max_size_bytes => 64,
wal_max_batch_size => 1024, wal_max_batch_size => 1024,
snapshot_interval => 128 snapshot_interval => 128
} }
@ -67,64 +72,61 @@ t_replication_transfers_snapshots('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)). ok = emqx_cth_cluster:stop(?config(nodes, Config)).
t_replication_transfers_snapshots(Config) -> t_replication_transfers_snapshots(Config) ->
NMsgs = 4000, NMsgs = 400,
NClients = 5,
{Stream, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages(
?FUNCTION_NAME, NClients, NMsgs
),
Nodes = [Node, NodeOffline | _] = ?config(nodes, Config), Nodes = [Node, NodeOffline | _] = ?config(nodes, Config),
_Specs = [_, SpecOffline | _] = ?config(specs, Config), _Specs = [_, SpecOffline | _] = ?config(specs, Config),
?check_trace(
begin
%% Initialize DB on all nodes and wait for it to be online.
Opts = opts(#{n_shards => 1, n_sites => 3}),
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
),
?retry(
500,
10,
?assertMatch([[_], [_], [_]], [shards_online(N, ?DB) || N <- Nodes])
),
%% Initialize DB on all nodes and wait for it to be online. %% Stop the DB on the "offline" node.
Opts = opts(#{n_shards => 1, n_sites => 3}), ok = emqx_cth_cluster:stop_node(NodeOffline),
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
),
?retry(
500,
10,
?assertMatch([[_], [_], [_]], [shards_online(N, ?DB) || N <- Nodes])
),
%% Stop the DB on the "offline" node. %% Fill the storage with messages and few additional generations.
ok = emqx_cth_cluster:stop_node(NodeOffline), emqx_ds_test_helpers:apply_stream(?DB, Nodes -- [NodeOffline], Stream),
%% Fill the storage with messages and few additional generations. %% Restart the node.
Messages = fill_storage(Node, ?DB, NMsgs, #{p_addgen => 0.01}), [NodeOffline] = emqx_cth_cluster:restart(SpecOffline),
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{
?snk_kind := dsrepl_snapshot_accepted,
?snk_meta := #{node := NodeOffline}
})
),
?assertEqual(
ok,
erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()])
),
%% Restart the node. %% Trigger storage operation and wait the replica to be restored.
[NodeOffline] = emqx_cth_cluster:restart(SpecOffline), _ = add_generation(Node, ?DB),
{ok, SRef} = snabbkaffe:subscribe( ?assertMatch(
?match_event(#{ {ok, _},
?snk_kind := dsrepl_snapshot_accepted, snabbkaffe:receive_events(SRef)
?snk_meta := #{node := NodeOffline} ),
})
),
?assertEqual(
ok,
erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()])
),
%% Trigger storage operation and wait the replica to be restored. %% Wait until any pending replication activities are finished (e.g. Raft log entries).
_ = add_generation(Node, ?DB), ok = timer:sleep(3_000),
?assertMatch(
{ok, _},
snabbkaffe:receive_events(SRef)
),
%% Wait until any pending replication activities are finished (e.g. Raft log entries). %% Check that the DB has been restored:
ok = timer:sleep(3_000), emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)
end,
%% Check that the DB has been restored. []
Shard = hd(shards(NodeOffline, ?DB)),
MessagesOffline = lists:keysort(
#message.timestamp,
consume_shard(NodeOffline, ?DB, Shard, ['#'], 0)
),
?assertEqual(
sample(40, Messages),
sample(40, MessagesOffline)
),
?assertEqual(
Messages,
MessagesOffline
). ).
t_rebalance(init, Config) -> t_rebalance(init, Config) ->
@ -142,112 +144,120 @@ t_rebalance(init, Config) ->
t_rebalance('end', Config) -> t_rebalance('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)). ok = emqx_cth_cluster:stop(?config(nodes, Config)).
%% This testcase verifies that the storage rebalancing works correctly:
%% 1. Join/leave operations are applied successfully.
%% 2. Message data survives the rebalancing.
%% 3. Shard cluster membership converges to the target replica allocation.
%% 4. Replication factor is respected.
t_rebalance(Config) -> t_rebalance(Config) ->
%% This testcase verifies that the storage rebalancing works correctly: NMsgs = 50,
%% 1. Join/leave operations are applied successfully.
%% 2. Message data survives the rebalancing.
%% 3. Shard cluster membership converges to the target replica allocation.
%% 4. Replication factor is respected.
NMsgs = 800,
NClients = 5, NClients = 5,
Nodes = [N1, N2, N3, N4] = ?config(nodes, Config), {Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages(
?FUNCTION_NAME, NClients, NMsgs
%% Initialize DB on the first node.
Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])),
?assertMatch(
Shards when length(Shards) == 16,
shards_online(N1, ?DB)
), ),
Nodes = [N1, N2 | _] = ?config(nodes, Config),
?check_trace(
#{timetrap => 30_000},
begin
%% 1. Initialize DB on the first node.
Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
?assertEqual(ok, ?ON(N1, emqx_ds:open_db(?DB, Opts))),
?assertMatch(Shards when length(Shards) == 16, shards_online(N1, ?DB)),
%% Open DB on the rest of the nodes. %% 1.1 Open DB on the rest of the nodes:
?assertEqual( [
[{ok, ok} || _ <- [N2, N3, N4]], ?assertEqual(ok, ?ON(Node, emqx_ds:open_db(?DB, Opts)))
erpc:multicall([N2, N3, N4], emqx_ds, open_db, [?DB, Opts]) || Node <- Nodes
), ],
Sites = [S1, S2 | _Rest] = [ds_repl_meta(N, this_site) || N <- Nodes], Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes],
ct:pal("Sites: ~p~n", [Sites]), ct:pal("Sites: ~p~n", [Sites]),
%% Only N1 should be responsible for all shards initially. Sequence = [
?assertEqual( %% Join the second site to the DB replication sites:
[[S1] || _ <- Nodes], {N1, join_db_site, S2},
[ds_repl_meta(N, db_sites, [?DB]) || N <- Nodes] %% Should be a no-op:
), {N2, join_db_site, S2},
%% Now join the rest of the sites:
{N2, assign_db_sites, Sites}
],
Stream1 = emqx_utils_stream:interleave(
[
{50, Stream0},
emqx_utils_stream:const(add_generation)
],
false
),
Stream = emqx_utils_stream:interleave(
[
{50, Stream0},
emqx_utils_stream:list(Sequence)
],
true
),
%% Fill the storage with messages and few additional generations. %% 1.2 Verify that all nodes have the same view of metadata storage:
%% This will force shards to trigger snapshot transfers during rebalance. [
ClientMessages = emqx_utils:pmap( ?defer_assert(
fun(CID) -> ?assertEqual(
N = lists:nth(1 + (CID rem length(Nodes)), Nodes), [S1],
fill_storage(N, ?DB, NMsgs, #{client_id => integer_to_binary(CID)}) ?ON(Node, emqx_ds_replication_layer_meta:db_sites(?DB)),
#{
msg => "Initially, only S1 should be responsible for all shards",
node => Node
}
)
)
|| Node <- Nodes
],
%% 2. Start filling the storage:
emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream),
timer:sleep(5000),
emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams),
[
?defer_assert(
?assertEqual(
16 * 3 div length(Nodes),
n_shards_online(Node, ?DB),
"Each node is now responsible for 3/4 of the shards"
)
)
|| Node <- Nodes
],
%% Verify that the set of shard servers matches the target allocation.
Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes],
ShardServers = [
shard_server_info(N, ?DB, Shard, Site, readiness)
|| {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation),
Shard <- Shards
],
?assert(
lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers),
ShardServers
),
%% Scale down the cluster by removing the first node.
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
ct:pal("Transitions (~p -> ~p): ~p~n", [
Sites, tl(Sites), emqx_ds_test_helpers:transitions(N1, ?DB)
]),
?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N2, ?DB))),
%% Verify that at the end each node is now responsible for each shard.
?defer_assert(
?assertEqual(
[0, 16, 16, 16],
[n_shards_online(N, ?DB) || N <- Nodes]
)
),
%% Verify that the messages are once again preserved after the rebalance:
emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)
end, end,
lists:seq(1, NClients), []
infinity ).
),
Messages1 = lists:sort(fun compare_message/2, lists:append(ClientMessages)),
%% Join the second site to the DB replication sites.
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])),
%% Should be no-op.
?assertEqual(ok, ds_repl_meta(N2, join_db_site, [?DB, S2])),
ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]),
%% Fill in some more messages *during* the rebalance.
MessagesRB1 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB1">>}),
?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))),
%% Now join the rest of the sites.
?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])),
ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]),
%% Fill in some more messages *during* the rebalance.
MessagesRB2 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB2">>}),
?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))),
%% Verify that each node is now responsible for 3/4 of the shards.
?assertEqual(
[(16 * 3) div length(Nodes) || _ <- Nodes],
[n_shards_online(N, ?DB) || N <- Nodes]
),
%% Verify that the set of shard servers matches the target allocation.
Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes],
ShardServers = [
shard_server_info(N, ?DB, Shard, Site, readiness)
|| {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation),
Shard <- Shards
],
?assert(
lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers),
ShardServers
),
%% Verify that the messages are preserved after the rebalance.
Messages = Messages1 ++ MessagesRB1 ++ MessagesRB2,
MessagesN4 = lists:sort(fun compare_message/2, consume(N4, ?DB, ['#'], 0)),
?assertEqual(sample(20, Messages), sample(20, MessagesN4)),
?assertEqual(Messages, MessagesN4),
%% Scale down the cluster by removing the first node.
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]),
?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))),
%% Verify that each node is now responsible for each shard.
?assertEqual(
[0, 16, 16, 16],
[n_shards_online(N, ?DB) || N <- Nodes]
),
%% Verify that the messages are once again preserved after the rebalance.
MessagesN3 = lists:sort(fun compare_message/2, consume(N3, ?DB, ['#'], 0)),
?assertEqual(sample(20, Messages), sample(20, MessagesN3)),
?assertEqual(Messages, MessagesN3).
t_join_leave_errors(init, Config) -> t_join_leave_errors(init, Config) ->
Apps = [appspec(emqx_durable_storage)], Apps = [appspec(emqx_durable_storage)],
@ -293,7 +303,7 @@ t_join_leave_errors(Config) ->
%% Should be no-op. %% Should be no-op.
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])), ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])),
?assertEqual([], transitions(N1, ?DB)), ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)),
%% Impossible to leave the last site. %% Impossible to leave the last site.
?assertEqual( ?assertEqual(
@ -304,12 +314,12 @@ t_join_leave_errors(Config) ->
%% "Move" the DB to the other node. %% "Move" the DB to the other node.
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])), ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])),
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), ?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertMatch([_ | _], transitions(N1, ?DB)), ?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?DB)),
?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), ?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
%% Should be no-op. %% Should be no-op.
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), ?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertEqual([], transitions(N1, ?DB)). ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)).
t_rebalance_chaotic_converges(init, Config) -> t_rebalance_chaotic_converges(init, Config) ->
Apps = [appspec(emqx_durable_storage)], Apps = [appspec(emqx_durable_storage)],
@ -333,78 +343,79 @@ t_rebalance_chaotic_converges(Config) ->
NMsgs = 500, NMsgs = 500,
Nodes = [N1, N2, N3] = ?config(nodes, Config), Nodes = [N1, N2, N3] = ?config(nodes, Config),
%% Initialize DB on first two nodes. NClients = 5,
Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}), {Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages(
?assertEqual( ?FUNCTION_NAME, NClients, NMsgs
[{ok, ok}, {ok, ok}],
erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts])
), ),
%% Open DB on the last node. ?check_trace(
?assertEqual( #{},
ok, begin
erpc:call(N3, emqx_ds, open_db, [?DB, Opts]) %% Initialize DB on first two nodes.
), Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}),
%% Find out which sites there are. ?assertEqual(
Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes], [{ok, ok}, {ok, ok}],
ct:pal("Sites: ~p~n", [Sites]), erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts])
),
%% Initially, the DB is assigned to [S1, S2]. %% Open DB on the last node.
?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])), ?assertEqual(
?assertEqual( ok,
lists:sort([S1, S2]), erpc:call(N3, emqx_ds, open_db, [?DB, Opts])
ds_repl_meta(N1, db_sites, [?DB]) ),
),
%% Fill the storage with messages and few additional generations. %% Find out which sites there are.
Messages0 = lists:append([ Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes],
fill_storage(N1, ?DB, NMsgs, #{client_id => <<"C1">>}), ct:pal("Sites: ~p~n", [Sites]),
fill_storage(N2, ?DB, NMsgs, #{client_id => <<"C2">>}),
fill_storage(N3, ?DB, NMsgs, #{client_id => <<"C3">>})
]),
%% Construct a chaotic transition sequence that changes assignment to [S2, S3]. Sequence = [
Sequence = [ {N1, join_db_site, S3},
{N1, join_db_site, S3}, {N2, leave_db_site, S2},
{N2, leave_db_site, S2}, {N3, leave_db_site, S1},
{N3, leave_db_site, S1}, {N1, join_db_site, S2},
{N1, join_db_site, S2}, {N2, join_db_site, S1},
{N2, join_db_site, S1}, {N3, leave_db_site, S3},
{N3, leave_db_site, S3}, {N1, leave_db_site, S1},
{N1, leave_db_site, S1}, {N2, join_db_site, S3}
{N2, join_db_site, S3} ],
],
%% Apply the sequence while also filling the storage with messages. %% Interleaved list of events:
TransitionMessages = lists:map( Stream = emqx_utils_stream:interleave(
fun({N, Operation, Site}) -> [
%% Apply the transition. {50, Stream0},
?assertEqual(ok, ds_repl_meta(N, Operation, [?DB, Site])), emqx_utils_stream:list(Sequence)
%% Give some time for at least one transition to complete. ],
Transitions = transitions(N, ?DB), true
ct:pal("Transitions after ~p: ~p", [Operation, Transitions]), ),
?retry(200, 10, ?assertNotEqual(Transitions, transitions(N, ?DB))),
%% Fill the storage with messages. ?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])),
CID = integer_to_binary(erlang:system_time()), ?assertEqual(
fill_storage(N, ?DB, NMsgs, #{client_id => CID}) lists:sort([S1, S2]),
ds_repl_meta(N1, db_sites, [?DB]),
"Initially, the DB is assigned to [S1, S2]"
),
emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream),
%% Wait for the last transition to complete.
?retry(500, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
?defer_assert(
?assertEqual(
lists:sort([S2, S3]),
ds_repl_meta(N1, db_sites, [?DB])
)
),
%% Wait until the LTS timestamp is updated:
timer:sleep(5000),
%% Check that all messages are still there.
emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams)
end, end,
Sequence []
), ).
%% Wait for the last transition to complete.
?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))),
?assertEqual(
lists:sort([S2, S3]),
ds_repl_meta(N1, db_sites, [?DB])
),
%% Check that all messages are still there.
Messages = lists:append(TransitionMessages) ++ Messages0,
MessagesDB = lists:sort(fun compare_message/2, consume(N1, ?DB, ['#'], 0)),
?assertEqual(sample(20, Messages), sample(20, MessagesDB)),
?assertEqual(Messages, MessagesDB).
t_rebalance_offline_restarts(init, Config) -> t_rebalance_offline_restarts(init, Config) ->
Apps = [appspec(emqx_durable_storage)], Apps = [appspec(emqx_durable_storage)],
@ -447,7 +458,7 @@ t_rebalance_offline_restarts(Config) ->
%% Shut down N3 and then remove it from the DB. %% Shut down N3 and then remove it from the DB.
ok = emqx_cth_cluster:stop_node(N3), ok = emqx_cth_cluster:stop_node(N3),
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])), ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])),
Transitions = transitions(N1, ?DB), Transitions = emqx_ds_test_helpers:transitions(N1, ?DB),
ct:pal("Transitions: ~p~n", [Transitions]), ct:pal("Transitions: ~p~n", [Transitions]),
%% Wait until at least one transition completes. %% Wait until at least one transition completes.
@ -462,7 +473,7 @@ t_rebalance_offline_restarts(Config) ->
), ),
%% Target state should still be reached eventually. %% Target state should still be reached eventually.
?retry(1000, 20, ?assertEqual([], transitions(N1, ?DB))), ?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))),
?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])). ?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])).
%% %%
@ -478,15 +489,19 @@ ds_repl_meta(Node, Fun) ->
ds_repl_meta(Node, Fun, []). ds_repl_meta(Node, Fun, []).
ds_repl_meta(Node, Fun, Args) -> ds_repl_meta(Node, Fun, Args) ->
erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args). try
erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args)
catch
EC:Err:Stack ->
ct:pal("emqx_ds_replication_layer_meta:~p(~p) @~p failed:~n~p:~p~nStack: ~p", [
Fun, Args, Node, EC, Err, Stack
]),
error(meta_op_failed)
end.
ds_repl_shard(Node, Fun, Args) -> ds_repl_shard(Node, Fun, Args) ->
erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args). erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args).
transitions(Node, DB) ->
Shards = shards(Node, DB),
[{S, T} || S <- Shards, T <- ds_repl_meta(Node, replica_set_transitions, [DB, S])].
shards(Node, DB) -> shards(Node, DB) ->
erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]). erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).
@ -496,25 +511,6 @@ shards_online(Node, DB) ->
n_shards_online(Node, DB) -> n_shards_online(Node, DB) ->
length(shards_online(Node, DB)). length(shards_online(Node, DB)).
fill_storage(Node, DB, NMsgs, Opts) ->
fill_storage(Node, DB, NMsgs, 0, Opts).
fill_storage(Node, DB, NMsgs, I, Opts) when I < NMsgs ->
PAddGen = maps:get(p_addgen, Opts, 0.001),
R1 = push_message(Node, DB, I, Opts),
R2 = probably(PAddGen, fun() -> add_generation(Node, DB) end),
R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts);
fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) ->
[].
push_message(Node, DB, I, Opts) ->
Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]),
{Bytes, _} = rand:bytes_s(120, rand:seed_s(default, I)),
ClientId = maps:get(client_id, Opts, <<?MODULE_STRING>>),
Message = message(ClientId, Topic, Bytes, I * 100),
ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]),
[Message].
add_generation(Node, DB) -> add_generation(Node, DB) ->
ok = erpc:call(Node, emqx_ds, add_generation, [DB]), ok = erpc:call(Node, emqx_ds, add_generation, [DB]),
[]. [].
@ -545,9 +541,14 @@ probably(P, Fun) ->
sample(N, List) -> sample(N, List) ->
L = length(List), L = length(List),
H = N div 2, case L =< N of
Filler = integer_to_list(L - N) ++ " more", true ->
lists:sublist(List, H) ++ [Filler] ++ lists:sublist(List, L - H, L). L;
false ->
H = N div 2,
Filler = integer_to_list(L - N) ++ " more",
lists:sublist(List, H) ++ [Filler] ++ lists:sublist(List, L - H, L)
end.
%% %%

View File

@ -23,7 +23,7 @@
-include_lib("stdlib/include/assert.hrl"). -include_lib("stdlib/include/assert.hrl").
opts() -> opts() ->
#{storage => {emqx_ds_storage_bitfield_lts, #{}}}. #{storage => {emqx_ds_storage_reference, #{}}}.
%% %%

View File

@ -73,13 +73,15 @@ t_iterate(_Config) ->
begin begin
[{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0), [{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0),
{ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0), {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0),
{ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next(?SHARD, It, 100), {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next(
?SHARD, It, 100, emqx_ds:timestamp_us()
),
Messages = [Msg || {_DSKey, Msg} <- MessagesAndKeys], Messages = [Msg || {_DSKey, Msg} <- MessagesAndKeys],
?assertEqual( ?assertEqual(
lists:map(fun integer_to_binary/1, Timestamps), lists:map(fun integer_to_binary/1, Timestamps),
payloads(Messages) payloads(Messages)
), ),
{ok, _, []} = emqx_ds_storage_layer:next(?SHARD, NextIt, 100) {ok, _, []} = emqx_ds_storage_layer:next(?SHARD, NextIt, 100, emqx_ds:timestamp_us())
end end
|| Topic <- Topics || Topic <- Topics
], ],
@ -370,7 +372,7 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) ->
F(It, 0) -> F(It, 0) ->
error({too_many_iterations, It}); error({too_many_iterations, It});
F(It, N) -> F(It, N) ->
case emqx_ds_storage_layer:next(Shard, It, BatchSize) of case emqx_ds_storage_layer:next(Shard, It, BatchSize, emqx_ds:timestamp_us()) of
end_of_stream -> end_of_stream ->
[]; [];
{ok, _NextIt, []} -> {ok, _NextIt, []} ->
@ -542,7 +544,11 @@ delete(_Shard, [], _Selector) ->
delete(Shard, Iterators, Selector) -> delete(Shard, Iterators, Selector) ->
{NewIterators0, N} = lists:foldl( {NewIterators0, N} = lists:foldl(
fun(Iterator0, {AccIterators, NAcc}) -> fun(Iterator0, {AccIterators, NAcc}) ->
case emqx_ds_storage_layer:delete_next(Shard, Iterator0, Selector, 10) of case
emqx_ds_storage_layer:delete_next(
Shard, Iterator0, Selector, 10, emqx_ds:timestamp_us()
)
of
{ok, end_of_stream} -> {ok, end_of_stream} ->
{AccIterators, NAcc}; {AccIterators, NAcc};
{ok, _Iterator1, 0} -> {ok, _Iterator1, 0} ->
@ -573,7 +579,7 @@ replay(_Shard, []) ->
replay(Shard, Iterators) -> replay(Shard, Iterators) ->
{NewIterators0, Messages0} = lists:foldl( {NewIterators0, Messages0} = lists:foldl(
fun(Iterator0, {AccIterators, AccMessages}) -> fun(Iterator0, {AccIterators, AccMessages}) ->
case emqx_ds_storage_layer:next(Shard, Iterator0, 10) of case emqx_ds_storage_layer:next(Shard, Iterator0, 10, emqx_ds:timestamp_us()) of
{ok, end_of_stream} -> {ok, end_of_stream} ->
{AccIterators, AccMessages}; {AccIterators, AccMessages};
{ok, _Iterator1, []} -> {ok, _Iterator1, []} ->

View File

@ -18,6 +18,14 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("stdlib/include/assert.hrl").
-define(ON(NODE, BODY),
erpc:call(NODE, erlang, apply, [fun() -> BODY end, []])
).
%% RPC mocking %% RPC mocking
mock_rpc() -> mock_rpc() ->
@ -57,8 +65,221 @@ mock_rpc_result(gen_rpc, ExpectFun) ->
end end
end). end).
%% Consume data from the DS storage on a given node as a stream:
-type ds_stream() :: emqx_utils_stream:stream({emqx_ds:message_key(), emqx_types:message()}).
%% @doc Create an infinite list of messages from a given client:
interleaved_topic_messages(TestCase, NClients, NMsgs) ->
%% List of fake client IDs:
Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)],
TopicStreams = [
{ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(TestCase, ClientId))}
|| ClientId <- Clients
],
%% Interleaved stream of messages:
Stream = emqx_utils_stream:interleave(
[{2, Stream} || {_ClientId, Stream} <- TopicStreams], true
),
{Stream, TopicStreams}.
topic_messages(TestCase, ClientId) ->
topic_messages(TestCase, ClientId, 0).
topic_messages(TestCase, ClientId, N) ->
fun() ->
NBin = integer_to_binary(N),
Msg = #message{
from = ClientId,
topic = client_topic(TestCase, ClientId),
timestamp = N * 100,
payload = <<NBin/binary, " ">>
},
[Msg | topic_messages(TestCase, ClientId, N + 1)]
end.
client_topic(TestCase, ClientId) when is_atom(TestCase) ->
client_topic(atom_to_binary(TestCase, utf8), ClientId);
client_topic(TestCase, ClientId) when is_binary(TestCase) ->
<<TestCase/binary, "/", ClientId/binary>>.
ds_topic_generation_stream(DB, Node, Shard, Topic, Stream) ->
{ok, Iterator} = ?ON(
Node,
emqx_ds_storage_layer:make_iterator(Shard, Stream, Topic, 0)
),
do_ds_topic_generation_stream(DB, Node, Shard, Iterator).
do_ds_topic_generation_stream(DB, Node, Shard, It0) ->
fun() ->
case
?ON(
Node,
begin
Now = emqx_ds_replication_layer:current_timestamp(DB, Shard),
emqx_ds_storage_layer:next(Shard, It0, 1, Now)
end
)
of
{ok, _It, []} ->
[];
{ok, end_of_stream} ->
[];
{ok, It, [KeyMsg]} ->
[KeyMsg | do_ds_topic_generation_stream(DB, Node, Shard, It)]
end
end.
%% Payload generation:
apply_stream(DB, Nodes, Stream) ->
apply_stream(
DB,
emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)),
Stream,
0
).
apply_stream(DB, NodeStream0, Stream0, N) ->
case emqx_utils_stream:next(Stream0) of
[] ->
?tp(all_done, #{});
[Msg = #message{} | Stream] ->
[Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
?tp(
test_push_message,
maps:merge(
emqx_message:to_map(Msg),
#{n => N}
)
),
?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})),
apply_stream(DB, NodeStream, Stream, N + 1);
[add_generation | Stream] ->
%% FIXME:
[Node | NodeStream] = emqx_utils_stream:next(NodeStream0),
?ON(Node, emqx_ds:add_generation(DB)),
apply_stream(DB, NodeStream, Stream, N);
[{Node, Operation, Arg} | Stream] when
Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites
->
?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}),
%% Apply the transition.
?assertEqual(
ok,
?ON(
Node,
emqx_ds_replication_layer_meta:Operation(DB, Arg)
)
),
%% Give some time for at least one transition to complete.
Transitions = transitions(Node, DB),
ct:pal("Transitions after ~p: ~p", [Operation, Transitions]),
?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))),
apply_stream(DB, NodeStream0, Stream, N);
[Fun | Stream] when is_function(Fun) ->
Fun(),
apply_stream(DB, NodeStream0, Stream, N)
end.
transitions(Node, DB) ->
?ON(
Node,
begin
Shards = emqx_ds_replication_layer_meta:shards(DB),
[
{S, T}
|| S <- Shards, T <- emqx_ds_replication_layer_meta:replica_set_transitions(DB, S)
]
end
).
%% Stream comparison
message_eq(Msg1, {_Key, Msg2}) ->
%% Timestamps can be modified by the replication layer, ignore them:
Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}.
%% Consuming streams and iterators %% Consuming streams and iterators
-spec verify_stream_effects(atom(), binary(), [node()], [{emqx_types:clientid(), ds_stream()}]) ->
ok.
verify_stream_effects(DB, TestCase, Nodes0, L) ->
Checked = lists:flatmap(
fun({ClientId, Stream}) ->
Nodes = nodes_of_clientid(DB, ClientId, Nodes0),
ct:pal("Nodes allocated for client ~p: ~p", [ClientId, Nodes]),
?defer_assert(
?assertMatch([_ | _], Nodes, ["No nodes have been allocated for ", ClientId])
),
[verify_stream_effects(DB, TestCase, Node, ClientId, Stream) || Node <- Nodes]
end,
L
),
?defer_assert(?assertMatch([_ | _], Checked, "Some messages have been verified")).
-spec verify_stream_effects(atom(), binary(), node(), emqx_types:clientid(), ds_stream()) -> ok.
verify_stream_effects(DB, TestCase, Node, ClientId, ExpectedStream) ->
ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]),
DiffOpts = #{context => 20, window => 1000, compare_fun => fun message_eq/2},
?defer_assert(
begin
snabbkaffe_diff:assert_lists_eq(
ExpectedStream,
ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node),
DiffOpts
),
ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node])
end
).
%% Create a stream from the topic (wildcards are NOT supported for a
%% good reason: order of messages is implementation-dependent!).
%%
%% Note: stream produces messages with keys
-spec ds_topic_stream(atom(), binary(), binary(), node()) -> ds_stream().
ds_topic_stream(DB, ClientId, TopicBin, Node) ->
Topic = emqx_topic:words(TopicBin),
Shard = shard_of_clientid(DB, Node, ClientId),
{ShardId, DSStreams} =
?ON(
Node,
begin
DBShard = {DB, Shard},
{DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)}
end
),
%% Sort streams by their rank Y, and chain them together:
emqx_utils_stream:chain([
ds_topic_generation_stream(DB, Node, ShardId, Topic, S)
|| {_RankY, S} <- lists:sort(DSStreams)
]).
%% Find which nodes from the list contain the shards for the given
%% client ID:
nodes_of_clientid(DB, ClientId, Nodes = [N0 | _]) ->
Shard = shard_of_clientid(DB, N0, ClientId),
SiteNodes = ?ON(
N0,
begin
Sites = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
lists:map(fun emqx_ds_replication_layer_meta:node/1, Sites)
end
),
lists:filter(
fun(N) ->
lists:member(N, SiteNodes)
end,
Nodes
).
shard_of_clientid(DB, Node, ClientId) ->
?ON(
Node,
emqx_ds_replication_layer:shard_of_message(DB, #message{from = ClientId}, clientid)
).
%% Consume eagerly:
consume(DB, TopicFilter) -> consume(DB, TopicFilter) ->
consume(DB, TopicFilter, 0). consume(DB, TopicFilter, 0).
@ -85,8 +306,14 @@ consume_stream(DB, Stream, TopicFilter, StartTime) ->
consume_iter(DB, It) -> consume_iter(DB, It) ->
consume_iter(DB, It, #{}). consume_iter(DB, It, #{}).
consume_iter(DB, It, Opts) -> consume_iter(DB, It0, Opts) ->
consume_iter_with(fun emqx_ds:next/3, [DB], It, Opts). consume_iter_with(
fun(It, BatchSize) ->
emqx_ds:next(DB, It, BatchSize)
end,
It0,
Opts
).
storage_consume(ShardId, TopicFilter) -> storage_consume(ShardId, TopicFilter) ->
storage_consume(ShardId, TopicFilter, 0). storage_consume(ShardId, TopicFilter, 0).
@ -108,16 +335,22 @@ storage_consume_stream(ShardId, Stream, TopicFilter, StartTime) ->
storage_consume_iter(ShardId, It) -> storage_consume_iter(ShardId, It) ->
storage_consume_iter(ShardId, It, #{}). storage_consume_iter(ShardId, It, #{}).
storage_consume_iter(ShardId, It, Opts) -> storage_consume_iter(ShardId, It0, Opts) ->
consume_iter_with(fun emqx_ds_storage_layer:next/3, [ShardId], It, Opts). consume_iter_with(
fun(It, BatchSize) ->
emqx_ds_storage_layer:next(ShardId, It, BatchSize, emqx_ds:timestamp_us())
end,
It0,
Opts
).
consume_iter_with(NextFun, Args, It0, Opts) -> consume_iter_with(NextFun, It0, Opts) ->
BatchSize = maps:get(batch_size, Opts, 5), BatchSize = maps:get(batch_size, Opts, 5),
case erlang:apply(NextFun, Args ++ [It0, BatchSize]) of case NextFun(It0, BatchSize) of
{ok, It, _Msgs = []} -> {ok, It, _Msgs = []} ->
{ok, It, []}; {ok, It, []};
{ok, It1, Batch} -> {ok, It1, Batch} ->
{ok, It, Msgs} = consume_iter_with(NextFun, Args, It1, Opts), {ok, It, Msgs} = consume_iter_with(NextFun, It1, Opts),
{ok, It, [Msg || {_DSKey, Msg} <- Batch] ++ Msgs}; {ok, It, [Msg || {_DSKey, Msg} <- Batch] ++ Msgs};
{ok, Eos = end_of_stream} -> {ok, Eos = end_of_stream} ->
{ok, Eos, []}; {ok, Eos, []};

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_gateway_jt808, [ {application, emqx_gateway_jt808, [
{description, "JT/T 808 Gateway"}, {description, "JT/T 808 Gateway"},
{vsn, "0.0.2"}, {vsn, "0.0.3"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]}, {applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []}, {env, []},

View File

@ -230,9 +230,14 @@ handle_in(Frame = ?MSG(MType), Channel = #channel{conn_state = ConnState}) when
handle_in(Frame, Channel = #channel{conn_state = connected}) -> handle_in(Frame, Channel = #channel{conn_state = connected}) ->
?SLOG(debug, #{msg => "recv_frame", frame => Frame}), ?SLOG(debug, #{msg => "recv_frame", frame => Frame}),
do_handle_in(Frame, Channel); do_handle_in(Frame, Channel);
handle_in(Frame = ?MSG(MType), Channel) when
MType =:= ?MC_DEREGISTER
->
?SLOG(debug, #{msg => "recv_frame", frame => Frame, info => "jt808_client_deregister"}),
do_handle_in(Frame, Channel#channel{conn_state = disconnected});
handle_in(Frame, Channel) -> handle_in(Frame, Channel) ->
?SLOG(error, #{msg => "unexpected_frame", frame => Frame}), ?SLOG(error, #{msg => "unexpected_frame", frame => Frame}),
{stop, unexpected_frame, Channel}. {shutdown, unexpected_frame, Channel}.
%% @private %% @private
do_handle_in(Frame = ?MSG(?MC_GENERAL_RESPONSE), Channel = #channel{inflight = Inflight}) -> do_handle_in(Frame = ?MSG(?MC_GENERAL_RESPONSE), Channel = #channel{inflight = Inflight}) ->
@ -241,19 +246,24 @@ do_handle_in(Frame = ?MSG(?MC_GENERAL_RESPONSE), Channel = #channel{inflight = I
{ok, Channel#channel{inflight = NewInflight}}; {ok, Channel#channel{inflight = NewInflight}};
do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) -> do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) ->
#{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame, #{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame,
case emqx_jt808_auth:register(Frame, Channel0#channel.auth) of case
{ok, Authcode} -> emqx_utils:pipeline(
{ok, Conninfo} = enrich_conninfo(Frame, Channel0#channel{authcode = Authcode}), [
{ok, Channel} = enrich_clientinfo(Frame, Conninfo), fun enrich_clientinfo/2,
handle_out({?MS_REGISTER_ACK, 0}, MsgSn, Channel); fun enrich_conninfo/2,
{error, Reason} -> fun set_log_meta/2
?SLOG(error, #{msg => "register_failed", reason => Reason}), ],
ResCode = Frame,
case is_integer(Reason) of Channel0
true -> Reason; )
false -> 1 of
end, {ok, _NFrame, Channel} ->
handle_out({?MS_REGISTER_ACK, ResCode}, MsgSn, Channel0) case register_(Frame, Channel) of
{ok, NChannel} ->
handle_out({?MS_REGISTER_ACK, 0}, MsgSn, NChannel);
{error, ResCode} ->
handle_out({?MS_REGISTER_ACK, ResCode}, MsgSn, Channel)
end
end; end;
do_handle_in(Frame = ?MSG(?MC_AUTH), Channel0) -> do_handle_in(Frame = ?MSG(?MC_AUTH), Channel0) ->
#{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame, #{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame,
@ -311,7 +321,7 @@ do_handle_in(
{ok, Channel#channel{inflight = ack_msg(?MC_DRIVER_ID_REPORT, none, Inflight)}} {ok, Channel#channel{inflight = ack_msg(?MC_DRIVER_ID_REPORT, none, Inflight)}}
end; end;
do_handle_in(?MSG(?MC_DEREGISTER), Channel) -> do_handle_in(?MSG(?MC_DEREGISTER), Channel) ->
{stop, normal, Channel}; {shutdown, normal, Channel};
do_handle_in(Frame = #{}, Channel = #channel{up_topic = Topic, inflight = Inflight}) -> do_handle_in(Frame = #{}, Channel = #channel{up_topic = Topic, inflight = Inflight}) ->
{MsgId, MsgSn} = msgidsn(Frame), {MsgId, MsgSn} = msgidsn(Frame),
_ = do_publish(Topic, Frame), _ = do_publish(Topic, Frame),
@ -859,6 +869,20 @@ is_driver_id_req_exist(#channel{inflight = Inflight}) ->
Key = get_msg_ack(?MC_DRIVER_ID_REPORT, none), Key = get_msg_ack(?MC_DRIVER_ID_REPORT, none),
emqx_inflight:contain(Key, Inflight). emqx_inflight:contain(Key, Inflight).
register_(Frame, Channel0) ->
case emqx_jt808_auth:register(Frame, Channel0#channel.auth) of
{ok, Authcode} ->
{ok, Channel0#channel{authcode = Authcode}};
{error, Reason} ->
?SLOG(error, #{msg => "register_failed", reason => Reason}),
ResCode =
case is_integer(Reason) of
true -> Reason;
false -> 1
end,
{error, ResCode}
end.
authenticate(_AuthFrame, #channel{authcode = anonymous}) -> authenticate(_AuthFrame, #channel{authcode = anonymous}) ->
true; true;
authenticate(AuthFrame, #channel{authcode = undefined, auth = Auth}) -> authenticate(AuthFrame, #channel{authcode = undefined, auth = Auth}) ->

View File

@ -68,6 +68,22 @@ gateway.jt808 {
} }
">>). ">>).
%% erlfmt-ignore
-define(CONF_INVALID_AUTH_SERVER, <<"
gateway.jt808 {
listeners.tcp.default {
bind = ", ?PORT_STR, "
}
proto {
auth {
allow_anonymous = false
registry = \"abc://abc\"
authentication = \"abc://abc\"
}
}
}
">>).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -77,6 +93,9 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok. ok.
init_per_testcase(Case = t_case_invalid_auth_reg_server, Config) ->
Apps = boot_apps(Case, ?CONF_INVALID_AUTH_SERVER, Config),
[{suite_apps, Apps} | Config];
init_per_testcase(Case = t_case02_anonymous_register_and_auth, Config) -> init_per_testcase(Case = t_case02_anonymous_register_and_auth, Config) ->
Apps = boot_apps(Case, ?CONF_ANONYMOUS, Config), Apps = boot_apps(Case, ?CONF_ANONYMOUS, Config),
[{suite_apps, Apps} | Config]; [{suite_apps, Apps} | Config];
@ -146,7 +165,7 @@ do_escape(<<C, Rest/binary>>, Acc) ->
client_regi_procedure(Socket) -> client_regi_procedure(Socket) ->
client_regi_procedure(Socket, <<"123456">>). client_regi_procedure(Socket, <<"123456">>).
client_regi_procedure(Socket, ExpectedCode) -> client_regi_procedure(Socket, ExpectedAuthCode) ->
% %
% send REGISTER % send REGISTER
% %
@ -170,7 +189,7 @@ client_regi_procedure(Socket, ExpectedCode) ->
ok = gen_tcp:send(Socket, S1), ok = gen_tcp:send(Socket, S1),
{ok, Packet} = gen_tcp:recv(Socket, 0, 500), {ok, Packet} = gen_tcp:recv(Socket, 0, 500),
AckPacket = <<MsgSn:?WORD, 0, ExpectedCode/binary>>, AckPacket = <<MsgSn:?WORD, 0, ExpectedAuthCode/binary>>,
Size2 = size(AckPacket), Size2 = size(AckPacket),
MsgId2 = ?MS_REGISTER_ACK, MsgId2 = ?MS_REGISTER_ACK,
MsgSn2 = 0, MsgSn2 = 0,
@ -181,7 +200,7 @@ client_regi_procedure(Socket, ExpectedCode) ->
?LOGT("S2=~p", [binary_to_hex_string(S2)]), ?LOGT("S2=~p", [binary_to_hex_string(S2)]),
?LOGT("Packet=~p", [binary_to_hex_string(Packet)]), ?LOGT("Packet=~p", [binary_to_hex_string(Packet)]),
?assertEqual(S2, Packet), ?assertEqual(S2, Packet),
{ok, ExpectedCode}. {ok, ExpectedAuthCode}.
client_auth_procedure(Socket, AuthCode) -> client_auth_procedure(Socket, AuthCode) ->
?LOGT("start auth procedure", []), ?LOGT("start auth procedure", []),
@ -2683,6 +2702,52 @@ t_case34_dl_0x8805_single_mm_data_ctrl(_Config) ->
ok = gen_tcp:close(Socket). ok = gen_tcp:close(Socket).
t_case_invalid_auth_reg_server(_Config) ->
{ok, Socket} = gen_tcp:connect({127, 0, 0, 1}, ?PORT, [binary, {active, false}]),
%
% send REGISTER
%
Manuf = <<"examp">>,
Model = <<"33333333333333333333">>,
DevId = <<"1234567">>,
Color = 3,
Plate = <<"ujvl239">>,
RegisterPacket =
<<58:?WORD, 59:?WORD, Manuf/binary, Model/binary, DevId/binary, Color, Plate/binary>>,
MsgId = ?MC_REGISTER,
PhoneBCD = <<16#00, 16#01, 16#23, 16#45, 16#67, 16#89>>,
MsgSn = 78,
Size = size(RegisterPacket),
Header =
<<MsgId:?WORD, ?RESERVE:2, ?NO_FRAGMENT:1, ?NO_ENCRYPT:3, ?MSG_SIZE(Size), PhoneBCD/binary,
MsgSn:?WORD>>,
S1 = gen_packet(Header, RegisterPacket),
%% Send REGISTER Packet
ok = gen_tcp:send(Socket, S1),
%% Receive REGISTER_ACK Packet
{ok, RecvPacket} = gen_tcp:recv(Socket, 0, 50_000),
%% No AuthCode when register failed
AuthCode = <<>>,
AckPacket = <<MsgSn:?WORD, 1, AuthCode/binary>>,
Size2 = size(AckPacket),
MsgId2 = ?MS_REGISTER_ACK,
MsgSn2 = 0,
Header2 =
<<MsgId2:?WORD, ?RESERVE:2, ?NO_FRAGMENT:1, ?NO_ENCRYPT:3, ?MSG_SIZE(Size2),
PhoneBCD/binary, MsgSn2:?WORD>>,
S2 = gen_packet(Header2, AckPacket),
?LOGT("S1=~p", [binary_to_hex_string(S1)]),
?LOGT("S2=~p", [binary_to_hex_string(S2)]),
?LOGT("Received REGISTER_ACK Packet=~p", [binary_to_hex_string(RecvPacket)]),
?assertEqual(S2, RecvPacket),
ok.
t_create_ALLOW_invalid_auth_config(_Config) -> t_create_ALLOW_invalid_auth_config(_Config) ->
test_invalid_config(create, true). test_invalid_config(create, true).

View File

@ -135,7 +135,6 @@ schema("/clients_v2") ->
#{ #{
'operationId' => list_clients_v2, 'operationId' => list_clients_v2,
get => #{ get => #{
security => [],
description => ?DESC(list_clients), description => ?DESC(list_clients),
tags => ?TAGS, tags => ?TAGS,
parameters => fields(list_clients_v2_inputs), parameters => fields(list_clients_v2_inputs),
@ -575,6 +574,11 @@ fields(client) ->
desc => desc =>
<<"Indicates whether the client is connected via bridge">> <<"Indicates whether the client is connected via bridge">>
})}, })},
{is_expired,
hoconsc:mk(boolean(), #{
desc =>
<<"Indicates whether the client session is expired">>
})},
{keepalive, {keepalive,
hoconsc:mk(integer(), #{ hoconsc:mk(integer(), #{
desc => desc =>
@ -985,7 +989,7 @@ do_list_clients_v2(Nodes, _Cursor = #{type := ?CURSOR_TYPE_DS, iterator := Iter0
#{limit := Limit} = Acc0, #{limit := Limit} = Acc0,
{Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit), {Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit),
NewCursor = next_ds_cursor(Iter), NewCursor = next_ds_cursor(Iter),
Rows1 = drop_live_and_expired(Rows0), Rows1 = check_for_live_and_expired(Rows0),
Rows = maybe_run_fuzzy_filter(Rows1, QString0), Rows = maybe_run_fuzzy_filter(Rows1, QString0),
Acc1 = maps:update_with(rows, fun(Rs) -> [{undefined, Rows} | Rs] end, Acc0), Acc1 = maps:update_with(rows, fun(Rs) -> [{undefined, Rows} | Rs] end, Acc0),
Acc = #{n := N} = maps:update_with(n, fun(N) -> N + length(Rows) end, Acc1), Acc = #{n := N} = maps:update_with(n, fun(N) -> N + length(Rows) end, Acc1),
@ -1513,7 +1517,7 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) ->
%% through all the nodes. %% through all the nodes.
#{limit := Limit} = QueryState, #{limit := Limit} = QueryState,
{Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit), {Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit),
Rows = drop_live_and_expired(Rows0), Rows = check_for_live_and_expired(Rows0),
case emqx_mgmt_api:accumulate_query_rows(undefined, Rows, QueryState, ResultAcc) of case emqx_mgmt_api:accumulate_query_rows(undefined, Rows, QueryState, ResultAcc) of
{enough, NResultAcc} -> {enough, NResultAcc} ->
emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true)); emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true));
@ -1523,14 +1527,15 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) ->
do_persistent_session_query1(NResultAcc, QueryState, Iter) do_persistent_session_query1(NResultAcc, QueryState, Iter)
end. end.
drop_live_and_expired(Rows) -> check_for_live_and_expired(Rows) ->
lists:filtermap( lists:filtermap(
fun({ClientId, Session}) -> fun({ClientId, Session}) ->
case is_expired(Session) orelse is_live_session(ClientId) of case is_live_session(ClientId) of
true -> true ->
false; false;
false -> false ->
{true, {ClientId, emqx_persistent_session_ds_state:print_session(ClientId)}} DSSession = emqx_persistent_session_ds_state:print_session(ClientId),
{true, {ClientId, DSSession#{is_expired => is_expired(Session)}}}
end end
end, end,
Rows Rows
@ -1730,7 +1735,11 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}, Opts) ->
ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2), ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2),
ClientInfoMap4 = maps:put(port, Port, ClientInfoMap3), ClientInfoMap4 = maps:put(port, Port, ClientInfoMap3),
ClientInfoMap5 = convert_expiry_interval_unit(ClientInfoMap4), ClientInfoMap5 = convert_expiry_interval_unit(ClientInfoMap4),
ClientInfoMap = maps:put(connected, Connected, ClientInfoMap5), ClientInfoMap6 = maps:put(connected, Connected, ClientInfoMap5),
%% Since this is for the memory session format, and its lifetime is linked to the
%% channel process, we may say it's not expired. Durable sessions will override this
%% field if needed in their format function.
ClientInfoMap = maps:put(is_expired, false, ClientInfoMap6),
#{fields := RequestedFields} = Opts, #{fields := RequestedFields} = Opts,
TimesKeys = [created_at, connected_at, disconnected_at], TimesKeys = [created_at, connected_at, disconnected_at],
@ -1755,6 +1764,7 @@ format_persistent_session_info(
connected => false, connected => false,
durable => true, durable => true,
is_persistent => true, is_persistent => true,
is_expired => maps:get(is_expired, PSInfo, false),
subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{})) subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{}))
}; };
format_persistent_session_info(ClientId, PSInfo0) -> format_persistent_session_info(ClientId, PSInfo0) ->
@ -1776,6 +1786,7 @@ format_persistent_session_info(ClientId, PSInfo0) ->
connected_at => CreatedAt, connected_at => CreatedAt,
durable => true, durable => true,
ip_address => IpAddress, ip_address => IpAddress,
is_expired => maps:get(is_expired, PSInfo0, false),
is_persistent => true, is_persistent => true,
port => Port, port => Port,
heap_size => 0, heap_size => 0,

View File

@ -49,6 +49,7 @@ persistent_session_testcases() ->
t_persistent_sessions3, t_persistent_sessions3,
t_persistent_sessions4, t_persistent_sessions4,
t_persistent_sessions5, t_persistent_sessions5,
t_persistent_sessions6,
t_persistent_sessions_subscriptions1, t_persistent_sessions_subscriptions1,
t_list_clients_v2 t_list_clients_v2
]. ].
@ -553,6 +554,51 @@ t_persistent_sessions5(Config) ->
), ),
ok. ok.
%% Checks that expired durable sessions are returned with `is_expired => true'.
t_persistent_sessions6(Config) ->
[N1, _N2] = ?config(nodes, Config),
APIPort = 18084,
Port1 = get_mqtt_port(N1, tcp),
?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),
?check_trace(
begin
O = #{api_port => APIPort},
ClientId = <<"c1">>,
C1 = connect_client(#{port => Port1, clientid => ClientId, expiry => 1}),
assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
?retry(
100,
20,
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"is_expired">> := false}]}}},
list_request(APIPort)
)
),
ok = emqtt:disconnect(C1),
%% Wait for session to be considered expired but not GC'ed
ct:sleep(2_000),
assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}),
?retry(
100,
20,
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"is_expired">> := true}]}}},
list_request(APIPort)
)
),
C2 = connect_client(#{port => Port1, clientid => ClientId}),
disconnect_and_destroy_session(C2),
ok
end,
[]
),
ok.
%% Check that the output of `/clients/:clientid/subscriptions' has the expected keys. %% Check that the output of `/clients/:clientid/subscriptions' has the expected keys.
t_persistent_sessions_subscriptions1(Config) -> t_persistent_sessions_subscriptions1(Config) ->
[N1, _N2] = ?config(nodes, Config), [N1, _N2] = ?config(nodes, Config),

View File

@ -25,7 +25,7 @@
-define(CONFIG_FORMAT_MAP, config_format_map). -define(CONFIG_FORMAT_MAP, config_format_map).
-type schema_name() :: binary(). -type schema_name() :: binary().
-type avsc() :: binary(). -type avsc_path() :: string().
-type encoded_data() :: iodata(). -type encoded_data() :: iodata().
-type decoded_data() :: map(). -type decoded_data() :: map().

View File

@ -94,6 +94,8 @@
-define(RAW_BIN, binary). -define(RAW_BIN, binary).
-define(JSON_MAP, json_map). -define(JSON_MAP, json_map).
-define(MAX_KEEP_BACKUP_CONFIGS, 10).
%% "my_plugin-0.1.0" %% "my_plugin-0.1.0"
-type name_vsn() :: binary() | string(). -type name_vsn() :: binary() | string().
%% the parse result of the JSON info file %% the parse result of the JSON info file
@ -287,7 +289,7 @@ get_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) ->
%% the avro Json Map and plugin config ALWAYS be valid before calling this function. %% the avro Json Map and plugin config ALWAYS be valid before calling this function.
put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) -> put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) ->
AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), AvroJsonBin = emqx_utils_json:encode(AvroJsonMap),
ok = write_avro_bin(NameVsn, AvroJsonBin), ok = backup_and_write_avro_bin(NameVsn, AvroJsonBin),
ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap), ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap),
ok. ok.
@ -1020,10 +1022,7 @@ for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) ->
{error, Reason} -> [{NameVsn, Reason}] {error, Reason} -> [{NameVsn, Reason}]
end; end;
for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) -> for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) ->
?SLOG(debug, #{ ?SLOG(debug, #{msg => "plugin_disabled", name_vsn => NameVsn}),
msg => "plugin_disabled",
name_vsn => NameVsn
}),
[]. [].
maybe_post_op_after_install(NameVsn) -> maybe_post_op_after_install(NameVsn) ->
@ -1057,8 +1056,69 @@ maybe_create_config_dir(NameVsn) ->
{error, {mkdir_failed, ConfigDir, Reason}} {error, {mkdir_failed, ConfigDir, Reason}}
end. end.
write_avro_bin(NameVsn, AvroBin) -> %% @private Backup the current config to a file with a timestamp suffix and
ok = file:write_file(avro_config_file(NameVsn), AvroBin). %% then save the new config to the config file.
backup_and_write_avro_bin(NameVsn, AvroBin) ->
%% this may fail, but we don't care
%% e.g. read-only file system
Path = avro_config_file(NameVsn),
_ = filelib:ensure_dir(Path),
TmpFile = Path ++ ".tmp",
case file:write_file(TmpFile, AvroBin) of
ok ->
backup_and_replace(Path, TmpFile);
{error, Reason} ->
?SLOG(error, #{
msg => "failed_to_save_plugin_conf_file",
hint =>
"The updated cluster config is not saved on this node, please check the file system.",
filename => TmpFile,
reason => Reason
}),
%% e.g. read-only, it's not the end of the world
ok
end.
backup_and_replace(Path, TmpPath) ->
Backup = Path ++ "." ++ now_time() ++ ".bak",
case file:rename(Path, Backup) of
ok ->
ok = file:rename(TmpPath, Path),
ok = prune_backup_files(Path);
{error, enoent} ->
%% not created yet
ok = file:rename(TmpPath, Path);
{error, Reason} ->
?SLOG(warning, #{
msg => "failed_to_backup_plugin_conf_file",
filename => Backup,
reason => Reason
}),
ok
end.
prune_backup_files(Path) ->
Files0 = filelib:wildcard(Path ++ ".*"),
Re = "\\.[0-9]{4}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{3}\\.bak$",
Files = lists:filter(fun(F) -> re:run(F, Re) =/= nomatch end, Files0),
Sorted = lists:reverse(lists:sort(Files)),
{_Keeps, Deletes} = lists:split(min(?MAX_KEEP_BACKUP_CONFIGS, length(Sorted)), Sorted),
lists:foreach(
fun(F) ->
case file:delete(F) of
ok ->
ok;
{error, Reason} ->
?SLOG(warning, #{
msg => "failed_to_delete_backup_plugin_conf_file",
filename => F,
reason => Reason
}),
ok
end
end,
Deletes
).
read_file_fun(Path, ErrMsg, #{read_mode := ?RAW_BIN}) -> read_file_fun(Path, ErrMsg, #{read_mode := ?RAW_BIN}) ->
fun() -> fun() ->
@ -1082,30 +1142,38 @@ read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) ->
end. end.
%% Directorys %% Directorys
-spec plugin_dir(name_vsn()) -> string().
plugin_dir(NameVsn) -> plugin_dir(NameVsn) ->
filename:join([install_dir(), NameVsn]). wrap_list_path(filename:join([install_dir(), NameVsn])).
-spec plugin_config_dir(name_vsn()) -> string().
plugin_config_dir(NameVsn) -> plugin_config_dir(NameVsn) ->
filename:join([plugin_dir(NameVsn), "data", "configs"]). wrap_list_path(filename:join([plugin_dir(NameVsn), "data", "configs"])).
%% Files %% Files
-spec pkg_file_path(name_vsn()) -> string().
pkg_file_path(NameVsn) -> pkg_file_path(NameVsn) ->
filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]). wrap_list_path(filename:join([install_dir(), bin([NameVsn, ".tar.gz"])])).
-spec info_file_path(name_vsn()) -> string().
info_file_path(NameVsn) -> info_file_path(NameVsn) ->
filename:join([plugin_dir(NameVsn), "release.json"]). wrap_list_path(filename:join([plugin_dir(NameVsn), "release.json"])).
-spec avsc_file_path(name_vsn()) -> string().
avsc_file_path(NameVsn) -> avsc_file_path(NameVsn) ->
filename:join([plugin_dir(NameVsn), "config_schema.avsc"]). wrap_list_path(filename:join([plugin_dir(NameVsn), "config_schema.avsc"])).
-spec avro_config_file(name_vsn()) -> string().
avro_config_file(NameVsn) -> avro_config_file(NameVsn) ->
filename:join([plugin_config_dir(NameVsn), "config.avro"]). wrap_list_path(filename:join([plugin_config_dir(NameVsn), "config.avro"])).
-spec i18n_file_path(name_vsn()) -> string().
i18n_file_path(NameVsn) -> i18n_file_path(NameVsn) ->
filename:join([plugin_dir(NameVsn), "config_i18n.json"]). wrap_list_path(filename:join([plugin_dir(NameVsn), "config_i18n.json"])).
-spec readme_file(name_vsn()) -> string().
readme_file(NameVsn) -> readme_file(NameVsn) ->
filename:join([plugin_dir(NameVsn), "README.md"]). wrap_list_path(filename:join([plugin_dir(NameVsn), "README.md"])).
running_apps() -> running_apps() ->
lists:map( lists:map(
@ -1115,6 +1183,17 @@ running_apps() ->
application:which_applications(infinity) application:which_applications(infinity)
). ).
%% @private This is the same human-readable timestamp format as
%% hocon-cli generated app.<time>.config file name.
now_time() ->
Ts = os:system_time(millisecond),
{{Y, M, D}, {HH, MM, SS}} = calendar:system_time_to_local_time(Ts, millisecond),
Res = io_lib:format(
"~0p.~2..0b.~2..0b.~2..0b.~2..0b.~2..0b.~3..0b",
[Y, M, D, HH, MM, SS, Ts rem 1000]
),
lists:flatten(Res).
bin_key(Map) when is_map(Map) -> bin_key(Map) when is_map(Map) ->
maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map); maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map);
bin_key(List = [#{} | _]) -> bin_key(List = [#{} | _]) ->
@ -1125,3 +1204,6 @@ bin_key(Term) ->
bin(A) when is_atom(A) -> atom_to_binary(A, utf8); bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8); bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
bin(B) when is_binary(B) -> B. bin(B) when is_binary(B) -> B.
wrap_list_path(Path) ->
binary_to_list(iolist_to_binary(Path)).

View File

@ -58,7 +58,7 @@ lookup_serde(SchemaName) ->
{ok, Serde} {ok, Serde}
end. end.
-spec add_schema(schema_name(), avsc()) -> ok | {error, term()}. -spec add_schema(schema_name(), avsc_path()) -> ok | {error, term()}.
add_schema(NameVsn, Path) -> add_schema(NameVsn, Path) ->
case lookup_serde(NameVsn) of case lookup_serde(NameVsn) of
{ok, _Serde} -> {ok, _Serde} ->
@ -157,7 +157,7 @@ get_plugin_avscs() ->
lists:foldl( lists:foldl(
fun(AvscPath, AccIn) -> fun(AvscPath, AccIn) ->
[_, NameVsn | _] = lists:reverse(filename:split(AvscPath)), [_, NameVsn | _] = lists:reverse(filename:split(AvscPath)),
[{NameVsn, AvscPath} | AccIn] [{to_bin(NameVsn), AvscPath} | AccIn]
end, end,
_Acc0 = [], _Acc0 = [],
filelib:wildcard(Pattern) filelib:wildcard(Pattern)
@ -186,6 +186,8 @@ do_build_serde({NameVsn, AvscPath}) ->
{error, Error} {error, Error}
end. end.
make_serde(NameVsn, AvscPath) when not is_binary(NameVsn) ->
make_serde(to_bin(NameVsn), AvscPath);
make_serde(NameVsn, AvscPath) -> make_serde(NameVsn, AvscPath) ->
{ok, AvscBin} = read_avsc_file(AvscPath), {ok, AvscBin} = read_avsc_file(AvscPath),
Store0 = avro_schema_store:new([map]), Store0 = avro_schema_store:new([map]),

View File

@ -270,6 +270,7 @@ schema("/rules/:id/test") ->
tags => [<<"rules">>], tags => [<<"rules">>],
description => ?DESC("api11"), description => ?DESC("api11"),
summary => <<"Apply a rule for testing">>, summary => <<"Apply a rule for testing">>,
parameters => param_path_id(),
'requestBody' => rule_apply_test_schema(), 'requestBody' => rule_apply_test_schema(),
responses => #{ responses => #{
400 => error_schema('BAD_REQUEST', "Invalid Parameters"), 400 => error_schema('BAD_REQUEST', "Invalid Parameters"),

View File

@ -20,11 +20,15 @@
-export([ -export([
empty/0, empty/0,
list/1, list/1,
const/1,
mqueue/1, mqueue/1,
map/2, map/2,
transpose/1, transpose/1,
chain/1,
chain/2, chain/2,
repeat/1 repeat/1,
interleave/2,
limit_length/2
]). ]).
%% Evaluating %% Evaluating
@ -69,6 +73,11 @@ list([]) ->
list([X | Rest]) -> list([X | Rest]) ->
fun() -> [X | list(Rest)] end. fun() -> [X | list(Rest)] end.
%% @doc Make a stream with a single element infinitely repeated
-spec const(T) -> stream(T).
const(T) ->
fun() -> [T | const(T)] end.
%% @doc Make a stream out of process message queue. %% @doc Make a stream out of process message queue.
-spec mqueue(timeout()) -> stream(any()). -spec mqueue(timeout()) -> stream(any()).
mqueue(Timeout) -> mqueue(Timeout) ->
@ -118,6 +127,11 @@ transpose_tail(S, Tail) ->
end end
end. end.
%% @doc Make a stream by concatenating multiple streams.
-spec chain([stream(X)]) -> stream(X).
chain(L) ->
lists:foldl(fun chain/2, empty(), L).
%% @doc Make a stream by chaining (concatenating) two streams. %% @doc Make a stream by chaining (concatenating) two streams.
%% The second stream begins to produce values only after the first one is exhausted. %% The second stream begins to produce values only after the first one is exhausted.
-spec chain(stream(X), stream(Y)) -> stream(X | Y). -spec chain(stream(X), stream(Y)) -> stream(X | Y).
@ -144,6 +158,45 @@ repeat(S) ->
end end
end. end.
%% @doc Interleave the elements of the streams.
%%
%% This function accepts a list of tuples where the first element
%% specifies size of the "batch" to be consumed from the stream at a
%% time (stream is the second tuple element). If element of the list
%% is a plain stream, then the batch size is assumed to be 1.
%%
%% If `ContinueAtEmpty' is `false', and one of the streams returns
%% `[]', then the function will return `[]' as well. Otherwise, it
%% will continue consuming data from the remaining streams.
-spec interleave([stream(X) | {non_neg_integer(), stream(X)}], boolean()) -> stream(X).
interleave(L0, ContinueAtEmpty) ->
L = lists:map(
fun
(Stream) when is_function(Stream) ->
{1, Stream};
(A = {N, _}) when N >= 0 ->
A
end,
L0
),
fun() ->
do_interleave(ContinueAtEmpty, 0, L, [])
end.
%% @doc Truncate list to the given length
-spec limit_length(non_neg_integer(), stream(X)) -> stream(X).
limit_length(0, _) ->
fun() -> [] end;
limit_length(N, S) when N >= 0 ->
fun() ->
case next(S) of
[] ->
[];
[X | S1] ->
[X | limit_length(N - 1, S1)]
end
end.
%% %%
%% @doc Produce the next value from the stream. %% @doc Produce the next value from the stream.
@ -237,3 +290,24 @@ csv_read_line([Line | Lines]) ->
{Fields, Lines}; {Fields, Lines};
csv_read_line([]) -> csv_read_line([]) ->
eof. eof.
do_interleave(_Cont, _, [], []) ->
[];
do_interleave(Cont, N, [{N, S} | Rest], Rev) ->
do_interleave(Cont, 0, Rest, [{N, S} | Rev]);
do_interleave(Cont, _, [], Rev) ->
do_interleave(Cont, 0, lists:reverse(Rev), []);
do_interleave(Cont, I, [{N, S} | Rest], Rev) when I < N ->
case next(S) of
[] when Cont ->
do_interleave(Cont, 0, Rest, Rev);
[] ->
[];
[X | S1] ->
[
X
| fun() ->
do_interleave(Cont, I + 1, [{N, S1} | Rest], Rev)
end
]
end.

View File

@ -157,6 +157,22 @@ mqueue_test() ->
emqx_utils_stream:consume(emqx_utils_stream:mqueue(400)) emqx_utils_stream:consume(emqx_utils_stream:mqueue(400))
). ).
interleave_test() ->
S1 = emqx_utils_stream:list([1, 2, 3]),
S2 = emqx_utils_stream:list([a, b, c, d]),
?assertEqual(
[1, 2, a, b, 3, c, d],
emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], true))
).
interleave_stop_test() ->
S1 = emqx_utils_stream:const(1),
S2 = emqx_utils_stream:list([a, b, c, d]),
?assertEqual(
[1, 1, a, b, 1, 1, c, d, 1, 1],
emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], false))
).
csv_test() -> csv_test() ->
Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>, Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>,
?assertEqual( ?assertEqual(

View File

@ -0,0 +1,4 @@
The MQTT listerners config option `access_rules` has been improved in the following ways:
* The listener no longer crash with an incomprehensible error message if a non-valid access rule is configured. Instead a configuration error is generated.
* One can now add several rules in a single string by separating them by comma (for example, "allow 10.0.1.0/24, deny all").

View File

@ -0,0 +1 @@
Fixed the issue where the JT/T 808 gateway could not correctly reply to the REGISTER_ACK message when requesting authentication from the registration service failed.

View File

@ -71,7 +71,7 @@ defmodule EMQXUmbrella.MixProject do
{:telemetry, "1.1.0"}, {:telemetry, "1.1.0"},
# in conflict by emqtt and hocon # in conflict by emqtt and hocon
{:getopt, "1.0.2", override: true}, {:getopt, "1.0.2", override: true},
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.10", override: true},
{:hocon, github: "emqx/hocon", tag: "0.42.2", override: true}, {:hocon, github: "emqx/hocon", tag: "0.42.2", override: true},
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.3", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.3", override: true},
{:esasl, github: "emqx/esasl", tag: "0.2.1"}, {:esasl, github: "emqx/esasl", tag: "0.2.1"},

View File

@ -96,7 +96,7 @@
{observer_cli, "1.7.1"}, {observer_cli, "1.7.1"},
{system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}},
{getopt, "1.0.2"}, {getopt, "1.0.2"},
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.2"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.2"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
{esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.1"}}}, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.1"}}},

View File

@ -190,7 +190,7 @@ plugins() ->
test_plugins() -> test_plugins() ->
[ [
{rebar3_proper, "0.12.1"}, {rebar3_proper, "0.12.1"},
{coveralls, {git, "https://github.com/emqx/coveralls-erl", {tag, "v2.2.0-emqx-1"}}} {coveralls, {git, "https://github.com/emqx/coveralls-erl", {tag, "v2.2.0-emqx-2"}}}
]. ].
test_deps() -> test_deps() ->

View File

@ -50,6 +50,7 @@ topic.desc:
topic.label: topic.label:
"""RocketMQ Topic""" """RocketMQ Topic"""
namespace.label: """Namespace / Instance ID"""
namespace.desc: namespace.desc:
"""The namespace field MUST be set if you are using the RocketMQ service in """The namespace field MUST be set if you are using the RocketMQ service in
aliyun cloud and also the namespace is enabled, aliyun cloud and also the namespace is enabled,

View File

@ -79,9 +79,9 @@ emqx_prepare(){
if [ ! -d "${PAHO_MQTT_TESTING_PATH}" ]; then if [ ! -d "${PAHO_MQTT_TESTING_PATH}" ]; then
git clone -b develop-4.0 https://github.com/emqx/paho.mqtt.testing.git "${PAHO_MQTT_TESTING_PATH}" git clone -b develop-4.0 https://github.com/emqx/paho.mqtt.testing.git "${PAHO_MQTT_TESTING_PATH}"
fi fi
# Debian 12 complains if we don't use venv # Debian 12 and Ubuntu 24.04 complain if we don't use venv
case "${SYSTEM:-}" in case "${SYSTEM:-}" in
debian12) debian12|ubuntu24.04)
apt-get update -y && apt-get install -y virtualenv apt-get update -y && apt-get install -y virtualenv
virtualenv venv virtualenv venv
# https://www.shellcheck.net/wiki/SC1091 # https://www.shellcheck.net/wiki/SC1091