refactor: reuse some parts of test code for brewity
This commit is contained in:
parent
f0395be383
commit
ad88938d34
|
@ -32,7 +32,6 @@
|
||||||
|
|
||||||
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
|
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
|
||||||
-define(TYPE_MQTT, <<"mqtt">>).
|
-define(TYPE_MQTT, <<"mqtt">>).
|
||||||
-define(NAME_MQTT, <<"my_mqtt_bridge">>).
|
|
||||||
-define(BRIDGE_NAME_INGRESS, <<"ingress_mqtt_bridge">>).
|
-define(BRIDGE_NAME_INGRESS, <<"ingress_mqtt_bridge">>).
|
||||||
-define(BRIDGE_NAME_EGRESS, <<"egress_mqtt_bridge">>).
|
-define(BRIDGE_NAME_EGRESS, <<"egress_mqtt_bridge">>).
|
||||||
|
|
||||||
|
@ -98,6 +97,24 @@
|
||||||
}
|
}
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-define(assertMetrics(Pat, BridgeID),
|
||||||
|
?assertMetrics(Pat, true, BridgeID)
|
||||||
|
).
|
||||||
|
-define(assertMetrics(Pat, Guard, BridgeID),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"metrics">> := Pat,
|
||||||
|
<<"node_metrics">> := [
|
||||||
|
#{
|
||||||
|
<<"node">> := _,
|
||||||
|
<<"metrics">> := Pat
|
||||||
|
}
|
||||||
|
]
|
||||||
|
} when Guard,
|
||||||
|
request_bridge_metrics(BridgeID)
|
||||||
|
)
|
||||||
|
).
|
||||||
|
|
||||||
inspect(Selected, _Envs, _Args) ->
|
inspect(Selected, _Envs, _Args) ->
|
||||||
persistent_term:put(?MODULE, #{inspect => Selected}).
|
persistent_term:put(?MODULE, #{inspect => Selected}).
|
||||||
|
|
||||||
|
@ -176,7 +193,7 @@ t_mqtt_conn_bridge_ingress(_) ->
|
||||||
{ok, 201, Bridge} = request(
|
{ok, 201, Bridge} = request(
|
||||||
post,
|
post,
|
||||||
uri(["bridges"]),
|
uri(["bridges"]),
|
||||||
?SERVER_CONF(User1)#{
|
ServerConf = ?SERVER_CONF(User1)#{
|
||||||
<<"type">> => ?TYPE_MQTT,
|
<<"type">> => ?TYPE_MQTT,
|
||||||
<<"name">> => ?BRIDGE_NAME_INGRESS,
|
<<"name">> => ?BRIDGE_NAME_INGRESS,
|
||||||
<<"ingress">> => ?INGRESS_CONF
|
<<"ingress">> => ?INGRESS_CONF
|
||||||
|
@ -186,6 +203,7 @@ t_mqtt_conn_bridge_ingress(_) ->
|
||||||
<<"type">> := ?TYPE_MQTT,
|
<<"type">> := ?TYPE_MQTT,
|
||||||
<<"name">> := ?BRIDGE_NAME_INGRESS
|
<<"name">> := ?BRIDGE_NAME_INGRESS
|
||||||
} = jsx:decode(Bridge),
|
} = jsx:decode(Bridge),
|
||||||
|
|
||||||
BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
|
BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
|
||||||
|
|
||||||
%% we now test if the bridge works as expected
|
%% we now test if the bridge works as expected
|
||||||
|
@ -198,34 +216,12 @@ t_mqtt_conn_bridge_ingress(_) ->
|
||||||
%% the remote broker is also the local one.
|
%% the remote broker is also the local one.
|
||||||
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
||||||
%% we should receive a message on the local broker, with specified topic
|
%% we should receive a message on the local broker, with specified topic
|
||||||
?assert(
|
assert_mqtt_msg_received(LocalTopic, Payload),
|
||||||
receive
|
|
||||||
{deliver, LocalTopic, #message{payload = Payload}} ->
|
|
||||||
ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
|
|
||||||
true;
|
|
||||||
Msg ->
|
|
||||||
ct:pal("Msg: ~p", [Msg]),
|
|
||||||
false
|
|
||||||
after 100 ->
|
|
||||||
false
|
|
||||||
end
|
|
||||||
),
|
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
{ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []),
|
?assertMetrics(
|
||||||
?assertMatch(
|
#{<<"matched">> := 0, <<"received">> := 1},
|
||||||
#{
|
BridgeIDIngress
|
||||||
<<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
|
|
||||||
<<"node_metrics">> :=
|
|
||||||
[
|
|
||||||
#{
|
|
||||||
<<"node">> := _,
|
|
||||||
<<"metrics">> :=
|
|
||||||
#{<<"matched">> := 0, <<"received">> := 1}
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
jsx:decode(BridgeMetricsStr)
|
|
||||||
),
|
),
|
||||||
|
|
||||||
%% delete the bridge
|
%% delete the bridge
|
||||||
|
@ -236,21 +232,13 @@ t_mqtt_conn_bridge_ingress(_) ->
|
||||||
|
|
||||||
t_mqtt_conn_bridge_ingress_no_payload_template(_) ->
|
t_mqtt_conn_bridge_ingress_no_payload_template(_) ->
|
||||||
User1 = <<"user1">>,
|
User1 = <<"user1">>,
|
||||||
%% create an MQTT bridge, using POST
|
BridgeIDIngress = create_bridge(
|
||||||
{ok, 201, Bridge} = request(
|
|
||||||
post,
|
|
||||||
uri(["bridges"]),
|
|
||||||
?SERVER_CONF(User1)#{
|
?SERVER_CONF(User1)#{
|
||||||
<<"type">> => ?TYPE_MQTT,
|
<<"type">> => ?TYPE_MQTT,
|
||||||
<<"name">> => ?BRIDGE_NAME_INGRESS,
|
<<"name">> => ?BRIDGE_NAME_INGRESS,
|
||||||
<<"ingress">> => ?INGRESS_CONF_NO_PAYLOAD_TEMPLATE
|
<<"ingress">> => ?INGRESS_CONF_NO_PAYLOAD_TEMPLATE
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
#{
|
|
||||||
<<"type">> := ?TYPE_MQTT,
|
|
||||||
<<"name">> := ?BRIDGE_NAME_INGRESS
|
|
||||||
} = jsx:decode(Bridge),
|
|
||||||
BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
|
|
||||||
|
|
||||||
%% we now test if the bridge works as expected
|
%% we now test if the bridge works as expected
|
||||||
RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
|
RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
|
||||||
|
@ -262,40 +250,13 @@ t_mqtt_conn_bridge_ingress_no_payload_template(_) ->
|
||||||
%% the remote broker is also the local one.
|
%% the remote broker is also the local one.
|
||||||
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
||||||
%% we should receive a message on the local broker, with specified topic
|
%% we should receive a message on the local broker, with specified topic
|
||||||
?assert(
|
Msg = assert_mqtt_msg_received(LocalTopic),
|
||||||
receive
|
?assertMatch(#{<<"payload">> := Payload}, jsx:decode(Msg#message.payload)),
|
||||||
{deliver, LocalTopic, #message{payload = MapMsg}} ->
|
|
||||||
ct:pal("local broker got message: ~p on topic ~p", [MapMsg, LocalTopic]),
|
|
||||||
%% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here.
|
|
||||||
case jsx:decode(MapMsg) of
|
|
||||||
#{<<"payload">> := Payload} ->
|
|
||||||
true;
|
|
||||||
_ ->
|
|
||||||
false
|
|
||||||
end;
|
|
||||||
Msg ->
|
|
||||||
ct:pal("Msg: ~p", [Msg]),
|
|
||||||
false
|
|
||||||
after 100 ->
|
|
||||||
false
|
|
||||||
end
|
|
||||||
),
|
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []),
|
?assertMetrics(
|
||||||
?assertMatch(
|
#{<<"matched">> := 0, <<"received">> := 1},
|
||||||
#{
|
BridgeIDIngress
|
||||||
<<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
|
|
||||||
<<"node_metrics">> :=
|
|
||||||
[
|
|
||||||
#{
|
|
||||||
<<"node">> := _,
|
|
||||||
<<"metrics">> :=
|
|
||||||
#{<<"matched">> := 0, <<"received">> := 1}
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
jsx:decode(BridgeStr)
|
|
||||||
),
|
),
|
||||||
|
|
||||||
%% delete the bridge
|
%% delete the bridge
|
||||||
|
@ -307,22 +268,15 @@ t_mqtt_conn_bridge_ingress_no_payload_template(_) ->
|
||||||
t_mqtt_conn_bridge_egress(_) ->
|
t_mqtt_conn_bridge_egress(_) ->
|
||||||
%% then we add a mqtt connector, using POST
|
%% then we add a mqtt connector, using POST
|
||||||
User1 = <<"user1">>,
|
User1 = <<"user1">>,
|
||||||
|
BridgeIDEgress = create_bridge(
|
||||||
{ok, 201, Bridge} = request(
|
|
||||||
post,
|
|
||||||
uri(["bridges"]),
|
|
||||||
?SERVER_CONF(User1)#{
|
?SERVER_CONF(User1)#{
|
||||||
<<"type">> => ?TYPE_MQTT,
|
<<"type">> => ?TYPE_MQTT,
|
||||||
<<"name">> => ?BRIDGE_NAME_EGRESS,
|
<<"name">> => ?BRIDGE_NAME_EGRESS,
|
||||||
<<"egress">> => ?EGRESS_CONF
|
<<"egress">> => ?EGRESS_CONF
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
#{
|
|
||||||
<<"type">> := ?TYPE_MQTT,
|
|
||||||
<<"name">> := ?BRIDGE_NAME_EGRESS
|
|
||||||
} = jsx:decode(Bridge),
|
|
||||||
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
|
||||||
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
||||||
|
|
||||||
%% we now test if the bridge works as expected
|
%% we now test if the bridge works as expected
|
||||||
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
||||||
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
||||||
|
@ -334,36 +288,14 @@ t_mqtt_conn_bridge_egress(_) ->
|
||||||
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
||||||
|
|
||||||
%% we should receive a message on the "remote" broker, with specified topic
|
%% we should receive a message on the "remote" broker, with specified topic
|
||||||
?assert(
|
Msg = assert_mqtt_msg_received(RemoteTopic, Payload),
|
||||||
receive
|
Size = byte_size(ResourceID),
|
||||||
{deliver, RemoteTopic, #message{payload = Payload, from = From}} ->
|
?assertMatch(<<ResourceID:Size/binary, _/binary>>, Msg#message.from),
|
||||||
ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
|
|
||||||
Size = byte_size(ResourceID),
|
|
||||||
?assertMatch(<<ResourceID:Size/binary, _/binary>>, From),
|
|
||||||
true;
|
|
||||||
Msg ->
|
|
||||||
ct:pal("Msg: ~p", [Msg]),
|
|
||||||
false
|
|
||||||
after 100 ->
|
|
||||||
false
|
|
||||||
end
|
|
||||||
),
|
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
{ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
|
?assertMetrics(
|
||||||
?assertMatch(
|
#{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
|
||||||
#{
|
BridgeIDEgress
|
||||||
<<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
|
|
||||||
<<"node_metrics">> :=
|
|
||||||
[
|
|
||||||
#{
|
|
||||||
<<"node">> := _,
|
|
||||||
<<"metrics">> :=
|
|
||||||
#{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
jsx:decode(BridgeMetricsStr)
|
|
||||||
),
|
),
|
||||||
|
|
||||||
%% delete the bridge
|
%% delete the bridge
|
||||||
|
@ -375,21 +307,15 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
|
||||||
%% then we add a mqtt connector, using POST
|
%% then we add a mqtt connector, using POST
|
||||||
User1 = <<"user1">>,
|
User1 = <<"user1">>,
|
||||||
|
|
||||||
{ok, 201, Bridge} = request(
|
BridgeIDEgress = create_bridge(
|
||||||
post,
|
|
||||||
uri(["bridges"]),
|
|
||||||
?SERVER_CONF(User1)#{
|
?SERVER_CONF(User1)#{
|
||||||
<<"type">> => ?TYPE_MQTT,
|
<<"type">> => ?TYPE_MQTT,
|
||||||
<<"name">> => ?BRIDGE_NAME_EGRESS,
|
<<"name">> => ?BRIDGE_NAME_EGRESS,
|
||||||
<<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE
|
<<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
#{
|
|
||||||
<<"type">> := ?TYPE_MQTT,
|
|
||||||
<<"name">> := ?BRIDGE_NAME_EGRESS
|
|
||||||
} = jsx:decode(Bridge),
|
|
||||||
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
|
||||||
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
||||||
|
|
||||||
%% we now test if the bridge works as expected
|
%% we now test if the bridge works as expected
|
||||||
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
||||||
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
||||||
|
@ -401,42 +327,15 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
|
||||||
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
||||||
|
|
||||||
%% we should receive a message on the "remote" broker, with specified topic
|
%% we should receive a message on the "remote" broker, with specified topic
|
||||||
?assert(
|
Msg = assert_mqtt_msg_received(RemoteTopic),
|
||||||
receive
|
%% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here.
|
||||||
{deliver, RemoteTopic, #message{payload = MapMsg, from = From}} ->
|
?assertMatch(<<ResourceID:(byte_size(ResourceID))/binary, _/binary>>, Msg#message.from),
|
||||||
ct:pal("local broker got message: ~p on topic ~p", [MapMsg, RemoteTopic]),
|
?assertMatch(#{<<"payload">> := Payload}, jsx:decode(Msg#message.payload)),
|
||||||
%% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here.
|
|
||||||
Size = byte_size(ResourceID),
|
|
||||||
?assertMatch(<<ResourceID:Size/binary, _/binary>>, From),
|
|
||||||
case jsx:decode(MapMsg) of
|
|
||||||
#{<<"payload">> := Payload} ->
|
|
||||||
true;
|
|
||||||
_ ->
|
|
||||||
false
|
|
||||||
end;
|
|
||||||
Msg ->
|
|
||||||
ct:pal("Msg: ~p", [Msg]),
|
|
||||||
false
|
|
||||||
after 100 ->
|
|
||||||
false
|
|
||||||
end
|
|
||||||
),
|
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
|
?assertMetrics(
|
||||||
?assertMatch(
|
#{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
|
||||||
#{
|
BridgeIDEgress
|
||||||
<<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
|
|
||||||
<<"node_metrics">> :=
|
|
||||||
[
|
|
||||||
#{
|
|
||||||
<<"node">> := _,
|
|
||||||
<<"metrics">> :=
|
|
||||||
#{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
jsx:decode(BridgeStr)
|
|
||||||
),
|
),
|
||||||
|
|
||||||
%% delete the bridge
|
%% delete the bridge
|
||||||
|
@ -447,9 +346,7 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
|
||||||
|
|
||||||
t_egress_custom_clientid_prefix(_Config) ->
|
t_egress_custom_clientid_prefix(_Config) ->
|
||||||
User1 = <<"user1">>,
|
User1 = <<"user1">>,
|
||||||
{ok, 201, Bridge} = request(
|
BridgeIDEgress = create_bridge(
|
||||||
post,
|
|
||||||
uri(["bridges"]),
|
|
||||||
?SERVER_CONF(User1)#{
|
?SERVER_CONF(User1)#{
|
||||||
<<"clientid_prefix">> => <<"my-custom-prefix">>,
|
<<"clientid_prefix">> => <<"my-custom-prefix">>,
|
||||||
<<"type">> => ?TYPE_MQTT,
|
<<"type">> => ?TYPE_MQTT,
|
||||||
|
@ -457,11 +354,6 @@ t_egress_custom_clientid_prefix(_Config) ->
|
||||||
<<"egress">> => ?EGRESS_CONF
|
<<"egress">> => ?EGRESS_CONF
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
#{
|
|
||||||
<<"type">> := ?TYPE_MQTT,
|
|
||||||
<<"name">> := ?BRIDGE_NAME_EGRESS
|
|
||||||
} = jsx:decode(Bridge),
|
|
||||||
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
|
||||||
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
||||||
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
||||||
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
||||||
|
@ -470,58 +362,36 @@ t_egress_custom_clientid_prefix(_Config) ->
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
||||||
|
|
||||||
receive
|
Msg = assert_mqtt_msg_received(RemoteTopic, Payload),
|
||||||
{deliver, RemoteTopic, #message{from = From}} ->
|
Size = byte_size(ResourceID),
|
||||||
Size = byte_size(ResourceID),
|
?assertMatch(<<"my-custom-prefix:", _ResouceID:Size/binary, _/binary>>, Msg#message.from),
|
||||||
?assertMatch(<<"my-custom-prefix:", _ResouceID:Size/binary, _/binary>>, From),
|
|
||||||
ok
|
|
||||||
after 1000 ->
|
|
||||||
ct:fail("should have published message")
|
|
||||||
end,
|
|
||||||
|
|
||||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_mqtt_conn_bridge_ingress_and_egress(_) ->
|
t_mqtt_conn_bridge_ingress_and_egress(_) ->
|
||||||
User1 = <<"user1">>,
|
User1 = <<"user1">>,
|
||||||
%% create an MQTT bridge, using POST
|
BridgeIDIngress = create_bridge(
|
||||||
{ok, 201, Bridge} = request(
|
|
||||||
post,
|
|
||||||
uri(["bridges"]),
|
|
||||||
?SERVER_CONF(User1)#{
|
?SERVER_CONF(User1)#{
|
||||||
<<"type">> => ?TYPE_MQTT,
|
<<"type">> => ?TYPE_MQTT,
|
||||||
<<"name">> => ?BRIDGE_NAME_INGRESS,
|
<<"name">> => ?BRIDGE_NAME_INGRESS,
|
||||||
<<"ingress">> => ?INGRESS_CONF
|
<<"ingress">> => ?INGRESS_CONF
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
BridgeIDEgress = create_bridge(
|
||||||
#{
|
|
||||||
<<"type">> := ?TYPE_MQTT,
|
|
||||||
<<"name">> := ?BRIDGE_NAME_INGRESS
|
|
||||||
} = jsx:decode(Bridge),
|
|
||||||
BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
|
|
||||||
{ok, 201, Bridge2} = request(
|
|
||||||
post,
|
|
||||||
uri(["bridges"]),
|
|
||||||
?SERVER_CONF(User1)#{
|
?SERVER_CONF(User1)#{
|
||||||
<<"type">> => ?TYPE_MQTT,
|
<<"type">> => ?TYPE_MQTT,
|
||||||
<<"name">> => ?BRIDGE_NAME_EGRESS,
|
<<"name">> => ?BRIDGE_NAME_EGRESS,
|
||||||
<<"egress">> => ?EGRESS_CONF
|
<<"egress">> => ?EGRESS_CONF
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
#{
|
|
||||||
<<"type">> := ?TYPE_MQTT,
|
|
||||||
<<"name">> := ?BRIDGE_NAME_EGRESS
|
|
||||||
} = jsx:decode(Bridge2),
|
|
||||||
|
|
||||||
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
|
||||||
%% we now test if the bridge works as expected
|
%% we now test if the bridge works as expected
|
||||||
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
||||||
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
||||||
Payload = <<"hello">>,
|
Payload = <<"hello">>,
|
||||||
emqx:subscribe(RemoteTopic),
|
emqx:subscribe(RemoteTopic),
|
||||||
|
|
||||||
{ok, 200, BridgeMetricsStr1} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
|
|
||||||
#{
|
#{
|
||||||
<<"metrics">> := #{
|
<<"metrics">> := #{
|
||||||
<<"matched">> := CntMatched1, <<"success">> := CntSuccess1, <<"failed">> := 0
|
<<"matched">> := CntMatched1, <<"success">> := CntSuccess1, <<"failed">> := 0
|
||||||
|
@ -538,29 +408,17 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
} = jsx:decode(BridgeMetricsStr1),
|
} = request_bridge_metrics(BridgeIDEgress),
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
%% PUBLISH a message to the 'local' broker, as we have only one broker,
|
%% PUBLISH a message to the 'local' broker, as we have only one broker,
|
||||||
%% the remote broker is also the local one.
|
%% the remote broker is also the local one.
|
||||||
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
||||||
|
|
||||||
%% we should receive a message on the "remote" broker, with specified topic
|
%% we should receive a message on the "remote" broker, with specified topic
|
||||||
?assert(
|
assert_mqtt_msg_received(RemoteTopic, Payload),
|
||||||
receive
|
|
||||||
{deliver, RemoteTopic, #message{payload = Payload}} ->
|
|
||||||
ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
|
|
||||||
true;
|
|
||||||
Msg ->
|
|
||||||
ct:pal("Msg: ~p", [Msg]),
|
|
||||||
false
|
|
||||||
after 100 ->
|
|
||||||
false
|
|
||||||
end
|
|
||||||
),
|
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
timer:sleep(1000),
|
timer:sleep(1000),
|
||||||
{ok, 200, BridgeMetricsStr2} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
|
|
||||||
#{
|
#{
|
||||||
<<"metrics">> := #{
|
<<"metrics">> := #{
|
||||||
<<"matched">> := CntMatched2, <<"success">> := CntSuccess2, <<"failed">> := 0
|
<<"matched">> := CntMatched2, <<"success">> := CntSuccess2, <<"failed">> := 0
|
||||||
|
@ -577,7 +435,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
} = jsx:decode(BridgeMetricsStr2),
|
} = request_bridge_metrics(BridgeIDEgress),
|
||||||
?assertEqual(CntMatched2, CntMatched1 + 1),
|
?assertEqual(CntMatched2, CntMatched1 + 1),
|
||||||
?assertEqual(CntSuccess2, CntSuccess1 + 1),
|
?assertEqual(CntSuccess2, CntSuccess1 + 1),
|
||||||
?assertEqual(NodeCntMatched2, NodeCntMatched1 + 1),
|
?assertEqual(NodeCntMatched2, NodeCntMatched1 + 1),
|
||||||
|
@ -590,16 +448,13 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_ingress_mqtt_bridge_with_rules(_) ->
|
t_ingress_mqtt_bridge_with_rules(_) ->
|
||||||
{ok, 201, _} = request(
|
BridgeIDIngress = create_bridge(
|
||||||
post,
|
|
||||||
uri(["bridges"]),
|
|
||||||
?SERVER_CONF(<<"user1">>)#{
|
?SERVER_CONF(<<"user1">>)#{
|
||||||
<<"type">> => ?TYPE_MQTT,
|
<<"type">> => ?TYPE_MQTT,
|
||||||
<<"name">> => ?BRIDGE_NAME_INGRESS,
|
<<"name">> => ?BRIDGE_NAME_INGRESS,
|
||||||
<<"ingress">> => ?INGRESS_CONF
|
<<"ingress">> => ?INGRESS_CONF
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
|
|
||||||
|
|
||||||
{ok, 201, Rule} = request(
|
{ok, 201, Rule} = request(
|
||||||
post,
|
post,
|
||||||
|
@ -624,18 +479,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
|
||||||
%% the remote broker is also the local one.
|
%% the remote broker is also the local one.
|
||||||
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
||||||
%% we should receive a message on the local broker, with specified topic
|
%% we should receive a message on the local broker, with specified topic
|
||||||
?assert(
|
assert_mqtt_msg_received(LocalTopic, Payload),
|
||||||
receive
|
|
||||||
{deliver, LocalTopic, #message{payload = Payload}} ->
|
|
||||||
ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
|
|
||||||
true;
|
|
||||||
Msg ->
|
|
||||||
ct:pal("Msg: ~p", [Msg]),
|
|
||||||
false
|
|
||||||
after 100 ->
|
|
||||||
false
|
|
||||||
end
|
|
||||||
),
|
|
||||||
%% and also the rule should be matched, with matched + 1:
|
%% and also the rule should be matched, with matched + 1:
|
||||||
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
|
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
|
||||||
{ok, 200, Metrics} = request(get, uri(["rules", RuleId, "metrics"]), []),
|
{ok, 200, Metrics} = request(get, uri(["rules", RuleId, "metrics"]), []),
|
||||||
|
@ -680,37 +524,22 @@ t_ingress_mqtt_bridge_with_rules(_) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
{ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []),
|
?assertMetrics(
|
||||||
?assertMatch(
|
#{<<"matched">> := 0, <<"received">> := 1},
|
||||||
#{
|
BridgeIDIngress
|
||||||
<<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
|
|
||||||
<<"node_metrics">> :=
|
|
||||||
[
|
|
||||||
#{
|
|
||||||
<<"node">> := _,
|
|
||||||
<<"metrics">> :=
|
|
||||||
#{<<"matched">> := 0, <<"received">> := 1}
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
jsx:decode(BridgeMetricsStr)
|
|
||||||
),
|
),
|
||||||
|
|
||||||
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
||||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []).
|
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []).
|
||||||
|
|
||||||
t_egress_mqtt_bridge_with_rules(_) ->
|
t_egress_mqtt_bridge_with_rules(_) ->
|
||||||
{ok, 201, Bridge} = request(
|
BridgeIDEgress = create_bridge(
|
||||||
post,
|
|
||||||
uri(["bridges"]),
|
|
||||||
?SERVER_CONF(<<"user1">>)#{
|
?SERVER_CONF(<<"user1">>)#{
|
||||||
<<"type">> => ?TYPE_MQTT,
|
<<"type">> => ?TYPE_MQTT,
|
||||||
<<"name">> => ?BRIDGE_NAME_EGRESS,
|
<<"name">> => ?BRIDGE_NAME_EGRESS,
|
||||||
<<"egress">> => ?EGRESS_CONF
|
<<"egress">> => ?EGRESS_CONF
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
#{<<"type">> := ?TYPE_MQTT, <<"name">> := ?BRIDGE_NAME_EGRESS} = jsx:decode(Bridge),
|
|
||||||
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
|
||||||
|
|
||||||
{ok, 201, Rule} = request(
|
{ok, 201, Rule} = request(
|
||||||
post,
|
post,
|
||||||
|
@ -734,18 +563,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
||||||
%% the remote broker is also the local one.
|
%% the remote broker is also the local one.
|
||||||
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
||||||
%% we should receive a message on the "remote" broker, with specified topic
|
%% we should receive a message on the "remote" broker, with specified topic
|
||||||
?assert(
|
assert_mqtt_msg_received(RemoteTopic, Payload),
|
||||||
receive
|
|
||||||
{deliver, RemoteTopic, #message{payload = Payload}} ->
|
|
||||||
ct:pal("remote broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
|
|
||||||
true;
|
|
||||||
Msg ->
|
|
||||||
ct:pal("Msg: ~p", [Msg]),
|
|
||||||
false
|
|
||||||
after 100 ->
|
|
||||||
false
|
|
||||||
end
|
|
||||||
),
|
|
||||||
emqx:unsubscribe(RemoteTopic),
|
emqx:unsubscribe(RemoteTopic),
|
||||||
|
|
||||||
%% PUBLISH a message to the rule.
|
%% PUBLISH a message to the rule.
|
||||||
|
@ -780,35 +598,12 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
%% we should receive a message on the "remote" broker, with specified topic
|
%% we should receive a message on the "remote" broker, with specified topic
|
||||||
?assert(
|
assert_mqtt_msg_received(RemoteTopic2, Payload2),
|
||||||
receive
|
|
||||||
{deliver, RemoteTopic2, #message{payload = Payload2}} ->
|
|
||||||
ct:pal("remote broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]),
|
|
||||||
true;
|
|
||||||
Msg ->
|
|
||||||
ct:pal("Msg: ~p", [Msg]),
|
|
||||||
false
|
|
||||||
after 100 ->
|
|
||||||
false
|
|
||||||
end
|
|
||||||
),
|
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
{ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
|
?assertMetrics(
|
||||||
?assertMatch(
|
#{<<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0},
|
||||||
#{
|
BridgeIDEgress
|
||||||
<<"metrics">> := #{<<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0},
|
|
||||||
<<"node_metrics">> :=
|
|
||||||
[
|
|
||||||
#{
|
|
||||||
<<"node">> := _,
|
|
||||||
<<"metrics">> := #{
|
|
||||||
<<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
jsx:decode(BridgeMetricsStr)
|
|
||||||
),
|
),
|
||||||
|
|
||||||
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
||||||
|
@ -817,10 +612,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
||||||
t_mqtt_conn_bridge_egress_reconnect(_) ->
|
t_mqtt_conn_bridge_egress_reconnect(_) ->
|
||||||
%% then we add a mqtt connector, using POST
|
%% then we add a mqtt connector, using POST
|
||||||
User1 = <<"user1">>,
|
User1 = <<"user1">>,
|
||||||
|
BridgeIDEgress = create_bridge(
|
||||||
{ok, 201, Bridge} = request(
|
|
||||||
post,
|
|
||||||
uri(["bridges"]),
|
|
||||||
?SERVER_CONF(User1)#{
|
?SERVER_CONF(User1)#{
|
||||||
<<"type">> => ?TYPE_MQTT,
|
<<"type">> => ?TYPE_MQTT,
|
||||||
<<"name">> => ?BRIDGE_NAME_EGRESS,
|
<<"name">> => ?BRIDGE_NAME_EGRESS,
|
||||||
|
@ -837,17 +629,14 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
#{
|
|
||||||
<<"type">> := ?TYPE_MQTT,
|
|
||||||
<<"name">> := ?BRIDGE_NAME_EGRESS
|
|
||||||
} = jsx:decode(Bridge),
|
|
||||||
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
|
||||||
on_exit(fun() ->
|
on_exit(fun() ->
|
||||||
%% delete the bridge
|
%% delete the bridge
|
||||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||||
ok
|
ok
|
||||||
end),
|
end),
|
||||||
|
|
||||||
%% we now test if the bridge works as expected
|
%% we now test if the bridge works as expected
|
||||||
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
||||||
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
||||||
|
@ -862,20 +651,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
||||||
assert_mqtt_msg_received(RemoteTopic, Payload0),
|
assert_mqtt_msg_received(RemoteTopic, Payload0),
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
{ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
|
?assertMetrics(
|
||||||
?assertMatch(
|
#{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
|
||||||
#{
|
BridgeIDEgress
|
||||||
<<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
|
|
||||||
<<"node_metrics">> :=
|
|
||||||
[
|
|
||||||
#{
|
|
||||||
<<"node">> := _,
|
|
||||||
<<"metrics">> :=
|
|
||||||
#{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
jsx:decode(BridgeMetricsStr)
|
|
||||||
),
|
),
|
||||||
|
|
||||||
%% stop the listener 1883 to make the bridge disconnected
|
%% stop the listener 1883 to make the bridge disconnected
|
||||||
|
@ -906,63 +684,91 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
||||||
{ok, _} = snabbkaffe:receive_events(SRef),
|
{ok, _} = snabbkaffe:receive_events(SRef),
|
||||||
|
|
||||||
%% verify the metrics of the bridge, the message should be queued
|
%% verify the metrics of the bridge, the message should be queued
|
||||||
{ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
|
||||||
{ok, 200, BridgeMetricsStr1} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
|
|
||||||
Decoded1 = jsx:decode(BridgeStr1),
|
|
||||||
DecodedMetrics1 = jsx:decode(BridgeMetricsStr1),
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
Status when (Status == <<"connecting">> orelse Status == <<"disconnected">>),
|
#{<<"status">> := Status} when Status == <<"connecting">>; Status == <<"disconnected">>,
|
||||||
maps:get(<<"status">>, Decoded1)
|
request_bridge(BridgeIDEgress)
|
||||||
),
|
),
|
||||||
%% matched >= 3 because of possible retries.
|
%% matched >= 3 because of possible retries.
|
||||||
?assertMatch(
|
?assertMetrics(
|
||||||
#{
|
#{
|
||||||
<<"matched">> := Matched,
|
<<"matched">> := Matched,
|
||||||
<<"success">> := 1,
|
<<"success">> := 1,
|
||||||
<<"failed">> := 0,
|
<<"failed">> := 0,
|
||||||
<<"queuing">> := Queuing,
|
<<"queuing">> := Queuing,
|
||||||
<<"inflight">> := Inflight
|
<<"inflight">> := Inflight
|
||||||
} when Matched >= 3 andalso Inflight + Queuing == 2,
|
},
|
||||||
maps:get(<<"metrics">>, DecodedMetrics1)
|
Matched >= 3 andalso Inflight + Queuing == 2,
|
||||||
|
BridgeIDEgress
|
||||||
),
|
),
|
||||||
|
|
||||||
%% start the listener 1883 to make the bridge reconnected
|
%% start the listener 1883 to make the bridge reconnected
|
||||||
ok = emqx_listeners:start_listener('tcp:default'),
|
ok = emqx_listeners:start_listener('tcp:default'),
|
||||||
timer:sleep(1500),
|
timer:sleep(1500),
|
||||||
%% verify the metrics of the bridge, the 2 queued messages should have been sent
|
%% verify the metrics of the bridge, the 2 queued messages should have been sent
|
||||||
{ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
?assertMatch(#{<<"status">> := <<"connected">>}, request_bridge(BridgeIDEgress)),
|
||||||
{ok, 200, BridgeMetricsStr2} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
|
|
||||||
Decoded2 = jsx:decode(BridgeStr2),
|
|
||||||
?assertEqual(<<"connected">>, maps:get(<<"status">>, Decoded2)),
|
|
||||||
%% matched >= 3 because of possible retries.
|
%% matched >= 3 because of possible retries.
|
||||||
?assertMatch(
|
?assertMetrics(
|
||||||
#{
|
#{
|
||||||
<<"metrics">> := #{
|
<<"matched">> := Matched,
|
||||||
<<"matched">> := Matched,
|
<<"success">> := 3,
|
||||||
<<"success">> := 3,
|
<<"failed">> := 0,
|
||||||
<<"failed">> := 0,
|
<<"queuing">> := 0,
|
||||||
<<"queuing">> := 0,
|
<<"retried">> := _
|
||||||
<<"retried">> := _
|
},
|
||||||
}
|
Matched >= 3,
|
||||||
} when Matched >= 3,
|
BridgeIDEgress
|
||||||
jsx:decode(BridgeMetricsStr2)
|
|
||||||
),
|
),
|
||||||
%% also verify the 2 messages have been sent to the remote broker
|
%% also verify the 2 messages have been sent to the remote broker
|
||||||
assert_mqtt_msg_received(RemoteTopic, Payload1),
|
assert_mqtt_msg_received(RemoteTopic, Payload1),
|
||||||
assert_mqtt_msg_received(RemoteTopic, Payload2),
|
assert_mqtt_msg_received(RemoteTopic, Payload2),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
assert_mqtt_msg_received(Topic) ->
|
||||||
|
assert_mqtt_msg_received(Topic, '_', 200).
|
||||||
|
|
||||||
assert_mqtt_msg_received(Topic, Payload) ->
|
assert_mqtt_msg_received(Topic, Payload) ->
|
||||||
ct:pal("checking if ~p has been received on ~p", [Payload, Topic]),
|
assert_mqtt_msg_received(Topic, Payload, 200).
|
||||||
|
|
||||||
|
assert_mqtt_msg_received(Topic, Payload, Timeout) ->
|
||||||
receive
|
receive
|
||||||
{deliver, Topic, #message{payload = Payload}} ->
|
{deliver, Topic, Msg = #message{}} when Payload == '_' ->
|
||||||
ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]),
|
ct:pal("received mqtt ~p on topic ~p", [Msg, Topic]),
|
||||||
ok
|
Msg;
|
||||||
after 300 ->
|
{deliver, Topic, Msg = #message{payload = Payload}} ->
|
||||||
|
ct:pal("received mqtt ~p on topic ~p", [Msg, Topic]),
|
||||||
|
Msg
|
||||||
|
after Timeout ->
|
||||||
{messages, Messages} = process_info(self(), messages),
|
{messages, Messages} = process_info(self(), messages),
|
||||||
Msg = io_lib:format("timeout waiting for ~p on topic ~p", [Payload, Topic]),
|
ct:fail("timeout waiting ~p ms for ~p on topic '~s', messages = ~0p", [
|
||||||
error({Msg, #{messages => Messages}})
|
Timeout,
|
||||||
|
Payload,
|
||||||
|
Topic,
|
||||||
|
Messages
|
||||||
|
])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
create_bridge(Config = #{<<"type">> := Type, <<"name">> := Name}) ->
|
||||||
|
{ok, 201, Bridge} = request(
|
||||||
|
post,
|
||||||
|
uri(["bridges"]),
|
||||||
|
Config
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"type">> := Type,
|
||||||
|
<<"name">> := Name
|
||||||
|
},
|
||||||
|
jsx:decode(Bridge)
|
||||||
|
),
|
||||||
|
emqx_bridge_resource:bridge_id(Type, Name).
|
||||||
|
|
||||||
|
request_bridge(BridgeID) ->
|
||||||
|
{ok, 200, Bridge} = request(get, uri(["bridges", BridgeID]), []),
|
||||||
|
jsx:decode(Bridge).
|
||||||
|
|
||||||
|
request_bridge_metrics(BridgeID) ->
|
||||||
|
{ok, 200, BridgeMetrics} = request(get, uri(["bridges", BridgeID, "metrics"]), []),
|
||||||
|
jsx:decode(BridgeMetrics).
|
||||||
|
|
||||||
request(Method, Url, Body) ->
|
request(Method, Url, Body) ->
|
||||||
request(<<"connector_admin">>, Method, Url, Body).
|
request(<<"connector_admin">>, Method, Url, Body).
|
||||||
|
|
Loading…
Reference in New Issue