From d11cf6ad64531dd3620dcc1ba017d0433a0173d2 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 30 Dec 2021 12:31:13 +0800 Subject: [PATCH] fix(bridges): store connector name and bridge name to config files --- apps/emqx_bridge/src/emqx_bridge.erl | 5 +- apps/emqx_bridge/src/emqx_bridge_api.erl | 9 +- .../src/emqx_bridge_http_schema.erl | 4 + apps/emqx_bridge/src/emqx_bridge_schema.erl | 6 +- .../test/emqx_bridge_api_SUITE.erl | 116 +++++------- .../emqx_connector/src/emqx_connector_api.erl | 9 +- .../src/emqx_connector_http.erl | 2 +- .../src/emqx_connector_schema.erl | 9 + .../src/mqtt/emqx_connector_mqtt_schema.erl | 2 +- .../test/emqx_connector_api_SUITE.erl | 177 ++++++++---------- .../src/emqx_resource_instance.erl | 5 +- 11 files changed, 168 insertions(+), 176 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index a6681d3f1..d46ce217e 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -222,7 +222,10 @@ update(Type, Name, {OldConf, Conf}) -> true -> %% we don't need to recreate the bridge if this config change is only to %% toggole the config 'bridge.{type}.{name}.enable' - ok + case maps:get(enable, Conf, true) of + false -> stop(Type, Name); + true -> start(Type, Name) + end end. recreate(Type, Name) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 900ad99ee..a5b9aa984 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -356,9 +356,8 @@ operation_to_conf_req(<<"restart">>) -> restart; operation_to_conf_req(_) -> invalid. ensure_bridge_created(BridgeType, BridgeName, Conf) -> - Conf1 = maps:without([<<"type">>, <<"name">>], Conf), case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], - Conf1, #{override_to => cluster}) of + Conf, #{override_to => cluster}) of {ok, _} -> ok; {error, Reason} -> {error, error_msg('BAD_ARG', Reason)} @@ -411,12 +410,12 @@ aggregate_metrics(AllMetrics) -> format_resp(#{id := Id, raw_config := RawConf, resource_data := #{status := Status, metrics := Metrics}}) -> - {Type, Name} = emqx_bridge:parse_bridge_id(Id), + {Type, BridgeName} = emqx_bridge:parse_bridge_id(Id), IsConnected = fun(started) -> connected; (_) -> disconnected end, RawConf#{ id => Id, type => Type, - name => Name, + name => maps:get(<<"name">>, RawConf, BridgeName), node => node(), status => IsConnected(Status), metrics => Metrics @@ -431,7 +430,7 @@ rpc_multicall(Func, Args) -> end. filter_out_request_body(Conf) -> - ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, + ExtraConfs = [<<"id">>, <<"type">>, <<"status">>, <<"node_status">>, <<"node_metrics">>, <<"metrics">>, <<"node">>], maps:without(ExtraConfs, Conf). diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl index a5937509c..494911d21 100644 --- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl @@ -84,6 +84,10 @@ basic_config() -> #{ desc => "Enable or disable this bridge" , default => true })} + , {name, + mk(binary(), + #{ desc => "Bridge name, used as a human-readable description of the bridge." + })} , {direction, mk(egress, #{ desc => "The direction of this bridge, MUST be egress" diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index 82fc79ebf..00d461098 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -43,9 +43,13 @@ http_schema(Method) -> common_bridge_fields() -> [ {enable, mk(boolean(), - #{ desc =>"Enable or disable this bridge" + #{ desc => "Enable or disable this bridge" , default => true })} + , {name, + mk(binary(), + #{ desc => "Bridge name, used as a human-readable description of the bridge." + })} , {connector, mk(binary(), #{ nullable => false diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 7724d467c..807ad32f6 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -23,12 +23,13 @@ -define(CONF_DEFAULT, <<"bridges: {}">>). -define(BRIDGE_TYPE, <<"http">>). -define(BRIDGE_NAME, <<"test_bridge">>). --define(BRIDGE_ID, <<"http:test_bridge">>). -define(URL(PORT, PATH), list_to_binary( io_lib:format("http://localhost:~s/~s", [integer_to_list(PORT), PATH]))). --define(HTTP_BRIDGE(URL), +-define(HTTP_BRIDGE(URL, TYPE, NAME), #{ + <<"type">> => TYPE, + <<"name">> => NAME, <<"url">> => URL, <<"local_topic">> => <<"emqx_http/#">>, <<"method">> => <<"post">>, @@ -145,32 +146,18 @@ t_http_crud_apis(_) -> %% POST /bridges/ will create a bridge URL1 = ?URL(Port, "path1"), {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1)#{ - <<"type">> => ?BRIDGE_TYPE, - <<"name">> => ?BRIDGE_NAME - }), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)), %ct:pal("---bridge: ~p", [Bridge]), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID - , <<"type">> := ?BRIDGE_TYPE - , <<"name">> := ?BRIDGE_NAME - , <<"status">> := _ - , <<"node_status">> := [_|_] - , <<"metrics">> := _ - , <<"node_metrics">> := [_|_] - , <<"url">> := URL1 - }, jsx:decode(Bridge)), - - %% create a again returns an error - {ok, 400, RetMsg} = request(post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1)#{ - <<"type">> => ?BRIDGE_TYPE, - <<"name">> => ?BRIDGE_NAME - }), - ?assertMatch( - #{ <<"code">> := _ - , <<"message">> := <<"bridge already exists">> - }, jsx:decode(RetMsg)), + #{ <<"id">> := BridgeID + , <<"type">> := ?BRIDGE_TYPE + , <<"name">> := ?BRIDGE_NAME + , <<"status">> := _ + , <<"node_status">> := [_|_] + , <<"metrics">> := _ + , <<"node_metrics">> := [_|_] + , <<"url">> := URL1 + } = jsx:decode(Bridge), %% send an message to emqx and the message should be forwarded to the HTTP server Body = <<"my msg">>, @@ -188,9 +175,9 @@ t_http_crud_apis(_) -> end), %% update the request-path of the bridge URL2 = ?URL(Port, "path2"), - {ok, 200, Bridge2} = request(put, uri(["bridges", ?BRIDGE_ID]), - ?HTTP_BRIDGE(URL2)), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + {ok, 200, Bridge2} = request(put, uri(["bridges", BridgeID]), + ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)), + ?assertMatch(#{ <<"id">> := BridgeID , <<"type">> := ?BRIDGE_TYPE , <<"name">> := ?BRIDGE_NAME , <<"status">> := _ @@ -202,7 +189,7 @@ t_http_crud_apis(_) -> %% list all bridges again, assert Bridge2 is in it {ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []), - ?assertMatch([#{ <<"id">> := ?BRIDGE_ID + ?assertMatch([#{ <<"id">> := BridgeID , <<"type">> := ?BRIDGE_TYPE , <<"name">> := ?BRIDGE_NAME , <<"status">> := _ @@ -213,8 +200,8 @@ t_http_crud_apis(_) -> }], jsx:decode(Bridge2Str)), %% get the bridge by id - {ok, 200, Bridge3Str} = request(get, uri(["bridges", ?BRIDGE_ID]), []), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + {ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{ <<"id">> := BridgeID , <<"type">> := ?BRIDGE_TYPE , <<"name">> := ?BRIDGE_NAME , <<"status">> := _ @@ -238,12 +225,12 @@ t_http_crud_apis(_) -> end), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), %% update a deleted bridge returns an error - {ok, 404, ErrMsg2} = request(put, uri(["bridges", ?BRIDGE_ID]), - ?HTTP_BRIDGE(URL2)), + {ok, 404, ErrMsg2} = request(put, uri(["bridges", BridgeID]), + ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)), ?assertMatch( #{ <<"code">> := _ , <<"message">> := <<"bridge not found">> @@ -251,52 +238,51 @@ t_http_crud_apis(_) -> ok. t_start_stop_bridges(_) -> + %% assert we there's no bridges at first + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + Port = start_http_server(fun handle_fun_200_ok/2), URL1 = ?URL(Port, "abc"), {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1)#{ - <<"type">> => ?BRIDGE_TYPE, - <<"name">> => ?BRIDGE_NAME - }), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)), %ct:pal("the bridge ==== ~p", [Bridge]), - ?assertMatch( - #{ <<"id">> := ?BRIDGE_ID - , <<"type">> := ?BRIDGE_TYPE - , <<"name">> := ?BRIDGE_NAME - , <<"status">> := _ - , <<"node_status">> := [_|_] - , <<"metrics">> := _ - , <<"node_metrics">> := [_|_] - , <<"url">> := URL1 - }, jsx:decode(Bridge)), + #{ <<"id">> := BridgeID + , <<"type">> := ?BRIDGE_TYPE + , <<"name">> := ?BRIDGE_NAME + , <<"status">> := _ + , <<"node_status">> := [_|_] + , <<"metrics">> := _ + , <<"node_metrics">> := [_|_] + , <<"url">> := URL1 + } = jsx:decode(Bridge), %% stop it - {ok, 200, <<>>} = request(post, operation_path(stop), <<"">>), - {ok, 200, Bridge2} = request(get, uri(["bridges", ?BRIDGE_ID]), []), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>), + {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{ <<"id">> := BridgeID , <<"status">> := <<"disconnected">> }, jsx:decode(Bridge2)), %% start again - {ok, 200, <<>>} = request(post, operation_path(start), <<"">>), - {ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + {ok, 200, <<>>} = request(post, operation_path(start, BridgeID), <<"">>), + {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{ <<"id">> := BridgeID , <<"status">> := <<"connected">> }, jsx:decode(Bridge3)), %% restart an already started bridge - {ok, 200, <<>>} = request(post, operation_path(restart), <<"">>), - {ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>), + {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{ <<"id">> := BridgeID , <<"status">> := <<"connected">> }, jsx:decode(Bridge3)), %% stop it again - {ok, 200, <<>>} = request(post, operation_path(stop), <<"">>), + {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>), %% restart a stopped bridge - {ok, 200, <<>>} = request(post, operation_path(restart), <<"">>), - {ok, 200, Bridge4} = request(get, uri(["bridges", ?BRIDGE_ID]), []), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>), + {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{ <<"id">> := BridgeID , <<"status">> := <<"connected">> }, jsx:decode(Bridge4)), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). %%-------------------------------------------------------------------- @@ -332,5 +318,5 @@ auth_header_() -> {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), {"Authorization", "Bearer " ++ binary_to_list(Token)}. -operation_path(Oper) -> - uri(["bridges", ?BRIDGE_ID, "operation", Oper]). +operation_path(Oper, BridgeID) -> + uri(["bridges", BridgeID, "operation", Oper]). diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 82b63476d..4989cf17e 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -155,8 +155,7 @@ schema("/connectors") -> }, post => #{ tags => [<<"connectors">>], - description => <<"Create a new connector by given Id
" - "The ID must be of format '{type}:{name}'">>, + description => <<"Create a new connector">>, summary => <<"Create connector">>, requestBody => post_request_body_schema(), responses => #{ @@ -270,16 +269,16 @@ format_resp(#{<<"id">> := Id} = RawConf) -> format_resp(ConnId, RawConf) -> NumOfBridges = length(emqx_bridge:list_bridges_by_connector(ConnId)), - {Type, Name} = emqx_connector:parse_connector_id(ConnId), + {Type, ConnName} = emqx_connector:parse_connector_id(ConnId), RawConf#{ <<"id">> => ConnId, <<"type">> => Type, - <<"name">> => Name, + <<"name">> => maps:get(<<"name">>, RawConf, ConnName), <<"num_of_bridges">> => NumOfBridges }. filter_out_request_body(Conf) -> - ExtraConfs = [<<"clientid">>, <<"num_of_bridges">>, <<"type">>, <<"name">>], + ExtraConfs = [<<"clientid">>, <<"num_of_bridges">>, <<"type">>], maps:without(ExtraConfs, Conf). bin(S) when is_list(S) -> diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 509e293cf..77d498c6b 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -112,7 +112,7 @@ If the request is provided, the caller can send HTTP requests via emqx_resource:query(ResourceId, {send_message, BridgeId, Message}) """ })} - ] ++ emqx_connector_schema_lib:ssl_fields(); + ] ++ emqx_connector_schema:common_fields() ++ emqx_connector_schema_lib:ssl_fields(); fields("request") -> [ {method, hoconsc:mk(hoconsc:enum([post, put, get, delete]), #{nullable => true})} diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl index 33d10802b..ed663ee60 100644 --- a/apps/emqx_connector/src/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/emqx_connector_schema.erl @@ -8,6 +8,8 @@ -export([roots/0, fields/1]). +-export([common_fields/0]). + -export([ get_response/0 , put_request/0 , post_request/0 @@ -49,3 +51,10 @@ fields("connectors") -> schema_mod(Type) -> list_to_atom(lists:concat(["emqx_connector_", Type])). + +common_fields() -> + [ {name, + mk(binary(), + #{ desc => "Connector name, used as a human-readable description of the connector." + })} + ]. diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 303617a29..44add053c 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -94,7 +94,7 @@ topic filters for 'remote_topic' of ingress connections. Queue messages in disk files. """ })} - ] ++ emqx_connector_schema_lib:ssl_fields(); + ] ++ emqx_connector_schema:common_fields() ++ emqx_connector_schema_lib:ssl_fields(); fields("ingress") -> %% the message maybe subscribed by rules, in this case 'local_topic' is not necessary diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 307852546..1a96a3596 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -26,11 +26,8 @@ -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). -define(CONNECTR_TYPE, <<"mqtt">>). -define(CONNECTR_NAME, <<"test_connector">>). --define(CONNECTR_ID, <<"mqtt:test_connector">>). -define(BRIDGE_NAME_INGRESS, <<"ingress_test_bridge">>). -define(BRIDGE_NAME_EGRESS, <<"egress_test_bridge">>). --define(BRIDGE_ID_INGRESS, <<"mqtt:ingress_test_bridge">>). --define(BRIDGE_ID_EGRESS, <<"mqtt:egress_test_bridge">>). -define(MQTT_CONNECOTR(Username), #{ <<"server">> => <<"127.0.0.1:1883">>, @@ -123,32 +120,21 @@ t_mqtt_crud_apis(_) -> , <<"name">> => ?CONNECTR_NAME }), - %ct:pal("---connector: ~p", [Connector]), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID - , <<"type">> := ?CONNECTR_TYPE - , <<"name">> := ?CONNECTR_NAME - , <<"server">> := <<"127.0.0.1:1883">> - , <<"username">> := User1 - , <<"password">> := <<"">> - , <<"proto_ver">> := <<"v4">> - , <<"ssl">> := #{<<"enable">> := false} - }, jsx:decode(Connector)), - - %% create a again returns an error - {ok, 400, RetMsg} = request(post, uri(["connectors"]), - ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE - , <<"name">> => ?CONNECTR_NAME - }), - ?assertMatch( - #{ <<"code">> := _ - , <<"message">> := <<"connector already exists">> - }, jsx:decode(RetMsg)), + #{ <<"id">> := ConnctorID + , <<"type">> := ?CONNECTR_TYPE + , <<"name">> := ?CONNECTR_NAME + , <<"server">> := <<"127.0.0.1:1883">> + , <<"username">> := User1 + , <<"password">> := <<"">> + , <<"proto_ver">> := <<"v4">> + , <<"ssl">> := #{<<"enable">> := false} + } = jsx:decode(Connector), %% update the request-path of the connector User2 = <<"user2">>, - {ok, 200, Connector2} = request(put, uri(["connectors", ?CONNECTR_ID]), + {ok, 200, Connector2} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR(User2)), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID + ?assertMatch(#{ <<"id">> := ConnctorID , <<"server">> := <<"127.0.0.1:1883">> , <<"username">> := User2 , <<"password">> := <<"">> @@ -158,7 +144,7 @@ t_mqtt_crud_apis(_) -> %% list all connectors again, assert Connector2 is in it {ok, 200, Connector2Str} = request(get, uri(["connectors"]), []), - ?assertMatch([#{ <<"id">> := ?CONNECTR_ID + ?assertMatch([#{ <<"id">> := ConnctorID , <<"type">> := ?CONNECTR_TYPE , <<"name">> := ?CONNECTR_NAME , <<"server">> := <<"127.0.0.1:1883">> @@ -169,8 +155,8 @@ t_mqtt_crud_apis(_) -> }], jsx:decode(Connector2Str)), %% get the connector by id - {ok, 200, Connector3Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID + {ok, 200, Connector3Str} = request(get, uri(["connectors", ConnctorID]), []), + ?assertMatch(#{ <<"id">> := ConnctorID , <<"type">> := ?CONNECTR_TYPE , <<"name">> := ?CONNECTR_NAME , <<"server">> := <<"127.0.0.1:1883">> @@ -181,11 +167,11 @@ t_mqtt_crud_apis(_) -> }, jsx:decode(Connector3Str)), %% delete the connector - {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), %% update a deleted connector returns an error - {ok, 404, ErrMsg2} = request(put, uri(["connectors", ?CONNECTR_ID]), + {ok, 404, ErrMsg2} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR(User2)), ?assertMatch( #{ <<"code">> := _ @@ -205,28 +191,28 @@ t_mqtt_conn_bridge_ingress(_) -> , <<"name">> => ?CONNECTR_NAME }), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID - , <<"server">> := <<"127.0.0.1:1883">> - , <<"num_of_bridges">> := 0 - , <<"username">> := User1 - , <<"password">> := <<"">> - , <<"proto_ver">> := <<"v4">> - , <<"ssl">> := #{<<"enable">> := false} - }, jsx:decode(Connector)), + #{ <<"id">> := ConnctorID + , <<"server">> := <<"127.0.0.1:1883">> + , <<"num_of_bridges">> := 0 + , <<"username">> := User1 + , <<"password">> := <<"">> + , <<"proto_ver">> := <<"v4">> + , <<"ssl">> := #{<<"enable">> := false} + } = jsx:decode(Connector), %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{ + ?MQTT_BRIDGE_INGRESS(ConnctorID)#{ <<"type">> => ?CONNECTR_TYPE, <<"name">> => ?BRIDGE_NAME_INGRESS }), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS - , <<"type">> := <<"mqtt">> - , <<"status">> := <<"connected">> - , <<"connector">> := ?CONNECTR_ID - }, jsx:decode(Bridge)), + #{ <<"id">> := BridgeIDIngress + , <<"type">> := <<"mqtt">> + , <<"status">> := <<"connected">> + , <<"connector">> := ConnctorID + } = jsx:decode(Bridge), %% we now test if the bridge works as expected @@ -252,17 +238,17 @@ t_mqtt_conn_bridge_ingress(_) -> end), %% get the connector by id, verify the num_of_bridges now is 1 - {ok, 200, Connector1Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID + {ok, 200, Connector1Str} = request(get, uri(["connectors", ConnctorID]), []), + ?assertMatch(#{ <<"id">> := ConnctorID , <<"num_of_bridges">> := 1 }, jsx:decode(Connector1Str)), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_INGRESS]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), %% delete the connector - {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), ok. @@ -279,29 +265,28 @@ t_mqtt_conn_bridge_egress(_) -> }), %ct:pal("---connector: ~p", [Connector]), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID - , <<"server">> := <<"127.0.0.1:1883">> - , <<"username">> := User1 - , <<"password">> := <<"">> - , <<"proto_ver">> := <<"v4">> - , <<"ssl">> := #{<<"enable">> := false} - }, jsx:decode(Connector)), + #{ <<"id">> := ConnctorID + , <<"server">> := <<"127.0.0.1:1883">> + , <<"username">> := User1 + , <<"password">> := <<"">> + , <<"proto_ver">> := <<"v4">> + , <<"ssl">> := #{<<"enable">> := false} + } = jsx:decode(Connector), %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ + ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ <<"type">> => ?CONNECTR_TYPE, <<"name">> => ?BRIDGE_NAME_EGRESS }), - %ct:pal("---bridge: ~p", [Bridge]), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS - , <<"type">> := ?CONNECTR_TYPE - , <<"name">> := ?BRIDGE_NAME_EGRESS - , <<"status">> := <<"connected">> - , <<"connector">> := ?CONNECTR_ID - }, jsx:decode(Bridge)), + #{ <<"id">> := BridgeIDEgress + , <<"type">> := ?CONNECTR_TYPE + , <<"name">> := ?BRIDGE_NAME_EGRESS + , <<"status">> := <<"connected">> + , <<"connector">> := ConnctorID + } = jsx:decode(Bridge), %% we now test if the bridge works as expected LocalTopic = <<"local_topic/1">>, @@ -326,19 +311,19 @@ t_mqtt_conn_bridge_egress(_) -> end), %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch(#{ <<"id">> := BridgeIDEgress , <<"metrics">> := ?metrics(1, 1, 0, _, _, _) , <<"node_metrics">> := [#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}] }, jsx:decode(BridgeStr)), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), %% delete the connector - {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), ok. @@ -358,37 +343,37 @@ t_mqtt_conn_update(_) -> }), %ct:pal("---connector: ~p", [Connector]), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID - , <<"server">> := <<"127.0.0.1:1883">> - }, jsx:decode(Connector)), + #{ <<"id">> := ConnctorID + , <<"server">> := <<"127.0.0.1:1883">> + } = jsx:decode(Connector), %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ + ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ <<"type">> => ?CONNECTR_TYPE, <<"name">> => ?BRIDGE_NAME_EGRESS }), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS - , <<"type">> := <<"mqtt">> - , <<"name">> := ?BRIDGE_NAME_EGRESS - , <<"status">> := <<"connected">> - , <<"connector">> := ?CONNECTR_ID - }, jsx:decode(Bridge)), + #{ <<"id">> := BridgeIDEgress + , <<"type">> := <<"mqtt">> + , <<"name">> := ?BRIDGE_NAME_EGRESS + , <<"status">> := <<"connected">> + , <<"connector">> := ConnctorID + } = jsx:decode(Bridge), %% then we try to update 'server' of the connector, to an unavailable IP address %% the update should fail because of 'unreachable' or 'connrefused' - {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ?CONNECTR_ID]), + {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>)), %% we fix the 'server' parameter to a normal one, it should work - {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]), + {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), %% delete the connector - {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). t_mqtt_conn_update2(_) -> @@ -404,36 +389,36 @@ t_mqtt_conn_update2(_) -> , <<"name">> => ?CONNECTR_NAME }), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID - , <<"server">> := <<"127.0.0.1:2603">> - }, jsx:decode(Connector)), + #{ <<"id">> := ConnctorID + , <<"server">> := <<"127.0.0.1:2603">> + } = jsx:decode(Connector), %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ + ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ <<"type">> => ?CONNECTR_TYPE, <<"name">> => ?BRIDGE_NAME_EGRESS }), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS - , <<"type">> := <<"mqtt">> - , <<"name">> := ?BRIDGE_NAME_EGRESS - , <<"status">> := <<"disconnected">> - , <<"connector">> := ?CONNECTR_ID - }, jsx:decode(Bridge)), + #{ <<"id">> := BridgeIDEgress + , <<"type">> := <<"mqtt">> + , <<"name">> := ?BRIDGE_NAME_EGRESS + , <<"status">> := <<"disconnected">> + , <<"connector">> := ConnctorID + } = jsx:decode(Bridge), %% we fix the 'server' parameter to a normal one, it should work - {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]), + {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), - {ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch(#{ <<"id">> := BridgeIDEgress , <<"status">> := <<"connected">> }, jsx:decode(BridgeStr)), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), %% delete the connector - {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). t_mqtt_conn_testing(_) -> diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 497affa5e..ebc812805 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -216,7 +216,10 @@ do_remove(Mod, InstId, ResourceState) -> do_restart(InstId) -> case lookup(InstId) of {ok, #{mod := Mod, state := ResourceState, config := Config} = Data} -> - _ = emqx_resource:call_stop(InstId, Mod, ResourceState), + _ = case ResourceState of + undefine -> ok; + _ -> emqx_resource:call_stop(InstId, Mod, ResourceState) + end, case emqx_resource:call_start(InstId, Mod, Config) of {ok, NewResourceState} -> ets:insert(emqx_resource_instance,