fix(bridge): cannot start mqtt bridge from config

This commit is contained in:
Shawn 2021-10-26 10:43:05 +08:00
parent 63f942a1b8
commit f01f9632c1
6 changed files with 73 additions and 171 deletions

View File

@ -6,7 +6,6 @@
bridges.mqtt.my_mqtt_bridge_to_aws {
server = "127.0.0.1:1883"
proto_ver = "v4"
clientid = "my_mqtt_bridge_to_aws"
username = "username1"
password = ""
clean_start = true
@ -19,7 +18,6 @@ bridges.mqtt.my_mqtt_bridge_to_aws {
dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
seg_bytes = "100MB"
offload = false
max_total_bytes = "1GB"
}
ssl {
enable = false

View File

@ -221,15 +221,19 @@ get_matched_bridges(Topic) ->
Bridges = emqx:get_config([bridges], #{}),
maps:fold(fun (BType, Conf, Acc0) ->
maps:fold(fun
(BName, #{from_local_topic := Filter}, Acc1) ->
case emqx_topic:match(Topic, Filter) of
true -> [bridge_id(BType, BName) | Acc1];
false -> Acc1
end;
(_Name, _BridgeConf, Acc1) -> Acc1
(BName, #{egress := Egress}, Acc1) ->
get_matched_bridge_id(Egress, Topic, BType, BName, Acc1);
(BName, BridgeConf, Acc1) ->
get_matched_bridge_id(BridgeConf, Topic, BType, BName, Acc1)
end, Acc0, Conf)
end, [], Bridges).
get_matched_bridge_id(#{from_local_topic := Filter}, Topic, BType, BName, Acc) ->
case emqx_topic:match(Topic, Filter) of
true -> [bridge_id(BType, BName) | Acc];
false -> Acc
end.
bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).

View File

@ -74,8 +74,9 @@ load_bridges(Configs) ->
%% TODO: move this monitor into emqx_resource
%% emqx_resource:check_and_create_local(ResourceId, ResourceType, Config, #{keep_retry => true}).
load_bridge(<<"http">>, Name, Config) ->
Config1 = parse_http_confs(Config),
do_load_bridge(<<"http">>, Name, Config1).
do_load_bridge(<<"http">>, Name, parse_http_confs(Config));
load_bridge(Type, Name, Config) ->
do_load_bridge(Type, Name, Config).
do_load_bridge(Type, Name, Config) ->
case emqx_resource:check_and_create_local(

View File

@ -46,7 +46,7 @@
%%=====================================================================
%% Hocon schema
roots() ->
[{config, #{type => hoconsc:ref(?MODULE, "config")}}].
fields("config").
fields("config") ->
emqx_connector_mqtt_schema:fields("config").
@ -89,109 +89,62 @@ drop_bridge(Name) ->
%% ===================================================================
%% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called
%% if the bridge received msgs from the remote broker.
on_message_received(Msg, ChannId) ->
Name = atom_to_binary(ChannId, utf8),
on_message_received(Msg, BridgeId) ->
Name = atom_to_binary(BridgeId, utf8),
emqx:run_hook(<<"$bridges/", Name/binary>>, [Msg]).
%% ===================================================================
on_start(InstId, Conf) ->
?SLOG(info, #{msg => "starting mqtt connector",
connector => InstId, config => Conf}),
"bridge:" ++ NamePrefix = binary_to_list(InstId),
BridgeId = list_to_atom(NamePrefix),
?SLOG(info, #{msg => "starting mqtt connector",
connector => BridgeId, config => Conf}),
BasicConf = basic_config(Conf),
InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, channels => []}},
InOutConfigs = taged_map_list(ingress, maps:get(ingress, Conf, #{}))
++ taged_map_list(egress, maps:get(egress, Conf, #{})),
lists:foldl(fun
(_InOutConf, {error, Reason}) ->
{error, Reason};
(InOutConf, {ok, #{channels := SubBridges} = Res}) ->
case create_channel(InOutConf, NamePrefix, BasicConf) of
{error, Reason} -> {error, Reason};
{ok, Name} -> {ok, Res#{channels => [Name | SubBridges]}}
end
end, InitRes, InOutConfigs).
on_stop(InstId, #{channels := NameList}) ->
?SLOG(info, #{msg => "stopping mqtt connector",
connector => InstId}),
lists:foreach(fun(Name) ->
remove_channel(Name)
end, NameList).
%% TODO: let the emqx_resource trigger on_query/4 automatically according to the
%% `ingress` and `egress` config
on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix,
baisc_conf := BasicConf}) ->
create_channel(Conf, Prefix, BasicConf);
on_query(_InstId, {send_message, ChannelId, Msg}, _AfterQuery, _State) ->
?SLOG(debug, #{msg => "send msg to remote node", message => Msg,
channel_id => ChannelId}),
emqx_connector_mqtt_worker:send_to_remote(ChannelId, Msg).
on_health_check(_InstId, #{channels := NameList} = State) ->
Results = [{Name, emqx_connector_mqtt_worker:ping(Name)} || Name <- NameList],
case lists:all(fun({_, pong}) -> true; ({_, _}) -> false end, Results) of
true -> {ok, State};
false -> {error, {some_channel_down, Results}, State}
end.
create_channel({{ingress, Id}, #{from_remote_topic := RemoteT} = Conf},
NamePrefix, BasicConf) ->
LocalT = maps:get(to_local_topic, Conf, undefined),
ChannId = ingress_channel_id(NamePrefix, Id),
?SLOG(info, #{msg => "creating ingress channel",
to_remote_topic => RemoteT,
to_local_topic => LocalT,
channel_id => ChannId}),
do_create_channel(BasicConf#{
name => ChannId,
clientid => clientid(ChannId),
subscriptions => Conf#{
to_local_topic => LocalT,
on_message_received => {fun ?MODULE:on_message_received/2, [ChannId]}
SubRemoteConf = maps:get(ingress, Conf, #{}),
FrowardConf = maps:get(egress, Conf, #{}),
BridgeConf = BasicConf#{
name => BridgeId,
clientid => clientid(BridgeId),
subscriptions => SubRemoteConf#{
to_local_topic => maps:get(to_local_topic, SubRemoteConf, undefined),
on_message_received => {fun ?MODULE:on_message_received/2, [BridgeId]}
},
forwards => undefined});
create_channel({{egress, Id}, #{to_remote_topic := RemoteT} = Conf},
NamePrefix, BasicConf) ->
LocalT = maps:get(from_local_topic, Conf, undefined),
ChannId = egress_channel_id(NamePrefix, Id),
?SLOG(info, #{msg => "creating egress channel",
to_remote_topic => RemoteT,
to_local_topic => LocalT,
channel_id => ChannId}),
do_create_channel(BasicConf#{
name => ChannId,
clientid => clientid(ChannId),
subscriptions => undefined,
forwards => Conf#{from_local_topic => LocalT}}).
remove_channel(ChannId) ->
?SLOG(info, #{msg => "removing channel",
channel_id => ChannId}),
case ?MODULE:drop_bridge(ChannId) of
ok -> ok;
{error, not_found} -> ok;
{error, Reason} ->
?SLOG(error, #{msg => "stop channel failed",
channel_id => ChannId, reason => Reason})
end.
do_create_channel(#{name := Name} = Conf) ->
case ?MODULE:create_bridge(Conf) of
forwards => FrowardConf#{
from_local_topic => maps:get(from_local_topic, FrowardConf, undefined)
}
},
case ?MODULE:create_bridge(BridgeConf) of
{ok, _Pid} ->
start_channel(Name);
case emqx_connector_mqtt_worker:ensure_started(BridgeId) of
ok -> {ok, #{name => BridgeId}};
{error, Reason} -> {error, Reason}
end;
{error, {already_started, _Pid}} ->
{ok, Name};
{ok, #{name => BridgeId}};
{error, Reason} ->
{error, Reason}
end.
start_channel(Name) ->
case emqx_connector_mqtt_worker:ensure_started(Name) of
ok -> {ok, Name};
{error, Reason} -> {error, Reason}
on_stop(_InstId, #{name := BridgeId}) ->
?SLOG(info, #{msg => "stopping mqtt connector",
connector => BridgeId}),
case ?MODULE:drop_bridge(BridgeId) of
ok -> ok;
{error, not_found} -> ok;
{error, Reason} ->
?SLOG(error, #{msg => "stop mqtt connector",
connector => BridgeId, reason => Reason})
end.
on_query(_InstId, {send_message, BridgeId, Msg}, _AfterQuery, _State) ->
?SLOG(debug, #{msg => "send msg to remote node", message => Msg,
connector => BridgeId}),
emqx_connector_mqtt_worker:send_to_remote(BridgeId, Msg).
on_health_check(_InstId, #{name := BridgeId} = State) ->
case emqx_connector_mqtt_worker:ping(BridgeId) of
pong -> {ok, State};
_ -> {error, {connector_down, BridgeId}, State}
end.
basic_config(#{
@ -225,19 +178,8 @@ basic_config(#{
if_record_metrics => true
}.
taged_map_list(Tag, Map) ->
[{{Tag, K}, V} || {K, V} <- maps:to_list(Map)].
ingress_channel_id(Prefix, Id) ->
channel_name("ingress", Prefix, Id).
egress_channel_id(Prefix, Id) ->
channel_name("egress", Prefix, Id).
channel_name(Type, Prefix, Id) ->
list_to_atom(str(Prefix) ++ ":" ++ Type ++ ":" ++ str(Id)).
clientid(Id) ->
list_to_binary(str(Id) ++ ":" ++ emqx_misc:gen_id(8)).
list_to_binary(lists:concat([str(Id), ":", node()])).
str(A) when is_atom(A) ->
atom_to_list(A);

View File

@ -92,7 +92,7 @@ the rule.
"""
})}
, {egress,
sc(hoconsc:map(id, ref("egress")),
sc(ref("egress"),
#{ default => #{}
, desc => """
The egress config defines how this bridge forwards messages from the local broker to the remote

View File

@ -101,14 +101,12 @@
-export([msg_marshaller/1]).
-export_type([ config/0
, batch/0
, ack_ref/0
]).
-type id() :: atom() | string() | pid().
-type qos() :: emqx_types:qos().
-type config() :: map().
-type batch() :: [emqx_connector_mqtt_msg:exp_msg()].
-type ack_ref() :: term().
-type topic() :: emqx_types:topic().
@ -117,7 +115,7 @@
%% same as default in-flight limit for emqtt
-define(DEFAULT_BATCH_SIZE, 32).
-define(DEFAULT_INFLIGHT_SIZE, 32).
-define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
-define(DEFAULT_SEG_BYTES, (1 bsl 20)).
-define(DEFAULT_MAX_TOTAL_SIZE, (1 bsl 31)).
@ -205,12 +203,10 @@ init_state(Opts) ->
ReconnDelayMs = maps:get(reconnect_interval, Opts, ?DEFAULT_RECONNECT_DELAY_MS),
StartType = maps:get(start_type, Opts, manual),
Mountpoint = maps:get(forward_mountpoint, Opts, undefined),
MaxInflightSize = maps:get(max_inflight, Opts, ?DEFAULT_BATCH_SIZE),
BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
MaxInflightSize = maps:get(max_inflight, Opts, ?DEFAULT_INFLIGHT_SIZE),
Name = maps:get(name, Opts, undefined),
#{start_type => StartType,
reconnect_interval => ReconnDelayMs,
batch_size => BatchSize,
mountpoint => format_mountpoint(Mountpoint),
inflight => [],
max_inflight => MaxInflightSize,
@ -327,10 +323,6 @@ common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := F
{keep_state_and_data, [{reply, From, Forwards}]};
common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) ->
{keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]};
common(_StateName, info, {deliver, _, Msg}, State = #{replayq := Q}) ->
Msgs = collect([Msg]),
NewQ = replayq:append(Q, Msgs),
{keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}};
common(_StateName, info, {'EXIT', _, _}, State) ->
{keep_state, State};
common(_StateName, cast, {send_to_remote, Msg}, #{replayq := Q} = State) ->
@ -342,13 +334,9 @@ common(StateName, Type, Content, #{name := Name} = State) ->
content => Content}),
{keep_state, State}.
do_connect(#{connect_opts := ConnectOpts = #{forwards := Forwards},
do_connect(#{connect_opts := ConnectOpts,
inflight := Inflight,
name := Name} = State) ->
case Forwards of
undefined -> ok;
#{from_local_topic := Topic} -> from_local_topic(Topic, Name)
end,
case emqx_connector_mqtt_mod:start(ConnectOpts) of
{ok, Conn} ->
?tp(info, connected, #{name => Name, inflight => length(Inflight)}),
@ -360,19 +348,10 @@ do_connect(#{connect_opts := ConnectOpts = #{forwards := Forwards},
{error, Reason, State}
end.
collect(Acc) ->
receive
{deliver, _, Msg} ->
collect([Msg | Acc])
after
0 ->
lists:reverse(Acc)
end.
%% Retry all inflight (previously sent but not acked) batches.
retry_inflight(State, []) -> {ok, State};
retry_inflight(State, [#{q_ack_ref := QAckRef, batch := Batch} | Rest] = OldInf) ->
case do_send(State, QAckRef, Batch) of
retry_inflight(State, [#{q_ack_ref := QAckRef, msg := Msg} | Rest] = OldInf) ->
case do_send(State, QAckRef, Msg) of
{ok, State1} ->
retry_inflight(State1, Rest);
{error, #{inflight := NewInf} = State1} ->
@ -393,34 +372,33 @@ pop_and_send_loop(#{replayq := Q} = State, N) ->
false ->
BatchSize = 1,
Opts = #{count_limit => BatchSize, bytes_limit => 999999999},
{Q1, QAckRef, Batch} = replayq:pop(Q, Opts),
case do_send(State#{replayq := Q1}, QAckRef, Batch) of
{Q1, QAckRef, [Msg]} = replayq:pop(Q, Opts),
case do_send(State#{replayq := Q1}, QAckRef, Msg) of
{ok, NewState} -> pop_and_send_loop(NewState, N - 1);
{error, NewState} -> {error, NewState}
end
end.
%% Assert non-empty batch because we have a is_empty check earlier.
do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Batch) ->
do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) ->
?SLOG(error, #{msg => "cannot forward messages to remote broker"
" as egress_channel is not configured",
messages => Batch});
" as forwards is not configured",
messages => Msg});
do_send(#{inflight := Inflight,
connection := Connection,
mountpoint := Mountpoint,
connect_opts := #{forwards := Forwards}} = State, QAckRef, [_ | _] = Batch) ->
connect_opts := #{forwards := Forwards}} = State, QAckRef, Msg) ->
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
ExportMsg = fun(Message) ->
emqx_metrics:inc('bridge.mqtt.message_sent_to_remote'),
emqx_connector_mqtt_msg:to_remote_msg(Message, Vars)
end,
?SLOG(debug, #{msg => "publish to remote broker",
message => Batch, vars => Vars}),
case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(M) || M <- Batch]) of
message => Msg, vars => Vars}),
case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(Msg)]) of
{ok, Refs} ->
{ok, State#{inflight := Inflight ++ [#{q_ack_ref => QAckRef,
send_ack_ref => map_set(Refs),
batch => Batch}]}};
msg => Msg}]}};
{error, Reason} ->
?SLOG(info, #{msg => "mqtt_bridge_produce_failed",
reason => Reason}),
@ -473,27 +451,6 @@ drop_acked_batches(Q, [#{send_ack_ref := Refs,
All
end.
from_local_topic(undefined, _Name) ->
ok;
from_local_topic(Topic, Name) ->
do_subscribe(Topic, Name).
topic(T) -> iolist_to_binary(T).
validate(RawTopic) ->
Topic = topic(RawTopic),
try emqx_topic:validate(Topic) of
_Success -> Topic
catch
error:Reason ->
error({bad_topic, Topic, Reason})
end.
do_subscribe(RawTopic, Name) ->
TopicFilter = validate(RawTopic),
{Topic, SubOpts} = emqx_topic:parse(TopicFilter, #{qos => ?QOS_2}),
emqx_broker:subscribe(Topic, Name, SubOpts).
disconnect(#{connection := Conn} = State) when Conn =/= undefined ->
emqx_connector_mqtt_mod:stop(Conn),
State#{connection => undefined};