Merge pull request #11151 from kjellwinblad/kjell/refactor/mysql_bridge/EMQX-9533

refactor: MySQL bridge and connector to separate applications
This commit is contained in:
Kjell Winblad 2023-06-28 15:15:55 +02:00 committed by GitHub
commit f9ea924cd0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 95 additions and 58 deletions

View File

@ -5,7 +5,8 @@
{emqx_utils, {path, "../emqx_utils"}}, {emqx_utils, {path, "../emqx_utils"}},
{emqx_connector, {path, "../emqx_connector"}}, {emqx_connector, {path, "../emqx_connector"}},
{emqx_mongodb, {path, "../emqx_mongodb"}}, {emqx_mongodb, {path, "../emqx_mongodb"}},
{emqx_redis, {path, "../emqx_redis"}} {emqx_redis, {path, "../emqx_redis"}},
{emqx_mysql, {path, "../emqx_mysql"}}
]}. ]}.
{edoc_opts, [{preprocess, true}]}. {edoc_opts, [{preprocess, true}]}.

View File

@ -14,7 +14,8 @@
mysql, mysql,
jose, jose,
emqx_mongodb, emqx_mongodb,
emqx_redis emqx_redis,
emqx_mysql
]}, ]},
{mod, {emqx_authn_app, []}}, {mod, {emqx_authn_app, []}},
{env, []}, {env, []},

View File

@ -62,7 +62,7 @@ fields(mysql) ->
{query, fun query/1}, {query, fun query/1},
{query_timeout, fun query_timeout/1} {query_timeout, fun query_timeout/1}
] ++ emqx_authn_schema:common_fields() ++ ] ++ emqx_authn_schema:common_fields() ++
proplists:delete(prepare_statement, emqx_connector_mysql:fields(config)). proplists:delete(prepare_statement, emqx_mysql:fields(config)).
desc(mysql) -> desc(mysql) ->
?DESC(mysql); ?DESC(mysql);
@ -92,12 +92,12 @@ create(_AuthenticatorID, Config) ->
create(Config0) -> create(Config0) ->
ResourceId = emqx_authn_utils:make_resource_id(?MODULE), ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
{Config, State} = parse_config(Config0), {Config, State} = parse_config(Config0),
{ok, _Data} = emqx_authn_utils:create_resource(ResourceId, emqx_connector_mysql, Config), {ok, _Data} = emqx_authn_utils:create_resource(ResourceId, emqx_mysql, Config),
{ok, State#{resource_id => ResourceId}}. {ok, State#{resource_id => ResourceId}}.
update(Config0, #{resource_id := ResourceId} = _State) -> update(Config0, #{resource_id := ResourceId} = _State) ->
{Config, NState} = parse_config(Config0), {Config, NState} = parse_config(Config0),
case emqx_authn_utils:update_resource(emqx_connector_mysql, Config, ResourceId) of case emqx_authn_utils:update_resource(emqx_mysql, Config, ResourceId) of
{error, Reason} -> {error, Reason} ->
error({load_config_error, Reason}); error({load_config_error, Reason});
{ok, _} -> {ok, _} ->

View File

@ -62,7 +62,7 @@ init_per_suite(Config) ->
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(
?MYSQL_RESOURCE, ?MYSQL_RESOURCE,
?RESOURCE_GROUP, ?RESOURCE_GROUP,
emqx_connector_mysql, emqx_mysql,
mysql_config(), mysql_config(),
#{} #{}
), ),

View File

@ -6,7 +6,8 @@
{emqx_utils, {path, "../emqx_utils"}}, {emqx_utils, {path, "../emqx_utils"}},
{emqx_connector, {path, "../emqx_connector"}}, {emqx_connector, {path, "../emqx_connector"}},
{emqx_mongodb, {path, "../emqx_mongodb"}}, {emqx_mongodb, {path, "../emqx_mongodb"}},
{emqx_redis, {path, "../emqx_redis"}} {emqx_redis, {path, "../emqx_redis"}},
{emqx_mysql, {path, "../emqx_mysql"}}
]}. ]}.
{shell, [ {shell, [

View File

@ -11,7 +11,8 @@
emqx_resource, emqx_resource,
emqx_connector, emqx_connector,
emqx_mongodb, emqx_mongodb,
emqx_redis emqx_redis,
emqx_mysql
]}, ]},
{env, []}, {env, []},
{modules, []}, {modules, []},

View File

@ -70,7 +70,7 @@ fields(mongo_sharded) ->
fields(mysql) -> fields(mysql) ->
authz_common_fields(mysql) ++ authz_common_fields(mysql) ++
[{query, query()}] ++ [{query, query()}] ++
proplists:delete(prepare_statement, emqx_connector_mysql:fields(config)); proplists:delete(prepare_statement, emqx_mysql:fields(config));
fields(postgresql) -> fields(postgresql) ->
authz_common_fields(postgresql) ++ authz_common_fields(postgresql) ++
[{query, query()}] ++ [{query, query()}] ++

View File

@ -54,13 +54,13 @@ create(#{query := SQL} = Source0) ->
{PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS), {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS),
ResourceId = emqx_authz_utils:make_resource_id(?MODULE), ResourceId = emqx_authz_utils:make_resource_id(?MODULE),
Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}}, Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}},
{ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_connector_mysql, Source), {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_mysql, Source),
Source#{annotations => #{id => ResourceId, tmpl_oken => TmplToken}}. Source#{annotations => #{id => ResourceId, tmpl_oken => TmplToken}}.
update(#{query := SQL} = Source0) -> update(#{query := SQL} = Source0) ->
{PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS), {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS),
Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}}, Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}},
case emqx_authz_utils:update_resource(emqx_connector_mysql, Source) of case emqx_authz_utils:update_resource(emqx_mysql, Source) of
{error, Reason} -> {error, Reason} ->
error({load_config_error, Reason}); error({load_config_error, Reason});
{ok, Id} -> {ok, Id} ->

View File

@ -389,7 +389,7 @@ cmd() ->
connector_fields(DB) -> connector_fields(DB) ->
connector_fields(DB, config). connector_fields(DB, config).
connector_fields(redis = DB, Fields) -> connector_fields(DB, Fields) when DB =:= redis; DB =:= mysql ->
connector_fields(DB, Fields, emqx); connector_fields(DB, Fields, emqx);
connector_fields(DB, Fields) -> connector_fields(DB, Fields) ->
connector_fields(DB, Fields, emqx_connector). connector_fields(DB, Fields, emqx_connector).

View File

@ -44,7 +44,7 @@ init_per_suite(Config) ->
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(
?MYSQL_RESOURCE, ?MYSQL_RESOURCE,
?RESOURCE_GROUP, ?RESOURCE_GROUP,
emqx_connector_mysql, emqx_mysql,
mysql_config(), mysql_config(),
#{} #{}
), ),

View File

@ -141,7 +141,9 @@ start_apps() ->
%% we want to make sure they are loaded before %% we want to make sure they are loaded before
%% ekka start in emqx_common_test_helpers:start_apps/1 %% ekka start in emqx_common_test_helpers:start_apps/1
emqx_common_test_helpers:render_and_load_app_config(emqx_conf), emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine, emqx_bridge]). ok = emqx_common_test_helpers:start_apps([
emqx_conf, emqx_rule_engine, emqx_bridge, emqx_mongodb
]).
ensure_loaded() -> ensure_loaded() ->
_ = application:load(emqx_ee_bridge), _ = application:load(emqx_ee_bridge),

View File

@ -0,0 +1,2 @@
toxiproxy
mysql

View File

@ -0,0 +1,11 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}}
, {emqx_bridge, {path, "../../apps/emqx_bridge"}}
, {emqx_mysql, {path, "../../apps/emqx_mysql"}}
]}.
{shell, [
{apps, [emqx_bridge_mysql]}
]}.

View File

@ -1,8 +1,8 @@
{application, emqx_bridge_mysql, [ {application, emqx_bridge_mysql, [
{description, "EMQX Enterprise MySQL Bridge"}, {description, "EMQX Enterprise MySQL Bridge"},
{vsn, "0.1.0"}, {vsn, "0.1.1"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib]}, {applications, [kernel, stdlib, emqx_connector, emqx_resource, emqx_bridge, emqx_mysql]},
{env, []}, {env, []},
{modules, []}, {modules, []},
{links, []} {links, []}

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_ee_bridge_mysql). -module(emqx_bridge_mysql).
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
@ -80,7 +80,7 @@ fields("config") ->
#{desc => ?DESC("local_topic"), default => undefined} #{desc => ?DESC("local_topic"), default => undefined}
)} )}
] ++ emqx_resource_schema:fields("resource_opts") ++ ] ++ emqx_resource_schema:fields("resource_opts") ++
(emqx_connector_mysql:fields(config) -- (emqx_mysql:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields()); emqx_connector_schema_lib:prepare_statement_fields());
fields("post") -> fields("post") ->
[type_field(), name_field() | fields("config")]; [type_field(), name_field() | fields("config")];

View File

@ -2,7 +2,7 @@
% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. % Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_ee_bridge_mysql_SUITE). -module(emqx_bridge_mysql_SUITE).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).

View File

@ -3,25 +3,4 @@
This application is a collection of `connectors`. This application is a collection of `connectors`.
A `connector` is a callback module of `emqx_resource` that maintains the data related to A `connector` is a callback module of `emqx_resource` that maintains the data related to
external resources. Put all resource related callback modules in a single application is good as external resources.
we can put some util functions/modules here for reusing purpose.
For example, a MySQL connector is an emqx resource that maintains all the MySQL connection
related parameters (configs) and the TCP connections to the MySQL server.
An MySQL connector can be used as following:
```
(emqx@127.0.0.1)5> emqx_resource:list_instances_verbose().
[#{config =>
#{cacertfile => [],certfile => [],
database => "mqtt",keyfile => [],password => "public",
pool_size => 1,
server => {{127,0,0,1},3306},
ssl => false,user => "root",verify => false},
id => <<"mysql-abc">>,mod => emqx_connector_mysql,
state => #{poolname => 'mysql-abc'},
status => connected}]
(emqx@127.0.0.1)6> emqx_resource:query(<<"mysql-abc">>, {sql, <<"SELECT count(1)">>}).
{ok,[<<"count(1)">>],[[1]]}
```

View File

@ -1,2 +1 @@
mysql
pgsql pgsql

View File

@ -10,7 +10,6 @@
{emqx_utils, {path, "../emqx_utils"}}, {emqx_utils, {path, "../emqx_utils"}},
{emqx_resource, {path, "../emqx_resource"}}, {emqx_resource, {path, "../emqx_resource"}},
{eldap2, {git, "https://github.com/emqx/eldap2", {tag, "v0.2.2"}}}, {eldap2, {git, "https://github.com/emqx/eldap2", {tag, "v0.2.2"}}},
{mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.2"}}},
{epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7.0.1"}}} {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7.0.1"}}}
]}. ]}.

View File

@ -13,8 +13,6 @@
eredis, eredis,
epgsql, epgsql,
eldap2, eldap2,
mysql,
mongodb,
ehttpc, ehttpc,
jose, jose,
emqx, emqx,

View File

@ -38,6 +38,7 @@ init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([emqx_conf]), ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]), ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector), {ok, _} = application:ensure_all_started(emqx_connector),
{ok, _} = application:ensure_all_started(emqx_mongodb),
Config; Config;
false -> false ->
{skip, no_mongo} {skip, no_mongo}

15
apps/emqx_mysql/README.md Normal file
View File

@ -0,0 +1,15 @@
# MySQL Connector
This application houses the MySQL Database connector.
It provides the APIs to connect to MySQL Databases.
It is used by the MySQL bridge to insert messages and by the emqx_authz and
emqx_authn applications to check user permissions.
## Contributing
Please see our [contributing.md](../../CONTRIBUTING.md).
## License
See [APL](../../APL.txt).

View File

@ -0,0 +1 @@
mysql

View File

@ -0,0 +1,9 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [
%% NOTE: mind ecpool version when updating eredis_cluster version
{mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.2"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}}
]}.

View File

@ -0,0 +1,14 @@
{application, emqx_mysql, [
{description, "EMQX MySQL Database Connector"},
{vsn, "0.1.0"},
{registered, []},
{applications, [
kernel,
stdlib,
mysql
]},
{env, []},
{modules, []},
{links, []}
]}.

View File

@ -13,9 +13,9 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_connector_mysql). -module(emqx_mysql).
-include("emqx_connector.hrl"). -include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").

View File

@ -13,18 +13,18 @@
% %% limitations under the License. % %% limitations under the License.
% %%-------------------------------------------------------------------- % %%--------------------------------------------------------------------
-module(emqx_connector_mysql_SUITE). -module(emqx_mysql_SUITE).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-compile(export_all). -compile(export_all).
-include("emqx_connector.hrl"). -include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("stdlib/include/assert.hrl"). -include_lib("stdlib/include/assert.hrl").
-define(MYSQL_HOST, "mysql"). -define(MYSQL_HOST, "mysql").
-define(MYSQL_RESOURCE_MOD, emqx_connector_mysql). -define(MYSQL_RESOURCE_MOD, emqx_mysql).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -60,7 +60,7 @@ end_per_testcase(_, _Config) ->
t_lifecycle(_Config) -> t_lifecycle(_Config) ->
perform_lifecycle_check( perform_lifecycle_check(
<<"emqx_connector_mysql_SUITE">>, <<"emqx_mysql_SUITE">>,
mysql_config() mysql_config()
). ).

View File

@ -0,0 +1 @@
The MySQL connector has been refactored to its own Erlang application to improve the code structure.

View File

@ -0,0 +1 @@
The MySQL bridge has been refactored to its own Erlang application to improve the code structure and to make it easier to maintain.

View File

@ -1,2 +1 @@
toxiproxy toxiproxy
mysql

View File

@ -25,7 +25,7 @@ api_schemas(Method) ->
%% to hocon; keeping this as just `kafka' for backwards compatibility. %% to hocon; keeping this as just `kafka' for backwards compatibility.
api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_producer"), api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_producer"),
api_ref(emqx_bridge_cassandra, <<"cassandra">>, Method), api_ref(emqx_bridge_cassandra, <<"cassandra">>, Method),
api_ref(emqx_ee_bridge_mysql, <<"mysql">>, Method), api_ref(emqx_bridge_mysql, <<"mysql">>, Method),
api_ref(emqx_bridge_pgsql, <<"pgsql">>, Method), api_ref(emqx_bridge_pgsql, <<"pgsql">>, Method),
api_ref(emqx_bridge_mongodb, <<"mongodb_rs">>, Method ++ "_rs"), api_ref(emqx_bridge_mongodb, <<"mongodb_rs">>, Method ++ "_rs"),
api_ref(emqx_bridge_mongodb, <<"mongodb_sharded">>, Method ++ "_sharded"), api_ref(emqx_bridge_mongodb, <<"mongodb_sharded">>, Method ++ "_sharded"),
@ -58,7 +58,7 @@ schema_modules() ->
emqx_bridge_gcp_pubsub, emqx_bridge_gcp_pubsub,
emqx_bridge_influxdb, emqx_bridge_influxdb,
emqx_bridge_mongodb, emqx_bridge_mongodb,
emqx_ee_bridge_mysql, emqx_bridge_mysql,
emqx_bridge_redis, emqx_bridge_redis,
emqx_bridge_pgsql, emqx_bridge_pgsql,
emqx_bridge_timescale, emqx_bridge_timescale,
@ -99,7 +99,7 @@ resource_type(gcp_pubsub_consumer) -> emqx_bridge_gcp_pubsub_impl_consumer;
resource_type(mongodb_rs) -> emqx_bridge_mongodb_connector; resource_type(mongodb_rs) -> emqx_bridge_mongodb_connector;
resource_type(mongodb_sharded) -> emqx_bridge_mongodb_connector; resource_type(mongodb_sharded) -> emqx_bridge_mongodb_connector;
resource_type(mongodb_single) -> emqx_bridge_mongodb_connector; resource_type(mongodb_single) -> emqx_bridge_mongodb_connector;
resource_type(mysql) -> emqx_connector_mysql; resource_type(mysql) -> emqx_mysql;
resource_type(influxdb_api_v1) -> emqx_bridge_influxdb_connector; resource_type(influxdb_api_v1) -> emqx_bridge_influxdb_connector;
resource_type(influxdb_api_v2) -> emqx_bridge_influxdb_connector; resource_type(influxdb_api_v2) -> emqx_bridge_influxdb_connector;
resource_type(redis_single) -> emqx_bridge_redis_connector; resource_type(redis_single) -> emqx_bridge_redis_connector;
@ -131,7 +131,7 @@ fields(bridges) ->
)}, )},
{mysql, {mysql,
mk( mk(
hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")), hoconsc:map(name, ref(emqx_bridge_mysql, "config")),
#{ #{
desc => <<"MySQL Bridge Config">>, desc => <<"MySQL Bridge Config">>,
required => false required => false

View File

@ -378,6 +378,7 @@ defmodule EMQXUmbrella.MixProject do
emqx_slow_subs: :permanent, emqx_slow_subs: :permanent,
emqx_mongodb: :permanent, emqx_mongodb: :permanent,
emqx_redis: :permanent, emqx_redis: :permanent,
emqx_mysql: :permanent,
emqx_plugins: :permanent, emqx_plugins: :permanent,
emqx_mix: :none emqx_mix: :none
] ++ ] ++

View File

@ -441,6 +441,7 @@ relx_apps(ReleaseType, Edition) ->
emqx_slow_subs, emqx_slow_subs,
emqx_mongodb, emqx_mongodb,
emqx_redis, emqx_redis,
emqx_mysql,
emqx_plugins emqx_plugins
] ++ ] ++
[quicer || is_quicer_supported()] ++ [quicer || is_quicer_supported()] ++

View File

@ -1,4 +1,4 @@
emqx_ee_bridge_mysql { emqx_bridge_mysql {
config_enable.desc: config_enable.desc:
"""Enable or disable this bridge""" """Enable or disable this bridge"""

View File

@ -1,4 +1,4 @@
emqx_connector_mysql { emqx_mysql {
server.desc: server.desc:
"""The IPv4 or IPv6 address or the hostname to connect to.<br/> """The IPv4 or IPv6 address or the hostname to connect to.<br/>