Merge pull request #12619 from JimMoen/EMQX-11468/refactor/sqlserver
feat: refactor MS SQL Server bridge to connector and action
This commit is contained in:
commit
3e7311ee31
|
@ -105,6 +105,7 @@ hard_coded_action_info_modules_ee() ->
|
|||
emqx_bridge_mysql_action_info,
|
||||
emqx_bridge_pgsql_action_info,
|
||||
emqx_bridge_syskeeper_action_info,
|
||||
emqx_bridge_sqlserver_action_info,
|
||||
emqx_bridge_timescale_action_info,
|
||||
emqx_bridge_redis_action_info,
|
||||
emqx_bridge_iotdb_action_info,
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_clickhouse_action_info).
|
||||
|
||||
-behaviour(emqx_action_info).
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
{vsn, "0.1.6"},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, emqx_resource, odbc]},
|
||||
{env, [{emqx_action_info_modules, [emqx_bridge_sqlserver_action_info]}]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{links, []}
|
||||
|
|
|
@ -11,6 +11,8 @@
|
|||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||
|
||||
-export([
|
||||
bridge_v2_examples/1,
|
||||
connector_examples/1,
|
||||
conn_bridge_examples/1
|
||||
]).
|
||||
|
||||
|
@ -21,6 +23,9 @@
|
|||
desc/1
|
||||
]).
|
||||
|
||||
-define(CONNECTOR_TYPE, sqlserver).
|
||||
-define(ACTION_TYPE, ?CONNECTOR_TYPE).
|
||||
|
||||
-define(DEFAULT_SQL, <<
|
||||
"insert into t_mqtt_msg(msgid, topic, qos, payload) "
|
||||
"values ( ${id}, ${topic}, ${qos}, ${payload} )"
|
||||
|
@ -28,6 +33,9 @@
|
|||
|
||||
-define(DEFAULT_DRIVER, <<"ms-sql">>).
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% api.
|
||||
|
||||
conn_bridge_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
|
@ -65,12 +73,81 @@ values(post) ->
|
|||
values(put) ->
|
||||
values(post).
|
||||
|
||||
%% ====================
|
||||
%% Bridge V2: Connector + Action
|
||||
|
||||
connector_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"sqlserver">> =>
|
||||
#{
|
||||
summary => <<"Microsoft SQL Server Connector">>,
|
||||
value => emqx_connector_schema:connector_values(
|
||||
Method, ?CONNECTOR_TYPE, connector_values()
|
||||
)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
connector_values() ->
|
||||
#{
|
||||
server => <<"127.0.0.1:1433">>,
|
||||
database => <<"test">>,
|
||||
pool_size => 8,
|
||||
username => <<"sa">>,
|
||||
password => <<"******">>,
|
||||
driver => ?DEFAULT_DRIVER,
|
||||
resource_opts => #{health_check_interval => <<"20s">>}
|
||||
}.
|
||||
|
||||
bridge_v2_examples(Method) ->
|
||||
[
|
||||
#{
|
||||
<<"sqlserver">> =>
|
||||
#{
|
||||
summary => <<"Microsoft SQL Server Action">>,
|
||||
value => emqx_bridge_v2_schema:action_values(
|
||||
Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
|
||||
)
|
||||
}
|
||||
}
|
||||
].
|
||||
|
||||
action_values() ->
|
||||
#{
|
||||
<<"parameters">> =>
|
||||
#{<<"sql">> => ?DEFAULT_SQL}
|
||||
}.
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% Hocon Schema Definitions
|
||||
namespace() -> "bridge_sqlserver".
|
||||
|
||||
roots() -> [].
|
||||
|
||||
fields(Field) when
|
||||
Field == "get_bridge_v2";
|
||||
Field == "post_bridge_v2";
|
||||
Field == "put_bridge_v2"
|
||||
->
|
||||
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(sqlserver_action));
|
||||
fields(Field) when
|
||||
Field == "get_connector";
|
||||
Field == "put_connector";
|
||||
Field == "post_connector"
|
||||
->
|
||||
emqx_connector_schema:api_fields(
|
||||
Field,
|
||||
?CONNECTOR_TYPE,
|
||||
fields("config_connector") -- emqx_connector_schema:common_fields()
|
||||
);
|
||||
fields("config_connector") ->
|
||||
driver_fields() ++
|
||||
emqx_connector_schema:common_fields() ++
|
||||
emqx_bridge_sqlserver_connector:fields(config) ++
|
||||
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
|
||||
fields(connector_resource_opts) ->
|
||||
emqx_connector_schema:resource_opts_fields();
|
||||
fields("config") ->
|
||||
[
|
||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||
|
@ -79,7 +156,6 @@ fields("config") ->
|
|||
binary(),
|
||||
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
||||
)},
|
||||
{driver, mk(binary(), #{desc => ?DESC("driver"), default => ?DEFAULT_DRIVER})},
|
||||
{local_topic,
|
||||
mk(
|
||||
binary(),
|
||||
|
@ -94,9 +170,30 @@ fields("config") ->
|
|||
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
|
||||
}
|
||||
)}
|
||||
] ++
|
||||
] ++ driver_fields() ++
|
||||
(emqx_bridge_sqlserver_connector:fields(config) --
|
||||
emqx_connector_schema_lib:prepare_statement_fields());
|
||||
fields(action) ->
|
||||
{?ACTION_TYPE,
|
||||
mk(
|
||||
hoconsc:map(name, ref(?MODULE, sqlserver_action)),
|
||||
#{desc => ?DESC("sqlserver_action"), required => false}
|
||||
)};
|
||||
fields(sqlserver_action) ->
|
||||
emqx_bridge_v2_schema:make_producer_action_schema(
|
||||
mk(
|
||||
ref(?MODULE, action_parameters),
|
||||
#{required => true, desc => ?DESC(action_parameters)}
|
||||
)
|
||||
);
|
||||
fields(action_parameters) ->
|
||||
[
|
||||
{sql,
|
||||
mk(
|
||||
binary(),
|
||||
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
|
||||
)}
|
||||
];
|
||||
fields("creation_opts") ->
|
||||
emqx_resource_schema:fields("creation_opts");
|
||||
fields("post") ->
|
||||
|
@ -109,12 +206,23 @@ fields("get") ->
|
|||
fields("post", Type) ->
|
||||
[type_field(Type), name_field() | fields("config")].
|
||||
|
||||
driver_fields() ->
|
||||
[{driver, mk(binary(), #{desc => ?DESC("driver"), default => ?DEFAULT_DRIVER})}].
|
||||
|
||||
desc("config") ->
|
||||
?DESC("desc_config");
|
||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||
["Configuration for Microsoft SQL Server using `", string:to_upper(Method), "` method."];
|
||||
desc("creation_opts" = Name) ->
|
||||
emqx_resource_schema:desc(Name);
|
||||
desc("config_connector") ->
|
||||
?DESC("config_connector");
|
||||
desc(sqlserver_action) ->
|
||||
?DESC("sqlserver_action");
|
||||
desc(action_parameters) ->
|
||||
?DESC("action_parameters");
|
||||
desc(connector_resource_opts) ->
|
||||
?DESC(emqx_resource_schema, "resource_opts");
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_sqlserver_action_info).
|
||||
|
||||
-behaviour(emqx_action_info).
|
||||
|
||||
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
|
||||
|
||||
-export([
|
||||
bridge_v1_type_name/0,
|
||||
action_type_name/0,
|
||||
connector_type_name/0,
|
||||
schema_module/0,
|
||||
bridge_v1_config_to_action_config/2,
|
||||
bridge_v1_config_to_connector_config/1,
|
||||
connector_action_config_to_bridge_v1_config/2
|
||||
]).
|
||||
|
||||
-import(emqx_utils_conv, [bin/1]).
|
||||
|
||||
-define(ACTION_TYPE, sqlserver).
|
||||
-define(SCHEMA_MODULE, emqx_bridge_sqlserver).
|
||||
|
||||
bridge_v1_type_name() -> ?ACTION_TYPE.
|
||||
|
||||
action_type_name() -> ?ACTION_TYPE.
|
||||
|
||||
connector_type_name() -> ?ACTION_TYPE.
|
||||
|
||||
schema_module() -> ?SCHEMA_MODULE.
|
||||
|
||||
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
|
||||
ActionTopLevelKeys = schema_keys(sqlserver_action),
|
||||
ActionParametersKeys = schema_keys(action_parameters),
|
||||
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
|
||||
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
|
||||
emqx_utils_maps:update_if_present(
|
||||
<<"resource_opts">>,
|
||||
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
|
||||
ActionConfig#{<<"connector">> => ConnectorName}
|
||||
).
|
||||
|
||||
bridge_v1_config_to_connector_config(BridgeV1Config) ->
|
||||
ActionTopLevelKeys = schema_keys(sqlserver_action),
|
||||
ActionParametersKeys = schema_keys(action_parameters),
|
||||
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
|
||||
ConnectorTopLevelKeys = schema_keys("config_connector"),
|
||||
ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys),
|
||||
ConnConfig0 = maps:with(ConnectorKeys, BridgeV1Config),
|
||||
emqx_utils_maps:update_if_present(
|
||||
<<"resource_opts">>,
|
||||
fun emqx_connector_schema:project_to_connector_resource_opts/1,
|
||||
ConnConfig0
|
||||
).
|
||||
|
||||
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
|
||||
V1Config = emqx_action_info:connector_action_config_to_bridge_v1_config(
|
||||
ConnectorConfig, ActionConfig
|
||||
),
|
||||
maps:remove(<<"local_topic">>, V1Config).
|
||||
|
||||
make_config_map(PickKeys, IndentKeys, Config) ->
|
||||
Conf0 = maps:with(PickKeys, Config),
|
||||
emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
|
||||
|
||||
schema_keys(Name) ->
|
||||
schema_keys(?SCHEMA_MODULE, Name).
|
||||
|
||||
schema_keys(Mod, Name) ->
|
||||
[bin(Key) || Key <- proplists:get_keys(Mod:fields(Name))].
|
|
@ -35,7 +35,11 @@
|
|||
on_stop/2,
|
||||
on_query/3,
|
||||
on_batch_query/3,
|
||||
on_get_status/2
|
||||
on_get_status/2,
|
||||
on_add_channel/4,
|
||||
on_remove_channel/3,
|
||||
on_get_channels/1,
|
||||
on_get_channel_status/3
|
||||
]).
|
||||
|
||||
%% callbacks for ecpool
|
||||
|
@ -124,9 +128,9 @@
|
|||
%% -type size() :: integer().
|
||||
|
||||
-type state() :: #{
|
||||
installed_channels := map(),
|
||||
pool_name := binary(),
|
||||
resource_opts := map(),
|
||||
sql_templates := map()
|
||||
resource_opts := map()
|
||||
}.
|
||||
|
||||
%%====================================================================
|
||||
|
@ -172,7 +176,7 @@ server() ->
|
|||
callback_mode() -> always_sync.
|
||||
|
||||
on_start(
|
||||
ResourceId = PoolName,
|
||||
InstanceId = PoolName,
|
||||
#{
|
||||
server := Server,
|
||||
username := Username,
|
||||
|
@ -184,7 +188,7 @@ on_start(
|
|||
) ->
|
||||
?SLOG(info, #{
|
||||
msg => "starting_sqlserver_connector",
|
||||
connector => ResourceId,
|
||||
connector => InstanceId,
|
||||
config => emqx_utils:redact(Config)
|
||||
}),
|
||||
|
||||
|
@ -199,7 +203,8 @@ on_start(
|
|||
ok
|
||||
end,
|
||||
|
||||
Options = [
|
||||
%% odbc connection string required
|
||||
ConnectOptions = [
|
||||
{server, to_bin(Server)},
|
||||
{username, Username},
|
||||
{password, maps:get(password, Config, emqx_secret:wrap(""))},
|
||||
|
@ -209,12 +214,12 @@ on_start(
|
|||
],
|
||||
|
||||
State = #{
|
||||
%% also ResourceId
|
||||
%% also InstanceId
|
||||
pool_name => PoolName,
|
||||
sql_templates => parse_sql_template(Config),
|
||||
installed_channels => #{},
|
||||
resource_opts => ResourceOpts
|
||||
},
|
||||
case emqx_resource_pool:start(PoolName, ?MODULE, Options) of
|
||||
case emqx_resource_pool:start(PoolName, ?MODULE, ConnectOptions) of
|
||||
ok ->
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
|
@ -225,23 +230,72 @@ on_start(
|
|||
{error, Reason}
|
||||
end.
|
||||
|
||||
on_stop(ResourceId, _State) ->
|
||||
on_add_channel(
|
||||
_InstId,
|
||||
#{
|
||||
installed_channels := InstalledChannels
|
||||
} = OldState,
|
||||
ChannelId,
|
||||
ChannelConfig
|
||||
) ->
|
||||
{ok, ChannelState} = create_channel_state(ChannelConfig),
|
||||
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
|
||||
%% Update state
|
||||
NewState = OldState#{installed_channels => NewInstalledChannels},
|
||||
{ok, NewState}.
|
||||
|
||||
create_channel_state(
|
||||
#{parameters := Conf} = _ChannelConfig
|
||||
) ->
|
||||
State = #{sql_templates => parse_sql_template(Conf)},
|
||||
{ok, State}.
|
||||
|
||||
on_remove_channel(
|
||||
_InstId,
|
||||
#{
|
||||
installed_channels := InstalledChannels
|
||||
} = OldState,
|
||||
ChannelId
|
||||
) ->
|
||||
NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
|
||||
%% Update state
|
||||
NewState = OldState#{installed_channels => NewInstalledChannels},
|
||||
{ok, NewState}.
|
||||
|
||||
on_get_channel_status(
|
||||
InstanceId,
|
||||
ChannelId,
|
||||
#{installed_channels := Channels} = State
|
||||
) ->
|
||||
case maps:find(ChannelId, Channels) of
|
||||
{ok, _} -> on_get_status(InstanceId, State);
|
||||
error -> ?status_disconnected
|
||||
end.
|
||||
|
||||
on_get_channels(ResId) ->
|
||||
emqx_bridge_v2:get_channels_for_connector(ResId).
|
||||
|
||||
on_stop(InstanceId, _State) ->
|
||||
?tp(
|
||||
sqlserver_connector_on_stop,
|
||||
#{instance_id => InstanceId}
|
||||
),
|
||||
?SLOG(info, #{
|
||||
msg => "stopping_sqlserver_connector",
|
||||
connector => ResourceId
|
||||
connector => InstanceId
|
||||
}),
|
||||
emqx_resource_pool:stop(ResourceId).
|
||||
emqx_resource_pool:stop(InstanceId).
|
||||
|
||||
-spec on_query(
|
||||
resource_id(),
|
||||
{?ACTION_SEND_MESSAGE, map()},
|
||||
Query :: {channel_id(), map()},
|
||||
state()
|
||||
) ->
|
||||
ok
|
||||
| {ok, list()}
|
||||
| {error, {recoverable_error, term()}}
|
||||
| {error, term()}.
|
||||
on_query(ResourceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
|
||||
on_query(ResourceId, {_ChannelId, _Msg} = Query, State) ->
|
||||
?TRACE(
|
||||
"SINGLE_QUERY_SYNC",
|
||||
"bridge_sqlserver_received",
|
||||
|
@ -251,7 +305,7 @@ on_query(ResourceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) ->
|
|||
|
||||
-spec on_batch_query(
|
||||
resource_id(),
|
||||
[{?ACTION_SEND_MESSAGE, map()}],
|
||||
[{channel_id(), map()}],
|
||||
state()
|
||||
) ->
|
||||
ok
|
||||
|
@ -273,8 +327,8 @@ on_get_status(_InstanceId, #{pool_name := PoolName} = _State) ->
|
|||
),
|
||||
status_result(Health).
|
||||
|
||||
status_result(_Status = true) -> connected;
|
||||
status_result(_Status = false) -> connecting.
|
||||
status_result(_Status = true) -> ?status_connected;
|
||||
status_result(_Status = false) -> ?status_connecting.
|
||||
%% TODO:
|
||||
%% case for disconnected
|
||||
|
||||
|
@ -296,7 +350,7 @@ do_get_status(Conn) ->
|
|||
end.
|
||||
|
||||
%%====================================================================
|
||||
%% Internal Helper fns
|
||||
%% Internal Functions
|
||||
%%====================================================================
|
||||
|
||||
%% TODO && FIXME:
|
||||
|
@ -329,7 +383,7 @@ conn_str([{_, _} | Opts], Acc) ->
|
|||
%% Query with singe & batch sql statement
|
||||
-spec do_query(
|
||||
resource_id(),
|
||||
Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}],
|
||||
Query :: {channel_id(), map()} | [{channel_id(), map()}],
|
||||
ApplyMode :: handover,
|
||||
state()
|
||||
) ->
|
||||
|
@ -341,7 +395,10 @@ do_query(
|
|||
ResourceId,
|
||||
Query,
|
||||
ApplyMode,
|
||||
#{pool_name := PoolName, sql_templates := Templates} = State
|
||||
#{
|
||||
pool_name := PoolName,
|
||||
installed_channels := Channels
|
||||
} = State
|
||||
) ->
|
||||
?TRACE(
|
||||
"SINGLE_QUERY_SYNC",
|
||||
|
@ -349,15 +406,19 @@ do_query(
|
|||
#{query => Query, connector => ResourceId, state => State}
|
||||
),
|
||||
|
||||
ChannelId = get_channel_id(Query),
|
||||
QueryTuple = get_query_tuple(Query),
|
||||
#{sql_templates := Templates} = _ChannelState = maps:get(ChannelId, Channels),
|
||||
|
||||
%% only insert sql statement for single query and batch query
|
||||
case apply_template(Query, Templates) of
|
||||
case apply_template(QueryTuple, Templates) of
|
||||
{?ACTION_SEND_MESSAGE, SQL} ->
|
||||
Result = ecpool:pick_and_do(
|
||||
PoolName,
|
||||
{?MODULE, worker_do_insert, [SQL, State]},
|
||||
ApplyMode
|
||||
);
|
||||
Query ->
|
||||
QueryTuple ->
|
||||
Result = {error, {unrecoverable_error, invalid_query}};
|
||||
_ ->
|
||||
Result = {error, {unrecoverable_error, failed_to_apply_sql_template}}
|
||||
|
@ -426,8 +487,22 @@ execute(Conn, SQL) ->
|
|||
execute(Conn, SQL, Timeout) ->
|
||||
odbc:sql_query(Conn, str(SQL), Timeout).
|
||||
|
||||
to_bin(List) when is_list(List) ->
|
||||
unicode:characters_to_binary(List, utf8).
|
||||
get_channel_id([{ChannelId, _Req} | _]) ->
|
||||
ChannelId;
|
||||
get_channel_id({ChannelId, _Req}) ->
|
||||
ChannelId.
|
||||
|
||||
get_query_tuple({_ChannelId, {QueryType, Data}} = _Query) ->
|
||||
{QueryType, Data};
|
||||
get_query_tuple({_ChannelId, Data} = _Query) ->
|
||||
{send_message, Data};
|
||||
get_query_tuple([{_ChannelId, {_QueryType, _Data}} | _]) ->
|
||||
error(
|
||||
{unrecoverable_error,
|
||||
{invalid_request, <<"The only query type that supports batching is insert.">>}}
|
||||
);
|
||||
get_query_tuple([InsertQuery | _]) ->
|
||||
get_query_tuple(InsertQuery).
|
||||
|
||||
%% for bridge data to sql server
|
||||
parse_sql_template(Config) ->
|
||||
|
@ -506,3 +581,6 @@ proc_batch_sql(BatchReqs, BatchInserts, Tokens) ->
|
|||
])
|
||||
),
|
||||
<<BatchInserts/binary, " values ", Values/binary>>.
|
||||
|
||||
to_bin(List) when is_list(List) ->
|
||||
unicode:characters_to_binary(List, utf8).
|
||||
|
|
|
@ -249,6 +249,7 @@ t_create_disconnected(Config) ->
|
|||
?assertMatch({ok, _}, create_bridge(Config)),
|
||||
health_check_resource_down(Config)
|
||||
end),
|
||||
timer:sleep(10_000),
|
||||
health_check_resource_ok(Config),
|
||||
ok.
|
||||
|
||||
|
@ -317,7 +318,8 @@ t_simple_query(Config) ->
|
|||
{ok, _},
|
||||
create_bridge(Config)
|
||||
),
|
||||
{Requests, Vals} = gen_batch_req(BatchSize),
|
||||
|
||||
{Requests, Vals} = gen_batch_req(Config, BatchSize),
|
||||
?check_trace(
|
||||
begin
|
||||
?wait_async_action(
|
||||
|
@ -519,14 +521,15 @@ create_bridge_http(Params) ->
|
|||
send_message(Config, Payload) ->
|
||||
Name = ?config(sqlserver_name, Config),
|
||||
BridgeType = ?config(sqlserver_bridge_type, Config),
|
||||
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
|
||||
emqx_bridge:send_message(BridgeID, Payload).
|
||||
ActionId = emqx_bridge_v2:id(BridgeType, Name),
|
||||
emqx_bridge_v2:query(BridgeType, Name, {ActionId, Payload}, #{}).
|
||||
|
||||
query_resource(Config, Request) ->
|
||||
Name = ?config(sqlserver_name, Config),
|
||||
BridgeType = ?config(sqlserver_bridge_type, Config),
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
|
||||
ID = emqx_bridge_v2:id(BridgeType, Name),
|
||||
ResID = emqx_connector_resource:resource_id(BridgeType, Name),
|
||||
emqx_resource:query(ID, Request, #{timeout => 1_000, connector_resource_id => ResID}).
|
||||
|
||||
query_resource_async(Config, Request) ->
|
||||
Name = ?config(sqlserver_name, Config),
|
||||
|
@ -545,7 +548,17 @@ resource_id(Config) ->
|
|||
_ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name).
|
||||
|
||||
health_check_resource_ok(Config) ->
|
||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))).
|
||||
BridgeType = ?config(sqlserver_bridge_type, Config),
|
||||
Name = ?config(sqlserver_name, Config),
|
||||
% Wait for reconnection.
|
||||
?retry(
|
||||
_Sleep = 1_000,
|
||||
_Attempts = 10,
|
||||
begin
|
||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))),
|
||||
?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name))
|
||||
end
|
||||
).
|
||||
|
||||
health_check_resource_down(Config) ->
|
||||
case emqx_resource_manager:health_check(resource_id(Config)) of
|
||||
|
@ -666,13 +679,16 @@ sent_data(Payload) ->
|
|||
qos => 0
|
||||
}.
|
||||
|
||||
gen_batch_req(Count) when
|
||||
gen_batch_req(Config, Count) when
|
||||
is_integer(Count) andalso Count > 0
|
||||
->
|
||||
BridgeType = ?config(sqlserver_bridge_type, Config),
|
||||
Name = ?config(sqlserver_name, Config),
|
||||
ActionId = emqx_bridge_v2:id(BridgeType, Name),
|
||||
Vals = [{str(erlang:unique_integer())} || _Seq <- lists:seq(1, Count)],
|
||||
Requests = [{send_message, sent_data(Payload)} || {Payload} <- Vals],
|
||||
Requests = [{ActionId, sent_data(Payload)} || {Payload} <- Vals],
|
||||
{Requests, Vals};
|
||||
gen_batch_req(Count) ->
|
||||
gen_batch_req(_Config, Count) ->
|
||||
ct:pal("Gen batch requests failed with unexpected Count: ~p", [Count]).
|
||||
|
||||
str(List) when is_list(List) ->
|
||||
|
|
|
@ -60,6 +60,8 @@ resource_type(syskeeper_forwarder) ->
|
|||
emqx_bridge_syskeeper_connector;
|
||||
resource_type(syskeeper_proxy) ->
|
||||
emqx_bridge_syskeeper_proxy_server;
|
||||
resource_type(sqlserver) ->
|
||||
emqx_bridge_sqlserver_connector;
|
||||
resource_type(timescale) ->
|
||||
emqx_postgresql;
|
||||
resource_type(redis) ->
|
||||
|
@ -283,6 +285,14 @@ connector_structs() ->
|
|||
required => false
|
||||
}
|
||||
)},
|
||||
{sqlserver,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_sqlserver, "config_connector")),
|
||||
#{
|
||||
desc => <<"Microsoft SQL Server Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)},
|
||||
{timescale,
|
||||
mk(
|
||||
hoconsc:map(name, ref(emqx_bridge_timescale, "config_connector")),
|
||||
|
@ -377,6 +387,7 @@ schema_modules() ->
|
|||
emqx_bridge_mysql,
|
||||
emqx_bridge_syskeeper_connector,
|
||||
emqx_bridge_syskeeper_proxy,
|
||||
emqx_bridge_sqlserver,
|
||||
emqx_bridge_timescale,
|
||||
emqx_postgresql_connector_schema,
|
||||
emqx_bridge_redis_schema,
|
||||
|
@ -427,6 +438,7 @@ api_schemas(Method) ->
|
|||
api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
|
||||
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
|
||||
api_ref(emqx_bridge_sqlserver, <<"sqlserver">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
|
||||
api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"),
|
||||
api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"),
|
||||
|
|
|
@ -166,6 +166,8 @@ connector_type_to_bridge_types(syskeeper_forwarder) ->
|
|||
[syskeeper_forwarder];
|
||||
connector_type_to_bridge_types(syskeeper_proxy) ->
|
||||
[];
|
||||
connector_type_to_bridge_types(sqlserver) ->
|
||||
[sqlserver];
|
||||
connector_type_to_bridge_types(timescale) ->
|
||||
[timescale];
|
||||
connector_type_to_bridge_types(iotdb) ->
|
||||
|
|
|
@ -311,7 +311,7 @@ maybe_cleanup_api_key(#?APP{name = Name, api_key = ApiKey}) ->
|
|||
|
||||
%% Note for EMQX-11844:
|
||||
%% emqx.conf has the highest priority
|
||||
%% if there is a key conflict, delete the old one and keep the key which from the bootstrap filex
|
||||
%% if there is a key conflict, delete the old one and keep the key which from the bootstrap file
|
||||
?SLOG(info, #{
|
||||
msg => "duplicated_apikey_detected",
|
||||
info => <<"Delete duplicated apikeys and write a new one from bootstrap file">>,
|
||||
|
|
|
@ -37,8 +37,8 @@
|
|||
%% provisional solution: rpc:multicall to all the nodes for creating/updating/removing
|
||||
%% todo: replicate operations
|
||||
|
||||
%% store the config and start the instance
|
||||
-export([
|
||||
%% store the config and start the instance
|
||||
create_local/4,
|
||||
create_local/5,
|
||||
create_dry_run_local/2,
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
The Microsoft SQL Server bridge has been split into connector and action components. Old Microsoft SQL Server bridges will be upgraded automatically.
|
|
@ -46,4 +46,22 @@ sql_template.desc:
|
|||
sql_template.label:
|
||||
"""SQL Template"""
|
||||
|
||||
action_parameters.desc:
|
||||
"""Action specific configuration."""
|
||||
|
||||
action_parameters.label:
|
||||
"""Action"""
|
||||
|
||||
sqlserver_action.desc:
|
||||
"""Configuration for Microsoft SOL Server action."""
|
||||
|
||||
sqlserver_action.label:
|
||||
"""Microsoft SOL Server Action Configuration"""
|
||||
|
||||
config_connector.desc:
|
||||
"""Configuration for a Microsoft SOL Server connector."""
|
||||
|
||||
config_connector.label:
|
||||
"""Microsoft SOL Server Connector Configuration"""
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue