Add emqx_portal interfaces

This commit is contained in:
Gilbert Wong 2019-02-25 16:59:10 +08:00
parent 921a45a505
commit ec37225333
4 changed files with 37 additions and 12 deletions

1
.gitignore vendored
View File

@ -38,3 +38,4 @@ cuttlefish
rebar.lock
xrefr
erlang.mk
*.coverdata

View File

@ -74,6 +74,7 @@
-export([standing_by/3, connecting/3, connected/3]).
%% management APIs
-export([start_bridge/1, stop_bridge/1, status/1]).
-export([ensure_started/2, ensure_stopped/1, ensure_stopped/2]).
-export([get_forwards/1, ensure_forward_present/2, ensure_forward_absent/2]).
-export([get_subscriptions/1, ensure_subscription_present/3, ensure_subscription_absent/2]).
@ -161,6 +162,15 @@ ensure_stopped(Id, Timeout) ->
stop(Pid) -> gen_statem:stop(Pid).
start_bridge(Name) ->
gen_statem:call(name(Name), ensure_started).
stop_bridge(Name) ->
gen_statem:call(name(Name), ensure_stopped).
status(Pid) ->
gen_statem:call(Pid, status).
%% @doc This function is to be evaluated on message/batch receiver side.
-spec import_batch(batch(), fun(() -> ok)) -> ok.
import_batch(Batch, AckFun) ->
@ -268,16 +278,21 @@ standing_by(enter, _, #{start_type := auto}) ->
standing_by(enter, _, #{start_type := manual}) ->
keep_state_and_data;
standing_by({call, From}, ensure_started, State) ->
{next_state, connecting, State, [{reply, From, ok}]};
{next_state, connecting, State,
[{reply, From, <<"starting bridge ......">>}]};
standing_by({call, From}, ensure_stopped, _State) ->
{stop_and_reply, {shutdown, manual}, [{reply, From, ok}]};
{keep_state_and_data, [{reply, From, <<"bridge not started">>}]};
standing_by({call, From}, status, _State) ->
{keep_state_and_data, [{reply, From, <<"Stopped">>}]};
standing_by(state_timeout, do_connect, State) ->
{next_state, connecting, State};
standing_by({call, From}, _Call, _State) ->
{keep_state_and_data, [{reply, From, {error, standing_by}}]};
standing_by(info, Info, State) ->
?INFO("Portal ~p discarded info event at state standing_by:\n~p", [name(), Info]),
{keep_state_and_data, State}.
{keep_state_and_data, State};
standing_by(Type, Content, State) ->
common(connecting, Type, Content, State).
%% @doc Connecting state is a state with timeout.
%% After each timeout, it re-enters this state and start a retry until
@ -303,6 +318,10 @@ connecting(state_timeout, connected, State) ->
{next_state, connected, State};
connecting(state_timeout, reconnect, _State) ->
repeat_state_and_data;
connecting({call, From}, status, _State) ->
{keep_state_and_data, [{reply, From, <<"Stopped">>}]};
connecting({call, From}, _Call, _State) ->
{keep_state_and_data, [{reply, From, <<"starting bridge ......">>}]};
connecting(info, {batch_ack, Ref}, State) ->
case do_ack(State, Ref) of
{ok, NewState} ->
@ -334,14 +353,17 @@ connected(internal, maybe_send, State) ->
{error, NewState} ->
{next_state, connecting, disconnect(NewState)}
end;
connected({call, From}, ensure_started, _State) ->
{keep_state_and_data, [{reply, From, <<"bridge already started">>}]};
connected({call, From}, status, _State) ->
{keep_state_and_data, [{reply, From, <<"Running">>}]};
connected(info, {disconnected, ConnRef, Reason},
#{conn_ref := ConnRef, connection := Conn} = State) ->
?INFO("Portal ~p diconnected~nreason=~p",
[name(), Conn, Reason]),
{next_state, connecting,
State#{conn_ref := undefined,
connection := undefined
}};
connection := undefined}};
connected(info, {batch_ack, Ref}, State) ->
case do_ack(State, Ref) of
stale ->
@ -369,7 +391,8 @@ common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) ->
{Result, NewState} = ensure_absent(What, Topic, State),
{keep_state, NewState, [{reply, From, Result}]};
common(_StateName, {call, From}, ensure_stopped, _State) ->
{stop_and_reply, {shutdown, manual}, [{reply, From, ok}]};
{stop_and_reply, {shutdown, manual},
[{reply, From, <<"stop bridge successfully">>}]};
common(_StateName, info, {dispatch, _, Msg},
#{replayq := Q} = State) ->
NewQ = replayq:append(Q, collect([Msg])),
@ -536,4 +559,3 @@ name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])).
id(Pid) when is_pid(Pid) -> Pid;
id(Name) -> name(Name).

View File

@ -15,7 +15,7 @@
-module(emqx_portal_sup).
-behavior(supervisor).
-export([start_link/0, start_link/1]).
-export([start_link/0, start_link/1, portals/0]).
-export([init/1]).
@ -52,3 +52,6 @@ portal_spec({Name, Config}) ->
modules => [emqx_portal]
}.
-spec(portals() -> [{node(), map()}]).
portals() ->
[{Name, emqx_portal:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?WORKER_SUP)].

View File

@ -106,9 +106,9 @@ manual_start_stop_test() ->
?assertEqual(Pid, whereis(?PORTAL_REG_NAME)),
?assertEqual({error, standing_by},
emqx_portal:ensure_forward_present(Pid, "dummy")),
ok = emqx_portal:ensure_stopped(unknown),
ok = emqx_portal:ensure_stopped(Pid),
ok = emqx_portal:ensure_stopped(?PORTAL_REG_NAME).
emqx_portal:ensure_stopped(unknown),
emqx_portal:ensure_stopped(Pid),
emqx_portal:ensure_stopped(?PORTAL_REG_NAME).
%% Feed messages to portal
sender_loop(_Pid, [], _) -> exit(normal);
@ -154,4 +154,3 @@ make_config(Ref, TestPid, Result) ->
make_msg(I) ->
Payload = integer_to_binary(I),
emqx_message:make(<<"test/topic">>, Payload).