diff --git a/.ci/docker-compose-file/docker-compose.yaml b/.ci/docker-compose-file/docker-compose.yaml index 819522dbd..a95acc772 100644 --- a/.ci/docker-compose-file/docker-compose.yaml +++ b/.ci/docker-compose-file/docker-compose.yaml @@ -3,7 +3,7 @@ version: '3.9' services: erlang: container_name: erlang - image: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04 + image: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04 env_file: - conf.env environment: diff --git a/.github/workflows/apps_version_check.yaml b/.github/workflows/apps_version_check.yaml index 7bfabd340..4dd058468 100644 --- a/.github/workflows/apps_version_check.yaml +++ b/.github/workflows/apps_version_check.yaml @@ -13,7 +13,7 @@ jobs: os: - ubuntu20.04 - container: ghcr.io/emqx/emqx-builder/4.4-8:${{ matrix.erl_otp }}-${{ matrix.os }} + container: ghcr.io/emqx/emqx-builder/4.4-12:${{ matrix.erl_otp }}-${{ matrix.os }} steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml index ab7a0eb32..51ea77970 100644 --- a/.github/workflows/build_packages.yaml +++ b/.github/workflows/build_packages.yaml @@ -19,7 +19,7 @@ jobs: prepare: runs-on: ubuntu-20.04 # prepare source with any OTP version, no need for a matrix - container: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04 + container: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04 outputs: profiles: ${{ steps.set_profile.outputs.profiles}} @@ -34,6 +34,7 @@ jobs: shell: bash working-directory: source run: | + git config --global --add safe.directory "$GITHUB_WORKSPACE" if make emqx-ee --dry-run > /dev/null 2>&1; then echo "::set-output name=profiles::[\"emqx-ee\"]" else @@ -216,9 +217,9 @@ jobs: - debian11 - debian10 - debian9 - - rockylinux8 - - centos7 - - raspbian10 + - el8 + - el7 + # - raspbian10 #armv6l is too slow to emulate exclude: - os: raspbian9 arch: amd64 @@ -262,7 +263,7 @@ jobs: --profile "${PROFILE}" \ --pkgtype "${PACKAGE}" \ --arch "${ARCH}" \ - --builder "ghcr.io/emqx/emqx-builder/4.4-8:${OTP}-${SYSTEM}" + --builder "ghcr.io/emqx/emqx-builder/4.4-12:${OTP}-${SYSTEM}" - uses: actions/upload-artifact@v1 if: startsWith(github.ref, 'refs/tags/') with: @@ -339,7 +340,7 @@ jobs: tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} build-args: | - BUILD_FROM=ghcr.io/emqx/emqx-builder/4.4-8:${{ matrix.otp }}-alpine3.15.1 + BUILD_FROM=ghcr.io/emqx/emqx-builder/4.4-12:${{ matrix.otp }}-alpine3.15.1 RUN_FROM=alpine:3.15.1 EMQX_NAME=${{ matrix.profile }} file: source/deploy/docker/Dockerfile @@ -355,7 +356,7 @@ jobs: tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} build-args: | - BUILD_FROM=ghcr.io/emqx/emqx-builder/4.4-8:${{ matrix.otp }}-alpine3.15.1 + BUILD_FROM=ghcr.io/emqx/emqx-builder/4.4-12:${{ matrix.otp }}-alpine3.15.1 RUN_FROM=alpine:3.15.1 EMQX_NAME=${{ matrix.profile }} file: source/deploy/docker/Dockerfile.enterprise diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml index af67d3060..21156957b 100644 --- a/.github/workflows/build_slim_packages.yaml +++ b/.github/workflows/build_slim_packages.yaml @@ -21,14 +21,15 @@ jobs: - 24.1.5-3 os: - ubuntu20.04 - - rockylinux8 + - el8 - container: ghcr.io/emqx/emqx-builder/4.4-8:${{ matrix.erl_otp }}-${{ matrix.os }} + container: ghcr.io/emqx/emqx-builder/4.4-12:${{ matrix.erl_otp }}-${{ matrix.os }} steps: - uses: actions/checkout@v1 - name: prepare run: | + git config --global --add safe.directory "$GITHUB_WORKSPACE" if make emqx-ee --dry-run > /dev/null 2>&1; then echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials git config --global credential.helper store diff --git a/.github/workflows/check_deps_integrity.yaml b/.github/workflows/check_deps_integrity.yaml index 2b49d4d77..e50738914 100644 --- a/.github/workflows/check_deps_integrity.yaml +++ b/.github/workflows/check_deps_integrity.yaml @@ -5,7 +5,7 @@ on: [pull_request] jobs: check_deps_integrity: runs-on: ubuntu-20.04 - container: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04 + container: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04 steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index ffcb69247..b03ac2077 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -7,7 +7,7 @@ on: jobs: prepare: runs-on: ubuntu-20.04 - container: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04 + container: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04 outputs: profiles: ${{ steps.set_profile.outputs.profiles}} diff --git a/.github/workflows/run_acl_migration_tests.yaml b/.github/workflows/run_acl_migration_tests.yaml index 3b11a9368..b491ef47b 100644 --- a/.github/workflows/run_acl_migration_tests.yaml +++ b/.github/workflows/run_acl_migration_tests.yaml @@ -5,7 +5,7 @@ on: workflow_dispatch jobs: test: runs-on: ubuntu-20.04 - container: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04 + container: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04 strategy: fail-fast: true env: diff --git a/.github/workflows/run_fvt_tests.yaml b/.github/workflows/run_fvt_tests.yaml index dbc79a8f5..db1a351c4 100644 --- a/.github/workflows/run_fvt_tests.yaml +++ b/.github/workflows/run_fvt_tests.yaml @@ -224,7 +224,7 @@ jobs: relup_test_plan: runs-on: ubuntu-20.04 - container: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04 + container: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04 outputs: profile: ${{ steps.profile-and-versions.outputs.profile }} vsn: ${{ steps.profile-and-versions.outputs.vsn }} @@ -275,7 +275,7 @@ jobs: otp: - 24.1.5-3 runs-on: ubuntu-20.04 - container: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04 + container: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04 defaults: run: shell: bash diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index efca499ea..d58486cda 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -10,7 +10,7 @@ on: jobs: run_proper_test: runs-on: ubuntu-20.04 - container: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04 + container: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04 steps: - uses: actions/checkout@v2 diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 3086e7934..c23c061ed 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -21,7 +21,7 @@ File format: Now MQTT clients may be authorized with respect to a specific claim containing publish/subscribe topic whitelists. * Better randomisation of app screts (changed from timestamp seeded sha hash (uuid) to crypto:strong_rand_bytes) * Return a client_identifier_not_valid error when username is empty and username_as_clientid is set to true [#7862] -* Add more rule engine date functions: format_date/3, format_date/4, date_to_unix_ts/4 [#7894] +* Add more rule engine date functions: format_date/3, format_date/4, date_to_unix_ts/3, date_to_unix_ts/4 [#7894] * Add proto_name and proto_ver fields for $event/client_disconnected event. * Mnesia auth/acl http api support multiple condition queries. * Inflight QoS1 Messages for shared topics are now redispatched to another alive subscribers upon chosen subscriber session termination. @@ -34,6 +34,7 @@ File format: * SSL closed error bug fixed for redis client. * Fix mqtt-sn client disconnected due to re-send a duplicated qos2 message * Rule-engine function hexstr2bin/1 support half byte [#7977] +* Shared message delivery when all alive shared subs have full inflight [#7984] * Improved resilience against autocluster partitioning during cluster startup. [#7876] [ekka-158](https://github.com/emqx/ekka/pull/158) diff --git a/Makefile b/Makefile index 0ba09b68c..417423f4e 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ REBAR = $(CURDIR)/rebar3 BUILD = $(CURDIR)/build SCRIPTS = $(CURDIR)/scripts export EMQX_RELUP ?= true -export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-alpine3.15.1 +export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-alpine3.15.1 export EMQX_DEFAULT_RUNNER = alpine:3.15.1 export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh) diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 33d09c16d..ad41ec2d8 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -204,6 +204,8 @@ , unix_ts_to_rfc3339/2 , format_date/3 , format_date/4 + , timezone_to_second/1 + , date_to_unix_ts/3 , date_to_unix_ts/4 , rfc3339_to_unix_ts/1 , rfc3339_to_unix_ts/2 @@ -929,26 +931,37 @@ now_timestamp(Unit) -> time_unit(<<"second">>) -> second; time_unit(<<"millisecond">>) -> millisecond; time_unit(<<"microsecond">>) -> microsecond; -time_unit(<<"nanosecond">>) -> nanosecond. +time_unit(<<"nanosecond">>) -> nanosecond; +time_unit(second) -> second; +time_unit(millisecond) -> millisecond; +time_unit(microsecond) -> microsecond; +time_unit(nanosecond) -> nanosecond. format_date(TimeUnit, Offset, FormatString) -> - emqx_rule_utils:bin( - emqx_rule_date:date(time_unit(TimeUnit), - emqx_rule_utils:str(Offset), - emqx_rule_utils:str(FormatString))). + Unit = time_unit(TimeUnit), + TimeEpoch = erlang:system_time(Unit), + format_date(Unit, Offset, FormatString, TimeEpoch). + +timezone_to_second(TimeZone) -> + emqx_calendar:offset_second(TimeZone). format_date(TimeUnit, Offset, FormatString, TimeEpoch) -> + Unit = time_unit(TimeUnit), emqx_rule_utils:bin( - emqx_rule_date:date(time_unit(TimeUnit), - emqx_rule_utils:str(Offset), - emqx_rule_utils:str(FormatString), - TimeEpoch)). + lists:concat( + emqx_calendar:format(TimeEpoch, Unit, Offset, FormatString))). +%% date string has timezone information, calculate the offset. +date_to_unix_ts(TimeUnit, FormatString, InputString) -> + Unit = time_unit(TimeUnit), + emqx_calendar:parse(InputString, Unit, FormatString). + +%% date string has no timezone information, force add the offset. date_to_unix_ts(TimeUnit, Offset, FormatString, InputString) -> - emqx_rule_date:parse_date(time_unit(TimeUnit), - emqx_rule_utils:str(Offset), - emqx_rule_utils:str(FormatString), - emqx_rule_utils:str(InputString)). + Unit = time_unit(TimeUnit), + OffsetSecond = emqx_calendar:offset_second(Offset), + OffsetDelta = erlang:convert_time_unit(OffsetSecond, second, Unit), + date_to_unix_ts(Unit, FormatString, InputString) - OffsetDelta. mongo_date() -> erlang:timestamp(). diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index 98f6ad0b7..aa2781e8c 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -713,24 +713,29 @@ t_format_date_funcs(_) -> ?PROPTEST(prop_format_date_fun). prop_format_date_fun() -> - Args1 = [<<"second">>, <<"+07:00">>, <<"%m--%d--%y---%H:%M:%S%Z">>], + Args1 = [<<"second">>, <<"+07:00">>, <<"%m--%d--%Y---%H:%M:%S%z">>], ?FORALL(S, erlang:system_time(second), S == apply_func(date_to_unix_ts, Args1 ++ [apply_func(format_date, Args1 ++ [S])])), - Args2 = [<<"millisecond">>, <<"+04:00">>, <<"--%m--%d--%y---%H:%M:%S%Z">>], + Args2 = [<<"millisecond">>, <<"+04:00">>, <<"--%m--%d--%Y---%H:%M:%S:%3N%z">>], + Args2DTUS = [<<"millisecond">>, <<"--%m--%d--%Y---%H:%M:%S:%3N%z">>], ?FORALL(S, erlang:system_time(millisecond), S == apply_func(date_to_unix_ts, - Args2 ++ [apply_func(format_date, + Args2DTUS ++ [apply_func(format_date, Args2 ++ [S])])), - Args = [<<"second">>, <<"+08:00">>, <<"%y-%m-%d-%H:%M:%S%Z">>], + Args = [<<"second">>, <<"+08:00">>, <<"%Y-%m-%d-%H:%M:%S%z">>], + ArgsDTUS = [<<"second">>, <<"%Y-%m-%d-%H:%M:%S%z">>], ?FORALL(S, erlang:system_time(second), S == apply_func(date_to_unix_ts, - Args ++ [apply_func(format_date, - Args ++ [S])])). -%%------------------------------------------------------------------------------ -%% Utility functions -%%------------------------------------------------------------------------------ + ArgsDTUS ++ [apply_func(format_date, + Args ++ [S])])), + % no offset in format string. force add offset + Second = erlang:system_time(second), + Args3 = [<<"second">>, <<"+04:00">>, <<"--%m--%d--%Y---%H:%M:%S">>, Second], + Formatters3 = apply_func(format_date, Args3), + Args3DTUS = [<<"second">>, <<"+04:00">>, <<"--%m--%d--%Y---%H:%M:%S">>, Formatters3], + Second == apply_func(date_to_unix_ts, Args3DTUS). apply_func(Name, Args) when is_atom(Name) -> erlang:apply(emqx_rule_funcs, Name, Args); diff --git a/deploy/docker/Dockerfile b/deploy/docker/Dockerfile index d3a58ce74..63702e2c4 100644 --- a/deploy/docker/Dockerfile +++ b/deploy/docker/Dockerfile @@ -1,4 +1,4 @@ -ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-alpine3.15.1 +ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-alpine3.15.1 ARG RUN_FROM=alpine:3.15.1 FROM ${BUILD_FROM} AS builder diff --git a/etc/emqx.conf b/etc/emqx.conf index 5210b1603..af25d2f08 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -463,7 +463,33 @@ log.file = emqx.log ## Log formatter ## Value: text | json -#log.formatter = text +log.formatter = text + +## Format of the text logger. +## +## Value: rfc3339 | FORMAT +## Where FORMAT is the format string of the timestamp. Supported specifiers: +## %Y: year +## %m: month +## %d: day +## %H: hour +## %M: minute +## %S: second +## %N: nanoseconds (000000000 - 999999999) +## %6N: microseconds (00000 - 999999) +## %3N: milliseconds (000 - 999) +## %z: timezone, [+-]HHMM +## %:z: timezone, [+-]HH:MM +## %::z: timezone, [+-]HH:MM:SS +## +## For example: +## log.formatter.text.date.format = %Y-%m-%dT%H:%M:%S.%6N %:z +## +## Before 4.2, the default date format was: +## log.formatter.text.date.format = %Y-%m-%d %H:%M:%S.%3N +## +## Default: rfc3339 +# log.formatter.text.date.format = rfc3339 ## Log to single line ## Value: Boolean diff --git a/priv/emqx.schema b/priv/emqx.schema index d799f170d..86bbebc09 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -563,6 +563,12 @@ end}. {datatype, {enum, [text, json]}} ]}. +%% @doc format logs as text, date format part +{mapping, "log.formatter.text.date.format", "kernel.logger", [ + {default, "rfc3339"}, + {datatype, string} +]}. + %% @doc format logs in a single line. {mapping, "log.single_line", "kernel.logger", [ {default, true}, @@ -664,9 +670,38 @@ end}. single_line => SingleLine }}; text -> + DateFormat = + case cuttlefish:conf_get("log.formatter.text.date.format", Conf, "rfc3339") of + "rfc3339" -> + rfc3339; + DateStr -> + DateStrTrans = + fun + DST(<<>>, Formatter) -> lists:reverse(Formatter); + DST(<<"%Y", Tail/binary>>, Formatter) -> DST(Tail, [year | Formatter]); + DST(<<"%m", Tail/binary>>, Formatter) -> DST(Tail, [month | Formatter]); + DST(<<"%d", Tail/binary>>, Formatter) -> DST(Tail, [day | Formatter]); + DST(<<"%H", Tail/binary>>, Formatter) -> DST(Tail, [hour | Formatter]); + DST(<<"%M", Tail/binary>>, Formatter) -> DST(Tail, [minute | Formatter]); + DST(<<"%S", Tail/binary>>, Formatter) -> DST(Tail, [second | Formatter]); + DST(<<"%N", Tail/binary>>, Formatter) -> DST(Tail, [nanosecond | Formatter]); + DST(<<"%3N", Tail/binary>>, Formatter) -> DST(Tail, [millisecond | Formatter]); + DST(<<"%6N", Tail/binary>>, Formatter) -> DST(Tail, [microsecond | Formatter]); + DST(<<"%z", Tail/binary>>, Formatter) -> DST(Tail, [timezone | Formatter]); + DST(<<"%:z", Tail/binary>>, Formatter) -> DST(Tail, [timezone1 | Formatter]); + DST(<<"%::z", Tail/binary>>, Formatter) -> DST(Tail, [timezone2 | Formatter]); + DST(<>, [Str | Formatter]) when is_list(Str) -> + DST(Tail, [lists:append(Str, [Char]) | Formatter]); + DST(<>, Formatter) -> + DST(Tail, [[Char] | Formatter]) + end, + DateStrTrans(list_to_binary(DateStr), []) + end, {emqx_logger_textfmt, - #{template => - [time," [",level,"] ", + #{ + date_format => DateFormat, + template => + [" [",level,"] ", {clientid, [{peername, [clientid,"@",peername," "], diff --git a/scripts/buildx.sh b/scripts/buildx.sh index f50315a22..3337383eb 100755 --- a/scripts/buildx.sh +++ b/scripts/buildx.sh @@ -8,7 +8,7 @@ ## i.e. will not work if docker command has to be executed with sudo ## example: -## ./scripts/buildx.sh --profile emqx --pkgtype zip --builder ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-debian10 --arch arm64 +## ./scripts/buildx.sh --profile emqx --pkgtype zip --builder ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-debian10 --arch arm64 set -euo pipefail @@ -20,7 +20,7 @@ help() { echo "--arch amd64|arm64: Target arch to build the EMQ X package for" echo "--src_dir : EMQ X source ode in this dir, default to PWD" echo "--builder : Builder image to pull" - echo " E.g. ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-debian10" + echo " E.g. ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-debian10" } while [ "$#" -gt 0 ]; do diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 6b15d8978..a949e87db 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,7 +2,9 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.3", - [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + [{add_module,emqx_calendar}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, @@ -17,7 +19,9 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_relup}]}, {"4.4.2", - [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + [{add_module,emqx_calendar}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -36,7 +40,9 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_relup}]}, {"4.4.1", - [{load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, + [{add_module,emqx_calendar}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {update,emqx_os_mon,{advanced,[]}}, @@ -64,7 +70,9 @@ {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {add_module,emqx_relup}]}, {"4.4.0", - [{load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, + [{add_module,emqx_calendar}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {update,emqx_os_mon,{advanced,[]}}, @@ -97,7 +105,9 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.4.3", - [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + [{add_module,emqx_calendar}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, @@ -111,7 +121,9 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_relup}]}, {"4.4.2", - [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + [{add_module,emqx_calendar}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, @@ -129,7 +141,9 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_relup}]}, {"4.4.1", - [{load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, + [{add_module,emqx_calendar}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {update,emqx_os_mon,{advanced,[]}}, @@ -156,7 +170,9 @@ {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {delete_module,emqx_relup}]}, {"4.4.0", - [{load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, + [{add_module,emqx_calendar}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {update,emqx_os_mon,{advanced,[]}}, diff --git a/src/emqx_calendar.erl b/src/emqx_calendar.erl new file mode 100644 index 000000000..0c094df81 --- /dev/null +++ b/src/emqx_calendar.erl @@ -0,0 +1,436 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_calendar). + +-define(SECONDS_PER_MINUTE, 60). +-define(SECONDS_PER_HOUR, 3600). +-define(SECONDS_PER_DAY, 86400). +-define(DAYS_PER_YEAR, 365). +-define(DAYS_PER_LEAP_YEAR, 366). +-define(DAYS_FROM_0_TO_1970, 719528). +-define(SECONDS_FROM_0_TO_1970, ?DAYS_FROM_0_TO_1970 * ?SECONDS_PER_DAY). + +-export([ formatter/1 + , format/3 + , format/4 + , parse/3 + , offset_second/1 + ]). + +-define(DATE_PART, + [ year + , month + , day + , hour + , minute + , second + , nanosecond + , millisecond + , microsecond + ]). + +-define(DATE_ZONE_NAME, + [ timezone + , timezone1 + , timezone2 + ]). + +formatter(FormatterStr) when is_list(FormatterStr) -> + formatter(list_to_binary(FormatterStr)); +formatter(FormatterBin) when is_binary(FormatterBin) -> + do_formatter(FormatterBin, []). + +offset_second(Offset) -> + offset_second_(Offset). + +format(Time, Unit, Formatter) -> + format(Time, Unit, undefined, Formatter). + +format(Time, Unit, Offset, FormatterBin) when is_binary(FormatterBin) -> + format(Time, Unit, Offset, formatter(FormatterBin)); +format(Time, Unit, Offset, Formatter) -> + do_format(Time, time_unit(Unit), offset_second(Offset), Formatter). + +parse(DateStr, Unit, FormatterBin) when is_binary(FormatterBin) -> + parse(DateStr, Unit, formatter(FormatterBin)); +parse(DateStr, Unit, Formatter) -> + do_parse(DateStr, Unit, Formatter). +%% ------------------------------------------------------------------------------------------------- +%% internal + +time_unit(second) -> second; +time_unit(millisecond) -> millisecond; +time_unit(microsecond) -> microsecond; +time_unit(nanosecond) -> nanosecond; +time_unit("second") -> second; +time_unit("millisecond") -> millisecond; +time_unit("microsecond") -> microsecond; +time_unit("nanosecond") -> nanosecond; +time_unit(<<"second">>) -> second; +time_unit(<<"millisecond">>) -> millisecond; +time_unit(<<"microsecond">>) -> microsecond; +time_unit(<<"nanosecond">>) -> nanosecond. + +%% ------------------------------------------------------------------------------------------------- +%% internal: format part + +do_formatter(<<>>, Formatter) -> lists:reverse(Formatter); +do_formatter(<<"%Y", Tail/binary>>, Formatter) -> do_formatter(Tail, [year | Formatter]); +do_formatter(<<"%m", Tail/binary>>, Formatter) -> do_formatter(Tail, [month | Formatter]); +do_formatter(<<"%d", Tail/binary>>, Formatter) -> do_formatter(Tail, [day | Formatter]); +do_formatter(<<"%H", Tail/binary>>, Formatter) -> do_formatter(Tail, [hour | Formatter]); +do_formatter(<<"%M", Tail/binary>>, Formatter) -> do_formatter(Tail, [minute | Formatter]); +do_formatter(<<"%S", Tail/binary>>, Formatter) -> do_formatter(Tail, [second | Formatter]); +do_formatter(<<"%N", Tail/binary>>, Formatter) -> do_formatter(Tail, [nanosecond | Formatter]); +do_formatter(<<"%3N", Tail/binary>>, Formatter) -> do_formatter(Tail, [millisecond | Formatter]); +do_formatter(<<"%6N", Tail/binary>>, Formatter) -> do_formatter(Tail, [microsecond | Formatter]); +do_formatter(<<"%z", Tail/binary>>, Formatter) -> do_formatter(Tail, [timezone | Formatter]); +do_formatter(<<"%:z", Tail/binary>>, Formatter) -> do_formatter(Tail, [timezone1 | Formatter]); +do_formatter(<<"%::z", Tail/binary>>, Formatter) -> do_formatter(Tail, [timezone2 | Formatter]); +do_formatter(<>, [Str | Formatter]) when is_list(Str) -> + do_formatter(Tail, [lists:append(Str, [Char]) | Formatter]); +do_formatter(<>, Formatter) -> do_formatter(Tail, [[Char] | Formatter]). + +offset_second_(OffsetSecond) when is_integer(OffsetSecond) -> OffsetSecond; +offset_second_(undefined) -> 0; +offset_second_("local") -> offset_second_(local); +offset_second_(<<"local">>) -> offset_second_(local); +offset_second_(local) -> + UniversalTime = calendar:system_time_to_universal_time(erlang:system_time(second), second), + LocalTime = erlang:universaltime_to_localtime(UniversalTime), + LocalSecs = calendar:datetime_to_gregorian_seconds(LocalTime), + UniversalSecs = calendar:datetime_to_gregorian_seconds(UniversalTime), + LocalSecs - UniversalSecs; +offset_second_(Offset) when is_binary(Offset) -> + offset_second_(erlang:binary_to_list(Offset)); +offset_second_("Z") -> 0; +offset_second_("z") -> 0; +offset_second_(Offset) when is_list(Offset) -> + Sign = hd(Offset), + ((Sign == $+) orelse (Sign == $-)) + orelse error({bad_time_offset, Offset}), + Signs = #{$+ => 1, $- => -1}, + PosNeg = maps:get(Sign, Signs), + [Sign | HM] = Offset, + {HourStr, MinuteStr, SecondStr} = + case string:tokens(HM, ":") of + [H, M] -> + {H, M, "0"}; + [H, M, S] -> + {H, M, S}; + [HHMM] when erlang:length(HHMM) == 4 -> + {string:sub_string(HHMM, 1,2), string:sub_string(HHMM, 3,4), "0"}; + _ -> + error({bad_time_offset, Offset}) + end, + Hour = erlang:list_to_integer(HourStr), + Minute = erlang:list_to_integer(MinuteStr), + Second = erlang:list_to_integer(SecondStr), + (Hour =< 23) orelse error({bad_time_offset_hour, Hour}), + (Minute =< 59) orelse error({bad_time_offset_minute, Minute}), + (Second =< 59) orelse error({bad_time_offset_second, Second}), + PosNeg * (Hour * 3600 + Minute * 60 + Second). + +do_format(Time, Unit, Offset, Formatter) -> + Adjustment = erlang:convert_time_unit(Offset, second, Unit), + AdjustedTime = Time + Adjustment, + Factor = factor(Unit), + Secs = AdjustedTime div Factor, + DateTime = system_time_to_datetime(Secs), + {{Year, Month, Day}, {Hour, Min, Sec}} = DateTime, + Date = #{ + year => padding(Year, 4), + month => padding(Month, 2), + day => padding(Day, 2), + hour => padding(Hour, 2), + minute => padding(Min, 2), + second => padding(Sec, 2), + millisecond => trans_x_second(Unit, millisecond, Time), + microsecond => trans_x_second(Unit, microsecond, Time), + nanosecond => trans_x_second(Unit, nanosecond, Time) + }, + Timezones = formatter_timezones(Offset, Formatter, #{}), + DateWithZone = maps:merge(Date, Timezones), + [maps:get(Key, DateWithZone, Key) || Key <- Formatter]. + +formatter_timezones(_Offset, [], Zones) -> Zones; +formatter_timezones(Offset, [Timezone | Formatter], Zones) -> + case lists:member(Timezone, [timezone, timezone1, timezone2]) of + true -> + NZones = Zones#{Timezone => offset_to_timezone(Offset, Timezone)}, + formatter_timezones(Offset, Formatter, NZones); + false -> + formatter_timezones(Offset, Formatter, Zones) + end. + +offset_to_timezone(Offset, Timezone) -> + Sign = + case Offset >= 0 of + true -> + $+; + false -> + $- + end, + {H, M, S} = seconds_to_time(abs(Offset)), + %% TODO: Support zone define %:::z + %% Numeric time zone with ":" to necessary precision (e.g., -04, +05:30). + case Timezone of + timezone -> + %% +0800 + io_lib:format("~c~2.10.0B~2.10.0B", [Sign, H, M]); + timezone1 -> + %% +08:00 + io_lib:format("~c~2.10.0B:~2.10.0B", [Sign, H, M]); + timezone2 -> + %% +08:00:00 + io_lib:format("~c~2.10.0B:~2.10.0B:~2.10.0B", [Sign, H, M, S]) + end. + +factor(second) -> 1; +factor(millisecond) -> 1000; +factor(microsecond) -> 1000000; +factor(nanosecond) -> 1000000000. + +system_time_to_datetime(Seconds) -> + gregorian_seconds_to_datetime(Seconds + ?SECONDS_FROM_0_TO_1970). + +gregorian_seconds_to_datetime(Secs) when Secs >= 0 -> + Days = Secs div ?SECONDS_PER_DAY, + Rest = Secs rem ?SECONDS_PER_DAY, + {gregorian_days_to_date(Days), seconds_to_time(Rest)}. + +seconds_to_time(Secs) when Secs >= 0, Secs < ?SECONDS_PER_DAY -> + Secs0 = Secs rem ?SECONDS_PER_DAY, + Hour = Secs0 div ?SECONDS_PER_HOUR, + Secs1 = Secs0 rem ?SECONDS_PER_HOUR, + Minute = Secs1 div ?SECONDS_PER_MINUTE, + Second = Secs1 rem ?SECONDS_PER_MINUTE, + {Hour, Minute, Second}. + +gregorian_days_to_date(Days) -> + {Year, DayOfYear} = day_to_year(Days), + {Month, DayOfMonth} = year_day_to_date(Year, DayOfYear), + {Year, Month, DayOfMonth}. + +day_to_year(DayOfEpoch) when DayOfEpoch >= 0 -> + YMax = DayOfEpoch div ?DAYS_PER_YEAR, + YMin = DayOfEpoch div ?DAYS_PER_LEAP_YEAR, + {Y1, D1} = dty(YMin, YMax, DayOfEpoch, dy(YMin), dy(YMax)), + {Y1, DayOfEpoch - D1}. + +year_day_to_date(Year, DayOfYear) -> + ExtraDay = + case is_leap_year(Year) of + true -> + 1; + false -> + 0 + end, + {Month, Day} = year_day_to_date2(ExtraDay, DayOfYear), + {Month, Day + 1}. + +dty(Min, Max, _D1, DMin, _DMax) when Min == Max -> + {Min, DMin}; +dty(Min, Max, D1, DMin, DMax) -> + Diff = Max - Min, + Mid = Min + Diff * (D1 - DMin) div (DMax - DMin), + MidLength = + case is_leap_year(Mid) of + true -> + ?DAYS_PER_LEAP_YEAR; + false -> + ?DAYS_PER_YEAR + end, + case dy(Mid) of + D2 when D1 < D2 -> + NewMax = Mid - 1, + dty(Min, NewMax, D1, DMin, dy(NewMax)); + D2 when D1 - D2 >= MidLength -> + NewMin = Mid + 1, + dty(NewMin, Max, D1, dy(NewMin), DMax); + D2 -> + {Mid, D2} + end. + +dy(Y) when Y =< 0 -> + 0; +dy(Y) -> + X = Y - 1, + X div 4 - X div 100 + X div 400 + X * ?DAYS_PER_YEAR + ?DAYS_PER_LEAP_YEAR. + +is_leap_year(Y) when is_integer(Y), Y >= 0 -> + is_leap_year1(Y). + +is_leap_year1(Year) when Year rem 4 =:= 0, Year rem 100 > 0 -> + true; +is_leap_year1(Year) when Year rem 400 =:= 0 -> + true; +is_leap_year1(_) -> + false. + +year_day_to_date2(_, Day) when Day < 31 -> + {1, Day}; +year_day_to_date2(E, Day) when 31 =< Day, Day < 59 + E -> + {2, Day - 31}; +year_day_to_date2(E, Day) when 59 + E =< Day, Day < 90 + E -> + {3, Day - (59 + E)}; +year_day_to_date2(E, Day) when 90 + E =< Day, Day < 120 + E -> + {4, Day - (90 + E)}; +year_day_to_date2(E, Day) when 120 + E =< Day, Day < 151 + E -> + {5, Day - (120 + E)}; +year_day_to_date2(E, Day) when 151 + E =< Day, Day < 181 + E -> + {6, Day - (151 + E)}; +year_day_to_date2(E, Day) when 181 + E =< Day, Day < 212 + E -> + {7, Day - (181 + E)}; +year_day_to_date2(E, Day) when 212 + E =< Day, Day < 243 + E -> + {8, Day - (212 + E)}; +year_day_to_date2(E, Day) when 243 + E =< Day, Day < 273 + E -> + {9, Day - (243 + E)}; +year_day_to_date2(E, Day) when 273 + E =< Day, Day < 304 + E -> + {10, Day - (273 + E)}; +year_day_to_date2(E, Day) when 304 + E =< Day, Day < 334 + E -> + {11, Day - (304 + E)}; +year_day_to_date2(E, Day) when 334 + E =< Day -> + {12, Day - (334 + E)}. + +trans_x_second(FromUnit, ToUnit, Time) -> + XSecond = do_trans_x_second(FromUnit, ToUnit, Time), + Len = + case ToUnit of + millisecond -> 3; + microsecond -> 6; + nanosecond -> 9 + end, + padding(XSecond, Len). + +do_trans_x_second(second, second, Time) -> Time div 60; +do_trans_x_second(second, _, _Time) -> 0; + +do_trans_x_second(millisecond, millisecond, Time) -> Time rem 1000; +do_trans_x_second(millisecond, microsecond, Time) -> (Time rem 1000) * 1000; +do_trans_x_second(millisecond, nanosecond, Time) -> (Time rem 1000) * 1000_000; + +do_trans_x_second(microsecond, millisecond, Time) -> Time div 1000 rem 1000; +do_trans_x_second(microsecond, microsecond, Time) -> Time rem 1000000; +do_trans_x_second(microsecond, nanosecond, Time) -> (Time rem 1000000) * 1000; + +do_trans_x_second(nanosecond, millisecond, Time) -> Time div 1000000 rem 1000; +do_trans_x_second(nanosecond, microsecond, Time) -> Time div 1000 rem 1000000; +do_trans_x_second(nanosecond, nanosecond, Time) -> Time rem 1000000000. + +padding(Data, Len) when is_integer(Data) -> + padding(integer_to_list(Data), Len); +padding(Data, Len) when Len > 0 andalso erlang:length(Data) < Len -> + [$0 | padding(Data, Len - 1)]; +padding(Data, _Len) -> + Data. + +%% ------------------------------------------------------------------------------------------------- +%% internal +%% parse part + +do_parse(DateStr, Unit, Formatter) -> + DateInfo = do_parse_date_str(DateStr, Formatter, #{}), + {Precise, PrecisionUnit} = precision(DateInfo), + Counter = + fun + (year, V, Res) -> + Res + dy(V) * ?SECONDS_PER_DAY * Precise - (?SECONDS_FROM_0_TO_1970 * Precise); + (month, V, Res) -> + Res + dm(V) * ?SECONDS_PER_DAY * Precise; + (day, V, Res) -> + Res + (V * ?SECONDS_PER_DAY * Precise); + (hour, V, Res) -> + Res + (V * ?SECONDS_PER_HOUR * Precise); + (minute, V, Res) -> + Res + (V * ?SECONDS_PER_MINUTE * Precise); + (second, V, Res) -> + Res + V * Precise; + (millisecond, V, Res) -> + case PrecisionUnit of + millisecond -> + Res + V; + microsecond -> + Res + (V * 1000); + nanosecond -> + Res + (V * 1000000) + end; + (microsecond, V, Res) -> + case PrecisionUnit of + microsecond -> + Res + V; + nanosecond -> + Res + (V * 1000) + end; + (nanosecond, V, Res) -> + Res + V; + (parsed_offset, V, Res) -> + Res - V + end, + Count = maps:fold(Counter, 0, DateInfo) - (?SECONDS_PER_DAY * Precise), + erlang:convert_time_unit(Count, PrecisionUnit, Unit). + +precision(#{nanosecond := _}) -> {1000_000_000, nanosecond}; +precision(#{microsecond := _}) -> {1000_000, microsecond}; +precision(#{millisecond := _}) -> {1000, millisecond}; +precision(#{second := _}) -> {1, second}; +precision(_) -> {1, second}. + +do_parse_date_str(<<>>, _, Result) -> Result; +do_parse_date_str(_, [], Result) -> Result; +do_parse_date_str(Date, [Key | Formatter], Result) -> + Size = date_size(Key), + <> = Date, + case lists:member(Key, ?DATE_PART) of + true -> + do_parse_date_str(Tail, Formatter, Result#{Key => erlang:binary_to_integer(DatePart)}); + false -> + case lists:member(Key, ?DATE_ZONE_NAME) of + true -> + do_parse_date_str(Tail, Formatter, Result#{parsed_offset => offset_second(DatePart)}); + false -> + do_parse_date_str(Tail, Formatter, Result) + end + end. + +date_size(Str) when is_list(Str) -> erlang:length(Str); +date_size(year) -> 4; +date_size(month) -> 2; +date_size(day) -> 2; +date_size(hour) -> 2; +date_size(minute) -> 2; +date_size(second) -> 2; +date_size(millisecond) -> 3; +date_size(microsecond) -> 6; +date_size(nanosecond) -> 9; +date_size(timezone) -> 5; +date_size(timezone1) -> 6; +date_size(timezone2) -> 9. + +dm(1) -> 0; +dm(2) -> 31; +dm(3) -> 59; +dm(4) -> 90; +dm(5) -> 120; +dm(6) -> 151; +dm(7) -> 181; +dm(8) -> 212; +dm(9) -> 243; +dm(10) -> 273; +dm(11) -> 304; +dm(12) -> 334. + diff --git a/src/emqx_logger_textfmt.erl b/src/emqx_logger_textfmt.erl index 98104f58c..1d1bb8dcd 100644 --- a/src/emqx_logger_textfmt.erl +++ b/src/emqx_logger_textfmt.erl @@ -28,11 +28,27 @@ , gl % not interesting ]). -check_config(X) -> logger_formatter:check_config(X). +check_config(Config0) -> + Config = maps:without([date_format], Config0), + logger_formatter:check_config(Config). -format(#{msg := Msg0, meta := Meta} = Event, Config) -> +format(#{msg := Msg0, meta := Meta} = Event, + #{date_format := rfc3339, template := Template0} = Config) -> Msg = maybe_merge(Msg0, Meta), - logger_formatter:format(Event#{msg := Msg}, Config). + Template = [time | Template0], + logger_formatter:format(Event#{msg := Msg}, Config#{template => Template}); +format(#{msg := Msg0, meta := Meta} = Event, + #{date_format := DFS} = Config) -> + Msg = maybe_merge(Msg0, Meta), + Time = + case maps:get(time, Event, undefined) of + undefined -> + erlang:system_time(microsecond); + T -> + T + end, + Date = emqx_calendar:format(Time, microsecond, local, DFS), + [Date | logger_formatter:format(Event#{msg := Msg}, Config)]. maybe_merge({report, Report}, Meta) when is_map(Report) -> {report, maps:merge(Report, filter(Meta))}; diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 429275aff..84d72d177 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -486,8 +486,8 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = case emqx_inflight:is_full(Inflight) of true -> Session1 = case maybe_nack(Msg) of - true -> Session; - false -> enqueue(ClientInfo, Msg, Session) + drop -> Session; + store -> enqueue(ClientInfo, Msg, Session) end, {ok, Session1}; false -> @@ -695,7 +695,7 @@ redispatch_shared_messages(#session{inflight = Inflight}) -> %% Note that dispatch is called with self() in failed subs %% This is done to avoid dispatching back to caller Delivery = #delivery{sender = self(), message = Msg}, - emqx_shared_sub:dispatch(Group, Topic, Delivery, [self()]); + emqx_shared_sub:dispatch_to_non_self(Group, Topic, Delivery); _ -> false end; diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 121d777ca..c7fb1a3cb 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -39,8 +39,8 @@ ]). -export([ dispatch/3 - , dispatch/4 - ]). + , dispatch_to_non_self/3 + ]). -export([ maybe_ack/1 , maybe_nack_dropped/1 @@ -123,22 +123,28 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. +-spec(dispatch_to_non_self(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery()) + -> emqx_types:deliver_result()). +dispatch_to_non_self(Group, Topic, Delivery) -> + Strategy = strategy(Group), + dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{self() => sender}). + -spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). dispatch(Group, Topic, Delivery) -> - dispatch(Group, Topic, Delivery, _FailedSubs = []). + Strategy = strategy(Group), + dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}). -dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> +dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> #message{from = ClientId, topic = SourceTopic} = Msg, - case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of - false -> - {error, no_subscribers}; + case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of + false -> {error, no_subscribers}; {Type, SubPid} -> case do_dispatch(SubPid, Group, Topic, Msg, Type) of ok -> {ok, 1}; - {error, _Reason} -> + {error, Reason} -> %% Failed to dispatch to this sub, try next. - dispatch(Group, Topic, Delivery, [SubPid | FailedSubs]) + dispatch(Strategy, Group, Topic, Delivery, FailedSubs#{SubPid => Reason}) end end. @@ -165,33 +171,33 @@ do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> %% For QoS 0 message, send it as regular dispatch SubPid ! {deliver, Topic, Msg}, ok; -do_dispatch(SubPid, _Group, Topic, Msg, retry) -> - %% Retry implies all subscribers nack:ed, send again without ack - SubPid ! {deliver, Topic, Msg}, - ok; -do_dispatch(SubPid, Group, Topic, Msg, fresh) -> +do_dispatch(SubPid, Group, Topic, Msg, Type) -> case ack_enabled() of true -> - dispatch_with_ack(SubPid, Group, Topic, Msg); + dispatch_with_ack(SubPid, Group, Topic, Msg, Type); false -> SubPid ! {deliver, Topic, Msg}, ok end. -dispatch_with_ack(SubPid, Group, Topic, Msg) -> +dispatch_with_ack(SubPid, Group, Topic, Msg, Type) -> %% For QoS 1/2 message, expect an ack Ref = erlang:monitor(process, SubPid), Sender = self(), - SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)}, + SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)}, Timeout = case Msg#message.qos of ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); ?QOS_2 -> infinity end, + + %% This OpaqueRef is a forward compatibilty workaround. Pre 4.3.15 versions + %% pass Ref from `{Sender, Ref}` ack header back as it is. + OpaqueRef = old_ref(Type, Group, Ref), try receive - {Ref, ?ACK} -> + {ReceivedRef, ?ACK} when ReceivedRef =:= Ref; ReceivedRef =:= OpaqueRef -> ok; - {Ref, ?NACK(Reason)} -> + {ReceivedRef, ?NACK(Reason)} when ReceivedRef =:= Ref; ReceivedRef =:= OpaqueRef -> %% the receive session may nack this message when its queue is full {error, Reason}; {'DOWN', Ref, process, SubPid, Reason} -> @@ -204,8 +210,11 @@ dispatch_with_ack(SubPid, Group, Topic, Msg) -> _ = erlang:demonitor(Ref, [flush]) end. -with_group_ack(Msg, Group, Sender, Ref) -> - emqx_message:set_headers(#{shared_dispatch_ack => {Group, Sender, Ref}}, Msg). +with_group_ack(Msg, Group, Type, Sender, Ref) -> + emqx_message:set_headers(#{shared_dispatch_ack => {Sender, old_ref(Type, Group, Ref)}}, Msg). + +old_ref(Type, Group, Ref) -> + {Type, Group, Ref}. -spec(without_group_ack(emqx_types:message()) -> emqx_types:message()). without_group_ack(Msg) -> @@ -217,19 +226,32 @@ get_group_ack(Msg) -> -spec(get_group(emqx_types:message()) -> {ok, any()} | error). get_group(Msg) -> case get_group_ack(Msg) of - ?NO_ACK -> error; - {Group, _Sender, _Ref} -> {ok, Group} + {_Sender, {_Type, Group, _Ref}} -> {ok, Group}; + _ -> error end. -spec(is_ack_required(emqx_types:message()) -> boolean()). is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg). %% @doc Negative ack dropped message due to inflight window or message queue being full. --spec(maybe_nack_dropped(emqx_types:message()) -> boolean()). +-spec(maybe_nack_dropped(emqx_types:message()) -> store | drop). maybe_nack_dropped(Msg) -> case get_group_ack(Msg) of - ?NO_ACK -> false; - {_Group, Sender, Ref} -> ok == nack(Sender, Ref, dropped) + %% No ack header is present, put it into mqueue + ?NO_ACK -> store; + + %% For fresh Ref we send a nack and return true, to note that the inflight is full + {Sender, {fresh, _Group, Ref}} -> nack(Sender, Ref, dropped), drop; + + %% For retry Ref we can't reject a message if inflight is full, so we mark it as + %% acknowledged and put it into mqueue + {_Sender, {retry, _Group, _Ref}} -> maybe_ack(Msg), store; + + %% This clause is for backward compatibility + Ack -> + {Sender, Ref} = fetch_sender_ref(Ack), + nack(Sender, Ref, dropped), + drop end. %% @doc Negative ack message due to connection down. @@ -237,7 +259,7 @@ maybe_nack_dropped(Msg) -> %% i.e is_ack_required returned true. -spec(nack_no_connection(emqx_types:message()) -> ok). nack_no_connection(Msg) -> - {_Group, Sender, Ref} = get_group_ack(Msg), + {Sender, Ref} = fetch_sender_ref(get_group_ack(Msg)), nack(Sender, Ref, no_connection). -spec(nack(pid(), reference(), dropped | no_connection) -> ok). @@ -250,11 +272,16 @@ maybe_ack(Msg) -> case get_group_ack(Msg) of ?NO_ACK -> Msg; - {_Group, Sender, Ref} -> + Ack -> + {Sender, Ref} = fetch_sender_ref(Ack), Sender ! {Ref, ?ACK}, without_group_ack(Msg) end. +fetch_sender_ref({Sender, {_Type, _Group, Ref}}) -> {Sender, Ref}; +%% These clauses are for backward compatibility +fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}. + pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> Sub0 = erlang:get({shared_sub_sticky, Group, Topic}), case is_active_sub(Sub0, FailedSubs) of @@ -264,23 +291,37 @@ pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> {fresh, Sub0}; false -> %% randomly pick one for the first message - {Type, Sub} = do_pick(random, ClientId, SourceTopic, Group, Topic, [Sub0 | FailedSubs]), - %% stick to whatever pick result - erlang:put({shared_sub_sticky, Group, Topic}, Sub), - {Type, Sub} + FailedSubs1 = maps_put_new(FailedSubs, Sub0, inactive), + case do_pick(random, ClientId, SourceTopic, Group, Topic, FailedSubs1) of + false -> false; + {Type, Sub} -> + %% stick to whatever pick result + erlang:put({shared_sub_sticky, Group, Topic}, Sub), + {Type, Sub} + end end; pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs). do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> All = subscribers(Group, Topic), - case All -- FailedSubs of + case lists:filter(fun(Sub) -> not maps:is_key(Sub, FailedSubs) end, All) of [] when All =:= [] -> %% Genuinely no subscriber false; [] -> - %% All offline? pick one anyway - {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)}; + %% We try redispatch to subs who dropped the message because inflight was full. + Found = maps_find_by(FailedSubs, fun({SubPid, FailReason}) -> + FailReason == dropped andalso is_alive_sub(SubPid) + end), + case Found of + {ok, Dropped, dropped} -> + %% Found dropped client + {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, [Dropped])}; + error -> + %% All offline? pick one anyway + {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)} + end; Subs -> %% More than one available {fresh, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs)} @@ -414,7 +455,7 @@ update_stats(State) -> %% Return 'true' if the subscriber process is alive AND not in the failed list is_active_sub(Pid, FailedSubs) -> - is_alive_sub(Pid) andalso not lists:member(Pid, FailedSubs). + not maps:is_key(Pid, FailedSubs) andalso is_alive_sub(Pid). %% erlang:is_process_alive/1 does not work with remote pid. is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) -> @@ -427,3 +468,22 @@ delete_route_if_needed({Group, Topic}) -> true -> ok; false -> ok = emqx_router:do_delete_route(Topic, {Group, node()}) end. + +maps_find_by(Map, Predicate) when is_map(Map) -> + maps_find_by(maps:iterator(Map), Predicate); + +maps_find_by(Iterator, Predicate) -> + case maps:next(Iterator) of + none -> error; + {Key, Value, NewIterator} -> + case Predicate(Key, Value) of + true -> {ok, Key, Value}; + false -> maps_find_by(NewIterator, Predicate) + end + end. + +maps_put_new(Map, Key, Value) -> + case Map of + #{Key := _} -> Map; + _ -> Map#{Key => Value} + end. diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 5c2de94d0..30bd96a66 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -47,20 +47,20 @@ t_is_ack_required(_) -> ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})). t_maybe_nack_dropped(_) -> - ?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, - ?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)), + ?assertEqual(store, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), + Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}}, + ?assertEqual(drop, emqx_shared_sub:maybe_nack_dropped(Msg)), ?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end). t_nack_no_connection(_) -> - Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}}, ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)), ?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok after 100 -> timeout end). t_maybe_ack(_) -> ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})), - Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}}, + Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}}, ?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}}, emqx_shared_sub:maybe_ack(Msg)), ?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end). @@ -446,6 +446,53 @@ t_redispatch(_) -> emqtt:stop(UsedSubPid2), ok. +t_dispatch_when_inflights_are_full(_) -> + ok = ensure_config(round_robin, true), + Topic = <<"foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + + %% Note that max_inflight is 1 + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {max_inflight, 1}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {max_inflight, 1}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar">>, 2}), + emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar">>, 2}), + + Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>), + Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>), + Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>), + ct:sleep(100), + + sys:suspend(ConnPid1), + sys:suspend(ConnPid2), + + %% Fill in the inflight for first client + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)), + + %% Fill in the inflight for second client + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)), + + %% Now kill any client + erlang:exit(ConnPid1, normal), + ct:sleep(100), + + %% And try to send the message + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)), + ?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)), + + %% And see that it gets dispatched to the client which is alive, even if it's inflight is full + sys:resume(ConnPid2), + ct:sleep(100), + ?assertMatch({true, ConnPid2}, last_message(<<"hello3">>, [ConnPid1, ConnPid2])), + ?assertMatch({true, ConnPid2}, last_message(<<"hello4">>, [ConnPid1, ConnPid2])), + + emqtt:stop(ConnPid2), + ok. + %%-------------------------------------------------------------------- %% help functions %%--------------------------------------------------------------------