From 9143d5994d412525fe8fee3bce7447bc4e225053 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 23 Feb 2024 15:10:19 +0800 Subject: [PATCH 1/6] feat: refactor MS SQL Server bridge to connector and action --- apps/emqx_bridge/src/emqx_action_info.erl | 1 + .../src/emqx_bridge_sqlserver.app.src | 1 + .../src/emqx_bridge_sqlserver.erl | 101 ++++++++++++++++ .../src/emqx_bridge_sqlserver_action_info.erl | 22 ++++ .../src/emqx_bridge_sqlserver_connector.erl | 114 +++++++++++++++--- .../src/schema/emqx_connector_ee_schema.erl | 12 ++ .../src/schema/emqx_connector_schema.erl | 2 + apps/emqx_management/src/emqx_mgmt_auth.erl | 2 +- apps/emqx_resource/src/emqx_resource.erl | 2 +- rel/i18n/emqx_bridge_sqlserver.hocon | 18 +++ 10 files changed, 255 insertions(+), 20 deletions(-) create mode 100644 apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_action_info.erl diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 4d47f1e09..c6282fc18 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -105,6 +105,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_mysql_action_info, emqx_bridge_pgsql_action_info, emqx_bridge_syskeeper_action_info, + emqx_bridge_sqlserver_action_info, emqx_bridge_timescale_action_info, emqx_bridge_redis_action_info, emqx_bridge_iotdb_action_info, diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src index bddf212e3..1340161a2 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src @@ -3,6 +3,7 @@ {vsn, "0.1.6"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, odbc]}, + {env, [{emqx_action_info_modules, [emqx_bridge_sqlserver_action_info]}]}, {env, []}, {modules, []}, {links, []} diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl index dde0d2c69..461ef2c85 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl @@ -11,6 +11,8 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ + bridge_v2_examples/1, + connector_examples/1, conn_bridge_examples/1 ]). @@ -21,6 +23,9 @@ desc/1 ]). +-define(CONNECTOR_TYPE, sqlserver). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). + -define(DEFAULT_SQL, << "insert into t_mqtt_msg(msgid, topic, qos, payload) " "values ( ${id}, ${topic}, ${qos}, ${payload} )" @@ -28,6 +33,9 @@ -define(DEFAULT_DRIVER, <<"ms-sql">>). +%% ------------------------------------------------------------------------------------------------- +%% api. + conn_bridge_examples(Method) -> [ #{ @@ -65,12 +73,76 @@ values(post) -> values(put) -> values(post). +%% ==================== +%% Bridge V2: Connector + Action + +connector_examples(Method) -> + [ + #{ + <<"sqlserver">> => + #{ + summary => <<"Microsoft SQL Server Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_values() + ) + } + } + ]. + +connector_values() -> + #{ + server => <<"127.0.0.1:1433">>, + database => <<"test">>, + pool_size => 8, + username => <<"sa">>, + password => <<"******">>, + resource_opts => #{health_check_interval => <<"20s">>} + }. + +bridge_v2_examples(Method) -> + [ + #{ + <<"sqlserver">> => + #{ + summary => <<"Microsoft SQL Server Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. + +action_values() -> + #{parameters => #{sql => ?DEFAULT_SQL}}. + %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions namespace() -> "bridge_sqlserver". roots() -> []. +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(sqlserver_action)); +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_connector_schema:api_fields( + Field, + ?CONNECTOR_TYPE, + fields("config_connector") -- emqx_connector_schema:common_fields() + ); +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + emqx_bridge_sqlserver_connector:fields(config) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, @@ -97,6 +169,27 @@ fields("config") -> ] ++ (emqx_bridge_sqlserver_connector:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields()); +fields(action) -> + {?ACTION_TYPE, + mk( + hoconsc:map(name, ref(?MODULE, sqlserver_action)), + #{desc => ?DESC("sqlserver_action"), required => false} + )}; +fields(sqlserver_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + mk( + ref(?MODULE, action_parameters), + #{required => true, desc => ?DESC(action_parameters)} + ) + ); +fields(action_parameters) -> + [ + {sql, + mk( + binary(), + #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + )} + ]; fields("creation_opts") -> emqx_resource_schema:fields("creation_opts"); fields("post") -> @@ -115,6 +208,14 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for Microsoft SQL Server using `", string:to_upper(Method), "` method."]; desc("creation_opts" = Name) -> emqx_resource_schema:desc(Name); +desc("config_connector") -> + ?DESC("config_connector"); +desc(sqlserver_action) -> + ?DESC("sqlserver_action"); +desc(action_parameters) -> + ?DESC("action_parameters"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(_) -> undefined. diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_action_info.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_action_info.erl new file mode 100644 index 000000000..ad8d69081 --- /dev/null +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_sqlserver_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> sqlserver. + +action_type_name() -> sqlserver. + +connector_type_name() -> sqlserver. + +schema_module() -> emqx_bridge_sqlserver. diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 566a9013b..13f03fb55 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -35,7 +35,11 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). %% callbacks for ecpool @@ -172,7 +176,7 @@ server() -> callback_mode() -> always_sync. on_start( - ResourceId = PoolName, + InstanceId = PoolName, #{ server := Server, username := Username, @@ -184,7 +188,7 @@ on_start( ) -> ?SLOG(info, #{ msg => "starting_sqlserver_connector", - connector => ResourceId, + connector => InstanceId, config => emqx_utils:redact(Config) }), @@ -199,7 +203,8 @@ on_start( ok end, - Options = [ + %% odbc connection string required + ConnectOptions = [ {server, to_bin(Server)}, {username, Username}, {password, maps:get(password, Config, emqx_secret:wrap(""))}, @@ -209,12 +214,12 @@ on_start( ], State = #{ - %% also ResourceId + %% also InstanceId pool_name => PoolName, - sql_templates => parse_sql_template(Config), + installed_channels => #{}, resource_opts => ResourceOpts }, - case emqx_resource_pool:start(PoolName, ?MODULE, Options) of + case emqx_resource_pool:start(PoolName, ?MODULE, ConnectOptions) of ok -> {ok, State}; {error, Reason} -> @@ -225,12 +230,61 @@ on_start( {error, Reason} end. -on_stop(ResourceId, _State) -> +on_add_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId, + ChannelConfig +) -> + {ok, ChannelState} = create_channel_state(ChannelConfig), + NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +create_channel_state( + #{parameters := Conf} = _ChannelConfig +) -> + State = #{sql_templates => parse_sql_template(Conf)}, + {ok, State}. + +on_remove_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId +) -> + NewInstalledChannels = maps:remove(ChannelId, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +on_get_channel_status( + InstanceId, + ChannelId, + #{installed_channels := Channels} = State +) -> + case maps:find(ChannelId, Channels) of + {ok, _} -> on_get_status(InstanceId, State); + error -> ?status_disconnected + end. + +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). + +on_stop(InstanceId, _State) -> + ?tp( + sqlserver_connector_on_stop, + #{instance_id => InstanceId} + ), ?SLOG(info, #{ msg => "stopping_sqlserver_connector", - connector => ResourceId + connector => InstanceId }), - emqx_resource_pool:stop(ResourceId). + emqx_resource_pool:stop(InstanceId). -spec on_query( resource_id(), @@ -273,8 +327,8 @@ on_get_status(_InstanceId, #{pool_name := PoolName} = _State) -> ), status_result(Health). -status_result(_Status = true) -> connected; -status_result(_Status = false) -> connecting. +status_result(_Status = true) -> ?status_connected; +status_result(_Status = false) -> ?status_connecting. %% TODO: %% case for disconnected @@ -296,7 +350,7 @@ do_get_status(Conn) -> end. %%==================================================================== -%% Internal Helper fns +%% Internal Functions %%==================================================================== %% TODO && FIXME: @@ -341,7 +395,10 @@ do_query( ResourceId, Query, ApplyMode, - #{pool_name := PoolName, sql_templates := Templates} = State + #{ + pool_name := PoolName, + installed_channels := Channels + } = State ) -> ?TRACE( "SINGLE_QUERY_SYNC", @@ -349,15 +406,19 @@ do_query( #{query => Query, connector => ResourceId, state => State} ), + ChannelId = get_channel_id(Query), + QueryTuple = get_query_tuple(Query), + #{sql_templates := Templates} = _ChannelState = maps:get(ChannelId, Channels), + %% only insert sql statement for single query and batch query - case apply_template(Query, Templates) of + case apply_template(QueryTuple, Templates) of {?ACTION_SEND_MESSAGE, SQL} -> Result = ecpool:pick_and_do( PoolName, {?MODULE, worker_do_insert, [SQL, State]}, ApplyMode ); - Query -> + QueryTuple -> Result = {error, {unrecoverable_error, invalid_query}}; _ -> Result = {error, {unrecoverable_error, failed_to_apply_sql_template}} @@ -426,8 +487,22 @@ execute(Conn, SQL) -> execute(Conn, SQL, Timeout) -> odbc:sql_query(Conn, str(SQL), Timeout). -to_bin(List) when is_list(List) -> - unicode:characters_to_binary(List, utf8). +get_channel_id([{ChannelId, _Req} | _]) -> + ChannelId; +get_channel_id({ChannelId, _Req}) -> + ChannelId. + +get_query_tuple({_ChannelId, {QueryType, Data}} = _Query) -> + {QueryType, Data}; +get_query_tuple({_ChannelId, Data} = _Query) -> + {send_message, Data}; +get_query_tuple([{_ChannelId, {_QueryType, _Data}} | _]) -> + error( + {unrecoverable_error, + {invalid_request, <<"The only query type that supports batching is insert.">>}} + ); +get_query_tuple([InsertQuery | _]) -> + get_query_tuple(InsertQuery). %% for bridge data to sql server parse_sql_template(Config) -> @@ -506,3 +581,6 @@ proc_batch_sql(BatchReqs, BatchInserts, Tokens) -> ]) ), <>. + +to_bin(List) when is_list(List) -> + unicode:characters_to_binary(List, utf8). diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 9079bc74e..66dafd119 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -60,6 +60,8 @@ resource_type(syskeeper_forwarder) -> emqx_bridge_syskeeper_connector; resource_type(syskeeper_proxy) -> emqx_bridge_syskeeper_proxy_server; +resource_type(sqlserver) -> + emqx_bridge_sqlserver; resource_type(timescale) -> emqx_postgresql; resource_type(redis) -> @@ -283,6 +285,14 @@ connector_structs() -> required => false } )}, + {sqlserver, + mk( + hoconsc:map(name, ref(emqx_bridge_sqlserver, "config_connector")), + #{ + desc => <<"Microsoft SQL Server Connector Config">>, + required => false + } + )}, {timescale, mk( hoconsc:map(name, ref(emqx_bridge_timescale, "config_connector")), @@ -377,6 +387,7 @@ schema_modules() -> emqx_bridge_mysql, emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_proxy, + emqx_bridge_sqlserver, emqx_bridge_timescale, emqx_postgresql_connector_schema, emqx_bridge_redis_schema, @@ -427,6 +438,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), + api_ref(emqx_bridge_sqlserver, <<"sqlserver">>, Method ++ "_connector"), api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index ee034bd0a..d68514c41 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -166,6 +166,8 @@ connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder]; connector_type_to_bridge_types(syskeeper_proxy) -> []; +connector_type_to_bridge_types(sqlserver) -> + [sqlserver]; connector_type_to_bridge_types(timescale) -> [timescale]; connector_type_to_bridge_types(iotdb) -> diff --git a/apps/emqx_management/src/emqx_mgmt_auth.erl b/apps/emqx_management/src/emqx_mgmt_auth.erl index 2a75db9ea..e5788fea3 100644 --- a/apps/emqx_management/src/emqx_mgmt_auth.erl +++ b/apps/emqx_management/src/emqx_mgmt_auth.erl @@ -311,7 +311,7 @@ maybe_cleanup_api_key(#?APP{name = Name, api_key = ApiKey}) -> %% Note for EMQX-11844: %% emqx.conf has the highest priority - %% if there is a key conflict, delete the old one and keep the key which from the bootstrap filex + %% if there is a key conflict, delete the old one and keep the key which from the bootstrap file ?SLOG(info, #{ msg => "duplicated_apikey_detected", info => <<"Delete duplicated apikeys and write a new one from bootstrap file">>, diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 6e959817d..eba0d33af 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -37,8 +37,8 @@ %% provisional solution: rpc:multicall to all the nodes for creating/updating/removing %% todo: replicate operations -%% store the config and start the instance -export([ + %% store the config and start the instance create_local/4, create_local/5, create_dry_run_local/2, diff --git a/rel/i18n/emqx_bridge_sqlserver.hocon b/rel/i18n/emqx_bridge_sqlserver.hocon index 24e4615f3..aa63b0a89 100644 --- a/rel/i18n/emqx_bridge_sqlserver.hocon +++ b/rel/i18n/emqx_bridge_sqlserver.hocon @@ -46,4 +46,22 @@ sql_template.desc: sql_template.label: """SQL Template""" +action_parameters.desc: +"""Action specific configuration.""" + +action_parameters.label: +"""Action""" + +sqlserver_action.desc: +"""Configuration for Microsoft SOL Server action.""" + +sqlserver_action.label: +"""Microsoft SOL Server Action Configuration""" + +config_connector.desc: +"""Configuration for an Microsoft SOL Server connector.""" + +config_connector.label: +"""Microsoft SOL Server Connector Configuration""" + } From 00d50479f59ec44ec8aa7eb4a384b8900bc9cf13 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 1 Mar 2024 10:25:13 +0800 Subject: [PATCH 2/6] fix: compatible with bridge v1 --- .../emqx_bridge_clickhouse_action_info.erl | 4 ++ .../src/emqx_bridge_sqlserver.erl | 29 +++++++-- .../src/emqx_bridge_sqlserver_action_info.erl | 60 +++++++++++++++++-- .../src/schema/emqx_connector_ee_schema.erl | 2 +- 4 files changed, 83 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_action_info.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_action_info.erl index 066569f7e..d951497bf 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_action_info.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_action_info.erl @@ -1,3 +1,7 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + -module(emqx_bridge_clickhouse_action_info). -behaviour(emqx_action_info). diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl index 461ef2c85..c7c3a4416 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl @@ -96,6 +96,7 @@ connector_values() -> pool_size => 8, username => <<"sa">>, password => <<"******">>, + driver => ?DEFAULT_DRIVER, resource_opts => #{health_check_interval => <<"20s">>} }. @@ -113,7 +114,10 @@ bridge_v2_examples(Method) -> ]. action_values() -> - #{parameters => #{sql => ?DEFAULT_SQL}}. + #{ + <<"parameters">> => + #{<<"sql">> => ?DEFAULT_SQL} + }. %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions @@ -138,9 +142,20 @@ fields(Field) when fields("config_connector") -- emqx_connector_schema:common_fields() ); fields("config_connector") -> - emqx_connector_schema:common_fields() ++ - emqx_bridge_sqlserver_connector:fields(config) ++ - emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); + Config = + driver_fields() ++ + emqx_connector_schema:common_fields() ++ + emqx_bridge_sqlserver_connector:fields(config) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + lists:foldl( + fun(Key, Acc) -> + proplists:delete(Key, Acc) + end, + Config, + [ + auto_reconnect + ] + ); fields(connector_resource_opts) -> emqx_connector_schema:resource_opts_fields(); fields("config") -> @@ -151,7 +166,6 @@ fields("config") -> binary(), #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} )}, - {driver, mk(binary(), #{desc => ?DESC("driver"), default => ?DEFAULT_DRIVER})}, {local_topic, mk( binary(), @@ -166,7 +180,7 @@ fields("config") -> desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) } )} - ] ++ + ] ++ driver_fields() ++ (emqx_bridge_sqlserver_connector:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields()); fields(action) -> @@ -202,6 +216,9 @@ fields("get") -> fields("post", Type) -> [type_field(Type), name_field() | fields("config")]. +driver_fields() -> + [{driver, mk(binary(), #{desc => ?DESC("driver"), default => ?DEFAULT_DRIVER})}]. + desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_action_info.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_action_info.erl index ad8d69081..34754fe26 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_action_info.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_action_info.erl @@ -6,17 +6,67 @@ -behaviour(emqx_action_info). +-elvis([{elvis_style, invalid_dynamic_call, disable}]). + -export([ bridge_v1_type_name/0, action_type_name/0, connector_type_name/0, - schema_module/0 + schema_module/0, + bridge_v1_config_to_action_config/2, + bridge_v1_config_to_connector_config/1, + connector_action_config_to_bridge_v1_config/2 ]). -bridge_v1_type_name() -> sqlserver. +-import(emqx_utils_conv, [bin/1]). -action_type_name() -> sqlserver. +-define(ACTION_TYPE, sqlserver). +-define(SCHEMA_MODULE, emqx_bridge_sqlserver). -connector_type_name() -> sqlserver. +bridge_v1_type_name() -> ?ACTION_TYPE. -schema_module() -> emqx_bridge_sqlserver. +action_type_name() -> ?ACTION_TYPE. + +connector_type_name() -> ?ACTION_TYPE. + +schema_module() -> ?SCHEMA_MODULE. + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(sqlserver_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + ActionConfig#{<<"connector">> => ConnectorName} + ). + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ActionTopLevelKeys = schema_keys(sqlserver_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ConnectorTopLevelKeys = schema_keys("config_connector"), + ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys), + ConnConfig0 = maps:with(ConnectorKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + ConnConfig0 + ). + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + V1Config = emqx_action_info:connector_action_config_to_bridge_v1_config( + ConnectorConfig, ActionConfig + ), + maps:remove(<<"local_topic">>, V1Config). + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config), + emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0). + +schema_keys(Name) -> + schema_keys(?SCHEMA_MODULE, Name). + +schema_keys(Mod, Name) -> + [bin(Key) || Key <- proplists:get_keys(Mod:fields(Name))]. diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 66dafd119..d45d5de35 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -61,7 +61,7 @@ resource_type(syskeeper_forwarder) -> resource_type(syskeeper_proxy) -> emqx_bridge_syskeeper_proxy_server; resource_type(sqlserver) -> - emqx_bridge_sqlserver; + emqx_bridge_sqlserver_connector; resource_type(timescale) -> emqx_postgresql; resource_type(redis) -> From 1d5809217616bcb6d4360318c734f4c2f04b46b8 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 1 Mar 2024 11:36:15 +0800 Subject: [PATCH 3/6] fix: bridge v2 `on_query` matched ChannelId --- .../src/emqx_bridge_sqlserver_connector.erl | 12 ++++++------ rel/i18n/emqx_bridge_sqlserver.hocon | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 13f03fb55..1eb9746dc 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -128,9 +128,9 @@ %% -type size() :: integer(). -type state() :: #{ + installed_channels := map(), pool_name := binary(), - resource_opts := map(), - sql_templates := map() + resource_opts := map() }. %%==================================================================== @@ -288,14 +288,14 @@ on_stop(InstanceId, _State) -> -spec on_query( resource_id(), - {?ACTION_SEND_MESSAGE, map()}, + Query :: {channel_id(), map()}, state() ) -> ok | {ok, list()} | {error, {recoverable_error, term()}} | {error, term()}. -on_query(ResourceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> +on_query(ResourceId, {_ChannelId, _Msg} = Query, State) -> ?TRACE( "SINGLE_QUERY_SYNC", "bridge_sqlserver_received", @@ -305,7 +305,7 @@ on_query(ResourceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> -spec on_batch_query( resource_id(), - [{?ACTION_SEND_MESSAGE, map()}], + [{channel_id(), map()}], state() ) -> ok @@ -383,7 +383,7 @@ conn_str([{_, _} | Opts], Acc) -> %% Query with singe & batch sql statement -spec do_query( resource_id(), - Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}], + Query :: {channel_id(), map()} | [{channel_id(), map()}], ApplyMode :: handover, state() ) -> diff --git a/rel/i18n/emqx_bridge_sqlserver.hocon b/rel/i18n/emqx_bridge_sqlserver.hocon index aa63b0a89..b1c7f8405 100644 --- a/rel/i18n/emqx_bridge_sqlserver.hocon +++ b/rel/i18n/emqx_bridge_sqlserver.hocon @@ -59,7 +59,7 @@ sqlserver_action.label: """Microsoft SOL Server Action Configuration""" config_connector.desc: -"""Configuration for an Microsoft SOL Server connector.""" +"""Configuration for a Microsoft SOL Server connector.""" config_connector.label: """Microsoft SOL Server Connector Configuration""" From 2e3003e0f1e9ddedc440ef4b130232be3a30f940 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 1 Mar 2024 16:14:19 +0800 Subject: [PATCH 4/6] test: sqlserver bridge v2 --- .../test/emqx_bridge_sqlserver_SUITE.erl | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl b/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl index c3647d838..4bd67fc00 100644 --- a/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl +++ b/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl @@ -249,6 +249,7 @@ t_create_disconnected(Config) -> ?assertMatch({ok, _}, create_bridge(Config)), health_check_resource_down(Config) end), + timer:sleep(10_000), health_check_resource_ok(Config), ok. @@ -317,7 +318,8 @@ t_simple_query(Config) -> {ok, _}, create_bridge(Config) ), - {Requests, Vals} = gen_batch_req(BatchSize), + + {Requests, Vals} = gen_batch_req(Config, BatchSize), ?check_trace( begin ?wait_async_action( @@ -519,14 +521,15 @@ create_bridge_http(Params) -> send_message(Config, Payload) -> Name = ?config(sqlserver_name, Config), BridgeType = ?config(sqlserver_bridge_type, Config), - BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), - emqx_bridge:send_message(BridgeID, Payload). + ActionId = emqx_bridge_v2:id(BridgeType, Name), + emqx_bridge_v2:query(BridgeType, Name, {ActionId, Payload}, #{}). query_resource(Config, Request) -> Name = ?config(sqlserver_name, Config), BridgeType = ?config(sqlserver_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). + ID = emqx_bridge_v2:id(BridgeType, Name), + ResID = emqx_connector_resource:resource_id(BridgeType, Name), + emqx_resource:query(ID, Request, #{timeout => 1_000, connector_resource_id => ResID}). query_resource_async(Config, Request) -> Name = ?config(sqlserver_name, Config), @@ -545,7 +548,10 @@ resource_id(Config) -> _ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name). health_check_resource_ok(Config) -> - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))). + BridgeType = ?config(sqlserver_bridge_type, Config), + Name = ?config(sqlserver_name, Config), + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))), + ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)). health_check_resource_down(Config) -> case emqx_resource_manager:health_check(resource_id(Config)) of @@ -666,13 +672,16 @@ sent_data(Payload) -> qos => 0 }. -gen_batch_req(Count) when +gen_batch_req(Config, Count) when is_integer(Count) andalso Count > 0 -> + BridgeType = ?config(sqlserver_bridge_type, Config), + Name = ?config(sqlserver_name, Config), + ActionId = emqx_bridge_v2:id(BridgeType, Name), Vals = [{str(erlang:unique_integer())} || _Seq <- lists:seq(1, Count)], - Requests = [{send_message, sent_data(Payload)} || {Payload} <- Vals], + Requests = [{ActionId, sent_data(Payload)} || {Payload} <- Vals], {Requests, Vals}; -gen_batch_req(Count) -> +gen_batch_req(_Config, Count) -> ct:pal("Gen batch requests failed with unexpected Count: ~p", [Count]). str(List) when is_list(List) -> From 4623b73f478c0f436178f4111cb8e6d60dd00e5a Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 1 Mar 2024 17:29:31 +0800 Subject: [PATCH 5/6] fix: sqlserver connector auto reconnect --- .../src/emqx_bridge_sqlserver.erl | 18 ++++-------------- .../test/emqx_bridge_sqlserver_SUITE.erl | 11 +++++++++-- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl index c7c3a4416..850f97487 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.erl @@ -142,20 +142,10 @@ fields(Field) when fields("config_connector") -- emqx_connector_schema:common_fields() ); fields("config_connector") -> - Config = - driver_fields() ++ - emqx_connector_schema:common_fields() ++ - emqx_bridge_sqlserver_connector:fields(config) ++ - emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), - lists:foldl( - fun(Key, Acc) -> - proplists:delete(Key, Acc) - end, - Config, - [ - auto_reconnect - ] - ); + driver_fields() ++ + emqx_connector_schema:common_fields() ++ + emqx_bridge_sqlserver_connector:fields(config) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); fields(connector_resource_opts) -> emqx_connector_schema:resource_opts_fields(); fields("config") -> diff --git a/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl b/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl index 4bd67fc00..4d292254c 100644 --- a/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl +++ b/apps/emqx_bridge_sqlserver/test/emqx_bridge_sqlserver_SUITE.erl @@ -550,8 +550,15 @@ resource_id(Config) -> health_check_resource_ok(Config) -> BridgeType = ?config(sqlserver_bridge_type, Config), Name = ?config(sqlserver_name, Config), - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))), - ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)). + % Wait for reconnection. + ?retry( + _Sleep = 1_000, + _Attempts = 10, + begin + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))), + ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)) + end + ). health_check_resource_down(Config) -> case emqx_resource_manager:health_check(resource_id(Config)) of From 441b6f7a0db1e877c2b02ac2097df1b9bd97eabf Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 4 Mar 2024 17:14:05 +0800 Subject: [PATCH 6/6] docs: MS SQL Server bridge v2 change log --- changes/ee/feat-12619.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ee/feat-12619.en.md diff --git a/changes/ee/feat-12619.en.md b/changes/ee/feat-12619.en.md new file mode 100644 index 000000000..426240b5c --- /dev/null +++ b/changes/ee/feat-12619.en.md @@ -0,0 +1 @@ +The Microsoft SQL Server bridge has been split into connector and action components. Old Microsoft SQL Server bridges will be upgraded automatically.