Merge pull request #9523 from qzhuyan/fix/william/bridges-ingress-and-egress
Fix/william/bridges ingress and egress
This commit is contained in:
commit
c2eabfe5f7
|
@ -138,7 +138,7 @@ on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
|
|||
{ok, Message}.
|
||||
|
||||
send_to_matched_egress_bridges(Topic, Msg) ->
|
||||
MatchedBridgeIds = get_matched_bridges(Topic),
|
||||
MatchedBridgeIds = get_matched_egress_bridges(Topic),
|
||||
lists:foreach(
|
||||
fun(Id) ->
|
||||
try send_message(Id, Msg) of
|
||||
|
@ -339,13 +339,19 @@ flatten_confs(Conf0) ->
|
|||
do_flatten_confs(Type, Conf0) ->
|
||||
[{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)].
|
||||
|
||||
get_matched_bridges(Topic) ->
|
||||
get_matched_egress_bridges(Topic) ->
|
||||
Bridges = emqx:get_config([bridges], #{}),
|
||||
maps:fold(
|
||||
fun(BType, Conf, Acc0) ->
|
||||
maps:fold(
|
||||
fun(BName, BConf, Acc1) ->
|
||||
get_matched_bridge_id(BType, BConf, Topic, BName, Acc1)
|
||||
fun
|
||||
(BName, #{egress := _} = BConf, Acc1) when BType =:= mqtt ->
|
||||
get_matched_bridge_id(BType, BConf, Topic, BName, Acc1);
|
||||
(_BName, #{ingress := _}, Acc1) when BType =:= mqtt ->
|
||||
%% ignore ingress only bridge
|
||||
Acc1;
|
||||
(BName, BConf, Acc1) ->
|
||||
get_matched_bridge_id(BType, BConf, Topic, BName, Acc1)
|
||||
end,
|
||||
Acc0,
|
||||
Conf
|
||||
|
|
|
@ -34,6 +34,13 @@
|
|||
-define(NAME_MQTT, <<"my_mqtt_bridge">>).
|
||||
-define(BRIDGE_NAME_INGRESS, <<"ingress_mqtt_bridge">>).
|
||||
-define(BRIDGE_NAME_EGRESS, <<"egress_mqtt_bridge">>).
|
||||
|
||||
%% Having ingress/egress prefixs of topic names to avoid dead loop while bridging
|
||||
-define(INGRESS_REMOTE_TOPIC, "ingress_remote_topic").
|
||||
-define(INGRESS_LOCAL_TOPIC, "ingress_local_topic").
|
||||
-define(EGRESS_REMOTE_TOPIC, "egress_remote_topic").
|
||||
-define(EGRESS_LOCAL_TOPIC, "egress_local_topic").
|
||||
|
||||
-define(SERVER_CONF(Username), #{
|
||||
<<"server">> => <<"127.0.0.1:1883">>,
|
||||
<<"username">> => Username,
|
||||
|
@ -44,11 +51,11 @@
|
|||
|
||||
-define(INGRESS_CONF, #{
|
||||
<<"remote">> => #{
|
||||
<<"topic">> => <<"remote_topic/#">>,
|
||||
<<"topic">> => <<?INGRESS_REMOTE_TOPIC, "/#">>,
|
||||
<<"qos">> => 2
|
||||
},
|
||||
<<"local">> => #{
|
||||
<<"topic">> => <<"local_topic/${topic}">>,
|
||||
<<"topic">> => <<?INGRESS_LOCAL_TOPIC, "/${topic}">>,
|
||||
<<"qos">> => <<"${qos}">>,
|
||||
<<"payload">> => <<"${payload}">>,
|
||||
<<"retain">> => <<"${retain}">>
|
||||
|
@ -57,10 +64,10 @@
|
|||
|
||||
-define(EGRESS_CONF, #{
|
||||
<<"local">> => #{
|
||||
<<"topic">> => <<"local_topic/#">>
|
||||
<<"topic">> => <<?EGRESS_LOCAL_TOPIC, "/#">>
|
||||
},
|
||||
<<"remote">> => #{
|
||||
<<"topic">> => <<"remote_topic/${topic}">>,
|
||||
<<"topic">> => <<?EGRESS_REMOTE_TOPIC, "/${topic}">>,
|
||||
<<"payload">> => <<"${payload}">>,
|
||||
<<"qos">> => <<"${qos}">>,
|
||||
<<"retain">> => <<"${retain}">>
|
||||
|
@ -155,8 +162,8 @@ t_mqtt_conn_bridge_ingress(_) ->
|
|||
BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
|
||||
|
||||
%% we now test if the bridge works as expected
|
||||
RemoteTopic = <<"remote_topic/1">>,
|
||||
LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
|
||||
RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
|
||||
LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
|
||||
Payload = <<"hello">>,
|
||||
emqx:subscribe(LocalTopic),
|
||||
timer:sleep(100),
|
||||
|
@ -219,8 +226,8 @@ t_mqtt_conn_bridge_egress(_) ->
|
|||
} = jsx:decode(Bridge),
|
||||
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
||||
%% we now test if the bridge works as expected
|
||||
LocalTopic = <<"local_topic/1">>,
|
||||
RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
|
||||
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
||||
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
||||
Payload = <<"hello">>,
|
||||
emqx:subscribe(RemoteTopic),
|
||||
timer:sleep(100),
|
||||
|
@ -264,6 +271,113 @@ t_mqtt_conn_bridge_egress(_) ->
|
|||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||
ok.
|
||||
|
||||
t_mqtt_conn_bridge_ingress_and_egress(_) ->
|
||||
User1 = <<"user1">>,
|
||||
%% create an MQTT bridge, using POST
|
||||
{ok, 201, Bridge} = request(
|
||||
post,
|
||||
uri(["bridges"]),
|
||||
?SERVER_CONF(User1)#{
|
||||
<<"type">> => ?TYPE_MQTT,
|
||||
<<"name">> => ?BRIDGE_NAME_INGRESS,
|
||||
<<"ingress">> => ?INGRESS_CONF
|
||||
}
|
||||
),
|
||||
|
||||
#{
|
||||
<<"type">> := ?TYPE_MQTT,
|
||||
<<"name">> := ?BRIDGE_NAME_INGRESS
|
||||
} = jsx:decode(Bridge),
|
||||
BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
|
||||
{ok, 201, Bridge2} = request(
|
||||
post,
|
||||
uri(["bridges"]),
|
||||
?SERVER_CONF(User1)#{
|
||||
<<"type">> => ?TYPE_MQTT,
|
||||
<<"name">> => ?BRIDGE_NAME_EGRESS,
|
||||
<<"egress">> => ?EGRESS_CONF
|
||||
}
|
||||
),
|
||||
#{
|
||||
<<"type">> := ?TYPE_MQTT,
|
||||
<<"name">> := ?BRIDGE_NAME_EGRESS
|
||||
} = jsx:decode(Bridge2),
|
||||
|
||||
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
||||
%% we now test if the bridge works as expected
|
||||
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
||||
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
||||
Payload = <<"hello">>,
|
||||
emqx:subscribe(RemoteTopic),
|
||||
|
||||
{ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
||||
#{
|
||||
<<"metrics">> := #{
|
||||
<<"matched">> := CntMatched1, <<"success">> := CntSuccess1, <<"failed">> := 0
|
||||
},
|
||||
<<"node_metrics">> :=
|
||||
[
|
||||
#{
|
||||
<<"node">> := _,
|
||||
<<"metrics">> :=
|
||||
#{
|
||||
<<"matched">> := NodeCntMatched1,
|
||||
<<"success">> := NodeCntSuccess1,
|
||||
<<"failed">> := 0
|
||||
}
|
||||
}
|
||||
]
|
||||
} = jsx:decode(BridgeStr1),
|
||||
timer:sleep(100),
|
||||
%% PUBLISH a message to the 'local' broker, as we have only one broker,
|
||||
%% the remote broker is also the local one.
|
||||
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
||||
|
||||
%% we should receive a message on the "remote" broker, with specified topic
|
||||
?assert(
|
||||
receive
|
||||
{deliver, RemoteTopic, #message{payload = Payload}} ->
|
||||
ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
|
||||
true;
|
||||
Msg ->
|
||||
ct:pal("Msg: ~p", [Msg]),
|
||||
false
|
||||
after 100 ->
|
||||
false
|
||||
end
|
||||
),
|
||||
|
||||
%% verify the metrics of the bridge
|
||||
timer:sleep(1000),
|
||||
{ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
||||
#{
|
||||
<<"metrics">> := #{
|
||||
<<"matched">> := CntMatched2, <<"success">> := CntSuccess2, <<"failed">> := 0
|
||||
},
|
||||
<<"node_metrics">> :=
|
||||
[
|
||||
#{
|
||||
<<"node">> := _,
|
||||
<<"metrics">> :=
|
||||
#{
|
||||
<<"matched">> := NodeCntMatched2,
|
||||
<<"success">> := NodeCntSuccess2,
|
||||
<<"failed">> := 0
|
||||
}
|
||||
}
|
||||
]
|
||||
} = jsx:decode(BridgeStr2),
|
||||
?assertEqual(CntMatched2, CntMatched1 + 1),
|
||||
?assertEqual(CntSuccess2, CntSuccess1 + 1),
|
||||
?assertEqual(NodeCntMatched2, NodeCntMatched1 + 1),
|
||||
?assertEqual(NodeCntSuccess2, NodeCntSuccess1 + 1),
|
||||
|
||||
%% delete the bridge
|
||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
|
||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||
ok.
|
||||
|
||||
t_ingress_mqtt_bridge_with_rules(_) ->
|
||||
{ok, 201, _} = request(
|
||||
post,
|
||||
|
@ -290,8 +404,8 @@ t_ingress_mqtt_bridge_with_rules(_) ->
|
|||
|
||||
%% we now test if the bridge works as expected
|
||||
|
||||
RemoteTopic = <<"remote_topic/1">>,
|
||||
LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
|
||||
RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
|
||||
LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
|
||||
Payload = <<"hello">>,
|
||||
emqx:subscribe(LocalTopic),
|
||||
timer:sleep(100),
|
||||
|
@ -400,8 +514,8 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
|||
#{<<"id">> := RuleId} = jsx:decode(Rule),
|
||||
|
||||
%% we now test if the bridge works as expected
|
||||
LocalTopic = <<"local_topic/1">>,
|
||||
RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
|
||||
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
||||
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
||||
Payload = <<"hello">>,
|
||||
emqx:subscribe(RemoteTopic),
|
||||
timer:sleep(100),
|
||||
|
@ -426,7 +540,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
|||
%% PUBLISH a message to the rule.
|
||||
Payload2 = <<"hi">>,
|
||||
RuleTopic = <<"t/1">>,
|
||||
RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>,
|
||||
RemoteTopic2 = <<?EGRESS_REMOTE_TOPIC, "/", RuleTopic/binary>>,
|
||||
emqx:subscribe(RemoteTopic2),
|
||||
timer:sleep(100),
|
||||
emqx:publish(emqx_message:make(RuleTopic, Payload2)),
|
||||
|
@ -517,8 +631,8 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
|||
} = jsx:decode(Bridge),
|
||||
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
||||
%% we now test if the bridge works as expected
|
||||
LocalTopic = <<"local_topic/1">>,
|
||||
RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
|
||||
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
||||
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
||||
Payload0 = <<"hello">>,
|
||||
emqx:subscribe(RemoteTopic),
|
||||
timer:sleep(100),
|
||||
|
|
|
@ -52,3 +52,5 @@ Please note, the request body of `/bridges` API to configure MQTT brdige is chan
|
|||
- Return `400` if query param `node` is not a known node in `/trace/:id/download?node={node}` [#9478](https://github.com/emqx/emqx/pull/9478).
|
||||
|
||||
- `POST /traces` to return `409` in case of duplicate [#9494](https://github.com/emqx/emqx/pull/9494).
|
||||
|
||||
- Fix bridging function, when both ingress and egress bridges are configured, egress bridge does not work [#9523](https://github.com/emqx/emqx/pull/9523).
|
||||
|
|
|
@ -50,3 +50,5 @@ v5.0.11 或更早版本创建的配置文件,在新版本中会被自动转换
|
|||
- 如果在调用 `/trace/:id/download?node={node}` 时,`node` 不存在,则会返回 `400` [#9478](https://github.com/emqx/emqx/pull/9478)。
|
||||
|
||||
- 当重复调用 `POST /traces` 时,将会返回 `409` ,而不再是 `400` [#9494](https://github.com/emqx/emqx/pull/9494)。
|
||||
|
||||
- 桥接功能修复,当同时配置了2个桥,方向为入桥和出桥时,出桥不工作的问题。[#9523](https://github.com/emqx/emqx/pull/9523).
|
||||
|
|
Loading…
Reference in New Issue