From 9b63bdc1e01e38b3a08281a875039f26113c8aa1 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 23 Mar 2023 15:27:34 +0800 Subject: [PATCH] chore: apply review suggestions - Rename sql to cql - Add tests for `bridges_probe` API --- .../docker-compose-cassandra.yaml | 6 +- .../i18n/emqx_ee_bridge_cassa.conf | 10 +- .../src/emqx_ee_bridge_cassa.erl | 10 +- .../test/emqx_ee_bridge_cassa_SUITE.erl | 86 +++++++++--- .../src/emqx_ee_connector_cassa.erl | 125 +++++++++--------- .../test/emqx_ee_connector_cassa_SUITE.erl | 3 +- 6 files changed, 140 insertions(+), 100 deletions(-) diff --git a/.ci/docker-compose-file/docker-compose-cassandra.yaml b/.ci/docker-compose-file/docker-compose-cassandra.yaml index 393a5cac7..a54f621c1 100644 --- a/.ci/docker-compose-file/docker-compose-cassandra.yaml +++ b/.ci/docker-compose-file/docker-compose-cassandra.yaml @@ -14,9 +14,9 @@ services: CASSANDRA_RPC_ADDRESS: "0.0.0.0" volumes: - ./certs:/certs - ports: - - "9042:9042" - - "9142:9142" + #ports: + # - "9042:9042" + # - "9142:9142" command: - /bin/bash - -c diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_cassa.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_cassa.conf index b8d810413..3bbac6658 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_cassa.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_cassa.conf @@ -16,14 +16,14 @@ will be forwarded.""" } } - sql_template { + cql_template { desc { - en: """SQL Template""" - zh: """SQL 模板""" + en: """CQL Template""" + zh: """CQL 模板""" } label { - en: "SQL Template" - zh: "SQL 模板" + en: "CQL Template" + zh: "CQL 模板" } } config_enable { diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl index bcbeb8e82..20821dc8f 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl @@ -36,7 +36,7 @@ conn_bridge_examples(Method) -> [ #{ - <<"cassa">> => #{ + <<"cassandra">> => #{ summary => <<"Cassandra Bridge">>, value => values(Method, cassandra) } @@ -54,7 +54,7 @@ values(_Method, Type) -> pool_size => 8, username => <<"root">>, password => <<"public">>, - sql => ?DEFAULT_CQL, + cql => ?DEFAULT_CQL, local_topic => <<"local/topic/#">>, resource_opts => #{ worker_pool_size => 8, @@ -77,10 +77,10 @@ roots() -> []. fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, - {sql, + {cql, mk( binary(), - #{desc => ?DESC("sql_template"), default => ?DEFAULT_CQL, format => <<"sql">>} + #{desc => ?DESC("cql_template"), default => ?DEFAULT_CQL, format => <<"sql">>} )}, {local_topic, mk( @@ -102,7 +102,7 @@ fields("config") -> fields("creation_opts") -> emqx_resource_schema:fields("creation_opts_sync_only"); fields("post") -> - fields("post", cassa); + fields("post", cassandra); fields("put") -> fields("config"); fields("get") -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl index 666cd0caf..d040000e2 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl @@ -27,9 +27,9 @@ ");\n" "" ). --define(SQL_DROP_TABLE, "DROP TABLE mqtt_msg_test"). --define(SQL_DELETE, "TRUNCATE mqtt_msg_test"). --define(SQL_SELECT, "SELECT payload FROM mqtt_msg_test"). +-define(SQL_DROP_TABLE, "DROP TABLE mqtt.mqtt_msg_test"). +-define(SQL_DELETE, "TRUNCATE mqtt.mqtt_msg_test"). +-define(SQL_SELECT, "SELECT payload FROM mqtt.mqtt_msg_test"). % DB defaults -define(CASSA_KEYSPACE, "mqtt"). @@ -46,6 +46,20 @@ -define(CERTFILE, filename:join(?CERT_ROOT, ["client.pem"])). -define(KEYFILE, filename:join(?CERT_ROOT, ["client.key"])). +%% How to run it locally: +%% 1. Start all deps services +%% sudo docker compose -f .ci/docker-compose-file/docker-compose.yaml \ +%% -f .ci/docker-compose-file/docker-compose-cassandra.yaml \ +%% -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \ +%% up --build +%% +%% 2. Run use cases with special environment variables +%% CASSA_TCP_HOST=127.0.0.1 CASSA_TCP_PORT=19042 \ +%% CASSA_TLS_HOST=127.0.0.1 CASSA_TLS_PORT=19142 \ +%% PROXY_HOST=127.0.0.1 ./rebar3 as test ct -c -v --name ct@127.0.0.1 \ +%% --suite lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl +%% + %%------------------------------------------------------------------------------ %% CT boilerplate %%------------------------------------------------------------------------------ @@ -197,7 +211,7 @@ cassa_config(BridgeType, Config) -> " keyspace = ~p\n" " username = ~p\n" " password = ~p\n" - " sql = ~p\n" + " cql = ~p\n" " resource_opts = {\n" " request_timeout = 500ms\n" " batch_size = ~b\n" @@ -238,8 +252,8 @@ parse_and_check(ConfigString, BridgeType, Name) -> create_bridge(Config) -> BridgeType = ?config(cassa_bridge_type, Config), Name = ?config(cassa_name, Config), - PGConfig = ?config(cassa_config, Config), - emqx_bridge:create(BridgeType, Name, PGConfig). + BridgeConfig = ?config(cassa_config, Config), + emqx_bridge:create(BridgeType, Name, BridgeConfig). delete_bridge(Config) -> BridgeType = ?config(cassa_bridge_type, Config), @@ -254,6 +268,14 @@ create_bridge_http(Params) -> Error -> Error end. +bridges_probe_http(Params) -> + Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, _} -> ok; + Error -> Error + end. + send_message(Config, Payload) -> Name = ?config(cassa_name, Config), BridgeType = ?config(cassa_bridge_type, Config), @@ -294,25 +316,33 @@ connect_direct_cassa(Config) -> % These funs connect and then stop the cassandra connection connect_and_create_table(Config) -> - Con = connect_direct_cassa(Config), - {ok, _} = ecql:query(Con, ?SQL_CREATE_TABLE), - ok = ecql:close(Con). + with_direct_conn(Config, fun(Conn) -> + {ok, _} = ecql:query(Conn, ?SQL_CREATE_TABLE) + end). connect_and_drop_table(Config) -> - Con = connect_direct_cassa(Config), - {ok, _} = ecql:query(Con, ?SQL_DROP_TABLE), - ok = ecql:close(Con). + with_direct_conn(Config, fun(Conn) -> + {ok, _} = ecql:query(Conn, ?SQL_DROP_TABLE) + end). connect_and_clear_table(Config) -> - Con = connect_direct_cassa(Config), - ok = ecql:query(Con, ?SQL_DELETE), - ok = ecql:close(Con). + with_direct_conn(Config, fun(Conn) -> + ok = ecql:query(Conn, ?SQL_DELETE) + end). connect_and_get_payload(Config) -> - Con = connect_direct_cassa(Config), - {ok, {_Keyspace, _ColsSpec, [[Result]]}} = ecql:query(Con, ?SQL_SELECT), - ok = ecql:close(Con), - Result. + with_direct_conn(Config, fun(Conn) -> + {ok, {_Keyspace, _ColsSpec, [[Result]]}} = ecql:query(Conn, ?SQL_SELECT), + Result + end). + +with_direct_conn(Config, Fn) -> + Conn = connect_direct_cassa(Config), + try + Fn(Conn) + after + ok = ecql:close(Conn) + end. %%------------------------------------------------------------------------------ %% Testcases @@ -358,14 +388,14 @@ t_setup_via_config_and_publish(Config) -> t_setup_via_http_api_and_publish(Config) -> BridgeType = ?config(cassa_bridge_type, Config), Name = ?config(cassa_name, Config), - PgsqlConfig0 = ?config(cassa_config, Config), - PgsqlConfig = PgsqlConfig0#{ + BridgeConfig0 = ?config(cassa_config, Config), + BridgeConfig = BridgeConfig0#{ <<"name">> => Name, <<"type">> => BridgeType }, ?assertMatch( {ok, _}, - create_bridge_http(PgsqlConfig) + create_bridge_http(BridgeConfig) ), Val = integer_to_binary(erlang:unique_integer()), SentData = #{ @@ -421,6 +451,18 @@ t_get_status(Config) -> end), ok. +t_bridges_probe_via_http(Config) -> + BridgeType = ?config(cassa_bridge_type, Config), + Name = ?config(cassa_name, Config), + BridgeConfig0 = ?config(cassa_config, Config), + BridgeConfig = BridgeConfig0#{ + <<"name">> => Name, + <<"type">> => BridgeType + }, + ?assertMatch(ok, bridges_probe_http(BridgeConfig)), + + ok. + t_create_disconnected(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl index cf25cd6d8..cdece50f7 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl @@ -33,7 +33,7 @@ on_start/2, on_stop/2, on_query/3, - %% TODO: now_supported_now + %% TODO: not_supported_now %%on_batch_query/3, on_get_status/2 ]). @@ -41,7 +41,7 @@ %% callbacks of ecpool -export([ connect/1, - prepare_sql_to_conn/2 + prepare_cql_to_conn/2 ]). %% callbacks for query executing @@ -55,7 +55,7 @@ -type state() :: #{ poolname := atom(), - prepare_sql := prepares(), + prepare_cql := prepares(), params_tokens := params_tokens(), %% returned by ecql:prepare/2 prepare_statement := binary() @@ -109,9 +109,6 @@ on_start( ssl := SSL } = Config ) -> - {ok, _} = application:ensure_all_started(ecpool), - {ok, _} = application:ensure_all_started(ecql), - ?SLOG(info, #{ msg => "starting_cassandra_connector", connector => InstId, @@ -139,7 +136,7 @@ on_start( end, PoolName = emqx_plugin_libs_pool:pool_name(InstId), - Prepares = parse_prepare_sql(Config), + Prepares = parse_prepare_cql(Config), InitState = #{poolname => PoolName, prepare_statement => #{}}, State = maps:merge(InitState, Prepares), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of @@ -177,30 +174,30 @@ on_query( Request, #{poolname := PoolName} = State ) -> - {Type, PreparedKeyOrSQL, Params} = parse_request_to_sql(Request), + {Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request), ?tp( debug, - cassandra_connector_received_sql_query, + cassandra_connector_received_cql_query, #{ connector => InstId, type => Type, params => Params, - prepared_key_or_sql => PreparedKeyOrSQL, + prepared_key_or_cql => PreparedKeyOrSQL, state => State } ), - {PreparedKeyOrSQL1, Data} = proc_sql_params(Type, PreparedKeyOrSQL, Params, State), - Res = exec_sql_query(InstId, PoolName, Type, PreparedKeyOrSQL1, Data), + {PreparedKeyOrSQL1, Data} = proc_cql_params(Type, PreparedKeyOrSQL, Params, State), + Res = exec_cql_query(InstId, PoolName, Type, PreparedKeyOrSQL1, Data), handle_result(Res). -parse_request_to_sql({send_message, Params}) -> +parse_request_to_cql({send_message, Params}) -> {prepared_query, _Key = send_message, Params}; -parse_request_to_sql({query, SQL}) -> - parse_request_to_sql({query, SQL, #{}}); -parse_request_to_sql({query, SQL, Params}) -> +parse_request_to_cql({query, SQL}) -> + parse_request_to_cql({query, SQL, #{}}); +parse_request_to_cql({query, SQL, Params}) -> {query, SQL, Params}. -proc_sql_params( +proc_cql_params( prepared_query, PreparedKey0, Params, @@ -209,11 +206,11 @@ proc_sql_params( PreparedKey = maps:get(PreparedKey0, Prepares), Tokens = maps:get(PreparedKey0, ParamsTokens), {PreparedKey, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}; -proc_sql_params(query, SQL, Params, _State) -> +proc_cql_params(query, SQL, Params, _State) -> {SQL1, Tokens} = emqx_plugin_libs_rule:preproc_sql(SQL, '?'), {SQL1, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}. -exec_sql_query(InstId, PoolName, Type, PreparedKey, Data) when +exec_cql_query(InstId, PoolName, Type, PreparedKey, Data) when Type == query; Type == prepared_query -> case ecpool:pick_and_do(PoolName, {?MODULE, Type, [PreparedKey, Data]}, no_handover) of @@ -239,7 +236,7 @@ on_get_status(_InstId, #{poolname := Pool} = State) -> %% return new state with prepared statements {connected, NState}; false -> - %% do not log error, it is logged in prepare_sql_to_conn + %% do not log error, it is logged in prepare_cql_to_conn connecting end; false -> @@ -249,14 +246,14 @@ on_get_status(_InstId, #{poolname := Pool} = State) -> do_get_status(Conn) -> ok == element(1, ecql:query(Conn, "SELECT count(1) AS T FROM system.local")). -do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) -> +do_check_prepares(#{prepare_cql := Prepares}) when is_map(Prepares) -> ok; -do_check_prepares(State = #{poolname := PoolName, prepare_sql := {error, Prepares}}) -> +do_check_prepares(State = #{poolname := PoolName, prepare_cql := {error, Prepares}}) -> %% retry to prepare - case prepare_sql(Prepares, PoolName) of + case prepare_cql(Prepares, PoolName) of {ok, Sts} -> %% remove the error - {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}; + {ok, State#{prepare_cql => Prepares, prepare_statement := Sts}}; _Error -> false end. @@ -295,83 +292,83 @@ conn_opts([Opt | Opts], Acc) -> %% prepare %% XXX: hardcode -%% note: the `sql` param is passed by emqx_ee_bridge_cassa -parse_prepare_sql(#{sql := SQL}) -> - parse_prepare_sql([{send_message, SQL}], #{}, #{}); -parse_prepare_sql(_) -> - #{prepare_sql => #{}, params_tokens => #{}}. +%% note: the `cql` param is passed by emqx_ee_bridge_cassa +parse_prepare_cql(#{cql := SQL}) -> + parse_prepare_cql([{send_message, SQL}], #{}, #{}); +parse_prepare_cql(_) -> + #{prepare_cql => #{}, params_tokens => #{}}. -parse_prepare_sql([{Key, H} | T], Prepares, Tokens) -> +parse_prepare_cql([{Key, H} | T], Prepares, Tokens) -> {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '?'), - parse_prepare_sql( + parse_prepare_cql( T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} ); -parse_prepare_sql([], Prepares, Tokens) -> +parse_prepare_cql([], Prepares, Tokens) -> #{ - prepare_sql => Prepares, + prepare_cql => Prepares, params_tokens => Tokens }. -init_prepare(State = #{prepare_sql := Prepares, poolname := PoolName}) -> +init_prepare(State = #{prepare_cql := Prepares, poolname := PoolName}) -> case maps:size(Prepares) of 0 -> State; _ -> - case prepare_sql(Prepares, PoolName) of + case prepare_cql(Prepares, PoolName) of {ok, Sts} -> State#{prepare_statement := Sts}; Error -> ?tp( error, - cassandra_prepare_sql_failed, + cassandra_prepare_cql_failed, #{prepares => Prepares, reason => Error} ), - %% mark the prepare_sqlas failed - State#{prepare_sql => {error, Prepares}} + %% mark the prepare_cql as failed + State#{prepare_cql => {error, Prepares}} end end. -prepare_sql(Prepares, PoolName) when is_map(Prepares) -> - prepare_sql(maps:to_list(Prepares), PoolName); -prepare_sql(Prepares, PoolName) -> - case do_prepare_sql(Prepares, PoolName) of +prepare_cql(Prepares, PoolName) when is_map(Prepares) -> + prepare_cql(maps:to_list(Prepares), PoolName); +prepare_cql(Prepares, PoolName) -> + case do_prepare_cql(Prepares, PoolName) of {ok, _Sts} = Ok -> %% prepare for reconnect - ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}), + ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_cql_to_conn, [Prepares]}), Ok; Error -> Error end. -do_prepare_sql(Prepares, PoolName) -> - do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}). +do_prepare_cql(Prepares, PoolName) -> + do_prepare_cql(ecpool:workers(PoolName), Prepares, PoolName, #{}). -do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) -> +do_prepare_cql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) -> {ok, Conn} = ecpool_worker:client(Worker), - case prepare_sql_to_conn(Conn, Prepares) of + case prepare_cql_to_conn(Conn, Prepares) of {ok, Sts} -> - do_prepare_sql(T, Prepares, PoolName, Sts); + do_prepare_cql(T, Prepares, PoolName, Sts); Error -> Error end; -do_prepare_sql([], _Prepares, _PoolName, LastSts) -> +do_prepare_cql([], _Prepares, _PoolName, LastSts) -> {ok, LastSts}. -prepare_sql_to_conn(Conn, Prepares) -> - prepare_sql_to_conn(Conn, Prepares, #{}). +prepare_cql_to_conn(Conn, Prepares) -> + prepare_cql_to_conn(Conn, Prepares, #{}). -prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements}; -prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) -> - ?SLOG(info, #{msg => "cassandra_prepare_sql", name => Key, prepare_sql => SQL}), +prepare_cql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements}; +prepare_cql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) -> + ?SLOG(info, #{msg => "cassandra_prepare_cql", name => Key, prepare_cql => SQL}), case ecql:prepare(Conn, Key, SQL) of {ok, Statement} -> - prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => Statement}); + prepare_cql_to_conn(Conn, PrepareList, Statements#{Key => Statement}); {error, Error} = Other -> ?SLOG(error, #{ - msg => "cassandra_prepare_sql_failed", + msg => "cassandra_prepare_cql_failed", worker_pid => Conn, name => Key, - prepare_sql => SQL, + prepare_cql => SQL, error => Error }), Other @@ -394,19 +391,19 @@ assign_type_for_params(Params) -> assign_type_for_params([], Acc) -> lists:reverse(Acc); assign_type_for_params([Param | More], Acc) -> - assign_type_for_params(More, [may_assign_type(Param) | Acc]). + assign_type_for_params(More, [maybe_assign_type(Param) | Acc]). -may_assign_type(true) -> +maybe_assign_type(true) -> {int, 1}; -may_assign_type(false) -> +maybe_assign_type(false) -> {int, 0}; -may_assign_type(V) when is_binary(V); is_list(V); is_atom(V) -> V; -may_assign_type(V) when is_integer(V) -> +maybe_assign_type(V) when is_binary(V); is_list(V); is_atom(V) -> V; +maybe_assign_type(V) when is_integer(V) -> %% The max value of signed int(4) is 2147483647 case V > 2147483647 orelse V < -2147483647 of true -> {bigint, V}; false -> {int, V} end; -may_assign_type(V) when is_float(V) -> {double, V}; -may_assign_type(V) -> +maybe_assign_type(V) when is_float(V) -> {double, V}; +maybe_assign_type(V) -> V. diff --git a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_cassa_SUITE.erl b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_cassa_SUITE.erl index 81b9e3859..95b4407cf 100644 --- a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_cassa_SUITE.erl +++ b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_cassa_SUITE.erl @@ -85,7 +85,8 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok = emqx_common_test_helpers:stop_apps([emqx_conf]), ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), - _ = application:stop(emqx_connector). + _ = application:stop(emqx_connector), + _ = application:stop(emqx_ee_connector). init_per_testcase(_, Config) -> Config.