Merge remote-tracking branch 'origin/master' into 1222-sync-e5.4.0-build.2-to-master

Zaiming (Stone) Shi 2023-12-22 21:27:27 +01:00
commit 9fdac4af0c
158 changed files with 2118 additions and 1225 deletions

View File

@ -4,7 +4,7 @@ services:
greptimedb:
container_name: greptimedb
hostname: greptimedb
image: greptime/greptimedb:0.3.2
image: greptime/greptimedb:v0.4.4
expose:
- "4000"
- "4001"

View File

@ -18,7 +18,7 @@ services:
- /tmp/emqx-ci/emqx-shared-secret:/var/lib/secret
kdc:
hostname: kdc.emqx.net
image: ghcr.io/emqx/emqx-builder/5.2-3:1.14.5-25.3.2-2-ubuntu22.04
image: ghcr.io/emqx/emqx-builder/5.2-8:1.15.7-26.1.2-2-ubuntu22.04
container_name: kdc.emqx.net
expose:
- 88 # kdc

View File

@ -3,7 +3,7 @@ version: '3.9'
services:
erlang:
container_name: erlang
image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.2-3:1.14.5-25.3.2-2-ubuntu22.04}
image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.2-8:1.15.7-26.1.2-2-ubuntu22.04}
env_file:
- credentials.env
- conf.env

View File

@ -3,7 +3,7 @@ inputs:
profile: # emqx, emqx-enterprise
required: true
type: string
otp: # 25.3.2-2
otp:
required: true
type: string
os:

View File

@ -17,17 +17,17 @@ env:
jobs:
sanity-checks:
runs-on: ubuntu-22.04
container: "ghcr.io/emqx/emqx-builder/5.2-3:1.14.5-25.3.2-2-ubuntu22.04"
container: "ghcr.io/emqx/emqx-builder/5.2-8:1.15.7-26.1.2-2-ubuntu22.04"
outputs:
ct-matrix: ${{ steps.matrix.outputs.ct-matrix }}
ct-host: ${{ steps.matrix.outputs.ct-host }}
ct-docker: ${{ steps.matrix.outputs.ct-docker }}
version-emqx: ${{ steps.matrix.outputs.version-emqx }}
version-emqx-enterprise: ${{ steps.matrix.outputs.version-emqx-enterprise }}
builder: "ghcr.io/emqx/emqx-builder/5.2-3:1.14.5-25.3.2-2-ubuntu22.04"
builder_vsn: "5.2-3"
otp_vsn: "25.3.2-2"
elixir_vsn: "1.14.5"
builder: "ghcr.io/emqx/emqx-builder/5.2-8:1.15.7-26.1.2-2-ubuntu22.04"
builder_vsn: "5.2-8"
otp_vsn: "26.1.2-2"
elixir_vsn: "1.15.7"
steps:
- uses: actions/checkout@v3
@ -92,14 +92,14 @@ jobs:
MATRIX="$(echo "${APPS}" | jq -c '
[
(.[] | select(.profile == "emqx") | . + {
builder: "5.2-3",
otp: "25.3.2-2",
elixir: "1.14.5"
builder: "5.2-8",
otp: "26.1.2-2",
elixir: "1.15.7"
}),
(.[] | select(.profile == "emqx-enterprise") | . + {
builder: "5.2-3",
otp: ["25.3.2-2"][],
elixir: "1.14.5"
builder: "5.2-8",
otp: ["26.1.2-2"][],
elixir: "1.15.7"
})
]
')"

View File

@ -20,7 +20,7 @@ env:
jobs:
prepare:
runs-on: ubuntu-22.04
container: 'ghcr.io/emqx/emqx-builder/5.2-3:1.14.5-25.3.2-2-ubuntu22.04'
container: 'ghcr.io/emqx/emqx-builder/5.2-8:1.15.7-26.1.2-2-ubuntu22.04'
outputs:
profile: ${{ steps.parse-git-ref.outputs.profile }}
release: ${{ steps.parse-git-ref.outputs.release }}
@ -29,10 +29,10 @@ jobs:
ct-matrix: ${{ steps.matrix.outputs.ct-matrix }}
ct-host: ${{ steps.matrix.outputs.ct-host }}
ct-docker: ${{ steps.matrix.outputs.ct-docker }}
builder: 'ghcr.io/emqx/emqx-builder/5.2-3:1.14.5-25.3.2-2-ubuntu22.04'
builder_vsn: '5.2-3'
otp_vsn: '25.3.2-2'
elixir_vsn: '1.14.5'
builder: 'ghcr.io/emqx/emqx-builder/5.2-8:1.15.7-26.1.2-2-ubuntu22.04'
builder_vsn: '5.2-8'
otp_vsn: '26.1.2-2'
elixir_vsn: '1.15.7'
steps:
- uses: actions/checkout@v3
@ -62,14 +62,14 @@ jobs:
MATRIX="$(echo "${APPS}" | jq -c '
[
(.[] | select(.profile == "emqx") | . + {
builder: "5.2-3",
otp: "25.3.2-2",
elixir: "1.14.5"
builder: "5.2-8",
otp: "26.1.2-2",
elixir: "1.15.7"
}),
(.[] | select(.profile == "emqx-enterprise") | . + {
builder: "5.2-3",
otp: ["25.3.2-2"][],
elixir: "1.14.5"
builder: "5.2-8",
otp: ["26.1.2-2"][],
elixir: "1.15.7"
})
]
')"

View File

@ -58,15 +58,15 @@ on:
otp_vsn:
required: false
type: string
default: '25.3.2-2'
default: '26.1.2-2'
elixir_vsn:
required: false
type: string
default: '1.14.5'
default: '1.15.7'
builder_vsn:
required: false
type: string
default: '5.2-3'
default: '5.2-8'
permissions:
contents: read

View File

@ -54,15 +54,15 @@ on:
otp_vsn:
required: false
type: string
default: '25.3.2-2'
default: '26.1.2-2'
elixir_vsn:
required: false
type: string
default: '1.14.5'
default: '1.15.7'
builder_vsn:
required: false
type: string
default: '5.2-3'
default: '5.2-8'
jobs:
mac:

View File

@ -14,26 +14,18 @@ jobs:
if: github.repository_owner == 'emqx'
runs-on: ${{ endsWith(github.repository, '/emqx') && 'ubuntu-22.04' || fromJSON('["self-hosted","ephemeral","linux","x64"]') }}
container:
image: "ghcr.io/emqx/emqx-builder/${{ matrix.builder }}:${{ matrix.elixir }}-${{ matrix.otp }}-${{ matrix.os }}"
image: "ghcr.io/emqx/emqx-builder/${{ matrix.profile[2] }}-${{ matrix.os }}"
strategy:
fail-fast: false
matrix:
profile:
- ['emqx', 'master']
- ['emqx-enterprise', 'release-54']
otp:
- 25.3.2-2
arch:
- x64
- ['emqx', 'master', '5.2-8:1.15.7-26.1.2-2']
- ['emqx-enterprise', 'release-54', '5.2-3:1.14.5-25.3.2-2']
os:
- debian10
- ubuntu22.04
- amzn2023
builder:
- 5.2-3
elixir:
- 1.14.5
defaults:
run:
@ -98,7 +90,7 @@ jobs:
branch:
- master
otp:
- 25.3.2-2
- 26.1.2-2
os:
- macos-12-arm64

View File

@ -27,19 +27,19 @@ on:
builder:
required: false
type: string
default: 'ghcr.io/emqx/emqx-builder/5.2-3:1.14.5-25.3.2-2-ubuntu22.04'
default: 'ghcr.io/emqx/emqx-builder/5.2-8:1.15.7-26.1.2-2-ubuntu22.04'
builder_vsn:
required: false
type: string
default: '5.2-3'
default: '5.2-8'
otp_vsn:
required: false
type: string
default: '25.3.2-2'
default: '26.1.2-2'
elixir_vsn:
required: false
type: string
default: '1.14.5'
default: '1.15.7'
jobs:
linux:
@ -51,8 +51,8 @@ jobs:
fail-fast: false
matrix:
profile:
- ["emqx", "25.3.2-2", "ubuntu20.04", "elixir"]
- ["emqx-enterprise", "25.3.2-2", "ubuntu20.04", "erlang"]
- ["emqx", "26.1.2-2", "ubuntu20.04", "elixir"]
- ["emqx-enterprise", "26.1.2-2", "ubuntu20.04", "erlang"]
container: "ghcr.io/emqx/emqx-builder/${{ inputs.builder_vsn }}:${{ inputs.elixir_vsn }}-${{ matrix.profile[1] }}-${{ matrix.profile[2] }}"

View File

@ -20,7 +20,7 @@ jobs:
actions: read
security-events: write
container:
image: ghcr.io/emqx/emqx-builder/5.2-3:1.14.5-25.3.2-2-ubuntu22.04
image: ghcr.io/emqx/emqx-builder/5.2-8:1.15.7-26.1.2-2-ubuntu22.04
strategy:
fail-fast: false

View File

@ -26,7 +26,7 @@ jobs:
prepare:
runs-on: ubuntu-latest
if: github.repository_owner == 'emqx'
container: ghcr.io/emqx/emqx-builder/5.2-3:1.14.5-25.3.2-2-ubuntu20.04
container: ghcr.io/emqx/emqx-builder/5.2-8:1.15.7-26.1.2-2-ubuntu20.04
outputs:
BENCH_ID: ${{ steps.prepare.outputs.BENCH_ID }}
PACKAGE_FILE: ${{ steps.package_file.outputs.PACKAGE_FILE }}

View File

@ -74,7 +74,7 @@ jobs:
steps:
- uses: erlef/setup-beam@v1.16.0
with:
otp-version: 25.3.2
otp-version: 26.1.2
- uses: actions/checkout@v3
with:
repository: hawk/lux

.gitignore
View File

@ -61,7 +61,7 @@ erlang_ls.config
.envrc
# elixir
mix.lock
apps/emqx/test/emqx_static_checks_data/master.bpapi
apps/emqx/test/emqx_static_checks_data/master.bpapi2
# rendered configurations
*.conf.rendered
*.conf.rendered.*

View File

@ -1,2 +1,2 @@
erlang 26.1.2-1
erlang 26.1.2-2
elixir 1.15.7-otp-26

View File

@ -7,7 +7,7 @@ REBAR = $(CURDIR)/rebar3
BUILD = $(CURDIR)/build
SCRIPTS = $(CURDIR)/scripts
export EMQX_RELUP ?= true
export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.1-4:1.14.5-25.3.2-2-debian11
export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.2-8:1.15.7-26.1.2-2-debian11
export EMQX_DEFAULT_RUNNER = public.ecr.aws/debian/debian:11-slim
export EMQX_REL_FORM ?= tgz
export QUICER_DOWNLOAD_FROM_RELEASE = 1
@ -102,7 +102,7 @@ endif
# Allow user-set GROUPS environment variable
ifneq ($(GROUPS),)
GROUPS_ARG := --groups $(GROUPS)
GROUPS_ARG := --group $(GROUPS)
endif
ifeq ($(ENABLE_COVER_COMPILE),1)

View File

@ -83,7 +83,9 @@ docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p
The `master` branch is for the latest version 5; switch to the `main-v4.4` branch for version 4.4.
EMQX requires OTP 24 for version 4.4. Version 5.0 can be built with OTP 24 or 25.
EMQX requires OTP 24 for version 4.4.
Versions 5.0 ~ 5.3 can be built with OTP 24 or 25.
Version 5.4 and newer can be built with OTP 25 or 26.
```bash
git clone https://github.com/emqx/emqx.git

View File

@ -95,7 +95,9 @@ For more organised improvement proposals, you can send pull requests to [EIP](ht
The `master` branch tracks the latest version 5. For version 4.4 checkout the `main-v4.4` branch.
EMQX 4.4 requires OTP 24. EMQX 5.0 and 5.1 can be built with OTP 24 or 25.
EMQX 4.4 requires OTP 24.
EMQX 5.0 ~ 5.3 can be built with OTP 24 or 25.
EMQX 5.4 and newer can be built with OTP 25 or 26.
```bash
git clone https://github.com/emqx/emqx.git

View File

@ -24,9 +24,13 @@
-define(DEFAULT_ACTION_QOS, 0).
-define(DEFAULT_ACTION_RETAIN, false).
-define(AUTHZ_SUBSCRIBE_MATCH_MAP(QOS), #{action_type := subscribe, qos := QOS}).
-define(AUTHZ_SUBSCRIBE(QOS), #{action_type => subscribe, qos => QOS}).
-define(AUTHZ_SUBSCRIBE, ?AUTHZ_SUBSCRIBE(?DEFAULT_ACTION_QOS)).
-define(AUTHZ_PUBLISH_MATCH_MAP(QOS, RETAIN), #{
action_type := publish, qos := QOS, retain := RETAIN
}).
-define(AUTHZ_PUBLISH(QOS, RETAIN), #{action_type => publish, qos => QOS, retain => RETAIN}).
-define(AUTHZ_PUBLISH(QOS), ?AUTHZ_PUBLISH(QOS, ?DEFAULT_ACTION_RETAIN)).
-define(AUTHZ_PUBLISH, ?AUTHZ_PUBLISH(?DEFAULT_ACTION_QOS)).

View File

@ -19,6 +19,7 @@
{emqx_delayed,1}.
{emqx_delayed,2}.
{emqx_ds,1}.
{emqx_ds,2}.
{emqx_eviction_agent,1}.
{emqx_eviction_agent,2}.
{emqx_exhook,1}.

View File

@ -66,7 +66,6 @@
{plt_location, "."},
{plt_prefix, "emqx_dialyzer"},
{plt_apps, all_apps},
{plt_extra_apps, [hocon]},
{statistics, true}
]}.

View File

@ -28,8 +28,7 @@ Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.308"}}}.
Dialyzer = fun(Config) ->
{dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config),
{plt_extra_apps, OldExtra} = lists:keyfind(plt_extra_apps, 1, OldDialyzerConfig),
Extra = OldExtra ++ [quicer || IsQuicSupp()],
Extra = [quicer || IsQuicSupp()],
NewDialyzerConfig = [{plt_extra_apps, Extra} | OldDialyzerConfig],
lists:keystore(
dialyzer,

View File

@ -145,12 +145,12 @@ free to cache it for the duration of the session.
# New minor release
After releasing, let's say, 5.1.0, the following actions should be performed to prepare for the next release:
After releasing, let's say, 5.5.0, the following actions should be performed to prepare for the next release:
1. Checkout 5.1.0 tag
1. Checkout 5.5.0 tag
1. Build the code
1. Replace api version string `"master"` in `apps/emqx/test/emqx_static_checks_data/master.bpapi` with `"5.1"`
1. Rename `apps/emqx/test/emqx_static_checks_data/master.bpapi` to `apps/emqx/test/emqx_static_checks_data/5.1.bpapi`
1. Add `apps/emqx/test/emqx_static_checks_data/5.1.bpapi` to the repo
1. Delete the previous file (e.g. `5.0.bpapi`), unless there is plan to support rolling upgrade from 5.0 to 5.2
1. Replace api version string `"master"` in `apps/emqx/test/emqx_static_checks_data/master.bpapi2` with `"5.5"`
1. Rename `apps/emqx/test/emqx_static_checks_data/master.bpapi2` to `apps/emqx/test/emqx_static_checks_data/5.5.bpapi2`
1. Add `apps/emqx/test/emqx_static_checks_data/5.5.bpapi2` to the repo
1. Delete the previous file (e.g. `5.4.bpapi2`), unless there is a plan to support rolling upgrade from 5.0.
1. Merge the commit to master branch
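
Not part of the diff: a minimal shell sketch of steps 3–6 above, assuming a 5.5.0 checkout and that the commands are run from the repository root; the `sed` expression is illustrative only.

```bash
cd apps/emqx/test/emqx_static_checks_data
# Step 3: replace the api version string "master" with "5.5" in the dump.
sed -i 's/"master"/"5.5"/g' master.bpapi2
# Steps 4-5: rename the dump and add it to the repo.
mv master.bpapi2 5.5.bpapi2
git add 5.5.bpapi2
# Step 6: delete the previous dump (unless rolling upgrade from it must remain supported).
git rm 5.4.bpapi2
```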

View File

@ -10,6 +10,7 @@
stdlib,
gproc,
gen_rpc,
mnesia,
mria,
ekka,
esockd,
@ -17,7 +18,12 @@
sasl,
lc,
hocon,
emqx_durable_storage
emqx_durable_storage,
bcrypt,
pbkdf2,
emqx_http_lib,
recon,
os_mon
]},
{mod, {emqx_app, []}},
{env, []},

View File

@ -22,7 +22,8 @@
-export([
authenticate/1,
authorize/3
authorize/3,
format_action/1
]).
-ifdef(TEST).
@ -152,6 +153,7 @@ do_authorize(ClientInfo, Action, Topic) ->
case run_hooks('client.authorize', [ClientInfo, Action, Topic], Default) of
AuthzResult = #{result := Result} when Result == allow; Result == deny ->
From = maps:get(from, AuthzResult, unknown),
ok = log_result(ClientInfo, Topic, Action, From, NoMatch),
emqx_hooks:run(
'client.check_authz_complete',
[ClientInfo, Action, Topic, Result, From]
@ -170,6 +172,42 @@ do_authorize(ClientInfo, Action, Topic) ->
deny
end.
log_result(#{username := Username}, Topic, Action, From, Result) ->
LogMeta = fun() ->
#{
username => Username,
topic => Topic,
action => format_action(Action),
source => format_from(From)
}
end,
case Result of
allow -> ?SLOG(info, (LogMeta())#{msg => "authorization_permission_allowed"});
deny -> ?SLOG(warning, (LogMeta())#{msg => "authorization_permission_denied"})
end.
%% @private Format authorization rules source.
format_from(default) ->
"'authorization.no_match' config";
format_from(unknown) ->
"'client.authorize' hook callback";
format_from(Type) ->
Type.
%% @doc Format enriched action info for logging.
format_action(?AUTHZ_SUBSCRIBE_MATCH_MAP(QoS)) ->
"SUBSCRIBE(" ++ format_qos(QoS) ++ ")";
format_action(?AUTHZ_PUBLISH_MATCH_MAP(QoS, Retain)) ->
"PUBLISH(" ++ format_qos(QoS) ++ "," ++ format_retain_flag(Retain) ++ ")".
format_qos(QoS) ->
"Q" ++ integer_to_list(QoS).
format_retain_flag(true) ->
"R1";
format_retain_flag(false) ->
"R0".
-compile({inline, [run_hooks/3]}).
run_hooks(Name, Args, Acc) ->
ok = emqx_metrics:inc(Name),

View File

@ -24,7 +24,8 @@
register_der_crls/2,
refresh/1,
evict/1,
update_config/1
update_config/1,
info/0
]).
%% gen_server callbacks
@ -104,6 +105,11 @@ update_config(Conf) ->
register_der_crls(URL, CRLs) when is_list(CRLs) ->
gen_server:cast(?MODULE, {register_der_crls, URL, CRLs}).
-spec info() -> #{atom() => _}.
info() ->
[state | State] = tuple_to_list(sys:get_state(?MODULE)),
maps:from_list(lists:zip(record_info(fields, state), State)).
%%--------------------------------------------------------------------
%% gen_server behaviour
%%--------------------------------------------------------------------

View File

@ -113,8 +113,8 @@ n_inflight(#inflight{offset_ranges = Ranges}) ->
fun
(#ds_pubrange{type = ?T_CHECKPOINT}, N) ->
N;
(#ds_pubrange{type = ?T_INFLIGHT, id = {_, First}, until = Until}, N) ->
N + range_size(First, Until)
(#ds_pubrange{type = ?T_INFLIGHT} = Range, N) ->
N + range_size(Range)
end,
0,
Ranges
@ -185,8 +185,12 @@ poll(PreprocFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSi
{[], Inflight0};
true ->
%% TODO: Wrap this in `mria:async_dirty/2`?
Streams = shuffle(get_streams(SessionId)),
fetch(PreprocFun, SessionId, Inflight0, Streams, FreeSpace, [])
Checkpoints = find_checkpoints(Inflight0#inflight.offset_ranges),
StreamGroups = group_streams(get_streams(SessionId)),
{Publihes, Inflight} =
fetch(PreprocFun, SessionId, Inflight0, Checkpoints, StreamGroups, FreeSpace, []),
%% Discard now irrelevant QoS0-only ranges, if any.
{Publihes, discard_committed(SessionId, Inflight)}
end.
%% Which seqno this track is committed until.
@ -238,7 +242,7 @@ find_committed_until(Track, Ranges) ->
Ranges
),
case RangesUncommitted of
[#ds_pubrange{id = {_, CommittedUntil}} | _] ->
[#ds_pubrange{id = {_, CommittedUntil, _StreamRef}} | _] ->
CommittedUntil;
[] ->
undefined
@ -249,28 +253,26 @@ get_ranges(SessionId) ->
Pat = erlang:make_tuple(
record_info(size, ds_pubrange),
'_',
[{1, ds_pubrange}, {#ds_pubrange.id, {SessionId, '_'}}]
[{1, ds_pubrange}, {#ds_pubrange.id, {SessionId, '_', '_'}}]
),
mnesia:match_object(?SESSION_PUBRANGE_TAB, Pat, read).
fetch(PreprocFun, SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 ->
fetch(PreprocFun, SessionId, Inflight0, CPs, Groups, N, Acc) when N > 0, Groups =/= [] ->
#inflight{next_seqno = FirstSeqno, offset_ranges = Ranges} = Inflight0,
ItBegin = get_last_iterator(DSStream, Ranges),
{ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N),
case Messages of
{Stream, Groups2} = get_the_first_stream(Groups),
case get_next_n_messages_from_stream(Stream, CPs, N) of
[] ->
fetch(PreprocFun, SessionId, Inflight0, Streams, N, Acc);
_ ->
fetch(PreprocFun, SessionId, Inflight0, CPs, Groups2, N, Acc);
{ItBegin, ItEnd, Messages} ->
%% We need to preserve the iterator pointing to the beginning of the
%% range, so that we can replay it if needed.
{Publishes, UntilSeqno} = publish_fetch(PreprocFun, FirstSeqno, Messages),
Size = range_size(FirstSeqno, UntilSeqno),
Range0 = #ds_pubrange{
id = {SessionId, FirstSeqno},
id = {SessionId, FirstSeqno, Stream#ds_stream.ref},
type = ?T_INFLIGHT,
tracks = compute_pub_tracks(Publishes),
until = UntilSeqno,
stream = DSStream#ds_stream.ref,
iterator = ItBegin
},
ok = preserve_range(Range0),
@ -282,9 +284,9 @@ fetch(PreprocFun, SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0
next_seqno = UntilSeqno,
offset_ranges = Ranges ++ [Range]
},
fetch(PreprocFun, SessionId, Inflight, Streams, N - Size, [Publishes | Acc])
fetch(PreprocFun, SessionId, Inflight, CPs, Groups2, N - Size, [Publishes | Acc])
end;
fetch(_ReplyFun, _SessionId, Inflight, _Streams, _N, Acc) ->
fetch(_ReplyFun, _SessionId, Inflight, _CPs, _Groups, _N, Acc) ->
Publishes = lists:append(lists:reverse(Acc)),
{Publishes, Inflight}.
@ -300,9 +302,9 @@ discard_committed(
find_checkpoints(Ranges) ->
lists:foldl(
fun(#ds_pubrange{stream = StreamRef, until = Until}, Acc) ->
fun(#ds_pubrange{id = {_SessionId, _, StreamRef}} = Range, Acc) ->
%% For each stream, remember the last range over this stream.
Acc#{StreamRef => Until}
Acc#{StreamRef => Range}
end,
#{},
Ranges
@ -312,7 +314,7 @@ discard_committed_ranges(
SessionId,
Commits,
Checkpoints,
Ranges = [Range = #ds_pubrange{until = Until, stream = StreamRef} | Rest]
Ranges = [Range = #ds_pubrange{id = {_SessionId, _, StreamRef}} | Rest]
) ->
case discard_committed_range(Commits, Range) of
discard ->
@ -321,11 +323,11 @@ discard_committed_ranges(
%% over this stream (i.e. a checkpoint).
RangeKept =
case maps:get(StreamRef, Checkpoints) of
CP when CP > Until ->
Range ->
[checkpoint_range(Range)];
_Previous ->
discard_range(Range),
[];
Until ->
[checkpoint_range(Range)]
[]
end,
%% Since we're (intentionally) not using transactions here, it's important to
%% issue database writes in the same order in which ranges are stored: from
@ -381,7 +383,9 @@ discard_tracks(#{ack := AckedUntil, comp := CompUntil}, Until, Tracks) ->
replay_range(
PreprocFun,
Commits,
Range0 = #ds_pubrange{type = ?T_INFLIGHT, id = {_, First}, until = Until, iterator = It},
Range0 = #ds_pubrange{
type = ?T_INFLIGHT, id = {_, First, _StreamRef}, until = Until, iterator = It
},
Acc
) ->
Size = range_size(First, Until),
@ -427,7 +431,7 @@ get_commit_next(comp, #inflight{commits = Commits}) ->
publish_fetch(PreprocFun, FirstSeqno, Messages) ->
flatmapfoldl(
fun(MessageIn, Acc) ->
fun({_DSKey, MessageIn}, Acc) ->
Message = PreprocFun(MessageIn),
publish_fetch(Message, Acc)
end,
@ -446,7 +450,7 @@ publish_fetch(Messages, Seqno) ->
publish_replay(PreprocFun, Commits, FirstSeqno, Messages) ->
#{ack := AckedUntil, comp := CompUntil, rec := RecUntil} = Commits,
flatmapfoldl(
fun(MessageIn, Acc) ->
fun({_DSKey, MessageIn}, Acc) ->
Message = PreprocFun(MessageIn),
publish_replay(Message, AckedUntil, CompUntil, RecUntil, Acc)
end,
@ -545,10 +549,10 @@ checkpoint_range(Range = #ds_pubrange{type = ?T_CHECKPOINT}) ->
%% This range should have been checkpointed already.
Range.
get_last_iterator(DSStream = #ds_stream{ref = StreamRef}, Ranges) ->
case lists:keyfind(StreamRef, #ds_pubrange.stream, lists:reverse(Ranges)) of
false ->
DSStream#ds_stream.beginning;
get_last_iterator(Stream = #ds_stream{ref = StreamRef}, Checkpoints) ->
case maps:get(StreamRef, Checkpoints, none) of
none ->
Stream#ds_stream.beginning;
#ds_pubrange{iterator = ItNext} ->
ItNext
end.
@ -593,16 +597,32 @@ packet_id_to_seqno_(NextSeqno, PacketId) ->
N - ?EPOCH_SIZE
end.
range_size(#ds_pubrange{id = {_, First, _StreamRef}, until = Until}) ->
range_size(First, Until).
range_size(FirstSeqno, UntilSeqno) ->
%% This function assumes that gaps in the sequence ID occur _only_ when the
%% packet ID wraps.
Size = UntilSeqno - FirstSeqno,
Size + (FirstSeqno bsr 16) - (UntilSeqno bsr 16).
%%================================================================================
%% stream scheduler
%% group streams by the first position in the rank
-spec group_streams(list(ds_stream())) -> list(list(ds_stream())).
group_streams(Streams) ->
Groups = maps:groups_from_list(
fun(#ds_stream{rank = {RankX, _}}) -> RankX end,
Streams
),
shuffle(maps:values(Groups)).
-spec shuffle([A]) -> [A].
shuffle(L0) ->
L1 = lists:map(
fun(A) ->
%% maybe topic/stream prioritization could be introduced here?
{rand:uniform(), A}
end,
L0
@ -611,6 +631,47 @@ shuffle(L0) ->
{_, L} = lists:unzip(L2),
L.
get_the_first_stream([Group | Groups]) ->
case get_next_stream_from_group(Group) of
{Stream, {sorted, []}} ->
{Stream, Groups};
{Stream, Group2} ->
{Stream, [Group2 | Groups]};
undefined ->
get_the_first_stream(Groups)
end;
get_the_first_stream([]) ->
%% how is this possible?
throw(#{reason => no_valid_stream}).
%% The scheduler is simple: try to get messages from the same shard, but it's okay to take turns.
get_next_stream_from_group({sorted, [H | T]}) ->
{H, {sorted, T}};
get_next_stream_from_group({sorted, []}) ->
undefined;
get_next_stream_from_group(Streams) ->
[Stream | T] = lists:sort(
fun(#ds_stream{rank = {_, RankA}}, #ds_stream{rank = {_, RankB}}) ->
RankA < RankB
end,
Streams
),
{Stream, {sorted, T}}.
get_next_n_messages_from_stream(Stream, CPs, N) ->
ItBegin = get_last_iterator(Stream, CPs),
case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N) of
{ok, _ItEnd, []} ->
[];
{ok, ItEnd, Messages} ->
{ItBegin, ItEnd, Messages};
{ok, end_of_stream} ->
%% TODO: how to skip this closed stream, or should it be taken over by a lower-level layer?
[]
end.
%%================================================================================
-spec flatmapfoldl(fun((X, Acc) -> {Y | [Y], Acc}), Acc, [X]) -> {[Y], Acc}.
flatmapfoldl(_Fun, Acc, []) ->
{[], Acc};
@ -697,23 +758,23 @@ compute_inflight_range_test_() ->
?_assertEqual(
{#{ack => 12, comp => 13}, 42},
compute_inflight_range([
#ds_pubrange{id = {<<>>, 1}, until = 2, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 4}, until = 8, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 11}, until = 12, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 1, 0}, until = 2, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 4, 0}, until = 8, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 11, 0}, until = 12, type = ?T_CHECKPOINT},
#ds_pubrange{
id = {<<>>, 12},
id = {<<>>, 12, 0},
until = 13,
type = ?T_INFLIGHT,
tracks = ?TRACK_FLAG(?ACK)
},
#ds_pubrange{
id = {<<>>, 13},
id = {<<>>, 13, 0},
until = 20,
type = ?T_INFLIGHT,
tracks = ?TRACK_FLAG(?COMP)
},
#ds_pubrange{
id = {<<>>, 20},
id = {<<>>, 20, 0},
until = 42,
type = ?T_INFLIGHT,
tracks = ?TRACK_FLAG(?ACK) bor ?TRACK_FLAG(?COMP)
@ -723,10 +784,10 @@ compute_inflight_range_test_() ->
?_assertEqual(
{#{ack => 13, comp => 13}, 13},
compute_inflight_range([
#ds_pubrange{id = {<<>>, 1}, until = 2, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 4}, until = 8, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 11}, until = 12, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 12}, until = 13, type = ?T_CHECKPOINT}
#ds_pubrange{id = {<<>>, 1, 0}, until = 2, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 4, 0}, until = 8, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 11, 0}, until = 12, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 12, 0}, until = 13, type = ?T_CHECKPOINT}
])
)
].

View File

@ -214,8 +214,8 @@ info(subscriptions_max, #{props := Conf}) ->
maps:get(max_subscriptions, Conf);
info(upgrade_qos, #{props := Conf}) ->
maps:get(upgrade_qos, Conf);
% info(inflight, #sessmem{inflight = Inflight}) ->
% Inflight;
info(inflight, #{inflight := Inflight}) ->
Inflight;
info(inflight_cnt, #{inflight := Inflight}) ->
emqx_persistent_message_ds_replayer:n_inflight(Inflight);
info(inflight_max, #{receive_maximum := ReceiveMaximum}) ->
@ -433,7 +433,8 @@ handle_timeout(
{ok, Publishes, Session};
handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session) ->
renew_streams(Session),
{ok, [], emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session)};
Interval = emqx_config:get([session_persistence, renew_streams_interval]),
{ok, [], emqx_session:ensure_timer(?TIMER_GET_STREAMS, Interval, Session)};
handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) ->
%% Note: we take a pessimistic approach here and assume that the client will be alive
%% until the next bump timeout. With this, we avoid garbage collecting this session
@ -791,8 +792,8 @@ session_read_pubranges(DSSessionID) ->
session_read_pubranges(DSSessionId, LockKind) ->
MS = ets:fun2ms(
fun(#ds_pubrange{id = {Sess, First}}) when Sess =:= DSSessionId ->
{DSSessionId, First}
fun(#ds_pubrange{id = ID}) when element(1, ID) =:= DSSessionId ->
ID
end
),
mnesia:select(?SESSION_PUBRANGE_TAB, MS, LockKind).
@ -1083,10 +1084,15 @@ list_all_streams() ->
list_all_pubranges() ->
DSPubranges = mnesia:dirty_match_object(?SESSION_PUBRANGE_TAB, #ds_pubrange{_ = '_'}),
lists:foldl(
fun(Record = #ds_pubrange{id = {SessionId, First}}, Acc) ->
Range = export_record(
Record, #ds_pubrange.until, [until, stream, type, iterator], #{first => First}
),
fun(Record = #ds_pubrange{id = {SessionId, First, StreamRef}}, Acc) ->
Range = #{
session => SessionId,
stream => StreamRef,
first => First,
until => Record#ds_pubrange.until,
type => Record#ds_pubrange.type,
iterator => Record#ds_pubrange.iterator
},
maps:put(SessionId, maps:get(SessionId, Acc, []) ++ [Range], Acc)
end,
#{},

View File

@ -50,20 +50,18 @@
%% What session this range belongs to.
_Session :: emqx_persistent_session_ds:id(),
%% Where this range starts.
_First :: emqx_persistent_message_ds_replayer:seqno()
_First :: emqx_persistent_message_ds_replayer:seqno(),
%% Which stream this range is over.
_StreamRef
},
%% Where this range ends: the first seqno that is not included in the range.
until :: emqx_persistent_message_ds_replayer:seqno(),
%% Which stream this range is over.
stream :: _StreamRef,
%% Type of a range:
%% * Inflight range is a range of yet unacked messages from this stream.
%% * Checkpoint range was already acked, its purpose is to keep track of the
%% very last iterator for this stream.
type :: ?T_INFLIGHT | ?T_CHECKPOINT,
%% What commit tracks this range is part of.
%% This is rarely stored: we only need to persist it when the range contains
%% QoS 2 messages.
tracks = 0 :: non_neg_integer(),
%% Meaning of this depends on the type of the range:
%% * For inflight range, this is the iterator pointing to the first message in

View File

@ -82,7 +82,7 @@ try_gc() ->
CoreNodes = mria_membership:running_core_nodelist(),
Res = global:trans(
{?MODULE, self()},
fun() -> ?tp_span(ds_session_gc, #{}, start_gc()) end,
fun() -> ?tp_span(debug, ds_session_gc, #{}, start_gc()) end,
CoreNodes,
%% Note: we set retries to 1 here because, in rare occasions, GC might start at the
%% same time in more than one node, and each one will abort the other. By allowing

View File

@ -19,18 +19,11 @@
-include("emqx.hrl").
-export([
delete_direct_route/2,
delete_trie_route/2,
delete_session_trie_route/2,
insert_direct_route/2,
insert_trie_route/2,
insert_session_trie_route/2,
maybe_trans/3
]).
insert_direct_route(Tab, Route) ->
mria:dirty_write(Tab, Route).
insert_trie_route(RouteTab, Route = #route{topic = Topic}) ->
case mnesia:wread({RouteTab, Topic}) of
[] -> emqx_trie:insert(Topic);
@ -38,31 +31,12 @@ insert_trie_route(RouteTab, Route = #route{topic = Topic}) ->
end,
mnesia:write(RouteTab, Route, sticky_write).
insert_session_trie_route(RouteTab, Route = #route{topic = Topic}) ->
case mnesia:wread({RouteTab, Topic}) of
[] -> emqx_trie:insert_session(Topic);
_ -> ok
end,
mnesia:write(RouteTab, Route, sticky_write).
delete_direct_route(RouteTab, Route) ->
mria:dirty_delete_object(RouteTab, Route).
delete_trie_route(RouteTab, Route) ->
delete_trie_route(RouteTab, Route, normal).
delete_session_trie_route(RouteTab, Route) ->
delete_trie_route(RouteTab, Route, session).
delete_trie_route(RouteTab, Route = #route{topic = Topic}, Type) ->
delete_trie_route(RouteTab, Route = #route{topic = Topic}) ->
case mnesia:wread({RouteTab, Topic}) of
[R] when R =:= Route ->
%% Remove route and trie
ok = mnesia:delete_object(RouteTab, Route, sticky_write),
case Type of
normal -> emqx_trie:delete(Topic);
session -> emqx_trie:delete_session(Topic)
end;
emqx_trie:delete(Topic);
[_ | _] ->
%% Remove route only
mnesia:delete_object(RouteTab, Route, sticky_write);

View File

@ -1805,6 +1805,14 @@ fields("session_persistence") ->
desc => ?DESC(session_ds_last_alive_update_interval)
}
)},
{"renew_streams_interval",
sc(
timeout_duration(),
#{
default => <<"5000ms">>,
importance => ?IMPORTANCE_HIDDEN
}
)},
{"session_gc_interval",
sc(
timeout_duration(),

View File

@ -175,19 +175,11 @@
%% Behaviour
%% -------------------------------------------------------------------
-if(?OTP_RELEASE < 26).
-callback create(clientinfo(), conninfo(), conf()) ->
term().
-callback open(clientinfo(), conninfo(), conf()) ->
term().
-callback destroy(t() | clientinfo()) -> ok.
-else.
-callback create(clientinfo(), conninfo(), conf()) ->
t().
-callback open(clientinfo(), conninfo(), conf()) ->
{_IsPresent :: true, t(), _ReplayContext} | false.
-callback destroy(t() | clientinfo()) -> ok.
-endif.
%%--------------------------------------------------------------------
%% Create a Session

View File

@ -1,163 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The session router worker is responsible for buffering
%% messages for a persistent session while it is initializing. If a
%% connection process exists for a persistent session, this process is
%% used for bridging the gap while the new connection process takes
%% over the persistent session, but if there is no such process this
%% worker takes its place.
%%
%% The workers are started on all nodes, and buffer all messages that
%% are persisted to the session message table. In the final stage of
%% the initialization, the messages are delivered and the worker is
%% terminated.
-module(emqx_session_router_worker).
-behaviour(gen_server).
%% API
-export([
buffer/3,
pendings/1,
resume_end/3,
start_link/2
]).
%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2
]).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-record(state, {
remote_pid :: pid(),
session_id :: binary(),
session_tab :: ets:table(),
messages :: ets:table(),
buffering :: boolean()
}).
%%%===================================================================
%%% API
%%%===================================================================
start_link(SessionTab, #{} = Opts) ->
gen_server:start_link(?MODULE, Opts#{session_tab => SessionTab}, []).
pendings(Pid) ->
gen_server:call(Pid, pendings).
resume_end(RemotePid, Pid, _SessionID) ->
case gen_server:call(Pid, {resume_end, RemotePid}) of
{ok, EtsHandle} ->
?tp(ps_worker_call_ok, #{
pid => Pid,
remote_pid => RemotePid,
sid => _SessionID
}),
{ok, ets:tab2list(EtsHandle)};
{error, _} = Err ->
?tp(ps_worker_call_failed, #{
pid => Pid,
remote_pid => RemotePid,
sid => _SessionID,
reason => Err
}),
Err
end.
buffer(Worker, STopic, Msg) ->
Worker ! {buffer, STopic, Msg},
ok.
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init(#{
remote_pid := RemotePid,
session_id := SessionID,
session_tab := SessionTab
}) ->
process_flag(trap_exit, true),
erlang:monitor(process, RemotePid),
?tp(ps_worker_started, #{
remote_pid => RemotePid,
sid => SessionID
}),
{ok, #state{
remote_pid = RemotePid,
session_id = SessionID,
session_tab = SessionTab,
messages = ets:new(?MODULE, [protected, ordered_set]),
buffering = true
}}.
handle_call(pendings, _From, State) ->
%% Debug API
{reply, {State#state.messages, State#state.remote_pid}, State};
handle_call({resume_end, RemotePid}, _From, #state{remote_pid = RemotePid} = State) ->
?tp(ps_worker_resume_end, #{sid => State#state.session_id}),
{reply, {ok, State#state.messages}, State#state{buffering = false}};
handle_call({resume_end, _RemotePid}, _From, State) ->
?tp(ps_worker_resume_end_error, #{sid => State#state.session_id}),
{reply, {error, wrong_remote_pid}, State};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Request, State) ->
{noreply, State}.
handle_info({buffer, _STopic, _Msg}, State) when not State#state.buffering ->
?tp(ps_worker_drop_deliver, #{
sid => State#state.session_id,
msg_id => emqx_message:id(_Msg)
}),
{noreply, State};
handle_info({buffer, STopic, Msg}, State) when State#state.buffering ->
?tp(ps_worker_deliver, #{
sid => State#state.session_id,
msg_id => emqx_message:id(Msg)
}),
ets:insert(State#state.messages, {{Msg, STopic}}),
{noreply, State};
handle_info({'DOWN', _, process, RemotePid, _Reason}, #state{remote_pid = RemotePid} = State) ->
?tp(warning, ps_worker, #{
event => worker_remote_died,
sid => State#state.session_id,
msg => "Remote pid died. Exiting."
}),
{stop, normal, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(shutdown, _State) ->
?tp(ps_worker_shutdown, #{sid => _State#state.session_id}),
ok;
terminate(_, _State) ->
ok.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -1,64 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_session_router_worker_sup).
-behaviour(supervisor).
-export([start_link/1]).
-export([
abort_worker/1,
start_worker/2
]).
-export([init/1]).
start_link(SessionTab) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, SessionTab).
start_worker(SessionID, RemotePid) ->
supervisor:start_child(?MODULE, [
#{
session_id => SessionID,
remote_pid => RemotePid
}
]).
abort_worker(Pid) ->
supervisor:terminate_child(?MODULE, Pid).
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------
init(SessionTab) ->
%% Resume worker
Worker = #{
id => session_router_worker,
start => {emqx_session_router_worker, start_link, [SessionTab]},
restart => transient,
shutdown => 2000,
type => worker,
modules => [emqx_session_router_worker]
},
Spec = #{
strategy => simple_one_for_one,
intensity => 1,
period => 5
},
{ok, {Spec, [Worker]}}.

View File

@ -26,12 +26,11 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
t_emqx_pubsub_api(_) ->
true = emqx:is_running(node()),

View File

@ -26,12 +26,14 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules([broker]),
emqx_common_test_helpers:start_apps([]),
Config.
Apps = emqx_cth_suite:start(
[{emqx, #{override_env => [{boot_modules, [broker]}]}}],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_suite(Config) ->
emqx_cth_suite:stop(proplists:get_value(apps, Config)).
init_per_testcase(_, Config) ->
Config.

View File

@ -19,29 +19,25 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_testcase(t_size_limit, Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
{ok, _} = emqx:update_config([alarm], #{
<<"size_limit">> => 2
}),
Config;
init_per_testcase(_, Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
{ok, _} = emqx:update_config([alarm], #{
<<"validity_period">> => <<"1s">>
}),
Config.
init_per_testcase(t_size_limit = TC, Config) ->
Apps = emqx_cth_suite:start(
[{emqx, "alarm.size_limit = 2"}],
#{work_dir => emqx_cth_suite:work_dir(TC, Config)}
),
[{apps, Apps} | Config];
init_per_testcase(TC, Config) ->
Apps = emqx_cth_suite:start(
[{emqx, "alarm.validity_period = \"1s\""}],
#{work_dir => emqx_cth_suite:work_dir(TC, Config)}
),
[{apps, Apps} | Config].
end_per_testcase(_, _Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_testcase(_, Config) ->
emqx_cth_suite:stop(proplists:get_value(apps, Config)).
t_alarm(_) ->
ok = emqx_alarm:activate(unknown_alarm),

View File

@ -24,12 +24,11 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_suite(Config) ->
emqx_cth_suite:stop(proplists:get_value(apps, Config)).
%%--------------------------------------------------------------------
%% Test cases

View File

@ -26,15 +26,11 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:start_apps([]),
ok = ekka:start(),
Config.
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
ekka:stop(),
mria:stop(),
mria_mnesia:delete_schema(),
emqx_common_test_helpers:stop_apps([]).
end_per_suite(Config) ->
emqx_cth_suite:stop(proplists:get_value(apps, Config)).
t_add_delete(_) ->
Banned = #banned{

View File

@ -23,6 +23,13 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
ok = application:load(emqx),
Config.
end_per_suite(_) ->
ok = application:unload(emqx).
t_is_enabled(_) ->
try
ok = application:set_env(emqx, boot_modules, all),

View File

@ -26,16 +26,13 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:start_apps([emqx]),
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[mnesia:dirty_write(Rec) || Rec <- fake_records()],
Config.
[{apps, Apps} | Config].
end_per_suite(_Config) ->
end_per_suite(Config) ->
meck:unload(),
[mnesia:dirty_delete({?TAB, Key}) || #?TAB{key = Key} <- fake_records()],
emqx_bpapi:announce(emqx),
emqx_common_test_helpers:stop_apps([emqx]),
ok.
emqx_cth_suite:stop(?config(apps, Config)).
t_max_supported_version(_Config) ->
?assertMatch(3, emqx_bpapi:supported_version('fake-node2@localhost', api2)),

View File

@ -16,7 +16,7 @@
-module(emqx_bpapi_static_checks).
-export([run/0, dump/1, dump/0, check_compat/1, versions_file/0, dumps_dir/0]).
-export([run/0, dump/1, dump/0, check_compat/1, versions_file/0, dumps_dir/0, dump_file_extension/0]).
%% Using an undocumented API here :(
-include_lib("dialyzer/src/dialyzer.hrl").
@ -90,7 +90,7 @@
run() ->
case dump() of
true ->
Dumps = filelib:wildcard(dumps_dir() ++ "/*.bpapi"),
Dumps = filelib:wildcard(dumps_dir() ++ "/*" ++ dump_file_extension()),
case Dumps of
[] ->
logger:error("No BPAPI dumps are found in ~s, abort", [dumps_dir()]),
@ -293,7 +293,7 @@ prepare(#{reldir := RelDir, plt := PLT}) ->
xref:add_release(?XREF, RelDir),
%% Now to the dialyzer stuff:
logger:info("Loading PLT...", []),
dialyzer_plt:from_file(PLT).
load_plt(PLT).
%% erlfmt-ignore
find_remote_calls(_Opts) ->
@ -331,7 +331,7 @@ is_bpapi_call({Module, _Function, _Arity}) ->
-spec dump_api(fulldump()) -> ok.
dump_api(Term = #{api := _, signatures := _, release := Release}) ->
Filename = filename:join(dumps_dir(), Release ++ ".bpapi"),
Filename = filename:join(dumps_dir(), Release ++ dump_file_extension()),
ok = filelib:ensure_dir(Filename),
file:write_file(Filename, io_lib:format("~0p.~n", [Term])).
@ -436,3 +436,18 @@ emqx_app_dir() ->
project_root_dir() ->
filename:dirname(filename:dirname(emqx_app_dir())).
-if(?OTP_RELEASE >= 26).
load_plt(File) ->
dialyzer_cplt:from_file(File).
dump_file_extension() ->
%% OTP26 changes the internal format for the types:
".bpapi2".
-else.
load_plt(File) ->
dialyzer_plt:from_file(File).
dump_file_extension() ->
".bpapi".
-endif.

View File

@ -94,7 +94,7 @@ init_per_group(quic, Config) ->
[
{conn_fun, quic_connect},
{port, emqx_config:get([listeners, quic, test, bind])},
{ssl_opts, emqx_common_test_helpers:client_ssl_twoway()},
{ssl_opts, emqx_common_test_helpers:client_mtls()},
{ssl, true},
{group_apps, Apps}
| Config

View File

@ -31,12 +31,11 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(Case, Config) ->
?MODULE:Case({init, Config}).

View File

@ -83,14 +83,14 @@ groups() ->
].
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
emqx_config:put_listener_conf(ssl, default, [ssl_options, verify], verify_peer),
emqx_listeners:restart_listener('ssl:default'),
Config.
Apps = emqx_cth_suite:start(
[{emqx, "listeners.ssl.default.ssl_options.verify = verify_peer"}],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(_Case, Config) ->
Config.
@ -395,7 +395,7 @@ t_peercert_preserved_before_connected(_) ->
?HP_HIGHEST
),
ClientId = atom_to_binary(?FUNCTION_NAME),
SslConf = emqx_common_test_helpers:client_ssl_twoway(default),
SslConf = emqx_common_test_helpers:client_mtls(default),
{ok, Client} = emqtt:start_link([
{port, 8883},
{clientid, ClientId},
@ -455,7 +455,7 @@ tls_certcn_as_clientid(TLSVsn) ->
tls_certcn_as_clientid(TLSVsn, RequiredTLSVsn) ->
CN = <<"Client">>,
emqx_config:put_zone_conf(default, [mqtt, peer_cert_as_clientid], cn),
SslConf = emqx_common_test_helpers:client_ssl_twoway(TLSVsn),
SslConf = emqx_common_test_helpers:client_mtls(TLSVsn),
{ok, Client} = emqtt:start_link([{port, 8883}, {ssl, true}, {ssl_opts, SslConf}]),
{ok, _} = emqtt:connect(Client),
#{clientinfo := #{clientid := CN}} = emqx_cm:get_chan_info(CN),

View File

@ -19,7 +19,6 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_cm.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -54,12 +53,11 @@ suite() -> [{timetrap, {minutes, 2}}].
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_suite(Config) ->
emqx_cth_suite:stop(proplists:get_value(apps, Config)).
%%--------------------------------------------------------------------
%% TODO: Add more test cases

View File

@ -24,12 +24,11 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_suite(Config) ->
emqx_cth_suite:stop(proplists:get_value(apps, Config)).
t_start_link(_) ->
emqx_cm_locker:start_link().

View File

@ -28,12 +28,11 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_suite(Config) ->
emqx_cth_suite:stop(proplists:get_value(apps, Config)).
init_per_testcase(_TestCase, Config) ->
Config.

View File

@ -48,8 +48,10 @@
-export([
client_ssl/0,
client_ssl/1,
client_ssl_twoway/0,
client_ssl_twoway/1,
client_mtls/0,
client_mtls/1,
ssl_verify_fun_allow_any_host/0,
ssl_verify_fun_allow_any_host_impl/3,
ensure_mnesia_stopped/0,
ensure_quic_listener/2,
ensure_quic_listener/3,
@ -435,11 +437,11 @@ flush(Msgs) ->
after 0 -> lists:reverse(Msgs)
end.
client_ssl_twoway() ->
client_ssl_twoway(default).
client_mtls() ->
client_mtls(default).
client_ssl_twoway(TLSVsn) ->
client_certs() ++ ciphers(TLSVsn).
client_mtls(TLSVsn) ->
ssl_verify_fun_allow_any_host() ++ client_certs() ++ ciphers(TLSVsn).
%% Paths prepended to cert filenames
client_certs() ->

View File

@ -25,12 +25,21 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
WorkDir = emqx_cth_suite:work_dir(Config),
Apps = emqx_cth_suite:start(
[
{emqx, #{
override_env => [
{cluster_override_conf_file, filename:join(WorkDir, "cluster_override.conf")}
]
}}
],
#{work_dir => WorkDir}
),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(TestCase, Config) ->
try

View File

@ -30,12 +30,11 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(_Case, Config) ->
_ = file:delete(?CLUSTER_CONF),

View File

@ -57,10 +57,10 @@ init_per_suite(Config) ->
ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end),
ok = meck:expect(emqx_alarm, deactivate, fun(_, _) -> ok end),
emqx_common_test_helpers:start_apps([]),
Config.
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
end_per_suite(Config) ->
ok = meck:unload(emqx_transport),
catch meck:unload(emqx_channel),
ok = meck:unload(emqx_cm),
@ -68,8 +68,8 @@ end_per_suite(_Config) ->
ok = meck:unload(emqx_metrics),
ok = meck:unload(emqx_hooks),
ok = meck:unload(emqx_alarm),
emqx_common_test_helpers:stop_apps([]),
ok.
emqx_cth_suite:stop(proplists:get_value(apps, Config)).
init_per_testcase(TestCase, Config) when
TestCase =/= t_ws_pingreq_before_connected

View File

@ -7,7 +7,8 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -34,14 +35,9 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
application:load(emqx),
{ok, _} = application:ensure_all_started(ssl),
emqx_config:save_schema_mod_and_names(emqx_schema),
emqx_common_test_helpers:boot_modules(all),
Config.
end_per_suite(_Config) ->
emqx_config:erase_all(),
ok.
init_per_testcase(TestCase, Config) when
@ -50,76 +46,78 @@ init_per_testcase(TestCase, Config) when
TestCase =:= t_revoked
->
ct:timetrap({seconds, 30}),
DataDir = ?config(data_dir, Config),
CRLFile = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
{ok, CRLPem} = file:read_file(CRLFile),
[{'CertificateList', CRLDer, not_encrypted}] = public_key:pem_decode(CRLPem),
ok = snabbkaffe:start_trace(),
DataDir = ?config(data_dir, Config),
{CRLPem, CRLDer} = read_crl(filename:join(DataDir, "intermediate-revoked.crl.pem")),
ServerPid = start_crl_server(CRLPem),
IsCached = lists:member(TestCase, [t_filled_cache, t_revoked]),
ok = setup_crl_options(Config, #{is_cached => IsCached}),
Apps = start_emqx_with_crl_cache(#{is_cached => IsCached}, TestCase, Config),
[
{crl_pem, CRLPem},
{crl_der, CRLDer},
{http_server, ServerPid}
{http_server, ServerPid},
{tc_apps, Apps}
| Config
];
init_per_testcase(t_revoke_then_refresh, Config) ->
init_per_testcase(t_revoke_then_refresh = TestCase, Config) ->
ct:timetrap({seconds, 120}),
DataDir = ?config(data_dir, Config),
CRLFileNotRevoked = filename:join([DataDir, "intermediate-not-revoked.crl.pem"]),
{ok, CRLPemNotRevoked} = file:read_file(CRLFileNotRevoked),
[{'CertificateList', CRLDerNotRevoked, not_encrypted}] = public_key:pem_decode(
CRLPemNotRevoked
),
CRLFileRevoked = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
{ok, CRLPemRevoked} = file:read_file(CRLFileRevoked),
[{'CertificateList', CRLDerRevoked, not_encrypted}] = public_key:pem_decode(CRLPemRevoked),
ok = snabbkaffe:start_trace(),
DataDir = ?config(data_dir, Config),
{CRLPemNotRevoked, CRLDerNotRevoked} =
read_crl(filename:join(DataDir, "intermediate-not-revoked.crl.pem")),
{CRLPemRevoked, CRLDerRevoked} =
read_crl(filename:join(DataDir, "intermediate-revoked.crl.pem")),
ServerPid = start_crl_server(CRLPemNotRevoked),
ExtraVars = #{refresh_interval => <<"10s">>},
ok = setup_crl_options(Config, #{is_cached => true, extra_vars => ExtraVars}),
Apps = start_emqx_with_crl_cache(
#{is_cached => true, overrides => #{crl_cache => #{refresh_interval => <<"10s">>}}},
TestCase,
Config
),
[
{crl_pem_not_revoked, CRLPemNotRevoked},
{crl_der_not_revoked, CRLDerNotRevoked},
{crl_pem_revoked, CRLPemRevoked},
{crl_der_revoked, CRLDerRevoked},
{http_server, ServerPid}
{http_server, ServerPid},
{tc_apps, Apps}
| Config
];
init_per_testcase(t_cache_overflow, Config) ->
init_per_testcase(t_cache_overflow = TestCase, Config) ->
ct:timetrap({seconds, 120}),
DataDir = ?config(data_dir, Config),
CRLFileRevoked = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
{ok, CRLPemRevoked} = file:read_file(CRLFileRevoked),
ok = snabbkaffe:start_trace(),
DataDir = ?config(data_dir, Config),
{CRLPemRevoked, _} = read_crl(filename:join(DataDir, "intermediate-revoked.crl.pem")),
ServerPid = start_crl_server(CRLPemRevoked),
ExtraVars = #{cache_capacity => <<"2">>},
ok = setup_crl_options(Config, #{is_cached => false, extra_vars => ExtraVars}),
Apps = start_emqx_with_crl_cache(
#{is_cached => false, overrides => #{crl_cache => #{capacity => 2}}},
TestCase,
Config
),
[
{http_server, ServerPid}
{http_server, ServerPid},
{tc_apps, Apps}
| Config
];
init_per_testcase(t_not_cached_and_unreachable, Config) ->
init_per_testcase(TestCase, Config) when
TestCase =:= t_not_cached_and_unreachable;
TestCase =:= t_update_config
->
ct:timetrap({seconds, 30}),
DataDir = ?config(data_dir, Config),
CRLFile = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
{ok, CRLPem} = file:read_file(CRLFile),
[{'CertificateList', CRLDer, not_encrypted}] = public_key:pem_decode(CRLPem),
ok = snabbkaffe:start_trace(),
application:stop(cowboy),
ok = setup_crl_options(Config, #{is_cached => false}),
DataDir = ?config(data_dir, Config),
{CRLPem, CRLDer} = read_crl(filename:join(DataDir, "intermediate-revoked.crl.pem")),
Apps = start_emqx_with_crl_cache(#{is_cached => false}, TestCase, Config),
[
{crl_pem, CRLPem},
{crl_der, CRLDer}
{crl_der, CRLDer},
{tc_apps, Apps}
| Config
];
init_per_testcase(t_refresh_config, Config) ->
init_per_testcase(t_refresh_config = TestCase, Config) ->
ct:timetrap({seconds, 30}),
ok = snabbkaffe:start_trace(),
DataDir = ?config(data_dir, Config),
CRLFile = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
{ok, CRLPem} = file:read_file(CRLFile),
[{'CertificateList', CRLDer, not_encrypted}] = public_key:pem_decode(CRLPem),
{CRLPem, CRLDer} = read_crl(filename:join(DataDir, "intermediate-revoked.crl.pem")),
TestPid = self(),
ok = meck:new(emqx_crl_cache, [non_strict, passthrough, no_history, no_link]),
meck:expect(
@ -131,42 +129,49 @@ init_per_testcase(t_refresh_config, Config) ->
{ok, {{"HTTP/1.0", 200, "OK"}, [], CRLPem}}
end
),
ok = snabbkaffe:start_trace(),
ok = setup_crl_options(Config, #{is_cached => false}),
Apps = start_emqx_with_crl_cache(#{is_cached => false}, TestCase, Config),
[
{crl_pem, CRLPem},
{crl_der, CRLDer}
{crl_der, CRLDer},
{tc_apps, Apps}
| Config
];
init_per_testcase(TestCase, Config) when
TestCase =:= t_update_listener;
TestCase =:= t_validations
->
ct:timetrap({seconds, 30}),
ok = snabbkaffe:start_trace(),
%% when running emqx standalone tests, we can't use those
%% features.
case does_module_exist(emqx_mgmt_api_test_util) of
case does_module_exist(emqx_management) of
true ->
ct:timetrap({seconds, 30}),
DataDir = ?config(data_dir, Config),
PrivDir = ?config(priv_dir, Config),
CRLFile = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
{ok, CRLPem} = file:read_file(CRLFile),
ok = snabbkaffe:start_trace(),
ServerPid = start_crl_server(CRLPem),
ConfFilePath = filename:join([DataDir, "emqx_just_verify.conf"]),
emqx_mgmt_api_test_util:init_suite(
[emqx_conf],
fun emqx_mgmt_api_test_util:set_special_configs/1,
#{
extra_mustache_vars => #{
test_data_dir => DataDir,
test_priv_dir => PrivDir
},
conf_file_path => ConfFilePath
ListenerConf = #{
enable => true,
ssl_options => #{
keyfile => filename:join(DataDir, "server.key.pem"),
certfile => filename:join(DataDir, "server.cert.pem"),
cacertfile => filename:join(DataDir, "ca-chain.cert.pem"),
verify => verify_peer,
enable_crl_check => false
}
},
Apps = emqx_cth_suite:start(
[
{emqx_conf, #{config => #{listeners => #{ssl => #{default => ListenerConf}}}}},
emqx,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
),
[
{http_server, ServerPid}
{http_server, ServerPid},
{tc_apps, Apps}
| Config
];
false ->
@ -174,10 +179,9 @@ init_per_testcase(TestCase, Config) when
end;
init_per_testcase(_TestCase, Config) ->
ct:timetrap({seconds, 30}),
ok = snabbkaffe:start_trace(),
DataDir = ?config(data_dir, Config),
CRLFile = filename:join([DataDir, "intermediate-revoked.crl.pem"]),
{ok, CRLPem} = file:read_file(CRLFile),
[{'CertificateList', CRLDer, not_encrypted}] = public_key:pem_decode(CRLPem),
{CRLPem, CRLDer} = read_crl(filename:join(DataDir, "intermediate-revoked.crl.pem")),
TestPid = self(),
ok = meck:new(emqx_crl_cache, [non_strict, passthrough, no_history, no_link]),
meck:expect(
@ -189,53 +193,17 @@ init_per_testcase(_TestCase, Config) ->
{ok, {{"HTTP/1.0", 200, 'OK'}, [], CRLPem}}
end
),
ok = snabbkaffe:start_trace(),
[
{crl_pem, CRLPem},
{crl_der, CRLDer}
| Config
].
end_per_testcase(TestCase, Config) when
TestCase =:= t_cache;
TestCase =:= t_filled_cache;
TestCase =:= t_revoked
->
ServerPid = ?config(http_server, Config),
emqx_crl_cache_http_server:stop(ServerPid),
emqx_common_test_helpers:stop_apps([]),
clear_listeners(),
application:stop(cowboy),
clear_crl_cache(),
ok = snabbkaffe:stop(),
ok;
end_per_testcase(TestCase, Config) when
TestCase =:= t_revoke_then_refresh;
TestCase =:= t_cache_overflow
->
ServerPid = ?config(http_server, Config),
emqx_crl_cache_http_server:stop(ServerPid),
emqx_common_test_helpers:stop_apps([]),
clear_listeners(),
clear_crl_cache(),
application:stop(cowboy),
ok = snabbkaffe:stop(),
ok;
end_per_testcase(t_not_cached_and_unreachable, _Config) ->
emqx_common_test_helpers:stop_apps([]),
clear_listeners(),
clear_crl_cache(),
ok = snabbkaffe:stop(),
ok;
end_per_testcase(t_refresh_config, _Config) ->
meck:unload([emqx_crl_cache]),
clear_crl_cache(),
emqx_common_test_helpers:stop_apps([]),
clear_listeners(),
clear_crl_cache(),
application:stop(cowboy),
ok = snabbkaffe:stop(),
ok;
read_crl(Filename) ->
{ok, PEM} = file:read_file(Filename),
[{'CertificateList', DER, not_encrypted}] = public_key:pem_decode(PEM),
{PEM, DER}.
end_per_testcase(TestCase, Config) when
TestCase =:= t_update_listener;
TestCase =:= t_validations
@ -245,18 +213,20 @@ end_per_testcase(TestCase, Config) when
true ->
ok;
false ->
ServerPid = ?config(http_server, Config),
emqx_crl_cache_http_server:stop(ServerPid),
emqx_mgmt_api_test_util:end_suite([emqx_conf]),
clear_listeners(),
ok = snabbkaffe:stop(),
clear_crl_cache(),
ok
end_per_testcase(common, Config)
end;
end_per_testcase(_TestCase, _Config) ->
meck:unload([emqx_crl_cache]),
clear_crl_cache(),
end_per_testcase(_TestCase, Config) ->
ok = snabbkaffe:stop(),
clear_crl_cache(),
_ = emqx_maybe:apply(
fun emqx_crl_cache_http_server:stop/1,
proplists:get_value(http_server, Config)
),
_ = emqx_maybe:apply(
fun emqx_cth_suite:stop/1,
proplists:get_value(tc_apps, Config)
),
catch meck:unload([emqx_crl_cache]),
ok.
%%--------------------------------------------------------------------
@ -278,11 +248,6 @@ does_module_exist(Mod) ->
end
end.
clear_listeners() ->
emqx_config:put([listeners], #{}),
emqx_config:put_raw([<<"listeners">>], #{}),
ok.
assert_http_get(URL) ->
receive
{http_get, URL} ->
@ -341,61 +306,41 @@ clear_crl_cache() ->
ensure_ssl_manager_alive(),
ok.
force_cacertfile(Cacertfile) ->
{SSLListeners0, OtherListeners} = lists:partition(
fun(#{proto := Proto}) -> Proto =:= ssl end,
emqx:get_env(listeners)
),
SSLListeners =
lists:map(
fun(Listener = #{opts := Opts0}) ->
SSLOpts0 = proplists:get_value(ssl_options, Opts0),
%% it injects some garbage...
SSLOpts1 = lists:keydelete(cacertfile, 1, lists:keydelete(cacertfile, 1, SSLOpts0)),
SSLOpts2 = [{cacertfile, Cacertfile} | SSLOpts1],
Opts1 = lists:keyreplace(ssl_options, 1, Opts0, {ssl_options, SSLOpts2}),
Listener#{opts => Opts1}
end,
SSLListeners0
),
application:set_env(emqx, listeners, SSLListeners ++ OtherListeners),
ok.
setup_crl_options(Config, #{is_cached := IsCached} = Opts) ->
start_emqx_with_crl_cache(#{is_cached := IsCached} = Opts, TC, Config) ->
DataDir = ?config(data_dir, Config),
ConfFilePath = filename:join([DataDir, "emqx.conf"]),
Defaults = #{
refresh_interval => <<"11m">>,
cache_capacity => <<"100">>,
test_data_dir => DataDir
},
ExtraVars0 = maps:get(extra_vars, Opts, #{}),
ExtraVars = maps:merge(Defaults, ExtraVars0),
emqx_common_test_helpers:start_apps(
[],
fun(_) -> ok end,
#{
extra_mustache_vars => ExtraVars,
conf_file_path => ConfFilePath
Overrides = maps:get(overrides, Opts, #{}),
ListenerConf = #{
enable => true,
ssl_options => #{
keyfile => filename:join(DataDir, "server.key.pem"),
certfile => filename:join(DataDir, "server.cert.pem"),
cacertfile => filename:join(DataDir, "ca-chain.cert.pem"),
verify => verify_peer,
enable_crl_check => true
}
},
Conf = #{
listeners => #{ssl => #{default => ListenerConf}},
crl_cache => #{
refresh_interval => <<"11m">>,
http_timeout => <<"17s">>,
capacity => 100
}
},
Apps = emqx_cth_suite:start(
[{emqx, #{config => emqx_utils_maps:deep_merge(Conf, Overrides)}}],
#{work_dir => emqx_cth_suite:work_dir(TC, Config)}
),
case IsCached of
true ->
%% wait for the cache to be filled
emqx_crl_cache:refresh(?DEFAULT_URL),
receive
{http_get, <<?DEFAULT_URL>>} -> ok
after 1_000 ->
ct:pal("mailbox: ~p", [process_info(self(), messages)]),
error(crl_cache_not_filled)
end;
?assertReceive({http_get, <<?DEFAULT_URL>>});
false ->
%% ensure cache is empty
clear_crl_cache(),
ok
end,
drain_msgs(),
ok.
Apps.
start_crl_server(CRLPem) ->
application:ensure_all_started(cowboy),
@ -442,7 +387,8 @@ assert_successful_connection(Config, ClientNum) ->
{ssl, true},
{ssl_opts, [
{certfile, ClientCert},
{keyfile, ClientKey}
{keyfile, ClientKey},
{verify, verify_none}
]},
{port, 8883}
]),
@ -494,31 +440,21 @@ t_init_empty_urls(_Config) ->
ok.
t_update_config(_Config) ->
emqx_config:save_schema_mod_and_names(emqx_schema),
emqx_config_handler:start_link(),
{ok, Pid} = emqx_crl_cache:start_link(),
Conf = #{
refresh_interval => <<"5m">>,
http_timeout => <<"10m">>,
capacity => 123
<<"refresh_interval">> => <<"5m">>,
<<"http_timeout">> => <<"10m">>,
<<"capacity">> => 123
},
?assertMatch({ok, _}, emqx:update_config([<<"crl_cache">>], Conf)),
State = sys:get_state(Pid),
State = emqx_crl_cache:info(),
?assertEqual(
#{
refresh_interval => timer:minutes(5),
http_timeout => timer:minutes(10),
capacity => 123
cache_capacity => 123
},
#{
refresh_interval => element(3, State),
http_timeout => element(4, State),
capacity => element(7, State)
}
),
emqx_config:erase(<<"crl_cache">>),
emqx_config_handler:stop(),
ok.
maps:with([refresh_interval, http_timeout, cache_capacity], State)
).
t_manual_refresh(Config) ->
CRLDer = ?config(crl_der, Config),
@ -666,7 +602,8 @@ t_cache(Config) ->
{ssl, true},
{ssl_opts, [
{certfile, ClientCert},
{keyfile, ClientKey}
{keyfile, ClientKey},
{verify, verify_none}
]},
{port, 8883}
]),
@ -684,7 +621,8 @@ t_cache(Config) ->
{ssl, true},
{ssl_opts, [
{certfile, ClientCert},
{keyfile, ClientKey}
{keyfile, ClientKey},
{verify, verify_none}
]},
{port, 8883}
]),
@ -894,7 +832,8 @@ t_filled_cache(Config) ->
{ssl, true},
{ssl_opts, [
{certfile, ClientCert},
{keyfile, ClientKey}
{keyfile, ClientKey},
{verify, verify_none}
]},
{port, 8883}
]),
@ -918,7 +857,8 @@ t_not_cached_and_unreachable(Config) ->
{ssl, true},
{ssl_opts, [
{certfile, ClientCert},
{keyfile, ClientKey}
{keyfile, ClientKey},
{verify, verify_none}
]},
{port, 8883}
]),
@ -936,7 +876,8 @@ t_revoked(Config) ->
{ssl, true},
{ssl_opts, [
{certfile, ClientCert},
{keyfile, ClientKey}
{keyfile, ClientKey},
{verify, verify_none}
]},
{port, 8883}
]),
@ -958,7 +899,8 @@ t_revoke_then_refresh(Config) ->
{ssl, true},
{ssl_opts, [
{certfile, ClientCert},
{keyfile, ClientKey}
{keyfile, ClientKey},
{verify, verify_none}
]},
{port, 8883}
]),
@ -981,7 +923,8 @@ t_revoke_then_refresh(Config) ->
{ssl, true},
{ssl_opts, [
{certfile, ClientCert},
{keyfile, ClientKey}
{keyfile, ClientKey},
{verify, verify_none}
]},
{port, 8883}
]),
@ -1026,7 +969,8 @@ do_t_update_listener(Config) ->
{ssl, true},
{ssl_opts, [
{certfile, ClientCert},
{keyfile, ClientKey}
{keyfile, ClientKey},
{verify, verify_none}
]},
{port, 8883}
]),
@ -1064,7 +1008,8 @@ do_t_update_listener(Config) ->
{ssl, true},
{ssl_opts, [
{certfile, ClientCert},
{keyfile, ClientKey}
{keyfile, ClientKey},
{verify, verify_none}
]},
{port, 8883}
]),

View File

@ -1,12 +0,0 @@
crl_cache.refresh_interval = {{ refresh_interval }}
crl_cache.http_timeout = 17s
crl_cache.capacity = {{ cache_capacity }}
listeners.ssl.default {
ssl_options {
keyfile = "{{ test_data_dir }}/server.key.pem"
certfile = "{{ test_data_dir }}/server.cert.pem"
cacertfile = "{{ test_data_dir }}/ca-chain.cert.pem"
verify = verify_peer
enable_crl_check = true
}
}

View File

@ -1,12 +0,0 @@
node.name = test@127.0.0.1
node.cookie = emqxsecretcookie
node.data_dir = "{{ test_priv_dir }}"
listeners.ssl.default {
ssl_options {
keyfile = "{{ test_data_dir }}/server.key.pem"
certfile = "{{ test_data_dir }}/server.cert.pem"
cacertfile = "{{ test_data_dir }}/ca-chain.cert.pem"
verify = verify_peer
enable_crl_check = false
}
}

View File

@ -34,20 +34,14 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
ok = ekka:start(),
OldConf = emqx:get_config([zones], #{}),
emqx_config:put_zone_conf(default, [mqtt, exclusive_subscription], true),
timer:sleep(50),
[{old_conf, OldConf} | Config].
Apps = emqx_cth_suite:start(
[{emqx, "mqtt.exclusive_subscription = true"}],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
end_per_suite(Config) ->
emqx_config:put([zones], proplists:get_value(old_conf, Config)),
ekka:stop(),
mria:stop(),
mria_mnesia:delete_schema(),
emqx_common_test_helpers:stop_apps([]).
emqx_cth_suite:stop(proplists:get_value(apps, Config)).
end_per_testcase(_TestCase, _Config) ->
emqx_exclusive_subscription:clear().

View File

@ -30,12 +30,11 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_suite(Config) ->
emqx_cth_suite:stop(proplists:get_value(apps, Config)).
init_per_testcase(_TestCase, Config) ->
Init = emqx:get_raw_config(?LISTENERS),

View File

@ -915,10 +915,8 @@ do_t_validations(_Config) ->
#{<<"code">> := <<"BAD_REQUEST">>, <<"message">> := MsgRaw3} =
emqx_utils_json:decode(ResRaw3, [return_maps]),
%% we can't remove certfile now, because it has a default value.
?assertMatch(
<<"{bad_ssl_config,#{file_read => enoent,pem_check => invalid_pem", _/binary>>,
MsgRaw3
),
?assertMatch({match, _}, re:run(MsgRaw3, <<"enoent">>)),
?assertMatch({match, _}, re:run(MsgRaw3, <<"invalid_pem">>)),
ok.
t_unknown_error_fetching_ocsp_response(_Config) ->

View File

@ -35,8 +35,6 @@ end_per_suite(Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(_, Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
emqx_olp:enable(),
case wait_for(fun() -> lc_sup:whereis_runq_flagman() end, 10) of
true -> ok;

View File

@ -258,6 +258,75 @@ t_qos0(_Config) ->
emqtt:stop(Pub)
end.
t_qos0_only_many_streams(_Config) ->
ClientId = <<?MODULE_STRING "_sub">>,
Sub = connect(ClientId, true, 30),
Pub = connect(<<?MODULE_STRING "_pub">>, true, 0),
[ConnPid] = emqx_cm:lookup_channels(ClientId),
try
{ok, _, [1]} = emqtt:subscribe(Sub, <<"t/#">>, qos1),
[
emqtt:publish(Pub, Topic, Payload, ?QOS_0)
|| {Topic, Payload} <- [
{<<"t/1">>, <<"foo">>},
{<<"t/2">>, <<"bar">>},
{<<"t/3">>, <<"baz">>}
]
],
?assertMatch(
[_, _, _],
receive_messages(3)
),
Inflight0 = get_session_inflight(ConnPid),
[
emqtt:publish(Pub, Topic, Payload, ?QOS_0)
|| {Topic, Payload} <- [
{<<"t/2">>, <<"foo">>},
{<<"t/2">>, <<"bar">>},
{<<"t/1">>, <<"baz">>}
]
],
?assertMatch(
[_, _, _],
receive_messages(3)
),
[
emqtt:publish(Pub, Topic, Payload, ?QOS_0)
|| {Topic, Payload} <- [
{<<"t/3">>, <<"foo">>},
{<<"t/3">>, <<"bar">>},
{<<"t/2">>, <<"baz">>}
]
],
?assertMatch(
[_, _, _],
receive_messages(3)
),
?assertMatch(
#{pubranges := [_, _, _]},
emqx_persistent_session_ds:print_session(ClientId)
),
Inflight1 = get_session_inflight(ConnPid),
%% TODO: Crude way to verify that the runtime state is not growing.
?assert(
erlang:external_size(Inflight1) - erlang:external_size(Inflight0) < 16,
Inflight1
)
after
emqtt:stop(Sub),
emqtt:stop(Pub)
end.
get_session_inflight(ConnPid) ->
emqx_connection:info({channel, {session, inflight}}, sys:get_state(ConnPid)).
t_publish_as_persistent(_Config) ->
Sub = connect(<<?MODULE_STRING "1">>, true, 30),
Pub = connect(<<?MODULE_STRING "2">>, true, 30),
@ -343,8 +412,8 @@ consume(It) ->
case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, 100) of
{ok, _NIt, _Msgs = []} ->
[];
{ok, NIt, Msgs} ->
Msgs ++ consume(NIt);
{ok, NIt, MsgsAndKeys} ->
[Msg || {_DSKey, Msg} <- MsgsAndKeys] ++ consume(NIt);
{ok, end_of_stream} ->
[]
end.

View File

@ -115,7 +115,7 @@ init_per_group(quic, Config) ->
[
{port, get_listener_port(quic, test)},
{conn_fun, quic_connect},
{ssl_opts, emqx_common_test_helpers:client_ssl_twoway()},
{ssl_opts, emqx_common_test_helpers:client_mtls()},
{ssl, true},
{group_apps, Apps}
| Config

View File

@ -144,19 +144,35 @@ groups() ->
].
init_per_suite(Config) ->
emqx_common_test_helpers:start_apps([]),
UdpPort = 14567,
start_emqx_quic(UdpPort),
%% Turn off force_shutdown policy.
ShutdownPolicy = emqx_config:get_zone_conf(default, [force_shutdown]),
ct:pal("force shutdown config: ~p", [ShutdownPolicy]),
emqx_config:put_zone_conf(default, [force_shutdown], ShutdownPolicy#{enable := false}),
[{shutdown_policy, ShutdownPolicy}, {port, UdpPort}, {pub_qos, 0}, {sub_qos, 0} | Config].
Apps = start_emqx(Config),
[{port, 14567}, {pub_qos, 0}, {sub_qos, 0}, {apps, Apps} | Config].
end_per_suite(Config) ->
emqx_config:put_zone_conf(default, [force_shutdown], ?config(shutdown_policy, Config)),
emqx_common_test_helpers:stop_apps([]),
ok.
emqx_cth_suite:stop(?config(apps, Config)).
start_emqx(Config) ->
emqx_cth_suite:start(
[mk_emqx_spec()],
#{work_dir => emqx_cth_suite:work_dir(Config)}
).
stop_emqx(Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
restart_emqx(Config) ->
ok = stop_emqx(Config),
emqx_cth_suite:start(
[mk_emqx_spec()],
#{work_dir => emqx_cth_suite:work_dir(Config), boot_type => restart}
).
mk_emqx_spec() ->
{emqx,
%% Turn off force_shutdown policy.
"force_shutdown.enable = false"
"\n listeners.quic.default {"
"\n enable = true, bind = 14567, acceptors = 16, idle_timeout_ms = 15000"
"\n }"}.
init_per_group(pub_qos0, Config) ->
[{pub_qos, 0} | Config];
@ -190,11 +206,6 @@ init_per_group(_, Config) ->
end_per_group(_, Config) ->
Config.
init_per_testcase(_, Config) ->
emqx_common_test_helpers:start_apps([]),
start_emqx_quic(?config(port, Config)),
Config.
t_quic_sock(Config) ->
Port = 4567,
SslOpts = [
@ -1582,9 +1593,13 @@ t_multi_streams_remote_shutdown(Config) ->
{quic, _Conn, _Ctrlstream} = proplists:get_value(socket, emqtt:info(C)),
ok = stop_emqx(),
%% Client should be closed
assert_client_die(C, 100, 200).
ok = stop_emqx(Config),
try
%% Client should be closed
assert_client_die(C, 100, 200)
after
restart_emqx(Config)
end.
t_multi_streams_remote_shutdown_with_reconnect(Config) ->
erlang:process_flag(trap_exit, true),
@ -1636,10 +1651,8 @@ t_multi_streams_remote_shutdown_with_reconnect(Config) ->
{quic, _Conn, _Ctrlstream} = proplists:get_value(socket, emqtt:info(C)),
ok = stop_emqx(),
_Apps = restart_emqx(Config),
timer:sleep(200),
start_emqx_quic(?config(port, Config)),
?assert(is_list(emqtt:info(C))),
emqtt:stop(C).
@ -2028,16 +2041,6 @@ calc_pkt_id(1, Id) ->
calc_pkt_id(2, Id) ->
Id.
-spec start_emqx_quic(inet:port_number()) -> ok.
start_emqx_quic(UdpPort) ->
emqx_common_test_helpers:start_apps([]),
application:ensure_all_started(quicer),
emqx_common_test_helpers:ensure_quic_listener(?MODULE, UdpPort).
-spec stop_emqx() -> ok.
stop_emqx() ->
emqx_common_test_helpers:stop_apps([]).
%% select a random port picked by OS
-spec select_port() -> inet:port_number().
select_port() ->

View File

@ -47,23 +47,23 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
load_conf(),
emqx_common_test_helpers:start_apps([?APP]),
Config.
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
ok = load_conf(),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([?APP]).
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(_TestCase, Config) ->
emqx_config:erase(limiter),
load_conf(),
ok = emqx_config:erase(limiter),
ok = load_conf(),
Config.
end_per_testcase(_TestCase, Config) ->
Config.
load_conf() ->
ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF).
emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF).
init_config() ->
emqx_config:init_load(emqx_limiter_schema, ?BASE_CONF).

View File

@ -22,12 +22,11 @@
-include_lib("common_test/include/ct.hrl").
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_suite(Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
all() ->
emqx_common_test_helpers:all(?MODULE).

View File

@ -49,31 +49,11 @@
all() -> emqx_common_test_helpers:all(?SUITE).
init_per_suite(Config) ->
DistPid =
case net_kernel:nodename() of
ignored ->
%% calling `net_kernel:start' without `epmd'
%% running will result in a failure.
emqx_common_test_helpers:start_epmd(),
{ok, Pid} = net_kernel:start(['master@127.0.0.1', longnames]),
ct:pal("start epmd, node name: ~p", [node()]),
Pid;
_ ->
undefined
end,
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
[{dist_pid, DistPid} | Config].
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{apps, Apps} | Config].
end_per_suite(Config) ->
DistPid = ?config(dist_pid, Config),
case DistPid of
Pid when is_pid(Pid) ->
net_kernel:stop();
_ ->
ok
end,
emqx_common_test_helpers:stop_apps([]).
emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(Case, Config) ->
try
@ -759,11 +739,16 @@ t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) ->
?assert(is_process_alive(Pid2)),
{ok, _} = emqtt:publish(ConnPub, Topic, <<"hello11">>, 1),
ct:sleep(100),
Msgs1 = emqx_mqueue:to_list(get_mqueue(Pid1)),
Msgs2 = emqx_mqueue:to_list(get_mqueue(Pid2)),
%% assert the message is in mqueue (because socket is closed)
?assertMatch([#message{payload = <<"hello11">>}], Msgs1 ++ Msgs2),
?retry(
100,
10,
begin
Msgs1 = emqx_mqueue:to_list(get_mqueue(Pid1)),
Msgs2 = emqx_mqueue:to_list(get_mqueue(Pid2)),
%% assert the message is in mqueue (because socket is closed)
?assertMatch([#message{payload = <<"hello11">>}], Msgs1 ++ Msgs2)
end
),
emqtt:stop(ConnPub),
ok.

View File

@ -62,7 +62,10 @@ t_run_check(_) ->
error(version_mismatch)
end,
BpapiDumps = filelib:wildcard(
filename:join(emqx_bpapi_static_checks:dumps_dir(), "*.bpapi")
filename:join(
emqx_bpapi_static_checks:dumps_dir(),
"*" ++ emqx_bpapi_static_checks:dump_file_extension()
)
),
logger:info("Backplane API dump files: ~p", [BpapiDumps]),
?assert(emqx_bpapi_static_checks:check_compat(BpapiDumps))

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -24,12 +24,11 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
end_per_suite(Config) ->
emqx_cth_suite:stop(proplists:get_value(apps, Config)).
t_child(_) ->
?assertMatch({error, _}, emqx_sup:start_child(undef, worker)),

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth, [
{description, "EMQX Authentication and authorization"},
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{modules, []},
{registered, [emqx_auth_sup]},
{applications, [

View File

@ -408,8 +408,7 @@ init_metrics(Source) ->
{stop, #{result => deny, from => ?MODULE}}.
authorize_deny(
#{
username := Username,
peerhost := IpAddress
username := Username
} = _Client,
_PubSub,
Topic,
@ -419,13 +418,15 @@ authorize_deny(
?SLOG(warning, #{
msg => "authorization_not_initialized",
username => Username,
ipaddr => IpAddress,
topic => Topic,
source => ?MODULE
}),
{stop, #{result => deny, from => ?MODULE}}.
%% @doc Check AuthZ
%% @doc Check AuthZ.
%% DefaultResult is always ignored in this callback because the final decision
%% is to be made by `emqx_access_control' module after all authorization
%% sources are exhausted.
-spec authorize(
emqx_types:clientinfo(),
emqx_types:pubsub(),
@ -434,77 +435,36 @@ authorize_deny(
sources()
) ->
authz_result().
authorize(
#{
username := Username,
peerhost := IpAddress
} = Client,
PubSub,
Topic,
DefaultResult,
Sources
) ->
authorize(Client, PubSub, Topic, _DefaultResult, Sources) ->
case maps:get(is_superuser, Client, false) of
true ->
log_allowed(#{
username => Username,
ipaddr => IpAddress,
topic => Topic,
is_superuser => true
}),
emqx_metrics:inc(?METRIC_SUPERUSER),
{stop, #{result => allow, from => superuser}};
false ->
authorize_non_superuser(Client, PubSub, Topic, DefaultResult, Sources)
authorize_non_superuser(Client, PubSub, Topic, Sources)
end.
authorize_non_superuser(
#{
username := Username,
peerhost := IpAddress
} = Client,
PubSub,
Topic,
_DefaultResult,
Sources
) ->
authorize_non_superuser(Client, PubSub, Topic, Sources) ->
case do_authorize(Client, PubSub, Topic, sources_with_defaults(Sources)) of
{{matched, allow}, AuthzSource} ->
log_allowed(#{
username => Username,
ipaddr => IpAddress,
topic => Topic,
source => AuthzSource
}),
emqx_metrics_worker:inc(authz_metrics, AuthzSource, allow),
emqx_metrics:inc(?METRIC_ALLOW),
{stop, #{result => allow, from => AuthzSource}};
{stop, #{result => allow, from => source_for_logging(AuthzSource, Client)}};
{{matched, deny}, AuthzSource} ->
?SLOG(warning, #{
msg => "authorization_permission_denied",
username => Username,
ipaddr => IpAddress,
topic => Topic,
source => AuthzSource
}),
emqx_metrics_worker:inc(authz_metrics, AuthzSource, deny),
emqx_metrics:inc(?METRIC_DENY),
{stop, #{result => deny, from => AuthzSource}};
{stop, #{result => deny, from => source_for_logging(AuthzSource, Client)}};
nomatch ->
?tp(authz_non_superuser, #{result => nomatch}),
?SLOG(info, #{
msg => "authorization_failed_nomatch",
username => Username,
ipaddr => IpAddress,
topic => Topic,
reason => "no-match rule"
}),
emqx_metrics:inc(?METRIC_NOMATCH),
%% return ignore here because there might be other hook callbacks
ignore
end.
log_allowed(Meta) ->
?SLOG(info, Meta#{msg => "authorization_permission_allowed"}).
source_for_logging(client_info, #{acl := Acl}) ->
maps:get(source_for_logging, Acl, client_info);
source_for_logging(Type, _) ->
Type.
do_authorize(_Client, _PubSub, _Topic, []) ->
nomatch;
@ -512,8 +472,7 @@ do_authorize(Client, PubSub, Topic, [#{enable := false} | Rest]) ->
do_authorize(Client, PubSub, Topic, Rest);
do_authorize(
#{
username := Username,
peerhost := IpAddress
username := Username
} = Client,
PubSub,
Topic,
@ -527,9 +486,8 @@ do_authorize(
?TRACE("AUTHZ", "authorization_module_nomatch", #{
module => Module,
username => Username,
ipaddr => IpAddress,
topic => Topic,
pub_sub => PubSub
action => emqx_access_control:format_action(PubSub)
}),
do_authorize(Client, PubSub, Topic, Tail);
%% {matched, allow | deny | ignore}
@ -537,18 +495,16 @@ do_authorize(
?TRACE("AUTHZ", "authorization_module_match_ignore", #{
module => Module,
username => Username,
ipaddr => IpAddress,
topic => Topic,
pub_sub => PubSub
action => emqx_access_control:format_action(PubSub)
}),
do_authorize(Client, PubSub, Topic, Tail);
ignore ->
?TRACE("AUTHZ", "authorization_module_ignore", #{
module => Module,
username => Username,
ipaddr => IpAddress,
topic => Topic,
pub_sub => PubSub
action => emqx_access_control:format_action(PubSub)
}),
do_authorize(Client, PubSub, Topic, Tail);
%% {matched, allow | deny}

View File

@ -106,9 +106,15 @@ compile({Permission, Who, Action, TopicFilters}) when
|| Topic <- TopicFilters
]};
compile({Permission, _Who, _Action, _TopicFilter}) when not ?IS_PERMISSION(Permission) ->
throw({invalid_authorization_permission, Permission});
throw(#{
reason => invalid_authorization_permission,
value => Permission
});
compile(BadRule) ->
throw({invalid_authorization_rule, BadRule}).
throw(#{
reason => invalid_authorization_rule,
value => BadRule
}).
compile_action(Action) ->
compile_action(emqx_authz:feature_available(rich_actions), Action).
@ -133,7 +139,10 @@ compile_action(true = _RichActionsOn, {Action, Opts}) when
retain => retain_from_opts(Opts)
};
compile_action(_RichActionsOn, Action) ->
throw({invalid_authorization_action, Action}).
throw(#{
reason => invalid_authorization_action,
value => Action
}).
qos_from_opts(Opts) ->
try
@ -152,20 +161,29 @@ qos_from_opts(Opts) ->
)
end
catch
bad_qos ->
throw({invalid_authorization_qos, Opts})
{bad_qos, QoS} ->
throw(#{
reason => invalid_authorization_qos,
qos => QoS
})
end.
validate_qos(QoS) when is_integer(QoS), QoS >= 0, QoS =< 2 ->
QoS;
validate_qos(_) ->
throw(bad_qos).
validate_qos(QoS) ->
throw({bad_qos, QoS}).
retain_from_opts(Opts) ->
case proplists:get_value(retain, Opts, ?DEFAULT_RULE_RETAIN) of
all -> all;
Retain when is_boolean(Retain) -> Retain;
_ -> throw({invalid_authorization_retain, Opts})
all ->
all;
Retain when is_boolean(Retain) ->
Retain;
Value ->
throw(#{
reason => invalid_authorization_retain,
value => Value
})
end.
compile_who(all) ->
@ -193,7 +211,10 @@ compile_who({'and', L}) when is_list(L) ->
compile_who({'or', L}) when is_list(L) ->
{'or', [compile_who(Who) || Who <- L]};
compile_who(Who) ->
throw({invalid_who, Who}).
throw(#{
reason => invalid_client_match_condition,
identifier => Who
}).
compile_topic("eq " ++ Topic) ->
{eq, emqx_topic:words(bin(Topic))};
@ -254,9 +275,17 @@ match_action(#{action_type := subscribe, qos := QoS}, #{action_type := subscribe
match_qos(QoS, QoSCond);
match_action(#{action_type := subscribe, qos := QoS}, #{action_type := all, qos := QoSCond}) ->
match_qos(QoS, QoSCond);
match_action(_, _) ->
match_action(_, PubSubCond) ->
true = is_pubsub_cond(PubSubCond),
false.
is_pubsub_cond(publish) ->
true;
is_pubsub_cond(subscribe) ->
true;
is_pubsub_cond(#{action_type := A}) ->
is_pubsub_cond(A).
match_pubsub(publish, publish) -> true;
match_pubsub(subscribe, subscribe) -> true;
match_pubsub(_, all) -> true;

View File

@ -37,7 +37,7 @@
emqx_authz_rule:action_condition(),
emqx_authz_rule:topic_condition()
}}
| {error, term()}.
| {error, map()}.
parse_rule(
#{
<<"permission">> := PermissionRaw,
@ -51,11 +51,18 @@ parse_rule(
Action = validate_rule_action(ActionType, RuleRaw),
{ok, {Permission, Action, Topics}}
catch
throw:ValidationError ->
{error, ValidationError}
throw:{Invalid, Which} ->
{error, #{
reason => Invalid,
value => Which
}}
end;
parse_rule(RuleRaw) ->
{error, {invalid_rule, RuleRaw}}.
{error, #{
reason => invalid_rule,
value => RuleRaw,
explain => "missing 'permission' or 'action' field"
}}.
-spec format_rule({
emqx_authz_rule:permission(),
@ -88,7 +95,7 @@ validate_rule_topics(#{<<"topic">> := TopicRaw}) when is_binary(TopicRaw) ->
validate_rule_topics(#{<<"topics">> := TopicsRaw}) when is_list(TopicsRaw) ->
lists:map(fun validate_rule_topic/1, TopicsRaw);
validate_rule_topics(RuleRaw) ->
throw({invalid_topics, RuleRaw}).
throw({missing_topic_or_topics, RuleRaw}).
validate_rule_topic(<<"eq ", TopicRaw/binary>>) ->
{eq, validate_rule_topic(TopicRaw)};
@ -98,8 +105,8 @@ validate_rule_permission(<<"allow">>) -> allow;
validate_rule_permission(<<"deny">>) -> deny;
validate_rule_permission(PermissionRaw) -> throw({invalid_permission, PermissionRaw}).
validate_rule_action_type(<<"publish">>) -> publish;
validate_rule_action_type(<<"subscribe">>) -> subscribe;
validate_rule_action_type(P) when P =:= <<"pub">> orelse P =:= <<"publish">> -> publish;
validate_rule_action_type(S) when S =:= <<"sub">> orelse S =:= <<"subscribe">> -> subscribe;
validate_rule_action_type(<<"all">>) -> all;
validate_rule_action_type(ActionRaw) -> throw({invalid_action, ActionRaw}).
@ -152,7 +159,7 @@ validate_rule_qos_atomic(<<"2">>) -> 2;
validate_rule_qos_atomic(0) -> 0;
validate_rule_qos_atomic(1) -> 1;
validate_rule_qos_atomic(2) -> 2;
validate_rule_qos_atomic(_) -> throw(invalid_qos).
validate_rule_qos_atomic(QoS) -> throw({invalid_qos, QoS}).
validate_rule_retain(<<"0">>) -> false;
validate_rule_retain(<<"1">>) -> true;

View File

@ -34,6 +34,10 @@
authorize/4
]).
-define(IS_V1(Rules), is_map(Rules)).
-define(IS_V2(Rules), is_list(Rules)).
%% For v1
-define(RULE_NAMES, [
{[pub, <<"pub">>], publish},
{[sub, <<"sub">>], subscribe},
@ -55,10 +59,46 @@ update(Source) ->
destroy(_Source) -> ok.
%% @doc Authorize based on clientinfo enriched with `acl' data.
%% e.g. from JWT.
%%
%% Supported rule formats are:
%%
%% v1: (always deny when no match)
%%
%% #{
%% pub => [TopicFilter],
%% sub => [TopicFilter],
%% all => [TopicFilter]
%% }
%%
%% v2: (rules are checked in sequence, passthrough when no match)
%%
%% [{
%% Permission :: emqx_authz_rule:permission(),
%% Action :: emqx_authz_rule:action_condition(),
%% Topics :: emqx_authz_rule:topic_condition()
%% }]
%%
%% which is compiled from raw rules like below by emqx_authz_rule_raw
%%
%% [
%% #{
%% permission := allow | deny
%% action := pub | sub | all
%% topic => TopicFilter,
%% topics => [TopicFilter] %% when 'topic' is not provided
%% qos => 0 | 1 | 2 | [0, 1, 2]
%% retain => true | false | all %% only for pub action
%% }
%% ]
%%
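%% As a concrete sketch (hypothetical topic names, shown only for
%% illustration), a v2 `acl' claim carried in a JWT could look like:
%%
%%   [
%%     #{
%%       <<"permission">> => <<"allow">>,
%%       <<"action">> => <<"pub">>,
%%       <<"topic">> => <<"devices/${clientid}/status">>,
%%       <<"qos">> => [0, 1],
%%       <<"retain">> => false
%%     }
%%   ]
%%
%% Such raw rules are compiled via `emqx_authz_rule_raw' and then checked
%% in order by `authorize_v2/4' below.
%%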
authorize(#{acl := Acl} = Client, PubSub, Topic, _Source) ->
case check(Acl) of
{ok, Rules} when is_map(Rules) ->
do_authorize(Client, PubSub, Topic, Rules);
{ok, Rules} when ?IS_V2(Rules) ->
authorize_v2(Client, PubSub, Topic, Rules);
{ok, Rules} when ?IS_V1(Rules) ->
authorize_v1(Client, PubSub, Topic, Rules);
{error, MatchResult} ->
MatchResult
end;
@ -69,7 +109,7 @@ authorize(_Client, _PubSub, _Topic, _Source) ->
%% Internal functions
%%--------------------------------------------------------------------
check(#{expire := Expire, rules := Rules}) when is_map(Rules) ->
check(#{expire := Expire, rules := Rules}) ->
Now = erlang:system_time(second),
case Expire of
N when is_integer(N) andalso N > Now -> {ok, Rules};
@ -83,13 +123,13 @@ check(#{rules := Rules}) ->
check(#{}) ->
{error, nomatch}.
do_authorize(Client, PubSub, Topic, AclRules) ->
do_authorize(Client, PubSub, Topic, AclRules, ?RULE_NAMES).
authorize_v1(Client, PubSub, Topic, AclRules) ->
authorize_v1(Client, PubSub, Topic, AclRules, ?RULE_NAMES).
do_authorize(_Client, _PubSub, _Topic, _AclRules, []) ->
authorize_v1(_Client, _PubSub, _Topic, _AclRules, []) ->
{matched, deny};
do_authorize(Client, PubSub, Topic, AclRules, [{Keys, Action} | RuleNames]) ->
TopicFilters = get_topic_filters(Keys, AclRules, []),
authorize_v1(Client, PubSub, Topic, AclRules, [{Keys, Action} | RuleNames]) ->
TopicFilters = get_topic_filters_v1(Keys, AclRules, []),
case
emqx_authz_rule:match(
Client,
@ -99,13 +139,16 @@ do_authorize(Client, PubSub, Topic, AclRules, [{Keys, Action} | RuleNames]) ->
)
of
{matched, Permission} -> {matched, Permission};
nomatch -> do_authorize(Client, PubSub, Topic, AclRules, RuleNames)
nomatch -> authorize_v1(Client, PubSub, Topic, AclRules, RuleNames)
end.
get_topic_filters([], _Rules, Default) ->
get_topic_filters_v1([], _Rules, Default) ->
Default;
get_topic_filters([Key | Keys], Rules, Default) ->
get_topic_filters_v1([Key | Keys], Rules, Default) ->
case Rules of
#{Key := Value} -> Value;
#{} -> get_topic_filters(Keys, Rules, Default)
#{} -> get_topic_filters_v1(Keys, Rules, Default)
end.
authorize_v2(Client, PubSub, Topic, Rules) ->
emqx_authz_rule:matches(Client, PubSub, Topic, Rules).

View File

@ -178,11 +178,13 @@ t_bad_file_source(_) ->
BadContent = ?SOURCE_FILE(<<"{allow,{username,\"bar\"}, publish, [\"test\"]}">>),
BadContentErr = {bad_acl_file_content, {1, erl_parse, ["syntax error before: ", []]}},
BadRule = ?SOURCE_FILE(<<"{allow,{username,\"bar\"},publish}.">>),
BadRuleErr = {invalid_authorization_rule, {allow, {username, "bar"}, publish}},
BadRuleErr = #{
reason => invalid_authorization_rule, value => {allow, {username, "bar"}, publish}
},
BadPermission = ?SOURCE_FILE(<<"{not_allow,{username,\"bar\"},publish,[\"test\"]}.">>),
BadPermissionErr = {invalid_authorization_permission, not_allow},
BadPermissionErr = #{reason => invalid_authorization_permission, value => not_allow},
BadAction = ?SOURCE_FILE(<<"{allow,{username,\"bar\"},pubsub,[\"test\"]}.">>),
BadActionErr = {invalid_authorization_action, pubsub},
BadActionErr = #{reason => invalid_authorization_action, value => pubsub},
lists:foreach(
fun({Source, Error}) ->
File = emqx_authz_file:acl_conf_file(),

View File

@ -100,7 +100,7 @@ t_rich_actions(_Config) ->
t_no_rich_actions(_Config) ->
_ = emqx_authz:set_feature_available(rich_actions, false),
?assertMatch(
{error, {pre_config_update, emqx_authz, {invalid_authorization_action, _}}},
{error, {pre_config_update, emqx_authz, #{reason := invalid_authorization_action}}},
emqx_authz:update(?CMD_REPLACE, [
?RAW_SOURCE#{
<<"rules">> =>

View File

@ -176,7 +176,7 @@ t_compile_ce(_Config) ->
_ = emqx_authz:set_feature_available(rich_actions, false),
?assertThrow(
{invalid_authorization_action, _},
#{reason := invalid_authorization_action},
emqx_authz_rule:compile(
{allow, {username, "test"}, {all, [{qos, 2}, {retain, true}]}, ["topic/test"]}
)
@ -676,34 +676,34 @@ t_match(_) ->
t_invalid_rule(_) ->
?assertThrow(
{invalid_authorization_permission, _},
#{reason := invalid_authorization_permission},
emqx_authz_rule:compile({allawww, all, all, ["topic/test"]})
),
?assertThrow(
{invalid_authorization_rule, _},
#{reason := invalid_authorization_rule},
emqx_authz_rule:compile(ooops)
),
?assertThrow(
{invalid_authorization_qos, _},
#{reason := invalid_authorization_qos},
emqx_authz_rule:compile({allow, {username, "test"}, {publish, [{qos, 3}]}, ["topic/test"]})
),
?assertThrow(
{invalid_authorization_retain, _},
#{reason := invalid_authorization_retain},
emqx_authz_rule:compile(
{allow, {username, "test"}, {publish, [{retain, 'FALSE'}]}, ["topic/test"]}
)
),
?assertThrow(
{invalid_authorization_action, _},
#{reason := invalid_authorization_action},
emqx_authz_rule:compile({allow, all, unsubscribe, ["topic/test"]})
),
?assertThrow(
{invalid_who, _},
#{reason := invalid_client_match_condition},
emqx_authz_rule:compile({allow, who, all, ["topic/test"]})
).

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_auth_jwt, [
{description, "EMQX JWT Authentication and Authorization"},
{vsn, "0.1.1"},
{vsn, "0.2.0"},
{registered, []},
{mod, {emqx_auth_jwt_app, []}},
{applications, [

View File

@ -219,14 +219,24 @@ verify(undefined, _, _, _) ->
verify(JWT, JWKs, VerifyClaims, AclClaimName) ->
case do_verify(JWT, JWKs, VerifyClaims) of
{ok, Extra} ->
{ok, acl(Extra, AclClaimName)};
try
{ok, acl(Extra, AclClaimName)}
catch
throw:{bad_acl_rule, Reason} ->
%% it's an invalid token, so it's ok to log
?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{jwt => JWT}),
{error, bad_username_or_password}
end;
{error, {missing_claim, Claim}} ->
%% it's an invalid token, so it's ok to log
?TRACE_AUTHN_PROVIDER("missing_jwt_claim", #{jwt => JWT, claim => Claim}),
{error, bad_username_or_password};
{error, invalid_signature} ->
%% it's an invalid token, so it's ok to log
?TRACE_AUTHN_PROVIDER("invalid_jwt_signature", #{jwks => JWKs, jwt => JWT}),
ignore;
{error, {claims, Claims}} ->
%% it's an invalid token, so it's ok to log
?TRACE_AUTHN_PROVIDER("invalid_jwt_claims", #{jwt => JWT, claims => Claims}),
{error, bad_username_or_password}
end.
@ -237,7 +247,8 @@ acl(Claims, AclClaimName) ->
#{AclClaimName := Rules} ->
#{
acl => #{
rules => Rules,
rules => parse_rules(Rules),
source_for_logging => jwt,
expire => maps:get(<<"exp">>, Claims, undefined)
}
};
@ -363,3 +374,24 @@ binary_to_number(Bin) ->
_ -> false
end
end.
%% Parse rules, which can come in two different formats:
%% 1. #{<<"pub">> => [<<"a/b">>, <<"c/d">>], <<"sub">> => [...], <<"all">> => [...]}
%% 2. [#{<<"permission">> => <<"allow">>, <<"action">> => <<"publish">>, <<"topic">> => <<"a/b">>}, ...]
parse_rules(Rules) when is_map(Rules) ->
Rules;
parse_rules(Rules) when is_list(Rules) ->
lists:map(fun parse_rule/1, Rules).
parse_rule(Rule) ->
case emqx_authz_rule_raw:parse_rule(Rule) of
{ok, {Permission, Action, Topics}} ->
try
emqx_authz_rule:compile({Permission, all, Action, Topics})
catch
throw:Reason ->
throw({bad_acl_rule, Reason})
end;
{error, Reason} ->
throw({bad_acl_rule, Reason})
end.
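%% Rough sketch of the flow above (the rule literal is hypothetical, for
%% illustration only): a raw v2 entry such as
%%   #{<<"permission">> => <<"allow">>, <<"action">> => <<"pub">>, <<"topic">> => <<"t/1">>}
%% is parsed by emqx_authz_rule_raw:parse_rule/1 into {Permission, Action, Topics}
%% and then compiled with emqx_authz_rule:compile({Permission, all, Action, Topics});
%% an error from either step is wrapped as {bad_acl_rule, Reason}, which
%% verify/4 turns into {error, bad_username_or_password}.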

View File

@ -78,7 +78,7 @@ end_per_testcase(_TestCase, _Config) ->
%%------------------------------------------------------------------------------
t_topic_rules(_Config) ->
Payload = #{
JWT = #{
<<"exp">> => erlang:system_time(second) + 60,
<<"acl">> => #{
<<"pub">> => [
@ -99,7 +99,47 @@ t_topic_rules(_Config) ->
},
<<"username">> => <<"username">>
},
JWT = generate_jws(Payload),
test_topic_rules(JWT).
t_topic_rules_v2(_Config) ->
JWT = #{
<<"exp">> => erlang:system_time(second) + 60,
<<"acl">> => [
#{
<<"permission">> => <<"allow">>,
<<"action">> => <<"pub">>,
<<"topics">> => [
<<"eq testpub1/${username}">>,
<<"testpub2/${clientid}">>,
<<"testpub3/#">>
]
},
#{
<<"permission">> => <<"allow">>,
<<"action">> => <<"sub">>,
<<"topics">> =>
[
<<"eq testsub1/${username}">>,
<<"testsub2/${clientid}">>,
<<"testsub3/#">>
]
},
#{
<<"permission">> => <<"allow">>,
<<"action">> => <<"all">>,
<<"topics">> => [
<<"eq testall1/${username}">>,
<<"testall2/${clientid}">>,
<<"testall3/#">>
]
}
],
<<"username">> => <<"username">>
},
test_topic_rules(JWT).
test_topic_rules(JWTInput) ->
JWT = generate_jws(JWTInput),
{ok, C} = emqtt:start_link(
[
@ -350,6 +390,64 @@ t_check_undefined_expire(_Config) ->
emqx_authz_client_info:authorize(Client, ?AUTHZ_SUBSCRIBE, <<"a/bar">>, undefined)
).
t_invalid_rule(_Config) ->
emqx_logger:set_log_level(debug),
MakeJWT = fun(Acl) ->
generate_jws(#{
<<"exp">> => erlang:system_time(second) + 60,
<<"username">> => <<"username">>,
<<"acl">> => Acl
})
end,
InvalidAclList =
[
%% missing action
[#{<<"permission">> => <<"invalid">>}],
%% missing topic or topics
[#{<<"permission">> => <<"allow">>, <<"action">> => <<"pub">>}],
%% invalid permission, must be allow | deny
[
#{
<<"permission">> => <<"invalid">>,
<<"action">> => <<"pub">>,
<<"topic">> => <<"t">>
}
],
%% invalid action, must be pub | sub | all
[
#{
<<"permission">> => <<"allow">>,
<<"action">> => <<"invalid">>,
<<"topic">> => <<"t">>
}
],
%% invalid qos
[
#{
<<"permission">> => <<"allow">>,
<<"action">> => <<"pub">>,
<<"topics">> => [<<"t">>],
<<"qos">> => 3
}
]
],
lists:foreach(
fun(InvalidAcl) ->
{ok, C} = emqtt:start_link(
[
{clean_start, true},
{proto_ver, v5},
{clientid, <<"clientid">>},
{username, <<"username">>},
{password, MakeJWT(InvalidAcl)}
]
),
unlink(C),
?assertMatch({error, {bad_username_or_password, _}}, emqtt:connect(C))
end,
InvalidAclList
).
%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------

View File

@ -190,7 +190,7 @@ t_normalize_rules(_Config) ->
?assertException(
error,
{invalid_rule, _},
#{reason := invalid_rule},
emqx_authz_mnesia:store_rules(
{username, <<"username">>},
[[<<"allow">>, <<"publish">>, <<"t">>]]
@ -199,16 +199,22 @@ t_normalize_rules(_Config) ->
?assertException(
error,
{invalid_action, _},
#{reason := invalid_action},
emqx_authz_mnesia:store_rules(
{username, <<"username">>},
[#{<<"permission">> => <<"allow">>, <<"action">> => <<"pub">>, <<"topic">> => <<"t">>}]
[
#{
<<"permission">> => <<"allow">>,
<<"action">> => <<"badaction">>,
<<"topic">> => <<"t">>
}
]
)
),
?assertException(
error,
{invalid_permission, _},
#{reason := invalid_permission},
emqx_authz_mnesia:store_rules(
{username, <<"username">>},
[

View File

@ -1,6 +1,6 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.8.0"}}}
{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}}
, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}}
, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}
, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}

View File

@ -1,6 +1,6 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.8.0"}}}
{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}}
, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}}
, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}
, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}

View File

@ -1,6 +1,6 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-2"}}}
{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0.3"}}}
, {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}}
, {emqx_bridge, {path, "../../apps/emqx_bridge"}}

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{registered, []},
{applications, [
kernel,

View File

@ -428,7 +428,8 @@ do_get_status(ResourceId, Timeout) ->
msg => "ehttpc_health_check_failed",
connector => ResourceId,
reason => Reason,
worker => Worker
worker => Worker,
wait_time => Timeout
}),
false
end

View File

@ -310,7 +310,7 @@ gcp_pubsub_config(Config) ->
io_lib:format(
"bridges.gcp_pubsub.~s {\n"
" enable = true\n"
" connect_timeout = 1s\n"
" connect_timeout = 5s\n"
" service_account_json = ~s\n"
" payload_template = ~p\n"
" pubsub_topic = ~s\n"
@ -1404,8 +1404,23 @@ t_failure_no_body(Config) ->
),
ok.
kill_gun_process(EhttpcPid) ->
State = ehttpc:get_state(EhttpcPid, minimal),
GunPid = maps:get(client, State),
true = is_pid(GunPid),
_ = exit(GunPid, kill),
ok.
kill_gun_processes(ConnectorResourceId) ->
Pool = ehttpc:workers(ConnectorResourceId),
Workers = lists:map(fun({_, Pid}) -> Pid end, Pool),
%% assert there is at least one pool member
?assertMatch([_ | _], Workers),
lists:foreach(fun(Pid) -> kill_gun_process(Pid) end, Workers).
t_unrecoverable_error(Config) ->
ActionResourceId = ?config(action_resource_id, Config),
ConnectorResourceId = ?config(connector_resource_id, Config),
TelemetryTable = ?config(telemetry_table, Config),
TestPid = self(),
FailureNoBodyHandler =
@ -1415,10 +1430,7 @@ t_unrecoverable_error(Config) ->
%% kill the gun process while it's waiting for the
%% response so we provoke an `{error, _}' response from
%% ehttpc.
lists:foreach(
fun(Pid) -> exit(Pid, kill) end,
[Pid || {_, Pid, _, _} <- supervisor:which_children(gun_sup)]
),
ok = kill_gun_processes(ConnectorResourceId),
Rep = cowboy_req:reply(
200,
#{<<"content-type">> => <<"application/json">>},
@ -1531,7 +1543,7 @@ t_get_status_down(Config) ->
t_get_status_timeout_calling_workers(Config) ->
ResourceId = ?config(connector_resource_id, Config),
{ok, _} = create_bridge(Config),
{ok, _} = create_bridge(Config, #{<<"connect_timeout">> => <<"1s">>}),
emqx_common_test_helpers:with_mock(
ehttpc,
health_check,

View File

@ -6,7 +6,7 @@
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}},
{greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.2"}}}
{greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.6"}}}
]}.
{plugins, [rebar3_path_deps]}.
{project_plugins, [erlfmt]}.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_greptimedb, [
{description, "EMQX GreptimeDB Bridge"},
{vsn, "0.1.6"},
{vsn, "0.1.7"},
{registered, []},
{applications, [
kernel,

View File

@ -21,8 +21,11 @@
on_stop/2,
on_query/3,
on_batch_query/3,
on_query_async/4,
on_batch_query_async/4,
on_get_status/2
]).
-export([reply_callback/2]).
-export([
roots/0,
@ -57,7 +60,7 @@
%% -------------------------------------------------------------------------------------------------
%% resource callback
callback_mode() -> always_sync.
callback_mode() -> async_if_possible.
on_start(InstId, Config) ->
%% InstID as pool would be handled by greptimedb client
@ -110,6 +113,49 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client
{error, {unrecoverable_error, Reason}}
end.
on_query_async(
InstId,
{send_message, Data},
{ReplyFun, Args},
_State = #{write_syntax := SyntaxLines, client := Client}
) ->
case data_to_points(Data, SyntaxLines) of
{ok, Points} ->
?tp(
greptimedb_connector_send_query,
#{points => Points, batch => false, mode => async}
),
do_async_query(InstId, Client, Points, {ReplyFun, Args});
{error, ErrorPoints} = Err ->
?tp(
greptimedb_connector_send_query_error,
#{batch => false, mode => async, error => ErrorPoints}
),
log_error_points(InstId, ErrorPoints),
Err
end.
on_batch_query_async(
InstId,
BatchData,
{ReplyFun, Args},
#{write_syntax := SyntaxLines, client := Client}
) ->
case parse_batch_data(InstId, BatchData, SyntaxLines) of
{ok, Points} ->
?tp(
greptimedb_connector_send_query,
#{points => Points, batch => true, mode => async}
),
do_async_query(InstId, Client, Points, {ReplyFun, Args});
{error, Reason} ->
?tp(
greptimedb_connector_send_query_error,
#{batch => true, mode => async, error => Reason}
),
{error, {unrecoverable_error, Reason}}
end.
on_get_status(_InstId, #{client := Client}) ->
case greptimedb:is_alive(Client) of
true ->
@ -344,6 +390,31 @@ do_query(InstId, Client, Points) ->
end
end.
do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
?SLOG(info, #{
msg => "greptimedb_write_point_async",
connector => InstId,
points => Points
}),
WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]},
ok = greptimedb:async_write_batch(Client, Points, WrappedReplyFunAndArgs).
reply_callback(ReplyFunAndArgs, {error, {unauth, _, _}}) ->
?tp(greptimedb_connector_do_query_failure, #{error => <<"authorization failure">>}),
Result = {error, {unrecoverable_error, <<"authorization failure">>}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
reply_callback(ReplyFunAndArgs, {error, Reason} = Error) ->
case is_unrecoverable_error(Error) of
true ->
Result = {error, {unrecoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
false ->
Result = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
end;
reply_callback(ReplyFunAndArgs, Result) ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
%% -------------------------------------------------------------------------------------------------
%% Tags & Fields Config Trans

View File

@ -25,16 +25,23 @@ groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{with_batch, [
{group, sync_query}
{group, sync_query},
{group, async_query}
]},
{without_batch, [
{group, sync_query}
{group, sync_query},
{group, async_query}
]},
{sync_query, [
{group, grpcv1_tcp}
%% uncomment tls when we are ready
%% {group, grpcv1_tls}
]},
{async_query, [
{group, grpcv1_tcp}
%% uncomment tls when we are ready
%% {group, grpcv1_tls}
]},
{grpcv1_tcp, TCs}
%%{grpcv1_tls, TCs}
].
@ -130,6 +137,8 @@ init_per_group(GreptimedbType, Config0) when
end;
init_per_group(sync_query, Config) ->
[{query_mode, sync} | Config];
init_per_group(async_query, Config) ->
[{query_mode, async} | Config];
init_per_group(with_batch, Config) ->
[{batch_size, 100} | Config];
init_per_group(without_batch, Config) ->
@ -420,6 +429,9 @@ t_start_ok(Config) ->
?check_trace(
begin
case QueryMode of
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500);
sync ->
?assertMatch({ok, _}, send_message(Config, SentData))
end,
@ -666,6 +678,9 @@ t_const_timestamp(Config) ->
<<"timestamp">> => erlang:system_time(millisecond)
},
case QueryMode of
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500);
sync ->
?assertMatch({ok, _}, send_message(Config, SentData))
end,
@ -709,9 +724,12 @@ t_boolean_variants(Config) ->
},
case QueryMode of
sync ->
?assertMatch({ok, _}, send_message(Config, SentData))
?assertMatch({ok, _}, send_message(Config, SentData));
async ->
?assertMatch(ok, send_message(Config, SentData))
end,
case QueryMode of
async -> ct:sleep(500);
sync -> ok
end,
PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config),
@ -779,11 +797,29 @@ t_bad_timestamp(Config) ->
#{?snk_kind := greptimedb_connector_send_query_error},
10_000
),
fun(Result, _Trace) ->
fun(Result, Trace) ->
?assertMatch({_, {ok, _}}, Result),
{Return, {ok, _}} = Result,
IsBatch = BatchSize > 1,
case {QueryMode, IsBatch} of
{async, true} ->
?assertEqual(ok, Return),
?assertMatch(
[#{error := points_trans_failed}],
?of_kind(greptimedb_connector_send_query_error, Trace)
);
{async, false} ->
?assertEqual(ok, Return),
?assertMatch(
[
#{
error := [
{error, {bad_timestamp, <<"bad_timestamp">>}}
]
}
],
?of_kind(greptimedb_connector_send_query_error, Trace)
);
{sync, false} ->
?assertEqual(
{error, [
@ -907,17 +943,34 @@ t_write_failure(Config) ->
{error, {resource_error, #{reason := timeout}}},
send_message(Config, SentData)
),
#{?snk_kind := greptimedb_connector_do_query_failure, action := nack},
16_000
#{?snk_kind := handle_async_reply, action := nack},
1_000
);
async ->
?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := handle_async_reply},
1_000
)
end
end),
fun(Trace) ->
fun(Trace0) ->
case QueryMode of
sync ->
?assertMatch(
[#{error := _} | _],
?of_kind(greptimedb_connector_do_query_failure, Trace)
Trace = ?of_kind(handle_async_reply, Trace0),
?assertMatch([_ | _], Trace),
[#{result := Result} | _] = Trace,
?assert(
not emqx_bridge_greptimedb_connector:is_unrecoverable_error(Result),
#{got => Result}
);
async ->
Trace = ?of_kind(handle_async_reply, Trace0),
?assertMatch([_ | _], Trace),
[#{result := Result} | _] = Trace,
?assert(
not emqx_bridge_greptimedb_connector:is_unrecoverable_error(Result),
#{got => Result}
)
end,
ok
@ -1029,6 +1082,23 @@ t_authentication_error_on_send_message(Config0) ->
?assertMatch(
{error, {unrecoverable_error, <<"authorization failure">>}},
send_message(Config, SentData)
);
async ->
?check_trace(
begin
?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := handle_async_reply},
1_000
)
end,
fun(Trace) ->
?assertMatch(
[#{error := <<"authorization failure">>} | _],
?of_kind(greptimedb_connector_do_query_failure, Trace)
),
ok
end
)
end,
ok.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_http, [
{description, "EMQX HTTP Bridge and Connector Application"},
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{registered, []},
{applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]},
{env, [{emqx_action_info_modules, [emqx_bridge_http_action_info]}]},

View File

@ -205,7 +205,9 @@ on_start(
http ->
{tcp, []};
https ->
SSLOpts = emqx_tls_lib:to_client_opts(maps:get(ssl, Config)),
SSLConf = maps:get(ssl, Config),
%% force enable ssl
SSLOpts = emqx_tls_lib:to_client_opts(SSLConf#{enable => true}),
{tls, SSLOpts}
end,
NTransportOpts = emqx_utils:ipv6_probe(TransportOpts),

View File

@ -124,7 +124,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
t_tls_verify_none(Config) ->
PoolName = <<"emqx_bridge_influxdb_connector_SUITE">>,
PoolName = <<"testpool-1">>,
Host = ?config(influxdb_tls_host, Config),
Port = ?config(influxdb_tls_port, Config),
InitialConfig = influxdb_config(Host, Port, true, <<"verify_none">>),
@ -135,7 +135,7 @@ t_tls_verify_none(Config) ->
ok.
t_tls_verify_peer(Config) ->
PoolName = <<"emqx_bridge_influxdb_connector_SUITE">>,
PoolName = <<"testpool-2">>,
Host = ?config(influxdb_tls_host, Config),
Port = ?config(influxdb_tls_port, Config),
InitialConfig = influxdb_config(Host, Port, true, <<"verify_peer">>),
@ -157,7 +157,11 @@ perform_tls_opts_check(PoolName, InitialConfig, VerifyReturn) ->
to_client_opts,
fun(Opts) ->
Verify = {verify_fun, {custom_verify(), {return, VerifyReturn}}},
[Verify | meck:passthrough([Opts])]
[
Verify,
{cacerts, public_key:cacerts_get()}
| meck:passthrough([Opts])
]
end
),
try

View File

@ -1,6 +1,6 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.8.0"}}}
{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}}
, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}}
, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}
, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_kafka, [
{description, "EMQX Enterprise Kafka Bridge"},
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{registered, [emqx_bridge_kafka_consumer_sup]},
{applications, [
kernel,

View File

@ -539,7 +539,7 @@ check_topic_and_leader_connections(ClientId, KafkaTopic) ->
kafka_client => ClientId,
kafka_topic => KafkaTopic
});
{error, restarting} ->
{error, client_supervisor_not_initialized} ->
throw(#{
reason => restarting,
kafka_client => ClientId,
@ -620,16 +620,19 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
partition_count_refresh_interval := PCntRefreshInterval,
max_inflight := MaxInflight,
buffer := #{
mode := BufferMode,
mode := BufferMode0,
per_partition_limit := PerPartitionLimit,
segment_bytes := SegmentBytes,
memory_overload_protection := MemOLP0
memory_overload_protection := MemOLP
}
} = Input,
MemOLP =
case os:type() of
{unix, linux} -> MemOLP0;
_ -> false
%% avoid creating dirs for probing producers
BufferMode =
case IsDryRun of
true ->
memory;
false ->
BufferMode0
end,
{OffloadMode, ReplayqDir} =
case BufferMode of
@ -638,7 +641,6 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
hybrid -> {true, replayq_dir(BridgeType, BridgeName)}
end,
#{
name => make_producer_name(BridgeType, BridgeName, IsDryRun),
partitioner => partitioner(PartitionStrategy),
partition_count_refresh_interval_seconds => PCntRefreshInterval,
replayq_dir => ReplayqDir,
@ -669,18 +671,6 @@ replayq_dir(BridgeType, BridgeName) ->
]),
filename:join([emqx:data_dir(), "kafka", DirName]).
%% Producer name must be an atom which will be used as an ETS table name for
%% partition worker lookup.
make_producer_name(_BridgeType, _BridgeName, true = _IsDryRun) ->
%% It is a dry run and we don't want to leak too many atoms,
%% so we use the default producer name instead of creating
%% a unique name.
probing_wolff_producers;
make_producer_name(BridgeType, BridgeName, _IsDryRun) ->
%% Wolff needs an atom for ets table name registration. The assumption here is
%% that bridges with new names are not often created.
binary_to_atom(iolist_to_binary([BridgeType, "_", bin(BridgeName)])).
with_log_at_error(Fun, Log) ->
try
Fun()

View File

@ -1,6 +1,6 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-2"}}}
{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0.3"}}}
, {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}}
, {emqx_bridge, {path, "../../apps/emqx_bridge"}}

Some files were not shown because too many files have changed in this diff Show More