fix: update testcases for emqx_connector
This commit is contained in:
parent
9a9c92ae88
commit
2897af9650
|
@ -47,7 +47,7 @@
|
||||||
, recreate/3
|
, recreate/3
|
||||||
, create_dry_run/2
|
, create_dry_run/2
|
||||||
, remove/1
|
, remove/1
|
||||||
, remove/3
|
, remove/2
|
||||||
, update/2
|
, update/2
|
||||||
, update/3
|
, update/3
|
||||||
, start/2
|
, start/2
|
||||||
|
@ -286,6 +286,10 @@ remove(BridgeId) ->
|
||||||
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
||||||
remove(BridgeType, BridgeName, #{}).
|
remove(BridgeType, BridgeName, #{}).
|
||||||
|
|
||||||
|
remove(Type, Name) ->
|
||||||
|
remove(Type, Name, undefined).
|
||||||
|
|
||||||
|
%% just for perform_bridge_changes/1
|
||||||
remove(Type, Name, _Conf) ->
|
remove(Type, Name, _Conf) ->
|
||||||
?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}),
|
?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}),
|
||||||
case emqx_resource:remove_local(resource_id(Type, Name)) of
|
case emqx_resource:remove_local(resource_id(Type, Name)) of
|
||||||
|
|
|
@ -40,16 +40,15 @@ config_key_path() ->
|
||||||
-dialyzer([{nowarn_function, [post_config_update/5]}, error_handling]).
|
-dialyzer([{nowarn_function, [post_config_update/5]}, error_handling]).
|
||||||
post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
|
post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
|
||||||
ConnId = connector_id(Type, Name),
|
ConnId = connector_id(Type, Name),
|
||||||
try foreach_linked_bridges(ConnId, fun(#{id := BId}) ->
|
try foreach_linked_bridges(ConnId, fun(#{type := BType, name := BName}) ->
|
||||||
throw({dependency_bridges_exist, BId})
|
throw({dependency_bridges_exist, emqx_bridge:bridge_id(BType, BName)})
|
||||||
end)
|
end)
|
||||||
catch throw:Error -> {error, Error}
|
catch throw:Error -> {error, Error}
|
||||||
end;
|
end;
|
||||||
post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
|
post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
|
||||||
ConnId = connector_id(Type, Name),
|
ConnId = connector_id(Type, Name),
|
||||||
foreach_linked_bridges(ConnId,
|
foreach_linked_bridges(ConnId,
|
||||||
fun(#{id := BId}) ->
|
fun(#{type := BType, name := BName}) ->
|
||||||
{BType, BName} = emqx_bridge:parse_bridge_id(BId),
|
|
||||||
BridgeConf = emqx:get_config([bridges, BType, BName]),
|
BridgeConf = emqx:get_config([bridges, BType, BName]),
|
||||||
case emqx_bridge:update(BType, BName, {BridgeConf#{connector => OldConf},
|
case emqx_bridge:update(BType, BName, {BridgeConf#{connector => OldConf},
|
||||||
BridgeConf#{connector => NewConf}}) of
|
BridgeConf#{connector => NewConf}}) of
|
||||||
|
|
|
@ -124,8 +124,8 @@ clear_resources() ->
|
||||||
lists:foreach(fun(#{id := Id}) ->
|
lists:foreach(fun(#{id := Id}) ->
|
||||||
ok = emqx_rule_engine:delete_rule(Id)
|
ok = emqx_rule_engine:delete_rule(Id)
|
||||||
end, emqx_rule_engine:get_rules()),
|
end, emqx_rule_engine:get_rules()),
|
||||||
lists:foreach(fun(#{id := Id}) ->
|
lists:foreach(fun(#{type := Type, name := Name}) ->
|
||||||
ok = emqx_bridge:remove(Id)
|
ok = emqx_bridge:remove(Type, Name)
|
||||||
end, emqx_bridge:list()),
|
end, emqx_bridge:list()),
|
||||||
lists:foreach(fun(#{<<"id">> := Id}) ->
|
lists:foreach(fun(#{<<"id">> := Id}) ->
|
||||||
ok = emqx_connector:delete(Id)
|
ok = emqx_connector:delete(Id)
|
||||||
|
@ -231,10 +231,11 @@ t_mqtt_conn_bridge_ingress(_) ->
|
||||||
<<"type">> => ?CONNECTR_TYPE,
|
<<"type">> => ?CONNECTR_TYPE,
|
||||||
<<"name">> => ?BRIDGE_NAME_INGRESS
|
<<"name">> => ?BRIDGE_NAME_INGRESS
|
||||||
}),
|
}),
|
||||||
#{ <<"id">> := BridgeIDIngress
|
#{ <<"type">> := ?CONNECTR_TYPE
|
||||||
, <<"type">> := <<"mqtt">>
|
, <<"name">> := ?BRIDGE_NAME_INGRESS
|
||||||
, <<"connector">> := ConnctorID
|
, <<"connector">> := ConnctorID
|
||||||
} = jsx:decode(Bridge),
|
} = jsx:decode(Bridge),
|
||||||
|
BridgeIDIngress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_INGRESS),
|
||||||
wait_for_resource_ready(BridgeIDIngress, 5),
|
wait_for_resource_ready(BridgeIDIngress, 5),
|
||||||
|
|
||||||
%% we now test if the bridge works as expected
|
%% we now test if the bridge works as expected
|
||||||
|
@ -298,11 +299,11 @@ t_mqtt_conn_bridge_egress(_) ->
|
||||||
<<"type">> => ?CONNECTR_TYPE,
|
<<"type">> => ?CONNECTR_TYPE,
|
||||||
<<"name">> => ?BRIDGE_NAME_EGRESS
|
<<"name">> => ?BRIDGE_NAME_EGRESS
|
||||||
}),
|
}),
|
||||||
#{ <<"id">> := BridgeIDEgress
|
#{ <<"type">> := ?CONNECTR_TYPE
|
||||||
, <<"type">> := ?CONNECTR_TYPE
|
|
||||||
, <<"name">> := ?BRIDGE_NAME_EGRESS
|
, <<"name">> := ?BRIDGE_NAME_EGRESS
|
||||||
, <<"connector">> := ConnctorID
|
, <<"connector">> := ConnctorID
|
||||||
} = jsx:decode(Bridge),
|
} = jsx:decode(Bridge),
|
||||||
|
BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
|
||||||
wait_for_resource_ready(BridgeIDEgress, 5),
|
wait_for_resource_ready(BridgeIDEgress, 5),
|
||||||
|
|
||||||
%% we now test if the bridge works as expected
|
%% we now test if the bridge works as expected
|
||||||
|
@ -330,8 +331,7 @@ t_mqtt_conn_bridge_egress(_) ->
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
||||||
?assertMatch(#{ <<"id">> := BridgeIDEgress
|
?assertMatch(#{ <<"metrics">> := ?metrics(1, 1, 0, _, _, _)
|
||||||
, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)
|
|
||||||
, <<"node_metrics">> :=
|
, <<"node_metrics">> :=
|
||||||
[#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}]
|
[#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}]
|
||||||
}, jsx:decode(BridgeStr)),
|
}, jsx:decode(BridgeStr)),
|
||||||
|
@ -368,11 +368,11 @@ t_mqtt_conn_update(_) ->
|
||||||
<<"type">> => ?CONNECTR_TYPE,
|
<<"type">> => ?CONNECTR_TYPE,
|
||||||
<<"name">> => ?BRIDGE_NAME_EGRESS
|
<<"name">> => ?BRIDGE_NAME_EGRESS
|
||||||
}),
|
}),
|
||||||
#{ <<"id">> := BridgeIDEgress
|
#{ <<"type">> := ?CONNECTR_TYPE
|
||||||
, <<"type">> := <<"mqtt">>
|
|
||||||
, <<"name">> := ?BRIDGE_NAME_EGRESS
|
, <<"name">> := ?BRIDGE_NAME_EGRESS
|
||||||
, <<"connector">> := ConnctorID
|
, <<"connector">> := ConnctorID
|
||||||
} = jsx:decode(Bridge),
|
} = jsx:decode(Bridge),
|
||||||
|
BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
|
||||||
wait_for_resource_ready(BridgeIDEgress, 5),
|
wait_for_resource_ready(BridgeIDEgress, 5),
|
||||||
|
|
||||||
%% then we try to update 'server' of the connector, to an unavailable IP address
|
%% then we try to update 'server' of the connector, to an unavailable IP address
|
||||||
|
@ -410,12 +410,12 @@ t_mqtt_conn_update2(_) ->
|
||||||
<<"type">> => ?CONNECTR_TYPE,
|
<<"type">> => ?CONNECTR_TYPE,
|
||||||
<<"name">> => ?BRIDGE_NAME_EGRESS
|
<<"name">> => ?BRIDGE_NAME_EGRESS
|
||||||
}),
|
}),
|
||||||
#{ <<"id">> := BridgeIDEgress
|
#{ <<"type">> := ?CONNECTR_TYPE
|
||||||
, <<"type">> := <<"mqtt">>
|
|
||||||
, <<"name">> := ?BRIDGE_NAME_EGRESS
|
, <<"name">> := ?BRIDGE_NAME_EGRESS
|
||||||
, <<"status">> := <<"disconnected">>
|
, <<"status">> := <<"disconnected">>
|
||||||
, <<"connector">> := ConnctorID
|
, <<"connector">> := ConnctorID
|
||||||
} = jsx:decode(Bridge),
|
} = jsx:decode(Bridge),
|
||||||
|
BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
|
||||||
%% We try to fix the 'server' parameter, to another unavailable server..
|
%% We try to fix the 'server' parameter, to another unavailable server..
|
||||||
%% The update should success: we don't check the connectivity of the new config
|
%% The update should success: we don't check the connectivity of the new config
|
||||||
%% if the resource is now disconnected.
|
%% if the resource is now disconnected.
|
||||||
|
@ -426,8 +426,7 @@ t_mqtt_conn_update2(_) ->
|
||||||
?MQTT_CONNECTOR2(<<"127.0.0.1:1883">>)),
|
?MQTT_CONNECTOR2(<<"127.0.0.1:1883">>)),
|
||||||
wait_for_resource_ready(BridgeIDEgress, 5),
|
wait_for_resource_ready(BridgeIDEgress, 5),
|
||||||
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
||||||
?assertMatch(#{ <<"id">> := BridgeIDEgress
|
?assertMatch(#{ <<"status">> := <<"connected">>
|
||||||
, <<"status">> := <<"connected">>
|
|
||||||
}, jsx:decode(BridgeStr)),
|
}, jsx:decode(BridgeStr)),
|
||||||
%% delete the bridge
|
%% delete the bridge
|
||||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
||||||
|
@ -453,9 +452,9 @@ t_mqtt_conn_update3(_) ->
|
||||||
<<"type">> => ?CONNECTR_TYPE,
|
<<"type">> => ?CONNECTR_TYPE,
|
||||||
<<"name">> => ?BRIDGE_NAME_EGRESS
|
<<"name">> => ?BRIDGE_NAME_EGRESS
|
||||||
}),
|
}),
|
||||||
#{ <<"id">> := BridgeIDEgress
|
#{ <<"connector">> := ConnctorID
|
||||||
, <<"connector">> := ConnctorID
|
|
||||||
} = jsx:decode(Bridge),
|
} = jsx:decode(Bridge),
|
||||||
|
BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
|
||||||
wait_for_resource_ready(BridgeIDEgress, 5),
|
wait_for_resource_ready(BridgeIDEgress, 5),
|
||||||
|
|
||||||
%% delete the connector should fail because it is in use by a bridge
|
%% delete the connector should fail because it is in use by a bridge
|
||||||
|
@ -486,12 +485,12 @@ t_ingress_mqtt_bridge_with_rules(_) ->
|
||||||
}),
|
}),
|
||||||
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
|
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
|
||||||
|
|
||||||
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
{ok, 201, _} = request(post, uri(["bridges"]),
|
||||||
?MQTT_BRIDGE_INGRESS(ConnctorID)#{
|
?MQTT_BRIDGE_INGRESS(ConnctorID)#{
|
||||||
<<"type">> => ?CONNECTR_TYPE,
|
<<"type">> => ?CONNECTR_TYPE,
|
||||||
<<"name">> => ?BRIDGE_NAME_INGRESS
|
<<"name">> => ?BRIDGE_NAME_INGRESS
|
||||||
}),
|
}),
|
||||||
#{ <<"id">> := BridgeIDIngress } = jsx:decode(Bridge),
|
BridgeIDIngress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_INGRESS),
|
||||||
|
|
||||||
{ok, 201, Rule} = request(post, uri(["rules"]),
|
{ok, 201, Rule} = request(post, uri(["rules"]),
|
||||||
#{<<"name">> => <<"A rule get messages from a source mqtt bridge">>,
|
#{<<"name">> => <<"A rule get messages from a source mqtt bridge">>,
|
||||||
|
@ -572,7 +571,8 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
||||||
<<"type">> => ?CONNECTR_TYPE,
|
<<"type">> => ?CONNECTR_TYPE,
|
||||||
<<"name">> => ?BRIDGE_NAME_EGRESS
|
<<"name">> => ?BRIDGE_NAME_EGRESS
|
||||||
}),
|
}),
|
||||||
#{ <<"id">> := BridgeIDEgress } = jsx:decode(Bridge),
|
#{ <<"type">> := ?CONNECTR_TYPE, <<"name">> := ?BRIDGE_NAME_EGRESS } = jsx:decode(Bridge),
|
||||||
|
BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
|
||||||
|
|
||||||
{ok, 201, Rule} = request(post, uri(["rules"]),
|
{ok, 201, Rule} = request(post, uri(["rules"]),
|
||||||
#{<<"name">> => <<"A rule send messages to a sink mqtt bridge">>,
|
#{<<"name">> => <<"A rule send messages to a sink mqtt bridge">>,
|
||||||
|
@ -647,8 +647,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
||||||
?assertMatch(#{ <<"id">> := BridgeIDEgress
|
?assertMatch(#{ <<"metrics">> := ?metrics(2, 2, 0, _, _, _)
|
||||||
, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)
|
|
||||||
, <<"node_metrics">> :=
|
, <<"node_metrics">> :=
|
||||||
[#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}]
|
[#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}]
|
||||||
}, jsx:decode(BridgeStr)),
|
}, jsx:decode(BridgeStr)),
|
||||||
|
|
Loading…
Reference in New Issue