diff --git a/Makefile b/Makefile index f6230d2c7..5c0fd0bba 100644 --- a/Makefile +++ b/Makefile @@ -101,12 +101,19 @@ rebar-eunit: $(CUTTLEFISH_SCRIPT) rebar-compile: @rebar3 compile -rebar-ct: app.config +rebar-ct-setup: app.config @rebar3 as test compile @ln -s -f '../../../../etc' _build/test/lib/emqx/ @ln -s -f '../../../../data' _build/test/lib/emqx/ + +rebar-ct: rebar-ct-setup @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(shell echo $(foreach var,$(CT_SUITES),test/$(var)_SUITE) | tr ' ' ',') +## Run one single CT with rebar3 +## e.g. make ct-one-suite suite=emqx_portal +ct-one-suite: rebar-ct-setup + @rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(suite)_SUITE + rebar-clean: @rebar3 clean diff --git a/src/portal/emqx_portal.erl b/src/portal/emqx_portal.erl index 238b6681f..a0f954e1a 100644 --- a/src/portal/emqx_portal.erl +++ b/src/portal/emqx_portal.erl @@ -73,7 +73,7 @@ -export([connecting/3, connected/3]). %% management APIs --export([get_forwards/1]). %, add_forward/2, del_forward/2]). +-export([get_forwards/1, ensure_forward_present/2]). %, del_forward/2]). -export([get_subscriptions/1]). %, add_subscription/3, del_subscription/2]). -export_type([config/0, @@ -86,6 +86,7 @@ -type config() :: map(). -type batch() :: [emqx_portal_msg:exp_msg()]. -type ack_ref() :: term(). +-type topic() :: emqx_topic:topic(). -include("logger.hrl"). -include("emqx_mqtt.hrl"). @@ -137,8 +138,13 @@ handle_ack(Pid, Ref) when node() =:= node(Pid) -> Pid ! {batch_ack, Ref}, ok. --spec get_forwards(id()) -> [emqx_topic:topic()]. -get_forwards(Id) -> gen_statem:call(id(Id), get_forwards). +%% @doc Return all forwards (local subscriptions). +-spec get_forwards(id()) -> [topic()]. +get_forwards(Id) -> gen_statem:call(id(Id), get_forwards, timer:seconds(1000)). + +%% @doc Add a new forward (local topic subscription). +-spec ensure_forward_present(id(), topic()) -> ok | {error, any()}. +ensure_forward_present(Id, Topic) -> gen_statem:call(id(Id), {ensure_forward_present, topic(Topic)}). -spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}]. get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions). @@ -185,7 +191,7 @@ init(Config) -> batch_count_limit => GetQ(batch_count_limit, ?DEFAULT_BATCH_COUNT), max_inflight_batches => Get(max_inflight_batches, ?DEFAULT_SEND_AHEAD), mountpoint => format_mountpoint(Get(mountpoint, undefined)), - topics => Topics, + forwards => Topics, subscriptions => Subs, replayq => Queue, inflight => [] @@ -274,6 +280,16 @@ connected(Type, Content, State) -> %% Common handlers common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) -> {keep_state_and_data, [{reply, From, Forwards}]}; +common(_StateName, {call, From}, {ensure_forward_present, Topic}, + #{forwards := Forwards} = State) -> + case lists:member(Topic, Forwards) of + true -> + {keep_state_and_data, [{reply, From, ok}]}; + false -> + ok = subscribe_local_topic(Topic), + {keep_state, State#{forwards := lists:usort([Topic | Forwards])}, + [{reply, From, ok}]} + end; common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) -> {keep_state_and_data, [{reply, From, Subs}]}; common(_StateName, info, {dispatch, _, Msg}, @@ -281,8 +297,8 @@ common(_StateName, info, {dispatch, _, Msg}, NewQ = replayq:append(Q, collect([Msg])), {keep_state, State#{replayq => NewQ}, ?maybe_send}; common(StateName, Type, Content, State) -> - ?DEBUG("Portal ~p discarded ~p type event at state ~p:~p", - [name(), Type, StateName, Content]), + ?INFO("Portal ~p discarded ~p type event at state ~p:~p", + [name(), Type, StateName, Content]), {keep_state, State}. collect(Acc) -> @@ -346,13 +362,19 @@ do_ack(#{inflight := Inflight}, Ref) -> false -> stale end. -subscribe_local_topics(Topics) -> - lists:foreach( - fun(Topic0) -> - Topic = iolist_to_binary(Topic0), - emqx_topic:validate({filter, Topic}) orelse erlang:error({bad_topic, Topic}), - emqx_broker:subscribe(Topic, #{qos => ?QOS_1, subid => name()}) - end, Topics). +subscribe_local_topics(Topics) -> lists:foreach(fun subscribe_local_topic/1, Topics). + +subscribe_local_topic(Topic0) -> + Topic = topic(Topic0), + try + emqx_topic:validate({filter, Topic}) + catch + error : Reason -> + erlang:error({bad_topic, Topic, Reason}) + end, + ok = emqx_broker:subscribe(Topic, #{qos => ?QOS_1, subid => name()}). + +topic(T) -> iolist_to_binary(T). disconnect(#{connection := Conn, conn_ref := ConnRef, diff --git a/test/emqx_ct_broker_helpers.erl b/test/emqx_ct_broker_helpers.erl index 1ef5b1fa3..0e0bfa3a4 100644 --- a/test/emqx_ct_broker_helpers.erl +++ b/test/emqx_ct_broker_helpers.erl @@ -54,10 +54,17 @@ "ECDH-RSA-AES128-SHA","AES128-SHA"]}]). run_setup_steps() -> + _ = run_setup_steps([]), + %% return ok to be backward compatible + ok. + +run_setup_steps(Config) -> NewConfig = generate_config(), lists:foreach(fun set_app_env/1, NewConfig), set_bridge_env(), - application:ensure_all_started(?APP). + {ok, _} = application:ensure_all_started(?APP), + set_log_level(Config), + Config. run_teardown_steps() -> ?APP:shutdown(). @@ -67,6 +74,12 @@ generate_config() -> Conf = conf_parse:file([local_path(["etc", "gen.emqx.conf"])]), cuttlefish_generator:map(Schema, Conf). +set_log_level(Config) -> + case proplists:get_value(log_level, Config) of + undefined -> ok; + Level -> emqx_logger:set_log_level(Level) + end. + get_base_dir(Module) -> {file, Here} = code:is_loaded(Module), filename:dirname(filename:dirname(Here)). diff --git a/test/emqx_portal_SUITE.erl b/test/emqx_portal_SUITE.erl index 50d438f16..2c92afd94 100644 --- a/test/emqx_portal_SUITE.erl +++ b/test/emqx_portal_SUITE.erl @@ -39,8 +39,7 @@ init_per_suite(Config) -> _ -> ok end, - emqx_ct_broker_helpers:run_setup_steps(), - Config. + emqx_ct_broker_helpers:run_setup_steps(Config). end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). @@ -57,12 +56,14 @@ t_forwards_mngr(Config) when is_list(Config) -> {ok, Pid} = emqx_portal:start_link(Name, Cfg), try ?assertEqual([<<"mngr">>], emqx_portal:get_forwards(Name)), + ?assertEqual(ok, emqx_portal:ensure_forward_present(Name, "mngr")), + ?assertEqual(ok, emqx_portal:ensure_forward_present(Name, "mngr2")), + ?assertEqual([<<"mngr">>, <<"mngr2">>], emqx_portal:get_forwards(Pid)), ?assertEqual(Subs, emqx_portal:get_subscriptions(Pid)) after ok = emqx_portal:stop(Pid) end. - %% A loopback RPC to local node t_rpc(Config) when is_list(Config) -> Cfg = #{address => node(),