Merge remote-tracking branch 'origin/release-v44' into 1005-sync-v44-upstreams
This commit is contained in:
commit
28b1a4e6f1
|
@ -136,7 +136,10 @@ jobs:
|
|||
pkg_name=$(find _packages/${EMQX_NAME} -mindepth 1 -maxdepth 1 -iname \*.zip)
|
||||
unzip -q $pkg_name
|
||||
gsed -i '/emqx_telemetry/d' ./emqx/data/loaded_plugins
|
||||
./emqx/bin/emqx start || cat emqx/log/erlang.log.1
|
||||
# test with a spaces in path
|
||||
mv ./emqx "./emqx home/"
|
||||
cd "./emqx home/"
|
||||
./bin/emqx start || cat log/erlang.log.1
|
||||
ready='no'
|
||||
for i in {1..10}; do
|
||||
if curl -fs 127.0.0.1:18083 > /dev/null; then
|
||||
|
@ -147,12 +150,13 @@ jobs:
|
|||
done
|
||||
if [ "$ready" != "yes" ]; then
|
||||
echo "Timed out waiting for emqx to be ready"
|
||||
cat emqx/log/erlang.log.1
|
||||
cat log/erlang.log.1
|
||||
exit 1
|
||||
fi
|
||||
./emqx/bin/emqx_ctl status
|
||||
./emqx/bin/emqx stop
|
||||
rm -rf emqx
|
||||
./bin/emqx_ctl status
|
||||
./bin/emqx stop
|
||||
cd ..
|
||||
rm -rf "emqx home"
|
||||
- uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: macos
|
||||
|
|
|
@ -12,6 +12,8 @@ File format:
|
|||
|
||||
## v4.3.22
|
||||
|
||||
### Minor changes
|
||||
|
||||
## v4.3.21
|
||||
|
||||
### Enhancements
|
||||
|
@ -25,13 +27,27 @@ File format:
|
|||
- TLS listener default buffer size to 4KB [#9007](https://github.com/emqx/emqx/pull/9007)
|
||||
Eliminate uncertainty that the buffer size is set by OS default.
|
||||
|
||||
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
|
||||
|
||||
- Disable authorization for `api/v4/emqx_prometheus` endpoint. [8955](https://github.com/emqx/emqx/pull/8955)
|
||||
|
||||
- Added a test to prevent a last will testament message to be
|
||||
published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894)
|
||||
|
||||
### Bug fixes
|
||||
|
||||
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
|
||||
|
||||
- Hide redis password in error logs [#9071](https://github.com/emqx/emqx/pull/9071)
|
||||
In this change, it also included more changes in redis client:
|
||||
- Improve redis connection error logging [eredis:19](https://github.com/emqx/eredis/pull/19).
|
||||
Also added support for eredis to accept an anonymous function as password instead of
|
||||
passing around plaintext args which may get dumpped to crash logs (hard to predict where).
|
||||
This change also added `format_status` callback for `gen_server` states which hold plaintext
|
||||
password so the process termination log and `sys:get_status` will print '******' instead of
|
||||
the password to console.
|
||||
- Avoid pool name clashing [eredis_cluster#22](https://github.com/emqx/eredis_cluster/pull/22)
|
||||
Same `format_status` callback is added here too for `gen_server`s which hold password in
|
||||
their state.
|
||||
|
||||
## v4.3.20
|
||||
|
||||
### Bug fixes
|
||||
|
|
|
@ -1,5 +1,11 @@
|
|||
# EMQX 4.4 Changes
|
||||
|
||||
## v4.4.10
|
||||
|
||||
### Bug fixes
|
||||
|
||||
- Fix the latency statistics error of the slow subscription module when `stats_type` is `internal` or `response`. [#8981](https://github.com/emqx/emqx/pull/8981)
|
||||
|
||||
## v4.4.9
|
||||
|
||||
### Bug fixes (synced from v4.3.20)
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
-compile(export_all).
|
||||
|
||||
-include("emqx_auth_mnesia.hrl").
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
@ -78,15 +79,37 @@ init_per_testcase_migration(_, Config) ->
|
|||
emqx_acl_mnesia_migrator:migrate_records(),
|
||||
Config.
|
||||
|
||||
init_per_testcase_other(t_last_will_testament_message_check_acl, Config) ->
|
||||
OriginalACLNoMatch = application:get_env(emqx, acl_nomatch),
|
||||
application:set_env(emqx, acl_nomatch, deny),
|
||||
emqx_mod_acl_internal:unload([]),
|
||||
%% deny all for this client
|
||||
ClientID = <<"lwt_client">>,
|
||||
ok = emqx_acl_mnesia_db:add_acl({clientid, ClientID}, <<"#">>, pubsub, deny),
|
||||
[ {original_acl_nomatch, OriginalACLNoMatch}
|
||||
, {clientid, ClientID}
|
||||
| Config];
|
||||
init_per_testcase_other(_TestCase, Config) ->
|
||||
Config.
|
||||
|
||||
init_per_testcase(Case, Config) ->
|
||||
PerTestInitializers = [
|
||||
fun init_per_testcase_clean/2,
|
||||
fun init_per_testcase_migration/2,
|
||||
fun init_per_testcase_emqx_hook/2
|
||||
fun init_per_testcase_emqx_hook/2,
|
||||
fun init_per_testcase_other/2
|
||||
],
|
||||
lists:foldl(fun(Init, Conf) -> Init(Case, Conf) end, Config, PerTestInitializers).
|
||||
|
||||
end_per_testcase(_, Config) ->
|
||||
end_per_testcase(t_last_will_testament_message_check_acl, Config) ->
|
||||
emqx:unhook('client.check_acl', fun emqx_acl_mnesia:check_acl/5),
|
||||
case ?config(original_acl_nomatch, Config) of
|
||||
{ok, Original} -> application:set_env(emqx, acl_nomatch, Original);
|
||||
_ -> ok
|
||||
end,
|
||||
emqx_mod_acl_internal:load([]),
|
||||
ok;
|
||||
end_per_testcase(_TestCase, Config) ->
|
||||
emqx:unhook('client.check_acl', fun emqx_acl_mnesia:check_acl/5),
|
||||
Config.
|
||||
|
||||
|
@ -465,6 +488,35 @@ t_rest_api(_Config) ->
|
|||
{ok, Res3} = request_http_rest_list(["$all"]),
|
||||
?assertMatch([], get_http_data(Res3)).
|
||||
|
||||
%% asserts that we check ACL for the LWT topic before publishing the
|
||||
%% LWT.
|
||||
t_last_will_testament_message_check_acl(Config) ->
|
||||
ClientID = ?config(clientid, Config),
|
||||
{ok, C} = emqtt:start_link([
|
||||
{clientid, ClientID},
|
||||
{will_topic, <<"$SYS/lwt">>},
|
||||
{will_payload, <<"should not be published">>}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
ok = emqx:subscribe(<<"$SYS/lwt">>),
|
||||
unlink(C),
|
||||
ok = snabbkaffe:start_trace(),
|
||||
{true, {ok, _}} =
|
||||
?wait_async_action(
|
||||
exit(C, kill),
|
||||
#{?snk_kind := last_will_testament_publish_denied},
|
||||
1_000
|
||||
),
|
||||
ok = snabbkaffe:stop(),
|
||||
|
||||
receive
|
||||
{deliver, <<"$SYS/lwt">>, #message{payload = <<"should not be published">>}} ->
|
||||
error(lwt_should_not_be_published_to_forbidden_topic)
|
||||
after 1_000 ->
|
||||
ok
|
||||
end,
|
||||
|
||||
ok.
|
||||
|
||||
create_conflicting_records() ->
|
||||
Records = [
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-ifndef(EMQX_PSK_FILE).
|
||||
-define(EMQX_PSK_FILE, true).
|
||||
|
||||
-define(PSK_FILE_TAB, emqx_psk_file).
|
||||
|
||||
-record(psk_entry, {psk_id :: binary(),
|
||||
psk_str :: binary()}).
|
||||
|
||||
-endif.
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_psk_file,
|
||||
[{description,"EMQX PSK Plugin from File"},
|
||||
{vsn, "4.3.0"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.1"}, % strict semver, bump manually!
|
||||
{modules,[]},
|
||||
{registered,[emqx_psk_file_sup]},
|
||||
{applications,[kernel,stdlib]},
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{VSN,
|
||||
[{"4.3.0",
|
||||
[{load_module,emqx_psk_file,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_psk_file_sup,brutal_purge,soft_purge,[]}]}
|
||||
],
|
||||
[{"4.3.0",
|
||||
[{load_module,emqx_psk_file,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_psk_file_sup,brutal_purge,soft_purge,[]}]}
|
||||
]}.
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
-module(emqx_psk_file).
|
||||
|
||||
-include("emqx_psk_file.hrl").
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
|
@ -26,15 +27,10 @@
|
|||
%% Hooks functions
|
||||
-export([on_psk_lookup/2]).
|
||||
|
||||
-define(TAB, ?MODULE).
|
||||
-define(LF, 10).
|
||||
|
||||
-record(psk_entry, {psk_id :: binary(),
|
||||
psk_str :: binary()}).
|
||||
|
||||
%% Called when the plugin application start
|
||||
load(Env) ->
|
||||
_ = ets:new(?TAB, [set, named_table, {keypos, #psk_entry.psk_id}]),
|
||||
{ok, PskFile} = file:open(get_value(path, Env), [read, raw, binary, read_ahead]),
|
||||
preload_psks(PskFile, bin(get_value(delimiter, Env))),
|
||||
_ = file:close(PskFile),
|
||||
|
@ -45,7 +41,7 @@ unload() ->
|
|||
emqx:unhook('tls_handshake.psk_lookup', fun ?MODULE:on_psk_lookup/2).
|
||||
|
||||
on_psk_lookup(ClientPSKID, UserState) ->
|
||||
case ets:lookup(?TAB, ClientPSKID) of
|
||||
case ets:lookup(?PSK_FILE_TAB, ClientPSKID) of
|
||||
[#psk_entry{psk_str = PskStr}] ->
|
||||
{stop, PskStr};
|
||||
[] ->
|
||||
|
@ -57,7 +53,9 @@ preload_psks(FileHandler, Delimiter) ->
|
|||
{ok, Line} ->
|
||||
case binary:split(Line, Delimiter) of
|
||||
[Key, Rem] ->
|
||||
ets:insert(?TAB, #psk_entry{psk_id = Key, psk_str = trim_lf(Rem)}),
|
||||
ets:insert(
|
||||
?PSK_FILE_TAB,
|
||||
#psk_entry{psk_id = Key, psk_str = trim_lf(Rem)}),
|
||||
preload_psks(FileHandler, Delimiter);
|
||||
[Line] ->
|
||||
?LOG(warning, "[~p] - Invalid line: ~p, delimiter: ~p", [?MODULE, Line, Delimiter])
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
-module(emqx_psk_file_sup).
|
||||
|
||||
-include("emqx_psk_file.hrl").
|
||||
|
||||
-behaviour(supervisor).
|
||||
|
||||
%% API
|
||||
|
@ -25,8 +27,11 @@
|
|||
-export([init/1]).
|
||||
|
||||
start_link() ->
|
||||
_ = ets:new(
|
||||
?PSK_FILE_TAB,
|
||||
[set, named_table, public, {keypos, #psk_entry.psk_id}]
|
||||
),
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
init([]) ->
|
||||
{ok, { {one_for_one, 0, 1}, []} }.
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_retainer,
|
||||
[{description, "EMQ X Retainer"},
|
||||
{vsn, "4.4.2"}, % strict semver, bump manually!
|
||||
{vsn, "4.4.3"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_retainer_sup]},
|
||||
{applications, [kernel,stdlib]},
|
||||
|
|
|
@ -34,6 +34,23 @@ init([Env]) ->
|
|||
type => worker,
|
||||
modules => [emqx_retainer]} || not is_managed_by_modules()]}}.
|
||||
|
||||
-ifdef(EMQX_ENTERPRISE).
|
||||
|
||||
is_managed_by_modules() ->
|
||||
try
|
||||
case supervisor:get_childspec(emqx_modules_sup, emqx_retainer) of
|
||||
{ok, _} -> true;
|
||||
_ -> false
|
||||
end
|
||||
catch
|
||||
exit : {noproc, _} ->
|
||||
false
|
||||
end.
|
||||
|
||||
-else.
|
||||
|
||||
is_managed_by_modules() ->
|
||||
%% always false for opensource edition
|
||||
false.
|
||||
|
||||
-endif.
|
||||
|
|
|
@ -20,7 +20,9 @@
|
|||
-export([ensure_start/0, ensure_stop/0]).
|
||||
-ifdef(EMQX_ENTERPRISE).
|
||||
ensure_start() ->
|
||||
%% for enterprise edition, retainer is started by modules
|
||||
application:stop(emqx_modules),
|
||||
ensure_stop(),
|
||||
init_conf(),
|
||||
emqx_ct_helpers:start_apps([emqx_retainer]),
|
||||
ok.
|
||||
|
@ -29,6 +31,7 @@ ensure_start() ->
|
|||
|
||||
ensure_start() ->
|
||||
init_conf(),
|
||||
ensure_stop(),
|
||||
emqx_ct_helpers:start_apps([emqx_retainer]),
|
||||
ok.
|
||||
|
||||
|
|
28
bin/emqx
28
bin/emqx
|
@ -523,6 +523,16 @@ case "$1" in
|
|||
;;
|
||||
esac
|
||||
|
||||
if [ "$IS_BOOT_COMMAND" = 'no' ]; then
|
||||
# for non-boot commands, inspect vm.<time>.args for node name
|
||||
# shellcheck disable=SC2012,SC2086
|
||||
LATEST_VM_ARGS_FILE="$(ls -t "$RUNNER_DATA_DIR"/configs/vm.*.args 2>/dev/null | head -1)"
|
||||
if [ -z "$LATEST_VM_ARGS_FILE" ]; then
|
||||
echoerr "There is no vm.*.args config file found in '$RUNNER_DATA_DIR/configs/'"
|
||||
echoerr "Please make sure the node is initialized (started for at least once)"
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ -z "$NAME_ARG" ]; then
|
||||
NODENAME="${EMQX_NODE_NAME:-}"
|
||||
|
@ -530,15 +540,7 @@ if [ -z "$NAME_ARG" ]; then
|
|||
[ -z "$NODENAME" ] && [ -n "$EMQX_NAME" ] && [ -n "$EMQX_HOST" ] && NODENAME="${EMQX_NAME}@${EMQX_HOST}"
|
||||
if [ -z "$NODENAME" ]; then
|
||||
if [ "$IS_BOOT_COMMAND" = 'no' ]; then
|
||||
# for non-boot commands, inspect vm.<time>.args for node name
|
||||
# shellcheck disable=SC2012,SC2086
|
||||
LATEST_VM_ARGS="$(ls -t $RUNNER_DATA_DIR/configs/vm.*.args 2>/dev/null | head -1)"
|
||||
if [ -z "$LATEST_VM_ARGS" ]; then
|
||||
echoerr "For command $1, there is no vm.*.args config file found in $RUNNER_DATA_DIR/configs/"
|
||||
echoerr "Please make sure the node is initialized (started for at least once)"
|
||||
exit 1
|
||||
fi
|
||||
NODENAME="$(grep -E '^-name' "$LATEST_VM_ARGS" | awk '{print $2}')"
|
||||
NODENAME="$(grep -E '^-name' "$LATEST_VM_ARGS_FILE" | awk '{print $2}')"
|
||||
else
|
||||
# for boot commands, inspect emqx.conf for node name
|
||||
NODENAME=$("$ERTS_PATH"/escript "$RUNNER_ROOT_DIR"/bin/cuttlefish -i "$REL_DIR"/emqx.schema -c "$RUNNER_ETC_DIR"/emqx.conf get node.name)
|
||||
|
@ -573,13 +575,7 @@ if [ -z "$COOKIE" ]; then
|
|||
if [ "$IS_BOOT_COMMAND" = 'yes' ]; then
|
||||
COOKIE=$("$ERTS_PATH"/escript "$RUNNER_ROOT_DIR"/bin/cuttlefish -i "$REL_DIR"/emqx.schema -c "$RUNNER_ETC_DIR"/emqx.conf get node.cookie)
|
||||
else
|
||||
# shellcheck disable=SC2012,SC2086
|
||||
LATEST_VM_ARGS="$(ls -t $RUNNER_DATA_DIR/configs/vm.*.args | head -1)"
|
||||
if [ -z "$LATEST_VM_ARGS" ]; then
|
||||
echo "For command $1, there is no vm.*.args config file found in $RUNNER_DATA_DIR/configs/"
|
||||
exit 1
|
||||
fi
|
||||
COOKIE="$(grep -E '^-setcookie' "$LATEST_VM_ARGS" | awk '{print $2}')"
|
||||
COOKIE="$(grep -E '^-setcookie' "$LATEST_VM_ARGS_FILE" | awk '{print $2}')"
|
||||
fi
|
||||
fi
|
||||
|
||||
|
|
|
@ -22,13 +22,25 @@
|
|||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include("include/emqx_mqtt.hrl").
|
||||
-include_lib("include/emqx.hrl").
|
||||
-define(LANTENCY, 101).
|
||||
|
||||
%-define(LOGT(Format, Args), ct:pal(Format, Args)).
|
||||
|
||||
-define(TOPK_TAB, emqx_slow_subs_topk).
|
||||
-define(NOW, erlang:system_time(millisecond)).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() ->
|
||||
[ {group, whole}
|
||||
, {group, internal}
|
||||
, {group, response}
|
||||
].
|
||||
|
||||
groups() ->
|
||||
Cases = emqx_ct:all(?MODULE),
|
||||
[ {whole, [], Cases}
|
||||
, {internal, [], Cases}
|
||||
, {response, [], Cases}
|
||||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:start_apps([emqx]),
|
||||
|
@ -39,7 +51,8 @@ end_per_suite(Config) ->
|
|||
Config.
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
emqx_mod_slow_subs:load(base_conf()),
|
||||
Group = proplists:get_value(name, proplists:get_value(tc_group_properties, Config)),
|
||||
emqx_mod_slow_subs:load(base_conf(Group)),
|
||||
Config.
|
||||
|
||||
end_per_testcase(_, _) ->
|
||||
|
@ -49,10 +62,10 @@ end_per_testcase(_, _) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Test Cases
|
||||
%%--------------------------------------------------------------------
|
||||
t_log_and_pub(_) ->
|
||||
t_log_and_pub(Config) ->
|
||||
%% Sub topic first
|
||||
Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}],
|
||||
Clients = start_client(Subs),
|
||||
Clients = start_client(Subs, Config),
|
||||
timer:sleep(1500),
|
||||
Now = ?NOW,
|
||||
|
||||
|
@ -60,14 +73,14 @@ t_log_and_pub(_) ->
|
|||
lists:foreach(fun(I) ->
|
||||
Topic = list_to_binary(io_lib:format("/test1/~p", [I])),
|
||||
Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>),
|
||||
emqx:publish(Msg#message{timestamp = Now - 500})
|
||||
emqx:publish(Msg#message{timestamp = Now - ?LANTENCY})
|
||||
end,
|
||||
lists:seq(1, 10)),
|
||||
|
||||
lists:foreach(fun(I) ->
|
||||
Topic = list_to_binary(io_lib:format("/test2/~p", [I])),
|
||||
Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>),
|
||||
emqx:publish(Msg#message{timestamp = Now - 500})
|
||||
emqx:publish(Msg#message{timestamp = Now - ?LANTENCY})
|
||||
end,
|
||||
lists:seq(1, 10)),
|
||||
|
||||
|
@ -77,25 +90,33 @@ t_log_and_pub(_) ->
|
|||
?assert(Size =< 8 andalso Size >= 3,
|
||||
unicode:characters_to_binary(io_lib:format("size is :~p~n", [Size]))),
|
||||
|
||||
?assert(
|
||||
lists:all(
|
||||
fun(#{timespan := Ts}) ->
|
||||
Ts >= 101 andalso Ts < ?NOW - Now
|
||||
end,
|
||||
emqx_slow_subs_api:get_history()
|
||||
)
|
||||
),
|
||||
|
||||
timer:sleep(3000),
|
||||
?assert(ets:info(?TOPK_TAB, size) =:= 0),
|
||||
[Client ! stop || Client <- Clients],
|
||||
ok.
|
||||
base_conf() ->
|
||||
[ {threshold, 300}
|
||||
base_conf(Type) ->
|
||||
[ {threshold, 100}
|
||||
, {top_k_num, 5}
|
||||
, {expire_interval, timer:seconds(3)}
|
||||
, {stats_type, whole}
|
||||
, {stats_type, Type}
|
||||
].
|
||||
|
||||
start_client(Subs) ->
|
||||
[spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)].
|
||||
start_client(Subs, Config) ->
|
||||
[spawn(fun() -> client(I, Subs, Config) end) || I <- lists:seq(1, 10)].
|
||||
|
||||
client(I, Subs) ->
|
||||
{ok, C} = emqtt:start_link([{host, "localhost"},
|
||||
{clientid, io_lib:format("slow_subs_~p", [I])},
|
||||
{username, <<"plain">>},
|
||||
{password, <<"plain">>}]),
|
||||
client(I, Subs, Config) ->
|
||||
Group = proplists:get_value(name, proplists:get_value(tc_group_properties, Config)),
|
||||
ConnOptions = make_conn_options(Group, I),
|
||||
{ok, C} = emqtt:start_link(ConnOptions),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
|
||||
Len = erlang:length(Subs),
|
||||
|
@ -115,3 +136,14 @@ try_receive(Acc) ->
|
|||
after 500 ->
|
||||
Acc
|
||||
end.
|
||||
|
||||
make_conn_options(response, I) ->
|
||||
[ {msg_handler,
|
||||
#{publish => fun(_) -> timer:sleep(50) end,
|
||||
disconnected => fun(_) -> ok end}}
|
||||
| make_conn_options(whole, I)];
|
||||
make_conn_options(_, I) ->
|
||||
[{host, "localhost"},
|
||||
{clientid, io_lib:format("slow_subs_~p", [I])},
|
||||
{username, <<"plain">>},
|
||||
{password, <<"plain">>}].
|
||||
|
|
|
@ -42,7 +42,7 @@
|
|||
, {redbug, "2.0.7"}
|
||||
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}}
|
||||
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
|
||||
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.3"}}}
|
||||
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.4"}}}
|
||||
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.9.0"}}}
|
||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
|
||||
|
|
|
@ -62,7 +62,8 @@ app_specific_actions(_) ->
|
|||
ignored_apps() ->
|
||||
[gpb, %% only a build tool
|
||||
emqx_dashboard, %% generic appup file for all versions
|
||||
emqx_management %% generic appup file for all versions
|
||||
emqx_management, %% generic appup file for all versions
|
||||
emqx_modules_spec %% generic appup file for all versions
|
||||
] ++ otp_standard_apps().
|
||||
|
||||
main(Args) ->
|
||||
|
@ -284,9 +285,9 @@ merge_update_actions(App, Changes, Vsns, PrevVersion) ->
|
|||
%% but there is a 1.1.2 in appup we may skip merging instructions for
|
||||
%% 1.1.2 because it's not used and no way to know what has been changed
|
||||
is_skipped_version(App, Vsn, PrevVersion) when is_list(Vsn) andalso is_list(PrevVersion) ->
|
||||
case is_app_external(App) andalso parse_version_number(Vsn) of
|
||||
case is_app_external(App) andalso parse_version(Vsn, non_strict_semver) of
|
||||
{ok, VsnTuple} ->
|
||||
case parse_version_number(PrevVersion) of
|
||||
case parse_version(PrevVersion, non_strict_semver) of
|
||||
{ok, PrevVsnTuple} ->
|
||||
VsnTuple > PrevVsnTuple;
|
||||
_ ->
|
||||
|
@ -312,7 +313,7 @@ do_merge_update_actions(App, {New0, Changed0, Deleted0}, OldActions) ->
|
|||
true ->
|
||||
[];
|
||||
false ->
|
||||
[{load_module, M, brutal_purge, soft_purge, []} || M <- Changed] ++
|
||||
[{load_module, M, brutal_purge, soft_purge, []} || M <- Changed, not is_secret_module(M)] ++
|
||||
[{add_module, M} || M <- New]
|
||||
end,
|
||||
{OldActionsWithStop, OldActionsAfterStop} =
|
||||
|
@ -324,10 +325,18 @@ do_merge_update_actions(App, {New0, Changed0, Deleted0}, OldActions) ->
|
|||
true ->
|
||||
[];
|
||||
false ->
|
||||
[{delete_module, M} || M <- Deleted]
|
||||
[{delete_module, M} || M <- Deleted, not is_secret_module(M)]
|
||||
end ++
|
||||
AppSpecific.
|
||||
|
||||
%% Do not reload or delet _secret modules
|
||||
is_secret_module(Module) ->
|
||||
Suffix = "_secret",
|
||||
case string:right(atom_to_list(Module), length(Suffix)) of
|
||||
Suffix -> true;
|
||||
_ -> false
|
||||
end.
|
||||
|
||||
%% If an entry restarts an application, there's no need to use
|
||||
%% `load_module' instructions.
|
||||
contains_restart_application(Application, Actions) ->
|
||||
|
@ -397,7 +406,7 @@ contains_version(Needle, Haystack) when is_list(Needle) ->
|
|||
%% past versions that should be covered by regexes in .appup file
|
||||
%% instructions.
|
||||
enumerate_past_versions(Vsn) when is_list(Vsn) ->
|
||||
case parse_version_number(Vsn) of
|
||||
case parse_version(Vsn) of
|
||||
{ok, ParsedVsn} ->
|
||||
{ok, enumerate_past_versions(ParsedVsn)};
|
||||
Error ->
|
||||
|
@ -406,14 +415,39 @@ enumerate_past_versions(Vsn) when is_list(Vsn) ->
|
|||
enumerate_past_versions({Major, Minor, Patch}) ->
|
||||
[{Major, Minor, P} || P <- lists:seq(Patch - 1, 0, -1)].
|
||||
|
||||
parse_version_number(Vsn) when is_list(Vsn) ->
|
||||
Nums = string:split(Vsn, ".", all),
|
||||
Results = lists:map(fun string:to_integer/1, Nums),
|
||||
case Results of
|
||||
[{Major, []}, {Minor, []}, {Patch, []}] ->
|
||||
{ok, {Major, Minor, Patch}};
|
||||
_ ->
|
||||
{error, bad_version}
|
||||
parse_version(Vsn) ->
|
||||
parse_version(Vsn, strict_semver).
|
||||
|
||||
parse_version(Vsn, MaybeSemver) when is_list(Vsn) ->
|
||||
case parse_dot_separated_numbers(Vsn) of
|
||||
{ok, {_Major, _Minor, _Patch}} = Res ->
|
||||
Res;
|
||||
{ok, Nums} ->
|
||||
case MaybeSemver of
|
||||
strict_semver ->
|
||||
{error, {bad_semver, Vsn}};
|
||||
non_strict_semver ->
|
||||
{ok, Nums}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{error, {Reason, Vsn}}
|
||||
end.
|
||||
|
||||
parse_dot_separated_numbers(Str) when is_list(Str) ->
|
||||
try
|
||||
Split = string:split(Str, ".", all),
|
||||
IntL = lists:map(fun(SubStr) ->
|
||||
case string:to_integer(SubStr) of
|
||||
{Int, []} when is_integer(Int) ->
|
||||
Int;
|
||||
_ ->
|
||||
throw(no_integer)
|
||||
end
|
||||
end, Split),
|
||||
{ok, list_to_tuple(IntL)}
|
||||
catch
|
||||
_ : _ ->
|
||||
{error, bad_version_string}
|
||||
end.
|
||||
|
||||
vsn_number_to_string({Major, Minor, Patch}) ->
|
||||
|
|
|
@ -3,6 +3,9 @@
|
|||
{VSN,
|
||||
[{"4.4.9",
|
||||
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
|
@ -15,6 +18,9 @@
|
|||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.8",
|
||||
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||
|
@ -28,7 +34,9 @@
|
|||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_message,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.7",
|
||||
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_secret},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
|
@ -43,7 +51,9 @@
|
|||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.6",
|
||||
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_secret},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
|
@ -58,7 +68,8 @@
|
|||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.5",
|
||||
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_secret},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
|
@ -76,7 +87,8 @@
|
|||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.4",
|
||||
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_secret},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
|
@ -102,7 +114,8 @@
|
|||
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.3",
|
||||
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_secret},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||
|
@ -135,7 +148,8 @@
|
|||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_relup}]},
|
||||
{"4.4.2",
|
||||
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_secret},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||
|
@ -170,7 +184,8 @@
|
|||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_relup}]},
|
||||
{"4.4.1",
|
||||
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_secret},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
||||
|
@ -211,7 +226,8 @@
|
|||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_relup}]},
|
||||
{"4.4.0",
|
||||
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
[{add_module,emqx_secret},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
|
@ -256,6 +272,8 @@
|
|||
{<<".*">>,[]}],
|
||||
[{"4.4.9",
|
||||
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
|
@ -268,6 +286,8 @@
|
|||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.8",
|
||||
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||
|
@ -281,7 +301,8 @@
|
|||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_message,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.7",
|
||||
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
|
@ -296,7 +317,8 @@
|
|||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||
{"4.4.6",
|
||||
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
|
|
33
src/emqx.erl
33
src/emqx.erl
|
@ -228,6 +228,7 @@ shutdown() ->
|
|||
|
||||
shutdown(Reason) ->
|
||||
?LOG(critical, "emqx shutdown for ~s", [Reason]),
|
||||
on_shutdown(Reason),
|
||||
_ = emqx_plugins:unload(),
|
||||
lists:foreach(fun application:stop/1
|
||||
, lists:reverse(default_started_applications())
|
||||
|
@ -238,10 +239,12 @@ reboot() ->
|
|||
true ->
|
||||
_ = application:stop(emqx_dashboard), %% dashboard must be started after mnesia
|
||||
lists:foreach(fun application:start/1 , default_started_applications()),
|
||||
application:start(emqx_dashboard);
|
||||
_ = application:start(emqx_dashboard),
|
||||
on_reboot();
|
||||
|
||||
false ->
|
||||
lists:foreach(fun application:start/1 , default_started_applications())
|
||||
lists:foreach(fun application:start/1 , default_started_applications()),
|
||||
on_reboot()
|
||||
end.
|
||||
|
||||
is_application_running(App) ->
|
||||
|
@ -256,6 +259,32 @@ default_started_applications() ->
|
|||
[gproc, esockd, ranch, cowboy, ekka, emqx, emqx_modules].
|
||||
-endif.
|
||||
|
||||
-ifdef(EMQX_ENTERPRISE).
|
||||
on_reboot() ->
|
||||
try
|
||||
_ = emqx_license_api:bootstrap_license(),
|
||||
ok
|
||||
catch
|
||||
Kind:Reason:Stack ->
|
||||
?LOG(critical, "~p while rebooting: ~p, ~p", [Kind, Reason, Stack]),
|
||||
ok
|
||||
end,
|
||||
ok.
|
||||
|
||||
on_shutdown(join) ->
|
||||
emqx_modules:sync_load_modules_file(),
|
||||
ok;
|
||||
on_shutdown(_) ->
|
||||
ok.
|
||||
|
||||
-else.
|
||||
on_reboot() ->
|
||||
ok.
|
||||
|
||||
on_shutdown(_) ->
|
||||
ok.
|
||||
-endif.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -976,9 +976,10 @@ return_sub_unsub_ack(Packet, Channel) ->
|
|||
handle_call(kick, Channel = #channel{
|
||||
conn_state = ConnState,
|
||||
will_msg = WillMsg,
|
||||
clientinfo = ClientInfo,
|
||||
conninfo = #{proto_ver := ProtoVer}
|
||||
}) ->
|
||||
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
||||
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
|
||||
Channel1 = case ConnState of
|
||||
connected -> ensure_disconnected(kicked, Channel);
|
||||
_ -> Channel
|
||||
|
@ -1140,8 +1141,9 @@ handle_timeout(_TRef, expire_awaiting_rel,
|
|||
handle_timeout(_TRef, expire_session, Channel) ->
|
||||
shutdown(expired, Channel);
|
||||
|
||||
handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg}) ->
|
||||
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
||||
handle_timeout(_TRef, will_message, Channel = #channel{will_msg = WillMsg,
|
||||
clientinfo = ClientInfo}) ->
|
||||
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
|
||||
{ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
|
||||
|
||||
handle_timeout(_TRef, expire_quota_limit, Channel) ->
|
||||
|
@ -1197,9 +1199,10 @@ terminate(_Reason, #channel{conn_state = idle} = _Channel) ->
|
|||
ok;
|
||||
terminate(normal, Channel) ->
|
||||
run_terminate_hook(normal, Channel);
|
||||
terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
|
||||
terminate(Reason, Channel = #channel{will_msg = WillMsg,
|
||||
clientinfo = ClientInfo}) ->
|
||||
should_publish_will_message(Reason, Channel)
|
||||
andalso publish_will_msg(WillMsg),
|
||||
andalso publish_will_msg(ClientInfo, WillMsg),
|
||||
run_terminate_hook(Reason, Channel).
|
||||
|
||||
run_terminate_hook(_Reason, #channel{session = undefined} = _Channel) ->
|
||||
|
@ -1741,10 +1744,11 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
|
|||
|
||||
maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
|
||||
Channel;
|
||||
maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
|
||||
maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg,
|
||||
clientinfo = ClientInfo}) ->
|
||||
case will_delay_interval(WillMsg) of
|
||||
0 ->
|
||||
ok = publish_will_msg(WillMsg),
|
||||
ok = publish_will_msg(ClientInfo, WillMsg),
|
||||
Channel#channel{will_msg = undefined};
|
||||
I ->
|
||||
ensure_timer(will_timer, timer:seconds(I), Channel)
|
||||
|
@ -1754,9 +1758,19 @@ will_delay_interval(WillMsg) ->
|
|||
maps:get('Will-Delay-Interval',
|
||||
emqx_message:get_header(properties, WillMsg, #{}), 0).
|
||||
|
||||
publish_will_msg(Msg) ->
|
||||
_ = emqx_broker:publish(Msg),
|
||||
ok.
|
||||
publish_will_msg(ClientInfo, Msg = #message{topic = Topic}) ->
|
||||
case emqx_access_control:check_acl(ClientInfo, publish, Topic) of
|
||||
allow ->
|
||||
_ = emqx_broker:publish(Msg),
|
||||
ok;
|
||||
deny ->
|
||||
?tp(
|
||||
warning,
|
||||
last_will_testament_publish_denied,
|
||||
#{topic => Topic}
|
||||
),
|
||||
ok
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Disconnect Reason
|
||||
|
|
|
@ -96,8 +96,8 @@ do_parse(URI) ->
|
|||
%% underscores replaced with hyphens
|
||||
%% NOTE: assuming the input Headers list is a proplists,
|
||||
%% that is, when a key is duplicated, list header overrides tail
|
||||
%% e.g. [{"Content_Type", "applicaiton/binary"}, {<<"content-type">>, "applicaiton/json"}]
|
||||
%% results in: [{"content-type", "applicaiton/binary"}]
|
||||
%% e.g. [{"Content_Type", "applicaiton/binary"}, {"content-type", "applicaiton/json"}]
|
||||
%% results in: [{<<"content-type">>, "applicaiton/binary"}]
|
||||
normalise_headers(Headers0) ->
|
||||
F = fun({K0, V}) ->
|
||||
K = re:replace(K0, "_", "-", [{return,binary}]),
|
||||
|
|
|
@ -47,6 +47,7 @@
|
|||
, index_of/2
|
||||
, maybe_parse_ip/1
|
||||
, ipv6_probe/1
|
||||
, ipv6_probe/2
|
||||
, pmap/2
|
||||
, pmap/3
|
||||
]).
|
||||
|
@ -94,12 +95,15 @@ maybe_parse_ip(Host) ->
|
|||
|
||||
%% @doc Add `ipv6_probe' socket option if it's supported.
|
||||
ipv6_probe(Opts) ->
|
||||
ipv6_probe(Opts, true).
|
||||
|
||||
ipv6_probe(Opts, Ipv6Probe) when is_boolean(Ipv6Probe) orelse is_integer(Ipv6Probe) ->
|
||||
Bool = try gen_tcp:ipv6_probe()
|
||||
catch _ : _ -> false end,
|
||||
ipv6_probe(Bool, Opts).
|
||||
ipv6_probe(Bool, Opts, Ipv6Probe).
|
||||
|
||||
ipv6_probe(false, Opts) -> Opts;
|
||||
ipv6_probe(true, Opts) -> [{ipv6_probe, true} | Opts].
|
||||
ipv6_probe(false, Opts, _) -> Opts;
|
||||
ipv6_probe(true, Opts, Ipv6Probe) -> [{ipv6_probe, Ipv6Probe} | Opts].
|
||||
|
||||
%% @doc Merge options
|
||||
-spec(merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist()).
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
-export([init/0]).
|
||||
|
||||
-export([ load/0
|
||||
, force_load/0
|
||||
, load/1
|
||||
, unload/0
|
||||
, unload/1
|
||||
|
@ -59,12 +60,17 @@ init() ->
|
|||
%% @doc Load all plugins when the broker started.
|
||||
-spec(load() -> ok | ignore | {error, term()}).
|
||||
load() ->
|
||||
do_load(#{force_load => false}).
|
||||
force_load() ->
|
||||
do_load(#{force_load => true}).
|
||||
|
||||
do_load(Options) ->
|
||||
ok = load_ext_plugins(emqx:get_env(expand_plugins_dir)),
|
||||
case emqx:get_env(plugins_loaded_file) of
|
||||
undefined -> ignore; %% No plugins available
|
||||
File ->
|
||||
_ = ensure_file(File),
|
||||
with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end)
|
||||
with_loaded_file(File, fun(Names) -> load_plugins(Names, Options, false) end)
|
||||
end.
|
||||
|
||||
%% @doc Load a Plugin
|
||||
|
@ -282,18 +288,23 @@ filter_plugins([{Name, Load} | Names], Plugins) ->
|
|||
filter_plugins([Name | Names], Plugins) when is_atom(Name) ->
|
||||
filter_plugins([{Name, true} | Names], Plugins).
|
||||
|
||||
load_plugins(Names, Persistent) ->
|
||||
load_plugins(Names, Options, Persistent) ->
|
||||
Plugins = list(),
|
||||
NotFound = Names -- names(Plugins),
|
||||
case NotFound of
|
||||
[] -> ok;
|
||||
NotFound -> ?LOG(alert, "cannot_find_plugins: ~p", [NotFound])
|
||||
end,
|
||||
NeedToLoad = (Names -- NotFound) -- names(started_app),
|
||||
NeedToLoad0 = Names -- NotFound,
|
||||
NeedToLoad1 =
|
||||
case Options of
|
||||
#{force_load := true} -> NeedToLoad0;
|
||||
_ -> NeedToLoad0 -- names(started_app)
|
||||
end,
|
||||
lists:foreach(fun(Name) ->
|
||||
Plugin = find_plugin(Name, Plugins),
|
||||
load_plugin(Plugin#plugin.name, Persistent)
|
||||
end, NeedToLoad).
|
||||
end, NeedToLoad1).
|
||||
|
||||
generate_configs(App) ->
|
||||
ConfigFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".config",
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% Note: this module CAN'T be hot-patched to avoid invalidating the
|
||||
%% closures, so it must not be changed.
|
||||
-module(emqx_secret).
|
||||
|
||||
%% API:
|
||||
-export([wrap/1, unwrap/1]).
|
||||
|
||||
%%================================================================================
|
||||
%% API funcions
|
||||
%%================================================================================
|
||||
|
||||
wrap(undefined) ->
|
||||
undefined;
|
||||
wrap(Func) when is_function(Func) ->
|
||||
Func;
|
||||
wrap(Term) ->
|
||||
fun() ->
|
||||
Term
|
||||
end.
|
||||
|
||||
unwrap(Term) when is_function(Term, 0) ->
|
||||
%% Handle potentially nested funs
|
||||
unwrap(Term());
|
||||
unwrap(Term) ->
|
||||
Term.
|
|
@ -761,7 +761,7 @@ on_delivery_completed(_ClientInfo, _Ts, _Session) ->
|
|||
ok.
|
||||
|
||||
mark_begin_deliver(Msg) ->
|
||||
emqx_message:set_header(deliver_begin_at, erlang:system_time(second), Msg).
|
||||
emqx_message:set_header(deliver_begin_at, erlang:system_time(millisecond), Msg).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
|
|
|
@ -18,33 +18,57 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(SLAVE_START_APPS, [emqx]).
|
||||
%% modules is included because code is called before cluster join
|
||||
-define(SLAVE_START_APPS, [emqx, emqx_modules]).
|
||||
|
||||
-export([start_slave/1,
|
||||
start_slave/2,
|
||||
stop_slave/1]).
|
||||
stop_slave/1,
|
||||
wait_for_synced_routes/3
|
||||
]).
|
||||
|
||||
start_slave(Name) ->
|
||||
start_slave(Name, #{}).
|
||||
|
||||
start_slave(Name, Opts) ->
|
||||
{ok, Node} = ct_slave:start(list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
|
||||
[{kill_if_fail, true},
|
||||
{monitor_master, true},
|
||||
{init_timeout, 10000},
|
||||
{startup_timeout, 10000},
|
||||
{erl_flags, ebin_path()}]),
|
||||
|
||||
Node = make_node_name(Name),
|
||||
case ct_slave:start(Node, [{kill_if_fail, true},
|
||||
{monitor_master, true},
|
||||
{init_timeout, 10000},
|
||||
{startup_timeout, 10000},
|
||||
{erl_flags, ebin_path()}]) of
|
||||
{ok, _} ->
|
||||
ok;
|
||||
{error, started_not_connected, _} ->
|
||||
ok
|
||||
end,
|
||||
pong = net_adm:ping(Node),
|
||||
setup_node(Node, Opts),
|
||||
Node.
|
||||
|
||||
stop_slave(Node) ->
|
||||
rpc:call(Node, ekka, leave, []),
|
||||
ct_slave:stop(Node).
|
||||
make_node_name(Name) ->
|
||||
case string:tokens(atom_to_list(Name), "@") of
|
||||
[_Name, _Host] ->
|
||||
%% the name already has a @
|
||||
Name;
|
||||
_ ->
|
||||
list_to_atom(atom_to_list(Name) ++ "@" ++ host())
|
||||
end.
|
||||
|
||||
stop_slave(Node0) ->
|
||||
Node = make_node_name(Node0),
|
||||
case rpc:call(Node, ekka, leave, []) of
|
||||
ok -> ok;
|
||||
{badrpc, nodedown} -> ok
|
||||
end,
|
||||
case ct_slave:stop(Node) of
|
||||
{ok, _} -> ok;
|
||||
{error, not_started, _} -> ok
|
||||
end.
|
||||
|
||||
host() ->
|
||||
[_, Host] = string:tokens(atom_to_list(node()), "@"), Host.
|
||||
[_, Host] = string:tokens(atom_to_list(node()), "@"),
|
||||
Host.
|
||||
|
||||
ebin_path() ->
|
||||
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
|
||||
|
@ -71,7 +95,12 @@ setup_node(Node, #{} = Opts) ->
|
|||
[ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]],
|
||||
ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]),
|
||||
|
||||
rpc:call(Node, ekka, join, [node()]),
|
||||
case maps:get(no_join, Opts, false) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
ok = rpc:call(Node, ekka, join, [node()])
|
||||
end,
|
||||
|
||||
%% Sanity check. Assert that `gen_rpc' is set up correctly:
|
||||
?assertEqual( Node
|
||||
|
@ -81,3 +110,40 @@ setup_node(Node, #{} = Opts) ->
|
|||
, gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []])
|
||||
),
|
||||
ok.
|
||||
|
||||
%% Routes are replicated async.
|
||||
%% Call this function to wait for nodes in the cluster to have the same view
|
||||
%% for a given topic.
|
||||
wait_for_synced_routes(Nodes, Topic, Timeout) ->
|
||||
F = fun() -> do_wait_for_synced_routes(Nodes, Topic) end,
|
||||
emqx_misc:nolink_apply(F, Timeout).
|
||||
|
||||
do_wait_for_synced_routes(Nodes, Topic) ->
|
||||
PerNodeView0 =
|
||||
lists:map(
|
||||
fun(Node) ->
|
||||
{rpc:call(Node, emqx_router, match_routes, [Topic]), Node}
|
||||
end, Nodes),
|
||||
PerNodeView = lists:keysort(1, PerNodeView0),
|
||||
case check_consistent_view(PerNodeView) of
|
||||
{ok, OneView} ->
|
||||
ct:pal("consistent_routes_view~n~p", [OneView]),
|
||||
ok;
|
||||
{error, Reason}->
|
||||
ct:pal("inconsistent_routes_view~n~p", [Reason]),
|
||||
timer:sleep(10),
|
||||
do_wait_for_synced_routes(Nodes, Topic)
|
||||
end.
|
||||
|
||||
check_consistent_view(PerNodeView) ->
|
||||
check_consistent_view(PerNodeView, []).
|
||||
|
||||
check_consistent_view([], [OneView]) -> {ok, OneView};
|
||||
check_consistent_view([], MoreThanOneView) -> {error, MoreThanOneView};
|
||||
check_consistent_view([{View, Node} | Rest], [{View, Nodes} | Acc]) ->
|
||||
check_consistent_view(Rest, [{View, add_to_list(Node, Nodes)} | Acc]);
|
||||
check_consistent_view([{View, Node} | Rest], Acc) ->
|
||||
check_consistent_view(Rest, [{View, Node} | Acc]).
|
||||
|
||||
add_to_list(Node, Nodes) when is_list(Nodes) -> [Node | Nodes];
|
||||
add_to_list(Node, Node1) -> [Node, Node1].
|
||||
|
|
|
@ -40,6 +40,9 @@ init_per_suite(Config) ->
|
|||
PortDiscovery = application:get_env(gen_rpc, port_discovery),
|
||||
application:set_env(gen_rpc, port_discovery, stateless),
|
||||
application:ensure_all_started(gen_rpc),
|
||||
%% ensure emqx_moduels' app modules are loaded
|
||||
%% so the mnesia tables are created
|
||||
ok = load_app(emqx_modules),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
[{port_discovery, PortDiscovery} | Config].
|
||||
|
||||
|
@ -50,32 +53,45 @@ end_per_suite(Config) ->
|
|||
_ -> ok
|
||||
end.
|
||||
|
||||
t_is_ack_required(_) ->
|
||||
init_per_testcase(Case, Config) ->
|
||||
try
|
||||
?MODULE:Case({'init', Config})
|
||||
catch
|
||||
error : function_clause ->
|
||||
Config
|
||||
end.
|
||||
|
||||
end_per_testcase(Case, Config) ->
|
||||
try
|
||||
?MODULE:Case({'end', Config})
|
||||
catch
|
||||
error : function_clause ->
|
||||
ok
|
||||
end.
|
||||
|
||||
t_is_ack_required(Config) when is_list(Config) ->
|
||||
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
|
||||
|
||||
t_maybe_nack_dropped(_) ->
|
||||
t_maybe_nack_dropped(Config) when is_list(Config) ->
|
||||
?assertEqual(store, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
|
||||
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
|
||||
?assertEqual(drop, emqx_shared_sub:maybe_nack_dropped(Msg)),
|
||||
?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end).
|
||||
|
||||
t_nack_no_connection(_) ->
|
||||
t_nack_no_connection(Config) when is_list(Config) ->
|
||||
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
|
||||
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
|
||||
?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok
|
||||
after 100 -> timeout end).
|
||||
|
||||
t_maybe_ack(_) ->
|
||||
t_maybe_ack(Config) when is_list(Config) ->
|
||||
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
|
||||
Msg = #message{headers = #{shared_dispatch_ack => {self(), {fresh, <<"group">>, for_test}}}},
|
||||
?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}},
|
||||
emqx_shared_sub:maybe_ack(Msg)),
|
||||
?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end).
|
||||
|
||||
% t_subscribers(_) ->
|
||||
% error('TODO').
|
||||
|
||||
t_random_basic(_) ->
|
||||
t_random_basic(Config) when is_list(Config) ->
|
||||
ok = ensure_config(random),
|
||||
ClientId = <<"ClientId">>,
|
||||
Topic = <<"foo">>,
|
||||
|
@ -105,7 +121,7 @@ t_random_basic(_) ->
|
|||
%% After the connection for the 2nd session is also closed,
|
||||
%% i.e. when all clients are offline, the following message(s)
|
||||
%% should be delivered randomly.
|
||||
t_no_connection_nack(_) ->
|
||||
t_no_connection_nack(Config) when is_list(Config) ->
|
||||
ok = ensure_config(sticky),
|
||||
Publisher = <<"publisher">>,
|
||||
Subscriber1 = <<"Subscriber1">>,
|
||||
|
@ -171,27 +187,27 @@ t_no_connection_nack(_) ->
|
|||
% emqx_sm:close_session(SPid2),
|
||||
ok.
|
||||
|
||||
t_random(_) ->
|
||||
t_random(Config) when is_list(Config) ->
|
||||
ok = ensure_config(random, true),
|
||||
test_two_messages(random).
|
||||
|
||||
t_round_robin(_) ->
|
||||
t_round_robin(Config) when is_list(Config) ->
|
||||
ok = ensure_config(round_robin, true),
|
||||
test_two_messages(round_robin).
|
||||
|
||||
t_sticky(_) ->
|
||||
t_sticky(Config) when is_list(Config) ->
|
||||
ok = ensure_config(sticky, true),
|
||||
test_two_messages(sticky).
|
||||
|
||||
t_hash(_) ->
|
||||
t_hash(Config) when is_list(Config) ->
|
||||
ok = ensure_config(hash, false),
|
||||
test_two_messages(hash).
|
||||
|
||||
t_hash_clinetid(_) ->
|
||||
t_hash_clinetid(Config) when is_list(Config) ->
|
||||
ok = ensure_config(hash_clientid, false),
|
||||
test_two_messages(hash_clientid).
|
||||
|
||||
t_hash_topic(_) ->
|
||||
t_hash_topic(Config) when is_list(Config) ->
|
||||
ok = ensure_config(hash_topic, false),
|
||||
ClientId1 = <<"ClientId1">>,
|
||||
ClientId2 = <<"ClientId2">>,
|
||||
|
@ -230,7 +246,7 @@ t_hash_topic(_) ->
|
|||
ok.
|
||||
|
||||
%% if the original subscriber dies, change to another one alive
|
||||
t_not_so_sticky(_) ->
|
||||
t_not_so_sticky(Config) when is_list(Config) ->
|
||||
ok = ensure_config(sticky),
|
||||
ClientId1 = <<"ClientId1">>,
|
||||
ClientId2 = <<"ClientId2">>,
|
||||
|
@ -303,7 +319,7 @@ last_message(ExpectedPayload, Pids, Timeout) ->
|
|||
<<"not yet?">>
|
||||
end.
|
||||
|
||||
t_dispatch(_) ->
|
||||
t_dispatch(Config) when is_list(Config) ->
|
||||
ok = ensure_config(random),
|
||||
Topic = <<"foo">>,
|
||||
?assertEqual({error, no_subscribers},
|
||||
|
@ -312,18 +328,13 @@ t_dispatch(_) ->
|
|||
?assertEqual({ok, 1},
|
||||
emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})).
|
||||
|
||||
% t_unsubscribe(_) ->
|
||||
% error('TODO').
|
||||
|
||||
% t_subscribe(_) ->
|
||||
% error('TODO').
|
||||
t_uncovered_func(_) ->
|
||||
t_uncovered_func(Config) when is_list(Config) ->
|
||||
ignored = gen_server:call(emqx_shared_sub, ignored),
|
||||
ok = gen_server:cast(emqx_shared_sub, ignored),
|
||||
ignored = emqx_shared_sub ! ignored,
|
||||
{mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}.
|
||||
|
||||
t_per_group_config(_) ->
|
||||
t_per_group_config(Config) when is_list(Config) ->
|
||||
ok = ensure_group_config(#{
|
||||
<<"local_group_fallback">> => local,
|
||||
<<"local_group">> => local,
|
||||
|
@ -342,8 +353,8 @@ t_per_group_config(_) ->
|
|||
test_two_messages(round_robin, <<"round_robin_group">>),
|
||||
test_two_messages(round_robin, <<"round_robin_group">>).
|
||||
|
||||
t_local(_) ->
|
||||
Node = start_slave('local_shared_sub_test19', 21884),
|
||||
t_local({'init', Config}) ->
|
||||
Node = start_slave(local_shared_sub_test19, 21884),
|
||||
GroupConfig = #{
|
||||
<<"local_group_fallback">> => local,
|
||||
<<"local_group">> => local,
|
||||
|
@ -352,7 +363,11 @@ t_local(_) ->
|
|||
},
|
||||
ok = ensure_group_config(Node, GroupConfig),
|
||||
ok = ensure_group_config(GroupConfig),
|
||||
|
||||
[{slave_node, Node} | Config];
|
||||
t_local({'end', _Config}) ->
|
||||
ok = stop_slave(local_shared_sub_test19);
|
||||
t_local(Config) when is_list(Config) ->
|
||||
Node = proplists:get_value(slave_node, Config),
|
||||
Topic = <<"local_foo1/bar">>,
|
||||
ClientId1 = <<"ClientId1">>,
|
||||
ClientId2 = <<"ClientId2">>,
|
||||
|
@ -388,7 +403,7 @@ t_local(_) ->
|
|||
?assertNotEqual(UsedSubPid1, UsedSubPid2),
|
||||
ok.
|
||||
|
||||
t_local_fallback(_) ->
|
||||
t_local_fallback({'init', Config}) ->
|
||||
ok = ensure_group_config(#{
|
||||
<<"local_group_fallback">> => local,
|
||||
<<"local_group">> => local,
|
||||
|
@ -396,10 +411,15 @@ t_local_fallback(_) ->
|
|||
<<"sticky_group">> => sticky
|
||||
}),
|
||||
|
||||
Node = start_slave(local_fallback_shared_sub_test19, 11885),
|
||||
[{slave_node, Node} | Config];
|
||||
t_local_fallback({'end', _}) ->
|
||||
ok = stop_slave(local_fallback_shared_sub_test19);
|
||||
t_local_fallback(Config) when is_list(Config) ->
|
||||
Topic = <<"local_foo2/bar">>,
|
||||
ClientId1 = <<"ClientId1">>,
|
||||
ClientId2 = <<"ClientId2">>,
|
||||
Node = start_slave('local_fallback_shared_sub_test19', 11885),
|
||||
Node = proplists:get_value(slave_node, Config),
|
||||
|
||||
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
|
||||
{ok, _} = emqtt:connect(ConnPid1),
|
||||
|
@ -407,6 +427,7 @@ t_local_fallback(_) ->
|
|||
Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
|
||||
|
||||
emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/", Topic/binary>>, 0}),
|
||||
ok = emqx_node_helpers:wait_for_synced_routes([node(), Node], Topic, timer:seconds(10)),
|
||||
|
||||
[{share, Topic, {ok, 1}}] = emqx:publish(Message1),
|
||||
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]),
|
||||
|
@ -422,7 +443,7 @@ t_local_fallback(_) ->
|
|||
|
||||
%% This one tests that broker tries to select another shared subscriber
|
||||
%% If the first one doesn't return an ACK
|
||||
t_redispatch(_) ->
|
||||
t_redispatch(Config) when is_list(Config) ->
|
||||
ok = ensure_config(sticky, true),
|
||||
application:set_env(emqx, shared_dispatch_ack_enabled, true),
|
||||
|
||||
|
@ -453,7 +474,7 @@ t_redispatch(_) ->
|
|||
emqtt:stop(UsedSubPid2),
|
||||
ok.
|
||||
|
||||
t_dispatch_when_inflights_are_full(_) ->
|
||||
t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
|
||||
ok = ensure_config(round_robin, true),
|
||||
Topic = <<"foo/bar">>,
|
||||
ClientId1 = <<"ClientId1">>,
|
||||
|
@ -536,8 +557,20 @@ recv_msgs(Count, Msgs) ->
|
|||
end.
|
||||
|
||||
start_slave(Name, Port) ->
|
||||
ok = emqx_ct_helpers:start_apps([emqx_modules]),
|
||||
Listeners = [#{listen_on => {{127,0,0,1}, Port},
|
||||
start_apps => [emqx, emqx_modules],
|
||||
name => "internal",
|
||||
opts => [{zone,internal}],
|
||||
proto => tcp}],
|
||||
emqx_node_helpers:start_slave(Name, #{listeners => Listeners}).
|
||||
|
||||
stop_slave(Name) ->
|
||||
emqx_node_helpers:stop_slave(Name).
|
||||
|
||||
load_app(App) ->
|
||||
case application:load(App) of
|
||||
ok -> ok;
|
||||
{error, {already_loaded, _}} -> ok;
|
||||
{error, Reason} -> error({failed_to_load_app, App, Reason})
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue