From 9143d5994d412525fe8fee3bce7447bc4e225053 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 23 Feb 2024 15:10:19 +0800 Subject: [PATCH] feat: refactor MS SQL Server bridge to connector and action --- apps/emqx_bridge/src/emqx_action_info.erl | 1 + .../src/emqx_bridge_sqlserver.app.src | 1 + .../src/emqx_bridge_sqlserver.erl | 101 ++++++++++++++++ .../src/emqx_bridge_sqlserver_action_info.erl | 22 ++++ .../src/emqx_bridge_sqlserver_connector.erl | 114 +++++++++++++++--- .../src/schema/emqx_connector_ee_schema.erl | 12 ++ .../src/schema/emqx_connector_schema.erl | 2 + apps/emqx_management/src/emqx_mgmt_auth.erl | 2 +- apps/emqx_resource/src/emqx_resource.erl | 2 +- rel/i18n/emqx_bridge_sqlserver.hocon | 18 +++ 10 files changed, 255 insertions(+), 20 deletions(-) create mode 100644 apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_action_info.erl diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 4d47f1e09..c6282fc18 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -105,6 +105,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_mysql_action_info, emqx_bridge_pgsql_action_info, emqx_bridge_syskeeper_action_info, + emqx_bridge_sqlserver_action_info, emqx_bridge_timescale_action_info, emqx_bridge_redis_action_info, emqx_bridge_iotdb_action_info, diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src index bddf212e3..1340161a2 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src @@ -3,6 +3,7 @@ {vsn, "0.1.6"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, odbc]}, + {env, [{emqx_action_info_modules, [emqx_bridge_sqlserver_action_info]}]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl index dde0d2c69..461ef2c85 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl @@ -11,6 +11,8 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ + bridge_v2_examples/1, + connector_examples/1, conn_bridge_examples/1 ]). @@ -21,6 +23,9 @@ desc/1 ]). +-define(CONNECTOR_TYPE, sqlserver). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). + -define(DEFAULT_SQL, << "insert into t_mqtt_msg(msgid, topic, qos, payload) " "values ( ${id}, ${topic}, ${qos}, ${payload} )" @@ -28,6 +33,9 @@ -define(DEFAULT_DRIVER, <<"ms-sql">>). +%% ------------------------------------------------------------------------------------------------- +%% api. + conn_bridge_examples(Method) -> [ #{ @@ -65,12 +73,76 @@ values(post) -> values(put) -> values(post). +%% ==================== +%% Bridge V2: Connector + Action + +connector_examples(Method) -> + [ + #{ + <<"sqlserver">> => + #{ + summary => <<"Microsoft SQL Server Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_values() + ) + } + } + ]. + +connector_values() -> + #{ + server => <<"127.0.0.1:1433">>, + database => <<"test">>, + pool_size => 8, + username => <<"sa">>, + password => <<"******">>, + resource_opts => #{health_check_interval => <<"20s">>} + }. + +bridge_v2_examples(Method) -> + [ + #{ + <<"sqlserver">> => + #{ + summary => <<"Microsoft SQL Server Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. + +action_values() -> + #{parameters => #{sql => ?DEFAULT_SQL}}. + %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions namespace() -> "bridge_sqlserver". roots() -> []. +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(sqlserver_action)); +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_connector_schema:api_fields( + Field, + ?CONNECTOR_TYPE, + fields("config_connector") -- emqx_connector_schema:common_fields() + ); +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + emqx_bridge_sqlserver_connector:fields(config) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, @@ -97,6 +169,27 @@ fields("config") -> ] ++ (emqx_bridge_sqlserver_connector:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields()); +fields(action) -> + {?ACTION_TYPE, + mk( + hoconsc:map(name, ref(?MODULE, sqlserver_action)), + #{desc => ?DESC("sqlserver_action"), required => false} + )}; +fields(sqlserver_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + mk( + ref(?MODULE, action_parameters), + #{required => true, desc => ?DESC(action_parameters)} + ) + ); +fields(action_parameters) -> + [ + {sql, + mk( + binary(), + #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + )} + ]; fields("creation_opts") -> emqx_resource_schema:fields("creation_opts"); fields("post") -> @@ -115,6 +208,14 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for Microsoft SQL Server using `", string:to_upper(Method), "` method."]; desc("creation_opts" = Name) -> emqx_resource_schema:desc(Name); +desc("config_connector") -> + ?DESC("config_connector"); +desc(sqlserver_action) -> + ?DESC("sqlserver_action"); +desc(action_parameters) -> + ?DESC("action_parameters"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(_) -> undefined. diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_action_info.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_action_info.erl new file mode 100644 index 000000000..ad8d69081 --- /dev/null +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_sqlserver_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> sqlserver. + +action_type_name() -> sqlserver. + +connector_type_name() -> sqlserver. + +schema_module() -> emqx_bridge_sqlserver. diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 566a9013b..13f03fb55 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -35,7 +35,11 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). %% callbacks for ecpool @@ -172,7 +176,7 @@ server() -> callback_mode() -> always_sync. on_start( - ResourceId = PoolName, + InstanceId = PoolName, #{ server := Server, username := Username, @@ -184,7 +188,7 @@ on_start( ) -> ?SLOG(info, #{ msg => "starting_sqlserver_connector", - connector => ResourceId, + connector => InstanceId, config => emqx_utils:redact(Config) }), @@ -199,7 +203,8 @@ on_start( ok end, - Options = [ + %% odbc connection string required + ConnectOptions = [ {server, to_bin(Server)}, {username, Username}, {password, maps:get(password, Config, emqx_secret:wrap(""))}, @@ -209,12 +214,12 @@ on_start( ], State = #{ - %% also ResourceId + %% also InstanceId pool_name => PoolName, - sql_templates => parse_sql_template(Config), + installed_channels => #{}, resource_opts => ResourceOpts }, - case emqx_resource_pool:start(PoolName, ?MODULE, Options) of + case emqx_resource_pool:start(PoolName, ?MODULE, ConnectOptions) of ok -> {ok, State}; {error, Reason} -> @@ -225,12 +230,61 @@ on_start( {error, Reason} end. -on_stop(ResourceId, _State) -> +on_add_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId, + ChannelConfig +) -> + {ok, ChannelState} = create_channel_state(ChannelConfig), + NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +create_channel_state( + #{parameters := Conf} = _ChannelConfig +) -> + State = #{sql_templates => parse_sql_template(Conf)}, + {ok, State}. + +on_remove_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId +) -> + NewInstalledChannels = maps:remove(ChannelId, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +on_get_channel_status( + InstanceId, + ChannelId, + #{installed_channels := Channels} = State +) -> + case maps:find(ChannelId, Channels) of + {ok, _} -> on_get_status(InstanceId, State); + error -> ?status_disconnected + end. + +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). + +on_stop(InstanceId, _State) -> + ?tp( + sqlserver_connector_on_stop, + #{instance_id => InstanceId} + ), ?SLOG(info, #{ msg => "stopping_sqlserver_connector", - connector => ResourceId + connector => InstanceId }), - emqx_resource_pool:stop(ResourceId). + emqx_resource_pool:stop(InstanceId). -spec on_query( resource_id(), @@ -273,8 +327,8 @@ on_get_status(_InstanceId, #{pool_name := PoolName} = _State) -> ), status_result(Health). -status_result(_Status = true) -> connected; -status_result(_Status = false) -> connecting. +status_result(_Status = true) -> ?status_connected; +status_result(_Status = false) -> ?status_connecting. %% TODO: %% case for disconnected @@ -296,7 +350,7 @@ do_get_status(Conn) -> end. %%==================================================================== -%% Internal Helper fns +%% Internal Functions %%==================================================================== %% TODO && FIXME: @@ -341,7 +395,10 @@ do_query( ResourceId, Query, ApplyMode, - #{pool_name := PoolName, sql_templates := Templates} = State + #{ + pool_name := PoolName, + installed_channels := Channels + } = State ) -> ?TRACE( "SINGLE_QUERY_SYNC", @@ -349,15 +406,19 @@ do_query( #{query => Query, connector => ResourceId, state => State} ), + ChannelId = get_channel_id(Query), + QueryTuple = get_query_tuple(Query), + #{sql_templates := Templates} = _ChannelState = maps:get(ChannelId, Channels), + %% only insert sql statement for single query and batch query - case apply_template(Query, Templates) of + case apply_template(QueryTuple, Templates) of {?ACTION_SEND_MESSAGE, SQL} -> Result = ecpool:pick_and_do( PoolName, {?MODULE, worker_do_insert, [SQL, State]}, ApplyMode ); - Query -> + QueryTuple -> Result = {error, {unrecoverable_error, invalid_query}}; _ -> Result = {error, {unrecoverable_error, failed_to_apply_sql_template}} @@ -426,8 +487,22 @@ execute(Conn, SQL) -> execute(Conn, SQL, Timeout) -> odbc:sql_query(Conn, str(SQL), Timeout). -to_bin(List) when is_list(List) -> - unicode:characters_to_binary(List, utf8). +get_channel_id([{ChannelId, _Req} | _]) -> + ChannelId; +get_channel_id({ChannelId, _Req}) -> + ChannelId. + +get_query_tuple({_ChannelId, {QueryType, Data}} = _Query) -> + {QueryType, Data}; +get_query_tuple({_ChannelId, Data} = _Query) -> + {send_message, Data}; +get_query_tuple([{_ChannelId, {_QueryType, _Data}} | _]) -> + error( + {unrecoverable_error, + {invalid_request, <<"The only query type that supports batching is insert.">>}} + ); +get_query_tuple([InsertQuery | _]) -> + get_query_tuple(InsertQuery). %% for bridge data to sql server parse_sql_template(Config) -> @@ -506,3 +581,6 @@ proc_batch_sql(BatchReqs, BatchInserts, Tokens) -> ]) ), <>. + +to_bin(List) when is_list(List) -> + unicode:characters_to_binary(List, utf8). diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 9079bc74e..66dafd119 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -60,6 +60,8 @@ resource_type(syskeeper_forwarder) -> emqx_bridge_syskeeper_connector; resource_type(syskeeper_proxy) -> emqx_bridge_syskeeper_proxy_server; +resource_type(sqlserver) -> + emqx_bridge_sqlserver; resource_type(timescale) -> emqx_postgresql; resource_type(redis) -> @@ -283,6 +285,14 @@ connector_structs() -> required => false } )}, + {sqlserver, + mk( + hoconsc:map(name, ref(emqx_bridge_sqlserver, "config_connector")), + #{ + desc => <<"Microsoft SQL Server Connector Config">>, + required => false + } + )}, {timescale, mk( hoconsc:map(name, ref(emqx_bridge_timescale, "config_connector")), @@ -377,6 +387,7 @@ schema_modules() -> emqx_bridge_mysql, emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_proxy, + emqx_bridge_sqlserver, emqx_bridge_timescale, emqx_postgresql_connector_schema, emqx_bridge_redis_schema, @@ -427,6 +438,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), + api_ref(emqx_bridge_sqlserver, <<"sqlserver">>, Method ++ "_connector"), api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index ee034bd0a..d68514c41 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -166,6 +166,8 @@ connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder]; connector_type_to_bridge_types(syskeeper_proxy) -> []; +connector_type_to_bridge_types(sqlserver) -> + [sqlserver]; connector_type_to_bridge_types(timescale) -> [timescale]; connector_type_to_bridge_types(iotdb) -> diff --git a/apps/emqx_management/src/emqx_mgmt_auth.erl b/apps/emqx_management/src/emqx_mgmt_auth.erl index 2a75db9ea..e5788fea3 100644 --- a/apps/emqx_management/src/emqx_mgmt_auth.erl +++ b/apps/emqx_management/src/emqx_mgmt_auth.erl @@ -311,7 +311,7 @@ maybe_cleanup_api_key(#?APP{name = Name, api_key = ApiKey}) -> %% Note for EMQX-11844: %% emqx.conf has the highest priority - %% if there is a key conflict, delete the old one and keep the key which from the bootstrap filex + %% if there is a key conflict, delete the old one and keep the key which from the bootstrap file ?SLOG(info, #{ msg => "duplicated_apikey_detected", info => <<"Delete duplicated apikeys and write a new one from bootstrap file">>, diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 6e959817d..eba0d33af 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -37,8 +37,8 @@ %% provisional solution: rpc:multicall to all the nodes for creating/updating/removing %% todo: replicate operations -%% store the config and start the instance -export([ + %% store the config and start the instance create_local/4, create_local/5, create_dry_run_local/2, diff --git a/rel/i18n/emqx_bridge_sqlserver.hocon b/rel/i18n/emqx_bridge_sqlserver.hocon index 24e4615f3..aa63b0a89 100644 --- a/rel/i18n/emqx_bridge_sqlserver.hocon +++ b/rel/i18n/emqx_bridge_sqlserver.hocon @@ -46,4 +46,22 @@ sql_template.desc: sql_template.label: """SQL Template""" +action_parameters.desc: +"""Action specific configuration.""" + +action_parameters.label: +"""Action""" + +sqlserver_action.desc: +"""Configuration for Microsoft SOL Server action.""" + +sqlserver_action.label: +"""Microsoft SOL Server Action Configuration""" + +config_connector.desc: +"""Configuration for an Microsoft SOL Server connector.""" + +config_connector.label: +"""Microsoft SOL Server Connector Configuration""" + }