chore: refactor mqtt connector

This commit is contained in:
JianBo He 2022-08-23 18:17:32 +08:00
parent 10f16b0adc
commit ca7ad9cc15
3 changed files with 82 additions and 202 deletions

View File

@ -39,6 +39,7 @@
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_query/3, on_query/3,
on_query_async/4,
on_get_status/2 on_get_status/2
]). ]).
@ -190,6 +191,16 @@ on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) ->
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
ok. ok.
%% @doc Asynchronous counterpart of on_query/3 for `{send_message, Msg}'.
%% Traces the request, then hands the message and the `{Fun, Args}' reply
%% callback, unchanged, to emqx_connector_mqtt_worker:send_to_remote_async/3
%% for the worker identified by the `name' field of the resource state.
%% Always returns `ok'.
on_query_async(_InstId, {send_message, Msg}, {_ReplyFun, _Args} = Callback, #{name := InstanceId}) ->
    ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
    emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback),
    ok.
on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) -> on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) ->
AutoReconn = maps:get(auto_reconnect, Conf, true), AutoReconn = maps:get(auto_reconnect, Conf, true),
case emqx_connector_mqtt_worker:status(InstanceId) of case emqx_connector_mqtt_worker:status(InstanceId) of

View File

@ -21,6 +21,7 @@
-export([ -export([
start/1, start/1,
send/2, send/2,
send_async/3,
stop/1, stop/1,
ping/1 ping/1
]). ]).
@ -32,7 +33,6 @@
%% callbacks for emqtt %% callbacks for emqtt
-export([ -export([
handle_puback/2,
handle_publish/3, handle_publish/3,
handle_disconnected/2 handle_disconnected/2
]). ]).
@ -134,44 +134,11 @@ safe_stop(Pid, StopF, Timeout) ->
exit(Pid, kill) exit(Pid, kill)
end. end.
send(Conn, Msgs) -> send(#{client_pid := ClientPid}, Msg) ->
send(Conn, Msgs, []). emqtt:publish(ClientPid, Msg).
send(_Conn, [], []) -> send_async(#{client_pid := ClientPid}, Msg, Callback) ->
%% all messages in the batch are QoS-0 emqtt:publish_async(ClientPid, Msg, Callback).
Ref = make_ref(),
%% QoS-0 messages do not have packet ID
%% the batch ack is simulated with a loop-back message
self() ! {batch_ack, Ref},
{ok, Ref};
send(_Conn, [], PktIds) ->
%% PktIds is not an empty list if there is any non-QoS-0 message in the batch,
%% And the worker should wait for all acks
{ok, PktIds};
send(#{client_pid := ClientPid} = Conn, [Msg | Rest], PktIds) ->
case emqtt:publish(ClientPid, Msg) of
ok ->
send(Conn, Rest, PktIds);
{ok, PktId} ->
send(Conn, Rest, [PktId | PktIds]);
{error, Reason} ->
%% NOTE: There is no partial success of a batch and recover from the middle
%% only to retry all messages in one batch
{error, Reason}
end.
%% PUBACK handler registered for the emqtt client (see make_hdlr/3).
%% When the reason code is ?RC_SUCCESS or ?RC_NO_MATCHING_SUBSCRIBERS the
%% packet id is forwarded to Parent as `{batch_ack, PktId}' and `ok' is
%% returned; any other reason code is only logged as a warning.
handle_puback(#{packet_id := PktId, reason_code := RC}, Parent) ->
    Acked = (RC =:= ?RC_SUCCESS) orelse (RC =:= ?RC_NO_MATCHING_SUBSCRIBERS),
    case Acked of
        true ->
            Parent ! {batch_ack, PktId},
            ok;
        false ->
            ?SLOG(warning, #{
                msg => "publish_to_remote_node_falied",
                packet_id => PktId,
                reason_code => RC
            })
    end.
handle_publish(Msg, undefined, _Opts) -> handle_publish(Msg, undefined, _Opts) ->
?SLOG(error, #{ ?SLOG(error, #{
@ -200,7 +167,6 @@ handle_disconnected(Reason, Parent) ->
make_hdlr(Parent, Vars, Opts) -> make_hdlr(Parent, Vars, Opts) ->
#{ #{
puback => {fun ?MODULE:handle_puback/2, [Parent]},
publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]}, publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]},
disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]} disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]}
}. }.

View File

@ -91,16 +91,14 @@
ensure_stopped/1, ensure_stopped/1,
status/1, status/1,
ping/1, ping/1,
send_to_remote/2 send_to_remote/2,
send_to_remote_async/3
]). ]).
-export([get_forwards/1]). -export([get_forwards/1]).
-export([get_subscriptions/1]). -export([get_subscriptions/1]).
%% Internal
-export([msg_marshaller/1]).
-export_type([ -export_type([
config/0, config/0,
ack_ref/0 ack_ref/0
@ -133,12 +131,6 @@
%% mountpoint: The topic mount point for messages sent to remote node/cluster %% mountpoint: The topic mount point for messages sent to remote node/cluster
%% `undefined', `<<>>' or `""' to disable %% `undefined', `<<>>' or `""' to disable
%% forwards: Local topics to subscribe. %% forwards: Local topics to subscribe.
%% replayq.batch_bytes_limit: Max number of bytes to collect in a batch for each
%% send call towards emqx_bridge_connect
%% replayq.batch_count_limit: Max number of messages to collect in a batch for
%% each send call towards emqx_bridge_connect
%% replayq.dir: Directory where replayq should persist messages
%% replayq.seg_bytes: Size in bytes for each replayq segment file
%% %%
%% Find more connection specific configs in the callback modules %% Find more connection specific configs in the callback modules
%% of emqx_bridge_connect behaviour. %% of emqx_bridge_connect behaviour.
@ -173,9 +165,14 @@ ping(Name) ->
gen_statem:call(name(Name), ping). gen_statem:call(name(Name), ping).
send_to_remote(Pid, Msg) when is_pid(Pid) -> send_to_remote(Pid, Msg) when is_pid(Pid) ->
gen_statem:cast(Pid, {send_to_remote, Msg}); gen_statem:call(Pid, {send_to_remote, Msg});
send_to_remote(Name, Msg) -> send_to_remote(Name, Msg) ->
gen_statem:cast(name(Name), {send_to_remote, Msg}). gen_statem:call(name(Name), {send_to_remote, Msg}).
%% @doc Fire-and-forget send: casts `{send_to_remote_async, Msg, Callback}'
%% to the bridge worker. The worker may be given directly as a pid, or as a
%% name that is resolved through name/1.
send_to_remote_async(PidOrName, Msg, Callback) ->
    ServerRef =
        case is_pid(PidOrName) of
            true -> PidOrName;
            false -> name(PidOrName)
        end,
    gen_statem:cast(ServerRef, {send_to_remote_async, Msg, Callback}).
%% @doc Return all forwards (local subscriptions). %% @doc Return all forwards (local subscriptions).
-spec get_forwards(id()) -> [topic()]. -spec get_forwards(id()) -> [topic()].
@ -194,12 +191,10 @@ init(#{name := Name} = ConnectOpts) ->
name => Name name => Name
}), }),
erlang:process_flag(trap_exit, true), erlang:process_flag(trap_exit, true),
Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})),
State = init_state(ConnectOpts), State = init_state(ConnectOpts),
self() ! idle, self() ! idle,
{ok, idle, State#{ {ok, idle, State#{
connect_opts => pre_process_opts(ConnectOpts), connect_opts => pre_process_opts(ConnectOpts)
replayq => Queue
}}. }}.
init_state(Opts) -> init_state(Opts) ->
@ -212,32 +207,11 @@ init_state(Opts) ->
start_type => StartType, start_type => StartType,
reconnect_interval => ReconnDelayMs, reconnect_interval => ReconnDelayMs,
mountpoint => format_mountpoint(Mountpoint), mountpoint => format_mountpoint(Mountpoint),
inflight => [],
max_inflight => MaxInflightSize, max_inflight => MaxInflightSize,
connection => undefined, connection => undefined,
name => Name name => Name
}. }.
%% Open the replayq for the bridge called Name.
%% With no `dir' configured (missing, `undefined' or the empty string) the
%% queue is memory-only; otherwise it is placed under Dir/node()/Name using
%% the configured (or default) `seg_bytes' and `max_total_size'.
%% The sizer and marshaller callbacks are always attached.
open_replayq(Name, QCfg) ->
    Storage =
        case maps:get(dir, QCfg, undefined) of
            undefined ->
                #{mem_only => true};
            "" ->
                #{mem_only => true};
            Dir ->
                #{
                    dir => filename:join([Dir, node(), Name]),
                    seg_bytes => maps:get(seg_bytes, QCfg, ?DEFAULT_SEG_BYTES),
                    max_total_size => maps:get(max_total_size, QCfg, ?DEFAULT_MAX_TOTAL_SIZE)
                }
        end,
    Callbacks = #{
        sizer => fun emqx_connector_mqtt_msg:estimate_size/1,
        marshaller => fun ?MODULE:msg_marshaller/1
    },
    replayq:open(maps:merge(Storage, Callbacks)).
pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) -> pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) ->
ConnectOpts#{ ConnectOpts#{
subscriptions => pre_process_in_out(in, InConf), subscriptions => pre_process_in_out(in, InConf),
@ -276,9 +250,8 @@ pre_process_conf(Key, Conf) ->
code_change(_Vsn, State, Data, _Extra) -> code_change(_Vsn, State, Data, _Extra) ->
{ok, State, Data}. {ok, State, Data}.
terminate(_Reason, _StateName, #{replayq := Q} = State) -> terminate(_Reason, _StateName, State) ->
_ = disconnect(State), _ = disconnect(State),
_ = replayq:close(Q),
maybe_destroy_session(State). maybe_destroy_session(State).
maybe_destroy_session(#{connect_opts := ConnectOpts = #{clean_start := false}} = State) -> maybe_destroy_session(#{connect_opts := ConnectOpts = #{clean_start := false}} = State) ->
@ -322,15 +295,18 @@ connecting(#{reconnect_interval := ReconnectDelayMs} = State) ->
{keep_state_and_data, {state_timeout, ReconnectDelayMs, reconnect}} {keep_state_and_data, {state_timeout, ReconnectDelayMs, reconnect}}
end. end.
connected(state_timeout, connected, #{inflight := Inflight} = State) -> connected(state_timeout, connected, State) ->
case retry_inflight(State#{inflight := []}, Inflight) of %% nothing to do
{ok, NewState} -> {keep_state, State};
{keep_state, NewState, {next_event, internal, maybe_send}}; connected({call, From}, {send_to_remote, Msg}, State) ->
{error, NewState} -> case do_send(State, Msg) of
{keep_state, NewState} {ok, NState} ->
{keep_state, NState, [{reply, From, ok}]};
{error, Reason} ->
{keep_state_and_data, [[reply, From, {error, Reason}]]}
end; end;
connected(internal, maybe_send, State) -> connected(cast, {send_to_remote_async, Msg, Callback}, State) ->
{_, NewState} = pop_and_send(State), {_, NewState} = do_send_async(State, Msg, Callback),
{keep_state, NewState}; {keep_state, NewState};
connected( connected(
info, info,
@ -345,9 +321,6 @@ connected(
false -> false ->
keep_state_and_data keep_state_and_data
end; end;
connected(info, {batch_ack, Ref}, State) ->
NewState = handle_batch_ack(State, Ref),
{keep_state, NewState, {next_event, internal, maybe_send}};
connected(Type, Content, State) -> connected(Type, Content, State) ->
common(connected, Type, Content, State). common(connected, Type, Content, State).
@ -368,9 +341,6 @@ common(_StateName, {call, From}, get_subscriptions, #{connection := Connection})
{keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]}; {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]};
common(_StateName, info, {'EXIT', _, _}, State) -> common(_StateName, info, {'EXIT', _, _}, State) ->
{keep_state, State}; {keep_state, State};
common(_StateName, cast, {send_to_remote, Msg}, #{replayq := Q} = State) ->
NewQ = replayq:append(Q, [Msg]),
{keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}};
common(StateName, Type, Content, #{name := Name} = State) -> common(StateName, Type, Content, #{name := Name} = State) ->
?SLOG(notice, #{ ?SLOG(notice, #{
msg => "bridge_discarded_event", msg => "bridge_discarded_event",
@ -384,13 +354,12 @@ common(StateName, Type, Content, #{name := Name} = State) ->
do_connect( do_connect(
#{ #{
connect_opts := ConnectOpts, connect_opts := ConnectOpts,
inflight := Inflight,
name := Name name := Name
} = State } = State
) -> ) ->
case emqx_connector_mqtt_mod:start(ConnectOpts) of case emqx_connector_mqtt_mod:start(ConnectOpts) of
{ok, Conn} -> {ok, Conn} ->
?tp(info, connected, #{name => Name, inflight => length(Inflight)}), ?tp(info, connected, #{name => Name}),
{ok, State#{connection => Conn}}; {ok, State#{connection => Conn}};
{error, Reason} -> {error, Reason} ->
ConnectOpts1 = obfuscate(ConnectOpts), ConnectOpts1 = obfuscate(ConnectOpts),
@ -402,39 +371,7 @@ do_connect(
{error, Reason, State} {error, Reason, State}
end. end.
%% Retry all inflight (previously sent but not acked) batches. do_send(#{connect_opts := #{forwards := undefined}}, Msg) ->
%% Re-send every entry of the given inflight list, oldest first, through
%% do_send/3. Returns `{ok, State}' once the list is exhausted. On the first
%% failure the entries that were not re-sent (including the failing one) are
%% appended after whatever the state already holds as inflight, and
%% `{error, State}' is returned.
retry_inflight(State, []) ->
    {ok, State};
retry_inflight(State0, [#{q_ack_ref := AckRef, msg := Message} | Remaining] = Pending) ->
    case do_send(State0, AckRef, Message) of
        {error, #{inflight := Resent} = Failed} ->
            {error, Failed#{inflight := Resent ++ Pending}};
        {ok, State1} ->
            retry_inflight(State1, Remaining)
    end.
%% Send queued messages until the inflight window is full: the number of
%% free slots is `max_inflight' minus the current inflight length.
pop_and_send(#{inflight := Inflight, max_inflight := Max} = State) ->
    FreeSlots = Max - length(Inflight),
    pop_and_send_loop(State, FreeSlots).
%% Pop one message at a time from the replayq and send it with do_send/3.
%% Stops with `{ok, State}' when the slot budget reaches 0 or the queue is
%% empty, and with `{error, State}' as soon as a send fails.
pop_and_send_loop(State, 0) ->
    ?tp(debug, inflight_full, #{}),
    {ok, State};
pop_and_send_loop(#{replayq := Queue} = State, Budget) ->
    case replayq:is_empty(Queue) of
        true ->
            ?tp(debug, replayq_drained, #{}),
            {ok, State};
        false ->
            %% batches are always a single message; the byte limit is
            %% set high enough not to interfere with the count limit
            PopOpts = #{count_limit => 1, bytes_limit => 999999999},
            {Queue1, AckRef, [Msg]} = replayq:pop(Queue, PopOpts),
            case do_send(State#{replayq := Queue1}, AckRef, Msg) of
                {error, Failed} -> {error, Failed};
                {ok, Sent} -> pop_and_send_loop(Sent, Budget - 1)
            end
    end.
do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) ->
?SLOG(error, #{ ?SLOG(error, #{
msg => msg =>
"cannot_forward_messages_to_remote_broker" "cannot_forward_messages_to_remote_broker"
@ -443,98 +380,68 @@ do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) ->
}); });
do_send( do_send(
#{ #{
inflight := Inflight,
connection := Connection, connection := Connection,
mountpoint := Mountpoint, mountpoint := Mountpoint,
connect_opts := #{forwards := Forwards} connect_opts := #{forwards := Forwards}
} = State, } = State,
QAckRef,
Msg Msg
) -> ) ->
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards), Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
ExportMsg = fun(Message) -> ExportMsg = emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars),
emqx_connector_mqtt_msg:to_remote_msg(Message, Vars)
end,
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "publish_to_remote_broker", msg => "publish_to_remote_broker",
message => Msg, message => Msg,
vars => Vars vars => Vars
}), }),
case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(Msg)]) of case emqx_connector_mqtt_mod:send(Connection, ExportMsg) of
{ok, Refs} -> ok ->
{ok, State#{ {ok, State};
inflight := Inflight ++ {ok, #{reason_code := RC}} when
[ RC =:= ?RC_SUCCESS;
#{ RC =:= ?RC_NO_MATCHING_SUBSCRIBERS
q_ack_ref => QAckRef, ->
send_ack_ref => map_set(Refs), {ok, State};
msg => Msg {ok, #{reason_code := RC, reason_code_name := RCN}} ->
} ?SLOG(warning, #{
] msg => "publish_to_remote_node_falied",
}}; message => Msg,
reason_code => RC,
reason_code_name => RCN
}),
{error, RCN};
{error, Reason} -> {error, Reason} ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "mqtt_bridge_produce_failed", msg => "mqtt_bridge_produce_failed",
reason => Reason reason => Reason
}), }),
{error, State} {error, Reason}
end. end.
%% map as set, ack-reference -> 1 do_send_async(#{connect_opts := #{forwards := undefined}}, Msg, _Callback) ->
map_set(Ref) when is_reference(Ref) -> %% TODO: eval callback with undefined error
%% QoS-0 or RPC call returns a reference ?SLOG(error, #{
map_set([Ref]); msg =>
map_set(List) -> "cannot_forward_messages_to_remote_broker"
map_set(List, #{}). "_as_'egress'_is_not_configured",
messages => Msg
map_set([], Set) -> Set; });
map_set([H | T], Set) -> map_set(T, Set#{H => 1}). do_send_async(
#{
handle_batch_ack(#{inflight := Inflight0, replayq := Q} = State, Ref) -> connection := Connection,
Inflight1 = do_ack(Inflight0, Ref), mountpoint := Mountpoint,
Inflight = drop_acked_batches(Q, Inflight1), connect_opts := #{forwards := Forwards}
State#{inflight := Inflight}. },
Msg,
do_ack([], Ref) -> Callback
?SLOG(debug, #{
msg => "stale_batch_ack_reference",
ref => Ref
}),
[];
do_ack([#{send_ack_ref := Refs} = First | Rest], Ref) ->
case maps:is_key(Ref, Refs) of
true ->
NewRefs = maps:without([Ref], Refs),
[First#{send_ack_ref := NewRefs} | Rest];
false ->
[First | do_ack(Rest, Ref)]
end.
%% Drop the consecutive header of the inflight list having empty send_ack_ref
drop_acked_batches(_Q, []) ->
?tp(debug, inflight_drained, #{}),
[];
drop_acked_batches(
Q,
[
#{
send_ack_ref := Refs,
q_ack_ref := QAckRef
}
| Rest
] = All
) -> ) ->
case maps:size(Refs) of Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
0 -> ExportMsg = emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars),
%% all messages are acked by bridge target ?SLOG(debug, #{
%% now it's safe to ack replayq (delete from disk) msg => "publish_to_remote_broker",
ok = replayq:ack(Q, QAckRef), message => Msg,
%% continue to check more sent batches vars => Vars
drop_acked_batches(Q, Rest); }),
_ -> emqx_connector_mqtt_mod:send_async(Connection, ExportMsg, Callback).
%% the head (oldest) inflight batch is not acked, keep waiting
All
end.
disconnect(#{connection := Conn} = State) when Conn =/= undefined -> disconnect(#{connection := Conn} = State) when Conn =/= undefined ->
emqx_connector_mqtt_mod:stop(Conn), emqx_connector_mqtt_mod:stop(Conn),
@ -542,10 +449,6 @@ disconnect(#{connection := Conn} = State) when Conn =/= undefined ->
disconnect(State) -> disconnect(State) ->
State. State.
%% Called only when replayq needs to dump it to disk.
msg_marshaller(Bin) when is_binary(Bin) -> emqx_connector_mqtt_msg:from_binary(Bin);
msg_marshaller(Msg) -> emqx_connector_mqtt_msg:to_binary(Msg).
format_mountpoint(undefined) -> format_mountpoint(undefined) ->
undefined; undefined;
format_mountpoint(Prefix) -> format_mountpoint(Prefix) ->