diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index a4cac8965..ecca0dc8b 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -19,6 +19,10 @@ File format: - Fixed crash when shared persistent subscription [#8441] +### Enhancements +- HTTP API(GET /rules/) support for pagination and fuzzy filtering. [#8450] +- Add check_conf cli to check config format. [#8486] + ## v4.3.16 ### Enhancements diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index aa3aeb3af..3f420780f 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -122,7 +122,7 @@ format({_Subscriber, Topic, Options = #{share := Group}}) -> #{node => node(), topic => filename:join([<<"$share">>, Group, Topic]), clientid => maps:get(subid, Options), qos => QoS}; format({_Subscriber, Topic, Options}) -> QoS = maps:get(qos, Options), - #{node => node(), topic => Topic, clientid => maps:get(subid, Options), qos => QoS}. + #{node => node(), topic => Topic, clientid => maps:get(subid, Options, ""), qos => QoS}. %%-------------------------------------------------------------------- %% Query Function diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index be070305b..fd764bae2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -2,14 +2,16 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.3.11", - [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -162,14 +164,16 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.11", - [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 6587f9c5f..223c672e6 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -184,6 +184,17 @@ ]). -export([list_events/2]). +-export([query/3]). + +-define(RULE_QS_SCHEMA, {?RULE_TAB, + [ + {<<"enabled">>, atom}, + {<<"for">>, binary}, + {<<"_like_id">>, binary}, + {<<"_like_for">>, binary}, + {<<"_match_for">>, binary}, + {<<"_like_description">>, binary} + ]}). -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~s Not Found", [(ID)]))). -define(ERR_NO_ACTION(NAME), list_to_binary(io_lib:format("Action ~s Not Found", [(NAME)]))). @@ -261,8 +272,14 @@ update_rule(#{id := Id}, Params) -> return({error, 400, ?ERR_BADARGS(Reason)}) end. -list_rules(_Bindings, _Params) -> - return_all(emqx_rule_registry:get_rules_ordered_by_ts()). +list_rules(_Bindings, Params) -> + case proplists:get_value(<<"enable_paging">>, Params, true) of + true -> + SortFun = fun(#{created_at := C1}, #{created_at := C2}) -> C1 > C2 end, + return({ok, emqx_mgmt_api:node_query(node(), Params, ?RULE_QS_SCHEMA, {?MODULE, query}, SortFun)}); + false -> + return_all(emqx_rule_registry:get_rules_ordered_by_ts()) + end. show_rule(#{id := Id}, _Params) -> reply_with(fun emqx_rule_registry:get_rule/1, Id). @@ -454,6 +471,7 @@ record_to_map(#rule{id = Id, actions = Actions, on_action_failed = OnFailed, enabled = Enabled, + created_at = CreatedAt, description = Descr}) -> #{id => Id, for => Hook, @@ -462,6 +480,7 @@ record_to_map(#rule{id = Id, on_action_failed => OnFailed, metrics => get_rule_metrics(Id), enabled => Enabled, + created_at => CreatedAt, description => Descr }; @@ -599,3 +618,41 @@ get_action_metrics(Id) -> Res -> [maps:put(node, Node, Res)] end || Node <- ekka_mnesia:running_nodes()]). + +query({Qs, []}, Start, Limit) -> + Ms = qs2ms(Qs), + emqx_mgmt_api:select_table(?RULE_TAB, Ms, Start, Limit, fun record_to_map/1); + +query({Qs, Fuzzy}, Start, Limit) -> + Ms = qs2ms(Qs), + MatchFun = match_fun(Ms, Fuzzy), + emqx_mgmt_api:traverse_table(?RULE_TAB, MatchFun, Start, Limit, fun record_to_map/1). + +qs2ms(Qs) -> + Init = #rule{for = '_', enabled = '_', _ = '_'}, + MatchHead = lists:foldl(fun(Q, Acc) -> match_ms(Q, Acc) end, Init, Qs), + [{MatchHead, [], ['$_']}]. + +match_ms({for, '=:=', Value}, MatchHead) -> MatchHead#rule{for = Value}; +match_ms({enabled, '=:=', Value}, MatchHead) -> MatchHead#rule{enabled = Value}; +match_ms(_, MatchHead) -> MatchHead. + +match_fun(Ms, Fuzzy) -> + MsC = ets:match_spec_compile(Ms), + fun(Rows) -> + Ls = ets:match_spec_run(Rows, MsC), + lists:filter(fun(E) -> run_fuzzy_match(E, Fuzzy) end, Ls) + end. + +run_fuzzy_match(_, []) -> true; +run_fuzzy_match(E = #rule{id = Id}, [{id, like, Pattern}|Fuzzy]) -> + binary:match(Id, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy); +run_fuzzy_match(E = #rule{description = Desc}, [{description, like, Pattern}|Fuzzy]) -> + binary:match(Desc, Pattern) /= nomatch andalso run_fuzzy_match(E, Fuzzy); +run_fuzzy_match(E = #rule{for = Topics}, [{for, match, Pattern}|Fuzzy]) -> + lists:any(fun(For) -> emqx_topic:match(For, Pattern) end, Topics) + andalso run_fuzzy_match(E, Fuzzy); +run_fuzzy_match(E = #rule{for = Topics}, [{for, like, Pattern}|Fuzzy]) -> + lists:any(fun(For) -> binary:match(For, Pattern) /= nomatch end, Topics) + andalso run_fuzzy_match(E, Fuzzy); +run_fuzzy_match(_E, [{_Key, like, _SubStr}| _Fuzzy]) -> false. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 663eddd5a..9031c3df8 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -62,7 +62,8 @@ groups() -> t_show_action_api, t_crud_resources_api, t_list_resource_types_api, - t_show_resource_type_api + t_show_resource_type_api, + t_list_rule_api ]}, {cli, [], [t_rules_cli, @@ -513,6 +514,74 @@ t_crud_rule_api(_Config) -> ?assertMatch({ok, #{code := 404, message := _Message}}, NotFound), ok. +t_list_rule_api(_Config) -> + AddIds = + lists:map(fun(Seq) -> + SeqBin = integer_to_binary(Seq), + {ok, #{code := 0, data := #{id := Id}}} = + emqx_rule_engine_api:create_rule(#{}, + [{<<"name">>, <<"debug-rule-", SeqBin/binary>>}, + {<<"rawsql">>, <<"select * from \"t/a/", SeqBin/binary, "\"">>}, + {<<"actions">>, [[{<<"name">>,<<"inspect">>}, {<<"params">>,[{<<"arg1">>,1}]}]]}, + {<<"description">>, <<"debug rule desc ", SeqBin/binary>>}]), + Id + end, lists:seq(1, 20)), + + {ok, #{code := 0, data := Rules11}} = emqx_rule_engine_api:list_rules(#{}, + [{<<"_limit">>,<<"10">>}, {<<"_page">>, <<"1">>}, {<<"enable_paging">>, true}]), + ?assertEqual(10, length(Rules11)), + {ok, #{code := 0, data := Rules12}} = emqx_rule_engine_api:list_rules(#{}, + [{<<"_limit">>,<<"10">>}, {<<"_page">>, <<"2">>}, {<<"enable_paging">>, true}]), + ?assertEqual(10, length(Rules12)), + Rules1 = Rules11 ++ Rules12, + + [RuleID | _] = AddIds, + {ok, #{code := 0}} = emqx_rule_engine_api:update_rule(#{id => RuleID}, + [{<<"enabled">>, false}]), + Params1 = [{<<"enabled">>,<<"true">>}, {<<"enable_paging">>, true}], + {ok, #{code := 0, data := Rules2}} = emqx_rule_engine_api:list_rules(#{}, Params1), + ?assert(lists:all(fun(#{id := ID}) -> ID =/= RuleID end, Rules2)), + + Params2 = [{<<"for">>, RuleID}, {<<"enable_paging">>, true}], + {ok, #{code := 0, data := Rules3}} = emqx_rule_engine_api:list_rules(#{}, Params2), + ?assert(lists:all(fun(#{id := ID}) -> ID =:= RuleID end, Rules3)), + + Params3 = [{<<"_like_id">>,<<"rule:">>}, {<<"enable_paging">>, true}], + {ok, #{code := 0, data := Rules4}} = emqx_rule_engine_api:list_rules(#{}, Params3), + ?assertEqual(length(Rules1), length(Rules4)), + + Params4 = [{<<"_like_for">>,<<"t/a/">>}, {<<"enable_paging">>, true}], + {ok, #{code := 0, data := Rules5}} = emqx_rule_engine_api:list_rules(#{}, Params4), + ?assertEqual(length(Rules1), length(Rules5)), + {ok, #{code := 0}} = emqx_rule_engine_api:update_rule(#{id => RuleID}, + [{<<"rawsql">>, <<"select * from \"t/b/c\"">>}]), + {ok, #{code := 0, data := Rules6}} = emqx_rule_engine_api:list_rules(#{}, Params4), + ?assert(lists:all(fun(#{id := ID}) -> ID =/= RuleID end, Rules6)), + ?assertEqual(1, length(Rules1) - length(Rules6)), + + Params5 = [{<<"_match_for">>,<<"t/+/+">>}, {<<"enable_paging">>, true}], + {ok, #{code := 0, data := Rules7}} = emqx_rule_engine_api:list_rules(#{}, Params5), + ?assertEqual(length(Rules1), length(Rules7)), + {ok, #{code := 0}} = emqx_rule_engine_api:update_rule(#{id => RuleID}, + [{<<"rawsql">>, <<"select * from \"t1/b\"">>}]), + {ok, #{code := 0, data := Rules8}} = emqx_rule_engine_api:list_rules(#{}, Params5), + ?assert(lists:all(fun(#{id := ID}) -> ID =/= RuleID end, Rules8)), + ?assertEqual(1, length(Rules1) - length(Rules8)), + + Params6 = [{<<"_like_description">>,<<"rule">>}, {<<"enable_paging">>, true}], + {ok, #{code := 0, data := Rules9}} = emqx_rule_engine_api:list_rules(#{}, Params6), + ?assertEqual(length(Rules1), length(Rules9)), + {ok, #{code := 0}} = emqx_rule_engine_api:update_rule(#{id => RuleID}, + [{<<"description">>, <<"not me">>}]), + {ok, #{code := 0, data := Rules10}} = emqx_rule_engine_api:list_rules(#{}, Params6), + ?assert(lists:all(fun(#{id := ID}) -> ID =/= RuleID end, Rules10)), + ?assertEqual(1, length(Rules1) - length(Rules10)), + + lists:foreach(fun(ID) -> + ?assertMatch({ok, #{code := 0}}, emqx_rule_engine_api:delete_rule(#{id => ID}, [])) + end, AddIds), + ok. + t_list_actions_api(_Config) -> {ok, #{code := 0, data := Actions}} = emqx_rule_engine_api:list_actions(#{}, []), %ct:pal("RList : ~p", [Actions]), diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index 67826fc9c..00094d46d 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -72,7 +72,7 @@ enable_pipelining => #{order => 5, type => boolean, default => true, - title => #{en => <<"Enable Pipelining">>, zh => <<"Enable Pipelining"/utf8>>}, + title => #{en => <<"Enable Pipelining">>, zh => <<"开启 Pipelining"/utf8>>}, description => #{en => <<"Whether to enable HTTP Pipelining">>, zh => <<"是否开启 HTTP Pipelining"/utf8>>} }, diff --git a/bin/emqx b/bin/emqx index 7eed9ab3c..503e7059e 100755 --- a/bin/emqx +++ b/bin/emqx @@ -8,6 +8,11 @@ RUNNER_ROOT_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")"/..; pwd -P)" # shellcheck disable=SC1090 . "$RUNNER_ROOT_DIR"/releases/emqx_vars +EMQX_LICENSE_CONF='' +REL_NAME="emqx" +ERTS_PATH="$RUNNER_ROOT_DIR/erts-$ERTS_VSN/bin" +export EMQX_DESCRIPTION + RUNNER_SCRIPT="$RUNNER_BIN_DIR/$REL_NAME" CODE_LOADING_MODE="${CODE_LOADING_MODE:-embedded}" REL_DIR="$RUNNER_ROOT_DIR/releases/$REL_VSN" @@ -192,6 +197,7 @@ usage() { echo " Up/Down-grade: upgrade | downgrade | install | uninstall" echo " Install info: ertspath | root_dir | versions" echo " Runtime info: pid | ping | versions" + echo " Config check: check_conf" echo " Advanced: console_clean | escript | rpc | rpcterms | eval" echo '' echo "Execute '$REL_NAME COMMAND help' for more information" @@ -334,9 +340,12 @@ trim() { # Function to generate app.config and vm.args generate_config() { - ## Delete the *.siz files first or it cann't start after - ## changing the config 'log.rotation.size' - rm -rf "${RUNNER_LOG_DIR}"/*.siz + check_only="$1" + if [ "$check_only" != "check_only" ]; then + ## Delete the *.siz files first or it cann't start after + ## changing the config 'log.rotation.size' + rm -rf "${RUNNER_LOG_DIR}"/*.siz + fi set +e if [ "${EMQX_LICENSE_CONF:-}" = "" ]; then @@ -388,12 +397,19 @@ generate_config() { fi fi done - mv -f "$TMP_ARG_FILE" "$CUTTLE_GEN_ARG_FILE" if ! relx_nodetool chkconfig -config "$CONFIG_FILE"; then echoerr "Error reading $CONFIG_FILE" exit 1 fi + + if [ "$check_only" = "check_only" ]; then + rm -f "$TMP_ARG_FILE" + rm -f "$CUTTLE_GEN_ARG_FILE" + rm -f "$CONFIG_FILE" + else + mv -f "$TMP_ARG_FILE" "$CUTTLE_GEN_ARG_FILE" + fi } # Call bootstrapd for daemon commands like start/stop/console @@ -447,6 +463,9 @@ case "$1" in foreground) IS_BOOT_COMMAND='yes' ;; + check_conf) + IS_BOOT_COMMAND='yes' + ;; esac @@ -815,6 +834,10 @@ case "$1" in ertspath) echo "$ERTS_PATH" ;; + check_conf) + generate_config "check_only" + echo "$RUNNER_ETC_DIR/emqx.conf is ok" + ;; ctl) assert_node_alive diff --git a/bin/emqx_cluster_rescue b/bin/emqx_cluster_rescue new file mode 100755 index 000000000..f399dbbee --- /dev/null +++ b/bin/emqx_cluster_rescue @@ -0,0 +1,187 @@ +#!/usr/bin/env bash +set -euo pipefail +# ================================== +# RESCUE THE UNBOOTABLE EMQX CLUSTER +# ================================== + +## Global Vars +# Steal from emqx_ctl +THIS_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")" || true; pwd -P)" + +usage() { + local Script + Script=$(basename "$0") + + echo " + RESCUE THE UNBOOTABLE EMQX CLUSTER + + Use this script only when the entire cluster is stuck at booting & loading. + + This script provides a list of methods to *hack* the DB of EMQX to bring back + the cluster back to service but MAY come with some side effects including: + + - Data loss + - Inconsistent data in the cluster + - Other undefined behaviors + + *DO NOT* use this script unless you understand the consequences. + *DO NOT* use this script when EMQX cluster is partitioned. + + Use Case: + + - Lost one node due to unrecoverable failures (hardware, cloud resource outage) + and this node prevents other nodes in the cluster from starting. + +Usage: + + # For troubleshooting, find out all the tables that are pending at loading + $Script pending-tables + + # For troubleshooting, debug print detailed table info that is pending at loading. + $Script table-details + + # Force load one [Tab] or all pending tables from node local storage to bring this node up + # Use local data as the data source for the pending tables, should bring up the node immediately and + # spread the data to other nodes in the cluster. + # + # * Take effect immediately + # * This is a node local change but the change will be lost after restart. + $Script force-load [Tab] + + # Remove Node from mnesia cluster. + # Most likely will fail if the remote Node is unreachable. + # + # * This is a cluster wide schema change. + $Script remove-node Node + + # Set master node for distributed DB + # The master node will be the data source for pending tables. + # + # * This is a node local change + # * Node could be a remote Erlang node in the cluster or local erlang node + # * Use command: 'unset-master' to rollback + $Script set-master Node + + # Unset master node for distributed DB, this is a node local change + $Script unset-master + + # Cheat the local node that RemoteNode is down so that it will not wait for it to come up. + # Local node will take local data as the data source for pending tables and spread the data + # to the other pending nodes. + # + # * Check EMQX logs to find out which remote node(s) the local node is waiting for + # * To take effect, restart this EMQX node + # * This is a node local setting + + $Script lie-node-down RemoteNode + +Tips: + - Override local node name with envvar: \$EMQX_NODE_NAME + " +} + +# Functions +# +print_pending_tables() { + local erl_cmd='[ io:format("~p :: ~p~n", [T, maps:with([all_nodes, load_order, storage_type, + active_replicas, local_content, load_by_force, + load_node, load_reason, master_nodes] + , maps:from_list(mnesia:table_info(T, all)))]) + || T <- mnesia:system_info(local_tables), unknown =:= mnesia:table_info(T, load_node) ], + ok + ' + exec "$THIS_DIR/emqx" eval "$erl_cmd" +} + +print_details_per_table() { + local erl_cmd='[ io:format("~p :: ~p~n", [T, mnesia:table_info(T, all)]) + || T <- mnesia:system_info(local_tables), unknown =:= mnesia:table_info(T, load_node) ], + ok + ' + exec "$THIS_DIR/emqx" eval "$erl_cmd" +} + +force-load() { + if [ $# -eq 1 ]; then + local erl_cmd="mnesia:force_load_table(${1})" + else + local erl_cmd='[ {T, mnesia:force_load_table(T)} + || T <- mnesia:system_info(local_tables), + unknown =:= mnesia:table_info(T, load_node) + ] + ' + fi + exec "$THIS_DIR/emqx" eval "$erl_cmd" +} + +remove-node() { + local target_node=$1 + local erl_cmd=" + case [T || T <- mnesia:system_info(local_tables), unknown =:= mnesia:table_info(T, load_node)] of + [] -> + io:format(\"No table need to load\\n\"), + skipped; + TargetTables -> + io:format(\"Going to remove node ${target_node} from schema of the tables:~n~p~n\", [TargetTables]), + case io:read(\"confirm? [yes.] OR Ctrl-D to skip: \") of + {ok, yes} -> + lists:map(fun(T) -> + mnesia:force_load_table(T), + {T, mnesia:del_table_copy(T, '${target_node}') } + end, TargetTables); + eof -> skipped; + R -> {skipped, R} + end + end + " + exec "$THIS_DIR/emqx" eval "$erl_cmd" +} + +set-master-node() { + if [ $# -eq 1 ]; then + local erl_cmd="mnesia:set_master_nodes(['${1}']), mnesia_recover:dump_decision_tab()" + else + local erl_cmd="mnesia:set_master_nodes([]), mnesia_recover:dump_decision_tab()" + fi + + exec "$THIS_DIR/emqx" eval "$erl_cmd" +} + +lie-node-down() { + if [ $# -eq 1 ]; then + local erl_cmd="mnesia_recover:log_mnesia_down('${1}'), mnesia_recover:dump_decision_tab()" + exec "$THIS_DIR/emqx" eval "$erl_cmd" + else + usage + fi +} + + +CMD=${1:-usage} +[ $# -gt 0 ] && shift 1 + +case "$CMD" in + force-load) + force-load "$@" + ;; + remove-node) + remove-node "$@" + ;; + pending-tables) + print_pending_tables + ;; + table-details) + print_details_per_table + ;; + set-master) + set-master-node "$@" + ;; + unset-master) + set-master-node + ;; + lie-node-down) + lie-node-down "$@" + ;; + *) + usage +esac diff --git a/data/emqx_vars b/data/emqx_vars index a872da03a..8ca6bf22d 100644 --- a/data/emqx_vars +++ b/data/emqx_vars @@ -12,12 +12,10 @@ RUNNER_LIB_DIR="{{ runner_lib_dir }}" RUNNER_ETC_DIR="{{ runner_etc_dir }}" RUNNER_DATA_DIR="{{ runner_data_dir }}" RUNNER_USER="{{ runner_user }}" +EMQX_DESCRIPTION='{{ emqx_description }}' -EMQX_LICENSE_CONF='' -export EMQX_DESCRIPTION='{{ emqx_description }}' +## Warning: DO NOT create new variables using the above vars in this file, +## as the vars above can be overwritten by the relup scripts later, like: +## REL_VSN="new_version" -## computed vars -REL_NAME="emqx" -ERTS_PATH="$RUNNER_ROOT_DIR/erts-$ERTS_VSN/bin" - -## updated vars here +## overwritten vars here diff --git a/etc/emqx.conf b/etc/emqx.conf index 94d5cbecb..098c908d0 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1472,7 +1472,7 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## are used during server authentication and when building the client certificate chain. ## ## Value: File -## listener.ssl.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem +listener.ssl.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem ## The Ephemeral Diffie-Helman key exchange is a very effective way of ## ensuring Forward Secrecy by exchanging a set of keys that never hit diff --git a/include/emqx_release.hrl b/include/emqx_release.hrl index 66710d35e..9506f544e 100644 --- a/include/emqx_release.hrl +++ b/include/emqx_release.hrl @@ -29,7 +29,7 @@ -ifndef(EMQX_ENTERPRISE). --define(EMQX_RELEASE, {opensource, "4.3.16"}). +-define(EMQX_RELEASE, {opensource, "4.3.17-beta.1"}). -else. diff --git a/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl b/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl index 73f4be5be..c8bd3a967 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_rewrite.erl @@ -20,6 +20,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/logger.hrl"). -ifdef(TEST). -export([ compile/1 @@ -45,6 +46,7 @@ load(RawRules) -> {PubRules, SubRules} = compile(RawRules), + ?LOG(info, "[Rewrite] Load rule pub ~0p sub ~0p", [PubRules, SubRules]), emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, 1000), emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, 1000), emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, 1000). @@ -62,6 +64,7 @@ rewrite_publish(Message = #message{topic = Topic}, Rules) -> {ok, Message#message{topic = match_and_rewrite(Topic, Rules, Binds)}}. unload(_) -> + ?LOG(info, "[Rewrite] Unload"), emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}), emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}). @@ -93,16 +96,19 @@ match_and_rewrite(Topic, [{rewrite, Filter, MP, Dest} | Rules], Binds) -> end. rewrite(Topic, MP, Dest, Binds) -> - case re:run(Topic, MP, [{capture, all_but_first, list}]) of - {match, Captured} -> - Vars = lists:zip(["\\$" ++ integer_to_list(I) - || I <- lists:seq(1, length(Captured))], Captured), - iolist_to_binary(lists:foldl( - fun({Var, Val}, Acc) -> - re:replace(Acc, Var, Val, [global]) - end, Dest, Binds ++ Vars)); - nomatch -> Topic - end. + NewTopic = + case re:run(Topic, MP, [{capture, all_but_first, list}]) of + {match, Captured} -> + Vars = lists:zip(["\\$" ++ integer_to_list(I) + || I <- lists:seq(1, length(Captured))], Captured), + iolist_to_binary(lists:foldl( + fun({Var, Val}, Acc) -> + re:replace(Acc, Var, Val, [global]) + end, Dest, Binds ++ Vars)); + nomatch -> Topic + end, + ?LOG(debug, "[Rewrite] topic ~0p, params: ~0p dest topic: ~p", [Topic, Binds, NewTopic]), + NewTopic. fill_client_binds(#{clientid := ClientId, username := Username}) -> filter_client_binds([{"%c", ClientId}, {"%u", Username}]); diff --git a/lib-ce/emqx_modules/src/emqx_modules.app.src b/lib-ce/emqx_modules/src/emqx_modules.app.src index 49af5d3ea..9db13dbc8 100644 --- a/lib-ce/emqx_modules/src/emqx_modules.app.src +++ b/lib-ce/emqx_modules/src/emqx_modules.app.src @@ -1,6 +1,6 @@ {application, emqx_modules, [{description, "EMQ X Module Management"}, - {vsn, "4.3.7"}, + {vsn, "4.3.8"}, {modules, []}, {applications, [kernel,stdlib]}, {mod, {emqx_modules_app, []}}, diff --git a/lib-ce/emqx_modules/src/emqx_modules.appup.src b/lib-ce/emqx_modules/src/emqx_modules.appup.src index d32cc7286..5b8fb4434 100644 --- a/lib-ce/emqx_modules/src/emqx_modules.appup.src +++ b/lib-ce/emqx_modules/src/emqx_modules.appup.src @@ -1,7 +1,8 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.6",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]}, + [{"4.3.7",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]}, + {"4.3.6",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]}, {"4.3.5", [{load_module,emqx_modules,brutal_purge,soft_purge,[]}, {load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]}, @@ -31,7 +32,8 @@ {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.6",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]}, + [{"4.3.7",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]}, + {"4.3.6",[{load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]}, {"4.3.5", [{load_module,emqx_modules,brutal_purge,soft_purge,[]}, {load_module,emqx_mod_rewrite,brutal_purge,soft_purge,[]}]}, diff --git a/rebar.config b/rebar.config index 7c5e01ff9..cf46d87c9 100644 --- a/rebar.config +++ b/rebar.config @@ -46,12 +46,12 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.6"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.10"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.11"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.2"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}} - , {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}} + , {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.4"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3.1"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}} diff --git a/rebar.config.erl b/rebar.config.erl index c94f80c18..91f5f8058 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -344,6 +344,7 @@ relx_overlay(ReleaseType) -> , {template, "data/emqx_vars", "releases/emqx_vars"} , {copy, "bin/emqx", "bin/emqx"} , {copy, "bin/emqx_ctl", "bin/emqx_ctl"} + , {copy, "bin/emqx_cluster_rescue", "bin/emqx_cluster_rescue"} , {copy, "bin/node_dump", "bin/node_dump"} , {copy, "bin/install_upgrade.escript", "bin/install_upgrade.escript"} , {copy, "bin/emqx", "bin/emqx-{{release_version}}"} %% for relup diff --git a/scripts/get-dashboard.sh b/scripts/get-dashboard.sh index 10c393fb4..98b09921e 100755 --- a/scripts/get-dashboard.sh +++ b/scripts/get-dashboard.sh @@ -8,8 +8,8 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.." PKG_VSN="${PKG_VSN:-$(./pkg-vsn.sh)}" case "${PKG_VSN}" in 4.3*) - EMQX_CE_DASHBOARD_VERSION='v4.3.8' - EMQX_EE_DASHBOARD_VERSION='v4.3.21' + EMQX_CE_DASHBOARD_VERSION='v4.3.9' + EMQX_EE_DASHBOARD_VERSION='v4.3.22' ;; *) echo "Unsupported version $PKG_VSN" >&2 diff --git a/scripts/relup-base-packages.sh b/scripts/relup-base-packages.sh index 1c99f0c9c..01d2fa96b 100755 --- a/scripts/relup-base-packages.sh +++ b/scripts/relup-base-packages.sh @@ -14,15 +14,15 @@ fi case $PROFILE in "emqx") - DIR='broker' + DIR='emqx-ce' EDITION='community' ;; "emqx-ee") - DIR='enterprise' + DIR='emqx-ee' EDITION='enterprise' ;; "emqx-edge") - DIR='edge' + DIR='emqx-edge' EDITION='edge' ;; esac @@ -55,7 +55,7 @@ pushd _upgrade_base for tag in $(../scripts/relup-base-vsns.sh $EDITION | xargs echo -n); do filename="$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip" - url="https://www.emqx.com/downloads/$DIR/${tag#[e|v]}/$filename" + url="https://packages.emqx.io/$DIR/$tag/$filename" echo "downloading base package from ${url} ..." if [ ! -f "$filename" ] && curl -L -I -m 10 -o /dev/null -s -w "%{http_code}" "${url}" | grep -q -oE "^[23]+" ; then curl -L -o "${filename}" "${url}" diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 090f7c0b4..bad08dba2 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -3,11 +3,12 @@ {VSN, [{"4.3.17", [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}]}, {"4.3.16", [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, @@ -34,7 +35,7 @@ {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, @@ -63,7 +64,7 @@ {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, @@ -86,7 +87,7 @@ {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, @@ -119,7 +120,7 @@ {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, @@ -158,7 +159,7 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, @@ -198,7 +199,7 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, @@ -238,7 +239,7 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, @@ -282,7 +283,7 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, @@ -325,7 +326,7 @@ {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, @@ -369,7 +370,7 @@ {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, @@ -412,7 +413,7 @@ {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, @@ -476,7 +477,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, @@ -518,7 +519,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, @@ -558,7 +559,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, @@ -599,7 +600,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, @@ -648,7 +649,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -675,11 +676,12 @@ {<<".*">>,[]}], [{"4.3.17", [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}]}, {"4.3.16", [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, @@ -708,7 +710,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, @@ -738,7 +740,7 @@ {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, @@ -758,7 +760,7 @@ {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, @@ -789,7 +791,7 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, @@ -826,7 +828,7 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, @@ -864,7 +866,7 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, @@ -902,7 +904,7 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, @@ -944,7 +946,7 @@ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, @@ -984,7 +986,7 @@ {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, @@ -1026,7 +1028,7 @@ {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, @@ -1067,7 +1069,7 @@ {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, @@ -1129,7 +1131,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, @@ -1169,7 +1171,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, @@ -1205,7 +1207,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, @@ -1244,7 +1246,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, @@ -1286,7 +1288,7 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_broker_sup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_logger_jsonfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 1e77b6014..3f67c225d 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -164,27 +164,24 @@ ack_enabled() -> do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() -> %% Deadlock otherwise - SubPid ! {deliver, Topic, Msg}, - ok; + send(SubPid, Topic, {deliver, Topic, Msg}); %% return either 'ok' (when everything is fine) or 'error' do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> %% For QoS 0 message, send it as regular dispatch - SubPid ! {deliver, Topic, Msg}, - ok; + send(SubPid, Topic, {deliver, Topic, Msg}); do_dispatch(SubPid, Group, Topic, Msg, Type) -> case ack_enabled() of true -> dispatch_with_ack(SubPid, Group, Topic, Msg, Type); false -> - SubPid ! {deliver, Topic, Msg}, - ok + send(SubPid, Topic, {deliver, Topic, Msg}) end. dispatch_with_ack(SubPid, Group, Topic, Msg, Type) -> %% For QoS 1/2 message, expect an ack Ref = erlang:monitor(process, SubPid), Sender = self(), - SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)}, + send(SubPid, Topic, {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)}), Timeout = case Msg#message.qos of ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); ?QOS_2 -> infinity @@ -210,6 +207,15 @@ dispatch_with_ack(SubPid, Group, Topic, Msg, Type) -> _ = erlang:demonitor(Ref, [flush]) end. +send(Pid, Topic, Msg) -> + Node = node(Pid), + if Node =:= node() -> + Pid ! Msg; + true -> + emqx_rpc:cast(Topic, Node, erlang, send, [Pid, Msg]) + end, + ok. + with_group_ack(Msg, Group, Type, Sender, Ref) -> emqx_message:set_headers(#{shared_dispatch_ack => {Sender, old_ref(Type, Group, Ref)}}, Msg).