chore: apply review suggestions
- Rename sql to cql - Add tests for `bridges_probe` API
This commit is contained in:
parent
ac41c7e653
commit
9b63bdc1e0
|
@ -14,9 +14,9 @@ services:
|
||||||
CASSANDRA_RPC_ADDRESS: "0.0.0.0"
|
CASSANDRA_RPC_ADDRESS: "0.0.0.0"
|
||||||
volumes:
|
volumes:
|
||||||
- ./certs:/certs
|
- ./certs:/certs
|
||||||
ports:
|
#ports:
|
||||||
- "9042:9042"
|
# - "9042:9042"
|
||||||
- "9142:9142"
|
# - "9142:9142"
|
||||||
command:
|
command:
|
||||||
- /bin/bash
|
- /bin/bash
|
||||||
- -c
|
- -c
|
||||||
|
|
|
@ -16,14 +16,14 @@ will be forwarded."""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sql_template {
|
cql_template {
|
||||||
desc {
|
desc {
|
||||||
en: """SQL Template"""
|
en: """CQL Template"""
|
||||||
zh: """SQL 模板"""
|
zh: """CQL 模板"""
|
||||||
}
|
}
|
||||||
label {
|
label {
|
||||||
en: "SQL Template"
|
en: "CQL Template"
|
||||||
zh: "SQL 模板"
|
zh: "CQL 模板"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
config_enable {
|
config_enable {
|
||||||
|
|
|
@ -36,7 +36,7 @@
|
||||||
conn_bridge_examples(Method) ->
|
conn_bridge_examples(Method) ->
|
||||||
[
|
[
|
||||||
#{
|
#{
|
||||||
<<"cassa">> => #{
|
<<"cassandra">> => #{
|
||||||
summary => <<"Cassandra Bridge">>,
|
summary => <<"Cassandra Bridge">>,
|
||||||
value => values(Method, cassandra)
|
value => values(Method, cassandra)
|
||||||
}
|
}
|
||||||
|
@ -54,7 +54,7 @@ values(_Method, Type) ->
|
||||||
pool_size => 8,
|
pool_size => 8,
|
||||||
username => <<"root">>,
|
username => <<"root">>,
|
||||||
password => <<"public">>,
|
password => <<"public">>,
|
||||||
sql => ?DEFAULT_CQL,
|
cql => ?DEFAULT_CQL,
|
||||||
local_topic => <<"local/topic/#">>,
|
local_topic => <<"local/topic/#">>,
|
||||||
resource_opts => #{
|
resource_opts => #{
|
||||||
worker_pool_size => 8,
|
worker_pool_size => 8,
|
||||||
|
@ -77,10 +77,10 @@ roots() -> [].
|
||||||
fields("config") ->
|
fields("config") ->
|
||||||
[
|
[
|
||||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||||
{sql,
|
{cql,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
binary(),
|
||||||
#{desc => ?DESC("sql_template"), default => ?DEFAULT_CQL, format => <<"sql">>}
|
#{desc => ?DESC("cql_template"), default => ?DEFAULT_CQL, format => <<"sql">>}
|
||||||
)},
|
)},
|
||||||
{local_topic,
|
{local_topic,
|
||||||
mk(
|
mk(
|
||||||
|
@ -102,7 +102,7 @@ fields("config") ->
|
||||||
fields("creation_opts") ->
|
fields("creation_opts") ->
|
||||||
emqx_resource_schema:fields("creation_opts_sync_only");
|
emqx_resource_schema:fields("creation_opts_sync_only");
|
||||||
fields("post") ->
|
fields("post") ->
|
||||||
fields("post", cassa);
|
fields("post", cassandra);
|
||||||
fields("put") ->
|
fields("put") ->
|
||||||
fields("config");
|
fields("config");
|
||||||
fields("get") ->
|
fields("get") ->
|
||||||
|
|
|
@ -27,9 +27,9 @@
|
||||||
");\n"
|
");\n"
|
||||||
""
|
""
|
||||||
).
|
).
|
||||||
-define(SQL_DROP_TABLE, "DROP TABLE mqtt_msg_test").
|
-define(SQL_DROP_TABLE, "DROP TABLE mqtt.mqtt_msg_test").
|
||||||
-define(SQL_DELETE, "TRUNCATE mqtt_msg_test").
|
-define(SQL_DELETE, "TRUNCATE mqtt.mqtt_msg_test").
|
||||||
-define(SQL_SELECT, "SELECT payload FROM mqtt_msg_test").
|
-define(SQL_SELECT, "SELECT payload FROM mqtt.mqtt_msg_test").
|
||||||
|
|
||||||
% DB defaults
|
% DB defaults
|
||||||
-define(CASSA_KEYSPACE, "mqtt").
|
-define(CASSA_KEYSPACE, "mqtt").
|
||||||
|
@ -46,6 +46,20 @@
|
||||||
-define(CERTFILE, filename:join(?CERT_ROOT, ["client.pem"])).
|
-define(CERTFILE, filename:join(?CERT_ROOT, ["client.pem"])).
|
||||||
-define(KEYFILE, filename:join(?CERT_ROOT, ["client.key"])).
|
-define(KEYFILE, filename:join(?CERT_ROOT, ["client.key"])).
|
||||||
|
|
||||||
|
%% How to run it locally:
|
||||||
|
%% 1. Start all deps services
|
||||||
|
%% sudo docker compose -f .ci/docker-compose-file/docker-compose.yaml \
|
||||||
|
%% -f .ci/docker-compose-file/docker-compose-cassandra.yaml \
|
||||||
|
%% -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
|
||||||
|
%% up --build
|
||||||
|
%%
|
||||||
|
%% 2. Run use cases with special environment variables
|
||||||
|
%% CASSA_TCP_HOST=127.0.0.1 CASSA_TCP_PORT=19042 \
|
||||||
|
%% CASSA_TLS_HOST=127.0.0.1 CASSA_TLS_PORT=19142 \
|
||||||
|
%% PROXY_HOST=127.0.0.1 ./rebar3 as test ct -c -v --name ct@127.0.0.1 \
|
||||||
|
%% --suite lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl
|
||||||
|
%%
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% CT boilerplate
|
%% CT boilerplate
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -197,7 +211,7 @@ cassa_config(BridgeType, Config) ->
|
||||||
" keyspace = ~p\n"
|
" keyspace = ~p\n"
|
||||||
" username = ~p\n"
|
" username = ~p\n"
|
||||||
" password = ~p\n"
|
" password = ~p\n"
|
||||||
" sql = ~p\n"
|
" cql = ~p\n"
|
||||||
" resource_opts = {\n"
|
" resource_opts = {\n"
|
||||||
" request_timeout = 500ms\n"
|
" request_timeout = 500ms\n"
|
||||||
" batch_size = ~b\n"
|
" batch_size = ~b\n"
|
||||||
|
@ -238,8 +252,8 @@ parse_and_check(ConfigString, BridgeType, Name) ->
|
||||||
create_bridge(Config) ->
|
create_bridge(Config) ->
|
||||||
BridgeType = ?config(cassa_bridge_type, Config),
|
BridgeType = ?config(cassa_bridge_type, Config),
|
||||||
Name = ?config(cassa_name, Config),
|
Name = ?config(cassa_name, Config),
|
||||||
PGConfig = ?config(cassa_config, Config),
|
BridgeConfig = ?config(cassa_config, Config),
|
||||||
emqx_bridge:create(BridgeType, Name, PGConfig).
|
emqx_bridge:create(BridgeType, Name, BridgeConfig).
|
||||||
|
|
||||||
delete_bridge(Config) ->
|
delete_bridge(Config) ->
|
||||||
BridgeType = ?config(cassa_bridge_type, Config),
|
BridgeType = ?config(cassa_bridge_type, Config),
|
||||||
|
@ -254,6 +268,14 @@ create_bridge_http(Params) ->
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
bridges_probe_http(Params) ->
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
|
||||||
|
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
|
||||||
|
{ok, _} -> ok;
|
||||||
|
Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
send_message(Config, Payload) ->
|
send_message(Config, Payload) ->
|
||||||
Name = ?config(cassa_name, Config),
|
Name = ?config(cassa_name, Config),
|
||||||
BridgeType = ?config(cassa_bridge_type, Config),
|
BridgeType = ?config(cassa_bridge_type, Config),
|
||||||
|
@ -294,25 +316,33 @@ connect_direct_cassa(Config) ->
|
||||||
|
|
||||||
% These funs connect and then stop the cassandra connection
|
% These funs connect and then stop the cassandra connection
|
||||||
connect_and_create_table(Config) ->
|
connect_and_create_table(Config) ->
|
||||||
Con = connect_direct_cassa(Config),
|
with_direct_conn(Config, fun(Conn) ->
|
||||||
{ok, _} = ecql:query(Con, ?SQL_CREATE_TABLE),
|
{ok, _} = ecql:query(Conn, ?SQL_CREATE_TABLE)
|
||||||
ok = ecql:close(Con).
|
end).
|
||||||
|
|
||||||
connect_and_drop_table(Config) ->
|
connect_and_drop_table(Config) ->
|
||||||
Con = connect_direct_cassa(Config),
|
with_direct_conn(Config, fun(Conn) ->
|
||||||
{ok, _} = ecql:query(Con, ?SQL_DROP_TABLE),
|
{ok, _} = ecql:query(Conn, ?SQL_DROP_TABLE)
|
||||||
ok = ecql:close(Con).
|
end).
|
||||||
|
|
||||||
connect_and_clear_table(Config) ->
|
connect_and_clear_table(Config) ->
|
||||||
Con = connect_direct_cassa(Config),
|
with_direct_conn(Config, fun(Conn) ->
|
||||||
ok = ecql:query(Con, ?SQL_DELETE),
|
ok = ecql:query(Conn, ?SQL_DELETE)
|
||||||
ok = ecql:close(Con).
|
end).
|
||||||
|
|
||||||
connect_and_get_payload(Config) ->
|
connect_and_get_payload(Config) ->
|
||||||
Con = connect_direct_cassa(Config),
|
with_direct_conn(Config, fun(Conn) ->
|
||||||
{ok, {_Keyspace, _ColsSpec, [[Result]]}} = ecql:query(Con, ?SQL_SELECT),
|
{ok, {_Keyspace, _ColsSpec, [[Result]]}} = ecql:query(Conn, ?SQL_SELECT),
|
||||||
ok = ecql:close(Con),
|
Result
|
||||||
Result.
|
end).
|
||||||
|
|
||||||
|
with_direct_conn(Config, Fn) ->
|
||||||
|
Conn = connect_direct_cassa(Config),
|
||||||
|
try
|
||||||
|
Fn(Conn)
|
||||||
|
after
|
||||||
|
ok = ecql:close(Conn)
|
||||||
|
end.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Testcases
|
%% Testcases
|
||||||
|
@ -358,14 +388,14 @@ t_setup_via_config_and_publish(Config) ->
|
||||||
t_setup_via_http_api_and_publish(Config) ->
|
t_setup_via_http_api_and_publish(Config) ->
|
||||||
BridgeType = ?config(cassa_bridge_type, Config),
|
BridgeType = ?config(cassa_bridge_type, Config),
|
||||||
Name = ?config(cassa_name, Config),
|
Name = ?config(cassa_name, Config),
|
||||||
PgsqlConfig0 = ?config(cassa_config, Config),
|
BridgeConfig0 = ?config(cassa_config, Config),
|
||||||
PgsqlConfig = PgsqlConfig0#{
|
BridgeConfig = BridgeConfig0#{
|
||||||
<<"name">> => Name,
|
<<"name">> => Name,
|
||||||
<<"type">> => BridgeType
|
<<"type">> => BridgeType
|
||||||
},
|
},
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, _},
|
{ok, _},
|
||||||
create_bridge_http(PgsqlConfig)
|
create_bridge_http(BridgeConfig)
|
||||||
),
|
),
|
||||||
Val = integer_to_binary(erlang:unique_integer()),
|
Val = integer_to_binary(erlang:unique_integer()),
|
||||||
SentData = #{
|
SentData = #{
|
||||||
|
@ -421,6 +451,18 @@ t_get_status(Config) ->
|
||||||
end),
|
end),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_bridges_probe_via_http(Config) ->
|
||||||
|
BridgeType = ?config(cassa_bridge_type, Config),
|
||||||
|
Name = ?config(cassa_name, Config),
|
||||||
|
BridgeConfig0 = ?config(cassa_config, Config),
|
||||||
|
BridgeConfig = BridgeConfig0#{
|
||||||
|
<<"name">> => Name,
|
||||||
|
<<"type">> => BridgeType
|
||||||
|
},
|
||||||
|
?assertMatch(ok, bridges_probe_http(BridgeConfig)),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
t_create_disconnected(Config) ->
|
t_create_disconnected(Config) ->
|
||||||
ProxyPort = ?config(proxy_port, Config),
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
ProxyHost = ?config(proxy_host, Config),
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
|
|
|
@ -33,7 +33,7 @@
|
||||||
on_start/2,
|
on_start/2,
|
||||||
on_stop/2,
|
on_stop/2,
|
||||||
on_query/3,
|
on_query/3,
|
||||||
%% TODO: now_supported_now
|
%% TODO: not_supported_now
|
||||||
%%on_batch_query/3,
|
%%on_batch_query/3,
|
||||||
on_get_status/2
|
on_get_status/2
|
||||||
]).
|
]).
|
||||||
|
@ -41,7 +41,7 @@
|
||||||
%% callbacks of ecpool
|
%% callbacks of ecpool
|
||||||
-export([
|
-export([
|
||||||
connect/1,
|
connect/1,
|
||||||
prepare_sql_to_conn/2
|
prepare_cql_to_conn/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% callbacks for query executing
|
%% callbacks for query executing
|
||||||
|
@ -55,7 +55,7 @@
|
||||||
-type state() ::
|
-type state() ::
|
||||||
#{
|
#{
|
||||||
poolname := atom(),
|
poolname := atom(),
|
||||||
prepare_sql := prepares(),
|
prepare_cql := prepares(),
|
||||||
params_tokens := params_tokens(),
|
params_tokens := params_tokens(),
|
||||||
%% returned by ecql:prepare/2
|
%% returned by ecql:prepare/2
|
||||||
prepare_statement := binary()
|
prepare_statement := binary()
|
||||||
|
@ -109,9 +109,6 @@ on_start(
|
||||||
ssl := SSL
|
ssl := SSL
|
||||||
} = Config
|
} = Config
|
||||||
) ->
|
) ->
|
||||||
{ok, _} = application:ensure_all_started(ecpool),
|
|
||||||
{ok, _} = application:ensure_all_started(ecql),
|
|
||||||
|
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{
|
||||||
msg => "starting_cassandra_connector",
|
msg => "starting_cassandra_connector",
|
||||||
connector => InstId,
|
connector => InstId,
|
||||||
|
@ -139,7 +136,7 @@ on_start(
|
||||||
end,
|
end,
|
||||||
|
|
||||||
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
||||||
Prepares = parse_prepare_sql(Config),
|
Prepares = parse_prepare_cql(Config),
|
||||||
InitState = #{poolname => PoolName, prepare_statement => #{}},
|
InitState = #{poolname => PoolName, prepare_statement => #{}},
|
||||||
State = maps:merge(InitState, Prepares),
|
State = maps:merge(InitState, Prepares),
|
||||||
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
|
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
|
||||||
|
@ -177,30 +174,30 @@ on_query(
|
||||||
Request,
|
Request,
|
||||||
#{poolname := PoolName} = State
|
#{poolname := PoolName} = State
|
||||||
) ->
|
) ->
|
||||||
{Type, PreparedKeyOrSQL, Params} = parse_request_to_sql(Request),
|
{Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request),
|
||||||
?tp(
|
?tp(
|
||||||
debug,
|
debug,
|
||||||
cassandra_connector_received_sql_query,
|
cassandra_connector_received_cql_query,
|
||||||
#{
|
#{
|
||||||
connector => InstId,
|
connector => InstId,
|
||||||
type => Type,
|
type => Type,
|
||||||
params => Params,
|
params => Params,
|
||||||
prepared_key_or_sql => PreparedKeyOrSQL,
|
prepared_key_or_cql => PreparedKeyOrSQL,
|
||||||
state => State
|
state => State
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
{PreparedKeyOrSQL1, Data} = proc_sql_params(Type, PreparedKeyOrSQL, Params, State),
|
{PreparedKeyOrSQL1, Data} = proc_cql_params(Type, PreparedKeyOrSQL, Params, State),
|
||||||
Res = exec_sql_query(InstId, PoolName, Type, PreparedKeyOrSQL1, Data),
|
Res = exec_cql_query(InstId, PoolName, Type, PreparedKeyOrSQL1, Data),
|
||||||
handle_result(Res).
|
handle_result(Res).
|
||||||
|
|
||||||
parse_request_to_sql({send_message, Params}) ->
|
parse_request_to_cql({send_message, Params}) ->
|
||||||
{prepared_query, _Key = send_message, Params};
|
{prepared_query, _Key = send_message, Params};
|
||||||
parse_request_to_sql({query, SQL}) ->
|
parse_request_to_cql({query, SQL}) ->
|
||||||
parse_request_to_sql({query, SQL, #{}});
|
parse_request_to_cql({query, SQL, #{}});
|
||||||
parse_request_to_sql({query, SQL, Params}) ->
|
parse_request_to_cql({query, SQL, Params}) ->
|
||||||
{query, SQL, Params}.
|
{query, SQL, Params}.
|
||||||
|
|
||||||
proc_sql_params(
|
proc_cql_params(
|
||||||
prepared_query,
|
prepared_query,
|
||||||
PreparedKey0,
|
PreparedKey0,
|
||||||
Params,
|
Params,
|
||||||
|
@ -209,11 +206,11 @@ proc_sql_params(
|
||||||
PreparedKey = maps:get(PreparedKey0, Prepares),
|
PreparedKey = maps:get(PreparedKey0, Prepares),
|
||||||
Tokens = maps:get(PreparedKey0, ParamsTokens),
|
Tokens = maps:get(PreparedKey0, ParamsTokens),
|
||||||
{PreparedKey, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))};
|
{PreparedKey, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))};
|
||||||
proc_sql_params(query, SQL, Params, _State) ->
|
proc_cql_params(query, SQL, Params, _State) ->
|
||||||
{SQL1, Tokens} = emqx_plugin_libs_rule:preproc_sql(SQL, '?'),
|
{SQL1, Tokens} = emqx_plugin_libs_rule:preproc_sql(SQL, '?'),
|
||||||
{SQL1, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}.
|
{SQL1, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}.
|
||||||
|
|
||||||
exec_sql_query(InstId, PoolName, Type, PreparedKey, Data) when
|
exec_cql_query(InstId, PoolName, Type, PreparedKey, Data) when
|
||||||
Type == query; Type == prepared_query
|
Type == query; Type == prepared_query
|
||||||
->
|
->
|
||||||
case ecpool:pick_and_do(PoolName, {?MODULE, Type, [PreparedKey, Data]}, no_handover) of
|
case ecpool:pick_and_do(PoolName, {?MODULE, Type, [PreparedKey, Data]}, no_handover) of
|
||||||
|
@ -239,7 +236,7 @@ on_get_status(_InstId, #{poolname := Pool} = State) ->
|
||||||
%% return new state with prepared statements
|
%% return new state with prepared statements
|
||||||
{connected, NState};
|
{connected, NState};
|
||||||
false ->
|
false ->
|
||||||
%% do not log error, it is logged in prepare_sql_to_conn
|
%% do not log error, it is logged in prepare_cql_to_conn
|
||||||
connecting
|
connecting
|
||||||
end;
|
end;
|
||||||
false ->
|
false ->
|
||||||
|
@ -249,14 +246,14 @@ on_get_status(_InstId, #{poolname := Pool} = State) ->
|
||||||
do_get_status(Conn) ->
|
do_get_status(Conn) ->
|
||||||
ok == element(1, ecql:query(Conn, "SELECT count(1) AS T FROM system.local")).
|
ok == element(1, ecql:query(Conn, "SELECT count(1) AS T FROM system.local")).
|
||||||
|
|
||||||
do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
|
do_check_prepares(#{prepare_cql := Prepares}) when is_map(Prepares) ->
|
||||||
ok;
|
ok;
|
||||||
do_check_prepares(State = #{poolname := PoolName, prepare_sql := {error, Prepares}}) ->
|
do_check_prepares(State = #{poolname := PoolName, prepare_cql := {error, Prepares}}) ->
|
||||||
%% retry to prepare
|
%% retry to prepare
|
||||||
case prepare_sql(Prepares, PoolName) of
|
case prepare_cql(Prepares, PoolName) of
|
||||||
{ok, Sts} ->
|
{ok, Sts} ->
|
||||||
%% remove the error
|
%% remove the error
|
||||||
{ok, State#{prepare_sql => Prepares, prepare_statement := Sts}};
|
{ok, State#{prepare_cql => Prepares, prepare_statement := Sts}};
|
||||||
_Error ->
|
_Error ->
|
||||||
false
|
false
|
||||||
end.
|
end.
|
||||||
|
@ -295,83 +292,83 @@ conn_opts([Opt | Opts], Acc) ->
|
||||||
%% prepare
|
%% prepare
|
||||||
|
|
||||||
%% XXX: hardcode
|
%% XXX: hardcode
|
||||||
%% note: the `sql` param is passed by emqx_ee_bridge_cassa
|
%% note: the `cql` param is passed by emqx_ee_bridge_cassa
|
||||||
parse_prepare_sql(#{sql := SQL}) ->
|
parse_prepare_cql(#{cql := SQL}) ->
|
||||||
parse_prepare_sql([{send_message, SQL}], #{}, #{});
|
parse_prepare_cql([{send_message, SQL}], #{}, #{});
|
||||||
parse_prepare_sql(_) ->
|
parse_prepare_cql(_) ->
|
||||||
#{prepare_sql => #{}, params_tokens => #{}}.
|
#{prepare_cql => #{}, params_tokens => #{}}.
|
||||||
|
|
||||||
parse_prepare_sql([{Key, H} | T], Prepares, Tokens) ->
|
parse_prepare_cql([{Key, H} | T], Prepares, Tokens) ->
|
||||||
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '?'),
|
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '?'),
|
||||||
parse_prepare_sql(
|
parse_prepare_cql(
|
||||||
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
|
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
|
||||||
);
|
);
|
||||||
parse_prepare_sql([], Prepares, Tokens) ->
|
parse_prepare_cql([], Prepares, Tokens) ->
|
||||||
#{
|
#{
|
||||||
prepare_sql => Prepares,
|
prepare_cql => Prepares,
|
||||||
params_tokens => Tokens
|
params_tokens => Tokens
|
||||||
}.
|
}.
|
||||||
|
|
||||||
init_prepare(State = #{prepare_sql := Prepares, poolname := PoolName}) ->
|
init_prepare(State = #{prepare_cql := Prepares, poolname := PoolName}) ->
|
||||||
case maps:size(Prepares) of
|
case maps:size(Prepares) of
|
||||||
0 ->
|
0 ->
|
||||||
State;
|
State;
|
||||||
_ ->
|
_ ->
|
||||||
case prepare_sql(Prepares, PoolName) of
|
case prepare_cql(Prepares, PoolName) of
|
||||||
{ok, Sts} ->
|
{ok, Sts} ->
|
||||||
State#{prepare_statement := Sts};
|
State#{prepare_statement := Sts};
|
||||||
Error ->
|
Error ->
|
||||||
?tp(
|
?tp(
|
||||||
error,
|
error,
|
||||||
cassandra_prepare_sql_failed,
|
cassandra_prepare_cql_failed,
|
||||||
#{prepares => Prepares, reason => Error}
|
#{prepares => Prepares, reason => Error}
|
||||||
),
|
),
|
||||||
%% mark the prepare_sqlas failed
|
%% mark the prepare_cql as failed
|
||||||
State#{prepare_sql => {error, Prepares}}
|
State#{prepare_cql => {error, Prepares}}
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
|
prepare_cql(Prepares, PoolName) when is_map(Prepares) ->
|
||||||
prepare_sql(maps:to_list(Prepares), PoolName);
|
prepare_cql(maps:to_list(Prepares), PoolName);
|
||||||
prepare_sql(Prepares, PoolName) ->
|
prepare_cql(Prepares, PoolName) ->
|
||||||
case do_prepare_sql(Prepares, PoolName) of
|
case do_prepare_cql(Prepares, PoolName) of
|
||||||
{ok, _Sts} = Ok ->
|
{ok, _Sts} = Ok ->
|
||||||
%% prepare for reconnect
|
%% prepare for reconnect
|
||||||
ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
|
ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_cql_to_conn, [Prepares]}),
|
||||||
Ok;
|
Ok;
|
||||||
Error ->
|
Error ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_prepare_sql(Prepares, PoolName) ->
|
do_prepare_cql(Prepares, PoolName) ->
|
||||||
do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}).
|
do_prepare_cql(ecpool:workers(PoolName), Prepares, PoolName, #{}).
|
||||||
|
|
||||||
do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) ->
|
do_prepare_cql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) ->
|
||||||
{ok, Conn} = ecpool_worker:client(Worker),
|
{ok, Conn} = ecpool_worker:client(Worker),
|
||||||
case prepare_sql_to_conn(Conn, Prepares) of
|
case prepare_cql_to_conn(Conn, Prepares) of
|
||||||
{ok, Sts} ->
|
{ok, Sts} ->
|
||||||
do_prepare_sql(T, Prepares, PoolName, Sts);
|
do_prepare_cql(T, Prepares, PoolName, Sts);
|
||||||
Error ->
|
Error ->
|
||||||
Error
|
Error
|
||||||
end;
|
end;
|
||||||
do_prepare_sql([], _Prepares, _PoolName, LastSts) ->
|
do_prepare_cql([], _Prepares, _PoolName, LastSts) ->
|
||||||
{ok, LastSts}.
|
{ok, LastSts}.
|
||||||
|
|
||||||
prepare_sql_to_conn(Conn, Prepares) ->
|
prepare_cql_to_conn(Conn, Prepares) ->
|
||||||
prepare_sql_to_conn(Conn, Prepares, #{}).
|
prepare_cql_to_conn(Conn, Prepares, #{}).
|
||||||
|
|
||||||
prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements};
|
prepare_cql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements};
|
||||||
prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) ->
|
prepare_cql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) ->
|
||||||
?SLOG(info, #{msg => "cassandra_prepare_sql", name => Key, prepare_sql => SQL}),
|
?SLOG(info, #{msg => "cassandra_prepare_cql", name => Key, prepare_cql => SQL}),
|
||||||
case ecql:prepare(Conn, Key, SQL) of
|
case ecql:prepare(Conn, Key, SQL) of
|
||||||
{ok, Statement} ->
|
{ok, Statement} ->
|
||||||
prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => Statement});
|
prepare_cql_to_conn(Conn, PrepareList, Statements#{Key => Statement});
|
||||||
{error, Error} = Other ->
|
{error, Error} = Other ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "cassandra_prepare_sql_failed",
|
msg => "cassandra_prepare_cql_failed",
|
||||||
worker_pid => Conn,
|
worker_pid => Conn,
|
||||||
name => Key,
|
name => Key,
|
||||||
prepare_sql => SQL,
|
prepare_cql => SQL,
|
||||||
error => Error
|
error => Error
|
||||||
}),
|
}),
|
||||||
Other
|
Other
|
||||||
|
@ -394,19 +391,19 @@ assign_type_for_params(Params) ->
|
||||||
assign_type_for_params([], Acc) ->
|
assign_type_for_params([], Acc) ->
|
||||||
lists:reverse(Acc);
|
lists:reverse(Acc);
|
||||||
assign_type_for_params([Param | More], Acc) ->
|
assign_type_for_params([Param | More], Acc) ->
|
||||||
assign_type_for_params(More, [may_assign_type(Param) | Acc]).
|
assign_type_for_params(More, [maybe_assign_type(Param) | Acc]).
|
||||||
|
|
||||||
may_assign_type(true) ->
|
maybe_assign_type(true) ->
|
||||||
{int, 1};
|
{int, 1};
|
||||||
may_assign_type(false) ->
|
maybe_assign_type(false) ->
|
||||||
{int, 0};
|
{int, 0};
|
||||||
may_assign_type(V) when is_binary(V); is_list(V); is_atom(V) -> V;
|
maybe_assign_type(V) when is_binary(V); is_list(V); is_atom(V) -> V;
|
||||||
may_assign_type(V) when is_integer(V) ->
|
maybe_assign_type(V) when is_integer(V) ->
|
||||||
%% The max value of signed int(4) is 2147483647
|
%% The max value of signed int(4) is 2147483647
|
||||||
case V > 2147483647 orelse V < -2147483647 of
|
case V > 2147483647 orelse V < -2147483647 of
|
||||||
true -> {bigint, V};
|
true -> {bigint, V};
|
||||||
false -> {int, V}
|
false -> {int, V}
|
||||||
end;
|
end;
|
||||||
may_assign_type(V) when is_float(V) -> {double, V};
|
maybe_assign_type(V) when is_float(V) -> {double, V};
|
||||||
may_assign_type(V) ->
|
maybe_assign_type(V) ->
|
||||||
V.
|
V.
|
||||||
|
|
|
@ -85,7 +85,8 @@ init_per_suite(Config) ->
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
||||||
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
|
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
|
||||||
_ = application:stop(emqx_connector).
|
_ = application:stop(emqx_connector),
|
||||||
|
_ = application:stop(emqx_ee_connector).
|
||||||
|
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
Loading…
Reference in New Issue