feat(oracle): Oracle Database integration

This commit is contained in:
Paulo Zulato 2023-04-24 12:21:02 -03:00
parent c984449bad
commit dd90b2f498
32 changed files with 1642 additions and 14 deletions

View File

@ -0,0 +1,11 @@
version: '3.9'
services:
oracle_server:
container_name: oracle
image: oracleinanutshell/oracle-xe-11g:1.0.0
restart: always
environment:
ORACLE_DISABLE_ASYNCH_IO: true
networks:
- emqx_bridge

View File

@ -119,5 +119,11 @@
"listen": "0.0.0.0:6653", "listen": "0.0.0.0:6653",
"upstream": "pulsar:6653", "upstream": "pulsar:6653",
"enabled": true "enabled": true
},
{
"name": "oracle",
"listen": "0.0.0.0:1521",
"upstream": "oracle:1521",
"enabled": true
} }
] ]

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge, [ {application, emqx_bridge, [
{description, "EMQX bridges"}, {description, "EMQX bridges"},
{vsn, "0.1.17"}, {vsn, "0.1.18"},
{registered, [emqx_bridge_sup]}, {registered, [emqx_bridge_sup]},
{mod, {emqx_bridge_app, []}}, {mod, {emqx_bridge_app, []}},
{applications, [ {applications, [

View File

@ -71,7 +71,8 @@
T == rocketmq; T == rocketmq;
T == cassandra; T == cassandra;
T == sqlserver; T == sqlserver;
T == pulsar_producer T == pulsar_producer;
T == oracle
). ).
load() -> load() ->

View File

@ -0,0 +1,94 @@
Business Source License 1.1
Licensor: Hangzhou EMQ Technologies Co., Ltd.
Licensed Work: EMQX Enterprise Edition
The Licensed Work is (c) 2023
Hangzhou EMQ Technologies Co., Ltd.
Additional Use Grant: Students and educators are granted right to copy,
modify, and create derivative work for research
or education.
Change Date: 2027-02-01
Change License: Apache License, Version 2.0
For information about alternative licensing arrangements for the Software,
please contact Licensor: https://www.emqx.com/en/contact
Notice
The Business Source License (this document, or the “License”) is not an Open
Source license. However, the Licensed Work will eventually be made available
under an Open Source License, as stated in this License.
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
“Business Source License” is a trademark of MariaDB Corporation Ab.
-----------------------------------------------------------------------------
Business Source License 1.1
Terms
The Licensor hereby grants you the right to copy, modify, create derivative
works, redistribute, and make non-production use of the Licensed Work. The
Licensor may make an Additional Use Grant, above, permitting limited
production use.
Effective on the Change Date, or the fourth anniversary of the first publicly
available distribution of a specific version of the Licensed Work under this
License, whichever comes first, the Licensor hereby grants you rights under
the terms of the Change License, and the rights granted in the paragraph
above terminate.
If your use of the Licensed Work does not comply with the requirements
currently in effect as described in this License, you must purchase a
commercial license from the Licensor, its affiliated entities, or authorized
resellers, or you must refrain from using the Licensed Work.
All copies of the original and modified Licensed Work, and derivative works
of the Licensed Work, are subject to this License. This License applies
separately for each version of the Licensed Work and the Change Date may vary
for each version of the Licensed Work released by Licensor.
You must conspicuously display this License on each original or modified copy
of the Licensed Work. If you receive the Licensed Work in original or
modified form from a third party, the terms and conditions set forth in this
License apply to your use of that work.
Any use of the Licensed Work in violation of this License will automatically
terminate your rights under this License for the current and all other
versions of the Licensed Work.
This License does not grant you any right in any trademark or logo of
Licensor or its affiliates (provided that you may use a trademark or logo of
Licensor as expressly required by this License).
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
TITLE.
MariaDB hereby grants you permission to use this Licenses text to license
your works, and to refer to it using the trademark “Business Source License”,
as long as you comply with the Covenants of Licensor below.
Covenants of Licensor
In consideration of the right to use this Licenses text and the “Business
Source License” name and trademark, Licensor covenants to MariaDB, and to all
other recipients of the licensed work to be provided by Licensor:
1. To specify as the Change License the GPL Version 2.0 or any later version,
or a license that is compatible with GPL Version 2.0 or a later version,
where “compatible” means that software provided under the Change License can
be included in a program with software provided under GPL Version 2.0 or a
later version. Licensor may specify additional Change Licenses without
limitation.
2. To either: (a) specify an additional grant of rights to use that does not
impose any additional restriction on the right granted in this License, as
the Additional Use Grant; or (b) insert the text “None”.
3. To specify a Change Date.
4. Not to modify this License in any other way.

View File

@ -0,0 +1,28 @@
# EMQX Oracle Database Bridge
This application houses the Oracle Database bridge for EMQX Enterprise Edition.
It implements the data bridge APIs for interacting with an Oracle Database Bridge.
# Documentation
- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)
for the EMQX rules engine introduction.
# HTTP APIs
- Several APIs are provided for bridge management, which includes create bridge,
update bridge, get bridge, stop or restart bridge and list bridges etc.
Refer to [API Docs - Bridges](https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges) for more detailed information.
## Contributing
Please see our [contributing.md](../../CONTRIBUTING.md).
## License
See [BSL](./BSL.txt).

View File

@ -0,0 +1,2 @@
toxiproxy
oracle

View File

@ -0,0 +1,13 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [ {emqx_oracle, {path, "../../apps/emqx_oracle"}}
, {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}}
, {emqx_bridge, {path, "../../apps/emqx_bridge"}}
]}.
{shell, [
% {config, "config/sys.config"},
{apps, [emqx_bridge_oracle]}
]}.

View File

@ -0,0 +1,14 @@
{application, emqx_bridge_oracle, [
{description, "EMQX Enterprise Oracle Database Bridge"},
{vsn, "0.1.0"},
{registered, []},
{applications, [
kernel,
stdlib,
emqx_oracle
]},
{env, []},
{modules, []},
{links, []}
]}.

View File

@ -0,0 +1,109 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_oracle).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx_bridge/include/emqx_bridge.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-export([
conn_bridge_examples/1
]).
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
-define(DEFAULT_SQL, <<
"insert into t_mqtt_msg(msgid, topic, qos, payload)"
"values (${id}, ${topic}, ${qos}, ${payload})"
>>).
conn_bridge_examples(Method) ->
[
#{
<<"oracle">> => #{
summary => <<"Oracle Database Bridge">>,
value => values(Method)
}
}
].
values(_Method) ->
#{
enable => true,
type => oracle,
name => <<"foo">>,
server => <<"127.0.0.1:1521">>,
pool_size => 8,
database => <<"ORCL">>,
sid => <<"ORCL">>,
username => <<"root">>,
password => <<"******">>,
sql => ?DEFAULT_SQL,
local_topic => <<"local/topic/#">>,
resource_opts => #{
worker_pool_size => 8,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async,
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
}
}.
%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
namespace() -> "bridge_oracle".
roots() -> [].
fields("config") ->
[
{enable,
hoconsc:mk(
boolean(),
#{desc => ?DESC("config_enable"), default => true}
)},
{sql,
hoconsc:mk(
binary(),
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
)},
{local_topic,
hoconsc:mk(
binary(),
#{desc => ?DESC("local_topic"), default => undefined}
)}
] ++ emqx_resource_schema:fields("resource_opts") ++
(emqx_oracle_schema:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields());
fields("post") ->
fields("post", oracle);
fields("put") ->
fields("config");
fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("post").
fields("post", Type) ->
[type_field(Type), name_field() | fields("config")].
desc("config") ->
?DESC("desc_config");
desc(_) ->
undefined.
%% -------------------------------------------------------------------------------------------------
type_field(Type) ->
{type, hoconsc:mk(hoconsc:enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
name_field() ->
{name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

View File

@ -0,0 +1,594 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_oracle_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
-define(BRIDGE_TYPE_BIN, <<"oracle">>).
-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_oracle, emqx_bridge_oracle]).
-define(DATABASE, "XE").
-define(RULE_TOPIC, "mqtt/rule").
% -define(RULE_TOPIC_BIN, <<?RULE_TOPIC>>).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
[
{group, plain}
].
groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),
[
{plain, AllTCs}
].
only_once_tests() ->
[t_create_via_http].
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)),
_ = application:stop(emqx_connector),
ok.
init_per_group(plain = Type, Config) ->
OracleHost = os:getenv("ORACLE_PLAIN_HOST", "toxiproxy.emqx.net"),
OraclePort = list_to_integer(os:getenv("ORACLE_PLAIN_PORT", "1521")),
ProxyName = "oracle",
case emqx_common_test_helpers:is_tcp_server_available(OracleHost, OraclePort) of
true ->
Config1 = common_init_per_group(),
[
{proxy_name, ProxyName},
{oracle_host, OracleHost},
{oracle_port, OraclePort},
{connection_type, Type}
| Config1 ++ Config
];
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_oracle);
_ ->
{skip, no_oracle}
end
end;
init_per_group(_Group, Config) ->
Config.
end_per_group(Group, Config) when
Group =:= plain
->
common_end_per_group(Config),
ok;
end_per_group(_Group, _Config) ->
ok.
common_init_per_group() ->
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
application:load(emqx_bridge),
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps(?APPS),
{ok, _} = application:ensure_all_started(emqx_connector),
emqx_mgmt_api_test_util:init_suite(),
UniqueNum = integer_to_binary(erlang:unique_integer()),
MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>,
[
{proxy_host, ProxyHost},
{proxy_port, ProxyPort},
{mqtt_topic, MQTTTopic}
].
common_end_per_group(Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
delete_all_bridges(),
ok.
init_per_testcase(TestCase, Config) ->
common_init_per_testcase(TestCase, Config).
end_per_testcase(_Testcase, Config) ->
common_end_per_testcase(_Testcase, Config).
common_init_per_testcase(TestCase, Config0) ->
ct:timetrap(timer:seconds(60)),
delete_all_bridges(),
UniqueNum = integer_to_binary(erlang:unique_integer()),
OracleTopic =
<<
(atom_to_binary(TestCase))/binary,
UniqueNum/binary
>>,
ConnectionType = ?config(connection_type, Config0),
Config = [{oracle_topic, OracleTopic} | Config0],
{Name, ConfigString, OracleConfig} = oracle_config(
TestCase, ConnectionType, Config
),
ok = snabbkaffe:start_trace(),
[
{oracle_name, Name},
{oracle_config_string, ConfigString},
{oracle_config, OracleConfig}
| Config
].
common_end_per_testcase(_Testcase, Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ok;
false ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
delete_all_bridges(),
%% in CI, apparently this needs more time since the
%% machines struggle with all the containers running...
emqx_common_test_helpers:call_janitor(60_000),
ok = snabbkaffe:stop(),
ok
end.
delete_all_bridges() ->
lists:foreach(
fun(#{name := Name, type := Type}) ->
emqx_bridge:remove(Type, Name)
end,
emqx_bridge:list()
).
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
sql_insert_template_for_bridge() ->
"INSERT INTO mqtt_test(topic, msgid, payload, retain) VALUES (${topic}, ${id}, ${payload}, ${retain})".
sql_create_table() ->
"CREATE TABLE mqtt_test (topic VARCHAR2(255), msgid VARCHAR2(64), payload NCLOB, retain NUMBER(1))".
sql_drop_table() ->
"DROP TABLE mqtt_test".
reset_table(Config) ->
ResourceId = resource_id(Config),
_ = emqx_resource:simple_sync_query(ResourceId, {sql, sql_drop_table()}),
{ok, [{proc_result, 0, _}]} = emqx_resource:simple_sync_query(
ResourceId, {sql, sql_create_table()}
),
ok.
drop_table(Config) ->
ResourceId = resource_id(Config),
emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}),
ok.
oracle_config(TestCase, _ConnectionType, Config) ->
UniqueNum = integer_to_binary(erlang:unique_integer()),
OracleHost = ?config(oracle_host, Config),
OraclePort = ?config(oracle_port, Config),
Name = <<
(atom_to_binary(TestCase))/binary, UniqueNum/binary
>>,
ServerURL = iolist_to_binary([
OracleHost,
":",
integer_to_binary(OraclePort)
]),
ConfigString =
io_lib:format(
"bridges.oracle.~s {\n"
" enable = true\n"
" database = \"~s\"\n"
" sid = \"~s\"\n"
" server = \"~s\"\n"
" username = \"system\"\n"
" password = \"oracle\"\n"
" pool_size = 1\n"
" sql = \"~s\"\n"
" resource_opts = {\n"
" auto_restart_interval = 5000\n"
" request_timeout = 30000\n"
" query_mode = \"async\"\n"
" enable_batch = true\n"
" batch_size = 3\n"
" batch_time = \"3s\"\n"
" worker_pool_size = 1\n"
" }\n"
"}\n",
[
Name,
?DATABASE,
?DATABASE,
ServerURL,
sql_insert_template_for_bridge()
]
),
{Name, ConfigString, parse_and_check(ConfigString, Name)}.
parse_and_check(ConfigString, Name) ->
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
TypeBin = ?BRIDGE_TYPE_BIN,
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
#{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
Config.
resource_id(Config) ->
Type = ?BRIDGE_TYPE_BIN,
Name = ?config(oracle_name, Config),
emqx_bridge_resource:resource_id(Type, Name).
create_bridge(Config) ->
create_bridge(Config, _Overrides = #{}).
create_bridge(Config, Overrides) ->
Type = ?BRIDGE_TYPE_BIN,
Name = ?config(oracle_name, Config),
OracleConfig0 = ?config(oracle_config, Config),
OracleConfig = emqx_utils_maps:deep_merge(OracleConfig0, Overrides),
emqx_bridge:create(Type, Name, OracleConfig).
create_bridge_api(Config) ->
create_bridge_api(Config, _Overrides = #{}).
create_bridge_api(Config, Overrides) ->
TypeBin = ?BRIDGE_TYPE_BIN,
Name = ?config(oracle_name, Config),
OracleConfig0 = ?config(oracle_config, Config),
OracleConfig = emqx_utils_maps:deep_merge(OracleConfig0, Overrides),
Params = OracleConfig#{<<"type">> => TypeBin, <<"name">> => Name},
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("creating bridge (via http): ~p", [Params]),
Res =
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
{ok, {Status, Headers, Body0}} ->
{ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
Error ->
Error
end,
ct:pal("bridge create result: ~p", [Res]),
Res.
update_bridge_api(Config) ->
update_bridge_api(Config, _Overrides = #{}).
update_bridge_api(Config, Overrides) ->
TypeBin = ?BRIDGE_TYPE_BIN,
Name = ?config(oracle_name, Config),
OracleConfig0 = ?config(oracle_config, Config),
OracleConfig = emqx_utils_maps:deep_merge(OracleConfig0, Overrides),
BridgeId = emqx_bridge_resource:bridge_id(TypeBin, Name),
Params = OracleConfig#{<<"type">> => TypeBin, <<"name">> => Name},
Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("updating bridge (via http): ~p", [Params]),
Res =
case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Params, Opts) of
{ok, {_Status, _Headers, Body0}} -> {ok, emqx_utils_json:decode(Body0, [return_maps])};
Error -> Error
end,
ct:pal("bridge update result: ~p", [Res]),
Res.
probe_bridge_api(Config) ->
probe_bridge_api(Config, _Overrides = #{}).
probe_bridge_api(Config, _Overrides) ->
TypeBin = ?BRIDGE_TYPE_BIN,
Name = ?config(oracle_name, Config),
OracleConfig = ?config(oracle_config, Config),
Params = OracleConfig#{<<"type">> => TypeBin, <<"name">> => Name},
Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("probing bridge (via http): ~p", [Params]),
Res =
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
{ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
Error -> Error
end,
ct:pal("bridge probe result: ~p", [Res]),
Res.
create_rule_and_action_http(Config) ->
OracleName = ?config(oracle_name, Config),
BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, OracleName),
Params = #{
enable => true,
sql => <<"SELECT * FROM \"", ?RULE_TOPIC, "\"">>,
actions => [BridgeId]
},
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
ct:pal("rule action params: ~p", [Params]),
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
{ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
Error -> Error
end.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
% Under normal operations, the bridge will be called async via
% `simple_async_query'.
t_sync_query(Config) ->
ResourceId = resource_id(Config),
?check_trace(
begin
?assertMatch({ok, _}, create_bridge_api(Config)),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
reset_table(Config),
MsgId = erlang:unique_integer(),
Params = #{
topic => ?config(mqtt_topic, Config),
id => MsgId,
payload => ?config(oracle_name, Config),
retain => true
},
Message = {send_message, Params},
?assertEqual(
{ok, [{affected_rows, 1}]}, emqx_resource:simple_sync_query(ResourceId, Message)
),
ok
end,
[]
),
ok.
t_async_query(Config) ->
Overrides = #{
<<"resource_opts">> => #{
<<"enable_batch">> => <<"false">>,
<<"batch_size">> => 1
}
},
ResourceId = resource_id(Config),
?check_trace(
begin
?assertMatch({ok, _}, create_bridge_api(Config, Overrides)),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
reset_table(Config),
MsgId = erlang:unique_integer(),
Params = #{
topic => ?config(mqtt_topic, Config),
id => MsgId,
payload => ?config(oracle_name, Config),
retain => false
},
Message = {send_message, Params},
?assertMatch(
{
ok,
{ok, #{result := {ok, [{affected_rows, 1}]}}}
},
?wait_async_action(
emqx_resource:query(ResourceId, Message),
#{?snk_kind := oracle_query},
5_000
)
),
ok
end,
[]
),
ok.
t_batch_sync_query(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
ResourceId = resource_id(Config),
?check_trace(
begin
?assertMatch({ok, _}, create_bridge_api(Config)),
?retry(
_Sleep = 1_000,
_Attempts = 30,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
reset_table(Config),
MsgId = erlang:unique_integer(),
Params = #{
topic => ?config(mqtt_topic, Config),
id => MsgId,
payload => ?config(oracle_name, Config),
retain => false
},
% Send 3 async messages while resource is down. When it comes back, these messages
% will be delivered in sync way. If we try to send sync messages directly, it will
% be sent async as callback_mode is set to async_if_possible.
Message = {send_message, Params},
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
ct:sleep(1000),
emqx_resource:query(ResourceId, Message),
emqx_resource:query(ResourceId, Message),
emqx_resource:query(ResourceId, Message)
end),
?retry(
_Sleep = 1_000,
_Attempts = 30,
?assertMatch(
{ok, [{result_set, _, _, [[{3}]]}]},
emqx_resource:simple_sync_query(
ResourceId, {query, "SELECT COUNT(*) FROM mqtt_test"}
)
)
),
ok
end,
[]
),
ok.
t_batch_async_query(Config) ->
ResourceId = resource_id(Config),
?check_trace(
begin
?assertMatch({ok, _}, create_bridge_api(Config)),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
reset_table(Config),
MsgId = erlang:unique_integer(),
Params = #{
topic => ?config(mqtt_topic, Config),
id => MsgId,
payload => ?config(oracle_name, Config),
retain => false
},
Message = {send_message, Params},
?assertMatch(
{
ok,
{ok, #{result := {ok, [{affected_rows, 1}]}}}
},
?wait_async_action(
emqx_resource:query(ResourceId, Message),
#{?snk_kind := oracle_batch_query},
5_000
)
),
ok
end,
[]
),
ok.
t_create_via_http(Config) ->
?check_trace(
begin
?assertMatch({ok, _}, create_bridge_api(Config)),
%% lightweight matrix testing some configs
?assertMatch(
{ok, _},
update_bridge_api(
Config,
#{
<<"resource_opts">> =>
#{<<"batch_size">> => 4}
}
)
),
?assertMatch(
{ok, _},
update_bridge_api(
Config,
#{
<<"resource_opts">> =>
#{<<"batch_time">> => <<"4s">>}
}
)
),
ok
end,
[]
),
ok.
t_start_stop(Config) ->
OracleName = ?config(oracle_name, Config),
ResourceId = resource_id(Config),
?check_trace(
begin
?assertMatch({ok, _}, create_bridge(Config)),
%% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness.
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
%% Check that the bridge probe API doesn't leak atoms.
ProbeRes0 = probe_bridge_api(
Config,
#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
AtomsBefore = erlang:system_info(atom_count),
%% Probe again; shouldn't have created more atoms.
ProbeRes1 = probe_bridge_api(
Config,
#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
AtomsAfter = erlang:system_info(atom_count),
?assertEqual(AtomsBefore, AtomsAfter),
%% Now stop the bridge.
?assertMatch(
{{ok, _}, {ok, _}},
?wait_async_action(
emqx_bridge:disable_enable(disable, ?BRIDGE_TYPE_BIN, OracleName),
#{?snk_kind := oracle_bridge_stopped},
5_000
)
),
ok
end,
fun(Trace) ->
%% one for each probe, one for real
?assertMatch([_, _, _], ?of_kind(oracle_bridge_stopped, Trace)),
ok
end
),
ok.
t_on_get_status(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
ResourceId = resource_id(Config),
?assertMatch({ok, _}, create_bridge(Config)),
%% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness.
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
ct:sleep(500),
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
end),
%% Check that it recovers itself.
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
ok.

94
apps/emqx_oracle/BSL.txt Normal file
View File

@ -0,0 +1,94 @@
Business Source License 1.1
Licensor: Hangzhou EMQ Technologies Co., Ltd.
Licensed Work: EMQX Enterprise Edition
The Licensed Work is (c) 2023
Hangzhou EMQ Technologies Co., Ltd.
Additional Use Grant: Students and educators are granted right to copy,
modify, and create derivative work for research
or education.
Change Date: 2027-02-01
Change License: Apache License, Version 2.0
For information about alternative licensing arrangements for the Software,
please contact Licensor: https://www.emqx.com/en/contact
Notice
The Business Source License (this document, or the “License”) is not an Open
Source license. However, the Licensed Work will eventually be made available
under an Open Source License, as stated in this License.
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
“Business Source License” is a trademark of MariaDB Corporation Ab.
-----------------------------------------------------------------------------
Business Source License 1.1
Terms
The Licensor hereby grants you the right to copy, modify, create derivative
works, redistribute, and make non-production use of the Licensed Work. The
Licensor may make an Additional Use Grant, above, permitting limited
production use.
Effective on the Change Date, or the fourth anniversary of the first publicly
available distribution of a specific version of the Licensed Work under this
License, whichever comes first, the Licensor hereby grants you rights under
the terms of the Change License, and the rights granted in the paragraph
above terminate.
If your use of the Licensed Work does not comply with the requirements
currently in effect as described in this License, you must purchase a
commercial license from the Licensor, its affiliated entities, or authorized
resellers, or you must refrain from using the Licensed Work.
All copies of the original and modified Licensed Work, and derivative works
of the Licensed Work, are subject to this License. This License applies
separately for each version of the Licensed Work and the Change Date may vary
for each version of the Licensed Work released by Licensor.
You must conspicuously display this License on each original or modified copy
of the Licensed Work. If you receive the Licensed Work in original or
modified form from a third party, the terms and conditions set forth in this
License apply to your use of that work.
Any use of the Licensed Work in violation of this License will automatically
terminate your rights under this License for the current and all other
versions of the Licensed Work.
This License does not grant you any right in any trademark or logo of
Licensor or its affiliates (provided that you may use a trademark or logo of
Licensor as expressly required by this License).
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
TITLE.
MariaDB hereby grants you permission to use this Licenses text to license
your works, and to refer to it using the trademark “Business Source License”,
as long as you comply with the Covenants of Licensor below.
Covenants of Licensor
In consideration of the right to use this Licenses text and the “Business
Source License” name and trademark, Licensor covenants to MariaDB, and to all
other recipients of the licensed work to be provided by Licensor:
1. To specify as the Change License the GPL Version 2.0 or any later version,
or a license that is compatible with GPL Version 2.0 or a later version,
where “compatible” means that software provided under the Change License can
be included in a program with software provided under GPL Version 2.0 or a
later version. Licensor may specify additional Change Licenses without
limitation.
2. To either: (a) specify an additional grant of rights to use that does not
impose any additional restriction on the right granted in this License, as
the Additional Use Grant; or (b) insert the text “None”.
3. To specify a Change Date.
4. Not to modify this License in any other way.

View File

@ -0,0 +1,14 @@
# Oracle Database Connector
This application houses the Oracle Database connector for EMQX Enterprise Edition.
It provides the APIs to connect to Oracle Database.
So far it is only used to insert messages as data bridge.
## Contributing
Please see our [contributing.md](../../CONTRIBUTING.md).
## License
See [BSL](./BSL.txt).

View File

@ -0,0 +1,7 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [ {jamdb_oracle, {git, "https://github.com/emqx/jamdb_oracle", {tag, "0.4.9.4"}}}
, {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}}
]}.

View File

@ -0,0 +1,14 @@
{application, emqx_oracle, [
{description, "EMQX Enterprise Oracle Database Connector"},
{vsn, "0.1.0"},
{registered, []},
{applications, [
kernel,
stdlib,
jamdb_oracle
]},
{env, []},
{modules, []},
{links, []}
]}.

View File

@ -0,0 +1,434 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_oracle).
-behaviour(emqx_resource).
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(ORACLE_DEFAULT_PORT, 1521).
%%====================================================================
%% Exports
%%====================================================================
%% callbacks for behaviour emqx_resource
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_query/3,
on_batch_query/3,
on_query_async/4,
on_batch_query_async/4,
on_get_status/2
]).
%% callbacks for ecpool
-export([connect/1, prepare_sql_to_conn/2]).
%% Internal exports used to execute code with ecpool worker
-export([
query/3,
execute_batch/3,
do_async_reply/2,
do_get_status/1
]).
-export([
oracle_host_options/0
]).
-define(ACTION_SEND_MESSAGE, send_message).
-define(SYNC_QUERY_MODE, no_handover).
-define(ASYNC_QUERY_MODE(REPLY), {handover_async, {?MODULE, do_async_reply, [REPLY]}}).
-define(ORACLE_HOST_OPTIONS, #{
default_port => ?ORACLE_DEFAULT_PORT
}).
-define(MAX_CURSORS, 10).
-define(DEFAULT_POOL_SIZE, 8).
-define(OPT_TIMEOUT, 30000).
-type prepares() :: #{atom() => binary()}.
-type params_tokens() :: #{atom() => list()}.
-type state() ::
#{
pool_name := binary(),
prepare_sql := prepares(),
params_tokens := params_tokens(),
batch_params_tokens := params_tokens()
}.
callback_mode() -> async_if_possible.
is_buffer_supported() -> false.
-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
on_start(
InstId,
#{
server := Server,
database := DB,
sid := Sid,
username := User
} = Config
) ->
?SLOG(info, #{
msg => "starting_oracle_connector",
connector => InstId,
config => emqx_utils:redact(Config)
}),
?tp(oracle_bridge_started, #{instance_id => InstId, config => Config}),
{ok, _} = application:ensure_all_started(ecpool),
{ok, _} = application:ensure_all_started(jamdb_oracle),
jamdb_oracle_conn:set_max_cursors_number(?MAX_CURSORS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, oracle_host_options()),
ServiceName = maps:get(<<"service_name">>, Config, Sid),
Options = [
{host, Host},
{port, Port},
{user, emqx_plugin_libs_rule:str(User)},
{password, emqx_secret:wrap(maps:get(password, Config, ""))},
{sid, emqx_plugin_libs_rule:str(Sid)},
{service_name, emqx_plugin_libs_rule:str(ServiceName)},
{database, DB},
{pool_size, maps:get(<<"pool_size">>, Config, ?DEFAULT_POOL_SIZE)},
{timeout, ?OPT_TIMEOUT},
{app_name, "EMQX Data To Oracle Database Action"}
],
PoolName = InstId,
Prepares = parse_prepare_sql(Config),
InitState = #{pool_name => PoolName, prepare_statement => #{}},
State = maps:merge(InitState, Prepares),
case emqx_resource_pool:start(InstId, ?MODULE, Options) of
ok ->
{ok, init_prepare(State)};
{error, Reason} ->
?tp(
oracle_connector_start_failed,
#{error => Reason}
),
{error, Reason}
end.
on_stop(InstId, #{pool_name := PoolName}) ->
?SLOG(info, #{
msg => "stopping_oracle_connector",
connector => InstId
}),
?tp(oracle_bridge_stopped, #{instance_id => InstId}),
emqx_resource_pool:stop(PoolName).
on_query(InstId, {TypeOrKey, NameOrSQL}, #{pool_name := _PoolName} = State) ->
on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
on_query(
InstId,
{TypeOrKey, NameOrSQL, Params},
#{pool_name := PoolName} = State
) ->
?SLOG(debug, #{
msg => "oracle database connector received sql query",
connector => InstId,
type => TypeOrKey,
sql => NameOrSQL,
state => State
}),
Type = query,
{NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
Res = on_sql_query(InstId, PoolName, Type, ?SYNC_QUERY_MODE, NameOrSQL2, Data),
handle_result(Res).
on_query_async(InstId, {TypeOrKey, NameOrSQL}, Reply, State) ->
on_query_async(InstId, {TypeOrKey, NameOrSQL, []}, Reply, State);
on_query_async(
InstId, {TypeOrKey, NameOrSQL, Params} = Query, Reply, #{pool_name := PoolName} = State
) ->
?SLOG(debug, #{
msg => "oracle database connector received async sql query",
connector => InstId,
query => Query,
reply => Reply,
state => State
}),
ApplyMode = ?ASYNC_QUERY_MODE(Reply),
Type = query,
{NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
Res = on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL2, Data),
handle_result(Res).
on_batch_query(
InstId,
BatchReq,
#{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State
) ->
case BatchReq of
[{Key, _} = Request | _] ->
BinKey = to_bin(Key),
case maps:get(BinKey, Tokens, undefined) of
undefined ->
Log = #{
connector => InstId,
first_request => Request,
state => State,
msg => "batch prepare not implemented"
},
?SLOG(error, Log),
{error, {unrecoverable_error, batch_prepare_not_implemented}};
TokenList ->
{_, Datas} = lists:unzip(BatchReq),
Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
St = maps:get(BinKey, Sts),
case
on_sql_query(InstId, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2)
of
{ok, Results} ->
handle_batch_result(Results, 0);
Result ->
Result
end
end;
_ ->
Log = #{
connector => InstId,
request => BatchReq,
state => State,
msg => "invalid request"
},
?SLOG(error, Log),
{error, {unrecoverable_error, invalid_request}}
end.
on_batch_query_async(
InstId,
BatchReq,
Reply,
#{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State
) ->
case BatchReq of
[{Key, _} = Request | _] ->
BinKey = to_bin(Key),
case maps:get(BinKey, Tokens, undefined) of
undefined ->
Log = #{
connector => InstId,
first_request => Request,
state => State,
msg => "batch prepare not implemented"
},
?SLOG(error, Log),
{error, {unrecoverable_error, batch_prepare_not_implemented}};
TokenList ->
{_, Datas} = lists:unzip(BatchReq),
Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
St = maps:get(BinKey, Sts),
case
on_sql_query(
InstId, PoolName, execute_batch, ?ASYNC_QUERY_MODE(Reply), St, Datas2
)
of
{ok, Results} ->
handle_batch_result(Results, 0);
Result ->
Result
end
end;
_ ->
Log = #{
connector => InstId,
request => BatchReq,
state => State,
msg => "invalid request"
},
?SLOG(error, Log),
{error, {unrecoverable_error, invalid_request}}
end.
proc_sql_params(query, SQLOrKey, Params, _State) ->
{SQLOrKey, Params};
proc_sql_params(TypeOrKey, SQLOrData, Params, #{
params_tokens := ParamsTokens, prepare_sql := PrepareSql
}) ->
Key = to_bin(TypeOrKey),
case maps:get(Key, ParamsTokens, undefined) of
undefined ->
{SQLOrData, Params};
Tokens ->
case maps:get(Key, PrepareSql, undefined) of
undefined ->
{SQLOrData, Params};
Sql ->
{Sql, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
end
end.
on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL, Data) ->
case ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, ApplyMode) of
{error, Reason} = Result ->
?tp(
oracle_connector_query_return,
#{error => Reason}
),
?SLOG(error, #{
msg => "oracle database connector do sql query failed",
connector => InstId,
type => Type,
sql => NameOrSQL,
reason => Reason
}),
Result;
Result ->
?tp(
oracle_connector_query_return,
#{result => Result}
),
Result
end.
on_get_status(_InstId, #{pool_name := Pool} = State) ->
case emqx_resource_pool:health_check_workers(Pool, fun ?MODULE:do_get_status/1) of
true ->
case do_check_prepares(State) of
ok ->
connected;
{ok, NState} ->
%% return new state with prepared statements
{connected, NState}
end;
false ->
disconnected
end.
do_get_status(Conn) ->
ok == element(1, jamdb_oracle:sql_query(Conn, "select 1 from dual")).
do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
ok;
do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) ->
{ok, Sts} = prepare_sql(Prepares, PoolName),
{ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}.
%% ===================================================================
oracle_host_options() ->
?ORACLE_HOST_OPTIONS.
connect(Opts) ->
Password = emqx_secret:unwrap(proplists:get_value(password, Opts)),
NewOpts = lists:keyreplace(password, 1, Opts, {password, Password}),
jamdb_oracle:start_link(NewOpts).
sql_query_to_str(SqlQuery) ->
emqx_plugin_libs_rule:str(SqlQuery).
sql_params_to_str(Params) when is_list(Params) ->
lists:map(
fun
(false) -> "0";
(true) -> "1";
(Value) -> emqx_plugin_libs_rule:str(Value)
end,
Params
).
query(Conn, SQL, Params) ->
Ret = jamdb_oracle:sql_query(Conn, {sql_query_to_str(SQL), sql_params_to_str(Params)}),
?tp(oracle_query, #{conn => Conn, sql => SQL, params => Params, result => Ret}),
handle_result(Ret).
execute_batch(Conn, SQL, ParamsList) ->
ParamsListStr = lists:map(fun sql_params_to_str/1, ParamsList),
Ret = jamdb_oracle:sql_query(Conn, {batch, sql_query_to_str(SQL), ParamsListStr}),
?tp(oracle_batch_query, #{conn => Conn, sql => SQL, params => ParamsList, result => Ret}),
handle_result(Ret).
parse_prepare_sql(Config) ->
SQL =
case maps:get(prepare_statement, Config, undefined) of
undefined ->
case maps:get(sql, Config, undefined) of
undefined -> #{};
Template -> #{<<"send_message">> => Template}
end;
Any ->
Any
end,
parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
parse_prepare_sql([{Key, H} | T], Prepares, Tokens) ->
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, ':n'),
parse_prepare_sql(
T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
);
parse_prepare_sql([], Prepares, Tokens) ->
#{
prepare_sql => Prepares,
params_tokens => Tokens
}.
init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) ->
{ok, Sts} = prepare_sql(Prepares, PoolName),
State#{prepare_statement := Sts}.
prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
prepare_sql(maps:to_list(Prepares), PoolName);
prepare_sql(Prepares, PoolName) ->
Data = do_prepare_sql(Prepares, PoolName),
{ok, _Sts} = Data,
ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
Data.
do_prepare_sql(Prepares, PoolName) ->
do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}).
do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) ->
{ok, Conn} = ecpool_worker:client(Worker),
{ok, Sts} = prepare_sql_to_conn(Conn, Prepares),
do_prepare_sql(T, Prepares, PoolName, Sts);
do_prepare_sql([], _Prepares, _PoolName, LastSts) ->
{ok, LastSts}.
prepare_sql_to_conn(Conn, Prepares) ->
prepare_sql_to_conn(Conn, Prepares, #{}).
prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements};
prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) ->
LogMeta = #{msg => "Oracle Database Prepare Statement", name => Key, prepare_sql => SQL},
?SLOG(info, LogMeta),
prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => SQL}).
to_bin(Bin) when is_binary(Bin) ->
Bin;
to_bin(Atom) when is_atom(Atom) ->
erlang:atom_to_binary(Atom).
handle_result({error, disconnected}) ->
{error, {recoverable_error, disconnected}};
handle_result({error, Error}) ->
{error, {unrecoverable_error, Error}};
handle_result({error, socket, closed} = Error) ->
{error, {recoverable_error, Error}};
handle_result({error, Type, Reason}) ->
{error, {unrecoverable_error, {Type, Reason}}};
handle_result(Res) ->
Res.
handle_batch_result([{affected_rows, RowCount} | Rest], Acc) ->
handle_batch_result(Rest, Acc + RowCount);
handle_batch_result([{proc_result, RetCode, _Rows} | Rest], Acc) when RetCode =:= 0 ->
handle_batch_result(Rest, Acc);
handle_batch_result([{proc_result, RetCode, Reason} | _Rest], _Acc) ->
{error, {unrecoverable_error, {RetCode, Reason}}};
handle_batch_result([], Acc) ->
{ok, Acc}.
do_async_reply(Result, {ReplyFun, [Context]}) ->
ReplyFun(Context, Result).

View File

@ -0,0 +1,33 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_oracle_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-define(REF_MODULE, emqx_oracle).
%% Hocon config schema exports
-export([
roots/0,
fields/1
]).
roots() ->
[{config, #{type => hoconsc:ref(?REF_MODULE, config)}}].
fields(config) ->
[{server, server()}, {sid, fun sid/1}] ++
emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:prepare_statement_fields().
server() ->
Meta = #{desc => ?DESC(?REF_MODULE, "server")},
emqx_schema:servers_sc(Meta, (?REF_MODULE):oracle_host_options()).
sid(type) -> binary();
sid(desc) -> ?DESC(?REF_MODULE, "sid");
sid(required) -> true;
sid(_) -> undefined.

View File

@ -69,7 +69,7 @@
-type preproc_sql_opts() :: #{ -type preproc_sql_opts() :: #{
placeholders => list(binary()), placeholders => list(binary()),
replace_with => '?' | '$n', replace_with => '?' | '$n' | ':n',
strip_double_quote => boolean() strip_double_quote => boolean()
}. }.
@ -149,7 +149,7 @@ proc_cmd(Tokens, Data, Opts) ->
preproc_sql(Sql) -> preproc_sql(Sql) ->
preproc_sql(Sql, '?'). preproc_sql(Sql, '?').
-spec preproc_sql(binary(), '?' | '$n' | preproc_sql_opts()) -> -spec preproc_sql(binary(), '?' | '$n' | ':n' | preproc_sql_opts()) ->
{prepare_statement_key(), tmpl_token()}. {prepare_statement_key(), tmpl_token()}.
preproc_sql(Sql, ReplaceWith) when is_atom(ReplaceWith) -> preproc_sql(Sql, ReplaceWith) when is_atom(ReplaceWith) ->
preproc_sql(Sql, #{replace_with => ReplaceWith}); preproc_sql(Sql, #{replace_with => ReplaceWith});
@ -316,13 +316,17 @@ preproc_tmpl_deep_map_key(Key, _) ->
replace_with(Tmpl, RE, '?') -> replace_with(Tmpl, RE, '?') ->
re:replace(Tmpl, RE, "?", [{return, binary}, global]); re:replace(Tmpl, RE, "?", [{return, binary}, global]);
replace_with(Tmpl, RE, '$n') -> replace_with(Tmpl, RE, '$n') ->
replace_with(Tmpl, RE, <<"$">>);
replace_with(Tmpl, RE, ':n') ->
replace_with(Tmpl, RE, <<":">>);
replace_with(Tmpl, RE, String) when is_binary(String) ->
Parts = re:split(Tmpl, RE, [{return, binary}, trim, group]), Parts = re:split(Tmpl, RE, [{return, binary}, trim, group]),
{Res, _} = {Res, _} =
lists:foldl( lists:foldl(
fun fun
([Tkn, _Phld], {Acc, Seq}) -> ([Tkn, _Phld], {Acc, Seq}) ->
Seq1 = erlang:integer_to_binary(Seq), Seq1 = erlang:integer_to_binary(Seq),
{<<Acc/binary, Tkn/binary, "$", Seq1/binary>>, Seq + 1}; {<<Acc/binary, Tkn/binary, String/binary, Seq1/binary>>, Seq + 1};
([Tkn], {Acc, Seq}) -> ([Tkn], {Acc, Seq}) ->
{<<Acc/binary, Tkn/binary>>, Seq} {<<Acc/binary, Tkn/binary>>, Seq}
end, end,
@ -330,6 +334,7 @@ replace_with(Tmpl, RE, '$n') ->
Parts Parts
), ),
Res. Res.
parse_nested(<<".", R/binary>>) -> parse_nested(<<".", R/binary>>) ->
%% ignore the root . %% ignore the root .
parse_nested(R); parse_nested(R);

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_plugin_libs, [ {application, emqx_plugin_libs, [
{description, "EMQX Plugin utility libs"}, {description, "EMQX Plugin utility libs"},
{vsn, "4.3.9"}, {vsn, "4.3.10"},
{modules, []}, {modules, []},
{applications, [kernel, stdlib]}, {applications, [kernel, stdlib]},
{env, []} {env, []}

View File

@ -105,9 +105,8 @@ proc_cmd(Tokens, Data, Opts) ->
preproc_sql(Sql) -> preproc_sql(Sql) ->
emqx_placeholder:preproc_sql(Sql). emqx_placeholder:preproc_sql(Sql).
-spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n') -> -spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n' | ':n') ->
{prepare_statement_key(), tmpl_token()}. {prepare_statement_key(), tmpl_token()}.
preproc_sql(Sql, ReplaceWith) -> preproc_sql(Sql, ReplaceWith) ->
emqx_placeholder:preproc_sql(Sql, ReplaceWith). emqx_placeholder:preproc_sql(Sql, ReplaceWith).

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_resource, [ {application, emqx_resource, [
{description, "Manager for all external resources"}, {description, "Manager for all external resources"},
{vsn, "0.1.14"}, {vsn, "0.1.15"},
{registered, []}, {registered, []},
{mod, {emqx_resource_app, []}}, {mod, {emqx_resource_app, []}},
{applications, [ {applications, [

View File

@ -0,0 +1 @@
Implement Oracle Database Bridge, which supports publishing messages to Oracle Database from MQTT topics.

View File

@ -1,6 +1,6 @@
{application, emqx_ee_bridge, [ {application, emqx_ee_bridge, [
{description, "EMQX Enterprise data bridges"}, {description, "EMQX Enterprise data bridges"},
{vsn, "0.1.11"}, {vsn, "0.1.12"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -37,7 +37,8 @@ api_schemas(Method) ->
ref(emqx_ee_bridge_rocketmq, Method), ref(emqx_ee_bridge_rocketmq, Method),
ref(emqx_ee_bridge_sqlserver, Method), ref(emqx_ee_bridge_sqlserver, Method),
ref(emqx_bridge_opents, Method), ref(emqx_bridge_opents, Method),
ref(emqx_bridge_pulsar, Method ++ "_producer") ref(emqx_bridge_pulsar, Method ++ "_producer"),
ref(emqx_bridge_oracle, Method)
]. ].
schema_modules() -> schema_modules() ->
@ -59,7 +60,8 @@ schema_modules() ->
emqx_ee_bridge_rocketmq, emqx_ee_bridge_rocketmq,
emqx_ee_bridge_sqlserver, emqx_ee_bridge_sqlserver,
emqx_bridge_opents, emqx_bridge_opents,
emqx_bridge_pulsar emqx_bridge_pulsar,
emqx_bridge_oracle
]. ].
examples(Method) -> examples(Method) ->
@ -100,7 +102,8 @@ resource_type(dynamo) -> emqx_ee_connector_dynamo;
resource_type(rocketmq) -> emqx_ee_connector_rocketmq; resource_type(rocketmq) -> emqx_ee_connector_rocketmq;
resource_type(sqlserver) -> emqx_ee_connector_sqlserver; resource_type(sqlserver) -> emqx_ee_connector_sqlserver;
resource_type(opents) -> emqx_bridge_opents_connector; resource_type(opents) -> emqx_bridge_opents_connector;
resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer. resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer;
resource_type(oracle) -> emqx_oracle.
fields(bridges) -> fields(bridges) ->
[ [
@ -167,6 +170,14 @@ fields(bridges) ->
desc => <<"OpenTSDB Bridge Config">>, desc => <<"OpenTSDB Bridge Config">>,
required => false required => false
} }
)},
{oracle,
mk(
hoconsc:map(name, ref(emqx_bridge_oracle, "config")),
#{
desc => <<"Oracle Bridge Config">>,
required => false
}
)} )}
] ++ kafka_structs() ++ pulsar_structs() ++ mongodb_structs() ++ influxdb_structs() ++ ] ++ kafka_structs() ++ pulsar_structs() ++ mongodb_structs() ++ influxdb_structs() ++
redis_structs() ++ redis_structs() ++

View File

@ -170,7 +170,9 @@ defmodule EMQXUmbrella.MixProject do
:emqx_bridge_rocketmq, :emqx_bridge_rocketmq,
:emqx_bridge_tdengine, :emqx_bridge_tdengine,
:emqx_bridge_timescale, :emqx_bridge_timescale,
:emqx_bridge_pulsar :emqx_bridge_pulsar,
:emqx_oracle,
:emqx_bridge_oracle
]) ])
end end
@ -377,6 +379,8 @@ defmodule EMQXUmbrella.MixProject do
emqx_bridge_rocketmq: :permanent, emqx_bridge_rocketmq: :permanent,
emqx_bridge_tdengine: :permanent, emqx_bridge_tdengine: :permanent,
emqx_bridge_timescale: :permanent, emqx_bridge_timescale: :permanent,
emqx_oracle: :permanent,
emqx_bridge_oracle: :permanent,
emqx_ee_schema_registry: :permanent emqx_ee_schema_registry: :permanent
], ],
else: [] else: []

View File

@ -94,6 +94,8 @@ is_community_umbrella_app("apps/emqx_bridge_redis") -> false;
is_community_umbrella_app("apps/emqx_bridge_rocketmq") -> false; is_community_umbrella_app("apps/emqx_bridge_rocketmq") -> false;
is_community_umbrella_app("apps/emqx_bridge_tdengine") -> false; is_community_umbrella_app("apps/emqx_bridge_tdengine") -> false;
is_community_umbrella_app("apps/emqx_bridge_timescale") -> false; is_community_umbrella_app("apps/emqx_bridge_timescale") -> false;
is_community_umbrella_app("apps/emqx_bridge_oracle") -> false;
is_community_umbrella_app("apps/emqx_oracle") -> false;
is_community_umbrella_app(_) -> true. is_community_umbrella_app(_) -> true.
is_jq_supported() -> is_jq_supported() ->
@ -470,6 +472,8 @@ relx_apps_per_edition(ee) ->
emqx_bridge_rocketmq, emqx_bridge_rocketmq,
emqx_bridge_tdengine, emqx_bridge_tdengine,
emqx_bridge_timescale, emqx_bridge_timescale,
emqx_oracle,
emqx_bridge_oracle,
emqx_ee_schema_registry emqx_ee_schema_registry
]; ];
relx_apps_per_edition(ce) -> relx_apps_per_edition(ce) ->

View File

@ -0,0 +1,52 @@
emqx_bridge_oracle {
local_topic {
desc = "The MQTT topic filter to be forwarded to Oracle Database. All MQTT 'PUBLISH' messages with the topic"
" matching the local_topic will be forwarded.</br>"
"NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is"
" configured, then both the data got from the rule and the MQTT messages that match local_topic"
" will be forwarded."
label = "Local Topic"
}
sql_template {
desc = "SQL Template. The template string can contain placeholders"
" for message metadata and payload field. The placeholders are inserted"
" without any checking and special formatting, so it is important to"
" ensure that the inserted values are formatted and escaped correctly."
label = "SQL Template"
}
server {
desc = "The IPv4 or IPv6 address or the hostname to connect to.<br/>"
"A host entry has the following form: `Host[:Port]`.<br/>"
"The Oracle Database default port 1521 is used if `[:Port]` is not specified."
label = "Server Host"
}
sid {
desc = "Sid for Oracle Database"
label = "Oracle Database Sid."
}
config_enable {
desc = "Enable or disable this bridge"
label = "Enable Or Disable Bridge"
}
desc_config {
desc = "Configuration for an Oracle Database bridge."
label = "Oracle Database Bridge Configuration"
}
desc_type {
desc = "The Bridge Type"
label = "Bridge Type"
}
desc_name {
desc = "Bridge name."
label = "Bridge Name"
}
}

View File

@ -0,0 +1,15 @@
emqx_oracle {
server {
desc = "The IPv4 or IPv6 address or the hostname to connect to.<br/>"
"A host entry has the following form: `Host[:Port]`.<br/>"
"The Oracle Database default port 1521 is used if `[:Port]` is not specified."
label = "Server Host"
}
sid {
desc = "Sid for Oracle Database."
label = "Oracle Database Sid"
}
}

View File

@ -0,0 +1,51 @@
emqx_bridge_oracle {
local_topic {
desc = "发送到 'local_topic' 的消息都会转发到 Oracle Database。 </br>"
"注意:如果这个 Bridge 被用作规则EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。"
label = "本地 Topic"
}
sql_template {
desc = "SQL模板。模板字符串可以包含消息元数据和有效载荷字段的占位符。占位符"
"的插入不需要任何检查和特殊格式化,因此必须确保插入的数值格式化和转义正确。模板字符串可以包含占位符"
"模板字符串可以包含消息元数据和有效载荷字段的占位符。这些占位符被插入"
"所以必须确保插入的值的格式正确。因此,确保插入的值格式化和转义正确是非常重要的。模板字符串可以包含占位符"
"模板字符串可以包含消息元数据和有效载荷字段的占位符。这些占位符被插入"
"所以必须确保插入的值的格式正确。确保插入的值被正确地格式化和转义。"
label = "SQL 模板"
}
server {
desc = "将要连接的 IPv4 或 IPv6 地址,或者主机名。<br/>"
"主机名具有以下形式:`Host[:Port]`。<br/>"
"如果未指定 `[:Port]`,则使用 Oracle Database 默认端口 1521。"
label = "服务器地址"
}
sid {
desc = "Oracle Database Sid 名称"
label = "Oracle Database Sid"
}
config_enable {
desc = "启用/禁用桥接"
label = "启用/禁用桥接"
}
desc_config {
desc = "Oracle Database 桥接配置"
label = "Oracle Database 桥接配置"
}
desc_type {
desc = "Bridge 类型"
label = "桥接类型"
}
desc_name {
desc = "桥接名字"
label = "桥接名字"
}
}

View File

@ -0,0 +1,15 @@
emqx_oracle {
server {
desc = "将要连接的 IPv4 或 IPv6 地址,或者主机名。<br/>"
"主机名具有以下形式:`Host[:Port]`。<br/>"
"如果未指定 `[:Port]`,则使用 Oracle Database 默认端口 1521。"
label = "服务器地址"
}
sid {
desc = "Oracle Database Sid 名称"
label = "Oracle Database Sid"
}
}

View File

@ -193,6 +193,9 @@ for dep in ${CT_DEPS}; do
;; ;;
pulsar) pulsar)
FILES+=( '.ci/docker-compose-file/docker-compose-pulsar.yaml' ) FILES+=( '.ci/docker-compose-file/docker-compose-pulsar.yaml' )
;;
oracle)
FILES+=( '.ci/docker-compose-file/docker-compose-oracle.yaml' )
;; ;;
*) *)
echo "unknown_ct_dependency $dep" echo "unknown_ct_dependency $dep"