diff --git a/.gitignore b/.gitignore index 7a4e891d1..ab0cbe156 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,4 @@ cuttlefish rebar.lock xrefr erlang.mk +*.coverdata diff --git a/src/portal/emqx_portal.erl b/src/portal/emqx_portal.erl index 76f7c1842..66d83eacc 100644 --- a/src/portal/emqx_portal.erl +++ b/src/portal/emqx_portal.erl @@ -74,6 +74,7 @@ -export([standing_by/3, connecting/3, connected/3]). %% management APIs +-export([start_bridge/1, stop_bridge/1, status/1]). -export([ensure_started/2, ensure_stopped/1, ensure_stopped/2]). -export([get_forwards/1, ensure_forward_present/2, ensure_forward_absent/2]). -export([get_subscriptions/1, ensure_subscription_present/3, ensure_subscription_absent/2]). @@ -161,6 +162,15 @@ ensure_stopped(Id, Timeout) -> stop(Pid) -> gen_statem:stop(Pid). +start_bridge(Name) -> + gen_statem:call(name(Name), ensure_started). + +stop_bridge(Name) -> + gen_statem:call(name(Name), ensure_stopped). + +status(Pid) -> + gen_statem:call(Pid, status). + %% @doc This function is to be evaluated on message/batch receiver side. -spec import_batch(batch(), fun(() -> ok)) -> ok. import_batch(Batch, AckFun) -> @@ -268,16 +278,21 @@ standing_by(enter, _, #{start_type := auto}) -> standing_by(enter, _, #{start_type := manual}) -> keep_state_and_data; standing_by({call, From}, ensure_started, State) -> - {next_state, connecting, State, [{reply, From, ok}]}; + {next_state, connecting, State, + [{reply, From, <<"starting bridge ......">>}]}; standing_by({call, From}, ensure_stopped, _State) -> - {stop_and_reply, {shutdown, manual}, [{reply, From, ok}]}; + {keep_state_and_data, [{reply, From, <<"bridge not started">>}]}; +standing_by({call, From}, status, _State) -> + {keep_state_and_data, [{reply, From, <<"Stopped">>}]}; standing_by(state_timeout, do_connect, State) -> {next_state, connecting, State}; standing_by({call, From}, _Call, _State) -> {keep_state_and_data, [{reply, From, {error, standing_by}}]}; standing_by(info, Info, State) -> ?INFO("Portal ~p discarded info event at state standing_by:\n~p", [name(), Info]), - {keep_state_and_data, State}. + {keep_state_and_data, State}; +standing_by(Type, Content, State) -> + common(connecting, Type, Content, State). %% @doc Connecting state is a state with timeout. %% After each timeout, it re-enters this state and start a retry until @@ -303,6 +318,10 @@ connecting(state_timeout, connected, State) -> {next_state, connected, State}; connecting(state_timeout, reconnect, _State) -> repeat_state_and_data; +connecting({call, From}, status, _State) -> + {keep_state_and_data, [{reply, From, <<"Stopped">>}]}; +connecting({call, From}, _Call, _State) -> + {keep_state_and_data, [{reply, From, <<"starting bridge ......">>}]}; connecting(info, {batch_ack, Ref}, State) -> case do_ack(State, Ref) of {ok, NewState} -> @@ -334,14 +353,17 @@ connected(internal, maybe_send, State) -> {error, NewState} -> {next_state, connecting, disconnect(NewState)} end; +connected({call, From}, ensure_started, _State) -> + {keep_state_and_data, [{reply, From, <<"bridge already started">>}]}; +connected({call, From}, status, _State) -> + {keep_state_and_data, [{reply, From, <<"Running">>}]}; connected(info, {disconnected, ConnRef, Reason}, #{conn_ref := ConnRef, connection := Conn} = State) -> ?INFO("Portal ~p diconnected~nreason=~p", [name(), Conn, Reason]), {next_state, connecting, State#{conn_ref := undefined, - connection := undefined - }}; + connection := undefined}}; connected(info, {batch_ack, Ref}, State) -> case do_ack(State, Ref) of stale -> @@ -369,7 +391,8 @@ common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) -> {Result, NewState} = ensure_absent(What, Topic, State), {keep_state, NewState, [{reply, From, Result}]}; common(_StateName, {call, From}, ensure_stopped, _State) -> - {stop_and_reply, {shutdown, manual}, [{reply, From, ok}]}; + {stop_and_reply, {shutdown, manual}, + [{reply, From, <<"stop bridge successfully">>}]}; common(_StateName, info, {dispatch, _, Msg}, #{replayq := Q} = State) -> NewQ = replayq:append(Q, collect([Msg])), @@ -536,4 +559,3 @@ name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])). id(Pid) when is_pid(Pid) -> Pid; id(Name) -> name(Name). - diff --git a/src/portal/emqx_portal_sup.erl b/src/portal/emqx_portal_sup.erl index 79afd6352..3f78f7680 100644 --- a/src/portal/emqx_portal_sup.erl +++ b/src/portal/emqx_portal_sup.erl @@ -15,7 +15,7 @@ -module(emqx_portal_sup). -behavior(supervisor). --export([start_link/0, start_link/1]). +-export([start_link/0, start_link/1, portals/0]). -export([init/1]). @@ -52,3 +52,6 @@ portal_spec({Name, Config}) -> modules => [emqx_portal] }. +-spec(portals() -> [{node(), map()}]). +portals() -> + [{Name, emqx_portal:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?WORKER_SUP)]. diff --git a/test/emqx_portal_tests.erl b/test/emqx_portal_tests.erl index 03f545b38..e9a3583fe 100644 --- a/test/emqx_portal_tests.erl +++ b/test/emqx_portal_tests.erl @@ -106,9 +106,9 @@ manual_start_stop_test() -> ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)), ?assertEqual({error, standing_by}, emqx_portal:ensure_forward_present(Pid, "dummy")), - ok = emqx_portal:ensure_stopped(unknown), - ok = emqx_portal:ensure_stopped(Pid), - ok = emqx_portal:ensure_stopped(?PORTAL_REG_NAME). + emqx_portal:ensure_stopped(unknown), + emqx_portal:ensure_stopped(Pid), + emqx_portal:ensure_stopped(?PORTAL_REG_NAME). %% Feed messages to portal sender_loop(_Pid, [], _) -> exit(normal); @@ -154,4 +154,3 @@ make_config(Ref, TestPid, Result) -> make_msg(I) -> Payload = integer_to_binary(I), emqx_message:make(<<"test/topic">>, Payload). -