diff --git a/.ci/docker-compose-file/docker-compose-oracle.yaml b/.ci/docker-compose-file/docker-compose-oracle.yaml new file mode 100644 index 000000000..ea8965846 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-oracle.yaml @@ -0,0 +1,11 @@ +version: '3.9' + +services: + oracle_server: + container_name: oracle + image: oracleinanutshell/oracle-xe-11g:1.0.0 + restart: always + environment: + ORACLE_DISABLE_ASYNCH_IO: true + networks: + - emqx_bridge diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index 9cefcb808..e4fbfa62a 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -119,5 +119,11 @@ "listen": "0.0.0.0:6653", "upstream": "pulsar:6653", "enabled": true + }, + { + "name": "oracle", + "listen": "0.0.0.0:1521", + "upstream": "oracle:1521", + "enabled": true } ] diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index d6c140fef..e408250be 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.17"}, + {vsn, "0.1.18"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index fd4e16263..a37b6db3c 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -71,7 +71,8 @@ T == rocketmq; T == cassandra; T == sqlserver; - T == pulsar_producer + T == pulsar_producer; + T == oracle ). load() -> diff --git a/apps/emqx_bridge_oracle/BSL.txt b/apps/emqx_bridge_oracle/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_bridge_oracle/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_bridge_oracle/README.md b/apps/emqx_bridge_oracle/README.md new file mode 100644 index 000000000..d2974b722 --- /dev/null +++ b/apps/emqx_bridge_oracle/README.md @@ -0,0 +1,28 @@ +# EMQX Oracle Database Bridge + +This application houses the Oracle Database bridge for EMQX Enterprise Edition. +It implements the data bridge APIs for interacting with an Oracle Database Bridge. + + +# Documentation + +- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html) + for the EMQX rules engine introduction. + + +# HTTP APIs + +- Several APIs are provided for bridge management, which includes create bridge, + update bridge, get bridge, stop or restart bridge and list bridges etc. + + Refer to [API Docs - Bridges](https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges) for more detailed information. + + +## Contributing + +Please see our [contributing.md](../../CONTRIBUTING.md). + + +## License + +See [BSL](./BSL.txt). diff --git a/apps/emqx_bridge_oracle/docker-ct b/apps/emqx_bridge_oracle/docker-ct new file mode 100644 index 000000000..c24dc4bc9 --- /dev/null +++ b/apps/emqx_bridge_oracle/docker-ct @@ -0,0 +1,2 @@ +toxiproxy +oracle diff --git a/apps/emqx_bridge_oracle/etc/emqx_bridge_oracle.conf b/apps/emqx_bridge_oracle/etc/emqx_bridge_oracle.conf new file mode 100644 index 000000000..e69de29bb diff --git a/apps/emqx_bridge_oracle/rebar.config b/apps/emqx_bridge_oracle/rebar.config new file mode 100644 index 000000000..c238546c4 --- /dev/null +++ b/apps/emqx_bridge_oracle/rebar.config @@ -0,0 +1,13 @@ +%% -*- mode: erlang; -*- + +{erl_opts, [debug_info]}. +{deps, [ {emqx_oracle, {path, "../../apps/emqx_oracle"}} + , {emqx_connector, {path, "../../apps/emqx_connector"}} + , {emqx_resource, {path, "../../apps/emqx_resource"}} + , {emqx_bridge, {path, "../../apps/emqx_bridge"}} + ]}. + +{shell, [ + % {config, "config/sys.config"}, + {apps, [emqx_bridge_oracle]} +]}. diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src new file mode 100644 index 000000000..4f81c2110 --- /dev/null +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src @@ -0,0 +1,14 @@ +{application, emqx_bridge_oracle, [ + {description, "EMQX Enterprise Oracle Database Bridge"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + emqx_oracle + ]}, + {env, []}, + {modules, []}, + + {links, []} +]}. diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl new file mode 100644 index 000000000..8a87f02ba --- /dev/null +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl @@ -0,0 +1,109 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_oracle). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-export([ + conn_bridge_examples/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-define(DEFAULT_SQL, << + "insert into t_mqtt_msg(msgid, topic, qos, payload)" + "values (${id}, ${topic}, ${qos}, ${payload})" +>>). + +conn_bridge_examples(Method) -> + [ + #{ + <<"oracle">> => #{ + summary => <<"Oracle Database Bridge">>, + value => values(Method) + } + } + ]. + +values(_Method) -> + #{ + enable => true, + type => oracle, + name => <<"foo">>, + server => <<"127.0.0.1:1521">>, + pool_size => 8, + database => <<"ORCL">>, + sid => <<"ORCL">>, + username => <<"root">>, + password => <<"******">>, + sql => ?DEFAULT_SQL, + local_topic => <<"local/topic/#">>, + resource_opts => #{ + worker_pool_size => 8, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => async, + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES + } + }. + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions + +namespace() -> "bridge_oracle". + +roots() -> []. + +fields("config") -> + [ + {enable, + hoconsc:mk( + boolean(), + #{desc => ?DESC("config_enable"), default => true} + )}, + {sql, + hoconsc:mk( + binary(), + #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + )}, + {local_topic, + hoconsc:mk( + binary(), + #{desc => ?DESC("local_topic"), default => undefined} + )} + ] ++ emqx_resource_schema:fields("resource_opts") ++ + (emqx_oracle_schema:fields(config) -- + emqx_connector_schema_lib:prepare_statement_fields()); +fields("post") -> + fields("post", oracle); +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +fields("post", Type) -> + [type_field(Type), name_field() | fields("config")]. + +desc("config") -> + ?DESC("desc_config"); +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- + +type_field(Type) -> + {type, hoconsc:mk(hoconsc:enum([Type]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl new file mode 100644 index 000000000..de77b26de --- /dev/null +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -0,0 +1,594 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_oracle_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-import(emqx_common_test_helpers, [on_exit/1]). + +-define(BRIDGE_TYPE_BIN, <<"oracle">>). +-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_oracle, emqx_bridge_oracle]). +-define(DATABASE, "XE"). +-define(RULE_TOPIC, "mqtt/rule"). +% -define(RULE_TOPIC_BIN, <>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, plain} + ]. + +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + [ + {plain, AllTCs} + ]. + +only_once_tests() -> + [t_create_via_http]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)), + _ = application:stop(emqx_connector), + ok. + +init_per_group(plain = Type, Config) -> + OracleHost = os:getenv("ORACLE_PLAIN_HOST", "toxiproxy.emqx.net"), + OraclePort = list_to_integer(os:getenv("ORACLE_PLAIN_PORT", "1521")), + ProxyName = "oracle", + case emqx_common_test_helpers:is_tcp_server_available(OracleHost, OraclePort) of + true -> + Config1 = common_init_per_group(), + [ + {proxy_name, ProxyName}, + {oracle_host, OracleHost}, + {oracle_port, OraclePort}, + {connection_type, Type} + | Config1 ++ Config + ]; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_oracle); + _ -> + {skip, no_oracle} + end + end; +init_per_group(_Group, Config) -> + Config. + +end_per_group(Group, Config) when + Group =:= plain +-> + common_end_per_group(Config), + ok; +end_per_group(_Group, _Config) -> + ok. + +common_init_per_group() -> + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + application:load(emqx_bridge), + ok = emqx_common_test_helpers:start_apps([emqx_conf]), + ok = emqx_connector_test_helpers:start_apps(?APPS), + {ok, _} = application:ensure_all_started(emqx_connector), + emqx_mgmt_api_test_util:init_suite(), + UniqueNum = integer_to_binary(erlang:unique_integer()), + MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>, + [ + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort}, + {mqtt_topic, MQTTTopic} + ]. + +common_end_per_group(Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + delete_all_bridges(), + ok. + +init_per_testcase(TestCase, Config) -> + common_init_per_testcase(TestCase, Config). + +end_per_testcase(_Testcase, Config) -> + common_end_per_testcase(_Testcase, Config). + +common_init_per_testcase(TestCase, Config0) -> + ct:timetrap(timer:seconds(60)), + delete_all_bridges(), + UniqueNum = integer_to_binary(erlang:unique_integer()), + OracleTopic = + << + (atom_to_binary(TestCase))/binary, + UniqueNum/binary + >>, + ConnectionType = ?config(connection_type, Config0), + Config = [{oracle_topic, OracleTopic} | Config0], + {Name, ConfigString, OracleConfig} = oracle_config( + TestCase, ConnectionType, Config + ), + ok = snabbkaffe:start_trace(), + [ + {oracle_name, Name}, + {oracle_config_string, ConfigString}, + {oracle_config, OracleConfig} + | Config + ]. + +common_end_per_testcase(_Testcase, Config) -> + case proplists:get_bool(skip_does_not_apply, Config) of + true -> + ok; + false -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + delete_all_bridges(), + %% in CI, apparently this needs more time since the + %% machines struggle with all the containers running... + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(), + ok + end. + +delete_all_bridges() -> + lists:foreach( + fun(#{name := Name, type := Type}) -> + emqx_bridge:remove(Type, Name) + end, + emqx_bridge:list() + ). + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ +sql_insert_template_for_bridge() -> + "INSERT INTO mqtt_test(topic, msgid, payload, retain) VALUES (${topic}, ${id}, ${payload}, ${retain})". + +sql_create_table() -> + "CREATE TABLE mqtt_test (topic VARCHAR2(255), msgid VARCHAR2(64), payload NCLOB, retain NUMBER(1))". + +sql_drop_table() -> + "DROP TABLE mqtt_test". + +reset_table(Config) -> + ResourceId = resource_id(Config), + _ = emqx_resource:simple_sync_query(ResourceId, {sql, sql_drop_table()}), + {ok, [{proc_result, 0, _}]} = emqx_resource:simple_sync_query( + ResourceId, {sql, sql_create_table()} + ), + ok. + +drop_table(Config) -> + ResourceId = resource_id(Config), + emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}), + ok. + +oracle_config(TestCase, _ConnectionType, Config) -> + UniqueNum = integer_to_binary(erlang:unique_integer()), + OracleHost = ?config(oracle_host, Config), + OraclePort = ?config(oracle_port, Config), + Name = << + (atom_to_binary(TestCase))/binary, UniqueNum/binary + >>, + ServerURL = iolist_to_binary([ + OracleHost, + ":", + integer_to_binary(OraclePort) + ]), + ConfigString = + io_lib:format( + "bridges.oracle.~s {\n" + " enable = true\n" + " database = \"~s\"\n" + " sid = \"~s\"\n" + " server = \"~s\"\n" + " username = \"system\"\n" + " password = \"oracle\"\n" + " pool_size = 1\n" + " sql = \"~s\"\n" + " resource_opts = {\n" + " auto_restart_interval = 5000\n" + " request_timeout = 30000\n" + " query_mode = \"async\"\n" + " enable_batch = true\n" + " batch_size = 3\n" + " batch_time = \"3s\"\n" + " worker_pool_size = 1\n" + " }\n" + "}\n", + [ + Name, + ?DATABASE, + ?DATABASE, + ServerURL, + sql_insert_template_for_bridge() + ] + ), + {Name, ConfigString, parse_and_check(ConfigString, Name)}. + +parse_and_check(ConfigString, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + TypeBin = ?BRIDGE_TYPE_BIN, + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf, + Config. + +resource_id(Config) -> + Type = ?BRIDGE_TYPE_BIN, + Name = ?config(oracle_name, Config), + emqx_bridge_resource:resource_id(Type, Name). + +create_bridge(Config) -> + create_bridge(Config, _Overrides = #{}). + +create_bridge(Config, Overrides) -> + Type = ?BRIDGE_TYPE_BIN, + Name = ?config(oracle_name, Config), + OracleConfig0 = ?config(oracle_config, Config), + OracleConfig = emqx_utils_maps:deep_merge(OracleConfig0, Overrides), + emqx_bridge:create(Type, Name, OracleConfig). + +create_bridge_api(Config) -> + create_bridge_api(Config, _Overrides = #{}). + +create_bridge_api(Config, Overrides) -> + TypeBin = ?BRIDGE_TYPE_BIN, + Name = ?config(oracle_name, Config), + OracleConfig0 = ?config(oracle_config, Config), + OracleConfig = emqx_utils_maps:deep_merge(OracleConfig0, Overrides), + Params = OracleConfig#{<<"type">> => TypeBin, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("creating bridge (via http): ~p", [Params]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}}; + Error -> + Error + end, + ct:pal("bridge create result: ~p", [Res]), + Res. + +update_bridge_api(Config) -> + update_bridge_api(Config, _Overrides = #{}). + +update_bridge_api(Config, Overrides) -> + TypeBin = ?BRIDGE_TYPE_BIN, + Name = ?config(oracle_name, Config), + OracleConfig0 = ?config(oracle_config, Config), + OracleConfig = emqx_utils_maps:deep_merge(OracleConfig0, Overrides), + BridgeId = emqx_bridge_resource:bridge_id(TypeBin, Name), + Params = OracleConfig#{<<"type">> => TypeBin, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("updating bridge (via http): ~p", [Params]), + Res = + case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Params, Opts) of + {ok, {_Status, _Headers, Body0}} -> {ok, emqx_utils_json:decode(Body0, [return_maps])}; + Error -> Error + end, + ct:pal("bridge update result: ~p", [Res]), + Res. + +probe_bridge_api(Config) -> + probe_bridge_api(Config, _Overrides = #{}). + +probe_bridge_api(Config, _Overrides) -> + TypeBin = ?BRIDGE_TYPE_BIN, + Name = ?config(oracle_name, Config), + OracleConfig = ?config(oracle_config, Config), + Params = OracleConfig#{<<"type">> => TypeBin, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("probing bridge (via http): ~p", [Params]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of + {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0}; + Error -> Error + end, + ct:pal("bridge probe result: ~p", [Res]), + Res. + +create_rule_and_action_http(Config) -> + OracleName = ?config(oracle_name, Config), + BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, OracleName), + Params = #{ + enable => true, + sql => <<"SELECT * FROM \"", ?RULE_TOPIC, "\"">>, + actions => [BridgeId] + }, + Path = emqx_mgmt_api_test_util:api_path(["rules"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + ct:pal("rule action params: ~p", [Params]), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; + Error -> Error + end. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +% Under normal operations, the bridge will be called async via +% `simple_async_query'. +t_sync_query(Config) -> + ResourceId = resource_id(Config), + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge_api(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + reset_table(Config), + MsgId = erlang:unique_integer(), + Params = #{ + topic => ?config(mqtt_topic, Config), + id => MsgId, + payload => ?config(oracle_name, Config), + retain => true + }, + Message = {send_message, Params}, + ?assertEqual( + {ok, [{affected_rows, 1}]}, emqx_resource:simple_sync_query(ResourceId, Message) + ), + ok + end, + [] + ), + ok. + +t_async_query(Config) -> + Overrides = #{ + <<"resource_opts">> => #{ + <<"enable_batch">> => <<"false">>, + <<"batch_size">> => 1 + } + }, + ResourceId = resource_id(Config), + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge_api(Config, Overrides)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + reset_table(Config), + MsgId = erlang:unique_integer(), + Params = #{ + topic => ?config(mqtt_topic, Config), + id => MsgId, + payload => ?config(oracle_name, Config), + retain => false + }, + Message = {send_message, Params}, + ?assertMatch( + { + ok, + {ok, #{result := {ok, [{affected_rows, 1}]}}} + }, + ?wait_async_action( + emqx_resource:query(ResourceId, Message), + #{?snk_kind := oracle_query}, + 5_000 + ) + ), + ok + end, + [] + ), + ok. + +t_batch_sync_query(Config) -> + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + ResourceId = resource_id(Config), + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge_api(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 30, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + reset_table(Config), + MsgId = erlang:unique_integer(), + Params = #{ + topic => ?config(mqtt_topic, Config), + id => MsgId, + payload => ?config(oracle_name, Config), + retain => false + }, + % Send 3 async messages while resource is down. When it comes back, these messages + % will be delivered in sync way. If we try to send sync messages directly, it will + % be sent async as callback_mode is set to async_if_possible. + Message = {send_message, Params}, + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ct:sleep(1000), + emqx_resource:query(ResourceId, Message), + emqx_resource:query(ResourceId, Message), + emqx_resource:query(ResourceId, Message) + end), + ?retry( + _Sleep = 1_000, + _Attempts = 30, + ?assertMatch( + {ok, [{result_set, _, _, [[{3}]]}]}, + emqx_resource:simple_sync_query( + ResourceId, {query, "SELECT COUNT(*) FROM mqtt_test"} + ) + ) + ), + ok + end, + [] + ), + ok. + +t_batch_async_query(Config) -> + ResourceId = resource_id(Config), + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge_api(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + reset_table(Config), + MsgId = erlang:unique_integer(), + Params = #{ + topic => ?config(mqtt_topic, Config), + id => MsgId, + payload => ?config(oracle_name, Config), + retain => false + }, + Message = {send_message, Params}, + ?assertMatch( + { + ok, + {ok, #{result := {ok, [{affected_rows, 1}]}}} + }, + ?wait_async_action( + emqx_resource:query(ResourceId, Message), + #{?snk_kind := oracle_batch_query}, + 5_000 + ) + ), + ok + end, + [] + ), + ok. + +t_create_via_http(Config) -> + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge_api(Config)), + + %% lightweight matrix testing some configs + ?assertMatch( + {ok, _}, + update_bridge_api( + Config, + #{ + <<"resource_opts">> => + #{<<"batch_size">> => 4} + } + ) + ), + ?assertMatch( + {ok, _}, + update_bridge_api( + Config, + #{ + <<"resource_opts">> => + #{<<"batch_time">> => <<"4s">>} + } + ) + ), + ok + end, + [] + ), + ok. + +t_start_stop(Config) -> + OracleName = ?config(oracle_name, Config), + ResourceId = resource_id(Config), + ?check_trace( + begin + ?assertMatch({ok, _}, create_bridge(Config)), + %% Since the connection process is async, we give it some time to + %% stabilize and avoid flakiness. + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + + %% Check that the bridge probe API doesn't leak atoms. + ProbeRes0 = probe_bridge_api( + Config, + #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + ), + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), + AtomsBefore = erlang:system_info(atom_count), + %% Probe again; shouldn't have created more atoms. + ProbeRes1 = probe_bridge_api( + Config, + #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + ), + + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1), + AtomsAfter = erlang:system_info(atom_count), + ?assertEqual(AtomsBefore, AtomsAfter), + + %% Now stop the bridge. + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + emqx_bridge:disable_enable(disable, ?BRIDGE_TYPE_BIN, OracleName), + #{?snk_kind := oracle_bridge_stopped}, + 5_000 + ) + ), + + ok + end, + fun(Trace) -> + %% one for each probe, one for real + ?assertMatch([_, _, _], ?of_kind(oracle_bridge_stopped, Trace)), + ok + end + ), + ok. + +t_on_get_status(Config) -> + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + ResourceId = resource_id(Config), + ?assertMatch({ok, _}, create_bridge(Config)), + %% Since the connection process is async, we give it some time to + %% stabilize and avoid flakiness. + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ct:sleep(500), + ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)) + end), + %% Check that it recovers itself. + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + ok. diff --git a/apps/emqx_oracle/BSL.txt b/apps/emqx_oracle/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_oracle/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_oracle/README.md b/apps/emqx_oracle/README.md new file mode 100644 index 000000000..873d52259 --- /dev/null +++ b/apps/emqx_oracle/README.md @@ -0,0 +1,14 @@ +# Oracle Database Connector + +This application houses the Oracle Database connector for EMQX Enterprise Edition. +It provides the APIs to connect to Oracle Database. + +So far it is only used to insert messages as data bridge. + +## Contributing + +Please see our [contributing.md](../../CONTRIBUTING.md). + +## License + +See [BSL](./BSL.txt). diff --git a/apps/emqx_oracle/rebar.config b/apps/emqx_oracle/rebar.config new file mode 100644 index 000000000..14461ba34 --- /dev/null +++ b/apps/emqx_oracle/rebar.config @@ -0,0 +1,7 @@ +%% -*- mode: erlang; -*- + +{erl_opts, [debug_info]}. +{deps, [ {jamdb_oracle, {git, "https://github.com/emqx/jamdb_oracle", {tag, "0.4.9.4"}}} + , {emqx_connector, {path, "../../apps/emqx_connector"}} + , {emqx_resource, {path, "../../apps/emqx_resource"}} + ]}. diff --git a/apps/emqx_oracle/src/emqx_oracle.app.src b/apps/emqx_oracle/src/emqx_oracle.app.src new file mode 100644 index 000000000..fa48e8479 --- /dev/null +++ b/apps/emqx_oracle/src/emqx_oracle.app.src @@ -0,0 +1,14 @@ +{application, emqx_oracle, [ + {description, "EMQX Enterprise Oracle Database Connector"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + jamdb_oracle + ]}, + {env, []}, + {modules, []}, + + {links, []} +]}. diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl new file mode 100644 index 000000000..c39a6a6d7 --- /dev/null +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -0,0 +1,434 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_oracle). + +-behaviour(emqx_resource). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(ORACLE_DEFAULT_PORT, 1521). + +%%==================================================================== +%% Exports +%%==================================================================== + +%% callbacks for behaviour emqx_resource +-export([ + callback_mode/0, + is_buffer_supported/0, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_query_async/4, + on_batch_query_async/4, + on_get_status/2 +]). + +%% callbacks for ecpool +-export([connect/1, prepare_sql_to_conn/2]). + +%% Internal exports used to execute code with ecpool worker +-export([ + query/3, + execute_batch/3, + do_async_reply/2, + do_get_status/1 +]). + +-export([ + oracle_host_options/0 +]). + +-define(ACTION_SEND_MESSAGE, send_message). + +-define(SYNC_QUERY_MODE, no_handover). +-define(ASYNC_QUERY_MODE(REPLY), {handover_async, {?MODULE, do_async_reply, [REPLY]}}). + +-define(ORACLE_HOST_OPTIONS, #{ + default_port => ?ORACLE_DEFAULT_PORT +}). + +-define(MAX_CURSORS, 10). +-define(DEFAULT_POOL_SIZE, 8). +-define(OPT_TIMEOUT, 30000). + +-type prepares() :: #{atom() => binary()}. +-type params_tokens() :: #{atom() => list()}. + +-type state() :: + #{ + pool_name := binary(), + prepare_sql := prepares(), + params_tokens := params_tokens(), + batch_params_tokens := params_tokens() + }. + +callback_mode() -> async_if_possible. + +is_buffer_supported() -> false. + +-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}. +on_start( + InstId, + #{ + server := Server, + database := DB, + sid := Sid, + username := User + } = Config +) -> + ?SLOG(info, #{ + msg => "starting_oracle_connector", + connector => InstId, + config => emqx_utils:redact(Config) + }), + ?tp(oracle_bridge_started, #{instance_id => InstId, config => Config}), + {ok, _} = application:ensure_all_started(ecpool), + {ok, _} = application:ensure_all_started(jamdb_oracle), + jamdb_oracle_conn:set_max_cursors_number(?MAX_CURSORS), + + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, oracle_host_options()), + ServiceName = maps:get(<<"service_name">>, Config, Sid), + Options = [ + {host, Host}, + {port, Port}, + {user, emqx_plugin_libs_rule:str(User)}, + {password, emqx_secret:wrap(maps:get(password, Config, ""))}, + {sid, emqx_plugin_libs_rule:str(Sid)}, + {service_name, emqx_plugin_libs_rule:str(ServiceName)}, + {database, DB}, + {pool_size, maps:get(<<"pool_size">>, Config, ?DEFAULT_POOL_SIZE)}, + {timeout, ?OPT_TIMEOUT}, + {app_name, "EMQX Data To Oracle Database Action"} + ], + PoolName = InstId, + Prepares = parse_prepare_sql(Config), + InitState = #{pool_name => PoolName, prepare_statement => #{}}, + State = maps:merge(InitState, Prepares), + case emqx_resource_pool:start(InstId, ?MODULE, Options) of + ok -> + {ok, init_prepare(State)}; + {error, Reason} -> + ?tp( + oracle_connector_start_failed, + #{error => Reason} + ), + {error, Reason} + end. + +on_stop(InstId, #{pool_name := PoolName}) -> + ?SLOG(info, #{ + msg => "stopping_oracle_connector", + connector => InstId + }), + ?tp(oracle_bridge_stopped, #{instance_id => InstId}), + emqx_resource_pool:stop(PoolName). + +on_query(InstId, {TypeOrKey, NameOrSQL}, #{pool_name := _PoolName} = State) -> + on_query(InstId, {TypeOrKey, NameOrSQL, []}, State); +on_query( + InstId, + {TypeOrKey, NameOrSQL, Params}, + #{pool_name := PoolName} = State +) -> + ?SLOG(debug, #{ + msg => "oracle database connector received sql query", + connector => InstId, + type => TypeOrKey, + sql => NameOrSQL, + state => State + }), + Type = query, + {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), + Res = on_sql_query(InstId, PoolName, Type, ?SYNC_QUERY_MODE, NameOrSQL2, Data), + handle_result(Res). + +on_query_async(InstId, {TypeOrKey, NameOrSQL}, Reply, State) -> + on_query_async(InstId, {TypeOrKey, NameOrSQL, []}, Reply, State); +on_query_async( + InstId, {TypeOrKey, NameOrSQL, Params} = Query, Reply, #{pool_name := PoolName} = State +) -> + ?SLOG(debug, #{ + msg => "oracle database connector received async sql query", + connector => InstId, + query => Query, + reply => Reply, + state => State + }), + ApplyMode = ?ASYNC_QUERY_MODE(Reply), + Type = query, + {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), + Res = on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL2, Data), + handle_result(Res). + +on_batch_query( + InstId, + BatchReq, + #{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State +) -> + case BatchReq of + [{Key, _} = Request | _] -> + BinKey = to_bin(Key), + case maps:get(BinKey, Tokens, undefined) of + undefined -> + Log = #{ + connector => InstId, + first_request => Request, + state => State, + msg => "batch prepare not implemented" + }, + ?SLOG(error, Log), + {error, {unrecoverable_error, batch_prepare_not_implemented}}; + TokenList -> + {_, Datas} = lists:unzip(BatchReq), + Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], + St = maps:get(BinKey, Sts), + case + on_sql_query(InstId, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2) + of + {ok, Results} -> + handle_batch_result(Results, 0); + Result -> + Result + end + end; + _ -> + Log = #{ + connector => InstId, + request => BatchReq, + state => State, + msg => "invalid request" + }, + ?SLOG(error, Log), + {error, {unrecoverable_error, invalid_request}} + end. + +on_batch_query_async( + InstId, + BatchReq, + Reply, + #{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State +) -> + case BatchReq of + [{Key, _} = Request | _] -> + BinKey = to_bin(Key), + case maps:get(BinKey, Tokens, undefined) of + undefined -> + Log = #{ + connector => InstId, + first_request => Request, + state => State, + msg => "batch prepare not implemented" + }, + ?SLOG(error, Log), + {error, {unrecoverable_error, batch_prepare_not_implemented}}; + TokenList -> + {_, Datas} = lists:unzip(BatchReq), + Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], + St = maps:get(BinKey, Sts), + case + on_sql_query( + InstId, PoolName, execute_batch, ?ASYNC_QUERY_MODE(Reply), St, Datas2 + ) + of + {ok, Results} -> + handle_batch_result(Results, 0); + Result -> + Result + end + end; + _ -> + Log = #{ + connector => InstId, + request => BatchReq, + state => State, + msg => "invalid request" + }, + ?SLOG(error, Log), + {error, {unrecoverable_error, invalid_request}} + end. + +proc_sql_params(query, SQLOrKey, Params, _State) -> + {SQLOrKey, Params}; +proc_sql_params(TypeOrKey, SQLOrData, Params, #{ + params_tokens := ParamsTokens, prepare_sql := PrepareSql +}) -> + Key = to_bin(TypeOrKey), + case maps:get(Key, ParamsTokens, undefined) of + undefined -> + {SQLOrData, Params}; + Tokens -> + case maps:get(Key, PrepareSql, undefined) of + undefined -> + {SQLOrData, Params}; + Sql -> + {Sql, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} + end + end. + +on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL, Data) -> + case ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, ApplyMode) of + {error, Reason} = Result -> + ?tp( + oracle_connector_query_return, + #{error => Reason} + ), + ?SLOG(error, #{ + msg => "oracle database connector do sql query failed", + connector => InstId, + type => Type, + sql => NameOrSQL, + reason => Reason + }), + Result; + Result -> + ?tp( + oracle_connector_query_return, + #{result => Result} + ), + Result + end. + +on_get_status(_InstId, #{pool_name := Pool} = State) -> + case emqx_resource_pool:health_check_workers(Pool, fun ?MODULE:do_get_status/1) of + true -> + case do_check_prepares(State) of + ok -> + connected; + {ok, NState} -> + %% return new state with prepared statements + {connected, NState} + end; + false -> + disconnected + end. + +do_get_status(Conn) -> + ok == element(1, jamdb_oracle:sql_query(Conn, "select 1 from dual")). + +do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) -> + ok; +do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) -> + {ok, Sts} = prepare_sql(Prepares, PoolName), + {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}. + +%% =================================================================== + +oracle_host_options() -> + ?ORACLE_HOST_OPTIONS. + +connect(Opts) -> + Password = emqx_secret:unwrap(proplists:get_value(password, Opts)), + NewOpts = lists:keyreplace(password, 1, Opts, {password, Password}), + jamdb_oracle:start_link(NewOpts). + +sql_query_to_str(SqlQuery) -> + emqx_plugin_libs_rule:str(SqlQuery). + +sql_params_to_str(Params) when is_list(Params) -> + lists:map( + fun + (false) -> "0"; + (true) -> "1"; + (Value) -> emqx_plugin_libs_rule:str(Value) + end, + Params + ). + +query(Conn, SQL, Params) -> + Ret = jamdb_oracle:sql_query(Conn, {sql_query_to_str(SQL), sql_params_to_str(Params)}), + ?tp(oracle_query, #{conn => Conn, sql => SQL, params => Params, result => Ret}), + handle_result(Ret). + +execute_batch(Conn, SQL, ParamsList) -> + ParamsListStr = lists:map(fun sql_params_to_str/1, ParamsList), + Ret = jamdb_oracle:sql_query(Conn, {batch, sql_query_to_str(SQL), ParamsListStr}), + ?tp(oracle_batch_query, #{conn => Conn, sql => SQL, params => ParamsList, result => Ret}), + handle_result(Ret). + +parse_prepare_sql(Config) -> + SQL = + case maps:get(prepare_statement, Config, undefined) of + undefined -> + case maps:get(sql, Config, undefined) of + undefined -> #{}; + Template -> #{<<"send_message">> => Template} + end; + Any -> + Any + end, + parse_prepare_sql(maps:to_list(SQL), #{}, #{}). + +parse_prepare_sql([{Key, H} | T], Prepares, Tokens) -> + {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, ':n'), + parse_prepare_sql( + T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens} + ); +parse_prepare_sql([], Prepares, Tokens) -> + #{ + prepare_sql => Prepares, + params_tokens => Tokens + }. + +init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) -> + {ok, Sts} = prepare_sql(Prepares, PoolName), + State#{prepare_statement := Sts}. + +prepare_sql(Prepares, PoolName) when is_map(Prepares) -> + prepare_sql(maps:to_list(Prepares), PoolName); +prepare_sql(Prepares, PoolName) -> + Data = do_prepare_sql(Prepares, PoolName), + {ok, _Sts} = Data, + ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}), + Data. + +do_prepare_sql(Prepares, PoolName) -> + do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}). + +do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) -> + {ok, Conn} = ecpool_worker:client(Worker), + {ok, Sts} = prepare_sql_to_conn(Conn, Prepares), + do_prepare_sql(T, Prepares, PoolName, Sts); +do_prepare_sql([], _Prepares, _PoolName, LastSts) -> + {ok, LastSts}. + +prepare_sql_to_conn(Conn, Prepares) -> + prepare_sql_to_conn(Conn, Prepares, #{}). + +prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements}; +prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) -> + LogMeta = #{msg => "Oracle Database Prepare Statement", name => Key, prepare_sql => SQL}, + ?SLOG(info, LogMeta), + prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => SQL}). + +to_bin(Bin) when is_binary(Bin) -> + Bin; +to_bin(Atom) when is_atom(Atom) -> + erlang:atom_to_binary(Atom). + +handle_result({error, disconnected}) -> + {error, {recoverable_error, disconnected}}; +handle_result({error, Error}) -> + {error, {unrecoverable_error, Error}}; +handle_result({error, socket, closed} = Error) -> + {error, {recoverable_error, Error}}; +handle_result({error, Type, Reason}) -> + {error, {unrecoverable_error, {Type, Reason}}}; +handle_result(Res) -> + Res. + +handle_batch_result([{affected_rows, RowCount} | Rest], Acc) -> + handle_batch_result(Rest, Acc + RowCount); +handle_batch_result([{proc_result, RetCode, _Rows} | Rest], Acc) when RetCode =:= 0 -> + handle_batch_result(Rest, Acc); +handle_batch_result([{proc_result, RetCode, Reason} | _Rest], _Acc) -> + {error, {unrecoverable_error, {RetCode, Reason}}}; +handle_batch_result([], Acc) -> + {ok, Acc}. + +do_async_reply(Result, {ReplyFun, [Context]}) -> + ReplyFun(Context, Result). diff --git a/apps/emqx_oracle/src/emqx_oracle_schema.erl b/apps/emqx_oracle/src/emqx_oracle_schema.erl new file mode 100644 index 000000000..cfa74054a --- /dev/null +++ b/apps/emqx_oracle/src/emqx_oracle_schema.erl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_oracle_schema). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-define(REF_MODULE, emqx_oracle). + +%% Hocon config schema exports +-export([ + roots/0, + fields/1 +]). + +roots() -> + [{config, #{type => hoconsc:ref(?REF_MODULE, config)}}]. + +fields(config) -> + [{server, server()}, {sid, fun sid/1}] ++ + emqx_connector_schema_lib:relational_db_fields() ++ + emqx_connector_schema_lib:prepare_statement_fields(). + +server() -> + Meta = #{desc => ?DESC(?REF_MODULE, "server")}, + emqx_schema:servers_sc(Meta, (?REF_MODULE):oracle_host_options()). + +sid(type) -> binary(); +sid(desc) -> ?DESC(?REF_MODULE, "sid"); +sid(required) -> true; +sid(_) -> undefined. diff --git a/apps/emqx_plugin_libs/src/emqx_placeholder.erl b/apps/emqx_plugin_libs/src/emqx_placeholder.erl index 18ef9e8fb..dcd666f5b 100644 --- a/apps/emqx_plugin_libs/src/emqx_placeholder.erl +++ b/apps/emqx_plugin_libs/src/emqx_placeholder.erl @@ -69,7 +69,7 @@ -type preproc_sql_opts() :: #{ placeholders => list(binary()), - replace_with => '?' | '$n', + replace_with => '?' | '$n' | ':n', strip_double_quote => boolean() }. @@ -149,7 +149,7 @@ proc_cmd(Tokens, Data, Opts) -> preproc_sql(Sql) -> preproc_sql(Sql, '?'). --spec preproc_sql(binary(), '?' | '$n' | preproc_sql_opts()) -> +-spec preproc_sql(binary(), '?' | '$n' | ':n' | preproc_sql_opts()) -> {prepare_statement_key(), tmpl_token()}. preproc_sql(Sql, ReplaceWith) when is_atom(ReplaceWith) -> preproc_sql(Sql, #{replace_with => ReplaceWith}); @@ -316,13 +316,17 @@ preproc_tmpl_deep_map_key(Key, _) -> replace_with(Tmpl, RE, '?') -> re:replace(Tmpl, RE, "?", [{return, binary}, global]); replace_with(Tmpl, RE, '$n') -> + replace_with(Tmpl, RE, <<"$">>); +replace_with(Tmpl, RE, ':n') -> + replace_with(Tmpl, RE, <<":">>); +replace_with(Tmpl, RE, String) when is_binary(String) -> Parts = re:split(Tmpl, RE, [{return, binary}, trim, group]), {Res, _} = lists:foldl( fun ([Tkn, _Phld], {Acc, Seq}) -> Seq1 = erlang:integer_to_binary(Seq), - {<>, Seq + 1}; + {<>, Seq + 1}; ([Tkn], {Acc, Seq}) -> {<>, Seq} end, @@ -330,6 +334,7 @@ replace_with(Tmpl, RE, '$n') -> Parts ), Res. + parse_nested(<<".", R/binary>>) -> %% ignore the root . parse_nested(R); diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src b/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src index 24b5a3240..bfd7e68fa 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_plugin_libs, [ {description, "EMQX Plugin utility libs"}, - {vsn, "4.3.9"}, + {vsn, "4.3.10"}, {modules, []}, {applications, [kernel, stdlib]}, {env, []} diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl index 8844fe586..9a4c01a2b 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl @@ -105,9 +105,8 @@ proc_cmd(Tokens, Data, Opts) -> preproc_sql(Sql) -> emqx_placeholder:preproc_sql(Sql). --spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n') -> +-spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n' | ':n') -> {prepare_statement_key(), tmpl_token()}. - preproc_sql(Sql, ReplaceWith) -> emqx_placeholder:preproc_sql(Sql, ReplaceWith). diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 2553e6dd8..3e264cb3e 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.14"}, + {vsn, "0.1.15"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/changes/ee/feat-10498.en.md b/changes/ee/feat-10498.en.md new file mode 100644 index 000000000..7222f8957 --- /dev/null +++ b/changes/ee/feat-10498.en.md @@ -0,0 +1 @@ +Implement Oracle Database Bridge, which supports publishing messages to Oracle Database from MQTT topics. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index 5544825f8..825175038 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_bridge, [ {description, "EMQX Enterprise data bridges"}, - {vsn, "0.1.11"}, + {vsn, "0.1.12"}, {registered, []}, {applications, [ kernel, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 38f471ca2..3baf056ec 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -37,7 +37,8 @@ api_schemas(Method) -> ref(emqx_ee_bridge_rocketmq, Method), ref(emqx_ee_bridge_sqlserver, Method), ref(emqx_bridge_opents, Method), - ref(emqx_bridge_pulsar, Method ++ "_producer") + ref(emqx_bridge_pulsar, Method ++ "_producer"), + ref(emqx_bridge_oracle, Method) ]. schema_modules() -> @@ -59,7 +60,8 @@ schema_modules() -> emqx_ee_bridge_rocketmq, emqx_ee_bridge_sqlserver, emqx_bridge_opents, - emqx_bridge_pulsar + emqx_bridge_pulsar, + emqx_bridge_oracle ]. examples(Method) -> @@ -100,7 +102,8 @@ resource_type(dynamo) -> emqx_ee_connector_dynamo; resource_type(rocketmq) -> emqx_ee_connector_rocketmq; resource_type(sqlserver) -> emqx_ee_connector_sqlserver; resource_type(opents) -> emqx_bridge_opents_connector; -resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer. +resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer; +resource_type(oracle) -> emqx_oracle. fields(bridges) -> [ @@ -167,6 +170,14 @@ fields(bridges) -> desc => <<"OpenTSDB Bridge Config">>, required => false } + )}, + {oracle, + mk( + hoconsc:map(name, ref(emqx_bridge_oracle, "config")), + #{ + desc => <<"Oracle Bridge Config">>, + required => false + } )} ] ++ kafka_structs() ++ pulsar_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ diff --git a/mix.exs b/mix.exs index 2c391611e..8e100967b 100644 --- a/mix.exs +++ b/mix.exs @@ -170,7 +170,9 @@ defmodule EMQXUmbrella.MixProject do :emqx_bridge_rocketmq, :emqx_bridge_tdengine, :emqx_bridge_timescale, - :emqx_bridge_pulsar + :emqx_bridge_pulsar, + :emqx_oracle, + :emqx_bridge_oracle ]) end @@ -377,6 +379,8 @@ defmodule EMQXUmbrella.MixProject do emqx_bridge_rocketmq: :permanent, emqx_bridge_tdengine: :permanent, emqx_bridge_timescale: :permanent, + emqx_oracle: :permanent, + emqx_bridge_oracle: :permanent, emqx_ee_schema_registry: :permanent ], else: [] diff --git a/rebar.config.erl b/rebar.config.erl index fa7bdbdf3..bcc104b31 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -94,6 +94,8 @@ is_community_umbrella_app("apps/emqx_bridge_redis") -> false; is_community_umbrella_app("apps/emqx_bridge_rocketmq") -> false; is_community_umbrella_app("apps/emqx_bridge_tdengine") -> false; is_community_umbrella_app("apps/emqx_bridge_timescale") -> false; +is_community_umbrella_app("apps/emqx_bridge_oracle") -> false; +is_community_umbrella_app("apps/emqx_oracle") -> false; is_community_umbrella_app(_) -> true. is_jq_supported() -> @@ -470,6 +472,8 @@ relx_apps_per_edition(ee) -> emqx_bridge_rocketmq, emqx_bridge_tdengine, emqx_bridge_timescale, + emqx_oracle, + emqx_bridge_oracle, emqx_ee_schema_registry ]; relx_apps_per_edition(ce) -> diff --git a/rel/i18n/emqx_bridge_oracle.hocon b/rel/i18n/emqx_bridge_oracle.hocon new file mode 100644 index 000000000..95e0cf4af --- /dev/null +++ b/rel/i18n/emqx_bridge_oracle.hocon @@ -0,0 +1,52 @@ +emqx_bridge_oracle { + + local_topic { + desc = "The MQTT topic filter to be forwarded to Oracle Database. All MQTT 'PUBLISH' messages with the topic" + " matching the local_topic will be forwarded.
" + "NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is" + " configured, then both the data got from the rule and the MQTT messages that match local_topic" + " will be forwarded." + label = "Local Topic" + } + + sql_template { + desc = "SQL Template. The template string can contain placeholders" + " for message metadata and payload field. The placeholders are inserted" + " without any checking and special formatting, so it is important to" + " ensure that the inserted values are formatted and escaped correctly." + label = "SQL Template" + } + + server { + desc = "The IPv4 or IPv6 address or the hostname to connect to.
" + "A host entry has the following form: `Host[:Port]`.
" + "The Oracle Database default port 1521 is used if `[:Port]` is not specified." + label = "Server Host" + } + + sid { + desc = "Sid for Oracle Database" + label = "Oracle Database Sid." + } + + config_enable { + desc = "Enable or disable this bridge" + label = "Enable Or Disable Bridge" + } + + desc_config { + desc = "Configuration for an Oracle Database bridge." + label = "Oracle Database Bridge Configuration" + } + + desc_type { + desc = "The Bridge Type" + label = "Bridge Type" + } + + desc_name { + desc = "Bridge name." + label = "Bridge Name" + } + +} diff --git a/rel/i18n/emqx_oracle.hocon b/rel/i18n/emqx_oracle.hocon new file mode 100644 index 000000000..58de8e4c7 --- /dev/null +++ b/rel/i18n/emqx_oracle.hocon @@ -0,0 +1,15 @@ +emqx_oracle { + + server { + desc = "The IPv4 or IPv6 address or the hostname to connect to.
" + "A host entry has the following form: `Host[:Port]`.
" + "The Oracle Database default port 1521 is used if `[:Port]` is not specified." + label = "Server Host" + } + + sid { + desc = "Sid for Oracle Database." + label = "Oracle Database Sid" + } + +} diff --git a/rel/i18n/zh/emqx_bridge_oracle.hocon b/rel/i18n/zh/emqx_bridge_oracle.hocon new file mode 100644 index 000000000..290ac6d07 --- /dev/null +++ b/rel/i18n/zh/emqx_bridge_oracle.hocon @@ -0,0 +1,51 @@ +emqx_bridge_oracle { + + local_topic { + desc = "发送到 'local_topic' 的消息都会转发到 Oracle Database。
" + "注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。" + label = "本地 Topic" + } + + sql_template { + desc = "SQL模板。模板字符串可以包含消息元数据和有效载荷字段的占位符。占位符" + "的插入不需要任何检查和特殊格式化,因此必须确保插入的数值格式化和转义正确。模板字符串可以包含占位符" + "模板字符串可以包含消息元数据和有效载荷字段的占位符。这些占位符被插入" + "所以必须确保插入的值的格式正确。因此,确保插入的值格式化和转义正确是非常重要的。模板字符串可以包含占位符" + "模板字符串可以包含消息元数据和有效载荷字段的占位符。这些占位符被插入" + "所以必须确保插入的值的格式正确。确保插入的值被正确地格式化和转义。" + label = "SQL 模板" + } + + server { + desc = "将要连接的 IPv4 或 IPv6 地址,或者主机名。
" + "主机名具有以下形式:`Host[:Port]`。
" + "如果未指定 `[:Port]`,则使用 Oracle Database 默认端口 1521。" + label = "服务器地址" + } + + sid { + desc = "Oracle Database Sid 名称" + label = "Oracle Database Sid" + } + + config_enable { + desc = "启用/禁用桥接" + label = "启用/禁用桥接" + } + + desc_config { + desc = "Oracle Database 桥接配置" + label = "Oracle Database 桥接配置" + } + + desc_type { + desc = "Bridge 类型" + label = "桥接类型" + } + + desc_name { + desc = "桥接名字" + label = "桥接名字" + } + +} diff --git a/rel/i18n/zh/emqx_oracle.hocon b/rel/i18n/zh/emqx_oracle.hocon new file mode 100644 index 000000000..70c597cb1 --- /dev/null +++ b/rel/i18n/zh/emqx_oracle.hocon @@ -0,0 +1,15 @@ +emqx_oracle { + + server { + desc = "将要连接的 IPv4 或 IPv6 地址,或者主机名。
" + "主机名具有以下形式:`Host[:Port]`。
" + "如果未指定 `[:Port]`,则使用 Oracle Database 默认端口 1521。" + label = "服务器地址" + } + + sid { + desc = "Oracle Database Sid 名称" + label = "Oracle Database Sid" + } + +} diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index fec0d589c..ad0736bb3 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -193,6 +193,9 @@ for dep in ${CT_DEPS}; do ;; pulsar) FILES+=( '.ci/docker-compose-file/docker-compose-pulsar.yaml' ) + ;; + oracle) + FILES+=( '.ci/docker-compose-file/docker-compose-oracle.yaml' ) ;; *) echo "unknown_ct_dependency $dep"