Merge pull request #8064 from HJianBo/merge-main-v4.3-into-v4.4

Merge main v4.3 into v4.4
This commit is contained in:
JianBo He 2022-05-27 17:12:25 +08:00 committed by GitHub
commit 12de3ed250
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 759 additions and 102 deletions

View File

@ -3,7 +3,7 @@ version: '3.9'
services:
erlang:
container_name: erlang
image: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04
image: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04
env_file:
- conf.env
environment:

View File

@ -13,7 +13,7 @@ jobs:
os:
- ubuntu20.04
container: ghcr.io/emqx/emqx-builder/4.4-8:${{ matrix.erl_otp }}-${{ matrix.os }}
container: ghcr.io/emqx/emqx-builder/4.4-12:${{ matrix.erl_otp }}-${{ matrix.os }}
steps:
- uses: actions/checkout@v2

View File

@ -19,7 +19,7 @@ jobs:
prepare:
runs-on: ubuntu-20.04
# prepare source with any OTP version, no need for a matrix
container: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04
container: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04
outputs:
profiles: ${{ steps.set_profile.outputs.profiles}}
@ -34,6 +34,7 @@ jobs:
shell: bash
working-directory: source
run: |
git config --global --add safe.directory "$GITHUB_WORKSPACE"
if make emqx-ee --dry-run > /dev/null 2>&1; then
echo "::set-output name=profiles::[\"emqx-ee\"]"
else
@ -216,9 +217,9 @@ jobs:
- debian11
- debian10
- debian9
- rockylinux8
- centos7
- raspbian10
- el8
- el7
# - raspbian10 #armv6l is too slow to emulate
exclude:
- os: raspbian9
arch: amd64
@ -262,7 +263,7 @@ jobs:
--profile "${PROFILE}" \
--pkgtype "${PACKAGE}" \
--arch "${ARCH}" \
--builder "ghcr.io/emqx/emqx-builder/4.4-8:${OTP}-${SYSTEM}"
--builder "ghcr.io/emqx/emqx-builder/4.4-12:${OTP}-${SYSTEM}"
- uses: actions/upload-artifact@v1
if: startsWith(github.ref, 'refs/tags/')
with:
@ -339,7 +340,7 @@ jobs:
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
build-args: |
BUILD_FROM=ghcr.io/emqx/emqx-builder/4.4-8:${{ matrix.otp }}-alpine3.15.1
BUILD_FROM=ghcr.io/emqx/emqx-builder/4.4-12:${{ matrix.otp }}-alpine3.15.1
RUN_FROM=alpine:3.15.1
EMQX_NAME=${{ matrix.profile }}
file: source/deploy/docker/Dockerfile
@ -355,7 +356,7 @@ jobs:
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
build-args: |
BUILD_FROM=ghcr.io/emqx/emqx-builder/4.4-8:${{ matrix.otp }}-alpine3.15.1
BUILD_FROM=ghcr.io/emqx/emqx-builder/4.4-12:${{ matrix.otp }}-alpine3.15.1
RUN_FROM=alpine:3.15.1
EMQX_NAME=${{ matrix.profile }}
file: source/deploy/docker/Dockerfile.enterprise

View File

@ -21,14 +21,15 @@ jobs:
- 24.1.5-3
os:
- ubuntu20.04
- rockylinux8
- el8
container: ghcr.io/emqx/emqx-builder/4.4-8:${{ matrix.erl_otp }}-${{ matrix.os }}
container: ghcr.io/emqx/emqx-builder/4.4-12:${{ matrix.erl_otp }}-${{ matrix.os }}
steps:
- uses: actions/checkout@v1
- name: prepare
run: |
git config --global --add safe.directory "$GITHUB_WORKSPACE"
if make emqx-ee --dry-run > /dev/null 2>&1; then
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
git config --global credential.helper store

View File

@ -5,7 +5,7 @@ on: [pull_request]
jobs:
check_deps_integrity:
runs-on: ubuntu-20.04
container: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04
container: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04
steps:
- uses: actions/checkout@v2

View File

@ -7,7 +7,7 @@ on:
jobs:
prepare:
runs-on: ubuntu-20.04
container: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04
container: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04
outputs:
profiles: ${{ steps.set_profile.outputs.profiles}}

View File

@ -5,7 +5,7 @@ on: workflow_dispatch
jobs:
test:
runs-on: ubuntu-20.04
container: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04
container: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04
strategy:
fail-fast: true
env:

View File

@ -224,7 +224,7 @@ jobs:
relup_test_plan:
runs-on: ubuntu-20.04
container: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04
container: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04
outputs:
profile: ${{ steps.profile-and-versions.outputs.profile }}
vsn: ${{ steps.profile-and-versions.outputs.vsn }}
@ -275,7 +275,7 @@ jobs:
otp:
- 24.1.5-3
runs-on: ubuntu-20.04
container: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04
container: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04
defaults:
run:
shell: bash

View File

@ -10,7 +10,7 @@ on:
jobs:
run_proper_test:
runs-on: ubuntu-20.04
container: ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-ubuntu20.04
container: ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-ubuntu20.04
steps:
- uses: actions/checkout@v2

View File

@ -21,7 +21,7 @@ File format:
Now MQTT clients may be authorized with respect to a specific claim containing publish/subscribe topic whitelists.
* Better randomisation of app screts (changed from timestamp seeded sha hash (uuid) to crypto:strong_rand_bytes)
* Return a client_identifier_not_valid error when username is empty and username_as_clientid is set to true [#7862]
* Add more rule engine date functions: format_date/3, format_date/4, date_to_unix_ts/4 [#7894]
* Add more rule engine date functions: format_date/3, format_date/4, date_to_unix_ts/3, date_to_unix_ts/4 [#7894]
* Add proto_name and proto_ver fields for $event/client_disconnected event.
* Mnesia auth/acl http api support multiple condition queries.
* Inflight QoS1 Messages for shared topics are now redispatched to another alive subscribers upon chosen subscriber session termination.
@ -34,6 +34,7 @@ File format:
* SSL closed error bug fixed for redis client.
* Fix mqtt-sn client disconnected due to re-send a duplicated qos2 message
* Rule-engine function hexstr2bin/1 support half byte [#7977]
* Shared message delivery when all alive shared subs have full inflight [#7984]
* Improved resilience against autocluster partitioning during cluster
startup. [#7876]
[ekka-158](https://github.com/emqx/ekka/pull/158)

View File

@ -3,7 +3,7 @@ REBAR = $(CURDIR)/rebar3
BUILD = $(CURDIR)/build
SCRIPTS = $(CURDIR)/scripts
export EMQX_RELUP ?= true
export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-alpine3.15.1
export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-alpine3.15.1
export EMQX_DEFAULT_RUNNER = alpine:3.15.1
export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh)

View File

@ -204,6 +204,8 @@
, unix_ts_to_rfc3339/2
, format_date/3
, format_date/4
, timezone_to_second/1
, date_to_unix_ts/3
, date_to_unix_ts/4
, rfc3339_to_unix_ts/1
, rfc3339_to_unix_ts/2
@ -929,26 +931,37 @@ now_timestamp(Unit) ->
time_unit(<<"second">>) -> second;
time_unit(<<"millisecond">>) -> millisecond;
time_unit(<<"microsecond">>) -> microsecond;
time_unit(<<"nanosecond">>) -> nanosecond.
time_unit(<<"nanosecond">>) -> nanosecond;
time_unit(second) -> second;
time_unit(millisecond) -> millisecond;
time_unit(microsecond) -> microsecond;
time_unit(nanosecond) -> nanosecond.
format_date(TimeUnit, Offset, FormatString) ->
emqx_rule_utils:bin(
emqx_rule_date:date(time_unit(TimeUnit),
emqx_rule_utils:str(Offset),
emqx_rule_utils:str(FormatString))).
Unit = time_unit(TimeUnit),
TimeEpoch = erlang:system_time(Unit),
format_date(Unit, Offset, FormatString, TimeEpoch).
timezone_to_second(TimeZone) ->
emqx_calendar:offset_second(TimeZone).
format_date(TimeUnit, Offset, FormatString, TimeEpoch) ->
Unit = time_unit(TimeUnit),
emqx_rule_utils:bin(
emqx_rule_date:date(time_unit(TimeUnit),
emqx_rule_utils:str(Offset),
emqx_rule_utils:str(FormatString),
TimeEpoch)).
lists:concat(
emqx_calendar:format(TimeEpoch, Unit, Offset, FormatString))).
%% date string has timezone information, calculate the offset.
date_to_unix_ts(TimeUnit, FormatString, InputString) ->
Unit = time_unit(TimeUnit),
emqx_calendar:parse(InputString, Unit, FormatString).
%% date string has no timezone information, force add the offset.
date_to_unix_ts(TimeUnit, Offset, FormatString, InputString) ->
emqx_rule_date:parse_date(time_unit(TimeUnit),
emqx_rule_utils:str(Offset),
emqx_rule_utils:str(FormatString),
emqx_rule_utils:str(InputString)).
Unit = time_unit(TimeUnit),
OffsetSecond = emqx_calendar:offset_second(Offset),
OffsetDelta = erlang:convert_time_unit(OffsetSecond, second, Unit),
date_to_unix_ts(Unit, FormatString, InputString) - OffsetDelta.
mongo_date() ->
erlang:timestamp().

View File

@ -713,24 +713,29 @@ t_format_date_funcs(_) ->
?PROPTEST(prop_format_date_fun).
prop_format_date_fun() ->
Args1 = [<<"second">>, <<"+07:00">>, <<"%m--%d--%y---%H:%M:%S%Z">>],
Args1 = [<<"second">>, <<"+07:00">>, <<"%m--%d--%Y---%H:%M:%S%z">>],
?FORALL(S, erlang:system_time(second),
S == apply_func(date_to_unix_ts,
Args1 ++ [apply_func(format_date,
Args1 ++ [S])])),
Args2 = [<<"millisecond">>, <<"+04:00">>, <<"--%m--%d--%y---%H:%M:%S%Z">>],
Args2 = [<<"millisecond">>, <<"+04:00">>, <<"--%m--%d--%Y---%H:%M:%S:%3N%z">>],
Args2DTUS = [<<"millisecond">>, <<"--%m--%d--%Y---%H:%M:%S:%3N%z">>],
?FORALL(S, erlang:system_time(millisecond),
S == apply_func(date_to_unix_ts,
Args2 ++ [apply_func(format_date,
Args2DTUS ++ [apply_func(format_date,
Args2 ++ [S])])),
Args = [<<"second">>, <<"+08:00">>, <<"%y-%m-%d-%H:%M:%S%Z">>],
Args = [<<"second">>, <<"+08:00">>, <<"%Y-%m-%d-%H:%M:%S%z">>],
ArgsDTUS = [<<"second">>, <<"%Y-%m-%d-%H:%M:%S%z">>],
?FORALL(S, erlang:system_time(second),
S == apply_func(date_to_unix_ts,
Args ++ [apply_func(format_date,
Args ++ [S])])).
%%------------------------------------------------------------------------------
%% Utility functions
%%------------------------------------------------------------------------------
ArgsDTUS ++ [apply_func(format_date,
Args ++ [S])])),
% no offset in format string. force add offset
Second = erlang:system_time(second),
Args3 = [<<"second">>, <<"+04:00">>, <<"--%m--%d--%Y---%H:%M:%S">>, Second],
Formatters3 = apply_func(format_date, Args3),
Args3DTUS = [<<"second">>, <<"+04:00">>, <<"--%m--%d--%Y---%H:%M:%S">>, Formatters3],
Second == apply_func(date_to_unix_ts, Args3DTUS).
apply_func(Name, Args) when is_atom(Name) ->
erlang:apply(emqx_rule_funcs, Name, Args);

View File

@ -1,4 +1,4 @@
ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-alpine3.15.1
ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-alpine3.15.1
ARG RUN_FROM=alpine:3.15.1
FROM ${BUILD_FROM} AS builder

View File

@ -463,7 +463,33 @@ log.file = emqx.log
## Log formatter
## Value: text | json
#log.formatter = text
log.formatter = text
## Format of the text logger.
##
## Value: rfc3339 | FORMAT
## Where FORMAT is the format string of the timestamp. Supported specifiers:
## %Y: year
## %m: month
## %d: day
## %H: hour
## %M: minute
## %S: second
## %N: nanoseconds (000000000 - 999999999)
## %6N: microseconds (00000 - 999999)
## %3N: milliseconds (000 - 999)
## %z: timezone, [+-]HHMM
## %:z: timezone, [+-]HH:MM
## %::z: timezone, [+-]HH:MM:SS
##
## For example:
## log.formatter.text.date.format = %Y-%m-%dT%H:%M:%S.%6N %:z
##
## Before 4.2, the default date format was:
## log.formatter.text.date.format = %Y-%m-%d %H:%M:%S.%3N
##
## Default: rfc3339
# log.formatter.text.date.format = rfc3339
## Log to single line
## Value: Boolean

View File

@ -563,6 +563,12 @@ end}.
{datatype, {enum, [text, json]}}
]}.
%% @doc format logs as text, date format part
{mapping, "log.formatter.text.date.format", "kernel.logger", [
{default, "rfc3339"},
{datatype, string}
]}.
%% @doc format logs in a single line.
{mapping, "log.single_line", "kernel.logger", [
{default, true},
@ -664,9 +670,38 @@ end}.
single_line => SingleLine
}};
text ->
DateFormat =
case cuttlefish:conf_get("log.formatter.text.date.format", Conf, "rfc3339") of
"rfc3339" ->
rfc3339;
DateStr ->
DateStrTrans =
fun
DST(<<>>, Formatter) -> lists:reverse(Formatter);
DST(<<"%Y", Tail/binary>>, Formatter) -> DST(Tail, [year | Formatter]);
DST(<<"%m", Tail/binary>>, Formatter) -> DST(Tail, [month | Formatter]);
DST(<<"%d", Tail/binary>>, Formatter) -> DST(Tail, [day | Formatter]);
DST(<<"%H", Tail/binary>>, Formatter) -> DST(Tail, [hour | Formatter]);
DST(<<"%M", Tail/binary>>, Formatter) -> DST(Tail, [minute | Formatter]);
DST(<<"%S", Tail/binary>>, Formatter) -> DST(Tail, [second | Formatter]);
DST(<<"%N", Tail/binary>>, Formatter) -> DST(Tail, [nanosecond | Formatter]);
DST(<<"%3N", Tail/binary>>, Formatter) -> DST(Tail, [millisecond | Formatter]);
DST(<<"%6N", Tail/binary>>, Formatter) -> DST(Tail, [microsecond | Formatter]);
DST(<<"%z", Tail/binary>>, Formatter) -> DST(Tail, [timezone | Formatter]);
DST(<<"%:z", Tail/binary>>, Formatter) -> DST(Tail, [timezone1 | Formatter]);
DST(<<"%::z", Tail/binary>>, Formatter) -> DST(Tail, [timezone2 | Formatter]);
DST(<<Char:8, Tail/binary>>, [Str | Formatter]) when is_list(Str) ->
DST(Tail, [lists:append(Str, [Char]) | Formatter]);
DST(<<Char:8, Tail/binary>>, Formatter) ->
DST(Tail, [[Char] | Formatter])
end,
DateStrTrans(list_to_binary(DateStr), [])
end,
{emqx_logger_textfmt,
#{template =>
[time," [",level,"] ",
#{
date_format => DateFormat,
template =>
[" [",level,"] ",
{clientid,
[{peername,
[clientid,"@",peername," "],

View File

@ -8,7 +8,7 @@
## i.e. will not work if docker command has to be executed with sudo
## example:
## ./scripts/buildx.sh --profile emqx --pkgtype zip --builder ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-debian10 --arch arm64
## ./scripts/buildx.sh --profile emqx --pkgtype zip --builder ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-debian10 --arch arm64
set -euo pipefail
@ -20,7 +20,7 @@ help() {
echo "--arch amd64|arm64: Target arch to build the EMQ X package for"
echo "--src_dir <SRC_DIR>: EMQ X source ode in this dir, default to PWD"
echo "--builder <BUILDER>: Builder image to pull"
echo " E.g. ghcr.io/emqx/emqx-builder/4.4-8:24.1.5-3-debian10"
echo " E.g. ghcr.io/emqx/emqx-builder/4.4-12:24.1.5-3-debian10"
}
while [ "$#" -gt 0 ]; do

View File

@ -2,7 +2,9 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.3",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
@ -17,7 +19,9 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup}]},
{"4.4.2",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
@ -36,7 +40,9 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup}]},
{"4.4.1",
[{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
[{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{update,emqx_os_mon,{advanced,[]}},
@ -64,7 +70,9 @@
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{add_module,emqx_relup}]},
{"4.4.0",
[{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
[{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{update,emqx_os_mon,{advanced,[]}},
@ -97,7 +105,9 @@
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.4.3",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
@ -111,7 +121,9 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup}]},
{"4.4.2",
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
[{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
@ -129,7 +141,9 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup}]},
{"4.4.1",
[{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
[{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{update,emqx_os_mon,{advanced,[]}},
@ -156,7 +170,9 @@
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{delete_module,emqx_relup}]},
{"4.4.0",
[{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
[{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{update,emqx_os_mon,{advanced,[]}},

436
src/emqx_calendar.erl Normal file
View File

@ -0,0 +1,436 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_calendar).
-define(SECONDS_PER_MINUTE, 60).
-define(SECONDS_PER_HOUR, 3600).
-define(SECONDS_PER_DAY, 86400).
-define(DAYS_PER_YEAR, 365).
-define(DAYS_PER_LEAP_YEAR, 366).
-define(DAYS_FROM_0_TO_1970, 719528).
-define(SECONDS_FROM_0_TO_1970, ?DAYS_FROM_0_TO_1970 * ?SECONDS_PER_DAY).
-export([ formatter/1
, format/3
, format/4
, parse/3
, offset_second/1
]).
-define(DATE_PART,
[ year
, month
, day
, hour
, minute
, second
, nanosecond
, millisecond
, microsecond
]).
-define(DATE_ZONE_NAME,
[ timezone
, timezone1
, timezone2
]).
formatter(FormatterStr) when is_list(FormatterStr) ->
formatter(list_to_binary(FormatterStr));
formatter(FormatterBin) when is_binary(FormatterBin) ->
do_formatter(FormatterBin, []).
offset_second(Offset) ->
offset_second_(Offset).
format(Time, Unit, Formatter) ->
format(Time, Unit, undefined, Formatter).
format(Time, Unit, Offset, FormatterBin) when is_binary(FormatterBin) ->
format(Time, Unit, Offset, formatter(FormatterBin));
format(Time, Unit, Offset, Formatter) ->
do_format(Time, time_unit(Unit), offset_second(Offset), Formatter).
parse(DateStr, Unit, FormatterBin) when is_binary(FormatterBin) ->
parse(DateStr, Unit, formatter(FormatterBin));
parse(DateStr, Unit, Formatter) ->
do_parse(DateStr, Unit, Formatter).
%% -------------------------------------------------------------------------------------------------
%% internal
time_unit(second) -> second;
time_unit(millisecond) -> millisecond;
time_unit(microsecond) -> microsecond;
time_unit(nanosecond) -> nanosecond;
time_unit("second") -> second;
time_unit("millisecond") -> millisecond;
time_unit("microsecond") -> microsecond;
time_unit("nanosecond") -> nanosecond;
time_unit(<<"second">>) -> second;
time_unit(<<"millisecond">>) -> millisecond;
time_unit(<<"microsecond">>) -> microsecond;
time_unit(<<"nanosecond">>) -> nanosecond.
%% -------------------------------------------------------------------------------------------------
%% internal: format part
do_formatter(<<>>, Formatter) -> lists:reverse(Formatter);
do_formatter(<<"%Y", Tail/binary>>, Formatter) -> do_formatter(Tail, [year | Formatter]);
do_formatter(<<"%m", Tail/binary>>, Formatter) -> do_formatter(Tail, [month | Formatter]);
do_formatter(<<"%d", Tail/binary>>, Formatter) -> do_formatter(Tail, [day | Formatter]);
do_formatter(<<"%H", Tail/binary>>, Formatter) -> do_formatter(Tail, [hour | Formatter]);
do_formatter(<<"%M", Tail/binary>>, Formatter) -> do_formatter(Tail, [minute | Formatter]);
do_formatter(<<"%S", Tail/binary>>, Formatter) -> do_formatter(Tail, [second | Formatter]);
do_formatter(<<"%N", Tail/binary>>, Formatter) -> do_formatter(Tail, [nanosecond | Formatter]);
do_formatter(<<"%3N", Tail/binary>>, Formatter) -> do_formatter(Tail, [millisecond | Formatter]);
do_formatter(<<"%6N", Tail/binary>>, Formatter) -> do_formatter(Tail, [microsecond | Formatter]);
do_formatter(<<"%z", Tail/binary>>, Formatter) -> do_formatter(Tail, [timezone | Formatter]);
do_formatter(<<"%:z", Tail/binary>>, Formatter) -> do_formatter(Tail, [timezone1 | Formatter]);
do_formatter(<<"%::z", Tail/binary>>, Formatter) -> do_formatter(Tail, [timezone2 | Formatter]);
do_formatter(<<Char:8, Tail/binary>>, [Str | Formatter]) when is_list(Str) ->
do_formatter(Tail, [lists:append(Str, [Char]) | Formatter]);
do_formatter(<<Char:8, Tail/binary>>, Formatter) -> do_formatter(Tail, [[Char] | Formatter]).
offset_second_(OffsetSecond) when is_integer(OffsetSecond) -> OffsetSecond;
offset_second_(undefined) -> 0;
offset_second_("local") -> offset_second_(local);
offset_second_(<<"local">>) -> offset_second_(local);
offset_second_(local) ->
UniversalTime = calendar:system_time_to_universal_time(erlang:system_time(second), second),
LocalTime = erlang:universaltime_to_localtime(UniversalTime),
LocalSecs = calendar:datetime_to_gregorian_seconds(LocalTime),
UniversalSecs = calendar:datetime_to_gregorian_seconds(UniversalTime),
LocalSecs - UniversalSecs;
offset_second_(Offset) when is_binary(Offset) ->
offset_second_(erlang:binary_to_list(Offset));
offset_second_("Z") -> 0;
offset_second_("z") -> 0;
offset_second_(Offset) when is_list(Offset) ->
Sign = hd(Offset),
((Sign == $+) orelse (Sign == $-))
orelse error({bad_time_offset, Offset}),
Signs = #{$+ => 1, $- => -1},
PosNeg = maps:get(Sign, Signs),
[Sign | HM] = Offset,
{HourStr, MinuteStr, SecondStr} =
case string:tokens(HM, ":") of
[H, M] ->
{H, M, "0"};
[H, M, S] ->
{H, M, S};
[HHMM] when erlang:length(HHMM) == 4 ->
{string:sub_string(HHMM, 1,2), string:sub_string(HHMM, 3,4), "0"};
_ ->
error({bad_time_offset, Offset})
end,
Hour = erlang:list_to_integer(HourStr),
Minute = erlang:list_to_integer(MinuteStr),
Second = erlang:list_to_integer(SecondStr),
(Hour =< 23) orelse error({bad_time_offset_hour, Hour}),
(Minute =< 59) orelse error({bad_time_offset_minute, Minute}),
(Second =< 59) orelse error({bad_time_offset_second, Second}),
PosNeg * (Hour * 3600 + Minute * 60 + Second).
do_format(Time, Unit, Offset, Formatter) ->
Adjustment = erlang:convert_time_unit(Offset, second, Unit),
AdjustedTime = Time + Adjustment,
Factor = factor(Unit),
Secs = AdjustedTime div Factor,
DateTime = system_time_to_datetime(Secs),
{{Year, Month, Day}, {Hour, Min, Sec}} = DateTime,
Date = #{
year => padding(Year, 4),
month => padding(Month, 2),
day => padding(Day, 2),
hour => padding(Hour, 2),
minute => padding(Min, 2),
second => padding(Sec, 2),
millisecond => trans_x_second(Unit, millisecond, Time),
microsecond => trans_x_second(Unit, microsecond, Time),
nanosecond => trans_x_second(Unit, nanosecond, Time)
},
Timezones = formatter_timezones(Offset, Formatter, #{}),
DateWithZone = maps:merge(Date, Timezones),
[maps:get(Key, DateWithZone, Key) || Key <- Formatter].
formatter_timezones(_Offset, [], Zones) -> Zones;
formatter_timezones(Offset, [Timezone | Formatter], Zones) ->
case lists:member(Timezone, [timezone, timezone1, timezone2]) of
true ->
NZones = Zones#{Timezone => offset_to_timezone(Offset, Timezone)},
formatter_timezones(Offset, Formatter, NZones);
false ->
formatter_timezones(Offset, Formatter, Zones)
end.
offset_to_timezone(Offset, Timezone) ->
Sign =
case Offset >= 0 of
true ->
$+;
false ->
$-
end,
{H, M, S} = seconds_to_time(abs(Offset)),
%% TODO: Support zone define %:::z
%% Numeric time zone with ":" to necessary precision (e.g., -04, +05:30).
case Timezone of
timezone ->
%% +0800
io_lib:format("~c~2.10.0B~2.10.0B", [Sign, H, M]);
timezone1 ->
%% +08:00
io_lib:format("~c~2.10.0B:~2.10.0B", [Sign, H, M]);
timezone2 ->
%% +08:00:00
io_lib:format("~c~2.10.0B:~2.10.0B:~2.10.0B", [Sign, H, M, S])
end.
factor(second) -> 1;
factor(millisecond) -> 1000;
factor(microsecond) -> 1000000;
factor(nanosecond) -> 1000000000.
system_time_to_datetime(Seconds) ->
gregorian_seconds_to_datetime(Seconds + ?SECONDS_FROM_0_TO_1970).
gregorian_seconds_to_datetime(Secs) when Secs >= 0 ->
Days = Secs div ?SECONDS_PER_DAY,
Rest = Secs rem ?SECONDS_PER_DAY,
{gregorian_days_to_date(Days), seconds_to_time(Rest)}.
seconds_to_time(Secs) when Secs >= 0, Secs < ?SECONDS_PER_DAY ->
Secs0 = Secs rem ?SECONDS_PER_DAY,
Hour = Secs0 div ?SECONDS_PER_HOUR,
Secs1 = Secs0 rem ?SECONDS_PER_HOUR,
Minute = Secs1 div ?SECONDS_PER_MINUTE,
Second = Secs1 rem ?SECONDS_PER_MINUTE,
{Hour, Minute, Second}.
gregorian_days_to_date(Days) ->
{Year, DayOfYear} = day_to_year(Days),
{Month, DayOfMonth} = year_day_to_date(Year, DayOfYear),
{Year, Month, DayOfMonth}.
day_to_year(DayOfEpoch) when DayOfEpoch >= 0 ->
YMax = DayOfEpoch div ?DAYS_PER_YEAR,
YMin = DayOfEpoch div ?DAYS_PER_LEAP_YEAR,
{Y1, D1} = dty(YMin, YMax, DayOfEpoch, dy(YMin), dy(YMax)),
{Y1, DayOfEpoch - D1}.
year_day_to_date(Year, DayOfYear) ->
ExtraDay =
case is_leap_year(Year) of
true ->
1;
false ->
0
end,
{Month, Day} = year_day_to_date2(ExtraDay, DayOfYear),
{Month, Day + 1}.
dty(Min, Max, _D1, DMin, _DMax) when Min == Max ->
{Min, DMin};
dty(Min, Max, D1, DMin, DMax) ->
Diff = Max - Min,
Mid = Min + Diff * (D1 - DMin) div (DMax - DMin),
MidLength =
case is_leap_year(Mid) of
true ->
?DAYS_PER_LEAP_YEAR;
false ->
?DAYS_PER_YEAR
end,
case dy(Mid) of
D2 when D1 < D2 ->
NewMax = Mid - 1,
dty(Min, NewMax, D1, DMin, dy(NewMax));
D2 when D1 - D2 >= MidLength ->
NewMin = Mid + 1,
dty(NewMin, Max, D1, dy(NewMin), DMax);
D2 ->
{Mid, D2}
end.
dy(Y) when Y =< 0 ->
0;
dy(Y) ->
X = Y - 1,
X div 4 - X div 100 + X div 400 + X * ?DAYS_PER_YEAR + ?DAYS_PER_LEAP_YEAR.
is_leap_year(Y) when is_integer(Y), Y >= 0 ->
is_leap_year1(Y).
is_leap_year1(Year) when Year rem 4 =:= 0, Year rem 100 > 0 ->
true;
is_leap_year1(Year) when Year rem 400 =:= 0 ->
true;
is_leap_year1(_) ->
false.
year_day_to_date2(_, Day) when Day < 31 ->
{1, Day};
year_day_to_date2(E, Day) when 31 =< Day, Day < 59 + E ->
{2, Day - 31};
year_day_to_date2(E, Day) when 59 + E =< Day, Day < 90 + E ->
{3, Day - (59 + E)};
year_day_to_date2(E, Day) when 90 + E =< Day, Day < 120 + E ->
{4, Day - (90 + E)};
year_day_to_date2(E, Day) when 120 + E =< Day, Day < 151 + E ->
{5, Day - (120 + E)};
year_day_to_date2(E, Day) when 151 + E =< Day, Day < 181 + E ->
{6, Day - (151 + E)};
year_day_to_date2(E, Day) when 181 + E =< Day, Day < 212 + E ->
{7, Day - (181 + E)};
year_day_to_date2(E, Day) when 212 + E =< Day, Day < 243 + E ->
{8, Day - (212 + E)};
year_day_to_date2(E, Day) when 243 + E =< Day, Day < 273 + E ->
{9, Day - (243 + E)};
year_day_to_date2(E, Day) when 273 + E =< Day, Day < 304 + E ->
{10, Day - (273 + E)};
year_day_to_date2(E, Day) when 304 + E =< Day, Day < 334 + E ->
{11, Day - (304 + E)};
year_day_to_date2(E, Day) when 334 + E =< Day ->
{12, Day - (334 + E)}.
trans_x_second(FromUnit, ToUnit, Time) ->
XSecond = do_trans_x_second(FromUnit, ToUnit, Time),
Len =
case ToUnit of
millisecond -> 3;
microsecond -> 6;
nanosecond -> 9
end,
padding(XSecond, Len).
do_trans_x_second(second, second, Time) -> Time div 60;
do_trans_x_second(second, _, _Time) -> 0;
do_trans_x_second(millisecond, millisecond, Time) -> Time rem 1000;
do_trans_x_second(millisecond, microsecond, Time) -> (Time rem 1000) * 1000;
do_trans_x_second(millisecond, nanosecond, Time) -> (Time rem 1000) * 1000_000;
do_trans_x_second(microsecond, millisecond, Time) -> Time div 1000 rem 1000;
do_trans_x_second(microsecond, microsecond, Time) -> Time rem 1000000;
do_trans_x_second(microsecond, nanosecond, Time) -> (Time rem 1000000) * 1000;
do_trans_x_second(nanosecond, millisecond, Time) -> Time div 1000000 rem 1000;
do_trans_x_second(nanosecond, microsecond, Time) -> Time div 1000 rem 1000000;
do_trans_x_second(nanosecond, nanosecond, Time) -> Time rem 1000000000.
padding(Data, Len) when is_integer(Data) ->
padding(integer_to_list(Data), Len);
padding(Data, Len) when Len > 0 andalso erlang:length(Data) < Len ->
[$0 | padding(Data, Len - 1)];
padding(Data, _Len) ->
Data.
%% -------------------------------------------------------------------------------------------------
%% internal
%% parse part
do_parse(DateStr, Unit, Formatter) ->
DateInfo = do_parse_date_str(DateStr, Formatter, #{}),
{Precise, PrecisionUnit} = precision(DateInfo),
Counter =
fun
(year, V, Res) ->
Res + dy(V) * ?SECONDS_PER_DAY * Precise - (?SECONDS_FROM_0_TO_1970 * Precise);
(month, V, Res) ->
Res + dm(V) * ?SECONDS_PER_DAY * Precise;
(day, V, Res) ->
Res + (V * ?SECONDS_PER_DAY * Precise);
(hour, V, Res) ->
Res + (V * ?SECONDS_PER_HOUR * Precise);
(minute, V, Res) ->
Res + (V * ?SECONDS_PER_MINUTE * Precise);
(second, V, Res) ->
Res + V * Precise;
(millisecond, V, Res) ->
case PrecisionUnit of
millisecond ->
Res + V;
microsecond ->
Res + (V * 1000);
nanosecond ->
Res + (V * 1000000)
end;
(microsecond, V, Res) ->
case PrecisionUnit of
microsecond ->
Res + V;
nanosecond ->
Res + (V * 1000)
end;
(nanosecond, V, Res) ->
Res + V;
(parsed_offset, V, Res) ->
Res - V
end,
Count = maps:fold(Counter, 0, DateInfo) - (?SECONDS_PER_DAY * Precise),
erlang:convert_time_unit(Count, PrecisionUnit, Unit).
precision(#{nanosecond := _}) -> {1000_000_000, nanosecond};
precision(#{microsecond := _}) -> {1000_000, microsecond};
precision(#{millisecond := _}) -> {1000, millisecond};
precision(#{second := _}) -> {1, second};
precision(_) -> {1, second}.
do_parse_date_str(<<>>, _, Result) -> Result;
do_parse_date_str(_, [], Result) -> Result;
do_parse_date_str(Date, [Key | Formatter], Result) ->
Size = date_size(Key),
<<DatePart:Size/binary-unit:8, Tail/binary>> = Date,
case lists:member(Key, ?DATE_PART) of
true ->
do_parse_date_str(Tail, Formatter, Result#{Key => erlang:binary_to_integer(DatePart)});
false ->
case lists:member(Key, ?DATE_ZONE_NAME) of
true ->
do_parse_date_str(Tail, Formatter, Result#{parsed_offset => offset_second(DatePart)});
false ->
do_parse_date_str(Tail, Formatter, Result)
end
end.
date_size(Str) when is_list(Str) -> erlang:length(Str);
date_size(year) -> 4;
date_size(month) -> 2;
date_size(day) -> 2;
date_size(hour) -> 2;
date_size(minute) -> 2;
date_size(second) -> 2;
date_size(millisecond) -> 3;
date_size(microsecond) -> 6;
date_size(nanosecond) -> 9;
date_size(timezone) -> 5;
date_size(timezone1) -> 6;
date_size(timezone2) -> 9.
dm(1) -> 0;
dm(2) -> 31;
dm(3) -> 59;
dm(4) -> 90;
dm(5) -> 120;
dm(6) -> 151;
dm(7) -> 181;
dm(8) -> 212;
dm(9) -> 243;
dm(10) -> 273;
dm(11) -> 304;
dm(12) -> 334.

View File

@ -28,11 +28,27 @@
, gl % not interesting
]).
check_config(X) -> logger_formatter:check_config(X).
check_config(Config0) ->
Config = maps:without([date_format], Config0),
logger_formatter:check_config(Config).
format(#{msg := Msg0, meta := Meta} = Event, Config) ->
format(#{msg := Msg0, meta := Meta} = Event,
#{date_format := rfc3339, template := Template0} = Config) ->
Msg = maybe_merge(Msg0, Meta),
logger_formatter:format(Event#{msg := Msg}, Config).
Template = [time | Template0],
logger_formatter:format(Event#{msg := Msg}, Config#{template => Template});
format(#{msg := Msg0, meta := Meta} = Event,
#{date_format := DFS} = Config) ->
Msg = maybe_merge(Msg0, Meta),
Time =
case maps:get(time, Event, undefined) of
undefined ->
erlang:system_time(microsecond);
T ->
T
end,
Date = emqx_calendar:format(Time, microsecond, local, DFS),
[Date | logger_formatter:format(Event#{msg := Msg}, Config)].
maybe_merge({report, Report}, Meta) when is_map(Report) ->
{report, maps:merge(Report, filter(Meta))};

View File

@ -486,8 +486,8 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
case emqx_inflight:is_full(Inflight) of
true ->
Session1 = case maybe_nack(Msg) of
true -> Session;
false -> enqueue(ClientInfo, Msg, Session)
drop -> Session;
store -> enqueue(ClientInfo, Msg, Session)
end,
{ok, Session1};
false ->
@ -695,7 +695,7 @@ redispatch_shared_messages(#session{inflight = Inflight}) ->
%% Note that dispatch is called with self() in failed subs
%% This is done to avoid dispatching back to caller
Delivery = #delivery{sender = self(), message = Msg},
emqx_shared_sub:dispatch(Group, Topic, Delivery, [self()]);
emqx_shared_sub:dispatch_to_non_self(Group, Topic, Delivery);
_ ->
false
end;

View File

@ -39,8 +39,8 @@
]).
-export([ dispatch/3
, dispatch/4
]).
, dispatch_to_non_self/3
]).
-export([ maybe_ack/1
, maybe_nack_dropped/1
@ -123,22 +123,28 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
record(Group, Topic, SubPid) ->
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
-spec(dispatch_to_non_self(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery())
-> emqx_types:deliver_result()).
dispatch_to_non_self(Group, Topic, Delivery) ->
Strategy = strategy(Group),
dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{self() => sender}).
-spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery())
-> emqx_types:deliver_result()).
dispatch(Group, Topic, Delivery) ->
dispatch(Group, Topic, Delivery, _FailedSubs = []).
Strategy = strategy(Group),
dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}).
dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
#message{from = ClientId, topic = SourceTopic} = Msg,
case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of
false ->
{error, no_subscribers};
case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of
false -> {error, no_subscribers};
{Type, SubPid} ->
case do_dispatch(SubPid, Group, Topic, Msg, Type) of
ok -> {ok, 1};
{error, _Reason} ->
{error, Reason} ->
%% Failed to dispatch to this sub, try next.
dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])
dispatch(Strategy, Group, Topic, Delivery, FailedSubs#{SubPid => Reason})
end
end.
@ -165,33 +171,33 @@ do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
%% For QoS 0 message, send it as regular dispatch
SubPid ! {deliver, Topic, Msg},
ok;
do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
%% Retry implies all subscribers nack:ed, send again without ack
SubPid ! {deliver, Topic, Msg},
ok;
do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
do_dispatch(SubPid, Group, Topic, Msg, Type) ->
case ack_enabled() of
true ->
dispatch_with_ack(SubPid, Group, Topic, Msg);
dispatch_with_ack(SubPid, Group, Topic, Msg, Type);
false ->
SubPid ! {deliver, Topic, Msg},
ok
end.
dispatch_with_ack(SubPid, Group, Topic, Msg) ->
dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
%% For QoS 1/2 message, expect an ack
Ref = erlang:monitor(process, SubPid),
Sender = self(),
SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)},
SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)},
Timeout = case Msg#message.qos of
?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
?QOS_2 -> infinity
end,
%% This OpaqueRef is a forward compatibilty workaround. Pre 4.3.15 versions
%% pass Ref from `{Sender, Ref}` ack header back as it is.
OpaqueRef = old_ref(Type, Group, Ref),
try
receive
{Ref, ?ACK} ->
{ReceivedRef, ?ACK} when ReceivedRef =:= Ref; ReceivedRef =:= OpaqueRef ->
ok;
{Ref, ?NACK(Reason)} ->
{ReceivedRef, ?NACK(Reason)} when ReceivedRef =:= Ref; ReceivedRef =:= OpaqueRef ->
%% the receive session may nack this message when its queue is full
{error, Reason};
{'DOWN', Ref, process, SubPid, Reason} ->
@ -204,8 +210,11 @@ dispatch_with_ack(SubPid, Group, Topic, Msg) ->
_ = erlang:demonitor(Ref, [flush])
end.
with_group_ack(Msg, Group, Sender, Ref) ->
emqx_message:set_headers(#{shared_dispatch_ack => {Group, Sender, Ref}}, Msg).
with_group_ack(Msg, Group, Type, Sender, Ref) ->
emqx_message:set_headers(#{shared_dispatch_ack => {Sender, old_ref(Type, Group, Ref)}}, Msg).
old_ref(Type, Group, Ref) ->
{Type, Group, Ref}.
-spec(without_group_ack(emqx_types:message()) -> emqx_types:message()).
without_group_ack(Msg) ->
@ -217,19 +226,32 @@ get_group_ack(Msg) ->
-spec(get_group(emqx_types:message()) -> {ok, any()} | error).
get_group(Msg) ->
case get_group_ack(Msg) of
?NO_ACK -> error;
{Group, _Sender, _Ref} -> {ok, Group}
{_Sender, {_Type, Group, _Ref}} -> {ok, Group};
_ -> error
end.
-spec(is_ack_required(emqx_types:message()) -> boolean()).
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
%% @doc Negative ack dropped message due to inflight window or message queue being full.
-spec(maybe_nack_dropped(emqx_types:message()) -> boolean()).
-spec(maybe_nack_dropped(emqx_types:message()) -> store | drop).
maybe_nack_dropped(Msg) ->
case get_group_ack(Msg) of
?NO_ACK -> false;
{_Group, Sender, Ref} -> ok == nack(Sender, Ref, dropped)
%% No ack header is present, put it into mqueue
?NO_ACK -> store;
%% For fresh Ref we send a nack and return true, to note that the inflight is full
{Sender, {fresh, _Group, Ref}} -> nack(Sender, Ref, dropped), drop;
%% For retry Ref we can't reject a message if inflight is full, so we mark it as
%% acknowledged and put it into mqueue
{_Sender, {retry, _Group, _Ref}} -> maybe_ack(Msg), store;
%% This clause is for backward compatibility
Ack ->
{Sender, Ref} = fetch_sender_ref(Ack),
nack(Sender, Ref, dropped),
drop
end.
%% @doc Negative ack message due to connection down.
@ -237,7 +259,7 @@ maybe_nack_dropped(Msg) ->
%% i.e is_ack_required returned true.
-spec(nack_no_connection(emqx_types:message()) -> ok).
nack_no_connection(Msg) ->
{_Group, Sender, Ref} = get_group_ack(Msg),
{Sender, Ref} = fetch_sender_ref(get_group_ack(Msg)),
nack(Sender, Ref, no_connection).
-spec(nack(pid(), reference(), dropped | no_connection) -> ok).
@ -250,11 +272,16 @@ maybe_ack(Msg) ->
case get_group_ack(Msg) of
?NO_ACK ->
Msg;
{_Group, Sender, Ref} ->
Ack ->
{Sender, Ref} = fetch_sender_ref(Ack),
Sender ! {Ref, ?ACK},
without_group_ack(Msg)
end.
fetch_sender_ref({Sender, {_Type, _Group, Ref}}) -> {Sender, Ref};
%% These clauses are for backward compatibility
fetch_sender_ref({Sender, Ref}) -> {Sender, Ref}.
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
case is_active_sub(Sub0, FailedSubs) of
@ -264,23 +291,37 @@ pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
{fresh, Sub0};
false ->
%% randomly pick one for the first message
{Type, Sub} = do_pick(random, ClientId, SourceTopic, Group, Topic, [Sub0 | FailedSubs]),
%% stick to whatever pick result
erlang:put({shared_sub_sticky, Group, Topic}, Sub),
{Type, Sub}
FailedSubs1 = maps_put_new(FailedSubs, Sub0, inactive),
case do_pick(random, ClientId, SourceTopic, Group, Topic, FailedSubs1) of
false -> false;
{Type, Sub} ->
%% stick to whatever pick result
erlang:put({shared_sub_sticky, Group, Topic}, Sub),
{Type, Sub}
end
end;
pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs).
do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
All = subscribers(Group, Topic),
case All -- FailedSubs of
case lists:filter(fun(Sub) -> not maps:is_key(Sub, FailedSubs) end, All) of
[] when All =:= [] ->
%% Genuinely no subscriber
false;
[] ->
%% All offline? pick one anyway
{retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)};
%% We try redispatch to subs who dropped the message because inflight was full.
Found = maps_find_by(FailedSubs, fun({SubPid, FailReason}) ->
FailReason == dropped andalso is_alive_sub(SubPid)
end),
case Found of
{ok, Dropped, dropped} ->
%% Found dropped client
{retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, [Dropped])};
error ->
%% All offline? pick one anyway
{retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)}
end;
Subs ->
%% More than one available
{fresh, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs)}
@ -414,7 +455,7 @@ update_stats(State) ->
%% Return 'true' if the subscriber process is alive AND not in the failed list
is_active_sub(Pid, FailedSubs) ->
is_alive_sub(Pid) andalso not lists:member(Pid, FailedSubs).
not maps:is_key(Pid, FailedSubs) andalso is_alive_sub(Pid).
%% erlang:is_process_alive/1 does not work with remote pid.
is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->
@ -427,3 +468,22 @@ delete_route_if_needed({Group, Topic}) ->
true -> ok;
false -> ok = emqx_router:do_delete_route(Topic, {Group, node()})
end.
maps_find_by(Map, Predicate) when is_map(Map) ->
maps_find_by(maps:iterator(Map), Predicate);
maps_find_by(Iterator, Predicate) ->
case maps:next(Iterator) of
none -> error;
{Key, Value, NewIterator} ->
case Predicate(Key, Value) of
true -> {ok, Key, Value};
false -> maps_find_by(NewIterator, Predicate)
end
end.
maps_put_new(Map, Key, Value) ->
case Map of
#{Key := _} -> Map;
_ -> Map#{Key => Value}
end.

View File

@ -47,20 +47,20 @@ t_is_ack_required(_) ->
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
t_maybe_nack_dropped(_) ->
?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)),
?assertEqual(store, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
?assertEqual(drop, emqx_shared_sub:maybe_nack_dropped(Msg)),
?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end).
t_nack_no_connection(_) ->
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok
after 100 -> timeout end).
t_maybe_ack(_) ->
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}},
emqx_shared_sub:maybe_ack(Msg)),
?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end).
@ -446,6 +446,53 @@ t_redispatch(_) ->
emqtt:stop(UsedSubPid2),
ok.
t_dispatch_when_inflights_are_full(_) ->
ok = ensure_config(round_robin, true),
Topic = <<"foo/bar">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
%% Note that max_inflight is 1
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {max_inflight, 1}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {max_inflight, 1}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar">>, 2}),
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar">>, 2}),
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
ct:sleep(100),
sys:suspend(ConnPid1),
sys:suspend(ConnPid2),
%% Fill in the inflight for first client
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
%% Fill in the inflight for second client
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
%% Now kill any client
erlang:exit(ConnPid1, normal),
ct:sleep(100),
%% And try to send the message
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
%% And see that it gets dispatched to the client which is alive, even if it's inflight is full
sys:resume(ConnPid2),
ct:sleep(100),
?assertMatch({true, ConnPid2}, last_message(<<"hello3">>, [ConnPid1, ConnPid2])),
?assertMatch({true, ConnPid2}, last_message(<<"hello4">>, [ConnPid1, ConnPid2])),
emqtt:stop(ConnPid2),
ok.
%%--------------------------------------------------------------------
%% help functions
%%--------------------------------------------------------------------