fix(bridges): store connector name and bridge name to config files

This commit is contained in:
Shawn 2021-12-30 12:31:13 +08:00
parent 110ae62b24
commit d11cf6ad64
11 changed files with 168 additions and 176 deletions

View File

@ -222,7 +222,10 @@ update(Type, Name, {OldConf, Conf}) ->
true -> true ->
%% we don't need to recreate the bridge if this config change is only to %% we don't need to recreate the bridge if this config change is only to
%% toggole the config 'bridge.{type}.{name}.enable' %% toggole the config 'bridge.{type}.{name}.enable'
ok case maps:get(enable, Conf, true) of
false -> stop(Type, Name);
true -> start(Type, Name)
end
end. end.
recreate(Type, Name) -> recreate(Type, Name) ->

View File

@ -356,9 +356,8 @@ operation_to_conf_req(<<"restart">>) -> restart;
operation_to_conf_req(_) -> invalid. operation_to_conf_req(_) -> invalid.
ensure_bridge_created(BridgeType, BridgeName, Conf) -> ensure_bridge_created(BridgeType, BridgeName, Conf) ->
Conf1 = maps:without([<<"type">>, <<"name">>], Conf),
case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
Conf1, #{override_to => cluster}) of Conf, #{override_to => cluster}) of
{ok, _} -> ok; {ok, _} -> ok;
{error, Reason} -> {error, Reason} ->
{error, error_msg('BAD_ARG', Reason)} {error, error_msg('BAD_ARG', Reason)}
@ -411,12 +410,12 @@ aggregate_metrics(AllMetrics) ->
format_resp(#{id := Id, raw_config := RawConf, format_resp(#{id := Id, raw_config := RawConf,
resource_data := #{status := Status, metrics := Metrics}}) -> resource_data := #{status := Status, metrics := Metrics}}) ->
{Type, Name} = emqx_bridge:parse_bridge_id(Id), {Type, BridgeName} = emqx_bridge:parse_bridge_id(Id),
IsConnected = fun(started) -> connected; (_) -> disconnected end, IsConnected = fun(started) -> connected; (_) -> disconnected end,
RawConf#{ RawConf#{
id => Id, id => Id,
type => Type, type => Type,
name => Name, name => maps:get(<<"name">>, RawConf, BridgeName),
node => node(), node => node(),
status => IsConnected(Status), status => IsConnected(Status),
metrics => Metrics metrics => Metrics
@ -431,7 +430,7 @@ rpc_multicall(Func, Args) ->
end. end.
filter_out_request_body(Conf) -> filter_out_request_body(Conf) ->
ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, ExtraConfs = [<<"id">>, <<"type">>, <<"status">>, <<"node_status">>,
<<"node_metrics">>, <<"metrics">>, <<"node">>], <<"node_metrics">>, <<"metrics">>, <<"node">>],
maps:without(ExtraConfs, Conf). maps:without(ExtraConfs, Conf).

View File

@ -84,6 +84,10 @@ basic_config() ->
#{ desc => "Enable or disable this bridge" #{ desc => "Enable or disable this bridge"
, default => true , default => true
})} })}
, {name,
mk(binary(),
#{ desc => "Bridge name, used as a human-readable description of the bridge."
})}
, {direction, , {direction,
mk(egress, mk(egress,
#{ desc => "The direction of this bridge, MUST be egress" #{ desc => "The direction of this bridge, MUST be egress"

View File

@ -43,9 +43,13 @@ http_schema(Method) ->
common_bridge_fields() -> common_bridge_fields() ->
[ {enable, [ {enable,
mk(boolean(), mk(boolean(),
#{ desc =>"Enable or disable this bridge" #{ desc => "Enable or disable this bridge"
, default => true , default => true
})} })}
, {name,
mk(binary(),
#{ desc => "Bridge name, used as a human-readable description of the bridge."
})}
, {connector, , {connector,
mk(binary(), mk(binary(),
#{ nullable => false #{ nullable => false

View File

@ -23,12 +23,13 @@
-define(CONF_DEFAULT, <<"bridges: {}">>). -define(CONF_DEFAULT, <<"bridges: {}">>).
-define(BRIDGE_TYPE, <<"http">>). -define(BRIDGE_TYPE, <<"http">>).
-define(BRIDGE_NAME, <<"test_bridge">>). -define(BRIDGE_NAME, <<"test_bridge">>).
-define(BRIDGE_ID, <<"http:test_bridge">>).
-define(URL(PORT, PATH), list_to_binary( -define(URL(PORT, PATH), list_to_binary(
io_lib:format("http://localhost:~s/~s", io_lib:format("http://localhost:~s/~s",
[integer_to_list(PORT), PATH]))). [integer_to_list(PORT), PATH]))).
-define(HTTP_BRIDGE(URL), -define(HTTP_BRIDGE(URL, TYPE, NAME),
#{ #{
<<"type">> => TYPE,
<<"name">> => NAME,
<<"url">> => URL, <<"url">> => URL,
<<"local_topic">> => <<"emqx_http/#">>, <<"local_topic">> => <<"emqx_http/#">>,
<<"method">> => <<"post">>, <<"method">> => <<"post">>,
@ -145,13 +146,10 @@ t_http_crud_apis(_) ->
%% POST /bridges/ will create a bridge %% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"), URL1 = ?URL(Port, "path1"),
{ok, 201, Bridge} = request(post, uri(["bridges"]), {ok, 201, Bridge} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{ ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
<<"type">> => ?BRIDGE_TYPE,
<<"name">> => ?BRIDGE_NAME
}),
%ct:pal("---bridge: ~p", [Bridge]), %ct:pal("---bridge: ~p", [Bridge]),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID #{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE , <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME , <<"name">> := ?BRIDGE_NAME
, <<"status">> := _ , <<"status">> := _
@ -159,18 +157,7 @@ t_http_crud_apis(_) ->
, <<"metrics">> := _ , <<"metrics">> := _
, <<"node_metrics">> := [_|_] , <<"node_metrics">> := [_|_]
, <<"url">> := URL1 , <<"url">> := URL1
}, jsx:decode(Bridge)), } = jsx:decode(Bridge),
%% create a again returns an error
{ok, 400, RetMsg} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{
<<"type">> => ?BRIDGE_TYPE,
<<"name">> => ?BRIDGE_NAME
}),
?assertMatch(
#{ <<"code">> := _
, <<"message">> := <<"bridge already exists">>
}, jsx:decode(RetMsg)),
%% send an message to emqx and the message should be forwarded to the HTTP server %% send an message to emqx and the message should be forwarded to the HTTP server
Body = <<"my msg">>, Body = <<"my msg">>,
@ -188,9 +175,9 @@ t_http_crud_apis(_) ->
end), end),
%% update the request-path of the bridge %% update the request-path of the bridge
URL2 = ?URL(Port, "path2"), URL2 = ?URL(Port, "path2"),
{ok, 200, Bridge2} = request(put, uri(["bridges", ?BRIDGE_ID]), {ok, 200, Bridge2} = request(put, uri(["bridges", BridgeID]),
?HTTP_BRIDGE(URL2)), ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID ?assertMatch(#{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE , <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME , <<"name">> := ?BRIDGE_NAME
, <<"status">> := _ , <<"status">> := _
@ -202,7 +189,7 @@ t_http_crud_apis(_) ->
%% list all bridges again, assert Bridge2 is in it %% list all bridges again, assert Bridge2 is in it
{ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []), {ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []),
?assertMatch([#{ <<"id">> := ?BRIDGE_ID ?assertMatch([#{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE , <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME , <<"name">> := ?BRIDGE_NAME
, <<"status">> := _ , <<"status">> := _
@ -213,8 +200,8 @@ t_http_crud_apis(_) ->
}], jsx:decode(Bridge2Str)), }], jsx:decode(Bridge2Str)),
%% get the bridge by id %% get the bridge by id
{ok, 200, Bridge3Str} = request(get, uri(["bridges", ?BRIDGE_ID]), []), {ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID ?assertMatch(#{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE , <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME , <<"name">> := ?BRIDGE_NAME
, <<"status">> := _ , <<"status">> := _
@ -238,12 +225,12 @@ t_http_crud_apis(_) ->
end), end),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% update a deleted bridge returns an error %% update a deleted bridge returns an error
{ok, 404, ErrMsg2} = request(put, uri(["bridges", ?BRIDGE_ID]), {ok, 404, ErrMsg2} = request(put, uri(["bridges", BridgeID]),
?HTTP_BRIDGE(URL2)), ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
?assertMatch( ?assertMatch(
#{ <<"code">> := _ #{ <<"code">> := _
, <<"message">> := <<"bridge not found">> , <<"message">> := <<"bridge not found">>
@ -251,16 +238,15 @@ t_http_crud_apis(_) ->
ok. ok.
t_start_stop_bridges(_) -> t_start_stop_bridges(_) ->
%% assert we there's no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
Port = start_http_server(fun handle_fun_200_ok/2), Port = start_http_server(fun handle_fun_200_ok/2),
URL1 = ?URL(Port, "abc"), URL1 = ?URL(Port, "abc"),
{ok, 201, Bridge} = request(post, uri(["bridges"]), {ok, 201, Bridge} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{ ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
<<"type">> => ?BRIDGE_TYPE,
<<"name">> => ?BRIDGE_NAME
}),
%ct:pal("the bridge ==== ~p", [Bridge]), %ct:pal("the bridge ==== ~p", [Bridge]),
?assertMatch( #{ <<"id">> := BridgeID
#{ <<"id">> := ?BRIDGE_ID
, <<"type">> := ?BRIDGE_TYPE , <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME , <<"name">> := ?BRIDGE_NAME
, <<"status">> := _ , <<"status">> := _
@ -268,35 +254,35 @@ t_start_stop_bridges(_) ->
, <<"metrics">> := _ , <<"metrics">> := _
, <<"node_metrics">> := [_|_] , <<"node_metrics">> := [_|_]
, <<"url">> := URL1 , <<"url">> := URL1
}, jsx:decode(Bridge)), } = jsx:decode(Bridge),
%% stop it %% stop it
{ok, 200, <<>>} = request(post, operation_path(stop), <<"">>), {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", ?BRIDGE_ID]), []), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID ?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"disconnected">> , <<"status">> := <<"disconnected">>
}, jsx:decode(Bridge2)), }, jsx:decode(Bridge2)),
%% start again %% start again
{ok, 200, <<>>} = request(post, operation_path(start), <<"">>), {ok, 200, <<>>} = request(post, operation_path(start, BridgeID), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID ?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)), }, jsx:decode(Bridge3)),
%% restart an already started bridge %% restart an already started bridge
{ok, 200, <<>>} = request(post, operation_path(restart), <<"">>), {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID ?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)), }, jsx:decode(Bridge3)),
%% stop it again %% stop it again
{ok, 200, <<>>} = request(post, operation_path(stop), <<"">>), {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>),
%% restart a stopped bridge %% restart a stopped bridge
{ok, 200, <<>>} = request(post, operation_path(restart), <<"">>), {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>),
{ok, 200, Bridge4} = request(get, uri(["bridges", ?BRIDGE_ID]), []), {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID ?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
}, jsx:decode(Bridge4)), }, jsx:decode(Bridge4)),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -332,5 +318,5 @@ auth_header_() ->
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}. {"Authorization", "Bearer " ++ binary_to_list(Token)}.
operation_path(Oper) -> operation_path(Oper, BridgeID) ->
uri(["bridges", ?BRIDGE_ID, "operation", Oper]). uri(["bridges", BridgeID, "operation", Oper]).

View File

@ -155,8 +155,7 @@ schema("/connectors") ->
}, },
post => #{ post => #{
tags => [<<"connectors">>], tags => [<<"connectors">>],
description => <<"Create a new connector by given Id <br>" description => <<"Create a new connector">>,
"The ID must be of format '{type}:{name}'">>,
summary => <<"Create connector">>, summary => <<"Create connector">>,
requestBody => post_request_body_schema(), requestBody => post_request_body_schema(),
responses => #{ responses => #{
@ -270,16 +269,16 @@ format_resp(#{<<"id">> := Id} = RawConf) ->
format_resp(ConnId, RawConf) -> format_resp(ConnId, RawConf) ->
NumOfBridges = length(emqx_bridge:list_bridges_by_connector(ConnId)), NumOfBridges = length(emqx_bridge:list_bridges_by_connector(ConnId)),
{Type, Name} = emqx_connector:parse_connector_id(ConnId), {Type, ConnName} = emqx_connector:parse_connector_id(ConnId),
RawConf#{ RawConf#{
<<"id">> => ConnId, <<"id">> => ConnId,
<<"type">> => Type, <<"type">> => Type,
<<"name">> => Name, <<"name">> => maps:get(<<"name">>, RawConf, ConnName),
<<"num_of_bridges">> => NumOfBridges <<"num_of_bridges">> => NumOfBridges
}. }.
filter_out_request_body(Conf) -> filter_out_request_body(Conf) ->
ExtraConfs = [<<"clientid">>, <<"num_of_bridges">>, <<"type">>, <<"name">>], ExtraConfs = [<<"clientid">>, <<"num_of_bridges">>, <<"type">>],
maps:without(ExtraConfs, Conf). maps:without(ExtraConfs, Conf).
bin(S) when is_list(S) -> bin(S) when is_list(S) ->

View File

@ -112,7 +112,7 @@ If the request is provided, the caller can send HTTP requests via
<code>emqx_resource:query(ResourceId, {send_message, BridgeId, Message})</code> <code>emqx_resource:query(ResourceId, {send_message, BridgeId, Message})</code>
""" """
})} })}
] ++ emqx_connector_schema_lib:ssl_fields(); ] ++ emqx_connector_schema:common_fields() ++ emqx_connector_schema_lib:ssl_fields();
fields("request") -> fields("request") ->
[ {method, hoconsc:mk(hoconsc:enum([post, put, get, delete]), #{nullable => true})} [ {method, hoconsc:mk(hoconsc:enum([post, put, get, delete]), #{nullable => true})}

View File

@ -8,6 +8,8 @@
-export([roots/0, fields/1]). -export([roots/0, fields/1]).
-export([common_fields/0]).
-export([ get_response/0 -export([ get_response/0
, put_request/0 , put_request/0
, post_request/0 , post_request/0
@ -49,3 +51,10 @@ fields("connectors") ->
schema_mod(Type) -> schema_mod(Type) ->
list_to_atom(lists:concat(["emqx_connector_", Type])). list_to_atom(lists:concat(["emqx_connector_", Type])).
common_fields() ->
[ {name,
mk(binary(),
#{ desc => "Connector name, used as a human-readable description of the connector."
})}
].

View File

@ -94,7 +94,7 @@ topic filters for 'remote_topic' of ingress connections.
Queue messages in disk files. Queue messages in disk files.
""" """
})} })}
] ++ emqx_connector_schema_lib:ssl_fields(); ] ++ emqx_connector_schema:common_fields() ++ emqx_connector_schema_lib:ssl_fields();
fields("ingress") -> fields("ingress") ->
%% the message maybe subscribed by rules, in this case 'local_topic' is not necessary %% the message maybe subscribed by rules, in this case 'local_topic' is not necessary

View File

@ -26,11 +26,8 @@
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
-define(CONNECTR_TYPE, <<"mqtt">>). -define(CONNECTR_TYPE, <<"mqtt">>).
-define(CONNECTR_NAME, <<"test_connector">>). -define(CONNECTR_NAME, <<"test_connector">>).
-define(CONNECTR_ID, <<"mqtt:test_connector">>).
-define(BRIDGE_NAME_INGRESS, <<"ingress_test_bridge">>). -define(BRIDGE_NAME_INGRESS, <<"ingress_test_bridge">>).
-define(BRIDGE_NAME_EGRESS, <<"egress_test_bridge">>). -define(BRIDGE_NAME_EGRESS, <<"egress_test_bridge">>).
-define(BRIDGE_ID_INGRESS, <<"mqtt:ingress_test_bridge">>).
-define(BRIDGE_ID_EGRESS, <<"mqtt:egress_test_bridge">>).
-define(MQTT_CONNECOTR(Username), -define(MQTT_CONNECOTR(Username),
#{ #{
<<"server">> => <<"127.0.0.1:1883">>, <<"server">> => <<"127.0.0.1:1883">>,
@ -123,8 +120,7 @@ t_mqtt_crud_apis(_) ->
, <<"name">> => ?CONNECTR_NAME , <<"name">> => ?CONNECTR_NAME
}), }),
%ct:pal("---connector: ~p", [Connector]), #{ <<"id">> := ConnctorID
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
, <<"type">> := ?CONNECTR_TYPE , <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?CONNECTR_NAME , <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
@ -132,23 +128,13 @@ t_mqtt_crud_apis(_) ->
, <<"password">> := <<"">> , <<"password">> := <<"">>
, <<"proto_ver">> := <<"v4">> , <<"proto_ver">> := <<"v4">>
, <<"ssl">> := #{<<"enable">> := false} , <<"ssl">> := #{<<"enable">> := false}
}, jsx:decode(Connector)), } = jsx:decode(Connector),
%% create a again returns an error
{ok, 400, RetMsg} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE
, <<"name">> => ?CONNECTR_NAME
}),
?assertMatch(
#{ <<"code">> := _
, <<"message">> := <<"connector already exists">>
}, jsx:decode(RetMsg)),
%% update the request-path of the connector %% update the request-path of the connector
User2 = <<"user2">>, User2 = <<"user2">>,
{ok, 200, Connector2} = request(put, uri(["connectors", ?CONNECTR_ID]), {ok, 200, Connector2} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR(User2)), ?MQTT_CONNECOTR(User2)),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID ?assertMatch(#{ <<"id">> := ConnctorID
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User2 , <<"username">> := User2
, <<"password">> := <<"">> , <<"password">> := <<"">>
@ -158,7 +144,7 @@ t_mqtt_crud_apis(_) ->
%% list all connectors again, assert Connector2 is in it %% list all connectors again, assert Connector2 is in it
{ok, 200, Connector2Str} = request(get, uri(["connectors"]), []), {ok, 200, Connector2Str} = request(get, uri(["connectors"]), []),
?assertMatch([#{ <<"id">> := ?CONNECTR_ID ?assertMatch([#{ <<"id">> := ConnctorID
, <<"type">> := ?CONNECTR_TYPE , <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?CONNECTR_NAME , <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
@ -169,8 +155,8 @@ t_mqtt_crud_apis(_) ->
}], jsx:decode(Connector2Str)), }], jsx:decode(Connector2Str)),
%% get the connector by id %% get the connector by id
{ok, 200, Connector3Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []), {ok, 200, Connector3Str} = request(get, uri(["connectors", ConnctorID]), []),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID ?assertMatch(#{ <<"id">> := ConnctorID
, <<"type">> := ?CONNECTR_TYPE , <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?CONNECTR_NAME , <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
@ -181,11 +167,11 @@ t_mqtt_crud_apis(_) ->
}, jsx:decode(Connector3Str)), }, jsx:decode(Connector3Str)),
%% delete the connector %% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
%% update a deleted connector returns an error %% update a deleted connector returns an error
{ok, 404, ErrMsg2} = request(put, uri(["connectors", ?CONNECTR_ID]), {ok, 404, ErrMsg2} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR(User2)), ?MQTT_CONNECOTR(User2)),
?assertMatch( ?assertMatch(
#{ <<"code">> := _ #{ <<"code">> := _
@ -205,28 +191,28 @@ t_mqtt_conn_bridge_ingress(_) ->
, <<"name">> => ?CONNECTR_NAME , <<"name">> => ?CONNECTR_NAME
}), }),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID #{ <<"id">> := ConnctorID
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
, <<"num_of_bridges">> := 0 , <<"num_of_bridges">> := 0
, <<"username">> := User1 , <<"username">> := User1
, <<"password">> := <<"">> , <<"password">> := <<"">>
, <<"proto_ver">> := <<"v4">> , <<"proto_ver">> := <<"v4">>
, <<"ssl">> := #{<<"enable">> := false} , <<"ssl">> := #{<<"enable">> := false}
}, jsx:decode(Connector)), } = jsx:decode(Connector),
%% ... and a MQTT bridge, using POST %% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now %% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]), {ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{ ?MQTT_BRIDGE_INGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE, <<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_INGRESS <<"name">> => ?BRIDGE_NAME_INGRESS
}), }),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS #{ <<"id">> := BridgeIDIngress
, <<"type">> := <<"mqtt">> , <<"type">> := <<"mqtt">>
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
, <<"connector">> := ?CONNECTR_ID , <<"connector">> := ConnctorID
}, jsx:decode(Bridge)), } = jsx:decode(Bridge),
%% we now test if the bridge works as expected %% we now test if the bridge works as expected
@ -252,17 +238,17 @@ t_mqtt_conn_bridge_ingress(_) ->
end), end),
%% get the connector by id, verify the num_of_bridges now is 1 %% get the connector by id, verify the num_of_bridges now is 1
{ok, 200, Connector1Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []), {ok, 200, Connector1Str} = request(get, uri(["connectors", ConnctorID]), []),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID ?assertMatch(#{ <<"id">> := ConnctorID
, <<"num_of_bridges">> := 1 , <<"num_of_bridges">> := 1
}, jsx:decode(Connector1Str)), }, jsx:decode(Connector1Str)),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_INGRESS]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector %% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
ok. ok.
@ -279,29 +265,28 @@ t_mqtt_conn_bridge_egress(_) ->
}), }),
%ct:pal("---connector: ~p", [Connector]), %ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID #{ <<"id">> := ConnctorID
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User1 , <<"username">> := User1
, <<"password">> := <<"">> , <<"password">> := <<"">>
, <<"proto_ver">> := <<"v4">> , <<"proto_ver">> := <<"v4">>
, <<"ssl">> := #{<<"enable">> := false} , <<"ssl">> := #{<<"enable">> := false}
}, jsx:decode(Connector)), } = jsx:decode(Connector),
%% ... and a MQTT bridge, using POST %% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now %% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]), {ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ ?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE, <<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS <<"name">> => ?BRIDGE_NAME_EGRESS
}), }),
%ct:pal("---bridge: ~p", [Bridge]), #{ <<"id">> := BridgeIDEgress
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
, <<"type">> := ?CONNECTR_TYPE , <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?BRIDGE_NAME_EGRESS , <<"name">> := ?BRIDGE_NAME_EGRESS
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
, <<"connector">> := ?CONNECTR_ID , <<"connector">> := ConnctorID
}, jsx:decode(Bridge)), } = jsx:decode(Bridge),
%% we now test if the bridge works as expected %% we now test if the bridge works as expected
LocalTopic = <<"local_topic/1">>, LocalTopic = <<"local_topic/1">>,
@ -326,19 +311,19 @@ t_mqtt_conn_bridge_egress(_) ->
end), end),
%% verify the metrics of the bridge %% verify the metrics of the bridge
{ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS ?assertMatch(#{ <<"id">> := BridgeIDEgress
, <<"metrics">> := ?metrics(1, 1, 0, _, _, _) , <<"metrics">> := ?metrics(1, 1, 0, _, _, _)
, <<"node_metrics">> := , <<"node_metrics">> :=
[#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}] [#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}]
}, jsx:decode(BridgeStr)), }, jsx:decode(BridgeStr)),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector %% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
ok. ok.
@ -358,37 +343,37 @@ t_mqtt_conn_update(_) ->
}), }),
%ct:pal("---connector: ~p", [Connector]), %ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID #{ <<"id">> := ConnctorID
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
}, jsx:decode(Connector)), } = jsx:decode(Connector),
%% ... and a MQTT bridge, using POST %% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now %% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]), {ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ ?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE, <<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS <<"name">> => ?BRIDGE_NAME_EGRESS
}), }),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS #{ <<"id">> := BridgeIDEgress
, <<"type">> := <<"mqtt">> , <<"type">> := <<"mqtt">>
, <<"name">> := ?BRIDGE_NAME_EGRESS , <<"name">> := ?BRIDGE_NAME_EGRESS
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
, <<"connector">> := ?CONNECTR_ID , <<"connector">> := ConnctorID
}, jsx:decode(Bridge)), } = jsx:decode(Bridge),
%% then we try to update 'server' of the connector, to an unavailable IP address %% then we try to update 'server' of the connector, to an unavailable IP address
%% the update should fail because of 'unreachable' or 'connrefused' %% the update should fail because of 'unreachable' or 'connrefused'
{ok, 400, _ErrorMsg} = request(put, uri(["connectors", ?CONNECTR_ID]), {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>)), ?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>)),
%% we fix the 'server' parameter to a normal one, it should work %% we fix the 'server' parameter to a normal one, it should work
{ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]), {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)), ?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector %% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
t_mqtt_conn_update2(_) -> t_mqtt_conn_update2(_) ->
@ -404,36 +389,36 @@ t_mqtt_conn_update2(_) ->
, <<"name">> => ?CONNECTR_NAME , <<"name">> => ?CONNECTR_NAME
}), }),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID #{ <<"id">> := ConnctorID
, <<"server">> := <<"127.0.0.1:2603">> , <<"server">> := <<"127.0.0.1:2603">>
}, jsx:decode(Connector)), } = jsx:decode(Connector),
%% ... and a MQTT bridge, using POST %% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now %% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]), {ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ ?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE, <<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS <<"name">> => ?BRIDGE_NAME_EGRESS
}), }),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS #{ <<"id">> := BridgeIDEgress
, <<"type">> := <<"mqtt">> , <<"type">> := <<"mqtt">>
, <<"name">> := ?BRIDGE_NAME_EGRESS , <<"name">> := ?BRIDGE_NAME_EGRESS
, <<"status">> := <<"disconnected">> , <<"status">> := <<"disconnected">>
, <<"connector">> := ?CONNECTR_ID , <<"connector">> := ConnctorID
}, jsx:decode(Bridge)), } = jsx:decode(Bridge),
%% we fix the 'server' parameter to a normal one, it should work %% we fix the 'server' parameter to a normal one, it should work
{ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]), {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)),
{ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS ?assertMatch(#{ <<"id">> := BridgeIDEgress
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
}, jsx:decode(BridgeStr)), }, jsx:decode(BridgeStr)),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector %% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
t_mqtt_conn_testing(_) -> t_mqtt_conn_testing(_) ->

View File

@ -216,7 +216,10 @@ do_remove(Mod, InstId, ResourceState) ->
do_restart(InstId) -> do_restart(InstId) ->
case lookup(InstId) of case lookup(InstId) of
{ok, #{mod := Mod, state := ResourceState, config := Config} = Data} -> {ok, #{mod := Mod, state := ResourceState, config := Config} = Data} ->
_ = emqx_resource:call_stop(InstId, Mod, ResourceState), _ = case ResourceState of
undefine -> ok;
_ -> emqx_resource:call_stop(InstId, Mod, ResourceState)
end,
case emqx_resource:call_start(InstId, Mod, Config) of case emqx_resource:call_start(InstId, Mod, Config) of
{ok, NewResourceState} -> {ok, NewResourceState} ->
ets:insert(emqx_resource_instance, ets:insert(emqx_resource_instance,