Merge remote-tracking branch 'upstream/master' into 0817-e5.2.0-code-freeze

This commit is contained in:
Ivan Dyachkov 2023-08-17 08:16:04 +02:00
commit 7469222a17
101 changed files with 1538 additions and 738 deletions

View File

@ -63,7 +63,8 @@ jobs:
./actionlint -color \
-shellcheck= \
-ignore 'label ".+" is unknown' \
-ignore 'value "emqx-enterprise" in "exclude"'
-ignore 'value "emqx-enterprise" in "exclude"' \
-ignore 'value "emqx-enterprise-elixir" in "exclude"'
- name: Check line-break at EOF
run: |
./scripts/check-nl-at-eof.sh
@ -146,6 +147,17 @@ jobs:
path: ${{ matrix.profile }}.zip
retention-days: 1
run_emqx_app_tests:
needs:
- sanity-checks
- compile
uses: ./.github/workflows/run_emqx_app_tests.yaml
with:
runner: ${{ needs.sanity-checks.outputs.runner }}
builder: ${{ needs.sanity-checks.outputs.builder }}
before_ref: ${{ github.event_name == 'pull_request' && github.event.pull_request.base.sha || github.event.before }}
after_ref: ${{ github.sha }}
run_test_cases:
needs:
- sanity-checks

View File

@ -158,6 +158,17 @@ jobs:
path: ${{ matrix.profile }}.zip
retention-days: 1
run_emqx_app_tests:
needs:
- prepare
- compile
uses: ./.github/workflows/run_emqx_app_tests.yaml
with:
runner: ${{ needs.prepare.outputs.runner }}
builder: ${{ needs.prepare.outputs.builder }}
before_ref: ${{ github.event.before }}
after_ref: ${{ github.sha }}
run_test_cases:
if: needs.prepare.outputs.release != 'true'
needs:

View File

@ -88,6 +88,11 @@ jobs:
registry:
- 'docker.io'
- 'public.ecr.aws'
exclude:
- profile: emqx-enterprise
registry: 'public.ecr.aws'
- profile: emqx-enterprise-elixir
registry: 'public.ecr.aws'
steps:
- uses: actions/checkout@v3

View File

@ -13,8 +13,6 @@ jobs:
linux:
if: github.repository_owner == 'emqx'
runs-on: aws-${{ matrix.arch }}
# always run in builder container because the host might have the wrong OTP version etc.
# otherwise buildx.sh does not run docker if arch and os matches the target arch and os.
container:
image: "ghcr.io/emqx/emqx-builder/${{ matrix.builder }}:${{ matrix.elixir }}-${{ matrix.otp }}-${{ matrix.os }}"
@ -30,6 +28,7 @@ jobs:
arch:
- amd64
os:
- debian10
- ubuntu22.04
- amzn2023
builder:
@ -48,25 +47,32 @@ jobs:
ref: ${{ matrix.profile[1] }}
fetch-depth: 0
- name: build emqx packages
env:
ELIXIR: ${{ matrix.elixir }}
PROFILE: ${{ matrix.profile[0] }}
ARCH: ${{ matrix.arch }}
- name: fix workdir
run: |
set -eu
git config --global --add safe.directory "$GITHUB_WORKSPACE"
PKGTYPES="tgz pkg"
IS_ELIXIR="no"
for PKGTYPE in ${PKGTYPES};
do
./scripts/buildx.sh \
--profile "${PROFILE}" \
--pkgtype "${PKGTYPE}" \
--arch "${ARCH}" \
--elixir "${IS_ELIXIR}" \
--builder "force_host"
done
# Align path for CMake caches
if [ ! "$PWD" = "/emqx" ]; then
ln -s $PWD /emqx
cd /emqx
fi
echo "pwd is $PWD"
- name: build emqx packages
env:
PROFILE: ${{ matrix.profile[0] }}
ACLOCAL_PATH: "/usr/share/aclocal:/usr/local/share/aclocal"
run: |
set -eu
make "${PROFILE}-tgz"
make "${PROFILE}-pkg"
- name: test emqx packages
env:
PROFILE: ${{ matrix.profile[0] }}
run: |
set -eu
./scripts/pkg-tests.sh "${PROFILE}-tgz"
./scripts/pkg-tests.sh "${PROFILE}-pkg"
- uses: actions/upload-artifact@v3
if: success()
with:

View File

@ -59,7 +59,7 @@ jobs:
with:
asset_paths: '["packages/*"]'
- name: update to emqx.io
if: startsWith(github.ref_name, 'v') && (github.event_name == 'release' || inputs.publish_release_artefacts)
if: startsWith(github.ref_name, 'v') && ((github.event_name == 'release' && !github.event.prerelease) || inputs.publish_release_artefacts)
run: |
set -eux
curl -w %{http_code} \
@ -70,6 +70,7 @@ jobs:
-d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ github.ref_name }}\" }" \
${{ secrets.EMQX_IO_RELEASE_API }}
- name: Push to packagecloud.io
if: (github.event_name == 'release' && !github.event.prerelease) || inputs.publish_release_artefacts
env:
PROFILE: ${{ steps.profile.outputs.profile }}
VERSION: ${{ steps.profile.outputs.version }}

View File

@ -0,0 +1,65 @@
name: Check emqx app standalone
# These tests are needed because we provide the `emqx` application as a standalone
# dependency for plugins.
concurrency:
group: test-standalone-${{ github.event_name }}-${{ github.ref }}
cancel-in-progress: true
on:
workflow_call:
inputs:
runner:
required: true
type: string
builder:
required: true
type: string
before_ref:
required: true
type: string
after_ref:
required: true
type: string
env:
IS_CI: "yes"
jobs:
run_emqx_app_tests:
runs-on: ${{ inputs.runner }}
container: ${{ inputs.builder }}
defaults:
run:
shell: bash
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
- name: run
env:
BEFORE_REF: ${{ inputs.before_ref }}
AFTER_REF: ${{ inputs.after_ref }}
run: |
git config --global --add safe.directory "$GITHUB_WORKSPACE"
changed_files="$(git diff --name-only ${BEFORE_REF} ${AFTER_REF} apps/emqx)"
if [ "$changed_files" = '' ]; then
echo "nothing changed in apps/emqx, ignored."
exit 0
fi
make ensure-rebar3
cp rebar3 apps/emqx/
cd apps/emqx
./rebar3 xref
./rebar3 dialyzer
./rebar3 eunit -v
./rebar3 as standalone_test ct --name 'test@127.0.0.1' -v --readable=true
./rebar3 proper -d test/props
- uses: actions/upload-artifact@v3
if: failure()
with:
name: logs-${{ inputs.runner }}
path: apps/emqx/_build/test/logs

View File

@ -43,6 +43,7 @@ jobs:
;;
esac
- uses: emqx/push-helm-action@v1.1
if: github.event_name == 'release' && !github.event.prerelease
with:
charts_dir: "${{ github.workspace }}/deploy/charts/${{ steps.profile.outputs.profile }}"
version: ${{ steps.profile.outputs.version }}

View File

@ -1,7 +1,7 @@
# EMQX
[![GitHub Release](https://img.shields.io/github/release/emqx/emqx?color=brightgreen&label=Release)](https://github.com/emqx/emqx/releases)
[![Build Status](https://github.com/emqx/emqx/actions/workflows/run_test_cases.yaml/badge.svg)](https://github.com/emqx/emqx/actions/workflows/run_test_cases.yaml)
[![Build Status](https://github.com/emqx/emqx/actions/workflows/_push-entrypoint.yaml/badge.svg)](https://github.com/emqx/emqx/actions/workflows/_push-entrypoint.yaml)
[![Coverage Status](https://img.shields.io/coveralls/github/emqx/emqx/master?label=Coverage)](https://coveralls.io/github/emqx/emqx?branch=master)
[![Docker Pulls](https://img.shields.io/docker/pulls/emqx/emqx?label=Docker%20Pulls)](https://hub.docker.com/r/emqx/emqx)
[![Slack](https://img.shields.io/badge/Slack-EMQ-39AE85?logo=slack)](https://slack-invite.emqx.io/)

View File

@ -1,7 +1,7 @@
# Брокер EMQX
[![GitHub Release](https://img.shields.io/github/release/emqx/emqx?color=brightgreen&label=Release)](https://github.com/emqx/emqx/releases)
[![Build Status](https://github.com/emqx/emqx/actions/workflows/run_test_cases.yaml/badge.svg)](https://github.com/emqx/emqx/actions/workflows/run_test_cases.yaml)
[![Build Status](https://github.com/emqx/emqx/actions/workflows/_push-entrypoint.yaml/badge.svg)](https://github.com/emqx/emqx/actions/workflows/_push-entrypoint.yaml)
[![Coverage Status](https://img.shields.io/coveralls/github/emqx/emqx/master?label=Coverage)](https://coveralls.io/github/emqx/emqx?branch=master)
[![Docker Pulls](https://img.shields.io/docker/pulls/emqx/emqx?label=Docker%20Pulls)](https://hub.docker.com/r/emqx/emqx)
[![Slack](https://img.shields.io/badge/Slack-EMQ-39AE85?logo=slack)](https://slack-invite.emqx.io/)

View File

@ -1,7 +1,7 @@
# EMQX
[![GitHub Release](https://img.shields.io/github/release/emqx/emqx?color=brightgreen&label=Release)](https://github.com/emqx/emqx/releases)
[![Build Status](https://github.com/emqx/emqx/actions/workflows/run_test_cases.yaml/badge.svg)](https://github.com/emqx/emqx/actions/workflows/run_test_cases.yaml)
[![Build Status](https://github.com/emqx/emqx/actions/workflows/_push-entrypoint.yaml/badge.svg)](https://github.com/emqx/emqx/actions/workflows/_push-entrypoint.yaml)
[![Coverage Status](https://img.shields.io/coveralls/github/emqx/emqx/master?label=Coverage)](https://coveralls.io/github/emqx/emqx?branch=master)
[![Docker Pulls](https://img.shields.io/docker/pulls/emqx/emqx?label=Docker%20Pulls)](https://hub.docker.com/r/emqx/emqx)
[![Slack](https://img.shields.io/badge/Slack-EMQ-39AE85?logo=slack)](https://slack-invite.emqx.io/)

View File

@ -680,6 +680,7 @@ end).
-define(THROW_SERIALIZE_ERROR(Reason), erlang:throw({?FRAME_SERIALIZE_ERROR, Reason})).
-define(MAX_PAYLOAD_FORMAT_SIZE, 1024).
-define(TRUNCATED_PAYLOAD_SIZE, 100).
-define(MAX_PAYLOAD_FORMAT_LIMIT(Bin), (byte_size(Bin) =< ?MAX_PAYLOAD_FORMAT_SIZE)).
-endif.

View File

View File

@ -30,7 +30,7 @@
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.10"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.14"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
@ -41,6 +41,16 @@
{extra_src_dirs, [{"etc", [recursive]}]}.
{profiles, [
{test, [
{deps, [
{meck, "0.9.2"},
{proper, "1.4.0"},
{bbmustache, "1.10.0"},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.6"}}}
]},
{extra_src_dirs, [{"test", [recursive]},
{"integration_test", [recursive]}]}
]},
{standalone_test, [
{deps, [
{meck, "0.9.2"},
{proper, "1.4.0"},

View File

@ -14,7 +14,6 @@
esockd,
cowboy,
sasl,
os_mon,
lc,
hocon,
emqx_durable_storage

View File

@ -213,7 +213,7 @@ format(Node, #deactivated_alarm{
to_rfc3339(Timestamp) ->
%% rfc3339 accuracy to millisecond
list_to_binary(calendar:system_time_to_rfc3339(Timestamp div 1000, [{unit, millisecond}])).
emqx_utils_calendar:epoch_to_rfc3339(Timestamp div 1000).
%%--------------------------------------------------------------------
%% gen_server callbacks

View File

@ -38,7 +38,8 @@
delete/1,
info/1,
format/1,
parse/1
parse/1,
clear/0
]).
%% gen_server callbacks
@ -171,7 +172,7 @@ maybe_format_host({As, Who}) ->
{As, Who}.
to_rfc3339(Timestamp) ->
list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])).
emqx_utils_calendar:epoch_to_rfc3339(Timestamp, second).
-spec create(emqx_types:banned() | map()) ->
{ok, emqx_types:banned()} | {error, {already_exist, emqx_types:banned()}}.
@ -226,6 +227,10 @@ delete(Who) ->
info(InfoKey) ->
mnesia:table_info(?BANNED_TAB, InfoKey).
clear() ->
_ = mria:clear_table(?BANNED_TAB),
ok.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------

View File

@ -1,137 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_datetime).
-include_lib("typerefl/include/types.hrl").
%% API
-export([
to_epoch_millisecond/1,
to_epoch_second/1,
human_readable_duration_string/1
]).
-export([
epoch_to_rfc3339/1,
epoch_to_rfc3339/2
]).
-reflect_type([
epoch_millisecond/0,
epoch_second/0
]).
-type epoch_second() :: non_neg_integer().
-type epoch_millisecond() :: non_neg_integer().
-typerefl_from_string({epoch_second/0, ?MODULE, to_epoch_second}).
-typerefl_from_string({epoch_millisecond/0, ?MODULE, to_epoch_millisecond}).
to_epoch_second(DateTime) ->
to_epoch(DateTime, second).
to_epoch_millisecond(DateTime) ->
to_epoch(DateTime, millisecond).
to_epoch(DateTime, Unit) ->
try
case string:to_integer(DateTime) of
{Epoch, []} when Epoch >= 0 -> {ok, Epoch};
{_Epoch, []} -> {error, bad_epoch};
_ -> {ok, calendar:rfc3339_to_system_time(DateTime, [{unit, Unit}])}
end
catch
error:_ ->
{error, bad_rfc3339_timestamp}
end.
epoch_to_rfc3339(TimeStamp) ->
epoch_to_rfc3339(TimeStamp, millisecond).
epoch_to_rfc3339(TimeStamp, Unit) when is_integer(TimeStamp) ->
list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, Unit}])).
-spec human_readable_duration_string(integer()) -> string().
human_readable_duration_string(Milliseconds) ->
Seconds = Milliseconds div 1000,
{D, {H, M, S}} = calendar:seconds_to_daystime(Seconds),
L0 = [{D, " days"}, {H, " hours"}, {M, " minutes"}, {S, " seconds"}],
L1 = lists:dropwhile(fun({K, _}) -> K =:= 0 end, L0),
L2 = lists:map(fun({Time, Unit}) -> [integer_to_list(Time), Unit] end, L1),
lists:flatten(lists:join(", ", L2)).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(nowarn_export_all).
-compile(export_all).
roots() -> [bar].
fields(bar) ->
[
{second, ?MODULE:epoch_second()},
{millisecond, ?MODULE:epoch_millisecond()}
].
-define(FORMAT(_Sec_, _Ms_),
lists:flatten(
io_lib:format("bar={second=~w,millisecond=~w}", [_Sec_, _Ms_])
)
).
epoch_ok_test() ->
Args = [
{0, 0, 0, 0},
{1, 1, 1, 1},
{"2022-01-01T08:00:00+08:00", "2022-01-01T08:00:00+08:00", 1640995200, 1640995200000}
],
lists:foreach(
fun({Sec, Ms, EpochSec, EpochMs}) ->
check_ok(?FORMAT(Sec, Ms), EpochSec, EpochMs)
end,
Args
),
ok.
check_ok(Input, Sec, Ms) ->
{ok, Data} = hocon:binary(Input, #{}),
?assertMatch(
#{bar := #{second := Sec, millisecond := Ms}},
hocon_tconf:check_plain(?MODULE, Data, #{atom_key => true}, [bar])
),
ok.
epoch_failed_test() ->
Args = [
{-1, -1},
{"1s", "1s"},
{"2022-13-13T08:00:00+08:00", "2022-13-13T08:00:00+08:00"}
],
lists:foreach(
fun({Sec, Ms}) ->
check_failed(?FORMAT(Sec, Ms))
end,
Args
),
ok.
check_failed(Input) ->
{ok, Data} = hocon:binary(Input, #{}),
?assertException(
throw,
_,
hocon_tconf:check_plain(?MODULE, Data, #{atom_key => true}, [bar])
),
ok.
-endif.

View File

@ -38,15 +38,14 @@
%% gen_server callbacks
-export([
init/1,
handle_continue/2,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-ifdef(TEST).
-export([is_sysmem_check_supported/0]).
-endif.
-export([is_os_check_supported/0]).
-include("emqx.hrl").
@ -56,7 +55,7 @@ start_link() ->
gen_server:start_link({local, ?OS_MON}, ?MODULE, [], []).
update(OS) ->
erlang:send(?MODULE, {monitor_conf_update, OS}).
gen_server:cast(?MODULE, {monitor_conf_update, OS}).
%%--------------------------------------------------------------------
%% API
@ -83,12 +82,17 @@ current_sysmem_percent() ->
%%--------------------------------------------------------------------
init([]) ->
{ok, undefined, {continue, setup}}.
handle_continue(setup, undefined) ->
%% start os_mon temporarily
{ok, _} = application:ensure_all_started(os_mon),
%% memsup is not reliable, ignore
memsup:set_sysmem_high_watermark(1.0),
SysHW = init_os_monitor(),
MemRef = start_mem_check_timer(),
CpuRef = start_cpu_check_timer(),
{ok, #{sysmem_high_watermark => SysHW, mem_time_ref => MemRef, cpu_time_ref => CpuRef}}.
{noreply, #{sysmem_high_watermark => SysHW, mem_time_ref => MemRef, cpu_time_ref => CpuRef}}.
init_os_monitor() ->
init_os_monitor(emqx:get_config([sysmon, os])).
@ -110,6 +114,12 @@ handle_call({set_sysmem_high_watermark, New}, _From, #{sysmem_high_watermark :=
handle_call(Req, _From, State) ->
{reply, {error, {unexpected_call, Req}}, State}.
handle_cast({monitor_conf_update, OS}, State) ->
cancel_outdated_timer(State),
SysHW = init_os_monitor(OS),
MemRef = start_mem_check_timer(),
CpuRef = start_cpu_check_timer(),
{noreply, #{sysmem_high_watermark => SysHW, mem_time_ref => MemRef, cpu_time_ref => CpuRef}};
handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}.
@ -151,12 +161,6 @@ handle_info({timeout, _Timer, cpu_check}, State) ->
end,
Ref = start_cpu_check_timer(),
{noreply, State#{cpu_time_ref => Ref}};
handle_info({monitor_conf_update, OS}, State) ->
cancel_outdated_timer(State),
SysHW = init_os_monitor(OS),
MemRef = start_mem_check_timer(),
CpuRef = start_cpu_check_timer(),
{noreply, #{sysmem_high_watermark => SysHW, mem_time_ref => MemRef, cpu_time_ref => CpuRef}};
handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}.
@ -182,12 +186,12 @@ start_cpu_check_timer() ->
_ -> start_timer(Interval, cpu_check)
end.
is_sysmem_check_supported() ->
is_os_check_supported() ->
{unix, linux} =:= os:type().
start_mem_check_timer() ->
Interval = emqx:get_config([sysmon, os, mem_check_interval]),
case is_integer(Interval) andalso is_sysmem_check_supported() of
case is_integer(Interval) andalso is_os_check_supported() of
true ->
start_timer(Interval, mem_check);
false ->
@ -205,7 +209,7 @@ update_mem_alarm_status(HWM) when HWM > 1.0 orelse HWM < 0.0 ->
<<"Deactivated mem usage alarm due to out of range threshold">>
);
update_mem_alarm_status(HWM) ->
is_sysmem_check_supported() andalso
is_os_check_supported() andalso
do_update_mem_alarm_status(HWM),
ok.

View File

@ -55,6 +55,8 @@
format/2
]).
-export([format_truncated_payload/3]).
-define(TYPE_NAMES,
{'CONNECT', 'CONNACK', 'PUBLISH', 'PUBACK', 'PUBREC', 'PUBREL', 'PUBCOMP', 'SUBSCRIBE',
'SUBACK', 'UNSUBSCRIBE', 'UNSUBACK', 'PINGREQ', 'PINGRESP', 'DISCONNECT', 'AUTH'}
@ -614,21 +616,33 @@ format_password(undefined) -> "";
format_password(<<>>) -> "";
format_password(_Password) -> "******".
format_payload(_, hidden) ->
"Payload=******";
format_payload(Payload, text) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) ->
["Payload=", unicode:characters_to_list(Payload)];
format_payload(Payload, hex) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) ->
["Payload(hex)=", binary:encode_hex(Payload)];
format_payload(_, hidden) ->
"Payload=******";
format_payload(<<Part:100, _/binary>> = Payload, _) ->
format_payload(<<Part:?TRUNCATED_PAYLOAD_SIZE/binary, _/binary>> = Payload, Type) ->
[
"Payload=",
Part,
"... The ",
integer_to_list(byte_size(Payload) - 100),
" bytes of this log are truncated"
format_truncated_payload(Part, byte_size(Payload), Type)
].
format_truncated_payload(Bin, Size, Type) ->
Bin2 =
case Type of
text -> Bin;
hex -> binary:encode_hex(Bin)
end,
unicode:characters_to_list(
[
Bin2,
"... The ",
integer_to_list(Size - ?TRUNCATED_PAYLOAD_SIZE),
" bytes of this log are truncated"
]
).
i(true) -> 1;
i(false) -> 0;
i(I) when is_integer(I) -> I.

View File

@ -46,7 +46,6 @@
-type timeout_duration_s() :: 0..?MAX_INT_TIMEOUT_S.
-type timeout_duration_ms() :: 0..?MAX_INT_TIMEOUT_MS.
-type bytesize() :: integer().
-type mqtt_max_packet_size() :: 1..?MAX_INT_MQTT_PACKET_SIZE.
-type wordsize() :: bytesize().
-type percent() :: float().
-type file() :: string().
@ -73,7 +72,6 @@
-typerefl_from_string({timeout_duration_s/0, emqx_schema, to_timeout_duration_s}).
-typerefl_from_string({timeout_duration_ms/0, emqx_schema, to_timeout_duration_ms}).
-typerefl_from_string({bytesize/0, emqx_schema, to_bytesize}).
-typerefl_from_string({mqtt_max_packet_size/0, emqx_schema, to_bytesize}).
-typerefl_from_string({wordsize/0, emqx_schema, to_wordsize}).
-typerefl_from_string({percent/0, emqx_schema, to_percent}).
-typerefl_from_string({comma_separated_list/0, emqx_schema, to_comma_separated_list}).
@ -93,6 +91,7 @@
-export([
validate_heap_size/1,
validate_packet_size/1,
user_lookup_fun_tr/2,
validate_alarm_actions/1,
validate_keepalive_multiplier/1,
@ -154,7 +153,6 @@
timeout_duration_s/0,
timeout_duration_ms/0,
bytesize/0,
mqtt_max_packet_size/0,
wordsize/0,
percent/0,
file/0,
@ -1584,7 +1582,7 @@ fields("sysmon_os") ->
sc(
hoconsc:union([disabled, duration()]),
#{
default => <<"60s">>,
default => default_mem_check_interval(),
desc => ?DESC(sysmon_os_mem_check_interval)
}
)},
@ -2003,8 +2001,8 @@ filter(Opts) ->
%% SSL listener and client.
-spec common_ssl_opts_schema(map(), server | client) -> hocon_schema:field_schema().
common_ssl_opts_schema(Defaults, Type) ->
D = fun(Field) -> maps:get(to_atom(Field), Defaults, undefined) end,
Df = fun(Field, Default) -> maps:get(to_atom(Field), Defaults, Default) end,
D = fun(Field) -> maps:get(Field, Defaults, undefined) end,
Df = fun(Field, Default) -> maps:get(Field, Defaults, Default) end,
Collection = maps:get(versions, Defaults, tls_all_available),
DefaultVersions = default_tls_vsns(Collection),
[
@ -2047,7 +2045,7 @@ common_ssl_opts_schema(Defaults, Type) ->
sc(
hoconsc:enum([verify_peer, verify_none]),
#{
default => Df("verify", verify_none),
default => Df(verify, verify_none),
desc => ?DESC(common_ssl_opts_schema_verify)
}
)},
@ -2055,7 +2053,7 @@ common_ssl_opts_schema(Defaults, Type) ->
sc(
boolean(),
#{
default => Df("reuse_sessions", true),
default => Df(reuse_sessions, true),
desc => ?DESC(common_ssl_opts_schema_reuse_sessions)
}
)},
@ -2063,7 +2061,7 @@ common_ssl_opts_schema(Defaults, Type) ->
sc(
non_neg_integer(),
#{
default => Df("depth", 10),
default => Df(depth, 10),
desc => ?DESC(common_ssl_opts_schema_depth)
}
)},
@ -2090,7 +2088,7 @@ common_ssl_opts_schema(Defaults, Type) ->
validator => fun(Input) -> validate_tls_versions(Collection, Input) end
}
)},
{"ciphers", ciphers_schema(D("ciphers"))},
{"ciphers", ciphers_schema(D(ciphers))},
{"user_lookup_fun",
sc(
typerefl:alias("string", any()),
@ -2105,7 +2103,7 @@ common_ssl_opts_schema(Defaults, Type) ->
sc(
boolean(),
#{
default => Df("secure_renegotiate", true),
default => Df(secure_renegotiate, true),
desc => ?DESC(common_ssl_opts_schema_secure_renegotiate)
}
)},
@ -2125,7 +2123,7 @@ common_ssl_opts_schema(Defaults, Type) ->
sc(
duration(),
#{
default => Df("hibernate_after", <<"5s">>),
default => Df(hibernate_after, <<"5s">>),
desc => ?DESC(common_ssl_opts_schema_hibernate_after)
}
)}
@ -2134,15 +2132,15 @@ common_ssl_opts_schema(Defaults, Type) ->
%% @doc Make schema for SSL listener options.
-spec server_ssl_opts_schema(map(), boolean()) -> hocon_schema:field_schema().
server_ssl_opts_schema(Defaults, IsRanchListener) ->
D = fun(Field) -> maps:get(to_atom(Field), Defaults, undefined) end,
Df = fun(Field, Default) -> maps:get(to_atom(Field), Defaults, Default) end,
D = fun(Field) -> maps:get(Field, Defaults, undefined) end,
Df = fun(Field, Default) -> maps:get(Field, Defaults, Default) end,
common_ssl_opts_schema(Defaults, server) ++
[
{"dhfile",
sc(
string(),
#{
default => D("dhfile"),
default => D(dhfile),
required => false,
desc => ?DESC(server_ssl_opts_schema_dhfile)
}
@ -2151,7 +2149,7 @@ server_ssl_opts_schema(Defaults, IsRanchListener) ->
sc(
boolean(),
#{
default => Df("fail_if_no_peer_cert", false),
default => Df(fail_if_no_peer_cert, false),
desc => ?DESC(server_ssl_opts_schema_fail_if_no_peer_cert)
}
)},
@ -2159,7 +2157,7 @@ server_ssl_opts_schema(Defaults, IsRanchListener) ->
sc(
boolean(),
#{
default => Df("honor_cipher_order", true),
default => Df(honor_cipher_order, true),
desc => ?DESC(server_ssl_opts_schema_honor_cipher_order)
}
)},
@ -2167,7 +2165,7 @@ server_ssl_opts_schema(Defaults, IsRanchListener) ->
sc(
boolean(),
#{
default => Df("client_renegotiation", true),
default => Df(client_renegotiation, true),
desc => ?DESC(server_ssl_opts_schema_client_renegotiation)
}
)},
@ -2175,7 +2173,7 @@ server_ssl_opts_schema(Defaults, IsRanchListener) ->
sc(
duration(),
#{
default => Df("handshake_timeout", <<"15s">>),
default => Df(handshake_timeout, <<"15s">>),
desc => ?DESC(server_ssl_opts_schema_handshake_timeout)
}
)}
@ -2618,6 +2616,16 @@ validate_heap_size(Siz) when is_integer(Siz) ->
validate_heap_size(_SizStr) ->
{error, invalid_heap_size}.
validate_packet_size(Siz) when is_integer(Siz) andalso Siz < 1 ->
{error, #{reason => max_mqtt_packet_size_too_small, minimum => 1}};
validate_packet_size(Siz) when is_integer(Siz) andalso Siz > ?MAX_INT_MQTT_PACKET_SIZE ->
Max = integer_to_list(round(?MAX_INT_MQTT_PACKET_SIZE / 1024 / 1024)) ++ "M",
{error, #{reason => max_mqtt_packet_size_too_large, maximum => Max}};
validate_packet_size(Siz) when is_integer(Siz) ->
ok;
validate_packet_size(_SizStr) ->
{error, invalid_packet_size}.
validate_keepalive_multiplier(Multiplier) when
is_number(Multiplier) andalso Multiplier >= 1.0 andalso Multiplier =< 65535.0
->
@ -3380,9 +3388,10 @@ mqtt_general() ->
)},
{"max_packet_size",
sc(
mqtt_max_packet_size(),
bytesize(),
#{
default => <<"1MB">>,
validator => fun ?MODULE:validate_packet_size/1,
desc => ?DESC(mqtt_max_packet_size)
}
)},
@ -3648,3 +3657,9 @@ shared_subscription_strategy() ->
desc => ?DESC(broker_shared_subscription_strategy)
}
)}.
default_mem_check_interval() ->
case emqx_os_mon:is_os_check_supported() of
true -> <<"60s">>;
false -> disabled
end.

View File

@ -29,6 +29,7 @@
%% gen_server callbacks
-export([
init/1,
handle_continue/2,
handle_call/3,
handle_cast/2,
handle_info/2,
@ -70,11 +71,14 @@ update(VM) ->
init([]) ->
emqx_logger:set_proc_metadata(#{sysmon => true}),
init_system_monitor(),
{ok, undefined, {continue, setup}}.
handle_continue(setup, undefined) ->
init_system_monitor(),
%% Monitor cluster partition event
ekka:monitor(partition, fun handle_partition_event/1),
{ok, start_timer(#{timer => undefined, events => []})}.
NewState = start_timer(#{timer => undefined, events => []}),
{noreply, NewState, hibernate}.
start_timer(State) ->
State#{timer := emqx_utils:start_timer(timer:seconds(2), reset)}.

View File

@ -19,21 +19,25 @@
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
Childs = [
child_spec(emqx_sys),
child_spec(emqx_alarm),
child_spec(emqx_sys_mon),
child_spec(emqx_os_mon),
child_spec(emqx_vm_mon)
],
{ok, {{one_for_one, 10, 100}, Childs}}.
OsMon =
case emqx_os_mon:is_os_check_supported() of
true -> [child_spec(emqx_os_mon)];
false -> []
end,
Children =
[
child_spec(emqx_sys),
child_spec(emqx_alarm),
child_spec(emqx_sys_mon),
child_spec(emqx_vm_mon)
] ++ OsMon,
{ok, {{one_for_one, 10, 100}, Children}}.
%%--------------------------------------------------------------------
%% Internal functions

View File

@ -62,6 +62,8 @@
[ocsp, issuer_pem]
]).
-define(ALLOW_EMPTY_PEM, [[<<"cacertfile">>], [cacertfile]]).
%% non-empty string
-define(IS_STRING(L), (is_list(L) andalso L =/= [] andalso is_integer(hd(L)))).
%% non-empty list of strings
@ -330,6 +332,13 @@ ensure_ssl_files_per_key(Dir, SSL, [KeyPath | KeyPaths], Opts) ->
ensure_ssl_file(_Dir, _KeyPath, SSL, undefined, _Opts) ->
{ok, SSL};
ensure_ssl_file(_Dir, KeyPath, SSL, MaybePem, _Opts) when
MaybePem =:= "" orelse MaybePem =:= <<"">>
->
case lists:member(KeyPath, ?ALLOW_EMPTY_PEM) of
true -> {ok, SSL};
false -> {error, #{reason => pem_file_path_or_string_is_required}}
end;
ensure_ssl_file(Dir, KeyPath, SSL, MaybePem, Opts) ->
case is_valid_string(MaybePem) of
true ->

View File

@ -28,7 +28,7 @@ format(
#{level := debug, meta := Meta = #{trace_tag := Tag}, msg := Msg},
#{payload_encode := PEncode}
) ->
Time = calendar:system_time_to_rfc3339(erlang:system_time(microsecond), [{unit, microsecond}]),
Time = emqx_utils_calendar:now_to_rfc3339(microsecond),
ClientId = to_iolist(maps:get(clientid, Meta, "")),
Peername = maps:get(peername, Meta, ""),
MetaBin = format_meta(Meta, PEncode),
@ -76,13 +76,8 @@ format_payload(_, hidden) ->
format_payload(Payload, text) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) ->
unicode:characters_to_list(Payload);
format_payload(Payload, hex) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> binary:encode_hex(Payload);
format_payload(<<Part:100, _/binary>> = Payload, _) ->
[
Part,
"... The ",
integer_to_list(byte_size(Payload) - 100),
" bytes of this log are truncated"
].
format_payload(<<Part:?TRUNCATED_PAYLOAD_SIZE/binary, _/binary>> = Payload, Type) ->
emqx_packet:format_truncated_payload(Part, byte_size(Payload), Type).
to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom);
to_iolist(Int) when is_integer(Int) -> integer_to_list(Int);

View File

@ -44,7 +44,7 @@
get_otp_version/0
]).
-export([cpu_util/0]).
-export([cpu_util/0, cpu_util/1]).
-ifdef(TEST).
-compile(export_all).
@ -378,16 +378,25 @@ avg15() ->
cpu_util() ->
compat_windows(fun cpu_sup:util/0).
cpu_util(Args) ->
compat_windows(fun cpu_sup:util/1, Args).
compat_windows(Fun) ->
case os:type() of
{win32, nt} ->
0.0;
_Type ->
case catch Fun() of
Val when is_float(Val) -> floor(Val * 100) / 100;
Val when is_number(Val) -> Val;
_Error -> 0.0
end
case compat_windows(Fun, []) of
Val when is_float(Val) -> floor(Val * 100) / 100;
Val when is_number(Val) -> Val;
_ -> 0.0
end.
compat_windows(Fun, Args) ->
try
case emqx_os_mon:is_os_check_supported() of
false -> 0.0;
true when Args =:= [] -> Fun();
true -> Fun(Args)
end
catch
_:_ -> 0.0
end.
load(Avg) ->

View File

@ -55,6 +55,11 @@
-type config() :: #{atom() => scalar() | [scalar()] | config() | [config()]}.
-type scalar() :: atom() | number() | string() | binary().
-type hookfun(R) ::
fun(() -> R)
| fun((appname()) -> R)
| fun((appname(), appspec_opts()) -> R).
-type appspec_opts() :: #{
%% 1. Enable loading application config
%% If not defined or set to `false`, this step will be skipped.
@ -70,19 +75,19 @@
%% 3. Perform anything right before starting the application
%% If not defined or set to `false`, this step will be skipped.
%% Merging amounts to redefining.
before_start => fun(() -> _) | fun((appname()) -> _) | false,
before_start => hookfun(_) | false,
%% 4. Starting the application
%% If not defined or set to `true`, `application:ensure_all_started/1` is used.
%% If custom function is used, it should return list of all applications that were started.
%% If set to `false`, application will not be started.
%% Merging amounts to redefining.
start => fun(() -> {ok, [appname()]}) | fun((appname()) -> {ok, [appname()]}) | boolean(),
start => hookfun({ok, [appname()]}) | boolean(),
%% 5. Perform anything right after starting the application
%% If not defined or set to `false`, this step will be skipped.
%% Merging amounts to redefining.
after_start => fun(() -> _) | fun((appname()) -> _) | false
after_start => hookfun(_) | false
}.
%% @doc Start applications with a clean slate.
@ -214,29 +219,30 @@ maybe_override_env(App, #{override_env := Env = [{_, _} | _]}) ->
maybe_override_env(_App, #{}) ->
ok.
maybe_before_start(App, #{before_start := Fun}) when is_function(Fun, 1) ->
Fun(App);
maybe_before_start(_App, #{before_start := Fun}) when is_function(Fun, 0) ->
Fun();
maybe_before_start(App, #{before_start := Fun} = Opts) when is_function(Fun) ->
apply_hookfun(Fun, App, Opts);
maybe_before_start(_App, #{}) ->
ok.
maybe_start(_App, #{start := false}) ->
{ok, []};
maybe_start(_App, #{start := Fun}) when is_function(Fun, 0) ->
Fun();
maybe_start(App, #{start := Fun}) when is_function(Fun, 1) ->
Fun(App);
maybe_start(App, #{start := Fun} = Opts) when is_function(Fun) ->
apply_hookfun(Fun, App, Opts);
maybe_start(App, #{}) ->
application:ensure_all_started(App).
maybe_after_start(App, #{after_start := Fun}) when is_function(Fun, 1) ->
Fun(App);
maybe_after_start(_App, #{after_start := Fun}) when is_function(Fun, 0) ->
Fun();
maybe_after_start(App, #{after_start := Fun} = Opts) when is_function(Fun) ->
apply_hookfun(Fun, App, Opts);
maybe_after_start(_App, #{}) ->
ok.
apply_hookfun(Fun, _App, _Opts) when is_function(Fun, 0) ->
Fun();
apply_hookfun(Fun, App, _Opts) when is_function(Fun, 1) ->
Fun(App);
apply_hookfun(Fun, App, Opts) when is_function(Fun, 2) ->
Fun(App, Opts).
-spec merge_appspec(appspec_opts(), appspec_opts()) ->
appspec_opts().
merge_appspec(Opts1, Opts2) ->
@ -270,7 +276,11 @@ default_appspec(ekka, _SuiteOpts) ->
};
default_appspec(emqx, SuiteOpts) ->
#{
override_env => [{data_dir, maps:get(work_dir, SuiteOpts, "data")}]
override_env => [{data_dir, maps:get(work_dir, SuiteOpts, "data")}],
% NOTE
% We inform `emqx` of our config loader before starting it so that it won't
% overwrite everything with a default configuration.
before_start => fun inhibit_config_loader/2
};
default_appspec(emqx_authz, _SuiteOpts) ->
#{
@ -307,9 +317,7 @@ default_appspec(emqx_conf, SuiteOpts) ->
% NOTE
% We inform `emqx` of our config loader before starting `emqx_conf` so that it won't
% overwrite everything with a default configuration.
before_start => fun() ->
emqx_app:set_config_loader(?MODULE)
end
before_start => fun inhibit_config_loader/2
};
default_appspec(emqx_dashboard, _SuiteOpts) ->
#{
@ -329,6 +337,11 @@ start_ekka() ->
ok = emqx_common_test_helpers:start_ekka(),
{ok, [mnesia, ekka]}.
inhibit_config_loader(_App, #{config := Config}) when Config /= false ->
ok = emqx_app:set_config_loader(?MODULE);
inhibit_config_loader(_App, #{}) ->
ok.
%%
-spec stop(_StartedApps :: [appname()]) ->

View File

@ -20,31 +20,27 @@
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
%% update global default config
{ok, _} = emqx:update_config(
[flapping_detect],
#{
<<"enable">> => true,
<<"max_count">> => 3,
% 0.1s
<<"window_time">> => <<"100ms">>,
%% 2s
<<"ban_time">> => <<"2s">>
}
Apps = emqx_cth_suite:start(
[
{emqx,
"flapping_detect {"
"\n enable = true"
"\n max_count = 3"
"\n window_time = 100ms"
"\n ban_time = 2s"
"\n }"}
],
#{work_dir => ?config(priv_dir, Config)}
),
Config.
[{suite_apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]),
%% Clean emqx_banned table
mria_mnesia:delete_schema(),
ok.
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(suite_apps, Config)).
t_detect_check(_) ->
ClientInfo = #{

View File

@ -39,29 +39,47 @@ init_per_testcase(t_cpu_check_alarm, Config) ->
%% 200ms
cpu_check_interval => 200
}),
ok = supervisor:terminate_child(emqx_sys_sup, emqx_os_mon),
{ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon),
restart_os_mon(),
Config;
init_per_testcase(t_sys_mem_check_alarm, Config) ->
case emqx_os_mon:is_sysmem_check_supported() of
case emqx_os_mon:is_os_check_supported() of
true ->
SysMon = emqx_config:get([sysmon, os], #{}),
emqx_config:put([sysmon, os], SysMon#{
sysmem_high_watermark => 0.51,
%% 200ms
mem_check_interval => 200
}),
ok = supervisor:terminate_child(emqx_sys_sup, emqx_os_mon),
{ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon),
Config;
});
false ->
Config
end;
ok
end,
restart_os_mon(),
Config;
init_per_testcase(_, Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
restart_os_mon(),
Config.
restart_os_mon() ->
case emqx_os_mon:is_os_check_supported() of
true ->
ok = supervisor:terminate_child(emqx_sys_sup, emqx_os_mon),
{ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon);
false ->
_ = supervisor:terminate_child(emqx_sys_sup, emqx_os_mon),
_ = supervisor:delete_child(emqx_sys_sup, emqx_os_mon),
%% run test on mac/windows.
Mod = emqx_os_mon,
OsMon = #{
id => Mod,
start => {Mod, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [Mod]
},
{ok, _} = supervisor:start_child(emqx_sys_sup, OsMon)
end.
t_api(_) ->
?assertEqual(0.7, emqx_os_mon:get_sysmem_high_watermark()),
?assertEqual(ok, emqx_os_mon:set_sysmem_high_watermark(0.8)),
@ -81,7 +99,7 @@ t_api(_) ->
ok.
t_sys_mem_check_disable(Config) ->
case emqx_os_mon:is_sysmem_check_supported() of
case emqx_os_mon:is_os_check_supported() of
true -> do_sys_mem_check_disable(Config);
false -> skip
end.
@ -100,7 +118,7 @@ do_sys_mem_check_disable(_Config) ->
ok.
t_sys_mem_check_alarm(Config) ->
case emqx_os_mon:is_sysmem_check_supported() of
case emqx_os_mon:is_os_check_supported() of
true -> do_sys_mem_check_alarm(Config);
false -> skip
end.
@ -167,7 +185,7 @@ t_cpu_check_alarm(_) ->
util,
fun() -> CpuUtil end,
fun() ->
timer:sleep(500),
timer:sleep(1000),
Alarms = emqx_alarm:get_alarms(activated),
?assert(
emqx_vm_mon_SUITE:is_existing(high_cpu_usage, emqx_alarm:get_alarms(activated))
@ -193,7 +211,7 @@ t_cpu_check_alarm(_) ->
?assert(is_binary(Msg)),
emqx_config:put([sysmon, os, cpu_high_watermark], 1),
emqx_config:put([sysmon, os, cpu_low_watermark], 0.96),
timer:sleep(500),
timer:sleep(800),
?assertNot(
emqx_vm_mon_SUITE:is_existing(high_cpu_usage, emqx_alarm:get_alarms(activated))
)

View File

@ -113,11 +113,22 @@ ssl_files_failure_test_() ->
})
)
end},
{"empty_cacertfile", fun() ->
?assertMatch(
{ok, _},
emqx_tls_lib:ensure_ssl_files("/tmp", #{
<<"keyfile">> => test_key(),
<<"certfile">> => test_key(),
<<"cacertfile">> => <<"">>
})
)
end},
{"bad_pem_string", fun() ->
%% empty string
?assertMatch(
{error, #{
reason := invalid_file_path_or_pem_string, which_options := [[<<"keyfile">>]]
reason := pem_file_path_or_string_is_required,
which_options := [[<<"keyfile">>]]
}},
emqx_tls_lib:ensure_ssl_files("/tmp", #{
<<"keyfile">> => <<>>,

View File

@ -311,6 +311,60 @@ t_client_event(_Config) ->
?assert(erlang:byte_size(Bin3) > 0),
ok.
t_client_huge_payload_truncated(_Config) ->
ClientId = <<"client-truncated1">>,
Now = erlang:system_time(second),
Name = <<"test_client_id_truncated1">>,
{ok, _} = emqx_trace:create([
{<<"name">>, Name},
{<<"type">>, clientid},
{<<"clientid">>, ClientId},
{<<"start_at">>, Now}
]),
ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
{ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
{ok, _} = emqtt:connect(Client),
emqtt:ping(Client),
NormalPayload = iolist_to_binary(lists:duplicate(1024, "x")),
ok = emqtt:publish(Client, <<"/test">>, #{}, NormalPayload, [{qos, 0}]),
HugePayload1 = iolist_to_binary(lists:duplicate(1025, "y")),
ok = emqtt:publish(Client, <<"/test">>, #{}, HugePayload1, [{qos, 0}]),
HugePayload2 = iolist_to_binary(lists:duplicate(1024 * 10, "y")),
ok = emqtt:publish(Client, <<"/test">>, #{}, HugePayload2, [{qos, 0}]),
ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
{ok, _} = emqx_trace:create([
{<<"name">>, <<"test_topic">>},
{<<"type">>, topic},
{<<"topic">>, <<"/test">>},
{<<"start_at">>, Now}
]),
ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic),
{ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)),
ok = emqtt:publish(Client, <<"/test">>, #{}, NormalPayload, [{qos, 0}]),
ok = emqtt:publish(Client, <<"/test">>, #{}, HugePayload1, [{qos, 0}]),
ok = emqtt:publish(Client, <<"/test">>, #{}, HugePayload2, [{qos, 0}]),
ok = emqtt:disconnect(Client),
ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic),
{ok, Bin2} = file:read_file(emqx_trace:log_file(Name, Now)),
{ok, Bin3} = file:read_file(emqx_trace:log_file(<<"test_topic">>, Now)),
ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]),
?assert(erlang:byte_size(Bin) > 1024),
?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)),
?assert(erlang:byte_size(Bin3) > 1024),
%% Don't have format crash
CrashBin = <<"CRASH">>,
?assertEqual(nomatch, binary:match(Bin, [CrashBin])),
?assertEqual(nomatch, binary:match(Bin2, [CrashBin])),
?assertEqual(nomatch, binary:match(Bin3, [CrashBin])),
%% have "this log are truncated" for huge payload
TruncatedLog = <<"this log are truncated">>,
?assertNotEqual(nomatch, binary:match(Bin, [TruncatedLog])),
?assertNotEqual(nomatch, binary:match(Bin2, [TruncatedLog])),
?assertNotEqual(nomatch, binary:match(Bin3, [TruncatedLog])),
ok.
t_get_log_filename(_Config) ->
Now = erlang:system_time(second),
Name = <<"name1">>,

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.1.6"},
{vsn, "0.1.7"},
{registered, []},
{applications, [
kernel,

View File

@ -113,6 +113,22 @@ fields(connector_config) ->
];
fields(producer) ->
[
{attributes_template,
sc(
hoconsc:array(ref(key_value_pair)),
#{
default => [],
desc => ?DESC("attributes_template")
}
)},
{ordering_key_template,
sc(
binary(),
#{
default => <<>>,
desc => ?DESC("ordering_key_template")
}
)},
{payload_template,
sc(
binary(),
@ -203,6 +219,18 @@ fields("consumer_resource_opts") ->
fun({Field, _Sc}) -> lists:member(Field, SupportedFields) end,
ResourceFields
);
fields(key_value_pair) ->
[
{key,
mk(binary(), #{
required => true,
validator => [
emqx_resource_validator:not_empty("Key templates must not be empty")
],
desc => ?DESC(kv_pair_key)
})},
{value, mk(binary(), #{required => true, desc => ?DESC(kv_pair_value)})}
];
fields("get_producer") ->
emqx_bridge_schema:status_fields() ++ fields("post_producer");
fields("post_producer") ->
@ -218,6 +246,8 @@ fields("put_consumer") ->
desc("config_producer") ->
?DESC("desc_config");
desc(key_value_pair) ->
?DESC("kv_pair_desc");
desc("config_consumer") ->
?DESC("desc_config");
desc("consumer_resource_opts") ->

View File

@ -9,15 +9,20 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-type config() :: #{
attributes_template := [#{key := binary(), value := binary()}],
connect_timeout := emqx_schema:duration_ms(),
max_retries := non_neg_integer(),
ordering_key_template := binary(),
payload_template := binary(),
pubsub_topic := binary(),
resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(),
any() => term()
}.
-type state() :: #{
attributes_template := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()},
client := emqx_bridge_gcp_pubsub_client:state(),
ordering_key_template := emqx_placeholder:tmpl_token(),
payload_template := emqx_placeholder:tmpl_token(),
project_id := emqx_bridge_gcp_pubsub_client:project_id(),
pubsub_topic := binary()
@ -57,6 +62,8 @@ on_start(InstanceId, Config0) ->
}),
Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0),
#{
attributes_template := AttributesTemplate,
ordering_key_template := OrderingKeyTemplate,
payload_template := PayloadTemplate,
pubsub_topic := PubSubTopic,
service_account_json := #{<<"project_id">> := ProjectId}
@ -65,6 +72,8 @@ on_start(InstanceId, Config0) ->
{ok, Client} ->
State = #{
client => Client,
attributes_template => preproc_attributes(AttributesTemplate),
ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate),
payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate),
project_id => ProjectId,
pubsub_topic => PubSubTopic
@ -197,14 +206,107 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs0) ->
Request, ReplyFunAndArgs, Client
).
-spec encode_payload(state(), Selected :: map()) -> #{data := binary()}.
encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) ->
Interpolated =
case PayloadTemplate of
[] -> emqx_utils_json:encode(Selected);
_ -> emqx_placeholder:proc_tmpl(PayloadTemplate, Selected)
-spec encode_payload(state(), Selected :: map()) ->
#{
data := binary(),
attributes => #{binary() => binary()},
'orderingKey' => binary()
}.
encode_payload(State, Selected) ->
#{
attributes_template := AttributesTemplate,
ordering_key_template := OrderingKeyTemplate,
payload_template := PayloadTemplate
} = State,
Data = render_payload(PayloadTemplate, Selected),
OrderingKey = render_key(OrderingKeyTemplate, Selected),
Attributes = proc_attributes(AttributesTemplate, Selected),
Payload0 = #{data => base64:encode(Data)},
Payload1 = put_if(Payload0, attributes, Attributes, map_size(Attributes) > 0),
put_if(Payload1, 'orderingKey', OrderingKey, OrderingKey =/= <<>>).
put_if(Acc, K, V, true) ->
Acc#{K => V};
put_if(Acc, _K, _V, false) ->
Acc.
-spec render_payload(emqx_placeholder:tmpl_token(), map()) -> binary().
render_payload([] = _Template, Selected) ->
emqx_utils_json:encode(Selected);
render_payload(Template, Selected) ->
render_value(Template, Selected).
render_key(Template, Selected) ->
Opts = #{
return => full_binary,
var_trans => fun
(_Var, undefined) ->
<<>>;
(Var, X) when is_boolean(X) ->
throw({bad_value_for_key, Var, X});
(_Var, X) when is_binary(X); is_number(X); is_atom(X) ->
emqx_utils_conv:bin(X);
(Var, X) ->
throw({bad_value_for_key, Var, X})
end
},
try
emqx_placeholder:proc_tmpl(Template, Selected, Opts)
catch
throw:{bad_value_for_key, Var, X} ->
?tp(
warning,
"gcp_pubsub_producer_bad_value_for_key",
#{
placeholder => Var,
value => X,
action => "key ignored",
hint => "only plain values like strings and numbers can be used in keys"
}
),
<<>>
end.
render_value(Template, Selected) ->
Opts = #{
return => full_binary,
var_trans => fun
(undefined) -> <<>>;
(X) -> emqx_utils_conv:bin(X)
end
},
emqx_placeholder:proc_tmpl(Template, Selected, Opts).
-spec preproc_attributes([#{key := binary(), value := binary()}]) ->
#{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()}.
preproc_attributes(AttributesTemplate) ->
lists:foldl(
fun(#{key := K, value := V}, Acc) ->
KT = emqx_placeholder:preproc_tmpl(K),
VT = emqx_placeholder:preproc_tmpl(V),
Acc#{KT => VT}
end,
#{data => base64:encode(Interpolated)}.
#{},
AttributesTemplate
).
-spec proc_attributes(#{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()}, map()) ->
#{binary() => binary()}.
proc_attributes(AttributesTemplate, Selected) ->
maps:fold(
fun(KT, VT, Acc) ->
K = render_key(KT, Selected),
case K =:= <<>> of
true ->
Acc;
false ->
V = render_value(VT, Selected),
Acc#{K => V}
end
end,
#{},
AttributesTemplate
).
-spec to_pubsub_request([#{data := binary()}]) -> binary().
to_pubsub_request(Payloads) ->

View File

@ -63,7 +63,9 @@ single_config_tests() ->
t_get_status_down,
t_get_status_no_worker,
t_get_status_timeout_calling_workers,
t_on_start_ehttpc_pool_already_started
t_on_start_ehttpc_pool_already_started,
t_attributes,
t_bad_attributes
].
only_sync_tests() ->
@ -212,7 +214,9 @@ create_bridge_http(Config, GCPPubSubConfigOverrides) ->
Error
end,
ct:pal("bridge creation result: ~p", [Res]),
?assertEqual(element(1, ProbeResult), element(1, Res)),
?assertEqual(element(1, ProbeResult), element(1, Res), #{
creation_result => Res, probe_result => ProbeResult
}),
case ProbeResult of
{error, {{_, 500, _}, _, _}} -> error({bad_probe_result, ProbeResult});
_ -> ok
@ -456,6 +460,7 @@ assert_valid_request_headers(Headers, ServiceAccountJSON) ->
assert_valid_request_body(Body) ->
BodyMap = emqx_utils_json:decode(Body, [return_maps]),
?assertMatch(#{<<"messages">> := [_ | _]}, BodyMap),
ct:pal("request: ~p", [BodyMap]),
#{<<"messages">> := Messages} = BodyMap,
lists:map(
fun(Msg) ->
@ -480,6 +485,31 @@ assert_http_request(ServiceAccountJSON) ->
error({timeout, #{mailbox => Mailbox}})
end.
receive_http_requests(ServiceAccountJSON, Opts) ->
Default = #{n => 1},
#{n := N} = maps:merge(Default, Opts),
lists:flatmap(fun(_) -> receive_http_request(ServiceAccountJSON) end, lists:seq(1, N)).
receive_http_request(ServiceAccountJSON) ->
receive
{http, Headers, Body} ->
ct:pal("received publish:\n ~p", [#{headers => Headers, body => Body}]),
assert_valid_request_headers(Headers, ServiceAccountJSON),
#{<<"messages">> := Msgs} = emqx_utils_json:decode(Body, [return_maps]),
lists:map(
fun(Msg) ->
#{<<"data">> := Content64} = Msg,
Content = base64:decode(Content64),
Decoded = emqx_utils_json:decode(Content, [return_maps]),
Msg#{<<"data">> := Decoded}
end,
Msgs
)
after 5_000 ->
{messages, Mailbox} = process_info(self(), messages),
error({timeout, #{mailbox => Mailbox}})
end.
install_telemetry_handler(TestCase) ->
Tid = ets:new(TestCase, [ordered_set, public]),
HandlerId = TestCase,
@ -585,8 +615,8 @@ t_publish_success(Config) ->
<<"topic">> := Topic,
<<"payload">> := Payload,
<<"metadata">> := #{<<"rule_id">> := RuleId}
}
],
} = Msg
] when not (is_map_key(<<"attributes">>, Msg) orelse is_map_key(<<"orderingKey">>, Msg)),
DecodedMessages
),
%% to avoid test flakiness
@ -1524,3 +1554,251 @@ t_query_sync(Config) ->
[]
),
ok.
t_attributes(Config) ->
Name = ?config(gcp_pubsub_name, Config),
ServiceAccountJSON = ?config(service_account_json, Config),
LocalTopic = <<"t/topic">>,
?check_trace(
begin
{ok, _} = create_bridge_http(
Config,
#{
<<"local_topic">> => LocalTopic,
<<"attributes_template">> =>
[
#{
<<"key">> => <<"${.payload.key}">>,
<<"value">> => <<"fixed_value">>
},
#{
<<"key">> => <<"${.payload.key}2">>,
<<"value">> => <<"${.payload.value}">>
},
#{
<<"key">> => <<"fixed_key">>,
<<"value">> => <<"fixed_value">>
},
#{
<<"key">> => <<"fixed_key2">>,
<<"value">> => <<"${.payload.value}">>
}
],
<<"ordering_key_template">> => <<"${.payload.ok}">>
}
),
%% without ordering key
Payload0 =
emqx_utils_json:encode(
#{
<<"value">> => <<"payload_value">>,
<<"key">> => <<"payload_key">>
}
),
Message0 = emqx_message:make(LocalTopic, Payload0),
emqx:publish(Message0),
DecodedMessages0 = receive_http_request(ServiceAccountJSON),
?assertMatch(
[
#{
<<"attributes">> :=
#{
<<"fixed_key">> := <<"fixed_value">>,
<<"fixed_key2">> := <<"payload_value">>,
<<"payload_key">> := <<"fixed_value">>,
<<"payload_key2">> := <<"payload_value">>
},
<<"data">> := #{
<<"topic">> := _,
<<"payload">> := _
}
} = Msg
] when not is_map_key(<<"orderingKey">>, Msg),
DecodedMessages0
),
%% with ordering key
Payload1 =
emqx_utils_json:encode(
#{
<<"value">> => <<"payload_value">>,
<<"key">> => <<"payload_key">>,
<<"ok">> => <<"ordering_key">>
}
),
Message1 = emqx_message:make(LocalTopic, Payload1),
emqx:publish(Message1),
DecodedMessages1 = receive_http_request(ServiceAccountJSON),
?assertMatch(
[
#{
<<"attributes">> :=
#{
<<"fixed_key">> := <<"fixed_value">>,
<<"fixed_key2">> := <<"payload_value">>,
<<"payload_key">> := <<"fixed_value">>,
<<"payload_key2">> := <<"payload_value">>
},
<<"orderingKey">> := <<"ordering_key">>,
<<"data">> := #{
<<"topic">> := _,
<<"payload">> := _
}
}
],
DecodedMessages1
),
%% will result in empty key
Payload2 =
emqx_utils_json:encode(
#{
<<"value">> => <<"payload_value">>,
<<"ok">> => <<"ordering_key">>
}
),
Message2 = emqx_message:make(LocalTopic, Payload2),
emqx:publish(Message2),
[DecodedMessage2] = receive_http_request(ServiceAccountJSON),
?assertEqual(
#{
<<"fixed_key">> => <<"fixed_value">>,
<<"fixed_key2">> => <<"payload_value">>,
<<"2">> => <<"payload_value">>
},
maps:get(<<"attributes">>, DecodedMessage2)
),
%% ensure loading cluster override file doesn't mangle the attribute
%% placeholders...
#{<<"bridges">> := #{?BRIDGE_TYPE_BIN := #{Name := RawConf}}} =
emqx_config:read_override_conf(#{override_to => cluster}),
?assertEqual(
[
#{
<<"key">> => <<"${.payload.key}">>,
<<"value">> => <<"fixed_value">>
},
#{
<<"key">> => <<"${.payload.key}2">>,
<<"value">> => <<"${.payload.value}">>
},
#{
<<"key">> => <<"fixed_key">>,
<<"value">> => <<"fixed_value">>
},
#{
<<"key">> => <<"fixed_key2">>,
<<"value">> => <<"${.payload.value}">>
}
],
maps:get(<<"attributes_template">>, RawConf)
),
ok
end,
[]
),
ok.
t_bad_attributes(Config) ->
ServiceAccountJSON = ?config(service_account_json, Config),
LocalTopic = <<"t/topic">>,
?check_trace(
begin
{ok, _} = create_bridge_http(
Config,
#{
<<"local_topic">> => LocalTopic,
<<"attributes_template">> =>
[
#{
<<"key">> => <<"${.payload.key}">>,
<<"value">> => <<"${.payload.value}">>
}
],
<<"ordering_key_template">> => <<"${.payload.ok}">>
}
),
%% Ok: attribute value is a map or list
lists:foreach(
fun(OkValue) ->
Payload0 =
emqx_utils_json:encode(
#{
<<"ok">> => <<"ord_key">>,
<<"value">> => OkValue,
<<"key">> => <<"attr_key">>
}
),
Message0 = emqx_message:make(LocalTopic, Payload0),
emqx:publish(Message0)
end,
[
#{<<"some">> => <<"map">>},
[1, <<"str">>, #{<<"deep">> => true}]
]
),
DecodedMessages0 = receive_http_requests(ServiceAccountJSON, #{n => 1}),
?assertMatch(
[
#{
<<"attributes">> :=
#{<<"attr_key">> := <<"{\"some\":\"map\"}">>},
<<"orderingKey">> := <<"ord_key">>
},
#{
<<"attributes">> :=
#{<<"attr_key">> := <<"[1,\"str\",{\"deep\":true}]">>},
<<"orderingKey">> := <<"ord_key">>
}
],
DecodedMessages0
),
%% Bad: key is not a plain value
lists:foreach(
fun(BadKey) ->
Payload1 =
emqx_utils_json:encode(
#{
<<"value">> => <<"v">>,
<<"key">> => BadKey,
<<"ok">> => BadKey
}
),
Message1 = emqx_message:make(LocalTopic, Payload1),
emqx:publish(Message1)
end,
[
#{<<"some">> => <<"map">>},
[1, <<"list">>, true],
true,
false
]
),
DecodedMessages1 = receive_http_request(ServiceAccountJSON),
lists:foreach(
fun(DMsg) ->
?assertNot(is_map_key(<<"orderingKey">>, DMsg), #{decoded_message => DMsg}),
?assertNot(is_map_key(<<"attributes">>, DMsg), #{decoded_message => DMsg}),
ok
end,
DecodedMessages1
),
ok
end,
fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]),
?assertMatch(
[
#{placeholder := [<<"payload">>, <<"ok">>], value := #{}},
#{placeholder := [<<"payload">>, <<"key">>], value := #{}},
#{placeholder := [<<"payload">>, <<"ok">>], value := [_ | _]},
#{placeholder := [<<"payload">>, <<"key">>], value := [_ | _]},
#{placeholder := [<<"payload">>, <<"ok">>], value := true},
#{placeholder := [<<"payload">>, <<"key">>], value := true},
#{placeholder := [<<"payload">>, <<"ok">>], value := false},
#{placeholder := [<<"payload">>, <<"key">>], value := false}
],
?of_kind("gcp_pubsub_producer_bad_value_for_key", Trace)
),
ok
end
),
ok.

View File

@ -0,0 +1,149 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_gcp_pubsub_tests).
-include_lib("eunit/include/eunit.hrl").
%%===========================================================================
%% Data section
%%===========================================================================
%% erlfmt-ignore
gcp_pubsub_producer_hocon() ->
"""
bridges.gcp_pubsub.my_producer {
attributes_template = [
{key = \"${payload.key}\", value = fixed_value}
{key = \"${payload.key}2\", value = \"${.payload.value}\"}
{key = fixed_key, value = fixed_value}
{key = fixed_key2, value = \"${.payload.value}\"}
]
connect_timeout = 15s
enable = false
local_topic = \"t/gcp/produ\"
max_retries = 2
ordering_key_template = \"${.payload.ok}\"
payload_template = \"${.}\"
pipelining = 100
pool_size = 8
pubsub_topic = my-topic
resource_opts {
batch_size = 1
batch_time = 0ms
health_check_interval = 15s
inflight_window = 100
max_buffer_bytes = 256MB
query_mode = async
request_ttl = 15s
start_after_created = true
start_timeout = 5s
worker_pool_size = 16
}
service_account_json {
auth_provider_x509_cert_url = \"https://www.googleapis.com/oauth2/v1/certs\"
auth_uri = \"https://accounts.google.com/o/oauth2/auth\"
client_email = \"test@myproject.iam.gserviceaccount.com\"
client_id = \"123812831923812319190\"
client_x509_cert_url = \"https://www.googleapis.com/robot/v1/metadata/x509/...\"
private_key = \"-----BEGIN PRIVATE KEY-----...\"
private_key_id = \"kid\"
project_id = myproject
token_uri = \"https://oauth2.googleapis.com/token\"
type = service_account
}
}
""".
%%===========================================================================
%% Helper functions
%%===========================================================================
parse(Hocon) ->
{ok, Conf} = hocon:binary(Hocon),
Conf.
check(Conf) when is_map(Conf) ->
hocon_tconf:check_plain(emqx_bridge_schema, Conf).
-define(validation_error(Reason, Value),
{emqx_bridge_schema, [
#{
kind := validation_error,
reason := Reason,
value := Value
}
]}
).
-define(ok_config(Cfg), #{
<<"bridges">> :=
#{
<<"gcp_pubsub">> :=
#{
<<"my_producer">> :=
Cfg
}
}
}).
%%===========================================================================
%% Test cases
%%===========================================================================
producer_attributes_validator_test_() ->
%% ensure this module is loaded when testing only this file
_ = emqx_bridge_enterprise:module_info(),
BaseConf = parse(gcp_pubsub_producer_hocon()),
Override = fun(Cfg) ->
emqx_utils_maps:deep_merge(
BaseConf,
#{
<<"bridges">> =>
#{
<<"gcp_pubsub">> =>
#{<<"my_producer">> => Cfg}
}
}
)
end,
[
{"base config",
?_assertMatch(
?ok_config(#{
<<"attributes_template">> := [_, _, _, _]
}),
check(BaseConf)
)},
{"empty key template",
?_assertThrow(
?validation_error("Key templates must not be empty", _),
check(
Override(#{
<<"attributes_template">> => [
#{
<<"key">> => <<>>,
<<"value">> => <<"some_value">>
}
]
})
)
)},
{"empty value template",
?_assertMatch(
?ok_config(#{
<<"attributes_template">> := [_]
}),
check(
Override(#{
<<"attributes_template">> => [
#{
<<"key">> => <<"some_key">>,
<<"value">> => <<>>
}
]
})
)
)}
].

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_kinesis, [
{description, "EMQX Enterprise Amazon Kinesis Bridge"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [
kernel,

View File

@ -111,7 +111,14 @@ init(#{
erlcloud_config:configure(
to_str(AwsAccessKey), to_str(AwsSecretAccessKey), Host, Port, Scheme, New
),
{ok, State}.
% check the connection
case erlcloud_kinesis:list_streams() of
{ok, _} ->
{ok, State};
{error, Reason} ->
?tp(kinesis_init_failed, #{instance_id => InstanceId, reason => Reason}),
{stop, Reason}
end.
handle_call(connection_status, _From, #{stream_name := StreamName} = State) ->
Status =

View File

@ -114,7 +114,12 @@ on_get_status(_InstanceId, #{pool_name := Pool} = State) ->
false -> disconnected
end
end;
{error, _} ->
{error, Reason} ->
?SLOG(error, #{
msg => "kinesis_producer_get_status_failed",
state => State,
reason => Reason
}),
disconnected
end.

View File

@ -796,7 +796,9 @@ t_publish_connection_down(Config0) ->
ok.
t_wrong_server(Config) ->
TypeBin = ?BRIDGE_TYPE_BIN,
Name = ?config(kinesis_name, Config),
KinesisConfig0 = ?config(kinesis_config, Config),
ResourceId = ?config(resource_id, Config),
Overrides =
#{
@ -806,12 +808,57 @@ t_wrong_server(Config) ->
<<"health_check_interval">> => <<"60s">>
}
},
% probe
KinesisConfig = emqx_utils_maps:deep_merge(KinesisConfig0, Overrides),
Params = KinesisConfig#{<<"type">> => TypeBin, <<"name">> => Name},
ProbePath = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(post, ProbePath, "", AuthHeader, Params)
),
% create
?wait_async_action(
create_bridge(Config, Overrides),
#{?snk_kind := emqx_bridge_kinesis_impl_producer_start_ok},
#{?snk_kind := start_pool_failed},
30_000
),
?assertEqual({error, timeout}, emqx_resource_manager:health_check(ResourceId)),
emqx_bridge_resource:stop(?BRIDGE_TYPE, Name),
emqx_bridge_resource:remove(?BRIDGE_TYPE, Name),
?assertMatch(
{ok, _, #{error := {start_pool_failed, ResourceId, _}}},
emqx_resource_manager:lookup_cached(ResourceId)
),
ok.
t_access_denied(Config) ->
TypeBin = ?BRIDGE_TYPE_BIN,
Name = ?config(kinesis_name, Config),
KinesisConfig = ?config(kinesis_config, Config),
ResourceId = ?config(resource_id, Config),
AccessError = {<<"AccessDeniedException">>, <<>>},
Params = KinesisConfig#{<<"type">> => TypeBin, <<"name">> => Name},
ProbePath = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
emqx_common_test_helpers:with_mock(
erlcloud_kinesis,
list_streams,
fun() -> {error, AccessError} end,
fun() ->
% probe
?assertMatch(
{error, {_, 400, _}},
emqx_mgmt_api_test_util:request_api(post, ProbePath, "", AuthHeader, Params)
),
% create
?wait_async_action(
create_bridge(Config),
#{?snk_kind := kinesis_init_failed},
30_000
),
?assertMatch(
{ok, _, #{error := {start_pool_failed, ResourceId, AccessError}}},
emqx_resource_manager:lookup_cached(ResourceId)
),
ok
end
),
ok.

View File

@ -29,7 +29,8 @@ group_tests() ->
t_payload_template,
t_collection_template,
t_mongo_date_rule_engine_functions,
t_get_status_server_selection_too_short
t_get_status_server_selection_too_short,
t_use_legacy_protocol_option
].
groups() ->
@ -180,6 +181,7 @@ mongo_config(MongoHost, MongoPort0, rs = Type, Config) ->
" replica_set_name = rs0\n"
" servers = [~p]\n"
" w_mode = safe\n"
" use_legacy_protocol = auto\n"
" database = mqtt\n"
" resource_opts = {\n"
" query_mode = ~s\n"
@ -205,6 +207,7 @@ mongo_config(MongoHost, MongoPort0, sharded = Type, Config) ->
" collection = mycol\n"
" servers = [~p]\n"
" w_mode = safe\n"
" use_legacy_protocol = auto\n"
" database = mqtt\n"
" resource_opts = {\n"
" query_mode = ~s\n"
@ -230,6 +233,7 @@ mongo_config(MongoHost, MongoPort0, single = Type, Config) ->
" collection = mycol\n"
" server = ~p\n"
" w_mode = safe\n"
" use_legacy_protocol = auto\n"
" database = mqtt\n"
" resource_opts = {\n"
" query_mode = ~s\n"
@ -286,10 +290,8 @@ clear_db(Config) ->
mongo_api:disconnect(Client).
find_all(Config) ->
Type = mongo_type_bin(?config(mongo_type, Config)),
Name = ?config(mongo_name, Config),
#{<<"collection">> := Collection} = ?config(mongo_config, Config),
ResourceID = emqx_bridge_resource:resource_id(Type, Name),
ResourceID = resource_id(Config),
emqx_resource:simple_sync_query(ResourceID, {find, Collection, #{}, #{}}).
find_all_wait_until_non_empty(Config) ->
@ -340,6 +342,27 @@ probe_bridge_api(Config, Overrides) ->
ct:pal("bridge probe result: ~p", [Res]),
Res.
resource_id(Config) ->
Type0 = ?config(mongo_type, Config),
Name = ?config(mongo_name, Config),
Type = mongo_type_bin(Type0),
emqx_bridge_resource:resource_id(Type, Name).
get_worker_pids(Config) ->
ResourceID = resource_id(Config),
%% abusing health check api a bit...
GetWorkerPid = fun(TopologyPid) ->
mongoc:transaction_query(TopologyPid, fun(#{pool := WorkerPid}) -> WorkerPid end)
end,
{ok, WorkerPids = [_ | _]} =
emqx_resource_pool:health_check_workers(
ResourceID,
GetWorkerPid,
5_000,
#{return_values => true}
),
WorkerPids.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -494,3 +517,30 @@ t_get_status_server_selection_too_short(Config) ->
emqx_utils_json:decode(Body)
),
ok.
t_use_legacy_protocol_option(Config) ->
ResourceID = resource_id(Config),
{ok, _} = create_bridge(Config, #{<<"use_legacy_protocol">> => <<"true">>}),
?retry(
_Interval0 = 200,
_NAttempts0 = 20,
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceID))
),
WorkerPids0 = get_worker_pids(Config),
Expected0 = maps:from_keys(WorkerPids0, true),
LegacyOptions0 = maps:from_list([{Pid, mc_utils:use_legacy_protocol(Pid)} || Pid <- WorkerPids0]),
?assertEqual(Expected0, LegacyOptions0),
{ok, _} = delete_bridge(Config),
{ok, _} = create_bridge(Config, #{<<"use_legacy_protocol">> => <<"false">>}),
?retry(
_Interval0 = 200,
_NAttempts0 = 20,
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceID))
),
WorkerPids1 = get_worker_pids(Config),
Expected1 = maps:from_keys(WorkerPids1, false),
LegacyOptions1 = maps:from_list([{Pid, mc_utils:use_legacy_protocol(Pid)} || Pid <- WorkerPids1]),
?assertEqual(Expected1, LegacyOptions1),
ok.

View File

@ -1089,7 +1089,7 @@ t_strategy_key_validation(Config) ->
#{
<<"kind">> := <<"validation_error">>,
<<"reason">> := <<"Message key cannot be empty", _/binary>>
} = Msg
}
}}},
probe_bridge_api(
Config,
@ -1103,7 +1103,7 @@ t_strategy_key_validation(Config) ->
#{
<<"kind">> := <<"validation_error">>,
<<"reason">> := <<"Message key cannot be empty", _/binary>>
} = Msg
}
}}},
create_bridge_api(
Config,

View File

@ -9,7 +9,6 @@
{emqx, {path, "../emqx"}},
{emqx_utils, {path, "../emqx_utils"}},
{emqx_resource, {path, "../emqx_resource"}},
{eldap2, {git, "https://github.com/emqx/eldap2", {tag, "v0.2.2"}}},
{epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7.0.1"}}}
]}.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "EMQX Data Integration Connectors"},
{vsn, "0.1.29"},
{vsn, "0.1.30"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [
@ -12,7 +12,6 @@
eredis_cluster,
eredis,
epgsql,
eldap2,
ehttpc,
jose,
emqx,

View File

@ -2,7 +2,7 @@
{application, emqx_dashboard, [
{description, "EMQX Web Dashboard"},
% strict semver, bump manually!
{vsn, "5.0.25"},
{vsn, "5.0.26"},
{modules, []},
{registered, [emqx_dashboard_sup]},
{applications, [kernel, stdlib, mnesia, minirest, emqx, emqx_ctl, emqx_bridge_http]},
@ -12,6 +12,6 @@
{maintainers, ["EMQX Team <contact@emqx.io>"]},
{links, [
{"Homepage", "https://emqx.io/"},
{"Github", "https://github.com/emqx/emqx-dashboard"}
{"Github", "https://github.com/emqx/emqx-dashboard5"}
]}
]}.

View File

@ -856,8 +856,6 @@ typename_to_spec("timeout()", _Mod) ->
};
typename_to_spec("bytesize()", _Mod) ->
#{type => string, example => <<"32MB">>};
typename_to_spec("mqtt_max_packet_size()", _Mod) ->
#{type => string, example => <<"32MB">>};
typename_to_spec("wordsize()", _Mod) ->
#{type => string, example => <<"1024KB">>};
typename_to_spec("map()", _Mod) ->

View File

@ -1,6 +1,6 @@
{application, emqx_ft, [
{description, "EMQX file transfer over MQTT"},
{vsn, "0.1.4"},
{vsn, "0.1.5"},
{registered, []},
{mod, {emqx_ft_app, []}},
{applications, [

View File

@ -71,7 +71,7 @@
%% the resulting file is corrupted during transmission).
size => _Bytes :: non_neg_integer(),
checksum => checksum(),
expire_at := emqx_datetime:epoch_second(),
expire_at := emqx_utils_calendar:epoch_second(),
%% TTL of individual segments
%% Somewhat confusing that we won't know it on the nodes where the filemeta
%% is missing.

View File

@ -278,7 +278,7 @@ format_file_info(
end.
format_timestamp(Timestamp) ->
iolist_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])).
emqx_utils_calendar:epoch_to_rfc3339(Timestamp, second).
format_name(NameBin) when is_binary(NameBin) ->
NameBin;

View File

@ -68,7 +68,7 @@
transfer := emqx_ft:transfer(),
name := file:name(),
size := _Bytes :: non_neg_integer(),
timestamp := emqx_datetime:epoch_second(),
timestamp := emqx_utils_calendar:epoch_second(),
uri => uri_string:uri_string(),
meta => emqx_ft:filemeta()
}.

View File

@ -43,7 +43,7 @@
transfer := transfer(),
name := file:name(),
uri := uri_string:uri_string(),
timestamp := emqx_datetime:epoch_second(),
timestamp := emqx_utils_calendar:epoch_second(),
size := _Bytes :: non_neg_integer(),
filemeta => filemeta()
}.

View File

@ -76,7 +76,7 @@
% TODO naming
-type filefrag(T) :: #{
path := file:name(),
timestamp := emqx_datetime:epoch_second(),
timestamp := emqx_utils_calendar:epoch_second(),
size := _Bytes :: non_neg_integer(),
fragment := T
}.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway, [
{description, "The Gateway management application"},
{vsn, "0.1.22"},
{vsn, "0.1.23"},
{registered, []},
{mod, {emqx_gateway_app, []}},
{applications, [kernel, stdlib, emqx, emqx_authn, emqx_ctl]},

View File

@ -397,13 +397,13 @@ format_channel_info(WhichNode, {_, Infos, Stats} = R) ->
{ip_address, {peername, ConnInfo, fun peer_to_binary_addr/1}},
{port, {peername, ConnInfo, fun peer_to_port/1}},
{is_bridge, ClientInfo, false},
{connected_at, {connected_at, ConnInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}},
{disconnected_at, {disconnected_at, ConnInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}},
{connected_at, {connected_at, ConnInfo, fun emqx_utils_calendar:epoch_to_rfc3339/1}},
{disconnected_at, {disconnected_at, ConnInfo, fun emqx_utils_calendar:epoch_to_rfc3339/1}},
{connected, {conn_state, Infos, fun conn_state_to_connected/1}},
{keepalive, ClientInfo, 0},
{clean_start, ConnInfo, true},
{expiry_interval, ConnInfo, 0},
{created_at, {created_at, SessInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}},
{created_at, {created_at, SessInfo, fun emqx_utils_calendar:epoch_to_rfc3339/1}},
{subscriptions_cnt, Stats, 0},
{subscriptions_max, Stats, infinity},
{inflight_cnt, Stats, 0},
@ -640,28 +640,28 @@ params_client_searching_in_qs() ->
)},
{gte_created_at,
mk(
emqx_datetime:epoch_millisecond(),
emqx_utils_calendar:epoch_millisecond(),
M#{
desc => ?DESC(param_gte_created_at)
}
)},
{lte_created_at,
mk(
emqx_datetime:epoch_millisecond(),
emqx_utils_calendar:epoch_millisecond(),
M#{
desc => ?DESC(param_lte_created_at)
}
)},
{gte_connected_at,
mk(
emqx_datetime:epoch_millisecond(),
emqx_utils_calendar:epoch_millisecond(),
M#{
desc => ?DESC(param_gte_connected_at)
}
)},
{lte_connected_at,
mk(
emqx_datetime:epoch_millisecond(),
emqx_utils_calendar:epoch_millisecond(),
M#{
desc => ?DESC(param_lte_connected_at)
}
@ -888,12 +888,12 @@ common_client_props() ->
)},
{connected_at,
mk(
emqx_datetime:epoch_millisecond(),
emqx_utils_calendar:epoch_millisecond(),
#{desc => ?DESC(connected_at)}
)},
{disconnected_at,
mk(
emqx_datetime:epoch_millisecond(),
emqx_utils_calendar:epoch_millisecond(),
#{
desc => ?DESC(disconnected_at)
}
@ -931,7 +931,7 @@ common_client_props() ->
)},
{created_at,
mk(
emqx_datetime:epoch_millisecond(),
emqx_utils_calendar:epoch_millisecond(),
#{desc => ?DESC(created_at)}
)},
{subscriptions_cnt,

View File

@ -313,9 +313,9 @@ format_gateway(
[
Name,
Status,
emqx_gateway_utils:unix_ts_to_rfc3339(CreatedAt),
emqx_utils_calendar:epoch_to_rfc3339(CreatedAt),
StopOrStart,
emqx_gateway_utils:unix_ts_to_rfc3339(Timestamp),
emqx_utils_calendar:epoch_to_rfc3339(Timestamp),
Config
]
).

View File

@ -38,7 +38,6 @@
-export([
apply/2,
parse_listenon/1,
unix_ts_to_rfc3339/1,
unix_ts_to_rfc3339/2,
listener_id/3,
parse_listener_id/1,
@ -364,14 +363,10 @@ unix_ts_to_rfc3339(Key, Map) ->
Map;
Ts ->
Map#{
Key =>
emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>)
Key => emqx_utils_calendar:epoch_to_rfc3339(Ts)
}
end.
unix_ts_to_rfc3339(Ts) ->
emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>).
-spec stringfy(term()) -> binary().
stringfy(T) when is_list(T); is_binary(T) ->
iolist_to_binary(T);

View File

@ -2312,9 +2312,7 @@ t_socket_passvice(_) ->
ok.
t_clients_api(_) ->
TsNow = emqx_gateway_utils:unix_ts_to_rfc3339(
erlang:system_time(millisecond)
),
TsNow = emqx_utils_calendar:now_to_rfc3339(millisecond),
ClientId = <<"client_id_test1">>,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, ClientId),

View File

@ -1,10 +1,11 @@
{application, emqx_ldap, [
{description, "EMQX LDAP Connector"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [
kernel,
stdlib,
eldap,
emqx_authn,
emqx_authz
]},

View File

@ -17,7 +17,8 @@
asn1,
syntax_tools,
ssl,
os_mon,
%% started temporary in emqx to prevent crash vm when permanent.
{os_mon, load},
inets,
compiler,
runtime_tools,
@ -36,7 +37,6 @@
[
emqx,
emqx_conf,
esasl,
observer_cli,
tools,

View File

@ -185,25 +185,39 @@ node_info(Nodes) ->
stopped_node_info(Node) ->
{Node, #{node => Node, node_status => 'stopped', role => core}}.
%% Hide cpu stats if os_check is not supported.
vm_stats() ->
Idle = vm_stats('cpu.idle'),
{MemUsedRatio, MemTotal} = get_sys_memory(),
[
{run_queue, vm_stats('run.queue')},
{cpu_idle, Idle},
{cpu_use, 100 - Idle},
{total_memory, MemTotal},
{used_memory, erlang:round(MemTotal * MemUsedRatio)}
].
cpu_stats() ++
[
{run_queue, vm_stats('run.queue')},
{total_memory, MemTotal},
{used_memory, erlang:round(MemTotal * MemUsedRatio)}
].
cpu_stats() ->
case emqx_os_mon:is_os_check_supported() of
false ->
[];
true ->
Idle = vm_stats('cpu.idle'),
[
{cpu_idle, Idle},
{cpu_use, 100 - Idle}
]
end.
vm_stats('cpu.idle') ->
case cpu_sup:util([detailed]) of
%% Not support for Windows
{_, 0, 0, _} -> 0;
{_Num, _Use, IdleList, _} -> proplists:get_value(idle, IdleList, 0)
case emqx_vm:cpu_util([detailed]) of
{_Num, _Use, List, _} when is_list(List) -> proplists:get_value(idle, List, 0);
%% return {all, 0, 0, []} when cpu_sup is not started
_ -> 0
end;
vm_stats('cpu.use') ->
100 - vm_stats('cpu.idle');
case vm_stats('cpu.idle') of
0 -> 0;
Idle -> 100 - Idle
end;
vm_stats('total.memory') ->
{_, MemTotal} = get_sys_memory(),
MemTotal;
@ -230,7 +244,7 @@ broker_info() ->
Info#{node => node(), otp_release => otp_rel(), node_status => 'running'}.
convert_broker_info({uptime, Uptime}, M) ->
M#{uptime => emqx_datetime:human_readable_duration_string(Uptime)};
M#{uptime => emqx_utils_calendar:human_readable_duration_string(Uptime)};
convert_broker_info({K, V}, M) ->
M#{K => iolist_to_binary(V)}.

View File

@ -127,7 +127,7 @@ fields(app) ->
)},
{expired_at,
hoconsc:mk(
hoconsc:union([infinity, emqx_datetime:epoch_second()]),
hoconsc:union([infinity, emqx_utils_calendar:epoch_second()]),
#{
desc => "No longer valid datetime",
example => <<"2021-12-05T02:01:34.186Z">>,
@ -137,7 +137,7 @@ fields(app) ->
)},
{created_at,
hoconsc:mk(
emqx_datetime:epoch_second(),
emqx_utils_calendar:epoch_second(),
#{
desc => "ApiKey create datetime",
example => <<"2021-12-01T00:00:00.000Z">>

View File

@ -79,6 +79,13 @@ schema("/banned") ->
?DESC(create_banned_api_response400)
)
}
},
delete => #{
description => ?DESC(clear_banned_api),
tags => ?TAGS,
parameters => [],
'requestBody' => [],
responses => #{204 => <<"No Content">>}
}
};
schema("/banned/:as/:who") ->
@ -140,13 +147,13 @@ fields(ban) ->
example => <<"Too many requests">>
})},
{at,
hoconsc:mk(emqx_datetime:epoch_second(), #{
hoconsc:mk(emqx_utils_calendar:epoch_second(), #{
desc => ?DESC(at),
required => false,
example => <<"2021-10-25T21:48:47+08:00">>
})},
{until,
hoconsc:mk(emqx_datetime:epoch_second(), #{
hoconsc:mk(emqx_utils_calendar:epoch_second(), #{
desc => ?DESC(until),
required => false,
example => <<"2021-10-25T21:53:47+08:00">>
@ -168,7 +175,10 @@ banned(post, #{body := Body}) ->
OldBannedFormat = emqx_utils_json:encode(format(Old)),
{400, 'ALREADY_EXISTS', OldBannedFormat}
end
end.
end;
banned(delete, _) ->
emqx_banned:clear(),
{204}.
delete_banned(delete, #{bindings := Params}) ->
case emqx_banned:look_up(Params) of

View File

@ -161,7 +161,7 @@ schema("/clients") ->
desc => <<"Fuzzy search `username` as substring">>
})},
{gte_created_at,
hoconsc:mk(emqx_datetime:epoch_millisecond(), #{
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
in => query,
required => false,
desc =>
@ -169,7 +169,7 @@ schema("/clients") ->
" than or equal method, rfc3339 or timestamp(millisecond)">>
})},
{lte_created_at,
hoconsc:mk(emqx_datetime:epoch_millisecond(), #{
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
in => query,
required => false,
desc =>
@ -177,7 +177,7 @@ schema("/clients") ->
" than or equal method, rfc3339 or timestamp(millisecond)">>
})},
{gte_connected_at,
hoconsc:mk(emqx_datetime:epoch_millisecond(), #{
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
in => query,
required => false,
desc => <<
@ -186,7 +186,7 @@ schema("/clients") ->
>>
})},
{lte_connected_at,
hoconsc:mk(emqx_datetime:epoch_millisecond(), #{
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
in => query,
required => false,
desc => <<
@ -399,16 +399,16 @@ fields(client) ->
{connected, hoconsc:mk(boolean(), #{desc => <<"Whether the client is connected">>})},
{connected_at,
hoconsc:mk(
emqx_datetime:epoch_millisecond(),
emqx_utils_calendar:epoch_millisecond(),
#{desc => <<"Client connection time, rfc3339 or timestamp(millisecond)">>}
)},
{created_at,
hoconsc:mk(
emqx_datetime:epoch_millisecond(),
emqx_utils_calendar:epoch_millisecond(),
#{desc => <<"Session creation time, rfc3339 or timestamp(millisecond)">>}
)},
{disconnected_at,
hoconsc:mk(emqx_datetime:epoch_millisecond(), #{
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
desc =>
<<
"Client offline time."
@ -950,7 +950,7 @@ result_format_time_fun(Key, NClientInfoMap) ->
case NClientInfoMap of
#{Key := TimeStamp} ->
NClientInfoMap#{
Key => emqx_datetime:epoch_to_rfc3339(TimeStamp)
Key => emqx_utils_calendar:epoch_to_rfc3339(TimeStamp)
};
#{} ->
NClientInfoMap

View File

@ -281,7 +281,7 @@ fields(trace) ->
})},
{start_at,
hoconsc:mk(
emqx_datetime:epoch_second(),
emqx_utils_calendar:epoch_second(),
#{
description => ?DESC(time_format),
required => false,
@ -290,7 +290,7 @@ fields(trace) ->
)},
{end_at,
hoconsc:mk(
emqx_datetime:epoch_second(),
emqx_utils_calendar:epoch_second(),
#{
description => ?DESC(time_format),
required => false,
@ -410,8 +410,8 @@ trace(get, _Params) ->
Trace0#{
log_size => LogSize,
Type => iolist_to_binary(Filter),
start_at => list_to_binary(calendar:system_time_to_rfc3339(Start)),
end_at => list_to_binary(calendar:system_time_to_rfc3339(End)),
start_at => emqx_utils_calendar:epoch_to_rfc3339(Start, second),
end_at => emqx_utils_calendar:epoch_to_rfc3339(End, second),
status => status(Enable, Start, End, Now)
}
end,
@ -468,8 +468,8 @@ format_trace(Trace0) ->
Trace2#{
log_size => LogSize,
Type => iolist_to_binary(Filter),
start_at => list_to_binary(calendar:system_time_to_rfc3339(Start)),
end_at => list_to_binary(calendar:system_time_to_rfc3339(End)),
start_at => emqx_utils_calendar:epoch_to_rfc3339(Start, second),
end_at => emqx_utils_calendar:epoch_to_rfc3339(Start, second),
status => status(Enable, Start, End, Now)
}.

View File

@ -142,11 +142,11 @@ format(App = #{expired_at := ExpiredAt0, created_at := CreateAt}) ->
ExpiredAt =
case ExpiredAt0 of
infinity -> <<"infinity">>;
_ -> list_to_binary(calendar:system_time_to_rfc3339(ExpiredAt0))
_ -> emqx_utils_calendar:epoch_to_rfc3339(ExpiredAt0, second)
end,
App#{
expired_at => ExpiredAt,
created_at => list_to_binary(calendar:system_time_to_rfc3339(CreateAt))
created_at => emqx_utils_calendar:epoch_to_rfc3339(CreateAt, second)
}.
list() ->

View File

@ -87,7 +87,7 @@ broker([]) ->
Funs = [sysdescr, version, datetime],
[emqx_ctl:print("~-10s: ~ts~n", [Fun, emqx_sys:Fun()]) || Fun <- Funs],
emqx_ctl:print("~-10s: ~ts~n", [
uptime, emqx_datetime:human_readable_duration_string(emqx_sys:uptime())
uptime, emqx_utils_calendar:human_readable_duration_string(emqx_sys:uptime())
]);
broker(["stats"]) ->
[

View File

@ -157,6 +157,30 @@ t_delete(_Config) ->
),
ok.
t_clear(_Config) ->
Now = erlang:system_time(second),
At = emqx_banned:to_rfc3339(Now),
Until = emqx_banned:to_rfc3339(Now + 3),
Who = <<"TestClient-"/utf8>>,
By = <<"banned suite 中"/utf8>>,
Reason = <<"test测试"/utf8>>,
As = <<"clientid">>,
Banned = #{
as => clientid,
who => Who,
by => By,
reason => Reason,
at => At,
until => Until
},
{ok, _} = create_banned(Banned),
?assertMatch({ok, _}, clear_banned()),
?assertMatch(
{error, {"HTTP/1.1", 404, "Not Found"}},
delete_banned(binary_to_list(As), binary_to_list(Who))
),
ok.
list_banned() ->
Path = emqx_mgmt_api_test_util:api_path(["banned"]),
case emqx_mgmt_api_test_util:request_api(get, Path) of
@ -176,5 +200,9 @@ delete_banned(As, Who) ->
DeletePath = emqx_mgmt_api_test_util:api_path(["banned", As, Who]),
emqx_mgmt_api_test_util:request_api(delete, DeletePath).
clear_banned() ->
ClearPath = emqx_mgmt_api_test_util:api_path(["banned"]),
emqx_mgmt_api_test_util:request_api(delete, ClearPath).
to_rfc3339(Sec) ->
list_to_binary(calendar:system_time_to_rfc3339(Sec)).

View File

@ -260,7 +260,7 @@ t_query_clients_with_time(_) ->
%% Do not uri_encode `=` to `%3D`
Rfc3339String = emqx_http_lib:uri_encode(
binary:bin_to_list(
emqx_datetime:epoch_to_rfc3339(NowTimeStampInt)
emqx_utils_calendar:epoch_to_rfc3339(NowTimeStampInt)
)
),
TimeStampString = emqx_http_lib:uri_encode(integer_to_list(NowTimeStampInt)),

View File

@ -208,8 +208,8 @@ format_delayed(
},
WithPayload
) ->
PublishTime = to_rfc3339(PublishTimeStamp div 1000),
ExpectTime = to_rfc3339(ExpectTimeStamp div 1000),
PublishTime = emqx_utils_calendar:epoch_to_rfc3339(PublishTimeStamp),
ExpectTime = emqx_utils_calendar:epoch_to_rfc3339(ExpectTimeStamp),
RemainingTime = ExpectTimeStamp - ?NOW,
Result = #{
msgid => emqx_guid:to_hexstr(Id),
@ -230,9 +230,6 @@ format_delayed(
Result
end.
to_rfc3339(Timestamp) ->
list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, second}])).
-spec get_delayed_message(binary()) -> with_id_return(map()).
get_delayed_message(Id) ->
case ets:select(?TAB, ?QUERY_MS(Id)) of

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_modules, [
{description, "EMQX Modules"},
{vsn, "5.0.19"},
{vsn, "5.0.20"},
{modules, []},
{applications, [kernel, stdlib, emqx, emqx_ctl]},
{mod, {emqx_modules_app, []}},

View File

@ -295,7 +295,7 @@ terminate(_Reason, _State) ->
reset_topic({Topic, Data}, Speeds) ->
CRef = maps:get(counter_ref, Data),
ok = reset_counter(CRef),
ResetTime = emqx_rule_funcs:now_rfc3339(),
ResetTime = emqx_utils_calendar:now_to_rfc3339(),
true = ets:insert(?TAB, {Topic, Data#{reset_time => ResetTime}}),
Fun =
fun(Metric, CurrentSpeeds) ->

View File

@ -183,7 +183,7 @@ fields(topic_metrics) ->
)},
{create_time,
mk(
emqx_datetime:epoch_second(),
emqx_utils_calendar:epoch_second(),
#{
desc => ?DESC(create_time),
required => true,
@ -192,7 +192,7 @@ fields(topic_metrics) ->
)},
{reset_time,
mk(
emqx_datetime:epoch_second(),
emqx_utils_calendar:epoch_second(),
#{
desc => ?DESC(reset_time),
required => false,

View File

@ -3,5 +3,5 @@
{erl_opts, [debug_info]}.
{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}}
, {mongodb, {git, "https://github.com/emqx/mongodb-erlang", {tag, "v3.0.20"}}}
, {mongodb, {git, "https://github.com/emqx/mongodb-erlang", {tag, "v3.0.21"}}}
]}.

View File

@ -1,6 +1,6 @@
{application, emqx_mongodb, [
{description, "EMQX MongoDB Connector"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [
kernel,

View File

@ -141,6 +141,11 @@ mongo_fields() ->
{pool_size, fun emqx_connector_schema_lib:pool_size/1},
{username, fun emqx_connector_schema_lib:username/1},
{password, fun emqx_connector_schema_lib:password/1},
{use_legacy_protocol,
hoconsc:mk(hoconsc:enum([auto, true, false]), #{
default => auto,
desc => ?DESC("use_legacy_protocol")
})},
{auth_source, #{
type => binary(),
required => false,
@ -429,6 +434,8 @@ init_worker_options([{w_mode, V} | R], Acc) ->
init_worker_options(R, [{w_mode, V} | Acc]);
init_worker_options([{r_mode, V} | R], Acc) ->
init_worker_options(R, [{r_mode, V} | Acc]);
init_worker_options([{use_legacy_protocol, V} | R], Acc) ->
init_worker_options(R, [{use_legacy_protocol, V} | Acc]);
init_worker_options([_ | R], Acc) ->
init_worker_options(R, Acc);
init_worker_options([], Acc) ->

View File

@ -1,6 +1,6 @@
{application, emqx_opentelemetry, [
{description, "OpenTelemetry for EMQX Broker"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{mod, {emqx_otel_app, []}},
{applications, [kernel, stdlib, emqx]},

View File

@ -150,9 +150,9 @@ get_vm_gauge(Name) ->
[{emqx_mgmt:vm_stats(Name), #{}}].
get_cluster_gauge('node.running') ->
length(emqx:cluster_nodes(running));
[{length(emqx:cluster_nodes(running)), #{}}];
get_cluster_gauge('node.stopped') ->
length(emqx:cluster_nodes(stopped)).
[{length(emqx:cluster_nodes(stopped)), #{}}].
get_metric_counter(Name) ->
[{emqx_metrics:val(Name), #{}}].

View File

@ -211,11 +211,8 @@ format_message(#message{
msgid => emqx_guid:to_hexstr(ID),
qos => Qos,
topic => Topic,
publish_at => list_to_binary(
calendar:system_time_to_rfc3339(
Timestamp, [{unit, millisecond}]
)
),
publish_at =>
emqx_utils_calendar:epoch_to_rfc3339(Timestamp),
from_clientid => to_bin_string(From),
from_username => maps:get(username, Headers, <<>>)
}.

View File

@ -1,270 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_date).
-export([date/3, date/4, parse_date/4]).
-export([
is_int_char/1,
is_symbol_char/1,
is_m_char/1
]).
-record(result, {
%%year()
year = "1970" :: string(),
%%month()
month = "1" :: string(),
%%day()
day = "1" :: string(),
%%hour()
hour = "0" :: string(),
%%minute() %% epoch in millisecond precision
minute = "0" :: string(),
%%second() %% epoch in millisecond precision
second = "0" :: string(),
%%integer() %% zone maybe some value
zone = "+00:00" :: string()
}).
%% -type time_unit() :: 'microsecond'
%% | 'millisecond'
%% | 'nanosecond'
%% | 'second'.
%% -type offset() :: [byte()] | (Time :: integer()).
date(TimeUnit, Offset, FormatString) ->
date(TimeUnit, Offset, FormatString, erlang:system_time(TimeUnit)).
date(TimeUnit, Offset, FormatString, TimeEpoch) ->
[Head | Other] = string:split(FormatString, "%", all),
R = create_tag([{st, Head}], Other),
Res = lists:map(
fun(Expr) ->
eval_tag(rmap(make_time(TimeUnit, Offset, TimeEpoch)), Expr)
end,
R
),
lists:concat(Res).
parse_date(TimeUnit, Offset, FormatString, InputString) ->
[Head | Other] = string:split(FormatString, "%", all),
R = create_tag([{st, Head}], Other),
IsZ = fun(V) ->
case V of
{tag, $Z} -> true;
_ -> false
end
end,
R1 = lists:filter(IsZ, R),
IfFun = fun(Con, A, B) ->
case Con of
[] -> A;
_ -> B
end
end,
Res = parse_input(FormatString, InputString),
Str =
Res#result.year ++ "-" ++
Res#result.month ++ "-" ++
Res#result.day ++ "T" ++
Res#result.hour ++ ":" ++
Res#result.minute ++ ":" ++
Res#result.second ++
IfFun(R1, Offset, Res#result.zone),
calendar:rfc3339_to_system_time(Str, [{unit, TimeUnit}]).
mlist(R) ->
%% %H Shows hour in 24-hour format [15]
[
{$H, R#result.hour},
%% %M Displays minutes [00-59]
{$M, R#result.minute},
%% %S Displays seconds [00-59]
{$S, R#result.second},
%% %y Displays year YYYY [2021]
{$y, R#result.year},
%% %m Displays the number of the month [01-12]
{$m, R#result.month},
%% %d Displays the number of the month [01-12]
{$d, R#result.day},
%% %Z Displays Time zone
{$Z, R#result.zone}
].
rmap(Result) ->
maps:from_list(mlist(Result)).
support_char() -> "HMSymdZ".
create_tag(Head, []) ->
Head;
create_tag(Head, [Val1 | RVal]) ->
case Val1 of
[] ->
create_tag(Head ++ [{st, [$%]}], RVal);
[H | Other] ->
case lists:member(H, support_char()) of
true -> create_tag(Head ++ [{tag, H}, {st, Other}], RVal);
false -> create_tag(Head ++ [{st, [$% | Val1]}], RVal)
end
end.
eval_tag(_, {st, Str}) ->
Str;
eval_tag(Map, {tag, Char}) ->
maps:get(Char, Map, "undefined").
%% make_time(TimeUnit, Offset) ->
%% make_time(TimeUnit, Offset, erlang:system_time(TimeUnit)).
make_time(TimeUnit, Offset, TimeEpoch) ->
Res = calendar:system_time_to_rfc3339(
TimeEpoch,
[{unit, TimeUnit}, {offset, Offset}]
),
[
Y1,
Y2,
Y3,
Y4,
$-,
Mon1,
Mon2,
$-,
D1,
D2,
_T,
H1,
H2,
$:,
Min1,
Min2,
$:,
S1,
S2
| TimeStr
] = Res,
IsFractionChar = fun(C) -> C >= $0 andalso C =< $9 orelse C =:= $. end,
{FractionStr, UtcOffset} = lists:splitwith(IsFractionChar, TimeStr),
#result{
year = [Y1, Y2, Y3, Y4],
month = [Mon1, Mon2],
day = [D1, D2],
hour = [H1, H2],
minute = [Min1, Min2],
second = [S1, S2] ++ FractionStr,
zone = UtcOffset
}.
is_int_char(C) ->
C >= $0 andalso C =< $9.
is_symbol_char(C) ->
C =:= $- orelse C =:= $+.
is_m_char(C) ->
C =:= $:.
parse_char_with_fun(_, []) ->
error(null_input);
parse_char_with_fun(ValidFun, [C | Other]) ->
Res =
case erlang:is_function(ValidFun) of
true -> ValidFun(C);
false -> erlang:apply(emqx_rule_date, ValidFun, [C])
end,
case Res of
true -> {C, Other};
false -> error({unexpected, [C | Other]})
end.
parse_string([], Input) ->
{[], Input};
parse_string([C | Other], Input) ->
{C1, Input1} = parse_char_with_fun(fun(V) -> V =:= C end, Input),
{Res, Input2} = parse_string(Other, Input1),
{[C1 | Res], Input2}.
parse_times(0, _, Input) ->
{[], Input};
parse_times(Times, Fun, Input) ->
{C1, Input1} = parse_char_with_fun(Fun, Input),
{Res, Input2} = parse_times((Times - 1), Fun, Input1),
{[C1 | Res], Input2}.
parse_int_times(Times, Input) ->
parse_times(Times, is_int_char, Input).
parse_fraction(Input) ->
IsFractionChar = fun(C) -> C >= $0 andalso C =< $9 orelse C =:= $. end,
lists:splitwith(IsFractionChar, Input).
parse_second(Input) ->
{M, Input1} = parse_int_times(2, Input),
{M1, Input2} = parse_fraction(Input1),
{M ++ M1, Input2}.
parse_zone(Input) ->
{S, Input1} = parse_char_with_fun(is_symbol_char, Input),
{M, Input2} = parse_int_times(2, Input1),
{C, Input3} = parse_char_with_fun(is_m_char, Input2),
{V, Input4} = parse_int_times(2, Input3),
{[S | M ++ [C | V]], Input4}.
mlist1() ->
maps:from_list(
%% %H Shows hour in 24-hour format [15]
[
{$H, fun(Input) -> parse_int_times(2, Input) end},
%% %M Displays minutes [00-59]
{$M, fun(Input) -> parse_int_times(2, Input) end},
%% %S Displays seconds [00-59]
{$S, fun(Input) -> parse_second(Input) end},
%% %y Displays year YYYY [2021]
{$y, fun(Input) -> parse_int_times(4, Input) end},
%% %m Displays the number of the month [01-12]
{$m, fun(Input) -> parse_int_times(2, Input) end},
%% %d Displays the number of the month [01-12]
{$d, fun(Input) -> parse_int_times(2, Input) end},
%% %Z Displays Time zone
{$Z, fun(Input) -> parse_zone(Input) end}
]
).
update_result($H, Res, Str) -> Res#result{hour = Str};
update_result($M, Res, Str) -> Res#result{minute = Str};
update_result($S, Res, Str) -> Res#result{second = Str};
update_result($y, Res, Str) -> Res#result{year = Str};
update_result($m, Res, Str) -> Res#result{month = Str};
update_result($d, Res, Str) -> Res#result{day = Str};
update_result($Z, Res, Str) -> Res#result{zone = Str}.
parse_tag(Res, {st, St}, InputString) ->
{_A, B} = parse_string(St, InputString),
{Res, B};
parse_tag(Res, {tag, St}, InputString) ->
Fun = maps:get(St, mlist1()),
{A, B} = Fun(InputString),
NRes = update_result(St, Res, A),
{NRes, B}.
parse_tags(Res, [], _) ->
Res;
parse_tags(Res, [Tag | Others], InputString) ->
{NRes, B} = parse_tag(Res, Tag, InputString),
parse_tags(NRes, Others, B).
parse_input(FormatString, InputString) ->
[Head | Other] = string:split(FormatString, "%", all),
R = create_tag([{st, Head}], Other),
parse_tags(#result{}, R, InputString).

View File

@ -2,7 +2,7 @@
{application, emqx_rule_engine, [
{description, "EMQX Rule Engine"},
% strict semver, bump manually!
{vsn, "5.0.22"},
{vsn, "5.0.23"},
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [kernel, stdlib, rulesql, getopt, emqx_ctl, uuid]},

View File

@ -514,7 +514,7 @@ format_rule_engine_resp(Config) ->
maps:remove(rules, Config).
format_datetime(Timestamp, Unit) ->
list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])).
emqx_utils_calendar:epoch_to_rfc3339(Timestamp, Unit).
format_action(Actions) ->
[do_format_action(Act) || Act <- Actions].

View File

@ -74,8 +74,8 @@ pretty_print_rule(ID) ->
"Updated at:\n ~ts\n"
"Actions:\n ~s\n"
,[Id, Name, left_pad(Descr), Enable, left_pad(SQL),
calendar:system_time_to_rfc3339(CreatedAt, [{unit, millisecond}]),
calendar:system_time_to_rfc3339(UpdatedAt, [{unit, millisecond}]),
emqx_utils_calendar:epoch_to_rfc3339(CreatedAt, millisecond),
emqx_utils_calendar:epoch_to_rfc3339(UpdatedAt, millisecond),
[left_pad(format_action(A)) || A <- Actions]
]
);

View File

@ -276,6 +276,8 @@
]}
).
-import(emqx_utils_calendar, [time_unit/1, now_to_rfc3339/0, now_to_rfc3339/1, epoch_to_rfc3339/2]).
%% @doc "msgid()" Func
msgid() ->
fun
@ -1077,23 +1079,19 @@ kv_store_del(Key) ->
%%--------------------------------------------------------------------
now_rfc3339() ->
now_rfc3339(<<"second">>).
now_to_rfc3339().
now_rfc3339(Unit) ->
unix_ts_to_rfc3339(now_timestamp(Unit), Unit).
now_to_rfc3339(time_unit(Unit)).
unix_ts_to_rfc3339(Epoch) ->
unix_ts_to_rfc3339(Epoch, <<"second">>).
epoch_to_rfc3339(Epoch, second).
unix_ts_to_rfc3339(Epoch, Unit) when is_integer(Epoch) ->
emqx_utils_conv:bin(
calendar:system_time_to_rfc3339(
Epoch, [{unit, time_unit(Unit)}]
)
).
epoch_to_rfc3339(Epoch, time_unit(Unit)).
rfc3339_to_unix_ts(DateTime) ->
rfc3339_to_unix_ts(DateTime, <<"second">>).
rfc3339_to_unix_ts(DateTime, second).
rfc3339_to_unix_ts(DateTime, Unit) when is_binary(DateTime) ->
calendar:rfc3339_to_system_time(
@ -1107,15 +1105,6 @@ now_timestamp() ->
now_timestamp(Unit) ->
erlang:system_time(time_unit(Unit)).
time_unit(<<"second">>) -> second;
time_unit(<<"millisecond">>) -> millisecond;
time_unit(<<"microsecond">>) -> microsecond;
time_unit(<<"nanosecond">>) -> nanosecond;
time_unit(second) -> second;
time_unit(millisecond) -> millisecond;
time_unit(microsecond) -> microsecond;
time_unit(nanosecond) -> nanosecond.
format_date(TimeUnit, Offset, FormatString) ->
Unit = time_unit(TimeUnit),
TimeEpoch = erlang:system_time(Unit),
@ -1125,17 +1114,17 @@ format_date(TimeUnit, Offset, FormatString, TimeEpoch) ->
Unit = time_unit(TimeUnit),
emqx_utils_conv:bin(
lists:concat(
emqx_calendar:format(TimeEpoch, Unit, Offset, FormatString)
emqx_utils_calendar:format(TimeEpoch, Unit, Offset, FormatString)
)
).
date_to_unix_ts(TimeUnit, FormatString, InputString) ->
Unit = time_unit(TimeUnit),
emqx_calendar:parse(InputString, Unit, FormatString).
emqx_utils_calendar:parse(InputString, Unit, FormatString).
date_to_unix_ts(TimeUnit, Offset, FormatString, InputString) ->
Unit = time_unit(TimeUnit),
OffsetSecond = emqx_calendar:offset_second(Offset),
OffsetSecond = emqx_utils_calendar:offset_second(Offset),
OffsetDelta = erlang:convert_time_unit(OffsetSecond, second, Unit),
date_to_unix_ts(Unit, FormatString, InputString) - OffsetDelta.
@ -1143,7 +1132,7 @@ timezone_to_second(TimeZone) ->
timezone_to_offset_seconds(TimeZone).
timezone_to_offset_seconds(TimeZone) ->
emqx_calendar:offset_second(TimeZone).
emqx_utils_calendar:offset_second(TimeZone).
'$handle_undefined_function'(sprintf, [Format | Args]) ->
erlang:apply(fun sprintf_s/2, [Format, Args]);

View File

@ -2,7 +2,7 @@
{application, emqx_utils, [
{description, "Miscellaneous utilities for EMQX apps"},
% strict semver, bump manually!
{vsn, "5.0.6"},
{vsn, "5.0.7"},
{modules, [
emqx_utils,
emqx_utils_api,

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,7 +14,32 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_calendar).
-module(emqx_utils_calendar).
-include_lib("typerefl/include/types.hrl").
-export([
formatter/1,
format/3,
format/4,
parse/3,
offset_second/1
]).
%% API
-export([
to_epoch_millisecond/1,
to_epoch_second/1,
human_readable_duration_string/1
]).
-export([
epoch_to_rfc3339/1,
epoch_to_rfc3339/2,
now_to_rfc3339/0,
now_to_rfc3339/1
]).
-export([time_unit/1]).
-define(SECONDS_PER_MINUTE, 60).
-define(SECONDS_PER_HOUR, 3600).
@ -24,13 +49,11 @@
-define(DAYS_FROM_0_TO_1970, 719528).
-define(SECONDS_FROM_0_TO_1970, (?DAYS_FROM_0_TO_1970 * ?SECONDS_PER_DAY)).
-export([
formatter/1,
format/3,
format/4,
parse/3,
offset_second/1
]).
%% the maximum value is the SECONDS_FROM_0_TO_10000 in the calendar.erl,
%% here minus SECONDS_PER_DAY to tolerate timezone time offset,
%% so the maximum date can reach 9999-12-31 which is ample.
-define(MAXIMUM_EPOCH, 253402214400).
-define(MAXIMUM_EPOCH_MILLI, 253402214400_000).
-define(DATE_PART, [
year,
@ -50,6 +73,72 @@
timezone2
]).
-reflect_type([
epoch_millisecond/0,
epoch_second/0
]).
-type epoch_second() :: non_neg_integer().
-type epoch_millisecond() :: non_neg_integer().
-typerefl_from_string({epoch_second/0, ?MODULE, to_epoch_second}).
-typerefl_from_string({epoch_millisecond/0, ?MODULE, to_epoch_millisecond}).
%%--------------------------------------------------------------------
%% Epoch <-> RFC 3339
%%--------------------------------------------------------------------
to_epoch_second(DateTime) ->
to_epoch(DateTime, second).
to_epoch_millisecond(DateTime) ->
to_epoch(DateTime, millisecond).
to_epoch(DateTime, Unit) ->
try
case string:to_integer(DateTime) of
{Epoch, []} -> validate_epoch(Epoch, Unit);
_ -> {ok, calendar:rfc3339_to_system_time(DateTime, [{unit, Unit}])}
end
catch
error:_ ->
{error, bad_rfc3339_timestamp}
end.
epoch_to_rfc3339(Timestamp) ->
epoch_to_rfc3339(Timestamp, millisecond).
epoch_to_rfc3339(Timestamp, Unit) when is_integer(Timestamp) ->
list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])).
now_to_rfc3339() ->
now_to_rfc3339(second).
now_to_rfc3339(Unit) ->
epoch_to_rfc3339(erlang:system_time(Unit), Unit).
-spec human_readable_duration_string(integer()) -> string().
human_readable_duration_string(Milliseconds) ->
Seconds = Milliseconds div 1000,
{D, {H, M, S}} = calendar:seconds_to_daystime(Seconds),
L0 = [{D, " days"}, {H, " hours"}, {M, " minutes"}, {S, " seconds"}],
L1 = lists:dropwhile(fun({K, _}) -> K =:= 0 end, L0),
L2 = lists:map(fun({Time, Unit}) -> [integer_to_list(Time), Unit] end, L1),
lists:flatten(lists:join(", ", L2)).
validate_epoch(Epoch, _Unit) when Epoch < 0 ->
{error, bad_epoch};
validate_epoch(Epoch, second) when Epoch =< ?MAXIMUM_EPOCH ->
{ok, Epoch};
validate_epoch(Epoch, millisecond) when Epoch =< ?MAXIMUM_EPOCH_MILLI ->
{ok, Epoch};
validate_epoch(_Epoch, _Unit) ->
{error, bad_epoch}.
%%--------------------------------------------------------------------
%% Timestamp <-> any format date string
%% Timestamp treat as a superset for epoch, it can be any positive integer
%%--------------------------------------------------------------------
formatter(FormatterStr) when is_list(FormatterStr) ->
formatter(list_to_binary(FormatterStr));
formatter(FormatterBin) when is_binary(FormatterBin) ->
@ -70,8 +159,10 @@ parse(DateStr, Unit, FormatterBin) when is_binary(FormatterBin) ->
parse(DateStr, Unit, formatter(FormatterBin));
parse(DateStr, Unit, Formatter) ->
do_parse(DateStr, Unit, Formatter).
%% -------------------------------------------------------------------------------------------------
%% internal
%%--------------------------------------------------------------------
%% Time unit
%%--------------------------------------------------------------------
time_unit(second) -> second;
time_unit(millisecond) -> millisecond;
@ -84,10 +175,12 @@ time_unit("nanosecond") -> nanosecond;
time_unit(<<"second">>) -> second;
time_unit(<<"millisecond">>) -> millisecond;
time_unit(<<"microsecond">>) -> microsecond;
time_unit(<<"nanosecond">>) -> nanosecond.
time_unit(<<"nanosecond">>) -> nanosecond;
time_unit(Any) -> error({invalid_time_unit, Any}).
%% -------------------------------------------------------------------------------------------------
%%--------------------------------------------------------------------
%% internal: format part
%%--------------------------------------------------------------------
do_formatter(<<>>, Formatter) ->
lists:reverse(Formatter);
@ -357,9 +450,9 @@ padding(Data, Len) when Len > 0 andalso erlang:length(Data) < Len ->
padding(Data, _Len) ->
Data.
%% -------------------------------------------------------------------------------------------------
%% internal
%% parse part
%%--------------------------------------------------------------------
%% internal: parse part
%%--------------------------------------------------------------------
do_parse(DateStr, Unit, Formatter) ->
DateInfo = do_parse_date_str(DateStr, Formatter, #{}),
@ -476,3 +569,77 @@ str_to_int_or_error(Str, Error) ->
_ ->
error(Error)
end.
%%--------------------------------------------------------------------
%% Unit Test
%%--------------------------------------------------------------------
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(nowarn_export_all).
-compile(export_all).
roots() -> [bar].
fields(bar) ->
[
{second, ?MODULE:epoch_second()},
{millisecond, ?MODULE:epoch_millisecond()}
].
-define(FORMAT(_Sec_, _Ms_),
lists:flatten(
io_lib:format("bar={second=~w,millisecond=~w}", [_Sec_, _Ms_])
)
).
epoch_ok_test() ->
BigStamp = 1 bsl 37,
Args = [
{0, 0, 0, 0},
{1, 1, 1, 1},
{BigStamp, BigStamp * 1000, BigStamp, BigStamp * 1000},
{"2022-01-01T08:00:00+08:00", "2022-01-01T08:00:00+08:00", 1640995200, 1640995200000}
],
lists:foreach(
fun({Sec, Ms, EpochSec, EpochMs}) ->
check_ok(?FORMAT(Sec, Ms), EpochSec, EpochMs)
end,
Args
),
ok.
check_ok(Input, Sec, Ms) ->
{ok, Data} = hocon:binary(Input, #{}),
?assertMatch(
#{bar := #{second := Sec, millisecond := Ms}},
hocon_tconf:check_plain(?MODULE, Data, #{atom_key => true}, [bar])
),
ok.
epoch_failed_test() ->
BigStamp = 1 bsl 38,
Args = [
{-1, -1},
{"1s", "1s"},
{BigStamp, 0},
{0, BigStamp * 1000},
{"2022-13-13T08:00:00+08:00", "2022-13-13T08:00:00+08:00"}
],
lists:foreach(
fun({Sec, Ms}) ->
check_failed(?FORMAT(Sec, Ms))
end,
Args
),
ok.
check_failed(Input) ->
{ok, Data} = hocon:binary(Input, #{}),
?assertException(
throw,
_,
hocon_tconf:check_plain(?MODULE, Data, #{atom_key => true}, [bar])
),
ok.
-endif.

View File

@ -0,0 +1 @@
Added option to configure detection of legacy protocol in MondoDB connectors and bridges.

View File

@ -0,0 +1 @@
Add a new API endpoint `DELETE /banned` to clear all `banned` data.

View File

@ -0,0 +1,2 @@
Changed the type of the `mqtt.mqx_packet_size` from string to byteSize to better represent the valid numeric range.
Strings will still be accepted for backwards compatibility.

View File

@ -0,0 +1 @@
Refactored datetime-related modules and functions to simplify the code.

View File

@ -0,0 +1 @@
Add a check for the maximum value of the timestamp in the API to ensure it is a valid Unix timestamp.

View File

@ -0,0 +1,2 @@
Removed os_mon application monitor support on Windows platforms to prevent VM crashes.
Functionality remains on non-Windows platforms.

View File

@ -0,0 +1 @@
Fixed crashing when debugging/tracing with large payloads(introduce when [#11279](https://github.com/emqx/emqx/pull/11279))

View File

@ -0,0 +1,2 @@
Removed validation that enforced non-empty PEM for CA cert file.
CA certificate file PEM can now be empty.

View File

@ -0,0 +1,3 @@
Added support for defining message attributes and ordering key templates for GCP PubSub Producer bridge.
Also updated our HOCON library to fix an issue where objects in an array were being concatenated even if they lay on different lines.

View File

@ -0,0 +1 @@
Fixed error information when Kinesis bridge fails to connect to endpoint.

View File

@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do
# in conflict by emqtt and hocon
{:getopt, "1.0.2", override: true},
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true},
{:hocon, github: "emqx/hocon", tag: "0.39.14", override: true},
{:hocon, github: "emqx/hocon", tag: "0.39.16", override: true},
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
{:esasl, github: "emqx/esasl", tag: "0.2.0"},
{:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
@ -403,7 +403,8 @@ defmodule EMQXUmbrella.MixProject do
quicer: enable_quicer?(),
bcrypt: enable_bcrypt?(),
jq: enable_jq?(),
observer: is_app?(:observer)
observer: is_app?(:observer),
os_mon: enable_os_mon?()
}
|> Enum.reject(&elem(&1, 1))
|> Enum.map(&elem(&1, 0))
@ -835,6 +836,10 @@ defmodule EMQXUmbrella.MixProject do
not win32?()
end
defp enable_os_mon?() do
not win32?()
end
defp enable_jq?() do
not Enum.any?([
build_without_jq?(),

View File

@ -75,7 +75,7 @@
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
, {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.14"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}

View File

@ -405,12 +405,13 @@ relx_apps(ReleaseType, Edition) ->
ce -> CEBusinessApps
end,
BusinessApps = CommonBusinessApps ++ EditionSpecificApps,
ExcludedApps = excluded_apps(ReleaseType),
SystemApps ++
%% EMQX starts the DB and the business applications:
[{App, load} || App <- (DBApps -- ExcludedApps)] ++
[emqx_machine] ++
[{App, load} || App <- (BusinessApps -- ExcludedApps)].
Apps =
(SystemApps ++
%% EMQX starts the DB and the business applications:
[{App, load} || App <- DBApps] ++
[emqx_machine] ++
[{App, load} || App <- BusinessApps]),
lists:foldl(fun proplists:delete/2, Apps, excluded_apps(ReleaseType)).
excluded_apps(ReleaseType) ->
OptionalApps = [
@ -418,7 +419,8 @@ excluded_apps(ReleaseType) ->
{bcrypt, provide_bcrypt_release(ReleaseType)},
{jq, is_jq_supported()},
{observer, is_app(observer)},
{mnesia_rocksdb, is_rocksdb_supported()}
{mnesia_rocksdb, is_rocksdb_supported()},
{os_mon, provide_os_mon_release()}
],
[App || {App, false} <- OptionalApps].
@ -524,6 +526,9 @@ is_debug(VarName) ->
provide_bcrypt_dep() ->
not is_win32().
provide_os_mon_release() ->
not is_win32().
provide_bcrypt_release(ReleaseType) ->
provide_bcrypt_dep() andalso ReleaseType =:= cloud.

View File

@ -46,6 +46,18 @@ payload_template.desc:
payload_template.label:
"""Payload template"""
attributes_template.desc:
"""The template for formatting the outgoing message attributes. Undefined values will be rendered as empty string values. Empty keys are removed from the attribute map."""
attributes_template.label:
"""Attributes template"""
ordering_key_template.desc:
"""The template for formatting the outgoing message ordering key. Undefined values will be rendered as empty string values. This value will not be added to the message if it's empty."""
ordering_key_template.label:
"""Ordering Key template"""
pipelining.desc:
"""A positive integer. Whether to send HTTP requests continuously, when set to 1, it means that after each HTTP request is sent, you need to wait for the server to return and then continue to send the next request."""
@ -64,6 +76,36 @@ pubsub_topic.desc:
pubsub_topic.label:
"""GCP PubSub Topic"""
producer_attributes.desc:
"""List of key-value pairs representing templates to construct the attributes for a given GCP PubSub message. Both keys and values support the placeholder `${var_name}` notation. Keys that are undefined or resolve to an empty string are omitted from the attribute map."""
producer_attributes.label:
"""Attributes Template"""
producer_ordering_key.desc:
"""Template for the Ordering Key of a given GCP PubSub message. If the resolved value is undefined or an empty string, the ordering key property is omitted from the message."""
producer_ordering_key.label:
"""Ordering Key Template"""
kv_pair_desc.desc:
"""Key-value pair."""
kv_pair_desc.label:
"""Key-value pair"""
kv_pair_key.desc:
"""Key"""
kv_pair_key.label:
"""Key"""
kv_pair_value.desc:
"""Value"""
kv_pair_value.label:
"""Value"""
service_account_json.desc:
"""JSON containing the GCP Service Account credentials to be used with PubSub.
When a GCP Service Account is created (as described in https://developers.google.com/identity/protocols/oauth2/service-account#creatinganaccount), you have the option of downloading the credentials in JSON form. That's the file needed."""

View File

@ -57,4 +57,9 @@ who.desc:
who.label:
"""Ban Object"""
clear_banned_api.desc:
"""Clear all banned data."""
clear_banned_api.label:
"""Clear"""
}

View File

@ -149,4 +149,10 @@ wait_queue_timeout.desc:
wait_queue_timeout.label:
"""Wait Queue Timeout"""
use_legacy_protocol.desc:
"""Whether to use MongoDB's legacy protocol for communicating with the database. The default is to attempt to automatically determine if the newer protocol is supported."""
use_legacy_protocol.label:
"""Use legacy protocol"""
}

Some files were not shown because too many files have changed in this diff Show More