diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 7d059ce96..a04426769 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -3,6 +3,7 @@ {VSN, [{"4.4.5", [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.4", [{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, @@ -72,6 +73,7 @@ {<<".*">>,[]}], [{"4.4.5", [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.4.4", [{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_validator.erl b/apps/emqx_rule_engine/src/emqx_rule_validator.erl index 237e5dfba..27156cb01 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_validator.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_validator.erl @@ -33,7 +33,7 @@ }. -type data_type() :: string | password | number | boolean - | object | array | file | cfgselect. + | object | array | file | binary_file | cfgselect. -type params_spec() :: #{name() => spec()} | any. -type params() :: #{binary() => term()}. @@ -46,6 +46,7 @@ , object , array , file + , binary_file , cfgselect %% TODO: [5.0] refactor this ]). @@ -84,8 +85,8 @@ validate_spec(ParamsSepc) -> %% Internal Functions %%------------------------------------------------------------------------------ -validate_value(Val, #{type := Types} = Spec) when is_list(Types) -> - validate_types(Val, Types, Spec); +validate_value(Val, #{type := Union} = Spec) when is_list(Union) -> + validate_union(Val, Union, Spec); validate_value(Val, #{enum := Enum}) -> validate_enum(Val, Enum); validate_value(Val, #{type := object} = Spec) -> @@ -93,17 +94,22 @@ validate_value(Val, #{type := object} = Spec) -> validate_value(Val, #{type := Type} = Spec) -> validate_type(Val, Type, Spec). -validate_types(Val, [], _Spec) -> - throw({invalid_data_type, Val}); -validate_types(Val, [Type | Types], Spec) -> +validate_union(Val, Union, _Spec) -> + do_validate_union(Val, Union, Union, _Spec). + +do_validate_union(Val, Union, [], _Spec) -> + throw({invalid_data_type, {union, {Val, Union}}}); +do_validate_union(Val, Union, [Type | Types], Spec) -> try validate_type(Val, Type, Spec) catch _:_ -> - validate_types(Val, Types, Spec) + do_validate_union(Val, Union, Types, Spec) end. validate_type(Val, file, _Spec) -> validate_file(Val); +validate_type(Val, binary_file, _Spec) -> + validate_binary_file(Val); validate_type(Val, String, Spec) when String =:= string; String =:= password -> validate_string(Val, reg_exp(maps:get(format, Spec, any))); @@ -118,6 +124,8 @@ validate_type(Val, cfgselect, _Spec) -> %% TODO: [5.0] refactor this. Val. +validate_enum(Val, BoolEnum) when BoolEnum == [true, false]; BoolEnum == [false, true] -> + validate_boolean(Val); validate_enum(Val, Enum) -> case lists:member(Val, Enum) of true -> Val; @@ -147,6 +155,10 @@ validate_boolean(false) -> false; validate_boolean(<<"false">>) -> false; validate_boolean(Val) -> throw({invalid_data_type, {boolean, Val}}). +validate_binary_file(#{<<"file">> := _, <<"filename">> := _} = Val) -> + Val; +validate_binary_file(Val) -> + throw({invalid_data_type, {binary_file, Val}}). validate_file(Val) when is_map(Val) -> Val; validate_file(Val) when is_list(Val) -> Val; validate_file(Val) when is_binary(Val) -> Val; @@ -163,6 +175,14 @@ do_validate_spec(Name, #{type := object} = Spec) -> fun (not_found) -> throw({required_field_missing, {schema, {in, Name}}}); (Schema) -> validate_spec(Schema) end); +do_validate_spec(Name, #{type := cfgselect} = Spec) -> + find_field(items, Spec, + fun (not_found) -> throw({required_field_missing, {items, {in, Name}}}); + (Items) -> + maps:map(fun(_K, Schema) -> + validate_spec(Schema) + end, Items) + end); do_validate_spec(Name, #{type := array} = Spec) -> find_field(items, Spec, fun (not_found) -> throw({required_field_missing, {items, {in, Name}}}); diff --git a/apps/emqx_rule_engine/test/emqx_rule_validator_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_validator_SUITE.erl index a5870ae79..8c7ff9b86 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_validator_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_validator_SUITE.erl @@ -70,6 +70,32 @@ } } }, + type_cfgselect => #{ + type => cfgselect, + enum => [<<"upload">>, <<"path">>], + default => <<"upload">>, + items => + #{ + upload => + #{ + kerberos_keytab => + #{ + order => 6, + type => binary_file, + default => #{file => <<"">>, filename => <<"no_keytab.key">>} + } + }, + path => + #{ + kerberos_keytab_path => + #{ + order => 7, + type => string, + default => <<"">> + } + } + } + }, type_array => #{ type => array, required => true, @@ -88,7 +114,7 @@ t_validate_spec_the_complex(_) -> t_validate_spec_invalid_1(_) -> ?assertThrow({required_field_missing, {type, _}}, emqx_rule_validator:validate_spec(#{ - type_enum_number => #{ + a => #{ required => true } })). @@ -96,15 +122,23 @@ t_validate_spec_invalid_1(_) -> t_validate_spec_invalid_2(_) -> ?assertThrow({required_field_missing, {schema, _}}, emqx_rule_validator:validate_spec(#{ - type_enum_number => #{ + a => #{ type => object } })). +t_validate_spec_invalid_2_1(_) -> + ?assertThrow({required_field_missing, {items, _}}, + emqx_rule_validator:validate_spec(#{ + a => #{ + type => cfgselect + } + })). + t_validate_spec_invalid_3(_) -> ?assertThrow({required_field_missing, {items, _}}, emqx_rule_validator:validate_spec(#{ - type_enum_number => #{ + a => #{ type => array } })). @@ -162,6 +196,22 @@ t_validate_params_fill_default(_) -> ?assertMatch(#{<<"abc">> := 1, <<"eee">> := <<"hello">>}, emqx_rule_validator:validate_params(Params, Specs)). +t_validate_params_binary_file(_) -> + Params = #{<<"kfile">> => #{<<"file">> => <<"foo">>, <<"filename">> => <<"foo.key">>}}, + Specs = #{<<"kfile">> => #{ + type => binary_file, + required => true + }}, + ?assertMatch(#{<<"kfile">> := #{<<"file">> := <<"foo">>, <<"filename">> := <<"foo.key">>}}, + emqx_rule_validator:validate_params(Params, Specs)), + Params1 = #{<<"kfile">> => #{<<"file">> => <<"foo">>}}, + Specs1 = #{<<"kfile">> => #{ + type => binary_file, + required => true + }}, + ?assertThrow({invalid_data_type, {binary_file, #{<<"file">> := <<"foo">>}}}, + emqx_rule_validator:validate_params(Params1, Specs1)). + t_validate_params_the_complex(_) -> Params = #{ <<"string_required">> => <<"hello">>, @@ -173,6 +223,8 @@ t_validate_params_the_complex(_) -> <<"string_required">> => <<"hello2">>, <<"type_number">> => 1.3 }, + <<"type_cfgselect">> => <<"upload">>, + <<"kerberos_keytab">> => #{<<"file">> => <<"foo">>, <<"filename">> => <<"foo.key">>}, <<"type_array">> => [<<"ok">>, <<"no">>] }, ?assertMatch( @@ -186,6 +238,8 @@ t_validate_params_the_complex(_) -> <<"string_required">> := <<"hello2">>, <<"type_number">> := 1.3 }, + <<"kerberos_keytab">> := #{<<"file">> := <<"foo">>, <<"filename">> := <<"foo.key">>}, + <<"type_cfgselect">> := <<"upload">>, <<"type_array">> := [<<"ok">>, <<"no">>] }, emqx_rule_validator:validate_params(Params, ?VALID_SPEC)). diff --git a/bin/emqx b/bin/emqx index ec3390d58..e30f08fe2 100755 --- a/bin/emqx +++ b/bin/emqx @@ -16,6 +16,7 @@ RUNNER_ROOT_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")"/..; pwd -P)" EMQX_LICENSE_CONF='' REL_NAME="emqx" ERTS_PATH="$RUNNER_ROOT_DIR/erts-$ERTS_VSN/bin" +export EMQX_DESCRIPTION RUNNER_SCRIPT="$RUNNER_BIN_DIR/$REL_NAME" CODE_LOADING_MODE="${CODE_LOADING_MODE:-embedded}" diff --git a/bin/emqx_cluster_rescue b/bin/emqx_cluster_rescue new file mode 100755 index 000000000..f399dbbee --- /dev/null +++ b/bin/emqx_cluster_rescue @@ -0,0 +1,187 @@ +#!/usr/bin/env bash +set -euo pipefail +# ================================== +# RESCUE THE UNBOOTABLE EMQX CLUSTER +# ================================== + +## Global Vars +# Steal from emqx_ctl +THIS_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")" || true; pwd -P)" + +usage() { + local Script + Script=$(basename "$0") + + echo " + RESCUE THE UNBOOTABLE EMQX CLUSTER + + Use this script only when the entire cluster is stuck at booting & loading. + + This script provides a list of methods to *hack* the DB of EMQX to bring back + the cluster back to service but MAY come with some side effects including: + + - Data loss + - Inconsistent data in the cluster + - Other undefined behaviors + + *DO NOT* use this script unless you understand the consequences. + *DO NOT* use this script when EMQX cluster is partitioned. + + Use Case: + + - Lost one node due to unrecoverable failures (hardware, cloud resource outage) + and this node prevents other nodes in the cluster from starting. + +Usage: + + # For troubleshooting, find out all the tables that are pending at loading + $Script pending-tables + + # For troubleshooting, debug print detailed table info that is pending at loading. + $Script table-details + + # Force load one [Tab] or all pending tables from node local storage to bring this node up + # Use local data as the data source for the pending tables, should bring up the node immediately and + # spread the data to other nodes in the cluster. + # + # * Take effect immediately + # * This is a node local change but the change will be lost after restart. + $Script force-load [Tab] + + # Remove Node from mnesia cluster. + # Most likely will fail if the remote Node is unreachable. + # + # * This is a cluster wide schema change. + $Script remove-node Node + + # Set master node for distributed DB + # The master node will be the data source for pending tables. + # + # * This is a node local change + # * Node could be a remote Erlang node in the cluster or local erlang node + # * Use command: 'unset-master' to rollback + $Script set-master Node + + # Unset master node for distributed DB, this is a node local change + $Script unset-master + + # Cheat the local node that RemoteNode is down so that it will not wait for it to come up. + # Local node will take local data as the data source for pending tables and spread the data + # to the other pending nodes. + # + # * Check EMQX logs to find out which remote node(s) the local node is waiting for + # * To take effect, restart this EMQX node + # * This is a node local setting + + $Script lie-node-down RemoteNode + +Tips: + - Override local node name with envvar: \$EMQX_NODE_NAME + " +} + +# Functions +# +print_pending_tables() { + local erl_cmd='[ io:format("~p :: ~p~n", [T, maps:with([all_nodes, load_order, storage_type, + active_replicas, local_content, load_by_force, + load_node, load_reason, master_nodes] + , maps:from_list(mnesia:table_info(T, all)))]) + || T <- mnesia:system_info(local_tables), unknown =:= mnesia:table_info(T, load_node) ], + ok + ' + exec "$THIS_DIR/emqx" eval "$erl_cmd" +} + +print_details_per_table() { + local erl_cmd='[ io:format("~p :: ~p~n", [T, mnesia:table_info(T, all)]) + || T <- mnesia:system_info(local_tables), unknown =:= mnesia:table_info(T, load_node) ], + ok + ' + exec "$THIS_DIR/emqx" eval "$erl_cmd" +} + +force-load() { + if [ $# -eq 1 ]; then + local erl_cmd="mnesia:force_load_table(${1})" + else + local erl_cmd='[ {T, mnesia:force_load_table(T)} + || T <- mnesia:system_info(local_tables), + unknown =:= mnesia:table_info(T, load_node) + ] + ' + fi + exec "$THIS_DIR/emqx" eval "$erl_cmd" +} + +remove-node() { + local target_node=$1 + local erl_cmd=" + case [T || T <- mnesia:system_info(local_tables), unknown =:= mnesia:table_info(T, load_node)] of + [] -> + io:format(\"No table need to load\\n\"), + skipped; + TargetTables -> + io:format(\"Going to remove node ${target_node} from schema of the tables:~n~p~n\", [TargetTables]), + case io:read(\"confirm? [yes.] OR Ctrl-D to skip: \") of + {ok, yes} -> + lists:map(fun(T) -> + mnesia:force_load_table(T), + {T, mnesia:del_table_copy(T, '${target_node}') } + end, TargetTables); + eof -> skipped; + R -> {skipped, R} + end + end + " + exec "$THIS_DIR/emqx" eval "$erl_cmd" +} + +set-master-node() { + if [ $# -eq 1 ]; then + local erl_cmd="mnesia:set_master_nodes(['${1}']), mnesia_recover:dump_decision_tab()" + else + local erl_cmd="mnesia:set_master_nodes([]), mnesia_recover:dump_decision_tab()" + fi + + exec "$THIS_DIR/emqx" eval "$erl_cmd" +} + +lie-node-down() { + if [ $# -eq 1 ]; then + local erl_cmd="mnesia_recover:log_mnesia_down('${1}'), mnesia_recover:dump_decision_tab()" + exec "$THIS_DIR/emqx" eval "$erl_cmd" + else + usage + fi +} + + +CMD=${1:-usage} +[ $# -gt 0 ] && shift 1 + +case "$CMD" in + force-load) + force-load "$@" + ;; + remove-node) + remove-node "$@" + ;; + pending-tables) + print_pending_tables + ;; + table-details) + print_details_per_table + ;; + set-master) + set-master-node "$@" + ;; + unset-master) + set-master-node + ;; + lie-node-down) + lie-node-down "$@" + ;; + *) + usage +esac diff --git a/include/emqx_release.hrl b/include/emqx_release.hrl index 4af432057..9b4c0e6f2 100644 --- a/include/emqx_release.hrl +++ b/include/emqx_release.hrl @@ -29,7 +29,7 @@ -ifndef(EMQX_ENTERPRISE). --define(EMQX_RELEASE, {opensource, "4.4.6-beta.1"}). +-define(EMQX_RELEASE, {opensource, "4.4.6-beta.2"}). -else. diff --git a/rebar.config.erl b/rebar.config.erl index dd627bc71..097fdf56c 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -338,6 +338,7 @@ relx_overlay(ReleaseType) -> , {template, "data/emqx_vars", "releases/emqx_vars"} , {copy, "bin/emqx", "bin/emqx"} , {copy, "bin/emqx_ctl", "bin/emqx_ctl"} + , {copy, "bin/emqx_cluster_rescue", "bin/emqx_cluster_rescue"} , {copy, "bin/node_dump", "bin/node_dump"} , {copy, "bin/install_upgrade.escript", "bin/install_upgrade.escript"} , {copy, "bin/emqx", "bin/emqx-{{release_version}}"} %% for relup diff --git a/src/emqx.appup.src b/src/emqx.appup.src index b4c7917fb..fe4cb32c3 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -4,11 +4,13 @@ [{"4.4.5", [ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}]}, {"4.4.4", [{load_module,emqx_access_control,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, @@ -24,6 +26,7 @@ {load_module,emqx_metrics,brutal_purge,soft_purge,[]}]}, {"4.4.3", [{add_module,emqx_calendar}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {add_module,emqx_exclusive_subscription}, @@ -49,6 +52,7 @@ {load_module,emqx_relup}]}, {"4.4.2", [{add_module,emqx_calendar}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {add_module,emqx_exclusive_subscription}, @@ -77,6 +81,7 @@ {load_module,emqx_relup}]}, {"4.4.1", [{load_module,emqx_packet,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {add_module,emqx_exclusive_subscription}, {apply,{emqx_exclusive_subscription,on_add_module,[]}}, @@ -113,6 +118,7 @@ {add_module,emqx_relup}]}, {"4.4.0", [{load_module,emqx_packet,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {add_module,emqx_calendar}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {add_module,emqx_exclusive_subscription}, @@ -153,13 +159,14 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.4.5", - [ + [{update,emqx_broker_sup,supervisor}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}]}, {"4.4.4", [{load_module,emqx_access_control,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, @@ -175,6 +182,7 @@ {load_module,emqx_metrics,brutal_purge,soft_purge,[]}]}, {"4.4.3", [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, {load_module,emqx_topic,brutal_purge,soft_purge,[]}, @@ -199,6 +207,7 @@ {load_module,emqx_relup}]}, {"4.4.2", [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, {load_module,emqx_topic,brutal_purge,soft_purge,[]}, @@ -226,6 +235,7 @@ {load_module,emqx_relup}]}, {"4.4.1", [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, {load_module,emqx_topic,brutal_purge,soft_purge,[]}, {apply,{emqx_exclusive_subscription,on_delete_module,[]}}, @@ -261,6 +271,7 @@ {delete_module,emqx_relup}]}, {"4.4.0", [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {update,emqx_broker_sup,supervisor}, {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, {load_module,emqx_topic,brutal_purge,soft_purge,[]}, {apply,{emqx_exclusive_subscription,on_delete_module,[]}}, diff --git a/src/emqx_broker_sup.erl b/src/emqx_broker_sup.erl index bc1ef965e..e4d51457f 100644 --- a/src/emqx_broker_sup.erl +++ b/src/emqx_broker_sup.erl @@ -51,5 +51,4 @@ init([]) -> type => worker, modules => [emqx_broker_helper]}, - {ok, {{one_for_all, 0, 1}, [BrokerPool, SharedSub, Helper]}}. - + {ok, {{one_for_one, 1, 5}, [BrokerPool, SharedSub, Helper]}}. diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 1e77b6014..3f67c225d 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -164,27 +164,24 @@ ack_enabled() -> do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() -> %% Deadlock otherwise - SubPid ! {deliver, Topic, Msg}, - ok; + send(SubPid, Topic, {deliver, Topic, Msg}); %% return either 'ok' (when everything is fine) or 'error' do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> %% For QoS 0 message, send it as regular dispatch - SubPid ! {deliver, Topic, Msg}, - ok; + send(SubPid, Topic, {deliver, Topic, Msg}); do_dispatch(SubPid, Group, Topic, Msg, Type) -> case ack_enabled() of true -> dispatch_with_ack(SubPid, Group, Topic, Msg, Type); false -> - SubPid ! {deliver, Topic, Msg}, - ok + send(SubPid, Topic, {deliver, Topic, Msg}) end. dispatch_with_ack(SubPid, Group, Topic, Msg, Type) -> %% For QoS 1/2 message, expect an ack Ref = erlang:monitor(process, SubPid), Sender = self(), - SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)}, + send(SubPid, Topic, {deliver, Topic, with_group_ack(Msg, Group, Type, Sender, Ref)}), Timeout = case Msg#message.qos of ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); ?QOS_2 -> infinity @@ -210,6 +207,15 @@ dispatch_with_ack(SubPid, Group, Topic, Msg, Type) -> _ = erlang:demonitor(Ref, [flush]) end. +send(Pid, Topic, Msg) -> + Node = node(Pid), + if Node =:= node() -> + Pid ! Msg; + true -> + emqx_rpc:cast(Topic, Node, erlang, send, [Pid, Msg]) + end, + ok. + with_group_ack(Msg, Group, Type, Sender, Ref) -> emqx_message:set_headers(#{shared_dispatch_ack => {Sender, old_ref(Type, Group, Ref)}}, Msg). diff --git a/test/emqx_broker_sup_SUITE.erl b/test/emqx_broker_sup_SUITE.erl new file mode 100644 index 000000000..04881f308 --- /dev/null +++ b/test/emqx_broker_sup_SUITE.erl @@ -0,0 +1,65 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_broker_sup_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-define(APP, emqx). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +init_per_testcase(Case, Config) -> + ?MODULE:Case({init, Config}). + +end_per_testcase(Case, Config) -> + ?MODULE:Case({'end', Config}). + +%%-------------------------------------------------------------------- +%% cases +%%-------------------------------------------------------------------- + +t_restart_shared_sub({init, Config}) -> + emqx:subscribe(<<"t/a">>, #{share => <<"groupa">>}), + true = exit(whereis(emqx_shared_sub), kill), + %% waiting for restart + timer:sleep(200), Config; +t_restart_shared_sub(Config) when is_list(Config) -> + ?assert(is_pid(whereis(emqx_shared_sub))), + emqx:publish(emqx_message:make(<<"t/a">>, <<"Hi">>)), + ?assert( + receive + {deliver, _Topic, #message{payload = <<"Hi">>}} -> true + after 2000 -> + false + end); +t_restart_shared_sub({'end', Config}) -> + emqx:unsubscribe(<<"$share/grpa/t/a">>).