From 29d767bd625a2430da85ac2ee3dd6c8d18fe0ece Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 16 Jan 2024 14:39:41 +0800 Subject: [PATCH 1/5] ci: add env vars to run cassandra tests locally --- .../docker-compose-toxiproxy.yaml | 4 ++ .ci/docker-compose-file/toxiproxy.json | 12 ++++++ .../test/emqx_bridge_cassandra_SUITE.erl | 8 ++++ .../emqx_bridge_cassandra_connector_SUITE.erl | 40 +++++++++++-------- 4 files changed, 47 insertions(+), 17 deletions(-) diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml index d648d9d78..568d9129c 100644 --- a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -39,6 +39,10 @@ services: - 19042:9042 # Cassandra TLS - 19142:9142 + # Cassandra No Auth + - 19043:9043 + # Cassandra TLS No Auth + - 19143:9143 # S3 - 19000:19000 # S3 TLS diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index c58474039..103bae924 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -96,6 +96,18 @@ "upstream": "cassandra:9142", "enabled": true }, + { + "name": "cassa_no_auth_tcp", + "listen": "0.0.0.0:9043", + "upstream": "cassandra_noauth:9042", + "enabled": true + }, + { + "name": "cassa_no_auth_tls", + "listen": "0.0.0.0:9143", + "upstream": "cassandra_noauth:9142", + "enabled": true + }, { "name": "sqlserver", "listen": "0.0.0.0:1433", diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index 9df219296..e0e3900b0 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -11,6 +11,14 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +%% To run this test locally: +%% ./scripts/ct/run.sh --app apps/emqx_bridge_cassandra --only-up +%% PROFILE=emqx-enterprise PROXY_HOST=localhost CASSA_TLS_HOST=localhost \ +%% CASSA_TLS_PORT=19142 CASSA_TCP_HOST=localhost CASSA_TCP_NO_AUTH_HOST=localhost \ +%% CASSA_TCP_PORT=19042 CASSA_TCP_NO_AUTH_PORT=19043 \ +%% ./rebar3 ct --name 'test@127.0.0.1' -v --suite \ +%% apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE + % SQL definitions -define(SQL_BRIDGE, "insert into mqtt_msg_test(topic, payload, arrived) " diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl index de306e3f0..245110de6 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl @@ -14,20 +14,20 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("stdlib/include/assert.hrl"). +%% To run this test locally: +%% ./scripts/ct/run.sh --app apps/emqx_bridge_cassandra --only-up +%% PROFILE=emqx-enterprise PROXY_HOST=localhost CASSA_TLS_HOST=localhost \ +%% CASSA_TLS_PORT=9142 CASSA_TCP_HOST=localhost CASSA_TCP_NO_AUTH_HOST=localhost \ +%% CASSA_TCP_PORT=19042 CASSA_TCP_NO_AUTH_PORT=19043 \ +%% ./rebar3 ct --name 'test@127.0.0.1' -v --suite \ +%% apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE + %% Cassandra servers are defined at `.ci/docker-compose-file/docker-compose-cassandra.yaml` %% You can change it to `127.0.0.1`, if you run this SUITE locally -define(CASSANDRA_HOST, "cassandra"). -define(CASSANDRA_HOST_NOAUTH, "cassandra_noauth"). -define(CASSANDRA_RESOURCE_MOD, emqx_bridge_cassandra_connector). -%% This test SUITE requires a running cassandra instance. If you don't want to -%% bring up the whole CI infrastuctucture with the `scripts/ct/run.sh` script -%% you can create a cassandra instance with the following command (execute it -%% from root of the EMQX directory.). You also need to set ?CASSANDRA_HOST and -%% ?CASSANDRA_PORT to appropriate values. -%% -%% sudo docker run --rm -d --name cassandra --network host cassandra:3.11.14 - %% Cassandra default username & password once enable `authenticator: PasswordAuthenticator` %% in cassandra config -define(CASSA_USERNAME, <<"cassandra">>). @@ -45,14 +45,14 @@ groups() -> {noauth, [t_lifecycle]} ]. -cassandra_servers(CassandraHost) -> +cassandra_servers(CassandraHost, CassandraPort) -> lists:map( fun(#{hostname := Host, port := Port}) -> {Host, Port} end, emqx_schema:parse_servers( - iolist_to_binary([CassandraHost, ":", erlang:integer_to_list(?CASSANDRA_DEFAULT_PORT)]), - #{default_port => ?CASSANDRA_DEFAULT_PORT} + iolist_to_binary([CassandraHost, ":", erlang:integer_to_list(CassandraPort)]), + #{default_port => CassandraPort} ) ). @@ -63,25 +63,30 @@ init_per_suite(Config) -> Config. init_per_group(Group, Config) -> - {CassandraHost, AuthOpts} = + {CassandraHost, CassandraPort, AuthOpts} = case Group of auth -> - {?CASSANDRA_HOST, [{username, ?CASSA_USERNAME}, {password, ?CASSA_PASSWORD}]}; + TcpHost = os:getenv("CASSA_TCP_HOST", "toxiproxy"), + TcpPort = list_to_integer(os:getenv("CASSA_TCP_PORT", "9042")), + {TcpHost, TcpPort, [{username, ?CASSA_USERNAME}, {password, ?CASSA_PASSWORD}]}; noauth -> - {?CASSANDRA_HOST_NOAUTH, []} + TcpHost = os:getenv("CASSA_TCP_NO_AUTH_HOST", "toxiproxy"), + TcpPort = list_to_integer(os:getenv("CASSA_TCP_NO_AUTH_PORT", "9043")), + {TcpHost, TcpPort, []} end, - case emqx_common_test_helpers:is_tcp_server_available(CassandraHost, ?CASSANDRA_DEFAULT_PORT) of + case emqx_common_test_helpers:is_tcp_server_available(CassandraHost, CassandraPort) of true -> %% keyspace `mqtt` must be created in advance {ok, Conn} = ecql:connect([ - {nodes, cassandra_servers(CassandraHost)}, + {nodes, cassandra_servers(CassandraHost, CassandraPort)}, {keyspace, "mqtt"} | AuthOpts ]), ecql:close(Conn), [ {cassa_host, CassandraHost}, + {cassa_port, CassandraPort}, {cassa_auth_opts, AuthOpts} | Config ]; @@ -212,6 +217,7 @@ create_local_resource(ResourceId, CheckedConfig) -> cassandra_config(Config) -> Host = ?config(cassa_host, Config), + Port = ?config(cassa_port, Config), AuthOpts = maps:from_list(?config(cassa_auth_opts, Config)), CassConfig = AuthOpts#{ @@ -223,7 +229,7 @@ cassandra_config(Config) -> "~s:~b", [ Host, - ?CASSANDRA_DEFAULT_PORT + Port ] ) ) From b32c0fb0d8dc210d12e7a4d02a7f4cf798b93dc5 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 19 Jan 2024 18:46:35 +0800 Subject: [PATCH 2/5] refactor: split cassandra bridges to actions and connectors --- apps/emqx_bridge/src/emqx_action_info.erl | 1 + .../src/emqx_bridge_cassandra.app.src | 4 +- .../src/emqx_bridge_cassandra.erl | 108 ++++++- .../src/emqx_bridge_cassandra_action_info.erl | 62 ++++ .../src/emqx_bridge_cassandra_connector.erl | 286 ++++++++---------- .../test/emqx_bridge_cassandra_SUITE.erl | 21 +- .../emqx_bridge_cassandra_connector_SUITE.erl | 4 - .../src/schema/emqx_connector_ee_schema.erl | 12 + .../src/schema/emqx_connector_schema.erl | 2 + rel/i18n/emqx_bridge_cassandra.hocon | 10 + .../emqx_bridge_cassandra_connector.hocon | 6 + 11 files changed, 332 insertions(+), 184 deletions(-) create mode 100644 apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_action_info.erl diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index d80050191..3754677a8 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -92,6 +92,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_matrix_action_info, emqx_bridge_mongodb_action_info, emqx_bridge_influxdb_action_info, + emqx_bridge_cassandra_action_info, emqx_bridge_mysql_action_info, emqx_bridge_pgsql_action_info, emqx_bridge_syskeeper_action_info, diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src index 97be100d2..aa8290b98 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_cassandra, [ {description, "EMQX Enterprise Cassandra Bridge"}, - {vsn, "0.1.6"}, + {vsn, "0.2.0"}, {registered, []}, {applications, [ kernel, @@ -8,7 +8,7 @@ emqx_resource, ecql ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_cassandra_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl index 2724b7c09..83268cab5 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.erl @@ -12,11 +12,17 @@ %% schema examples -export([ - conn_bridge_examples/1, values/2, fields/2 ]). +%% Examples +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1, + connector_examples/1 +]). + %% schema -export([ namespace/0, @@ -26,10 +32,13 @@ ]). -define(DEFAULT_CQL, << - "insert into mqtt_msg(topic, msgid, sender, qos, payload, arrived, retain) " - "values (${topic}, ${id}, ${clientid}, ${qos}, ${payload}, ${timestamp}, ${flags.retain})" + "insert into mqtt_msg(msgid, topic, qos, payload, arrived) " + "values (${id}, ${topic}, ${qos}, ${payload}, ${timestamp})" >>). +-define(CONNECTOR_TYPE, cassandra). +-define(ACTION_TYPE, cassandra). + %%-------------------------------------------------------------------- %% schema examples @@ -43,6 +52,41 @@ conn_bridge_examples(Method) -> } ]. +bridge_v2_examples(Method) -> + ParamsExample = #{ + parameters => #{ + cql => ?DEFAULT_CQL + } + }, + [ + #{ + <<"cassandra">> => #{ + summary => <<"Cassandra Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, cassandra, cassandra, ParamsExample + ) + } + } + ]. + +connector_examples(Method) -> + [ + #{ + <<"cassandra">> => #{ + summary => <<"Cassandra Connector">>, + value => emqx_connector_schema:connector_values( + Method, cassandra, #{ + servers => <<"127.0.0.1:9042">>, + keyspace => <<"mqtt">>, + username => <<"root">>, + password => <<"******">>, + pool_size => 8 + } + ) + } + } + ]. + %% no difference in get/post/put method values(_Method, Type) -> #{ @@ -73,14 +117,47 @@ namespace() -> "bridge_cassa". roots() -> []. +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + emqx_bridge_cassandra_connector:fields("connector") ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(action) -> + {cassandra, + mk( + hoconsc:map(name, ref(?MODULE, cassandra_action)), + #{desc => <<"Cassandra Action Config">>, required => false} + )}; +fields(cassandra_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + mk(ref(?MODULE, action_parameters), #{ + required => true, desc => ?DESC(action_parameters) + }) + ); +fields(action_parameters) -> + [ + cql_field() + ]; +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + Fields = + emqx_bridge_cassandra_connector:fields("connector") ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(cassandra_action)); fields("config") -> [ + cql_field(), {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, - {cql, - mk( - binary(), - #{desc => ?DESC("cql_template"), default => ?DEFAULT_CQL, format => <<"sql">>} - )}, {local_topic, mk( binary(), @@ -99,8 +176,23 @@ fields("get") -> fields("post", Type) -> [type_field(Type), name_field() | fields("config")]. +cql_field() -> + {cql, + mk( + binary(), + #{desc => ?DESC("cql_template"), default => ?DEFAULT_CQL, format => <<"sql">>} + )}. + desc("config") -> ?DESC("desc_config"); +desc(cassandra_action) -> + ?DESC(cassandra_action); +desc(action_parameters) -> + ?DESC(action_parameters); +desc("config_connector") -> + ?DESC("desc_config"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for Cassandra using `", string:to_upper(Method), "` method."]; desc(_) -> diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_action_info.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_action_info.erl new file mode 100644 index 000000000..14db7cf50 --- /dev/null +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_action_info.erl @@ -0,0 +1,62 @@ +-module(emqx_bridge_cassandra_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_config_to_action_config/2, + bridge_v1_config_to_connector_config/1, + connector_action_config_to_bridge_v1_config/2, + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +-import(emqx_utils_conv, [bin/1]). + +-define(SCHEMA_MODULE, emqx_bridge_cassandra). + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(cassandra_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + ActionConfig#{<<"connector">> => ConnectorName} + ). + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ActionTopLevelKeys = schema_keys(cassandra_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ConnectorTopLevelKeys = schema_keys("config_connector"), + ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys), + ConnConfig0 = maps:with(ConnectorKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + ConnConfig0 + ). + +connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) -> + RawConf = emqx_action_info:connector_action_config_to_bridge_v1_config( + ConnectorRawConf, ActionRawConf + ), + maps:without([<<"cassandra_type">>], RawConf). + +bridge_v1_type_name() -> cassandra. + +action_type_name() -> cassandra. + +connector_type_name() -> cassandra. + +schema_module() -> ?SCHEMA_MODULE. + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config), + emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0). + +schema_keys(Name) -> + [bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))]. diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index c6bc7098c..3db71c9e0 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -14,13 +14,17 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% schema --export([roots/0, fields/1]). +-export([roots/0, fields/1, desc/1]). %% callbacks of behaviour emqx_resource -export([ callback_mode/0, on_start/2, on_stop/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channel_status/3, + on_get_channels/1, on_query/3, on_query_async/4, on_batch_query/3, @@ -28,6 +32,8 @@ on_get_status/2 ]). +-export([transform_bridge_v1_config_to_connector_config/1]). + %% callbacks of ecpool -export([ connect/1, @@ -39,16 +45,10 @@ -export([do_get_status/1]). --type prepares() :: #{atom() => binary()}. --type params_tokens() :: #{atom() => list()}. - -type state() :: #{ pool_name := binary(), - prepare_cql := prepares(), - params_tokens := params_tokens(), - %% returned by ecql:prepare/2 - prepare_statement := binary() + channels := #{} }. -define(DEFAULT_SERVER_OPTION, #{default_port => ?CASSANDRA_DEFAULT_PORT}). @@ -62,7 +62,9 @@ roots() -> fields(config) -> cassandra_db_fields() ++ emqx_connector_schema_lib:ssl_fields() ++ - emqx_connector_schema_lib:prepare_statement_fields(). + emqx_connector_schema_lib:prepare_statement_fields(); +fields("connector") -> + cassandra_db_fields() ++ emqx_connector_schema_lib:ssl_fields(). cassandra_db_fields() -> [ @@ -83,6 +85,11 @@ keyspace(desc) -> ?DESC("keyspace"); keyspace(required) -> true; keyspace(_) -> undefined. +desc(config) -> + ?DESC("config"); +desc("connector") -> + ?DESC("connector"). + %%-------------------------------------------------------------------- %% callbacks for emqx_resource @@ -130,10 +137,9 @@ on_start( false -> [] end, - State = parse_prepare_cql(Config), case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> - {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; + {ok, #{pool_name => InstId, channels => #{}}}; {error, Reason} -> ?tp( cassandra_connector_start_failed, @@ -149,23 +155,49 @@ on_stop(InstId, _State) -> }), emqx_resource_pool:stop(InstId). +on_add_channel(_InstId, #{channels := Channs} = OldState, ChannId, ChannConf0) -> + #{parameters := #{cql := CQL}} = ChannConf0, + {PrepareCQL, ParamsTokens} = emqx_placeholder:preproc_sql(CQL, '?'), + ParsedCql = #{ + prepare_key => short_prepare_key(ChannId), + prepare_cql => PrepareCQL, + params_tokens => ParamsTokens + }, + NewChanns = Channs#{ChannId => #{parsed_cql => ParsedCql, prepare_result => not_prepared}}, + {ok, OldState#{channels => NewChanns}}. + +on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannId) -> + NewState = State#{channels => maps:remove(ChannId, Channels)}, + {ok, NewState}. + +on_get_channel_status(InstanceId, ChannId, #{channels := Channels, pool_name := PoolName} = State) -> + case on_get_status(InstanceId, State) of + connected -> + #{parsed_cql := ParsedCql} = maps:get(ChannId, Channels), + case prepare_cql_to_cassandra(ParsedCql, PoolName) of + {ok, _} -> connected; + {error, Reason} -> {connecting, Reason} + end; + _ -> + connecting + end. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + -type request() :: % emqx_bridge.erl - {send_message, Params :: map()} + {ChannId :: binary(), Params :: map()} % common query - | {query, SQL :: binary()} - | {query, SQL :: binary(), Params :: map()}. + | {query, CQL :: binary()} + | {query, CQL :: binary(), Params :: map()}. -spec on_query( emqx_resource:resource_id(), request(), state() ) -> ok | {ok, ecql:cql_result()} | {error, {recoverable_error | unrecoverable_error, term()}}. -on_query( - InstId, - Request, - State -) -> +on_query(InstId, Request, State) -> do_single_query(InstId, Request, sync, State). -spec on_query_async( @@ -174,21 +206,11 @@ on_query( {function(), list()}, state() ) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}. -on_query_async( - InstId, - Request, - Callback, - State -) -> +on_query_async(InstId, Request, Callback, State) -> do_single_query(InstId, Request, {async, Callback}, State). -do_single_query( - InstId, - Request, - Async, - #{pool_name := PoolName} = State -) -> - {Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request), +do_single_query(InstId, Request, Async, #{pool_name := PoolName} = State) -> + {Type, PreparedKeyOrCQL, Params} = parse_request_to_cql(Request), ?tp( debug, cassandra_connector_received_cql_query, @@ -196,12 +218,12 @@ do_single_query( connector => InstId, type => Type, params => Params, - prepared_key_or_cql => PreparedKeyOrSQL, + prepared_key_or_cql => PreparedKeyOrCQL, state => State } ), - {PreparedKeyOrSQL1, Data} = proc_cql_params(Type, PreparedKeyOrSQL, Params, State), - Res = exec_cql_query(InstId, PoolName, Type, Async, PreparedKeyOrSQL1, Data), + {PreparedKeyOrCQL1, Data} = proc_cql_params(Type, PreparedKeyOrCQL, Params, State), + Res = exec_cql_query(InstId, PoolName, Type, Async, PreparedKeyOrCQL1, Data), handle_result(Res). -spec on_batch_query( @@ -209,11 +231,7 @@ do_single_query( [request()], state() ) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}. -on_batch_query( - InstId, - Requests, - State -) -> +on_batch_query(InstId, Requests, State) -> do_batch_query(InstId, Requests, sync, State). -spec on_batch_query_async( @@ -222,25 +240,15 @@ on_batch_query( {function(), list()}, state() ) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}. -on_batch_query_async( - InstId, - Requests, - Callback, - State -) -> +on_batch_query_async(InstId, Requests, Callback, State) -> do_batch_query(InstId, Requests, {async, Callback}, State). -do_batch_query( - InstId, - Requests, - Async, - #{pool_name := PoolName} = State -) -> +do_batch_query(InstId, Requests, Async, #{pool_name := PoolName} = State) -> CQLs = lists:map( fun(Request) -> - {Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request), - proc_cql_params(Type, PreparedKeyOrSQL, Params, State) + {Type, PreparedKeyOrCQL, Params} = parse_request_to_cql(Request), + proc_cql_params(Type, PreparedKeyOrCQL, Params, State) end, Requests ), @@ -256,26 +264,24 @@ do_batch_query( Res = exec_cql_batch_query(InstId, PoolName, Async, CQLs), handle_result(Res). -parse_request_to_cql({send_message, Params}) -> - {prepared_query, _Key = send_message, Params}; -parse_request_to_cql({query, SQL}) -> - parse_request_to_cql({query, SQL, #{}}); -parse_request_to_cql({query, SQL, Params}) -> - {query, SQL, Params}. +parse_request_to_cql({query, CQL}) -> + {query, CQL, #{}}; +parse_request_to_cql({query, CQL, Params}) -> + {query, CQL, Params}; +parse_request_to_cql({ChannId, Params}) -> + {prepared_query, ChannId, Params}. -proc_cql_params( - prepared_query, - PreparedKey0, - Params, - #{prepare_statement := Prepares, params_tokens := ParamsTokens} -) -> - %% assert - _PreparedKey = maps:get(PreparedKey0, Prepares), - Tokens = maps:get(PreparedKey0, ParamsTokens), - {PreparedKey0, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}; -proc_cql_params(query, SQL, Params, _State) -> - {SQL1, Tokens} = emqx_placeholder:preproc_sql(SQL, '?'), - {SQL1, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}. +proc_cql_params(prepared_query, ChannId, Params, #{channels := Channs}) -> + #{ + parsed_cql := #{ + prepare_key := PrepareKey, + params_tokens := ParamsTokens + } + } = maps:get(ChannId, Channs), + {PrepareKey, assign_type_for_params(emqx_placeholder:proc_sql(ParamsTokens, Params))}; +proc_cql_params(query, CQL, Params, _State) -> + {CQL1, Tokens} = emqx_placeholder:preproc_sql(CQL, '?'), + {CQL1, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}. exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when Type == query; Type == prepared_query @@ -314,38 +320,15 @@ exec_cql_batch_query(InstId, PoolName, Async, CQLs) -> exec(PoolName, Query) -> ecpool:pick_and_do(PoolName, Query, no_handover). -on_get_status(_InstId, #{pool_name := PoolName} = State) -> +on_get_status(_InstId, #{pool_name := PoolName}) -> case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of - true -> - case do_check_prepares(State) of - ok -> - connected; - {ok, NState} -> - %% return new state with prepared statements - {connected, NState}; - false -> - %% do not log error, it is logged in prepare_cql_to_conn - connecting - end; - false -> - connecting + true -> connected; + false -> connecting end. do_get_status(Conn) -> ok == element(1, ecql:query(Conn, "SELECT cluster_name FROM system.local")). -do_check_prepares(#{prepare_cql := Prepares}) when is_map(Prepares) -> - ok; -do_check_prepares(State = #{pool_name := PoolName, prepare_cql := {error, Prepares}}) -> - %% retry to prepare - case prepare_cql(Prepares, PoolName) of - {ok, Sts} -> - %% remove the error - {ok, State#{prepare_cql => Prepares, prepare_statement := Sts}}; - _Error -> - false - end. - %%-------------------------------------------------------------------- %% callbacks query @@ -394,88 +377,50 @@ conn_opts([Opt | Opts], Acc) -> %%-------------------------------------------------------------------- %% prepare - -%% XXX: hardcode -%% note: the `cql` param is passed by emqx_bridge_cassandra -parse_prepare_cql(#{cql := SQL}) -> - parse_prepare_cql([{send_message, SQL}], #{}, #{}); -parse_prepare_cql(_) -> - #{prepare_cql => #{}, params_tokens => #{}}. - -parse_prepare_cql([{Key, H} | T], Prepares, Tokens) -> - {PrepareSQL, ParamsTokens} = emqx_placeholder:preproc_sql(H, '?'), - parse_prepare_cql( - T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} - ); -parse_prepare_cql([], Prepares, Tokens) -> - #{ - prepare_cql => Prepares, - params_tokens => Tokens - }. - -init_prepare(State = #{prepare_cql := Prepares, pool_name := PoolName}) -> - case maps:size(Prepares) of - 0 -> - State; - _ -> - case prepare_cql(Prepares, PoolName) of - {ok, Sts} -> - State#{prepare_statement := Sts}; - Error -> - ?tp( - error, - cassandra_prepare_cql_failed, - #{prepares => Prepares, reason => Error} - ), - %% mark the prepare_cql as failed - State#{prepare_cql => {error, Prepares}} - end - end. - -prepare_cql(Prepares, PoolName) when is_map(Prepares) -> - prepare_cql(maps:to_list(Prepares), PoolName); -prepare_cql(Prepares, PoolName) -> - case do_prepare_cql(Prepares, PoolName) of - {ok, _Sts} = Ok -> +prepare_cql_to_cassandra(ParsedCql, PoolName) -> + case prepare_cql_to_cassandra(ecpool:workers(PoolName), ParsedCql, #{}) of + {ok, Statement} -> %% prepare for reconnect - ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_cql_to_conn, [Prepares]}), - Ok; + ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_cql_to_conn, [ParsedCql]}), + {ok, Statement}; Error -> + ?tp( + error, + cassandra_prepare_cql_failed, + #{parsed_cql => ParsedCql, reason => Error} + ), Error end. -do_prepare_cql(Prepares, PoolName) -> - do_prepare_cql(ecpool:workers(PoolName), Prepares, #{}). - -do_prepare_cql([{_Name, Worker} | T], Prepares, _LastSts) -> +prepare_cql_to_cassandra([{_Name, Worker} | T], ParsedCql, _LastSts) -> {ok, Conn} = ecpool_worker:client(Worker), - case prepare_cql_to_conn(Conn, Prepares) of - {ok, Sts} -> - do_prepare_cql(T, Prepares, Sts); + case prepare_cql_to_conn(Conn, ParsedCql) of + {ok, Statement} -> + prepare_cql_to_cassandra(T, ParsedCql, Statement); Error -> Error end; -do_prepare_cql([], _Prepares, LastSts) -> +prepare_cql_to_cassandra([], _ParsedCql, LastSts) -> {ok, LastSts}. -prepare_cql_to_conn(Conn, Prepares) -> - prepare_cql_to_conn(Conn, Prepares, #{}). - -prepare_cql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements}; -prepare_cql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) -> - ?SLOG(info, #{msg => "cassandra_prepare_cql", name => Key, prepare_cql => SQL}), - case ecql:prepare(Conn, Key, SQL) of +prepare_cql_to_conn(Conn, #{prepare_key := PrepareKey, prepare_cql := PrepareCQL}) when + is_pid(Conn) +-> + ?SLOG(info, #{ + msg => "cassandra_prepare_cql", prepare_key => PrepareKey, prepare_cql => PrepareCQL + }), + case ecql:prepare(Conn, PrepareKey, PrepareCQL) of {ok, Statement} -> - prepare_cql_to_conn(Conn, PrepareList, Statements#{Key => Statement}); - {error, Error} = Other -> + {ok, Statement}; + {error, Reason} = Error -> ?SLOG(error, #{ msg => "cassandra_prepare_cql_failed", worker_pid => Conn, - name => Key, - prepare_cql => SQL, - error => Error + name => PrepareKey, + prepare_cql => PrepareCQL, + reason => Reason }), - Other + Error end. handle_result({error, disconnected}) -> @@ -487,6 +432,9 @@ handle_result({error, Error}) -> handle_result(Res) -> Res. +transform_bridge_v1_config_to_connector_config(_) -> + ok. + %%-------------------------------------------------------------------- %% utils @@ -513,3 +461,11 @@ maybe_assign_type(V) when is_integer(V) -> maybe_assign_type(V) when is_float(V) -> {double, V}; maybe_assign_type(V) -> V. + +short_prepare_key(Str) when is_binary(Str) -> + true = size(Str) > 0, + Sha = crypto:hash(sha, Str), + %% TODO: change to binary:encode_hex(X, lowercase) when OTP version is always > 25 + Hex = string:lowercase(binary:encode_hex(Sha)), + <> = Hex, + binary_to_atom(<<"cassa_prepare_key:", UniqueEnough/binary>>). diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index e0e3900b0..77aec7d99 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -301,17 +301,28 @@ send_message(Config, Payload) -> query_resource(Config, Request) -> Name = ?config(cassa_name, Config), BridgeType = ?config(cassa_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). + BridgeV2Id = emqx_bridge_v2:id(BridgeType, Name), + ConnectorResId = emqx_connector_resource:resource_id( + cassandra, <<"connector_emqx_bridge_cassandra_SUITE">> + ), + emqx_resource:query(BridgeV2Id, Request, #{ + timeout => 1_000, connector_resource_id => ConnectorResId + }). query_resource_async(Config, Request) -> Name = ?config(cassa_name, Config), BridgeType = ?config(cassa_bridge_type, Config), Ref = alias([reply]), AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end, - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - Return = emqx_resource:query(ResourceID, Request, #{ - timeout => 500, async_reply_fun => {AsyncReplyFun, []} + BridgeV2Id = emqx_bridge_v2:id(BridgeType, Name), + ConnectorResId = emqx_connector_resource:resource_id( + cassandra, <<"connector_emqx_bridge_cassandra_SUITE">> + ), + Return = emqx_resource:query(BridgeV2Id, Request, #{ + timeout => 500, + async_reply_fun => {AsyncReplyFun, []}, + connector_resource_id => ConnectorResId, + query_mode => async }), {Return, Ref}. diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl index 245110de6..50d82397a 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl @@ -22,10 +22,6 @@ %% ./rebar3 ct --name 'test@127.0.0.1' -v --suite \ %% apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE -%% Cassandra servers are defined at `.ci/docker-compose-file/docker-compose-cassandra.yaml` -%% You can change it to `127.0.0.1`, if you run this SUITE locally --define(CASSANDRA_HOST, "cassandra"). --define(CASSANDRA_HOST_NOAUTH, "cassandra_noauth"). -define(CASSANDRA_RESOURCE_MOD, emqx_bridge_cassandra_connector). %% Cassandra default username & password once enable `authenticator: PasswordAuthenticator` diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 655892d88..2b50252e8 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -36,6 +36,8 @@ resource_type(mongodb) -> emqx_bridge_mongodb_connector; resource_type(influxdb) -> emqx_bridge_influxdb_connector; +resource_type(cassandra) -> + emqx_bridge_cassandra_connector; resource_type(mysql) -> emqx_bridge_mysql_connector; resource_type(pgsql) -> @@ -130,6 +132,14 @@ connector_structs() -> required => false } )}, + {cassandra, + mk( + hoconsc:map(name, ref(emqx_bridge_cassandra, "config_connector")), + #{ + desc => <<"Cassandra Connector Config">>, + required => false + } + )}, {mysql, mk( hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")), @@ -205,6 +215,7 @@ schema_modules() -> emqx_bridge_matrix, emqx_bridge_mongodb, emqx_bridge_influxdb, + emqx_bridge_cassandra, emqx_bridge_mysql, emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_proxy, @@ -234,6 +245,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"), api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"), api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"), + api_ref(emqx_bridge_cassandra, <<"cassandra">>, Method ++ "_connector"), api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 615b89230..f4bbb5459 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -137,6 +137,8 @@ connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; connector_type_to_bridge_types(influxdb) -> [influxdb, influxdb_api_v1, influxdb_api_v2]; +connector_type_to_bridge_types(cassandra) -> + [cassandra]; connector_type_to_bridge_types(mysql) -> [mysql]; connector_type_to_bridge_types(mqtt) -> diff --git a/rel/i18n/emqx_bridge_cassandra.hocon b/rel/i18n/emqx_bridge_cassandra.hocon index a96315340..29eb35de5 100644 --- a/rel/i18n/emqx_bridge_cassandra.hocon +++ b/rel/i18n/emqx_bridge_cassandra.hocon @@ -1,5 +1,15 @@ emqx_bridge_cassandra { +action_parameters.desc: +"""Action specific configs.""" +action_parameters.label: +"""Action""" + +cassandra_action.desc: +"""Action configs.""" +cassandra_action.label: +"""Action""" + config_enable.desc: """Enable or disable this bridge""" diff --git a/rel/i18n/emqx_bridge_cassandra_connector.hocon b/rel/i18n/emqx_bridge_cassandra_connector.hocon index b149cce8a..40e1c0e22 100644 --- a/rel/i18n/emqx_bridge_cassandra_connector.hocon +++ b/rel/i18n/emqx_bridge_cassandra_connector.hocon @@ -1,5 +1,11 @@ emqx_bridge_cassandra_connector { +config.desc: +"""Cassandra connection config""" + +config.label: +"""Connection config""" + keyspace.desc: """Keyspace name to connect to.""" From 0e1043f80cc397618d15bf6a24af30b3a3ed31d4 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sun, 21 Jan 2024 21:00:38 +0800 Subject: [PATCH 3/5] ci: update generated connector name --- .../test/emqx_bridge_cassandra_SUITE.erl | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index 77aec7d99..09deaa699 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -302,9 +302,7 @@ query_resource(Config, Request) -> Name = ?config(cassa_name, Config), BridgeType = ?config(cassa_bridge_type, Config), BridgeV2Id = emqx_bridge_v2:id(BridgeType, Name), - ConnectorResId = emqx_connector_resource:resource_id( - cassandra, <<"connector_emqx_bridge_cassandra_SUITE">> - ), + ConnectorResId = emqx_connector_resource:resource_id(BridgeType, Name), emqx_resource:query(BridgeV2Id, Request, #{ timeout => 1_000, connector_resource_id => ConnectorResId }). @@ -315,9 +313,7 @@ query_resource_async(Config, Request) -> Ref = alias([reply]), AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end, BridgeV2Id = emqx_bridge_v2:id(BridgeType, Name), - ConnectorResId = emqx_connector_resource:resource_id( - cassandra, <<"connector_emqx_bridge_cassandra_SUITE">> - ), + ConnectorResId = emqx_connector_resource:resource_id(BridgeType, Name), Return = emqx_resource:query(BridgeV2Id, Request, #{ timeout => 500, async_reply_fun => {AsyncReplyFun, []}, From 837b19cb1e6023a90195db0052f00aee8ace3e4f Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 22 Jan 2024 15:45:00 +0800 Subject: [PATCH 4/5] chore: update change logs for cassandra bridge_v2 --- changes/ee/feat-12330.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ee/feat-12330.en.md diff --git a/changes/ee/feat-12330.en.md b/changes/ee/feat-12330.en.md new file mode 100644 index 000000000..963098659 --- /dev/null +++ b/changes/ee/feat-12330.en.md @@ -0,0 +1 @@ +The bridges for Cassandra have been split so they are available via the connectors and actions APIs. They are still backwards compatible with the old bridge API. From 218af3fef4b62c3ca8b6c5ac57360c69585f84c6 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 23 Jan 2024 14:14:23 +0800 Subject: [PATCH 5/5] chore: update ecql to 0.6.0 --- apps/emqx_bridge_cassandra/rebar.config | 2 +- .../src/emqx_bridge_cassandra_connector.erl | 11 +++-------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/apps/emqx_bridge_cassandra/rebar.config b/apps/emqx_bridge_cassandra/rebar.config index c0a72fef9..04ee603fa 100644 --- a/apps/emqx_bridge_cassandra/rebar.config +++ b/apps/emqx_bridge_cassandra/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.5.2"}}}, + {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.6.0"}}}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index 3db71c9e0..3b30f1d26 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -159,7 +159,7 @@ on_add_channel(_InstId, #{channels := Channs} = OldState, ChannId, ChannConf0) - #{parameters := #{cql := CQL}} = ChannConf0, {PrepareCQL, ParamsTokens} = emqx_placeholder:preproc_sql(CQL, '?'), ParsedCql = #{ - prepare_key => short_prepare_key(ChannId), + prepare_key => make_prepare_key(ChannId), prepare_cql => PrepareCQL, params_tokens => ParamsTokens }, @@ -462,10 +462,5 @@ maybe_assign_type(V) when is_float(V) -> {double, V}; maybe_assign_type(V) -> V. -short_prepare_key(Str) when is_binary(Str) -> - true = size(Str) > 0, - Sha = crypto:hash(sha, Str), - %% TODO: change to binary:encode_hex(X, lowercase) when OTP version is always > 25 - Hex = string:lowercase(binary:encode_hex(Sha)), - <> = Hex, - binary_to_atom(<<"cassa_prepare_key:", UniqueEnough/binary>>). +make_prepare_key(ChannId) -> + ChannId.