diff --git a/apps/emqx_bridge_cassandra/rebar.config b/apps/emqx_bridge_cassandra/rebar.config index 04ee603fa..e98146d78 100644 --- a/apps/emqx_bridge_cassandra/rebar.config +++ b/apps/emqx_bridge_cassandra/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.6.0"}}}, + {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.6.1"}}}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index 3b30f1d26..872ccb532 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -278,10 +278,24 @@ proc_cql_params(prepared_query, ChannId, Params, #{channels := Channs}) -> params_tokens := ParamsTokens } } = maps:get(ChannId, Channs), - {PrepareKey, assign_type_for_params(emqx_placeholder:proc_sql(ParamsTokens, Params))}; + {PrepareKey, assign_type_for_params(proc_sql(ParamsTokens, Params))}; proc_cql_params(query, CQL, Params, _State) -> {CQL1, Tokens} = emqx_placeholder:preproc_sql(CQL, '?'), - {CQL1, assign_type_for_params(emqx_placeholder:proc_sql(Tokens, Params))}. + {CQL1, assign_type_for_params(proc_sql(Tokens, Params))}. + +proc_sql(Tokens, Params) -> + VarTrans = fun + (null) -> null; + (X) -> emqx_placeholder:sql_data(X) + end, + emqx_placeholder:proc_tmpl( + Tokens, + Params, + #{ + return => rawlist, + var_trans => VarTrans + } + ). exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when Type == query; Type == prepared_query diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index 09deaa699..18d6993b3 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -19,6 +19,8 @@ %% ./rebar3 ct --name 'test@127.0.0.1' -v --suite \ %% apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE +-import(emqx_common_test_helpers, [on_exit/1]). + % SQL definitions -define(SQL_BRIDGE, "insert into mqtt_msg_test(topic, payload, arrived) " @@ -129,12 +131,15 @@ init_per_group(_Group, Config) -> Config. end_per_group(Group, Config) when - Group == without_batch; Group == without_batch + Group == with_batch; + Group == without_batch -> connect_and_drop_table(Config), ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), ok; end_per_group(_Group, _Config) -> ok. @@ -160,6 +165,7 @@ end_per_testcase(_Testcase, Config) -> ok = snabbkaffe:stop(), connect_and_clear_table(Config), delete_bridge(Config), + emqx_common_test_helpers:call_janitor(), ok. %%------------------------------------------------------------------------------ @@ -177,19 +183,32 @@ common_init(Config0) -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - % Ensure EE bridge module is loaded - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), - _ = emqx_bridge_enterprise:module_info(), - emqx_mgmt_api_test_util:init_suite(), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_bridge_cassandra, + emqx_bridge, + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config0)} + ), + {ok, _Api} = emqx_common_test_http:create_default_app(), % Connect to cassnadra directly and create the table catch connect_and_drop_table(Config0), connect_and_create_table(Config0), {Name, CassaConf} = cassa_config(BridgeType, Config0), Config = [ + {apps, Apps}, {cassa_config, CassaConf}, {cassa_bridge_type, BridgeType}, {cassa_name, Name}, + {bridge_type, BridgeType}, + {bridge_name, Name}, + {bridge_config, CassaConf}, {proxy_host, ProxyHost}, {proxy_port, ProxyPort} | Config0 @@ -360,13 +379,19 @@ connect_direct_cassa(Config) -> % These funs connect and then stop the cassandra connection connect_and_create_table(Config) -> + connect_and_create_table(Config, ?SQL_CREATE_TABLE). + +connect_and_create_table(Config, SQL) -> with_direct_conn(Config, fun(Conn) -> - {ok, _} = ecql:query(Conn, ?SQL_CREATE_TABLE) + {ok, _} = ecql:query(Conn, SQL) end). connect_and_drop_table(Config) -> + connect_and_drop_table(Config, ?SQL_DROP_TABLE). + +connect_and_drop_table(Config, SQL) -> with_direct_conn(Config, fun(Conn) -> - {ok, _} = ecql:query(Conn, ?SQL_DROP_TABLE) + {ok, _} = ecql:query(Conn, SQL) end). connect_and_clear_table(Config) -> @@ -375,8 +400,11 @@ connect_and_clear_table(Config) -> end). connect_and_get_payload(Config) -> + connect_and_get_payload(Config, ?SQL_SELECT). + +connect_and_get_payload(Config, SQL) -> with_direct_conn(Config, fun(Conn) -> - {ok, {_Keyspace, _ColsSpec, [[Result]]}} = ecql:query(Conn, ?SQL_SELECT), + {ok, {_Keyspace, _ColsSpec, [[Result]]}} = ecql:query(Conn, SQL), Result end). @@ -685,3 +713,48 @@ t_nasty_sql_string(Config) -> %% XXX: why ok instead of {ok, AffectedLines}? ?assertEqual(ok, send_message(Config, Message)), ?assertEqual(Payload, connect_and_get_payload(Config)). + +t_insert_null_into_int_column(Config) -> + BridgeType = ?config(bridge_type, Config), + connect_and_create_table( + Config, + << + "CREATE TABLE mqtt.mqtt_msg_test2 (\n" + " topic text,\n" + " payload text,\n" + " arrived timestamp,\n" + " x int,\n" + " PRIMARY KEY (topic)\n" + ")" + >> + ), + on_exit(fun() -> connect_and_drop_table(Config, "DROP TABLE mqtt.mqtt_msg_test2") end), + {ok, {{_, 201, _}, _, _}} = + emqx_bridge_testlib:create_bridge_api( + Config, + #{ + <<"cql">> => << + "insert into mqtt_msg_test2(topic, payload, x, arrived) " + "values (${topic}, ${payload}, ${x}, ${timestamp})" + >> + } + ), + RuleTopic = <<"t/c">>, + Opts = #{ + sql => <<"select *, first(jq('null', payload)) as x from \"", RuleTopic/binary, "\"">> + }, + {ok, _} = emqx_bridge_testlib:create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts), + + Payload = <<"{}">>, + Msg = emqx_message:make(RuleTopic, Payload), + {_, {ok, _}} = + ?wait_async_action( + emqx:publish(Msg), + #{?snk_kind := cassandra_connector_query_return}, + 10_000 + ), + + %% Would return `1853189228' if it encodes `null' as an integer... + ?assertEqual(null, connect_and_get_payload(Config, "select x from mqtt.mqtt_msg_test2")), + + ok. diff --git a/changes/ee/fix-12411.en.md b/changes/ee/fix-12411.en.md new file mode 100644 index 000000000..7dead2ed7 --- /dev/null +++ b/changes/ee/fix-12411.en.md @@ -0,0 +1 @@ +Fixed a bug where `null` values would be inserted as `1853189228` in `int` columns in Cassandra data integration.