Refactor bridge (#2117)

* Refactor bridge
This commit is contained in:
Gilbert 2019-01-03 09:10:43 +08:00 committed by turtleDeng
parent 72791b569e
commit 9a2573d54b
1 changed files with 60 additions and 100 deletions

View File

@ -1,4 +1,3 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
@ -122,76 +121,9 @@ init([Options]) ->
options = Options,
reconnect_interval = ReconnectInterval}}.
handle_call(start_bridge, _From, State = #state{options = Options,
replayq = undefined,
client_pid = undefined,
queue_option = #{batch_size := BatchSize,
replayq_dir := ReplayqDir,
replayq_seg_bytes := ReplayqSegBytes}}) ->
case emqx_client:start_link([{owner, self()}|options(Options)]) of
{ok, ClientPid} ->
case emqx_client:connect(ClientPid) of
{ok, _} ->
emqx_logger:info("[Bridge] connected to remote successfully"),
Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])),
Forwards = subscribe_local_topics(Options),
ReplayQ = replayq:open(#{dir => ReplayqDir,
seg_bytes => ReplayqSegBytes,
sizer => fun(Term) ->
size(term_to_binary(Term))
end,
marshaller => fun({PktId, Msg}) ->
term_to_binary({PktId, Msg});
(Bin) ->
binary_to_term(Bin)
end
}),
{NewReplayQ, AckRef, ReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}),
{ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []),
{reply, #{msg => <<"start bridge successfully">>}, State#state{client_pid = ClientPid,
subscriptions = Subs,
readq = NewReadQ,
replayq = NewReplayQ,
ackref = AckRef,
forwards = Forwards}};
{error, Reason} ->
emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]),
{reply, #{msg => <<"connect to remote failed">>}, State#state{client_pid = ClientPid}}
end;
{error, Reason} ->
emqx_logger:error("[Bridge] start failed! error: ~p", [Reason]),
{reply, #{msg => <<"start bridge failed">>}, State}
end;
handle_call(start_bridge, _From, State = #state{options = Options,
client_pid = undefined,
replayq = ReplayQ,
queue_option = #{batch_size := BatchSize}
}) ->
case emqx_client:start_link([{owner, self()} | options(Options)]) of
{ok, ClientPid} ->
case emqx_client:connect(ClientPid) of
{ok, _} ->
emqx_logger:info("[Bridge] connected to remote ysucessfully"),
Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])),
Forwards = subscribe_local_topics(Options),
{NewReplayQ, AckRef, ReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}),
{ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []),
{reply, #{msg => <<"start bridge successfully">>}, State#state{client_pid = ClientPid,
subscriptions = Subs,
readq = NewReadQ,
replayq = NewReplayQ,
ackref = AckRef,
forwards = Forwards}};
{error, Reason} ->
emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]),
{reply, #{msg => <<"connect to remote failed">>}, State#state{client_pid = ClientPid}}
end;
{error, Reason} ->
emqx_logger:error("[Bridge] restart failed! error: ~p", [Reason]),
{reply, #{msg => <<"start bridge failed">>}, State}
end;
handle_call(start_bridge, _From, State = #state{client_pid = undefined}) ->
{Msg, NewState} = bridge(start, State),
{reply, #{msg => Msg}, NewState};
handle_call(start_bridge, _From, State) ->
{reply, #{msg => <<"bridge already started">>}, State};
@ -258,34 +190,16 @@ handle_cast(Msg, State) ->
emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(restart, State = #state{options = Options,
client_pid = undefined,
replayq = ReplayQ,
queue_option = #{batch_size := BatchSize}
}) ->
case emqx_client:start_link([{owner, self()} | options(Options)]) of
{ok, ClientPid} ->
case emqx_client:connect(ClientPid) of
{ok, _} ->
emqx_logger:info("[Bridge] connected to remote successfully"),
Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])),
Forwards = subscribe_local_topics(Options),
{NewReplayQ, AckRef, ReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}),
{ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []),
{noreply, State#state{client_pid = ClientPid,
subscriptions = Subs,
readq = NewReadQ,
replayq = NewReplayQ,
ackref = AckRef,
forwards = Forwards}};
{error, Reason} ->
emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]),
{noreply, State#state{client_pid = ClientPid}}
end;
{error, Reason} ->
emqx_logger:error("[Bridge] restart failed! error: ~p", [Reason]),
{noreply, State}
end;
%%----------------------------------------------------------------
%% Start or restart bridge
%%----------------------------------------------------------------
handle_info(start, State) ->
{_Msg, NewState} = bridge(start, State),
{noreply, NewState};
handle_info(restart, State) ->
{_Msg, NewState} = bridge(restart, State),
{noreply, NewState};
%%----------------------------------------------------------------
%% pop message from replayq and publish again
@ -296,7 +210,7 @@ handle_info(pop, State = #state{writeq = WriteQ, replayq = ReplayQ,
{NewReadQ1, NewWriteQ} = case NewReadQ of
[] -> {WriteQ, []};
_ -> {NewReadQ, WriteQ}
end,
end,
self() ! replay,
{noreply, State#state{readq = NewReadQ1, writeq = NewWriteQ, replayq = NewReplayQ, ackref = AckRef}};
@ -464,3 +378,49 @@ delete(PktId, State = #state{readq = [], writeq = WriteQ}) ->
delete(PktId, State = #state{readq = ReadQ}) ->
State#state{readq = lists:keydelete(PktId, 1, ReadQ)}.
bridge(Action, State = #state{options = Options,
replayq = ReplayQ,
queue_option = QueueOption}) ->
case emqx_client:start_link([{owner, self()}|options(Options)]) of
{ok, ClientPid} ->
case emqx_client:connect(ClientPid) of
{ok, _} ->
emqx_logger:info("[Bridge] connected to remote successfully"),
Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])),
Forwards = subscribe_local_topics(Options),
{NewReplayQ, AckRef, ReadQ} = open_replayq(ReplayQ, QueueOption),
{ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []),
{<<"start bridge successfully">>,
State#state{client_pid = ClientPid,
subscriptions = Subs,
readq = NewReadQ,
replayq = NewReplayQ,
ackref = AckRef,
forwards = Forwards}};
{error, Reason} ->
emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]),
{<<"connect to remote failed">>,
State#state{client_pid = ClientPid}}
end;
{error, Reason} ->
emqx_logger:error("[Bridge] ~p failed! error: ~p", [Action, Reason]),
{<<"start bridge failed">>, State}
end.
open_replayq(undefined, #{batch_size := BatchSize,
replayq_dir := ReplayqDir,
replayq_seg_bytes := ReplayqSegBytes}) ->
ReplayQ = replayq:open(#{dir => ReplayqDir,
seg_bytes => ReplayqSegBytes,
sizer => fun(Term) ->
size(term_to_binary(Term))
end,
marshaller => fun({PktId, Msg}) ->
term_to_binary({PktId, Msg});
(Bin) ->
binary_to_term(Bin)
end}),
replayq:pop(ReplayQ, #{count_limit => BatchSize});
open_replayq(ReplayQ, #{batch_size := BatchSize}) ->
replayq:pop(ReplayQ, #{count_limit => BatchSize}).