diff --git a/apps/emqx_auth_mysql/src/emqx_auth_mysql.app.src b/apps/emqx_auth_mysql/src/emqx_auth_mysql.app.src index 38750b79a..24fdd2648 100644 --- a/apps/emqx_auth_mysql/src/emqx_auth_mysql.app.src +++ b/apps/emqx_auth_mysql/src/emqx_auth_mysql.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_mysql, [ {description, "EMQX MySQL Authentication and Authorization"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {mod, {emqx_auth_mysql_app, []}}, {applications, [ diff --git a/apps/emqx_auth_mysql/src/emqx_authn_mysql_schema.erl b/apps/emqx_auth_mysql/src/emqx_authn_mysql_schema.erl index 6472794fe..df3b89ae9 100644 --- a/apps/emqx_auth_mysql/src/emqx_authn_mysql_schema.erl +++ b/apps/emqx_auth_mysql/src/emqx_authn_mysql_schema.erl @@ -55,8 +55,7 @@ fields(mysql) -> {password_hash_algorithm, fun emqx_authn_password_hashing:type_ro/1}, {query, fun query/1}, {query_timeout, fun query_timeout/1} - ] ++ emqx_authn_schema:common_fields() ++ - proplists:delete(prepare_statement, emqx_mysql:fields(config)). + ] ++ emqx_authn_schema:common_fields() ++ emqx_mysql:fields(config). desc(mysql) -> ?DESC(mysql); diff --git a/apps/emqx_auth_mysql/src/emqx_authz_mysql_schema.erl b/apps/emqx_auth_mysql/src/emqx_authz_mysql_schema.erl index 43f6ca6fa..c1cff7533 100644 --- a/apps/emqx_auth_mysql/src/emqx_authz_mysql_schema.erl +++ b/apps/emqx_auth_mysql/src/emqx_authz_mysql_schema.erl @@ -37,6 +37,7 @@ type() -> ?AUTHZ_TYPE. fields(mysql) -> emqx_authz_schema:authz_common_fields(?AUTHZ_TYPE) ++ emqx_mysql:fields(config) ++ + emqx_connector_schema_lib:prepare_statement_fields() ++ [{query, query()}]. desc(mysql) -> diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index f975a1c93..9376eeef3 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -79,6 +79,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_kafka_action_info, emqx_bridge_matrix_action_info, emqx_bridge_mongodb_action_info, + emqx_bridge_mysql_action_info, emqx_bridge_pgsql_action_info, emqx_bridge_syskeeper_action_info, emqx_bridge_timescale_action_info, diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 7cd6b5bd8..b271f4259 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -31,7 +31,8 @@ get_response/0, put_request/0, post_request/0, - examples/1 + examples/1, + action_values/4 ]). %% Exported for mocking @@ -103,6 +104,54 @@ bridge_api_union(Refs) -> end end. +-type http_method() :: get | post | put. +-type schema_example_map() :: #{atom() => term()}. + +-spec action_values(http_method(), atom(), atom(), schema_example_map()) -> schema_example_map(). +action_values(Method, ActionType, ConnectorType, ActionValues) -> + ActionTypeBin = atom_to_binary(ActionType), + ConnectorTypeBin = atom_to_binary(ConnectorType), + lists:foldl( + fun(M1, M2) -> + maps:merge(M1, M2) + end, + #{ + enable => true, + description => <<"My example ", ActionTypeBin/binary, " action">>, + connector => <>, + resource_opts => #{ + health_check_interval => "30s" + } + }, + [ + ActionValues, + method_values(Method, ActionType) + ] + ). + +-spec method_values(http_method(), atom()) -> schema_example_map(). +method_values(post, Type) -> + TypeBin = atom_to_binary(Type), + #{ + name => <>, + type => TypeBin + }; +method_values(get, Type) -> + maps:merge( + method_values(post, Type), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +method_values(put, _Type) -> + #{}. + %%====================================================================================== %% HOCON Schema Callbacks %%====================================================================================== diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl index bc5e2eb74..61d3cec61 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl @@ -36,9 +36,7 @@ namespace() -> "bridge_mongodb". -roots() -> - %% ??? - []. +roots() -> []. fields("config") -> [ diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl index 823418f28..9fa19add2 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl @@ -31,9 +31,9 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> maps:merge( maps:without( [<<"connector">>], - map_unindent(<<"parameters">>, ActionConfig) + emqx_utils_maps:unindent(<<"parameters">>, ActionConfig) ), - map_unindent(<<"parameters">>, ConnectorConfig) + emqx_utils_maps:unindent(<<"parameters">>, ConnectorConfig) ) ). @@ -66,7 +66,7 @@ bridge_v1_config_to_connector_config(BridgeV1Config) -> make_config_map(PickKeys, IndentKeys, Config) -> Conf0 = maps:with(PickKeys, Config), - map_indent(<<"parameters">>, IndentKeys, Conf0). + emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0). bridge_v1_type_name() -> {fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}. @@ -86,18 +86,5 @@ v1_type(<<"rs">>) -> mongodb_rs; v1_type(<<"sharded">>) -> mongodb_sharded; v1_type(<<"single">>) -> mongodb_single. -map_unindent(Key, Map) -> - maps:merge( - maps:get(Key, Map), - maps:remove(Key, Map) - ). - -map_indent(IndentKey, PickKeys, Map) -> - maps:put( - IndentKey, - maps:with(PickKeys, Map), - maps:without(PickKeys, Map) - ). - schema_keys(Name) -> [bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))]. diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src index b1d110d36..50f535ac6 100644 --- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src @@ -9,7 +9,7 @@ emqx_resource, emqx_mysql ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_mysql_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.erl b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.erl index a72da7a1e..05c782b96 100644 --- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.erl +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.erl @@ -10,7 +10,9 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - conn_bridge_examples/1 + bridge_v2_examples/1, + conn_bridge_examples/1, + connector_examples/1 ]). -export([ @@ -20,6 +22,9 @@ desc/1 ]). +-define(CONNECTOR_TYPE, mysql). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). + -define(DEFAULT_SQL, << "insert into t_mqtt_msg(msgid, topic, qos, payload, arrived) " "values (${id}, ${topic}, ${qos}, ${payload}, FROM_UNIXTIME(${timestamp}/1000))" @@ -28,6 +33,22 @@ %% ------------------------------------------------------------------------------------------------- %% api +bridge_v2_examples(Method) -> + [ + #{ + <<"mysql">> => + #{ + summary => <<"MySQL Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. + +action_values() -> + #{parameters => #{sql => ?DEFAULT_SQL}}. + conn_bridge_examples(Method) -> [ #{ @@ -38,6 +59,29 @@ conn_bridge_examples(Method) -> } ]. +connector_examples(Method) -> + [ + #{ + <<"mysql">> => + #{ + summary => <<"MySQL Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_values() + ) + } + } + ]. + +connector_values() -> + #{ + server => <<"127.0.0.1:3306">>, + database => <<"test">>, + pool_size => 8, + username => <<"root">>, + password => <<"******">>, + resource_opts => #{health_check_interval => <<"20s">>} + }. + values(_Method) -> #{ enable => true, @@ -80,17 +124,70 @@ fields("config") -> #{desc => ?DESC("local_topic"), default => undefined} )} ] ++ emqx_resource_schema:fields("resource_opts") ++ - (emqx_mysql:fields(config) -- - emqx_connector_schema_lib:prepare_statement_fields()); + emqx_mysql:fields(config); +fields(action) -> + {mysql, + mk( + hoconsc:map(name, ref(?MODULE, mysql_action)), + #{desc => <<"MySQL Action Config">>, required => false} + )}; +fields(mysql_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + mk( + ref(?MODULE, action_parameters), + #{ + required => true, desc => ?DESC(action_parameters) + } + ) + ); +fields(action_parameters) -> + [ + {sql, + mk( + binary(), + #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + )} + ]; +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + emqx_mysql:fields(config) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); fields("post") -> [type_field(), name_field() | fields("config")]; fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:status_fields() ++ fields("post"). + emqx_bridge_schema:status_fields() ++ fields("post"); +fields("get_bridge_v2") -> + emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"); +fields("post_bridge_v2") -> + [type_field(), name_field() | fields(mysql_action)]; +fields("put_bridge_v2") -> + fields(mysql_action); +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_connector_schema:api_fields( + Field, + ?CONNECTOR_TYPE, + emqx_mysql:fields(config) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts) + ). desc("config") -> ?DESC("desc_config"); +desc("config_connector") -> + ?DESC("desc_config"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); +desc(action_parameters) -> + ?DESC(action_parameters); +desc(mysql_action) -> + ?DESC(mysql_action); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for MySQL using `", string:to_upper(Method), "` method."]; desc(_) -> diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_action_info.erl b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_action_info.erl new file mode 100644 index 000000000..31817b02f --- /dev/null +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_action_info.erl @@ -0,0 +1,64 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_mysql_action_info). + +-behaviour(emqx_action_info). + +%% behaviour callbacks +-export([ + action_type_name/0, + bridge_v1_config_to_action_config/2, + bridge_v1_config_to_connector_config/1, + bridge_v1_type_name/0, + connector_action_config_to_bridge_v1_config/2, + connector_type_name/0, + schema_module/0 +]). + +-import(emqx_utils_conv, [bin/1]). + +-define(MYSQL_TYPE, mysql). +-define(SCHEMA_MODULE, emqx_bridge_mysql). + +action_type_name() -> ?MYSQL_TYPE. +bridge_v1_type_name() -> ?MYSQL_TYPE. +connector_type_name() -> ?MYSQL_TYPE. + +schema_module() -> ?SCHEMA_MODULE. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + MergedConfig = + emqx_utils_maps:deep_merge( + maps:without( + [<<"connector">>], + emqx_utils_maps:unindent(<<"parameters">>, ActionConfig) + ), + ConnectorConfig + ), + BridgeV1Keys = schema_keys("config"), + maps:with(BridgeV1Keys, MergedConfig). + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(mysql_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + ActionConfig#{<<"connector">> => ConnectorName}. + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ConnectorKeys = schema_keys("config_connector"), + ResourceOptsKeys = schema_keys(connector_resource_opts), + maps:update_with( + <<"resource_opts">>, + fun(ResourceOpts) -> maps:with(ResourceOptsKeys, ResourceOpts) end, + #{}, + maps:with(ConnectorKeys, BridgeV1Config) + ). + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config), + emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0). + +schema_keys(Name) -> + [bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))]. diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl new file mode 100644 index 000000000..468f64d1f --- /dev/null +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl @@ -0,0 +1,150 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_mysql_connector). + +-behaviour(emqx_resource). + +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% `emqx_resource' API +-export([ + on_remove_channel/3, + callback_mode/0, + on_add_channel/4, + on_batch_query/3, + on_get_channel_status/3, + on_get_channels/1, + on_get_status/2, + on_query/3, + on_start/2, + on_stop/2 +]). + +%%======================================================================================== +%% `emqx_resource' API +%%======================================================================================== + +callback_mode() -> emqx_mysql:callback_mode(). + +on_add_channel( + _InstanceId, + #{channels := Channels, connector_state := ConnectorState} = State0, + ChannelId, + ChannelConfig0 +) -> + ChannelConfig1 = emqx_utils_maps:unindent(parameters, ChannelConfig0), + QueryTemplates = emqx_mysql:parse_prepare_sql(ChannelId, ChannelConfig1), + ChannelConfig2 = maps:merge(ChannelConfig1, QueryTemplates), + ChannelConfig = set_prepares(ChannelConfig2, ConnectorState), + State = State0#{ + channels => maps:put(ChannelId, ChannelConfig, Channels), + connector_state => ConnectorState + }, + {ok, State}. + +on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> + case maps:get(ChannelId, Channels) of + #{prepares := ok} -> + connected; + #{prepares := {error, _}} -> + connecting + end. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + +on_get_status(InstanceId, #{channels := Channels0, connector_state := ConnectorState} = State0) -> + case emqx_mysql:on_get_status(InstanceId, ConnectorState) of + WithState when is_tuple(WithState) -> + NewConnectorState = element(2, WithState), + State = State0#{connector_state => NewConnectorState}, + setelement(2, WithState, State); + connected -> + Channels = + maps:map( + fun + (_ChannelId, #{prepares := ok} = ChannelConfig) -> + ChannelConfig; + (_ChannelId, #{prepares := {error, _}} = ChannelConfig) -> + set_prepares(ChannelConfig, ConnectorState) + end, + Channels0 + ), + State = State0#{channels => Channels}, + {connected, State}; + Other -> + Other + end. + +on_query(InstId, {TypeOrKey, SQLOrKey}, State) -> + on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State); +on_query(InstId, {TypeOrKey, SQLOrKey, Params}, State) -> + on_query(InstId, {TypeOrKey, SQLOrKey, Params, default_timeout}, State); +on_query( + InstanceId, + {Channel, _Message, _Params, _Timeout} = Request, + #{channels := Channels, connector_state := ConnectorState} +) when is_binary(Channel) -> + ChannelConfig = maps:get(Channel, Channels), + Result = emqx_mysql:on_query( + InstanceId, + Request, + maps:merge(ConnectorState, ChannelConfig) + ), + ?tp(mysql_connector_on_query_return, #{instance_id => InstanceId, result => Result}), + Result; +on_query(InstanceId, Request, _State = #{channels := _Channels, connector_state := ConnectorState}) -> + emqx_mysql:on_query(InstanceId, Request, ConnectorState). + +on_batch_query( + InstanceId, + [Req | _] = BatchRequest, + #{channels := Channels, connector_state := ConnectorState} +) when is_binary(element(1, Req)) -> + Channel = element(1, Req), + ChannelConfig = maps:get(Channel, Channels), + Result = emqx_mysql:on_batch_query( + InstanceId, + BatchRequest, + maps:merge(ConnectorState, ChannelConfig) + ), + ?tp(mysql_connector_on_batch_query_return, #{instance_id => InstanceId, result => Result}), + Result; +on_batch_query(InstanceId, BatchRequest, _State = #{connector_state := ConnectorState}) -> + emqx_mysql:on_batch_query(InstanceId, BatchRequest, ConnectorState). + +on_remove_channel( + _InstanceId, #{channels := Channels, connector_state := ConnectorState} = State, ChannelId +) -> + ChannelConfig = maps:get(ChannelId, Channels), + emqx_mysql:unprepare_sql(maps:merge(ChannelConfig, ConnectorState)), + NewState = State#{channels => maps:remove(ChannelId, Channels)}, + {ok, NewState}. + +-spec on_start(binary(), hocon:config()) -> + {ok, #{connector_state := emqx_mysql:state(), channels := map()}} | {error, _}. +on_start(InstanceId, Config) -> + case emqx_mysql:on_start(InstanceId, Config) of + {ok, ConnectorState} -> + State = #{ + connector_state => ConnectorState, + channels => #{} + }, + {ok, State}; + {error, Reason} -> + {error, Reason} + end. + +on_stop(InstanceId, _State = #{connector_state := ConnectorState}) -> + ok = emqx_mysql:on_stop(InstanceId, ConnectorState), + ?tp(mysql_connector_stopped, #{instance_id => InstanceId}), + ok. + +%%======================================================================================== +%% Helper fns +%%======================================================================================== +set_prepares(ChannelConfig, ConnectorState) -> + #{prepares := Prepares} = + emqx_mysql:init_prepare(maps:merge(ConnectorState, ChannelConfig)), + ChannelConfig#{prepares => Prepares}. diff --git a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl index 98b957b19..f1b3f8260 100644 --- a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl +++ b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl @@ -242,13 +242,12 @@ send_message(Config, Payload) -> query_resource(Config, Request) -> Name = ?config(mysql_name, Config), BridgeType = ?config(mysql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request, #{timeout => 500}). + emqx_bridge_v2:query(BridgeType, Name, Request, #{timeout => 500}). sync_query_resource(Config, Request) -> Name = ?config(mysql_name, Config), BridgeType = ?config(mysql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + ResourceID = emqx_bridge_v2:id(BridgeType, Name), emqx_resource_buffer_worker:simple_sync_query(ResourceID, Request). query_resource_async(Config, Request) -> @@ -256,8 +255,7 @@ query_resource_async(Config, Request) -> BridgeType = ?config(mysql_bridge_type, Config), Ref = alias([reply]), AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end, - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - Return = emqx_resource:query(ResourceID, Request, #{ + Return = emqx_bridge_v2:query(BridgeType, Name, Request, #{ timeout => 500, async_reply_fun => {AsyncReplyFun, []} }), {Return, Ref}. @@ -274,7 +272,9 @@ unprepare(Config, Key) -> Name = ?config(mysql_name, Config), BridgeType = ?config(mysql_bridge_type, Config), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - {ok, _, #{state := #{pool_name := PoolName}}} = emqx_resource:get_instance(ResourceID), + {ok, _, #{state := #{connector_state := #{pool_name := PoolName}}}} = emqx_resource:get_instance( + ResourceID + ), [ begin {ok, Conn} = ecpool_worker:client(Worker), @@ -343,6 +343,17 @@ create_rule_and_action_http(Config) -> Error end. +request_api_status(BridgeId) -> + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader) of + {ok, Res0} -> + #{<<"status">> := Status} = _Res = emqx_utils_json:decode(Res0, [return_maps]), + {ok, binary_to_existing_atom(Status)}; + Error -> + Error + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -519,14 +530,18 @@ t_write_timeout(Config) -> 2 * Timeout ), emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> + Name = ?config(mysql_name, Config), + BridgeType = ?config(mysql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + case QueryMode of sync -> ?assertMatch( {error, {resource_error, #{reason := timeout}}}, - query_resource(Config, {send_message, SentData, [], Timeout}) + query_resource(Config, {ResourceID, SentData, [], Timeout}) ); async -> - query_resource(Config, {send_message, SentData, [], Timeout}), + query_resource(Config, {ResourceID, SentData, [], Timeout}), ok end, ok @@ -703,7 +718,10 @@ t_uninitialized_prepared_statement(Config) -> ), Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, - unprepare(Config, send_message), + Name = ?config(mysql_name, Config), + BridgeType = ?config(mysql_bridge_type, Config), + ResourceID = emqx_bridge_v2:id(BridgeType, Name), + unprepare(Config, ResourceID), ?check_trace( begin {Res, {ok, _}} = @@ -721,7 +739,7 @@ t_uninitialized_prepared_statement(Config) -> #{?snk_kind := mysql_connector_prepare_query_failed, error := not_prepared}, #{ ?snk_kind := mysql_connector_on_query_prepared_sql, - type_or_key := send_message + type_or_key := ResourceID }, Trace ) @@ -736,33 +754,58 @@ t_uninitialized_prepared_statement(Config) -> ok. t_missing_table(Config) -> + QueryMode = ?config(query_mode, Config), Name = ?config(mysql_name, Config), BridgeType = ?config(mysql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ?check_trace( begin connect_and_drop_table(Config), ?assertMatch({ok, _}, create_bridge(Config)), + BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), ?retry( _Sleep = 1_000, _Attempts = 20, ?assertMatch( {ok, Status} when Status == connecting orelse Status == disconnected, - emqx_resource_manager:health_check(ResourceID) + request_api_status(BridgeID) ) ), Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, - Timeout = 1000, - ?assertMatch( - {error, {resource_error, #{reason := unhealthy_target}}}, - query_resource(Config, {send_message, SentData, [], Timeout}) - ), + %Timeout = 1000, + ResourceID = emqx_bridge_v2:id(BridgeType, Name), + Request = {ResourceID, SentData}, + Result = + case QueryMode of + sync -> + query_resource(Config, Request); + async -> + {_, Ref} = query_resource_async(Config, Request), + {ok, Res} = receive_result(Ref, 2_000), + Res + end, + + BatchSize = ?config(batch_size, Config), + IsBatch = BatchSize > 1, + case IsBatch of + true -> + ?assertMatch( + {error, + {unrecoverable_error, + {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}}, + Result + ); + false -> + ?assertMatch( + {error, undefined_table}, + Result + ) + end, ok end, fun(Trace) -> - ?assertMatch([_, _, _], ?of_kind(mysql_undefined_table, Trace)), + ?assertMatch([_ | _], ?of_kind(mysql_undefined_table, Trace)), ok end ). @@ -770,9 +813,9 @@ t_missing_table(Config) -> t_table_removed(Config) -> Name = ?config(mysql_name, Config), BridgeType = ?config(mysql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), connect_and_create_table(Config), ?assertMatch({ok, _}, create_bridge(Config)), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ?retry( _Sleep = 1_000, _Attempts = 20, @@ -782,17 +825,17 @@ t_table_removed(Config) -> Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, Timeout = 1000, + ActionID = emqx_bridge_v2:id(BridgeType, Name), ?assertMatch( {error, {unrecoverable_error, {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}}, - sync_query_resource(Config, {send_message, SentData, [], Timeout}) + sync_query_resource(Config, {ActionID, SentData, [], Timeout}) ), ok. t_nested_payload_template(Config) -> Name = ?config(mysql_name, Config), BridgeType = ?config(mysql_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), Value = integer_to_binary(erlang:unique_integer()), {ok, _} = create_bridge( Config, @@ -803,6 +846,7 @@ t_nested_payload_template(Config) -> } ), {ok, #{<<"from">> := [Topic]}} = create_rule_and_action_http(Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ?retry( _Sleep = 1_000, _Attempts = 20, diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 1ca6e4a5d..4e8618915 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -34,6 +34,8 @@ resource_type(matrix) -> emqx_postgresql; resource_type(mongodb) -> emqx_bridge_mongodb_connector; +resource_type(mysql) -> + emqx_bridge_mysql_connector; resource_type(pgsql) -> emqx_postgresql; resource_type(syskeeper_forwarder) -> @@ -94,6 +96,14 @@ connector_structs() -> required => false } )}, + {matrix, + mk( + hoconsc:map(name, ref(emqx_bridge_matrix, "config_connector")), + #{ + desc => <<"Matrix Connector Config">>, + required => false + } + )}, {mongodb, mk( hoconsc:map(name, ref(emqx_bridge_mongodb, "config_connector")), @@ -102,6 +112,30 @@ connector_structs() -> required => false } )}, + {mysql, + mk( + hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")), + #{ + desc => <<"MySQL Connector Config">>, + required => false + } + )}, + {pgsql, + mk( + hoconsc:map(name, ref(emqx_bridge_pgsql, "config_connector")), + #{ + desc => <<"PostgreSQL Connector Config">>, + required => false + } + )}, + {redis, + mk( + hoconsc:map(name, ref(emqx_bridge_redis_schema, "config_connector")), + #{ + desc => <<"Redis Connector Config">>, + required => false + } + )}, {syskeeper_forwarder, mk( hoconsc:map(name, ref(emqx_bridge_syskeeper_connector, config)), @@ -118,14 +152,6 @@ connector_structs() -> required => false } )}, - {pgsql, - mk( - hoconsc:map(name, ref(emqx_bridge_pgsql, "config_connector")), - #{ - desc => <<"PostgreSQL Connector Config">>, - required => false - } - )}, {timescale, mk( hoconsc:map(name, ref(emqx_bridge_timescale, "config_connector")), @@ -133,22 +159,6 @@ connector_structs() -> desc => <<"Timescale Connector Config">>, required => false } - )}, - {matrix, - mk( - hoconsc:map(name, ref(emqx_bridge_matrix, "config_connector")), - #{ - desc => <<"Matrix Connector Config">>, - required => false - } - )}, - {redis, - mk( - hoconsc:map(name, ref(emqx_bridge_redis_schema, "config_connector")), - #{ - desc => <<"Redis Connector Config">>, - required => false - } )} ]. @@ -160,6 +170,7 @@ schema_modules() -> emqx_bridge_kafka, emqx_bridge_matrix, emqx_bridge_mongodb, + emqx_bridge_mysql, emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_proxy, emqx_bridge_timescale, @@ -185,6 +196,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"), api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"), + api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 39ae3e764..6515a45d8 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -36,9 +36,11 @@ -export([get_response/0, put_request/0, post_request/0]). -export([connector_type_to_bridge_types/1]). + -export([ api_fields/3, common_fields/0, + connector_values/3, status_and_actions_fields/0, type_and_name_fields/1 ]). @@ -128,16 +130,18 @@ connector_type_to_bridge_types(matrix) -> [matrix]; connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; +connector_type_to_bridge_types(mysql) -> + [mysql]; connector_type_to_bridge_types(pgsql) -> [pgsql]; +connector_type_to_bridge_types(redis) -> + [redis, redis_single, redis_sentinel, redis_cluster]; connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder]; connector_type_to_bridge_types(syskeeper_proxy) -> []; connector_type_to_bridge_types(timescale) -> - [timescale]; -connector_type_to_bridge_types(redis) -> - [redis, redis_single, redis_sentinel, redis_cluster]. + [timescale]. actions_config_name() -> <<"actions">>. @@ -549,6 +553,48 @@ resource_opts_fields(Overrides) -> emqx_resource_schema:create_opts(Overrides) ). +-type http_method() :: get | post | put. +-type schema_example_map() :: #{atom() => term()}. + +-spec connector_values(http_method(), atom(), schema_example_map()) -> schema_example_map(). +connector_values(Method, Type, ConnectorValues) -> + TypeBin = atom_to_binary(Type), + lists:foldl( + fun(M1, M2) -> + maps:merge(M1, M2) + end, + #{ + description => <<"My example ", TypeBin/binary, " connector">> + }, + [ + ConnectorValues, + method_values(Method, Type) + ] + ). + +method_values(post, Type) -> + TypeBin = atom_to_binary(Type), + #{ + name => <>, + type => TypeBin + }; +method_values(get, Type) -> + maps:merge( + method_values(post, Type), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ], + actions => [<<"my_action">>] + } + ); +method_values(put, _Type) -> + #{}. + %%====================================================================================== %% Helper Functions %%====================================================================================== diff --git a/apps/emqx_mysql/rebar.config b/apps/emqx_mysql/rebar.config index fc7f4df7a..657daf61b 100644 --- a/apps/emqx_mysql/rebar.config +++ b/apps/emqx_mysql/rebar.config @@ -3,7 +3,7 @@ {erl_opts, [debug_info]}. {deps, [ %% NOTE: mind ecpool version when updating eredis_cluster version - {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.4"}}}, + {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.4.1"}}}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}} ]}. diff --git a/apps/emqx_mysql/src/emqx_mysql.erl b/apps/emqx_mysql/src/emqx_mysql.erl index 66fce9fde..fcfabd61e 100644 --- a/apps/emqx_mysql/src/emqx_mysql.erl +++ b/apps/emqx_mysql/src/emqx_mysql.erl @@ -36,7 +36,13 @@ %% ecpool connect & reconnect -export([connect/1, prepare_sql_to_conn/2]). --export([prepare_sql/2]). +-export([ + init_prepare/1, + prepare_sql/2, + parse_prepare_sql/1, + parse_prepare_sql/2, + unprepare_sql/1 +]). -export([roots/0, fields/1]). @@ -51,9 +57,10 @@ #{ pool_name := binary(), prepares := ok | {error, _}, - templates := #{{atom(), batch | prepstmt} => template()} + templates := #{{atom(), batch | prepstmt} => template()}, + query_templates := map() }. - +-export_type([state/0]). %%===================================================================== %% Hocon schema roots() -> @@ -62,8 +69,7 @@ roots() -> fields(config) -> [{server, server()}] ++ add_default_username(emqx_connector_schema_lib:relational_db_fields(), []) ++ - emqx_connector_schema_lib:ssl_fields() ++ - emqx_connector_schema_lib:prepare_statement_fields(). + emqx_connector_schema_lib:ssl_fields(). add_default_username([{username, OrigUsernameFn} | Tail], Head) -> Head ++ [{username, add_default_fn(OrigUsernameFn, <<"root">>)} | Tail]; @@ -267,7 +273,7 @@ do_check_prepares( ); do_check_prepares(#{prepares := ok}) -> ok; -do_check_prepares(#{prepares := {error, _}} = State) -> +do_check_prepares(#{prepares := {error, _}, query_templates := _} = State) -> %% retry to prepare case prepare_sql(State) of ok -> @@ -275,7 +281,9 @@ do_check_prepares(#{prepares := {error, _}} = State) -> {ok, State#{prepares => ok}}; {error, Reason} -> {error, Reason} - end. + end; +do_check_prepares(_NoTemplates) -> + ok. %% =================================================================== @@ -323,16 +331,18 @@ prepare_sql(Templates, PoolName) -> end. do_prepare_sql(Templates, PoolName) -> - Conns = - [ - begin - {ok, Conn} = ecpool_worker:client(Worker), - Conn - end - || {_Name, Worker} <- ecpool:workers(PoolName) - ], + Conns = get_connections_from_pool(PoolName), prepare_sql_to_conn_list(Conns, Templates). +get_connections_from_pool(PoolName) -> + [ + begin + {ok, Conn} = ecpool_worker:client(Worker), + Conn + end + || {_Name, Worker} <- ecpool:workers(PoolName) + ]. + prepare_sql_to_conn_list([], _Templates) -> ok; prepare_sql_to_conn_list([Conn | ConnList], Templates) -> @@ -369,6 +379,18 @@ prepare_sql_to_conn(Conn, [{{Key, prepstmt}, {SQL, _RowTemplate}} | Rest]) -> prepare_sql_to_conn(Conn, [{_Key, _Template} | Rest]) -> prepare_sql_to_conn(Conn, Rest). +unprepare_sql(#{query_templates := Templates, pool_name := PoolName}) -> + ecpool:remove_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn}), + lists:foreach( + fun(Conn) -> + lists:foreach( + fun(Template) -> unprepare_sql_to_conn(Conn, Template) end, + maps:to_list(Templates) + ) + end, + get_connections_from_pool(PoolName) + ). + unprepare_sql_to_conn(Conn, {{Key, prepstmt}, _}) -> mysql:unprepare(Conn, Key); unprepare_sql_to_conn(Conn, Key) when is_atom(Key) -> @@ -377,12 +399,15 @@ unprepare_sql_to_conn(_Conn, _) -> ok. parse_prepare_sql(Config) -> + parse_prepare_sql(send_message, Config). + +parse_prepare_sql(Key, Config) -> Queries = case Config of #{prepare_statement := Qs} -> Qs; #{sql := Query} -> - #{send_message => Query}; + #{Key => Query}; _ -> #{} end, @@ -436,7 +461,9 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{query_templates := Templates}) - {emqx_jsonish, SQLOrData} ), {TypeOrKey, Row} - end. + end; +proc_sql_params(_TypeOrKey, SQLOrData, Params, _State) -> + {SQLOrData, Params}. on_batch_insert(InstId, BatchReqs, {InsertPart, RowTemplate}, State) -> Rows = [render_row(RowTemplate, Msg) || {_, Msg} <- BatchReqs], diff --git a/apps/emqx_utils/src/emqx_utils_maps.erl b/apps/emqx_utils/src/emqx_utils_maps.erl index 043ab5210..1d97a926a 100644 --- a/apps/emqx_utils/src/emqx_utils_maps.erl +++ b/apps/emqx_utils/src/emqx_utils_maps.erl @@ -16,27 +16,29 @@ -module(emqx_utils_maps). -export([ - deep_get/2, - deep_get/3, - deep_find/2, - deep_put/3, - deep_force_put/3, - deep_remove/2, - deep_merge/2, + best_effort_recursive_sum/3, binary_key_map/1, - safe_atom_key_map/1, - unsafe_atom_key_map/1, - jsonable_map/1, - jsonable_map/2, binary_string/1, deep_convert/3, + deep_find/2, + deep_force_put/3, + deep_get/2, + deep_get/3, + deep_merge/2, + deep_put/3, + deep_remove/2, diff_maps/2, - best_effort_recursive_sum/3, if_only_to_toggle_enable/2, - update_if_present/3, + indent/3, + jsonable_map/1, + jsonable_map/2, + key_comparer/1, put_if/4, rename/3, - key_comparer/1 + safe_atom_key_map/1, + unindent/2, + unsafe_atom_key_map/1, + update_if_present/3 ]). -export_type([config_key/0, config_key_path/0]). @@ -332,3 +334,18 @@ key_comparer(K) -> (M1, M2) -> M1 < M2 end. + +-spec indent(term(), [term()], map()) -> map(). +indent(IndentKey, PickKeys, Map) -> + maps:put( + IndentKey, + maps:with(PickKeys, Map), + maps:without(PickKeys, Map) + ). + +-spec unindent(term(), map()) -> map(). +unindent(Key, Map) -> + maps:merge( + maps:remove(Key, Map), + maps:get(Key, Map, #{}) + ). diff --git a/apps/emqx_utils/test/emqx_utils_maps_tests.erl b/apps/emqx_utils/test/emqx_utils_maps_tests.erl index a9f39536e..2778b5257 100644 --- a/apps/emqx_utils/test/emqx_utils_maps_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_maps_tests.erl @@ -17,6 +17,8 @@ -module(emqx_utils_maps_tests). -include_lib("eunit/include/eunit.hrl"). +-import(emqx_utils_maps, [indent/3, unindent/2]). + best_effort_recursive_sum_test_() -> DummyLogger = fun(_) -> ok end, [ @@ -129,3 +131,44 @@ key_comparer_test() -> #{} ]) ). + +map_indent_unindent_test_() -> + M = #{a => 1, b => 2}, + [ + ?_assertEqual( + #{a => 1, c => #{b => 2}}, + indent(c, [b], M) + ), + ?_assertEqual( + M, + unindent(c, indent(c, [b], M)) + ), + ?_assertEqual( + #{a => 1, b => #{b => 2}}, + indent(b, [b], M) + ), + ?_assertEqual( + M, + unindent(b, #{a => 1, b => #{b => 2}}) + ), + ?_assertEqual( + #{a => 2}, + unindent(b, #{a => 1, b => #{a => 2}}) + ), + ?_assertEqual( + #{c => #{a => 1, b => 2}}, + indent(c, [a, b], M) + ), + ?_assertEqual( + #{a => 1, b => 2, c => #{}}, + indent(c, [], M) + ), + ?_assertEqual( + #{a => 1, b => 2, c => #{}}, + indent(c, [d, e, f], M) + ), + ?_assertEqual( + #{a => 1, b => 2}, + unindent(c, M) + ) + ]. diff --git a/mix.exs b/mix.exs index 21a07dee4..74f4ae105 100644 --- a/mix.exs +++ b/mix.exs @@ -59,7 +59,7 @@ defmodule EMQXUmbrella.MixProject do {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.0", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.15", override: true}, - {:ecpool, github: "emqx/ecpool", tag: "0.5.4", override: true}, + {:ecpool, github: "emqx/ecpool", tag: "0.5.7", override: true}, {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, # maybe forbid to fetch quicer diff --git a/rebar.config b/rebar.config index 830ffc051..7380347f2 100644 --- a/rebar.config +++ b/rebar.config @@ -75,7 +75,7 @@ , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.0"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.15"}}} - , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}} + , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.7"}}} , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.10.1"}}} diff --git a/rel/i18n/emqx_bridge_mysql.hocon b/rel/i18n/emqx_bridge_mysql.hocon index 37326be81..057b1b145 100644 --- a/rel/i18n/emqx_bridge_mysql.hocon +++ b/rel/i18n/emqx_bridge_mysql.hocon @@ -40,4 +40,14 @@ sql_template.desc: sql_template.label: """SQL Template""" +action_parameters.label: +"""Action Parameters""" +action_parameters.desc: +"""Additional parameters specific to this action type""" + +mysql_action.label: +"""MySQL Action""" +mysql_action.desc: +"""Action to interact with a MySQL connector""" + }