diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl index 9308bbe95..7bf9e1503 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -134,7 +134,7 @@ create(#{method := Method, emqx_connector_http, Config#{base_url => maps:remove(query, URIMap), pool_type => random}, - #{waiting_connect_complete => 5000}) of + #{}) of {ok, already_created} -> {ok, State}; {ok, _} -> diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl index 158306a87..762813e5c 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl @@ -116,7 +116,7 @@ create(#{selector := Selector} = Config) -> ?RESOURCE_GROUP, emqx_connector_mongo, Config, - #{waiting_connect_complete => 5000}) of + #{}) of {ok, already_created} -> {ok, NState}; {ok, _} -> diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl index b347ff30c..61a1cdf81 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl @@ -85,7 +85,7 @@ create(#{password_hash_algorithm := Algorithm, ?RESOURCE_GROUP, emqx_connector_mysql, Config, - #{waiting_connect_complete => 5000}) of + #{}) of {ok, already_created} -> {ok, State}; {ok, _} -> diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl index 856271db3..7f9735349 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl @@ -81,7 +81,7 @@ create(#{query := Query0, resource_id => ResourceId}, case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_pgsql, Config#{named_queries => #{ResourceId => Query}}, - #{waiting_connect_complete => 5000}) of + #{}) of {ok, already_created} -> {ok, State}; {ok, _} -> diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl index e604acfe3..19d590287 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl @@ -93,7 +93,7 @@ create(#{cmd := Cmd, resource_id => ResourceId}, case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_redis, Config, - #{waiting_connect_complete => 5000}) of + #{}) of {ok, already_created} -> {ok, NState}; {ok, _} -> diff --git a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl index 4199564c2..b83f710f1 100644 --- a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl @@ -63,7 +63,7 @@ init_per_suite(Config) -> ?RESOURCE_GROUP, emqx_connector_mysql, mysql_config(), - #{waiting_connect_complete => 5000}), + #{}), Config; false -> {skip, no_mysql} diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl index a4d6f8c07..572e45f00 100644 --- a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl @@ -64,7 +64,7 @@ init_per_suite(Config) -> ?RESOURCE_GROUP, emqx_connector_pgsql, pgsql_config(), - #{waiting_connect_complete => 5000}), + #{}), Config; false -> {skip, no_pgsql} diff --git a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl index c02830ab7..fcc00612e 100644 --- a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl @@ -63,7 +63,7 @@ init_per_suite(Config) -> ?RESOURCE_GROUP, emqx_connector_redis, redis_config(), - #{waiting_connect_complete => 5000}), + #{}), Config; false -> {skip, no_redis} diff --git a/apps/emqx_authz/src/emqx_authz_postgresql.erl b/apps/emqx_authz/src/emqx_authz_postgresql.erl index a127a9c2b..9a783ddcf 100644 --- a/apps/emqx_authz/src/emqx_authz_postgresql.erl +++ b/apps/emqx_authz/src/emqx_authz_postgresql.erl @@ -56,7 +56,7 @@ init(#{query := SQL0} = Source) -> ?RESOURCE_GROUP, emqx_connector_pgsql, Source#{named_queries => #{ResourceID => SQL}}, - #{waiting_connect_complete => 5000}) of + #{}) of {ok, _} -> Source#{annotations => #{id => ResourceID, diff --git a/apps/emqx_authz/src/emqx_authz_utils.erl b/apps/emqx_authz/src/emqx_authz_utils.erl index 73e387d81..4a0e447e9 100644 --- a/apps/emqx_authz/src/emqx_authz_utils.erl +++ b/apps/emqx_authz/src/emqx_authz_utils.erl @@ -38,7 +38,7 @@ create_resource(Module, Config) -> case emqx_resource:create_local(ResourceID, ?RESOURCE_GROUP, Module, Config, - #{waiting_connect_complete => 5000}) of + #{}) of {ok, already_created} -> {ok, ResourceID}; {ok, _} -> {ok, ResourceID}; {error, Reason} -> {error, Reason} diff --git a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl index 1bdff9455..8852ecc98 100644 --- a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl @@ -45,7 +45,7 @@ init_per_suite(Config) -> ?RESOURCE_GROUP, emqx_connector_mysql, mysql_config(), - #{waiting_connect_complete => 5000}), + #{}), Config; false -> {skip, no_mysql} diff --git a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl index 5f8c914fe..41a1a504b 100644 --- a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl @@ -45,7 +45,7 @@ init_per_suite(Config) -> ?RESOURCE_GROUP, emqx_connector_pgsql, pgsql_config(), - #{waiting_connect_complete => 5000}), + #{}), Config; false -> {skip, no_pgsql} diff --git a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl index 519973ebe..0045ce926 100644 --- a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl @@ -46,7 +46,7 @@ init_per_suite(Config) -> ?RESOURCE_GROUP, emqx_connector_redis, redis_config(), - #{waiting_connect_complete => 5000}), + #{}), Config; false -> {skip, no_redis} diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 67e9286bf..d0c4e389c 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -50,7 +50,6 @@ , remove/2 , update/2 , update/3 - , start/2 , stop/2 , restart/2 ]). @@ -208,12 +207,10 @@ lookup(Type, Name, RawConf) -> raw_config => RawConf}} end. -start(Type, Name) -> - restart(Type, Name). - stop(Type, Name) -> emqx_resource:stop(resource_id(Type, Name)). +%% we don't provide 'start', as we want an already started bridge to be restarted. restart(Type, Name) -> emqx_resource:restart(resource_id(Type, Name)). @@ -228,7 +225,7 @@ create(Type, Name, Conf) -> <<"emqx_bridge">>, emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), - #{waiting_connect_complete => 5000}) of + #{}) of {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf); {ok, _} -> maybe_disable_bridge(Type, Name, Conf); {error, Reason} -> {error, Reason} @@ -263,8 +260,8 @@ update(Type, Name, {OldConf, Conf}) -> %% we don't need to recreate the bridge if this config change is only to %% toggole the config 'bridge.{type}.{name}.enable' case maps:get(enable, Conf, true) of - false -> stop(Type, Name); - true -> start(Type, Name) + true -> restart(Type, Name); + false -> stop(Type, Name) end end. @@ -275,7 +272,7 @@ recreate(Type, Name, Conf) -> emqx_resource:recreate_local(resource_id(Type, Name), emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), - #{waiting_connect_complete => 5000}). + #{}). create_dry_run(Type, Conf) -> Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}}, diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 28420b268..5326342b5 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -33,6 +33,7 @@ -export([ '/bridges'/2 , '/bridges/:id'/2 , '/bridges/:id/operation/:operation'/2 + , '/nodes/:node/bridges/:id/operation/:operation'/2 ]). -export([ lookup_from_local_node/2 @@ -74,7 +75,8 @@ namespace() -> "bridge". api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}). -paths() -> ["/bridges", "/bridges/:id", "/bridges/:id/operation/:operation"]. +paths() -> ["/bridges", "/bridges/:id", "/bridges/:id/operation/:operation", + "/nodes/:node/bridges/:id/operation/:operation"]. error_schema(Code, Message) when is_atom(Code) -> error_schema([Code], Message); @@ -87,11 +89,28 @@ get_response_body_schema() -> emqx_dashboard_swagger:schema_with_examples(emqx_bridge_schema:get_response(), bridge_info_examples(get)). -param_path_operation() -> - {operation, mk(enum([start, stop, restart]), +param_path_operation_cluster() -> + {operation, mk(enum([enable, disable, stop, restart]), #{ in => path , required => true , example => <<"start">> + , desc => <<"Operations can be one of: enable, disable, start, stop, restart">> + })}. + +param_path_operation_on_node() -> + {operation, mk(enum([stop, restart]), + #{ in => path + , required => true + , example => <<"start">> + , desc => <<"Operations can be one of: start, stop, restart">> + })}. + +param_path_node() -> + {node, mk(binary(), + #{ in => path + , required => true + , example => <<"emqx@127.0.0.1">> + , desc => <<"The bridge Id. Must be of format {type}:{name}">> })}. param_path_id() -> @@ -219,7 +238,7 @@ schema("/bridges") -> bridge_info_examples(post)), responses => #{ 201 => get_response_body_schema(), - 400 => error_schema('BAD_REQUEST', "Create bridge failed") + 400 => error_schema('ALREADY_EXISTS', "Bridge already exists") } } }; @@ -267,11 +286,32 @@ schema("/bridges/:id/operation/:operation") -> 'operationId' => '/bridges/:id/operation/:operation', post => #{ tags => [<<"bridges">>], - summary => <<"Start/Stop/Restart Bridge">>, - description => <<"Start/Stop/Restart bridges on a specific node.">>, + summary => <<"Enable/Disable/Stop/Restart Bridge">>, + description => <<"Enable/Disable/Stop/Restart bridges on all nodes" + " in the cluster.">>, parameters => [ param_path_id(), - param_path_operation() + param_path_operation_cluster() + ], + responses => #{ + 500 => error_schema('INTERNAL_ERROR', "Operation Failed"), + 200 => <<"Operation success">> + } + } + }; + +schema("/nodes/:node/bridges/:id/operation/:operation") -> + #{ + 'operationId' => '/nodes/:node/bridges/:id/operation/:operation', + post => #{ + tags => [<<"bridges">>], + summary => <<"Stop/Restart Bridge">>, + description => <<"Stop/Restart bridges on a specific node.\n" + "NOTE: It's not allowed to disable/enable bridges on a single node.">>, + parameters => [ + param_path_node(), + param_path_id(), + param_path_operation_on_node() ], responses => #{ 500 => error_schema('INTERNAL_ERROR', "Operation Failed"), @@ -341,23 +381,51 @@ lookup_from_local_node(BridgeType, BridgeName) -> '/bridges/:id/operation/:operation'(post, #{bindings := #{id := Id, operation := Op}}) -> - ?TRY_PARSE_ID(Id, case operation_to_conf_req(Op) of + ?TRY_PARSE_ID(Id, case operation_func(Op) of invalid -> {400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; - UpReq -> + OperFunc when OperFunc == enable; OperFunc == disable -> case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], - {UpReq, BridgeType, BridgeName}, #{override_to => cluster}) of + {OperFunc, BridgeType, BridgeName}, #{override_to => cluster}) of {ok, _} -> {200}; {error, {pre_config_update, _, bridge_not_found}} -> {404, error_msg('NOT_FOUND', <<"bridge not found">>)}; {error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)} + end; + OperFunc -> + Nodes = mria_mnesia:running_nodes(), + operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) + end). + +'/nodes/:node/bridges/:id/operation/:operation'(post, #{bindings := + #{id := Id, operation := Op}}) -> + ?TRY_PARSE_ID(Id, case operation_func(Op) of + invalid -> {400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; + OperFunc when OperFunc == restart; OperFunc == stop -> + case emqx_bridge:OperFunc(BridgeType, BridgeName) of + ok -> {200}; + {error, Reason} -> + {500, error_msg('INTERNAL_ERROR', Reason)} end end). -operation_to_conf_req(<<"start">>) -> start; -operation_to_conf_req(<<"stop">>) -> stop; -operation_to_conf_req(<<"restart">>) -> restart; -operation_to_conf_req(_) -> invalid. +operation_func(<<"stop">>) -> stop; +operation_func(<<"restart">>) -> restart; +operation_func(<<"enable">>) -> enable; +operation_func(<<"disable">>) -> disable; +operation_func(_) -> invalid. + +operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) -> + RpcFunc = case OperFunc of + restart -> restart_bridges_to_all_nodes; + stop -> stop_bridges_to_all_nodes + end, + case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of + {ok, _} -> + {200}; + {error, ErrL} -> + {500, error_msg('INTERNAL_ERROR', ErrL)} + end. ensure_bridge_created(BridgeType, BridgeName, Conf) -> case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], @@ -437,7 +505,7 @@ format_metrics(#{ is_ok(ResL) -> - case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of + case lists:filter(fun({ok, _}) -> false; (ok) -> false; (_) -> true end, ResL) of [] -> {ok, [Res || {ok, Res} <- ResL]}; ErrL -> {error, ErrL} end. diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index f192cf73c..b02fe2a9c 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -39,24 +39,16 @@ stop(_State) -> ok = emqx_bridge:unload_hook(), ok. --define(IS_OPER(O), when Oper == start; Oper == stop; Oper == restart). -pre_config_update(_, {Oper, _, _}, undefined) ?IS_OPER(Oper) -> +%% NOTE: We depends on the `emqx_bridge:pre_config_update/3` to restart/stop the +%% underlying resources. +pre_config_update(_, {_Oper, _, _}, undefined) -> {error, bridge_not_found}; -pre_config_update(_, {Oper, Type, Name}, OldConfig) ?IS_OPER(Oper) -> - case perform_operation(Oper, Type, Name) of - ok -> - %% we also need to save the 'enable' to the config files - {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; - {error, _} = Err -> Err - end; -pre_config_update(_, Conf, _OldConfig) -> +pre_config_update(_, {Oper, _Type, _Name}, OldConfig) -> + %% to save the 'enable' to the config files + {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; +pre_config_update(_, Conf, _OldConfig) when is_map(Conf) -> {ok, Conf}. %% internal functions -operation_to_enable(start) -> true; -operation_to_enable(stop) -> false; -operation_to_enable(restart) -> true. - -perform_operation(start, Type, Name) -> emqx_bridge:restart(Type, Name); -perform_operation(restart, Type, Name) -> emqx_bridge:restart(Type, Name); -perform_operation(stop, Type, Name) -> emqx_bridge:stop(Type, Name). +operation_to_enable(disable) -> false; +operation_to_enable(enable) -> true. diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl index 71ea1d2dc..021074a1c 100644 --- a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl @@ -22,6 +22,8 @@ , list_bridges/1 , lookup_from_all_nodes/3 + , restart_bridges_to_all_nodes/3 + , stop_bridges_to_all_nodes/3 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -37,7 +39,20 @@ list_bridges(Node) -> -type key() :: atom() | binary() | [byte()]. +-spec restart_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall(Nodes, emqx_bridge, restart, + [BridgeType, BridgeName], ?TIMEOUT). + +-spec stop_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall(Nodes, emqx_bridge, stop, + [BridgeType, BridgeName], ?TIMEOUT). + -spec lookup_from_all_nodes([node()], key(), key()) -> - emqx_rpc:erpc_multicall(). + emqx_rpc:erpc_multicall(). lookup_from_all_nodes(Nodes, BridgeType, BridgeName) -> - erpc:multicall(Nodes, emqx_bridge_api, lookup_from_local_node, [BridgeType, BridgeName], ?TIMEOUT). + erpc:multicall(Nodes, emqx_bridge_api, lookup_from_local_node, + [BridgeType, BridgeName], ?TIMEOUT). diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 47dc55f6d..2c69dc457 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -79,8 +79,14 @@ init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), Config. end_per_testcase(_, _Config) -> + clear_resources(), ok. +clear_resources() -> + lists:foreach(fun(#{type := Type, name := Name}) -> + ok = emqx_bridge:remove(Type, Name) + end, emqx_bridge:list()). + %%------------------------------------------------------------------------------ %% HTTP server for testing %%------------------------------------------------------------------------------ @@ -239,6 +245,11 @@ t_http_crud_apis(_) -> ok. t_start_stop_bridges(_) -> + lists:foreach(fun(Type) -> + do_start_stop_bridges(Type) + end, [node, cluster]). + +do_start_stop_bridges(Type) -> %% assert we there's no bridges at first {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), @@ -249,7 +260,7 @@ t_start_stop_bridges(_) -> %ct:pal("the bridge ==== ~p", [Bridge]), #{ <<"type">> := ?BRIDGE_TYPE , <<"name">> := ?BRIDGE_NAME - , <<"status">> := _ + , <<"status">> := <<"connected">> , <<"node_status">> := [_|_] , <<"metrics">> := _ , <<"node_metrics">> := [_|_] @@ -257,24 +268,68 @@ t_start_stop_bridges(_) -> } = jsx:decode(Bridge), BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), %% stop it - {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>), + {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{ <<"status">> := <<"disconnected">> }, jsx:decode(Bridge2)), %% start again - {ok, 200, <<>>} = request(post, operation_path(start, BridgeID), <<"">>), + {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{ <<"status">> := <<"connected">> }, jsx:decode(Bridge3)), %% restart an already started bridge - {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>), + {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{ <<"status">> := <<"connected">> }, jsx:decode(Bridge3)), %% stop it again - {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>), + {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), %% restart a stopped bridge - {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>), + {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), + {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{ <<"status">> := <<"connected">> + }, jsx:decode(Bridge4)), + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). + +t_enable_disable_bridges(_) -> + %% assert we there's no bridges at first + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + Port = start_http_server(fun handle_fun_200_ok/2), + URL1 = ?URL(Port, "abc"), + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)), + %ct:pal("the bridge ==== ~p", [Bridge]), + #{ <<"type">> := ?BRIDGE_TYPE + , <<"name">> := ?BRIDGE_NAME + , <<"status">> := <<"connected">> + , <<"node_status">> := [_|_] + , <<"metrics">> := _ + , <<"node_metrics">> := [_|_] + , <<"url">> := URL1 + } = jsx:decode(Bridge), + BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), + %% disable it + {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>), + {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{ <<"status">> := <<"disconnected">> + }, jsx:decode(Bridge2)), + %% enable again + {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>), + {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{ <<"status">> := <<"connected">> + }, jsx:decode(Bridge3)), + %% enable an already started bridge + {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>), + {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{ <<"status">> := <<"connected">> + }, jsx:decode(Bridge3)), + %% disable it again + {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>), + %% enable a stopped bridge + {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>), {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{ <<"status">> := <<"connected">> }, jsx:decode(Bridge4)), @@ -307,7 +362,7 @@ request(Method, Url, Body) -> uri() -> uri([]). uri(Parts) when is_list(Parts) -> NParts = [E || E <- Parts], - ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]). + ?HOST ++ str(filename:join([?BASE_PATH, ?API_VERSION | NParts])). auth_header_() -> Username = <<"bridge_admin">>, @@ -315,5 +370,10 @@ auth_header_() -> {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), {"Authorization", "Bearer " ++ binary_to_list(Token)}. -operation_path(Oper, BridgeID) -> +operation_path(node, Oper, BridgeID) -> + uri(["nodes", node(), "bridges", BridgeID, "operation", Oper]); +operation_path(cluster, Oper, BridgeID) -> uri(["bridges", BridgeID, "operation", Oper]). + +str(S) when is_list(S) -> S; +str(S) when is_binary(S) -> binary_to_list(S). diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 60d1630b4..4e7b7d72e 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -21,9 +21,9 @@ , connector_id/2 ]). --export([ list/0 - , lookup/1 - , lookup/2 +-export([ list_raw/0 + , lookup_raw/1 + , lookup_raw/2 , create_dry_run/2 , update/2 , update/3 @@ -68,18 +68,18 @@ parse_connector_id(ConnectorId) -> _ -> error({invalid_connector_id, ConnectorId}) end. -list() -> +list_raw() -> lists:foldl(fun({Type, NameAndConf}, Connectors) -> lists:foldl(fun({Name, RawConf}, Acc) -> [RawConf#{<<"type">> => Type, <<"name">> => Name} | Acc] end, Connectors, maps:to_list(NameAndConf)) end, [], maps:to_list(emqx:get_raw_config(config_key_path(), #{}))). -lookup(Id) when is_binary(Id) -> +lookup_raw(Id) when is_binary(Id) -> {Type, Name} = parse_connector_id(Id), - lookup(Type, Name). + lookup_raw(Type, Name). -lookup(Type, Name) -> +lookup_raw(Type, Name) -> case emqx:get_raw_config(config_key_path() ++ [Type, Name], not_found) of not_found -> {error, not_found}; Conf -> {ok, Conf#{<<"type">> => Type, <<"name">> => Name}} diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 5262aded3..5659913f3 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -205,10 +205,10 @@ schema("/connectors/:id") -> end. '/connectors'(get, _Request) -> - {200, [format_resp(Conn) || Conn <- emqx_connector:list()]}; + {200, [format_resp(Conn) || Conn <- emqx_connector:list_raw()]}; '/connectors'(post, #{body := #{<<"type">> := ConnType, <<"name">> := ConnName} = Params}) -> - case emqx_connector:lookup(ConnType, ConnName) of + case emqx_connector:lookup_raw(ConnType, ConnName) of {ok, _} -> {400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)}; {error, not_found} -> @@ -218,13 +218,13 @@ schema("/connectors/:id") -> {201, format_resp(RawConf#{<<"type">> => ConnType, <<"name">> => ConnName})}; {error, Error} -> - {400, error_msg('ALREADY_EXISTS', Error)} + {400, error_msg('BAD_REQUEST', Error)} end end. '/connectors/:id'(get, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID(Id, - case emqx_connector:lookup(ConnType, ConnName) of + case emqx_connector:lookup_raw(ConnType, ConnName) of {ok, Conf} -> {200, format_resp(Conf)}; {error, not_found} -> @@ -234,7 +234,7 @@ schema("/connectors/:id") -> '/connectors/:id'(put, #{bindings := #{id := Id}, body := Params0}) -> Params = filter_out_request_body(Params0), ?TRY_PARSE_ID(Id, - case emqx_connector:lookup(ConnType, ConnName) of + case emqx_connector:lookup_raw(ConnType, ConnName) of {ok, _} -> case emqx_connector:update(ConnType, ConnName, Params) of {ok, #{raw_config := RawConf}} -> @@ -249,7 +249,7 @@ schema("/connectors/:id") -> '/connectors/:id'(delete, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID(Id, - case emqx_connector:lookup(ConnType, ConnName) of + case emqx_connector:lookup_raw(ConnType, ConnName) of {ok, _} -> case emqx_connector:delete(ConnType, ConnName) of {ok, _} -> diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 5b608635f..7d2b55275 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -114,15 +114,6 @@ set_special_configs(_) -> init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), - %% assert we there's no connectors and no bridges at first - {ok, 200, Connectors} = request(get, uri(["connectors"]), []), - lists:foreach(fun(#{<<"id">> := ConnectorID}) -> - {ok, 200, <<>>} = request(delete, uri(["connectors", ConnectorID]), []) - end, jsx:decode(Connectors)), - {ok, 200, Bridges} = request(get, uri(["bridges"]), []), - lists:foreach(fun(#{<<"id">> := BridgeID}) -> - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []) - end, jsx:decode(Bridges)), Config. end_per_testcase(_, _Config) -> clear_resources(), @@ -135,9 +126,9 @@ clear_resources() -> lists:foreach(fun(#{type := Type, name := Name}) -> ok = emqx_bridge:remove(Type, Name) end, emqx_bridge:list()), - lists:foreach(fun(#{type := Type, name := Name}) -> + lists:foreach(fun(#{<<"type">> := Type, <<"name">> := Name}) -> ok = emqx_connector:delete(Type, Name) - end, emqx_connector:list()). + end, emqx_connector:list_raw()). %%------------------------------------------------------------------------------ %% Testcases diff --git a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl index 47d8f31c8..29ba2c181 100644 --- a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl @@ -71,7 +71,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> ?CONNECTOR_RESOURCE_GROUP, ?MYSQL_RESOURCE_MOD, CheckedConfig, - #{waiting_connect_complete => 5000} + #{} ), ?assertEqual(InitialStatus, connected), % Instance should match the state and status of the just started resource diff --git a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl index bc7b2eb4d..0252e0816 100644 --- a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl @@ -72,7 +72,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> ?CONNECTOR_RESOURCE_GROUP, ?PGSQL_RESOURCE_MOD, CheckedConfig, - #{waiting_connect_complete => 5000} + #{} ), ?assertEqual(InitialStatus, connected), % Instance should match the state and status of the just started resource diff --git a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl index f1fcee67c..8e473c397 100644 --- a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl @@ -86,7 +86,7 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) -> ?CONNECTOR_RESOURCE_GROUP, ?REDIS_RESOURCE_MOD, CheckedConfig, - #{waiting_connect_complete => 5000} + #{} ), ?assertEqual(InitialStatus, connected), % Instance should match the state and status of the just started resource diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl index eef945a81..7eefe8701 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl @@ -157,6 +157,8 @@ init(Name) -> persistent_term:put(?CntrRef(Name), #{}), {ok, #state{}}. +handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) -> + {reply, #{}, State}; handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) -> {reply, case maps:get(Id, Rates, undefined) of undefined -> #{}; diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index b828f1b40..059ceefdf 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -178,14 +178,17 @@ do_recreate(InstId, ResourceType, NewConfig, Opts) -> {error, not_found} end. -wait_for_resource_ready(InstId, 0) -> - force_lookup(InstId); -wait_for_resource_ready(InstId, Retry) -> +wait_for_resource_ready(InstId, WaitTime) -> + do_wait_for_resource_ready(InstId, WaitTime div 100). + +do_wait_for_resource_ready(_InstId, 0) -> + timeout; +do_wait_for_resource_ready(InstId, Retry) -> case force_lookup(InstId) of - #{status := connected} = Data -> Data; + #{status := connected} -> ok; _ -> timer:sleep(100), - wait_for_resource_ready(InstId, Retry-1) + do_wait_for_resource_ready(InstId, Retry-1) end. do_create(InstId, Group, ResourceType, Config, Opts) -> @@ -197,8 +200,7 @@ do_create(InstId, Group, ResourceType, Config, Opts) -> ok -> ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId, [matched, success, failed, exception], [matched]), - WaitTime = maps:get(waiting_connect_complete , Opts, 0), - {ok, wait_for_resource_ready(InstId, WaitTime div 100)}; + {ok, force_lookup(InstId)}; Error -> Error end @@ -252,6 +254,7 @@ do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) -> spawn(fun() -> start_and_check(InstId, Group, ResourceType, Config, Opts, InitData) end), + _ = wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), ok. start_and_check(InstId, Group, ResourceType, Config, Opts, Data) -> diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 0a1b769a8..247f597b0 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -361,7 +361,7 @@ create_resource(Context, #{type := DB} = Config) -> <<"emqx_retainer">>, list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])), Config, - #{waiting_connect_complete => 5000}) of + #{}) of {ok, already_created} -> Context#{resource_id => ResourceID}; {ok, _} ->