diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index a15851f0f..6af56d11b 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -119,6 +119,7 @@ -define(DEFAULT_SEND_AHEAD, 8). -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). -define(DEFAULT_SEG_BYTES, (1 bsl 20)). +-define(NO_BRIDGE_HANDLER, undefined). -define(maybe_send, {next_event, internal, maybe_send}). %% @doc Start a bridge worker. Supported configs: @@ -277,7 +278,8 @@ init(Config) -> subscriptions => Subs, replayq => Queue, inflight => [], - connection => undefined + connection => undefined, + bridge_handler => Get(bridge_handler, ?NO_BRIDGE_HANDLER) }}. code_change(_Vsn, State, Data, _Extra) -> @@ -321,7 +323,10 @@ connecting(enter, _, #{reconnect_delay_ms := Timeout, {ok, ConnRef, Conn} -> ?LOG(info, "[Bridge] Bridge ~p connected", [name()]), Action = {state_timeout, 0, connected}, - {keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action}; + {keep_state, + eval_bridge_handler(State#{ conn_ref => ConnRef + , connection => Conn}, connected), + Action}; error -> Action = {state_timeout, Timeout, reconnect}, {keep_state_and_data, Action} @@ -416,6 +421,12 @@ common(StateName, Type, Content, State) -> [name(), Type, StateName, Content]), {keep_state, State}. +eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) -> + State; +eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) -> + _ = Handler(Msg), + State. + ensure_present(Key, Topic, State) -> Topics = maps:get(Key, State), case is_topic_present(Topic, Topics) of @@ -553,9 +564,11 @@ disconnect(#{connection := Conn, connect_module := Module } = State) when Conn =/= undefined -> ok = Module:stop(ConnRef, Conn), - State#{conn_ref => undefined, - connection => undefined}; -disconnect(State) -> State. + eval_bridge_handler(State#{conn_ref => undefined, + connection => undefined}, + disconnected); +disconnect(State) -> + eval_bridge_handler(State, disconnected). %% Called only when replayq needs to dump it to disk. msg_marshaller(Bin) when is_binary(Bin) -> emqx_bridge_msg:from_binary(Bin); diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index fc0be3995..b00bb9012 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -20,11 +20,12 @@ %% APIs -export([ start_link/0 , start_link/1 - , bridges/0 ]). -export([ create_bridge/2 , drop_bridge/1 + , bridges/0 + , is_bridge_exist/1 ]). %% supervisor callbacks @@ -58,6 +59,13 @@ bridge_spec({Name, Config}) -> bridges() -> [{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)]. +-spec(is_bridge_exist(atom() | pid()) -> boolean()). +is_bridge_exist(Id) -> + case supervisor:get_childspec(?SUP, Id) of + {ok, _ChildSpec} -> true; + {error, _Error} -> false + end. + create_bridge(Id, Config) -> supervisor:start_child(?SUP, bridge_spec({Id, Config})). @@ -69,4 +77,3 @@ drop_bridge(Id) -> ?LOG(error, "[Bridge] Delete bridge failed", [Error]), Error end. -