0.9 project structure
This commit is contained in:
parent
986bf5d911
commit
8702ab838c
5
go
5
go
|
@ -1,5 +0,0 @@
|
|||
#!/bin/sh
|
||||
# -*- tab-width:4;indent-tabs-mode:nil -*-
|
||||
# ex: ts=4 sw=4 et
|
||||
|
||||
make && make rel && cd rel/emqttd && ./bin/emqttd console
|
|
@ -1,48 +0,0 @@
|
|||
|
||||
## Overview
|
||||
|
||||
Authentication with user table of MySQL database.
|
||||
|
||||
## etc/plugin.config
|
||||
|
||||
```
|
||||
{emysql, [
|
||||
{pool, 4},
|
||||
{host, "localhost"},
|
||||
{port, 3306},
|
||||
{username, ""},
|
||||
{password, ""},
|
||||
{database, "mqtt"},
|
||||
{encoding, utf8}
|
||||
]},
|
||||
{emqttd_auth_mysql, [
|
||||
{user_table, mqtt_users},
|
||||
%% plain password only
|
||||
{password_hash, plain},
|
||||
{field_mapper, [
|
||||
{username, username},
|
||||
{password, password}
|
||||
]}
|
||||
]}
|
||||
```
|
||||
|
||||
## Users Table(Demo)
|
||||
|
||||
Notice: This is a demo table. You could authenticate with any user tables.
|
||||
|
||||
```
|
||||
CREATE TABLE `mqtt_users` (
|
||||
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
|
||||
`username` varchar(60) DEFAULT NULL,
|
||||
`password` varchar(60) DEFAULT NULL,
|
||||
`salt` varchar(20) DEFAULT NULL,
|
||||
`created` datetime DEFAULT NULL,
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `mqtt_users_username` (`username`)
|
||||
) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
|
||||
```
|
||||
|
||||
## Load Plugin
|
||||
|
||||
Merge the'etc/plugin.config' to emqttd/etc/plugins.config, and the plugin will be loaded by the broker.
|
||||
|
|
@ -1,17 +0,0 @@
|
|||
{emysql, [
|
||||
{pool, 4},
|
||||
{host, "localhost"},
|
||||
{port, 3306},
|
||||
{username, "root"},
|
||||
{password, "public"},
|
||||
{database, "mqtt"},
|
||||
{encoding, utf8}
|
||||
]},
|
||||
{emqttd_auth_mysql, [
|
||||
{user_table, mqtt_users},
|
||||
{password_hash, plain},
|
||||
{field_mapper, [
|
||||
{username, username},
|
||||
{password, password}
|
||||
]}
|
||||
]}
|
|
@ -1,12 +0,0 @@
|
|||
{application, emqttd_auth_mysql,
|
||||
[
|
||||
{description, "emqttd MySQL Authentication Plugin"},
|
||||
{vsn, "1.0"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib
|
||||
]},
|
||||
{mod, {emqttd_auth_mysql_app, []}},
|
||||
{env, []}
|
||||
]}.
|
|
@ -1,79 +0,0 @@
|
|||
%%%-----------------------------------------------------------------------------
|
||||
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc
|
||||
%%% emqttd authentication by mysql 'user' table.
|
||||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
-module(emqttd_auth_mysql).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
||||
-include_lib("emqttd/include/emqttd.hrl").
|
||||
|
||||
-behaviour(emqttd_auth_mod).
|
||||
|
||||
-export([init/1, check/3, description/0]).
|
||||
|
||||
-record(state, {user_table, name_field, pass_field, pass_hash}).
|
||||
|
||||
init(Opts) ->
|
||||
Mapper = proplists:get_value(field_mapper, Opts),
|
||||
{ok, #state{user_table = proplists:get_value(user_table, Opts),
|
||||
name_field = proplists:get_value(username, Mapper),
|
||||
pass_field = proplists:get_value(password, Mapper),
|
||||
pass_hash = proplists:get_value(password_hash, Opts)}}.
|
||||
|
||||
check(#mqtt_client{username = undefined}, _Password, _State) ->
|
||||
{error, "Username undefined"};
|
||||
check(#mqtt_client{username = <<>>}, _Password, _State) ->
|
||||
{error, "Username undefined"};
|
||||
check(_Client, undefined, _State) ->
|
||||
{error, "Password undefined"};
|
||||
check(_Client, <<>>, _State) ->
|
||||
{error, "Password undefined"};
|
||||
check(#mqtt_client{username = Username}, Password,
|
||||
#state{user_table = UserTab, pass_hash = Type,
|
||||
name_field = NameField, pass_field = PassField}) ->
|
||||
Where = {'and', {NameField, Username}, {PassField, hash(Type, Password)}},
|
||||
case emysql:select(UserTab, Where) of
|
||||
{ok, []} -> {error, "Username or Password "};
|
||||
{ok, _Record} -> ok
|
||||
end.
|
||||
|
||||
description() -> "Authentication by MySQL".
|
||||
|
||||
hash(plain, Password) ->
|
||||
Password;
|
||||
|
||||
hash(md5, Password) ->
|
||||
hexstring(crypto:hash(md5, Password));
|
||||
|
||||
hash(sha, Password) ->
|
||||
hexstring(crypto:hash(sha, Password)).
|
||||
|
||||
hexstring(<<X:128/big-unsigned-integer>>) ->
|
||||
lists:flatten(io_lib:format("~32.16.0b", [X]));
|
||||
|
||||
hexstring(<<X:160/big-unsigned-integer>>) ->
|
||||
lists:flatten(io_lib:format("~40.16.0b", [X])).
|
||||
|
|
@ -1,59 +0,0 @@
|
|||
%%%-----------------------------------------------------------------------------
|
||||
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc
|
||||
%%% emqttd mysql authentication app.
|
||||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
-module(emqttd_auth_mysql_app).
|
||||
|
||||
-behaviour(application).
|
||||
%% Application callbacks
|
||||
-export([start/2, prep_stop/1, stop/1]).
|
||||
|
||||
-behaviour(supervisor).
|
||||
%% Supervisor callbacks
|
||||
-export([init/1]).
|
||||
|
||||
%%%=============================================================================
|
||||
%%% Application callbacks
|
||||
%%%=============================================================================
|
||||
|
||||
start(_StartType, _StartArgs) ->
|
||||
Env = application:get_all_env(),
|
||||
ok = emqttd_access_control:register_mod(auth, emqttd_auth_mysql, Env),
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
prep_stop(State) ->
|
||||
emqttd_access_control:unregister_mod(auth, emqttd_auth_mysql), State.
|
||||
|
||||
stop(_State) ->
|
||||
ok.
|
||||
|
||||
%%%=============================================================================
|
||||
%%% Supervisor callbacks(Dummy)
|
||||
%%%=============================================================================
|
||||
|
||||
init([]) ->
|
||||
{ok, { {one_for_one, 5, 10}, []} }.
|
||||
|
||||
|
|
@ -1,42 +0,0 @@
|
|||
# emysql
|
||||
|
||||
Erlang MySQL client
|
||||
|
||||
## config
|
||||
|
||||
```
|
||||
|
||||
```
|
||||
|
||||
## Select API
|
||||
|
||||
* emyssql:select(tab).
|
||||
* emysql:select({tab, [col1,col2]}).
|
||||
* emysql:select({tab, [col1, col2], {id,1}}).
|
||||
* emysql:select(Query, Load).
|
||||
|
||||
## Update API
|
||||
|
||||
* emysql:update(tab, [{Field1, Val}, {Field2, Val2}], {id, 1}).
|
||||
|
||||
## Insert API
|
||||
|
||||
* emysql:insert(tab, [{Field1, Val}, {Field2, Val2}]).
|
||||
|
||||
## Delete API
|
||||
|
||||
* emysql:delete(tab, {name, Name}]).
|
||||
|
||||
## Query API
|
||||
|
||||
* emysql:sqlquery("select * from tab;").
|
||||
|
||||
## Prepare API
|
||||
|
||||
* emysql:prepare(find_with_id, "select * from tab where id = ?;").
|
||||
* emysql:execute(find_with_id, [Id]).
|
||||
* emysql:unprepare(find_with_id).
|
||||
|
||||
## MySQL Client Protocal
|
||||
|
||||
* http://forge.mysql.com/wiki/MySQL_Internals_ClientServer_Protocol
|
|
@ -1,2 +0,0 @@
|
|||
%% MySQL result record:
|
||||
-record(mysql_result, {fieldinfo = [], rows = [], affectedrows = 0, insert_id =0, error = ""}).
|
|
@ -1,14 +0,0 @@
|
|||
{application, emysql,
|
||||
[{description, "Erlang MySQL Driver"},
|
||||
{vsn, "1.0"},
|
||||
{modules, [
|
||||
emysql,
|
||||
emysql_app,
|
||||
emysql_sup,
|
||||
emysql_auth,
|
||||
emysql_conn,
|
||||
emysql_recv]},
|
||||
{registered, []},
|
||||
{applications, [kernel, stdlib, sasl, crypto]},
|
||||
{env, []},
|
||||
{mod, {emysql_app, []}}]}.
|
|
@ -1,514 +0,0 @@
|
|||
%%%----------------------------------------------------------------------
|
||||
%%% File : emysql.erl
|
||||
%%% Author : Ery Lee <ery.lee@gmail.com>
|
||||
%%% Purpose : Mysql access api.
|
||||
%%% Created : 19 May 2009
|
||||
%%% License : http://www.opengoss.com
|
||||
%%%
|
||||
%%% Copyright (C) 2012, www.opengoss.com
|
||||
%%%----------------------------------------------------------------------
|
||||
-module(emysql).
|
||||
|
||||
-author('ery.lee@gmail.com').
|
||||
|
||||
-include("emysql.hrl").
|
||||
|
||||
-export([start_link/1]).
|
||||
|
||||
-ifdef(use_specs).
|
||||
|
||||
-spec(conns/0 :: () -> list()).
|
||||
|
||||
-endif.
|
||||
|
||||
%command functions
|
||||
-export([info/0,
|
||||
pool/1,
|
||||
conns/0]).
|
||||
|
||||
%sql functions
|
||||
-export([insert/2,
|
||||
insert/3,
|
||||
select/1,
|
||||
select/2,
|
||||
select/3,
|
||||
update/2,
|
||||
update/3,
|
||||
delete/1,
|
||||
delete/2,
|
||||
truncate/1,
|
||||
prepare/2,
|
||||
execute/1,
|
||||
execute/2,
|
||||
unprepare/1,
|
||||
sqlquery/1,
|
||||
sqlquery/2]).
|
||||
|
||||
-behavior(gen_server).
|
||||
|
||||
-export([init/1,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
terminate/2,
|
||||
code_change/3]).
|
||||
|
||||
-record(state, {ids}).
|
||||
|
||||
%% External exports
|
||||
-export([encode/1,
|
||||
encode/2,
|
||||
escape/1,
|
||||
escape_like/1]).
|
||||
|
||||
start_link(PoolSize) ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [PoolSize], []).
|
||||
|
||||
info() ->
|
||||
[emysql_conn:info(Pid) || Pid <-
|
||||
pg2:get_local_members(emysql_conn)].
|
||||
|
||||
%pool pool
|
||||
pool(Id) ->
|
||||
gen_server:cast(?MODULE, {pool, Id}).
|
||||
|
||||
conns() ->
|
||||
gen_server:call(?MODULE, conns).
|
||||
|
||||
insert(Tab, Record) when is_atom(Tab) ->
|
||||
sqlquery(encode_insert(Tab, Record)).
|
||||
|
||||
insert(_Tab, _Fields, Values) when length(Values) == 0 ->
|
||||
{updated, {0, 0}};
|
||||
|
||||
insert(Tab, Fields, Values) when length(Values) > 0 ->
|
||||
sqlquery(encode_insert(Tab, Fields, Values)).
|
||||
|
||||
encode_insert(Tab, Record) ->
|
||||
{Fields, Values} = lists:unzip([{atom_to_list(F), encode(V)}
|
||||
|| {F, V} <- Record]),
|
||||
["insert into ", atom_to_list(Tab), "(",
|
||||
string:join(Fields, ","), ") values(",
|
||||
string:join(Values, ","), ");"].
|
||||
|
||||
encode_insert(Tab, Fields, Rows) ->
|
||||
Encode = fun(Row) -> string:join([encode(V) || V <- Row], ",") end,
|
||||
Rows1 = [lists:concat(["(", Encode(Row), ")"]) || Row <- Rows],
|
||||
["insert into ", atom_to_list(Tab), "(",
|
||||
string:join([atom_to_list(F) || F <- Fields], ","),
|
||||
") values", string:join(Rows1, ","), ";"].
|
||||
|
||||
select(Tab) when is_atom(Tab) ->
|
||||
sqlquery(encode_select(Tab));
|
||||
|
||||
select(Select) when is_tuple(Select) ->
|
||||
sqlquery(encode_select(Select)).
|
||||
|
||||
select(Tab, Where) when is_atom(Tab) and is_tuple(Where) ->
|
||||
sqlquery(encode_select({Tab, Where}));
|
||||
|
||||
select(Tab, Fields) when is_atom(Tab) and is_list(Fields) ->
|
||||
sqlquery(encode_select({Tab, Fields}));
|
||||
|
||||
select(Select, Load) when is_tuple(Select) and is_integer(Load) ->
|
||||
sqlquery(encode_select(Select), Load).
|
||||
|
||||
select(Tab, Fields, Where) when is_atom(Tab)
|
||||
and is_list(Fields) and is_tuple(Where) ->
|
||||
sqlquery(encode_select({Tab, Fields, Where})).
|
||||
|
||||
encode_select(Tab) when is_atom(Tab) ->
|
||||
encode_select({Tab, ['*'], undefined});
|
||||
|
||||
encode_select({Tab, Fields}) when is_atom(Tab)
|
||||
and is_list(Fields) ->
|
||||
encode_select({Tab, Fields, undefined});
|
||||
|
||||
encode_select({Tab, Where}) when is_atom(Tab)
|
||||
and is_tuple(Where) ->
|
||||
encode_select({Tab, ['*'], Where});
|
||||
|
||||
encode_select({Tab, Fields, undefined}) when is_atom(Tab)
|
||||
and is_list(Fields) ->
|
||||
["select ", encode_fields(Fields), " from ", atom_to_list(Tab), ";"];
|
||||
|
||||
encode_select({Tab, Fields, Where}) when is_atom(Tab)
|
||||
and is_list(Fields) and is_tuple(Where) ->
|
||||
["select ", encode_fields(Fields), " from ",
|
||||
atom_to_list(Tab), " where ", encode_where(Where), ";"].
|
||||
|
||||
encode_fields(Fields) ->
|
||||
string:join([atom_to_list(F) || F <- Fields], " ,").
|
||||
|
||||
update(Tab, Record) when is_atom(Tab)
|
||||
and is_list(Record) ->
|
||||
case proplists:get_value(id, Record) of
|
||||
undefined ->
|
||||
Updates = string:join([encode_column(Col) || Col <- Record], ","),
|
||||
Query = ["update ", atom_to_list(Tab), " set ", Updates, ";"],
|
||||
sqlquery(Query);
|
||||
Id ->
|
||||
update(Tab, lists:keydelete(id, 1, Record), {id, Id})
|
||||
end.
|
||||
|
||||
update(Tab, Record, Where) ->
|
||||
Update = string:join([encode_column(Col) || Col <- Record], ","),
|
||||
Query = ["update ", atom_to_list(Tab), " set ", Update,
|
||||
" where ", encode_where(Where), ";"],
|
||||
sqlquery(Query).
|
||||
|
||||
encode_column({F, V}) when is_atom(F) ->
|
||||
lists:concat([atom_to_list(F), "=", encode(V)]).
|
||||
|
||||
delete(Tab) when is_atom(Tab) ->
|
||||
sqlquery(["delete from ", atom_to_list(Tab), ";"]).
|
||||
|
||||
delete(Tab, Id) when is_atom(Tab)
|
||||
and is_integer(Id) ->
|
||||
Query = ["delete from ", atom_to_list(Tab),
|
||||
" where ", encode_where({id, Id})],
|
||||
sqlquery(Query);
|
||||
|
||||
delete(Tab, Where) when is_atom(Tab)
|
||||
and is_tuple(Where) ->
|
||||
Query = ["delete from ", atom_to_list(Tab),
|
||||
" where ", encode_where(Where)],
|
||||
sqlquery(Query).
|
||||
|
||||
truncate(Tab) when is_atom(Tab) ->
|
||||
sqlquery(["truncate table ", atom_to_list(Tab), ";"]).
|
||||
|
||||
sqlquery(Query) ->
|
||||
sqlquery(Query, 1).
|
||||
|
||||
sqlquery(Query, Load) ->
|
||||
with_next_conn(fun(Conn) ->
|
||||
case catch mysql_to_odbc(emysql_conn:sqlquery(Conn, iolist_to_binary(Query))) of
|
||||
{selected, NewFields, Records} ->
|
||||
{ok, to_tuple_records(NewFields, Records)};
|
||||
{error, Reason} ->
|
||||
{error, Reason};
|
||||
Res ->
|
||||
Res
|
||||
end
|
||||
end, Load).
|
||||
|
||||
prepare(Name, Stmt) when is_list(Stmt) ->
|
||||
prepare(Name, list_to_binary(Stmt));
|
||||
|
||||
prepare(Name, Stmt) when is_binary(Stmt) ->
|
||||
with_all_conns(fun(Conn) ->
|
||||
emysql_conn:prepare(Conn, Name, Stmt)
|
||||
end).
|
||||
|
||||
execute(Name) ->
|
||||
execute(Name, []).
|
||||
|
||||
execute(Name, Params) ->
|
||||
with_next_conn(fun(Conn) ->
|
||||
case catch mysql_to_odbc(emysql_conn:execute(Conn, Name, Params)) of
|
||||
{selected, NewFields, Records} ->
|
||||
{ok, to_tuple_records(NewFields, Records)};
|
||||
{error, Reason} ->
|
||||
{error, Reason};
|
||||
Res ->
|
||||
Res
|
||||
end
|
||||
end, 1).
|
||||
|
||||
unprepare(Name) ->
|
||||
with_all_conns(fun(Conn) ->
|
||||
emysql_conn:unprepare(Conn, Name)
|
||||
end).
|
||||
|
||||
with_next_conn(Fun, _Load) ->
|
||||
Fun(pg2:get_closest_pid(emysql_conn)).
|
||||
|
||||
with_all_conns(Fun) ->
|
||||
[Fun(Pid) || Pid <- pg2:get_local_members(emysql_conn)].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: init(Args) -> {ok, State} |
|
||||
%% {ok, State, Timeout} |
|
||||
%% ignore |
|
||||
%% {stop, Reason}
|
||||
%% Description: Initiates the server
|
||||
%%--------------------------------------------------------------------
|
||||
init([PoolSize]) ->
|
||||
Ids = lists:seq(1, PoolSize),
|
||||
[put(Id, 0) || Id <- Ids],
|
||||
[put({count, Id}, 0) || Id <- Ids],
|
||||
{ok, #state{ids = Ids}}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
|
||||
%% {reply, Reply, State, Timeout} |
|
||||
%% {noreply, State} |
|
||||
%% {noreply, State, Timeout} |
|
||||
%% {stop, Reason, Reply, State} |
|
||||
%% {stop, Reason, State}
|
||||
%% Description: Handling call messages
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
handle_call(info, _From, State) ->
|
||||
Reply = [{conn, Id, Pid, get(Id), get({total, Id})}
|
||||
|| {Id, Pid} <- get_all_conns()],
|
||||
{reply, Reply, State};
|
||||
|
||||
handle_call({next_conn, Load}, _From, #state{ids = Ids} = State) ->
|
||||
{ConnId, ConnLoad} =
|
||||
lists:foldl(fun(Id, {MinId, MinLoad}) ->
|
||||
ThisLoad = get(Id),
|
||||
if
|
||||
ThisLoad =< MinLoad -> {Id, ThisLoad};
|
||||
true -> {MinId, MinLoad}
|
||||
end
|
||||
end, {undefined, 16#ffffffff}, Ids),
|
||||
Reply =
|
||||
case ConnId of
|
||||
undefined ->
|
||||
undefined;
|
||||
_ ->
|
||||
ConnPid = get_conn_pid(ConnId),
|
||||
put(ConnId, ConnLoad+Load),
|
||||
Count = get({total, ConnId}),
|
||||
put({total, ConnId}, Count+1),
|
||||
{ConnId, ConnPid}
|
||||
end,
|
||||
{reply, Reply, State};
|
||||
|
||||
handle_call(conns, _From, State) ->
|
||||
Conns = get_all_conns(),
|
||||
{reply, Conns, State};
|
||||
|
||||
handle_call(Req, From, State) ->
|
||||
gen_server:reply(From, {badcall, Req}),
|
||||
{stop, {badcall, Req}, State}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: handle_cast(Msg, State) -> {noreply, State} |
|
||||
%% {noreply, State, Timeout} |
|
||||
%% {stop, Reason, State}
|
||||
%% Description: Handling cast messages
|
||||
%%--------------------------------------------------------------------
|
||||
handle_cast({pool, Id}, State) ->
|
||||
put(Id, 0),
|
||||
put({total, Id}, 0),
|
||||
{noreply, State};
|
||||
|
||||
handle_cast({done, ConnId, Load}, State) ->
|
||||
put(ConnId, get(ConnId) - Load),
|
||||
{noreply, State};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
{stop, {badcast, Msg}, State}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: handle_info(Info, State) -> {noreply, State} |
|
||||
%% {noreply, State, Timeout} |
|
||||
%% {stop, Reason, State}
|
||||
%% Description: Handling all non call/cast messages
|
||||
%%--------------------------------------------------------------------
|
||||
handle_info(Info, State) ->
|
||||
{stop, {badinfo, Info}, State}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: terminate(Reason, State) -> void()
|
||||
%% Description: This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any necessary
|
||||
%% cleaning up. When it returns, the gen_server terminates with Reason.
|
||||
%% The return value is ignored.
|
||||
%%--------------------------------------------------------------------
|
||||
terminate(_Reason, _State) ->
|
||||
ok.
|
||||
%%--------------------------------------------------------------------
|
||||
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
|
||||
%% Description: Convert process state when code is changed
|
||||
%%--------------------------------------------------------------------
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
get_conn_pid(CId) ->
|
||||
[{CId, Pid, _Type, _Modules} | _] =
|
||||
lists:dropwhile(fun ({Id, _Pid, _Type, _Modules})
|
||||
when Id =:= CId -> false;
|
||||
(_) -> true
|
||||
end,
|
||||
supervisor:which_children(emysql_sup)),
|
||||
Pid.
|
||||
|
||||
get_all_conns() ->
|
||||
[{Id, Pid} || {Id, Pid, _Type, _Modules} <-
|
||||
supervisor:which_children(emysql_sup), is_integer(Id)].
|
||||
|
||||
%% Convert MySQL query result to Erlang ODBC result formalism
|
||||
mysql_to_odbc({updated, #mysql_result{affectedrows=AffectedRows, insert_id = InsertId} = _MySQLRes}) ->
|
||||
{updated, {AffectedRows, InsertId}};
|
||||
|
||||
mysql_to_odbc({data, #mysql_result{fieldinfo = FieldInfo, rows=AllRows} = _MySQLRes}) ->
|
||||
mysql_item_to_odbc(FieldInfo, AllRows);
|
||||
|
||||
mysql_to_odbc({error, MySQLRes}) when is_list(MySQLRes) ->
|
||||
{error, MySQLRes};
|
||||
|
||||
mysql_to_odbc({error, #mysql_result{error=Reason} = _MySQLRes}) ->
|
||||
{error, Reason};
|
||||
|
||||
mysql_to_odbc({error, Reason}) ->
|
||||
{error, Reason}.
|
||||
|
||||
%% When tabular data is returned, convert it to the ODBC formalism
|
||||
mysql_item_to_odbc(Columns, Recs) ->
|
||||
%% For now, there is a bug and we do not get the correct value from MySQL
|
||||
%% module:
|
||||
{selected,
|
||||
[element(2, Column) || Column <- Columns],
|
||||
[list_to_tuple(Rec) || Rec <- Recs]}.
|
||||
|
||||
%%internal functions
|
||||
encode_where({'and', L, R}) ->
|
||||
encode_where(L) ++ " and " ++ encode_where(R);
|
||||
|
||||
encode_where({'and', List}) when is_list(List) ->
|
||||
string:join([encode_where(E) || E <- List], " and ");
|
||||
|
||||
encode_where({'or', L, R}) ->
|
||||
encode_where(L) ++ " or " ++ encode_where(R);
|
||||
|
||||
encode_where({'or', List}) when is_list(List) ->
|
||||
string:join([encode_where(E) || E <- List], " or ");
|
||||
|
||||
encode_where({like, Field, Value}) ->
|
||||
atom_to_list(Field) ++ " like " ++ encode(Value);
|
||||
|
||||
encode_where({'<', Field, Value}) ->
|
||||
atom_to_list(Field) ++ " < " ++ encode(Value);
|
||||
|
||||
encode_where({'<=', Field, Value}) ->
|
||||
atom_to_list(Field) ++ " <= " ++ encode(Value);
|
||||
|
||||
encode_where({'>', Field, Value}) ->
|
||||
atom_to_list(Field) ++ " > " ++ encode(Value);
|
||||
|
||||
encode_where({'>=', Field, Value}) ->
|
||||
atom_to_list(Field) ++ " >= " ++ encode(Value);
|
||||
|
||||
encode_where({'in', Field, Values}) ->
|
||||
InStr = string:join([encode(Value) || Value <- Values], ","),
|
||||
atom_to_list(Field) ++ " in (" ++ InStr ++ ")";
|
||||
|
||||
encode_where({Field, Value}) ->
|
||||
atom_to_list(Field) ++ " = " ++ encode(Value).
|
||||
|
||||
to_tuple_records(_Fields, []) ->
|
||||
[];
|
||||
|
||||
to_tuple_records(Fields, Records) ->
|
||||
[to_tuple_record(Fields, tuple_to_list(Record)) || Record <- Records].
|
||||
|
||||
to_tuple_record(Fields, Record) when length(Fields) == length(Record) ->
|
||||
to_tuple_record(Fields, Record, []).
|
||||
|
||||
to_tuple_record([], [], Acc) ->
|
||||
Acc;
|
||||
|
||||
to_tuple_record([_F|FT], [undefined|VT], Acc) ->
|
||||
to_tuple_record(FT, VT, Acc);
|
||||
|
||||
to_tuple_record([F|FT], [V|VT], Acc) ->
|
||||
to_tuple_record(FT, VT, [{list_to_atom(binary_to_list(F)), V} | Acc]).
|
||||
|
||||
%% Escape character that will confuse an SQL engine
|
||||
%% Percent and underscore only need to be escaped for pattern matching like
|
||||
%% statement
|
||||
escape_like(S) when is_list(S) ->
|
||||
[escape_like(C) || C <- S];
|
||||
escape_like($%) -> "\\%";
|
||||
escape_like($_) -> "\\_";
|
||||
escape_like(C) -> escape(C).
|
||||
|
||||
%% Escape character that will confuse an SQL engine
|
||||
escape(S) when is_list(S) ->
|
||||
[escape(C) || C <- S];
|
||||
%% Characters to escape
|
||||
escape($\0) -> "\\0";
|
||||
escape($\n) -> "\\n";
|
||||
escape($\t) -> "\\t";
|
||||
escape($\b) -> "\\b";
|
||||
escape($\r) -> "\\r";
|
||||
escape($') -> "\\'";
|
||||
escape($") -> "\\\"";
|
||||
escape($\\) -> "\\\\";
|
||||
escape(C) -> C.
|
||||
|
||||
encode(Val) ->
|
||||
encode(Val, false).
|
||||
encode(Val, false) when Val == undefined; Val == null ->
|
||||
"NULL";
|
||||
encode(Val, true) when Val == undefined; Val == null ->
|
||||
<<"NULL">>;
|
||||
encode(Val, false) when is_binary(Val) ->
|
||||
binary_to_list(quote(Val));
|
||||
encode(Val, true) when is_binary(Val) ->
|
||||
quote(Val);
|
||||
encode(Val, true) ->
|
||||
list_to_binary(encode(Val,false));
|
||||
encode(Val, false) when is_atom(Val) ->
|
||||
quote(atom_to_list(Val));
|
||||
encode(Val, false) when is_list(Val) ->
|
||||
quote(Val);
|
||||
encode(Val, false) when is_integer(Val) ->
|
||||
integer_to_list(Val);
|
||||
encode(Val, false) when is_float(Val) ->
|
||||
[Res] = io_lib:format("~w", [Val]),
|
||||
Res;
|
||||
encode({datetime, Val}, AsBinary) ->
|
||||
encode(Val, AsBinary);
|
||||
encode({{Year, Month, Day}, {Hour, Minute, Second}}, false) ->
|
||||
Res = two_digits([Year, Month, Day, Hour, Minute, Second]),
|
||||
lists:flatten(Res);
|
||||
encode({TimeType, Val}, AsBinary)
|
||||
when TimeType == 'date';
|
||||
TimeType == 'time' ->
|
||||
encode(Val, AsBinary);
|
||||
encode({Time1, Time2, Time3}, false) ->
|
||||
Res = two_digits([Time1, Time2, Time3]),
|
||||
lists:flatten(Res);
|
||||
encode(Val, _AsBinary) ->
|
||||
{error, {unrecognized_value, Val}}.
|
||||
|
||||
two_digits(Nums) when is_list(Nums) ->
|
||||
[two_digits(Num) || Num <- Nums];
|
||||
two_digits(Num) ->
|
||||
[Str] = io_lib:format("~b", [Num]),
|
||||
case length(Str) of
|
||||
1 -> [$0 | Str];
|
||||
_ -> Str
|
||||
end.
|
||||
|
||||
%% Quote a string or binary value so that it can be included safely in a
|
||||
%% MySQL query.
|
||||
quote(String) when is_list(String) ->
|
||||
[39 | lists:reverse([39 | quote(String, [])])]; %% 39 is $'
|
||||
quote(Bin) when is_binary(Bin) ->
|
||||
list_to_binary(quote(binary_to_list(Bin))).
|
||||
|
||||
quote([], Acc) ->
|
||||
Acc;
|
||||
quote([0 | Rest], Acc) ->
|
||||
quote(Rest, [$0, $\\ | Acc]);
|
||||
quote([10 | Rest], Acc) ->
|
||||
quote(Rest, [$n, $\\ | Acc]);
|
||||
quote([13 | Rest], Acc) ->
|
||||
quote(Rest, [$r, $\\ | Acc]);
|
||||
quote([$\\ | Rest], Acc) ->
|
||||
quote(Rest, [$\\ , $\\ | Acc]);
|
||||
quote([39 | Rest], Acc) -> %% 39 is $'
|
||||
quote(Rest, [39, $\\ | Acc]); %% 39 is $'
|
||||
quote([34 | Rest], Acc) -> %% 34 is $"
|
||||
quote(Rest, [34, $\\ | Acc]); %% 34 is $"
|
||||
quote([26 | Rest], Acc) ->
|
||||
quote(Rest, [$Z, $\\ | Acc]);
|
||||
quote([C | Rest], Acc) ->
|
||||
quote(Rest, [C | Acc]).
|
||||
|
|
@ -1,27 +0,0 @@
|
|||
%%%----------------------------------------------------------------------
|
||||
%%% File : emysql_app.erl
|
||||
%%% Author : Ery Lee <ery.lee@gmail.com>
|
||||
%%% Purpose : mysql driver application
|
||||
%%% Created : 21 May 2009
|
||||
%%% Updated : 11 Jan 2010
|
||||
%%% License : http://www.opengoss.com
|
||||
%%%
|
||||
%%% Copyright (C) 2007-2010, www.opengoss.com
|
||||
%%%----------------------------------------------------------------------
|
||||
-module(emysql_app).
|
||||
|
||||
-author('ery.lee@gmail.com').
|
||||
|
||||
-behavior(application).
|
||||
|
||||
-export([start/0, start/2, stop/1]).
|
||||
|
||||
start() ->
|
||||
application:start(emysql).
|
||||
|
||||
start(normal, _Args) ->
|
||||
emysql_sup:start_link(application:get_all_env()).
|
||||
|
||||
stop(_) ->
|
||||
ok.
|
||||
|
|
@ -1,102 +0,0 @@
|
|||
-module(emysql_auth).
|
||||
|
||||
-export([make_auth/2, make_new_auth/3, password_old/2, password_new/2]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Macros
|
||||
%%--------------------------------------------------------------------
|
||||
-define(LONG_PASSWORD, 1).
|
||||
-define(LONG_FLAG, 4).
|
||||
-define(PROTOCOL_41, 512).
|
||||
-define(TRANSACTIONS, 8192).
|
||||
-define(SECURE_CONNECTION, 32768).
|
||||
-define(CONNECT_WITH_DB, 8).
|
||||
-define(MAX_PACKET_SIZE, 1000000).
|
||||
|
||||
password_old(Password, Salt) ->
|
||||
{P1, P2} = hash(Password),
|
||||
{S1, S2} = hash(Salt),
|
||||
Seed1 = P1 bxor S1,
|
||||
Seed2 = P2 bxor S2,
|
||||
List = rnd(9, Seed1, Seed2),
|
||||
{L, [Extra]} = lists:split(8, List),
|
||||
list_to_binary(lists:map(fun (E) -> E bxor (Extra - 64) end, L)).
|
||||
|
||||
%% part of do_old_auth/4, which is part of mysql_init/4
|
||||
make_auth(User, Password) ->
|
||||
Caps = ?LONG_PASSWORD bor ?LONG_FLAG bor ?TRANSACTIONS,
|
||||
Maxsize = 0,
|
||||
UserB = list_to_binary(User),
|
||||
PasswordB = Password,
|
||||
<<Caps:16/little, Maxsize:24/little, UserB/binary, 0:8,
|
||||
PasswordB/binary>>.
|
||||
|
||||
%% part of do_new_auth/4, which is part of mysql_init/4
|
||||
make_new_auth(User, Password, Database) ->
|
||||
DBCaps = case Database of
|
||||
none ->
|
||||
0;
|
||||
_ ->
|
||||
?CONNECT_WITH_DB
|
||||
end,
|
||||
Caps = ?LONG_PASSWORD bor ?LONG_FLAG bor ?TRANSACTIONS bor
|
||||
?PROTOCOL_41 bor ?SECURE_CONNECTION bor DBCaps,
|
||||
Maxsize = ?MAX_PACKET_SIZE,
|
||||
UserB = list_to_binary(User),
|
||||
PasswordL = size(Password),
|
||||
DatabaseB = case Database of
|
||||
none ->
|
||||
<<>>;
|
||||
_ ->
|
||||
list_to_binary(Database)
|
||||
end,
|
||||
<<Caps:32/little, Maxsize:32/little, 8:8, 0:23/integer-unit:8,
|
||||
UserB/binary, 0:8, PasswordL:8, Password/binary, DatabaseB/binary>>.
|
||||
|
||||
hash(S) ->
|
||||
hash(S, 1345345333, 305419889, 7).
|
||||
|
||||
hash([C | S], N1, N2, Add) ->
|
||||
N1_1 = N1 bxor (((N1 band 63) + Add) * C + N1 * 256),
|
||||
N2_1 = N2 + ((N2 * 256) bxor N1_1),
|
||||
Add_1 = Add + C,
|
||||
hash(S, N1_1, N2_1, Add_1);
|
||||
hash([], N1, N2, _Add) ->
|
||||
Mask = (1 bsl 31) - 1,
|
||||
{N1 band Mask , N2 band Mask}.
|
||||
|
||||
rnd(N, Seed1, Seed2) ->
|
||||
Mod = (1 bsl 30) - 1,
|
||||
rnd(N, [], Seed1 rem Mod, Seed2 rem Mod).
|
||||
|
||||
rnd(0, List, _, _) ->
|
||||
lists:reverse(List);
|
||||
rnd(N, List, Seed1, Seed2) ->
|
||||
Mod = (1 bsl 30) - 1,
|
||||
NSeed1 = (Seed1 * 3 + Seed2) rem Mod,
|
||||
NSeed2 = (NSeed1 + Seed2 + 33) rem Mod,
|
||||
Float = (float(NSeed1) / float(Mod))*31,
|
||||
Val = trunc(Float)+64,
|
||||
rnd(N - 1, [Val | List], NSeed1, NSeed2).
|
||||
|
||||
|
||||
dualmap(_F, [], []) ->
|
||||
[];
|
||||
dualmap(F, [E1 | R1], [E2 | R2]) ->
|
||||
[F(E1, E2) | dualmap(F, R1, R2)].
|
||||
|
||||
bxor_binary(B1, B2) ->
|
||||
list_to_binary(dualmap(fun (E1, E2) ->
|
||||
E1 bxor E2
|
||||
end, binary_to_list(B1), binary_to_list(B2))).
|
||||
|
||||
password_new(Password, Salt) ->
|
||||
Stage1 = crypto:sha(Password),
|
||||
Stage2 = crypto:sha(Stage1),
|
||||
Res = crypto:sha_final(
|
||||
crypto:sha_update(
|
||||
crypto:sha_update(crypto:sha_init(), Salt),
|
||||
Stage2)
|
||||
),
|
||||
bxor_binary(Res, Stage1).
|
||||
|
|
@ -1,739 +0,0 @@
|
|||
%%% File : emysql_conn.erl
|
||||
%%% Author : Ery Lee
|
||||
%%% Purpose : connection of mysql driver
|
||||
%%% Created : 11 Jan 2010
|
||||
%%% License : http://www.opengoss.com
|
||||
%%%
|
||||
%%% Copyright (C) 2012, www.opengoss.com
|
||||
%%%----------------------------------------------------------------------
|
||||
-module(emysql_conn).
|
||||
|
||||
-include("emysql.hrl").
|
||||
|
||||
-import(proplists, [get_value/2, get_value/3]).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% External exports
|
||||
-export([start_link/2,
|
||||
info/1,
|
||||
sqlquery/2,
|
||||
sqlquery/3,
|
||||
prepare/3,
|
||||
execute/3,
|
||||
execute/4,
|
||||
unprepare/2]).
|
||||
|
||||
%% Callback
|
||||
-export([init/1,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
terminate/2,
|
||||
code_change/3]).
|
||||
|
||||
-record(state, {
|
||||
id,
|
||||
host,
|
||||
port,
|
||||
user,
|
||||
password,
|
||||
database,
|
||||
encoding,
|
||||
mysql_version,
|
||||
recv_pid,
|
||||
socket,
|
||||
data}).
|
||||
|
||||
%%-define(KEEPALIVE_QUERY, <<"SELECT 1;">>).
|
||||
|
||||
-define(SECURE_CONNECTION, 32768).
|
||||
|
||||
-define(MYSQL_QUERY_OP, 3).
|
||||
|
||||
%CALL > CONNECT
|
||||
-define(CALL_TIMEOUT, 301000).
|
||||
|
||||
-define(CONNECT_TIMEOUT, 300000).
|
||||
|
||||
-define(MYSQL_4_0, 40). %% Support for MySQL 4.0.x
|
||||
|
||||
-define(MYSQL_4_1, 41). %% Support for MySQL 4.1.x et 5.0.x
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: start(Opts)
|
||||
%% Descrip.: Starts a mysql_conn process that connects to a MySQL
|
||||
%% server, logs in and chooses a database.
|
||||
%% Returns : {ok, Pid} | {error, Reason}
|
||||
%% Pid = pid()
|
||||
%% Reason = string()
|
||||
%%--------------------------------------------------------------------
|
||||
start_link(Id, Opts) ->
|
||||
gen_server:start_link(?MODULE, [Id, Opts], []).
|
||||
|
||||
info(Conn) ->
|
||||
gen_server:call(Conn, info).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: sqlquery(Query)
|
||||
%% Queries = A single binary() query or a list of binary() queries.
|
||||
%% If a list is provided, the return value is the return
|
||||
%% of the last query, or the first query that has
|
||||
%% returned an error. If an error occurs, execution of
|
||||
%% the following queries is aborted.
|
||||
%% From = pid() or term(), use a From of self() when
|
||||
%% using this module for a single connection,
|
||||
%% or pass the gen_server:call/3 From argument if
|
||||
%% using a gen_server to do the querys (e.g. the
|
||||
%% mysql_dispatcher)
|
||||
%% Timeout = integer() | infinity, gen_server timeout value
|
||||
%% Descrip.: Send a query or a list of queries and wait for the result
|
||||
%% if running stand-alone (From = self()), but don't block
|
||||
%% the caller if we are not running stand-alone
|
||||
%% (From = gen_server From).
|
||||
%% Returns : ok | (non-stand-alone mode)
|
||||
%% {data, #mysql_result} | (stand-alone mode)
|
||||
%% {updated, #mysql_result} | (stand-alone mode)
|
||||
%% {error, #mysql_result} (stand-alone mode)
|
||||
%% FieldInfo = term()
|
||||
%% Rows = list() of [string()]
|
||||
%% Reason = term()
|
||||
%%--------------------------------------------------------------------
|
||||
sqlquery(Conn, Query) ->
|
||||
sqlquery(Conn, Query, ?CALL_TIMEOUT).
|
||||
|
||||
sqlquery(Conn, Query, Timeout) ->
|
||||
call(Conn, {sqlquery, Query}, Timeout).
|
||||
|
||||
prepare(Conn, Name, Stmt) ->
|
||||
call(Conn, {prepare, Name, Stmt}).
|
||||
|
||||
execute(Conn, Name, Params) ->
|
||||
execute(Conn, Name, Params, ?CALL_TIMEOUT).
|
||||
|
||||
execute(Conn, Name, Params, Timeout) ->
|
||||
call(Conn, {execute, Name, Params}, Timeout).
|
||||
|
||||
unprepare(Conn, Name) ->
|
||||
call(Conn, {unprepare, Name}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: init(Host, Port, User, Password, Database, Parent)
|
||||
%% Host = string()
|
||||
%% Port = integer()
|
||||
%% User = string()
|
||||
%% Password = string()
|
||||
%% Database = string()
|
||||
%% Parent = pid() of process starting this mysql_conn
|
||||
%% Descrip.: Connect to a MySQL server, log in and chooses a database.
|
||||
%% Report result of this to Parent, and then enter loop() if
|
||||
%% we were successfull.
|
||||
%% Returns : void() | does not return
|
||||
%%--------------------------------------------------------------------
|
||||
init([Id, Opts]) ->
|
||||
put(queries, 0),
|
||||
Host = get_value(host, Opts, "localhost"),
|
||||
Port = get_value(port, Opts, 3306),
|
||||
UserName = get_value(username, Opts, "root"),
|
||||
Password = get_value(password, Opts, "public"),
|
||||
Database = get_value(database, Opts),
|
||||
Encoding = get_value(encoding, Opts, utf8),
|
||||
case emysql_recv:start_link(Host, Port) of
|
||||
{ok, RecvPid, Sock} ->
|
||||
case mysql_init(Sock, RecvPid, UserName, Password) of
|
||||
{ok, Version} ->
|
||||
Db = iolist_to_binary(Database),
|
||||
case do_query(Sock, RecvPid, <<"use ", Db/binary>>, Version) of
|
||||
{error, #mysql_result{error = Error} = _MySQLRes} ->
|
||||
error_logger:error_msg("emysql_conn: use '~p' error: ~p", [Database, Error]),
|
||||
{stop, using_db_error};
|
||||
{_ResultType, _MySQLRes} ->
|
||||
emysql:pool(Id), %pool it
|
||||
pg2:create(emysql_conn),
|
||||
pg2:join(emysql_conn, self()),
|
||||
EncodingBinary = list_to_binary(atom_to_list(Encoding)),
|
||||
do_query(Sock, RecvPid, <<"set names '", EncodingBinary/binary, "'">>, Version),
|
||||
State = #state{
|
||||
id = Id,
|
||||
host = Host,
|
||||
port = Port,
|
||||
user = UserName,
|
||||
password = Password,
|
||||
database = Database,
|
||||
encoding = Encoding,
|
||||
mysql_version = Version,
|
||||
recv_pid = RecvPid,
|
||||
socket = Sock,
|
||||
data = <<>>},
|
||||
{ok, State}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{stop, {login_failed, Reason}}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{stop, Reason}
|
||||
end.
|
||||
|
||||
handle_call(info, _From, #state{id = Id} = State) ->
|
||||
Reply = {Id, self(), get(queries)},
|
||||
{reply, Reply, State};
|
||||
|
||||
handle_call({sqlquery, Query}, _From, #state{socket = Socket,
|
||||
recv_pid = RecvPid, mysql_version = Ver} = State) ->
|
||||
put(queries, get(queries) + 1),
|
||||
case do_query(Socket, RecvPid, Query, Ver) of
|
||||
{error, mysql_timeout} = Err ->
|
||||
{stop, mysql_timeout, Err, State};
|
||||
Res ->
|
||||
{reply, Res, State}
|
||||
end;
|
||||
|
||||
handle_call({prepare, Name, Stmt}, _From, #state{socket = Socket,
|
||||
recv_pid = RecvPid, mysql_version = Ver} = State) ->
|
||||
|
||||
case do_prepare(Socket, RecvPid, Name, Stmt, Ver) of
|
||||
{error, mysql_timeout} ->
|
||||
{stop, mysql_timeout, State};
|
||||
_ ->
|
||||
{reply, ok, State}
|
||||
end;
|
||||
|
||||
handle_call({unprepare, Name}, _From, #state{socket = Socket,
|
||||
recv_pid = RecvPid, mysql_version = Ver} = State) ->
|
||||
case do_unprepare(Socket, RecvPid, Name, Ver) of
|
||||
{error, mysql_timeout} ->
|
||||
{stop, mysql_timeout, State};
|
||||
_ ->
|
||||
{reply, ok, State}
|
||||
end;
|
||||
|
||||
handle_call({execute, Name, Params}, _From, #state{socket = Socket,
|
||||
recv_pid = RecvPid, mysql_version = Ver} = State) ->
|
||||
case do_execute(Socket, RecvPid, Name, Params, Ver) of
|
||||
{error, mysql_timeout} = Err ->
|
||||
{stop, mysql_timeout, Err, State};
|
||||
Res ->
|
||||
{reply, Res, State}
|
||||
end;
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
error_logger:error_msg("badreq to emysql_conn: ~p", [Req]),
|
||||
{reply, {error, badreq}, State}.
|
||||
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({mysql_recv, _RecvPid, data, _Packet, SeqNum}, State) ->
|
||||
error_logger:error_msg("unexpected mysql_recv: seq_num = ~p", [SeqNum]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info({mysql_recv, _RecvPid, closed, E}, State) ->
|
||||
error_logger:error_msg("mysql socket closed: ~p", [E]),
|
||||
{stop, socket_closed, State};
|
||||
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
do_queries(Sock, RecvPid, Queries, Version) ->
|
||||
catch
|
||||
lists:foldl(
|
||||
fun(Query, _LastResponse) ->
|
||||
case do_query(Sock, RecvPid, Query, Version) of
|
||||
{error, _} = Err -> throw(Err);
|
||||
Res -> Res
|
||||
end
|
||||
end, ok, Queries).
|
||||
|
||||
do_query(Sock, RecvPid, Query, Version) ->
|
||||
Query1 = iolist_to_binary(Query),
|
||||
%?DEBUG("sqlquery ~p (id ~p)", [Query1, RecvPid]),
|
||||
Packet = <<?MYSQL_QUERY_OP, Query1/binary>>,
|
||||
case do_send(Sock, Packet, 0) of
|
||||
ok ->
|
||||
get_query_response(RecvPid, Version);
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
do_prepare(Socket, RecvPid, Name, Stmt, Ver) ->
|
||||
NameBin = atom_to_binary(Name),
|
||||
StmtBin = <<"PREPARE ", NameBin/binary, " FROM '", Stmt/binary, "'">>,
|
||||
do_query(Socket, RecvPid, StmtBin, Ver).
|
||||
|
||||
do_execute(Socket, RecvPid, Name, Params, Ver) ->
|
||||
Stmts = make_statements(Name, Params),
|
||||
do_queries(Socket, RecvPid, Stmts, Ver).
|
||||
|
||||
do_unprepare(Socket, RecvPid, Name, Ver) ->
|
||||
NameBin = atom_to_binary(Name),
|
||||
StmtBin = <<"UNPREPARE ", NameBin/binary>>,
|
||||
do_query(Socket, RecvPid, StmtBin, Ver).
|
||||
|
||||
make_statements(Name, []) ->
|
||||
NameBin = atom_to_binary(Name),
|
||||
[<<"EXECUTE ", NameBin/binary>>];
|
||||
|
||||
make_statements(Name, Params) ->
|
||||
NumParams = length(Params),
|
||||
ParamNums = lists:seq(1, NumParams),
|
||||
NameBin = atom_to_binary(Name),
|
||||
ParamNames =
|
||||
lists:foldl(
|
||||
fun(Num, Acc) ->
|
||||
ParamName = [$@ | integer_to_list(Num)],
|
||||
if Num == 1 ->
|
||||
ParamName ++ Acc;
|
||||
true ->
|
||||
[$, | ParamName] ++ Acc
|
||||
end
|
||||
end, [], lists:reverse(ParamNums)),
|
||||
ParamNamesBin = list_to_binary(ParamNames),
|
||||
ExecStmt = <<"EXECUTE ", NameBin/binary, " USING ",
|
||||
ParamNamesBin/binary>>,
|
||||
|
||||
ParamVals = lists:zip(ParamNums, Params),
|
||||
Stmts = lists:foldl(
|
||||
fun({Num, Val}, Acc) ->
|
||||
NumBin = emysql:encode(Num, true),
|
||||
ValBin = emysql:encode(Val, true),
|
||||
[<<"SET @", NumBin/binary, "=", ValBin/binary>> | Acc]
|
||||
end, [ExecStmt], lists:reverse(ParamVals)),
|
||||
Stmts.
|
||||
|
||||
atom_to_binary(Val) ->
|
||||
<<_:4/binary, Bin/binary>> = term_to_binary(Val),
|
||||
Bin.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% authentication
|
||||
%%--------------------------------------------------------------------
|
||||
do_old_auth(Sock, RecvPid, SeqNum, User, Password, Salt1) ->
|
||||
Auth = emysql_auth:password_old(Password, Salt1),
|
||||
Packet = emysql_auth:make_auth(User, Auth),
|
||||
do_send(Sock, Packet, SeqNum),
|
||||
do_recv(RecvPid, SeqNum).
|
||||
|
||||
do_new_auth(Sock, RecvPid, SeqNum, User, Password, Salt1, Salt2) ->
|
||||
Auth = emysql_auth:password_new(Password, Salt1 ++ Salt2),
|
||||
Packet2 = emysql_auth:make_new_auth(User, Auth, none),
|
||||
do_send(Sock, Packet2, SeqNum),
|
||||
case do_recv(RecvPid, SeqNum) of
|
||||
{ok, Packet3, SeqNum2} ->
|
||||
case Packet3 of
|
||||
<<254:8>> ->
|
||||
AuthOld = emysql_auth:password_old(Password, Salt1),
|
||||
do_send(Sock, <<AuthOld/binary, 0:8>>, SeqNum2 + 1),
|
||||
do_recv(RecvPid, SeqNum2 + 1);
|
||||
_ ->
|
||||
{ok, Packet3, SeqNum2}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: mysql_init(Sock, RecvPid, User, Password)
|
||||
%% Sock = term(), gen_tcp socket
|
||||
%% RecvPid = pid(), mysql_recv process
|
||||
%% User = string()
|
||||
%% Password = string()
|
||||
%% LogFun = undefined | function() with arity 3
|
||||
%% Descrip.: Try to authenticate on our new socket.
|
||||
%% Returns : ok | {error, Reason}
|
||||
%% Reason = string()
|
||||
%%--------------------------------------------------------------------
|
||||
mysql_init(Sock, RecvPid, User, Password) ->
|
||||
case do_recv(RecvPid, undefined) of
|
||||
{ok, Packet, InitSeqNum} ->
|
||||
{Version, Salt1, Salt2, Caps} = greeting(Packet),
|
||||
%?DEBUG("version: ~p, ~p, ~p, ~p", [Version, Salt1, Salt2, Caps]),
|
||||
AuthRes =
|
||||
case Caps band ?SECURE_CONNECTION of
|
||||
?SECURE_CONNECTION ->
|
||||
do_new_auth(Sock, RecvPid, InitSeqNum + 1, User, Password, Salt1, Salt2);
|
||||
_ ->
|
||||
do_old_auth(Sock, RecvPid, InitSeqNum + 1, User, Password, Salt1)
|
||||
end,
|
||||
case AuthRes of
|
||||
{ok, <<0:8, _Rest/binary>>, _RecvNum} ->
|
||||
{ok,Version};
|
||||
{ok, <<255:8, _Code:16/little, Message/binary>>, _RecvNum} ->
|
||||
{error, binary_to_list(Message)};
|
||||
{ok, RecvPacket, _RecvNum} ->
|
||||
{error, binary_to_list(RecvPacket)};
|
||||
{error, Reason} ->
|
||||
%?ERROR("init failed receiving data : ~p", [Reason]),
|
||||
{error, Reason}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
greeting(Packet) ->
|
||||
<<_Protocol:8, Rest/binary>> = Packet,
|
||||
{Version, Rest2} = asciz(Rest),
|
||||
<<_TreadID:32/little, Rest3/binary>> = Rest2,
|
||||
{Salt, Rest4} = asciz(Rest3),
|
||||
<<Caps:16/little, Rest5/binary>> = Rest4,
|
||||
<<_ServerChar:16/binary-unit:8, Rest6/binary>> = Rest5,
|
||||
{Salt2, _Rest7} = asciz(Rest6),
|
||||
%?DEBUG("greeting version ~p (protocol ~p) salt ~p caps ~p serverchar ~p"
|
||||
%"salt2 ~p",
|
||||
%[Version, Protocol, Salt, Caps, ServerChar, Salt2]),
|
||||
{normalize_version(Version), Salt, Salt2, Caps}.
|
||||
|
||||
%% part of greeting/2
|
||||
asciz(Data) when is_binary(Data) ->
|
||||
asciz_binary(Data, []);
|
||||
asciz(Data) when is_list(Data) ->
|
||||
{String, [0 | Rest]} = lists:splitwith(fun (C) ->
|
||||
C /= 0
|
||||
end, Data),
|
||||
{String, Rest}.
|
||||
|
||||
%% @doc Find the first zero-byte in Data and add everything before it
|
||||
%% to Acc, as a string.
|
||||
%%
|
||||
%% @spec asciz_binary(Data::binary(), Acc::list()) ->
|
||||
%% {NewList::list(), Rest::binary()}
|
||||
asciz_binary(<<>>, Acc) ->
|
||||
{lists:reverse(Acc), <<>>};
|
||||
asciz_binary(<<0:8, Rest/binary>>, Acc) ->
|
||||
{lists:reverse(Acc), Rest};
|
||||
asciz_binary(<<C:8, Rest/binary>>, Acc) ->
|
||||
asciz_binary(Rest, [C | Acc]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: get_query_response(RecvPid)
|
||||
%% RecvPid = pid(), mysql_recv process
|
||||
%% Version = integer(), Representing MySQL version used
|
||||
%% Descrip.: Wait for frames until we have a complete query response.
|
||||
%% Returns : {data, #mysql_result}
|
||||
%% {updated, #mysql_result}
|
||||
%% {error, #mysql_result}
|
||||
%% FieldInfo = list() of term()
|
||||
%% Rows = list() of [string()]
|
||||
%% AffectedRows = int()
|
||||
%% Reason = term()
|
||||
%%--------------------------------------------------------------------
|
||||
get_query_response(RecvPid, Version) ->
|
||||
case do_recv(RecvPid, undefined) of
|
||||
{ok, <<Fieldcount:8, Rest/binary>>, _} ->
|
||||
case Fieldcount of
|
||||
0 ->
|
||||
%% No Tabular data
|
||||
{AffectedRows, Rest1} = decode_length_binary(Rest),
|
||||
{InsertId, _} = decode_length_binary(Rest1),
|
||||
{updated, #mysql_result{insert_id = InsertId, affectedrows=AffectedRows}};
|
||||
255 ->
|
||||
<<_Code:16/little, Message/binary>> = Rest,
|
||||
{error, #mysql_result{error=Message}};
|
||||
_ ->
|
||||
%% Tabular data received
|
||||
case get_fields(RecvPid, [], Version) of
|
||||
{ok, Fields} ->
|
||||
case get_rows(Fields, RecvPid, []) of
|
||||
{ok, Rows} ->
|
||||
{data, #mysql_result{fieldinfo=Fields,
|
||||
rows=Rows}};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
decode_length_binary(<<Len:8, Rest/binary>>) ->
|
||||
if
|
||||
Len =< 251 ->
|
||||
{Len, Rest};
|
||||
Len == 252 -> %two bytes
|
||||
<<Val:16/little, Rest1/binary>> = Rest,
|
||||
{Val, Rest1};
|
||||
Len == 253 -> %three
|
||||
<<Val:24/little, Rest1/binary>> = Rest,
|
||||
{Val, Rest1};
|
||||
Len == 254 -> %eight
|
||||
<<Val:64/little, Rest1/binary>> = Rest,
|
||||
{Val, Rest1};
|
||||
true ->
|
||||
%?ERROR("affectedrows: ~p", [Len]),
|
||||
{0, Rest}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: do_recv(RecvPid, SeqNum)
|
||||
%% RecvPid = pid(), mysql_recv process
|
||||
%% SeqNum = undefined | integer()
|
||||
%% Descrip.: Wait for a frame decoded and sent to us by RecvPid.
|
||||
%% Either wait for a specific frame if SeqNum is an integer,
|
||||
%% or just any frame if SeqNum is undefined.
|
||||
%% Returns : {ok, Packet, Num} |
|
||||
%% {error, Reason}
|
||||
%% Reason = term()
|
||||
%%
|
||||
%% Note : Only to be used externally by the 'mysql_auth' module.
|
||||
%%--------------------------------------------------------------------
|
||||
do_recv(RecvPid, SeqNum) when SeqNum == undefined ->
|
||||
receive
|
||||
{mysql_recv, RecvPid, data, Packet, Num} ->
|
||||
{ok, Packet, Num};
|
||||
{mysql_recv, RecvPid, closed, _E} ->
|
||||
{error, socket_closed}
|
||||
after ?CONNECT_TIMEOUT ->
|
||||
{error, mysql_timeout}
|
||||
end;
|
||||
|
||||
do_recv(RecvPid, SeqNum) when is_integer(SeqNum) ->
|
||||
ResponseNum = SeqNum + 1,
|
||||
receive
|
||||
{mysql_recv, RecvPid, data, Packet, ResponseNum} ->
|
||||
{ok, Packet, ResponseNum};
|
||||
{mysql_recv, RecvPid, closed, _E} ->
|
||||
{error, socket_closed}
|
||||
after ?CONNECT_TIMEOUT ->
|
||||
{error, mysql_timeout}
|
||||
end.
|
||||
|
||||
call(Conn, Req) ->
|
||||
gen_server:call(Conn, Req).
|
||||
|
||||
call(Conn, Req, Timeout) ->
|
||||
gen_server:call(Conn, Req, Timeout).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: get_fields(RecvPid, [], Version)
|
||||
%% RecvPid = pid(), mysql_recv process
|
||||
%% Version = integer(), Representing MySQL version used
|
||||
%% Descrip.: Received and decode field information.
|
||||
%% Returns : {ok, FieldInfo} |
|
||||
%% {error, Reason}
|
||||
%% FieldInfo = list() of term()
|
||||
%% Reason = term()
|
||||
%%--------------------------------------------------------------------
|
||||
%% Support for MySQL 4.0.x:
|
||||
get_fields(RecvPid, Res, ?MYSQL_4_0) ->
|
||||
case do_recv(RecvPid, undefined) of
|
||||
{ok, Packet, _Num} ->
|
||||
case Packet of
|
||||
<<254:8>> ->
|
||||
{ok, lists:reverse(Res)};
|
||||
<<254:8, Rest/binary>> when size(Rest) < 8 ->
|
||||
{ok, lists:reverse(Res)};
|
||||
_ ->
|
||||
{Table, Rest} = get_with_length(Packet),
|
||||
{Field, Rest2} = get_with_length(Rest),
|
||||
{LengthB, Rest3} = get_with_length(Rest2),
|
||||
LengthL = size(LengthB) * 8,
|
||||
<<Length:LengthL/little>> = LengthB,
|
||||
{Type, Rest4} = get_with_length(Rest3),
|
||||
{_Flags, _Rest5} = get_with_length(Rest4),
|
||||
This = {Table,
|
||||
Field,
|
||||
Length,
|
||||
%% TODO: Check on MySQL 4.0 if types are specified
|
||||
%% using the same 4.1 formalism and could
|
||||
%% be expanded to atoms:
|
||||
Type},
|
||||
get_fields(RecvPid, [This | Res], ?MYSQL_4_0)
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end;
|
||||
%% Support for MySQL 4.1.x and 5.x:
|
||||
get_fields(RecvPid, Res, ?MYSQL_4_1) ->
|
||||
case do_recv(RecvPid, undefined) of
|
||||
{ok, Packet, _Num} ->
|
||||
case Packet of
|
||||
<<254:8>> ->
|
||||
{ok, lists:reverse(Res)};
|
||||
<<254:8, Rest/binary>> when size(Rest) < 8 ->
|
||||
{ok, lists:reverse(Res)};
|
||||
_ ->
|
||||
{_Catalog, Rest} = get_with_length(Packet),
|
||||
{_Database, Rest2} = get_with_length(Rest),
|
||||
{Table, Rest3} = get_with_length(Rest2),
|
||||
%% OrgTable is the real table name if Table is an alias
|
||||
{_OrgTable, Rest4} = get_with_length(Rest3),
|
||||
{Field, Rest5} = get_with_length(Rest4),
|
||||
%% OrgField is the real field name if Field is an alias
|
||||
{_OrgField, Rest6} = get_with_length(Rest5),
|
||||
|
||||
<<_Metadata:8/little, _Charset:16/little,
|
||||
Length:32/little, Type:8/little,
|
||||
_Flags:16/little, _Decimals:8/little,
|
||||
_Rest7/binary>> = Rest6,
|
||||
|
||||
This = {Table,
|
||||
Field,
|
||||
Length,
|
||||
get_field_datatype(Type)},
|
||||
get_fields(RecvPid, [This | Res], ?MYSQL_4_1)
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: get_rows(N, RecvPid, [])
|
||||
%% N = integer(), number of rows to get
|
||||
%% RecvPid = pid(), mysql_recv process
|
||||
%% Descrip.: Receive and decode a number of rows.
|
||||
%% Returns : {ok, Rows} |
|
||||
%% {error, Reason}
|
||||
%% Rows = list() of [string()]
|
||||
%%--------------------------------------------------------------------
|
||||
get_rows(Fields, RecvPid, Res) ->
|
||||
case do_recv(RecvPid, undefined) of
|
||||
{ok, Packet, _Num} ->
|
||||
case Packet of
|
||||
<<254:8, Rest/binary>> when size(Rest) < 8 ->
|
||||
{ok, lists:reverse(Res)};
|
||||
_ ->
|
||||
{ok, This} = get_row(Fields, Packet, []),
|
||||
get_rows(Fields, RecvPid, [This | Res])
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%% part of get_rows/4
|
||||
get_row([], _Data, Res) ->
|
||||
{ok, lists:reverse(Res)};
|
||||
get_row([Field | OtherFields], Data, Res) ->
|
||||
{Col, Rest} = get_with_length(Data),
|
||||
This = case Col of
|
||||
null ->
|
||||
undefined;
|
||||
_ ->
|
||||
convert_type(Col, element(4, Field))
|
||||
end,
|
||||
get_row(OtherFields, Rest, [This | Res]).
|
||||
|
||||
get_with_length(<<251:8, Rest/binary>>) ->
|
||||
{null, Rest};
|
||||
get_with_length(<<252:8, Length:16/little, Rest/binary>>) ->
|
||||
split_binary(Rest, Length);
|
||||
get_with_length(<<253:8, Length:24/little, Rest/binary>>) ->
|
||||
split_binary(Rest, Length);
|
||||
get_with_length(<<254:8, Length:64/little, Rest/binary>>) ->
|
||||
split_binary(Rest, Length);
|
||||
get_with_length(<<Length:8, Rest/binary>>) when Length < 251 ->
|
||||
split_binary(Rest, Length).
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: do_send(Sock, Packet, SeqNum)
|
||||
%% Sock = term(), gen_tcp socket
|
||||
%% Packet = binary()
|
||||
%% SeqNum = integer(), packet sequence number
|
||||
%% Descrip.: Send a packet to the MySQL server.
|
||||
%% Returns : result of gen_tcp:send/2
|
||||
%%--------------------------------------------------------------------
|
||||
do_send(Sock, Packet, SeqNum) when is_binary(Packet), is_integer(SeqNum) ->
|
||||
Data = <<(size(Packet)):24/little, SeqNum:8, Packet/binary>>,
|
||||
gen_tcp:send(Sock, Data).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: normalize_version(Version)
|
||||
%% Version = string()
|
||||
%% Descrip.: Return a flag corresponding to the MySQL version used.
|
||||
%% The protocol used depends on this flag.
|
||||
%% Returns : Version = string()
|
||||
%%--------------------------------------------------------------------
|
||||
normalize_version([$4,$.,$0|_T]) ->
|
||||
%?DEBUG("switching to MySQL 4.0.x protocol.", []),
|
||||
?MYSQL_4_0;
|
||||
normalize_version([$4,$.,$1|_T]) ->
|
||||
?MYSQL_4_1;
|
||||
normalize_version([$5|_T]) ->
|
||||
%% MySQL version 5.x protocol is compliant with MySQL 4.1.x:
|
||||
?MYSQL_4_1;
|
||||
normalize_version([$6|_T]) ->
|
||||
%% MySQL version 6.x protocol is compliant with MySQL 4.1.x:
|
||||
?MYSQL_4_1;
|
||||
normalize_version(_Other) ->
|
||||
%?ERROR("MySQL version '~p' not supported: MySQL Erlang module "
|
||||
% "might not work correctly.", [Other]),
|
||||
%% Error, but trying the oldest protocol anyway:
|
||||
?MYSQL_4_0.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: get_field_datatype(DataType)
|
||||
%% DataType = integer(), MySQL datatype
|
||||
%% Descrip.: Return MySQL field datatype as description string
|
||||
%% Returns : String, MySQL datatype
|
||||
%%--------------------------------------------------------------------
|
||||
get_field_datatype(0) -> 'DECIMAL';
|
||||
get_field_datatype(1) -> 'TINY';
|
||||
get_field_datatype(2) -> 'SHORT';
|
||||
get_field_datatype(3) -> 'LONG';
|
||||
get_field_datatype(4) -> 'FLOAT';
|
||||
get_field_datatype(5) -> 'DOUBLE';
|
||||
get_field_datatype(6) -> 'NULL';
|
||||
get_field_datatype(7) -> 'TIMESTAMP';
|
||||
get_field_datatype(8) -> 'LONGLONG';
|
||||
get_field_datatype(9) -> 'INT24';
|
||||
get_field_datatype(10) -> 'DATE';
|
||||
get_field_datatype(11) -> 'TIME';
|
||||
get_field_datatype(12) -> 'DATETIME';
|
||||
get_field_datatype(13) -> 'YEAR';
|
||||
get_field_datatype(14) -> 'NEWDATE';
|
||||
get_field_datatype(246) -> 'NEWDECIMAL';
|
||||
get_field_datatype(247) -> 'ENUM';
|
||||
get_field_datatype(248) -> 'SET';
|
||||
get_field_datatype(249) -> 'TINYBLOB';
|
||||
get_field_datatype(250) -> 'MEDIUM_BLOG';
|
||||
get_field_datatype(251) -> 'LONG_BLOG';
|
||||
get_field_datatype(252) -> 'BLOB';
|
||||
get_field_datatype(253) -> 'VAR_STRING';
|
||||
get_field_datatype(254) -> 'STRING';
|
||||
get_field_datatype(255) -> 'GEOMETRY'.
|
||||
|
||||
convert_type(Val, ColType) ->
|
||||
case ColType of
|
||||
T when T == 'TINY';
|
||||
T == 'SHORT';
|
||||
T == 'LONG';
|
||||
T == 'LONGLONG';
|
||||
T == 'INT24';
|
||||
T == 'YEAR' ->
|
||||
list_to_integer(binary_to_list(Val));
|
||||
T when T == 'TIMESTAMP';
|
||||
T == 'DATETIME' ->
|
||||
{ok, [Year, Month, Day, Hour, Minute, Second], _Leftovers} =
|
||||
io_lib:fread("~d-~d-~d ~d:~d:~d", binary_to_list(Val)),
|
||||
{datetime, {{Year, Month, Day}, {Hour, Minute, Second}}};
|
||||
'TIME' ->
|
||||
{ok, [Hour, Minute, Second], _Leftovers} =
|
||||
io_lib:fread("~d:~d:~d", binary_to_list(Val)),
|
||||
{time, {Hour, Minute, Second}};
|
||||
'DATE' ->
|
||||
{ok, [Year, Month, Day], _Leftovers} =
|
||||
io_lib:fread("~d-~d-~d", binary_to_list(Val)),
|
||||
{date, {Year, Month, Day}};
|
||||
T when T == 'DECIMAL';
|
||||
T == 'NEWDECIMAL';
|
||||
T == 'FLOAT';
|
||||
T == 'DOUBLE' ->
|
||||
{ok, [Num], _Leftovers} =
|
||||
case io_lib:fread("~f", binary_to_list(Val)) of
|
||||
{error, _} ->
|
||||
io_lib:fread("~d", binary_to_list(Val));
|
||||
Res ->
|
||||
Res
|
||||
end,
|
||||
Num;
|
||||
_Other ->
|
||||
Val
|
||||
end.
|
|
@ -1,130 +0,0 @@
|
|||
%%%-------------------------------------------------------------------
|
||||
%%% File : emysql_recv.erl
|
||||
%%% Author : Fredrik Thulin <ft@it.su.se>
|
||||
%%% Descrip.: Handles data being received on a MySQL socket. Decodes
|
||||
%%% per-row framing and sends each row to parent.
|
||||
%%%
|
||||
%%% Created : 4 Aug 2005 by Fredrik Thulin <ft@it.su.se>
|
||||
%%%
|
||||
%%% Note : All MySQL code was written by Magnus Ahltorp, originally
|
||||
%%% in the file mysql.erl - I just moved it here.
|
||||
%%%
|
||||
%%% Copyright (c) 2001-2004 Kungliga Tekniska
|
||||
%%% See the file COPYING
|
||||
%%%
|
||||
%%% Signals this receiver process can send to it's parent
|
||||
%%% (the parent is a mysql_conn connection handler) :
|
||||
%%%
|
||||
%%% {mysql_recv, self(), data, Packet, Num}
|
||||
%%% {mysql_recv, self(), closed, {error, Reason}}
|
||||
%%% {mysql_recv, self(), closed, normal}
|
||||
%%%
|
||||
%%% Internally (from inside init/4 to start_link/4) the
|
||||
%%% following signals may be sent to the parent process :
|
||||
%%%
|
||||
%%% {mysql_recv, self(), init, {ok, Sock}}
|
||||
%%% {mysql_recv, self(), init, {error, E}}
|
||||
%%%
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(emysql_recv).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% External exports (should only be used by the 'mysql_conn' module)
|
||||
%%--------------------------------------------------------------------
|
||||
-export([start_link/2]).
|
||||
|
||||
%callback
|
||||
-export([init/3]).
|
||||
|
||||
-record(state, {
|
||||
socket,
|
||||
parent,
|
||||
log_fun,
|
||||
data}).
|
||||
|
||||
-define(SECURE_CONNECTION, 32768).
|
||||
|
||||
-define(CONNECT_TIMEOUT, 10000).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: start_link(Host, Port, Parent)
|
||||
%% Host = string()
|
||||
%% Port = integer()
|
||||
%% Parent = pid(), process that should get received frames
|
||||
%% Descrip.: Start a process that connects to Host:Port and waits for
|
||||
%% data. When it has received a MySQL frame, it sends it to
|
||||
%% Parent and waits for the next frame.
|
||||
%% Returns : {ok, RecvPid, Socket} |
|
||||
%% {error, Reason}
|
||||
%% RecvPid = pid(), receiver process pid
|
||||
%% Socket = term(), gen_tcp socket
|
||||
%% Reason = atom() | string()
|
||||
%%--------------------------------------------------------------------
|
||||
start_link(Host, Port) ->
|
||||
proc_lib:start_link(?MODULE, init, [self(), Host, Port]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: init((Host, Port, Parent)
|
||||
%% Host = string()
|
||||
%% Port = integer()
|
||||
%% Parent = pid(), process that should get received frames
|
||||
%% Descrip.: Connect to Host:Port and then enter receive-loop.
|
||||
%% Returns : error | never returns
|
||||
%%--------------------------------------------------------------------
|
||||
init(Parent, Host, Port) ->
|
||||
case gen_tcp:connect(Host, Port, [binary, {packet, 0}]) of
|
||||
{ok, Sock} ->
|
||||
proc_lib:init_ack(Parent, {ok, self(), Sock}),
|
||||
loop(#state{socket = Sock, parent = Parent, data = <<>>});
|
||||
{error, Reason} ->
|
||||
proc_lib:init_ack(Parent, {error, Reason})
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: loop(State)
|
||||
%% State = state record()
|
||||
%% Descrip.: The main loop. Wait for data from our TCP socket and act
|
||||
%% on received data or signals that our socket was closed.
|
||||
%% Returns : error | never returns
|
||||
%%--------------------------------------------------------------------
|
||||
loop(State) ->
|
||||
Sock = State#state.socket,
|
||||
receive
|
||||
{tcp, Sock, InData} ->
|
||||
NewData = list_to_binary([State#state.data, InData]),
|
||||
%% send data to parent if we have enough data
|
||||
Rest = sendpacket(State#state.parent, NewData),
|
||||
loop(State#state{data = Rest});
|
||||
{tcp_error, Sock, Reason} ->
|
||||
State#state.parent ! {mysql_recv, self(), closed, {error, Reason}},
|
||||
error;
|
||||
{tcp_closed, Sock} ->
|
||||
State#state.parent ! {mysql_recv, self(), closed, normal},
|
||||
error;
|
||||
_Other -> %maybe system message
|
||||
loop(State)
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: sendpacket(Parent, Data)
|
||||
%% Parent = pid()
|
||||
%% Data = binary()
|
||||
%% Descrip.: Check if we have received one or more complete frames by
|
||||
%% now, and if so - send them to Parent.
|
||||
%% Returns : Rest = binary()
|
||||
%%--------------------------------------------------------------------
|
||||
%% send data to parent if we have enough data
|
||||
sendpacket(Parent, Data) ->
|
||||
case Data of
|
||||
<<Length:24/little, Num:8, D/binary>> ->
|
||||
if
|
||||
Length =< size(D) ->
|
||||
{Packet, Rest} = split_binary(D, Length),
|
||||
Parent ! {mysql_recv, self(), data, Packet, Num},
|
||||
sendpacket(Parent, Rest);
|
||||
true ->
|
||||
Data
|
||||
end;
|
||||
_ ->
|
||||
Data
|
||||
end.
|
|
@ -1,34 +0,0 @@
|
|||
%%%----------------------------------------------------------------------
|
||||
%%% File : emysql_sup.erl
|
||||
%%% Author : Ery Lee
|
||||
%%% Purpose : Mysql driver supervisor
|
||||
%%% Created : 21 May 2009
|
||||
%%% Updated : 11 Jan 2010
|
||||
%%% License : http://www.opengoss.com
|
||||
%%%
|
||||
%%% Copyright (C) 2012, www.opengoss.com
|
||||
%%%----------------------------------------------------------------------
|
||||
-module(emysql_sup).
|
||||
|
||||
-author('ery.lee@gmail.com').
|
||||
|
||||
-behavior(supervisor).
|
||||
|
||||
%% API
|
||||
-export([start_link/1, init/1]).
|
||||
|
||||
start_link(Opts) ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, Opts).
|
||||
|
||||
init(Opts) ->
|
||||
PoolSize = proplists:get_value(pool, Opts,
|
||||
erlang:system_info(schedulers)),
|
||||
{ok, {{one_for_one, 10, 10},
|
||||
[{emysql, {emysql, start_link, [PoolSize]}, transient,
|
||||
16#ffffffff, worker, [emysql]} |
|
||||
[{I, {emysql_conn, start_link, [I, Opts]}, transient, 16#ffffffff,
|
||||
worker, [emysql_conn, emysql_recv]} || I <- lists:seq(1, PoolSize)]]
|
||||
}
|
||||
}.
|
||||
|
||||
|
Loading…
Reference in New Issue