Merge pull request #8789 from HJianBo/refactor-mqtt-bridge
Support async mode for MQTT bridge
This commit is contained in:
commit
55c538993e
|
@ -43,7 +43,7 @@
|
|||
{meck, "0.9.2"},
|
||||
{proper, "1.4.0"},
|
||||
{bbmustache, "1.10.0"},
|
||||
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.6.0"}}}
|
||||
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0-rc.1"}}}
|
||||
]},
|
||||
{extra_src_dirs, [{"test", [recursive]}]}
|
||||
]}
|
||||
|
|
|
@ -115,7 +115,7 @@ message_expiry_interval_init() ->
|
|||
message_expiry_interval_exipred(CPublish, CControl, QoS) ->
|
||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||
%% publish to t/a and waiting for the message expired
|
||||
emqtt:publish(
|
||||
_ = emqtt:publish(
|
||||
CPublish,
|
||||
<<"t/a">>,
|
||||
#{'Message-Expiry-Interval' => 1},
|
||||
|
@ -152,7 +152,7 @@ message_expiry_interval_exipred(CPublish, CControl, QoS) ->
|
|||
message_expiry_interval_not_exipred(CPublish, CControl, QoS) ->
|
||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||
%% publish to t/a
|
||||
emqtt:publish(
|
||||
_ = emqtt:publish(
|
||||
CPublish,
|
||||
<<"t/a">>,
|
||||
#{'Message-Expiry-Interval' => 20},
|
||||
|
|
|
@ -529,8 +529,11 @@ t_connack_max_qos_allowed(Config) ->
|
|||
%% [MQTT-3.2.2-10]
|
||||
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2),
|
||||
|
||||
{ok, _} = emqtt:publish(Client1, Topic, <<"Unsupported Qos 1">>, qos1),
|
||||
%% [MQTT-3.2.2-11]
|
||||
?assertMatch(
|
||||
{error, {disconnected, 155, _}},
|
||||
emqtt:publish(Client1, Topic, <<"Unsupported Qos 1">>, qos1)
|
||||
),
|
||||
?assertEqual(155, receive_disconnect_reasoncode()),
|
||||
waiting_client_process_exit(Client1),
|
||||
|
||||
|
@ -563,8 +566,11 @@ t_connack_max_qos_allowed(Config) ->
|
|||
%% [MQTT-3.2.2-10]
|
||||
{ok, _, [2]} = emqtt:subscribe(Client3, Topic, 2),
|
||||
|
||||
{ok, _} = emqtt:publish(Client3, Topic, <<"Unsupported Qos 2">>, qos2),
|
||||
%% [MQTT-3.2.2-11]
|
||||
?assertMatch(
|
||||
{error, {disconnected, 155, _}},
|
||||
emqtt:publish(Client3, Topic, <<"Unsupported Qos 2">>, qos2)
|
||||
),
|
||||
?assertEqual(155, receive_disconnect_reasoncode()),
|
||||
waiting_client_process_exit(Client3),
|
||||
|
||||
|
|
|
@ -230,6 +230,13 @@ mqtt_main_example() ->
|
|||
keepalive => <<"300s">>,
|
||||
retry_interval => <<"15s">>,
|
||||
max_inflight => 100,
|
||||
resource_opts => #{
|
||||
health_check_interval => <<"15s">>,
|
||||
auto_restart_interval => <<"60s">>,
|
||||
query_mode => sync,
|
||||
enable_queue => false,
|
||||
max_queue_bytes => 100 * 1024 * 1024
|
||||
},
|
||||
ssl => #{
|
||||
enable => false
|
||||
}
|
||||
|
|
|
@ -12,41 +12,46 @@
|
|||
namespace() -> "bridge_mqtt".
|
||||
|
||||
roots() -> [].
|
||||
|
||||
fields("config") ->
|
||||
%% enable
|
||||
emqx_bridge_schema:common_bridge_fields() ++
|
||||
emqx_connector_mqtt_schema:fields("config");
|
||||
fields("post") ->
|
||||
[
|
||||
type_field(),
|
||||
name_field()
|
||||
] ++ emqx_connector_mqtt_schema:fields("config");
|
||||
fields("put") ->
|
||||
{resource_opts,
|
||||
mk(
|
||||
ref(?MODULE, "creation_opts"),
|
||||
#{
|
||||
required => false,
|
||||
default => #{},
|
||||
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
|
||||
}
|
||||
)}
|
||||
] ++
|
||||
emqx_connector_mqtt_schema:fields("config");
|
||||
fields("creation_opts") ->
|
||||
Opts = emqx_resource_schema:fields("creation_opts"),
|
||||
[O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];
|
||||
fields("post") ->
|
||||
[type_field(), name_field() | fields("config")];
|
||||
fields("put") ->
|
||||
fields("config");
|
||||
fields("get") ->
|
||||
emqx_bridge_schema:metrics_status_fields() ++ fields("config").
|
||||
|
||||
desc("config") ->
|
||||
?DESC("config");
|
||||
desc("creation_opts" = Name) ->
|
||||
emqx_resource_schema:desc(Name);
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
%%======================================================================================
|
||||
%% internal
|
||||
is_hidden_opts(Field) ->
|
||||
lists:member(Field, [enable_batch, batch_size, batch_time]).
|
||||
|
||||
type_field() ->
|
||||
{type,
|
||||
mk(
|
||||
mqtt,
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC("desc_type")
|
||||
}
|
||||
)}.
|
||||
{type, mk(mqtt, #{required => true, desc => ?DESC("desc_type")})}.
|
||||
|
||||
name_field() ->
|
||||
{name,
|
||||
mk(
|
||||
binary(),
|
||||
#{
|
||||
required => true,
|
||||
desc => ?DESC("desc_name")
|
||||
}
|
||||
)}.
|
||||
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
|
||||
|
|
|
@ -20,8 +20,7 @@
|
|||
%% By accident, We have always been using the upstream fork due to
|
||||
%% eredis_cluster's dependency getting resolved earlier.
|
||||
%% Here we pin 1.5.2 to avoid surprises in the future.
|
||||
{poolboy, {git, "https://github.com/emqx/poolboy.git", {tag, "1.5.2"}}},
|
||||
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.6.0"}}}
|
||||
{poolboy, {git, "https://github.com/emqx/poolboy.git", {tag, "1.5.2"}}}
|
||||
]}.
|
||||
|
||||
{shell, [
|
||||
|
|
|
@ -39,6 +39,7 @@
|
|||
on_start/2,
|
||||
on_stop/2,
|
||||
on_query/3,
|
||||
on_query_async/4,
|
||||
on_get_status/2
|
||||
]).
|
||||
|
||||
|
@ -139,7 +140,7 @@ on_message_received(Msg, HookPoint, ResId) ->
|
|||
emqx:run_hook(HookPoint, [Msg]).
|
||||
|
||||
%% ===================================================================
|
||||
callback_mode() -> always_sync.
|
||||
callback_mode() -> async_if_possible.
|
||||
|
||||
on_start(InstId, Conf) ->
|
||||
InstanceId = binary_to_atom(InstId, utf8),
|
||||
|
@ -189,6 +190,16 @@ on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) ->
|
|||
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
|
||||
ok.
|
||||
|
||||
on_query_async(
|
||||
_InstId,
|
||||
{send_message, Msg},
|
||||
{ReplayFun, Args},
|
||||
#{name := InstanceId}
|
||||
) ->
|
||||
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
|
||||
emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplayFun, Args}),
|
||||
ok.
|
||||
|
||||
on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) ->
|
||||
AutoReconn = maps:get(auto_reconnect, Conf, true),
|
||||
case emqx_connector_mqtt_worker:status(InstanceId) of
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
-export([
|
||||
start/1,
|
||||
send/2,
|
||||
send_async/3,
|
||||
stop/1,
|
||||
ping/1
|
||||
]).
|
||||
|
@ -32,7 +33,6 @@
|
|||
|
||||
%% callbacks for emqtt
|
||||
-export([
|
||||
handle_puback/2,
|
||||
handle_publish/3,
|
||||
handle_disconnected/2
|
||||
]).
|
||||
|
@ -134,44 +134,11 @@ safe_stop(Pid, StopF, Timeout) ->
|
|||
exit(Pid, kill)
|
||||
end.
|
||||
|
||||
send(Conn, Msgs) ->
|
||||
send(Conn, Msgs, []).
|
||||
send(#{client_pid := ClientPid}, Msg) ->
|
||||
emqtt:publish(ClientPid, Msg).
|
||||
|
||||
send(_Conn, [], []) ->
|
||||
%% all messages in the batch are QoS-0
|
||||
Ref = make_ref(),
|
||||
%% QoS-0 messages do not have packet ID
|
||||
%% the batch ack is simulated with a loop-back message
|
||||
self() ! {batch_ack, Ref},
|
||||
{ok, Ref};
|
||||
send(_Conn, [], PktIds) ->
|
||||
%% PktIds is not an empty list if there is any non-QoS-0 message in the batch,
|
||||
%% And the worker should wait for all acks
|
||||
{ok, PktIds};
|
||||
send(#{client_pid := ClientPid} = Conn, [Msg | Rest], PktIds) ->
|
||||
case emqtt:publish(ClientPid, Msg) of
|
||||
ok ->
|
||||
send(Conn, Rest, PktIds);
|
||||
{ok, PktId} ->
|
||||
send(Conn, Rest, [PktId | PktIds]);
|
||||
{error, Reason} ->
|
||||
%% NOTE: There is no partial success of a batch and recover from the middle
|
||||
%% only to retry all messages in one batch
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
handle_puback(#{packet_id := PktId, reason_code := RC}, Parent) when
|
||||
RC =:= ?RC_SUCCESS;
|
||||
RC =:= ?RC_NO_MATCHING_SUBSCRIBERS
|
||||
->
|
||||
Parent ! {batch_ack, PktId},
|
||||
ok;
|
||||
handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) ->
|
||||
?SLOG(warning, #{
|
||||
msg => "publish_to_remote_node_falied",
|
||||
packet_id => PktId,
|
||||
reason_code => RC
|
||||
}).
|
||||
send_async(#{client_pid := ClientPid}, Msg, Callback) ->
|
||||
emqtt:publish_async(ClientPid, Msg, infinity, Callback).
|
||||
|
||||
handle_publish(Msg, undefined, _Opts) ->
|
||||
?SLOG(error, #{
|
||||
|
@ -200,7 +167,6 @@ handle_disconnected(Reason, Parent) ->
|
|||
|
||||
make_hdlr(Parent, Vars, Opts) ->
|
||||
#{
|
||||
puback => {fun ?MODULE:handle_puback/2, [Parent]},
|
||||
publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]},
|
||||
disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]}
|
||||
}.
|
||||
|
|
|
@ -91,16 +91,14 @@
|
|||
ensure_stopped/1,
|
||||
status/1,
|
||||
ping/1,
|
||||
send_to_remote/2
|
||||
send_to_remote/2,
|
||||
send_to_remote_async/3
|
||||
]).
|
||||
|
||||
-export([get_forwards/1]).
|
||||
|
||||
-export([get_subscriptions/1]).
|
||||
|
||||
%% Internal
|
||||
-export([msg_marshaller/1]).
|
||||
|
||||
-export_type([
|
||||
config/0,
|
||||
ack_ref/0
|
||||
|
@ -133,12 +131,6 @@
|
|||
%% mountpoint: The topic mount point for messages sent to remote node/cluster
|
||||
%% `undefined', `<<>>' or `""' to disable
|
||||
%% forwards: Local topics to subscribe.
|
||||
%% replayq.batch_bytes_limit: Max number of bytes to collect in a batch for each
|
||||
%% send call towards emqx_bridge_connect
|
||||
%% replayq.batch_count_limit: Max number of messages to collect in a batch for
|
||||
%% each send call towards emqx_bridge_connect
|
||||
%% replayq.dir: Directory where replayq should persist messages
|
||||
%% replayq.seg_bytes: Size in bytes for each replayq segment file
|
||||
%%
|
||||
%% Find more connection specific configs in the callback modules
|
||||
%% of emqx_bridge_connect behaviour.
|
||||
|
@ -173,9 +165,14 @@ ping(Name) ->
|
|||
gen_statem:call(name(Name), ping).
|
||||
|
||||
send_to_remote(Pid, Msg) when is_pid(Pid) ->
|
||||
gen_statem:cast(Pid, {send_to_remote, Msg});
|
||||
gen_statem:call(Pid, {send_to_remote, Msg});
|
||||
send_to_remote(Name, Msg) ->
|
||||
gen_statem:cast(name(Name), {send_to_remote, Msg}).
|
||||
gen_statem:call(name(Name), {send_to_remote, Msg}).
|
||||
|
||||
send_to_remote_async(Pid, Msg, Callback) when is_pid(Pid) ->
|
||||
gen_statem:cast(Pid, {send_to_remote_async, Msg, Callback});
|
||||
send_to_remote_async(Name, Msg, Callback) ->
|
||||
gen_statem:cast(name(Name), {send_to_remote_async, Msg, Callback}).
|
||||
|
||||
%% @doc Return all forwards (local subscriptions).
|
||||
-spec get_forwards(id()) -> [topic()].
|
||||
|
@ -194,12 +191,10 @@ init(#{name := Name} = ConnectOpts) ->
|
|||
name => Name
|
||||
}),
|
||||
erlang:process_flag(trap_exit, true),
|
||||
Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})),
|
||||
State = init_state(ConnectOpts),
|
||||
self() ! idle,
|
||||
{ok, idle, State#{
|
||||
connect_opts => pre_process_opts(ConnectOpts),
|
||||
replayq => Queue
|
||||
connect_opts => pre_process_opts(ConnectOpts)
|
||||
}}.
|
||||
|
||||
init_state(Opts) ->
|
||||
|
@ -212,32 +207,11 @@ init_state(Opts) ->
|
|||
start_type => StartType,
|
||||
reconnect_interval => ReconnDelayMs,
|
||||
mountpoint => format_mountpoint(Mountpoint),
|
||||
inflight => [],
|
||||
max_inflight => MaxInflightSize,
|
||||
connection => undefined,
|
||||
name => Name
|
||||
}.
|
||||
|
||||
open_replayq(Name, QCfg) ->
|
||||
Dir = maps:get(dir, QCfg, undefined),
|
||||
SegBytes = maps:get(seg_bytes, QCfg, ?DEFAULT_SEG_BYTES),
|
||||
MaxTotalSize = maps:get(max_total_size, QCfg, ?DEFAULT_MAX_TOTAL_SIZE),
|
||||
QueueConfig =
|
||||
case Dir =:= undefined orelse Dir =:= "" of
|
||||
true ->
|
||||
#{mem_only => true};
|
||||
false ->
|
||||
#{
|
||||
dir => filename:join([Dir, node(), Name]),
|
||||
seg_bytes => SegBytes,
|
||||
max_total_size => MaxTotalSize
|
||||
}
|
||||
end,
|
||||
replayq:open(QueueConfig#{
|
||||
sizer => fun emqx_connector_mqtt_msg:estimate_size/1,
|
||||
marshaller => fun ?MODULE:msg_marshaller/1
|
||||
}).
|
||||
|
||||
pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) ->
|
||||
ConnectOpts#{
|
||||
subscriptions => pre_process_in_out(in, InConf),
|
||||
|
@ -276,9 +250,8 @@ pre_process_conf(Key, Conf) ->
|
|||
code_change(_Vsn, State, Data, _Extra) ->
|
||||
{ok, State, Data}.
|
||||
|
||||
terminate(_Reason, _StateName, #{replayq := Q} = State) ->
|
||||
terminate(_Reason, _StateName, State) ->
|
||||
_ = disconnect(State),
|
||||
_ = replayq:close(Q),
|
||||
maybe_destroy_session(State).
|
||||
|
||||
maybe_destroy_session(#{connect_opts := ConnectOpts = #{clean_start := false}} = State) ->
|
||||
|
@ -322,16 +295,19 @@ connecting(#{reconnect_interval := ReconnectDelayMs} = State) ->
|
|||
{keep_state_and_data, {state_timeout, ReconnectDelayMs, reconnect}}
|
||||
end.
|
||||
|
||||
connected(state_timeout, connected, #{inflight := Inflight} = State) ->
|
||||
case retry_inflight(State#{inflight := []}, Inflight) of
|
||||
{ok, NewState} ->
|
||||
{keep_state, NewState, {next_event, internal, maybe_send}};
|
||||
{error, NewState} ->
|
||||
{keep_state, NewState}
|
||||
connected(state_timeout, connected, State) ->
|
||||
%% nothing to do
|
||||
{keep_state, State};
|
||||
connected({call, From}, {send_to_remote, Msg}, State) ->
|
||||
case do_send(State, Msg) of
|
||||
{ok, NState} ->
|
||||
{keep_state, NState, [{reply, From, ok}]};
|
||||
{error, Reason} ->
|
||||
{keep_state_and_data, [[reply, From, {error, Reason}]]}
|
||||
end;
|
||||
connected(internal, maybe_send, State) ->
|
||||
{_, NewState} = pop_and_send(State),
|
||||
{keep_state, NewState};
|
||||
connected(cast, {send_to_remote_async, Msg, Callback}, State) ->
|
||||
_ = do_send_async(State, Msg, Callback),
|
||||
{keep_state, State};
|
||||
connected(
|
||||
info,
|
||||
{disconnected, Conn, Reason},
|
||||
|
@ -345,9 +321,6 @@ connected(
|
|||
false ->
|
||||
keep_state_and_data
|
||||
end;
|
||||
connected(info, {batch_ack, Ref}, State) ->
|
||||
NewState = handle_batch_ack(State, Ref),
|
||||
{keep_state, NewState, {next_event, internal, maybe_send}};
|
||||
connected(Type, Content, State) ->
|
||||
common(connected, Type, Content, State).
|
||||
|
||||
|
@ -368,9 +341,6 @@ common(_StateName, {call, From}, get_subscriptions, #{connection := Connection})
|
|||
{keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]};
|
||||
common(_StateName, info, {'EXIT', _, _}, State) ->
|
||||
{keep_state, State};
|
||||
common(_StateName, cast, {send_to_remote, Msg}, #{replayq := Q} = State) ->
|
||||
NewQ = replayq:append(Q, [Msg]),
|
||||
{keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}};
|
||||
common(StateName, Type, Content, #{name := Name} = State) ->
|
||||
?SLOG(notice, #{
|
||||
msg => "bridge_discarded_event",
|
||||
|
@ -384,13 +354,12 @@ common(StateName, Type, Content, #{name := Name} = State) ->
|
|||
do_connect(
|
||||
#{
|
||||
connect_opts := ConnectOpts,
|
||||
inflight := Inflight,
|
||||
name := Name
|
||||
} = State
|
||||
) ->
|
||||
case emqx_connector_mqtt_mod:start(ConnectOpts) of
|
||||
{ok, Conn} ->
|
||||
?tp(info, connected, #{name => Name, inflight => length(Inflight)}),
|
||||
?tp(info, connected, #{name => Name}),
|
||||
{ok, State#{connection => Conn}};
|
||||
{error, Reason} ->
|
||||
ConnectOpts1 = obfuscate(ConnectOpts),
|
||||
|
@ -402,39 +371,7 @@ do_connect(
|
|||
{error, Reason, State}
|
||||
end.
|
||||
|
||||
%% Retry all inflight (previously sent but not acked) batches.
|
||||
retry_inflight(State, []) ->
|
||||
{ok, State};
|
||||
retry_inflight(State, [#{q_ack_ref := QAckRef, msg := Msg} | Rest] = OldInf) ->
|
||||
case do_send(State, QAckRef, Msg) of
|
||||
{ok, State1} ->
|
||||
retry_inflight(State1, Rest);
|
||||
{error, #{inflight := NewInf} = State1} ->
|
||||
{error, State1#{inflight := NewInf ++ OldInf}}
|
||||
end.
|
||||
|
||||
pop_and_send(#{inflight := Inflight, max_inflight := Max} = State) ->
|
||||
pop_and_send_loop(State, Max - length(Inflight)).
|
||||
|
||||
pop_and_send_loop(State, 0) ->
|
||||
?tp(debug, inflight_full, #{}),
|
||||
{ok, State};
|
||||
pop_and_send_loop(#{replayq := Q} = State, N) ->
|
||||
case replayq:is_empty(Q) of
|
||||
true ->
|
||||
?tp(debug, replayq_drained, #{}),
|
||||
{ok, State};
|
||||
false ->
|
||||
BatchSize = 1,
|
||||
Opts = #{count_limit => BatchSize, bytes_limit => 999999999},
|
||||
{Q1, QAckRef, [Msg]} = replayq:pop(Q, Opts),
|
||||
case do_send(State#{replayq := Q1}, QAckRef, Msg) of
|
||||
{ok, NewState} -> pop_and_send_loop(NewState, N - 1);
|
||||
{error, NewState} -> {error, NewState}
|
||||
end
|
||||
end.
|
||||
|
||||
do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) ->
|
||||
do_send(#{connect_opts := #{forwards := undefined}}, Msg) ->
|
||||
?SLOG(error, #{
|
||||
msg =>
|
||||
"cannot_forward_messages_to_remote_broker"
|
||||
|
@ -443,98 +380,68 @@ do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) ->
|
|||
});
|
||||
do_send(
|
||||
#{
|
||||
inflight := Inflight,
|
||||
connection := Connection,
|
||||
mountpoint := Mountpoint,
|
||||
connect_opts := #{forwards := Forwards}
|
||||
} = State,
|
||||
QAckRef,
|
||||
Msg
|
||||
) ->
|
||||
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
|
||||
ExportMsg = fun(Message) ->
|
||||
emqx_connector_mqtt_msg:to_remote_msg(Message, Vars)
|
||||
end,
|
||||
ExportMsg = emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars),
|
||||
?SLOG(debug, #{
|
||||
msg => "publish_to_remote_broker",
|
||||
message => Msg,
|
||||
vars => Vars
|
||||
}),
|
||||
case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(Msg)]) of
|
||||
{ok, Refs} ->
|
||||
{ok, State#{
|
||||
inflight := Inflight ++
|
||||
[
|
||||
#{
|
||||
q_ack_ref => QAckRef,
|
||||
send_ack_ref => map_set(Refs),
|
||||
msg => Msg
|
||||
}
|
||||
]
|
||||
}};
|
||||
case emqx_connector_mqtt_mod:send(Connection, ExportMsg) of
|
||||
ok ->
|
||||
{ok, State};
|
||||
{ok, #{reason_code := RC}} when
|
||||
RC =:= ?RC_SUCCESS;
|
||||
RC =:= ?RC_NO_MATCHING_SUBSCRIBERS
|
||||
->
|
||||
{ok, State};
|
||||
{ok, #{reason_code := RC, reason_code_name := RCN}} ->
|
||||
?SLOG(warning, #{
|
||||
msg => "publish_to_remote_node_falied",
|
||||
message => Msg,
|
||||
reason_code => RC,
|
||||
reason_code_name => RCN
|
||||
}),
|
||||
{error, RCN};
|
||||
{error, Reason} ->
|
||||
?SLOG(info, #{
|
||||
msg => "mqtt_bridge_produce_failed",
|
||||
reason => Reason
|
||||
}),
|
||||
{error, State}
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%% map as set, ack-reference -> 1
|
||||
map_set(Ref) when is_reference(Ref) ->
|
||||
%% QoS-0 or RPC call returns a reference
|
||||
map_set([Ref]);
|
||||
map_set(List) ->
|
||||
map_set(List, #{}).
|
||||
|
||||
map_set([], Set) -> Set;
|
||||
map_set([H | T], Set) -> map_set(T, Set#{H => 1}).
|
||||
|
||||
handle_batch_ack(#{inflight := Inflight0, replayq := Q} = State, Ref) ->
|
||||
Inflight1 = do_ack(Inflight0, Ref),
|
||||
Inflight = drop_acked_batches(Q, Inflight1),
|
||||
State#{inflight := Inflight}.
|
||||
|
||||
do_ack([], Ref) ->
|
||||
?SLOG(debug, #{
|
||||
msg => "stale_batch_ack_reference",
|
||||
ref => Ref
|
||||
}),
|
||||
[];
|
||||
do_ack([#{send_ack_ref := Refs} = First | Rest], Ref) ->
|
||||
case maps:is_key(Ref, Refs) of
|
||||
true ->
|
||||
NewRefs = maps:without([Ref], Refs),
|
||||
[First#{send_ack_ref := NewRefs} | Rest];
|
||||
false ->
|
||||
[First | do_ack(Rest, Ref)]
|
||||
end.
|
||||
|
||||
%% Drop the consecutive header of the inflight list having empty send_ack_ref
|
||||
drop_acked_batches(_Q, []) ->
|
||||
?tp(debug, inflight_drained, #{}),
|
||||
[];
|
||||
drop_acked_batches(
|
||||
Q,
|
||||
[
|
||||
do_send_async(#{connect_opts := #{forwards := undefined}}, Msg, _Callback) ->
|
||||
%% TODO: eval callback with undefined error
|
||||
?SLOG(error, #{
|
||||
msg =>
|
||||
"cannot_forward_messages_to_remote_broker"
|
||||
"_as_'egress'_is_not_configured",
|
||||
messages => Msg
|
||||
});
|
||||
do_send_async(
|
||||
#{
|
||||
send_ack_ref := Refs,
|
||||
q_ack_ref := QAckRef
|
||||
}
|
||||
| Rest
|
||||
] = All
|
||||
connection := Connection,
|
||||
mountpoint := Mountpoint,
|
||||
connect_opts := #{forwards := Forwards}
|
||||
},
|
||||
Msg,
|
||||
Callback
|
||||
) ->
|
||||
case maps:size(Refs) of
|
||||
0 ->
|
||||
%% all messages are acked by bridge target
|
||||
%% now it's safe to ack replayq (delete from disk)
|
||||
ok = replayq:ack(Q, QAckRef),
|
||||
%% continue to check more sent batches
|
||||
drop_acked_batches(Q, Rest);
|
||||
_ ->
|
||||
%% the head (oldest) inflight batch is not acked, keep waiting
|
||||
All
|
||||
end.
|
||||
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
|
||||
ExportMsg = emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars),
|
||||
?SLOG(debug, #{
|
||||
msg => "publish_to_remote_broker",
|
||||
message => Msg,
|
||||
vars => Vars
|
||||
}),
|
||||
emqx_connector_mqtt_mod:send_async(Connection, ExportMsg, Callback).
|
||||
|
||||
disconnect(#{connection := Conn} = State) when Conn =/= undefined ->
|
||||
emqx_connector_mqtt_mod:stop(Conn),
|
||||
|
@ -542,10 +449,6 @@ disconnect(#{connection := Conn} = State) when Conn =/= undefined ->
|
|||
disconnect(State) ->
|
||||
State.
|
||||
|
||||
%% Called only when replayq needs to dump it to disk.
|
||||
msg_marshaller(Bin) when is_binary(Bin) -> emqx_connector_mqtt_msg:from_binary(Bin);
|
||||
msg_marshaller(Msg) -> emqx_connector_mqtt_msg:to_binary(Msg).
|
||||
|
||||
format_mountpoint(undefined) ->
|
||||
undefined;
|
||||
format_mountpoint(Prefix) ->
|
||||
|
|
2
mix.exs
2
mix.exs
|
@ -59,7 +59,7 @@ defmodule EMQXUmbrella.MixProject do
|
|||
{:ecpool, github: "emqx/ecpool", tag: "0.5.2", override: true},
|
||||
{:replayq, "0.3.4", override: true},
|
||||
{:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
|
||||
{:emqtt, github: "emqx/emqtt", tag: "1.6.0", override: true},
|
||||
{:emqtt, github: "emqx/emqtt", tag: "1.7.0-rc.1", override: true},
|
||||
{:rulesql, github: "emqx/rulesql", tag: "0.1.4"},
|
||||
{:observer_cli, "1.7.1"},
|
||||
{:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"},
|
||||
|
|
|
@ -61,7 +61,7 @@
|
|||
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
|
||||
, {replayq, "0.3.4"}
|
||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
|
||||
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.6.0"}}}
|
||||
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0-rc.1"}}}
|
||||
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}}
|
||||
, {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
|
||||
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
|
||||
|
|
Loading…
Reference in New Issue