Merge pull request #8524 from emqx/copy-of_main-v4.3

merge v4.3 -> v4.4
This commit is contained in:
Xinyu Liu 2022-07-20 21:31:01 +08:00 committed by GitHub
commit 44ea4696a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 367 additions and 21 deletions

View File

@ -3,6 +3,7 @@
{VSN,
[{"4.4.5",
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
@ -72,6 +73,7 @@
{<<".*">>,[]}],
[{"4.4.5",
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},

View File

@ -33,7 +33,7 @@
}.
-type data_type() :: string | password | number | boolean
| object | array | file | cfgselect.
| object | array | file | binary_file | cfgselect.
-type params_spec() :: #{name() => spec()} | any.
-type params() :: #{binary() => term()}.
@ -46,6 +46,7 @@
, object
, array
, file
, binary_file
, cfgselect %% TODO: [5.0] refactor this
]).
@ -84,8 +85,8 @@ validate_spec(ParamsSepc) ->
%% Internal Functions
%%------------------------------------------------------------------------------
validate_value(Val, #{type := Types} = Spec) when is_list(Types) ->
validate_types(Val, Types, Spec);
validate_value(Val, #{type := Union} = Spec) when is_list(Union) ->
validate_union(Val, Union, Spec);
validate_value(Val, #{enum := Enum}) ->
validate_enum(Val, Enum);
validate_value(Val, #{type := object} = Spec) ->
@ -93,17 +94,22 @@ validate_value(Val, #{type := object} = Spec) ->
validate_value(Val, #{type := Type} = Spec) ->
validate_type(Val, Type, Spec).
validate_types(Val, [], _Spec) ->
throw({invalid_data_type, Val});
validate_types(Val, [Type | Types], Spec) ->
validate_union(Val, Union, _Spec) ->
do_validate_union(Val, Union, Union, _Spec).
do_validate_union(Val, Union, [], _Spec) ->
throw({invalid_data_type, {union, {Val, Union}}});
do_validate_union(Val, Union, [Type | Types], Spec) ->
try
validate_type(Val, Type, Spec)
catch _:_ ->
validate_types(Val, Types, Spec)
do_validate_union(Val, Union, Types, Spec)
end.
validate_type(Val, file, _Spec) ->
validate_file(Val);
validate_type(Val, binary_file, _Spec) ->
validate_binary_file(Val);
validate_type(Val, String, Spec) when String =:= string;
String =:= password ->
validate_string(Val, reg_exp(maps:get(format, Spec, any)));
@ -118,6 +124,8 @@ validate_type(Val, cfgselect, _Spec) ->
%% TODO: [5.0] refactor this.
Val.
validate_enum(Val, BoolEnum) when BoolEnum == [true, false]; BoolEnum == [false, true] ->
validate_boolean(Val);
validate_enum(Val, Enum) ->
case lists:member(Val, Enum) of
true -> Val;
@ -147,6 +155,10 @@ validate_boolean(false) -> false;
validate_boolean(<<"false">>) -> false;
validate_boolean(Val) -> throw({invalid_data_type, {boolean, Val}}).
validate_binary_file(#{<<"file">> := _, <<"filename">> := _} = Val) ->
Val;
validate_binary_file(Val) ->
throw({invalid_data_type, {binary_file, Val}}).
validate_file(Val) when is_map(Val) -> Val;
validate_file(Val) when is_list(Val) -> Val;
validate_file(Val) when is_binary(Val) -> Val;
@ -163,6 +175,14 @@ do_validate_spec(Name, #{type := object} = Spec) ->
fun (not_found) -> throw({required_field_missing, {schema, {in, Name}}});
(Schema) -> validate_spec(Schema)
end);
do_validate_spec(Name, #{type := cfgselect} = Spec) ->
find_field(items, Spec,
fun (not_found) -> throw({required_field_missing, {items, {in, Name}}});
(Items) ->
maps:map(fun(_K, Schema) ->
validate_spec(Schema)
end, Items)
end);
do_validate_spec(Name, #{type := array} = Spec) ->
find_field(items, Spec,
fun (not_found) -> throw({required_field_missing, {items, {in, Name}}});

View File

@ -70,6 +70,32 @@
}
}
},
type_cfgselect => #{
type => cfgselect,
enum => [<<"upload">>, <<"path">>],
default => <<"upload">>,
items =>
#{
upload =>
#{
kerberos_keytab =>
#{
order => 6,
type => binary_file,
default => #{file => <<"">>, filename => <<"no_keytab.key">>}
}
},
path =>
#{
kerberos_keytab_path =>
#{
order => 7,
type => string,
default => <<"">>
}
}
}
},
type_array => #{
type => array,
required => true,
@ -88,7 +114,7 @@ t_validate_spec_the_complex(_) ->
t_validate_spec_invalid_1(_) ->
?assertThrow({required_field_missing, {type, _}},
emqx_rule_validator:validate_spec(#{
type_enum_number => #{
a => #{
required => true
}
})).
@ -96,15 +122,23 @@ t_validate_spec_invalid_1(_) ->
t_validate_spec_invalid_2(_) ->
?assertThrow({required_field_missing, {schema, _}},
emqx_rule_validator:validate_spec(#{
type_enum_number => #{
a => #{
type => object
}
})).
t_validate_spec_invalid_2_1(_) ->
?assertThrow({required_field_missing, {items, _}},
emqx_rule_validator:validate_spec(#{
a => #{
type => cfgselect
}
})).
t_validate_spec_invalid_3(_) ->
?assertThrow({required_field_missing, {items, _}},
emqx_rule_validator:validate_spec(#{
type_enum_number => #{
a => #{
type => array
}
})).
@ -162,6 +196,22 @@ t_validate_params_fill_default(_) ->
?assertMatch(#{<<"abc">> := 1, <<"eee">> := <<"hello">>},
emqx_rule_validator:validate_params(Params, Specs)).
t_validate_params_binary_file(_) ->
Params = #{<<"kfile">> => #{<<"file">> => <<"foo">>, <<"filename">> => <<"foo.key">>}},
Specs = #{<<"kfile">> => #{
type => binary_file,
required => true
}},
?assertMatch(#{<<"kfile">> := #{<<"file">> := <<"foo">>, <<"filename">> := <<"foo.key">>}},
emqx_rule_validator:validate_params(Params, Specs)),
Params1 = #{<<"kfile">> => #{<<"file">> => <<"foo">>}},
Specs1 = #{<<"kfile">> => #{
type => binary_file,
required => true
}},
?assertThrow({invalid_data_type, {binary_file, #{<<"file">> := <<"foo">>}}},
emqx_rule_validator:validate_params(Params1, Specs1)).
t_validate_params_the_complex(_) ->
Params = #{
<<"string_required">> => <<"hello">>,
@ -173,6 +223,8 @@ t_validate_params_the_complex(_) ->
<<"string_required">> => <<"hello2">>,
<<"type_number">> => 1.3
},
<<"type_cfgselect">> => <<"upload">>,
<<"kerberos_keytab">> => #{<<"file">> => <<"foo">>, <<"filename">> => <<"foo.key">>},
<<"type_array">> => [<<"ok">>, <<"no">>]
},
?assertMatch(
@ -186,6 +238,8 @@ t_validate_params_the_complex(_) ->
<<"string_required">> := <<"hello2">>,
<<"type_number">> := 1.3
},
<<"kerberos_keytab">> := #{<<"file">> := <<"foo">>, <<"filename">> := <<"foo.key">>},
<<"type_cfgselect">> := <<"upload">>,
<<"type_array">> := [<<"ok">>, <<"no">>]
},
emqx_rule_validator:validate_params(Params, ?VALID_SPEC)).

View File

@ -16,6 +16,7 @@ RUNNER_ROOT_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")"/..; pwd -P)"
EMQX_LICENSE_CONF=''
REL_NAME="emqx"
ERTS_PATH="$RUNNER_ROOT_DIR/erts-$ERTS_VSN/bin"
export EMQX_DESCRIPTION
RUNNER_SCRIPT="$RUNNER_BIN_DIR/$REL_NAME"
CODE_LOADING_MODE="${CODE_LOADING_MODE:-embedded}"

187
bin/emqx_cluster_rescue Executable file
View File

@ -0,0 +1,187 @@
#!/usr/bin/env bash
set -euo pipefail
# ==================================
# RESCUE THE UNBOOTABLE EMQX CLUSTER
# ==================================
## Global Vars
# Steal from emqx_ctl
THIS_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")" || true; pwd -P)"
usage() {
local Script
Script=$(basename "$0")
echo "
RESCUE THE UNBOOTABLE EMQX CLUSTER
Use this script only when the entire cluster is stuck at booting & loading.
This script provides a list of methods to *hack* the DB of EMQX to bring back
the cluster back to service but MAY come with some side effects including:
- Data loss
- Inconsistent data in the cluster
- Other undefined behaviors
*DO NOT* use this script unless you understand the consequences.
*DO NOT* use this script when EMQX cluster is partitioned.
Use Case:
- Lost one node due to unrecoverable failures (hardware, cloud resource outage)
and this node prevents other nodes in the cluster from starting.
Usage:
# For troubleshooting, find out all the tables that are pending at loading
$Script pending-tables
# For troubleshooting, debug print detailed table info that is pending at loading.
$Script table-details
# Force load one [Tab] or all pending tables from node local storage to bring this node up
# Use local data as the data source for the pending tables, should bring up the node immediately and
# spread the data to other nodes in the cluster.
#
# * Take effect immediately
# * This is a node local change but the change will be lost after restart.
$Script force-load [Tab]
# Remove Node from mnesia cluster.
# Most likely will fail if the remote Node is unreachable.
#
# * This is a cluster wide schema change.
$Script remove-node Node
# Set master node for distributed DB
# The master node will be the data source for pending tables.
#
# * This is a node local change
# * Node could be a remote Erlang node in the cluster or local erlang node
# * Use command: 'unset-master' to rollback
$Script set-master Node
# Unset master node for distributed DB, this is a node local change
$Script unset-master
# Cheat the local node that RemoteNode is down so that it will not wait for it to come up.
# Local node will take local data as the data source for pending tables and spread the data
# to the other pending nodes.
#
# * Check EMQX logs to find out which remote node(s) the local node is waiting for
# * To take effect, restart this EMQX node
# * This is a node local setting
$Script lie-node-down RemoteNode
Tips:
- Override local node name with envvar: \$EMQX_NODE_NAME
"
}
# Functions
#
print_pending_tables() {
local erl_cmd='[ io:format("~p :: ~p~n", [T, maps:with([all_nodes, load_order, storage_type,
active_replicas, local_content, load_by_force,
load_node, load_reason, master_nodes]
, maps:from_list(mnesia:table_info(T, all)))])
|| T <- mnesia:system_info(local_tables), unknown =:= mnesia:table_info(T, load_node) ],
ok
'
exec "$THIS_DIR/emqx" eval "$erl_cmd"
}
print_details_per_table() {
local erl_cmd='[ io:format("~p :: ~p~n", [T, mnesia:table_info(T, all)])
|| T <- mnesia:system_info(local_tables), unknown =:= mnesia:table_info(T, load_node) ],
ok
'
exec "$THIS_DIR/emqx" eval "$erl_cmd"
}
force-load() {
if [ $# -eq 1 ]; then
local erl_cmd="mnesia:force_load_table(${1})"
else
local erl_cmd='[ {T, mnesia:force_load_table(T)}
|| T <- mnesia:system_info(local_tables),
unknown =:= mnesia:table_info(T, load_node)
]
'
fi
exec "$THIS_DIR/emqx" eval "$erl_cmd"
}
remove-node() {
local target_node=$1
local erl_cmd="
case [T || T <- mnesia:system_info(local_tables), unknown =:= mnesia:table_info(T, load_node)] of
[] ->
io:format(\"No table need to load\\n\"),
skipped;
TargetTables ->
io:format(\"Going to remove node ${target_node} from schema of the tables:~n~p~n\", [TargetTables]),
case io:read(\"confirm? [yes.] OR Ctrl-D to skip: \") of
{ok, yes} ->
lists:map(fun(T) ->
mnesia:force_load_table(T),
{T, mnesia:del_table_copy(T, '${target_node}') }
end, TargetTables);
eof -> skipped;
R -> {skipped, R}
end
end
"
exec "$THIS_DIR/emqx" eval "$erl_cmd"
}
set-master-node() {
if [ $# -eq 1 ]; then
local erl_cmd="mnesia:set_master_nodes(['${1}']), mnesia_recover:dump_decision_tab()"
else
local erl_cmd="mnesia:set_master_nodes([]), mnesia_recover:dump_decision_tab()"
fi
exec "$THIS_DIR/emqx" eval "$erl_cmd"
}
lie-node-down() {
if [ $# -eq 1 ]; then
local erl_cmd="mnesia_recover:log_mnesia_down('${1}'), mnesia_recover:dump_decision_tab()"
exec "$THIS_DIR/emqx" eval "$erl_cmd"
else
usage
fi
}
CMD=${1:-usage}
[ $# -gt 0 ] && shift 1
case "$CMD" in
force-load)
force-load "$@"
;;
remove-node)
remove-node "$@"
;;
pending-tables)
print_pending_tables
;;
table-details)
print_details_per_table
;;
set-master)
set-master-node "$@"
;;
unset-master)
set-master-node
;;
lie-node-down)
lie-node-down "$@"
;;
*)
usage
esac

View File

@ -29,7 +29,7 @@
-ifndef(EMQX_ENTERPRISE).
-define(EMQX_RELEASE, {opensource, "4.4.6-beta.1"}).
-define(EMQX_RELEASE, {opensource, "4.4.6-beta.2"}).
-else.

View File

@ -338,6 +338,7 @@ relx_overlay(ReleaseType) ->
, {template, "data/emqx_vars", "releases/emqx_vars"}
, {copy, "bin/emqx", "bin/emqx"}
, {copy, "bin/emqx_ctl", "bin/emqx_ctl"}
, {copy, "bin/emqx_cluster_rescue", "bin/emqx_cluster_rescue"}
, {copy, "bin/node_dump", "bin/node_dump"}
, {copy, "bin/install_upgrade.escript", "bin/install_upgrade.escript"}
, {copy, "bin/emqx", "bin/emqx-{{release_version}}"} %% for relup

View File

@ -4,11 +4,13 @@
[{"4.4.5",
[
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
@ -24,6 +26,7 @@
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}]},
{"4.4.3",
[{add_module,emqx_calendar},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
@ -49,6 +52,7 @@
{load_module,emqx_relup}]},
{"4.4.2",
[{add_module,emqx_calendar},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
@ -77,6 +81,7 @@
{load_module,emqx_relup}]},
{"4.4.1",
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{apply,{emqx_exclusive_subscription,on_add_module,[]}},
@ -113,6 +118,7 @@
{add_module,emqx_relup}]},
{"4.4.0",
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{add_module,emqx_calendar},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
@ -153,13 +159,14 @@
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.4.5",
[
[{update,emqx_broker_sup,supervisor},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
@ -175,6 +182,7 @@
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}]},
{"4.4.3",
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
@ -199,6 +207,7 @@
{load_module,emqx_relup}]},
{"4.4.2",
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
@ -226,6 +235,7 @@
{load_module,emqx_relup}]},
{"4.4.1",
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{apply,{emqx_exclusive_subscription,on_delete_module,[]}},
@ -261,6 +271,7 @@
{delete_module,emqx_relup}]},
{"4.4.0",
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{apply,{emqx_exclusive_subscription,on_delete_module,[]}},

View File

@ -51,5 +51,4 @@ init([]) ->
type => worker,
modules => [emqx_broker_helper]},
{ok, {{one_for_all, 0, 1}, [BrokerPool, SharedSub, Helper]}}.
{ok, {{one_for_one, 1, 5}, [BrokerPool, SharedSub, Helper]}}.

View File

@ -164,27 +164,24 @@ ack_enabled() ->
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
%% Deadlock otherwise
SubPid ! {deliver, Topic, Msg},
ok;
send(SubPid, Topic, {deliver, Topic, Msg});
%% return either 'ok' (when everything is fine) or 'error'
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
%% For QoS 0 message, send it as regular dispatch
SubPid ! {deliver, Topic, Msg},
ok;
send(SubPid, Topic, {deliver, Topic, Msg});
do_dispatch(SubPid, Group, Topic, Msg, Type) ->
case ack_enabled() of
true ->
dispatch_with_ack(SubPid, Group, Topic, Msg, Type);
false ->
SubPid ! {deliver, Topic, Msg},
ok
send(SubPid, Topic, {deliver, Topic, Msg})
end.
dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
%% For QoS 1/2 message, expect an ack
Ref = erlang:monitor(process, SubPid),
Sender = self(),
SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)},
send(SubPid, Topic, {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)}),
Timeout = case Msg#message.qos of
?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
?QOS_2 -> infinity
@ -210,6 +207,15 @@ dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
_ = erlang:demonitor(Ref, [flush])
end.
send(Pid, Topic, Msg) ->
Node = node(Pid),
if Node =:= node() ->
Pid ! Msg;
true ->
emqx_rpc:cast(Topic, Node, erlang, send, [Pid, Msg])
end,
ok.
with_group_ack(Msg, Group, Type, Sender, Ref) ->
emqx_message:set_headers(#{shared_dispatch_ack => {Sender, old_ref(Type, Group, Ref)}}, Msg).

View File

@ -0,0 +1,65 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_broker_sup_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-define(APP, emqx).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
init_per_testcase(Case, Config) ->
?MODULE:Case({init, Config}).
end_per_testcase(Case, Config) ->
?MODULE:Case({'end', Config}).
%%--------------------------------------------------------------------
%% cases
%%--------------------------------------------------------------------
t_restart_shared_sub({init, Config}) ->
emqx:subscribe(<<"t/a">>, #{share => <<"groupa">>}),
true = exit(whereis(emqx_shared_sub), kill),
%% waiting for restart
timer:sleep(200), Config;
t_restart_shared_sub(Config) when is_list(Config) ->
?assert(is_pid(whereis(emqx_shared_sub))),
emqx:publish(emqx_message:make(<<"t/a">>, <<"Hi">>)),
?assert(
receive
{deliver, _Topic, #message{payload = <<"Hi">>}} -> true
after 2000 ->
false
end);
t_restart_shared_sub({'end', Config}) ->
emqx:unsubscribe(<<"$share/grpa/t/a">>).