diff --git a/apps/emqx_connector/etc/emqx_connector.conf b/apps/emqx_connector/etc/emqx_connector.conf new file mode 100644 index 000000000..54b57ebe6 --- /dev/null +++ b/apps/emqx_connector/etc/emqx_connector.conf @@ -0,0 +1,3 @@ +##-------------------------------------------------------------------- +## EMQ X CONNECTOR Plugin +##-------------------------------------------------------------------- diff --git a/apps/emqx_connector/priv/emqx_connector.schema b/apps/emqx_connector/priv/emqx_connector.schema new file mode 100644 index 000000000..b8476c4d9 --- /dev/null +++ b/apps/emqx_connector/priv/emqx_connector.schema @@ -0,0 +1,2 @@ +%%-*- mode: erlang -*- +%% emqx_connector config mapping diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index d9c414389..ad9b9a4d1 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -14,6 +14,8 @@ , on_health_check/2 ]). +-export([connect/1]). + -export([do_health_check/1]). fields("config") -> @@ -51,11 +53,16 @@ on_stop(InstId, #{poolname := PoolName}) -> logger:info("stopping mysql connector: ~p", [InstId]), emqx_plugin_libs_pool:stop_pool(PoolName). -on_query(InstId, Request, AfterQuery, State) -> - io:format("== the demo log tracer ~p received request: ~p~nstate: ~p~n", - [InstId, Request, State]), - emqx_resource:query_success(AfterQuery), - "this is a demo log messages...". +on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) -> + logger:debug("mysql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]), + case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL]}, no_handover) of + {error, Reason} -> + logger:debug("mysql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]), + emqx_resource:query_failure(AfterQuery); + _ -> + emqx_resource:query_success(AfterQuery) + end, + Result. on_health_check(_InstId, #{poolname := PoolName} = State) -> emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). @@ -66,3 +73,6 @@ do_health_check(Conn) -> %% =================================================================== reconn_interval(true) -> 15; reconn_interval(false) -> false. + +connect(Options) -> + mysql:start_link(Options).