diff --git a/.github/workflows/_pr_entrypoint.yaml b/.github/workflows/_pr_entrypoint.yaml index 4c8349f3f..b56c958a6 100644 --- a/.github/workflows/_pr_entrypoint.yaml +++ b/.github/workflows/_pr_entrypoint.yaml @@ -14,9 +14,6 @@ on: env: IS_CI: "yes" -permissions: - contents: read - jobs: sanity-checks: runs-on: ubuntu-22.04 @@ -32,6 +29,9 @@ jobs: otp_vsn: "26.2.1-2" elixir_vsn: "1.15.7" + permissions: + contents: read + steps: - uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 with: @@ -127,6 +127,9 @@ jobs: - emqx - emqx-enterprise + permissions: + contents: read + steps: - uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 with: diff --git a/.github/workflows/_push-entrypoint.yaml b/.github/workflows/_push-entrypoint.yaml index 24ac94db8..7033ab989 100644 --- a/.github/workflows/_push-entrypoint.yaml +++ b/.github/workflows/_push-entrypoint.yaml @@ -13,9 +13,10 @@ on: - 'master' - 'release-5[0-9]' - 'ci/**' - -permissions: - contents: read + workflow_dispatch: + inputs: + ref: + required: false env: IS_CI: 'yes' @@ -36,6 +37,9 @@ jobs: otp_vsn: '26.2.1-2' elixir_vsn: '1.15.7' + permissions: + contents: read + steps: - uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 with: @@ -132,6 +136,9 @@ jobs: - emqx - emqx-enterprise + permissions: + contents: read + steps: - uses: actions/checkout@9bb56186c3b09b4f86b1c65136769dd318469633 # v4.1.2 with: diff --git a/.github/workflows/build_and_push_docker_images.yaml b/.github/workflows/build_and_push_docker_images.yaml index dbca3fe38..ec6747c32 100644 --- a/.github/workflows/build_and_push_docker_images.yaml +++ b/.github/workflows/build_and_push_docker_images.yaml @@ -142,14 +142,14 @@ jobs: - name: Login to hub.docker.com uses: docker/login-action@e92390c5fb421da1463c202d546fed0ec5c39f20 # v3.1.0 - if: inputs.publish || github.repository_owner != 'emqx' + if: inputs.publish && contains(matrix.profile[1], 'docker.io') with: username: ${{ secrets.DOCKER_HUB_USER }} password: ${{ secrets.DOCKER_HUB_TOKEN }} - name: Login to AWS ECR uses: docker/login-action@e92390c5fb421da1463c202d546fed0ec5c39f20 # v3.1.0 - if: inputs.publish || github.repository_owner != 'emqx' + if: inputs.publish && contains(matrix.profile[1], 'public.ecr.aws') with: registry: public.ecr.aws username: ${{ secrets.AWS_ACCESS_KEY_ID }} diff --git a/.github/workflows/codeql.yaml b/.github/workflows/codeql.yaml index d0b15b119..5bb2d29f0 100644 --- a/.github/workflows/codeql.yaml +++ b/.github/workflows/codeql.yaml @@ -10,6 +10,7 @@ permissions: jobs: analyze: + if: github.repository == 'emqx/emqx' name: Analyze runs-on: ubuntu-22.04 timeout-minutes: 360 diff --git a/.github/workflows/green_master.yaml b/.github/workflows/green_master.yaml index 7053247e3..50ff087f9 100644 --- a/.github/workflows/green_master.yaml +++ b/.github/workflows/green_master.yaml @@ -30,9 +30,10 @@ jobs: shell: bash env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GITHUB_REPO: ${{ github.repository }} run: | - gh api --method GET -f head_sha=$(git rev-parse HEAD) -f status=completed -f exclude_pull_requests=true /repos/emqx/emqx/actions/runs > runs.json + gh api --method GET -f head_sha=$(git rev-parse HEAD) -f status=completed -f exclude_pull_requests=true /repos/${GITHUB_REPO}/actions/runs > runs.json for id in $(jq -r '.workflow_runs[] | select((."conclusion" == "failure") and (."name" != "Keep master green") and .run_attempt < 3) | .id' runs.json); do - echo "rerun https://github.com/emqx/emqx/actions/runs/$id" - gh api --method POST /repos/emqx/emqx/actions/runs/$id/rerun-failed-jobs || true + echo "rerun https://github.com/${GITHUB_REPO}/actions/runs/$id" + gh api --method POST /repos/${GITHUB_REPO}/actions/runs/$id/rerun-failed-jobs || true done diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index 47e2fd194..0c7c4adfe 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -20,9 +20,6 @@ on: required: true type: string -permissions: - contents: read - env: IS_CI: "yes" @@ -40,35 +37,39 @@ jobs: shell: bash container: "ghcr.io/emqx/emqx-builder/${{ matrix.builder }}:${{ matrix.elixir }}-${{ matrix.otp }}-ubuntu22.04" + env: + PROFILE: ${{ matrix.profile }} + ENABLE_COVER_COMPILE: 1 + CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }} + + permissions: + contents: read + steps: - uses: actions/download-artifact@c850b930e6ba138125429b7e5c93fc707a7f8427 # v4.1.4 with: name: ${{ matrix.profile }} + - name: extract artifact run: | unzip -o -q ${{ matrix.profile }}.zip git config --global --add safe.directory "$GITHUB_WORKSPACE" # produces eunit.coverdata - - name: eunit - env: - PROFILE: ${{ matrix.profile }} - ENABLE_COVER_COMPILE: 1 - CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }} - run: make eunit + - run: make eunit # produces proper.coverdata - - name: proper - env: - PROFILE: ${{ matrix.profile }} - ENABLE_COVER_COMPILE: 1 - CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }} - run: make proper + - run: make proper - - uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 - with: - name: coverdata-${{ matrix.profile }}-${{ matrix.otp }} - path: _build/test/cover - retention-days: 7 + - run: make cover + + - name: send to coveralls + if: github.repository == 'emqx/emqx' + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: make coveralls + + - run: cat rebar3.crashdump + if: failure() ct_docker: runs-on: ${{ github.repository_owner == 'emqx' && fromJSON('["self-hosted","ephemeral","linux","x64"]') || 'ubuntu-22.04' }} @@ -82,6 +83,12 @@ jobs: run: shell: bash + env: + PROFILE: ${{ matrix.profile }} + + permissions: + contents: read + steps: - uses: actions/download-artifact@c850b930e6ba138125429b7e5c93fc707a7f8427 # v4.1.4 with: @@ -89,7 +96,6 @@ jobs: - name: extract artifact run: | unzip -o -q ${{ matrix.profile }}.zip - git config --global --add safe.directory "$GITHUB_WORKSPACE" # produces $PROFILE---sg.coverdata - name: run common tests @@ -103,19 +109,30 @@ jobs: TDENGINE_TAG: "3.0.2.4" OPENTS_TAG: "9aa7f88" MINIO_TAG: "RELEASE.2023-03-20T20-16-18Z" - PROFILE: ${{ matrix.profile }} SUITEGROUP: ${{ matrix.suitegroup }} ENABLE_COVER_COMPILE: 1 CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }}-sg${{ matrix.suitegroup }} - run: ./scripts/ct/run.sh --ci --app ${{ matrix.app }} - - uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 - with: - name: coverdata-${{ matrix.profile }}-${{ matrix.prefix }}-${{ matrix.otp }}-sg${{ matrix.suitegroup }} - path: _build/test/cover - retention-days: 7 + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: ./scripts/ct/run.sh --ci --app ${{ matrix.app }} --keep-up + + - name: make cover + run: | + docker exec -e PROFILE="$PROFILE" -t erlang make cover + + - name: send to coveralls + if: github.repository == 'emqx/emqx' + run: | + ls _build/test/cover/*.coverdata || exit 0 + docker exec -e PROFILE="$PROFILE" -t erlang make coveralls + + - name: rebar3.crashdump + if: failure() + run: cat rebar3.crashdump + - name: compress logs if: failure() run: tar -czf logs.tar.gz _build/test/logs + - uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 if: failure() with: @@ -137,6 +154,15 @@ jobs: run: shell: bash + permissions: + contents: read + + env: + PROFILE: ${{ matrix.profile }} + SUITEGROUP: ${{ matrix.suitegroup }} + ENABLE_COVER_COMPILE: 1 + CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }}-sg${{ matrix.suitegroup }} + steps: - uses: actions/download-artifact@c850b930e6ba138125429b7e5c93fc707a7f8427 # v4.1.4 with: @@ -148,22 +174,25 @@ jobs: # produces $PROFILE---sg.coverdata - name: run common tests + run: make "${{ matrix.app }}-ct" + + - run: make cover + + - name: send to coveralls + if: github.repository == 'emqx/emqx' env: - PROFILE: ${{ matrix.profile }} - SUITEGROUP: ${{ matrix.suitegroup }} - ENABLE_COVER_COMPILE: 1 - CT_COVER_EXPORT_PREFIX: ${{ matrix.profile }}-${{ matrix.otp }}-sg${{ matrix.suitegroup }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} run: | - make "${{ matrix.app }}-ct" - - uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 - with: - name: coverdata-${{ matrix.profile }}-${{ matrix.prefix }}-${{ matrix.otp }}-sg${{ matrix.suitegroup }} - path: _build/test/cover - if-no-files-found: warn # do not fail if no coverdata found - retention-days: 7 + ls _build/test/cover/*.coverdata || exit 0 + make coveralls + + - run: cat rebar3.crashdump + if: failure() + - name: compress logs if: failure() run: tar -czf logs.tar.gz _build/test/logs + - uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 if: failure() with: @@ -180,61 +209,18 @@ jobs: runs-on: ubuntu-22.04 strategy: fail-fast: false + + permissions: + pull-requests: write + steps: + - name: Coveralls finished + if: github.repository == 'emqx/emqx' + uses: coverallsapp/github-action@3dfc5567390f6fa9267c0ee9c251e4c8c3f18949 # v2.2.3 + with: + parallel-finished: true + git-branch: ${{ github.ref }} + git-commit: ${{ github.sha }} + - run: echo "All tests passed" - - make_cover: - needs: - - eunit_and_proper - - ct - - ct_docker - runs-on: ${{ endsWith(github.repository, '/emqx') && 'ubuntu-22.04' || fromJSON('["self-hosted","ephemeral","linux","x64"]') }} - container: ${{ inputs.builder }} - strategy: - fail-fast: false - matrix: - profile: - - emqx-enterprise - steps: - - uses: actions/download-artifact@c850b930e6ba138125429b7e5c93fc707a7f8427 # v4.1.4 - with: - name: ${{ matrix.profile }} - - name: extract artifact - run: | - unzip -o -q ${{ matrix.profile }}.zip - git config --global --add safe.directory "$GITHUB_WORKSPACE" - - - uses: actions/download-artifact@c850b930e6ba138125429b7e5c93fc707a7f8427 # v4.1.4 - name: download coverdata - with: - pattern: coverdata-${{ matrix.profile }}-* - path: _build/test/cover - merge-multiple: true - - - name: make cover - env: - PROFILE: emqx-enterprise - run: make cover - - - name: send to coveralls - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - PROFILE: emqx-enterprise - run: make coveralls - - - name: get coveralls logs - if: failure() - run: cat rebar3.crashdump - - # do this in a separate job - upload_coverdata: - needs: make_cover - runs-on: ubuntu-22.04 - steps: - - name: Coveralls Finished - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - run: | - curl -v -k https://coveralls.io/webhook \ - --header "Content-Type: application/json" \ - --data "{\"repo_name\":\"$GITHUB_REPOSITORY\",\"repo_token\":\"$GITHUB_TOKEN\",\"payload\":{\"build_num\":$GITHUB_RUN_ID,\"status\":\"done\"}}" || true + diff --git a/.github/workflows/scorecard.yaml b/.github/workflows/scorecard.yaml index cc65fb76b..e82162b5a 100644 --- a/.github/workflows/scorecard.yaml +++ b/.github/workflows/scorecard.yaml @@ -16,8 +16,9 @@ permissions: read-all jobs: analysis: + if: github.repository == 'emqx/emqx' name: Scorecard analysis - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 permissions: security-events: write id-token: write diff --git a/.github/workflows/stale.yaml b/.github/workflows/stale.yaml index 11afc7415..69f7eb740 100644 --- a/.github/workflows/stale.yaml +++ b/.github/workflows/stale.yaml @@ -13,8 +13,8 @@ permissions: jobs: stale: - if: github.repository_owner == 'emqx' - runs-on: ${{ endsWith(github.repository, '/emqx') && 'ubuntu-22.04' || fromJSON('["self-hosted","ephemeral","linux","x64"]') }} + if: github.repository == 'emqx/emqx' + runs-on: ubuntu-22.04 permissions: issues: write pull-requests: none diff --git a/Makefile b/Makefile index f656e74ba..8c9c7957b 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ endif # Dashboard version # from https://github.com/emqx/emqx-dashboard5 export EMQX_DASHBOARD_VERSION ?= v1.9.0-beta.1 -export EMQX_EE_DASHBOARD_VERSION ?= e1.7.0-beta.1 +export EMQX_EE_DASHBOARD_VERSION ?= e1.7.0-beta.4 PROFILE ?= emqx REL_PROFILES := emqx emqx-enterprise diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index b58cd0cb7..27648a88d 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -34,7 +34,7 @@ {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, - {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}, + {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}}, {ra, "2.7.3"} ]}. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 6ab133757..9d596ce60 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1789,7 +1789,9 @@ mqtt_listener(Bind) -> hoconsc:array(string()), #{ desc => ?DESC(mqtt_listener_access_rules), - default => [<<"allow all">>] + default => [<<"allow all">>], + converter => fun access_rules_converter/1, + validator => fun access_rules_validator/1 } )}, {"proxy_protocol", @@ -1810,6 +1812,50 @@ mqtt_listener(Bind) -> )} ] ++ emqx_schema_hooks:injection_point('mqtt.listener'). +access_rules_converter(AccessRules) -> + DeepRules = + lists:foldr( + fun(Rule, Acc) -> + Rules0 = re:split(Rule, <<"\\s*,\\s*">>, [{return, binary}]), + Rules1 = [string:trim(R) || R <- Rules0], + [Rules1 | Acc] + end, + [], + AccessRules + ), + [unicode:characters_to_list(RuleBin) || RuleBin <- lists:flatten(DeepRules)]. + +access_rules_validator(AccessRules) -> + InvalidRules = [Rule || Rule <- AccessRules, is_invalid_rule(Rule)], + case InvalidRules of + [] -> + ok; + _ -> + MsgStr = io_lib:format("invalid_rule(s): ~ts", [string:join(InvalidRules, ", ")]), + MsgBin = unicode:characters_to_binary(MsgStr), + {error, MsgBin} + end. + +is_invalid_rule(S) -> + try + [Action, CIDR] = string:tokens(S, " "), + case Action of + "allow" -> ok; + "deny" -> ok + end, + case CIDR of + "all" -> + ok; + _ -> + %% should not crash + _ = esockd_cidr:parse(CIDR, true), + ok + end, + false + catch + _:_ -> true + end. + base_listener(Bind) -> [ {"enable", diff --git a/apps/emqx/test/emqx_listeners_update_SUITE.erl b/apps/emqx/test/emqx_listeners_update_SUITE.erl index 8c72b853e..824d5fd2f 100644 --- a/apps/emqx/test/emqx_listeners_update_SUITE.erl +++ b/apps/emqx/test/emqx_listeners_update_SUITE.erl @@ -115,6 +115,68 @@ t_update_conf(_Conf) -> ?assert(is_running('wss:default')), ok. +t_update_conf_validate_access_rules(_Conf) -> + Raw = emqx:get_raw_config(?LISTENERS), + RawCorrectConf1 = emqx_utils_maps:deep_put( + [<<"tcp">>, <<"default">>, <<"access_rules">>], Raw, ["allow all"] + ), + ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, RawCorrectConf1)), + RawCorrectConf2 = emqx_utils_maps:deep_put( + [<<"tcp">>, <<"default">>, <<"access_rules">>], Raw, ["deny all"] + ), + ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, RawCorrectConf2)), + RawCorrectConf3 = emqx_utils_maps:deep_put( + [<<"tcp">>, <<"default">>, <<"access_rules">>], Raw, ["allow 10.0.1.0/24"] + ), + ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, RawCorrectConf3)), + RawIncorrectConf1 = emqx_utils_maps:deep_put( + [<<"tcp">>, <<"default">>, <<"access_rules">>], Raw, ["xxx all"] + ), + ?assertMatch( + {error, #{ + reason := <<"invalid_rule(s): xxx all">>, + value := ["xxx all"], + path := "listeners.tcp.default.access_rules", + kind := validation_error, + matched_type := "emqx:mqtt_tcp_listener" + }}, + emqx:update_config(?LISTENERS, RawIncorrectConf1) + ), + RawIncorrectConf2 = emqx_utils_maps:deep_put( + [<<"tcp">>, <<"default">>, <<"access_rules">>], Raw, ["allow xxx"] + ), + ?assertMatch( + {error, #{ + reason := <<"invalid_rule(s): allow xxx">>, + value := ["allow xxx"], + path := "listeners.tcp.default.access_rules", + kind := validation_error, + matched_type := "emqx:mqtt_tcp_listener" + }}, + emqx:update_config(?LISTENERS, RawIncorrectConf2) + ), + ok. + +t_update_conf_access_rules_split(_Conf) -> + Raw = emqx:get_raw_config(?LISTENERS), + Raw1 = emqx_utils_maps:deep_put( + [<<"tcp">>, <<"default">>, <<"access_rules">>], + Raw, + [" allow all , deny all , allow 10.0.1.0/24 "] + ), + ?assertMatch({ok, _}, emqx:update_config(?LISTENERS, Raw1)), + ?assertMatch( + #{ + tcp := #{ + default := #{ + access_rules := ["allow all", "deny all", "allow 10.0.1.0/24"] + } + } + }, + emqx:get_config(?LISTENERS) + ), + ok. + t_update_tcp_keepalive_conf(_Conf) -> Keepalive = <<"240,30,5">>, KeepaliveStr = binary_to_list(Keepalive), diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl index 40849a29d..ddba5a10d 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.erl @@ -166,7 +166,7 @@ fields(producer) -> )}, {partition_key, sc( - binary(), + emqx_schema:template(), #{ required => true, desc => ?DESC("partition_key") diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index ef1600500..06e925c1b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -118,6 +118,7 @@ which_dbs() -> init({#?db_sup{db = DB}, DefaultOpts}) -> %% Spec for the top-level supervisor for the database: logger:notice("Starting DS DB ~p", [DB]), + emqx_ds_builtin_sup:clean_gvars(DB), emqx_ds_builtin_metrics:init_for_db(DB), Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), ok = start_ra_system(DB, Opts), diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index ce984db57..763d38606 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -190,7 +190,7 @@ prometheus_per_db(NodeOrAggr) -> %% ... %% ''' %% -%% If `NodeOrAggr' = `node' then node name is appended to the list of +%% If `NodeOrAggr' = `aggr' then node name is appended to the list of %% labels. prometheus_per_db(NodeOrAggr, DB, Acc0) -> Labels = [ diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl index 45e81bdc9..971805351 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl @@ -23,6 +23,7 @@ %% API: -export([start_db/2, stop_db/1]). +-export([set_gvar/3, get_gvar/3, clean_gvars/1]). %% behavior callbacks: -export([init/1]). @@ -39,6 +40,13 @@ -define(top, ?MODULE). -define(databases, emqx_ds_builtin_databases_sup). +-define(gvar_tab, emqx_ds_builtin_gvar). + +-record(gvar, { + k :: {emqx_ds:db(), _Key}, + v :: _Value +}). + %%================================================================================ %% API functions %%================================================================================ @@ -61,11 +69,31 @@ stop_db(DB) -> Pid when is_pid(Pid) -> _ = supervisor:terminate_child(?databases, DB), _ = supervisor:delete_child(?databases, DB), - ok; + clean_gvars(DB); undefined -> ok end. +%% @doc Set a DB-global variable. Please don't abuse this API. +-spec set_gvar(emqx_ds:db(), _Key, _Val) -> ok. +set_gvar(DB, Key, Val) -> + ets:insert(?gvar_tab, #gvar{k = {DB, Key}, v = Val}), + ok. + +-spec get_gvar(emqx_ds:db(), _Key, Val) -> Val. +get_gvar(DB, Key, Default) -> + case ets:lookup(?gvar_tab, {DB, Key}) of + [#gvar{v = Val}] -> + Val; + [] -> + Default + end. + +-spec clean_gvars(emqx_ds:db()) -> ok. +clean_gvars(DB) -> + ets:match_delete(?gvar_tab, #gvar{k = {DB, '_'}, _ = '_'}), + ok. + %%================================================================================ %% behavior callbacks %%================================================================================ @@ -96,6 +124,7 @@ init(?top) -> type => supervisor, shutdown => infinity }, + _ = ets:new(?gvar_tab, [named_table, set, public, {keypos, #gvar.k}, {read_concurrency, true}]), %% SupFlags = #{ strategy => one_for_all, diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index bd7cb3826..184f709e9 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -19,7 +19,9 @@ %% API: -export([ trie_create/1, trie_create/0, + destroy/1, trie_restore/2, + trie_update/2, trie_copy_learned_paths/2, topic_key/3, match_topics/2, @@ -116,10 +118,20 @@ trie_create(UserOpts) -> trie_create() -> trie_create(#{}). +-spec destroy(trie()) -> ok. +destroy(#trie{trie = Trie, stats = Stats}) -> + catch ets:delete(Trie), + catch ets:delete(Stats), + ok. + %% @doc Restore trie from a dump -spec trie_restore(options(), [{_Key, _Val}]) -> trie(). trie_restore(Options, Dump) -> - Trie = trie_create(Options), + trie_update(trie_create(Options), Dump). + +%% @doc Update a trie with a dump of operations (used for replication) +-spec trie_update(trie(), [{_Key, _Val}]) -> trie(). +trie_update(Trie, Dump) -> lists:foreach( fun({{StateFrom, Token}, StateTo}) -> trie_insert(Trie, StateFrom, Token, StateTo) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 61126c164..9330e0b1a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -36,7 +36,8 @@ update_iterator/3, next/3, delete_next/4, - shard_of_message/3 + shard_of_message/3, + current_timestamp/2 ]). %% internal exports: @@ -65,6 +66,7 @@ -export([ init/1, apply/3, + tick/2, snapshot_module/0 ]). @@ -86,6 +88,7 @@ ]). -include_lib("emqx_utils/include/emqx_message.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). -include("emqx_ds_replication_layer.hrl"). %%================================================================================ @@ -155,12 +158,14 @@ %% Command. Each command is an entry in the replication log. -type ra_command() :: #{ - ?tag := ?BATCH | add_generation | update_config | drop_generation, + ?tag := ?BATCH | add_generation | update_config | drop_generation | storage_event, _ => _ }. -type timestamp_us() :: non_neg_integer(). +-define(gv_timestamp(SHARD), {gv_timestamp, SHARD}). + %%================================================================================ %% API functions %%================================================================================ @@ -363,9 +368,16 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> end, integer_to_binary(Hash). +-spec foreach_shard(emqx_ds:db(), fun((shard_id()) -> _)) -> ok. foreach_shard(DB, Fun) -> lists:foreach(Fun, list_shards(DB)). +%% @doc Messages have been replicated up to this timestamp on the +%% local server +-spec current_timestamp(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> emqx_ds:time(). +current_timestamp(DB, Shard) -> + emqx_ds_builtin_sup:get_gvar(DB, ?gv_timestamp(Shard), 0). + %%================================================================================ %% behavior callbacks %%================================================================================ @@ -490,7 +502,9 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> ShardId = {DB, Shard}, ?IF_STORAGE_RUNNING( ShardId, - emqx_ds_storage_layer:next(ShardId, Iter, BatchSize) + emqx_ds_storage_layer:next( + ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard) + ) ). -spec do_delete_next_v4( @@ -502,7 +516,13 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> ) -> emqx_ds:delete_next_result(emqx_ds_storage_layer:delete_iterator()). do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) -> - emqx_ds_storage_layer:delete_next({DB, Shard}, Iter, Selector, BatchSize). + emqx_ds_storage_layer:delete_next( + {DB, Shard}, + Iter, + Selector, + BatchSize, + emqx_ds_replication_layer:current_timestamp(DB, Shard) + ). -spec do_add_generation_v2(emqx_ds:db()) -> no_return(). do_add_generation_v2(_DB) -> @@ -672,50 +692,69 @@ apply( ?tag := ?BATCH, ?batch_messages := MessagesIn }, - #{db_shard := DBShard, latest := Latest} = State + #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0 ) -> %% NOTE %% Unique timestamp tracking real time closely. %% With microsecond granularity it should be nearly impossible for it to run %% too far ahead than the real time clock. - {NLatest, Messages} = assign_timestamps(Latest, MessagesIn), - %% TODO - %% Batch is now reversed, but it should not make a lot of difference. - %% Even if it would be in order, it's still possible to write messages far away - %% in the past, i.e. when replica catches up with the leader. Storage layer - %% currently relies on wall clock time to decide if it's safe to iterate over - %% next epoch, this is likely wrong. Ideally it should rely on consensus clock - %% time instead. + ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, ts => Latest0}), + {Latest, Messages} = assign_timestamps(Latest0, MessagesIn), Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), - NState = State#{latest := NLatest}, + State = State0#{latest := Latest}, + set_ts(DBShard, Latest), %% TODO: Need to measure effects of changing frequency of `release_cursor`. - Effect = {release_cursor, RaftIdx, NState}, - {NState, Result, Effect}; + Effect = {release_cursor, RaftIdx, State}, + {State, Result, Effect}; apply( _RaftMeta, #{?tag := add_generation, ?since := Since}, - #{db_shard := DBShard, latest := Latest} = State + #{db_shard := DBShard, latest := Latest0} = State0 ) -> - {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest), + {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0), Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp), - NState = State#{latest := NLatest}, - {NState, Result}; + State = State0#{latest := Latest}, + set_ts(DBShard, Latest), + {State, Result}; apply( _RaftMeta, #{?tag := update_config, ?since := Since, ?config := Opts}, - #{db_shard := DBShard, latest := Latest} = State + #{db_shard := DBShard, latest := Latest0} = State0 ) -> - {Timestamp, NLatest} = ensure_monotonic_timestamp(Since, Latest), + {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0), Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts), - NState = State#{latest := NLatest}, - {NState, Result}; + State = State0#{latest := Latest}, + {State, Result}; apply( _RaftMeta, #{?tag := drop_generation, ?generation := GenId}, #{db_shard := DBShard} = State ) -> Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId), - {State, Result}. + {State, Result}; +apply( + _RaftMeta, + #{?tag := storage_event, ?payload := CustomEvent, ?now := Now}, + #{db_shard := DBShard, latest := Latest0} = State +) -> + Latest = max(Latest0, Now), + set_ts(DBShard, Latest), + ?tp( + debug, + emqx_ds_replication_layer_storage_event, + #{ + shard => DBShard, payload => CustomEvent, latest => Latest + } + ), + Effects = handle_custom_event(DBShard, Latest, CustomEvent), + {State#{latest => Latest}, ok, Effects}. + +-spec tick(integer(), ra_state()) -> ra_machine:effects(). +tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> + %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard), + {Timestamp, _} = ensure_monotonic_timestamp(timestamp_to_timeus(TimeMs), Latest), + ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}), + handle_custom_event(DBShard, Timestamp, tick). assign_timestamps(Latest, Messages) -> assign_timestamps(Latest, Messages, []). @@ -730,7 +769,7 @@ assign_timestamps(Latest, [MessageIn | Rest], Acc) -> assign_timestamps(Latest + 1, Rest, [Message | Acc]) end; assign_timestamps(Latest, [], Acc) -> - {Latest, Acc}. + {Latest, lists:reverse(Acc)}. assign_timestamp(TimestampUs, Message) -> {TimestampUs, Message}. @@ -748,3 +787,18 @@ timeus_to_timestamp(TimestampUs) -> snapshot_module() -> emqx_ds_replication_snapshot. + +handle_custom_event(DBShard, Latest, Event) -> + try + Events = emqx_ds_storage_layer:handle_event(DBShard, Latest, Event), + [{append, #{?tag => storage_event, ?payload => I, ?now => Latest}} || I <- Events] + catch + EC:Err:Stacktrace -> + ?tp(error, ds_storage_custom_event_fail, #{ + EC => Err, stacktrace => Stacktrace, event => Event + }), + [] + end. + +set_ts({DB, Shard}, TS) -> + emqx_ds_builtin_sup:set_gvar(DB, ?gv_timestamp(Shard), TS). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl index 70812fa18..4472b5a47 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl @@ -41,4 +41,8 @@ %% drop_generation -define(generation, 2). +%% custom events +-define(payload, 2). +-define(now, 3). + -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index d4dfa9115..0bfa89e95 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -16,6 +16,7 @@ -module(emqx_ds_replication_layer_shard). +%% API: -export([start_link/3]). %% Static server configuration @@ -325,7 +326,8 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) -> ClusterName = cluster_name(DB, Shard), LocalServer = local_server(DB, Shard), Servers = shard_servers(DB, Shard), - case ra:restart_server(DB, LocalServer) of + MutableConfig = #{tick_timeout => 100}, + case ra:restart_server(DB, LocalServer, MutableConfig) of {error, name_not_registered} -> Bootstrap = true, Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}}, @@ -336,7 +338,7 @@ start_server(DB, Shard, #{replication_options := ReplicationOpts}) -> ], ReplicationOpts ), - ok = ra:start_server(DB, #{ + ok = ra:start_server(DB, MutableConfig#{ id => LocalServer, uid => server_uid(DB, Shard), cluster_name => ClusterName, diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 2ec6674b6..ebbcde17c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -28,15 +28,18 @@ create/4, open/5, drop/5, - store_batch/4, + prepare_batch/4, + commit_batch/3, get_streams/4, get_delete_streams/4, make_iterator/5, make_delete_iterator/5, update_iterator/4, - next/4, - delete_next/5, - post_creation_actions/1 + next/5, + delete_next/6, + post_creation_actions/1, + + handle_event/4 ]). %% internal exports: @@ -66,6 +69,9 @@ -define(start_time, 3). -define(storage_key, 4). -define(last_seen_key, 5). +-define(cooked_payloads, 6). +-define(cooked_lts_ops, 7). +-define(cooked_ts, 8). -type options() :: #{ @@ -88,16 +94,28 @@ db :: rocksdb:db_handle(), data :: rocksdb:cf_handle(), trie :: emqx_ds_lts:trie(), + trie_cf :: rocksdb:cf_handle(), keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()), - ts_offset :: non_neg_integer() + ts_bits :: non_neg_integer(), + ts_offset :: non_neg_integer(), + gvars :: ets:table() }). +-define(lts_persist_ops, emqx_ds_storage_bitfield_lts_ops). + -type s() :: #s{}. -type stream() :: emqx_ds_lts:msg_storage_key(). -type delete_stream() :: emqx_ds_lts:msg_storage_key(). +-type cooked_batch() :: + #{ + ?cooked_payloads := [{binary(), binary()}], + ?cooked_lts_ops := [{binary(), binary()}], + ?cooked_ts := integer() + }. + -type iterator() :: #{ ?tag := ?IT, @@ -141,6 +159,10 @@ -define(DS_LTS_COUNTERS, [?DS_LTS_SEEK_COUNTER, ?DS_LTS_NEXT_COUNTER, ?DS_LTS_COLLISION_COUNTER]). +%% GVar used for idle detection: +-define(IDLE_DETECT, idle_detect). +-define(EPOCH(S, TS), (TS bsl S#s.ts_bits)). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. @@ -212,8 +234,11 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> db = DBHandle, data = DataCF, trie = Trie, + trie_cf = TrieCF, keymappers = KeymapperCache, - ts_offset = TSOffsetBits + ts_offset = TSOffsetBits, + ts_bits = TSBits, + gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}]) }. -spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) -> @@ -238,32 +263,78 @@ post_creation_actions( s() ) -> ok. -drop(_Shard, DBHandle, GenId, CFRefs, #s{}) -> +drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) -> + emqx_ds_lts:destroy(Trie), + catch ets:delete(GVars), {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs), {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs), ok = rocksdb:drop_column_family(DBHandle, DataCF), ok = rocksdb:drop_column_family(DBHandle, TrieCF), ok. --spec store_batch( +-spec prepare_batch( emqx_ds_storage_layer:shard_id(), s(), - [{emqx_ds:time(), emqx_types:message()}], + [{emqx_ds:time(), emqx_types:message()}, ...], emqx_ds:message_store_opts() ) -> - emqx_ds:store_batch_result(). -store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> + {ok, cooked_batch()}. +prepare_batch(_ShardId, S, Messages, _Options) -> + _ = erase(?lts_persist_ops), + {Payloads, MaxTs} = + lists:mapfoldl( + fun({Timestamp, Msg}, Acc) -> + {Key, _} = make_key(S, Timestamp, Msg), + Payload = {Key, message_to_value_v1(Msg)}, + {Payload, max(Acc, Timestamp)} + end, + 0, + Messages + ), + {ok, #{ + ?cooked_payloads => Payloads, + ?cooked_lts_ops => pop_lts_persist_ops(), + ?cooked_ts => MaxTs + }}. + +-spec commit_batch( + emqx_ds_storage_layer:shard_id(), + s(), + cooked_batch() +) -> ok | emqx_ds:error(_). +commit_batch( + _ShardId, + _Data, + #{?cooked_payloads := [], ?cooked_lts_ops := LTS} +) -> + %% Assert: + [] = LTS, + ok; +commit_batch( + _ShardId, + #s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars}, + #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs} +) -> {ok, Batch} = rocksdb:batch(), + %% Commit LTS trie to the storage: lists:foreach( - fun({Timestamp, Msg}) -> - {Key, _} = make_key(S, Timestamp, Msg), - Val = serialize(Msg), - rocksdb:put(DB, Data, Key, Val, []) + fun({Key, Val}) -> + ok = rocksdb:batch_put(Batch, TrieCF, term_to_binary(Key), term_to_binary(Val)) end, - Messages + LtsOps + ), + %% Apply LTS ops to the memory cache: + _ = emqx_ds_lts:trie_update(Trie, LtsOps), + %% Commit payloads: + lists:foreach( + fun({Key, Val}) -> + ok = rocksdb:batch_put(Batch, DataCF, Key, term_to_binary(Val)) + end, + Payloads ), Result = rocksdb:write_batch(DB, Batch, []), rocksdb:release_batch(Batch), + ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}), %% NOTE %% Strictly speaking, `{error, incomplete}` is a valid result but should be impossible to %% observe until there's `{no_slowdown, true}` in write options. @@ -348,13 +419,39 @@ update_iterator( ) -> {ok, OldIter#{?last_seen_key => DSKey}}. -next(Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> - %% Compute safe cutoff time. - %% It's the point in time where the last complete epoch ends, so we need to know - %% the current time to compute it. +next( + Shard, + Schema = #s{ts_offset = TSOffset, ts_bits = TSBits}, + It = #{?storage_key := Stream}, + BatchSize, + Now +) -> init_counters(), - Now = emqx_ds:timestamp_us(), - SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, + %% Compute safe cutoff time. It's the point in time where the last + %% complete epoch ends, so we need to know the current time to + %% compute it. This is needed because new keys can be added before + %% the iterator. + IsWildcard = + case Stream of + {_StaticKey, []} -> false; + _ -> true + end, + SafeCutoffTime = + case IsWildcard of + true -> + (Now bsr TSOffset) bsl TSOffset; + false -> + %% Iterators scanning streams without varying topic + %% levels can operate on incomplete epochs, since new + %% matching keys for the single topic are added in + %% lexicographic order. + %% + %% Note: this DOES NOT apply to non-wildcard topic + %% filters operating on streams with varying parts: + %% iterator can jump to the next topic and then it + %% won't backtrack. + 1 bsl TSBits - 1 + end, try next_until(Schema, It, SafeCutoffTime, BatchSize) after @@ -386,12 +483,11 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, rocksdb:iterator_close(ITHandle) end. -delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) -> +delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize, Now) -> %% Compute safe cutoff time. %% It's the point in time where the last complete epoch ends, so we need to know %% the current time to compute it. init_counters(), - Now = emqx_message:timestamp_now(), SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, try delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize) @@ -441,6 +537,24 @@ delete_next_until( rocksdb:iterator_close(ITHandle) end. +handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) -> + case ets:lookup(Gvars, ?IDLE_DETECT) of + [{?IDLE_DETECT, Latch, LastWrittenTs}] -> + ok; + [] -> + Latch = false, + LastWrittenTs = 0 + end, + case Latch of + false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) -> + ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}), + [dummy_event]; + _ -> + [] + end; +handle_event(_ShardId, _Data, _Time, _Event) -> + []. + %%================================================================================ %% Internal functions %%================================================================================ @@ -722,9 +836,6 @@ value_v1_to_message({Id, Qos, From, Flags, Headers, Topic, Payload, Timestamp, E extra = Extra }. -serialize(Msg) -> - term_to_binary(message_to_value_v1(Msg)). - deserialize(Blob) -> value_v1_to_message(binary_to_term(Blob)). @@ -752,7 +863,8 @@ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) -> -spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie(). restore_trie(TopicIndexBytes, DB, CF) -> PersistCallback = fun(Key, Val) -> - rocksdb:put(DB, CF, term_to_binary(Key), term_to_binary(Val), []) + push_lts_persist_op(Key, Val), + ok end, {ok, IT} = rocksdb:iterator(DB, CF, []), try @@ -800,8 +912,29 @@ data_cf(GenId) -> trie_cf(GenId) -> "emqx_ds_storage_bitfield_lts_trie" ++ integer_to_list(GenId). +-spec push_lts_persist_op(_Key, _Val) -> ok. +push_lts_persist_op(Key, Val) -> + case erlang:get(?lts_persist_ops) of + undefined -> + erlang:put(?lts_persist_ops, [{Key, Val}]); + L when is_list(L) -> + erlang:put(?lts_persist_ops, [{Key, Val} | L]) + end. + +-spec pop_lts_persist_ops() -> [{_Key, _Val}]. +pop_lts_persist_ops() -> + case erlang:erase(?lts_persist_ops) of + undefined -> + []; + L when is_list(L) -> + L + end. + -ifdef(TEST). +serialize(Msg) -> + term_to_binary(message_to_value_v1(Msg)). + serialize_deserialize_test() -> Msg = #message{ id = <<"message_id_val">>, diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 4981c3fc1..d9a0321b0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -26,13 +26,16 @@ %% Data store_batch/3, + prepare_batch/3, + commit_batch/2, + get_streams/3, get_delete_streams/3, make_iterator/4, make_delete_iterator/4, update_iterator/3, - next/3, - delete_next/4, + next/4, + delete_next/5, %% Generations update_config/3, @@ -42,7 +45,10 @@ %% Snapshotting take_snapshot/1, - accept_snapshot/1 + accept_snapshot/1, + + %% Custom events + handle_event/3 ]). %% gen_server @@ -63,7 +69,8 @@ shard_id/0, options/0, prototype/0, - post_creation_context/0 + post_creation_context/0, + cooked_batch/0 ]). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -79,11 +86,11 @@ %% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending %% records over the wire. - %% tags: -define(STREAM, 1). -define(IT, 2). -define(DELETE_IT, 3). +-define(COOKED_BATCH, 4). %% keys: -define(tag, 1). @@ -130,6 +137,13 @@ ?enc := term() }. +-opaque cooked_batch() :: + #{ + ?tag := ?COOKED_BATCH, + ?generation := gen_id(), + ?enc := term() + }. + %%%% Generation: -define(GEN_KEY(GEN_ID), {generation, GEN_ID}). @@ -201,16 +215,23 @@ -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> _Data. +%% Delete the schema and data -callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) -> ok | {error, _Reason}. --callback store_batch( +-callback prepare_batch( shard_id(), _Data, - [{emqx_ds:time(), emqx_types:message()}], + [{emqx_ds:time(), emqx_types:message()}, ...], emqx_ds:message_store_opts() ) -> - emqx_ds:store_batch_result(). + {ok, term()} | emqx_ds:error(_). + +-callback commit_batch( + shard_id(), + _Data, + _CookedBatch +) -> ok | emqx_ds:error(_). -callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> [_Stream]. @@ -223,12 +244,19 @@ ) -> emqx_ds:make_delete_iterator_result(_Iterator). --callback next(shard_id(), _Data, Iter, pos_integer()) -> +-callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time()) -> {ok, Iter, [emqx_types:message()]} | {error, _}. +-callback delete_next( + shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time() +) -> + {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}. + +-callback handle_event(shard_id(), _Data, emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. + -callback post_creation_actions(post_creation_context()) -> _Data. --optional_callbacks([post_creation_actions/1]). +-optional_callbacks([post_creation_actions/1, handle_event/4]). %%================================================================================ %% API for the replication layer @@ -251,20 +279,54 @@ drop_shard(Shard) -> emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). -store_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> - %% NOTE - %% We assume that batches do not span generations. Callers should enforce this. +store_batch(Shard, Messages, Options) -> ?tp(emqx_ds_storage_layer_store_batch, #{ shard => Shard, messages => Messages, options => Options }), - #{module := Mod, data := GenData} = generation_at(Shard, Time), + case prepare_batch(Shard, Messages, Options) of + {ok, CookedBatch} -> + commit_batch(Shard, CookedBatch); + ignore -> + ok; + Error = {error, _, _} -> + Error + end. + +-spec prepare_batch( + shard_id(), + [{emqx_ds:time(), emqx_types:message()}], + emqx_ds:message_store_opts() +) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_). +prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> + %% NOTE + %% We assume that batches do not span generations. Callers should enforce this. + ?tp(emqx_ds_storage_layer_prepare_batch, #{ + shard => Shard, messages => Messages, options => Options + }), + {GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time), T0 = erlang:monotonic_time(microsecond), - Result = Mod:store_batch(Shard, GenData, Messages, Options), + Result = + case Mod:prepare_batch(Shard, GenData, Messages, Options) of + {ok, CookedBatch} -> + {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; + Error = {error, _, _} -> + Error + end, T1 = erlang:monotonic_time(microsecond), + %% TODO store->prepare emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), Result; -store_batch(_Shard, [], _Options) -> - ok. +prepare_batch(_Shard, [], _Options) -> + ignore. + +-spec commit_batch(shard_id(), cooked_batch()) -> emqx_ds:store_batch_result(). +commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}) -> + #{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard), + T0 = erlang:monotonic_time(microsecond), + Result = Mod:commit_batch(Shard, GenData, CookedBatch), + T1 = erlang:monotonic_time(microsecond), + emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), + Result. -spec get_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{integer(), stream()}]. @@ -277,6 +339,13 @@ get_streams(Shard, TopicFilter, StartTime) -> case generation_get(Shard, GenId) of #{module := Mod, data := GenData} -> Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), + ?tp(get_streams_get_gen_topic, #{ + gen_id => GenId, + topic => TopicFilter, + start_time => StartTime, + streams => Streams, + gen_data => GenData + }), [ {GenId, ?stream_v2(GenId, InnerStream)} || InnerStream <- Streams @@ -377,13 +446,13 @@ update_iterator( {error, unrecoverable, generation_not_found} end. --spec next(shard_id(), iterator(), pos_integer()) -> +-spec next(shard_id(), iterator(), pos_integer(), emqx_ds:time()) -> emqx_ds:next_result(iterator()). -next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> +next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize, Now) -> case generation_get(Shard, GenId) of #{module := Mod, data := GenData} -> Current = generation_current(Shard), - case Mod:next(Shard, GenData, GenIter0, BatchSize) of + case Mod:next(Shard, GenData, GenIter0, BatchSize, Now) of {ok, _GenIter, []} when GenId < Current -> %% This is a past generation. Storage layer won't write %% any more messages here. The iterator reached the end: @@ -399,18 +468,21 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch {error, unrecoverable, generation_not_found} end. --spec delete_next(shard_id(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) -> +-spec delete_next( + shard_id(), delete_iterator(), emqx_ds:delete_selector(), pos_integer(), emqx_ds:time() +) -> emqx_ds:delete_next_result(delete_iterator()). delete_next( Shard, Iter = #{?tag := ?DELETE_IT, ?generation := GenId, ?enc := GenIter0}, Selector, - BatchSize + BatchSize, + Now ) -> case generation_get(Shard, GenId) of #{module := Mod, data := GenData} -> Current = generation_current(Shard), - case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize) of + case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize, Now) of {ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current -> %% This is a past generation. Storage layer won't write %% any more messages here. The iterator reached the end: @@ -849,6 +921,24 @@ handle_accept_snapshot(ShardId) -> Dir = db_dir(ShardId), emqx_ds_storage_snapshot:new_writer(Dir). +%% FIXME: currently this interface is a hack to handle safe cutoff +%% timestamp in LTS. It has many shortcomings (can lead to infinite +%% loops if the CBM is not careful; events from one generation may be +%% sent to the next one, etc.) and the API is not well thought out in +%% general. +%% +%% The mechanism of storage layer events should be refined later. +-spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. +handle_event(Shard, Time, Event) -> + {_GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time), + ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}), + case erlang:function_exported(Mod, handle_event, 4) of + true -> + Mod:handle_event(Shard, GenData, Time, Event); + false -> + [] + end. + %%-------------------------------------------------------------------------------- %% Schema access %%-------------------------------------------------------------------------------- @@ -881,7 +971,7 @@ generations_since(Shard, Since) -> Schema ). --spec generation_at(shard_id(), emqx_ds:time()) -> generation(). +-spec generation_at(shard_id(), emqx_ds:time()) -> {gen_id(), generation()}. generation_at(Shard, Time) -> Schema = #{current_generation := Current} = get_schema_runtime(Shard), generation_at(Time, Current, Schema). @@ -892,7 +982,7 @@ generation_at(Time, GenId, Schema) -> #{since := Since} when Time < Since andalso GenId > 0 -> generation_at(Time, prev_generation_id(GenId), Schema); _ -> - Gen + {GenId, Gen} end. -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 7aa54b9f3..10007488c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -31,14 +31,15 @@ create/4, open/5, drop/5, - store_batch/4, + prepare_batch/4, + commit_batch/3, get_streams/4, get_delete_streams/4, make_iterator/5, make_delete_iterator/5, update_iterator/4, - next/4, - delete_next/5 + next/5, + delete_next/6 ]). %% internal exports: @@ -101,12 +102,14 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) -> ok = rocksdb:drop_column_family(DBHandle, CFHandle), ok. -store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := true}) -> +prepare_batch(_ShardId, _Data, Messages, _Options) -> + {ok, Messages}. + +commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) -> {ok, Batch} = rocksdb:batch(), lists:foreach( - fun(Msg) -> - Id = erlang:unique_integer([monotonic]), - Key = <>, + fun({TS, Msg}) -> + Key = <>, Val = term_to_binary(Msg), rocksdb:batch_put(Batch, CF, Key, Val) end, @@ -114,16 +117,7 @@ store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := tru ), Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []), rocksdb:release_batch(Batch), - Res; -store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) -> - lists:foreach( - fun({Timestamp, Msg}) -> - Key = <>, - Val = term_to_binary(Msg), - rocksdb:put(DB, CF, Key, Val, []) - end, - Messages - ). + Res. get_streams(_Shard, _Data, _TopicFilter, _StartTime) -> [#stream{}]. @@ -154,7 +148,7 @@ update_iterator(_Shard, _Data, OldIter, DSKey) -> last_seen_message_key = DSKey }}. -next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> +next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) -> #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0, {ok, ITHandle} = rocksdb:iterator(DB, CF, []), Action = @@ -170,7 +164,7 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> It = It0#it{last_seen_message_key = Key}, {ok, It, lists:reverse(Messages)}. -delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize) -> +delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) -> #delete_it{ topic_filter = TopicFilter, start_time = StartTime, diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 3b0e37c7f..35b22cf32 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -21,10 +21,14 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). --include_lib("snabbkaffe/include/test_macros.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(DB, testdb). +-define(ON(NODE, BODY), + erpc:call(NODE, erlang, apply, [fun() -> BODY end, []]) +). + opts() -> opts(#{}). @@ -32,12 +36,13 @@ opts(Overrides) -> maps:merge( #{ backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}}, + %% storage => {emqx_ds_storage_reference, #{}}, + storage => {emqx_ds_storage_bitfield_lts, #{epoch_bits => 10}}, n_shards => 16, n_sites => 1, replication_factor => 3, replication_options => #{ - wal_max_size_bytes => 64 * 1024, + wal_max_size_bytes => 64, wal_max_batch_size => 1024, snapshot_interval => 128 } @@ -67,64 +72,61 @@ t_replication_transfers_snapshots('end', Config) -> ok = emqx_cth_cluster:stop(?config(nodes, Config)). t_replication_transfers_snapshots(Config) -> - NMsgs = 4000, + NMsgs = 400, + NClients = 5, + {Stream, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages( + ?FUNCTION_NAME, NClients, NMsgs + ), + Nodes = [Node, NodeOffline | _] = ?config(nodes, Config), _Specs = [_, SpecOffline | _] = ?config(specs, Config), + ?check_trace( + begin + %% Initialize DB on all nodes and wait for it to be online. + Opts = opts(#{n_shards => 1, n_sites => 3}), + ?assertEqual( + [{ok, ok} || _ <- Nodes], + erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts]) + ), + ?retry( + 500, + 10, + ?assertMatch([[_], [_], [_]], [shards_online(N, ?DB) || N <- Nodes]) + ), - %% Initialize DB on all nodes and wait for it to be online. - Opts = opts(#{n_shards => 1, n_sites => 3}), - ?assertEqual( - [{ok, ok} || _ <- Nodes], - erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts]) - ), - ?retry( - 500, - 10, - ?assertMatch([[_], [_], [_]], [shards_online(N, ?DB) || N <- Nodes]) - ), + %% Stop the DB on the "offline" node. + ok = emqx_cth_cluster:stop_node(NodeOffline), - %% Stop the DB on the "offline" node. - ok = emqx_cth_cluster:stop_node(NodeOffline), + %% Fill the storage with messages and few additional generations. + emqx_ds_test_helpers:apply_stream(?DB, Nodes -- [NodeOffline], Stream), - %% Fill the storage with messages and few additional generations. - Messages = fill_storage(Node, ?DB, NMsgs, #{p_addgen => 0.01}), + %% Restart the node. + [NodeOffline] = emqx_cth_cluster:restart(SpecOffline), + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{ + ?snk_kind := dsrepl_snapshot_accepted, + ?snk_meta := #{node := NodeOffline} + }) + ), + ?assertEqual( + ok, + erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()]) + ), - %% Restart the node. - [NodeOffline] = emqx_cth_cluster:restart(SpecOffline), - {ok, SRef} = snabbkaffe:subscribe( - ?match_event(#{ - ?snk_kind := dsrepl_snapshot_accepted, - ?snk_meta := #{node := NodeOffline} - }) - ), - ?assertEqual( - ok, - erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()]) - ), + %% Trigger storage operation and wait the replica to be restored. + _ = add_generation(Node, ?DB), + ?assertMatch( + {ok, _}, + snabbkaffe:receive_events(SRef) + ), - %% Trigger storage operation and wait the replica to be restored. - _ = add_generation(Node, ?DB), - ?assertMatch( - {ok, _}, - snabbkaffe:receive_events(SRef) - ), + %% Wait until any pending replication activities are finished (e.g. Raft log entries). + ok = timer:sleep(3_000), - %% Wait until any pending replication activities are finished (e.g. Raft log entries). - ok = timer:sleep(3_000), - - %% Check that the DB has been restored. - Shard = hd(shards(NodeOffline, ?DB)), - MessagesOffline = lists:keysort( - #message.timestamp, - consume_shard(NodeOffline, ?DB, Shard, ['#'], 0) - ), - ?assertEqual( - sample(40, Messages), - sample(40, MessagesOffline) - ), - ?assertEqual( - Messages, - MessagesOffline + %% Check that the DB has been restored: + emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams) + end, + [] ). t_rebalance(init, Config) -> @@ -142,112 +144,120 @@ t_rebalance(init, Config) -> t_rebalance('end', Config) -> ok = emqx_cth_cluster:stop(?config(nodes, Config)). +%% This testcase verifies that the storage rebalancing works correctly: +%% 1. Join/leave operations are applied successfully. +%% 2. Message data survives the rebalancing. +%% 3. Shard cluster membership converges to the target replica allocation. +%% 4. Replication factor is respected. t_rebalance(Config) -> - %% This testcase verifies that the storage rebalancing works correctly: - %% 1. Join/leave operations are applied successfully. - %% 2. Message data survives the rebalancing. - %% 3. Shard cluster membership converges to the target replica allocation. - %% 4. Replication factor is respected. - - NMsgs = 800, + NMsgs = 50, NClients = 5, - Nodes = [N1, N2, N3, N4] = ?config(nodes, Config), - - %% Initialize DB on the first node. - Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), - ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])), - ?assertMatch( - Shards when length(Shards) == 16, - shards_online(N1, ?DB) + {Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages( + ?FUNCTION_NAME, NClients, NMsgs ), + Nodes = [N1, N2 | _] = ?config(nodes, Config), + ?check_trace( + #{timetrap => 30_000}, + begin + %% 1. Initialize DB on the first node. + Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), + ?assertEqual(ok, ?ON(N1, emqx_ds:open_db(?DB, Opts))), + ?assertMatch(Shards when length(Shards) == 16, shards_online(N1, ?DB)), - %% Open DB on the rest of the nodes. - ?assertEqual( - [{ok, ok} || _ <- [N2, N3, N4]], - erpc:multicall([N2, N3, N4], emqx_ds, open_db, [?DB, Opts]) - ), + %% 1.1 Open DB on the rest of the nodes: + [ + ?assertEqual(ok, ?ON(Node, emqx_ds:open_db(?DB, Opts))) + || Node <- Nodes + ], - Sites = [S1, S2 | _Rest] = [ds_repl_meta(N, this_site) || N <- Nodes], - ct:pal("Sites: ~p~n", [Sites]), + Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes], + ct:pal("Sites: ~p~n", [Sites]), - %% Only N1 should be responsible for all shards initially. - ?assertEqual( - [[S1] || _ <- Nodes], - [ds_repl_meta(N, db_sites, [?DB]) || N <- Nodes] - ), + Sequence = [ + %% Join the second site to the DB replication sites: + {N1, join_db_site, S2}, + %% Should be a no-op: + {N2, join_db_site, S2}, + %% Now join the rest of the sites: + {N2, assign_db_sites, Sites} + ], + Stream1 = emqx_utils_stream:interleave( + [ + {50, Stream0}, + emqx_utils_stream:const(add_generation) + ], + false + ), + Stream = emqx_utils_stream:interleave( + [ + {50, Stream0}, + emqx_utils_stream:list(Sequence) + ], + true + ), - %% Fill the storage with messages and few additional generations. - %% This will force shards to trigger snapshot transfers during rebalance. - ClientMessages = emqx_utils:pmap( - fun(CID) -> - N = lists:nth(1 + (CID rem length(Nodes)), Nodes), - fill_storage(N, ?DB, NMsgs, #{client_id => integer_to_binary(CID)}) + %% 1.2 Verify that all nodes have the same view of metadata storage: + [ + ?defer_assert( + ?assertEqual( + [S1], + ?ON(Node, emqx_ds_replication_layer_meta:db_sites(?DB)), + #{ + msg => "Initially, only S1 should be responsible for all shards", + node => Node + } + ) + ) + || Node <- Nodes + ], + + %% 2. Start filling the storage: + emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream), + timer:sleep(5000), + emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams), + [ + ?defer_assert( + ?assertEqual( + 16 * 3 div length(Nodes), + n_shards_online(Node, ?DB), + "Each node is now responsible for 3/4 of the shards" + ) + ) + || Node <- Nodes + ], + + %% Verify that the set of shard servers matches the target allocation. + Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes], + ShardServers = [ + shard_server_info(N, ?DB, Shard, Site, readiness) + || {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation), + Shard <- Shards + ], + ?assert( + lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers), + ShardServers + ), + + %% Scale down the cluster by removing the first node. + ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])), + ct:pal("Transitions (~p -> ~p): ~p~n", [ + Sites, tl(Sites), emqx_ds_test_helpers:transitions(N1, ?DB) + ]), + ?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N2, ?DB))), + + %% Verify that at the end each node is now responsible for each shard. + ?defer_assert( + ?assertEqual( + [0, 16, 16, 16], + [n_shards_online(N, ?DB) || N <- Nodes] + ) + ), + + %% Verify that the messages are once again preserved after the rebalance: + emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams) end, - lists:seq(1, NClients), - infinity - ), - Messages1 = lists:sort(fun compare_message/2, lists:append(ClientMessages)), - - %% Join the second site to the DB replication sites. - ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])), - %% Should be no-op. - ?assertEqual(ok, ds_repl_meta(N2, join_db_site, [?DB, S2])), - ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]), - - %% Fill in some more messages *during* the rebalance. - MessagesRB1 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB1">>}), - - ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), - - %% Now join the rest of the sites. - ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])), - ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]), - - %% Fill in some more messages *during* the rebalance. - MessagesRB2 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB2">>}), - - ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), - - %% Verify that each node is now responsible for 3/4 of the shards. - ?assertEqual( - [(16 * 3) div length(Nodes) || _ <- Nodes], - [n_shards_online(N, ?DB) || N <- Nodes] - ), - - %% Verify that the set of shard servers matches the target allocation. - Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes], - ShardServers = [ - shard_server_info(N, ?DB, Shard, Site, readiness) - || {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation), - Shard <- Shards - ], - ?assert( - lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers), - ShardServers - ), - - %% Verify that the messages are preserved after the rebalance. - Messages = Messages1 ++ MessagesRB1 ++ MessagesRB2, - MessagesN4 = lists:sort(fun compare_message/2, consume(N4, ?DB, ['#'], 0)), - ?assertEqual(sample(20, Messages), sample(20, MessagesN4)), - ?assertEqual(Messages, MessagesN4), - - %% Scale down the cluster by removing the first node. - ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])), - ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]), - - ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), - - %% Verify that each node is now responsible for each shard. - ?assertEqual( - [0, 16, 16, 16], - [n_shards_online(N, ?DB) || N <- Nodes] - ), - - %% Verify that the messages are once again preserved after the rebalance. - MessagesN3 = lists:sort(fun compare_message/2, consume(N3, ?DB, ['#'], 0)), - ?assertEqual(sample(20, Messages), sample(20, MessagesN3)), - ?assertEqual(Messages, MessagesN3). + [] + ). t_join_leave_errors(init, Config) -> Apps = [appspec(emqx_durable_storage)], @@ -293,7 +303,7 @@ t_join_leave_errors(Config) -> %% Should be no-op. ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])), - ?assertEqual([], transitions(N1, ?DB)), + ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)), %% Impossible to leave the last site. ?assertEqual( @@ -304,12 +314,12 @@ t_join_leave_errors(Config) -> %% "Move" the DB to the other node. ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])), ?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), - ?assertMatch([_ | _], transitions(N1, ?DB)), - ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), + ?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?DB)), + ?retry(1000, 10, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), %% Should be no-op. ?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), - ?assertEqual([], transitions(N1, ?DB)). + ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB)). t_rebalance_chaotic_converges(init, Config) -> Apps = [appspec(emqx_durable_storage)], @@ -333,78 +343,79 @@ t_rebalance_chaotic_converges(Config) -> NMsgs = 500, Nodes = [N1, N2, N3] = ?config(nodes, Config), - %% Initialize DB on first two nodes. - Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}), - ?assertEqual( - [{ok, ok}, {ok, ok}], - erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts]) + NClients = 5, + {Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages( + ?FUNCTION_NAME, NClients, NMsgs ), - %% Open DB on the last node. - ?assertEqual( - ok, - erpc:call(N3, emqx_ds, open_db, [?DB, Opts]) - ), + ?check_trace( + #{}, + begin + %% Initialize DB on first two nodes. + Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}), - %% Find out which sites there are. - Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes], - ct:pal("Sites: ~p~n", [Sites]), + ?assertEqual( + [{ok, ok}, {ok, ok}], + erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts]) + ), - %% Initially, the DB is assigned to [S1, S2]. - ?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])), - ?assertEqual( - lists:sort([S1, S2]), - ds_repl_meta(N1, db_sites, [?DB]) - ), + %% Open DB on the last node. + ?assertEqual( + ok, + erpc:call(N3, emqx_ds, open_db, [?DB, Opts]) + ), - %% Fill the storage with messages and few additional generations. - Messages0 = lists:append([ - fill_storage(N1, ?DB, NMsgs, #{client_id => <<"C1">>}), - fill_storage(N2, ?DB, NMsgs, #{client_id => <<"C2">>}), - fill_storage(N3, ?DB, NMsgs, #{client_id => <<"C3">>}) - ]), + %% Find out which sites there are. + Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes], + ct:pal("Sites: ~p~n", [Sites]), - %% Construct a chaotic transition sequence that changes assignment to [S2, S3]. - Sequence = [ - {N1, join_db_site, S3}, - {N2, leave_db_site, S2}, - {N3, leave_db_site, S1}, - {N1, join_db_site, S2}, - {N2, join_db_site, S1}, - {N3, leave_db_site, S3}, - {N1, leave_db_site, S1}, - {N2, join_db_site, S3} - ], + Sequence = [ + {N1, join_db_site, S3}, + {N2, leave_db_site, S2}, + {N3, leave_db_site, S1}, + {N1, join_db_site, S2}, + {N2, join_db_site, S1}, + {N3, leave_db_site, S3}, + {N1, leave_db_site, S1}, + {N2, join_db_site, S3} + ], - %% Apply the sequence while also filling the storage with messages. - TransitionMessages = lists:map( - fun({N, Operation, Site}) -> - %% Apply the transition. - ?assertEqual(ok, ds_repl_meta(N, Operation, [?DB, Site])), - %% Give some time for at least one transition to complete. - Transitions = transitions(N, ?DB), - ct:pal("Transitions after ~p: ~p", [Operation, Transitions]), - ?retry(200, 10, ?assertNotEqual(Transitions, transitions(N, ?DB))), - %% Fill the storage with messages. - CID = integer_to_binary(erlang:system_time()), - fill_storage(N, ?DB, NMsgs, #{client_id => CID}) + %% Interleaved list of events: + Stream = emqx_utils_stream:interleave( + [ + {50, Stream0}, + emqx_utils_stream:list(Sequence) + ], + true + ), + + ?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])), + ?assertEqual( + lists:sort([S1, S2]), + ds_repl_meta(N1, db_sites, [?DB]), + "Initially, the DB is assigned to [S1, S2]" + ), + + emqx_ds_test_helpers:apply_stream(?DB, Nodes, Stream), + + %% Wait for the last transition to complete. + ?retry(500, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), + + ?defer_assert( + ?assertEqual( + lists:sort([S2, S3]), + ds_repl_meta(N1, db_sites, [?DB]) + ) + ), + + %% Wait until the LTS timestamp is updated: + timer:sleep(5000), + + %% Check that all messages are still there. + emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams) end, - Sequence - ), - - %% Wait for the last transition to complete. - ?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))), - - ?assertEqual( - lists:sort([S2, S3]), - ds_repl_meta(N1, db_sites, [?DB]) - ), - - %% Check that all messages are still there. - Messages = lists:append(TransitionMessages) ++ Messages0, - MessagesDB = lists:sort(fun compare_message/2, consume(N1, ?DB, ['#'], 0)), - ?assertEqual(sample(20, Messages), sample(20, MessagesDB)), - ?assertEqual(Messages, MessagesDB). + [] + ). t_rebalance_offline_restarts(init, Config) -> Apps = [appspec(emqx_durable_storage)], @@ -447,7 +458,7 @@ t_rebalance_offline_restarts(Config) -> %% Shut down N3 and then remove it from the DB. ok = emqx_cth_cluster:stop_node(N3), ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])), - Transitions = transitions(N1, ?DB), + Transitions = emqx_ds_test_helpers:transitions(N1, ?DB), ct:pal("Transitions: ~p~n", [Transitions]), %% Wait until at least one transition completes. @@ -462,7 +473,7 @@ t_rebalance_offline_restarts(Config) -> ), %% Target state should still be reached eventually. - ?retry(1000, 20, ?assertEqual([], transitions(N1, ?DB))), + ?retry(1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?DB))), ?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])). %% @@ -478,15 +489,19 @@ ds_repl_meta(Node, Fun) -> ds_repl_meta(Node, Fun, []). ds_repl_meta(Node, Fun, Args) -> - erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args). + try + erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args) + catch + EC:Err:Stack -> + ct:pal("emqx_ds_replication_layer_meta:~p(~p) @~p failed:~n~p:~p~nStack: ~p", [ + Fun, Args, Node, EC, Err, Stack + ]), + error(meta_op_failed) + end. ds_repl_shard(Node, Fun, Args) -> erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args). -transitions(Node, DB) -> - Shards = shards(Node, DB), - [{S, T} || S <- Shards, T <- ds_repl_meta(Node, replica_set_transitions, [DB, S])]. - shards(Node, DB) -> erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]). @@ -496,25 +511,6 @@ shards_online(Node, DB) -> n_shards_online(Node, DB) -> length(shards_online(Node, DB)). -fill_storage(Node, DB, NMsgs, Opts) -> - fill_storage(Node, DB, NMsgs, 0, Opts). - -fill_storage(Node, DB, NMsgs, I, Opts) when I < NMsgs -> - PAddGen = maps:get(p_addgen, Opts, 0.001), - R1 = push_message(Node, DB, I, Opts), - R2 = probably(PAddGen, fun() -> add_generation(Node, DB) end), - R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts); -fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) -> - []. - -push_message(Node, DB, I, Opts) -> - Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]), - {Bytes, _} = rand:bytes_s(120, rand:seed_s(default, I)), - ClientId = maps:get(client_id, Opts, <>), - Message = message(ClientId, Topic, Bytes, I * 100), - ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]), - [Message]. - add_generation(Node, DB) -> ok = erpc:call(Node, emqx_ds, add_generation, [DB]), []. @@ -545,9 +541,14 @@ probably(P, Fun) -> sample(N, List) -> L = length(List), - H = N div 2, - Filler = integer_to_list(L - N) ++ " more", - lists:sublist(List, H) ++ [Filler] ++ lists:sublist(List, L - H, L). + case L =< N of + true -> + L; + false -> + H = N div 2, + Filler = integer_to_list(L - N) ++ " more", + lists:sublist(List, H) ++ [Filler] ++ lists:sublist(List, L - H, L) + end. %% diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl index eaddab0c6..39158c7ef 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl @@ -23,7 +23,7 @@ -include_lib("stdlib/include/assert.hrl"). opts() -> - #{storage => {emqx_ds_storage_bitfield_lts, #{}}}. + #{storage => {emqx_ds_storage_reference, #{}}}. %% diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 78838e675..bb6d0f917 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -73,13 +73,15 @@ t_iterate(_Config) -> begin [{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0), {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0), - {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next(?SHARD, It, 100), + {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next( + ?SHARD, It, 100, emqx_ds:timestamp_us() + ), Messages = [Msg || {_DSKey, Msg} <- MessagesAndKeys], ?assertEqual( lists:map(fun integer_to_binary/1, Timestamps), payloads(Messages) ), - {ok, _, []} = emqx_ds_storage_layer:next(?SHARD, NextIt, 100) + {ok, _, []} = emqx_ds_storage_layer:next(?SHARD, NextIt, 100, emqx_ds:timestamp_us()) end || Topic <- Topics ], @@ -370,7 +372,7 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) -> F(It, 0) -> error({too_many_iterations, It}); F(It, N) -> - case emqx_ds_storage_layer:next(Shard, It, BatchSize) of + case emqx_ds_storage_layer:next(Shard, It, BatchSize, emqx_ds:timestamp_us()) of end_of_stream -> []; {ok, _NextIt, []} -> @@ -542,7 +544,11 @@ delete(_Shard, [], _Selector) -> delete(Shard, Iterators, Selector) -> {NewIterators0, N} = lists:foldl( fun(Iterator0, {AccIterators, NAcc}) -> - case emqx_ds_storage_layer:delete_next(Shard, Iterator0, Selector, 10) of + case + emqx_ds_storage_layer:delete_next( + Shard, Iterator0, Selector, 10, emqx_ds:timestamp_us() + ) + of {ok, end_of_stream} -> {AccIterators, NAcc}; {ok, _Iterator1, 0} -> @@ -573,7 +579,7 @@ replay(_Shard, []) -> replay(Shard, Iterators) -> {NewIterators0, Messages0} = lists:foldl( fun(Iterator0, {AccIterators, AccMessages}) -> - case emqx_ds_storage_layer:next(Shard, Iterator0, 10) of + case emqx_ds_storage_layer:next(Shard, Iterator0, 10, emqx_ds:timestamp_us()) of {ok, end_of_stream} -> {AccIterators, AccMessages}; {ok, _Iterator1, []} -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index be4f7bcdf..996f39626 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -18,6 +18,14 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx_utils/include/emqx_message.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-define(ON(NODE, BODY), + erpc:call(NODE, erlang, apply, [fun() -> BODY end, []]) +). + %% RPC mocking mock_rpc() -> @@ -57,8 +65,221 @@ mock_rpc_result(gen_rpc, ExpectFun) -> end end). +%% Consume data from the DS storage on a given node as a stream: +-type ds_stream() :: emqx_utils_stream:stream({emqx_ds:message_key(), emqx_types:message()}). + +%% @doc Create an infinite list of messages from a given client: +interleaved_topic_messages(TestCase, NClients, NMsgs) -> + %% List of fake client IDs: + Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)], + TopicStreams = [ + {ClientId, emqx_utils_stream:limit_length(NMsgs, topic_messages(TestCase, ClientId))} + || ClientId <- Clients + ], + %% Interleaved stream of messages: + Stream = emqx_utils_stream:interleave( + [{2, Stream} || {_ClientId, Stream} <- TopicStreams], true + ), + {Stream, TopicStreams}. + +topic_messages(TestCase, ClientId) -> + topic_messages(TestCase, ClientId, 0). + +topic_messages(TestCase, ClientId, N) -> + fun() -> + NBin = integer_to_binary(N), + Msg = #message{ + from = ClientId, + topic = client_topic(TestCase, ClientId), + timestamp = N * 100, + payload = <> + }, + [Msg | topic_messages(TestCase, ClientId, N + 1)] + end. + +client_topic(TestCase, ClientId) when is_atom(TestCase) -> + client_topic(atom_to_binary(TestCase, utf8), ClientId); +client_topic(TestCase, ClientId) when is_binary(TestCase) -> + <>. + +ds_topic_generation_stream(DB, Node, Shard, Topic, Stream) -> + {ok, Iterator} = ?ON( + Node, + emqx_ds_storage_layer:make_iterator(Shard, Stream, Topic, 0) + ), + do_ds_topic_generation_stream(DB, Node, Shard, Iterator). + +do_ds_topic_generation_stream(DB, Node, Shard, It0) -> + fun() -> + case + ?ON( + Node, + begin + Now = emqx_ds_replication_layer:current_timestamp(DB, Shard), + emqx_ds_storage_layer:next(Shard, It0, 1, Now) + end + ) + of + {ok, _It, []} -> + []; + {ok, end_of_stream} -> + []; + {ok, It, [KeyMsg]} -> + [KeyMsg | do_ds_topic_generation_stream(DB, Node, Shard, It)] + end + end. + +%% Payload generation: + +apply_stream(DB, Nodes, Stream) -> + apply_stream( + DB, + emqx_utils_stream:repeat(emqx_utils_stream:list(Nodes)), + Stream, + 0 + ). + +apply_stream(DB, NodeStream0, Stream0, N) -> + case emqx_utils_stream:next(Stream0) of + [] -> + ?tp(all_done, #{}); + [Msg = #message{} | Stream] -> + [Node | NodeStream] = emqx_utils_stream:next(NodeStream0), + ?tp( + test_push_message, + maps:merge( + emqx_message:to_map(Msg), + #{n => N} + ) + ), + ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})), + apply_stream(DB, NodeStream, Stream, N + 1); + [add_generation | Stream] -> + %% FIXME: + [Node | NodeStream] = emqx_utils_stream:next(NodeStream0), + ?ON(Node, emqx_ds:add_generation(DB)), + apply_stream(DB, NodeStream, Stream, N); + [{Node, Operation, Arg} | Stream] when + Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites + -> + ?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}), + %% Apply the transition. + ?assertEqual( + ok, + ?ON( + Node, + emqx_ds_replication_layer_meta:Operation(DB, Arg) + ) + ), + %% Give some time for at least one transition to complete. + Transitions = transitions(Node, DB), + ct:pal("Transitions after ~p: ~p", [Operation, Transitions]), + ?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))), + apply_stream(DB, NodeStream0, Stream, N); + [Fun | Stream] when is_function(Fun) -> + Fun(), + apply_stream(DB, NodeStream0, Stream, N) + end. + +transitions(Node, DB) -> + ?ON( + Node, + begin + Shards = emqx_ds_replication_layer_meta:shards(DB), + [ + {S, T} + || S <- Shards, T <- emqx_ds_replication_layer_meta:replica_set_transitions(DB, S) + ] + end + ). + +%% Stream comparison + +message_eq(Msg1, {_Key, Msg2}) -> + %% Timestamps can be modified by the replication layer, ignore them: + Msg1#message{timestamp = 0} =:= Msg2#message{timestamp = 0}. + %% Consuming streams and iterators +-spec verify_stream_effects(atom(), binary(), [node()], [{emqx_types:clientid(), ds_stream()}]) -> + ok. +verify_stream_effects(DB, TestCase, Nodes0, L) -> + Checked = lists:flatmap( + fun({ClientId, Stream}) -> + Nodes = nodes_of_clientid(DB, ClientId, Nodes0), + ct:pal("Nodes allocated for client ~p: ~p", [ClientId, Nodes]), + ?defer_assert( + ?assertMatch([_ | _], Nodes, ["No nodes have been allocated for ", ClientId]) + ), + [verify_stream_effects(DB, TestCase, Node, ClientId, Stream) || Node <- Nodes] + end, + L + ), + ?defer_assert(?assertMatch([_ | _], Checked, "Some messages have been verified")). + +-spec verify_stream_effects(atom(), binary(), node(), emqx_types:clientid(), ds_stream()) -> ok. +verify_stream_effects(DB, TestCase, Node, ClientId, ExpectedStream) -> + ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]), + DiffOpts = #{context => 20, window => 1000, compare_fun => fun message_eq/2}, + ?defer_assert( + begin + snabbkaffe_diff:assert_lists_eq( + ExpectedStream, + ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node), + DiffOpts + ), + ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node]) + end + ). + +%% Create a stream from the topic (wildcards are NOT supported for a +%% good reason: order of messages is implementation-dependent!). +%% +%% Note: stream produces messages with keys +-spec ds_topic_stream(atom(), binary(), binary(), node()) -> ds_stream(). +ds_topic_stream(DB, ClientId, TopicBin, Node) -> + Topic = emqx_topic:words(TopicBin), + Shard = shard_of_clientid(DB, Node, ClientId), + {ShardId, DSStreams} = + ?ON( + Node, + begin + DBShard = {DB, Shard}, + {DBShard, emqx_ds_storage_layer:get_streams(DBShard, Topic, 0)} + end + ), + %% Sort streams by their rank Y, and chain them together: + emqx_utils_stream:chain([ + ds_topic_generation_stream(DB, Node, ShardId, Topic, S) + || {_RankY, S} <- lists:sort(DSStreams) + ]). + +%% Find which nodes from the list contain the shards for the given +%% client ID: +nodes_of_clientid(DB, ClientId, Nodes = [N0 | _]) -> + Shard = shard_of_clientid(DB, N0, ClientId), + SiteNodes = ?ON( + N0, + begin + Sites = emqx_ds_replication_layer_meta:replica_set(DB, Shard), + lists:map(fun emqx_ds_replication_layer_meta:node/1, Sites) + end + ), + lists:filter( + fun(N) -> + lists:member(N, SiteNodes) + end, + Nodes + ). + +shard_of_clientid(DB, Node, ClientId) -> + ?ON( + Node, + emqx_ds_replication_layer:shard_of_message(DB, #message{from = ClientId}, clientid) + ). + +%% Consume eagerly: + consume(DB, TopicFilter) -> consume(DB, TopicFilter, 0). @@ -85,8 +306,14 @@ consume_stream(DB, Stream, TopicFilter, StartTime) -> consume_iter(DB, It) -> consume_iter(DB, It, #{}). -consume_iter(DB, It, Opts) -> - consume_iter_with(fun emqx_ds:next/3, [DB], It, Opts). +consume_iter(DB, It0, Opts) -> + consume_iter_with( + fun(It, BatchSize) -> + emqx_ds:next(DB, It, BatchSize) + end, + It0, + Opts + ). storage_consume(ShardId, TopicFilter) -> storage_consume(ShardId, TopicFilter, 0). @@ -108,16 +335,22 @@ storage_consume_stream(ShardId, Stream, TopicFilter, StartTime) -> storage_consume_iter(ShardId, It) -> storage_consume_iter(ShardId, It, #{}). -storage_consume_iter(ShardId, It, Opts) -> - consume_iter_with(fun emqx_ds_storage_layer:next/3, [ShardId], It, Opts). +storage_consume_iter(ShardId, It0, Opts) -> + consume_iter_with( + fun(It, BatchSize) -> + emqx_ds_storage_layer:next(ShardId, It, BatchSize, emqx_ds:timestamp_us()) + end, + It0, + Opts + ). -consume_iter_with(NextFun, Args, It0, Opts) -> +consume_iter_with(NextFun, It0, Opts) -> BatchSize = maps:get(batch_size, Opts, 5), - case erlang:apply(NextFun, Args ++ [It0, BatchSize]) of + case NextFun(It0, BatchSize) of {ok, It, _Msgs = []} -> {ok, It, []}; {ok, It1, Batch} -> - {ok, It, Msgs} = consume_iter_with(NextFun, Args, It1, Opts), + {ok, It, Msgs} = consume_iter_with(NextFun, It1, Opts), {ok, It, [Msg || {_DSKey, Msg} <- Batch] ++ Msgs}; {ok, Eos = end_of_stream} -> {ok, Eos, []}; diff --git a/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src index 0409f1ed2..8e5157695 100644 --- a/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src +++ b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_jt808, [ {description, "JT/T 808 Gateway"}, - {vsn, "0.0.2"}, + {vsn, "0.0.3"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl index eec1c24da..876f623e9 100644 --- a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl @@ -230,9 +230,14 @@ handle_in(Frame = ?MSG(MType), Channel = #channel{conn_state = ConnState}) when handle_in(Frame, Channel = #channel{conn_state = connected}) -> ?SLOG(debug, #{msg => "recv_frame", frame => Frame}), do_handle_in(Frame, Channel); +handle_in(Frame = ?MSG(MType), Channel) when + MType =:= ?MC_DEREGISTER +-> + ?SLOG(debug, #{msg => "recv_frame", frame => Frame, info => "jt808_client_deregister"}), + do_handle_in(Frame, Channel#channel{conn_state = disconnected}); handle_in(Frame, Channel) -> ?SLOG(error, #{msg => "unexpected_frame", frame => Frame}), - {stop, unexpected_frame, Channel}. + {shutdown, unexpected_frame, Channel}. %% @private do_handle_in(Frame = ?MSG(?MC_GENERAL_RESPONSE), Channel = #channel{inflight = Inflight}) -> @@ -241,19 +246,24 @@ do_handle_in(Frame = ?MSG(?MC_GENERAL_RESPONSE), Channel = #channel{inflight = I {ok, Channel#channel{inflight = NewInflight}}; do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) -> #{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame, - case emqx_jt808_auth:register(Frame, Channel0#channel.auth) of - {ok, Authcode} -> - {ok, Conninfo} = enrich_conninfo(Frame, Channel0#channel{authcode = Authcode}), - {ok, Channel} = enrich_clientinfo(Frame, Conninfo), - handle_out({?MS_REGISTER_ACK, 0}, MsgSn, Channel); - {error, Reason} -> - ?SLOG(error, #{msg => "register_failed", reason => Reason}), - ResCode = - case is_integer(Reason) of - true -> Reason; - false -> 1 - end, - handle_out({?MS_REGISTER_ACK, ResCode}, MsgSn, Channel0) + case + emqx_utils:pipeline( + [ + fun enrich_clientinfo/2, + fun enrich_conninfo/2, + fun set_log_meta/2 + ], + Frame, + Channel0 + ) + of + {ok, _NFrame, Channel} -> + case register_(Frame, Channel) of + {ok, NChannel} -> + handle_out({?MS_REGISTER_ACK, 0}, MsgSn, NChannel); + {error, ResCode} -> + handle_out({?MS_REGISTER_ACK, ResCode}, MsgSn, Channel) + end end; do_handle_in(Frame = ?MSG(?MC_AUTH), Channel0) -> #{<<"header">> := #{<<"msg_sn">> := MsgSn}} = Frame, @@ -311,7 +321,7 @@ do_handle_in( {ok, Channel#channel{inflight = ack_msg(?MC_DRIVER_ID_REPORT, none, Inflight)}} end; do_handle_in(?MSG(?MC_DEREGISTER), Channel) -> - {stop, normal, Channel}; + {shutdown, normal, Channel}; do_handle_in(Frame = #{}, Channel = #channel{up_topic = Topic, inflight = Inflight}) -> {MsgId, MsgSn} = msgidsn(Frame), _ = do_publish(Topic, Frame), @@ -859,6 +869,20 @@ is_driver_id_req_exist(#channel{inflight = Inflight}) -> Key = get_msg_ack(?MC_DRIVER_ID_REPORT, none), emqx_inflight:contain(Key, Inflight). +register_(Frame, Channel0) -> + case emqx_jt808_auth:register(Frame, Channel0#channel.auth) of + {ok, Authcode} -> + {ok, Channel0#channel{authcode = Authcode}}; + {error, Reason} -> + ?SLOG(error, #{msg => "register_failed", reason => Reason}), + ResCode = + case is_integer(Reason) of + true -> Reason; + false -> 1 + end, + {error, ResCode} + end. + authenticate(_AuthFrame, #channel{authcode = anonymous}) -> true; authenticate(AuthFrame, #channel{authcode = undefined, auth = Auth}) -> diff --git a/apps/emqx_gateway_jt808/test/emqx_jt808_SUITE.erl b/apps/emqx_gateway_jt808/test/emqx_jt808_SUITE.erl index c8c747c1c..04a7ad0ad 100644 --- a/apps/emqx_gateway_jt808/test/emqx_jt808_SUITE.erl +++ b/apps/emqx_gateway_jt808/test/emqx_jt808_SUITE.erl @@ -68,6 +68,22 @@ gateway.jt808 { } ">>). +%% erlfmt-ignore +-define(CONF_INVALID_AUTH_SERVER, <<" +gateway.jt808 { + listeners.tcp.default { + bind = ", ?PORT_STR, " + } + proto { + auth { + allow_anonymous = false + registry = \"abc://abc\" + authentication = \"abc://abc\" + } + } +} +">>). + all() -> emqx_common_test_helpers:all(?MODULE). @@ -77,6 +93,9 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. +init_per_testcase(Case = t_case_invalid_auth_reg_server, Config) -> + Apps = boot_apps(Case, ?CONF_INVALID_AUTH_SERVER, Config), + [{suite_apps, Apps} | Config]; init_per_testcase(Case = t_case02_anonymous_register_and_auth, Config) -> Apps = boot_apps(Case, ?CONF_ANONYMOUS, Config), [{suite_apps, Apps} | Config]; @@ -146,7 +165,7 @@ do_escape(<>, Acc) -> client_regi_procedure(Socket) -> client_regi_procedure(Socket, <<"123456">>). -client_regi_procedure(Socket, ExpectedCode) -> +client_regi_procedure(Socket, ExpectedAuthCode) -> % % send REGISTER % @@ -170,7 +189,7 @@ client_regi_procedure(Socket, ExpectedCode) -> ok = gen_tcp:send(Socket, S1), {ok, Packet} = gen_tcp:recv(Socket, 0, 500), - AckPacket = <>, + AckPacket = <>, Size2 = size(AckPacket), MsgId2 = ?MS_REGISTER_ACK, MsgSn2 = 0, @@ -181,7 +200,7 @@ client_regi_procedure(Socket, ExpectedCode) -> ?LOGT("S2=~p", [binary_to_hex_string(S2)]), ?LOGT("Packet=~p", [binary_to_hex_string(Packet)]), ?assertEqual(S2, Packet), - {ok, ExpectedCode}. + {ok, ExpectedAuthCode}. client_auth_procedure(Socket, AuthCode) -> ?LOGT("start auth procedure", []), @@ -2683,6 +2702,52 @@ t_case34_dl_0x8805_single_mm_data_ctrl(_Config) -> ok = gen_tcp:close(Socket). +t_case_invalid_auth_reg_server(_Config) -> + {ok, Socket} = gen_tcp:connect({127, 0, 0, 1}, ?PORT, [binary, {active, false}]), + % + % send REGISTER + % + Manuf = <<"examp">>, + Model = <<"33333333333333333333">>, + DevId = <<"1234567">>, + + Color = 3, + Plate = <<"ujvl239">>, + RegisterPacket = + <<58:?WORD, 59:?WORD, Manuf/binary, Model/binary, DevId/binary, Color, Plate/binary>>, + MsgId = ?MC_REGISTER, + PhoneBCD = <<16#00, 16#01, 16#23, 16#45, 16#67, 16#89>>, + MsgSn = 78, + Size = size(RegisterPacket), + Header = + <>, + S1 = gen_packet(Header, RegisterPacket), + + %% Send REGISTER Packet + ok = gen_tcp:send(Socket, S1), + %% Receive REGISTER_ACK Packet + {ok, RecvPacket} = gen_tcp:recv(Socket, 0, 50_000), + + %% No AuthCode when register failed + AuthCode = <<>>, + + AckPacket = <>, + Size2 = size(AckPacket), + MsgId2 = ?MS_REGISTER_ACK, + MsgSn2 = 0, + Header2 = + <>, + S2 = gen_packet(Header2, AckPacket), + + ?LOGT("S1=~p", [binary_to_hex_string(S1)]), + ?LOGT("S2=~p", [binary_to_hex_string(S2)]), + ?LOGT("Received REGISTER_ACK Packet=~p", [binary_to_hex_string(RecvPacket)]), + + ?assertEqual(S2, RecvPacket), + ok. + t_create_ALLOW_invalid_auth_config(_Config) -> test_invalid_config(create, true). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index a32397bb1..940f76b9a 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -135,7 +135,6 @@ schema("/clients_v2") -> #{ 'operationId' => list_clients_v2, get => #{ - security => [], description => ?DESC(list_clients), tags => ?TAGS, parameters => fields(list_clients_v2_inputs), @@ -575,6 +574,11 @@ fields(client) -> desc => <<"Indicates whether the client is connected via bridge">> })}, + {is_expired, + hoconsc:mk(boolean(), #{ + desc => + <<"Indicates whether the client session is expired">> + })}, {keepalive, hoconsc:mk(integer(), #{ desc => @@ -985,7 +989,7 @@ do_list_clients_v2(Nodes, _Cursor = #{type := ?CURSOR_TYPE_DS, iterator := Iter0 #{limit := Limit} = Acc0, {Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit), NewCursor = next_ds_cursor(Iter), - Rows1 = drop_live_and_expired(Rows0), + Rows1 = check_for_live_and_expired(Rows0), Rows = maybe_run_fuzzy_filter(Rows1, QString0), Acc1 = maps:update_with(rows, fun(Rs) -> [{undefined, Rows} | Rs] end, Acc0), Acc = #{n := N} = maps:update_with(n, fun(N) -> N + length(Rows) end, Acc1), @@ -1513,7 +1517,7 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) -> %% through all the nodes. #{limit := Limit} = QueryState, {Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit), - Rows = drop_live_and_expired(Rows0), + Rows = check_for_live_and_expired(Rows0), case emqx_mgmt_api:accumulate_query_rows(undefined, Rows, QueryState, ResultAcc) of {enough, NResultAcc} -> emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true)); @@ -1523,14 +1527,15 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) -> do_persistent_session_query1(NResultAcc, QueryState, Iter) end. -drop_live_and_expired(Rows) -> +check_for_live_and_expired(Rows) -> lists:filtermap( fun({ClientId, Session}) -> - case is_expired(Session) orelse is_live_session(ClientId) of + case is_live_session(ClientId) of true -> false; false -> - {true, {ClientId, emqx_persistent_session_ds_state:print_session(ClientId)}} + DSSession = emqx_persistent_session_ds_state:print_session(ClientId), + {true, {ClientId, DSSession#{is_expired => is_expired(Session)}}} end end, Rows @@ -1730,7 +1735,11 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}, Opts) -> ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2), ClientInfoMap4 = maps:put(port, Port, ClientInfoMap3), ClientInfoMap5 = convert_expiry_interval_unit(ClientInfoMap4), - ClientInfoMap = maps:put(connected, Connected, ClientInfoMap5), + ClientInfoMap6 = maps:put(connected, Connected, ClientInfoMap5), + %% Since this is for the memory session format, and its lifetime is linked to the + %% channel process, we may say it's not expired. Durable sessions will override this + %% field if needed in their format function. + ClientInfoMap = maps:put(is_expired, false, ClientInfoMap6), #{fields := RequestedFields} = Opts, TimesKeys = [created_at, connected_at, disconnected_at], @@ -1755,6 +1764,7 @@ format_persistent_session_info( connected => false, durable => true, is_persistent => true, + is_expired => maps:get(is_expired, PSInfo, false), subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{})) }; format_persistent_session_info(ClientId, PSInfo0) -> @@ -1776,6 +1786,7 @@ format_persistent_session_info(ClientId, PSInfo0) -> connected_at => CreatedAt, durable => true, ip_address => IpAddress, + is_expired => maps:get(is_expired, PSInfo0, false), is_persistent => true, port => Port, heap_size => 0, diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 39d775a7a..c354552f9 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -49,6 +49,7 @@ persistent_session_testcases() -> t_persistent_sessions3, t_persistent_sessions4, t_persistent_sessions5, + t_persistent_sessions6, t_persistent_sessions_subscriptions1, t_list_clients_v2 ]. @@ -553,6 +554,51 @@ t_persistent_sessions5(Config) -> ), ok. +%% Checks that expired durable sessions are returned with `is_expired => true'. +t_persistent_sessions6(Config) -> + [N1, _N2] = ?config(nodes, Config), + APIPort = 18084, + Port1 = get_mqtt_port(N1, tcp), + + ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)), + + ?check_trace( + begin + O = #{api_port => APIPort}, + ClientId = <<"c1">>, + C1 = connect_client(#{port => Port1, clientid => ClientId, expiry => 1}), + assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), + ?retry( + 100, + 20, + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"is_expired">> := false}]}}}, + list_request(APIPort) + ) + ), + + ok = emqtt:disconnect(C1), + %% Wait for session to be considered expired but not GC'ed + ct:sleep(2_000), + assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}), + ?retry( + 100, + 20, + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"is_expired">> := true}]}}}, + list_request(APIPort) + ) + ), + + C2 = connect_client(#{port => Port1, clientid => ClientId}), + disconnect_and_destroy_session(C2), + + ok + end, + [] + ), + ok. + %% Check that the output of `/clients/:clientid/subscriptions' has the expected keys. t_persistent_sessions_subscriptions1(Config) -> [N1, _N2] = ?config(nodes, Config), diff --git a/apps/emqx_plugins/include/emqx_plugins.hrl b/apps/emqx_plugins/include/emqx_plugins.hrl index 95dc50e4f..f822b9c8d 100644 --- a/apps/emqx_plugins/include/emqx_plugins.hrl +++ b/apps/emqx_plugins/include/emqx_plugins.hrl @@ -25,7 +25,7 @@ -define(CONFIG_FORMAT_MAP, config_format_map). -type schema_name() :: binary(). --type avsc() :: binary(). +-type avsc_path() :: string(). -type encoded_data() :: iodata(). -type decoded_data() :: map(). diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 67d25bf7a..108ef1386 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -94,6 +94,8 @@ -define(RAW_BIN, binary). -define(JSON_MAP, json_map). +-define(MAX_KEEP_BACKUP_CONFIGS, 10). + %% "my_plugin-0.1.0" -type name_vsn() :: binary() | string(). %% the parse result of the JSON info file @@ -287,7 +289,7 @@ get_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) -> %% the avro Json Map and plugin config ALWAYS be valid before calling this function. put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) -> AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), - ok = write_avro_bin(NameVsn, AvroJsonBin), + ok = backup_and_write_avro_bin(NameVsn, AvroJsonBin), ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap), ok. @@ -1020,10 +1022,7 @@ for_plugin(#{name_vsn := NameVsn, enable := true}, Fun) -> {error, Reason} -> [{NameVsn, Reason}] end; for_plugin(#{name_vsn := NameVsn, enable := false}, _Fun) -> - ?SLOG(debug, #{ - msg => "plugin_disabled", - name_vsn => NameVsn - }), + ?SLOG(debug, #{msg => "plugin_disabled", name_vsn => NameVsn}), []. maybe_post_op_after_install(NameVsn) -> @@ -1057,8 +1056,69 @@ maybe_create_config_dir(NameVsn) -> {error, {mkdir_failed, ConfigDir, Reason}} end. -write_avro_bin(NameVsn, AvroBin) -> - ok = file:write_file(avro_config_file(NameVsn), AvroBin). +%% @private Backup the current config to a file with a timestamp suffix and +%% then save the new config to the config file. +backup_and_write_avro_bin(NameVsn, AvroBin) -> + %% this may fail, but we don't care + %% e.g. read-only file system + Path = avro_config_file(NameVsn), + _ = filelib:ensure_dir(Path), + TmpFile = Path ++ ".tmp", + case file:write_file(TmpFile, AvroBin) of + ok -> + backup_and_replace(Path, TmpFile); + {error, Reason} -> + ?SLOG(error, #{ + msg => "failed_to_save_plugin_conf_file", + hint => + "The updated cluster config is not saved on this node, please check the file system.", + filename => TmpFile, + reason => Reason + }), + %% e.g. read-only, it's not the end of the world + ok + end. + +backup_and_replace(Path, TmpPath) -> + Backup = Path ++ "." ++ now_time() ++ ".bak", + case file:rename(Path, Backup) of + ok -> + ok = file:rename(TmpPath, Path), + ok = prune_backup_files(Path); + {error, enoent} -> + %% not created yet + ok = file:rename(TmpPath, Path); + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_backup_plugin_conf_file", + filename => Backup, + reason => Reason + }), + ok + end. + +prune_backup_files(Path) -> + Files0 = filelib:wildcard(Path ++ ".*"), + Re = "\\.[0-9]{4}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{3}\\.bak$", + Files = lists:filter(fun(F) -> re:run(F, Re) =/= nomatch end, Files0), + Sorted = lists:reverse(lists:sort(Files)), + {_Keeps, Deletes} = lists:split(min(?MAX_KEEP_BACKUP_CONFIGS, length(Sorted)), Sorted), + lists:foreach( + fun(F) -> + case file:delete(F) of + ok -> + ok; + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_delete_backup_plugin_conf_file", + filename => F, + reason => Reason + }), + ok + end + end, + Deletes + ). read_file_fun(Path, ErrMsg, #{read_mode := ?RAW_BIN}) -> fun() -> @@ -1082,30 +1142,38 @@ read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) -> end. %% Directorys +-spec plugin_dir(name_vsn()) -> string(). plugin_dir(NameVsn) -> - filename:join([install_dir(), NameVsn]). + wrap_list_path(filename:join([install_dir(), NameVsn])). +-spec plugin_config_dir(name_vsn()) -> string(). plugin_config_dir(NameVsn) -> - filename:join([plugin_dir(NameVsn), "data", "configs"]). + wrap_list_path(filename:join([plugin_dir(NameVsn), "data", "configs"])). %% Files +-spec pkg_file_path(name_vsn()) -> string(). pkg_file_path(NameVsn) -> - filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]). + wrap_list_path(filename:join([install_dir(), bin([NameVsn, ".tar.gz"])])). +-spec info_file_path(name_vsn()) -> string(). info_file_path(NameVsn) -> - filename:join([plugin_dir(NameVsn), "release.json"]). + wrap_list_path(filename:join([plugin_dir(NameVsn), "release.json"])). +-spec avsc_file_path(name_vsn()) -> string(). avsc_file_path(NameVsn) -> - filename:join([plugin_dir(NameVsn), "config_schema.avsc"]). + wrap_list_path(filename:join([plugin_dir(NameVsn), "config_schema.avsc"])). +-spec avro_config_file(name_vsn()) -> string(). avro_config_file(NameVsn) -> - filename:join([plugin_config_dir(NameVsn), "config.avro"]). + wrap_list_path(filename:join([plugin_config_dir(NameVsn), "config.avro"])). +-spec i18n_file_path(name_vsn()) -> string(). i18n_file_path(NameVsn) -> - filename:join([plugin_dir(NameVsn), "config_i18n.json"]). + wrap_list_path(filename:join([plugin_dir(NameVsn), "config_i18n.json"])). +-spec readme_file(name_vsn()) -> string(). readme_file(NameVsn) -> - filename:join([plugin_dir(NameVsn), "README.md"]). + wrap_list_path(filename:join([plugin_dir(NameVsn), "README.md"])). running_apps() -> lists:map( @@ -1115,6 +1183,17 @@ running_apps() -> application:which_applications(infinity) ). +%% @private This is the same human-readable timestamp format as +%% hocon-cli generated app.