Merge remote-tracking branch 'origin/master' into 0202-merge-release-50-back-to-master
This commit is contained in:
commit
94768c9f44
|
@ -25,7 +25,7 @@ jobs:
|
|||
prepare:
|
||||
runs-on: ubuntu-20.04
|
||||
# prepare source with any OTP version, no need for a matrix
|
||||
container: "ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-24.3.4.2-1-ubuntu20.04"
|
||||
container: "ghcr.io/emqx/emqx-builder/5.0-27:1.13.4-24.3.4.2-1-ubuntu20.04"
|
||||
|
||||
outputs:
|
||||
PROFILE: ${{ steps.get_profile.outputs.PROFILE }}
|
||||
|
@ -129,7 +129,7 @@ jobs:
|
|||
# NOTE: 'otp' and 'elixir' are to configure emqx-builder image
|
||||
# only support latest otp and elixir, not a matrix
|
||||
builder:
|
||||
- 5.0-26 # update to latest
|
||||
- 5.0-27 # update to latest
|
||||
otp:
|
||||
- 24.3.4.2-1 # switch to 25 once ready to release 5.1
|
||||
elixir:
|
||||
|
|
|
@ -23,7 +23,7 @@ on:
|
|||
jobs:
|
||||
prepare:
|
||||
runs-on: ubuntu-20.04
|
||||
container: ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-24.3.4.2-1-ubuntu20.04
|
||||
container: ghcr.io/emqx/emqx-builder/5.0-27:1.13.4-24.3.4.2-1-ubuntu20.04
|
||||
outputs:
|
||||
BUILD_PROFILE: ${{ steps.get_profile.outputs.BUILD_PROFILE }}
|
||||
IS_EXACT_TAG: ${{ steps.get_profile.outputs.IS_EXACT_TAG }}
|
||||
|
@ -204,6 +204,7 @@ jobs:
|
|||
- amd64
|
||||
- arm64
|
||||
os:
|
||||
- ubuntu22.04
|
||||
- ubuntu20.04
|
||||
- ubuntu18.04
|
||||
- debian11
|
||||
|
@ -215,7 +216,7 @@ jobs:
|
|||
- aws-arm64
|
||||
- ubuntu-20.04
|
||||
builder:
|
||||
- 5.0-26
|
||||
- 5.0-27
|
||||
elixir:
|
||||
- 1.13.4
|
||||
exclude:
|
||||
|
@ -227,17 +228,17 @@ jobs:
|
|||
- profile: emqx
|
||||
otp: 25.1.2-2
|
||||
arch: amd64
|
||||
os: ubuntu20.04
|
||||
build_machine: ubuntu-20.04
|
||||
builder: 5.0-26
|
||||
os: ubuntu22.04
|
||||
build_machine: ubuntu-22.04
|
||||
builder: 5.0-27
|
||||
elixir: 1.13.4
|
||||
release_with: elixir
|
||||
- profile: emqx
|
||||
otp: 25.1.2-2
|
||||
arch: amd64
|
||||
os: amzn2
|
||||
build_machine: ubuntu-20.04
|
||||
builder: 5.0-26
|
||||
build_machine: ubuntu-22.04
|
||||
builder: 5.0-27
|
||||
elixir: 1.13.4
|
||||
release_with: elixir
|
||||
|
||||
|
|
|
@ -30,10 +30,12 @@ jobs:
|
|||
matrix:
|
||||
profile:
|
||||
- ["emqx", "24.3.4.2-1", "el7"]
|
||||
- ["emqx", "25.1.2-2", "ubuntu20.04"]
|
||||
- ["emqx", "24.3.4.2-1", "ubuntu20.04"]
|
||||
- ["emqx", "25.1.2-2", "ubuntu22.04"]
|
||||
- ["emqx-enterprise", "24.3.4.2-1", "ubuntu20.04"]
|
||||
- ["emqx-enterprise", "25.1.2-2", "ubuntu22.04"]
|
||||
builder:
|
||||
- 5.0-26
|
||||
- 5.0-27
|
||||
elixir:
|
||||
- 1.13.4
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ on: [pull_request, push]
|
|||
jobs:
|
||||
check_deps_integrity:
|
||||
runs-on: ubuntu-20.04
|
||||
container: ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-25.1.2-2-ubuntu20.04
|
||||
container: ghcr.io/emqx/emqx-builder/5.0-27:1.13.4-25.1.2-2-ubuntu20.04
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
|
|
@ -5,7 +5,7 @@ on: [pull_request]
|
|||
jobs:
|
||||
code_style_check:
|
||||
runs-on: ubuntu-20.04
|
||||
container: "ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-25.1.2-2-ubuntu20.04"
|
||||
container: "ghcr.io/emqx/emqx-builder/5.0-27:1.13.4-25.1.2-2-ubuntu20.04"
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
with:
|
||||
|
|
|
@ -8,7 +8,7 @@ jobs:
|
|||
elixir_apps_check:
|
||||
runs-on: ubuntu-latest
|
||||
# just use the latest builder
|
||||
container: "ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-25.1.2-2-ubuntu20.04"
|
||||
container: "ghcr.io/emqx/emqx-builder/5.0-27:1.13.4-25.1.2-2-ubuntu20.04"
|
||||
|
||||
strategy:
|
||||
fail-fast: false
|
||||
|
|
|
@ -7,7 +7,7 @@ on: [pull_request, push]
|
|||
jobs:
|
||||
elixir_deps_check:
|
||||
runs-on: ubuntu-20.04
|
||||
container: ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-25.1.2-2-ubuntu20.04
|
||||
container: ghcr.io/emqx/emqx-builder/5.0-27:1.13.4-25.1.2-2-ubuntu20.04
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
|
|
|
@ -17,7 +17,7 @@ jobs:
|
|||
profile:
|
||||
- emqx
|
||||
- emqx-enterprise
|
||||
container: ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-25.1.2-2-ubuntu20.04
|
||||
container: ghcr.io/emqx/emqx-builder/5.0-27:1.13.4-25.1.2-2-ubuntu20.04
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
|
|
|
@ -12,7 +12,7 @@ jobs:
|
|||
strategy:
|
||||
matrix:
|
||||
builder:
|
||||
- 5.0-26
|
||||
- 5.0-27
|
||||
otp:
|
||||
- 24.3.4.2-1
|
||||
- 25.1.2-2
|
||||
|
|
|
@ -16,7 +16,7 @@ jobs:
|
|||
prepare:
|
||||
runs-on: ubuntu-20.04
|
||||
# prepare source with any OTP version, no need for a matrix
|
||||
container: ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-24.3.4.2-1-alpine3.15.1
|
||||
container: ghcr.io/emqx/emqx-builder/5.0-27:1.13.4-24.3.4.2-1-alpine3.15.1
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
@ -49,7 +49,7 @@ jobs:
|
|||
os:
|
||||
- ["alpine3.15.1", "alpine:3.15.1"]
|
||||
builder:
|
||||
- 5.0-26
|
||||
- 5.0-27
|
||||
otp:
|
||||
- 24.3.4.2-1
|
||||
elixir:
|
||||
|
@ -122,7 +122,7 @@ jobs:
|
|||
os:
|
||||
- ["debian11", "debian:11-slim"]
|
||||
builder:
|
||||
- 5.0-26
|
||||
- 5.0-27
|
||||
otp:
|
||||
- 24.3.4.2-1
|
||||
elixir:
|
||||
|
|
|
@ -15,7 +15,7 @@ on:
|
|||
jobs:
|
||||
relup_test_plan:
|
||||
runs-on: ubuntu-20.04
|
||||
container: "ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-24.3.4.2-1-ubuntu20.04"
|
||||
container: "ghcr.io/emqx/emqx-builder/5.0-27:1.13.4-24.3.4.2-1-ubuntu20.04"
|
||||
outputs:
|
||||
CUR_EE_VSN: ${{ steps.find-versions.outputs.CUR_EE_VSN }}
|
||||
OLD_VERSIONS: ${{ steps.find-versions.outputs.OLD_VERSIONS }}
|
||||
|
|
|
@ -30,12 +30,12 @@ jobs:
|
|||
MATRIX="$(echo "${APPS}" | jq -c '
|
||||
[
|
||||
(.[] | select(.profile == "emqx") | . + {
|
||||
builder: "5.0-26",
|
||||
builder: "5.0-27",
|
||||
otp: "25.1.2-2",
|
||||
elixir: "1.13.4"
|
||||
}),
|
||||
(.[] | select(.profile == "emqx-enterprise") | . + {
|
||||
builder: "5.0-26",
|
||||
builder: "5.0-27",
|
||||
otp: ["24.3.4.2-1", "25.1.2-2"][],
|
||||
elixir: "1.13.4"
|
||||
})
|
||||
|
@ -223,7 +223,7 @@ jobs:
|
|||
- ct
|
||||
- ct_docker
|
||||
runs-on: ubuntu-20.04
|
||||
container: "ghcr.io/emqx/emqx-builder/5.0-26:1.13.4-24.3.4.2-1-ubuntu20.04"
|
||||
container: "ghcr.io/emqx/emqx-builder/5.0-27:1.13.4-24.3.4.2-1-ubuntu20.04"
|
||||
steps:
|
||||
- uses: AutoModality/action-clean@v1
|
||||
- uses: actions/download-artifact@v3
|
||||
|
|
|
@ -196,13 +196,13 @@ do_unsubscribe(Topic, SubPid, SubOpts) ->
|
|||
true = ets:delete(?SUBOPTION, {Topic, SubPid}),
|
||||
true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
|
||||
Group = maps:get(share, SubOpts, undefined),
|
||||
do_unsubscribe(Group, Topic, SubPid, SubOpts),
|
||||
emqx_exclusive_subscription:unsubscribe(Topic, SubOpts).
|
||||
do_unsubscribe(Group, Topic, SubPid, SubOpts).
|
||||
|
||||
do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
|
||||
case maps:get(shard, SubOpts, 0) of
|
||||
0 ->
|
||||
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
|
||||
emqx_exclusive_subscription:unsubscribe(Topic, SubOpts),
|
||||
cast(pick(Topic), {unsubscribed, Topic});
|
||||
I ->
|
||||
true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
|
||||
|
@ -366,14 +366,7 @@ subscriber_down(SubPid) ->
|
|||
SubOpts when is_map(SubOpts) ->
|
||||
_ = emqx_broker_helper:reclaim_seq(Topic),
|
||||
true = ets:delete(?SUBOPTION, {Topic, SubPid}),
|
||||
case maps:get(shard, SubOpts, 0) of
|
||||
0 ->
|
||||
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
|
||||
ok = cast(pick(Topic), {unsubscribed, Topic});
|
||||
I ->
|
||||
true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
|
||||
ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
|
||||
end;
|
||||
do_unsubscribe(undefined, Topic, SubPid, SubOpts);
|
||||
undefined ->
|
||||
ok
|
||||
end
|
||||
|
|
|
@ -32,7 +32,8 @@
|
|||
|
||||
-export([
|
||||
check_subscribe/2,
|
||||
unsubscribe/2
|
||||
unsubscribe/2,
|
||||
clear/0
|
||||
]).
|
||||
|
||||
%% Internal exports (RPC)
|
||||
|
@ -77,7 +78,7 @@ on_add_module() ->
|
|||
mnesia(boot).
|
||||
|
||||
on_delete_module() ->
|
||||
mria:clear_table(?EXCLUSIVE_SHARD).
|
||||
clear().
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
|
@ -101,6 +102,9 @@ unsubscribe(Topic, #{is_exclusive := true}) ->
|
|||
unsubscribe(_Topic, _SubOpts) ->
|
||||
ok.
|
||||
|
||||
clear() ->
|
||||
mria:clear_table(?TAB).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -0,0 +1,159 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_exclusive_sub_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(EXCLUSIVE_TOPIC, <<"$exclusive/t/1">>).
|
||||
-define(NORMAL_TOPIC, <<"t/1">>).
|
||||
|
||||
-define(CHECK_SUB(Client, Code), ?CHECK_SUB(Client, ?EXCLUSIVE_TOPIC, Code)).
|
||||
-define(CHECK_SUB(Client, Topic, Code),
|
||||
{ok, _, [Code]} = emqtt:subscribe(Client, Topic, [])
|
||||
).
|
||||
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
ok = ekka:start(),
|
||||
OldConf = emqx:get_config([zones], #{}),
|
||||
emqx_config:put_zone_conf(default, [mqtt, exclusive_subscription], true),
|
||||
timer:sleep(50),
|
||||
[{old_conf, OldConf} | Config].
|
||||
|
||||
end_per_suite(Config) ->
|
||||
emqx_config:put([zones], proplists:get_value(old_conf, Config)),
|
||||
ekka:stop(),
|
||||
mria:stop(),
|
||||
mria_mnesia:delete_schema(),
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
end_per_testcase(_TestCase, _Config) ->
|
||||
emqx_exclusive_subscription:clear().
|
||||
|
||||
t_exclusive_sub(_) ->
|
||||
{ok, C1} = emqtt:start_link([
|
||||
{clientid, <<"client1">>},
|
||||
{clean_start, false},
|
||||
{proto_ver, v5},
|
||||
{properties, #{'Session-Expiry-Interval' => 100}}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
?CHECK_SUB(C1, 0),
|
||||
|
||||
{ok, C2} = emqtt:start_link([
|
||||
{clientid, <<"client2">>},
|
||||
{clean_start, false},
|
||||
{proto_ver, v5}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C2),
|
||||
?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
|
||||
|
||||
%% keep exclusive even disconnected
|
||||
ok = emqtt:disconnect(C1),
|
||||
timer:sleep(1000),
|
||||
|
||||
?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
|
||||
|
||||
ok = emqtt:disconnect(C2).
|
||||
|
||||
t_allow_normal_sub(_) ->
|
||||
{ok, C1} = emqtt:start_link([
|
||||
{clientid, <<"client1">>},
|
||||
{proto_ver, v5}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
?CHECK_SUB(C1, 0),
|
||||
|
||||
{ok, C2} = emqtt:start_link([
|
||||
{clientid, <<"client2">>},
|
||||
{proto_ver, v5}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C2),
|
||||
?CHECK_SUB(C2, ?NORMAL_TOPIC, 0),
|
||||
|
||||
ok = emqtt:disconnect(C1),
|
||||
ok = emqtt:disconnect(C2).
|
||||
|
||||
t_unsub(_) ->
|
||||
{ok, C1} = emqtt:start_link([
|
||||
{clientid, <<"client1">>},
|
||||
{proto_ver, v5}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
?CHECK_SUB(C1, 0),
|
||||
|
||||
{ok, C2} = emqtt:start_link([
|
||||
{clientid, <<"client2">>},
|
||||
{proto_ver, v5}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C2),
|
||||
?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
|
||||
|
||||
{ok, #{}, [0]} = emqtt:unsubscribe(C1, ?EXCLUSIVE_TOPIC),
|
||||
|
||||
?CHECK_SUB(C2, 0),
|
||||
|
||||
ok = emqtt:disconnect(C1),
|
||||
ok = emqtt:disconnect(C2).
|
||||
|
||||
t_clean_session(_) ->
|
||||
erlang:process_flag(trap_exit, true),
|
||||
{ok, C1} = emqtt:start_link([
|
||||
{clientid, <<"client1">>},
|
||||
{clean_start, true},
|
||||
{proto_ver, v5},
|
||||
{properties, #{'Session-Expiry-Interval' => 0}}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
?CHECK_SUB(C1, 0),
|
||||
|
||||
{ok, C2} = emqtt:start_link([
|
||||
{clientid, <<"client2">>},
|
||||
{proto_ver, v5}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C2),
|
||||
?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
|
||||
|
||||
%% auto clean when session was cleand
|
||||
ok = emqtt:disconnect(C1),
|
||||
|
||||
timer:sleep(1000),
|
||||
|
||||
?CHECK_SUB(C2, 0),
|
||||
|
||||
ok = emqtt:disconnect(C2).
|
||||
|
||||
t_feat_disabled(_) ->
|
||||
OldConf = emqx:get_config([zones], #{}),
|
||||
emqx_config:put_zone_conf(default, [mqtt, exclusive_subscription], false),
|
||||
|
||||
{ok, C1} = emqtt:start_link([
|
||||
{clientid, <<"client1">>},
|
||||
{proto_ver, v5}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
?CHECK_SUB(C1, ?RC_TOPIC_FILTER_INVALID),
|
||||
ok = emqtt:disconnect(C1),
|
||||
|
||||
emqx_config:put([zones], OldConf).
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_authz, [
|
||||
{description, "An OTP application"},
|
||||
{vsn, "0.1.12"},
|
||||
{vsn, "0.1.13"},
|
||||
{registered, []},
|
||||
{mod, {emqx_authz_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -100,15 +100,17 @@ compile_topic(<<"eq ", Topic/binary>>) ->
|
|||
compile_topic({eq, Topic}) ->
|
||||
{eq, emqx_topic:words(bin(Topic))};
|
||||
compile_topic(Topic) ->
|
||||
Words = emqx_topic:words(bin(Topic)),
|
||||
case pattern(Words) of
|
||||
true -> {pattern, Words};
|
||||
false -> Words
|
||||
TopicBin = bin(Topic),
|
||||
case
|
||||
emqx_placeholder:preproc_tmpl(
|
||||
TopicBin,
|
||||
#{placeholders => [?PH_USERNAME, ?PH_CLIENTID]}
|
||||
)
|
||||
of
|
||||
[{str, _}] -> emqx_topic:words(TopicBin);
|
||||
Tokens -> {pattern, Tokens}
|
||||
end.
|
||||
|
||||
pattern(Words) ->
|
||||
lists:member(?PH_USERNAME, Words) orelse lists:member(?PH_CLIENTID, Words).
|
||||
|
||||
atom(B) when is_binary(B) ->
|
||||
try
|
||||
binary_to_existing_atom(B, utf8)
|
||||
|
@ -202,8 +204,8 @@ match_who(_, _) ->
|
|||
match_topics(_ClientInfo, _Topic, []) ->
|
||||
false;
|
||||
match_topics(ClientInfo, Topic, [{pattern, PatternFilter} | Filters]) ->
|
||||
TopicFilter = feed_var(ClientInfo, PatternFilter),
|
||||
match_topic(emqx_topic:words(Topic), TopicFilter) orelse
|
||||
TopicFilter = emqx_placeholder:proc_tmpl(PatternFilter, ClientInfo),
|
||||
match_topic(emqx_topic:words(Topic), emqx_topic:words(TopicFilter)) orelse
|
||||
match_topics(ClientInfo, Topic, Filters);
|
||||
match_topics(ClientInfo, Topic, [TopicFilter | Filters]) ->
|
||||
match_topic(emqx_topic:words(Topic), TopicFilter) orelse
|
||||
|
@ -213,18 +215,3 @@ match_topic(Topic, {'eq', TopicFilter}) ->
|
|||
Topic =:= TopicFilter;
|
||||
match_topic(Topic, TopicFilter) ->
|
||||
emqx_topic:match(Topic, TopicFilter).
|
||||
|
||||
feed_var(ClientInfo, Pattern) ->
|
||||
feed_var(ClientInfo, Pattern, []).
|
||||
feed_var(_ClientInfo, [], Acc) ->
|
||||
lists:reverse(Acc);
|
||||
feed_var(ClientInfo = #{clientid := undefined}, [?PH_CLIENTID | Words], Acc) ->
|
||||
feed_var(ClientInfo, Words, [?PH_CLIENTID | Acc]);
|
||||
feed_var(ClientInfo = #{clientid := ClientId}, [?PH_CLIENTID | Words], Acc) ->
|
||||
feed_var(ClientInfo, Words, [ClientId | Acc]);
|
||||
feed_var(ClientInfo = #{username := undefined}, [?PH_USERNAME | Words], Acc) ->
|
||||
feed_var(ClientInfo, Words, [?PH_USERNAME | Acc]);
|
||||
feed_var(ClientInfo = #{username := Username}, [?PH_USERNAME | Words], Acc) ->
|
||||
feed_var(ClientInfo, Words, [Username | Acc]);
|
||||
feed_var(ClientInfo, [W | Words], Acc) ->
|
||||
feed_var(ClientInfo, Words, [W | Acc]).
|
||||
|
|
|
@ -35,6 +35,7 @@
|
|||
]},
|
||||
publish, [?PH_S_USERNAME, ?PH_S_CLIENTID]}
|
||||
).
|
||||
-define(SOURCE6, {allow, {username, "test"}, publish, ["t/foo${username}boo"]}).
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
@ -80,7 +81,7 @@ t_compile(_) ->
|
|||
{{127, 0, 0, 1}, {127, 0, 0, 1}, 32},
|
||||
{{192, 168, 1, 0}, {192, 168, 1, 255}, 24}
|
||||
]},
|
||||
subscribe, [{pattern, [?PH_CLIENTID]}]},
|
||||
subscribe, [{pattern, [{var, {var, <<"clientid">>}}]}]},
|
||||
emqx_authz_rule:compile(?SOURCE3)
|
||||
),
|
||||
|
||||
|
@ -97,9 +98,18 @@ t_compile(_) ->
|
|||
{username, {re_pattern, _, _, _, _}},
|
||||
{clientid, {re_pattern, _, _, _, _}}
|
||||
]},
|
||||
publish, [{pattern, [?PH_USERNAME]}, {pattern, [?PH_CLIENTID]}]},
|
||||
publish, [
|
||||
{pattern, [{var, {var, <<"username">>}}]}, {pattern, [{var, {var, <<"clientid">>}}]}
|
||||
]},
|
||||
emqx_authz_rule:compile(?SOURCE5)
|
||||
),
|
||||
|
||||
?assertEqual(
|
||||
{allow, {username, {eq, <<"test">>}}, publish, [
|
||||
{pattern, [{str, <<"t/foo">>}, {var, {var, <<"username">>}}, {str, <<"boo">>}]}
|
||||
]},
|
||||
emqx_authz_rule:compile(?SOURCE6)
|
||||
),
|
||||
ok.
|
||||
|
||||
t_match(_) ->
|
||||
|
@ -307,4 +317,24 @@ t_match(_) ->
|
|||
emqx_authz_rule:compile(?SOURCE5)
|
||||
)
|
||||
),
|
||||
|
||||
?assertEqual(
|
||||
nomatch,
|
||||
emqx_authz_rule:match(
|
||||
ClientInfo1,
|
||||
publish,
|
||||
<<"t/foo${username}boo">>,
|
||||
emqx_authz_rule:compile(?SOURCE6)
|
||||
)
|
||||
),
|
||||
|
||||
?assertEqual(
|
||||
{matched, allow},
|
||||
emqx_authz_rule:match(
|
||||
ClientInfo4,
|
||||
publish,
|
||||
<<"t/footestboo">>,
|
||||
emqx_authz_rule:compile(?SOURCE6)
|
||||
)
|
||||
),
|
||||
ok.
|
||||
|
|
|
@ -96,6 +96,16 @@ The configuration is only valid when the inet6 is true."""
|
|||
zh: "IPv6 only"
|
||||
}
|
||||
}
|
||||
proxy_header {
|
||||
desc {
|
||||
en: "Enable support for `HAProxy` header. Be aware once enabled regular HTTP requests can't be handled anymore."
|
||||
zh: "开启对 `HAProxy` 的支持,注意:一旦开启了这个功能,就无法再处理普通的 HTTP 请求了。"
|
||||
}
|
||||
label: {
|
||||
en: "Enable support for HAProxy header"
|
||||
zh: "开启对 `HAProxy` 的支持"
|
||||
}
|
||||
}
|
||||
desc_dashboard {
|
||||
desc {
|
||||
en: "Configuration for EMQX dashboard."
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
{application, emqx_dashboard, [
|
||||
{description, "EMQX Web Dashboard"},
|
||||
% strict semver, bump manually!
|
||||
{vsn, "5.0.12"},
|
||||
{vsn, "5.0.13"},
|
||||
{modules, []},
|
||||
{registered, [emqx_dashboard_sup]},
|
||||
{applications, [kernel, stdlib, mnesia, minirest, emqx]},
|
||||
|
|
|
@ -92,8 +92,8 @@ start_listeners(Listeners) ->
|
|||
},
|
||||
Res =
|
||||
lists:foldl(
|
||||
fun({Name, Protocol, Bind, RanchOptions}, Acc) ->
|
||||
Minirest = BaseMinirest#{protocol => Protocol},
|
||||
fun({Name, Protocol, Bind, RanchOptions, ProtoOpts}, Acc) ->
|
||||
Minirest = BaseMinirest#{protocol => Protocol, protocol_options => ProtoOpts},
|
||||
case minirest:start(Name, RanchOptions, Minirest) of
|
||||
{ok, _} ->
|
||||
?ULOG("Listener ~ts on ~ts started.~n", [
|
||||
|
@ -125,7 +125,7 @@ stop_listeners(Listeners) ->
|
|||
?SLOG(warning, #{msg => "stop_listener_failed", name => Name, port => Port})
|
||||
end
|
||||
end
|
||||
|| {Name, _, Port, _} <- listeners(Listeners)
|
||||
|| {Name, _, Port, _, _} <- listeners(Listeners)
|
||||
],
|
||||
ok.
|
||||
|
||||
|
@ -164,7 +164,13 @@ listeners(Listeners) ->
|
|||
maps:get(enable, Conf) andalso
|
||||
begin
|
||||
{Conf1, Bind} = ip_port(Conf),
|
||||
{true, {listener_name(Protocol), Protocol, Bind, ranch_opts(Conf1)}}
|
||||
{true, {
|
||||
listener_name(Protocol),
|
||||
Protocol,
|
||||
Bind,
|
||||
ranch_opts(Conf1),
|
||||
proto_opts(Conf1)
|
||||
}}
|
||||
end
|
||||
end,
|
||||
maps:to_list(Listeners)
|
||||
|
@ -197,7 +203,7 @@ ranch_opts(Options) ->
|
|||
SocketOpts = maps:fold(
|
||||
fun filter_false/3,
|
||||
[],
|
||||
maps:without([enable, inet6, ipv6_v6only | Keys], Options)
|
||||
maps:without([enable, inet6, ipv6_v6only, proxy_header | Keys], Options)
|
||||
),
|
||||
InetOpts =
|
||||
case Options of
|
||||
|
@ -210,6 +216,9 @@ ranch_opts(Options) ->
|
|||
end,
|
||||
RanchOpts#{socket_opts => InetOpts ++ SocketOpts}.
|
||||
|
||||
proto_opts(Options) ->
|
||||
maps:with([proxy_header], Options).
|
||||
|
||||
filter_false(_K, false, S) -> S;
|
||||
filter_false(K, V, S) -> [{K, V} | S].
|
||||
|
||||
|
|
|
@ -160,6 +160,14 @@ common_listener_fields() ->
|
|||
default => false,
|
||||
desc => ?DESC(ipv6_v6only)
|
||||
}
|
||||
)},
|
||||
{"proxy_header",
|
||||
?HOCON(
|
||||
boolean(),
|
||||
#{
|
||||
desc => ?DESC(proxy_header),
|
||||
default => false
|
||||
}
|
||||
)}
|
||||
].
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
-export([
|
||||
set_default_config/0,
|
||||
set_default_config/1,
|
||||
set_default_config/2,
|
||||
request/2,
|
||||
request/3,
|
||||
request/4,
|
||||
|
@ -36,6 +37,9 @@ set_default_config() ->
|
|||
set_default_config(<<"admin">>).
|
||||
|
||||
set_default_config(DefaultUsername) ->
|
||||
set_default_config(DefaultUsername, false).
|
||||
|
||||
set_default_config(DefaultUsername, HAProxyEnabled) ->
|
||||
Config = #{
|
||||
listeners => #{
|
||||
http => #{
|
||||
|
@ -46,7 +50,8 @@ set_default_config(DefaultUsername) ->
|
|||
max_connections => 512,
|
||||
num_acceptors => 4,
|
||||
send_timeout => 5000,
|
||||
backlog => 512
|
||||
backlog => 512,
|
||||
proxy_header => HAProxyEnabled
|
||||
}
|
||||
},
|
||||
default_username => DefaultUsername,
|
||||
|
|
|
@ -0,0 +1,100 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_dashboard_haproxy_SUITE).
|
||||
|
||||
-compile(nowarn_export_all).
|
||||
-compile(export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include("emqx_dashboard.hrl").
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_common_test_helpers:start_apps(
|
||||
[emqx_management, emqx_dashboard],
|
||||
fun set_special_configs/1
|
||||
),
|
||||
Config.
|
||||
|
||||
set_special_configs(emqx_dashboard) ->
|
||||
emqx_dashboard_api_test_helpers:set_default_config(<<"admin">>, true),
|
||||
ok;
|
||||
set_special_configs(_) ->
|
||||
ok.
|
||||
|
||||
end_per_suite(Config) ->
|
||||
application:unload(emqx_management),
|
||||
mnesia:clear_table(?ADMIN),
|
||||
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_management]),
|
||||
mria:stop(),
|
||||
Config.
|
||||
|
||||
t_status(_Config) ->
|
||||
ProxyInfo = #{
|
||||
version => 1,
|
||||
command => proxy,
|
||||
transport_family => ipv4,
|
||||
transport_protocol => stream,
|
||||
src_address => {127, 0, 0, 1},
|
||||
src_port => 444,
|
||||
dest_address => {192, 168, 0, 1},
|
||||
dest_port => 443
|
||||
},
|
||||
{ok, Socket} = gen_tcp:connect(
|
||||
"localhost",
|
||||
18083,
|
||||
[binary, {active, false}, {packet, raw}]
|
||||
),
|
||||
ok = gen_tcp:send(Socket, ranch_proxy_header:header(ProxyInfo)),
|
||||
{ok, Token} = emqx_dashboard_admin:sign_token(<<"admin">>, <<"public">>),
|
||||
ok = gen_tcp:send(
|
||||
Socket,
|
||||
"GET /status HTTP/1.1\r\n"
|
||||
"Host: localhost\r\n"
|
||||
"Authorization: Bearer " ++ binary_to_list(Token) ++
|
||||
"\r\n"
|
||||
"\r\n"
|
||||
),
|
||||
{_, 200, _, Rest0} = cow_http:parse_status_line(raw_recv_head(Socket)),
|
||||
{Headers, Body0} = cow_http:parse_headers(Rest0),
|
||||
{_, LenBin} = lists:keyfind(<<"content-length">>, 1, Headers),
|
||||
Len = binary_to_integer(LenBin),
|
||||
Body =
|
||||
if
|
||||
byte_size(Body0) =:= Len ->
|
||||
Body0;
|
||||
true ->
|
||||
{ok, Body1} = gen_tcp:recv(Socket, Len - byte_size(Body0), 5000),
|
||||
<<Body0/bits, Body1/bits>>
|
||||
end,
|
||||
?assertMatch({match, _}, re:run(Body, "Node .+ is started\nemqx is running")),
|
||||
ok.
|
||||
|
||||
raw_recv_head(Socket) ->
|
||||
{ok, Data} = gen_tcp:recv(Socket, 0, 10000),
|
||||
raw_recv_head(Socket, Data).
|
||||
|
||||
raw_recv_head(Socket, Buffer) ->
|
||||
case binary:match(Buffer, <<"\r\n\r\n">>) of
|
||||
nomatch ->
|
||||
{ok, Data} = gen_tcp:recv(Socket, 0, 10000),
|
||||
raw_recv_head(Socket, <<Buffer/binary, Data/binary>>);
|
||||
{_, _} ->
|
||||
Buffer
|
||||
end.
|
|
@ -229,6 +229,14 @@ t_banned_delayed(_) ->
|
|||
}),
|
||||
|
||||
snabbkaffe:start_trace(),
|
||||
{ok, SubRef} =
|
||||
snabbkaffe:subscribe(
|
||||
?match_event(#{?snk_kind := ignore_delayed_message_publish}),
|
||||
_NEvents = 2,
|
||||
_Timeout = 10000,
|
||||
0
|
||||
),
|
||||
|
||||
lists:foreach(
|
||||
fun(ClientId) ->
|
||||
Msg = emqx_message:make(ClientId, <<"$delayed/1/bc">>, <<"payload">>),
|
||||
|
@ -237,8 +245,7 @@ t_banned_delayed(_) ->
|
|||
[ClientId1, ClientId1, ClientId1, ClientId2, ClientId2]
|
||||
),
|
||||
|
||||
timer:sleep(2000),
|
||||
Trace = snabbkaffe:collect_trace(),
|
||||
{ok, Trace} = snabbkaffe:receive_events(SubRef),
|
||||
snabbkaffe:stop(),
|
||||
emqx_banned:delete(Who),
|
||||
mnesia:clear_table(emqx_delayed),
|
||||
|
|
|
@ -687,11 +687,19 @@ t_deliver_when_banned(_) ->
|
|||
}),
|
||||
|
||||
timer:sleep(100),
|
||||
snabbkaffe:start_trace(),
|
||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, [{qos, 0}, {rh, 0}]),
|
||||
timer:sleep(500),
|
||||
|
||||
Trace = snabbkaffe:collect_trace(),
|
||||
snabbkaffe:start_trace(),
|
||||
{ok, SubRef} =
|
||||
snabbkaffe:subscribe(
|
||||
?match_event(#{?snk_kind := ignore_retained_message_deliver}),
|
||||
_NEvents = 3,
|
||||
_Timeout = 10000,
|
||||
0
|
||||
),
|
||||
|
||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, [{qos, 0}, {rh, 0}]),
|
||||
|
||||
{ok, Trace} = snabbkaffe:receive_events(SubRef),
|
||||
?assertEqual(3, length(?of_kind(ignore_retained_message_deliver, Trace))),
|
||||
snabbkaffe:stop(),
|
||||
emqx_banned:delete(Who),
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Support HAProxy protocol for dashboard API.
|
|
@ -0,0 +1 @@
|
|||
现在 dashboard 增加了对 `HAProxy` 协议的支持。
|
|
@ -0,0 +1,3 @@
|
|||
Allow the placeholder to be anywhere in the topic for `authz` rules.
|
||||
e.g:
|
||||
`{allow, {username, "who"}, publish, ["t/foo${username}boo/${clientid}xxx"]}.`
|
|
@ -0,0 +1,3 @@
|
|||
允许占位符出现在 `authz` 规则中的主题里的任意位置。
|
||||
例如:
|
||||
`{allow, {username, "who"}, publish, ["t/foo${username}boo/${clientid}xxx"]}.`
|
|
@ -0,0 +1 @@
|
|||
Fix the exclusive topics aren't removed when the session has already been cleaned.
|
|
@ -0,0 +1 @@
|
|||
修复会话清除后相关的排他订阅主题没有被清理的问题。
|
2
mix.exs
2
mix.exs
|
@ -56,7 +56,7 @@ defmodule EMQXUmbrella.MixProject do
|
|||
{:ekka, github: "emqx/ekka", tag: "0.13.9", override: true},
|
||||
{:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
|
||||
{:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
|
||||
{:minirest, github: "emqx/minirest", tag: "1.3.7", override: true},
|
||||
{:minirest, github: "emqx/minirest", tag: "1.3.8", override: true},
|
||||
{:ecpool, github: "emqx/ecpool", tag: "0.5.3", override: true},
|
||||
{:replayq, github: "emqx/replayq", tag: "0.3.7", override: true},
|
||||
{:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
|
||||
|
|
|
@ -58,7 +58,7 @@
|
|||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.9"}}}
|
||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
|
||||
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
|
||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}}
|
||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.8"}}}
|
||||
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.3"}}}
|
||||
, {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
|
||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
|
||||
|
|
Loading…
Reference in New Issue