diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 18a246edb..286e0e4e6 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -384,14 +384,10 @@ on_query_async( on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> case do_get_status(PoolName, Timeout) of - true -> - connected; - false -> - ?SLOG(error, #{ - msg => "http_connector_get_status_failed", - state => State - }), - disconnected + ok -> + {connected, State}; + {error, Reason} -> + {disconnected, State, Reason} end. do_get_status(PoolName, Timeout) -> @@ -400,24 +396,28 @@ do_get_status(PoolName, Timeout) -> fun(Worker) -> case ehttpc:health_check(Worker, Timeout) of ok -> - true; - {error, Reason} -> + ok; + {error, Reason} = Error -> ?SLOG(error, #{ - msg => "ehttpc_health_check_failed", + msg => "http_connector_get_status_failed", reason => Reason, worker => Worker }), - false + Error end end, try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of - [_ | _] = Status -> - lists:all(fun(St) -> St =:= true end, Status); - [] -> - false + % we crash in case of non-empty lists since we don't know what to do in that case + [_ | _] = Results -> + case [E || {error, _} = E <- Results] of + [] -> + ok; + Errors -> + hd(Errors) + end catch exit:timeout -> - false + {error, timeout} end. %%-------------------------------------------------------------------- diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 6c0ff7210..693917a27 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -66,10 +66,21 @@ roots() -> fields(config) -> [{server, server()}] ++ - emqx_connector_schema_lib:relational_db_fields() ++ + add_default_username(emqx_connector_schema_lib:relational_db_fields(), []) ++ emqx_connector_schema_lib:ssl_fields() ++ emqx_connector_schema_lib:prepare_statement_fields(). +add_default_username([{username, OrigUsernameFn} | Tail], Head) -> + Head ++ [{username, add_default_fn(OrigUsernameFn, <<"root">>)} | Tail]; +add_default_username([Field | Tail], Head) -> + add_default_username(Tail, Head ++ [Field]). + +add_default_fn(OrigFn, Default) -> + fun + (default) -> Default; + (Field) -> OrigFn(Field) + end. + server() -> Meta = #{desc => ?DESC("server")}, emqx_schema:servers_sc(Meta, ?MYSQL_HOST_OPTIONS). @@ -83,8 +94,7 @@ on_start( #{ server := Server, database := DB, - username := User, - password := Password, + username := Username, auto_reconnect := AutoReconn, pool_size := PoolSize, ssl := SSL @@ -104,13 +114,15 @@ on_start( [] end, Options = [ - {host, Host}, - {port, Port}, - {user, User}, - {password, Password}, - {database, DB}, - {auto_reconnect, reconn_interval(AutoReconn)}, - {pool_size, PoolSize} + maybe_password_opt(maps:get(password, Config, undefined)) + | [ + {host, Host}, + {port, Port}, + {user, Username}, + {database, DB}, + {auto_reconnect, reconn_interval(AutoReconn)}, + {pool_size, PoolSize} + ] ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), Prepares = parse_prepare_sql(Config), @@ -126,6 +138,11 @@ on_start( {error, Reason} end. +maybe_password_opt(undefined) -> + []; +maybe_password_opt(Password) -> + {password, Password}. + on_stop(InstId, #{poolname := PoolName}) -> ?SLOG(info, #{ msg => "stopping_mysql_connector",