diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index b495fa671..2216c3d05 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -92,6 +92,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_kinesis_action_info, emqx_bridge_matrix_action_info, emqx_bridge_mongodb_action_info, + emqx_bridge_oracle_action_info, emqx_bridge_influxdb_action_info, emqx_bridge_cassandra_action_info, emqx_bridge_mysql_action_info, diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src index d68c6ca9a..39b606d5f 100644 --- a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_oracle, [ {description, "EMQX Enterprise Oracle Database Bridge"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [ kernel, @@ -8,7 +8,7 @@ emqx_resource, emqx_oracle ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_oracle_action_info]}]}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl index 15b2be575..532c01b78 100644 --- a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl @@ -9,7 +9,9 @@ -include_lib("emqx_resource/include/emqx_resource.hrl"). -export([ - conn_bridge_examples/1 + bridge_v2_examples/1, + conn_bridge_examples/1, + connector_examples/1 ]). -export([ @@ -20,22 +22,25 @@ config_validator/1 ]). +-define(CONNECTOR_TYPE, oracle). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). + -define(DEFAULT_SQL, << "insert into t_mqtt_msgs(msgid, topic, qos, payload) " "values (${id}, ${topic}, ${qos}, ${payload})" >>). -conn_bridge_examples(Method) -> +conn_bridge_examples(_Method) -> [ #{ <<"oracle">> => #{ summary => <<"Oracle Database Bridge">>, - value => values(Method) + value => conn_bridge_examples_values() } } ]. -values(_Method) -> +conn_bridge_examples_values() -> #{ enable => true, type => oracle, @@ -58,6 +63,54 @@ values(_Method) -> } }. +connector_examples(Method) -> + [ + #{ + <<"oracle">> => + #{ + summary => <<"Oracle Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_values() + ) + } + } + ]. + +connector_values() -> + #{ + <<"username">> => <<"system">>, + <<"password">> => <<"oracle">>, + <<"server">> => <<"127.0.0.1:1521">>, + <<"service_name">> => <<"XE">>, + <<"sid">> => <<"XE">>, + <<"pool_size">> => 8, + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_timeout">> => <<"5s">> + } + }. + +bridge_v2_examples(Method) -> + [ + #{ + <<"oracle">> => + #{ + summary => <<"Oracle Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. + +action_values() -> + #{ + parameters => #{ + <<"sql">> => ?DEFAULT_SQL + } + }. + %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions @@ -65,6 +118,55 @@ namespace() -> "bridge_oracle". roots() -> []. +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_connector_schema:api_fields( + Field, + ?CONNECTOR_TYPE, + fields("config_connector") + ); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(oracle_action)); +fields(action) -> + {?ACTION_TYPE, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(?MODULE, oracle_action)), + #{ + desc => <<"Oracle Action Config">>, + required => false + } + )}; +fields(oracle_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + hoconsc:mk( + hoconsc:ref(?MODULE, action_parameters), + #{ + required => true, + desc => ?DESC("action_parameters") + } + ) + ); +fields(action_parameters) -> + [ + {sql, + hoconsc:mk( + binary(), + #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + )} + ]; +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + fields(connector_fields) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); fields("config") -> [ {enable, @@ -83,8 +185,10 @@ fields("config") -> #{desc => ?DESC("local_topic"), default => undefined} )} ] ++ emqx_resource_schema:fields("resource_opts") ++ - (emqx_oracle_schema:fields(config) -- - emqx_connector_schema_lib:prepare_statement_fields()); + fields(connector_fields); +fields(connector_fields) -> + (emqx_oracle_schema:fields(config) -- + emqx_connector_schema_lib:prepare_statement_fields()); fields("post") -> fields("post", oracle); fields("put") -> @@ -97,6 +201,16 @@ fields("post", Type) -> desc("config") -> ?DESC("desc_config"); +desc("creation_opts") -> + ?DESC(emqx_resource_schema, "creation_opts"); +desc("config_connector") -> + ?DESC("config_connector"); +desc(oracle_action) -> + ?DESC("oracle_action"); +desc(action_parameters) -> + ?DESC("action_parameters"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(_) -> undefined. @@ -116,5 +230,5 @@ config_validator(#{<<"server">> := Server} = Config) when not is_map_key(<<"service_name">>, Config) -> {error, "neither SID nor Service Name was set"}; -config_validator(_) -> +config_validator(_Config) -> ok. diff --git a/apps/emqx_bridge_oracle/src/emqx_bridge_oracle_action_info.erl b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle_action_info.erl new file mode 100644 index 000000000..561b798bd --- /dev/null +++ b/apps/emqx_bridge_oracle/src/emqx_bridge_oracle_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_oracle_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> oracle. + +action_type_name() -> oracle. + +connector_type_name() -> oracle. + +schema_module() -> emqx_bridge_oracle. diff --git a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl index 878ae2e1d..608d81bec 100644 --- a/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl +++ b/apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl @@ -267,7 +267,12 @@ parse_and_check(ConfigString, Name) -> resource_id(Config) -> Type = ?BRIDGE_TYPE_BIN, Name = ?config(oracle_name, Config), - emqx_bridge_resource:resource_id(Type, Name). + <<"connector:", Type/binary, ":", Name/binary>>. + +action_id(Config) -> + Type = ?BRIDGE_TYPE_BIN, + Name = ?config(oracle_name, Config), + emqx_bridge_v2:id(Type, Name). bridge_id(Config) -> Type = ?BRIDGE_TYPE_BIN, @@ -378,6 +383,7 @@ create_rule_and_action_http(Config) -> t_sync_query(Config) -> ResourceId = resource_id(Config), + Name = ?config(oracle_name, Config), ?check_trace( begin reset_table(Config), @@ -387,6 +393,18 @@ t_sync_query(Config) -> _Attempts = 20, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), + ?retry( + _Sleep1 = 1_000, + _Attempts1 = 30, + ?assertMatch( + #{status := connected}, + emqx_bridge_v2:health_check( + ?BRIDGE_TYPE_BIN, + Name + ) + ) + ), + ActionId = action_id(Config), MsgId = erlang:unique_integer(), Params = #{ topic => ?config(mqtt_topic, Config), @@ -394,7 +412,7 @@ t_sync_query(Config) -> payload => ?config(oracle_name, Config), retain => true }, - Message = {send_message, Params}, + Message = {ActionId, Params}, ?assertEqual( {ok, [{affected_rows, 1}]}, emqx_resource:simple_sync_query(ResourceId, Message) ), @@ -409,7 +427,7 @@ t_batch_sync_query(Config) -> ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), ResourceId = resource_id(Config), - BridgeId = bridge_id(Config), + Name = ?config(oracle_name, Config), ?check_trace( begin reset_table(Config), @@ -419,6 +437,17 @@ t_batch_sync_query(Config) -> _Attempts = 30, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), + ?retry( + _Sleep = 1_000, + _Attempts = 30, + ?assertMatch( + #{status := connected}, + emqx_bridge_v2:health_check( + ?BRIDGE_TYPE_BIN, + Name + ) + ) + ), MsgId = erlang:unique_integer(), Params = #{ topic => ?config(mqtt_topic, Config), @@ -431,9 +460,9 @@ t_batch_sync_query(Config) -> % be sent async as callback_mode is set to async_if_possible. emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> ct:sleep(1000), - emqx_bridge:send_message(BridgeId, Params), - emqx_bridge:send_message(BridgeId, Params), - emqx_bridge:send_message(BridgeId, Params), + emqx_bridge_v2:send_message(?BRIDGE_TYPE_BIN, Name, Params, #{}), + emqx_bridge_v2:send_message(?BRIDGE_TYPE_BIN, Name, Params, #{}), + emqx_bridge_v2:send_message(?BRIDGE_TYPE_BIN, Name, Params, #{}), ok end), % Wait for reconnection. @@ -442,6 +471,17 @@ t_batch_sync_query(Config) -> _Attempts = 30, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), + ?retry( + _Sleep = 1_000, + _Attempts = 30, + ?assertMatch( + #{status := connected}, + emqx_bridge_v2:health_check( + ?BRIDGE_TYPE_BIN, + Name + ) + ) + ), ?retry( _Sleep = 1_000, _Attempts = 30, @@ -506,6 +546,17 @@ t_start_stop(Config) -> _Attempts = 20, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + #{status := connected}, + emqx_bridge_v2:health_check( + ?BRIDGE_TYPE_BIN, + OracleName + ) + ) + ), %% Check that the bridge probe API doesn't leak atoms. ProbeRes0 = probe_bridge_api( @@ -554,6 +605,7 @@ t_probe_with_nested_tokens(Config) -> t_message_with_nested_tokens(Config) -> BridgeId = bridge_id(Config), ResourceId = resource_id(Config), + Name = ?config(oracle_name, Config), reset_table(Config), ?assertMatch( {ok, _}, @@ -568,6 +620,17 @@ t_message_with_nested_tokens(Config) -> _Attempts = 20, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertMatch( + #{status := connected}, + emqx_bridge_v2:health_check( + ?BRIDGE_TYPE_BIN, + Name + ) + ) + ), MsgId = erlang:unique_integer(), Data = binary_to_list(?config(oracle_name, Config)), Params = #{ @@ -600,6 +663,7 @@ t_on_get_status(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), + Name = ?config(oracle_name, Config), ResourceId = resource_id(Config), reset_table(Config), ?assertMatch({ok, _}, create_bridge(Config)), @@ -612,13 +676,23 @@ t_on_get_status(Config) -> ), emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> ct:sleep(500), - ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)) + ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), + ?assertMatch( + #{status := disconnected}, + emqx_bridge_v2:health_check(?BRIDGE_TYPE_BIN, Name) + ) end), %% Check that it recovers itself. ?retry( _Sleep = 1_000, _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + begin + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + ?assertMatch( + #{status := connected}, + emqx_bridge_v2:health_check(?BRIDGE_TYPE_BIN, Name) + ) + end ), ok. @@ -664,6 +738,7 @@ t_missing_table(Config) -> begin drop_table_if_exists(Config), ?assertMatch({ok, _}, create_bridge_api(Config)), + ActionId = emqx_bridge_v2:id(?BRIDGE_TYPE_BIN, ?config(oracle_name, Config)), ?retry( _Sleep = 1_000, _Attempts = 20, @@ -679,7 +754,7 @@ t_missing_table(Config) -> payload => ?config(oracle_name, Config), retain => true }, - Message = {send_message, Params}, + Message = {ActionId, Params}, ?assertMatch( {error, {resource_error, #{reason := not_connected}}}, emqx_resource:simple_sync_query(ResourceId, Message) @@ -698,6 +773,7 @@ t_table_removed(Config) -> begin reset_table(Config), ?assertMatch({ok, _}, create_bridge_api(Config)), + ActionId = emqx_bridge_v2:id(?BRIDGE_TYPE_BIN, ?config(oracle_name, Config)), ?retry( _Sleep = 1_000, _Attempts = 20, @@ -711,7 +787,7 @@ t_table_removed(Config) -> payload => ?config(oracle_name, Config), retain => true }, - Message = {send_message, Params}, + Message = {ActionId, Params}, ?assertEqual( {error, {unrecoverable_error, {942, "ORA-00942: table or view does not exist\n"}}}, emqx_resource:simple_sync_query(ResourceId, Message) diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 95c9d2991..dd069e4e6 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -36,6 +36,8 @@ resource_type(matrix) -> emqx_postgresql; resource_type(mongodb) -> emqx_bridge_mongodb_connector; +resource_type(oracle) -> + emqx_oracle; resource_type(influxdb) -> emqx_bridge_influxdb_connector; resource_type(cassandra) -> @@ -140,6 +142,15 @@ connector_structs() -> required => false } )}, + {oracle, + mk( + hoconsc:map(name, ref(emqx_bridge_oracle, "config_connector")), + #{ + desc => <<"Oracle Connector Config">>, + required => false, + validator => fun emqx_bridge_oracle:config_validator/1 + } + )}, {influxdb, mk( hoconsc:map(name, ref(emqx_bridge_influxdb, "config_connector")), @@ -247,6 +258,7 @@ schema_modules() -> emqx_bridge_kinesis, emqx_bridge_matrix, emqx_bridge_mongodb, + emqx_bridge_oracle, emqx_bridge_influxdb, emqx_bridge_cassandra, emqx_bridge_mysql, @@ -280,6 +292,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"), api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"), api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"), + api_ref(emqx_bridge_oracle, <<"oracle">>, Method ++ "_connector"), api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"), api_ref(emqx_bridge_cassandra, <<"cassandra">>, Method ++ "_connector"), api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index b7c4d9f74..ea589c15c 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -137,6 +137,8 @@ connector_type_to_bridge_types(matrix) -> [matrix]; connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; +connector_type_to_bridge_types(oracle) -> + [oracle]; connector_type_to_bridge_types(influxdb) -> [influxdb, influxdb_api_v1, influxdb_api_v2]; connector_type_to_bridge_types(cassandra) -> diff --git a/apps/emqx_oracle/src/emqx_oracle.app.src b/apps/emqx_oracle/src/emqx_oracle.app.src index 7740517ca..7cd4d4d0d 100644 --- a/apps/emqx_oracle/src/emqx_oracle.app.src +++ b/apps/emqx_oracle/src/emqx_oracle.app.src @@ -1,6 +1,6 @@ {application, emqx_oracle, [ {description, "EMQX Enterprise Oracle Database Connector"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index cfc67aa53..e1ac846fa 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -6,6 +6,7 @@ -behaviour(emqx_resource). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -24,7 +25,11 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). %% callbacks for ecpool @@ -103,12 +108,13 @@ on_start( {app_name, "EMQX Data To Oracle Database Action"} ], PoolName = InstId, - Prepares = parse_prepare_sql(Config), - InitState = #{pool_name => PoolName}, - State = maps:merge(InitState, Prepares), + State = #{ + pool_name => PoolName, + installed_channels => #{} + }, case emqx_resource_pool:start(InstId, ?MODULE, Options) of ok -> - {ok, init_prepare(State)}; + {ok, State}; {error, Reason} -> ?tp( oracle_connector_start_failed, @@ -125,13 +131,105 @@ on_stop(InstId, #{pool_name := PoolName}) -> ?tp(oracle_bridge_stopped, #{instance_id => InstId}), emqx_resource_pool:stop(PoolName). +on_add_channel( + _InstId, + #{ + installed_channels := InstalledChannels, + pool_name := PoolName + } = OldState, + ChannelId, + ChannelConfig +) -> + {ok, ChannelState} = create_channel_state(ChannelId, PoolName, ChannelConfig), + NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +create_channel_state( + ChannelId, + PoolName, + #{parameters := Conf} = _ChannelConfig +) -> + State0 = parse_prepare_sql(ChannelId, Conf), + State1 = init_prepare(PoolName, State0), + {ok, State1}. + +on_remove_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId +) -> + NewInstalledChannels = maps:remove(ChannelId, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +on_get_channel_status( + _ResId, + ChannelId, + #{ + pool_name := PoolName, + installed_channels := Channels + } = _State +) -> + State = maps:get(ChannelId, Channels), + case do_check_prepares(ChannelId, PoolName, State) of + ok -> + ?status_connected; + {error, undefined_table} -> + %% return new state indicating that we are connected but the target table is not created + {?status_disconnected, {unhealthy_target, ?UNHEALTHY_TARGET_MSG}}; + {error, _Reason} -> + %% do not log error, it is logged in prepare_sql_to_conn + connecting + end. +% #{stream_name := StreamName} = maps:get(ChannelId, Channels), +% case +% emqx_resource_pool:health_check_workers( +% PoolName, +% {emqx_bridge_kinesis_connector_client, connection_status, [StreamName]}, +% ?HEALTH_CHECK_TIMEOUT, +% #{return_values => true} +% ) +% of +% {ok, Values} -> +% AllOk = lists:all(fun(S) -> S =:= {ok, ?status_connected} end, Values), +% case AllOk of +% true -> +% ?status_connected; +% false -> +% Unhealthy = lists:any(fun(S) -> S =:= {error, unhealthy_target} end, Values), +% case Unhealthy of +% true -> {?status_disconnected, {unhealthy_target, ?TOPIC_MESSAGE}}; +% false -> ?status_disconnected +% end +% end; +% {error, Reason} -> +% ?SLOG(error, #{ +% msg => "kinesis_producer_get_status_failed", +% state => State, +% reason => Reason +% }), +% ?status_disconnected +% end. + +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). + on_query(InstId, {TypeOrKey, NameOrSQL}, #{pool_name := _PoolName} = State) -> on_query(InstId, {TypeOrKey, NameOrSQL, []}, State); on_query( InstId, {TypeOrKey, NameOrSQL, Params}, - #{pool_name := PoolName} = State + #{ + pool_name := PoolName, + installed_channels := Channels + } = _ConnectorState ) -> + State = maps:get(TypeOrKey, Channels, #{}), ?SLOG(debug, #{ msg => "oracle_connector_received_sql_query", connector => InstId, @@ -147,11 +245,19 @@ on_query( on_batch_query( InstId, BatchReq, - #{pool_name := PoolName, params_tokens := Tokens, prepare_sql := Sts} = State + #{ + pool_name := PoolName, + installed_channels := Channels + } = ConnectorState ) -> case BatchReq of [{Key, _} = Request | _] -> BinKey = to_bin(Key), + State = maps:get(BinKey, Channels), + #{ + params_tokens := Tokens, + prepare_sql := Sts + } = State, case maps:get(BinKey, Tokens, undefined) of undefined -> Log = #{ @@ -179,7 +285,7 @@ on_batch_query( Log = #{ connector => InstId, request => BatchReq, - state => State, + state => ConnectorState, msg => "invalid_request" }, ?SLOG(error, Log), @@ -232,36 +338,35 @@ on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL, Data) -> Result end. -on_get_status(_InstId, #{pool_name := Pool} = State) -> +on_get_status(_InstId, #{pool_name := Pool} = _State) -> case emqx_resource_pool:health_check_workers(Pool, fun ?MODULE:do_get_status/1) of true -> - case do_check_prepares(State) of - ok -> - connected; - {ok, NState} -> - %% return new state with prepared statements - {connected, NState}; - {error, {undefined_table, NState}} -> - %% return new state indicating that we are connected but the target table is not created - {disconnected, NState, {unhealthy_target, ?UNHEALTHY_TARGET_MSG}}; - {error, _Reason} -> - %% do not log error, it is logged in prepare_sql_to_conn - connecting - end; + ?status_connected; false -> - disconnected + ?status_disconnected end. do_get_status(Conn) -> ok == element(1, jamdb_oracle:sql_query(Conn, "select 1 from dual")). do_check_prepares( + _ChannelId, + _PoolName, #{ - pool_name := PoolName, - prepare_sql := #{<<"send_message">> := SQL}, - params_tokens := #{<<"send_message">> := Tokens} - } = State + prepare_sql := {error, _Prepares} + } = _State ) -> + {error, undefined_table}; +do_check_prepares( + ChannelId, + PoolName, + State +) -> + #{ + prepare_sql := #{ChannelId := SQL}, + params_tokens := #{ChannelId := Tokens} + } = State, + % it's already connected. Verify if target table still exists Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], lists:foldl( @@ -270,7 +375,7 @@ do_check_prepares( case ecpool_worker:client(WorkerPid) of {ok, Conn} -> case check_if_table_exists(Conn, SQL, Tokens) of - {error, undefined_table} -> {error, {undefined_table, State}}; + {error, undefined_table} -> {error, undefined_table}; _ -> ok end; _ -> @@ -281,20 +386,17 @@ do_check_prepares( end, ok, Workers - ); -do_check_prepares( - State = #{pool_name := PoolName, prepare_sql := {error, Prepares}, params_tokens := TokensMap} -) -> - case prepare_sql(Prepares, PoolName, TokensMap) of - %% remove the error - {ok, Sts} -> - {ok, State#{prepare_sql => Sts}}; - {error, undefined_table} -> - %% indicate the error - {error, {undefined_table, State#{prepare_sql => {error, Prepares}}}}; - {error, _Reason} = Error -> - Error - end. + ). +% case prepare_sql(Prepares, PoolName, TokensMap) of +% %% remove the error +% {ok, Sts} -> +% {ok, State#{prepare_sql => Sts}}; +% {error, undefined_table} -> +% %% indicate the error +% {error, {undefined_table, State#{prepare_sql => {error, Prepares}}}}; +% {error, _Reason} = Error -> +% Error +% end. %% =================================================================== @@ -328,13 +430,13 @@ execute_batch(Conn, SQL, ParamsList) -> ?tp(oracle_batch_query, #{conn => Conn, sql => SQL, params => ParamsList, result => Ret}), handle_result(Ret). -parse_prepare_sql(Config) -> +parse_prepare_sql(ChannelId, Config) -> SQL = case maps:get(prepare_statement, Config, undefined) of undefined -> case maps:get(sql, Config, undefined) of undefined -> #{}; - Template -> #{<<"send_message">> => Template} + Template -> #{ChannelId => Template} end; Any -> Any @@ -352,7 +454,7 @@ parse_prepare_sql([], Prepares, Tokens) -> params_tokens => Tokens }. -init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName, params_tokens := TokensMap}) -> +init_prepare(PoolName, State = #{prepare_sql := Prepares, params_tokens := TokensMap}) -> case prepare_sql(Prepares, PoolName, TokensMap) of {ok, Sts} -> State#{prepare_sql := Sts}; diff --git a/rel/i18n/emqx_bridge_oracle.hocon b/rel/i18n/emqx_bridge_oracle.hocon index bcf41ea2c..607976018 100644 --- a/rel/i18n/emqx_bridge_oracle.hocon +++ b/rel/i18n/emqx_bridge_oracle.hocon @@ -54,4 +54,19 @@ emqx_bridge_oracle { label = "Bridge Name" } + action_parameters { + desc = "Action specific configuration." + label = "Action" + } + + oracle_action { + desc = "Configuration for Oracle Action" + label = "Oracle Action Configuration" + } + + config_connector { + desc = "Configuration for an Oracle Client." + label = "Oracle Client Configuration" + } + }