diff --git a/apps/emqx_connector/src/emqx_connector_cassa.erl b/apps/emqx_connector/src/emqx_connector_cassa.erl new file mode 100644 index 000000000..321d285e6 --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_cassa.erl @@ -0,0 +1,468 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_connector_cass). + +-include("emqx_connector.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +%% FIXME +%-include_lib("epgsql/include/epgsql.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-export([roots/0, fields/1]). + +-behaviour(emqx_resource). + +%% callbacks of behaviour emqx_resource +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_get_status/2 +]). + +-export([connect/1]). + +-export([ + query/3, + prepared_query/3, + execute_batch/3 +]). + +-export([do_get_status/1]). + +-define(CASSA_HOST_OPTIONS, #{ + default_port => 9042 +}). + +-type prepares() :: #{atom() => binary()}. +-type params_tokens() :: #{atom() => list()}. + +-type state() :: + #{ + poolname := atom(), + prepare_sql := prepares(), + params_tokens := params_tokens(), + prepare_statement := epgsql:statement() + }. + +%%==================================================================== + +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields(config) -> + cassandra_db_fields() ++ + emqx_connector_schema_lib:ssl_fields() ++ + emqx_connector_schema_lib:prepare_statement_fields(). + +cassandra_db_fields() -> + [ + {nodes, fun nodes/1}, + {keyspace, fun keyspace/1}, + {pool_size, fun emqx_connector_schema_lib:pool_size/1}, + {username, fun emqx_connector_schema_lib:username/1}, + {password, fun emqx_connector_schema_lib:password/1}, + {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} + ]. + +nodes(type) -> binary(); +nodes(desc) -> ?DESC("nodes"); +nodes(required) -> true; +nodes(converter) -> fun convert_nodes/2; +nodes(_) -> undefined. + +convert_nodes(Nodes0) when is_binary(Nodes0) -> + Nodes = binary_to_list(Nodes0), + [ + begin + [Host, Port] = string:tokens(S, ":"), + {Host, list_to_integer(Port)} + end + || S <- string:tokens(Nodes, ",") + ]. + +keyspace(type) -> binary(); +keyspace(desc) -> ?DESC("keyspace"); +keyspace(required) -> true; +keyspace(_) -> undefined. + +%%==================================================================== + +callback_mode() -> always_sync. + +-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}. +on_start( + InstId, + #{ + nodes := Nodes, + keyspace := Keyspace, + username := Username, + pool_size := PoolSize, + ssl := SSL + } = Config +) -> + {ok, _} = application:ensure_all_started(ecpool), + {ok, _} = application:ensure_all_started(ecql), + + ?SLOG(info, #{ + msg => "starting_cassandra_connector", + connector => InstId, + config => emqx_misc:redact(Config) + }), + + Options = [ + {nodes, Nodes}, + {username, Username}, + {password, emqx_secret:wrap(maps:get(password, Config, ""))}, + {keyspace, Keyspace}, + {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, + {pool_size, PoolSize} + ], + + SslOpts = + case maps:get(enable, SSL) of + true -> + [ + %% note: this is converted to `required' in + %% `conn_opts/2', and there's a boolean guard + %% there; if this is set to `required' here, + %% that'll require changing `conn_opts/2''s guard. + {ssl, true}, + {ssl_opts, emqx_tls_lib:to_client_opts(SSL)} + ]; + false -> + [{ssl, false}] + end, + + PoolName = emqx_plugin_libs_pool:pool_name(InstId), + Prepares = parse_prepare_sql(Config), + InitState = #{poolname => PoolName, prepare_statement => #{}}, + State = maps:merge(InitState, Prepares), + case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of + ok -> + {ok, init_prepare(State)}; + {error, Reason} -> + ?tp( + cassandra_connector_start_failed, + #{error => Reason} + ), + {error, Reason} + end. + +on_stop(InstId, #{poolname := PoolName}) -> + ?SLOG(info, #{ + msg => "stopping_cassandra_connector", + connector => InstId + }), + emqx_plugin_libs_pool:stop_pool(PoolName). + +on_query(InstId, {TypeOrKey, NameOrSQL}, #{poolname := _PoolName} = State) -> + on_query(InstId, {TypeOrKey, NameOrSQL, []}, State); +on_query( + InstId, + {TypeOrKey, NameOrSQL, Params}, + #{poolname := PoolName} = State +) -> + ?SLOG(debug, #{ + msg => "postgresql connector received sql query", + connector => InstId, + type => TypeOrKey, + sql => NameOrSQL, + state => State + }), + Type = pgsql_query_type(TypeOrKey), + {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), + Res = on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data), + handle_result(Res). + +pgsql_query_type(sql) -> + query; +pgsql_query_type(query) -> + query; +pgsql_query_type(prepared_query) -> + prepared_query; +%% for bridge +pgsql_query_type(_) -> + pgsql_query_type(prepared_query). + +on_batch_query( + InstId, + BatchReq, + #{poolname := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State +) -> + case BatchReq of + [{Key, _} = Request | _] -> + BinKey = to_bin(Key), + case maps:get(BinKey, Tokens, undefined) of + undefined -> + Log = #{ + connector => InstId, + first_request => Request, + state => State, + msg => "batch prepare not implemented" + }, + ?SLOG(error, Log), + {error, {unrecoverable_error, batch_prepare_not_implemented}}; + TokenList -> + {_, Datas} = lists:unzip(BatchReq), + Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], + St = maps:get(BinKey, Sts), + case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of + {error, _Error} = Result -> + handle_result(Result); + {_Column, Results} -> + handle_batch_result(Results, 0) + end + end; + _ -> + Log = #{ + connector => InstId, + request => BatchReq, + state => State, + msg => "invalid request" + }, + ?SLOG(error, Log), + {error, {unrecoverable_error, invalid_request}} + end. + +proc_sql_params(query, SQLOrKey, Params, _State) -> + {SQLOrKey, Params}; +proc_sql_params(prepared_query, SQLOrKey, Params, _State) -> + {SQLOrKey, Params}; +proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) -> + Key = to_bin(TypeOrKey), + case maps:get(Key, ParamsTokens, undefined) of + undefined -> + {SQLOrData, Params}; + Tokens -> + {Key, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} + end. + +on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> + try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of + {error, Reason} = Result -> + ?tp( + pgsql_connector_query_return, + #{error => Reason} + ), + ?SLOG(error, #{ + msg => "postgresql connector do sql query failed", + connector => InstId, + type => Type, + sql => NameOrSQL, + reason => Reason + }), + Result; + Result -> + ?tp( + pgsql_connector_query_return, + #{result => Result} + ), + Result + catch + error:function_clause:Stacktrace -> + ?SLOG(error, #{ + msg => "postgresql connector do sql query failed", + connector => InstId, + type => Type, + sql => NameOrSQL, + reason => function_clause, + stacktrace => Stacktrace + }), + {error, {unrecoverable_error, invalid_request}} + end. + +on_get_status(_InstId, #{poolname := Pool} = State) -> + case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of + true -> + case do_check_prepares(State) of + ok -> + connected; + {ok, NState} -> + %% return new state with prepared statements + {connected, NState}; + false -> + %% do not log error, it is logged in prepare_sql_to_conn + connecting + end; + false -> + connecting + end. + +do_get_status(Conn) -> + ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")). + +do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) -> + ok; +do_check_prepares(State = #{poolname := PoolName, prepare_sql := {error, Prepares}}) -> + %% retry to prepare + case prepare_sql(Prepares, PoolName) of + {ok, Sts} -> + %% remove the error + {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}; + _Error -> + false + end. + +%% =================================================================== + +connect(Opts) -> + Host = proplists:get_value(host, Opts), + Username = proplists:get_value(username, Opts), + Password = emqx_secret:unwrap(proplists:get_value(password, Opts)), + case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of + {ok, _Conn} = Ok -> + Ok; + {error, Reason} -> + {error, Reason} + end. + +query(Conn, SQL, Params) -> + epgsql:equery(Conn, SQL, Params). + +prepared_query(Conn, Name, Params) -> + epgsql:prepared_query2(Conn, Name, Params). + +execute_batch(Conn, Statement, Params) -> + epgsql:execute_batch(Conn, Statement, Params). + +conn_opts(Opts) -> + conn_opts(Opts, []). +conn_opts([], Acc) -> + Acc; +conn_opts([Opt = {database, _} | Opts], Acc) -> + conn_opts(Opts, [Opt | Acc]); +conn_opts([{ssl, Bool} | Opts], Acc) when is_boolean(Bool) -> + Flag = + case Bool of + true -> required; + false -> false + end, + conn_opts(Opts, [{ssl, Flag} | Acc]); +conn_opts([Opt = {port, _} | Opts], Acc) -> + conn_opts(Opts, [Opt | Acc]); +conn_opts([Opt = {timeout, _} | Opts], Acc) -> + conn_opts(Opts, [Opt | Acc]); +conn_opts([Opt = {ssl_opts, _} | Opts], Acc) -> + conn_opts(Opts, [Opt | Acc]); +conn_opts([_Opt | Opts], Acc) -> + conn_opts(Opts, Acc). + +parse_prepare_sql(Config) -> + SQL = + case maps:get(prepare_statement, Config, undefined) of + undefined -> + case maps:get(sql, Config, undefined) of + undefined -> #{}; + Template -> #{<<"send_message">> => Template} + end; + Any -> + Any + end, + parse_prepare_sql(maps:to_list(SQL), #{}, #{}). + +parse_prepare_sql([{Key, H} | T], Prepares, Tokens) -> + {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, '$n'), + parse_prepare_sql( + T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} + ); +parse_prepare_sql([], Prepares, Tokens) -> + #{ + prepare_sql => Prepares, + params_tokens => Tokens + }. + +init_prepare(State = #{prepare_sql := Prepares, poolname := PoolName}) -> + case maps:size(Prepares) of + 0 -> + State; + _ -> + case prepare_sql(Prepares, PoolName) of + {ok, Sts} -> + State#{prepare_statement := Sts}; + Error -> + LogMeta = #{ + msg => <<"PostgreSQL init prepare statement failed">>, error => Error + }, + ?SLOG(error, LogMeta), + %% mark the prepare_sqlas failed + State#{prepare_sql => {error, Prepares}} + end + end. + +prepare_sql(Prepares, PoolName) when is_map(Prepares) -> + prepare_sql(maps:to_list(Prepares), PoolName); +prepare_sql(Prepares, PoolName) -> + case do_prepare_sql(Prepares, PoolName) of + {ok, _Sts} = Ok -> + %% prepare for reconnect + ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}), + Ok; + Error -> + Error + end. + +do_prepare_sql(Prepares, PoolName) -> + do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}). + +do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) -> + {ok, Conn} = ecpool_worker:client(Worker), + case prepare_sql_to_conn(Conn, Prepares) of + {ok, Sts} -> + do_prepare_sql(T, Prepares, PoolName, Sts); + Error -> + Error + end; +do_prepare_sql([], _Prepares, _PoolName, LastSts) -> + {ok, LastSts}. + +prepare_sql_to_conn(Conn, Prepares) -> + prepare_sql_to_conn(Conn, Prepares, #{}). + +prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements}; +prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) -> + LogMeta = #{msg => "PostgreSQL Prepare Statement", name => Key, prepare_sql => SQL}, + ?SLOG(info, LogMeta), + case epgsql:parse2(Conn, Key, SQL, []) of + {ok, Statement} -> + prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => Statement}); + {error, Error} = Other -> + ?SLOG(error, LogMeta#{msg => "PostgreSQL parse failed", error => Error}), + Other + end. + +to_bin(Bin) when is_binary(Bin) -> + Bin; +to_bin(Atom) when is_atom(Atom) -> + erlang:atom_to_binary(Atom). + +handle_result({error, disconnected}) -> + {error, {recoverable_error, disconnected}}; +handle_result({error, Error}) -> + {error, {unrecoverable_error, Error}}; +handle_result(Res) -> + Res. + +handle_batch_result([{ok, Count} | Rest], Acc) -> + handle_batch_result(Rest, Acc + Count); +handle_batch_result([{error, Error} | _Rest], _Acc) -> + {error, {unrecoverable_error, Error}}; +handle_batch_result([], Acc) -> + {ok, Acc}. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl new file mode 100644 index 000000000..2509ba77d --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl @@ -0,0 +1,129 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_cassa). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1, + values/2, + fields/2 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-define(DEFAULT_SQL, << + "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) " + "values (${id}, ${topic}, ${qos}, ${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))" +>>). + +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_examples(Method) -> + [ + #{ + <<"cassa">> => #{ + summary => <<"Cassandra Bridge">>, + value => values(Method, cassa) + } + } + ]. + +values(get, Type) -> + maps:merge(values(post, Type), ?METRICS_EXAMPLE); +values(post, Type) -> + #{ + enable => true, + type => Type, + name => <<"foo">>, + server => <<"127.0.0.1:5432">>, + database => <<"mqtt">>, + pool_size => 8, + username => <<"root">>, + password => <<"public">>, + sql => ?DEFAULT_SQL, + local_topic => <<"local/topic/#">>, + resource_opts => #{ + worker_pool_size => 8, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => async, + max_queue_bytes => ?DEFAULT_QUEUE_SIZE + } + }; +values(put, Type) -> + values(post, Type). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_cassa". + +roots() -> []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {sql, + mk( + binary(), + #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + )}, + {local_topic, + mk( + binary(), + #{desc => ?DESC("local_topic"), default => undefined} + )}, + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ] ++ + (emqx_connector_cassa:fields(config) -- + emqx_connector_schema_lib:prepare_statement_fields()); +fields("creation_opts") -> + emqx_resource_schema:fields("creation_opts_sync_only"); +fields("post") -> + fields("post", cassa); +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +fields("post", Type) -> + [type_field(Type), name_field() | fields("config")]. + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Cassandra using `", string:to_upper(Method), "` method."]; +desc("creation_opts" = Name) -> + emqx_resource_schema:desc(Name); +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- + +type_field(Type) -> + {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/scripts/check-elixir-applications.exs b/scripts/check-elixir-applications.exs index 42c838199..1e604c69f 100755 --- a/scripts/check-elixir-applications.exs +++ b/scripts/check-elixir-applications.exs @@ -1,4 +1,4 @@ -#!/usr/bin/env elixir +#! /usr/bin/env elixir defmodule CheckElixirApplications do alias EMQXUmbrella.MixProject diff --git a/scripts/check-elixir-deps-discrepancies.exs b/scripts/check-elixir-deps-discrepancies.exs index eee0a9e67..b27c6bfb4 100755 --- a/scripts/check-elixir-deps-discrepancies.exs +++ b/scripts/check-elixir-deps-discrepancies.exs @@ -1,4 +1,4 @@ -#!/usr/bin/env elixir +#! /usr/bin/env elixir # ensure we have a fresh rebar.lock diff --git a/scripts/check-elixir-emqx-machine-boot-discrepancies.exs b/scripts/check-elixir-emqx-machine-boot-discrepancies.exs index d07e6978f..9ffdc47bf 100755 --- a/scripts/check-elixir-emqx-machine-boot-discrepancies.exs +++ b/scripts/check-elixir-emqx-machine-boot-discrepancies.exs @@ -1,4 +1,4 @@ -#!/usr/bin/env elixir +#! /usr/bin/env elixir defmodule CheckElixirEMQXMachineBootDiscrepancies do alias EMQXUmbrella.MixProject