From d4495fd8e72ee0667410bd665d9f310a9278de90 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Sun, 24 Feb 2019 21:55:03 +0100 Subject: [PATCH] Add manual start API --- src/portal/emqx_portal.erl | 82 +++++++++++++++++++++++++++++++++----- test/emqx_portal_SUITE.erl | 10 +++-- test/emqx_portal_tests.erl | 17 +++++++- 3 files changed, 93 insertions(+), 16 deletions(-) diff --git a/src/portal/emqx_portal.erl b/src/portal/emqx_portal.erl index 3673731f6..76f7c1842 100644 --- a/src/portal/emqx_portal.erl +++ b/src/portal/emqx_portal.erl @@ -36,11 +36,12 @@ %% %% Batch collector state diagram %% -%% [connecting] --(2)--> [connected] -%% | ^ | -%% | | | -%% '--(1)---'--------(3)------' +%% [standing_by] --(0) --> [connecting] --(2)--> [connected] +%% | ^ | +%% | | | +%% '--(1)---'--------(3)------' %% +%% (0): auto or manual start %% (1): retry timeout %% (2): successfuly connected to remote node/cluster %% (3): received {disconnected, conn_ref(), Reason} OR @@ -70,9 +71,10 @@ -export([terminate/3, code_change/4, init/1, callback_mode/0]). %% state functions --export([connecting/3, connected/3]). +-export([standing_by/3, connecting/3, connected/3]). %% management APIs +-export([ensure_started/2, ensure_stopped/1, ensure_stopped/2]). -export([get_forwards/1, ensure_forward_present/2, ensure_forward_absent/2]). -export([get_subscriptions/1, ensure_subscription_present/3, ensure_subscription_absent/2]). @@ -100,6 +102,8 @@ -define(maybe_send, {next_event, internal, maybe_send}). %% @doc Start a portal worker. Supported configs: +%% start_type: 'manual' (default) or 'auto', when manual, portal will stay +%% at 'standing_by' state until a manual call to start it. %% connect_module: The module which implements emqx_portal_connect behaviour %% and work as message batch transport layer %% reconnect_delay_ms: Delay in milli-seconds for the portal worker to retry @@ -123,6 +127,38 @@ start_link(Name, Config) when is_list(Config) -> start_link(Name, Config) -> gen_statem:start_link({local, name(Name)}, ?MODULE, Config, []). +%% @doc Manually start portal worker. State idempotency ensured. +ensure_started(Name, Config) -> + case start_link(Name, Config) of + {ok, Pid} -> {ok, Pid}; + {error, {already_started,Pid}} -> {ok, Pid} + end. + +%% @doc Manually stop portal worker. State idempotency ensured. +ensure_stopped(Id) -> + ensure_stopped(Id, 1000). + +ensure_stopped(Id, Timeout) -> + Pid = case id(Id) of + P when is_pid(P) -> P; + N -> whereis(N) + end, + case Pid of + undefined -> + ok; + _ -> + MRef = monitor(process, Pid), + unlink(Pid), + _ = gen_statem:call(id(Id), ensure_stopped, Timeout), + receive + {'DOWN', MRef, _, _, _} -> + ok + after + Timeout -> + exit(Pid, kill) + end + end. + stop(Pid) -> gen_statem:stop(Pid). %% @doc This function is to be evaluated on message/batch receiver side. @@ -193,7 +229,6 @@ init(Config) -> true = emqx_topic:validate({filter, T}), {T, QoS} end, Get(subscriptions, []))), - ok = subscribe_local_topics(Topics), ConnectModule = maps:get(connect_module, Config), ConnectConfig = maps:without([connect_module, queue, @@ -203,9 +238,10 @@ init(Config) -> forwards ], Config#{subscriptions => Subs}), ConnectFun = fun(SubsX) -> emqx_portal_connect:start(ConnectModule, ConnectConfig#{subscriptions := SubsX}) end, - {ok, connecting, + {ok, standing_by, #{connect_module => ConnectModule, connect_fun => ConnectFun, + start_type => Get(start_type, manual), reconnect_delay_ms => maps:get(reconnect_delay_ms, Config, ?DEFAULT_RECONNECT_DELAY_MS), batch_bytes_limit => GetQ(batch_bytes_limit, ?DEFAULT_BATCH_BYTES), batch_count_limit => GetQ(batch_count_limit, ?DEFAULT_BATCH_COUNT), @@ -225,16 +261,36 @@ terminate(_Reason, _StateName, #{replayq := Q} = State) -> _ = replayq:close(Q), ok. +%% @doc Standing by for manual start. +standing_by(enter, _, #{start_type := auto}) -> + Action = {state_timeout, 0, do_connect}, + {keep_state_and_data, Action}; +standing_by(enter, _, #{start_type := manual}) -> + keep_state_and_data; +standing_by({call, From}, ensure_started, State) -> + {next_state, connecting, State, [{reply, From, ok}]}; +standing_by({call, From}, ensure_stopped, _State) -> + {stop_and_reply, {shutdown, manual}, [{reply, From, ok}]}; +standing_by(state_timeout, do_connect, State) -> + {next_state, connecting, State}; +standing_by({call, From}, _Call, _State) -> + {keep_state_and_data, [{reply, From, {error, standing_by}}]}; +standing_by(info, Info, State) -> + ?INFO("Portal ~p discarded info event at state standing_by:\n~p", [name(), Info]), + {keep_state_and_data, State}. + %% @doc Connecting state is a state with timeout. %% After each timeout, it re-enters this state and start a retry until %% successfuly connected to remote node/cluster. connecting(enter, connected, #{reconnect_delay_ms := Timeout}) -> Action = {state_timeout, Timeout, reconnect}, {keep_state_and_data, Action}; -connecting(enter, connecting, #{reconnect_delay_ms := Timeout, - connect_fun := ConnectFun, - subscriptions := Subs - } = State) -> +connecting(enter, _, #{reconnect_delay_ms := Timeout, + connect_fun := ConnectFun, + subscriptions := Subs, + forwards := Forwards + } = State) -> + ok = subscribe_local_topics(Forwards), case ConnectFun(Subs) of {ok, ConnRef, Conn} -> Action = {state_timeout, 0, connected}, @@ -300,6 +356,8 @@ connected(Type, Content, State) -> common(connected, Type, Content, State). %% Common handlers +common(_StateName, {call, From}, ensure_started, _State) -> + {keep_state_and_data, [{reply, From, ok}]}; common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) -> {keep_state_and_data, [{reply, From, Forwards}]}; common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) -> @@ -310,6 +368,8 @@ common(_StateName, {call, From}, {ensure_present, What, Topic}, State) -> common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) -> {Result, NewState} = ensure_absent(What, Topic, State), {keep_state, NewState, [{reply, From, Result}]}; +common(_StateName, {call, From}, ensure_stopped, _State) -> + {stop_and_reply, {shutdown, manual}, [{reply, From, ok}]}; common(_StateName, info, {dispatch, _, Msg}, #{replayq := Q} = State) -> NewQ = replayq:append(Q, collect([Msg])), diff --git a/test/emqx_portal_SUITE.erl b/test/emqx_portal_SUITE.erl index 8b80fb72f..3d34eda50 100644 --- a/test/emqx_portal_SUITE.erl +++ b/test/emqx_portal_SUITE.erl @@ -50,7 +50,8 @@ t_mngr(Config) when is_list(Config) -> forwards => [<<"mngr">>], connect_module => emqx_portal_rpc, mountpoint => <<"forwarded">>, - subscriptions => Subs + subscriptions => Subs, + start_type => auto }, Name = ?FUNCTION_NAME, {ok, Pid} = emqx_portal:start_link(Name, Cfg), @@ -76,7 +77,8 @@ t_rpc(Config) when is_list(Config) -> Cfg = #{address => node(), forwards => [<<"t_rpc/#">>], connect_module => emqx_portal_rpc, - mountpoint => <<"forwarded">> + mountpoint => <<"forwarded">>, + start_type => auto }, {ok, Pid} = emqx_portal:start_link(?FUNCTION_NAME, Cfg), ClientId = <<"ClientId">>, @@ -125,10 +127,10 @@ t_mqtt(Config) when is_list(Config) -> }, reconnect_delay_ms => 1000, ssl => false, - start_type => manual, %% Consume back to forwarded message for verification %% NOTE: this is a indefenite loopback without mocking emqx_portal:import_batch/2 - subscriptions => [{ForwardedTopic, _QoS = 1}] + subscriptions => [{ForwardedTopic, _QoS = 1}], + start_type => auto }, Tester = self(), Ref = make_ref(), diff --git a/test/emqx_portal_tests.erl b/test/emqx_portal_tests.erl index b44e02732..03f545b38 100644 --- a/test/emqx_portal_tests.erl +++ b/test/emqx_portal_tests.erl @@ -96,6 +96,20 @@ test_buffer_when_disconnected() -> ?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000), ok = emqx_portal:stop(?PORTAL_REG_NAME). +manual_start_stop_test() -> + Ref = make_ref(), + Config0 = make_config(Ref, self(), {ok, Ref, connection}), + Config = Config0#{start_type := manual}, + {ok, Pid} = emqx_portal:ensure_started(?PORTAL_NAME, Config), + %% call ensure_started again should yeld the same result + {ok, Pid} = emqx_portal:ensure_started(?PORTAL_NAME, Config), + ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)), + ?assertEqual({error, standing_by}, + emqx_portal:ensure_forward_present(Pid, "dummy")), + ok = emqx_portal:ensure_stopped(unknown), + ok = emqx_portal:ensure_stopped(Pid), + ok = emqx_portal:ensure_stopped(?PORTAL_REG_NAME). + %% Feed messages to portal sender_loop(_Pid, [], _) -> exit(normal); sender_loop(Pid, [Num | Rest], Interval) -> @@ -133,7 +147,8 @@ make_config(Ref, TestPid, Result) -> test_ref => Ref, connect_module => ?MODULE, reconnect_delay_ms => 50, - connect_result => Result + connect_result => Result, + start_type => auto }. make_msg(I) ->