Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-04-12 03:53:43 +08:00
commit 8662f55c4c
2 changed files with 34 additions and 21 deletions

View File

@ -120,6 +120,7 @@
-define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
-define(DEFAULT_SEG_BYTES, (1 bsl 20)).
-define(NO_BRIDGE_HANDLER, undefined).
-define(NO_FROM, undefined).
-define(maybe_send, {next_event, internal, maybe_send}).
%% @doc Start a bridge worker. Supported configs:
@ -297,8 +298,7 @@ standing_by(enter, _, #{start_type := auto}) ->
standing_by(enter, _, #{start_type := manual}) ->
keep_state_and_data;
standing_by({call, From}, ensure_started, State) ->
{next_state, connecting, State,
[{reply, From, ok}]};
do_connect({call, From}, standing_by, State);
standing_by(state_timeout, do_connect, State) ->
{next_state, connecting, State};
standing_by(info, Info, State) ->
@ -313,23 +313,8 @@ standing_by(Type, Content, State) ->
connecting(enter, connected, #{reconnect_delay_ms := Timeout}) ->
Action = {state_timeout, Timeout, reconnect},
{keep_state_and_data, Action};
connecting(enter, _, #{reconnect_delay_ms := Timeout,
connect_fun := ConnectFun,
subscriptions := Subs,
forwards := Forwards
} = State) ->
ok = subscribe_local_topics(Forwards),
case ConnectFun(Subs) of
{ok, ConnRef, Conn} ->
?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
Action = {state_timeout, 0, connected},
State0 = State#{conn_ref => ConnRef, connection => Conn},
State1 = eval_bridge_handler(State0, connected),
{keep_state, State1, Action};
error ->
Action = {state_timeout, Timeout, reconnect},
{keep_state_and_data, Action}
end;
connecting(enter, _, State) ->
do_connect(enter, connecting, State);
connecting(state_timeout, connected, State) ->
{next_state, connected, State};
connecting(state_timeout, reconnect, _State) ->
@ -455,6 +440,35 @@ is_topic_present({Topic, _QoS}, Topics) ->
is_topic_present(Topic, Topics) ->
lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics).
do_connect(Type, StateName, #{ forwards := Forwards
, subscriptions := Subs
, connect_fun := ConnectFun
, reconnect_delay_ms := Timeout
} = State) ->
ok = subscribe_local_topics(Forwards),
From = case StateName of
standing_by -> {call, Pid} = Type, Pid;
connecting -> ?NO_FROM
end,
DoEvent = fun (standing_by, StandingbyAction, _ConnectingAction) ->
StandingbyAction;
(connecting, _StandingbyAction, ConnectingAction) ->
ConnectingAction
end,
case ConnectFun(Subs) of
{ok, ConnRef, Conn} ->
?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
State0 = State#{conn_ref => ConnRef, connection => Conn},
State1 = eval_bridge_handler(State0, connected),
StandingbyAction = {next_state, connected, State1, [{reply, From, ok}]},
ConnectingAction = {keep_state, State1, {state_timeout, 0, connected}},
DoEvent(StateName, StandingbyAction, ConnectingAction);
{error, Reason} ->
StandingbyAction = {keep_state_and_data, [{reply, From, {error, Reason}}]},
ConnectingAction = {keep_state_and_data, {state_timeout, Timeout, reconnect}},
DoEvent(StateName, StandingbyAction, ConnectingAction)
end.
do_ensure_present(forwards, Topic, _) ->
ok = subscribe_local_topic(Topic);
do_ensure_present(subscriptions, _Topic, #{connect_module := _ConnectModule,

View File

@ -56,7 +56,7 @@ start(Module, Config) ->
Config1 = obfuscate(Config),
?LOG(error, "[Bridge connect] Failed to connect with module=~p\n"
"config=~p\nreason:~p", [Module, Config1, Reason]),
error
{error, Reason}
end.
obfuscate(Map) ->
@ -69,4 +69,3 @@ obfuscate(Map) ->
is_sensitive(password) -> true;
is_sensitive(_) -> false.