diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 42d216c7c..07e0a7def 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -138,7 +138,7 @@ on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> {ok, Message}. send_to_matched_egress_bridges(Topic, Msg) -> - MatchedBridgeIds = get_matched_bridges(Topic), + MatchedBridgeIds = get_matched_egress_bridges(Topic), lists:foreach( fun(Id) -> try send_message(Id, Msg) of @@ -339,13 +339,19 @@ flatten_confs(Conf0) -> do_flatten_confs(Type, Conf0) -> [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)]. -get_matched_bridges(Topic) -> +get_matched_egress_bridges(Topic) -> Bridges = emqx:get_config([bridges], #{}), maps:fold( fun(BType, Conf, Acc0) -> maps:fold( - fun(BName, BConf, Acc1) -> - get_matched_bridge_id(BType, BConf, Topic, BName, Acc1) + fun + (BName, #{egress := _} = BConf, Acc1) when BType =:= mqtt -> + get_matched_bridge_id(BType, BConf, Topic, BName, Acc1); + (_BName, #{ingress := _}, Acc1) when BType =:= mqtt -> + %% ignore ingress only bridge + Acc1; + (BName, BConf, Acc1) -> + get_matched_bridge_id(BType, BConf, Topic, BName, Acc1) end, Acc0, Conf diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index c907205f1..ec3caff7d 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -34,6 +34,13 @@ -define(NAME_MQTT, <<"my_mqtt_bridge">>). -define(BRIDGE_NAME_INGRESS, <<"ingress_mqtt_bridge">>). -define(BRIDGE_NAME_EGRESS, <<"egress_mqtt_bridge">>). + +%% Having ingress/egress prefixs of topic names to avoid dead loop while bridging +-define(INGRESS_REMOTE_TOPIC, "ingress_remote_topic"). +-define(INGRESS_LOCAL_TOPIC, "ingress_local_topic"). +-define(EGRESS_REMOTE_TOPIC, "egress_remote_topic"). +-define(EGRESS_LOCAL_TOPIC, "egress_local_topic"). + -define(SERVER_CONF(Username), #{ <<"server">> => <<"127.0.0.1:1883">>, <<"username">> => Username, @@ -44,11 +51,11 @@ -define(INGRESS_CONF, #{ <<"remote">> => #{ - <<"topic">> => <<"remote_topic/#">>, + <<"topic">> => <>, <<"qos">> => 2 }, <<"local">> => #{ - <<"topic">> => <<"local_topic/${topic}">>, + <<"topic">> => <>, <<"qos">> => <<"${qos}">>, <<"payload">> => <<"${payload}">>, <<"retain">> => <<"${retain}">> @@ -57,10 +64,10 @@ -define(EGRESS_CONF, #{ <<"local">> => #{ - <<"topic">> => <<"local_topic/#">> + <<"topic">> => <> }, <<"remote">> => #{ - <<"topic">> => <<"remote_topic/${topic}">>, + <<"topic">> => <>, <<"payload">> => <<"${payload}">>, <<"qos">> => <<"${qos}">>, <<"retain">> => <<"${retain}">> @@ -155,8 +162,8 @@ t_mqtt_conn_bridge_ingress(_) -> BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS), %% we now test if the bridge works as expected - RemoteTopic = <<"remote_topic/1">>, - LocalTopic = <<"local_topic/", RemoteTopic/binary>>, + RemoteTopic = <>, + LocalTopic = <>, Payload = <<"hello">>, emqx:subscribe(LocalTopic), timer:sleep(100), @@ -219,8 +226,8 @@ t_mqtt_conn_bridge_egress(_) -> } = jsx:decode(Bridge), BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), %% we now test if the bridge works as expected - LocalTopic = <<"local_topic/1">>, - RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, + LocalTopic = <>, + RemoteTopic = <>, Payload = <<"hello">>, emqx:subscribe(RemoteTopic), timer:sleep(100), @@ -264,6 +271,113 @@ t_mqtt_conn_bridge_egress(_) -> {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), ok. +t_mqtt_conn_bridge_ingress_and_egress(_) -> + User1 = <<"user1">>, + %% create an MQTT bridge, using POST + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(User1)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_INGRESS, + <<"ingress">> => ?INGRESS_CONF + } + ), + + #{ + <<"type">> := ?TYPE_MQTT, + <<"name">> := ?BRIDGE_NAME_INGRESS + } = jsx:decode(Bridge), + BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS), + {ok, 201, Bridge2} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(User1)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_EGRESS, + <<"egress">> => ?EGRESS_CONF + } + ), + #{ + <<"type">> := ?TYPE_MQTT, + <<"name">> := ?BRIDGE_NAME_EGRESS + } = jsx:decode(Bridge2), + + BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + %% we now test if the bridge works as expected + LocalTopic = <>, + RemoteTopic = <>, + Payload = <<"hello">>, + emqx:subscribe(RemoteTopic), + + {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []), + #{ + <<"metrics">> := #{ + <<"matched">> := CntMatched1, <<"success">> := CntSuccess1, <<"failed">> := 0 + }, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := + #{ + <<"matched">> := NodeCntMatched1, + <<"success">> := NodeCntSuccess1, + <<"failed">> := 0 + } + } + ] + } = jsx:decode(BridgeStr1), + timer:sleep(100), + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload)), + + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic, #message{payload = Payload}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end + ), + + %% verify the metrics of the bridge + timer:sleep(1000), + {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []), + #{ + <<"metrics">> := #{ + <<"matched">> := CntMatched2, <<"success">> := CntSuccess2, <<"failed">> := 0 + }, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := + #{ + <<"matched">> := NodeCntMatched2, + <<"success">> := NodeCntSuccess2, + <<"failed">> := 0 + } + } + ] + } = jsx:decode(BridgeStr2), + ?assertEqual(CntMatched2, CntMatched1 + 1), + ?assertEqual(CntSuccess2, CntSuccess1 + 1), + ?assertEqual(NodeCntMatched2, NodeCntMatched1 + 1), + ?assertEqual(NodeCntSuccess2, NodeCntSuccess1 + 1), + + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + ok. + t_ingress_mqtt_bridge_with_rules(_) -> {ok, 201, _} = request( post, @@ -290,8 +404,8 @@ t_ingress_mqtt_bridge_with_rules(_) -> %% we now test if the bridge works as expected - RemoteTopic = <<"remote_topic/1">>, - LocalTopic = <<"local_topic/", RemoteTopic/binary>>, + RemoteTopic = <>, + LocalTopic = <>, Payload = <<"hello">>, emqx:subscribe(LocalTopic), timer:sleep(100), @@ -400,8 +514,8 @@ t_egress_mqtt_bridge_with_rules(_) -> #{<<"id">> := RuleId} = jsx:decode(Rule), %% we now test if the bridge works as expected - LocalTopic = <<"local_topic/1">>, - RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, + LocalTopic = <>, + RemoteTopic = <>, Payload = <<"hello">>, emqx:subscribe(RemoteTopic), timer:sleep(100), @@ -426,7 +540,7 @@ t_egress_mqtt_bridge_with_rules(_) -> %% PUBLISH a message to the rule. Payload2 = <<"hi">>, RuleTopic = <<"t/1">>, - RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>, + RemoteTopic2 = <>, emqx:subscribe(RemoteTopic2), timer:sleep(100), emqx:publish(emqx_message:make(RuleTopic, Payload2)), @@ -517,8 +631,8 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> } = jsx:decode(Bridge), BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), %% we now test if the bridge works as expected - LocalTopic = <<"local_topic/1">>, - RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, + LocalTopic = <>, + RemoteTopic = <>, Payload0 = <<"hello">>, emqx:subscribe(RemoteTopic), timer:sleep(100), diff --git a/changes/v5.0.12-en.md b/changes/v5.0.12-en.md index 3363d9766..08b444c0f 100644 --- a/changes/v5.0.12-en.md +++ b/changes/v5.0.12-en.md @@ -52,3 +52,5 @@ Please note, the request body of `/bridges` API to configure MQTT brdige is chan - Return `400` if query param `node` is not a known node in `/trace/:id/download?node={node}` [#9478](https://github.com/emqx/emqx/pull/9478). - `POST /traces` to return `409` in case of duplicate [#9494](https://github.com/emqx/emqx/pull/9494). + +- Fix bridging function, when both ingress and egress bridges are configured, egress bridge does not work [#9523](https://github.com/emqx/emqx/pull/9523). diff --git a/changes/v5.0.12-zh.md b/changes/v5.0.12-zh.md index ffbb713c1..91edb8657 100644 --- a/changes/v5.0.12-zh.md +++ b/changes/v5.0.12-zh.md @@ -50,3 +50,5 @@ v5.0.11 或更早版本创建的配置文件,在新版本中会被自动转换 - 如果在调用 `/trace/:id/download?node={node}` 时,`node` 不存在,则会返回 `400` [#9478](https://github.com/emqx/emqx/pull/9478)。 - 当重复调用 `POST /traces` 时,将会返回 `409` ,而不再是 `400` [#9494](https://github.com/emqx/emqx/pull/9494)。 + +- 桥接功能修复,当同时配置了2个桥,方向为入桥和出桥时,出桥不工作的问题。[#9523](https://github.com/emqx/emqx/pull/9523).