From 4d2bc48822bba8d45f4aa256141db63312ba431d Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 11 Apr 2019 16:40:02 +0800 Subject: [PATCH] Redesign ensure_start and ensure_stop api of bridge --- src/emqx_bridge.erl | 52 +++++++++++++++++++++++-------------- src/emqx_bridge_connect.erl | 3 +-- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 03e7a478e..3261a02bc 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -120,6 +120,7 @@ -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). -define(DEFAULT_SEG_BYTES, (1 bsl 20)). -define(NO_BRIDGE_HANDLER, undefined). +-define(NO_FROM, undefined). -define(maybe_send, {next_event, internal, maybe_send}). %% @doc Start a bridge worker. Supported configs: @@ -297,8 +298,7 @@ standing_by(enter, _, #{start_type := auto}) -> standing_by(enter, _, #{start_type := manual}) -> keep_state_and_data; standing_by({call, From}, ensure_started, State) -> - {next_state, connecting, State, - [{reply, From, ok}]}; + do_connect({call, From}, standing_by, State); standing_by(state_timeout, do_connect, State) -> {next_state, connecting, State}; standing_by(info, Info, State) -> @@ -313,23 +313,8 @@ standing_by(Type, Content, State) -> connecting(enter, connected, #{reconnect_delay_ms := Timeout}) -> Action = {state_timeout, Timeout, reconnect}, {keep_state_and_data, Action}; -connecting(enter, _, #{reconnect_delay_ms := Timeout, - connect_fun := ConnectFun, - subscriptions := Subs, - forwards := Forwards - } = State) -> - ok = subscribe_local_topics(Forwards), - case ConnectFun(Subs) of - {ok, ConnRef, Conn} -> - ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), - Action = {state_timeout, 0, connected}, - State0 = State#{conn_ref => ConnRef, connection => Conn}, - State1 = eval_bridge_handler(State0, connected), - {keep_state, State1, Action}; - error -> - Action = {state_timeout, Timeout, reconnect}, - {keep_state_and_data, Action} - end; +connecting(enter, _, State) -> + do_connect(enter, connecting, State); connecting(state_timeout, connected, State) -> {next_state, connected, State}; connecting(state_timeout, reconnect, _State) -> @@ -455,6 +440,35 @@ is_topic_present({Topic, _QoS}, Topics) -> is_topic_present(Topic, Topics) -> lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics). +do_connect(Type, StateName, #{ forwards := Forwards + , subscriptions := Subs + , connect_fun := ConnectFun + , reconnect_delay_ms := Timeout + } = State) -> + ok = subscribe_local_topics(Forwards), + From = case StateName of + standing_by -> {call, Pid} = Type, Pid; + connecting -> ?NO_FROM + end, + DoEvent = fun (standing_by, StandingbyAction, _ConnectingAction) -> + StandingbyAction; + (connecting, _StandingbyAction, ConnectingAction) -> + ConnectingAction + end, + case ConnectFun(Subs) of + {ok, ConnRef, Conn} -> + ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), + State0 = State#{conn_ref => ConnRef, connection => Conn}, + State1 = eval_bridge_handler(State0, connected), + StandingbyAction = {next_state, connected, State1, [{reply, From, ok}]}, + ConnectingAction = {keep_state, State1, {state_timeout, 0, connected}}, + DoEvent(StateName, StandingbyAction, ConnectingAction); + {error, Reason} -> + StandingbyAction = {keep_state_and_data, [{reply, From, {error, Reason}}]}, + ConnectingAction = {keep_state_and_data, {state_timeout, Timeout, reconnect}}, + DoEvent(StateName, StandingbyAction, ConnectingAction) + end. + do_ensure_present(forwards, Topic, _) -> ok = subscribe_local_topic(Topic); do_ensure_present(subscriptions, _Topic, #{connect_module := _ConnectModule, diff --git a/src/emqx_bridge_connect.erl b/src/emqx_bridge_connect.erl index 37231ca88..8685451ae 100644 --- a/src/emqx_bridge_connect.erl +++ b/src/emqx_bridge_connect.erl @@ -56,7 +56,7 @@ start(Module, Config) -> Config1 = obfuscate(Config), ?LOG(error, "[Bridge connect] Failed to connect with module=~p\n" "config=~p\nreason:~p", [Module, Config1, Reason]), - error + {error, Reason} end. obfuscate(Map) -> @@ -69,4 +69,3 @@ obfuscate(Map) -> is_sensitive(password) -> true; is_sensitive(_) -> false. -