diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 55bf162eb..df2c3ff07 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -1,4 +1,3 @@ - %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); @@ -122,76 +121,9 @@ init([Options]) -> options = Options, reconnect_interval = ReconnectInterval}}. -handle_call(start_bridge, _From, State = #state{options = Options, - replayq = undefined, - client_pid = undefined, - queue_option = #{batch_size := BatchSize, - replayq_dir := ReplayqDir, - replayq_seg_bytes := ReplayqSegBytes}}) -> - case emqx_client:start_link([{owner, self()}|options(Options)]) of - {ok, ClientPid} -> - case emqx_client:connect(ClientPid) of - {ok, _} -> - emqx_logger:info("[Bridge] connected to remote successfully"), - Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])), - Forwards = subscribe_local_topics(Options), - ReplayQ = replayq:open(#{dir => ReplayqDir, - seg_bytes => ReplayqSegBytes, - sizer => fun(Term) -> - size(term_to_binary(Term)) - end, - marshaller => fun({PktId, Msg}) -> - term_to_binary({PktId, Msg}); - (Bin) -> - binary_to_term(Bin) - end - }), - {NewReplayQ, AckRef, ReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}), - {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []), - {reply, #{msg => <<"start bridge successfully">>}, State#state{client_pid = ClientPid, - subscriptions = Subs, - readq = NewReadQ, - replayq = NewReplayQ, - ackref = AckRef, - forwards = Forwards}}; - {error, Reason} -> - emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]), - {reply, #{msg => <<"connect to remote failed">>}, State#state{client_pid = ClientPid}} - end; - {error, Reason} -> - emqx_logger:error("[Bridge] start failed! error: ~p", [Reason]), - {reply, #{msg => <<"start bridge failed">>}, State} - end; - - -handle_call(start_bridge, _From, State = #state{options = Options, - client_pid = undefined, - replayq = ReplayQ, - queue_option = #{batch_size := BatchSize} - }) -> - case emqx_client:start_link([{owner, self()} | options(Options)]) of - {ok, ClientPid} -> - case emqx_client:connect(ClientPid) of - {ok, _} -> - emqx_logger:info("[Bridge] connected to remote ysucessfully"), - Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])), - Forwards = subscribe_local_topics(Options), - {NewReplayQ, AckRef, ReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}), - {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []), - {reply, #{msg => <<"start bridge successfully">>}, State#state{client_pid = ClientPid, - subscriptions = Subs, - readq = NewReadQ, - replayq = NewReplayQ, - ackref = AckRef, - forwards = Forwards}}; - {error, Reason} -> - emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]), - {reply, #{msg => <<"connect to remote failed">>}, State#state{client_pid = ClientPid}} - end; - {error, Reason} -> - emqx_logger:error("[Bridge] restart failed! error: ~p", [Reason]), - {reply, #{msg => <<"start bridge failed">>}, State} - end; +handle_call(start_bridge, _From, State = #state{client_pid = undefined}) -> + {Msg, NewState} = bridge(start, State), + {reply, #{msg => Msg}, NewState}; handle_call(start_bridge, _From, State) -> {reply, #{msg => <<"bridge already started">>}, State}; @@ -258,34 +190,16 @@ handle_cast(Msg, State) -> emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info(restart, State = #state{options = Options, - client_pid = undefined, - replayq = ReplayQ, - queue_option = #{batch_size := BatchSize} - }) -> - case emqx_client:start_link([{owner, self()} | options(Options)]) of - {ok, ClientPid} -> - case emqx_client:connect(ClientPid) of - {ok, _} -> - emqx_logger:info("[Bridge] connected to remote successfully"), - Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])), - Forwards = subscribe_local_topics(Options), - {NewReplayQ, AckRef, ReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}), - {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []), - {noreply, State#state{client_pid = ClientPid, - subscriptions = Subs, - readq = NewReadQ, - replayq = NewReplayQ, - ackref = AckRef, - forwards = Forwards}}; - {error, Reason} -> - emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]), - {noreply, State#state{client_pid = ClientPid}} - end; - {error, Reason} -> - emqx_logger:error("[Bridge] restart failed! error: ~p", [Reason]), - {noreply, State} - end; +%%---------------------------------------------------------------- +%% Start or restart bridge +%%---------------------------------------------------------------- +handle_info(start, State) -> + {_Msg, NewState} = bridge(start, State), + {noreply, NewState}; + +handle_info(restart, State) -> + {_Msg, NewState} = bridge(restart, State), + {noreply, NewState}; %%---------------------------------------------------------------- %% pop message from replayq and publish again @@ -296,7 +210,7 @@ handle_info(pop, State = #state{writeq = WriteQ, replayq = ReplayQ, {NewReadQ1, NewWriteQ} = case NewReadQ of [] -> {WriteQ, []}; _ -> {NewReadQ, WriteQ} - end, + end, self() ! replay, {noreply, State#state{readq = NewReadQ1, writeq = NewWriteQ, replayq = NewReplayQ, ackref = AckRef}}; @@ -464,3 +378,49 @@ delete(PktId, State = #state{readq = [], writeq = WriteQ}) -> delete(PktId, State = #state{readq = ReadQ}) -> State#state{readq = lists:keydelete(PktId, 1, ReadQ)}. + +bridge(Action, State = #state{options = Options, + replayq = ReplayQ, + queue_option = QueueOption}) -> + case emqx_client:start_link([{owner, self()}|options(Options)]) of + {ok, ClientPid} -> + case emqx_client:connect(ClientPid) of + {ok, _} -> + emqx_logger:info("[Bridge] connected to remote successfully"), + Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])), + Forwards = subscribe_local_topics(Options), + {NewReplayQ, AckRef, ReadQ} = open_replayq(ReplayQ, QueueOption), + {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []), + {<<"start bridge successfully">>, + State#state{client_pid = ClientPid, + subscriptions = Subs, + readq = NewReadQ, + replayq = NewReplayQ, + ackref = AckRef, + forwards = Forwards}}; + {error, Reason} -> + emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]), + {<<"connect to remote failed">>, + State#state{client_pid = ClientPid}} + end; + {error, Reason} -> + emqx_logger:error("[Bridge] ~p failed! error: ~p", [Action, Reason]), + {<<"start bridge failed">>, State} + end. + +open_replayq(undefined, #{batch_size := BatchSize, + replayq_dir := ReplayqDir, + replayq_seg_bytes := ReplayqSegBytes}) -> + ReplayQ = replayq:open(#{dir => ReplayqDir, + seg_bytes => ReplayqSegBytes, + sizer => fun(Term) -> + size(term_to_binary(Term)) + end, + marshaller => fun({PktId, Msg}) -> + term_to_binary({PktId, Msg}); + (Bin) -> + binary_to_term(Bin) + end}), + replayq:pop(ReplayQ, #{count_limit => BatchSize}); +open_replayq(ReplayQ, #{batch_size := BatchSize}) -> + replayq:pop(ReplayQ, #{count_limit => BatchSize}).