Merge branch 'main-v4.3' into changes-restart-strategy
This commit is contained in:
commit
befde373c8
|
@ -19,6 +19,10 @@ File format:
|
|||
|
||||
- Fixed crash when shared persistent subscription [#8441]
|
||||
|
||||
### Enhancements
|
||||
- HTTP API(GET /rules/) support for pagination and fuzzy filtering. [#8450]
|
||||
- Add check_conf cli to check config format. [#8486]
|
||||
|
||||
## v4.3.16
|
||||
|
||||
### Enhancements
|
||||
|
|
|
@ -122,7 +122,7 @@ format({_Subscriber, Topic, Options = #{share := Group}}) ->
|
|||
#{node => node(), topic => filename:join([<<"$share">>, Group, Topic]), clientid => maps:get(subid, Options), qos => QoS};
|
||||
format({_Subscriber, Topic, Options}) ->
|
||||
QoS = maps:get(qos, Options),
|
||||
#{node => node(), topic => Topic, clientid => maps:get(subid, Options), qos => QoS}.
|
||||
#{node => node(), topic => Topic, clientid => maps:get(subid, Options, ""), qos => QoS}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Query Function
|
||||
|
|
|
@ -2,14 +2,16 @@
|
|||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{"4.3.11",
|
||||
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.10",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.9",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
|
@ -162,14 +164,16 @@
|
|||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.3.11",
|
||||
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.10",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.9",
|
||||
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -184,6 +184,17 @@
|
|||
]).
|
||||
|
||||
-export([list_events/2]).
|
||||
-export([query/3]).
|
||||
|
||||
-define(RULE_QS_SCHEMA, {?RULE_TAB,
|
||||
[
|
||||
{<<"enabled">>, atom},
|
||||
{<<"for">>, binary},
|
||||
{<<"_like_id">>, binary},
|
||||
{<<"_like_for">>, binary},
|
||||
{<<"_match_for">>, binary},
|
||||
{<<"_like_description">>, binary}
|
||||
]}).
|
||||
|
||||
-define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~s Not Found", [(ID)]))).
|
||||
-define(ERR_NO_ACTION(NAME), list_to_binary(io_lib:format("Action ~s Not Found", [(NAME)]))).
|
||||
|
@ -261,8 +272,14 @@ update_rule(#{id := Id}, Params) ->
|
|||
return({error, 400, ?ERR_BADARGS(Reason)})
|
||||
end.
|
||||
|
||||
list_rules(_Bindings, _Params) ->
|
||||
return_all(emqx_rule_registry:get_rules_ordered_by_ts()).
|
||||
list_rules(_Bindings, Params) ->
|
||||
case proplists:get_value(<<"enable_paging">>, Params, true) of
|
||||
true ->
|
||||
SortFun = fun(#{created_at := C1}, #{created_at := C2}) -> C1 > C2 end,
|
||||
return({ok, emqx_mgmt_api:node_query(node(), Params, ?RULE_QS_SCHEMA, {?MODULE, query}, SortFun)});
|
||||
false ->
|
||||
return_all(emqx_rule_registry:get_rules_ordered_by_ts())
|
||||
end.
|
||||
|
||||
show_rule(#{id := Id}, _Params) ->
|
||||
reply_with(fun emqx_rule_registry:get_rule/1, Id).
|
||||
|
@ -454,6 +471,7 @@ record_to_map(#rule{id = Id,
|
|||
actions = Actions,
|
||||
on_action_failed = OnFailed,
|
||||
enabled = Enabled,
|
||||
created_at = CreatedAt,
|
||||
description = Descr}) ->
|
||||
#{id => Id,
|
||||
for => Hook,
|
||||
|
@ -462,6 +480,7 @@ record_to_map(#rule{id = Id,
|
|||
on_action_failed => OnFailed,
|
||||
metrics => get_rule_metrics(Id),
|
||||
enabled => Enabled,
|
||||
created_at => CreatedAt,
|
||||
description => Descr
|
||||
};
|
||||
|
||||
|
@ -599,3 +618,41 @@ get_action_metrics(Id) ->
|
|||
Res -> [maps:put(node, Node, Res)]
|
||||
end
|
||||
|| Node <- ekka_mnesia:running_nodes()]).
|
||||
|
||||
query({Qs, []}, Start, Limit) ->
|
||||
Ms = qs2ms(Qs),
|
||||
emqx_mgmt_api:select_table(?RULE_TAB, Ms, Start, Limit, fun record_to_map/1);
|
||||
|
||||
query({Qs, Fuzzy}, Start, Limit) ->
|
||||
Ms = qs2ms(Qs),
|
||||
MatchFun = match_fun(Ms, Fuzzy),
|
||||
emqx_mgmt_api:traverse_table(?RULE_TAB, MatchFun, Start, Limit, fun record_to_map/1).
|
||||
|
||||
qs2ms(Qs) ->
|
||||
Init = #rule{for = '_', enabled = '_', _ = '_'},
|
||||
MatchHead = lists:foldl(fun(Q, Acc) -> match_ms(Q, Acc) end, Init, Qs),
|
||||
[{MatchHead, [], ['$_']}].
|
||||
|
||||
match_ms({for, '=:=', Value}, MatchHead) -> MatchHead#rule{for = Value};
|
||||
match_ms({enabled, '=:=', Value}, MatchHead) -> MatchHead#rule{enabled = Value};
|
||||
match_ms(_, MatchHead) -> MatchHead.
|
||||
|
||||
match_fun(Ms, Fuzzy) ->
|
||||
MsC = ets:match_spec_compile(Ms),
|
||||
fun(Rows) ->
|
||||
Ls = ets:match_spec_run(Rows, MsC),
|
||||
lists:filter(fun(E) -> run_fuzzy_match(E, Fuzzy) end, Ls)
|
||||
end.
|
||||
|
||||
run_fuzzy_match(_, []) -> true;
|
||||
run_fuzzy_match(E = #rule{id = Id}, [{id, like, Pattern}|Fuzzy]) ->
|
||||
binary:match(Id, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy);
|
||||
run_fuzzy_match(E = #rule{description = Desc}, [{description, like, Pattern}|Fuzzy]) ->
|
||||
binary:match(Desc, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy);
|
||||
run_fuzzy_match(E = #rule{for = Topics}, [{for, match, Pattern}|Fuzzy]) ->
|
||||
lists:any(fun(For) -> emqx_topic:match(For, Pattern) end, Topics)
|
||||
andalso run_fuzzy_match(E, Fuzzy);
|
||||
run_fuzzy_match(E = #rule{for = Topics}, [{for, like, Pattern}|Fuzzy]) ->
|
||||
lists:any(fun(For) -> binary:match(For, Pattern) /= nomatch end, Topics)
|
||||
andalso run_fuzzy_match(E, Fuzzy);
|
||||
run_fuzzy_match(_E, [{_Key, like, _SubStr}| _Fuzzy]) -> false.
|
||||
|
|
|
@ -62,7 +62,8 @@ groups() ->
|
|||
t_show_action_api,
|
||||
t_crud_resources_api,
|
||||
t_list_resource_types_api,
|
||||
t_show_resource_type_api
|
||||
t_show_resource_type_api,
|
||||
t_list_rule_api
|
||||
]},
|
||||
{cli, [],
|
||||
[t_rules_cli,
|
||||
|
@ -513,6 +514,74 @@ t_crud_rule_api(_Config) ->
|
|||
?assertMatch({ok, #{code := 404, message := _Message}}, NotFound),
|
||||
ok.
|
||||
|
||||
t_list_rule_api(_Config) ->
|
||||
AddIds =
|
||||
lists:map(fun(Seq) ->
|
||||
SeqBin = integer_to_binary(Seq),
|
||||
{ok, #{code := 0, data := #{id := Id}}} =
|
||||
emqx_rule_engine_api:create_rule(#{},
|
||||
[{<<"name">>, <<"debug-rule-", SeqBin/binary>>},
|
||||
{<<"rawsql">>, <<"select * from \"t/a/", SeqBin/binary, "\"">>},
|
||||
{<<"actions">>, [[{<<"name">>,<<"inspect">>}, {<<"params">>,[{<<"arg1">>,1}]}]]},
|
||||
{<<"description">>, <<"debug rule desc ", SeqBin/binary>>}]),
|
||||
Id
|
||||
end, lists:seq(1, 20)),
|
||||
|
||||
{ok, #{code := 0, data := Rules11}} = emqx_rule_engine_api:list_rules(#{},
|
||||
[{<<"_limit">>,<<"10">>}, {<<"_page">>, <<"1">>}, {<<"enable_paging">>, true}]),
|
||||
?assertEqual(10, length(Rules11)),
|
||||
{ok, #{code := 0, data := Rules12}} = emqx_rule_engine_api:list_rules(#{},
|
||||
[{<<"_limit">>,<<"10">>}, {<<"_page">>, <<"2">>}, {<<"enable_paging">>, true}]),
|
||||
?assertEqual(10, length(Rules12)),
|
||||
Rules1 = Rules11 ++ Rules12,
|
||||
|
||||
[RuleID | _] = AddIds,
|
||||
{ok, #{code := 0}} = emqx_rule_engine_api:update_rule(#{id => RuleID},
|
||||
[{<<"enabled">>, false}]),
|
||||
Params1 = [{<<"enabled">>,<<"true">>}, {<<"enable_paging">>, true}],
|
||||
{ok, #{code := 0, data := Rules2}} = emqx_rule_engine_api:list_rules(#{}, Params1),
|
||||
?assert(lists:all(fun(#{id := ID}) -> ID =/= RuleID end, Rules2)),
|
||||
|
||||
Params2 = [{<<"for">>, RuleID}, {<<"enable_paging">>, true}],
|
||||
{ok, #{code := 0, data := Rules3}} = emqx_rule_engine_api:list_rules(#{}, Params2),
|
||||
?assert(lists:all(fun(#{id := ID}) -> ID =:= RuleID end, Rules3)),
|
||||
|
||||
Params3 = [{<<"_like_id">>,<<"rule:">>}, {<<"enable_paging">>, true}],
|
||||
{ok, #{code := 0, data := Rules4}} = emqx_rule_engine_api:list_rules(#{}, Params3),
|
||||
?assertEqual(length(Rules1), length(Rules4)),
|
||||
|
||||
Params4 = [{<<"_like_for">>,<<"t/a/">>}, {<<"enable_paging">>, true}],
|
||||
{ok, #{code := 0, data := Rules5}} = emqx_rule_engine_api:list_rules(#{}, Params4),
|
||||
?assertEqual(length(Rules1), length(Rules5)),
|
||||
{ok, #{code := 0}} = emqx_rule_engine_api:update_rule(#{id => RuleID},
|
||||
[{<<"rawsql">>, <<"select * from \"t/b/c\"">>}]),
|
||||
{ok, #{code := 0, data := Rules6}} = emqx_rule_engine_api:list_rules(#{}, Params4),
|
||||
?assert(lists:all(fun(#{id := ID}) -> ID =/= RuleID end, Rules6)),
|
||||
?assertEqual(1, length(Rules1) - length(Rules6)),
|
||||
|
||||
Params5 = [{<<"_match_for">>,<<"t/+/+">>}, {<<"enable_paging">>, true}],
|
||||
{ok, #{code := 0, data := Rules7}} = emqx_rule_engine_api:list_rules(#{}, Params5),
|
||||
?assertEqual(length(Rules1), length(Rules7)),
|
||||
{ok, #{code := 0}} = emqx_rule_engine_api:update_rule(#{id => RuleID},
|
||||
[{<<"rawsql">>, <<"select * from \"t1/b\"">>}]),
|
||||
{ok, #{code := 0, data := Rules8}} = emqx_rule_engine_api:list_rules(#{}, Params5),
|
||||
?assert(lists:all(fun(#{id := ID}) -> ID =/= RuleID end, Rules8)),
|
||||
?assertEqual(1, length(Rules1) - length(Rules8)),
|
||||
|
||||
Params6 = [{<<"_like_description">>,<<"rule">>}, {<<"enable_paging">>, true}],
|
||||
{ok, #{code := 0, data := Rules9}} = emqx_rule_engine_api:list_rules(#{}, Params6),
|
||||
?assertEqual(length(Rules1), length(Rules9)),
|
||||
{ok, #{code := 0}} = emqx_rule_engine_api:update_rule(#{id => RuleID},
|
||||
[{<<"description">>, <<"not me">>}]),
|
||||
{ok, #{code := 0, data := Rules10}} = emqx_rule_engine_api:list_rules(#{}, Params6),
|
||||
?assert(lists:all(fun(#{id := ID}) -> ID =/= RuleID end, Rules10)),
|
||||
?assertEqual(1, length(Rules1) - length(Rules10)),
|
||||
|
||||
lists:foreach(fun(ID) ->
|
||||
?assertMatch({ok, #{code := 0}}, emqx_rule_engine_api:delete_rule(#{id => ID}, []))
|
||||
end, AddIds),
|
||||
ok.
|
||||
|
||||
t_list_actions_api(_Config) ->
|
||||
{ok, #{code := 0, data := Actions}} = emqx_rule_engine_api:list_actions(#{}, []),
|
||||
%ct:pal("RList : ~p", [Actions]),
|
||||
|
|
|
@ -72,7 +72,7 @@
|
|||
enable_pipelining => #{order => 5,
|
||||
type => boolean,
|
||||
default => true,
|
||||
title => #{en => <<"Enable Pipelining">>, zh => <<"Enable Pipelining"/utf8>>},
|
||||
title => #{en => <<"Enable Pipelining">>, zh => <<"开启 Pipelining"/utf8>>},
|
||||
description => #{en => <<"Whether to enable HTTP Pipelining">>,
|
||||
zh => <<"是否开启 HTTP Pipelining"/utf8>>}
|
||||
},
|
||||
|
|
31
bin/emqx
31
bin/emqx
|
@ -8,6 +8,11 @@ RUNNER_ROOT_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")"/..; pwd -P)"
|
|||
# shellcheck disable=SC1090
|
||||
. "$RUNNER_ROOT_DIR"/releases/emqx_vars
|
||||
|
||||
EMQX_LICENSE_CONF=''
|
||||
REL_NAME="emqx"
|
||||
ERTS_PATH="$RUNNER_ROOT_DIR/erts-$ERTS_VSN/bin"
|
||||
export EMQX_DESCRIPTION
|
||||
|
||||
RUNNER_SCRIPT="$RUNNER_BIN_DIR/$REL_NAME"
|
||||
CODE_LOADING_MODE="${CODE_LOADING_MODE:-embedded}"
|
||||
REL_DIR="$RUNNER_ROOT_DIR/releases/$REL_VSN"
|
||||
|
@ -192,6 +197,7 @@ usage() {
|
|||
echo " Up/Down-grade: upgrade | downgrade | install | uninstall"
|
||||
echo " Install info: ertspath | root_dir | versions"
|
||||
echo " Runtime info: pid | ping | versions"
|
||||
echo " Config check: check_conf"
|
||||
echo " Advanced: console_clean | escript | rpc | rpcterms | eval"
|
||||
echo ''
|
||||
echo "Execute '$REL_NAME COMMAND help' for more information"
|
||||
|
@ -334,9 +340,12 @@ trim() {
|
|||
|
||||
# Function to generate app.config and vm.args
|
||||
generate_config() {
|
||||
## Delete the *.siz files first or it cann't start after
|
||||
## changing the config 'log.rotation.size'
|
||||
rm -rf "${RUNNER_LOG_DIR}"/*.siz
|
||||
check_only="$1"
|
||||
if [ "$check_only" != "check_only" ]; then
|
||||
## Delete the *.siz files first or it cann't start after
|
||||
## changing the config 'log.rotation.size'
|
||||
rm -rf "${RUNNER_LOG_DIR}"/*.siz
|
||||
fi
|
||||
|
||||
set +e
|
||||
if [ "${EMQX_LICENSE_CONF:-}" = "" ]; then
|
||||
|
@ -388,12 +397,19 @@ generate_config() {
|
|||
fi
|
||||
fi
|
||||
done
|
||||
mv -f "$TMP_ARG_FILE" "$CUTTLE_GEN_ARG_FILE"
|
||||
|
||||
if ! relx_nodetool chkconfig -config "$CONFIG_FILE"; then
|
||||
echoerr "Error reading $CONFIG_FILE"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ "$check_only" = "check_only" ]; then
|
||||
rm -f "$TMP_ARG_FILE"
|
||||
rm -f "$CUTTLE_GEN_ARG_FILE"
|
||||
rm -f "$CONFIG_FILE"
|
||||
else
|
||||
mv -f "$TMP_ARG_FILE" "$CUTTLE_GEN_ARG_FILE"
|
||||
fi
|
||||
}
|
||||
|
||||
# Call bootstrapd for daemon commands like start/stop/console
|
||||
|
@ -447,6 +463,9 @@ case "$1" in
|
|||
foreground)
|
||||
IS_BOOT_COMMAND='yes'
|
||||
;;
|
||||
check_conf)
|
||||
IS_BOOT_COMMAND='yes'
|
||||
;;
|
||||
esac
|
||||
|
||||
|
||||
|
@ -815,6 +834,10 @@ case "$1" in
|
|||
ertspath)
|
||||
echo "$ERTS_PATH"
|
||||
;;
|
||||
check_conf)
|
||||
generate_config "check_only"
|
||||
echo "$RUNNER_ETC_DIR/emqx.conf is ok"
|
||||
;;
|
||||
|
||||
ctl)
|
||||
assert_node_alive
|
||||
|
|
|
@ -0,0 +1,187 @@
|
|||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
# ==================================
|
||||
# RESCUE THE UNBOOTABLE EMQX CLUSTER
|
||||
# ==================================
|
||||
|
||||
## Global Vars
|
||||
# Steal from emqx_ctl
|
||||
THIS_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")" || true; pwd -P)"
|
||||
|
||||
usage() {
|
||||
local Script
|
||||
Script=$(basename "$0")
|
||||
|
||||
echo "
|
||||
RESCUE THE UNBOOTABLE EMQX CLUSTER
|
||||
|
||||
Use this script only when the entire cluster is stuck at booting & loading.
|
||||
|
||||
This script provides a list of methods to *hack* the DB of EMQX to bring back
|
||||
the cluster back to service but MAY come with some side effects including:
|
||||
|
||||
- Data loss
|
||||
- Inconsistent data in the cluster
|
||||
- Other undefined behaviors
|
||||
|
||||
*DO NOT* use this script unless you understand the consequences.
|
||||
*DO NOT* use this script when EMQX cluster is partitioned.
|
||||
|
||||
Use Case:
|
||||
|
||||
- Lost one node due to unrecoverable failures (hardware, cloud resource outage)
|
||||
and this node prevents other nodes in the cluster from starting.
|
||||
|
||||
Usage:
|
||||
|
||||
# For troubleshooting, find out all the tables that are pending at loading
|
||||
$Script pending-tables
|
||||
|
||||
# For troubleshooting, debug print detailed table info that is pending at loading.
|
||||
$Script table-details
|
||||
|
||||
# Force load one [Tab] or all pending tables from node local storage to bring this node up
|
||||
# Use local data as the data source for the pending tables, should bring up the node immediately and
|
||||
# spread the data to other nodes in the cluster.
|
||||
#
|
||||
# * Take effect immediately
|
||||
# * This is a node local change but the change will be lost after restart.
|
||||
$Script force-load [Tab]
|
||||
|
||||
# Remove Node from mnesia cluster.
|
||||
# Most likely will fail if the remote Node is unreachable.
|
||||
#
|
||||
# * This is a cluster wide schema change.
|
||||
$Script remove-node Node
|
||||
|
||||
# Set master node for distributed DB
|
||||
# The master node will be the data source for pending tables.
|
||||
#
|
||||
# * This is a node local change
|
||||
# * Node could be a remote Erlang node in the cluster or local erlang node
|
||||
# * Use command: 'unset-master' to rollback
|
||||
$Script set-master Node
|
||||
|
||||
# Unset master node for distributed DB, this is a node local change
|
||||
$Script unset-master
|
||||
|
||||
# Cheat the local node that RemoteNode is down so that it will not wait for it to come up.
|
||||
# Local node will take local data as the data source for pending tables and spread the data
|
||||
# to the other pending nodes.
|
||||
#
|
||||
# * Check EMQX logs to find out which remote node(s) the local node is waiting for
|
||||
# * To take effect, restart this EMQX node
|
||||
# * This is a node local setting
|
||||
|
||||
$Script lie-node-down RemoteNode
|
||||
|
||||
Tips:
|
||||
- Override local node name with envvar: \$EMQX_NODE_NAME
|
||||
"
|
||||
}
|
||||
|
||||
# Functions
|
||||
#
|
||||
print_pending_tables() {
|
||||
local erl_cmd='[ io:format("~p :: ~p~n", [T, maps:with([all_nodes, load_order, storage_type,
|
||||
active_replicas, local_content, load_by_force,
|
||||
load_node, load_reason, master_nodes]
|
||||
, maps:from_list(mnesia:table_info(T, all)))])
|
||||
|| T <- mnesia:system_info(local_tables), unknown =:= mnesia:table_info(T, load_node) ],
|
||||
ok
|
||||
'
|
||||
exec "$THIS_DIR/emqx" eval "$erl_cmd"
|
||||
}
|
||||
|
||||
print_details_per_table() {
|
||||
local erl_cmd='[ io:format("~p :: ~p~n", [T, mnesia:table_info(T, all)])
|
||||
|| T <- mnesia:system_info(local_tables), unknown =:= mnesia:table_info(T, load_node) ],
|
||||
ok
|
||||
'
|
||||
exec "$THIS_DIR/emqx" eval "$erl_cmd"
|
||||
}
|
||||
|
||||
force-load() {
|
||||
if [ $# -eq 1 ]; then
|
||||
local erl_cmd="mnesia:force_load_table(${1})"
|
||||
else
|
||||
local erl_cmd='[ {T, mnesia:force_load_table(T)}
|
||||
|| T <- mnesia:system_info(local_tables),
|
||||
unknown =:= mnesia:table_info(T, load_node)
|
||||
]
|
||||
'
|
||||
fi
|
||||
exec "$THIS_DIR/emqx" eval "$erl_cmd"
|
||||
}
|
||||
|
||||
remove-node() {
|
||||
local target_node=$1
|
||||
local erl_cmd="
|
||||
case [T || T <- mnesia:system_info(local_tables), unknown =:= mnesia:table_info(T, load_node)] of
|
||||
[] ->
|
||||
io:format(\"No table need to load\\n\"),
|
||||
skipped;
|
||||
TargetTables ->
|
||||
io:format(\"Going to remove node ${target_node} from schema of the tables:~n~p~n\", [TargetTables]),
|
||||
case io:read(\"confirm? [yes.] OR Ctrl-D to skip: \") of
|
||||
{ok, yes} ->
|
||||
lists:map(fun(T) ->
|
||||
mnesia:force_load_table(T),
|
||||
{T, mnesia:del_table_copy(T, '${target_node}') }
|
||||
end, TargetTables);
|
||||
eof -> skipped;
|
||||
R -> {skipped, R}
|
||||
end
|
||||
end
|
||||
"
|
||||
exec "$THIS_DIR/emqx" eval "$erl_cmd"
|
||||
}
|
||||
|
||||
set-master-node() {
|
||||
if [ $# -eq 1 ]; then
|
||||
local erl_cmd="mnesia:set_master_nodes(['${1}']), mnesia_recover:dump_decision_tab()"
|
||||
else
|
||||
local erl_cmd="mnesia:set_master_nodes([]), mnesia_recover:dump_decision_tab()"
|
||||
fi
|
||||
|
||||
exec "$THIS_DIR/emqx" eval "$erl_cmd"
|
||||
}
|
||||
|
||||
lie-node-down() {
|
||||
if [ $# -eq 1 ]; then
|
||||
local erl_cmd="mnesia_recover:log_mnesia_down('${1}'), mnesia_recover:dump_decision_tab()"
|
||||
exec "$THIS_DIR/emqx" eval "$erl_cmd"
|
||||
else
|
||||
usage
|
||||
fi
|
||||
}
|
||||
|
||||
|
||||
CMD=${1:-usage}
|
||||
[ $# -gt 0 ] && shift 1
|
||||
|
||||
case "$CMD" in
|
||||
force-load)
|
||||
force-load "$@"
|
||||
;;
|
||||
remove-node)
|
||||
remove-node "$@"
|
||||
;;
|
||||
pending-tables)
|
||||
print_pending_tables
|
||||
;;
|
||||
table-details)
|
||||
print_details_per_table
|
||||
;;
|
||||
set-master)
|
||||
set-master-node "$@"
|
||||
;;
|
||||
unset-master)
|
||||
set-master-node
|
||||
;;
|
||||
lie-node-down)
|
||||
lie-node-down "$@"
|
||||
;;
|
||||
*)
|
||||
usage
|
||||
esac
|
|
@ -12,12 +12,10 @@ RUNNER_LIB_DIR="{{ runner_lib_dir }}"
|
|||
RUNNER_ETC_DIR="{{ runner_etc_dir }}"
|
||||
RUNNER_DATA_DIR="{{ runner_data_dir }}"
|
||||
RUNNER_USER="{{ runner_user }}"
|
||||
EMQX_DESCRIPTION='{{ emqx_description }}'
|
||||
|
||||
EMQX_LICENSE_CONF=''
|
||||
export EMQX_DESCRIPTION='{{ emqx_description }}'
|
||||
## Warning: DO NOT create new variables using the above vars in this file,
|
||||
## as the vars above can be overwritten by the relup scripts later, like:
|
||||
## REL_VSN="new_version"
|
||||
|
||||
## computed vars
|
||||
REL_NAME="emqx"
|
||||
ERTS_PATH="$RUNNER_ROOT_DIR/erts-$ERTS_VSN/bin"
|
||||
|
||||
## updated vars here
|
||||
## overwritten vars here
|
||||
|
|
|
@ -1472,7 +1472,7 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
|
|||
## are used during server authentication and when building the client certificate chain.
|
||||
##
|
||||
## Value: File
|
||||
## listener.ssl.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
|
||||
listener.ssl.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
|
||||
|
||||
## The Ephemeral Diffie-Helman key exchange is a very effective way of
|
||||
## ensuring Forward Secrecy by exchanging a set of keys that never hit
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
|
||||
-ifndef(EMQX_ENTERPRISE).
|
||||
|
||||
-define(EMQX_RELEASE, {opensource, "4.3.16"}).
|
||||
-define(EMQX_RELEASE, {opensource, "4.3.17-beta.1"}).
|
||||
|
||||
-else.
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([ compile/1
|
||||
|
@ -45,6 +46,7 @@
|
|||
|
||||
load(RawRules) ->
|
||||
{PubRules, SubRules} = compile(RawRules),
|
||||
?LOG(info, "[Rewrite] Load rule pub ~0p sub ~0p", [PubRules, SubRules]),
|
||||
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, 1000),
|
||||
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, 1000),
|
||||
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, 1000).
|
||||
|
@ -62,6 +64,7 @@ rewrite_publish(Message = #message{topic = Topic}, Rules) ->
|
|||
{ok, Message#message{topic = match_and_rewrite(Topic, Rules, Binds)}}.
|
||||
|
||||
unload(_) ->
|
||||
?LOG(info, "[Rewrite] Unload"),
|
||||
emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}),
|
||||
emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
|
||||
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
|
||||
|
@ -93,16 +96,19 @@ match_and_rewrite(Topic, [{rewrite, Filter, MP, Dest} | Rules], Binds) ->
|
|||
end.
|
||||
|
||||
rewrite(Topic, MP, Dest, Binds) ->
|
||||
case re:run(Topic, MP, [{capture, all_but_first, list}]) of
|
||||
{match, Captured} ->
|
||||
Vars = lists:zip(["\\$" ++ integer_to_list(I)
|
||||
|| I <- lists:seq(1, length(Captured))], Captured),
|
||||
iolist_to_binary(lists:foldl(
|
||||
fun({Var, Val}, Acc) ->
|
||||
re:replace(Acc, Var, Val, [global])
|
||||
end, Dest, Binds ++ Vars));
|
||||
nomatch -> Topic
|
||||
end.
|
||||
NewTopic =
|
||||
case re:run(Topic, MP, [{capture, all_but_first, list}]) of
|
||||
{match, Captured} ->
|
||||
Vars = lists:zip(["\\$" ++ integer_to_list(I)
|
||||
|| I <- lists:seq(1, length(Captured))], Captured),
|
||||
iolist_to_binary(lists:foldl(
|
||||
fun({Var, Val}, Acc) ->
|
||||
re:replace(Acc, Var, Val, [global])
|
||||
end, Dest, Binds ++ Vars));
|
||||
nomatch -> Topic
|
||||
end,
|
||||
?LOG(debug, "[Rewrite] topic ~0p, params: ~0p dest topic: ~p", [Topic, Binds, NewTopic]),
|
||||
NewTopic.
|
||||
|
||||
fill_client_binds(#{clientid := ClientId, username := Username}) ->
|
||||
filter_client_binds([{"%c", ClientId}, {"%u", Username}]);
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_modules,
|
||||
[{description, "EMQ X Module Management"},
|
||||
{vsn, "4.3.7"},
|
||||
{vsn, "4.3.8"},
|
||||
{modules, []},
|
||||
{applications, [kernel,stdlib]},
|
||||
{mod, {emqx_modules_app, []}},
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
%% -*- mode: erlang -*-
|
||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{"4.3.6",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||
[{"4.3.7",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.6",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.5",
|
||||
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||
|
@ -31,7 +32,8 @@
|
|||
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.3.6",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||
[{"4.3.7",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.6",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.5",
|
||||
[{load_module,emqx_modules,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]},
|
||||
|
|
|
@ -46,12 +46,12 @@
|
|||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
|
||||
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.6"}}}
|
||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.10"}}}
|
||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.11"}}}
|
||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.2"}}}
|
||||
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}}
|
||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}}
|
||||
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
|
||||
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}}
|
||||
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.4"}}}
|
||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
|
||||
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3.1"}}}
|
||||
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}}
|
||||
|
|
|
@ -344,6 +344,7 @@ relx_overlay(ReleaseType) ->
|
|||
, {template, "data/emqx_vars", "releases/emqx_vars"}
|
||||
, {copy, "bin/emqx", "bin/emqx"}
|
||||
, {copy, "bin/emqx_ctl", "bin/emqx_ctl"}
|
||||
, {copy, "bin/emqx_cluster_rescue", "bin/emqx_cluster_rescue"}
|
||||
, {copy, "bin/node_dump", "bin/node_dump"}
|
||||
, {copy, "bin/install_upgrade.escript", "bin/install_upgrade.escript"}
|
||||
, {copy, "bin/emqx", "bin/emqx-{{release_version}}"} %% for relup
|
||||
|
|
|
@ -8,8 +8,8 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
|
|||
PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh)}"
|
||||
case "${PKG_VSN}" in
|
||||
4.3*)
|
||||
EMQX_CE_DASHBOARD_VERSION='v4.3.8'
|
||||
EMQX_EE_DASHBOARD_VERSION='v4.3.21'
|
||||
EMQX_CE_DASHBOARD_VERSION='v4.3.9'
|
||||
EMQX_EE_DASHBOARD_VERSION='v4.3.22'
|
||||
;;
|
||||
*)
|
||||
echo "Unsupported version $PKG_VSN" >&2
|
||||
|
|
|
@ -14,15 +14,15 @@ fi
|
|||
|
||||
case $PROFILE in
|
||||
"emqx")
|
||||
DIR='broker'
|
||||
DIR='emqx-ce'
|
||||
EDITION='community'
|
||||
;;
|
||||
"emqx-ee")
|
||||
DIR='enterprise'
|
||||
DIR='emqx-ee'
|
||||
EDITION='enterprise'
|
||||
;;
|
||||
"emqx-edge")
|
||||
DIR='edge'
|
||||
DIR='emqx-edge'
|
||||
EDITION='edge'
|
||||
;;
|
||||
esac
|
||||
|
@ -55,7 +55,7 @@ pushd _upgrade_base
|
|||
|
||||
for tag in $(../scripts/relup-base-vsns.sh $EDITION | xargs echo -n); do
|
||||
filename="$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip"
|
||||
url="https://www.emqx.com/downloads/$DIR/${tag#[e|v]}/$filename"
|
||||
url="https://packages.emqx.io/$DIR/$tag/$filename"
|
||||
echo "downloading base package from ${url} ..."
|
||||
if [ ! -f "$filename" ] && curl -L -I -m 10 -o /dev/null -s -w "%{http_code}" "${url}" | grep -q -oE "^[23]+" ; then
|
||||
curl -L -o "${filename}" "${url}"
|
||||
|
|
|
@ -3,11 +3,12 @@
|
|||
{VSN,
|
||||
[{"4.3.17",
|
||||
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_control,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.16",
|
||||
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||
|
@ -34,7 +35,7 @@
|
|||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
|
@ -63,7 +64,7 @@
|
|||
{load_module,emqx_sys,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||
|
@ -86,7 +87,7 @@
|
|||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
|
@ -119,7 +120,7 @@
|
|||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
|
@ -158,7 +159,7 @@
|
|||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||
|
@ -198,7 +199,7 @@
|
|||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||
|
@ -238,7 +239,7 @@
|
|||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||
|
@ -282,7 +283,7 @@
|
|||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||
|
@ -325,7 +326,7 @@
|
|||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||
|
@ -369,7 +370,7 @@
|
|||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||
|
@ -412,7 +413,7 @@
|
|||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||
|
@ -476,7 +477,7 @@
|
|||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||
|
@ -518,7 +519,7 @@
|
|||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
|
@ -558,7 +559,7 @@
|
|||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
|
@ -599,7 +600,7 @@
|
|||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
|
@ -648,7 +649,7 @@
|
|||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
|
@ -675,11 +676,12 @@
|
|||
{<<".*">>,[]}],
|
||||
[{"4.3.17",
|
||||
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_control,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.16",
|
||||
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||
|
@ -708,7 +710,7 @@
|
|||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
|
@ -738,7 +740,7 @@
|
|||
{load_module,emqx_sys,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
|
||||
|
@ -758,7 +760,7 @@
|
|||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
|
@ -789,7 +791,7 @@
|
|||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
|
@ -826,7 +828,7 @@
|
|||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||
|
@ -864,7 +866,7 @@
|
|||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||
|
@ -902,7 +904,7 @@
|
|||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||
|
@ -944,7 +946,7 @@
|
|||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||
|
@ -984,7 +986,7 @@
|
|||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||
|
@ -1026,7 +1028,7 @@
|
|||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||
|
@ -1067,7 +1069,7 @@
|
|||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||
|
@ -1129,7 +1131,7 @@
|
|||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
|
@ -1169,7 +1171,7 @@
|
|||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
|
@ -1205,7 +1207,7 @@
|
|||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
|
@ -1244,7 +1246,7 @@
|
|||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
|
@ -1286,7 +1288,7 @@
|
|||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_broker_sup,brutal_purge,soft_purge,[]},
|
||||
{update,emqx_broker_sup,supervisor},
|
||||
{load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -164,27 +164,24 @@ ack_enabled() ->
|
|||
|
||||
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
|
||||
%% Deadlock otherwise
|
||||
SubPid ! {deliver, Topic, Msg},
|
||||
ok;
|
||||
send(SubPid, Topic, {deliver, Topic, Msg});
|
||||
%% return either 'ok' (when everything is fine) or 'error'
|
||||
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
|
||||
%% For QoS 0 message, send it as regular dispatch
|
||||
SubPid ! {deliver, Topic, Msg},
|
||||
ok;
|
||||
send(SubPid, Topic, {deliver, Topic, Msg});
|
||||
do_dispatch(SubPid, Group, Topic, Msg, Type) ->
|
||||
case ack_enabled() of
|
||||
true ->
|
||||
dispatch_with_ack(SubPid, Group, Topic, Msg, Type);
|
||||
false ->
|
||||
SubPid ! {deliver, Topic, Msg},
|
||||
ok
|
||||
send(SubPid, Topic, {deliver, Topic, Msg})
|
||||
end.
|
||||
|
||||
dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
|
||||
%% For QoS 1/2 message, expect an ack
|
||||
Ref = erlang:monitor(process, SubPid),
|
||||
Sender = self(),
|
||||
SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)},
|
||||
send(SubPid, Topic, {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)}),
|
||||
Timeout = case Msg#message.qos of
|
||||
?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
|
||||
?QOS_2 -> infinity
|
||||
|
@ -210,6 +207,15 @@ dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
|
|||
_ = erlang:demonitor(Ref, [flush])
|
||||
end.
|
||||
|
||||
send(Pid, Topic, Msg) ->
|
||||
Node = node(Pid),
|
||||
if Node =:= node() ->
|
||||
Pid ! Msg;
|
||||
true ->
|
||||
emqx_rpc:cast(Topic, Node, erlang, send, [Pid, Msg])
|
||||
end,
|
||||
ok.
|
||||
|
||||
with_group_ack(Msg, Group, Type, Sender, Ref) ->
|
||||
emqx_message:set_headers(#{shared_dispatch_ack => {Sender, old_ref(Type, Group, Ref)}}, Msg).
|
||||
|
||||
|
|
Loading…
Reference in New Issue