From 58419698775275da71fa16229ebb94aa21b4e3ec Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 20 Mar 2023 10:28:34 +0800 Subject: [PATCH] feat: implement Microsoft SQL Server bridge (e5.0) --- apps/emqx_bridge/src/emqx_bridge.erl | 3 +- changes/ee/feat-10363.en.md | 1 + lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl | 23 +- .../src/emqx_ee_bridge_sqlserver.erl | 128 +++++ .../src/emqx_ee_connector.app.src | 4 +- .../src/emqx_ee_connector_sqlserver.erl | 531 ++++++++++++++++++ rel/i18n/emqx_ee_bridge_sqlserver.hocon | 87 +++ rel/i18n/emqx_ee_connector_sqlserver.hocon | 22 + 8 files changed, 793 insertions(+), 6 deletions(-) create mode 100644 changes/ee/feat-10363.en.md create mode 100644 lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl create mode 100644 lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl create mode 100644 rel/i18n/emqx_ee_bridge_sqlserver.hocon create mode 100644 rel/i18n/emqx_ee_connector_sqlserver.hocon diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index bf91d20f7..5d1850ebd 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -69,7 +69,8 @@ T == tdengine; T == dynamo; T == rocketmq; - T == cassandra + T == cassandra; + T == sqlserver ). load() -> diff --git a/changes/ee/feat-10363.en.md b/changes/ee/feat-10363.en.md new file mode 100644 index 000000000..c3b53a538 --- /dev/null +++ b/changes/ee/feat-10363.en.md @@ -0,0 +1 @@ +Add MicroSoft SQL Server data Bridge support. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 84b0b98b0..3ad5cbbb4 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -34,7 +34,8 @@ api_schemas(Method) -> ref(emqx_ee_bridge_clickhouse, Method), ref(emqx_ee_bridge_dynamo, Method), ref(emqx_ee_bridge_rocketmq, Method), - ref(emqx_ee_bridge_cassa, Method) + ref(emqx_ee_bridge_cassa, Method), + ref(emqx_ee_bridge_sqlserver, Method) ]. schema_modules() -> @@ -53,7 +54,8 @@ schema_modules() -> emqx_ee_bridge_clickhouse, emqx_ee_bridge_dynamo, emqx_ee_bridge_rocketmq, - emqx_ee_bridge_cassa + emqx_ee_bridge_cassa, + emqx_ee_bridge_sqlserver ]. examples(Method) -> @@ -91,7 +93,8 @@ resource_type(tdengine) -> emqx_ee_connector_tdengine; resource_type(clickhouse) -> emqx_ee_connector_clickhouse; resource_type(dynamo) -> emqx_ee_connector_dynamo; resource_type(rocketmq) -> emqx_ee_connector_rocketmq; -resource_type(cassandra) -> emqx_ee_connector_cassa. +resource_type(cassandra) -> emqx_ee_connector_cassa; +resource_type(sqlserver) -> emqx_ee_connector_sqlserver. fields(bridges) -> [ @@ -152,7 +155,7 @@ fields(bridges) -> } )} ] ++ kafka_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ - pgsql_structs() ++ clickhouse_structs(). + pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs(). mongodb_structs() -> [ @@ -249,3 +252,15 @@ clickhouse_structs() -> } )} ]. + +sqlserver_structs() -> + [ + {sqlserver, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_sqlserver, "config")), + #{ + desc => <<"Microsoft SQL Server Bridge Config">>, + required => false + } + )} + ]. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl new file mode 100644 index 000000000..49a5ed0ce --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl @@ -0,0 +1,128 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_sqlserver). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-define(DEFAULT_SQL, << + "insert into t_mqtt_msg(msgid, topic, qos, payload)" + "values (${id}, ${topic}, ${qos}, ${payload})" +>>). + +-define(DEFAULT_DRIVER, <<"ms-sqlserver-18">>). + +conn_bridge_examples(Method) -> + [ + #{ + <<"sqlserver">> => #{ + summary => <<"Microsoft SQL Server Bridge">>, + value => values(Method) + } + } + ]. + +values(get) -> + values(post); +values(post) -> + #{ + enable => true, + type => sqlserver, + name => <<"bar">>, + server => <<"127.0.0.1:1433">>, + database => <<"test">>, + pool_size => 8, + username => <<"sa">>, + password => <<"******">>, + sql => ?DEFAULT_SQL, + driver => ?DEFAULT_DRIVER, + local_topic => <<"local/topic/#">>, + resource_opts => #{ + worker_pool_size => 1, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => async, + max_queue_bytes => ?DEFAULT_QUEUE_SIZE + } + }; +values(put) -> + values(post). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_sqlserver". + +roots() -> []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {sql, + mk( + binary(), + #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + )}, + {driver, mk(binary(), #{desc => ?DESC("driver"), default => ?DEFAULT_DRIVER})}, + {local_topic, + mk( + binary(), + #{desc => ?DESC("local_topic"), default => undefined} + )}, + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ] ++ + (emqx_ee_connector_sqlserver:fields(config) -- + emqx_connector_schema_lib:prepare_statement_fields()); +fields("creation_opts") -> + emqx_resource_schema:fields("creation_opts"); +fields("post") -> + fields("post", sqlserver); +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +fields("post", Type) -> + [type_field(Type), name_field() | fields("config")]. + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Microsoft SQL Server using `", string:to_upper(Method), "` method."]; +desc("creation_opts" = Name) -> + emqx_resource_schema:desc(Name); +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- + +type_field(Type) -> + {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 2e6406d70..2e2ff2a62 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -5,13 +5,15 @@ {applications, [ kernel, stdlib, + ecpool, hstreamdb_erl, influxdb, tdengine, clickhouse, erlcloud, rocketmq, - ecql + ecql, + odbc ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl new file mode 100644 index 000000000..4cc92e80a --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl @@ -0,0 +1,531 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_connector_sqlserver). + +-behaviour(emqx_resource). + +-include_lib("kernel/include/file.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("emqx_ee_connector/include/emqx_ee_connector.hrl"). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%%==================================================================== +%% Exports +%%==================================================================== + +%% Hocon config schema exports +-export([ + roots/0, + fields/1 +]). + +%% callbacks for behaviour emqx_resource +-export([ + callback_mode/0, + is_buffer_supported/0, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_query_async/4, + on_batch_query_async/4, + on_get_status/2 +]). + +%% callbacks for ecpool +-export([connect/1]). + +%% Internal exports used to execute code with ecpool worker +-export([do_get_status/2, worker_do_insert/3, do_async_reply/2]). + +-import(emqx_plugin_libs_rule, [str/1]). +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-define(ACTION_SEND_MESSAGE, send_message). + +-define(SINGLE_INSERT, single_insert). + +-define(SYNC_QUERY_MODE, handover). +-define(ASYNC_QUERY_MODE(REPLY), {handover_async, {?MODULE, do_async_reply, [REPLY]}}). + +-define(SQLSERVER_HOST_OPTIONS, #{ + default_port => 1433 +}). + +-define(REQUEST_TIMEOUT(RESOURCE_OPTS), + maps:get(request_timeout, RESOURCE_OPTS, ?DEFAULT_REQUEST_TIMEOUT) +). + +-define(SINGLE_INSERT_TEMP, single_insert_temp). +-define(BATCH_INSERT_TEMP, batch_insert_temp). + +-define(BATCH_INSERT_PART, batch_insert_part). +-define(BATCH_PARAMS_TOKENS, batch_insert_tks). + +%% Copied from odbc reference page +%% https://www.erlang.org/doc/man/odbc.html + +-type bridge_id() :: binary(). +%% as returned by connect/2 +-type connection_reference() :: pid(). +-type time_out() :: milliseconds() | infinity. +-type sql() :: string() | binary(). +-type milliseconds() :: pos_integer(). +%% Tuple of column values e.g. one row of the result set. +%% it's a variable size tuple of column values. +-type row() :: tuple(). +%% Some kind of explanation of what went wrong +-type common_reason() :: connection_closed | extended_error() | term(). +%% extended error type with ODBC +%% and native database error codes, as well as the base reason that would have been +%% returned had extended_errors not been enabled. +-type extended_error() :: {string(), integer(), _Reason :: term()}. +%% Name of column in the result set +-type col_name() :: string(). +%% e.g. a list of the names of the selected columns in the result set. +-type col_names() :: [col_name()]. +%% A list of rows from the result set. +-type rows() :: list(row()). + +%% -type result_tuple() :: {updated, n_rows()} | {selected, col_names(), rows()}. +-type updated_tuple() :: {updated, n_rows()}. +-type selected_tuple() :: {selected, col_names(), rows()}. +%% The number of affected rows for UPDATE, +%% INSERT, or DELETE queries. For other query types the value +%% is driver defined, and hence should be ignored. +-type n_rows() :: integer(). + +%% These type was not used in this module, but we may use it later +%% -type odbc_data_type() :: +%% sql_integer +%% | sql_smallint +%% | sql_tinyint +%% | {sql_decimal, precision(), scale()} +%% | {sql_numeric, precision(), scale()} +%% | {sql_char, size()} +%% | {sql_wchar, size()} +%% | {sql_varchar, size()} +%% | {sql_wvarchar, size()} +%% | {sql_float, precision()} +%% | {sql_wlongvarchar, size()} +%% | {sql_float, precision()} +%% | sql_real +%% | sql_double +%% | sql_bit +%% | atom(). +%% -type precision() :: integer(). +%% -type scale() :: integer(). +%% -type size() :: integer(). + +-type state() :: #{ + poolname := binary(), + resource_opts := map(), + sql_templates := map() +}. + +%%==================================================================== +%% Configuration and default values +%%==================================================================== + +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields(config) -> + [ + {server, server()} + | add_default_username(emqx_connector_schema_lib:relational_db_fields()) + ]. + +add_default_username(Fields) -> + lists:map( + fun + ({username, OrigUsernameFn}) -> + {username, add_default_fn(OrigUsernameFn, <<"sa">>)}; + (Field) -> + Field + end, + Fields + ). + +add_default_fn(OrigFn, Default) -> + fun + (default) -> Default; + (Field) -> OrigFn(Field) + end. + +server() -> + Meta = #{desc => ?DESC("server")}, + emqx_schema:servers_sc(Meta, ?SQLSERVER_HOST_OPTIONS). + +%%==================================================================== +%% Callbacks defined in emqx_resource +%%==================================================================== + +callback_mode() -> async_if_possible. + +is_buffer_supported() -> false. + +on_start( + BridgeId = PoolName, + #{ + server := Server, + username := Username, + password := Password, + driver := Driver, + database := Database, + pool_size := PoolSize, + resource_opts := ResourceOpts + } = Config +) -> + ?SLOG(info, #{ + msg => "starting_sqlserver_connector", + connector => BridgeId, + config => emqx_misc:redact(Config) + }), + + ODBCDir = code:priv_dir(odbc), + OdbcserverDir = filename:join(ODBCDir, "bin/odbcserver"), + {ok, Info = #file_info{mode = Mode}} = file:read_file_info(OdbcserverDir), + case 33261 =:= Mode of + true -> + ok; + false -> + _ = file:write_file_info(OdbcserverDir, Info#file_info{mode = 33261}), + ok + end, + + Options = [ + {server, to_bin(Server)}, + {username, Username}, + {password, Password}, + {driver, Driver}, + {database, Database}, + {pool_size, PoolSize}, + {poolname, PoolName} + ], + + State = #{ + %% also BridgeId + poolname => PoolName, + sql_templates => parse_sql_template(Config), + resource_opts => ResourceOpts + }, + case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options) of + ok -> + {ok, State}; + {error, Reason} -> + ?tp( + sqlserver_connector_start_failed, + #{error => Reason} + ), + {error, Reason} + end. + +on_stop(InstanceId, #{poolname := PoolName} = _State) -> + ?SLOG(info, #{ + msg => "stopping_sqlserver_connector", + connector => InstanceId + }), + emqx_plugin_libs_pool:stop_pool(PoolName). + +-spec on_query( + bridge_id(), + {?ACTION_SEND_MESSAGE, map()}, + state() +) -> + ok + | {ok, list()} + | {error, {recoverable_error, term()}} + | {error, term()}. +on_query(BridgeId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> + ?TRACE( + "SINGLE_QUERY_SYNC", + "bridge_sqlserver_received", + #{requests => Query, connector => BridgeId, state => State} + ), + do_query(BridgeId, Query, ?SYNC_QUERY_MODE, State). + +-spec on_query_async( + bridge_id(), + {?ACTION_SEND_MESSAGE, map()}, + {ReplyFun :: function(), Args :: list()}, + state() +) -> + {ok, any()} + | {error, term()}. +on_query_async( + BridgeId, + {?ACTION_SEND_MESSAGE, _Msg} = Query, + ReplyFunAndArgs, + %% #{poolname := PoolName, sql_templates := Templates} = State + State +) -> + ?TRACE( + "SINGLE_QUERY_ASYNC", + "bridge_sqlserver_received", + #{requests => Query, connector => BridgeId, state => State} + ), + do_query(BridgeId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). + +-spec on_batch_query( + bridge_id(), + [{?ACTION_SEND_MESSAGE, map()}], + state() +) -> + ok + | {ok, list()} + | {error, {recoverable_error, term()}} + | {error, term()}. +on_batch_query(BridgeId, BatchRequests, State) -> + ?TRACE( + "BATCH_QUERY_SYNC", + "bridge_sqlserver_received", + #{requests => BatchRequests, connector => BridgeId, state => State} + ), + do_query(BridgeId, BatchRequests, ?SYNC_QUERY_MODE, State). + +-spec on_batch_query_async( + bridge_id(), + [{?ACTION_SEND_MESSAGE, map()}], + {ReplyFun :: function(), Args :: list()}, + state() +) -> {ok, any()}. +on_batch_query_async(BridgeId, Requests, ReplyFunAndArgs, State) -> + ?TRACE( + "BATCH_QUERY_ASYNC", + "bridge_sqlserver_received", + #{requests => Requests, connector => BridgeId, state => State} + ), + do_query(BridgeId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). + +on_get_status(_InstanceId, #{poolname := Pool, resource_opts := ResourceOpts} = _State) -> + RequestTimeout = ?REQUEST_TIMEOUT(ResourceOpts), + Health = emqx_plugin_libs_pool:health_check_ecpool_workers( + Pool, {?MODULE, do_get_status, [RequestTimeout]}, RequestTimeout + ), + status_result(Health). + +status_result(_Status = true) -> connected; +status_result(_Status = false) -> connecting. +%% TODO: +%% case for disconnected + +%%==================================================================== +%% ecpool callback fns +%%==================================================================== + +-spec connect(Options :: list()) -> {ok, connection_reference()} | {error, term()}. +connect(Options) -> + ConnectStr = lists:concat(conn_str(Options, [])), + Opts = proplists:get_value(options, Options, []), + odbc:connect(ConnectStr, Opts). + +-spec do_get_status(connection_reference(), time_out()) -> Result :: boolean(). +do_get_status(Conn, RequestTimeout) -> + case execute(Conn, <<"SELECT 1">>, RequestTimeout) of + {selected, [[]], [{1}]} -> true; + _ -> false + end. + +%%==================================================================== +%% Internal Helper fns +%%==================================================================== + +%% TODO && FIXME: +%% About the connection string attribute `Encrypt`: +%% The default value is `yes` in odbc version 18.0+ and `no` in previous versions. +%% And encrypted connections always verify the server's certificate. +%% So `Encrypt=YES;TrustServerCertificate=YES` must be set in the connection string when connecting to a server that has a self-signed certificate. +%% See also: +%% https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?source=recommendations&view=sql-server-ver16#encrypt +conn_str([], Acc) -> + %% we should use this for msodbcsql 18+ + %% lists:join(";", ["Encrypt=YES", "TrustServerCertificate=YES" | Acc]); + lists:join(";", Acc); +conn_str([{driver, Driver} | Opts], Acc) -> + conn_str(Opts, ["Driver=" ++ str(Driver) | Acc]); +conn_str([{server, Server} | Opts], Acc) -> + {Host, Port} = emqx_schema:parse_server(Server, ?SQLSERVER_HOST_OPTIONS), + conn_str(Opts, ["Server=" ++ str(Host) ++ "," ++ str(Port) | Acc]); +conn_str([{database, Database} | Opts], Acc) -> + conn_str(Opts, ["Database=" ++ str(Database) | Acc]); +conn_str([{username, Username} | Opts], Acc) -> + conn_str(Opts, ["UID=" ++ str(Username) | Acc]); +conn_str([{password, Password} | Opts], Acc) -> + conn_str(Opts, ["PWD=" ++ str(Password) | Acc]); +conn_str([{_, _} | Opts], Acc) -> + conn_str(Opts, Acc). + +%% Sync & Async query with singe & batch sql statement +-spec do_query( + bridge_id(), + Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}], + ApplyMode :: + handover + | {handover_async, {?MODULE, do_async_reply, [{ReplyFun :: function(), Args :: list()}]}}, + state() +) -> + {ok, list()} + | {error, {recoverable_error, term()}} + | {error, term()}. +do_query( + BridgeId, + Query, + ApplyMode, + #{poolname := PoolName, sql_templates := Templates} = State +) -> + ?TRACE( + "SINGLE_QUERY_SYNC", + "sqlserver_connector_received", + #{query => Query, connector => BridgeId, state => State} + ), + + %% only insert sql statement for single query and batch query + case apply_template(Query, Templates) of + {?ACTION_SEND_MESSAGE, SQL} -> + Result = ecpool:pick_and_do( + PoolName, + {?MODULE, worker_do_insert, [SQL, State]}, + ApplyMode + ); + Query -> + Result = {error, {unrecoverable_error, invalid_query}}; + _ -> + Result = {error, {unrecoverable_error, failed_to_apply_sql_template}} + end, + case Result of + {error, Reason} -> + ?tp( + sqlserver_connector_query_return, + #{error => Reason} + ), + ?SLOG(error, #{ + msg => "sqlserver_connector_do_query_failed", + connector => BridgeId, + query => Query, + reason => Reason + }), + Result; + _ -> + ?tp( + sqlserver_connector_query_return, + #{result => Result} + ), + Result + end. + +worker_do_insert( + Conn, SQL, #{resource_opts := ResourceOpts, poolname := BridgeId} = State +) -> + LogMeta = #{connector => BridgeId, state => State}, + try + case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of + {selected, Rows, _} -> + {ok, Rows}; + {updated, _} -> + ok; + {error, ErrStr} -> + ?SLOG(error, LogMeta#{msg => "invalid_request", reason => ErrStr}), + {error, {unrecoverable_error, {invalid_request, ErrStr}}} + end + catch + _Type:Reason -> + ?SLOG(error, LogMeta#{msg => "invalid_request", reason => Reason}), + {error, {unrecoverable_error, {invalid_request, Reason}}} + end. + +-spec execute(pid(), sql(), time_out()) -> + updated_tuple() + | selected_tuple() + | [updated_tuple()] + | [selected_tuple()] + | {error, common_reason()}. +execute(Conn, SQL, Timeout) -> + odbc:sql_query(Conn, str(SQL), Timeout). + +to_bin(List) when is_list(List) -> + unicode:characters_to_binary(List, utf8). + +%% for bridge data to sql server +parse_sql_template(Config) -> + RawSQLTemplates = + case maps:get(sql, Config, undefined) of + undefined -> #{}; + <<>> -> #{}; + SQLTemplate -> #{?ACTION_SEND_MESSAGE => SQLTemplate} + end, + + SingleInsertTks = #{}, + BatchInsertTks = #{}, + parse_sql_template(maps:to_list(RawSQLTemplates), SingleInsertTks, BatchInsertTks). + +parse_sql_template([{Key, H} | T], SingleInsertTks, BatchInsertTks) -> + case emqx_plugin_libs_rule:detect_sql_type(H) of + {ok, select} -> + parse_sql_template(T, SingleInsertTks, BatchInsertTks); + {ok, insert} -> + case emqx_plugin_libs_rule:split_insert_sql(H) of + {ok, {InsertSQL, Params}} -> + parse_sql_template( + T, + SingleInsertTks#{Key => emqx_plugin_libs_rule:preproc_tmpl(H)}, + BatchInsertTks#{ + Key => + #{ + ?BATCH_INSERT_PART => InsertSQL, + ?BATCH_PARAMS_TOKENS => emqx_plugin_libs_rule:preproc_tmpl( + Params + ) + } + } + ); + {error, Reason} -> + ?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}), + parse_sql_template(T, SingleInsertTks, BatchInsertTks) + end; + {error, Reason} -> + ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}), + parse_sql_template(T, SingleInsertTks, BatchInsertTks) + end; +parse_sql_template([], SingleInsertTks, BatchInsertTks) -> + #{ + ?SINGLE_INSERT_TEMP => SingleInsertTks, + ?BATCH_INSERT_TEMP => BatchInsertTks + }. + +%% single insert +apply_template( + {?ACTION_SEND_MESSAGE = Key, Msg} = Query, + #{?SINGLE_INSERT_TEMP := SingleInsertTks} = _Templates +) -> + case maps:get(Key, SingleInsertTks, undefined) of + undefined -> + Query; + Template -> + {Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)} + end; +%% batch inserts +apply_template( + [{?ACTION_SEND_MESSAGE = Key, _Msg} | _T] = BatchReqs, + #{?BATCH_INSERT_TEMP := BatchInsertsTks} = _Templates +) -> + case maps:get(Key, BatchInsertsTks, undefined) of + undefined -> + BatchReqs; + #{?BATCH_INSERT_PART := BatchInserts, ?BATCH_PARAMS_TOKENS := BatchParamsTks} -> + SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, BatchInserts, BatchParamsTks), + {Key, SQL} + end; +apply_template(Query, Templates) -> + %% TODO: more detail infomatoin + ?SLOG(error, #{msg => "apply sql template failed", query => Query, templates => Templates}). + +do_async_reply(Result, {ReplyFun, Args}) -> + erlang:apply(ReplyFun, Args ++ Result). diff --git a/rel/i18n/emqx_ee_bridge_sqlserver.hocon b/rel/i18n/emqx_ee_bridge_sqlserver.hocon new file mode 100644 index 000000000..bb82c82f8 --- /dev/null +++ b/rel/i18n/emqx_ee_bridge_sqlserver.hocon @@ -0,0 +1,87 @@ +emqx_ee_bridge_sqlserver { + + local_topic { + desc { + en: """The MQTT topic filter to be forwarded to MicroSoft SQL Server. All MQTT 'PUBLISH' messages with the topic +matching the local_topic will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is +configured, then both the data got from the rule and the MQTT messages that match local_topic +will be forwarded. +""" + zh: """发送到 'local_topic' 的消息都会转发到 MicroSoft SQL Server。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。 +""" + } + label { + en: """Local Topic""" + zh: """本地 Topic""" + } + } + + sql_template { + desc { + en: """SQL Template""" + zh: """SQL 模板""" + } + label { + en: """SQL Template""" + zh: """SQL 模板""" + } + } + + driver { + desc { + en: """SQL Server Driver Name""" + zh: """SQL Server Driver 名称""" + } + desc { + en: """SQL Server Driver Name""" + zh: """SQL Server Driver 名称""" + } + } + + config_enable { + desc { + en: """Enable or disable this bridge""" + zh: """启用/禁用桥接""" + } + label { + en: """Enable Or Disable Bridge""" + zh: """启用/禁用桥接""" + } + } + + desc_config { + desc { + en: """Configuration for an MicroSoft SQL Server bridge.""" + zh: """MicroSoft SQL Server 桥接配置""" + } + label: { + en: """MicroSoft SQL Server Bridge Configuration""" + zh: """MicroSoft SQL Server 桥接配置""" + } + } + + desc_type { + desc { + en: """The Bridge Type""" + zh: """Bridge 类型""" + } + label { + en: """Bridge Type""" + zh: """桥接类型""" + } + } + + desc_name { + desc { + en: """Bridge name.""" + zh: """桥接名字""" + } + label { + en: """Bridge Name""" + zh: """桥接名字""" + } + } + +} diff --git a/rel/i18n/emqx_ee_connector_sqlserver.hocon b/rel/i18n/emqx_ee_connector_sqlserver.hocon new file mode 100644 index 000000000..9ff24c1c1 --- /dev/null +++ b/rel/i18n/emqx_ee_connector_sqlserver.hocon @@ -0,0 +1,22 @@ +emqx_ee_connector_sqlserver { + + server { + desc { + en: """ +The IPv4 or IPv6 address or the hostname to connect to.
+A host entry has the following form: `Host[:Port]`.
+The SQL Server default port 1433 is used if `[:Port]` is not specified. +""" + zh: """ +将要连接的 IPv4 或 IPv6 地址,或者主机名。
+主机名具有以下形式:`Host[:Port]`。
+如果未指定 `[:Port]`,则使用 SQL Server 默认端口 1433。 +""" + } + label: { + en: "Server Host" + zh: "服务器地址" + } + } + +}