feat: refactor MS SQL Server bridge to connector and action

This commit is contained in:
JimMoen 2024-02-23 15:10:19 +08:00
parent ff6468d83d
commit 9143d5994d
No known key found for this signature in database
10 changed files with 255 additions and 20 deletions

View File

@ -105,6 +105,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_mysql_action_info,
emqx_bridge_pgsql_action_info,
emqx_bridge_syskeeper_action_info,
emqx_bridge_sqlserver_action_info,
emqx_bridge_timescale_action_info,
emqx_bridge_redis_action_info,
emqx_bridge_iotdb_action_info,

View File

@ -3,6 +3,7 @@
{vsn, "0.1.6"},
{registered, []},
{applications, [kernel, stdlib, emqx_resource, odbc]},
{env, [{emqx_action_info_modules, [emqx_bridge_sqlserver_action_info]}]},
{env, []},
{modules, []},
{links, []}

View File

@ -11,6 +11,8 @@
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([
bridge_v2_examples/1,
connector_examples/1,
conn_bridge_examples/1
]).
@ -21,6 +23,9 @@
desc/1
]).
-define(CONNECTOR_TYPE, sqlserver).
-define(ACTION_TYPE, ?CONNECTOR_TYPE).
-define(DEFAULT_SQL, <<
"insert into t_mqtt_msg(msgid, topic, qos, payload) "
"values ( ${id}, ${topic}, ${qos}, ${payload} )"
@ -28,6 +33,9 @@
-define(DEFAULT_DRIVER, <<"ms-sql">>).
%% -------------------------------------------------------------------------------------------------
%% api.
conn_bridge_examples(Method) ->
[
#{
@ -65,12 +73,76 @@ values(post) ->
values(put) ->
values(post).
%% ====================
%% Bridge V2: Connector + Action
connector_examples(Method) ->
[
#{
<<"sqlserver">> =>
#{
summary => <<"Microsoft SQL Server Connector">>,
value => emqx_connector_schema:connector_values(
Method, ?CONNECTOR_TYPE, connector_values()
)
}
}
].
connector_values() ->
#{
server => <<"127.0.0.1:1433">>,
database => <<"test">>,
pool_size => 8,
username => <<"sa">>,
password => <<"******">>,
resource_opts => #{health_check_interval => <<"20s">>}
}.
bridge_v2_examples(Method) ->
[
#{
<<"sqlserver">> =>
#{
summary => <<"Microsoft SQL Server Action">>,
value => emqx_bridge_v2_schema:action_values(
Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
)
}
}
].
action_values() ->
#{parameters => #{sql => ?DEFAULT_SQL}}.
%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
namespace() -> "bridge_sqlserver".
roots() -> [].
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(sqlserver_action));
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
emqx_connector_schema:api_fields(
Field,
?CONNECTOR_TYPE,
fields("config_connector") -- emqx_connector_schema:common_fields()
);
fields("config_connector") ->
emqx_connector_schema:common_fields() ++
emqx_bridge_sqlserver_connector:fields(config) ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields("config") ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
@ -97,6 +169,27 @@ fields("config") ->
] ++
(emqx_bridge_sqlserver_connector:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields());
fields(action) ->
{?ACTION_TYPE,
mk(
hoconsc:map(name, ref(?MODULE, sqlserver_action)),
#{desc => ?DESC("sqlserver_action"), required => false}
)};
fields(sqlserver_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(
mk(
ref(?MODULE, action_parameters),
#{required => true, desc => ?DESC(action_parameters)}
)
);
fields(action_parameters) ->
[
{sql,
mk(
binary(),
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
)}
];
fields("creation_opts") ->
emqx_resource_schema:fields("creation_opts");
fields("post") ->
@ -115,6 +208,14 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for Microsoft SQL Server using `", string:to_upper(Method), "` method."];
desc("creation_opts" = Name) ->
emqx_resource_schema:desc(Name);
desc("config_connector") ->
?DESC("config_connector");
desc(sqlserver_action) ->
?DESC("sqlserver_action");
desc(action_parameters) ->
?DESC("action_parameters");
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(_) ->
undefined.

View File

@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_sqlserver_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
bridge_v1_type_name() -> sqlserver.
action_type_name() -> sqlserver.
connector_type_name() -> sqlserver.
schema_module() -> emqx_bridge_sqlserver.

View File

@ -35,7 +35,11 @@
on_stop/2,
on_query/3,
on_batch_query/3,
on_get_status/2
on_get_status/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_get_channel_status/3
]).
%% callbacks for ecpool
@ -172,7 +176,7 @@ server() ->
callback_mode() -> always_sync.
on_start(
ResourceId = PoolName,
InstanceId = PoolName,
#{
server := Server,
username := Username,
@ -184,7 +188,7 @@ on_start(
) ->
?SLOG(info, #{
msg => "starting_sqlserver_connector",
connector => ResourceId,
connector => InstanceId,
config => emqx_utils:redact(Config)
}),
@ -199,7 +203,8 @@ on_start(
ok
end,
Options = [
%% odbc connection string required
ConnectOptions = [
{server, to_bin(Server)},
{username, Username},
{password, maps:get(password, Config, emqx_secret:wrap(""))},
@ -209,12 +214,12 @@ on_start(
],
State = #{
%% also ResourceId
%% also InstanceId
pool_name => PoolName,
sql_templates => parse_sql_template(Config),
installed_channels => #{},
resource_opts => ResourceOpts
},
case emqx_resource_pool:start(PoolName, ?MODULE, Options) of
case emqx_resource_pool:start(PoolName, ?MODULE, ConnectOptions) of
ok ->
{ok, State};
{error, Reason} ->
@ -225,12 +230,61 @@ on_start(
{error, Reason}
end.
on_stop(ResourceId, _State) ->
on_add_channel(
_InstId,
#{
installed_channels := InstalledChannels
} = OldState,
ChannelId,
ChannelConfig
) ->
{ok, ChannelState} = create_channel_state(ChannelConfig),
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}.
create_channel_state(
#{parameters := Conf} = _ChannelConfig
) ->
State = #{sql_templates => parse_sql_template(Conf)},
{ok, State}.
on_remove_channel(
_InstId,
#{
installed_channels := InstalledChannels
} = OldState,
ChannelId
) ->
NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}.
on_get_channel_status(
InstanceId,
ChannelId,
#{installed_channels := Channels} = State
) ->
case maps:find(ChannelId, Channels) of
{ok, _} -> on_get_status(InstanceId, State);
error -> ?status_disconnected
end.
on_get_channels(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId).
on_stop(InstanceId, _State) ->
?tp(
sqlserver_connector_on_stop,
#{instance_id => InstanceId}
),
?SLOG(info, #{
msg => "stopping_sqlserver_connector",
connector => ResourceId
connector => InstanceId
}),
emqx_resource_pool:stop(ResourceId).
emqx_resource_pool:stop(InstanceId).
-spec on_query(
resource_id(),
@ -273,8 +327,8 @@ on_get_status(_InstanceId, #{pool_name := PoolName} = _State) ->
),
status_result(Health).
status_result(_Status = true) -> connected;
status_result(_Status = false) -> connecting.
status_result(_Status = true) -> ?status_connected;
status_result(_Status = false) -> ?status_connecting.
%% TODO:
%% case for disconnected
@ -296,7 +350,7 @@ do_get_status(Conn) ->
end.
%%====================================================================
%% Internal Helper fns
%% Internal Functions
%%====================================================================
%% TODO && FIXME:
@ -341,7 +395,10 @@ do_query(
ResourceId,
Query,
ApplyMode,
#{pool_name := PoolName, sql_templates := Templates} = State
#{
pool_name := PoolName,
installed_channels := Channels
} = State
) ->
?TRACE(
"SINGLE_QUERY_SYNC",
@ -349,15 +406,19 @@ do_query(
#{query => Query, connector => ResourceId, state => State}
),
ChannelId = get_channel_id(Query),
QueryTuple = get_query_tuple(Query),
#{sql_templates := Templates} = _ChannelState = maps:get(ChannelId, Channels),
%% only insert sql statement for single query and batch query
case apply_template(Query, Templates) of
case apply_template(QueryTuple, Templates) of
{?ACTION_SEND_MESSAGE, SQL} ->
Result = ecpool:pick_and_do(
PoolName,
{?MODULE, worker_do_insert, [SQL, State]},
ApplyMode
);
Query ->
QueryTuple ->
Result = {error, {unrecoverable_error, invalid_query}};
_ ->
Result = {error, {unrecoverable_error, failed_to_apply_sql_template}}
@ -426,8 +487,22 @@ execute(Conn, SQL) ->
execute(Conn, SQL, Timeout) ->
odbc:sql_query(Conn, str(SQL), Timeout).
to_bin(List) when is_list(List) ->
unicode:characters_to_binary(List, utf8).
get_channel_id([{ChannelId, _Req} | _]) ->
ChannelId;
get_channel_id({ChannelId, _Req}) ->
ChannelId.
get_query_tuple({_ChannelId, {QueryType, Data}} = _Query) ->
{QueryType, Data};
get_query_tuple({_ChannelId, Data} = _Query) ->
{send_message, Data};
get_query_tuple([{_ChannelId, {_QueryType, _Data}} | _]) ->
error(
{unrecoverable_error,
{invalid_request, <<"The only query type that supports batching is insert.">>}}
);
get_query_tuple([InsertQuery | _]) ->
get_query_tuple(InsertQuery).
%% for bridge data to sql server
parse_sql_template(Config) ->
@ -506,3 +581,6 @@ proc_batch_sql(BatchReqs, BatchInserts, Tokens) ->
])
),
<<BatchInserts/binary, " values ", Values/binary>>.
to_bin(List) when is_list(List) ->
unicode:characters_to_binary(List, utf8).

View File

@ -60,6 +60,8 @@ resource_type(syskeeper_forwarder) ->
emqx_bridge_syskeeper_connector;
resource_type(syskeeper_proxy) ->
emqx_bridge_syskeeper_proxy_server;
resource_type(sqlserver) ->
emqx_bridge_sqlserver;
resource_type(timescale) ->
emqx_postgresql;
resource_type(redis) ->
@ -283,6 +285,14 @@ connector_structs() ->
required => false
}
)},
{sqlserver,
mk(
hoconsc:map(name, ref(emqx_bridge_sqlserver, "config_connector")),
#{
desc => <<"Microsoft SQL Server Connector Config">>,
required => false
}
)},
{timescale,
mk(
hoconsc:map(name, ref(emqx_bridge_timescale, "config_connector")),
@ -377,6 +387,7 @@ schema_modules() ->
emqx_bridge_mysql,
emqx_bridge_syskeeper_connector,
emqx_bridge_syskeeper_proxy,
emqx_bridge_sqlserver,
emqx_bridge_timescale,
emqx_postgresql_connector_schema,
emqx_bridge_redis_schema,
@ -427,6 +438,7 @@ api_schemas(Method) ->
api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"),
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),
api_ref(emqx_bridge_sqlserver, <<"sqlserver">>, Method ++ "_connector"),
api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"),
api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"),

View File

@ -166,6 +166,8 @@ connector_type_to_bridge_types(syskeeper_forwarder) ->
[syskeeper_forwarder];
connector_type_to_bridge_types(syskeeper_proxy) ->
[];
connector_type_to_bridge_types(sqlserver) ->
[sqlserver];
connector_type_to_bridge_types(timescale) ->
[timescale];
connector_type_to_bridge_types(iotdb) ->

View File

@ -311,7 +311,7 @@ maybe_cleanup_api_key(#?APP{name = Name, api_key = ApiKey}) ->
%% Note for EMQX-11844:
%% emqx.conf has the highest priority
%% if there is a key conflict, delete the old one and keep the key which from the bootstrap filex
%% if there is a key conflict, delete the old one and keep the key which from the bootstrap file
?SLOG(info, #{
msg => "duplicated_apikey_detected",
info => <<"Delete duplicated apikeys and write a new one from bootstrap file">>,

View File

@ -37,8 +37,8 @@
%% provisional solution: rpc:multicall to all the nodes for creating/updating/removing
%% todo: replicate operations
%% store the config and start the instance
-export([
%% store the config and start the instance
create_local/4,
create_local/5,
create_dry_run_local/2,

View File

@ -46,4 +46,22 @@ sql_template.desc:
sql_template.label:
"""SQL Template"""
action_parameters.desc:
"""Action specific configuration."""
action_parameters.label:
"""Action"""
sqlserver_action.desc:
"""Configuration for Microsoft SOL Server action."""
sqlserver_action.label:
"""Microsoft SOL Server Action Configuration"""
config_connector.desc:
"""Configuration for an Microsoft SOL Server connector."""
config_connector.label:
"""Microsoft SOL Server Connector Configuration"""
}