Merge remote-tracking branch 'origin/develop'
This commit is contained in:
commit
e6ccbc601c
|
@ -119,6 +119,7 @@
|
||||||
-define(DEFAULT_SEND_AHEAD, 8).
|
-define(DEFAULT_SEND_AHEAD, 8).
|
||||||
-define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
|
-define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
|
||||||
-define(DEFAULT_SEG_BYTES, (1 bsl 20)).
|
-define(DEFAULT_SEG_BYTES, (1 bsl 20)).
|
||||||
|
-define(NO_BRIDGE_HANDLER, undefined).
|
||||||
-define(maybe_send, {next_event, internal, maybe_send}).
|
-define(maybe_send, {next_event, internal, maybe_send}).
|
||||||
|
|
||||||
%% @doc Start a bridge worker. Supported configs:
|
%% @doc Start a bridge worker. Supported configs:
|
||||||
|
@ -277,7 +278,8 @@ init(Config) ->
|
||||||
subscriptions => Subs,
|
subscriptions => Subs,
|
||||||
replayq => Queue,
|
replayq => Queue,
|
||||||
inflight => [],
|
inflight => [],
|
||||||
connection => undefined
|
connection => undefined,
|
||||||
|
bridge_handler => Get(bridge_handler, ?NO_BRIDGE_HANDLER)
|
||||||
}}.
|
}}.
|
||||||
|
|
||||||
code_change(_Vsn, State, Data, _Extra) ->
|
code_change(_Vsn, State, Data, _Extra) ->
|
||||||
|
@ -321,7 +323,10 @@ connecting(enter, _, #{reconnect_delay_ms := Timeout,
|
||||||
{ok, ConnRef, Conn} ->
|
{ok, ConnRef, Conn} ->
|
||||||
?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
|
?LOG(info, "[Bridge] Bridge ~p connected", [name()]),
|
||||||
Action = {state_timeout, 0, connected},
|
Action = {state_timeout, 0, connected},
|
||||||
{keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action};
|
{keep_state,
|
||||||
|
eval_bridge_handler(State#{ conn_ref => ConnRef
|
||||||
|
, connection => Conn}, connected),
|
||||||
|
Action};
|
||||||
error ->
|
error ->
|
||||||
Action = {state_timeout, Timeout, reconnect},
|
Action = {state_timeout, Timeout, reconnect},
|
||||||
{keep_state_and_data, Action}
|
{keep_state_and_data, Action}
|
||||||
|
@ -416,6 +421,12 @@ common(StateName, Type, Content, State) ->
|
||||||
[name(), Type, StateName, Content]),
|
[name(), Type, StateName, Content]),
|
||||||
{keep_state, State}.
|
{keep_state, State}.
|
||||||
|
|
||||||
|
eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) ->
|
||||||
|
State;
|
||||||
|
eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) ->
|
||||||
|
_ = Handler(Msg),
|
||||||
|
State.
|
||||||
|
|
||||||
ensure_present(Key, Topic, State) ->
|
ensure_present(Key, Topic, State) ->
|
||||||
Topics = maps:get(Key, State),
|
Topics = maps:get(Key, State),
|
||||||
case is_topic_present(Topic, Topics) of
|
case is_topic_present(Topic, Topics) of
|
||||||
|
@ -553,9 +564,11 @@ disconnect(#{connection := Conn,
|
||||||
connect_module := Module
|
connect_module := Module
|
||||||
} = State) when Conn =/= undefined ->
|
} = State) when Conn =/= undefined ->
|
||||||
ok = Module:stop(ConnRef, Conn),
|
ok = Module:stop(ConnRef, Conn),
|
||||||
State#{conn_ref => undefined,
|
eval_bridge_handler(State#{conn_ref => undefined,
|
||||||
connection => undefined};
|
connection => undefined},
|
||||||
disconnect(State) -> State.
|
disconnected);
|
||||||
|
disconnect(State) ->
|
||||||
|
eval_bridge_handler(State, disconnected).
|
||||||
|
|
||||||
%% Called only when replayq needs to dump it to disk.
|
%% Called only when replayq needs to dump it to disk.
|
||||||
msg_marshaller(Bin) when is_binary(Bin) -> emqx_bridge_msg:from_binary(Bin);
|
msg_marshaller(Bin) when is_binary(Bin) -> emqx_bridge_msg:from_binary(Bin);
|
||||||
|
|
|
@ -20,11 +20,12 @@
|
||||||
%% APIs
|
%% APIs
|
||||||
-export([ start_link/0
|
-export([ start_link/0
|
||||||
, start_link/1
|
, start_link/1
|
||||||
, bridges/0
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ create_bridge/2
|
-export([ create_bridge/2
|
||||||
, drop_bridge/1
|
, drop_bridge/1
|
||||||
|
, bridges/0
|
||||||
|
, is_bridge_exist/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% supervisor callbacks
|
%% supervisor callbacks
|
||||||
|
@ -58,6 +59,13 @@ bridge_spec({Name, Config}) ->
|
||||||
bridges() ->
|
bridges() ->
|
||||||
[{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)].
|
[{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)].
|
||||||
|
|
||||||
|
-spec(is_bridge_exist(atom() | pid()) -> boolean()).
|
||||||
|
is_bridge_exist(Id) ->
|
||||||
|
case supervisor:get_childspec(?SUP, Id) of
|
||||||
|
{ok, _ChildSpec} -> true;
|
||||||
|
{error, _Error} -> false
|
||||||
|
end.
|
||||||
|
|
||||||
create_bridge(Id, Config) ->
|
create_bridge(Id, Config) ->
|
||||||
supervisor:start_child(?SUP, bridge_spec({Id, Config})).
|
supervisor:start_child(?SUP, bridge_spec({Id, Config})).
|
||||||
|
|
||||||
|
@ -69,4 +77,3 @@ drop_bridge(Id) ->
|
||||||
?LOG(error, "[Bridge] Delete bridge failed", [Error]),
|
?LOG(error, "[Bridge] Delete bridge failed", [Error]),
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue