Add manual start API
This commit is contained in:
parent
796fc3b1ba
commit
d4495fd8e7
|
@ -36,11 +36,12 @@
|
||||||
%%
|
%%
|
||||||
%% Batch collector state diagram
|
%% Batch collector state diagram
|
||||||
%%
|
%%
|
||||||
%% [connecting] --(2)--> [connected]
|
%% [standing_by] --(0) --> [connecting] --(2)--> [connected]
|
||||||
%% | ^ |
|
%% | ^ |
|
||||||
%% | | |
|
%% | | |
|
||||||
%% '--(1)---'--------(3)------'
|
%% '--(1)---'--------(3)------'
|
||||||
%%
|
%%
|
||||||
|
%% (0): auto or manual start
|
||||||
%% (1): retry timeout
|
%% (1): retry timeout
|
||||||
%% (2): successfuly connected to remote node/cluster
|
%% (2): successfuly connected to remote node/cluster
|
||||||
%% (3): received {disconnected, conn_ref(), Reason} OR
|
%% (3): received {disconnected, conn_ref(), Reason} OR
|
||||||
|
@ -70,9 +71,10 @@
|
||||||
-export([terminate/3, code_change/4, init/1, callback_mode/0]).
|
-export([terminate/3, code_change/4, init/1, callback_mode/0]).
|
||||||
|
|
||||||
%% state functions
|
%% state functions
|
||||||
-export([connecting/3, connected/3]).
|
-export([standing_by/3, connecting/3, connected/3]).
|
||||||
|
|
||||||
%% management APIs
|
%% management APIs
|
||||||
|
-export([ensure_started/2, ensure_stopped/1, ensure_stopped/2]).
|
||||||
-export([get_forwards/1, ensure_forward_present/2, ensure_forward_absent/2]).
|
-export([get_forwards/1, ensure_forward_present/2, ensure_forward_absent/2]).
|
||||||
-export([get_subscriptions/1, ensure_subscription_present/3, ensure_subscription_absent/2]).
|
-export([get_subscriptions/1, ensure_subscription_present/3, ensure_subscription_absent/2]).
|
||||||
|
|
||||||
|
@ -100,6 +102,8 @@
|
||||||
-define(maybe_send, {next_event, internal, maybe_send}).
|
-define(maybe_send, {next_event, internal, maybe_send}).
|
||||||
|
|
||||||
%% @doc Start a portal worker. Supported configs:
|
%% @doc Start a portal worker. Supported configs:
|
||||||
|
%% start_type: 'manual' (default) or 'auto', when manual, portal will stay
|
||||||
|
%% at 'standing_by' state until a manual call to start it.
|
||||||
%% connect_module: The module which implements emqx_portal_connect behaviour
|
%% connect_module: The module which implements emqx_portal_connect behaviour
|
||||||
%% and work as message batch transport layer
|
%% and work as message batch transport layer
|
||||||
%% reconnect_delay_ms: Delay in milli-seconds for the portal worker to retry
|
%% reconnect_delay_ms: Delay in milli-seconds for the portal worker to retry
|
||||||
|
@ -123,6 +127,38 @@ start_link(Name, Config) when is_list(Config) ->
|
||||||
start_link(Name, Config) ->
|
start_link(Name, Config) ->
|
||||||
gen_statem:start_link({local, name(Name)}, ?MODULE, Config, []).
|
gen_statem:start_link({local, name(Name)}, ?MODULE, Config, []).
|
||||||
|
|
||||||
|
%% @doc Manually start portal worker. State idempotency ensured.
|
||||||
|
ensure_started(Name, Config) ->
|
||||||
|
case start_link(Name, Config) of
|
||||||
|
{ok, Pid} -> {ok, Pid};
|
||||||
|
{error, {already_started,Pid}} -> {ok, Pid}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @doc Manually stop portal worker. State idempotency ensured.
|
||||||
|
ensure_stopped(Id) ->
|
||||||
|
ensure_stopped(Id, 1000).
|
||||||
|
|
||||||
|
ensure_stopped(Id, Timeout) ->
|
||||||
|
Pid = case id(Id) of
|
||||||
|
P when is_pid(P) -> P;
|
||||||
|
N -> whereis(N)
|
||||||
|
end,
|
||||||
|
case Pid of
|
||||||
|
undefined ->
|
||||||
|
ok;
|
||||||
|
_ ->
|
||||||
|
MRef = monitor(process, Pid),
|
||||||
|
unlink(Pid),
|
||||||
|
_ = gen_statem:call(id(Id), ensure_stopped, Timeout),
|
||||||
|
receive
|
||||||
|
{'DOWN', MRef, _, _, _} ->
|
||||||
|
ok
|
||||||
|
after
|
||||||
|
Timeout ->
|
||||||
|
exit(Pid, kill)
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
stop(Pid) -> gen_statem:stop(Pid).
|
stop(Pid) -> gen_statem:stop(Pid).
|
||||||
|
|
||||||
%% @doc This function is to be evaluated on message/batch receiver side.
|
%% @doc This function is to be evaluated on message/batch receiver side.
|
||||||
|
@ -193,7 +229,6 @@ init(Config) ->
|
||||||
true = emqx_topic:validate({filter, T}),
|
true = emqx_topic:validate({filter, T}),
|
||||||
{T, QoS}
|
{T, QoS}
|
||||||
end, Get(subscriptions, []))),
|
end, Get(subscriptions, []))),
|
||||||
ok = subscribe_local_topics(Topics),
|
|
||||||
ConnectModule = maps:get(connect_module, Config),
|
ConnectModule = maps:get(connect_module, Config),
|
||||||
ConnectConfig = maps:without([connect_module,
|
ConnectConfig = maps:without([connect_module,
|
||||||
queue,
|
queue,
|
||||||
|
@ -203,9 +238,10 @@ init(Config) ->
|
||||||
forwards
|
forwards
|
||||||
], Config#{subscriptions => Subs}),
|
], Config#{subscriptions => Subs}),
|
||||||
ConnectFun = fun(SubsX) -> emqx_portal_connect:start(ConnectModule, ConnectConfig#{subscriptions := SubsX}) end,
|
ConnectFun = fun(SubsX) -> emqx_portal_connect:start(ConnectModule, ConnectConfig#{subscriptions := SubsX}) end,
|
||||||
{ok, connecting,
|
{ok, standing_by,
|
||||||
#{connect_module => ConnectModule,
|
#{connect_module => ConnectModule,
|
||||||
connect_fun => ConnectFun,
|
connect_fun => ConnectFun,
|
||||||
|
start_type => Get(start_type, manual),
|
||||||
reconnect_delay_ms => maps:get(reconnect_delay_ms, Config, ?DEFAULT_RECONNECT_DELAY_MS),
|
reconnect_delay_ms => maps:get(reconnect_delay_ms, Config, ?DEFAULT_RECONNECT_DELAY_MS),
|
||||||
batch_bytes_limit => GetQ(batch_bytes_limit, ?DEFAULT_BATCH_BYTES),
|
batch_bytes_limit => GetQ(batch_bytes_limit, ?DEFAULT_BATCH_BYTES),
|
||||||
batch_count_limit => GetQ(batch_count_limit, ?DEFAULT_BATCH_COUNT),
|
batch_count_limit => GetQ(batch_count_limit, ?DEFAULT_BATCH_COUNT),
|
||||||
|
@ -225,16 +261,36 @@ terminate(_Reason, _StateName, #{replayq := Q} = State) ->
|
||||||
_ = replayq:close(Q),
|
_ = replayq:close(Q),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% @doc Standing by for manual start.
|
||||||
|
standing_by(enter, _, #{start_type := auto}) ->
|
||||||
|
Action = {state_timeout, 0, do_connect},
|
||||||
|
{keep_state_and_data, Action};
|
||||||
|
standing_by(enter, _, #{start_type := manual}) ->
|
||||||
|
keep_state_and_data;
|
||||||
|
standing_by({call, From}, ensure_started, State) ->
|
||||||
|
{next_state, connecting, State, [{reply, From, ok}]};
|
||||||
|
standing_by({call, From}, ensure_stopped, _State) ->
|
||||||
|
{stop_and_reply, {shutdown, manual}, [{reply, From, ok}]};
|
||||||
|
standing_by(state_timeout, do_connect, State) ->
|
||||||
|
{next_state, connecting, State};
|
||||||
|
standing_by({call, From}, _Call, _State) ->
|
||||||
|
{keep_state_and_data, [{reply, From, {error, standing_by}}]};
|
||||||
|
standing_by(info, Info, State) ->
|
||||||
|
?INFO("Portal ~p discarded info event at state standing_by:\n~p", [name(), Info]),
|
||||||
|
{keep_state_and_data, State}.
|
||||||
|
|
||||||
%% @doc Connecting state is a state with timeout.
|
%% @doc Connecting state is a state with timeout.
|
||||||
%% After each timeout, it re-enters this state and start a retry until
|
%% After each timeout, it re-enters this state and start a retry until
|
||||||
%% successfuly connected to remote node/cluster.
|
%% successfuly connected to remote node/cluster.
|
||||||
connecting(enter, connected, #{reconnect_delay_ms := Timeout}) ->
|
connecting(enter, connected, #{reconnect_delay_ms := Timeout}) ->
|
||||||
Action = {state_timeout, Timeout, reconnect},
|
Action = {state_timeout, Timeout, reconnect},
|
||||||
{keep_state_and_data, Action};
|
{keep_state_and_data, Action};
|
||||||
connecting(enter, connecting, #{reconnect_delay_ms := Timeout,
|
connecting(enter, _, #{reconnect_delay_ms := Timeout,
|
||||||
connect_fun := ConnectFun,
|
connect_fun := ConnectFun,
|
||||||
subscriptions := Subs
|
subscriptions := Subs,
|
||||||
} = State) ->
|
forwards := Forwards
|
||||||
|
} = State) ->
|
||||||
|
ok = subscribe_local_topics(Forwards),
|
||||||
case ConnectFun(Subs) of
|
case ConnectFun(Subs) of
|
||||||
{ok, ConnRef, Conn} ->
|
{ok, ConnRef, Conn} ->
|
||||||
Action = {state_timeout, 0, connected},
|
Action = {state_timeout, 0, connected},
|
||||||
|
@ -300,6 +356,8 @@ connected(Type, Content, State) ->
|
||||||
common(connected, Type, Content, State).
|
common(connected, Type, Content, State).
|
||||||
|
|
||||||
%% Common handlers
|
%% Common handlers
|
||||||
|
common(_StateName, {call, From}, ensure_started, _State) ->
|
||||||
|
{keep_state_and_data, [{reply, From, ok}]};
|
||||||
common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) ->
|
common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) ->
|
||||||
{keep_state_and_data, [{reply, From, Forwards}]};
|
{keep_state_and_data, [{reply, From, Forwards}]};
|
||||||
common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) ->
|
common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) ->
|
||||||
|
@ -310,6 +368,8 @@ common(_StateName, {call, From}, {ensure_present, What, Topic}, State) ->
|
||||||
common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) ->
|
common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) ->
|
||||||
{Result, NewState} = ensure_absent(What, Topic, State),
|
{Result, NewState} = ensure_absent(What, Topic, State),
|
||||||
{keep_state, NewState, [{reply, From, Result}]};
|
{keep_state, NewState, [{reply, From, Result}]};
|
||||||
|
common(_StateName, {call, From}, ensure_stopped, _State) ->
|
||||||
|
{stop_and_reply, {shutdown, manual}, [{reply, From, ok}]};
|
||||||
common(_StateName, info, {dispatch, _, Msg},
|
common(_StateName, info, {dispatch, _, Msg},
|
||||||
#{replayq := Q} = State) ->
|
#{replayq := Q} = State) ->
|
||||||
NewQ = replayq:append(Q, collect([Msg])),
|
NewQ = replayq:append(Q, collect([Msg])),
|
||||||
|
|
|
@ -50,7 +50,8 @@ t_mngr(Config) when is_list(Config) ->
|
||||||
forwards => [<<"mngr">>],
|
forwards => [<<"mngr">>],
|
||||||
connect_module => emqx_portal_rpc,
|
connect_module => emqx_portal_rpc,
|
||||||
mountpoint => <<"forwarded">>,
|
mountpoint => <<"forwarded">>,
|
||||||
subscriptions => Subs
|
subscriptions => Subs,
|
||||||
|
start_type => auto
|
||||||
},
|
},
|
||||||
Name = ?FUNCTION_NAME,
|
Name = ?FUNCTION_NAME,
|
||||||
{ok, Pid} = emqx_portal:start_link(Name, Cfg),
|
{ok, Pid} = emqx_portal:start_link(Name, Cfg),
|
||||||
|
@ -76,7 +77,8 @@ t_rpc(Config) when is_list(Config) ->
|
||||||
Cfg = #{address => node(),
|
Cfg = #{address => node(),
|
||||||
forwards => [<<"t_rpc/#">>],
|
forwards => [<<"t_rpc/#">>],
|
||||||
connect_module => emqx_portal_rpc,
|
connect_module => emqx_portal_rpc,
|
||||||
mountpoint => <<"forwarded">>
|
mountpoint => <<"forwarded">>,
|
||||||
|
start_type => auto
|
||||||
},
|
},
|
||||||
{ok, Pid} = emqx_portal:start_link(?FUNCTION_NAME, Cfg),
|
{ok, Pid} = emqx_portal:start_link(?FUNCTION_NAME, Cfg),
|
||||||
ClientId = <<"ClientId">>,
|
ClientId = <<"ClientId">>,
|
||||||
|
@ -125,10 +127,10 @@ t_mqtt(Config) when is_list(Config) ->
|
||||||
},
|
},
|
||||||
reconnect_delay_ms => 1000,
|
reconnect_delay_ms => 1000,
|
||||||
ssl => false,
|
ssl => false,
|
||||||
start_type => manual,
|
|
||||||
%% Consume back to forwarded message for verification
|
%% Consume back to forwarded message for verification
|
||||||
%% NOTE: this is a indefenite loopback without mocking emqx_portal:import_batch/2
|
%% NOTE: this is a indefenite loopback without mocking emqx_portal:import_batch/2
|
||||||
subscriptions => [{ForwardedTopic, _QoS = 1}]
|
subscriptions => [{ForwardedTopic, _QoS = 1}],
|
||||||
|
start_type => auto
|
||||||
},
|
},
|
||||||
Tester = self(),
|
Tester = self(),
|
||||||
Ref = make_ref(),
|
Ref = make_ref(),
|
||||||
|
|
|
@ -96,6 +96,20 @@ test_buffer_when_disconnected() ->
|
||||||
?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000),
|
?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000),
|
||||||
ok = emqx_portal:stop(?PORTAL_REG_NAME).
|
ok = emqx_portal:stop(?PORTAL_REG_NAME).
|
||||||
|
|
||||||
|
manual_start_stop_test() ->
|
||||||
|
Ref = make_ref(),
|
||||||
|
Config0 = make_config(Ref, self(), {ok, Ref, connection}),
|
||||||
|
Config = Config0#{start_type := manual},
|
||||||
|
{ok, Pid} = emqx_portal:ensure_started(?PORTAL_NAME, Config),
|
||||||
|
%% call ensure_started again should yeld the same result
|
||||||
|
{ok, Pid} = emqx_portal:ensure_started(?PORTAL_NAME, Config),
|
||||||
|
?assertEqual(Pid, whereis(?PORTAL_REG_NAME)),
|
||||||
|
?assertEqual({error, standing_by},
|
||||||
|
emqx_portal:ensure_forward_present(Pid, "dummy")),
|
||||||
|
ok = emqx_portal:ensure_stopped(unknown),
|
||||||
|
ok = emqx_portal:ensure_stopped(Pid),
|
||||||
|
ok = emqx_portal:ensure_stopped(?PORTAL_REG_NAME).
|
||||||
|
|
||||||
%% Feed messages to portal
|
%% Feed messages to portal
|
||||||
sender_loop(_Pid, [], _) -> exit(normal);
|
sender_loop(_Pid, [], _) -> exit(normal);
|
||||||
sender_loop(Pid, [Num | Rest], Interval) ->
|
sender_loop(Pid, [Num | Rest], Interval) ->
|
||||||
|
@ -133,7 +147,8 @@ make_config(Ref, TestPid, Result) ->
|
||||||
test_ref => Ref,
|
test_ref => Ref,
|
||||||
connect_module => ?MODULE,
|
connect_module => ?MODULE,
|
||||||
reconnect_delay_ms => 50,
|
reconnect_delay_ms => 50,
|
||||||
connect_result => Result
|
connect_result => Result,
|
||||||
|
start_type => auto
|
||||||
}.
|
}.
|
||||||
|
|
||||||
make_msg(I) ->
|
make_msg(I) ->
|
||||||
|
|
Loading…
Reference in New Issue